Removed completion context support.

This commit is contained in:
chris 2004-03-20 01:01:39 +00:00
parent 8a3d80d22c
commit 5c1d5b2e80
26 changed files with 397 additions and 1573 deletions

View File

@ -24,7 +24,6 @@
#include "asio/buffered_recv_stream.hpp"
#include "asio/buffered_send_stream.hpp"
#include "asio/buffered_stream.hpp"
#include "asio/counting_completion_context.hpp"
#include "asio/demuxer.hpp"
#include "asio/dgram_socket.hpp"
#include "asio/error_handler.hpp"
@ -32,7 +31,6 @@
#include "asio/ipv4/address.hpp"
#include "asio/ipv4/tcp.hpp"
#include "asio/ipv4/udp.hpp"
#include "asio/null_completion_context.hpp"
#include "asio/recv.hpp"
#include "asio/send.hpp"
#include "asio/service_factory.hpp"

View File

@ -21,7 +21,6 @@
#include <boost/noncopyable.hpp>
#include "asio/detail/pop_options.hpp"
#include "asio/null_completion_context.hpp"
#include "asio/service_factory.hpp"
#include "asio/detail/service_registry.hpp"
#include "asio/detail/signal_init.hpp"
@ -55,13 +54,14 @@ public:
/// Run the demuxer's event processing loop.
/**
* The run function blocks until all operations have completed and there are
* no more completions to be delivered, or until the demuxer has been
* interrupted. The run function may be safely called again once it has
* completed to execute any new operations or deliver new completions.
* The run function blocks until all work has finished and there are no more
* handlers to be dispatched, or until the demuxer has been interrupted.
*
* Multiple threads may call the run function to set up a pool of threads
* from which the demuxer may dispatch the completion notifications.
* from which the demuxer may execute handlers.
*
* The run function may be safely called again once it has completed only
* after a call to reset.
*/
void run()
{
@ -75,10 +75,9 @@ public:
* possible.
*
* Note that if the run function is interrupted and is not called again later
* then its operations may not have finished and completions not delivered.
* In this case a demuxer implementation is not required to make any
* guarantee that any resources associated with those operations would be
* cleaned up.
* then its work may not have finished and handlers may not be delivered. In
* this case a demuxer implementation is not required to make any guarantee
* that any resources associated with unfinished work will be cleaned up.
*/
void interrupt()
{
@ -99,167 +98,68 @@ public:
service_.reset();
}
/// Notify the demuxer that an operation has started.
/// Notify the demuxer that some work has started.
/**
* This function is used to inform the demuxer that a new operation has
* begun. A call to this function must be matched with a later corresponding
* call to operation_completed.
* This function is used to inform the demuxer that some work has begun. This
* ensures that the run function will not exit while the work is under way.
*
* A call to this function must be matched with a later corresponding
* call to work_finished.
*/
void operation_started()
void work_started()
{
service_.operation_started();
service_.work_started();
}
/// Notify the demuxer that an operation has completed.
/// Notify the demuxer that some work has finished.
/**
* This function is used to inform the demuxer that an operation has
* completed and that it should invoke the given completion handler. A call
* to this function must be matched with an earlier corresponding call to
* operation_started.
* This function is used to inform the demuxer that some work has finished.
* Once the count of unfinished work reaches zero, the demuxer's run function
* is permitted to exit.
*
* The completion handler is guaranteed to be called only from a thread in
* which the run member function is being invoked.
* A call to this function must be matched with a later corresponding call to
* work_started.
*/
void work_finished()
{
service_.work_finished();
}
/// Request the demuxer to invoke the given handler.
/**
* This function is used to ask the demuxer to execute the given handler.
*
* @param handler The completion handler to be called. The demuxer will make
* The demuxer guarantees that the handler will only be called in a thread in
* which the run member function is currently being invoked. The handler may
* be executed inside this function if the guarantee can be met.
*
* @param handler The handler to be called. The demuxer will make
* a copy of the handler object as required. The equivalent function
* signature of the handler must be: @code void handler(); @endcode
*/
template <typename Handler>
void operation_completed(Handler handler)
void dispatch(Handler handler)
{
service_.operation_completed(handler, null_completion_context(), false);
service_.dispatch(handler);
}
/// Notify the demuxer that an operation has completed.
/// Request the demuxer to invoke the given handler and return immediately.
/**
* This function is used to inform the demuxer that an operation has
* completed and that it should invoke the given completion handler. A call
* to this function must be matched with an earlier corresponding call to
* operation_started.
* This function is used to ask the demuxer to execute the given handler, but
* without allowing the demuxer to call the handler from inside this
* function.
*
* The completion handler is guaranteed to be called only from a thread in
* which the run member function is being invoked.
* The demuxer guarantees that the handler will only be called in a thread in
* which the run member function is currently being invoked.
*
* @param handler The completion handler to be called. The demuxer will make
* a copy of the handler object as required. The equivalent function
* signature of the handler must be: @code void handler(); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*/
template <typename Handler, typename Completion_Context>
void operation_completed(Handler handler, Completion_Context context)
{
service_.operation_completed(handler, context, false);
}
/// Notify the demuxer that an operation has completed.
/**
* This function is used to inform the demuxer that an operation has
* completed and that it should invoke the given completion handler. A call
* to this function must be matched with an earlier corresponding call to
* operation_started.
*
* The completion handler is guaranteed to be called only from a thread in
* which the run member function is being invoked.
*
* @param handler The completion handler to be called. The demuxer will make
* a copy of the handler object as required. The equivalent function
* signature of the handler must be: @code void handler(); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*
* @param allow_nested_delivery If true, this allows the demuxer to run the
* completion handler before operation_completed returns, as an optimisation.
* This is at the discretion of the demuxer implementation, but may only be
* done if it can meet the guarantee that the handler is invoked from a
* thread executing the run function. If false, the function returns
* immediately.
*/
template <typename Handler, typename Completion_Context>
void operation_completed(Handler handler, Completion_Context context,
bool allow_nested_delivery)
{
service_.operation_completed(handler, context, allow_nested_delivery);
}
/// Notify the demuxer of an operation that started and finished immediately.
/**
* This function is used to inform the demuxer that an operation has both
* started and completed immediately, and that it should invoke the given
* completion handler. A call to this function must not have either of a
* corresponding operation_started or operation_completed.
*
* The completion handler is guaranteed to be called only from a thread in
* which the run member function is being invoked.
*
* @param handler The completion handler to be called. The demuxer will make
* @param handler The handler to be called. The demuxer will make
* a copy of the handler object as required. The equivalent function
* signature of the handler must be: @code void handler(); @endcode
*/
template <typename Handler>
void operation_immediate(Handler handler)
void post(Handler handler)
{
service_.operation_immediate(handler, null_completion_context(), false);
}
/// Notify the demuxer of an operation that started and finished immediately.
/**
* This function is used to inform the demuxer that an operation has both
* started and completed immediately, and that it should invoke the given
* completion handler. A call to this function must not have either of a
* corresponding operation_started or operation_completed.
*
* The completion handler is guaranteed to be called only from a thread in
* which the run member function is being invoked.
*
* @param handler The completion handler to be called. The demuxer will make
* a copy of the handler object as required. The equivalent function
* signature of the handler must be: @code void handler(); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*/
template <typename Handler, typename Completion_Context>
void operation_immediate(Handler handler, Completion_Context context)
{
service_.operation_immediate(handler, context, false);
}
/// Notify the demuxer of an operation that started and finished immediately.
/**
* This function is used to inform the demuxer that an operation has both
* started and completed immediately, and that it should invoke the given
* completion handler. A call to this function must not have either of a
* corresponding operation_started or operation_completed.
*
* The completion handler is guaranteed to be called only from a thread in
* which the run member function is being invoked.
*
* @param handler The completion handler to be called. The demuxer will make
* a copy of the handler object as required. The equivalent function
* signature of the handler must be: @code void handler(); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*
* @param allow_nested_delivery If true, this allows the demuxer to run the
* completion handler before operation_immediate returns, as an optimisation.
* This is at the discretion of the demuxer implementation, but may only be
* done if it can meet the guarantee that the handler is invoked from a
* thread executing the run function. If false, the function returns
* immediately.
*/
template <typename Handler, typename Completion_Context>
void operation_immediate(Handler handler, Completion_Context context,
bool allow_nested_delivery)
{
service_.operation_immediate(handler, context, allow_nested_delivery);
service_.post(handler);
}
/// Obtain the service interface corresponding to the given type.

View File

@ -22,7 +22,6 @@
#include "asio/detail/pop_options.hpp"
#include "asio/error_handler.hpp"
#include "asio/null_completion_context.hpp"
#include "asio/service_factory.hpp"
namespace asio {
@ -49,8 +48,8 @@ public:
* This constructor creates a dgram socket without opening it. The open()
* function must be called before data can be sent or received on the socket.
*
* @param d The demuxer object that the dgram socket will use to deliver
* completions for any asynchronous operations performed on the socket.
* @param d The demuxer object that the dgram socket will use to dispatch
* handlers for any asynchronous operations performed on the socket.
*/
explicit basic_dgram_socket(demuxer_type& d)
: service_(d.get_service(service_factory<Service>())),
@ -66,8 +65,8 @@ public:
* automatically to be the default datagram protocol associated with the
* given address type.
*
* @param d The demuxer object that the dgram socket will use to deliver
* completions for any asynchronous operations performed on the socket.
* @param d The demuxer object that the dgram socket will use to dispatch
* handlers for any asynchronous operations performed on the socket.
*
* @param address An address on the local machine to which the dgram socket
* will be bound.
@ -93,11 +92,10 @@ public:
/// Get the demuxer associated with the asynchronous object.
/**
* This function may be used to obtain the demuxer object that the dgram
* socket uses to deliver completions for asynchronous operations.
* socket uses to dispatch handlers for asynchronous operations.
*
* @return A reference to the demuxer object that dgram socket will use to
* deliver completion notifications. Ownership is not transferred to the
* caller.
* dispatch handlers. Ownership is not transferred to the caller.
*/
demuxer_type& demuxer()
{
@ -366,9 +364,9 @@ public:
* @param destination The remote address to which the data will be sent.
* Copies will be made of the address as required.
*
* @param handler The completion handler to be called when the send operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @param handler The handler to be called when the send operation completes.
* Copies will be made of the handler as required. The equivalent function
* signature of the handler must be:
* @code void handler(
* const asio::socket_error& error, // Result of operation
* size_t bytes_sent // Number of bytes sent
@ -378,47 +376,7 @@ public:
void async_sendto(const void* data, size_t length,
const Address& destination, Handler handler)
{
service_.async_sendto(impl_, data, length, destination, handler,
null_completion_context());
}
/// Start an asynchronous send.
/**
* This function is used to asynchronously send a datagram to the specified
* remote address. The function call always returns immediately.
*
* @param data The data to be sent to the remote address. Ownership of the
* data is retained by the caller, which must guarantee that it is valid
* until the handler is called.
*
* @param length The size of the data to be sent, in bytes.
*
* @param destination The remote address to which the data will be sent.
* Copies will be made of the address as required.
*
* @param handler The completion handler to be called when the send operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be <tt>void handler(const
* socket_error& error, size_t bytes_sent)</tt>.
*
* @param handler The completion handler to be called when the send operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error, // Result of operation
* size_t bytes_sent // Number of bytes sent
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*/
template <typename Address, typename Handler, typename Completion_Context>
void async_sendto(const void* data, size_t length,
const Address& destination, Handler handler,
Completion_Context context)
{
service_.async_sendto(impl_, data, length, destination, handler, context);
service_.async_sendto(impl_, data, length, destination, handler);
}
/// Receive a datagram with the address of the sender.
@ -494,9 +452,9 @@ public:
* retained by the caller, which must guarantee that it is valid until the
* handler is called.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @param handler The handler to be called when the receive operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error, // Result of operation
* size_t bytes_received // Number of bytes received
@ -506,45 +464,7 @@ public:
void async_recvfrom(void* data, size_t max_length, Address& sender_address,
Handler handler)
{
service_.async_recvfrom(impl_, data, max_length, sender_address, handler,
null_completion_context());
}
/// Start an asynchronous receive.
/**
* This function is used to asynchronously receive a datagram. The function
* call always returns immediately.
*
* @param data The data buffer into which the received datagram will be
* written. Ownership of the data buffer is retained by the caller, which
* must guarantee that it is valid until the handler is called.
*
* @param max_length The maximum length, in bytes, of data that can be held
* in the supplied buffer.
*
* @param sender_address An address object that receives the address of the
* remote sender of the datagram. Ownership of the sender_address object is
* retained by the caller, which must guarantee that it is valid until the
* handler is called.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error, // Result of operation
* size_t bytes_received // Number of bytes received
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*/
template <typename Address, typename Handler, typename Completion_Context>
void async_recvfrom(void* data, size_t max_length, Address& sender_address,
Handler handler, Completion_Context context)
{
service_.async_recvfrom(impl_, data, max_length, sender_address, handler,
context);
service_.async_recvfrom(impl_, data, max_length, sender_address, handler);
}
private:

View File

@ -22,7 +22,6 @@
#include "asio/detail/pop_options.hpp"
#include "asio/error_handler.hpp"
#include "asio/null_completion_context.hpp"
#include "asio/service_factory.hpp"
namespace asio {
@ -49,8 +48,8 @@ public:
* connections. The open() function must be called before the acceptor can
* accept new socket connections.
*
* @param d The demuxer object that the acceptor will use to deliver
* completions for any asynchronous operations performed on the acceptor.
* @param d The demuxer object that the acceptor will use to dispatch
* handlers for any asynchronous operations performed on the acceptor.
*/
explicit basic_socket_acceptor(demuxer_type& d)
: service_(d.get_service(service_factory<Service>())),
@ -63,8 +62,8 @@ public:
* This constructor creates an acceptor and automatically opens it to listen
* for new connections on the specified address.
*
* @param d The demuxer object that the acceptor will use to deliver
* completions for any asynchronous operations performed on the acceptor.
* @param d The demuxer object that the acceptor will use to dispatch
* handlers for any asynchronous operations performed on the acceptor.
*
* @param address An address on the local machine on which the acceptor will
* listen for new connections.
@ -95,11 +94,10 @@ public:
/// Get the demuxer associated with the asynchronous object.
/**
* This function may be used to obtain the demuxer object that the acceptor
* uses to deliver completions for asynchronous operations.
* uses to dispatch handlers for asynchronous operations.
*
* @return A reference to the demuxer object that acceptor will use to
* deliver completion notifications. Ownership is not transferred to the
* caller.
* dispatch handlers. Ownership is not transferred to the caller.
*/
demuxer_type& demuxer()
{
@ -388,9 +386,9 @@ public:
* accepted. Ownership of the peer_socket object is retained by the caller,
* which must guarantee that it is valid until the handler is called.
*
* @param handler The completion handler to be called when the accept
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @param handler The handler to be called when the accept operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error // Result of operation
* ); @endcode
@ -398,35 +396,7 @@ public:
template <typename Stream, typename Handler>
void async_accept(Stream& peer_socket, Handler handler)
{
service_.async_accept(impl_, peer_socket.lowest_layer(), handler,
null_completion_context());
}
/// Start an asynchronous accept.
/**
* This function is used to asynchronously accept a new connection into a
* stream socket. The function call always returns immediately.
*
* @param peer_socket The stream socket into which the new connection will be
* accepted. Ownership of the peer_socket object is retained by the caller,
* which must guarantee that it is valid until the handler is called.
*
* @param handler The completion handler to be called when the accept
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error // Result of operation
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*/
template <typename Stream, typename Handler, typename Completion_Context>
void async_accept(Stream& peer_socket, Handler handler,
Completion_Context context)
{
service_.async_accept(impl_, peer_socket.lowest_layer(), handler, context);
service_.async_accept(impl_, peer_socket.lowest_layer(), handler);
}
/// Accept a new connection and obtain the address of the peer
@ -494,9 +464,9 @@ public:
* the caller, which must guarantee that it is valid until the handler is
* called.
*
* @param handler The completion handler to be called when the accept
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @param handler The handler to be called when the accept operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error // Result of operation
* ); @endcode
@ -506,42 +476,7 @@ public:
Handler handler)
{
service_.async_accept_address(impl_, peer_socket.lowest_layer(),
peer_address, handler, null_completion_context());
}
/// Start an asynchronous accept.
/**
* This function is used to asynchronously accept a new connection into a
* stream socket, and additionally obtain the address of the remote peer. The
* function call always returns immediately.
*
* @param peer_socket The stream socket into which the new connection will be
* accepted. Ownership of the peer_socket object is retained by the caller,
* which must guarantee that it is valid until the handler is called.
*
* @param peer_address An address object into which the address of the remote
* peer will be written. Ownership of the peer_address object is retained by
* the caller, which must guarantee that it is valid until the handler is
* called.
*
* @param handler The completion handler to be called when the accept
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error // Result of operation
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*/
template <typename Stream, typename Address, typename Handler,
typename Completion_Context>
void async_accept_address(Stream& peer_socket, Address& peer_address,
Handler handler, Completion_Context context)
{
service_.async_accept_address(impl_, peer_socket.lowest_layer(),
peer_address, handler, context);
peer_address, handler);
}
private:

View File

@ -22,7 +22,6 @@
#include "asio/detail/pop_options.hpp"
#include "asio/error_handler.hpp"
#include "asio/null_completion_context.hpp"
#include "asio/service_factory.hpp"
namespace asio {
@ -47,8 +46,8 @@ public:
/**
* Constructs the connector and opens it automatically.
*
* @param d The demuxer object that the connector will use to deliver
* completions for any asynchronous operations performed on the connector.
* @param d The demuxer object that the connector will use to dispatch
* handlers for any asynchronous operations performed on the connector.
*/
explicit basic_socket_connector(demuxer_type& d)
: service_(d.get_service(service_factory<Service>())),
@ -62,8 +61,8 @@ public:
* This constructor automatically opens the connector so that it will
* establish connections using the specified protocol.
*
* @param d The demuxer object that the connector will use to deliver
* completions for any asynchronous operations performed on the connector.
* @param d The demuxer object that the connector will use to dispatch
* handlers for any asynchronous operations performed on the connector.
*
* @param protocol The protocol to be used for all new connections
* established using the connector.
@ -85,11 +84,10 @@ public:
/// Get the demuxer associated with the asynchronous object.
/**
* This function may be used to obtain the demuxer object that the connector
* uses to deliver completions for asynchronous operations.
* uses to dispatch handlers for asynchronous operations.
*
* @return A reference to the demuxer object that the connector will use to
* deliver completion notifications. Ownership is not transferred to the
* caller.
* dispatch handlers. Ownership is not transferred to the caller.
*/
demuxer_type& demuxer()
{
@ -212,9 +210,9 @@ public:
* @param peer_address The remote address of the peer to which the socket
* will be connected. Copies will be made of the address as required.
*
* @param handler The completion handler to be called when the connection
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @param handler The handler to be called when the connection operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error // Result of operation
* ); @endcode
@ -224,39 +222,7 @@ public:
Handler handler)
{
service_.async_connect(impl_, peer_socket.lowest_layer(), peer_address,
handler, null_completion_context());
}
/// Start an asynchronous connect.
/**
* This function is used to asynchronously connect a stream socket to the
* specified remote address. The function call always returns immediately.
*
* @param peer_socket The stream socket to be connected. Ownership of the
* peer_socket object is retained by the caller, which must guarantee that it
* is valid until the handler is called.
*
* @param peer_address The remote address of the peer to which the socket
* will be connected. Copies will be made of the address as required.
*
* @param handler The completion handler to be called when the connection
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error // Result of operation
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*/
template <typename Stream, typename Address, typename Handler,
typename Completion_Context>
void async_connect(Stream& peer_socket, const Address& peer_address,
Handler handler, Completion_Context context)
{
service_.async_connect(impl_, peer_socket.lowest_layer(), peer_address,
handler, context);
handler);
}
private:

View File

@ -22,7 +22,6 @@
#include "asio/detail/pop_options.hpp"
#include "asio/error_handler.hpp"
#include "asio/null_completion_context.hpp"
#include "asio/service_factory.hpp"
namespace asio {
@ -53,8 +52,8 @@ public:
* peer. The socket needs to be connected or accepted before data can be sent
* or received on it.
*
* @param d The demuxer object that the stream socket will use to deliver
* completions for any asynchronous operations performed on the socket.
* @param d The demuxer object that the stream socket will use to dispatch
* handlers for any asynchronous operations performed on the socket.
*/
explicit basic_stream_socket(demuxer_type& d)
: service_(d.get_service(service_factory<Service>())),
@ -71,11 +70,10 @@ public:
/// Get the demuxer associated with the asynchronous object.
/**
* This function may be used to obtain the demuxer object that the stream
* socket uses to deliver completions for asynchronous operations.
* socket uses to dispatch handlers for asynchronous operations.
*
* @return A reference to the demuxer object that stream socket will use to
* deliver completion notifications. Ownership is not transferred to the
* caller.
* dispatch handlers. Ownership is not transferred to the caller.
*/
demuxer_type& demuxer()
{
@ -330,9 +328,9 @@ public:
*
* @param length The size of the data to be sent, in bytes.
*
* @param handler The completion handler to be called when the send operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @param handler The handler to be called when the send operation completes.
* Copies will be made of the handler as required. The equivalent function
* signature of the handler must be:
* @code void handler(
* const asio::socket_error& error, // Result of operation
* size_t bytes_sent // Number of bytes sent
@ -345,42 +343,7 @@ public:
template <typename Handler>
void async_send(const void* data, size_t length, Handler handler)
{
service_.async_send(impl_, data, length, handler,
null_completion_context());
}
/// Start an asynchronous send.
/**
* This function is used to asynchronously send data to the stream socket's
* peer. The function call always returns immediately.
*
* @param data The data to be sent to the remote peer. Ownership of the data
* is retained by the caller, which must guarantee that it is valid until the
* handler is called.
*
* @param length The size of the data to be sent, in bytes.
*
* @param handler The completion handler to be called when the send operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error, // Result of operation
* size_t bytes_sent // Number of bytes sent
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*
* @note The send operation may not transmit all of the data to the peer.
* Consider using the asio::async_send_n() function if you need to ensure
* that all data is sent before the asynchronous operation completes.
*/
template <typename Handler, typename Completion_Context>
void async_send(const void* data, size_t length, Handler handler,
Completion_Context context)
{
service_.async_send(impl_, data, length, handler, context);
service_.async_send(impl_, data, length, handler);
}
/// Receive some data from the peer.
@ -450,9 +413,9 @@ public:
*
* @param max_length The maximum size of the data to be received, in bytes.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @param handler The handler to be called when the receive operation
* completes. Copies will be made of the handler as required. The equivalent\
* function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error, // Result of operation
* size_t bytes_received // Number of bytes received
@ -466,43 +429,7 @@ public:
template <typename Handler>
void async_recv(void* data, size_t max_length, Handler handler)
{
service_.async_recv(impl_, data, max_length, handler,
null_completion_context());
}
/// Start an asynchronous receive.
/**
* This function is used to asynchronously receive data from the stream
* socket's peer. The function call always returns immediately.
*
* @param data The buffer into which the received data will be written.
* Ownership of the buffer is retained by the caller, which must guarantee
* that it is valid until the handler is called.
*
* @param max_length The maximum size of the data to be received, in bytes.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @code void handler(
* const asio::socket_error& error, // Result of operation
* size_t bytes_received // Number of bytes received
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*
* @note The recv operation may not receive all of the requested number of
* bytes. Consider using the asio::async_recv_n() function if you need to
* ensure that the requested amount of data is received before the
* asynchronous operation completes.
*/
template <typename Handler, typename Completion_Context>
void async_recv(void* data, size_t max_length, Handler handler,
Completion_Context context)
{
service_.async_recv(impl_, data, max_length, handler, context);
service_.async_recv(impl_, data, max_length, handler);
}
private:

View File

@ -21,7 +21,6 @@
#include <boost/noncopyable.hpp>
#include "asio/detail/pop_options.hpp"
#include "asio/null_completion_context.hpp"
#include "asio/service_factory.hpp"
#include "asio/timer_base.hpp"
@ -49,7 +48,7 @@ public:
* This constructor creates a timer without setting an expiry time. The set()
* function must be called before the timer can be waited on.
*
* @param d The demuxer object that the timer will use to deliver completions
* @param d The demuxer object that the timer will use to dispatch handlers
* for any asynchronous operations performed on the timer.
*/
explicit basic_timer(demuxer_type& d)
@ -63,7 +62,7 @@ public:
/**
* This constructor creates a timer and sets the expiry time.
*
* @param d The demuxer object that the timer will use to deliver completions
* @param d The demuxer object that the timer will use to dispatch handlers
* for any asynchronous operations performed on the timer.
*
* @param from_when The origin time against which the seconds and
@ -94,11 +93,10 @@ public:
/// Get the demuxer associated with the asynchronous object.
/**
* This function may be used to obtain the demuxer object that the timer uses
* to deliver completions for asynchronous operations.
* to dispatch handlers for asynchronous operations.
*
* @return A reference to the demuxer object that the timer will use to
* deliver completion notifications. Ownership is not transferred to the
* caller.
* dispatch handlers. Ownership is not transferred to the caller.
*/
demuxer_type& demuxer()
{
@ -162,34 +160,14 @@ public:
* timer. It always returns immediately, but the specified handler will be
* notified when the timer expires.
*
* @param handler The completion handler to be called when the timer expires.
* Copies will be made of the handler as required. The equivalent function
* signature of the handler must be: @code void handler(); @endcode
* @param handler The handler to be called when the timer expires. Copies
* will be made of the handler as required. The equivalent function signature
* of the handler must be: @code void handler(); @endcode
*/
template <typename Handler>
void async_wait(Handler handler)
{
service_.async_wait(impl_, handler, null_completion_context());
}
/// Start an asynchronous wait on the timer.
/**
* This function may be used to initiate an asynchronous wait against the
* timer. It always returns immediately, but the specified handler will be
* notified when the timer expires.
*
* @param handler The completion handler to be called when the timer expires.
* Copies will be made of the handler as required. The equivalent function
* signature of the handler must be: @code void handler(); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made
* of the context object as required, however all copies are equivalent.
*/
template <typename Handler, typename Completion_Context>
void async_wait(Handler handler, Completion_Context context)
{
service_.async_wait(impl_, handler, context);
service_.async_wait(impl_, handler);
}
private:

View File

@ -24,7 +24,6 @@
#include "asio/detail/pop_options.hpp"
#include "asio/fixed_buffer.hpp"
#include "asio/null_completion_context.hpp"
#include "asio/detail/bind_handler.hpp"
#include "asio/detail/buffer_resize_guard.hpp"
@ -110,15 +109,6 @@ public:
next_layer_.async_send(data, length, handler);
}
/// Start an asynchronous send. The data being sent must be valid for the
/// lifetime of the asynchronous operation.
template <typename Handler, typename Completion_Context>
void async_send(const void* data, size_t length, Handler handler,
Completion_Context context)
{
next_layer_.async_send(data, length, handler, context);
}
/// Fill the buffer with some data. Returns the number of bytes placed in the
/// buffer as a result of the operation, or 0 if the underlying connection
/// was closed. Throws an exception on failure.
@ -184,17 +174,6 @@ public:
fill_handler<Handler>(buffer_, previous_size, handler));
}
/// Start an asynchronous fill.
template <typename Handler, typename Completion_Context>
void async_fill(Handler handler, Completion_Context context)
{
size_t previous_size = buffer_.size();
buffer_.resize(buffer_.capacity());
next_layer_.async_recv(buffer_.begin() + previous_size,
buffer_.size() - previous_size,
fill_handler<Handler>(buffer_, previous_size, handler), context);
}
/// Receive some data from the peer. Returns the number of bytes received or
/// 0 if the stream was closed cleanly. Throws an exception on failure.
size_t recv(void* data, size_t max_length)
@ -271,25 +250,6 @@ public:
}
}
/// Start an asynchronous receive. The buffer for the data being received
/// must be valid for the lifetime of the asynchronous operation.
template <typename Handler, typename Completion_Context>
void async_recv(void* data, size_t max_length, Handler handler,
Completion_Context context)
{
if (buffer_.empty())
{
async_fill(recv_handler<Handler>(buffer_, data, max_length, handler),
context);
}
else
{
size_t length = copy(data, max_length);
next_layer_.demuxer().operation_immediate(
detail::bind_handler(handler, 0, length), context);
}
}
private:
/// Copy data out of the internal buffer to the specified target buffer.
/// Returns the number of bytes copied.

