import os import sys import multiprocessing import importlib import tempfile import time import types import itertools from copy import deepcopy from collections import OrderedDict # ---------------------------------------------------------------------------- # GLOBALS # ---------------------------------------------------------------------------- # per-instance globals sync_handler = None own_addresses = [] indent = 0 i_name = "" handshakes_pending = None pEp = None # manager globals instances = None if(multiprocessing.current_process().name == "MainProcess"): ctx = multiprocessing.get_context('spawn') # import pEp in main process to get enums pEp = importlib.import_module("pEp") # both side globals (managed by MP) handshakes_seen = None test_config = None msgs_folders = None # both side globals (not managed) disable_sync = False # ---------------------------------------------------------------------------- # INSTANCE ACTIONS # ---------------------------------------------------------------------------- def create_account(address, name, flags=None): global own_addresses i = pEp.Identity(address, name) if flags is not None: i.flags = flags pEp.myself(i) own_addresses.append(address) def _send_message(address, msg): global msgs_folders # list inside dict from MP manager are not proxified. msgs = msgs_folders.get(address,[]) msg.sent = int(time.time()) msgs.append(str(msg)) msgs_folders[address] = msgs def _encrypted_message(from_address, to_address, shortmsg, longmsg): m = pEp.outgoing_message(pEp.Identity(from_address, from_address)) if type(to_address) != list : to_address = [to_address] m.to = [ pEp.Identity(address, address) for address in to_address ] m.shortmsg = shortmsg m.longmsg = longmsg begin = time.time() ret = m.encrypt() end = time.time() printi("ENCRYPTION TIME:", end - begin) return ret def encrypted_message(from_address, to_address, shortmsg, longmsg): return str(_encrypted_message(from_address, to_address, shortmsg, longmsg)) def send_message(from_address, to_address, shortmsg, longmsg): msg = _encrypted_message(from_address, to_address, shortmsg, longmsg) if type(to_address) != list : to_address = [to_address] for address in to_address: _send_message(address, msg) def decrypt_message(msgstr): msg = pEp.incoming_message(msgstr) printi("--- decrypt()") msg.recv = int(time.time()) printmsg(msg) msg2, keys, rating, consumed, flags = msg.decrypt() printi("->-", rating, "->-") printmsg(msg2) printi("---") return rating def simulate_timeout(): global sync_handler sync_handler.onTimeout() no_inbox_decrypt = [simulate_timeout, create_account] # ---------------------------------------------------------------------------- # MANAGER ACTIONS # ---------------------------------------------------------------------------- def flush_all_mails(): global msgs_folders count = sum(map(len,msgs_folders.values())) msgs_folders.clear() return count def restart_instance(iname): tmpdir, instance_addresses = stop_instance(iname) instances[iname] = start_instance(iname, tmpdir, instance_addresses) def cycle_until_no_change(*instancelist, maxcycles=20): count = 0 while True: global msgs_folders tmp = deepcopy(dict(msgs_folders)) for iname in instancelist: action = (iname, []) run_instance_action(action) count += 1 if dict(msgs_folders) == tmp: return count if count >= maxcycles: raise Exception("Too many cycles waiting for stability") def disable_auto_handshake(): global test_config test_config.disable_handshake = True def enable_auto_handshake(): global test_config test_config.disable_handshake = False def expect(expectation): def _expect(res, action): if(expectation != res): raise Exception("Expected " + str(expectation) + ", got " + str(res)) return _expect # ---------------------------------------------------------------------------- # "PRETTY" PRINTING # ---------------------------------------------------------------------------- def printi(*args): global indent print(i_name + ">" * indent, *args) def printheader(blah=None): global indent if blah is None: printi("-" * 80) indent = indent - 1 else: indent = indent + 1 printi("-" * (39 - int(len(blah)/2)) + " " + blah + " " + "-" * (39 - len(blah) + int(len(blah)/2))) def printmsg(msg): printi("from :", msg.from_) printi("to :", msg.to) printi("recv :", msg.recv) printi("sent :", msg.sent) printi("short :", msg.shortmsg) printi("opt_fields :", msg.opt_fields) lng = msg.longmsg.splitlines() lngcut = lng[:40]+["[...]"] if len(lng)>40 else lng pfx = "long : " for l in lngcut : printi(pfx + l) pfx = " " printi("attachments : ", msg.attachments) # ---------------------------------------------------------------------------- # INSTANCE TEST EXECUTION # ---------------------------------------------------------------------------- def execute_order(order): global handshakes_pending, hanshakes_seen global test_config, msgs_folders, own_addresses, sync_handler func, args, kwargs, timeoff = order[0:] + [None, [], {}, 0][len(order):] printheader("DECRYPT messages") # decrypt every non-consumed message for all instance accounts if func not in no_inbox_decrypt : for own_address in own_addresses : msgs_for_me = msgs_folders.get(own_address, []) for msgstr in msgs_for_me: msg = pEp.incoming_message(msgstr) printi("--- decrypt()") msg.recv = int(time.time() + timeoff) printmsg(msg) msg2, keys, rating, consumed, flags = msg.decrypt() if consumed == "MESSAGE_CONSUME": printi("--- PEP_MESSAGE_CONSUMED") # folder may have changed in the meantime, # remove item directly from latest version of it. folder = msgs_folders[own_address] folder.remove(msgstr) msgs_folders[own_address] = folder elif consumed == "MESSAGE_IGNORE": printi("--- PEP_MESSAGE_DISCARDED") else : printi("->-", rating, "->-") printmsg(msg2) printi("---") printheader() res = None if func is not None: printheader("Executing instance function " + func.__name__) printi("args :", args) printi("kwargs :", kwargs) res = func(*args,**kwargs) printi("function " + func.__name__ + " returned :", res) printheader() if handshakes_pending and not test_config.disable_handshake : printheader("check pending handshakes accepted on other device") tw, partner, nth_seen = handshakes_pending if handshakes_seen[tw] >= test_config.handshake_count_to_accept : if nth_seen in [1, test_config.handshake_count_to_accept]: # equiv to close dialog handshakes_pending = None printi("ACCEPT pending handshake : "+ tw) sync_handler.deliverHandshakeResult(partner, 0) # else dialog closed later by OVERTAKEN notification printheader() return res def pEp_instance_run(iname, _own_addresses, conn, _msgs_folders, _handshakes_seen, _test_config): global pEp, sync_handler, own_addresses, i_name, msgs_folders global handshakes_pending global handshakes_seen, test_config # assign instance globals own_addresses = _own_addresses msgs_folders = _msgs_folders handshakes_seen = _handshakes_seen test_config = _test_config i_name = iname pEp = importlib.import_module("pEp") class Handler(pEp.SyncMixIn): def messageToSend(self, msg): printheader("SYNC MESSAGE to send") printmsg(msg) printheader() for rcpt in msg.to + msg.cc + msg.bcc: _send_message(rcpt.address, msg) def notifyHandshake(self, me, partner, signal): global handshakes_pending if test_config.disable_handshake : printheader("HANDSHAKE disabled. Notification ignored") printi(signal) printheader() return if signal in [ pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OUR_DEVICE, pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OTHER_DEVICE, pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_FORM_GROUP, pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_MOVE_OUR_DEVICE]: printheader("show HANDSHAKE dialog") printi(signal) printi("handshake needed between " + repr(me) + " and " + repr(partner)) tw = pEp.trustwords(me, partner, 'en') printi(tw) # This is an error from pEpEngine if asked to open handshake dialog twice if handshakes_pending: raise Exception("Asked to open a second Sync Handshake Dialog !") if tw in handshakes_seen : handshakes_seen[tw] += 1 else: handshakes_seen[tw] = 1 handshakes_pending = (tw,partner,handshakes_seen[tw]) printheader() elif signal == pEp.sync_handshake_signal.SYNC_NOTIFY_OVERTAKEN: if handshakes_pending: tw, partner, nth_seen = handshakes_pending printi("OVERTAKEN handshake : "+ tw) handshakes_pending = None else: raise Exception("Asked to close a non existing Sync Handshake Dialog !") else : printheader("other HANDSHAKE notification - ignored") printi(signal) printheader() def setTimeout(self, timeout): printi("SET TIMEOUT :", timeout) def cancelTimeout(self): printi("CANCEL TIMEOUT") return 42 if not disable_sync: sync_handler = Handler() while True: order = conn.recv() if order is None: break res = execute_order(order) conn.send(res) conn.send(own_addresses) msgs_folders = None def pEp_instance_main(iname, tmpdirname, *args): # run with a dispensable $HOME to get fresh DB and PGP keyrings print("Instance " + iname + " runs into " + tmpdirname) os.environ['HOME'] = tmpdirname pEp_instance_run(iname, *args) print(iname + " exiting") # ---------------------------------------------------------------------------- # MANAGER TEST EXECUTION # ---------------------------------------------------------------------------- def start_debug(iname, proc): print("#"*80 + "\n" + "INSTANCE " + iname + "\n" + "launching debugger attaching to process " + str(proc.pid) + "\n" + "#"*80 + "\n") # TODO : linux terminal support #import subprocess #subprocess.call(['xterm', '-e', 'lldb', '-p', str(proc.pid)]) import appscript appscript.app('Terminal').do_script('lldb -p ' + str(proc.pid)) time.sleep(2) def start_instance(iname, tmpdir=None, instance_addresses = []): global handshakes_seen, test_config, msgs_folders if tmpdir is None: tmpdir = tempfile.TemporaryDirectory() tmpdirname = tmpdir.name conn, child_conn = ctx.Pipe() proc = ctx.Process( target=pEp_instance_main, args=(iname, tmpdirname, instance_addresses, child_conn, msgs_folders, handshakes_seen, test_config)) proc.start() debug = False if "debug_"+iname in sys.argv : debug = True if not debug and "wait_for_debug" in sys.argv : yes = input("#"*80 + "\n" + "INSTANCE " + iname + "\n" + "Enter y/yes/Y/YES to attach debugger to process " + str(proc.pid) + "\nor just press ENTER\n" + "#"*80 + "\n") if yes in ["y", "Y", "yes" "YES"]: debug = True if debug : start_debug(iname, proc) return (proc, conn, tmpdir, 0) def get_instance(iname): global instances if iname not in instances: res = start_instance(iname) instances[iname] = res return res else: return instances[iname] def stop_instance(iname): proc, conn, tmpdir, execnt = instances.pop(iname) # tell process to terminate conn.send(None) instance_addresses = conn.recv() proc.join() return tmpdir, instance_addresses def purge_instances(): global instances for iname in list(instances.keys()): stop_instance(iname) def run_instance_action(action): iname, order = action proc, conn, tmpdir, execnt = get_instance(iname) execnt = execnt + 1 instances[iname] = (proc, conn, tmpdir, execnt) debug_here_arg = "debug_"+iname+"_"+str(execnt) print(iname, ": execution number :", execnt , "(add", debug_here_arg, "to args to debug from here)") if debug_here_arg in sys.argv : start_debug(iname, proc) conn.send(order) return conn.recv() def run_manager_action(action): func, args, kwargs = action[0:] + (None, [], {})[len(action):] print("------------------------- Executing manager function -----------------------------") print("function name :", func.__name__) print("args :", args) print("kwargs :", kwargs) res = func(*args, **kwargs) print("manager function " + func.__name__ + " returned :", res) print("-" * 80) return res def run_scenario(scenario): global pEp for a in sys.argv: if a.startswith("only_") and a != "only_" + scenario.__name__ : print("IGNORING: " + scenario.__name__) return print("RUNNING: " + scenario.__name__) global handshakes_seen, test_config, msgs_folders, instances instances = OrderedDict() with ctx.Manager() as manager: msgs_folders = manager.dict() handshakes_seen = manager.dict() test_config = manager.Namespace( disable_handshake=False, handshake_count_to_accept=2) sc = scenario() t = None try: action = next(sc) while True: res = None output = None if len(action) > 1 and type(action[-1]) == types.FunctionType: output = action[-1] action = action[:-1] if type(action[0]) == str: res = run_instance_action(action) else: res = run_manager_action(action) if output is not None: output(res, action) action = sc.send(res) except StopIteration: pass except : t,v,tv = sys.exc_info() import traceback print("EXCEPTION IN: " + scenario.__name__) traceback.print_exc() if "wait_for_cleanup" in sys.argv: for iname,(proc, conn, tmpdir, execnt) in instances.items(): print("Instance " + iname + " waits into " + tmpdir.name) input("#"*80 + "\n" + "Press ENTER to cleanup\n" + "#"*80 + "\n") purge_instances() if t: raise t(v).with_traceback(tv)