""" tests/test_risk_attribution.py — Unit tests for the Risk Attribution Engine. Tests: - deploy alerts → deploy cause - occurrences/escalations → incident_storm cause - SLO violations → slo_violation cause - overdue followups → followups_overdue cause - alert-loop degradation → alert_loop_degraded cause - sort + max_causes + confidence bands - release gate results → dependency + drift causes """ import datetime import sys import pytest from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "services" / "router")) from risk_attribution import ( compute_attribution, _detect_deploy, _detect_dependency, _detect_drift, _detect_incident_storm, _detect_slo, _detect_followups_overdue, _detect_alert_loop_degraded, _score_to_confidence, _build_summary, _builtin_attr_defaults, _reload_attribution_policy, ) @pytest.fixture(autouse=True) def reset_cache(): _reload_attribution_policy() yield _reload_attribution_policy() @pytest.fixture def policy(): return _builtin_attr_defaults() def _alert(kind: str, hours_ago: float = 1.0) -> dict: ts = (datetime.datetime.utcnow() - datetime.timedelta(hours=hours_ago)).isoformat() return {"kind": kind, "created_at": ts, "service": "gateway"} def _cutoff(hours: int = 24) -> str: return (datetime.datetime.utcnow() - datetime.timedelta(hours=hours)).isoformat() # ─── Individual signal detectors ───────────────────────────────────────────── class TestDetectDeploy: def test_deploy_alert_gives_score(self, policy): alerts = [_alert("deploy", hours_ago=1)] score, evidence, _ = _detect_deploy(alerts, _cutoff(), policy) assert score == 30 assert "deploy alerts: 1" in evidence[0] def test_no_deploy_alerts_zero_score(self, policy): alerts = [_alert("cpu_high", hours_ago=1)] score, evidence, _ = _detect_deploy(alerts, _cutoff(), policy) assert score == 0 assert evidence == [] def test_multiple_deploy_alerts(self, policy): alerts = [_alert("deploy"), _alert("rollout", hours_ago=2), _alert("canary", hours_ago=3)] score, evidence, _ = _detect_deploy(alerts, _cutoff(), policy) assert score == 30 assert "3" in evidence[0] def test_old_deploy_ignored(self, policy): old_ts = (datetime.datetime.utcnow() - datetime.timedelta(hours=30)).isoformat() alerts = [{"kind": "deploy", "created_at": old_ts}] score, evidence, _ = _detect_deploy(alerts, _cutoff(24), policy) assert score == 0 class TestDetectDependency: def test_dependency_scan_fail_gives_score(self, policy): gates = [{"gate": "dependency_scan", "status": "fail"}] score, evidence, _ = _detect_dependency(gates, policy) assert score == 25 assert "dependency_scan" in evidence[0] def test_dependency_scan_warn_gives_score(self, policy): gates = [{"gate": "dependency_scan", "status": "warn"}] score, evidence, _ = _detect_dependency(gates, policy) assert score == 25 def test_dependency_scan_pass_zero(self, policy): gates = [{"gate": "dependency_scan", "status": "pass"}] score, evidence, _ = _detect_dependency(gates, policy) assert score == 0 def test_no_gate_results_zero(self, policy): score, evidence, _ = _detect_dependency([], policy) assert score == 0 class TestDetectDrift: def test_drift_fail_gives_score(self, policy): gates = [{"gate": "drift", "status": "fail"}] score, evidence, _ = _detect_drift(gates, policy) assert score == 25 def test_drift_pass_zero(self, policy): gates = [{"gate": "drift", "status": "pass"}] score, evidence, _ = _detect_drift(gates, policy) assert score == 0 class TestDetectIncidentStorm: def test_high_occurrences_gives_score(self, policy): score, evidence, _ = _detect_incident_storm(occurrences_60m=15, escalations_24h=0, policy=policy) assert score == 20 assert "occurrences_60m=15" in evidence[0] def test_high_escalations_gives_score(self, policy): score, evidence, _ = _detect_incident_storm(occurrences_60m=0, escalations_24h=3, policy=policy) assert score == 20 assert "escalations_24h=3" in evidence[0] def test_both_signals_combined_evidence(self, policy): score, evidence, _ = _detect_incident_storm(occurrences_60m=12, escalations_24h=4, policy=policy) assert score == 20 assert len(evidence) == 2 def test_below_threshold_zero(self, policy): score, evidence, _ = _detect_incident_storm(occurrences_60m=5, escalations_24h=1, policy=policy) assert score == 0 class TestDetectSlo: def test_one_violation_gives_score(self, policy): score, evidence, _ = _detect_slo(slo_violations=1, policy=policy) assert score == 15 def test_zero_violations_zero(self, policy): score, evidence, _ = _detect_slo(slo_violations=0, policy=policy) assert score == 0 class TestDetectFollowups: def test_overdue_gives_score(self, policy): score, evidence, _ = _detect_followups_overdue(overdue_count=2, policy=policy) assert score == 10 assert "2" in evidence[0] def test_zero_overdue_zero(self, policy): score, evidence, _ = _detect_followups_overdue(overdue_count=0, policy=policy) assert score == 0 class TestDetectAlertLoop: def test_loop_degraded_gives_score(self, policy): score, evidence, _ = _detect_alert_loop_degraded(loop_slo_violations=1, policy=policy) assert score == 10 def test_no_violations_zero(self, policy): score, evidence, _ = _detect_alert_loop_degraded(loop_slo_violations=0, policy=policy) assert score == 0 # ─── Confidence bands ───────────────────────────────────────────────────────── class TestConfidence: def test_score_60_is_high(self, policy): assert _score_to_confidence(60, policy) == "high" def test_score_35_is_medium(self, policy): assert _score_to_confidence(35, policy) == "medium" def test_score_30_is_low(self, policy): assert _score_to_confidence(30, policy) == "low" def test_score_0_is_low(self, policy): assert _score_to_confidence(0, policy) == "low" # ─── Full compute_attribution ───────────────────────────────────────────────── class TestComputeAttribution: def test_no_signals_empty_causes(self, policy): result = compute_attribution("gateway", "prod", policy=policy) assert result["causes"] == [] assert result["service"] == "gateway" assert result["summary"] == "No significant attribution signals detected." assert result["llm_enrichment"]["enabled"] is False def test_deploy_signal_produces_cause(self, policy): alerts = [_alert("deploy", hours_ago=1)] result = compute_attribution( "gateway", "prod", alerts_24h=alerts, policy=policy, ) types = [c["type"] for c in result["causes"]] assert "deploy" in types def test_multiple_causes_sorted_desc(self, policy): # deploy=30, slo=15, followups=10 alerts = [_alert("deploy")] result = compute_attribution( "gateway", "prod", alerts_24h=alerts, slo_violations=1, overdue_followup_count=2, policy=policy, ) scores = [c["score"] for c in result["causes"]] assert scores == sorted(scores, reverse=True) def test_max_causes_respected(self, policy): # Inject all 7 signal types to exceed max_causes=5 alerts = [_alert("deploy")] result = compute_attribution( "gateway", "prod", alerts_24h=alerts, occurrences_60m=15, escalations_24h=3, release_gate_results=[ {"gate": "dependency_scan", "status": "fail"}, {"gate": "drift", "status": "warn"}, ], slo_violations=1, overdue_followup_count=2, loop_slo_violations=1, policy=policy, ) assert len(result["causes"]) <= 5 def test_causes_have_confidence(self, policy): alerts = [_alert("deploy")] result = compute_attribution("gateway", "prod", alerts_24h=alerts, policy=policy) for cause in result["causes"]: assert "confidence" in cause assert cause["confidence"] in ("high", "medium", "low") def test_causes_have_evidence(self, policy): alerts = [_alert("rollout")] result = compute_attribution("gateway", "prod", alerts_24h=alerts, policy=policy) for cause in result["causes"]: assert isinstance(cause.get("evidence"), list) def test_slo_from_risk_report_components(self, policy): """If slo_violations=0 but risk_report has SLO data, it extracts from components.""" risk_report = { "service": "gateway", "env": "prod", "components": {"slo": {"violations": 2, "points": 20}}, } result = compute_attribution( "gateway", "prod", risk_report=risk_report, policy=policy, ) types = [c["type"] for c in result["causes"]] assert "slo_violation" in types def test_followups_from_risk_report_components(self, policy): risk_report = { "components": { "followups": {"P0": 1, "P1": 0, "other": 0, "points": 20} } } result = compute_attribution( "gateway", "prod", risk_report=risk_report, policy=policy, ) types = [c["type"] for c in result["causes"]] assert "followups_overdue" in types def test_summary_template_filled(self, policy): alerts = [_alert("deploy")] result = compute_attribution("gateway", "prod", alerts_24h=alerts, policy=policy) assert result["summary"].startswith("Likely causes:") assert "deploy" in result["summary"].lower() def test_incident_storm_cause(self, policy): result = compute_attribution( "router", "prod", occurrences_60m=12, escalations_24h=3, policy=policy, ) types = [c["type"] for c in result["causes"]] assert "incident_storm" in types