zettel/app.py

220 lines
6.3 KiB
Python

import base64
import email.message
import os
import smtplib

import markdown
import psycopg
from flask import Flask, g, redirect, render_template, request, session, url_for
from markupsafe import Markup

import config
# The WSGI application object; every route, template filter and teardown
# hook below is registered on it.
app = Flask(__name__)
# Load Flask settings from the attributes of the local `config` module.
app.config.from_object(config)
#-----------------------------------------------------------------------
# HTTP endpoints
@app.get("/")
def index():
    """Serve the front page by delegating to show_posts with an empty path."""
    return show_posts(p="")
@app.get("/<path:p>")
def show_posts(p):
    """Render the posts that carry every tag named in the URL path.

    `p` is a "/"-separated list of tag names. Only posts visible to one of
    the current session's roles (see get_roles) are listed. For every tag
    that occurs on a listed post a link is computed which toggles that tag
    in the current selection.

    Path segments are normalised before use: empty segments (from an empty
    path, a doubled or a trailing slash) are dropped, names are lowercased
    to match the `lower(name)` comparison in the query, and duplicates are
    removed. Without this, the empty path yielded `[""]` and every toggle
    link started with an empty segment, i.e. pointed at "//tagname".
    """
    roles = get_roles()
    app.logger.info("%s", p)

    # dict.fromkeys de-duplicates while keeping the order from the URL.
    tags = list(dict.fromkeys(t.lower() for t in p.split("/") if t))

    csr = get_cursor()
    if tags:
        csr.execute("select * from tag where lower(name) = any(%s)", (tags,))
        # int() is a second line of defence: the ids are interpolated into
        # the SQL text below.
        tag_ids = [int(r.id) for r in csr]
    else:
        tag_ids = []
    # XXX - Check that tag_ids has the same length as tags.
    # Otherwise
    # - return 404?
    # - redirect?

    # One join per selected tag, so a post must carry all of them.
    tag_joins = "".join(
        f" join tag_post t{tag_id} on p.id = t{tag_id}.post and t{tag_id}.tag = {tag_id}"
        for tag_id in tag_ids
    )
    q = (
        "select distinct p.* from post p"
        + tag_joins
        + " join acl on p.id = acl.post and acl.role = any(%s)"
        + " order by p.ts desc"
    )
    csr.execute(q, (roles,))
    posts = csr.fetchall()

    # Find all tags for each listed post.
    post_ids = [post.id for post in posts]
    csr.execute(
        "select * from tag_post tp join tag t on tp.tag = t.id where post = any(%s) order by name",
        (post_ids,))

    selected_ids = set(tag_ids)
    tags_by_id = {}
    tags_by_post = {}
    for r in csr:
        if r.id not in tags_by_id:
            selected = r.id in selected_ids
            name = r.name.lower()
            if selected:
                # XXX - should really use id
                new_tags = [t for t in tags if t != name]
            else:
                new_tags = tags + [name]
            link = url_for("show_posts", p="/".join(new_tags))
            tags_by_id[r.id] = {"id": r.id, "name": r.name, "selected": selected, "link": link}
        tags_by_post.setdefault(r.post, []).append(tags_by_id[r.id])

    return render_template(
        "posts.html",
        posts=posts, tags=tags_by_id.values(), tags_by_post=tags_by_post,
    )
# XXX - do we need a prefix for paths?
# Or use a separate domain for the api (no)
# Or use an impossible prefix for the api (what would that be?)
#@get("/api")
#@get("/@/add_post")
#@post("/@/add_post")
# also json
#@get("/@/tag/<tag_name_or_id>")
# plus one or more posts for operations, like editing implies, bulk
# updates, etc.
@app.get("/@/login")
def login_form():
    """Show the login form (which could double as a registration form)."""
    page = render_template("login_form.html")
    return page
@app.post("/@/login")
def login_step1():
    """Start a login: store a confirmation token and mail its link.

    Reads the `email` form field, looks up the role with that address
    (inserting one if none exists), inserts a fresh random token into
    confirm_token, commits, and mails the confirmation link to the address.

    Returns the "mail sent" page, or the login form again with status 400
    when the submitted address is empty.
    """
    address = request.form["email"].strip()
    if not address:
        # There is nobody to mail a token to; do not create a role with an
        # empty address.
        return render_template("login_form.html"), 400

    csr = get_cursor()
    csr.execute("select * from role where email = %s", (address,))
    role = csr.fetchone()
    if not role:
        csr.execute("insert into role(email) values(%s) returning *", (address,))
        role = csr.fetchone()

    key = base64.urlsafe_b64encode(os.urandom(15)).decode("us-ascii")
    csr.execute(
        "insert into confirm_token(role, token) values(%s, %s) returning *",
        (role.id, key),
    )
    # Commit before mailing so the token exists by the time the link is used.
    csr.connection.commit()

    # _external=True builds the absolute URL in one step. Concatenating
    # request.root_url (which ends in "/") with url_for() (which starts with
    # "/" and already includes the script root) produced a doubled slash and
    # a doubled prefix when the app is mounted below the server root.
    send_mail(address, url_for("login_confirm", key=key, _external=True))
    return render_template("mail_sent.html")
@app.get("/@/confirm")
def login_confirm():
    """Show the confirmation page reached from the mailed link."""
    # XXX - Check that key isn't expired
    page = render_template("login_confirm.html")
    return page
