#!/usr/bin/python3
"""Render e-mail messages as static HTML pages.

``archive()`` writes one ``index.html`` per message below
``<basedir>/msg/<encoded message id>/``.  The body is rendered recursively
by ``render_body()`` according to the MIME content type of every part,
using the Jinja2 templates loaded from ``templates/``.
"""

import email.parser
import hashlib
import html
import html.parser
import mailbox
import os
import re
import subprocess
import sys
import tempfile
import urllib.parse

import jinja2

# Root directory of the generated archive (see archive()).
basedir = "."

# NOTE(review): jinja2.Markup, used throughout this file, was deprecated in
# Jinja2 3.0 and removed in 3.1 (markupsafe.Markup is the replacement), so
# this file needs an older Jinja2 - confirm the deployed version.
jenv = jinja2.Environment(
    loader=jinja2.FileSystemLoader(["templates"]),
    autoescape=True,
)


def get_message_id(msg):
    """
    Extract the message id from a message

    Returns the text between the first "<" and the following ">" of the
    Message-ID header.

    Note that this assumes that there is (at least) one message id. If
    this is not the case, it will raise an exception (currently a
    TypeError when the header lookup yields None, or an AttributeError
    when the header contains no <...> part, but we may use something more
    suitable in the future).
    """
    match = re.search(r'<(.*?)>', msg["Message-ID"])
    return match.group(1)


def encode_message_id(msgid):
    """
    Encode a message id so that it can be used as a directory name

    Every character which is not in the whitelist of the regular
    expression below is replaced by its code point in hexadecimal (at
    least two digits), enclosed in curly braces, e.g. "/" becomes "{2f}".
    """
    encmsgid = re.sub('[^!"$(-.0-9:=@-z|~]',
                      lambda x: "{%02x}" % (ord(x.group(0))),
                      msgid)
    return encmsgid


def render_message(msg):
    """
    Render an embedded message (headers and body) as an HTML fragment

    Used for messages nested inside other messages (digests, message/rfc822
    attachments, signed content). Returns a jinja2.Markup so that the result
    is not escaped again when it is inserted into another template.
    """
    msgtmpl = jenv.get_template("message2.html")
    bodyhtml = render_body(msg)
    context = {
        "message_id": msg["Message-Id"],
        "subject": msg["Subject"],
        "from": msg["From"],
        "date": msg["Date"],
        "bodyhtml": bodyhtml,
    }
    msghtml = msgtmpl.render(context)
    return jinja2.Markup(msghtml)


# Parts of message/partial messages seen so far, keyed by the "id"
# parameter of the Content-Type header. Each value is a list with one slot
# per part; a slot is None until that part has been seen.
partial_message_cache = {}


def _decode_text_payload(msg):
    """
    Return the transfer-decoded payload of a text part as str

    The charset declared in the Content-Type header is used if there is
    one. iso-8859-1 is the fallback if no charset is declared, if the
    declared charset is unknown to Python, or if the payload is not valid
    in the declared charset (iso-8859-1 can decode any byte sequence).
    """
    payload = msg.get_payload(decode=True)
    # get_charset() only returns something after set_charset() was called,
    # which is never the case for parsed messages. The declared charset is
    # available via get_content_charset().
    charset = msg.get_content_charset() or "iso-8859-1"
    try:
        return payload.decode(charset)
    except (LookupError, UnicodeDecodeError):
        return payload.decode("iso-8859-1")


