Browse Source

why a class anymore

LPA-1
Volker Birk 7 years ago
parent
commit
0293edca01
  1. 228
      Adapter.cc
  2. 56
      Adapter.hh
  3. 10
      test_adapter.cc

228
Adapter.cc

@ -7,142 +7,150 @@
#include <assert.h> #include <assert.h>
namespace pEp { namespace pEp {
using namespace std; namespace Adapter {
using namespace std;
void throw_status(PEP_STATUS status)
{
if (status == PEP_STATUS_OK)
return;
if (status >= 0x400 && status <= 0x4ff)
return;
if (status == PEP_OUT_OF_MEMORY)
throw bad_alloc();
if (status == PEP_ILLEGAL_VALUE)
throw invalid_argument("illegal value");
stringstream build;
build << setfill('0') << "p≡p 0x" << setw(4) << hex << status;
throw runtime_error(build.str());
}
messageToSend_t Adapter::_messageToSend = nullptr;
notifyHandshake_t Adapter::_notifyHandshake = nullptr;
std::thread *Adapter::_sync_thread = nullptr;
Adapter::Adapter(messageToSend_t messageToSend,
notifyHandshake_t notifyHandshake, void *obj)
{
startup(messageToSend, notifyHandshake, obj);
}
void Adapter::startup(messageToSend_t messageToSend,
notifyHandshake_t notifyHandshake, void *obj)
{
if (messageToSend)
_messageToSend = messageToSend;
if (notifyHandshake) static messageToSend_t _messageToSend = nullptr;
_notifyHandshake = notifyHandshake; static notifyHandshake_t _notifyHandshake = nullptr;
static std::thread *_sync_thread = nullptr;
session(); static ::utility::locked_queue< SYNC_EVENT >& queue()
{
static ::utility::locked_queue< SYNC_EVENT > q;
return q;
}
static std::mutex& mtx()
{ {
lock_guard<mutex> lock(mtx()); static std::mutex m;
return m;
}
if (!Adapter::_sync_thread) void throw_status(PEP_STATUS status)
Adapter::_sync_thread = new thread(sync_thread, obj); {
if (status == PEP_STATUS_OK)
return;
if (status >= 0x400 && status <= 0x4ff)
return;
if (status == PEP_OUT_OF_MEMORY)
throw bad_alloc();
if (status == PEP_ILLEGAL_VALUE)
throw invalid_argument("illegal value");
stringstream build;
build << setfill('0') << "p≡p 0x" << setw(4) << hex << status;
throw runtime_error(build.str());
} }
}
PEP_SESSION Adapter::session(session_action action) static int _inject_sync_event(SYNC_EVENT ev, void *management)
{ {
lock_guard<mutex> lock(mtx()); if (is_sync_thread(session())) {
PEP_STATUS status = do_sync_protocol_step(session(), nullptr, ev);
return status == PEP_STATUS_OK ? 0 : 1;
}
thread_local static PEP_SESSION _session = nullptr; try {
PEP_STATUS status = PEP_STATUS_OK; queue().push_front(ev);
}
catch (exception&) {
return 1;
}
return 0;
}
switch (action) { static SYNC_EVENT _retrieve_next_sync_event(void *management, time_t threshold)
case release: {
if (_session) { time_t started = time(nullptr);
::release(_session); bool timeout = false;
_session = nullptr;
while (queue().empty()) {
int i = 0;
++i;
if (i > 10) {
if (time(nullptr) > started + threshold) {
timeout = true;
break;
}
i = 0;
} }
break; nanosleep((const struct timespec[]){{0, 100000000L}}, NULL);
}
case init: if (timeout)
if (!_session) return new_sync_timeout_event();
status = ::init(&_session, _messageToSend, _inject_sync_event);
break;
default: return queue().pop_front();
status = PEP_ILLEGAL_VALUE;
} }
throw_status(status); static void sync_thread(void *obj)
return _session; {
} PEP_STATUS status = register_sync_callbacks(session(), nullptr,
_notifyHandshake, _retrieve_next_sync_event);
throw_status(status);
void Adapter::shutdown() do_sync_protocol(session(), obj);
{ unregister_sync_callbacks(session());
if (Adapter::_sync_thread) {
_inject_sync_event(nullptr, nullptr);
Adapter::_sync_thread->join();
delete Adapter::_sync_thread;
Adapter::_sync_thread = nullptr;
}
session(release);
}
int Adapter::_inject_sync_event(SYNC_EVENT ev, void *management) session(release);
{
if (is_sync_thread(session())) {
PEP_STATUS status = do_sync_protocol_step(session(), nullptr, ev);
return status == PEP_STATUS_OK ? 0 : 1;
} }
try { void startup(messageToSend_t messageToSend,
queue().push_front(ev); notifyHandshake_t notifyHandshake, void *obj)
} {
catch (exception&) { if (messageToSend)
return 1; _messageToSend = messageToSend;
}
return 0;
}
SYNC_EVENT Adapter::_retrieve_next_sync_event(void *management, time_t threshold) if (notifyHandshake)
{ _notifyHandshake = notifyHandshake;
time_t started = time(nullptr);
bool timeout = false; session();
while (queue().empty()) { {
int i = 0; lock_guard<mutex> lock(mtx());
++i;
if (i > 10) { if (!_sync_thread)
if (time(nullptr) > started + threshold) { _sync_thread = new thread(sync_thread, obj);
timeout = true;
break;
}
i = 0;
} }
nanosleep((const struct timespec[]){{0, 100000000L}}, NULL);
} }
if (timeout) PEP_SESSION session(session_action action)
return new_sync_timeout_event(); {
lock_guard<mutex> lock(mtx());
return queue().pop_front(); thread_local static PEP_SESSION _session = nullptr;
} PEP_STATUS status = PEP_STATUS_OK;
switch (action) {
case release:
if (_session) {
::release(_session);
_session = nullptr;
}
break;
case init:
if (!_session)
status = ::init(&_session, _messageToSend, _inject_sync_event);
break;
void Adapter::sync_thread(void *obj) default:
{ status = PEP_ILLEGAL_VALUE;
PEP_STATUS status = register_sync_callbacks(session(), nullptr, }
_notifyHandshake, _retrieve_next_sync_event);
throw_status(status);
do_sync_protocol(session(), obj); throw_status(status);
unregister_sync_callbacks(session()); return _session;
}
session(release); void shutdown()
{
if (_sync_thread) {
_inject_sync_event(nullptr, nullptr);
_sync_thread->join();
delete _sync_thread;
_sync_thread = nullptr;
}
session(release);
}
} }
} }

