""" risk_attribution.py — Change Impact Attribution Engine (deterministic, no LLM by default). Given a service + env, explains WHY risk spiked by correlating signals: deploy activity, dependency scan findings, drift errors, incident storms, SLO violations, overdue follow-ups, alert-loop degradation. New in this revision: - Change Timeline: ordered event stream (deploy, incident, slo, followup, …) - Evidence refs: alert_ref[], incident_id[], release_check_run_id, artifact paths - Per-cause refs (clickthrough IDs for UI) Provides: load_attribution_policy() -> Dict compute_attribution(service, env, ...) -> AttributionReport (includes timeline + evidence_refs) build_timeline(events, policy) -> List[TimelineItem] fetch_signals_from_stores(service, env, ...) -> SignalsData LLM enrichment is separate (llm_enrichment.py) and off by default. """ from __future__ import annotations import datetime import logging import yaml from pathlib import Path from typing import Any, Dict, List, Optional, Tuple logger = logging.getLogger(__name__) # ─── Policy ─────────────────────────────────────────────────────────────────── _ATTR_POLICY_CACHE: Optional[Dict] = None _ATTR_POLICY_SEARCH_PATHS = [ Path("config/risk_attribution_policy.yml"), Path(__file__).resolve().parent.parent.parent / "config" / "risk_attribution_policy.yml", ] def load_attribution_policy() -> Dict: global _ATTR_POLICY_CACHE if _ATTR_POLICY_CACHE is not None: return _ATTR_POLICY_CACHE for p in _ATTR_POLICY_SEARCH_PATHS: if p.exists(): try: with open(p) as f: data = yaml.safe_load(f) or {} _ATTR_POLICY_CACHE = data return data except Exception as e: logger.warning("Failed to load risk_attribution_policy from %s: %s", p, e) _ATTR_POLICY_CACHE = _builtin_attr_defaults() return _ATTR_POLICY_CACHE def _reload_attribution_policy() -> None: global _ATTR_POLICY_CACHE _ATTR_POLICY_CACHE = None def _builtin_attr_defaults() -> Dict: return { "defaults": {"lookback_hours": 24, "max_causes": 5, "llm_mode": "off", "llm_max_chars_in": 3500, "llm_max_chars_out": 800}, "llm_triggers": {"risk_delta_warn": 10, "risk_delta_fail": 20, "band_in": ["high", "critical"]}, "weights": {"deploy": 30, "dependency": 25, "drift": 25, "incident_storm": 20, "slo_violation": 15, "followups_overdue": 10, "alert_loop_degraded": 10}, "signals": { "deploy": {"kinds": ["deploy", "deployment", "rollout", "canary"]}, "dependency": {"release_gate_names": ["dependency_scan", "deps"]}, "drift": {"release_gate_names": ["drift", "config_drift"]}, "incident_storm": {"thresholds": {"occurrences_60m_warn": 10, "escalations_24h_warn": 2}}, "slo": {"require_active_violation": True}, }, "output": {"confidence_bands": {"high": 60, "medium": 35}}, "timeline": { "enabled": True, "lookback_hours": 24, "max_items": 30, "include_types": ["deploy", "dependency", "drift", "incident", "slo", "followup", "alert_loop", "release_gate"], "time_bucket_minutes": 5, }, "evidence_linking": {"enabled": True, "max_refs_per_cause": 10}, "llm_local": { "endpoint": "http://localhost:11434/api/generate", "model": "llama3", "timeout_seconds": 15, "model_allowlist": ["qwen2.5-coder:3b", "llama3.1:8b-instruct", "phi3:mini", "llama3"], "max_calls_per_digest": 3, "per_day_dedupe": True, }, } # ─── Confidence ─────────────────────────────────────────────────────────────── def _score_to_confidence(score: int, policy: Dict) -> str: bands = policy.get("output", {}).get("confidence_bands", {}) high_t = int(bands.get("high", 60)) med_t = int(bands.get("medium", 35)) if score >= high_t: return "high" if score >= med_t: return "medium" return "low" # ─── Signal detection helpers (now also return refs) ────────────────────────── def _cap_refs(refs: List[Any], max_refs: int) -> List[Any]: return refs[:max_refs] def _detect_deploy( alerts: List[Dict], cutoff_iso: str, policy: Dict, max_refs: int = 10, ) -> Tuple[int, List[str], List[Dict]]: """Returns (score, evidence_list, refs).""" kinds = set(policy.get("signals", {}).get("deploy", {}).get( "kinds", ["deploy", "deployment", "rollout", "canary"] )) deploy_alerts = [ a for a in alerts if a.get("kind", "").lower() in kinds and a.get("created_at", "") >= cutoff_iso ] if not deploy_alerts: return 0, [], [] weight = int(policy.get("weights", {}).get("deploy", 30)) last_seen = max(a.get("created_at", "") for a in deploy_alerts) evidence = [ f"deploy alerts: {len(deploy_alerts)} in last 24h", f"last seen: {last_seen[:16] if last_seen else 'unknown'}", ] refs = _cap_refs( [{"alert_ref": a["alert_ref"], "kind": a.get("kind", "deploy"), "ts": a.get("created_at", "")} for a in deploy_alerts if a.get("alert_ref")], max_refs, ) return weight, evidence, refs def _detect_dependency( release_gate_results: List[Dict], policy: Dict, max_refs: int = 10, ) -> Tuple[int, List[str], List[Dict]]: gate_names = set(policy.get("signals", {}).get("dependency", {}).get( "release_gate_names", ["dependency_scan", "deps"] )) failing = [ g for g in release_gate_results if g.get("gate") in gate_names and g.get("status") in ("fail", "warn") ] if not failing: return 0, [], [] weight = int(policy.get("weights", {}).get("dependency", 25)) evidence = [f"dependency_scan gate: {g['gate']} = {g['status']}" for g in failing[:3]] refs = _cap_refs( [{"release_check_run_id": g.get("run_id"), "gate": g["gate"], "artifact": g.get("artifact")} for g in failing if g.get("run_id") or g.get("artifact")], max_refs, ) return weight, evidence, refs def _detect_drift( release_gate_results: List[Dict], policy: Dict, max_refs: int = 10, ) -> Tuple[int, List[str], List[Dict]]: gate_names = set(policy.get("signals", {}).get("drift", {}).get( "release_gate_names", ["drift", "config_drift"] )) failing = [ g for g in release_gate_results if g.get("gate") in gate_names and g.get("status") in ("fail", "warn") ] if not failing: return 0, [], [] weight = int(policy.get("weights", {}).get("drift", 25)) evidence = [f"drift gate: {g['gate']} = {g['status']}" for g in failing[:3]] refs = _cap_refs( [{"release_check_run_id": g.get("run_id"), "gate": g["gate"], "artifact": g.get("artifact")} for g in failing if g.get("run_id") or g.get("artifact")], max_refs, ) return weight, evidence, refs def _detect_incident_storm( occurrences_60m: int, escalations_24h: int, policy: Dict, incident_ids: Optional[List[str]] = None, max_refs: int = 10, ) -> Tuple[int, List[str], List[Dict]]: storm_cfg = policy.get("signals", {}).get("incident_storm", {}).get("thresholds", {}) occ_warn = int(storm_cfg.get("occurrences_60m_warn", 10)) esc_warn = int(storm_cfg.get("escalations_24h_warn", 2)) triggered = (occurrences_60m >= occ_warn) or (escalations_24h >= esc_warn) if not triggered: return 0, [], [] weight = int(policy.get("weights", {}).get("incident_storm", 20)) evidence = [] if occurrences_60m >= occ_warn: evidence.append(f"occurrences_60m={occurrences_60m} (≥{occ_warn})") if escalations_24h >= esc_warn: evidence.append(f"escalations_24h={escalations_24h} (≥{esc_warn})") refs = _cap_refs( [{"incident_id": iid} for iid in (incident_ids or [])], max_refs, ) return weight, evidence, refs def _detect_slo( slo_violations: int, policy: Dict, slo_metrics: Optional[List[str]] = None, max_refs: int = 10, ) -> Tuple[int, List[str], List[Dict]]: require_active = policy.get("signals", {}).get("slo", {}).get("require_active_violation", True) if require_active and slo_violations == 0: return 0, [], [] if slo_violations == 0: return 0, [], [] weight = int(policy.get("weights", {}).get("slo_violation", 15)) evidence = [f"active SLO violations: {slo_violations}"] refs = _cap_refs( [{"metric": m} for m in (slo_metrics or [])], max_refs, ) return weight, evidence, refs def _detect_followups_overdue( overdue_count: int, policy: Dict, followup_refs: Optional[List[Dict]] = None, max_refs: int = 10, ) -> Tuple[int, List[str], List[Dict]]: if overdue_count == 0: return 0, [], [] weight = int(policy.get("weights", {}).get("followups_overdue", 10)) evidence = [f"overdue follow-ups: {overdue_count}"] refs = _cap_refs(followup_refs or [], max_refs) return weight, evidence, refs def _detect_alert_loop_degraded( loop_slo_violations: int, policy: Dict, max_refs: int = 10, ) -> Tuple[int, List[str], List[Dict]]: if loop_slo_violations == 0: return 0, [], [] weight = int(policy.get("weights", {}).get("alert_loop_degraded", 10)) evidence = [f"alert-loop SLO violations: {loop_slo_violations}"] refs: List[Dict] = [] return weight, evidence, refs # ─── Timeline builder ──────────────────────────────────────────────────────── def _bucket_key(ts_iso: str, bucket_minutes: int) -> str: """Round timestamp down to the nearest bucket boundary.""" try: dt = datetime.datetime.fromisoformat(ts_iso.rstrip("Z")) total_mins = dt.hour * 60 + dt.minute bucket_start = (total_mins // bucket_minutes) * bucket_minutes return f"{dt.strftime('%Y-%m-%d')}T{bucket_start // 60:02d}:{bucket_start % 60:02d}" except Exception: return ts_iso[:13] # fallback: truncate to hour def build_timeline( raw_events: List[Dict], policy: Optional[Dict] = None, ) -> List[Dict]: """ Build an ordered Change Timeline from raw event dicts. raw_events is a list of: {ts, type, label, refs, ...} Returns newest-first list, bucketed and capped at max_items. Multiple same-type events in the same time bucket are coalesced into one "xN" item. """ if policy is None: policy = load_attribution_policy() tl_cfg = policy.get("timeline", {}) if not tl_cfg.get("enabled", True): return [] max_items = int(tl_cfg.get("max_items", 30)) bucket_minutes = int(tl_cfg.get("time_bucket_minutes", 5)) include_types = set(tl_cfg.get("include_types", [])) # Filter by allowed types filtered = [ e for e in raw_events if not include_types or e.get("type") in include_types ] # Sort newest-first filtered.sort(key=lambda e: e.get("ts", ""), reverse=True) # Bucket coalescing: same type + same bucket → single item with count seen: Dict[str, Dict] = {} # key → accumulated item order: List[str] = [] # preserve insertion order for ev in filtered: bk = _bucket_key(ev.get("ts", ""), bucket_minutes) key = f"{ev.get('type', 'unknown')}:{bk}" if key not in seen: seen[key] = { "ts": ev.get("ts", ""), "type": ev.get("type", "unknown"), "label": ev.get("label", ""), "refs": list(ev.get("refs", {}).items() if isinstance(ev.get("refs"), dict) else ev.get("refs", [])), "_count": 1, "_latest_ts": ev.get("ts", ""), } order.append(key) else: seen[key]["_count"] += 1 # Keep latest ts if ev.get("ts", "") > seen[key]["_latest_ts"]: seen[key]["_latest_ts"] = ev.get("ts", "") seen[key]["ts"] = ev.get("ts", "") # Merge refs (up to 5 per bucket) new_refs = (list(ev.get("refs", {}).items()) if isinstance(ev.get("refs"), dict) else ev.get("refs", [])) if len(seen[key]["refs"]) < 5: seen[key]["refs"].extend(new_refs[:5 - len(seen[key]["refs"])]) # Build final items items = [] for key in order: item = seen[key] count = item.pop("_count", 1) item.pop("_latest_ts", None) if count > 1: item["label"] = f"{item['label']} (×{count})" # Convert refs back to dict if needed if isinstance(item["refs"], list) and item["refs"] and isinstance(item["refs"][0], tuple): item["refs"] = dict(item["refs"]) items.append(item) return items[:max_items] def _make_timeline_events_from_alerts( alerts: List[Dict], deploy_kinds: set, cutoff_iso: str, ) -> List[Dict]: """Convert alert records to raw timeline events.""" events = [] for a in alerts: if a.get("created_at", "") < cutoff_iso: continue kind = a.get("kind", "").lower() ev_type = "deploy" if kind in deploy_kinds else "alert" refs = {} if a.get("alert_ref"): refs["alert_ref"] = a["alert_ref"] if a.get("service"): refs["service"] = a["service"] events.append({ "ts": a.get("created_at", ""), "type": ev_type, "label": f"Alert: {kind}" + (f" ({a.get('title', '')})" if a.get("title") else ""), "refs": refs, }) return events def _make_timeline_events_from_incidents( incidents: List[Dict], events_by_id: Dict[str, List[Dict]], cutoff_iso: str, ) -> List[Dict]: """Convert incident + escalation events to raw timeline events.""" timeline_events = [] for inc in incidents: inc_id = inc.get("id", "") started = inc.get("started_at") or inc.get("created_at", "") if started >= cutoff_iso: timeline_events.append({ "ts": started, "type": "incident", "label": f"Incident started: {inc.get('title', inc_id)[:80]}", "refs": {"incident_id": inc_id}, }) for ev in events_by_id.get(inc_id, []): if (ev.get("type") == "decision" and "Escalat" in (ev.get("message") or "") and ev.get("ts", "") >= cutoff_iso): timeline_events.append({ "ts": ev["ts"], "type": "incident", "label": f"Incident escalated: {inc_id}", "refs": {"incident_id": inc_id, "event_type": ev.get("type", "")}, }) return timeline_events def _make_timeline_events_from_gates( release_gate_results: List[Dict], ) -> List[Dict]: """Convert release gate results to raw timeline events.""" events = [] for g in release_gate_results: if g.get("status") not in ("fail", "warn"): continue gate_type = "dependency" if "dep" in g.get("gate", "").lower() else "release_gate" if "drift" in g.get("gate", "").lower(): gate_type = "drift" refs: Dict = {} if g.get("run_id"): refs["release_check_run_id"] = g["run_id"] if g.get("artifact"): refs["artifact"] = g["artifact"] events.append({ "ts": g.get("ts", datetime.datetime.utcnow().isoformat()), "type": gate_type, "label": f"Gate {g['gate']} = {g['status']}", "refs": refs, }) return events # ─── Evidence refs builder ──────────────────────────────────────────────────── def build_evidence_refs( alerts_24h: List[Dict], incidents_24h: List[Dict], release_gate_results: List[Dict], followup_refs: Optional[List[Dict]] = None, policy: Optional[Dict] = None, ) -> Dict: """ Collect top-level evidence_refs: alert_refs, incident_ids, release_check_run_ids, artifacts. """ if policy is None: policy = load_attribution_policy() max_refs = int(policy.get("evidence_linking", {}).get("max_refs_per_cause", 10)) alert_refs = _cap_refs( [a["alert_ref"] for a in alerts_24h if a.get("alert_ref")], max_refs ) incident_ids = _cap_refs( list({inc.get("id", "") for inc in incidents_24h if inc.get("id")}), max_refs ) rc_ids = _cap_refs( list({g.get("run_id") for g in release_gate_results if g.get("run_id")}), max_refs ) artifacts = _cap_refs( list({g.get("artifact") for g in release_gate_results if g.get("artifact")}), max_refs ) fu_refs = _cap_refs( [r for r in (followup_refs or []) if r], max_refs ) return { "alerts": alert_refs, "incidents": incident_ids, "release_checks": list(filter(None, rc_ids)), "artifacts": list(filter(None, artifacts)), "followups": fu_refs, } # ─── Summary builder ────────────────────────────────────────────────────────── _TYPE_LABELS = { "deploy": "deploy activity", "dependency": "dependency change", "drift": "config/infrastructure drift", "incident_storm": "incident storm", "slo_violation": "SLO violation", "followups_overdue": "overdue follow-ups", "alert_loop_degraded": "alert-loop degradation", } def _build_summary(causes: List[Dict]) -> str: if not causes: return "No significant attribution signals detected." labels = [_TYPE_LABELS.get(c["type"], c["type"]) for c in causes[:3]] return "Likely causes: " + " + ".join(labels) + "." # ─── Main attribution function ──────────────────────────────────────────────── def compute_attribution( service: str, env: str, *, risk_report: Optional[Dict] = None, # Signals (pre-fetched) alerts_24h: Optional[List[Dict]] = None, occurrences_60m: int = 0, escalations_24h: int = 0, release_gate_results: Optional[List[Dict]] = None, slo_violations: int = 0, slo_metrics: Optional[List[str]] = None, overdue_followup_count: int = 0, followup_refs: Optional[List[Dict]] = None, loop_slo_violations: int = 0, # For evidence + timeline incidents_24h: Optional[List[Dict]] = None, incident_events: Optional[Dict[str, List[Dict]]] = None, window_hours: int = 24, policy: Optional[Dict] = None, ) -> Dict: """ Deterministic attribution: causes with evidence, refs, timeline, evidence_refs. All signal arguments default to safe empty values. Never raises (returns minimal report on any error). """ if policy is None: policy = load_attribution_policy() cutoff = ( datetime.datetime.utcnow() - datetime.timedelta(hours=window_hours) ).isoformat() max_causes = int(policy.get("defaults", {}).get("max_causes", 5)) max_refs = int(policy.get("evidence_linking", {}).get("max_refs_per_cause", 10)) risk_report = risk_report or {} alerts_24h = alerts_24h or [] release_gate_results = release_gate_results or [] incidents_24h = incidents_24h or [] incident_events = incident_events or {} # Extract from risk_report.components when not explicitly provided if slo_violations == 0 and risk_report: slo_violations = (risk_report.get("components", {}).get("slo") or {}).get("violations", 0) if overdue_followup_count == 0 and risk_report: fu = risk_report.get("components", {}).get("followups") or {} overdue_followup_count = fu.get("P0", 0) + fu.get("P1", 0) + fu.get("other", 0) if loop_slo_violations == 0 and risk_report: loop_slo_violations = ( risk_report.get("components", {}).get("alerts_loop") or {} ).get("violations", 0) incident_ids = [inc.get("id", "") for inc in incidents_24h if inc.get("id")] # ── Score each signal (now with refs) ──────────────────────────────────── candidates: List[Dict] = [] score, evid, refs = _detect_deploy(alerts_24h, cutoff, policy, max_refs) if score: candidates.append({"type": "deploy", "score": score, "evidence": evid, "refs": refs}) score, evid, refs = _detect_dependency(release_gate_results, policy, max_refs) if score: candidates.append({"type": "dependency", "score": score, "evidence": evid, "refs": refs}) score, evid, refs = _detect_drift(release_gate_results, policy, max_refs) if score: candidates.append({"type": "drift", "score": score, "evidence": evid, "refs": refs}) score, evid, refs = _detect_incident_storm( occurrences_60m, escalations_24h, policy, incident_ids, max_refs ) if score: candidates.append({"type": "incident_storm", "score": score, "evidence": evid, "refs": refs}) score, evid, refs = _detect_slo(slo_violations, policy, slo_metrics, max_refs) if score: candidates.append({"type": "slo_violation", "score": score, "evidence": evid, "refs": refs}) score, evid, refs = _detect_followups_overdue( overdue_followup_count, policy, followup_refs, max_refs ) if score: candidates.append({"type": "followups_overdue", "score": score, "evidence": evid, "refs": refs}) score, evid, refs = _detect_alert_loop_degraded(loop_slo_violations, policy, max_refs) if score: candidates.append({"type": "alert_loop_degraded", "score": score, "evidence": evid, "refs": refs}) # Sort desc, cap, add confidence candidates.sort(key=lambda c: -c["score"]) causes = candidates[:max_causes] for c in causes: c["confidence"] = _score_to_confidence(c["score"], policy) delta_24h = (risk_report.get("trend") or {}).get("delta_24h") summary = _build_summary(causes) # ── Timeline ────────────────────────────────────────────────────────────── tl_cfg = policy.get("timeline", {}) deploy_kinds = set(policy.get("signals", {}).get("deploy", {}).get( "kinds", ["deploy", "deployment", "rollout", "canary"] )) raw_events: List[Dict] = [] raw_events.extend(_make_timeline_events_from_alerts(alerts_24h, deploy_kinds, cutoff)) raw_events.extend(_make_timeline_events_from_incidents(incidents_24h, incident_events, cutoff)) raw_events.extend(_make_timeline_events_from_gates(release_gate_results)) timeline = build_timeline(raw_events, policy) if tl_cfg.get("enabled", True) else [] # ── Evidence refs ───────────────────────────────────────────────────────── evidence_refs: Dict = {} if policy.get("evidence_linking", {}).get("enabled", True): evidence_refs = build_evidence_refs( alerts_24h, incidents_24h, release_gate_results, followup_refs=followup_refs, policy=policy, ) return { "service": service, "env": env, "window_hours": window_hours, "delta_24h": delta_24h, "causes": causes, "summary": summary, "timeline": timeline, "evidence_refs": evidence_refs, "llm_enrichment": {"enabled": False, "text": None}, } # ─── Signal fetcher (for wiring in tool_manager/risk_engine) ───────────────── def fetch_signals_from_stores( service: str, env: str, window_hours: int = 24, *, alert_store=None, incident_store=None, policy: Optional[Dict] = None, ) -> Dict: """ Fetches raw signals from existing stores. Returns a dict ready to unpack into compute_attribution(). Always non-fatal per store. """ if policy is None: policy = load_attribution_policy() cutoff = ( datetime.datetime.utcnow() - datetime.timedelta(hours=window_hours) ).isoformat() # ── Deploy + other alerts ───────────────────────────────────────────────── alerts_24h: List[Dict] = [] try: if alert_store is not None: all_alerts = alert_store.list_alerts(limit=200) alerts_24h = [ a for a in all_alerts if a.get("created_at", "") >= cutoff and (not a.get("service") or a.get("service") == service) ] except Exception as e: logger.warning("attribution fetch alerts failed: %s", e) # ── Incidents in window + event maps ────────────────────────────────────── incidents_24h: List[Dict] = [] incident_events: Dict[str, List[Dict]] = {} occurrences_60m = 0 escalations_24h = 0 try: if incident_store is not None: cutoff_60m = ( datetime.datetime.utcnow() - datetime.timedelta(minutes=60) ).isoformat() # Count alert occurrences from alert_store top_signatures if alert_store is not None: try: sigs = alert_store.top_signatures(window_minutes=60, limit=20) occurrences_60m = sum(s.get("occurrences", 0) for s in sigs) except Exception: pass incs = incident_store.list_incidents({"service": service}, limit=30) for inc in incs: inc_id = inc.get("id", "") inc_started = inc.get("started_at") or inc.get("created_at", "") try: events = incident_store.get_events(inc_id, limit=50) incident_events[inc_id] = events for ev in events: if (ev.get("type") == "decision" and "Escalat" in (ev.get("message") or "") and ev.get("ts", "") >= cutoff): escalations_24h += 1 except Exception: pass # Include incident if started within window if inc_started >= cutoff: incidents_24h.append(inc) except Exception as e: logger.warning("attribution fetch incident signals failed: %s", e) return { "alerts_24h": alerts_24h, "occurrences_60m": occurrences_60m, "escalations_24h": escalations_24h, "incidents_24h": incidents_24h, "incident_events": incident_events, "release_gate_results": [], # caller can inject if persisted }