#!/usr/bin/python3

import fcntl
import glob
import hashlib
import json
import logging
import math
import random
import time

log = logging.getLogger()


class LTS:
    """Simple file-based time series storage.

    Each series is identified by the SHA-256 hash of its canonical
    (key-sorted) JSON description and stored as a single JSON file below
    base_dir.  An inverted index over the description's key/value pairs
    is kept in base_dir/.index.
    """

    base_dir = "data"    # one JSON file per series
    queue_dir = "queue"  # save() touches <queue_dir>/<id> to flag changed series
    limit = 1000         # maximum number of data points per series

    def __init__(self, description=None, id=None):
        # Either a description (for new or existing series) or the id of an
        # existing series must be given.
        if description:
            canonical_description = {x: description[x] for x in sorted(description.keys())}
            self.description = canonical_description
            serialized_description = json.dumps(canonical_description)
            m = hashlib.sha256()
            m.update(bytes(serialized_description, encoding="UTF-8"))
            id = m.hexdigest()
        self.filename = self.base_dir + "/" + id
        self.id = id
        try:
            with open(self.filename, "r") as fh:
                fcntl.flock(fh, fcntl.LOCK_SH)
                d = json.load(fh)
                self.new = False
                self.description = d["description"]
                self.data = d["data"]
        except FileNotFoundError:
            self.new = True
            self.data = []
            # Create the file immediately. Makes saving later simpler if we
            # can assume it exists.
            with open(self.filename, "x+") as fh:
                fcntl.flock(fh, fcntl.LOCK_EX)
                json.dump({"description": self.description, "data": self.data}, fh)
            log.info(f"Created {self.filename}")
            self.rebuild_index()
        except json.decoder.JSONDecodeError as e:
            log.exception(f"Cannot decode JSON in {self.filename}: {e}")
            raise

    def pop(self, i):
        # Pop the element at index i and adjust the min/max values of the
        # neighbours. Data points are either [t, v] pairs or [t, v, min, max]
        # lists.
        # We might also want to adjust the value of the neighbours to some
        # (weighted) average, but I'm not sure if this is actually a good idea.
        data = self.data
        old = data.pop(i)
        # After that the neighbours are at i-1 and i.
        min_v = old[2] if len(old) >= 4 else old[1]
        max_v = old[3] if len(old) >= 4 else old[1]
        if i > 0:
            if len(data[i-1]) == 2:
                data[i-1] = [data[i-1][0], data[i-1][1], data[i-1][1], data[i-1][1]]
            if min_v < data[i-1][2]:
                data[i-1][2] = min_v
            if max_v > data[i-1][3]:
                data[i-1][3] = max_v
        if i < len(data):
            if len(data[i]) == 2:
                data[i] = [data[i][0], data[i][1], data[i][1], data[i][1]]
            if min_v < data[i][2]:
                data[i][2] = min_v
            if max_v > data[i][3]:
                data[i][3] = max_v
        return old

    def shrink(self):
        # Remove one element in such a way that the distribution gets closer
        # to an exponential curve through the first and the last few data
        # points.
        # To do this we compute the ideal t value at each point and compare it
        # to the real value. We remove the first point which sticks out too
        # much (I'm tempted to dub this the barber's algorithm).
        # This is extremely inefficient but it's simple to understand and works.
        data = self.data
        n = len(data)
        t_last = data[-1][0]
        # dt is the average spacing of the newest few points, at least 1 second.
        dt = max((t_last - data[-5][0]) / 4, 1)
        log.debug("dt = %s, n = %s", dt, n)
        # k is chosen so that the ideal curve spans the whole series:
        # t_ideal is t_last - data[0][0] at i == 0 and 0 at i == n.
        k = math.log((t_last - data[0][0]) / dt / n + 1)
        for i in range(1, n):
            t_ideal = (math.exp(k * (n - i)/n) - 1) * (n * dt)
            if t_last - data[i][0] > t_ideal:
                log.debug("%s - %s > %s -> popping element %s",
                          t_last, data[i][0], t_ideal, i)
                self.pop(i)
                break
        else:
            # Well, it works mostly. Sometimes all the real points are below
            # the curve but we have to remove one anyway. This needs to be
            # heavily biased towards newer data points, but we don't want to
            # delete the few newest data points, so choose one at random from
            # a narrow range just before that.
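            # For example, with the default limit of 1000 points this picks
            # one of the indices 980..989, i.e. a point just before the
            # newest ~1% of the data.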
            i = random.randrange(int(n*0.98), int(n*0.99))
            log.debug("no match -> popping element %s", i)
            self.pop(i)

    def add(self, ts, value):
        while len(self.data) >= self.limit:
            self.shrink()
        if len(self.data) == 0 or ts >= self.data[-1][0]:
            self.data.append((ts, value,))
        else:
            # Shouldn't happen that often, so I do a simple linear search
            # instead of a binary search.
            for i in range(len(self.data)):
                if self.data[i][0] >= ts:
                    break
            self.data.insert(i, (ts, value,))

    def save(self):
        with open(self.filename, "r+") as fh:
            fcntl.flock(fh, fcntl.LOCK_EX)
            json.dump({"description": self.description, "data": self.data}, fh)
            fh.truncate()
        # Flag the series as changed for downstream consumers.
        with open(self.queue_dir + "/" + self.id, "w") as fh:
            pass

    def rebuild_index(self):
        t0 = time.time()
        index = {}
        for fn in glob.glob(self.base_dir + "/*"):
            (_, _, hash) = fn.rpartition("/")
            with open(fn, "r") as fh:
                fcntl.flock(fh, fcntl.LOCK_SH)
                try:
                    d = json.load(fh)
                except json.decoder.JSONDecodeError as e:
                    log.exception(f"Cannot decode JSON in {fn}: {e}")
                    raise
                for k, v in d["description"].items():
                    d1 = index.setdefault(k, {})
                    d2 = d1.setdefault(v, [])
                    d2.append(hash)
        with open(self.base_dir + "/.index", "r+") as fh:
            fcntl.flock(fh, fcntl.LOCK_EX)
            json.dump(index, fh)
            # Truncate in case the new index is shorter than the old one.
            fh.truncate()
        t1 = time.time()
        log.info("index rebuilt in %s seconds", t1 - t0)

    @classmethod
    def find(cls, match):
        result = None
        with open(cls.base_dir + "/.index", "r") as fh:
            fcntl.flock(fh, fcntl.LOCK_SH)
            index = json.load(fh)
            for d, v in match.items():
                ts = set(index[d][v])
                if result is None:
                    result = ts
                else:
                    result &= ts
        return result

    def data_json_by_row(self):
        d = []
        for dp in self.data:
            d.append({
                "t": dp[0],
                "v": dp[1],
                "utc": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(dp[0])),
            })
        return json.dumps(d)
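
# Minimal usage sketch, not part of the class above. It creates the "data"
# and "queue" directories and an initially empty "data/.index" file if they
# don't exist yet (the class expects them to be present); the description
# keys and values ("sensor", "location", ...) are made-up examples.
if __name__ == "__main__":
    import os

    logging.basicConfig(level=logging.INFO)

    os.makedirs(LTS.base_dir, exist_ok=True)
    os.makedirs(LTS.queue_dir, exist_ok=True)
    index_file = LTS.base_dir + "/.index"
    if not os.path.exists(index_file):
        with open(index_file, "w") as fh:
            json.dump({}, fh)

    # Create (or reopen) a series identified by its description and record
    # a few measurements.
    series = LTS({"sensor": "temperature", "location": "office"})
    now = time.time()
    for i in range(10):
        series.add(now - (10 - i) * 60, 20.0 + random.random())
    series.save()

    # Look the series up again via the inverted index.
    print("matching series:", LTS.find({"sensor": "temperature"}))
    print(series.data_json_by_row())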