Files
microdao-daarion/services/sofiia-supervisor/tests/test_alert_triage_graph.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

753 lines
32 KiB
Python

"""
Tests for alert_triage_graph.
Covers:
- P1 prod alert → incident created + deterministic triage + ack (no LLM)
- P3 alert → digest-only, no incident
- Signature dedupe → same signature reuses existing incident
- Gateway error on one alert → loop continues (non-fatal)
- Policy loader fallback (missing file)
- LLM guard: llm_mode=off forces deterministic even when rule says llm
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest.mock import patch, MagicMock, AsyncMock
# Make the supervisor service importable as `app.*` when running from the repo.
ROOT = Path(__file__).resolve().parents[4]
SUPERVISOR = ROOT / "services" / "sofiia-supervisor"
_supervisor_path = str(SUPERVISOR)
if _supervisor_path not in sys.path:
    sys.path.insert(0, _supervisor_path)
# ─── Mock GatewayClient ───────────────────────────────────────────────────────
class MockToolCallResult:
    """Lightweight stand-in for the gateway's ToolCallResult object."""

    def __init__(self, success=True, data=None, error_message=""):
        # Falsy `data` (None, {}) normalizes to an empty dict, same as `data or {}`.
        self.success = success
        self.data = data if data else {}
        self.error_message = error_message
class MockGatewayClient:
    """Records all calls, returns configurable responses per (tool, action)."""

    # Canned success payloads used when no explicit response is configured,
    # keyed by "tool_name.action".
    _DEFAULT_RESPONSES = {
        "alert_ingest_tool.claim": {
            "alerts": [], "claimed": 0, "requeued_stale": 0,
        },
        "alert_ingest_tool.list": {
            "alerts": [], "count": 0,
        },
        "alert_ingest_tool.ack": {"ack_status": "acked"},
        "alert_ingest_tool.fail": {"alert_ref": "?", "status": "failed"},
        "oncall_tool.signature_should_triage": {"should_triage": True},
        "oncall_tool.signature_mark_triage": {"marked": "triage_run"},
        "oncall_tool.signature_mark_alert": {"marked": "alert_seen"},
        "incident_escalation_tool.evaluate": {
            "evaluated": 0, "escalated": 0, "followups_created": 0,
            "candidates": [], "recommendations": [], "dry_run": False,
        },
        "incident_escalation_tool.auto_resolve_candidates": {
            "candidates": [], "candidates_count": 0,
            "closed": [], "closed_count": 0, "dry_run": True,
        },
        "oncall_tool.alert_to_incident": {
            "incident_id": "inc_test_001",
            "created": True,
            "severity": "P1",
            "incident_signature": "abcd1234" * 4,
        },
        "oncall_tool.incident_attach_artifact": {"artifact": {"path": "ops/incidents/test/triage.json"}},
        "oncall_tool.incident_append_event": {"event": {"id": 1}},
        "oncall_tool.service_health": {"healthy": True, "status": "ok"},
        "observability_tool.service_overview": {"metrics": {}, "status": "ok"},
        "kb_tool.snippets": {"snippets": []},
    }

    def __init__(self, responses: Optional[Dict] = None):
        self.calls: List[Dict] = []  # chronological record of every call_tool()
        self.responses = responses or {}

    async def call_tool(self, tool_name, action, params=None, **kwargs) -> MockToolCallResult:
        """Record the call, then answer from configured overrides or the defaults."""
        self.calls.append({"tool": tool_name, "action": action, "params": params or {}})
        key = f"{tool_name}.{action}"
        if key in self.responses:
            configured = self.responses[key]
            # A callable acts as a full response factory; a plain value
            # becomes the success payload.
            if callable(configured):
                return configured(tool_name, action, params)
            return MockToolCallResult(True, configured)
        if key in self._DEFAULT_RESPONSES:
            return MockToolCallResult(True, self._DEFAULT_RESPONSES[key])
        # Unknown tool/action: succeed with an empty payload.
        return MockToolCallResult(True, {})

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        pass
# ─── Alert fixtures ───────────────────────────────────────────────────────────
def _make_alert(
service="gateway", severity="P1", kind="slo_breach",
env="prod", fingerprint="fp1", ref="alrt_001",
):
return {
"alert_ref": ref,
"source": "monitor@node1",
"service": service,
"env": env,
"severity": severity,
"kind": kind,
"title": f"{service} {kind} alert",
"summary": f"{service} is experiencing {kind}",
"started_at": "2025-01-23T09:00:00",
"labels": {"node": "node1", "fingerprint": fingerprint},
"metrics": {"latency_p95_ms": 450, "error_rate_pct": 2.5},
"ack_status": "pending",
}
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _run_graph(state_input: Dict, mock_gw: MockGatewayClient) -> Dict:
    """Execute alert_triage_graph end-to-end with GatewayClient mocked.

    Only the constructor needs patching: MockGatewayClient implements
    ``__aenter__``/``__aexit__`` itself, so the graph's
    ``async with GatewayClient(...)`` resolves cleanly to ``mock_gw``.
    (The previous extra patches of ``GatewayClient.__aenter__`` and
    ``__aexit__`` were both redundant and broken — they handed back
    non-awaitable objects.)

    Args:
        state_input: initial graph state dict.
        mock_gw: recording gateway mock to inject.

    Returns:
        The final graph state produced by ``ainvoke``.
    """
    from app.graphs.alert_triage_graph import build_alert_triage_graph
    graph = build_alert_triage_graph()

    async def _run():
        with patch("app.graphs.alert_triage_graph.GatewayClient",
                   return_value=mock_gw):
            return await graph.ainvoke(state_input)

    return asyncio.run(_run())
