Add polymorphic executor wrapper.

This commit is contained in:
Christopher Kohlhoff 2014-10-04 16:42:09 +10:00
parent b261967aee
commit 652a788b11
13 changed files with 1317 additions and 1 deletions

View File

@ -274,6 +274,7 @@ nobase_include_HEADERS = \
asio/error.hpp \
asio/execution_context.hpp \
asio/executor_work.hpp \
asio/executor.hpp \
asio/generic/basic_endpoint.hpp \
asio/generic/datagram_protocol.hpp \
asio/generic/detail/endpoint.hpp \
@ -296,6 +297,8 @@ nobase_include_HEADERS = \
asio/impl/error.ipp \
asio/impl/execution_context.hpp \
asio/impl/execution_context.ipp \
asio/impl/executor.hpp \
asio/impl/executor.ipp \
asio/impl/handler_alloc_hook.ipp \
asio/impl/io_service.hpp \
asio/impl/io_service.ipp \

View File

@ -50,6 +50,7 @@
#include "asio/error.hpp"
#include "asio/error_code.hpp"
#include "asio/execution_context.hpp"
#include "asio/executor.hpp"
#include "asio/executor_work.hpp"
#include "asio/generic/basic_endpoint.hpp"
#include "asio/generic/datagram_protocol.hpp"

View File

@ -32,7 +32,7 @@ public:
template <typename U>
struct rebind
{
typedef recycling_allocator<T> other;
typedef recycling_allocator<U> other;
};
recycling_allocator()

View File

