"""Flask application for voting on the dates, times and places of a meeting."""

import collections
import datetime
import email.message
import logging
import logging.config
import os
import re
import smtplib
import urllib.parse

import psycopg
import psycopg.rows
from flask import (
    Flask, session, redirect, url_for, request, render_template, g, abort,
    flash
)

import config

logging.config.dictConfig(config.logging)

app = Flask(__name__)
app.secret_key = config.secret_key
# Single source of truth for the lifetime of permanent sessions.
app.config["PERMANENT_SESSION_LIFETIME"] = datetime.timedelta(days=92)

log = logging.getLogger(__name__)

# Per candidate kind: the select list and the sort column used when listing
# the candidates of a meet together with the current user's own preference.
_CANDIDATE_COLUMNS = {
    "date": ("d.id, d.date, d.display, preference", "d.date"),
    "time": ("d.id, d.time, d.display, preference", "d.time"),
    "place": ("d.id, d.name, preference", "d.name"),
}


@app.route("/")
def home():
    """Render the home page; visitors without a session user go to /register."""
    log.debug("in home")
    log.debug("session = %s", session)
    if "user" not in session:
        return redirect(url_for("register", target="/"))
    return render_template("home.html")


@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form (GET) or mail a confirmation link (POST).

    On POST the form fields ``email`` and ``target`` are read. The key stored
    for the address in table ``bod`` is reused if present, otherwise a new
    random key is stored. The confirmation URL is then mailed to the address.
    """
    log.debug("in register")
    if request.method == "GET":
        return render_template("register.html")

    email_address = request.form.get("email", "")
    target = request.form["target"]
    if "@" not in email_address:
        flash("Das schaut nicht wie eine E-Mail-Adresse aus")
        # Do not store or mail an obviously invalid address: go back to the
        # form (the flashed message survives the redirect).
        return redirect(url_for("register", target=target))

    csr = get_cursor()
    csr.execute("select * from bod where email = %s", (email_address,))
    r = csr.fetchone()
    if r:
        if r.key:
            key = r.key
        else:
            key = os.urandom(8).hex()
            csr.execute(
                "update bod set key = %s, keychange = now() where email = %s",
                (key, email_address,))
    else:
        key = os.urandom(8).hex()
        csr.execute(
            "insert into bod(email, key, keychange) values(%s, %s, now())",
            (email_address, key,))

    log.debug("request.scheme = %s", request.scheme)
    log.debug("request.server = %s", request.server)
    log.debug("request.root_url = %s", request.root_url)
    # Let Flask build the absolute URL: concatenating root_url (ends in "/")
    # with url_for() (starts with "/") produced a double slash.
    confirmation_url = url_for(
        "confirm", target=target, key=key, _external=True)
    send_mail(email_address, confirmation_url)
    return render_template("wait_for_confirmation.html")


@app.route("/confirm")
def confirm():
    """Log in the user identified by the ``key`` query argument.

    Renders ``wrong_key.html`` if no ``bod`` row has that key. Otherwise the
    user is stored in a permanent session and the browser is redirected to the
    ``target`` query argument, or to the home page if the target does not
    point at this site.
    """
    csr = get_cursor()
    csr.execute("select * from bod where key = %s", (request.args["key"],))
    bod = csr.fetchone()
    if not bod:
        return render_template("wrong_key.html")

    target = request.args["target"]
    if not _is_safe_target(target, request.host):
        # The target comes straight from the query string; never redirect to
        # a foreign site.
        log.warning("Refusing to redirect to %r", target)
        target = url_for("home")

    session["user"] = {"id": bod.id, "email": bod.email}
    # The lifetime comes from PERMANENT_SESSION_LIFETIME configured at module
    # level; it is not changed per request.
    session.permanent = True
    return redirect(target)


# NOTE(review): the URL variable was missing from this rule; a plain string
# variable is assumed.
@app.route("/vote/<key>")
def vote(key):
    """Render the voting page of the meet identified by *key*.

    Redirects to registration if there is no session user and aborts with 404
    if no meet has that key.
    """
    log.debug("session = %s", session)
    if "user" not in session:
        return redirect(url_for("register", target=request.url))
    user_id = session["user"]["id"]

    csr = get_cursor()
    csr.execute("select * from meet where key = %s", (key,))
    meet = csr.fetchone()
    if not meet:
        abort(404)

    # Everybody who has voted on any date, time or place of this meet.
    csr.execute(
        """
        select email from bod where id in (
            select date_vote.bod
            from date join date_vote on date.id = date_vote.date
            where meet = %(meet_id)s
            union
            select time_vote.bod
            from time join time_vote on time.id = time_vote.time
            where meet = %(meet_id)s
            union
            select place_vote.bod
            from place join place_vote on place.id = place_vote.place
            where meet = %(meet_id)s
        )
        order by 1
        """,
        {"meet_id": meet.id})
    voters = csr.fetchall()

    dates = _candidates_with_own_vote("date", meet.id, user_id)
    times = _candidates_with_own_vote("time", meet.id, user_id)
    places = _candidates_with_own_vote("place", meet.id, user_id)

    return render_template(
        "vote.html",
        meet=meet, voters=voters, dates=dates, times=times, places=places)


@app.post("/vote/date")
def vote_date():
    """Store the posted date preferences and render the date vote fragment."""
    log.debug("form = %s", request.form)
    dates, result, voters = _cast_votes("date")
    return render_template(
        "date_vote_fragment.html",
        dates=dates, result=result, voters=voters)


# NOTE(review): the URL variable was missing from the /result/... rules; a
# plain string variable is assumed. Use an int converter if meet ids are
# always integers.
@app.get("/result/<meet_id>/date")
def result_date(meet_id):
    """Render the summed date preferences and the voters of a meet."""
    result = simple_sum(meet_id, "date")
    log.debug("result = %s", result)
    voters = _voters(meet_id, "date")
    return render_template(
        "date_result_fragment.html", result=result, voters=voters)


@app.post("/vote/time")
def vote_time():
    """Store the posted time preferences and render the time vote fragment."""
    log.debug("form = %s", request.form)
    times, result, voters = _cast_votes("time")
    return render_template(
        "time_vote_fragment.html",
        times=times, result=result, voters=voters)


@app.get("/result/<meet_id>/time")
def result_time(meet_id):
    """Render the summed time preferences and the voters of a meet."""
    result = simple_sum(meet_id, "time")
    log.debug("result = %s", result)
    voters = _voters(meet_id, "time")
    return render_template(
        "time_result_fragment.html", result=result, voters=voters)


@app.post("/vote/place")
def vote_place():
    """Store the posted place preferences and render the place vote fragment."""
    log.debug("form = %s", request.form)
    places, result, voters = _cast_votes("place")
    return render_template(
        "place_vote_fragment.html",
        places=places, result=result, voters=voters)


@app.get("/result/<meet_id>/place")
def result_place(meet_id):
    """Render the summed place preferences and the voters of a meet."""
    result = simple_sum(meet_id, "place")
    log.debug("result = %s", result)
    voters = _voters(meet_id, "place")
    return render_template(
        "place_result_fragment.html", result=result, voters=voters)


@app.get("/result/<meet_id>/date/matrix")
def result_matrix_date(meet_id):
    """Render the candidate/voter preference matrix for dates."""
    return _render_matrix(meet_id, "date")


@app.get("/result/<meet_id>/time/matrix")
def result_matrix_time(meet_id):
    """Render the candidate/voter preference matrix for times."""
    return _render_matrix(meet_id, "time")


@app.get("/result/<meet_id>/place/matrix")
def result_matrix_place(meet_id):
    """Render the candidate/voter preference matrix for places."""
    return _render_matrix(meet_id, "place")


def _is_safe_target(target, host):
    """Return True if *target* is a redirect target on this site.

    Accepted are paths starting with a single "/" and absolute http(s) URLs
    whose network location equals *host*.
    """
    # Some browsers treat a backslash like a slash, so "/\\evil" is suspect.
    if not target or "\\" in target:
        return False
    try:
        parts = urllib.parse.urlsplit(target)
    except ValueError:
        return False
    if parts.scheme or parts.netloc:
        return parts.scheme in ("http", "https") and parts.netloc == host
    return target.startswith("/")


def _current_user_id():
    """Return the id of the session user; abort with 401 if there is none."""
    user = session.get("user")
    if not user:
        abort(401)
    return user["id"]


def _candidates_with_own_vote(kind, meet_id, user_id):
    """Return the *kind* candidates of a meet with the user's own preference.

    *kind* is one of the keys of ``_CANDIDATE_COLUMNS``. Candidates the user
    has not voted on are included with a NULL preference (left join).
    """
    columns, order_by = _CANDIDATE_COLUMNS[kind]
    csr = get_cursor()
    csr.execute(
        f"""
        select {columns}
        from {kind} d
            left join {kind}_vote v on d.id = v.{kind} and v.bod = %s
        where meet = %s
        order by {order_by}
        """,
        (user_id, meet_id,))
    return csr.fetchall()


def _voters(meet_id, kind):
    """Return the distinct (email, short) of everybody who voted on *kind*."""
    csr = get_cursor()
    csr.execute(
        f"""
        select distinct email, short
        from {kind}_vote
            join {kind} on {kind}_vote.{kind} = {kind}.id
            join bod on {kind}_vote.bod = bod.id
        where meet = %s
        order by email
        """,
        (meet_id,))
    return csr.fetchall()


def _cast_votes(kind):
    """Replace the session user's *kind* votes with the posted preferences.

    Returns a tuple ``(candidates, result, voters)``: the candidates with the
    user's own preferences, the summed preferences and the list of voters.
    Aborts with 401 if nobody is logged in and with 400 on invalid form data.
    """
    user_id = _current_user_id()
    meet_id, preferences = get_preferences(kind)

    csr = get_cursor()
    # Delete and re-insert so that repeated submissions overwrite old votes.
    csr.execute(
        f"delete from {kind}_vote where {kind} = any (%s) and bod = %s",
        (list(preferences), user_id))
    for candidate_id, pref in preferences.items():
        csr.execute(
            f"insert into {kind}_vote({kind}, bod, preference)"
            " values(%s, %s, %s)",
            (candidate_id, user_id, pref))

    candidates = _candidates_with_own_vote(kind, meet_id, user_id)
    result = simple_sum(meet_id, kind)
    log.debug("result = %s", result)
    voters = _voters(meet_id, kind)
    return candidates, result, voters


def _render_matrix(meet_id, kind):
    """Render ``<kind>_result_matrix.html`` from the data of get_matrix()."""
    candidates, voters, preference, candidate_sum, voter_max = \
        get_matrix(meet_id, kind)
    return render_template(
        f"{kind}_result_matrix.html",
        candidates=candidates,
        voters=voters,
        preference=preference,
        candidate_sum=candidate_sum,
        voter_max=voter_max,
    )


def send_mail(email_address, confirmation_url):
    """Mail *confirmation_url* to *email_address* via the SMTP server on localhost."""
    msg = email.message.EmailMessage()
    msg["From"] = "noreply@hjp.at"
    msg["To"] = email_address
    msg["Subject"] = "MEEAT confirmation"
    body = confirmation_url  # Really minimalistic
    msg.set_content(body)
    # The context manager closes the connection even if sending fails.
    with smtplib.SMTP(host="localhost") as mta:
        mta.send_message(msg)


def get_matrix(meet_id, kind):
    """Collect all *kind* votes of a meet for the matrix view.

    Returns a tuple ``(candidates, voters, preference, candidate_sum,
    voter_max)``:

    * ``candidates``: candidate id -> a row of that candidate
    * ``voters``: voter email -> a row of that voter
    * ``preference``: candidate id -> {voter email -> preference}
    * ``candidate_sum``: candidate id -> sum of all preferences
    * ``voter_max``: voter email -> highest preference given by that voter
    """
    csr = get_cursor()
    q = f"""
        select {kind}.*, preference, email, short
        from {kind}
            join {kind}_vote on {kind}.id = {kind}_vote.{kind}
            join bod on bod = bod.id
        where meet = %s
        order by preference desc
        """
    csr.execute(q, (meet_id,))
    candidates = {}
    voters = {}
    voter_max = collections.defaultdict(float)
    candidate_sum = collections.defaultdict(float)
    preference = collections.defaultdict(dict)
    for r in csr:
        candidates[r.id] = r
        voters[r.email] = r
        preference[r.id][r.email] = r.preference
        if r.preference > voter_max[r.email]:
            voter_max[r.email] = r.preference
        candidate_sum[r.id] += r.preference
    app.logger.debug("candidates=%r", candidates)
    return candidates, voters, preference, candidate_sum, voter_max


def simple_sum(meet_id, kind):
    """Return the *kind* candidates of a meet with their summed preferences.

    Only candidates with at least one vote are returned (inner join), ordered
    by the sum in descending order.
    """
    q = \
        f"""
        select k.*, sum(preference) as preference
        from {kind} k join {kind}_vote v on k.id = v.{kind}
        where k.meet = %s
        group by k.id
        order by preference desc
        """
    csr = get_cursor()
    csr.execute(q, (meet_id,))
    return csr.fetchall()


def get_preferences(kind):
    """Extract the posted preferences for *kind* from the request form.

    Form fields named ``<kind>_<id>`` are read as candidate id and preference.
    Returns ``(meet_id, preferences)`` where *preferences* maps candidate id
    to a float in [0, 100]. Aborts with 400 if a value is not a number, is
    out of range, or if the candidates do not all belong to exactly one meet.
    """
    preferences = {}
    for k, v in request.form.items():
        if m := re.match(kind + r"_(\d+)", k):
            candidate = int(m.group(1))
            try:
                preference = float(v)
            except ValueError:
                # Client-supplied garbage is a bad request, not a server error.
                log.warning("Preference %r for %s is not a number", v, k)
                abort(400)
            if not (0 <= preference <= 100):
                # this should never happen. Is the client messing around?
                log.warning(
                    "Preference %s not in range [0, 100]", preference)
                abort(400)
            preferences[candidate] = preference
    csr = get_cursor()
    # Retrieve the meet.id from the kind ids. This also ensures that all
    # refer to the same meet.
    csr.execute(
        f"select distinct meet from {kind} where id = any (%s)",
        (list(preferences.keys()),))
    r = csr.fetchall()
    if len(r) != 1:
        # this should never happen. Is the client messing around?
        log.warning(
            "%s ids %s map to meets %s", kind, list(preferences.keys()), r)
        abort(400)
    meet_id = r[0].meet
    return meet_id, preferences


def get_cursor():
    """Return a new cursor on the current connection yielding named tuples."""
    db = get_db()
    csr = db.cursor(row_factory=psycopg.rows.namedtuple_row)
    return csr


def get_db():
    """Return the database connection stored on ``g``, opening it on first use."""
    if "db" not in g:
        g.db = psycopg.connect(dbname=config.dbname, user=config.dbuser)
    return g.db


@app.teardown_appcontext
def teardown_db(exception):
    """Commit (or roll back if *exception* is set) and close the connection."""
    log.debug("in teardown_db: exception = %s", exception)
    db = g.pop('db', None)
    if db is None:
        return
    try:
        if exception is None:
            db.commit()
        else:
            db.rollback()
    except Exception:
        # A teardown handler should not raise, but a lost commit must at
        # least be visible in the log. Closing discards the open transaction.
        log.exception("could not finish the database transaction")
    finally:
        db.close()