View File

@ -141,14 +141,6 @@ public:
flush_handler<Handler>(buffer_, handler));
}
/// Start an asynchronous flush.
template <typename Handler, typename Completion_Context>
void async_flush(Handler handler, Completion_Context context)
{
async_send_n(next_layer_, buffer_.begin(), buffer_.size(),
flush_handler<Handler>(buffer_, handler), context);
}
/// Send the given data to the peer. Returns the number of bytes sent or 0 if
/// the stream was closed cleanly. Throws an exception on failure.
size_t send(const void* data, size_t length)
@ -224,25 +216,6 @@ public:
}
}
/// Start an asynchronous send. The data being sent must be valid for the
/// lifetime of the asynchronous operation.
template <typename Handler, typename Completion_Context>
void async_send(const void* data, size_t length, Handler handler,
Completion_Context context)
{
if (buffer_.size() == buffer_.capacity())
{
async_flush(send_handler<Handler>(buffer_, data, length, handler),
context);
}
else
{
size_t bytes_copied = copy(data, length);
next_layer_.demuxer().operation_immediate(
detail::bind_handler(handler, 0, bytes_copied), context);
}
}
/// Receive some data from the peer. Returns the number of bytes received or
/// 0 if the stream was closed cleanly. Throws an exception on failure.
size_t recv(void* data, size_t max_length)
@ -266,15 +239,6 @@ public:
next_layer_.async_recv(data, max_length, handler);
}
/// Start an asynchronous receive. The buffer for the data being received
/// must be valid for the lifetime of the asynchronous operation.
template <typename Handler, typename Completion_Context>
void async_recv(void* data, size_t max_length, Handler handler,
Completion_Context context)
{
next_layer_.async_recv(data, max_length, handler, context);
}
private:
/// Copy data into the internal buffer from the specified source buffer.
/// Returns the number of bytes copied.

