Create C++11 versions of the fork, local, multicast and nonblocking examples.

This commit is contained in:
Christopher Kohlhoff 2017-12-02 15:13:17 +11:00
parent f16e39e78e
commit ad52c82d2d
17 changed files with 1240 additions and 3 deletions

View File

@ -518,10 +518,15 @@ sub copy_examples
"src/examples/cpp11/chat",
"src/examples/cpp11/echo",
"src/examples/cpp11/executors",
"src/examples/cpp11/fork",
"src/examples/cpp11/futures",
"src/examples/cpp11/handler_tracking",
"src/examples/cpp11/http/server",
"src/examples/cpp11/invocation",
"src/examples/cpp11/iostreams",
"src/examples/cpp11/local",
"src/examples/cpp11/multicast",
"src/examples/cpp11/nonblocking",
"src/examples/cpp11/spawn",
"src/examples/cpp14/executors");

View File

@ -313,6 +313,7 @@ coroutines.
Examples showing how to use UNIX domain (local) sockets.
* [@../src/examples/cpp03/local/connect_pair.cpp]
* [@../src/examples/cpp03/local/iostream_client.cpp]
* [@../src/examples/cpp03/local/stream_server.cpp]
* [@../src/examples/cpp03/local/stream_client.cpp]
@ -370,6 +371,20 @@ and asynchronous operations.
* [@../src/examples/cpp11/echo/blocking_udp_echo_server.cpp] ([@examples/diffs/echo/blocking_udp_echo_server.cpp.html diff to C++03])
[heading Fork]
These POSIX-specific examples show how to use Asio in conjunction with the
`fork()` system call. The first example illustrates the steps required to start
a daemon process:
* [@../src/examples/cpp11/fork/daemon.cpp] ([@examples/diffs/fork/daemon.cpp.html diff to C++03])
The second example demonstrates how it is possible to fork a process from
within a completion handler.
* [@../src/examples/cpp11/fork/process_per_connection.cpp] ([@examples/diffs/fork/process_per_connection.cpp.html diff to C++03])
[heading Futures]
This example demonstrates how to use std::future in conjunction with
@ -378,6 +393,13 @@ Asio's asynchronous operations.
* [@../src/examples/cpp11/futures/daytime_client.cpp]
[heading Handler Tracking]
This example shows how to implement custom handler tracking.
* [@../src/examples/cpp11/handler_tracking/custom_tracking.hpp]
[heading HTTP Server]
This example illustrates the use of asio in a simple single-threaded server
@ -403,6 +425,23 @@ cancelling all outstanding asynchronous operations.
* [@../src/examples/cpp11/http/server/server.hpp] ([@examples/diffs/http/server/server.hpp.html diff to C++03])
[heading Multicast]
An example showing the use of multicast to transmit packets to a group of
subscribers.
* [@../src/examples/cpp11/multicast/receiver.cpp] ([@examples/diffs/multicast/receiver.cpp.html diff to C++03])
* [@../src/examples/cpp11/multicast/sender.cpp] ([@examples/diffs/multicast/sender.cpp.html diff to C++03])
[heading Nonblocking]
Example demonstrating reactor-style operations for integrating a third-party
library that wants to perform the I/O operations itself.
* [@../src/examples/cpp11/nonblocking/third_party_lib.cpp] ([@examples/diffs/nonblocking/third_party_lib.cpp.html diff to C++03])
[heading Spawn]
Example of using the asio::spawn() function, a wrapper around the
@ -413,6 +452,16 @@ coroutines.
* [@../src/examples/cpp11/spawn/echo_server.cpp] ([@examples/diffs/spawn/echo_server.cpp.html diff to C++03])
[heading UNIX Domain Sockets]
Examples showing how to use UNIX domain (local) sockets.
* [@../src/examples/cpp11/local/connect_pair.cpp] ([@examples/diffs/local/connect_pair.cpp.html diff to C++03])
* [@../src/examples/cpp11/local/iostream_client.cpp] ([@examples/diffs/local/iostream_client.cpp.html diff to C++03])
* [@../src/examples/cpp11/local/stream_server.cpp] ([@examples/diffs/local/stream_server.cpp.html diff to C++03])
* [@../src/examples/cpp11/local/stream_client.cpp] ([@examples/diffs/local/stream_client.cpp.html diff to C++03])
[endsect]

