Files
microdao-daarion/tests/test_alert_dashboard_slo.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

167 lines
6.2 KiB
Python

"""
Tests for Alert-loop SLO metrics in MemoryAlertStore.compute_loop_slo.
Covers:
- claim_to_ack_p95_seconds computed correctly
- failed_rate_pct computed correctly
- processing_stuck_count detected
- violations list populated on threshold breach
- no violations when all healthy
"""
import sys
import datetime
from pathlib import Path

# Make the router service's modules (alert_store, alert_ingest, ...) importable
# when the tests are run from the repository root.
ROOT = Path(__file__).resolve().parent.parent
ROUTER = ROOT / "services" / "router"
_router_path = str(ROUTER)
if _router_path not in sys.path:
    sys.path.insert(0, _router_path)
def _make_acked_record(store, claimed_delta_s: float, ack_delta_s: float):
    """Ingest an alert and manually set claimed_at + acked_at to simulate latency.

    The simulated claim→ack latency is ``claimed_delta_s - ack_delta_s``
    seconds; both timestamps are placed in the past relative to "now".
    Returns the alert_ref of the ingested record.
    """
    from alert_ingest import ingest_alert
    import uuid

    # NOTE(review): naive utcnow() is deprecated in newer Pythons; kept here
    # to match the store's timestamp format — confirm before migrating.
    ts_now = datetime.datetime.utcnow()
    payload = {
        "source": "test",
        "service": "gw",
        "env": "prod",
        "severity": "P1",
        "kind": "slo_breach",
        "title": "Test alert",
        "summary": "test",
        "started_at": ts_now.isoformat(),
        # Unique fingerprint so each call creates a distinct alert record.
        "labels": {"fingerprint": uuid.uuid4().hex},
        "metrics": {},
    }
    alert_ref = ingest_alert(store, payload)["alert_ref"]
    # Reach into the store's internals to backdate the lifecycle timestamps.
    with store._lock:
        record = store._alerts[alert_ref]
        record["claimed_at"] = (ts_now - datetime.timedelta(seconds=claimed_delta_s)).isoformat()
        record["acked_at"] = (ts_now - datetime.timedelta(seconds=ack_delta_s)).isoformat()
        record["status"] = "acked"
    return alert_ref
def _make_failed_record(store):
    """Ingest an alert and immediately mark it failed; return its alert_ref."""
    from alert_ingest import ingest_alert
    import uuid

    ts_now = datetime.datetime.utcnow()
    payload = {
        "source": "test",
        "service": "gw",
        "env": "prod",
        "severity": "P1",
        "kind": "error_rate",
        "title": "Failed test alert",
        "summary": "fail",
        "started_at": ts_now.isoformat(),
        # Unique fingerprint so each call creates a distinct alert record.
        "labels": {"fingerprint": uuid.uuid4().hex},
        "metrics": {},
    }
    alert_ref = ingest_alert(store, payload)["alert_ref"]
    store.mark_failed(alert_ref, "processing error", retry_after_seconds=300)
    return alert_ref
