Update io_service::strand to satisfy the Executor requirements.

This commit is contained in:
Christopher Kohlhoff 2014-10-05 15:21:42 +11:00
parent 5925ceb3d8
commit daba8047e0
7 changed files with 177 additions and 45 deletions

View File

@ -110,7 +110,9 @@ public:
{
}
/// Get the io_service associated with the strand.
#if !defined(ASIO_NO_DEPRECATED)
/// (Deprecated: Use context().) Get the io_service associated with the
/// strand.
/**
* This function may be used to obtain the io_service object that the strand
* uses to dispatch handlers for asynchronous operations.
@ -122,8 +124,58 @@ public:
{
return service_.get_io_service();
}
#endif // !defined(ASIO_NO_DEPRECATED)
/// Request the strand to invoke the given handler.
/// Obtain the underlying execution context.
asio::io_service& context() ASIO_NOEXCEPT
{
return service_.get_io_service();
}
/// Inform the strand that it has some outstanding work to do.
/**
* The strand delegates this call to its underlying io_service.
*/
void on_work_started() ASIO_NOEXCEPT
{
context().get_executor().on_work_started();
}
/// Inform the strand that some work is no longer outstanding.
/**
* The strand delegates this call to its underlying io_service.
*/
void on_work_finished() ASIO_NOEXCEPT
{
context().get_executor().on_work_finished();
}
/// Request the strand to invoke the given function object.
/**
* This function is used to ask the strand to execute the given function
* object on its underlying io_service. The function object will be executed
* inside this function if the strand is not otherwise busy and if the
* underlying io_service's executor's @c dispatch() function is also able to
* execute the function before returning.
*
* @param f The function object to be called. The executor will make
* a copy of the handler object as required. The function signature of the
* function object must be: @code void function(); @endcode
*
* @param a An allocator that may be used by the executor to allocate the
* internal storage needed for function invocation.
*/
template <typename Function, typename Allocator>
void dispatch(ASIO_MOVE_ARG(Function) f, const Allocator& a)
{
typename decay<Function>::type tmp(ASIO_MOVE_CAST(Function)(f));
service_.dispatch(impl_, tmp);
(void)a;
}
#if !defined(ASIO_NO_DEPRECATED)
/// (Deprecated: Use asio::dispatch().) Request the strand to invoke
/// the given handler.
/**
* This function is used to ask the strand to execute the given handler.
*
@ -156,9 +208,32 @@ public:
return init.result.get();
}
#endif // !defined(ASIO_NO_DEPRECATED)
/// Request the strand to invoke the given handler and return
/// immediately.
/// Request the strand to invoke the given function object.
/**
* This function is used to ask the executor to execute the given function
* object. The function object will never be executed inside this function.
* Instead, it will be scheduled to run by the underlying io_service.
*
* @param f The function object to be called. The executor will make
* a copy of the handler object as required. The function signature of the
* function object must be: @code void function(); @endcode
*
* @param a An allocator that may be used by the executor to allocate the
* internal storage needed for function invocation.
*/
template <typename Function, typename Allocator>
void post(ASIO_MOVE_ARG(Function) f, const Allocator& a)
{
typename decay<Function>::type tmp(ASIO_MOVE_CAST(Function)(f));
service_.post(impl_, tmp);
(void)a;
}
#if !defined(ASIO_NO_DEPRECATED)
/// (Deprecated: Use asio::post().) Request the strand to invoke the
/// given handler and return immediately.
/**
* This function is used to ask the strand to execute the given handler, but
* without allowing the strand to call the handler from inside this function.
@ -187,9 +262,32 @@ public:
return init.result.get();
}
#endif // !defined(ASIO_NO_DEPRECATED)
/// Create a new handler that automatically dispatches the wrapped handler
/// on the strand.
/// Request the strand to invoke the given function object.
/**
* This function is used to ask the executor to execute the given function
* object. The function object will never be executed inside this function.
* Instead, it will be scheduled to run by the underlying io_service.
*
* @param f The function object to be called. The executor will make
* a copy of the handler object as required. The function signature of the
* function object must be: @code void function(); @endcode
*
* @param a An allocator that may be used by the executor to allocate the
* internal storage needed for function invocation.
*/
template <typename Function, typename Allocator>
void defer(ASIO_MOVE_ARG(Function) f, const Allocator& a)
{
typename decay<Function>::type tmp(ASIO_MOVE_CAST(Function)(f));
service_.post(impl_, tmp);
(void)a;
}
#if !defined(ASIO_NO_DEPRECATED)
/// (Deprecated: Use asio::wrap().) Create a new handler that
/// automatically dispatches the wrapped handler on the strand.
/**
* This function is used to create a new handler function object that, when
* invoked, will automatically pass the wrapped handler to the strand's
@ -220,6 +318,7 @@ public:
return detail::wrapped_handler<io_service::strand, Handler,
detail::is_continuation_if_running>(*this, handler);
}
#endif // !defined(ASIO_NO_DEPRECATED)
/// Determine whether the strand is running in the current thread.
/**
@ -227,16 +326,41 @@ public:
* submitted to the strand using post(), dispatch() or wrap(). Otherwise
* returns @c false.
*/
bool running_in_this_thread() const
bool running_in_this_thread() const ASIO_NOEXCEPT
{
return service_.running_in_this_thread(impl_);
}
/// Compare two strands for equality.
/**
* Two strands are equal if they refer to the same ordered, non-concurrent
* state.
*/
friend bool operator==(const strand& a, const strand& b) ASIO_NOEXCEPT
{
return a.impl_ == b.impl_;
}
/// Compare two strands for inequality.
/**
* Two strands are equal if they refer to the same ordered, non-concurrent
* state.
*/
friend bool operator!=(const strand& a, const strand& b) ASIO_NOEXCEPT
{
return a.impl_ != b.impl_;
}
private:
asio::detail::strand_service& service_;
asio::detail::strand_service::implementation_type impl_;
};
#if !defined(GENERATING_DOCUMENTATION)
template <>
struct is_executor<io_service::strand> : true_type {};
#endif // !defined(GENERATING_DOCUMENTATION)
} // namespace asio
#include "asio/detail/pop_options.hpp"

