ops(dev): add audit retention pruning script
Made-with: Cursor
This commit is contained in:
163
ops/prune_audit_db.py
Executable file
163
ops/prune_audit_db.py
Executable file
@@ -0,0 +1,163 @@
|
||||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Tuple
|
||||
|
||||
|
||||
def _utc_now() -> datetime:
    """Return the current moment as a timezone-aware datetime in UTC."""
    utc = timezone.utc
    return datetime.now(tz=utc)
|
||||
|
||||
|
||||
def _fmt_ts(dt: datetime) -> str:
    """Format *dt* as an ISO-8601 UTC timestamp with second precision.

    Args:
        dt: The moment to format. Timezone-aware values are converted to UTC
            first so that the trailing ``Z`` is always truthful. Naive values
            carry no offset to convert from and are formatted unchanged
            (i.e. they are assumed to already be expressed in UTC).

    Returns:
        A string such as ``2024-01-02T03:04:05Z``.
    """
    # utcoffset() can be None even when tzinfo is set; only such fully
    # aware values can be converted safely.
    if dt.tzinfo is not None and dt.utcoffset() is not None:
        dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
|
||||
def _resolve_data_dir(cli_data_dir: str | None) -> Path:
    """Pick the data directory and return it as an absolute path.

    Sources are tried in order: the CLI value, the ``SOFIIA_DATA_DIR``
    environment variable, then the built-in default ``/app/data``. A source
    that is missing, empty, or whitespace-only is skipped, so a blank value
    can never silently resolve to the current working directory.

    Args:
        cli_data_dir: Value of ``--data-dir``, or ``None`` when not given.

    Returns:
        The chosen directory with ``~`` expanded and the path resolved.
    """
    candidates = (cli_data_dir, os.getenv("SOFIIA_DATA_DIR"), "/app/data")
    # The final candidate is a non-blank literal, so next() always finds one.
    raw = next(c.strip() for c in candidates if c and c.strip())
    return Path(raw).expanduser().resolve()
