""" Tests for Drift Analyzer. Uses isolated temp directories as mini-repo fixtures — no dependency on actual repo content. Categories: 1. tools: rollout tool without handler → DRIFT-TOOLS-001 error 2. openapi: OpenAPI path not in code → DRIFT-OAS-001 error 3. services: compose service not in catalog → DRIFT-SVC-002 warning (pass=true) 4. nats: missing inventory → skipped, pass not affected 5. nats: code subject not in inventory → DRIFT-NATS-001 warning 6. integration: release_check with drift gate """ import asyncio import csv import json import os import sys import tempfile import yaml import pytest # Ensure imports work sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "router")) sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) # ─── Fixture Helpers ───────────────────────────────────────────────────────── def _write(path: str, content: str): os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as f: f.write(content) def _write_yaml(path: str, data): _write(path, yaml.dump(data)) def _make_minimal_governance_configs(root: str): """Write minimal tools_rollout.yml and rbac_tools_matrix.yml for tools drift tests.""" rollout = { "default_tools_read": ["repo_tool", "kb_tool"], "cto_tools": ["pr_reviewer_tool", "drift_analyzer_tool"], "role_map": { "agent_default": {"tools": ["@default_tools_read"]}, "agent_cto": {"tools": ["@default_tools_read", "@cto_tools"]}, }, "agent_roles": {"sofiia": "agent_cto"}, } _write_yaml(os.path.join(root, "config", "tools_rollout.yml"), rollout) matrix = { "tools": { "repo_tool": {"actions": {"read": {"entitlements": ["tools.repo.read"]}}}, "kb_tool": {"actions": {"search": {"entitlements": ["tools.kb.read"]}}}, "pr_reviewer_tool": {"actions": {"review": {"entitlements": ["tools.pr_review.use"]}}}, "drift_analyzer_tool": {"actions": {"analyze": {"entitlements": ["tools.drift.read"]}}}, }, "role_entitlements": { "agent_default": ["tools.repo.read", "tools.kb.read"], 
"agent_cto": ["tools.repo.read", "tools.pr_review.use", "tools.drift.read"], }, } _write_yaml(os.path.join(root, "config", "rbac_tools_matrix.yml"), matrix) # ─── 1. Tools Drift ─────────────────────────────────────────────────────────── class TestToolsDrift: """tools_rollout has fake_tool_x but handler is absent → DRIFT-TOOLS-001 error.""" def test_rollout_tool_missing_handler(self): from drift_analyzer import analyze_drift with tempfile.TemporaryDirectory() as root: rollout = { "default_tools_read": ["repo_tool", "fake_tool_x"], "role_map": {"agent_default": {"tools": ["@default_tools_read"]}}, "agent_roles": {}, } _write_yaml(os.path.join(root, "config", "tools_rollout.yml"), rollout) _write_yaml(os.path.join(root, "config", "rbac_tools_matrix.yml"), { "tools": { "repo_tool": {"actions": {"read": {"entitlements": ["tools.repo.read"]}}}, }, "role_entitlements": {"agent_default": ["tools.repo.read"]}, }) report = analyze_drift(root, categories=["tools"]) assert report.pass_ is False, "Missing handler should fail" ids = [f["id"] for f in report.findings] assert "DRIFT-TOOLS-001" in ids # Find the specific finding f = next(f for f in report.findings if f["id"] == "DRIFT-TOOLS-001") assert "fake_tool_x" in f["title"] assert f["severity"] == "error" def test_handler_not_in_matrix_generates_tools_002(self): """ Handler exists but not in RBAC matrix → DRIFT-TOOLS-002. Severity is error if the handler is actively rollouted (to signal missing RBAC gate), or warning if experimental/not rollouted. This test verifies the finding is emitted and has the right id. 
""" from drift_analyzer import analyze_drift, KNOWN_TOOL_HANDLERS with tempfile.TemporaryDirectory() as root: rollout = { "default_tools_read": ["repo_tool"], "role_map": {"agent_default": {"tools": ["@default_tools_read"]}}, "agent_roles": {}, } _write_yaml(os.path.join(root, "config", "tools_rollout.yml"), rollout) # Matrix only has repo_tool; all other handlers are missing _write_yaml(os.path.join(root, "config", "rbac_tools_matrix.yml"), { "tools": { "repo_tool": {"actions": {"read": {"entitlements": ["tools.repo.read"]}}}, }, "role_entitlements": {"agent_default": ["tools.repo.read"]}, }) report = analyze_drift(root, categories=["tools"]) ids = [f["id"] for f in report.findings] assert "DRIFT-TOOLS-002" in ids # All DRIFT-TOOLS-002 findings must have severity "error" or "warning" tools_002 = [f for f in report.findings if f["id"] == "DRIFT-TOOLS-002"] assert all(f["severity"] in ("error", "warning") for f in tools_002) # The finding should mention "absent from rbac_tools_matrix" assert all("absent from rbac_tools_matrix" in f["title"] for f in tools_002) def test_all_tools_consistent_is_pass(self): """All tools in rollout have handlers → no DRIFT-TOOLS-001 errors.""" from drift_analyzer import analyze_drift, KNOWN_TOOL_HANDLERS with tempfile.TemporaryDirectory() as root: # Only use tools that actually have handlers rollout = { "known_tools": ["repo_tool", "kb_tool"], "role_map": {"agent_default": {"tools": ["@known_tools"]}}, "agent_roles": {}, } _write_yaml(os.path.join(root, "config", "tools_rollout.yml"), rollout) _write_yaml(os.path.join(root, "config", "rbac_tools_matrix.yml"), { "tools": { t: {"actions": {"_default": {"entitlements": [f"tools.{t}.use"]}}} for t in KNOWN_TOOL_HANDLERS }, "role_entitlements": {"agent_default": [f"tools.{t}.use" for t in KNOWN_TOOL_HANDLERS]}, }) report = analyze_drift(root, categories=["tools"]) # No DRIFT-TOOLS-001 errors drift_001 = [f for f in report.findings if f["id"] == "DRIFT-TOOLS-001"] assert len(drift_001) == 0 # 
# ─── 2. OpenAPI Drift ─────────────────────────────────────────────────────────

class TestOpenAPIDrift:
    """OpenAPI spec has /v1/ping GET but code has no such route → DRIFT-OAS-001 error."""

    def test_openapi_path_missing_in_code(self):
        """A spec path with no matching route in code is an error and fails the report."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            spec = {
                "openapi": "3.0.0",
                "info": {"title": "Test", "version": "1.0"},
                "paths": {
                    "/health": {"get": {"summary": "Health"}},
                    "/v1/ping": {"get": {"summary": "Ping"}},
                },
            }
            _write_yaml(os.path.join(root, "docs", "contracts", "test.openapi.yaml"), spec)

            # Code has /health but NOT /v1/ping
            code = '@app.get("/health")\ndef health(): pass\n'
            _write(os.path.join(root, "services", "myservice", "main.py"), code)

            report = analyze_drift(root, categories=["openapi"])

            ids = [finding["id"] for finding in report.findings]
            # /v1/ping is in spec but not in code → DRIFT-OAS-001
            assert "DRIFT-OAS-001" in ids
            finding = next(
                item for item in report.findings
                if item["id"] == "DRIFT-OAS-001" and "ping" in item["title"]
            )
            assert finding["severity"] == "error"
            assert report.pass_ is False

    def test_code_route_missing_in_openapi(self):
        """Code has /v1/agents route not in spec → DRIFT-OAS-002 error."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            spec = {
                "openapi": "3.0.0",
                "info": {"title": "Test", "version": "1.0"},
                "paths": {"/health": {"get": {"summary": "Health"}}},
            }
            _write_yaml(os.path.join(root, "docs", "contracts", "test.openapi.yaml"), spec)

            code = '@app.get("/health")\ndef health(): pass\n@app.post("/v1/agents/infer")\ndef infer(): pass\n'
            _write(os.path.join(root, "services", "router", "main.py"), code)

            report = analyze_drift(root, categories=["openapi"])

            ids = [finding["id"] for finding in report.findings]
            assert "DRIFT-OAS-002" in ids

    def test_no_openapi_specs_is_pass(self):
        """No OpenAPI specs found → no findings (spec_paths=0 → skip comparison)."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            code = '@app.get("/health")\ndef health(): pass\n'
            _write(os.path.join(root, "services", "main.py"), code)

            report = analyze_drift(root, categories=["openapi"])

            assert report.pass_ is True
            assert report.stats["by_category"]["openapi"]["spec_paths"] == 0

    def test_matching_spec_and_code_is_pass(self):
        """Spec and code match exactly → no errors."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            spec = {
                "openapi": "3.0.0",
                "info": {"title": "T", "version": "1"},
                "paths": {"/health": {"get": {"summary": "ok"}}},
            }
            _write_yaml(os.path.join(root, "docs", "contracts", "svc.openapi.yaml"), spec)
            code = '@app.get("/health")\ndef health(): pass\n'
            _write(os.path.join(root, "services", "main.py"), code)

            report = analyze_drift(root, categories=["openapi"])

            errors = [finding for finding in report.findings if finding["severity"] == "error"]
            assert not errors, f"Expected no errors: {errors}"


# ─── 3. Services Drift ────────────────────────────────────────────────────────

class TestServicesDrift:
    """Compose has new-service not in catalog → DRIFT-SVC-002 warning (pass=true)."""

    def _make_catalog_csv(self, root: str, services: list):
        """Write *services* (a list of row dicts) as the service catalog CSV under *root*.

        The file is created at
        ``docs/architecture_inventory/inventory_services.csv`` with a header row.
        """
        path = os.path.join(root, "docs", "architecture_inventory", "inventory_services.csv")
        os.makedirs(os.path.dirname(path), exist_ok=True)
        # newline="" is what the csv module expects; UTF-8 keeps the fixture locale-independent.
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(
                f,
                fieldnames=[
                    "service", "type", "runtime", "port(s)",
                    "deps", "image", "compose_file", "node/env",
                ],
            )
            writer.writeheader()
            writer.writerows(services)

    def test_compose_service_not_in_catalog(self):
        """new-mystery-service in compose but not in catalog → DRIFT-SVC-002 warning."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            self._make_catalog_csv(root, [
                {"service": "router", "type": "api", "runtime": "python-fastapi",
                 "port(s)": "9102", "deps": "", "image": "build:.",
                 "compose_file": "docker-compose.node1.yml", "node/env": "node1"},
            ])
            compose = {
                "services": {
                    "router": {"image": "router:latest"},
                    "new-mystery-service": {"image": "mystery:latest"},
                },
            }
            _write_yaml(os.path.join(root, "docker-compose.node1.yml"), compose)

            report = analyze_drift(root, categories=["services"])

            ids = [finding["id"] for finding in report.findings]
            assert "DRIFT-SVC-002" in ids
            finding = next(item for item in report.findings if item["id"] == "DRIFT-SVC-002")
            assert "new-mystery-service" in finding["title"]
            assert finding["severity"] == "warning"
            # Warnings don't fail the gate
            assert report.pass_ is True

    def test_deployed_service_missing_in_compose(self):
        """Service marked DEPLOYED in catalog but absent from compose → DRIFT-SVC-001 error."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            self._make_catalog_csv(root, [
                {"service": "deployed-svc", "type": "DEPLOYED", "runtime": "python",
                 "port(s)": "9000", "deps": "", "image": "build:.",
                 "compose_file": "docker-compose.node1.yml", "node/env": "node1"},
            ])
            # Compose has some other service, NOT deployed-svc
            compose = {"services": {"other-svc": {"image": "other:latest"}}}
            _write_yaml(os.path.join(root, "docker-compose.node1.yml"), compose)

            report = analyze_drift(root, categories=["services"])

            ids = [finding["id"] for finding in report.findings]
            assert "DRIFT-SVC-001" in ids
            finding = next(item for item in report.findings if item["id"] == "DRIFT-SVC-001")
            assert "deployed-svc" in finding["title"]
            assert report.pass_ is False

    def test_services_match_is_pass(self):
        """All catalog DEPLOYED services are in compose → no errors."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            self._make_catalog_csv(root, [
                {"service": "router", "type": "DEPLOYED", "runtime": "python",
                 "port(s)": "9102", "deps": "", "image": "build:.",
                 "compose_file": "docker-compose.node1.yml", "node/env": "node1"},
            ])
            compose = {"services": {"router": {"image": "router:latest"}}}
            _write_yaml(os.path.join(root, "docker-compose.node1.yml"), compose)

            report = analyze_drift(root, categories=["services"])

            drift_001 = [
                finding for finding in report.findings if finding["id"] == "DRIFT-SVC-001"
            ]
            assert not drift_001