View File

@ -107,13 +107,6 @@ public:
return stream_impl_.next_layer().async_flush(handler);
}
/// Start an asynchronous flush.
template <typename Handler, typename Completion_Context>
void async_flush(Handler handler, Completion_Context context)
{
return stream_impl_.next_layer().async_flush(handler, context);
}
/// Send the given data to the peer. Returns the number of bytes sent or 0 if
/// the stream was closed cleanly. Throws an exception on failure.
size_t send(const void* data, size_t length)
@ -137,15 +130,6 @@ public:
stream_impl_.async_send(data, length, handler);
}
/// Start an asynchronous send. The data being sent must be valid for the
/// lifetime of the asynchronous operation.
template <typename Handler, typename Completion_Context>
void async_send(const void* data, size_t length, Handler handler,
Completion_Context context)
{
stream_impl_.async_send(data, length, handler, context);
}
/// Fill the buffer with some data. Returns the number of bytes placed in the
/// buffer as a result of the operation, or 0 if the underlying connection
/// was closed. Throws an exception on failure.
@ -170,13 +154,6 @@ public:
stream_impl_.async_fill(handler);
}
/// Start an asynchronous fill.
template <typename Handler, typename Completion_Context>
void async_fill(Handler handler, Completion_Context context)
{
stream_impl_.async_fill(handler, context);
}
/// Receive some data from the peer. Returns the number of bytes received or
/// 0 if the stream was closed cleanly. Throws an exception on failure.
size_t recv(void* data, size_t max_length)
@ -200,15 +177,6 @@ public:
stream_impl_.async_recv(data, max_length, handler);
}
/// Start an asynchronous receive. The buffer for the data being received
/// must be valid for the lifetime of the asynchronous operation.
template <typename Handler, typename Completion_Context>
void async_recv(void* data, size_t max_length, Handler handler,
Completion_Context context)
{
stream_impl_.async_recv(data, max_length, handler, context);
}
private:
/// The buffered stream implementation.
buffered_recv_stream<buffered_send_stream<Next_Layer, Buffer>, Buffer>

View File