View File

@ -29,7 +29,20 @@ noinst_PROGRAMS = \
futures/daytime_client \
http/server/http_server \
invocation/prioritised_handlers \
iostreams/http_client
iostreams/http_client \
multicast/receiver \
multicast/sender \
nonblocking/third_party_lib
if !WINDOWS_TARGET
noinst_PROGRAMS += \
fork/daemon \
fork/process_per_connection \
local/connect_pair \
local/iostream_client \
local/stream_server \
local/stream_client
endif
if HAVE_BOOST_COROUTINE
noinst_PROGRAMS += \
@ -70,6 +83,18 @@ http_server_http_server_SOURCES = \
http/server/server.cpp
invocation_prioritised_handlers_SOURCES = invocation/prioritised_handlers.cpp
iostreams_http_client_SOURCES = iostreams/http_client.cpp
multicast_receiver_SOURCES = multicast/receiver.cpp
multicast_sender_SOURCES = multicast/sender.cpp
nonblocking_third_party_lib_SOURCES = nonblocking/third_party_lib.cpp
if !WINDOWS_TARGET
fork_daemon_SOURCES = fork/daemon.cpp
fork_process_per_connection_SOURCES = fork/process_per_connection.cpp
local_connect_pair_SOURCES = local/connect_pair.cpp
local_iostream_client_SOURCES = local/iostream_client.cpp
local_stream_server_SOURCES = local/stream_server.cpp
local_stream_client_SOURCES = local/stream_client.cpp
endif
if HAVE_BOOST_COROUTINE
spawn_echo_server_SOURCES = spawn/echo_server.cpp

11
asio/src/examples/cpp11/fork/.gitignore vendored Normal file
View File

@ -0,0 +1,11 @@
.deps
.dirstamp
*.o
*.obj
*.exe
*.ilk
*.manifest
*.pdb
*.tds
daemon
process_per_connection

View File