@app.post("/@/confirm")
def do_confirm():
    """Log the session in as the role that owns the submitted token.

    Expired tokens are deleted first, then the `key` form field is looked
    up in confirm_token. On a match the role's confirmation timestamps are
    updated and its id and name are stored in the session; otherwise the
    "no token" page is rendered.
    """
    cur = get_cursor()
    # Purge expired rows first so the lookup below can only match live tokens.
    cur.execute("delete from confirm_token where expires <= now()")

    lookup_sql = (
        "\n"
        "select id, name from confirm_token t join role r on t.role = r.id\n"
        "where token = %s\n"
    )
    cur.execute(lookup_sql, (request.form["key"],))
    role = cur.fetchone()
    if not role:
        return render_template("no_token.html")

    # NOTE(review): the matched token is not deleted here, so it stays
    # usable until it expires - confirm that this is intended.
    confirm_sql = (
        "\n"
        "update role\n"
        "set first_confirmed = coalesce(first_confirmed, now()),\n"
        "last_confirmed = now()\n"
        "where id = %s\n"
    )
    cur.execute(confirm_sql, (role.id,))
    session["user_id"] = role.id
    session["user_name"] = role.name
    return render_template("welcome.html")
@app.route("/@/logout", methods=["GET", "POST"])
def logout():
    """Forget the logged-in user and redirect to the front page.

    Safe to call without a login: missing session keys are ignored rather
    than raising KeyError.
    """
    session.pop("user_id", None)
    session.pop("user_name", None)
    # 303 makes the browser fetch "/" with GET even after a POST.
    return redirect("/", code=303)
#@get("/~user")
# maybe too oldskool? use @user?
#@get("/~user/token")
#-----------------------------------------------------------------------
# Content generation
@app.template_filter("markdown")
def markdown_to_html(m):
    """Template filter `markdown`: convert Markdown text `m` to HTML markup."""
    rendered = markdown.markdown(m)
    # Wrapping in Markup marks the HTML as safe, so the template engine will
    # not escape it again.
    # NOTE(review): raw HTML in `m` is presumably passed through - confirm
    # that post authors are trusted.
    return Markup(rendered)
def send_mail(email_address, confirmation_url):
    """Mail the confirmation link to `email_address` via the local MTA.

    The message body consists of nothing but `confirmation_url`. Errors
    raised by smtplib propagate to the caller.
    """
    msg = email.message.EmailMessage()
    msg["From"] = "noreply@hjp.at"
    msg["To"] = email_address
    msg["Subject"] = "Zettel confirmation"
    msg.set_content(confirmation_url)  # Really minimalistic
    # The context manager closes the SMTP session even when sending fails;
    # previously the connection was never closed.
    with smtplib.SMTP(host="localhost") as mta:
        mta.send_message(msg)
#-----------------------------------------------------------------------
# Plumbing
def get_roles():
    """Return the ids of all roles the current session acts as.

    The first element is the session's own role id; the rest are the groups
    it belongs to, directly or transitively, according to role_member.

    A session without a user id is assigned the role named "public", and
    that role's id and name are stored in the session.

    Raises:
        RuntimeError: no role named "public" exists.
        TypeError: the user id stored in the session is not an int.
    """
    user_id = session.get("user_id")
    csr = get_cursor()
    if not user_id:
        csr.execute("select * from role where name = %s", ("public",))
        public = csr.fetchone()
        if public is None:
            raise RuntimeError('no role named "public" exists in table role')
        user_id = session["user_id"] = public.id
        session["user_name"] = public.name
    # An explicit check instead of `assert`, which is stripped under -O.
    if not isinstance(user_id, int):
        raise TypeError(f"user_id must be an int, got {type(user_id).__name__}")

    # Walk the membership graph upwards from the user's own role.
    membership_sql = (
        "\n"
        "with recursive r as (\n"
        "select group_role from role_member where member_role = %s\n"
        "union\n"
        "select role_member.group_role from role_member join r on role_member.member_role = r.group_role\n"
        ")\n"
        "select * from r\n"
    )
    csr.execute(membership_sql, (user_id,))
    return [user_id, *(row.group_role for row in csr)]
def get_cursor():
    """Return a new cursor on the current connection that yields named tuples."""
    return get_db().cursor(row_factory=psycopg.rows.namedtuple_row)
def get_db():
    """Return the database connection cached on `g`, connecting on first use."""
    conn = g.get("db")
    if conn is None:
        conn = g.db = psycopg.connect(dbname=config.dbname, user=config.dbuser)
    return conn
@app.teardown_appcontext
def teardown_db(exception):
    """Finish and close the connection cached on `g`, if there is one.

    When the context ended without an exception the open transaction is
    committed; a failing commit is logged instead of being silently
    ignored. In every case the connection is then rolled back and closed.

    `exception` is the exception that ended the context, or None.
    """
    app.logger.debug("in teardown_db: exception = %s", exception)
    db = g.pop("db", None)
    if db is None:
        return
    try:
        if not exception:
            try:
                db.commit()
            except Exception:
                # Still best effort, but leave a trace of the lost commit.
                app.logger.exception("commit failed in teardown_db")
        db.rollback()
    finally:
        # Close even if rollback() raises, so the connection is not leaked.
        db.close()