""" Tests for release_check_graph. Mocks the GatewayClient — no real network calls. """ import asyncio import sys from pathlib import Path from unittest.mock import patch import pytest sys.path.insert(0, str(Path(__file__).parent.parent)) from tests.conftest import MockGatewayClient, _run RELEASE_CHECK_PASS_REPORT = { "pass": True, "gates": [ {"name": "pr_review", "status": "pass"}, {"name": "config_lint", "status": "pass"}, {"name": "dependency_scan", "status": "pass"}, {"name": "contract_diff", "status": "pass"}, ], "recommendations": [], "summary": "All gates passed.", "elapsed_ms": 1200, } RELEASE_CHECK_FAIL_REPORT = { "pass": False, "gates": [ {"name": "pr_review", "status": "fail"}, {"name": "config_lint", "status": "pass"}, ], "recommendations": ["Fix PR review issues before release."], "summary": "PR review failed.", "elapsed_ms": 800, } class TestReleaseCheckGraphSuccess: """release_check_graph: job starts → job succeeds → returns pass=True.""" def test_async_job_flow(self): """start_task returns job_id, then get_job returns succeeded.""" from app.graphs.release_check_graph import build_release_check_graph mock_gw = MockGatewayClient() # start_task: returns a job that needs polling mock_gw.register("job_orchestrator_tool", "start_task", { "job_id": "j_test_001", "status": "running" }) # First poll: still running mock_gw.register("job_orchestrator_tool", "get_job", {"status": "running"}) # Second poll: succeeded with result mock_gw.register("job_orchestrator_tool", "get_job", { "status": "succeeded", "result": RELEASE_CHECK_PASS_REPORT, }) compiled = build_release_check_graph() initial_state = { "run_id": "gr_test_release_001", "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u_001", "input": { "service_name": "router", "fail_fast": True, "run_deps": True, "run_drift": True, }, } with patch("app.graphs.release_check_graph.GatewayClient", return_value=mock_gw): final = _run(compiled.ainvoke(initial_state)) assert final["graph_status"] == "succeeded" assert final["result"]["pass"] is True assert final["result"]["summary"] == "All gates passed." def test_synchronous_job_completion(self): """start_task returns result immediately (no polling needed).""" from app.graphs.release_check_graph import build_release_check_graph mock_gw = MockGatewayClient() mock_gw.register("job_orchestrator_tool", "start_task", { "job_id": "j_sync_001", "status": "succeeded", "result": RELEASE_CHECK_PASS_REPORT, }) compiled = build_release_check_graph() with patch("app.graphs.release_check_graph.GatewayClient", return_value=mock_gw): final = _run(compiled.ainvoke({ "run_id": "gr_sync_001", "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u_001", "input": {"service_name": "router"}, })) assert final["graph_status"] == "succeeded" assert final["result"]["pass"] is True # Only one call made (no polling) tool_calls = [c for c in mock_gw.calls if c["tool"] == "job_orchestrator_tool"] assert len(tool_calls) == 1 class TestReleaseCheckGraphFail: """release_check_graph: job fails → pass=False with error.""" def test_job_fails(self): """get_job returns failed → result.pass=False.""" from app.graphs.release_check_graph import build_release_check_graph mock_gw = MockGatewayClient() mock_gw.register("job_orchestrator_tool", "start_task", { "job_id": "j_fail_001", "status": "running" }) mock_gw.register("job_orchestrator_tool", "get_job", { "status": "failed", "error": "PR review failed", "result": RELEASE_CHECK_FAIL_REPORT, }) compiled = build_release_check_graph() with patch("app.graphs.release_check_graph.GatewayClient", return_value=mock_gw): final = _run(compiled.ainvoke({ "run_id": "gr_fail_001", "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u_001", "input": {"service_name": "router"}, })) assert final["graph_status"] == "failed" def test_start_task_gateway_error(self): """Gateway returns error on start_task → graph fails gracefully.""" from app.graphs.release_check_graph import build_release_check_graph mock_gw = MockGatewayClient() mock_gw.register("job_orchestrator_tool", "start_task", None, error="RBAC denied: tools.jobs.run not found") compiled = build_release_check_graph() with patch("app.graphs.release_check_graph.GatewayClient", return_value=mock_gw): final = _run(compiled.ainvoke({ "run_id": "gr_err_001", "agent_id": "nobody", "workspace_id": "w", "user_id": "u", "input": {}, })) assert final["graph_status"] == "failed" assert "start_task failed" in (final.get("error") or "") def test_finalize_produces_valid_report(self): """Even on failure, finalize returns a valid report structure.""" from app.graphs.release_check_graph import build_release_check_graph mock_gw = MockGatewayClient() mock_gw.register("job_orchestrator_tool", "start_task", None, error="timeout") compiled = build_release_check_graph() with patch("app.graphs.release_check_graph.GatewayClient", return_value=mock_gw): final = _run(compiled.ainvoke({ "run_id": "gr_fin_001", "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u", "input": {}, })) result = final.get("result") assert result is not None assert "pass" in result assert "summary" in result # ─── Correlation IDs test ───────────────────────────────────────────────────── class TestCorrelationIds: """Every tool call must carry graph_run_id in metadata.""" def test_all_calls_have_run_id(self): from app.graphs.release_check_graph import build_release_check_graph run_id = "gr_correlation_test_001" mock_gw = MockGatewayClient() mock_gw.register("job_orchestrator_tool", "start_task", { "job_id": "j_corr_001", "status": "succeeded", "result": RELEASE_CHECK_PASS_REPORT, }) compiled = build_release_check_graph() with patch("app.graphs.release_check_graph.GatewayClient", return_value=mock_gw): _run(compiled.ainvoke({ "run_id": run_id, "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u", "input": {"service_name": "router"}, })) for call in mock_gw.calls: assert call["graph_run_id"] == run_id, ( f"Call {call['tool']}:{call['action']} missing graph_run_id" ) def test_graph_node_included_in_calls(self): """Each call should have a non-empty graph_node.""" from app.graphs.release_check_graph import build_release_check_graph mock_gw = MockGatewayClient() mock_gw.register("job_orchestrator_tool", "start_task", { "job_id": "j_node_001", "status": "succeeded", "result": RELEASE_CHECK_PASS_REPORT, }) compiled = build_release_check_graph() with patch("app.graphs.release_check_graph.GatewayClient", return_value=mock_gw): _run(compiled.ainvoke({ "run_id": "gr_node_001", "agent_id": "sofiia", "workspace_id": "daarion", "user_id": "u", "input": {}, })) for call in mock_gw.calls: assert call["graph_node"], f"Call missing graph_node: {call}"