boost/interprocess/sync/windows/condition.hpp
//////////////////////////////////////////////////////////////////////////////
//
// (C) Copyright Ion Gaztanaga 2005-2011. Distributed under the Boost
// Software License, Version 1.0. (See accompanying file
// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// See http://www.boost.org/libs/interprocess for documentation.
//
//////////////////////////////////////////////////////////////////////////////
#ifndef BOOST_INTERPROCESS_DETAIL_WINDOWS_CONDITION_HPP
#define BOOST_INTERPROCESS_DETAIL_WINDOWS_CONDITION_HPP
#include <boost/interprocess/detail/config_begin.hpp>
#include <boost/interprocess/detail/workaround.hpp>
#include <boost/interprocess/detail/posix_time_types_wrk.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/exceptions.hpp>
#include <boost/interprocess/sync/windows/semaphore.hpp>
#include <boost/interprocess/sync/windows/mutex.hpp>
#include <boost/cstdint.hpp>
#include <limits>
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
//
// Condition variable algorithm taken from pthreads-win32 discussion.
//
// The algorithm was developed by Alexander Terekhov in colaboration with
// Louis Thomas.
//
// Algorithm 8a / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ALL
//
// semBlockLock - bin.semaphore
// semBlockQueue - semaphore
// mtxExternal - mutex or CS
// mtxUnblockLock - mutex or CS
// nWaitersGone - int
// nWaitersBlocked - int
// nWaitersToUnblock - int
//
// wait( timeout ) {
//
// [auto: register int result ] // error checking omitted
// [auto: register int nSignalsWasLeft ]
// [auto: register int nWaitersWasGone ]
//
// sem_wait( semBlockLock );
// nWaitersBlocked++;
// sem_post( semBlockLock );
//
// unlock( mtxExternal );
// bTimedOut = sem_wait( semBlockQueue,timeout );
//
// lock( mtxUnblockLock );
// if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
// if ( bTimedOut ) { // timeout (or canceled)
// if ( 0 != nWaitersBlocked ) {
// nWaitersBlocked--;
// }
// else {
// nWaitersGone++; // count spurious wakeups.
// }
// }
// if ( 0 == --nWaitersToUnblock ) {
// if ( 0 != nWaitersBlocked ) {
// sem_post( semBlockLock ); // open the gate.
// nSignalsWasLeft = 0; // do not open the gate
// // below again.
// }
// else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
// nWaitersGone = 0;
// }
// }
// }
// else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or
// // spurious semaphore :-)
// sem_wait( semBlockLock );
// nWaitersBlocked -= nWaitersGone; // something is going on here
// // - test of timeouts? :-)
// sem_post( semBlockLock );
// nWaitersGone = 0;
// }
// unlock( mtxUnblockLock );
//
// if ( 1 == nSignalsWasLeft ) {
// if ( 0 != nWaitersWasGone ) {
// // sem_adjust( semBlockQueue,-nWaitersWasGone );
// while ( nWaitersWasGone-- ) {
// sem_wait( semBlockQueue ); // better now than spurious later
// }
// } sem_post( semBlockLock ); // open the gate
// }
//
// lock( mtxExternal );
//
// return ( bTimedOut ) ? ETIMEOUT : 0;
// }
//
// signal(bAll) {
//
// [auto: register int result ]
// [auto: register int nSignalsToIssue]
//
// lock( mtxUnblockLock );
//
// if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
// if ( 0 == nWaitersBlocked ) { // NO-OP
// return unlock( mtxUnblockLock );
// }
// if (bAll) {
// nWaitersToUnblock += nSignalsToIssue=nWaitersBlocked;
// nWaitersBlocked = 0;
// }
// else {
// nSignalsToIssue = 1;
// nWaitersToUnblock++;
// nWaitersBlocked--;
// }
// }
// else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
// sem_wait( semBlockLock ); // close the gate
// if ( 0 != nWaitersGone ) {
// nWaitersBlocked -= nWaitersGone;
// nWaitersGone = 0;
// }
// if (bAll) {
// nSignalsToIssue = nWaitersToUnblock = nWaitersBlocked;
// nWaitersBlocked = 0;
// }
// else {
// nSignalsToIssue = nWaitersToUnblock = 1;
// nWaitersBlocked--;
// }
// }
// else { // NO-OP
// return unlock( mtxUnblockLock );
// }
//
// unlock( mtxUnblockLock );
// sem_post( semBlockQueue,nSignalsToIssue );
// return result;
// }
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
namespace boost {
namespace interprocess {
namespace ipcdetail {
class windows_condition
{
windows_condition(const windows_condition &);
windows_condition &operator=(const windows_condition &);
public:
windows_condition();
~windows_condition();
void notify_one();
void notify_all();
template <typename L>
bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time)
{
if(abs_time == boost::posix_time::pos_infin){
this->wait(lock);
return true;
}
if (!lock)
throw lock_exception();
return this->do_timed_wait(abs_time, *lock.mutex());
}
template <typename L, typename Pr>
bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time, Pr pred)
{
if(abs_time == boost::posix_time::pos_infin){
this->wait(lock, pred);
return true;
}
if (!lock)
throw lock_exception();
while (!pred()){
if (!this->do_timed_wait(abs_time, *lock.mutex()))
return pred();
}
return true;
}
template <typename L>
void wait(L& lock)
{
if (!lock)
throw lock_exception();
do_wait(*lock.mutex());
}
template <typename L, typename Pr>
void wait(L& lock, Pr pred)
{
if (!lock)
throw lock_exception();
while (!pred())
do_wait(*lock.mutex());
}
template<class InterprocessMutex>
void do_wait(InterprocessMutex &mut);
template<class InterprocessMutex>
bool do_timed_wait(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
private:
template<class InterprocessMutex>
bool do_timed_wait(bool timeout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
void do_signal (bool broadcast);
boost::int32_t m_nwaiters_blocked;
boost::int32_t m_nwaiters_gone;
boost::int32_t m_nwaiters_to_unblock;
windows_semaphore m_sem_block_queue;
windows_semaphore m_sem_block_lock;
windows_mutex m_mtx_unblock_lock;
};
inline windows_condition::windows_condition()
: m_nwaiters_blocked(0)
, m_nwaiters_gone(0)
, m_nwaiters_to_unblock(0)
, m_sem_block_queue(0)
, m_sem_block_lock(1)
, m_mtx_unblock_lock()
{}
inline windows_condition::~windows_condition()
{}
inline void windows_condition::notify_one()
{ this->do_signal(false); }
inline void windows_condition::notify_all()
{ this->do_signal(true); }
inline void windows_condition::do_signal(bool broadcast)
{
boost::int32_t nsignals_to_issue;
{
scoped_lock<windows_mutex> locker(m_mtx_unblock_lock);
if ( 0 != m_nwaiters_to_unblock ) { // the gate is closed!!!
if ( 0 == m_nwaiters_blocked ) { // NO-OP
//locker's destructor triggers m_mtx_unblock_lock.unlock()
return;
}
if (broadcast) {
m_nwaiters_to_unblock += nsignals_to_issue = m_nwaiters_blocked;
m_nwaiters_blocked = 0;
}
else {
nsignals_to_issue = 1;
m_nwaiters_to_unblock++;
m_nwaiters_blocked--;
}
}
else if ( m_nwaiters_blocked > m_nwaiters_gone ) { // HARMLESS RACE CONDITION!
m_sem_block_lock.wait(); // close the gate
if ( 0 != m_nwaiters_gone ) {
m_nwaiters_blocked -= m_nwaiters_gone;
m_nwaiters_gone = 0;
}
if (broadcast) {
nsignals_to_issue = m_nwaiters_to_unblock = m_nwaiters_blocked;
m_nwaiters_blocked = 0;
}
else {
nsignals_to_issue = m_nwaiters_to_unblock = 1;
m_nwaiters_blocked--;
}
}
else { // NO-OP
//locker's destructor triggers m_mtx_unblock_lock.unlock()
return;
}
//locker's destructor triggers m_mtx_unblock_lock.unlock()
}
m_sem_block_queue.post(nsignals_to_issue);
}
template<class InterprocessMutex>
inline void windows_condition::do_wait(InterprocessMutex &mut)
{ this->do_timed_wait(false, boost::posix_time::ptime(), mut); }
template<class InterprocessMutex>
inline bool windows_condition::do_timed_wait
(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut)
{ return this->do_timed_wait(true, abs_time, mut); }
template<class InterprocessMutex>
inline bool windows_condition::do_timed_wait
(bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mtxExternal)
{
//Initialize to avoid warnings
boost::int32_t nsignals_was_left = 0;
boost::int32_t nwaiters_was_gone = 0;
m_sem_block_lock.wait();
++m_nwaiters_blocked;
m_sem_block_lock.post();
struct scoped_unlock
{
InterprocessMutex & mut;
scoped_unlock(InterprocessMutex & m)
: mut(m)
{ m.unlock(); }
~scoped_unlock()
{ mut.lock(); }
} unlocker(mtxExternal);
bool bTimedOut = tout_enabled ? !m_sem_block_queue.timed_wait(abs_time) : (m_sem_block_queue.wait(), false);
{
scoped_lock<windows_mutex> locker(m_mtx_unblock_lock);
if ( 0 != (nsignals_was_left = m_nwaiters_to_unblock) ) {
if ( bTimedOut ) { // timeout (or canceled)
if ( 0 != m_nwaiters_blocked ) {
m_nwaiters_blocked--;
}
else {
m_nwaiters_gone++; // count spurious wakeups.
}
}
if ( 0 == --m_nwaiters_to_unblock ) {
if ( 0 != m_nwaiters_blocked ) {
m_sem_block_lock.post(); // open the gate.
nsignals_was_left = 0; // do not open the gate below again.
}
else if ( 0 != (nwaiters_was_gone = m_nwaiters_gone) ) {
m_nwaiters_gone = 0;
}
}
}
else if ( (std::numeric_limits<boost::int32_t>::max)()/2
== ++m_nwaiters_gone ) { // timeout/canceled or spurious semaphore :-)
m_sem_block_lock.wait();
m_nwaiters_blocked -= m_nwaiters_gone; // something is going on here - test of timeouts? :-)
m_sem_block_lock.post();
m_nwaiters_gone = 0;
}
//locker's destructor triggers m_mtx_unblock_lock.unlock()
}
if ( 1 == nsignals_was_left ) {
if ( 0 != nwaiters_was_gone ) {
// sem_adjust( m_sem_block_queue,-nwaiters_was_gone );
while ( nwaiters_was_gone-- ) {
m_sem_block_queue.wait(); // better now than spurious later
}
}
m_sem_block_lock.post(); // open the gate
}
//mtxExternal.lock(); called from unlocker
return ( bTimedOut ) ? false : true;
}
} //namespace ipcdetail
} //namespace interprocess
} //namespace boost
#include <boost/interprocess/detail/config_end.hpp>
#endif //BOOST_INTERPROCESS_DETAIL_WINDOWS_CONDITION_HPP