Added buffering implementation for send operations.

This commit is contained in:
chris 2004-01-17 04:40:20 +00:00
parent 40e376e4b4
commit 8a3d80d22c
2 changed files with 192 additions and 5 deletions

View File

@ -18,10 +18,12 @@
#include "asio/detail/push_options.hpp"
#include "asio/detail/push_options.hpp"
#include <cstring>
#include <boost/noncopyable.hpp>
#include <boost/type_traits.hpp>
#include "asio/detail/pop_options.hpp"
#include "asio/send.hpp"
#include "asio/fixed_buffer.hpp"
namespace asio {
@ -36,7 +38,8 @@ public:
/// Construct, passing the specified argument to initialise the next layer.
template <typename Arg>
explicit buffered_send_stream(Arg& a)
: next_layer_(a)
: next_layer_(a),
buffer_()
{
}
@ -67,17 +70,92 @@ public:
return next_layer_.demuxer();
}
/// The buffer type for this buffering layer.
typedef Buffer buffer_type;
/// Get the recv buffer used by this buffering layer.
buffer_type& recv_buffer()
{
return buffer_;
}
/// Close the stream.
void close()
{
next_layer_.close();
}
/// Flush all data from the buffer to the next layer. Returns the number of
/// bytes wrriten to the next layer on the last send operation, or 0 if the
/// underlying connection was closed. Throws an exception on failure.
size_t flush()
{
size_t total_bytes_sent = 0;
size_t last_bytes_sent = send_n(next_layer_, buffer_.begin(),
buffer_.size(), &total_bytes_sent);
buffer_.pop(total_bytes_sent);
return last_bytes_sent;
}
/// Flush all data from the buffer to the next layer. Returns the number of
/// bytes wrriten to the next layer on the last send operation, or 0 if the
/// underlying connection was closed.
template <typename Error_Handler>
size_t flush(Error_Handler error_handler)
{
size_t total_bytes_sent = 0;
size_t last_bytes_sent = send_n(next_layer_, buffer_.begin(),
buffer_.size(), &total_bytes_sent, error_handler);
buffer_.pop(total_bytes_sent);
return last_bytes_sent;
}
template <typename Handler>
class flush_handler
{
public:
flush_handler(Buffer& buffer, Handler handler)
: buffer_(buffer),
handler_(handler)
{
}
template <typename Error>
void operator()(const Error& e, size_t total_bytes_sent,
size_t last_bytes_sent)
{
buffer_.pop(total_bytes_sent);
handler_(e, last_bytes_sent);
}
private:
Buffer& buffer_;
Handler handler_;
};
/// Start an asynchronous flush.
template <typename Handler>
void async_flush(Handler handler)
{
async_send_n(next_layer_, buffer_.begin(), buffer_.size(),
flush_handler<Handler>(buffer_, handler));
}
/// Start an asynchronous flush.
template <typename Handler, typename Completion_Context>
void async_flush(Handler handler, Completion_Context context)
{
async_send_n(next_layer_, buffer_.begin(), buffer_.size(),
flush_handler<Handler>(buffer_, handler), context);
}
/// Send the given data to the peer. Returns the number of bytes sent or 0 if
/// the stream was closed cleanly. Throws an exception on failure.
size_t send(const void* data, size_t length)
{
return next_layer_.send(data, length);
if (buffer_.size() == buffer_.capacity() && !flush())
return 0;
return copy(data, length);
}
/// Send the given data to the peer. Returns the number of bytes sent or 0 if
@ -85,15 +163,65 @@ public:
template <typename Error_Handler>
size_t send(const void* data, size_t length, Error_Handler error_handler)
{
return next_layer_.send(data, length, error_handler);
if (buffer_.size() == buffer_.capacity() && !flush(error_handler))
return 0;
return copy(data, length);
}
template <typename Handler>
class send_handler
{
public:
send_handler(Buffer& buffer, const void* data, size_t length,
Handler handler)
: buffer_(buffer),
data_(data),
length_(length),
handler_(handler)
{
}
template <typename Error>
void operator()(const Error& e, size_t bytes_sent)
{
if (e || bytes_sent == 0)
{
handler_(e, bytes_sent);
}
else
{
using namespace std; // For memcpy.
size_t orig_size = buffer_.size();
size_t bytes_avail = buffer_.capacity() - orig_size;
size_t bytes_copied = (length_ < bytes_avail) ? length_ : bytes_avail;
buffer_.resize(orig_size + bytes_copied);
memcpy(buffer_.begin() + orig_size, data_, bytes_copied);
handler_(e, bytes_copied);
}
}
private:
Buffer& buffer_;
const void* data_;
size_t length_;
Handler handler_;
};
/// Start an asynchronous send. The data being sent must be valid for the
/// lifetime of the asynchronous operation.
template <typename Handler>
void async_send(const void* data, size_t length, Handler handler)
{
next_layer_.async_send(data, length, handler);
if (buffer_.size() == buffer_.capacity())
{
async_flush(send_handler<Handler>(buffer_, data, length, handler));
}
else
{
size_t bytes_copied = copy(data, length);
next_layer_.demuxer().operation_immediate(
detail::bind_handler(handler, 0, bytes_copied));
}
}
/// Start an asynchronous send. The data being sent must be valid for the
@ -102,7 +230,17 @@ public:
void async_send(const void* data, size_t length, Handler handler,
Completion_Context context)
{
next_layer_.async_send(data, length, handler, context);
if (buffer_.size() == buffer_.capacity())
{
async_flush(send_handler<Handler>(buffer_, data, length, handler),
context);
}
else
{
size_t bytes_copied = copy(data, length);
next_layer_.demuxer().operation_immediate(
detail::bind_handler(handler, 0, bytes_copied), context);
}
}
/// Receive some data from the peer. Returns the number of bytes received or
@ -138,8 +276,26 @@ public:
}
private:
/// Copy data into the internal buffer from the specified source buffer.
/// Returns the number of bytes copied.
size_t copy(const void* data, size_t length)
{
using namespace std; // For memcpy.
size_t orig_size = buffer_.size();
size_t bytes_avail = buffer_.capacity() - orig_size;
size_t bytes_copied = (length < bytes_avail) ? length : bytes_avail;
buffer_.resize(orig_size + bytes_copied);
memcpy(buffer_.begin() + orig_size, data, bytes_copied);
return length;
}
/// The next layer.
Next_Layer next_layer_;
// The data in the buffer.
Buffer buffer_;
};
} // namespace asio

View File

@ -83,6 +83,37 @@ public:
stream_impl_.close();
}
/// Flush all data from the buffer to the next layer. Returns the number of
/// bytes wrriten to the next layer on the last send operation, or 0 if the
/// underlying connection was closed. Throws an exception on failure.
size_t flush()
{
return stream_impl_.next_layer().flush();
}
/// Flush all data from the buffer to the next layer. Returns the number of
/// bytes wrriten to the next layer on the last send operation, or 0 if the
/// underlying connection was closed.
template <typename Error_Handler>
size_t flush(Error_Handler error_handler)
{
return stream_impl_.next_layer().flush(error_handler);
}
/// Start an asynchronous flush.
template <typename Handler>
void async_flush(Handler handler)
{
return stream_impl_.next_layer().async_flush(handler);
}
/// Start an asynchronous flush.
template <typename Handler, typename Completion_Context>
void async_flush(Handler handler, Completion_Context context)
{
return stream_impl_.next_layer().async_flush(handler, context);
}
/// Send the given data to the peer. Returns the number of bytes sent or 0 if
/// the stream was closed cleanly. Throws an exception on failure.
size_t send(const void* data, size_t length)