@ -0,0 +1,313 @@
//
// executor.hpp
// ~~~~~~~~~~~~
//
// Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef ASIO_EXECUTOR_HPP
#define ASIO_EXECUTOR_HPP
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include "asio/detail/config.hpp"
#include <typeinfo>
#include "asio/detail/cstddef.hpp"
#include "asio/detail/memory.hpp"
#include "asio/execution_context.hpp"
#include "asio/is_executor.hpp"
#include "asio/detail/push_options.hpp"
namespace asio {
/// Exception thrown when trying to access an empty polymorphic executor.
class bad_executor
: public std::exception
{
public:
/// Constructor.
ASIO_DECL bad_executor() ASIO_NOEXCEPT;
/// Obtain message associated with exception.
ASIO_DECL virtual const char* what() const ASIO_NOEXCEPT_OR_NOTHROW;
};
/// Polymorphic wrapper for executors.
class executor
{
public:
/// Default constructor.
executor() ASIO_NOEXCEPT
: impl_(0)
{
}
/// Construct from nullptr.
executor(nullptr_t) ASIO_NOEXCEPT
: impl_(0)
{
}
/// Copy constructor.
executor(const executor& other) ASIO_NOEXCEPT
: impl_(other.clone())
{
}
#if defined(ASIO_HAS_MOVE) || defined(GENERATING_DOCUMENTATION)
/// Move constructor.
executor(executor&& other) ASIO_NOEXCEPT
: impl_(other.impl_)
{
other.impl_ = 0;
}
#endif // defined(ASIO_HAS_MOVE) || defined(GENERATING_DOCUMENTATION)
/// Construct a polymorphic wrapper for the specified executor.
template <typename Executor>
executor(Executor e);
/// Allocator-aware constructor to create a polymorphic wrapper for the
/// specified executor.
template <typename Executor, typename Allocator>
executor(allocator_arg_t, const Allocator& a, Executor e);
/// Destructor.
~executor()
{
destroy();
}
/// Assignment operator.
executor& operator=(const executor& other) ASIO_NOEXCEPT
{
destroy();
impl_ = other.clone();
return *this;
}
#if defined(ASIO_HAS_MOVE) || defined(GENERATING_DOCUMENTATION)
// Move assignment operator.
executor& operator=(executor&& other) ASIO_NOEXCEPT
{
destroy();
impl_ = other.impl_;
other.impl_ = 0;
return *this;
}
#endif // defined(ASIO_HAS_MOVE) || defined(GENERATING_DOCUMENTATION)
/// Assignment operator for nullptr_t.
executor& operator=(nullptr_t) ASIO_NOEXCEPT
{
destroy();
impl_ = 0;
return *this;
}
/// Assignment operator to create a polymorphic wrapper for the specified
/// executor.
template <typename Executor>
executor& operator=(ASIO_MOVE_ARG(Executor) e) ASIO_NOEXCEPT
{
executor tmp(ASIO_MOVE_CAST(Executor)(e));
destroy();
impl_ = tmp.impl_;
tmp.impl_ = 0;
return *this;
}
/// Obtain the underlying execution context.
execution_context& context() ASIO_NOEXCEPT
{
return get_impl()->context();
}
/// Inform the executor that it has some outstanding work to do.
void on_work_started() ASIO_NOEXCEPT
{
get_impl()->on_work_started();
}
/// Inform the executor that some work is no longer outstanding.
void on_work_finished() ASIO_NOEXCEPT
{
get_impl()->on_work_finished();
}
/// Request the executor to invoke the given function object.
/**
* This function is used to ask the executor to execute the given function
* object. The function object is executed according to the rules of the
* target executor object.
*
* @param f The function object to be called. The executor will make a copy
* of the handler object as required. The function signature of the function
* object must be: @code void function(); @endcode
*
* @param a An allocator that may be used by the executor to allocate the
* internal storage needed for function invocation.
*/
template <typename Function, typename Allocator>
void dispatch(ASIO_MOVE_ARG(Function) f, const Allocator& a);
/// Request the executor to invoke the given function object.
/**
* This function is used to ask the executor to execute the given function
* object. The function object is executed according to the rules of the
* target executor object.
*
* @param f The function object to be called. The executor will make
* a copy of the handler object as required. The function signature of the
* function object must be: @code void function(); @endcode
*
* @param a An allocator that may be used by the executor to allocate the
* internal storage needed for function invocation.
*/
template <typename Function, typename Allocator>
void post(ASIO_MOVE_ARG(Function) f, const Allocator& a);
/// Request the executor to invoke the given function object.
/**
* This function is used to ask the executor to execute the given function
* object. The function object is executed according to the rules of the
* target executor object.
*
* @param f The function object to be called. The executor will make
* a copy of the handler object as required. The function signature of the
* function object must be: @code void function(); @endcode
*
* @param a An allocator that may be used by the executor to allocate the
* internal storage needed for function invocation.
*/
template <typename Function, typename Allocator>
void defer(ASIO_MOVE_ARG(Function) f, const Allocator& a);
struct unspecified_bool_type_t {};
typedef void (*unspecified_bool_type)(unspecified_bool_type_t);
static void unspecified_bool_true(unspecified_bool_type_t) {}
/// Operator to test if the executor contains a valid target.
operator unspecified_bool_type() const ASIO_NOEXCEPT
{
return impl_ ? &executor::unspecified_bool_true : 0;
}
/// Obtain type information for the target executor object.
/**
* @returns If @c *this has a target type of type @c T, <tt>typeid(T)</tt>;
* otherwise, <tt>typeid(void)</tt>.
*/
const std::type_info& target_type() const ASIO_NOEXCEPT
{
return impl_ ? impl_->target_type() : typeid(void);
}
/// Obtain a pointer to the target executor object.
/**
* @returns If <tt>target_type() == typeid(T)</tt>, a pointer to the stored
* executor target; otherwise, a null pointer.
*/
template <typename Executor>
Executor* target() ASIO_NOEXCEPT;
/// Obtain a pointer to the target executor object.
/**
* @returns If <tt>target_type() == typeid(T)</tt>, a pointer to the stored
* executor target; otherwise, a null pointer.
*/
template <typename Executor>
const Executor* target() const ASIO_NOEXCEPT;
/// Compare two executors for equality.
friend bool operator==(const executor& a,
const executor& b) ASIO_NOEXCEPT
{
if (a.impl_ == b.impl_)
return true;
if (!a.impl_ || !b.impl_)
return false;
return a.impl_->equals(b.impl_);
}
/// Compare two executors for inequality.
friend bool operator!=(const executor& a,
const executor& b) ASIO_NOEXCEPT
{
return !(a == b);
}
private:
class function;
template <typename, typename> class impl;
// Base class for all polymorphic executor implementations.
class impl_base
{
public:
virtual impl_base* clone() const ASIO_NOEXCEPT = 0;
virtual void destroy() ASIO_NOEXCEPT = 0;
virtual execution_context& context() ASIO_NOEXCEPT = 0;
virtual void on_work_started() ASIO_NOEXCEPT = 0;
virtual void on_work_finished() ASIO_NOEXCEPT = 0;
virtual void dispatch(ASIO_MOVE_ARG(function)) = 0;
virtual void post(ASIO_MOVE_ARG(function)) = 0;
virtual void defer(ASIO_MOVE_ARG(function)) = 0;
virtual const std::type_info& target_type() const ASIO_NOEXCEPT = 0;
virtual void* target() ASIO_NOEXCEPT = 0;
virtual const void* target() const ASIO_NOEXCEPT = 0;
virtual bool equals(const impl_base* e) const ASIO_NOEXCEPT = 0;
protected:
impl_base(bool fast_dispatch) : fast_dispatch_(fast_dispatch) {}
virtual ~impl_base() {}
private:
friend class executor;
const bool fast_dispatch_;
};
// Helper function to check and return the implementation pointer.
impl_base* get_impl()
{
return impl_ ? impl_ : throw bad_executor();
}
// Helper function to clone another implementation.
impl_base* clone() const ASIO_NOEXCEPT
{
return impl_ ? impl_->clone() : 0;
}
// Helper function to destroy an implementation.
void destroy() ASIO_NOEXCEPT
{
if (impl_)
impl_->destroy();
}
impl_base* impl_;
};
#if !defined(GENERATING_DOCUMENTATION)
template <> struct is_executor<executor> : true_type {};
#endif // !defined(GENERATING_DOCUMENTATION)
} // namespace asio
ASIO_USES_ALLOCATOR(asio::executor)
#include "asio/detail/pop_options.hpp"
#include "asio/impl/executor.hpp"
#if defined(ASIO_HEADER_ONLY)
# include "asio/impl/executor.ipp"
#endif // defined(ASIO_HEADER_ONLY)
#endif // ASIO_EXECUTOR_HPP

