import logging
import uuid
import psycopg2
import psycopg2.extras
from flask import Flask, request, render_template, render_template_string, session, g
from flask_babel import Babel, _
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(lineno)d | %(message)s", level=logging.DEBUG)
app = Flask(__name__)
babel = Babel(app)
app.secret_key = b"1234" # XXX - Should be in config (not in source, but stable across restarts)
log = logging.getLogger(__name__)
# Routes
@app.route("/")
def home():
log.info("in home")
app.logger.info("in home (via app.logger)")
log.info("greeting = %s", _("hello"))
user_public_id = None
if "uuid" not in session:
session["uuid"], user_public_id = new_user()
if not user_public_id:
csr = get_cursor()
csr.execute("select * from users where uuid = %s", (session["uuid"],))
user_public_id = csr.fetchone().public_id # Can this fail?
log.info("session = %s", session)
return render_template("home.html", public_id=user_public_id)
@app.route("/entry_form", methods=["POST"])
def entry_form():
log.info("session = %s", session)
user_uuid = session["uuid"]
log.info("user_uuid = %s", user_uuid)
csr = get_cursor()
csr.execute(
"select * from country_names natural join countries where lang=%s order by population desc",
(get_locale(),))
f = ""
for r in csr.fetchall():
csr.execute(
"select * from entries where uuid = %s and country = %s",
(user_uuid, r.code,))
f += render_template("entry_row.html", code=r.code, name=r.name, entries=csr.fetchall())
return f
@app.route("/update_entries", methods=["POST"])
def update_entries():
csr = get_cursor()
log.debug(request.form)
csr = get_cursor()
r = ""
for k, v in request.form.items():
(band, code, id) = k.split("_")
if id == "new":
csr.execute(
"insert into entries(country, uuid, band) values(%s, %s, %s) returning id",
(code, session["uuid"], v,))
id = csr.fetchone().id
extra = render_template_string(
"""""",
code=code)
else:
csr.execute(
"update entries set band = %s where id = %s and uuid = %s",
(v, id, session["uuid"],))
extra = ""
r += render_template_string(
"""""",
code=code, id=id, band=v)
r += extra
return r
@app.route("/result/")
def result(public_id):
lang = get_locale()
with get_dict_cursor() as csr:
csr.execute(
"""
select country, name,
band,
count(*) as total,
count(*) filter(where public_id = %s) as by_user
from entries e
join countries c on c.code = e.country
join country_names cn on e.country = cn.code and cn.lang = %s
join users u using (uuid)
group by country, name, population, band
order by population desc, name, band
""",
(public_id, lang,))
country_stats = []
for r in csr:
if not country_stats or country_stats[-1]["code"] != r["country"]:
country_stats.append({"code": r["country"],
"name": r["name"],
"bands": []})
country_stats[-1]["bands"].append({"name": r["band"],
"total_count": r["total"],
"by_user": bool(r["by_user"])})
stats = { "countries": country_stats }
csr.execute(
"""
with a as (
select public_id, count(distinct(country)) as c
from entries join users using (uuid)
group by 1)
select public_id, c, rank() over (order by c desc) from a;
"""
)
for r in csr:
if r["public_id"] == public_id:
stats["country_rank"] = r["rank"]
stats["country_count"] = r["c"]
stats["user_count"] = csr.rowcount
csr.execute(
"""
with a as (
select public_id, count(*) as c
from entries join users using (uuid)
group by 1)
select public_id, c, rank() over (order by c desc) from a;
"""
)
for r in csr:
if r["public_id"] == public_id:
# XXX - clobbering previous results?
stats["band_rank"] = r["rank"]
stats["band_count"] = r["c"]
ask_mail = session["uuid"].endswith(public_id)
log.debug("uuid = %s, public_id=%s, ask_mail=%s", session["uuid"], public_id, ask_mail)
return render_template("result.html", public_id=public_id, stats=stats, ask_mail=ask_mail)
@app.route("/add-email", methods=["POST"])
def add_email():
csr = get_cursor()
csr.execute("update users set email=%s where uuid = %s", (request.form["email"], session["uuid"],))
if csr.rowcount:
return render_template_string(_("Saved. Expect mail at {{email}}"),
email=request.form["email"])
else:
return _("You don't exist. Go away!")
# Middleware
@babel.localeselector
def get_locale():
lang = request.accept_languages.best_match(["de", "en"])
log.info("get_locale: best language is %s", lang)
return lang
# Helpers
def get_dict_cursor():
db = get_db()
csr = db.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return csr
def get_cursor():
db = get_db()
csr = db.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
return csr
def get_db():
if "db" not in g:
g.db = psycopg2.connect("dbname=internationale")
return g.db
@app.teardown_appcontext
def teardown_db(exception):
log.info("in teardown_db: exception = %s", exception)
db = g.pop('db', None)
if db is not None:
if not exception:
try:
db.commit()
except:
pass
db.rollback()
db.close()
def new_user():
user_uuid = str(uuid.uuid4())
user_public_id = user_uuid[-12:]
csr = get_cursor()
csr.execute("insert into users(uuid, public_id) values(%s, %s)", (user_uuid, user_public_id,))
return user_uuid, user_public_id
# vim: tw=99