@ -0,0 +1,189 @@
//
// daemon.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <asio/io_context.hpp>
#include <asio/ip/udp.hpp>
#include <asio/signal_set.hpp>
#include <array>
#include <ctime>
#include <iostream>
#include <syslog.h>
#include <unistd.h>
using asio::ip::udp;
class udp_daytime_server
{
public:
udp_daytime_server(asio::io_context& io_context)
: socket_(io_context, {udp::v4(), 13})
{
receive();
}
private:
void receive()
{
socket_.async_receive_from(
asio::buffer(recv_buffer_), remote_endpoint_,
[this](std::error_code ec, std::size_t /*n*/)
{
if (!ec || ec == asio::error::message_size)
{
using namespace std; // For time_t, time and ctime;
time_t now = time(0);
std::string message = ctime(&now);
std::error_code ignored_ec;
socket_.send_to(asio::buffer(message),
remote_endpoint_, 0, ignored_ec);
}
receive();
});
}
udp::socket socket_;
udp::endpoint remote_endpoint_;
std::array<char, 1> recv_buffer_;
};
int main()
{
try
{
asio::io_context io_context;
// Initialise the server before becoming a daemon. If the process is
// started from a shell, this means any errors will be reported back to the
// user.
udp_daytime_server server(io_context);
// Register signal handlers so that the daemon may be shut down. You may
// also want to register for other signals, such as SIGHUP to trigger a
// re-read of a configuration file.
asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait(
[&](std::error_code /*ec*/, int /*signo*/)
{
io_context.stop();
});
// Inform the io_context that we are about to become a daemon. The
// io_context cleans up any internal resources, such as threads, that may
// interfere with forking.
io_context.notify_fork(asio::io_context::fork_prepare);
// Fork the process and have the parent exit. If the process was started
// from a shell, this returns control to the user. Forking a new process is
// also a prerequisite for the subsequent call to setsid().
if (pid_t pid = fork())
{
if (pid > 0)
{
// We're in the parent process and need to exit.
//
// When the exit() function is used, the program terminates without
// invoking local variables' destructors. Only global variables are
// destroyed. As the io_context object is a local variable, this means
// we do not have to call:
//
// io_context.notify_fork(asio::io_context::fork_parent);
//
// However, this line should be added before each call to exit() if
// using a global io_context object. An additional call:
//
// io_context.notify_fork(asio::io_context::fork_prepare);
//
// should also precede the second fork().
exit(0);
}
else
{
syslog(LOG_ERR | LOG_USER, "First fork failed: %m");
return 1;
}
}
// Make the process a new session leader. This detaches it from the
// terminal.
setsid();
// A process inherits its working directory from its parent. This could be
// on a mounted filesystem, which means that the running daemon would
// prevent this filesystem from being unmounted. Changing to the root
// directory avoids this problem.
chdir("/");
// The file mode creation mask is also inherited from the parent process.
// We don't want to restrict the permissions on files created by the
// daemon, so the mask is cleared.
umask(0);
// A second fork ensures the process cannot acquire a controlling terminal.
if (pid_t pid = fork())
{
if (pid > 0)
{
exit(0);
}
else
{
syslog(LOG_ERR | LOG_USER, "Second fork failed: %m");
return 1;
}
}
// Close the standard streams. This decouples the daemon from the terminal
// that started it.
close(0);
close(1);
close(2);
// We don't want the daemon to have any standard input.
if (open("/dev/null", O_RDONLY) < 0)
{
syslog(LOG_ERR | LOG_USER, "Unable to open /dev/null: %m");
return 1;
}
// Send standard output to a log file.
const char* output = "/tmp/asio.daemon.out";
const int flags = O_WRONLY | O_CREAT | O_APPEND;
const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
if (open(output, flags, mode) < 0)
{
syslog(LOG_ERR | LOG_USER, "Unable to open output file %s: %m", output);
return 1;
}
// Also send standard error to the same log file.
if (dup(1) < 0)
{
syslog(LOG_ERR | LOG_USER, "Unable to dup output descriptor: %m");
return 1;
}
// Inform the io_context that we have finished becoming a daemon. The
// io_context uses this opportunity to create any internal file descriptors
// that need to be private to the new process.
io_context.notify_fork(asio::io_context::fork_child);
// The io_context can now be used normally.
syslog(LOG_INFO | LOG_USER, "Daemon started");
io_context.run();
syslog(LOG_INFO | LOG_USER, "Daemon stopped");
}
catch (std::exception& e)
{
syslog(LOG_ERR | LOG_USER, "Exception: %s", e.what());
std::cerr << "Exception: " << e.what() << std::endl;
}
}

View File

