Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.
PrevUpHomeNext

when_all, return values

As soon as we want to collect return values from all the task functions, we can see right away how to reuse wait_first_value()'s channel<T> for the purpose. All we have to do is avoid closing it after the first value!

But in fact, collecting multiple values raises an interesting question: do we really want to wait until the slowest of them has arrived? Wouldn't we rather process each result as soon as it becomes available?

Fortunately we can present both APIs. Let's define wait_all_values_source() to return shared_ptr<unbounded_channel<T>>.[6]

Given wait_all_values_source(), it's straightforward to implement wait_all_values():

template< typename Fn, typename ... Fns >
std::vector< typename std::result_of< Fn() >::type >
wait_all_values( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    typedef typename std::result_of< Fn() >::type return_t;
    typedef std::vector< return_t > vector_t;
    vector_t results;
    results.reserve( count);

    // get channel
    std::shared_ptr< boost::fibers::unbounded_channel< return_t > > channel =
        wait_all_values_source( std::forward< Fn >( function),
                                std::forward< Fns >( functions) ... );
    // fill results vector
    return_t value;
    while ( boost::fibers::channel_op_status::success == channel->pop(value) ) {
        results.push_back( value);
    }
    // return vector to caller
    return results;
}

It might be called like this:

std::vector< std::string > values =
    wait_all_values(
            [](){ return sleeper("wav_late",   150); },
            [](){ return sleeper("wav_middle", 100); },
            [](){ return sleeper("wav_early",   50); });

As you can see from the loop in wait_all_values(), instead of requiring its caller to count values, we define wait_all_values_source() to unbounded_channel::close() the channel when done. But how do we do that? Each producer fiber is independent. It has no idea whether it is the last one to unbounded_channel::push() a value.

We can address that problem with a counting façade for the unbounded_channel<>. In fact, our façade need only support the producer end of the channel.

// Introduce a channel facade that closes the channel once a specific number
// of items has been pushed. This allows an arbitrary consumer to read until
// 'closed' without itself having to count items.
template< typename T >
class nchannel {
public:
    nchannel( std::shared_ptr< boost::fibers::unbounded_channel< T > > cp,
              std::size_t lm):
        channel_( cp),
        limit_( lm) {
        assert(channel_);
        if ( 0 == limit_) {
            channel_->close();
        }
    }

    boost::fibers::channel_op_status push( T && va) {
        boost::fibers::channel_op_status ok =
            channel_->push( std::forward< T >( va) );
        if ( ok == boost::fibers::channel_op_status::success &&
             --limit_ == 0) {
            // after the 'limit_'th successful push, close the channel
            channel_->close();
        }
        return ok;
    }

private:
    std::shared_ptr< boost::fibers::unbounded_channel< T > >    channel_;
    std::size_t                                                 limit_;
};

Armed with nchannel<>, we can implement wait_all_values_source(). It starts just like wait_first_value(). The difference is that we wrap the unbounded_channel<T> with an nchannel<T> to pass to the producer fibers.

Then, of course, instead of popping the first value, closing the channel and returning it, we simply return the shared_ptr<unbounded_channel<T>>.

// Return a shared_ptr<unbounded_channel<T>> from which the caller can
// retrieve each new result as it arrives, until 'closed'.
template< typename Fn, typename ... Fns >
std::shared_ptr< boost::fibers::unbounded_channel< typename std::result_of< Fn() >::type > >
wait_all_values_source( Fn && function, Fns && ... functions) {
    std::size_t count( 1 + sizeof ... ( functions) );
    typedef typename std::result_of< Fn() >::type return_t;
    typedef boost::fibers::unbounded_channel< return_t > channel_t;
    // make the channel
    auto channelp( std::make_shared< channel_t >() );
    // and make an nchannel facade to close it after 'count' items
    auto ncp( std::make_shared< nchannel< return_t > >( channelp, count) );
    // pass that nchannel facade to all the relevant fibers
    wait_all_values_impl< return_t >( ncp,
                                      std::forward< Fn >( function),
                                      std::forward< Fns >( functions) ... );
    // then return the channel for consumer
    return channelp;
}

For example:

std::shared_ptr< boost::fibers::unbounded_channel< std::string > > channel =
    wait_all_values_source(
            [](){ return sleeper("wavs_third",  150); },
            [](){ return sleeper("wavs_second", 100); },
            [](){ return sleeper("wavs_first",   50); });
std::string value;
while ( boost::fibers::channel_op_status::success == channel->pop(value) ) {
    std::cout << "wait_all_values_source() => '" << value
              << "'" << std::endl;
}

wait_all_values_impl() really is just like wait_first_value_impl() except for the use of nchannel<T> rather than unbounded_channel<T>:

template< typename T, typename Fn >
void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel,
                           Fn && function) {
    boost::fibers::fiber( [channel, function](){
                              channel->push(function());
                          }).detach();
}



[6] We could have used either bounded_channel<> or unbounded_channel<>. We chose unbounded_channel<> on the assumption that its simpler semantics imply a cheaper implementation.


PrevUpHomeNext