""" Tests for Data Governance & Privacy Tool Covers: 1. test_scan_repo_detects_pii_logging — logger + email → warning 2. test_scan_repo_detects_secret — API_KEY=sk-... → error, masked 3. test_scan_repo_detects_credit_card — credit-card pattern → error 4. test_scan_repo_no_findings_clean — clean code → 0 findings 5. test_scan_audit_detects_pii_in_meta — email in audit meta → warning 6. test_scan_audit_detects_large_output — out_size anomaly → warning 7. test_retention_check_missing_cleanup — no cleanup task → warning 8. test_retention_check_with_cleanup — runbook mentions cleanup → info 9. test_scan_repo_raw_payload_audit — raw payload near logger → error 10. test_release_check_privacy_watch_integration — gate always pass 11. test_rbac_deny — wrong agent → denied 12. test_rbac_allow — sofiia → allowed 13. test_policy_action — returns policy structure 14. test_path_traversal_protection — ../../etc/passwd blocked 15. test_scan_repo_excludes_lock_files — *.lock not scanned """ from __future__ import annotations import asyncio import json import os import sys import tempfile from pathlib import Path from typing import Any, Dict from unittest.mock import AsyncMock, MagicMock import pytest # ─── Path setup ────────────────────────────────────────────────────────────── ROUTER_DIR = Path(__file__).parent.parent / "services" / "router" REPO_ROOT = Path(__file__).parent.parent sys.path.insert(0, str(ROUTER_DIR)) sys.path.insert(0, str(REPO_ROOT)) os.environ.setdefault("REPO_ROOT", str(REPO_ROOT)) os.environ["AUDIT_BACKEND"] = "memory" from data_governance import ( scan_repo, scan_audit, retention_check, get_policy, scan_data_governance_dict, reload_policy, _mask_evidence, ) from audit_store import MemoryAuditStore, set_audit_store # ─── Helpers ────────────────────────────────────────────────────────────────── def _write(tmp: Path, name: str, content: str) -> Path: p = tmp / name p.write_text(content, encoding="utf-8") return p def _repo_scan(tmp: Path, **kwargs) -> Dict: 
return scan_repo( repo_root=str(tmp), paths_include=[""], # scan root paths_exclude=["**/__pycache__/**"], **kwargs, ) # ─── 1. PII in logging ──────────────────────────────────────────────────────── def test_scan_repo_detects_pii_logging(): """logger call with literal email in log message → DG-PII-001 warning.""" with tempfile.TemporaryDirectory() as tmp: _write(Path(tmp), "service.py", """\ import logging logger = logging.getLogger(__name__) def process_user(data): # BAD: logging real email address logger.info("Processing user john.doe@example.com request: %s", data) return True """) result = _repo_scan(Path(tmp), focus=["pii", "logging"]) findings = result["findings"] assert result["pass"] is True # Should detect email PII pattern (DG-PII-001) in the source file pii_ids = [f["id"] for f in findings] assert any(fid.startswith("DG-PII") or fid.startswith("DG-LOG") for fid in pii_ids), \ f"Expected PII/logging finding, got: {pii_ids}" def test_scan_repo_detects_logging_forbidden_field(): """logger call with 'token' field → DG-LOG-001.""" with tempfile.TemporaryDirectory() as tmp: _write(Path(tmp), "auth.py", """\ import logging logger = logging.getLogger(__name__) def verify(token, user_id): logger.debug(f"Verifying token={token} for user={user_id}") return True """) result = _repo_scan(Path(tmp), focus=["logging"]) log_findings = [f for f in result["findings"] if f["id"] == "DG-LOG-001"] assert len(log_findings) >= 1 assert result["stats"]["warnings"] + result["stats"]["errors"] >= 1 # ─── 2. 
# ─── 2. Secret detection ─────────────────────────────────────────────────────


def test_scan_repo_detects_secret():
    """Hardcoded API key → DG-SEC-000/DG-SEC-001, evidence masked."""
    with tempfile.TemporaryDirectory() as tmpdir:
        _write(Path(tmpdir), "config.py", """\
# Configuration
API_KEY = "sk-abc123xyz9012345678901234567890"
DATABASE_URL = "postgresql://user:mysecretpassword@localhost/db"
""")
        result = _repo_scan(Path(tmpdir), focus=["secrets"])
        secret_hits = [f for f in result["findings"] if f["category"] == "secrets"]
        assert len(secret_hits) >= 1
        # Evidence must be masked — the raw key value must never appear in output
        for finding in secret_hits:
            detail = finding["evidence"].get("details", "")
            assert "sk-abc123xyz9012345678901234567890" not in detail


def test_scan_repo_detects_private_key():
    """Private key block → DG-SEC-001 error."""
    with tempfile.TemporaryDirectory() as tmpdir:
        _write(Path(tmpdir), "keys.py", """\
PRIVATE_KEY = '''
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA...base64data...
-----END RSA PRIVATE KEY-----
'''
""")
        result = _repo_scan(Path(tmpdir), focus=["secrets"])
        secret_hits = [f for f in result["findings"] if "DG-SEC" in f["id"]]
        assert len(secret_hits) >= 1
        # At least one error for private key
        assert any(f["severity"] == "error" for f in secret_hits)


# ─── 3. Credit card pattern ──────────────────────────────────────────────────


def test_scan_repo_detects_credit_card():
    """Credit card number in code → DG-PII-003 error."""
    with tempfile.TemporaryDirectory() as tmpdir:
        _write(Path(tmpdir), "payment.py", """\
# Test data (NEVER use real card numbers!)
TEST_CARD = "4111111111111111"  # Visa test number
""")
        result = _repo_scan(Path(tmpdir), focus=["pii"])
        card_hits = [f for f in result["findings"] if f["id"] == "DG-PII-003"]
        assert len(card_hits) >= 1
        assert card_hits[0]["severity"] == "error"
# ─── 4. Clean code — no findings ─────────────────────────────────────────────


def test_scan_repo_no_findings_clean():
    """Clean code with proper practices → no findings (or minimal)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        _write(Path(tmpdir), "service.py", """\
import logging
from governance import redact

logger = logging.getLogger(__name__)

def process_request(req_id: str, workspace_id: str):
    # Log only safe identifiers
    logger.info("Processing request=%s ws=%s", req_id[:8], workspace_id[:8])
    return {"status": "ok"}
""")
        result = _repo_scan(Path(tmpdir), focus=["pii", "logging", "secrets"])
        # Should have 0 or very few findings (no credit cards, no raw emails, no raw secrets)
        errors = [f for f in result["findings"] if f["severity"] == "error"]
        assert len(errors) == 0


# ─── 5. scan_audit PII in meta ───────────────────────────────────────────────


def test_scan_audit_detects_pii_in_meta():
    """Email in audit event user_id field → DG-AUD-101 warning."""
    mem_store = MemoryAuditStore()
    set_audit_store(mem_store)
    # Inject an audit event whose user_id looks like an email address
    mem_store.write({
        "ts": "2026-02-23T12:00:00+00:00",
        "req_id": "req-001",
        "workspace_id": "ws1",
        "user_id": "test.user@example.com",  # PII in user_id
        "agent_id": "sofiia",
        "tool": "observability_tool",
        "action": "logs_query",
        "status": "pass",
        "duration_ms": 100,
        "in_size": 50,
        "out_size": 200,
        "input_hash": "sha256:abc",
    })
    result = scan_audit(time_window_hours=24)
    pii_audit = [f for f in result["findings"] if f["id"] == "DG-AUD-101"]
    assert len(pii_audit) >= 1
    assert pii_audit[0]["severity"] in ("warning", "error")
    # Evidence must be masked; the real email may be partially masked
    detail = pii_audit[0]["evidence"].get("details", "")
    assert len(detail) <= 250  # truncated to safe length


def test_scan_audit_detects_large_output():
    """Very large out_size → DG-AUD-102 warning."""
    mem_store = MemoryAuditStore()
    set_audit_store(mem_store)
    mem_store.write({
        "ts": "2026-02-23T12:00:00+00:00",
        "req_id": "req-002",
        "workspace_id": "ws1",
        "user_id": "user_x",
        "agent_id": "sofiia",
        "tool": "observability_tool",
        "action": "logs_query",
        "status": "pass",
        "duration_ms": 500,
        "in_size": 100,
        "out_size": 200000,  # 200KB — above 65536 threshold
        "input_hash": "sha256:def",
    })
    result = scan_audit(time_window_hours=24)
    large = [f for f in result["findings"] if f["id"] == "DG-AUD-102"]
    assert len(large) >= 1
    assert "200000" in large[0]["evidence"].get("details", "")


# ─── 6. scan_audit — no store → graceful ─────────────────────────────────────


def test_scan_audit_no_findings_for_clean_events():
    """Normal audit events without PII → no findings."""
    mem_store = MemoryAuditStore()
    set_audit_store(mem_store)
    for i in range(5):
        mem_store.write({
            "ts": "2026-02-23T12:00:00+00:00",
            "req_id": f"req-{i:03d}",
            "workspace_id": "ws_opaque_hash",
            "user_id": f"usr_{i:04d}",
            "agent_id": "sofiia",
            "tool": "cost_analyzer_tool",
            "action": "top",
            "status": "pass",
            "duration_ms": 50,
            "in_size": 40,
            "out_size": 300,
            "input_hash": "sha256:aaa",
        })
    result = scan_audit(time_window_hours=24)
    # Neither DG-AUD-101 (no PII) nor DG-AUD-102 (small outputs) should fire
    found_ids = {f["id"] for f in result["findings"]}
    assert "DG-AUD-101" not in found_ids
    assert "DG-AUD-102" not in found_ids


# ─── 7. retention_check — missing cleanup ────────────────────────────────────


def test_retention_check_missing_cleanup():
    """Empty repo → no cleanup mechanisms → DG-RET-201 warning."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create empty ops/ and task_registry.yml without audit_cleanup
        ops = Path(tmpdir) / "ops"
        ops.mkdir()
        (ops / "task_registry.yml").write_text("tasks: []\n")
        result = retention_check(
            repo_root=str(tmpdir),
            check_audit_cleanup_task=True,
            check_jsonl_rotation=True,
            check_memory_retention_docs=False,
            check_logs_retention_docs=False,
        )
        assert result["pass"] is True
        warn_ids = [f["id"] for f in result["findings"]]
        assert "DG-RET-201" in warn_ids
# ─── 8. retention_check — with cleanup documented ────────────────────────────


def test_retention_check_with_cleanup():
    """Runbook mentioning audit cleanup → DG-RET-202 info."""
    with tempfile.TemporaryDirectory() as tmpdir:
        ops = Path(tmpdir) / "ops"
        ops.mkdir()
        # Runbook that mentions audit cleanup and rotation
        (ops / "runbook-audit.md").write_text(
            "# Audit Runbook\n\nRun audit_cleanup task to rotate jsonl files older than 30 days.\n"
        )
        result = retention_check(
            repo_root=str(tmpdir),
            check_audit_cleanup_task=True,
            check_jsonl_rotation=False,
            check_memory_retention_docs=False,
            check_logs_retention_docs=False,
        )
        assert result["pass"] is True
        info_ids = [f["id"] for f in result["findings"]]
        assert "DG-RET-202" in info_ids


# ─── 9. Raw payload near audit write ─────────────────────────────────────────


def test_scan_repo_raw_payload_audit_write():
    """payload field near logger.info call → DG-AUD-001 error."""
    with tempfile.TemporaryDirectory() as tmpdir:
        _write(Path(tmpdir), "audit_writer.py", """\
import logging
logger = logging.getLogger(__name__)

def emit_event(req_id, payload, tool):
    # Storing full payload in audit log
    record = {"req_id": req_id, "payload": payload, "tool": tool}
    logger.info("AUDIT_EVENT %s", record)
""")
        result = _repo_scan(Path(tmpdir), focus=["logging"])
        aud_hits = [f for f in result["findings"] if f["id"] == "DG-AUD-001"]
        assert len(aud_hits) >= 1
        assert aud_hits[0]["severity"] == "error"
# ─── 10. Release check privacy_watch integration ─────────────────────────────


def test_release_check_privacy_watch_integration():
    """privacy_watch gate always pass=True; adds recommendations."""

    async def _run():
        from release_check_runner import run_release_check

        class FakeResult:
            """Minimal stand-in for a ToolManager execution result."""

            def __init__(self, data, success=True, error=None):
                self.success = success
                self.result = data
                self.error = error

        async def fake_exec(tool_name, args, agent_id=None):
            # Canned responses per tool, so every release gate can run offline.
            if tool_name == "pr_reviewer_tool":
                return FakeResult({"approved": True, "verdict": "LGTM", "issues": []})
            if tool_name == "config_linter_tool":
                return FakeResult({"pass": True, "errors": [], "warnings": []})
            if tool_name == "dependency_scanner_tool":
                return FakeResult({"pass": True, "summary": "No vulns", "vulnerabilities": []})
            if tool_name == "contract_tool":
                return FakeResult({"pass": True, "breaking_changes": [], "warnings": []})
            if tool_name == "threatmodel_tool":
                return FakeResult({"risk_level": "low", "threats": []})
            if tool_name == "data_governance_tool":
                # Simulate a scan that produces warnings (but still passes)
                if args.get("action", "") == "scan_repo":
                    return FakeResult({
                        "pass": True,
                        "summary": "2 warnings",
                        "stats": {"errors": 0, "warnings": 2, "infos": 0},
                        "findings": [
                            {
                                "id": "DG-LOG-001",
                                "severity": "warning",
                                "title": "Potential sensitive field logged",
                                "category": "logging",
                                "evidence": {},
                                "recommended_fix": "Use redact()",
                            },
                        ],
                        "recommendations": ["Review logger calls for sensitive fields."],
                    })
                return FakeResult({"pass": True, "findings": [], "recommendations": [], "stats": {}})
            if tool_name == "cost_analyzer_tool":
                return FakeResult({"anomalies": [], "anomaly_count": 0})
            return FakeResult({})

        tm = MagicMock()
        tm.execute_tool = AsyncMock(side_effect=fake_exec)
        inputs = {
            "diff_text": "small fix",
            "run_smoke": False,
            "run_drift": False,
            "run_deps": True,
            "run_privacy_watch": True,
            "run_cost_watch": True,
            "fail_fast": False,
        }
        return await run_release_check(tm, inputs, agent_id="sofiia")

    report = asyncio.run(_run())
    gate_names = [g["name"] for g in report["gates"]]
    assert "privacy_watch" in gate_names
    pw_gate = next(g for g in report["gates"] if g["name"] == "privacy_watch")
    assert pw_gate["status"] == "pass"
    assert pw_gate.get("warnings", 0) >= 0  # warnings don't block release
    assert report["pass"] is True
    # Recommendations from privacy_watch should be in the final report
    all_recs = report.get("recommendations", [])
    assert any(
        "logger" in r.lower() or "redact" in r.lower() or "sensitiv" in r.lower()
        for r in all_recs
    ), f"Expected privacy rec in {all_recs}"


# ─── 11. privacy_watch skipped on error ──────────────────────────────────────


def test_privacy_watch_skipped_on_tool_error():
    """Unhandled exception in data_governance_tool → gate still pass=True (skipped)."""

    async def _run():
        from release_check_runner import _run_privacy_watch

        tm = MagicMock()
        # Raise a real exception (not just FailResult) so outer try/except catches it
        tm.execute_tool = AsyncMock(side_effect=RuntimeError("connection refused"))
        return await _run_privacy_watch(tm, "sofiia")

    ok, gate = asyncio.run(_run())
    assert ok is True
    assert gate["status"] == "pass"
    # skipped=True is set when the outer except catches the error
    assert gate.get("skipped") is True
# ─── 12–13. RBAC ─────────────────────────────────────────────────────────────


def test_rbac_deny():
    """Agent without tools.data_gov.read → denied."""
    from tool_governance import ToolGovernance

    gov = ToolGovernance(enable_rbac=True, enable_limits=False, enable_allowlist=False)
    decision = gov.pre_call(
        tool="data_governance_tool",
        action="scan_repo",
        agent_id="alateya",  # agent_media — no data_gov entitlement
    )
    assert not decision.allowed
    reason = decision.reason.lower()
    assert "entitlement" in reason or "denied" in reason


def test_rbac_allow():
    """'sofiia' (agent_cto) has tools.data_gov.read → allowed."""
    from tool_governance import ToolGovernance

    gov = ToolGovernance(enable_rbac=True, enable_limits=False, enable_allowlist=False)
    decision = gov.pre_call(
        tool="data_governance_tool",
        action="scan_repo",
        agent_id="sofiia",
    )
    assert decision.allowed


# ─── 14. policy action ───────────────────────────────────────────────────────


def test_policy_action():
    """policy action returns structured governance policy."""
    reload_policy()
    policy = scan_data_governance_dict("policy")
    for section in ("retention", "pii_patterns", "severity_behavior", "logging_rules"):
        assert section in policy
    retention = policy["retention"]
    assert "audit_jsonl_days" in retention
    assert int(retention["audit_jsonl_days"]) > 0


# ─── 15. Path traversal protection ───────────────────────────────────────────


def test_path_traversal_protection():
    """Traversal outside repo_root is blocked (safe_path returns None)."""
    from data_governance import _safe_path

    with tempfile.TemporaryDirectory() as tmpdir:
        assert _safe_path(tmpdir, "../../etc/passwd") is None
# ─── 16. Lock files excluded ─────────────────────────────────────────────────


def test_scan_repo_excludes_lock_files():
    """poetry.lock / package-lock.json not scanned (false-positive prevention)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        _write(
            Path(tmpdir),
            "poetry.lock",
            # Lock files often have long hex strings that look like secrets
            "token = \"ghp_faketoken12345678901234567890123456\"\n",
        )
        _write(Path(tmpdir), "service.py", "def hello(): return 'world'\n")
        result = scan_repo(
            repo_root=str(tmpdir),
            paths_include=[""],
            paths_exclude=["**/*.lock"],  # lock files excluded
            focus=["secrets"],
        )
        # poetry.lock should be excluded, so no secrets reported from it
        lock_hits = [
            f for f in result["findings"]
            if "poetry.lock" in f["evidence"].get("path", "")
        ]
        assert len(lock_hits) == 0


# ─── 17. mask_evidence ───────────────────────────────────────────────────────


def test_mask_evidence_redacts_secrets():
    """_mask_evidence masks key=value patterns."""
    masked = _mask_evidence("api_key = sk-supersecretvalue12345")
    assert "sk-supersecretvalue12345" not in masked
    assert "***" in masked or "REDACTED" in masked


def test_mask_evidence_truncates():
    """_mask_evidence truncates long strings."""
    shortened = _mask_evidence("x" * 500, max_chars=100)
    assert len(shortened) <= 120  # truncated + "…[truncated]" suffix


# ─── 18. scan_data_governance_dict unknown action ────────────────────────────


def test_unknown_action_returns_error():
    """Unknown action → error dict, not exception."""
    outcome = scan_data_governance_dict("explode_everything")
    assert "error" in outcome
    assert "Unknown action" in outcome["error"]