yama/mbox2web

1317 lines
49 KiB
Plaintext
Raw Permalink Normal View History

#!/usr/bin/python3
from collections import defaultdict
import datetime
2019-04-30 21:55:21 +02:00
import email.header
import email.message
import email.parser
import email.utils
2019-03-01 11:58:22 +01:00
import hashlib
import html
import html.parser
import mailbox
import os
import pprint
import re
2019-03-01 13:54:13 +01:00
import subprocess
import sys
2019-03-01 13:54:13 +01:00
import tempfile
import urllib.parse
import jinja2
2019-10-31 21:22:03 +01:00
import tinycss
# Directory below which the generated pages are written (Message.webify
# creates <basedir>/msg/<encoded message id>/index.html).
basedir = "."
# Shared Jinja2 environment. Templates are looked up in the "templates"
# directory and autoescaping is enabled, which is why HTML that has already
# been rendered is wrapped in jinja2.Markup before it is passed to a template.
jenv = jinja2.Environment(
loader=jinja2.FileSystemLoader(["templates"]),
autoescape=True,
)
def get_message_id(msg):
    """
    Extract the message id from a message

    The id is the text between the first pair of angle brackets in the
    Message-ID header.

    Note that this assumes that there is (at least) one message id. If
    this is not the case (header missing or without angle brackets), it
    will raise an exception (currently an IndexError, but we may use
    something more suitable in the future).
    """
    header = msg["Message-ID"]
    match = None
    if header is not None:
        # str() lets us cope with email.header.Header objects, too (the
        # In-Reply-To handling in Message has to deal with the same thing).
        match = re.search(r'<(.*?)>', str(header))
    if match is None:
        raise IndexError("no message id found in Message-ID header %r" % (header,))
    return match.group(1)
2019-04-30 21:55:21 +02:00
def encode_message_id(msgid):
    """
    Encode a message id for use in file names and URLs

    Every character which is not in a conservative whitelist is replaced
    by its code point in (at least two digit) lower case hex, wrapped in
    curly braces, e.g. "/" becomes "{2f}".
    """
    def escape(found):
        return "{%02x}" % ord(found.group(0))

    unsafe_char = re.compile('[^!"$(-.0-9:=@-z|~]')
    return unsafe_char.sub(escape, msgid)
2019-04-30 21:55:21 +02:00
def decode_rfc2047(s):
    """
    Decode a header value which may contain RFC 2047 encoded words

    Returns the decoded value as a str, or None if s is None.

    Chunks with an unknown charset, or with bytes which are invalid in the
    declared charset, are decoded as windows-1252 instead. Bytes which are
    undefined even there are replaced by U+FFFD, so a broken header never
    aborts the conversion.
    """
    if s is None:
        return None
    decoded = []
    for data, charset in email.header.decode_header(s):
        if not isinstance(data, bytes):
            # A header without any encoded words is returned as str
            decoded.append(data)
            continue
        try:
            # Chunks without a charset are supposed to be plain ASCII
            decoded.append(data.decode(charset or "us-ascii"))
        except (LookupError, UnicodeDecodeError):
            decoded.append(data.decode("windows-1252", errors="replace"))
    return "".join(decoded)
def render_message(msg):
    """
    Render a message embedded in another message (headers and body)

    The body is rendered with render_body, then placed into the
    "message2.html" template together with the Message-Id, Date and the
    decoded Subject and From headers. The result is returned as
    jinja2.Markup so that it isn't escaped again by an enclosing template.
    """
    template = jenv.get_template("message2.html")
    body = render_body(msg)
    rendered = template.render({
        "msg": msg,
        "message_id": msg["Message-Id"],
        "subject": decode_rfc2047(msg["Subject"]),
        "from": decode_rfc2047(msg["From"]),
        "date": msg["Date"],
        "bodyhtml": body,
    })
    return jinja2.Markup(rendered)
def save_part(msg, disposition):
    """
    Store the decoded payload of a MIME part as a file below "parts/"

    The file name consists of the SHA-256 of the payload and an extension
    derived from the content type, so identical payloads end up in the
    same file.

    msg         -- the part to store
    disposition -- "_url" to get just the relative URL of the stored file;
                   any other value is used as prefix of the template
                   ("<disposition>_<type>_<subtype>.html") which renders a
                   reference to the file

    Returns the URL or the rendered template. Raises a KeyError for
    content types without a known extension.
    """
    known_extensions = {
        "application/octet-stream": ".bin",
        "text/html": ".html",
        "text/x-vcard": ".vcf",
        "text/plain": ".txt",
        "application/x-gzip": ".gz",  # more likely tar.gz, but we can't know without looking into it which we ain't
        "image/gif": ".gif",
        "text/x-c": ".c",
        "application/x-perl": ".pl",
        "application/msword": ".doc",
        "application/ms-tnef": ".ms-tnef",
        "application/x-bzip2": ".bz2",  # more likely tar.bz2, but we can't know without looking into it which we ain't
        "application/x-shellscript": ".sh",
        "application/x-java-vm": ".bin",  # The only instances are mis-labelled
        "image/png": ".png",
        "application/pgp-keys": ".pgp",
        "application/x-gunzip": ".gz",  # that sort of makes sense, but not really
        "image/jpeg": ".jpg",
        "text/x-python": ".py",
        "text/x-java": ".java",
        "application/x-sh": ".sh",
        "text/x-patch": ".patch",
        "text/x-c++src": ".c++",
        "application/x-compressed-tar": ".tar.gz",
        "application/vnd.oasis.opendocument.text": ".odt",
        "text/x-perl": ".pl",
        "application/pgp-signature": ".pgp",
        "image/svg+xml": ".svg",
    }
    content_type = msg.get_content_type()
    suffix = known_extensions[content_type]
    display_name = msg.get_param("name") or "(data)"

    data = msg.get_payload(decode=True)
    stored_as = hashlib.sha256(data).hexdigest() + suffix
    os.makedirs("parts", exist_ok=True)
    with open("parts/" + stored_as, "wb") as out:
        out.write(data)

    # Relative to the directory of a message page (msg/<id>/index.html)
    link = "../../parts/" + stored_as
    if disposition == "_url":
        return link

    template = jenv.get_template(
        disposition + "_" + content_type.replace("/", "_") + ".html"
    )
    return template.render({
        "name": display_name,
        "url": link,
    })
