"""Flask application for ranking the dates, times and places of a meeting.

Users identify themselves through a confirmation link sent by e-mail, rank
the options of a meet, and the options are ordered by repeatedly striking
one loser per round (see ``runoff``).
"""

import datetime
import email.message
import itertools
import logging
import logging.config
import os
import smtplib

import psycopg
import psycopg.rows
from flask import (
    Flask, session, redirect, url_for, request, render_template, g, abort,
    flash,
)

import config

logging.config.dictConfig(config.logging)

app = Flask(__name__)
app.secret_key = config.secret_key
app.config["PERMANENT_SESSION_LIFETIME"] = datetime.timedelta(days=92)

log = logging.getLogger(__name__)

# The kinds of options a meet has.  For each kind there is a table of that
# name and a table "<kind>_vote".  The values are the columns selected when
# listing the options and the column used as secondary sort key.
# These strings are interpolated into SQL, so they must stay trusted
# constants and never come from a request.
_OPTION_COLUMNS = {
    "date": ("d.id, d.date, d.display, position", "d.date"),
    "time": ("d.id, d.time, d.display, position", "d.time"),
    "place": ("d.id, d.name, position", "d.name"),
}


def _check_kind(kind):
    """Raise ValueError unless *kind* is one of the known option kinds."""
    if kind not in _OPTION_COLUMNS:
        raise ValueError(f"unknown kind {kind!r}")


def _require_user_id():
    """Return the id of the logged-in user or abort with 401."""
    user = session.get("user")
    if not user:
        abort(401)
    return user["id"]


def _fetch_options(csr, kind, meet_id, bod_id):
    """Return the options of *kind* for a meet with the user's own ranking.

    Options the user has ranked come first in ranking order; the left join
    keeps the options the user has not voted on.
    """
    columns, tie_breaker = _OPTION_COLUMNS[kind]
    csr.execute(
        f"""
        select {columns}
        from {kind} d
             left join {kind}_vote v on d.id = v.{kind} and v.bod = %s
        where meet = %s
        order by position, {tie_breaker}
        """,
        (bod_id, meet_id,))
    return csr.fetchall()


def _fetch_voters(csr, kind, meet_id):
    """Return (email, short) of everybody who voted on *kind* for a meet."""
    csr.execute(
        f"""
        select distinct email, short
        from {kind}_vote
             join {kind} on {kind}_vote.{kind} = {kind}.id
             join bod on {kind}_vote.bod = bod.id
        where meet = %s
        order by email
        """,
        (meet_id,))
    return csr.fetchall()


def _handle_vote(kind):
    """Store the submitted ranking for *kind* and render the vote fragment.

    The form field named *kind* carries the option ids in ranking order.
    """
    _check_kind(kind)
    log.debug("form = %s", request.form)
    user_id = _require_user_id()
    option_ids = request.form.getlist(kind)
    csr = get_cursor()
    # Retrieve the meet.id from the option ids. This also ensures that all
    # the options refer to the same meet.
    csr.execute(
        f"select distinct meet from {kind} where id = any (%s)",
        (option_ids,))
    r = csr.fetchall()
    if len(r) != 1:
        # this should never happen. Is the client messing around?
        log.warning(
            "%s ids %s map to meets %s", kind.capitalize(), option_ids, r)
        abort(400)
    meet_id = r[0].meet

    # Replace the user's previous votes for the submitted options.
    csr.execute(
        f"delete from {kind}_vote where {kind} = any (%s) and bod = %s",
        (option_ids, user_id))
    for pos, option_id in enumerate(option_ids):
        csr.execute(
            f"insert into {kind}_vote({kind}, bod, position)"
            " values(%s, %s, %s)",
            (option_id, user_id, pos))

    options = _fetch_options(csr, kind, meet_id, user_id)
    result = instantrunoff_backward(meet_id, kind)
    log.debug("result = %s", result)
    voters = _fetch_voters(csr, kind, meet_id)
    # The templates expect the options as "dates", "times" or "places".
    return render_template(
        f"{kind}_vote_fragment.html",
        result=result, voters=voters, **{f"{kind}s": options})


def _render_result(meet_id, kind):
    """Render the current result and the list of voters for *kind*."""
    _check_kind(kind)
    result = instantrunoff_backward(meet_id, kind)
    log.debug("result = %s", result)
    voters = _fetch_voters(get_cursor(), kind, meet_id)
    return render_template(
        f"{kind}_result_fragment.html", result=result, voters=voters)


def _render_ballots(meet_id, kind):
    """Render the individual ballots together with the result for *kind*."""
    ballots = get_ballots(meet_id, kind)
    result = instantrunoff_backward(meet_id, kind)
    return render_template(
        f"{kind}_result_ballots.html", result=result, ballots=ballots)


@app.route("/")
def home():
    """Show the home page; anonymous visitors are sent to registration."""
    log.debug("in home")
    log.debug("session = %s", session)
    if "user" not in session:
        return redirect(url_for("register", target="/"))
    return render_template("home.html")