# ─── Tests ────────────────────────────────────────────────────────────────────
class TestAlertTriageNoLLM:
    """P1 prod alert → incident + deterministic triage, zero LLM calls."""

    def _run_with_p1_alert(self, alert_ref="alrt_p1"):
        """Run the graph nodes against one P1/prod alert.

        Returns (final_state, gateway_mock) so tests can inspect both the
        graph output and the recorded tool calls.
        """
        p1_alert = _make_alert(severity="P1", env="prod", ref=alert_ref)
        # Fixture signature for this alert's (service|env|kind|fingerprint)
        # tuple.  Plain string — the pointless f-prefix (ruff F541) is gone.
        inc_sig = hashlib.sha256("gateway|prod|slo_breach|fp1".encode()).hexdigest()[:32]
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {
                "alerts": [p1_alert], "claimed": 1, "requeued_stale": 0,
            },
            "alert_ingest_tool.list": {
                "alerts": [p1_alert], "count": 1,
            },
            "oncall_tool.signature_should_triage": {"should_triage": False},
            "oncall_tool.alert_to_incident": {
                "incident_id": "inc_test_p1",
                "created": True,
                "severity": "P1",
                "incident_signature": inc_sig,
            },
        })
        state = {
            "workspace_id": "default",
            "user_id": "test",
            "agent_id": "sofiia",
            "_run_id": "test_run_001",
        }
        with patch("app.graphs.alert_triage_graph.load_policy") as mp:
            mp.return_value = {
                "defaults": {
                    "max_alerts_per_run": 10,
                    "only_unacked": False,
                    "max_incidents_per_run": 5,
                    "max_triages_per_run": 5,
                    "llm_mode": "off",
                    "llm_on": {"triage": False},
                    "dedupe_window_minutes_default": 120,
                    "ack_note_prefix": "test_loop",
                },
                "routing": [
                    {
                        "match": {"env_in": ["prod"], "severity_in": ["P0", "P1"]},
                        "actions": {
                            "auto_incident": True,
                            "auto_triage": False,  # skip triage in unit test
                            "triage_mode": "deterministic",
                            "incident_severity_cap": "P1",
                            "dedupe_window_minutes": 120,
                            "attach_alert_artifact": True,
                            "ack": True,
                        },
                    },
                ],
            }
            with patch("app.graphs.alert_triage_graph.match_alert",
                       side_effect=lambda a, p: {
                           "auto_incident": True, "auto_triage": False,
                           "triage_mode": "deterministic",
                           "incident_severity_cap": "P1",
                           "dedupe_window_minutes": 120,
                           "ack": True,
                           "_normalized_kind": "slo_breach",
                       }):
                result = asyncio.run(self._async_run_graph(state, gw))
        return result, gw

    async def _async_run_graph(self, state, gw):
        """Drive the four graph nodes with GatewayClient replaced by ``gw``."""
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node, build_digest_node
        )
        s = await load_policy_node(state)
        s["_run_id"] = "test_run_001"
        with patch("app.graphs.alert_triage_graph.GatewayClient", return_value=gw):
            s = await list_alerts_node(s)
            s = await process_alerts_node(s)
            s = await build_digest_node(s)
        return s

    def test_incident_created_for_p1_prod(self):
        result, gw = self._run_with_p1_alert()
        created = result.get("created_incidents", [])
        assert len(created) >= 1
        assert created[0]["incident_id"] == "inc_test_p1"

    def test_no_llm_calls(self):
        result, gw = self._run_with_p1_alert()
        llm_tools = [c for c in gw.calls if c["tool"] in ("llm_tool", "chat_tool")]
        assert len(llm_tools) == 0, f"Unexpected LLM calls: {llm_tools}"

    def test_alert_acked(self):
        result, gw = self._run_with_p1_alert()
        ack_calls = [c for c in gw.calls
                     if c["tool"] == "alert_ingest_tool" and c["action"] == "ack"]
        assert len(ack_calls) >= 1

    def test_digest_contains_incident(self):
        result, gw = self._run_with_p1_alert()
        digest = result.get("digest_md", "")
        assert "inc_test_p1" in digest

    def test_result_summary_populated(self):
        result, gw = self._run_with_p1_alert()
        summary = result.get("result_summary", {})
        assert summary.get("created_incidents", 0) >= 1
