Remove need to have a mutex per idle thread.

This commit is contained in:
chris_kohlhoff 2007-08-30 12:38:44 +00:00
parent d7bf2a4bcd
commit c2957d1adc
6 changed files with 65 additions and 44 deletions

View File

@ -43,17 +43,20 @@ public:
} }
// Signal the event. // Signal the event.
void signal() template <typename Lock>
void signal(Lock&)
{ {
} }
// Reset the event. // Reset the event.
void clear() template <typename Lock>
void clear(Lock&)
{ {
} }
// Wait for the event to become signalled. // Wait for the event to become signalled.
void wait() template <typename Lock>
void wait(Lock&)
{ {
} }
}; };

View File

@ -24,6 +24,7 @@
#if defined(BOOST_HAS_PTHREADS) #if defined(BOOST_HAS_PTHREADS)
#include "asio/detail/push_options.hpp" #include "asio/detail/push_options.hpp"
#include <boost/assert.hpp>
#include <boost/throw_exception.hpp> #include <boost/throw_exception.hpp>
#include <pthread.h> #include <pthread.h>
#include "asio/detail/pop_options.hpp" #include "asio/detail/pop_options.hpp"
@ -42,7 +43,7 @@ public:
posix_event() posix_event()
: signalled_(false) : signalled_(false)
{ {
int error = ::pthread_mutex_init(&mutex_, 0); int error = ::pthread_cond_init(&cond_, 0);
if (error != 0) if (error != 0)
{ {
asio::system_error e( asio::system_error e(
@ -50,53 +51,43 @@ public:
"event"); "event");
boost::throw_exception(e); boost::throw_exception(e);
} }
error = ::pthread_cond_init(&cond_, 0);
if (error != 0)
{
::pthread_mutex_destroy(&mutex_);
asio::system_error e(
asio::error_code(error, asio::native_ecat),
"event");
boost::throw_exception(e);
}
} }
// Destructor. // Destructor.
~posix_event() ~posix_event()
{ {
::pthread_cond_destroy(&cond_); ::pthread_cond_destroy(&cond_);
::pthread_mutex_destroy(&mutex_);
} }
// Signal the event. // Signal the event.
void signal() template <typename Lock>
void signal(Lock& lock)
{ {
::pthread_mutex_lock(&mutex_); // Ignore EINVAL and EDEADLK. BOOST_ASSERT(lock.locked());
(void)lock;
signalled_ = true; signalled_ = true;
::pthread_cond_signal(&cond_); // Ignore EINVAL. ::pthread_cond_signal(&cond_); // Ignore EINVAL.
::pthread_mutex_unlock(&mutex_); // Ignore EINVAL and EPERM.
} }
// Reset the event. // Reset the event.
void clear() template <typename Lock>
void clear(Lock& lock)
{ {
::pthread_mutex_lock(&mutex_); // Ignore EINVAL and EDEADLK. BOOST_ASSERT(lock.locked());
(void)lock;
signalled_ = false; signalled_ = false;
::pthread_mutex_unlock(&mutex_); // Ignore EINVAL and EPERM.
} }
// Wait for the event to become signalled. // Wait for the event to become signalled.
void wait() template <typename Lock>
void wait(Lock& lock)
{ {
::pthread_mutex_lock(&mutex_); // Ignore EINVAL and EDEADLK. BOOST_ASSERT(lock.locked());
while (!signalled_) while (!signalled_)
::pthread_cond_wait(&cond_, &mutex_); // Ignore EINVAL. ::pthread_cond_wait(&cond_, &lock.mutex().mutex_); // Ignore EINVAL.
::pthread_mutex_unlock(&mutex_); // Ignore EINVAL and EPERM.
} }
private: private:
::pthread_mutex_t mutex_;
::pthread_cond_t cond_; ::pthread_cond_t cond_;
bool signalled_; bool signalled_;
}; };

View File

@ -35,6 +35,8 @@
namespace asio { namespace asio {
namespace detail { namespace detail {
class posix_event;
class posix_mutex class posix_mutex
: private noncopyable : private noncopyable
{ {
@ -87,6 +89,7 @@ public:
} }
private: private:
friend class posix_event;
::pthread_mutex_t mutex_; ::pthread_mutex_t mutex_;
}; };

View File

@ -63,6 +63,18 @@ public:
} }
} }
// Test whether the lock is held.
bool locked() const
{
return locked_;
}
// Get the underlying mutex.
Mutex& mutex()
{
return mutex_;
}
private: private:
// The underlying mutex. // The underlying mutex.
Mutex& mutex_; Mutex& mutex_;

View File

