diff --git a/test/mp_sync_test.py b/test/mp_sync_test.py index 413d6b3..e526637 100644 --- a/test/mp_sync_test.py +++ b/test/mp_sync_test.py @@ -8,43 +8,24 @@ DYLD_LIBRARY_PATH=/Users/ed/lib/ PYTHONPATH=`pwd`/../build/lib.macosx-10.11-x86_ import multipEp as mp -# unused -def send_message(from_address, to_address): - m = mp.pEp.outgoing_message(Identity(from_address, from_address)) - m.to = [mp.pEp.Identity(to_address, to_address)] - m.shortmsg = "Hello" - m.longmsg = "Something\\n" - m.encrypt() - mp.sent_messages.append(str(m)) scenario0 = [ - #("instance name", ["func name", [args], {kwargs}]), + #("instance name", [func, [args], {kwargs}]), ("A", [mp.create_account, ["some.one@some.where", "Some One"]]), ("B", [mp.create_account, ["some.one@some.where", "Some One"]]), - ("A", []), - ("B", []), - ("A", []), - ("B", []), - ("A", []), - ("B", []), + (mp.cycle_until_no_change, ["A", "B"]), ("C", [mp.create_account, ["some.one@some.where", "Some One"]]), - ("A", []), - ("B", []), - ("C", []), - ("A", []), - ("B", []), - ("C", []), - ("A", []), - ("B", []), - ("C", []), - ("A", []), - ("B", []), - ("C", []), - ("A", []), - ("B", []), + (mp.cycle_until_no_change, ["A", "B", "C"]), + # force consume messages ("C", [None, None, None, -60*15]) ] +scenario1 = [ + #("instance name", [func, [args], {kwargs}]), + ("A", [mp.send_message, ["some.one@some.where", "some.other@else.where", "Hey Bro", "Heeeey Brooooo"]]), + ("B", [mp.send_message, ["some.other@else.where", "some.one@some.where", "Hey Bro", "Heeeey Brooooo"]]), +] + if __name__ == "__main__": mp.run_scenario(scenario0) diff --git a/test/multipEp.py b/test/multipEp.py index 4c0bf7f..0bc99c0 100644 --- a/test/multipEp.py +++ b/test/multipEp.py @@ -4,8 +4,13 @@ import multiprocessing import importlib import tempfile import time +import types +from copy import deepcopy from collections import OrderedDict +# manager globals +instances = None + # per-instance globals pEp = None handler = None @@ -15,12 +20,44 @@ i_name = "" handshakes_pending = [] handshakes_to_accept = [] +# both side globals (managed by MP) +handshakes_seen = [] +handshakes_validated = [] +msgs_folders = None + def create_account(address, name): global own_addresses i = pEp.Identity(address, name) pEp.myself(i) own_addresses.append(address) +def _send_message(address, msg): + global msgs_folders + # list inside dict from MP manager are not proxified. + msgs = msgs_folders.get(address,[]) + msg.sent = int(time.time()) + msgs.append(str(msg)) + msgs_folders[address] = msgs + +def _encrypted_message(from_address, to_address, shortmsg, longmsg): + m = pEp.outgoing_message(Identity(from_address, from_address)) + m.to = [pEp.Identity(to_address, to_address)] + m.shortmsg = shortmsg + m.longmsg = longmsg + m.encrypt() + return msg + +def encrypted_message(from_address, to_address, shortmsg, longmsg): + return str(_encrypted_message(from_address, to_address, shortmsg, longmsg)) + +def send_message(from_address, to_address, shortmsg, longmsg): + msg = _encrypted_message(from_address, to_address, shortmsg, longmsg) + _send_message(to_address, msg) + +def decrypt_message(msgstr): + msg = pEp.incoming_message(msgstr) + msg2, keys, rating, consumed, flags = msg.decrypt() + def printi(*args): global indent print(i_name + ">" * indent, *args) @@ -51,9 +88,78 @@ def printmsg(msg): pfx = " " printi("attachments : ", msg.attachments) -def pEp_instance_run(iname, conn, msgs_folders, handshakes_seen, handshakes_validated): - global pEp, handler, own_addresses, i_name +def execute_order(order, handler, conn): + global handshakes_pending, handshakes_to_accept, handshakes_seen + global handshakes_validated, msgs_folders + func, args, kwargs, timeoff = order[0:] + [None, [], {}, 0][len(order):] + + printheader("DECRYPT messages") + # decrypt every non-consumed message for all instance accounts + for own_address in own_addresses: + msgs_for_me = msgs_folders[own_address] + for msgstr in msgs_for_me: + msg = pEp.incoming_message(msgstr) + printi("--- decrypt()") + msg.recv = int(time.time() + timeoff) + printmsg(msg) + msg2, keys, rating, consumed, flags = msg.decrypt() + if consumed == "MESSAGE_CONSUMED": + printi("--- PEP_MESSAGE_CONSUMED") + # folder may have changed in the meantime, + # remove item directly from latest version of it. + folder = msgs_folders[own_address] + folder.remove(msgstr) + msgs_folders[own_address] = folder + elif consumed == "MESSAGE_DISCARDED": + printi("--- PEP_MESSAGE_DISCARDED") + else : + printi("->-", rating, "->-") + printmsg(msg2) + printi("---") + printheader() + + if handshakes_pending: + printheader("check pending handshakes accepted on other device") + for tple in handshakes_pending: + tw, partner = tple + if tw in handshakes_validated: + handshakes_validated.remove(tw) + handshakes_pending.remove(tple) + printi("ACCEPT pending handshake : "+ tw) + handler.deliverHandshakeResult(partner, 0) + printheader() + + res = None + if func is not None: + printheader("Executing function " + func.__name__) + printi("args :", args) + printi("kwargs :", kwargs) + res = func(*args,**kwargs) + printi("function " + func.__name__ + " returned :", res) + printheader() + + if handshakes_to_accept: + printheader("apply to-accept-because-already-seen handshakes") + for tple in handshakes_to_accept: + tw, partner = tple + printi("ACCEPT handshake : "+ tw) + handshakes_validated.append(tw) + handshakes_to_accept.remove(tple) + handler.deliverHandshakeResult(partner, 0) + printheader() + + conn.send(res) + +def pEp_instance_run(iname, conn, _msgs_folders, _handshakes_seen, _handshakes_validated): + global pEp, handler, own_addresses, i_name, msgs_folders + global handshakes_pending, handshakes_to_accept + global handshakes_seen, handshakes_validated + + # assign instance globals + msgs_folders = _msgs_folders + handshakes_seen = _handshakes_seen + handshakes_validated = _handshakes_validated i_name = iname pEp = importlib.import_module("pEp") @@ -64,11 +170,7 @@ def pEp_instance_run(iname, conn, msgs_folders, handshakes_seen, handshakes_vali printmsg(msg) printheader() for rcpt in msg.to + msg.cc + msg.bcc: - # list inside dict from MP manager are not proxified. - msgs = msgs_folders.get(rcpt.address,[]) - msg.sent = int(time.time()) - msgs.append(str(msg)) - msgs_folders[rcpt.address] = msgs + _send_message(rcpt.address, msg) def showHandshake(self, me, partner): printheader("show HANDSHAKE dialog") @@ -84,8 +186,6 @@ def pEp_instance_run(iname, conn, msgs_folders, handshakes_seen, handshakes_vali handshakes_seen.append(tw) printheader() - # TODO: reject scenario ? - handler = Handler() while True: @@ -93,65 +193,9 @@ def pEp_instance_run(iname, conn, msgs_folders, handshakes_seen, handshakes_vali if order is None: break - func, args, kwargs, timeoff = order[0:] + [None, [], {}, 0][len(order):] - - printheader("DECRYPT messages") - # decrypt every non-consumed message for all instance accounts - for own_address in own_addresses: - msgs_for_me = msgs_folders[own_address] - for msgstr in msgs_for_me: - msg = pEp.incoming_message(msgstr) - printi("--- decrypt()") - msg.recv = int(time.time() + timeoff) - printmsg(msg) - msg2, keys, rating, consumed, flags = msg.decrypt() - - if consumed == "MESSAGE_CONSUMED": - printi("--- PEP_MESSAGE_CONSUMED") - # folder may have changed in the meantime, - # remove item directly from latest version of it. - folder = msgs_folders[own_address] - folder.remove(msgstr) - msgs_folders[own_address] = folder - elif consumed == "MESSAGE_DISCARDED": - printi("--- PEP_MESSAGE_DISCARDED") - else : - printi("->-", rating, "->-") - printmsg(msg2) - printi("---") - printheader() - - if handshakes_pending: - printheader("check pending handshakes accepted on other device") - for tple in handshakes_pending: - tw, partner = tple - if tw in handshakes_validated: - handshakes_validated.remove(tw) - handshakes_pending.remove(tple) - printi("ACCEPT pending handshake : "+ tw) - handler.deliverHandshakeResult(partner, 0) - printheader() + execute_order(order, handler, conn) - res = None - if func is not None: - printheader("Executing function " + func.__name__) - printi("args :", args) - printi("kwargs :", kwargs) - res = func(*args,**kwargs) - printi("function " + func.__name__ + " returned :", res) - printheader() - - if handshakes_to_accept: - printheader("apply to-accept-because-already-seen handshakes") - for tple in handshakes_to_accept: - tw, partner = tple - printi("ACCEPT handshake : "+ tw) - handshakes_validated.append(tw) - handshakes_to_accept.remove(tple) - handler.deliverHandshakeResult(partner, 0) - printheader() - - conn.send(res) + msgs_folders = None def pEp_instance_main(iname, *args): # run with a dispensable $HOME to get fresh DB and PGP keyrings @@ -161,47 +205,92 @@ def pEp_instance_main(iname, *args): pEp_instance_run(iname, *args) print(iname + " exiting") +def run_instance_action(action): + global handshakes_seen, handshakes_validated, msgs_folders + global instances + iname, order = action + if iname not in instances: + conn, child_conn = multiprocessing.Pipe() + proc = multiprocessing.Process( + target=pEp_instance_main, + args=(iname, child_conn, msgs_folders, + handshakes_seen, handshakes_validated)) + proc.start() + instances[iname] = (proc, conn) + if "wait_for_debug" in sys.argv: + yes = input("#"*80 + "\n" + + "INSTANCE " + iname + "\n" + + "Enter y/yes/Y/YES to attach debugger to process " + + str(proc.pid) + "\nor just press ENTER\n" + + "#"*80 + "\n") + if yes in ["y", "Y", "yes" "YES"]: + # TODO : linux terminal support + #import subprocess + #subprocess.call(['xterm', '-e', 'lldb', '-p', str(proc.pid)]) + import appscript + appscript.app('Terminal').do_script('lldb -p ' + str(proc.pid)) + time.sleep(2) + else: + proc, conn = instances[iname] + + conn.send(order) + return conn.recv() + +def purge_instance(): + global instances + for iname, (proc, conn) in instances.items(): + # tell process to terminate + conn.send(None) + proc.join() + +def run_manager_action(action): + func, args, kwargs = action[0:] + (None, [], {})[len(action):] + return func(*args, **kwargs) + def run_scenario(scenario): + global handshakes_seen, handshakes_validated, msgs_folders, instances instances = OrderedDict() with multiprocessing.Manager() as manager: msgs_folders = manager.dict() handshakes_seen = manager.list() handshakes_validated = manager.list() - for iname, order in scenario: - if iname not in instances: - conn, child_conn = multiprocessing.Pipe() - proc = multiprocessing.Process( - target=pEp_instance_main, - args=(iname, child_conn, msgs_folders, - handshakes_seen, handshakes_validated)) - proc.start() - instances[iname] = (proc, conn) - if "wait_for_debug" in sys.argv: - yes = input("#"*80 + "\n" + - "INSTANCE " + iname + "\n" + - "Enter y/yes/Y/YES to attach debugger to process " + - str(proc.pid) + "\nor just press ENTER\n" + - "#"*80 + "\n") - if yes in ["y", "Y", "yes" "YES"]: - # TODO : linux terminal support - #import subprocess - #subprocess.call(['xterm', '-e', 'lldb', '-p', str(proc.pid)]) - import appscript - appscript.app('Terminal').do_script('lldb -p ' + str(proc.pid)) - time.sleep(2) + + res = None + for action in scenario: + output = None + if type(action[-1]) == types.FunctionType: + output = action[-1] + action = action[:-1] + + if type(action[0]) == str: + res = run_instance_action(action) else: - proc, conn = instances[iname] + res = run_manager_action(action) - conn.send(order) - res = conn.recv() + if output is not None: + output(res, action) if "wait_for_debug" in sys.argv: input("#"*80 + "\n" + "Press ENTER to cleanup\n" + "#"*80 + "\n") - for iname, (proc, conn) in instances.items(): - # tell process to terminate - conn.send(None) - proc.join() + purge_instance() + +def cycle_until_no_change(*instancelist, maxcycles=20): + count = 0 + while True: + global msgs_folders + tmp = deepcopy(dict(msgs_folders)) + for iname in instancelist: + action = (iname, []) + run_instance_action(action) + count += 1 + + if dict(msgs_folders) == tmp: + return + + if count >= maxcycles: + raise Exception("Too many cycles waiting for stability") +