Added initial locking_dispatcher implementation.

chris 2004-03-22 07:59:33 +00:00
parent 24e0725efa
commit f80bc25875
4 changed files with 174 additions and 39 deletions
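For orientation, the pattern this commit introduces looks roughly like the sketch below. It is a minimal illustration pieced together from calls that appear in the diffs (locking_dispatcher's post and dispatch, demuxer::run, detail::thread), not code taken from the commit; the print_message handler and its text arguments are made up for the example.

// Minimal sketch (assumed usage, based on this commit's diffs): handlers that
// go through the same locking_dispatcher never execute concurrently, even
// when more than one thread is running the demuxer.
#include <iostream>
#include <boost/bind.hpp>
#include "asio.hpp"

using namespace asio;

void print_message(const char* text)
{
  // Both calls below are funneled through the dispatcher, so the two
  // messages cannot interleave even with two threads servicing the demuxer.
  std::cout << text << std::endl;
}

int main()
{
  demuxer d;
  locking_dispatcher l(d);

  // post() queues the handler for later execution; dispatch() may run it
  // immediately when the dispatcher's lock is not already held, which is
  // what the new increment_with_lock/increment_without_lock tests check.
  l.post(boost::bind(print_message, "first handler"));
  l.post(boost::bind(print_message, "second handler"));

  // Run the demuxer from two threads, as the updated performance tests do.
  // detail::thread is the same internal thread wrapper those tests use.
  detail::thread t(boost::bind(&demuxer::run, &d));
  d.run();
  t.join();

  return 0;
}

The new unit test below checks exactly this guarantee: with two threads calling demuxer::run, handlers posted through the same locking_dispatcher still run one after another.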


@ -62,6 +62,7 @@ check: all
tests\unit\dgram_socket_test.exe && \
tests\unit\error_handler_test.exe && \
tests\unit\fixed_buffer_test.exe && \
tests\unit\locking_dispatcher_test.exe && \
tests\unit\socket_acceptor_test.exe && \
tests\unit\timer_test.exe


@ -56,7 +56,7 @@ class session
{
public:
session(demuxer& d, size_t block_size, stats& s)
: demuxer_(d),
: dispatcher_(d),
socket_(d),
block_size_(block_size),
recv_data_(new char[block_size]),
@ -86,15 +86,15 @@ public:
void start()
{
++unsent_count_;
async_send_n(socket_, send_data_, block_size_,
boost::bind(&session::handle_send, this, _1, _2, _3));
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2));
async_send_n(socket_, send_data_, block_size_, dispatcher_.wrap(
boost::bind(&session::handle_send, this, _1, _2, _3)));
socket_.async_recv(recv_data_, block_size_, dispatcher_.wrap(
boost::bind(&session::handle_recv, this, _1, _2)));
}
void stop()
{
demuxer_.post(boost::bind(&stream_socket::close, &socket_));
dispatcher_.post(boost::bind(&stream_socket::close, &socket_));
}
void handle_recv(const socket_error& error, size_t length)
@ -107,10 +107,10 @@ public:
if (unsent_count_ == 1)
{
std::swap(recv_data_, send_data_);
async_send_n(socket_, send_data_, length,
boost::bind(&session::handle_send, this, _1, _2, _3));
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2));
async_send_n(socket_, send_data_, length, dispatcher_.wrap(
boost::bind(&session::handle_send, this, _1, _2, _3)));
socket_.async_recv(recv_data_, block_size_, dispatcher_.wrap(
boost::bind(&session::handle_recv, this, _1, _2)));
}
}
}
@ -125,16 +125,16 @@ public:
if (unsent_count_ == 1)
{
std::swap(recv_data_, send_data_);
async_send_n(socket_, send_data_, length,
boost::bind(&session::handle_send, this, _1, _2, _3));
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2));
async_send_n(socket_, send_data_, length, dispatcher_.wrap(
boost::bind(&session::handle_send, this, _1, _2, _3)));
socket_.async_recv(recv_data_, block_size_, dispatcher_.wrap(
boost::bind(&session::handle_recv, this, _1, _2)));
}
}
}
private:
demuxer& demuxer_;
locking_dispatcher dispatcher_;
stream_socket socket_;
size_t block_size_;
char* recv_data_;
@ -151,6 +151,7 @@ public:
client(demuxer& d, const char* host, short port, size_t block_size,
size_t session_count, int timeout)
: demuxer_(d),
dispatcher_(d),
stop_timer_(d, timer::from_now, timeout),
connector_(d),
server_addr_(port, host),
@ -161,9 +162,11 @@ public:
{
session* new_session = new session(demuxer_, block_size, stats_);
connector_.async_connect(new_session->socket(), server_addr_,
boost::bind(&client::handle_connect, this, new_session, _1));
dispatcher_.wrap(boost::bind(&client::handle_connect, this,
new_session, _1)));
stop_timer_.async_wait(boost::bind(&client::handle_timeout, this));
stop_timer_.async_wait(dispatcher_.wrap(
boost::bind(&client::handle_timeout, this)));
}
~client()
@ -194,7 +197,8 @@ public:
{
new_session = new session(demuxer_, block_size_, stats_);
connector_.async_connect(new_session->socket(), server_addr_,
boost::bind(&client::handle_connect, this, new_session, _1));
dispatcher_.wrap(boost::bind(&client::handle_connect, this,
new_session, _1)));
}
}
else
@ -205,6 +209,7 @@ public:
private:
demuxer& demuxer_;
locking_dispatcher dispatcher_;
timer stop_timer_;
socket_connector connector_;
ipv4::address server_addr_;
@ -237,23 +242,22 @@ int main(int argc, char* argv[])
client c(d, host, port, block_size, session_count, timeout);
// Threads not currently supported in this test.
/*std::list<detail::thread*> threads;
std::list<detail::thread*> threads;
while (--thread_count > 0)
{
detail::thread* new_thread =
new detail::thread(boost::bind(&demuxer::run, &d));
threads.push_back(new_thread);
}*/
}
d.run();
/*while (!threads.empty())
while (!threads.empty())
{
threads.front()->join();
delete threads.front();
threads.pop_front();
}*/
}
}
catch (std::exception& e)
{


@ -24,7 +24,9 @@ class session
{
public:
session(demuxer& d, size_t block_size)
: socket_(d),
: demuxer_(d),
dispatcher_(d),
socket_(d),
block_size_(block_size),
recv_data_(new char[block_size]),
send_data_(new char[block_size]),
@ -48,7 +50,7 @@ public:
{
++op_count_;
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2));
dispatcher_.wrap(boost::bind(&session::handle_recv, this, _1, _2)));
}
void handle_recv(const socket_error& error, size_t length)
@ -60,15 +62,15 @@ public:
{
op_count_ += 2;
std::swap(recv_data_, send_data_);
async_send_n(socket_, send_data_, length,
boost::bind(&session::handle_send, this, _1, _2, _3));
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2));
async_send_n(socket_, send_data_, length, dispatcher_.wrap(
boost::bind(&session::handle_send, this, _1, _2, _3)));
socket_.async_recv(recv_data_, block_size_, dispatcher_.wrap(
boost::bind(&session::handle_recv, this, _1, _2)));
}
}
else if (--op_count_ == 0)
{
delete this;
demuxer_.post(boost::bind(&session::destroy, this));
}
}
@ -81,19 +83,26 @@ public:
{
op_count_ += 2;
std::swap(recv_data_, send_data_);
async_send_n(socket_, send_data_, length,
boost::bind(&session::handle_send, this, _1, _2, _3));
socket_.async_recv(recv_data_, block_size_,
boost::bind(&session::handle_recv, this, _1, _2));
async_send_n(socket_, send_data_, length, dispatcher_.wrap(
boost::bind(&session::handle_send, this, _1, _2, _3)));
socket_.async_recv(recv_data_, block_size_, dispatcher_.wrap(
boost::bind(&session::handle_recv, this, _1, _2)));
}
}
else if (--op_count_ == 0)
{
delete this;
demuxer_.post(boost::bind(&session::destroy, this));
}
}
static void destroy(session* s)
{
delete s;
}
private:
demuxer& demuxer_;
locking_dispatcher dispatcher_;
stream_socket socket_;
size_t block_size_;
char* recv_data_;
@ -162,22 +171,22 @@ int main(int argc, char* argv[])
server s(d, port, block_size);
// Threads not currently supported in this test.
/*std::list<detail::thread*> threads;
std::list<detail::thread*> threads;
while (--thread_count > 0)
{
detail::thread* new_thread =
new detail::thread(boost::bind(&demuxer::run, &d));
threads.push_back(new_thread);
}*/
}
d.run();
/*while (!threads.empty())
while (!threads.empty())
{
threads.front()->join();
delete threads.front();
threads.pop_front();
}*/
}
}
catch (std::exception& e)
{


@ -0,0 +1,121 @@
//
// locking_dispatcher_test.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003, 2004 Christopher M. Kohlhoff (chris@kohlhoff.com)
//
// Permission to use, copy, modify, distribute and sell this software and its
// documentation for any purpose is hereby granted without fee, provided that
// the above copyright notice appears in all copies and that both the copyright
// notice and this permission notice appear in supporting documentation. This
// software is provided "as is" without express or implied warranty, and with
// no claim as to its suitability for any purpose.
//
#include <sstream>
#include <boost/bind.hpp>
#include "asio.hpp"
#include "unit_test.hpp"
using namespace asio;
void increment(int* count)
{
++(*count);
}
void increment_without_lock(locking_dispatcher* l, int* count)
{
int original_count = *count;
l->dispatch(boost::bind(increment, count));
// No other functions are currently executing through the locking dispatcher,
// so the previous call to dispatch should have successfully nested.
UNIT_TEST_CHECK(*count == original_count + 1);
}
void increment_with_lock(locking_dispatcher* l, int* count)
{
int original_count = *count;
l->dispatch(boost::bind(increment, count));
// The current function already holds the locking_dispatcher's lock, so the
// previous call to dispatch should not have nested.
UNIT_TEST_CHECK(*count == original_count);
}
void sleep_increment(demuxer* d, int* count)
{
timer t(*d, timer::from_now, 2);
t.wait();
++(*count);
}
void start_sleep_increments(demuxer* d, locking_dispatcher* l, int* count)
{
// Give all threads a chance to start.
timer t(*d, timer::from_now, 2);
t.wait();
// Start three increments.
l->post(boost::bind(sleep_increment, d, count));
l->post(boost::bind(sleep_increment, d, count));
l->post(boost::bind(sleep_increment, d, count));
}
void locking_dispatcher_test()
{
demuxer d;
locking_dispatcher l(d);
int count = 0;
d.post(boost::bind(increment_without_lock, &l, &count));
// No handlers can be called until run() is called.
UNIT_TEST_CHECK(count == 0);
d.run();
// The run() call will not return until all work has finished.
UNIT_TEST_CHECK(count == 1);
count = 0;
d.reset();
l.post(boost::bind(increment_with_lock, &l, &count));
// No handlers can be called until run() is called.
UNIT_TEST_CHECK(count == 0);
d.run();
// The run() call will not return until all work has finished.
UNIT_TEST_CHECK(count == 1);
count = 0;
d.reset();
d.post(boost::bind(start_sleep_increments, &d, &l, &count));
detail::thread thread1(boost::bind(&demuxer::run, &d));
detail::thread thread2(boost::bind(&demuxer::run, &d));
// Check all events run one after another even though there are two threads.
timer timer1(d, timer::from_now, 3);
timer1.wait();
UNIT_TEST_CHECK(count == 0);
timer1.set(timer::from_existing, 2);
timer1.wait();
UNIT_TEST_CHECK(count == 1);
timer1.set(timer::from_existing, 2);
timer1.wait();
UNIT_TEST_CHECK(count == 2);
thread1.join();
thread2.join();
// The run() calls will not return until all work has finished.
UNIT_TEST_CHECK(count == 3);
}
UNIT_TEST(locking_dispatcher_test)
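
A closing note on the wrap() calls added throughout the performance tests: wrap() returns a new handler that, when the asynchronous operation completes, hands the original handler to the locking_dispatcher, so it cannot run concurrently with any other handler going through the same dispatcher. A rough sketch of that pattern, mirroring the timer usage in the client diff (handle_timeout here is a stand-in free function, not the client's member):

// Illustrative only: wrapping a completion handler so that it is serialized
// with every other handler dispatched through l.
#include <boost/bind.hpp>
#include "asio.hpp"

using namespace asio;

void handle_timeout()
{
  // Runs via the locking_dispatcher; it cannot overlap with handlers that
  // were wrapped by, or posted to, the same dispatcher.
}

int main()
{
  demuxer d;
  locking_dispatcher l(d);

  // Same shape as the client's stop_timer_: a timer whose completion
  // handler is wrapped before being passed to async_wait().
  timer t(d, timer::from_now, 1);
  t.async_wait(l.wrap(boost::bind(handle_timeout)));

  d.run();
  return 0;
}

This serialization is what allows the previously commented-out thread code in the client and server performance tests to be re-enabled in this commit.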