import base64 import email.message import os import smtplib import markdown import psycopg from flask import Flask, render_template, g, url_for, session, request from markupsafe import Markup import config app = Flask(__name__) app.config.from_object(config) #----------------------------------------------------------------------- # HTTP endpoints @app.get("/") def index(): return show_posts("") @app.get("/") def show_posts(p): roles = get_roles() app.logger.info(f"{p}") tags = p.split("/") csr = get_cursor() csr.execute("select * from tag where lower(name) = any(%s)", (tags,)) tag_ids = [r.id for r in csr] # XXX - Check that tag_ids has the same length as tags. # Otherwise # - return 404? # - redirect? q = "select distinct p.* from post p" for tag_id in tag_ids: # we got tag_id from the database so we know it's an int q += f" join tag_post t{tag_id} on p.id = t{tag_id}.post and t{tag_id}.tag = {tag_id}" # and the user_id is an int, too q += f" join acl on p.id = acl.post and acl.role = any(%s)" q += " order by p.ts desc" csr.execute(q, (roles,)) posts = csr.fetchall() # XXX find all tags for each post post_ids = [p.id for p in posts] csr.execute( "select * from tag_post tp join tag t on tp.tag = t.id where post = any(%s) order by name", (post_ids,)) tags_by_id = {} tags_by_post = {} for r in csr: if r.id not in tags_by_id: selected = r.id in tag_ids if selected: # XXX - should really use id new_tags = [t for t in tags if t != r.name.lower()] else: new_tags = tags + [r.name.lower()] link = url_for("show_posts", p="/".join(new_tags)) tags_by_id[r.id] = { "id": r.id, "name": r.name, "selected": selected, "link": link } tags_by_post.setdefault(r.post, []).append(tags_by_id[r.id]) tags = tags_by_id.values() return render_template( "posts.html", posts=posts, tags=tags, tags_by_post=tags_by_post, ) # XXX - do we need a prefix for paths? # Or use a separate domain for the api (no) # Or use an impossible prefix for the api (what would that be?) #@get("/api") #@get("/@/add_post") #@post("/@/add_post") # also json #@get("/@/tag/") # plus one or more posts for operations, like editing implies, bulk # updates, etc. @app.get("/@/login") def login_form(): # could duplicate as registration return render_template("login_form.html") @app.post("/@/login") def login_step1(): csr = get_cursor() email = request.form["email"].strip() csr.execute("select * from role where email = %s", (email,)) r = csr.fetchone() if not r: csr.execute("insert into role(email) values(%s) returning *", (email,)) r = csr.fetchone() user_id = r.id key = base64.urlsafe_b64encode(os.urandom(15)).decode('us-ascii') csr.execute("insert into confirm_token(role, token) values(%s, %s) returning *", (user_id, key,)) r = csr.fetchone() csr.connection.commit() send_mail(email, request.root_url + url_for("login_confirm", key=key)) return render_template("mail_sent.html") @app.get("/@/confirm") def login_confirm(): # XXX - Check that key isn't expired return render_template("login_confirm.html") @app.post("/@/confirm") def do_confirm(): csr = get_cursor() csr.execute("delete from confirm_token where expires <= now()") csr.execute( """ select id, name from confirm_token t join role r on t.role = r.id where token = %s """, (request.form["key"],)) r = csr.fetchone() if r: csr.execute( """ update role set first_confirmed = coalesce(first_confirmed, now()), last_confirmed = now() where id = %s """, (r.id,)) session["user_id"] = r.id session["user_name"] = r.name return render_template("welcome.html") else: return render_template("no_token.html") @app.route("/@/logout", methods=["GET", "POST"]) def logout(): del session["user_id"] del session["user_name"] return redirect("/", code=303) #@get("/~user") # maybe too oldskool? use @user? #@get("/~user/token") #----------------------------------------------------------------------- # Content generation @app.template_filter("markdown") def markdown_to_html(m): return Markup(markdown.markdown(m)) def send_mail(email_address, confirmation_url): msg = email.message.EmailMessage() msg["From"] ="noreply@hjp.at" msg["To"] = email_address msg["Subject"] = "Zettel confirmation" body = confirmation_url # Really minimalistic msg.set_content(body) mta = smtplib.SMTP(host="localhost") mta.send_message(msg) #----------------------------------------------------------------------- # Plumbing def get_roles(): user_id = session.get("user_id") csr = get_cursor() if not user_id: csr.execute("select * from role where name = %s", ("public",)) r = csr.fetchone() user_id = session["user_id"] = r.id session["user_name"] = r.name assert type(user_id) == int roles = [user_id] csr.execute( """ with recursive r as ( select group_role from role_member where member_role = %s union select role_member.group_role from role_member join r on role_member.member_role = r.group_role ) select * from r """, (user_id,)) for r in csr: roles.append(r.group_role) return roles def get_cursor(): db = get_db() csr = db.cursor(row_factory=psycopg.rows.namedtuple_row) return csr def get_db(): if "db" not in g: g.db = psycopg.connect(dbname=config.dbname, user=config.dbuser) return g.db @app.teardown_appcontext def teardown_db(exception): app.logger.debug("in teardown_db: exception = %s", exception) db = g.pop('db', None) if db is not None: if not exception: try: db.commit() except: pass db.rollback() db.close()