Files
microdao-daarion/tests/test_tool_governance.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

406 lines
16 KiB
Python

"""
Tests for Tool Governance System:
1. default_tools merge: agent without explicit tools gets read tools
2. RBAC matrix enforcement: deny without entitlement
3. Middleware:
   - max_chars_in enforced
   - redaction applied
   - allowlist blocks unknown host
4. release_check:
   - fixture diff with secret → gate fail
   - fixture openapi breaking → gate fail
   - pass case → gate pass
"""
import asyncio
import os
import sys
import pytest
# Make the modules under test importable. The test methods below import
# agent_tools_config, tool_governance and release_check_runner lazily (inside
# each test body), so these entries only need to be on sys.path before a test
# runs. Presumably those modules live under services/router -- confirm against
# the repository layout.
# Add services/router to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "router"))
# Also add the parent of this file's directory. Because it is inserted at
# index 0 after the entry above, it ends up ahead of services/router in the
# module search order.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# ─── 1. default_tools merge ───────────────────────────────────────────────────
class TestDefaultToolsMerge:
    """Check the tool lists and roles that agent_tools_config resolves per agent.

    An agent with no explicit configuration ("brand_new_agent_xyz") is expected
    to still receive the baseline read tools.
    """

    @staticmethod
    def _fresh_tools(agent_id):
        """Reload the rollout config, then return the tools for *agent_id*."""
        from agent_tools_config import get_agent_tools, reload_rollout_config

        reload_rollout_config()
        return get_agent_tools(agent_id)

    def test_unknown_agent_gets_read_tools(self):
        """An unrecognised agent id still resolves the four baseline read tools."""
        resolved = self._fresh_tools("brand_new_agent_xyz")
        # Must include baseline read tools
        baseline = ("repo_tool", "kb_tool", "oncall_tool", "observability_tool")
        for name in baseline:
            assert name in resolved, f"Missing {name} in tools for unknown agent"

    def test_sofiia_gets_cto_tools(self):
        """The "sofiia" agent resolves every CTO-level tool listed here."""
        resolved = self._fresh_tools("sofiia")
        cto_tools = (
            "pr_reviewer_tool",
            "contract_tool",
            "config_linter_tool",
            "threatmodel_tool",
            "job_orchestrator_tool",
        )
        for name in cto_tools:
            assert name in resolved, f"Sofiia missing CTO tool: {name}"

    def test_sofiia_gets_media_tools(self):
        """The "sofiia" agent resolves both comfy image and video generators."""
        resolved = self._fresh_tools("sofiia")
        assert "comfy_generate_image" in resolved
        assert "comfy_generate_video" in resolved

    def test_helion_gets_read_tools(self):
        """The "helion" agent resolves read tools plus the job orchestrator."""
        resolved = self._fresh_tools("helion")
        for name in ("repo_tool", "kb_tool", "oncall_tool", "job_orchestrator_tool"):
            assert name in resolved

    def test_no_duplicates(self):
        """The resolved tool list contains each tool at most once."""
        resolved = self._fresh_tools("sofiia")
        assert len(resolved) == len(set(resolved)), "Duplicate tools in list"

    def test_stable_order(self):
        """get_agent_tools should return same order on repeated calls."""
        # Deliberately no config reload here: two back-to-back calls are compared.
        from agent_tools_config import get_agent_tools

        first_call = get_agent_tools("sofiia")
        second_call = get_agent_tools("sofiia")
        assert first_call == second_call

    def test_get_agent_role(self):
        """Known agents map to their roles; unknown ids map to agent_default."""
        from agent_tools_config import get_agent_role

        expected_roles = {
            "sofiia": "agent_cto",
            "helion": "agent_oncall",
            "unknown_xyz": "agent_default",
        }
        for agent_id, role in expected_roles.items():
            assert get_agent_role(agent_id) == role