View File

@ -0,0 +1,383 @@
//
// impl/executor.hpp
// ~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef ASIO_IMPL_EXECUTOR_HPP
#define ASIO_IMPL_EXECUTOR_HPP
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include "asio/detail/config.hpp"
#include "asio/detail/atomic_count.hpp"
#include "asio/detail/executor_op.hpp"
#include "asio/detail/global.hpp"
#include "asio/detail/memory.hpp"
#include "asio/detail/recycling_allocator.hpp"
#include "asio/executor.hpp"
#include "asio/system_executor.hpp"
#include "asio/detail/push_options.hpp"
namespace asio {
#if defined(ASIO_HAS_MOVE)
// Lightweight, move-only function object wrapper.
class executor::function
{
public:
template <typename F, typename Alloc>
explicit function(F f, const Alloc& a)
{
// Construct an allocator to be used for the operation.
typedef typename detail::get_recycling_allocator<Alloc>::type alloc_type;
alloc_type allocator(detail::get_recycling_allocator<Alloc>::get(a));
// Allocate and construct an operation to wrap the function.
typedef detail::executor_op<F, alloc_type> op;
typename op::ptr p = { allocator, 0, 0 };
p.v = p.a.allocate(1);
op_ = new (p.v) op(f, allocator);
p.v = 0;
}
function(function&& other)
: op_(other.op_)
{
other.op_ = 0;
}
~function()
{
if (op_)
op_->destroy();
}
void operator()()
{
if (op_)
{
detail::scheduler_operation* op = op_;
op_ = 0;
op->complete(this, asio::error_code(), 0);
}
}
private:
detail::scheduler_operation* op_;
};
#else // defined(ASIO_HAS_MOVE)
// Not so lightweight, copyable function object wrapper.
class executor::function
{
public:
template <typename F, typename Alloc>
explicit function(const F& f, const Alloc&)
: impl_(new impl<F>(f))
{
}
void operator()()
{
impl_->invoke_(impl_.get());
}
private:
// Base class for polymorphic function implementations.
struct impl_base
{
void (*invoke_)(impl_base*);
};
// Polymorphic function implementation.
template <typename F>
struct impl : impl_base
{
impl(const F& f)
: function_(f)
{
invoke_ = &function::invoke<F>;
}
F function_;
};
// Helper to invoke a function.
template <typename F>
static void invoke(impl_base* i)
{
static_cast<impl<F>*>(i)->function_();
}
detail::shared_ptr<impl_base> impl_;
};
#endif // defined(ASIO_HAS_MOVE)
// Default polymorphic allocator implementation.
template <typename Executor, typename Allocator>
class executor::impl
: public executor::impl_base
{
public:
typedef typename Allocator::template rebind<impl>::other allocator_type;
static impl_base* create(const Executor& e, Allocator a = Allocator())
{
raw_mem mem(a);
impl* p = new (mem.ptr_) impl(e, mem.allocator_);
mem.ptr_ = 0;
return p;
}
impl(const Executor& e, const allocator_type& a) ASIO_NOEXCEPT
: impl_base(false),
ref_count_(1),
executor_(e),
allocator_(a)
{
}
impl_base* clone() const ASIO_NOEXCEPT
{
++ref_count_;
return const_cast<impl_base*>(static_cast<const impl_base*>(this));
}
void destroy() ASIO_NOEXCEPT
{
if (--ref_count_ == 0)
{
allocator_type alloc(allocator_);
impl* p = this;
p->~impl();
alloc.deallocate(p, 1);
}
}
void on_work_started() ASIO_NOEXCEPT
{
executor_.on_work_started();
}
void on_work_finished() ASIO_NOEXCEPT
{
executor_.on_work_finished();
}
execution_context& context() ASIO_NOEXCEPT
{
return executor_.context();
}
void dispatch(ASIO_MOVE_ARG(function) f)
{
executor_.dispatch(ASIO_MOVE_CAST(function)(f), allocator_);
}
void post(ASIO_MOVE_ARG(function) f)
{
executor_.post(ASIO_MOVE_CAST(function)(f), allocator_);
}
void defer(ASIO_MOVE_ARG(function) f)
{
executor_.defer(ASIO_MOVE_CAST(function)(f), allocator_);
}
const std::type_info& target_type() const ASIO_NOEXCEPT
{
return typeid(Executor);
}
void* target() ASIO_NOEXCEPT
{
return &executor_;
}
const void* target() const ASIO_NOEXCEPT
{
return &executor_;
}
bool equals(const impl_base* e) const ASIO_NOEXCEPT
{
if (this == e)
return true;
if (target_type() != e->target_type())
return false;
return executor_ == *static_cast<const Executor*>(e->target());
}
private:
mutable detail::atomic_count ref_count_;
Executor executor_;
allocator_type allocator_;
struct raw_mem
{
allocator_type allocator_;
impl* ptr_;
explicit raw_mem(const Allocator& a)
: allocator_(a),
ptr_(allocator_.allocate(1))
{
}
~raw_mem()
{
if (ptr_)
allocator_.deallocate(ptr_, 1);
}
private:
// Disallow copying and assignment.
raw_mem(const raw_mem&);
raw_mem operator=(const raw_mem&);
};
};
// Polymorphic allocator specialisation for system_executor.
template <typename Allocator>
class executor::impl<system_executor, Allocator>
: public executor::impl_base
{
public:
static impl_base* create(const system_executor&, const Allocator& = Allocator())
{
return &detail::global<impl<system_executor, std::allocator<void> > >();
}
impl()
: impl_base(true)
{
}
impl_base* clone() const ASIO_NOEXCEPT
{
return const_cast<impl_base*>(static_cast<const impl_base*>(this));
}
void destroy() ASIO_NOEXCEPT
{
}
void on_work_started() ASIO_NOEXCEPT
{
executor_.on_work_started();
}
void on_work_finished() ASIO_NOEXCEPT
{
executor_.on_work_finished();
}
execution_context& context() ASIO_NOEXCEPT
{
return executor_.context();
}
void dispatch(ASIO_MOVE_ARG(function) f)
{
executor_.dispatch(ASIO_MOVE_CAST(function)(f), allocator_);
}
void post(ASIO_MOVE_ARG(function) f)
{
executor_.post(ASIO_MOVE_CAST(function)(f), allocator_);
}
void defer(ASIO_MOVE_ARG(function) f)
{
executor_.defer(ASIO_MOVE_CAST(function)(f), allocator_);
}
const std::type_info& target_type() const ASIO_NOEXCEPT
{
return typeid(system_executor);
}
void* target() ASIO_NOEXCEPT
{
return &executor_;
}
const void* target() const ASIO_NOEXCEPT
{
return &executor_;
}
bool equals(const impl_base* e) const ASIO_NOEXCEPT
{
return this == e;
}
private:
system_executor executor_;
Allocator allocator_;
};
template <typename Executor>
executor::executor(Executor e)
: impl_(impl<Executor, std::allocator<void> >::create(e))
{
}
template <typename Executor, typename Allocator>
executor::executor(allocator_arg_t, const Allocator& a, Executor e)
: impl_(impl<Executor, Allocator>::create(e, a))
{
}
template <typename Function, typename Allocator>
void executor::dispatch(ASIO_MOVE_ARG(Function) f, const Allocator& a)
{
impl_base* i = get_impl();
if (i->fast_dispatch_)
system_executor().dispatch(ASIO_MOVE_CAST(Function)(f), a);
else
i->dispatch(function(ASIO_MOVE_CAST(Function)(f), a));
}
template <typename Function, typename Allocator>
void executor::post(ASIO_MOVE_ARG(Function) f, const Allocator& a)
{
get_impl()->post(function(ASIO_MOVE_CAST(Function)(f), a));
}
template <typename Function, typename Allocator>
void executor::defer(ASIO_MOVE_ARG(Function) f, const Allocator& a)
{
get_impl()->defer(function(ASIO_MOVE_CAST(Function)(f), a));
}
template <typename Executor>
Executor* executor::target() ASIO_NOEXCEPT
{
return impl_ && impl_->target_type() == typeid(Executor)
? static_cast<Executor*>(impl_->target()) : 0;
}
template <typename Executor>
const Executor* executor::target() const ASIO_NOEXCEPT
{
return impl_ && impl_->target_type() == typeid(Executor)
? static_cast<Executor*>(impl_->target()) : 0;
}
} // namespace asio
#include "asio/detail/pop_options.hpp"
#endif // ASIO_IMPL_EXECUTOR_HPP