@ -0,0 +1,162 @@
//
// process_per_connection.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/signal_set.hpp>
#include <asio/write.hpp>
#include <cstdlib>
#include <iostream>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
using asio::ip::tcp;
class server
{
public:
server(asio::io_context& io_context, unsigned short port)
: io_context_(io_context),
signal_(io_context, SIGCHLD),
acceptor_(io_context, {tcp::v4(), port}),
socket_(io_context)
{
wait_for_signal();
accept();
}
private:
void wait_for_signal()
{
signal_.async_wait(
[this](std::error_code /*ec*/, int /*signo*/)
{
// Only the parent process should check for this signal. We can
// determine whether we are in the parent by checking if the acceptor
// is still open.
if (acceptor_.is_open())
{
// Reap completed child processes so that we don't end up with
// zombies.
int status = 0;
while (waitpid(-1, &status, WNOHANG) > 0) {}
wait_for_signal();
}
});
}
void accept()
{
acceptor_.async_accept(
[this](std::error_code ec, tcp::socket new_socket)
{
if (!ec)
{
// Take ownership of the newly accepted socket.
socket_ = std::move(new_socket);
// Inform the io_context that we are about to fork. The io_context
// cleans up any internal resources, such as threads, that may
// interfere with forking.
io_context_.notify_fork(asio::io_context::fork_prepare);
if (fork() == 0)
{
// Inform the io_context that the fork is finished and that this
// is the child process. The io_context uses this opportunity to
// create any internal file descriptors that must be private to
// the new process.
io_context_.notify_fork(asio::io_context::fork_child);
// The child won't be accepting new connections, so we can close
// the acceptor. It remains open in the parent.
acceptor_.close();
// The child process is not interested in processing the SIGCHLD
// signal.
signal_.cancel();
read();
}
else
{
// Inform the io_context that the fork is finished (or failed)
// and that this is the parent process. The io_context uses this
// opportunity to recreate any internal resources that were
// cleaned up during preparation for the fork.
io_context_.notify_fork(asio::io_context::fork_parent);
// The parent process can now close the newly accepted socket. It
// remains open in the child.
socket_.close();
accept();
}
}
else
{
std::cerr << "Accept error: " << ec.message() << std::endl;
accept();
}
});
}
void read()
{
socket_.async_read_some(asio::buffer(data_),
[this](std::error_code ec, std::size_t length)
{
if (!ec)
write(length);
});
}
void write(std::size_t length)
{
asio::async_write(socket_, asio::buffer(data_, length),
[this](std::error_code ec, std::size_t /*length*/)
{
if (!ec)
read();
});
}
asio::io_context& io_context_;
asio::signal_set signal_;
tcp::acceptor acceptor_;
tcp::socket socket_;
std::array<char, 1024> data_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: process_per_connection <port>\n";
return 1;
}
asio::io_context io_context;
using namespace std; // For atoi.
server s(io_context, atoi(argv[1]));
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
}

View File

