Add a make_strand function.

The make_strand function creates a strand with a deduced Executor template
argument.
This commit is contained in:
Christopher Kohlhoff 2019-03-01 08:58:38 +11:00
parent 52eef463ef
commit fff63d15a7
10 changed files with 383 additions and 107 deletions

View File

@ -271,6 +271,33 @@ private:
implementation_type impl_;
};
/** @defgroup make_strand asio::make_strand
*
* @brief The asio::make_strand function creates a @ref strand object for
* an executor or execution context.
*/
/*@{*/
/// Create a @ref strand object for an executor.
template <typename Executor>
inline strand<Executor> make_strand(const Executor& ex,
typename enable_if<is_executor<Executor>::value>::type* = 0)
{
return strand<Executor>(ex);
}
/// Create a @ref strand object for an execution context.
template <typename ExecutionContext>
inline strand<typename ExecutionContext::executor_type>
make_strand(ExecutionContext& ctx,
typename enable_if<
is_convertible<ExecutionContext&, execution_context&>::value>::type* = 0)
{
return strand<typename ExecutionContext::executor_type>(ctx.get_executor());
}
/*@}*/
} // namespace asio
#include "asio/detail/pop_options.hpp"

View File

@ -150,6 +150,7 @@ UNIT_TEST_EXES = \
tests\unit\generic\stream_protocol.exe \
tests\unit\high_resolution_timer.exe \
tests\unit\io_context.exe \
tests\unit\io_context_strand.exe \
tests\unit\ip\address.exe \
tests\unit\ip\address_v4.exe \
tests\unit\ip\address_v4_iterator.exe \

View File

