Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.

libs/fiber/examples/asio/round_robin.hpp

//          Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H
#define BOOST_FIBERS_ASIO_ROUND_ROBIN_H

#include <chrono>
#include <cstddef>
#include <mutex>
#include <queue>

#include <boost/asio.hpp>
#include <boost/assert.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/config.hpp>

#include <boost/fiber/condition_variable.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/scheduler.hpp>

#include "yield.hpp"

#ifdef BOOST_HAS_ABI_HEADERS
#  include BOOST_ABI_PREFIX
#endif

namespace boost {
namespace fibers {
namespace asio {

class round_robin : public algo::algorithm {
private:
    typedef scheduler::ready_queue_t rqueue_t;

//[asio_rr_suspend_timer
    boost::asio::io_service                     &   io_svc_;
    boost::asio::steady_timer                       suspend_timer_;
//]
    rqueue_t                                        rqueue_{};

public:
//[asio_rr_service_top
    struct service : public boost::asio::io_service::service {
        static boost::asio::io_service::id                  id;

        std::unique_ptr< boost::asio::io_service::work >    work_;

        service( boost::asio::io_service & io_svc) :
            boost::asio::io_service::service( io_svc),
            work_{ new boost::asio::io_service::work( io_svc) } {
            io_svc.post([&io_svc](){
//]
//[asio_rr_service_lambda
                while ( ! io_svc.stopped() ) {
                    if ( boost::fibers::has_ready_fibers() ) {
                        // run all pending handlers in round_robin
                        while ( io_svc.poll() );
                        // run pending (ready) fibers
                        this_fiber::yield();
                    } else {
                        // run one handler inside io_service
                        // if no handler available, block this thread
                        if ( ! io_svc.run_one() ) {
                            break;
                        }
                    }
                }
//]
//[asio_rr_service_bottom
            });
        }

        virtual ~service() {}

        service( service const&) = delete;
        service & operator=( service const&) = delete;

        void shutdown_service() override final {
            work_.reset();
        }
    };
//]

//[asio_rr_ctor
    round_robin( boost::asio::io_service & io_svc) :
        io_svc_( io_svc),
        suspend_timer_( io_svc_) {
        // We use add_service() very deliberately. This will throw
        // service_already_exists if you pass the same io_service instance to
        // more than one round_robin instance.
        boost::asio::add_service( io_svc_, new service( io_svc_));
    }
//]

    void awakened( context * ctx) noexcept {
        BOOST_ASSERT( nullptr != ctx);
        ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/
    }

    context * pick_next() noexcept {
        context * ctx( nullptr);
        if ( ! rqueue_.empty() ) { /*<
            pop an item from the ready queue
        >*/
            ctx = & rqueue_.front();
            rqueue_.pop_front();
            BOOST_ASSERT( nullptr != ctx);
            BOOST_ASSERT( context::active() != ctx);
        }
        return ctx;
    }

    bool has_ready_fibers() const noexcept {
        return ! rqueue_.empty();
    }

//[asio_rr_suspend_until
    void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
        // Set a timer so at least one handler will eventually fire, causing
        // run_one() to eventually return. Set a timer even if abs_time ==
        // time_point::max() so the timer can be canceled by our notify()
        // method -- which calls the handler.
        if ( suspend_timer_.expires_at() != abs_time) {
            // Each expires_at(time_point) call cancels any previous pending
            // call. We could inadvertently spin like this:
            // dispatcher calls suspend_until() with earliest wake time
            // suspend_until() sets suspend_timer_
            // lambda loop calls run_one()
            // some other asio handler runs before timer expires
            // run_one() returns to lambda loop
            // lambda loop yields to dispatcher
            // dispatcher finds no ready fibers
            // dispatcher calls suspend_until() with SAME wake time
            // suspend_until() sets suspend_timer_ to same time, canceling
            // previous async_wait()
            // lambda loop calls run_one()
            // asio calls suspend_timer_ handler with operation_aborted
            // run_one() returns to lambda loop... etc. etc.
            // So only actually set the timer when we're passed a DIFFERENT
            // abs_time value.
            suspend_timer_.expires_at( abs_time);
            // It really doesn't matter what the suspend_timer_ handler does,
            // or even whether it's called because the timer ran out or was
            // canceled. The whole point is to cause the run_one() call to
            // return. So just pass a no-op lambda with proper signature.
            suspend_timer_.async_wait([](boost::system::error_code const&){});
        }
    }
//]

//[asio_rr_notify
    void notify() noexcept {
        // Something has happened that should wake one or more fibers BEFORE
        // suspend_timer_ expires. Reset the timer to cause it to fire
        // immediately, causing the run_one() call to return. In theory we
        // could use cancel() because we don't care whether suspend_timer_'s
        // handler is called with operation_aborted or success. However --
        // cancel() doesn't change the expiration time, and we use
        // suspend_timer_'s expiration time to decide whether it's already
        // set. If suspend_until() set some specific wake time, then notify()
        // canceled it, then suspend_until() was called again with the same
        // wake time, it would match suspend_timer_'s expiration time and we'd
        // refrain from setting the timer. So instead of simply calling
        // cancel(), reset the timer, which cancels the pending sleep AND sets
        // a new expiration time. This will cause us to spin the loop twice --
        // once for the operation_aborted handler, once for timer expiration
        // -- but that shouldn't be a big problem.
        suspend_timer_.expires_at( std::chrono::steady_clock::now() );
    }
//]
};

boost::asio::io_service::id round_robin::service::id;

}}}

#ifdef BOOST_HAS_ABI_HEADERS
#  include BOOST_ABI_SUFFIX
#endif

#endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H