Boost C++ Libraries

Synchronization mechanisms

Synchronization mechanisms overview
Mutexes
Conditions
Semaphores
Upgradable Mutexes
Lock Transfers Through Move Semantics
File Locks
Message Queue

As mentioned before, the ability to share memory between processes through memory mapped files or shared memory objects is not very useful if access to that memory can't be effectively synchronized. This is the same problem that arises with thread synchronization, where heap memory and global variables are shared between threads but access to these resources needs to be synchronized, typically through mutexes and condition variables. Boost.Threads implements these synchronization utilities between threads inside the same process. Boost.Interprocess implements similar mechanisms to synchronize threads from different processes.

Boost.Interprocess presents two types of synchronization objects:

  • Named utilities: When two processes want to create an object of such a type, both processes must create or open an object using the same name. This is similar to creating or opening files: a process creates a file using an fstream with the name filename and another process opens that file using another fstream with the same filename argument. Each process uses a different object to access the resource, but both processes are using the same underlying resource.
  • Anonymous utilities: Since these utilities have no name, two processes must share the same object through shared memory or memory mapped files. This is similar to traditional thread synchronization objects: both processes share the same object. Unlike thread synchronization, where global variables and heap memory are shared between threads of the same process, sharing objects between two threads from different processes is only possible through mapped regions that map the same mappable resource (for example, shared memory or memory mapped files).

Each type has its own advantages and disadvantages:

  • Named utilities are easier to handle for simple synchronization tasks, since the processes don't have to create a shared memory region and construct the synchronization mechanism there.
  • Anonymous utilities can be serialized to disk when using memory mapped files, obtaining automatic persistence of synchronization utilities. One could construct a synchronization utility in a memory mapped file, reboot the system, map the file again, and use the synchronization utility again without any problem. This can't be achieved with named synchronization utilities.

The main interface difference between named and anonymous utilities is in the constructors. Usually anonymous utilities have only one constructor, whereas the named utilities have several constructors whose first argument is a special type that requests creation, opening, or opening-or-creation of the underlying resource:

using namespace boost::interprocess;

//Create the synchronization utility. If it already
//exists, throws an error.
NamedUtility(create_only, ...)

//Open the synchronization utility. If it does not previously
//exist, it's created.
NamedUtility(open_or_create, ...)

//Open the synchronization utility. If it does not previously
//exist, throws an error.
NamedUtility(open_only, ...)

On the other hand, an anonymous synchronization utility can only be created, and the processes must use some other mechanism to agree on which of them creates the utility:

using namespace boost::interprocess;

//Create the synchronization utility.
AnonymousUtility(...)

Apart from its named/anonymous nature, Boost.Interprocess presents the following synchronization utilities:

  • Mutexes (named and anonymous)
  • Condition variables (named and anonymous)
  • Semaphores (named and anonymous)
  • Upgradable mutexes
  • File locks

Mutex stands for mutual exclusion and it's the most basic form of synchronization between processes. Mutexes guarantee that only one thread can lock a given mutex at a time. If a code section is surrounded by a mutex lock and unlock, it's guaranteed that only one thread at a time executes that section of code. When that thread unlocks the mutex, other threads can enter that code region:

//The mutex has been previously constructed

lock_the_mutex();

//This code will be executed only by one thread
//at a time.

unlock_the_mutex();

A mutex can also be recursive or non-recursive:

  • Recursive mutexes can be locked several times by the same thread. To fully unlock the mutex, the thread has to unlock the mutex as many times as it has locked it.
  • Non-recursive mutexes can't be locked several times by the same thread. If a mutex is locked twice by a thread, the result is undefined: it might throw an error or the thread could be blocked forever.

All the mutex types from Boost.Interprocess implement the following operations:

void lock()

Effects: The calling thread tries to obtain ownership of the mutex, and if another thread has ownership of the mutex, it waits until it can obtain the ownership. If a thread takes ownership of the mutex, the mutex must be unlocked by the same thread. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.

Throws: interprocess_exception on error.

bool try_lock()

Effects: The calling thread tries to obtain ownership of the mutex, and if another thread has ownership of the mutex, it returns immediately. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.

Returns: If the thread acquires ownership of the mutex, returns true; if another thread has ownership of the mutex, returns false.

Throws: interprocess_exception on error.

bool timed_lock(const boost::posix_time::ptime &abs_time)

Effects: The calling thread will try to obtain exclusive ownership of the mutex if it can do so before the specified time is reached. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.

Returns: If the thread acquires ownership of the mutex, returns true; if the timeout expires, returns false.

Throws: interprocess_exception on error.

void unlock()

Precondition: The thread must have exclusive ownership of the mutex.

Effects: The calling thread releases the exclusive ownership of the mutex. If the mutex supports recursive locking, the mutex must be unlocked the same number of times it is locked.

Throws: An exception derived from interprocess_exception on error.

Boost.Interprocess offers the following mutex types:

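#include <boost/interprocess/sync/interprocess_mutex.hpp>
  • interprocess_mutex: A non-recursive, anonymous mutex that can be placed in shared memory or memory mapped files.
#include <boost/interprocess/sync/interprocess_recursive_mutex.hpp>
  • interprocess_recursive_mutex: A recursive, anonymous mutex that can be placed in shared memory or memory mapped files.
#include <boost/interprocess/sync/named_mutex.hpp>
  • named_mutex: A non-recursive, named mutex.
#include <boost/interprocess/sync/named_recursive_mutex.hpp>
  • named_recursive_mutex: A recursive, named mutex.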

It's very important to unlock a mutex after the process has read or written the data. This can be difficult when dealing with exceptions, so usually mutexes are used with a scoped lock, a class that can guarantee that a mutex will always be unlocked even when an exception occurs. To use a scoped lock just include:

#include <boost/interprocess/sync/scoped_lock.hpp>

Basically, a scoped lock calls unlock() in its destructor, so the mutex is always unlocked when an exception occurs. scoped_lock has several constructors: to lock, try_lock or timed_lock a mutex, or not to lock it at all.

using namespace boost::interprocess;

//Let's create any mutex type:
MutexType mutex;

{
   //This will lock the mutex
   scoped_lock<MutexType> lock(mutex);

   //Some code

   //The mutex will be unlocked here
}

{
   //This will try_lock the mutex
   scoped_lock<MutexType> lock(mutex, try_to_lock);

   //Check if the mutex has been successfully locked
   if(lock){
      //Some code
   }

   //If the mutex was locked it will be unlocked
}

{
   boost::posix_time::ptime abs_time = ...

   //This will timed_lock the mutex
   scoped_lock<MutexType> lock(mutex, abs_time);

   //Check if the mutex has been successfully locked
   if(lock){
      //Some code
   }

   //If the mutex was locked it will be unlocked
}

For more information, check the scoped_lock's reference.

Imagine that two processes need to write traces to a cyclic buffer built in shared memory. Each process needs to obtain exclusive access to the cyclic buffer, write the trace and continue.

To protect the cyclic buffer, we can store a process-shared mutex in the cyclic buffer. Each process will lock the mutex before writing the data and will set a flag when it finishes writing the traces (doc_anonymous_mutex_shared_data.hpp header):

#include <boost/interprocess/sync/interprocess_mutex.hpp>

struct shared_memory_log
{
   enum { NumItems = 100 };
   enum { LineSize = 100 };

   shared_memory_log()
      :  current_line(0)
      ,  end_a(false)
      ,  end_b(false)
   {}

   //Mutex to protect access to the queue
   boost::interprocess::interprocess_mutex mutex;

   //Items to fill
   char   items[NumItems][LineSize];
   int    current_line;
   bool   end_a;
   bool   end_b;
};

This is the main process. It creates the shared memory, constructs the cyclic buffer and starts writing traces:

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include "doc_anonymous_mutex_shared_data.hpp"
#include <iostream>
#include <cstdio>

using namespace boost::interprocess;

int main ()
{
   try{
      //Remove shared memory on construction and destruction
      struct shm_remove
      {
         shm_remove() { shared_memory_object::remove("MySharedMemory"); }
         ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
      } remover;

      //Create a shared memory object.
      shared_memory_object shm
         (create_only               //only create
         ,"MySharedMemory"          //name
         ,read_write   //read-write mode
         );

      //Set size
      shm.truncate(sizeof(shared_memory_log));

      //Map the whole shared memory in this process
      mapped_region region
         (shm                       //What to map
         ,read_write   //Map it as read-write
         );

      //Get the address of the mapped region
      void * addr       = region.get_address();

      //Construct the shared structure in memory
      shared_memory_log * data = new (addr) shared_memory_log;

      //Write some logs
      for(int i = 0; i < shared_memory_log::NumItems; ++i){
         //Lock the mutex
         scoped_lock<interprocess_mutex> lock(data->mutex);
         std::sprintf(data->items[(data->current_line++) % shared_memory_log::NumItems]
                  ,"%s_%d", "process_a", i);
         if(i == (shared_memory_log::NumItems-1))
            data->end_a = true;
         //Mutex is released here
      }

      //Wait until the other process ends
      while(1){
         scoped_lock<interprocess_mutex> lock(data->mutex);
         if(data->end_b)
            break;
      }
   }
   catch(interprocess_exception &ex){
      std::cout << ex.what() << std::endl;
      return 1;
   }
   return 0;
}

The second process opens the shared memory, obtains access to the cyclic buffer and starts writing traces:

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include "doc_anonymous_mutex_shared_data.hpp"
#include <iostream>
#include <cstdio>

using namespace boost::interprocess;

int main ()
{
   //Remove shared memory on destruction
   struct shm_remove
   {
      ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
   } remover;

   //Open the shared memory object.
   shared_memory_object shm
      (open_only                    //only open
      ,"MySharedMemory"              //name
      ,read_write  //read-write mode
      );

   //Map the whole shared memory in this process
   mapped_region region
      (shm                       //What to map
      ,read_write //Map it as read-write
      );

   //Get the address of the mapped region
   void * addr       = region.get_address();

   //Obtain a pointer to the shared structure (already constructed by the first process)
   shared_memory_log * data = static_cast<shared_memory_log*>(addr);

   //Write some logs
   for(int i = 0; i < 100; ++i){
      //Lock the mutex
      scoped_lock<interprocess_mutex> lock(data->mutex);
      std::sprintf(data->items[(data->current_line++) % shared_memory_log::NumItems]
               ,"%s_%d", "process_b", i);
      if(i == (shared_memory_log::NumItems-1))
         data->end_b = true;
      //Mutex is released here
   }

   //Wait until the other process ends
   while(1){
      scoped_lock<interprocess_mutex> lock(data->mutex);
      if(data->end_a)
         break;
   }
   return 0;
}

As we can see, a mutex is useful to protect data but not to notify an event to another process. For this, we need a condition variable, as we will see in the next section.

Now imagine that two processes want to write a trace to a file. First they write their name, and after that they write the message. Since the operating system can interrupt a process at any moment, parts of the messages of both processes can end up mixed, so we need a way to write the whole message to the file atomically. To achieve this, we can use a named mutex so that each process locks the mutex before writing:

#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <fstream>
#include <iostream>
#include <cstdio>


int main ()
{
   using namespace boost::interprocess;
   try{
      struct file_remove
      {
         file_remove() { std::remove("file_name"); }
         ~file_remove(){ std::remove("file_name"); }
      } file_remover;
      struct mutex_remove
      {
         mutex_remove() { named_mutex::remove("fstream_named_mutex"); }
         ~mutex_remove(){ named_mutex::remove("fstream_named_mutex"); }
      } remover;

      //Open or create the named mutex
      named_mutex mutex(open_or_create, "fstream_named_mutex");

      std::ofstream file("file_name");

      for(int i = 0; i < 10; ++i){

         //Do some operations...

         //Write to file atomically
         scoped_lock<named_mutex> lock(mutex);
         file << "Process name, ";
         file << "This is iteration #" << i;
         file << std::endl;
      }
   }
   catch(interprocess_exception &ex){
      std::cout << ex.what() << std::endl;
      return 1;
   }
   return 0;
}

In the previous example, a mutex is used to lock but we can't use it to wait efficiently until the condition to continue is met. A condition variable can do two things:

  • wait: The thread is blocked until some other thread notifies that it can continue because the condition that led to waiting has disappeared.
  • notify: The thread sends a signal to one blocked thread or to all blocked threads to tell them that the condition that provoked their wait has disappeared.

Waiting in a condition variable is always associated with a mutex. The mutex must be locked prior to waiting on the condition. When waiting on the condition variable, the thread unlocks the mutex and waits atomically.

When the thread returns from a wait function (because of a signal or a timeout, for example) the mutex object is again locked.

Boost.Interprocess offers the following condition types:

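#include <boost/interprocess/sync/interprocess_condition.hpp>
  • interprocess_condition: An anonymous condition variable that can be placed in shared memory or memory mapped files to be used with interprocess_mutex.
#include <boost/interprocess/sync/named_condition.hpp>
  • named_condition: A named condition variable to be used with named_mutex.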

Named conditions are similar to anonymous conditions, but they are used in combination with named mutexes. Sometimes we don't want to store synchronization objects with the synchronized data:

  • We want to change the synchronization method (from interprocess to intra-process, or without any synchronization) using the same data. Storing the process-shared anonymous synchronization with the synchronized data would forbid this.
  • We want to send the synchronized data through the network or any other communication method. Sending the process-shared synchronization objects wouldn't make any sense.

Imagine that a process writes traces to a simple shared memory buffer that another process prints one by one. The first process writes a trace and waits until the other process prints the data. To achieve this, we can use two condition variables: the first one is used to block the sender until the second process prints the message and the second one to block the receiver until the buffer has a trace to print.

The shared memory trace buffer (doc_anonymous_condition_shared_data.hpp):

#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>

struct trace_queue
{
   enum { LineSize = 100 };

   trace_queue()
      :  message_in(false)
   {}

   //Mutex to protect access to the queue
   boost::interprocess::interprocess_mutex      mutex;

   //Condition to wait when the queue is empty
   boost::interprocess::interprocess_condition  cond_empty;

   //Condition to wait when the queue is full
   boost::interprocess::interprocess_condition  cond_full;

   //Items to fill
   char   items[LineSize];

   //Is there any message
   bool message_in;
};

This is the main process. It creates the shared memory, places the buffer there and starts writing messages one by one until it writes "last message" to indicate that there are no more messages to print:

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstdio>
#include "doc_anonymous_condition_shared_data.hpp"

using namespace boost::interprocess;

int main ()
{

   //Erase previous shared memory and schedule erasure on exit
   struct shm_remove
   {
      shm_remove() { shared_memory_object::remove("MySharedMemory"); }
      ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
   } remover;

   //Create a shared memory object.
   shared_memory_object shm
      (create_only               //only create
      ,"MySharedMemory"           //name
      ,read_write                //read-write mode
      );
   try{
      //Set size
      shm.truncate(sizeof(trace_queue));

      //Map the whole shared memory in this process
      mapped_region region
         (shm                       //What to map
         ,read_write //Map it as read-write
         );

      //Get the address of the mapped region
      void * addr       = region.get_address();

      //Construct the shared structure in memory
      trace_queue * data = new (addr) trace_queue;

      const int NumMsg = 100;

      for(int i = 0; i < NumMsg; ++i){
         scoped_lock<interprocess_mutex> lock(data->mutex);
         while(data->message_in){
            //Re-check the flag after every wake-up
            data->cond_full.wait(lock);
         }
         if(i == (NumMsg-1))
            std::sprintf(data->items, "%s", "last message");
         else
            std::sprintf(data->items, "%s_%d", "my_trace", i);

         //Notify to the other process that there is a message
         data->cond_empty.notify_one();

         //Mark message buffer as full
         data->message_in = true;
      }
   }
   catch(interprocess_exception &ex){
      std::cout << ex.what() << std::endl;
      return 1;
   }

   return 0;
}

The second process opens the shared memory and prints each message until the "last message" message is received:

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstring>
#include "doc_anonymous_condition_shared_data.hpp"

using namespace boost::interprocess;

int main ()
{
   //Open the shared memory object.
   shared_memory_object shm
      (open_only                    //only open
      ,"MySharedMemory"              //name
      ,read_write                   //read-write mode
      );

   try{
      //Map the whole shared memory in this process
      mapped_region region
         (shm                       //What to map
         ,read_write //Map it as read-write
         );

      //Get the address of the mapped region
      void * addr       = region.get_address();

      //Obtain a pointer to the shared structure
      trace_queue * data = static_cast<trace_queue*>(addr);

      //Print messages until the other process marks the end
      bool end_loop = false;
      do{
         scoped_lock<interprocess_mutex> lock(data->mutex);
         while(!data->message_in){
            //Re-check the flag after every wake-up
            data->cond_empty.wait(lock);
         }
         if(std::strcmp(data->items, "last message") == 0){
            end_loop = true;
         }
         else{
            //Print the message
            std::cout << data->items << std::endl;
            //Notify the other process that the buffer is empty
            data->message_in = false;
            data->cond_full.notify_one();
         }
      }
      while(!end_loop);
   }
   catch(interprocess_exception &ex){
      std::cout << ex.what() << std::endl;
      return 1;
   }

   return 0;
}

With condition variables, a process can block if it can't continue the work, and when the conditions to continue are met another process can wake it.

A semaphore is a synchronization mechanism between processes based on an internal count that offers two basic operations:

  • Wait: Tests the value of the semaphore count, and waits if the value is less than or equal to 0. Otherwise, decrements the semaphore count.
  • Post: Increments the semaphore count. If any process is blocked, one of those processes is awoken.

If the initial semaphore count is initialized to 1, a Wait operation is equivalent to a mutex locking and Post is equivalent to a mutex unlocking. This type of semaphore is known as a binary semaphore.

Although semaphores can be used like mutexes, they have a unique feature: unlike mutexes, a Post operation need not be executed by the same thread/process that executed the Wait operation.

Boost.Interprocess offers the following semaphore types:

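#include <boost/interprocess/sync/interprocess_semaphore.hpp>
  • interprocess_semaphore: An anonymous semaphore that can be placed in shared memory or memory mapped files.
#include <boost/interprocess/sync/named_semaphore.hpp>
  • named_semaphore: A named semaphore.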

We will implement an integer array in shared memory that will be used to transfer data from one process to another process. The first process will write some integers to the array and the process will block if the array is full.

The second process will copy the transmitted data to its own buffer, blocking if there is no new data in the buffer.

This is the shared integer array (doc_anonymous_semaphore_shared_data.hpp):

#include <boost/interprocess/sync/interprocess_semaphore.hpp>

struct shared_memory_buffer
{
   enum { NumItems = 10 };

   shared_memory_buffer()
      : mutex(1), nempty(NumItems), nstored(0)
   {}

   //Semaphores to protect and synchronize access
   boost::interprocess::interprocess_semaphore
      mutex, nempty, nstored;

   //Items to fill
   int items[NumItems];
};

This is the main process. It creates the shared memory, places the integer array there and starts inserting integers one by one, blocking if the array is full:

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>
#include "doc_anonymous_semaphore_shared_data.hpp"

using namespace boost::interprocess;

int main ()
{
   //Remove shared memory on construction and destruction
   struct shm_remove
   {
      shm_remove() { shared_memory_object::remove("MySharedMemory"); }
      ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
   } remover;

   //Create a shared memory object.
   shared_memory_object shm
      (create_only                  //only create
      ,"MySharedMemory"              //name
      ,read_write  //read-write mode
      );

   //Set size
   shm.truncate(sizeof(shared_memory_buffer));

   //Map the whole shared memory in this process
   mapped_region region
      (shm                       //What to map
      ,read_write //Map it as read-write
      );

   //Get the address of the mapped region
   void * addr       = region.get_address();

   //Construct the shared structure in memory
   shared_memory_buffer * data = new (addr) shared_memory_buffer;

   const int NumMsg = 100;

   //Insert data in the array
   for(int i = 0; i < NumMsg; ++i){
      data->nempty.wait();
      data->mutex.wait();
      data->items[i % shared_memory_buffer::NumItems] = i;
      data->mutex.post();
      data->nstored.post();
   }

   return 0;
}

The second process opens the shared memory and copies the received integers to its own buffer:

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>
#include "doc_anonymous_semaphore_shared_data.hpp"

using namespace boost::interprocess;

int main ()
{
   //Remove shared memory on destruction
   struct shm_remove
   {
      ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
   } remover;

   //Open the shared memory object.
   shared_memory_object shm
      (open_only                    //only open
      ,"MySharedMemory"              //name
      ,read_write  //read-write mode
      );

   //Map the whole shared memory in this process
   mapped_region region
      (shm                       //What to map
      ,read_write //Map it as read-write
      );

   //Get the address of the mapped region
   void * addr       = region.get_address();

   //Obtain the shared structure
   shared_memory_buffer * data = static_cast<shared_memory_buffer*>(addr);

   const int NumMsg = 100;

   int extracted_data [NumMsg];

   //Extract the data
   for(int i = 0; i < NumMsg; ++i){
      data->nstored.wait();
      data->mutex.wait();
      extracted_data[i] = data->items[i % shared_memory_buffer::NumItems];
      data->mutex.post();
      data->nempty.post();
   }
   return 0;
}

The same interprocess communication can be achieved with condition variables and mutexes, but for several synchronization patterns a semaphore is more efficient than a mutex/condition combination.

An upgradable mutex is a special mutex that offers more locking possibilities than a normal mutex. Sometimes, we can distinguish between reading the data and modifying the data. If just some threads need to modify the data, and a plain mutex is used to protect the data from concurrent access, concurrency is pretty limited: two threads that only read the data will be serialized instead of being executed concurrently.

If we allow concurrent access to threads that just read the data but we avoid concurrent access between threads that read and modify or between threads that modify, we can increase performance. This is especially true in applications where data reading is more common than data modification and the synchronized data reading code needs some time to execute. With an upgradable mutex we can acquire 3 lock types:

  • Exclusive lock: Similar to a plain mutex. If a thread acquires an exclusive lock, no other thread can acquire any lock (exclusive or other) until the exclusive lock is released. If any thread has a sharable or upgradable lock, a thread trying to acquire an exclusive lock will block. This lock will be acquired by threads that will modify the data.
  • Sharable lock: If a thread acquires a sharable lock, other threads can acquire a sharable lock or an upgradable lock. If any thread has acquired the exclusive lock, a thread trying to acquire a sharable lock will block. This locking is executed by threads that just need to read the data.
  • Upgradable lock: Acquiring an upgradable lock is similar to acquiring a privileged sharable lock. If a thread acquires an upgradable lock, other threads can acquire a sharable lock. If any thread has acquired the exclusive or upgradable lock, a thread trying to acquire an upgradable lock will block. A thread that has acquired an upgradable lock is guaranteed to be able to atomically acquire an exclusive lock once the other threads that hold a sharable lock release it. This suits a thread that usually just needs to read the data but may need to modify it. This thread acquires the upgradable lock and other threads can still acquire the sharable lock. If the upgradable thread reads the data and finds it has to modify it, the thread can be promoted to the exclusive lock: when all sharable threads have released the sharable lock, the upgradable lock is atomically promoted to an exclusive lock. The newly promoted thread can modify the data and be sure that no other thread has modified it during the transition. Only 1 thread can hold the upgradable (privileged reader) lock at a time.

To sum up:

Table 11.5. Locking Possibilities

If a thread has acquired the...    Other threads can acquire...
-------------------------------    -----------------------------------------
Sharable lock                      many sharable locks and 1 upgradable lock
Upgradable lock                    many sharable locks
Exclusive lock                     no locks


A thread that has acquired a lock can try to acquire another lock type atomically. Not all lock transitions are guaranteed to succeed. Even when a transition is guaranteed to succeed, it may block the thread until other threads release their sharable locks. Atomically means that no other thread will acquire an Upgradable or Exclusive lock during the transition, so the data is guaranteed to remain unchanged:

Table 11.6. Transition Possibilities

If a thread has acquired the...    It can atomically release the previous lock and...
-------------------------------    --------------------------------------------------
Sharable lock                      try to obtain (not guaranteed) immediately the Exclusive lock if no other thread has exclusive or upgradable lock
Sharable lock                      try to obtain (not guaranteed) immediately the Upgradable lock if no other thread has exclusive or upgradable lock
Upgradable lock                    obtain the Exclusive lock when all sharable locks are released
Upgradable lock                    obtain the Sharable lock immediately
Exclusive lock                     obtain the Upgradable lock immediately
Exclusive lock                     obtain the Sharable lock immediately


As we can see, an upgradable mutex is a powerful synchronization utility that can improve the concurrency. However, if most of the time we have to modify the data, or the synchronized code section is very short, it's more efficient to use a plain mutex, since it has less overhead. Upgradable lock shines when the synchronized code section is bigger and there are more readers than modifiers.

All the upgradable mutex types from Boost.Interprocess implement the following operations:

void lock()

Effects: The calling thread tries to obtain exclusive ownership of the mutex, and if another thread has exclusive, sharable or upgradable ownership of the mutex, it waits until it can obtain the ownership.

Throws: interprocess_exception on error.

bool try_lock()

Effects: The calling thread tries to acquire exclusive ownership of the mutex without waiting. If no other thread has exclusive, sharable or upgradable ownership of the mutex, this succeeds.

Returns: If it can acquire exclusive ownership immediately, returns true. If it has to wait, returns false.

Throws: interprocess_exception on error.

bool timed_lock(const boost::posix_time::ptime &abs_time)

Effects: The calling thread tries to acquire exclusive ownership of the mutex, waiting if necessary until no other thread has exclusive, sharable or upgradable ownership of the mutex or abs_time is reached.

Returns: If it acquires exclusive ownership, returns true. Otherwise returns false.

Throws: interprocess_exception on error.

void unlock()

Precondition: The thread must have exclusive ownership of the mutex.

Effects: The calling thread releases the exclusive ownership of the mutex.

Throws: An exception derived from interprocess_exception on error.

Effects: The calling thread tries to obtain sharable ownership of the mutex, and if another thread has exclusive or upgradable ownership of the mutex, waits until it can obtain the ownership.

Throws: interprocess_exception on error.

bool try_lock_sharable()

Effects: The calling thread tries to acquire sharable ownership of the mutex without waiting. If no other thread has exclusive or upgradable ownership of the mutex, this succeeds.

Returns: If it can acquire sharable ownership immediately returns true. If it has to wait, returns false.

Throws: interprocess_exception on error.

bool timed_lock_sharable(const boost::posix_time::ptime &abs_time)

Effects: The calling thread tries to acquire sharable ownership of the mutex, waiting if necessary until no other thread has exclusive or upgradable ownership of the mutex or abs_time is reached.

Returns: If acquires sharable ownership, returns true. Otherwise returns false.

Throws: interprocess_exception on error.

void unlock_sharable()

Precondition: The thread must have sharable ownership of the mutex.

Effects: The calling thread releases the sharable ownership of the mutex.

Throws: An exception derived from interprocess_exception on error.

void lock_upgradable()

Effects: The calling thread tries to obtain upgradable ownership of the mutex, and if another thread has exclusive or upgradable ownership of the mutex, waits until it can obtain the ownership.

Throws: interprocess_exception on error.

bool try_lock_upgradable()

Effects: The calling thread tries to acquire upgradable ownership of the mutex without waiting. If no other thread has exclusive or upgradable ownership of the mutex, this succeeds.

Returns: If it can acquire upgradable ownership immediately returns true. If it has to wait, returns false.

Throws: interprocess_exception on error.

bool timed_lock_upgradable(const boost::posix_time::ptime &abs_time)

Effects: The calling thread tries to acquire upgradable ownership of the mutex, waiting if necessary until no other thread has exclusive or upgradable ownership of the mutex or abs_time is reached.

Returns: If acquires upgradable ownership, returns true. Otherwise returns false.

Throws: interprocess_exception on error.

void unlock_upgradable()

Precondition: The thread must have upgradable ownership of the mutex.

Effects: The calling thread releases the upgradable ownership of the mutex.

Throws: An exception derived from interprocess_exception on error.

void unlock_and_lock_upgradable()

Precondition: The thread must have exclusive ownership of the mutex.

Effects: The thread atomically releases exclusive ownership and acquires upgradable ownership. This operation is non-blocking.

Throws: An exception derived from interprocess_exception on error.

void unlock_and_lock_sharable()

Precondition: The thread must have exclusive ownership of the mutex.

Effects: The thread atomically releases exclusive ownership and acquires sharable ownership. This operation is non-blocking.

Throws: An exception derived from interprocess_exception on error.

void unlock_upgradable_and_lock_sharable()

Precondition: The thread must have upgradable ownership of the mutex.

Effects: The thread atomically releases upgradable ownership and acquires sharable ownership. This operation is non-blocking.

Throws: An exception derived from interprocess_exception on error.

void unlock_upgradable_and_lock()

Precondition: The thread must have upgradable ownership of the mutex.

Effects: The thread atomically releases upgradable ownership and acquires exclusive ownership. This operation will block until all threads with sharable ownership release it.

Throws: An exception derived from interprocess_exception on error.

bool try_unlock_upgradable_and_lock()

Precondition: The thread must have upgradable ownership of the mutex.

Effects: The thread atomically releases upgradable ownership and tries to acquire exclusive ownership. This operation will fail if there are threads with sharable ownership, but it will maintain upgradable ownership.

Returns: If acquires exclusive ownership, returns true. Otherwise returns false.

Throws: An exception derived from interprocess_exception on error.

bool timed_unlock_upgradable_and_lock(const boost::posix_time::ptime &abs_time)

Precondition: The thread must have upgradable ownership of the mutex.

Effects: The thread atomically releases upgradable ownership and tries to acquire exclusive ownership, waiting if necessary until abs_time. This operation will fail if there are threads with sharable ownership or the timeout is reached, but it will maintain upgradable ownership.

Returns: If acquires exclusive ownership, returns true. Otherwise returns false.

Throws: An exception derived from interprocess_exception on error.

bool try_unlock_sharable_and_lock()

Precondition: The thread must have sharable ownership of the mutex.

Effects: The thread atomically releases sharable ownership and tries to acquire exclusive ownership. This operation will fail if there are threads with sharable or upgradable ownership, but it will maintain sharable ownership.

Returns: If acquires exclusive ownership, returns true. Otherwise returns false.

Throws: An exception derived from interprocess_exception on error.

bool try_unlock_sharable_and_lock_upgradable()

Precondition: The thread must have sharable ownership of the mutex.

Effects: The thread atomically releases sharable ownership and tries to acquire upgradable ownership. This operation will fail if there are threads with sharable or upgradable ownership, but it will maintain sharable ownership.

Returns: If acquires upgradable ownership, returns true. Otherwise returns false.

Throws: An exception derived from interprocess_exception on error.

Boost.Interprocess offers the following upgradable mutex types:

#include <boost/interprocess/sync/interprocess_upgradable_mutex.hpp>
#include <boost/interprocess/sync/named_upgradable_mutex.hpp>

As with plain mutexes, it's important to release the acquired lock even in the presence of exceptions. Boost.Interprocess mutexes are best used with the scoped_lock utility, and this class only offers exclusive locking.

As we have sharable locking and upgradable locking with upgradable mutexes, we have two new utilities: sharable_lock and upgradable_lock. Both classes are similar to scoped_lock but sharable_lock acquires the sharable lock in the constructor and upgradable_lock acquires the upgradable lock in the constructor.

These two utilities can be used with any synchronization object that offers the needed operations. For example, a user-defined mutex type with no upgradable locking features can use sharable_lock if the synchronization object offers lock_sharable() and unlock_sharable() operations:

#include <boost/interprocess/sync/sharable_lock.hpp>
#include <boost/interprocess/sync/upgradable_lock.hpp>

sharable_lock calls unlock_sharable() in its destructor, and upgradable_lock calls unlock_upgradable() in its destructor, so the upgradable mutex is always unlocked when an exception occurs. Like scoped_lock, both classes have several constructors to lock, try to lock, or timed-lock a mutex, or not to lock it at all:

using namespace boost::interprocess;

//Let's create any mutex type:
MutexType mutex;

{
   //This will call lock_sharable()
   sharable_lock<MutexType> lock(mutex);

   //Some code

   //The mutex will be unlocked here
}

{
   //This won't lock the mutex
   sharable_lock<MutexType> lock(mutex, defer_lock);

   //Lock it on demand. This will call lock_sharable()
   lock.lock();

   //Some code

   //The mutex will be unlocked here
}

{
   //This will call try_lock_sharable()
   sharable_lock<MutexType> lock(mutex, try_to_lock);

   //Check if the mutex has been successfully locked
   if(lock){
      //Some code
   }
   //If the mutex was locked it will be unlocked
}

{
   boost::posix_time::ptime abs_time = ...

   //This will call timed_lock_sharable()
   sharable_lock<MutexType> lock(mutex, abs_time);

   //Check if the mutex has been successfully locked
   if(lock){
      //Some code
   }
   //If the mutex was locked it will be unlocked
}

{
   //This will call lock_upgradable()
   upgradable_lock<MutexType> lock(mutex);

   //Some code

   //The mutex will be unlocked here
}

{
   //This won't lock the mutex
   upgradable_lock<MutexType> lock(mutex, defer_lock);

   //Lock it on demand. This will call lock_upgradable()
   lock.lock();

   //Some code

   //The mutex will be unlocked here
}

{
   //This will call try_lock_upgradable()
   upgradable_lock<MutexType> lock(mutex, try_to_lock);

   //Check if the mutex has been successfully locked
   if(lock){
      //Some code
   }
   //If the mutex was locked it will be unlocked
}

{
   boost::posix_time::ptime abs_time = ...

   //This will call timed_lock_upgradable()
   upgradable_lock<MutexType> lock(mutex, abs_time);

   //Check if the mutex has been successfully locked
   if(lock){
      //Some code
   }
   //If the mutex was locked it will be unlocked
}

upgradable_lock and sharable_lock offer more features and operations; see their reference for more information.
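To illustrate the point that a sharable lock guard only needs lock_sharable() and unlock_sharable() from the mutex, here is a minimal standard-library sketch. Both spin_rw_mutex (a toy busy-waiting reader/writer mutex) and sharable_guard (a stand-in for sharable_lock) are hypothetical names invented for this example; they are not part of Boost.Interprocess and only show the shape of the requirement.

```cpp
#include <atomic>
#include <cassert>

//Hypothetical user-defined mutex with no upgradable operations.
//state_ is -1 while a writer owns it, otherwise the number of readers.
class spin_rw_mutex
{
public:
   void lock()
   {
      for(;;){
         int expected = 0;
         //A writer may enter only when nobody owns the mutex
         if(state_.compare_exchange_weak(expected, -1))
            return;
      }
   }
   void unlock()          { state_.store(0); }

   void lock_sharable()
   {
      for(;;){
         int s = state_.load();
         //Readers may enter whenever no writer owns the mutex
         if(s >= 0 && state_.compare_exchange_weak(s, s + 1))
            return;
      }
   }
   void unlock_sharable() { state_.fetch_sub(1); }

   //Number of current sharable owners (only used for illustration)
   int readers() const
   {
      int s = state_.load();
      return s > 0 ? s : 0;
   }

private:
   std::atomic<int> state_{0};
};

//Hypothetical stand-in for sharable_lock: it requires nothing from
//Mutex except lock_sharable() and unlock_sharable().
template<class Mutex>
class sharable_guard
{
public:
   explicit sharable_guard(Mutex &m) : m_(m) { m_.lock_sharable(); }
   ~sharable_guard()                         { m_.unlock_sharable(); }
   sharable_guard(const sharable_guard &) = delete;
   sharable_guard &operator=(const sharable_guard &) = delete;

private:
   Mutex &m_;
};

//Returns the number of sharable owners seen while two guards are alive
int readers_inside_scope(spin_rw_mutex &mutex)
{
   sharable_guard<spin_rw_mutex> first(mutex);
   sharable_guard<spin_rw_mutex> second(mutex);
   assert(mutex.readers() == 2);
   return mutex.readers();
}
```

Both guards release their sharable ownership when readers_inside_scope() returns, so the reader count drops back to zero even if the guarded code throws.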

Scoped locks and similar utilities offer simple resource management possibilities, but with advanced mutex types like upgradable mutexes, there are operations where an acquired lock type is released and another lock type is acquired atomically. This is implemented by upgradable mutex operations like unlock_and_lock_sharable().

These operations can be managed more effectively using lock transfer operations. A lock transfer operation explicitly indicates that a mutex owned by a lock is transferred to another lock, executing atomic unlocking plus locking operations.

Imagine that a thread modifies some data at the beginning, but after that it only has to read it for a long time. The code can acquire the exclusive lock, modify the data, and then atomically release the exclusive lock and acquire the sharable lock. With this sequence we guarantee that no other thread can modify the data during the transition and that more readers can acquire the sharable lock, increasing concurrency. Without lock transfer operations, this would be coded like this:

using namespace boost::interprocess;
interprocess_upgradable_mutex mutex;

//Acquire exclusive lock
mutex.lock();

//Modify data

//Atomically release exclusive lock and acquire sharable lock.
//More threads can acquire the sharable lock and read the data.
mutex.unlock_and_lock_sharable();

//Read data

//Explicit unlocking
mutex.unlock_sharable();

This can be simple, but in the presence of exceptions, it's complicated to know what type of lock the mutex had when the exception was thrown and what function we should call to unlock it:

try{
   //Mutex operations
}
catch(...){
   //What should we call? "unlock()" or "unlock_sharable()"
   //Is the mutex locked?
}

We can use lock transfer to simplify all this management:

using namespace boost::interprocess;
interprocess_upgradable_mutex mutex;

//Acquire exclusive lock
scoped_lock<interprocess_upgradable_mutex> s_lock(mutex);

//Modify data

//Atomically release exclusive lock and acquire sharable lock.
//More threads can acquire the sharable lock and read the data.
sharable_lock<interprocess_upgradable_mutex> sh_lock(move(s_lock));

//Read data

//The lock is automatically unlocked calling the appropriate unlock
//function even in the presence of exceptions.
//If the mutex was not locked, no function is called.

As we can see, even if an exception is thrown at any moment, the mutex will be automatically unlocked calling the appropriate unlock() or unlock_sharable() method.
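The mechanism can be sketched with toy classes that use only the standard library. This is not the Boost.Interprocess implementation: tracing_mutex, toy_scoped_lock and toy_sharable_lock are hypothetical names, and the mutex merely records which operations were called so that the sequence can be inspected.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

//Hypothetical mutex that only records the operations called on it
struct tracing_mutex
{
   std::vector<std::string> calls;
   void lock()                     { calls.push_back("lock"); }
   void unlock()                   { calls.push_back("unlock"); }
   void unlock_sharable()          { calls.push_back("unlock_sharable"); }
   void unlock_and_lock_sharable() { calls.push_back("unlock_and_lock_sharable"); }
};

//Toy exclusive lock: unlocks in the destructor only if it still owns
class toy_scoped_lock
{
public:
   explicit toy_scoped_lock(tracing_mutex &m) : m_(&m), owns_(true) { m_->lock(); }
   ~toy_scoped_lock() { if(owns_) m_->unlock(); }
   toy_scoped_lock(const toy_scoped_lock &) = delete;
   toy_scoped_lock &operator=(const toy_scoped_lock &) = delete;

   bool owns() const            { return owns_; }
   tracing_mutex *mutex() const { return m_; }
   //Gives the mutex away without unlocking it
   tracing_mutex *release()
   {  tracing_mutex *m = m_;  m_ = nullptr;  owns_ = false;  return m;  }

private:
   tracing_mutex *m_;
   bool           owns_;
};

//Toy sharable lock that can be built from an exclusive lock
class toy_sharable_lock
{
public:
   explicit toy_sharable_lock(toy_scoped_lock &&src) : m_(nullptr), owns_(false)
   {
      if(src.owns()){
         //If this call throws, src keeps ownership and unlocks later
         src.mutex()->unlock_and_lock_sharable();
         owns_ = true;
      }
      m_ = src.release();
   }
   ~toy_sharable_lock() { if(owns_) m_->unlock_sharable(); }
   toy_sharable_lock(const toy_sharable_lock &) = delete;
   toy_sharable_lock &operator=(const toy_sharable_lock &) = delete;

   bool owns() const { return owns_; }

private:
   tracing_mutex *m_;
   bool           owns_;
};

//Modifies, demotes, reads; optionally throws while reading
std::vector<std::string> modify_then_read(bool throw_while_reading)
{
   tracing_mutex mut;
   try{
      toy_scoped_lock   e_lock(mut);
      //Modify data, then demote to a sharable lock
      toy_sharable_lock s_lock(std::move(e_lock));
      assert(!e_lock.owns() && s_lock.owns());
      if(throw_while_reading)
         throw std::runtime_error("read failed");
   }
   catch(const std::runtime_error &){
      //Nothing to unlock by hand: the destructors already did it
   }
   return mut.calls;
}
```

In both the normal and the throwing path the recorded sequence is lock, unlock_and_lock_sharable, unlock_sharable: the lock object that currently owns the mutex decides which unlock function runs.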

There are many lock transfer operations that we can classify according to the operations presented in the upgradable mutex operations:

  • Guaranteed to succeed and non-blocking: Any transition from a more restrictive lock to a less restrictive one. Scoped -> Upgradable, Scoped -> Sharable, Upgradable -> Sharable.
  • Not guaranteed to succeed: The operation might succeed if no one has acquired the upgradable or exclusive lock: Sharable -> Exclusive. This operation is a try operation.
  • Guaranteed to succeed if using an infinite waiting: Any transition that will succeed but needs to wait until all Sharable locks are released: Upgradable -> Scoped. Since this is a blocking operation, we can also choose not to wait infinitely and just try or wait until a timeout is reached.
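The classification above can be summarized as a small lookup. The sketch below is only a restatement of the three bullets in code; lock_kind, transfer_kind and classify are hypothetical names, and "exclusive" stands for the lock held by a scoped_lock.

```cpp
#include <cassert>

//Ordered from least to most restrictive
enum class lock_kind { sharable, upgradable, exclusive };

enum class transfer_kind
{
   same_kind,   //no promotion or demotion involved
   immediate,   //guaranteed to succeed and non-blocking
   blocking,    //guaranteed only with an infinite wait; try/timed may fail
   try_only     //never guaranteed; only a try operation is offered
};

constexpr transfer_kind classify(lock_kind from, lock_kind to)
{
   return from == to
             ? transfer_kind::same_kind
        : static_cast<int>(from) > static_cast<int>(to)
             ? transfer_kind::immediate   //more restrictive -> less restrictive
        : from == lock_kind::upgradable
             ? transfer_kind::blocking    //Upgradable -> Scoped
             : transfer_kind::try_only;   //anything starting from Sharable
}

static_assert(classify(lock_kind::exclusive,  lock_kind::upgradable) == transfer_kind::immediate, "");
static_assert(classify(lock_kind::upgradable, lock_kind::exclusive)  == transfer_kind::blocking,  "");
static_assert(classify(lock_kind::sharable,   lock_kind::exclusive)  == transfer_kind::try_only,  "");
```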

Transfers to scoped_lock are guaranteed to succeed only from an upgradable_lock and only if a blocking operation is requested, because this operation needs to wait until all sharable locks are released. The user can also use a "try" or "timed" transfer to avoid waiting indefinitely, but success is not guaranteed.

A conversion from a sharable_lock is never guaranteed and thus, only a try operation is permitted:

//Conversions to scoped_lock
{
   upgradable_lock<Mutex>  u_lock(mut);
   //This calls unlock_upgradable_and_lock()
   scoped_lock<Mutex>      e_lock(move(u_lock));
}
{
   upgradable_lock<Mutex>  u_lock(mut);
   //This calls try_unlock_upgradable_and_lock()
   scoped_lock<Mutex>      e_lock(move(u_lock), try_to_lock);
}
{
   boost::posix_time::ptime t = test::delay(100);
   upgradable_lock<Mutex>  u_lock(mut);
   //This calls timed_unlock_upgradable_and_lock()
   scoped_lock<Mutex>      e_lock(move(u_lock), t);
}
{
   sharable_lock<Mutex>    s_lock(mut);
   //This calls try_unlock_sharable_and_lock()
   scoped_lock<Mutex>      e_lock(move(s_lock), try_to_lock);
}

A transfer to an upgradable_lock is guaranteed to succeed only from a scoped_lock since scoped locking is a more restrictive locking than an upgradable locking. This operation is also non-blocking.

A transfer from a sharable_lock is not guaranteed and only a "try" operation is permitted:

//Conversions to upgradable
{
   sharable_lock<Mutex>    s_lock(mut);
   //This calls try_unlock_sharable_and_lock_upgradable()
   upgradable_lock<Mutex>  u_lock(move(s_lock), try_to_lock);
}
{
   scoped_lock<Mutex>      e_lock(mut);
   //This calls unlock_and_lock_upgradable()
   upgradable_lock<Mutex>  u_lock(move(e_lock));
}

All transfers to a sharable_lock are guaranteed to succeed since both upgradable_lock and scoped_lock are more restrictive than sharable_lock. These operations are also non-blocking:

//Conversions to sharable_lock
{
   upgradable_lock<Mutex>  u_lock(mut);
   //This calls unlock_upgradable_and_lock_sharable()
   sharable_lock<Mutex>    s_lock(move(u_lock));
}
{
   scoped_lock<Mutex>      e_lock(mut);
   //This calls unlock_and_lock_sharable()
   sharable_lock<Mutex>    s_lock(move(e_lock));
}

In the previous examples, the mutex used in the transfer operation was previously locked:

{
   Mutex mut;

   //This calls mut.lock()
   scoped_lock<Mutex>      e_lock(mut);

   //This calls unlock_and_lock_sharable()
   sharable_lock<Mutex>    s_lock(move(e_lock));
}

but it's possible to execute the transfer with an unlocked source, due to explicit unlocking, a try, timed or a defer_lock constructor:

//These operations can leave the mutex unlocked!

{
   //Try might fail
   scoped_lock<Mutex>      e_lock(mut, try_to_lock);
   sharable_lock<Mutex>    s_lock(move(e_lock));
}
{
   //Timed operation might fail
   scoped_lock<Mutex>      e_lock(mut, time);
   sharable_lock<Mutex>    s_lock(move(e_lock));
}
{
   //Avoid mutex locking
   scoped_lock<Mutex>      e_lock(mut, defer_lock);
   sharable_lock<Mutex>    s_lock(move(e_lock));
}
{
   //Explicitly call unlock
   scoped_lock<Mutex>      e_lock(mut);
   e_lock.unlock();
   //Mutex was explicitly unlocked
   sharable_lock<Mutex>    s_lock(move(e_lock));
}

If the source lock did not own the mutex:

  • The target lock does not execute the atomic unlock_xxx_and_lock_xxx operation.
  • The target lock does not own the mutex either.
  • The source lock is released (as if by release()) and its association with the mutex is transferred to the target.

{
   scoped_lock<Mutex>      e_lock(mut, defer_lock);
   sharable_lock<Mutex>    s_lock(move(e_lock));

   //Assertions
   assert(e_lock.mutex() == 0);
   assert(s_lock.mutex() != 0);
   assert(e_lock.owns()  == false);
}

When executing a lock transfer, the operation can fail:

  • The executed atomic mutex unlock plus lock function might throw.
  • The executed atomic function might be a "try" or "timed" function that can fail.

In the first case, the mutex ownership is not transferred and the source lock's destructor will unlock the mutex:

{
   //The source lock must own the mutex for the atomic operation to run
   scoped_lock<Mutex>      e_lock(mut);

   //This operation throws because
   //"unlock_and_lock_sharable()" throws!!!
   sharable_lock<Mutex>    s_lock(move(e_lock));

   //Some code ...

   //e_lock's destructor will call "unlock()"
}

In the second case, if an internal "try" or "timed" operation fails (returns "false"), then the mutex ownership is not transferred, the source lock is unchanged and the target lock's state will be the same as after a default construction:

{
   sharable_lock<Mutex>    s_lock(mut);

   //Internal "try_unlock_sharable_and_lock_upgradable()" returns false
   upgradable_lock<Mutex>  u_lock(move(s_lock), try_to_lock);

   assert(s_lock.mutex() == &mut);
   assert(s_lock.owns()  == true);
   assert(u_lock.mutex() == 0);
   assert(u_lock.owns()  == false);

   //u_lock's destructor does nothing
   //s_lock's destructor calls "unlock_sharable()"
}

A file lock is an interprocess synchronization mechanism to protect concurrent writes and reads to files using a mutex embedded in the file. This embedded mutex has sharable and exclusive locking capabilities. With a file lock, an existing file can be used as a mutex without the need of creating additional synchronization objects to control concurrent file reads or writes.

Generally speaking, we can have two file locking capabilities:

  • Advisory locking: The operating system kernel maintains a list of files that have been locked, but it does not prevent writing to those files even if a process has acquired a sharable lock, nor does it prevent reading from a file when a process has acquired the exclusive lock. Any process can ignore an advisory lock. This means that advisory locks are for cooperating processes, that is, processes that can trust each other. This is similar to a mutex protecting data in a shared memory segment: any process connected to that memory can overwrite the data, but cooperating processes protect the data by first acquiring the mutex lock.
  • Mandatory locking: The OS kernel checks every read and write request to verify that the operation can be performed according to the acquired lock. Reads and writes block until the lock is released.

Boost.Interprocess implements advisory locking for portability reasons. This means that every process accessing a file concurrently must cooperate using file locks to synchronize the access.

In some systems file locking can be even further refined, leading to record locking, where a user can specify a byte range within the file where the lock is applied. This allows concurrent write access by several processes if they need to access a different byte range in the file. Boost.Interprocess does not offer record locking for the moment, but might offer it in the future. To use a file lock just include:

#include <boost/interprocess/sync/file_lock.hpp>

A file lock has process lifetime. This means that if a process holding a file lock ends or crashes, the operating system will automatically unlock it. This feature is very useful in situations where we want to ensure automatic unlocking even when the process crashes, and avoid leaving blocked resources in the system. A file lock is constructed using the name of the file as an argument:

#include <boost/interprocess/sync/file_lock.hpp>

int main()
{
   //This throws if the file does not exist or it can't
   //open it with read-write access!
   boost::interprocess::file_lock flock("my_file");
   return 0;
}

File locking has normal mutex operations plus sharable locking capabilities. This means that we can have multiple readers holding the sharable lock and writers holding the exclusive lock waiting until the readers end their job.

However, file locking does not support upgradable locking or promotion or demotion (lock transfers), so it's more limited than an upgradable lock. These are the operations:

void lock()

Effects: The calling thread tries to obtain exclusive ownership of the file lock, and if another thread has exclusive or sharable ownership of the file lock, it waits until it can obtain the ownership.

Throws: interprocess_exception on error.

bool try_lock()

Effects: The calling thread tries to acquire exclusive ownership of the file lock without waiting. If no other thread has exclusive or sharable ownership of the file lock, this succeeds.

Returns: If it can acquire exclusive ownership immediately returns true. If it has to wait, returns false.

Throws: interprocess_exception on error.

bool timed_lock(const boost::posix_time::ptime &abs_time)

Effects: The calling thread tries to acquire exclusive ownership of the file lock, waiting if necessary until no other thread has exclusive or sharable ownership of the file lock or abs_time is reached.

Returns: If acquires exclusive ownership, returns true. Otherwise returns false.

Throws: interprocess_exception on error.

void unlock()

Precondition: The thread must have exclusive ownership of the file lock.

Effects: The calling thread releases the exclusive ownership of the file lock.

Throws: An exception derived from interprocess_exception on error.

void lock_sharable()

Effects: The calling thread tries to obtain sharable ownership of the file lock, and if another thread has exclusive ownership of the file lock, waits until it can obtain the ownership.

Throws: interprocess_exception on error.

bool try_lock_sharable()

Effects: The calling thread tries to acquire sharable ownership of the file lock without waiting. If no other thread has exclusive ownership of the file lock, this succeeds.

Returns: If it can acquire sharable ownership immediately returns true. If it has to wait, returns false.

Throws: interprocess_exception on error.

bool timed_lock_sharable(const boost::posix_time::ptime &abs_time)

Effects: The calling thread tries to acquire sharable ownership of the file lock, waiting if necessary until no other thread has exclusive ownership of the file lock or abs_time is reached.

Returns: If acquires sharable ownership, returns true. Otherwise returns false.

Throws: interprocess_exception on error.

void unlock_sharable()

Precondition: The thread must have sharable ownership of the file lock.

Effects: The calling thread releases the sharable ownership of the file lock.

Throws: An exception derived from interprocess_exception on error.

For more file locking methods, please see the file_lock reference.

scoped_lock and sharable_lock can be used to make file locking easier in the presence of exceptions, just like with mutexes:

#include <boost/interprocess/sync/file_lock.hpp>
#include <boost/interprocess/sync/sharable_lock.hpp>
//...

using namespace boost::interprocess;
//This process reads the file
//    ...
//Open the file lock
file_lock f_lock("my_file");

{
   //Construct a sharable lock with the file lock.
   //This will call "f_lock.lock_sharable()".
   sharable_lock<file_lock> sh_lock(f_lock);

   //Now read the file...

   //The sharable lock is automatically released by
   //sh_lock's destructor
}

#include <boost/interprocess/sync/file_lock.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

//...

using namespace boost::interprocess;
//This process writes the file
//    ...
//Open the file lock
file_lock f_lock("my_file");

{
   //Construct an exclusive lock with the file lock.
   //This will call "f_lock.lock()".
   scoped_lock<file_lock> e_lock(f_lock);

   //Now write the file...

   //The exclusive lock is automatically released by
   //e_lock's destructor
}

However, lock transfers are only allowed between same type of locks, that is, from a sharable lock to another sharable lock or from a scoped lock to another scoped lock. A transfer from a scoped lock to a sharable lock is not allowed, because file_lock has no lock promotion or demotion functions like unlock_and_lock_sharable(). This will produce a compilation error:

//Open the file lock
file_lock f_lock("my_file");

scoped_lock<file_lock> e_lock(f_lock);

//Compilation error, f_lock has no "unlock_and_lock_sharable()" member!
sharable_lock<file_lock> s_lock(move(e_lock));

If you plan to use file locks just like named mutexes, be careful, because portable file locks have synchronization limitations, mainly because different implementations (POSIX, Windows) offer different guarantees. Interprocess file locks have the following limitations:

  • It's unspecified if a file_lock synchronizes two threads from the same process.
  • It's unspecified if a process can use two file_lock objects pointing to the same file.

The first limitation comes mainly from POSIX, since a file handle is a per-process attribute and not a per-thread attribute. This means that if a thread uses a file_lock object to lock a file, other threads will see the file as locked. The Windows file locking mechanism, on the other hand, offers thread-synchronization guarantees, so a thread trying to lock an already locked file would block.

The second limitation comes from the fact that file locking synchronization state is tied to a single file descriptor in Windows. This means that if two file_lock objects are created pointing to the same file, no synchronization is guaranteed. In POSIX, when two file descriptors are used to lock a file, if one descriptor is closed, all file locks set by the calling process are cleared.

To sum up, if you plan to use portable file locking in your processes, observe the following restrictions:

  • For each file, use a single file_lock object per process.
  • Use the same thread to lock and unlock a file.
  • If you are using a std::fstream/native file handle to write to the file while using file locks on that file, don't close the file before releasing all the locks of the file.

As we've seen, file locking can be useful to synchronize two processes, but make sure data is written to the file before unlocking the file lock. Keep in mind that iostream classes do some kind of buffering, so if you want to make sure that other processes can see the data you've written, you have the following alternatives:

  • Use native file functions (read()/write() in Unix systems and ReadFile/WriteFile in Windows systems) instead of iostream.
  • Flush data before unlocking the file lock in writers, using fflush if you are using standard C functions or the flush() member function when using C++ iostreams. In Windows you can't even use another class to access the same file.

//...

using namespace boost::interprocess;
//This process writes the file
//    ...
//Open the file and the file lock
std::fstream file("my_file");
file_lock f_lock("my_lock_file");

{
   scoped_lock<file_lock> e_lock(f_lock);

   //Now write the file...

   //Flush data before unlocking the exclusive lock
   file.flush();
}

A message queue is similar to a list of messages. Threads can put messages in the queue and they can also remove messages from the queue. Each message can also have a priority, so that higher priority messages are read before lower priority messages. Each message has some attributes:

  • A priority.
  • The length of the message.
  • The data (if length is bigger than 0).

A thread can send a message to or receive a message from the message queue using 3 methods:

  • Blocking: If the message queue is full when sending or the message queue is empty when receiving, the thread is blocked until there is room for a new message or there is a new message.
  • Try: If the message queue is full when sending or the message queue is empty when receiving, the thread returns immediately with an error.
  • Timed: If the message queue is full when sending or the message queue is empty when receiving, the thread retries the operation until it succeeds (returning a successful state) or a timeout is reached (returning a failure).
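The "try" behaviour and the priority ordering can be sketched with a toy, single-process queue built on the standard library. toy_queue is a hypothetical class written for this illustration, not the Boost.Interprocess message_queue: it stores plain integers, does no interprocess synchronization, and delivering messages of equal priority in arrival order is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <map>

//Toy bounded queue: highest priority first, arrival order within a priority
class toy_queue
{
public:
   explicit toy_queue(std::size_t max_msgs) : max_(max_msgs), size_(0)
   {  assert(max_msgs > 0);  }

   //"Try" send: returns false at once instead of waiting when full
   bool try_send(int msg, unsigned int priority)
   {
      if(size_ == max_)
         return false;
      by_priority_[priority].push_back(msg);
      ++size_;
      return true;
   }

   //"Try" receive: returns false at once instead of waiting when empty
   bool try_receive(int &msg, unsigned int &priority)
   {
      if(size_ == 0)
         return false;
      //std::greater keeps the highest priority at begin()
      auto it  = by_priority_.begin();
      msg      = it->second.front();
      priority = it->first;
      it->second.pop_front();
      if(it->second.empty())
         by_priority_.erase(it);
      --size_;
      return true;
   }

private:
   std::size_t max_;
   std::size_t size_;
   std::map<unsigned int, std::deque<int>, std::greater<unsigned int> > by_priority_;
};
```

A blocking operation would wait where this sketch returns false, and a timed operation would wait until either the condition changes or the timeout expires.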

A message queue just copies raw bytes between processes and does not send objects. This means that if we want to send an object using a message queue the object must be binary serializable. For example, we can send integers between processes but not a std::string. You should use Boost.Serialization or use advanced Boost.Interprocess mechanisms to send complex data between processes.
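As a rough illustration of what "binary serializable" means here, the following standard-library sketch checks at compile time that a message type can be copied as raw bytes, and simulates the byte copy a queue would perform. sensor_reading, round_trip and check_round_trip are hypothetical names used only in this example; no message queue is involved.

```cpp
#include <cassert>
#include <cstring>
#include <string>
#include <type_traits>

//Hypothetical message type: fixed size, no pointers, no heap data,
//so its bytes mean the same thing in another process.
struct sensor_reading
{
   int    sensor_id;
   double value;
   char   unit[8];
};

//Safe to send as raw bytes
static_assert(std::is_trivially_copyable<sensor_reading>::value,
              "sensor_reading can be copied byte by byte");

//std::string owns heap memory through an internal pointer, so copying
//its bytes to another process would not transfer the characters.
static_assert(!std::is_trivially_copyable<std::string>::value,
              "std::string must not be sent as raw bytes");

//Simulates the queue: copy the object into a byte buffer and back
sensor_reading round_trip(const sensor_reading &in)
{
   unsigned char wire[sizeof(sensor_reading)];
   std::memcpy(wire, &in, sizeof(in));

   sensor_reading out;
   std::memcpy(&out, wire, sizeof(out));
   return out;
}

void check_round_trip()
{
   const sensor_reading sent = {7, 21.5, "C"};
   const sensor_reading recv = round_trip(sent);
   assert(recv.sensor_id == 7);
   assert(recv.value == 21.5);
   assert(std::strcmp(recv.unit, "C") == 0);
}
```

With a real message queue, such an object would be passed as a pointer and a size, in the same way the integer examples later in this section pass &i and sizeof(i).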

The Boost.Interprocess message queue is a named interprocess communication mechanism: the message queue is created with a name and it's opened with a name, just like a file. When creating a message queue, the user must specify the maximum message size and the maximum number of messages that the message queue can store. These parameters define the resources needed (for example, the size of the shared memory used to implement the message queue, if shared memory is used).

using namespace boost::interprocess;
//Create a message_queue. If the queue
//exists throws an exception
message_queue mq
   (create_only         //only create
   ,"message_queue"     //name
   ,100                 //max message number
   ,100                 //max message size
   );

using namespace boost::interprocess;
//Creates or opens a message_queue. If the queue
//does not exist creates it, otherwise opens it.
//Message number and size are ignored if the queue
//is opened
message_queue mq
   (open_or_create      //open or create
   ,"message_queue"     //name
   ,100                 //max message number
   ,100                 //max message size
   );

using namespace boost::interprocess;
//Opens a message_queue. If the queue
//does not exist throws an exception.
message_queue mq
   (open_only           //only open
   ,"message_queue"     //name
   );

The message queue is explicitly removed by calling the static remove function:

using namespace boost::interprocess;
message_queue::remove("message_queue");

The function can fail if the message queue is still being used by any process.

To use a message queue you must include the following header:

#include <boost/interprocess/ipc/message_queue.hpp>

In the following example, the first process creates the message queue and writes a sequence of integers to it. The other process just reads the integers and checks that each sequence number is correct. This is the first process:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <vector>

using namespace boost::interprocess;

int main ()
{
   try{
      //Erase previous message queue
      message_queue::remove("message_queue");

      //Create a message_queue.
      message_queue mq
         (create_only               //only create
         ,"message_queue"           //name
         ,100                       //max message number
         ,sizeof(int)               //max message size
         );

      //Send 100 numbers
      for(int i = 0; i < 100; ++i){
         mq.send(&i, sizeof(i), 0);
      }
   }
   catch(interprocess_exception &ex){
      std::cout << ex.what() << std::endl;
      return 1;
   }

   return 0;
}

This is the second process:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <vector>

using namespace boost::interprocess;

int main ()
{
   try{
      //Open a message queue.
      message_queue mq
         (open_only        //only open
         ,"message_queue"  //name
         );

      unsigned int priority;
      message_queue::size_type recvd_size;

      //Receive 100 numbers
      for(int i = 0; i < 100; ++i){
         int number;
         mq.receive(&number, sizeof(number), recvd_size, priority);
         if(number != i || recvd_size != sizeof(number))
            return 1;
      }
   }
   catch(interprocess_exception &ex){
      message_queue::remove("message_queue");
      std::cout << ex.what() << std::endl;
      return 1;
   }
   message_queue::remove("message_queue");
   return 0;
}

To know more about this class and all its operations, please see the message_queue class reference.

