diff --git a/locked_queue.hh b/locked_queue.hh index e96faac..ccf45ea 100644 --- a/locked_queue.hh +++ b/locked_queue.hh @@ -15,6 +15,7 @@ namespace utility typedef std::recursive_mutex Mutex; typedef std::unique_lock Lock; + int _waiting = 0; Mutex _mtx; std::condition_variable_any _cv; std::deque _q; @@ -83,11 +84,14 @@ namespace utility bool try_pop_back(T& out, std::chrono::steady_clock::time_point end_time) { Lock L(_mtx); + ++_waiting; if(! _cv.wait_until(L, end_time, [this]{ return !_q.empty(); } ) ) { + --_waiting; return false; } + --_waiting; out = std::move(_q.back()); _q.pop_back(); return true; @@ -98,11 +102,14 @@ namespace utility bool try_pop_front(T& out, std::chrono::steady_clock::time_point end_time) { Lock L(_mtx); + ++_waiting; if(! _cv.wait_until(L, end_time, [this]{ return !_q.empty(); } ) ) { + --_waiting; return false; } + --_waiting; out = std::move(_q.front()); _q.pop_front(); return true; @@ -113,11 +120,14 @@ namespace utility bool try_pop_front(T& out, std::chrono::seconds duration) { Lock L(_mtx); + ++_waiting; if(! _cv.wait_for(L, duration, [this]{ return !_q.empty(); } ) ) { + --_waiting; return false; } + --_waiting; out = std::move(_q.front()); _q.pop_front(); return true; @@ -162,6 +172,13 @@ namespace utility Lock lg(_mtx); return _q.empty(); } + + // returns the number of threads that are waiting in pop_...() or try_pop_...() + int waiting() + { + Lock L(_mtx); + return _waiting; + } }; -} +} // end of namespace utility