import datetime
import json
import logging
import math
import time
from flask import (Flask, request, jsonify, abort, render_template_string, render_template)
from markupsafe import Markup
from ltsdb_json import LTS
log = logging.getLogger(__name__)
class Dashboard:
def __init__(self, filename="dashboard.json"):
with open (filename) as fh:
d = json.load(fh)
self.title = d["title"]
log.debug("title = ā%sā", self.title)
self.description = d["description"]
self.widgets = []
for w in d["widgets"]:
if w["type"] == "timeseries":
# XXX - currently just a copy of the gauge code,
# but I want to add support for timeseries graphs with
# multiple series later so this will change
if w.get("multi"):
ts_list = LTS.find(w["data"][0])
for ts in ts_list:
tso = LTS(id=ts)
if not tso.data:
log.warning("%s has no data: Skipping", tso.id)
continue
if tso.data[-1][0] < time.time() - 86400:
log.info("%s too old; Skipping", tso.id)
continue
w1 = {**w, "data": [ts]}
self.widgets.append(TimeSeries(w1))
else:
self.widgets.append(TimeSeries(w))
elif w["type"] == "gauge":
if w.get("multi"):
ts_list = LTS.find(w["data"][0])
for ts in ts_list:
tso = LTS(id=ts)
if not tso.data:
log.warning("%s has no data: Skipping", tso.id)
continue
if tso.data[-1][0] < time.time() - 86400:
log.info("%s too old; Skipping", tso.id)
continue
w1 = {**w, "data": [ts]}
self.widgets.append(Gauge(w1))
else:
self.widgets.append(Gauge(w))
else:
self.widgets.append(Widget(w))
def as_html(self):
return render_template("dashboard.html", dashboard=self)
class Widget:
def __init__(self, d):
self.type = d["type"]
self.stops = d["stops"]
log.debug("data = %s", d["data"])
self.lts = LTS(id=d["data"][0]) # by default we handle only one data source
pass
def as_html(self):
log.debug("")
self.lastvalue = self.lts.data[-1][1]
return Markup(render_template("widget.html", widget=self))
@property
def criticalcolor(self):
log.debug("stops = %s", self.stops)
brightness = 30
if self.stops[0] < self.stops[2]:
if self.lastvalue < self.stops[0]:
log.debug("definitely ok")
return f"hsl(120, 100%, {brightness}%)"
elif self.lastvalue < self.stops[1]:
log.debug("mostly ok")
hue = 120 - round(
(self.lastvalue - self.stops[0])
/ (self.stops[1] - self.stops[0])
* 60
)
return f"hsl({hue}, 100%, {brightness}%)"
elif self.lastvalue < self.stops[2]:
log.debug("maybe fail")
hue = 60 - round(
(self.lastvalue - self.stops[1])
/ (self.stops[2] - self.stops[1])
* 60
)
return f"hsl({hue}, 100%, {brightness}%)"
else:
log.debug("definitely fail")
return f"hsl(0, 100%, {brightness}%)"
else:
log.debug("the other side")
if self.lastvalue > self.stops[0]:
log.debug("definitely ok")
return f"hsl(120, 100%, {brightness}%)"
elif self.lastvalue > self.stops[1]:
log.debug("mostly ok")
hue = 120 - round(
(self.lastvalue - self.stops[0])
/ (self.stops[1] - self.stops[0])
* 60
)
return f"hsl({hue}, 100%, {brightness}%)"
elif self.lastvalue > self.stops[2]:
log.debug("maybe fail")
hue = 60 - round(
(self.lastvalue - self.stops[1])
/ (self.stops[2] - self.stops[1])
* 60
)
return f"hsl({hue}, 100%, {brightness}%)"
else:
log.debug("definitely fail")
return f"hsl(0, 100%, {brightness}%)"
@property
def description_formatted(self):
s = "
"
for d, v in self.lts.description.items():
if v:
s += render_template_string(
"{{d}}: | {{v}} |
",
d=d, v=v)
s += "
"
return Markup(s)
class TimeSeries(Widget):
def __init__(self, d):
super().__init__(d)
pass
@property
def graph(self):
def t2x(t):
x = n - math.log((t_last - t) / (n*dt) + 1) * n / k
return x
data = self.lts.data
n = len(data)
t_last = data[-1][0]
dt = (t_last - data[-5][0]) / 4
k = math.log((t_last - data[0][0]) / dt / n + 1)
max_value = max([d[1] for d in self.lts.data])
max_value = max(max_value, 0.001) # ensure positive
v_data = []
for i in range(n):
t = data[i][0]
x = t2x(t)
t_h = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t))
#print(t, t_h, x)
v_data.append({"t": t, "v": data[i][1], "x": x, "y": (1 - data[i][1]/max_value) * 200})
tickmarks = []
t = v_data[-1]["t"]
x = v_data[-1]["x"]
d = datetime.datetime.fromtimestamp(t)
tickmarks.append({"t": t, "t_h": d.strftime("%Y-%m-%d %H:%M:%S"), "x": x})
min_step = 25
steps = ("s", "m", "h", "D", "10D", "M", "Y")
step_i = 0
while True:
t0 = tickmarks[-1]["t"]
x0 = tickmarks[-1]["x"]
d0 = datetime.datetime.fromtimestamp(t0)
if steps[step_i] == "s":
d1 = datetime.datetime(d0.year, d0.month, d0.day, d0.hour, d0.minute, d0.second)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
d1 -= datetime.timedelta(seconds=1)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
step_i += 1
continue
if steps[step_i] == "m":
d1 = datetime.datetime(d0.year, d0.month, d0.day, d0.hour, d0.minute)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
d1 -= datetime.timedelta(minutes=1)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
step_i += 1
continue
if steps[step_i] == "h":
d1 = datetime.datetime(d0.year, d0.month, d0.day, d0.hour)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
d1 -= datetime.timedelta(hours=1)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
step_i += 1
continue
if steps[step_i] == "D":
d1 = datetime.datetime(d0.year, d0.month, d0.day)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
d1 -= datetime.timedelta(days=1)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
step_i += 1
continue
if steps[step_i] == "10D":
year = d0.year
month = d0.month
day = d0.day
if day > 21:
day = 21
elif day > 11:
day = 11
elif day > 1:
day = 1
else:
day = 21
month -= 1
if month < 1:
month += 12
year -= 1
d1 = datetime.datetime(year, month, day)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
if day > 21:
day = 21
elif day > 11:
day = 11
elif day > 1:
day = 1
else:
day = 21
month -= 1
if month < 1:
month += 12
year -= 1
d1 = datetime.datetime(year, month, day)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
step_i += 1
continue
if steps[step_i] == "M":
d1 = datetime.datetime(d0.year, d0.month, 1)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
if d1.month > 1:
d1 = datetime.datetime(d1.year, d1.month-1, 1)
else:
d1 = datetime.datetime(d1.year-1, 12, 1)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
step_i += 1
continue
if steps[step_i] == "Y":
d1 = datetime.datetime(d0.year, 1, 1)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
d1 = datetime.datetime(d1.year-1, 1, 1)
t1 = d1.timestamp()
x1 = t2x(t1)
if x0 - x1 < min_step:
step_i += 1
continue
if x1 < 0:
break
tickmarks.append({"t": t1, "t_h": d1.strftime("%Y-%m-%d %H:%M:%S"), "x": x1})
if tickmarks[-1]["x"] > min_step:
t = v_data[0]["t"]
x = v_data[0]["x"]
d = datetime.datetime.fromtimestamp(t)
tickmarks.append({"t": t, "t_h": d.strftime("%Y-%m-%d %H:%M:%S"), "x": x})
self.x_tickmarks = tickmarks
self.y_tickmarks = []
step = 10 ** math.floor(math.log10(max_value))
v = 0
while v < max_value:
self.y_tickmarks.append({"y": (1 - v/max_value) * 200, "v_h": str(v)})
v += step
html = ""
html += ""
return Markup(html)
def as_html(self):
return Markup(render_template("timeseries.html", widget=self))
class Gauge(Widget):
def __init__(self, d):
super().__init__(d)
pass
gaugesize = 100
@property
def gaugepos(self):
max_value = max([d[1] for d in self.lts.data])
max_value = max(max_value, 1) # ensure positive
log.debug("max_value = %s", max_value)
return self.lastvalue / max_value * self.gaugesize
def as_html(self):
log.debug("")
self.lastvalue = self.lts.data[-1][1]
value = self.lastvalue
unit = self.lts.description["unit"]
log.debug("unit = %s", unit)
if unit == "s":
if value >= 365.25 * 86400:
value /= 365.25 * 86400
unit = "years"
elif value >= 86400:
value /= 86400
unit = "days"
elif value >= 3600:
value /= 3600
unit = "h"
elif value >= 60:
value /= 60
unit = "m"
elif value >= 1:
pass
elif value >= 0.001:
value *= 1000
unit = "ms"
self.lastvalue_formatted = Markup(f"{value:.2f}{unit}")
return Markup(render_template("gauge.html", widget=self))