# ─── 2. RBAC Matrix Enforcement ───────────────────────────────────────────────
class TestRBACMatrix:
    """Exercise check_rbac for the "sofiia" agent and for an unknown agent.

    check_rbac is called as ``check_rbac(agent_id, tool, action)`` and its
    result is unpacked as an ``(allowed, reason)`` pair.
    """

    def test_agent_cto_has_pr_review(self):
        """sofiia may run the "review" action of pr_reviewer_tool."""
        from tool_governance import check_rbac

        allowed, why = check_rbac("sofiia", "pr_reviewer_tool", "review")
        assert allowed, f"sofiia should have pr_review.use: {why}"

    def test_agent_default_denied_pr_review_gate(self):
        """agent_default role does NOT have tools.pr_review.gate entitlement."""
        from tool_governance import check_rbac

        # Use a new/unknown agent that gets agent_default role
        allowed, why = check_rbac("brand_new_agent_xyz", "pr_reviewer_tool", "gate")
        assert not allowed, "Default agent should NOT have gate entitlement"
        assert "Missing entitlements" in why

    def test_agent_default_allowed_repo_read(self):
        """An unknown agent may still run the "read" action of repo_tool."""
        from tool_governance import check_rbac

        allowed, why = check_rbac("brand_new_agent_xyz", "repo_tool", "read")
        assert allowed, f"Default agent should have repo.read: {why}"

    def test_agent_default_denied_incident_write(self):
        """An unknown agent may not run oncall_tool's "incident_create"."""
        from tool_governance import check_rbac

        allowed, _why = check_rbac(
            "brand_new_agent_xyz", "oncall_tool", "incident_create"
        )
        assert not allowed, "Default agent should NOT have incident_write"

    def test_cto_allowed_incident_write(self):
        """sofiia may run oncall_tool's "incident_create"."""
        from tool_governance import check_rbac

        allowed, why = check_rbac("sofiia", "oncall_tool", "incident_create")
        assert allowed, f"sofiia CTO should have incident_write: {why}"

    def test_unknown_tool_allowed(self):
        """Tools not in matrix should not be blocked (no required entitlements)."""
        from tool_governance import check_rbac

        allowed, _ = check_rbac("sofiia", "some_unknown_tool", "action")
        assert allowed, "Tool not in matrix should pass (no requirements)"
# ─── 3. Middleware ────────────────────────────────────────────────────────────
class TestMiddleware:
    """Cover the middleware helpers: input limits, redaction and URL allowlists."""

    def test_max_chars_in_enforced(self):
        """A 600,000-character input is rejected for pr_reviewer_tool."""
        from tool_governance import check_input_limits

        # 600KB > pr_reviewer_tool 400KB limit
        payload = "x" * 600_000
        accepted, why, _limits = check_input_limits("pr_reviewer_tool", payload)
        assert not accepted
        assert "too large" in why.lower()

    def test_within_limit_passes(self):
        """A 100-character input is accepted for pr_reviewer_tool."""
        from tool_governance import check_input_limits

        payload = "x" * 100
        accepted, why, _limits = check_input_limits("pr_reviewer_tool", payload)
        assert accepted, f"Small input should pass: {why}"

    def test_redaction_api_key(self):
        """An api_key assignment has its value removed and marked REDACTED."""
        from tool_governance import redact

        scrubbed = redact('api_key = "sk-abc123def456xyz789"')
        assert "sk-abc123def456xyz789" not in scrubbed
        assert "REDACTED" in scrubbed

    def test_redaction_password(self):
        """A password assignment no longer contains the secret value."""
        from tool_governance import redact

        scrubbed = redact("password = 'super_secret_123'")
        assert "super_secret_123" not in scrubbed

    def test_redaction_empty_string(self):
        """Empty string and None both pass through redact unchanged."""
        from tool_governance import redact

        assert redact("") == ""
        assert redact(None) is None

    def test_allowlist_blocks_unknown_host(self):
        """oncall_tool may not reach a host outside its allowlist."""
        from tool_governance import check_url_allowed

        permitted, why = check_url_allowed(
            "oncall_tool", "http://evil.example.com/steal"
        )
        assert not permitted
        # Either wording of the denial reason is accepted.
        lowered = why.lower()
        assert any(
            marker in lowered for marker in ("not in allowlist", "no allowlist")
        )

    def test_allowlist_allows_localhost(self):
        """oncall_tool may reach a localhost URL."""
        from tool_governance import check_url_allowed

        permitted, why = check_url_allowed(
            "oncall_tool", "http://localhost:9102/health"
        )
        assert permitted, f"localhost should be allowed: {why}"

    def test_allowlist_blocks_private_ip_for_web_extract(self):
        """web_extract may not reach a 192.168.x.x address."""
        from tool_governance import check_url_allowed

        permitted, _why = check_url_allowed("web_extract", "https://192.168.1.1/admin")
        assert not permitted, "Private IP should be blocked for web_extract"

    def test_allowlist_allows_public_url_for_web_extract(self):
        """web_extract may reach an ordinary public URL."""
        from tool_governance import check_url_allowed

        # web_extract has allow_any_public = true
        permitted, why = check_url_allowed("web_extract", "https://example.com/page")
        assert permitted, f"Public URL should be allowed for web_extract: {why}"
