Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

boost/interprocess/sync/windows/condition.hpp

//////////////////////////////////////////////////////////////////////////////
//
// (C) Copyright Ion Gaztanaga 2005-2011. Distributed under the Boost
// Software License, Version 1.0. (See accompanying file
// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// See http://www.boost.org/libs/interprocess for documentation.
//
//////////////////////////////////////////////////////////////////////////////

#ifndef BOOST_INTERPROCESS_DETAIL_WINDOWS_CONDITION_HPP
#define BOOST_INTERPROCESS_DETAIL_WINDOWS_CONDITION_HPP

#include <boost/interprocess/detail/config_begin.hpp>
#include <boost/interprocess/detail/workaround.hpp>
#include <boost/interprocess/detail/posix_time_types_wrk.hpp>

#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/exceptions.hpp>
#include <boost/interprocess/sync/windows/semaphore.hpp>
#include <boost/interprocess/sync/windows/mutex.hpp>
#include <boost/cstdint.hpp>
#include <limits>

////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
// 
// Condition variable algorithm taken from pthreads-win32 discussion.
//
// The algorithm was developed by Alexander Terekhov in colaboration with
// Louis Thomas.
// 
//     Algorithm 8a / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ALL
//
// semBlockLock - bin.semaphore
// semBlockQueue - semaphore
// mtxExternal - mutex or CS
// mtxUnblockLock - mutex or CS
// nWaitersGone - int
// nWaitersBlocked - int
// nWaitersToUnblock - int
// 
// wait( timeout ) {
// 
//   [auto: register int result          ]     // error checking omitted
//   [auto: register int nSignalsWasLeft ]
//   [auto: register int nWaitersWasGone ]
// 
//   sem_wait( semBlockLock );
//   nWaitersBlocked++;
//   sem_post( semBlockLock );
// 
//   unlock( mtxExternal );
//   bTimedOut = sem_wait( semBlockQueue,timeout );
// 
//   lock( mtxUnblockLock );
//   if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
//     if ( bTimedOut ) {                       // timeout (or canceled)
//       if ( 0 != nWaitersBlocked ) {
//         nWaitersBlocked--;
//       }
//       else {
//         nWaitersGone++;                     // count spurious wakeups.
//       }
//     }
//     if ( 0 == --nWaitersToUnblock ) {
//       if ( 0 != nWaitersBlocked ) {
//         sem_post( semBlockLock );           // open the gate.
//         nSignalsWasLeft = 0;                // do not open the gate
//                                             // below again.
//       }
//       else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
//         nWaitersGone = 0;
//       }
//     }
//   }
//   else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or
//                                             // spurious semaphore :-)
//     sem_wait( semBlockLock );
//     nWaitersBlocked -= nWaitersGone;     // something is going on here
//                                          //  - test of timeouts? :-)
//     sem_post( semBlockLock );
//     nWaitersGone = 0;
//   }
//   unlock( mtxUnblockLock );
// 
//   if ( 1 == nSignalsWasLeft ) {
//     if ( 0 != nWaitersWasGone ) {
//       // sem_adjust( semBlockQueue,-nWaitersWasGone );
//       while ( nWaitersWasGone-- ) {
//         sem_wait( semBlockQueue );       // better now than spurious later
//       }
//     } sem_post( semBlockLock );          // open the gate
//   }
// 
//   lock( mtxExternal );
// 
//   return ( bTimedOut ) ? ETIMEOUT : 0;
// }
// 
// signal(bAll) {
// 
//   [auto: register int result         ]
//   [auto: register int nSignalsToIssue]
// 
//   lock( mtxUnblockLock );
// 
//   if ( 0 != nWaitersToUnblock ) {        // the gate is closed!!!
//     if ( 0 == nWaitersBlocked ) {        // NO-OP
//       return unlock( mtxUnblockLock );
//     }
//     if (bAll) {
//       nWaitersToUnblock += nSignalsToIssue=nWaitersBlocked;
//       nWaitersBlocked = 0;
//     }
//     else {
//       nSignalsToIssue = 1;
//       nWaitersToUnblock++;
//       nWaitersBlocked--;
//     }
//   }
//   else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
//     sem_wait( semBlockLock );                  // close the gate
//     if ( 0 != nWaitersGone ) {
//       nWaitersBlocked -= nWaitersGone;
//       nWaitersGone = 0;
//     }
//     if (bAll) {
//       nSignalsToIssue = nWaitersToUnblock = nWaitersBlocked;
//       nWaitersBlocked = 0;
//     }
//     else {
//       nSignalsToIssue = nWaitersToUnblock = 1;
//       nWaitersBlocked--;
//     }
//   }
//   else { // NO-OP
//     return unlock( mtxUnblockLock );
//   }
// 
//   unlock( mtxUnblockLock );
//   sem_post( semBlockQueue,nSignalsToIssue );
//   return result;
// }
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////