# ─── 4. NATS Drift ────────────────────────────────────────────────────────────

class TestNATSDrift:
    """NATS subject drift between code under the temp root and the inventory CSV."""

    def _make_nats_inventory_csv(self, root: str, rows: list):
        """Write *rows* (a list of row dicts) as the NATS topic inventory CSV under *root*.

        The file is created at
        ``docs/architecture_inventory/inventory_nats_topics.csv`` with a header row.
        """
        inv_path = os.path.join(
            root, "docs", "architecture_inventory", "inventory_nats_topics.csv"
        )
        os.makedirs(os.path.dirname(inv_path), exist_ok=True)
        with open(inv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(
                f,
                fieldnames=["subject", "publisher(s)", "subscriber(s)", "purpose", "source"],
            )
            writer.writeheader()
            writer.writerows(rows)

    def test_missing_inventory_is_skipped(self):
        """No inventory_nats_topics.csv → nats category skipped, pass=true."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            # Some Python code that uses NATS
            code = 'nc.publish("agent.run.requested.myagent", data)\n'
            _write(os.path.join(root, "services", "worker.py"), code)
            # No inventory file

            report = analyze_drift(root, categories=["nats"])

            assert "nats" in report.stats.get("skipped", [])
            assert report.pass_ is True
            assert report.stats["by_category"] == {} or "nats" not in report.stats.get("by_category", {})

    def test_code_subject_not_in_inventory_is_warning(self):
        """Code uses subject absent from inventory → DRIFT-NATS-001 warning."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            # Write inventory with known subjects
            self._make_nats_inventory_csv(root, [
                {"subject": "agent.run.completed.{agent_id}", "publisher(s)": "worker",
                 "subscriber(s)": "router", "purpose": "run done", "source": "code"},
            ])

            # Code uses undocumented subject
            code = 'nc.publish("totally.undocumented.subject", data)\nnc.publish("agent.run.completed.myagent", data)\n'
            _write(os.path.join(root, "services", "worker.py"), code)

            report = analyze_drift(root, categories=["nats"])

            ids = [finding["id"] for finding in report.findings]
            assert "DRIFT-NATS-001" in ids
            # Warnings don't fail gate
            assert report.pass_ is True

    def test_documented_subject_not_in_code_is_info(self):
        """Inventory subject not in code → DRIFT-NATS-002 info."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            self._make_nats_inventory_csv(root, [
                {"subject": "legacy.old.subject", "publisher(s)": "oldservice",
                 "subscriber(s)": "none", "purpose": "legacy", "source": "docs"},
            ])
            # No code with nats usage
            _write(os.path.join(root, "services", "main.py"), "# no nats here\n")

            report = analyze_drift(root, categories=["nats"])

            ids = [finding["id"] for finding in report.findings]
            assert "DRIFT-NATS-002" in ids
            finding = next(item for item in report.findings if item["id"] == "DRIFT-NATS-002")
            assert finding["severity"] == "info"


# ─── 5. Report Structure ──────────────────────────────────────────────────────

class TestReportStructure:
    """Shape of the report object/dict and ordering of its findings."""

    def test_report_has_required_fields(self):
        """The report exposes pass_, summary, stats and findings with the expected types."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            _make_minimal_governance_configs(root)
            report = analyze_drift(root, categories=["tools"])

            assert hasattr(report, "pass_")
            assert hasattr(report, "summary")
            assert hasattr(report, "stats")
            assert hasattr(report, "findings")
            assert isinstance(report.findings, list)
            assert isinstance(report.stats, dict)
            assert "errors" in report.stats
            assert "warnings" in report.stats

    def test_findings_sorted_error_first(self):
        """Findings must be sorted: error > warning > info."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            # A rollout tool without a handler plus a sparse matrix gives the
            # analyzer several findings to order.
            rollout = {
                "default_tools_read": ["repo_tool", "ghost_tool_xyz"],
                "role_map": {"agent_default": {"tools": ["@default_tools_read"]}},
                "agent_roles": {},
            }
            _write_yaml(os.path.join(root, "config", "tools_rollout.yml"), rollout)
            _write_yaml(os.path.join(root, "config", "rbac_tools_matrix.yml"), {
                "tools": {
                    "repo_tool": {"actions": {"read": {"entitlements": ["tools.repo.read"]}}},
                },
                "role_entitlements": {"agent_default": ["tools.repo.read"]},
            })

            report = analyze_drift(root, categories=["tools"])

            severity_order = {"error": 0, "warning": 1, "info": 2}
            severities = [severity_order[finding["severity"]] for finding in report.findings]
            assert severities == sorted(severities), "Findings not sorted by severity"

    def test_evidence_redacted(self):
        """Secrets in evidence should be redacted."""
        from drift_analyzer import _redact_evidence

        evidence = 'api_key = "sk-abc123def456ghi789" found in config'
        result = _redact_evidence(evidence)

        assert "sk-abc123def456ghi789" not in result
        assert "REDACTED" in result

    def test_dict_output(self):
        """analyze_drift_dict returns plain dict with pass key."""
        from drift_analyzer import analyze_drift_dict

        with tempfile.TemporaryDirectory() as root:
            _make_minimal_governance_configs(root)
            result = analyze_drift_dict(root, categories=["tools"])

            assert isinstance(result, dict)
            assert "pass" in result
            assert "findings" in result
            assert "stats" in result
            assert "summary" in result
# ─── 6. Release Check Integration with Drift Gate ────────────────────────────

class FakeToolResult:
    """Minimal stand-in for a tool execution result: success flag, payload, error."""

    def __init__(self, success, result=None, error=None):
        self.success, self.result, self.error = success, result, error


class TestReleaseCheckWithDrift:
    """run_release_check driven by stub tool managers that return canned payloads."""

    def test_drift_error_fails_release_check(self):
        """When drift_analyzer_tool returns pass=false → release_check fails."""
        from release_check_runner import run_release_check

        class DriftFailTM:
            async def execute_tool(self, tool_name, arguments, agent_id=None, **kwargs):
                # Built per call so every result carries a fresh payload dict.
                canned = {
                    "pr_reviewer_tool": {"blocking_count": 0},
                    "config_linter_tool": {"blocking_count": 0},
                    "contract_tool": {"breaking_count": 0},
                    "threatmodel_tool": {"unmitigated_high_count": 0},
                    "drift_analyzer_tool": {
                        "pass": False,
                        "summary": "Drift errors found",
                        "stats": {"errors": 2, "warnings": 0, "skipped": []},
                        "findings": [
                            {"severity": "error", "id": "DRIFT-TOOLS-001",
                             "title": "Tool fake_x missing handler", "category": "tools"},
                        ],
                    },
                }
                # Any tool without a canned payload succeeds with an empty result.
                return FakeToolResult(True, canned.get(tool_name, {}))

        inputs = {
            "service_name": "router",
            "diff_text": "minor",
            "run_drift": True,
        }
        tool_manager = DriftFailTM()
        report = asyncio.run(run_release_check(tool_manager, inputs, "sofiia"))

        assert report["pass"] is False
        drift_gate = next(gate for gate in report["gates"] if gate["name"] == "drift")
        assert drift_gate["status"] == "fail"
        assert drift_gate["errors"] == 2

    def test_drift_warnings_only_pass_release(self):
        """Drift has only warnings → drift gate passes → release passes."""
        from release_check_runner import run_release_check

        class DriftWarnTM:
            async def execute_tool(self, tool_name, arguments, agent_id=None, **kwargs):
                # Built per call so every result carries a fresh payload dict.
                canned = {
                    "pr_reviewer_tool": {"blocking_count": 0},
                    "config_linter_tool": {"blocking_count": 0},
                    "contract_tool": {"breaking_count": 0},
                    "threatmodel_tool": {"unmitigated_high_count": 0},
                    "drift_analyzer_tool": {
                        "pass": True,
                        "summary": "2 warnings, no errors",
                        "stats": {"errors": 0, "warnings": 2, "skipped": []},
                        "findings": [
                            {"severity": "warning", "id": "DRIFT-SVC-002",
                             "title": "new-svc in compose not in catalog",
                             "category": "services"},
                        ],
                    },
                }
                return FakeToolResult(True, canned.get(tool_name, {}))

        inputs = {"service_name": "router", "diff_text": "minor", "run_drift": True}
        tool_manager = DriftWarnTM()
        report = asyncio.run(run_release_check(tool_manager, inputs, "sofiia"))

        assert report["pass"] is True
        drift_gate = next(gate for gate in report["gates"] if gate["name"] == "drift")
        assert drift_gate["status"] == "pass"
        assert drift_gate["warnings"] == 2

    def test_no_drift_flag_skips_gate(self):
        """run_drift=False (default) → no drift gate in report."""
        from release_check_runner import run_release_check

        class MinimalTM:
            async def execute_tool(self, tool_name, arguments, agent_id=None, **kwargs):
                # Only three tools have payloads here; everything else gets {}.
                canned = {
                    "pr_reviewer_tool": {"blocking_count": 0},
                    "config_linter_tool": {"blocking_count": 0},
                    "threatmodel_tool": {"unmitigated_high_count": 0},
                }
                return FakeToolResult(True, canned.get(tool_name, {}))

        inputs = {"service_name": "router"}  # run_drift defaults to False
        tool_manager = MinimalTM()
        report = asyncio.run(run_release_check(tool_manager, inputs, "sofiia"))

        drift_gate = next(
            (gate for gate in report["gates"] if gate["name"] == "drift"),
            None,
        )
        assert drift_gate is None, "Drift gate should not appear when run_drift=False"
# ─── 7. NATS Wildcard Matching ────────────────────────────────────────────────

class TestNATSWildcardMatching:
    """Expectations for ``_nats_subject_matches(subject, patterns)``."""

    def test_exact_match(self):
        """A concrete subject matches a pattern whose last token is ``*``."""
        from drift_analyzer import _nats_subject_matches

        subject = "agent.run.completed.myagent"
        patterns = ["agent.run.completed.*"]
        assert _nats_subject_matches(subject, patterns)

    def test_wildcard_no_match(self):
        """An unrelated subject does not match the ``*`` pattern."""
        from drift_analyzer import _nats_subject_matches

        subject = "totally.different.subject"
        patterns = ["agent.run.completed.*"]
        assert not _nats_subject_matches(subject, patterns)

    def test_gt_wildcard(self):
        """A longer subject matches a pattern ending in ``>``."""
        from drift_analyzer import _nats_subject_matches

        subject = "agent.run.completed.myagent.extra"
        patterns = ["agent.run.>"]
        assert _nats_subject_matches(subject, patterns)

    def test_inventory_wildcard_matches_code(self):
        """A ``*`` in the first argument matches a concrete entry in the list."""
        from drift_analyzer import _nats_subject_matches

        subject = "agent.run.completed.*"
        patterns = ["agent.run.completed.myagent"]
        assert _nats_subject_matches(subject, patterns)

    def test_different_segment_count(self):
        """Subjects with a different number of tokens and no wildcard do not match."""
        from drift_analyzer import _nats_subject_matches

        subject = "a.b"
        patterns = ["a.b.c"]
        assert not _nats_subject_matches(subject, patterns)


# ─── 8. RBAC: drift tool entitlements ────────────────────────────────────────

class TestDriftRBAC:
    """RBAC and rollout expectations for drift_analyzer_tool.

    NOTE(review): these tests call check_rbac / get_agent_tools without a temp
    fixture root, so they presumably read the repository's real governance
    config — confirm.
    """

    def test_cto_can_run_drift(self):
        """sofiia is allowed the ``analyze`` action on drift_analyzer_tool."""
        from tool_governance import check_rbac

        allowed, why = check_rbac("sofiia", "drift_analyzer_tool", "analyze")
        assert allowed, f"sofiia CTO should have tools.drift.read: {why}"

    def test_cto_has_drift_gate(self):
        """sofiia is allowed the ``gate`` action on drift_analyzer_tool."""
        from tool_governance import check_rbac

        allowed, why = check_rbac("sofiia", "drift_analyzer_tool", "gate")
        assert allowed, f"sofiia CTO should have tools.drift.gate: {why}"

    def test_default_agent_denied_drift_gate(self):
        """An agent id not used elsewhere in this file is denied the ``gate`` action."""
        from tool_governance import check_rbac

        allowed, _why = check_rbac("brand_new_agent_xyz", "drift_analyzer_tool", "gate")
        assert not allowed, "Default agent should NOT have tools.drift.gate"

    def test_sofiia_gets_drift_tool_in_rollout(self):
        """After reloading the rollout config, sofiia's tools include drift_analyzer_tool."""
        from agent_tools_config import get_agent_tools, reload_rollout_config

        reload_rollout_config()
        granted = get_agent_tools("sofiia")
        assert "drift_analyzer_tool" in granted, "Sofiia (CTO) should have drift_analyzer_tool"