@ -1,267 +0,0 @@
//
// counting_completion_context.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003, 2004 Christopher M. Kohlhoff (chris@kohlhoff.com)
//
// Permission to use, copy, modify, distribute and sell this software and its
// documentation for any purpose is hereby granted without fee, provided that
// the above copyright notice appears in all copies and that both the copyright
// notice and this permission notice appear in supporting documentation. This
// software is provided "as is" without express or implied warranty, and with
// no claim as to its suitability for any purpose.
//
#ifndef ASIO_COUNTING_COMPLETION_CONTEXT_HPP
#define ASIO_COUNTING_COMPLETION_CONTEXT_HPP
#include "asio/detail/push_options.hpp"
#include "asio/detail/mutex.hpp"
namespace asio {
/// The counting_completion_context class is a concrete implementation of the
/// Completion_Context concept. It allows a limitation on the number of
/// concurrent upcalls to completion handlers that may be associated with the
/// context. Copies of an instance of this class represent the same context as
/// the original.
class counting_completion_context
{
public:
/// Default constructor.
counting_completion_context()
: impl_(new impl(1))
{
}
/// Construct with a specified limit on the number of upcalls.
explicit counting_completion_context(int max_concurrent_upcalls)
: impl_(new impl(max_concurrent_upcalls))
{
}
/// Copy constructor.
counting_completion_context(const counting_completion_context& other)
: impl_(other.impl_)
{
impl_->add_ref();
}
/// Destructor.
~counting_completion_context()
{
impl_->remove_ref();
}
/// Assignment operator.
counting_completion_context& operator=(
const counting_completion_context& other)
{
counting_completion_context tmp(other);
impl* tmp_impl = tmp.impl_;
tmp.impl_ = impl_;
impl_ = tmp_impl;
return *this;
}
/// Attempt to acquire the right to make an upcall.
bool try_acquire()
{
return impl_->try_acquire();
}
/// Acquire the right to make an upcall.
template <typename Handler>
void acquire(Handler handler)
{
impl_->acquire(handler);
}
/// Relinquish a previously granted right to make an upcall.
void release()
{
impl_->release();
}
private:
// Inner implementation class to allow all copies made of a
// counting_completion_context to remain equivalent.
class impl
{
public:
// Constructor.
explicit impl(int max_concurrent_upcalls)
: mutex_(),
ref_count_(1),
max_concurrent_upcalls_(max_concurrent_upcalls),
concurrent_upcalls_(0),
first_waiter_(0),
last_waiter_(0)
{
}
// Destructor.
~impl()
{
while (first_waiter_)
{
waiter_base* delete_waiter = first_waiter_;
first_waiter_ = first_waiter_->next_;
delete delete_waiter;
}
}
// Attempt to acquire the right to make an upcall.
bool try_acquire()
{
detail::mutex::scoped_lock lock(mutex_);
if (concurrent_upcalls_ < max_concurrent_upcalls_)
{
++concurrent_upcalls_;
return true;
}
return false;
}
// Acquire the right to make an upcall.
template <typename Handler>
void acquire(Handler handler)
{
detail::mutex::scoped_lock lock(mutex_);
if (concurrent_upcalls_ < max_concurrent_upcalls_)
{
// The context can been acquired for the locker.
++concurrent_upcalls_;
lock.unlock();
handler();
}
else
{
// Can't acquire the context for the locker right now, so put it on the
// list of waiters.
waiter_base* new_waiter = new waiter<Handler>(handler);
if (first_waiter_ == 0)
{
first_waiter_ = new_waiter;
last_waiter_ = new_waiter;
}
else
{
last_waiter_->next_ = new_waiter;
last_waiter_ = new_waiter;
}
}
}
// Relinquish a previously granted right to make an upcall.
void release()
{
detail::mutex::scoped_lock lock(mutex_);
// Check if we can start one of the waiting tasks now.
if (concurrent_upcalls_ <= max_concurrent_upcalls_ && first_waiter_)
{
waiter_base* next_waiter = first_waiter_;
first_waiter_ = first_waiter_->next_;
if (first_waiter_ == 0)
last_waiter_ = 0;
lock.unlock();
next_waiter->notify();
delete next_waiter;
}
else
{
--concurrent_upcalls_;
}
}
// Increment the reference count.
void add_ref()
{
detail::mutex::scoped_lock lock(mutex_);
++ref_count_;
}
// Decrement the reference count and delete the object if necessary.
void remove_ref()
{
detail::mutex::scoped_lock lock(mutex_);
if (--ref_count_ == 0)
{
lock.unlock();
delete this;
}
}
private:
// Mutex to protect access to internal data.
detail::mutex mutex_;
// The reference count on the implementation.
int ref_count_;
// The maximum number of concurrent upcalls.
int max_concurrent_upcalls_;
// The current number of upcalls.
int concurrent_upcalls_;
// Base class for all waiter types.
class waiter_base
{
public:
waiter_base()
: next_(0)
{
}
virtual ~waiter_base()
{
}
virtual void notify() = 0;
waiter_base* next_;
};
// Class template for a waiter.
template <typename Handler>
class waiter
: public waiter_base
{
public:
waiter(Handler handler)
: handler_(handler)
{
}
virtual void notify()
{
handler_();
}
private:
Handler handler_;
};
// The start of the list of waiters for the context.
waiter_base* first_waiter_;
// The end of the list of waiters for the context.
waiter_base* last_waiter_;
};
// The inner implementation.
impl* impl_;
};
} // namespace asio
#include "asio/detail/pop_options.hpp"
#endif // ASIO_COUNTING_COMPLETION_CONTEXT_HPP

View File

@ -23,8 +23,9 @@ namespace detail {
// Some compilers (notably MSVC6) run into mysterious compiler errors when
// trying to use the boost::bind template in this library. The class and
// function templates below provide only the functionality of bind to create
// function objects with the signature void() as used in completion handlers.
// This should make it simpler for the compiler to work correctly.
// function objects with the signature void() as used in handlers passed to a
// demuxer's dispatch or post functions. This should make it simpler for the
// compiler to work correctly.
template <typename Handler, typename Arg1>
class binder1

View File

@ -143,20 +143,18 @@ public:
return bytes_sent;
}
template <typename Address, typename Handler, typename Completion_Context>
template <typename Address, typename Handler>
class sendto_handler
{
public:
sendto_handler(impl_type impl, Demuxer& demuxer, const void* data,
size_t length, const Address& address, Handler handler,
Completion_Context context)
size_t length, const Address& address, Handler handler)
: impl_(impl),
demuxer_(demuxer),
data_(data),
length_(length),
destination_(address),
handler_(handler),
context_(context)
handler_(handler)
{
}
@ -166,14 +164,15 @@ public:
destination_.native_address(), destination_.native_size());
socket_error error(bytes < 0
? socket_ops::get_error() : socket_error::success);
demuxer_.operation_completed(bind_handler(handler_, error, bytes),
context_);
demuxer_.post(bind_handler(handler_, error, bytes));
demuxer_.work_finished();
}
void do_cancel()
{
socket_error error(socket_error::operation_aborted);
demuxer_.operation_completed(bind_handler(handler_, error, 0), context_);
demuxer_.post(bind_handler(handler_, error, 0));
demuxer_.work_finished();
}
private:
@ -183,19 +182,17 @@ public:
size_t length_;
Address destination_;
Handler handler_;
Completion_Context context_;
};
// Start an asynchronous send. The data being sent must be valid for the
// lifetime of the asynchronous operation.
template <typename Address, typename Handler, typename Completion_Context>
template <typename Address, typename Handler>
void async_sendto(impl_type& impl, const void* data, size_t length,
const Address& destination, Handler handler, Completion_Context context)
const Address& destination, Handler handler)
{
demuxer_.operation_started();
reactor_.start_write_op(impl,
sendto_handler<Address, Handler, Completion_Context>(impl, demuxer_,
data, length, destination, handler, context));
demuxer_.work_started();
reactor_.start_write_op(impl, sendto_handler<Address, Handler>(
impl, demuxer_, data, length, destination, handler));
}
// Receive a datagram with the address of the sender. Returns the number of
@ -218,20 +215,18 @@ public:
return bytes_recvd;
}
template <typename Address, typename Handler, typename Completion_Context>
template <typename Address, typename Handler>
class recvfrom_handler
{
public:
recvfrom_handler(impl_type impl, Demuxer& demuxer, void* data,
size_t max_length, Address& address, Handler handler,
Completion_Context context)
size_t max_length, Address& address, Handler handler)
: impl_(impl),
demuxer_(demuxer),
data_(data),
max_length_(max_length),
sender_address_(address),
handler_(handler),
context_(context)
handler_(handler)
{
}
@ -243,14 +238,15 @@ public:
socket_error error(bytes < 0
? socket_ops::get_error() : socket_error::success);
sender_address_.native_size(addr_len);
demuxer_.operation_completed(bind_handler(handler_, error, bytes),
context_);
demuxer_.post(bind_handler(handler_, error, bytes));
demuxer_.work_finished();
}
void do_cancel()
{
socket_error error(socket_error::operation_aborted);
demuxer_.operation_completed(bind_handler(handler_, error, 0), context_);
demuxer_.post(bind_handler(handler_, error, 0));
demuxer_.work_finished();
}
private:
@ -260,24 +256,22 @@ public:
size_t max_length_;
Address& sender_address_;
Handler handler_;
Completion_Context context_;
};
// Start an asynchronous receive. The buffer for the data being received and
// the sender_address obejct must both be valid for the lifetime of the
// asynchronous operation.
template <typename Address, typename Handler, typename Completion_Context>
template <typename Address, typename Handler>
void async_recvfrom(impl_type& impl, void* data, size_t max_length,
Address& sender_address, Handler handler, Completion_Context context)
Address& sender_address, Handler handler)
{
demuxer_.operation_started();
reactor_.start_read_op(impl,
recvfrom_handler<Address, Handler, Completion_Context>(impl, demuxer_,
data, max_length, sender_address, handler, context));
demuxer_.work_started();
reactor_.start_read_op(impl, recvfrom_handler<Address, Handler>(
impl, demuxer_, data, max_length, sender_address, handler));
}
private:
// The demuxer used for delivering completion notifications.
// The demuxer used for dispatching handlers.
Demuxer& demuxer_;
// The selector that performs event demultiplexing for the provider.

View File

@ -185,19 +185,16 @@ public:
peer.set_impl(new_socket);
}
template <typename Stream_Socket_Service, typename Handler,
typename Completion_Context>
template <typename Stream_Socket_Service, typename Handler>
class accept_handler
{
public:
accept_handler(impl_type impl, Demuxer& demuxer,
basic_stream_socket<Stream_Socket_Service>& peer, Handler handler,
Completion_Context context)
basic_stream_socket<Stream_Socket_Service>& peer, Handler handler)
: impl_(impl),
demuxer_(demuxer),
peer_(peer),
handler_(handler),
context_(context)
handler_(handler)
{
}
@ -207,13 +204,15 @@ public:
socket_error error(new_socket == invalid_socket
? socket_ops::get_error() : socket_error::success);
peer_.set_impl(new_socket);
demuxer_.operation_completed(bind_handler(handler_, error), context_);
demuxer_.post(bind_handler(handler_, error));
demuxer_.work_finished();
}
void do_cancel()
{
socket_error error(socket_error::operation_aborted);
demuxer_.operation_completed(bind_handler(handler_, error), context_);
demuxer_.post(bind_handler(handler_, error));
demuxer_.work_finished();
}
private:
@ -221,45 +220,40 @@ public:
Demuxer& demuxer_;
basic_stream_socket<Stream_Socket_Service>& peer_;
Handler handler_;
Completion_Context context_;
};
// Start an asynchronous accept. The peer_socket object must be valid until
// the accept's completion handler is invoked.
template <typename Stream_Socket_Service, typename Handler,
typename Completion_Context>
// the accept's handler is invoked.
template <typename Stream_Socket_Service, typename Handler>
void async_accept(impl_type& impl,
basic_stream_socket<Stream_Socket_Service>& peer, Handler handler,
Completion_Context context)
basic_stream_socket<Stream_Socket_Service>& peer, Handler handler)
{
if (peer.impl() != invalid_socket)
{
socket_error error(socket_error::already_connected);
demuxer_.operation_immediate(bind_handler(handler, error));
demuxer_.post(bind_handler(handler, error));
}
else
{
demuxer_.operation_started();
demuxer_.work_started();
reactor_.start_read_op(impl,
accept_handler<Stream_Socket_Service, Handler, Completion_Context>(
impl, demuxer_, peer, handler, context));
accept_handler<Stream_Socket_Service, Handler>(
impl, demuxer_, peer, handler));
}
}
template <typename Stream_Socket_Service, typename Address, typename Handler,
typename Completion_Context>
template <typename Stream_Socket_Service, typename Address, typename Handler>
class accept_addr_handler
{
public:
accept_addr_handler(impl_type impl, Demuxer& demuxer,
basic_stream_socket<Stream_Socket_Service>& peer,
Address& peer_address, Handler handler, Completion_Context context)
Address& peer_address, Handler handler)
: impl_(impl),
demuxer_(demuxer),
peer_(peer),
peer_address_(peer_address),
handler_(handler),
context_(context)
handler_(handler)
{
}
@ -272,13 +266,15 @@ public:
? socket_ops::get_error() : socket_error::success);
peer_address_.native_size(addr_len);
peer_.set_impl(new_socket);
demuxer_.operation_completed(bind_handler(handler_, error), context_);
demuxer_.post(bind_handler(handler_, error));
demuxer_.work_finished();
}
void do_cancel()
{
socket_error error(socket_error::operation_aborted);
demuxer_.operation_completed(bind_handler(handler_, error), context_);
demuxer_.post(bind_handler(handler_, error));
demuxer_.work_finished();
}
private:
@ -287,34 +283,31 @@ public:
basic_stream_socket<Stream_Socket_Service>& peer_;
Address& peer_address_;
Handler handler_;
Completion_Context context_;
};
// Start an asynchronous accept. The peer_socket and peer_address objects
// must be valid until the accept's completion handler is invoked.
template <typename Stream_Socket_Service, typename Address, typename Handler,
typename Completion_Context>
// must be valid until the accept's handler is invoked.
template <typename Stream_Socket_Service, typename Address, typename Handler>
void async_accept_address(impl_type& impl,
basic_stream_socket<Stream_Socket_Service>& peer,
Address& peer_address, Handler handler, Completion_Context context)
Address& peer_address, Handler handler)
{
if (peer.impl() != invalid_socket)
{
socket_error error(socket_error::already_connected);
demuxer_.operation_immediate(bind_handler(handler, error));
demuxer_.post(bind_handler(handler, error));
}
else
{
demuxer_.operation_started();
demuxer_.work_started();
reactor_.start_read_op(impl,
accept_addr_handler<Stream_Socket_Service, Address, Handler,
Completion_Context>(impl, demuxer_, peer, peer_address, handler,
context));
accept_addr_handler<Stream_Socket_Service, Address, Handler>(
impl, demuxer_, peer, peer_address, handler));
}
}
private:
// The demuxer used for delivering completion notifications.
// The demuxer used for dispatching handlers.
Demuxer& demuxer_;
// The selector that performs event demultiplexing for the provider.

