yama/mbox2web

292 lines
9.7 KiB
Plaintext
Raw Normal View History

#!/usr/bin/python3
import email.parser
import html
import html.parser
import mailbox
import os
import re
import sys
import urllib.parse
import jinja2
basedir = "."
jenv = jinja2.Environment(
loader=jinja2.FileSystemLoader(["templates"]),
autoescape=True,
)
def get_message_id(msg):
"""
Extract the message id from a message
Note that this assumes that there is (at least) one message id. If
this is not the case, it will raise an exception (currently an
IndexError, but we may use something more suitable in the future).
"""
match = re.search(r'<(.*?)>', msg["Message-ID"])
return match.group(1)
def encode_message_id(msgid):
encmsgid = re.sub('[^!"$(-.0-9:=@-z|~]', lambda x: "{%02x}" % (ord(x.group(0))), msgid)
return encmsgid
def render_message(msg):
msgtmpl = jenv.get_template("message2.html")
bodyhtml = render_body(msg)
context = {
"message_id": msg["Message-Id"],
"subject": msg["Subject"],
"from": msg["From"],
"date": msg["Date"],
"bodyhtml": bodyhtml,
}
msghtml = msgtmpl.render(context)
return jinja2.Markup(msghtml)
partial_message_cache = {}
def render_body(msg):
content_type = msg.get_content_type()
if content_type == "text/plain":
bodytmpl = jenv.get_template("body_text_plain.html")
context = {
"body": msg.get_payload()
}
bodyhtml = bodytmpl.render(context)
elif content_type == "multipart/mixed":
partshtml = []
for part in msg.get_payload():
partshtml.append(render_body(part))
bodytmpl = jenv.get_template("body_multipart_mixed.html")
context = {
"parts": partshtml
}
bodyhtml = bodytmpl.render(context)
elif content_type == "multipart/digest":
partshtml = []
for part in msg.get_payload():
partshtml.append(render_message(part))
bodytmpl = jenv.get_template("body_multipart_digest.html")
context = {
"parts": partshtml
}
bodyhtml = bodytmpl.render(context)
elif content_type == "message/rfc822":
partshtml = []
for part in msg.get_payload():
partshtml.append(render_message(part))
bodytmpl = jenv.get_template("body_message_rfc822.html")
context = {
"parts": partshtml
}
bodyhtml = bodytmpl.render(context)
elif content_type == "text/html":
htmlpart = HTMLPart()
htmlpart.feed(msg.get_payload())
bodyhtml = htmlpart.as_string()
elif content_type == "text/enriched":
tepart = TextEnrichedPart(msg.get_payload())
bodyhtml = tepart.as_string()
elif content_type == "message/partial":
# Default header for get_param is Content-Type
whole_msg_id = msg.get_param("id")
if not whole_msg_id in partial_message_cache:
# For now we assume that total is present on all parts. This
# isn't guarantueed, however, and we may need to handle the
# case where total is only present on the last part.
partial_message_cache[whole_msg_id] = [None] * int(msg.get_param("total"))
payload = msg.get_payload()
s = payload[0].as_string() # Only one part
partial_message_cache[whole_msg_id][int(msg.get_param("number"))-1] = s
if not None in partial_message_cache[whole_msg_id]:
p = email.parser.Parser()
whole_msg = p.parsestr("".join(partial_message_cache[whole_msg_id]))
whole_msg_embedded_id = whole_msg["Message-Id"]
if not whole_msg_embedded_id:
whole_msg.add_header("Message-Id", "<" + whole_msg_id + ">")
whole_msg_embedded_id = whole_msg_id
archive(whole_msg)
del partial_message_cache[whole_msg_id]
bodyhtml = "<p>This is part %d of %d of <a href='../%s/'>%s</a></p>" % (
int(msg.get_param("number")),
int(msg.get_param("total")),
encode_message_id(whole_msg_id),
html.escape(whole_msg_id))
else:
raise RuntimeError("Content-type " + content_type + " not implemented yet")
return jinja2.Markup(bodyhtml)
def archive(msg):
mid = get_message_id(msg)
encmid = encode_message_id(mid)
msgdir = basedir + "/msg/" + encmid
os.makedirs(msgdir, exist_ok=True)
with open(msgdir + "/index.html", "w") as hfd:
msgtmpl = jenv.get_template("message.html")
bodyhtml = render_body(msg)
context = {
"list": "LUGA",
"message_id": mid,
"subject": msg["Subject"],
"from": msg["From"],
"date": msg["Date"],
"bodyhtml": bodyhtml,
}
msghtml = msgtmpl.render(context)
hfd.write(msghtml)
class HTMLPart(html.parser.HTMLParser):
allowed_tags = [ 'h2', 'a', 'wbr', 'hr', 'pre', 'img', 'font', 'i' ]
hide_tags = [ 'title' ]
ignore_tags = [ 'html', 'head', 'body' ]
def __init__(self):
super().__init__()
self.hide = False
self.content = []
def handle_starttag(self, tag, attrs):
if tag == "base":
href = [x[1] for x in attrs if x[0] == "href"]
if href:
self.base = href[0]
elif tag in self.allowed_tags:
attrstr = "".join(
[' %s="%s"' % (a[0], html.escape(a[1]))
for a in self.clean_attrs(tag, attrs)
]
)
self.content.append("<%s%s>" % ( tag, attrstr ))
elif tag in self.hide_tags:
self.hide = True
elif tag in self.ignore_tags:
pass
else:
print("Encountered unknown start tag", tag, attrs, file=sys.stderr)
def handle_endtag(self, tag):
if tag in self.allowed_tags:
self.content.append("</%s>" % tag)
elif tag in self.hide_tags:
self.hide = False # XXX - Need stack?
elif tag in self.ignore_tags:
pass
else:
print("Encountered unknown end tag", tag, file=sys.stderr)
def handle_data(self, data):
if not self.hide:
self.content.append(data)
def as_string(self):
return "".join(self.content)
def clean_attrs(self, tag, attrs):
clean_attrs = []
for a in attrs:
if a[0] == "href":
url = a[1]
url = urllib.parse.urljoin(self.base, url)
u = urllib.parse.urlparse(url)
if u[0] in ['https', 'http', 'ftp']:
clean_attrs.append((a[0], url))
elif a[0] == "src":
url = a[1]
url = urllib.parse.urljoin(self.base, url)
u = urllib.parse.urlparse(url)
if u[0] == "cid":
print("Encountered src cid attribute", a, file=sys.stderr)
# XXX - implement cid
clean_attrs.append((a[0], url))
else:
print("Ignored src attribute", a, file=sys.stderr)
elif a[0] == "border":
clean_attrs.append(a)
elif a[0] == "alt":
clean_attrs.append(a)
elif a[0] == "size":
clean_attrs.append(a)
else:
print("Encountered unknown attribute", a, file=sys.stderr)
return clean_attrs
class TextEnrichedPart:
class TEElement:
def __init__(self, t):
self.type = t.lower()
self.content = []
self.filled = True
def append_text(self, s):
s = s.replace("<<", "<")
if self.filled:
s = re.sub(r'\n+',
lambda m: m.group(0)[1:] if len(m.group(0)) > 1 else " ",
s)
self.content.append(s)
def as_string(self):
if self.type == "":
pre = "<div class='text-enriched'>"
post = "</div>"
elif self.type == "bold":
pre = "<b>"
post = "</b>"
else:
raise NotImplementedError("Unknown type " + self.type)
s = pre
for c in self.content:
if isinstance(c, type(self)):
s += c.as_string()
else:
s += html.escape(c)
s += post
return s
def __init__(self, s):
self.stack = [ self.TEElement("") ]
while s:
stack_top = self.stack[-1]
m = re.match(r'(.*?)<(/?[A-Za-z0-9-]{,60})>(.*)', s, re.DOTALL)
if m:
if m.group(2).lower == "param" and re.match(r'\s*', m.group(1)):
stack_top.content.append(TEElement("param"))
else:
stack_top.append_text(m.group(1))
if m.group(2)[0] != "/":
new = self.TEElement(m.group(2))
stack_top.content.append(new)
self.stack.append(new)
else:
if stack_top.type == m.group(2)[1:]:
self.stack.pop()
else:
raise RuntimeError("Nesting error: Expected %s, got %s near %s", self.stack[-1].type, m.group(2)[1:], s)
s = m.group(3)
else:
stack_top.append_text(s)
s = ""
def as_string(self):
return self.stack[0].as_string()
for f in sys.argv[1:]:
print("F", f)
mb = mailbox.mbox(f)
for m in mb:
archive(m)