"""Flask web application storing per-user entries (country + band) in PostgreSQL.

Visitors are identified by a UUID kept in the Flask session; a shorter
``public_id`` (the last 12 characters of that UUID) is used in result URLs.
"""

import email.message
import json
import logging
import smtplib
import uuid

import psycopg2
import psycopg2.extras
from flask import Flask, request, render_template, render_template_string, session, g, url_for
from flask_babel import Babel, _

logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(lineno)d | %(message)s", level=logging.DEBUG)

app = Flask(__name__)
babel = Babel(app)
app.secret_key = b"1234"  # XXX - Should be in config (not in source, but stable across restarts)

log = logging.getLogger(__name__)


# Routes

@app.route("/")
def home():
    """Render the home page, creating a user for sessions that have none.

    Returns:
        The rendered ``home.html`` template, given the user's public id.
    """
    log.info("in home")
    app.logger.info("in home (via app.logger)")
    log.info("greeting = %s", _("hello"))

    user_public_id = None
    if "uuid" not in session:
        session["uuid"], user_public_id = new_user()
    if not user_public_id:
        csr = get_cursor()
        csr.execute("select * from users where uuid = %s", (session["uuid"],))
        row = csr.fetchone()
        if row is None:
            # The session refers to a user that is not in the database;
            # start over with a fresh user instead of failing on None.
            session["uuid"], user_public_id = new_user()
        else:
            user_public_id = row.public_id
    log.info("session = %s", session)
    return render_template("home.html", public_id=user_public_id)


@app.route("/entry_form", methods=["POST"])
def entry_form():
    """Render one ``entry_row.html`` fragment per country for the session user.

    Countries are read in the current locale's language, ordered by
    population (descending); each row gets the user's entries for it.

    Returns:
        The concatenated HTML fragments as one string.

    Raises:
        KeyError: if the session carries no ``uuid``.
    """
    log.info("session = %s", session)
    user_uuid = session["uuid"]
    log.info("user_uuid = %s", user_uuid)
    csr = get_cursor()
    csr.execute(
        "select * from country_names natural join countries where lang=%s order by population desc",
        (get_locale(),))
    rows = []
    # fetchall() materializes the country list first, so the same cursor can
    # be reused for the per-country query inside the loop.
    for r in csr.fetchall():
        csr.execute(
            "select * from entries where uuid = %s and country = %s",
            (user_uuid, r.code,))
        rows.append(
            render_template("entry_row.html", code=r.code, name=r.name, entries=csr.fetchall()))
    return "".join(rows)


@app.route("/update_entries", methods=["POST"])
def update_entries():
    """Insert or update the session user's entries from the posted form.

    Each form key must consist of exactly three ``_``-separated parts; the
    second is the country code and the third is either an entry id or the
    literal ``new``. The form value is stored as the band.

    Returns:
        The concatenated output of the inline templates, one per form field.

    Raises:
        ValueError: if a form key does not split into exactly three parts.
        KeyError: if the session carries no ``uuid``.
    """
    log.debug(request.form)
    csr = get_cursor()
    parts = []
    for k, v in request.form.items():
        (_band, code, entry_id) = k.split("_")
        if entry_id == "new":
            csr.execute(
                "insert into entries(country, uuid, band) values(%s, %s, %s) returning id",
                (code, session["uuid"], v,))
            entry_id = csr.fetchone().id
            # NOTE(review): this inline template is empty in the source as
            # reviewed, so it renders nothing; presumably markup belongs
            # here -- confirm.
            extra = render_template_string(
                """""",
                code=code)
        else:
            # Restricting the update by uuid keeps a user from changing
            # another user's entry.
            csr.execute(
                "update entries set band = %s where id = %s and uuid = %s",
                (v, entry_id, session["uuid"],))
            extra = ""
        # NOTE(review): this inline template is empty as well -- confirm.
        parts.append(render_template_string(
            """""",
            code=code, id=entry_id, band=v))
        parts.append(extra)
    return "".join(parts)


@app.route("/result/<public_id>")
def result(public_id):
    """Render the statistics page for the user with the given public id.

    Args:
        public_id: Public id taken from the URL.

    Returns:
        The rendered ``result.html`` template. ``stats`` always contains
        ``countries``; the rank/count keys are only set when a matching row
        is found for ``public_id``. ``ask_mail`` is true when the session
        uuid ends with ``public_id``.
    """
    lang = get_locale()
    with get_dict_cursor() as csr:
        csr.execute(
            """
            select
                country, name, band,
                count(*) as total,
                count(*) filter(where public_id = %s) as by_user
            from
                entries e
                join countries c on c.code = e.country
                join country_names cn on e.country = cn.code and cn.lang = %s
                join users u using (uuid)
            group by country, name, population, band
            order by population desc, name, band
            """,
            (public_id, lang,))
        country_stats = []
        # Rows arrive grouped by country, so a new country starts whenever
        # the code differs from the last one appended.
        for r in csr:
            if not country_stats or country_stats[-1]["code"] != r["country"]:
                country_stats.append({"code": r["country"], "name": r["name"], "bands": []})
            country_stats[-1]["bands"].append(
                {"name": r["band"], "total_count": r["total"], "by_user": bool(r["by_user"])})
        stats = {
            "countries": country_stats
        }

        csr.execute(
            """
            with a as (
                select public_id, count(distinct(country)) as c
                from entries join users using (uuid)
                group by 1)
            select public_id, c, rank() over (order by c desc)
            from a;
            """
        )
        for r in csr:
            if r["public_id"] == public_id:
                stats["country_rank"] = r["rank"]
                stats["country_count"] = r["c"]
        # One row per public_id that has entries.
        stats["user_count"] = csr.rowcount

        csr.execute(
            """
            with a as (
                select public_id, count(*) as c
                from entries join users using (uuid)
                group by 1)
            select public_id, c, rank() over (order by c desc)
            from a;
            """
        )
        for r in csr:
            if r["public_id"] == public_id:
                # Writes the band_* keys only, so the country_* results
                # stored above are not clobbered.
                stats["band_rank"] = r["rank"]
                stats["band_count"] = r["c"]

    # Named user_uuid so the imported uuid module is not shadowed.
    user_uuid = session.get("uuid", "")
    ask_mail = user_uuid.endswith(public_id)
    log.debug("uuid = %s, public_id=%s, ask_mail=%s", user_uuid, public_id, ask_mail)
    return render_template("result.html", public_id=public_id, stats=stats, ask_mail=ask_mail)


