New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
753 lines
32 KiB
Python
753 lines
32 KiB
Python
"""
|
|
Tests for alert_triage_graph.
|
|
|
|
Covers:
|
|
- P1 prod alert → incident created + deterministic triage + ack (no LLM)
|
|
- P3 alert → digest-only, no incident
|
|
- Signature dedupe → same signature reuses existing incident
|
|
- Gateway error on one alert → loop continues (non-fatal)
|
|
- Policy loader fallback (missing file)
|
|
- LLM guard: llm_mode=off forces deterministic even when rule says llm
|
|
"""
|
|
from __future__ import annotations

import asyncio
import hashlib
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest.mock import patch, MagicMock, AsyncMock

# Repository root is four directory levels above this test file; the
# sofiia-supervisor service is added to sys.path so the lazy `from app...`
# imports inside the tests resolve against that service's package.
ROOT = Path(__file__).resolve().parents[4]
SUPERVISOR = ROOT / "services" / "sofiia-supervisor"
if str(SUPERVISOR) not in sys.path:
    sys.path.insert(0, str(SUPERVISOR))
|
|
|
|
# ─── Mock GatewayClient ───────────────────────────────────────────────────────
|
|
|
|
class MockToolCallResult:
    """Minimal stand-in for the gateway's tool-call result object."""

    def __init__(self, success=True, data=None, error_message=""):
        self.success = success
        # Falsy payloads (None / empty dict) normalize to a fresh empty dict.
        self.data = data if data else {}
        self.error_message = error_message
|
|
|
|
|
|
class MockGatewayClient:
    """Records all calls, returns configurable responses per (tool, action)."""

    def __init__(self, responses: Optional[Dict] = None):
        self.calls: List[Dict] = []       # one entry appended per call_tool invocation
        self.responses = responses or {}  # per-"tool.action" overrides

    async def call_tool(self, tool_name, action, params=None, **kwargs) -> MockToolCallResult:
        """Record the call, then answer from overrides, defaults, or empty-ok."""
        self.calls.append({"tool": tool_name, "action": action, "params": params or {}})
        key = f"{tool_name}.{action}"

        if key in self.responses:
            override = self.responses[key]
            if callable(override):
                # Callable overrides construct the full result themselves.
                return override(tool_name, action, params)
            return MockToolCallResult(True, override)

        # Default success payloads per tool/action. Built fresh on every call
        # so a caller that mutates the returned data cannot leak state into
        # later calls.
        defaults = {
            "alert_ingest_tool.claim": {
                "alerts": [], "claimed": 0, "requeued_stale": 0,
            },
            "alert_ingest_tool.list": {
                "alerts": [], "count": 0,
            },
            "alert_ingest_tool.ack": {"ack_status": "acked"},
            "alert_ingest_tool.fail": {"alert_ref": "?", "status": "failed"},
            "oncall_tool.signature_should_triage": {"should_triage": True},
            "oncall_tool.signature_mark_triage": {"marked": "triage_run"},
            "oncall_tool.signature_mark_alert": {"marked": "alert_seen"},
            "incident_escalation_tool.evaluate": {
                "evaluated": 0, "escalated": 0, "followups_created": 0,
                "candidates": [], "recommendations": [], "dry_run": False,
            },
            "incident_escalation_tool.auto_resolve_candidates": {
                "candidates": [], "candidates_count": 0,
                "closed": [], "closed_count": 0, "dry_run": True,
            },
            "oncall_tool.alert_to_incident": {
                "incident_id": "inc_test_001",
                "created": True,
                "severity": "P1",
                "incident_signature": "abcd1234" * 4,
            },
            "oncall_tool.incident_attach_artifact": {"artifact": {"path": "ops/incidents/test/triage.json"}},
            "oncall_tool.incident_append_event": {"event": {"id": 1}},
            "oncall_tool.service_health": {"healthy": True, "status": "ok"},
            "observability_tool.service_overview": {"metrics": {}, "status": "ok"},
            "kb_tool.snippets": {"snippets": []},
        }
        # Unknown tool/action pairs still succeed, with an empty payload.
        return MockToolCallResult(True, defaults.get(key, {}))

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        pass
|
|
|
|
|
|
# ─── Alert fixtures ───────────────────────────────────────────────────────────
|
|
|
|
def _make_alert(
|
|
service="gateway", severity="P1", kind="slo_breach",
|
|
env="prod", fingerprint="fp1", ref="alrt_001",
|
|
):
|
|
return {
|
|
"alert_ref": ref,
|
|
"source": "monitor@node1",
|
|
"service": service,
|
|
"env": env,
|
|
"severity": severity,
|
|
"kind": kind,
|
|
"title": f"{service} {kind} alert",
|
|
"summary": f"{service} is experiencing {kind}",
|
|
"started_at": "2025-01-23T09:00:00",
|
|
"labels": {"node": "node1", "fingerprint": fingerprint},
|
|
"metrics": {"latency_p95_ms": 450, "error_rate_pct": 2.5},
|
|
"ack_status": "pending",
|
|
}
|
|
|
|
|
|
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def _run_graph(state_input: Dict, mock_gw: MockGatewayClient) -> Dict:
    """Execute alert_triage_graph end-to-end with GatewayClient mocked.

    ``mock_gw`` already implements the async context-manager protocol
    (``__aenter__``/``__aexit__``), so a single constructor patch is all that
    is needed. The previous version additionally patched
    ``GatewayClient.__aenter__``/``__aexit__`` with plain (non-awaitable)
    return values on the already-replaced MagicMock — those inner patches
    were both redundant and broken under ``async with``.
    """
    from app.graphs.alert_triage_graph import build_alert_triage_graph

    graph = build_alert_triage_graph()

    async def _run():
        # Any GatewayClient() constructed inside the graph yields mock_gw,
        # whose own __aenter__/__aexit__ satisfy `async with`.
        with patch("app.graphs.alert_triage_graph.GatewayClient",
                   return_value=mock_gw):
            return await graph.ainvoke(state_input)

    return asyncio.run(_run())
