""" Tests for audit_cleanup.py and audit_compact.py scripts. Covers: 1. test_dry_run_does_not_delete — dry_run=True reports but changes nothing 2. test_retention_days_respected — files newer than cutoff are kept 3. test_delete_old_files — files older than retention_days are deleted 4. test_archive_gzip — old files compressed to .jsonl.gz, original removed 5. test_compact_dry_run — compact dry_run counts lines, no file written 6. test_compact_creates_gz — compact writes correct .jsonl.gz 7. test_invalid_retention_days — validation error for out-of-range 8. test_path_traversal_blocked — ../../ traversal raises ValueError 9. test_empty_audit_dir — empty dir → 0 scanned, no error 10. test_cleanup_already_gzipped — .gz files ignored by cleanup (not double-archived) """ from __future__ import annotations import datetime import gzip import json import sys import tempfile from pathlib import Path import pytest # ─── Path setup ────────────────────────────────────────────────────────────── SCRIPTS_DIR = Path(__file__).parent.parent / "ops" / "scripts" sys.path.insert(0, str(SCRIPTS_DIR)) from audit_cleanup import run_cleanup, find_eligible_files from audit_compact import run_compact # ─── Helpers ────────────────────────────────────────────────────────────────── def _make_jsonl(directory: Path, date: datetime.date, lines: int = 3) -> Path: """Create a tool_audit_YYYY-MM-DD.jsonl file with dummy events.""" fpath = directory / f"tool_audit_{date.isoformat()}.jsonl" with open(fpath, "w") as f: for i in range(lines): f.write(json.dumps({ "ts": date.isoformat() + "T12:00:00+00:00", "tool": "test_tool", "status": "pass", "duration_ms": 100 + i, }) + "\n") return fpath def _today() -> datetime.date: return datetime.date.today() def _days_ago(n: int) -> datetime.date: return _today() - datetime.timedelta(days=n) # ─── 1. 
# ─── 1. dry_run does not delete ──────────────────────────────────────────────

def test_dry_run_does_not_delete():
    """With dry_run=True the old file is reported as deleted but stays on disk."""
    with tempfile.TemporaryDirectory() as tmp:
        audit_dir = Path(tmp) / "audit"
        audit_dir.mkdir()
        # Create a file 35 days old
        old_file = _make_jsonl(audit_dir, _days_ago(35))

        result = run_cleanup(
            retention_days=30,
            audit_dir=str(audit_dir),
            dry_run=True,
            repo_root=tmp,
        )

        assert result["dry_run"] is True
        assert result["eligible"] == 1
        assert result["deleted"] == 1  # reported as "would delete"
        assert old_file.exists(), "dry_run must NOT delete files"


# ─── 2. retention_days respected ─────────────────────────────────────────────

def test_retention_days_respected():
    """Files newer than cutoff are not deleted."""
    with tempfile.TemporaryDirectory() as tmp:
        audit_dir = Path(tmp) / "audit"
        audit_dir.mkdir()
        # Keep the returned paths: recomputing the name from a second
        # _days_ago() call could disagree if the date rolls over mid-test.
        recent = _make_jsonl(audit_dir, _days_ago(10))  # new — should be kept
        old = _make_jsonl(audit_dir, _days_ago(40))  # old — eligible

        result = run_cleanup(
            retention_days=30,
            audit_dir=str(audit_dir),
            dry_run=False,
            repo_root=tmp,
        )

        assert result["scanned"] == 2
        assert result["eligible"] == 1
        assert result["deleted"] == 1
        assert not old.exists(), "Old file should be deleted"
        # New file intact
        assert recent.exists(), "New file should be kept"


# ─── 3. delete old files ─────────────────────────────────────────────────────

def test_delete_old_files():
    """Files older than retention_days are deleted; newer ones stay on disk."""
    with tempfile.TemporaryDirectory() as tmp:
        audit_dir = Path(tmp) / "audit"
        audit_dir.mkdir()
        retention_days = 30
        files = {
            age: _make_jsonl(audit_dir, _days_ago(age))
            for age in [35, 50, 60, 5, 2]
        }

        result = run_cleanup(
            retention_days=retention_days,
            audit_dir=str(audit_dir),
            dry_run=False,
            repo_root=tmp,
        )

        assert result["scanned"] == 5
        assert result["eligible"] == 3  # 35, 50, 60 days old
        assert result["deleted"] == 3
        assert result["bytes_freed"] > 0
        assert len(result["errors"]) == 0

        # Check the filesystem too, not only the reported counters.
        for age, path in files.items():
            if age > retention_days:
                assert not path.exists(), f"{age}-day-old file should be deleted"
            else:
                assert path.exists(), f"{age}-day-old file should be kept"
# ─── 4. archive_gzip ─────────────────────────────────────────────────────────

def test_archive_gzip():
    """archive_gzip=True replaces an old .jsonl with a readable .jsonl.gz."""
    with tempfile.TemporaryDirectory() as tmp:
        audit_dir = Path(tmp) / "audit"
        audit_dir.mkdir()
        source = _make_jsonl(audit_dir, _days_ago(45))

        cleanup_kwargs = dict(
            retention_days=30,
            audit_dir=str(audit_dir),
            dry_run=False,
            archive_gzip=True,
            repo_root=tmp,
        )
        outcome = run_cleanup(**cleanup_kwargs)

        assert outcome["archived"] == 1
        assert outcome["deleted"] == 0
        assert not source.exists(), "Original .jsonl should be removed"

        archive = source.with_suffix(".jsonl.gz")
        assert archive.exists(), ".gz file should be created"

        # Verify gzip content is readable
        with gzip.open(archive, "rt") as fh:
            non_blank = [row for row in fh if row.strip()]
        assert len(non_blank) == 3, "gz should contain original 3 lines"


