Keep track of the number of OVERLAPPED-derived operations to ensure that

they all get cleaned up when the io_service is destroyed.
This commit is contained in:
chris_kohlhoff 2008-01-14 13:02:57 +00:00
parent 6a982e260a
commit a314529da0
3 changed files with 82 additions and 115 deletions

View File

@ -34,7 +34,6 @@
#include "asio/detail/service_base.hpp" #include "asio/detail/service_base.hpp"
#include "asio/detail/socket_types.hpp" #include "asio/detail/socket_types.hpp"
#include "asio/detail/timer_queue.hpp" #include "asio/detail/timer_queue.hpp"
#include "asio/detail/win_iocp_operation.hpp"
#include "asio/detail/mutex.hpp" #include "asio/detail/mutex.hpp"
namespace asio { namespace asio {
@ -44,14 +43,64 @@ class win_iocp_io_service
: public asio::detail::service_base<win_iocp_io_service> : public asio::detail::service_base<win_iocp_io_service>
{ {
public: public:
// Base class for all operations. // Base class for all operations. A function pointer is used instead of
typedef win_iocp_operation operation; // virtual functions to avoid the associated overhead.
//
// This class inherits from OVERLAPPED so that we can downcast to get back to
// the operation pointer from the LPOVERLAPPED out parameter of
// GetQueuedCompletionStatus.
class operation
: public OVERLAPPED
{
public:
typedef void (*invoke_func_type)(operation*, DWORD, size_t);
typedef void (*destroy_func_type)(operation*);
operation(win_iocp_io_service& iocp_service,
invoke_func_type invoke_func, destroy_func_type destroy_func)
: outstanding_operations_(&iocp_service.outstanding_operations_),
invoke_func_(invoke_func),
destroy_func_(destroy_func)
{
Internal = 0;
InternalHigh = 0;
Offset = 0;
OffsetHigh = 0;
hEvent = 0;
::InterlockedIncrement(outstanding_operations_);
}
void do_completion(DWORD last_error, size_t bytes_transferred)
{
invoke_func_(this, last_error, bytes_transferred);
}
void destroy()
{
destroy_func_(this);
}
protected:
// Prevent deletion through this type.
~operation()
{
::InterlockedDecrement(outstanding_operations_);
}
private:
long* outstanding_operations_;
invoke_func_type invoke_func_;
destroy_func_type destroy_func_;
};
// Constructor. // Constructor.
win_iocp_io_service(asio::io_service& io_service) win_iocp_io_service(asio::io_service& io_service)
: asio::detail::service_base<win_iocp_io_service>(io_service), : asio::detail::service_base<win_iocp_io_service>(io_service),
iocp_(), iocp_(),
outstanding_work_(0), outstanding_work_(0),
outstanding_operations_(0),
stopped_(0), stopped_(0),
shutdown_(0), shutdown_(0),
timer_thread_(0), timer_thread_(0),
@ -79,7 +128,7 @@ public:
{ {
::InterlockedExchange(&shutdown_, 1); ::InterlockedExchange(&shutdown_, 1);
for (;;) while (::InterlockedExchangeAdd(&outstanding_operations_, 0) > 0)
{ {
DWORD bytes_transferred = 0; DWORD bytes_transferred = 0;
#if (WINVER < 0x0500) #if (WINVER < 0x0500)
@ -88,12 +137,8 @@ public:
DWORD_PTR completion_key = 0; DWORD_PTR completion_key = 0;
#endif #endif
LPOVERLAPPED overlapped = 0; LPOVERLAPPED overlapped = 0;
::SetLastError(0); ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &completion_key, &overlapped, INFINITE);
&bytes_transferred, &completion_key, &overlapped, 0);
DWORD last_error = ::GetLastError();
if (!ok && overlapped == 0 && last_error == WAIT_TIMEOUT)
break;
if (overlapped) if (overlapped)
static_cast<operation*>(overlapped)->destroy(); static_cast<operation*>(overlapped)->destroy();
} }
@ -249,7 +294,7 @@ public:
} }
// Request invocation of the given OVERLAPPED-derived operation. // Request invocation of the given OVERLAPPED-derived operation.
void post_completion(win_iocp_operation* op, DWORD op_last_error, void post_completion(operation* op, DWORD op_last_error,
DWORD bytes_transferred) DWORD bytes_transferred)
{ {
// Enqueue the operation on the I/O completion port. // Enqueue the operation on the I/O completion port.
@ -547,7 +592,7 @@ private:
{ {
handler_operation(win_iocp_io_service& io_service, handler_operation(win_iocp_io_service& io_service,
Handler handler) Handler handler)
: operation(&handler_operation<Handler>::do_completion_impl, : operation(io_service, &handler_operation<Handler>::do_completion_impl,
&handler_operation<Handler>::destroy_impl), &handler_operation<Handler>::destroy_impl),
io_service_(io_service), io_service_(io_service),
handler_(handler) handler_(handler)
@ -608,6 +653,10 @@ private:
// The count of unfinished work. // The count of unfinished work.
long outstanding_work_; long outstanding_work_;
// The count of unfinished operations.
long outstanding_operations_;
friend class operation;
// Flag to indicate whether the event loop has been stopped. // Flag to indicate whether the event loop has been stopped.
long stopped_; long stopped_;

View File

@ -1,81 +0,0 @@
//
// win_iocp_operation.hpp
// ~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2007 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef ASIO_DETAIL_WIN_IOCP_OPERATION_HPP
#define ASIO_DETAIL_WIN_IOCP_OPERATION_HPP
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include "asio/detail/push_options.hpp"
#include "asio/detail/win_iocp_io_service_fwd.hpp"
#if defined(ASIO_HAS_IOCP)
#include "asio/detail/socket_types.hpp"
namespace asio {
namespace detail {
// Base class for all IOCP operations. A function pointer is used instead of
// virtual functions to avoid the associated overhead.
//
// This class inherits from OVERLAPPED so that we can downcast to get back to
// the win_iocp_operation pointer from the LPOVERLAPPED out parameter of
// GetQueuedCompletionStatus.
struct win_iocp_operation
: public OVERLAPPED
{
typedef void (*invoke_func_type)(win_iocp_operation*, DWORD, size_t);
typedef void (*destroy_func_type)(win_iocp_operation*);
win_iocp_operation(invoke_func_type invoke_func,
destroy_func_type destroy_func)
: invoke_func_(invoke_func),
destroy_func_(destroy_func)
{
Internal = 0;
InternalHigh = 0;
Offset = 0;
OffsetHigh = 0;
hEvent = 0;
}
void do_completion(DWORD last_error, size_t bytes_transferred)
{
invoke_func_(this, last_error, bytes_transferred);
}
void destroy()
{
destroy_func_(this);
}
protected:
// Prevent deletion through this type.
~win_iocp_operation()
{
}
private:
invoke_func_type invoke_func_;
destroy_func_type destroy_func_;
};
} // namespace detail
} // namespace asio
#endif // defined(ASIO_HAS_IOCP)
#include "asio/detail/pop_options.hpp"
#endif // ASIO_DETAIL_WIN_IOCP_OPERATION_HPP

View File

@ -56,7 +56,7 @@ public:
typedef typename Protocol::endpoint endpoint_type; typedef typename Protocol::endpoint endpoint_type;
// Base class for all operations. // Base class for all operations.
typedef win_iocp_operation operation; typedef win_iocp_io_service::operation operation;
struct noop_deleter { void operator()(void*) {} }; struct noop_deleter { void operator()(void*) {} };
typedef boost::shared_ptr<void> shared_cancel_token_type; typedef boost::shared_ptr<void> shared_cancel_token_type;
@ -680,13 +680,13 @@ public:
: public operation : public operation
{ {
public: public:
send_operation(asio::io_service& io_service, send_operation(win_iocp_io_service& io_service,
weak_cancel_token_type cancel_token, weak_cancel_token_type cancel_token,
const ConstBufferSequence& buffers, Handler handler) const ConstBufferSequence& buffers, Handler handler)
: operation( : operation(io_service,
&send_operation<ConstBufferSequence, Handler>::do_completion_impl, &send_operation<ConstBufferSequence, Handler>::do_completion_impl,
&send_operation<ConstBufferSequence, Handler>::destroy_impl), &send_operation<ConstBufferSequence, Handler>::destroy_impl),
work_(io_service), work_(io_service.get_io_service()),
cancel_token_(cancel_token), cancel_token_(cancel_token),
buffers_(buffers), buffers_(buffers),
handler_(handler) handler_(handler)
@ -782,8 +782,8 @@ public:
typedef send_operation<ConstBufferSequence, Handler> value_type; typedef send_operation<ConstBufferSequence, Handler> value_type;
typedef handler_alloc_traits<Handler, value_type> alloc_traits; typedef handler_alloc_traits<Handler, value_type> alloc_traits;
raw_handler_ptr<alloc_traits> raw_ptr(handler); raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
this->get_io_service(), impl.cancel_token_, buffers, handler); impl.cancel_token_, buffers, handler);
// Copy buffers into WSABUF array. // Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers]; ::WSABUF bufs[max_buffers];
@ -880,12 +880,12 @@ public:
: public operation : public operation
{ {
public: public:
send_to_operation(asio::io_service& io_service, send_to_operation(win_iocp_io_service& io_service,
const ConstBufferSequence& buffers, Handler handler) const ConstBufferSequence& buffers, Handler handler)
: operation( : operation(io_service,
&send_to_operation<ConstBufferSequence, Handler>::do_completion_impl, &send_to_operation<ConstBufferSequence, Handler>::do_completion_impl,
&send_to_operation<ConstBufferSequence, Handler>::destroy_impl), &send_to_operation<ConstBufferSequence, Handler>::destroy_impl),
work_(io_service), work_(io_service.get_io_service()),
buffers_(buffers), buffers_(buffers),
handler_(handler) handler_(handler)
{ {
@ -973,8 +973,7 @@ public:
typedef send_to_operation<ConstBufferSequence, Handler> value_type; typedef send_to_operation<ConstBufferSequence, Handler> value_type;
typedef handler_alloc_traits<Handler, value_type> alloc_traits; typedef handler_alloc_traits<Handler, value_type> alloc_traits;
raw_handler_ptr<alloc_traits> raw_ptr(handler); raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
this->get_io_service(), buffers, handler);
// Copy buffers into WSABUF array. // Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers]; ::WSABUF bufs[max_buffers];
@ -1074,15 +1073,15 @@ public:
: public operation : public operation
{ {
public: public:
receive_operation(asio::io_service& io_service, receive_operation(win_iocp_io_service& io_service,
weak_cancel_token_type cancel_token, weak_cancel_token_type cancel_token,
const MutableBufferSequence& buffers, Handler handler) const MutableBufferSequence& buffers, Handler handler)
: operation( : operation(io_service,
&receive_operation< &receive_operation<
MutableBufferSequence, Handler>::do_completion_impl, MutableBufferSequence, Handler>::do_completion_impl,
&receive_operation< &receive_operation<
MutableBufferSequence, Handler>::destroy_impl), MutableBufferSequence, Handler>::destroy_impl),
work_(io_service), work_(io_service.get_io_service()),
cancel_token_(cancel_token), cancel_token_(cancel_token),
buffers_(buffers), buffers_(buffers),
handler_(handler) handler_(handler)
@ -1185,8 +1184,8 @@ public:
typedef receive_operation<MutableBufferSequence, Handler> value_type; typedef receive_operation<MutableBufferSequence, Handler> value_type;
typedef handler_alloc_traits<Handler, value_type> alloc_traits; typedef handler_alloc_traits<Handler, value_type> alloc_traits;
raw_handler_ptr<alloc_traits> raw_ptr(handler); raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
this->get_io_service(), impl.cancel_token_, buffers, handler); impl.cancel_token_, buffers, handler);
// Copy buffers into WSABUF array. // Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers]; ::WSABUF bufs[max_buffers];
@ -1290,17 +1289,17 @@ public:
: public operation : public operation
{ {
public: public:
receive_from_operation(asio::io_service& io_service, receive_from_operation(win_iocp_io_service& io_service,
endpoint_type& endpoint, const MutableBufferSequence& buffers, endpoint_type& endpoint, const MutableBufferSequence& buffers,
Handler handler) Handler handler)
: operation( : operation(io_service,
&receive_from_operation< &receive_from_operation<
MutableBufferSequence, Handler>::do_completion_impl, MutableBufferSequence, Handler>::do_completion_impl,
&receive_from_operation< &receive_from_operation<
MutableBufferSequence, Handler>::destroy_impl), MutableBufferSequence, Handler>::destroy_impl),
endpoint_(endpoint), endpoint_(endpoint),
endpoint_size_(static_cast<int>(endpoint.capacity())), endpoint_size_(static_cast<int>(endpoint.capacity())),
work_(io_service), work_(io_service.get_io_service()),
buffers_(buffers), buffers_(buffers),
handler_(handler) handler_(handler)
{ {
@ -1405,8 +1404,8 @@ public:
typedef receive_from_operation<MutableBufferSequence, Handler> value_type; typedef receive_from_operation<MutableBufferSequence, Handler> value_type;
typedef handler_alloc_traits<Handler, value_type> alloc_traits; typedef handler_alloc_traits<Handler, value_type> alloc_traits;
raw_handler_ptr<alloc_traits> raw_ptr(handler); raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
this->get_io_service(), sender_endp, buffers, handler); sender_endp, buffers, handler);
// Copy buffers into WSABUF array. // Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers]; ::WSABUF bufs[max_buffers];
@ -1508,7 +1507,7 @@ public:
socket_type socket, socket_type new_socket, Socket& peer, socket_type socket, socket_type new_socket, Socket& peer,
const protocol_type& protocol, endpoint_type* peer_endpoint, const protocol_type& protocol, endpoint_type* peer_endpoint,
bool enable_connection_aborted, Handler handler) bool enable_connection_aborted, Handler handler)
: operation( : operation(io_service,
&accept_operation<Socket, Handler>::do_completion_impl, &accept_operation<Socket, Handler>::do_completion_impl,
&accept_operation<Socket, Handler>::destroy_impl), &accept_operation<Socket, Handler>::destroy_impl),
io_service_(io_service), io_service_(io_service),