Treat accept errors as non-fatal.

This commit is contained in:
Christopher Kohlhoff 2011-05-24 18:42:22 +10:00
parent d12ffda0a9
commit 7470c7b6d9
21 changed files with 147 additions and 92 deletions

View File

@ -199,11 +199,12 @@ public:
if (!error)
{
new_session->start();
new_session.reset(new session(io_service_));
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
asio::placeholders::error));
}
new_session.reset(new session(io_service_));
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
asio::placeholders::error));
}
private:

View File

@ -92,11 +92,12 @@ public:
if (!error)
{
new_session->start();
new_session.reset(new session(io_service_));
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
asio::placeholders::error));
}
new_session.reset(new session(io_service_));
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
asio::placeholders::error));
}
private:

View File

@ -181,6 +181,11 @@ public:
const tcp::endpoint& endpoint)
: io_service_(io_service),
acceptor_(io_service, endpoint)
{
start_accept();
}
void start_accept()
{
chat_session_ptr new_session(new chat_session(io_service_, room_));
acceptor_.async_accept(new_session->socket(),
@ -194,11 +199,9 @@ public:
if (!error)
{
session->start();
chat_session_ptr new_session(new chat_session(io_service_, room_));
acceptor_.async_accept(new_session->socket(),
boost::bind(&chat_server::handle_accept, this, new_session,
asio::placeholders::error));
}
start_accept();
}
private:

View File

@ -36,6 +36,7 @@ public:
asio::placeholders::bytes_transferred));
}
private:
void handle_read(const asio::error_code& error,
size_t bytes_transferred)
{
@ -67,7 +68,6 @@ public:
}
}
private:
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
@ -79,6 +79,12 @@ public:
server(asio::io_service& io_service, short port)
: io_service_(io_service),
acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
{
start_accept();
}
private:
void start_accept()
{
session* new_session = new session(io_service_);
acceptor_.async_accept(new_session->socket(),
@ -92,18 +98,15 @@ public:
if (!error)
{
new_session->start();
new_session = new session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
asio::placeholders::error));
}
else
{
delete new_session;
}
start_accept();
}
private:
asio::io_service& io_service_;
tcp::acceptor acceptor_;
};

View File

@ -43,11 +43,16 @@ private:
void handle_signal_wait()
{
// Reap completed child processes so that we don't end up with zombies.
int status = 0;
while (waitpid(-1, &status, WNOHANG) > 0) {}
// Only the parent process should check for this signal. We can determine
// whether we are in the parent by checking if the acceptor is still open.
if (acceptor_.is_open())
{
// Reap completed child processes so that we don't end up with zombies.
int status = 0;
while (waitpid(-1, &status, WNOHANG) > 0) {}
start_signal_wait();
start_signal_wait();
}
}
void start_accept()
@ -76,6 +81,9 @@ private:
// acceptor. It remains open in the parent.
acceptor_.close();
// The child process is not interested in processing the SIGCHLD signal.
signal_.cancel();
start_read();
}
else
@ -93,6 +101,7 @@ private:
else
{
std::cerr << "Accept error: " << ec.message() << std::endl;
start_accept();
}
}

View File

@ -21,8 +21,7 @@ server::server(const std::string& address, const std::string& port,
signals_(io_service_),
acceptor_(io_service_),
connection_manager_(),
new_connection_(new connection(io_service_,
connection_manager_, request_handler_)),
new_connection_(),
request_handler_(doc_root)
{
// Register to handle the signals that indicate when the server should exit.
@ -43,9 +42,8 @@ server::server(const std::string& address, const std::string& port,
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error));
start_accept();
}
void server::run()
@ -57,17 +55,30 @@ void server::run()
io_service_.run();
}
void server::start_accept()
{
new_connection_.reset(new connection(io_service_,
connection_manager_, request_handler_));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error));
}
void server::handle_accept(const asio::error_code& e)
{
// Check whether the server was stopped by a signal before this completion
// handler had a chance to run.
if (!acceptor_.is_open())
{
return;
}
if (!e)
{
connection_manager_.start(new_connection_);
new_connection_.reset(new connection(io_service_,
connection_manager_, request_handler_));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error));
}
start_accept();
}
void server::handle_stop()

View File

@ -35,6 +35,9 @@ public:
void run();
private:
/// Initiate an asynchronous accept operation.
void start_accept();
/// Handle completion of an asynchronous accept operation.
void handle_accept(const asio::error_code& e);

View File

@ -19,8 +19,7 @@ server::server(const std::string& address, const std::string& port,
: io_service_pool_(io_service_pool_size),
signals_(io_service_pool_.get_io_service()),
acceptor_(io_service_pool_.get_io_service()),
new_connection_(new connection(
io_service_pool_.get_io_service(), request_handler_)),
new_connection_(),
request_handler_(doc_root)
{
// Register to handle the signals that indicate when the server should exit.
@ -41,9 +40,8 @@ server::server(const std::string& address, const std::string& port,
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error));
start_accept();
}
void server::run()
@ -51,17 +49,23 @@ void server::run()
io_service_pool_.run();
}
void server::start_accept()
{
new_connection_.reset(new connection(
io_service_pool_.get_io_service(), request_handler_));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error));
}
void server::handle_accept(const asio::error_code& e)
{
if (!e)
{
new_connection_->start();
new_connection_.reset(new connection(
io_service_pool_.get_io_service(), request_handler_));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error));
}
start_accept();
}
void server::handle_stop()

