Files
microdao-daarion/services/sofiia-supervisor/tests/test_release_check_graph.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

226 lines
8.2 KiB
Python

"""
Tests for release_check_graph.
Mocks the GatewayClient — no real network calls.
"""
import asyncio
import sys
from pathlib import Path
from unittest.mock import patch
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from tests.conftest import MockGatewayClient, _run
# Canned job result used by the tests for a release check where every gate passes.
RELEASE_CHECK_PASS_REPORT = {
    "pass": True,
    "gates": [
        {"name": gate, "status": "pass"}
        for gate in ("pr_review", "config_lint", "dependency_scan", "contract_diff")
    ],
    "recommendations": [],
    "summary": "All gates passed.",
    "elapsed_ms": 1200,
}
# Canned job result used by the tests for a release check whose PR review gate fails.
RELEASE_CHECK_FAIL_REPORT = {
    "pass": False,
    "gates": [
        {"name": gate, "status": outcome}
        for gate, outcome in (("pr_review", "fail"), ("config_lint", "pass"))
    ],
    "recommendations": ["Fix PR review issues before release."],
    "summary": "PR review failed.",
    "elapsed_ms": 800,
}
class TestReleaseCheckGraphSuccess:
    """Happy-path runs of release_check_graph that finish with pass=True."""

    @staticmethod
    def _invoke(gateway, state):
        """Build the graph and run it on ``state`` with GatewayClient patched.

        The graph is compiled before the patch is applied; while the graph
        runs, ``GatewayClient(...)`` in the graph module returns ``gateway``.
        """
        from app.graphs.release_check_graph import build_release_check_graph

        graph = build_release_check_graph()
        with patch("app.graphs.release_check_graph.GatewayClient", return_value=gateway):
            return _run(graph.ainvoke(state))

    def test_async_job_flow(self):
        """start_task yields a running job; a later get_job reports success."""
        tool = "job_orchestrator_tool"
        gateway = MockGatewayClient()

        # start_task answers with a job that still has to be polled.
        gateway.register(tool, "start_task", {"job_id": "j_test_001", "status": "running"})

        # Poll responses, registered in order: still running, then succeeded.
        poll_responses = (
            {"status": "running"},
            {"status": "succeeded", "result": RELEASE_CHECK_PASS_REPORT},
        )
        for response in poll_responses:
            gateway.register(tool, "get_job", response)

        request = {
            "service_name": "router",
            "fail_fast": True,
            "run_deps": True,
            "run_drift": True,
        }
        state = {
            "run_id": "gr_test_release_001",
            "agent_id": "sofiia",
            "workspace_id": "daarion",
            "user_id": "u_001",
            "input": request,
        }

        outcome = self._invoke(gateway, state)

        assert outcome["graph_status"] == "succeeded"
        report = outcome["result"]
        assert report["pass"] is True
        assert report["summary"] == "All gates passed."

    def test_synchronous_job_completion(self):
        """start_task already carries the result, so no polling call is made."""
        gateway = MockGatewayClient()
        immediate = {
            "job_id": "j_sync_001",
            "status": "succeeded",
            "result": RELEASE_CHECK_PASS_REPORT,
        }
        gateway.register("job_orchestrator_tool", "start_task", immediate)

        state = {
            "run_id": "gr_sync_001",
            "agent_id": "sofiia",
            "workspace_id": "daarion",
            "user_id": "u_001",
            "input": {"service_name": "router"},
        }
        outcome = self._invoke(gateway, state)

        assert outcome["graph_status"] == "succeeded"
        assert outcome["result"]["pass"] is True

        # Exactly one orchestrator call was recorded, i.e. nothing was polled.
        orchestrator_calls = sum(
            1 for call in gateway.calls if call["tool"] == "job_orchestrator_tool"
        )
        assert orchestrator_calls == 1
