yama/mbox2web

608 lines
23 KiB
Plaintext
Raw Normal View History

#!/usr/bin/python3
2019-04-30 21:55:21 +02:00
import email.header
import email.parser
2019-03-01 11:58:22 +01:00
import hashlib
import html
import html.parser
import mailbox
import os
import re
2019-03-01 13:54:13 +01:00
import subprocess
import sys
2019-03-01 13:54:13 +01:00
import tempfile
import urllib.parse
import jinja2
# Root directory of the generated archive; archive() writes message pages
# below basedir + "/msg/".
basedir = "."

# Shared Jinja2 environment. Templates are looked up in the relative
# directory "templates", and autoescaping is on, so HTML that has already
# been rendered must be wrapped in Markup before it is passed to a template.
jenv = jinja2.Environment(
    loader=jinja2.FileSystemLoader(["templates"]),
    autoescape=True,
)
def get_message_id(msg):
    """
    Extract the message id from a message

    The id is the text between the first pair of angle brackets in the
    Message-ID header; the brackets themselves are not part of the result.

    Note that this assumes that there is (at least) one message id. If
    this is not the case (the header is missing or contains no <...>
    token), an IndexError is raised. We may use something more suitable
    in the future.
    """
    header = msg["Message-ID"]
    match = None
    if header is not None:
        # str() is a no-op for plain strings and also copes with header
        # objects that are not str instances.
        match = re.search(r'<(.*?)>', str(header))
    if match is None:
        raise IndexError("No message id found in Message-ID header %r" % (header,))
    return match.group(1)
2019-04-30 21:55:21 +02:00
def encode_message_id(msgid):
    """
    Encode a message id for use as a directory name / URL path segment

    Every character outside the allowed set is replaced by its code point
    as (at least two) lowercase hex digits in curly braces, e.g. "/"
    becomes "{2f}". All other characters are passed through unchanged.
    """
    def _as_braced_hex(found):
        return "{%02x}" % ord(found.group(0))

    return re.sub('[^!"$(-.0-9:=@-z|~]', _as_braced_hex, msgid)
2019-04-30 21:55:21 +02:00
def decode_rfc2047(s):
    """
    Decode a header value containing RFC 2047 encoded words into a str

    Returns None if s is None (i.e. the header doesn't exist).

    Chunks are decoded with their declared charset. If that charset is
    unknown to Python or the bytes are not valid in it, the chunk is
    decoded as windows-1252 instead, with undecodable bytes replaced by
    U+FFFD, so that a single malformed header can't abort the run. Chunks
    without a declared charset are treated as us-ascii with the same
    fallback.
    """
    if s is None:
        return None
    pieces = []
    for data, charset in email.header.decode_header(s):
        if isinstance(data, str):
            # Header without any encoded words: already text.
            pieces.append(data)
            continue
        try:
            pieces.append(data.decode(charset or "us-ascii"))
        except (LookupError, UnicodeDecodeError):
            # windows-1252 has unassigned bytes, so a strict decode could
            # fail again; errors="replace" makes this fallback total.
            pieces.append(data.decode("windows-1252", errors="replace"))
    return "".join(pieces)
def render_message(msg):
    """
    Render a message embedded in another message as an HTML fragment

    The body is rendered with render_body() and then wrapped, together
    with the decoded Subject and From headers and the raw Date and
    Message-Id headers, by the "message2.html" template.

    Returns the result as Markup so that it isn't escaped again when it is
    inserted into an enclosing template.
    """
    # Template lookup first, then the body, then the headers - the same
    # order in which failures would surface in the original code.
    template = jenv.get_template("message2.html")
    rendered_body = render_body(msg)
    fields = {}
    fields["msg"] = msg
    fields["message_id"] = msg["Message-Id"]
    fields["subject"] = decode_rfc2047(msg["Subject"])
    fields["from"] = decode_rfc2047(msg["From"])
    fields["date"] = msg["Date"]
    fields["bodyhtml"] = rendered_body
    return jinja2.Markup(template.render(fields))