56
Adapter.hh

@ -10,47 +10,19 @@
namespace pEp { namespace pEp {
void throw_status(PEP_STATUS status); void throw_status(PEP_STATUS status);
class Adapter { namespace Adapter {
static messageToSend_t _messageToSend; void throw_status(PEP_STATUS status);
static notifyHandshake_t _notifyHandshake;
static std::thread *_sync_thread; void startup(messageToSend_t messageToSend,
notifyHandshake_t notifyHandshake, void *obj = nullptr);
public:
Adapter(messageToSend_t messageToSend, enum session_action {
notifyHandshake_t notifyHandshake, void *obj = nullptr); init,
release
static void startup(messageToSend_t messageToSend, };
notifyHandshake_t notifyHandshake, void *obj = nullptr); PEP_SESSION session(session_action action = init);
enum session_action { void shutdown();
init, }
release
};
static PEP_SESSION session(session_action action = init);
static void release_session()
{
session(release);
}
static void shutdown();
protected:
static int _inject_sync_event(SYNC_EVENT ev, void *management);
static SYNC_EVENT _retrieve_next_sync_event(void *management, time_t threshold);
static void sync_thread(void *obj);
private:
static ::utility::locked_queue< SYNC_EVENT >& queue()
{
static ::utility::locked_queue< SYNC_EVENT > q;
return q;
}
static std::mutex& mtx()
{
static std::mutex m;
return m;
}
};
} }

10
test_adapter.cc

@ -7,8 +7,8 @@
#include <unistd.h> #include <unistd.h>
#include <pEp/keymanagement.h> #include <pEp/keymanagement.h>
using namespace pEp;
using namespace std; using namespace std;
using namespace pEp::Adapter;
PEP_STATUS messageToSend(struct _message *msg) PEP_STATUS messageToSend(struct _message *msg)
{ {
@ -27,15 +27,15 @@ int main()
cout << "updating or creating identity for me\n"; cout << "updating or creating identity for me\n";
pEp_identity *me = new_identity("alice@peptest.ch", NULL, "23", "Who the F* is Alice"); pEp_identity *me = new_identity("alice@peptest.ch", NULL, "23", "Who the F* is Alice");
assert(me); assert(me);
PEP_STATUS status = myself(Adapter::session(), me); PEP_STATUS status = myself(session(), me);
free_identity(me);
throw_status(status); throw_status(status);
cout << "starting the adapter including sync\n"; cout << "starting the adapter including sync\n";
Adapter::startup(messageToSend, notifyHandshake); startup(messageToSend, notifyHandshake);
sleep(3); sleep(3);
Adapter::shutdown(); shutdown();
free_identity(me);
return 0; return 0;
} }

Loading…
Cancel
Save