class TestAlertLoopSLO:
    """SLO-metric tests for MemoryAlertStore.compute_loop_slo.

    Each test seeds the store with synthetic alert records (via the module
    helpers) and asserts on the resulting SLO summary dict: p95 claim→ack
    latency, failed rate, stuck-processing count, and the violations list.
    """

    def setup_method(self):
        # Fresh in-memory store per test, registered as the global singleton.
        from alert_store import MemoryAlertStore, set_alert_store
        self.store = MemoryAlertStore()
        set_alert_store(self.store)

    def teardown_method(self):
        # Unregister the singleton so tests stay isolated.
        from alert_store import set_alert_store
        set_alert_store(None)

    def test_p95_computed_from_claim_to_ack(self):
        # Ten acked alerts whose claim→ack latencies are 10s, 20s, ..., 100s.
        for step in range(1, 11):
            _make_acked_record(self.store, claimed_delta_s=200, ack_delta_s=200 - step * 10)
        summary = self.store.compute_loop_slo(window_minutes=60)
        latency_p95 = summary["claim_to_ack_p95_seconds"]
        assert latency_p95 is not None
        assert 80 <= latency_p95 <= 110  # p95 of 10,20,...100 ≈ 90-100s

    def test_violation_when_p95_exceeds_threshold(self):
        # Five uniformly slow alerts (120s each) against a 60s threshold.
        for _ in range(5):
            _make_acked_record(self.store, claimed_delta_s=200, ack_delta_s=200 - 120)
        summary = self.store.compute_loop_slo(
            window_minutes=60,
            p95_threshold_s=60.0,
        )
        breached_metrics = [entry["metric"] for entry in summary["violations"]]
        assert "claim_to_ack_p95_seconds" in breached_metrics

    def test_no_violation_when_fast(self):
        # Five uniformly fast alerts (5s each) stay well under the 60s threshold.
        for _ in range(5):
            _make_acked_record(self.store, claimed_delta_s=100, ack_delta_s=100 - 5)
        summary = self.store.compute_loop_slo(window_minutes=60, p95_threshold_s=60.0)
        latency_p95 = summary["claim_to_ack_p95_seconds"]
        assert latency_p95 is not None and latency_p95 < 60.0
        assert not summary["violations"]

    def test_failed_rate_computed(self):
        # Nine acked + one failed → 10% failure rate, over a 5% threshold.
        for _ in range(9):
            _make_acked_record(self.store, claimed_delta_s=50, ack_delta_s=40)
        _make_failed_record(self.store)  # 1/10 = 10% failed
        summary = self.store.compute_loop_slo(window_minutes=60, failed_rate_threshold_pct=5.0)
        assert summary["failed_rate_pct"] >= 9.0  # at least 9%
        assert any(entry["metric"] == "failed_rate_pct" for entry in summary["violations"])

    def test_failed_rate_zero_when_all_acked(self):
        for _ in range(5):
            _make_acked_record(self.store, claimed_delta_s=50, ack_delta_s=40)
        summary = self.store.compute_loop_slo(window_minutes=60)
        assert summary["failed_rate_pct"] == 0.0

    def test_processing_stuck_count(self):
        from alert_ingest import ingest_alert
        import uuid

        ts_now = datetime.datetime.utcnow()
        # Create an alert stuck in "processing" for 20 min (threshold: 15 min).
        payload = {
            "source": "test", "service": "gw", "env": "prod",
            "severity": "P1", "kind": "custom", "title": "Stuck",
            "summary": "stuck", "started_at": ts_now.isoformat(),
            "labels": {"fingerprint": uuid.uuid4().hex}, "metrics": {},
        }
        alert_ref = ingest_alert(self.store, payload)["alert_ref"]
        with self.store._lock:
            record = self.store._alerts[alert_ref]
            record["status"] = "processing"
            record["claimed_at"] = (ts_now - datetime.timedelta(minutes=20)).isoformat()
            record["processing_lock_until"] = (ts_now + datetime.timedelta(minutes=5)).isoformat()
        summary = self.store.compute_loop_slo(window_minutes=60, stuck_minutes=15.0)
        assert summary["processing_stuck_count"] >= 1
        assert any(entry["metric"] == "processing_stuck_count" for entry in summary["violations"])

    def test_empty_store_returns_none_p95(self):
        # With no records in the window, p95 is undefined and nothing violates.
        summary = self.store.compute_loop_slo(window_minutes=60)
        assert summary["claim_to_ack_p95_seconds"] is None
        assert summary["failed_rate_pct"] == 0.0
        assert summary["processing_stuck_count"] == 0
        assert summary["violations"] == []

    def test_slo_thresholds_from_policy(self):
        """Verify that policy thresholds are used (not hardcoded)."""
        from incident_escalation import load_escalation_policy, _builtin_defaults
        import incident_escalation

        # Force reset the module-level cache back to the built-in defaults.
        incident_escalation._POLICY_CACHE = _builtin_defaults()
        loop_slo_cfg = load_escalation_policy().get("alert_loop_slo", {})
        assert "claim_to_ack_p95_seconds" in loop_slo_cfg
        assert "failed_rate_pct" in loop_slo_cfg
        assert "processing_stuck_minutes" in loop_slo_cfg