New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
167 lines
6.2 KiB
Python
"""
|
|
Tests for Alert-loop SLO metrics in MemoryAlertStore.compute_loop_slo.
|
|
|
|
Covers:
|
|
- claim_to_ack_p95_seconds computed correctly
|
|
- failed_rate_pct computed correctly
|
|
- processing_stuck_count detected
|
|
- violations list populated on threshold breach
|
|
- no violations when all healthy
|
|
"""
|
|
import sys
|
|
import datetime
|
|
from pathlib import Path
|
|
|
|
ROOT = Path(__file__).resolve().parent.parent
|
|
ROUTER = ROOT / "services" / "router"
|
|
if str(ROUTER) not in sys.path:
|
|
sys.path.insert(0, str(ROUTER))
def _make_acked_record(store, claimed_delta_s: float, ack_delta_s: float):
|
|
"""Ingest an alert and manually set claimed_at + acked_at to simulate latency."""
|
|
from alert_ingest import ingest_alert
|
|
import uuid
|
|
now = datetime.datetime.utcnow()
|
|
alert_data = {
|
|
"source": "test",
|
|
"service": "gw",
|
|
"env": "prod",
|
|
"severity": "P1",
|
|
"kind": "slo_breach",
|
|
"title": "Test alert",
|
|
"summary": "test",
|
|
"started_at": now.isoformat(),
|
|
"labels": {"fingerprint": uuid.uuid4().hex},
|
|
"metrics": {},
|
|
}
|
|
r = ingest_alert(store, alert_data)
|
|
ref = r["alert_ref"]
|
|
with store._lock:
|
|
rec = store._alerts[ref]
|
|
rec["claimed_at"] = (now - datetime.timedelta(seconds=claimed_delta_s)).isoformat()
|
|
rec["acked_at"] = (now - datetime.timedelta(seconds=ack_delta_s)).isoformat()
|
|
rec["status"] = "acked"
|
|
return ref
def _make_failed_record(store):
|
|
from alert_ingest import ingest_alert
|
|
import uuid
|
|
now = datetime.datetime.utcnow()
|
|
alert_data = {
|
|
"source": "test",
|
|
"service": "gw",
|
|
"env": "prod",
|
|
"severity": "P1",
|
|
"kind": "error_rate",
|
|
"title": "Failed test alert",
|
|
"summary": "fail",
|
|
"started_at": now.isoformat(),
|
|
"labels": {"fingerprint": uuid.uuid4().hex},
|
|
"metrics": {},
|
|
}
|
|
r = ingest_alert(store, alert_data)
|
|
ref = r["alert_ref"]
|
|
store.mark_failed(ref, "processing error", retry_after_seconds=300)
|
|
return ref
class TestAlertLoopSLO:
|
|
def setup_method(self):
|
|
from alert_store import MemoryAlertStore, set_alert_store
|
|
self.store = MemoryAlertStore()
|
|
set_alert_store(self.store)
|
|
|
|
def teardown_method(self):
|
|
from alert_store import set_alert_store
|
|
set_alert_store(None)
|
|
|
|
def test_p95_computed_from_claim_to_ack(self):
|
|
# 10 alerts: claim→ack times of 10s, 20s, 30s, ... 100s
|
|
for i in range(1, 11):
|
|
_make_acked_record(self.store, claimed_delta_s=200, ack_delta_s=200 - i * 10)
|
|
slo = self.store.compute_loop_slo(window_minutes=60)
|
|
p95 = slo["claim_to_ack_p95_seconds"]
|
|
assert p95 is not None
|
|
assert 80 <= p95 <= 110 # p95 of 10,20,...100 ≈ 90-100s
|
|
|
|
def test_violation_when_p95_exceeds_threshold(self):
|
|
# 5 slow alerts: 120s each
|
|
for _ in range(5):
|
|
_make_acked_record(self.store, claimed_delta_s=200, ack_delta_s=200 - 120)
|
|
slo = self.store.compute_loop_slo(
|
|
window_minutes=60,
|
|
p95_threshold_s=60.0,
|
|
)
|
|
violations = slo["violations"]
|
|
viol_names = [v["metric"] for v in violations]
|
|
assert "claim_to_ack_p95_seconds" in viol_names
|
|
|
|
def test_no_violation_when_fast(self):
|
|
# 5 fast alerts: 5s each
|
|
for _ in range(5):
|
|
_make_acked_record(self.store, claimed_delta_s=100, ack_delta_s=100 - 5)
|
|
slo = self.store.compute_loop_slo(window_minutes=60, p95_threshold_s=60.0)
|
|
p95 = slo["claim_to_ack_p95_seconds"]
|
|
assert p95 is not None and p95 < 60.0
|
|
assert not slo["violations"]
|
|
|
|
def test_failed_rate_computed(self):
|
|
for _ in range(9):
|
|
_make_acked_record(self.store, claimed_delta_s=50, ack_delta_s=40)
|
|
_make_failed_record(self.store) # 1/10 = 10% failed
|
|
|
|
slo = self.store.compute_loop_slo(window_minutes=60, failed_rate_threshold_pct=5.0)
|
|
assert slo["failed_rate_pct"] >= 9.0 # at least 9%
|
|
assert any(v["metric"] == "failed_rate_pct" for v in slo["violations"])
|
|
|
|
def test_failed_rate_zero_when_all_acked(self):
|
|
for _ in range(5):
|
|
_make_acked_record(self.store, claimed_delta_s=50, ack_delta_s=40)
|
|
slo = self.store.compute_loop_slo(window_minutes=60)
|
|
assert slo["failed_rate_pct"] == 0.0
|
|
|
|
def test_processing_stuck_count(self):
|
|
from alert_ingest import ingest_alert
|
|
import uuid
|
|
now = datetime.datetime.utcnow()
|
|
# Create alert stuck in processing for 20 min
|
|
alert_data = {
|
|
"source": "test", "service": "gw", "env": "prod",
|
|
"severity": "P1", "kind": "custom", "title": "Stuck",
|
|
"summary": "stuck", "started_at": now.isoformat(),
|
|
"labels": {"fingerprint": uuid.uuid4().hex}, "metrics": {},
|
|
}
|
|
r = ingest_alert(self.store, alert_data)
|
|
ref = r["alert_ref"]
|
|
with self.store._lock:
|
|
rec = self.store._alerts[ref]
|
|
stuck_time = (now - datetime.timedelta(minutes=20)).isoformat()
|
|
rec["status"] = "processing"
|
|
rec["claimed_at"] = stuck_time
|
|
rec["processing_lock_until"] = (now + datetime.timedelta(minutes=5)).isoformat()
|
|
|
|
slo = self.store.compute_loop_slo(window_minutes=60, stuck_minutes=15.0)
|
|
assert slo["processing_stuck_count"] >= 1
|
|
assert any(v["metric"] == "processing_stuck_count" for v in slo["violations"])
|
|
|
|
def test_empty_store_returns_none_p95(self):
|
|
slo = self.store.compute_loop_slo(window_minutes=60)
|
|
assert slo["claim_to_ack_p95_seconds"] is None
|
|
assert slo["failed_rate_pct"] == 0.0
|
|
assert slo["processing_stuck_count"] == 0
|
|
assert slo["violations"] == []
|
|
|
|
def test_slo_thresholds_from_policy(self):
|
|
"""Verify that policy thresholds are used (not hardcoded)."""
|
|
from incident_escalation import load_escalation_policy, _builtin_defaults
|
|
# Force reset
|
|
import incident_escalation
|
|
incident_escalation._POLICY_CACHE = _builtin_defaults()
|
|
policy = load_escalation_policy()
|
|
loop_slo_cfg = policy.get("alert_loop_slo", {})
|
|
assert "claim_to_ack_p95_seconds" in loop_slo_cfg
|
|
assert "failed_rate_pct" in loop_slo_cfg
|
|
assert "processing_stuck_minutes" in loop_slo_cfg
|