New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
226 lines
8.2 KiB
Python
226 lines
8.2 KiB
Python
"""
|
|
Tests for release_check_graph.
|
|
|
|
Mocks the GatewayClient — no real network calls.
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
from tests.conftest import MockGatewayClient, _run
|
|
|
|
|
|
# Canned release-check report in which every gate passes. The tests below
# hand it back as the "result" payload of a mocked job_orchestrator_tool job.
RELEASE_CHECK_PASS_REPORT = {
    "pass": True,
    "gates": [
        {"name": "pr_review", "status": "pass"},
        {"name": "config_lint", "status": "pass"},
        {"name": "dependency_scan", "status": "pass"},
        {"name": "contract_diff", "status": "pass"},
    ],
    "recommendations": [],
    "summary": "All gates passed.",
    "elapsed_ms": 1200,
}


# Canned release-check report whose first gate (pr_review) fails. The tests
# below attach it to a mocked job that ends in the "failed" status.
RELEASE_CHECK_FAIL_REPORT = {
    "pass": False,
    "gates": [
        {"name": "pr_review", "status": "fail"},
        {"name": "config_lint", "status": "pass"},
    ],
    "recommendations": ["Fix PR review issues before release."],
    "summary": "PR review failed.",
    "elapsed_ms": 800,
}
|
|
|
|
|
|
class TestReleaseCheckGraphSuccess:
    """release_check_graph: job starts → job succeeds → returns pass=True."""

    @staticmethod
    def _invoke(mock_gw, state):
        """Run the release-check graph on *state* and return its final state.

        ``GatewayClient`` inside ``app.graphs.release_check_graph`` is patched
        so that constructing it yields *mock_gw*; no real gateway is contacted.
        """
        # Imported inside the helper so the graph module is only loaded when a
        # test actually executes.
        from app.graphs.release_check_graph import build_release_check_graph

        compiled = build_release_check_graph()
        with patch("app.graphs.release_check_graph.GatewayClient", return_value=mock_gw):
            return _run(compiled.ainvoke(state))

    def test_async_job_flow(self):
        """start_task returns job_id, then get_job returns succeeded."""
        mock_gw = MockGatewayClient()
        # start_task: returns a job that needs polling
        mock_gw.register("job_orchestrator_tool", "start_task", {
            "job_id": "j_test_001", "status": "running"
        })
        # First poll: still running
        mock_gw.register("job_orchestrator_tool", "get_job", {"status": "running"})
        # Second poll: succeeded with result
        mock_gw.register("job_orchestrator_tool", "get_job", {
            "status": "succeeded",
            "result": RELEASE_CHECK_PASS_REPORT,
        })

        initial_state = {
            "run_id": "gr_test_release_001",
            "agent_id": "sofiia",
            "workspace_id": "daarion",
            "user_id": "u_001",
            "input": {
                "service_name": "router",
                "fail_fast": True,
                "run_deps": True,
                "run_drift": True,
            },
        }

        final = self._invoke(mock_gw, initial_state)

        assert final["graph_status"] == "succeeded"
        assert final["result"]["pass"] is True
        assert final["result"]["summary"] == "All gates passed."

    def test_synchronous_job_completion(self):
        """start_task returns result immediately (no polling needed)."""
        mock_gw = MockGatewayClient()
        mock_gw.register("job_orchestrator_tool", "start_task", {
            "job_id": "j_sync_001",
            "status": "succeeded",
            "result": RELEASE_CHECK_PASS_REPORT,
        })

        final = self._invoke(mock_gw, {
            "run_id": "gr_sync_001",
            "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u_001",
            "input": {"service_name": "router"},
        })

        assert final["graph_status"] == "succeeded"
        assert final["result"]["pass"] is True
        # Only one call made (no polling)
        tool_calls = [c for c in mock_gw.calls if c["tool"] == "job_orchestrator_tool"]
        assert len(tool_calls) == 1
|
|
|
|
|
|
class TestReleaseCheckGraphFail:
    """release_check_graph: job fails → pass=False with error."""

    @staticmethod
    def _invoke(mock_gw, state):
        """Run the release-check graph on *state* and return its final state.

        ``GatewayClient`` inside ``app.graphs.release_check_graph`` is patched
        so that constructing it yields *mock_gw*; no real gateway is contacted.
        """
        # Imported inside the helper so the graph module is only loaded when a
        # test actually executes.
        from app.graphs.release_check_graph import build_release_check_graph

        compiled = build_release_check_graph()
        with patch("app.graphs.release_check_graph.GatewayClient", return_value=mock_gw):
            return _run(compiled.ainvoke(state))

    def test_job_fails(self):
        """get_job returns failed → result.pass=False."""
        mock_gw = MockGatewayClient()
        mock_gw.register("job_orchestrator_tool", "start_task", {
            "job_id": "j_fail_001", "status": "running"
        })
        mock_gw.register("job_orchestrator_tool", "get_job", {
            "status": "failed",
            "error": "PR review failed",
            "result": RELEASE_CHECK_FAIL_REPORT,
        })

        final = self._invoke(mock_gw, {
            "run_id": "gr_fail_001",
            "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u_001",
            "input": {"service_name": "router"},
        })

        # NOTE(review): only graph_status is verified here; the docstring's
        # result.pass=False expectation is not asserted — confirm whether the
        # graph guarantees it before tightening this test.
        assert final["graph_status"] == "failed"

    def test_start_task_gateway_error(self):
        """Gateway returns error on start_task → graph fails gracefully."""
        mock_gw = MockGatewayClient()
        mock_gw.register("job_orchestrator_tool", "start_task",
                         None, error="RBAC denied: tools.jobs.run not found")

        final = self._invoke(mock_gw, {
            "run_id": "gr_err_001",
            "agent_id": "nobody", "workspace_id": "w", "user_id": "u",
            "input": {},
        })

        assert final["graph_status"] == "failed"
        # `or ""` keeps the membership test valid when "error" is absent or None.
        assert "start_task failed" in (final.get("error") or "")

    def test_finalize_produces_valid_report(self):
        """Even on failure, finalize returns a valid report structure."""
        mock_gw = MockGatewayClient()
        mock_gw.register("job_orchestrator_tool", "start_task",
                         None, error="timeout")

        final = self._invoke(mock_gw, {
            "run_id": "gr_fin_001",
            "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u",
            "input": {},
        })

        result = final.get("result")
        assert result is not None
        assert "pass" in result
        assert "summary" in result
|
|
|
|
|
|
# ─── Correlation IDs test ─────────────────────────────────────────────────────
|
|
|
|
class TestCorrelationIds:
    """Every tool call must carry graph_run_id in metadata."""

    @staticmethod
    def _invoke(mock_gw, state):
        """Run the release-check graph on *state* and return its final state.

        ``GatewayClient`` inside ``app.graphs.release_check_graph`` is patched
        so that constructing it yields *mock_gw*; no real gateway is contacted.
        """
        # Imported inside the helper so the graph module is only loaded when a
        # test actually executes.
        from app.graphs.release_check_graph import build_release_check_graph

        compiled = build_release_check_graph()
        with patch("app.graphs.release_check_graph.GatewayClient", return_value=mock_gw):
            return _run(compiled.ainvoke(state))

    def test_all_calls_have_run_id(self):
        """Each recorded gateway call carries the run_id of the invoking graph run."""
        run_id = "gr_correlation_test_001"
        mock_gw = MockGatewayClient()
        mock_gw.register("job_orchestrator_tool", "start_task", {
            "job_id": "j_corr_001", "status": "succeeded",
            "result": RELEASE_CHECK_PASS_REPORT,
        })

        self._invoke(mock_gw, {
            "run_id": run_id,
            "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u",
            "input": {"service_name": "router"},
        })

        # Guard against a vacuous pass: with no recorded calls the loop below
        # would check nothing.
        assert mock_gw.calls, "expected at least one gateway call to be recorded"
        for call in mock_gw.calls:
            assert call["graph_run_id"] == run_id, (
                f"Call {call['tool']}:{call['action']} missing graph_run_id"
            )

    def test_graph_node_included_in_calls(self):
        """Each call should have a non-empty graph_node."""
        mock_gw = MockGatewayClient()
        mock_gw.register("job_orchestrator_tool", "start_task", {
            "job_id": "j_node_001", "status": "succeeded",
            "result": RELEASE_CHECK_PASS_REPORT,
        })

        self._invoke(mock_gw, {
            "run_id": "gr_node_001",
            "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u",
            "input": {},
        })

        # Guard against a vacuous pass: with no recorded calls the loop below
        # would check nothing.
        assert mock_gw.calls, "expected at least one gateway call to be recorded"
        for call in mock_gw.calls:
            assert call["graph_node"], f"Call missing graph_node: {call}"
|