Disable speculative operations after a short read or write.

When using epoll, interpret a short read or write on a stream-oriented
socket as an indication that we should wait for a readiness event. After
this condition, speculative operations are disabled until we receive the
event notification from epoll.
This commit is contained in:
Christopher Kohlhoff 2016-09-09 12:45:05 +10:00
parent c9e33cee0e
commit 6cec69ea0c
21 changed files with 123 additions and 51 deletions

View File

@ -122,6 +122,11 @@ public:
return count_;
}
std::size_t total_size() const
{
return total_buffer_size_;
}
bool all_empty() const
{
return total_buffer_size_ == 0;
@ -223,6 +228,11 @@ public:
return 1;
}
std::size_t total_size() const
{
return total_buffer_size_;
}
bool all_empty() const
{
return total_buffer_size_ == 0;
@ -270,6 +280,11 @@ public:
return 1;
}
std::size_t total_size() const
{
return total_buffer_size_;
}
bool all_empty() const
{
return total_buffer_size_ == 0;
@ -319,6 +334,11 @@ public:
return 1;
}
std::size_t total_size() const
{
return total_buffer_size_;
}
bool all_empty() const
{
return total_buffer_size_ == 0;
@ -366,6 +386,11 @@ public:
return 1;
}
std::size_t total_size() const
{
return total_buffer_size_;
}
bool all_empty() const
{
return total_buffer_size_ == 0;
@ -416,6 +441,11 @@ public:
return 2;
}
std::size_t total_size() const
{
return total_buffer_size_;
}
bool all_empty() const
{
return total_buffer_size_ == 0;
@ -468,6 +498,11 @@ public:
return 2;
}
std::size_t total_size() const
{
return total_buffer_size_;
}
bool all_empty() const
{
return total_buffer_size_ == 0;

View File

@ -44,15 +44,16 @@ public:
{
}
static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
descriptor_read_op_base* o(static_cast<descriptor_read_op_base*>(base));
buffer_sequence_adapter<asio::mutable_buffer,
MutableBufferSequence> bufs(o->buffers_);
bool result = descriptor_ops::non_blocking_read(o->descriptor_,
bufs.buffers(), bufs.count(), o->ec_, o->bytes_transferred_);
status result = descriptor_ops::non_blocking_read(o->descriptor_,
bufs.buffers(), bufs.count(), o->ec_, o->bytes_transferred_)
? done : not_done;
ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_read",
o->ec_, o->bytes_transferred_));

View File

@ -44,15 +44,16 @@ public:
{
}
static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
descriptor_write_op_base* o(static_cast<descriptor_write_op_base*>(base));
buffer_sequence_adapter<asio::const_buffer,
ConstBufferSequence> bufs(o->buffers_);
bool result = descriptor_ops::non_blocking_write(o->descriptor_,
bufs.buffers(), bufs.count(), o->ec_, o->bytes_transferred_);
status result = descriptor_ops::non_blocking_write(o->descriptor_,
bufs.buffers(), bufs.count(), o->ec_, o->bytes_transferred_)
? done : not_done;
ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_write",
o->ec_, o->bytes_transferred_));

View File

@ -62,6 +62,7 @@ public:
int descriptor_;
uint32_t registered_events_;
op_queue<reactor_op> op_queue_[max_ops];
bool try_speculative_[max_ops];
bool shutdown_;
ASIO_DECL descriptor_state(bool locking);

View File

