172 lines
6.4 KiB
Python
172 lines
6.4 KiB
Python
#!/usr/bin/python3
|
|
|
|
import fcntl
|
|
import glob
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import math
|
|
import random
|
|
import time
|
|
|
|
# Logger shared by everything in this module. getLogger() is called without
# a name, so this is the root logger.
log = logging.getLogger()
|
|
|
|
class LTS:
    """A time series persisted as one JSON file per series.

    A series is identified by the SHA-256 hex digest of its canonical
    (key-sorted) JSON description and is stored in ``base_dir/<id>`` as
    ``{"description": ..., "data": [...]}``.  Data points are
    ``(timestamp, value)`` pairs; points which absorbed a removed neighbour
    (see pop()) become ``[timestamp, value, min, max]`` lists.
    """

    # Directory holding one file per series plus the ".index" file.
    base_dir = "data"
    # Directory in which save() creates an empty marker file per saved series.
    queue_dir = "queue"
    # add() shrinks the series until it has fewer points than this before
    # adding a new one.
    limit = 1000

    def __init__(self, description=None, id=None):
        """Load the series from disk or create it if it does not exist yet.

        Args:
            description: dict describing the series.  If given (and not
                empty) the id is derived from it and the ``id`` argument is
                ignored.
            id: hex id of an existing series; used when no description is
                given.

        Raises:
            TypeError: neither a description nor an id was given.
            FileNotFoundError: only an id was given and no such series
                exists (nothing is created in that case).
            json.decoder.JSONDecodeError: the series file is not valid JSON.
        """
        if description:
            # Sort the keys so that equal descriptions always serialize (and
            # therefore hash) identically.
            canonical_description = {key: description[key] for key in sorted(description)}
            self.description = canonical_description
            serialized_description = json.dumps(canonical_description)
            digest = hashlib.sha256()
            digest.update(bytes(serialized_description, encoding="UTF-8"))
            id = digest.hexdigest()
        if id is None:
            raise TypeError("LTS() needs either a description or an id")
        self.filename = self.base_dir + "/" + id
        self.id = id
        try:
            with open(self.filename, "r") as fh:
                fcntl.flock(fh, fcntl.LOCK_SH)
                stored = json.load(fh)
        except FileNotFoundError:
            if not description:
                # Without a description there is nothing we could write.
                # Creating the file anyway would leave an empty (invalid)
                # series file behind, so report the missing series instead.
                raise
            self.new = True
            self.data = []
            # Create the file immediately. Makes saving later simpler if we can
            # assume it exists
            with open(self.filename, "x+") as fh:
                fcntl.flock(fh, fcntl.LOCK_EX)
                json.dump({"description": self.description, "data": self.data}, fh)
            log.info(f"Created {self.filename}")
            # Must run after the file above is closed: rebuild_index() reads
            # (and locks) every series file including the new one.
            self.rebuild_index()
        except json.decoder.JSONDecodeError as e:
            log.exception(f"Cannot decode JSON in {self.filename}: {e}")
            raise
        else:
            self.new = False
            self.description = stored["description"]
            self.data = stored["data"]

    def _widen(self, j, min_v, max_v):
        """Extend the min/max of the point at index j to cover min_v..max_v."""
        point = self.data[j]
        if len(point) == 2:
            # Plain (t, v) point: min and max both start out as the value.
            point = [point[0], point[1], point[1], point[1]]
            self.data[j] = point
        if min_v < point[2]:
            point[2] = min_v
        if max_v > point[3]:
            point[3] = max_v

    def pop(self, i):
        """Remove and return the point at index i.

        The min/max values of the neighbours are adjusted so that they still
        cover the range of the removed point.
        """
        # We might also want to adjust the value of the neighbours to some
        # (weighted) average, but I'm not sure if this is actually a good idea.
        data = self.data
        old = data.pop(i)  # after that the neighbours are at i-1, i
        if len(old) >= 4:
            min_v, max_v = old[2], old[3]
        else:
            min_v = max_v = old[1]
        if i > 0:
            self._widen(i - 1, min_v, max_v)
        if i < len(data):
            self._widen(i, min_v, max_v)
        return old

    def shrink(self):
        """Remove exactly one point from the series.

        The series must contain at least one point.
        """
        # Remove one element in such a way that the distributions gets closer
        # to an exponential curve through the first and the last few data
        # points.
        # To do this we compute the ideal t value at each point and compare it
        # to the real value. We remove the first point which sticks out too
        # much (I'm tempted to dub this the barber's algorithm).
        # This is extremely inefficient but it's simple to understand and works.
        data = self.data
        n = len(data)
        t_last = data[-1][0]
        # Average spacing of the newest points (up to 5 of them), at least 1.
        # Series shorter than 5 points use whatever is there.
        tail = min(5, n)
        if tail > 1:
            dt = max((t_last - data[-tail][0]) / (tail - 1), 1)
        else:
            dt = 1
        log.debug("dt = %s, n = %s", dt, n)
        k = math.log((t_last - data[0][0]) / dt / n + 1)
        for i in range(1, n):
            t_ideal = (math.exp(k * (n - i)/n) - 1) * (n * dt)
            if t_last - data[i][0] > t_ideal:
                log.debug("%s - %s > %s -> popping element %s", t_last, data[i][0], t_ideal, i)
                self.pop(i)
                break
        else:
            # Well, it works mostly. Sometimes all the real points are below
            # the curve but we have to remove one anyway. This needs to be
            # heavily biased towards newer data points, but we don't want to
            # delete the few newest data points so choose one at random from a
            # narrow range just before that.
            low = int(n*0.98)
            # For short series both bounds truncate to the same index, which
            # would be an empty range; always allow at least one candidate.
            high = max(int(n*0.99), low + 1)
            i = random.randrange(low, high)
            log.debug("no match -> popping element %s", i)
            self.pop(i)

    def add(self, ts, value):
        """Add the point (ts, value), keeping the data sorted by timestamp.

        If the series has reached ``limit`` points, points are removed with
        shrink() first.
        """
        while len(self.data) >= self.limit:
            self.shrink()

        point = (ts, value,)
        if not self.data or ts >= self.data[-1][0]:
            self.data.append(point)
        else:
            # Shouldn't happen that often, so I do a simple linear search instead
            # of a binary search.
            # ts is older than the last point here, so a match always exists.
            position = next(i for i, existing in enumerate(self.data) if existing[0] >= ts)
            self.data.insert(position, point)

    def save(self):
        """Write the series to its file and create its marker in queue_dir."""
        with open(self.filename, "r+") as fh:
            fcntl.flock(fh, fcntl.LOCK_EX)
            json.dump({"description": self.description, "data": self.data}, fh)
            # The file is overwritten in place; cut off what is left of a
            # longer previous version.
            fh.truncate()
        # Only the existence of the (empty) marker file matters.
        with open(self.queue_dir + "/" + self.id, "w") as fh:
            pass

    def rebuild_index(self):
        """Regenerate ``base_dir/.index`` from all series files.

        The index maps description key -> description value -> list of series
        ids.  The index file is created if it does not exist.

        Raises:
            json.decoder.JSONDecodeError: a series file is not valid JSON.
        """
        t0 = time.time()
        index = {}
        # "*" does not match names starting with a dot, so the index file
        # itself is not picked up here.
        for fn in glob.glob(self.base_dir + "/*"):
            (_, _, series_id) = fn.rpartition("/")
            with open(fn, "r") as fh:
                fcntl.flock(fh, fcntl.LOCK_SH)
                try:
                    d = json.load(fh)
                except json.decoder.JSONDecodeError as e:
                    log.exception(f"Cannot decode JSON in {fn}: {e}")
                    raise
            for k, v in d["description"].items():
                index.setdefault(k, {}).setdefault(v, []).append(series_id)
        # "a+" creates the file if necessary but (unlike "w") doesn't destroy
        # the old content before we hold the lock.
        with open(self.base_dir + "/.index", "a+") as fh:
            fcntl.flock(fh, fcntl.LOCK_EX)
            # Drop the old content completely; otherwise the tail of a longer
            # old index would remain behind the new one and corrupt the JSON.
            fh.seek(0)
            fh.truncate()
            json.dump(index, fh)
        t1 = time.time()
        print("index rebuilt in", t1 - t0, "seconds")

    @classmethod
    def find(cls, match):
        """Return the ids of all series whose description matches.

        Args:
            match: dict of description key -> required value.

        Returns:
            The set of ids of the series matching every key/value pair, or
            None if match is empty.

        Raises:
            KeyError: a key or value of match does not occur in the index.
        """
        result = None
        with open(cls.base_dir + "/.index", "r") as fh:
            fcntl.flock(fh, fcntl.LOCK_SH)
            index = json.load(fh)
        for d, v in match.items():
            ts = set(index[d][v])
            if result is None:
                result = ts
            else:
                result &= ts
        return result

    def data_json_by_row(self):
        """Return the data as a JSON array with one object per data point.

        Each object contains the timestamp ("t"), the value ("v") and the
        timestamp formatted as UTC date and time ("utc").
        """
        rows = [
            {
                "t": dp[0],
                "v": dp[1],
                "utc": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(dp[0])),
            }
            for dp in self.data
        ]
        return json.dumps(rows)
|