View File

@ -194,11 +194,11 @@ public:
executor_, ASIO_MOVE_CAST(Function)(f), a);
}
/// Request the system executor to invoke the given function object.
/// Request the strand to invoke the given function object.
/**
* This function is used to ask the executor to execute the given function
* object. The function object will never be executed inside this function.
* Instead, it will be scheduled to run on an unspecified system thread pool.
* Instead, it will be scheduled by the underlying executor's defer function.
*
* @param f The function object to be called. The executor will make
* a copy of the handler object as required. The function signature of the
@ -214,11 +214,11 @@ public:
executor_, ASIO_MOVE_CAST(Function)(f), a);
}
/// Request the system executor to invoke the given function object.
/// Request the strand to invoke the given function object.
/**
* This function is used to ask the executor to execute the given function
* object. The function object will never be executed inside this function.
* Instead, it will be scheduled to run on an unspecified system thread pool.
* Instead, it will be scheduled by the underlying executor's defer function.
*
* @param f The function object to be called. The executor will make
* a copy of the handler object as required. The function signature of the

View File

@ -32,7 +32,7 @@ asio::ip::tcp::socket& connection::socket()
void connection::start()
{
socket_.async_read_some(asio::buffer(buffer_),
strand_.wrap(
asio::wrap(strand_,
boost::bind(&connection::handle_read, shared_from_this(),
asio::placeholders::error,
asio::placeholders::bytes_transferred)));
@ -51,7 +51,7 @@ void connection::handle_read(const asio::error_code& e,
{
request_handler_.handle_request(request_, reply_);
asio::async_write(socket_, reply_.to_buffers(),
strand_.wrap(
asio::wrap(strand_,
boost::bind(&connection::handle_write, shared_from_this(),
asio::placeholders::error)));
}
@ -59,14 +59,14 @@ void connection::handle_read(const asio::error_code& e,
{
reply_ = reply::stock_reply(reply::bad_request);
asio::async_write(socket_, reply_.to_buffers(),
strand_.wrap(
asio::wrap(strand_,
boost::bind(&connection::handle_write, shared_from_this(),
asio::placeholders::error)));
}
else
{
socket_.async_read_some(asio::buffer(buffer_),
strand_.wrap(
asio::wrap(strand_,
boost::bind(&connection::handle_read, shared_from_this(),
asio::placeholders::error,
asio::placeholders::bytes_transferred)));

View File

@ -22,8 +22,11 @@ public:
timer2_(io, boost::posix_time::seconds(1)),
count_(0)
{
timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
timer1_.async_wait(asio::wrap(strand_,
boost::bind(&printer::print1, this)));
timer2_.async_wait(asio::wrap(strand_,
boost::bind(&printer::print2, this)));
}
~printer()
@ -39,7 +42,9 @@ public:
++count_;
timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
timer1_.async_wait(asio::wrap(strand_,
boost::bind(&printer::print1, this)));
}
}
@ -51,7 +56,9 @@ public:
++count_;
timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
timer2_.async_wait(asio::wrap(strand_,
boost::bind(&printer::print2, this)));
}
}

View File

@ -77,13 +77,13 @@ public:
void start(asio::ip::tcp::resolver::iterator endpoint_iterator)
{
asio::async_connect(socket_, endpoint_iterator,
strand_.wrap(boost::bind(&session::handle_connect, this,
asio::wrap(strand_,boost::bind(&session::handle_connect, this,
asio::placeholders::error)));
}
void stop()
{
strand_.post(boost::bind(&session::close_socket, this));
asio::post(strand_, boost::bind(&session::close_socket, this));
}
private:
@ -98,13 +98,13 @@ private:
{
++unwritten_count_;
async_write(socket_, asio::buffer(write_data_, block_size_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(write_allocator_,
boost::bind(&session::handle_write, this,
asio::placeholders::error,
asio::placeholders::bytes_transferred))));
socket_.async_read_some(asio::buffer(read_data_, block_size_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(read_allocator_,
boost::bind(&session::handle_read, this,
asio::placeholders::error,
@ -125,13 +125,13 @@ private:
{
std::swap(read_data_, write_data_);
async_write(socket_, asio::buffer(write_data_, read_data_length_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(write_allocator_,
boost::bind(&session::handle_write, this,
asio::placeholders::error,
asio::placeholders::bytes_transferred))));
socket_.async_read_some(asio::buffer(read_data_, block_size_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(read_allocator_,
boost::bind(&session::handle_read, this,
asio::placeholders::error,
@ -151,13 +151,13 @@ private:
{
std::swap(read_data_, write_data_);
async_write(socket_, asio::buffer(write_data_, read_data_length_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(write_allocator_,
boost::bind(&session::handle_write, this,
asio::placeholders::error,
asio::placeholders::bytes_transferred))));
socket_.async_read_some(asio::buffer(read_data_, block_size_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(read_allocator_,
boost::bind(&session::handle_read, this,
asio::placeholders::error,

View File

@ -51,7 +51,7 @@ public:
{
++op_count_;
socket_.async_read_some(asio::buffer(read_data_, block_size_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(read_allocator_,
boost::bind(&session::handle_read, this,
asio::placeholders::error,
@ -76,12 +76,12 @@ public:
op_count_ += 2;
std::swap(read_data_, write_data_);
async_write(socket_, asio::buffer(write_data_, read_data_length_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(write_allocator_,
boost::bind(&session::handle_write, this,
asio::placeholders::error))));
socket_.async_read_some(asio::buffer(read_data_, block_size_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(read_allocator_,
boost::bind(&session::handle_read, this,
asio::placeholders::error,
@ -105,12 +105,12 @@ public:
op_count_ += 2;
std::swap(read_data_, write_data_);
async_write(socket_, asio::buffer(write_data_, read_data_length_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(write_allocator_,
boost::bind(&session::handle_write, this,
asio::placeholders::error))));
socket_.async_read_some(asio::buffer(read_data_, block_size_),
strand_.wrap(
asio::wrap(strand_,
make_custom_alloc_handler(read_allocator_,
boost::bind(&session::handle_read, this,
asio::placeholders::error,

View File

@ -18,6 +18,7 @@
#include <sstream>
#include "asio/io_service.hpp"
#include "asio/post.hpp"
#include "asio/thread.hpp"
#include "unit_test.hpp"
@ -119,7 +120,7 @@ void strand_test()
io_service::strand s(ios);
int count = 0;
ios.post(bindns::bind(increment_without_lock, &s, &count));
post(ios, bindns::bind(increment_without_lock, &s, &count));
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
@ -131,7 +132,7 @@ void strand_test()
count = 0;
ios.restart();
s.post(bindns::bind(increment_with_lock, &s, &count));
post(s, bindns::bind(increment_with_lock, &s, &count));
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
@ -143,7 +144,7 @@ void strand_test()
count = 0;
ios.restart();
ios.post(bindns::bind(start_sleep_increments, &ios, &s, &count));
post(ios, bindns::bind(start_sleep_increments, &ios, &s, &count));
thread thread1(bindns::bind(io_service_run, &ios));
thread thread2(bindns::bind(io_service_run, &ios));
@ -155,14 +156,14 @@ void strand_test()
timer1.expires_at(timer1.expires_at() + chronons::seconds(2));
#else // defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.expires_at(timer1.expiry() + chronons::seconds(2));
#endif defined(ASIO_HAS_BOOST_DATE_TIME)
#endif // defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.wait();
ASIO_CHECK(count == 1);
#if defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.expires_at(timer1.expires_at() + chronons::seconds(2));
#else // defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.expires_at(timer1.expiry() + chronons::seconds(2));
#endif defined(ASIO_HAS_BOOST_DATE_TIME)
#endif // defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.wait();
ASIO_CHECK(count == 2);
@ -175,11 +176,11 @@ void strand_test()
count = 0;
int exception_count = 0;
ios.restart();
s.post(throw_exception);
s.post(bindns::bind(increment, &count));
s.post(bindns::bind(increment, &count));
s.post(throw_exception);
s.post(bindns::bind(increment, &count));
post(s, throw_exception);
post(s, bindns::bind(increment, &count));
post(s, bindns::bind(increment, &count));
post(s, throw_exception);
post(s, bindns::bind(increment, &count));
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
@ -209,9 +210,9 @@ void strand_test()
// are abandoned.
{
io_service::strand s2(ios);
s2.post(bindns::bind(increment, &count));
s2.post(bindns::bind(increment, &count));
s2.post(bindns::bind(increment, &count));
post(s2, bindns::bind(increment, &count));
post(s2, bindns::bind(increment, &count));
post(s2, bindns::bind(increment, &count));
}
// No handlers can be called until run() is called.