yama/mbox2web

415 lines
14 KiB
Plaintext
Raw Normal View History

#!/usr/bin/python3
import email.parser
2019-03-01 11:58:22 +01:00
import hashlib
import html
import html.parser
import mailbox
import os
import re
2019-03-01 13:54:13 +01:00
import subprocess
import sys
2019-03-01 13:54:13 +01:00
import tempfile
import urllib.parse
import jinja2
# Directory below which the archive is written; archive() puts every
# message into basedir + "/msg/<encoded message id>/".
basedir = "."

# Jinja2 environment shared by all rendering functions. Templates are
# looked up in the "templates" directory and autoescaping is switched on
# for all of them.
jenv = jinja2.Environment(
    autoescape=True,
    loader=jinja2.FileSystemLoader(["templates"]),
)
def get_message_id(msg):
    """
    Extract the message id from a message

    The id is the text between the first pair of angle brackets in the
    Message-ID header; the brackets themselves are not part of the result.

    Raises ValueError if the message has no Message-ID header or if the
    header doesn't contain a <...> part.
    """
    header = msg["Message-ID"]
    if header is None:
        raise ValueError("Message has no Message-ID header")
    # str() because the email package may return a Header object instead
    # of a plain string when the header contains raw 8-bit data.
    header = str(header)
    match = re.search(r'<(.*?)>', header)
    if match is None:
        raise ValueError("No message id found in Message-ID header %r" % (header,))
    return match.group(1)
def encode_message_id(msgid):
    """
    Encode a message id for use as a path component

    Every character outside a fixed set of harmless characters is replaced
    by its code point in lower-case hex (at least two digits) enclosed in
    curly braces, e.g. "/" becomes "{2f}".
    """
    def brace_hex(found):
        return "{%02x}" % ord(found.group(0))

    unsafe_char = re.compile('[^!"$(-.0-9:=@-z|~]')
    return unsafe_char.sub(brace_hex, msgid)
def render_message(msg):
    """
    Render a message (selected headers plus body) with message2.html

    The body is produced by render_body(). The result is returned as
    jinja2.Markup, i.e. marked as already safe HTML.
    """
    template = jenv.get_template("message2.html")
    body = render_body(msg)
    fields = dict([
        ("message_id", msg["Message-Id"]),
        ("subject", msg["Subject"]),
        ("from", msg["From"]),
        ("date", msg["Date"]),
        ("bodyhtml", body),
    ])
    # NOTE(review): jinja2.Markup was removed in Jinja2 3.1
    # (markupsafe.Markup replaces it) - needs porting before an upgrade.
    return jinja2.Markup(template.render(fields))
def save_part(msg):
    """
    Store the decoded payload of a part in a file and render a link to it

    The file is written to the "parts" directory below basedir (the same
    root archive() uses) and is named after the SHA-256 hash of its
    content, so identical payloads end up in the same file.

    Returns the HTML rendered by body_application_octet_stream.html.

    Raises KeyError for content types without an entry in the extension
    table below.
    """
    content_type = msg.get_content_type()
    extension = {
        "application/octet-stream": ".bin",
        "text/html": ".html",
        "text/x-vcard": ".vcf",
    }[content_type]

    name = msg.get_param("name") or "(data)"
    payload = msg.get_payload(decode=True)
    filename = hashlib.sha256(payload).hexdigest() + extension

    partsdir = basedir + "/parts"
    os.makedirs(partsdir, exist_ok=True)
    with open(partsdir + "/" + filename, "wb") as fh:
        fh.write(payload)

    bodytmpl = jenv.get_template("body_application_octet_stream.html")
    context = {
        "name": name,
        # Relative to msg/<id>/index.html as written by archive(), so
        # this points to the parts directory below basedir.
        "url": "../../parts/" + filename,
    }
    bodyhtml = bodytmpl.render(context)
    return bodyhtml