@ -72,8 +72,8 @@ struct custom_tracking
}
// Record the creation of a tracked handler.
static void creation(asio::execution_context& /*ctx*/, tracked_handler& h,
const char* object_type, void* /*object*/,
static void creation(asio::execution_context& /*ctx*/,
tracked_handler& h, const char* object_type, void* /*object*/,
std::uintmax_t native_handle, const char* op_name)
{
// Generate a unique id for the new handler.

View File

@ -0,0 +1,13 @@
.deps
.dirstamp
*.o
*.obj
*.exe
connect_pair
stream_server
stream_client
iostream_client
*.ilk
*.manifest
*.pdb
*.tds

View File

@ -0,0 +1,129 @@
//
// connect_pair.cpp
// ~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <array>
#include <iostream>
#include <string>
#include <cctype>
#include <asio.hpp>
#if defined(ASIO_HAS_LOCAL_SOCKETS)
using asio::local::stream_protocol;
class uppercase_filter
{
public:
uppercase_filter(stream_protocol::socket sock)
: socket_(std::move(sock))
{
read();
}
private:
void read()
{
socket_.async_read_some(asio::buffer(data_),
[this](std::error_code ec, std::size_t size)
{
if (!ec)
{
// Compute result.
for (std::size_t i = 0; i < size; ++i)
data_[i] = std::toupper(data_[i]);
// Send result.
write(size);
}
else
{
throw asio::system_error(ec);
}
});
}
void write(std::size_t size)
{
asio::async_write(socket_, asio::buffer(data_, size),
[this](std::error_code ec, std::size_t /*size*/)
{
if (!ec)
{
// Wait for request.
read();
}
else
{
throw asio::system_error(ec);
}
});
}
stream_protocol::socket socket_;
std::array<char, 512> data_;
};
int main()
{
try
{
asio::io_context io_context;
// Create a connected pair and pass one end to a filter.
stream_protocol::socket socket(io_context);
stream_protocol::socket filter_socket(io_context);
asio::local::connect_pair(socket, filter_socket);
uppercase_filter filter(std::move(filter_socket));
// The io_context runs in a background thread to perform filtering.
asio::thread thread(
[&io_context]()
{
try
{
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception in thread: " << e.what() << "\n";
std::exit(1);
}
});
for (;;)
{
// Collect request from user.
std::cout << "Enter a string: ";
std::string request;
std::getline(std::cin, request);
// Send request to filter.
asio::write(socket, asio::buffer(request));
// Wait for reply from filter.
std::vector<char> reply(request.size());
asio::read(socket, asio::buffer(reply));
// Show reply to user.
std::cout << "Result: ";
std::cout.write(&reply[0], request.size());
std::cout << std::endl;
}
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
std::exit(1);
}
}
#else // defined(ASIO_HAS_LOCAL_SOCKETS)
# error Local sockets not available on this platform.
#endif // defined(ASIO_HAS_LOCAL_SOCKETS)

View File

@ -0,0 +1,61 @@
//
// stream_client.cpp
// ~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstring>
#include <iostream>
#include "asio.hpp"
#if defined(ASIO_HAS_LOCAL_SOCKETS)
using asio::local::stream_protocol;
constexpr std::size_t max_length = 1024;
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: iostream_client <file>\n";
return 1;
}
stream_protocol::endpoint ep(argv[1]);
stream_protocol::iostream s(ep);
if (!s)
{
std::cerr << "Unable to connect: " << s.error().message() << std::endl;
return 1;
}
std::cout << "Enter message: ";
char request[max_length];
std::cin.getline(request, max_length);
size_t length = std::strlen(request);
s << request;
char reply[max_length];
s.read(reply, length);
std::cout << "Reply is: ";
std::cout.write(reply, length);
std::cout << "\n";
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
#else // defined(ASIO_HAS_LOCAL_SOCKETS)
# error Local sockets not available on this platform.
#endif // defined(ASIO_HAS_LOCAL_SOCKETS)

View File

@ -0,0 +1,60 @@
//
// stream_client.cpp
// ~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <cstring>
#include <iostream>
#include "asio.hpp"
#if defined(ASIO_HAS_LOCAL_SOCKETS)
using asio::local::stream_protocol;
constexpr std::size_t max_length = 1024;
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: stream_client <file>\n";
return 1;
}
asio::io_context io_context;
stream_protocol::socket s(io_context);
s.connect(stream_protocol::endpoint(argv[1]));
std::cout << "Enter message: ";
char request[max_length];
std::cin.getline(request, max_length);
size_t request_length = std::strlen(request);
asio::write(s, asio::buffer(request, request_length));
char reply[max_length];
size_t reply_length = asio::read(s,
asio::buffer(reply, request_length));
std::cout << "Reply is: ";
std::cout.write(reply, reply_length);
std::cout << "\n";
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
#else // defined(ASIO_HAS_LOCAL_SOCKETS)
# error Local sockets not available on this platform.
#endif // defined(ASIO_HAS_LOCAL_SOCKETS)

View File

@ -0,0 +1,121 @@
//
// stream_server.cpp
// ~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <array>
#include <cstdio>
#include <iostream>
#include <memory>
#include "asio.hpp"
#if defined(ASIO_HAS_LOCAL_SOCKETS)
using asio::local::stream_protocol;
class session
: public std::enable_shared_from_this<session>
{
public:
session(stream_protocol::socket sock)
: socket_(std::move(sock))
{
}
void start()
{
do_read();
}
private:
void do_read()
{
auto self(shared_from_this());
socket_.async_read_some(asio::buffer(data_),
[this, self](std::error_code ec, std::size_t length)
{
if (!ec)
do_write(length);
});
}
void do_write(std::size_t length)
{
auto self(shared_from_this());
asio::async_write(socket_,
asio::buffer(data_, length),
[this, self](std::error_code ec, std::size_t /*length*/)
{
if (!ec)
do_read();
});
}
// The socket used to communicate with the client.
stream_protocol::socket socket_;
// Buffer used to store data received from the client.
std::array<char, 1024> data_;
};
class server
{
public:
server(asio::io_context& io_context, const std::string& file)
: acceptor_(io_context, stream_protocol::endpoint(file))
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(
[this](std::error_code ec, stream_protocol::socket socket)
{
if (!ec)
{
std::make_shared<session>(std::move(socket))->start();
}
do_accept();
});
}
stream_protocol::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: stream_server <file>\n";
std::cerr << "*** WARNING: existing file is removed ***\n";
return 1;
}
asio::io_context io_context;
std::remove(argv[1]);
server s(io_context, argv[1]);
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
#else // defined(ASIO_HAS_LOCAL_SOCKETS)
# error Local sockets not available on this platform.
#endif // defined(ASIO_HAS_LOCAL_SOCKETS)

