Process queue
This commit is contained in:
parent
caeaa0bf73
commit
a802f2ee27
|
@ -0,0 +1,71 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
|
||||
from ltsdb_json import LTS
|
||||
|
||||
node = socket.gethostbyaddr(socket.gethostname())[0]
|
||||
|
||||
class DiskFullPredictor:
|
||||
def match(self, lts):
|
||||
# measure=bytes_used, mountpoint=*
|
||||
if "measure" not in lts.description:
|
||||
return False
|
||||
if lts.description["measure"] != "bytes_used":
|
||||
return False
|
||||
if "mountpoint" not in lts.description:
|
||||
return False
|
||||
return True
|
||||
|
||||
def run(self, lts):
|
||||
# find matching bytes_usable series
|
||||
desc = {**lts.description, "measure": "bytes_usable"}
|
||||
usable_lts = LTS(description=desc)
|
||||
# The two timeseries are always updated together, so the
|
||||
# timestamps should match exactly. But just in case we decide to
|
||||
# change that in the future accept a difference of up to an
|
||||
# hour.
|
||||
now = lts.data[-1][0]
|
||||
if abs(now - usable_lts.data[-1][0]) > 3600:
|
||||
log.warning("Timeseries %s and %s have different end times: %s vs %s",
|
||||
lts.id, usable_lts.id,
|
||||
now, usable_lts.data[-1][0])
|
||||
return
|
||||
current_used_bytes = lts.data[-1][1]
|
||||
current_usable_bytes = usable_lts.data[-1][1]
|
||||
tuf = float('inf')
|
||||
for d in reversed(lts.data):
|
||||
if d[1] < current_usable_bytes * 0.1:
|
||||
continue # for sanity
|
||||
if current_used_bytes ** 2 / d[1] > current_usable_bytes:
|
||||
tuf = now - d[0]
|
||||
break
|
||||
desc = {**lts.description,
|
||||
"measure": "time_until_disk_full",
|
||||
"node": node,
|
||||
"unit": "s",
|
||||
"remote_addr": "",
|
||||
}
|
||||
lts = LTS(desc)
|
||||
lts.add(now, tuf)
|
||||
lts.save()
|
||||
|
||||
processors = [
|
||||
DiskFullPredictor(),
|
||||
]
|
||||
|
||||
def process(lts):
|
||||
for processor in processors:
|
||||
if processor.match(lts):
|
||||
processor.run(lts)
|
||||
|
||||
while True:
|
||||
for id in os.listdir("queue"):
|
||||
lts = LTS(id=id)
|
||||
os.remove("queue/" + id)
|
||||
process(lts)
|
||||
time.sleep(1)
|
||||
|
||||
|
Loading…
Reference in New Issue