def save_part(msg, disposition):
    """
    Store the decoded payload of a part as a file and return a reference

    The file is written to "parts/" (relative to the current directory).
    Its name is the SHA-256 hex digest of the payload plus an extension
    derived from the content type, so identical payloads share one file.

    If disposition is "_url", only the relative URL of the stored file is
    returned. Otherwise the template "<disposition>_<type>_<subtype>.html"
    is rendered with the part's name and URL, and that HTML is returned.

    Raises KeyError for content types without a known extension.
    """
    known_extensions = {
        "application/octet-stream": ".bin",
        "text/html": ".html",
        "text/x-vcard": ".vcf",
        "text/plain": ".txt",
        "application/x-gzip": ".gz", # more likely tar.gz, but we can't know without looking into it which we ain't
        "image/gif": ".gif",
        "text/x-c": ".c",
        "application/x-perl": ".pl",
        "application/msword": ".doc",
        "application/ms-tnef": ".ms-tnef",
        "application/x-bzip2": ".bz2", # more likely tar.bz2, but we can't know without looking into it which we ain't
        "application/x-shellscript": ".sh",
        "application/x-java-vm": ".bin", # The only instances are mis-labelled
        "image/png": ".png",
        "application/pgp-keys": ".pgp",
        "application/x-gunzip": ".gz", # that sort of makes sense, but not really
        "image/jpeg": ".jpg",
    }
    content_type = msg.get_content_type()
    suffix = known_extensions[content_type]
    display_name = msg.get_param("name") or "(data)"

    data = msg.get_payload(decode=True)
    stored_name = hashlib.sha256(data).hexdigest() + suffix
    os.makedirs("parts", exist_ok=True)
    with open("parts/" + stored_name, "wb") as out:
        out.write(data)

    # Message pages live two levels below the archive root (msg/<id>/).
    link = "../../parts/" + stored_name
    if disposition == "_url":
        return link

    template = jenv.get_template(
        disposition + "_" + content_type.replace("/", "_") + ".html")
    return template.render({"name": display_name, "url": link})
