"""
tests/test_privacy_digest.py

Test suite for the data_governance_tool.digest_audit action and for the
backend=auto routing of the audit scanners.
"""
from __future__ import annotations

import datetime
import json
import sys
import tempfile
from pathlib import Path
from typing import Dict
from unittest.mock import MagicMock, patch

# The router sources live outside the test package; put them on sys.path
# before importing anything from there.
ROUTER = Path(__file__).resolve().parent.parent / "services" / "router"
if str(ROUTER) not in sys.path:
    sys.path.insert(0, str(ROUTER))

from audit_store import MemoryAuditStore, set_audit_store  # noqa: E402


# ---------------------------------------------------------------------------
# Event builders
# ---------------------------------------------------------------------------

def _ts(delta_hours: int = 0) -> str:
    """Return an ISO-8601 UTC timestamp ``delta_hours`` hours in the past."""
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    offset = datetime.timedelta(hours=delta_hours)
    return (now_utc - offset).isoformat()


def _audit_event(
    tool: str = "kb_tool",
    agent_id: str = "sofiia",
    status: str = "succeeded",
    meta: dict | None = None,
) -> Dict:
    """Build a baseline audit event dict stamped with the current time.

    A ``meta`` key is added only when a non-empty ``meta`` mapping is given.
    """
    event = {
        "ts": _ts(),
        "req_id": "r1",
        "workspace_id": "ws1",
        "user_id": "u1",
        "agent_id": agent_id,
        "tool": tool,
        "action": "any",
        "status": status,
        "duration_ms": 50,
        "in_size": 10,
        "out_size": 50,
        "input_hash": "abc",
    }
    if meta:
        event["meta"] = meta
    return event


def _pii_audit_event() -> Dict:
    """Build an audit event whose ``meta`` carries an email address."""
    pii_meta = {"user_label": "john.doe@example.com", "note": "test"}
    return _audit_event(meta=pii_meta)


def _large_output_event() -> Dict:
    """Build an audit event with an unusually large ``out_size`` (200_000)."""
    # The override keeps the key's original position and only swaps the value.
    return {**_audit_event(), "out_size": 200_000}


# ---------------------------------------------------------------------------
# digest_audit
# ---------------------------------------------------------------------------

class TestPrivacyDigest:
    """Checks on ``digest_audit`` run against an in-memory audit store."""

    def setup_method(self):
        """Install a fresh ``MemoryAuditStore`` as the global audit store."""
        self._mem = MemoryAuditStore()
        set_audit_store(self._mem)

    def teardown_method(self):
        """Clear the global audit store so tests stay isolated."""
        set_audit_store(None)

    def test_digest_audit_returns_expected_keys(self):
        from data_governance import digest_audit

        digest = digest_audit(backend="auto", time_window_hours=24)
        expected_keys = (
            "stats",
            "by_category",
            "top_findings",
            "recommendations",
            "markdown",
            "source_backend",
        )
        for key in expected_keys:
            assert key in digest

    def test_digest_audit_empty_store_no_findings(self):
        from data_governance import digest_audit

        digest = digest_audit(backend="auto", time_window_hours=24)
        assert digest["stats"]["total"] == 0
        assert digest["pass"] is True

    def test_digest_audit_detects_pii_in_meta(self):
        from data_governance import digest_audit

        self._mem.write(_pii_audit_event())
        digest = digest_audit(backend="auto", time_window_hours=24)
        counters = digest["stats"]
        flagged = counters["errors"] + counters["warnings"]
        # Whether the email in meta is flagged depends on the scanner's
        # patterns, so this only verifies that the call completes and the
        # result has the expected shape.
        assert isinstance(flagged, int)
        assert isinstance(digest["markdown"], str)

    def test_digest_audit_detects_large_output(self):
        from data_governance import digest_audit

        self._mem.write(_large_output_event())
        digest = digest_audit(backend="auto", time_window_hours=24)
        # A large-output finding may be reported as info or warning; only
        # the result shape is checked here.
        assert isinstance(digest["stats"]["total"], int)
        assert isinstance(digest["markdown"], str)

    def test_digest_audit_markdown_not_too_long(self):
        from data_governance import digest_audit

        # Seed the store with a batch of ordinary events.
        for _ in range(30):
            self._mem.write(_audit_event())
        digest = digest_audit(
            backend="auto",
            time_window_hours=24,
            max_markdown_chars=3800,
        )
        rendered = digest["markdown"]
        assert len(rendered) <= 3850

    def test_digest_audit_markdown_contains_period(self):
        from data_governance import digest_audit

        digest = digest_audit(backend="auto", time_window_hours=24)
        assert "Last 24h" in digest["markdown"]

    def test_digest_audit_source_backend_reported(self):
        from data_governance import digest_audit

        known_backends = ("memory", "jsonl", "postgres", "jsonl_fallback", "unknown")
        digest = digest_audit(backend="auto", time_window_hours=24)
        assert digest["source_backend"] in known_backends

    def test_digest_audit_via_tool_dispatch(self):
        from data_governance import scan_data_governance_dict

        dispatch_params = {
            "backend": "auto",
            "time_window_hours": 24,
            "max_findings": 10,
        }
        outcome = scan_data_governance_dict("digest_audit", params=dispatch_params)
        assert "stats" in outcome

    def test_digest_audit_unknown_action_returns_error(self):
        from data_governance import scan_data_governance_dict

        outcome = scan_data_governance_dict("nonexistent_action", params={})
        assert "error" in outcome
        assert "digest_audit" in outcome["error"]

    def test_digest_audit_by_category_is_dict(self):
        from data_governance import digest_audit

        self._mem.write(_pii_audit_event())
        digest = digest_audit(backend="auto", time_window_hours=24)
        assert isinstance(digest["by_category"], dict)

    def test_digest_audit_recommendations_is_list(self):
        from data_governance import digest_audit

        digest = digest_audit(backend="auto", time_window_hours=24)
        assert isinstance(digest["recommendations"], list)