@ -71,7 +71,7 @@ void epoll_reactor::move_timer(timer_queue<Time_Traits>& queue,
typename timer_queue<Time_Traits>::per_timer_data& target,
typename timer_queue<Time_Traits>::per_timer_data& source)
{
asio::detail::mutex::scoped_lock lock(mutex_);
mutex::scoped_lock lock(mutex_);
op_queue<operation> ops;
queue.cancel_timer(target, ops);
queue.move_timer(target, source);

View File

@ -162,6 +162,8 @@ int epoll_reactor::register_descriptor(socket_type descriptor,
descriptor_data->reactor_ = this;
descriptor_data->descriptor_ = descriptor;
descriptor_data->shutdown_ = false;
for (int i = 0; i < max_ops; ++i)
descriptor_data->try_speculative_[i] = true;
}
epoll_event ev = { 0, { 0 } };
@ -203,6 +205,8 @@ int epoll_reactor::register_internal_descriptor(
descriptor_data->descriptor_ = descriptor;
descriptor_data->shutdown_ = false;
descriptor_data->op_queue_[op_type].push(op);
for (int i = 0; i < max_ops; ++i)
descriptor_data->try_speculative_[i] = true;
}
epoll_event ev = { 0, { 0 } };
@ -249,11 +253,17 @@ void epoll_reactor::start_op(int op_type, socket_type descriptor,
&& (op_type != read_op
|| descriptor_data->op_queue_[except_op].empty()))
{
if (op->perform())
if (descriptor_data->try_speculative_[op_type])
{
descriptor_lock.unlock();
scheduler_.post_immediate_completion(op, is_continuation);
return;
if (reactor_op::status status = op->perform())
{
if (status == reactor_op::done_and_exhausted)
if (descriptor_data->registered_events_ != 0)
descriptor_data->try_speculative_[op_type] = false;
descriptor_lock.unlock();
scheduler_.post_immediate_completion(op, is_continuation);
return;
}
}
if (descriptor_data->registered_events_ == 0)
@ -695,12 +705,18 @@ operation* epoll_reactor::descriptor_state::perform_io(uint32_t events)
{
if (events & (flag[j] | EPOLLERR | EPOLLHUP))
{
try_speculative_[j] = true;
while (reactor_op* op = op_queue_[j].front())
{
if (op->perform())
if (reactor_op::status status = op->perform())
{
op_queue_[j].pop();
io_cleanup.ops_.push(op);
if (status == reactor_op::done_and_exhausted)
{
try_speculative_[j] = false;
break;
}
}
else
break;

View File

@ -93,7 +93,7 @@ public:
{
}
static bool do_perform(reactor_op*)
static status do_perform(reactor_op*)
{
signal_state* state = get_signal_state();
@ -103,7 +103,7 @@ public:
if (signal_number >= 0 && signal_number < max_signal_number)
signal_set_service::deliver_signal(signal_number);
return false;
return not_done;
}
static void do_complete(void* /*owner*/, operation* base,

View File

@ -41,9 +41,9 @@ public:
handler_work<Handler>::start(handler_);
}
static bool do_perform(reactor_op*)
static status do_perform(reactor_op*)
{
return true;
return done;
}
static void do_complete(void* owner, operation* base,

View File

@ -45,16 +45,16 @@ public:
{
}
static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_accept_op_base* o(
static_cast<reactive_socket_accept_op_base*>(base));
std::size_t addrlen = o->peer_endpoint_ ? o->peer_endpoint_->capacity() : 0;
socket_type new_socket = invalid_socket;
bool result = socket_ops::non_blocking_accept(o->socket_,
o->state_, o->peer_endpoint_ ? o->peer_endpoint_->data() : 0,
o->peer_endpoint_ ? &addrlen : 0, o->ec_, new_socket);
status result = socket_ops::non_blocking_accept(o->socket_,
o->state_, o->peer_endpoint_ ? o->peer_endpoint_->data() : 0,
o->peer_endpoint_ ? &addrlen : 0, o->ec_, new_socket) ? done : not_done;
ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_accept", o->ec_));

View File

@ -37,12 +37,13 @@ public:
{
}
static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_connect_op_base* o(
static_cast<reactive_socket_connect_op_base*>(base));
bool result = socket_ops::non_blocking_connect(o->socket_, o->ec_);
status result = socket_ops::non_blocking_connect(
o->socket_, o->ec_) ? done : not_done;
ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_connect", o->ec_));

View File

@ -43,7 +43,7 @@ public:
{
}
static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_recv_op_base* o(
static_cast<reactive_socket_recv_op_base*>(base));
@ -51,10 +51,15 @@ public:
buffer_sequence_adapter<asio::mutable_buffer,
MutableBufferSequence> bufs(o->buffers_);
bool result = socket_ops::non_blocking_recv(o->socket_,
status result = socket_ops::non_blocking_recv(o->socket_,
bufs.buffers(), bufs.count(), o->flags_,
(o->state_ & socket_ops::stream_oriented) != 0,
o->ec_, o->bytes_transferred_);
o->ec_, o->bytes_transferred_) ? done : not_done;
if (result == done)
if ((o->state_ & socket_ops::stream_oriented) != 0)
if (o->bytes_transferred_ < bufs.total_size())
result = done_and_exhausted;
ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_recv",
o->ec_, o->bytes_transferred_));

View File

@ -44,7 +44,7 @@ public:
{
}
static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_recvfrom_op_base* o(
static_cast<reactive_socket_recvfrom_op_base*>(base));
@ -53,10 +53,10 @@ public:
MutableBufferSequence> bufs(o->buffers_);
std::size_t addr_len = o->sender_endpoint_.capacity();
bool result = socket_ops::non_blocking_recvfrom(o->socket_,
status result = socket_ops::non_blocking_recvfrom(o->socket_,
bufs.buffers(), bufs.count(), o->flags_,
o->sender_endpoint_.data(), &addr_len,
o->ec_, o->bytes_transferred_);
o->ec_, o->bytes_transferred_) ? done : not_done;
if (result && !o->ec_)
o->sender_endpoint_.resize(addr_len);

View File

@ -44,7 +44,7 @@ public:
{
}
static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_recvmsg_op_base* o(
static_cast<reactive_socket_recvmsg_op_base*>(base));
@ -52,10 +52,10 @@ public:
buffer_sequence_adapter<asio::mutable_buffer,
MutableBufferSequence> bufs(o->buffers_);
bool result = socket_ops::non_blocking_recvmsg(o->socket_,
status result = socket_ops::non_blocking_recvmsg(o->socket_,
bufs.buffers(), bufs.count(),
o->in_flags_, o->out_flags_,
o->ec_, o->bytes_transferred_);
o->ec_, o->bytes_transferred_) ? done : not_done;
ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_recvmsg",
o->ec_, o->bytes_transferred_));

View File

@ -33,16 +33,17 @@ class reactive_socket_send_op_base : public reactor_op
{
public:
reactive_socket_send_op_base(socket_type socket,
const ConstBufferSequence& buffers,
socket_ops::state_type state, const ConstBufferSequence& buffers,
socket_base::message_flags flags, func_type complete_func)
: reactor_op(&reactive_socket_send_op_base::do_perform, complete_func),
socket_(socket),
state_(state),
buffers_(buffers),
flags_(flags)
{
}
static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_send_op_base* o(
static_cast<reactive_socket_send_op_base*>(base));
@ -50,9 +51,14 @@ public:
buffer_sequence_adapter<asio::const_buffer,
ConstBufferSequence> bufs(o->buffers_);
bool result = socket_ops::non_blocking_send(o->socket_,
status result = socket_ops::non_blocking_send(o->socket_,
bufs.buffers(), bufs.count(), o->flags_,
o->ec_, o->bytes_transferred_);
o->ec_, o->bytes_transferred_) ? done : not_done;
if (result == done)
if ((o->state_ & socket_ops::stream_oriented) != 0)
if (o->bytes_transferred_ < bufs.total_size())
result = done_and_exhausted;
ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_send",
o->ec_, o->bytes_transferred_));
@ -62,6 +68,7 @@ public:
private:
socket_type socket_;
socket_ops::state_type state_;
ConstBufferSequence buffers_;
socket_base::message_flags flags_;
};
@ -74,10 +81,10 @@ public:
ASIO_DEFINE_HANDLER_PTR(reactive_socket_send_op);
reactive_socket_send_op(socket_type socket,
const ConstBufferSequence& buffers,
socket_ops::state_type state, const ConstBufferSequence& buffers,
socket_base::message_flags flags, Handler& handler)
: reactive_socket_send_op_base<ConstBufferSequence>(socket,
buffers, flags, &reactive_socket_send_op::do_complete),
state, buffers, flags, &reactive_socket_send_op::do_complete),
handler_(ASIO_MOVE_CAST(Handler)(handler))
{
handler_work<Handler>::start(handler_);

View File

@ -43,7 +43,7 @@ public:
{
}
static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_sendto_op_base* o(
static_cast<reactive_socket_sendto_op_base*>(base));
@ -51,10 +51,10 @@ public:
buffer_sequence_adapter<asio::const_buffer,
ConstBufferSequence> bufs(o->buffers_);
bool result = socket_ops::non_blocking_sendto(o->socket_,
status result = socket_ops::non_blocking_sendto(o->socket_,
bufs.buffers(), bufs.count(), o->flags_,
o->destination_.data(), o->destination_.size(),
o->ec_, o->bytes_transferred_);
o->ec_, o->bytes_transferred_) ? done : not_done;
ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_sendto",
o->ec_, o->bytes_transferred_));

