ltsdb/process_queue

93 lines
3.0 KiB
Python

#!/usr/bin/python3
import logging
import logging.config
import math
import os
import socket
import statistics
import time
from ltsdb_json import LTS
import config
# Logging is configured from the project-local config module before the
# module-level logger below is created.
logging.config.dictConfig(config.logging)
log = logging.getLogger("process_queue")

# Primary host name reported by socket.gethostbyaddr() for this machine;
# stored in the "node" field of every series this script writes.
node, *_ = socket.gethostbyaddr(socket.gethostname())
class DiskFullPredictor:
    """Estimate how long until a filesystem runs out of space.

    Handles ``bytes_used`` series that carry a ``mountpoint``, pairs each
    one with the ``bytes_usable`` series whose description is otherwise
    identical, and appends the estimate (in seconds) to a
    ``time_until_disk_full`` series for this node.
    """

    # Estimate written when usage is not growing, i.e. the disk is not
    # expected to fill up (1E9 s is roughly 31.7 years).
    _NO_GROWTH_SECONDS = 1E9
    # Maximum tolerated difference (seconds) between the end times of the
    # paired bytes_used and bytes_usable series.
    _MAX_END_TIME_SKEW = 3600

    def match(self, lts):
        """Return True if *lts* has measure=bytes_used and any mountpoint."""
        description = lts.description
        return (
            "measure" in description
            and description["measure"] == "bytes_used"
            and "mountpoint" in description
        )

    def run(self, lts):
        """Compute and store the time-until-disk-full estimate for *lts*.

        Does nothing (besides logging a warning) if the paired
        ``bytes_usable`` series has no data or its last timestamp differs
        from the last ``bytes_used`` timestamp by more than an hour.
        """
        # Find the matching bytes_usable series.
        desc = {**lts.description, "measure": "bytes_usable"}
        usable_lts = LTS(description=desc)
        if not lts.data or not usable_lts.data:
            log.warning("Timeseries %s or %s has no data", lts.id, usable_lts.id)
            return
        # The two timeseries are always updated together, so the
        # timestamps should match exactly. But just in case we decide to
        # change that in the future accept a difference of up to an hour.
        now = lts.data[-1][0]
        usable_end = usable_lts.data[-1][0]
        if abs(now - usable_end) > self._MAX_END_TIME_SKEW:
            log.warning("Timeseries %s and %s have different end times: %s vs %s",
                        lts.id, usable_lts.id,
                        now, usable_end)
            return
        current_used_bytes = lts.data[-1][1]
        current_usable_bytes = usable_lts.data[-1][1]

        found = self._find_fill_window(lts.data, current_usable_bytes)
        if found is not None:
            i, m = found
            log.info("d = %s, current_used_bytes = %s, current_usable_bytes = %s",
                     m, current_used_bytes, current_usable_bytes)
            tuf = now - lts.data[i][0]
        else:
            tuf = self._extrapolate(lts.data, current_usable_bytes)

        out_desc = {**lts.description,
                    "measure": "time_until_disk_full",
                    "node": node,
                    "unit": "s",
                    "remote_addr": "",
                    }
        out_lts = LTS(out_desc)
        out_lts.add(now, tuf)
        out_lts.save()

    @staticmethod
    def _find_fill_window(data, current_usable_bytes):
        """Find the most recent sample after which one more equal growth fills the disk.

        For each sample i (newest first) the value is smoothed as the mean
        m of ``data[i-2 : i+3]``. Returns ``(i, m)`` for the first i where
        ``current_used ** 2 / m > current_usable_bytes``: growing again by
        the factor current_used / m would exceed the usable size, so the
        disk is expected to fill in about ``now - data[i][0]``. Returns None
        if no sample qualifies.
        """
        current_used_bytes = data[-1][1]
        for i in reversed(range(len(data))):
            # Slicing clips to the series bounds on its own.
            m = statistics.mean(x[1] for x in data[max(0, i - 2):i + 3])
            # Skip tiny usage (for sanity) and non-positive means, which
            # would otherwise divide by zero below.
            if m <= 0 or m < current_usable_bytes * 0.1:
                continue
            if current_used_bytes ** 2 / m > current_usable_bytes:
                return i, m
        return None

    @classmethod
    def _extrapolate(cls, data, current_usable_bytes):
        """Extrapolate geometric growth over the history to estimate time until full.

        Assumes usage keeps growing by the same factor per unit time as it
        did from the baseline sample to the last sample. The result is
        never less than the time span covered by *data*. Returns
        ``_NO_GROWTH_SECONDS`` when usage has not grown (flat or
        shrinking) or there is no positive baseline, and the span when
        usage already meets or exceeds ``current_usable_bytes``.
        """
        now, current_used_bytes = data[-1][0], data[-1][1]
        span = now - data[0][0]
        # XXX - the first value is probably part of a noisy range, so some
        # kind of average might be better. Leading zero values would make
        # the growth factor undefined, so use the first positive sample.
        baseline = next((x for x in data if x[1] > 0), None)
        if baseline is None or current_used_bytes <= baseline[1]:
            # No growth: log(historic_growth) would be <= 0.
            return cls._NO_GROWTH_SECONDS
        if current_usable_bytes <= current_used_bytes:
            # Already at or over capacity: log(future_growth) would be
            # <= 0 (or undefined), so only the lower bound is left.
            return span
        historic_growth = current_used_bytes / baseline[1]
        future_growth = current_usable_bytes / current_used_bytes
        tuf = math.log(future_growth) / math.log(historic_growth) * (now - baseline[0])
        return max(tuf, span)
# Registered processors; process() offers every dequeued series to each one.
processors = [DiskFullPredictor()]
def process(lts):
    """Run every registered processor whose match() accepts *lts*.

    Processors are tried in list order; each one's match() is evaluated
    just before its run(), so an earlier run() completes before the next
    processor is matched.
    """
    interested = (candidate for candidate in processors if candidate.match(lts))
    for handler in interested:
        handler.run(lts)
# Poll the "queue" directory once per second. Each entry's file name is the
# id of a series to load and hand to process().
while True:
    for lts_id in os.listdir("queue"):
        # Remove the entry before loading, so an update that arrives while
        # this series is being processed re-queues it instead of being lost
        # (assumes writers recreate the entry after saving — confirm).
        # A failure below therefore drops the entry rather than retrying it.
        os.remove(os.path.join("queue", lts_id))
        try:
            lts = LTS(id=lts_id)
            process(lts)
        except Exception:
            # One bad series must not stop the daemon; log it and move on.
            log.exception("Processing of %s failed", lts_id)
    time.sleep(1)