Browse Source

Adapter.*: Add Sync/Async switchable Event execution. rework needed, higher level API, as a class.

pull/8/head
heck 4 years ago
parent
commit
1a14b1a205
  1. 1
      .clang-format
  2. 105
      src/Adapter.cc
  3. 22
      src/Adapter.hh
  4. 10
      src/Adapter.hxx

1
.clang-format

@ -1,4 +1,3 @@
# Generated from CLion C/C++ Code Style settings
BasedOnStyle: LLVM BasedOnStyle: LLVM
Language: Cpp Language: Cpp
DerivePointerAlignment: true DerivePointerAlignment: true

105
src/Adapter.cc

@ -42,19 +42,96 @@ namespace pEp {
} }
namespace Adapter { namespace Adapter {
// private
SyncModes _sync_mode = SyncModes::Async;
::messageToSend_t _messageToSend = nullptr; ::messageToSend_t _messageToSend = nullptr;
::notifyHandshake_t _notifyHandshake = nullptr; ::notifyHandshake_t _notifyHandshake = nullptr;
bool _adapter_manages_sync_thread = false;
::inject_sync_event_t _inject_action = _queue_sync_event;
std::thread _sync_thread; std::thread _sync_thread;
::utility::locked_queue<SYNC_EVENT, ::free_Sync_event> sync_evt_q; ::utility::locked_queue<SYNC_EVENT, ::free_Sync_event> sync_evt_q;
std::mutex m; std::mutex mut;
// private
std::thread::id sync_thread_id() std::thread::id sync_thread_id()
{ {
return _sync_thread.get_id(); return _sync_thread.get_id();
} }
int _inject_sync_event(::SYNC_EVENT ev, void *management) // public
void sync_initialize(
SyncModes mode,
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
bool adapter_manages_sync_thread)
{
_messageToSend = messageToSend;
_notifyHandshake = notifyHandshake;
_adapter_manages_sync_thread = adapter_manages_sync_thread;
set_sync_mode(mode);
return;
}
// public
void set_sync_mode(SyncModes mode)
{
// std::lock_guard<mutex> lock(mut);
_sync_mode = mode;
if (_sync_mode == SyncModes::Sync) {
// init sesssion with inject_sync = process
// stop sync
session(release);
_inject_action = _process_sync_event;
session(init);
::register_sync_callbacks(session(), nullptr, _notifyHandshake, _retrieve_next_sync_event);
if(!_adapter_manages_sync_thread) {
shutdown();
} else {
// The adapter need to shutdown sync thread
}
}
if (_sync_mode == SyncModes::Async) {
// init session with inject_sync = queue
// start sync thread
session(release);
_inject_action = _queue_sync_event;
session(init);
if(!_adapter_manages_sync_thread) {
if (!is_sync_running()) {
startup<void>(_messageToSend, _notifyHandshake, nullptr, nullptr);
}
} else {
// The adapter need to do sync thread start up
}
}
if (_sync_mode == SyncModes::Off) {
// init sesssion with inject_sync = null
// stop sync thread
if(!_adapter_manages_sync_thread) {
shutdown();
} else {
// Adapter needs to shutdown sync thread
}
session(release);
_inject_action = _queue_sync_event;
session(init);
}
return;
}
// private
int _process_sync_event(::SYNC_EVENT ev, void *management)
{
if (ev != nullptr) {
::do_sync_protocol_step(session(), nullptr, ev);
return 0;
} else {
return 0;
}
}
// private
int _queue_sync_event(::SYNC_EVENT ev, void *management)
{ {
try { try {
if (ev == nullptr) { if (ev == nullptr) {
@ -69,12 +146,13 @@ namespace pEp {
return 0; return 0;
} }
// private
PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr) PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr)
{ {
return passphrase_cache.ensure_passphrase(session, fpr); return passphrase_cache.ensure_passphrase(session, fpr);
} }
// threshold: max waiting time in seconds // public
::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold) ::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold)
{ {
::SYNC_EVENT syncEvent = nullptr; ::SYNC_EVENT syncEvent = nullptr;
@ -87,14 +165,16 @@ namespace pEp {
return syncEvent; return syncEvent;
} }
// public
bool on_sync_thread() bool on_sync_thread()
{ {
return _sync_thread.get_id() == this_thread::get_id(); return _sync_thread.get_id() == this_thread::get_id();
} }
// public
::PEP_SESSION Session::operator()(session_action action) ::PEP_SESSION Session::operator()(session_action action)
{ {
std::lock_guard<mutex> lock(m); std::lock_guard<mutex> lock(mut);
::PEP_STATUS status = ::PEP_STATUS_OK; ::PEP_STATUS status = ::PEP_STATUS_OK;
@ -108,12 +188,11 @@ namespace pEp {
case init: case init:
if (!_session.get()) { if (!_session.get()) {
::PEP_SESSION session_; ::PEP_SESSION session_;
status = ::init(&session_, _messageToSend, _inject_sync_event, _ensure_passphrase); status = ::init(&session_, _messageToSend, _inject_action, _ensure_passphrase);
throw_status(status); throw_status(status);
_session = SessionPtr{session_, ::release}; _session = SessionPtr{session_, ::release};
} }
break; break;
default: default:
status = ::PEP_ILLEGAL_VALUE; status = ::PEP_ILLEGAL_VALUE;
} }
@ -122,21 +201,29 @@ namespace pEp {
return _session.get(); return _session.get();
} }
// public
void shutdown() void shutdown()
{ {
pEpLog("called"); pEpLog("called");
if (_sync_thread.joinable()) { if (_sync_thread.joinable()) {
pEpLog("sync_is_running - injecting null event"); pEpLog("sync_is_running - injecting null event");
_inject_sync_event(nullptr, nullptr); _queue_sync_event(nullptr, nullptr);
_sync_thread.join(); _sync_thread.join();
} }
} }
// public
bool is_sync_running() bool is_sync_running()
{ {
return _sync_thread.joinable(); if(!_adapter_manages_sync_thread) {
return _sync_thread.joinable();
} else {
return false;
}
} }
// public
// Works even if adapter is managing sync thread, BUT must be using this queue
bool in_shutdown() bool in_shutdown()
{ {
SYNC_EVENT ev; SYNC_EVENT ev;

22
src/Adapter.hh

@ -26,7 +26,25 @@ namespace pEp {
}; };
namespace Adapter { namespace Adapter {
int _inject_sync_event(::SYNC_EVENT ev, void *management); // public
enum class SyncModes
{
Off,
Sync,
Async
};
void sync_initialize(
SyncModes mode,
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
bool adapter_manages_sync_thread);
void set_sync_mode(SyncModes mode);
int _queue_sync_event(::SYNC_EVENT ev, void *management);
int _process_sync_event(::SYNC_EVENT ev, void *management);
::PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr); ::PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr);
template<class T = void> template<class T = void>
@ -46,7 +64,7 @@ namespace pEp {
enum session_action enum session_action
{ {
init, init,
release release,
}; };
class Session { class Session {

10
src/Adapter.hxx

@ -19,7 +19,7 @@ namespace pEp {
extern std::thread _sync_thread; extern std::thread _sync_thread;
extern ::utility::locked_queue<::SYNC_EVENT, ::free_Sync_event> sync_evt_q; extern ::utility::locked_queue<::SYNC_EVENT, ::free_Sync_event> sync_evt_q;
extern std::mutex m; extern std::mutex mut;
::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold); ::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold);
@ -29,13 +29,14 @@ namespace pEp {
/* /*
* Sync Thread * Sync Thread
* 1. Execute registered startup function * 1. Execute registered startup function
* 2. Create session for the sync thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase) * 2. Create session for the sync thread (registers: messageToSend, inject_sync_event, ensure_passphrase)
* 3. register_sync_callbacks() (registers: _notifyHandshake, _retrieve_next_sync_event) * 3. register_sync_callbacks() (registers: _notifyHandshake, _retrieve_next_sync_event)
* 4. Enter Sync Event Dispatching Loop (do_sync_protocol()) * 4. Enter Sync Event Dispatching Loop (do_sync_protocol())
* 5. unregister_sync_callbacks() * 5. unregister_sync_callbacks()
* 6. Release the session * 6. Release the session
* 7. Execute registered shutdown function * 7. Execute registered shutdown function
*/ */
// private
template<class T> template<class T>
void sync_thread(T *obj, function<void(T *)> _startup, function<void(T *)> _shutdown) void sync_thread(T *obj, function<void(T *)> _startup, function<void(T *)> _shutdown)
{ {
@ -96,11 +97,12 @@ namespace pEp {
/* /*
* Sync Thread Startup * Sync Thread Startup
* 1. re-initialize session for the main thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase) * 1. ensure session for the main thread (registers: messageToSend, _queue_sync_event, _ensure_passphrase)
* 2. Start the sync thread * 2. Start the sync thread
* 3. Defer execution until sync thread register_sync_callbacks() has returned * 3. Defer execution until sync thread register_sync_callbacks() has returned
* 4. Throw pending exception from the sync thread * 4. Throw pending exception from the sync thread
*/ */
// private
template<class T> template<class T>
void startup( void startup(
::messageToSend_t messageToSend, ::messageToSend_t messageToSend,
@ -118,7 +120,7 @@ namespace pEp {
_notifyHandshake = notifyHandshake; _notifyHandshake = notifyHandshake;
} }
pEpLog("ensure session for the main thread"); pEpLog("ensure session for the main thread");
// 1. re-initialize session for the main thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase) // 1. re-initialize session for the main thread (registers: messageToSend, _queue_sync_event, _ensure_passphrase)
session(release); session(release);
session(init); session(init);

Loading…
Cancel
Save