Files
microdao-daarion/ops/scripts/audit_cleanup.py
Apple 67225a39fa docs(platform): add policy configs, runbooks, ops scripts and platform documentation
Config policies (16 files): alert_routing, architecture_pressure, backlog,
cost_weights, data_governance, incident_escalation, incident_intelligence,
network_allowlist, nodes_registry, observability_sources, rbac_tools_matrix,
release_gate, risk_attribution, risk_policy, slo_policy, tool_limits, tools_rollout

Ops (22 files): Caddyfile, calendar compose, grafana voice dashboard,
deployments/incidents logs, runbooks for alerts/audit/backlog/incidents/sofiia/voice,
cron jobs, scripts (alert_triage, audit_cleanup, migrate_*, governance, schedule),
task_registry, voice alerts/ha/latency/policy

Docs (30+ files): HUMANIZED_STEPAN v2.7-v3 changelogs and runbooks,
NODA1/NODA2 status and setup, audit index and traces, backlog, incident,
supervisor, tools, voice, opencode, release, risk, aistalk, spacebot

Made-with: Cursor
2026-03-03 07:14:53 -08:00

216 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
audit_cleanup.py — Audit JSONL Retention Enforcement
Finds ops/audit/tool_audit_YYYY-MM-DD.jsonl files older than `retention_days`,
then either:
- dry_run=True → report only, no changes
- archive_gzip=True → compress to .jsonl.gz, delete original
- otherwise → delete original
Exit codes:
0 — success (including dry_run)
1 — script error
Usage:
python3 ops/scripts/audit_cleanup.py \
--retention-days 30 \
--audit-dir ops/audit \
[--dry-run] [--archive-gzip] [--verbose]
Also callable programmatically via run_cleanup() for Job Orchestrator.
"""
from __future__ import annotations
import argparse
import datetime
import gzip
import json
import logging
import os
import re
import shutil
import sys
from pathlib import Path
from typing import Dict, List, Optional
# Module-level logger; level/handlers are configured by the CLI in main().
logger = logging.getLogger(__name__)
# Matches audit filenames like "tool_audit_2026-03-01.jsonl", capturing the ISO date.
_DATE_PAT = re.compile(r"tool_audit_(\d{4}-\d{2}-\d{2})\.jsonl$")
# ─── Core logic ───────────────────────────────────────────────────────────────
def find_eligible_files(
    audit_dir: Path,
    cutoff_date: datetime.date,
) -> List[Path]:
    """Collect audit JSONL files whose embedded date is strictly before *cutoff_date*.

    Files whose names don't match the tool_audit_YYYY-MM-DD.jsonl pattern,
    or whose embedded date is not a valid calendar date, are ignored.
    Returns an empty list when the directory does not exist.
    """
    if not audit_dir.exists():
        return []

    def _file_date(name: str) -> Optional[datetime.date]:
        # None when the filename doesn't carry a parseable ISO date.
        match = _DATE_PAT.search(name)
        if match is None:
            return None
        try:
            return datetime.date.fromisoformat(match.group(1))
        except ValueError:
            return None

    selected: List[Path] = []
    for candidate in sorted(audit_dir.glob("tool_audit_*.jsonl")):
        stamp = _file_date(candidate.name)
        if stamp is not None and stamp < cutoff_date:
            selected.append(candidate)
    return selected
def run_cleanup(
    retention_days: int,
    audit_dir: str = "ops/audit",
    dry_run: bool = True,
    archive_gzip: bool = False,
    repo_root: Optional[str] = None,
    verbose: bool = False,
) -> Dict:
    """
    Enforce retention on tool_audit_*.jsonl files under *audit_dir*.

    Args:
        retention_days: Files whose filename-embedded date is older than this
            many days are eligible. Must be within 1-365.
        audit_dir: Audit directory, relative to the repo root.
        dry_run: If True, only report what would happen; no files are touched.
        archive_gzip: If True, compress eligible files to .jsonl.gz and remove
            the originals; otherwise delete them outright.
        repo_root: Repo root override (default: REPO_ROOT env var or cwd).
        verbose: Emit one log line per processed file.

    Returns:
        Dict with keys: scanned, eligible, deleted, archived, bytes_freed,
        dry_run, retention_days, cutoff_date, audit_dir, errors.

    Raises:
        ValueError: If retention_days is out of range or audit_dir resolves
            outside the repo root.
    """
    log = logging.getLogger(__name__)
    if retention_days < 1 or retention_days > 365:
        # Message previously read "must be 1365" — the range dash had been
        # stripped; restored as ASCII "1-365".
        raise ValueError(f"retention_days must be 1-365, got {retention_days}")

    root = Path(repo_root or os.getenv("REPO_ROOT", ".")).resolve()
    dir_path = (root / audit_dir).resolve()
    # Path traversal guard. relative_to() compares whole path components,
    # unlike str.startswith(), which would wrongly accept e.g. /repo2 for /repo.
    try:
        dir_path.relative_to(root)
    except ValueError:
        raise ValueError(
            f"audit_dir '{audit_dir}' resolves outside repo root"
        ) from None

    cutoff = datetime.date.today() - datetime.timedelta(days=retention_days)

    # Single directory scan (the directory was previously globbed twice):
    # every tool_audit_*.jsonl counts as "scanned"; those whose embedded
    # YYYY-MM-DD parses and precedes the cutoff are eligible.
    date_pat = re.compile(r"tool_audit_(\d{4}-\d{2}-\d{2})\.jsonl$")
    all_jsonl = sorted(dir_path.glob("tool_audit_*.jsonl")) if dir_path.exists() else []
    eligible: List[Path] = []
    for fpath in all_jsonl:
        m = date_pat.search(fpath.name)
        if not m:
            continue
        try:
            file_date = datetime.date.fromisoformat(m.group(1))
        except ValueError:
            continue  # name matched the glob but carries a bogus date; skip
        if file_date < cutoff:
            eligible.append(fpath)

    deleted = 0
    archived = 0
    bytes_freed = 0
    errors: List[str] = []
    for fpath in eligible:
        size = fpath.stat().st_size
        if dry_run:
            action = "archive" if archive_gzip else "delete"
            if verbose:
                log.info("[dry_run] Would %s: %s (%d bytes)", action, fpath.name, size)
            # Mirror the counters a real run would report.
            bytes_freed += size
            if archive_gzip:
                archived += 1
            else:
                deleted += 1
            continue
        try:
            if archive_gzip:
                gz_path = fpath.with_suffix(".jsonl.gz")
                try:
                    with open(fpath, "rb") as f_in, gzip.open(gz_path, "wb") as f_out:
                        shutil.copyfileobj(f_in, f_out)
                except Exception:
                    # Don't leave a truncated .gz behind on a failed compress.
                    if gz_path.exists():
                        gz_path.unlink()
                    raise
                fpath.unlink()
                archived += 1
                # Counts original bytes removed; the .gz size is not subtracted.
                bytes_freed += size
                if verbose:
                    # Restored the arrow separator that had been stripped
                    # from the original "%s%s" format.
                    log.info("Archived: %s -> %s (%d bytes)", fpath.name, gz_path.name, size)
            else:
                fpath.unlink()
                deleted += 1
                bytes_freed += size
                if verbose:
                    log.info("Deleted: %s (%d bytes)", fpath.name, size)
        except Exception as e:
            # Best-effort: record the failure and continue with the rest.
            msg = f"Error processing {fpath.name}: {e}"
            log.warning(msg)
            errors.append(msg)

    result = {
        "scanned": len(all_jsonl),
        "eligible": len(eligible),
        "deleted": deleted,
        "archived": archived,
        "bytes_freed": bytes_freed,
        "dry_run": dry_run,
        "retention_days": retention_days,
        "cutoff_date": cutoff.isoformat(),
        "audit_dir": str(dir_path),
        "errors": errors,
    }
    if verbose or not dry_run:
        log.info(
            "audit_cleanup: scanned=%d, eligible=%d, %sdeleted=%d, archived=%d, freed=%d bytes",
            result["scanned"], result["eligible"],
            "[DRY RUN] " if dry_run else "",
            deleted, archived, bytes_freed,
        )
    return result
# ─── CLI entrypoint ───────────────────────────────────────────────────────────
def _parse_args(argv=None) -> argparse.Namespace:
p = argparse.ArgumentParser(
description="Audit JSONL retention cleanup",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p.add_argument("--retention-days", type=int, default=30,
help="Delete/archive files older than this many days")
p.add_argument("--audit-dir", default="ops/audit",
help="Relative path to audit directory")
p.add_argument("--repo-root", default=None,
help="Repo root (default: REPO_ROOT env or cwd)")
p.add_argument("--dry-run", action="store_true",
help="Report only; do not delete or archive")
p.add_argument("--archive-gzip", action="store_true",
help="Compress to .jsonl.gz before deleting")
p.add_argument("--verbose", action="store_true",
help="Verbose output")
p.add_argument("--output-json", action="store_true",
help="Print JSON result to stdout")
return p.parse_args(argv)
def main(argv=None):
    """CLI entrypoint: configure logging, run the cleanup, print a summary.

    Exits with status 1 when any file failed to process; otherwise returns
    normally (exit status 0).
    """
    # All log output goes to stderr so stdout stays clean for --output-json.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s audit_cleanup %(message)s",
        stream=sys.stderr,
    )
    opts = _parse_args(argv)
    result = run_cleanup(
        retention_days=opts.retention_days,
        audit_dir=opts.audit_dir,
        dry_run=opts.dry_run,
        archive_gzip=opts.archive_gzip,
        repo_root=opts.repo_root,
        verbose=opts.verbose,
    )
    if opts.output_json:
        print(json.dumps(result, indent=2))
    else:
        label = "DRY RUN" if result["dry_run"] else "DONE"
        counters = " ".join(
            f"{key}={result[key]}"
            for key in ("scanned", "eligible", "deleted", "archived")
        )
        print(f"[{label}] {counters} freed={result['bytes_freed']}B")
    if result["errors"]:
        sys.exit(1)
# Script entrypoint: delegate to the CLI handler when run directly.
if __name__ == "__main__":
    main()