doc/html/boost_asio/example/cpp20/channels/mutual_exclusion_1.cpp
//
// mutual_exclusion_1.cpp
// ~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/asio.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <iostream>
#include <memory>
using boost::asio::as_tuple;
using boost::asio::awaitable;
using boost::asio::dynamic_buffer;
using boost::asio::co_spawn;
using boost::asio::deferred;
using boost::asio::detached;
using boost::asio::experimental::channel;
using boost::asio::io_context;
using boost::asio::ip::tcp;
using boost::asio::steady_timer;
using namespace boost::asio::buffer_literals;
using namespace std::literals::chrono_literals;
// This class implements a simple line-based protocol:
//
// * For event line that is received from the client, the session sends a
// message header followed by the content of the line as the message body.
//
// * The session generates heartbeat messages once a second.
//
// This protocol is implemented using two actors, handle_messages() and
// send_heartbeats(), each written as a coroutine.
class line_based_echo_session :
public std::enable_shared_from_this<line_based_echo_session>
{
// The socket used to read from and write to the client. This socket is a
// data member as it is shared between the two actors.
tcp::socket socket_;
// As both of the actors will write to the socket, we need a lock to prevent
// these writes from overlapping. To achieve this, we use a channel with a
// buffer size of one. The lock is claimed by sending a message to the
// channel, and then released by receiving this message back again. If the
// lock is not held then the channel's buffer is empty, and the send will
// complete without delay. Otherwise, if the lock is held by the other actor,
// then the send operation will not complete until the lock is released.
channel<void()> write_lock_{socket_.get_executor(), 1};
public:
line_based_echo_session(tcp::socket socket)
: socket_{std::move(socket)}
{
socket_.set_option(tcp::no_delay(true));
}
void start()
{
co_spawn(socket_.get_executor(),
[self = shared_from_this()]{ return self->handle_messages(); },
detached);
co_spawn(socket_.get_executor(),
[self = shared_from_this()]{ return self->send_heartbeats(); },
detached);
}
private:
void stop()
{
socket_.close();
write_lock_.cancel();
}
awaitable<void> handle_messages()
{
try
{
constexpr std::size_t max_line_length = 1024;
std::string data;
for (;;)
{
// Read an entire line from the client.
std::size_t length = co_await async_read_until(socket_,
dynamic_buffer(data, max_line_length), '\n', deferred);
// Claim the write lock by sending a message to the channel. Since the
// channel signature is void(), there are no arguments to send in the
// message itself.
co_await write_lock_.async_send(deferred);
// Respond to the client with a message, echoing the line they sent.
co_await async_write(socket_, "<line>"_buf, deferred);
co_await async_write(socket_, dynamic_buffer(data, length), deferred);
// Release the lock by receiving the message back again.
write_lock_.try_receive([](auto...){});
}
}
catch (const std::exception&)
{
stop();
}
}
awaitable<void> send_heartbeats()
{
steady_timer timer{socket_.get_executor()};
try
{
for (;;)
{
// Wait one second before trying to send the next heartbeat.
timer.expires_after(1s);
co_await timer.async_wait(deferred);
// Claim the write lock by sending a message to the channel. Since the
// channel signature is void(), there are no arguments to send in the
// message itself.
co_await write_lock_.async_send(deferred);
// Send a heartbeat to the client. As the content of the heartbeat
// message never varies, a buffer literal can be used to specify the
// bytes of the message. The memory associated with a buffer literal is
// valid for the lifetime of the program, which mean that the buffer
// can be safely passed as-is to the asynchronous operation.
co_await async_write(socket_, "<heartbeat>\n"_buf, deferred);
// Release the lock by receiving the message back again.
write_lock_.try_receive([](auto...){});
}
}
catch (const std::exception&)
{
stop();
}
}
};
awaitable<void> listen(tcp::acceptor& acceptor)
{
for (;;)
{
auto [e, socket] = co_await acceptor.async_accept(as_tuple(deferred));
if (!e)
{
std::make_shared<line_based_echo_session>(std::move(socket))->start();
}
}
}
int main(int argc, char* argv[])
{
try
{
if (argc != 3)
{
std::cerr << "Usage: mutual_exclusion_1";
std::cerr << " <listen_address> <listen_port>\n";
return 1;
}
io_context ctx;
auto listen_endpoint =
*tcp::resolver(ctx).resolve(argv[1], argv[2],
tcp::resolver::passive).begin();
tcp::acceptor acceptor(ctx, listen_endpoint);
co_spawn(ctx, listen(acceptor), detached);
ctx.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
}