Files
microdao-daarion/tests/test_audit_cleanup.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

300 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Tests for audit_cleanup.py and audit_compact.py scripts.
Covers:
1. test_dry_run_does_not_delete — dry_run=True reports but changes nothing
2. test_retention_days_respected — files newer than cutoff are kept
3. test_delete_old_files — files older than retention_days are deleted
4. test_archive_gzip — old files compressed to .jsonl.gz, original removed
5. test_compact_dry_run — compact dry_run counts lines, no file written
6. test_compact_creates_gz — compact writes correct .jsonl.gz
7. test_invalid_retention_days — validation error for out-of-range
8. test_path_traversal_blocked — ../../ traversal raises ValueError
9. test_empty_audit_dir — empty dir → 0 scanned, no error
10. test_gz_files_not_processed — .gz files ignored by cleanup (not double-archived)
11. test_find_eligible_files — cutoff comparison is strict (only strictly-older files eligible)
"""
from __future__ import annotations
import datetime
import gzip
import json
import sys
import tempfile
from pathlib import Path
import pytest
# ─── Path setup ──────────────────────────────────────────────────────────────
SCRIPTS_DIR = Path(__file__).parent.parent / "ops" / "scripts"
sys.path.insert(0, str(SCRIPTS_DIR))
from audit_cleanup import run_cleanup, find_eligible_files
from audit_compact import run_compact
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _make_jsonl(directory: Path, date: datetime.date, lines: int = 3) -> Path:
"""Create a tool_audit_YYYY-MM-DD.jsonl file with dummy events."""
fpath = directory / f"tool_audit_{date.isoformat()}.jsonl"
with open(fpath, "w") as f:
for i in range(lines):
f.write(json.dumps({
"ts": date.isoformat() + "T12:00:00+00:00",
"tool": "test_tool",
"status": "pass",
"duration_ms": 100 + i,
}) + "\n")
return fpath
def _today() -> datetime.date:
return datetime.date.today()
def _days_ago(n: int) -> datetime.date:
return _today() - datetime.timedelta(days=n)
# ─── 1. dry_run does not delete ────────────────────────────────────────────────
def test_dry_run_does_not_delete():
    """dry_run=True reports eligible files but leaves everything on disk."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp) / "audit"
        workdir.mkdir()
        # One file well past the 30-day retention window.
        stale = _make_jsonl(workdir, _days_ago(35))
        summary = run_cleanup(
            retention_days=30,
            audit_dir=str(workdir),
            dry_run=True,
            repo_root=tmp,
        )
        assert summary["dry_run"] is True
        assert summary["eligible"] == 1
        # In dry-run mode "deleted" counts what *would* be removed.
        assert summary["deleted"] == 1
        assert stale.exists(), "dry_run must NOT delete files"
# ─── 2. retention_days respected ─────────────────────────────────────────────
def test_retention_days_respected():
    """Files newer than cutoff are not deleted."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp) / "audit"
        workdir.mkdir()
        fresh_date = _days_ago(10)
        _make_jsonl(workdir, fresh_date)             # inside the window — kept
        stale = _make_jsonl(workdir, _days_ago(40))  # past the window — eligible
        summary = run_cleanup(
            retention_days=30,
            audit_dir=str(workdir),
            dry_run=False,
            repo_root=tmp,
        )
        assert summary["scanned"] == 2
        assert summary["eligible"] == 1
        assert summary["deleted"] == 1
        assert not stale.exists(), "Old file should be deleted"
        # The fresh file must survive untouched.
        assert (workdir / f"tool_audit_{fresh_date.isoformat()}.jsonl").exists()
# ─── 3. delete old files ───────────────────────────────────────────────────────
def test_delete_old_files():
    """Every file older than retention_days is deleted; newer ones survive."""
    with tempfile.TemporaryDirectory() as tmp:
        audit_dir = Path(tmp) / "audit"
        audit_dir.mkdir()
        # Three stale files (35/50/60 days) and two fresh ones (5/2 days).
        # The returned paths are not needed, so no local is kept (the original
        # bound an unused `files` list).
        for age in (35, 50, 60, 5, 2):
            _make_jsonl(audit_dir, _days_ago(age))
        result = run_cleanup(
            retention_days=30,
            audit_dir=str(audit_dir),
            dry_run=False,
            repo_root=tmp,
        )
        assert result["scanned"] == 5
        assert result["eligible"] == 3  # 35, 50, 60 days old
        assert result["deleted"] == 3
        assert result["bytes_freed"] > 0
        assert not result["errors"]
# ─── 4. archive_gzip ──────────────────────────────────────────────────────────
def test_archive_gzip():
    """archive_gzip=True compresses stale files to .jsonl.gz and removes originals."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp) / "audit"
        workdir.mkdir()
        stale = _make_jsonl(workdir, _days_ago(45))
        summary = run_cleanup(
            retention_days=30,
            audit_dir=str(workdir),
            dry_run=False,
            archive_gzip=True,
            repo_root=tmp,
        )
        assert summary["archived"] == 1
        assert summary["deleted"] == 0
        assert not stale.exists(), "Original .jsonl should be removed"
        archived = stale.with_suffix(".jsonl.gz")
        assert archived.exists(), ".gz file should be created"
        # The compressed payload must round-trip the original events.
        with gzip.open(archived, "rt") as fh:
            payload = [row for row in fh if row.strip()]
        assert len(payload) == 3, "gz should contain original 3 lines"