|
|
|
|
|
|
# ─── Tests ────────────────────────────────────────────────────────────────────
|
|
|
|
class TestAlertTriageNoLLM:
    """P1 prod alert → incident + deterministic triage, zero LLM calls."""

    def _run_with_p1_alert(self, alert_ref="alrt_p1"):
        """Drive the graph nodes over one P1 prod alert; return (state, gw)."""
        p1_alert = _make_alert(severity="P1", env="prod", ref=alert_ref)
        # Plain string literal: there is nothing to interpolate (the original
        # used a pointless f-string prefix here).
        inc_sig = hashlib.sha256("gateway|prod|slo_breach|fp1".encode()).hexdigest()[:32]

        # should_triage=False keeps the run deterministic and LLM-free.
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {
                "alerts": [p1_alert], "claimed": 1, "requeued_stale": 0,
            },
            "alert_ingest_tool.list": {
                "alerts": [p1_alert], "count": 1,
            },
            "oncall_tool.signature_should_triage": {"should_triage": False},
            "oncall_tool.alert_to_incident": {
                "incident_id": "inc_test_p1",
                "created": True,
                "severity": "P1",
                "incident_signature": inc_sig,
            },
        })

        state = {
            "workspace_id": "default",
            "user_id": "test",
            "agent_id": "sofiia",
            "_run_id": "test_run_001",
        }

        with patch("app.graphs.alert_triage_graph.load_policy") as mp:
            mp.return_value = {
                "defaults": {
                    "max_alerts_per_run": 10,
                    "only_unacked": False,
                    "max_incidents_per_run": 5,
                    "max_triages_per_run": 5,
                    "llm_mode": "off",
                    "llm_on": {"triage": False},
                    "dedupe_window_minutes_default": 120,
                    "ack_note_prefix": "test_loop",
                },
                "routing": [
                    {
                        "match": {"env_in": ["prod"], "severity_in": ["P0", "P1"]},
                        "actions": {
                            "auto_incident": True,
                            "auto_triage": False,  # skip triage in unit test
                            "triage_mode": "deterministic",
                            "incident_severity_cap": "P1",
                            "dedupe_window_minutes": 120,
                            "attach_alert_artifact": True,
                            "ack": True,
                        },
                    },
                ],
            }
            # match_alert is stubbed so the test does not depend on the real
            # policy-matching implementation.
            with patch("app.graphs.alert_triage_graph.match_alert",
                       side_effect=lambda a, p: {
                           "auto_incident": True, "auto_triage": False,
                           "triage_mode": "deterministic",
                           "incident_severity_cap": "P1",
                           "dedupe_window_minutes": 120,
                           "ack": True,
                           "_normalized_kind": "slo_breach",
                       }):
                result = asyncio.run(self._async_run_graph(state, gw))
        return result, gw

    async def _async_run_graph(self, state, gw):
        """Run the graph node-by-node with GatewayClient replaced by ``gw``."""
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node, build_digest_node
        )
        s = await load_policy_node(state)
        s["_run_id"] = "test_run_001"
        with patch("app.graphs.alert_triage_graph.GatewayClient", return_value=gw):
            s = await list_alerts_node(s)
            s = await process_alerts_node(s)
            s = await build_digest_node(s)
        return s

    def test_incident_created_for_p1_prod(self):
        result, gw = self._run_with_p1_alert()
        created = result.get("created_incidents", [])
        assert len(created) >= 1
        assert created[0]["incident_id"] == "inc_test_p1"

    def test_no_llm_calls(self):
        # llm_mode=off must keep the whole run free of LLM tool traffic.
        result, gw = self._run_with_p1_alert()
        llm_tools = [c for c in gw.calls if c["tool"] in ("llm_tool", "chat_tool")]
        assert len(llm_tools) == 0, f"Unexpected LLM calls: {llm_tools}"

    def test_alert_acked(self):
        result, gw = self._run_with_p1_alert()
        ack_calls = [c for c in gw.calls
                     if c["tool"] == "alert_ingest_tool" and c["action"] == "ack"]
        assert len(ack_calls) >= 1

    def test_digest_contains_incident(self):
        result, gw = self._run_with_p1_alert()
        digest = result.get("digest_md", "")
        assert "inc_test_p1" in digest

    def test_result_summary_populated(self):
        result, gw = self._run_with_p1_alert()
        summary = result.get("result_summary", {})
        assert summary.get("created_incidents", 0) >= 1
