#!/usr/bin/python3
"""Queue worker that derives secondary time series from updated ones.

The worker polls the ``queue`` directory (relative to the current working
directory).  Every directory entry name is used as the id of a time series
(``LTS``); the entry is removed from the queue and the series is handed to
every registered processor whose ``match`` method accepts it.
"""

import logging
import os
import socket
import time

from ltsdb_json import LTS

log = logging.getLogger(__name__)

# Primary host name of this machine as returned by the resolver; stored in
# the description of every derived series.
node = socket.gethostbyaddr(socket.gethostname())[0]

# Directory that is polled for ids of updated time series.
QUEUE_DIR = "queue"

# Maximum tolerated difference (seconds) between the newest timestamps of
# the bytes_used and bytes_usable series.
MAX_END_TIME_SKEW = 3600

# Samples smaller than this fraction of the usable size are ignored when
# extrapolating ("for sanity": tiny values produce huge growth factors).
MIN_USED_FRACTION = 0.1


class DiskFullPredictor:
    """Estimate the time until a file system is full.

    Handles series whose description has ``measure == "bytes_used"`` and a
    ``mountpoint`` key, and stores the estimate in a derived series with
    ``measure == "time_until_disk_full"``.
    """

    def match(self, lts):
        """Return True if *lts* is a ``bytes_used`` series with a mountpoint."""
        description = lts.description
        return (
            description.get("measure") == "bytes_used"
            and "mountpoint" in description
        )

    def run(self, lts):
        """Compute the time-until-full estimate for *lts* and save it.

        The estimate is the shortest look-back interval ``now - t`` for
        which repeating the growth factor observed since ``t`` once more
        (``used_now * used_now / used_t``) would exceed the usable size.
        If no sample satisfies this, the estimate is ``inf``.

        Nothing is saved (a warning is logged instead) when either series
        has no data or when their newest timestamps are more than
        ``MAX_END_TIME_SKEW`` seconds apart.
        """
        # find matching bytes_usable series
        usable_desc = {**lts.description, "measure": "bytes_usable"}
        usable_lts = LTS(description=usable_desc)

        # Without at least one sample in each series there is nothing to
        # extrapolate from (and data[-1] would raise IndexError).
        # NOTE(review): assumes LTS.data is a plain sequence -- confirm.
        if not lts.data or not usable_lts.data:
            log.warning("Timeseries %s or %s has no data",
                        lts.id, usable_lts.id)
            return

        # The two timeseries are always updated together, so the
        # timestamps should match exactly. But just in case we decide to
        # change that in the future accept a difference of up to an
        # hour.
        now = lts.data[-1][0]
        usable_end = usable_lts.data[-1][0]
        if abs(now - usable_end) > MAX_END_TIME_SKEW:
            log.warning("Timeseries %s and %s have different end times: %s vs %s",
                        lts.id, usable_lts.id,
                        now, usable_end)
            return

        current_used_bytes = lts.data[-1][1]
        current_usable_bytes = usable_lts.data[-1][1]
        min_used_bytes = current_usable_bytes * MIN_USED_FRACTION

        tuf = float("inf")
        # Walk from the newest sample backwards so that the first hit is
        # the shortest interval, i.e. the earliest predicted "disk full".
        for sample in reversed(lts.data):
            timestamp, used = sample[0], sample[1]
            # Skip implausibly small samples; non-positive values would
            # additionally divide by zero or flip the sign below.
            if used <= 0 or used < min_used_bytes:
                continue
            if current_used_bytes ** 2 / used > current_usable_bytes:
                tuf = now - timestamp
                break

        tuf_desc = {
            **lts.description,
            "measure": "time_until_disk_full",
            "node": node,
            "unit": "s",
            "remote_addr": "",
        }
        # NOTE(review): keyword form chosen for consistency with the other
        # LTS(description=...) call; the original passed it positionally.
        tuf_lts = LTS(description=tuf_desc)
        tuf_lts.add(now, tuf)
        tuf_lts.save()


processors = [
    DiskFullPredictor(),
]


def process(lts):
    """Run every registered processor that matches *lts*."""
    for processor in processors:
        if processor.match(lts):
            processor.run(lts)


def main():
    """Poll the queue directory forever, processing each queued series."""
    while True:
        for entry in os.listdir(QUEUE_DIR):
            lts = LTS(id=entry)
            # The entry is removed before processing, so a failure in
            # process() does not leave it in the queue for a retry.
            os.remove(os.path.join(QUEUE_DIR, entry))
            process(lts)
        time.sleep(1)


if __name__ == "__main__":
    main()