#!/usr/bin/env python3
"""
audit_cleanup.py — Audit JSONL Retention Enforcement

Finds ops/audit/tool_audit_YYYY-MM-DD.jsonl files older than
`retention_days`, then either:
 - dry_run=True      → report only, no changes
 - archive_gzip=True → compress to .jsonl.gz, delete original
 - otherwise         → delete original

Exit codes:
  0 — success (including dry_run)
  1 — script error

Usage:
    python3 ops/scripts/audit_cleanup.py \
        --retention-days 30 \
        --audit-dir ops/audit \
        [--dry-run] [--archive-gzip] [--verbose]

Also callable programmatically via run_cleanup() for Job Orchestrator.
"""

from __future__ import annotations

import argparse
import datetime
import gzip
import json
import logging
import os
import re
import shutil
import sys
from pathlib import Path
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

# Matches e.g. "tool_audit_2024-01-31.jsonl"; group(1) is the ISO date.
_DATE_PAT = re.compile(r"tool_audit_(\d{4}-\d{2}-\d{2})\.jsonl$")


# ─── Core logic ───────────────────────────────────────────────────────────────

def find_eligible_files(
    audit_dir: Path,
    cutoff_date: datetime.date,
) -> List[Path]:
    """Return JSONL files whose embedded date < cutoff_date.

    Files whose names do not match ``_DATE_PAT`` or whose embedded date
    is not a valid ISO date are silently skipped (they are not audit
    files this script owns).

    Args:
        audit_dir: Directory to scan (may not exist; returns [] then).
        cutoff_date: Files strictly older than this date are eligible.

    Returns:
        Sorted list of eligible file paths.
    """
    eligible: List[Path] = []
    if not audit_dir.exists():
        return eligible
    for fpath in sorted(audit_dir.glob("tool_audit_*.jsonl")):
        m = _DATE_PAT.search(fpath.name)
        if not m:
            continue
        try:
            file_date = datetime.date.fromisoformat(m.group(1))
        except ValueError:
            # Name matched the shape but the date is invalid (e.g. month 13).
            continue
        if file_date < cutoff_date:
            eligible.append(fpath)
    return eligible