@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form (GET) or mail a confirmation link (POST).

    The POST form carries "email" and "target"; the target is passed on to
    the confirmation link so the user ends up where they wanted to go.
    """
    log.debug("in register")
    if request.method == "GET":
        return render_template("register.html")

    email_address = request.form.get("email", "")
    target = request.form.get("target", "/")
    if "@" not in email_address:
        flash("Das schaut nicht wie eine E-Mail-Adresse aus")
        # Back to the form, keeping the target the same way the other
        # views hand it to this one.  Nothing is stored or mailed.
        return redirect(url_for("register", target=target))

    csr = get_cursor()
    csr.execute("select * from bod where email = %s", (email_address,))
    r = csr.fetchone()
    if r and r.key:
        # An existing key is reused.
        key = r.key
    else:
        key = os.urandom(8).hex()
        if r:
            csr.execute(
                "update bod set key = %s, keychange = now() where email = %s",
                (key, email_address,))
        else:
            csr.execute(
                "insert into bod(email, key, keychange)"
                " values(%s, %s, now())",
                (email_address, key,))

    log.debug("request.scheme = %s", request.scheme)
    log.debug("request.server = %s", request.server)
    log.debug("request.root_url = %s", request.root_url)
    # _external=True yields an absolute URL without the doubled slash that
    # concatenating root_url and a path produces.
    confirmation_url = url_for(
        "confirm", target=target, key=key, _external=True)
    send_mail(email_address, confirmation_url)
    return render_template("wait_for_confirmation.html")


@app.route("/confirm")
def confirm():
    """Log in the user owning the key in the query string and redirect."""
    csr = get_cursor()
    csr.execute("select * from bod where key = %s", (request.args["key"],))
    bod = csr.fetchone()
    if not bod:
        return render_template("wrong_key.html")
    session["user"] = {"id": bod.id, "email": bod.email}
    session.permanent = True
    # NOTE(review): this rebinds the application-wide session lifetime
    # (92 days at startup) on every confirmation - confirm this is intended.
    app.permanent_session_lifetime = datetime.timedelta(days=90)
    # NOTE(review): target comes from the query string unchecked, which
    # allows redirects to arbitrary sites (open redirect).
    return redirect(request.args["target"])


@app.route("/vote/<key>")
def vote(key):
    """Show the voting page for the meet identified by *key*."""
    log.debug("session = %s", session)
    if "user" not in session:
        return redirect(url_for("register", target=request.url))
    user_id = session["user"]["id"]
    csr = get_cursor()
    csr.execute("select * from meet where key = %s", (key,))
    meet = csr.fetchone()
    if not meet:
        abort(404)

    # Everybody who voted on anything belonging to this meet.
    csr.execute(
        """
        select email from bod where id in (
            select date_vote.bod
            from date join date_vote on date.id = date_vote.date
            where meet = %(meet_id)s
            union
            select time_vote.bod
            from time join time_vote on time.id = time_vote.time
            where meet = %(meet_id)s
            union
            select place_vote.bod
            from place join place_vote on place.id = place_vote.place
            where meet = %(meet_id)s
        )
        order by 1
        """,
        {"meet_id": meet.id})
    voters = csr.fetchall()

    dates = _fetch_options(csr, "date", meet.id, user_id)
    times = _fetch_options(csr, "time", meet.id, user_id)
    places = _fetch_options(csr, "place", meet.id, user_id)

    return render_template(
        "vote.html",
        meet=meet, voters=voters, dates=dates, times=times, places=places)


@app.post("/vote/date")
def vote_date():
    """Store the user's ranking of dates."""
    return _handle_vote("date")


@app.get("/result/<int:meet_id>/date")
def result_date(meet_id):
    """Render the current date result for a meet."""
    return _render_result(meet_id, "date")


@app.post("/vote/time")
def vote_time():
    """Store the user's ranking of times."""
    return _handle_vote("time")


@app.get("/result/<int:meet_id>/time")
def result_time(meet_id):
    """Render the current time result for a meet."""
    return _render_result(meet_id, "time")


@app.post("/vote/place")
def vote_place():
    """Store the user's ranking of places."""
    return _handle_vote("place")


@app.get("/result/<int:meet_id>/place")
def result_place(meet_id):
    """Render the current place result for a meet."""
    return _render_result(meet_id, "place")


@app.get("/result/<int:meet_id>/date/ballot")
def result_ballots_date(meet_id):
    """Render all date ballots of a meet."""
    return _render_ballots(meet_id, "date")


@app.get("/result/<int:meet_id>/time/ballot")
def result_ballots_time(meet_id):
    """Render all time ballots of a meet."""
    return _render_ballots(meet_id, "time")


@app.get("/result/<int:meet_id>/place/ballot")
def result_ballots_place(meet_id):
    """Render all place ballots of a meet."""
    return _render_ballots(meet_id, "place")