class TestAlertTriageDigestOnly:
    """P3 alert → digest_only, no incident created, alert acked."""

    async def _run(self, gw, state):
        # Drive the four graph nodes with a policy that routes P2/P3/INFO
        # severities to digest-only handling.  load_policy is patched twice:
        # once for load_policy_node itself (mp) and again for the processing
        # nodes' own module-level lookup (mp2).
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node, build_digest_node
        )
        with patch("app.graphs.alert_triage_graph.load_policy") as mp:
            mp.return_value = {
                "defaults": {
                    "max_alerts_per_run": 10,
                    "only_unacked": False,
                    "max_incidents_per_run": 5,
                    "max_triages_per_run": 5,
                    "llm_mode": "off",  # hard-disable LLM usage for the run
                    "llm_on": {"triage": False},
                    "dedupe_window_minutes_default": 120,
                    "ack_note_prefix": "test_loop",
                },
                "routing": [
                    {
                        "match": {"severity_in": ["P2", "P3", "INFO"]},
                        "actions": {"auto_incident": False, "digest_only": True, "ack": True},
                    },
                ],
            }
            s = await load_policy_node(state)
            s["_run_id"] = "test_p3"
            with patch("app.graphs.alert_triage_graph.GatewayClient") as MockGW:
                # Async context-manager protocol must hand back the recording mock.
                MockGW.return_value.__aenter__ = AsyncMock(return_value=gw)
                MockGW.return_value.__aexit__ = AsyncMock(return_value=None)
                with patch("app.graphs.alert_triage_graph.load_policy") as mp2:
                    mp2.return_value = s["policy"]
                    with patch("app.graphs.alert_triage_graph.match_alert",
                               side_effect=lambda a, p: {
                                   "auto_incident": False, "digest_only": True, "ack": True,
                               }):
                        s = await list_alerts_node(s)
                        s = await process_alerts_node(s)
                        return await build_digest_node(s)

    def test_no_incident_created(self):
        p3_alert = _make_alert(severity="P3", env="prod", ref="alrt_p3")
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {"alerts": [p3_alert], "claimed": 1, "requeued_stale": 0},
            "alert_ingest_tool.list": {"alerts": [p3_alert], "count": 1},
        })
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        result = asyncio.run(self._run(gw, state))
        assert result.get("created_incidents", []) == []
        assert len(result.get("skipped_alerts", [])) >= 1

    def test_no_oncall_write_calls(self):
        p3_alert = _make_alert(severity="P3", env="prod", ref="alrt_p3_2")
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {"alerts": [p3_alert], "claimed": 1, "requeued_stale": 0},
            "alert_ingest_tool.list": {"alerts": [p3_alert], "count": 1},
        })
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        asyncio.run(self._run(gw, state))
        # Digest-only routing must never touch oncall incident actions.
        write_calls = [c for c in gw.calls if c["tool"] == "oncall_tool"
                       and "incident" in c["action"]]
        assert len(write_calls) == 0

    def test_digest_shows_skipped(self):
        p3_alert = _make_alert(severity="P3", env="prod", ref="alrt_p3_3")
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {"alerts": [p3_alert], "claimed": 1, "requeued_stale": 0},
            "alert_ingest_tool.list": {"alerts": [p3_alert], "count": 1},
        })
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        result = asyncio.run(self._run(gw, state))
        digest = result.get("digest_md", "")
        assert "Skipped" in digest or "skipped" in digest.lower()
