241 lines
8.2 KiB
Python
241 lines
8.2 KiB
Python
import email.message
|
|
import json
|
|
import logging
|
|
import smtplib
|
|
import uuid
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
from flask import Flask, request, render_template, render_template_string, session, g, url_for
|
|
from flask_babel import Babel, _
|
|
|
|
import config
|
|
|
|
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(lineno)d | %(message)s", level=logging.DEBUG)
|
|
|
|
app = Flask(__name__)
|
|
babel = Babel(app)
|
|
app.secret_key = config.secret_key
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Routes
|
|
@app.route("/")
|
|
def home():
|
|
log.info("in home")
|
|
app.logger.info("in home (via app.logger)")
|
|
log.info("greeting = %s", _("hello"))
|
|
user_public_id = None
|
|
if "uuid" not in session:
|
|
session["uuid"], user_public_id = new_user()
|
|
if not user_public_id:
|
|
csr = get_cursor()
|
|
csr.execute("select * from users where uuid = %s", (session["uuid"],))
|
|
user_public_id = csr.fetchone().public_id # Can this fail?
|
|
|
|
log.info("session = %s", session)
|
|
return render_template("home.html", public_id=user_public_id)
|
|
|
|
@app.route("/entry_form", methods=["POST"])
|
|
def entry_form():
|
|
log.info("session = %s", session)
|
|
user_uuid = session["uuid"]
|
|
log.info("user_uuid = %s", user_uuid)
|
|
csr = get_cursor()
|
|
csr.execute(
|
|
"select * from country_names natural join countries where lang=%s order by population desc",
|
|
(get_locale(),))
|
|
f = ""
|
|
for r in csr.fetchall():
|
|
csr.execute(
|
|
"select * from entries where uuid = %s and country = %s",
|
|
(user_uuid, r.code,))
|
|
f += render_template("entry_row.html", code=r.code, name=r.name, entries=csr.fetchall())
|
|
return f
|
|
|
|
@app.route("/update_entries", methods=["POST"])
|
|
def update_entries():
|
|
csr = get_cursor()
|
|
log.debug(request.form)
|
|
csr = get_cursor()
|
|
r = ""
|
|
for k, v in request.form.items():
|
|
(band, code, id) = k.split("_")
|
|
if id == "new":
|
|
csr.execute(
|
|
"insert into entries(country, uuid, band) values(%s, %s, %s) returning id",
|
|
(code, session["uuid"], v,))
|
|
id = csr.fetchone().id
|
|
extra = render_template_string(
|
|
"""<input name="band_{{code}}_new" hx-trigger="change" hx-post="/update_entries" hx-swap="outerHTML" autofocus>""",
|
|
code=code)
|
|
else:
|
|
csr.execute(
|
|
"update entries set band = %s where id = %s and uuid = %s",
|
|
(v, id, session["uuid"],))
|
|
extra = ""
|
|
r += render_template_string(
|
|
"""<input name="band_{{code}}_{{id}}" value="{{band}}" hx-trigger="change" hx-post="/update_entries" hx-swap="outerHTML">""",
|
|
code=code, id=id, band=v)
|
|
r += extra
|
|
return r
|
|
|
|
@app.route("/result/<public_id>")
|
|
def result(public_id):
|
|
|
|
lang = get_locale()
|
|
with get_dict_cursor() as csr:
|
|
csr.execute(
|
|
"""
|
|
select country, name,
|
|
band,
|
|
count(*) as total,
|
|
count(*) filter(where public_id = %s) as by_user
|
|
from entries e
|
|
join countries c on c.code = e.country
|
|
join country_names cn on e.country = cn.code and cn.lang = %s
|
|
join users u using (uuid)
|
|
group by country, name, population, band
|
|
order by population desc, name, band
|
|
""",
|
|
(public_id, lang,))
|
|
country_stats = []
|
|
for r in csr:
|
|
if not country_stats or country_stats[-1]["code"] != r["country"]:
|
|
country_stats.append({"code": r["country"],
|
|
"name": r["name"],
|
|
"bands": []})
|
|
country_stats[-1]["bands"].append({"name": r["band"],
|
|
"total_count": r["total"],
|
|
"by_user": bool(r["by_user"])})
|
|
|
|
stats = { "countries": country_stats }
|
|
csr.execute(
|
|
"""
|
|
with a as (
|
|
select public_id, count(distinct(country)) as c
|
|
from entries join users using (uuid)
|
|
group by 1)
|
|
select public_id, c, rank() over (order by c desc) from a;
|
|
"""
|
|
)
|
|
for r in csr:
|
|
if r["public_id"] == public_id:
|
|
stats["country_rank"] = r["rank"]
|
|
stats["country_count"] = r["c"]
|
|
stats["user_count"] = csr.rowcount
|
|
|
|
csr.execute(
|
|
"""
|
|
with a as (
|
|
select public_id, count(*) as c
|
|
from entries join users using (uuid)
|
|
group by 1)
|
|
select public_id, c, rank() over (order by c desc) from a;
|
|
"""
|
|
)
|
|
for r in csr:
|
|
if r["public_id"] == public_id:
|
|
# XXX - clobbering previous results?
|
|
stats["band_rank"] = r["rank"]
|
|
stats["band_count"] = r["c"]
|
|
|
|
uuid = session.get("uuid", "")
|
|
ask_mail = uuid.endswith(public_id)
|
|
log.debug("uuid = %s, public_id=%s, ask_mail=%s", uuid, public_id, ask_mail)
|
|
return render_template("result.html", public_id=public_id, stats=stats, ask_mail=ask_mail)
|
|
|
|
@app.route("/add-email", methods=["POST"])
|
|
def add_email():
|
|
csr = get_cursor()
|
|
csr.execute("update users set email=%s, lang=%s, confirmation_sent_ts=now() where uuid = %s",
|
|
(request.form["email"], get_locale(), session["uuid"],))
|
|
if csr.rowcount:
|
|
send_mail(request.form["email"], session["uuid"])
|
|
return render_template_string(_("Saved. Expect mail at {{email}}"),
|
|
email=request.form["email"])
|
|
else:
|
|
return _("<span>You don't exist. Go away!</span>")
|
|
|
|
@app.route("/confirm-email/<uuid>", methods=["GET"])
|
|
def confirm_email_ask(uuid):
|
|
return render_template("confirm.html", uuid=uuid)
|
|
|
|
@app.route("/confirm-email/<uuid>", methods=["POST"])
|
|
def confirm_email_do(uuid):
|
|
csr = get_cursor()
|
|
csr.execute("update users set confirmation_received_ts=now(), confirmation_info=%s where uuid=%s",
|
|
(json.dumps({"ip": request.remote_addr, "ua": request.user_agent.string}), uuid))
|
|
return "<p>" + _("Thanks!") + "</p>"
|
|
|
|
# Middleware
|
|
|
|
@babel.localeselector
|
|
def get_locale():
|
|
lang = request.accept_languages.best_match(["de", "en"])
|
|
log.info("get_locale: best language is %s", lang)
|
|
return lang
|
|
|
|
# Helpers
|
|
|
|
def get_dict_cursor():
|
|
db = get_db()
|
|
csr = db.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
|
return csr
|
|
|
|
def get_cursor():
|
|
db = get_db()
|
|
csr = db.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
|
|
return csr
|
|
|
|
def get_db():
|
|
if "db" not in g:
|
|
g.db = psycopg2.connect("dbname=internationale")
|
|
return g.db
|
|
|
|
@app.teardown_appcontext
|
|
def teardown_db(exception):
|
|
log.info("in teardown_db: exception = %s", exception)
|
|
db = g.pop('db', None)
|
|
if db is not None:
|
|
if not exception:
|
|
try:
|
|
db.commit()
|
|
except:
|
|
pass
|
|
db.rollback()
|
|
db.close()
|
|
|
|
def new_user():
|
|
lang = get_locale()
|
|
user_uuid = str(uuid.uuid4())
|
|
user_public_id = user_uuid[-12:]
|
|
csr = get_cursor()
|
|
csr.execute("insert into users(uuid, public_id, lang) values(%s, %s, %s)",
|
|
(user_uuid, user_public_id, lang))
|
|
return user_uuid, user_public_id
|
|
|
|
def send_mail(mailaddress, uuid):
|
|
msg = email.message.EmailMessage()
|
|
msg["From"] ="i12e@hjp.at"
|
|
msg["To"] = mailaddress
|
|
msg["Subject"] = _("The Musical Internatiionale: Confirm mail address")
|
|
confirmation_url = url_for("confirm_email_ask", uuid=uuid, _external=True)
|
|
body = _(
|
|
"Hello,\n"
|
|
"\n"
|
|
"somebody requested news about new developments at https://i12e.hjp.at/ to be sent to\n"
|
|
"{mailaddress}.\n"
|
|
"\n"
|
|
"To confirm that that this was you, please visit this url:\n"
|
|
"{url}\n"
|
|
"\n"
|
|
"With musical greetings\n"
|
|
" I12E Bot\n"
|
|
"").format(mailaddress=mailaddress, url=confirmation_url)
|
|
msg.set_content(body)
|
|
mta = smtplib.SMTP(host="localhost")
|
|
mta.send_message(msg)
|
|
|
|
# vim: tw=99
|