""" multipEp.py : multiple process python testing framework for pEp = Command line switches = wait_for_debug Block and ask if debugger should be attached each time an instance is started debug_${instance_name} Launch lldb in another terminal, and attach it to given intsance immediately after instance startup. debug_${instance_name}_${execution_number} Launch lldb in another terminal, and attach it to given intsance when instance is at some particular step in the test. ${execution_number} is found by reading test output. only_${test_scenario_name} Execute only given test scenario. Scenario with different name are skipped. libs_${instance_name}=/path/to/libs Set LD_LIBRARY_PATH to given path before launching instance, meant to allow selection of per-instance pEpEngines flavors wait_for_cleanup Block at the end of each test scenario, before deleting temporary directory. It is meant to be able to examine keyring and DBs after test finished or crashed. """ import os import sys import multiprocessing import importlib import tempfile import time import types import itertools from copy import deepcopy from collections import OrderedDict # ---------------------------------------------------------------------------- # GLOBALS # ---------------------------------------------------------------------------- # per-instance globals sync_handler = None own_addresses = [] indent = 0 i_name = "" handshakes_pending = None pEp = None # manager globals instances = None if(multiprocessing.current_process().name == "MainProcess"): ctx = multiprocessing.get_context('spawn') # import pEp in main process to get enums pEp = importlib.import_module("pEp") # both side globals (managed by MP) handshakes_seen = None test_config = None msgs_folders = None # both side globals (not managed) disable_sync = False # ---------------------------------------------------------------------------- # INSTANCE ACTIONS # ---------------------------------------------------------------------------- def 
create_account(address, name, flags=None): global own_addresses i = pEp.Identity(address, name) if flags is not None: i.flags = flags pEp.myself(i) own_addresses.append(address) def _send_message(address, msg): global msgs_folders # list inside dict from MP manager are not proxified. msgs = msgs_folders.get(address,[]) msg.sent = int(time.time()) msgs.append(str(msg)) msgs_folders[address] = msgs def _encrypted_message(from_address, to_address, shortmsg, longmsg): m = pEp.outgoing_message(pEp.Identity(from_address, from_address)) if type(to_address) != list : to_address = [to_address] m.to = [ pEp.Identity(address, address) for address in to_address ] m.shortmsg = shortmsg m.longmsg = longmsg begin = time.time() ret = m.encrypt() end = time.time() printi("ENCRYPTION TIME:", end - begin) return ret def encrypted_message(from_address, to_address, shortmsg, longmsg): return str(_encrypted_message(from_address, to_address, shortmsg, longmsg)) def send_message(from_address, to_address, shortmsg, longmsg): msg = _encrypted_message(from_address, to_address, shortmsg, longmsg) if type(to_address) != list : to_address = [to_address] for address in to_address: _send_message(address, msg) def decrypt_message(msgstr): msg = pEp.incoming_message(msgstr) printi("--- decrypt()") msg.recv = int(time.time()) printmsg(msg) msg2, keys, rating, consumed, flags = msg.decrypt() printi("->-", rating, "->-") printmsg(msg2) printi("---") return rating def simulate_timeout(): global sync_handler sync_handler.onTimeout() no_inbox_decrypt = [simulate_timeout, create_account] # ---------------------------------------------------------------------------- # MANAGER ACTIONS # ---------------------------------------------------------------------------- def flush_all_mails(): global msgs_folders count = sum(map(len,msgs_folders.values())) msgs_folders.clear() return count def restart_instance(iname): tmpdir, instance_addresses = stop_instance(iname) instances[iname] = start_instance(iname, tmpdir, 
instance_addresses) def cycle_until_no_change(*instancelist, maxcycles=20): count = 0 while True: global msgs_folders tmp = deepcopy(dict(msgs_folders)) for iname in instancelist: action = (iname, []) run_instance_action(action) count += 1 if dict(msgs_folders) == tmp: return count if count >= maxcycles: raise Exception("Too many cycles waiting for stability") def disable_auto_handshake(): global test_config test_config.disable_handshake = True def enable_auto_handshake(): global test_config test_config.disable_handshake = False def expect(expectation): def _expect(res, action): if(expectation != res): raise Exception("Expected " + str(expectation) + ", got " + str(res)) return _expect # ---------------------------------------------------------------------------- # "PRETTY" PRINTING # ---------------------------------------------------------------------------- def printi(*args): global indent print(i_name + ">" * indent, *args) def printheader(blah=None): global indent if blah is None: printi("-" * 80) indent = indent - 1 else: indent = indent + 1 printi("-" * (39 - int(len(blah)/2)) + " " + blah + " " + "-" * (39 - len(blah) + int(len(blah)/2))) def printmsg(msg): printi("from :", msg.from_) printi("to :", msg.to) printi("recv :", msg.recv) printi("sent :", msg.sent) printi("short :", msg.shortmsg) printi("opt_fields :", msg.opt_fields) lng = msg.longmsg.splitlines() lngcut = lng[:40]+["[...]"] if len(lng)>40 else lng pfx = "long : " for l in lngcut : printi(pfx + l) pfx = " " printi("attachments : ", msg.attachments) # ---------------------------------------------------------------------------- # INSTANCE TEST EXECUTION # ---------------------------------------------------------------------------- def execute_order(order): global handshakes_pending, hanshakes_seen global test_config, msgs_folders, own_addresses, sync_handler func, args, kwargs, timeoff = order[0:] + [None, [], {}, 0][len(order):] printheader("DECRYPT messages") # decrypt every non-consumed 
message for all instance accounts if func not in no_inbox_decrypt : for own_address in own_addresses : msgs_for_me = msgs_folders.get(own_address, []) for msgstr in msgs_for_me: msg = pEp.incoming_message(msgstr) printi("--- decrypt()") msg.recv = int(time.time() + timeoff) printmsg(msg) msg2, keys, rating, consumed, flags = msg.decrypt() if consumed == "MESSAGE_CONSUME": printi("--- PEP_MESSAGE_CONSUMED") # folder may have changed in the meantime, # remove item directly from latest version of it. folder = msgs_folders[own_address] folder.remove(msgstr) msgs_folders[own_address] = folder elif consumed == "MESSAGE_IGNORE": printi("--- PEP_MESSAGE_DISCARDED") else : printi("->-", rating, "->-") printmsg(msg2) printi("---") printheader() res = None if func is not None: printheader("Executing instance function " + func.__name__) printi("args :", args) printi("kwargs :", kwargs) res = func(*args,**kwargs) printi("function " + func.__name__ + " returned :", res) printheader() if handshakes_pending and not test_config.disable_handshake : printheader("check pending handshakes accepted on other device") tw, partner, nth_seen = handshakes_pending if handshakes_seen[tw] >= test_config.handshake_count_to_accept : if nth_seen in [1, test_config.handshake_count_to_accept]: # equiv to close dialog handshakes_pending = None printi("ACCEPT pending handshake : "+ tw) sync_handler.deliverHandshakeResult(partner, 0) # else dialog closed later by OVERTAKEN notification printheader() return res def pEp_instance_run(iname, _own_addresses, conn, _msgs_folders, _handshakes_seen, _test_config): global pEp, sync_handler, own_addresses, i_name, msgs_folders global handshakes_pending global handshakes_seen, test_config # assign instance globals own_addresses = _own_addresses msgs_folders = _msgs_folders handshakes_seen = _handshakes_seen test_config = _test_config i_name = iname pEp = importlib.import_module("pEp") class Handler(pEp.SyncMixIn): def messageToSend(self, msg): printheader("SYNC 
MESSAGE to send") printmsg(msg) printheader() for rcpt in msg.to + msg.cc + msg.bcc: _send_message(rcpt.address, msg) def notifyHandshake(self, me, partner, signal): global handshakes_pending if test_config.disable_handshake : printheader("HANDSHAKE disabled. Notification ignored") printi(signal) printheader() return if signal in [ pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OUR_DEVICE, pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_ADD_OTHER_DEVICE, pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_FORM_GROUP, pEp.sync_handshake_signal.SYNC_NOTIFY_INIT_MOVE_OUR_DEVICE]: printheader("show HANDSHAKE dialog") printi(signal) printi("handshake needed between " + repr(me) + " and " + repr(partner)) tw = pEp.trustwords(me, partner, 'en') printi(tw) # This is an error from pEpEngine if asked to open handshake dialog twice if handshakes_pending: raise Exception("Asked to open a second Sync Handshake Dialog !") if tw in handshakes_seen : handshakes_seen[tw] += 1 else: handshakes_seen[tw] = 1 handshakes_pending = (tw,partner,handshakes_seen[tw]) printheader() elif signal == pEp.sync_handshake_signal.SYNC_NOTIFY_OVERTAKEN: if handshakes_pending: tw, partner, nth_seen = handshakes_pending printi("OVERTAKEN handshake : "+ tw) handshakes_pending = None else: raise Exception("Asked to close a non existing Sync Handshake Dialog !") else : printheader("other HANDSHAKE notification - ignored") printi(signal) printheader() def setTimeout(self, timeout): printi("SET TIMEOUT :", timeout) def cancelTimeout(self): printi("CANCEL TIMEOUT") return 42 if not disable_sync: sync_handler = Handler() while True: order = conn.recv() if order is None: break res = execute_order(order) conn.send(res) conn.send(own_addresses) msgs_folders = None def pEp_instance_main(iname, tmpdirname, *args): # run with a dispensable $HOME to get fresh DB and PGP keyrings print("Instance " + iname + " runs into " + tmpdirname) os.environ['HOME'] = tmpdirname pEp_instance_run(iname, *args) print(iname + " exiting") # 
# ----------------------------------------------------------------------------
#                          MANAGER TEST EXECUTION
# ----------------------------------------------------------------------------


