Files
microdao-daarion/tests/test_incident_escalation.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

422 lines
16 KiB
Python

"""
Tests for Incident Escalation Engine (deterministic, no LLM).
Covers:
- evaluate: P2→P1 when occurrences_60m crosses threshold
- evaluate: P1→P0 when triage_count_24h crosses threshold
- severity cap respected (never above P0)
- followup event created on escalation
- no escalation if thresholds not crossed
- auto_resolve_candidates: found when no recent alerts
- auto_resolve_candidates: not found when alerts recent
- dry_run=True returns candidates but no state changes
- occurrences_60m bucket rolling logic (MemorySignatureStateStore)
"""
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import patch
# Put <repo>/services/router at the front of sys.path so the bare-module
# imports used by the tests below (e.g. ``incident_escalation``) can resolve
# there. The membership check keeps repeated imports of this file from
# stacking duplicate entries.
ROOT = Path(__file__).resolve().parent.parent
ROUTER = ROOT / "services" / "router"
if str(ROUTER) not in sys.path:
    sys.path.insert(0, str(ROUTER))
# ─── Fixtures ────────────────────────────────────────────────────────────────
def _policy():
return {
"defaults": {"window_minutes": 60},
"escalation": {
"occurrences_thresholds": {"P2_to_P1": 10, "P1_to_P0": 25},
"triage_thresholds_24h": {"P2_to_P1": 3, "P1_to_P0": 6},
"severity_cap": "P0",
"create_followup_on_escalate": True,
"followup": {
"priority": "P1", "due_hours": 24, "owner": "oncall",
"message_template": "Escalated: occ={occurrences_60m}, triages={triage_count_24h}",
},
},
"auto_resolve": {
"no_alerts_minutes_for_candidate": 60,
"close_allowed_severities": ["P2", "P3"],
"auto_close": False,
"candidate_event_type": "note",
"candidate_message": "Auto-resolve candidate: no alerts in {no_alerts_minutes} minutes",
},
"alert_loop_slo": {
"claim_to_ack_p95_seconds": 60,
"failed_rate_pct": 5,
"processing_stuck_minutes": 15,
},
}
def _sig_store_with_state(signature, occurrences_60m=0, triage_count_24h=0):
    """Create, seed, and globally install an in-memory signature-state store.

    Args:
        signature: Signature key whose state is seeded.
        occurrences_60m: Value written to the ``occurrences_60m`` counter.
        triage_count_24h: Value written to the ``triage_count_24h`` counter.

    Returns:
        The ``MemorySignatureStateStore`` that was registered through
        ``set_signature_state_store``.
    """
    from signature_state_store import MemorySignatureStateStore, set_signature_state_store

    store = MemorySignatureStateStore()
    # Naive UTC timestamp: same value and ISO format as the deprecated
    # datetime.utcnow(), without the DeprecationWarning on Python 3.12+.
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    # Write the private state map directly so the counters start at the exact
    # requested values.
    store._states[signature] = {
        "signature": signature,
        "last_triage_at": now,
        "last_alert_at": now,
        "triage_count_24h": triage_count_24h,
        "occurrences_60m": occurrences_60m,
        "occurrences_60m_bucket_start": now,
        "updated_at": now,
    }
    set_signature_state_store(store)
    return store
def _incident_store_with_open(incident_id, service="gateway", severity="P2",
                              signature=None, env="prod"):
    """Create and globally install an in-memory incident store with one open incident.

    Args:
        incident_id: Id under which the incident is stored.
        service: Service name; also used to build the title.
        severity: Initial severity label (e.g. "P2").
        signature: Optional incident signature stored under
            ``meta["incident_signature"]``; when falsy, ``meta`` is empty.
        env: Environment label.

    Returns:
        The ``MemoryIncidentStore`` that was registered through
        ``set_incident_store``. Its event list for ``incident_id`` starts empty.
    """
    from incident_store import MemoryIncidentStore, set_incident_store

    store = MemoryIncidentStore()
    # One naive-UTC timestamp for all three time fields, so they are exactly
    # equal; replaces three separate calls to the deprecated datetime.utcnow().
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    # Insert the record directly into the private maps instead of going
    # through the store's public API.
    store._incidents[incident_id] = {
        "id": incident_id,
        "service": service,
        "env": env,
        "severity": severity,
        "status": "open",
        "title": f"{service} issue",
        "summary": "",
        "started_at": now,
        "created_by": "test",
        "created_at": now,
        "updated_at": now,
        "meta": {"incident_signature": signature} if signature else {},
    }
    store._events[incident_id] = []
    set_incident_store(store)
    return store
