Fix potential deadlock when a handler's destructor makes a call back into
the reactor.
This commit is contained in:
parent
ab54b864ea
commit
00acdadebd
@ -302,13 +302,29 @@ private:
|
|||||||
|
|
||||||
// Check if the thread is supposed to stop.
|
// Check if the thread is supposed to stop.
|
||||||
if (stop_thread_)
|
if (stop_thread_)
|
||||||
|
{
|
||||||
|
// Clean up operations. We must not hold the lock since the operations may
|
||||||
|
// make calls back into this reactor.
|
||||||
|
lock.unlock();
|
||||||
|
read_op_queue_.cleanup_operations();
|
||||||
|
write_op_queue_.cleanup_operations();
|
||||||
|
except_op_queue_.cleanup_operations();
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// We can return immediately if there's no work to do and the reactor is
|
// We can return immediately if there's no work to do and the reactor is
|
||||||
// not supposed to block.
|
// not supposed to block.
|
||||||
if (!block && read_op_queue_.empty() && write_op_queue_.empty()
|
if (!block && read_op_queue_.empty() && write_op_queue_.empty()
|
||||||
&& except_op_queue_.empty() && timer_queue_.empty())
|
&& except_op_queue_.empty() && timer_queue_.empty())
|
||||||
|
{
|
||||||
|
// Clean up operations. We must not hold the lock since the operations may
|
||||||
|
// make calls back into this reactor.
|
||||||
|
lock.unlock();
|
||||||
|
read_op_queue_.cleanup_operations();
|
||||||
|
write_op_queue_.cleanup_operations();
|
||||||
|
except_op_queue_.cleanup_operations();
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int timeout = block ? get_timeout() : 0;
|
int timeout = block ? get_timeout() : 0;
|
||||||
wait_in_progress_ = true;
|
wait_in_progress_ = true;
|
||||||
@ -398,6 +414,13 @@ private:
|
|||||||
for (size_t i = 0; i < pending_cancellations_.size(); ++i)
|
for (size_t i = 0; i < pending_cancellations_.size(); ++i)
|
||||||
cancel_ops_unlocked(pending_cancellations_[i]);
|
cancel_ops_unlocked(pending_cancellations_[i]);
|
||||||
pending_cancellations_.clear();
|
pending_cancellations_.clear();
|
||||||
|
|
||||||
|
// Clean up operations. We must not hold the lock since the operations may
|
||||||
|
// make calls back into this reactor.
|
||||||
|
lock.unlock();
|
||||||
|
read_op_queue_.cleanup_operations();
|
||||||
|
write_op_queue_.cleanup_operations();
|
||||||
|
except_op_queue_.cleanup_operations();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the select loop in the thread.
|
// Run the select loop in the thread.
|
||||||
|
@ -282,13 +282,29 @@ private:
|
|||||||
|
|
||||||
// Check if the thread is supposed to stop.
|
// Check if the thread is supposed to stop.
|
||||||
if (stop_thread_)
|
if (stop_thread_)
|
||||||
|
{
|
||||||
|
// Clean up operations. We must not hold the lock since the operations may
|
||||||
|
// make calls back into this reactor.
|
||||||
|
lock.unlock();
|
||||||
|
read_op_queue_.cleanup_operations();
|
||||||
|
write_op_queue_.cleanup_operations();
|
||||||
|
except_op_queue_.cleanup_operations();
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// We can return immediately if there's no work to do and the reactor is
|
// We can return immediately if there's no work to do and the reactor is
|
||||||
// not supposed to block.
|
// not supposed to block.
|
||||||
if (!block && read_op_queue_.empty() && write_op_queue_.empty()
|
if (!block && read_op_queue_.empty() && write_op_queue_.empty()
|
||||||
&& except_op_queue_.empty() && timer_queue_.empty())
|
&& except_op_queue_.empty() && timer_queue_.empty())
|
||||||
|
{
|
||||||
|
// Clean up operations. We must not hold the lock since the operations may
|
||||||
|
// make calls back into this reactor.
|
||||||
|
lock.unlock();
|
||||||
|
read_op_queue_.cleanup_operations();
|
||||||
|
write_op_queue_.cleanup_operations();
|
||||||
|
except_op_queue_.cleanup_operations();
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Determine how long to block while waiting for events.
|
// Determine how long to block while waiting for events.
|
||||||
timespec timeout_buf = { 0, 0 };
|
timespec timeout_buf = { 0, 0 };
|
||||||
@ -393,6 +409,13 @@ private:
|
|||||||
for (size_t i = 0; i < pending_cancellations_.size(); ++i)
|
for (size_t i = 0; i < pending_cancellations_.size(); ++i)
|
||||||
cancel_ops_unlocked(pending_cancellations_[i]);
|
cancel_ops_unlocked(pending_cancellations_[i]);
|
||||||
pending_cancellations_.clear();
|
pending_cancellations_.clear();
|
||||||
|
|
||||||
|
// Clean up operations. We must not hold the lock since the operations may
|
||||||
|
// make calls back into this reactor.
|
||||||
|
lock.unlock();
|
||||||
|
read_op_queue_.cleanup_operations();
|
||||||
|
write_op_queue_.cleanup_operations();
|
||||||
|
except_op_queue_.cleanup_operations();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the select loop in the thread.
|
// Run the select loop in the thread.
|
||||||
|
@ -36,7 +36,8 @@ public:
|
|||||||
// Constructor.
|
// Constructor.
|
||||||
reactor_op_queue()
|
reactor_op_queue()
|
||||||
: operations_(),
|
: operations_(),
|
||||||
cancelled_operations_(0)
|
cancelled_operations_(0),
|
||||||
|
cleanup_operations_(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -104,15 +105,16 @@ public:
|
|||||||
typename operation_map::iterator i = operations_.find(descriptor);
|
typename operation_map::iterator i = operations_.find(descriptor);
|
||||||
if (i != operations_.end())
|
if (i != operations_.end())
|
||||||
{
|
{
|
||||||
op_base* next_op = i->second->next_;
|
op_base* op = i->second;
|
||||||
i->second->next_ = 0;
|
i->second = op->next_;
|
||||||
bool done = i->second->invoke(result);
|
op->next_ = cleanup_operations_;
|
||||||
|
cleanup_operations_ = op;
|
||||||
|
bool done = op->invoke(result);
|
||||||
if (done)
|
if (done)
|
||||||
{
|
{
|
||||||
// Operation has finished.
|
// Operation has finished.
|
||||||
if (next_op)
|
if (i->second)
|
||||||
{
|
{
|
||||||
i->second = next_op;
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -124,8 +126,10 @@ public:
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Operation wants to be called again. Leave it at the front of the
|
// Operation wants to be called again. Leave it at the front of the
|
||||||
// queue for this descriptor.
|
// queue for this descriptor, and remove from the cleanup list.
|
||||||
i->second->next_ = next_op;
|
cleanup_operations_ = op->next_;
|
||||||
|
op->next_ = i->second;
|
||||||
|
i->second = op;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -138,19 +142,23 @@ public:
|
|||||||
typename operation_map::iterator i = operations_.find(descriptor);
|
typename operation_map::iterator i = operations_.find(descriptor);
|
||||||
if (i != operations_.end())
|
if (i != operations_.end())
|
||||||
{
|
{
|
||||||
operations_.erase(i);
|
|
||||||
while (i->second)
|
while (i->second)
|
||||||
{
|
{
|
||||||
op_base* next_op = i->second->next_;
|
op_base* op = i->second;
|
||||||
i->second->next_ = 0;
|
i->second = op->next_;
|
||||||
|
op->next_ = cleanup_operations_;
|
||||||
|
cleanup_operations_ = op;
|
||||||
bool done = i->second->invoke(result);
|
bool done = i->second->invoke(result);
|
||||||
if (!done)
|
if (!done)
|
||||||
{
|
{
|
||||||
// Operation has not finished yet, so leave at front of queue.
|
// Operation has not finished yet, so leave at front of queue, and
|
||||||
i->second->next_ = next_op;
|
// remove from the cleanup list.
|
||||||
|
cleanup_operations_ = op->next_;
|
||||||
|
op->next_ = i->second;
|
||||||
|
i->second = op;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
i->second = next_op;
|
operations_.erase(i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -176,22 +184,26 @@ public:
|
|||||||
typename operation_map::iterator i = operations_.begin();
|
typename operation_map::iterator i = operations_.begin();
|
||||||
while (i != operations_.end())
|
while (i != operations_.end())
|
||||||
{
|
{
|
||||||
typename operation_map::iterator op = i++;
|
typename operation_map::iterator op_iter = i++;
|
||||||
if (descriptors.is_set(op->first))
|
if (descriptors.is_set(op_iter->first))
|
||||||
{
|
{
|
||||||
op_base* next_op = op->second->next_;
|
op_base* op = op_iter->second;
|
||||||
op->second->next_ = 0;
|
op_iter->second = op->next_;
|
||||||
|
op->next_ = cleanup_operations_;
|
||||||
|
cleanup_operations_ = op;
|
||||||
bool done = op->second->invoke(result);
|
bool done = op->second->invoke(result);
|
||||||
if (done)
|
if (done)
|
||||||
{
|
{
|
||||||
if (next_op)
|
if (!op_iter->second)
|
||||||
op->second = next_op;
|
|
||||||
else
|
|
||||||
operations_.erase(op);
|
operations_.erase(op);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
op->second->next_ = next_op;
|
// Operation has not finished yet, so leave at front of queue, and
|
||||||
|
// remove from the cleanup list.
|
||||||
|
cleanup_operations_ = op->next_;
|
||||||
|
op->next_ = op_iter->second;
|
||||||
|
op_iter->second = op;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -202,10 +214,23 @@ public:
|
|||||||
{
|
{
|
||||||
while (cancelled_operations_)
|
while (cancelled_operations_)
|
||||||
{
|
{
|
||||||
op_base* next_op = cancelled_operations_->next_;
|
op_base* op = cancelled_operations_;
|
||||||
cancelled_operations_->next_ = 0;
|
cancelled_operations_ = op->next_;
|
||||||
cancelled_operations_->invoke(asio::error::operation_aborted);
|
op->next_ = cleanup_operations_;
|
||||||
cancelled_operations_ = next_op;
|
cleanup_operations_ = op;
|
||||||
|
op->invoke(asio::error::operation_aborted);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Destroy operations that are waiting to be cleaned up.
|
||||||
|
void cleanup_operations()
|
||||||
|
{
|
||||||
|
while (cleanup_operations_)
|
||||||
|
{
|
||||||
|
op_base* next_op = cleanup_operations_->next_;
|
||||||
|
cleanup_operations_->next_ = 0;
|
||||||
|
cleanup_operations_->destroy();
|
||||||
|
cleanup_operations_ = next_op;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,15 +249,24 @@ private:
|
|||||||
// Perform the operation.
|
// Perform the operation.
|
||||||
bool invoke(int result)
|
bool invoke(int result)
|
||||||
{
|
{
|
||||||
return func_(this, result);
|
return invoke_func_(this, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Destroy the operation.
|
||||||
|
void destroy()
|
||||||
|
{
|
||||||
|
return destroy_func_(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
typedef bool (*func_type)(op_base*, int);
|
typedef bool (*invoke_func_type)(op_base*, int);
|
||||||
|
typedef void (*destroy_func_type)(op_base*);
|
||||||
|
|
||||||
// Construct an operation for the given descriptor.
|
// Construct an operation for the given descriptor.
|
||||||
op_base(func_type func, Descriptor descriptor)
|
op_base(invoke_func_type invoke_func,
|
||||||
: func_(func),
|
destroy_func_type destroy_func, Descriptor descriptor)
|
||||||
|
: invoke_func_(invoke_func),
|
||||||
|
destroy_func_(destroy_func),
|
||||||
descriptor_(descriptor),
|
descriptor_(descriptor),
|
||||||
next_(0)
|
next_(0)
|
||||||
{
|
{
|
||||||
@ -247,7 +281,10 @@ private:
|
|||||||
friend class reactor_op_queue<Descriptor>;
|
friend class reactor_op_queue<Descriptor>;
|
||||||
|
|
||||||
// The function to be called to dispatch the handler.
|
// The function to be called to dispatch the handler.
|
||||||
func_type func_;
|
invoke_func_type invoke_func_;
|
||||||
|
|
||||||
|
// The function to be called to delete the handler.
|
||||||
|
destroy_func_type destroy_func_;
|
||||||
|
|
||||||
// The descriptor associated with the operation.
|
// The descriptor associated with the operation.
|
||||||
Descriptor descriptor_;
|
Descriptor descriptor_;
|
||||||
@ -264,7 +301,8 @@ private:
|
|||||||
public:
|
public:
|
||||||
// Constructor.
|
// Constructor.
|
||||||
op(Descriptor descriptor, Handler handler)
|
op(Descriptor descriptor, Handler handler)
|
||||||
: op_base(&op<Handler>::invoke_handler, descriptor),
|
: op_base(&op<Handler>::invoke_handler,
|
||||||
|
&op<Handler>::destroy_handler, descriptor),
|
||||||
handler_(handler)
|
handler_(handler)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -272,11 +310,13 @@ private:
|
|||||||
// Invoke the handler.
|
// Invoke the handler.
|
||||||
static bool invoke_handler(op_base* base, int result)
|
static bool invoke_handler(op_base* base, int result)
|
||||||
{
|
{
|
||||||
std::auto_ptr<op<Handler> > o(static_cast<op<Handler>*>(base));
|
return static_cast<op<Handler>*>(base)->handler_(result);
|
||||||
bool done = o->handler_(result);
|
}
|
||||||
if (!done)
|
|
||||||
o.release();
|
// Delete the handler.
|
||||||
return done;
|
static void destroy_handler(op_base* base)
|
||||||
|
{
|
||||||
|
delete static_cast<op<Handler>*>(base);
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -291,6 +331,9 @@ private:
|
|||||||
|
|
||||||
// The list of operations that have been cancelled.
|
// The list of operations that have been cancelled.
|
||||||
op_base* cancelled_operations_;
|
op_base* cancelled_operations_;
|
||||||
|
|
||||||
|
// The list of operations to be destroyed.
|
||||||
|
op_base* cleanup_operations_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
|
@ -196,13 +196,29 @@ private:
|
|||||||
|
|
||||||
// Check if the thread is supposed to stop.
|
// Check if the thread is supposed to stop.
|
||||||
if (stop_thread_)
|
if (stop_thread_)
|
||||||
|
{
|
||||||
|
// Clean up operations. We must not hold the lock since the operations may
|
||||||
|
// make calls back into this reactor.
|
||||||
|
lock.unlock();
|
||||||
|
read_op_queue_.cleanup_operations();
|
||||||
|
write_op_queue_.cleanup_operations();
|
||||||
|
except_op_queue_.cleanup_operations();
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// We can return immediately if there's no work to do and the reactor is
|
// We can return immediately if there's no work to do and the reactor is
|
||||||
// not supposed to block.
|
// not supposed to block.
|
||||||
if (!block && read_op_queue_.empty() && write_op_queue_.empty()
|
if (!block && read_op_queue_.empty() && write_op_queue_.empty()
|
||||||
&& except_op_queue_.empty() && timer_queue_.empty())
|
&& except_op_queue_.empty() && timer_queue_.empty())
|
||||||
|
{
|
||||||
|
// Clean up operations. We must not hold the lock since the operations may
|
||||||
|
// make calls back into this reactor.
|
||||||
|
lock.unlock();
|
||||||
|
read_op_queue_.cleanup_operations();
|
||||||
|
write_op_queue_.cleanup_operations();
|
||||||
|
except_op_queue_.cleanup_operations();
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Set up the descriptor sets.
|
// Set up the descriptor sets.
|
||||||
fd_set_adapter read_fds;
|
fd_set_adapter read_fds;
|
||||||
@ -255,6 +271,13 @@ private:
|
|||||||
for (size_t i = 0; i < pending_cancellations_.size(); ++i)
|
for (size_t i = 0; i < pending_cancellations_.size(); ++i)
|
||||||
cancel_ops_unlocked(pending_cancellations_[i]);
|
cancel_ops_unlocked(pending_cancellations_[i]);
|
||||||
pending_cancellations_.clear();
|
pending_cancellations_.clear();
|
||||||
|
|
||||||
|
// Clean up operations. We must not hold the lock since the operations may
|
||||||
|
// make calls back into this reactor.
|
||||||
|
lock.unlock();
|
||||||
|
read_op_queue_.cleanup_operations();
|
||||||
|
write_op_queue_.cleanup_operations();
|
||||||
|
except_op_queue_.cleanup_operations();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the select loop in the thread.
|
// Run the select loop in the thread.
|
||||||
|
Loading…
Reference in New Issue
Block a user