# ─── 5. compact dry_run ──────────────────────────────────────────────────────

def test_compact_dry_run():
    """compact with dry_run=True counts source lines but writes no output."""
    files_created, lines_per_file = 5, 4
    with tempfile.TemporaryDirectory() as tmp:
        audit_dir = Path(tmp) / "audit"
        audit_dir.mkdir()
        for age in range(files_created):
            _make_jsonl(audit_dir, _days_ago(age), lines=lines_per_file)

        summary = run_compact(
            window_days=7,
            audit_dir=str(audit_dir),
            dry_run=True,
            repo_root=tmp,
        )

        assert summary["dry_run"] is True
        assert summary["source_files"] == 5
        assert summary["lines_written"] == 20  # 5 files × 4 lines
        assert summary["bytes_written"] == 0

        # No output file created
        compact_dir = audit_dir / "compact"
        assert not (compact_dir.exists() and list(compact_dir.glob("*.gz")))
# ─── 6. compact creates .jsonl.gz ────────────────────────────────────────────

def test_compact_creates_gz():
    """A real compact run writes a gzip file holding every source line."""
    with tempfile.TemporaryDirectory() as tmp:
        audit_dir = Path(tmp) / "audit"
        audit_dir.mkdir()
        for age in (0, 1, 2):
            _make_jsonl(audit_dir, _days_ago(age), lines=5)

        summary = run_compact(
            window_days=7,
            audit_dir=str(audit_dir),
            dry_run=False,
            repo_root=tmp,
        )

        assert summary["source_files"] == 3
        assert summary["lines_written"] == 15
        assert summary["bytes_written"] > 0

        produced = Path(summary["output_file"])
        assert produced.exists()

        with gzip.open(produced, "rt") as fh:
            non_blank = [row for row in fh if row.strip()]
        assert len(non_blank) == 15


# ─── 7. invalid retention_days ───────────────────────────────────────────────

def test_invalid_retention_days():
    """retention_days of 0 or 400 is rejected with a ValueError."""
    for bad_value in (0, 400):
        with pytest.raises(ValueError, match="retention_days"):
            run_cleanup(retention_days=bad_value, audit_dir="ops/audit", dry_run=True)


# ─── 8. path traversal blocked ───────────────────────────────────────────────

def test_path_traversal_blocked():
    """An audit_dir that climbs out of repo_root raises ValueError."""
    with tempfile.TemporaryDirectory() as tmp, \
            pytest.raises(ValueError, match="outside repo root"):
        run_cleanup(
            retention_days=30,
            audit_dir="../../etc/passwd",
            dry_run=True,
            repo_root=tmp,
        )


# ─── 9. empty audit dir ──────────────────────────────────────────────────────

def test_empty_audit_dir():
    """An empty audit directory yields all-zero counters."""
    with tempfile.TemporaryDirectory() as tmp:
        audit_dir = Path(tmp) / "audit"
        audit_dir.mkdir()

        summary = run_cleanup(
            retention_days=30,
            audit_dir=str(audit_dir),
            dry_run=True,
            repo_root=tmp,
        )

        for counter in ("scanned", "eligible", "bytes_freed"):
            assert summary[counter] == 0
# ─── 10. .gz files not double-processed ──────────────────────────────────────

def test_gz_files_not_processed():
    """Already-compressed .jsonl.gz files should NOT be touched by cleanup."""
    with tempfile.TemporaryDirectory() as tmp:
        audit_dir = Path(tmp) / "audit"
        audit_dir.mkdir()

        # Create a .gz file (simulating already-archived)
        archived_name = f"tool_audit_{_days_ago(45).isoformat()}.jsonl.gz"
        gz_path = audit_dir / archived_name
        with gzip.open(gz_path, "wt") as fh:
            fh.write('{"ts":"2026-01-01","tool":"x"}\n')

        summary = run_cleanup(
            retention_days=30,
            audit_dir=str(audit_dir),
            dry_run=False,
            repo_root=tmp,
        )

        # .gz files not matched by glob("*.jsonl")
        assert summary["scanned"] == 0
        assert gz_path.exists(), ".gz should not be touched"


# ─── 11. find_eligible_files cutoff logic ────────────────────────────────────

def test_find_eligible_files():
    """Only files dated strictly before the cutoff are returned as eligible."""
    with tempfile.TemporaryDirectory() as tmp:
        audit_dir = Path(tmp)
        for age in (60, 31, 30, 29, 1):
            _make_jsonl(audit_dir, _days_ago(age))

        cutoff = _days_ago(30)
        eligible = find_eligible_files(audit_dir, cutoff)
        eligible_names = [path.name for path in eligible]

        # 60 and 31 days ago → eligible (strictly before cutoff)
        assert len(eligible) == 2
        for age in (60, 31):
            assert f"tool_audit_{_days_ago(age).isoformat()}.jsonl" in eligible_names
        # 30 and newer → not eligible
        assert f"tool_audit_{_days_ago(30).isoformat()}.jsonl" not in eligible_names