Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

Click here to view the latest version of this page.
PrevUpHomeNext

Introduction

Definition

In computer science routines are defined as a sequence of operations. The execution of routines form a parent-child relationship and the child terminates always before the parent. Coroutines are a generalization of routines. The principal difference between coroutines and routines is that a coroutine enables explicit suspend and resume their progress via additional operations by preserving local state, e.g. a coroutine is a kind of continuation. A continuation is a object representing a suspended execution (registers, stack). Each coroutine has its own stack and local variables, sub-routine calls etc. In this sense coroutines are (actually) a language concept.

How it works

Functions foo() and bar() are supposed to alternate their execution (leave and enter function body).

foo_bar

If coroutines would be called such as routines, the stack would grow with every call and will never be degraded. A jump into the middle of a coroutine would not be possible, because the return address would have been on top of stack entries.

The solution is that each coroutine has its own stack and control-block (boost::contexts::fcontext_t from Boost.Context). Before the coroutine gets suspended, the non-volatile registers (including stack and instruction/program pointer) of the currently active coroutine are stored in coroutine's control-block. The registers of the newly activated coroutine must be restored from its associated control-block before it can continue with their work.

foo_bar_seq

The context switch requires no system privileges and provides cooperative multitasking on the level of language. Coroutines provide quasi parallelism. When a program is supposed to do several things at the same time, coroutines help to do this much simpler and more elegant than with only a single flow of control. Advantages can be seen particularly clearly with the use of a recursive function, such as traversal of binary trees (see example 'same fringe').

Example: asio::io_stream with std::stream

This section demonstrates how stackfull coroutines help to use standard C++ IO-streams together with IO-demultiplexer like boost::asio::io_sevice (using non-blocking IO).

int main( int argc, char * argv[])
{
    ...
    {
        boost::asio::io_service io_service;
        io_service.post(
            boost::bind(
                & server::start,
                server::create(
                    io_service, port) ) );
        io_service.run();
    }
    ...
}

server accepts connection-requests made by clients, creates for each new connection an instance of type session and invokes session::start() on it.

class server : public boost::enable_shared_from_this< server >
{
private:
    boost::asio::io_service         &   io_service_;
    boost::asio::ip::tcp::acceptor      acceptor_;

    void handle_accept_( session * new_session, boost::system::error_code const& error)
    {
        if ( ! error)
        {
            // start asynchronous read
            new_session->start();

            // start asynchronous accept
            start();
        }
    }

    server( boost::asio::io_service & io_service, short port) :
        io_service_( io_service),
        acceptor_(
            io_service_,
            boost::asio::ip::tcp::endpoint( boost::asio::ip::tcp::v4(), port) )
    {}

public:
    typedef boost::shared_ptr< server > ptr_t;

    static ptr_t create( boost::asio::io_service & io_service, short port)
    { return ptr_t( new server( io_service, port) ); }

    void start()
    {
        // create new session which gets started if asynchronous
        // accept completes
        session * new_session( new session( io_service_) );
        acceptor_.async_accept(
            new_session->socket(),
            boost::bind( & server::handle_accept_, this->shared_from_this(),
                new_session, boost::asio::placeholders::error) );
    }
};

Each session communicates with the connected client and handles the requests. The application protocol in this example uses TCP-sockets as channel and 'newline' to separate the messages in the byte stream. An application protocol is a set of rules for the order in which messages are exchanged. std::istream is used to extract the messages from the character stream . Message 'exit' terminates the session.

class session : private boost::noncopyable
{
private:
    void handle_read_( coro_t::caller_type & self)
    {
        // create stream-buffer reading from socket
        inbuf buf( socket_);
        std::istream s( & buf);

        // messages are separated by 'newline'
        std::string msg;
        std::getline( s, msg);
        std::cout << msg << std::endl;

        // terminate session for message 'exit'
        // else do asynchronous read
        if ( "exit" == msg)
            io_service_.post(
                boost::bind(
                    & session::destroy_, this) );
        else
            start();
    }

    void destroy_()
    { delete this; }

    boost::asio::io_service     &   io_service_;
    boost::asio::ip::tcp::socket    socket_;

public:
    session( boost::asio::io_service & io_service) :
        io_service_( io_service),
        socket_( io_service_)
    { std::cout << "service(): " << socket_.remote_endpoint() << std::endl; }

    ~session()
    { std::cout << "~service(): " << socket_.remote_endpoint() << std::endl; }

    boost::asio::ip::tcp::socket & socket()
    { return socket_; }

