# meeat/app.py
#
# 361 lines
# 11 KiB
# Python

import datetime
import email.message
import logging
import os
import smtplib
import urllib.parse

import psycopg
import psycopg.rows
from flask import (
    Flask, session, redirect, url_for, request, render_template,
    g, abort, flash,
)

import config
# Log everything from DEBUG upwards; each record carries timestamp, level,
# logger name and source line number.
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(lineno)d | %(message)s", level=logging.DEBUG)
app = Flask(__name__)
# The secret key is taken from the local config module.
app.secret_key = config.secret_key
# Lifetime of permanent sessions (92 days). NOTE(review): Flask applies this
# only to sessions that are marked permanent.
app.config["PERMANENT_SESSION_LIFETIME"] = datetime.timedelta(days=92)
log = logging.getLogger(__name__)
@app.route("/")
def home():
    """Render the home page, or send visitors without a user session to registration."""
    log.debug("in home")
    log.debug("session = %s", session)
    logged_in = "user" in session
    if logged_in:
        return render_template("home.html")
    # Not logged in yet: register first, then come back to "/".
    return redirect(url_for("register", target="/"))
@app.route("/register", methods=["GET", "POST"])
def register():
    """Register an e-mail address and mail out a confirmation link.

    GET renders the registration form.

    POST reads ``email`` and ``target`` from the form. If the address does
    not contain an ``@`` a message is flashed and the browser is sent back to
    the form. Otherwise the ``bod`` row for the address is looked up (or
    created), a key is generated if the row has none, and a confirmation URL
    carrying the key and the target is mailed to the address.
    """
    log.debug("in register")
    if request.method == "GET":
        return render_template("register.html")

    email_address = request.form.get("email", "")
    # Fall back to the home page instead of failing with a 400 halfway through.
    target = request.form.get("target", "/")

    if "@" not in email_address:
        flash("Das schaut nicht wie eine E-Mail-Adresse aus")
        # Stop here: a bogus address must neither be stored nor mailed.
        # Redirecting to the GET form keeps the target in the query string.
        return redirect(url_for("register", target=target))

    csr = get_cursor()
    csr.execute("select * from bod where email = %s", (email_address,))
    r = csr.fetchone()
    if r and r.key:
        # Reuse the existing key so earlier confirmation mails stay valid.
        key = r.key
    else:
        key = os.urandom(8).hex()
        if r:
            csr.execute(
                "update bod set key = %s, keychange = now() where email = %s",
                (key, email_address))
        else:
            csr.execute(
                "insert into bod(email, key, keychange) values(%s, %s, now())",
                (email_address, key))

    log.debug("request.scheme = %s", request.scheme)
    log.debug("request.server = %s", request.server)
    log.debug("request.root_url = %s", request.root_url)
    # Let Flask build the absolute URL. Concatenating request.root_url (which
    # ends in "/") with url_for() (which starts with "/") produced a double
    # slash and duplicated any script root.
    confirmation_url = url_for("confirm", target=target, key=key, _external=True)
    send_mail(email_address, confirmation_url)
    return render_template("wait_for_confirmation.html")
def _is_safe_redirect_target(target):
    """Return True if redirecting to *target* keeps the browser on this site.

    Relative URLs are accepted. URLs with a scheme or a host are accepted only
    when the scheme is empty/http/https and the host equals the host of the
    current request.
    """
    if not target or "\\" in target:
        # Browsers treat a backslash like a slash, so "/\host" could leave the site.
        return False
    try:
        parts = urllib.parse.urlsplit(target)
    except ValueError:
        return False
    if not parts.scheme and not parts.netloc:
        return True
    return parts.scheme in ("", "http", "https") and parts.netloc == request.host


@app.route("/confirm")
def confirm():
    """Log in the user that owns the key from a confirmation link.

    Reads ``key`` and ``target`` from the query string. An unknown key renders
    ``wrong_key.html``. Otherwise the user's id and e-mail are stored in the
    session and the browser is redirected to ``target`` (or to the home page
    when the target is missing or points to another site).
    """
    csr = get_cursor()
    csr.execute("select * from bod where key = %s", (request.args["key"],))
    bod = csr.fetchone()
    if not bod:
        return render_template("wrong_key.html")

    session["user"] = {"id": bod.id, "email": bod.email}
    # PERMANENT_SESSION_LIFETIME only applies to permanent sessions; without
    # this flag the login would end when the browser is closed.
    session.permanent = True

    target = request.args.get("target", "")
    if not _is_safe_redirect_target(target):
        # The target comes from the URL, so do not follow it off-site.
        log.warning("Ignoring unsafe redirect target %r", target)
        target = url_for("home")
    return redirect(target)
