Boost C++ Libraries


This is the documentation for an old version of Boost.

Migrating fibers between threads

Overview

Each fiber owns a stack and manages its execution state, including all registers and CPU flags, the instruction pointer and the stack pointer. That means, in general, a fiber is not bound to a specific thread.[3],[4]

Migrating a fiber from a heavily loaded logical CPU to a more lightly loaded one might speed up overall execution. Note that on NUMA architectures it is not always advisable to migrate a fiber, because its data does not move with it. Suppose fiber f is running on logical CPU cpu0, which belongs to NUMA node node0. The data of f is allocated in the physical memory located at node0. Migrating the fiber from cpu0 to another logical CPU cpuX that is part of a different NUMA node nodeX might reduce the performance of the application due to increased memory-access latency.

Only fibers that are contained in an algorithm’s ready queue can migrate between threads. You cannot migrate a running fiber, nor one that is blocked. Nor can you migrate a fiber whose context::is_context() method returns true for pinned_context.

In Boost.Fiber a fiber is migrated by invoking context::detach() on the thread from which the fiber migrates and context::attach() on the thread to which the fiber migrates.

Thus, fiber migration is accomplished by sharing state between instances of a user-coded algorithm implementation running on different threads. The fiber’s original thread calls algorithm::awakened(), passing the fiber’s context*. The awakened() implementation calls context::detach().

At some later point, when the same or a different thread calls algorithm::pick_next(), the pick_next() implementation selects a ready fiber and calls context::attach() on it before returning it.

As stated above, a context for which is_context(pinned_context) == true must never be passed to either context::detach() or context::attach(). It may only be returned from pick_next() called by the same thread that passed that context to awakened().
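The eligibility rules above can be condensed into a single predicate. The following is a minimal sketch that does not use Boost.Fiber: FiberState, FiberInfo and may_migrate() are hypothetical names introduced only for illustration, and the pinned flag stands in for is_context(pinned_context) returning true.

```cpp
#include <cassert>

// Hypothetical model of the scheduling states named in the text.
enum class FiberState { running, blocked, ready };

// Hypothetical summary of what a scheduler knows about a fiber.
struct FiberInfo {
    FiberState state;
    bool pinned;  // stands in for is_context(pinned_context) == true
};

// A fiber may be handed to another thread only if it sits in a ready
// queue and is not pinned (main and dispatcher fibers are pinned).
inline bool may_migrate(const FiberInfo& f) {
    return f.state == FiberState::ready && !f.pinned;
}
```

In this model, a scheduler would call context::detach() and context::attach() only for fibers for which may_migrate() is true.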

Example of work sharing

In the example work_sharing.cpp, multiple worker fibers are created on the main thread. Each fiber receives a character as a parameter at construction and uses it to identify itself in its output. The fiber loops ten times, calling this_fiber::yield() in each iteration. That puts the fiber in the ready queue of the fiber scheduler shared_work running in the current thread. The next fiber ready to be executed is dequeued from the shared ready queue and resumed by the shared_work instance running on any participating thread.

All instances of shared_work share one global concurrent queue, used as the ready queue. This mechanism shares all worker fibers between all instances of shared_work, and thus between all participating threads.

Setup of threads and fibers

In main() the fiber-scheduler is installed and the worker fibers and the threads are launched.

boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); // (1)

for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { // (2)
    boost::fibers::fiber([c](){ whatevah( c); }).detach();
    ++fiber_count; // (3)
}
thread_barrier b( 4);
std::thread threads[] = { // (4)
    std::thread( thread, & b),
    std::thread( thread, & b),
    std::thread( thread, & b)
};
b.wait(); // (5)
{
    lock_type lk( mtx_count); // (6)
    cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); // (7)
} // (8)
BOOST_ASSERT( 0 == fiber_count);
for ( std::thread & t : threads) { // (9)
    t.join();
}

(1) Install the scheduling algorithm boost::fibers::algo::shared_work in the main thread too, so each new fiber gets launched into the shared pool.

(2) Launch a number of worker fibers; each worker fiber picks up a character that is passed as a parameter to the fiber function whatevah. Each worker fiber gets detached.

(3) Increment the fiber counter for each new fiber.

(4) Launch three threads that join the work sharing.

(5) Sync with the other threads: allow them to start processing.

(6) lock_type is a typedef for std::unique_lock< std::mutex >.

(7) Suspend the main fiber and resume worker fibers in the meantime. The main fiber is resumed (that is, it returns from condition_variable_any::wait()) once all worker fibers are complete.

(8) The lock on mtx_count must be released before joining the threads; otherwise the other threads would remain blocked inside condition_variable_any::wait() and would never return (deadlock).

(9) Wait for the threads to terminate.

The start of the threads is synchronized with a barrier. The main fiber of each thread (including the main thread) is suspended until all worker fibers are complete. When the main fiber of a launched thread returns from condition_variable_any::wait(), that thread terminates; the main thread then joins all other threads.

void thread( thread_barrier * b) {
    std::ostringstream buffer;
    buffer << "thread started " << std::this_thread::get_id() << std::endl;
    std::cout << buffer.str() << std::flush;
    boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); // (1)
    b->wait(); // (2)
    lock_type lk( mtx_count);
    cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); // (3)
    BOOST_ASSERT( 0 == fiber_count);
}

(1) Install the scheduling algorithm boost::fibers::algo::shared_work in order to join the work sharing.

(2) Sync with the other threads: allow them to start processing.

(3) Suspend the main fiber and resume worker fibers in the meantime. The main fiber is resumed (that is, it returns from condition_variable_any::wait()) once all worker fibers are complete.
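The thread_barrier used to synchronize thread start-up is not shown in this section. As an illustration only, a barrier with the same wait() usage could be sketched with the standard library as follows. The names simple_barrier and count_last_arrivals are hypothetical, and this is not claimed to be the implementation used by the example.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reusable barrier for a fixed number of threads.
class simple_barrier {
public:
    explicit simple_barrier(std::size_t count)
        : initial_(count), remaining_(count) {}
    simple_barrier(const simple_barrier&) = delete;
    simple_barrier& operator=(const simple_barrier&) = delete;

    // Blocks until `count` threads have called wait().
    // Returns true in exactly one caller: the last one to arrive.
    bool wait() {
        std::unique_lock<std::mutex> lk(mtx_);
        const std::size_t gen = generation_;
        if (--remaining_ == 0) {
            ++generation_;          // start a new cycle
            remaining_ = initial_;  // make the barrier reusable
            lk.unlock();
            cond_.notify_all();
            return true;
        }
        // Wait until the last arrival has advanced the generation.
        cond_.wait(lk, [&] { return gen != generation_; });
        return false;
    }

private:
    const std::size_t initial_;
    std::size_t remaining_;
    std::size_t generation_ = 0;
    std::mutex mtx_;
    std::condition_variable cond_;
};

// Demo helper: n participants (n - 1 new threads plus the caller) meet
// at the barrier; returns how many of them saw wait() return true.
inline std::size_t count_last_arrivals(std::size_t n) {
    simple_barrier b(n);
    std::atomic<std::size_t> last{0};
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i + 1 < n; ++i) {
        threads.emplace_back([&] { if (b.wait()) ++last; });
    }
    if (b.wait()) ++last;  // the calling thread is the n-th participant
    for (auto& t : threads) t.join();
    return last.load();
}
```

A count of 4 corresponds to the example above: the main thread plus the three launched threads. The generation counter guards against spurious wake-ups and lets the barrier be reused.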

Each worker fiber executes function whatevah() with character me as parameter. The fiber yields in a loop and prints out a message if it was migrated to another thread.

void whatevah( char me) {
    try {
        std::thread::id my_thread = std::this_thread::get_id(); // (1)
        {
            std::ostringstream buffer;
            buffer << "fiber " << me << " started on thread " << my_thread << '\n';
            std::cout << buffer.str() << std::flush;
        }
        for ( unsigned i = 0; i < 10; ++i) { // (2)
            boost::this_fiber::yield(); // (3)
            std::thread::id new_thread = std::this_thread::get_id(); // (4)
            if ( new_thread != my_thread) { // (5)
                my_thread = new_thread;
                std::ostringstream buffer;
                buffer << "fiber " << me << " switched to thread " << my_thread << '\n';
                std::cout << buffer.str() << std::flush;
            }
        }
    } catch ( ... ) {
    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { // (6)
        lk.unlock();
        cnd_count.notify_all(); // (7)
    }
}

(1) Get the ID of the initial thread.

(2) Loop ten times.

(3) Yield to other fibers.

(4) Get the ID of the current thread.

(5) Test whether the fiber was migrated to another thread.

(6) Decrement the fiber counter for each completed fiber.

(7) Notify all fibers waiting on cnd_count.
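The completion protocol used here (decrement a counter under a lock, notify once it reaches zero, wait on a predicate) can be tried out with the standard library alone. The sketch below is an analogue, not the example's code: it uses std::thread workers and std::condition_variable, whereas the example runs fibers and waits on a fiber condition variable, so that waiting suspends a fiber rather than blocking a thread. run_countdown is a hypothetical helper name.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Starts n workers, waits until every worker has decremented the shared
// counter, and returns the counter value seen after the wait.
inline std::size_t run_countdown(std::size_t n) {
    std::mutex mtx;
    std::condition_variable cnd;
    std::size_t remaining = n;  // plays the role of fiber_count

    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < n; ++i) {
        workers.emplace_back([&] {
            std::unique_lock<std::mutex> lk(mtx);
            if (--remaining == 0) {
                lk.unlock();       // release before notifying, as in whatevah()
                cnd.notify_all();
            }
        });
    }

    std::size_t seen;
    {
        std::unique_lock<std::mutex> lk(mtx);
        // The predicate makes the wait robust against spurious wake-ups
        // and against the notification arriving before the wait starts.
        cnd.wait(lk, [&] { return remaining == 0; });
        seen = remaining;
    }   // lock released here, before joining the workers

    for (auto& t : workers) t.join();
    return seen;
}
```

Only the worker that brings the counter to zero notifies; every waiter re-checks the predicate under the lock, so no wake-up is lost.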

Scheduling fibers

The fiber scheduler shared_work is like round_robin, except that it shares a common ready queue among all participating threads. A thread joins this pool by calling use_scheduling_algorithm() before any other Boost.Fiber operation.

The important point about the ready queue is that it’s a class static, common to all instances of shared_work. Fibers that are enqueued via algorithm::awakened() (fibers that are ready to be resumed) are thus available to all threads. A separate, scheduler-specific queue must be reserved for the thread’s main fiber and dispatcher fiber: these may not be shared between threads! When awakened() is passed either of these fibers, it pushes the fiber there instead of onto the shared queue: it would be Bad News for thread B to retrieve and attempt to execute thread A’s main fiber.

[awakened_ws]

When algorithm::pick_next() gets called inside one thread, a fiber is dequeued from rqueue_ and will be resumed in that thread.

[pick_next_ws]

The source code above is found in work_sharing.cpp.
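The queue discipline described in this section can also be modeled without Boost.Fiber. The following toy model is a sketch under stated assumptions, not code from work_sharing.cpp or from the library: ToyContext and ToyScheduler are hypothetical types, the attached_to field only records where detach() and attach() would be called, and checking the shared queue before the local one is a choice made for this sketch.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Toy stand-in for a fiber context (not boost::fibers::context).
struct ToyContext {
    bool pinned = false;   // models is_context(pinned_context) == true
    int attached_to = -1;  // id of the owning scheduler, -1 while detached
};

// Toy scheduler: one instance per participating thread.
class ToyScheduler {
public:
    explicit ToyScheduler(int id) : id_(id) {}

    // Models algorithm::awakened(): ctx has become ready to run.
    void awakened(ToyContext* ctx) {
        if (ctx->pinned) {
            // Main and dispatcher fibers stay with their own thread.
            local_queue_.push_back(ctx);
        } else {
            ctx->attached_to = -1;  // models context::detach()
            std::lock_guard<std::mutex> lk(shared_mtx_);
            shared_queue_.push_back(ctx);
        }
    }

    // Models algorithm::pick_next(): returns the next fiber, or nullptr.
    ToyContext* pick_next() {
        {
            std::lock_guard<std::mutex> lk(shared_mtx_);
            if (!shared_queue_.empty()) {
                ToyContext* ctx = shared_queue_.front();
                shared_queue_.pop_front();
                ctx->attached_to = id_;  // models context::attach()
                return ctx;
            }
        }
        if (!local_queue_.empty()) {
            ToyContext* ctx = local_queue_.front();
            local_queue_.pop_front();
            return ctx;  // pinned: never detached, so no attach needed
        }
        return nullptr;
    }

private:
    // Class statics: one ready queue common to all scheduler instances.
    static std::deque<ToyContext*> shared_queue_;
    static std::mutex shared_mtx_;

    int id_;
    std::deque<ToyContext*> local_queue_;  // pinned contexts only
};

std::deque<ToyContext*> ToyScheduler::shared_queue_;
std::mutex ToyScheduler::shared_mtx_;
```

With two schedulers a and b, a worker context passed to a.awakened() can be returned by b.pick_next(), which models a migration. A pinned context passed to a.awakened() is only ever returned by a.pick_next().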



[3] The main fiber on each thread, that is, the fiber on which the thread is launched, cannot migrate to any other thread. Also Boost.Fiber implicitly creates a dispatcher fiber for each thread — this cannot migrate either.

[4] Of course it would be problematic to migrate a fiber that relies on thread-local storage.