View File

@ -235,22 +235,19 @@ public:
peer.set_impl(sock.release());
}
template <typename Stream_Socket_Service, typename Address, typename Handler,
typename Completion_Context>
template <typename Stream_Socket_Service, typename Address, typename Handler>
class connect_handler
{
public:
connect_handler(impl_type impl, socket_type new_socket, Demuxer& demuxer,
basic_stream_socket<Stream_Socket_Service>& peer,
const Address& peer_address, Handler handler,
Completion_Context context)
const Address& peer_address, Handler handler)
: impl_(impl),
new_socket_(new_socket),
demuxer_(demuxer),
peer_(peer),
peer_address_(peer_address),
handler_(handler),
context_(context)
handler_(handler)
{
}
@ -267,15 +264,17 @@ public:
&connect_error, &connect_error_len) == socket_error_retval)
{
socket_error error(socket_ops::get_error());
demuxer_.operation_completed(bind_handler(handler_, error), context_);
demuxer_.post(bind_handler(handler_, error));
demuxer_.work_finished();
return;
}
// If connection failed then post a completion with the error code.
// If connection failed then post the handler with the error code.
if (connect_error)
{
socket_error error(connect_error);
demuxer_.operation_completed(bind_handler(handler_, error), context_);
demuxer_.post(bind_handler(handler_, error));
demuxer_.work_finished();
return;
}
@ -284,7 +283,8 @@ public:
if (socket_ops::ioctl(new_socket_, FIONBIO, &non_blocking))
{
socket_error error(socket_ops::get_error());
demuxer_.operation_completed(bind_handler(handler_, error), context_);
demuxer_.post(bind_handler(handler_, error));
demuxer_.work_finished();
return;
}
@ -292,7 +292,8 @@ public:
peer_.set_impl(new_socket_);
new_socket_holder.release();
socket_error error(socket_error::success);
demuxer_.operation_completed(bind_handler(handler_, error), context_);
demuxer_.post(bind_handler(handler_, error));
demuxer_.work_finished();
}
void do_cancel()
@ -300,7 +301,8 @@ public:
// The socket is closed when the reactor_.close_descriptor is called,
// so no need to close it here.
socket_error error(socket_error::operation_aborted);
demuxer_.operation_completed(bind_handler(handler_, error), context_);
demuxer_.post(bind_handler(handler_, error));
demuxer_.work_finished();
}
private:
@ -310,22 +312,19 @@ public:
basic_stream_socket<Stream_Socket_Service>& peer_;
Address peer_address_;
Handler handler_;
Completion_Context context_;
};
// Start an asynchronous connect. The peer socket object must be valid until
// the connect's completion handler is invoked.
template <typename Stream_Socket_Service, typename Address, typename Handler,
typename Completion_Context>
// the connect's handler is invoked.
template <typename Stream_Socket_Service, typename Address, typename Handler>
void async_connect(impl_type& impl,
basic_stream_socket<Stream_Socket_Service>& peer,
const Address& peer_address, Handler handler,
Completion_Context context)
const Address& peer_address, Handler handler)
{
if (peer.impl() != invalid_socket)
{
socket_error error(socket_error::already_connected);
demuxer_.operation_immediate(bind_handler(handler, error));
demuxer_.post(bind_handler(handler, error));
return;
}
@ -340,7 +339,7 @@ public:
if (type != SOCK_STREAM)
{
socket_error error(socket_error::invalid_argument);
demuxer_.operation_immediate(bind_handler(handler, error));
demuxer_.post(bind_handler(handler, error));
return;
}
@ -350,7 +349,7 @@ public:
if (new_socket.get() == invalid_socket)
{
socket_error error(socket_ops::get_error());
demuxer_.operation_immediate(bind_handler(handler, error), context);
demuxer_.post(bind_handler(handler, error));
return;
}
@ -360,7 +359,7 @@ public:
if (socket_ops::ioctl(new_socket.get(), FIONBIO, &non_blocking))
{
socket_error error(socket_ops::get_error());
demuxer_.operation_immediate(bind_handler(handler, error), context);
demuxer_.post(bind_handler(handler, error));
return;
}
@ -369,10 +368,10 @@ public:
peer_address.native_size()) == 0)
{
// The connect operation has finished successfully so we need to post the
// completion immediately.
// handler immediately.
peer.set_impl(new_socket.release());
socket_error error(socket_error::success);
demuxer_.operation_immediate(bind_handler(handler, error), context);
demuxer_.post(bind_handler(handler, error));
}
else if (socket_ops::get_error() == socket_error::in_progress
|| socket_ops::get_error() == socket_error::would_block)
@ -380,23 +379,22 @@ public:
// The connection is happening in the background, and we need to wait
// until the socket becomes writeable.
impl->add_socket(new_socket.get());
demuxer_.operation_started();
demuxer_.work_started();
reactor_.start_write_op(new_socket.get(),
connect_handler<Stream_Socket_Service, Address, Handler,
Completion_Context>(impl, new_socket.get(), demuxer_, peer,
peer_address, handler, context));
connect_handler<Stream_Socket_Service, Address, Handler>(
impl, new_socket.get(), demuxer_, peer, peer_address, handler));
new_socket.release();
}
else
{
// The connect operation has failed, so post completion immediately.
// The connect operation has failed, so post the handler immediately.
socket_error error(socket_ops::get_error());
demuxer_.operation_immediate(bind_handler(handler, error), context);
demuxer_.post(bind_handler(handler, error));
}
}
private:
// The demuxer used for delivering completion notifications.
// The demuxer used for dispatching handlers.
Demuxer& demuxer_;
// The selector that performs event demultiplexing for the provider.

View File

@ -129,18 +129,17 @@ public:
return bytes_sent;
}
template <typename Handler, typename Completion_Context>
template <typename Handler>
class send_handler
{
public:
send_handler(impl_type impl, Demuxer& demuxer, const void* data,
size_t length, Handler handler, Completion_Context context)
size_t length, Handler handler)
: impl_(impl),
demuxer_(demuxer),
data_(data),
length_(length),
handler_(handler),
context_(context)
handler_(handler)
{
}
@ -149,15 +148,15 @@ public:
int bytes = socket_ops::send(impl_, data_, length_, 0);
socket_error error(bytes < 0
? socket_ops::get_error() : socket_error::success);
demuxer_.operation_completed(bind_handler(handler_, error, bytes),
context_);
demuxer_.post(bind_handler(handler_, error, bytes));
demuxer_.work_finished();
}
void do_cancel()
{
socket_error error(socket_error::operation_aborted);
demuxer_.operation_completed(bind_handler(handler_, error, 0),
context_);
demuxer_.post(bind_handler(handler_, error, 0));
demuxer_.work_finished();
}
private:
@ -166,18 +165,17 @@ public:
const void* data_;
size_t length_;
Handler handler_;
Completion_Context context_;
};
// Start an asynchronous send. The data being sent must be valid for the
// lifetime of the asynchronous operation.
template <typename Handler, typename Completion_Context>
template <typename Handler>
void async_send(impl_type& impl, const void* data, size_t length,
Handler handler, Completion_Context context)
Handler handler)
{
demuxer_.operation_started();
reactor_.start_write_op(impl, send_handler<Handler, Completion_Context>(
impl, demuxer_, data, length, handler, context));
demuxer_.work_started();
reactor_.start_write_op(impl,
send_handler<Handler>(impl, demuxer_, data, length, handler));
}
// Receive some data from the peer. Returns the number of bytes received or
@ -195,18 +193,17 @@ public:
return bytes_recvd;
}
template <typename Handler, typename Completion_Context>
template <typename Handler>
class recv_handler
{
public:
recv_handler(impl_type impl, Demuxer& demuxer, void* data,
size_t max_length, Handler handler, Completion_Context context)
size_t max_length, Handler handler)
: impl_(impl),
demuxer_(demuxer),
data_(data),
max_length_(max_length),
handler_(handler),
context_(context)
handler_(handler)
{
}
@ -215,14 +212,15 @@ public:
int bytes = socket_ops::recv(impl_, data_, max_length_, 0);
socket_error error(bytes < 0
? socket_ops::get_error() : socket_error::success);
demuxer_.operation_completed(bind_handler(handler_, error, bytes),
context_);
demuxer_.post(bind_handler(handler_, error, bytes));
demuxer_.work_finished();
}
void do_cancel()
{
socket_error error(socket_error::operation_aborted);
demuxer_.operation_completed(bind_handler(handler_, error, 0), context_);
demuxer_.post(bind_handler(handler_, error, 0));
demuxer_.work_finished();
}
private:
@ -231,22 +229,21 @@ public:
void* data_;
size_t max_length_;
Handler handler_;
Completion_Context context_;
};
// Start an asynchronous receive. The buffer for the data being received
// must be valid for the lifetime of the asynchronous operation.
template <typename Handler, typename Completion_Context>
template <typename Handler>
void async_recv(impl_type& impl, void* data, size_t max_length,
Handler handler, Completion_Context context)
Handler handler)
{
demuxer_.operation_started();
reactor_.start_read_op(impl, recv_handler<Handler, Completion_Context>(
impl, demuxer_, data, max_length, handler, context));
demuxer_.work_started();
reactor_.start_read_op(impl,
recv_handler<Handler>( impl, demuxer_, data, max_length, handler));
}
private:
// The demuxer used for delivering completion notifications.
// The demuxer used for dispatching handlers.
Demuxer& demuxer_;
// The reactor that performs event demultiplexing for the provider.

View File

@ -130,50 +130,48 @@ public:
}
}
template <typename Handler, typename Completion_Context>
template <typename Handler>
class wait_handler
{
public:
wait_handler(impl_type& impl, Demuxer& demuxer, Handler handler,
Completion_Context context)
wait_handler(impl_type& impl, Demuxer& demuxer, Handler handler)
: impl_(impl),
demuxer_(demuxer),
handler_(handler),
context_(context)
handler_(handler)
{
}
void do_operation()
{
impl_->token = 0;
demuxer_.operation_completed(handler_, context_);
demuxer_.post(handler_);
demuxer_.work_finished();
}
void do_cancel()
{
impl_->token = 0;
demuxer_.operation_completed(handler_, context_);
demuxer_.post(handler_);
demuxer_.work_finished();
}
private:
impl_type& impl_;
Demuxer& demuxer_;
Handler handler_;
Completion_Context context_;
};
// Start an asynchronous wait on the timer.
template <typename Handler, typename Completion_Context>
void async_wait(impl_type& impl, Handler handler, Completion_Context context)
template <typename Handler>
void async_wait(impl_type& impl, Handler handler)
{
demuxer_.operation_started();
demuxer_.work_started();
reactor_.schedule_timer(impl->expiry.sec(), impl->expiry.usec(),
wait_handler<Handler, Completion_Context>(impl, demuxer_, handler,
context), impl->token);
wait_handler<Handler>(impl, demuxer_, handler), impl->token);
}
private:
// The demuxer used for delivering completion notifications.
// The demuxer used for dispatching handlers.
Demuxer& demuxer_;
// The selector that performs event demultiplexing for the provider.

