...one of the most highly
regarded and expertly designed C++ library projects in the
world.
— Herb Sutter and Andrei
Alexandrescu, C++
Coding Standards
Naturally, just as with wait_first_outcome()
,
we can elaborate wait_all_values()
and wait_all_values_source()
by passing future<
T >
instead of plain T
.
wait_all_until_error()
pops that future< T >
and calls its future::get()
:
template< typename Fn, typename ... Fns > std::vector< typename std::result_of< Fn() >::type > wait_all_until_error( Fn && function, Fns && ... functions) { std::size_t count( 1 + sizeof ... ( functions) ); typedef typename std::result_of< Fn() >::type return_t; typedef typename boost::fibers::future< return_t > future_t; typedef std::vector< return_t > vector_t; vector_t results; results.reserve( count); // get channel std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan( wait_all_until_error_source( std::forward< Fn >( function), std::forward< Fns >( functions) ... ) ); // fill results vector future_t future; while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { results.push_back( future.get() ); } // return vector to caller return results; }
For example:
std::string thrown; try { std::vector< std::string > values = wait_all_until_error( [](){ return sleeper("waue_late", 150); }, [](){ return sleeper("waue_middle", 100, true); }, [](){ return sleeper("waue_early", 50); }); } catch ( std::exception const& e) { thrown = e.what(); } std::cout << "wait_all_until_error(fail) threw '" << thrown << "'" << std::endl;
Naturally this complicates the
API for wait_all_until_error_source()
. The caller must both retrieve a future<
T >
and call its get()
method. It would, of course, be possible to return a façade over the consumer
end of the queue that would implicitly perform the get()
and return a simple T
(or throw).
The implementation is just as you would expect. Notice, however, that we
can reuse wait_first_outcome_impl()
, passing the nqueue<T>
rather than queue<T>
.
// Return a shared_ptr<buffered_channel<future<T>>> from which the caller can // get() each new result as it arrives, until 'closed'. template< typename Fn, typename ... Fns > std::shared_ptr< boost::fibers::buffered_channel< boost::fibers::future< typename std::result_of< Fn() >::type > > > wait_all_until_error_source( Fn && function, Fns && ... functions) { std::size_t count( 1 + sizeof ... ( functions) ); typedef typename std::result_of< Fn() >::type return_t; typedef boost::fibers::future< return_t > future_t; typedef boost::fibers::buffered_channel< future_t > channel_t; // make the channel auto chanp( std::make_shared< channel_t >( 64) ); // and make an nchannel facade to close it after 'count' items auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) ); // pass that nchannel facade to all the relevant fibers wait_first_outcome_impl< return_t >( ncp, std::forward< Fn >( function), std::forward< Fns >( functions) ... ); // then return the channel for consumer return chanp; }
For example:
typedef boost::fibers::future< std::string > future_t; std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan = wait_all_until_error_source( [](){ return sleeper("wauess_third", 150); }, [](){ return sleeper("wauess_second", 100); }, [](){ return sleeper("wauess_first", 50); }); future_t future; while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { std::string value( future.get() ); std::cout << "wait_all_until_error_source(success) => '" << value << "'" << std::endl; }