""" Tests for Incident Escalation Engine (deterministic, no LLM). Covers: - evaluate: P2→P1 when occurrences_60m crosses threshold - evaluate: P1→P0 when triage_count_24h crosses threshold - severity cap respected (never above P0) - followup event created on escalation - no escalation if thresholds not crossed - auto_resolve_candidates: found when no recent alerts - auto_resolve_candidates: not found when alerts recent - dry_run=True returns candidates but no state changes - occurrences_60m bucket rolling logic (MemorySignatureStateStore) """ import os import sys from datetime import datetime, timedelta from pathlib import Path from unittest.mock import patch ROOT = Path(__file__).resolve().parent.parent ROUTER = ROOT / "services" / "router" if str(ROUTER) not in sys.path: sys.path.insert(0, str(ROUTER)) # ─── Fixtures ──────────────────────────────────────────────────────────────── def _policy(): return { "defaults": {"window_minutes": 60}, "escalation": { "occurrences_thresholds": {"P2_to_P1": 10, "P1_to_P0": 25}, "triage_thresholds_24h": {"P2_to_P1": 3, "P1_to_P0": 6}, "severity_cap": "P0", "create_followup_on_escalate": True, "followup": { "priority": "P1", "due_hours": 24, "owner": "oncall", "message_template": "Escalated: occ={occurrences_60m}, triages={triage_count_24h}", }, }, "auto_resolve": { "no_alerts_minutes_for_candidate": 60, "close_allowed_severities": ["P2", "P3"], "auto_close": False, "candidate_event_type": "note", "candidate_message": "Auto-resolve candidate: no alerts in {no_alerts_minutes} minutes", }, "alert_loop_slo": { "claim_to_ack_p95_seconds": 60, "failed_rate_pct": 5, "processing_stuck_minutes": 15, }, } def _sig_store_with_state(signature, occurrences_60m=0, triage_count_24h=0): from signature_state_store import MemorySignatureStateStore, set_signature_state_store store = MemorySignatureStateStore() # Manually set state for testing now = datetime.utcnow().isoformat() store._states[signature] = { "signature": signature, "last_triage_at": now, "last_alert_at": now, "triage_count_24h": triage_count_24h, "occurrences_60m": occurrences_60m, "occurrences_60m_bucket_start": now, "updated_at": now, } set_signature_state_store(store) return store def _incident_store_with_open(incident_id, service="gateway", severity="P2", signature=None, env="prod"): from incident_store import MemoryIncidentStore, set_incident_store store = MemoryIncidentStore() # Create incident manually inc = { "id": incident_id, "service": service, "env": env, "severity": severity, "status": "open", "title": f"{service} issue", "summary": "", "started_at": datetime.utcnow().isoformat(), "created_by": "test", "created_at": datetime.utcnow().isoformat(), "updated_at": datetime.utcnow().isoformat(), "meta": {"incident_signature": signature} if signature else {}, } store._incidents[incident_id] = inc store._events[incident_id] = [] set_incident_store(store) return store class TestEscalationEngine: def setup_method(self): from alert_store import MemoryAlertStore, set_alert_store self.alert_store = MemoryAlertStore() set_alert_store(self.alert_store) def teardown_method(self): from alert_store import set_alert_store from signature_state_store import set_signature_state_store from incident_store import set_incident_store set_alert_store(None) set_signature_state_store(None) set_incident_store(None) def test_escalate_p2_to_p1_via_occurrences(self): from incident_escalation import evaluate_escalations sig = "sig_p2_to_p1" sig_store = _sig_store_with_state(sig, occurrences_60m=12, triage_count_24h=1) istore = _incident_store_with_open("inc_001", severity="P2", signature=sig) result = evaluate_escalations( params={"window_minutes": 60}, alert_store=self.alert_store, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=False, ) assert result["escalated"] == 1 assert result["candidates"][0]["from_severity"] == "P2" assert result["candidates"][0]["to_severity"] == "P1" def test_escalate_p1_to_p0_via_triage_count(self): from incident_escalation import evaluate_escalations sig = "sig_p1_to_p0" sig_store = _sig_store_with_state(sig, occurrences_60m=5, triage_count_24h=7) istore = _incident_store_with_open("inc_002", severity="P1", signature=sig) result = evaluate_escalations( params={}, alert_store=self.alert_store, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=False, ) assert result["escalated"] == 1 assert result["candidates"][0]["to_severity"] == "P0" def test_no_escalation_below_threshold(self): from incident_escalation import evaluate_escalations sig = "sig_ok" sig_store = _sig_store_with_state(sig, occurrences_60m=3, triage_count_24h=1) istore = _incident_store_with_open("inc_003", severity="P2", signature=sig) result = evaluate_escalations( params={}, alert_store=self.alert_store, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=False, ) assert result["escalated"] == 0 def test_severity_cap_p0_not_exceeded(self): from incident_escalation import evaluate_escalations sig = "sig_p0_already" sig_store = _sig_store_with_state(sig, occurrences_60m=100, triage_count_24h=20) istore = _incident_store_with_open("inc_004", severity="P0", signature=sig) result = evaluate_escalations( params={}, alert_store=self.alert_store, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=False, ) # P0 already at cap → no escalation assert result["escalated"] == 0 def test_followup_event_created_on_escalation(self): from incident_escalation import evaluate_escalations sig = "sig_followup" sig_store = _sig_store_with_state(sig, occurrences_60m=15, triage_count_24h=2) istore = _incident_store_with_open("inc_005", severity="P2", signature=sig) evaluate_escalations( params={}, alert_store=self.alert_store, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=False, ) events = istore._events.get("inc_005", []) types = [e.get("type") for e in events] assert "decision" in types assert "followup" in types def test_dry_run_no_state_change(self): from incident_escalation import evaluate_escalations sig = "sig_dryrun" sig_store = _sig_store_with_state(sig, occurrences_60m=15, triage_count_24h=2) istore = _incident_store_with_open("inc_006", severity="P2", signature=sig) result = evaluate_escalations( params={"dry_run": True}, alert_store=self.alert_store, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=True, ) # Candidates are returned but no incident events appended assert len(result["candidates"]) >= 1 assert result["escalated"] == 0 events = istore._events.get("inc_006", []) assert len(events) == 0 def test_no_incident_for_signature_skipped(self): from incident_escalation import evaluate_escalations sig = "sig_no_incident" sig_store = _sig_store_with_state(sig, occurrences_60m=50, triage_count_24h=10) # No incident for this signature from incident_store import MemoryIncidentStore, set_incident_store istore = MemoryIncidentStore() set_incident_store(istore) result = evaluate_escalations( params={}, alert_store=self.alert_store, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=False, ) assert result["escalated"] == 0 class TestAutoResolveCandidates: def teardown_method(self): from signature_state_store import set_signature_state_store from incident_store import set_incident_store set_signature_state_store(None) set_incident_store(None) def test_candidate_found_when_no_recent_alerts(self): from incident_escalation import find_auto_resolve_candidates sig = "sig_quiet" from signature_state_store import MemorySignatureStateStore, set_signature_state_store sig_store = MemorySignatureStateStore() old_time = (datetime.utcnow() - timedelta(minutes=90)).isoformat() sig_store._states[sig] = { "signature": sig, "last_triage_at": old_time, "last_alert_at": old_time, "triage_count_24h": 0, "occurrences_60m": 0, "occurrences_60m_bucket_start": old_time, "updated_at": old_time, } set_signature_state_store(sig_store) istore = _incident_store_with_open("inc_quiet", severity="P2", signature=sig) result = find_auto_resolve_candidates( params={"no_alerts_minutes": 60}, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=True, ) assert result["candidates_count"] >= 1 assert result["candidates"][0]["incident_id"] == "inc_quiet" assert result["closed_count"] == 0 # dry_run + auto_close=false def test_no_candidate_when_recent_alert(self): from incident_escalation import find_auto_resolve_candidates sig = "sig_active" from signature_state_store import MemorySignatureStateStore, set_signature_state_store sig_store = MemorySignatureStateStore() sig_store.mark_alert_seen(sig) # just now set_signature_state_store(sig_store) istore = _incident_store_with_open("inc_active", severity="P2", signature=sig) result = find_auto_resolve_candidates( params={"no_alerts_minutes": 60}, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=True, ) assert result["candidates_count"] == 0 def test_p0_not_auto_close_eligible(self): from incident_escalation import find_auto_resolve_candidates sig = "sig_p0_quiet" from signature_state_store import MemorySignatureStateStore, set_signature_state_store sig_store = MemorySignatureStateStore() old_time = (datetime.utcnow() - timedelta(minutes=90)).isoformat() sig_store._states[sig] = { "signature": sig, "last_alert_at": old_time, "last_triage_at": old_time, "triage_count_24h": 0, "occurrences_60m": 0, "occurrences_60m_bucket_start": old_time, "updated_at": old_time, } set_signature_state_store(sig_store) istore = _incident_store_with_open("inc_p0", severity="P0", signature=sig) result = find_auto_resolve_candidates( params={}, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=True, ) # P0 is a candidate but not auto_close_eligible (not in close_allowed_severities) assert result["candidates_count"] >= 1 cand = result["candidates"][0] assert cand["auto_close_eligible"] is False def test_candidate_event_appended_when_not_dry_run(self): from incident_escalation import find_auto_resolve_candidates sig = "sig_event" from signature_state_store import MemorySignatureStateStore, set_signature_state_store sig_store = MemorySignatureStateStore() old_time = (datetime.utcnow() - timedelta(minutes=90)).isoformat() sig_store._states[sig] = { "signature": sig, "last_alert_at": old_time, "last_triage_at": old_time, "triage_count_24h": 0, "occurrences_60m": 0, "occurrences_60m_bucket_start": old_time, "updated_at": old_time, } set_signature_state_store(sig_store) istore = _incident_store_with_open("inc_event", severity="P2", signature=sig) find_auto_resolve_candidates( params={"no_alerts_minutes": 60}, sig_state_store=sig_store, incident_store=istore, policy=_policy(), dry_run=False, # should append event ) events = istore._events.get("inc_event", []) assert len(events) == 1 assert "Auto-resolve candidate" in events[0]["message"] class TestOccurrences60mBucket: def setup_method(self): from signature_state_store import MemorySignatureStateStore, set_signature_state_store self.store = MemorySignatureStateStore() set_signature_state_store(self.store) def teardown_method(self): from signature_state_store import set_signature_state_store set_signature_state_store(None) def test_first_alert_starts_bucket(self): self.store.mark_alert_seen("sig1") state = self.store.get_state("sig1") assert state["occurrences_60m"] == 1 assert state["occurrences_60m_bucket_start"] is not None def test_repeated_alerts_increment_bucket(self): for _ in range(5): self.store.mark_alert_seen("sig2") state = self.store.get_state("sig2") assert state["occurrences_60m"] == 5 def test_old_bucket_resets(self): self.store.mark_alert_seen("sig3") # Back-date bucket start to > 60 min ago old_time = (datetime.utcnow() - timedelta(minutes=70)).isoformat() with self.store._lock: self.store._states["sig3"]["occurrences_60m_bucket_start"] = old_time self.store._states["sig3"]["occurrences_60m"] = 99 self.store.mark_alert_seen("sig3") state = self.store.get_state("sig3") assert state["occurrences_60m"] == 1 # reset to 1 def test_list_active_signatures(self): self.store.mark_alert_seen("active_sig") # Old sig (>60m without alerts) old_time = (datetime.utcnow() - timedelta(minutes=90)).isoformat() with self.store._lock: self.store._states["old_sig"] = { "signature": "old_sig", "last_alert_at": old_time, "last_triage_at": None, "triage_count_24h": 0, "occurrences_60m": 5, "occurrences_60m_bucket_start": old_time, "updated_at": old_time, } active = self.store.list_active_signatures(window_minutes=60) sigs = [s["signature"] for s in active] assert "active_sig" in sigs assert "old_sig" not in sigs def test_list_sorted_by_occurrences(self): self.store.mark_alert_seen("sig_low") # 1 occurrence for _ in range(10): self.store.mark_alert_seen("sig_high") # 10 occurrences active = self.store.list_active_signatures(window_minutes=60) assert active[0]["signature"] == "sig_high" assert active[0]["occurrences_60m"] == 10