|
|
|
|
|
|
class TestAlertTriageDigestOnly:
    """P3 alert → digest_only, no incident created, alert acked."""

    async def _run(self, gw, state):
        # Shared driver: runs the four graph nodes with a digest-only policy.
        # Patch nesting is load-bearing: load_policy must be stubbed both for
        # load_policy_node (mp) and again inside the GatewayClient patch (mp2)
        # for any node that re-reads the policy.
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node, build_digest_node
        )
        with patch("app.graphs.alert_triage_graph.load_policy") as mp:
            mp.return_value = {
                "defaults": {
                    "max_alerts_per_run": 10,
                    "only_unacked": False,
                    "max_incidents_per_run": 5,
                    "max_triages_per_run": 5,
                    "llm_mode": "off",
                    "llm_on": {"triage": False},
                    "dedupe_window_minutes_default": 120,
                    "ack_note_prefix": "test_loop",
                },
                "routing": [
                    {
                        "match": {"severity_in": ["P2", "P3", "INFO"]},
                        "actions": {"auto_incident": False, "digest_only": True, "ack": True},
                    },
                ],
            }
            s = await load_policy_node(state)
            s["_run_id"] = "test_p3"
            with patch("app.graphs.alert_triage_graph.GatewayClient") as MockGW:
                # GatewayClient() is used as an async context manager; route
                # both __aenter__/__aexit__ through AsyncMock so `async with`
                # yields the recording mock.
                MockGW.return_value.__aenter__ = AsyncMock(return_value=gw)
                MockGW.return_value.__aexit__ = AsyncMock(return_value=None)
                with patch("app.graphs.alert_triage_graph.load_policy") as mp2:
                    mp2.return_value = s["policy"]
                    # match_alert stub forces the digest-only decision.
                    with patch("app.graphs.alert_triage_graph.match_alert",
                               side_effect=lambda a, p: {
                                   "auto_incident": False, "digest_only": True, "ack": True,
                               }):
                        s = await list_alerts_node(s)
                        s = await process_alerts_node(s)
                        return await build_digest_node(s)

    def test_no_incident_created(self):
        # A P3 alert must be skipped, not turned into an incident.
        p3_alert = _make_alert(severity="P3", env="prod", ref="alrt_p3")
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {"alerts": [p3_alert], "claimed": 1, "requeued_stale": 0},
            "alert_ingest_tool.list": {"alerts": [p3_alert], "count": 1},
        })
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        result = asyncio.run(self._run(gw, state))
        assert result.get("created_incidents", []) == []
        assert len(result.get("skipped_alerts", [])) >= 1

    def test_no_oncall_write_calls(self):
        # Digest-only handling must not touch any oncall incident endpoint.
        p3_alert = _make_alert(severity="P3", env="prod", ref="alrt_p3_2")
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {"alerts": [p3_alert], "claimed": 1, "requeued_stale": 0},
            "alert_ingest_tool.list": {"alerts": [p3_alert], "count": 1},
        })
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        asyncio.run(self._run(gw, state))
        write_calls = [c for c in gw.calls if c["tool"] == "oncall_tool"
                       and "incident" in c["action"]]
        assert len(write_calls) == 0

    def test_digest_shows_skipped(self):
        # The rendered digest must mention the skipped alert section.
        p3_alert = _make_alert(severity="P3", env="prod", ref="alrt_p3_3")
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {"alerts": [p3_alert], "claimed": 1, "requeued_stale": 0},
            "alert_ingest_tool.list": {"alerts": [p3_alert], "count": 1},
        })
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        result = asyncio.run(self._run(gw, state))
        digest = result.get("digest_md", "")
        assert "Skipped" in digest or "skipped" in digest.lower()
