Watch multiple files or directories instead of a single directory

This commit is contained in:
Peter J. Holzer 2021-12-18 11:25:56 +01:00 committed by Peter J. Holzer
parent 17c6e87a89
commit 8954588ab4
1 changed files with 32 additions and 19 deletions

45
kitsune
View File

@ -3,44 +3,57 @@
import datetime
import os
import sys
import stat
import time
class WatchedFile:
def __init__(self, name):
self.name = name
def __init__(self, path):
self.path = path
self.reopen()
def reopen(self):
self.fd = open(self.name, errors='replace')
self.fd = open(self.path, errors='replace')
self.fileno = self.fd.fileno()
self.last_ts = 0
def format_ts(ts_ns):
return datetime.datetime.fromtimestamp(ts_ns / 1E9).strftime("%H:%M:%S.%f")
def watch(dir):
os.chdir(dir)
last_ts = 0
def watch(args):
watched_dirs = []
watched_files = {}
filename_length = 0
for a in args:
st = os.stat(a)
if stat.S_ISDIR(st.st_mode):
watched_dirs.append(a)
elif stat.S_ISREG(st.st_mode):
watched_files[a] = WatchedFile(a)
if len(a) > filename_length:
filename_length = len(a)
else:
print(a, "is not a file or directory - skipping", file=sys.stderr)
last_ts = 0
while True:
# are there new files?
st = os.stat(".")
for d in watched_dirs:
st = os.stat(d)
if st.st_mtime_ns > last_ts:
last_ts = st.st_mtime_ns
for de in os.scandir("."):
for de in os.scandir(d):
if de.is_file():
if de.name not in watched_files:
watched_files[de.name] = WatchedFile(de.name)
if len(de.name) > filename_length:
filename_length = len(de.name)
if de.path not in watched_files:
watched_files[de.path] = WatchedFile(de.path)
if len(de.path) > filename_length:
filename_length = len(de.path)
# has any of the files changed
for f in watched_files.values():
# XXX - we should also detect replaced files.
st = os.stat(f.fileno)
try:
stf = os.stat(f.name)
stf = os.stat(f.path)
if stf.st_ino != st.st_ino:
f.reopen()
except FileNotFoundError:
@ -55,10 +68,10 @@ def watch(dir):
if lines[-1] == "":
lines.pop()
for ln in lines:
print(f"{f.name:{filename_length}}", format_ts(f.last_ts), ln)
print(f"{f.path:{filename_length}}", format_ts(f.last_ts), ln)
time.sleep(0.1)
if __name__ == "__main__":
dir = sys.argv[1] if len(sys.argv) > 1 else "."
args = sys.argv[1:] if len(sys.argv) > 1 else ["."]
watch(dir)
watch(args)