View File

@ -0,0 +1,11 @@
.deps
.dirstamp
receiver
sender
*.o
*.obj
*.exe
*.ilk
*.manifest
*.pdb
*.tds

View File

@ -0,0 +1,88 @@
//
// receiver.cpp
// ~~~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <array>
#include <iostream>
#include <string>
#include "asio.hpp"
constexpr short multicast_port = 30001;
class receiver
{
public:
receiver(asio::io_context& io_context,
const asio::ip::address& listen_address,
const asio::ip::address& multicast_address)
: socket_(io_context)
{
// Create the socket so that multiple may be bound to the same address.
asio::ip::udp::endpoint listen_endpoint(
listen_address, multicast_port);
socket_.open(listen_endpoint.protocol());
socket_.set_option(asio::ip::udp::socket::reuse_address(true));
socket_.bind(listen_endpoint);
// Join the multicast group.
socket_.set_option(
asio::ip::multicast::join_group(multicast_address));
do_receive();
}
private:
void do_receive()
{
socket_.async_receive_from(
asio::buffer(data_), sender_endpoint_,
[this](std::error_code ec, std::size_t length)
{
if (!ec)
{
std::cout.write(data_.data(), length);
std::cout << std::endl;
do_receive();
}
});
}
asio::ip::udp::socket socket_;
asio::ip::udp::endpoint sender_endpoint_;
std::array<char, 1024> data_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 3)
{
std::cerr << "Usage: receiver <listen_address> <multicast_address>\n";
std::cerr << " For IPv4, try:\n";
std::cerr << " receiver 0.0.0.0 239.255.0.1\n";
std::cerr << " For IPv6, try:\n";
std::cerr << " receiver 0::0 ff31::8000:1234\n";
return 1;
}
asio::io_context io_context;
receiver r(io_context,
asio::ip::make_address(argv[1]),
asio::ip::make_address(argv[2]));
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@ -0,0 +1,91 @@
//
// sender.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <iostream>
#include <sstream>
#include <string>
#include "asio.hpp"
constexpr short multicast_port = 30001;
constexpr int max_message_count = 10;
class sender
{
public:
sender(asio::io_context& io_context,
const asio::ip::address& multicast_address)
: endpoint_(multicast_address, multicast_port),
socket_(io_context, endpoint_.protocol()),
timer_(io_context),
message_count_(0)
{
do_send();
}
private:
void do_send()
{
std::ostringstream os;
os << "Message " << message_count_++;
message_ = os.str();
socket_.async_send_to(
asio::buffer(message_), endpoint_,
[this](std::error_code ec, std::size_t /*length*/)
{
if (!ec && message_count_ < max_message_count)
do_timeout();
});
}
void do_timeout()
{
timer_.expires_after(std::chrono::seconds(1));
timer_.async_wait(
[this](std::error_code ec)
{
if (!ec)
do_send();
});
}
private:
asio::ip::udp::endpoint endpoint_;
asio::ip::udp::socket socket_;
asio::steady_timer timer_;
int message_count_;
std::string message_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: sender <multicast_address>\n";
std::cerr << " For IPv4, try:\n";
std::cerr << " sender 239.255.0.1\n";
std::cerr << " For IPv6, try:\n";
std::cerr << " sender ff31::8000:1234\n";
return 1;
}
asio::io_context io_context;
sender s(io_context, asio::ip::make_address(argv[1]));
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@ -0,0 +1,10 @@
.deps
.dirstamp
*.o
*.obj
*.exe
third_party_lib
*.ilk
*.manifest
*.pdb
*.tds

