Browse Source

merge "LPA-1" into default branch

sync
Roker 7 years ago
parent
commit
462ac26d0c
  1. 2
      Adapter.cc
  2. 2
      Adapter.hxx
  3. 107
      locked_queue.hh

2
Adapter.cc

@ -36,7 +36,7 @@ namespace pEp {
notifyHandshake_t _notifyHandshake = nullptr;
std::thread *_sync_thread = nullptr;
::utility::locked_queue< SYNC_EVENT > q;
::utility::locked_queue< SYNC_EVENT, &free_Sync_event> q;
std::mutex m;
int _inject_sync_event(SYNC_EVENT ev, void *management)

2
Adapter.hxx

@ -10,7 +10,7 @@ namespace pEp {
extern notifyHandshake_t _notifyHandshake;
extern std::thread *_sync_thread;
extern ::utility::locked_queue< SYNC_EVENT > q;
extern ::utility::locked_queue< SYNC_EVENT, &free_Sync_event > q;
extern std::mutex m;
SYNC_EVENT _retrieve_next_sync_event(void *management, time_t threshold);

107
locked_queue.hh

@ -4,60 +4,133 @@
#pragma once
#include <list>
#include <condition_variable>
#include <mutex>
namespace utility
{
using namespace std;
template<class T> class locked_queue
template<class T, void(*Deleter)(T)>
class locked_queue
{
mutex _mtx;
list<T> _q;
typedef std::recursive_mutex Mutex;
typedef std::unique_lock<Mutex> Lock;
Mutex _mtx;
std::condition_variable_any _cv;
std::list<T> _q;
public:
~locked_queue()
{
clear();
}
void clear()
{
Lock L(_mtx);
for(auto& element : _q)
{
Deleter(element);
}
_q.clear();
}
// undefined behavior when queue empty
T& back()
{
lock_guard<mutex> lg(_mtx);
Lock lg(_mtx);
return _q.back();
}
// undefined behavior when queue empty
T& front()
{
lock_guard<mutex> lg(_mtx);
Lock lg(_mtx);
return _q.front();
}
// returns a copy of the last element.
// blocks when queue is empty
T pop_back()
{
lock_guard<mutex> lg(_mtx);
T r = _q.back();
Lock L(_mtx);
_cv.wait(L, [&]{ return !_q.empty(); } );
T r{std::move(_q.back())};
_q.pop_back();
return r;
}
// returns a copy of the first element.
// blocks when queue is empty
T pop_front()
{
lock_guard<mutex> lg(_mtx);
T r = _q.front();
Lock L(_mtx);
_cv.wait(L, [&]{ return !_q.empty(); } );
T r{std::move(_q.front())};
_q.pop_front();
return r;
}
// returns true and set a copy of the last element and pop it from queue if there is any
// returns false and leaves 'out' untouched if queue is empty even after 'end_time'
bool try_pop_back(T& out, std::chrono::steady_clock::time_point end_time)
{
Lock L(_mtx);
if(! _cv.wait_until(L, end_time, [this]{ return !_q.empty(); } ) )
{
return false;
}
out = std::move(_q.back());
_q.pop_back();
return true;
}
// returns true and set a copy of the first element and pop it from queue if there is any
// returns false and leaves 'out' untouched if queue is empty even after 'end_time'
bool try_pop_front(T& out, std::chrono::steady_clock::time_point end_time)
{
Lock L(_mtx);
if(! _cv.wait_until(L, end_time, [this]{ return !_q.empty(); } ) )
{
return false;
}
out = std::move(_q.front());
_q.pop_front();
return true;
}
void push_back(const T& data)
{
lock_guard<mutex> lg(_mtx);
_q.push_back(data);
{
Lock L(_mtx);
_q.push_back(data);
}
_cv.notify_one();
}
void push_front(const T& data)
{
lock_guard<mutex> lg(_mtx);
_q.push_front(data);
{
Lock L(_mtx);
_q.push_front(data);
}
_cv.notify_one();
}
size_t size()
{
lock_guard<mutex> lg(_mtx);
Lock lg(_mtx);
return _q.size();
}
bool empty()
{
lock_guard<mutex> lg(_mtx);
Lock lg(_mtx);
return _q.empty();
}
};

Loading…
Cancel
Save