# Fragments of message/partial messages collected so far. Keyed by the "id"
# parameter of the parts; each value is a list with one slot per fragment.
# Filled by the message/partial renderer in render_body, which removes the
# entry again once all fragments have been seen.
partial_message_cache = {}
def render_body(msg, extra=None):
    """
    Render the body of a message or MIME part as HTML

    The part is dispatched on its content type to one of the nested
    render_* functions. Parts with a Content-Disposition of "attachment"
    are stored with save_part instead.

    msg   -- the message or part to render
    extra -- optional dict which maps Content-Ids (including the angle
             brackets) to dicts with the keys "i", "part" and "url". It is
             used to resolve cid: references in text/html parts.

    Returns the HTML as jinja2.Markup. Raises a KeyError for content types
    without a renderer.

    NOTE(review): jinja2.Markup was removed from the jinja2 namespace in
    Jinja2 3.1; with newer versions markupsafe.Markup is needed instead.
    """

    def decode_payload(part):
        # Decode the payload of a textual part with the charset from its
        # Content-Type header (part.get_charset() doesn't work).
        ct_params = dict(part.get_params() or [])
        charset = ct_params.get("charset", "iso-8859-1")
        partbytes = part.get_payload(decode=True)
        try:
            return partbytes.decode(charset, errors="replace")
        except LookupError as e:
            # Unknown encoding? Probably win-1252
            print(e, file=sys.stderr)
            return partbytes.decode("windows-1252", errors="replace")

    def render_embedded_messages(msg):
        # Render every message contained in msg
        return [render_message(part) for part in msg.get_payload()]

    def render_text_plain(msg, extra=None):
        ct_params = dict(msg.get_params() or [])
        text_format = ct_params.get("format", "fixed")
        if text_format == "fixed":
            bodytmpl = jenv.get_template("body_text_plain.html")
            context = {
                "body": decode_payload(msg)
            }
            return bodytmpl.render(context)
        elif text_format == "flowed":
            bodytmpl = jenv.get_template("body_text_plain_flowed.html")
            parthtml = TextFlowedPart(msg).as_string()
            context = {
                "body": jinja2.Markup(parthtml),
            }
            return bodytmpl.render(context)
        else:
            raise NotImplementedError()

    def render_multipart_mixed(msg, extra=None):
        parts = msg.get_payload()
        if isinstance(parts, str):
            # mislabelled, assume text/plain
            return render_text_plain(msg)

        # First, scan for parts with a content-id. A multipart/mixed shouldn't
        # have them, but I've seen them in the wild and it should be harmless
        # to support at least images. We don't want all content types, though,
        # because save_part doesn't support nested parts and I don't want to
        # fully implement what is really just a workaround for buggy software.
        for i, part in enumerate(parts):
            content_id = part.get("Content-Id")
            content_type = part.get_content_type()
            if content_id and content_type.startswith("image/"):
                if extra is None:
                    extra = {}
                extra[content_id] = {
                    "i": i,
                    "part": part,
                    "url": save_part(part, "_url"),
                }
        partshtml = [render_body(part, extra) for part in parts]
        bodytmpl = jenv.get_template("body_multipart_mixed.html")
        context = {
            "parts": partshtml
        }
        return bodytmpl.render(context)

    def render_multipart_digest(msg, extra=None):
        partshtml = render_embedded_messages(msg)
        bodytmpl = jenv.get_template("body_multipart_digest.html")
        context = {
            "parts": partshtml
        }
        return bodytmpl.render(context)

    def render_message_rfc822(msg, extra=None):
        partshtml = render_embedded_messages(msg)
        bodytmpl = jenv.get_template("body_message_rfc822.html")
        context = {
            "parts": partshtml
        }
        return bodytmpl.render(context)

    def render_text_html(msg, extra=None):
        htmlpart = HTMLPart(extra)
        htmlpart.feed(decode_payload(msg))
        bodytmpl = jenv.get_template("body_text_html.html")
        context = {
            "body": jinja2.Markup(htmlpart.as_string())
        }
        return bodytmpl.render(context)

    def render_text_enriched(msg, extra=None):
        tepart = TextEnrichedPart(decode_payload(msg))
        bodytmpl = jenv.get_template("body_text_enriched.html")
        context = {
            "body": jinja2.Markup(tepart.as_string())
        }
        return bodytmpl.render(context)

    def render_message_partial(msg, extra=None):
        # Default header for get_param is Content-Type
        whole_msg_id = msg.get_param("id")
        number = int(msg.get_param("number"))
        if whole_msg_id not in partial_message_cache:
            # For now we assume that total is present on all parts. This
            # isn't guaranteed, however, and we may need to handle the
            # case where total is only present on the last part.
            partial_message_cache[whole_msg_id] = [None] * int(msg.get_param("total"))
        payload = msg.get_payload()
        s = payload[0].as_string()  # Only one part
        partial_message_cache[whole_msg_id][number - 1] = s
        if None not in partial_message_cache[whole_msg_id]:
            # All fragments are here: reassemble the message and add it to
            # the archive
            p = email.parser.Parser()
            whole_msg = p.parsestr("".join(partial_message_cache[whole_msg_id]))
            if not whole_msg["Message-Id"]:
                whole_msg.add_header("Message-Id", "<" + whole_msg_id + ">")
            if whole_msg["Date"] is None:
                whole_msg["Date"] = msg["Date"]
            arch.add_message(whole_msg)  # XXX - global
            del partial_message_cache[whole_msg_id]
        return "<p>This is part %d of %d of <a href='../%s/'>%s</a></p>" % (
            number,
            int(msg.get_param("total")),
            encode_message_id(whole_msg_id),
            html.escape(whole_msg_id))

    def render_application_octet_stream(msg, extra=None):
        return save_part(msg, "attachment")

    def render_multipart_signed(msg, extra=None):
        content, signature = msg.get_payload()
        # gpg wants files. They are removed when the with blocks are left,
        # so gpg has to run inside of both.
        with tempfile.NamedTemporaryFile(buffering=0) as content_fh:
            content_fh.write(content.as_bytes())
            with tempfile.NamedTemporaryFile(buffering=0, suffix=".asc") as signature_fh:
                signature_fh.write(signature.get_payload(decode=True))
                r = subprocess.run(["gpg", "--verify", signature_fh.name, content_fh.name],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)
        gpgresult = r.stderr
        # XXX - Analyze gpgresult or just use r.returncode?
        gpgstatus = "dubious"
        contenthtml = render_message(content)
        bodytmpl = jenv.get_template("body_multipart_signed.html")
        context = {
            "content": contenthtml,
            "gpgresult": gpgresult,
            "gpgstatus": gpgstatus,
        }
        return bodytmpl.render(context)

    def render_application_pgp(msg, extra=None):
        with tempfile.NamedTemporaryFile(buffering=0) as content_fh:
            content_fh.write(msg.get_payload(decode=True))
            r = subprocess.run(["gpg", "--decrypt", content_fh.name],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               )
        gpgresult = r.stderr.decode()
        # XXX - Analyze gpgresult or just use r.returncode?
        gpgstatus = "dubious"
        decrypted_content = r.stdout
        p = email.parser.BytesParser()
        embedded_message = p.parsebytes(decrypted_content)
        contenthtml = render_message(embedded_message)
        bodytmpl = jenv.get_template("body_application_pgp.html")
        context = {
            "content": contenthtml,
            "gpgresult": gpgresult,
            "gpgstatus": gpgstatus,
        }
        return bodytmpl.render(context)

    def render_multipart_alternative(msg, extra=None):
        partshtml = []
        partstypes = []
        for part in msg.get_payload():
            partstypes.append(part.get_content_type())
            partshtml.append(render_body(part, extra))
        bodytmpl = jenv.get_template("body_multipart_alternative.html")
        context = {
            "types": partstypes,
            "parts": partshtml,
        }
        return bodytmpl.render(context)

    def render_application_x_unknown_content_type_scpfile(msg, extra=None):
        bodytmpl = jenv.get_template("body_application_x-unknown-content-type-scpfile.html")
        context = {
            "body": decode_payload(msg)
        }
        return bodytmpl.render(context)

    def render_application_pgp_signature(msg, extra=None):
        # A PGP signature outside of a multipart/signed - useless
        bodytmpl = jenv.get_template("body_application_pgp-signature.html")
        context = {
        }
        return bodytmpl.render(context)

    def render_application_x_gzip(msg, extra=None):
        return save_part(msg, "attachment")

    def render_message_news(msg, extra=None):
        partshtml = render_embedded_messages(msg)
        bodytmpl = jenv.get_template("body_message_news.html")
        context = {
            "msg": msg,
            "parts": partshtml,
        }
        return bodytmpl.render(context)

    def render_image_gif(msg, extra=None):
        return save_part(msg, "inline")

    def render_multipart_related(msg, extra=None):
        start = msg.get_param("start")
        start_part = None
        parts = msg.get_payload()
        # collect content-ids
        content = {}
        for i, part in enumerate(parts):
            content_id = part.get("Content-Id")
            if start_part is None and (start is None or content_id == start):
                start_part = part
                continue
            if content_id:
                content[content_id] = {
                    "i": i,
                    "part": part,
                    "url": save_part(part, "_url"),
                }
        if start_part is None and parts:
            # The start parameter doesn't match any part. Fall back to the
            # first part, which is the root if there is no start parameter.
            start_part = parts[0]
        parthtml = render_body(start_part, content)
        bodytmpl = jenv.get_template("body_multipart_related.html")
        context = {
            "msg": msg,
            "parts": [parthtml],
        }
        return bodytmpl.render(context)

    def render_image_jpeg(msg, extra=None):
        return save_part(msg, "inline")

    def render_message_delivery_status(msg, extra=None):
        bodytmpl = jenv.get_template("body_message_delivery_status.html")
        # A message/delivery status consists of one per-message block
        # followed by one or more per-recipient blocks.
        # Pythons message parser apparently parses each block as a message
        # consisting only of headers. So we just stringify and concatenate them
        parts = msg.get_payload()
        parttext = "".join([str(p) for p in parts])
        context = {
            "body": parttext
        }
        return bodytmpl.render(context)

    renderers = {
        "text/plain": render_text_plain,
        "multipart/mixed": render_multipart_mixed,
        "multipart/digest": render_multipart_digest,
        "message/rfc822": render_message_rfc822,
        "text/html": render_text_html,
        "text/enriched": render_text_enriched,
        "message/partial": render_message_partial,
        "application/octet-stream": render_application_octet_stream,
        "multipart/signed": render_multipart_signed,
        "application/pgp": render_application_pgp,
        "multipart/alternative": render_multipart_alternative,
        "application/x-unknown-content-type-scpfile": render_application_x_unknown_content_type_scpfile,
        "application/pgp-signature": render_application_pgp_signature,
        "application/x-gzip": render_application_x_gzip,
        "message/news": render_message_news,
        "image/gif": render_image_gif,
        "multipart/related": render_multipart_related,
        "application/x-java-vm": render_application_octet_stream,
        "image/jpeg": render_image_jpeg,
        "application/x-compressed-tar": render_application_octet_stream,
        "message/delivery-status": render_message_delivery_status,
        "application/pgp-keys": render_application_octet_stream,
    }

    content_type = msg.get_content_type()
    content_disposition = msg.get_content_disposition()
    if content_disposition == "attachment":
        # XXX - not sure if we should just store all content-types.
        # We probably should clean up html. Alternatively we could just store
        # all of them application/octet-stream, which browsers should download
        # and not try to display.
        bodyhtml = save_part(msg, content_disposition)
    else:
        bodyhtml = renderers[content_type](msg, extra)
    return jinja2.Markup(bodyhtml)
