220 lines
6.3 KiB
Python
220 lines
6.3 KiB
Python
import base64
|
|
import email.message
|
|
import os
|
|
import smtplib
|
|
|
|
import markdown
|
|
import psycopg
|
|
|
|
from flask import Flask, render_template, g, url_for, session, request
|
|
from markupsafe import Markup
|
|
|
|
import config
|
|
|
|
# The Flask application object; the route, template-filter and teardown
# decorators in this file all register on it.
app = Flask(__name__)
|
|
# Load the application settings from the project-local config module.
app.config.from_object(config)
|
|
|
|
#-----------------------------------------------------------------------
|
|
|
|
# HTTP endpoints
|
|
|
|
@app.get("/")
def index():
    """Serve the front page: the post listing with an empty tag path."""
    no_tag_path = ""
    return show_posts(no_tag_path)
|
|
|
|
@app.get("/<path:p>")
def show_posts(p):
    """List the posts the current user may see, filtered by the tags in *p*.

    *p* is a "/"-separated list of tag names, matched against
    ``lower(name)`` in the ``tag`` table. A post is listed when it is joined
    to every tag that was found and has an ``acl`` row for one of the roles
    returned by get_roles(). Posts are ordered by ``ts`` descending.

    The template also receives every tag attached to the listed posts. Each
    tag carries a toggle link: a selected tag links to the path without it,
    an unselected tag links to the path with it appended.

    Returns the rendered ``posts.html`` page.
    """
    roles = get_roles()
    app.logger.info(f"{p}")

    # Drop empty segments. The front page passes "" (which splits to [""]),
    # and a trailing or doubled slash adds more. An empty entry would end up
    # in the generated links as a leading "/", producing "//tag" URLs.
    tags = [t for t in p.split("/") if t]

    csr = get_cursor()
    if tags:
        csr.execute("select * from tag where lower(name) = any(%s)", (tags,))
        tag_ids = [r.id for r in csr]
    else:
        # Nothing to look up; avoid a query with an empty array.
        tag_ids = []
    # XXX - Check that tag_ids has the same length as tags.
    # Otherwise
    #  - return 404?
    #  - redirect?

    # One join per selected tag, so a post must carry all of them. Only the
    # table alias is formatted into the SQL text (aliases cannot be bound);
    # the tag ids themselves are passed as parameters.
    tag_joins = "".join(
        f" join tag_post t{i} on p.id = t{i}.post and t{i}.tag = %s"
        for i, _ in enumerate(tag_ids)
    )
    q = (
        "select distinct p.* from post p"
        + tag_joins
        + " join acl on p.id = acl.post and acl.role = any(%s)"
        + " order by p.ts desc"
    )
    csr.execute(q, (*tag_ids, roles))
    posts = csr.fetchall()

    # Collect the tags of all listed posts, both per post and overall.
    post_ids = [post.id for post in posts]
    csr.execute(
        "select * from tag_post tp join tag t on tp.tag = t.id where post = any(%s) order by name",
        (post_ids,))
    selected_ids = set(tag_ids)
    tags_by_id = {}
    tags_by_post = {}
    for r in csr:
        if r.id not in tags_by_id:
            lowered = r.name.lower()
            selected = r.id in selected_ids
            if selected:
                # XXX - should really use id
                new_tags = [t for t in tags if t != lowered]
            else:
                new_tags = tags + [lowered]
            link = url_for("show_posts", p="/".join(new_tags))
            tags_by_id[r.id] = { "id": r.id, "name": r.name, "selected": selected, "link": link }
        tags_by_post.setdefault(r.post, []).append(tags_by_id[r.id])

    return render_template(
        "posts.html",
        posts=posts, tags=tags_by_id.values(), tags_by_post=tags_by_post,
    )
