postgres-monitor/audit/changed_tables

84 lines
2.6 KiB
Plaintext
Raw Normal View History

#!/usr/bin/python3
import argparse
import datetime
import logging
import os
import psycopg2
from psycopg2.extras import NamedTupleCursor
from psycopg2.extensions import quote_ident
import sys
import hashlib
parser = argparse.ArgumentParser(description='Record changes in tables')
parser.add_argument('--dbhost')
parser.add_argument('--dbname')
parser.add_argument('--dbuser')
parser.add_argument('--schema', default='public')
a = parser.parse_args()
conn = ""
if a.dbhost:
conn += "host='%s' " % a.dbhost
if a.dbname:
conn += "dbname='%s' " % a.dbname
if a.dbuser:
conn += "user='%s' " % a.dbuser
db = psycopg2.connect(conn)
csr = db.cursor(cursor_factory=NamedTupleCursor)
# Create table if necessary
audit_table = quote_ident(a.schema, csr) + ".changed_tables"
try:
csr.execute("select * from " + audit_table)
csr.fetchone()
except Exception as e:
db.rollback()
csr.execute("create table " + audit_table +
"""
(
id serial,
table_schema text,
table_name text,
ts timestamptz,
sha256 text
)
"""
)
db.commit()
csr.execute("select * from information_schema.tables where table_schema = %s", (a.schema,))
for table in csr.fetchall():
print(table.table_schema, table.table_name, end="", flush=True)
q = "select * from " + quote_ident(table.table_schema, csr) + "." + quote_ident(table.table_name, csr)
hash = hashlib.sha256()
csr = db.cursor()
csr.execute(q)
for r in csr:
r_bytes = ("\N{US}".join(map(lambda x: str(x), r)) + "\N{RS}").encode()
hash.update(r_bytes)
digest = hash.hexdigest()
csr = db.cursor(cursor_factory=NamedTupleCursor)
q = """
with m as (
select table_schema, table_name, max(ts) as ts
from {audit_table}
where table_schema=%(table_schema)s and table_name=%(table_name)s
group by table_schema, table_name
)
select * from {audit_table} natural join m
""".format(audit_table=audit_table)
csr.execute(q, {"table_schema": table.table_schema, "table_name": table.table_name})
old = csr.fetchone()
if not old or old.sha256 != digest:
print(" changed", digest, end="")
q = "insert into {audit_table}(table_schema, table_name, ts, sha256) values(%s, %s, now(), %s)".format(audit_table=audit_table)
csr.execute(q, (table.table_schema, table.table_name, digest))
db.commit()
db.rollback() # to close transaction
print(flush=True)
# vim: tw=99