Browse Source

added imap module for sync_test and sync_handshake PYADPT-47

PYADPT-55
David 6 years ago
parent
commit
3b07321501
  1. 2
      test/README.md
  2. 5
      test/imap_settings.py
  3. 48
      test/miniimap.py
  4. 18
      test/sync_handshake.py
  5. 11
      test/sync_test.py

2
test/README.md

@ -21,7 +21,7 @@ $ HOME=$PWD lldb python3 -- ../sync_handshake.py -e Phone
Then this side is doing a replay in the debugger. Using touch to set a Then this side is doing a replay in the debugger. Using touch to set a
different timestamp on the marker will only partly replay. different timestamp on the marker will only partly replay.
In order to work with IMAP you need to create a sync_settings.py file with the In order to work with IMAP you need to create a imap_settings.py file with the
following variables: following variables:
IMAP_HOST = 'domain.ch' IMAP_HOST = 'domain.ch'

5
test/imap_settings.py

@ -0,0 +1,5 @@
IMAP_HOST = 'exchange.peptest.ch'
IMAP_PORT = '993'
IMAP_USER = r'testnet\test046'
IMAP_PWD = 'pEpdichauf5'
IMAP_EMAIL = 'test046@exchange.peptest.ch'

48
test/miniimap.py

@ -3,33 +3,20 @@ import pprint
import email.message import email.message
import email.charset import email.charset
import time import time
import sync_settings as settings import os
import imap_settings as settings
def connect(): def connect():
"connect to the IMAP server Inbox" "connect to the IMAP server Inbox"
server = imaplib.IMAP4_SSL(settings.IMAP_HOST) server = imaplib.IMAP4_SSL(settings.IMAP_HOST)
server.login(settings.IMAP_USER, settings.IMAP_PWD) server.login(settings.IMAP_USER, settings.IMAP_PWD)
server.select('Inbox') tmp, data = server.select('Inbox')
if os.environ.get('NUMMESSAGES') is None:
os.environ["NUMMESSAGES"] = data[0].decode("UTF-8")
return server return server
def pEpMessage_to_imap(msg):
"convert pEpMessage to python imap formatted string"
new_message = email.message.Message()
new_message["From"] = str(msg.from_).replace("\n", " ")
new_message["To"] = str(msg.to[0])
new_message["Subject"] = msg.shortmsg
if msg.opt_fields:
for field, value in msg.opt_fields.items():
new_message[field] = str(value).replace("\n", " ")
new_message.set_payload(msg.longmsg)
new_message.set_charset(email.charset.Charset("utf-8"))
encoded_message = str(new_message).encode("utf-8")
return encoded_message
def bytesmessage_to_string(msg): def bytesmessage_to_string(msg):
"converts bytes-like message to string" "converts bytes-like message to string"
msg = msg.decode("UTF-8").rstrip() msg = msg.decode("UTF-8").rstrip()
@ -37,30 +24,30 @@ def bytesmessage_to_string(msg):
def send(inbox, msg): def send(inbox, msg):
"send msg to inbox in MIME format" "send msg to inbox in MIME format"
print("send imap")
server = connect() server = connect()
msg = pEpMessage_to_imap(msg) tmp, data = server.append('Inbox', '', imaplib.Time2Internaldate(time.time()), str(msg).encode("UTF-8"))
print('******** sent msg *******')
print(msg)
server.append('Inbox', '', imaplib.Time2Internaldate(time.time()), msg)
server.close() server.close()
def recv_all(inbox, start_time): def recv_all(inbox):
"""receive a list of new MIME messages from inbox, which are newer than the """receive a list of all MIME messages from inbox newer than the last message when first connected"""
start_time""" print("recieve imap")
server = connect() server = connect()
r = [] r = []
tmp, data = server.search(None, 'ALL') tmp, data = server.search(None, 'ALL')
# tmp, data = server.search(None, 'SENTSINCE {0}'.format(start_time.strftime("%d-%b-%Y %H:%M%S")))
oldermsgid = os.environ.get('NUMMESSAGES')
for num in data[0].split(): for num in data[0].split():
if int(num) >= int(oldermsgid):
tmp, data = server.fetch(num, '(RFC822)') tmp, data = server.fetch(num, '(RFC822)')
msg = bytesmessage_to_string(data[0][1]) msg = bytesmessage_to_string(data[0][1])
r.append((num, msg)) r.append((num, msg))
print('******** recieved msg *******') os.environ["NUMMESSAGES"] = num.decode("UTF-8")
print(msg)
server.close() server.close()
@ -68,11 +55,14 @@ def recv_all(inbox, start_time):
def clean_inbox(): def clean_inbox():
print('clean IMAP') """clean all messsages from IMAP inbox"""
print('cleaning IMAP...')
server = connect() server = connect()
typ, data = server.search(None, 'ALL') typ, data = server.search(None, 'ALL')
for num in data[0].split(): for num in data[0].split():
server.store(num, '+FLAGS', '\\Deleted') server.store(num, '+FLAGS', '\\Deleted')
server.expunge() server.expunge()
server.close()
print('IMAP inbox empty.')

