Boost C++ Libraries

Deeper Dive into Boost.Asio

By now the alert reader is thinking: but surely, with Asio in particular, we ought to be able to do much better than periodic polling pings!

This turns out to be surprisingly tricky. We present a possible approach in examples/asio/round_robin.hpp.

One consequence of using Boost.Asio is that you must always let Asio suspend the running thread. Since Asio is aware of pending I/O requests, it can arrange to suspend the thread in such a way that the OS will wake it on I/O completion. No one else has sufficient knowledge.

So the fiber scheduler must depend on Asio for suspension and resumption. It requires Asio handler calls to wake it.

One dismaying implication is that we cannot support multiple threads calling io_context::run() on the same io_context instance. The reason is that Asio provides no way to constrain a particular handler to be called only on a specified thread. A fiber scheduler instance is locked to a particular thread: that instance cannot manage any other thread’s fibers. Yet if we allow multiple threads to call io_context::run() on the same io_context instance, a fiber scheduler which needs to sleep can have no guarantee that it will reawaken in a timely manner. It can set an Asio timer, as described above, but that timer’s handler may well execute on a different thread!

Another implication is that since an Asio-aware fiber scheduler (not to mention boost::fibers::asio::yield) depends on handler calls from the io_context, it is the application’s responsibility to ensure that io_context::stop() is not called until every fiber has terminated.

It is easier to reason about the behavior of the presented asio::round_robin scheduler if we require that after initial setup, the thread’s main fiber is the fiber that calls io_context::run(), so let’s impose that requirement.

Naturally, the first thing we must do on each thread using a custom fiber scheduler is call use_scheduling_algorithm(). However, since asio::round_robin requires an io_context instance, we must first create one.

std::shared_ptr< boost::asio::io_context > io_ctx = std::make_shared< boost::asio::io_context >();
boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_ctx);

use_scheduling_algorithm() instantiates asio::round_robin, which naturally calls its constructor:

round_robin( std::shared_ptr< boost::asio::io_context > const& io_ctx) :
    io_ctx_( io_ctx),
    suspend_timer_( * io_ctx_) {
    // We use add_service() very deliberately. This will throw
    // service_already_exists if you pass the same io_context instance to
    // more than one round_robin instance.
    boost::asio::add_service( * io_ctx_, new service( * io_ctx_) );
    boost::asio::post( * io_ctx_, [this]() mutable {

asio::round_robin stores the passed io_context pointer and initializes a boost::asio::steady_timer:

std::shared_ptr< boost::asio::io_context >      io_ctx_;
boost::asio::steady_timer                       suspend_timer_;

Then it calls boost::asio::add_service() with a nested service struct:

struct service : public boost::asio::io_context::service {
    static boost::asio::io_context::id                  id;

    std::unique_ptr< boost::asio::io_context::work >    work_;

    service( boost::asio::io_context & io_ctx) :
        boost::asio::io_context::service( io_ctx),
        work_{ new boost::asio::io_context::work( io_ctx) } {
    }

    virtual ~service() {}

    service( service const&) = delete;
    service & operator=( service const&) = delete;

    void shutdown_service() override final {
        work_.reset();
    }
};

The service struct has a couple of roles.

Its foremost role is to manage a std::unique_ptr<boost::asio::io_context::work>. We want the io_context instance to continue its main loop even when there is no pending Asio I/O.

But when boost::asio::io_context::service::shutdown_service() is called, we discard the io_context::work instance so the io_context can shut down properly.
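The effect of holding and releasing a work object can be illustrated with a minimal sketch in plain standard C++. This is a toy model, not Asio: ToyContext, add_work(), remove_work(), RunResult and run_with_work() are hypothetical names invented here, and the sketch assumes only the rule that run() returns once there are no queued handlers and no outstanding work.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical toy model, NOT boost::asio::io_context.
class ToyContext {
public:
    enum class RunResult { returned, would_block };

    void post(std::function<void()> handler) {
        handlers_.push_back(std::move(handler));
    }
    void add_work() { ++outstanding_work_; }     // models creating a work object
    void remove_work() { --outstanding_work_; }  // models work_.reset()

    // Drain the queued handlers, then report what a real run() would do
    // next: return to the caller, or block waiting for more handlers.
    RunResult run() {
        while (!handlers_.empty()) {
            auto handler = std::move(handlers_.front());
            handlers_.pop_front();
            handler();
        }
        return outstanding_work_ > 0 ? RunResult::would_block
                                     : RunResult::returned;
    }

private:
    std::deque<std::function<void()>> handlers_;
    int outstanding_work_ = 0;
};

// Post one trivial handler and report how run() ends, with or without
// an outstanding work object.
ToyContext::RunResult run_with_work(bool hold_work) {
    ToyContext ctx;
    if (hold_work) { ctx.add_work(); }
    ctx.post([] {});
    return ctx.run();
}
```

In this model run_with_work(false) returns as soon as the queue is empty, while run_with_work(true) reports that the loop would keep waiting. That second behavior is the one the scheduler relies on.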

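Its other role follows from being registered with add_service(): as the constructor comment notes, passing the same io_context instance to a second round_robin instance throws service_already_exists. After registering the service, the round_robin constructor post()s a lambda (not yet shown). Let’s walk further through the example program before coming back to explain that lambda.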

The service constructor returns to asio::round_robin’s constructor, which returns to use_scheduling_algorithm(), which returns to the application code.

Once it has called use_scheduling_algorithm(), the application may now launch some number of fibers:

// server
tcp::acceptor a( * io_ctx, tcp::endpoint( tcp::v4(), 9999) );
boost::fibers::fiber( server, io_ctx, std::ref( a) ).detach();
// client
const unsigned iterations = 2;
const unsigned clients = 3;
boost::fibers::barrier b( clients);
for ( unsigned i = 0; i < clients; ++i) {
    boost::fibers::fiber(
            client, io_ctx, std::ref( a), std::ref( b), iterations).detach();
}

Since we don’t specify a launch policy, the default launch::post applies: these fibers are ready to run, but have not yet been entered.

Having set everything up, the application calls io_context::run():

io_ctx->run();

Now what?

Because this io_context instance owns an io_context::work instance, run() does not immediately return. But none of the fibers that will perform actual work has even been entered yet!

Without that initial post() call in round_robin’s constructor, nothing would happen. The application would hang right here.

So, what should the post() handler execute? Simply this_fiber::yield()?

That would be a promising start. But we have no guarantee that any of the other fibers will initiate any Asio operations to keep the ball rolling. For all we know, every other fiber could reach a similar boost::this_fiber::yield() call first. Control would return to the post() handler, which would return to Asio, and... the application would hang.

The post() handler could post() itself again. But as discussed in the previous section, once there are actual I/O operations in flight — once we reach a state in which no fiber is ready — that would cause the thread to spin.

We could, of course, set an Asio timer — again as previously discussed. But in this deeper dive, we’re trying to do a little better.

The key to doing better is that since we’re in a fiber, we can run an actual loop, not just a chain of callbacks. We can wait for something to happen by calling io_context::run_one(), or we can execute already-queued Asio handlers by calling io_context::poll().

Here’s the body of the lambda passed to the post() call.

while ( ! io_ctx_->stopped() ) {
    if ( has_ready_fibers() ) {
        // run all pending handlers in round_robin
        while ( io_ctx_->poll() );
        // block this fiber till all pending (ready) fibers are processed
        // == round_robin::suspend_until() has been called
        std::unique_lock< boost::fibers::mutex > lk( mtx_);
        cnd_.wait( lk);
    } else {
        // run one handler inside io_context
        // if no handler available, block this thread
        if ( ! io_ctx_->run_one() ) {
            break;
        }
    }
}

We want this loop to exit once the io_context instance has been stopped, that is, once stopped() returns true.

As long as there are ready fibers, we interleave running ready Asio handlers with running ready fibers.

If there are no ready fibers, we wait by calling run_one(). Once any Asio handler has been called — no matter which — run_one() returns. That handler may have transitioned some fiber to ready state, so we loop back to check again.
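The control flow just described can be traced with a small sketch in plain standard C++. It is a toy model under stated assumptions, not the scheduler itself: ToyContext, Task, drive() and demo() are hypothetical names, fibers are modelled as plain callables in a queue, and the toy run_one() returns 0 where real Asio would block the thread.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

using Task = std::function<void()>;

// Hypothetical stand-in for the handler queue of an io_context.
struct ToyContext {
    std::deque<Task> handlers;

    // Like run_one(): run at most one handler. Real Asio would block the
    // thread until a handler is available; the toy returns 0 instead.
    std::size_t run_one() {
        if (handlers.empty()) { return 0; }
        Task handler = std::move(handlers.front());
        handlers.pop_front();
        handler();
        return 1;
    }

    // Like poll(): run everything already queued, never wait.
    std::size_t poll() {
        std::size_t ran = 0;
        while (run_one() > 0) { ++ran; }
        return ran;
    }
};

// Same shape as the lambda loop: poll while fibers are ready, else run_one().
void drive(ToyContext& ctx, std::deque<Task>& ready,
           std::vector<std::string>& log) {
    for (;;) {
        if (!ready.empty()) {
            log.push_back("poll");
            while (ctx.poll() > 0) {}
            // Stands in for waiting on cnd_: every ready fiber gets a turn.
            while (!ready.empty()) {
                Task fiber = std::move(ready.front());
                ready.pop_front();
                fiber();
            }
        } else {
            if (ctx.run_one() == 0) { break; }
            log.push_back("run_one returned");
        }
    }
}

// Fiber A starts an "I/O operation" whose handler makes fiber B ready.
std::vector<std::string> demo() {
    ToyContext ctx;
    std::deque<Task> ready;
    std::vector<std::string> log;
    ready.push_back([&] {
        log.push_back("fiber A starts I/O");
        ctx.handlers.push_back([&] {
            log.push_back("handler wakes fiber B");
            ready.push_back([&] { log.push_back("fiber B runs"); });
        });
    });
    drive(ctx, ready, log);
    return log;
}
```

The log returned by demo() shows the loop going back to the ready-fiber check only after run_one() has run a handler, which is the point at which a fiber may have become ready.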

(We won’t describe awakened(), pick_next() or has_ready_fibers(), as these are just like their counterparts in the default scheduler, boost::fibers::algo::round_robin.)

That leaves suspend_until() and notify().

Doubtless you have been asking yourself: why are we calling io_context::run_one() in the lambda loop? Why not call it in suspend_until(), whose very API was designed for just such a purpose?

Under normal circumstances, when the fiber manager finds no ready fibers, it calls algorithm::suspend_until(). Why test has_ready_fibers() in the lambda loop? Why not leverage the normal mechanism?

The answer is: it matters who’s asking.

Consider the lambda loop shown above. The only Boost.Fiber APIs it engages are has_ready_fibers() and this_fiber::yield(). yield() does not block the calling fiber: the calling fiber does not become unready. It is immediately passed back to algorithm::awakened(), to be resumed in its turn when all other ready fibers have had a chance to run. In other words: during a yield() call, there is always at least one ready fiber.

As long as this lambda loop is still running, the fiber manager does not call suspend_until() because it always has a fiber ready to run.

However, the lambda loop itself can detect the case when no other fibers are ready to run: has_ready_fibers() does not count the running fiber, which is running rather than ready.

That said, suspend_until() and notify() are in fact called during orderly shutdown processing, so let’s try a plausible implementation.

void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
    // Set a timer so at least one handler will eventually fire, causing
    // run_one() to eventually return.
    if ( (std::chrono::steady_clock::time_point::max)() != abs_time) {
        // Each expires_at(time_point) call cancels any previous pending
        // call. We could inadvertently spin like this:
        // dispatcher calls suspend_until() with earliest wake time
        // suspend_until() sets suspend_timer_
        // lambda loop calls run_one()
        // some other asio handler runs before timer expires
        // run_one() returns to lambda loop
        // lambda loop yields to dispatcher
        // dispatcher finds no ready fibers
        // dispatcher calls suspend_until() with SAME wake time
        // suspend_until() sets suspend_timer_ to same time, canceling
        // previous async_wait()
        // lambda loop calls run_one()
        // asio calls suspend_timer_ handler with operation_aborted
        // run_one() returns to lambda loop... etc. etc.
        // So only actually set the timer when we're passed a DIFFERENT
        // abs_time value.
        suspend_timer_.expires_at( abs_time);
        suspend_timer_.async_wait([](boost::system::error_code const&){
                                    this_fiber::yield();
                                  });
    }
    cnd_.notify_one();
}

As you might expect, suspend_until() sets an asio::steady_timer to expires_at() the passed std::chrono::steady_clock::time_point. The exception is time_point::max(), which means there is no deadline, so no timer is set.

As indicated in comments, the aim is to avoid setting suspend_timer_ multiple times to the same time_point value, since every expires_at() call cancels any previous async_wait() call and there is a chance that we could spin. Reaching suspend_until() means the fiber manager intends to yield the processor to Asio. Cancelling the previous async_wait() call would fire its handler, causing run_one() to return, potentially causing the fiber manager to call suspend_until() again with the same time_point value...
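The spin scenario from the comments can be counted with a minimal sketch in plain standard C++. ToyTimer and aborted_waits() are hypothetical names invented for illustration; ToyTimer is not boost::asio::steady_timer and models only one assumption taken from the text, namely that expires_at() aborts a pending wait before storing the new expiry.

```cpp
#include <cassert>
#include <chrono>

using time_point = std::chrono::steady_clock::time_point;

// Hypothetical stand-in for boost::asio::steady_timer that only keeps counts.
class ToyTimer {
public:
    time_point expiry() const { return expiry_; }

    // Like expires_at(t): a pending wait is aborted, then t is stored.
    void expires_at(time_point t) {
        if (waiting_) {
            ++aborted_;
            waiting_ = false;
        }
        expiry_ = t;
    }
    void async_wait() { waiting_ = true; }
    int aborted() const { return aborted_; }

private:
    time_point expiry_{};
    bool waiting_ = false;
    int aborted_ = 0;
};

// Ask for the same wake time three times and count the aborted waits,
// with or without the "only when DIFFERENT" check from the comments.
int aborted_waits(bool skip_same_deadline) {
    ToyTimer timer;
    const time_point wake = time_point{} + std::chrono::seconds(5);
    for (int call = 0; call < 3; ++call) {
        if (skip_same_deadline && timer.expiry() == wake) { continue; }
        timer.expires_at(wake);
        timer.async_wait();
    }
    return timer.aborted();
}
```

Each aborted wait in the model corresponds to one handler invocation with operation_aborted, and therefore to one extra trip around the lambda loop.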

Given that we suspend the thread by calling io_context::run_one(), what’s important is that our async_wait() call will cause a handler to run, which will cause run_one() to return. It’s not so important specifically what that handler does.

void notify() noexcept {
    // Something has happened that should wake one or more fibers BEFORE
    // suspend_timer_ expires. Reset the timer to cause it to fire
    // immediately, causing the run_one() call to return. In theory we
    // could use cancel() because we don't care whether suspend_timer_'s
    // handler is called with operation_aborted or success. However --
    // cancel() doesn't change the expiration time, and we use
    // suspend_timer_'s expiration time to decide whether it's already
    // set. If suspend_until() set some specific wake time, then notify()
    // canceled it, then suspend_until() was called again with the same
    // wake time, it would match suspend_timer_'s expiration time and we'd
    // refrain from setting the timer. So instead of simply calling
    // cancel(), reset the timer, which cancels the pending sleep AND sets
    // a new expiration time. This will cause us to spin the loop twice --
    // once for the operation_aborted handler, once for timer expiration
    // -- but that shouldn't be a big problem.
    suspend_timer_.async_wait([](boost::system::error_code const&){
                                this_fiber::yield();
                              });
    suspend_timer_.expires_at( std::chrono::steady_clock::now() );
}

Since an expires_at() call cancels any previous async_wait() call, we can make notify() simply call steady_timer::expires_at(). That should cause the io_context to call the async_wait() handler with operation_aborted.

The comments in notify() explain why we call expires_at() rather than cancel().
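That reasoning can be checked with a minimal sketch in plain standard C++. It assumes the expiry-time check that the comments describe, and every name in it (ToyTimer, guarded_suspend_until(), wait_pending_after_notify()) is hypothetical; ToyTimer is not boost::asio::steady_timer, and a fixed time_point stands in for steady_clock::now() so the result is deterministic.

```cpp
#include <cassert>
#include <chrono>

using time_point = std::chrono::steady_clock::time_point;

// Hypothetical stand-in for boost::asio::steady_timer.
struct ToyTimer {
    time_point expiry{};
    bool waiting = false;

    void async_wait() { waiting = true; }
    void cancel() { waiting = false; }  // expiry is left unchanged
    void expires_at(time_point t) {     // aborts the wait AND moves expiry
        waiting = false;
        expiry = t;
    }
};

// suspend_until() as the comments describe it: skip an unchanged deadline.
void guarded_suspend_until(ToyTimer& timer, time_point abs_time) {
    if (timer.expiry == abs_time) { return; }
    timer.expires_at(abs_time);
    timer.async_wait();
}

// Arm the timer, notify, then ask for the same wake time again.
// Returns whether a wait is pending afterwards.
bool wait_pending_after_notify(bool notify_with_cancel) {
    ToyTimer timer;
    const time_point wake = time_point{} + std::chrono::seconds(5);
    // Fixed stand-in for std::chrono::steady_clock::now().
    const time_point now = time_point{} + std::chrono::seconds(1);

    guarded_suspend_until(timer, wake);
    if (notify_with_cancel) {
        timer.cancel();
    } else {
        timer.expires_at(now);
    }
    guarded_suspend_until(timer, wake);
    return timer.waiting;
}
```

With cancel() the stored expiry still equals the requested wake time, so the second request is skipped and no wait is pending. With expires_at() the expiry has moved, so the timer is armed again.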

This boost::fibers::asio::round_robin implementation is used in examples/asio/autoecho.cpp.

It seems possible that you could put together a more elegant Fiber / Asio integration. But as noted at the outset: it’s tricky.

