All our programs until now have used one-shot connections. They also didn't feature any fault tolerance: if the server is unavailable, our program throws an exception and terminates. Most real-world scenarios require long-lived, reliable connections instead.
In this tutorial, we will implement a server for a simple request-reply protocol.
The protocol allows clients to retrieve the full name of an employee given
their ID. We will use connection_pool
to maintain a set of healthy connections that we can use when a client connects
to our server.
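The protocol is TCP based, and can be described as follows: the client connects and sends the employee ID as an 8-byte, big-endian integer; the server replies with a string containing the employee's full name, or "NOT_FOUND" if no employee has that ID; the server then closes the connection.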
This protocol is intentionally overly simplistic, and shouldn't be used in production. See our HTTP examples for more advanced use cases.
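The server code later in this tutorial reads the request as an 8-byte, big-endian integer. As a minimal sketch of what that framing means on the wire, the following uses only the standard library. The helper names `encode_employee_id` and `decode_employee_id` are hypothetical and are not part of Boost.MySQL or Boost.Endian.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: serializes an employee ID as 8 bytes,
// most significant byte first (big-endian).
std::array<unsigned char, 8> encode_employee_id(std::int64_t id)
{
    // Work on the unsigned representation so that shifts are well defined
    auto value = static_cast<std::uint64_t>(id);
    std::array<unsigned char, 8> out{};
    for (std::size_t i = 0; i < out.size(); ++i)
    {
        // Byte 0 holds bits 63..56, byte 7 holds bits 7..0
        out[i] = static_cast<unsigned char>((value >> (8 * (7 - i))) & 0xffu);
    }
    return out;
}

// Hypothetical helper: the inverse of encode_employee_id.
// Takes a raw buffer, like the one a socket read would fill.
std::int64_t decode_employee_id(const unsigned char* data, std::size_t size)
{
    assert(size == 8);  // a request is always exactly 8 bytes
    std::uint64_t value = 0;
    for (std::size_t i = 0; i < size; ++i)
    {
        // Shift the accumulated value left and append the next byte
        value = (value << 8) | data[i];
    }
    return static_cast<std::int64_t>(value);
}
```

A client could send the result of `encode_employee_id` over a TCP socket. The server performs the equivalent of `decode_employee_id` on the 8 bytes it reads.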
connection_pool
is an I/O object that contains any_connection
objects, and can be constructed from an execution context and a pool_params
config struct:
    // Create an I/O context, required by all I/O objects
    asio::io_context ctx;

    // pool_params contains configuration for the pool.
    // You must specify enough information to establish a connection,
    // including the server address and credentials.
    // You can configure a lot of other things, like pool limits
    mysql::pool_params params;
    params.server_address.emplace_host_and_port(server_hostname);
    params.username = username;
    params.password = password;
    params.database = "boost_mysql_examples";

    // Construct the pool.
    // ctx will be used to create the connections and other I/O objects
    mysql::connection_pool pool(ctx, std::move(params));
A single connection pool is usually created per application.
connection_pool::async_run
should be called once per pool:
    // You need to call async_run on the pool before doing anything useful with it.
    // async_run creates connections and keeps them healthy. It must be called
    // only once per pool.
    // The detached completion token means that we don't want to be notified when
    // the operation ends. It's similar to a no-op callback.
    pool.async_run(asio::detached);
Let's first write a coroutine that encapsulates database access. Given an employee ID, it should return the string to be sent as response to the client. Don't worry about error handling for now - we will take care of it in the next tutorial.
When using a pool, we don't need to explicitly create, connect or close connections.
Instead, we use connection_pool::async_get_connection
to obtain them from the pool:
    // Get a connection from the pool.
    // This will wait until a healthy connection is ready to be used.
    // pooled_connection grants us exclusive access to the connection until
    // the object is destroyed
    mysql::pooled_connection conn = co_await pool.async_get_connection();
pooled_connection is a wrapper around any_connection, with some pool-specific
additions. We can use it like a regular connection:
    // Use the connection normally to query the database.
    // operator-> returns a reference to an any_connection,
    // so we can apply everything we learnt in previous tutorials
    mysql::static_results<mysql::pfr_by_name<employee>> result;
    co_await conn->async_execute(
        mysql::with_params("SELECT first_name, last_name FROM employee WHERE id = {}", employee_id),
        result
    );
When a pooled_connection
is destroyed, the connection is returned to the pool. The underlying connection
will be cleaned up using a lightweight session reset mechanism and recycled.
Subsequent async_get_connection
calls may retrieve the same connection. This improves efficiency, since session
establishment is costly.
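The idea of a handle that returns its connection when destroyed can be illustrated with a toy, synchronous sketch. This is not how connection_pool is implemented: `toy_connection`, `toy_handle` and `toy_pool` are hypothetical names, there is no networking, and the session reset that the real pool performs is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for a real connection
struct toy_connection
{
    int id;
};

class toy_pool;

// Hypothetical handle, playing the role of pooled_connection:
// it owns a connection exclusively and gives it back when destroyed
class toy_handle
{
    toy_pool* pool_;
    toy_connection conn_;

public:
    toy_handle(toy_pool& pool, toy_connection conn) : pool_(&pool), conn_(conn) {}

    // Moving transfers ownership; the moved-from handle returns nothing
    toy_handle(toy_handle&& other) noexcept
        : pool_(std::exchange(other.pool_, nullptr)), conn_(other.conn_)
    {
    }
    toy_handle(const toy_handle&) = delete;
    toy_handle& operator=(const toy_handle&) = delete;
    toy_handle& operator=(toy_handle&&) = delete;
    ~toy_handle();

    // Access the underlying connection, like pooled_connection::operator->
    toy_connection* operator->() { return &conn_; }
};

class toy_pool
{
    std::vector<toy_connection> idle_;  // connections not currently in use

public:
    explicit toy_pool(std::size_t size)
    {
        for (std::size_t i = 0; i < size; ++i)
            idle_.push_back(toy_connection{static_cast<int>(i)});
    }

    std::size_t idle_count() const { return idle_.size(); }

    // Hands out an idle connection. A real pool would wait instead of asserting.
    toy_handle get()
    {
        assert(!idle_.empty());
        toy_connection conn = idle_.back();
        idle_.pop_back();
        return toy_handle(*this, conn);
    }

    // Called by the handle's destructor to recycle the connection
    void give_back(toy_connection conn) { idle_.push_back(conn); }
};

toy_handle::~toy_handle()
{
    if (pool_)
        pool_->give_back(conn_);
}
```

With a `toy_pool` of size 2, calling `get()` leaves one idle connection. Once the returned handle goes out of scope, the idle count is back to 2 and a later `get()` may hand out the same connection again.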
async_get_connection waits for a client connection to become available before
completing. If the server is unavailable or credentials are invalid, it may
wait indefinitely. This is a problem for both development and production. We
can solve this by using asio::cancel_after, which allows setting timeouts on
async operations:
    // Get a connection from the pool.
    // This will wait until a healthy connection is ready to be used.
    // pooled_connection grants us exclusive access to the connection until
    // the object is destroyed.
    // Fail the operation if no connection becomes available within 1 second.
    mysql::pooled_connection conn = co_await pool.async_get_connection(
        asio::cancel_after(std::chrono::seconds(1))
    );
Don't worry if you don't fully understand how this works. We will go into more
detail on asio::cancel_after, cancellations and completion tokens in the next
tutorial.
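The effect of bounding a wait can be shown without Asio. The sketch below is a blocking, standard-library analogy only: asio::cancel_after is asynchronous and works through cancellation, which this code does not model. The class `toy_waiter` and its members are hypothetical names.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: lets one thread wait, with a deadline,
// until another thread reports that a connection is ready
class toy_waiter
{
    std::mutex mtx_;
    std::condition_variable cv_;
    bool ready_ = false;

public:
    // Called when a connection becomes usable
    void mark_ready()
    {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            ready_ = true;
        }
        cv_.notify_all();
    }

    // Returns true if a connection became ready before the timeout elapsed,
    // false otherwise. Without a timeout, this could block forever.
    bool wait_for_connection(std::chrono::milliseconds timeout)
    {
        assert(timeout.count() >= 0);
        std::unique_lock<std::mutex> lock(mtx_);
        return cv_.wait_for(lock, timeout, [this] { return ready_; });
    }
};
```

If `mark_ready` is never called, `wait_for_connection` gives up after the timeout and returns `false`. In the same way, the caller of async_get_connection gets an error instead of hanging while the server is down.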
Putting all pieces together, our coroutine becomes:
    // Encapsulates the database access logic.
    // Given an employee_id, retrieves the employee details to be sent to the client.
    asio::awaitable<std::string> get_employee_details(mysql::connection_pool& pool, std::int64_t employee_id)
    {
        // Get a connection from the pool.
        // This will wait until a healthy connection is ready to be used.
        // pooled_connection grants us exclusive access to the connection until
        // the object is destroyed.
        // Fail the operation if no connection becomes available within 1 second.
        mysql::pooled_connection conn = co_await pool.async_get_connection(
            asio::cancel_after(std::chrono::seconds(1))
        );

        // Use the connection normally to query the database.
        // operator-> returns a reference to an any_connection,
        // so we can apply everything we learnt in previous tutorials
        mysql::static_results<mysql::pfr_by_name<employee>> result;
        co_await conn->async_execute(
            mysql::with_params("SELECT first_name, last_name FROM employee WHERE id = {}", employee_id),
            result
        );

        // Compose the message to be sent back to the client
        if (result.rows().empty())
        {
            co_return "NOT_FOUND";
        }
        else
        {
            const auto& emp = result.rows()[0];
            co_return emp.first_name + ' ' + emp.last_name;
        }

        // When the pooled_connection is destroyed, the connection is returned
        // to the pool, so it can be re-used.
    }
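The message composition at the end of the coroutine does not depend on the database, so it can be checked in isolation. The following is a minimal sketch under stated assumptions: the `employee` struct here is a hypothetical stand-in with two string fields, `compose_response` is a hypothetical helper, and the employee ID is assumed to be unique, so a query returns at most one row.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the row type used in the query
struct employee
{
    std::string first_name;
    std::string last_name;
};

// Hypothetical helper: builds the reply from the rows returned by the query
std::string compose_response(const std::vector<employee>& rows)
{
    // No matching employee: reply with the protocol's sentinel string
    if (rows.empty())
        return "NOT_FOUND";

    // Assumption: IDs are unique, so at most one row can match
    assert(rows.size() == 1);

    const employee& emp = rows.front();
    return emp.first_name + ' ' + emp.last_name;
}
```

Keeping this logic in a plain function makes it possible to unit test the reply format without a running MySQL server.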
Let's now build a function that handles a client session, invoking the database access logic in the process:
    asio::awaitable<void> handle_session(mysql::connection_pool& pool, asio::ip::tcp::socket client_socket)
    {
        // Read the request from the client.
        // async_read ensures that the 8-byte buffer is filled, handling partial reads.
        unsigned char message[8]{};
        co_await asio::async_read(client_socket, asio::buffer(message));

        // Parse the 64-bit big-endian int into a native int64_t
        std::int64_t employee_id = boost::endian::load_big_s64(message);

        // Invoke the database handling logic
        std::string response = co_await get_employee_details(pool, employee_id);

        // Write the response back to the client.
        // async_write ensures that the entire message is written, handling partial writes
        co_await asio::async_write(client_socket, asio::buffer(response));

        // The socket's destructor will close the client connection
    }
We now need logic to accept incoming TCP connections. We will use an asio::ip::tcp::acceptor
object to accomplish it, listening
for connections in a loop until the server is stopped:
    asio::awaitable<void> listener(mysql::connection_pool& pool, unsigned short port)
    {
        // An object that accepts incoming TCP connections.
        asio::ip::tcp::acceptor acc(co_await asio::this_coro::executor);

        // The endpoint where the server will listen.
        asio::ip::tcp::endpoint listening_endpoint(asio::ip::make_address("0.0.0.0"), port);

        // Open the acceptor
        acc.open(listening_endpoint.protocol());

        // Allow reusing the local address, so we can restart our server
        // without encountering errors in bind
        acc.set_option(asio::socket_base::reuse_address(true));

        // Bind to the local address
        acc.bind(listening_endpoint);

        // Start listening for connections
        acc.listen();
        std::cout << "Server listening at " << acc.local_endpoint() << std::endl;

        // Start the accept loop
        while (true)
        {
            // Accept a new connection
            auto sock = co_await acc.async_accept();

            // Function implementing our session logic.
            // Take ownership of the socket.
            // Having this as a named variable works around a gcc bug
            // (https://gcc.gnu.org/bugzilla/show_bug.cgi?id=107288)
            auto session_logic = [&pool, s = std::move(sock)]() mutable {
                return handle_session(pool, std::move(s));
            };

            // Launch a coroutine that runs our session logic.
            // We don't co_await this coroutine so we can listen
            // to new connections while the session is running.
            asio::co_spawn(
                // Use the same executor as the current coroutine
                co_await asio::this_coro::executor,

                // Session logic
                std::move(session_logic),

                // Propagate exceptions thrown in handle_session
                [](std::exception_ptr ex) {
                    if (ex)
                        std::rethrow_exception(ex);
                }
            );
        }
    }
Finally, we need a way to stop our program. We will use an asio::signal_set
object to catch signals, and call io_context::stop when SIGINT (sent when
Ctrl-C is pressed) or SIGTERM is received:
    // signal_set is an I/O object that allows waiting for signals
    asio::signal_set signals(ctx, SIGINT, SIGTERM);

    // Wait for signals
    signals.async_wait([&](boost::system::error_code, int) {
        // Stop the execution context. This will cause io_context::run to return
        ctx.stop();
    });
Putting all these pieces together, our main program becomes:
    // Create an I/O context, required by all I/O objects
    asio::io_context ctx;

    // pool_params contains configuration for the pool.
    // You must specify enough information to establish a connection,
    // including the server address and credentials.
    // You can configure a lot of other things, like pool limits
    mysql::pool_params params;
    params.server_address.emplace_host_and_port(server_hostname);
    params.username = username;
    params.password = password;
    params.database = "boost_mysql_examples";

    // Construct the pool.
    // ctx will be used to create the connections and other I/O objects
    mysql::connection_pool pool(ctx, std::move(params));

    // You need to call async_run on the pool before doing anything useful with it.
    // async_run creates connections and keeps them healthy. It must be called
    // only once per pool.
    // The detached completion token means that we don't want to be notified when
    // the operation ends. It's similar to a no-op callback.
    pool.async_run(asio::detached);

    // signal_set is an I/O object that allows waiting for signals
    asio::signal_set signals(ctx, SIGINT, SIGTERM);

    // Wait for signals
    signals.async_wait([&](boost::system::error_code, int) {
        // Stop the execution context. This will cause io_context::run to return
        ctx.stop();
    });

    // Launch our listener
    asio::co_spawn(
        ctx,
        [&pool, listener_port] { return listener(pool, listener_port); },
        // If any exception is thrown in the coroutine body, rethrow it.
        [](std::exception_ptr ptr) {
            if (ptr)
            {
                std::rethrow_exception(ptr);
            }
        }
    );

    // Calling run will actually execute the coroutine until completion
    ctx.run();
Full program listing for this tutorial is here.
For simplicity, we've left error handling out of this tutorial. This is usually very important in a server like the one we've written, and is the topic of our next tutorial.