def get_message_id(msg):
    """
    Extract the message id from a message

    The id is the text between the first "<" and the following ">" of the
    Message-ID header.

    Raises ValueError if the header is missing or contains no <...> part.
    """
    header = msg["Message-ID"]
    match = None
    if header is not None:
        # str() so that a non-str header value can still be searched.
        match = re.search(r'<(.*?)>', str(header))
    if match is None:
        raise ValueError("no message id in Message-ID header: %r" % (header,))
    return match.group(1)


def encode_message_id(msgid):
    """
    Return msgid with every character outside the allowed set replaced by
    "{xx}", where xx is the (at least two digit) hex code point.

    The result is used as a directory name below msg/ (see archive()).
    """
    return re.sub(
        '[^!"$(-.0-9:=@-z|~]',
        lambda m: "{%02x}" % ord(m.group(0)),
        msgid,
    )


def _decode_header_chunk(raw, charset):
    """Decode one bytes chunk of a header, never raising on bad input."""
    try:
        return raw.decode(charset)
    except (LookupError, UnicodeDecodeError):
        # Unknown or wrong charset: assume windows-1252. That codec has
        # undefined bytes (e.g. 0x81), so replace instead of failing again.
        return raw.decode("windows-1252", errors="replace")


def decode_rfc2047(s):
    """
    Decode a header value containing RFC 2047 encoded words to a str.

    Returns None if s is None. Chunks which cannot be decoded with their
    declared charset (or as us-ascii if none is declared) are decoded as
    windows-1252 with replacement characters.
    """
    if s is None:
        return None
    pieces = []
    for value, charset in email.header.decode_header(s):
        if isinstance(value, bytes):
            pieces.append(_decode_header_chunk(value, charset or "us-ascii"))
        else:
            pieces.append(value)
    return "".join(pieces)


def render_message(msg):
    """
    Render an embedded message (headers and body) with the template
    message2.html and return the result as markup.
    """
    msgtmpl = jenv.get_template("message2.html")
    bodyhtml = render_body(msg)
    context = {
        "msg": msg,
        "message_id": msg["Message-Id"],
        "subject": decode_rfc2047(msg["Subject"]),
        "from": decode_rfc2047(msg["From"]),
        "date": msg["Date"],
        "bodyhtml": bodyhtml,
    }
    msghtml = msgtmpl.render(context)
    return jinja2.Markup(msghtml)


# File name extension used by save_part() for each supported content type.
_PART_EXTENSIONS = {
    "application/octet-stream": ".bin",
    "text/html": ".html",
    "text/x-vcard": ".vcf",
    "text/plain": ".txt",
    "application/x-gzip": ".gz",  # more likely tar.gz, but we can't know without looking into it which we ain't
    "image/gif": ".gif",
    "text/x-c": ".c",
    "application/x-perl": ".pl",
    "application/msword": ".doc",
    "application/ms-tnef": ".ms-tnef",
    "application/x-bzip2": ".bz2",  # more likely tar.bz2, but we can't know without looking into it which we ain't
    "application/x-shellscript": ".sh",
    "application/x-java-vm": ".bin",  # The only instances are mis-labelled
    "image/png": ".png",
    "application/pgp-keys": ".pgp",
    "application/x-gunzip": ".gz",  # that sort of makes sense, but not really
    "image/jpeg": ".jpg",
    "text/x-python": ".py",
    "text/x-java": ".java",
    "application/x-sh": ".sh",
    "text/x-patch": ".patch",
    "text/x-c++src": ".c++",
    "application/x-compressed-tar": ".tar.gz",
    "application/vnd.oasis.opendocument.text": ".odt",
    "text/x-perl": ".pl",
    "application/pgp-signature": ".pgp",
    "image/svg+xml": ".svg",
}


def save_part(msg, disposition):
    """
    Store the decoded payload of msg in the directory parts.

    The file name is the SHA-256 hex digest of the payload plus an extension
    derived from the content type, so identical payloads share one file.

    If disposition is "_url", the relative URL of the stored file is
    returned. Otherwise the template <disposition>_<type>_<subtype>.html is
    rendered with the name and URL of the part and the result is returned.

    Raises KeyError for content types without a known extension.
    """
    content_type = msg.get_content_type()
    extension = _PART_EXTENSIONS[content_type]

    name = msg.get_param("name") or "(data)"
    payload = msg.get_payload(decode=True)
    filename = hashlib.sha256(payload).hexdigest() + extension
    os.makedirs("parts", exist_ok=True)
    with open("parts/" + filename, "wb") as fh:
        fh.write(payload)
    url = "../../parts/" + filename
    if disposition == "_url":
        return url

    template_name = disposition + "_" + content_type.replace("/", "_") + ".html"
    bodytmpl = jenv.get_template(template_name)
    context = {
        "name": name,
        "url": url,
    }
    return bodytmpl.render(context)