class TestPreCallGovernance:
    """Check the decision object returned by ToolGovernance.pre_call."""

    def test_rbac_denial_in_pre_call(self):
        """An unknown agent requesting the "gate" action is denied citing RBAC."""
        from tool_governance import ToolGovernance

        decision = ToolGovernance().pre_call(
            tool="pr_reviewer_tool",
            action="gate",
            agent_id="brand_new_agent_xyz",
        )
        assert not decision.allowed
        assert "RBAC" in decision.reason

    def test_allowed_call_returns_context(self):
        """A permitted call carries a call context with a truthy req_id."""
        from tool_governance import ToolGovernance

        decision = ToolGovernance().pre_call(
            tool="repo_tool",
            action="read",
            agent_id="sofiia",
        )
        assert decision.allowed
        assert decision.call_ctx is not None
        assert decision.call_ctx.req_id

    def test_limits_denial_in_pre_call(self):
        """A 600,000-character input is denied with a "Limits exceeded" reason."""
        from tool_governance import ToolGovernance

        oversized = "x" * 600_000
        decision = ToolGovernance().pre_call(
            tool="pr_reviewer_tool",
            action="review",
            agent_id="sofiia",
            input_text=oversized,
        )
        assert not decision.allowed
        assert "Limits exceeded" in decision.reason
# ─── 4. release_check ─────────────────────────────────────────────────────────
class FakeToolResult:
    """Bare-bones stand-in for a tool result.

    Stores the three constructor arguments unchanged as the attributes
    ``success``, ``result`` and ``error``.
    """

    def __init__(self, success, result=None, error=None):
        self.success, self.result, self.error = success, result, error
class MockToolManager:
    """Tool-manager double that replays canned results for release_check_runner tests."""

    def __init__(self, responses: dict):
        # Mapping of tool_name → result returned verbatim by execute_tool.
        self._responses = responses

    async def execute_tool(self, tool_name, arguments, agent_id=None, **kwargs):
        """Return the canned result for *tool_name*.

        Tools without a canned entry get a successful FakeToolResult with an
        empty result dict. ``arguments``, ``agent_id`` and any extra keyword
        arguments are accepted and ignored.
        """
        if tool_name not in self._responses:
            return FakeToolResult(success=True, result={})
        return self._responses[tool_name]