    void start()
    {
        // register on io_service for asynchronous read
        io_service_.async_read(
            socket_,
            boost::bind(
                & session::handle_read_, this->shared_from_this(), _1, _2) );
    }
};

Function std::getline() returns only if a 'newline' was read from the socket. Therefore the application will block until 'newline' is received by the socket. The stream-buffer used by the stream maintains an internal buffer which gets (re-)filled by its function stream_buf::underflow(). stream_buf::underflow() does the read-operation on the socket. The C++ IO-streams framework does not provide an easy way to create an continuation which represents reading bytes from the socket.

Coroutines help in this case to make the application non-blocking even if no 'newline' was received. Class session creates a coroutine which uses session::handle_read() as coroutine-function. On a new created session start() called starting the coroutine. In the coroutine-function session::handle_read() the messages are received via std::getline() in a loop until 'exit' is delivered.

class session : private boost::noncopyable
{
private:
    void handle_read_( coro_t::caller_type & ca)
    {
        // create stream-buffer with coroutine
        inbuf buf( socket_, coro_, ca);
        std::istream s( & buf);

        std::string msg;
        do
        {
            // read message
            // we not block if no newline was received yet
            std::getline( s, msg);
            std::cout << msg << std::endl;
        } while ( msg != "exit");
        io_service_.post(
                boost::bind(
                    & session::destroy_, this) );
    }

    void destroy_()
    { delete this; }

    coro_t                          coro_;
    boost::asio::io_service     &   io_service_;
    boost::asio::ip::tcp::socket    socket_;

public:
    session( boost::asio::io_service & io_service) :
        coro_(),
        io_service_( io_service),
        socket_( io_service_)
    { std::cout << "service(): " << socket_.remote_endpoint() << std::endl; }

    ~session()
    { std::cout << "~service(): " << socket_.remote_endpoint() << std::endl; }

    boost::asio::ip::tcp::socket & socket()
    { return socket_; }

    void start()
    {
        // create and start a coroutine
        // handle_read_() is used as coroutine-function
        coro_ = coro_t( boost::bind( & session::handle_read_, this, _1) );
    }
};

The stream-buffer is created with boost::coroutines::coroutine<>::caller_type and handles suspend/resume of this code path depending on if bytes can be read from the socket.

class inbuf : public std::streambuf,
              private boost::noncopyable
{
private:
    static const std::streamsize        pb_size;

    enum
    { bf_size = 16 };

    int fetch_()
    {
        std::streamsize num = std::min(
            static_cast< std::streamsize >( gptr() - eback() ), pb_size);

        std::memmove(
            buffer_ + ( pb_size - num),
            gptr() - num, num);

        // read bytes from the socket into internal buffer 'buffer_'
        // make coro_t::operator() as callback, invoked if some
        // bytes are read into 'buffer_'
        s_.async_read_some(
                boost::asio::buffer( buffer_ + pb_size, bf_size - pb_size),
                boost::bind( & coro_t::operator(), & coro_, _1, _2) );
        // suspend this coroutine
        ca_();

        // coroutine was resumed by boost::asio::io_sevice
        boost::system::error_code ec;
        std::size_t n = 0;

        // check arguments
        boost::tie( ec, n) = ca_.get();

        // check if an error occurred
        if ( ec)
        {
            setg( 0, 0, 0);
            return -1;
        }

        setg( buffer_ + pb_size - num, buffer_ + pb_size, buffer_ + pb_size + n);
        return n;
    }

    boost::asio::ip::tcp::socket      &   s_;
    coro_t                            &   coro_;
    coro_t::caller_type               &   ca_;
    char                                  buffer_[bf_size];

protected:
    virtual int underflow()
    {
        if ( gptr() < egptr() )
            return traits_type::to_int_type( * gptr() );

        if ( 0 > fetch_() )
            return traits_type::eof();
        else
            return traits_type::to_int_type( * gptr() );
    }

public:
    inbuf(
            boost::asio::ip::tcp::socket & s,
            coro_t & coro,
            coro_t::caller_type & ca) :
        s_( s), coro_( coro), ca_( ca), buffer_()
    { setg( buffer_ + 4, buffer_ + 4, buffer_ + 4); }
};
const std::streamsize inbuf::pb_size = 4;

inbuf::fetch() uses boost::coroutines::coroutine<>::operator() as callback for the asynchronous read-operation on the socket and suspends itself (ca_() jumps back to session::start()). If some bytes are available in the socket receive buffer boost::asio::io_sevice copies the bytes to the internal buffer buffer_ and invokes the callback which resumes the coroutine (ca_() returns).


PrevUpHomeNext