|
||||
|
||||
|
||||
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the command-line parser and parse the given arguments.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which matches the behaviour of calling
            this function with no arguments.

    Returns:
        A namespace with ``data_dir``, ``retention_days``, ``batch_size``,
        ``dry_run``, ``vacuum`` and ``yes`` attributes.
    """
    p = argparse.ArgumentParser(
        description="Prune old audit_events records from sofiia SQLite DB."
    )
    p.add_argument(
        "--data-dir",
        default=None,
        help="Path to SOFIIA_DATA_DIR. Defaults to env SOFIIA_DATA_DIR or /app/data.",
    )
    p.add_argument(
        "--retention-days",
        type=int,
        default=None,
        help="Retention period in days. Defaults to SOFIIA_AUDIT_RETENTION_DAYS or 90.",
    )
    p.add_argument(
        "--batch-size",
        type=int,
        default=5000,
        help="Delete batch size (default: 5000).",
    )
    p.add_argument(
        "--dry-run",
        action="store_true",
        help="Report candidates only, do not delete.",
    )
    p.add_argument(
        "--vacuum",
        action="store_true",
        help="Run VACUUM after deletion.",
    )
    # Accepted so callers can already pass it; nothing reads the flag yet.
    p.add_argument(
        "--yes",
        action="store_true",
        help="Reserved for non-interactive confirmation (no-op in current script).",
    )
    return p.parse_args(argv)
|
||||
|
||||
|
||||
def _check_table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """Report whether ``sqlite_master`` lists a table called *table_name*.

    Only entries of type ``table`` are considered.
    """
    query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
    cursor = conn.execute(query, (table_name,))
    match = cursor.fetchone()
    return True if match else False
|
||||
|
||||
|
||||
def _candidate_stats(conn: sqlite3.Connection, cutoff_ts: str) -> Tuple[int, str | None, str | None]:
    """Summarise the ``audit_events`` rows whose ``ts`` is below *cutoff_ts*.

    Returns:
        ``(count, min_ts, max_ts)`` for the matching rows. When nothing
        matches, the count is 0 and both timestamps are ``None``.
    """
    sql = "SELECT COUNT(*), MIN(ts), MAX(ts) FROM audit_events WHERE ts < ?"
    result = conn.execute(sql, (cutoff_ts,)).fetchone()
    if result:
        count, oldest, newest = result[0], result[1], result[2]
        return int(count or 0), oldest, newest
    return 0, None, None
|
||||
|
||||
|
||||
def main() -> int:
    """Prune ``audit_events`` rows older than the retention cutoff.

    Reads the CLI arguments, locates ``sofiia.db`` inside the data
    directory, prints the effective settings and the candidate statistics,
    then deletes matching rows in batches (committing after each batch).
    With ``--dry-run`` nothing is deleted; with ``--vacuum`` a ``VACUUM`` is
    run after the deletion.

    Returns:
        Process exit code: 0 on success (including dry-run), 1 when the
        retention setting is invalid, the DB file or the ``audit_events``
        table is missing, or any database operation fails.
    """
    args = _parse_args()

    data_dir = _resolve_data_dir(args.data_dir)
    db_path = (data_dir / "sofiia.db").resolve()

    retention_days = args.retention_days
    if retention_days is None:
        # A blank environment value is treated the same as an unset one.
        raw_retention = (os.getenv("SOFIIA_AUDIT_RETENTION_DAYS") or "").strip() or "90"
        try:
            retention_days = int(raw_retention)
        except ValueError:
            # Report a clear message instead of crashing with a traceback.
            print(f"ERROR: invalid SOFIIA_AUDIT_RETENTION_DAYS: {raw_retention!r}")
            return 1
    # Values below 1 are clamped rather than rejected.
    retention_days = max(1, int(retention_days))
    batch_size = max(1, int(args.batch_size))

    cutoff_dt = _utc_now() - timedelta(days=retention_days)
    cutoff_ts = _fmt_ts(cutoff_dt)

    print("Audit retention pruning")
    print(f"  db_path: {db_path}")
    print(f"  retention_days: {retention_days}")
    print(f"  cutoff_ts: {cutoff_ts}")
    print(f"  batch_size: {batch_size}")
    print(f"  dry_run: {bool(args.dry_run)}")
    print(f"  vacuum: {bool(args.vacuum)}")

    # sqlite3.connect() would create a missing file, so check up front.
    if not db_path.exists():
        print(f"ERROR: DB file not found: {db_path}")
        return 1

    conn = None
    try:
        conn = sqlite3.connect(str(db_path))
        conn.execute("PRAGMA busy_timeout = 5000")
    except Exception as exc:
        # Do not leak the handle if the PRAGMA fails after a successful open.
        if conn is not None:
            conn.close()
        print(f"ERROR: cannot open DB: {exc}")
        return 1

    try:
        if not _check_table_exists(conn, "audit_events"):
            print("ERROR: table 'audit_events' not found in DB schema")
            return 1

        # NOTE(review): rows are selected by comparing the stored ``ts``
        # against an ISO-8601 "...Z" string; this presumes audit_events.ts
        # holds text in that same format - confirm against the writer.
        total_candidates, min_ts, max_ts = _candidate_stats(conn, cutoff_ts)
        print(f"  candidates: {total_candidates}")
        print(f"  candidates_min_ts: {min_ts or '-'}")
        print(f"  candidates_max_ts: {max_ts or '-'}")

        if args.dry_run:
            print("Dry-run complete. No rows were deleted.")
            return 0

        deleted_total = 0
        batch_no = 0
        while True:
            cur = conn.execute(
                "DELETE FROM audit_events WHERE id IN ("
                "SELECT id FROM audit_events WHERE ts < ? ORDER BY ts ASC LIMIT ?"
                ")",
                (cutoff_ts, batch_size),
            )
            deleted = int(cur.rowcount or 0)
            # Commit even when nothing was deleted: the sqlite3 module opens
            # an implicit transaction before every DELETE, and leaving the
            # last (empty) one open makes the VACUUM below fail with
            # "cannot VACUUM from within a transaction".
            conn.commit()
            if deleted <= 0:
                break
            deleted_total += deleted
            batch_no += 1
            print(f"  batch {batch_no}: deleted {deleted} (total={deleted_total})")

        print(f"Deletion complete. Total deleted: {deleted_total}")

        if args.vacuum:
            print("Running VACUUM...")
            conn.execute("VACUUM")
            print("VACUUM complete.")

        return 0
    except Exception as exc:
        # Top-level boundary: batches already committed stay deleted; any
        # uncommitted work is discarded when the connection closes.
        print(f"ERROR: prune failed: {exc}")
        return 1
    finally:
        conn.close()
|
||||
|
||||
|
||||
# Script entry point: run the pruning job and use its return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
Reference in New Issue
Block a user