Add a special null_buffers type that allows read and write operations to
be used to indicate the socket's readiness to read or write without blocking.
This commit is contained in:
parent
e88cdf3204
commit
2786514d4a
@ -378,6 +378,33 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
/// An implementation of both the ConstBufferSequence and MutableBufferSequence
|
||||
/// concepts to represent a null buffer sequence.
|
||||
class null_buffers
|
||||
{
|
||||
public:
|
||||
/// The type for each element in the list of buffers.
|
||||
typedef mutable_buffer value_type;
|
||||
|
||||
/// A random-access iterator type that may be used to read elements.
|
||||
typedef const mutable_buffer* const_iterator;
|
||||
|
||||
/// Get a random-access iterator to the first element.
|
||||
const_iterator begin() const
|
||||
{
|
||||
return &buf_;
|
||||
}
|
||||
|
||||
/// Get a random-access iterator for one past the last element.
|
||||
const_iterator end() const
|
||||
{
|
||||
return &buf_;
|
||||
}
|
||||
|
||||
private:
|
||||
mutable_buffer buf_;
|
||||
};
|
||||
|
||||
#if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
|
||||
namespace detail {
|
||||
|
||||
|
@ -24,6 +24,8 @@
|
||||
#include <boost/iterator/iterator_facade.hpp>
|
||||
#include "asio/detail/pop_options.hpp"
|
||||
|
||||
#include "asio/buffer.hpp"
|
||||
|
||||
namespace asio {
|
||||
namespace detail {
|
||||
|
||||
@ -197,6 +199,24 @@ private:
|
||||
typename Buffers::const_iterator begin_remainder_;
|
||||
};
|
||||
|
||||
// Specialisation for null_buffers to ensure that the null_buffers type is
|
||||
// always passed through to the underlying read or write operation.
|
||||
template <typename Buffer>
|
||||
class consuming_buffers<Buffer, asio::null_buffers>
|
||||
: public asio::null_buffers
|
||||
{
|
||||
public:
|
||||
consuming_buffers(const asio::null_buffers&)
|
||||
{
|
||||
// No-op.
|
||||
}
|
||||
|
||||
void consume(std::size_t)
|
||||
{
|
||||
// No-op.
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace detail
|
||||
} // namespace asio
|
||||
|
||||
|
@ -127,16 +127,18 @@ public:
|
||||
// Start a new read operation. The handler object will be invoked when the
|
||||
// given descriptor is ready to be read, or an error has occurred.
|
||||
template <typename Handler>
|
||||
void start_read_op(socket_type descriptor, Handler handler)
|
||||
void start_read_op(socket_type descriptor, Handler handler,
|
||||
bool allow_speculative_read = true)
|
||||
{
|
||||
asio::detail::mutex::scoped_lock lock(mutex_);
|
||||
|
||||
if (shutdown_)
|
||||
return;
|
||||
|
||||
if (!read_op_queue_.has_operation(descriptor))
|
||||
if (handler(asio::error_code()))
|
||||
return;
|
||||
if (allow_speculative_read)
|
||||
if (!read_op_queue_.has_operation(descriptor))
|
||||
if (handler(asio::error_code()))
|
||||
return;
|
||||
|
||||
if (read_op_queue_.enqueue_operation(descriptor, handler))
|
||||
{
|
||||
@ -153,16 +155,18 @@ public:
|
||||
// Start a new write operation. The handler object will be invoked when the
|
||||
// given descriptor is ready to be written, or an error has occurred.
|
||||
template <typename Handler>
|
||||
void start_write_op(socket_type descriptor, Handler handler)
|
||||
void start_write_op(socket_type descriptor, Handler handler,
|
||||
bool allow_speculative_write = true)
|
||||
{
|
||||
asio::detail::mutex::scoped_lock lock(mutex_);
|
||||
|
||||
if (shutdown_)
|
||||
return;
|
||||
|
||||
if (!write_op_queue_.has_operation(descriptor))
|
||||
if (handler(asio::error_code()))
|
||||
return;
|
||||
if (allow_speculative_write)
|
||||
if (!write_op_queue_.has_operation(descriptor))
|
||||
if (handler(asio::error_code()))
|
||||
return;
|
||||
|
||||
if (write_op_queue_.enqueue_operation(descriptor, handler))
|
||||
{
|
||||
|
@ -134,14 +134,17 @@ public:
|
||||
// Start a new read operation. The handler object will be invoked when the
|
||||
// given descriptor is ready to be read, or an error has occurred.
|
||||
template <typename Handler>
|
||||
void start_read_op(socket_type descriptor, Handler handler)
|
||||
void start_read_op(socket_type descriptor, Handler handler,
|
||||
bool allow_speculative_read = true)
|
||||
{
|
||||
asio::detail::mutex::scoped_lock lock(mutex_);
|
||||
|
||||
if (shutdown_)
|
||||
return;
|
||||
|
||||
if (!read_op_queue_.has_operation(descriptor))
|
||||
if (!allow_speculative_read)
|
||||
need_epoll_wait_ = true;
|
||||
else if (!read_op_queue_.has_operation(descriptor))
|
||||
if (handler(asio::error_code()))
|
||||
return;
|
||||
|
||||
@ -170,14 +173,17 @@ public:
|
||||
// Start a new write operation. The handler object will be invoked when the
|
||||
// given descriptor is ready to be written, or an error has occurred.
|
||||
template <typename Handler>
|
||||
void start_write_op(socket_type descriptor, Handler handler)
|
||||
void start_write_op(socket_type descriptor, Handler handler,
|
||||
bool allow_speculative_write = true)
|
||||
{
|
||||
asio::detail::mutex::scoped_lock lock(mutex_);
|
||||
|
||||
if (shutdown_)
|
||||
return;
|
||||
|
||||
if (!write_op_queue_.has_operation(descriptor))
|
||||
if (!allow_speculative_write)
|
||||
need_epoll_wait_ = true;
|
||||
else if (!write_op_queue_.has_operation(descriptor))
|
||||
if (handler(asio::error_code()))
|
||||
return;
|
||||
|
||||
|
@ -134,14 +134,17 @@ public:
|
||||
// Start a new read operation. The handler object will be invoked when the
|
||||
// given descriptor is ready to be read, or an error has occurred.
|
||||
template <typename Handler>
|
||||
void start_read_op(socket_type descriptor, Handler handler)
|
||||
void start_read_op(socket_type descriptor, Handler handler,
|
||||
bool allow_speculative_read = true)
|
||||
{
|
||||
asio::detail::mutex::scoped_lock lock(mutex_);
|
||||
|
||||
if (shutdown_)
|
||||
return;
|
||||
|
||||
if (!read_op_queue_.has_operation(descriptor))
|
||||
if (!allow_speculative_read)
|
||||
need_kqueue_wait_ = true;
|
||||
else if (!read_op_queue_.has_operation(descriptor))
|
||||
if (handler(asio::error_code()))
|
||||
return;
|
||||
|
||||
@ -161,14 +164,17 @@ public:
|
||||
// Start a new write operation. The handler object will be invoked when the
|
||||
// given descriptor is ready to be written, or an error has occurred.
|
||||
template <typename Handler>
|
||||
void start_write_op(socket_type descriptor, Handler handler)
|
||||
void start_write_op(socket_type descriptor, Handler handler,
|
||||
bool allow_speculative_write = true)
|
||||
{
|
||||
asio::detail::mutex::scoped_lock lock(mutex_);
|
||||
|
||||
if (shutdown_)
|
||||
return;
|
||||
|
||||
if (!write_op_queue_.has_operation(descriptor))
|
||||
if (!allow_speculative_write)
|
||||
need_kqueue_wait_ = true;
|
||||
else if (!write_op_queue_.has_operation(descriptor))
|
||||
if (handler(asio::error_code()))
|
||||
return;
|
||||
|
||||
|
@ -558,6 +558,22 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until data can be sent without blocking.
|
||||
size_t send(implementation_type& impl, const null_buffers&,
|
||||
socket_base::message_flags, asio::error_code& ec)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
ec = asio::error::bad_descriptor;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Wait for socket to become ready.
|
||||
socket_ops::poll_write(impl.socket_, ec);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
template <typename ConstBufferSequence, typename Handler>
|
||||
class send_handler
|
||||
{
|
||||
@ -672,6 +688,45 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Handler>
|
||||
class null_buffers_handler
|
||||
{
|
||||
public:
|
||||
null_buffers_handler(asio::io_service& io_service, Handler handler)
|
||||
: work_(io_service),
|
||||
handler_(handler)
|
||||
{
|
||||
}
|
||||
|
||||
bool operator()(const asio::error_code& result)
|
||||
{
|
||||
work_.get_io_service().post(bind_handler(handler_, result, 0));
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
asio::io_service::work work_;
|
||||
Handler handler_;
|
||||
};
|
||||
|
||||
// Start an asynchronous wait until data can be sent without blocking.
|
||||
template <typename Handler>
|
||||
void async_send(implementation_type& impl, const null_buffers&,
|
||||
socket_base::message_flags, Handler handler)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
this->get_io_service().post(bind_handler(handler,
|
||||
asio::error::bad_descriptor, 0));
|
||||
}
|
||||
else
|
||||
{
|
||||
reactor_.start_write_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler),
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
// Send a datagram to the specified endpoint. Returns the number of bytes
|
||||
// sent.
|
||||
template <typename ConstBufferSequence>
|
||||
@ -733,6 +788,23 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until data can be sent without blocking.
|
||||
size_t send_to(implementation_type& impl, const null_buffers&,
|
||||
socket_base::message_flags, const endpoint_type&,
|
||||
asio::error_code& ec)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
ec = asio::error::bad_descriptor;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Wait for socket to become ready.
|
||||
socket_ops::poll_write(impl.socket_, ec);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
template <typename ConstBufferSequence, typename Handler>
|
||||
class send_to_handler
|
||||
{
|
||||
@ -831,6 +903,24 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Start an asynchronous wait until data can be sent without blocking.
|
||||
template <typename Handler>
|
||||
void async_send_to(implementation_type& impl, const null_buffers&,
|
||||
socket_base::message_flags, const endpoint_type&, Handler handler)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
this->get_io_service().post(bind_handler(handler,
|
||||
asio::error::bad_descriptor, 0));
|
||||
}
|
||||
else
|
||||
{
|
||||
reactor_.start_write_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler),
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
// Receive some data from the peer. Returns the number of bytes received.
|
||||
template <typename MutableBufferSequence>
|
||||
size_t receive(implementation_type& impl,
|
||||
@ -906,6 +996,23 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until data can be received without blocking.
|
||||
size_t receive(implementation_type& impl,
|
||||
const null_buffers& buffers,
|
||||
socket_base::message_flags, asio::error_code& ec)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
ec = asio::error::bad_descriptor;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Wait for socket to become ready.
|
||||
socket_ops::poll_read(impl.socket_, ec);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
template <typename MutableBufferSequence, typename Handler>
|
||||
class receive_handler
|
||||
{
|
||||
@ -1032,6 +1139,29 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until data can be received without blocking.
|
||||
template <typename Handler>
|
||||
void async_receive(implementation_type& impl, const null_buffers&,
|
||||
socket_base::message_flags flags, Handler handler)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
this->get_io_service().post(bind_handler(handler,
|
||||
asio::error::bad_descriptor, 0));
|
||||
}
|
||||
else if (flags & socket_base::message_out_of_band)
|
||||
{
|
||||
reactor_.start_except_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler));
|
||||
}
|
||||
else
|
||||
{
|
||||
reactor_.start_read_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler),
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
// Receive a datagram with the endpoint of the sender. Returns the number of
|
||||
// bytes received.
|
||||
template <typename MutableBufferSequence>
|
||||
@ -1105,6 +1235,26 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until data can be received without blocking.
|
||||
size_t receive_from(implementation_type& impl,
|
||||
const null_buffers& buffers, endpoint_type& sender_endpoint,
|
||||
socket_base::message_flags, asio::error_code& ec)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
ec = asio::error::bad_descriptor;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Wait for socket to become ready.
|
||||
socket_ops::poll_read(impl.socket_, ec);
|
||||
|
||||
// Reset endpoint since it can be given no sensible value at this time.
|
||||
sender_endpoint = endpoint_type();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
template <typename MutableBufferSequence, typename Handler>
|
||||
class receive_from_handler
|
||||
{
|
||||
@ -1208,6 +1358,36 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until data can be received without blocking.
|
||||
template <typename Handler>
|
||||
void async_receive_from(implementation_type& impl,
|
||||
const null_buffers&, endpoint_type& sender_endpoint,
|
||||
socket_base::message_flags flags, Handler handler)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
this->get_io_service().post(bind_handler(handler,
|
||||
asio::error::bad_descriptor, 0));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Reset endpoint since it can be given no sensible value at this time.
|
||||
sender_endpoint = endpoint_type();
|
||||
|
||||
if (flags & socket_base::message_out_of_band)
|
||||
{
|
||||
reactor_.start_except_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler));
|
||||
}
|
||||
else
|
||||
{
|
||||
reactor_.start_read_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler),
|
||||
false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Accept a new connection.
|
||||
template <typename Socket>
|
||||
asio::error_code accept(implementation_type& impl,
|
||||
|
@ -114,7 +114,8 @@ public:
|
||||
// Start a new read operation. The handler object will be invoked when the
|
||||
// given descriptor is ready to be read, or an error has occurred.
|
||||
template <typename Handler>
|
||||
void start_read_op(socket_type descriptor, Handler handler)
|
||||
void start_read_op(socket_type descriptor, Handler handler,
|
||||
bool /*allow_speculative_read*/ = true)
|
||||
{
|
||||
asio::detail::mutex::scoped_lock lock(mutex_);
|
||||
if (!shutdown_)
|
||||
@ -125,7 +126,8 @@ public:
|
||||
// Start a new write operation. The handler object will be invoked when the
|
||||
// given descriptor is ready to be written, or an error has occurred.
|
||||
template <typename Handler>
|
||||
void start_write_op(socket_type descriptor, Handler handler)
|
||||
void start_write_op(socket_type descriptor, Handler handler,
|
||||
bool /*allow_speculative_write*/ = true)
|
||||
{
|
||||
asio::detail::mutex::scoped_lock lock(mutex_);
|
||||
if (!shutdown_)
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "asio/detail/push_options.hpp"
|
||||
#include <cstring>
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include <boost/type_traits/is_same.hpp>
|
||||
#include <boost/weak_ptr.hpp>
|
||||
#include "asio/detail/pop_options.hpp"
|
||||
|
||||
@ -699,6 +700,22 @@ public:
|
||||
return bytes_transferred;
|
||||
}
|
||||
|
||||
// Wait until data can be sent without blocking.
|
||||
size_t send(implementation_type& impl, const null_buffers&,
|
||||
socket_base::message_flags, asio::error_code& ec)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
ec = asio::error::bad_descriptor;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Wait for socket to become ready.
|
||||
socket_ops::poll_write(impl.socket_, ec);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
template <typename ConstBufferSequence, typename Handler>
|
||||
class send_operation
|
||||
: public operation
|
||||
@ -857,6 +874,57 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Handler>
|
||||
class null_buffers_handler
|
||||
{
|
||||
public:
|
||||
null_buffers_handler(asio::io_service& io_service, Handler handler)
|
||||
: work_(io_service),
|
||||
handler_(handler)
|
||||
{
|
||||
}
|
||||
|
||||
bool operator()(const asio::error_code& result)
|
||||
{
|
||||
work_.get_io_service().post(bind_handler(handler_, result, 0));
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
asio::io_service::work work_;
|
||||
Handler handler_;
|
||||
};
|
||||
|
||||
// Start an asynchronous wait until data can be sent without blocking.
|
||||
template <typename Handler>
|
||||
void async_send(implementation_type& impl, const null_buffers&,
|
||||
socket_base::message_flags, Handler handler)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
this->get_io_service().post(bind_handler(handler,
|
||||
asio::error::bad_descriptor, 0));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Check if the reactor was already obtained from the io_service.
|
||||
reactor_type* reactor = static_cast<reactor_type*>(
|
||||
interlocked_compare_exchange_pointer(
|
||||
reinterpret_cast<void**>(&reactor_), 0, 0));
|
||||
if (!reactor)
|
||||
{
|
||||
reactor = &(asio::use_service<reactor_type>(
|
||||
this->get_io_service()));
|
||||
interlocked_exchange_pointer(
|
||||
reinterpret_cast<void**>(&reactor_), reactor);
|
||||
}
|
||||
|
||||
reactor->start_write_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler),
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
// Send a datagram to the specified endpoint. Returns the number of bytes
|
||||
// sent.
|
||||
template <typename ConstBufferSequence>
|
||||
@ -901,6 +969,23 @@ public:
|
||||
return bytes_transferred;
|
||||
}
|
||||
|
||||
// Wait until data can be sent without blocking.
|
||||
size_t send_to(implementation_type& impl, const null_buffers&,
|
||||
socket_base::message_flags, const endpoint_type&,
|
||||
asio::error_code& ec)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
ec = asio::error::bad_descriptor;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Wait for socket to become ready.
|
||||
socket_ops::poll_write(impl.socket_, ec);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
template <typename ConstBufferSequence, typename Handler>
|
||||
class send_to_operation
|
||||
: public operation
|
||||
@ -1037,6 +1122,36 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Start an asynchronous wait until data can be sent without blocking.
|
||||
template <typename Handler>
|
||||
void async_send_to(implementation_type& impl, const null_buffers&,
|
||||
socket_base::message_flags, const endpoint_type&, Handler handler)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
this->get_io_service().post(bind_handler(handler,
|
||||
asio::error::bad_descriptor, 0));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Check if the reactor was already obtained from the io_service.
|
||||
reactor_type* reactor = static_cast<reactor_type*>(
|
||||
interlocked_compare_exchange_pointer(
|
||||
reinterpret_cast<void**>(&reactor_), 0, 0));
|
||||
if (!reactor)
|
||||
{
|
||||
reactor = &(asio::use_service<reactor_type>(
|
||||
this->get_io_service()));
|
||||
interlocked_exchange_pointer(
|
||||
reinterpret_cast<void**>(&reactor_), reactor);
|
||||
}
|
||||
|
||||
reactor->start_write_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler),
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
// Receive some data from the peer. Returns the number of bytes received.
|
||||
template <typename MutableBufferSequence>
|
||||
size_t receive(implementation_type& impl,
|
||||
@ -1096,6 +1211,23 @@ public:
|
||||
return bytes_transferred;
|
||||
}
|
||||
|
||||
// Wait until data can be received without blocking.
|
||||
size_t receive(implementation_type& impl,
|
||||
const null_buffers& buffers,
|
||||
socket_base::message_flags, asio::error_code& ec)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
ec = asio::error::bad_descriptor;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Wait for socket to become ready.
|
||||
socket_ops::poll_read(impl.socket_, ec);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
template <typename MutableBufferSequence, typename Handler>
|
||||
class receive_operation
|
||||
: public operation
|
||||
@ -1156,7 +1288,8 @@ public:
|
||||
}
|
||||
|
||||
// Check for connection closed.
|
||||
else if (!ec && bytes_transferred == 0)
|
||||
else if (!ec && bytes_transferred == 0
|
||||
&& !boost::is_same<MutableBufferSequence, null_buffers>::value)
|
||||
{
|
||||
ec = asio::error::eof;
|
||||
}
|
||||
@ -1261,6 +1394,84 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until data can be received without blocking.
|
||||
template <typename Handler>
|
||||
void async_receive(implementation_type& impl, const null_buffers& buffers,
|
||||
socket_base::message_flags flags, Handler handler)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
this->get_io_service().post(bind_handler(handler,
|
||||
asio::error::bad_descriptor, 0));
|
||||
}
|
||||
else if (impl.protocol_.type() == SOCK_STREAM)
|
||||
{
|
||||
// For stream sockets on Windows, we may issue a 0-byte overlapped
|
||||
// WSARecv to wait until there is data available on the socket.
|
||||
|
||||
#if defined(ASIO_ENABLE_CANCELIO)
|
||||
// Update the ID of the thread from which cancellation is safe.
|
||||
if (impl.safe_cancellation_thread_id_ == 0)
|
||||
impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
|
||||
else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
|
||||
impl.safe_cancellation_thread_id_ = ~DWORD(0);
|
||||
#endif // defined(ASIO_ENABLE_CANCELIO)
|
||||
|
||||
// Allocate and construct an operation to wrap the handler.
|
||||
typedef receive_operation<null_buffers, Handler> value_type;
|
||||
typedef handler_alloc_traits<Handler, value_type> alloc_traits;
|
||||
raw_handler_ptr<alloc_traits> raw_ptr(handler);
|
||||
handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
|
||||
impl.cancel_token_, buffers, handler);
|
||||
|
||||
// Issue a receive operation with an empty buffer.
|
||||
::WSABUF buf = { 0, 0 };
|
||||
DWORD bytes_transferred = 0;
|
||||
DWORD recv_flags = flags;
|
||||
int result = ::WSARecv(impl.socket_, &buf, 1,
|
||||
&bytes_transferred, &recv_flags, ptr.get(), 0);
|
||||
DWORD last_error = ::WSAGetLastError();
|
||||
if (result != 0 && last_error != WSA_IO_PENDING)
|
||||
{
|
||||
asio::io_service::work work(this->get_io_service());
|
||||
ptr.reset();
|
||||
asio::error_code ec(last_error,
|
||||
asio::error::get_system_category());
|
||||
iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
|
||||
}
|
||||
else
|
||||
{
|
||||
ptr.release();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Check if the reactor was already obtained from the io_service.
|
||||
reactor_type* reactor = static_cast<reactor_type*>(
|
||||
interlocked_compare_exchange_pointer(
|
||||
reinterpret_cast<void**>(&reactor_), 0, 0));
|
||||
if (!reactor)
|
||||
{
|
||||
reactor = &(asio::use_service<reactor_type>(
|
||||
this->get_io_service()));
|
||||
interlocked_exchange_pointer(
|
||||
reinterpret_cast<void**>(&reactor_), reactor);
|
||||
}
|
||||
|
||||
if (flags & socket_base::message_out_of_band)
|
||||
{
|
||||
reactor->start_except_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler));
|
||||
}
|
||||
else
|
||||
{
|
||||
reactor->start_read_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler),
|
||||
false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Receive a datagram with the endpoint of the sender. Returns the number of
|
||||
// bytes received.
|
||||
template <typename MutableBufferSequence>
|
||||
@ -1314,6 +1525,26 @@ public:
|
||||
return bytes_transferred;
|
||||
}
|
||||
|
||||
// Wait until data can be received without blocking.
|
||||
size_t receive_from(implementation_type& impl,
|
||||
const null_buffers& buffers, endpoint_type& sender_endpoint,
|
||||
socket_base::message_flags, asio::error_code& ec)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
ec = asio::error::bad_descriptor;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Wait for socket to become ready.
|
||||
socket_ops::poll_read(impl.socket_, ec);
|
||||
|
||||
// Reset endpoint since it can be given no sensible value at this time.
|
||||
sender_endpoint = endpoint_type();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
template <typename MutableBufferSequence, typename Handler>
|
||||
class receive_from_operation
|
||||
: public operation
|
||||
@ -1472,6 +1703,48 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until data can be received without blocking.
|
||||
template <typename Handler>
|
||||
void async_receive_from(implementation_type& impl,
|
||||
const null_buffers&, endpoint_type& sender_endpoint,
|
||||
socket_base::message_flags flags, Handler handler)
|
||||
{
|
||||
if (!is_open(impl))
|
||||
{
|
||||
this->get_io_service().post(bind_handler(handler,
|
||||
asio::error::bad_descriptor, 0));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Check if the reactor was already obtained from the io_service.
|
||||
reactor_type* reactor = static_cast<reactor_type*>(
|
||||
interlocked_compare_exchange_pointer(
|
||||
reinterpret_cast<void**>(&reactor_), 0, 0));
|
||||
if (!reactor)
|
||||
{
|
||||
reactor = &(asio::use_service<reactor_type>(
|
||||
this->get_io_service()));
|
||||
interlocked_exchange_pointer(
|
||||
reinterpret_cast<void**>(&reactor_), reactor);
|
||||
}
|
||||
|
||||
// Reset endpoint since it can be given no sensible value at this time.
|
||||
sender_endpoint = endpoint_type();
|
||||
|
||||
if (flags & socket_base::message_out_of_band)
|
||||
{
|
||||
reactor->start_except_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler));
|
||||
}
|
||||
else
|
||||
{
|
||||
reactor->start_read_op(impl.socket_,
|
||||
null_buffers_handler<Handler>(this->get_io_service(), handler),
|
||||
false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Accept a new connection.
|
||||
template <typename Socket>
|
||||
asio::error_code accept(implementation_type& impl, Socket& peer,
|
||||
|
7
asio/src/examples/nonblocking/.cvsignore
Normal file
7
asio/src/examples/nonblocking/.cvsignore
Normal file
@ -0,0 +1,7 @@
|
||||
.deps
|
||||
.dirstamp
|
||||
*.exe
|
||||
server
|
||||
*.ilk
|
||||
*.pdb
|
||||
*.tds
|
241
asio/src/examples/nonblocking/third_party_lib.cpp
Normal file
241
asio/src/examples/nonblocking/third_party_lib.cpp
Normal file
@ -0,0 +1,241 @@
|
||||
//
|
||||
// third_party_lib.cpp
|
||||
// ~~~~~~~~~~~~~~~~~~~
|
||||
//
|
||||
// Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
|
||||
//
|
||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||
//
|
||||
|
||||
#include <asio.hpp>
|
||||
#include <boost/array.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include <boost/enable_shared_from_this.hpp>
|
||||
|
||||
using asio::ip::tcp;
|
||||
|
||||
namespace third_party_lib {
|
||||
|
||||
// Simulation of a third party library that wants to perform read and write
|
||||
// operations directly on a socket. It needs to be polled to determine whether
|
||||
// it requires a read or write operation, and notified when the socket is ready
|
||||
// for reading or writing.
|
||||
class session
|
||||
{
|
||||
public:
|
||||
session(tcp::socket& socket)
|
||||
: socket_(socket),
|
||||
state_(reading)
|
||||
{
|
||||
}
|
||||
|
||||
// Returns true if the third party library wants to be notified when the
|
||||
// socket is ready for reading.
|
||||
bool want_read() const
|
||||
{
|
||||
return state_ == reading;
|
||||
}
|
||||
|
||||
// Notify that third party library that it should perform its read operation.
|
||||
void do_read(asio::error_code& ec)
|
||||
{
|
||||
if (std::size_t len = socket_.read_some(asio::buffer(data_), ec))
|
||||
{
|
||||
write_buffer_ = asio::buffer(data_, len);
|
||||
state_ = writing;
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if the third party library wants to be notified when the
|
||||
// socket is ready for writing.
|
||||
bool want_write() const
|
||||
{
|
||||
return state_ == writing;
|
||||
}
|
||||
|
||||
// Notify that third party library that it should perform its write operation.
|
||||
void do_write(asio::error_code& ec)
|
||||
{
|
||||
if (std::size_t len = socket_.write_some(
|
||||
asio::buffer(write_buffer_), ec))
|
||||
{
|
||||
write_buffer_ = write_buffer_ + len;
|
||||
state_ = asio::buffer_size(write_buffer_) > 0 ? writing : reading;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
tcp::socket& socket_;
|
||||
enum { reading, writing } state_;
|
||||
boost::array<char, 128> data_;
|
||||
asio::const_buffer write_buffer_;
|
||||
};
|
||||
|
||||
} // namespace third_party_lib
|
||||
|
||||
// The glue between asio's sockets and the third party library.
|
||||
class connection
|
||||
: public boost::enable_shared_from_this<connection>
|
||||
{
|
||||
public:
|
||||
typedef boost::shared_ptr<connection> pointer;
|
||||
|
||||
static pointer create(asio::io_service& io_service)
|
||||
{
|
||||
return pointer(new connection(io_service));
|
||||
}
|
||||
|
||||
tcp::socket& socket()
|
||||
{
|
||||
return socket_;
|
||||
}
|
||||
|
||||
void start()
|
||||
{
|
||||
// Put the socket into non-blocking mode.
|
||||
tcp::socket::non_blocking_io non_blocking_io(true);
|
||||
socket_.io_control(non_blocking_io);
|
||||
|
||||
start_operations();
|
||||
}
|
||||
|
||||
private:
|
||||
connection(asio::io_service& io_service)
|
||||
: socket_(io_service),
|
||||
session_impl_(socket_),
|
||||
read_in_progress_(false),
|
||||
write_in_progress_(false)
|
||||
{
|
||||
}
|
||||
|
||||
void start_operations()
|
||||
{
|
||||
// Start a read operation if the third party library wants one.
|
||||
if (session_impl_.want_read() && !read_in_progress_)
|
||||
{
|
||||
read_in_progress_ = true;
|
||||
socket_.async_read_some(
|
||||
asio::null_buffers(),
|
||||
boost::bind(&connection::handle_read,
|
||||
shared_from_this(),
|
||||
asio::placeholders::error));
|
||||
}
|
||||
|
||||
// Start a write operation if the third party library wants one.
|
||||
if (session_impl_.want_write() && !write_in_progress_)
|
||||
{
|
||||
write_in_progress_ = true;
|
||||
socket_.async_write_some(
|
||||
asio::null_buffers(),
|
||||
boost::bind(&connection::handle_write,
|
||||
shared_from_this(),
|
||||
asio::placeholders::error));
|
||||
}
|
||||
}
|
||||
|
||||
void handle_read(asio::error_code ec)
|
||||
{
|
||||
read_in_progress_ = false;
|
||||
|
||||
// Notify third party library that it can perform a read.
|
||||
if (!ec)
|
||||
session_impl_.do_read(ec);
|
||||
|
||||
// The third party library successfully performed a read on the socket.
|
||||
// Start new read or write operations based on what it now wants.
|
||||
if (!ec || ec == asio::error::would_block)
|
||||
start_operations();
|
||||
|
||||
// Otherwise, an error occurred. Closing the socket cancels any outstanding
|
||||
// asynchronous read or write operations. The connection object will be
|
||||
// destroyed once automatically once those outstanding operations complete.
|
||||
else
|
||||
socket_.close();
|
||||
}
|
||||
|
||||
void handle_write(asio::error_code ec)
|
||||
{
|
||||
write_in_progress_ = false;
|
||||
|
||||
// Notify third party library that it can perform a write.
|
||||
if (!ec)
|
||||
session_impl_.do_write(ec);
|
||||
|
||||
// The third party library successfully performed a write on the socket.
|
||||
// Start new read or write operations based on what it now wants.
|
||||
if (!ec || ec == asio::error::would_block)
|
||||
start_operations();
|
||||
|
||||
// Otherwise, an error occurred. Closing the socket cancels any outstanding
|
||||
// asynchronous read or write operations. The connection object will be
|
||||
// destroyed once automatically once those outstanding operations complete.
|
||||
else
|
||||
socket_.close();
|
||||
}
|
||||
|
||||
private:
|
||||
tcp::socket socket_;
|
||||
third_party_lib::session session_impl_;
|
||||
bool read_in_progress_;
|
||||
bool write_in_progress_;
|
||||
};
|
||||
|
||||
class server
|
||||
{
|
||||
public:
|
||||
server(asio::io_service& io_service, unsigned short port)
|
||||
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
|
||||
{
|
||||
start_accept();
|
||||
}
|
||||
|
||||
private:
|
||||
void start_accept()
|
||||
{
|
||||
connection::pointer new_connection =
|
||||
connection::create(acceptor_.io_service());
|
||||
|
||||
acceptor_.async_accept(new_connection->socket(),
|
||||
boost::bind(&server::handle_accept, this, new_connection,
|
||||
asio::placeholders::error));
|
||||
}
|
||||
|
||||
void handle_accept(connection::pointer new_connection,
|
||||
const asio::error_code& error)
|
||||
{
|
||||
if (!error)
|
||||
{
|
||||
new_connection->start();
|
||||
start_accept();
|
||||
}
|
||||
}
|
||||
|
||||
tcp::acceptor acceptor_;
|
||||
};
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
try
|
||||
{
|
||||
if (argc != 2)
|
||||
{
|
||||
std::cerr << "Usage: third_party_lib <port>\n";
|
||||
return 1;
|
||||
}
|
||||
|
||||
asio::io_service io_service;
|
||||
|
||||
using namespace std; // For atoi.
|
||||
server s(io_service, atoi(argv[1]));
|
||||
|
||||
io_service.run();
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
std::cerr << "Exception: " << e.what() << "\n";
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue
Block a user