class TestAlertTriageSignatureDedupe:
    """Same signature → existing incident reused, no duplicate created."""

    def test_same_signature_reuse(self):
        from app.alert_routing import compute_incident_signature
        # Two distinct alert refs sharing one fingerprint must collapse
        # to one incident signature.
        sig1 = compute_incident_signature(_make_alert(ref="alrt_sig1", fingerprint="samefp"))
        sig2 = compute_incident_signature(_make_alert(ref="alrt_sig2", fingerprint="samefp"))
        assert sig1 == sig2, f"Signatures differ: {sig1} vs {sig2}"

    def test_different_fingerprint_different_signature(self):
        from app.alert_routing import compute_incident_signature
        sig_a = compute_incident_signature(_make_alert(ref="alrt_diff1", fingerprint="fp_a"))
        sig_b = compute_incident_signature(_make_alert(ref="alrt_diff2", fingerprint="fp_b"))
        assert sig_a != sig_b

    def test_different_service_different_signature(self):
        from app.alert_routing import compute_incident_signature
        gateway_alert = _make_alert(service="gateway", fingerprint="fp1")
        router_alert = _make_alert(service="router", fingerprint="fp1")
        assert compute_incident_signature(gateway_alert) != compute_incident_signature(router_alert)

    def test_signature_stored_in_incident_meta(self):
        """Verify that alert_to_incident stores incident_signature in result."""
        from app.alert_routing import compute_incident_signature
        # The router tool_manager stores sig in incident meta and returns it
        # We test the compute function here; integration tested in test_alert_to_incident.py
        sig = compute_incident_signature(_make_alert(ref="alrt_meta_test"))
        assert len(sig) == 32
        assert set(sig) <= set("0123456789abcdef")
class TestAlertTriageNonFatalErrors:
    """Gateway error on one alert → loop continues others."""

    async def _run_mixed(self, alerts, gw, state):
        # The first matched alert raises; the processing loop must record
        # the error and keep going with the remaining alerts.  Alerts are
        # injected directly into state, so list_alerts_node is skipped.
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node, build_digest_node
        )
        with patch("app.graphs.alert_triage_graph.load_policy") as mp:
            mp.return_value = {
                "defaults": {
                    "max_alerts_per_run": 10,
                    "only_unacked": False,
                    "max_incidents_per_run": 5,
                    "max_triages_per_run": 5,
                    "llm_mode": "off",
                    "llm_on": {},
                    "dedupe_window_minutes_default": 120,
                    "ack_note_prefix": "test",
                },
                "routing": [],
            }
            s = await load_policy_node(state)
            s["_run_id"] = "test_nonfatal"
            s["alerts"] = alerts
            with patch("app.graphs.alert_triage_graph.GatewayClient") as MockGW:
                MockGW.return_value.__aenter__ = AsyncMock(return_value=gw)
                MockGW.return_value.__aexit__ = AsyncMock(return_value=None)
                with patch("app.graphs.alert_triage_graph.match_alert") as mock_match:
                    # Stateful side effect: blow up on the first call only.
                    call_count = [0]

                    def match_side_effect(alert, policy=None):
                        call_count[0] += 1
                        if call_count[0] == 1:
                            # First alert raises (simulated via actions that trigger error)
                            raise RuntimeError("Gateway timeout for first alert")
                        return {
                            "auto_incident": False, "digest_only": True, "ack": True,
                        }

                    mock_match.side_effect = match_side_effect
                    s = await process_alerts_node(s)
                    return await build_digest_node(s)

    def test_error_on_one_continues_others(self):
        alerts = [
            _make_alert(ref="alrt_fail", severity="P1"),
            _make_alert(ref="alrt_ok", severity="P3"),
        ]
        gw = MockGatewayClient()
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        result = asyncio.run(self._run_mixed(alerts, gw, state))
        # Both should be counted as processed
        assert result.get("processed", 0) == 2
        # Error recorded
        errors = result.get("errors", [])
        assert len(errors) >= 1

    def test_digest_shows_errors(self):
        alerts = [
            _make_alert(ref="alrt_err", severity="P1"),
            _make_alert(ref="alrt_ok2", severity="P3"),
        ]
        gw = MockGatewayClient()
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        result = asyncio.run(self._run_mixed(alerts, gw, state))
        digest = result.get("digest_md", "")
        assert "Error" in digest or "error" in digest.lower()
