Boost C++ Libraries



Tutorial 6: Connection pools

All our programs until now have used one-shot connections. They also lacked any fault tolerance: if the server is unavailable, our program throws an exception and terminates. Most real-world scenarios require long-lived, reliable connections instead.

In this tutorial, we will implement a server for a simple request-reply protocol. The protocol allows clients to retrieve the full name of an employee given their ID. We will use connection_pool to maintain a set of healthy connections that we can use when a client connects to our server.

The protocol

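The protocol is TCP based, and can be described as follows: after the connection is established, the client sends a message containing the employee ID, encoded as an 8-byte, big-endian integer. The server replies with a string containing the employee's full name, or "NOT_FOUND" if the ID doesn't match any employee. The connection is closed once the server has sent the response.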

This protocol is intentionally overly simplistic, and shouldn't be used in production. See our HTTP examples for more advanced use cases.
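
To make the request format concrete, here is a minimal sketch of how a client could build the request, assuming the 8-byte, big-endian encoding that the server's parsing code in this tutorial expects. The helper names encode_employee_id and decode_employee_id are hypothetical and not part of Boost.MySQL or Boost.Endian; the server itself uses boost::endian::load_big_s64 for decoding.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical client-side helper (not part of any library):
// encodes an employee ID as an 8-byte, big-endian message.
std::array<unsigned char, 8> encode_employee_id(std::int64_t id)
{
    // Shift the unsigned representation, so the operation is well-defined
    // for negative IDs, too
    auto value = static_cast<std::uint64_t>(id);
    std::array<unsigned char, 8> out{};
    for (std::size_t i = 0; i < out.size(); ++i)
    {
        // The most significant byte goes first
        out[i] = static_cast<unsigned char>((value >> (8 * (7 - i))) & 0xffu);
    }
    return out;
}

// Hypothetical inverse helper: parses an 8-byte, big-endian message.
// Assumes a two's complement int64_t, which holds on mainstream platforms.
std::int64_t decode_employee_id(const unsigned char* data, std::size_t size)
{
    assert(size == 8);
    std::uint64_t value = 0;
    for (std::size_t i = 0; i < size; ++i)
    {
        value = (value << 8) | data[i];
    }
    return static_cast<std::int64_t>(value);
}
```

With these helpers, the ID 258 (0x0102) is sent as six zero bytes followed by 0x01 and 0x02.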

Creating a connection pool

connection_pool is an I/O object that contains any_connection objects, and can be constructed from an execution context and a pool_params config struct:

// Create an I/O context, required by all I/O objects
asio::io_context ctx;

// pool_params contains configuration for the pool.
// You must specify enough information to establish a connection,
// including the server address and credentials.
// You can configure a lot of other things, like pool limits
mysql::pool_params params;
params.server_address.emplace_host_and_port(server_hostname);
params.username = username;
params.password = password;
params.database = "boost_mysql_examples";

// Construct the pool.
// ctx will be used to create the connections and other I/O objects
mysql::connection_pool pool(ctx, std::move(params));

A single connection pool is usually created per application.

connection_pool::async_run should be called once per pool:

// You need to call async_run on the pool before doing anything useful with it.
// async_run creates connections and keeps them healthy. It must be called
// only once per pool.
// The detached completion token means that we don't want to be notified when
// the operation ends. It's similar to a no-op callback.
pool.async_run(asio::detached);

Using pooled connections

Let's first write a coroutine that encapsulates database access. Given an employee ID, it should return the string to be sent as the response to the client. Don't worry about error handling for now - we will take care of it in the next tutorial.

When using a pool, we don't need to explicitly create, connect or close connections. Instead, we use connection_pool::async_get_connection to obtain them from the pool:

// Get a connection from the pool.
// This will wait until a healthy connection is ready to be used.
// pooled_connection grants us exclusive access to the connection until
// the object is destroyed
mysql::pooled_connection conn = co_await pool.async_get_connection();

pooled_connection is a wrapper around any_connection, with some pool-specific additions. We can use it like a regular connection:

// Use the connection normally to query the database.
// operator-> returns a reference to an any_connection,
// so we can apply everything we learnt in previous tutorials
mysql::static_results<mysql::pfr_by_name<employee>> result;
co_await conn->async_execute(
    mysql::with_params("SELECT first_name, last_name FROM employee WHERE id = {}", employee_id),
    result
);

When a pooled_connection is destroyed, the connection is returned to the pool. The underlying connection will be cleaned up using a lightweight session reset mechanism and recycled. Subsequent async_get_connection calls may retrieve the same connection. This improves efficiency, since session establishment is costly.
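
The ownership model described above can be illustrated with a toy, synchronous sketch. It is not how connection_pool is implemented: toy_connection, toy_pool and toy_pooled_connection are hypothetical names, and the sketch has no I/O, no session reset and no thread-safety. It only shows the RAII idea of a handle that returns its resource to the pool when destroyed.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Toy stand-in for a database connection (hypothetical)
struct toy_connection
{
    int id;
};

class toy_pool;

// RAII handle: grants exclusive access to a connection and
// returns it to the pool on destruction
class toy_pooled_connection
{
    toy_pool* pool_;
    toy_connection conn_;

public:
    toy_pooled_connection(toy_pool& pool, toy_connection conn) : pool_(&pool), conn_(conn) {}

    // Moving transfers the responsibility of returning the connection
    toy_pooled_connection(toy_pooled_connection&& other) noexcept
        : pool_(std::exchange(other.pool_, nullptr)), conn_(other.conn_)
    {
    }
    toy_pooled_connection(const toy_pooled_connection&) = delete;
    toy_pooled_connection& operator=(const toy_pooled_connection&) = delete;
    toy_pooled_connection& operator=(toy_pooled_connection&&) = delete;
    ~toy_pooled_connection();

    toy_connection* operator->() { return &conn_; }
};

class toy_pool
{
    std::vector<toy_connection> idle_;

public:
    explicit toy_pool(int size)
    {
        for (int i = 0; i < size; ++i)
            idle_.push_back(toy_connection{i});
    }

    std::size_t idle_count() const { return idle_.size(); }

    // Hands out the most recently returned connection.
    // The real pool waits asynchronously instead of asserting.
    toy_pooled_connection get()
    {
        assert(!idle_.empty());
        toy_connection c = idle_.back();
        idle_.pop_back();
        return toy_pooled_connection(*this, c);
    }

    void give_back(toy_connection c) { idle_.push_back(c); }
};

// Return the connection to the pool, unless this handle was moved from
toy_pooled_connection::~toy_pooled_connection()
{
    if (pool_)
        pool_->give_back(conn_);
}
```

In this sketch, a handle obtained with get() removes one connection from the idle set for as long as it lives. Once it goes out of scope, the connection becomes idle again and a later get() may hand out the very same connection.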

async_get_connection waits for a connection to become available before completing. If the server is unavailable or credentials are invalid, it may wait indefinitely. This is a problem for both development and production. We can solve this by using asio::cancel_after, which allows setting timeouts on async operations:

// Get a connection from the pool.
// This will wait until a healthy connection is ready to be used.
// pooled_connection grants us exclusive access to the connection until
// the object is destroyed.
// Fail the operation if no connection becomes available within the next second.
mysql::pooled_connection conn = co_await pool.async_get_connection(
    asio::cancel_after(std::chrono::seconds(1))
);

Don't worry if you don't fully understand how this works. We will go into more detail on asio::cancel_after, cancellations and completion tokens in the next tutorial.

Putting all pieces together, our coroutine becomes:

// Encapsulates the database access logic.
// Given an employee_id, retrieves the employee details to be sent to the client.
asio::awaitable<std::string> get_employee_details(mysql::connection_pool& pool, std::int64_t employee_id)
{
    // Get a connection from the pool.
    // This will wait until a healthy connection is ready to be used.
    // pooled_connection grants us exclusive access to the connection until
    // the object is destroyed.
    // Fail the operation if no connection becomes available within the next second.
    mysql::pooled_connection conn = co_await pool.async_get_connection(
        asio::cancel_after(std::chrono::seconds(1))
    );

    // Use the connection normally to query the database.
    // operator-> returns a reference to an any_connection,
    // so we can apply everything we learnt in previous tutorials
    mysql::static_results<mysql::pfr_by_name<employee>> result;
    co_await conn->async_execute(
        mysql::with_params("SELECT first_name, last_name FROM employee WHERE id = {}", employee_id),
        result
    );

    // Compose the message to be sent back to the client
    if (result.rows().empty())
    {
        co_return "NOT_FOUND";
    }
    else
    {
        const auto& emp = result.rows()[0];
        co_return emp.first_name + ' ' + emp.last_name;
    }

    // When the pooled_connection is destroyed, the connection is returned
    // to the pool, so it can be re-used.
}
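
The message-composition step does not depend on the database, so it can be factored out and checked in isolation. The following is a minimal sketch under stated assumptions: compose_response and check_compose_response are hypothetical helpers, the employee struct is redefined here with only the two fields the query selects, and a std::vector stands in for the row collection returned by result.rows().

```cpp
#include <cassert>
#include <string>
#include <vector>

// Assumed shape of the row type: only the fields selected by the query
struct employee
{
    std::string first_name;
    std::string last_name;
};

// Hypothetical helper: composes the message to be sent back to the client.
// An empty row set means that the ID didn't match any employee.
std::string compose_response(const std::vector<employee>& rows)
{
    if (rows.empty())
    {
        return "NOT_FOUND";
    }
    const employee& emp = rows.front();
    return emp.first_name + ' ' + emp.last_name;
}

// Example checks for both branches
void check_compose_response()
{
    std::vector<employee> none;
    std::vector<employee> one{employee{"John", "Doe"}};
    assert(compose_response(none) == "NOT_FOUND");
    assert(compose_response(one) == "John Doe");
}
```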

Handling a client session

Let's now build a function that handles a client session, invoking the database access logic in the process:

asio::awaitable<void> handle_session(mysql::connection_pool& pool, asio::ip::tcp::socket client_socket)
{
    // Read the request from the client.
    // async_read ensures that the 8-byte buffer is filled, handling partial reads.
    unsigned char message[8]{};
    co_await asio::async_read(client_socket, asio::buffer(message));

    // Parse the 64-bit big-endian int into a native int64_t
    std::int64_t employee_id = boost::endian::load_big_s64(message);

    // Invoke the database handling logic
    std::string response = co_await get_employee_details(pool, employee_id);

    // Write the response back to the client.
    // async_write ensures that the entire message is written, handling partial writes
    co_await asio::async_write(client_socket, asio::buffer(response));

    // The socket's destructor will close the client connection
}

Listening for connections

We now need logic to accept incoming TCP connections. We will use an asio::ip::tcp::acceptor object to accomplish it, listening for connections in a loop until the server is stopped:

asio::awaitable<void> listener(mysql::connection_pool& pool, unsigned short port)
{
    // An object that accepts incoming TCP connections.
    asio::ip::tcp::acceptor acc(co_await asio::this_coro::executor);

    // The endpoint where the server will listen.
    asio::ip::tcp::endpoint listening_endpoint(asio::ip::make_address("0.0.0.0"), port);

    // Open the acceptor
    acc.open(listening_endpoint.protocol());

    // Allow reusing the local address, so we can restart our server
    // without encountering errors in bind
    acc.set_option(asio::socket_base::reuse_address(true));

    // Bind to the local address
    acc.bind(listening_endpoint);

    // Start listening for connections
    acc.listen();
    std::cout << "Server listening at " << acc.local_endpoint() << std::endl;

    // Start the accept loop
    while (true)
    {
        // Accept a new connection
        auto sock = co_await acc.async_accept();

        // Function implementing our session logic.
        // Take ownership of the socket.
        // Having this as a named variable works around a gcc bug
        // (https://gcc.gnu.org/bugzilla/show_bug.cgi?id=107288)
        auto session_logic = [&pool, s = std::move(sock)]() mutable {
            return handle_session(pool, std::move(s));
        };

        // Launch a coroutine that runs our session logic.
        // We don't co_await this coroutine so we can listen
        // to new connections while the session is running.
        asio::co_spawn(
            // Use the same executor as the current coroutine
            co_await asio::this_coro::executor,

            // Session logic
            std::move(session_logic),

            // Propagate exceptions thrown in handle_session
            [](std::exception_ptr ex) {
                if (ex)
                    std::rethrow_exception(ex);
            }
        );
    }
}

Waiting for signals

Finally, we need a way to stop our program. We will use an asio::signal_set object to catch signals, and call io_context::stop when SIGINT (for example, when Ctrl-C is pressed) or SIGTERM is received:

// signal_set is an I/O object that allows waiting for signals
asio::signal_set signals(ctx, SIGINT, SIGTERM);

// Wait for signals
signals.async_wait([&](boost::system::error_code, int) {
    // Stop the execution context. This will cause io_context::run to return
    ctx.stop();
});

Putting all these pieces together, our main program becomes:

// Create an I/O context, required by all I/O objects
asio::io_context ctx;

// pool_params contains configuration for the pool.
// You must specify enough information to establish a connection,
// including the server address and credentials.
// You can configure a lot of other things, like pool limits
mysql::pool_params params;
params.server_address.emplace_host_and_port(server_hostname);
params.username = username;
params.password = password;
params.database = "boost_mysql_examples";

// Construct the pool.
// ctx will be used to create the connections and other I/O objects
mysql::connection_pool pool(ctx, std::move(params));

// You need to call async_run on the pool before doing anything useful with it.
// async_run creates connections and keeps them healthy. It must be called
// only once per pool.
// The detached completion token means that we don't want to be notified when
// the operation ends. It's similar to a no-op callback.
pool.async_run(asio::detached);

// signal_set is an I/O object that allows waiting for signals
asio::signal_set signals(ctx, SIGINT, SIGTERM);

// Wait for signals
signals.async_wait([&](boost::system::error_code, int) {
    // Stop the execution context. This will cause io_context::run to return
    ctx.stop();
});

// Launch our listener
asio::co_spawn(
    ctx,
    [&pool, listener_port] { return listener(pool, listener_port); },
    // If any exception is thrown in the coroutine body, rethrow it.
    [](std::exception_ptr ptr) {
        if (ptr)
        {
            std::rethrow_exception(ptr);
        }
    }
);

// Calling run will actually execute the coroutines. It returns once ctx.stop() is called
ctx.run();

Wrapping up

Full program listing for this tutorial is here.

For simplicity, we've left error handling out of this tutorial. This is usually very important in a server like the one we've written, and is the topic of our next tutorial.

