Browse Source

merge imap + avoid crashes when imap settings are not created but no used

PYADPT-55
David 6 years ago
parent
commit
0e26772090
  1. 6
      .hgignore
  2. 9
      test/README.md
  3. 100
      test/miniimap.py
  4. 56
      test/sync_handshake.py
  5. 98
      test/sync_test.py

6
.hgignore

@ -17,5 +17,9 @@ test/Laptop
test/Library
test/Phone
test/TestInbox
test/Backup
test/lib
test/imap_settings.py
venv
launch.json
settings.json

9
test/README.md

@ -21,6 +21,15 @@ $ HOME=$PWD lldb python3 -- ../sync_handshake.py -e Phone
Then this side is doing a replay in the debugger. Using touch to set a
different timestamp on the marker will only partly replay.
In order to work with IMAP you need to create a imap_settings.py file with the
following variables:
IMAP_HOST = 'domain.ch'
IMAP_PORT = '993'
IMAP_USER = 'your_username'
IMAP_PWD = 'password'
IMAP_EMAIL = 'user@domain.ch'
= Hint =
installing termcolor and lxml will beautify the output

100
test/miniimap.py

@ -0,0 +1,100 @@
import imaplib
import pathlib
import time
import os
from secrets import token_urlsafe
try:
import imap_settings as settings
except:
raise ValueError("Imap settings file not found, please check the readme - miniimap")
def connect():
"connect to the IMAP server Inbox"
server = imaplib.IMAP4_SSL(settings.IMAP_HOST)
server.login(settings.IMAP_USER, settings.IMAP_PWD)
tmp, data = server.select('Inbox')
#When you connect to the inbox one of the parameters returned is the current
#number of messages in it
if os.environ.get('NUMMESSAGES') is None:
os.environ["NUMMESSAGES"] = data[0].decode("UTF-8")
return server
def bytesmessage_to_string(msg):
"converts bytes-like message to string"
if type(msg) is bytes:
msg = msg.decode("UTF-8").rstrip()
return msg
else:
return str(msg)
def send(inbox, msg):
"send msg to inbox in MIME format"
server = connect()
tmp, data = server.append(inbox, flags='', date_time=time.time(), message=str(msg).encode("UTF-8"))
server.close()
def recv_all():
"""receive a list of all MIME messages from inbox newer than the last message when first connected"""
server = connect()
r = []
tmp, data = server.search(None, 'ALL')
oldermsgid = os.environ.get('NUMMESSAGES')
for num in data[0].split():
if int(num) >= int(oldermsgid):
tmp, data = server.fetch(num, '(RFC822)')
msg = bytesmessage_to_string(data[0][1])
r.append((num, msg))
os.environ["NUMMESSAGES"] = num.decode("UTF-8")
server.close()
return r
def clean_inbox():
"""clean all messsages from IMAP inbox"""
print('cleaning IMAP...')
server = connect()
tmp, data = server.search(None, 'ALL')
for num in data[0].split():
server.store(num, '+FLAGS', '\\Deleted')
server.expunge()
server.close()
print('IMAP inbox empty.')
def backup_inbox():
"""copy all messsages from IMAP to local backup folder"""
server = connect()
tmp, data = server.search(None, 'ALL')
for num in data[0].split():
tmp, data = server.fetch(num, '(RFC822 BODY[HEADER])')
device = str(data[0][1]).split('From: "')[1].split(' of')[0]
name = device + "_" + token_urlsafe(16) + ".eml"
msg = bytesmessage_to_string(data[0][1])
with open(os.path.join('Backup/TestInbox',name), "wb") as f:
f.write(str(msg).encode())
server.close()
def restore_inbox():
"""copy all the messages from the Backup folder to the IMAP inbox"""
server = connect()
backups = pathlib.Path("./Backup/TestInbox")
emails = backups.glob("*.eml")
l = [ path for path in emails ]
for p in l:
with open(p, "rb") as f:
tmp, data = server.append("Inbox", flags='', date_time=p.stat().st_ctime, message=f.read(-1))
server.close()

56
test/sync_handshake.py