|
|
|
|
|
|
class TestAlertTriageSignatureDedupe:
    """Same signature → existing incident reused, no duplicate created."""

    def test_same_signature_reuse(self):
        from app.alert_routing import compute_incident_signature

        # Two distinct alert refs sharing one fingerprint must collapse to a
        # single incident signature.
        first = _make_alert(ref="alrt_sig1", fingerprint="samefp")
        second = _make_alert(ref="alrt_sig2", fingerprint="samefp")

        sig1 = compute_incident_signature(first)
        sig2 = compute_incident_signature(second)
        assert sig1 == sig2, f"Signatures differ: {sig1} vs {sig2}"

    def test_different_fingerprint_different_signature(self):
        from app.alert_routing import compute_incident_signature

        # Distinct fingerprints must yield two distinct signatures.
        signatures = {
            compute_incident_signature(_make_alert(ref="alrt_diff1", fingerprint="fp_a")),
            compute_incident_signature(_make_alert(ref="alrt_diff2", fingerprint="fp_b")),
        }
        assert len(signatures) == 2

    def test_different_service_different_signature(self):
        from app.alert_routing import compute_incident_signature

        gateway_sig = compute_incident_signature(_make_alert(service="gateway", fingerprint="fp1"))
        router_sig = compute_incident_signature(_make_alert(service="router", fingerprint="fp1"))
        assert gateway_sig != router_sig

    def test_signature_stored_in_incident_meta(self):
        """Verify that alert_to_incident stores incident_signature in result."""
        from app.alert_routing import compute_incident_signature

        sig = compute_incident_signature(_make_alert(ref="alrt_meta_test"))

        # The router tool_manager stores sig in incident meta and returns it.
        # Only the compute function is covered here; the integration path is
        # covered by test_alert_to_incident.py.
        assert len(sig) == 32
        assert set(sig) <= set("0123456789abcdef")