class HTMLPart(html.parser.HTMLParser):
    """
    A text/html part

    This is a subclass of HTMLParser, so the handle_* methods will be invoked
    as appropriate during parsing. Only whitelisted tags, attributes and CSS
    declarations are copied to the output; text is escaped again, because the
    parser hands it to us with character references already resolved.

    There are a few additional attributes to keep track of the state:

    .. attribute:: content

       Accumulates fragments of the final, cleaned up, html message as
       strings

    .. attribute:: base

       The base URL

    .. attribute:: extra

       Context information. This includes info about cids or references to
       other messages

    .. attribute:: hide

       If true, the content of the current tag is omitted from the output.
       This is set when encountering a start tag in hide_tags, and reset at
       each end tag (so it works only for leaves).

    .. attribute:: current_tag

       The current tag. Similar to hide, this is set and reset when
       encountering start and end tags, so it is only correct while
       processing a leaf element. But since we use it only for style
       elements, that's acceptable.
    """

    allowed_tags = [
        "h2", "a", "wbr", "hr", "pre", "img", "font", "i", "br", "table", "tr",
        "th", "td", "b", "select", "option", "input", "sup", "address",
        "center", "p", "h1", "dl", "h3", "ul", "li", "ol", "u", "blockquote",
        "h4", "div", "span", "style",
    ]
    hide_tags = ["title"]
    ignore_tags = ["html", "head", "body", "marquee", "meta", "form"]

    # Attributes which are copied verbatim
    safe_attrs = frozenset([
        "border", "alt", "size", "face", "width", "height", "hspace",
        "cellpadding", "cellspacing", "bgcolor", "valign", "nowrap",
        "color", "colspan", "name", "value", "type", "align", "clear",
        "noshade",
    ])

    # We keep only CSS declarations we recognize
    safe_declarations = frozenset([
        "background-color",
        "border-bottom-style",
        "border-color",
        "border-left",
        "border-left-style",
        "border-right-style",
        "border-style",
        "border-top-style",
        "color",
        "font-family",
        "font-size",
        "font-style",
        "font-weight",
        "height",
        "list-style",
        "margin",
        "margin-bottom",
        "margin-left",
        "margin-right",
        "margin-top",
        "padding",
        "padding-bottom",
        "padding-left",
        "padding-right",
        "padding-top",
        "page-break-after",
        "text-align",
        "text-decoration",
        "white-space",
        "width",
    ])

    # Ignore these silently to avoid cluttering logs
    ignore_declarations = frozenset([
        "mso-ansi-font-size",
        "mso-ansi-language",
        "mso-ascii-font-family",
        "mso-bidi-font-family",
        "mso-bidi-font-size",
        "mso-bidi-font-weight",
        "mso-fareast-font-family",
        "mso-fareast-language",
        "mso-gram-e",
        "mso-hansi-font-family",
        "mso-margin-bottom-alt",
        "mso-margin-top-alt",
        "mso-outline-level",
        "mso-pagination",
        "mso-spl-e",
        "mso-style-link",
        "mso-style-name",
        "mso-style-next",
        "mso-style-noshow",
        "mso-style-parent",
        "mso-style-priority",
        "mso-style-type",
        "page",  # doesn't exist in CSS 2.2
        "panose-1",  # doesn't exist in CSS 2.2
        "text-underline",  # doesn't exist in CSS 2.2
    ])

    def __init__(self, extra):
        super().__init__()
        self.hide = False
        self.content = []
        self.base = None
        self.extra = extra or {}
        self.current_tag = None

    def handle_starttag(self, tag, attrs):
        self.current_tag = tag
        if tag == "base":
            href = [x[1] for x in attrs if x[0] == "href"]
            if href:
                self.base = href[0]
        elif tag in self.allowed_tags:
            cleaned_attrs, extra = self.clean_attrs(tag, attrs)
            attrstr = "".join(
                [' %s="%s"' % (a[0], html.escape(a[1])) if a[1] else ' %s' % (a[0])
                 for a in cleaned_attrs
                 ]
            )
            self.content.append("<%s%s>" % (tag, attrstr))
            if extra:
                self.content.append(extra)
        elif tag in self.hide_tags:
            self.hide = True
        elif tag in self.ignore_tags:
            pass
        else:
            print("Encountered unknown start tag", tag, attrs, file=sys.stderr)

    def handle_endtag(self, tag):
        if tag in self.allowed_tags:
            self.content.append("</%s>" % tag)
        elif tag in self.hide_tags:
            self.hide = False  # XXX - Need stack?
        elif tag in self.ignore_tags:
            pass
        else:
            print("Encountered unknown end tag", tag, file=sys.stderr)
        self.current_tag = None

    def handle_data(self, data):
        if self.current_tag == "style":
            data = self.clean_style(data)
        else:
            # The parser has already converted character references, so
            # "&lt;script&gt;" arrives here as "<script>". Escape it again,
            # otherwise text could smuggle arbitrary markup into the page.
            data = html.escape(data, quote=False)
        if not self.hide:
            self.content.append(data)

    def as_string(self):
        return "".join(self.content)

    def clean_attrs(self, tag, attrs):
        """
        Filter the attributes of a start tag

        Returns a tuple of the list of (name, value) pairs to keep and an
        optional HTML fragment to insert right after the start tag (a link
        to the cited message for cite="mid:...").
        """
        clean_attrs = []
        extra = None
        for a in attrs:
            if a[0] in self.safe_attrs:
                clean_attrs.append(a)
            elif a[0] == "href":
                url = a[1]
                url = urllib.parse.urljoin(self.base, url)
                u = urllib.parse.urlparse(url)
                if u[0] in ['https', 'http', 'ftp']:
                    clean_attrs.append((a[0], url))
            elif a[0] == "src":
                url = a[1]
                url = urllib.parse.urljoin(self.base, url)
                u = urllib.parse.urlparse(url)
                if u[0] == "cid":
                    print("Encountered src cid attribute", a, file=sys.stderr)
                    target = self.extra.get("<" + u.path + ">")
                    if target is not None:
                        clean_attrs.append((a[0], target["url"]))
                    else:
                        # No part with this Content-Id: drop the attribute
                        # instead of aborting the whole message
                        print("Unresolved cid in src attribute", a, file=sys.stderr)
                else:
                    print("Ignored src attribute", a, file=sys.stderr)
            elif a[0] == "target":
                pass
            elif a[0] == "cite":
                if a[1] and a[1].startswith("mid:"):
                    mid = a[1][4:]
                    encmid = encode_message_id(mid)
                    extra = "<a class='citesource' href='../%s'>\u2397</a>" % encmid
            elif a[0] == "class":
                # Prefix every class, just like clean_style does for the
                # selectors, so that the two keep matching
                classes = (a[1] or "").split()
                if classes:
                    clean_attrs.append((a[0], " ".join("msg-" + c for c in classes)))
            else:
                print("Encountered unknown attribute", a, "in", tag, file=sys.stderr)
        return clean_attrs, extra

    def _cid_background_image(self, cssparser, declaration):
        """
        Convert a background-image declaration with a cid: URL

        Returns a declaration which points to the stored part, or None if
        the value isn't a single cid: URL of a part we know.
        """
        value = declaration.value
        if len(value) != 1 or value[0].type != "URI":
            return None
        uri = value[0].value
        if not uri.startswith("cid:"):
            return None
        target = self.extra.get("<" + uri[len("cid:"):] + ">")
        if target is None:
            return None
        print("accepting url", uri, file=sys.stderr)
        declarations, _errors = cssparser.parse_style_attr(
            "background-image: url(%s)" % target["url"]
        )
        return declarations[0] if declarations else None

    def clean_style(self, stylesheet):
        """
        Clean up the content of a style element

        Classes and ids in selectors get the prefix "msg-" and only
        declarations considered safe are kept. Returns the new stylesheet
        as a string.
        """
        cssparser = tinycss.make_parser()
        parsed = cssparser.parse_stylesheet(stylesheet)
        clean_stylesheet = ""
        for rule in parsed.rules:
            if getattr(rule, "at_keyword", None) is not None:
                # At-rules (@page, @media, ...) don't have a selector we
                # could rewrite
                print("ignoring CSS at-rule", rule.at_keyword, file=sys.stderr)
                continue

            # first clean up selectors: Prepend "msg-" to every class or id
            next_is_local_id = False
            new_selector = []
            for token in rule.selector:
                if next_is_local_id and token.type == "IDENT":
                    new_id = "msg-" + token.value
                    new_selector.append(tinycss.token_data.Token(token.type, new_id, new_id, token.unit, token.line, token.column))
                else:
                    new_selector.append(token)
                next_is_local_id = token.type == "DELIM" and (token.value == "." or token.value == "#")
            rule.selector = tinycss.token_data.TokenList(new_selector)
            clean_stylesheet += rule.selector.as_css()

            # Then clean up declarations.
            new_declarations = []
            for declaration in rule.declarations:
                if declaration.name in self.safe_declarations:
                    new_declarations.append(declaration)
                elif declaration.name == "background-image":
                    # check if URL is cid, discard if not
                    converted = self._cid_background_image(cssparser, declaration)
                    if converted is not None:
                        new_declarations.append(converted)
                    else:
                        print("ignoring unsafe CSS property", declaration, file=sys.stderr)
                elif declaration.name in self.ignore_declarations:
                    pass
                else:
                    print("ignoring unknown CSS property", declaration.name, file=sys.stderr)
            clean_stylesheet += " {\n"
            for declaration in new_declarations:
                clean_stylesheet += "\t" + declaration.name + ":" + declaration.value.as_css() + ";\n"
            clean_stylesheet += "}\n\n"
        return clean_stylesheet