@ -18,7 +18,7 @@ namespace server3 {
connection::connection(asio::io_context& io_context,
request_handler& handler)
: strand_(io_context),
: strand_(asio::make_strand(io_context)),
socket_(strand_),
request_handler_(handler)
{

View File

@ -49,7 +49,7 @@ private:
void handle_write(const asio::error_code& e);
/// Strand to ensure the connection's handlers are not called concurrently.
asio::io_context::strand strand_;
asio::strand<asio::io_context::executor_type> strand_;
/// Socket for the connection.
asio::ip::tcp::socket socket_;

View File

@ -24,7 +24,7 @@ class session : public boost::enable_shared_from_this<session>
{
public:
explicit session(asio::io_context& io_context)
: strand_(io_context),
: strand_(asio::make_strand(io_context)),
socket_(io_context),
timer_(io_context)
{
@ -76,7 +76,7 @@ private:
}
}
asio::io_context::strand strand_;
asio::strand<asio::io_context::executor_type> strand_;
tcp::socket socket_;
asio::steady_timer timer_;
};

View File

@ -16,7 +16,7 @@ class printer
{
public:
printer(asio::io_context& io)
: strand_(io),
: strand_(asio::make_strand(io)),
timer1_(io, asio::chrono::seconds(1)),
timer2_(io, asio::chrono::seconds(1)),
count_(0)
@ -62,7 +62,7 @@ public:
}
private:
asio::io_context::strand strand_;
asio::strand<asio::io_context::executor_type> strand_;
asio::steady_timer timer1_;
asio::steady_timer timer2_;
int count_;

View File

@ -287,7 +287,7 @@ Return to \ref tuttimer4
/**
\page tuttimer5 Timer.5 - Synchronising handlers in multithreaded programs
This tutorial demonstrates the use of the asio::io_context::strand class to
This tutorial demonstrates the use of the asio::strand class template to
synchronise callback handlers in a multithreaded program.
The previous four tutorials avoided the issue of handler synchronisation by
@ -324,25 +324,25 @@ tutorial by running two timers in parallel.
In addition to initialising a pair of asio::steady_timer members, the
constructor initialises the <tt>strand_</tt> member, an object of type
asio::io_context::strand.
asio::strand<asio::io_context::executor_type>.
An asio::io_context::strand is an executor that guarantees that, for those
handlers that are dispatched through it, an executing handler will be allowed
to complete before the next one is started. This is guaranteed irrespective of
the number of threads that are calling asio::io_context::run(). Of course, the
handlers may still execute concurrently with other handlers that were
<b>not</b> dispatched through an asio::io_context::strand, or were dispatched
through a different asio::io_context::strand object.
The asio::strand class template is an executor adapter that guarantees
that, for those handlers that are dispatched through it, an executing handler
will be allowed to complete before the next one is started. This is guaranteed
irrespective of the number of threads that are calling
asio::io_context::run(). Of course, the handlers may still execute
concurrently with other handlers that were <b>not</b> dispatched through an
asio::strand, or were dispatched through a different asio::strand
object.
\until {
When initiating the asynchronous operations, each callback handler is "bound"
to an asio::io_context::strand object. The
asio::io_context::strand::bind_executor() function returns a new handler that
automatically dispatches its contained handler through the
asio::io_context::strand object. By binding the handlers to the same
asio::io_context::strand, we are ensuring that they cannot execute
concurrently.
to an asio::strand<asio::io_context::executor_type> object. The
asio::bind_executor() function returns a new handler that automatically
dispatches its contained handler through the asio::strand object. By
binding the handlers to the same asio::strand, we are ensuring that they
cannot execute concurrently.
\until }
\until }

View File

@ -50,6 +50,7 @@ check_PROGRAMS = \
unit/generic/stream_protocol \
unit/high_resolution_timer \
unit/io_context \
unit/io_context_strand \
unit/ip/address \
unit/ip/address_v4 \
unit/ip/address_v4_iterator \
@ -182,6 +183,7 @@ TESTS = \
unit/executor_work_guard \
unit/high_resolution_timer \
unit/io_context \
unit/io_context_strand \
unit/ip/address \
unit/ip/address_v4 \
unit/ip/address_v4_iterator \
@ -324,6 +326,7 @@ unit_generic_seq_packet_protocol_SOURCES = unit/generic/seq_packet_protocol.cpp
unit_generic_stream_protocol_SOURCES = unit/generic/stream_protocol.cpp
unit_high_resolution_timer_SOURCES = unit/high_resolution_timer.cpp
unit_io_context_SOURCES = unit/io_context.cpp
unit_io_context_strand_SOURCES = unit/io_context_strand.cpp
unit_ip_address_SOURCES = unit/ip/address.cpp
unit_ip_address_v4_SOURCES = unit/ip/address_v4.cpp
unit_ip_address_v4_iterator_SOURCES = unit/ip/address_v4_iterator.cpp

View File

@ -0,0 +1,325 @@
//
// io_context_strand.cpp
// ~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Disable autolinking for unit tests.
#if !defined(BOOST_ALL_NO_LIB)
#define BOOST_ALL_NO_LIB 1
#endif // !defined(BOOST_ALL_NO_LIB)
// Test that header file is self-contained.
#include "asio/io_context_strand.hpp"
#include <sstream>
#include "asio/io_context.hpp"
#include "asio/dispatch.hpp"
#include "asio/post.hpp"
#include "asio/thread.hpp"
#include "unit_test.hpp"
#if defined(ASIO_HAS_BOOST_DATE_TIME)
# include "asio/deadline_timer.hpp"
#else // defined(ASIO_HAS_BOOST_DATE_TIME)
# include "asio/steady_timer.hpp"
#endif // defined(ASIO_HAS_BOOST_DATE_TIME)
#if defined(ASIO_HAS_BOOST_BIND)
# include <boost/bind.hpp>
#else // defined(ASIO_HAS_BOOST_BIND)
# include <functional>
#endif // defined(ASIO_HAS_BOOST_BIND)
using namespace asio;
#if defined(ASIO_HAS_BOOST_BIND)
namespace bindns = boost;
#else // defined(ASIO_HAS_BOOST_BIND)
namespace bindns = std;
#endif
#if defined(ASIO_HAS_BOOST_DATE_TIME)
typedef deadline_timer timer;
namespace chronons = boost::posix_time;
#elif defined(ASIO_HAS_CHRONO)
typedef steady_timer timer;
namespace chronons = asio::chrono;
#endif // defined(ASIO_HAS_BOOST_DATE_TIME)
void increment(int* count)
{
++(*count);
}
void increment_without_lock(io_context::strand* s, int* count)
{
ASIO_CHECK(!s->running_in_this_thread());
int original_count = *count;
dispatch(*s, bindns::bind(increment, count));
// No other functions are currently executing through the locking dispatcher,
// so the previous call to dispatch should have successfully nested.
ASIO_CHECK(*count == original_count + 1);
}
void increment_with_lock(io_context::strand* s, int* count)
{
ASIO_CHECK(s->running_in_this_thread());
int original_count = *count;
dispatch(*s, bindns::bind(increment, count));
// The current function already holds the strand's lock, so the
// previous call to dispatch should have successfully nested.
ASIO_CHECK(*count == original_count + 1);
}
void sleep_increment(io_context* ioc, int* count)
{
timer t(*ioc, chronons::seconds(2));
t.wait();
++(*count);
}
void increment_by_a(int* count, int a)
{
(*count) += a;
}
void increment_by_a_b(int* count, int a, int b)
{
(*count) += a + b;
}
void increment_by_a_b_c(int* count, int a, int b, int c)
{
(*count) += a + b + c;
}
void increment_by_a_b_c_d(int* count, int a, int b, int c, int d)
{
(*count) += a + b + c + d;
}
void start_sleep_increments(io_context* ioc, io_context::strand* s, int* count)
{
// Give all threads a chance to start.
timer t(*ioc, chronons::seconds(2));
t.wait();
// Start three increments.
post(*s, bindns::bind(sleep_increment, ioc, count));
post(*s, bindns::bind(sleep_increment, ioc, count));
post(*s, bindns::bind(sleep_increment, ioc, count));
}
void throw_exception()
{
throw 1;
}
void io_context_run(io_context* ioc)
{
ioc->run();
}
void strand_test()
{
io_context ioc;
io_context::strand s(ioc);
int count = 0;
post(ioc, bindns::bind(increment_without_lock, &s, &count));
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.run();
// The run() call will not return until all work has finished.
ASIO_CHECK(count == 1);
count = 0;
ioc.restart();
post(s, bindns::bind(increment_with_lock, &s, &count));
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.run();
// The run() call will not return until all work has finished.
ASIO_CHECK(count == 1);
count = 0;
ioc.restart();
post(ioc, bindns::bind(start_sleep_increments, &ioc, &s, &count));
thread thread1(bindns::bind(io_context_run, &ioc));
thread thread2(bindns::bind(io_context_run, &ioc));
// Check all events run one after another even though there are two threads.
timer timer1(ioc, chronons::seconds(3));
timer1.wait();
ASIO_CHECK(count == 0);
#if defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.expires_at(timer1.expires_at() + chronons::seconds(2));
#else // defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.expires_at(timer1.expiry() + chronons::seconds(2));
#endif // defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.wait();
ASIO_CHECK(count == 1);
#if defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.expires_at(timer1.expires_at() + chronons::seconds(2));
#else // defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.expires_at(timer1.expiry() + chronons::seconds(2));
#endif // defined(ASIO_HAS_BOOST_DATE_TIME)
timer1.wait();
ASIO_CHECK(count == 2);
thread1.join();
thread2.join();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 3);
count = 0;
int exception_count = 0;
ioc.restart();
post(s, throw_exception);
post(s, bindns::bind(increment, &count));
post(s, bindns::bind(increment, &count));
post(s, throw_exception);
post(s, bindns::bind(increment, &count));
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ASIO_CHECK(exception_count == 0);
for (;;)
{
try
{
ioc.run();
break;
}
catch (int)
{
++exception_count;
}
}
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 3);
ASIO_CHECK(exception_count == 2);
count = 0;
ioc.restart();
// Check for clean shutdown when handlers posted through an orphaned strand
// are abandoned.
{
io_context::strand s2(ioc);
post(s2, bindns::bind(increment, &count));
post(s2, bindns::bind(increment, &count));
post(s2, bindns::bind(increment, &count));
}
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
}
void strand_wrap_test()
{
#if !defined(ASIO_NO_DEPRECATED)
io_context ioc;
io_context::strand s(ioc);
int count = 0;
s.wrap(bindns::bind(increment, &count))();
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 1);
count = 0;
s.wrap(increment)(&count);
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 1);
count = 0;
s.wrap(increment_by_a)(&count, 1);
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 1);
count = 0;
s.wrap(increment_by_a_b)(&count, 1, 2);
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 3);
count = 0;
s.wrap(increment_by_a_b_c)(&count, 1, 2, 3);
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 6);
count = 0;
s.wrap(increment_by_a_b_c_d)(&count, 1, 2, 3, 4);
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 10);
#endif // !defined(ASIO_NO_DEPRECATED)
}
ASIO_TEST_SUITE
(
"strand",
ASIO_TEST_CASE(strand_test)
ASIO_TEST_CASE(strand_wrap_test)
)

View File

@ -56,7 +56,7 @@ void increment(int* count)
++(*count);
}
void increment_without_lock(io_context::strand* s, int* count)
void increment_without_lock(strand<io_context::executor_type>* s, int* count)
{
ASIO_CHECK(!s->running_in_this_thread());
@ -69,7 +69,7 @@ void increment_without_lock(io_context::strand* s, int* count)
ASIO_CHECK(*count == original_count + 1);
}
void increment_with_lock(io_context::strand* s, int* count)
void increment_with_lock(strand<io_context::executor_type>* s, int* count)
{
ASIO_CHECK(s->running_in_this_thread());
@ -110,7 +110,8 @@ void increment_by_a_b_c_d(int* count, int a, int b, int c, int d)
(*count) += a + b + c + d;
}
void start_sleep_increments(io_context* ioc, io_context::strand* s, int* count)
void start_sleep_increments(io_context* ioc,
strand<io_context::executor_type>* s, int* count)
{
// Give all threads a chance to start.
timer t(*ioc, chronons::seconds(2));
@ -135,7 +136,7 @@ void io_context_run(io_context* ioc)
void strand_test()
{
io_context ioc;
io_context::strand s(ioc);
strand<io_context::executor_type> s = make_strand(ioc);
int count = 0;
post(ioc, bindns::bind(increment_without_lock, &s, &count));
@ -227,7 +228,7 @@ void strand_test()
// Check for clean shutdown when handlers posted through an orphaned strand
// are abandoned.
{
io_context::strand s2(ioc);
strand<io_context::executor_type> s2 = make_strand(ioc.get_executor());
post(s2, bindns::bind(increment, &count));
post(s2, bindns::bind(increment, &count));
post(s2, bindns::bind(increment, &count));
@ -237,89 +238,8 @@ void strand_test()
ASIO_CHECK(count == 0);
}
void strand_wrap_test()
{
#if !defined(ASIO_NO_DEPRECATED)
io_context ioc;
io_context::strand s(ioc);
int count = 0;
s.wrap(bindns::bind(increment, &count))();
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 1);
count = 0;
s.wrap(increment)(&count);
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 1);
count = 0;
s.wrap(increment_by_a)(&count, 1);
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 1);
count = 0;
s.wrap(increment_by_a_b)(&count, 1, 2);
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 3);
count = 0;
s.wrap(increment_by_a_b_c)(&count, 1, 2, 3);
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 6);
count = 0;
s.wrap(increment_by_a_b_c_d)(&count, 1, 2, 3, 4);
// No handlers can be called until run() is called.
ASIO_CHECK(count == 0);
ioc.restart();
ioc.run();
// The run() calls will not return until all work has finished.
ASIO_CHECK(count == 10);
#endif // !defined(ASIO_NO_DEPRECATED)
}
ASIO_TEST_SUITE
(
"strand",
ASIO_TEST_CASE(strand_test)
ASIO_TEST_CASE(strand_wrap_test)
)