import logging import uuid import psycopg2 import psycopg2.extras from flask import Flask, request, render_template, render_template_string, session, g from flask_babel import Babel, _ logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(lineno)d | %(message)s", level=logging.DEBUG) app = Flask(__name__) babel = Babel(app) app.secret_key = b"1234" # XXX - Should be in config (not in source, but stable across restarts) log = logging.getLogger(__name__) # Routes @app.route("/") def home(): log.info("in home") app.logger.info("in home (via app.logger)") log.info("greeting = %s", _("hello")) user_public_id = None if "uuid" not in session: session["uuid"], user_public_id = new_user() if not user_public_id: csr = get_cursor() csr.execute("select * from users where uuid = %s", (session["uuid"],)) user_public_id = csr.fetchone().public_id # Can this fail? log.info("session = %s", session) return render_template("home.html", public_id=user_public_id) @app.route("/entry_form", methods=["POST"]) def entry_form(): log.info("session = %s", session) user_uuid = session["uuid"] log.info("user_uuid = %s", user_uuid) csr = get_cursor() csr.execute( "select * from country_names natural join countries where lang=%s order by population desc", (get_locale(),)) f = "" for r in csr.fetchall(): csr.execute( "select * from entries where uuid = %s and country = %s", (user_uuid, r.code,)) f += render_template("entry_row.html", code=r.code, name=r.name, entries=csr.fetchall()) return f @app.route("/update_entries", methods=["POST"]) def update_entries(): csr = get_cursor() log.debug(request.form) csr = get_cursor() r = "" for k, v in request.form.items(): (band, code, id) = k.split("_") if id == "new": csr.execute( "insert into entries(country, uuid, band) values(%s, %s, %s) returning id", (code, session["uuid"], v,)) id = csr.fetchone().id extra = render_template_string( """""", code=code) else: csr.execute( "update entries set band = %s where id = %s and uuid = %s", (v, id, session["uuid"],)) extra = "" r += render_template_string( """""", code=code, id=id, band=v) r += extra return r @app.route("/result/") def result(public_id): lang = get_locale() with get_dict_cursor() as csr: csr.execute( """ select country, name, band, count(*) as total, count(*) filter(where public_id = %s) as by_user from entries e join countries c on c.code = e.country join country_names cn on e.country = cn.code and cn.lang = %s join users u using (uuid) group by country, name, population, band order by population desc, name, band """, (public_id, lang,)) country_stats = [] for r in csr: if not country_stats or country_stats[-1]["code"] != r["country"]: country_stats.append({"code": r["country"], "name": r["name"], "bands": []}) country_stats[-1]["bands"].append({"name": r["band"], "total_count": r["total"], "by_user": bool(r["by_user"])}) stats = { "countries": country_stats } csr.execute( """ with a as ( select public_id, count(distinct(country)) as c from entries join users using (uuid) group by 1) select public_id, c, rank() over (order by c desc) from a; """ ) for r in csr: if r["public_id"] == public_id: stats["country_rank"] = r["rank"] stats["country_count"] = r["c"] stats["user_count"] = csr.rowcount csr.execute( """ with a as ( select public_id, count(*) as c from entries join users using (uuid) group by 1) select public_id, c, rank() over (order by c desc) from a; """ ) for r in csr: if r["public_id"] == public_id: # XXX - clobbering previous results? stats["band_rank"] = r["rank"] stats["band_count"] = r["c"] ask_mail = session["uuid"].endswith(public_id) log.debug("uuid = %s, public_id=%s, ask_mail=%s", session["uuid"], public_id, ask_mail) return render_template("result.html", public_id=public_id, stats=stats, ask_mail=ask_mail) @app.route("/add-email", methods=["POST"]) def add_email(): csr = get_cursor() csr.execute("update users set email=%s where uuid = %s", (request.form["email"], session["uuid"],)) if csr.rowcount: return render_template_string(_("Saved. Expect mail at {{email}}"), email=request.form["email"]) else: return _("You don't exist. Go away!") # Middleware @babel.localeselector def get_locale(): lang = request.accept_languages.best_match(["de", "en"]) log.info("get_locale: best language is %s", lang) return lang # Helpers def get_dict_cursor(): db = get_db() csr = db.cursor(cursor_factory=psycopg2.extras.RealDictCursor) return csr def get_cursor(): db = get_db() csr = db.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) return csr def get_db(): if "db" not in g: g.db = psycopg2.connect("dbname=internationale") return g.db @app.teardown_appcontext def teardown_db(exception): log.info("in teardown_db: exception = %s", exception) db = g.pop('db', None) if db is not None: if not exception: try: db.commit() except: pass db.rollback() db.close() def new_user(): user_uuid = str(uuid.uuid4()) user_public_id = user_uuid[-12:] csr = get_cursor() csr.execute("insert into users(uuid, public_id) values(%s, %s)", (user_uuid, user_public_id,)) return user_uuid, user_public_id # vim: tw=99