Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for a snapshot of the master branch, built from commit eef40d496b.

Interfacing sync and async code: using connection_pool in sync code

As you may already know, we recommend using asynchronous functions over sync ones because they are more versatile and scalable. Additionally, some classes like connection_pool do not offer a sync API.

If your entire application uses Asio, you can use async functions everywhere as explained in the tutorials. However, some legacy applications are inherently synchronous, and might need to call asynchronous code and wait for it synchronously.

This section explains how to handle these cases. We will build a synchronous function that retrieves an employee object from the database given their ID. It will use connection_pool and asio::cancel_after, both of which can only be used through asynchronous functions.

The asio::use_future completion token

asio::use_future is a completion token that does what we want: it launches an asynchronous operation and returns a std::future that will complete when the task finishes.

With this knowledge, we can write a first version of our function:

// Gets an employee given their ID, using a connection pool. This is a sync function.
std::optional<employee> get_employee_by_id(mysql::connection_pool& pool, std::int64_t id)
{
    // Get a connection from the pool. This will launch the operation, but won't wait for it
    std::future<mysql::pooled_connection> fut = pool.async_get_connection(asio::use_future);

    // Block the current thread until the operation completes.
    // As we will explain later, you need a thread running your execution context for this to complete
    mysql::pooled_connection conn = fut.get();

    // There is a sync version of execute, so we can use it
    mysql::static_results<employee> r;
    conn->execute(mysql::with_params("SELECT * FROM employee WHERE id = {}", id), r);

    // Done
    return r.rows().empty() ? std::optional<employee>() : r.rows()[0];
}

For this to work, we need a thread that runs the execution context (event loop). That is, calling get() on the future doesn't run the event loop; it only blocks until another thread completes the operation. Also note that our function will be called from a thread different from the one running the execution context, so we need to make our pool thread-safe:

// Initialization code - run this once at program startup

// Execution context, required to run all async operations.
// This is equivalent to using asio::io_context and a thread that calls run()
asio::thread_pool ctx(1);  // Use only one thread

// Create the connection pool
mysql::pool_params params;
params.server_address.emplace_host_and_port(server_hostname);
params.username = mysql_username;
params.password = mysql_password;
params.database = "boost_mysql_examples";
params.thread_safe = true;  // allow initiating async_get_connection from any thread
mysql::connection_pool pool(ctx, std::move(params));
pool.async_run(asio::detached);

Adding timeouts

As you might know, async_get_connection may wait indefinitely for a connection to become available, so we should use asio::cancel_after to set a timeout. We might be tempted to do this:

std::optional<employee> get_employee_by_id(mysql::connection_pool& pool, std::int64_t id)
{
    using namespace std::chrono_literals;

    // Do NOT do this!! This is a race condition!!
    auto fut = pool.async_get_connection(asio::cancel_after(10s, asio::use_future));

    // ...
}

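It might not be obvious, but this is a data race. asio::cancel_after creates a timer under the hood, and this timer is shared between the thread calling async_get_connection and the one running the execution context. The calling thread sets up the timer while, in parallel, the thread running the execution context may complete async_get_connection and cancel that same timer. The timer is thus accessed concurrently from both threads without any synchronization.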

Note that this happens even if the pool is thread-safe because the timer is not part of the pool.

To work around this, we can use a strand, Asio's mechanism to protect against data races. We will create a strand, then enter it and use it to run async_get_connection. This is a chain of asynchronous operations, so we can use an asio::deferred chain to implement it:

std::optional<employee> get_employee_by_id(mysql::connection_pool& pool, std::int64_t id)
{
    using namespace std::chrono_literals;

    // Create a strand for this operation. Strands require an underlying
    // executor. Use the pool's executor, which points to the thread_pool we created.
    auto strand = asio::make_strand(pool.get_executor());

    // First enter the strand with asio::dispatch, then call async_get_connection through the strand.
    // asio::dispatch + asio::bind_executor is Asio's standard way to "run a function in an executor"
    std::future<mysql::pooled_connection> fut = asio::dispatch(
        // bind_executor binds an executor to a completion token.
        // deferred creates an async chain
        asio::bind_executor(
            strand,
            asio::deferred([&] {
                // This function will be called when we're in the strand and determines what to do next
                return pool.async_get_connection(
                    asio::cancel_after(10s, asio::bind_executor(strand, asio::deferred))
                );
            })
        )
    )(asio::use_future); // Initiate the chain and convert it into a future

    // Wait for the async chain to finish
    mysql::pooled_connection conn = fut.get();

    // Execute as in the previous version
    mysql::static_results<employee> r;
    conn->execute(mysql::with_params("SELECT * FROM employee WHERE id = {}", id), r);
    return r.rows().empty() ? std::optional<employee>() : r.rows()[0];
}

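Don't worry if this looks intimidating. Let's break it down into pieces. asio::make_strand creates a strand on top of the pool's executor. asio::dispatch, combined with asio::bind_executor, enters the strand. The function passed to asio::deferred is called once we're in the strand, and it starts async_get_connection with asio::cancel_after, binding the rest of the chain to the strand too. Finally, invoking the chain with asio::use_future initiates it and converts it into a std::future.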

Refactoring to use C++20 coroutines

Deferred compositions can be used even in C++11, but they can get messy pretty fast, and reasoning about their thread safety is not trivial either.

If you can use C++20 or later, a cleaner approach is to encapsulate all networking operations in a coroutine:

// Gets an employee given their ID, using a connection pool. This is a sync function.
std::optional<employee> get_employee_by_id(mysql::connection_pool& pool, std::int64_t id)
{
    using namespace std::chrono_literals;

    // Spawn a coroutine in the pool's executor - that is, in the thread_pool.
    // Since the thread_pool has only one thread, and all code in the coroutine runs within that thread,
    // there is no need for a strand here.
    // co_spawn is an async operation, and can be used with asio::use_future
    std::future<std::optional<employee>> fut = asio::co_spawn(
        pool.get_executor(),
        [&pool, id]() -> asio::awaitable<std::optional<employee>> {
            // Get a connection from the pool
            auto conn = co_await pool.async_get_connection(asio::cancel_after(30s));

            // Execute
            mysql::static_results<employee> r;
            co_await conn->async_execute(
                mysql::with_params("SELECT * FROM employee WHERE id = {}", id),
                r,
                asio::cancel_after(30s)
            );

            // Done
            co_return r.rows().empty() ? std::optional<employee>() : r.rows()[0];
        },
        asio::use_future
    );

    // Wait for the future
    return fut.get();
}

We're keeping all interactions with the connection_pool within coroutines, so we don't need to make it thread-safe anymore:

// Initialization code - run this once at program startup

// Execution context, required to run all async operations.
// This is equivalent to asio::io_context plus a thread calling run()
asio::thread_pool ctx(1);

// Create the connection pool. The pool is NOT thread-safe
mysql::pool_params params;
params.server_address.emplace_host_and_port(server_hostname);
params.username = mysql_username;
params.password = mysql_password;
params.database = "boost_mysql_examples";
mysql::connection_pool pool(ctx, std::move(params));

// Run the pool. async_run should be executed in the thread_pool's thread -
// otherwise, we have a race condition
asio::dispatch(asio::bind_executor(ctx.get_executor(), [&] { pool.async_run(asio::detached); }));

If C++20 is not available

If you can't use C++20, you can still use asio::spawn or imitate the behavior of asio::use_future with callbacks. This is what the latter could look like:

// Gets an employee given their ID, using a connection pool. This is a sync function.
std::optional<employee> get_employee_by_id(mysql::connection_pool& pool, std::int64_t id)
{
    using namespace std::chrono_literals;

    // A promise, so we can wait for the task to complete
    std::promise<std::optional<employee>> prom;

    // These temporary variables should be kept alive until all async operations
    // complete, so they're declared here
    mysql::pooled_connection conn;
    mysql::static_results<employee> r;

    // Ensure that everything runs within the thread pool
    asio::dispatch(asio::bind_executor(pool.get_executor(), [&] {
        // Get a connection from the pool
        pool.async_get_connection(asio::cancel_after(
            30s,
            [&](boost::system::error_code ec, mysql::pooled_connection temp_conn) {
                if (ec)
                {
                    // If there was an error getting the connection, complete the promise and return
                    prom.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                }
                else
                {
                    // Store the connection somewhere. If it gets destroyed, it's returned to the pool
                    conn = std::move(temp_conn);

                    // Start executing the query
                    conn->async_execute(
                        mysql::with_params("SELECT * FROM employee WHERE id = {}", id),
                        r,
                        asio::cancel_after(
                            30s,
                            [&](boost::system::error_code ec) {
                                if (ec)
                                {
                                    // If there was an error, complete the promise and return
                                    prom.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                                }
                                else
                                {
                                    // Done
                                    prom.set_value(
                                        r.rows().empty() ? std::optional<employee>() : r.rows()[0]
                                    );
                                }
                            }
                        )
                    );
                }
            }
        ));
    }));

    return prom.get_future().get();
}

It's not as clean, but the idea remains the same.