namespace boost {
namespace interprocess {
namespace ipcdetail {

class windows_condition
{
   windows_condition(const windows_condition &);
   windows_condition &operator=(const windows_condition &);
   public:
   windows_condition();
   ~windows_condition();

   void notify_one();
   void notify_all();

   template <typename L>
   bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time)
   {
      if(abs_time == boost::posix_time::pos_infin){
         this->wait(lock);
         return true;
      }
      if (!lock)
         throw lock_exception();
      return this->do_timed_wait(abs_time, *lock.mutex());
   }

   template <typename L, typename Pr>
   bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time, Pr pred)
   {
      if(abs_time == boost::posix_time::pos_infin){
         this->wait(lock, pred);
         return true;
      }
      if (!lock)
         throw lock_exception();
      while (!pred()){
         if (!this->do_timed_wait(abs_time, *lock.mutex()))
            return pred();
      }
      return true;
   }

   template <typename L>
   void wait(L& lock)
   {
      if (!lock)
         throw lock_exception();
      do_wait(*lock.mutex());
   }

   template <typename L, typename Pr>
   void wait(L& lock, Pr pred)
   {
      if (!lock)
         throw lock_exception();

      while (!pred())
         do_wait(*lock.mutex());
   }

   template<class InterprocessMutex>
   void do_wait(InterprocessMutex &mut);

   template<class InterprocessMutex>
   bool do_timed_wait(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);

   private:

