Browse Source

merging

pull/1/head
Volker Birk 5 years ago
parent
commit
f3eed22158
  1. 19
      locked_queue.hh
  2. 1
      message_cache.cc

19
locked_queue.hh

@ -15,6 +15,7 @@ namespace utility
typedef std::recursive_mutex Mutex; typedef std::recursive_mutex Mutex;
typedef std::unique_lock<Mutex> Lock; typedef std::unique_lock<Mutex> Lock;
int _waiting = 0;
Mutex _mtx; Mutex _mtx;
std::condition_variable_any _cv; std::condition_variable_any _cv;
std::deque<T> _q; std::deque<T> _q;
@ -83,11 +84,14 @@ namespace utility
bool try_pop_back(T& out, std::chrono::steady_clock::time_point end_time) bool try_pop_back(T& out, std::chrono::steady_clock::time_point end_time)
{ {
Lock L(_mtx); Lock L(_mtx);
++_waiting;
if(! _cv.wait_until(L, end_time, [this]{ return !_q.empty(); } ) ) if(! _cv.wait_until(L, end_time, [this]{ return !_q.empty(); } ) )
{ {
--_waiting;
return false; return false;
} }
--_waiting;
out = std::move(_q.back()); out = std::move(_q.back());
_q.pop_back(); _q.pop_back();
return true; return true;
@ -98,11 +102,14 @@ namespace utility
bool try_pop_front(T& out, std::chrono::steady_clock::time_point end_time) bool try_pop_front(T& out, std::chrono::steady_clock::time_point end_time)
{ {
Lock L(_mtx); Lock L(_mtx);
++_waiting;
if(! _cv.wait_until(L, end_time, [this]{ return !_q.empty(); } ) ) if(! _cv.wait_until(L, end_time, [this]{ return !_q.empty(); } ) )
{ {
--_waiting;
return false; return false;
} }
--_waiting;
out = std::move(_q.front()); out = std::move(_q.front());
_q.pop_front(); _q.pop_front();
return true; return true;
@ -113,11 +120,14 @@ namespace utility
bool try_pop_front(T& out, std::chrono::seconds duration) bool try_pop_front(T& out, std::chrono::seconds duration)
{ {
Lock L(_mtx); Lock L(_mtx);
++_waiting;
if(! _cv.wait_for(L, duration, [this]{ return !_q.empty(); } ) ) if(! _cv.wait_for(L, duration, [this]{ return !_q.empty(); } ) )
{ {
--_waiting;
return false; return false;
} }
--_waiting;
out = std::move(_q.front()); out = std::move(_q.front());
_q.pop_front(); _q.pop_front();
return true; return true;
@ -162,6 +172,13 @@ namespace utility
Lock lg(_mtx); Lock lg(_mtx);
return _q.empty(); return _q.empty();
} }
};
// returns the number of threads that are waiting in pop_...() or try_pop_...()
int waiting()
{
Lock L(_mtx);
return _waiting;
} }
};
} // end of namespace utility

1
message_cache.cc

@ -204,6 +204,7 @@ namespace pEp {
} }
static ::message *empty_message_copy(const ::message *src, std::string _id = "") static ::message *empty_message_copy(const ::message *src, std::string _id = "")
static ::message *empty_message_copy(const ::message *src)
{ {
if (!src) if (!src)
return nullptr; return nullptr;

Loading…
Cancel
Save