Add bulk_execute() to thread_pool executor.

This commit is contained in:
Christopher Kohlhoff 2020-07-01 22:42:48 +10:00
parent 9174724d04
commit aa11542e38
7 changed files with 264 additions and 0 deletions

View File

@ -47,6 +47,7 @@ nobase_include_HEADERS = \
asio/detail/buffered_stream_storage.hpp \
asio/detail/buffer_resize_guard.hpp \
asio/detail/buffer_sequence_adapter.hpp \
asio/detail/bulk_executor_op.hpp \
asio/detail/call_stack.hpp \
asio/detail/chrono.hpp \
asio/detail/chrono_time_traits.hpp \

View File

@ -0,0 +1,88 @@
//
// detail/bulk_executor_op.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef ASIO_DETAIL_BULK_EXECUTOR_OP_HPP
#define ASIO_DETAIL_BULK_EXECUTOR_OP_HPP
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include "asio/detail/config.hpp"
#include "asio/detail/bind_handler.hpp"
#include "asio/detail/fenced_block.hpp"
#include "asio/detail/handler_alloc_helpers.hpp"
#include "asio/detail/handler_invoke_helpers.hpp"
#include "asio/detail/scheduler_operation.hpp"
#include "asio/detail/push_options.hpp"
namespace asio {
namespace detail {
// One queued scheduler operation per index of a bulk execution request.
// When the scheduler runs the operation, the wrapped handler is invoked
// with the stored index, i.e. handler(index_).
template <typename Handler, typename Alloc,
typename Operation = scheduler_operation>
class bulk_executor_op : public Operation
{
public:
ASIO_DEFINE_HANDLER_ALLOCATOR_PTR(bulk_executor_op);
// Construct with the handler to invoke, the allocator used to allocate
// and free this operation object, and the index to pass to the handler
// on completion.
template <typename H>
bulk_executor_op(ASIO_MOVE_ARG(H) h,
const Alloc& allocator, std::size_t i)
: Operation(&bulk_executor_op::do_complete),
handler_(ASIO_MOVE_CAST(H)(h)),
allocator_(allocator),
index_(i)
{
}
// Scheduler completion callback. The upcall is made only when owner is
// non-null; with a null owner the operation's memory is freed without
// invoking the handler (presumably the destroy-without-running path —
// consistent with other asio operation types).
static void do_complete(void* owner, Operation* base,
const asio::error_code& /*ec*/,
std::size_t /*bytes_transferred*/)
{
// Take ownership of the handler object.
bulk_executor_op* o(static_cast<bulk_executor_op*>(base));
Alloc allocator(o->allocator_);
ptr p = { detail::addressof(allocator), o, o };
ASIO_HANDLER_COMPLETION((*o));
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder1<Handler, std::size_t> handler(o->handler_, o->index_);
// Free the operation's memory before invoking the handler.
p.reset();
// Make the upcall if required.
if (owner)
{
fenced_block b(fenced_block::half);
ASIO_HANDLER_INVOCATION_BEGIN(());
asio_handler_invoke_helpers::invoke(handler, handler.handler_);
ASIO_HANDLER_INVOCATION_END;
}
}
private:
Handler handler_;   // The handler to be invoked with the index.
Alloc allocator_;   // Allocator used to allocate and free this operation.
std::size_t index_; // The invocation index passed to the handler.
};
} // namespace detail
} // namespace asio
#include "asio/detail/pop_options.hpp"
#endif // ASIO_DETAIL_BULK_EXECUTOR_OP_HPP

View File

