Browse Source

refactored multi process test framework, so that test can evaluate to failed/passed and not require someone to read and interpred if it did work

PYADPT-55
Edouard Tisserant 9 years ago
parent
commit
4930c6eb56
  1. 39
      test/mp_sync_test.py
  2. 281
      test/multipEp.py

39
test/mp_sync_test.py

@ -8,43 +8,24 @@ DYLD_LIBRARY_PATH=/Users/ed/lib/ PYTHONPATH=`pwd`/../build/lib.macosx-10.11-x86_
import multipEp as mp
# unused
def send_message(from_address, to_address):
m = mp.pEp.outgoing_message(Identity(from_address, from_address))
m.to = [mp.pEp.Identity(to_address, to_address)]
m.shortmsg = "Hello"
m.longmsg = "Something\\n"
m.encrypt()
mp.sent_messages.append(str(m))
scenario0 = [
#("instance name", ["func name", [args], {kwargs}]),
#("instance name", [func, [args], {kwargs}]),
("A", [mp.create_account, ["some.one@some.where", "Some One"]]),
("B", [mp.create_account, ["some.one@some.where", "Some One"]]),
("A", []),
("B", []),
("A", []),
("B", []),
("A", []),
("B", []),
(mp.cycle_until_no_change, ["A", "B"]),
("C", [mp.create_account, ["some.one@some.where", "Some One"]]),
("A", []),
("B", []),
("C", []),
("A", []),
("B", []),
("C", []),
("A", []),
("B", []),
("C", []),
("A", []),
("B", []),
("C", []),
("A", []),
("B", []),
(mp.cycle_until_no_change, ["A", "B", "C"]),
# force consume messages
("C", [None, None, None, -60*15])
]
scenario1 = [
#("instance name", [func, [args], {kwargs}]),
("A", [mp.send_message, ["some.one@some.where", "some.other@else.where", "Hey Bro", "Heeeey Brooooo"]]),
("B", [mp.send_message, ["some.other@else.where", "some.one@some.where", "Hey Bro", "Heeeey Brooooo"]]),
]
if __name__ == "__main__":
mp.run_scenario(scenario0)

281
test/multipEp.py