@app.route("/add-email", methods=["POST"])
def add_email():
    """Store the posted e-mail address for the session user and send a confirmation mail.

    Returns:
        A confirmation message when a user row was updated, otherwise the
        "You don't exist" message (also when the session has no uuid).
    """
    mailaddress = request.form["email"]
    # session.get: without a session uuid the update matches no row and the
    # request ends in the "You don't exist" branch instead of a KeyError.
    user_uuid = session.get("uuid")
    csr = get_cursor()
    csr.execute("update users set email=%s, lang=%s, confirmation_sent_ts=now() where uuid = %s",
                (mailaddress, get_locale(), user_uuid,))
    if not csr.rowcount:
        return _("You don't exist. Go away!")
    send_mail(mailaddress, user_uuid)
    return render_template_string(_("Saved. Expect mail at {{email}}"), email=mailaddress)


@app.route("/confirm-email/<uuid>", methods=["GET"])
def confirm_email_ask(uuid):
    """Render the confirmation page (``confirm.html``) for the uuid in the URL."""
    return render_template("confirm.html", uuid=uuid)


@app.route("/confirm-email/<uuid>", methods=["POST"])
def confirm_email_do(uuid):
    """Record that the e-mail address of the user with this uuid was confirmed.

    Stores the current timestamp plus the client's IP address and user agent
    (as JSON) on the user row.

    Returns:
        The translated "Thanks!" message.
    """
    csr = get_cursor()
    csr.execute("update users set confirmation_received_ts=now(), confirmation_info=%s where uuid=%s",
                (json.dumps({"ip": request.remote_addr, "ua": request.user_agent.string}), uuid))
    # NOTE(review): the literals around the message contain only line breaks
    # in the source as reviewed; presumably HTML markup was intended -- confirm.
    return "\n\n" + _("Thanks!") + "\n\n"


# Middleware

@babel.localeselector
def get_locale():
    """Return the best match of the request's Accept-Language among "de" and "en"."""
    lang = request.accept_languages.best_match(["de", "en"])
    log.info("get_locale: best language is %s", lang)
    return lang


# Helpers

def get_dict_cursor():
    """Return a cursor on the request's connection that yields rows as dicts."""
    db = get_db()
    csr = db.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    return csr


def get_cursor():
    """Return a cursor on the request's connection that yields rows as named tuples."""
    db = get_db()
    csr = db.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
    return csr


def get_db():
    """Return the database connection stored on ``g``, opening it on first use."""
    if "db" not in g:
        g.db = psycopg2.connect("dbname=internationale")
    return g.db


@app.teardown_appcontext
def teardown_db(exception):
    """Finish the transaction and close the connection when the app context ends.

    Commits when the context ended without an exception and rolls back
    otherwise. Database errors are logged rather than raised, and the
    connection is closed in every case.

    Args:
        exception: The exception that ended the context, or ``None``.
    """
    log.info("in teardown_db: exception = %s", exception)
    db = g.pop('db', None)
    if db is None:
        return
    try:
        if exception is None:
            try:
                db.commit()
            except psycopg2.Error:
                log.exception("in teardown_db: commit failed, rolling back")
                db.rollback()
        else:
            db.rollback()
    except psycopg2.Error:
        log.exception("in teardown_db: rollback failed")
    finally:
        db.close()


def new_user():
    """Insert a new user with a random UUID and the current locale.

    Returns:
        A ``(uuid, public_id)`` tuple of strings; ``public_id`` is the last
        12 characters of the UUID.
    """
    lang = get_locale()
    user_uuid = str(uuid.uuid4())
    user_public_id = user_uuid[-12:]
    csr = get_cursor()
    csr.execute("insert into users(uuid, public_id, lang) values(%s, %s, %s)", (user_uuid, user_public_id, lang))
    return user_uuid, user_public_id


def send_mail(mailaddress, uuid):
    """Send the address-confirmation mail through the SMTP server on localhost.

    Args:
        mailaddress: Recipient address.
        uuid: User uuid placed in the confirmation URL.
    """
    msg = email.message.EmailMessage()
    msg["From"] = "i12e@hjp.at"
    msg["To"] = mailaddress
    msg["Subject"] = _("The Musical Internatiionale: Confirm mail address")
    confirmation_url = url_for("confirm_email_ask", uuid=uuid, _external=True)
    body = _(
        "Hello,\n"
        "\n"
        "somebody requested news about new developments at https://i12e.hjp.at/ to be sent to\n"
        "{mailaddress}.\n"
        "\n"
        "To confirm that that this was you, please visit this url:\n"
        "{url}\n"
        "\n"
        "With musical greetings\n"
        " I12E Bot\n"
        "").format(mailaddress=mailaddress, url=confirmation_url)
    msg.set_content(body)
    # The context manager closes the SMTP session even if sending fails.
    with smtplib.SMTP(host="localhost") as mta:
        mta.send_message(msg)

# vim: tw=99