class TestEscalationEngine:
    """Tests for ``evaluate_escalations`` run against in-memory stores."""

    def setup_method(self):
        """Install a fresh in-memory alert store before each test."""
        from alert_store import MemoryAlertStore, set_alert_store

        self.alert_store = MemoryAlertStore()
        set_alert_store(self.alert_store)

    def teardown_method(self):
        """Reset every globally registered store so tests stay independent."""
        from alert_store import set_alert_store
        from incident_store import set_incident_store
        from signature_state_store import set_signature_state_store

        set_alert_store(None)
        set_signature_state_store(None)
        set_incident_store(None)

    def _evaluate(self, sig_store, istore, params=None, dry_run=False):
        """Call ``evaluate_escalations`` with the shared policy and alert store.

        ``params`` defaults to an empty dict, matching what most tests pass.
        """
        from incident_escalation import evaluate_escalations

        return evaluate_escalations(
            params={} if params is None else params,
            alert_store=self.alert_store,
            sig_state_store=sig_store,
            incident_store=istore,
            policy=_policy(),
            dry_run=dry_run,
        )

    def test_escalate_p2_to_p1_via_occurrences(self):
        """12 occurrences (policy P2_to_P1 = 10) escalate a P2 incident to P1."""
        sig = "sig_p2_to_p1"
        sig_store = _sig_store_with_state(sig, occurrences_60m=12, triage_count_24h=1)
        istore = _incident_store_with_open("inc_001", severity="P2", signature=sig)

        result = self._evaluate(sig_store, istore, params={"window_minutes": 60})

        assert result["escalated"] == 1
        assert result["candidates"][0]["from_severity"] == "P2"
        assert result["candidates"][0]["to_severity"] == "P1"

    def test_escalate_p1_to_p0_via_triage_count(self):
        """7 triages in 24h (policy P1_to_P0 = 6) escalate a P1 incident to P0."""
        sig = "sig_p1_to_p0"
        sig_store = _sig_store_with_state(sig, occurrences_60m=5, triage_count_24h=7)
        istore = _incident_store_with_open("inc_002", severity="P1", signature=sig)

        result = self._evaluate(sig_store, istore)

        assert result["escalated"] == 1
        assert result["candidates"][0]["to_severity"] == "P0"

    def test_no_escalation_below_threshold(self):
        """Counters below both P2_to_P1 thresholds leave the incident alone."""
        sig = "sig_ok"
        sig_store = _sig_store_with_state(sig, occurrences_60m=3, triage_count_24h=1)
        istore = _incident_store_with_open("inc_003", severity="P2", signature=sig)

        result = self._evaluate(sig_store, istore)

        assert result["escalated"] == 0

    def test_severity_cap_p0_not_exceeded(self):
        """An incident already at the P0 cap is never escalated further."""
        sig = "sig_p0_already"
        sig_store = _sig_store_with_state(sig, occurrences_60m=100, triage_count_24h=20)
        istore = _incident_store_with_open("inc_004", severity="P0", signature=sig)

        result = self._evaluate(sig_store, istore)

        # P0 already at cap → no escalation
        assert result["escalated"] == 0

    def test_followup_event_created_on_escalation(self):
        """A real escalation appends both a decision and a followup event."""
        sig = "sig_followup"
        sig_store = _sig_store_with_state(sig, occurrences_60m=15, triage_count_24h=2)
        istore = _incident_store_with_open("inc_005", severity="P2", signature=sig)

        self._evaluate(sig_store, istore)

        events = istore._events.get("inc_005", [])
        types = [e.get("type") for e in events]
        assert "decision" in types
        assert "followup" in types

    def test_dry_run_no_state_change(self):
        """dry_run reports candidates but leaves events and severity untouched."""
        sig = "sig_dryrun"
        sig_store = _sig_store_with_state(sig, occurrences_60m=15, triage_count_24h=2)
        istore = _incident_store_with_open("inc_006", severity="P2", signature=sig)

        result = self._evaluate(sig_store, istore, params={"dry_run": True}, dry_run=True)

        # Candidates are returned but no incident events appended
        assert len(result["candidates"]) >= 1
        assert result["escalated"] == 0
        events = istore._events.get("inc_006", [])
        assert len(events) == 0
        # "No state change" also covers the incident record itself.
        assert istore._incidents["inc_006"]["severity"] == "P2"

    def test_no_incident_for_signature_skipped(self):
        """A hot signature with no matching open incident escalates nothing."""
        from incident_store import MemoryIncidentStore, set_incident_store

        sig = "sig_no_incident"
        sig_store = _sig_store_with_state(sig, occurrences_60m=50, triage_count_24h=10)
        # No incident for this signature
        istore = MemoryIncidentStore()
        set_incident_store(istore)

        result = self._evaluate(sig_store, istore)

        assert result["escalated"] == 0