@ -4,8 +4,13 @@ import multiprocessing
import importlib
import tempfile
import time
import types
from copy import deepcopy
from collections import OrderedDict
# manager globals
instances = None
# per-instance globals
pEp = None
handler = None
@ -15,12 +20,44 @@ i_name = ""
handshakes_pending = []
handshakes_to_accept = []
# both side globals (managed by MP)
handshakes_seen = []
handshakes_validated = []
msgs_folders = None
def create_account(address, name):
global own_addresses
i = pEp.Identity(address, name)
pEp.myself(i)
own_addresses.append(address)
def _send_message(address, msg):
global msgs_folders
# list inside dict from MP manager are not proxified.
msgs = msgs_folders.get(address,[])
msg.sent = int(time.time())
msgs.append(str(msg))
msgs_folders[address] = msgs
def _encrypted_message(from_address, to_address, shortmsg, longmsg):
m = pEp.outgoing_message(Identity(from_address, from_address))
m.to = [pEp.Identity(to_address, to_address)]
m.shortmsg = shortmsg
m.longmsg = longmsg
m.encrypt()
return msg
def encrypted_message(from_address, to_address, shortmsg, longmsg):
return str(_encrypted_message(from_address, to_address, shortmsg, longmsg))
def send_message(from_address, to_address, shortmsg, longmsg):
msg = _encrypted_message(from_address, to_address, shortmsg, longmsg)
_send_message(to_address, msg)
def decrypt_message(msgstr):
msg = pEp.incoming_message(msgstr)
msg2, keys, rating, consumed, flags = msg.decrypt()
def printi(*args):
global indent
print(i_name + ">" * indent, *args)
@ -51,9 +88,78 @@ def printmsg(msg):
pfx = " "
printi("attachments : ", msg.attachments)
def pEp_instance_run(iname, conn, msgs_folders, handshakes_seen, handshakes_validated):
global pEp, handler, own_addresses, i_name
def execute_order(order, handler, conn):
global handshakes_pending, handshakes_to_accept, handshakes_seen
global handshakes_validated, msgs_folders
func, args, kwargs, timeoff = order[0:] + [None, [], {}, 0][len(order):]
printheader("DECRYPT messages")
# decrypt every non-consumed message for all instance accounts
for own_address in own_addresses:
msgs_for_me = msgs_folders[own_address]
for msgstr in msgs_for_me:
msg = pEp.incoming_message(msgstr)
printi("--- decrypt()")
msg.recv = int(time.time() + timeoff)
printmsg(msg)
msg2, keys, rating, consumed, flags = msg.decrypt()
if consumed == "MESSAGE_CONSUMED":
printi("--- PEP_MESSAGE_CONSUMED")
# folder may have changed in the meantime,
# remove item directly from latest version of it.
folder = msgs_folders[own_address]
folder.remove(msgstr)
msgs_folders[own_address] = folder
elif consumed == "MESSAGE_DISCARDED":
printi("--- PEP_MESSAGE_DISCARDED")
else :
printi("->-", rating, "->-")
printmsg(msg2)
printi("---")
printheader()
if handshakes_pending:
printheader("check pending handshakes accepted on other device")
for tple in handshakes_pending:
tw, partner = tple
if tw in handshakes_validated:
handshakes_validated.remove(tw)
handshakes_pending.remove(tple)
printi("ACCEPT pending handshake : "+ tw)
handler.deliverHandshakeResult(partner, 0)
printheader()
res = None
if func is not None:
printheader("Executing function " + func.__name__)
printi("args :", args)
printi("kwargs :", kwargs)
res = func(*args,**kwargs)
printi("function " + func.__name__ + " returned :", res)
printheader()
if handshakes_to_accept:
printheader("apply to-accept-because-already-seen handshakes")
for tple in handshakes_to_accept:
tw, partner = tple
printi("ACCEPT handshake : "+ tw)
handshakes_validated.append(tw)
handshakes_to_accept.remove(tple)
handler.deliverHandshakeResult(partner, 0)
printheader()
conn.send(res)
def pEp_instance_run(iname, conn, _msgs_folders, _handshakes_seen, _handshakes_validated):
global pEp, handler, own_addresses, i_name, msgs_folders
global handshakes_pending, handshakes_to_accept
global handshakes_seen, handshakes_validated
# assign instance globals
msgs_folders = _msgs_folders
handshakes_seen = _handshakes_seen
handshakes_validated = _handshakes_validated
i_name = iname
pEp = importlib.import_module("pEp")
@ -64,11 +170,7 @@ def pEp_instance_run(iname, conn, msgs_folders, handshakes_seen, handshakes_vali
printmsg(msg)
printheader()
for rcpt in msg.to + msg.cc + msg.bcc:
# list inside dict from MP manager are not proxified.
msgs = msgs_folders.get(rcpt.address,[])
msg.sent = int(time.time())
msgs.append(str(msg))
msgs_folders[rcpt.address] = msgs
_send_message(rcpt.address, msg)
def showHandshake(self, me, partner):
printheader("show HANDSHAKE dialog")
@ -84,8 +186,6 @@ def pEp_instance_run(iname, conn, msgs_folders, handshakes_seen, handshakes_vali
handshakes_seen.append(tw)
printheader()
# TODO: reject scenario ?
handler = Handler()
while True:
@ -93,65 +193,9 @@ def pEp_instance_run(iname, conn, msgs_folders, handshakes_seen, handshakes_vali
if order is None:
break
func, args, kwargs, timeoff = order[0:] + [None, [], {}, 0][len(order):]
printheader("DECRYPT messages")
# decrypt every non-consumed message for all instance accounts
for own_address in own_addresses:
msgs_for_me = msgs_folders[own_address]
for msgstr in msgs_for_me:
msg = pEp.incoming_message(msgstr)
printi("--- decrypt()")
msg.recv = int(time.time() + timeoff)
printmsg(msg)
msg2, keys, rating, consumed, flags = msg.decrypt()
if consumed == "MESSAGE_CONSUMED":
printi("--- PEP_MESSAGE_CONSUMED")
# folder may have changed in the meantime,
# remove item directly from latest version of it.
folder = msgs_folders[own_address]
folder.remove(msgstr)
msgs_folders[own_address] = folder
elif consumed == "MESSAGE_DISCARDED":
printi("--- PEP_MESSAGE_DISCARDED")
else :
printi("->-", rating, "->-")
printmsg(msg2)
printi("---")
printheader()
execute_order(order, handler, conn)
if handshakes_pending:
printheader("check pending handshakes accepted on other device")
for tple in handshakes_pending:
tw, partner = tple
if tw in handshakes_validated:
handshakes_validated.remove(tw)
handshakes_pending.remove(tple)
printi("ACCEPT pending handshake : "+ tw)
handler.deliverHandshakeResult(partner, 0)
printheader()
res = None
if func is not None:
printheader("Executing function " + func.__name__)
printi("args :", args)
printi("kwargs :", kwargs)
res = func(*args,**kwargs)
printi("function " + func.__name__ + " returned :", res)
printheader()
if handshakes_to_accept:
printheader("apply to-accept-because-already-seen handshakes")
for tple in handshakes_to_accept:
tw, partner = tple
printi("ACCEPT handshake : "+ tw)
handshakes_validated.append(tw)
handshakes_to_accept.remove(tple)
handler.deliverHandshakeResult(partner, 0)
printheader()
conn.send(res)
msgs_folders = None
def pEp_instance_main(iname, *args):
# run with a dispensable $HOME to get fresh DB and PGP keyrings
@ -161,47 +205,92 @@ def pEp_instance_main(iname, *args):
pEp_instance_run(iname, *args)
print(iname + " exiting")
def run_instance_action(action):
global handshakes_seen, handshakes_validated, msgs_folders
global instances
iname, order = action
if iname not in instances:
conn, child_conn = multiprocessing.Pipe()
proc = multiprocessing.Process(
target=pEp_instance_main,
args=(iname, child_conn, msgs_folders,
handshakes_seen, handshakes_validated))
proc.start()
instances[iname] = (proc, conn)
if "wait_for_debug" in sys.argv:
yes = input("#"*80 + "\n" +
"INSTANCE " + iname + "\n" +
"Enter y/yes/Y/YES to attach debugger to process " +
str(proc.pid) + "\nor just press ENTER\n" +
"#"*80 + "\n")
if yes in ["y", "Y", "yes" "YES"]:
# TODO : linux terminal support
#import subprocess
#subprocess.call(['xterm', '-e', 'lldb', '-p', str(proc.pid)])
import appscript
appscript.app('Terminal').do_script('lldb -p ' + str(proc.pid))
time.sleep(2)
else:
proc, conn = instances[iname]
conn.send(order)
return conn.recv()
def purge_instance():
global instances
for iname, (proc, conn) in instances.items():
# tell process to terminate
conn.send(None)
proc.join()
def run_manager_action(action):
func, args, kwargs = action[0:] + (None, [], {})[len(action):]
return func(*args, **kwargs)
def run_scenario(scenario):
global handshakes_seen, handshakes_validated, msgs_folders, instances
instances = OrderedDict()
with multiprocessing.Manager() as manager:
msgs_folders = manager.dict()
handshakes_seen = manager.list()
handshakes_validated = manager.list()
for iname, order in scenario:
if iname not in instances:
conn, child_conn = multiprocessing.Pipe()
proc = multiprocessing.Process(
target=pEp_instance_main,
args=(iname, child_conn, msgs_folders,
handshakes_seen, handshakes_validated))
proc.start()
instances[iname] = (proc, conn)
if "wait_for_debug" in sys.argv:
yes = input("#"*80 + "\n" +
"INSTANCE " + iname + "\n" +
"Enter y/yes/Y/YES to attach debugger to process " +
str(proc.pid) + "\nor just press ENTER\n" +
"#"*80 + "\n")
if yes in ["y", "Y", "yes" "YES"]:
# TODO : linux terminal support
#import subprocess
#subprocess.call(['xterm', '-e', 'lldb', '-p', str(proc.pid)])
import appscript
appscript.app('Terminal').do_script('lldb -p ' + str(proc.pid))
time.sleep(2)
res = None
for action in scenario:
output = None
if type(action[-1]) == types.FunctionType:
output = action[-1]
action = action[:-1]
if type(action[0]) == str:
res = run_instance_action(action)
else:
proc, conn = instances[iname]
res = run_manager_action(action)
conn.send(order)
res = conn.recv()
if output is not None:
output(res, action)
if "wait_for_debug" in sys.argv:
input("#"*80 + "\n" +
"Press ENTER to cleanup\n" +
"#"*80 + "\n")
for iname, (proc, conn) in instances.items():
# tell process to terminate
conn.send(None)
proc.join()
purge_instance()
def cycle_until_no_change(*instancelist, maxcycles=20):
count = 0
while True:
global msgs_folders
tmp = deepcopy(dict(msgs_folders))
for iname in instancelist:
action = (iname, [])
run_instance_action(action)
count += 1
if dict(msgs_folders) == tmp:
return
if count >= maxcycles:
raise Exception("Too many cycles waiting for stability")

Loading…
Cancel
Save