View File

@ -0,0 +1,38 @@
//
// impl/executor.ipp
// ~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef ASIO_IMPL_EXECUTOR_IPP
#define ASIO_IMPL_EXECUTOR_IPP
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include "asio/detail/config.hpp"
#include "asio/executor.hpp"
#include "asio/detail/push_options.hpp"
namespace asio {
bad_executor::bad_executor() ASIO_NOEXCEPT
{
}
const char* bad_executor::what() const ASIO_NOEXCEPT_OR_NOTHROW
{
return "bad executor";
}
} // namespace asio
#include "asio/detail/pop_options.hpp"
#endif // ASIO_IMPL_EXECUTOR_IPP

View File

@ -22,6 +22,7 @@
#include "asio/impl/error.ipp"
#include "asio/impl/error_code.ipp"
#include "asio/impl/execution_context.ipp"
#include "asio/impl/executor.ipp"
#include "asio/impl/handler_alloc_hook.ipp"
#include "asio/impl/io_service.ipp"
#include "asio/impl/serial_port_base.ipp"

View File

@ -20,6 +20,7 @@ noinst_PROGRAMS = \
echo/blocking_tcp_echo_server \
echo/blocking_udp_echo_client \
echo/blocking_udp_echo_server \
executors/actor \
executors/bank_account_1 \
executors/bank_account_2 \
executors/fork_join \
@ -48,6 +49,7 @@ echo_blocking_tcp_echo_client_SOURCES = echo/blocking_tcp_echo_client.cpp
echo_blocking_tcp_echo_server_SOURCES = echo/blocking_tcp_echo_server.cpp
echo_blocking_udp_echo_client_SOURCES = echo/blocking_udp_echo_client.cpp
echo_blocking_udp_echo_server_SOURCES = echo/blocking_udp_echo_server.cpp
executors_actor_SOURCES = executors/actor.cpp
executors_bank_account_1_SOURCES = executors/bank_account_1.cpp
executors_bank_account_2_SOURCES = executors/bank_account_2.cpp
executors_fork_join_SOURCES = executors/fork_join.cpp