|
|
|
|
|
|
class TestAlertTriageNonFatalErrors:
    """Gateway error on one alert → loop continues others."""

    async def _run_mixed(self, alerts, gw, state):
        # Driver that makes the FIRST match_alert call raise and the rest
        # succeed, to prove one failing alert does not abort the loop.
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node, build_digest_node
        )
        with patch("app.graphs.alert_triage_graph.load_policy") as mp:
            mp.return_value = {
                "defaults": {
                    "max_alerts_per_run": 10,
                    "only_unacked": False,
                    "max_incidents_per_run": 5,
                    "max_triages_per_run": 5,
                    "llm_mode": "off",
                    "llm_on": {},
                    "dedupe_window_minutes_default": 120,
                    "ack_note_prefix": "test",
                },
                "routing": [],
            }
            s = await load_policy_node(state)
            s["_run_id"] = "test_nonfatal"
            # Alerts are injected directly; list_alerts_node is not run.
            s["alerts"] = alerts

            with patch("app.graphs.alert_triage_graph.GatewayClient") as MockGW:
                MockGW.return_value.__aenter__ = AsyncMock(return_value=gw)
                MockGW.return_value.__aexit__ = AsyncMock(return_value=None)
                with patch("app.graphs.alert_triage_graph.match_alert") as mock_match:
                    # Single-element list works as a mutable call counter
                    # inside the closure.
                    call_count = [0]
                    def match_side_effect(alert, policy=None):
                        call_count[0] += 1
                        if call_count[0] == 1:
                            # First alert raises (simulated via actions that trigger error)
                            raise RuntimeError("Gateway timeout for first alert")
                        return {
                            "auto_incident": False, "digest_only": True, "ack": True,
                        }
                    mock_match.side_effect = match_side_effect
                    s = await process_alerts_node(s)
                    return await build_digest_node(s)

    def test_error_on_one_continues_others(self):
        alerts = [
            _make_alert(ref="alrt_fail", severity="P1"),
            _make_alert(ref="alrt_ok", severity="P3"),
        ]
        gw = MockGatewayClient()
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        result = asyncio.run(self._run_mixed(alerts, gw, state))

        # Both should be counted as processed
        assert result.get("processed", 0) == 2
        # Error recorded
        errors = result.get("errors", [])
        assert len(errors) >= 1

    def test_digest_shows_errors(self):
        # The per-alert failure must surface in the rendered digest.
        alerts = [
            _make_alert(ref="alrt_err", severity="P1"),
            _make_alert(ref="alrt_ok2", severity="P3"),
        ]
        gw = MockGatewayClient()
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        result = asyncio.run(self._run_mixed(alerts, gw, state))
        digest = result.get("digest_md", "")
        assert "Error" in digest or "error" in digest.lower()
