From 080076582ecaa07ff7c89bf601d31e2ccf7dd0b1 Mon Sep 17 00:00:00 2001 From: heck Date: Mon, 12 Jul 2021 01:10:35 +0200 Subject: [PATCH] API: session(init/release) removed. Before using the static object Adapter::session, initialize it using session.initialize(). Release it using session.release(). --- src/Adapter.cc | 188 +++++++++++++++++++++++++----------------------- src/Adapter.hh | 65 +++++++++++------ src/Adapter.hxx | 69 ++++++------------ 3 files changed, 162 insertions(+), 160 deletions(-) diff --git a/src/Adapter.cc b/src/Adapter.cc index 30fe078..359e1aa 100644 --- a/src/Adapter.cc +++ b/src/Adapter.cc @@ -8,6 +8,9 @@ #include "status_to_string.hh" #include "pEpLog.hh" #include "passphrase_cache.hh" +#include "callback_dispatcher.hh" +#include "group_manager_api.h" +#include using namespace std; @@ -36,18 +39,12 @@ namespace pEp { throw RuntimeError(_status, status); } - RuntimeError::RuntimeError(const std::string &_text, ::PEP_STATUS _status) - : std::runtime_error(_text.c_str()), text(_text), status(_status) + RuntimeError::RuntimeError(const std::string &_text, ::PEP_STATUS _status) : + std::runtime_error(_text.c_str()), text(_text), status(_status) { } namespace Adapter { - // private - SyncModes _sync_mode = SyncModes::Async; - ::messageToSend_t _messageToSend = nullptr; - ::notifyHandshake_t _notifyHandshake = nullptr; - bool _adapter_manages_sync_thread = false; - ::inject_sync_event_t _inject_action = _inject_sync_event; std::thread _sync_thread; ::utility::locked_queue sync_evt_q; std::mutex mut; @@ -58,67 +55,6 @@ namespace pEp { return _sync_thread.get_id(); } - // public - void sync_initialize( - SyncModes mode, - ::messageToSend_t messageToSend, - ::notifyHandshake_t notifyHandshake, - bool adapter_manages_sync_thread) - { - _messageToSend = messageToSend; - _notifyHandshake = notifyHandshake; - _adapter_manages_sync_thread = adapter_manages_sync_thread; - set_sync_mode(mode); - return; - } - - // public - void set_sync_mode(SyncModes mode) - { -// std::lock_guard lock(mut); - _sync_mode = mode; - if (_sync_mode == SyncModes::Sync) { - // init session with inject_sync = process - // stop sync - session(release); - _inject_action = _process_sync_event; - session(init); - ::register_sync_callbacks(session(), nullptr, _notifyHandshake, _retrieve_next_sync_event); - if(!_adapter_manages_sync_thread) { - shutdown(); - } else { - // The adapter need to shutdown sync thread - } - } - if (_sync_mode == SyncModes::Async) { - // init session with inject_sync = queue - // start sync thread - session(release); - _inject_action = _inject_sync_event; - session(init); - if(!_adapter_manages_sync_thread) { - if (!is_sync_running()) { - startup(_messageToSend, _notifyHandshake, nullptr, nullptr); - } - } else { - // The adapter need to do sync thread start up - } - } - if (_sync_mode == SyncModes::Off) { - // init sesssion with inject_sync = null - // stop sync thread - if(!_adapter_manages_sync_thread) { - shutdown(); - } else { - // Adapter needs to shutdown sync thread - } - session(release); - _inject_action = _inject_sync_event; - session(init); - } - return; - } - // private int _process_sync_event(::SYNC_EVENT ev, void *management) { @@ -171,34 +107,104 @@ namespace pEp { return _sync_thread.get_id() == this_thread::get_id(); } - // public - ::PEP_SESSION Session::operator()(session_action action) + // --------------------------------------------------------------------------------------- + Session::Session() : + _messageToSend{ nullptr }, _notifyHandshake{ nullptr }, _sync_mode{ SyncModes::Async }, + _adapter_manages_sync_thread{ false } { - std::lock_guard lock(mut); + } - ::PEP_STATUS status = ::PEP_STATUS_OK; + void Session::initialize(SyncModes sync_mode, bool adapter_manages_sync_thread) + { + pEpLog("Initializing session with CallbackDispatcher..."); + _init( + pEp::CallbackDispatcher::messageToSend, + pEp::CallbackDispatcher::notifyHandshake, + sync_mode, + adapter_manages_sync_thread); + } - switch (action) { - case release: - if (_session.get()) { - _session = nullptr; - } - break; + void Session::initialize( + SyncModes sync_mode, + bool adapter_manages_sync_thread, + ::messageToSend_t messageToSend, + ::notifyHandshake_t notifyHandshake) + { + pEpLog("Initializing session..."); + _init(messageToSend, notifyHandshake, sync_mode, adapter_manages_sync_thread); + } - case init: - if (!_session.get()) { - ::PEP_SESSION session_; - status = ::init(&session_, _messageToSend, _inject_action, _ensure_passphrase); - throw_status(status); - _session = SessionPtr{session_, ::release}; - } - break; - default: - status = ::PEP_ILLEGAL_VALUE; + void Session::_init( + ::messageToSend_t messageToSend, + ::notifyHandshake_t notifyHandshake, + SyncModes sync_mode, + bool adapter_manages_sync_thread) + { + // cache the values for sync-thread session creation + _messageToSend = messageToSend; + _notifyHandshake = notifyHandshake; + _sync_mode = sync_mode; + _adapter_manages_sync_thread = adapter_manages_sync_thread; + refresh(); + ::adapter_group_init(); + } + + void Session::refresh() + { + std::lock_guard lock(mut); + release(); + + // Switch to mode "Sync" ensures the sync thread to be shutdown + if (_sync_mode == SyncModes::Sync) { + // process the event directly + _inject_action = _process_sync_event; + if (!_adapter_manages_sync_thread) { + stop_sync(); + } else { + // The adapter needs to shutdown sync thread + } + } + // Switch to mode "ASync", sync thread needs to be started using start_sync + if (_sync_mode == SyncModes::Async) { + // put the event on queue + _inject_action = _inject_sync_event; } + // create + ::PEP_SESSION session_; + ::PEP_STATUS status; + status = ::init(&session_, _messageToSend, _inject_action, _ensure_passphrase); throw_status(status); - return _session.get(); + status = ::register_sync_callbacks( + session_, + nullptr, + _notifyHandshake, + _retrieve_next_sync_event); + if (status != PEP_STATUS_OK) { + pEpLog("libpEpAdapter: WARNING - session is initialized but without sync/callbacks. " + "This is normal if there are no own identities yet. Call session.init() again to " + "re-initialize the session after creating an own identity."); + } + // store + _session = SessionPtr{ session_, ::release }; + } + + void Session::release() + { + if (_session.get()) { + _session = nullptr; + } + } + + // public + ::PEP_SESSION Session::operator()() + { + if (!_session.get()) { + throw std::runtime_error( + "libpEpAdapter: No session! Before use, call session.initialize() for each thread"); + } else { + return _session.get(); + } } // public @@ -230,7 +236,7 @@ namespace pEp { // public bool is_sync_running() { - if(!_adapter_manages_sync_thread) { + if (!session._adapter_manages_sync_thread) { return _sync_thread.joinable(); } else { return false; diff --git a/src/Adapter.hh b/src/Adapter.hh index 3f6f2b9..deda41a 100644 --- a/src/Adapter.hh +++ b/src/Adapter.hh @@ -11,6 +11,7 @@ #include #include +#include "callback_dispatcher.hh" namespace pEp { @@ -29,19 +30,10 @@ namespace pEp { // public enum class SyncModes { - Off, Sync, Async }; - void sync_initialize( - SyncModes mode, - ::messageToSend_t messageToSend, - ::notifyHandshake_t notifyHandshake, - bool adapter_manages_sync_thread); - - void set_sync_mode(SyncModes mode); - int _inject_sync_event(::SYNC_EVENT ev, void *management); int _process_sync_event(::SYNC_EVENT ev, void *management); @@ -51,8 +43,6 @@ namespace pEp { template void startup( - messageToSend_t messageToSend, - notifyHandshake_t notifyHandshake, T *obj = nullptr, std::function _startup = nullptr, std::function _shutdown = nullptr); @@ -63,18 +53,53 @@ namespace pEp { // returns the thread id of the sync thread std::thread::id sync_thread_id(); - enum session_action - { - init, - release, - }; - class Session { + public: + // TODO: needed because libpEpAdapter provides a static instance + // the session needs to be initialized in order to be usable. + Session(); + // Init using CallbackDispatcher + // CAUTION: This may result in a partially initialized session. + // If there are any problem with register_sync_callbacks(), it will still + // succeed. (e.g. due to no own identities yet) + // BUT + // * Sync will not work + // * Group Encryption will not work + // TODO: This needs to be resolved in the engine, new func register_callbacks() + // that is not sync specific, and move the sync-checks to "start-sync()" + void initialize(SyncModes sync_mode = SyncModes::Async, bool adapter_manages_sync_thread = false); + // Arbitrary callbacks + void initialize( + SyncModes sync_mode, + bool adapter_manages_sync_thread, + ::messageToSend_t messageToSend, + ::notifyHandshake_t notifyHandshake); + + // re-creates the session using same values + void refresh(); + + // Not copyable + Session(const Session &) = delete; + Session operator=(const Session&) = delete; + + void release(); + PEP_SESSION operator()(); + + SyncModes _sync_mode; + ::messageToSend_t _messageToSend; + ::notifyHandshake_t _notifyHandshake; + bool _adapter_manages_sync_thread; + ::inject_sync_event_t _inject_action; + + private: + void _init( + ::messageToSend_t messageToSend, + ::notifyHandshake_t notifyHandshake, + SyncModes sync_mode, + bool adapter_manages_sync_thread); + using SessionPtr = std::unique_ptr<_pEpSession, std::function>; SessionPtr _session = nullptr; - - public: - PEP_SESSION operator()(session_action action = init); }; extern thread_local Session session; diff --git a/src/Adapter.hxx b/src/Adapter.hxx index 3878b2b..f5732ad 100644 --- a/src/Adapter.hxx +++ b/src/Adapter.hxx @@ -14,8 +14,6 @@ namespace pEp { namespace Adapter { using std::function; - extern ::messageToSend_t _messageToSend; - extern ::notifyHandshake_t _notifyHandshake; extern std::thread _sync_thread; extern ::utility::locked_queue<::SYNC_EVENT, ::free_Sync_event> sync_evt_q; @@ -38,12 +36,10 @@ namespace pEp { */ // private template - void sync_thread(T *obj, function _startup, function _shutdown) + void sync_thread(Session *rhs, T *obj, function _startup, function _shutdown) { pEpLog("called"); _ex = nullptr; - assert(_messageToSend); - assert(_notifyHandshake); // 1. Execute registered startup function if (obj && _startup) { @@ -52,29 +48,18 @@ namespace pEp { pEpLog("creating session for the sync thread"); // 2. Create session for the sync thread - session(); - - // 3. register_sync_callbacks() - { - // TODO: Do we need to use a passphraseWrap here??? - pEpLog("register_sync_callbacks()"); - ::PEP_STATUS status = ::register_sync_callbacks( - session(), - nullptr, - _notifyHandshake, - _retrieve_next_sync_event); - - pEpLog("register_sync_callbacks() return:" << status); - // Convert status into exception and store it - // set register_done AFTER that - try { - throw_status(status); - register_done.store(true); - } catch (...) { - _ex = std::current_exception(); - register_done.store(true); - return; - } + // 3. register_sync_callbacks() (in session.initialize()) + try { + session.initialize( + rhs->_sync_mode, + rhs->_adapter_manages_sync_thread, + rhs->_messageToSend, + rhs->_notifyHandshake); + register_done.store(true); + } catch (...) { + _ex = std::current_exception(); + register_done.store(true); + return; } pEpLog("sync protocol loop started"); @@ -86,8 +71,7 @@ namespace pEp { unregister_sync_callbacks(session()); // 6. Release the session - // TODO: Maybe do that AFTER shutdown? - session(release); + session.release(); // 7. Execute registered shutdown function if (obj && _shutdown) { @@ -97,38 +81,25 @@ namespace pEp { /* * Sync Thread Startup - * 1. ensure session for the main thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase) + * 1. throw if main thread session is not initialized * 2. Start the sync thread * 3. Defer execution until sync thread register_sync_callbacks() has returned * 4. Throw pending exception from the sync thread */ // private template - void startup( - ::messageToSend_t messageToSend, - ::notifyHandshake_t notifyHandshake, - T *obj, - function _startup, - function _shutdown) + void startup(T *obj, std::function _startup, std::function _shutdown) { pEpLog("called"); - if (messageToSend) { - _messageToSend = messageToSend; - } - - if (notifyHandshake) { - _notifyHandshake = notifyHandshake; - } - pEpLog("ensure session for the main thread"); - // 1. re-initialize session for the main thread (registers: messageToSend, _inject_sync_event, _ensure_passphrase) - session(release); - session(init); + // refresh the session + // due to partially initialized session, see session.initialize() + session.refresh(); if (!_sync_thread.joinable()) { register_done.store(false); pEpLog("creating sync-thread"); // 2. Start the sync thread - _sync_thread = std::thread(sync_thread, obj, _startup, _shutdown); + _sync_thread = std::thread(sync_thread, &session, obj, _startup, _shutdown); // 3. Defer execution until sync thread register_sync_callbacks() has returned while (register_done.load() == false) { pEpLog("waiting for sync-thread to init...");