# Parts of message/partial messages seen so far, keyed by the id parameter.
# Each value is a list with one slot per part (None until the part arrives).
partial_message_cache = {}


def _decode_part_text(msg):
    """
    Return the decoded payload of msg as str.

    The charset is taken from the charset parameter of the Content-Type
    header (default iso-8859-1). Undecodable bytes are replaced; an unknown
    charset is reported on stderr and treated as windows-1252.
    """
    # msg.get_charset() doesn't work: it returns the Charset object attached
    # with set_charset(), not the charset parameter of the header.
    charset = msg.get_content_charset("iso-8859-1")
    partbytes = msg.get_payload(decode=True)
    try:
        return partbytes.decode(charset, errors="replace")
    except LookupError as e:
        # Unknown encoding? Probably win-1252
        print(e, file=sys.stderr)
        return partbytes.decode("windows-1252", errors="replace")


def render_body(msg, extra=None):
    """
    Render the body of msg as HTML and return it as markup.

    Parts with Content-Disposition "attachment" are stored with save_part().
    All other parts are passed to the renderer registered for their content
    type; multipart types recurse into their sub-parts.

    extra is an optional dict mapping Content-Id values (including the angle
    brackets) to dicts with the keys "i", "part" and "url". It is passed
    down to the HTML renderer, which uses it to resolve cid: references.

    Raises KeyError if there is no renderer for the content type.
    """

    def render_text_plain(msg, extra=None):
        # Parameter names are matched case-insensitively, as is the value
        # of format.
        ct_params = {k.lower(): v for k, v in (msg.get_params() or [])}
        text_format = ct_params.get("format", "fixed").lower()
        if text_format == "fixed":
            bodytmpl = jenv.get_template("body_text_plain.html")
            context = {
                "body": _decode_part_text(msg)
            }
            return bodytmpl.render(context)
        elif text_format == "flowed":
            bodytmpl = jenv.get_template("body_text_plain_flowed.html")
            parthtml = TextFlowedPart(msg).as_string()
            context = {
                "body": jinja2.Markup(parthtml),
            }
            return bodytmpl.render(context)
        else:
            raise NotImplementedError()

    def render_multipart_mixed(msg, extra=None):
        parts = msg.get_payload()
        if isinstance(parts, str):
            # mislabelled, assume text/plain
            return render_text_plain(msg)

        # First, scan for parts with a content-id. A multipart/mixed shouldn't
        # have them, but I've seen them in the wild and it should be harmless
        # to support at least images. We don't want all content types, though,
        # because save_part doesn't support nested parts and I don't want to
        # fully implement what is really just a workaround for buggy software.
        for i, part in enumerate(parts):
            content_id = part.get("Content-Id")
            content_type = part.get_content_type()
            if content_id and content_type.startswith("image/"):
                if extra is None:
                    extra = {}
                extra[content_id] = {
                    "i": i,
                    "part": part,
                    "url": save_part(part, "_url"),
                }
        partshtml = [render_body(part, extra) for part in parts]
        bodytmpl = jenv.get_template("body_multipart_mixed.html")
        context = {
            "parts": partshtml
        }
        return bodytmpl.render(context)

    def render_multipart_digest(msg, extra=None):
        partshtml = [render_message(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_multipart_digest.html")
        context = {
            "parts": partshtml
        }
        return bodytmpl.render(context)

    def render_message_rfc822(msg, extra=None):
        partshtml = [render_message(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_message_rfc822.html")
        context = {
            "parts": partshtml
        }
        return bodytmpl.render(context)

    def render_text_html(msg, extra=None):
        htmlpart = HTMLPart(extra)
        htmlpart.feed(_decode_part_text(msg))
        # Flush text the parser may still be holding back.
        htmlpart.close()
        bodytmpl = jenv.get_template("body_text_html.html")
        context = {
            "body": jinja2.Markup(htmlpart.as_string())
        }
        return bodytmpl.render(context)

    def render_text_enriched(msg, extra=None):
        tepart = TextEnrichedPart(_decode_part_text(msg))
        bodytmpl = jenv.get_template("body_text_enriched.html")
        context = {
            "body": jinja2.Markup(tepart.as_string())
        }
        return bodytmpl.render(context)

    def render_message_partial(msg, extra=None):
        # Default header for get_param is Content-Type
        whole_msg_id = msg.get_param("id")
        number = int(msg.get_param("number"))
        if whole_msg_id not in partial_message_cache:
            # For now we assume that total is present on all parts. This
            # isn't guaranteed, however, and we may need to handle the
            # case where total is only present on the last part.
            partial_message_cache[whole_msg_id] = [None] * int(msg.get_param("total"))
        payload = msg.get_payload()
        s = payload[0].as_string()  # Only one part
        partial_message_cache[whole_msg_id][number - 1] = s
        if None not in partial_message_cache[whole_msg_id]:
            # All parts have arrived: reassemble and archive the whole
            # message under its own id.
            p = email.parser.Parser()
            whole_msg = p.parsestr("".join(partial_message_cache[whole_msg_id]))
            if not whole_msg["Message-Id"]:
                whole_msg.add_header("Message-Id", "<" + whole_msg_id + ">")
            archive(whole_msg)
            del partial_message_cache[whole_msg_id]
        # NOTE(review): the markup of this string was lost; the link target
        # assumes the msg/<encoded id>/ layout written by archive() - confirm.
        # The result is later marked as safe markup, so both values are
        # escaped here.
        return '<p>This is part %d of %d of <a href="../%s/">%s</a></p>\n' % (
            number,
            int(msg.get_param("total")),
            html.escape(encode_message_id(whole_msg_id)),
            html.escape(whole_msg_id))

    def render_application_octet_stream(msg, extra=None):
        return save_part(msg, "attachment")

    def render_multipart_signed(msg, extra=None):
        content, signature = msg.get_payload()
        # gpg needs both the signed content and the signature as files.
        with tempfile.NamedTemporaryFile(buffering=0) as content_fh:
            content_fh.write(content.as_bytes())
            with tempfile.NamedTemporaryFile(buffering=0, suffix=".asc") as signature_fh:
                signature_fh.write(signature.get_payload(decode=True))
                r = subprocess.run(["gpg", "--verify", signature_fh.name, content_fh.name],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
        gpgresult = r.stderr
        # XXX - Analyze gpgresult or just use r.returncode?
        gpgstatus = "dubious"
        contenthtml = render_message(content)
        bodytmpl = jenv.get_template("body_multipart_signed.html")
        context = {
            "content": contenthtml,
            "gpgresult": gpgresult,
            "gpgstatus": gpgstatus,
        }
        return bodytmpl.render(context)

    def render_application_pgp(msg, extra=None):
        with tempfile.NamedTemporaryFile(buffering=0) as content_fh:
            content_fh.write(msg.get_payload(decode=True))
            r = subprocess.run(["gpg", "--decrypt", content_fh.name],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               )
        # Diagnostics are only displayed, so don't fail on odd bytes.
        gpgresult = r.stderr.decode(errors="replace")
        # XXX - Analyze gpgresult or just use r.returncode?
        gpgstatus = "dubious"
        decrypted_content = r.stdout
        p = email.parser.BytesParser()
        embedded_message = p.parsebytes(decrypted_content)
        contenthtml = render_message(embedded_message)
        bodytmpl = jenv.get_template("body_application_pgp.html")
        context = {
            "content": contenthtml,
            "gpgresult": gpgresult,
            "gpgstatus": gpgstatus,
        }
        return bodytmpl.render(context)

    def render_multipart_alternative(msg, extra=None):
        partshtml = []
        partstypes = []
        for part in msg.get_payload():
            partstypes.append(part.get_content_type())
            partshtml.append(render_body(part, extra))
        bodytmpl = jenv.get_template("body_multipart_alternative.html")
        context = {
            "types": partstypes,
            "parts": partshtml,
        }
        return bodytmpl.render(context)

    def render_application_x_unknown_content_type_scpfile(msg, extra=None):
        bodytmpl = jenv.get_template("body_application_x-unknown-content-type-scpfile.html")
        context = {
            "body": _decode_part_text(msg)
        }
        return bodytmpl.render(context)

    def render_application_pgp_signature(msg, extra=None):
        # A PGP signature outside of a multipart/signed - useless
        bodytmpl = jenv.get_template("body_application_pgp-signature.html")
        context = {
        }
        return bodytmpl.render(context)

    def render_application_x_gzip(msg, extra=None):
        return save_part(msg, "attachment")

    def render_message_news(msg, extra=None):
        partshtml = [render_message(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_message_news.html")
        context = {
            "msg": msg,
            "parts": partshtml,
        }
        return bodytmpl.render(context)

    def render_image_gif(msg, extra=None):
        return save_part(msg, "inline")

    def render_multipart_related(msg, extra=None):
        start = msg.get_param("start")
        start_part = None
        parts = msg.get_payload()
        # collect content-ids
        content = {}
        for i, part in enumerate(parts):
            content_id = part.get("Content-Id")
            if start_part is None and (start is None or content_id == start):
                start_part = part
                continue
            if content_id:
                content[content_id] = {
                    "i": i,
                    "part": part,
                    "url": save_part(part, "_url"),
                }
        if start_part is None and parts:
            # The start parameter doesn't match any part: fall back to the
            # first part instead of failing on None.
            start_part = parts[0]
        parthtml = render_body(start_part, content)
        bodytmpl = jenv.get_template("body_multipart_related.html")
        context = {
            "msg": msg,
            "parts": [parthtml],
        }
        return bodytmpl.render(context)

    def render_image_jpeg(msg, extra=None):
        return save_part(msg, "inline")

    renderers = {
        "text/plain": render_text_plain,
        "multipart/mixed": render_multipart_mixed,
        "multipart/digest": render_multipart_digest,
        "message/rfc822": render_message_rfc822,
        "text/html": render_text_html,
        "text/enriched": render_text_enriched,
        "message/partial": render_message_partial,
        "application/octet-stream": render_application_octet_stream,
        "multipart/signed": render_multipart_signed,
        "application/pgp": render_application_pgp,
        "multipart/alternative": render_multipart_alternative,
        "application/x-unknown-content-type-scpfile": render_application_x_unknown_content_type_scpfile,
        "application/pgp-signature": render_application_pgp_signature,
        "application/x-gzip": render_application_x_gzip,
        "message/news": render_message_news,
        "image/gif": render_image_gif,
        "multipart/related": render_multipart_related,
        "application/x-java-vm": render_application_octet_stream,
        "image/jpeg": render_image_jpeg,
        "application/x-compressed-tar": render_application_octet_stream,
    }

    content_type = msg.get_content_type()
    content_disposition = msg.get_content_disposition()
    if content_disposition == "attachment":
        # XXX - not sure, if we should just store all content-types.
        # We probably should clean up html. Alternatively we could just store
        # all of them application/octet-stream, which browsers should download
        # and not try to display.
        bodyhtml = save_part(msg, content_disposition)
    else:
        bodyhtml = renderers[content_type](msg, extra)

    return jinja2.Markup(bodyhtml)


def archive(msg):
    """
    Render msg with the template message.html and write the result to
    <basedir>/msg/<encoded message id>/index.html.

    The message id is logged to stderr. Raises ValueError (from
    get_message_id) if the message has no usable Message-ID.
    """
    mid = get_message_id(msg)
    print("M", mid, file=sys.stderr)
    encmid = encode_message_id(mid)
    msgdir = basedir + "/msg/" + encmid

    # Render before touching the file system so that a failure doesn't leave
    # an empty index.html behind.
    msgtmpl = jenv.get_template("message.html")
    bodyhtml = render_body(msg)
    context = {
        "list": "LUGA",
        "message_id": mid,
        "subject": decode_rfc2047(msg["Subject"]),
        "from": decode_rfc2047(msg["From"]),
        "date": msg["Date"],
        "bodyhtml": bodyhtml,
    }
    msghtml = msgtmpl.render(context)

    os.makedirs(msgdir, exist_ok=True)
    with open(msgdir + "/index.html", "w") as hfd:
        hfd.write(msghtml)


class HTMLPart(html.parser.HTMLParser):
    """
    Sanitizing HTML filter for text/html parts.

    Feed it HTML with feed() (and finish with close()); as_string() returns
    the cleaned markup. Only tags in allowed_tags and attributes accepted by
    clean_attrs() are kept, the content of tags in hide_tags is dropped,
    tags in ignore_tags are dropped silently (their content is kept) and
    all other tags are dropped with a message on stderr. Text is HTML-escaped
    again on output.
    """

    allowed_tags = [
        "h2", "a", "wbr", "hr", "pre", "img", "font", "i", "br", "table", "tr",
        "th", "td", "b", "select", "option", "input", "sup", "address",
        "center", "p", "h1", "dl", "h3", "ul", "li", "ol", "u", "blockquote",
        "h4",
    ]
    hide_tags = ["title"]
    ignore_tags = ["html", "head", "body", "marquee", "meta", "form", ]
    # Allowed elements which have no end tag. Emitting e.g. "</br>" for
    # "<br/>" would be rendered as a second line break by browsers.
    void_tags = ["wbr", "hr", "img", "br", "input"]
    # Attributes which are copied unchanged.
    safe_attrs = frozenset([
        "border", "alt", "size", "face", "width", "height", "hspace",
        "cellpadding", "cellspacing", "bgcolor", "valign", "nowrap",
        "color", "colspan", "name", "value", "type", "align", "clear",
        "noshade",
    ])

    def __init__(self, extra):
        """
        extra maps Content-Id values (with angle brackets) to dicts with a
        "url" key; it is used to resolve cid: URLs in src attributes. None
        is treated as an empty mapping.
        """
        super().__init__()
        self.hide = False
        self.content = []
        self.base = None
        self.extra = extra or {}

    def handle_starttag(self, tag, attrs):
        if tag == "base":
            # Remember the base URL for resolving relative links.
            href = [x[1] for x in attrs if x[0] == "href"]
            if href:
                self.base = href[0]
        elif tag in self.allowed_tags:
            attrstr = "".join(
                [' %s="%s"' % (a[0], html.escape(a[1])) if a[1] else ' %s' % (a[0])
                 for a in self.clean_attrs(tag, attrs)
                ]
            )
            self.content.append("<%s%s>" % (tag, attrstr))
        elif tag in self.hide_tags:
            self.hide = True
        elif tag in self.ignore_tags:
            pass
        else:
            print("Encountered unknown start tag", tag, attrs, file=sys.stderr)

    def handle_endtag(self, tag):
        if tag in self.void_tags:
            pass
        elif tag in self.allowed_tags:
            self.content.append("</%s>" % tag)
        elif tag in self.hide_tags:
            self.hide = False  # XXX - Need stack?
        elif tag in self.ignore_tags:
            pass
        else:
            print("Encountered unknown end tag", tag, file=sys.stderr)

    def handle_data(self, data):
        if not self.hide:
            # The parser has already decoded character references, so the
            # text must be escaped again or "&lt;script&gt;" would come out
            # as a real tag.
            self.content.append(html.escape(data, quote=False))

    def as_string(self):
        """Return the cleaned HTML collected so far."""
        return "".join(self.content)

    def clean_attrs(self, tag, attrs):
        """
        Return the list of (name, value) pairs from attrs which are safe to
        keep.

        Attributes in safe_attrs are kept as they are. href is kept (resolved
        against the base URL) if its scheme is https, http or ftp. src is
        kept only for cid: URLs found in self.extra and is replaced by the
        URL stored there. target is dropped silently; everything else is
        dropped with a message on stderr.
        """
        clean_attrs = []
        for a in attrs:
            name, value = a
            if name in self.safe_attrs:
                clean_attrs.append(a)
            elif name == "href":
                if value is None:
                    continue
                url = urllib.parse.urljoin(self.base, value)
                u = urllib.parse.urlparse(url)
                if u[0] in ['https', 'http', 'ftp']:
                    clean_attrs.append((name, url))
            elif name == "src":
                if value is None:
                    continue
                url = urllib.parse.urljoin(self.base, value)
                u = urllib.parse.urlparse(url)
                if u[0] == "cid":
                    print("Encountered src cid attribute", a, file=sys.stderr)
                    target = self.extra.get("<" + u.path + ">")
                    if target:
                        clean_attrs.append((name, target["url"]))
                    else:
                        print("Ignored src attribute with unknown cid", a, file=sys.stderr)
                else:
                    print("Ignored src attribute", a, file=sys.stderr)
            elif name == "target":
                pass
            else:
                print("Encountered unknown attribute", a, file=sys.stderr)
        return clean_attrs
" + html.escape(ln["content"]) + "
\n" else: s += "" + html.escape(ln["content"]) + "
\n" while 0 < prev_quote_depth: s += "" prev_quote_depth -= 1 return s for f in sys.argv[1:]: print("F", f, file=sys.stderr) mb = mailbox.mbox(f) for m in mb: archive(m) # vim: tw=79