# ---------------------------------------------------------------------------
# backend=auto routing for scan_audit
# ---------------------------------------------------------------------------

class TestDataGovBackendAuto:
    """Checks on how ``scan_audit`` and ``_resolve_audit_store`` pick a backend."""

    def setup_method(self):
        """Install a fresh ``MemoryAuditStore`` as the global audit store."""
        self._mem = MemoryAuditStore()
        set_audit_store(self._mem)

    def teardown_method(self):
        """Clear the global audit store so tests stay isolated."""
        set_audit_store(None)

    def test_scan_audit_backend_auto_uses_global_store(self):
        from data_governance import scan_audit

        for _ in range(5):
            self._mem.write(_audit_event())
        scan = scan_audit(backend="auto", time_window_hours=24, max_events=100)
        # All five events written to the in-memory store must be scanned.
        assert scan["stats"]["events_scanned"] == 5

    def test_scan_audit_backend_jsonl_with_tempdir(self, tmp_path):
        """JSONL backend reads from actual files."""
        import os

        from data_governance import scan_audit

        # Drop a single-event JSONL audit file into the temp directory.
        # NOTE(review): the file name uses the local date while _ts() is UTC;
        # confirm which one the JSONL store expects.
        day_stamp = datetime.date.today().isoformat()
        audit_file = tmp_path / f"tool_audit_{day_stamp}.jsonl"
        audit_file.write_text(f"{json.dumps(_audit_event())}\n", encoding="utf-8")

        env_override = {"AUDIT_JSONL_DIR": str(tmp_path)}
        with patch.dict(os.environ, env_override):
            scan = scan_audit(backend="jsonl", time_window_hours=24, max_events=100)
        # Only requires that the scan completes and reports an integer count.
        assert isinstance(scan["stats"]["events_scanned"], int)

    def test_resolve_audit_store_auto(self):
        from data_governance import _resolve_audit_store

        resolved = _resolve_audit_store("auto")
        # "auto" must hand back the very store installed in setup_method.
        assert resolved is self._mem

    def test_resolve_audit_store_memory(self):
        from data_governance import _resolve_audit_store

        resolved = _resolve_audit_store("memory")
        # Compare by class name to avoid module-identity issues across
        # sys.path variants.
        assert type(resolved).__name__ == "MemoryAuditStore"