def start_debug(iname, proc):
    """Open lldb in a new Terminal window, attached to process `proc`.

    Relies on `appscript` to drive the macOS Terminal application.
    """
    print("#"*80 + "\n" +
          "INSTANCE " + iname + "\n" +
          "launching debugger attaching to process " + str(proc.pid) + "\n" +
          "#"*80 + "\n")
    # TODO : linux terminal support
    #import subprocess
    #subprocess.call(['xterm', '-e', 'lldb', '-p', str(proc.pid)])
    import appscript
    appscript.app('Terminal').do_script('lldb -p ' + str(proc.pid))
    # give the debugger some time before going on
    time.sleep(2)


def start_instance(iname, tmpdir=None, instance_addresses=None):
    """Spawn the process of instance `iname`.

    iname              -- instance name, also used to find the command
                          line switches that apply to this instance
    tmpdir             -- tempfile.TemporaryDirectory to reuse as instance
                          $HOME; a new one is created when None
    instance_addresses -- own addresses the instance starts with
                          (none by default)

    Returns the tuple `(proc, conn, tmpdir, 0)`, the last item being the
    execution counter of the instance.
    """
    global handshakes_seen, test_config, msgs_folders

    if instance_addresses is None:
        instance_addresses = []

    given_libs = None
    for a in sys.argv:
        if a.startswith("libs_" + iname + "="):
            # split only once: the path itself may contain '='
            given_libs = a.split("=", 1)[1]
            break

    if tmpdir is None:
        tmpdir = tempfile.TemporaryDirectory()
        # link is only created in a fresh directory: creating it again in
        # a reused one (instance restart) would fail because it exists
        if given_libs is not None:
            os.symlink(given_libs, os.path.join(tmpdir.name, "libs"))

    if sys.platform.startswith('darwin'):
        ld_env_name = 'DYLD_LIBRARY_PATH'
    else:
        ld_env_name = 'LD_LIBRARY_PATH'

    conn, child_conn = ctx.Pipe()
    proc = ctx.Process(
        target=pEp_instance_main,
        args=(iname, tmpdir.name, instance_addresses,
              child_conn, msgs_folders,
              handshakes_seen, test_config))

    # library path is changed only while the child is being started
    orig_ld_env_val = None
    if given_libs is not None:
        orig_ld_env_val = os.environ.pop(ld_env_name, None)
        os.environ[ld_env_name] = os.path.join(tmpdir.name, "libs")

    try:
        proc.start()
    finally:
        # put the manager's environment back, even if start failed
        if orig_ld_env_val is not None:
            os.environ[ld_env_name] = orig_ld_env_val
        elif given_libs is not None:
            os.environ.pop(ld_env_name, None)

    debug = "debug_" + iname in sys.argv
    if not debug and "wait_for_debug" in sys.argv:
        yes = input("#"*80 + "\n" +
                    "INSTANCE " + iname + "\n" +
                    "Enter y/yes/Y/YES to attach debugger to process " +
                    str(proc.pid) + "\nor just press ENTER\n" +
                    "#"*80 + "\n")
        if yes in ["y", "Y", "yes", "YES"]:
            debug = True
    if debug:
        start_debug(iname, proc)

    return (proc, conn, tmpdir, 0)


