Browse Source

Merge branch 'PYADPT-110 - Add Mutlithreaded Sync Implementation' into Release_2.1

Complete Sync implementation in python using python threading.

from libpEpAdapter, use:
* Session
* CallbackDispatcher
* Sync Event Queue
    * retrieve_next_sync_event()
    * _inject_sync_event()

*Sync event processing loop*
Write single threaded implementation of the engine function "do_sync_protocol()" using:
* retrieve_next_sync_event() (libpEpAdapter)
* do_sync_protocol_step() (Engine)
The event loop has to be terminated on NULL event.

*Sync Thread*
Implement Sync thread in python. The Thread has to do:
* Init: register_sync_callbacks()
* Sync event processing loop ()
* Cleanup: unregister_sync_callbacks()

*Python API*
The python API stays backwards compatible.
* start_sync() is not allowed to return before "register_sync_callbacks()" in the sync thread has returned
* stop_sync() has to use _inject_sync_event from libpEpAdapter
* is_sync_active() reports if the python Sync thread is running

*Testing*
* Update [~fdik]s sync_test.py / sync_handshake.py to work with the new sync impl.
* Add a test for start_sync() / stop_sync() continuously
PYADPT-116
heck 4 years ago
parent
commit
aaaa54f6ad
  1. 1
      .gitignore
  2. 76
      src/pEp/__init__.py
  3. 121
      src/pEp/_pEp/pEpmodule.cc
  4. 4
      src/pEp/_pEp/pEpmodule.hh
  5. 35
      test/start_sync.py
  6. 73
      test/sync_handshake.py

1
.gitignore

@ -77,3 +77,4 @@ __pycache__/
# platform artifacts
.DS_store
/src/pEp/_pEp.cpython-38-darwin.so

76
src/pEp/__init__.py

