ltsdb/ltsdb_json.py

172 lines
6.4 KiB
Python
Raw Permalink Normal View History

2022-08-21 11:58:31 +02:00
#!/usr/bin/python3
import fcntl
import glob
import hashlib
import json
import logging
import math
2022-09-02 14:06:47 +02:00
import random
2022-08-21 11:58:31 +02:00
import time
log = logging.getLogger()
2022-08-21 11:58:31 +02:00
class LTS:
base_dir = "data"
2022-12-11 22:58:04 +01:00
queue_dir = "queue"
2022-08-21 11:58:31 +02:00
limit = 1000
2022-09-04 17:58:17 +02:00
def __init__(self, description=None, id=None):
if description:
canonical_description = {x: description[x] for x in sorted(description.keys())}
self.description = canonical_description
serialized_description = json.dumps(canonical_description)
m = hashlib.sha256()
m.update(bytes(serialized_description, encoding="UTF-8"))
id = m.hexdigest()
self.filename = self.base_dir + "/" + id
2022-12-11 22:58:04 +01:00
self.id = id
2022-08-21 11:58:31 +02:00
try:
with open(self.filename, "r") as fh:
fcntl.flock(fh, fcntl.LOCK_SH)
d = json.load(fh)
self.new = False
2022-09-04 17:58:17 +02:00
self.description = d["description"]
2022-08-21 11:58:31 +02:00
self.data = d["data"]
except FileNotFoundError as e:
self.new = True
self.data = []
# Create the file immediately. Makes saving later simpler if we can
# assume it exists
with open(self.filename, "x+") as fh:
fcntl.flock(fh, fcntl.LOCK_EX)
json.dump({"description": self.description, "data": self.data}, fh)
2024-08-24 22:51:29 +02:00
log.info(f"Created {self.filename}")
2022-08-21 11:58:31 +02:00
self.rebuild_index()
2024-05-24 22:32:25 +02:00
except json.decoder.JSONDecodeError as e:
log.exception(f"Cannot decode JSON in {self.filename}: {e}")
raise
2022-08-21 11:58:31 +02:00
def pop(self, i):
# Pop the element at index i and adjust the min/max values of the
# neighbours.
# We might also want to adjust the value of the neighbours to some
# (weighted) average, but I'm not sure if this is actually a good idea.
data = self.data
old = data.pop(i) # after that the neighbours are at i-1, i
min_v = old[2] if len(old) >= 4 else old[1]
max_v = old[3] if len(old) >= 4 else old[1]
if i > 0:
if len(data[i-1]) == 2:
data[i-1] = [data[i-1][0], data[i-1][1], data[i-1][1], data[i-1][1]]
if min_v < data[i-1][2]:
data[i-1][2] = min_v
if max_v > data[i-1][3]:
data[i-1][3] = max_v
if i < len(data):
if len(data[i]) == 2:
data[i] = [data[i][0], data[i][1], data[i][1], data[i][1]]
if min_v < data[i][2]:
data[i][2] = min_v
if max_v > data[i][3]:
data[i][3] = max_v
return old
def shrink(self):
# Remove one element in such a way that the distributions gets closer
# to an exponential curve through the first and the last few data
# points.
# To do this we compute the ideal t value at each point and compare it
# to the real value. We remove the first point which sticks out too
# much (I'm tempted to dub this the barber's algorithm).
# This extremely inefficient but it's simple to understand and works.
data = self.data
n = len(data)
t_last = data[-1][0]
dt = max((t_last - data[-5][0]) / 4, 1)
log.debug("dt = %s, n = %s", dt, n)
k = math.log((t_last - data[0][0]) / dt / n + 1)
for i in range(1, n):
t_ideal = (math.exp(k * (n - i)/n) - 1) * (n * dt)
if t_last - data[i][0] > t_ideal:
log.debug("%s - %s > %s -> popping element %s", t_last, data[i][0], t_ideal, i)
self.pop(i)
break
else:
# Well, it works mostly. Sometimes all the real points are below
# the curve but we have to remove one anyway. This needs to be
# heavily biased towards newer data points, but we don't want to
# delete the few newest data points so choose one at random from a
# narrow range just before that.
i = random.randrange(int(n*0.98), int(n*0.99))
log.debug("no match -> popping element %s", i)
self.pop(i)
2022-08-21 11:58:31 +02:00
def add(self, ts, value):
while len(self.data) >= self.limit:
self.shrink()
2022-08-21 11:58:31 +02:00
if len(self.data) == 0 or ts >= self.data[-1][0]:
self.data.append((ts, value,))
else:
# Shouldn't happen that often, so I do a simple linear search instead
# of a binary search
for i in range(len(self.data)):
if self.data[i][0] >= ts:
break
self.data.insert(i, (ts, value,))
def save(self):
with open(self.filename, "r+") as fh:
fcntl.flock(fh, fcntl.LOCK_EX)
json.dump({"description": self.description, "data": self.data}, fh)
fh.truncate()
2022-12-11 22:58:04 +01:00
with open(self.queue_dir + "/" + self.id, "w") as fh:
pass
2022-08-21 11:58:31 +02:00
def rebuild_index(self):
t0 = time.time()
index = {}
for fn in glob.glob(self.base_dir + "/*"):
(_, _, hash) = fn.rpartition("/")
with open(fn, "r") as fh:
fcntl.flock(fh, fcntl.LOCK_SH)
2024-08-24 22:51:29 +02:00
try:
d = json.load(fh)
except json.decoder.JSONDecodeError as e:
log.exception(f"Cannot decode JSON in {fn}: {e}")
raise
2022-08-21 11:58:31 +02:00
for k, v in d["description"].items():
d1 = index.setdefault(k, {})
d2 = d1.setdefault(v, [])
d2.append(hash)
with open(self.base_dir + "/.index", "r+") as fh:
fcntl.flock(fh, fcntl.LOCK_EX)
json.dump(index, fh)
t1 = time.time()
print("index rebuilt in", t1 - t0, "seconds")
@classmethod
2022-09-02 14:06:47 +02:00
def find(cls, match):
result = None
with open(cls.base_dir + "/.index", "r") as fh:
2022-08-21 11:58:31 +02:00
fcntl.flock(fh, fcntl.LOCK_SH)
2022-09-02 14:06:47 +02:00
index = json.load(fh)
for d, v in match.items():
ts = set(index[d][v])
if result is None:
result = ts
else:
result &= ts
return result
2022-08-21 11:58:31 +02:00
2022-11-20 18:43:45 +01:00
def data_json_by_row(self):
d = []
for dp in self.data:
d.append({
"t": dp[0],
"v": dp[1],
"utc": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(dp[0])),
})
return json.dumps(d)