def run_cleanup(
    retention_days: int,
    audit_dir: str = "ops/audit",
    dry_run: bool = True,
    archive_gzip: bool = False,
    repo_root: Optional[str] = None,
    verbose: bool = False,
) -> Dict:
    """Main cleanup routine.

    Args:
        retention_days: Keep files newer than this many days (1–365).
        audit_dir: Audit directory, relative to the repo root.
        dry_run: If True, only report what would happen (default True so
            programmatic callers must opt in to destructive behavior).
        archive_gzip: If True, compress eligible files to .jsonl.gz
            before deleting the original instead of plain deletion.
        repo_root: Repo root; falls back to $REPO_ROOT, then cwd.
        verbose: Log per-file actions at INFO level.

    Returns:
        {scanned, eligible, deleted, archived, bytes_freed, dry_run,
         retention_days, cutoff_date, audit_dir, errors}

    Raises:
        ValueError: If retention_days is out of range, or audit_dir
            resolves outside the repo root.
    """
    if retention_days < 1 or retention_days > 365:
        raise ValueError(f"retention_days must be 1–365, got {retention_days}")

    root = Path(repo_root or os.getenv("REPO_ROOT", ".")).resolve()
    dir_path = (root / audit_dir).resolve()

    # Path traversal guard. NOTE: a plain str.startswith() prefix check is
    # bypassable by sibling paths (root "/repo" would accept "/repo-evil");
    # relative_to() performs a true path-component containment check.
    try:
        dir_path.relative_to(root)
    except ValueError:
        raise ValueError(f"audit_dir '{audit_dir}' resolves outside repo root")

    today = datetime.date.today()
    cutoff = today - datetime.timedelta(days=retention_days)

    all_jsonl = list(sorted(dir_path.glob("tool_audit_*.jsonl")))
    eligible = find_eligible_files(dir_path, cutoff)

    deleted = 0
    archived = 0
    bytes_freed = 0
    errors: List[str] = []

    for fpath in eligible:
        size = fpath.stat().st_size

        if dry_run:
            # Report-only mode: count what WOULD happen, touch nothing.
            action = "archive" if archive_gzip else "delete"
            if verbose:
                logger.info("[dry_run] Would %s: %s (%d bytes)",
                            action, fpath.name, size)
            bytes_freed += size
            if archive_gzip:
                archived += 1
            else:
                deleted += 1
            continue

        try:
            if archive_gzip:
                # "x.jsonl" → "x.jsonl.gz" (with_suffix replaces ".jsonl").
                gz_path = fpath.with_suffix(".jsonl.gz")
                try:
                    with open(fpath, "rb") as f_in:
                        with gzip.open(gz_path, "wb") as f_out:
                            shutil.copyfileobj(f_in, f_out)
                except Exception:
                    # Don't leave a truncated/corrupt archive behind if
                    # compression fails mid-write; the original is kept.
                    gz_path.unlink(missing_ok=True)
                    raise
                fpath.unlink()
                archived += 1
                bytes_freed += size
                if verbose:
                    logger.info("Archived: %s → %s (%d bytes)",
                                fpath.name, gz_path.name, size)
            else:
                fpath.unlink()
                deleted += 1
                bytes_freed += size
                if verbose:
                    logger.info("Deleted: %s (%d bytes)", fpath.name, size)
        except Exception as e:
            # Best-effort: record the failure and continue with the rest;
            # the caller decides whether errors are fatal (CLI exits 1).
            msg = f"Error processing {fpath.name}: {e}"
            logger.warning(msg)
            errors.append(msg)

    result = {
        "scanned": len(all_jsonl),
        "eligible": len(eligible),
        "deleted": deleted,
        "archived": archived,
        "bytes_freed": bytes_freed,
        "dry_run": dry_run,
        "retention_days": retention_days,
        "cutoff_date": cutoff.isoformat(),
        "audit_dir": str(dir_path),
        "errors": errors,
    }

    if verbose or not dry_run:
        summary = (
            f"audit_cleanup: scanned={result['scanned']}, eligible={result['eligible']}, "
            f"{'[DRY RUN] ' if dry_run else ''}"
            f"deleted={deleted}, archived={archived}, freed={bytes_freed} bytes"
        )
        logger.info(summary)

    return result


# ─── CLI entrypoint ───────────────────────────────────────────────────────────

def _parse_args(argv=None) -> argparse.Namespace:
    """Build and run the CLI argument parser."""
    p = argparse.ArgumentParser(
        description="Audit JSONL retention cleanup",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    p.add_argument("--retention-days", type=int, default=30,
                   help="Delete/archive files older than this many days")
    p.add_argument("--audit-dir", default="ops/audit",
                   help="Relative path to audit directory")
    p.add_argument("--repo-root", default=None,
                   help="Repo root (default: REPO_ROOT env or cwd)")
    p.add_argument("--dry-run", action="store_true",
                   help="Report only; do not delete or archive")
    p.add_argument("--archive-gzip", action="store_true",
                   help="Compress to .jsonl.gz before deleting")
    p.add_argument("--verbose", action="store_true",
                   help="Verbose output")
    p.add_argument("--output-json", action="store_true",
                   help="Print JSON result to stdout")
    return p.parse_args(argv)


def main(argv=None):
    """CLI entry point: parse args, run cleanup, report, set exit code."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s audit_cleanup %(message)s",
        stream=sys.stderr,
    )
    args = _parse_args(argv)

    result = run_cleanup(
        retention_days=args.retention_days,
        audit_dir=args.audit_dir,
        dry_run=args.dry_run,
        archive_gzip=args.archive_gzip,
        repo_root=args.repo_root,
        verbose=args.verbose,
    )

    if args.output_json:
        print(json.dumps(result, indent=2))
    else:
        status = "DRY RUN" if result["dry_run"] else "DONE"
        print(
            f"[{status}] scanned={result['scanned']} eligible={result['eligible']} "
            f"deleted={result['deleted']} archived={result['archived']} "
            f"freed={result['bytes_freed']}B"
        )

    # Per-file failures are non-fatal during the run but surface as exit 1.
    if result["errors"]:
        sys.exit(1)


if __name__ == "__main__":
    main()