class TestPostProcessNodes:
    """Test escalation + autoresolve post-process nodes."""

    def setup_method(self):
        # Reuse the module-level SUPERVISOR path.  The previous
        # ROOT.parent-based computation pointed one directory too high
        # (inconsistent with the module-level bootstrap) and inserted a
        # path that does not exist.
        if str(SUPERVISOR) not in sys.path:
            sys.path.insert(0, str(SUPERVISOR))

    def test_escalation_result_in_digest(self):
        """Escalation results appear in digest when incidents are escalated."""
        import asyncio
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node,
            post_process_escalation_node, post_process_autoresolve_node, build_digest_node
        )
        p1_alert = _make_alert(severity="P1", fingerprint="fp_esc")
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {
                "alerts": [p1_alert], "claimed": 1, "requeued_stale": 0,
            },
            "oncall_tool.alert_to_incident": {
                "incident_id": "inc_esc_001", "created": True,
                "incident_signature": "esc_sig_001",
            },
            "oncall_tool.signature_should_triage": {"should_triage": False},
            "incident_escalation_tool.evaluate": {
                "evaluated": 1, "escalated": 1, "followups_created": 1,
                "candidates": [{"incident_id": "inc_esc_001", "service": "gateway",
                                "from_severity": "P2", "to_severity": "P1",
                                "occurrences_60m": 15, "triage_count_24h": 2}],
                "recommendations": ["Escalated inc_esc_001"],
                "dry_run": False,
            },
            "incident_escalation_tool.auto_resolve_candidates": {
                "candidates": [], "candidates_count": 0,
                "closed": [], "closed_count": 0, "dry_run": True,
            },
        })
        state = {
            "workspace_id": "ws1", "user_id": "u1", "agent_id": "sofiia",
            "policy": {
                "defaults": {"only_unacked": True, "auto_incident": True,
                             "auto_triage": False, "llm_mode": "off", "ack": True},
                "routing": [],
            },
            "dry_run": False, "max_alerts": 20,
            "max_incidents_per_run": 5, "max_triages_per_run": 5,
            "created_incidents": [], "updated_incidents": [], "skipped_alerts": [],
            "errors": [],
        }

        async def run():
            # Full node pipeline including both post-process stages.
            s = {**state, "_run_id": "test_esc_001"}
            with patch("app.graphs.alert_triage_graph.GatewayClient", return_value=gw):
                s = await list_alerts_node(s)
                s = await process_alerts_node(s)
                s = await post_process_escalation_node(s)
                s = await post_process_autoresolve_node(s)
                s = await build_digest_node(s)
            return s

        result = asyncio.run(run())
        assert result["escalation_result"]["escalated"] == 1
        assert result["result_summary"]["escalated"] == 1
        assert "Escalated Incidents" in result["digest_md"]

    def test_post_process_skipped_when_no_alerts_processed(self):
        """If 0 alerts processed, post-process nodes skip gracefully."""
        import asyncio
        from app.graphs.alert_triage_graph import (
            post_process_escalation_node, post_process_autoresolve_node
        )
        state = {"processed": 0, "agent_id": "sofiia", "workspace_id": "ws1",
                 "_run_id": "test_skip_001", "dry_run": False}
        gw = MockGatewayClient()

        async def run():
            s = {**state}
            with patch("app.graphs.alert_triage_graph.GatewayClient", return_value=gw):
                s = await post_process_escalation_node(s)
                s = await post_process_autoresolve_node(s)
            return s

        result = asyncio.run(run())
        assert result["escalation_result"] == {}
        assert result["autoresolve_result"] == {}
        # No tool calls made
        esc_calls = [c for c in gw.calls if c["tool"] == "incident_escalation_tool"]
        assert len(esc_calls) == 0