View File

@ -0,0 +1,212 @@
//
// third_party_lib.cpp
// ~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <asio.hpp>
#include <array>
#include <iostream>
#include <memory>
using asio::ip::tcp;
namespace third_party_lib {
// Simulation of a third party library that wants to perform read and write
// operations directly on a socket. It needs to be polled to determine whether
// it requires a read or write operation, and notified when the socket is ready
// for reading or writing.
class session
{
public:
session(tcp::socket& socket)
: socket_(socket)
{
}
// Returns true if the third party library wants to be notified when the
// socket is ready for reading.
bool want_read() const
{
return state_ == reading;
}
// Notify that third party library that it should perform its read operation.
void do_read(std::error_code& ec)
{
if (std::size_t len = socket_.read_some(asio::buffer(data_), ec))
{
write_buffer_ = asio::buffer(data_, len);
state_ = writing;
}
}
// Returns true if the third party library wants to be notified when the
// socket is ready for writing.
bool want_write() const
{
return state_ == writing;
}
// Notify that third party library that it should perform its write operation.
void do_write(std::error_code& ec)
{
if (std::size_t len = socket_.write_some(
asio::buffer(write_buffer_), ec))
{
write_buffer_ = write_buffer_ + len;
state_ = asio::buffer_size(write_buffer_) > 0 ? writing : reading;
}
}
private:
tcp::socket& socket_;
enum { reading, writing } state_ = reading;
std::array<char, 128> data_;
asio::const_buffer write_buffer_;
};
} // namespace third_party_lib
// The glue between asio's sockets and the third party library.
class connection
: public std::enable_shared_from_this<connection>
{
public:
connection(tcp::socket socket)
: socket_(std::move(socket))
{
}
void start()
{
// Put the socket into non-blocking mode.
socket_.non_blocking(true);
do_operations();
}
private:
void do_operations()
{
auto self(shared_from_this());
// Start a read operation if the third party library wants one.
if (session_impl_.want_read() && !read_in_progress_)
{
read_in_progress_ = true;
socket_.async_wait(tcp::socket::wait_read,
[this, self](std::error_code ec)
{
read_in_progress_ = false;
// Notify third party library that it can perform a read.
if (!ec)
session_impl_.do_read(ec);
// The third party library successfully performed a read on the
// socket. Start new read or write operations based on what it now
// wants.
if (!ec || ec == asio::error::would_block)
do_operations();
// Otherwise, an error occurred. Closing the socket cancels any
// outstanding asynchronous read or write operations. The
// connection object will be destroyed automatically once those
// outstanding operations complete.
else
socket_.close();
});
}
// Start a write operation if the third party library wants one.
if (session_impl_.want_write() && !write_in_progress_)
{
write_in_progress_ = true;
socket_.async_wait(tcp::socket::wait_write,
[this, self](std::error_code ec)
{
write_in_progress_ = false;
// Notify third party library that it can perform a write.
if (!ec)
session_impl_.do_write(ec);
// The third party library successfully performed a write on the
// socket. Start new read or write operations based on what it now
// wants.
if (!ec || ec == asio::error::would_block)
do_operations();
// Otherwise, an error occurred. Closing the socket cancels any
// outstanding asynchronous read or write operations. The
// connection object will be destroyed automatically once those
// outstanding operations complete.
else
socket_.close();
});
}
}
private:
tcp::socket socket_;
third_party_lib::session session_impl_{socket_};
bool read_in_progress_ = false;
bool write_in_progress_ = false;
};
class server
{
public:
server(asio::io_context& io_context, unsigned short port)
: acceptor_(io_context, {tcp::v4(), port})
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(
[this](std::error_code ec, tcp::socket socket)
{
if (!ec)
{
std::make_shared<connection>(std::move(socket))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: third_party_lib <port>\n";
return 1;
}
asio::io_context io_context;
server s(io_context, std::atoi(argv[1]));
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}