diff --git a/asio/include/asio/detail/epoll_reactor.hpp b/asio/include/asio/detail/epoll_reactor.hpp index 9870d628..89583007 100644 --- a/asio/include/asio/detail/epoll_reactor.hpp +++ b/asio/include/asio/detail/epoll_reactor.hpp @@ -302,13 +302,29 @@ private: // Check if the thread is supposed to stop. if (stop_thread_) + { + // Clean up operations. We must not hold the lock since the operations may + // make calls back into this reactor. + lock.unlock(); + read_op_queue_.cleanup_operations(); + write_op_queue_.cleanup_operations(); + except_op_queue_.cleanup_operations(); return; + } // We can return immediately if there's no work to do and the reactor is // not supposed to block. if (!block && read_op_queue_.empty() && write_op_queue_.empty() && except_op_queue_.empty() && timer_queue_.empty()) + { + // Clean up operations. We must not hold the lock since the operations may + // make calls back into this reactor. + lock.unlock(); + read_op_queue_.cleanup_operations(); + write_op_queue_.cleanup_operations(); + except_op_queue_.cleanup_operations(); return; + } int timeout = block ? get_timeout() : 0; wait_in_progress_ = true; @@ -398,6 +414,13 @@ private: for (size_t i = 0; i < pending_cancellations_.size(); ++i) cancel_ops_unlocked(pending_cancellations_[i]); pending_cancellations_.clear(); + + // Clean up operations. We must not hold the lock since the operations may + // make calls back into this reactor. + lock.unlock(); + read_op_queue_.cleanup_operations(); + write_op_queue_.cleanup_operations(); + except_op_queue_.cleanup_operations(); } // Run the select loop in the thread. diff --git a/asio/include/asio/detail/kqueue_reactor.hpp b/asio/include/asio/detail/kqueue_reactor.hpp index 036206ec..ba3d018f 100644 --- a/asio/include/asio/detail/kqueue_reactor.hpp +++ b/asio/include/asio/detail/kqueue_reactor.hpp @@ -282,13 +282,29 @@ private: // Check if the thread is supposed to stop. if (stop_thread_) + { + // Clean up operations. We must not hold the lock since the operations may + // make calls back into this reactor. + lock.unlock(); + read_op_queue_.cleanup_operations(); + write_op_queue_.cleanup_operations(); + except_op_queue_.cleanup_operations(); return; + } // We can return immediately if there's no work to do and the reactor is // not supposed to block. if (!block && read_op_queue_.empty() && write_op_queue_.empty() && except_op_queue_.empty() && timer_queue_.empty()) + { + // Clean up operations. We must not hold the lock since the operations may + // make calls back into this reactor. + lock.unlock(); + read_op_queue_.cleanup_operations(); + write_op_queue_.cleanup_operations(); + except_op_queue_.cleanup_operations(); return; + } // Determine how long to block while waiting for events. timespec timeout_buf = { 0, 0 }; @@ -393,6 +409,13 @@ private: for (size_t i = 0; i < pending_cancellations_.size(); ++i) cancel_ops_unlocked(pending_cancellations_[i]); pending_cancellations_.clear(); + + // Clean up operations. We must not hold the lock since the operations may + // make calls back into this reactor. + lock.unlock(); + read_op_queue_.cleanup_operations(); + write_op_queue_.cleanup_operations(); + except_op_queue_.cleanup_operations(); } // Run the select loop in the thread. diff --git a/asio/include/asio/detail/reactor_op_queue.hpp b/asio/include/asio/detail/reactor_op_queue.hpp index fb44210a..1fa0dec0 100644 --- a/asio/include/asio/detail/reactor_op_queue.hpp +++ b/asio/include/asio/detail/reactor_op_queue.hpp @@ -36,7 +36,8 @@ public: // Constructor. reactor_op_queue() : operations_(), - cancelled_operations_(0) + cancelled_operations_(0), + cleanup_operations_(0) { } @@ -104,15 +105,16 @@ public: typename operation_map::iterator i = operations_.find(descriptor); if (i != operations_.end()) { - op_base* next_op = i->second->next_; - i->second->next_ = 0; - bool done = i->second->invoke(result); + op_base* op = i->second; + i->second = op->next_; + op->next_ = cleanup_operations_; + cleanup_operations_ = op; + bool done = op->invoke(result); if (done) { // Operation has finished. - if (next_op) + if (i->second) { - i->second = next_op; return true; } else @@ -124,8 +126,10 @@ public: else { // Operation wants to be called again. Leave it at the front of the - // queue for this descriptor. - i->second->next_ = next_op; + // queue for this descriptor, and remove from the cleanup list. + cleanup_operations_ = op->next_; + op->next_ = i->second; + i->second = op; return true; } } @@ -138,19 +142,23 @@ public: typename operation_map::iterator i = operations_.find(descriptor); if (i != operations_.end()) { - operations_.erase(i); while (i->second) { - op_base* next_op = i->second->next_; - i->second->next_ = 0; + op_base* op = i->second; + i->second = op->next_; + op->next_ = cleanup_operations_; + cleanup_operations_ = op; bool done = i->second->invoke(result); if (!done) { - // Operation has not finished yet, so leave at front of queue. - i->second->next_ = next_op; + // Operation has not finished yet, so leave at front of queue, and + // remove from the cleanup list. + cleanup_operations_ = op->next_; + op->next_ = i->second; + i->second = op; return; } - i->second = next_op; + operations_.erase(i); } } } @@ -176,22 +184,26 @@ public: typename operation_map::iterator i = operations_.begin(); while (i != operations_.end()) { - typename operation_map::iterator op = i++; - if (descriptors.is_set(op->first)) + typename operation_map::iterator op_iter = i++; + if (descriptors.is_set(op_iter->first)) { - op_base* next_op = op->second->next_; - op->second->next_ = 0; + op_base* op = op_iter->second; + op_iter->second = op->next_; + op->next_ = cleanup_operations_; + cleanup_operations_ = op; bool done = op->second->invoke(result); if (done) { - if (next_op) - op->second = next_op; - else + if (!op_iter->second) operations_.erase(op); } else { - op->second->next_ = next_op; + // Operation has not finished yet, so leave at front of queue, and + // remove from the cleanup list. + cleanup_operations_ = op->next_; + op->next_ = op_iter->second; + op_iter->second = op; } } } @@ -202,10 +214,23 @@ public: { while (cancelled_operations_) { - op_base* next_op = cancelled_operations_->next_; - cancelled_operations_->next_ = 0; - cancelled_operations_->invoke(asio::error::operation_aborted); - cancelled_operations_ = next_op; + op_base* op = cancelled_operations_; + cancelled_operations_ = op->next_; + op->next_ = cleanup_operations_; + cleanup_operations_ = op; + op->invoke(asio::error::operation_aborted); + } + } + + // Destroy operations that are waiting to be cleaned up. + void cleanup_operations() + { + while (cleanup_operations_) + { + op_base* next_op = cleanup_operations_->next_; + cleanup_operations_->next_ = 0; + cleanup_operations_->destroy(); + cleanup_operations_ = next_op; } } @@ -224,15 +249,24 @@ private: // Perform the operation. bool invoke(int result) { - return func_(this, result); + return invoke_func_(this, result); + } + + // Destroy the operation. + void destroy() + { + return destroy_func_(this); } protected: - typedef bool (*func_type)(op_base*, int); + typedef bool (*invoke_func_type)(op_base*, int); + typedef void (*destroy_func_type)(op_base*); // Construct an operation for the given descriptor. - op_base(func_type func, Descriptor descriptor) - : func_(func), + op_base(invoke_func_type invoke_func, + destroy_func_type destroy_func, Descriptor descriptor) + : invoke_func_(invoke_func), + destroy_func_(destroy_func), descriptor_(descriptor), next_(0) { @@ -247,7 +281,10 @@ private: friend class reactor_op_queue; // The function to be called to dispatch the handler. - func_type func_; + invoke_func_type invoke_func_; + + // The function to be called to delete the handler. + destroy_func_type destroy_func_; // The descriptor associated with the operation. Descriptor descriptor_; @@ -264,7 +301,8 @@ private: public: // Constructor. op(Descriptor descriptor, Handler handler) - : op_base(&op::invoke_handler, descriptor), + : op_base(&op::invoke_handler, + &op::destroy_handler, descriptor), handler_(handler) { } @@ -272,11 +310,13 @@ private: // Invoke the handler. static bool invoke_handler(op_base* base, int result) { - std::auto_ptr > o(static_cast*>(base)); - bool done = o->handler_(result); - if (!done) - o.release(); - return done; + return static_cast*>(base)->handler_(result); + } + + // Delete the handler. + static void destroy_handler(op_base* base) + { + delete static_cast*>(base); } private: @@ -291,6 +331,9 @@ private: // The list of operations that have been cancelled. op_base* cancelled_operations_; + + // The list of operations to be destroyed. + op_base* cleanup_operations_; }; } // namespace detail diff --git a/asio/include/asio/detail/select_reactor.hpp b/asio/include/asio/detail/select_reactor.hpp index 142256fc..abd2cb34 100644 --- a/asio/include/asio/detail/select_reactor.hpp +++ b/asio/include/asio/detail/select_reactor.hpp @@ -196,13 +196,29 @@ private: // Check if the thread is supposed to stop. if (stop_thread_) + { + // Clean up operations. We must not hold the lock since the operations may + // make calls back into this reactor. + lock.unlock(); + read_op_queue_.cleanup_operations(); + write_op_queue_.cleanup_operations(); + except_op_queue_.cleanup_operations(); return; + } // We can return immediately if there's no work to do and the reactor is // not supposed to block. if (!block && read_op_queue_.empty() && write_op_queue_.empty() && except_op_queue_.empty() && timer_queue_.empty()) + { + // Clean up operations. We must not hold the lock since the operations may + // make calls back into this reactor. + lock.unlock(); + read_op_queue_.cleanup_operations(); + write_op_queue_.cleanup_operations(); + except_op_queue_.cleanup_operations(); return; + } // Set up the descriptor sets. fd_set_adapter read_fds; @@ -255,6 +271,13 @@ private: for (size_t i = 0; i < pending_cancellations_.size(); ++i) cancel_ops_unlocked(pending_cancellations_[i]); pending_cancellations_.clear(); + + // Clean up operations. We must not hold the lock since the operations may + // make calls back into this reactor. + lock.unlock(); + read_op_queue_.cleanup_operations(); + write_op_queue_.cleanup_operations(); + except_op_queue_.cleanup_operations(); } // Run the select loop in the thread.