def get_instance(iname):
    """Return the record of instance `iname`, starting it if needed."""
    global instances
    if iname not in instances:
        instances[iname] = start_instance(iname)
    return instances[iname]


def stop_instance(iname):
    """Terminate instance `iname` and forget it.

    Returns `(tmpdir, instance_addresses)`, the addresses being those the
    instance reported when asked to terminate.
    """
    proc, conn, tmpdir, execnt = instances.pop(iname)
    # tell process to terminate
    conn.send(None)
    instance_addresses = conn.recv()
    proc.join()
    return tmpdir, instance_addresses


def purge_instances():
    """Stop every running instance."""
    global instances
    # iterate on a copy of the keys: stop_instance() modifies `instances`
    for iname in list(instances.keys()):
        stop_instance(iname)


def run_instance_action(action):
    """Send an order to an instance and return the instance's answer.

    `action` is the pair `(iname, order)`. The instance is started if it
    is not running yet. Its execution counter is incremented, and the
    debugger is attached first if the matching
    debug_${instance_name}_${execution_number} switch was given.
    """
    iname, order = action
    proc, conn, tmpdir, execnt = get_instance(iname)
    execnt = execnt + 1
    instances[iname] = (proc, conn, tmpdir, execnt)
    debug_here_arg = "debug_" + iname + "_" + str(execnt)
    print(iname, ": execution number :", execnt,
          "(add", debug_here_arg, "to args to debug from here)")
    if debug_here_arg in sys.argv:
        start_debug(iname, proc)
    conn.send(order)
    return conn.recv()