class TextEnrichedPart:
    """
    A text/enriched part

    The constructor parses the text into a tree of TEElement objects;
    as_string converts that tree to HTML.
    """

    class TEElement:
        """
        An element of a text/enriched document

        type is the lower case tag name ("" for the root), content a list
        of strings and nested elements, and filled tells whether line
        breaks are to be treated as white space (i.e. we are not inside a
        nofill element).
        """

        def __init__(self, t, parent):
            self.type = t.lower()
            self.content = []
            if self.type == "nofill":
                self.filled = False
            elif parent:
                self.filled = parent.filled
            else:
                self.filled = True

        def append_text(self, s):
            s = s.replace("<<", "<")
            if self.filled:
                # A single line break is just white space, n line breaks
                # in a row stand for n-1 real ones
                s = re.sub(r'\n+',
                           lambda m: m.group(0)[1:] if len(m.group(0)) > 1 else " ",
                           s)
            self.content.append(s)

        def _split_param(self):
            """
            Return the text of the leading param element and the rest of
            the content.

            The content is left unchanged so that as_string can be called
            more than once. Raises a RuntimeError if the content doesn't
            start with a param.
            """
            first = self.content[0] if self.content else None
            if not isinstance(first, type(self)) or first.type != "param":
                got = first.type if isinstance(first, type(self)) else "text"
                raise RuntimeError("Expected 'param', got '%s'" % got)
            value = first.content[0] if first.content else ""
            if not isinstance(value, str):
                value = ""
            return value.strip(), self.content[1:]

        def as_string(self):
            children = self.content
            if self.type == "":
                pre = "<div class='text-enriched'>"
                post = "</div>"
            elif self.type == "bold":
                pre = "<b>"
                post = "</b>"
            elif self.type == "param":
                # We shouldn't ever get here since the param should be consumed
                # by the parent, but there are broken messages ...
                return ""
            elif self.type.startswith("x-"):
                # Just ignore all experimental elements and render their
                # contents.
                pre = ""
                post = ""
            elif self.type == "flushleft":
                pre = "<div class='flushleft'>"
                post = "</div>"
            elif self.type == "smaller":
                # HTML has a "small" element, but that is meant for "side
                # comments such as small print", while t/e "smaller" is purely
                # typographical
                pre = "<span style='font-size: 0.9em'>"
                post = "</span>"
            elif self.type == "color":
                colorstring, children = self._split_param()
                if re.match(r'^\w+$', colorstring):
                    # a single word, i.e. a colorname like "red" or "cyan".
                    # The 8 colors in the spec aren't a subset of the 17 colors in CSS2,
                    # but recognized by most/all browsers. And if we encounter a non-standard
                    # color the best we can do is let the browser handle it.
                    pass
                else:
                    m = re.match(r'([0-9a-f]{4}),([0-9a-f]{4}),([0-9a-f]{4})', colorstring, re.IGNORECASE)
                    if m:
                        # an RGB triple. Use only the top 8 bits of each component:
                        colorstring = "#%s%s%s" % (m.group(1)[:2], m.group(2)[:2], m.group(3)[:2])
                    else:
                        # syntax error. Replace with "black"
                        colorstring = "#000"
                pre = "<span style='color: %s'>" % colorstring
                post = "</span>"
            elif self.type == "nofill":
                pre = "<div class='nofill'>"
                post = "</div>"
            elif self.type == "fontfamily":
                fontfamily, children = self._split_param()
                if "'" in fontfamily or '"' in fontfamily:
                    raise RuntimeError("Can't handle quotes in font names (%s)" % fontfamily)
                # Quotes are rejected above; escape the rest of the markup
                # characters as this ends up in an attribute
                pre = "<span style='font-family: \"%s\"'>" % html.escape(fontfamily, quote=False)
                post = "</span>"
            elif self.type == "bigger":
                # HTML used to have a "big" element, but that has been removed from HTML5
                pre = "<span style='font-size: 1.1em'>"
                post = "</span>"
            elif self.type == "underline":
                # HTML5 redefined the meaning of "u", but I'm using it anyway
                pre = "<u>"
                post = "</u>"
            else:
                raise NotImplementedError("Unknown type " + self.type)
            parts = [pre]
            for c in children:
                if isinstance(c, type(self)):
                    parts.append(c.as_string())
                else:
                    parts.append(html.escape(c))
            parts.append(post)
            return "".join(parts)

    def __init__(self, s):
        self.stack = [self.TEElement("", None)]
        while s:
            stack_top = self.stack[-1]
            # A tag needs at least one character as its name. "<>" and
            # "</>" are plain text.
            m = re.match(r'(.*?)<(/?[A-Za-z0-9-]{1,60})>(.*)', s, re.DOTALL)
            if m:
                text = m.group(1)
                tag = m.group(2).lower()
                # White space between an opening tag and its param is not
                # part of the text. Any other text is.
                if text != "" and not (tag == "param" and text.isspace()):
                    stack_top.append_text(text)
                if tag[0] != "/":
                    new = self.TEElement(tag, stack_top)
                    stack_top.content.append(new)
                    self.stack.append(new)
                else:
                    closed_tag = tag[1:]
                    if stack_top.type == closed_tag:
                        self.stack.pop()
                    elif closed_tag in [e.type for e in self.stack]:
                        # We close a tag which has been opened, but it
                        # wasn't the last one. This is clearly a nesting
                        # error, but there was broken software (e.g.
                        # http://www.fozztexx.com/Mynah/) which used
                        # non-closing tags, and by just popping them off
                        # the stack we can "re-synchronize".
                        while self.stack.pop().type != closed_tag:
                            pass
                    else:
                        raise RuntimeError("Nesting error: Expected %s, got %s near %s" % (self.stack[-1].type, closed_tag, s))
                s = m.group(3)
            else:
                stack_top.append_text(s)
                s = ""

    def as_string(self):
        return self.stack[0].as_string()
