Implement json prototype of LTsDb
parent 1a4d8ef471
commit 6d689666e9

@@ -0,0 +1,17 @@
PoC, not optimized for performance.

Store data as JSON.
Use one file per timeseries.
Metadata is a dict of dimension/value pairs.
We can find each file quickly by using a hash of the metadata as the filename.
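
For illustration, this is how a description maps to a filename
(mirroring the __init__ code below; the example dimensions are taken
from the test script):

import hashlib
import json

description = {"measure": "uptime", "hostname": "rorschach.hjp.at"}
# Canonicalize: sort the keys so equal dicts always serialize the same.
canonical = json.dumps({k: description[k] for k in sorted(description)})
filename = "data/" + hashlib.sha256(canonical.encode("UTF-8")).hexdigest()
# Any dict with the same dimension/value pairs yields the same filename.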

Can we find all timeseries which match only some of the dimensions
(e.g. response times of a particular service across all nodes)?
Opening each file to check its metadata is going to get slow pretty
quickly, so we need an index.

We don't expect new timeseries to spring into existence all that often,
so I guess for now we can just rewrite the whole index when a new
timeseries is added. The structure is pretty simple: two levels of
dict (dimension, then value) and then a list of matching timeseries.
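
A hypothetical index for the timeseries created by the test script
below might look like this (the hashes are shortened placeholders):

h_uptime, h_i12e, h_wifo = "1f3a...", "9c1e...", "b2d7..."
index = {
    "hostname": {
        "rorschach.hjp.at": [h_uptime, h_i12e],
        "charly.wsr.ac.at": [h_wifo],
    },
    "measure": {
        "uptime": [h_uptime],
        "rtt": [h_i12e, h_wifo],
    },
    "website": {
        "i12e.hjp.at": [h_i12e],
        "www.wifo.ac.at": [h_wifo],
    },
}
# index["measure"]["rtt"] lists all rtt series across all hosts.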

Using an RDBMS doesn't seem like a good idea.

I'll design an efficient binary format later.

@@ -0,0 +1,81 @@
#!/usr/bin/python3

import fcntl
import glob
import hashlib
import json
import random  # used for the random eviction in add()
import time


class LTS:
    base_dir = "data"
    limit = 1000

    def __init__(self, description):
        # Oh, I think we need to be able to load by hash, too
        canonical_description = {x: description[x] for x in sorted(description.keys())}
        self.description = canonical_description
        serialized_description = json.dumps(canonical_description)
        m = hashlib.sha256()
        m.update(bytes(serialized_description, encoding="UTF-8"))
        self.filename = self.base_dir + "/" + m.hexdigest()
        try:
            with open(self.filename, "r") as fh:
                fcntl.flock(fh, fcntl.LOCK_SH)
                d = json.load(fh)
                self.new = False
                self.data = d["data"]
        except FileNotFoundError:
            self.new = True
            self.data = []
            # Create the file immediately. Makes saving later simpler if we
            # can assume it exists.
            with open(self.filename, "x+") as fh:
                fcntl.flock(fh, fcntl.LOCK_EX)
                json.dump({"description": self.description, "data": self.data}, fh)
            self.rebuild_index()

    def add(self, ts, value):
        # Cap the series at `limit` points by evicting a random old point.
        while len(self.data) >= self.limit:
            r = random.randrange(0, self.limit)
            self.data.pop(r)

        if len(self.data) == 0 or ts >= self.data[-1][0]:
            self.data.append((ts, value))
        else:
            # Shouldn't happen that often, so I do a simple linear search
            # instead of a binary search
            for i in range(len(self.data)):
                if self.data[i][0] >= ts:
                    break
            self.data.insert(i, (ts, value))

    def save(self):
        with open(self.filename, "r+") as fh:
            fcntl.flock(fh, fcntl.LOCK_EX)
            json.dump({"description": self.description, "data": self.data}, fh)
            # The new JSON may be shorter than the previous file content,
            # so cut off any leftover bytes.
            fh.truncate()
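
    # For reference (an illustration, not part of the original commit):
    # after the test script's two add()/save() calls on the uptime series,
    # its file contains
    # {"description": {"hostname": "rorschach.hjp.at", "measure": "uptime"},
    #  "data": [[1661026122, 4], [1661026361, 5]]}
    # (tuples become JSON arrays, so reloaded data points are lists).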

    def rebuild_index(self):
        t0 = time.time()
        index = {}
        # glob("*") skips dot files, so ".index" itself is not scanned.
        for fn in glob.glob(self.base_dir + "/*"):
            (_, _, hash) = fn.rpartition("/")
            with open(fn, "r") as fh:
                fcntl.flock(fh, fcntl.LOCK_SH)
                d = json.load(fh)
                for k, v in d["description"].items():
                    d1 = index.setdefault(k, {})
                    d2 = d1.setdefault(v, [])
                    d2.append(hash)
        # "a+" creates the index file if it doesn't exist yet; truncate
        # after taking the lock, then write the fresh index.
        with open(self.base_dir + "/.index", "a+") as fh:
            fcntl.flock(fh, fcntl.LOCK_EX)
            fh.seek(0)
            fh.truncate()
            json.dump(index, fh)
        t1 = time.time()
        print("index rebuilt in", t1 - t0, "seconds")

    @classmethod
    def find(cls, match):
        with open(cls.base_dir + "/.index", "r") as fh:
            fcntl.flock(fh, fcntl.LOCK_SH)
            index = json.load(fh)
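        # A minimal sketch of how find could continue from here
        # (an assumption, not part of the original commit): intersect
        # the per-dimension hash lists so only timeseries matching
        # every given dimension/value pair remain.
        result = None
        for k, v in match.items():
            hashes = set(index.get(k, {}).get(v, []))
            result = hashes if result is None else result & hashes
        return result if result is not None else set()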

@@ -0,0 +1,20 @@
#!/usr/bin/python3

from ltsdb_json import LTS

ts1 = LTS({"hostname": "rorschach.hjp.at", "measure": "uptime"})
ts1.add(1661026122, 4)
ts1.save()

ts1 = LTS({"hostname": "rorschach.hjp.at", "website": "i12e.hjp.at", "measure": "rtt"})
ts1.add(1661026122, 0.06)
ts1.save()

ts1 = LTS({"hostname": "rorschach.hjp.at", "measure": "uptime"})
ts1.add(1661026361, 5)
ts1.save()

ts1 = LTS({"hostname": "charly.wsr.ac.at", "website": "www.wifo.ac.at", "measure": "rtt"})
ts1.add(1661026122, 0.347)
ts1.save()
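
A partial-dimension query like the one asked about in the commit
message could then look something like this (hypothetical usage,
relying on the find sketch above):

from ltsdb_json import LTS

# All response-time series, across all hosts and websites.
rtt_series = LTS.find({"measure": "rtt"})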