diff --git a/examples/simple_xml_msgformat.py b/examples/simple_xml_msgformat.py new file mode 100644 index 0000000..878ca2d --- /dev/null +++ b/examples/simple_xml_msgformat.py @@ -0,0 +1,131 @@ +#/!usr/bin/env python3 +# +# Copyleft 2020, p≡p Security +# Copyleft 2020, Hartmut Goebel +# +# This file is under GNU General Public License 3.0 +""" +This examples shows how to transport pEp messages within a +simple XML based message format which is capable for attachments. + +pEp message elements loosely follow the "pEp for XML specification" and +https://pEp.software/ns/pEp-1.0.xsd. + +IMPORTANT: In this example, no error checking is done. Neither is + the "pEp for XML" specification enforced. + +Structure of the simple XML based message: + + from-addr + to-addr + text of the messgage (must not be XML) + + textual data + … + + +""" + +from lxml import etree +import email.utils +import pEp + +__all__ = ["serialize_pEp_message", "parse_serialized_message"] + +CT2TAG = { + 'application/pgp-keys': 'keydata', + 'application/pEp.sync': 'sync', + 'application/pEp.distribution': 'distribution', + 'application/pgp-signature': 'signature', +} + +TAG2CT = dict((v,k)for (k,v) in CT2TAG.items()) + +PEP_NAMESPACE = "https://pEp.software/ns/pEp-1.0.xsd" +PEP_NS = '{%s}' % PEP_NAMESPACE +NSMAP = {'pEp': PEP_NAMESPACE} + +INCOMING = 0 +OUTGOING = 1 + +UNENCRYPTED = 0 + +def serialize_pEp_message(msg): + root = etree.Element("msg", nsmap=NSMAP) + etree.SubElement(root, "to").text = str(msg.to[0]) + etree.SubElement(root, "from").text = str(msg.from_) + + if msg.enc_format == UNENCRYPTED: + # unencrypted + etree.SubElement(root, "body").text = msg.longmsg + attachments = etree.SubElement(root, "attachments") + # FIXME: Namespace + attachments = etree.SubElement(attachments, + PEP_NS + "attachments", + version=msg.opt_fields['X-pEp-Version']) + # TODO: opt_fields, esp. auto-consume + # TODO: Order pEp attachments by type + for attach in msg.attachments: + # no need to base64-encode, attachement are ascii-armoured + # already + #attachment = base64_encode(attachment) + # FIXME: Namespace + a = etree.SubElement(attachments, + PEP_NS + CT2TAG[attach.mime_type]) + a.text = attach.decode("ascii") + else: # encrypted + # forget about longmsg and original body + # encrypted message is an attachment and there might be + # further attachments, e.g. new keys + # build a new message out of these attachments + etree.SubElement(root, "body") # emptry body + attachments = etree.SubElement(root, "attachments") + assert len(msg.attachments) == 2 + # first attachment is "Version: 1" + attach = msg.attachments[1] + # no need to base64-encode, attachements are ascii-armoured + # already + n = etree.SubElement(root, PEP_NS + "message") + n.text = attach.decode("ascii") + return etree.tostring(root) + + +def parse_serialized_message(transport_message): + + def addr2identity(text): + name, addr = email.utils.parseaddr(text) + ident = pEp.Identity(addr, name) + ident.update() + return ident + + # parse the XML text, fetch from and to + root = etree.fromstring(transport_message) + from_ = addr2identity(root.xpath("./from/text()")[0]) + msg1 = pEp.Message(INCOMING, from_) + msg1.to.append(addr2identity(root.xpath("./to/text()")[0])) + enc_msg = root.find("{%s}message" % PEP_NAMESPACE) + if enc_msg is not None: + # this is an encrypted message, ignore all but the encrypted message + msg1.attachments = [ + # As of Engine r4652 the encrypted message must be the second + # attachment + pEp.Blob(b"Version: 1", "application/pgp-encrypted"), + pEp.Blob(enc_msg.text.encode(), "application/xxpgp-encrypted")] + else: + # this is an unencrypted message, might contain pEp attachments + msg1.longmsg = root.findtext("body") + pEp_attachments = None + attachments = root.find("attachments") + if attachments is not None: + pEp_attachments = attachments.find("{%s}attachments" % PEP_NAMESPACE) + if pEp_attachments is not None: + msg1.opt_fields['X-pEp-Version'] = pEp_attachments.attrib["version"] + pEp_attachs = [] + for tagname in ("keydata", "signature", "sync", "distribution"): + for attach in pEp_attachments.iterfind( + "{%s}%s" % (PEP_NAMESPACE, tagname)): + pEp_attachs.append( + pEp.Blob(attach.text.encode(), TAG2CT[tagname])) + msg1.attachments = pEp_attachs + msg2, keys, rating, flags = msg1.decrypt() + return msg2, rating