diff --git a/test/miniimap.py b/test/miniimap.py
new file mode 100644
index 0000000..3a1f955
--- /dev/null
+++ b/test/miniimap.py
@@ -0,0 +1,78 @@
+import imaplib
+import pprint
+import email.message
+import email.charset
+import time
+import sync_settings as settings
+
+
+def connect():
+ "connect to the IMAP server Inbox"
+ server = imaplib.IMAP4_SSL(settings.IMAP_HOST)
+ server.login(settings.IMAP_USER, settings.IMAP_PWD)
+ server.select('Inbox')
+
+ return server
+
+def pEpMessage_to_imap(msg):
+ "convert pEpMessage to python imap formatted string"
+ new_message = email.message.Message()
+ new_message["From"] = str(msg.from_).replace("\n", " ")
+ new_message["To"] = str(msg.to[0])
+ new_message["Subject"] = msg.shortmsg
+ if msg.opt_fields:
+ for field, value in msg.opt_fields.items():
+ new_message[field] = str(value).replace("\n", " ")
+ new_message.set_payload(msg.longmsg)
+
+ new_message.set_charset(email.charset.Charset("utf-8"))
+ encoded_message = str(new_message).encode("utf-8")
+
+ return encoded_message
+
+def bytesmessage_to_string(msg):
+ "converts bytes-like message to string"
+ msg = msg.decode("UTF-8").rstrip()
+ return msg
+
+def send(inbox, msg):
+ "send msg to inbox in MIME format"
+ server = connect()
+ msg = pEpMessage_to_imap(msg)
+ print('******** sent msg *******')
+ print(msg)
+ server.append('Inbox', '', imaplib.Time2Internaldate(time.time()), msg)
+ server.close()
+
+
+def recv_all(inbox, start_time):
+ """receive a list of new MIME messages from inbox, which are newer than the
+ start_time"""
+
+ server = connect()
+ r = []
+
+ tmp, data = server.search(None, 'ALL')
+ # tmp, data = server.search(None, 'SENTSINCE {0}'.format(start_time.strftime("%d-%b-%Y %H:%M%S")))
+
+ for num in data[0].split():
+ tmp, data = server.fetch(num, '(RFC822)')
+ msg = bytesmessage_to_string(data[0][1])
+ r.append((num, msg))
+ print('******** recieved msg *******')
+ print(msg)
+
+ server.close()
+
+ return r
+
+
+def clean_inbox():
+ print('clean IMAP')
+ server = connect()
+ typ, data = server.search(None, 'ALL')
+ for num in data[0].split():
+ server.store(num, '+FLAGS', '\\Deleted')
+ server.expunge()
+
+
diff --git a/test/sync_handshake.py b/test/sync_handshake.py
index a3a993a..fc1dddb 100644
--- a/test/sync_handshake.py
+++ b/test/sync_handshake.py
@@ -23,6 +23,9 @@ import sys
import re
import pEp
import minimail
+import miniimap
+
+import sync_settings as settings
from datetime import datetime
@@ -72,8 +75,12 @@ def print_msg(p):
else:
raise TypeError("print_msg(): pathlib.Path and pEp.Message supported, but "
+ str(type(p)) + " delivered")
+
+ m = False
- m = re.search("(.*)", msg.opt_fields["pEp.sync"].replace("\n", " "))
+ if msg.opt_fields.get("pEp.sync"):
+ m = re.search("(.*)", msg.opt_fields["pEp.sync"].replace("\n", " "))
+
if m:
if etree:
tree = objectify.fromstring(m.group(1).replace("\r", ""))
@@ -82,6 +89,7 @@ def print_msg(p):
text = m.group(1).replace("\r", "").strip()
while text.count(" "):
text = text.replace(" ", " ")
+ print('-- BEACON --')
print(text)
@@ -95,6 +103,25 @@ def messageToSend(msg):
msg.opt_fields = { "pEp.sync": text }
minimail.send(inbox, msg, device_name)
+def messageImapToSend(msg):
+ if msg.enc_format:
+ m, keys, rating, flags = msg.decrypt(DONT_TRIGGER_SYNC)
+ else:
+ m = msg
+ text = "\n" + m.attachments[0].decode()
+ output(text)
+ msg.opt_fields = { "pEp.sync": text }
+ miniimap.send(inbox, msg)
+
+def getMessageToSend(msg):
+ if msg.enc_format:
+ m, keys, rating, flags = msg.decrypt(DONT_TRIGGER_SYNC)
+ else:
+ m = msg
+ text = "\n" + m.attachments[0].decode()
+ output(text)
+ msg.opt_fields = { "pEp.sync": text }
+ return msg
class UserInterface(pEp.UserInterface):
def notifyHandshake(self, me, partner, signal):
@@ -125,7 +152,7 @@ def shutdown_sync():
pEp.shutdown_sync()
-def run(name, color=None):
+def run(name, color=None, imap=False):
global device_name
device_name = name
@@ -133,9 +160,16 @@ def run(name, color=None):
global output
output = lambda x: print(colored(x, color))
- me = pEp.Identity("alice@peptest.ch", name + " of Alice Neuman", name)
- pEp.myself(me)
- pEp.messageToSend = messageToSend
+ if imap:
+ me = pEp.Identity(settings.IMAP_EMAIL, name + " of " + settings.IMAP_USER, name)
+ pEp.myself(me)
+ pEp.messageToSend = messageImapToSend
+ else:
+ me = pEp.Identity("alice@peptest.ch", name + " of Alice Neuman", name)
+ pEp.myself(me)
+ pEp.messageToSend = messageToSend
+
+
if multithreaded:
from threading import Thread
@@ -153,7 +187,10 @@ def run(name, color=None):
try:
while not the_end:
- l = minimail.recv_all(inbox, name)
+ if imap:
+ l = miniimap.recv_all(inbox, 'start_time')
+ else:
+ l = minimail.recv_all(inbox, name)
for n, m in l:
msg = pEp.Message(m)
output("*** Reading")
diff --git a/test/sync_test.py b/test/sync_test.py
index 1849305..d41bd27 100644
--- a/test/sync_test.py
+++ b/test/sync_test.py
@@ -19,8 +19,9 @@ import sys
import shutil
import pathlib
+import miniimap
-def test_for(path, color=None, end_on=None, mt=False):
+def test_for(path, color=None, end_on=None, mt=False, imap=False):
cwd = os.getcwd();
os.chdir(path)
os.environ["HOME"] = os.getcwd()
@@ -31,7 +32,7 @@ def test_for(path, color=None, end_on=None, mt=False):
sync_handshake.end_on = end_on
sync_handshake.multithreaded = mt
- sync_handshake.run(path, color)
+ sync_handshake.run(path, color, imap)
os.chdir(cwd)
@@ -96,6 +97,9 @@ if __name__ == "__main__":
optParser.add_option("-j", "--multi-threaded", action="store_true",
dest="multithreaded",
help="use multithreaded instead of single threaded implementation")
+ optParser.add_option("-i", "--imap", action="store_true",
+ dest="imap",
+ help="use imap instead of minimail")
options, args = optParser.parse_args()
if options.cleanall:
@@ -106,6 +110,10 @@ if __name__ == "__main__":
rmrf("Phone")
rmrf("Laptop")
rmrf("Pad")
+ try:
+ miniimap.clean_inbox()
+ except:
+ pass
if options.cleanall:
rmrf("Backup")
@@ -160,12 +168,12 @@ if __name__ == "__main__":
except TypeError:
end_on = (end_on,)
Phone = Process(target=test_for, args=("Phone", "red", end_on,
- options.multithreaded))
+ options.multithreaded, options.imap))
Laptop = Process(target=test_for, args=("Laptop", "green", end_on,
- options.multithreaded))
+ options.multithreaded, options.imap))
if options.third:
Pad = Process(target=test_for, args=("Pad", "cyan", end_on,
- options.multithreaded))
+ options.multithreaded, options.imap))
Phone.start()
Laptop.start()