class TextFlowedPart:
    """
    A text/plain part with format=flowed

    The constructor joins the flowed lines of the payload to paragraphs
    and records the quote depth of each of them in self.lines; as_string
    converts this to HTML with nested blockquote elements.
    """

    def __init__(self, msg):
        self.quote_depth = 0
        self.current_line = ""
        self.flowed = False
        self.lines = []
        self.buffer_filled = False
        ct_params = dict(msg.get_params() or [])
        charset = ct_params.get("charset", "iso-8859-1")
        delsp = ct_params.get("delsp", "no") == "yes"
        charset_map = {
            "x-mac-roman": "mac_roman",
        }
        if charset in charset_map:
            charset = charset_map[charset]
        payload = msg.get_payload(decode=True)
        try:
            raw_text = payload.decode(charset, errors="replace")
        except LookupError as e:
            # Unknown encoding? Probably win-1252
            print(e, file=sys.stderr)
            raw_text = payload.decode("windows-1252", errors="replace")
        # Lines may end in CRLF. A left-over "\r" would hide the trailing
        # space which marks a flowed line (and the "-- " of a signature).
        raw_lines = raw_text.replace("\r\n", "\n").split("\n")
        for rl in raw_lines:
            quote_depth = 0
            while rl[:1] == ">":
                quote_depth += 1
                rl = rl[1:]
            if rl[:1] == " ":
                # space-stuffed
                rl = rl[1:]
            if rl == "-- ":
                # The signature separator is never flowed, and it ends
                # the preceding paragraph
                flowed = None
            elif rl[-1:] == " ":
                flowed = True
                if delsp:
                    rl = rl[:-1]
            else:
                flowed = False
            self.add_buffer(rl, quote_depth, flowed)
        self.flush()

    def add_buffer(self, line, quote_depth, flowed):
        """
        Append a line to the current paragraph

        flowed is True if the paragraph continues on the next line, False
        if this line ends it, and None if the line must stand on its own.
        """
        if flowed is None:
            self.flush()
            flowed = False
        if quote_depth != self.quote_depth:
            self.flush()
            self.quote_depth = quote_depth
        self.current_line += line
        self.flowed |= flowed
        self.buffer_filled = True
        if not flowed:
            self.flush()

    def flush(self):
        """Finish the current paragraph (if any) and start a new one."""
        if self.buffer_filled:
            self.lines.append({
                "quote_depth": self.quote_depth,
                "flowed": self.flowed,
                "content": self.current_line
            })
            self.current_line = ""
            self.flowed = False
            self.buffer_filled = False

    def as_string(self):
        prev_quote_depth = 0
        chunks = []
        for ln in self.lines:
            while ln["quote_depth"] > prev_quote_depth:
                chunks.append("<blockquote>\n")
                prev_quote_depth += 1
            while ln["quote_depth"] < prev_quote_depth:
                chunks.append("</blockquote>\n")
                prev_quote_depth -= 1
            if ln["flowed"]:
                chunks.append("<p class='flowed'>" + html.escape(ln["content"]) + "</p>\n")
            else:
                chunks.append("<p class='fixed'>" + html.escape(ln["content"]) + "</p>\n")
        while 0 < prev_quote_depth:
            chunks.append("</blockquote>")
            prev_quote_depth -= 1
        return "".join(chunks)
