525 lines
16 KiB
Python
525 lines
16 KiB
Python
import datetime
|
|
import email.message
|
|
import logging
|
|
import logging.config
|
|
import os
|
|
import smtplib
|
|
|
|
import psycopg
|
|
import psycopg.rows
|
|
|
|
from flask import (
|
|
Flask, session, redirect, url_for, request, render_template,
|
|
g, abort, flash
|
|
)
|
|
|
|
import config
|
|
|
|
# Apply the logging configuration supplied by the local config module.
logging.config.dictConfig(config.logging)

# The Flask application object; all routes below are registered on it.
app = Flask(__name__)
app.secret_key = config.secret_key
# Lifetime used for sessions that are marked permanent.
# NOTE(review): confirm() later assigns app.permanent_session_lifetime
# (90 days), which overrides this value -- confirm which one is intended.
app.config["PERMANENT_SESSION_LIFETIME"] = datetime.timedelta(days=92)

# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
|
|
|
|
@app.route("/")
def home():
    """Render the home page, sending anonymous visitors to registration."""
    log.debug("in home")
    log.debug("session = %s", session)
    if "user" in session:
        return render_template("home.html")
    # Not logged in: register first, then come back to "/".
    return redirect(url_for("register", target="/"))
|
|
|
|
@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form (GET) or mail a confirmation link (POST).

    On POST the submitted address is looked up in ``bod``. An existing key
    is reused; otherwise a fresh random key is generated and stored, creating
    the row if the address is not known yet. A link to ``confirm`` carrying
    that key and the submitted ``target`` is then mailed to the address.

    Returns the form again (via redirect, with a flashed message) when the
    address does not contain an "@".
    """
    log.debug("in register")
    if request.method == "GET":
        return render_template("register.html")

    email_address = request.form.get("email", "")
    # Fall back to the home page instead of failing with 400 when the form
    # did not carry a target.
    target = request.form.get("target", "/")

    if "@" not in email_address:
        flash("Das schaut nicht wie eine E-Mail-Adresse aus")
        # Stop here: without this return the invalid address would still be
        # stored and mailed. Redirecting keeps the target and lets the next
        # page display the flashed message.
        return redirect(url_for("register", target=target))

    csr = get_cursor()
    csr.execute("select * from bod where email = %s", (email_address,))
    r = csr.fetchone()
    if r and r.key:
        key = r.key
    else:
        key = os.urandom(8).hex()
        if r:
            csr.execute(
                "update bod set key = %s, keychange = now() where email = %s",
                (key, email_address,))
        else:
            csr.execute(
                "insert into bod(email, key, keychange) values(%s, %s, now())",
                (email_address, key,))

    log.debug("request.scheme = %s", request.scheme)
    log.debug("request.server = %s", request.server)
    log.debug("request.root_url = %s", request.root_url)
    # _external=True yields one well-formed absolute URL. Concatenating
    # request.root_url with url_for() produced a double slash and repeated
    # the script root when the app is not mounted at "/".
    confirmation_url = url_for(
        "confirm", target=target, key=key, _external=True)
    send_mail(email_address, confirmation_url)
    return render_template("wait_for_confirmation.html")
|
|
|
|
def _is_local_target(target):
    """Return True if *target* is a URL on the host serving this request."""
    # Imported locally because the module does not import urllib at top level.
    from urllib.parse import urlsplit

    if not target or "\\" in target:
        # Browsers treat backslashes like slashes, so "/\host" could leave
        # the site even though it parses as a plain path.
        return False
    try:
        parts = urlsplit(target)
    except ValueError:
        return False
    if parts.scheme or parts.netloc:
        # Absolute (or scheme-relative) URL: only our own host is acceptable.
        return parts.scheme in ("http", "https") and parts.netloc == request.host
    # Path-only URL; "//host" and "///host" are resolved as other hosts by
    # browsers and must not pass.
    return target.startswith("/") and not target.startswith("//")


@app.route("/confirm")
def confirm():
    """Log the user in via the key from the confirmation mail.

    Looks up the ``key`` query argument in ``bod``. An unknown key renders
    ``wrong_key.html``. Otherwise the user is stored in a permanent session
    and redirected to the ``target`` query argument, or to the home page if
    the target is missing or points to a different host.
    """
    csr = get_cursor()
    csr.execute("select * from bod where key = %s", (request.args["key"],))
    bod = csr.fetchone()
    if not bod:
        return render_template("wrong_key.html")

    session["user"] = {"id": bod.id, "email": bod.email}
    session.permanent = True
    # NOTE(review): this changes the lifetime on the shared app object, not
    # just for this session, and differs from the 92 days configured at
    # module level -- confirm which value is intended.
    app.permanent_session_lifetime = datetime.timedelta(days=90)

    # The target comes straight from the query string; redirecting to it
    # unchecked would be an open redirect.
    target = request.args.get("target", "")
    if not _is_local_target(target):
        log.warning("Refusing to redirect to %r after confirmation", target)
        target = url_for("home")
    return redirect(target)
|
|
|
|
@app.route("/vote/<string:key>")
def vote(key):
    """Render the voting page for the meet identified by *key*.

    Anonymous visitors are redirected to registration with this URL as the
    target. An unknown key aborts with 404. The template receives the meet,
    the emails of everybody who voted on any date, time or place of it, and
    the meet's dates, times and places joined with the current user's votes.
    """
    log.debug("session = %s", session)
    if "user" not in session:
        return redirect(url_for("register", target=request.url))

    cursor = get_cursor()

    def fetch_all(sql, params):
        # Run one query on the shared cursor and return all of its rows.
        cursor.execute(sql, params)
        return cursor.fetchall()

    cursor.execute("select * from meet where key = %s", (key,))
    meet = cursor.fetchone()
    if not meet:
        abort(404)

    voters = fetch_all(
        """
        select email from bod where id in (
            select date_vote.bod
            from date join date_vote on date.id = date_vote.date
            where meet = %(meet_id)s
            union
            select time_vote.bod
            from time join time_vote on time.id = time_vote.time
            where meet = %(meet_id)s
            union
            select place_vote.bod
            from place join place_vote on place.id = place_vote.place
            where meet = %(meet_id)s
        ) order by 1
        """,
        {"meet_id": meet.id})

    # The left joins keep options the user has not voted on (position NULL).
    dates = fetch_all(
        """
        select d.id, d.date, d.display, position
        from date d left join date_vote v on d.id = v.date and v.bod = %s
        where meet = %s order by position, d.date
        """,
        (session["user"]["id"], meet.id))

    times = fetch_all(
        """
        select d.id, d.time, d.display, position
        from time d left join time_vote v on d.id = v.time and v.bod = %s
        where meet = %s order by position, d.time
        """,
        (session["user"]["id"], meet.id))

    places = fetch_all(
        """
        select d.id, d.name, position
        from place d left join place_vote v on d.id = v.place and v.bod = %s
        where meet = %s order by position, d.name
        """,
        (session["user"]["id"], meet.id))

    return render_template(
        "vote.html",
        meet=meet, voters=voters, dates=dates, times=times, places=places)
|
|
|
|
@app.post("/vote/date")
def vote_date():
    """Store the current user's date ranking and render the updated fragment.

    The form field ``date`` carries date ids in the submitted order; the
    index of each id in that list is stored as its ``position``. Existing
    votes of the user for these dates are replaced.

    Aborts with 401 if nobody is logged in and with 400 if the submitted ids
    do not map to exactly one meet.
    """
    log.debug("form = %s", request.form)
    if "user" not in session:
        # Without this guard the session lookup raises KeyError (HTTP 500).
        abort(401)
    user_id = session["user"]["id"]
    date_ids = request.form.getlist("date")

    csr = get_cursor()
    # Retrieve the meet.id from the date ids. This also ensures that all the
    # dates refer to the same meet.
    csr.execute("select distinct meet from date where id = any (%s)",
                (date_ids,))
    r = csr.fetchall()
    if len(r) != 1:
        # this should never happen. Is the client messing around?
        log.warning("Date ids %s map to meets %s", date_ids, r)
        abort(400)
    meet_id = r[0].meet

    # Replace the user's previous votes for the submitted dates.
    csr.execute(
        "delete from date_vote where date = any (%s) and bod = %s",
        (date_ids, user_id))
    for pos, date_id in enumerate(date_ids):
        csr.execute(
            "insert into date_vote(date, bod, position) values(%s, %s, %s)",
            (date_id, user_id, pos)
        )

    # XXX - (almost) duplicate of the query in vote()
    csr.execute(
        """
        select d.id, d.date, d.display, position
        from date d left join date_vote v on d.id = v.date and v.bod = %s
        where meet = %s order by position, d.date
        """,
        (user_id, meet_id,))
    dates = csr.fetchall()

    result = instantrunoff_backward(meet_id, "date")
    log.debug("result = %s", result)

    csr.execute(
        """
        select distinct email, short
        from date_vote
          join date on date_vote.date = date.id
          join bod on date_vote.bod = bod.id
        where meet = %s
        order by email
        """,
        (meet_id,))
    voters = csr.fetchall()

    return render_template(
        "date_vote_fragment.html",
        dates=dates, result=result, voters=voters)
|
|
|
|
@app.get("/result/<int:meet_id>/date")
def result_date(meet_id):
    """Render the date result fragment for a meet.

    The template receives the backward instant-runoff ranking of the meet's
    dates and the distinct (email, short) pairs of everybody who voted on one.
    """
    ranking = instantrunoff_backward(meet_id, "date")
    log.debug("result = %s", ranking)

    voters_sql = """
        select distinct email, short
        from date_vote
          join date on date_vote.date = date.id
          join bod on date_vote.bod = bod.id
        where meet = %s
        order by email
        """
    cursor = get_cursor()
    cursor.execute(voters_sql, (meet_id,))
    return render_template(
        "date_result_fragment.html",
        result=ranking, voters=cursor.fetchall())
|
|
|
|
|
|
|
|
@app.post("/vote/time")
def vote_time():
    """Store the current user's time ranking and render the updated fragment.

    The form field ``time`` carries time ids in the submitted order; the
    index of each id in that list is stored as its ``position``. Existing
    votes of the user for these times are replaced.

    Aborts with 401 if nobody is logged in and with 400 if the submitted ids
    do not map to exactly one meet.
    """
    log.debug("form = %s", request.form)
    if "user" not in session:
        # Without this guard the session lookup raises KeyError (HTTP 500).
        abort(401)
    user_id = session["user"]["id"]
    time_ids = request.form.getlist("time")

    csr = get_cursor()
    # Retrieve the meet.id from the time ids. This also ensures that all the
    # times refer to the same meet.
    csr.execute("select distinct meet from time where id = any (%s)",
                (time_ids,))
    r = csr.fetchall()
    if len(r) != 1:
        # this should never happen. Is the client messing around?
        log.warning("Time ids %s map to meets %s", time_ids, r)
        abort(400)
    meet_id = r[0].meet

    # Replace the user's previous votes for the submitted times.
    csr.execute(
        "delete from time_vote where time = any (%s) and bod = %s",
        (time_ids, user_id))
    for pos, time_id in enumerate(time_ids):
        csr.execute(
            "insert into time_vote(time, bod, position) values(%s, %s, %s)",
            (time_id, user_id, pos)
        )

    # XXX - (almost) duplicate of the query in vote()
    csr.execute(
        """
        select d.id, d.time, d.display, position
        from time d left join time_vote v on d.id = v.time and v.bod = %s
        where meet = %s order by position, d.time
        """,
        (user_id, meet_id,))
    times = csr.fetchall()

    result = instantrunoff_backward(meet_id, "time")
    log.debug("result = %s", result)

    csr.execute(
        """
        select distinct email, short
        from time_vote
          join time on time_vote.time = time.id
          join bod on time_vote.bod = bod.id
        where meet = %s
        order by email
        """,
        (meet_id,))
    voters = csr.fetchall()

    return render_template(
        "time_vote_fragment.html",
        times=times, result=result, voters=voters)
|
|
|
|
@app.get("/result/<int:meet_id>/time")
def result_time(meet_id):
    """Render the time result fragment for a meet.

    The template receives the backward instant-runoff ranking of the meet's
    times and the distinct (email, short) pairs of everybody who voted on one.
    """
    ranking = instantrunoff_backward(meet_id, "time")
    log.debug("result = %s", ranking)

    voters_sql = """
        select distinct email, short
        from time_vote
          join time on time_vote.time = time.id
          join bod on time_vote.bod = bod.id
        where meet = %s
        order by email
        """
    cursor = get_cursor()
    cursor.execute(voters_sql, (meet_id,))
    return render_template(
        "time_result_fragment.html",
        result=ranking, voters=cursor.fetchall())
|
|
|
|
@app.post("/vote/place")
def vote_place():
    """Store the current user's place ranking and render the updated fragment.

    The form field ``place`` carries place ids in the submitted order; the
    index of each id in that list is stored as its ``position``. Existing
    votes of the user for these places are replaced.

    Aborts with 401 if nobody is logged in and with 400 if the submitted ids
    do not map to exactly one meet.
    """
    log.debug("form = %s", request.form)
    if "user" not in session:
        # Without this guard the session lookup raises KeyError (HTTP 500).
        abort(401)
    user_id = session["user"]["id"]
    place_ids = request.form.getlist("place")

    csr = get_cursor()
    # Retrieve the meet.id from the place ids. This also ensures that all the
    # places refer to the same meet.
    csr.execute("select distinct meet from place where id = any (%s)",
                (place_ids,))
    r = csr.fetchall()
    if len(r) != 1:
        # this should never happen. Is the client messing around?
        log.warning("Place ids %s map to meets %s", place_ids, r)
        abort(400)
    meet_id = r[0].meet

    # Replace the user's previous votes for the submitted places.
    csr.execute(
        "delete from place_vote where place = any (%s) and bod = %s",
        (place_ids, user_id))
    for pos, place_id in enumerate(place_ids):
        csr.execute(
            "insert into place_vote(place, bod, position) values(%s, %s, %s)",
            (place_id, user_id, pos)
        )

    # XXX - (almost) duplicate of the query in vote()
    csr.execute(
        """
        select d.id, d.name, position
        from place d left join place_vote v on d.id = v.place and v.bod = %s
        where meet = %s order by position, d.name
        """,
        (user_id, meet_id,))
    places = csr.fetchall()

    result = instantrunoff_backward(meet_id, "place")
    log.debug("result = %s", result)

    csr.execute(
        """
        select distinct email, short
        from place_vote
          join place on place_vote.place = place.id
          join bod on place_vote.bod = bod.id
        where meet = %s
        order by email
        """,
        (meet_id,))
    voters = csr.fetchall()

    return render_template(
        "place_vote_fragment.html",
        places=places, result=result, voters=voters)
|
|
|
|
|
|
@app.get("/result/<int:meet_id>/place")
def result_place(meet_id):
    """Render the place result fragment for a meet.

    The template receives the backward instant-runoff ranking of the meet's
    places and the distinct (email, short) pairs of everybody who voted on one.
    """
    ranking = instantrunoff_backward(meet_id, "place")
    log.debug("result = %s", ranking)

    voters_sql = """
        select distinct email, short
        from place_vote
          join place on place_vote.place = place.id
          join bod on place_vote.bod = bod.id
        where meet = %s
        order by email
        """
    cursor = get_cursor()
    cursor.execute(voters_sql, (meet_id,))
    return render_template(
        "place_result_fragment.html",
        result=ranking, voters=cursor.fetchall())
|
|
|
|
|
|
@app.get("/result/<int:meet_id>/date/ballot")
def result_ballots_date(meet_id):
    """Render all date ballots of a meet together with the runoff result."""
    # Dict literals evaluate in order: ballots are fetched before the result.
    context = {
        "ballots": get_ballots(meet_id, "date"),
        "result": instantrunoff_backward(meet_id, "date"),
    }
    return render_template("date_result_ballots.html", **context)
|
|
|
|
@app.get("/result/<int:meet_id>/time/ballot")
def result_ballots_time(meet_id):
    """Render all time ballots of a meet together with the runoff result."""
    # Dict literals evaluate in order: ballots are fetched before the result.
    context = {
        "ballots": get_ballots(meet_id, "time"),
        "result": instantrunoff_backward(meet_id, "time"),
    }
    return render_template("time_result_ballots.html", **context)
|
|
|
|
@app.get("/result/<int:meet_id>/place/ballot")
def result_ballots_place(meet_id):
    """Render all place ballots of a meet together with the runoff result."""
    # Dict literals evaluate in order: ballots are fetched before the result.
    context = {
        "ballots": get_ballots(meet_id, "place"),
        "result": instantrunoff_backward(meet_id, "place"),
    }
    return render_template("place_result_ballots.html", **context)
|
|
|
|
|
|
def send_mail(email_address, confirmation_url):
    """Mail *confirmation_url* to *email_address* via the SMTP server on localhost.

    The message body consists of the URL only.
    """
    msg = email.message.EmailMessage()
    msg["From"] = "noreply@hjp.at"
    msg["To"] = email_address
    msg["Subject"] = "MEEAT confirmation"
    body = confirmation_url  # Really minimalistic
    msg.set_content(body)
    # The context manager issues QUIT and closes the connection even when
    # sending fails; the connection used to be left open.
    with smtplib.SMTP(host="localhost") as mta:
        mta.send_message(msg)
|
|
|
|
def get_ballots(meet_id, kind):
    """Return the ballots cast for one kind of option of a meet.

    *kind* names both the option table and, with ``_vote`` appended, its vote
    table; it must be "date", "time" or "place".

    Returns a list with one ballot per voter. Each ballot is a list of rows
    ordered by position, and each row carries the option's columns plus
    ``bod``, ``position``, ``email`` and ``vote_w`` (the minimum of ``w``
    over the voter's rows up to and including that position).

    Raises ValueError for any other *kind*.
    """
    # kind is interpolated into the SQL text, so only known table names may
    # pass. The check comes first so that bad input never reaches the database.
    if kind not in ("date", "time", "place"):
        raise ValueError(f"unknown ballot kind: {kind!r}")

    csr = get_cursor()

    q = f"""
        select {kind}.*, bod, position, email,
               min(w) over(partition by bod order by position) as vote_w
        from {kind}
          join {kind}_vote on {kind}.id = {kind}_vote.{kind}
          join bod on bod = bod.id
        where meet = %s
        order by bod, position
        """
    csr.execute(q, (meet_id,))

    # Rows arrive sorted by voter, so a change of bod starts a new ballot.
    last_bod = None
    ballots = []
    for r in csr:
        if r.bod != last_bod:
            ballot = []
            ballots.append(ballot)
            last_bod = r.bod
        ballot.append(r)
    return ballots
|
|
|
|
def dump_ballots(ballots):
    """Write every ballot to the debug log, one row per line.

    Each ballot is preceded by a "---" separator line.
    """
    for entries in ballots:
        log.debug("---")
        for row in entries:
            log.debug(row)
|
|
|
|
def runoff(ballots, direction):
    """Run one elimination round and strike one candidate from all ballots.

    For every candidate a list of weighted tallies per ballot position is
    built. A ballot's weight is the largest ``vote_w`` among its rows.

    With *direction* "backward" the struck candidate is the one whose tallies
    are largest when compared from the last position towards the first. With
    any other value it is the one whose tallies are smallest when compared
    from the first position on.

    Returns ``(loser, new_ballots)``: the row of the struck candidate and the
    ballots without it. Ballots that become empty stay in the list.

    Raises ValueError if no ballot contains a candidate.
    """
    # Exhausted ballots carry no votes. Skipping them avoids max() failing on
    # an empty sequence when ballots differ in length.
    live_ballots = [ballot for ballot in ballots if ballot]

    count = {}
    candidates = {}
    for ballot in live_ballots:
        for r in ballot:
            # Size each tally list for the longest ballot the candidate is on.
            if r.id not in count or len(count[r.id]) < len(ballot):
                log.debug("count[%d] <- %d elements", r.id, len(ballot))
                count[r.id] = [0] * len(ballot)
                candidates[r.id] = r

    if not count:
        raise ValueError("runoff() needs at least one non-empty ballot")

    for ballot in live_ballots:
        weight = max(r.vote_w for r in ballot)
        for pos, r in enumerate(ballot):
            log.debug("count = %s", count)
            log.debug("r.id = %s", r.id)
            log.debug("pos = %s", pos)
            count[r.id][pos] += weight
            log.debug("count[%d][%d]) = %d", r.id, pos, count[r.id][pos])

    if direction == "backward":
        result = sorted(
            count.keys(), key=lambda i: list(reversed(count[i])), reverse=True)
    else:
        result = sorted(count.keys(), key=lambda i: count[i])

    log.debug("result of this round:")
    for r in result:
        log.debug("%s %s", r, count[r])
    log.debug("striking %d", result[0])

    loser = candidates[result[0]]
    new_ballots = [
        [r for r in ballot if r.id != loser.id]
        for ballot in ballots
    ]
    return loser, new_ballots
|
|
|
|
def instantrunoff_forward(meet_id, kind):
    """Rank the options of one kind for a meet by forward instant runoff.

    Repeatedly calls ``runoff(..., "forward")`` until every ballot is empty.

    Returns the struck candidate rows in reverse order of elimination, i.e.
    the candidate struck last comes first. Returns an empty list when nobody
    has voted.
    """
    ballots = get_ballots(meet_id, kind)

    result = []
    # any() is False for an empty ballot list as well; max() over the ballot
    # lengths raised ValueError in that case.
    while any(ballots):
        dump_ballots(ballots)
        loser, ballots = runoff(ballots, "forward")
        result.append(loser)
    result.reverse()

    log.debug("final result")
    for r in result:
        log.debug(r)
    return result
|
|
|
|
def instantrunoff_backward(meet_id, kind):
    """Rank the options of one kind for a meet by backward instant runoff.

    Repeatedly calls ``runoff(..., "backward")`` until every ballot is empty.

    Returns the struck candidate rows in reverse order of elimination, i.e.
    the candidate struck last comes first. Returns an empty list when nobody
    has voted.
    """
    ballots = get_ballots(meet_id, kind)

    result = []
    # any() is False for an empty ballot list as well; max() over the ballot
    # lengths raised ValueError in that case.
    while any(ballots):
        dump_ballots(ballots)
        loser, ballots = runoff(ballots, "backward")
        result.append(loser)
    result.reverse()

    log.debug("final result")
    for r in result:
        log.debug(r)
    return result
|
|
|
|
def get_cursor():
    """Return a new cursor on the request's connection yielding named tuples."""
    connection = get_db()
    return connection.cursor(row_factory=psycopg.rows.namedtuple_row)
|
|
|
|
def get_db():
    """Return the database connection stored on ``g``, opening it on first use."""
    if "db" in g:
        return g.db
    g.db = psycopg.connect(dbname=config.dbname, user=config.dbuser)
    return g.db
|
|
|
|
@app.teardown_appcontext
def teardown_db(exception):
    """Commit or roll back the connection stored on ``g`` and close it.

    *exception* is the error that ended the app context, or None. The
    transaction is committed only when there was no error. A rollback always
    follows, and the connection is closed in every case.
    """
    log.debug("in teardown_db: exception = %s", exception)
    db = g.pop("db", None)
    if db is None:
        return
    try:
        if not exception:
            try:
                db.commit()
            except Exception:
                # Still best effort, but a lost commit is now logged with its
                # traceback instead of being swallowed silently.
                log.exception("commit failed in teardown_db")
        db.rollback()
    finally:
        # Close even if the rollback itself fails.
        db.close()
|
|
|