One scenario for “when_any” functionality is when we're redundantly contacting some number of possibly-unreliable web services. Not only might they be slow — any one of them might produce a failure rather than the desired result.
In such a case, wait_first_outcome() isn't the right approach. If one of the services produces an error quickly, while another follows up with a real answer, we don't want to prefer the error just because it arrived first!
Given the queue< future< T > > we already constructed for wait_first_outcome(), though, we can readily recast the interface function to deliver the first successful result.
That does raise the question: what if all the task functions throw an exception? In that case the caller should be told about every failure.
The C++ Parallelism Draft Technical Specification proposes a std::exception_list exception capable of delivering a collection of std::exception_ptrs. Until that becomes universally available, let's fake up an exception_list of our own:
class exception_list : public std::runtime_error {
public:
    exception_list( std::string const& what) :
        std::runtime_error( what) {
    }

    typedef std::vector< std::exception_ptr > bundle_t;

    // N4407 proposed std::exception_list API
    typedef bundle_t::const_iterator iterator;

    std::size_t size() const noexcept {
        return bundle_.size();
    }

    iterator begin() const noexcept {
        return bundle_.begin();
    }

    iterator end() const noexcept {
        return bundle_.end();
    }

    // extension to populate
    void add( std::exception_ptr ep) {
        bundle_.push_back( ep);
    }

private:
    bundle_t bundle_;
};
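A caller that catches such an exception_list will usually want to inspect the individual failures. The following is a minimal sketch of one way to do that with standard-library facilities only. The class is repeated so the block compiles on its own, and the helpers describe() and make_sample_list(), along with the error messages, are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <stdexcept>
#include <string>
#include <vector>

// Same stand-in class as in the text, repeated so this sketch is self-contained.
class exception_list : public std::runtime_error {
public:
    explicit exception_list( std::string const& what) :
        std::runtime_error( what) {
    }

    typedef std::vector< std::exception_ptr > bundle_t;
    typedef bundle_t::const_iterator iterator;

    std::size_t size() const noexcept { return bundle_.size(); }
    iterator begin() const noexcept { return bundle_.begin(); }
    iterator end() const noexcept { return bundle_.end(); }

    void add( std::exception_ptr ep) { bundle_.push_back( ep); }

private:
    bundle_t bundle_;
};

// Hypothetical helper: rethrow each stored exception_ptr in turn and
// collect its what() text, in the order the exceptions were added.
std::vector< std::string > describe( exception_list const& list) {
    std::vector< std::string > messages;
    for ( std::exception_ptr const& ep : list) {
        try {
            std::rethrow_exception( ep);
        } catch ( std::exception const& e) {
            messages.push_back( e.what() );
        } catch ( ... ) {
            messages.push_back( "unknown exception");
        }
    }
    return messages;
}

// Hypothetical helper: build a list holding two made-up failures.
exception_list make_sample_list() {
    exception_list list( "all tasks failed");
    list.add( std::make_exception_ptr( std::runtime_error( "service A timed out") ) );
    list.add( std::make_exception_ptr( std::logic_error( "service B rejected request") ) );
    return list;
}
```

Rethrowing through std::rethrow_exception() is the standard way to recover the dynamic type behind a std::exception_ptr, which is why describe() uses a try/catch per element instead of trying to examine the pointer directly.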
Now we can build wait_first_success(), using wait_first_outcome_impl().
Instead of retrieving only the first future<> from the queue, we must now loop over future<> items. Of course we must limit that iteration! If we launch only count producer fibers, the (count+1)st buffered_channel::pop() call would block forever.
Given a ready future<>, we can distinguish failure by calling future::get_exception_ptr(). If the future<> in fact contains a result rather than an exception, get_exception_ptr() returns nullptr. In that case, we can confidently call future::get() to return that result to our caller.
If the std::exception_ptr is not nullptr, though, we collect it into our pending exception_list and loop back for the next future<> from the queue.
If we fall out of the loop (that is, if every single task fiber threw an exception), we throw the exception_list exception into which we've been collecting those std::exception_ptrs.
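The selection logic just described can be tried out in isolation, without fibers or channels. The sketch below is a standard-library-only illustration, not the library's implementation: outcome< T > is a hypothetical stand-in that mimics only the get_exception_ptr() and get() members of a ready future<>, all_failed is a hypothetical substitute for exception_list, and a std::deque takes the place of the channel.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <exception>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a ready future<T>: holds either a value or an
// exception, and mimics only get_exception_ptr() and get().
template< typename T >
class outcome {
public:
    explicit outcome( T value) : value_( std::move( value) ) {}
    explicit outcome( std::exception_ptr error) : error_( error) {}

    // nullptr when a value is stored, non-null when an exception is stored
    std::exception_ptr get_exception_ptr() const { return error_; }

    T get() {
        if ( error_) {
            std::rethrow_exception( error_);
        }
        return std::move( value_);
    }

private:
    T value_{};
    std::exception_ptr error_;
};

// Hypothetical error type standing in for exception_list.
struct all_failed : std::runtime_error {
    explicit all_failed( std::vector< std::exception_ptr > eps) :
        std::runtime_error( "every task failed"),
        errors( std::move( eps) ) {
    }
    std::vector< std::exception_ptr > errors;
};

// Examine at most 'count' outcomes in arrival order. Return the first value;
// if every examined outcome holds an exception, throw them all together.
template< typename T >
T first_success( std::deque< outcome< T > > & queue, std::size_t count) {
    std::vector< std::exception_ptr > errors;
    // A plain deque cannot block, so the empty() check plays the role that
    // the 'count' limit plays for a blocking channel.
    for ( std::size_t i = 0; i < count && ! queue.empty(); ++i) {
        outcome< T > next( std::move( queue.front() ) );
        queue.pop_front();
        std::exception_ptr error( next.get_exception_ptr() );
        if ( ! error) {
            return next.get();
        }
        errors.push_back( error);
    }
    throw all_failed( std::move( errors) );
}

// Hypothetical helper: wrap a message as a failed outcome.
outcome< std::string > failed( std::string const& what) {
    return outcome< std::string >(
        std::make_exception_ptr( std::runtime_error( what) ) );
}
```

With a queue holding a quick error followed by a real answer, first_success() skips the error and returns the answer. With a queue holding only errors, it throws all_failed carrying one std::exception_ptr per failed outcome.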
template< typename Fn, typename ... Fns >
typename std::result_of< Fn() >::type
wait_first_success( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    // In this case, the value we pass through the channel is actually a
    // future -- which is already ready. future can carry either a value or an
    // exception.
    typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
    typedef boost::fibers::future< return_t > future_t;
    typedef boost::fibers::buffered_channel< future_t > channel_t;
    auto chanp( std::make_shared< channel_t >( 64) );
    // launch all the relevant fibers
    wait_first_outcome_impl< return_t >( chanp,
                                         std::forward< Fn >( function),
                                         std::forward< Fns >( functions) ... );
    // instantiate exception_list, just in case
    exception_list exceptions("wait_first_success() produced only errors");
    // retrieve up to 'count' results -- but stop there!
    for ( std::size_t i = 0; i < count; ++i) {
        // retrieve the next future
        future_t future( chanp->value_pop() );
        // retrieve exception_ptr if any
        std::exception_ptr error( future.get_exception_ptr() );
        // if no error, then yay, return value
        if ( ! error) {
            // close the channel: no subsequent push() has to succeed
            chanp->close();
            // show caller the value we got
            return future.get();
        }

        // error is non-null: collect
        exceptions.add( error);
    }
    // We only arrive here when every passed function threw an exception.
    // Throw our collection to inform caller.
    throw exceptions;
}
A call might look like this:
std::string result = wait_first_success(
        [](){ return sleeper("wfss_first", 50, true); },
        [](){ return sleeper("wfss_second", 100); },
        [](){ return sleeper("wfss_third", 150); });
std::cout << "wait_first_success(success) => " << result << std::endl;
assert(result == "wfss_second");