As soon as we want to collect return values from all the task functions, we can see right away how to reuse wait_first_value()'s buffered_channel<T> for the purpose. All we have to do is avoid closing it after the first value!
But in fact, collecting multiple values raises an interesting question: do we really want to wait until the slowest of them has arrived? Wouldn't we rather process each result as soon as it becomes available?
Fortunately we can present both APIs. Let's define wait_all_values_source() to return shared_ptr<buffered_channel<T>>.

Given wait_all_values_source(), it's straightforward to implement wait_all_values():

    template< typename Fn, typename ... Fns >
    std::vector< typename std::result_of< Fn() >::type >
    wait_all_values( Fn && function, Fns && ... functions) {
        std::size_t count( 1 + sizeof ... ( functions) );
        typedef typename std::result_of< Fn() >::type return_t;
        typedef std::vector< return_t > vector_t;
        vector_t results;
        results.reserve( count);

        // get channel
        std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
            wait_all_values_source( std::forward< Fn >( function),
                                    std::forward< Fns >( functions) ... );
        // fill results vector
        return_t value;
        while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
            results.push_back( value);
        }
        // return vector to caller
        return results;
    }

It might be called like this:

    std::vector< std::string > values =
        wait_all_values(
                [](){ return sleeper("wav_late", 150); },
                [](){ return sleeper("wav_middle", 100); },
                [](){ return sleeper("wav_early", 50); });

As you can see from the loop in wait_all_values(), instead of requiring its caller to count values, we define wait_all_values_source() to buffered_channel::close() the channel when done. But how do we do that? Each producer fiber is independent. It has no idea whether it is the last one to buffered_channel::push() a value.
We can address that problem with a counting façade for the buffered_channel<>. In fact, our façade need only support the producer end of the channel.
[wait_nqueue]
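A counting façade of this kind might look like the following minimal sketch. It is not the library's own listing: toy_channel is a hypothetical stand-in for boost::fibers::buffered_channel<T> so that the example builds without Boost, and counting_facade is an illustrative name. The plain counter assumes that all producers run on a single thread, which is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>

// Hypothetical stand-in for boost::fibers::buffered_channel<T>: just enough
// push()/close() surface to exercise the facade without Boost.
template< typename T >
class toy_channel {
public:
    // Returns false once the channel has been closed.
    bool push( T value) {
        if ( closed_) {
            return false;
        }
        items_.push_back( std::move( value) );
        return true;
    }
    void close() { closed_ = true; }
    bool closed() const { return closed_; }
    std::size_t size() const { return items_.size(); }

private:
    std::deque< T > items_;
    bool closed_ = false;
};

// Producer-side counting facade (illustrative name): forwards push() and
// closes the underlying channel after 'limit' successful pushes, so the
// consumer can simply read until 'closed'.
template< typename T, typename Channel = toy_channel< T > >
class counting_facade {
public:
    counting_facade( std::shared_ptr< Channel > chan, std::size_t limit):
        chan_( std::move( chan) ),
        limit_( limit) {
        assert( chan_);
        if ( 0 == limit_) {
            chan_->close();   // nothing will ever be pushed
        }
    }

    bool push( T value) {
        bool ok = chan_->push( std::move( value) );
        if ( ok && 0 == --limit_) {
            chan_->close();   // that was the last expected value
        }
        return ok;
    }

private:
    std::shared_ptr< Channel > chan_;
    std::size_t limit_;
};
```

Only push() is exposed, matching the observation that the façade need only support the producer end. The consumer keeps using the underlying channel directly.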
Armed with nchannel<>, we can implement wait_all_values_source(). It starts just like wait_first_value(). The difference is that we wrap the buffered_channel<T> with an nchannel<T> to pass to the producer fibers.

Then, of course, instead of popping the first value, closing the channel and returning it, we simply return the shared_ptr<buffered_channel<T>>.

    // Return a shared_ptr<buffered_channel<T>> from which the caller can
    // retrieve each new result as it arrives, until 'closed'.
    template< typename Fn, typename ... Fns >
    std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
    wait_all_values_source( Fn && function, Fns && ... functions) {
        std::size_t count( 1 + sizeof ... ( functions) );
        typedef typename std::result_of< Fn() >::type return_t;
        typedef boost::fibers::buffered_channel< return_t > channel_t;
        // make the channel
        auto chanp( std::make_shared< channel_t >( 64) );
        // and make an nchannel facade to close it after 'count' items
        auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
        // pass that nchannel facade to all the relevant fibers
        wait_all_values_impl< return_t >( ncp,
                                          std::forward< Fn >( function),
                                          std::forward< Fns >( functions) ... );
        // then return the channel for consumer
        return chanp;
    }

For example:

    std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
        wait_all_values_source(
                [](){ return sleeper("wavs_third", 150); },
                [](){ return sleeper("wavs_second", 100); },
                [](){ return sleeper("wavs_first", 50); });
    std::string value;
    while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
        std::cout << "wait_all_values_source() => '" << value
                  << "'" << std::endl;
    }

wait_all_values_impl() really is just like wait_first_value_impl() except for the use of nchannel<T> rather than buffered_channel<T>:

    template< typename T, typename Fn >
    void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
                               Fn && function) {
        boost::fibers::fiber( [chan, function](){
                                  chan->push(function());
                              }).detach();
    }
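
The listing above covers a single task function, while wait_all_values_source() passes a whole pack of them. One plausible way to bridge that gap is a variadic overload that peels off the first function and recurses on the rest. The sketch below shows only that recursion pattern, under stated assumptions: toy_sink and launch_all are hypothetical names, and each task is called directly instead of on a detached fiber so that the example builds without Boost.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sink standing in for nchannel<T>; it only records pushes.
template< typename T >
struct toy_sink {
    std::vector< T > items;
    void push( T value) { items.push_back( std::move( value) ); }
};

// Base case: one task. The documented version launches a detached fiber
// here; this sketch calls the task directly (an assumption made to avoid
// depending on Boost).
template< typename T, typename Fn >
void launch_all( std::shared_ptr< toy_sink< T > > sink, Fn && function) {
    sink->push( function() );
}

// Recursive case: handle the first task, then recurse on the remaining ones.
// With a single remaining task, overload resolution prefers the base case.
template< typename T, typename Fn, typename ... Fns >
void launch_all( std::shared_ptr< toy_sink< T > > sink,
                 Fn && function, Fns && ... functions) {
    launch_all< T >( sink, std::forward< Fn >( function) );
    launch_all< T >( sink, std::forward< Fns >( functions) ... );
}
```

Because the tasks run synchronously here, results land in argument order. With real fibers they would arrive in completion order instead.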