|
|
|
|
|
|
class TestPostProcessNodes:
    """Test escalation + autoresolve post-process nodes."""

    def setup_method(self):
        # Reuse the module-level SUPERVISOR path. The previous version built
        # the path from ROOT.parent, which pointed one directory too high
        # (inconsistent with the module-level constant) and silently inserted
        # a nonexistent entry into sys.path.
        if str(SUPERVISOR) not in sys.path:
            sys.path.insert(0, str(SUPERVISOR))

    def test_escalation_result_in_digest(self):
        """Escalation results appear in digest when incidents are escalated."""
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node,
            post_process_escalation_node, post_process_autoresolve_node, build_digest_node
        )

        p1_alert = _make_alert(severity="P1", fingerprint="fp_esc")
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {
                "alerts": [p1_alert], "claimed": 1, "requeued_stale": 0,
            },
            "oncall_tool.alert_to_incident": {
                "incident_id": "inc_esc_001", "created": True,
                "incident_signature": "esc_sig_001",
            },
            "oncall_tool.signature_should_triage": {"should_triage": False},
            # One candidate escalated P2 → P1 with a follow-up.
            "incident_escalation_tool.evaluate": {
                "evaluated": 1, "escalated": 1, "followups_created": 1,
                "candidates": [{"incident_id": "inc_esc_001", "service": "gateway",
                                "from_severity": "P2", "to_severity": "P1",
                                "occurrences_60m": 15, "triage_count_24h": 2}],
                "recommendations": ["Escalated inc_esc_001"],
                "dry_run": False,
            },
            "incident_escalation_tool.auto_resolve_candidates": {
                "candidates": [], "candidates_count": 0,
                "closed": [], "closed_count": 0, "dry_run": True,
            },
        })

        # Policy pre-seeded in state, so load_policy_node is not needed here.
        state = {
            "workspace_id": "ws1", "user_id": "u1", "agent_id": "sofiia",
            "policy": {
                "defaults": {"only_unacked": True, "auto_incident": True,
                             "auto_triage": False, "llm_mode": "off", "ack": True},
                "routing": [],
            },
            "dry_run": False, "max_alerts": 20,
            "max_incidents_per_run": 5, "max_triages_per_run": 5,
            "created_incidents": [], "updated_incidents": [], "skipped_alerts": [],
            "errors": [],
        }

        async def run():
            s = {**state, "_run_id": "test_esc_001"}
            with patch("app.graphs.alert_triage_graph.GatewayClient", return_value=gw):
                s = await list_alerts_node(s)
                s = await process_alerts_node(s)
                s = await post_process_escalation_node(s)
                s = await post_process_autoresolve_node(s)
                s = await build_digest_node(s)
            return s

        result = asyncio.run(run())
        assert result["escalation_result"]["escalated"] == 1
        assert result["result_summary"]["escalated"] == 1
        assert "Escalated Incidents" in result["digest_md"]

    def test_post_process_skipped_when_no_alerts_processed(self):
        """If 0 alerts processed, post-process nodes skip gracefully."""
        from app.graphs.alert_triage_graph import (
            post_process_escalation_node, post_process_autoresolve_node
        )

        state = {"processed": 0, "agent_id": "sofiia", "workspace_id": "ws1",
                 "_run_id": "test_skip_001", "dry_run": False}
        gw = MockGatewayClient()

        async def run():
            s = {**state}
            with patch("app.graphs.alert_triage_graph.GatewayClient", return_value=gw):
                s = await post_process_escalation_node(s)
                s = await post_process_autoresolve_node(s)
            return s

        result = asyncio.run(run())
        assert result["escalation_result"] == {}
        assert result["autoresolve_result"] == {}
        # No tool calls made
        esc_calls = [c for c in gw.calls if c["tool"] == "incident_escalation_tool"]
        assert len(esc_calls) == 0