class TestAutoResolveCandidates:
    """Tests for ``find_auto_resolve_candidates`` run against in-memory stores."""

    def teardown_method(self):
        """Reset the globally registered stores so tests stay independent."""
        from incident_store import set_incident_store
        from signature_state_store import set_signature_state_store

        set_signature_state_store(None)
        set_incident_store(None)

    @staticmethod
    def _quiet_sig_store(sig, minutes_ago=90):
        """Install and return a signature store whose only signature is quiet.

        Every timestamp of ``sig`` is set ``minutes_ago`` minutes in the past
        and both counters are zero.
        """
        from signature_state_store import MemorySignatureStateStore, set_signature_state_store

        # Naive UTC, same ISO format as the deprecated datetime.utcnow().
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        old_time = (now - timedelta(minutes=minutes_ago)).isoformat()
        sig_store = MemorySignatureStateStore()
        sig_store._states[sig] = {
            "signature": sig,
            "last_triage_at": old_time,
            "last_alert_at": old_time,
            "triage_count_24h": 0,
            "occurrences_60m": 0,
            "occurrences_60m_bucket_start": old_time,
            "updated_at": old_time,
        }
        set_signature_state_store(sig_store)
        return sig_store

    def test_candidate_found_when_no_recent_alerts(self):
        """A P2 incident with no alert for 90 minutes is reported as a candidate."""
        from incident_escalation import find_auto_resolve_candidates

        sig = "sig_quiet"
        sig_store = self._quiet_sig_store(sig)
        istore = _incident_store_with_open("inc_quiet", severity="P2", signature=sig)

        result = find_auto_resolve_candidates(
            params={"no_alerts_minutes": 60},
            sig_state_store=sig_store,
            incident_store=istore,
            policy=_policy(),
            dry_run=True,
        )

        assert result["candidates_count"] >= 1
        assert result["candidates"][0]["incident_id"] == "inc_quiet"
        assert result["closed_count"] == 0  # dry_run + auto_close=false

    def test_no_candidate_when_recent_alert(self):
        """A signature that just saw an alert yields no candidate."""
        from incident_escalation import find_auto_resolve_candidates
        from signature_state_store import MemorySignatureStateStore, set_signature_state_store

        sig = "sig_active"
        sig_store = MemorySignatureStateStore()
        sig_store.mark_alert_seen(sig)  # just now
        set_signature_state_store(sig_store)
        istore = _incident_store_with_open("inc_active", severity="P2", signature=sig)

        result = find_auto_resolve_candidates(
            params={"no_alerts_minutes": 60},
            sig_state_store=sig_store,
            incident_store=istore,
            policy=_policy(),
            dry_run=True,
        )

        assert result["candidates_count"] == 0

    def test_p0_not_auto_close_eligible(self):
        """A quiet P0 incident is a candidate but is not auto-close eligible."""
        from incident_escalation import find_auto_resolve_candidates

        sig = "sig_p0_quiet"
        sig_store = self._quiet_sig_store(sig)
        istore = _incident_store_with_open("inc_p0", severity="P0", signature=sig)

        result = find_auto_resolve_candidates(
            params={},
            sig_state_store=sig_store,
            incident_store=istore,
            policy=_policy(),
            dry_run=True,
        )

        # P0 is a candidate but not auto_close_eligible (not in close_allowed_severities)
        assert result["candidates_count"] >= 1
        cand = result["candidates"][0]
        assert cand["auto_close_eligible"] is False

    def test_candidate_event_appended_when_not_dry_run(self):
        """With dry_run=False exactly one candidate event is appended."""
        from incident_escalation import find_auto_resolve_candidates

        sig = "sig_event"
        sig_store = self._quiet_sig_store(sig)
        istore = _incident_store_with_open("inc_event", severity="P2", signature=sig)

        find_auto_resolve_candidates(
            params={"no_alerts_minutes": 60},
            sig_state_store=sig_store,
            incident_store=istore,
            policy=_policy(),
            dry_run=False,  # should append event
        )

        events = istore._events.get("inc_event", [])
        assert len(events) == 1
        assert "Auto-resolve candidate" in events[0]["message"]