@ -14,9 +14,11 @@ try:
from .__version__ import version as __version__
except ImportError:
import warnings
warnings.warn("Error loading build-time defined __version__.py, trying setuptools now...")
try:
import setuptools_scm
__version__ = setuptools_scm.get_version()
del setuptools_scm
except Exception:
@ -30,18 +32,36 @@ from ._pEp import *
# with an underscore (of _pEp), but we dont want to import them into this module
import pEp._pEp
# 3rd party imports
from threading import Thread, Barrier
from time import sleep
# Executed on module import
def init():
print(init, "called")
# print(init, "called")
_pEp._init_after_main_module()
def start_sync() -> None:
"""starts the sync thread"""
Sync.start_sync()
def shutdown_sync() -> None :
"""call this to shut down the sync thread"""
Sync.shutdown_sync()
def is_sync_active() -> bool:
"""True if sync is active, False otherwise"""
return Sync.getInstance().isAlive()
def message_to_send(msg):
"""
message_to_send(msg)
override pEp.message_to_send(msg) with your own implementation
this callback is being called when a pp management message needs to be sent
GIL CAVEAT
"""
print("message_to_send() - default callback\n")
print("overwrite this method")
@ -54,10 +74,60 @@ def notify_handshake(me, partner, signal):
partner identity of communication partner
signal the handshake signal
overwrite this method with an implementation of a handshake dialog
GIL CAVEAT
"""
print("notify_handshake() - default callback\n")
print("overwrite this method")
class Sync(Thread):
__instance:'Sync' = None
barr = Barrier(2)
def __init__(self):
if Sync.__instance != None:
raise Exception("singleton!")
else:
Sync.__instance = self
Thread.__init__(self)
@staticmethod
def getInstance() -> 'Sync':
if Sync.__instance == None:
Sync()
return Sync.__instance
def run(self):
pEp.register_sync_callbacks()
self.barr.wait()
while pEp.do_protocol_step():
sleep(1)
pEp.unregister_sync_callbacks()
def start(self):
"""
1. NEW ADLIB FUNC register_sync_callbacks() ONLY
2. create python thread that does do_sync_protocol()
3. NEWADLIB FUNC CallbackDispatcher.Signal_all_SYNC_NOTIFY_START()
"""
Thread.start(self)
self.barr.wait()
# pEp.notifyHandshake_sync_start()
# sleep(2)
@staticmethod
def start_sync():
Sync.getInstance().start()
@staticmethod
def shutdown_sync():
if Sync.__instance:
if Sync.__instance.isAlive():
pEp.inject_sync_shutdown()
Sync.__instance.join()
Sync.__instance = None
# pEp.notifyHandshake_sync_stop()
init()

121
src/pEp/_pEp/pEpmodule.cc

@ -37,11 +37,12 @@ namespace pEp {
pEpLog("called");
}
// hidden init function, wrapped by hello_world.init()
// hidden init function, wrapped by _pEp.init()
void _init_after_main_module() {
pEpLog("called");
callback_dispatcher.add(_messageToSend, notifyHandshake, nullptr, nullptr);
callback_dispatcher.add(_messageToSend, _notifyHandshake, nullptr, nullptr);
Adapter::_messageToSend = CallbackDispatcher::messageToSend;
Adapter::_notifyHandshake = CallbackDispatcher::notifyHandshake;
}
@ -96,6 +97,7 @@ namespace pEp {
}
}
// TODO: GIL handling isnt really required here, i think
PEP_STATUS _messageToSend(::message *msg) {
pEpLog("called");
try {
@ -103,7 +105,7 @@ namespace pEp {
pEpLog("GIL Aquired");
object modref = import("pEp");
object funcref = modref.attr("message_to_send");
call<void>(funcref.ptr(), Message());
call<void>(funcref.ptr(), Message(msg));
PyGILState_Release(gil);
pEpLog("GIL released");
} catch (exception &e) {}
@ -111,14 +113,15 @@ namespace pEp {
return PEP_STATUS_OK;
}
PEP_STATUS notifyHandshake(pEp_identity *me, pEp_identity *partner, sync_handshake_signal signal) {
// TODO: GIL handling isnt really required here, i think
PEP_STATUS _notifyHandshake(pEp_identity *me, pEp_identity *partner, sync_handshake_signal signal) {
pEpLog("called");
try {
PyGILState_STATE gil = PyGILState_Ensure();
pEpLog("GIL Aquired");
object modref = import("pEp");
object funcref = modref.attr("notify_handshake");
call<void>(funcref.ptr(), me, partner, signal);
call<void>(funcref.ptr(), Identity(me), Identity(partner), signal);
PyGILState_Release(gil);
pEpLog("GIL released");
} catch (exception &e) {}
@ -126,13 +129,43 @@ namespace pEp {
return PEP_STATUS_OK;
}
bool do_protocol_step() {
pEpLog("called");
SYNC_EVENT event = Adapter::_retrieve_next_sync_event(nullptr, 0);
if (event != NULL) {
::do_sync_protocol_step(Adapter::session(), (void *)&callback_dispatcher, event);
return true;
} else {
pEpLog("null event, signaling sync shutdown");
return false;
}
}
void register_sync_callbacks() {
pEpLog("called");
PEP_STATUS status = ::register_sync_callbacks(Adapter::session(), nullptr, Adapter::_notifyHandshake, Adapter::_retrieve_next_sync_event);
_throw_status(status);
}
void start_sync() {
CallbackDispatcher::start_sync();
void unregister_sync_callbacks() {
::unregister_sync_callbacks(Adapter::session());
}
void shutdown_sync() {
CallbackDispatcher::stop_sync();
void inject_sync_shutdown() {
pEpLog("injecting null event");
Adapter::_inject_sync_event(nullptr,nullptr);
}
// TODO: Integrate this (currently SEGFAULTING)
void notifyHandshake_sync_start() {
pEpLog("all targets signal: SYNC_NOTIFY_START");
CallbackDispatcher::notifyHandshake(nullptr, nullptr, SYNC_NOTIFY_START);
}
// TODO: Integrate this (currently SEGFAULTING)
void notifyHandshake_sync_stop() {
pEpLog("all targets signal: SYNC_NOTIFY_STOP");
CallbackDispatcher::notifyHandshake(nullptr, nullptr, SYNC_NOTIFY_STOP);
}
void debug_color(int ansi_color) {
@ -143,10 +176,6 @@ namespace pEp {
::leave_device_group(Adapter::session());
}
bool is_sync_active() {
return Adapter::is_sync_running();
}
void testfunc() {
_messageToSend(NULL);
}
@ -196,6 +225,27 @@ namespace pEp {
scope().attr("engine_version") = get_engine_version();
scope().attr("protocol_version") = get_protocol_version();
def("set_debug_log_enabled", &Adapter::pEpLog::set_enabled,
"Switch debug logging on/off");
def("register_sync_callbacks", register_sync_callbacks,
"");
def("unregister_sync_callbacks", unregister_sync_callbacks,
"");
def("do_protocol_step", do_protocol_step,
"");
def("inject_sync_shutdown", inject_sync_shutdown,
"");
def("notifyHandshake_sync_start", notifyHandshake_sync_start,
"");
def("notifyHandshake_sync_stop", notifyHandshake_sync_stop,
"");
def("passive_mode", config_passive_mode,
"do not attach pub keys to all messages");
@ -584,32 +634,6 @@ namespace pEp {
.value("SYNC_NOTIFY_SOLE", SYNC_NOTIFY_SOLE)
.value("SYNC_NOTIFY_IN_GROUP", SYNC_NOTIFY_IN_GROUP);
// auto user_interface_class = class_<UserInterface, UserInterface_callback, boost::noncopyable>(
// "UserInterface",
// "class MyUserInterface(UserInterface):\n"
// " def notifyHandshake(self, me, partner):\n"
// " ...\n"
// "\n"
// "p≡p User Interface class\n"
// "To be used as a mixin\n"
// )
// .def("notifyHandshake", &UserInterface::notifyHandshake,
// "notifyHandshake(self, me, partner)\n"
// "\n"
// " me own identity\n"
// " partner identity of communication partner\n"
// "\n"
// "overwrite this method with an implementation of a handshake dialog")
// .def("deliverHandshakeResult", &UserInterface::deliverHandshakeResult,
// boost::python::arg("identities")=object(),
// "deliverHandshakeResult(self, result, identities=None)\n"
// "\n"
// " result -1: cancel, 0: accepted, 1: rejected\n"
// " identities list of identities to share or None for all\n"
// "\n"
// "call to deliver the handshake result of the handshake dialog"
// );
def("deliver_handshake_result", &deliverHandshakeResult, boost::python::arg("identities")=object(),
"deliverHandshakeResult(self, result, identities=None)\n"
"\n"
@ -619,18 +643,6 @@ namespace pEp {
"call to deliver the handshake result of the handshake dialog"
);
def("start_sync", &start_sync,
"start_sync()\n"
"\n"
"starts the sync thread"
);
def("shutdown_sync", &shutdown_sync,
"shutdown_sync()\n"
"\n"
"call this from another thread to shut down the sync thread\n"
);
def("debug_color", &debug_color,
"for debug builds set ANSI color value");
@ -640,13 +652,6 @@ namespace pEp {
"call this for a grouped device, which should leave\n"
);
def("is_sync_active", &is_sync_active,
"is_sync_active()\n"
"\n"
"True if sync is active, False otherwise\n"
);
// codecs
call< object >(((object)(import("codecs").attr("register"))).ptr(), make_function(sync_search));
call< object >(((object)(import("codecs").attr("register"))).ptr(), make_function(distribution_search));

4
src/pEp/_pEp/pEpmodule.hh

@ -27,9 +27,9 @@ namespace pEp {
PEP_STATUS _messageToSend(::message *msg);
PEP_STATUS notifyHandshake(pEp_identity *me, pEp_identity *partner, sync_handshake_signal signal);
PEP_STATUS _notifyHandshake(pEp_identity *me, pEp_identity *partner, sync_handshake_signal signal);
} /* namespace PythonAdapter */
} /* namespace pEp */
#endif /* PEPMODULE_HH */
#endif /* PEPMODULE_HH */

35
test/start_sync.py

@ -0,0 +1,35 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pEp
import time
pEp.set_debug_log_enabled(True)
def msg2send(message):
print("MSG2SEND")
# print(message)
def handshake(me, partner, signal):
print("HANDSHAKE")
print(me.fpr)
print(partner.fpr)
print(signal)
pEp.message_to_send = msg2send
pEp.notify_handshake = handshake
alice = pEp.Identity("tedst@alice.com", "alice", "23")
pEp.myself(alice)
print(alice.fpr)
while True:
print("start_sync()")
pEp.start_sync()
print("Running...")
time.sleep(3)
print("shutdown_sync()")
pEp.shutdown_sync()
print("END")
time.sleep(3)

73
test/sync_handshake.py

@ -112,37 +112,36 @@ def add_debug_info(msg):
return msg
class UserInterface(pEp.UserInterface):
def notifyHandshake(self, me, partner, signal):
print(colored(str(signal), "yellow"), end=" ")
output("on " + device_name + "" if not me.fpr else
"for identities " + str(me.fpr) + " " + str(partner.fpr))
if me.fpr and partner.fpr:
assert me.fpr != partner.fpr
if signal in (
pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OTHER_DEVICE,
pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OUR_DEVICE,
pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_FORM_GROUP
):
if isinstance(end_on, list):
end_on.extend([
pEp.sync_handshake_signal.SYNC_NOTIFY_SOLE,
pEp.sync_handshake_signal.SYNC_NOTIFY_IN_GROUP,
])
sleep(.5) # user is reading message
try:
if not options.noanswer:
if options.reject:
self.deliverHandshakeResult(SYNC_HANDSHAKE_REJECTED)
else:
self.deliverHandshakeResult(SYNC_HANDSHAKE_ACCEPTED)
except NameError:
self.deliverHandshakeResult(SYNC_HANDSHAKE_ACCEPTED)
if signal in end_on:
global the_end
the_end = True
def this_notifyHandshake(me, partner, signal):
print(colored(str(signal), "yellow"), end=" ")
output("on " + device_name + "" if not me.fpr else
"for identities " + str(me.fpr) + " " + str(partner.fpr))
if me.fpr and partner.fpr:
assert me.fpr != partner.fpr
if signal in (
pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OTHER_DEVICE,
pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OUR_DEVICE,
pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_FORM_GROUP
):
if isinstance(end_on, list):
end_on.extend([
pEp.sync_handshake_signal.SYNC_NOTIFY_SOLE,
pEp.sync_handshake_signal.SYNC_NOTIFY_IN_GROUP,
])
sleep(.5) # user is reading message
try:
if not options.noanswer:
if options.reject:
pEp.deliver_handshake_result(SYNC_HANDSHAKE_REJECTED)
else:
pEp.deliver_handshake_result(SYNC_HANDSHAKE_ACCEPTED)
except NameError:
pEp.deliver_handshake_result(SYNC_HANDSHAKE_ACCEPTED)
if signal in end_on:
global the_end
the_end = True
def shutdown_sync():
@ -153,6 +152,7 @@ def run(name, color=None, imap=False, own_ident=1, leave=False):
global device_name
device_name = name
pEp.notify_handshake = this_notifyHandshake
if color:
global output
@ -170,7 +170,7 @@ def run(name, color=None, imap=False, own_ident=1, leave=False):
me = pEp.Identity(imap_settings.IMAP_EMAIL, name + " of " + imap_settings.IMAP_USER, name)
pEp.myself(me)
pEp.messageToSend = messageImapToSend
pEp.message_to_send = messageImapToSend
else:
me = pEp.Identity("alice@peptest.ch", name + " of Alice Neuman", name)
pEp.myself(me)
@ -183,22 +183,19 @@ def run(name, color=None, imap=False, own_ident=1, leave=False):
me3 = pEp.Identity("alice@pep.foundation", name + " of Alice Neuman", name)
pEp.myself(me3)
pEp.messageToSend = messageToSend
pEp.message_to_send = messageToSend
if multithreaded:
from threading import Thread
def sync_thread():
print(colored("********* ", "yellow") + colored("sync_thread entered", color))
ui = UserInterface()
print(colored("********* ", "yellow") + colored("UserInterface object created", color))
pEp.do_sync_protocol()
pEp.Sync.run()
print(colored("********* ", "yellow") + colored("leaving sync_thread", color))
sync = Thread(target=sync_thread)
sync.start()
else:
pEp.script_is_implementing_sync()
sync = None
ui = UserInterface()
pEp.start_sync();
try:
if leave:

Loading…
Cancel
Save