|
|
|
|
|
|
class TestCooldownPreventsTriage:
    """Triage cooldown: incident still created and acked, triage skipped."""

    def setup_method(self):
        # Reuse the module-level SUPERVISOR path. The previous version built
        # the path from ROOT.parent, which pointed one directory too high
        # (inconsistent with the module-level constant) and silently inserted
        # a nonexistent entry into sys.path.
        if str(SUPERVISOR) not in sys.path:
            sys.path.insert(0, str(SUPERVISOR))

    def test_cooldown_active_appends_event_but_acks(self):
        """When cooldown is active: no triage, but alert is acked and event appended."""
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node, build_digest_node
        )
        policy = {
            "defaults": {
                "only_unacked": True, "auto_incident": True, "auto_triage": True,
                "triage_mode": "deterministic", "triage_cooldown_minutes": 15,
                "llm_mode": "off",
            },
            "routing": [
                {"match": {"severity": "P1"}, "actions": {
                    "auto_incident": True, "auto_triage": True,
                    "triage_mode": "deterministic", "incident_severity_cap": "P1",
                    "ack": True,
                }}
            ],
        }
        p1_alert = _make_alert(severity="P1", fingerprint="fp_cooldown")

        # signature_should_triage returns False (cooldown active)
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {"alerts": [p1_alert], "claimed": 1, "requeued_stale": 0},
            "oncall_tool.alert_to_incident": {
                "incident_id": "inc_cooldown_001", "created": True,
                "incident_signature": "abcd1234",
            },
            "oncall_tool.signature_should_triage": {"should_triage": False},
            "oncall_tool.incident_append_event": {"event_id": 10},
            "alert_ingest_tool.ack": {"alert_ref": p1_alert["alert_ref"], "status": "acked"},
        })

        state = {
            "workspace_id": "ws1", "user_id": "u1", "agent_id": "sofiia",
            "policy": policy, "dry_run": False, "max_alerts": 20,
            "max_incidents_per_run": 5, "max_triages_per_run": 5,
            "created_incidents": [], "updated_incidents": [], "skipped_alerts": [],
            "errors": [],
        }

        async def run():
            s = {**state, "_run_id": "test_cooldown_001"}
            with patch("app.graphs.alert_triage_graph.GatewayClient", return_value=gw):
                s = await list_alerts_node(s)
                s = await process_alerts_node(s)
            return s

        result = asyncio.run(run())
        # Incident was created
        assert len(result.get("created_incidents", [])) >= 1
        # No triage_run_id appended (cooldown blocked it)
        # Verify append_event was called (for cooldown notification)
        calls = gw.calls
        append_calls = [c for c in calls
                        if c["tool"] == "oncall_tool" and c["action"] == "incident_append_event"]
        assert len(append_calls) >= 1
        # Ack was still called
        ack_calls = [c for c in calls
                     if c["tool"] == "alert_ingest_tool" and c["action"] == "ack"]
        assert len(ack_calls) >= 1
|
|
|
|
|
|
class TestAlertRoutingPolicy:
    """Policy loader and match_alert tests."""

    def test_load_policy_builtin_fallback(self):
        from pathlib import Path

        from app.alert_routing import load_policy

        # A nonexistent file must fall back to the built-in policy.
        policy = load_policy(Path("/nonexistent/path.yml"))
        for section in ("defaults", "routing"):
            assert section in policy

    def test_match_p1_prod_returns_auto_incident(self):
        from app.alert_routing import load_policy, match_alert

        decided = match_alert(_make_alert(severity="P1", env="prod"), load_policy())
        assert decided["auto_incident"] is True

    def test_match_p3_returns_digest_only(self):
        from app.alert_routing import load_policy, match_alert

        decided = match_alert(_make_alert(severity="P3", env="prod"), load_policy())
        assert decided.get("auto_incident", True) is False
        assert decided.get("digest_only", False) is True

    def test_match_security_returns_auto_incident(self):
        from app.alert_routing import match_alert

        # Use inline policy with security rule (avoids path resolution in tests)
        security_rule = {
            "match": {"kind_in": ["security"]},
            "actions": {
                "auto_incident": True, "auto_triage": True,
                "triage_mode": "deterministic",
                "incident_severity_cap": "P0",
                "ack": True,
            },
        }
        policy = {
            "defaults": {"dedupe_window_minutes_default": 120},
            "routing": [security_rule],
            "kind_map": {},
        }
        decided = match_alert(_make_alert(kind="security", severity="P2", env="dev"), policy)
        assert decided.get("auto_incident") is True

    def test_llm_guard_off_mode(self):
        from app.alert_routing import is_llm_allowed

        # llm_mode=off must win even when the per-purpose flag says True.
        policy = {"defaults": {"llm_mode": "off", "llm_on": {"triage": True}}}
        assert is_llm_allowed("triage", policy) is False

    def test_llm_guard_local_mode_enabled(self):
        from app.alert_routing import is_llm_allowed

        policy = {"defaults": {"llm_mode": "local", "llm_on": {"triage": True}}}
        assert is_llm_allowed("triage", policy) is True

    def test_kind_normalization(self):
        from app.alert_routing import load_policy, match_alert

        # "oom_kill" is an alias for "oom" in kind_map
        decided = match_alert(
            _make_alert(kind="oom_kill", severity="P1", env="prod"), load_policy()
        )
        assert decided["auto_incident"] is True

    def test_fallback_no_match(self):
        """Alert with severity=P2 and no matching rule → digest_only."""
        from app.alert_routing import match_alert

        policy = {
            "defaults": {"dedupe_window_minutes_default": 120},
            "routing": [
                {
                    "match": {"env_in": ["prod"], "severity_in": ["P0", "P1"]},
                    "actions": {"auto_incident": True, "ack": True},
                }
            ],
        }
        decided = match_alert(_make_alert(severity="P2", env="staging"), policy)
        assert decided["auto_incident"] is False
        assert decided["digest_only"] is True