def run_manager_action(action):
    """Call a function in the manager process and return its result.

    `action` is a sequence `(func, args, kwargs)` whose trailing items may
    be omitted; `args` defaults to `[]` and `kwargs` to `{}`.
    """
    # accept any sequence, and fill in missing trailing items
    action = tuple(action)
    func, args, kwargs = action + (None, [], {})[len(action):]
    print("------------------------- Executing manager function -----------------------------")
    print("function name :", func.__name__)
    print("args :", args)
    print("kwargs :", kwargs)
    res = func(*args, **kwargs)
    print("manager function " + func.__name__ + " returned :", res)
    print("-" * 80)
    return res


def run_scenario(scenario):
    """Run one test scenario.

    `scenario` is a generator function yielding actions. An action whose
    first item is a string goes to the instance of that name, any other
    action is run in the manager. When the last item of an action is a
    plain function, it is removed from the action and called afterwards
    with `(result, action)`. The result of each action is sent back into
    the generator.

    Nothing is run if an only_${test_scenario_name} switch naming another
    scenario was given. Instances are stopped when the scenario ends; an
    exception raised while it ran is printed, then raised again after
    that cleanup.
    """
    global handshakes_seen, test_config, msgs_folders, instances

    for a in sys.argv:
        if a.startswith("only_") and a != "only_" + scenario.__name__:
            print("IGNORING: " + scenario.__name__)
            return
    print("RUNNING: " + scenario.__name__)

    instances = OrderedDict()
    with ctx.Manager() as manager:
        msgs_folders = manager.dict()
        handshakes_seen = manager.dict()
        test_config = manager.Namespace(
            disable_handshake=False,
            handshake_count_to_accept=2)
        sc = scenario()
        failure = None
        try:
            action = next(sc)
            while True:
                res = None
                output = None
                if len(action) > 1 and isinstance(action[-1], types.FunctionType):
                    output = action[-1]
                    action = action[:-1]

                if isinstance(action[0], str):
                    res = run_instance_action(action)
                else:
                    res = run_manager_action(action)

                if output is not None:
                    output(res, action)

                action = sc.send(res)
        except StopIteration:
            pass
        except BaseException as exc:
            # whatever went wrong, cleanup must happen before failing.
            # keep the exception object itself: building a new one from
            # its type does not work for every exception class
            failure = exc
            import traceback
            print("EXCEPTION IN: " + scenario.__name__)
            traceback.print_exc()

        if "wait_for_cleanup" in sys.argv:
            for iname, (proc, conn, tmpdir, execnt) in instances.items():
                print("Instance " + iname + " waits into " + tmpdir.name)
            input("#"*80 + "\n" +
                  "Press ENTER to cleanup\n" +
                  "#"*80 + "\n")

        purge_instances()

        if failure is not None:
            raise failure