...one of the most highly regarded and expertly designed C++ library projects in the world.
— Herb Sutter and Andrei Alexandrescu, C++ Coding Standards
We may not be running in an environment in which we can guarantee no exception will be thrown by any of our task functions. In that case, the above implementations of wait_first_something() would be naïve: as mentioned in the section on Fiber Management, an uncaught exception in one of our task fibers would cause std::terminate() to be called.
Let's at least ensure that such an exception would propagate to the fiber awaiting the first result. We can use future<> to transport either a return value or an exception. Therefore, we will change wait_first_value()'s buffered_channel<> to hold future< T > items instead of simply T. Once we have a future<> in hand, all we need do is call future::get(), which will either return the value or rethrow the exception.
template< typename Fn, typename ... Fns >
typename std::result_of< Fn() >::type
wait_first_outcome( Fn && function, Fns && ... functions) {
    // In this case, the value we pass through the channel is actually a
    // future -- which is already ready. future can carry either a value or an
    // exception.
    typedef typename std::result_of< Fn() >::type return_t;
    typedef boost::fibers::future< return_t > future_t;
    typedef boost::fibers::buffered_channel< future_t > channel_t;
    auto chanp( std::make_shared< channel_t >( 64) );
    // launch all the relevant fibers
    wait_first_outcome_impl< return_t >( chanp,
                                         std::forward< Fn >( function),
                                         std::forward< Fns >( functions) ... );
    // retrieve the first future
    future_t future( chanp->value_pop() );
    // close the channel: no subsequent push() has to succeed
    chanp->close();
    // either return value or throw exception
    return future.get();
}
So far so good — but there's a timing issue. How should we obtain the future<> to pass to buffered_channel::push()?
We could call fibers::async(). That would certainly produce a future<> for the task function. The trouble is that it would return too quickly! We only want future<> items for completed tasks in our channel. In fact, we only want the future<> for the one that completes first. If each fiber launched by wait_first_outcome() were to push() the result of calling async(), the channel would only ever report the result of the leftmost task item — not the one that completes most quickly.
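To make that concrete, here is a minimal sketch of the flawed approach; the helper name push_unready_future_impl() is our own invention, not part of the library, and it assumes the same channel setup as wait_first_outcome() above.
#include <boost/fiber/all.hpp>
#include <utility>

// Hypothetical illustration of the flawed approach: push async()'s future as
// soon as each task is launched. No helper fiber is even needed here, but the
// pushed future is not yet ready.
template< typename CHANP, typename Fn >
void push_unready_future_impl( CHANP chan, Fn && function) {
    // async() launches the task on its own fiber and returns immediately
    chan->push( boost::fibers::async( std::forward< Fn >( function) ) );
    // value_pop() at the consumer end now yields whichever future was pushed
    // first (the leftmost task's), and get() on it blocks until THAT task
    // finishes -- not until the first task completes.
}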
Calling future::get() on the future returned by async() wouldn't be right. You can only call get() once per future<> instance! And if there were an exception, it would be rethrown inside the helper fiber at the producer end of the channel, rather than propagated to the consumer end.
We could call future::wait(). That would block the helper fiber until the future<> became ready, at which point we could push() it to be retrieved by wait_first_outcome().
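A sketch of that wait()-based variant might look like the following; the helper name push_ready_future_impl() is hypothetical, and note that async() itself spawns the extra fiber mentioned in the next paragraph.
#include <boost/fiber/all.hpp>
#include <functional>
#include <type_traits>
#include <utility>

// Hypothetical sketch of the wait()-based alternative: launch the task with
// async(), block this helper fiber until its future is ready, then push the
// now-ready future to the channel.
template< typename T, typename CHANP, typename Fn >
void push_ready_future_impl( CHANP chan, Fn && function) {
    boost::fibers::fiber(
        std::bind(
            []( CHANP & chan, typename std::decay< Fn >::type & function) {
                // async() launches function() on yet another fiber
                boost::fibers::future< T > f( boost::fibers::async( std::move( function) ) );
                // block until function() has completed
                f.wait();
                // push a future that is guaranteed to be ready
                chan->push( std::move( f) );
            },
            chan,
            std::forward< Fn >( function) )).detach();
}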
That would work — but there's a simpler tactic that avoids creating an extra fiber. We can wrap the task function in a packaged_task<>.
While one naturally thinks of passing a packaged_task<> to a new fiber — that is, in fact, what async() does — in this case, we're already running in the helper fiber at the producer end of the channel! We can simply call the packaged_task<>. On return from that call, the task function has completed, meaning that the future<> obtained from the packaged_task<> is certain to be ready. At that point we can simply push() it to the channel.
template< typename T, typename CHANP, typename Fn >
void wait_first_outcome_impl( CHANP chan, Fn && function) {
    boost::fibers::fiber(
        // Use std::bind() here for C++11 compatibility. C++11 lambda capture
        // can't move a move-only Fn type, but bind() can. Let bind() move the
        // channel pointer and the function into the bound object, passing
        // references into the lambda.
        std::bind(
            []( CHANP & chan,
                typename std::decay< Fn >::type & function) {
                // Instantiate a packaged_task to capture any exception thrown by
                // function.
                boost::fibers::packaged_task< T() > task( function);
                // Immediately run this packaged_task on same fiber. We want
                // function() to have completed BEFORE we push the future.
                task();
                // Pass the corresponding future to consumer. Ignore
                // channel_op_status returned by push(): might be closed; we
                // simply don't care.
                chan->push( task.get_future() );
            },
            chan,
            std::forward< Fn >( function)
        )).detach();
}
Calling it might look like this:
std::string result = wait_first_outcome(
        [](){ return sleeper("wfos_first",   50); },
        [](){ return sleeper("wfos_second", 100); },
        [](){ return sleeper("wfos_third",  150); });
std::cout << "wait_first_outcome(success) => " << result << std::endl;
assert(result == "wfos_first");

std::string thrown;
try {
    result = wait_first_outcome(
            [](){ return sleeper("wfof_first",   50, true); },
            [](){ return sleeper("wfof_second", 100); },
            [](){ return sleeper("wfof_third",  150); });
} catch ( std::exception const& e) {
    thrown = e.what();
}
std::cout << "wait_first_outcome(fail) threw '" << thrown
          << "'" << std::endl;
assert(thrown == "wfof_first");
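sleeper() is the example task function defined earlier in this documentation; purely for illustration, a minimal stand-in consistent with the way it is called above (sleep on the current fiber, then either return its tag or throw it) could be:
#include <boost/fiber/operations.hpp>
#include <chrono>
#include <stdexcept>
#include <string>

// Minimal stand-in for the documentation's sleeper() helper, written only to
// match the calls above: sleep for ms milliseconds on the current fiber, then
// either return the tag or throw it as a std::runtime_error.
inline std::string sleeper( std::string const& tag, int ms, bool thrown = false) {
    boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) );
    if ( thrown) {
        throw std::runtime_error( tag);
    }
    return tag;
}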