kitsune/kitsune

117 lines
4.1 KiB
Python
Executable File

#!/usr/bin/python3
import argparse
import datetime
import fnmatch
import os
import sys
import stat
import time
class WatchedFile:
def __init__(self, path, seek_to_end):
self.path = path
self.seek_to_end = seek_to_end
stf = os.stat(self.path)
self.st_ctime_ns = stf.st_ctime_ns
self.st_size = stf.st_size
self.fileno = None
if stf.st_ctime_ns >= time.time_ns() - 1E9:
self.reopen()
def reopen(self):
self.fd = open(self.path, errors='replace')
self.fileno = self.fd.fileno()
self.last_ts = 0
if self.seek_to_end:
self.fd.seek(self.st_size, 0)
self.seek_to_end = False
def format_ts(ts_ns):
return datetime.datetime.fromtimestamp(ts_ns / 1E9).strftime("%H:%M:%S.%f")
def watch(args):
seek_to_end = not args.start
watched_dirs = []
watched_files = {}
filename_length = 0
for a in args.files:
st = os.stat(a)
if stat.S_ISDIR(st.st_mode):
watched_dirs.append(a)
elif stat.S_ISREG(st.st_mode):
watched_files[a] = WatchedFile(a, seek_to_end)
if len(a) > filename_length:
filename_length = len(a)
else:
print(a, "is not a file or directory - skipping", file=sys.stderr)
last_ts = 0
while True:
# are there new files?
for d in watched_dirs:
st = os.stat(d)
if st.st_mtime_ns > last_ts:
last_ts = st.st_mtime_ns
for de in os.scandir(d):
if de.is_file():
if args.match_filename is None or fnmatch.fnmatch(de.name, args.match_filename):
if de.path not in watched_files:
watched_files[de.path] = WatchedFile(de.path, seek_to_end)
if len(de.path) > filename_length:
filename_length = len(de.path)
# has any of the files changed
for f in watched_files.values():
# XXX - we should also detect replaced files.
if f.fileno:
st = os.stat(f.fileno)
try:
stf = os.stat(f.path)
if stf.st_ino != st.st_ino:
f.reopen()
st = os.stat(f.fileno)
except FileNotFoundError:
# ignore,
# or maybe remove from watched_files?
# or just mark as deleted?
pass
if st.st_mtime_ns > f.last_ts:
if st.st_size < f.fd.tell():
# We are beyond the end of the file, so it has probably
# been truncated and rewritten - read from beginning
f.fd.seek(0, 0)
dead = "" if st.st_nlink == 0 else " "
f.last_ts = st.st_mtime_ns
new_content = f.fd.read()
lines = new_content.split("\n")
if lines[-1] == "":
lines.pop()
for ln in lines:
print(f"{f.path:{filename_length}}", format_ts(f.last_ts), dead, ln)
else:
try:
stf = os.stat(f.path)
if stf.st_ctime_ns != f.st_ctime_ns:
f.reopen()
except FileNotFoundError:
# ignore,
# or maybe remove from watched_files?
# or just mark as deleted?
pass
time.sleep(0.1)
seek_to_end = False
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap.add_argument("--start", action='store_true',
help="dump existing files from the start")
ap.add_argument("--match-filename",
help="follow only matching files in directories",
metavar="GLOB-PATTERN")
ap.add_argument("files", nargs="*", default=["."],
metavar="file")
args = ap.parse_args()
watch(args)