@ -73,7 +73,7 @@ def print_msg(p):
else:
raise TypeError("print_msg(): pathlib.Path and pEp.Message supported, but "
+ str(type(p)) + " delivered")
m = re.search("<keysync>(.*)</keysync>", msg.opt_fields["pEp.sync"].replace("\n", " "))
if m:
if etree:
@ -87,6 +87,15 @@ def print_msg(p):
def messageToSend(msg):
msg = add_debug_info(msg)
minimail.send(inbox, msg, device_name)
def messageImapToSend(msg):
import miniimap
msg = add_debug_info(msg)
miniimap.send('Inbox', msg)
def add_debug_info(msg):
if msg.enc_format:
m, keys, rating, flags = msg.decrypt(DONT_TRIGGER_SYNC)
else:
@ -94,7 +103,7 @@ def messageToSend(msg):
text = "<!-- sending from " + device_name + " -->\n" + m.attachments[0].decode()
output(text)
msg.opt_fields = { "pEp.sync": text }
minimail.send(inbox, msg, device_name)
return msg
class UserInterface(pEp.UserInterface):
@ -132,7 +141,11 @@ def shutdown_sync():
pEp.shutdown_sync()
def run(name, color=None, own_ident=1):
def run(name, color=None, imap=False, own_ident=1):
if imap:
import miniimap
import imap_settings
global device_name
device_name = name
@ -146,18 +159,23 @@ def run(name, color=None, own_ident=1):
elif color == "cyan":
pEp.debug_color(36)
me = pEp.Identity("alice@peptest.ch", name + " of Alice Neuman", name)
pEp.myself(me)
if imap:
me = pEp.Identity(imap_settings.IMAP_EMAIL, name + " of " + imap_settings.IMAP_USER, name)
pEp.myself(me)
pEp.messageToSend = messageImapToSend
else:
me = pEp.Identity("alice@peptest.ch", name + " of Alice Neuman", name)
pEp.myself(me)
if own_ident >= 2:
me2 = pEp.Identity("alice@pep.security", name + " of Alice Neuman", name)
pEp.myself(me2)
if own_ident >= 2:
me2 = pEp.Identity("alice@pep.security", name + " of Alice Neuman", name)
pEp.myself(me2)
if own_ident == 3:
me3 = pEp.Identity("alice@pep.foundation", name + " of Alice Neuman", name)
pEp.myself(me3)
if own_ident == 3:
me3 = pEp.Identity("alice@pep.foundation", name + " of Alice Neuman", name)
pEp.myself(me3)
pEp.messageToSend = messageToSend
pEp.messageToSend = messageToSend
if multithreaded:
from threading import Thread
@ -175,7 +193,10 @@ def run(name, color=None, own_ident=1):
try:
while not the_end:
l = minimail.recv_all(inbox, name)
if imap:
l = miniimap.recv_all()
else:
l = minimail.recv_all(inbox, name)
for n, m in l:
msg = pEp.Message(m)
output("*** Reading")
@ -209,8 +230,12 @@ if __name__=="__main__":
help="use multithreaded instead of single threaded implementation")
optParser.add_option("-n", "--noend", action="store_true",
dest="noend", help="do not end")
optParser.add_option("-i", "--imap", action="store_true",
dest="imap",
help="use imap instead of minimail")
optParser.add_option("-o", "--own-identities", type="int", dest="own_ident",
help="simulate having OWN_IDENT own identities (1 to 3)", default=1)
options, args = optParser.parse_args()
if not options.exec_for:
@ -228,6 +253,9 @@ if __name__=="__main__":
if options.noend:
end_on = (None,)
if options.imap and options.own_ident >1:
raise ValueError("Multiple own identities not supported for imap mode")
multithreaded = options.multithreaded
run(options.exec_for, options.color, options.own_ident)
run(options.exec_for, options.color, options.imap, options.own_ident)

98
test/sync_test.py