|
|
|
|
|
|
class TestDryRunMode:
    """Dry run should not write anything but still build digest."""

    async def _run_dry(self, alerts, gw, state):
        # Driver: runs the pipeline with dry_run=True over pre-seeded alerts.
        # match_alert and compute_incident_signature are stubbed so the run is
        # deterministic and needs no policy file on disk.
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node, build_digest_node
        )
        with patch("app.graphs.alert_triage_graph.load_policy") as mp:
            mp.return_value = {
                "defaults": {
                    "max_alerts_per_run": 10,
                    "only_unacked": False,
                    "max_incidents_per_run": 5,
                    "max_triages_per_run": 5,
                    "llm_mode": "off",
                    "llm_on": {},
                    "dedupe_window_minutes_default": 120,
                    "ack_note_prefix": "dry",
                },
                "routing": [],
            }
            s = await load_policy_node({**state, "dry_run": True})
            s["_run_id"] = "dry_run_test"
            # Alerts injected directly; list_alerts_node is deliberately skipped.
            s["alerts"] = alerts

            with patch("app.graphs.alert_triage_graph.GatewayClient") as MockGW:
                # Route the async context-manager protocol to the recording mock.
                MockGW.return_value.__aenter__ = AsyncMock(return_value=gw)
                MockGW.return_value.__aexit__ = AsyncMock(return_value=None)
                with patch("app.graphs.alert_triage_graph.match_alert",
                           side_effect=lambda a, p=None: {
                               "auto_incident": True, "auto_triage": False,
                               "triage_mode": "deterministic",
                               "incident_severity_cap": "P1",
                               "dedupe_window_minutes": 120,
                               "ack": False,
                           }):
                    with patch("app.graphs.alert_triage_graph.compute_incident_signature",
                               return_value="drysigsig"):
                        s = await process_alerts_node(s)
                        return await build_digest_node(s)

    def test_dry_run_no_write_calls(self):
        # Even with auto_incident=True, dry_run must skip all writes.
        gw = MockGatewayClient()
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        alerts = [_make_alert(ref="alrt_dry", severity="P1")]
        result = asyncio.run(self._run_dry(alerts, gw, state))

        # No oncall tool write calls
        write_calls = [c for c in gw.calls
                       if c["tool"] == "oncall_tool" and "incident" in c["action"]]
        assert len(write_calls) == 0

    def test_dry_run_digest_has_marker(self):
        # The rendered digest must carry an explicit DRY RUN marker.
        gw = MockGatewayClient()
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        alerts = [_make_alert(ref="alrt_dry2", severity="P1")]
        result = asyncio.run(self._run_dry(alerts, gw, state))
        digest = result.get("digest_md", "")
        assert "DRY RUN" in digest
|