Add lazy kqueue EVFILT_WRITE registration.

This fixes support for read-only file descriptors, such as those obtained
from libpcap by calling pcap_get_selectable_fd.
This commit is contained in:
Christopher Kohlhoff 2014-10-14 21:21:15 +11:00
parent 4b1731d973
commit b72b473cda
2 changed files with 44 additions and 18 deletions

View File

@ -100,9 +100,9 @@ void kqueue_reactor::fork_service(
EVFILT_READ, EV_ADD, 0, 0, &interrupter_); EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1) if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
{ {
asio::error_code error(errno, asio::error_code ec(errno,
asio::error::get_system_category()); asio::error::get_system_category());
asio::detail::throw_error(error); asio::detail::throw_error(ec, "kqueue interrupter registration");
} }
// Re-register all descriptors with kqueue. // Re-register all descriptors with kqueue.
@ -115,11 +115,11 @@ void kqueue_reactor::fork_service(
EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state); EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_, ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_,
EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state); EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);
if (::kevent(kqueue_fd_, events, 2, 0, 0, 0) == -1) if (::kevent(kqueue_fd_, events, state->num_kevents_, 0, 0, 0) == -1)
{ {
asio::error_code error(errno, asio::error_code ec(errno,
asio::error::get_system_category()); asio::error::get_system_category());
asio::detail::throw_error(error); asio::detail::throw_error(ec, "kqueue re-registration");
} }
} }
} }
@ -138,14 +138,13 @@ int kqueue_reactor::register_descriptor(socket_type descriptor,
mutex::scoped_lock lock(descriptor_data->mutex_); mutex::scoped_lock lock(descriptor_data->mutex_);
descriptor_data->descriptor_ = descriptor; descriptor_data->descriptor_ = descriptor;
descriptor_data->num_kevents_ = 1;
descriptor_data->shutdown_ = false; descriptor_data->shutdown_ = false;
struct kevent events[2]; struct kevent events[1];
ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ, ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data); EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE, if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
if (::kevent(kqueue_fd_, events, 2, 0, 0, 0) == -1)
return errno; return errno;
return 0; return 0;
@ -160,15 +159,14 @@ int kqueue_reactor::register_internal_descriptor(
mutex::scoped_lock lock(descriptor_data->mutex_); mutex::scoped_lock lock(descriptor_data->mutex_);
descriptor_data->descriptor_ = descriptor; descriptor_data->descriptor_ = descriptor;
descriptor_data->num_kevents_ = 1;
descriptor_data->shutdown_ = false; descriptor_data->shutdown_ = false;
descriptor_data->op_queue_[op_type].push(op); descriptor_data->op_queue_[op_type].push(op);
struct kevent events[2]; struct kevent events[1];
ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ, ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data); EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE, if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
if (::kevent(kqueue_fd_, events, 2, 0, 0, 0) == -1)
return errno; return errno;
return 0; return 0;
@ -201,8 +199,7 @@ void kqueue_reactor::start_op(int op_type, socket_type descriptor,
return; return;
} }
bool first = descriptor_data->op_queue_[op_type].empty(); if (descriptor_data->op_queue_[op_type].empty())
if (first)
{ {
if (allow_speculative if (allow_speculative
&& (op_type != read_op && (op_type != read_op
@ -214,15 +211,43 @@ void kqueue_reactor::start_op(int op_type, socket_type descriptor,
scheduler_.post_immediate_completion(op, is_continuation); scheduler_.post_immediate_completion(op, is_continuation);
return; return;
} }
}
else if (op_type == write_op)
{
if (descriptor_data->num_kevents_ == 1)
{ {
struct kevent events[2]; struct kevent events[2];
ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ, ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data); EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE, ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data); EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
::kevent(kqueue_fd_, events, 2, 0, 0, 0); if (::kevent(kqueue_fd_, events, 2, 0, 0, 0) != -1)
{
descriptor_data->num_kevents_ = 2;
}
else
{
op->ec_ = asio::error_code(errno,
asio::error::get_system_category());
io_service_.post_immediate_completion(op, is_continuation);
return;
}
}
}
}
else
{
if (op_type == write_op)
{
descriptor_data->num_kevents_ = 2;
}
struct kevent events[2];
ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
} }
} }
@ -276,7 +301,7 @@ void kqueue_reactor::deregister_descriptor(socket_type descriptor,
EVFILT_READ, EV_DELETE, 0, 0, 0); EVFILT_READ, EV_DELETE, 0, 0, 0);
ASIO_KQUEUE_EV_SET(&events[1], descriptor, ASIO_KQUEUE_EV_SET(&events[1], descriptor,
EVFILT_WRITE, EV_DELETE, 0, 0, 0); EVFILT_WRITE, EV_DELETE, 0, 0, 0);
::kevent(kqueue_fd_, events, 2, 0, 0, 0); ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
} }
op_queue<operation> ops; op_queue<operation> ops;
@ -317,7 +342,7 @@ void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor,
EVFILT_READ, EV_DELETE, 0, 0, 0); EVFILT_READ, EV_DELETE, 0, 0, 0);
ASIO_KQUEUE_EV_SET(&events[1], descriptor, ASIO_KQUEUE_EV_SET(&events[1], descriptor,
EVFILT_WRITE, EV_DELETE, 0, 0, 0); EVFILT_WRITE, EV_DELETE, 0, 0, 0);
::kevent(kqueue_fd_, events, 2, 0, 0, 0); ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
op_queue<operation> ops; op_queue<operation> ops;
for (int i = 0; i < max_ops; ++i) for (int i = 0; i < max_ops; ++i)

View File

@ -67,6 +67,7 @@ public:
mutex mutex_; mutex mutex_;
int descriptor_; int descriptor_;
int num_kevents_; // 1 == read only, 2 == read and write
op_queue<reactor_op> op_queue_[max_ops]; op_queue<reactor_op> op_queue_[max_ops];
bool shutdown_; bool shutdown_;
}; };