class TestCooldownPreventsTriage:
    """Active triage cooldown blocks re-triage but still acks and logs an event."""

    def setup_method(self):
        # Reuse the module-level SUPERVISOR path.  The previous
        # ROOT.parent-based computation pointed one directory too high
        # (inconsistent with the module-level bootstrap) and inserted a
        # path that does not exist.
        if str(SUPERVISOR) not in sys.path:
            sys.path.insert(0, str(SUPERVISOR))

    def test_cooldown_active_appends_event_but_acks(self):
        """When cooldown is active: no triage, but alert is acked and event appended."""
        import asyncio
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node, build_digest_node
        )
        policy = {
            "defaults": {
                "only_unacked": True, "auto_incident": True, "auto_triage": True,
                "triage_mode": "deterministic", "triage_cooldown_minutes": 15,
                "llm_mode": "off",
            },
            "routing": [
                {"match": {"severity": "P1"}, "actions": {
                    "auto_incident": True, "auto_triage": True,
                    "triage_mode": "deterministic", "incident_severity_cap": "P1",
                    "ack": True,
                }}
            ],
        }
        p1_alert = _make_alert(severity="P1", fingerprint="fp_cooldown")
        # signature_should_triage returns False (cooldown active)
        gw = MockGatewayClient(responses={
            "alert_ingest_tool.claim": {"alerts": [p1_alert], "claimed": 1, "requeued_stale": 0},
            "oncall_tool.alert_to_incident": {
                "incident_id": "inc_cooldown_001", "created": True,
                "incident_signature": "abcd1234",
            },
            "oncall_tool.signature_should_triage": {"should_triage": False},
            "oncall_tool.incident_append_event": {"event_id": 10},
            "alert_ingest_tool.ack": {"alert_ref": p1_alert["alert_ref"], "status": "acked"},
        })
        state = {
            "workspace_id": "ws1", "user_id": "u1", "agent_id": "sofiia",
            "policy": policy, "dry_run": False, "max_alerts": 20,
            "max_incidents_per_run": 5, "max_triages_per_run": 5,
            "created_incidents": [], "updated_incidents": [], "skipped_alerts": [],
            "errors": [],
        }

        async def run():
            s = {**state, "_run_id": "test_cooldown_001"}
            with patch("app.graphs.alert_triage_graph.GatewayClient", return_value=gw):
                s = await list_alerts_node(s)
                s = await process_alerts_node(s)
            return s

        result = asyncio.run(run())
        # Incident was created
        assert len(result.get("created_incidents", [])) >= 1
        # No triage_run_id appended (cooldown blocked it)
        # Verify append_event was called (for cooldown notification)
        calls = gw.calls
        append_calls = [c for c in calls
                        if c["tool"] == "oncall_tool" and c["action"] == "incident_append_event"]
        assert len(append_calls) >= 1
        # Ack was still called
        ack_calls = [c for c in calls
                     if c["tool"] == "alert_ingest_tool" and c["action"] == "ack"]
        assert len(ack_calls) >= 1
class TestAlertRoutingPolicy:
    """Policy loader and match_alert tests."""

    def test_load_policy_builtin_fallback(self):
        from app.alert_routing import load_policy
        from pathlib import Path
        loaded = load_policy(Path("/nonexistent/path.yml"))
        assert "defaults" in loaded
        assert "routing" in loaded

    def test_match_p1_prod_returns_auto_incident(self):
        from app.alert_routing import match_alert, load_policy
        routed = match_alert(_make_alert(severity="P1", env="prod"), load_policy())
        assert routed["auto_incident"] is True

    def test_match_p3_returns_digest_only(self):
        from app.alert_routing import match_alert, load_policy
        routed = match_alert(_make_alert(severity="P3", env="prod"), load_policy())
        assert routed.get("auto_incident", True) is False
        assert routed.get("digest_only", False) is True

    def test_match_security_returns_auto_incident(self):
        from app.alert_routing import match_alert
        # Use inline policy with security rule (avoids path resolution in tests)
        inline_policy = {
            "defaults": {"dedupe_window_minutes_default": 120},
            "routing": [
                {
                    "match": {"kind_in": ["security"]},
                    "actions": {
                        "auto_incident": True, "auto_triage": True,
                        "triage_mode": "deterministic",
                        "incident_severity_cap": "P0",
                        "ack": True,
                    },
                },
            ],
            "kind_map": {},
        }
        security_alert = _make_alert(kind="security", severity="P2", env="dev")
        assert match_alert(security_alert, inline_policy).get("auto_incident") is True

    def test_llm_guard_off_mode(self):
        from app.alert_routing import is_llm_allowed
        guard_policy = {
            "defaults": {
                "llm_mode": "off",
                "llm_on": {"triage": True},
            }
        }
        assert is_llm_allowed("triage", guard_policy) is False

    def test_llm_guard_local_mode_enabled(self):
        from app.alert_routing import is_llm_allowed
        guard_policy = {
            "defaults": {
                "llm_mode": "local",
                "llm_on": {"triage": True},
            }
        }
        assert is_llm_allowed("triage", guard_policy) is True

    def test_kind_normalization(self):
        from app.alert_routing import match_alert, load_policy
        # "oom_kill" is an alias for "oom" in kind_map
        routed = match_alert(_make_alert(kind="oom_kill", severity="P1", env="prod"), load_policy())
        assert routed["auto_incident"] is True

    def test_fallback_no_match(self):
        """Alert with severity=P2 and no matching rule → digest_only."""
        from app.alert_routing import match_alert
        narrow_policy = {
            "defaults": {"dedupe_window_minutes_default": 120},
            "routing": [
                {
                    "match": {"env_in": ["prod"], "severity_in": ["P0", "P1"]},
                    "actions": {"auto_incident": True, "ack": True},
                }
            ],
        }
        routed = match_alert(_make_alert(severity="P2", env="staging"), narrow_policy)
        assert routed["auto_incident"] is False
        assert routed["digest_only"] is True
class TestDryRunMode:
    """Dry run should not write anything but still build digest."""

    async def _run_dry(self, alerts, gw, state):
        # Run policy-load → process → digest with dry_run=True.  Alerts are
        # injected directly into state, so list_alerts_node is skipped.
        from app.graphs.alert_triage_graph import (
            load_policy_node, list_alerts_node, process_alerts_node, build_digest_node
        )
        with patch("app.graphs.alert_triage_graph.load_policy") as mp:
            mp.return_value = {
                "defaults": {
                    "max_alerts_per_run": 10,
                    "only_unacked": False,
                    "max_incidents_per_run": 5,
                    "max_triages_per_run": 5,
                    "llm_mode": "off",
                    "llm_on": {},
                    "dedupe_window_minutes_default": 120,
                    "ack_note_prefix": "dry",
                },
                "routing": [],
            }
            s = await load_policy_node({**state, "dry_run": True})
            s["_run_id"] = "dry_run_test"
            s["alerts"] = alerts
            with patch("app.graphs.alert_triage_graph.GatewayClient") as MockGW:
                MockGW.return_value.__aenter__ = AsyncMock(return_value=gw)
                MockGW.return_value.__aexit__ = AsyncMock(return_value=None)
                with patch("app.graphs.alert_triage_graph.match_alert",
                           side_effect=lambda a, p=None: {
                               "auto_incident": True, "auto_triage": False,
                               "triage_mode": "deterministic",
                               "incident_severity_cap": "P1",
                               "dedupe_window_minutes": 120,
                               "ack": False,
                           }):
                    # Pin the signature so the dry-run path is deterministic.
                    with patch("app.graphs.alert_triage_graph.compute_incident_signature",
                               return_value="drysigsig"):
                        s = await process_alerts_node(s)
                        return await build_digest_node(s)

    def test_dry_run_no_write_calls(self):
        gw = MockGatewayClient()
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        alerts = [_make_alert(ref="alrt_dry", severity="P1")]
        result = asyncio.run(self._run_dry(alerts, gw, state))
        # No oncall tool write calls
        write_calls = [c for c in gw.calls
                       if c["tool"] == "oncall_tool" and "incident" in c["action"]]
        assert len(write_calls) == 0

    def test_dry_run_digest_has_marker(self):
        gw = MockGatewayClient()
        state = {"workspace_id": "default", "user_id": "test", "agent_id": "sofiia"}
        alerts = [_make_alert(ref="alrt_dry2", severity="P1")]
        result = asyncio.run(self._run_dry(alerts, gw, state))
        digest = result.get("digest_md", "")
        assert "DRY RUN" in digest