i12e/htmx/app.py

241 lines
8.2 KiB
Python

import email.message
import json
import logging
import smtplib
import uuid
import psycopg2
import psycopg2.extras
from flask import Flask, request, render_template, render_template_string, session, g, url_for
from flask_babel import Babel, _
import config
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(lineno)d | %(message)s", level=logging.DEBUG)
app = Flask(__name__)
babel = Babel(app)
app.secret_key = config.secret_key
log = logging.getLogger(__name__)
# Routes
@app.route("/")
def home():
log.info("in home")
app.logger.info("in home (via app.logger)")
log.info("greeting = %s", _("hello"))
user_public_id = None
if "uuid" not in session:
session["uuid"], user_public_id = new_user()
if not user_public_id:
csr = get_cursor()
csr.execute("select * from users where uuid = %s", (session["uuid"],))
user_public_id = csr.fetchone().public_id # Can this fail?
log.info("session = %s", session)
return render_template("home.html", public_id=user_public_id)
@app.route("/entry_form", methods=["POST"])
def entry_form():
log.info("session = %s", session)
user_uuid = session["uuid"]
log.info("user_uuid = %s", user_uuid)
csr = get_cursor()
csr.execute(
"select * from country_names natural join countries where lang=%s order by population desc",
(get_locale(),))
f = ""
for r in csr.fetchall():
csr.execute(
"select * from entries where uuid = %s and country = %s",
(user_uuid, r.code,))
f += render_template("entry_row.html", code=r.code, name=r.name, entries=csr.fetchall())
return f
@app.route("/update_entries", methods=["POST"])
def update_entries():
csr = get_cursor()
log.debug(request.form)
csr = get_cursor()
r = ""
for k, v in request.form.items():
(band, code, id) = k.split("_")
if id == "new":
csr.execute(
"insert into entries(country, uuid, band) values(%s, %s, %s) returning id",
(code, session["uuid"], v,))
id = csr.fetchone().id
extra = render_template_string(
"""<input name="band_{{code}}_new" hx-trigger="change" hx-post="/update_entries" hx-swap="outerHTML" autofocus>""",
code=code)
else:
csr.execute(
"update entries set band = %s where id = %s and uuid = %s",
(v, id, session["uuid"],))
extra = ""
r += render_template_string(
"""<input name="band_{{code}}_{{id}}" value="{{band}}" hx-trigger="change" hx-post="/update_entries" hx-swap="outerHTML">""",
code=code, id=id, band=v)
r += extra
return r
@app.route("/result/<public_id>")
def result(public_id):
lang = get_locale()
with get_dict_cursor() as csr:
csr.execute(
"""
select country, name,
band,
count(*) as total,
count(*) filter(where public_id = %s) as by_user
from entries e
join countries c on c.code = e.country
join country_names cn on e.country = cn.code and cn.lang = %s
join users u using (uuid)
group by country, name, population, band
order by population desc, name, band
""",
(public_id, lang,))
country_stats = []
for r in csr:
if not country_stats or country_stats[-1]["code"] != r["country"]:
country_stats.append({"code": r["country"],
"name": r["name"],
"bands": []})
country_stats[-1]["bands"].append({"name": r["band"],
"total_count": r["total"],
"by_user": bool(r["by_user"])})
stats = { "countries": country_stats }
csr.execute(
"""
with a as (
select public_id, count(distinct(country)) as c
from entries join users using (uuid)
group by 1)
select public_id, c, rank() over (order by c desc) from a;
"""
)
for r in csr:
if r["public_id"] == public_id:
stats["country_rank"] = r["rank"]
stats["country_count"] = r["c"]
stats["user_count"] = csr.rowcount
csr.execute(
"""
with a as (
select public_id, count(*) as c
from entries join users using (uuid)
group by 1)
select public_id, c, rank() over (order by c desc) from a;
"""
)
for r in csr:
if r["public_id"] == public_id:
# XXX - clobbering previous results?
stats["band_rank"] = r["rank"]
stats["band_count"] = r["c"]
uuid = session.get("uuid", "")
ask_mail = uuid.endswith(public_id)
log.debug("uuid = %s, public_id=%s, ask_mail=%s", uuid, public_id, ask_mail)
return render_template("result.html", public_id=public_id, stats=stats, ask_mail=ask_mail)
@app.route("/add-email", methods=["POST"])
def add_email():
csr = get_cursor()
csr.execute("update users set email=%s, lang=%s, confirmation_sent_ts=now() where uuid = %s",
(request.form["email"], get_locale(), session["uuid"],))
if csr.rowcount:
send_mail(request.form["email"], session["uuid"])
return render_template_string(_("Saved. Expect mail at {{email}}"),
email=request.form["email"])
else:
return _("<span>You don't exist. Go away!</span>")
@app.route("/confirm-email/<uuid>", methods=["GET"])
def confirm_email_ask(uuid):
return render_template("confirm.html", uuid=uuid)
@app.route("/confirm-email/<uuid>", methods=["POST"])
def confirm_email_do(uuid):
csr = get_cursor()
csr.execute("update users set confirmation_received_ts=now(), confirmation_info=%s where uuid=%s",
(json.dumps({"ip": request.remote_addr, "ua": request.user_agent.string}), uuid))
return "<p>" + _("Thanks!") + "</p>"
# Middleware
@babel.localeselector
def get_locale():
lang = request.accept_languages.best_match(["de", "en"])
log.info("get_locale: best language is %s", lang)
return lang
# Helpers
def get_dict_cursor():
db = get_db()
csr = db.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return csr
def get_cursor():
db = get_db()
csr = db.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
return csr
def get_db():
if "db" not in g:
g.db = psycopg2.connect("dbname=internationale")
return g.db
@app.teardown_appcontext
def teardown_db(exception):
log.info("in teardown_db: exception = %s", exception)
db = g.pop('db', None)
if db is not None:
if not exception:
try:
db.commit()
except:
pass
db.rollback()
db.close()
def new_user():
lang = get_locale()
user_uuid = str(uuid.uuid4())
user_public_id = user_uuid[-12:]
csr = get_cursor()
csr.execute("insert into users(uuid, public_id, lang) values(%s, %s, %s)",
(user_uuid, user_public_id, lang))
return user_uuid, user_public_id
def send_mail(mailaddress, uuid):
msg = email.message.EmailMessage()
msg["From"] ="i12e@hjp.at"
msg["To"] = mailaddress
msg["Subject"] = _("The Musical Internatiionale: Confirm mail address")
confirmation_url = url_for("confirm_email_ask", uuid=uuid, _external=True)
body = _(
"Hello,\n"
"\n"
"somebody requested news about new developments at https://i12e.hjp.at/ to be sent to\n"
"{mailaddress}.\n"
"\n"
"To confirm that that this was you, please visit this url:\n"
"{url}\n"
"\n"
"With musical greetings\n"
" I12E Bot\n"
"").format(mailaddress=mailaddress, url=confirmation_url)
msg.set_content(body)
mta = smtplib.SMTP(host="localhost")
mta.send_message(msg)
# vim: tw=99