View File

@ -37,6 +37,9 @@ public:
void run();
private:
/// Initiate an asynchronous accept operation.
void start_accept();
/// Handle completion of an asynchronous accept operation.
void handle_accept(const asio::error_code& e);

View File

@ -21,7 +21,7 @@ server::server(const std::string& address, const std::string& port,
: thread_pool_size_(thread_pool_size),
signals_(io_service_),
acceptor_(io_service_),
new_connection_(new connection(io_service_, request_handler_)),
new_connection_(),
request_handler_(doc_root)
{
// Register to handle the signals that indicate when the server should exit.
@ -42,9 +42,8 @@ server::server(const std::string& address, const std::string& port,
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error));
start_accept();
}
void server::run()
@ -63,16 +62,22 @@ void server::run()
threads[i]->join();
}
void server::start_accept()
{
new_connection_.reset(new connection(io_service_, request_handler_));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error));
}
void server::handle_accept(const asio::error_code& e)
{
if (!e)
{
new_connection_->start();
new_connection_.reset(new connection(io_service_, request_handler_));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error));
}
start_accept();
}
void server::handle_stop()

View File

@ -36,6 +36,9 @@ public:
void run();
private:
/// Initiate an asynchronous accept operation.
void start_accept();
/// Handle completion of an asynchronous accept operation.
void handle_accept(const asio::error_code& e);

View File

@ -34,8 +34,12 @@ int main()
for (;;)
{
tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
stream << make_daytime_string();
asio::error_code ec;
acceptor.accept(*stream.rdbuf(), ec);
if (!ec)
{
stream << make_daytime_string();
}
}
}
catch (std::exception& e)

View File

@ -208,8 +208,9 @@ private:
if (!error)
{
new_connection->start();
start_accept();
}
start_accept();
}
tcp::acceptor acceptor_;

View File

@ -56,13 +56,13 @@ public:
asio::async_read(*socket, request->to_buffers(),
boost::bind(&server::handle_control_request, this,
asio::placeholders::error, socket, request));
// Start waiting for a new control connection.
tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_io_service()));
acceptor_.async_accept(*new_socket,
boost::bind(&server::handle_accept, this,
asio::placeholders::error, new_socket));
}
// Start waiting for a new control connection.
tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_io_service()));
acceptor_.async_accept(*new_socket,
boost::bind(&server::handle_accept, this,
asio::placeholders::error, new_socket));
}
// Handle a new control request.

View File

@ -72,20 +72,13 @@ public:
conn->async_write(stocks_,
boost::bind(&server::handle_write, this,
asio::placeholders::error, conn));
}
// Start an accept operation for a new connection.
connection_ptr new_conn(new connection(acceptor_.get_io_service()));
acceptor_.async_accept(new_conn->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error, new_conn));
}
else
{
// An error occurred. Log it and return. Since we are not starting a new
// accept operation the io_service will run out of work to do and the
// server will exit.
std::cerr << e.message() << std::endl;
}
// Start an accept operation for a new connection.
connection_ptr new_conn(new connection(acceptor_.get_io_service()));
acceptor_.async_accept(new_conn->socket(),
boost::bind(&server::handle_accept, this,
asio::placeholders::error, new_conn));
}
/// Handle completion of a write operation.

View File

@ -107,10 +107,7 @@ public:
context_.use_private_key_file("server.pem", asio::ssl::context::pem);
context_.use_tmp_dh_file("dh512.pem");
session* new_session = new session(io_service_, context_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
asio::placeholders::error));
start_accept();
}
std::string get_password() const
@ -118,21 +115,27 @@ public:
return "test";
}
void start_accept()
{
session* new_session = new session(io_service_, context_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
asio::placeholders::error));
}
void handle_accept(session* new_session,
const asio::error_code& error)
{
if (!error)
{
new_session->start();
new_session = new session(io_service_, context_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
asio::placeholders::error));
}
else
{
delete new_session;
}
start_accept();
}
private:

View File

@ -365,6 +365,11 @@ public:
subscriber_ptr bc(new udp_broadcaster(io_service_, broadcast_endpoint));
channel_.join(bc);
start_accept();
}
void start_accept()
{
tcp_session_ptr new_session(new tcp_session(io_service_, channel_));
acceptor_.async_accept(new_session->socket(),
@ -377,12 +382,9 @@ public:
if (!ec)
{
session->start();
tcp_session_ptr new_session(new tcp_session(io_service_, channel_));
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session, _1));
}
start_accept();
}
private:

View File

@ -92,8 +92,9 @@ private:
if (!error)
{
new_connection->start();
start_accept();
}
start_accept();
}
tcp::acceptor acceptor_;

View File

@ -91,8 +91,9 @@ private:
if (!error)
{
new_connection->start();
start_accept();
}
start_accept();
}
tcp::acceptor acceptor_;

View File

@ -132,8 +132,9 @@ private:
if (!error)
{
new_connection->start();
start_accept();
}
start_accept();
}
tcp::acceptor acceptor_;

View File

@ -155,6 +155,11 @@ public:
acceptor_.bind(endpoint);
acceptor_.listen();
start_accept();
}
void start_accept()
{
session* new_session = new session(io_service_, block_size_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
@ -166,15 +171,13 @@ public:
if (!err)
{
new_session->start();
new_session = new session(io_service_, block_size_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
asio::placeholders::error));
}
else
{
delete new_session;
}
start_accept();
}
private: