""" Tests for incident_intelligence.py — correlation function. Uses MemoryIncidentStore with controlled fixture data. """ import sys import os import datetime import pytest sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "router")) def _ts(offset_hours: float = 0.0) -> str: """Return ISO timestamp relative to now.""" return (datetime.datetime.utcnow() - datetime.timedelta(hours=offset_hours)).isoformat() def _make_inc(store, service, kind_tag, sig=None, started_offset_h=0.0, status="open", severity="P2"): """Helper to create an incident with controlled signature / kind.""" meta = {} if sig: meta["incident_signature"] = sig if kind_tag: meta["kind"] = kind_tag inc = store.create_incident({ "service": service, "env": "prod", "severity": severity, "title": f"{service} {kind_tag} issue", "started_at": _ts(started_offset_h), "created_by": "test", "meta": meta, }) if status != "open": store.close_incident(inc["id"], _ts(started_offset_h - 1), "resolved in test") return inc # ─── Fixtures ───────────────────────────────────────────────────────────────── @pytest.fixture def store(): from incident_store import MemoryIncidentStore return MemoryIncidentStore() @pytest.fixture def policy(): from incident_intelligence import load_intel_policy, _POLICY_CACHE import incident_intelligence incident_intelligence._POLICY_CACHE = None # clear any cached file-load return { "correlation": { "lookback_days": 30, "max_related": 10, "min_score": 20, "rules": [ {"name": "same_signature", "weight": 100, "match": {"signature": True}}, {"name": "same_service_and_kind", "weight": 60, "match": {"same_service": True, "same_kind": True}}, {"name": "same_service_time_cluster", "weight": 40, "match": {"same_service": True, "within_minutes": 180}}, {"name": "same_kind_cross_service", "weight": 30, "match": {"same_kind": True, "within_minutes": 120}}, ], }, "recurrence": { "windows_days": [7, 30], "thresholds": {"signature": {"warn": 2, "high": 4}, "kind": {"warn": 3, "high": 6}}, "top_n": 15, }, "digest": {"markdown_max_chars": 8000, "top_incidents": 20, "output_dir": "/tmp/test_incident_reports", "include_closed": True, "include_open": True}, } # ─── Tests ──────────────────────────────────────────────────────────────────── class TestCorrelateIncident: def test_same_signature_ranks_first(self, store, policy): from incident_intelligence import correlate_incident SIG = "aabbccdd1234" * 2 # fake sha-like sig target = _make_inc(store, "gateway", "error_rate", sig=SIG) same_sig = _make_inc(store, "gateway", "error_rate", sig=SIG, started_offset_h=1.0) diff_sig = _make_inc(store, "gateway", "error_rate", sig="zz99887766aabb1122") related = correlate_incident(target["id"], policy=policy, store=store) assert len(related) >= 2 # same_sig must be first (highest score) top_ids = [r["incident_id"] for r in related] assert same_sig["id"] in top_ids # same_sig should have higher score than diff_sig if diff_sig appears if diff_sig["id"] in top_ids: score_same = next(r["score"] for r in related if r["incident_id"] == same_sig["id"]) score_diff = next(r["score"] for r in related if r["incident_id"] == diff_sig["id"]) assert score_same > score_diff def test_same_service_and_kind_ranks_above_time_only(self, store, policy): from incident_intelligence import correlate_incident target = _make_inc(store, "gateway", "latency", sig=None, started_offset_h=0) same_svc_kind = _make_inc(store, "gateway", "latency", sig=None, started_offset_h=2) time_only = _make_inc(store, "gateway", "oom", sig=None, started_offset_h=1) related = correlate_incident(target["id"], policy=policy, store=store) ids = [r["incident_id"] for r in related] assert same_svc_kind["id"] in ids, "same_service_and_kind should appear" if time_only["id"] in ids: s1 = next(r["score"] for r in related if r["incident_id"] == same_svc_kind["id"]) s2 = next(r["score"] for r in related if r["incident_id"] == time_only["id"]) assert s1 >= s2, "same_service_and_kind should score >= time_cluster" def test_same_kind_cross_service_matches(self, store, policy): from incident_intelligence import correlate_incident target = _make_inc(store, "gateway", "latency", started_offset_h=0) cross = _make_inc(store, "router", "latency", started_offset_h=0.5) related = correlate_incident(target["id"], policy=policy, store=store) cross_match = next((r for r in related if r["incident_id"] == cross["id"]), None) assert cross_match is not None, "cross-service same-kind within window should match" assert "same_kind_cross_service" in cross_match["reasons"] def test_target_excluded_from_results(self, store, policy): from incident_intelligence import correlate_incident inc = _make_inc(store, "svc", "error_rate") _make_inc(store, "svc", "error_rate") # another incident related = correlate_incident(inc["id"], policy=policy, store=store) incident_ids = [r["incident_id"] for r in related] assert inc["id"] not in incident_ids, "target must not appear in related list" def test_max_related_enforced(self, store, policy): from incident_intelligence import correlate_incident # Create 20 incidents with same service and kind target = _make_inc(store, "svc", "latency", started_offset_h=0) for i in range(20): _make_inc(store, "svc", "latency", started_offset_h=float(i) / 10.0 + 0.1) policy["correlation"]["max_related"] = 5 related = correlate_incident(target["id"], policy=policy, store=store) assert len(related) <= 5 def test_min_score_filters_low_matches(self, store, policy): from incident_intelligence import correlate_incident target = _make_inc(store, "gateway", "latency", started_offset_h=0) # Service=other, kind=other, time=far → score 0 _make_inc(store, "other_svc", "disk", started_offset_h=24) policy["correlation"]["min_score"] = 10 related = correlate_incident(target["id"], policy=policy, store=store) for r in related: assert r["score"] >= 10 def test_returns_empty_for_unknown_incident(self, store, policy): from incident_intelligence import correlate_incident related = correlate_incident("inc_nonexistent", policy=policy, store=store) assert related == [] def test_append_note_adds_timeline_event(self, store, policy): from incident_intelligence import correlate_incident SIG = "sig123456789abc" target = _make_inc(store, "gateway", "error_rate", sig=SIG) _make_inc(store, "gateway", "error_rate", sig=SIG, started_offset_h=1.0) related = correlate_incident( target["id"], policy=policy, store=store, append_note=True ) assert len(related) >= 1 # Check that a note event was appended to target incident events = store.get_events(target["id"]) note_events = [e for e in events if e.get("type") == "note" and "Related incidents" in e.get("message", "")] assert len(note_events) >= 1 def test_reasons_populated(self, store, policy): from incident_intelligence import correlate_incident SIG = "sha256matchingsig" target = _make_inc(store, "svc", "latency", sig=SIG) _make_inc(store, "svc", "latency", sig=SIG, started_offset_h=0.5) related = correlate_incident(target["id"], policy=policy, store=store) assert len(related) > 0 assert len(related[0]["reasons"]) > 0