Files
microdao-daarion/tests/test_drift_analyzer.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

619 lines
28 KiB
Python

"""
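Tests for Drift Analyzer.
Most tests use isolated temp directories as mini-repo fixtures, with no dependency on actual repo content.
The RBAC tests at the end of the file take no fixture root, so they presumably read the project's real governance config.
Categories:
1. tools: rollout tool without handler → DRIFT-TOOLS-001 error
2. openapi: OpenAPI path not in code → DRIFT-OAS-001 error
3. services: compose service not in catalog → DRIFT-SVC-002 warning (pass=true)
4. nats: missing inventory → skipped, pass not affected
5. nats: code subject not in inventory → DRIFT-NATS-001 warning
6. integration: release_check with drift gate
7. report structure: required fields, severity ordering, evidence redaction, dict output
8. nats: wildcard subject matching
9. rbac: drift tool entitlements and rollout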
"""
import asyncio
import csv
import json
import os
import sys
import tempfile
import yaml
import pytest
# Ensure imports work: prepend two directories, resolved relative to this test
# file, to sys.path — "<this dir>/../services/router" and "<this dir>/..".
# Both are inserted at index 0, so the second one ("<this dir>/..") ends up
# first on the search path. The modules imported inside the tests
# (drift_analyzer, release_check_runner, ...) are presumably found via these
# entries — verify against the repo layout.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "router"))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# ─── Fixture Helpers ─────────────────────────────────────────────────────────
def _write(path: str, content: str) -> None:
    """Write ``content`` to ``path`` as UTF-8 text, creating parent directories.

    Args:
        path: Destination file. Missing parent directories are created; a bare
            filename (no directory component) is written relative to the
            current working directory.
        content: Text to store. An existing file is overwritten.
    """
    parent = os.path.dirname(path)
    # os.makedirs("") raises FileNotFoundError, so only create directories
    # when the path actually has a directory component.
    if parent:
        os.makedirs(parent, exist_ok=True)
    # Pin the encoding so fixtures do not depend on the platform locale.
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)
def _write_yaml(path: str, data):
    """Serialize ``data`` with ``yaml.dump`` and store the resulting text at ``path``."""
    serialized = yaml.dump(data)
    _write(path, serialized)
def _make_minimal_governance_configs(root: str):
    """Create the two governance fixtures used by the tools drift tests.

    Writes ``config/tools_rollout.yml`` and then ``config/rbac_tools_matrix.yml``
    under ``root``.
    """
    config_dir = os.path.join(root, "config")

    rollout_doc = {
        "default_tools_read": ["repo_tool", "kb_tool"],
        "cto_tools": ["pr_reviewer_tool", "drift_analyzer_tool"],
        "role_map": {
            "agent_default": {"tools": ["@default_tools_read"]},
            "agent_cto": {"tools": ["@default_tools_read", "@cto_tools"]},
        },
        "agent_roles": {"sofiia": "agent_cto"},
    }
    _write_yaml(os.path.join(config_dir, "tools_rollout.yml"), rollout_doc)

    # (tool, action, entitlement) triples; every tool exposes exactly one action.
    tool_rows = (
        ("repo_tool", "read", "tools.repo.read"),
        ("kb_tool", "search", "tools.kb.read"),
        ("pr_reviewer_tool", "review", "tools.pr_review.use"),
        ("drift_analyzer_tool", "analyze", "tools.drift.read"),
    )
    matrix_doc = {
        "tools": {
            tool: {"actions": {action: {"entitlements": [entitlement]}}}
            for tool, action, entitlement in tool_rows
        },
        "role_entitlements": {
            "agent_default": ["tools.repo.read", "tools.kb.read"],
            "agent_cto": ["tools.repo.read", "tools.pr_review.use", "tools.drift.read"],
        },
    }
    _write_yaml(os.path.join(config_dir, "rbac_tools_matrix.yml"), matrix_doc)
# ─── 1. Tools Drift ───────────────────────────────────────────────────────────
class TestToolsDrift:
    """Tools category: rollout entries are checked against handlers and the RBAC matrix."""

    def test_rollout_tool_missing_handler(self):
        """Rollout lists fake_tool_x; the test expects an error-level DRIFT-TOOLS-001 and a failed gate."""
        from drift_analyzer import analyze_drift

        rollout_doc = {
            "default_tools_read": ["repo_tool", "fake_tool_x"],
            "role_map": {"agent_default": {"tools": ["@default_tools_read"]}},
            "agent_roles": {},
        }
        matrix_doc = {
            "tools": {
                "repo_tool": {"actions": {"read": {"entitlements": ["tools.repo.read"]}}},
            },
            "role_entitlements": {"agent_default": ["tools.repo.read"]},
        }
        with tempfile.TemporaryDirectory() as root:
            config_dir = os.path.join(root, "config")
            _write_yaml(os.path.join(config_dir, "tools_rollout.yml"), rollout_doc)
            _write_yaml(os.path.join(config_dir, "rbac_tools_matrix.yml"), matrix_doc)

            report = analyze_drift(root, categories=["tools"])

            assert report.pass_ is False, "Missing handler should fail"
            found_ids = [entry["id"] for entry in report.findings]
            assert "DRIFT-TOOLS-001" in found_ids
            # Inspect the first DRIFT-TOOLS-001 finding in detail.
            missing = next(
                entry for entry in report.findings if entry["id"] == "DRIFT-TOOLS-001"
            )
            assert "fake_tool_x" in missing["title"]
            assert missing["severity"] == "error"

    def test_handler_not_in_matrix_generates_tools_002(self):
        """
        The RBAC matrix only lists repo_tool, so DRIFT-TOOLS-002 is expected.

        Either "error" or "warning" is accepted as severity; the test only
        checks that the finding is emitted with the expected id and title text.
        """
        from drift_analyzer import analyze_drift, KNOWN_TOOL_HANDLERS

        rollout_doc = {
            "default_tools_read": ["repo_tool"],
            "role_map": {"agent_default": {"tools": ["@default_tools_read"]}},
            "agent_roles": {},
        }
        # Only repo_tool is present in the matrix.
        matrix_doc = {
            "tools": {
                "repo_tool": {"actions": {"read": {"entitlements": ["tools.repo.read"]}}},
            },
            "role_entitlements": {"agent_default": ["tools.repo.read"]},
        }
        with tempfile.TemporaryDirectory() as root:
            config_dir = os.path.join(root, "config")
            _write_yaml(os.path.join(config_dir, "tools_rollout.yml"), rollout_doc)
            _write_yaml(os.path.join(config_dir, "rbac_tools_matrix.yml"), matrix_doc)

            report = analyze_drift(root, categories=["tools"])

            found_ids = [entry["id"] for entry in report.findings]
            assert "DRIFT-TOOLS-002" in found_ids
            tools_002 = [
                entry for entry in report.findings if entry["id"] == "DRIFT-TOOLS-002"
            ]
            # Every DRIFT-TOOLS-002 finding must be an error or a warning ...
            for entry in tools_002:
                assert entry["severity"] in ("error", "warning")
            # ... and must say the handler is absent from the matrix.
            for entry in tools_002:
                assert "absent from rbac_tools_matrix" in entry["title"]

    def test_all_tools_consistent_is_pass(self):
        """Rollout uses repo_tool and kb_tool only; no DRIFT-TOOLS-001 finding is expected."""
        from drift_analyzer import analyze_drift, KNOWN_TOOL_HANDLERS

        # repo_tool / kb_tool are presumably members of KNOWN_TOOL_HANDLERS.
        rollout_doc = {
            "known_tools": ["repo_tool", "kb_tool"],
            "role_map": {"agent_default": {"tools": ["@known_tools"]}},
            "agent_roles": {},
        }
        with tempfile.TemporaryDirectory() as root:
            config_dir = os.path.join(root, "config")
            _write_yaml(os.path.join(config_dir, "tools_rollout.yml"), rollout_doc)

            # Give every known handler a matrix entry and grant it to agent_default.
            matrix_tools = {}
            granted = []
            for tool in KNOWN_TOOL_HANDLERS:
                entitlement = f"tools.{tool}.use"
                matrix_tools[tool] = {
                    "actions": {"_default": {"entitlements": [entitlement]}}
                }
                granted.append(entitlement)
            matrix_doc = {
                "tools": matrix_tools,
                "role_entitlements": {"agent_default": granted},
            }
            _write_yaml(os.path.join(config_dir, "rbac_tools_matrix.yml"), matrix_doc)

            report = analyze_drift(root, categories=["tools"])

            drift_001 = [
                entry for entry in report.findings if entry["id"] == "DRIFT-TOOLS-001"
            ]
            assert not drift_001
# ─── 2. OpenAPI Drift ─────────────────────────────────────────────────────────
class TestOpenAPIDrift:
    """OpenAPI category: paths in the spec files are compared with routes found in code."""

    def test_openapi_path_missing_in_code(self):
        """Spec declares GET /v1/ping but the code only has /health → DRIFT-OAS-001 error."""
        from drift_analyzer import analyze_drift

        spec_doc = {
            "openapi": "3.0.0",
            "info": {"title": "Test", "version": "1.0"},
            "paths": {
                "/health": {"get": {"summary": "Health"}},
                "/v1/ping": {"get": {"summary": "Ping"}},
            },
        }
        # The service implements /health only.
        service_code = (
            '@app.get("/health")\n'
            'def health(): pass\n'
        )
        with tempfile.TemporaryDirectory() as root:
            _write_yaml(
                os.path.join(root, "docs", "contracts", "test.openapi.yaml"), spec_doc
            )
            _write(os.path.join(root, "services", "myservice", "main.py"), service_code)

            report = analyze_drift(root, categories=["openapi"])

            found_ids = [entry["id"] for entry in report.findings]
            assert "DRIFT-OAS-001" in found_ids
            ping_finding = next(
                entry
                for entry in report.findings
                if entry["id"] == "DRIFT-OAS-001" and "ping" in entry["title"]
            )
            assert ping_finding["severity"] == "error"
            assert report.pass_ is False

    def test_code_route_missing_in_openapi(self):
        """Code exposes POST /v1/agents/infer, which the spec lacks → DRIFT-OAS-002."""
        from drift_analyzer import analyze_drift

        spec_doc = {
            "openapi": "3.0.0",
            "info": {"title": "Test", "version": "1.0"},
            "paths": {"/health": {"get": {"summary": "Health"}}},
        }
        service_code = (
            '@app.get("/health")\n'
            'def health(): pass\n'
            '@app.post("/v1/agents/infer")\n'
            'def infer(): pass\n'
        )
        with tempfile.TemporaryDirectory() as root:
            _write_yaml(
                os.path.join(root, "docs", "contracts", "test.openapi.yaml"), spec_doc
            )
            _write(os.path.join(root, "services", "router", "main.py"), service_code)

            report = analyze_drift(root, categories=["openapi"])

            found_ids = [entry["id"] for entry in report.findings]
            assert "DRIFT-OAS-002" in found_ids

    def test_no_openapi_specs_is_pass(self):
        """Without any spec file the report passes and records spec_paths == 0."""
        from drift_analyzer import analyze_drift

        service_code = (
            '@app.get("/health")\n'
            'def health(): pass\n'
        )
        with tempfile.TemporaryDirectory() as root:
            _write(os.path.join(root, "services", "main.py"), service_code)

            report = analyze_drift(root, categories=["openapi"])

            assert report.pass_ is True
            openapi_stats = report.stats["by_category"]["openapi"]
            assert openapi_stats["spec_paths"] == 0

    def test_matching_spec_and_code_is_pass(self):
        """Spec and code both define only GET /health → no error-level findings."""
        from drift_analyzer import analyze_drift

        spec_doc = {
            "openapi": "3.0.0",
            "info": {"title": "T", "version": "1"},
            "paths": {"/health": {"get": {"summary": "ok"}}},
        }
        service_code = (
            '@app.get("/health")\n'
            'def health(): pass\n'
        )
        with tempfile.TemporaryDirectory() as root:
            _write_yaml(
                os.path.join(root, "docs", "contracts", "svc.openapi.yaml"), spec_doc
            )
            _write(os.path.join(root, "services", "main.py"), service_code)

            report = analyze_drift(root, categories=["openapi"])

            errors = [entry for entry in report.findings if entry["severity"] == "error"]
            assert not errors, f"Expected no errors: {errors}"
# ─── 3. Services Drift ────────────────────────────────────────────────────────
class TestServicesDrift:
    """Services category: the service catalog CSV is compared with docker-compose services."""

    def _make_catalog_csv(self, root: str, services: list):
        """Write ``services`` (row dicts) to the inventory_services.csv fixture under ``root``."""
        catalog_path = os.path.join(
            root, "docs", "architecture_inventory", "inventory_services.csv"
        )
        os.makedirs(os.path.dirname(catalog_path), exist_ok=True)
        columns = [
            "service", "type", "runtime", "port(s)",
            "deps", "image", "compose_file", "node/env",
        ]
        with open(catalog_path, "w", newline="") as handle:
            catalog = csv.DictWriter(handle, fieldnames=columns)
            catalog.writeheader()
            catalog.writerows(services)

    def test_compose_service_not_in_catalog(self):
        """new-mystery-service is in compose but not in the catalog → DRIFT-SVC-002 warning, gate passes."""
        from drift_analyzer import analyze_drift

        catalog_rows = [
            {
                "service": "router",
                "type": "api",
                "runtime": "python-fastapi",
                "port(s)": "9102",
                "deps": "",
                "image": "build:.",
                "compose_file": "docker-compose.node1.yml",
                "node/env": "node1",
            },
        ]
        compose_doc = {
            "services": {
                "router": {"image": "router:latest"},
                "new-mystery-service": {"image": "mystery:latest"},
            }
        }
        with tempfile.TemporaryDirectory() as root:
            self._make_catalog_csv(root, catalog_rows)
            _write_yaml(os.path.join(root, "docker-compose.node1.yml"), compose_doc)

            report = analyze_drift(root, categories=["services"])

            found_ids = [entry["id"] for entry in report.findings]
            assert "DRIFT-SVC-002" in found_ids
            uncatalogued = next(
                entry for entry in report.findings if entry["id"] == "DRIFT-SVC-002"
            )
            assert "new-mystery-service" in uncatalogued["title"]
            assert uncatalogued["severity"] == "warning"
            # A warning alone must leave the gate green.
            assert report.pass_ is True

    def test_deployed_service_missing_in_compose(self):
        """Catalog row of type DEPLOYED without a compose entry → DRIFT-SVC-001, gate fails."""
        from drift_analyzer import analyze_drift

        catalog_rows = [
            {
                "service": "deployed-svc",
                "type": "DEPLOYED",
                "runtime": "python",
                "port(s)": "9000",
                "deps": "",
                "image": "build:.",
                "compose_file": "docker-compose.node1.yml",
                "node/env": "node1",
            },
        ]
        # Compose defines a different service, not deployed-svc.
        compose_doc = {"services": {"other-svc": {"image": "other:latest"}}}
        with tempfile.TemporaryDirectory() as root:
            self._make_catalog_csv(root, catalog_rows)
            _write_yaml(os.path.join(root, "docker-compose.node1.yml"), compose_doc)

            report = analyze_drift(root, categories=["services"])

            found_ids = [entry["id"] for entry in report.findings]
            assert "DRIFT-SVC-001" in found_ids
            undeployed = next(
                entry for entry in report.findings if entry["id"] == "DRIFT-SVC-001"
            )
            assert "deployed-svc" in undeployed["title"]
            assert report.pass_ is False

    def test_services_match_is_pass(self):
        """The only DEPLOYED catalog service is present in compose → no DRIFT-SVC-001."""
        from drift_analyzer import analyze_drift

        catalog_rows = [
            {
                "service": "router",
                "type": "DEPLOYED",
                "runtime": "python",
                "port(s)": "9102",
                "deps": "",
                "image": "build:.",
                "compose_file": "docker-compose.node1.yml",
                "node/env": "node1",
            },
        ]
        compose_doc = {"services": {"router": {"image": "router:latest"}}}
        with tempfile.TemporaryDirectory() as root:
            self._make_catalog_csv(root, catalog_rows)
            _write_yaml(os.path.join(root, "docker-compose.node1.yml"), compose_doc)

            report = analyze_drift(root, categories=["services"])

            drift_001 = [
                entry for entry in report.findings if entry["id"] == "DRIFT-SVC-001"
            ]
            assert not drift_001
# ─── 4. NATS Drift ────────────────────────────────────────────────────────────
class TestNATSDrift:
    """NATS category: subjects found in code are compared with inventory_nats_topics.csv."""

    def test_missing_inventory_is_skipped(self):
        """Without an inventory CSV the nats category is listed as skipped and the gate passes."""
        from drift_analyzer import analyze_drift

        # Code that publishes on a subject; no inventory file is created.
        worker_code = 'nc.publish("agent.run.requested.myagent", data)\n'
        with tempfile.TemporaryDirectory() as root:
            _write(os.path.join(root, "services", "worker.py"), worker_code)

            report = analyze_drift(root, categories=["nats"])

            assert "nats" in report.stats.get("skipped", [])
            assert report.pass_ is True
            by_category = report.stats["by_category"]
            assert by_category == {} or "nats" not in by_category

    def test_code_subject_not_in_inventory_is_warning(self):
        """A published subject missing from the inventory → DRIFT-NATS-001, gate still passes."""
        from drift_analyzer import analyze_drift

        # One undocumented subject plus one that should match the inventory row.
        worker_code = (
            'nc.publish("totally.undocumented.subject", data)\n'
            'nc.publish("agent.run.completed.myagent", data)\n'
        )
        with tempfile.TemporaryDirectory() as root:
            inventory_path = os.path.join(
                root, "docs", "architecture_inventory", "inventory_nats_topics.csv"
            )
            os.makedirs(os.path.dirname(inventory_path), exist_ok=True)
            with open(inventory_path, "w", newline="") as handle:
                csv.writer(handle).writerows([
                    ["subject", "publisher(s)", "subscriber(s)", "purpose", "source"],
                    ["agent.run.completed.{agent_id}", "worker", "router", "run done", "code"],
                ])
            _write(os.path.join(root, "services", "worker.py"), worker_code)

            report = analyze_drift(root, categories=["nats"])

            found_ids = [entry["id"] for entry in report.findings]
            assert "DRIFT-NATS-001" in found_ids
            # A warning alone must leave the gate green.
            assert report.pass_ is True

    def test_documented_subject_not_in_code_is_info(self):
        """An inventory subject that no code uses → DRIFT-NATS-002 with severity info."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            inventory_path = os.path.join(
                root, "docs", "architecture_inventory", "inventory_nats_topics.csv"
            )
            os.makedirs(os.path.dirname(inventory_path), exist_ok=True)
            with open(inventory_path, "w", newline="") as handle:
                csv.writer(handle).writerows([
                    ["subject", "publisher(s)", "subscriber(s)", "purpose", "source"],
                    ["legacy.old.subject", "oldservice", "none", "legacy", "docs"],
                ])
            # The only source file contains no NATS calls.
            _write(os.path.join(root, "services", "main.py"), "# no nats here\n")

            report = analyze_drift(root, categories=["nats"])

            found_ids = [entry["id"] for entry in report.findings]
            assert "DRIFT-NATS-002" in found_ids
            stale = next(
                entry for entry in report.findings if entry["id"] == "DRIFT-NATS-002"
            )
            assert stale["severity"] == "info"
# ─── 5. Report Structure ──────────────────────────────────────────────────────
class TestReportStructure:
    """Shape of the drift report: attributes, ordering, redaction and dict export."""

    def test_report_has_required_fields(self):
        """The report exposes pass_/summary/stats/findings, and stats has error and warning counts."""
        from drift_analyzer import analyze_drift

        with tempfile.TemporaryDirectory() as root:
            _make_minimal_governance_configs(root)

            report = analyze_drift(root, categories=["tools"])

            for attribute in ("pass_", "summary", "stats", "findings"):
                assert hasattr(report, attribute)
            assert isinstance(report.findings, list)
            assert isinstance(report.stats, dict)
            for counter in ("errors", "warnings"):
                assert counter in report.stats

    def test_findings_sorted_error_first(self):
        """Findings must be ordered error, then warning, then info."""
        # DriftReport, Finding and _analyze_tools are imported but not used below.
        from drift_analyzer import DriftReport, Finding, _analyze_tools, analyze_drift  # noqa: F401

        rollout_doc = {
            "default_tools_read": ["repo_tool", "ghost_tool_xyz"],
            "role_map": {"agent_default": {"tools": ["@default_tools_read"]}},
            "agent_roles": {},
        }
        matrix_doc = {
            "tools": {
                "repo_tool": {"actions": {"read": {"entitlements": ["tools.repo.read"]}}},
            },
            "role_entitlements": {"agent_default": ["tools.repo.read"]},
        }
        with tempfile.TemporaryDirectory() as root:
            config_dir = os.path.join(root, "config")
            _write_yaml(os.path.join(config_dir, "tools_rollout.yml"), rollout_doc)
            _write_yaml(os.path.join(config_dir, "rbac_tools_matrix.yml"), matrix_doc)

            report = analyze_drift(root, categories=["tools"])

            rank_of = {"error": 0, "warning": 1, "info": 2}
            ranks = [rank_of[entry["severity"]] for entry in report.findings]
            in_order = all(left <= right for left, right in zip(ranks, ranks[1:]))
            assert in_order, "Findings not sorted by severity"

    def test_evidence_redacted(self):
        """A secret-looking value in evidence text is replaced by a REDACTED marker."""
        from drift_analyzer import _redact_evidence

        raw_evidence = 'api_key = "sk-abc123def456ghi789" found in config'
        cleaned = _redact_evidence(raw_evidence)
        assert "sk-abc123def456ghi789" not in cleaned
        assert "REDACTED" in cleaned

    def test_dict_output(self):
        """analyze_drift_dict returns a plain dict with pass, findings, stats and summary keys."""
        from drift_analyzer import analyze_drift_dict

        with tempfile.TemporaryDirectory() as root:
            _make_minimal_governance_configs(root)

            payload = analyze_drift_dict(root, categories=["tools"])

            assert isinstance(payload, dict)
            for key in ("pass", "findings", "stats", "summary"):
                assert key in payload
# ─── 6. Release Check Integration with Drift Gate ────────────────────────────
class FakeToolResult:
    """Minimal stand-in for a tool execution result: ``success``, ``result`` and ``error``."""

    def __init__(self, success, result=None, error=None):
        self.success, self.result, self.error = success, result, error
class TestReleaseCheckWithDrift:
    """run_release_check driven by fake tool managers, with and without the drift gate."""

    def test_drift_error_fails_release_check(self):
        """drift_analyzer_tool answers pass=False → the drift gate and the release check fail."""
        from release_check_runner import run_release_check

        class DriftFailTM:
            async def execute_tool(self, tool_name, arguments, agent_id=None, **kwargs):
                # Payloads are rebuilt on every call; unknown tools get an empty dict.
                canned = {
                    "pr_reviewer_tool": {"blocking_count": 0},
                    "config_linter_tool": {"blocking_count": 0},
                    "contract_tool": {"breaking_count": 0},
                    "threatmodel_tool": {"unmitigated_high_count": 0},
                    "drift_analyzer_tool": {
                        "pass": False,
                        "summary": "Drift errors found",
                        "stats": {"errors": 2, "warnings": 0, "skipped": []},
                        "findings": [
                            {
                                "severity": "error",
                                "id": "DRIFT-TOOLS-001",
                                "title": "Tool fake_x missing handler",
                                "category": "tools",
                            },
                        ],
                    },
                }
                return FakeToolResult(True, canned.get(tool_name, {}))

        inputs = {"service_name": "router", "diff_text": "minor", "run_drift": True}
        tool_manager = DriftFailTM()
        report = asyncio.run(run_release_check(tool_manager, inputs, "sofiia"))

        assert report["pass"] is False
        drift_gate = next(gate for gate in report["gates"] if gate["name"] == "drift")
        assert drift_gate["status"] == "fail"
        assert drift_gate["errors"] == 2

    def test_drift_warnings_only_pass_release(self):
        """Drift reports warnings only → the drift gate passes and so does the release check."""
        from release_check_runner import run_release_check

        class DriftWarnTM:
            async def execute_tool(self, tool_name, arguments, agent_id=None, **kwargs):
                # Payloads are rebuilt on every call; unknown tools get an empty dict.
                canned = {
                    "pr_reviewer_tool": {"blocking_count": 0},
                    "config_linter_tool": {"blocking_count": 0},
                    "contract_tool": {"breaking_count": 0},
                    "threatmodel_tool": {"unmitigated_high_count": 0},
                    "drift_analyzer_tool": {
                        "pass": True,
                        "summary": "2 warnings, no errors",
                        "stats": {"errors": 0, "warnings": 2, "skipped": []},
                        "findings": [
                            {
                                "severity": "warning",
                                "id": "DRIFT-SVC-002",
                                "title": "new-svc in compose not in catalog",
                                "category": "services",
                            },
                        ],
                    },
                }
                return FakeToolResult(True, canned.get(tool_name, {}))

        inputs = {"service_name": "router", "diff_text": "minor", "run_drift": True}
        tool_manager = DriftWarnTM()
        report = asyncio.run(run_release_check(tool_manager, inputs, "sofiia"))

        assert report["pass"] is True
        drift_gate = next(gate for gate in report["gates"] if gate["name"] == "drift")
        assert drift_gate["status"] == "pass"
        assert drift_gate["warnings"] == 2

    def test_no_drift_flag_skips_gate(self):
        """When run_drift is not set in the inputs, the report contains no drift gate."""
        from release_check_runner import run_release_check

        class MinimalTM:
            async def execute_tool(self, tool_name, arguments, agent_id=None, **kwargs):
                # Payloads are rebuilt on every call; unknown tools get an empty dict.
                canned = {
                    "pr_reviewer_tool": {"blocking_count": 0},
                    "config_linter_tool": {"blocking_count": 0},
                    "threatmodel_tool": {"unmitigated_high_count": 0},
                }
                return FakeToolResult(True, canned.get(tool_name, {}))

        inputs = {"service_name": "router"}  # run_drift defaults to False
        tool_manager = MinimalTM()
        report = asyncio.run(run_release_check(tool_manager, inputs, "sofiia"))

        drift_gate = next(
            (gate for gate in report["gates"] if gate["name"] == "drift"), None
        )
        assert drift_gate is None, "Drift gate should not appear when run_drift=False"
# ─── 7. NATS Wildcard Matching ────────────────────────────────────────────────
class TestNATSWildcardMatching:
    """Direct checks of ``_nats_subject_matches(subject, patterns)``."""

    def test_exact_match(self):
        """A concrete subject matches a pattern ending in ``*`` (despite the test's name)."""
        from drift_analyzer import _nats_subject_matches

        subject = "agent.run.completed.myagent"
        patterns = ["agent.run.completed.*"]
        assert _nats_subject_matches(subject, patterns)

    def test_wildcard_no_match(self):
        """An unrelated subject does not match the ``*`` pattern."""
        from drift_analyzer import _nats_subject_matches

        subject = "totally.different.subject"
        patterns = ["agent.run.completed.*"]
        assert not _nats_subject_matches(subject, patterns)

    def test_gt_wildcard(self):
        """A longer subject matches a pattern ending in ``>``."""
        from drift_analyzer import _nats_subject_matches

        subject = "agent.run.completed.myagent.extra"
        patterns = ["agent.run.>"]
        assert _nats_subject_matches(subject, patterns)

    def test_inventory_wildcard_matches_code(self):
        """Matching also succeeds when the wildcard is in the first argument."""
        from drift_analyzer import _nats_subject_matches

        subject = "agent.run.completed.*"
        patterns = ["agent.run.completed.myagent"]
        assert _nats_subject_matches(subject, patterns)

    def test_different_segment_count(self):
        """``a.b`` does not match ``a.b.c``: the segment counts differ."""
        from drift_analyzer import _nats_subject_matches

        subject = "a.b"
        patterns = ["a.b.c"]
        assert not _nats_subject_matches(subject, patterns)
# ─── 8. RBAC: drift tool entitlements ────────────────────────────────────────
class TestDriftRBAC:
    """RBAC and rollout checks for drift_analyzer_tool.

    These tests pass no fixture root, so they presumably exercise the project's
    real governance configuration — verify against tool_governance.
    """

    def test_cto_can_run_drift(self):
        """sofiia is allowed the ``analyze`` action on drift_analyzer_tool."""
        from tool_governance import check_rbac

        allowed, why = check_rbac("sofiia", "drift_analyzer_tool", "analyze")
        assert allowed, f"sofiia CTO should have tools.drift.read: {why}"

    def test_cto_has_drift_gate(self):
        """sofiia is allowed the ``gate`` action on drift_analyzer_tool."""
        from tool_governance import check_rbac

        allowed, why = check_rbac("sofiia", "drift_analyzer_tool", "gate")
        assert allowed, f"sofiia CTO should have tools.drift.gate: {why}"

    def test_default_agent_denied_drift_gate(self):
        """An unknown agent id is denied the ``gate`` action."""
        from tool_governance import check_rbac

        allowed, why = check_rbac("brand_new_agent_xyz", "drift_analyzer_tool", "gate")
        assert not allowed, "Default agent should NOT have tools.drift.gate"

    def test_sofiia_gets_drift_tool_in_rollout(self):
        """After reloading the rollout config, sofiia's tool list includes drift_analyzer_tool."""
        from agent_tools_config import get_agent_tools, reload_rollout_config

        reload_rollout_config()
        sofiia_tools = get_agent_tools("sofiia")
        assert "drift_analyzer_tool" in sofiia_tools, "Sofiia (CTO) should have drift_analyzer_tool"