107 lines
3.6 KiB
Python
107 lines
3.6 KiB
Python
#!/usr/bin/python3
|
|
|
|
import logging
|
|
import logging.config
|
|
import math
|
|
import os
|
|
import socket
|
|
import statistics
|
|
import time
|
|
|
|
from ltsdb_json import LTS
|
|
|
|
import config
|
|
|
|
# Configure logging from the project's config module before anything logs.
logging.config.dictConfig(config=config.logging)
log = logging.getLogger(name="process_queue")

# Host name as returned by a reverse lookup (gethostbyaddr) of the local
# hostname; written into the "node" field of the series this script creates.
node = socket.gethostbyaddr(socket.gethostname())[0]
|
|
|
|
class DiskFullPredictor:
    """Predict when a filesystem will run out of space.

    Handles ``bytes_used`` series that carry a ``mountpoint`` and derives a
    ``time_until_disk_full`` series (unit ``s``) from them and the matching
    ``bytes_usable`` series.
    """

    # Upper bound for the prediction in seconds (~31.7 years); also the value
    # reported when no growth can be detected.
    MAX_TUF = 1E9

    def match(self, lts):
        """Return True if *lts* is a ``bytes_used`` series with a mountpoint."""
        # measure=bytes_used, mountpoint=*
        desc = lts.description
        return ("measure" in desc
                and desc["measure"] == "bytes_used"
                and "mountpoint" in desc)

    def run(self, lts):
        """Estimate how long until the filesystem behind *lts* is full.

        The companion ``bytes_usable`` series is the one with the same
        description but ``measure=bytes_usable``. Nothing is written if
        either series is empty or if their last timestamps differ by more
        than 3600. Otherwise the estimate is stored in a
        ``time_until_disk_full`` series, tagged with this host's ``node``.

        Both estimates assume usage grows by a constant factor per unit of
        time (exponential growth):

        1. Short term: see ``_find_short_term_crossing``. The estimate is
           the time elapsed since the matching point.
        2. Long term: otherwise, extrapolate the growth since the lowest
           earlier usage up to the capacity (``_long_term_tuf``).

        The result is capped at ``MAX_TUF``.
        """
        # find matching bytes_usable series
        desc = {**lts.description, "measure": "bytes_usable"}
        usable_lts = LTS(description=desc)
        if not lts.data or not usable_lts.data:
            # Indexing [-1] below would raise IndexError and kill the caller.
            log.warning("Timeseries %s or %s has no data", lts.id, usable_lts.id)
            return

        # The two timeseries are always updated together, so the
        # timestamps should match exactly. But just in case we decide to
        # change that in the future accept a difference of up to an
        # hour.
        now = lts.data[-1][0]
        usable_end = usable_lts.data[-1][0]
        if abs(now - usable_end) > 3600:
            log.warning("Timeseries %s and %s have different end times: %s vs %s",
                        lts.id, usable_lts.id,
                        now, usable_end)
            return

        current_used_bytes = lts.data[-1][1]
        current_usable_bytes = usable_lts.data[-1][1]

        crossing = self._find_short_term_crossing(
            lts.data, current_used_bytes, current_usable_bytes)
        if crossing is not None:
            i, m = crossing
            log.info("d = %s, current_used_bytes = %s, current_usable_bytes = %s", m, current_used_bytes, current_usable_bytes)
            tuf = now - lts.data[i][0]
        else:
            tuf = self._long_term_tuf(
                lts.data, now, current_used_bytes, current_usable_bytes)
            if tuf is None:
                tuf = self.MAX_TUF
        tuf = min(tuf, self.MAX_TUF)

        out_desc = {**lts.description,
                    "measure": "time_until_disk_full",
                    "node": node,
                    "unit": "s",
                    "remote_addr": "",
                    }
        out_lts = LTS(out_desc)
        out_lts.add(now, tuf)
        out_lts.save()

    @staticmethod
    def _low_bytes(point):
        """Return the lowest usage value recorded in datapoint *point*.

        Points with at least four fields yield ``point[2]``. Presumably these
        are consolidated ``(t, avg, min, max)`` records (TODO confirm against
        ltsdb_json). All other points yield ``point[1]``.
        """
        return point[2] if len(point) >= 4 else point[1]

    @staticmethod
    def _find_short_term_crossing(data, used, usable):
        """Find the newest point from which one more equal growth step overflows.

        For each index *i* (newest first), *m* is the mean of the ``[1]``
        values of up to five points centred on *i*. ``used / m`` is the
        growth factor since then. If applying it once more to *used* exceeds
        *usable*, the disk is expected to fill within the time elapsed since
        point *i*.

        Returns ``(i, m)`` for the first such point, or None.
        """
        for i in reversed(range(len(data))):
            m = statistics.mean(x[1] for x in data[max(0, i - 2) : i + 3])
            # For sanity, skip windows far below the capacity. Also skip
            # empty/zero windows, which would divide by zero below.
            if m <= 0 or m < usable * 0.1:
                continue
            if used ** 2 / m > usable:
                return i, m
        return None

    @classmethod
    def _long_term_tuf(cls, data, now, used, usable):
        """Extrapolate exponential growth from a baseline up to *usable*.

        The baseline is the first datapoint if it is usable. Otherwise it is
        the minimum over the whole series. A usable baseline is positive,
        below *used*, and strictly older than *now*.

        Returns the estimate in seconds, never less than the baseline's age.
        Returns None when no usable baseline exists (no growth detected).
        """
        # Try always use the minimum of a range.
        # We prefer the first datapoint
        first_i = 0
        first_used = cls._low_bytes(data[0])
        # But if that's not usable we search the whole timeseries for the
        # minimum. Zero values would divide by zero. A point at `now` has no
        # elapsed time and would yield a prediction of 0 s.
        if not (0 < first_used < used and data[0][0] < now):
            first_i = None
            first_used = used
            for i, point in enumerate(data):
                low = cls._low_bytes(point)
                if 0 < low < first_used and point[0] < now:
                    first_i, first_used = i, low
        if first_i is None:
            return None

        elapsed = now - data[first_i][0]
        if usable <= used:
            # At or over capacity: log(usable / used) <= 0, so the max() below
            # would yield `elapsed` anyway (and usable <= 0 would raise).
            return elapsed
        historic_growth = used / first_used
        future_growth = usable / used
        tuf = math.log(future_growth) / math.log(historic_growth) * elapsed
        # Never predict less than the span we just extrapolated from.
        return max(tuf, elapsed)
|
|
|
|
# Registered processors, consulted in this order by process(). Each one
# provides match(lts) -> bool and run(lts).
processors = [DiskFullPredictor()]
|
|
|
|
def process(lts):
    """Offer *lts* to every registered processor and run each one that matches.

    Processors are consulted in registration order. Each matching processor
    runs before the next one's match() is called.
    """
    for handler in (p for p in processors if p.match(lts)):
        handler.run(lts)
|
|
|
|
# Poll the "queue" directory once a second. Each entry name is used as a
# timeseries id: the series is loaded, its queue entry removed, and the
# series handed to the processors.
while True:
    for lts_id in os.listdir("queue"):
        try:
            lts = LTS(id=lts_id)
            os.remove(os.path.join("queue", lts_id))
            process(lts)
        except Exception:
            # Top-level boundary: one bad series must not stop the daemon
            # from processing all others. If loading failed, the queue
            # entry is still present and is retried on the next pass.
            log.exception("Failed to process queued timeseries %s", lts_id)
    time.sleep(1)
|
|
|
|
|