class TestReleaseCheck:
    """Drive run_release_check with canned tool results and inspect its report.

    Each test calls ``run_release_check(tool_manager, inputs, "sofiia")`` and
    reads the returned report's "pass" flag and "gates" list.
    """

    @staticmethod
    def _ok(**payload):
        """Build a successful FakeToolResult whose ``result`` dict is *payload*."""
        return FakeToolResult(success=True, result=payload)

    def _run(self, coro):
        """Run *coro* to completion with asyncio.run and return its result."""
        return asyncio.run(coro)

    def test_fixture_diff_with_secret_fails_pr_review(self):
        """Diff containing a secret → pr_review gate fail → overall fail."""
        from release_check_runner import run_release_check

        # Simulate pr_reviewer_tool returning blocking issues
        canned = {
            "pr_reviewer_tool": self._ok(
                blocking_count=1, summary="Secret detected", score=0
            ),
            "config_linter_tool": self._ok(blocking_count=0, total_findings=0),
            "threatmodel_tool": self._ok(unmitigated_high_count=0),
        }
        payload = {
            "service_name": "router",
            "diff_text": 'api_key = "sk-abc123def456xyz789"\n+password="secret"',
            "risk_profile": "default",
        }
        report = self._run(
            run_release_check(MockToolManager(canned), payload, "sofiia")
        )
        assert report["pass"] is False
        pr_gate = next(gate for gate in report["gates"] if gate["name"] == "pr_review")
        assert pr_gate["status"] == "fail"

    def test_fixture_openapi_breaking_fails_contract(self):
        """Breaking OpenAPI change → contract_diff gate fail → overall fail."""
        from release_check_runner import run_release_check

        canned = {
            "pr_reviewer_tool": self._ok(blocking_count=0),
            "config_linter_tool": self._ok(blocking_count=0, total_findings=0),
            "contract_tool": self._ok(breaking_count=2, summary="endpoint removed"),
            "threatmodel_tool": self._ok(unmitigated_high_count=0),
        }
        payload = {
            "service_name": "router",
            "diff_text": "minor change",
            "openapi_base": "openapi: 3.0.0\npaths:\n /v1/health:\n get: {}\n /v1/agents:\n get: {}",
            "openapi_head": "openapi: 3.0.0\npaths:\n /v1/health:\n get: {}",
            "risk_profile": "default",
        }
        report = self._run(
            run_release_check(MockToolManager(canned), payload, "sofiia")
        )
        assert report["pass"] is False
        contract_gate = next(
            gate for gate in report["gates"] if gate["name"] == "contract_diff"
        )
        assert contract_gate["status"] == "fail"
        assert contract_gate["breaking_count"] == 2

    def test_pass_case(self):
        """All gates pass → overall pass."""
        from release_check_runner import run_release_check

        canned = {
            "pr_reviewer_tool": self._ok(blocking_count=0, summary="Clean"),
            "config_linter_tool": self._ok(blocking_count=0, total_findings=0),
            "contract_tool": self._ok(breaking_count=0),
            "threatmodel_tool": self._ok(unmitigated_high_count=0),
        }
        payload = {
            "service_name": "router",
            "diff_text": "+# simple comment\n",
            "openapi_base": "openapi: 3.0.0",
            "openapi_head": "openapi: 3.0.0",
            "risk_profile": "default",
        }
        report = self._run(
            run_release_check(MockToolManager(canned), payload, "sofiia")
        )
        assert report["pass"] is True
        acceptable = ("pass", "skipped")
        for gate in report["gates"]:
            assert gate["status"] in acceptable

    def test_no_diff_skips_pr_review(self):
        """Empty diff_text causes pr_review gate to be skipped."""
        from release_check_runner import run_release_check

        canned = {
            "config_linter_tool": self._ok(blocking_count=0, total_findings=0),
            "threatmodel_tool": self._ok(unmitigated_high_count=0),
        }
        payload = {"service_name": "router", "risk_profile": "default"}
        report = self._run(
            run_release_check(MockToolManager(canned), payload, "sofiia")
        )
        # The gate must still be reported, just with a "skipped" status.
        pr_gate = next(
            (gate for gate in report["gates"] if gate["name"] == "pr_review"), None
        )
        assert pr_gate is not None
        assert pr_gate["status"] == "skipped"

    def test_fail_fast_stops_at_first_failure(self):
        """fail_fast=true: stops after first failing gate."""
        from release_check_runner import run_release_check

        calls = 0

        class CountingMockTM:
            """Tool manager that counts calls; only pr_reviewer_tool reports a blocker."""

            async def execute_tool(self, tool_name, arguments, agent_id=None, **kwargs):
                nonlocal calls
                calls += 1
                blocking = 1 if tool_name == "pr_reviewer_tool" else 0
                return FakeToolResult(success=True, result={"blocking_count": blocking})

        payload = {
            "service_name": "router",
            "diff_text": "something",
            "fail_fast": True,
        }
        report = self._run(run_release_check(CountingMockTM(), payload, "sofiia"))
        assert report["pass"] is False
        # With fail_fast, should have stopped early (only 1 tool call)
        assert calls == 1

    def test_report_structure(self):
        """Report must contain pass, gates (a list), recommendations, summary, elapsed_ms."""
        from release_check_runner import run_release_check

        canned = {
            "pr_reviewer_tool": self._ok(blocking_count=0),
            "config_linter_tool": self._ok(blocking_count=0),
            "threatmodel_tool": self._ok(unmitigated_high_count=0),
        }
        report = self._run(
            run_release_check(MockToolManager(canned), {"service_name": "svc"}, "sofiia")
        )
        assert "pass" in report
        assert "gates" in report
        assert isinstance(report["gates"], list)
        for field in ("recommendations", "summary", "elapsed_ms"):
            assert field in report