Boost
C++ Libraries“...one of the most highly regarded and expertly designed C++ library projects in the world.” — Herb Sutter and Andrei Alexandrescu, C++ Coding Standards
![]() |
Home | Libraries | People | FAQ | More |
As mentioned before, the ability to shared memory between processes through memory mapped files or shared memory objects is not very useful if the access to that memory can't be effectively synchronized. This is the same problem that happens with thread-synchronization mechanisms, where heap memory and global variables are shared between threads, but the access to these resources needs to be synchronized tipically through mutex and condition variables. Boost.Threads implements these synchronization utilities between threads inside the same process. Boost.Interprocess implements similar mechanisms to synchronize threads from different processes.
Boost.Interprocess presents two types of synchronization objects:
fstream with the name
filename and another process opens that file using
another fstream with
the same filename argument. Each
process uses a different object to access to the resource, but both processes
are using the the same underlying resource.
Each type has it's own advantages and disadvantages:
The main interface difference between named and anonymous utilities are the constructors. Usually anonymous utilities have only one contructor, whereas the named utilities have several constructors whose first argument is an special type that requests creation, opening or opening or creation of the underlying resource:
using namespace boost::interprocess; //Create the synchronization utility. If it previously //exists, throws an error NamedUtility(create_only, ...) //Open the synchronization utility. If it does not previously //exist, it's created. NamedUtility(open_or_create, ...) //Open the synchronization utility. If it does not previously //exist, throws an error. NamedUtility(open_only, ...)
On the other hand the anonymous synchronization utility can only be created and the processes must synchronize using other mechanisms who creates the utility:
using namespace boost::interprocess; //Create the synchronization utility. AnonymousUtility(...)
Apart from its named/anonymous nature, Boost.Interprocess presents the following synchronization utilities:
Mutex stands for mutual exclusion and it's the most basic form of synchronization between processes. Mutexes guarantee that only one thread can lock a given mutex. If a code section is surrounded by a mutex locking and unlocking, it's guaranteed that only a thread at a time executes that section of code. When that thread unlocks the mutex, other threads can enter to that code region:
//The mutex has been previously constructed lock_the_mutex(); //This code will be executed only by one thread //at a time. unlock_the_mutex();
A mutex can also be recursive or non-recursive:
All the mutex types from Boost.Interprocess implement the following operations:
Effects: The calling thread tries to obtain ownership of the mutex, and if another thread has ownership of the mutex, it waits until it can obtain the ownership. If a thread takes ownership of the mutex the mutex must be unlocked by the same mutex. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.
Throws: interprocess_exception on error.
Effects: The calling thread tries to obtain ownership of the mutex, and if another thread has ownership of the mutex returns immediately. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.
Returns: If the thread acquires ownership of the mutex, returns true, if the another thread has ownership of the mutex, returns false.
Throws: interprocess_exception on error.
Effects: The calling thread will try to obtain exclusive ownership of the mutex if it can do so in until the specified time is reached. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.
Returns: If the thread acquires ownership of the mutex, returns true, if the timeout expires returns false.
Throws: interprocess_exception on error.
Precondition: The thread must have exclusive ownership of the mutex.
Effects: The calling thread releases the exclusive ownership of the mutex. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.
Throws: An exception derived from interprocess_exception on error.
Boost.Interprocess offers the following mutex types:
#include <boost/interprocess/sync/interprocess_mutex.hpp>
boost::interprocess::interprocess_mutex:
A non-recursive, anonymous mutex that can be placed in shared memory
or memory mapped files.
#include <boost/interprocess/sync/interprocess_recursive_mutex.hpp>
boost::interprocess::interprocess_recursive_mutex:
A recursive, anonymous mutex that can be placed in shared memory or memory
mapped files.
#include <boost/interprocess/sync/named_mutex.hpp>
named_mutex:
A non-recursive, named mutex.
#include <boost/interprocess/sync/named_recursive_mutex.hpp>
named_recursive_mutex:
A recursive, named mutex.
It's very important to unlock a mutex after the process has read or written the data. This can be difficult when dealing with exceptions, so usually mutexes are used with a scoped lock, a class that can guarantee that a mutex will always be unlocked even when an exception occurs. To use an scoped lock just include:
#include <boost/interprocess/sync/scoped_lock.hpp>
Basically, a scoped lock calls unlock() in its destructor, and a mutex is always unlocked when an exception occurs. Scoped lock has many constructors to lock, try_lock, timed_lock a mutex or not to lock it at all.
using namespace boost::interprocess; //Let's create any mutex type: MutexType mutex; { //This will lock the mutex scoped_lock<MutexType> lock(mutex); //Some code //The mutex will be unlocked here } { //This will try_lock the mutex scoped_lock<MutexType> lock(mutex, try_to_lock); //Check if the mutex has been successfully locked if(lock){ //Some code } //If the mutex was locked it will be unlocked } { boost::posix_time::ptime abs_time = ... //This will timed_lock the mutex scoped_lock<MutexType> lock(mutex, abs_time); //Check if the mutex has been successfully locked if(lock){ //Some code } //If the mutex was locked it will be unlocked }
For more information, check the scoped_lock's
reference.
Imagine that two processes need to write traces to a cyclic buffer built in shared memory. Each process needs to obtain exclusive access to the cyclic buffer, write the trace and continue.
To protect the cyclic buffer, we can store a process shared mutex in the
cyclic buffer. Each process will lock the mutex before writing the data
and will write a flag when ends writing the traces (doc_anonymous_mutex_shared_data.hpp
header):
#include <boost/interprocess/sync/interprocess_mutex.hpp> struct shared_memory_log { enum { NumItems = 100 }; enum { LineSize = 100 }; shared_memory_log() : current_line(0) , end_a(false) , end_b(false) {} //Mutex to protect access to the queue boost::interprocess::interprocess_mutex mutex; //Items to fill char items[NumItems][LineSize]; int current_line; bool end_a; bool end_b; };
This is the process main process. Creates the shared memory, constructs the the cyclic buffer and start writing traces:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include "doc_anonymous_mutex_shared_data.hpp" #include <iostream> #include <cstdio> using namespace boost::interprocess; int main () { try{ //Erase previous shared memory shared_memory_object::remove("shared_memory"); //Create a shared memory object. shared_memory_object shm (create_only //only create ,"shared_memory" //name ,read_write //read-write mode ); //Set size shm.truncate(sizeof(shared_memory_log)); //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Construct the shared structure in memory shared_memory_log * data = new (addr) shared_memory_log; //Write some logs for(int i = 0; i < shared_memory_log::NumItems; ++i){ //Lock the mutex scoped_lock<interprocess_mutex> lock(data->mutex); std::sprintf(data->items[(data->current_line++) % shared_memory_log::NumItems] ,"%s_%d", "process_a", i); if(i == (shared_memory_log::NumItems-1)) data->end_a = true; //Mutex is released here } //Wait until the other process ends while(1){ scoped_lock<interprocess_mutex> lock(data->mutex); if(data->end_b) break; } } catch(interprocess_exception &ex){ shared_memory_object::remove("shared_memory"); std::cout << ex.what() << std::endl; return 1; } //Erase shared memory shared_memory_object::remove("shared_memory"); return 0; }
The second process opens the shared memory, obtains access to the cyclic buffer and starts writing traces:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include "doc_anonymous_mutex_shared_data.hpp" #include <iostream> #include <cstdio> using namespace boost::interprocess; int main () { try{ //Open the shared memory object. shared_memory_object shm (open_only //only create ,"shared_memory" //name ,read_write //read-write mode ); //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Construct the shared structure in memory shared_memory_log * data = static_cast<shared_memory_log*>(addr); //Write some logs for(int i = 0; i < 100; ++i){ //Lock the mutex scoped_lock<interprocess_mutex> lock(data->mutex); std::sprintf(data->items[(data->current_line++) % shared_memory_log::NumItems] ,"%s_%d", "process_a", i); if(i == (shared_memory_log::NumItems-1)) data->end_b = true; //Mutex is released here } //Wait until the other process ends while(1){ scoped_lock<interprocess_mutex> lock(data->mutex); if(data->end_a) break; } } catch(interprocess_exception &ex){ shared_memory_object::remove("shared_memory"); std::cout << ex.what() << std::endl; return 1; } shared_memory_object::remove("shared_memory"); return 0; }
As we can see, a mutex is useful to protect data but not to notify to another process an event. For this, we need a condition variable, as we will see in the next section.
Now imagine that two processes want to write a trace to a file. First they write their name, and after that they write the message. Since the operating system can interrupt a process in any moment we can mix parts of the messages of both processes, so we need a way to write the whole message to the file atomically. To achieve this, we can use a named mutex so that each process locks the mutex before writing:
#include <boost/interprocess/sync/scoped_lock.hpp> #include <boost/interprocess/sync/named_mutex.hpp> #include <fstream> int main () { using namespace boost::interprocess; try{ //Open or create the named mutex named_mutex mutex(open_or_create, "fstream_named_mutex"); std::ofstream file("file_name"); for(int i = 0; i < 10; ++i){ //Do some operations... //Write to file atomically scoped_lock<named_mutex> lock(mutex); file << "Process name, "; file << "This is iteration #" << i; file << std::endl; } } catch(interprocess_exception &ex){ named_mutex::remove("fstream_named_mutex"); std::cout << ex.what() << std::endl; return 1; } named_mutex::remove("fstream_named_mutex"); return 0; }
In the previous example, a mutex is used to lock but we can't use it to wait efficiently until the condition to continue is met. A condition variable can do two things:
Waiting in a condition variable is always associated with a mutex. The mutex must be locked prior to waiting on the condition. When waiting on the condition variable, the thread unlocks the mutex and waits atomically.
When the thread returns from a wait function (because of a signal or a timeout, for example) the mutex object is again locked.
Boost.Interprocess offers the following condition types:
#include <boost/interprocess/sync/interprocess_condition.hpp>
boost::interprocess::interprocess_condition:
An anonymous condition variable that can be placed in shared memory or
memory mapped files to be used with boost::interprocess::interprocess_mutex.
#include <boost/interprocess/sync/named_condition.hpp>
boost::interprocess::named_condition:
A named condition variable to be used with boost::interprocess::named_mutex.
Named conditions are similar to anonymous conditions, but they are used in combination with named mutexes. Several times, we don't want to store synchronization objects with the synchronized data:
Imagine that a process that writes a trace to a simple shared memory buffer that another process prints one by one. The first process writes the trace and waits until the other process prints the data. To achieve this, we can use two condition variables: the first one is used to block the sender until the second process prints the message and the second one to block the receiver until the buffer has a trace to print.
The shared memory trace buffer (doc_anonymous_condition_shared_data.hpp):
#include <boost/interprocess/sync/interprocess_mutex.hpp> #include <boost/interprocess/sync/interprocess_condition.hpp> struct trace_queue { enum { LineSize = 100 }; trace_queue() : message_in(false) {} //Mutex to protect access to the queue boost::interprocess::interprocess_mutex mutex; //Condition to wait when the queue is empty boost::interprocess::interprocess_condition cond_empty; //Condition to wait when the queue is full boost::interprocess::interprocess_condition cond_full; //Items to fill char items[LineSize]; //Is there any message bool message_in; };
This is the process main process. Creates the shared memory, places there the buffer and starts writing messages one by one until it writes "last message" to indicate that there are no more messages to print:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include <iostream> #include <cstdio> #include "doc_anonymous_condition_shared_data.hpp" using namespace boost::interprocess; int main () { //Erase previous shared memory shared_memory_object::remove("shared_memory"); //Create a shared memory object. shared_memory_object shm (create_only //only create ,"shared_memory" //name ,read_write //read-write mode ); try{ //Set size shm.truncate(sizeof(trace_queue)); //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Construct the shared structure in memory trace_queue * data = new (addr) trace_queue; const int NumMsg = 100; for(int i = 0; i < NumMsg; ++i){ scoped_lock<interprocess_mutex> lock(data->mutex); if(data->message_in){ data->cond_full.wait(lock); } if(i == (NumMsg-1)) std::sprintf(data->items, "%s", "last message"); else std::sprintf(data->items, "%s_%d", "my_trace", i); //Notify to the other process that there is a message data->cond_empty.notify_one(); //Mark message buffer as full data->message_in = true; } } catch(interprocess_exception &ex){ shared_memory_object::remove("shared_memory"); std::cout << ex.what() << std::endl; return 1; } //Erase shared memory shared_memory_object::remove("shared_memory"); return 0; }
The second process opens the shared memory and prints each message until the "last message" message is received:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include <iostream> #include <cstring> #include "doc_anonymous_condition_shared_data.hpp" using namespace boost::interprocess; int main () { //Create a shared memory object. shared_memory_object shm (open_only //only create ,"shared_memory" //name ,read_write //read-write mode ); try{ //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Obtain a pointer to the shared structure trace_queue * data = static_cast<trace_queue*>(addr); //Print messages until the other process marks the end bool end_loop = false; do{ scoped_lock<interprocess_mutex> lock(data->mutex); if(!data->message_in){ data->cond_empty.wait(lock); } if(std::strcmp(data->items, "last message") == 0){ end_loop = true; } else{ //Print the message std::cout << data->items << std::endl; //Notify the other process that the buffer is empty data->message_in = false; data->cond_full.notify_one(); } } while(!end_loop); } catch(interprocess_exception &ex){ shared_memory_object::remove("shared_memory"); std::cout << ex.what() << std::endl; return 1; } //Erase shared memory shared_memory_object::remove("shared_memory"); return 0; }
With condition variables, a process can block if it can't continue the work, and when the conditions to continue are met another process can wake it.
A semaphore is a synchronization mechanism between processes based in an internal count that offers two basic operations:
If the initial semaphore count is initialized to 1, a Wait operation is equivalent to a mutex locking and Post is equivalent to a mutex unlocking. This type of semaphore is known as a binary semaphore.
Although semaphores can be used like mutexes, they have a unique feature: unlike mutexes, a Post operation need not be executed by the same thread/process that executed the Wait operation.
Boost.Interprocess offers the following semaphore types:
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
boost::interprocess::interprocess_semaphore:
An anonymous semaphore that can be placed in shared memory or memory
mapped files.
#include <boost/interprocess/sync/named_semaphore.hpp>
boost::interprocess::named_condition:
A named semaphore.
Named conditions are similar to anonymous conditions, but they are used in combination with named mutexes. Several times, we don't want to store synchronization objects with the synchronized data:
We will implement a integer array in shared memory that will be used to trasfer data from one process to another process. The first process will write some integers to the array and the process will block if the array is full.
The second process will copy the transmitted data to its own buffer, blocking if there is no new data in the buffer.
This is the shared integer array (doc_anonymous_semaphore_shared_data.hpp):
#include <boost/interprocess/sync/interprocess_semaphore.hpp> struct shared_memory_buffer { enum { NumItems = 10 }; shared_memory_buffer() : mutex(1), nempty(NumItems), nstored(0) {} //Semaphores to protect and synchronize access boost::interprocess::interprocess_semaphore mutex, nempty, nstored; //Items to fill int items[NumItems]; };
This is the process main process. Creates the shared memory, places there the interger array and starts integers one by one, blocking if the array is full:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <iostream> #include "doc_anonymous_semaphore_shared_data.hpp" using namespace boost::interprocess; int main () { try{ //Erase previous shared memory shared_memory_object::remove("shared_memory"); //Create a shared memory object. shared_memory_object shm (create_only //only create ,"shared_memory" //name ,read_write //read-write mode ); //Set size shm.truncate(sizeof(shared_memory_buffer)); //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Construct the shared structure in memory shared_memory_buffer * data = new (addr) shared_memory_buffer; const int NumMsg = 100; //Insert data in the array for(int i = 0; i < NumMsg; ++i){ data->nempty.wait(); data->mutex.wait(); data->items[i % shared_memory_buffer::NumItems] = i; data->mutex.post(); data->nstored.post(); } } catch(interprocess_exception &ex){ shared_memory_object::remove("shared_memory"); std::cout << ex.what() << std::endl; return 1; } //Erase shared memory shared_memory_object::remove("shared_memory"); return 0; }
The second process opens the shared memory and copies the received integers to it's own buffer:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <iostream> #include "doc_anonymous_semaphore_shared_data.hpp" using namespace boost::interprocess; int main () { try{ //Create a shared memory object. shared_memory_object shm (open_only //only create ,"shared_memory" //name ,read_write //read-write mode ); //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Obtain the shared structure shared_memory_buffer * data = static_cast<shared_memory_buffer*>(addr); const int NumMsg = 100; int extracted_data [NumMsg]; //Extract the data for(int i = 0; i < NumMsg; ++i){ data->nstored.wait(); data->mutex.wait(); extracted_data[i] = data->items[i % shared_memory_buffer::NumItems]; data->mutex.post(); data->nempty.post(); } } catch(interprocess_exception &ex){ shared_memory_object::remove("shared_memory"); std::cout << ex.what() << std::endl; return 1; } //Erase shared memory shared_memory_object::remove("shared_memory"); return 0; }
The same interprocess communication can be achieved with a condition variables and mutexes, but for several synchronization patterns, a semaphore is more efficient than a mutex/condition combination.
An upgradable mutex is an special mutex that offers more locking possibilities than a normal mutex. Sometimes, we can distinguish between reading the data and modifying the data. If just some threads need to modify the data, and a plain mutex is used to protect the data from concurrent access, concurrency is pretty limited: two threads that only read the data will be serialized instead of being executed concurrently.
If we allow concurrent access to threads that just read the data but we avoid concurrent access between threads that read and modify or between threads that modify, we can increase performance. This is specially true in applications where data reading is more common than data modification and the synchronized data reading code needs some time to execute. With an upgradable mutex we can acquire 3 lock types:
To sum up:
Table 8.5. Locking Possibilities
|
If a thread has acquired the... |
Other threads can acquire... |
|---|---|
|
Sharable lock |
many sharable locks and 1 upgradable lock |
|
Upgradable lock |
many sharable locks |
|
Exclusive lock |
no locks |
A thread that has acquired a lock can try to acquire another lock type atomically. All lock transitions are not guaranteed to succeed. Even if a transition is guaranteed to succeed, some transitions will block the thread waiting until other threads release the sharable locks. Atomically means that no other thread will acquire an Upgradable or Exclusive lock in the transition, so data is guaranteed to remain unchanged:
Table 8.6. Transition Possibilities
|
If a thread has acquired the... |
It can atomically release the previous lock and... |
|---|---|
|
Sharable lock |
try to obtain (not guaranteed) immediately the Exclusive lock if no other thread has exclusive or upgrable lock |
|
Sharable lock |
try to obtain (not guaranteed) immediately the Upgradable lock if no other thread has exclusive or upgrable lock |
|
Upgradable lock |
obtain the Exclusive lock when all sharable locks are released |
|
Upgradable lock |
obtain the Sharable lock immediately |
|
Exclusive lock |
obtain the Upgradable lock immediately |
|
Exclusive lock |
obtain the Sharable lock immediately |
As we can see, an upgradable mutex is a powerful synchronization utility that can improve the concurrency. However, if most of the time we have to modify the data, or the synchronized code section is very short, it's more efficient to use a plain mutex, since it has less overhead. Upgradable lock shines when the synchronized code section is bigger and there are more readers than modifiers.
All the upgradable mutex types from Boost.Interprocess implement the following operations:
Effects: The calling thread tries to obtain exclusive ownership of the mutex, and if another thread has exclusive, sharable or upgradable ownership of the mutex, it waits until it can obtain the ownership.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire exclusive ownership of the mutex without waiting. If no other thread has exclusive, sharable or upgradable ownership of the mutex this succeeds.
Returns: If it can acquire exclusive ownership immediately returns true. If it has to wait, returns false.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire exclusive ownership of the mutex waiting if necessary until no other thread has has exclusive, sharable or upgradable ownership of the mutex or abs_time is reached.
Returns: If acquires exclusive ownership, returns true. Otherwise returns false.
Throws: interprocess_exception on error.
Precondition: The thread must have exclusive ownership of the mutex.
Effects: The calling thread releases the exclusive ownership of the mutex.
Throws: An exception derived from interprocess_exception on error.
Effects: The calling thread tries to obtain sharable ownership of the mutex, and if another thread has exclusive or upgradable ownership of the mutex, waits until it can obtain the ownership.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire sharable ownership of the mutex without waiting. If no other thread has has exclusive or upgradable ownership of the mutex this succeeds.
Returns: If it can acquire sharable ownership immediately returns true. If it has to wait, returns false.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire sharable ownership of the mutex waiting if necessary until no other thread has has exclusive or upgradable ownership of the mutex or abs_time is reached.
Returns: If acquires sharable ownership, returns true. Otherwise returns false.
Throws: interprocess_exception on error.
Precondition: The thread must have sharable ownership of the mutex.
Effects: The calling thread releases the sharable ownership of the mutex.
Throws: An exception derived from interprocess_exception on error.
Effects: The calling thread tries to obtain upgradable ownership of the mutex, and if another thread has exclusive or upgradable ownership of the mutex, waits until it can obtain the ownership.
Throws: interprocess_exception on error.