# Fragments of message/partial messages collected so far, keyed by the "id"
# parameter of the fragment's Content-Type header. Each value is a list with
# one slot per fragment; a slot is None until that fragment has been seen.
partial_message_cache = {}
def render_body(msg, extra=None):
    """
    Render the body of a message or message part as HTML

    The renderer is selected by the content type of msg. A part with a
    Content-Disposition of "attachment" is not rendered but stored with
    save_part() instead.

    extra is handed to the selected renderer. Only the text/html renderer
    uses it (it passes it on to HTMLPart); multipart/related fills it with
    a dict mapping Content-Id header values to dicts with the keys "i",
    "part" and "url".

    Returns the HTML as Markup so that templates embedding it don't escape
    it again. Raises KeyError if there is no renderer for the content
    type.
    """

    def decode_text(part):
        # Return the payload of a text part as str.
        #
        # The charset parameter of the Content-Type header is returned by
        # get_content_charset(). get_charset() only reports a charset that
        # was set programmatically, so for parsed messages it is None and
        # would make every part look like iso-8859-1.
        payload = part.get_payload(decode=True)
        charset = part.get_content_charset() or "iso-8859-1"
        try:
            return payload.decode(charset)
        except (LookupError, UnicodeDecodeError):
            # Unknown or wrong charset label. iso-8859-1 maps every byte,
            # so this fallback cannot fail.
            return payload.decode("iso-8859-1")

    def render_text_plain(msg, extra=None):
        bodytmpl = jenv.get_template("body_text_plain.html")
        context = {
            "body": decode_text(msg)
        }
        return bodytmpl.render(context)

    def render_multipart_mixed(msg, extra=None):
        parts = msg.get_payload()
        if isinstance(parts, str):
            # mislabelled, assume text/plain
            return render_text_plain(msg)
        partshtml = [render_body(part) for part in parts]
        bodytmpl = jenv.get_template("body_multipart_mixed.html")
        context = {
            "parts": partshtml
        }
        return bodytmpl.render(context)

    def render_multipart_digest(msg, extra=None):
        # Every part of a digest is a complete message.
        partshtml = [render_message(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_multipart_digest.html")
        context = {
            "parts": partshtml
        }
        return bodytmpl.render(context)

    def render_message_rfc822(msg, extra=None):
        partshtml = [render_message(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_message_rfc822.html")
        context = {
            "parts": partshtml
        }
        return bodytmpl.render(context)

    def render_text_html(msg, extra=None):
        # HTMLPart filters the markup; its output is therefore passed to
        # the template as Markup.
        htmlpart = HTMLPart(extra)
        htmlpart.feed(decode_text(msg))
        # Flush text the parser may still be buffering (e.g. a trailing
        # "&" that could have been the start of a character reference).
        htmlpart.close()
        bodytmpl = jenv.get_template("body_text_html.html")
        context = {
            "body": jinja2.Markup(htmlpart.as_string())
        }
        return bodytmpl.render(context)

    def render_text_enriched(msg, extra=None):
        tepart = TextEnrichedPart(decode_text(msg))
        bodytmpl = jenv.get_template("body_text_enriched.html")
        context = {
            "body": jinja2.Markup(tepart.as_string())
        }
        return bodytmpl.render(context)

    def render_message_partial(msg, extra=None):
        # Collect the fragments of a split message in
        # partial_message_cache; once all of them are there, reassemble
        # the whole message and archive it as a message of its own.

        # Default header for get_param is Content-Type
        whole_msg_id = msg.get_param("id")
        if whole_msg_id not in partial_message_cache:
            # For now we assume that total is present on all parts. This
            # isn't guaranteed, however, and we may need to handle the
            # case where total is only present on the last part.
            partial_message_cache[whole_msg_id] = [None] * int(msg.get_param("total"))
        payload = msg.get_payload()
        s = payload[0].as_string() # Only one part
        partial_message_cache[whole_msg_id][int(msg.get_param("number"))-1] = s
        if None not in partial_message_cache[whole_msg_id]:
            p = email.parser.Parser()
            whole_msg = p.parsestr("".join(partial_message_cache[whole_msg_id]))
            if not whole_msg["Message-Id"]:
                # archive() needs a message id; fall back to the id of
                # the partial message.
                whole_msg.add_header("Message-Id", "<" + whole_msg_id + ">")
            archive(whole_msg)
            del partial_message_cache[whole_msg_id]
        return "<p>This is part %d of %d of <a href='../%s/'>%s</a></p>" % (
            int(msg.get_param("number")),
            int(msg.get_param("total")),
            encode_message_id(whole_msg_id),
            html.escape(whole_msg_id))

    def render_application_octet_stream(msg, extra=None):
        return save_part(msg, "attachment")

    def render_multipart_signed(msg, extra=None):
        content, signature = msg.get_payload()
        # gpg wants both the signed content and the detached signature as
        # files; both temporary files must exist while it runs.
        with tempfile.NamedTemporaryFile(buffering=0) as content_fh:
            content_fh.write(content.as_bytes())
            with tempfile.NamedTemporaryFile(buffering=0, suffix=".asc") as signature_fh:
                signature_fh.write(signature.get_payload(decode=True))
                r = subprocess.run(["gpg", "--verify", signature_fh.name, content_fh.name],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
        gpgresult = r.stderr
        # XXX - Analyze gpgresult or just use r.returncode?
        gpgstatus = "dubious"
        contenthtml = render_message(content)
        bodytmpl = jenv.get_template("body_multipart_signed.html")
        context = {
            "content": contenthtml,
            "gpgresult": gpgresult,
            "gpgstatus": gpgstatus,
        }
        return bodytmpl.render(context)

    def render_application_pgp(msg, extra=None):
        with tempfile.NamedTemporaryFile(buffering=0) as content_fh:
            content_fh.write(msg.get_payload(decode=True))
            r = subprocess.run(["gpg", "--decrypt", content_fh.name],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               )
        gpgresult = r.stderr.decode()
        # XXX - Analyze gpgresult or just use r.returncode?
        gpgstatus = "dubious"
        decrypted_content = r.stdout
        # gpg's standard output is parsed as a message of its own.
        p = email.parser.BytesParser()
        embedded_message = p.parsebytes(decrypted_content)
        contenthtml = render_message(embedded_message)
        bodytmpl = jenv.get_template("body_application_pgp.html")
        context = {
            "content": contenthtml,
            "gpgresult": gpgresult,
            "gpgstatus": gpgstatus,
        }
        return bodytmpl.render(context)

    def render_multipart_alternative(msg, extra=None):
        # All alternatives are rendered; the template gets their content
        # types alongside.
        partshtml = []
        partstypes = []
        for part in msg.get_payload():
            partstypes.append(part.get_content_type())
            partshtml.append(render_body(part))
        bodytmpl = jenv.get_template("body_multipart_alternative.html")
        context = {
            "types": partstypes,
            "parts": partshtml,
        }
        return bodytmpl.render(context)

    def render_application_x_unknown_content_type_scpfile(msg, extra=None):
        bodytmpl = jenv.get_template("body_application_x-unknown-content-type-scpfile.html")
        context = {
            "body": decode_text(msg)
        }
        return bodytmpl.render(context)

    def render_application_pgp_signature(msg, extra=None):
        # A PGP signature outside of a multipart/signed - useless
        bodytmpl = jenv.get_template("body_application_pgp-signature.html")
        context = {
        }
        return bodytmpl.render(context)

    def render_application_x_gzip(msg, extra=None):
        return save_part(msg, "attachment")

    def render_message_news(msg, extra=None):
        partshtml = [render_message(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_message_news.html")
        context = {
            "msg": msg,
            "parts": partshtml,
        }
        return bodytmpl.render(context)

    def render_image_gif(msg, extra=None):
        return save_part(msg, "inline")

    def render_multipart_related(msg, extra=None):
        start = msg.get_param("start")
        start_part = None
        # collect content-ids
        content = {}
        for i, part in enumerate(msg.get_payload()):
            content_id = part.get("Content-Id")
            # The root is the part named by the start parameter or, if
            # there is no such parameter, the first part.
            if start_part is None and (start is None or content_id == start):
                start_part = part
                continue
            if content_id:
                content[content_id] = {
                    "i": i,
                    "part": part,
                    "url": save_part(part, "_url"),
                }
        # Only rendered after the loop: the root needs the complete map
        # to resolve its references to the other parts.
        parthtml = render_body(start_part, content)
        bodytmpl = jenv.get_template("body_multipart_related.html")
        context = {
            "msg": msg,
            "parts": [parthtml],
        }
        return bodytmpl.render(context)

    renderers = {
        "text/plain": render_text_plain,
        "multipart/mixed": render_multipart_mixed,
        "multipart/digest": render_multipart_digest,
        "message/rfc822": render_message_rfc822,
        "text/html": render_text_html,
        "text/enriched": render_text_enriched,
        "message/partial": render_message_partial,
        "application/octet-stream": render_application_octet_stream,
        "multipart/signed": render_multipart_signed,
        "application/pgp": render_application_pgp,
        "multipart/alternative": render_multipart_alternative,
        "application/x-unknown-content-type-scpfile": render_application_x_unknown_content_type_scpfile,
        "application/pgp-signature": render_application_pgp_signature,
        "application/x-gzip": render_application_x_gzip,
        "message/news": render_message_news,
        "image/gif": render_image_gif,
        "multipart/related": render_multipart_related,
        "application/x-java-vm": render_application_octet_stream,
    }

    content_type = msg.get_content_type()
    content_disposition = msg.get_content_disposition()
    if content_disposition == "attachment":
        # XXX - not sure, if we should just store all content-types.
        # We probably should clean up html. Alternatively we could just store
        # all of them application/octet-stream, which browsers should download
        # and not try to display.
        bodyhtml = save_part(msg, content_disposition)
    else:
        bodyhtml = renderers[content_type](msg, extra)
    return jinja2.Markup(bodyhtml)
def archive(msg):
    """
    Write the HTML page for one message

    The page is rendered with the "message.html" template and stored as
    basedir + "/msg/" + <encoded message id> + "/index.html". The message
    id is also logged to stderr.

    Raises whatever get_message_id() raises if the message has no usable
    message id.
    """
    mid = get_message_id(msg)
    print("M", mid, file=sys.stderr)
    encmid = encode_message_id(mid)
    msgdir = basedir + "/msg/" + encmid

    # Render everything before touching the output file. Opening the file
    # first would truncate an existing page and leave an empty one behind
    # if rendering failed.
    msgtmpl = jenv.get_template("message.html")
    bodyhtml = render_body(msg)
    context = {
        "list": "LUGA",
        "message_id": mid,
        "subject": decode_rfc2047(msg["Subject"]),
        "from": decode_rfc2047(msg["From"]),
        "date": msg["Date"],
        "bodyhtml": bodyhtml,
    }
    msghtml = msgtmpl.render(context)

    os.makedirs(msgdir, exist_ok=True)
    # NOTE(review): the file is written in the locale's default encoding;
    # confirm that this matches the charset declared by the template.
    with open(msgdir + "/index.html", "w") as hfd:
        hfd.write(msghtml)
class HTMLPart(html.parser.HTMLParser):
    """
    Filter for the HTML of a text/html part

    Feed the HTML with feed() and fetch the filtered markup with
    as_string(). Only tags from allowed_tags and a fixed set of attributes
    are reproduced; tags in ignore_tags are dropped (their content is
    kept), the content of tags in hide_tags is dropped, and anything else
    is dropped and reported on stderr. Text is always escaped again, so
    the result can't contain markup that didn't pass the filter.

    extra maps Content-Id header values ("<...>") to dicts with a "url"
    key; it is used to rewrite src="cid:..." references.
    """

    allowed_tags = [
        "h2", "a", "wbr", "hr", "pre", "img", "font", "i", "br", "table", "tr",
        "th", "td", "b", "select", "option", "input", "sup", "address",
        "center", "p", "h1", "dl", "h3", "ul", "li", "ol", "u", "blockquote",
        "h4",
    ]
    hide_tags = [ "title" ]
    ignore_tags = [ "html", "head", "body", "marquee", "meta", "form", ]

    def __init__(self, extra):
        # handle_data() relies on receiving text with character references
        # already resolved, so state that explicitly.
        super().__init__(convert_charrefs=True)
        self.hide = False       # True while inside a tag from hide_tags
        self.content = []       # output fragments, joined by as_string()
        self.base = None        # href of a <base> tag, if one was seen
        self.extra = extra or {}

    def handle_starttag(self, tag, attrs):
        if tag == "base":
            href = [x[1] for x in attrs if x[0] == "href"]
            if href:
                self.base = href[0]
        elif tag in self.allowed_tags:
            # Attributes without a (non-empty) value are written in their
            # minimized form.
            attrstr = "".join(
                [' %s="%s"' % (a[0], html.escape(a[1])) if a[1] else ' %s' % (a[0])
                    for a in self.clean_attrs(tag, attrs)
                ]
            )
            self.content.append("<%s%s>" % ( tag, attrstr ))
        elif tag in self.hide_tags:
            self.hide = True
        elif tag in self.ignore_tags:
            pass
        else:
            print("Encountered unknown start tag", tag, attrs, file=sys.stderr)

    def handle_endtag(self, tag):
        if tag in self.allowed_tags:
            self.content.append("</%s>" % tag)
        elif tag in self.hide_tags:
            self.hide = False # XXX - Need stack?
        elif tag in self.ignore_tags:
            pass
        else:
            print("Encountered unknown end tag", tag, file=sys.stderr)

    def handle_data(self, data):
        if not self.hide:
            # The parser has already turned character references into
            # plain characters ("&lt;" arrives here as "<"). Without
            # escaping them again, text like "&lt;script&gt;" would end up
            # as real markup in the output.
            self.content.append(html.escape(data, quote=False))

    def as_string(self):
        """Return the filtered HTML collected so far."""
        return "".join(self.content)

    def clean_attrs(self, tag, attrs):
        """
        Return the attributes from attrs which are safe to reproduce

        attrs is a list of (name, value) pairs as passed to
        handle_starttag(). href is kept only for http, https and ftp URLs
        (resolved against the base URL); src only for cid: URLs of known
        parts, which are replaced by the URL of the stored part.
        """
        safe_attrs = [
            "border", "alt", "size", "face", "width", "height", "hspace",
            "cellpadding", "cellspacing", "bgcolor", "valign", "nowrap",
            "color", "colspan", "name", "value", "type", "align", "clear",
            "noshade", "type",
        ]
        cleaned = []
        for a in attrs:
            if a[0] in safe_attrs:
                cleaned.append(a)
            elif a[0] == "href":
                url = a[1]
                url = urllib.parse.urljoin(self.base, url)
                u = urllib.parse.urlparse(url)
                if u[0] in ['https', 'http', 'ftp']:
                    cleaned.append((a[0], url))
            elif a[0] == "src":
                url = a[1]
                url = urllib.parse.urljoin(self.base, url)
                u = urllib.parse.urlparse(url)
                if u[0] == "cid":
                    print("Encountered src cid attribute", a, file=sys.stderr)
                    # The referenced part may be missing (or the HTML may
                    # not be part of a multipart/related at all); drop the
                    # attribute then instead of failing with a KeyError.
                    target = self.extra.get("<" + u.path + ">")
                    if target:
                        cleaned.append((a[0], target["url"]))
                    else:
                        print("Ignored src attribute", a, file=sys.stderr)
                else:
                    print("Ignored src attribute", a, file=sys.stderr)
            elif a[0] == "target":
                pass
            else:
                print("Encountered unknown attribute", a, file=sys.stderr)
        return cleaned
class TextEnrichedPart:
    """
    Converter from text/enriched to HTML

    The constructor parses the text into a tree of TEElement objects;
    as_string() renders that tree as HTML.

    Raises RuntimeError for closing tags which were never opened, and
    NotImplementedError (on rendering) for unknown formatting commands.
    """

    class TEElement:
        """
        One formatting command together with its content

        type is the lowercased command name ("" for the root element),
        content a list of strings and child elements, and filled tells
        whether line breaks in the text are to be re-flowed (inherited
        from the parent, switched off by "nofill").
        """

        def __init__(self, t, parent):
            self.type = t.lower()
            self.content = []
            if self.type == "nofill":
                self.filled = False
            elif parent:
                self.filled = parent.filled
            else:
                self.filled = True

        def append_text(self, s):
            """Append the text s, resolving "<<" and (if filled) line breaks."""
            s = s.replace("<<", "<")
            if self.filled:
                # A single newline becomes a space; of a sequence of n
                # newlines, n-1 are kept.
                s = re.sub(r'\n+',
                           lambda m: m.group(0)[1:] if len(m.group(0)) > 1 else " ",
                           s)
            self.content.append(s)

        def as_string(self):
            """
            Return this element and its content as HTML

            The element is not modified, so this can be called repeatedly.
            """
            # The content to render. Commands with a parameter skip the
            # leading <param> element - without removing it from
            # self.content, which would make a second call fail.
            children = self.content
            if self.type == "":
                pre = "<div class='text-enriched'>"
                post = "</div>"
            elif self.type == "bold":
                pre = "<b>"
                post = "</b>"
            elif self.type == "param":
                # We shouldn't ever get here since the param should be consumed
                # by the parent, but there are broken messages ...
                return ""
            elif self.type.startswith("x-"):
                # Just ignore all experimental elements and render their
                # contents.
                pre = ""
                post = ""
            elif self.type == "flushleft":
                pre = "<div class='flushleft'>"
                post = "</div>"
            elif self.type == "smaller":
                # HTML has a "small" element, but that is meant for "side
                # comments such as small print", while t/e "smaller" is purely
                # typographical
                pre = "<span style='font-size: 0.9em'>"
                post = "</span>"
            elif self.type == "color":
                param = self.content[0]
                children = self.content[1:]
                if param.type != "param":
                    raise RuntimeError("Expected 'param', got '%s'" % param.type)
                colorstring = param.content[0]
                if re.match(r'^\w+$', colorstring):
                    # a single word, i.e. a colorname like "red" or "cyan".
                    # The 8 colors in the spec aren't a subset of the 17 colors in CSS2,
                    # but recognized by most/all browsers. And if we encounter a non-standard
                    # color the best we can do is let the browser handle it.
                    pass
                else:
                    m = re.match(r'([0-9a-f]{4}),([0-9a-f]{4}),([0-9a-f]{4})', colorstring, re.IGNORECASE)
                    if m:
                        # an RGB triple. Use only the top 8 bits of each component:
                        colorstring = "#%s%s%s" % (m.group(1)[:2], m.group(2)[:2], m.group(3)[:2])
                    else:
                        # syntax error. Replace with "black"
                        colorstring = "#000"
                pre = "<span style='color: %s'>" % colorstring
                post = "</span>"
            elif self.type == "nofill":
                pre = "<div class='nofill'>"
                post = "</div>"
            elif self.type == "fontfamily":
                param = self.content[0]
                children = self.content[1:]
                if param.type != "param":
                    raise RuntimeError("Expected 'param', got '%s'" % param.type)
                fontfamily = param.content[0]
                if "'" in fontfamily or '"' in fontfamily:
                    raise RuntimeError("Can't handle quotes in font names (%s)" % fontfamily)
                # Quotes have been rejected above; escape the rest so that
                # "&", "<" and ">" in a font name can't be misread as
                # markup inside the attribute.
                pre = "<span style='font-family: \"%s\"'>" % html.escape(fontfamily, quote=False)
                post = "</span>"
            elif self.type == "bigger":
                # HTML used to have a "big" element, but that has been removed from HTML5
                pre = "<span style='font-size: 1.1em'>"
                post = "</span>"
            elif self.type == "underline":
                # HTML5 redefined the meaning of "u", but I'm using it anyway
                pre = "<u>"
                post = "</u>"
            else:
                raise NotImplementedError("Unknown type " + self.type)

            pieces = [pre]
            for c in children:
                if isinstance(c, type(self)):
                    pieces.append(c.as_string())
                else:
                    pieces.append(html.escape(c))
            pieces.append(post)
            return "".join(pieces)

    def __init__(self, s):
        # self.stack holds the currently open elements; self.stack[0] is
        # the root element, which is never closed.
        self.stack = [ self.TEElement("", None) ]
        while s:
            stack_top = self.stack[-1]
            # Text up to the next formatting command, the command, the rest
            m = re.match(r'(.*?)<(/?[A-Za-z0-9-]{,60})>(.*)', s, re.DOTALL)
            if m:
                text = m.group(1)
                tag = m.group(2).lower()
                # NOTE(review): re.match(r'\s*', text) matches any string,
                # so ALL text directly preceding a <param> is discarded,
                # not only whitespace. Presumably a full match was
                # intended - confirm, and check that the "color" and
                # "fontfamily" rendering copes with a leading string
                # before changing this.
                if not (tag == "param" and re.match(r'\s*', text) or text == ""):
                    stack_top.append_text(text)
                if tag[0] != "/":
                    new = self.TEElement(tag, stack_top)
                    stack_top.content.append(new)
                    self.stack.append(new)
                else:
                    closed_tag = tag[1:]
                    if stack_top.type == closed_tag:
                        self.stack.pop()
                    elif closed_tag in [e.type for e in self.stack]:
                        # We close a tag which has been opened, but it
                        # wasn't the last one. This is clearly a nesting
                        # error, but there was broken software (e.g.
                        # http://www.fozztexx.com/Mynah/) which used
                        # non-closing tags, and by just popping them off
                        # the stack we can "re-synchronize".
                        while self.stack.pop().type != closed_tag:
                            pass
                    else:
                        raise RuntimeError("Nesting error: Expected %s, got %s near %s" % (self.stack[-1].type, closed_tag, s))
                s = m.group(3)
            else:
                # No more formatting commands: the rest is plain text.
                stack_top.append_text(s)
                s = ""

    def as_string(self):
        """Return the whole text as HTML."""
        return self.stack[0].as_string()
# Archive every message of every mbox file named on the command line.
for f in sys.argv[1:]:
    print("F", f, file=sys.stderr)
    # create=False: a mistyped file name should fail instead of silently
    # creating an empty mailbox with that name.
    mb = mailbox.mbox(f, create=False)
    try:
        for m in mb:
            archive(m)
    finally:
        # Release the file handle even if a message can't be archived.
        mb.close()
# vim: tw=79