View File

@ -1,3 +1,4 @@
actor
bank_account_[0-9]
fork_join
pipeline

View File

@ -0,0 +1,286 @@
#include <asio/defer.hpp>
#include <asio/executor.hpp>
#include <asio/post.hpp>
#include <asio/strand.hpp>
#include <asio/system_executor.hpp>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <typeinfo>
#include <vector>
using asio::defer;
using asio::executor;
using asio::post;
using asio::strand;
using asio::system_executor;
//------------------------------------------------------------------------------
// A tiny actor framework
// ~~~~~~~~~~~~~~~~~~~~~~
class actor;
// Used to identify the sender and recipient of messages.
typedef actor* actor_address;
// Base class for all registered message handlers.
class message_handler_base
{
public:
virtual ~message_handler_base() {}
// Used to determine which message handlers receive an incoming message.
virtual const std::type_info& message_id() const = 0;
};
// Base class for a handler for a specific message type.
template <class Message>
class message_handler : public message_handler_base
{
public:
// Handle an incoming message.
virtual void handle_message(Message msg, actor_address from) = 0;
};
// Concrete message handler for a specific message type.
template <class Actor, class Message>
class mf_message_handler : public message_handler<Message>
{
public:
// Construct a message handler to invoke the specified member function.
mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a)
: function_(mf), actor_(a)
{
}
// Used to determine which message handlers receive an incoming message.
virtual const std::type_info& message_id() const
{
return typeid(Message);
}
// Handle an incoming message.
virtual void handle_message(Message msg, actor_address from)
{
(actor_->*function_)(std::move(msg), from);
}
// Determine whether the message handler represents the specified function.
bool is_function(void (Actor::* mf)(Message, actor_address)) const
{
return mf == function_;
}
private:
void (Actor::* function_)(Message, actor_address);
Actor* actor_;
};
// Base class for all actors.
class actor
{
public:
virtual ~actor()
{
}
// Obtain the actor's address for use as a message sender or recipient.
actor_address address()
{
return this;
}
// Send a message from one actor to another.
template <class Message>
friend void send(Message msg, actor_address from, actor_address to)
{
// Execute the message handler in the context of the target's executor.
post(to->executor_,
[=]
{
to->call_handler(std::move(msg), from);
});
}
protected:
// Construct the actor to use the specified executor for all message handlers.
actor(executor e)
: executor_(std::move(e))
{
}
// Register a handler for a specific message type. Duplicates are permitted.
template <class Actor, class Message>
void register_handler(void (Actor::* mf)(Message, actor_address))
{
handlers_.push_back(
std::make_shared<mf_message_handler<Actor, Message>>(
mf, static_cast<Actor*>(this)));
}
// Deregister a handler. Removes only the first matching handler.
template <class Actor, class Message>
void deregister_handler(void (Actor::* mf)(Message, actor_address))
{
const std::type_info& id = typeid(message_handler<Message>);
for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter)
{
if ((*iter)->message_id() == id)
{
auto mh = static_cast<mf_message_handler<Actor, Message>*>(iter->get());
if (mh->is_function(mf))
{
handlers_.erase(iter);
return;
}
}
}
}
// Send a message from within a message handler.
template <class Message>
void tail_send(Message msg, actor_address to)
{
// Execute the message handler in the context of the target's executor.
actor* from = this;
defer(to->executor_,
[=]
{
to->call_handler(std::move(msg), from);
});
}
private:
// Find the matching message handlers, if any, and call them.
template <class Message>
void call_handler(Message msg, actor_address from)
{
const std::type_info& message_id = typeid(Message);
for (auto& h: handlers_)
{
if (h->message_id() == message_id)
{
auto mh = static_cast<message_handler<Message>*>(h.get());
mh->handle_message(msg, from);
}
}
}
// All messages associated with a single actor object should be processed
// non-concurrently. We use a strand to ensure non-concurrent execution even
// if the underlying executor may use multiple threads.
strand<executor> executor_;
std::vector<std::shared_ptr<message_handler_base>> handlers_;
};
// A concrete actor that allows synchronous message retrieval.
template <class Message>
class receiver : public actor
{
public:
receiver()
: actor(system_executor())
{
register_handler(&receiver::message_handler);
}
// Block until a message has been received.
Message wait()
{
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this]{ return !message_queue_.empty(); });
Message msg(std::move(message_queue_.front()));
message_queue_.pop_front();
return msg;
}
private:
// Handle a new message by adding it to the queue and waking a waiter.
void message_handler(Message msg, actor_address /* from */)
{
std::lock_guard<std::mutex> lock(mutex_);
message_queue_.push_back(std::move(msg));
condition_.notify_one();
}
std::mutex mutex_;
std::condition_variable condition_;
std::deque<Message> message_queue_;
};
//------------------------------------------------------------------------------
#include <asio/thread_pool.hpp>
#include <iostream>
using asio::thread_pool;
class member : public actor
{
public:
explicit member(executor e)
: actor(std::move(e))
{
register_handler(&member::init_handler);
}
private:
void init_handler(actor_address next, actor_address from)
{
next_ = next;
caller_ = from;
register_handler(&member::token_handler);
deregister_handler(&member::init_handler);
}
void token_handler(int token, actor_address /*from*/)
{
int msg(token);
actor_address to(caller_);
if (token > 0)
{
msg = token - 1;
to = next_;
}
tail_send(msg, to);
}
actor_address next_;
actor_address caller_;
};
int main()
{
const std::size_t num_threads = 16;
const int num_hops = 50000000;
const std::size_t num_actors = 503;
const int token_value = (num_hops + num_actors - 1) / num_actors;
const std::size_t actors_per_thread = num_actors / num_threads;
struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} };
single_thread_pool pools[num_threads];
std::vector<std::shared_ptr<member>> members(num_actors);
receiver<int> rcvr;
// Create the member actors.
for (std::size_t i = 0; i < num_actors; ++i)
members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor());
// Initialise the actors by passing each one the address of the next actor in the ring.
for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i)
send(members[next_i]->address(), rcvr.address(), members[i - 1]->address());
// Send exactly one token to each actor, all with the same initial value, rounding up if required.
for (std::size_t i = 0; i < num_actors; ++i)
send(token_value, rcvr.address(), members[i]->address());
// Wait for all signal messages, indicating the tokens have all reached zero.
for (std::size_t i = 0; i < num_actors; ++i)
rcvr.wait();
}