@app.route("/vote/<string:key>")
def vote(key):
    """Render the voting page for the meet identified by *key*.

    Visitors without a user session are redirected to registration with the
    current URL as target. An unknown key yields a 404. For the meet found,
    its dates, times and places are loaded together with the position the
    current user gave each of them (if any) and passed to ``vote.html``.
    """
    log.debug("session = %s", session)
    if "user" not in session:
        return redirect(url_for("register", target=request.url))

    cursor = get_cursor()
    cursor.execute("select * from meet where key = %s", (key,))
    meet = cursor.fetchone()
    if not meet:
        abort(404)

    # All three option queries take the same two parameters.
    params = (session["user"]["id"], meet.id,)

    cursor.execute(
        """
        select d.id, d.date, d.display, position
        from date d left join date_vote v on d.id = v.date and v.bod = %s
        where meet = %s order by position, d.date
        """,
        params)
    dates = cursor.fetchall()

    cursor.execute(
        """
        select d.id, d.time, d.display, position
        from time d left join time_vote v on d.id = v.time and v.bod = %s
        where meet = %s order by position, d.time
        """,
        params)
    times = cursor.fetchall()

    cursor.execute(
        """
        select d.id, d.name, position
        from place d left join place_vote v on d.id = v.place and v.bod = %s
        where meet = %s order by position, d.name
        """,
        params)
    places = cursor.fetchall()

    return render_template(
        "vote.html", meet=meet, dates=dates, times=times, places=places)
@app.post("/vote/date")
def vote_date():
    """Store the current user's ranking of dates and render the updated fragment.

    The form field ``date`` holds the date ids in order of preference. The
    user's previous date votes for the meet are replaced by the submitted
    ranking, then the user's dates and the current instant-runoff result are
    rendered with ``date_vote_fragment.html``.

    Aborts with 401 when there is no user session and with 400 when the
    submitted ids do not all belong to exactly one meet.
    """
    log.debug("form = %s", request.form)
    if "user" not in session:
        # Without this guard the session lookup below raises KeyError (HTTP 500).
        abort(401)
    user_id = session["user"]["id"]

    # Drop repeated ids (keeping the first occurrence) so that no option is
    # inserted twice.
    date_ids = list(dict.fromkeys(request.form.getlist("date")))
    csr = get_cursor()
    # Retrieve the meet.id from the date ids. This also ensures that all the
    # dates refer to the same meet.
    csr.execute("select distinct meet from date where id = any (%s)",
                (date_ids,))
    r = csr.fetchall()
    if len(r) != 1:
        # this should never happen. Is the client messing around?
        log.warning("Date ids %s map to meets %s", date_ids, r)
        abort(400)
    meet_id = r[0].meet

    # Replace the whole ballot for this meet. Deleting only the submitted ids
    # would leave stale votes (with clashing positions) behind when fewer
    # dates are submitted than before.
    csr.execute(
        """
        delete from date_vote
        where bod = %s
          and date in (select d.id from date d where d.meet = %s)
        """,
        (user_id, meet_id))
    csr.executemany(
        "insert into date_vote(date, bod, position) values(%s, %s, %s)",
        [(date_id, user_id, pos) for pos, date_id in enumerate(date_ids)])

    # XXX - (almost) duplicate of the query in vote()
    csr.execute(
        """
        select d.id, d.date, d.display, position
        from date d left join date_vote v on d.id = v.date and v.bod = %s
        where meet = %s order by position, d.date
        """,
        (user_id, meet_id,))
    dates = csr.fetchall()
    result = instantrunoff_forward(meet_id, "date")
    log.debug("result = %s", result)
    return render_template("date_vote_fragment.html", dates=dates, result=result)
@app.get("/result/<int:meet_id>/date")
def result_date(meet_id):
    """Render the current instant-runoff ranking of the dates of a meet."""
    ranking = instantrunoff_forward(meet_id, "date")
    log.debug("result = %s", ranking)
    return render_template("date_result_fragment.html", result=ranking)
