From 1a14b1a205ead399611bde6514a343dd1e7c0740 Mon Sep 17 00:00:00 2001 From: heck Date: Sat, 27 Mar 2021 05:08:32 +0100 Subject: [PATCH] Adapter.*: Add Sync/Async switchable Event execution. rework needed, higher level API, as a class. --- .clang-format | 1 - src/Adapter.cc | 105 +++++++++++++++++++++++++++++++++++++++++++----- src/Adapter.hh | 22 +++++++++- src/Adapter.hxx | 10 +++-- 4 files changed, 122 insertions(+), 16 deletions(-) diff --git a/.clang-format b/.clang-format index c893fb9..cfbf3de 100644 --- a/.clang-format +++ b/.clang-format @@ -1,4 +1,3 @@ -# Generated from CLion C/C++ Code Style settings BasedOnStyle: LLVM Language: Cpp DerivePointerAlignment: true diff --git a/src/Adapter.cc b/src/Adapter.cc index baf0254..cb1b4c3 100644 --- a/src/Adapter.cc +++ b/src/Adapter.cc @@ -42,19 +42,96 @@ namespace pEp { } namespace Adapter { + // private + SyncModes _sync_mode = SyncModes::Async; ::messageToSend_t _messageToSend = nullptr; ::notifyHandshake_t _notifyHandshake = nullptr; + bool _adapter_manages_sync_thread = false; + ::inject_sync_event_t _inject_action = _queue_sync_event; std::thread _sync_thread; - ::utility::locked_queue sync_evt_q; - std::mutex m; + std::mutex mut; + // private std::thread::id sync_thread_id() { return _sync_thread.get_id(); } - int _inject_sync_event(::SYNC_EVENT ev, void *management) + // public + void sync_initialize( + SyncModes mode, + ::messageToSend_t messageToSend, + ::notifyHandshake_t notifyHandshake, + bool adapter_manages_sync_thread) + { + _messageToSend = messageToSend; + _notifyHandshake = notifyHandshake; + _adapter_manages_sync_thread = adapter_manages_sync_thread; + set_sync_mode(mode); + return; + } + + // public + void set_sync_mode(SyncModes mode) + { +// std::lock_guard lock(mut); + _sync_mode = mode; + if (_sync_mode == SyncModes::Sync) { + // init sesssion with inject_sync = process + // stop sync + session(release); + _inject_action = _process_sync_event; + session(init); + ::register_sync_callbacks(session(), nullptr, _notifyHandshake, _retrieve_next_sync_event); + if(!_adapter_manages_sync_thread) { + shutdown(); + } else { + // The adapter need to shutdown sync thread + } + } + if (_sync_mode == SyncModes::Async) { + // init session with inject_sync = queue + // start sync thread + session(release); + _inject_action = _queue_sync_event; + session(init); + if(!_adapter_manages_sync_thread) { + if (!is_sync_running()) { + startup(_messageToSend, _notifyHandshake, nullptr, nullptr); + } + } else { + // The adapter need to do sync thread start up + } + } + if (_sync_mode == SyncModes::Off) { + // init sesssion with inject_sync = null + // stop sync thread + if(!_adapter_manages_sync_thread) { + shutdown(); + } else { + // Adapter needs to shutdown sync thread + } + session(release); + _inject_action = _queue_sync_event; + session(init); + } + return; + } + + // private + int _process_sync_event(::SYNC_EVENT ev, void *management) + { + if (ev != nullptr) { + ::do_sync_protocol_step(session(), nullptr, ev); + return 0; + } else { + return 0; + } + } + + // private + int _queue_sync_event(::SYNC_EVENT ev, void *management) { try { if (ev == nullptr) { @@ -69,12 +146,13 @@ namespace pEp { return 0; } + // private PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr) { return passphrase_cache.ensure_passphrase(session, fpr); } - // threshold: max waiting time in seconds + // public ::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold) { ::SYNC_EVENT syncEvent = nullptr; @@ -87,14 +165,16 @@ namespace pEp { return syncEvent; } + // public bool on_sync_thread() { return _sync_thread.get_id() == this_thread::get_id(); } + // public ::PEP_SESSION Session::operator()(session_action action) { - std::lock_guard lock(m); + std::lock_guard lock(mut); ::PEP_STATUS status = ::PEP_STATUS_OK; @@ -108,12 +188,11 @@ namespace pEp { case init: if (!_session.get()) { ::PEP_SESSION session_; - status = ::init(&session_, _messageToSend, _inject_sync_event, _ensure_passphrase); + status = ::init(&session_, _messageToSend, _inject_action, _ensure_passphrase); throw_status(status); _session = SessionPtr{session_, ::release}; } break; - default: status = ::PEP_ILLEGAL_VALUE; } @@ -122,21 +201,29 @@ namespace pEp { return _session.get(); } + // public void shutdown() { pEpLog("called"); if (_sync_thread.joinable()) { pEpLog("sync_is_running - injecting null event"); - _inject_sync_event(nullptr, nullptr); + _queue_sync_event(nullptr, nullptr); _sync_thread.join(); } } + // public bool is_sync_running() { - return _sync_thread.joinable(); + if(!_adapter_manages_sync_thread) { + return _sync_thread.joinable(); + } else { + return false; + } } + // public + // Works even if adapter is managing sync thread, BUT must be using this queue bool in_shutdown() { SYNC_EVENT ev; diff --git a/src/Adapter.hh b/src/Adapter.hh index b02252e..e63b34a 100644 --- a/src/Adapter.hh +++ b/src/Adapter.hh @@ -26,7 +26,25 @@ namespace pEp { }; namespace Adapter { - int _inject_sync_event(::SYNC_EVENT ev, void *management); + // public + enum class SyncModes + { + Off, + Sync, + Async + }; + + void sync_initialize( + SyncModes mode, + ::messageToSend_t messageToSend, + ::notifyHandshake_t notifyHandshake, + bool adapter_manages_sync_thread); + + void set_sync_mode(SyncModes mode); + + int _queue_sync_event(::SYNC_EVENT ev, void *management); + int _process_sync_event(::SYNC_EVENT ev, void *management); + ::PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr); template @@ -46,7 +64,7 @@ namespace pEp { enum session_action { init, - release + release, }; class Session { diff --git a/src/Adapter.hxx b/src/Adapter.hxx index f2050ba..80f4eb2 100644 --- a/src/Adapter.hxx +++ b/src/Adapter.hxx @@ -19,7 +19,7 @@ namespace pEp { extern std::thread _sync_thread; extern ::utility::locked_queue<::SYNC_EVENT, ::free_Sync_event> sync_evt_q; - extern std::mutex m; + extern std::mutex mut; ::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold); @@ -29,13 +29,14 @@ namespace pEp { /* * Sync Thread * 1. Execute registered startup function - * 2. Create session for the sync thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase) + * 2. Create session for the sync thread (registers: messageToSend, inject_sync_event, ensure_passphrase) * 3. register_sync_callbacks() (registers: _notifyHandshake, _retrieve_next_sync_event) * 4. Enter Sync Event Dispatching Loop (do_sync_protocol()) * 5. unregister_sync_callbacks() * 6. Release the session * 7. Execute registered shutdown function */ + // private template void sync_thread(T *obj, function _startup, function _shutdown) { @@ -96,11 +97,12 @@ namespace pEp { /* * Sync Thread Startup - * 1. re-initialize session for the main thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase) + * 1. ensure session for the main thread (registers: messageToSend, _queue_sync_event, _ensure_passphrase) * 2. Start the sync thread * 3. Defer execution until sync thread register_sync_callbacks() has returned * 4. Throw pending exception from the sync thread */ + // private template void startup( ::messageToSend_t messageToSend, @@ -118,7 +120,7 @@ namespace pEp { _notifyHandshake = notifyHandshake; } pEpLog("ensure session for the main thread"); - // 1. re-initialize session for the main thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase) + // 1. re-initialize session for the main thread (registers: messageToSend, _queue_sync_event, _ensure_passphrase) session(release); session(init);