View File

@ -10,6 +10,7 @@ LDADD = libasio.a
endif
noinst_PROGRAMS = \
executors/actor \
executors/async_1 \
executors/async_2 \
executors/bank_account_1 \
@ -20,6 +21,7 @@ noinst_PROGRAMS = \
AM_CXXFLAGS = -I$(srcdir)/../../../include
executors_actor_SOURCES = executors/actor.cpp
executors_async_1_SOURCES = executors/async_1.cpp
executors_async_2_SOURCES = executors/async_2.cpp
executors_bank_account_1_SOURCES = executors/bank_account_1.cpp

View File

@ -1,3 +1,4 @@
actor
async_[0-9]
bank_account_[0-9]
fork_join

View File

@ -0,0 +1,285 @@
#include <asio/defer.hpp>
#include <asio/executor.hpp>
#include <asio/post.hpp>
#include <asio/strand.hpp>
#include <asio/system_executor.hpp>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <typeinfo>
#include <vector>
using asio::defer;
using asio::executor;
using asio::post;
using asio::strand;
using asio::system_executor;
//------------------------------------------------------------------------------
// A tiny actor framework
// ~~~~~~~~~~~~~~~~~~~~~~
class actor;
// Used to identify the sender and recipient of messages.
typedef actor* actor_address;
// Base class for all registered message handlers.
class message_handler_base
{
public:
virtual ~message_handler_base() {}
// Used to determine which message handlers receive an incoming message.
virtual const std::type_info& message_id() const = 0;
};
// Base class for a handler for a specific message type.
template <class Message>
class message_handler : public message_handler_base
{
public:
// Handle an incoming message.
virtual void handle_message(Message msg, actor_address from) = 0;
};
// Concrete message handler for a specific message type.
template <class Actor, class Message>
class mf_message_handler : public message_handler<Message>
{
public:
// Construct a message handler to invoke the specified member function.
mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a)
: function_(mf), actor_(a)
{
}
// Used to determine which message handlers receive an incoming message.
virtual const std::type_info& message_id() const
{
return typeid(Message);
}
// Handle an incoming message.
virtual void handle_message(Message msg, actor_address from)
{
(actor_->*function_)(std::move(msg), from);
}
// Determine whether the message handler represents the specified function.
bool is_function(void (Actor::* mf)(Message, actor_address)) const
{
return mf == function_;
}
private:
void (Actor::* function_)(Message, actor_address);
Actor* actor_;
};
// Base class for all actors.
class actor
{
public:
virtual ~actor()
{
}
// Obtain the actor's address for use as a message sender or recipient.
actor_address address()
{
return this;
}
// Send a message from one actor to another.
template <class Message>
friend void send(Message msg, actor_address from, actor_address to)
{
// Execute the message handler in the context of the target's executor.
post(to->executor_,
[=, msg=std::move(msg)]
{
to->call_handler(std::move(msg), from);
});
}
protected:
// Construct the actor to use the specified executor for all message handlers.
actor(executor e)
: executor_(std::move(e))
{
}
// Register a handler for a specific message type. Duplicates are permitted.
template <class Actor, class Message>
void register_handler(void (Actor::* mf)(Message, actor_address))
{
handlers_.push_back(
std::make_shared<mf_message_handler<Actor, Message>>(
mf, static_cast<Actor*>(this)));
}
// Deregister a handler. Removes only the first matching handler.
template <class Actor, class Message>
void deregister_handler(void (Actor::* mf)(Message, actor_address))
{
const std::type_info& id = typeid(message_handler<Message>);
for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter)
{
if ((*iter)->message_id() == id)
{
auto mh = static_cast<mf_message_handler<Actor, Message>*>(iter->get());
if (mh->is_function(mf))
{
handlers_.erase(iter);
return;
}
}
}
}
// Send a message from within a message handler.
template <class Message>
void tail_send(Message msg, actor_address to)
{
// Execute the message handler in the context of the target's executor.
defer(to->executor_,
[=, msg=std::move(msg), from=this]
{
to->call_handler(std::move(msg), from);
});
}
private:
// Find the matching message handlers, if any, and call them.
template <class Message>
void call_handler(Message msg, actor_address from)
{
const std::type_info& message_id = typeid(Message);
for (auto& h: handlers_)
{
if (h->message_id() == message_id)
{
auto mh = static_cast<message_handler<Message>*>(h.get());
mh->handle_message(msg, from);
}
}
}
// All messages associated with a single actor object should be processed
// non-concurrently. We use a strand to ensure non-concurrent execution even
// if the underlying executor may use multiple threads.
strand<executor> executor_;
std::vector<std::shared_ptr<message_handler_base>> handlers_;
};
// A concrete actor that allows synchronous message retrieval.
template <class Message>
class receiver : public actor
{
public:
receiver()
: actor(system_executor())
{
register_handler(&receiver::message_handler);
}
// Block until a message has been received.
Message wait()
{
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this]{ return !message_queue_.empty(); });
Message msg(std::move(message_queue_.front()));
message_queue_.pop_front();
return msg;
}
private:
// Handle a new message by adding it to the queue and waking a waiter.
void message_handler(Message msg, actor_address /* from */)
{
std::lock_guard<std::mutex> lock(mutex_);
message_queue_.push_back(std::move(msg));
condition_.notify_one();
}
std::mutex mutex_;
std::condition_variable condition_;
std::deque<Message> message_queue_;
};
//------------------------------------------------------------------------------
#include <asio/thread_pool.hpp>
#include <iostream>
using asio::thread_pool;
class member : public actor
{
public:
explicit member(executor e)
: actor(std::move(e))
{
register_handler(&member::init_handler);
}
private:
void init_handler(actor_address next, actor_address from)
{
next_ = next;
caller_ = from;
register_handler(&member::token_handler);
deregister_handler(&member::init_handler);
}
void token_handler(int token, actor_address /*from*/)
{
int msg(token);
actor_address to(caller_);
if (token > 0)
{
msg = token - 1;
to = next_;
}
tail_send(msg, to);
}
actor_address next_;
actor_address caller_;
};
int main()
{
const std::size_t num_threads = 16;
const int num_hops = 50000000;
const std::size_t num_actors = 503;
const int token_value = (num_hops + num_actors - 1) / num_actors;
const std::size_t actors_per_thread = num_actors / num_threads;
struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} };
single_thread_pool pools[num_threads];
std::vector<std::shared_ptr<member>> members(num_actors);
receiver<int> rcvr;
// Create the member actors.
for (std::size_t i = 0; i < num_actors; ++i)
members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor());
// Initialise the actors by passing each one the address of the next actor in the ring.
for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i)
send(members[next_i]->address(), rcvr.address(), members[i - 1]->address());
// Send exactly one token to each actor, all with the same initial value, rounding up if required.
for (std::size_t i = 0; i < num_actors; ++i)
send(token_value, rcvr.address(), members[i]->address());
// Wait for all signal messages, indicating the tokens have all reached zero.
for (std::size_t i = 0; i < num_actors; ++i)
rcvr.wait();
}