@app.post("/vote/time")
def vote_time():
    """Store the current user's ranking of times and render the updated fragment.

    The form field ``time`` holds the time ids in order of preference. The
    user's previous time votes for the meet are replaced by the submitted
    ranking, then the user's times and the current instant-runoff result are
    rendered with ``time_vote_fragment.html``.

    Aborts with 401 when there is no user session and with 400 when the
    submitted ids do not all belong to exactly one meet.
    """
    log.debug("form = %s", request.form)
    if "user" not in session:
        # Without this guard the session lookup below raises KeyError (HTTP 500).
        abort(401)
    user_id = session["user"]["id"]

    # Drop repeated ids (keeping the first occurrence) so that no option is
    # inserted twice.
    time_ids = list(dict.fromkeys(request.form.getlist("time")))
    csr = get_cursor()
    # Retrieve the meet.id from the time ids. This also ensures that all the
    # times refer to the same meet.
    csr.execute("select distinct meet from time where id = any (%s)",
                (time_ids,))
    r = csr.fetchall()
    if len(r) != 1:
        # this should never happen. Is the client messing around?
        log.warning("Time ids %s map to meets %s", time_ids, r)
        abort(400)
    meet_id = r[0].meet

    # Replace the whole ballot for this meet. Deleting only the submitted ids
    # would leave stale votes (with clashing positions) behind when fewer
    # times are submitted than before.
    csr.execute(
        """
        delete from time_vote
        where bod = %s
          and time in (select t.id from time t where t.meet = %s)
        """,
        (user_id, meet_id))
    csr.executemany(
        "insert into time_vote(time, bod, position) values(%s, %s, %s)",
        [(time_id, user_id, pos) for pos, time_id in enumerate(time_ids)])

    # XXX - (almost) duplicate of the query in vote()
    csr.execute(
        """
        select d.id, d.time, d.display, position
        from time d left join time_vote v on d.id = v.time and v.bod = %s
        where meet = %s order by position, d.time
        """,
        (user_id, meet_id,))
    times = csr.fetchall()
    result = instantrunoff_forward(meet_id, "time")
    log.debug("result = %s", result)
    return render_template("time_vote_fragment.html", times=times, result=result)
@app.get("/result/<int:meet_id>/time")
def result_time(meet_id):
    """Render the current instant-runoff ranking of the times of a meet."""
    ranking = instantrunoff_forward(meet_id, "time")
    log.debug("result = %s", ranking)
    return render_template("time_result_fragment.html", result=ranking)
@app.post("/vote/place")
def vote_place():
    """Store the current user's ranking of places and render the updated fragment.

    The form field ``place`` holds the place ids in order of preference. The
    user's previous place votes for the meet are replaced by the submitted
    ranking, then the user's places and the current instant-runoff result are
    rendered with ``place_vote_fragment.html``.

    Aborts with 401 when there is no user session and with 400 when the
    submitted ids do not all belong to exactly one meet.
    """
    log.debug("form = %s", request.form)
    if "user" not in session:
        # Without this guard the session lookup below raises KeyError (HTTP 500).
        abort(401)
    user_id = session["user"]["id"]

    # Drop repeated ids (keeping the first occurrence) so that no option is
    # inserted twice.
    place_ids = list(dict.fromkeys(request.form.getlist("place")))
    csr = get_cursor()
    # Retrieve the meet.id from the place ids. This also ensures that all the
    # places refer to the same meet.
    csr.execute("select distinct meet from place where id = any (%s)",
                (place_ids,))
    r = csr.fetchall()
    if len(r) != 1:
        # this should never happen. Is the client messing around?
        log.warning("Place ids %s map to meets %s", place_ids, r)
        abort(400)
    meet_id = r[0].meet

    # Replace the whole ballot for this meet. Deleting only the submitted ids
    # would leave stale votes (with clashing positions) behind when fewer
    # places are submitted than before.
    csr.execute(
        """
        delete from place_vote
        where bod = %s
          and place in (select p.id from place p where p.meet = %s)
        """,
        (user_id, meet_id))
    csr.executemany(
        "insert into place_vote(place, bod, position) values(%s, %s, %s)",
        [(place_id, user_id, pos) for pos, place_id in enumerate(place_ids)])

    # XXX - (almost) duplicate of the query in vote()
    csr.execute(
        """
        select d.id, d.name, position
        from place d left join place_vote v on d.id = v.place and v.bod = %s
        where meet = %s order by position, d.name
        """,
        (user_id, meet_id,))
    places = csr.fetchall()
    result = instantrunoff_forward(meet_id, "place")
    log.debug("result = %s", result)
    return render_template("place_vote_fragment.html", places=places, result=result)
@app.get("/result/<int:meet_id>/place")
def result_place(meet_id):
    """Render the current instant-runoff ranking of the places of a meet."""
    ranking = instantrunoff_forward(meet_id, "place")
    log.debug("result = %s", ranking)
    return render_template("place_result_fragment.html", result=ranking)
def send_mail(email_address, confirmation_url):
    """Mail *confirmation_url* to *email_address* via the SMTP server on localhost.

    The message body consists of the URL only.
    """
    msg = email.message.EmailMessage()
    msg["From"] = "noreply@hjp.at"
    msg["To"] = email_address
    msg["Subject"] = "MEEAT confirmation"
    body = confirmation_url  # Really minimalistic
    msg.set_content(body)
    # The context manager sends QUIT and closes the connection even when
    # sending fails; previously the connection was never closed.
    with smtplib.SMTP(host="localhost") as mta:
        mta.send_message(msg)
