Boost C++ Libraries

This is the documentation for an old version of Boost.

Introduction

Definition

In computer science, routines are defined as sequences of operations. The execution of routines forms a parent-child relationship, and the child always terminates before the parent. Coroutines are a generalization of routines. The principal difference between coroutines and routines is that a coroutine can explicitly suspend and resume its progress via additional operations while preserving local state; that is, a coroutine is a kind of continuation. A continuation is an object representing a suspended execution (registers, stack). Each coroutine has its own stack, local variables, sub-routine calls, etc. In this sense coroutines are (actually) a language concept.
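To make "suspend and resume while preserving local state" concrete, here is a minimal sketch that emulates the idea by hand in plain C++, without Boost.Coroutine. The names resumable_range and collect() are hypothetical. The "local state" lives in data members rather than on a separate stack, so this is only an analogy for what a real coroutine does.

```cpp
#include <vector>

// Hypothetical illustration: a routine whose progress can be suspended
// and resumed. The counter survives between calls to resume().
class resumable_range
{
    int current_;   // preserved "local" state
    int last_;

public:
    resumable_range( int first, int last) :
        current_( first), last_( last)
    {}

    // true while there is still work left to resume
    bool alive() const
    { return current_ <= last_; }

    // continue until the next suspension point and hand back one value
    int resume()
    { return current_++; }
};

// drive the resumable routine to completion and collect what it produced
inline std::vector< int > collect( int first, int last)
{
    std::vector< int > out;
    resumable_range r( first, last);
    while ( r.alive() )
        out.push_back( r.resume() );
    return out;
}
```

The caller decides when the routine continues, and the routine picks up exactly where it left off. A real coroutine offers the same behaviour for arbitrary code, including nested function calls, because the whole stack is preserved.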

How it works

Functions foo() and bar() are supposed to alternate their execution (leave and enter function body).

[Figure: foo_bar]

If coroutines were called like ordinary routines, the stack would grow with every call and would never be unwound. A jump into the middle of a coroutine would not be possible, because the return address would sit on top of the stack entries.

The solution is that each coroutine has its own stack and control-block (boost::context::fcontext_t from Boost.Context). Before the coroutine gets suspended, the non-volatile registers (including stack and instruction/program pointer) of the currently active coroutine are stored in the coroutine's control-block. The registers of the newly activated coroutine must be restored from its associated control-block before it can continue with its work.

[Figure: foo_bar_seq]
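The alternation of foo() and bar() can be sketched without Boost by using the POSIX ucontext functions, where a ucontext_t plays the role of the control-block and each flow of control gets its own stack. This is an illustration under the assumption that <ucontext.h> is available (for example on Linux with glibc). It is not how Boost.Context is implemented, and the namespace sketch, the variable trace and the function run() are hypothetical names.

```cpp
#include <ucontext.h>   // POSIX; assumed to be available (e.g. Linux/glibc)
#include <string>

namespace sketch {

ucontext_t main_ctx, foo_ctx, bar_ctx;  // one control-block per flow of control
std::string trace;                      // records the order of execution

void foo()
{
    trace += "foo1 ";
    swapcontext( & foo_ctx, & bar_ctx); // store foo's registers, resume bar
    trace += "foo2 ";
    swapcontext( & foo_ctx, & bar_ctx);
    trace += "foo3 ";
    // returning from foo() continues at uc_link, i.e. main_ctx
}

void bar()
{
    trace += "bar1 ";
    swapcontext( & bar_ctx, & foo_ctx); // store bar's registers, resume foo
    trace += "bar2 ";
    swapcontext( & bar_ctx, & foo_ctx);
    // not reached in this sketch: foo() finishes first
}

std::string run()
{
    static char foo_stack[64 * 1024];   // each coroutine has its own stack
    static char bar_stack[64 * 1024];
    trace.clear();

    getcontext( & foo_ctx);
    foo_ctx.uc_stack.ss_sp = foo_stack;
    foo_ctx.uc_stack.ss_size = sizeof( foo_stack);
    foo_ctx.uc_link = & main_ctx;
    makecontext( & foo_ctx, foo, 0);

    getcontext( & bar_ctx);
    bar_ctx.uc_stack.ss_sp = bar_stack;
    bar_ctx.uc_stack.ss_size = sizeof( bar_stack);
    bar_ctx.uc_link = & main_ctx;
    makecontext( & bar_ctx, bar, 0);

    // store the caller's registers in main_ctx and enter foo()
    swapcontext( & main_ctx, & foo_ctx);
    return trace;
}

}
```

Calling sketch::run() yields the trace "foo1 bar1 foo2 bar2 foo3 ": each swapcontext() leaves one function body in the middle and re-enters the other exactly where it was suspended, and neither stack grows as a result of the switches.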

The context switch requires no system privileges and provides cooperative multitasking at the language level. Coroutines provide quasi-parallelism. When a program is supposed to do several things at the same time, coroutines make this much simpler and more elegant than a single flow of control does. The advantages can be seen particularly clearly with recursive functions, such as the traversal of binary trees (see example 'same fringe').
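The benefit for recursive functions can be illustrated with a small 'same fringe' sketch: two trees of different shape are compared leaf by leaf, and each traversal is suspended in the middle of its recursion whenever it reaches a leaf. This is not the example shipped with the library. It assumes the POSIX ucontext functions are available (for example Linux with glibc), and node, leaf_walker and same_fringe() are hypothetical names.

```cpp
#include <ucontext.h>   // POSIX; assumed to be available (e.g. Linux/glibc)
#include <vector>

namespace fringe {

struct node
{
    int    value;
    node * left;
    node * right;
};

// hypothetical helper: visits the leaves of a tree on its own stack
class leaf_walker
{
    ucontext_t          caller_;
    ucontext_t          self_;
    std::vector< char > stack_;
    node const*         root_;
    int                 current_;
    bool                done_;

    static leaf_walker * starting_;     // hands 'this' to entry()

    leaf_walker( leaf_walker const&);               // not copyable: the
    leaf_walker & operator=( leaf_walker const&);   // contexts hold pointers

    static void entry()
    {
        leaf_walker * w = starting_;
        w->walk( w->root_);
        w->done_ = true;
        // returning continues at uc_link, i.e. caller_
    }

    void walk( node const* n)
    {
        if ( ! n) return;
        if ( ! n->left && ! n->right)
        {
            current_ = n->value;
            swapcontext( & self_, & caller_);   // suspend inside the recursion
            return;
        }
        walk( n->left);
        walk( n->right);
    }

public:
    explicit leaf_walker( node const* root) :
        stack_( 64 * 1024), root_( root), current_( 0), done_( false)
    {
        getcontext( & self_);
        self_.uc_stack.ss_sp = & stack_[0];
        self_.uc_stack.ss_size = stack_.size();
        self_.uc_link = & caller_;
        makecontext( & self_, & leaf_walker::entry, 0);
    }

    // resume the traversal; returns false once all leaves were visited
    bool next( int & out)
    {
        if ( done_) return false;
        starting_ = this;
        swapcontext( & caller_, & self_);
        if ( done_) return false;
        out = current_;
        return true;
    }
};

leaf_walker * leaf_walker::starting_ = 0;

// true if both trees have the same sequence of leaves
bool same_fringe( node const* a, node const* b)
{
    leaf_walker wa( a), wb( b);
    int va = 0, vb = 0;
    for (;;)
    {
        bool ha = wa.next( va);
        bool hb = wb.next( vb);
        if ( ha != hb) return false;    // one tree ran out of leaves first
        if ( ! ha) return true;         // both traversals are finished
        if ( va != vb) return false;
    }
}

}
```

The traversal in walk() stays an ordinary recursive function. With a single flow of control, the two traversals would have to be rewritten with explicit stacks, or both fringes would have to be collected completely before comparing them.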

Example: boost::asio::io_service with std::istream

This section demonstrates how stackful coroutines help to use standard C++ IO-streams together with an IO-demultiplexer like boost::asio::io_service (using non-blocking IO).

int main( int argc, char * argv[])
{
    ...
    {
        boost::asio::io_service io_service;
        io_service.post(
            boost::bind(
                & server::start,
                server::create(
                    io_service, port) ) );
        io_service.run();
    }
    ...
}

server accepts connection requests made by clients, creates an instance of type session for each new connection, and invokes session::start() on it.

class server : public boost::enable_shared_from_this< server >
{
private:
    boost::asio::io_service         &   io_service_;
    boost::asio::ip::tcp::acceptor      acceptor_;

    void handle_accept_( session * new_session, boost::system::error_code const& error)
    {
        if ( ! error)
        {
            // start asynchronous read
            new_session->start();

            // start asynchronous accept
            start();
        }
    }

    server( boost::asio::io_service & io_service, short port) :
        io_service_( io_service),
        acceptor_(
            io_service_,
            boost::asio::ip::tcp::endpoint( boost::asio::ip::tcp::v4(), port) )
    {}

public:
    typedef boost::shared_ptr< server > ptr_t;

    static ptr_t create( boost::asio::io_service & io_service, short port)
    { return ptr_t( new server( io_service, port) ); }

    void start()
    {
        // create new session which gets started if asynchronous
        // accept completes
        session * new_session( new session( io_service_) );
        acceptor_.async_accept(
            new_session->socket(),
            boost::bind( & server::handle_accept_, this->shared_from_this(),
                new_session, boost::asio::placeholders::error) );
    }
};

Each session communicates with the connected client and handles its requests. The application protocol in this example uses TCP sockets as the channel and 'newline' to separate the messages in the byte stream. An application protocol is a set of rules for the order in which messages are exchanged. std::istream is used to extract the messages from the character stream. The message 'exit' terminates the session.
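The message framing described above can be tried out in isolation. The following sketch replaces the socket with an arbitrary std::istream, and read_messages() is a hypothetical helper, not part of the example application.

```cpp
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// read newline-separated messages until "exit" or the end of the stream
inline std::vector< std::string > read_messages( std::istream & s)
{
    std::vector< std::string > msgs;
    std::string msg;
    while ( std::getline( s, msg) )
    {
        // message 'exit' terminates the session
        if ( "exit" == msg) break;
        msgs.push_back( msg);
    }
    return msgs;
}
```

Feeding it a std::istringstream holding "hello\nworld\nexit\nignored\n" returns the two messages "hello" and "world". Everything after 'exit' stays unread.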

class session : private boost::noncopyable
{
private:
    // completion handler, invoked once the socket has become readable
    void handle_read_( boost::system::error_code const&, std::size_t)
    {
        // create stream-buffer reading from socket
        inbuf buf( socket_);
        std::istream s( & buf);

        // messages are separated by 'newline'
        // std::getline() blocks until a complete line has arrived
        std::string msg;
        std::getline( s, msg);
        std::cout << msg << std::endl;

        // terminate session for message 'exit'
        // else do asynchronous read
        if ( "exit" == msg)
            io_service_.post(
                boost::bind(
                    & session::destroy_, this) );
        else
            start();
    }

    void destroy_()
    { delete this; }

    boost::asio::io_service     &   io_service_;
    boost::asio::ip::tcp::socket    socket_;

public:
    session( boost::asio::io_service & io_service) :
        io_service_( io_service),
        socket_( io_service_)
    {
        // the socket is not connected yet, so there is no remote endpoint
        std::cout << "session()" << std::endl;
    }

    ~session()
    {
        // non-throwing overload: the peer may already have disconnected
        boost::system::error_code ec;
        std::cout << "~session(): " << socket_.remote_endpoint( ec) << std::endl;
    }

    boost::asio::ip::tcp::socket & socket()
    { return socket_; }

    void start()
    {
        // wait asynchronously until the socket becomes readable;
        // null_buffers() transfers no data, handle_read_() does the read
        socket_.async_read_some(
            boost::asio::null_buffers(),
            boost::bind(
                & session::handle_read_, this, _1, _2) );
    }
};

std::getline() returns only once a 'newline' has been read from the socket. The application therefore blocks until a 'newline' is received. The stream-buffer used by the stream maintains an internal buffer which is (re-)filled by its function std::streambuf::underflow(), and underflow() performs the read operation on the socket. The C++ IO-streams framework does not provide an easy way to create a continuation which represents reading bytes from the socket.
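How std::getline() pulls its data through underflow() can be observed without a socket. In the following sketch the hypothetical class chunked_buf hands out the bytes of a string in small chunks, as if they arrived from the network piece by piece. The class and its members are assumptions made for illustration only.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <istream>
#include <streambuf>
#include <string>

// Hypothetical stand-in for a socket-backed stream-buffer: the "device"
// is a string that is delivered in chunks of limited size.
class chunked_buf : public std::streambuf
{
    enum { pb_size = 4, bf_size = 16 };

    std::string  source_;   // pretend these bytes arrive from the network
    std::size_t  pos_;
    std::size_t  chunk_;    // at most this many bytes per "read"
    int          refills_;  // how often underflow() had to fetch data
    char         buffer_[bf_size];

protected:
    virtual int_type underflow()
    {
        if ( gptr() < egptr() )
            return traits_type::to_int_type( * gptr() );

        // keep up to pb_size already consumed characters for putback
        std::streamsize num = std::min(
            static_cast< std::streamsize >( gptr() - eback() ),
            static_cast< std::streamsize >( pb_size) );
        std::memmove( buffer_ + ( pb_size - num), gptr() - num, num);

        // a socket read would happen here; it is the place where the
        // caller of std::getline() has to wait for more bytes
        std::size_t n = std::min( chunk_, source_.size() - pos_);
        n = std::min( n, static_cast< std::size_t >( bf_size - pb_size) );
        if ( 0 == n)
            return traits_type::eof();
        std::memcpy( buffer_ + pb_size, source_.data() + pos_, n);
        pos_ += n;
        ++refills_;

        setg( buffer_ + pb_size - num, buffer_ + pb_size, buffer_ + pb_size + n);
        return traits_type::to_int_type( * gptr() );
    }

public:
    chunked_buf( std::string const& source, std::size_t chunk) :
        source_( source), pos_( 0), chunk_( chunk), refills_( 0), buffer_()
    { setg( buffer_ + pb_size, buffer_ + pb_size, buffer_ + pb_size); }

    int refills() const
    { return refills_; }
};
```

With a std::istream constructed on a chunked_buf, a single std::getline() call may trigger several underflow() calls before it sees the 'newline'. Each of those calls is a point where a socket-backed buffer would have to wait for data, and it is deep inside the IO-streams framework rather than in application code.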

Coroutines help in this case to make the application non-blocking even if no 'newline' was received. Class session creates a coroutine which uses session::handle_read_() as its coroutine-function. On a newly created session, start() is called, which starts the coroutine. Inside the coroutine-function session::handle_read_(), the messages are received via std::getline() in a loop until 'exit' is delivered.

class session : private boost::noncopyable
{
private:
    void handle_read_( coro_t::caller_type & ca)
    {
        // create stream-buffer with coroutine
        inbuf buf( socket_, coro_, ca);
        std::istream s( & buf);

        std::string msg;
        do
        {
            // read message
            // we do not block if no newline was received yet
            std::getline( s, msg);
            std::cout << msg << std::endl;
        } while ( msg != "exit");
        io_service_.post(
                boost::bind(
                    & session::destroy_, this) );
    }

    void destroy_()
    { delete this; }

    coro_t                          coro_;
    boost::asio::io_service     &   io_service_;
    boost::asio::ip::tcp::socket    socket_;

public:
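    session( boost::asio::io_service & io_service) :
        coro_(),
        io_service_( io_service),
        socket_( io_service_)
    {
        // the socket is not connected yet, so there is no remote endpoint
        std::cout << "session()" << std::endl;
    }

    ~session()
    {
        // non-throwing overload: the peer may already have disconnected
        boost::system::error_code ec;
        std::cout << "~session(): " << socket_.remote_endpoint( ec) << std::endl;
    }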

    boost::asio::ip::tcp::socket & socket()
    { return socket_; }

    void start()
    {
        // create and start a coroutine
        // handle_read_() is used as coroutine-function
        coro_ = coro_t( boost::bind( & session::handle_read_, this, _1) );
    }
};

The stream-buffer is created with a boost::coroutines::coroutine<>::caller_type and handles suspend/resume of this code path depending on whether bytes can be read from the socket.

class inbuf : public std::streambuf,
              private boost::noncopyable
{
private:
    static const std::streamsize        pb_size;

    enum
    { bf_size = 16 };

    int fetch_()
    {
        std::streamsize num = std::min(
            static_cast< std::streamsize >( gptr() - eback() ), pb_size);

        std::memmove(
            buffer_ + ( pb_size - num),
            gptr() - num, num);

        // read bytes from the socket into internal buffer 'buffer_'
        // use coro_t::operator() as the callback, invoked once some
        // bytes have been read into 'buffer_'
        s_.async_read_some(
                boost::asio::buffer( buffer_ + pb_size, bf_size - pb_size),
                boost::bind( & coro_t::operator(), & coro_, _1, _2) );
        // suspend this coroutine
        ca_();

        // coroutine was resumed by boost::asio::io_service
        boost::system::error_code ec;
        std::size_t n = 0;

        // check arguments
        boost::tie( ec, n) = ca_.get();

        // check if an error occurred
        if ( ec)
        {
            setg( 0, 0, 0);
            return -1;
        }

        setg( buffer_ + pb_size - num, buffer_ + pb_size, buffer_ + pb_size + n);
        return n;
    }

    boost::asio::ip::tcp::socket      &   s_;
    coro_t                            &   coro_;
    coro_t::caller_type               &   ca_;
    char                                  buffer_[bf_size];

protected:
    virtual int underflow()
    {
        if ( gptr() < egptr() )
            return traits_type::to_int_type( * gptr() );

        if ( 0 > fetch_() )
            return traits_type::eof();
        else
            return traits_type::to_int_type( * gptr() );
    }

public:
    inbuf(
            boost::asio::ip::tcp::socket & s,
            coro_t & coro,
            coro_t::caller_type & ca) :
        s_( s), coro_( coro), ca_( ca), buffer_()
    { setg( buffer_ + 4, buffer_ + 4, buffer_ + 4); }
};
const std::streamsize inbuf::pb_size = 4;

inbuf::fetch_() uses boost::coroutines::coroutine<>::operator() as the callback for the asynchronous read operation on the socket and then suspends itself (ca_() jumps back to session::start()). Once bytes are available in the socket's receive buffer, boost::asio::io_service copies them into the internal buffer buffer_ and invokes the callback, which resumes the coroutine (ca_() returns).