@ -20,7 +20,10 @@ import shutil
import pathlib
def test_for(path, color=None, end_on=None, mt=False):
def test_for(path, color=None, end_on=None, mt=False, imap=False):
if imap:
import miniimap
cwd = os.getcwd();
os.chdir(path)
os.environ["HOME"] = os.getcwd()
@ -31,7 +34,7 @@ def test_for(path, color=None, end_on=None, mt=False):
sync_handshake.end_on = end_on
sync_handshake.multithreaded = mt
sync_handshake.run(path, color)
sync_handshake.run(path, color, imap)
os.chdir(cwd)
@ -98,26 +101,37 @@ if __name__ == "__main__":
optParser.add_option("-j", "--multi-threaded", action="store_true",
dest="multithreaded",
help="use multithreaded instead of single threaded implementation")
optParser.add_option("-i", "--imap", action="store_true",
dest="imap",
help="use imap instead of minimail")
options, args = optParser.parse_args()
if options.cleanall:
options.clean = True
if options.clean:
rmrf("TestInbox")
rmrf("Phone")
rmrf("Laptop")
rmrf("Pad")
if options.cleanall:
rmrf("Backup")
if options.setup_only:
os.makedirs("TestInbox", exist_ok=True)
setup("Phone")
setup("Laptop")
if options.third:
setup("Pad")
if options.imap:
miniimap.clean_inbox()
if options.cleanall:
rmrf("Backup")
else:
rmrf("TestInbox")
rmrf("Phone")
rmrf("Laptop")
rmrf("Pad")
if options.cleanall:
rmrf("Backup")
if options.setup_only:
os.makedirs("TestInbox", exist_ok=True)
setup("Phone")
setup("Laptop")
if options.third:
setup("Pad")
elif options.backup:
@ -128,21 +142,39 @@ if __name__ == "__main__":
except FileExistsError:
pass
shutil.copytree("Phone", "Backup/Phone", symlinks=True, copy_function=shutil.copy2)
shutil.copytree("Laptop", "Backup/Laptop", symlinks=True, copy_function=shutil.copy2)
shutil.copytree("Pad", "Backup/Pad", symlinks=True, copy_function=shutil.copy2)
shutil.copytree("TestInbox", "Backup/TestInbox", symlinks=True, copy_function=shutil.copy2)
if options.imap:
try:
os.mkdir("Backup/TestInbox")
except FileExistsError:
pass
miniimap.backup_inbox()
else:
shutil.copytree("Phone", "Backup/Phone", symlinks=True, copy_function=shutil.copy2)
shutil.copytree("Laptop", "Backup/Laptop", symlinks=True, copy_function=shutil.copy2)
shutil.copytree("TestInbox", "Backup/TestInbox", symlinks=True, copy_function=shutil.copy2)
try:
shutil.copytree("Pad", "Backup/Pad", symlinks=True, copy_function=shutil.copy2)
except FileNotFoundError:
pass
elif options.restore:
rmrf("TestInbox")
rmrf("Phone")
rmrf("Laptop")
rmrf("Pad")
shutil.copytree("Backup/Phone", "Phone", symlinks=True, copy_function=shutil.copy2)
shutil.copytree("Backup/Laptop", "Laptop", symlinks=True, copy_function=shutil.copy2)
shutil.copytree("Backup/Pad", "Pad", symlinks=True, copy_function=shutil.copy2)
shutil.copytree("Backup/TestInbox", "TestInbox", symlinks=True, copy_function=shutil.copy2)
elif options.restore:
if options.imap:
miniimap.clean_inbox()
miniimap.restore_inbox()
else:
rmrf("TestInbox")
rmrf("Phone")
rmrf("Laptop")
rmrf("Pad")
shutil.copytree("Backup/Phone", "Phone", symlinks=True, copy_function=shutil.copy2)
shutil.copytree("Backup/Laptop", "Laptop", symlinks=True, copy_function=shutil.copy2)
shutil.copytree("Backup/TestInbox", "TestInbox", symlinks=True, copy_function=shutil.copy2)
try:
shutil.copytree("Backup/Pad", "Pad", symlinks=True, copy_function=shutil.copy2)
except FileNotFoundError:
pass
elif options.print:
from sync_handshake import print_msg
@ -152,7 +184,7 @@ if __name__ == "__main__":
l.sort(key=(lambda p: p.stat().st_mtime))
for p in l:
print_msg(p)
else:
from multiprocessing import Process
@ -173,12 +205,12 @@ if __name__ == "__main__":
end_on = (None,)
Phone = Process(target=test_for, args=("Phone", "red", end_on,
options.multithreaded))
options.multithreaded, options.imap))
Laptop = Process(target=test_for, args=("Laptop", "green", end_on,
options.multithreaded))
options.multithreaded, options.imap))
if options.third:
Pad = Process(target=test_for, args=("Pad", "cyan", end_on,
options.multithreaded))
options.multithreaded, options.imap))
Phone.start()
Laptop.start()

Loading…
Cancel
Save