   template<class InterprocessMutex>
   bool do_timed_wait(bool timeout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
   void do_signal    (bool broadcast);

   boost::int32_t    m_nwaiters_blocked;
   boost::int32_t    m_nwaiters_gone;
   boost::int32_t    m_nwaiters_to_unblock;
   windows_semaphore m_sem_block_queue;
   windows_semaphore m_sem_block_lock;
   windows_mutex     m_mtx_unblock_lock;
};

inline windows_condition::windows_condition()
 : m_nwaiters_blocked(0)
 , m_nwaiters_gone(0)
 , m_nwaiters_to_unblock(0)
 , m_sem_block_queue(0)
 , m_sem_block_lock(1)
 , m_mtx_unblock_lock()
{}

inline windows_condition::~windows_condition()
{}

inline void windows_condition::notify_one()
{  this->do_signal(false);  }

inline void windows_condition::notify_all()
{  this->do_signal(true);  }

inline void windows_condition::do_signal(bool broadcast)
{
   boost::int32_t nsignals_to_issue;

   {
      scoped_lock<windows_mutex> locker(m_mtx_unblock_lock);

      if ( 0 != m_nwaiters_to_unblock ) {        // the gate is closed!!!
         if ( 0 == m_nwaiters_blocked ) {        // NO-OP
            //locker's destructor triggers m_mtx_unblock_lock.unlock() 
            return;
         }
         if (broadcast) {
            m_nwaiters_to_unblock += nsignals_to_issue = m_nwaiters_blocked;
            m_nwaiters_blocked = 0;
         }
         else {
            nsignals_to_issue = 1;
            m_nwaiters_to_unblock++;
            m_nwaiters_blocked--;
         }
      }
      else if ( m_nwaiters_blocked > m_nwaiters_gone ) { // HARMLESS RACE CONDITION!
         m_sem_block_lock.wait();                      // close the gate
         if ( 0 != m_nwaiters_gone ) {
            m_nwaiters_blocked -= m_nwaiters_gone;
            m_nwaiters_gone = 0;
         }
         if (broadcast) {
            nsignals_to_issue = m_nwaiters_to_unblock = m_nwaiters_blocked;
            m_nwaiters_blocked = 0;
         }
         else {
            nsignals_to_issue = m_nwaiters_to_unblock = 1;
            m_nwaiters_blocked--;
         }
      }
      else { // NO-OP
         //locker's destructor triggers m_mtx_unblock_lock.unlock() 
         return;
      }
      //locker's destructor triggers m_mtx_unblock_lock.unlock() 
   }
   m_sem_block_queue.post(nsignals_to_issue);
}

template<class InterprocessMutex>
inline void windows_condition::do_wait(InterprocessMutex &mut)
{  this->do_timed_wait(false, boost::posix_time::ptime(), mut);  }

template<class InterprocessMutex>
inline bool windows_condition::do_timed_wait
   (const boost::posix_time::ptime &abs_time, InterprocessMutex &mut)
{  return this->do_timed_wait(true, abs_time, mut);  }

template<class InterprocessMutex>
inline bool windows_condition::do_timed_wait
   (bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mtxExternal)
{
   //Initialize to avoid warnings
   boost::int32_t nsignals_was_left = 0;
   boost::int32_t nwaiters_was_gone = 0;
 
   m_sem_block_lock.wait();
   ++m_nwaiters_blocked;
   m_sem_block_lock.post();

   struct scoped_unlock
   {
      InterprocessMutex & mut;
      scoped_unlock(InterprocessMutex & m)
         : mut(m)
      {  m.unlock();  }

      ~scoped_unlock()
      {  mut.lock();  }
   } unlocker(mtxExternal);
 

   bool bTimedOut = tout_enabled ? !m_sem_block_queue.timed_wait(abs_time) : (m_sem_block_queue.wait(), false);
 
   {
      scoped_lock<windows_mutex> locker(m_mtx_unblock_lock);
      if ( 0 != (nsignals_was_left = m_nwaiters_to_unblock) ) {
         if ( bTimedOut ) {                       // timeout (or canceled)
            if ( 0 != m_nwaiters_blocked ) {
               m_nwaiters_blocked--;
            }
            else {
               m_nwaiters_gone++;                     // count spurious wakeups.
            }
         }
         if ( 0 == --m_nwaiters_to_unblock ) {
            if ( 0 != m_nwaiters_blocked ) {
               m_sem_block_lock.post();          // open the gate.
               nsignals_was_left = 0;          // do not open the gate below again.
            }
            else if ( 0 != (nwaiters_was_gone = m_nwaiters_gone) ) {
               m_nwaiters_gone = 0;
            }
         }
      }
      else if ( (std::numeric_limits<boost::int32_t>::max)()/2
                == ++m_nwaiters_gone ) { // timeout/canceled or spurious semaphore :-)
         m_sem_block_lock.wait();
         m_nwaiters_blocked -= m_nwaiters_gone;       // something is going on here - test of timeouts? :-)
         m_sem_block_lock.post();
         m_nwaiters_gone = 0;
      }
      //locker's destructor triggers m_mtx_unblock_lock.unlock() 
   }  
 
   if ( 1 == nsignals_was_left ) {
      if ( 0 != nwaiters_was_gone ) {
         // sem_adjust( m_sem_block_queue,-nwaiters_was_gone );
         while ( nwaiters_was_gone-- ) {
            m_sem_block_queue.wait();       // better now than spurious later
         }
      }
      m_sem_block_lock.post(); // open the gate
   }
 
   //mtxExternal.lock(); called from unlocker
 
   return ( bTimedOut ) ? false : true;
}

}  //namespace ipcdetail
}  //namespace interprocess
}  //namespace boost

#include <boost/interprocess/detail/config_end.hpp>

#endif   //BOOST_INTERPROCESS_DETAIL_WINDOWS_CONDITION_HPP