18
test/sync_handshake.py

@ -25,7 +25,7 @@ import pEp
import minimail import minimail
import miniimap import miniimap
import sync_settings as settings import imap_settings
from datetime import datetime from datetime import datetime
@ -105,6 +105,7 @@ def messageToSend(msg):
minimail.send(inbox, msg, device_name) minimail.send(inbox, msg, device_name)
def messageImapToSend(msg): def messageImapToSend(msg):
print("send imap message")
if msg.enc_format: if msg.enc_format:
m, keys, rating, flags = msg.decrypt(DONT_TRIGGER_SYNC) m, keys, rating, flags = msg.decrypt(DONT_TRIGGER_SYNC)
else: else:
@ -126,6 +127,7 @@ def getMessageToSend(msg):
class UserInterface(pEp.UserInterface): class UserInterface(pEp.UserInterface):
def notifyHandshake(self, me, partner, signal): def notifyHandshake(self, me, partner, signal):
print('ui.notifyHandshake')
print(colored(str(signal), "yellow"), end=" ") print(colored(str(signal), "yellow"), end=" ")
output("on " + device_name + "" if not me.fpr else output("on " + device_name + "" if not me.fpr else
"for identities " + str(me.fpr) + " " + str(partner.fpr)) "for identities " + str(me.fpr) + " " + str(partner.fpr))
@ -163,6 +165,8 @@ def run(name, color=None, imap=False):
global device_name global device_name
device_name = name device_name = name
print("run sync_handhske")
if color: if color:
global output global output
output = lambda x: print(colored(x, color)) output = lambda x: print(colored(x, color))
@ -174,7 +178,8 @@ def run(name, color=None, imap=False):
pEp.debug_color(36) pEp.debug_color(36)
if imap: if imap:
me = pEp.Identity(settings.IMAP_EMAIL, name + " of " + settings.IMAP_USER, name) print("run handshake using imap")
me = pEp.Identity(imap_settings.IMAP_EMAIL, name + " of " + imap_settings.IMAP_USER, name)
pEp.myself(me) pEp.myself(me)
pEp.messageToSend = messageImapToSend pEp.messageToSend = messageImapToSend
else: else:
@ -195,13 +200,14 @@ def run(name, color=None, imap=False):
sync = Thread(target=sync_thread) sync = Thread(target=sync_thread)
sync.start() sync.start()
else: else:
print('no threading')
sync = None sync = None
ui = UserInterface() ui = UserInterface()
try: try:
while not the_end: while not the_end:
if imap: if imap:
l = miniimap.recv_all(inbox, 'start_time') l = miniimap.recv_all(inbox)
else: else:
l = minimail.recv_all(inbox, name) l = minimail.recv_all(inbox, name)
for n, m in l: for n, m in l:
@ -237,6 +243,10 @@ if __name__=="__main__":
help="use multithreaded instead of single threaded implementation") help="use multithreaded instead of single threaded implementation")
optParser.add_option("-n", "--noend", action="store_true", optParser.add_option("-n", "--noend", action="store_true",
dest="noend", help="do not end") dest="noend", help="do not end")
optParser.add_option("-i", "--imap", action="store_true",
dest="imap",
help="use imap instead of minimail")
options, args = optParser.parse_args() options, args = optParser.parse_args()
if not options.exec_for: if not options.exec_for:
@ -252,5 +262,5 @@ if __name__=="__main__":
end_on = (None,) end_on = (None,)
multithreaded = options.multithreaded multithreaded = options.multithreaded
run(options.exec_for, options.color) run(options.exec_for, options.color, options.imap)

11
test/sync_test.py

@ -108,14 +108,17 @@ if __name__ == "__main__":
options.clean = True options.clean = True
if options.clean: if options.clean:
rmrf("TestInbox")
rmrf("Phone") if options.imap:
rmrf("Laptop")
rmrf("Pad")
try: try:
miniimap.clean_inbox() miniimap.clean_inbox()
except: except:
pass pass
else:
rmrf("TestInbox")
rmrf("Phone")
rmrf("Laptop")
rmrf("Pad")
if options.cleanall: if options.cleanall:
rmrf("Backup") rmrf("Backup")

Loading…
Cancel
Save