View File

@ -44,9 +44,9 @@ public:
mutex_(),
task_(demuxer.get_service(service_factory<Task>())),
task_is_running_(false),
outstanding_operations_(0),
ready_completions_(0),
ready_completions_end_(0),
outstanding_work_(0),
handler_queue_(0),
handler_queue_end_(0),
interrupted_(false),
current_thread_in_pool_(),
first_idle_thread_(0)
@ -71,19 +71,19 @@ public:
asio::detail::mutex::scoped_lock lock(mutex_);
while (!interrupted_ && outstanding_operations_ > 0)
while (!interrupted_ && outstanding_work_ > 0)
{
if (ready_completions_)
if (handler_queue_)
{
completion_base* comp = ready_completions_;
ready_completions_ = comp->next_;
if (ready_completions_ == 0)
ready_completions_end_ = 0;
handler_base* h = handler_queue_;
handler_queue_ = h->next_;
if (handler_queue_ == 0)
handler_queue_end_ = 0;
lock.unlock();
comp->call();
delete comp;
h->call();
delete h;
lock.lock();
--outstanding_operations_;
--outstanding_work_;
}
else if (!task_is_running_)
{
@ -148,91 +148,56 @@ public:
interrupted_ = false;
}
// Notify the demuxer that an operation has started.
void operation_started()
// Notify the demuxer that some work has started.
void work_started()
{
asio::detail::mutex::scoped_lock lock(mutex_);
++outstanding_operations_;
++outstanding_work_;
}
// Notify the demuxer that an operation has completed.
template <typename Handler, typename Completion_Context>
void operation_completed(Handler handler, Completion_Context context,
bool allow_nested_delivery)
// Notify the demuxer that some work has finished.
void work_finished()
{
asio::detail::mutex::scoped_lock lock(mutex_);
if (context.try_acquire())
{
if (allow_nested_delivery && current_thread_in_pool_)
{
lock.unlock();
completion<Handler, Completion_Context>::do_upcall(handler);
context.release();
lock.lock();
if (--outstanding_operations_ == 0)
if (--outstanding_work_ == 0)
interrupt_all_threads();
}
// Request the demuxer to invoke the given handler.
template <typename Handler>
void dispatch(Handler handler)
{
if (current_thread_in_pool_)
handler_wrapper<Handler>::do_upcall(handler);
else
post(handler);
}
// Request the demuxer to invoke the given handler and return immediately.
template <typename Handler>
void post(Handler handler)
{
completion_base* comp =
new completion<Handler, Completion_Context>(handler, context);
if (ready_completions_end_)
asio::detail::mutex::scoped_lock lock(mutex_);
// Add the handler to the end of the queue.
handler_base* h = new handler_wrapper<Handler>(handler);
if (handler_queue_end_)
{
ready_completions_end_->next_ = comp;
ready_completions_end_ = comp;
handler_queue_end_->next_ = h;
handler_queue_end_ = h;
}
else
{
ready_completions_ = ready_completions_end_ = comp;
handler_queue_ = handler_queue_end_ = h;
}
// An undelivered handler is treated as unfinished work.
++outstanding_work_;
// Wake up a thread to execute the handler.
if (!interrupt_one_idle_thread())
interrupt_task();
}
}
else
{
completion_base* comp =
new completion<Handler, Completion_Context>(handler, context);
context.acquire(bind_handler(
task_demuxer_service<Task>::completion_context_acquired, this,
comp));
}
}
// Notify the demuxer of an operation that started and finished immediately.
template <typename Handler, typename Completion_Context>
void operation_immediate(Handler handler, Completion_Context context,
bool allow_nested_delivery)
{
operation_started();
operation_completed(handler, context, allow_nested_delivery);
}
// Callback function when a completion context has been acquired.
static void completion_context_acquired(task_demuxer_service<Task>* service,
void* arg) throw ()
{
try
{
asio::detail::mutex::scoped_lock lock(service->mutex_);
completion_base* comp = static_cast<completion_base*>(arg);
if (service->ready_completions_end_)
{
service->ready_completions_end_->next_ = comp;
service->ready_completions_end_ = comp;
}
else
{
service->ready_completions_ = service->ready_completions_end_ = comp;
}
if (!service->interrupt_one_idle_thread())
service->interrupt_task();
}
catch (...)
{
}
}
private:
// Interrupt the task and all idle threads.
@ -283,43 +248,41 @@ private:
return false;
}
// The base class for all completions.
class completion_base
// The base class for all handler wrappers.
class handler_base
{
public:
virtual ~completion_base()
virtual ~handler_base()
{
}
virtual void call() = 0;
protected:
completion_base()
handler_base()
: next_(0)
{
}
private:
friend class task_demuxer_service<Task>;
completion_base* next_;
handler_base* next_;
};
// Template for completions specific to a handler.
template <typename Handler, typename Completion_Context>
class completion
: public completion_base
// Template wrapper for handlers.
template <typename Handler>
class handler_wrapper
: public handler_base
{
public:
completion(Handler handler, Completion_Context context)
: handler_(handler),
context_(context)
handler_wrapper(Handler handler)
: handler_(handler)
{
}
virtual void call()
{
do_upcall(handler_);
context_.release();
}
static void do_upcall(Handler& handler)
@ -335,7 +298,6 @@ private:
private:
Handler handler_;
Completion_Context context_;
};
// The demuxer that owns this service.
@ -350,14 +312,14 @@ private:
// Whether the task is currently running.
bool task_is_running_;
// The number of operations that have not yet completed.
int outstanding_operations_;
// The count of unfinished work.
int outstanding_work_;
// The start of a linked list of completions that are ready to be delivered.
completion_base* ready_completions_;
// The start of a linked list of handlers that are ready to be delivered.
handler_base* handler_queue_;
// The end of a linked list of completions that are ready to be delivered.
completion_base* ready_completions_end_;
// The end of a linked list of handlers that are ready to be delivered.
handler_base* handler_queue_end_;
// Flag to indicate that the dispatcher has been interrupted.
bool interrupted_;

View File

@ -1,68 +0,0 @@
//
// null_completion_context.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003, 2004 Christopher M. Kohlhoff (chris@kohlhoff.com)
//
// Permission to use, copy, modify, distribute and sell this software and its
// documentation for any purpose is hereby granted without fee, provided that
// the above copyright notice appears in all copies and that both the copyright
// notice and this permission notice appear in supporting documentation. This
// software is provided "as is" without express or implied warranty, and with
// no claim as to its suitability for any purpose.
//
#ifndef ASIO_NULL_COMPLETION_CONTEXT_HPP
#define ASIO_NULL_COMPLETION_CONTEXT_HPP
#include "asio/detail/push_options.hpp"
namespace asio {
/// The null_completion_context class is a concrete implementation of the
/// Completion_Context concept. It does not place any limits on the number of
/// concurrent upcalls to completion handlers that may be associated with the
/// context. All instances of this class are equivalent.
class null_completion_context
{
public:
/// Attempt to acquire the right to make an upcall.
/**
* This function is called to attempt to obtain the right to make an upcall
* to a completion handler. This function always returns a result
* immediately.
*
* If the right to make an upcall was successfully acquired, then a later
* call must be made to the release() function to relinquish that right.
*
* @return Returns true if the right to make an upcall was granted.
*/
bool try_acquire()
{
return true;
}
/// Acquire the right to make an upcall.
/**
* This function is called to obtain the right to make an upcall to a
* completion handler. The handler will be called when the right is granted.
*
* @param handler The function object to be called when the right is granted.
*/
template <typename Handler>
void acquire(Handler handler)
{
handler();
}
/// Relinquish a previously granted right to make an upcall.
void release()
{
}
};
} // namespace asio
#include "asio/detail/pop_options.hpp"
#endif // ASIO_NULL_COMPLETION_CONTEXT_HPP

View File