class Message:
    """
    A message in the archive

    Wraps the parsed message and keeps the information needed for
    threading: the message id (plain and encoded for use in paths), the
    date, the message ids this message replies to (in_reply_to) and all
    message ids it refers to (references). The thread attribute is set by
    Thread.add_message.
    """

    def __init__(self, msg):
        self.msgid = get_message_id(msg)
        print("M", self.msgid, file=sys.stderr)
        self.encmsgid = encode_message_id(self.msgid)
        self.date = email.utils.parsedate_to_datetime(msg["Date"])

        self.in_reply_to, self.references = self._thread_links(
            msg["In-Reply-To"], msg["References"]
        )
        self.mfrom = decode_rfc2047(msg["From"])
        self.subject = decode_rfc2047(msg["Subject"])
        self.msg = msg
        self.kids = False

        if self.date.tzinfo is None:
            # If timezone is missing, assume local time
            self.date = self.date.astimezone()

    @staticmethod
    def _msgids(header):
        """Return the list of message ids found in a header value."""
        if not header:
            return []
        if isinstance(header, email.header.Header):
            header = header.encode()
        return re.findall(r'<(.*?)>', header)

    @staticmethod
    def _thread_links(in_reply_to, references):
        """
        Compute the (in_reply_to, references) lists of message ids from
        the values of the In-Reply-To and References headers.
        """
        # In-Reply-To headers with more than one message-id are rare, but
        # standard-conforming, and some MUAs (e.g., mutt) create them.
        in_reply_to_msgids = Message._msgids(in_reply_to)
        references_msgids = Message._msgids(references)
        for msgid in in_reply_to_msgids:
            if msgid not in references_msgids:
                references_msgids.append(msgid)
        if not in_reply_to_msgids and references_msgids:
            # No In-Reply-To: the last reference is the message this one
            # replies to
            in_reply_to_msgids = [references_msgids[-1]]
        return in_reply_to_msgids, references_msgids

    def __repr__(self):
        return (
            self.msgid + " " +
            self.date.strftime("%Y-%m-%d %H:%M:%S%z") +
            " [" + ", ".join(self.references) + "]"
        )

    def webify(self):
        """Write the HTML page of this message to <basedir>/msg/<encoded id>/index.html."""
        msg = self.msg
        mid = self.msgid
        print("M", mid, file=sys.stderr)
        encmid = self.encmsgid
        msgdir = basedir + "/msg/" + encmid
        os.makedirs(msgdir, exist_ok=True)
        with open(msgdir + "/index.html", "w") as hfd:
            msgtmpl = jenv.get_template("message.html")
            bodyhtml = render_body(msg)
            context = {
                "list": "LUGA",
                "message_id": mid,
                "subject": decode_rfc2047(msg["Subject"]),
                "from": decode_rfc2047(msg["From"]),
                "date": msg["Date"],
                "bodyhtml": bodyhtml,
                "threadhtml": self.thread.as_html(),
                "threadindex": self.threadindex(),
            }
            msghtml = msgtmpl.render(context)
            hfd.write(msghtml)

    def threadindex(self):
        return self.thread.index(self)
