Browse Source

use .joinable() instead of pointer value to detect if sync is running

pull/1/head Release_2.1.0-RC24
Volker Birk 5 years ago
parent
commit
0260722d6c
  1. 32
      Adapter.cc
  2. 11
      Adapter.hxx
  3. 3
      callback_dispatcher.cc
  4. 50
      test/framework.cc
  5. 9
      test/test_leave_device_group.cc

32
Adapter.cc

@ -38,17 +38,14 @@ namespace pEp {
namespace Adapter { namespace Adapter {
messageToSend_t _messageToSend = nullptr; messageToSend_t _messageToSend = nullptr;
notifyHandshake_t _notifyHandshake = nullptr; notifyHandshake_t _notifyHandshake = nullptr;
std::thread *_sync_thread = nullptr; std::thread _sync_thread;
::utility::locked_queue< SYNC_EVENT, ::free_Sync_event > q; ::utility::locked_queue< SYNC_EVENT, ::free_Sync_event > q;
std::mutex m; std::mutex m;
std::thread::id sync_thread_id() std::thread::id sync_thread_id()
{ {
if (_sync_thread) return _sync_thread.get_id();
return _sync_thread->get_id();
else
return std::thread::id();
} }
int _inject_sync_event(SYNC_EVENT ev, void *management) int _inject_sync_event(SYNC_EVENT ev, void *management)
@ -65,21 +62,6 @@ namespace pEp {
catch (exception&) { catch (exception&) {
return 1; return 1;
} }
if (ev == nullptr) {
if (!on_sync_thread()) {
if(_sync_thread->joinable()) {
pEpLog("Waiting for Sync thread to join...");
_sync_thread->join();
delete _sync_thread;
_sync_thread = nullptr;
pEpLog("...thread joined");
q.clear();
} else {
//FATAL
pEpLog("FATAL: sync thread not joinable/detached");
}
}
}
return 0; return 0;
} }
@ -102,10 +84,7 @@ namespace pEp {
bool on_sync_thread() bool on_sync_thread()
{ {
if (_sync_thread && _sync_thread->get_id() == this_thread::get_id()) return _sync_thread.get_id() == this_thread::get_id();
return true;
else
return false;
} }
PEP_SESSION session(session_action action) PEP_SESSION session(session_action action)
@ -140,15 +119,16 @@ namespace pEp {
void shutdown() void shutdown()
{ {
pEpLog("called"); pEpLog("called");
if (_sync_thread) { if (_sync_thread.joinable()) {
pEpLog("sync_is_running - injecting null event"); pEpLog("sync_is_running - injecting null event");
_inject_sync_event(nullptr, nullptr); _inject_sync_event(nullptr, nullptr);
_sync_thread.join();
} }
} }
bool is_sync_running() bool is_sync_running()
{ {
return _sync_thread != nullptr; return _sync_thread.joinable();
} }
bool in_shutdown() bool in_shutdown()

11
Adapter.hxx

@ -14,7 +14,7 @@ namespace pEp {
extern messageToSend_t _messageToSend; extern messageToSend_t _messageToSend;
extern notifyHandshake_t _notifyHandshake; extern notifyHandshake_t _notifyHandshake;
extern std::thread *_sync_thread; extern std::thread _sync_thread;
extern ::utility::locked_queue< SYNC_EVENT, ::free_Sync_event > q; extern ::utility::locked_queue< SYNC_EVENT, ::free_Sync_event > q;
extern std::mutex m; extern std::mutex m;
@ -75,17 +75,14 @@ namespace pEp {
session(); session();
if (!_sync_thread) { if (!_sync_thread.joinable()) {
register_done = false; register_done = false;
_sync_thread = new std::thread(sync_thread<T>, obj, _startup, _shutdown); _sync_thread = std::thread(sync_thread<T>, obj, _startup, _shutdown);
while (!register_done) while (!register_done)
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (_ex) { if (_ex)
delete _sync_thread;
_sync_thread = nullptr;
std::rethrow_exception(_ex); std::rethrow_exception(_ex);
}
} }
} }
} }

3
callback_dispatcher.cc

@ -81,8 +81,7 @@ namespace pEp {
void CallbackDispatcher::stop_sync() void CallbackDispatcher::stop_sync()
{ {
callback_dispatcher.semaphore.stop(); callback_dispatcher.semaphore.stop();
Adapter::q.clear(); Adapter::shutdown();
Adapter::q.push_back(nullptr);
callback_dispatcher.semaphore.go(); callback_dispatcher.semaphore.go();
for (auto target : callback_dispatcher.targets) { for (auto target : callback_dispatcher.targets) {

50
test/framework.cc

@ -22,7 +22,7 @@
pEp::Test::Transport pEp::Test::transport; pEp::Test::Transport pEp::Test::transport;
std::string pEp::Test::path; std::string pEp::Test::path;
extern std::thread *pEp::Adapter::_sync_thread; extern std::thread pEp::Adapter::_sync_thread;
namespace pEp { namespace pEp {
namespace Test { namespace Test {
@ -127,33 +127,41 @@ namespace pEp {
PEP_decrypt_flags_t flags = 0; PEP_decrypt_flags_t flags = 0;
PEP_STATUS status = ::decrypt_message(session(), msg.get(), &_dst, &keylist, &rating, &flags); PEP_STATUS status = ::decrypt_message(session(), msg.get(), &_dst, &keylist, &rating, &flags);
throw_status(status); throw_status(status);
Message dst = make_message(_dst);
Message dst;
for (auto a = _dst->attachments; a && a->value; a = a->next) { if (_dst)
if (string("application/pEp.sync") == a->mime_type) { dst = make_message(_dst);
char *_text; else
status = PER_to_XER_Sync_msg(a->value, a->size, &_text); dst = msg;
throw_status(status);
text += _text; if (dst.get()->attachments) {
pEp_free(_text); for (auto a = dst.get()->attachments; a && a->value; a = a->next) {
return text; if (string("application/pEp.sync") == a->mime_type) {
} char *_text;
else if (string("application/pEp.distribution") == a->mime_type) { status = PER_to_XER_Sync_msg(a->value, a->size, &_text);
char *_text; throw_status(status);
status = PER_to_XER_Distribution_msg(a->value, a->size, &_text); text += _text;
throw_status(status); pEp_free(_text);
text += _text; return text;
pEp_free(_text); }
return text; else if (string("application/pEp.distribution") == a->mime_type) {
char *_text;
status = PER_to_XER_Distribution_msg(a->value, a->size, &_text);
throw_status(status);
text += _text;
pEp_free(_text);
return text;
}
} }
} }
return text; return text;
} }
void join_sync_thread() void join_sync_thread()
{ {
_sync_thread->join(); if (_sync_thread.joinable())
_sync_thread.join();
} }
Message Transport::recv() Message Transport::recv()

9
test/test_leave_device_group.cc

@ -1,4 +1,5 @@
#include <iostream> #include <iostream>
#include <unistd.h>
#include "framework.hh" #include "framework.hh"
#include "passphrase_cache.hh" #include "passphrase_cache.hh"
@ -76,6 +77,14 @@ int main(int argc, char **argv)
// wait for sync shutdown and release first session // wait for sync shutdown and release first session
Test::join_sync_thread(); Test::join_sync_thread();
// switch off and on again
CallbackDispatcher::start_sync();
sleep(2);
CallbackDispatcher::stop_sync();
Test::join_sync_thread();
session(Adapter::release); session(Adapter::release);
return 0; return 0;

Loading…
Cancel
Save