class TestOccurrences60mBucket:
    """Tests for the ``occurrences_60m`` bucket of ``MemorySignatureStateStore``."""

    def setup_method(self):
        """Install a fresh in-memory signature store before each test."""
        from signature_state_store import MemorySignatureStateStore, set_signature_state_store

        self.store = MemorySignatureStateStore()
        set_signature_state_store(self.store)

    def teardown_method(self):
        """Unregister the signature store so tests stay independent."""
        from signature_state_store import set_signature_state_store

        set_signature_state_store(None)

    @staticmethod
    def _iso_minutes_ago(minutes):
        """Return a naive-UTC ISO timestamp ``minutes`` minutes in the past.

        Same value and format as the deprecated ``datetime.utcnow()`` would
        give, without the DeprecationWarning on Python 3.12+.
        """
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        return (now - timedelta(minutes=minutes)).isoformat()

    def test_first_alert_starts_bucket(self):
        """The first alert sets the count to 1 and records a bucket start."""
        self.store.mark_alert_seen("sig1")

        state = self.store.get_state("sig1")
        assert state["occurrences_60m"] == 1
        assert state["occurrences_60m_bucket_start"] is not None

    def test_repeated_alerts_increment_bucket(self):
        """Five alerts in a row are counted as five occurrences."""
        for _ in range(5):
            self.store.mark_alert_seen("sig2")

        state = self.store.get_state("sig2")
        assert state["occurrences_60m"] == 5

    def test_old_bucket_resets(self):
        """An alert arriving after the bucket is over 60 minutes old restarts the count."""
        self.store.mark_alert_seen("sig3")
        # Back-date bucket start to > 60 min ago
        old_time = self._iso_minutes_ago(70)
        with self.store._lock:
            self.store._states["sig3"]["occurrences_60m_bucket_start"] = old_time
            self.store._states["sig3"]["occurrences_60m"] = 99

        self.store.mark_alert_seen("sig3")

        state = self.store.get_state("sig3")
        assert state["occurrences_60m"] == 1  # reset to 1

    def test_list_active_signatures(self):
        """Only signatures with an alert inside the window are listed."""
        self.store.mark_alert_seen("active_sig")
        # Old sig (>60m without alerts)
        old_time = self._iso_minutes_ago(90)
        with self.store._lock:
            self.store._states["old_sig"] = {
                "signature": "old_sig",
                "last_alert_at": old_time,
                "last_triage_at": None,
                "triage_count_24h": 0,
                "occurrences_60m": 5,
                "occurrences_60m_bucket_start": old_time,
                "updated_at": old_time,
            }

        active = self.store.list_active_signatures(window_minutes=60)

        sigs = [s["signature"] for s in active]
        assert "active_sig" in sigs
        assert "old_sig" not in sigs

    def test_list_sorted_by_occurrences(self):
        """The signature with the most occurrences comes first."""
        self.store.mark_alert_seen("sig_low")  # 1 occurrence
        for _ in range(10):
            self.store.mark_alert_seen("sig_high")  # 10 occurrences

        active = self.store.list_active_signatures(window_minutes=60)

        assert active[0]["signature"] == "sig_high"
        assert active[0]["occurrences_60m"] == 10