# For each message-id, record the thread it belongs to.
# This should probably be an instance variable of Archive instead of a global,
# but for now it doesn't matter.
# NOTE(review): Thread.add_message records the thread in
# self.archive.msg2thread, not here - check whether this global is still used.
msg2thread = {}
class Thread:
def __init__(self, archive):
self.archive = archive
self.messages = {}
self.threadid = None
self._as_html = None
def add_message(self, msg):
    """
    Add msg to this thread

    The message is also registered in the msg2thread map of the archive
    and gets a reference to this thread.
    """
    key = msg.msgid
    self.messages[key] = msg
    self.archive.msg2thread[key] = self
    msg.thread = self
def merge_thread(self, other):
    """Add all messages of the thread other to this thread."""
    absorbed = other.messages
    for msgid in absorbed:
        self.add_message(absorbed[msgid])
def __repr__(self):
if self.threadid:
s = self.threadid
else:
s = str(id(self))
if self.messages:
s += " {" + ", ".join(self.messages.keys()) + "}"
return s
def fixup_in_reply_tos(self):
    """
    Make the in_reply_to relation of this thread usable for layout

    Afterwards every message-id in an in_reply_to list refers to a message
    of this thread, and that message is older than the one replying to it.
    """
    # Fix up some problems with in_reply_to:
    # Sometimes an in_reply_to refers to a message which isn't in the
    # archive. Add a dummy message if this happens.
    # Sometimes an in_reply_to refers to a message with a later date.
    # In this case one of the two date headers must be wrong. We could try
    # to analyze other headers (especially received), but for now we just
    # assume that it is the referrer (although in the example I'm
    # currently looking at it is the referree) and adjust that. We should
    # preserve the original date header, though. Use separate sort_date and
    # date?
    # Sometimes a message claims to be a reply to itself, or replies form
    # a cycle. No date adjustment can satisfy that, so we drop such
    # references instead of looping forever.
    one_second = datetime.timedelta(seconds=1)

    missing = set()
    for m in self.messages.values():
        m.in_reply_to = [r for r in m.in_reply_to if r != m.msgid]
        for r in m.in_reply_to:
            if r not in self.messages:
                missing.add(r)
    # sorted, so that the dummy dates don't depend on the hash seed
    for r in sorted(missing):
        # Each dummy is placed one second before everything we have so far
        firstdate = min(x.date for x in self.messages.values())
        missingdate = firstdate - one_second
        msg = email.message.EmailMessage()
        msg["Message-Id"] = f"<{r}>"
        msg["Date"] = missingdate
        msg["From"] = "unknown@invalid"
        msg["Subject"] = "(not in archive)"
        self.add_message(Message(msg))

    # Without cycles the dates settle after at most one pass per message
    for _ in range(len(self.messages) + 1):
        dates_ok = True
        for m in self.messages.values():
            for r in m.in_reply_to:
                rr = self.messages[r]
                if rr.date >= m.date:
                    m.date = rr.date + one_second
                    dates_ok = False
        if dates_ok:
            return

    # Still not settled, so there must be a cycle. Break it by dropping
    # the references which still point to a message that isn't older.
    for m in self.messages.values():
        kept = [r for r in m.in_reply_to if self.messages[r].date < m.date]
        if len(kept) != len(m.in_reply_to):
            print("Dropping cyclic In-Reply-To of", m.msgid, file=sys.stderr)
            m.in_reply_to = kept
def as_html(self):
if self._as_html:
# This method isn't that expensive, but it isn't idempotent - so we
# must not run the algorithm twice on the same thread. Therefore we
# remember the result and return it on subsequent runs.
s = self._as_html
return jinja2.Markup(s)
self.fixup_in_reply_tos()
y = 0
x = 0
nodes = []
edges = []
lines = []
for m in sorted(self.messages.values(), key=lambda x: x.date):
# We have already fudged the in_reply_to field to always contain
# the latest reference(s), so we only need to consider that
if len(m.in_reply_to) == 0:
if y == 0:
# first message in thread
# Just add a node
nodes.append((x, y, m.encmsgid))
m.x = x
m.y = y
else:
# Not in reply to anything, but not the start of the thread
# either. This will happen if fixup_in_reply_tos adds more
# than one dummy message, but it might also happen if we
# use different criteria for matching threads (e.g. Subject
# or Thread-Index)
# Just start a new column to get out of the way
x += 1
nodes.append((x, y, m.encmsgid))
m.x = x
m.y = y
elif len(m.in_reply_to) == 1:
p = self.messages[m.in_reply_to[0]]
if p.kids:
# The parent already has kids, so we must move to the side
# to avoid running an edge through an existing kid. We
# could use a sophisticated algorithm to find the best
# position here, but I think it sufficient to just start a
# new column. This may waste some space (there might have
# been a suitable position in the existing columns, but it
# will avoid collisions and is very simple.
x += 1
m.x = x
m.y = y
else:
# Just put the new kid directly below the parent
m.x = p.x
m.y = y
nodes.append((m.x, m.y, m.encmsgid))
edges.append((p.x, p.y, m.x, m.y))
p.kids = True
else:
# Generic case with multiple references.
# I think this should always work well if we start a new
# column. There may be special cases where we can avoid it, not
# sure.
x += 1
m.x = x
m.y = y
nodes.append((m.x, m.y, m.encmsgid))
for r in m.in_reply_to:
p = self.messages[r]
edges.append((p.x, p.y, m.x, m.y))
2020-06-14 20:28:20 +02:00
lines.append((m.date, m.mfrom or "(no sender)", m.subject or "(no subject)", m.encmsgid))
y += 1
s = "<table class='thread'>"
s += "<tr>"
s += f"<td rowspan={y}>"
r = 4
fx = 16
fy = 32
s += f"<svg width={(x + 1) * fx} height={y * fy}>"
for e in edges:
if e[0] == e[2]:
s += f"<line x1={e[0] * fx + fx/2} y1={e[1] * fy + fy/2} x2={e[2] * fx + fx/2} y2={e[3] * fy + fy/2} stroke='black' />"
else:
if e[3] == e[1] + 1:
yc = (e[1] + e[2]) / 2
else:
yc = e[1] + 1
s += f"<path d='M {e[0] * fx + fx/2} {e[1] * fy + fy/2} Q {e[2] * fx + fx/2} {yc * fy + fy/2} {e[2] * fx + fx/2} {e[3] * fy + fy/2}' stroke='black' fill='none' />"
for n in nodes:
s += f"<a xlink:href='../../msg/{n[2]}/' >"
s += f"<circle cx={n[0] * fx + fx/2} cy={n[1] * fy + fy/2} r={r} />"
s += f"</a>"
s += "</svg>"
s += "</td>"
# XXX - escape!
s += f"<td class='date'><a href='../../msg/{lines[0][3]}/'>{lines[0][0]}</a></td>"
2020-04-15 21:39:56 +02:00
s += f"<td class='from'>{html.escape(lines[0][1])}</td>"
s += f"<td class='subject'>{html.escape(lines[0][2])}</td>"
s += "</tr>"
for ln in lines[1:]:
s += "<tr>"
s += f"<td class='date'><a href='../../msg/{ln[3]}/'>{ln[0]}</a></td>"
2020-04-15 21:39:56 +02:00
s += f"<td class='from'>{html.escape(ln[1])}</td>"
s += f"<td class='subject'>{html.escape(ln[2])}</td>"
s += "</tr>"
s += "</table>"
self._as_html = s
return jinja2.Markup(s)
@property
def subject(self):
return list(self.messages.values())[0].subject
def index(self, message):
for i, m in enumerate(sorted(self.messages.values(), key=lambda x: x.date)):
if m == message:
return i
class Month:
    """
    Bookkeeping for one calendar month.

    threads maps each thread which was add()ed to the number of times it
    was added.
    """

    def __init__(self, year, month):
        self.year, self.month = year, month
        self.threads = defaultdict(int)

    def add(self, thread):
        """Count one more occurrence of thread in this month."""
        self.threads[thread] = self.threads[thread] + 1

    @property
    def longest_thread(self):
        """
        The thread with the highest count in this month.

        If several threads share the highest count, the one which was added
        first wins. None if no thread has a positive count.
        """
        counted = [(t, c) for t, c in self.threads.items() if c > 0]
        if counted:
            # max() returns the first of several equal maxima.
            thread = max(counted, key=lambda entry: entry[1])[0]
        else:
            thread = None
        print("longest_thread: found thread", thread)
        return thread