def send_mail(email_address, confirmation_url):
    """Mail *confirmation_url* to *email_address* via the local MTA."""
    msg = email.message.EmailMessage()
    msg["From"] = "noreply@hjp.at"
    msg["To"] = email_address
    msg["Subject"] = "MEEAT confirmation"
    body = confirmation_url  # Really minimalistic
    msg.set_content(body)
    # The context manager closes the connection even if sending fails.
    with smtplib.SMTP(host="localhost") as mta:
        mta.send_message(msg)


def get_ballots(meet_id, kind):
    """Return the ballots cast on *kind* for a meet.

    Each ballot is the list of one voter's rows in ranking order.

    Raises ValueError if *kind* is not a known option kind, because it is
    interpolated into the SQL text.
    """
    _check_kind(kind)
    csr = get_cursor()
    q = f"""
        select
            {kind}.*, bod, position, email,
            min(w) over(partition by bod order by position) as vote_w
        from {kind}
             join {kind}_vote on {kind}.id = {kind}_vote.{kind}
             join bod on bod = bod.id
        where meet = %s
        order by bod, position
        """
    csr.execute(q, (meet_id,))
    # The rows arrive ordered by voter, so consecutive grouping is enough.
    return [
        list(rows)
        for _, rows in itertools.groupby(csr, key=lambda r: r.bod)
    ]


def dump_ballots(ballots):
    """Write all ballots to the debug log."""
    for ballot in ballots:
        log.debug("---")
        for r in ballot:
            log.debug("%s", r)


def runoff(ballots, direction):
    """Run one round: pick a loser and strike it from every ballot.

    For every candidate the vote weights are tallied per ballot position.
    With direction "backward" the tallies are compared starting at the last
    position and the candidate with the highest ones loses; with any other
    direction they are compared starting at the first position and the
    candidate with the lowest ones loses.

    At least one ballot must be non-empty.

    Returns (loser, new_ballots) where loser is a row of the struck
    candidate and new_ballots are the ballots without it.
    """
    count = {}
    candidates = {}
    # First size the tally of each candidate to the longest ballot it is on;
    # this has to be complete before tallying so that no counts are reset.
    for ballot in ballots:
        for r in ballot:
            if r.id not in count or len(count[r.id]) < len(ballot):
                log.debug("count[%d] <- %d elements", r.id, len(ballot))
                count[r.id] = [0] * len(ballot)
                candidates[r.id] = r

    for ballot in ballots:
        if not ballot:
            # All candidates of this ballot were struck in earlier rounds.
            continue
        weight = max(r.vote_w for r in ballot)
        for pos, r in enumerate(ballot):
            log.debug("count = %s", count)
            log.debug("r.id = %s", r.id)
            log.debug("pos = %s", pos)
            count[r.id][pos] += weight
            log.debug("count[%d][%d] = %s", r.id, pos, count[r.id][pos])

    if direction == "backward":
        result = sorted(
            count, key=lambda i: list(reversed(count[i])), reverse=True)
    else:
        result = sorted(count, key=lambda i: count[i])
    log.debug("result of this round:")
    for r in result:
        log.debug("%s %s", r, count[r])
    log.debug("striking %d", result[0])
    loser = candidates[result[0]]
    new_ballots = [
        [r for r in ballot if r.id != loser.id]
        for ballot in ballots
    ]
    return loser, new_ballots


def _instant_runoff(meet_id, kind, direction):
    """Strike losers until no candidate is left; return them best first."""
    ballots = get_ballots(meet_id, kind)
    result = []
    # any() is also false when there are no ballots at all, so a meet
    # without votes simply yields an empty result.
    while any(ballots):
        dump_ballots(ballots)
        loser, ballots = runoff(ballots, direction)
        result.append(loser)
    # The candidate struck last did best.
    result.reverse()
    log.debug("final result")
    for r in result:
        log.debug("%s", r)
    return result


def instantrunoff_forward(meet_id, kind):
    """Rank the options of *kind*, striking by first-position tallies."""
    return _instant_runoff(meet_id, kind, "forward")


def instantrunoff_backward(meet_id, kind):
    """Rank the options of *kind*, striking by last-position tallies."""
    return _instant_runoff(meet_id, kind, "backward")


def get_cursor():
    """Return a cursor on the request's connection yielding named tuples."""
    db = get_db()
    csr = db.cursor(row_factory=psycopg.rows.namedtuple_row)
    return csr


def get_db():
    """Return the database connection of this app context, opening it once."""
    if "db" not in g:
        g.db = psycopg.connect(dbname=config.dbname, user=config.dbuser)
    return g.db


@app.teardown_appcontext
def teardown_db(exception):
    """Commit (or roll back after an error) and close the connection."""
    log.debug("in teardown_db: exception = %s", exception)
    db = g.pop("db", None)
    if db is None:
        return
    try:
        if exception is None:
            db.commit()
        else:
            db.rollback()
    except psycopg.Error:
        # A teardown handler must not raise; closing the connection below
        # discards whatever is left of the transaction.
        log.exception("could not finish the database transaction")
    finally:
        db.close()