@ -353,6 +353,29 @@ void scheduler::post_immediate_completion(
wake_one_thread_and_unlock(lock);
}
// Request invocation of n queued operations and return immediately.
// Assumes work_started() has not yet been called for any of them, so the
// outstanding work count is bumped by n here.
void scheduler::post_immediate_completions(std::size_t n,
op_queue<scheduler::operation>& ops, bool is_continuation)
{
#if defined(ASIO_HAS_THREADS)
// Fast path: if the pool has one thread, or the work continues work already
// running here, and the caller is one of the scheduler's own threads, push
// onto that thread's private queue without taking the mutex.
if (one_thread_ || is_continuation)
{
if (thread_info_base* this_thread = thread_call_stack::contains(this))
{
static_cast<thread_info*>(this_thread)->private_outstanding_work += n;
static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
return;
}
}
#else // defined(ASIO_HAS_THREADS)
(void)is_continuation;
#endif // defined(ASIO_HAS_THREADS)
// Slow path: account for the new work, push the whole batch onto the shared
// queue under the mutex, and wake one thread to process it.
increment(outstanding_work_, static_cast<long>(n));
mutex::scoped_lock lock(mutex_);
op_queue_.push(ops);
wake_one_thread_and_unlock(lock);
}
void scheduler::post_deferred_completion(scheduler::operation* op)
{
#if defined(ASIO_HAS_THREADS)

View File

@ -112,6 +112,11 @@ public:
ASIO_DECL void post_immediate_completion(
operation* op, bool is_continuation);
// Request invocation of the given operations and return immediately. Assumes
// that work_started() has not yet been called for the operations.
ASIO_DECL void post_immediate_completions(std::size_t n,
op_queue<operation>& ops, bool is_continuation);
// Request invocation of the given operation and return immediately. Assumes
// that work_started() was previously called for the operation.
ASIO_DECL void post_deferred_completion(operation* op);

View File

@ -16,6 +16,7 @@
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include "asio/detail/blocking_executor_op.hpp"
#include "asio/detail/bulk_executor_op.hpp"
#include "asio/detail/executor_op.hpp"
#include "asio/detail/fenced_block.hpp"
#include "asio/detail/non_const_lvalue.hpp"
@ -187,6 +188,70 @@ void thread_pool::basic_executor_type<Allocator,
op.wait();
}
// Bulk execution helper for the possibly- and never-blocking cases. One
// scheduler operation is allocated per index in [0, n), and the whole batch
// is handed to the scheduler in a single post_immediate_completions() call.
template <typename Allocator, unsigned int Bits>
template <typename Function>
void thread_pool::basic_executor_type<Allocator, Bits>::do_bulk_execute(
    ASIO_MOVE_ARG(Function) f, std::size_t n, false_type) const
{
  typedef typename decay<Function>::type function_type;
  typedef detail::bulk_executor_op<function_type, Allocator> op;

  // Allocate and construct operations to wrap the function.
  detail::op_queue<detail::scheduler_operation> ops;
  for (std::size_t i = 0; i < n; ++i)
  {
    typename op::ptr p = { detail::addressof(allocator_),
        op::ptr::allocate(allocator_), 0 };

    // NOTE(review): f is move-cast on every iteration, so for i > 0 the
    // source may already be moved-from. For copyable function objects
    // (where the cast degrades to a copy, or the moved-from state remains
    // invocable) this is fine — confirm intent for move-only functions.
    p.p = new (p.v) op(ASIO_MOVE_CAST(Function)(f), allocator_, i);
    ops.push(p.p);

    if ((bits_ & relationship_continuation) != 0)
    {
      ASIO_HANDLER_CREATION((*pool_, *p.p,
            "thread_pool", pool_, 0, "bulk_execute(blk=never,rel=cont)"));
    }
    else
    {
      // Fixed: label previously read "bulk)execute(...)"; corrected to
      // match the continuation branch's "bulk_execute(...)" form.
      ASIO_HANDLER_CREATION((*pool_, *p.p,
            "thread_pool", pool_, 0, "bulk_execute(blk=never,rel=fork)"));
    }

    // Release pointer ownership: the operation now lives in the queue.
    p.v = p.p = 0;
  }

  // work_started() has not been called for these operations, so use the
  // immediate-completions path, which accounts for the new work itself.
  pool_->scheduler_.post_immediate_completions(n,
      ops, (bits_ & relationship_continuation) != 0);
}
template <typename Function>
struct thread_pool_always_blocking_function_adapter
{
typename decay<Function>::type* f;
std::size_t n;
void operator()()
{
for (std::size_t i = 0; i < n; ++i)
{
(*f)(i);
}
}
};
// Bulk execution helper for the always-blocking case: reuses the existing
// blocking do_execute() path with an adapter that invokes the function once
// per index in [0, n).
template <typename Allocator, unsigned int Bits>
template <typename Function>
void thread_pool::basic_executor_type<Allocator, Bits>::do_bulk_execute(
ASIO_MOVE_ARG(Function) f, std::size_t n, true_type) const
{
// Obtain a non-const instance of the function.
detail::non_const_lvalue<Function> f2(f);
// The adapter borrows a pointer to f2.value; this is safe because the
// true_type overload of do_execute() is the always-blocking path, so f2
// outlives the invocation.
thread_pool_always_blocking_function_adapter<Function>
adapter = { detail::addressof(f2.value), n };
this->do_execute(adapter, true_type());
}
#if !defined(ASIO_NO_TS_EXECUTORS)
template <typename Allocator, unsigned int Bits>
inline thread_pool& thread_pool::basic_executor_type<