def get_ballots(meet_id, kind):
    """Load all ballots of one kind for a meet.

    *kind* names the option table ("date", "time" or "place"); the votes are
    read from the matching ``<kind>_vote`` table.

    Returns a list of ballots, one per voter (``bod``). Each ballot is the
    list of that voter's rows ordered by position.

    Raises ValueError for any other *kind*, because the value is interpolated
    into the SQL text as a table and column name.
    """
    if kind not in ("date", "time", "place"):
        raise ValueError(f"unknown ballot kind: {kind!r}")

    csr = get_cursor()
    q = f"""
        select {kind}.*, bod, position
        from {kind} join {kind}_vote on {kind}.id = {kind}_vote.{kind}
        where meet = %s
        order by bod, position
        """
    csr.execute(q, (meet_id,))

    # Rows arrive sorted by voter, so a new ballot starts whenever the voter
    # changes. The sentinel guarantees that the first row always opens one.
    last_bod = object()
    ballots = []
    for r in csr:
        if r.bod != last_bod:
            ballots.append([])
            last_bod = r.bod
        ballots[-1].append(r)
    return ballots
def dump_ballots(ballots):
    """Write all ballots to the debug log, each one preceded by a "---" line."""
    for entries in ballots:
        log.debug ("---")
        for row in entries:
            log.debug(row)
def runoff(ballots):
    """Run one elimination round and strike the weakest candidate.

    For every candidate a vector is built that counts how often it appears at
    each ballot position. The candidate with the smallest vector (fewest first
    places, then fewest second places, and so on) loses.

    *ballots* is a list of ballots, each a list of rows with an ``id``
    attribute; at least one ballot must be non-empty.

    Returns ``(loser, new_ballots)`` where *new_ballots* are the ballots with
    the loser removed.
    """
    # Every vector gets the length of the longest ballot. Sizing it from
    # whichever ballot happened to contain the candidate last raised an
    # IndexError as soon as ballots had different lengths.
    width = max((len(ballot) for ballot in ballots), default=0)
    count = {}
    candidates = {}
    for ballot in ballots:
        for pos, r in enumerate(ballot):
            if r.id not in count:
                count[r.id] = [0] * width
            candidates[r.id] = r
            count[r.id][pos] += 1

    # sorted() is stable, so ties keep the order of first appearance.
    result = sorted(count, key=count.__getitem__)
    log.debug("result of this round:")
    for r in result:
        log.debug("%s %s", r, count[r])
    log.debug("striking %d", result[0])
    loser = candidates[result[0]]
    new_ballots = [
        [r for r in ballot if r.id != loser.id]
        for ballot in ballots
    ]
    return loser, new_ballots
def instantrunoff_forward(meet_id, kind):
    """Rank the options of one kind for a meet by repeated elimination.

    Loads the ballots for *meet_id* and *kind*, then strikes one loser per
    round until all ballots are empty. The result lists the struck rows in
    reverse order of elimination, so the option that survived longest comes
    first. Returns an empty list when nobody has voted yet.
    """
    ballots = get_ballots(meet_id, kind)
    result = []
    # any() is False for an empty list of ballots, where max() raised ValueError.
    while any(ballots):
        dump_ballots(ballots)
        loser, ballots = runoff(ballots)
        result.append(loser)
    result.reverse()
    log.debug("final result")
    for r in result:
        log.debug(r)
    return result
def get_cursor():
    """Return a new cursor on the request's database connection.

    Rows fetched through the cursor are named tuples.
    """
    return get_db().cursor(row_factory=psycopg.rows.namedtuple_row)
def get_db():
    """Return the database connection stored on ``g``, opening it on first use."""
    if "db" in g:
        return g.db
    g.db = psycopg.connect(dbname=config.dbname)
    return g.db
@app.teardown_appcontext
def teardown_db(exception):
    """Finish the database transaction and close the connection.

    Commits when the app context ends without an exception, then rolls back
    whatever is still open and closes the connection. Commit and rollback
    failures are logged instead of being silently discarded, and the
    connection is closed in every case.
    """
    log.debug("in teardown_db: exception = %s", exception)
    db = g.pop('db', None)
    if db is None:
        return
    try:
        if not exception:
            try:
                db.commit()
            except Exception:
                # Best effort as before, but leave a trace of the lost data.
                log.exception("commit failed in teardown_db")
        db.rollback()
    except Exception:
        log.exception("rollback failed in teardown_db")
    finally:
        db.close()