From 8a3d80d22c4209b271972a9ff108b1dc965bc67c Mon Sep 17 00:00:00 2001 From: chris Date: Sat, 17 Jan 2004 04:40:20 +0000 Subject: [PATCH] Added buffering implementation for send operations. --- asio/include/asio/buffered_send_stream.hpp | 166 ++++++++++++++++++++- asio/include/asio/buffered_stream.hpp | 31 ++++ 2 files changed, 192 insertions(+), 5 deletions(-) diff --git a/asio/include/asio/buffered_send_stream.hpp b/asio/include/asio/buffered_send_stream.hpp index 45227e5a..6b7ee317 100644 --- a/asio/include/asio/buffered_send_stream.hpp +++ b/asio/include/asio/buffered_send_stream.hpp @@ -18,10 +18,12 @@ #include "asio/detail/push_options.hpp" #include "asio/detail/push_options.hpp" +#include #include #include #include "asio/detail/pop_options.hpp" +#include "asio/send.hpp" #include "asio/fixed_buffer.hpp" namespace asio { @@ -36,7 +38,8 @@ public: /// Construct, passing the specified argument to initialise the next layer. template explicit buffered_send_stream(Arg& a) - : next_layer_(a) + : next_layer_(a), + buffer_() { } @@ -67,17 +70,92 @@ public: return next_layer_.demuxer(); } + /// The buffer type for this buffering layer. + typedef Buffer buffer_type; + + /// Get the recv buffer used by this buffering layer. + buffer_type& recv_buffer() + { + return buffer_; + } + /// Close the stream. void close() { next_layer_.close(); } + /// Flush all data from the buffer to the next layer. Returns the number of + /// bytes wrriten to the next layer on the last send operation, or 0 if the + /// underlying connection was closed. Throws an exception on failure. + size_t flush() + { + size_t total_bytes_sent = 0; + size_t last_bytes_sent = send_n(next_layer_, buffer_.begin(), + buffer_.size(), &total_bytes_sent); + buffer_.pop(total_bytes_sent); + return last_bytes_sent; + } + + /// Flush all data from the buffer to the next layer. Returns the number of + /// bytes wrriten to the next layer on the last send operation, or 0 if the + /// underlying connection was closed. + template + size_t flush(Error_Handler error_handler) + { + size_t total_bytes_sent = 0; + size_t last_bytes_sent = send_n(next_layer_, buffer_.begin(), + buffer_.size(), &total_bytes_sent, error_handler); + buffer_.pop(total_bytes_sent); + return last_bytes_sent; + } + + template + class flush_handler + { + public: + flush_handler(Buffer& buffer, Handler handler) + : buffer_(buffer), + handler_(handler) + { + } + + template + void operator()(const Error& e, size_t total_bytes_sent, + size_t last_bytes_sent) + { + buffer_.pop(total_bytes_sent); + handler_(e, last_bytes_sent); + } + + private: + Buffer& buffer_; + Handler handler_; + }; + + /// Start an asynchronous flush. + template + void async_flush(Handler handler) + { + async_send_n(next_layer_, buffer_.begin(), buffer_.size(), + flush_handler(buffer_, handler)); + } + + /// Start an asynchronous flush. + template + void async_flush(Handler handler, Completion_Context context) + { + async_send_n(next_layer_, buffer_.begin(), buffer_.size(), + flush_handler(buffer_, handler), context); + } + /// Send the given data to the peer. Returns the number of bytes sent or 0 if /// the stream was closed cleanly. Throws an exception on failure. size_t send(const void* data, size_t length) { - return next_layer_.send(data, length); + if (buffer_.size() == buffer_.capacity() && !flush()) + return 0; + return copy(data, length); } /// Send the given data to the peer. Returns the number of bytes sent or 0 if @@ -85,15 +163,65 @@ public: template size_t send(const void* data, size_t length, Error_Handler error_handler) { - return next_layer_.send(data, length, error_handler); + if (buffer_.size() == buffer_.capacity() && !flush(error_handler)) + return 0; + return copy(data, length); } + template + class send_handler + { + public: + send_handler(Buffer& buffer, const void* data, size_t length, + Handler handler) + : buffer_(buffer), + data_(data), + length_(length), + handler_(handler) + { + } + + template + void operator()(const Error& e, size_t bytes_sent) + { + if (e || bytes_sent == 0) + { + handler_(e, bytes_sent); + } + else + { + using namespace std; // For memcpy. + size_t orig_size = buffer_.size(); + size_t bytes_avail = buffer_.capacity() - orig_size; + size_t bytes_copied = (length_ < bytes_avail) ? length_ : bytes_avail; + buffer_.resize(orig_size + bytes_copied); + memcpy(buffer_.begin() + orig_size, data_, bytes_copied); + handler_(e, bytes_copied); + } + } + + private: + Buffer& buffer_; + const void* data_; + size_t length_; + Handler handler_; + }; + /// Start an asynchronous send. The data being sent must be valid for the /// lifetime of the asynchronous operation. template void async_send(const void* data, size_t length, Handler handler) { - next_layer_.async_send(data, length, handler); + if (buffer_.size() == buffer_.capacity()) + { + async_flush(send_handler(buffer_, data, length, handler)); + } + else + { + size_t bytes_copied = copy(data, length); + next_layer_.demuxer().operation_immediate( + detail::bind_handler(handler, 0, bytes_copied)); + } } /// Start an asynchronous send. The data being sent must be valid for the @@ -102,7 +230,17 @@ public: void async_send(const void* data, size_t length, Handler handler, Completion_Context context) { - next_layer_.async_send(data, length, handler, context); + if (buffer_.size() == buffer_.capacity()) + { + async_flush(send_handler(buffer_, data, length, handler), + context); + } + else + { + size_t bytes_copied = copy(data, length); + next_layer_.demuxer().operation_immediate( + detail::bind_handler(handler, 0, bytes_copied), context); + } } /// Receive some data from the peer. Returns the number of bytes received or @@ -138,8 +276,26 @@ public: } private: + /// Copy data into the internal buffer from the specified source buffer. + /// Returns the number of bytes copied. + size_t copy(const void* data, size_t length) + { + using namespace std; // For memcpy. + + size_t orig_size = buffer_.size(); + size_t bytes_avail = buffer_.capacity() - orig_size; + size_t bytes_copied = (length < bytes_avail) ? length : bytes_avail; + buffer_.resize(orig_size + bytes_copied); + memcpy(buffer_.begin() + orig_size, data, bytes_copied); + + return length; + } + /// The next layer. Next_Layer next_layer_; + + // The data in the buffer. + Buffer buffer_; }; } // namespace asio diff --git a/asio/include/asio/buffered_stream.hpp b/asio/include/asio/buffered_stream.hpp index 8d751538..2cefe585 100644 --- a/asio/include/asio/buffered_stream.hpp +++ b/asio/include/asio/buffered_stream.hpp @@ -83,6 +83,37 @@ public: stream_impl_.close(); } + /// Flush all data from the buffer to the next layer. Returns the number of + /// bytes wrriten to the next layer on the last send operation, or 0 if the + /// underlying connection was closed. Throws an exception on failure. + size_t flush() + { + return stream_impl_.next_layer().flush(); + } + + /// Flush all data from the buffer to the next layer. Returns the number of + /// bytes wrriten to the next layer on the last send operation, or 0 if the + /// underlying connection was closed. + template + size_t flush(Error_Handler error_handler) + { + return stream_impl_.next_layer().flush(error_handler); + } + + /// Start an asynchronous flush. + template + void async_flush(Handler handler) + { + return stream_impl_.next_layer().async_flush(handler); + } + + /// Start an asynchronous flush. + template + void async_flush(Handler handler, Completion_Context context) + { + return stream_impl_.next_layer().async_flush(handler, context); + } + /// Send the given data to the peer. Returns the number of bytes sent or 0 if /// the stream was closed cleanly. Throws an exception on failure. size_t send(const void* data, size_t length)