...one of the most highly
regarded and expertly designed C++ library projects in the
world.
— Herb Sutter and Andrei
Alexandrescu, C++
Coding Standards
As mentioned before, the ability to shared memory between processes through memory mapped files or shared memory objects is not very useful if the access to that memory can't be effectively synchronized. This is the same problem that happens with thread-synchronization mechanisms, where heap memory and global variables are shared between threads, but the access to these resources needs to be synchronized typically through mutex and condition variables. Boost.Threads implements these synchronization utilities between threads inside the same process. Boost.Interprocess implements similar mechanisms to synchronize threads from different processes.
Boost.Interprocess presents two types of synchronization objects:
fstream
with
the name filename and another process opens that
file using another fstream
with the same filename argument. Each
process uses a different object to access to the resource, but both
processes are using the same underlying resource.
Each type has it's own advantages and disadvantages:
The main interface difference between named and anonymous utilities are the constructors. Usually anonymous utilities have only one constructor, whereas the named utilities have several constructors whose first argument is a special type that requests creation, opening or opening or creation of the underlying resource:
using namespace boost::interprocess; //Create the synchronization utility. If it previously //exists, throws an error NamedUtility(create_only, ...) //Open the synchronization utility. If it does not previously //exist, it's created. NamedUtility(open_or_create, ...) //Open the synchronization utility. If it does not previously //exist, throws an error. NamedUtility(open_only, ...)
On the other hand the anonymous synchronization utility can only be created and the processes must synchronize using other mechanisms who creates the utility:
using namespace boost::interprocess; //Create the synchronization utility. AnonymousUtility(...)
Apart from its named/anonymous nature, Boost.Interprocess presents the following synchronization utilities:
Mutex stands for mutual exclusion and it's the most basic form of synchronization between processes. Mutexes guarantee that only one thread can lock a given mutex. If a code section is surrounded by a mutex locking and unlocking, it's guaranteed that only a thread at a time executes that section of code. When that thread unlocks the mutex, other threads can enter to that code region:
//The mutex has been previously constructed lock_the_mutex(); //This code will be executed only by one thread //at a time. unlock_the_mutex();
A mutex can also be recursive or non-recursive:
All the mutex types from Boost.Interprocess implement the following operations:
Effects: The calling thread tries to obtain ownership of the mutex, and if another thread has ownership of the mutex, it waits until it can obtain the ownership. If a thread takes ownership of the mutex the mutex must be unlocked by the same thread. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.
Throws: interprocess_exception on error.
Effects: The calling thread tries to obtain ownership of the mutex, and if another thread has ownership of the mutex returns immediately. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.
Returns: If the thread acquires ownership of the mutex, returns true, if the another thread has ownership of the mutex, returns false.
Throws: interprocess_exception on error.
Effects: The calling thread will try to obtain exclusive ownership of the mutex if it can do so in until the specified time is reached. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.
Returns: If the thread acquires ownership of the mutex, returns true, if the timeout expires returns false.
Throws: interprocess_exception on error.
Precondition: The thread must have exclusive ownership of the mutex.
Effects: The calling thread releases the exclusive ownership of the mutex. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.
Throws: An exception derived from interprocess_exception on error.
Boost.Interprocess offers the following mutex types:
#include <boost/interprocess/sync/interprocess_mutex.hpp>
interprocess_mutex
:
A non-recursive, anonymous mutex that can be placed in shared memory
or memory mapped files.
#include <boost/interprocess/sync/interprocess_recursive_mutex.hpp>
interprocess_recursive_mutex
:
A recursive, anonymous mutex that can be placed in shared memory or
memory mapped files.
#include <boost/interprocess/sync/named_mutex.hpp>
named_mutex
:
A non-recursive, named mutex.
#include <boost/interprocess/sync/named_recursive_mutex.hpp>
named_recursive_mutex
:
A recursive, named mutex.
It's very important to unlock a mutex after the process has read or written the data. This can be difficult when dealing with exceptions, so usually mutexes are used with a scoped lock, a class that can guarantee that a mutex will always be unlocked even when an exception occurs. To use a scoped lock just include:
#include <boost/interprocess/sync/scoped_lock.hpp>
Basically, a scoped lock calls unlock() in its destructor, and a mutex is always unlocked when an exception occurs. Scoped lock has many constructors to lock, try_lock, timed_lock a mutex or not to lock it at all.
using namespace boost::interprocess; //Let's create any mutex type: MutexType mutex; { //This will lock the mutex scoped_lock<MutexType> lock(mutex); //Some code //The mutex will be unlocked here } { //This will try_lock the mutex scoped_lock<MutexType> lock(mutex, try_to_lock); //Check if the mutex has been successfully locked if(lock){ //Some code } //If the mutex was locked it will be unlocked } { boost::posix_time::ptime abs_time = ... //This will timed_lock the mutex scoped_lock<MutexType> lock(mutex, abs_time); //Check if the mutex has been successfully locked if(lock){ //Some code } //If the mutex was locked it will be unlocked }
For more information, check the scoped_lock's
reference
.
Imagine that two processes need to write traces to a cyclic buffer built in shared memory. Each process needs to obtain exclusive access to the cyclic buffer, write the trace and continue.
To protect the cyclic buffer, we can store a process shared mutex in the
cyclic buffer. Each process will lock the mutex before writing the data
and will write a flag when ends writing the traces (doc_anonymous_mutex_shared_data.hpp
header):
#include <boost/interprocess/sync/interprocess_mutex.hpp> struct shared_memory_log { enum { NumItems = 100 }; enum { LineSize = 100 }; shared_memory_log() : current_line(0) , end_a(false) , end_b(false) {} //Mutex to protect access to the queue boost::interprocess::interprocess_mutex mutex; //Items to fill char items[NumItems][LineSize]; int current_line; bool end_a; bool end_b; };
This is the process main process. Creates the shared memory, constructs the cyclic buffer and start writing traces:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include "doc_anonymous_mutex_shared_data.hpp" #include <iostream> #include <cstdio> using namespace boost::interprocess; int main () { try{ //Remove shared memory on construction and destruction struct shm_remove { shm_remove() { shared_memory_object::remove("MySharedMemory"); } ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); } } remover; //Create a shared memory object. shared_memory_object shm (create_only //only create ,"MySharedMemory" //name ,read_write //read-write mode ); //Set size shm.truncate(sizeof(shared_memory_log)); //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Construct the shared structure in memory shared_memory_log * data = new (addr) shared_memory_log; //Write some logs for(int i = 0; i < shared_memory_log::NumItems; ++i){ //Lock the mutex scoped_lock<interprocess_mutex> lock(data->mutex); std::sprintf(data->items[(data->current_line++) % shared_memory_log::NumItems] ,"%s_%d", "process_a", i); if(i == (shared_memory_log::NumItems-1)) data->end_a = true; //Mutex is released here } //Wait until the other process ends while(1){ scoped_lock<interprocess_mutex> lock(data->mutex); if(data->end_b) break; } } catch(interprocess_exception &ex){ std::cout << ex.what() << std::endl; return 1; } return 0; }
The second process opens the shared memory, obtains access to the cyclic buffer and starts writing traces:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include "doc_anonymous_mutex_shared_data.hpp" #include <iostream> #include <cstdio> using namespace boost::interprocess; int main () { //Remove shared memory on destruction struct shm_remove { ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); } } remover; //Open the shared memory object. shared_memory_object shm (open_only //only create ,"MySharedMemory" //name ,read_write //read-write mode ); //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Construct the shared structure in memory shared_memory_log * data = static_cast<shared_memory_log*>(addr); //Write some logs for(int i = 0; i < 100; ++i){ //Lock the mutex scoped_lock<interprocess_mutex> lock(data->mutex); std::sprintf(data->items[(data->current_line++) % shared_memory_log::NumItems] ,"%s_%d", "process_a", i); if(i == (shared_memory_log::NumItems-1)) data->end_b = true; //Mutex is released here } //Wait until the other process ends while(1){ scoped_lock<interprocess_mutex> lock(data->mutex); if(data->end_a) break; } return 0; }
As we can see, a mutex is useful to protect data but not to notify an event to another process. For this, we need a condition variable, as we will see in the next section.
Now imagine that two processes want to write a trace to a file. First they write their name, and after that they write the message. Since the operating system can interrupt a process in any moment we can mix parts of the messages of both processes, so we need a way to write the whole message to the file atomically. To achieve this, we can use a named mutex so that each process locks the mutex before writing:
#include <boost/interprocess/sync/scoped_lock.hpp> #include <boost/interprocess/sync/named_mutex.hpp> #include <fstream> #include <iostream> #include <cstdio> int main () { using namespace boost::interprocess; try{ struct file_remove { file_remove() { std::remove("file_name"); } ~file_remove(){ std::remove("file_name"); } } file_remover; struct mutex_remove { mutex_remove() { named_mutex::remove("fstream_named_mutex"); } ~mutex_remove(){ named_mutex::remove("fstream_named_mutex"); } } remover; //Open or create the named mutex named_mutex mutex(open_or_create, "fstream_named_mutex"); std::ofstream file("file_name"); for(int i = 0; i < 10; ++i){ //Do some operations... //Write to file atomically scoped_lock<named_mutex> lock(mutex); file << "Process name, "; file << "This is iteration #" << i; file << std::endl; } } catch(interprocess_exception &ex){ std::cout << ex.what() << std::endl; return 1; } return 0; }
In the previous example, a mutex is used to lock but we can't use it to wait efficiently until the condition to continue is met. A condition variable can do two things:
Waiting in a condition variable is always associated with a mutex. The mutex must be locked prior to waiting on the condition. When waiting on the condition variable, the thread unlocks the mutex and waits atomically.
When the thread returns from a wait function (because of a signal or a timeout, for example) the mutex object is again locked.
Boost.Interprocess offers the following condition types:
#include <boost/interprocess/sync/interprocess_condition.hpp>
interprocess_condition
:
An anonymous condition variable that can be placed in shared memory
or memory mapped files to be used with boost::interprocess::interprocess_mutex
.
#include <boost/interprocess/sync/named_condition.hpp>
named_condition
:
A named condition variable to be used with named_mutex
.
Named conditions are similar to anonymous conditions, but they are used in combination with named mutexes. Several times, we don't want to store synchronization objects with the synchronized data:
Imagine that a process that writes a trace to a simple shared memory buffer that another process prints one by one. The first process writes the trace and waits until the other process prints the data. To achieve this, we can use two condition variables: the first one is used to block the sender until the second process prints the message and the second one to block the receiver until the buffer has a trace to print.
The shared memory trace buffer (doc_anonymous_condition_shared_data.hpp):
#include <boost/interprocess/sync/interprocess_mutex.hpp> #include <boost/interprocess/sync/interprocess_condition.hpp> struct trace_queue { enum { LineSize = 100 }; trace_queue() : message_in(false) {} //Mutex to protect access to the queue boost::interprocess::interprocess_mutex mutex; //Condition to wait when the queue is empty boost::interprocess::interprocess_condition cond_empty; //Condition to wait when the queue is full boost::interprocess::interprocess_condition cond_full; //Items to fill char items[LineSize]; //Is there any message bool message_in; };
This is the process main process. Creates the shared memory, places there the buffer and starts writing messages one by one until it writes "last message" to indicate that there are no more messages to print:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include <iostream> #include <cstdio> #include "doc_anonymous_condition_shared_data.hpp" using namespace boost::interprocess; int main () { //Erase previous shared memory and schedule erasure on exit struct shm_remove { shm_remove() { shared_memory_object::remove("MySharedMemory"); } ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); } } remover; //Create a shared memory object. shared_memory_object shm (create_only //only create ,"MySharedMemory" //name ,read_write //read-write mode ); try{ //Set size shm.truncate(sizeof(trace_queue)); //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Construct the shared structure in memory trace_queue * data = new (addr) trace_queue; const int NumMsg = 100; for(int i = 0; i < NumMsg; ++i){ scoped_lock<interprocess_mutex> lock(data->mutex); if(data->message_in){ data->cond_full.wait(lock); } if(i == (NumMsg-1)) std::sprintf(data->items, "%s", "last message"); else std::sprintf(data->items, "%s_%d", "my_trace", i); //Notify to the other process that there is a message data->cond_empty.notify_one(); //Mark message buffer as full data->message_in = true; } } catch(interprocess_exception &ex){ std::cout << ex.what() << std::endl; return 1; } return 0; }
The second process opens the shared memory and prints each message until the "last message" message is received:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include <iostream> #include <cstring> #include "doc_anonymous_condition_shared_data.hpp" using namespace boost::interprocess; int main () { //Create a shared memory object. shared_memory_object shm (open_only //only create ,"MySharedMemory" //name ,read_write //read-write mode ); try{ //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Obtain a pointer to the shared structure trace_queue * data = static_cast<trace_queue*>(addr); //Print messages until the other process marks the end bool end_loop = false; do{ scoped_lock<interprocess_mutex> lock(data->mutex); if(!data->message_in){ data->cond_empty.wait(lock); } if(std::strcmp(data->items, "last message") == 0){ end_loop = true; } else{ //Print the message std::cout << data->items << std::endl; //Notify the other process that the buffer is empty data->message_in = false; data->cond_full.notify_one(); } } while(!end_loop); } catch(interprocess_exception &ex){ std::cout << ex.what() << std::endl; return 1; } return 0; }
With condition variables, a process can block if it can't continue the work, and when the conditions to continue are met another process can wake it.
A semaphore is a synchronization mechanism between processes based in an internal count that offers two basic operations:
If the initial semaphore count is initialized to 1, a Wait operation is equivalent to a mutex locking and Post is equivalent to a mutex unlocking. This type of semaphore is known as a binary semaphore.
Although semaphores can be used like mutexes, they have a unique feature: unlike mutexes, a Post operation need not be executed by the same thread/process that executed the Wait operation.
Boost.Interprocess offers the following semaphore types:
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
interprocess_semaphore
:
An anonymous semaphore that can be placed in shared memory or memory
mapped files.
#include <boost/interprocess/sync/named_semaphore.hpp>
named_semaphore
:
A named semaphore.
We will implement an integer array in shared memory that will be used to transfer data from one process to another process. The first process will write some integers to the array and the process will block if the array is full.
The second process will copy the transmitted data to its own buffer, blocking if there is no new data in the buffer.
This is the shared integer array (doc_anonymous_semaphore_shared_data.hpp):
#include <boost/interprocess/sync/interprocess_semaphore.hpp> struct shared_memory_buffer { enum { NumItems = 10 }; shared_memory_buffer() : mutex(1), nempty(NumItems), nstored(0) {} //Semaphores to protect and synchronize access boost::interprocess::interprocess_semaphore mutex, nempty, nstored; //Items to fill int items[NumItems]; };
This is the process main process. Creates the shared memory, places there the integer array and starts integers one by one, blocking if the array is full:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <iostream> #include "doc_anonymous_semaphore_shared_data.hpp" using namespace boost::interprocess; int main () { //Remove shared memory on construction and destruction struct shm_remove { shm_remove() { shared_memory_object::remove("MySharedMemory"); } ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); } } remover; //Create a shared memory object. shared_memory_object shm (create_only //only create ,"MySharedMemory" //name ,read_write //read-write mode ); //Set size shm.truncate(sizeof(shared_memory_buffer)); //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Construct the shared structure in memory shared_memory_buffer * data = new (addr) shared_memory_buffer; const int NumMsg = 100; //Insert data in the array for(int i = 0; i < NumMsg; ++i){ data->nempty.wait(); data->mutex.wait(); data->items[i % shared_memory_buffer::NumItems] = i; data->mutex.post(); data->nstored.post(); } return 0; }
The second process opens the shared memory and copies the received integers to it's own buffer:
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <iostream> #include "doc_anonymous_semaphore_shared_data.hpp" using namespace boost::interprocess; int main () { //Remove shared memory on destruction struct shm_remove { ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); } } remover; //Create a shared memory object. shared_memory_object shm (open_only //only create ,"MySharedMemory" //name ,read_write //read-write mode ); //Map the whole shared memory in this process mapped_region region (shm //What to map ,read_write //Map it as read-write ); //Get the address of the mapped region void * addr = region.get_address(); //Obtain the shared structure shared_memory_buffer * data = static_cast<shared_memory_buffer*>(addr); const int NumMsg = 100; int extracted_data [NumMsg]; //Extract the data for(int i = 0; i < NumMsg; ++i){ data->nstored.wait(); data->mutex.wait(); extracted_data[i] = data->items[i % shared_memory_buffer::NumItems]; data->mutex.post(); data->nempty.post(); } return 0; }
The same interprocess communication can be achieved with a condition variables and mutexes, but for several synchronization patterns, a semaphore is more efficient than a mutex/condition combination.
An upgradable mutex is a special mutex that offers more locking possibilities than a normal mutex. Sometimes, we can distinguish between reading the data and modifying the data. If just some threads need to modify the data, and a plain mutex is used to protect the data from concurrent access, concurrency is pretty limited: two threads that only read the data will be serialized instead of being executed concurrently.
If we allow concurrent access to threads that just read the data but we avoid concurrent access between threads that read and modify or between threads that modify, we can increase performance. This is specially true in applications where data reading is more common than data modification and the synchronized data reading code needs some time to execute. With an upgradable mutex we can acquire 3 lock types:
To sum up:
Table 11.5. Locking Possibilities
If a thread has acquired the... |
Other threads can acquire... |
---|---|
Sharable lock |
many sharable locks and 1 upgradable lock |
Upgradable lock |
many sharable locks |
Exclusive lock |
no locks |
A thread that has acquired a lock can try to acquire another lock type atomically. All lock transitions are not guaranteed to succeed. Even if a transition is guaranteed to succeed, some transitions will block the thread waiting until other threads release the sharable locks. Atomically means that no other thread will acquire an Upgradable or Exclusive lock in the transition, so data is guaranteed to remain unchanged:
Table 11.6. Transition Possibilities
If a thread has acquired the... |
It can atomically release the previous lock and... |
---|---|
Sharable lock |
try to obtain (not guaranteed) immediately the Exclusive lock if no other thread has exclusive or upgrable lock |
Sharable lock |
try to obtain (not guaranteed) immediately the Upgradable lock if no other thread has exclusive or upgrable lock |
Upgradable lock |
obtain the Exclusive lock when all sharable locks are released |
Upgradable lock |
obtain the Sharable lock immediately |
Exclusive lock |
obtain the Upgradable lock immediately |
Exclusive lock |
obtain the Sharable lock immediately |
As we can see, an upgradable mutex is a powerful synchronization utility that can improve the concurrency. However, if most of the time we have to modify the data, or the synchronized code section is very short, it's more efficient to use a plain mutex, since it has less overhead. Upgradable lock shines when the synchronized code section is bigger and there are more readers than modifiers.
All the upgradable mutex types from Boost.Interprocess implement the following operations:
Effects: The calling thread tries to obtain exclusive ownership of the mutex, and if another thread has exclusive, sharable or upgradable ownership of the mutex, it waits until it can obtain the ownership.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire exclusive ownership of the mutex without waiting. If no other thread has exclusive, sharable or upgradable ownership of the mutex this succeeds.
Returns: If it can acquire exclusive ownership immediately returns true. If it has to wait, returns false.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire exclusive ownership of the mutex waiting if necessary until no other thread has has exclusive, sharable or upgradable ownership of the mutex or abs_time is reached.
Returns: If acquires exclusive ownership, returns true. Otherwise returns false.
Throws: interprocess_exception on error.
Precondition: The thread must have exclusive ownership of the mutex.
Effects: The calling thread releases the exclusive ownership of the mutex.
Throws: An exception derived from interprocess_exception on error.
Effects: The calling thread tries to obtain sharable ownership of the mutex, and if another thread has exclusive or upgradable ownership of the mutex, waits until it can obtain the ownership.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire sharable ownership of the mutex without waiting. If no other thread has has exclusive or upgradable ownership of the mutex this succeeds.
Returns: If it can acquire sharable ownership immediately returns true. If it has to wait, returns false.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire sharable ownership of the mutex waiting if necessary until no other thread has has exclusive or upgradable ownership of the mutex or abs_time is reached.
Returns: If acquires sharable ownership, returns true. Otherwise returns false.
Throws: interprocess_exception on error.
Precondition: The thread must have sharable ownership of the mutex.
Effects: The calling thread releases the sharable ownership of the mutex.
Throws: An exception derived from interprocess_exception on error.
Effects: The calling thread tries to obtain upgradable ownership of the mutex, and if another thread has exclusive or upgradable ownership of the mutex, waits until it can obtain the ownership.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire upgradable ownership of the mutex without waiting. If no other thread has has exclusive or upgradable ownership of the mutex this succeeds.
Returns: If it can acquire upgradable ownership immediately returns true. If it has to wait, returns false.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire upgradable ownership of the mutex waiting if necessary until no other thread has has exclusive or upgradable ownership of the mutex or abs_time is reached.
Returns: If acquires upgradable ownership, returns true. Otherwise returns false.
Throws: interprocess_exception on error.
Precondition: The thread must have upgradable ownership of the mutex.
Effects: The calling thread releases the upgradable ownership of the mutex.
Throws: An exception derived from interprocess_exception on error.
Precondition: The thread must have exclusive ownership of the mutex.
Effects: The thread atomically releases exclusive ownership and acquires upgradable ownership. This operation is non-blocking.
Throws: An exception derived from interprocess_exception on error.
Precondition: The thread must have exclusive ownership of the mutex.
Effects: The thread atomically releases exclusive ownership and acquires sharable ownership. This operation is non-blocking.
Throws: An exception derived from interprocess_exception on error.
Precondition: The thread must have upgradable ownership of the mutex.
Effects: The thread atomically releases upgradable ownership and acquires sharable ownership. This operation is non-blocking.
Throws: An exception derived from interprocess_exception on error.
Precondition: The thread must have upgradable ownership of the mutex.
Effects: The thread atomically releases upgradable ownership and acquires exclusive ownership. This operation will block until all threads with sharable ownership release it.
Throws: An exception derived from interprocess_exception on error.
Precondition: The thread must have upgradable ownership of the mutex.
Effects: The thread atomically releases upgradable ownership and tries to acquire exclusive ownership. This operation will fail if there are threads with sharable ownership, but it will maintain upgradable ownership.
Returns: If acquires exclusive ownership, returns true. Otherwise returns false.
Throws: An exception derived from interprocess_exception on error.
Precondition: The thread must have upgradable ownership of the mutex.
Effects: The thread atomically releases upgradable ownership and tries to acquire exclusive ownership, waiting if necessary until abs_time. This operation will fail if there are threads with sharable ownership or timeout reaches, but it will maintain upgradable ownership.
Returns: If acquires exclusive ownership, returns true. Otherwise returns false.
Throws: An exception derived from interprocess_exception on error.
Precondition: The thread must have sharable ownership of the mutex.
Effects: The thread atomically releases sharable ownership and tries to acquire exclusive ownership. This operation will fail if there are threads with sharable or upgradable ownership, but it will maintain sharable ownership.
Returns: If acquires exclusive ownership, returns true. Otherwise returns false.
Throws: An exception derived from interprocess_exception on error.
Precondition: The thread must have sharable ownership of the mutex.
Effects: The thread atomically releases sharable ownership and tries to acquire upgradable ownership. This operation will fail if there are threads with sharable or upgradable ownership, but it will maintain sharable ownership.
Returns: If acquires upgradable ownership, returns true. Otherwise returns false.
Throws: An exception derived from interprocess_exception on error.
Boost.Interprocess offers the following upgradable mutex types:
#include <boost/interprocess/sync/interprocess_upgradable_mutex.hpp>
interprocess_upgradable_mutex
:
A non-recursive, anonymous upgradable mutex that can be placed in shared
memory or memory mapped files.
#include <boost/interprocess/sync/named_upgradable_mutex.hpp>
named_upgradable_mutex
:
A non-recursive, named upgradable mutex.
As with plain mutexes, it's important to release the acquired lock even
in the presence of exceptions. Boost.Interprocess
mutexes are best used with the scoped_lock
utility, and this class only offers exclusive locking.
As we have sharable locking and upgradable locking with upgradable mutexes,
we have two new utilities: sharable_lock
and upgradable_lock
.
Both classes are similar to scoped_lock
but sharable_lock
acquires
the sharable lock in the constructor and upgradable_lock
acquires the upgradable lock in the constructor.
These two utilities can be use with any synchronization object that offers
the needed operations. For example, a user defined mutex type with no upgradable
locking features can use sharable_lock
if the synchronization object offers lock_sharable()
and unlock_sharable() operations:
#include <boost/interprocess/sync/sharable_lock.hpp>
#include <boost/interprocess/sync/upgradable_lock.hpp>
sharable_lock
calls unlock_sharable() in its destructor, and upgradable_lock
calls unlock_upgradable()
in its destructor, so the upgradable mutex is always unlocked when an exception
occurs. Scoped lock has many constructors to lock, try_lock, timed_lock
a mutex or not to lock it at all.
using namespace boost::interprocess; //Let's create any mutex type: MutexType mutex; { //This will call lock_sharable() sharable_lock<MutexType> lock(mutex); //Some code //The mutex will be unlocked here } { //This won't lock the mutex() sharable_lock<MutexType> lock(mutex, defer_lock); //Lock it on demand. This will call lock_sharable() lock.lock(); //Some code //The mutex will be unlocked here } { //This will call try_lock_sharable() sharable_lock<MutexType> lock(mutex, try_to_lock); //Check if the mutex has been successfully locked if(lock){ //Some code } //If the mutex was locked it will be unlocked } { boost::posix_time::ptime abs_time = ... //This will call timed_lock_sharable() scoped_lock<MutexType> lock(mutex, abs_time); //Check if the mutex has been successfully locked if(lock){ //Some code } //If the mutex was locked it will be unlocked } { //This will call lock_upgradable() upgradable_lock<MutexType> lock(mutex); //Some code //The mutex will be unlocked here } { //This won't lock the mutex() upgradable_lock<MutexType> lock(mutex, defer_lock); //Lock it on demand. This will call lock_upgradable() lock.lock(); //Some code //The mutex will be unlocked here } { //This will call try_lock_upgradable() upgradable_lock<MutexType> lock(mutex, try_to_lock); //Check if the mutex has been successfully locked if(lock){ //Some code } //If the mutex was locked it will be unlocked } { boost::posix_time::ptime abs_time = ... //This will call timed_lock_upgradable() scoped_lock<MutexType> lock(mutex, abs_time); //Check if the mutex has been successfully locked if(lock){ //Some code } //If the mutex was locked it will be unlocked }
upgradable_lock
and sharable_lock
offer more features and operations, see their reference for more informations
Scoped locks and similar utilities offer simple resource management possibilities,
but with advanced mutex types like upgradable mutexes, there are operations
where an acquired lock type is released and another lock type is acquired
atomically. This is implemented by upgradable mutex operations like unlock_and_lock_sharable()
.
These operations can be managed more effectively using lock transfer operations. A lock transfer operations explicitly indicates that a mutex owned by a lock is transferred to another lock executing atomic unlocking plus locking operations.
Imagine that a thread modifies some data in the beginning but after that, it has to just read it in a long time. The code can acquire the exclusive lock, modify the data and atomically release the exclusive lock and acquire the sharable lock. With these sequence we guarantee that no other thread can modify the data in the transition and that more readers can acquire sharable lock, increasing concurrency. Without lock transfer operations, this would be coded like this:
using boost::interprocess; interprocess_upgradable_mutex mutex; //Acquire exclusive lock mutex.lock(); //Modify data //Atomically release exclusive lock and acquire sharable lock. //More threads can acquire the sharable lock and read the data. mutex.unlock_and_lock_sharable(); //Read data //Explicit unlocking mutex.unlock_sharable();
This can be simple, but in the presence of exceptions, it's complicated to know what type of lock the mutex had when the exception was thrown and what function we should call to unlock it:
try{ //Mutex operations } catch(...){ //What should we call? "unlock()" or "unlock_sharable()" //Is the mutex locked? }
We can use lock transfer to simplify all this management:
using boost::interprocess; interprocess_upgradable_mutex mutex; //Acquire exclusive lock scoped_lock s_lock(mutex); //Modify data //Atomically release exclusive lock and acquire sharable lock. //More threads can acquire the sharable lock and read the data. sharable_lock(move(s_lock)); //Read data //The lock is automatically unlocked calling the appropriate unlock //function even in the presence of exceptions. //If the mutex was not locked, no function is called.
As we can see, even if an exception is thrown at any moment, the mutex
will be automatically unlocked calling the appropriate unlock()
or unlock_sharable()
method.
There are many lock transfer operations that we can classify according to the operations presented in the upgradable mutex operations:
Transfers to scoped_lock
are guaranteed to succeed only from an upgradable_lock
and only if a blocking operation is requested, due to the fact that this
operation needs to wait until all sharable locks are released. The user
can also use "try" or "timed" transfer to avoid infinite
locking, but succeed is not guaranteed.
A conversion from a sharable_lock
is never guaranteed and thus, only a try operation is permitted:
//Conversions to scoped_lock { upgradable_lock<Mutex> u_lock(mut); //This calls unlock_upgradable_and_lock() scoped_lock<Mutex> e_lock(move(u_lock)); } { upgradable_lock<Mutex> u_lock(mut); //This calls try_unlock_upgradable_and_lock() scoped_lock<Mutex> e_lock(move(u_lock, try_to_lock)); } { boost::posix_time::ptime t = test::delay(100); upgradable_lock<Mutex> u_lock(mut); //This calls timed_unlock_upgradable_and_lock() scoped_lock<Mutex> e_lock(move(u_lock)); } { sharable_lock<Mutex> s_lock(mut); //This calls try_unlock_sharable_and_lock() scoped_lock<Mutex> e_lock(move(s_lock, try_to_lock)); }
A transfer to an upgradable_lock
is guaranteed to succeed only from a scoped_lock
since scoped locking is a more restrictive locking than an upgradable
locking. This operation is also non-blocking.
A transfer from a sharable_lock
is not guaranteed and only a "try" operation is permitted:
//Conversions to upgradable { sharable_lock<Mutex> s_lock(mut); //This calls try_unlock_sharable_and_lock_upgradable() upgradable_lock<Mutex> u_lock(move(s_lock, try_to_lock)); } { scoped_lock<Mutex> e_lock(mut); //This calls unlock_and_lock_upgradable() upgradable_lock<Mutex> u_lock(move(e_lock)); }
All transfers to a sharable_lock
are guaranteed to succeed since both upgradable_lock
and scoped_lock
are more
restrictive than sharable_lock
.
These operations are also non-blocking:
//Conversions to sharable_lock { upgradable_lock<Mutex> u_lock(mut); //This calls unlock_upgradable_and_lock_sharable() sharable_lock<Mutex> s_lock(move(u_lock)); } { scoped_lock<Mutex> e_lock(mut); //This calls unlock_and_lock_sharable() sharable_lock<Mutex> s_lock(move(e_lock)); }
In the previous examples, the mutex used in the transfer operation was previously locked:
Mutex mut; //This calls mut.lock() scoped_lock<Mutex> e_lock(mut); //This calls unlock_and_lock_sharable() sharable_lock<Mutex> s_lock(move(e_lock)); }
but it's possible to execute the transfer with an unlocked source, due
to explicit unlocking, a try, timed or a defer_lock
constructor:
//These operations can leave the mutex unlocked! { //Try might fail scoped_lock<Mutex> e_lock(mut, try_to_lock); sharable_lock<Mutex> s_lock(move(e_lock)); } { //Timed operation might fail scoped_lock<Mutex> e_lock(mut, time); sharable_lock<Mutex> s_lock(move(e_lock)); } { //Avoid mutex locking scoped_lock<Mutex> e_lock(mut, defer_lock); sharable_lock<Mutex> s_lock(move(e_lock)); } { //Explicitly call unlock scoped_lock<Mutex> e_lock(mut); e_lock.unlock(); //Mutex was explicitly unlocked sharable_lock<Mutex> s_lock(move(e_lock)); }
If the source mutex was not locked:
unlock_xxx_and_lock_xxx
operation.
{ scoped_lock<Mutex> e_lock(mut, defer_lock); sharable_lock<Mutex> s_lock(move(e_lock)); //Assertions assert(e_lock.mutex() == 0); assert(s_lock.mutex() != 0); assert(e_lock.owns() == false); }
When executing a lock transfer, the operation can fail:
In the first case, the mutex ownership is not transferred and the source lock's destructor will unlock the mutex:
{ scoped_lock<Mutex> e_lock(mut, defer_lock); //This operations throws because //"unlock_and_lock_sharable()" throws!!! sharable_lock<Mutex> s_lock(move(e_lock)); //Some code ... //e_lock's destructor will call "unlock()" }
In the second case, if an internal "try" or "timed" operation fails (returns "false") then the mutex ownership is not transferred, the source lock is unchanged and the target lock's state will the same as a default construction:
{ sharable_lock<Mutex> s_lock(mut); //Internal "try_unlock_sharable_and_lock_upgradable()" returns false upgradable_lock<Mutex> u_lock(move(s_lock, try_to_lock)); assert(s_lock.mutex() == &mut); assert(s_lock.owns() == true); assert(u_lock.mutex() == 0); assert(u_lock.owns() == false); //u_lock's destructor does nothing //s_lock's destructor calls "unlock()" }
A file lock is an interprocess synchronization mechanism to protect concurrent writes and reads to files using a mutex embedded in the file. This embedded mutex has sharable and exclusive locking capabilities. With a file lock, an existing file can be used as a mutex without the need of creating additional synchronization objects to control concurrent file reads or writes.
Generally speaking, we can have two file locking capabilities:
Boost.Interprocess implements advisory blocking because of portability reasons. This means that every process accessing to a file concurrently, must cooperate using file locks to synchronize the access.
In some systems file locking can be even further refined, leading to record locking, where a user can specify a byte range within the file where the lock is applied. This allows concurrent write access by several processes if they need to access a different byte range in the file. Boost.Interprocess does not offer record locking for the moment, but might offer it in the future. To use a file lock just include:
#include <boost/interprocess/sync/file_lock.hpp>
A file locking is a class that has process lifetime. This means that if a process holding a file lock ends or crashes, the operating system will automatically unlock it. This feature is very useful in some situations where we want to assure automatic unlocking even when the process crashes and avoid leaving blocked resources in the system. A file lock is constructed using the name of the file as an argument:
#include <boost/interprocess/sync/file_lock.hpp> int main() { //This throws if the file does not exist or it can't //open it with read-write access! boost::interprocess::file_lock flock("my_file"); return 0; }
File locking has normal mutex operations plus sharable locking capabilities. This means that we can have multiple readers holding the sharable lock and writers holding the exclusive lock waiting until the readers end their job.
However, file locking does not support upgradable locking or promotion or demotion (lock transfers), so it's more limited than an upgradable lock. These are the operations:
Effects: The calling thread tries to obtain exclusive ownership of the file lock, and if another thread has exclusive or sharable ownership of the mutex, it waits until it can obtain the ownership.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire exclusive ownership of the file lock without waiting. If no other thread has exclusive or sharable ownership of the file lock, this succeeds.
Returns: If it can acquire exclusive ownership immediately returns true. If it has to wait, returns false.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire exclusive ownership of the file lock waiting if necessary until no other thread has has exclusive or sharable ownership of the file lock or abs_time is reached.
Returns: If acquires exclusive ownership, returns true. Otherwise returns false.
Throws: interprocess_exception on error.
Precondition: The thread must have exclusive ownership of the file lock.
Effects: The calling thread releases the exclusive ownership of the file lock.
Throws: An exception derived from interprocess_exception on error.
Effects: The calling thread tries to obtain sharable ownership of the file lock, and if another thread has exclusive ownership of the file lock, waits until it can obtain the ownership.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire sharable ownership of the file lock without waiting. If no other thread has has exclusive ownership of the file lock, this succeeds.
Returns: If it can acquire sharable ownership immediately returns true. If it has to wait, returns false.
Throws: interprocess_exception on error.
Effects: The calling thread tries to acquire sharable ownership of the file lock waiting if necessary until no other thread has has exclusive ownership of the file lock or abs_time is reached.
Returns: If acquires sharable ownership, returns true. Otherwise returns false.
Throws: interprocess_exception on error.
Precondition: The thread must have sharable ownership of the file lock.
Effects: The calling thread releases the sharable ownership of the file lock.
Throws: An exception derived from interprocess_exception on error.
For more file locking methods, please file_lock
reference
.
scoped_lock
and sharable_lock
can be used to make file locking easier in the presence of exceptions,
just like with mutexes:
#include <boost/interprocess/sync/file_lock.hpp> #include <boost/interprocess/sync/sharable_lock.hpp> //... using namespace boost::interprocess; //This process reads the file // ... //Open the file lock file_lock f_lock("my_file"); { //Construct a sharable lock with the filel lock. //This will call "f_lock.sharable_lock()". sharable_lock<file_lock> sh_lock(f_lock); //Now read the file... //The sharable lock is automatically released by //sh_lock's destructor }
#include <boost/interprocess/sync/file_lock.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> //... using namespace boost::interprocess; //This process writes the file // ... //Open the file lock file_lock f_lock("my_file"); { //Construct a sharable lock with the filel lock. //This will call "f_lock.lock()". scoped_lock<file_lock> e_lock(f_lock); //Now write the file... //The exclusive lock is automatically released by //e_lock's destructor }
However, lock transfers are only allowed between same type of locks, that
is, from a sharable lock to another sharable lock or from a scoped lock
to another scoped lock. A transfer from a scoped lock to a sharable lock
is not allowed, because file_lock
has no lock promotion or demotion functions like unlock_and_lock_sharable()
. This will produce a compilation error:
//Open the file lock file_lock f_lock("my_file"); scoped_lock<file_lock> e_lock(f_lock); //Compilation error, f_lock has no "unlock_and_lock_sharable()" member! sharable_lock<file_lock> e_lock(move(f_lock));
If you plan to use file locks just like named mutexes, be careful, because portable file locks have synchronization limitations, mainly because different implementations (POSIX, Windows) offer different guarantees. Interprocess file locks have the following limitations:
file_lock
synchronizes two threads from the same process.
file_lock
objects pointing to the same file.
The first limitation comes mainly from POSIX, since a file handle is a
per-process attribute and not a per-thread attribute. This means that if
a thread uses a file_lock
object to lock a file, other threads will see the file as locked. Windows
file locking mechanism, on the other hand, offer thread-synchronization
guarantees so a thread trying to lock the already locked file, would block.
The second limitation comes from the fact that file locking synchronization
state is tied with a single file descriptor in Windows. This means that
if two file_lock
objects
are created pointing to the same file, no synchronization is guaranteed.
In POSIX, when two file descriptors are used to lock a file if a descriptor
is closed, all file locks set by the calling process are cleared.
To sum up, if you plan to use portable file locking in your processes, use the following restrictions:
file_lock
object per process.
As we've seen file locking can be useful to synchronize two processes, but make sure data is written to the file before unlocking the file lock. Take in care that iostream classes do some kind of buffering, so if you want to make sure that other processes can see the data you've written, you have the following alternatives:
fflush
if you are using standard
C functions or the flush()
member function when using C++ iostreams.
In windows you can't even use another class to access the same file.
//... using namespace boost::interprocess; //This process writes the file // ... //Open the file lock fstream file("my_file") file_lock f_lock("my_lock_file"); { scoped_lock<file_lock> e_lock(f_lock); //Now write the file... //Flush data before unlocking the exclusive lock file.flush(); }
A message queue is similar to a list of messages. Threads can put messages in the queue and they can also remove messages from the queue. Each message can have also a priority so that higher priority messages are read before lower priority messages. Each message has some attributes:
A thread can send a message to or receive a message from the message queue using 3 methods:
A message queue just copies raw bytes between processes
and does not send objects. This means that if we want to send an object
using a message queue the object must be binary serializable.
For example, we can send integers between processes but not
a std::string
. You should use Boost.Serialization
or use advanced Boost.Interprocess mechanisms
to send complex data between processes.
The Boost.Interprocess message queue is a named interprocess communication: the message queue is created with a name and it's opened with a name, just like a file. When creating a message queue, the user must specify the maximum message size and the maximum message number that the message queue can store. These parameters will define the resources (for example the size of the shared memory used to implement the message queue if shared memory is used).
using boost::interprocess; //Create a message_queue. If the queue //exists throws an exception message_queue mq (create_only //only create ,"message_queue" //name ,100 //max message number ,100 //max message size );
using boost::interprocess; //Creates or opens a message_queue. If the queue //does not exist creates it, otherwise opens it. //Message number and size are ignored if the queue //is opened message_queue mq (open_or_create //open or create ,"message_queue" //name ,100 //max message number ,100 //max message size );
using boost::interprocess; //Opens a message_queue. If the queue //does not exist throws an exception. message_queue mq (open_only //only open ,"message_queue" //name );
The message queue is explicitly removed calling the static remove
function:
using boost::interprocess; message_queue::remove("message_queue");
The function can fail if the message queue is still being used by any process.
To use a message queue you must include the following header:
#include <boost/interprocess/ipc/message_queue.hpp>
In the following example, the first process creates the message queue, and writes an array of integers on it. The other process just reads the array and checks that the sequence number is correct. This is the first process:
#include <boost/interprocess/ipc/message_queue.hpp> #include <iostream> #include <vector> using namespace boost::interprocess; int main () { try{ //Erase previous message queue message_queue::remove("message_queue"); //Create a message_queue. message_queue mq (create_only //only create ,"message_queue" //name ,100 //max message number ,sizeof(int) //max message size ); //Send 100 numbers for(int i = 0; i < 100; ++i){ mq.send(&i, sizeof(i), 0); } } catch(interprocess_exception &ex){ std::cout << ex.what() << std::endl; return 1; } return 0; }
This is the second process:
#include <boost/interprocess/ipc/message_queue.hpp> #include <iostream> #include <vector> using namespace boost::interprocess; int main () { try{ //Open a message queue. message_queue mq (open_only //only create ,"message_queue" //name ); unsigned int priority; message_queue::size_type recvd_size; //Receive 100 numbers for(int i = 0; i < 100; ++i){ int number; mq.receive(&number, sizeof(number), recvd_size, priority); if(number != i || recvd_size != sizeof(number)) return 1; } } catch(interprocess_exception &ex){ message_queue::remove("message_queue"); std::cout << ex.what() << std::endl; return 1; } message_queue::remove("message_queue"); return 0; }
To know more about this class and all its operations, please see the message_queue
class
reference.