# ─── 5. compact dry_run ────────────────────────────────────────────────────────
def test_compact_dry_run():
    """Compact dry_run counts lines without writing any output file."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp) / "audit"
        workdir.mkdir()
        for offset in range(5):
            _make_jsonl(workdir, _days_ago(offset), lines=4)
        summary = run_compact(
            window_days=7,
            audit_dir=str(workdir),
            dry_run=True,
            repo_root=tmp,
        )
        assert summary["dry_run"] is True
        assert summary["source_files"] == 5
        assert summary["lines_written"] == 20  # 5 files × 4 lines
        assert summary["bytes_written"] == 0
        # Nothing may land in audit/compact during a dry run.
        out_dir = workdir / "compact"
        assert not out_dir.exists() or not list(out_dir.glob("*.gz"))
# ─── 6. compact creates .jsonl.gz ─────────────────────────────────────────────
def test_compact_creates_gz():
    """Compact merges recent .jsonl files into one readable .jsonl.gz."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp) / "audit"
        workdir.mkdir()
        for offset in range(3):
            _make_jsonl(workdir, _days_ago(offset), lines=5)
        summary = run_compact(
            window_days=7,
            audit_dir=str(workdir),
            dry_run=False,
            repo_root=tmp,
        )
        assert summary["source_files"] == 3
        assert summary["lines_written"] == 15
        assert summary["bytes_written"] > 0
        merged = Path(summary["output_file"])
        assert merged.exists()
        # Verify the archive really contains every merged line.
        with gzip.open(merged, "rt") as fh:
            rows = [row for row in fh if row.strip()]
        assert len(rows) == 15
# ─── 7. invalid retention_days ────────────────────────────────────────────────
def test_invalid_retention_days():
    """retention_days outside the accepted range raises ValueError."""
    for bad_value in (0, 400):  # below minimum / above maximum
        with pytest.raises(ValueError, match="retention_days"):
            run_cleanup(retention_days=bad_value, audit_dir="ops/audit", dry_run=True)
# ─── 8. path traversal blocked ────────────────────────────────────────────────
def test_path_traversal_blocked():
    """An audit_dir escaping repo_root via ../../ is rejected with ValueError."""
    with tempfile.TemporaryDirectory() as tmp:
        with pytest.raises(ValueError, match="outside repo root"):
            run_cleanup(
                retention_days=30,
                audit_dir="../../etc/passwd",
                dry_run=True,
                repo_root=tmp,
            )
# ─── 9. empty audit dir ───────────────────────────────────────────────────────
def test_empty_audit_dir():
    """An empty audit dir yields zero counts and no error."""
    with tempfile.TemporaryDirectory() as tmp:
        empty_dir = Path(tmp) / "audit"
        empty_dir.mkdir()
        summary = run_cleanup(
            retention_days=30,
            audit_dir=str(empty_dir),
            dry_run=True,
            repo_root=tmp,
        )
        assert summary["scanned"] == 0
        assert summary["eligible"] == 0
        assert summary["bytes_freed"] == 0
# ─── 10. .gz files not double-processed ──────────────────────────────────────
def test_gz_files_not_processed():
    """Already-compressed .jsonl.gz files should NOT be touched by cleanup."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp) / "audit"
        workdir.mkdir()
        # Simulate a previously archived file, well past the cutoff.
        archived = workdir / f"tool_audit_{_days_ago(45).isoformat()}.jsonl.gz"
        with gzip.open(archived, "wt") as fh:
            fh.write('{"ts":"2026-01-01","tool":"x"}\n')
        summary = run_cleanup(
            retention_days=30,
            audit_dir=str(workdir),
            dry_run=False,
            repo_root=tmp,
        )
        # cleanup globs "*.jsonl", so the .gz is never even scanned.
        assert summary["scanned"] == 0
        assert archived.exists(), ".gz should not be touched"
# ─── 11. find_eligible_files cutoff logic ─────────────────────────────────────
def test_find_eligible_files():
    """Only files strictly older than the cutoff date are returned."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        for age in (60, 31, 30, 29, 1):
            _make_jsonl(root, _days_ago(age))
        cutoff = _today() - datetime.timedelta(days=30)
        hits = find_eligible_files(root, cutoff)
        names = {f.name for f in hits}
        # 60 and 31 days ago fall strictly before the cutoff.
        assert len(hits) == 2
        assert f"tool_audit_{_days_ago(60).isoformat()}.jsonl" in names
        assert f"tool_audit_{_days_ago(31).isoformat()}.jsonl" in names
        # Exactly-at-cutoff (30 days) and anything newer stay put.
        assert f"tool_audit_{_days_ago(30).isoformat()}.jsonl" not in names