Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

PrevUpHomeNext

(Experimental) Pipelines

Functions like execute, prepare_statement and their async counterparts are half-duplex: they write a single request to the server and wait for its response. In contrast, pipelines can increase efficiency by coalescing several requests into a single message, saving round-trips to the server.

[Warning] Warning

The MySQL client/server protocol doesn't have explicit support for pipelines. From the server's point of view, a pipeline is just a sequence of unrelated requests. The server will try to execute all stages in each pipeline, regardless of the result of previous stages. Pipelines are considered an advanced feature. Please read the pitfalls section for more info.

[Note] Note

This feature is experimental. Its API may change in subsequent releases.

Use cases

You should use pipelines for lightweight operations, dominated by round-trip time. Typical examples include:

You should avoid pipelines for the following cases:

If you're not sure, don't use this feature.

Pipeline requests and responses

To run a pipeline, create a pipeline_request object describing what should the pipeline do:

// Create a pipeline request and add three stages to it.
// When run, this pipeline will set the connection's character set to utf8mb4
// and prepare two statements.
pipeline_request req;
req.add_set_character_set(utf8mb4_charset)
    .add_prepare_statement("INSERT INTO audit_log (t, msg) VALUES (?, ?)")
    .add_prepare_statement("INSERT INTO employee (company_id, first_name, last_name) VALUES (?, ?, ?)");

We're using add_execute and add_prepare_statement to add stages to our pipeline. You can find all available stage types in the reference section.

To actually run the pipeline, create a response object and call any_connection::run_pipeline or async_run_pipeline:

// Run the pipeline request req, and store responses into res
// stage_response is a variant-like type that can store the response
// of any stage type (including results and statements).
std::vector<stage_response> res;
conn.run_pipeline(req, res);

Finally, you can access the statements using stage_response::as_statement:

// The 2nd and 3rd stages were statement preparation requests,
// so res[1] and res[2] contain statement objects
statement stmt1 = res[1].as_statement();
statement stmt2 = res[2].as_statement();

If your pipeline contains an execution stage, it will generate a results object that can be accessed using stage_response::as_results.

Error handling

If any of the pipeline stages result in an error, the entire run_pipeline operation is considered failed. This means that if run_pipipeline completed successfully, all stages succeeded. Recall that all stages are always run, regardless of the outcome of previous stages.

If run_pipipeline fails, you can check which stages succeeded and failed by inspecting responses. stage_response::error and stage_response::diag will return error information about failed steps. For instance:

// The second step in the pipeline will fail, the other ones will succeeded
pipeline_request req;
req.add_set_character_set(utf8mb4_charset)
    .add_prepare_statement("INSERT INTO bad_table (t, msg) VALUES (?, ?)")  // will fail
    .add_prepare_statement("INSERT INTO employee (company_id, first_name, last_name) VALUES (?, ?, ?)");

std::vector<stage_response> res;
error_code ec;
diagnostics diag;

conn.run_pipeline(req, res, ec, diag);

// The overall operation failed
BOOST_TEST(ec == common_server_errc::er_no_such_table);

// You can check which stages failed using .error()
BOOST_TEST(res[0].error() == error_code());
BOOST_TEST(res[1].error() == common_server_errc::er_no_such_table);
BOOST_TEST(res[2].error() == error_code());

Potential pitfalls

All requests in the pipeline are always run, regardless of the outcome of previous requests. As a result, some pipelines can behave non-intuitively:

// This doesn't behave correctly - DO NOT DO THIS
// The first INSERT will fail due to a failed foreign key check (there is no such company),
// but COMMIT will still be run, thus leaving us with an inconsistent data model
pipeline_request req;

req.add_execute("START TRANSACTION")
    .add_execute(
        "INSERT INTO employee (first_name, last_name, company_id) VALUES ('John', 'Doe', 'bad')"
    )
    .add_execute("INSERT INTO logs VALUES ('Inserted 1 employee')")
    .add_execute("COMMIT");

Pipelines aren't the best fit here. Instead, you can express the same logic using semicolon-separated queries:

const char* sql =
    "START TRANSACTION;"
    "INSERT INTO employee (first_name, last_name, company_id) VALUES ('John', 'Doe', 'bad');"
    "INSERT INTO logs VALUES ('Inserted 1 employee');"
    "COMMIT";

// After the first INSERT fails, nothing else will be run. This is what we want.
// Note that you need to enable multi queries when connecting to be able to run this.
results r;
conn.execute(sql, r);

Pipeline stages are run sequentially by the server. If any of the stages involves a heavyweight query, the server won't process subsequent stages until the query completes.

Pipeline stage reference

In the table below, the following variables are assumed:

Stage type

Example

When run, equivalent to...

Response type

Execute: behaves like any_connection::execute

pipeline_request::add_execute
pipeline_request::add_execute_range

// Text query
req.add_execute("SELECT 1");

// Prepared statement, with number of parameters known at compile time
req.add_execute(stmt, "John", "Doe", 42);

// Prepared statement, with number of parameters unknown at compile time
std::vector<field_view> params{
    /* ... */
};
req.add_execute_range(stmt, params);
// Text query
conn.execute("SELECT 1", result);

// Prepared statement, with number of parameters known at compile time
conn.execute(stmt.bind("John", "Doe", 42), result);

// Prepared statement, with number of parameters unknown at compile time
std::vector<field_view> params{
    /* ... */
};
conn.execute(stmt.bind(params.begin(), params.end()), result);

results or an error

Prepare statement: behaves like any_connection::prepare_statement

pipeline_request::add_prepare_statement

req.add_prepare_statement("SELECT * FROM employee WHERE id = ?");
statement stmt = conn.prepare_statement("SELECT * FROM employee WHERE id = ?");

statement or an error

Close statement: behaves like any_connection::close_statement

pipeline_request::add_close_statement

req.add_close_statement(stmt);
conn.close_statement(stmt);

Possibly empty error

Reset connection: behaves like any_connection::reset_connection

pipeline_request::add_reset_connection

req.add_reset_connection();
conn.reset_connection();

Possibly empty error

Set character set: behaves like any_connection::set_character_set

pipeline_request::add_set_character_set

req.add_set_character_set(utf8mb4_charset);
conn.set_character_set(utf8mb4_charset);

Possibly empty error


PrevUpHomeNext