""" Tests for incident_intelligence.py — weekly_digest function. Verifies markdown generation, JSON structure, length clamp, deterministic recommendations, and artifact saving. """ import sys import os import datetime import json import tempfile import pytest sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "router")) def _ts(days_ago: float = 0.0) -> str: return (datetime.datetime.utcnow() - datetime.timedelta(days=days_ago)).isoformat() def _make_inc(store, service, kind_tag, sig=None, days_ago=0.0, status="open", severity="P2", title=None): meta = {} if sig: meta["incident_signature"] = sig if kind_tag: meta["kind"] = kind_tag inc = store.create_incident({ "service": service, "env": "prod", "severity": severity, "title": title or f"{kind_tag} on {service}", "started_at": _ts(days_ago), "created_by": "test", "meta": meta, }) if status == "closed": store.close_incident(inc["id"], _ts(days_ago - 0.01), "resolved in test") return inc @pytest.fixture def store(): from incident_store import MemoryIncidentStore return MemoryIncidentStore() @pytest.fixture def policy(tmp_path): import incident_intelligence incident_intelligence._POLICY_CACHE = None return { "correlation": {"lookback_days": 30, "max_related": 10, "min_score": 20, "rules": []}, "recurrence": { "windows_days": [7, 30], "thresholds": { "signature": {"warn": 2, "high": 4}, "kind": {"warn": 3, "high": 6}, }, "top_n": 15, "recommendations": { "signature_high": "Create fix for sig {sig}", "signature_warn": "Review sig {sig}", "kind_high": "Fix systemic {kind}", "kind_warn": "Review {kind}", }, }, "digest": { "markdown_max_chars": 8000, "top_incidents": 20, "output_dir": str(tmp_path / "reports"), "include_closed": True, "include_open": True, }, } # ─── Tests ──────────────────────────────────────────────────────────────────── class TestWeeklyDigest: def test_markdown_length_clamp(self, store, policy): from incident_intelligence import weekly_digest # Create many incidents to generate verbose markdown SIG = "aaabbbcccdddeee11122233" for i in range(30): _make_inc(store, f"svc_{i % 5}", "latency", sig=SIG, days_ago=float(i) * 0.2) policy["digest"]["markdown_max_chars"] = 500 result = weekly_digest(policy=policy, store=store, save_artifacts=False) assert len(result["markdown"]) <= 600, "markdown must be near or under max_chars" def test_includes_open_incidents(self, store, policy): from incident_intelligence import weekly_digest open_inc = _make_inc(store, "gateway", "error_rate", status="open", severity="P1") result = weekly_digest(policy=policy, store=store, save_artifacts=False) assert result["json_data"]["open_incidents_count"] >= 1 open_ids = [i["id"] for i in result["json_data"]["open_incidents"]] assert open_inc["id"] in open_ids def test_includes_recurrence_tables(self, store, policy): from incident_intelligence import weekly_digest SIG = "reccurencetestsig0099" for i in range(5): _make_inc(store, "svc", "latency", sig=SIG, days_ago=float(i) * 0.5) result = weekly_digest(policy=policy, store=store, save_artifacts=False) rec7 = result["json_data"]["recurrence_7d"] assert "top_signatures" in rec7 assert rec7["total_incidents"] >= 5 def test_deterministic_recommendations_stable(self, store, policy): from incident_intelligence import weekly_digest SIG = "stablerecurrence123456" # 5 incidents → high threshold (high=4) for i in range(5): _make_inc(store, "gateway", "latency", sig=SIG, days_ago=float(i) * 0.3) result1 = weekly_digest(policy=policy, store=store, save_artifacts=False) result2 = weekly_digest(policy=policy, store=store, save_artifacts=False) recs1 = [r["message"] for r in result1["json_data"]["recommendations"]] recs2 = [r["message"] for r in result2["json_data"]["recommendations"]] assert recs1 == recs2, "recommendations must be deterministic" def test_recommendations_for_high_signature(self, store, policy): from incident_intelligence import weekly_digest SIG = "highrecurrencesig0001" for i in range(5): # 5 >= high threshold (4) _make_inc(store, "gateway", "latency", sig=SIG, days_ago=float(i) * 0.3) result = weekly_digest(policy=policy, store=store, save_artifacts=False) recs = result["json_data"]["recommendations"] high_recs = [r for r in recs if r["level"] == "high" and r["category"] == "signature"] assert len(high_recs) >= 1 # Recommendation message should contain "fix" (from our policy template) assert any("fix" in r["message"].lower() or "Fix" in r["message"] for r in high_recs) def test_markdown_contains_key_sections(self, store, policy): from incident_intelligence import weekly_digest _make_inc(store, "gateway", "error_rate", status="open") result = weekly_digest(policy=policy, store=store, save_artifacts=False) md = result["markdown"] assert "Weekly Incident Digest" in md assert "Summary" in md assert "Recurrence" in md def test_save_artifacts_writes_files(self, store, policy, tmp_path): from incident_intelligence import weekly_digest policy["digest"]["output_dir"] = str(tmp_path / "reports") _make_inc(store, "svc", "latency") result = weekly_digest(policy=policy, store=store, save_artifacts=True) assert len(result["artifact_paths"]) == 2 # .json and .md for path in result["artifact_paths"]: assert os.path.exists(path), f"Artifact {path} not written" # Validate JSON is parseable json_path = next(p for p in result["artifact_paths"] if p.endswith(".json")) with open(json_path) as f: data = json.load(f) assert "week" in data assert "generated_at" in data def test_save_artifacts_false_no_files(self, store, policy, tmp_path): from incident_intelligence import weekly_digest policy["digest"]["output_dir"] = str(tmp_path / "no_artifacts") result = weekly_digest(policy=policy, store=store, save_artifacts=False) assert result["artifact_paths"] == [] def test_week_field_correct_format(self, store, policy): from incident_intelligence import weekly_digest result = weekly_digest(policy=policy, store=store, save_artifacts=False) week = result["week"] # Format YYYY-WWW (e.g. 2026-W08) assert len(week) >= 6 assert "-W" in week or "-w" in week.lower() or ( len(week.split("-")) == 2 and week.split("-")[1].isdigit() ), f"Unexpected week format: {week}" def test_empty_store_does_not_crash(self, store, policy): from incident_intelligence import weekly_digest result = weekly_digest(policy=policy, store=store, save_artifacts=False) assert result["json_data"]["open_incidents_count"] == 0 assert result["json_data"]["recent_7d_count"] == 0 assert "markdown" in result def test_markdown_no_secrets(self, store, policy): from incident_intelligence import weekly_digest # Title with fake secret — should NOT appear raw _make_inc(store, "svc", "security", title="API_KEY=sk-test-fakesecret in log line", status="open") result = weekly_digest(policy=policy, store=store, save_artifacts=False) md = result["markdown"] # The title is stored (it's not a real secret pattern), but digest truncates it safely assert len(md) < 20000, "markdown shouldn't explode with many incidents"