Implement deferred closure of sockets if the select system call is

currently in progress.
This commit is contained in:
chris 2003-10-17 08:49:30 +00:00
parent 1c2b183e93
commit cf1f5270b9
6 changed files with 111 additions and 28 deletions

View File

@ -73,8 +73,7 @@ public:
{
if (impl != null())
{
reactor_.close_descriptor(impl);
socket_ops::close(impl);
reactor_.close_descriptor(impl, socket_ops::close);
impl = null();
}
}

View File

@ -84,8 +84,7 @@ public:
{
if (impl != null())
{
reactor_.close_descriptor(impl);
socket_ops::close(impl);
reactor_.close_descriptor(impl, socket_ops::close);
impl = null();
}
}

View File

@ -104,7 +104,7 @@ public:
impl->get_sockets(sockets);
typename connector_impl::socket_set::iterator i = sockets.begin();
while (i != sockets.end())
reactor_.close_descriptor(*i++);
reactor_.close_descriptor(*i++, socket_ops::close);
delete impl;
impl = null();
}
@ -203,8 +203,8 @@ public:
void do_cancel()
{
socket_holder new_socket_holder(new_socket_);
impl_->remove_socket(new_socket_);
// The socket is closed when the reactor_.close_descriptor is called,
// so no need to close it here.
socket_error error(socket_error::operation_aborted);
demuxer_.operation_completed(bind_handler(handler_, error), context_);
}

View File

@ -58,8 +58,7 @@ public:
{
if (impl != null())
{
reactor_.close_descriptor(impl);
socket_ops::close(impl);
reactor_.close_descriptor(impl, socket_ops::close);
impl = null();
}
}

View File

@ -30,6 +30,13 @@ class reactor_op_queue
: private boost::noncopyable
{
public:
// Constructor.
reactor_op_queue()
: operations_(),
cancelled_operations_(0)
{
}
// Add a new operation to the queue. Returns true if this is the only
// operation for the given descriptor, in which case the reactor's event
// demultiplexing function call may need to be interrupted and restarted.
@ -52,6 +59,28 @@ public:
return false;
}
// Close the given descriptor. Any operations pending for the descriptor will
// be notified that they have been cancelled next time dispatch_cancellations
// is called. Returns true if any operations were cancelled, in which case
// the reactor's event demultiplexing function may need to be interrupted and
// restarted.
bool close_descriptor(Descriptor descriptor)
{
typename operation_map::iterator i = operations_.find(descriptor);
if (i != operations_.end())
{
op_base* last_op = i->second;
while (last_op->next_)
last_op = last_op->next_;
last_op->next_ = cancelled_operations_;
cancelled_operations_ = i->second;
operations_.erase(i);
return true;
}
return false;
}
// Whether there are no operations in the queue.
bool empty() const
{
@ -94,23 +123,16 @@ public:
}
}
// Close the given descriptor. Any operations pending for the descriptor will
// be notified that they are being cancelled.
void close_descriptor(Descriptor descriptor)
// Dispatch any pending cancels for operations.
void dispatch_cancellations()
{
typename operation_map::iterator i = operations_.find(descriptor);
if (i != operations_.end())
while (cancelled_operations_)
{
op_base* op = i->second;
while (op)
{
op_base* next_op = op->next_;
op->next_ = 0;
op->do_cancel();
delete op;
op = next_op;
}
operations_.erase(i);
op_base* next_op = cancelled_operations_->next_;
cancelled_operations_->next_ = 0;
cancelled_operations_->do_cancel();
delete cancelled_operations_;
cancelled_operations_ = next_op;
}
}
@ -188,6 +210,9 @@ private:
// The operations that are currently executing asynchronously.
operation_map operations_;
// The list of operations that have been cancelled.
op_base* cancelled_operations_;
};
} // namespace detail

View File

@ -43,6 +43,7 @@ public:
template <typename Demuxer>
select_reactor(Demuxer&)
: mutex_(),
select_in_progress_(false),
interrupter_(),
read_op_queue_(),
write_op_queue_(),
@ -115,13 +116,61 @@ public:
write_op_queue_.enqueue_operation(descriptor, handler);
}
// Class template to adapt a close function as a timer handler.
template <typename Close_Function>
class close_handler
{
public:
close_handler(socket_type descriptor, Close_Function close_function)
: descriptor_(descriptor),
close_function_(close_function)
{
}
void do_operation()
{
close_function_(descriptor_);
}
void do_cancel()
{
}
private:
socket_type descriptor_;
Close_Function close_function_;
};
// Close the given descriptor and cancel any operations that are running
// against it.
void close_descriptor(socket_type descriptor)
// against it. The given close function will be called to actually perform
// the closure of the resource.
template <typename Close_Function>
void close_descriptor(socket_type descriptor, Close_Function close_function)
{
asio::detail::mutex::scoped_lock lock(mutex_);
read_op_queue_.close_descriptor(descriptor);
write_op_queue_.close_descriptor(descriptor);
// We need to interrupt the select if any operations were cancelled.
bool interrupt = read_op_queue_.close_descriptor(descriptor);
interrupt = write_op_queue_.close_descriptor(descriptor) || interrupt;
if (select_in_progress_)
{
// The close function cannot be called while the select call is running,
// so we schedule a dummy timer to perform the socket close when the
// select has been interrupted.
void* token = 0;
interrupt = timer_queue_.enqueue_timer(time(0, 0),
close_handler<Close_Function>(descriptor, close_function), token)
|| interrupt;
}
else
{
// Select is not currently running so we can close the socket now.
close_function(descriptor);
}
if (interrupt)
interrupter_.interrupt();
}
// Schedule a timer to expire at the specified absolute time. The
@ -163,6 +212,11 @@ private:
{
asio::detail::mutex::scoped_lock lock(mutex_);
// Dispatch any operation cancellations that were made while the select
// loop was not running.
read_op_queue_.dispatch_cancellations();
write_op_queue_.dispatch_cancellations();
bool stop = false;
while (!stop)
{
@ -179,9 +233,11 @@ private:
// operations can be started while the call is executing.
timeval tv_buf;
timeval* tv = get_timeout(tv_buf);
select_in_progress_ = true;
lock.unlock();
int retval = socket_ops::select(max_fd + 1, read_fds, write_fds, 0, tv);
lock.lock();
select_in_progress_ = false;
// Reset the interrupter.
if (read_fds.is_set(interrupter_.read_descriptor()))
@ -192,6 +248,8 @@ private:
{
read_op_queue_.dispatch_descriptors(read_fds);
write_op_queue_.dispatch_descriptors(write_fds);
read_op_queue_.dispatch_cancellations();
write_op_queue_.dispatch_cancellations();
}
timer_queue_.dispatch_timers(time::now());
}
@ -286,6 +344,9 @@ private:
// Mutex to protect access to internal data.
asio::detail::mutex mutex_;
// Whether the select loop is currently running or not.
bool select_in_progress_;
// The interrupter is used to break a blocking select call.
select_interrupter interrupter_;