class Archive:
    """
    All messages of a mailing list archive and the threads they form.

    messages is the list of messages in the order they were added,
    msg2thread maps every known message id to the thread it belongs to.
    merge_threads() additionally creates thread_list.
    """

    def __init__(self):
        self.messages = []
        self.msg2thread = {}

    def add_message(self, msg):
        """
        Wrap msg in a Message and add it in a new thread of its own.

        A message whose id is already known is silently ignored.
        """
        self.self_check()
        message = Message(msg)
        if message.msgid in self.msg2thread:
            # We have already seen this message, so ignore it
            return
        Thread(self).add_message(message)
        self.messages.append(message)
        self.self_check()

    def merge_threads(self):
        """
        Merge threads which are connected by references.

        Afterwards every thread gets the id and date of its earliest
        message, and thread_list contains every thread once.
        """
        self.self_check()
        changed = True
        while changed:
            changed = False
            # Merging modifies both dicts, so iterate over snapshots of
            # their keys.
            for msgid in list(self.msg2thread):
                thread = self.msg2thread[msgid]
                for member_id in list(thread.messages):
                    member = thread.messages[member_id]
                    for ref in member.references:
                        if ref in thread.messages:
                            continue
                        # references may contain non-existent messages, so
                        # be careful:
                        if ref not in self.msg2thread:
                            continue
                        thread.merge_thread(self.msg2thread[ref])
                        changed = True
        self.thread_list = []
        for thread in self.msg2thread.values():
            if thread.threadid:
                # We have already handled this thread via another message.
                continue
            # min() returns the first of several equally early messages.
            earliest = min(thread.messages.values(), key=lambda m: m.date)
            thread.date = earliest.date
            thread.threadid = earliest.msgid
            self.thread_list.append(thread)

    def webify_messages(self):
        """Call webify() on every message of the archive."""
        self.self_check()
        for message in self.messages:
            message.webify()

    def webify_threads(self):
        """
        Write <basedir>/thread/<threadid>/index.html for every thread.

        NOTE(review): the thread id is a message id and is used unencoded
        as a directory name - confirm that this is safe for ids which
        contain a "/".
        """
        self.self_check()
        template = jenv.get_template("thread.html")
        for thread in self.thread_list:
            threaddir = f"{basedir}/thread/{thread.threadid}"
            os.makedirs(threaddir, exist_ok=True)
            with open(threaddir + "/index.html", "w") as hfd:
                page = template.render({
                    "list": "LUGA",
                    "threadhtml": thread.as_html(),
                })
                hfd.write(page)

    def webify_calendar(self):
        """
        Write the calendar overview and one page per month.

        Every message counts once for its thread in the month of its date.
        The overview goes to <basedir>/cal/index.html, the months go to
        <basedir>/cal/<year>/<month>/index.html.
        """
        overview_tmpl = jenv.get_template("overview.html")
        month_tmpl = jenv.get_template("by_month.html")
        cal = {}
        for thread in self.thread_list:
            for message in thread.messages.values():
                year = message.date.year
                month = message.date.month
                months = cal.setdefault(year, {})
                if month not in months:
                    months[month] = Month(year, month)
                months[month].add(thread)
        caldir = basedir + "/cal"
        os.makedirs(caldir, exist_ok=True)
        with open(caldir + "/index.html", "w") as hfd:
            hfd.write(overview_tmpl.render({"list": "LUGA", "cal": cal}))
        for year, months in cal.items():
            for month, entry in months.items():
                monthdir = f"{caldir}/{year}/{month}"
                os.makedirs(monthdir, exist_ok=True)
                with open(monthdir + "/index.html", "w") as hfd:
                    hfd.write(month_tmpl.render({"month": entry}))

    def self_check(self):
        """Assert that no message id occurs twice in self.messages."""
        seen = set()
        for message in self.messages:
            msgid = message.msgid
            assert msgid not in seen, msgid
            seen.add(msgid)
# Read all mbox files given on the command line into a single archive.
arch = Archive()
for f in sys.argv[1:]:
    print("F", f, file=sys.stderr)
    # We only read the mailbox. With the default (create=True) a mistyped
    # file name would silently create an empty mbox file; with create=False
    # it raises mailbox.NoSuchMailboxError instead.
    mb = mailbox.mbox(f, create=False)
    try:
        for m in mb:
            arch.add_message(m)
    finally:
        mb.close()

# Now I have a lot of 1 message threads
# Merge them
arch.merge_threads()

# Then dump all the messages
arch.webify_messages()

# And the threads
arch.webify_threads()

# And a calendar view
arch.webify_calendar()

# vim: tw=79