From 6d689666e98a28c30d416114e0621d553fd88ea8 Mon Sep 17 00:00:00 2001 From: "Peter J. Holzer" Date: Sun, 21 Aug 2022 11:58:31 +0200 Subject: [PATCH] Implement json prototype of LTsDb --- doc/multiple-timeseries | 17 +++++++++ ltsdb_json.py | 81 +++++++++++++++++++++++++++++++++++++++++ ltsdb_test | 20 ++++++++++ 3 files changed, 118 insertions(+) create mode 100644 doc/multiple-timeseries create mode 100644 ltsdb_json.py create mode 100755 ltsdb_test diff --git a/doc/multiple-timeseries b/doc/multiple-timeseries new file mode 100644 index 0000000..f94dd5d --- /dev/null +++ b/doc/multiple-timeseries @@ -0,0 +1,17 @@ +PoC, not optimized for performance. +Store data as JSON +Use one file per timeseries +Metadata is a dict of dimension/value pairs. +We can find each file quickly by using a hash of the metadata as the +filename +Can we find all timeseries which match only some of the dimensions +(e.g. response times of a particular service across all nodes)? +Opening each file is going to be slow pretty fast. So we need an index. +We don't expect new timeseries to spring into existence all that often, +so I guess for now we can just rewrite the whole index when a new +timeseries is added. Structure is pretty simple: Two levels of +dict (dimension, value) and then a list of matching timeseries. + +Using an RDBMS doesn't seem like a good idea, + +I'll design an efficient binary format later. diff --git a/ltsdb_json.py b/ltsdb_json.py new file mode 100644 index 0000000..19e13ff --- /dev/null +++ b/ltsdb_json.py @@ -0,0 +1,81 @@ +#!/usr/bin/python3 + +import fcntl +import glob +import hashlib +import json +import time + +class LTS: + base_dir = "data" + limit = 1000 + + def __init__(self, description): + # Oh, I think we need to be able to load by hash, too + canonical_description = {x: description[x] for x in sorted(description.keys())} + self.description = canonical_description + serialized_description = json.dumps(canonical_description) + m = hashlib.sha256() + m.update(bytes(serialized_description, encoding="UTF-8")) + self.filename = self.base_dir + "/" + m.hexdigest() + try: + with open(self.filename, "r") as fh: + fcntl.flock(fh, fcntl.LOCK_SH) + d = json.load(fh) + self.new = False + self.data = d["data"] + except FileNotFoundError as e: + self.new = True + self.data = [] + # Create the file immediately. Makes saving later simpler if we can + # assume it exists + with open(self.filename, "x+") as fh: + fcntl.flock(fh, fcntl.LOCK_EX) + json.dump({"description": self.description, "data": self.data}, fh) + self.rebuild_index() + + def add(self, ts, value): + while len(self.data) >= self.limit: + r = random.randrange(0, self.limit) + self.data.pop(r) + + if len(self.data) == 0 or ts >= self.data[-1][0]: + self.data.append((ts, value,)) + else: + # Shouldn't happen that often, so I do a simple linear search instead + # of a binary search + for i in range(len(self.data)): + if self.data[i][0] >= ts: + break + self.data.insert(i, (ts, value,)) + + def save(self): + with open(self.filename, "r+") as fh: + fcntl.flock(fh, fcntl.LOCK_EX) + json.dump({"description": self.description, "data": self.data}, fh) + fh.truncate() + + def rebuild_index(self): + t0 = time.time() + index = {} + for fn in glob.glob(self.base_dir + "/*"): + (_, _, hash) = fn.rpartition("/") + with open(fn, "r") as fh: + fcntl.flock(fh, fcntl.LOCK_SH) + d = json.load(fh) + for k, v in d["description"].items(): + d1 = index.setdefault(k, {}) + d2 = d1.setdefault(v, []) + d2.append(hash) + with open(self.base_dir + "/.index", "r+") as fh: + fcntl.flock(fh, fcntl.LOCK_EX) + json.dump(index, fh) + t1 = time.time() + print("index rebuilt in", t1 - t0, "seconds") + + @classmethod + def find(self, match): + with open(self.base_dir + "/.index", "r") as fh: + fcntl.flock(fh, fcntl.LOCK_SH) + index = json.dump(fh) + diff --git a/ltsdb_test b/ltsdb_test new file mode 100755 index 0000000..373d7c1 --- /dev/null +++ b/ltsdb_test @@ -0,0 +1,20 @@ +#!/usr/bin/python3 + +from ltsdb_json import LTS + +ts1 = LTS({"hostname": "rorschach.hjp.at", "measure": "uptime"}) +ts1.add(1661026122, 4) +ts1.save() + +ts1 = LTS({"hostname": "rorschach.hjp.at", "website": "i12e.hjp.at", "measure": "rtt"}) +ts1.add(1661026122, 0.06) +ts1.save() + +ts1 = LTS({"hostname": "rorschach.hjp.at", "measure": "uptime"}) +ts1.add(1661026361, 5) +ts1.save() + +ts1 = LTS({"hostname": "charly.wsr.ac.at", "website": "www.wifo.ac.at", "measure": "rtt"}) +ts1.add(1661026122, 0.347) +ts1.save() +