class TestReleaseCheckGraphFail:
    """Failure paths of release_check_graph: failed jobs and gateway errors."""

    @staticmethod
    def _invoke(gateway, state):
        """Build the graph and run it on ``state`` with GatewayClient patched.

        The graph is compiled before the patch is applied; while the graph
        runs, ``GatewayClient(...)`` in the graph module returns ``gateway``.
        """
        from app.graphs.release_check_graph import build_release_check_graph

        graph = build_release_check_graph()
        with patch("app.graphs.release_check_graph.GatewayClient", return_value=gateway):
            return _run(graph.ainvoke(state))

    def test_job_fails(self):
        """get_job reports a failed job, so graph_status ends up "failed"."""
        tool = "job_orchestrator_tool"
        gateway = MockGatewayClient()
        gateway.register(tool, "start_task", {"job_id": "j_fail_001", "status": "running"})
        failed_job = {
            "status": "failed",
            "error": "PR review failed",
            "result": RELEASE_CHECK_FAIL_REPORT,
        }
        gateway.register(tool, "get_job", failed_job)

        state = {
            "run_id": "gr_fail_001",
            "agent_id": "sofiia",
            "workspace_id": "daarion",
            "user_id": "u_001",
            "input": {"service_name": "router"},
        }
        outcome = self._invoke(gateway, state)

        assert outcome["graph_status"] == "failed"

    def test_start_task_gateway_error(self):
        """A gateway error on start_task makes the graph fail with an error text."""
        gateway = MockGatewayClient()
        gateway.register(
            "job_orchestrator_tool",
            "start_task",
            None,
            error="RBAC denied: tools.jobs.run not found",
        )

        state = {
            "run_id": "gr_err_001",
            "agent_id": "nobody",
            "workspace_id": "w",
            "user_id": "u",
            "input": {},
        }
        outcome = self._invoke(gateway, state)

        assert outcome["graph_status"] == "failed"
        # A missing or None error is treated as an empty string for the check.
        error_text = outcome.get("error") or ""
        assert "start_task failed" in error_text

    def test_finalize_produces_valid_report(self):
        """The final state still carries a report with "pass" and "summary"."""
        gateway = MockGatewayClient()
        gateway.register("job_orchestrator_tool", "start_task", None, error="timeout")

        state = {
            "run_id": "gr_fin_001",
            "agent_id": "sofiia",
            "workspace_id": "daarion",
            "user_id": "u",
            "input": {},
        }
        outcome = self._invoke(gateway, state)

        report = outcome.get("result")
        assert report is not None
        for required_key in ("pass", "summary"):
            assert required_key in report
# ─── Correlation IDs test ─────────────────────────────────────────────────────
class TestCorrelationIds:
    """Every recorded tool call must carry the graph correlation metadata."""

    @staticmethod
    def _invoke(gateway, state):
        """Build the graph and run it on ``state`` with GatewayClient patched.

        The graph is compiled before the patch is applied; while the graph
        runs, ``GatewayClient(...)`` in the graph module returns ``gateway``.
        """
        from app.graphs.release_check_graph import build_release_check_graph

        graph = build_release_check_graph()
        with patch("app.graphs.release_check_graph.GatewayClient", return_value=gateway):
            return _run(graph.ainvoke(state))

    def test_all_calls_have_run_id(self):
        """Each recorded call's graph_run_id equals the run_id of the state."""
        run_id = "gr_correlation_test_001"
        mock_gw = MockGatewayClient()
        mock_gw.register("job_orchestrator_tool", "start_task", {
            "job_id": "j_corr_001",
            "status": "succeeded",
            "result": RELEASE_CHECK_PASS_REPORT,
        })

        self._invoke(mock_gw, {
            "run_id": run_id,
            "agent_id": "sofiia",
            "workspace_id": "daarion",
            "user_id": "u",
            "input": {"service_name": "router"},
        })

        # Without this guard the loop below would pass vacuously if the graph
        # never reached the gateway at all.
        assert mock_gw.calls, "Graph made no gateway calls; nothing to verify"
        for call in mock_gw.calls:
            assert call["graph_run_id"] == run_id, (
                f"Call {call['tool']}:{call['action']} missing graph_run_id"
            )

    def test_graph_node_included_in_calls(self):
        """Each call should have a non-empty graph_node."""
        mock_gw = MockGatewayClient()
        mock_gw.register("job_orchestrator_tool", "start_task", {
            "job_id": "j_node_001",
            "status": "succeeded",
            "result": RELEASE_CHECK_PASS_REPORT,
        })

        self._invoke(mock_gw, {
            "run_id": "gr_node_001",
            "agent_id": "sofiia",
            "workspace_id": "daarion",
            "user_id": "u",
            "input": {},
        })

        # Same guard as above: an empty call log must not count as success.
        assert mock_gw.calls, "Graph made no gateway calls; nothing to verify"
        for call in mock_gw.calls:
            assert call["graph_node"], f"Call missing graph_node: {call}"