View File

@ -265,7 +265,7 @@ public:
typedef reactive_socket_send_op<ConstBufferSequence, Handler> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(impl.socket_, buffers, flags, handler);
p.p = new (p.v) op(impl.socket_, impl.state_, buffers, flags, handler);
ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_send"));

View File

@ -41,9 +41,9 @@ public:
handler_work<Handler>::start(handler_);
}
static bool do_perform(reactor_op*)
static status do_perform(reactor_op*)
{
return true;
return done;
}
static void do_complete(void* owner, operation* base,

View File

@ -33,14 +33,18 @@ public:
// The number of bytes transferred, to be passed to the completion handler.
std::size_t bytes_transferred_;
// Status returned by perform function. May be used to decide whether it is
// worth performing more operations on the descriptor immediately.
enum status { not_done, done, done_and_exhausted };
// Perform the operation. Returns true if it is finished.
bool perform()
status perform()
{
return perform_func_(this);
}
protected:
typedef bool (*perform_func_type)(reactor_op*);
typedef status (*perform_func_type)(reactor_op*);
reactor_op(perform_func_type perform_func, func_type complete_func)
: operation(complete_func),

View File

@ -50,9 +50,9 @@ public:
handler_work<Handler>::start(handler_);
}
static bool do_perform(reactor_op*)
static status do_perform(reactor_op*)
{
return true;
return done;
}
static void do_complete(void* owner, operation* base,

View File

@ -43,12 +43,13 @@ public:
{
}
static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
win_iocp_socket_connect_op_base* o(
static_cast<win_iocp_socket_connect_op_base*>(base));
return socket_ops::non_blocking_connect(o->socket_, o->ec_);
return socket_ops::non_blocking_connect(
o->socket_, o->ec_) ? done : not_done;
}
socket_type socket_;

View File

@ -50,9 +50,9 @@ public:
handler_work<Handler>::start(handler_);
}
static bool do_perform(reactor_op*)
static status do_perform(reactor_op*)
{
return true;
return done;
}
static void do_complete(void* owner, operation* base,