View File

@ -398,6 +398,14 @@ public:
integral_constant<bool, (Bits & blocking_always) != 0>());
}
/// Bulk execution function.
/**
 * Submits the function object @c f to be invoked @c n times, once for each
 * index in <tt>[0, n)</tt>, according to the executor's blocking property.
 */
template <typename Function>
void bulk_execute(ASIO_MOVE_ARG(Function) f, std::size_t n) const
{
// Dispatch at compile time on the blocking_always property bit: true
// selects the inline (always-blocking) helper, false the queued one.
this->do_bulk_execute(ASIO_MOVE_CAST(Function)(f), n,
integral_constant<bool, (Bits & blocking_always) != 0>());
}
/// Schedule function.
sender_type schedule() const ASIO_NOEXCEPT
{
@ -515,6 +523,16 @@ private:
template <typename Function>
void do_execute(ASIO_MOVE_ARG(Function) f, true_type) const;
/// Bulk execution helper implementation for possibly and never blocking.
template <typename Function>
void do_bulk_execute(ASIO_MOVE_ARG(Function) f,
std::size_t n, false_type) const;
/// Bulk execution helper implementation for always blocking.
template <typename Function>
void do_bulk_execute(ASIO_MOVE_ARG(Function) f,
std::size_t n, true_type) const;
// The underlying thread pool.
thread_pool* pool_;

View File

@ -443,6 +443,69 @@ void thread_pool_scheduler_test()
ASIO_CHECK(count == 10);
}
// Exercises bulk_execute() across the executor property combinations:
// 10 bulk_execute calls, each with n == 2, so the counter must reach 20
// once the pool has drained.
void thread_pool_executor_bulk_execute_test()
{
int count = 0;
thread_pool pool(1);
// Default-property executor.
pool.executor().bulk_execute(
bindns::bind(increment, &count), 2);
// Each blocking mode in turn.
asio::require(pool.executor(),
asio::execution::blocking.possibly).bulk_execute(
bindns::bind(increment, &count), 2);
asio::require(pool.executor(),
asio::execution::blocking.always).bulk_execute(
bindns::bind(increment, &count), 2);
asio::require(pool.executor(),
asio::execution::blocking.never).bulk_execute(
bindns::bind(increment, &count), 2);
// Never-blocking combined with both outstanding_work settings.
asio::require(pool.executor(),
asio::execution::blocking.never,
asio::execution::outstanding_work.tracked).bulk_execute(
bindns::bind(increment, &count), 2);
asio::require(pool.executor(),
asio::execution::blocking.never,
asio::execution::outstanding_work.untracked).bulk_execute(
bindns::bind(increment, &count), 2);
// Both relationship settings (fork vs continuation fast path).
asio::require(pool.executor(),
asio::execution::blocking.never,
asio::execution::outstanding_work.untracked,
asio::execution::relationship.fork).bulk_execute(
bindns::bind(increment, &count), 2);
asio::require(pool.executor(),
asio::execution::blocking.never,
asio::execution::outstanding_work.untracked,
asio::execution::relationship.continuation).bulk_execute(
bindns::bind(increment, &count), 2);
// Explicit allocator property, and the allocator property by itself.
asio::prefer(
asio::require(pool.executor(),
asio::execution::blocking.never,
asio::execution::outstanding_work.untracked,
asio::execution::relationship.continuation),
asio::execution::allocator(std::allocator<void>())).bulk_execute(
bindns::bind(increment, &count), 2);
asio::prefer(
asio::require(pool.executor(),
asio::execution::blocking.never,
asio::execution::outstanding_work.untracked,
asio::execution::relationship.continuation),
asio::execution::allocator).bulk_execute(
bindns::bind(increment, &count), 2);
// Wait for all queued work to finish before checking the counter.
pool.wait();
ASIO_CHECK(count == 20);
}
ASIO_TEST_SUITE
(
"thread_pool",
@ -450,5 +513,6 @@ ASIO_TEST_SUITE
ASIO_TEST_CASE(thread_pool_service_test)
ASIO_TEST_CASE(thread_pool_executor_query_test)
ASIO_TEST_CASE(thread_pool_executor_execute_test)
ASIO_TEST_CASE(thread_pool_executor_bulk_execute_test)
ASIO_TEST_CASE(thread_pool_scheduler_test)
)