...one of the most highly
regarded and expertly designed C++ library projects in the
world.
— Herb Sutter and Andrei
Alexandrescu, C++
Coding Standards
As you may already know, we recommend using asynchronous functions over sync
ones because they are more versatile and scalable. Additionally, some classes
like connection_pool
do not offer a sync API.
If your entire application uses Asio, you can use async functions everywhere as explained in the tutorials. However, some legacy applications are inherently synchronous, and might need to call asynchronous code and wait for it synchronously.
This section explains how to handle these cases. We will build a synchronous
function that retrieves an employee object from the database given their ID.
It will use connection_pool
and asio::cancel_after
, which can only be accessed
through asynchronous functions.
asio::use_future
is a completion
token that does what we want: it launches an asynchronous operation
and returns a std::future
that will complete when the task finishes.
With this knowledge, we can write a first version of our function:
// Gets an employee's name given their ID, using a connection pool. This is a sync function. std::optional<employee> get_employee_by_id(mysql::connection_pool& pool, std::int64_t id) { // Get a connection from the pool. This will launch the operation, but won't wait for it std::future<mysql::pooled_connection> fut = pool.async_get_connection(asio::use_future); // Block the current thread until the operation completes. // As we will explain later, you need a thread running your execution context for this to complete mysql::pooled_connection conn = fut.get(); // There is a sync version of execute, so we can use it mysql::static_results<employee> r; conn->execute(mysql::with_params("SELECT * FROM employee WHERE id = {}", id), r); // Done return r.rows().empty() ? std::optional<employee>() : r.rows()[0]; }
For this to work, we need a thread that runs the execution context (event loop).
This is, calling get()
on the future doesn't run the event loop. Also note that our function will
be called from a thread different to the one running the execution context,
so we need to make our pool thread-safe:
// Initialization code - run this once at program startup // Execution context, required to run all async operations. // This is equivalent to using asio::io_context and a thread that calls run() asio::thread_pool ctx(1); // Use only one thread // Create the connection pool mysql::pool_params params; params.server_address.emplace_host_and_port(server_hostname); params.username = mysql_username; params.password = mysql_password; params.database = "boost_mysql_examples"; params.thread_safe = true; // allow initiating async_get_connection from any thread mysql::connection_pool pool(ctx, std::move(params)); pool.async_run(asio::detached);
As you might know, async_get_connection
may block indefinitely, so we should use asio::cancel_after
to set a timeout. We might be tempted to do this:
std::optional<employee> get_employee_by_id(mysql::connection_pool& pool, std::int64_t id) { using namespace std::chrono_literals; // Do NOT do this!! This is a race condition!! auto fut = pool.async_get_connection(asio::cancel_after(10s, asio::use_future)); // ... }
It might not be obvious, but this is a data race. asio::cancel_after
creates a timer under the hood. This timer is shared between the thread calling
async_get_connection
and the
one running the execution context. The race condition goes like this:
async_get_connection
sets up the timer required by asio::cancel_after
.
async_get_connection
operation. As a result, the timer is cancelled. Thus, the timer is accessed
concurrently from both threads without protection.
Note that this happens even if the pool is thread-safe because the timer is not part of the pool.
To work this around, we can use a strand,
Asio's mechanism to protect against data races. We will create a strand, then
enter it and use it to run async_get_connection
.
This is a chain of asynchronous operations, so we can use an asio::deferred
chain to implement it:
std::optional<employee> get_employee_by_id(mysql::connection_pool& pool, std::int64_t id) { using namespace std::chrono_literals; // Create a strand for this operation. Strands require an underlying // executor. Use the pool's executor, which points to the thread_pool we created. auto strand = asio::make_strand(pool.get_executor()); // First enter the strand with asio::dispatch, then call async_get_connection through the strand. // asio::dispatch + asio::bind_executor is Asio's standard way to "run a function in an executor" std::future<mysql::pooled_connection> fut = asio::dispatch( // bind_executor binds an executor to a completion token. // deferred creates an async chain asio::bind_executor( strand, asio::deferred([&] { // This function will be called when we're in the strand and determines what to do next return pool.async_get_connection( asio::cancel_after(10s, asio::bind_executor(strand, asio::deferred)) ); }) ) )(asio::use_future); // Initiate the chain and convert it into a future // Wait for the async chain to finish mysql::pooled_connection conn = fut.get(); // Execute as in the previous version mysql::static_results<employee> r; conn->execute(mysql::with_params("SELECT * FROM employee WHERE id = {}", id), r); return r.rows().empty() ? std::optional<employee>() : r.rows()[0]; }
Don't worry if this looks intimidating. Let's break this down into pieces:
asio::dispatch
and similar functions to submit
work to it.
asio::dispatch
submits a piece of work to an executor. We specify the work to execute
as a completion token. It uses the executor bound to the passed completion
token.
asio::bind_executor
binds an executor to a completion token. Here, we're binding the strand
to a deferred completion chain. This means that dispatch
will use the strand to run its work.
asio::deferred
to an async operation, like dispatch
,
it returns a packaged async operation. We can call the operation with any
completion token to initiate it. Here, we use asio::use_future
to transform the operation into a future. If we were in a C++20 coroutine,
we could co_await the returned object, too.
deferred
will be executed when the first operation completes, and determines what
to do next. This is similar to JavaScript promise chains. Our next operation
is async_get_connection
.
bind_executor
with
asio::deferred
to make any intermediate handlers
used by async_get_connection
and asio::cancel_after
go through the strand, effectively
protecting our timer.
Deferred compositions can be used even in C++11, but they can get messy pretty fast. Reasoning about their thread safety is non-trivial, either.
If you're in C++20 or above, a cleaner approach is to encapsulate all operations involving networking into a coroutine:
// Gets an employee's name given their ID, using a connection pool. This is a sync function. std::optional<employee> get_employee_by_id(mysql::connection_pool& pool, std::int64_t id) { using namespace std::chrono_literals; // Spawn a coroutine in the pool's executor - that is, in the thread_pool. // Since the pool has only one thread, and all code in the coroutine runs within that thread, // there is no need for a strand here. // co_spawn is an async operation, and can be used with asio::use_future std::future<std::optional<employee>> fut = asio::co_spawn( pool.get_executor(), [&pool, id]() -> asio::awaitable<std::optional<employee>> { // Get a connection from the pool auto conn = co_await pool.async_get_connection(asio::cancel_after(30s)); // Execute mysql::static_results<employee> r; co_await conn->async_execute( mysql::with_params("SELECT * FROM employee WHERE id = {}", id), r, asio::cancel_after(30s) ); // Done co_return r.rows().empty() ? std::optional<employee>() : r.rows()[0]; }, asio::use_future ); // Wait for the future return fut.get(); }
We're keeping all interactions with the connection_pool
within coroutines, so we don't need to make it thread-safe anymore:
// Initialization code - run this once at program startup // Execution context, required to run all async operations. // This is equivalent to asio::io_context plus a thread calling run() asio::thread_pool ctx(1); // Create the connection pool. The pool is NOT thread-safe mysql::pool_params params; params.server_address.emplace_host_and_port(server_hostname); params.username = mysql_username; params.password = mysql_password; params.database = "boost_mysql_examples"; mysql::connection_pool pool(ctx, std::move(params)); // Run the pool. async_run should be executed in the thread_pool's thread - // otherwise, we have a race condition asio::dispatch(asio::bind_executor(ctx.get_executor(), [&] { pool.async_run(asio::detached); }));
If you can't use C++20, you can still use asio::spawn
or
imitate the behavior of asio::use_future
with callbacks. This is what the latter could look like:
// Gets an employee's name given their ID, using a connection pool. This is a sync function. std::optional<employee> get_employee_by_id(mysql::connection_pool& pool, std::int64_t id) { using namespace std::chrono_literals; // A promise, so we can wait for the task to complete std::promise<std::optional<employee>> prom; // These temporary variables should be kept alive until all async operations // complete, so they're declared here mysql::pooled_connection conn; mysql::static_results<employee> r; // Ensure that everything runs within the thread pool asio::dispatch(asio::bind_executor(pool.get_executor(), [&] { // Get a connection from the pool pool.async_get_connection(asio::cancel_after( 30s, [&](boost::system::error_code ec, mysql::pooled_connection temp_conn) { if (ec) { // If there was an error getting the connection, complete the promise and return prom.set_exception(std::make_exception_ptr(boost::system::system_error(ec))); } else { // Store the connection somewhere. If it gets destroyed, it's returned to the pool conn = std::move(temp_conn); // Start executing the query conn->async_execute( mysql::with_params("SELECT * FROM employee WHERE id = {}", id), r, asio::cancel_after( 30s, [&](boost::system::error_code ec) { if (ec) { // If there was an error, complete the promise and return prom.set_exception(std::make_exception_ptr(boost::system::system_error(ec) )); } else { // Done prom.set_value( r.rows().empty() ? std::optional<employee>() : r.rows()[0] ); } } ) ); } } )); })); return prom.get_future().get(); }
It's not as clean, but the idea remains the same.