@ -102,9 +102,9 @@ size_t recv(Stream& s, void* data, size_t max_length,
*
* @param max_length The maximum size of the data to be received, in bytes.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @param handler The handler to be called when the receive operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is dependent
@ -123,45 +123,6 @@ void async_recv(Stream& s, void* data, size_t max_length, Handler handler)
s.async_recv(data, max_length, handler);
}
/// Start an asynchronous receive.
/**
* This function is used to asynchronously receive data on a stream. The
* function call always returns immediately.
*
* @param s The stream on which the data is to be received.
*
* @param data The buffer into which the received data will be written.
* Ownership of the buffer is retained by the caller, which must guarantee
* that it is valid until the handler is called.
*
* @param max_length The maximum size of the data to be received, in bytes.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is dependent
* // on the underlying stream's recv operation)
* size_t bytes_received // Number of bytes received
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made of
* the context object as required, however all copies are equivalent.
*
* @note The recv operation may not receive all of the requested number of
* bytes. Consider using the asio::async_recv_n() function if you need to
* ensure that the requested amount of data is received before the asynchronous
* operation completes.
*/
template <typename Stream, typename Handler, typename Completion_Context>
void async_recv(Stream& s, void* data, size_t max_length, Handler handler,
Completion_Context context)
{
s.async_recv(data, max_length, handler, context);
}
/// Read the specified amount of data from the stream before returning.
/**
* This function is used to receive an exact number of bytes of data on a
@ -258,18 +219,16 @@ size_t recv_n(Stream& s, void* data, size_t length,
namespace detail
{
template <typename Stream, typename Handler, typename Completion_Context>
template <typename Stream, typename Handler>
class recv_n_handler
{
public:
recv_n_handler(Stream& stream, void* data, size_t length, Handler handler,
Completion_Context context)
recv_n_handler(Stream& stream, void* data, size_t length, Handler handler)
: stream_(stream),
data_(data),
length_(length),
total_recvd_(0),
handler_(handler),
context_(context)
handler_(handler)
{
}
@ -279,8 +238,8 @@ namespace detail
total_recvd_ += bytes_recvd;
if (e || bytes_recvd == 0 || total_recvd_ == length_)
{
stream_.demuxer().operation_immediate(detail::bind_handler(handler_, e,
total_recvd_, bytes_recvd), context_, true);
stream_.demuxer().dispatch(detail::bind_handler(handler_, e,
total_recvd_, bytes_recvd));
}
else
{
@ -295,7 +254,6 @@ namespace detail
size_t length_;
size_t total_recvd_;
Handler handler_;
Completion_Context context_;
};
} // namespace detail
@ -313,9 +271,9 @@ namespace detail
*
* @param length The size of the data to be received, in bytes.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @param handler The handler to be called when the receive operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is
@ -330,48 +288,7 @@ template <typename Stream, typename Handler>
void async_recv_n(Stream& s, void* data, size_t length, Handler handler)
{
async_recv(s, data, length,
detail::recv_n_handler<Stream, Handler, null_completion_context>(s, data,
length, handler, null_completion_context()));
}
/// Start an asynchronous receive that will not complete until the specified
/// amount of data has been received.
/**
* This function is used to asynchronously receive an exact number of bytes of
* data on a stream. The function call always returns immediately.
*
* @param s The stream on which the data is to be received.
*
* @param data The buffer into which the received data will be written.
* Ownership of the buffer is retained by the caller, which must guarantee
* that it is valid until the handler is called.
*
* @param length The size of the data to be received, in bytes.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is
* // dependent on the underlying stream's recv
* // operation)
* size_t total_bytes_recvd, // Total number of bytes successfully received
* size_t last_bytes_recvd // Number of bytes received on last recv
* // operation
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made of
* the context object as required, however all copies are equivalent.
*/
template <typename Stream, typename Handler, typename Completion_Context>
void async_recv_n(Stream& s, void* data, size_t length, Handler handler,
Completion_Context context)
{
async_recv(s, data, length,
detail::recv_n_handler<Stream, Handler, Completion_Context>(s, data,
length, handler, context));
detail::recv_n_handler<Stream, Handler>(s, data, length, handler));
}
/// Read some data from a stream and decode it.
@ -508,18 +425,16 @@ size_t recv_decode(Buffered_Stream& s, Decoder decoder,
namespace detail
{
template <typename Buffered_Stream, typename Decoder, typename Handler,
typename Completion_Context>
template <typename Buffered_Stream, typename Decoder, typename Handler>
class recv_decode_handler
{
public:
recv_decode_handler(Buffered_Stream& stream, Decoder decoder,
Handler handler, Completion_Context context)
Handler handler)
: stream_(stream),
decoder_(decoder),
total_recvd_(0),
handler_(handler),
context_(context)
handler_(handler)
{
}
@ -528,8 +443,8 @@ namespace detail
{
if (e || bytes_recvd == 0)
{
stream_.demuxer().operation_immediate(detail::bind_handler(handler_, e,
total_recvd_, bytes_recvd), context_, true);
stream_.demuxer().dispatch(
detail::bind_handler(handler_, e, total_recvd_, bytes_recvd));
}
else
{
@ -545,8 +460,8 @@ namespace detail
if (result.first)
{
stream_.demuxer().operation_immediate(detail::bind_handler(
handler_, 0, total_recvd_, bytes_read), context_, true);
stream_.demuxer().dispatch(
detail::bind_handler(handler_, 0, total_recvd_, bytes_read));
return;
}
}
@ -560,7 +475,6 @@ namespace detail
Decoder decoder_;
size_t total_recvd_;
Handler handler_;
Completion_Context context_;
};
} // namespace detail
@ -590,9 +504,9 @@ namespace detail
* The second element is a pointer to the beginning of the unused portion of
* the data.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @param handler The handler to be called when the receive operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is
@ -616,83 +530,14 @@ void async_recv_decode(Buffered_Stream& s, Decoder decoder, Handler handler)
if (result.first)
{
s.demuxer().operation_immediate(detail::bind_handler(handler, 0,
bytes_read, bytes_read));
s.demuxer().dispatch(
detail::bind_handler(handler, 0, bytes_read, bytes_read));
return;
}
}
s.async_fill(detail::recv_decode_handler<Buffered_Stream, Decoder, Handler,
null_completion_context>(s, decoder, handler,
null_completion_context()));
}
/// Start an asynchronous receive that will not complete until some data has
/// been fully decoded.
/**
* This function is used to receive data on a stream and decode it in a single
* asynchronous operation. The function call always returns immediately. The
* asynchronous operation will complete only when the decoder indicates that it
* has finished.
*
* @param s The stream on which the data is to be received.
*
* @param decoder The decoder function object to be called to decode the
* received data. The function object is assumed to be stateful. That is, it
* may not be given sufficient data in a single invocation to complete
* decoding, and is expected to maintain state so that it may resume decoding
* when the next piece of data is supplied. Copies will be made of the decoder
* function object as required, however with respect to maintaining state it
* can rely on the fact that only an up-to-date copy will be used. The
* equivalent function signature of the handler must be:
* @code std::pair<bool, const char*> decoder(
* const char* begin, // Pointer to the beginning of data to be decoded.
* const char* end // Pointer to one-past-the-end of data to be decoded.
* ); @endcode
* The first element of the return value is true if the decoder has finished.
* The second element is a pointer to the beginning of the unused portion of
* the data.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is
* // dependent on the underlying stream's recv
* // operation)
* size_t total_bytes_recvd, // Total number of bytes successfully received
* size_t last_bytes_recvd // Number of bytes received on last recv
* // operation
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made of
* the context object as required, however all copies are equivalent.
*/
template <typename Buffered_Stream, typename Decoder, typename Handler,
typename Completion_Context>
void async_recv_decode(Buffered_Stream& s, Decoder decoder, Handler handler,
Completion_Context context)
{
while (!s.recv_buffer().empty())
{
std::pair<bool, const char*> result =
decoder(s.recv_buffer().begin(), s.recv_buffer().end());
size_t bytes_read = result.second - s.recv_buffer().begin();
s.recv_buffer().pop(bytes_read);
if (result.first)
{
s.demuxer().operation_immediate(detail::bind_handler(handler, 0,
bytes_read, bytes_read));
return;
}
}
s.async_fill(detail::recv_decode_handler<Buffered_Stream, Decoder, Handler,
Completion_Context>(s, decoder, handler, context));
s.async_fill(detail::recv_decode_handler<Buffered_Stream, Decoder, Handler>(
s, decoder, handler));
}
namespace detail
@ -822,9 +667,9 @@ size_t recv_until(Buffered_Stream& s, std::string& data,
* @param delimiter The pattern marking the end of the data to receive. Copies
* will be made of the string as required.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @param handler The handler to be called when the receive operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is
@ -842,47 +687,6 @@ void async_recv_until(Buffered_Stream& s, std::string& data,
async_recv_decode(s, detail::recv_until_decoder(data, delimiter), handler);
}
/// Start an asynchronous receive that will not complete until the specified
/// delimiter is encountered.
/**
* This function is used to asynchronously receive data from a stream until a
* given delimiter is found. The function call always returns immediately.
*
* @param s The stream on which the data is to be received.
*
* @param data The std:::string object into which the received data will be
* written. Ownership of the object is retained by the caller, which must
* guarantee that it is valid until the handler is called.
*
* @param delimiter The pattern marking the end of the data to receive. Copies
* will be made of the string as required.
*
* @param handler The completion handler to be called when the receive
* operation completes. Copies will be made of the handler as required. The
* equivalent function signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is
* // dependent on the underlying stream's recv
* // operation)
* size_t total_bytes_recvd, // Total number of bytes successfully received
* size_t last_bytes_recvd // Number of bytes received on last recv
* // operation
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made of
* the context object as required, however all copies are equivalent.
*/
template <typename Buffered_Stream, typename Handler,
typename Completion_Context>
void async_recv_until(Buffered_Stream& s, std::string& data,
const std::string& delimiter, Handler handler, Completion_Context context)
{
async_recv_decode(s, detail::recv_until_decoder(data, delimiter), handler,
context);
}
} // namespace asio
#include "asio/detail/pop_options.hpp"

View File

@ -95,9 +95,9 @@ size_t send(Stream& s, const void* data, size_t length,
*
* @param length The size of the data to be sent, in bytes.
*
* @param handler The completion handler to be called when the send operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @param handler The handler to be called when the send operation completes.
* Copies will be made of the handler as required. The equivalent function
* signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is dependent
@ -115,44 +115,6 @@ void async_send(Stream& s, const void* data, size_t length, Handler handler)
s.async_send(data, length, handler);
}
/// Start an asynchronous send.
/**
* This function is used to asynchronously send data on a stream. The function
* call always returns immediately.
*
* @param s The stream on which the data is to be sent.
*
* @param data The data to be sent on the stream. Ownership of the data is
* retained by the caller, which must guarantee that it is valid until the
* handler is called.
*
* @param length The size of the data to be sent, in bytes.
*
* @param handler The completion handler to be called when the send operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is dependent
* // on the underlying stream's send operation)
* size_t bytes_sent // Number of bytes sent
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made of
* the context object as required, however all copies are equivalent.
*
* @note The send operation may not transmit all of the data to the peer.
* Consider using the asio::async_send_n() function if you need to ensure that
* all data is sent before the asynchronous operation completes.
*/
template <typename Stream, typename Handler, typename Completion_Context>
void async_send(Stream& s, const void* data, size_t length, Handler handler,
Completion_Context context)
{
s.async_send(data, length, handler, context);
}
/// Write all of the given data to the stream before returning.
/**
* This function is used to send an exact number of bytes of data on a stream.
@ -249,18 +211,17 @@ size_t send_n(Stream& s, const void* data, size_t length,
namespace detail
{
template <typename Stream, typename Handler, typename Completion_Context>
template <typename Stream, typename Handler>
class send_n_handler
{
public:
send_n_handler(Stream& stream, const void* data, size_t length,
Handler handler, Completion_Context context)
Handler handler)
: stream_(stream),
data_(data),
length_(length),
total_sent_(0),
handler_(handler),
context_(context)
handler_(handler)
{
}
@ -270,8 +231,8 @@ namespace detail
total_sent_ += bytes_sent;
if (e || bytes_sent == 0 || total_sent_ == length_)
{
stream_.demuxer().operation_immediate(detail::bind_handler(handler_, e,
total_sent_, bytes_sent), context_, true);
stream_.demuxer().dispatch(
detail::bind_handler(handler_, e, total_sent_, bytes_sent));
}
else
{
@ -286,7 +247,6 @@ namespace detail
size_t length_;
size_t total_sent_;
Handler handler_;
Completion_Context context_;
};
} // namespace detail
@ -304,9 +264,9 @@ namespace detail
*
* @param length The size of the data to be sent, in bytes.
*
* @param handler The completion handler to be called when the send operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @param handler The handler to be called when the send operation completes.
* Copies will be made of the handler as required. The equivalent function
* signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is
@ -320,47 +280,7 @@ template <typename Stream, typename Handler>
void async_send_n(Stream& s, const void* data, size_t length, Handler handler)
{
async_send(s, data, length,
detail::send_n_handler<Stream, Handler, null_completion_context>(s,
data, length, handler, null_completion_context()));
}
/// Start an asynchronous send that will not complete until the specified
/// amount of data has been sent.
/**
* This function is used to asynchronously send an exact number of bytes of
* data on a stream. The function call always returns immediately.
*
* @param s The stream on which the data is to be sent.
*
* @param data The data to be sent on the stream. Ownership of the data is
* retained by the caller, which must guarantee that it is valid until the
* handler is called.
*
* @param length The size of the data to be sent, in bytes.
*
* @param handler The completion handler to be called when the send operation
* completes. Copies will be made of the handler as required. The equivalent
* function signature of the handler must be:
* @code template <typename Error>
* void handler(
* const Error& error, // Result of operation (the actual type is
* // dependent on the underlying stream's send
* // operation)
* size_t total_bytes_sent, // Total number of bytes successfully sent
* size_t last_bytes_sent // Number of bytes sent on last send operation
* ); @endcode
*
* @param context The completion context which controls the number of
* concurrent invocations of handlers that may be made. Copies will be made of
* the context object as required, however all copies are equivalent.
*/
template <typename Stream, typename Handler, typename Completion_Context>
void async_send_n(Stream& s, const void* data, size_t length, Handler handler,
Completion_Context context)
{
async_send(s, data, length,
detail::send_n_handler<Stream, Handler, Completion_Context>(s, data,
length, handler, context));
detail::send_n_handler<Stream, Handler>(s, data, length, handler));
}
} // namespace asio

View File

@ -37,9 +37,7 @@
<H2>Support Classes</H2>
\li asio::counting_completion_context
\li asio::ipv4::address
\li asio::null_completion_context
\li asio::socket_error
\li asio::timer_base

View File

