diff --git a/.hgignore b/.hgignore index b9c430c..67d8ab3 100644 --- a/.hgignore +++ b/.hgignore @@ -17,5 +17,9 @@ test/Laptop test/Library test/Phone test/TestInbox +test/Backup test/lib - +test/imap_settings.py +venv +launch.json +settings.json \ No newline at end of file diff --git a/test/README.md b/test/README.md index 33bd014..270c9bc 100644 --- a/test/README.md +++ b/test/README.md @@ -21,6 +21,15 @@ $ HOME=$PWD lldb python3 -- ../sync_handshake.py -e Phone Then this side is doing a replay in the debugger. Using touch to set a different timestamp on the marker will only partly replay. +In order to work with IMAP you need to create a imap_settings.py file with the +following variables: + +IMAP_HOST = 'domain.ch' +IMAP_PORT = '993' +IMAP_USER = 'your_username' +IMAP_PWD = 'password' +IMAP_EMAIL = 'user@domain.ch' + = Hint = installing termcolor and lxml will beautify the output diff --git a/test/miniimap.py b/test/miniimap.py new file mode 100644 index 0000000..a4b0151 --- /dev/null +++ b/test/miniimap.py @@ -0,0 +1,100 @@ +import imaplib +import pathlib +import time +import os +from secrets import token_urlsafe + +try: + import imap_settings as settings +except: + raise ValueError("Imap settings file not found, please check the readme - miniimap") + + +def connect(): + "connect to the IMAP server Inbox" + server = imaplib.IMAP4_SSL(settings.IMAP_HOST) + server.login(settings.IMAP_USER, settings.IMAP_PWD) + tmp, data = server.select('Inbox') + + #When you connect to the inbox one of the parameters returned is the current + #number of messages in it + if os.environ.get('NUMMESSAGES') is None: + os.environ["NUMMESSAGES"] = data[0].decode("UTF-8") + + return server + +def bytesmessage_to_string(msg): + "converts bytes-like message to string" + if type(msg) is bytes: + msg = msg.decode("UTF-8").rstrip() + return msg + else: + return str(msg) + +def send(inbox, msg): + "send msg to inbox in MIME format" + + server = connect() + tmp, data = server.append(inbox, flags='', date_time=time.time(), message=str(msg).encode("UTF-8")) + server.close() + + +def recv_all(): + """receive a list of all MIME messages from inbox newer than the last message when first connected""" + + server = connect() + r = [] + + tmp, data = server.search(None, 'ALL') + + oldermsgid = os.environ.get('NUMMESSAGES') + + for num in data[0].split(): + if int(num) >= int(oldermsgid): + tmp, data = server.fetch(num, '(RFC822)') + msg = bytesmessage_to_string(data[0][1]) + r.append((num, msg)) + os.environ["NUMMESSAGES"] = num.decode("UTF-8") + + server.close() + + return r + + +def clean_inbox(): + """clean all messsages from IMAP inbox""" + print('cleaning IMAP...') + server = connect() + tmp, data = server.search(None, 'ALL') + for num in data[0].split(): + server.store(num, '+FLAGS', '\\Deleted') + server.expunge() + server.close() + print('IMAP inbox empty.') + + +def backup_inbox(): + """copy all messsages from IMAP to local backup folder""" + server = connect() + tmp, data = server.search(None, 'ALL') + for num in data[0].split(): + tmp, data = server.fetch(num, '(RFC822 BODY[HEADER])') + device = str(data[0][1]).split('From: "')[1].split(' of')[0] + name = device + "_" + token_urlsafe(16) + ".eml" + msg = bytesmessage_to_string(data[0][1]) + with open(os.path.join('Backup/TestInbox',name), "wb") as f: + f.write(str(msg).encode()) + + server.close() + +def restore_inbox(): + """copy all the messages from the Backup folder to the IMAP inbox""" + server = connect() + backups = pathlib.Path("./Backup/TestInbox") + emails = backups.glob("*.eml") + l = [ path for path in emails ] + for p in l: + with open(p, "rb") as f: + tmp, data = server.append("Inbox", flags='', date_time=p.stat().st_ctime, message=f.read(-1)) + + server.close() diff --git a/test/sync_handshake.py b/test/sync_handshake.py index 6e83cb8..62253ed 100644 --- a/test/sync_handshake.py +++ b/test/sync_handshake.py @@ -73,7 +73,7 @@ def print_msg(p): else: raise TypeError("print_msg(): pathlib.Path and pEp.Message supported, but " + str(type(p)) + " delivered") - + m = re.search("(.*)", msg.opt_fields["pEp.sync"].replace("\n", " ")) if m: if etree: @@ -87,6 +87,15 @@ def print_msg(p): def messageToSend(msg): + msg = add_debug_info(msg) + minimail.send(inbox, msg, device_name) + +def messageImapToSend(msg): + import miniimap + msg = add_debug_info(msg) + miniimap.send('Inbox', msg) + +def add_debug_info(msg): if msg.enc_format: m, keys, rating, flags = msg.decrypt(DONT_TRIGGER_SYNC) else: @@ -94,7 +103,7 @@ def messageToSend(msg): text = "\n" + m.attachments[0].decode() output(text) msg.opt_fields = { "pEp.sync": text } - minimail.send(inbox, msg, device_name) + return msg class UserInterface(pEp.UserInterface): @@ -132,7 +141,11 @@ def shutdown_sync(): pEp.shutdown_sync() -def run(name, color=None, own_ident=1): +def run(name, color=None, imap=False, own_ident=1): + if imap: + import miniimap + import imap_settings + global device_name device_name = name @@ -146,18 +159,23 @@ def run(name, color=None, own_ident=1): elif color == "cyan": pEp.debug_color(36) - me = pEp.Identity("alice@peptest.ch", name + " of Alice Neuman", name) - pEp.myself(me) + if imap: + me = pEp.Identity(imap_settings.IMAP_EMAIL, name + " of " + imap_settings.IMAP_USER, name) + pEp.myself(me) + pEp.messageToSend = messageImapToSend + else: + me = pEp.Identity("alice@peptest.ch", name + " of Alice Neuman", name) + pEp.myself(me) - if own_ident >= 2: - me2 = pEp.Identity("alice@pep.security", name + " of Alice Neuman", name) - pEp.myself(me2) + if own_ident >= 2: + me2 = pEp.Identity("alice@pep.security", name + " of Alice Neuman", name) + pEp.myself(me2) - if own_ident == 3: - me3 = pEp.Identity("alice@pep.foundation", name + " of Alice Neuman", name) - pEp.myself(me3) + if own_ident == 3: + me3 = pEp.Identity("alice@pep.foundation", name + " of Alice Neuman", name) + pEp.myself(me3) - pEp.messageToSend = messageToSend + pEp.messageToSend = messageToSend if multithreaded: from threading import Thread @@ -175,7 +193,10 @@ def run(name, color=None, own_ident=1): try: while not the_end: - l = minimail.recv_all(inbox, name) + if imap: + l = miniimap.recv_all() + else: + l = minimail.recv_all(inbox, name) for n, m in l: msg = pEp.Message(m) output("*** Reading") @@ -209,8 +230,12 @@ if __name__=="__main__": help="use multithreaded instead of single threaded implementation") optParser.add_option("-n", "--noend", action="store_true", dest="noend", help="do not end") + optParser.add_option("-i", "--imap", action="store_true", + dest="imap", + help="use imap instead of minimail") optParser.add_option("-o", "--own-identities", type="int", dest="own_ident", help="simulate having OWN_IDENT own identities (1 to 3)", default=1) + options, args = optParser.parse_args() if not options.exec_for: @@ -228,6 +253,9 @@ if __name__=="__main__": if options.noend: end_on = (None,) + if options.imap and options.own_ident >1: + raise ValueError("Multiple own identities not supported for imap mode") + multithreaded = options.multithreaded - run(options.exec_for, options.color, options.own_ident) + run(options.exec_for, options.color, options.imap, options.own_ident) diff --git a/test/sync_test.py b/test/sync_test.py index 1083f09..6d0511e 100644 --- a/test/sync_test.py +++ b/test/sync_test.py @@ -20,7 +20,10 @@ import shutil import pathlib -def test_for(path, color=None, end_on=None, mt=False): +def test_for(path, color=None, end_on=None, mt=False, imap=False): + if imap: + import miniimap + cwd = os.getcwd(); os.chdir(path) os.environ["HOME"] = os.getcwd() @@ -31,7 +34,7 @@ def test_for(path, color=None, end_on=None, mt=False): sync_handshake.end_on = end_on sync_handshake.multithreaded = mt - sync_handshake.run(path, color) + sync_handshake.run(path, color, imap) os.chdir(cwd) @@ -98,26 +101,37 @@ if __name__ == "__main__": optParser.add_option("-j", "--multi-threaded", action="store_true", dest="multithreaded", help="use multithreaded instead of single threaded implementation") + optParser.add_option("-i", "--imap", action="store_true", + dest="imap", + help="use imap instead of minimail") options, args = optParser.parse_args() if options.cleanall: options.clean = True if options.clean: - rmrf("TestInbox") - rmrf("Phone") - rmrf("Laptop") - rmrf("Pad") - - if options.cleanall: - rmrf("Backup") - - if options.setup_only: - os.makedirs("TestInbox", exist_ok=True) - setup("Phone") - setup("Laptop") - if options.third: - setup("Pad") + + if options.imap: + miniimap.clean_inbox() + + if options.cleanall: + rmrf("Backup") + + else: + rmrf("TestInbox") + rmrf("Phone") + rmrf("Laptop") + rmrf("Pad") + + if options.cleanall: + rmrf("Backup") + + if options.setup_only: + os.makedirs("TestInbox", exist_ok=True) + setup("Phone") + setup("Laptop") + if options.third: + setup("Pad") elif options.backup: @@ -128,21 +142,39 @@ if __name__ == "__main__": except FileExistsError: pass - shutil.copytree("Phone", "Backup/Phone", symlinks=True, copy_function=shutil.copy2) - shutil.copytree("Laptop", "Backup/Laptop", symlinks=True, copy_function=shutil.copy2) - shutil.copytree("Pad", "Backup/Pad", symlinks=True, copy_function=shutil.copy2) - shutil.copytree("TestInbox", "Backup/TestInbox", symlinks=True, copy_function=shutil.copy2) + if options.imap: + try: + os.mkdir("Backup/TestInbox") + except FileExistsError: + pass + miniimap.backup_inbox() + else: + shutil.copytree("Phone", "Backup/Phone", symlinks=True, copy_function=shutil.copy2) + shutil.copytree("Laptop", "Backup/Laptop", symlinks=True, copy_function=shutil.copy2) + shutil.copytree("TestInbox", "Backup/TestInbox", symlinks=True, copy_function=shutil.copy2) + try: + shutil.copytree("Pad", "Backup/Pad", symlinks=True, copy_function=shutil.copy2) + except FileNotFoundError: + pass - elif options.restore: - rmrf("TestInbox") - rmrf("Phone") - rmrf("Laptop") - rmrf("Pad") - shutil.copytree("Backup/Phone", "Phone", symlinks=True, copy_function=shutil.copy2) - shutil.copytree("Backup/Laptop", "Laptop", symlinks=True, copy_function=shutil.copy2) - shutil.copytree("Backup/Pad", "Pad", symlinks=True, copy_function=shutil.copy2) - shutil.copytree("Backup/TestInbox", "TestInbox", symlinks=True, copy_function=shutil.copy2) + elif options.restore: + if options.imap: + miniimap.clean_inbox() + miniimap.restore_inbox() + else: + rmrf("TestInbox") + rmrf("Phone") + rmrf("Laptop") + rmrf("Pad") + + shutil.copytree("Backup/Phone", "Phone", symlinks=True, copy_function=shutil.copy2) + shutil.copytree("Backup/Laptop", "Laptop", symlinks=True, copy_function=shutil.copy2) + shutil.copytree("Backup/TestInbox", "TestInbox", symlinks=True, copy_function=shutil.copy2) + try: + shutil.copytree("Backup/Pad", "Pad", symlinks=True, copy_function=shutil.copy2) + except FileNotFoundError: + pass elif options.print: from sync_handshake import print_msg @@ -152,7 +184,7 @@ if __name__ == "__main__": l.sort(key=(lambda p: p.stat().st_mtime)) for p in l: print_msg(p) - + else: from multiprocessing import Process @@ -173,12 +205,12 @@ if __name__ == "__main__": end_on = (None,) Phone = Process(target=test_for, args=("Phone", "red", end_on, - options.multithreaded)) + options.multithreaded, options.imap)) Laptop = Process(target=test_for, args=("Laptop", "green", end_on, - options.multithreaded)) + options.multithreaded, options.imap)) if options.third: Pad = Process(target=test_for, args=("Pad", "cyan", end_on, - options.multithreaded)) + options.multithreaded, options.imap)) Phone.start() Laptop.start()