Cannot perform concurrent operations on the /dev/poll descriptor where

the sockets descriptors involved may already be being waited on. Changed
the dev_poll_reactor class to keep a vector of pending event changes and
interrupt the /dev/poll ioctl() wait to apply it.
This commit is contained in:
chris_kohlhoff 2007-11-12 11:48:59 +00:00
parent 83a1c63803
commit aa41681de5

View File

@ -140,22 +140,13 @@ public:
if (read_op_queue_.enqueue_operation(descriptor, handler))
{
::pollfd ev = { 0 };
ev.fd = descriptor;
::pollfd& ev = add_pending_event_change(descriptor);
ev.events = POLLIN | POLLERR | POLLHUP;
if (write_op_queue_.has_operation(descriptor))
ev.events |= POLLOUT;
if (except_op_queue_.has_operation(descriptor))
ev.events |= POLLPRI;
ev.revents = 0;
int result = ::write(dev_poll_fd_, &ev, sizeof(ev));
if (result != sizeof(ev))
{
asio::error_code ec(errno,
asio::error::system_category);
read_op_queue_.dispatch_all_operations(descriptor, ec);
}
interrupter_.interrupt();
}
}
@ -175,22 +166,13 @@ public:
if (write_op_queue_.enqueue_operation(descriptor, handler))
{
::pollfd ev = { 0 };
ev.fd = descriptor;
::pollfd& ev = add_pending_event_change(descriptor);
ev.events = POLLOUT | POLLERR | POLLHUP;
if (read_op_queue_.has_operation(descriptor))
ev.events |= POLLIN;
if (except_op_queue_.has_operation(descriptor))
ev.events |= POLLPRI;
ev.revents = 0;
int result = ::write(dev_poll_fd_, &ev, sizeof(ev));
if (result != sizeof(ev))
{
asio::error_code ec(errno,
asio::error::system_category);
write_op_queue_.dispatch_all_operations(descriptor, ec);
}
interrupter_.interrupt();
}
}
@ -206,22 +188,13 @@ public:
if (except_op_queue_.enqueue_operation(descriptor, handler))
{
::pollfd ev = { 0 };
ev.fd = descriptor;
::pollfd& ev = add_pending_event_change(descriptor);
ev.events = POLLPRI | POLLERR | POLLHUP;
if (read_op_queue_.has_operation(descriptor))
ev.events |= POLLIN;
if (write_op_queue_.has_operation(descriptor))
ev.events |= POLLOUT;
ev.revents = 0;
int result = ::write(dev_poll_fd_, &ev, sizeof(ev));
if (result != sizeof(ev))
{
asio::error_code ec(errno,
asio::error::system_category);
except_op_queue_.dispatch_all_operations(descriptor, ec);
}
interrupter_.interrupt();
}
}
@ -241,21 +214,11 @@ public:
&& need_mod;
if (need_mod)
{
::pollfd ev = { 0 };
ev.fd = descriptor;
::pollfd& ev = add_pending_event_change(descriptor);
ev.events = POLLOUT | POLLPRI | POLLERR | POLLHUP;
if (read_op_queue_.has_operation(descriptor))
ev.events |= POLLIN;
ev.revents = 0;
int result = ::write(dev_poll_fd_, &ev, sizeof(ev));
if (result != sizeof(ev))
{
asio::error_code ec(errno,
asio::error::system_category);
write_op_queue_.dispatch_all_operations(descriptor, ec);
except_op_queue_.dispatch_all_operations(descriptor, ec);
}
interrupter_.interrupt();
}
}
@ -285,11 +248,9 @@ public:
asio::detail::mutex::scoped_lock lock(mutex_);
// Remove the descriptor from /dev/poll.
::pollfd ev = { 0 };
ev.fd = descriptor;
::pollfd& ev = add_pending_event_change(descriptor);
ev.events = POLLREMOVE;
ev.revents = 0;
::write(dev_poll_fd_, &ev, sizeof(ev));
interrupter_.interrupt();
// Cancel any outstanding operations associated with the descriptor.
cancel_ops_unlocked(descriptor);
@ -374,6 +335,26 @@ private:
return;
}
// Write the pending event registration changes to the /dev/poll descriptor.
std::size_t events_size = sizeof(::pollfd) * pending_event_changes_.size();
errno = 0;
int result = ::write(dev_poll_fd_,
&pending_event_changes_[0], events_size);
if (result != static_cast<int>(events_size))
{
for (std::size_t i = 0; i < pending_event_changes_.size(); ++i)
{
int descriptor = pending_event_changes_[i].fd;
asio::error_code ec = asio::error_code(
errno, asio::error::system_category);
read_op_queue_.dispatch_all_operations(descriptor, ec);
write_op_queue_.dispatch_all_operations(descriptor, ec);
except_op_queue_.dispatch_all_operations(descriptor, ec);
}
}
pending_event_changes_.clear();
pending_event_change_index_.clear();
int timeout = block ? get_timeout() : 0;
wait_in_progress_ = true;
lock.unlock();
@ -588,12 +569,39 @@ private:
timer_queues_for_cleanup_[i]->cleanup_timers();
}
// Add a pending event entry for the given descriptor.
::pollfd& add_pending_event_change(int descriptor)
{
hash_map<int, std::size_t>::iterator iter
= pending_event_change_index_.find(descriptor);
if (iter == pending_event_change_index_.end())
{
std::size_t index = pending_event_changes_.size();
pending_event_changes_.reserve(pending_event_changes_.size() + 1);
pending_event_change_index_.insert(std::make_pair(descriptor, index));
pending_event_changes_.push_back(::pollfd());
pending_event_changes_[index].fd = descriptor;
pending_event_changes_[index].revents = 0;
return pending_event_changes_[index];
}
else
{
return pending_event_changes_[iter->second];
}
}
// Mutex to protect access to internal data.
asio::detail::mutex mutex_;
// The /dev/poll file descriptor.
int dev_poll_fd_;
// Vector of /dev/poll events waiting to be written to the descriptor.
std::vector< ::pollfd> pending_event_changes_;
// Hash map to associate a descriptor with a pending event change index.
hash_map<int, std::size_t> pending_event_change_index_;
// Whether the DP_POLL operation is currently in progress
bool wait_in_progress_;