@ -21,13 +21,12 @@ public:
void send(const chat_message& msg)
{
demuxer_.operation_immediate(
boost::bind(&chat_client::do_send, this, msg));
demuxer_.post(boost::bind(&chat_client::do_send, this, msg));
}
void close()
{
demuxer_.operation_immediate(boost::bind(&chat_client::do_close, this));
demuxer_.post(boost::bind(&chat_client::do_close, this));
}
private:

View File

@ -57,7 +57,6 @@ class session
public:
session(demuxer& d, size_t block_size, stats& s)
: demuxer_(d),
context_(1),
socket_(d),
block_size_(block_size),
recv_data_(new char[block_size]),
@ -88,15 +87,14 @@ public:
{
++unsent_count_;
async_send_n(socket_, send_data_, block_size_,
boost::bind(&session::handle_send, this, _1, _2, _3), context_);
boost::bind(&session::handle_send, this, _1, _2, _3));
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2), context_);
boost::bind(&session::handle_recv, this, _1, _2));
}
void stop()
{
demuxer_.operation_immediate(boost::bind(&stream_socket::close, &socket_),
context_);
demuxer_.post(boost::bind(&stream_socket::close, &socket_));
}
void handle_recv(const socket_error& error, size_t length)
@ -110,9 +108,9 @@ public:
{
std::swap(recv_data_, send_data_);
async_send_n(socket_, send_data_, length,
boost::bind(&session::handle_send, this, _1, _2, _3), context_);
boost::bind(&session::handle_send, this, _1, _2, _3));
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2), context_);
boost::bind(&session::handle_recv, this, _1, _2));
}
}
}
@ -128,16 +126,15 @@ public:
{
std::swap(recv_data_, send_data_);
async_send_n(socket_, send_data_, length,
boost::bind(&session::handle_send, this, _1, _2, _3), context_);
boost::bind(&session::handle_send, this, _1, _2, _3));
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2), context_);
boost::bind(&session::handle_recv, this, _1, _2));
}
}
}
private:
demuxer& demuxer_;
counting_completion_context context_;
stream_socket socket_;
size_t block_size_;
char* recv_data_;
@ -154,7 +151,6 @@ public:
client(demuxer& d, const char* host, short port, size_t block_size,
size_t session_count, int timeout)
: demuxer_(d),
context_(1),
stop_timer_(d, timer::from_now, timeout),
connector_(d),
server_addr_(port, host),
@ -165,10 +161,9 @@ public:
{
session* new_session = new session(demuxer_, block_size, stats_);
connector_.async_connect(new_session->socket(), server_addr_,
boost::bind(&client::handle_connect, this, new_session, _1), context_);
boost::bind(&client::handle_connect, this, new_session, _1));
stop_timer_.async_wait(boost::bind(&client::handle_timeout, this),
context_);
stop_timer_.async_wait(boost::bind(&client::handle_timeout, this));
}
~client()
@ -199,8 +194,7 @@ public:
{
new_session = new session(demuxer_, block_size_, stats_);
connector_.async_connect(new_session->socket(), server_addr_,
boost::bind(&client::handle_connect, this, new_session, _1),
context_);
boost::bind(&client::handle_connect, this, new_session, _1));
}
}
else
@ -211,7 +205,6 @@ public:
private:
demuxer& demuxer_;
counting_completion_context context_;
timer stop_timer_;
socket_connector connector_;
ipv4::address server_addr_;
@ -244,22 +237,23 @@ int main(int argc, char* argv[])
client c(d, host, port, block_size, session_count, timeout);
std::list<detail::thread*> threads;
// Threads not currently supported in this test.
/*std::list<detail::thread*> threads;
while (--thread_count > 0)
{
detail::thread* new_thread =
new detail::thread(boost::bind(&demuxer::run, &d));
threads.push_back(new_thread);
}
}*/
d.run();
while (!threads.empty())
/*while (!threads.empty())
{
threads.front()->join();
delete threads.front();
threads.pop_front();
}
}*/
}
catch (std::exception& e)
{

View File

@ -24,8 +24,7 @@ class session
{
public:
session(demuxer& d, size_t block_size)
: context_(1),
socket_(d),
: socket_(d),
block_size_(block_size),
recv_data_(new char[block_size]),
send_data_(new char[block_size]),
@ -49,7 +48,7 @@ public:
{
++op_count_;
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2), context_);
boost::bind(&session::handle_recv, this, _1, _2));
}
void handle_recv(const socket_error& error, size_t length)
@ -62,9 +61,9 @@ public:
op_count_ += 2;
std::swap(recv_data_, send_data_);
async_send_n(socket_, send_data_, length,
boost::bind(&session::handle_send, this, _1, _2, _3), context_);
boost::bind(&session::handle_send, this, _1, _2, _3));
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2), context_);
boost::bind(&session::handle_recv, this, _1, _2));
}
}
else if (--op_count_ == 0)
@ -83,9 +82,9 @@ public:
op_count_ += 2;
std::swap(recv_data_, send_data_);
async_send_n(socket_, send_data_, length,
boost::bind(&session::handle_send, this, _1, _2, _3), context_);
boost::bind(&session::handle_send, this, _1, _2, _3));
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2), context_);
boost::bind(&session::handle_recv, this, _1, _2));
}
}
else if (--op_count_ == 0)
@ -95,7 +94,6 @@ public:
}
private:
counting_completion_context context_;
stream_socket socket_;
size_t block_size_;
char* recv_data_;
@ -163,22 +161,23 @@ int main(int argc, char* argv[])
server s(d, port, block_size);
std::list<detail::thread*> threads;
// Threads not currently supported in this test.
/*std::list<detail::thread*> threads;
while (--thread_count > 0)
{
detail::thread* new_thread =
new detail::thread(boost::bind(&demuxer::run, &d));
threads.push_back(new_thread);
}
}*/
d.run();
while (!threads.empty())
/*while (!threads.empty())
{
threads.front()->join();
delete threads.front();
threads.pop_front();
}
}*/
}
catch (std::exception& e)
{

View File

@ -31,9 +31,9 @@ void decrement_to_zero(demuxer* d, int* count)
--(*count);
int before_value = *count;
d->operation_immediate(boost::bind(decrement_to_zero, d, count));
d->post(boost::bind(decrement_to_zero, d, count));
// Completion cannot nest, so count value should remain unchanged.
// Handler execution cannot nest, so count value should remain unchanged.
UNIT_TEST_CHECK(*count == before_value);
}
}
@ -44,10 +44,9 @@ void nested_decrement_to_zero(demuxer* d, int* count)
{
--(*count);
d->operation_immediate(boost::bind(nested_decrement_to_zero, d, count),
null_completion_context(), true);
d->dispatch(boost::bind(nested_decrement_to_zero, d, count));
// Completion is nested, so count value should now be zero.
// Handler execution is nested, so count value should now be zero.
UNIT_TEST_CHECK(*count == 0);
}
}
@ -66,11 +65,10 @@ void start_sleep_increments(demuxer* d, int* count)
timer t(*d, timer::from_now, 2);
t.wait();
// Start three increments which cannot run in parallel.
counting_completion_context ctx1(1);
d->operation_immediate(boost::bind(sleep_increment, d, count), ctx1);
d->operation_immediate(boost::bind(sleep_increment, d, count), ctx1);
d->operation_immediate(boost::bind(sleep_increment, d, count), ctx1);
// Start three increments.
d->post(boost::bind(sleep_increment, d, count));
d->post(boost::bind(sleep_increment, d, count));
d->post(boost::bind(sleep_increment, d, count));
}
void demuxer_test()
@ -78,111 +76,99 @@ void demuxer_test()
demuxer d;
int count = 0;
d.operation_immediate(boost::bind(increment, &count));
d.post(boost::bind(increment, &count));
// No completions can be delivered until run() is called.
// No handlers can be called until run() is called.
UNIT_TEST_CHECK(count == 0);
d.run();
// The run() call will not return until all operations have finished.
// The run() call will not return until all work has finished.
UNIT_TEST_CHECK(count == 1);
count = 0;
d.reset();
d.operation_immediate(boost::bind(increment, &count));
d.operation_immediate(boost::bind(increment, &count));
d.operation_immediate(boost::bind(increment, &count));
d.operation_immediate(boost::bind(increment, &count));
d.operation_immediate(boost::bind(increment, &count));
d.post(boost::bind(increment, &count));
d.post(boost::bind(increment, &count));
d.post(boost::bind(increment, &count));
d.post(boost::bind(increment, &count));
d.post(boost::bind(increment, &count));
// No completions can be delivered until run() is called.
// No handlers can be called until run() is called.
UNIT_TEST_CHECK(count == 0);
d.run();
// The run() call will not return until all operations have finished.
// The run() call will not return until all work has finished.
UNIT_TEST_CHECK(count == 5);
count = 0;
d.reset();
d.operation_started();
d.operation_immediate(boost::bind(&demuxer::interrupt, &d));
d.work_started();
d.post(boost::bind(&demuxer::interrupt, &d));
d.run();
// The only operation executed should have been to interrupt run().
UNIT_TEST_CHECK(count == 0);
d.reset();
d.operation_completed(boost::bind(increment, &count));
d.post(boost::bind(increment, &count));
d.work_finished();
// No completions can be delivered until run() is called.
// No handlers can be called until run() is called.
UNIT_TEST_CHECK(count == 0);
d.run();
// The run() call will not return until all operations have finished.
// The run() call will not return until all work has finished.
UNIT_TEST_CHECK(count == 1);
count = 10;
d.reset();
d.operation_immediate(boost::bind(decrement_to_zero, &d, &count));
d.post(boost::bind(decrement_to_zero, &d, &count));
// No completions can be delivered until run() is called.
// No handlers can be called until run() is called.
UNIT_TEST_CHECK(count == 10);
d.run();
// The run() call will not return until all operations have finished.
// The run() call will not return until all work has finished.
UNIT_TEST_CHECK(count == 0);
count = 10;
d.reset();
d.operation_immediate(boost::bind(nested_decrement_to_zero, &d, &count));
d.post(boost::bind(nested_decrement_to_zero, &d, &count));
// No completions can be delivered until run() is called.
// No handlers can be called until run() is called.
UNIT_TEST_CHECK(count == 10);
d.run();
// The run() call will not return until all operations have finished.
// The run() call will not return until all work has finished.
UNIT_TEST_CHECK(count == 0);
count = 10;
d.reset();
d.operation_immediate(boost::bind(nested_decrement_to_zero, &d, &count),
null_completion_context(), true);
d.dispatch(boost::bind(nested_decrement_to_zero, &d, &count));
// No completions can be delivered until run() is called, even though nested
// No handlers can be called until run() is called, even though nested
// delivery was specifically allowed in the previous call.
UNIT_TEST_CHECK(count == 10);
d.run();
// The run() call will not return until all operations have finished.
// The run() call will not return until all work has finished.
UNIT_TEST_CHECK(count == 0);
count = 0;
d.reset();
d.operation_immediate(boost::bind(start_sleep_increments, &d, &count));
d.post(boost::bind(start_sleep_increments, &d, &count));
detail::thread thread1(boost::bind(&demuxer::run, &d));
detail::thread thread2(boost::bind(&demuxer::run, &d));
// Check all events run one after another even though there are two threads.
timer timer1(d, timer::from_now, 3);
timer1.wait();
UNIT_TEST_CHECK(count == 0);
timer1.set(timer::from_existing, 2);
timer1.wait();
UNIT_TEST_CHECK(count == 1);
timer1.set(timer::from_existing, 2);
timer1.wait();
UNIT_TEST_CHECK(count == 2);
thread1.join();
thread2.join();
// The run() calls will not return until all operations have finished.
// The run() calls will not return until all work has finished.
UNIT_TEST_CHECK(count == 3);
}