New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
227 lines
8.9 KiB
Python
227 lines
8.9 KiB
Python
"""
|
||
Tests for Root-Cause Buckets: build_root_cause_buckets + bucket_recommendations.
|
||
"""
|
||
import sys, os, datetime
|
||
import pytest
|
||
|
||
# Put <this file's dir>/../services/router at the front of the import path;
# presumably that is where incident_store / incident_intelligence live
# (they are imported lazily by the fixtures and tests) — confirm against the repo layout.
sys.path.insert(
    0,
    os.path.join(os.path.dirname(__file__), os.pardir, "services", "router"),
)
|
||
|
||
|
||
def _ts(days_ago: float = 0.0) -> str:
    """Return an ISO-8601 timestamp for the moment ``days_ago`` days before now.

    The value is a naive UTC timestamp (no offset suffix), the same format the
    earlier ``utcnow()``-based implementation produced.

    Args:
        days_ago: How far back from the current UTC time to go, in (possibly
            fractional) days. Negative values yield a time in the future.

    Returns:
        The ``isoformat()`` string of the computed naive UTC datetime.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the emitted string keeps its naive format.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return (now_utc - datetime.timedelta(days=days_ago)).isoformat()
|
||
|
||
|
||
def _make_inc(store, service, kind_tag, sig=None, days_ago=0.0, status="open", severity="P2"):
    """Create a test incident in ``store`` and return what the store hands back.

    Args:
        store: Incident store exposing ``create_incident`` and ``close_incident``.
        service: Service name recorded on the incident.
        kind_tag: Incident kind; used in the title and, when truthy, stored
            under ``meta["kind"]``.
        sig: Optional signature; when truthy it is stored under
            ``meta["incident_signature"]``.
        days_ago: Age of the incident start, in days before now.
        status: Pass ``"closed"`` to close the incident right after creation;
            any other value leaves it as created.
        severity: Severity label recorded on the incident.

    Returns:
        The value returned by ``store.create_incident``.
    """
    # Only truthy values make it into meta; the signature goes in before the kind.
    optional_meta = (("incident_signature", sig), ("kind", kind_tag))
    meta = {key: value for key, value in optional_meta if value}

    payload = {
        "service": service,
        "env": "prod",
        "severity": severity,
        "title": f"{kind_tag} on {service}",
        "started_at": _ts(days_ago),
        "created_by": "test",
        "meta": meta,
    }
    incident = store.create_incident(payload)

    if status != "closed":
        return incident

    # Close shortly after the start: a smaller days_ago means a later timestamp.
    store.close_incident(incident["id"], _ts(days_ago - 0.01), "resolved")
    return incident
|
||
|
||
|
||
@pytest.fixture
def store():
    """Provide a newly constructed ``MemoryIncidentStore`` to the requesting test."""
    # Imported at call time, i.e. after the module-level sys.path tweak has run.
    import incident_store

    return incident_store.MemoryIncidentStore()
|
||
|
||
|
||
@pytest.fixture
def policy():
    """Return an explicit intelligence policy dict for the bucket tests.

    Before building the dict, ``incident_intelligence._POLICY_CACHE`` is reset
    to ``None``.
    """
    import incident_intelligence as intel

    # Presumably prevents a previously loaded policy from leaking into this
    # test — confirm against incident_intelligence.
    intel._POLICY_CACHE = None

    correlation = {
        "lookback_days": 30,
        "max_related": 10,
        "min_score": 20,
        "rules": [],
    }
    recurrence = {
        "thresholds": {
            "signature": {"warn": 2, "high": 4},
            "kind": {"warn": 3, "high": 6},
        },
        "top_n": 15,
    }
    buckets = {
        "mode": "service_kind",
        "signature_prefix_len": 12,
        "top_n": 10,
        # Minimum incident count per window (keyed by window length in days).
        "min_count": {"7": 2, "30": 3},
    }
    autofollowups = {
        "enabled": True,
        "only_when_high": True,
        "owner": "oncall",
        "priority": "P1",
        "due_days": 7,
        "max_followups_per_bucket_per_week": 1,
        "dedupe_key_prefix": "intel_recur",
    }
    digest = {
        "markdown_max_chars": 8000,
        "top_incidents": 20,
        "output_dir": "/tmp/test_bucket_reports",
        "include_closed": True,
        "include_open": True,
    }

    return {
        "correlation": correlation,
        "recurrence": recurrence,
        "buckets": buckets,
        "autofollowups": autofollowups,
        "digest": digest,
    }
|
||
|
||
|
||
class TestBuildRootCauseBuckets:
    """Behavioural tests for ``incident_intelligence.build_root_cause_buckets``.

    Each test seeds the in-memory ``store`` fixture through ``_make_inc`` and
    runs the builder with the explicit ``policy`` fixture.
    """

    def test_groups_by_service_kind(self, store, policy):
        """In ``service_kind`` mode the bucket keys look like ``service|kind``."""
        from incident_intelligence import build_root_cause_buckets

        for _ in range(4):
            _make_inc(store, "gateway", "error_rate", days_ago=1.0)
        for _ in range(3):
            _make_inc(store, "router", "latency", days_ago=2.0)

        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])

        bkeys = [b["bucket_key"] for b in buckets]
        assert "gateway|error_rate" in bkeys
        assert "router|latency" in bkeys

    def test_min_count_filter_7d(self, store, policy):
        """A bucket below the 7-day minimum count is dropped; one above it is kept."""
        from incident_intelligence import build_root_cause_buckets

        # 1 incident only (below min_count[7]=2) — should not appear
        _make_inc(store, "svc", "latency", days_ago=1.0)
        # 3 incidents — should appear
        for _ in range(3):
            _make_inc(store, "svc2", "error_rate", days_ago=1.0)

        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])

        bkeys = [b["bucket_key"] for b in buckets]
        assert "svc2|error_rate" in bkeys
        assert "svc|latency" not in bkeys

    def test_min_count_filter_30d(self, store, policy):
        """Incidents older than 7 days still form a bucket via the 30-day minimum."""
        from incident_intelligence import build_root_cause_buckets

        # 4 incidents aged 8–11 days (beyond 7d but within 30d,
        # count_30d=4 >= min_30=3)
        for i in range(4):
            _make_inc(store, "gateway", "oom", days_ago=8.0 + i)

        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])

        bkeys = [b["bucket_key"] for b in buckets]
        assert "gateway|oom" in bkeys

    def test_top_n_enforced(self, store, policy):
        """No more than ``policy["buckets"]["top_n"]`` buckets are returned."""
        from incident_intelligence import build_root_cause_buckets

        # 15 distinct services with 3 recent incidents each -> 15 candidate buckets.
        for i in range(15):
            for j in range(3):
                _make_inc(store, f"svc{i}", "latency", days_ago=float(j) * 0.5)

        policy["buckets"]["top_n"] = 5
        incidents = store.list_incidents(limit=200)
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])
        assert len(buckets) <= 5

    def test_counts_correct(self, store, policy):
        """``counts["7d"]`` and ``counts["30d"]`` reflect the seeded incident ages."""
        from incident_intelligence import build_root_cause_buckets

        # 5 incidents inside the 7d window, 2 more at 10 days old (30d window only)
        for _ in range(5):
            _make_inc(store, "gateway", "error_rate", days_ago=2.0)
        for _ in range(2):
            _make_inc(store, "gateway", "error_rate", days_ago=10.0)

        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])
        gw = next(b for b in buckets if b["bucket_key"] == "gateway|error_rate")
        assert gw["counts"]["7d"] == 5
        assert gw["counts"]["30d"] == 7

    def test_open_count_only_includes_open_mitigating(self, store, policy):
        """``counts["open"]`` excludes the closed incident in the bucket."""
        from incident_intelligence import build_root_cause_buckets

        _make_inc(store, "svc", "latency", days_ago=1.0, status="open")
        _make_inc(store, "svc", "latency", days_ago=1.5, status="closed")
        _make_inc(store, "svc", "latency", days_ago=2.0, status="open")

        # Explicitly append closed incidents in case the default listing omits them.
        incidents = store.list_incidents(limit=100) + list(
            store.list_incidents({"status": "closed"}, limit=10)
        )
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])
        svc_b = next((b for b in buckets if b["bucket_key"] == "svc|latency"), None)
        # Fail loudly if the bucket is missing; a conditional assertion here
        # would let the test pass without checking anything.
        assert svc_b is not None, "expected a 'svc|latency' bucket to be built"
        assert svc_b["counts"]["open"] == 2

    def test_recommendations_are_deterministic(self, store, policy):
        """Two runs over the same incidents yield identical recommendations."""
        from incident_intelligence import build_root_cause_buckets

        for _ in range(5):
            _make_inc(store, "gateway", "latency", days_ago=1.0)

        incidents = store.list_incidents(limit=100)
        b1 = build_root_cause_buckets(incidents, policy=policy)
        b2 = build_root_cause_buckets(incidents, policy=policy)
        # Guard the indexing below so an empty result reads as a clear failure.
        assert b1 and b2, "expected at least one bucket from each run"
        assert b1[0]["recommendations"] == b2[0]["recommendations"]

    def test_signature_mode(self, store, policy):
        """In ``signature_prefix`` mode a bucket key starts with the signature prefix."""
        from incident_intelligence import build_root_cause_buckets

        sig = "aabbccddee112233" * 2
        for _ in range(3):
            _make_inc(store, "gateway", "error_rate", sig=sig, days_ago=1.0)

        policy["buckets"]["mode"] = "signature_prefix"
        policy["buckets"]["signature_prefix_len"] = 12
        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy)
        bkeys = [b["bucket_key"] for b in buckets]
        assert any(b.startswith(sig[:12]) for b in bkeys)

    def test_sorted_by_count_7d_desc(self, store, policy):
        """The first bucket has a 7-day count at least as large as the second."""
        from incident_intelligence import build_root_cause_buckets

        for _ in range(6):
            _make_inc(store, "svc_a", "error_rate", days_ago=1.0)
        for _ in range(3):
            _make_inc(store, "svc_b", "latency", days_ago=1.0)

        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy)
        assert len(buckets) >= 2
        assert buckets[0]["counts"]["7d"] >= buckets[1]["counts"]["7d"]
|
||
|
||
|
||
class TestBucketRecommendations:
    """Tests for ``incident_intelligence.bucket_recommendations``.

    Each test passes a minimal bucket dict (``kinds`` set plus an ``open``
    count) and checks the returned recommendation strings.
    """

    @staticmethod
    def _recommend(kinds, open_count=0):
        """Call ``bucket_recommendations`` on a minimal bucket and return its result."""
        from incident_intelligence import bucket_recommendations

        bucket = {"kinds": kinds, "counts": {"open": open_count}}
        return bucket_recommendations(bucket)

    def test_error_rate_recommendations(self):
        """An ``error_rate`` bucket mentions regressions or SLOs."""
        advice = self._recommend({"error_rate"})
        assert any(("regression" in text.lower()) or ("SLO" in text) for text in advice)

    def test_latency_recommendations(self):
        """A ``latency`` bucket mentions p95 or perf."""
        advice = self._recommend({"latency"})
        assert any(
            ("p95" in text.lower()) or ("perf" in text.lower()) for text in advice
        )

    def test_security_recommendations(self):
        """A ``security`` bucket mentions secrets, a scanner, or rotation."""
        advice = self._recommend({"security"})
        assert any(
            ("secret" in text.lower())
            or ("scanner" in text.lower())
            or ("rotate" in text.lower())
            for text in advice
        )

    def test_open_incident_adds_warning(self):
        """With open incidents, some recommendation mentions deploys or mitigation."""
        advice = self._recommend({"latency"}, open_count=2)
        assert any(
            ("deploy" in text.lower()) or ("mitigat" in text.lower())
            for text in advice
        )

    def test_unknown_kind_returns_defaults(self):
        """An unrecognised kind still yields at least one recommendation."""
        advice = self._recommend({"custom"})
        assert len(advice) > 0

    def test_max_recs_capped(self):
        """Even with many kinds and open incidents, at most five items come back."""
        advice = self._recommend(
            {"error_rate", "latency", "oom", "disk", "security"},
            open_count=3,
        )
        assert len(advice) <= 5
|