|
|
|
|
|
|
# XXX - do we need a prefix for paths?
|
|
# Or use a separate domain for the api (no)
|
|
# Or use an impossible prefix for the api (what would that be?)
|
|
#@get("/api")
|
|
|
|
#@get("/@/add_post")
|
|
#@post("/@/add_post")
|
|
# also json
|
|
|
|
#@get("/@/tag/<tag_name_or_id>")
|
|
# plus one or more posts for operations, like editing implies, bulk
|
|
# updates, etc.
|
|
|
|
@app.get("/@/login")
def login_form():
    """Render the login form page."""
    # could duplicate as registration
    template_name = "login_form.html"
    return render_template(template_name)
|
|
|
|
@app.post("/@/login")
def login_step1():
    """Start an e-mail login: store a confirmation token and mail the link.

    Reads the ``email`` form field, looks up the role with that address
    (inserting a new role row if there is none), stores a fresh random token
    for that role in ``confirm_token``, commits, and mails a link to the
    confirmation page containing the token.

    Returns the rendered ``mail_sent.html`` page.
    """
    csr = get_cursor()
    # Called ``address`` so the local does not shadow the ``email`` module.
    address = request.form["email"].strip()
    csr.execute("select * from role where email = %s", (address,))
    role = csr.fetchone()
    if not role:
        csr.execute("insert into role(email) values(%s) returning *", (address,))
        role = csr.fetchone()

    # 15 random bytes encode to exactly 20 URL-safe characters, no padding.
    key = base64.urlsafe_b64encode(os.urandom(15)).decode('us-ascii')
    csr.execute("insert into confirm_token(role, token) values(%s, %s)", (role.id, key))
    # Commit before mailing so the token exists when the link is followed.
    csr.connection.commit()

    # Let Flask build the absolute URL. Concatenating request.root_url with
    # url_for() doubles the slash (and any script root) between the two.
    confirmation_url = url_for("login_confirm", key=key, _external=True)
    send_mail(address, confirmation_url)
    return render_template("mail_sent.html")
|
|
|
|
@app.get("/@/confirm")
def login_confirm():
    """Render the confirmation page; the token is not checked here."""
    # XXX - Check that key isn't expired
    template_name = "login_confirm.html"
    return render_template(template_name)
|
|
|
|
@app.post("/@/confirm")
def do_confirm():
    """Complete a login from the token submitted in the ``key`` form field.

    Expired tokens are deleted first. If the submitted token still exists,
    the role's confirmation timestamps are updated, its id and name are
    stored in the session, and ``welcome.html`` is rendered. Otherwise
    ``no_token.html`` is rendered.
    """
    cursor = get_cursor()

    # Purge expired tokens so the lookup below can only hit live ones.
    cursor.execute("delete from confirm_token where expires <= now()")

    lookup_sql = """
            select id, name from confirm_token t join role r on t.role = r.id
            where token = %s
        """
    cursor.execute(lookup_sql, (request.form["key"],))
    role = cursor.fetchone()
    if not role:
        return render_template("no_token.html")

    # first_confirmed is only set once; last_confirmed on every confirmation.
    stamp_sql = """
            update role
            set first_confirmed = coalesce(first_confirmed, now()),
                last_confirmed = now()
            where id = %s
        """
    cursor.execute(stamp_sql, (role.id,))

    session["user_id"] = role.id
    session["user_name"] = role.name
    return render_template("welcome.html")
|
|
|
|
@app.route("/@/logout", methods=["GET", "POST"])
def logout():
    """Forget the logged-in user and redirect to the front page.

    Removes ``user_id`` and ``user_name`` from the session and answers with
    a 303 redirect to "/".
    """
    # Imported here because the file-level flask import does not include
    # redirect; without it this handler fails with a NameError.
    from flask import redirect

    # pop() with a default instead of del: a request whose session lacks
    # these keys must not fail with a KeyError.
    session.pop("user_id", None)
    session.pop("user_name", None)
    return redirect("/", code=303)
