Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for a snapshot of the develop branch, built from commit 3bdf7dba54.

when_any, produce first outcome, whether result or exception

We cannot always guarantee that none of our task functions will throw an exception. In that case, the above implementations of wait_first_something() would be naïve: as mentioned in the section on Fiber Management, an uncaught exception in one of our task fibers would cause std::terminate() to be called.

Let's at least ensure that such an exception would propagate to the fiber awaiting the first result. We can use future<> to transport either a return value or an exception. Therefore, we will change wait_first_value()'s buffered_channel<> to hold future< T > items instead of simply T.

Once we have a future<> in hand, all we need do is call future::get(), which will either return the value or rethrow the exception.
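As a rough illustration of how a future transports either outcome, here is a minimal sketch that uses std::promise and std::future from the standard library as stand-ins for the Boost.Fiber types, whose interface follows the same model. The helper names make_ready_future() and describe_outcome() are hypothetical and do not come from the library.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of Boost.Fiber): build a future that is
// already ready, holding either a value or an exception.
std::future< std::string > make_ready_future( std::string const& value, bool fail) {
    std::promise< std::string > promise;
    if ( fail) {
        // store an exception in the shared state instead of a value
        promise.set_exception(
            std::make_exception_ptr( std::runtime_error( value) ) );
    } else {
        promise.set_value( value);
    }
    return promise.get_future();
}

// Hypothetical helper: call get() and describe what came out of the future.
std::string describe_outcome( std::future< std::string > future) {
    try {
        // get() returns the stored value ...
        return "value: " + future.get();
    } catch ( std::exception const& e) {
        // ... or rethrows the stored exception in the caller's context
        return std::string("exception: ") + e.what();
    }
}
```

The consumer needs no separate error path: a single call to get() yields whichever outcome the producer stored.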

template< typename Fn, typename ... Fns >
typename std::result_of< Fn() >::type
wait_first_outcome( Fn && function, Fns && ... functions) {
    // In this case, the value we pass through the channel is actually a
    // future -- which is already ready. future can carry either a value or an
    // exception.
    typedef typename std::result_of< Fn() >::type return_t;
    typedef boost::fibers::future< return_t > future_t;
    typedef boost::fibers::buffered_channel< future_t > channel_t;
    auto chanp(std::make_shared< channel_t >( 64) );
    // launch all the relevant fibers
    wait_first_outcome_impl< return_t >( chanp,
                                         std::forward< Fn >( function),
                                         std::forward< Fns >( functions) ... );
    // retrieve the first future
    future_t future( chanp->value_pop() );
    // close the channel: no subsequent push() has to succeed
    chanp->close();
    // either return value or throw exception
    return future.get();
}

So far so good, but there's a timing issue. How should we obtain the future<> to pass to buffered_channel::push()?

We could call fibers::async(). That would certainly produce a future<> for the task function. The trouble is that it would return too quickly! We only want future<> items for completed tasks on our buffered_channel<>. In fact, we only want the future<> for the one that completes first. If each fiber launched by wait_first_outcome() were to push() the result of calling async(), the channel would only ever report the result of the leftmost task item, not the one that completes most quickly.
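The "returns too quickly" problem can be seen in a small sketch. It uses std::async with threads as a stand-in for fibers::async(), so it only approximates the fiber case. The function name async_future_not_ready_yet() and the promise used as a gate are assumptions made for the demonstration.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// Hypothetical demo (std::async standing in for fibers::async()): returns
// true if the future was still pending right after async() returned.
bool async_future_not_ready_yet() {
    // The gate keeps the task from finishing until we say so, which makes
    // the observation deterministic.
    std::promise< void > gate;
    std::shared_future< void > opened = gate.get_future().share();

    std::future< int > result = std::async( std::launch::async,
        [opened]() { opened.wait(); return 7; } );

    // async() has already returned, but the task has not completed.
    bool pending = result.wait_for( std::chrono::seconds( 0) )
                   == std::future_status::timeout;

    gate.set_value();          // let the task finish
    int value = result.get();  // now the future becomes ready
    return pending && value == 7;
}
```

A future pushed at the point where pending is observed would say nothing about which task finished first.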

Calling future::get() on the future returned by async() wouldn't be right either. You can only call get() once per future<> instance! And if there were an exception, it would be rethrown inside the helper fiber at the producer end of the channel, rather than propagated to the consumer end.
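The single-use nature of get() can be checked with valid(). The following sketch uses std::future as a stand-in for the Boost.Fiber future, which follows the same contract. The validity struct and consume_once() are hypothetical names introduced only for this example.

```cpp
#include <cassert>
#include <future>

// Hypothetical record of what we observe around a single get() call.
struct validity {
    bool before;  // valid() before get()
    bool after;   // valid() after get()
    int  value;   // the value get() returned
};

// Hypothetical helper: consume a ready future exactly once.
validity consume_once() {
    std::promise< int > promise;
    std::future< int > future = promise.get_future();
    promise.set_value( 42);

    validity v;
    v.before = future.valid();  // the future still refers to shared state
    v.value  = future.get();    // get() hands over the result ...
    v.after  = future.valid();  // ... and releases the shared state
    return v;
}
```

Once the producer has called get(), the future it holds no longer has a result to hand to the consumer.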

We could call future::wait(). That would block the helper fiber until the future<> became ready, at which point we could push() it to be retrieved by wait_first_outcome().

That would work, but there's a simpler tactic that avoids creating an extra fiber. We can wrap the task function in a packaged_task<>. One naturally thinks of passing a packaged_task<> to a new fiber, and that is in fact what async() does. In this case, though, we're already running in the helper fiber at the producer end of the channel! We can simply call the packaged_task<>. On return from that call, the task function has completed, meaning that the future<> obtained from the packaged_task<> is certain to be ready. At that point we can simply push() it to the channel.
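The packaged_task<> tactic can be tried in isolation. This is a minimal sketch using std::packaged_task as a stand-in for boost::fibers::packaged_task, which offers the same call-then-get_future() usage. The helpers run_now() and is_ready() are hypothetical names, not library functions.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical helper: run fn synchronously through a packaged_task and
// return the resulting future.
template< typename Fn >
std::future< std::string > run_now( Fn fn) {
    std::packaged_task< std::string() > task( std::move( fn) );
    // Invoke on the current thread of execution. A return value or a thrown
    // exception is captured in the task's shared state.
    task();
    return task.get_future();
}

// Hypothetical helper: non-blocking readiness check.
bool is_ready( std::future< std::string > const& future) {
    return future.wait_for( std::chrono::seconds( 0) )
           == std::future_status::ready;
}
```

Whether the function returns or throws, the future is ready by the time run_now() returns, and the exception surfaces only when the consumer calls get().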

template< typename T, typename CHANP, typename Fn >
void wait_first_outcome_impl( CHANP chan, Fn && function) {
    boost::fibers::fiber(
        // Use std::bind() here for C++11 compatibility. C++11 lambda capture
        // can't move a move-only Fn type, but bind() can. Let bind() move the
        // channel pointer and the function into the bound object, passing
        // references into the lambda.
        std::bind(
            []( CHANP & chan,
                typename std::decay< Fn >::type & function) {
                // Instantiate a packaged_task to capture any exception thrown by
                // function.
                boost::fibers::packaged_task< T() > task( function);
                // Immediately run this packaged_task on same fiber. We want
                // function() to have completed BEFORE we push the future.
                task();
                // Pass the corresponding future to consumer. Ignore
                // channel_op_status returned by push(): might be closed; we
                // simply don't care.
                chan->push( task.get_future() );
            },
            chan,
            std::forward< Fn >( function)
        )).detach();
}

Calling it might look like this:

std::string result = wait_first_outcome(
        [](){ return sleeper("wfos_first",   50); },
        [](){ return sleeper("wfos_second", 100); },
        [](){ return sleeper("wfos_third",  150); });
std::cout << "wait_first_outcome(success) => " << result << std::endl;
assert(result == "wfos_first");

std::string thrown;
try {
    result = wait_first_outcome(
            [](){ return sleeper("wfof_first",   50, true); },
            [](){ return sleeper("wfof_second", 100); },
            [](){ return sleeper("wfof_third",  150); });
} catch ( std::exception const& e) {
    thrown = e.what();
}
std::cout << "wait_first_outcome(fail) threw '" << thrown
          << "'" << std::endl;
assert(thrown == "wfof_first");
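
The sleeper() helper called above is not defined in this section. A hypothetical stand-in, inferred only from the call sites and assertions shown here, might look like the sketch below. It sleeps on a thread with std::this_thread::sleep_for(), whereas code running on a fiber would presumably use boost::this_fiber::sleep_for() so that other fibers can run in the meantime.

```cpp
#include <cassert>
#include <chrono>
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical stand-in for sleeper(): wait for ms milliseconds, then either
// return item or throw an exception whose what() is item.
std::string sleeper( std::string const& item, int ms, bool thrw = false) {
    // Assumption: thread sleep used here for a self-contained example.
    std::this_thread::sleep_for( std::chrono::milliseconds( ms) );
    if ( thrw) {
        throw std::runtime_error( item);
    }
    return item;
}
```

With this shape, the failing call in the example throws an exception whose message matches the name of the task that threw it.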