def render_body(msg):
    """
    Render the body of a message (or message part) as an HTML fragment

    Dispatches on the content type of msg and recurses into the parts of
    container types. Besides rendering, some branches have side effects:

    * message/partial: the part is stored in partial_message_cache; when
      all parts of a message have been seen, the reassembled message is
      archived via archive().
    * application/octet-stream: the payload is written to
      parts/<sha256>.bin (relative to the current directory).
    * multipart/signed: gpg --verify is run on temporary files.

    Returns a jinja2.Markup. Raises RuntimeError for content types which
    are not implemented.
    """
    content_type = msg.get_content_type()
    if content_type == "text/plain":
        bodytmpl = jenv.get_template("body_text_plain.html")
        context = {
            "body": _decode_text_payload(msg)
        }
        bodyhtml = bodytmpl.render(context)
    elif content_type == "multipart/mixed":
        partshtml = [render_body(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_multipart_mixed.html")
        context = {
            "parts": partshtml
        }
        bodyhtml = bodytmpl.render(context)
    elif content_type == "multipart/digest":
        partshtml = [render_message(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_multipart_digest.html")
        context = {
            "parts": partshtml
        }
        bodyhtml = bodytmpl.render(context)
    elif content_type == "message/rfc822":
        partshtml = [render_message(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_message_rfc822.html")
        context = {
            "parts": partshtml
        }
        bodyhtml = bodytmpl.render(context)
    elif content_type == "text/html":
        htmlpart = HTMLPart()
        htmlpart.feed(_decode_text_payload(msg))
        # Flush text the parser may still be buffering at the end of input.
        htmlpart.close()
        bodytmpl = jenv.get_template("body_text_html.html")
        context = {
            "body": jinja2.Markup(htmlpart.as_string())
        }
        bodyhtml = bodytmpl.render(context)
    elif content_type == "text/enriched":
        tepart = TextEnrichedPart(msg.get_payload())
        bodytmpl = jenv.get_template("body_text_enriched.html")
        context = {
            "body": jinja2.Markup(tepart.as_string())
        }
        bodyhtml = bodytmpl.render(context)
    elif content_type == "message/partial":
        # Default header for get_param is Content-Type
        whole_msg_id = msg.get_param("id")
        number = int(msg.get_param("number"))
        if whole_msg_id not in partial_message_cache:
            # For now we assume that total is present on all parts. This
            # isn't guaranteed, however, and we may need to handle the
            # case where total is only present on the last part.
            partial_message_cache[whole_msg_id] = [None] * int(msg.get_param("total"))
        payload = msg.get_payload()
        s = payload[0].as_string()  # Only one part
        partial_message_cache[whole_msg_id][number - 1] = s
        if None not in partial_message_cache[whole_msg_id]:
            # All parts are here: reassemble and archive the whole message.
            p = email.parser.Parser()
            whole_msg = p.parsestr("".join(partial_message_cache[whole_msg_id]))
            if not whole_msg["Message-Id"]:
                # archive() needs a message id; fall back to the id
                # parameter of the partial message.
                whole_msg.add_header("Message-Id", "<" + whole_msg_id + ">")
            archive(whole_msg)
            del partial_message_cache[whole_msg_id]
        # Link to the page of the whole message. Messages live in sibling
        # directories named after their encoded id (see archive()), hence
        # the relative "../<id>/" URL. quote() leaves only characters which
        # are safe inside a double-quoted attribute value.
        bodyhtml = '<p>This is part %d of %d of <a href="../%s/">%s</a></p>' % (
            number,
            int(msg.get_param("total")),
            urllib.parse.quote(encode_message_id(whole_msg_id)),
            html.escape(whole_msg_id))
    elif content_type == "application/octet-stream":
        name = msg.get_param("name") or "(data)"
        m = hashlib.sha256()
        payload = msg.get_payload(decode=True)
        m.update(payload)
        # Content-addressed file name: identical attachments share a file.
        filename = m.hexdigest() + ".bin"
        os.makedirs("parts", exist_ok=True)
        with open("parts/" + filename, "wb") as fh:
            fh.write(payload)
        bodytmpl = jenv.get_template("body_application_octet_stream.html")
        context = {
            "name": name,
            "url": "../../parts/" + filename,
        }
        bodyhtml = bodytmpl.render(context)
    elif content_type == "multipart/signed":
        content, signature = msg.get_payload()
        # buffering=0 makes sure the data is in the files when gpg reads
        # them by name; both files must exist while gpg is running.
        with tempfile.NamedTemporaryFile(buffering=0) as content_fh:
            content_fh.write(content.as_bytes())
            with tempfile.NamedTemporaryFile(buffering=0, suffix=".asc") as signature_fh:
                signature_fh.write(signature.get_payload(decode=True))
                r = subprocess.run(
                    ["gpg", "--verify", signature_fh.name, content_fh.name],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )
                gpgresult = r.stderr
                # Analyze gpgresult or just use r.returncode?
                gpgstatus = "dubious"
        contenthtml = render_message(content)
        bodytmpl = jenv.get_template("body_multipart_signed.html")
        context = {
            "content": contenthtml,
            "gpgresult": gpgresult,
            "gpgstatus": gpgstatus,
        }
        bodyhtml = bodytmpl.render(context)
    else:
        raise RuntimeError("Content-type " + content_type + " not implemented yet")

    return jinja2.Markup(bodyhtml)


def archive(msg):
    """
    Write the HTML page for msg to <basedir>/msg/<encoded id>/index.html

    The page is rendered completely before the file is opened, so a
    failure while rendering does not leave an empty index.html behind.
    """
    mid = get_message_id(msg)
    encmid = encode_message_id(mid)
    msgdir = basedir + "/msg/" + encmid

    msgtmpl = jenv.get_template("message.html")
    bodyhtml = render_body(msg)
    context = {
        "list": "LUGA",
        "message_id": mid,
        "subject": msg["Subject"],
        "from": msg["From"],
        "date": msg["Date"],
        "bodyhtml": bodyhtml,
    }
    msghtml = msgtmpl.render(context)

    os.makedirs(msgdir, exist_ok=True)
    with open(msgdir + "/index.html", "w") as hfd:
        hfd.write(msghtml)


class HTMLPart(html.parser.HTMLParser):
    """
    Whitelist based sanitizer for text/html parts

    Feed the HTML source via feed()/close() and retrieve the cleaned up
    HTML with as_string(). Tags in allowed_tags are passed through with
    cleaned attributes, the content of tags in hide_tags is dropped, tags
    in ignore_tags are dropped silently (their content is kept), and all
    other tags are dropped with a message on stderr.
    """

    allowed_tags = [
        "h2", "a", "wbr", "hr", "pre", "img", "font", "i", "br", "table", "tr",
        "th", "td", "b", "select", "option", "input", "sup", "address",
        "center", "p", "h1", "dl", "h3", "ul", "li", "ol", "u"
    ]
    hide_tags = ["title"]
    ignore_tags = ["html", "head", "body", "marquee", "meta", "form", ]

    def __init__(self):
        super().__init__()
        self.hide = False
        self.content = []
        # Base URL for relative links; replaced when a <base href=...> tag
        # is seen. Must exist even if the document has no <base> tag.
        self.base = ""

    def handle_starttag(self, tag, attrs):
        if tag == "base":
            href = [x[1] for x in attrs if x[0] == "href"]
            if href:
                self.base = href[0]
        elif tag in self.allowed_tags:
            # Attributes without a value are written in minimized form.
            attrstr = "".join(
                [' %s="%s"' % (a[0], html.escape(a[1])) if a[1] else ' %s' % (a[0])
                 for a in self.clean_attrs(tag, attrs)
                 ]
            )
            self.content.append("<%s%s>" % (tag, attrstr))
        elif tag in self.hide_tags:
            self.hide = True
        elif tag in self.ignore_tags:
            pass
        else:
            print("Encountered unknown start tag", tag, attrs, file=sys.stderr)

    def handle_endtag(self, tag):
        if tag in self.allowed_tags:
            self.content.append("</%s>" % tag)
        elif tag in self.hide_tags:
            self.hide = False  # XXX - Need stack?
        elif tag in self.ignore_tags:
            pass
        else:
            print("Encountered unknown end tag", tag, file=sys.stderr)

    def handle_data(self, data):
        if not self.hide:
            # The parser has already converted character references to
            # characters, so the text must be escaped again. Otherwise
            # "&lt;script&gt;" in the mail would become live markup in the
            # output.
            self.content.append(html.escape(data, quote=False))

    def as_string(self):
        """Return the sanitized HTML collected so far."""
        return "".join(self.content)

    def clean_attrs(self, tag, attrs):
        """
        Return the list of (name, value) pairs from attrs which are safe

        Attributes in safe_attrs are passed unchanged. href is resolved
        against the base URL and kept only for http, https and ftp URLs.
        src is kept only for cid URLs. target is dropped silently, all
        other attributes are dropped with a message on stderr.
        """
        safe_attrs = [
            "border", "alt", "size", "face", "width", "height", "hspace",
            "cellpadding", "cellspacing", "bgcolor", "valign", "nowrap",
            "color", "colspan", "name", "value", "type", "align", "clear",
            "noshade"
        ]
        cleaned = []
        for a in attrs:
            if a[0] in safe_attrs:
                cleaned.append(a)
            elif a[0] == "href":
                # The value is None for attributes without "=value".
                url = urllib.parse.urljoin(self.base or "", a[1] or "")
                u = urllib.parse.urlparse(url)
                if u[0] in ['https', 'http', 'ftp']:
                    cleaned.append((a[0], url))
            elif a[0] == "src":
                url = urllib.parse.urljoin(self.base or "", a[1] or "")
                u = urllib.parse.urlparse(url)
                if u[0] == "cid":
                    print("Encountered src cid attribute", a, file=sys.stderr)
                    # XXX - implement cid
                    cleaned.append((a[0], url))
                else:
                    print("Ignored src attribute", a, file=sys.stderr)
            elif a[0] == "target":
                pass
            else:
                print("Encountered unknown attribute", a, file=sys.stderr)
        return cleaned