@ -132,7 +132,7 @@ public:
void stop() void stop()
{ {
asio::detail::mutex::scoped_lock lock(mutex_); asio::detail::mutex::scoped_lock lock(mutex_);
stop_all_threads(); stop_all_threads(lock);
} }
// Reset in preparation for a subsequent run invocation. // Reset in preparation for a subsequent run invocation.
@ -154,7 +154,7 @@ public:
{ {
asio::detail::mutex::scoped_lock lock(mutex_); asio::detail::mutex::scoped_lock lock(mutex_);
if (--outstanding_work_ == 0) if (--outstanding_work_ == 0)
stop_all_threads(); stop_all_threads(lock);
} }
// Request invocation of the given handler. // Request invocation of the given handler.
@ -199,7 +199,7 @@ public:
++outstanding_work_; ++outstanding_work_;
// Wake up a thread to execute the handler. // Wake up a thread to execute the handler.
if (!interrupt_one_idle_thread()) if (!interrupt_one_idle_thread(lock))
if (task_handler_.next_ == 0 && handler_queue_end_ != &task_handler_) if (task_handler_.next_ == 0 && handler_queue_end_ != &task_handler_)
task_.interrupt(); task_.interrupt();
} }
@ -212,7 +212,7 @@ private:
{ {
if (outstanding_work_ == 0 && !stopped_) if (outstanding_work_ == 0 && !stopped_)
{ {
stop_all_threads(); stop_all_threads(lock);
ec = asio::error_code(); ec = asio::error_code();
return 0; return 0;
} }
@ -267,10 +267,8 @@ private:
// Nothing to run right now, so just wait for work to do. // Nothing to run right now, so just wait for work to do.
this_idle_thread->next = first_idle_thread_; this_idle_thread->next = first_idle_thread_;
first_idle_thread_ = this_idle_thread; first_idle_thread_ = this_idle_thread;
this_idle_thread->wakeup_event.clear(); this_idle_thread->wakeup_event.clear(lock);
lock.unlock(); this_idle_thread->wakeup_event.wait(lock);
this_idle_thread->wakeup_event.wait();
lock.lock();
} }
else else
{ {
@ -284,38 +282,41 @@ private:
} }
// Stop the task and all idle threads. // Stop the task and all idle threads.
void stop_all_threads() void stop_all_threads(
asio::detail::mutex::scoped_lock& lock)
{ {
stopped_ = true; stopped_ = true;
interrupt_all_idle_threads(); interrupt_all_idle_threads(lock);
if (task_handler_.next_ == 0 && handler_queue_end_ != &task_handler_) if (task_handler_.next_ == 0 && handler_queue_end_ != &task_handler_)
task_.interrupt(); task_.interrupt();
} }
// Interrupt a single idle thread. Returns true if a thread was interrupted, // Interrupt a single idle thread. Returns true if a thread was interrupted,
// false if no running thread could be found to interrupt. // false if no running thread could be found to interrupt.
bool interrupt_one_idle_thread() bool interrupt_one_idle_thread(
asio::detail::mutex::scoped_lock& lock)
{ {
if (first_idle_thread_) if (first_idle_thread_)
{ {
idle_thread_info* idle_thread = first_idle_thread_; idle_thread_info* idle_thread = first_idle_thread_;
first_idle_thread_ = idle_thread->next; first_idle_thread_ = idle_thread->next;
idle_thread->next = 0; idle_thread->next = 0;
idle_thread->wakeup_event.signal(); idle_thread->wakeup_event.signal(lock);
return true; return true;
} }
return false; return false;
} }
// Interrupt all idle threads. // Interrupt all idle threads.
void interrupt_all_idle_threads() void interrupt_all_idle_threads(
asio::detail::mutex::scoped_lock& lock)
{ {
while (first_idle_thread_) while (first_idle_thread_)
{ {
idle_thread_info* idle_thread = first_idle_thread_; idle_thread_info* idle_thread = first_idle_thread_;
first_idle_thread_ = idle_thread->next; first_idle_thread_ = idle_thread->next;
idle_thread->next = 0; idle_thread->next = 0;
idle_thread->wakeup_event.signal(); idle_thread->wakeup_event.signal(lock);
} }
} }
@ -459,7 +460,7 @@ private:
{ {
lock_.lock(); lock_.lock();
if (--task_io_service_.outstanding_work_ == 0) if (--task_io_service_.outstanding_work_ == 0)
task_io_service_.stop_all_threads(); task_io_service_.stop_all_threads(lock_);
} }
private: private:

View File

@ -28,6 +28,7 @@
#include "asio/detail/socket_types.hpp" #include "asio/detail/socket_types.hpp"
#include "asio/detail/push_options.hpp" #include "asio/detail/push_options.hpp"
#include <boost/assert.hpp>
#include <boost/throw_exception.hpp> #include <boost/throw_exception.hpp>
#include "asio/detail/pop_options.hpp" #include "asio/detail/pop_options.hpp"
@ -59,21 +60,31 @@ public:
} }
// Signal the event. // Signal the event.
void signal() template <typename Lock>
void signal(Lock& lock)
{ {
BOOST_ASSERT(lock.locked());
(void)lock;
::SetEvent(event_); ::SetEvent(event_);
} }
// Reset the event. // Reset the event.
void clear() template <typename Lock>
void clear(Lock& lock)
{ {
BOOST_ASSERT(lock.locked());
(void)lock;
::ResetEvent(event_); ::ResetEvent(event_);
} }
// Wait for the event to become signalled. // Wait for the event to become signalled.
void wait() template <typename Lock>
void wait(Lock& lock)
{ {
BOOST_ASSERT(lock.locked());
lock.unlock();
::WaitForSingleObject(event_, INFINITE); ::WaitForSingleObject(event_, INFINITE);
lock.lock();
} }
private: private: