New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
406 lines
16 KiB
Python
406 lines
16 KiB
Python
"""
|
|
Tests for Tool Governance System:

1. default_tools merge: agent without explicit tools gets read tools
2. RBAC matrix enforcement: deny without entitlement
3. Middleware:
   - max_chars_in enforced
   - redaction applied
   - allowlist blocks unknown host
4. release_check:
   - fixture diff with secret → gate fail
   - fixture openapi breaking → gate fail
   - pass case → gate pass
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import pytest
|
|
|
|
# Add services/router to path for imports.
# The directory of this file is resolved once, as an absolute path, so both
# entries stay valid even if the working directory changes after import.
_TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(_TESTS_DIR, "..", "services", "router"))
# The parent directory is inserted last, so it ends up first on sys.path.
sys.path.insert(0, os.path.join(_TESTS_DIR, ".."))
|
|
|
|
|
|
# ─── 1. default_tools merge ───────────────────────────────────────────────────
|
|
|
|
class TestDefaultToolsMerge:
|
|
"""Agent without explicit tools should get read tools automatically."""
|
|
|
|
def test_unknown_agent_gets_read_tools(self):
|
|
from agent_tools_config import get_agent_tools, reload_rollout_config
|
|
reload_rollout_config()
|
|
tools = get_agent_tools("brand_new_agent_xyz")
|
|
# Must include baseline read tools
|
|
for expected in ["repo_tool", "kb_tool", "oncall_tool", "observability_tool"]:
|
|
assert expected in tools, f"Missing {expected} in tools for unknown agent"
|
|
|
|
def test_sofiia_gets_cto_tools(self):
|
|
from agent_tools_config import get_agent_tools, reload_rollout_config
|
|
reload_rollout_config()
|
|
tools = get_agent_tools("sofiia")
|
|
for expected in [
|
|
"pr_reviewer_tool", "contract_tool", "config_linter_tool",
|
|
"threatmodel_tool", "job_orchestrator_tool",
|
|
]:
|
|
assert expected in tools, f"Sofiia missing CTO tool: {expected}"
|
|
|
|
def test_sofiia_gets_media_tools(self):
|
|
from agent_tools_config import get_agent_tools, reload_rollout_config
|
|
reload_rollout_config()
|
|
tools = get_agent_tools("sofiia")
|
|
assert "comfy_generate_image" in tools
|
|
assert "comfy_generate_video" in tools
|
|
|
|
def test_helion_gets_read_tools(self):
|
|
from agent_tools_config import get_agent_tools, reload_rollout_config
|
|
reload_rollout_config()
|
|
tools = get_agent_tools("helion")
|
|
for expected in ["repo_tool", "kb_tool", "oncall_tool", "job_orchestrator_tool"]:
|
|
assert expected in tools
|
|
|
|
def test_no_duplicates(self):
|
|
from agent_tools_config import get_agent_tools, reload_rollout_config
|
|
reload_rollout_config()
|
|
tools = get_agent_tools("sofiia")
|
|
assert len(tools) == len(set(tools)), "Duplicate tools in list"
|
|
|
|
def test_stable_order(self):
|
|
"""get_agent_tools should return same order on repeated calls."""
|
|
from agent_tools_config import get_agent_tools
|
|
t1 = get_agent_tools("sofiia")
|
|
t2 = get_agent_tools("sofiia")
|
|
assert t1 == t2
|
|
|
|
def test_get_agent_role(self):
|
|
from agent_tools_config import get_agent_role
|
|
assert get_agent_role("sofiia") == "agent_cto"
|
|
assert get_agent_role("helion") == "agent_oncall"
|
|
assert get_agent_role("unknown_xyz") == "agent_default"
|
|
|
|
|
|
# ─── 2. RBAC Matrix Enforcement ───────────────────────────────────────────────
|
|
|
|
class TestRBACMatrix:
|
|
"""Tool dispatch should deny calls without required entitlements."""
|
|
|
|
def test_agent_cto_has_pr_review(self):
|
|
from tool_governance import check_rbac
|
|
ok, reason = check_rbac("sofiia", "pr_reviewer_tool", "review")
|
|
assert ok, f"sofiia should have pr_review.use: {reason}"
|
|
|
|
def test_agent_default_denied_pr_review_gate(self):
|
|
"""agent_default role does NOT have tools.pr_review.gate entitlement."""
|
|
from tool_governance import check_rbac
|
|
# Use a new/unknown agent that gets agent_default role
|
|
ok, reason = check_rbac("brand_new_agent_xyz", "pr_reviewer_tool", "gate")
|
|
assert not ok, "Default agent should NOT have gate entitlement"
|
|
assert "Missing entitlements" in reason
|
|
|
|
def test_agent_default_allowed_repo_read(self):
|
|
from tool_governance import check_rbac
|
|
ok, reason = check_rbac("brand_new_agent_xyz", "repo_tool", "read")
|
|
assert ok, f"Default agent should have repo.read: {reason}"
|
|
|
|
def test_agent_default_denied_incident_write(self):
|
|
from tool_governance import check_rbac
|
|
ok, reason = check_rbac("brand_new_agent_xyz", "oncall_tool", "incident_create")
|
|
assert not ok, "Default agent should NOT have incident_write"
|
|
|
|
def test_cto_allowed_incident_write(self):
|
|
from tool_governance import check_rbac
|
|
ok, reason = check_rbac("sofiia", "oncall_tool", "incident_create")
|
|
assert ok, f"sofiia CTO should have incident_write: {reason}"
|
|
|
|
def test_unknown_tool_allowed(self):
|
|
"""Tools not in matrix should not be blocked (no required entitlements)."""
|
|
from tool_governance import check_rbac
|
|
ok, _ = check_rbac("sofiia", "some_unknown_tool", "action")
|
|
assert ok, "Tool not in matrix should pass (no requirements)"
|
|
|
|
|
|
# ─── 3. Middleware ────────────────────────────────────────────────────────────
|
|
|
|
class TestMiddleware:
|
|
|
|
def test_max_chars_in_enforced(self):
|
|
from tool_governance import check_input_limits
|
|
oversized = "x" * 600_000 # 600KB > pr_reviewer_tool 400KB limit
|
|
ok, reason, limits = check_input_limits("pr_reviewer_tool", oversized)
|
|
assert not ok
|
|
assert "too large" in reason.lower()
|
|
|
|
def test_within_limit_passes(self):
|
|
from tool_governance import check_input_limits
|
|
small = "x" * 100
|
|
ok, reason, limits = check_input_limits("pr_reviewer_tool", small)
|
|
assert ok, f"Small input should pass: {reason}"
|
|
|
|
def test_redaction_api_key(self):
|
|
from tool_governance import redact
|
|
text = 'api_key = "sk-abc123def456xyz789"'
|
|
result = redact(text)
|
|
assert "sk-abc123def456xyz789" not in result
|
|
assert "REDACTED" in result
|
|
|
|
def test_redaction_password(self):
|
|
from tool_governance import redact
|
|
text = "password = 'super_secret_123'"
|
|
result = redact(text)
|
|
assert "super_secret_123" not in result
|
|
|
|
def test_redaction_empty_string(self):
|
|
from tool_governance import redact
|
|
assert redact("") == ""
|
|
assert redact(None) is None
|
|
|
|
def test_allowlist_blocks_unknown_host(self):
|
|
from tool_governance import check_url_allowed
|
|
ok, reason = check_url_allowed("oncall_tool", "http://evil.example.com/steal")
|
|
assert not ok
|
|
assert "not in allowlist" in reason.lower() or "no allowlist" in reason.lower()
|
|
|
|
def test_allowlist_allows_localhost(self):
|
|
from tool_governance import check_url_allowed
|
|
ok, reason = check_url_allowed("oncall_tool", "http://localhost:9102/health")
|
|
assert ok, f"localhost should be allowed: {reason}"
|
|
|
|
def test_allowlist_blocks_private_ip_for_web_extract(self):
|
|
from tool_governance import check_url_allowed
|
|
ok, reason = check_url_allowed("web_extract", "https://192.168.1.1/admin")
|
|
assert not ok, "Private IP should be blocked for web_extract"
|
|
|
|
def test_allowlist_allows_public_url_for_web_extract(self):
|
|
from tool_governance import check_url_allowed
|
|
# web_extract has allow_any_public = true
|
|
ok, reason = check_url_allowed("web_extract", "https://example.com/page")
|
|
assert ok, f"Public URL should be allowed for web_extract: {reason}"
|
|
|
|
|
|
class TestPreCallGovernance:
|
|
"""Test the pre_call method of ToolGovernance."""
|
|
|
|
def test_rbac_denial_in_pre_call(self):
|
|
from tool_governance import ToolGovernance
|
|
gov = ToolGovernance()
|
|
pre = gov.pre_call(
|
|
tool="pr_reviewer_tool",
|
|
action="gate",
|
|
agent_id="brand_new_agent_xyz",
|
|
)
|
|
assert not pre.allowed
|
|
assert "RBAC" in pre.reason
|
|
|
|
def test_allowed_call_returns_context(self):
|
|
from tool_governance import ToolGovernance
|
|
gov = ToolGovernance()
|
|
pre = gov.pre_call(
|
|
tool="repo_tool",
|
|
action="read",
|
|
agent_id="sofiia",
|
|
)
|
|
assert pre.allowed
|
|
assert pre.call_ctx is not None
|
|
assert pre.call_ctx.req_id
|
|
|
|
def test_limits_denial_in_pre_call(self):
|
|
from tool_governance import ToolGovernance
|
|
gov = ToolGovernance()
|
|
big_input = "x" * 600_000
|
|
pre = gov.pre_call(
|
|
tool="pr_reviewer_tool",
|
|
action="review",
|
|
agent_id="sofiia",
|
|
input_text=big_input,
|
|
)
|
|
assert not pre.allowed
|
|
assert "Limits exceeded" in pre.reason
|
|
|
|
|
|
# ─── 4. release_check ─────────────────────────────────────────────────────────
|
|
|
|
class FakeToolResult:
    """Stand-in for a tool execution result, used by the release_check tests.

    Attributes:
        success: Whether the simulated tool call succeeded.
        result: Payload of the simulated call; the tests in this file pass
            dicts of gate counters, or leave it as ``None``.
        error: Error value of the simulated call, ``None`` when not given.
    """

    def __init__(self, success, result=None, error=None):
        self.success = success
        self.result = result
        self.error = error

    def __repr__(self):
        # Makes failing assertions on mocked results readable.
        return (
            f"{type(self).__name__}(success={self.success!r}, "
            f"result={self.result!r}, error={self.error!r})"
        )
|
|
|
|
|
|
class MockToolManager:
    """Minimal mock for release_check_runner tests."""

    def __init__(self, responses: dict):
        """Store the canned responses, keyed by tool name."""
        self._responses = responses  # tool_name → result

    async def execute_tool(self, tool_name, arguments, agent_id=None, **kwargs):
        """Return the canned response for *tool_name*.

        ``arguments``, ``agent_id`` and any extra keyword arguments are
        accepted but ignored. A tool with no canned response gets a
        successful ``FakeToolResult`` with an empty dict as its result.
        """
        try:
            return self._responses[tool_name]
        except KeyError:
            # Built only on a miss, so a canned response is returned as-is.
            return FakeToolResult(success=True, result={})
|
|
|
|
|
|
class TestReleaseCheck:
|
|
|
|
def _run(self, coro):
|
|
return asyncio.run(coro)
|
|
|
|
def test_fixture_diff_with_secret_fails_pr_review(self):
|
|
"""Diff containing a secret → pr_review gate fail → overall fail."""
|
|
from release_check_runner import run_release_check
|
|
|
|
# Simulate pr_reviewer_tool returning blocking issues
|
|
tm = MockToolManager({
|
|
"pr_reviewer_tool": FakeToolResult(
|
|
success=True,
|
|
result={"blocking_count": 1, "summary": "Secret detected", "score": 0},
|
|
),
|
|
"config_linter_tool": FakeToolResult(
|
|
success=True,
|
|
result={"blocking_count": 0, "total_findings": 0},
|
|
),
|
|
"threatmodel_tool": FakeToolResult(
|
|
success=True,
|
|
result={"unmitigated_high_count": 0},
|
|
),
|
|
})
|
|
|
|
inputs = {
|
|
"service_name": "router",
|
|
"diff_text": 'api_key = "sk-abc123def456xyz789"\n+password="secret"',
|
|
"risk_profile": "default",
|
|
}
|
|
report = self._run(run_release_check(tm, inputs, "sofiia"))
|
|
|
|
assert report["pass"] is False
|
|
pr_gate = next(g for g in report["gates"] if g["name"] == "pr_review")
|
|
assert pr_gate["status"] == "fail"
|
|
|
|
def test_fixture_openapi_breaking_fails_contract(self):
|
|
"""Breaking OpenAPI change → contract_diff gate fail → overall fail."""
|
|
from release_check_runner import run_release_check
|
|
|
|
tm = MockToolManager({
|
|
"pr_reviewer_tool": FakeToolResult(
|
|
success=True, result={"blocking_count": 0},
|
|
),
|
|
"config_linter_tool": FakeToolResult(
|
|
success=True, result={"blocking_count": 0, "total_findings": 0},
|
|
),
|
|
"contract_tool": FakeToolResult(
|
|
success=True,
|
|
result={"breaking_count": 2, "summary": "endpoint removed"},
|
|
),
|
|
"threatmodel_tool": FakeToolResult(
|
|
success=True, result={"unmitigated_high_count": 0},
|
|
),
|
|
})
|
|
|
|
inputs = {
|
|
"service_name": "router",
|
|
"diff_text": "minor change",
|
|
"openapi_base": "openapi: 3.0.0\npaths:\n /v1/health:\n get: {}\n /v1/agents:\n get: {}",
|
|
"openapi_head": "openapi: 3.0.0\npaths:\n /v1/health:\n get: {}",
|
|
"risk_profile": "default",
|
|
}
|
|
report = self._run(run_release_check(tm, inputs, "sofiia"))
|
|
|
|
assert report["pass"] is False
|
|
contract_gate = next(g for g in report["gates"] if g["name"] == "contract_diff")
|
|
assert contract_gate["status"] == "fail"
|
|
assert contract_gate["breaking_count"] == 2
|
|
|
|
def test_pass_case(self):
|
|
"""All gates pass → overall pass."""
|
|
from release_check_runner import run_release_check
|
|
|
|
tm = MockToolManager({
|
|
"pr_reviewer_tool": FakeToolResult(
|
|
success=True, result={"blocking_count": 0, "summary": "Clean"},
|
|
),
|
|
"config_linter_tool": FakeToolResult(
|
|
success=True, result={"blocking_count": 0, "total_findings": 0},
|
|
),
|
|
"contract_tool": FakeToolResult(
|
|
success=True, result={"breaking_count": 0},
|
|
),
|
|
"threatmodel_tool": FakeToolResult(
|
|
success=True, result={"unmitigated_high_count": 0},
|
|
),
|
|
})
|
|
|
|
inputs = {
|
|
"service_name": "router",
|
|
"diff_text": "+# simple comment\n",
|
|
"openapi_base": "openapi: 3.0.0",
|
|
"openapi_head": "openapi: 3.0.0",
|
|
"risk_profile": "default",
|
|
}
|
|
report = self._run(run_release_check(tm, inputs, "sofiia"))
|
|
|
|
assert report["pass"] is True
|
|
for gate in report["gates"]:
|
|
assert gate["status"] in ("pass", "skipped")
|
|
|
|
def test_no_diff_skips_pr_review(self):
|
|
"""Empty diff_text causes pr_review gate to be skipped."""
|
|
from release_check_runner import run_release_check
|
|
|
|
tm = MockToolManager({
|
|
"config_linter_tool": FakeToolResult(
|
|
success=True, result={"blocking_count": 0, "total_findings": 0},
|
|
),
|
|
"threatmodel_tool": FakeToolResult(
|
|
success=True, result={"unmitigated_high_count": 0},
|
|
),
|
|
})
|
|
|
|
inputs = {"service_name": "router", "risk_profile": "default"}
|
|
report = self._run(run_release_check(tm, inputs, "sofiia"))
|
|
|
|
pr_gate = next((g for g in report["gates"] if g["name"] == "pr_review"), None)
|
|
assert pr_gate is not None
|
|
assert pr_gate["status"] == "skipped"
|
|
|
|
def test_fail_fast_stops_at_first_failure(self):
|
|
"""fail_fast=true: stops after first failing gate."""
|
|
from release_check_runner import run_release_check
|
|
|
|
call_count = {"n": 0}
|
|
|
|
class CountingMockTM:
|
|
async def execute_tool(self, tool_name, arguments, agent_id=None, **kwargs):
|
|
call_count["n"] += 1
|
|
if tool_name == "pr_reviewer_tool":
|
|
return FakeToolResult(success=True, result={"blocking_count": 1})
|
|
return FakeToolResult(success=True, result={"blocking_count": 0})
|
|
|
|
inputs = {
|
|
"service_name": "router",
|
|
"diff_text": "something",
|
|
"fail_fast": True,
|
|
}
|
|
report = asyncio.run(
|
|
run_release_check(CountingMockTM(), inputs, "sofiia")
|
|
)
|
|
|
|
assert report["pass"] is False
|
|
# With fail_fast, should have stopped early (only 1 tool call)
|
|
assert call_count["n"] == 1
|
|
|
|
def test_report_structure(self):
|
|
"""Report must contain pass, gates, recommendations, summary."""
|
|
from release_check_runner import run_release_check
|
|
|
|
tm = MockToolManager({
|
|
"pr_reviewer_tool": FakeToolResult(success=True, result={"blocking_count": 0}),
|
|
"config_linter_tool": FakeToolResult(success=True, result={"blocking_count": 0}),
|
|
"threatmodel_tool": FakeToolResult(success=True, result={"unmitigated_high_count": 0}),
|
|
})
|
|
report = self._run(run_release_check(tm, {"service_name": "svc"}, "sofiia"))
|
|
|
|
assert "pass" in report
|
|
assert "gates" in report
|
|
assert isinstance(report["gates"], list)
|
|
assert "recommendations" in report
|
|
assert "summary" in report
|
|
assert "elapsed_ms" in report
|