""" Tests for Root-Cause Buckets: build_root_cause_buckets + bucket_recommendations. """ import sys, os, datetime import pytest sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "router")) def _ts(days_ago: float = 0.0) -> str: return (datetime.datetime.utcnow() - datetime.timedelta(days=days_ago)).isoformat() def _make_inc(store, service, kind_tag, sig=None, days_ago=0.0, status="open", severity="P2"): meta = {} if sig: meta["incident_signature"] = sig if kind_tag: meta["kind"] = kind_tag inc = store.create_incident({ "service": service, "env": "prod", "severity": severity, "title": f"{kind_tag} on {service}", "started_at": _ts(days_ago), "created_by": "test", "meta": meta, }) if status == "closed": store.close_incident(inc["id"], _ts(days_ago - 0.01), "resolved") return inc @pytest.fixture def store(): from incident_store import MemoryIncidentStore return MemoryIncidentStore() @pytest.fixture def policy(): import incident_intelligence incident_intelligence._POLICY_CACHE = None return { "correlation": {"lookback_days": 30, "max_related": 10, "min_score": 20, "rules": []}, "recurrence": { "thresholds": {"signature": {"warn": 2, "high": 4}, "kind": {"warn": 3, "high": 6}}, "top_n": 15, }, "buckets": { "mode": "service_kind", "signature_prefix_len": 12, "top_n": 10, "min_count": {"7": 2, "30": 3}, }, "autofollowups": {"enabled": True, "only_when_high": True, "owner": "oncall", "priority": "P1", "due_days": 7, "max_followups_per_bucket_per_week": 1, "dedupe_key_prefix": "intel_recur"}, "digest": {"markdown_max_chars": 8000, "top_incidents": 20, "output_dir": "/tmp/test_bucket_reports", "include_closed": True, "include_open": True}, } class TestBuildRootCauseBuckets: def test_groups_by_service_kind(self, store, policy): from incident_intelligence import build_root_cause_buckets for _ in range(4): _make_inc(store, "gateway", "error_rate", days_ago=1.0) for _ in range(3): _make_inc(store, "router", "latency", days_ago=2.0) incidents = store.list_incidents(limit=100) buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30]) bkeys = [b["bucket_key"] for b in buckets] assert "gateway|error_rate" in bkeys assert "router|latency" in bkeys def test_min_count_filter_7d(self, store, policy): from incident_intelligence import build_root_cause_buckets # 1 incident only (below min_count[7]=2) — should not appear _make_inc(store, "svc", "latency", days_ago=1.0) # 3 incidents — should appear for _ in range(3): _make_inc(store, "svc2", "error_rate", days_ago=1.0) incidents = store.list_incidents(limit=100) buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30]) bkeys = [b["bucket_key"] for b in buckets] assert "svc2|error_rate" in bkeys assert "svc|latency" not in bkeys def test_min_count_filter_30d(self, store, policy): from incident_intelligence import build_root_cause_buckets # 4 incidents in 8–20d window (beyond 7d but within 30d, count_30d=4 >= min_30=3) for i in range(4): _make_inc(store, "gateway", "oom", days_ago=8.0 + i) incidents = store.list_incidents(limit=100) buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30]) bkeys = [b["bucket_key"] for b in buckets] assert "gateway|oom" in bkeys def test_top_n_enforced(self, store, policy): from incident_intelligence import build_root_cause_buckets for i in range(15): for j in range(3): _make_inc(store, f"svc{i}", "latency", days_ago=float(j) * 0.5) policy["buckets"]["top_n"] = 5 incidents = store.list_incidents(limit=200) buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30]) assert len(buckets) <= 5 def test_counts_correct(self, store, policy): from incident_intelligence import build_root_cause_buckets # 5 incidents in 7d window, 2 more in 8-15d (30d bucket) for _ in range(5): _make_inc(store, "gateway", "error_rate", days_ago=2.0) for _ in range(2): _make_inc(store, "gateway", "error_rate", days_ago=10.0) incidents = store.list_incidents(limit=100) buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30]) gw = next(b for b in buckets if b["bucket_key"] == "gateway|error_rate") assert gw["counts"]["7d"] == 5 assert gw["counts"]["30d"] == 7 def test_open_count_only_includes_open_mitigating(self, store, policy): from incident_intelligence import build_root_cause_buckets _make_inc(store, "svc", "latency", days_ago=1.0, status="open") _make_inc(store, "svc", "latency", days_ago=1.5, status="closed") _make_inc(store, "svc", "latency", days_ago=2.0, status="open") incidents = store.list_incidents(limit=100) + [ i for i in store.list_incidents({"status": "closed"}, limit=10) ] buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30]) svc_b = next((b for b in buckets if b["bucket_key"] == "svc|latency"), None) if svc_b: assert svc_b["counts"]["open"] == 2 def test_recommendations_are_deterministic(self, store, policy): from incident_intelligence import build_root_cause_buckets for _ in range(5): _make_inc(store, "gateway", "latency", days_ago=1.0) incidents = store.list_incidents(limit=100) b1 = build_root_cause_buckets(incidents, policy=policy) b2 = build_root_cause_buckets(incidents, policy=policy) assert b1[0]["recommendations"] == b2[0]["recommendations"] def test_signature_mode(self, store, policy): from incident_intelligence import build_root_cause_buckets SIG = "aabbccddee112233" * 2 for _ in range(3): _make_inc(store, "gateway", "error_rate", sig=SIG, days_ago=1.0) policy["buckets"]["mode"] = "signature_prefix" policy["buckets"]["signature_prefix_len"] = 12 incidents = store.list_incidents(limit=100) buckets = build_root_cause_buckets(incidents, policy=policy) bkeys = [b["bucket_key"] for b in buckets] assert any(b.startswith(SIG[:12]) for b in bkeys) def test_sorted_by_count_7d_desc(self, store, policy): from incident_intelligence import build_root_cause_buckets for _ in range(6): _make_inc(store, "svc_a", "error_rate", days_ago=1.0) for _ in range(3): _make_inc(store, "svc_b", "latency", days_ago=1.0) incidents = store.list_incidents(limit=100) buckets = build_root_cause_buckets(incidents, policy=policy) assert len(buckets) >= 2 assert buckets[0]["counts"]["7d"] >= buckets[1]["counts"]["7d"] class TestBucketRecommendations: def test_error_rate_recommendations(self): from incident_intelligence import bucket_recommendations b = {"kinds": {"error_rate"}, "counts": {"open": 0}} recs = bucket_recommendations(b) assert any("regression" in r.lower() or "SLO" in r for r in recs) def test_latency_recommendations(self): from incident_intelligence import bucket_recommendations b = {"kinds": {"latency"}, "counts": {"open": 0}} recs = bucket_recommendations(b) assert any("p95" in r.lower() or "perf" in r.lower() for r in recs) def test_security_recommendations(self): from incident_intelligence import bucket_recommendations b = {"kinds": {"security"}, "counts": {"open": 0}} recs = bucket_recommendations(b) assert any("secret" in r.lower() or "scanner" in r.lower() or "rotate" in r.lower() for r in recs) def test_open_incident_adds_warning(self): from incident_intelligence import bucket_recommendations b = {"kinds": {"latency"}, "counts": {"open": 2}} recs = bucket_recommendations(b) assert any("deploy" in r.lower() or "mitigat" in r.lower() for r in recs) def test_unknown_kind_returns_defaults(self): from incident_intelligence import bucket_recommendations b = {"kinds": {"custom"}, "counts": {"open": 0}} recs = bucket_recommendations(b) assert len(recs) > 0 def test_max_recs_capped(self): from incident_intelligence import bucket_recommendations b = {"kinds": {"error_rate", "latency", "oom", "disk", "security"}, "counts": {"open": 3}} recs = bucket_recommendations(b) assert len(recs) <= 5