|
|
|
|
#@get("/~user")
|
|
# maybe too oldskool? use @user?
|
|
#@get("/~user/token")
|
|
|
|
#-----------------------------------------------------------------------
|
|
|
|
# Content generation
|
|
|
|
@app.template_filter("markdown")
def markdown_to_html(m):
    """Template filter ``markdown``: convert Markdown text *m* to HTML.

    The result is wrapped in Markup, so templates insert it without
    escaping.
    """
    # NOTE(review): the converter output is trusted as-is; confirm that the
    # Markdown passed in cannot carry unwanted raw HTML.
    html = markdown.markdown(m)
    return Markup(html)
|
|
|
|
def send_mail(email_address, confirmation_url):
    """Mail *confirmation_url* to *email_address* via the SMTP server on localhost.

    The message body consists of the URL only.

    Raises whatever smtplib raises when connecting or sending fails.
    """
    msg = email.message.EmailMessage()
    msg["From"] = "noreply@hjp.at"
    msg["To"] = email_address
    msg["Subject"] = "Zettel confirmation"
    body = confirmation_url # Really minimalistic
    msg.set_content(body)
    # The context manager sends QUIT and closes the connection even when
    # sending fails; previously the connection was never closed.
    with smtplib.SMTP(host="localhost") as mta:
        mta.send_message(msg)
|
|
|
|
#-----------------------------------------------------------------------
|
|
|
|
# Plumbing
|
|
|
|
def get_roles():
    """Return the ids of all roles that apply to the current session.

    If the session has no ``user_id``, the role named "public" is looked up
    and stored in the session as ``user_id`` / ``user_name``. The result is
    a list starting with that user id, followed by every group role reached
    by following ``role_member`` from member to group, transitively.

    Raises:
        RuntimeError: no session user and no role named "public" exists.
        TypeError: the session's ``user_id`` is not an int.
    """
    user_id = session.get("user_id")
    csr = get_cursor()
    if not user_id:
        csr.execute("select * from role where name = %s", ("public",))
        public = csr.fetchone()
        if public is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise RuntimeError('no role named "public" found')
        user_id = session["user_id"] = public.id
        session["user_name"] = public.name
    # An explicit check instead of an assert, which is stripped under -O.
    if not isinstance(user_id, int):
        raise TypeError(f"session user_id must be an int, got {type(user_id).__name__}")

    roles = [user_id]
    # Walk the membership graph upwards; "union" (not "union all") drops
    # duplicates, which also lets the recursion terminate on cycles.
    csr.execute(
        """
            with recursive r as (
                select group_role from role_member where member_role = %s
                union
                select role_member.group_role from role_member join r on role_member.member_role = r.group_role
            )
            select * from r
        """,
        (user_id,))
    roles.extend(row.group_role for row in csr)
    return roles
|
|
|
|
def get_cursor():
    """Return a new cursor on the request's connection that yields named tuples."""
    return get_db().cursor(row_factory=psycopg.rows.namedtuple_row)
|
|
|
|
def get_db():
    """Return the database connection stored on ``g``, opening it on first use."""
    if "db" in g:
        return g.db
    # First call in this application context: connect and remember it.
    g.db = psycopg.connect(dbname=config.dbname, user=config.dbuser)
    return g.db
|
|
|
|
@app.teardown_appcontext
def teardown_db(exception):
    """Finish and close the connection stored on ``g``, if there is one.

    Args:
        exception: the exception Flask passes to teardown handlers, or None.

    When no exception is given the transaction is committed. A failing
    commit is logged rather than raised. The connection is then rolled back
    and closed in every case.
    """
    app.logger.debug("in teardown_db: exception = %s", exception)
    db = g.pop('db', None)
    if db is None:
        return
    try:
        if not exception:
            try:
                db.commit()
            except Exception:
                # Best effort, but leave a trace. A bare "except: pass" hid
                # lost writes and also caught KeyboardInterrupt/SystemExit.
                app.logger.exception("commit failed in teardown_db")
        # No-op after a successful commit; otherwise discards pending work.
        db.rollback()
    finally:
        # Close the connection even if the rollback itself fails.
        db.close()
|
|
|