# Parts of message/partial messages seen so far, keyed by the "id"
# parameter of their Content-Type header. Each value is a list with one
# slot per part (None until that part has been seen). render_body()
# deletes the entry once all parts are there and the whole message has
# been archived.
partial_message_cache = dict()
def _decode_text_payload(msg):
    """
    Return the transfer-decoded payload of a text part as a str

    The charset is taken from the charset parameter of the Content-Type
    header. If none is declared, if Python doesn't know the declared one
    or if the payload isn't valid in it, iso-8859-1 is used (which can
    decode any byte sequence).
    """
    raw = msg.get_payload(decode=True)
    # get_content_charset() reads the Content-Type header. get_charset()
    # only reflects a charset set programmatically via set_charset() and
    # is None for messages which came out of the parser.
    charset = msg.get_content_charset() or "iso-8859-1"
    try:
        return raw.decode(charset)
    except (LookupError, UnicodeDecodeError):
        return raw.decode("iso-8859-1")


def render_body(msg):
    """
    Render the body of a message or MIME part as HTML

    Parts with a Content-Disposition of "attachment" are stored with
    save_part(). Everything else is dispatched on the content type;
    multipart and message types recurse into their sub-parts.

    Returns the HTML as jinja2.Markup, i.e. marked as already safe so
    that the autoescaping templates embedding it don't escape it again.

    Raises RuntimeError for content types which aren't implemented yet.
    """
    content_type = msg.get_content_type()
    content_disposition = msg.get_content_disposition()

    if content_disposition == "attachment":
        # XXX - not sure, if we should just store all content-types.
        # We probably should clean up html. Alternatively we could just store
        # all of them application/octet-stream, which browsers should download
        # and not try to display.
        bodyhtml = save_part(msg)

    elif content_type == "text/plain":
        bodytmpl = jenv.get_template("body_text_plain.html")
        context = {
            "body": _decode_text_payload(msg)
        }
        bodyhtml = bodytmpl.render(context)

    elif content_type == "multipart/mixed":
        partshtml = [render_body(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_multipart_mixed.html")
        context = {
            "parts": partshtml
        }
        bodyhtml = bodytmpl.render(context)

    elif content_type == "multipart/digest":
        partshtml = [render_message(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_multipart_digest.html")
        context = {
            "parts": partshtml
        }
        bodyhtml = bodytmpl.render(context)

    elif content_type == "message/rfc822":
        partshtml = [render_message(part) for part in msg.get_payload()]
        bodytmpl = jenv.get_template("body_message_rfc822.html")
        context = {
            "parts": partshtml
        }
        bodyhtml = bodytmpl.render(context)

    elif content_type == "text/html":
        htmlpart = HTMLPart()
        htmlpart.feed(_decode_text_payload(msg))
        # Flush whatever the parser is still buffering (e.g. trailing text
        # containing an unterminated "&").
        htmlpart.close()
        bodytmpl = jenv.get_template("body_text_html.html")
        context = {
            "body": jinja2.Markup(htmlpart.as_string())
        }
        bodyhtml = bodytmpl.render(context)

    elif content_type == "text/enriched":
        # Decoded like the other text types, so that quoted-printable or
        # base64 encoded parts aren't parsed in their encoded form.
        tepart = TextEnrichedPart(_decode_text_payload(msg))
        bodytmpl = jenv.get_template("body_text_enriched.html")
        context = {
            "body": jinja2.Markup(tepart.as_string())
        }
        bodyhtml = bodytmpl.render(context)

    elif content_type == "message/partial":
        # Default header for get_param is Content-Type
        whole_msg_id = msg.get_param("id")
        if whole_msg_id not in partial_message_cache:
            # For now we assume that total is present on all parts. This
            # isn't guaranteed, however, and we may need to handle the
            # case where total is only present on the last part.
            partial_message_cache[whole_msg_id] = [None] * int(msg.get_param("total"))
        payload = msg.get_payload()
        s = payload[0].as_string()  # Only one part
        number = int(msg.get_param("number"))
        partial_message_cache[whole_msg_id][number - 1] = s
        if None not in partial_message_cache[whole_msg_id]:
            # All parts have been seen: reassemble and archive the whole
            # message as a message of its own.
            p = email.parser.Parser()
            whole_msg = p.parsestr("".join(partial_message_cache[whole_msg_id]))
            if not whole_msg["Message-Id"]:
                whole_msg.add_header("Message-Id", "<" + whole_msg_id + ">")
            archive(whole_msg)
            del partial_message_cache[whole_msg_id]
        # NOTE(review): the link is built from the id parameter, but
        # archive() files the whole message under its own Message-Id if
        # it has one. The link is wrong if the two differ - confirm.
        bodyhtml = "<p>This is part %d of %d of <a href='../%s/'>%s</a></p>" % (
            number,
            int(msg.get_param("total")),
            encode_message_id(whole_msg_id),
            html.escape(whole_msg_id))

    elif content_type == "application/octet-stream":
        bodyhtml = save_part(msg)

    elif content_type == "multipart/signed":
        content, signature = msg.get_payload()
        # gpg wants files, so content and signature are written to
        # temporary files which only exist while gpg is running.
        with tempfile.NamedTemporaryFile(buffering=0) as content_fh:
            content_fh.write(content.as_bytes())
            with tempfile.NamedTemporaryFile(buffering=0, suffix=".asc") as signature_fh:
                signature_fh.write(signature.get_payload(decode=True))
                r = subprocess.run(["gpg", "--verify", signature_fh.name, content_fh.name],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
        gpgresult = r.stderr
        # XXX - Analyze gpgresult or just use r.returncode?
        gpgstatus = "dubious"
        contenthtml = render_message(content)
        bodytmpl = jenv.get_template("body_multipart_signed.html")
        context = {
            "content": contenthtml,
            "gpgresult": gpgresult,
            "gpgstatus": gpgstatus,
        }
        bodyhtml = bodytmpl.render(context)

    elif content_type == "application/pgp":
        with tempfile.NamedTemporaryFile(buffering=0) as content_fh:
            content_fh.write(msg.get_payload(decode=True))
            r = subprocess.run(["gpg", "--decrypt", content_fh.name],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               )
        gpgresult = r.stderr.decode()
        # XXX - Analyze gpgresult or just use r.returncode?
        gpgstatus = "dubious"
        decrypted_content = r.stdout
        # What gpg writes to stdout is parsed as a message of its own.
        p = email.parser.BytesParser()
        embedded_message = p.parsebytes(decrypted_content)
        contenthtml = render_message(embedded_message)
        bodytmpl = jenv.get_template("body_application_pgp.html")
        context = {
            "content": contenthtml,
            "gpgresult": gpgresult,
            "gpgstatus": gpgstatus,
        }
        bodyhtml = bodytmpl.render(context)

    elif content_type == "multipart/alternative":
        partshtml = []
        partstypes = []
        for part in msg.get_payload():
            partstypes.append(part.get_content_type())
            partshtml.append(render_body(part))
        bodytmpl = jenv.get_template("body_multipart_alternative.html")
        context = {
            "types": partstypes,
            "parts": partshtml,
        }
        bodyhtml = bodytmpl.render(context)

    elif content_type == "application/x-unknown-content-type-scpfile":
        bodytmpl = jenv.get_template("body_application_x-unknown-content-type-scpfile.html")
        context = {
            "body": _decode_text_payload(msg)
        }
        bodyhtml = bodytmpl.render(context)

    else:
        raise RuntimeError("Content-type " + content_type + " not implemented yet")

    # NOTE(review): jinja2.Markup was removed in Jinja2 3.1
    # (markupsafe.Markup replaces it) - needs porting before an upgrade.
    return jinja2.Markup(bodyhtml)
def archive(msg):
    """
    Write the HTML page for a message

    The page is rendered with message.html and written to
    basedir/msg/<encoded message id>/index.html. An existing page for
    the same message id is overwritten.

    Exceptions from get_message_id() and render_body() are passed on. The
    page is rendered completely before the file is opened, so a message
    which cannot be rendered neither leaves an empty index.html behind nor
    clobbers a page written by an earlier run.
    """
    mid = get_message_id(msg)
    encmid = encode_message_id(mid)
    msgdir = basedir + "/msg/" + encmid

    msgtmpl = jenv.get_template("message.html")
    bodyhtml = render_body(msg)
    context = {
        "list": "LUGA",
        "message_id": mid,
        "subject": msg["Subject"],
        "from": msg["From"],
        "date": msg["Date"],
        "bodyhtml": bodyhtml,
    }
    msghtml = msgtmpl.render(context)

    os.makedirs(msgdir, exist_ok=True)
    # NOTE(review): no encoding is given, so the file is written in the
    # locale's encoding - confirm that this matches what message.html
    # declares.
    with open(msgdir + "/index.html", "w") as hfd:
        hfd.write(msghtml)
class HTMLPart(html.parser.HTMLParser):
    """
    Sanitizing filter for the HTML of a text/html part

    Feed the HTML with feed(), call close() and fetch the cleaned up HTML
    with as_string(). Only tags from allowed_tags and the attributes
    accepted by clean_attrs() are passed through. Text is re-escaped on
    output. Unknown tags and attributes are dropped and reported on
    stderr.
    """

    # Tags which are copied to the output (with cleaned attributes)
    allowed_tags = [
        "h2", "a", "wbr", "hr", "pre", "img", "font", "i", "br", "table", "tr",
        "th", "td", "b", "select", "option", "input", "sup", "address",
        "center", "p", "h1", "dl", "h3", "ul", "li", "ol", "u", "blockquote",
    ]
    # Tags which are dropped together with the text they contain
    hide_tags = ["title"]
    # Tags which are dropped silently, their content is kept
    ignore_tags = ["html", "head", "body", "marquee", "meta", "form", ]
    # Allowed tags which never have an end tag
    void_tags = ["wbr", "hr", "img", "br", "input"]

    def __init__(self):
        # handle_data() relies on the parser having converted character
        # references already, so don't depend on the default here.
        super().__init__(convert_charrefs=True)
        self.hide = False       # True while inside a tag from hide_tags
        self.content = []       # output fragments, joined by as_string()
        self.base = None        # href of the last <base> tag seen, if any

    def handle_starttag(self, tag, attrs):
        if tag == "base":
            href = [x[1] for x in attrs if x[0] == "href"]
            if href:
                self.base = href[0]
        elif tag in self.allowed_tags:
            # Attributes without a (non-empty) value are written in their
            # bare form.
            attrstr = "".join(
                [' %s="%s"' % (a[0], html.escape(a[1])) if a[1] else ' %s' % (a[0])
                    for a in self.clean_attrs(tag, attrs)
                ]
            )
            self.content.append("<%s%s>" % (tag, attrstr))
        elif tag in self.hide_tags:
            self.hide = True
        elif tag in self.ignore_tags:
            pass
        else:
            print("Encountered unknown start tag", tag, attrs, file=sys.stderr)

    def handle_startendtag(self, tag, attrs):
        # The default implementation also emits an end tag, which turns
        # "<br/>" into "<br></br>" - and browsers treat "</br>" as a
        # second line break.
        self.handle_starttag(tag, attrs)
        if tag not in self.void_tags:
            self.handle_endtag(tag)

    def handle_endtag(self, tag):
        if tag in self.allowed_tags:
            self.content.append("</%s>" % tag)
        elif tag in self.hide_tags:
            self.hide = False # XXX - Need stack?
        elif tag in self.ignore_tags:
            pass
        else:
            print("Encountered unknown end tag", tag, file=sys.stderr)

    def handle_data(self, data):
        if not self.hide:
            # The parser has already converted character references, so
            # "&lt;script&gt;" arrives here as "<script>". It must be
            # escaped again or it would become live markup in the output.
            self.content.append(html.escape(data, quote=False))

    def as_string(self):
        """Return the sanitized HTML collected so far"""
        return "".join(self.content)

    def clean_attrs(self, tag, attrs):
        """
        Return those of attrs (a list of (name, value) pairs) which are
        safe to keep

        Attributes from a fixed list of harmless ones are kept as they
        are. href and src are resolved against the <base> href (if any);
        href is only kept for http, https and ftp URLs, src only for cid
        URLs. target is dropped silently, everything else is dropped and
        reported on stderr.
        """
        safe_attrs = [
            "border", "alt", "size", "face", "width", "height", "hspace",
            "cellpadding", "cellspacing", "bgcolor", "valign", "nowrap",
            "color", "colspan", "name", "value", "type", "align", "clear",
            "noshade",
        ]
        clean_attrs = []
        for a in attrs:
            if a[0] in safe_attrs:
                clean_attrs.append(a)
            elif a[0] == "href":
                url = a[1]
                url = urllib.parse.urljoin(self.base, url)
                u = urllib.parse.urlparse(url)
                if u[0] in ['https', 'http', 'ftp']:
                    clean_attrs.append((a[0], url))
            elif a[0] == "src":
                url = a[1]
                url = urllib.parse.urljoin(self.base, url)
                u = urllib.parse.urlparse(url)
                if u[0] == "cid":
                    print("Encountered src cid attribute", a, file=sys.stderr)
                    # XXX - implement cid
                    clean_attrs.append((a[0], url))
                else:
                    print("Ignored src attribute", a, file=sys.stderr)
            elif a[0] == "target":
                pass
            else:
                print("Encountered unknown attribute", a, file=sys.stderr)
        return clean_attrs
class TextEnrichedPart:
    """
    Converter from text/enriched to HTML

    The constructor parses the text into a tree of TEElement objects,
    as_string() renders that tree as HTML. Only the bold command is
    implemented: any other command (including param) is parsed, but makes
    as_string() raise NotImplementedError.
    """

    class TEElement:
        """
        A formatting command (or the root, which has the type "") with its
        content, a list of strings and nested TEElement objects
        """

        def __init__(self, t):
            self.type = t.lower()   # command names are case-insensitive
            self.content = []
            self.filled = True      # fold line breaks in append_text()

        def append_text(self, s):
            """Append literal text, resolving "<<" and line breaks"""
            s = s.replace("<<", "<")
            if self.filled:
                # A single line break is just white space, a run of n
                # line breaks stands for n-1 real ones.
                s = re.sub(r'\n+',
                           lambda m: m.group(0)[1:] if len(m.group(0)) > 1 else " ",
                           s)
            self.content.append(s)

        def as_string(self):
            """
            Return this element and its content as HTML

            Raises NotImplementedError for unimplemented commands.
            """
            if self.type == "":
                pre = "<div class='text-enriched'>"
                post = "</div>"
            elif self.type == "bold":
                pre = "<b>"
                post = "</b>"
            else:
                raise NotImplementedError("Unknown type " + self.type)
            pieces = [pre]
            for c in self.content:
                if isinstance(c, type(self)):
                    pieces.append(c.as_string())
                else:
                    pieces.append(html.escape(c))
            pieces.append(post)
            return "".join(pieces)

    # Either "<<" (a literal "<") or a formatting command. "<<" has to be
    # recognized here so that "<<bold>" is not mistaken for "<" followed
    # by a command. At least one character is required for the command
    # name, so "<>" and "</>" are ordinary text.
    token_re = re.compile(r'<<|<(/?[A-Za-z0-9-]{1,60})>')

    def __init__(self, s):
        """
        Parse the text/enriched string s

        Raises RuntimeError if a closing command doesn't match the
        innermost open one.
        """
        self.stack = [self.TEElement("")]
        text_start = 0
        for m in self.token_re.finditer(s):
            command = m.group(1)
            if command is None:
                # "<<" stays part of the surrounding text; append_text()
                # turns it into "<".
                continue
            stack_top = self.stack[-1]
            stack_top.append_text(s[text_start:m.start()])
            text_start = m.end()
            if not command.startswith("/"):
                new = self.TEElement(command)
                stack_top.content.append(new)
                self.stack.append(new)
            elif stack_top.type == command[1:].lower():
                self.stack.pop()
            else:
                raise RuntimeError(
                    "Nesting error: Expected %s, got %s near %s"
                    % (stack_top.type, command[1:], s[m.start():m.start() + 60]))
        if text_start < len(s):
            self.stack[-1].append_text(s[text_start:])

    def as_string(self):
        """Return the whole part as HTML"""
        return self.stack[0].as_string()
# Archive every message of every mbox file named on the command line.
for f in sys.argv[1:]:
    print("F", f, file=sys.stderr)
    # create=False: a mistyped file name is an error. With the default
    # (create=True) mailbox.mbox would silently create an empty file.
    mb = mailbox.mbox(f, create=False)
    try:
        for m in mb:
            archive(m)
    finally:
        mb.close()
# vim: tw=79