Browse Source

Sync thread handling could be simplified.

Remove the barrier waiting for FSM init. This COULD be a problem.
But currently its not possible anymore because of the design.
IF we need to this, one option is to have another callback from the engine. Or move FSM init out of do_sync_protocol().
pull/15/head
heck 2 years ago
parent
commit
04a73cce6f
  1. 55
      src/Adapter.hxx

55
src/Adapter.hxx

@ -16,12 +16,6 @@ namespace pEp {
extern std::thread _sync_thread;
extern ::utility::locked_queue<::SYNC_EVENT, ::free_Sync_event> sync_evt_q;
extern std::mutex mut;
::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold);
static std::exception_ptr _ex;
static std::atomic_bool register_done{false};
/*
* Sync Thread
@ -34,10 +28,9 @@ namespace pEp {
*/
// private
template<class T>
void sync_thread(Session *rhs, T *obj, function<void(T *)> _startup, function<void(T *)> _shutdown)
void sync_thread(T *obj, function<void(T *)> _startup, function<void(T *)> _shutdown)
{
pEpLog("called");
_ex = nullptr;
// 1. Execute registered startup function
if (obj && _startup) {
@ -46,28 +39,10 @@ namespace pEp {
// 2. Create session for the sync thread
pEpLog("creating session for the sync thread");
session.initialize(
rhs->_sync_mode,
rhs->_adapter_manages_sync_thread,
rhs->_messageToSend,
rhs->_notifyHandshake);
try {
// 3. register_sync_callbacks()
::PEP_STATUS status = ::register_sync_callbacks(
session(),
nullptr,
session._notifyHandshake,
_retrieve_next_sync_event);
throw_status(status);
register_done.store(true);
} catch (...) {
_ex = std::current_exception();
register_done.store(true);
return;
}
session();
// 3. Enter Sync Event Processing Loop (do_sync_protocol())
// this internally calls _retrieve_next_sync_event
pEpLog("sync protocol loop started");
::do_sync_protocol(session(), (void *)obj);
pEpLog("sync protocol loop ended");
@ -90,26 +65,10 @@ namespace pEp {
void startup(T *obj, std::function<void(T *)> _startup, std::function<void(T *)> _shutdown)
{
pEpLog("called");
// refresh the session
// due to partially initialized session, see session.initialize()
session.refresh();
if (!_sync_thread.joinable()) {
register_done.store(false);
pEpLog("creating sync-thread");
// 2. Start the sync thread
_sync_thread = std::thread(sync_thread<T>, &session, obj, _startup, _shutdown);
// 3. Defer execution until sync thread register_sync_callbacks() has returned
while (register_done.load() == false) {
pEpLog("waiting for sync-thread to init...");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 4. Throw pending exception from the sync thread
if (_ex) {
pEpLog("exception pending, rethrowing");
std::rethrow_exception(_ex);
}
_sync_thread = std::thread(sync_thread<T>, obj, _startup, _shutdown);
} else {
pEpLog("Sync thread already running");
}
}
} // namespace Adapter

Loading…
Cancel
Save