New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
732 lines
27 KiB
Python
732 lines
27 KiB
Python
"""
|
||
risk_attribution.py — Change Impact Attribution Engine (deterministic, no LLM by default).
|
||
|
||
Given a service + env, explains WHY risk spiked by correlating signals:
|
||
deploy activity, dependency scan findings, drift errors, incident storms,
|
||
SLO violations, overdue follow-ups, alert-loop degradation.
|
||
|
||
New in this revision:
|
||
- Change Timeline: ordered event stream (deploy, incident, slo, followup, …)
|
||
- Evidence refs: alert_ref[], incident_id[], release_check_run_id, artifact paths
|
||
- Per-cause refs (clickthrough IDs for UI)
|
||
|
||
Provides:
|
||
load_attribution_policy() -> Dict
|
||
compute_attribution(service, env, ...) -> AttributionReport (includes timeline + evidence_refs)
|
||
build_timeline(events, policy) -> List[TimelineItem]
|
||
fetch_signals_from_stores(service, env, ...) -> SignalsData
|
||
|
||
LLM enrichment is separate (llm_enrichment.py) and off by default.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import datetime
|
||
import logging
|
||
import yaml
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ─── Policy ───────────────────────────────────────────────────────────────────
|
||
|
||
# Module-level cache for the parsed attribution policy. Populated lazily by
# load_attribution_policy() and cleared via _reload_attribution_policy().
_ATTR_POLICY_CACHE: Optional[Dict] = None

# Candidate policy file locations, tried in order: first a path relative to
# the current working directory, then one anchored three directory levels
# above this module (repo-root "config/" layout).
_ATTR_POLICY_SEARCH_PATHS = [
    Path("config/risk_attribution_policy.yml"),
    Path(__file__).resolve().parent.parent.parent / "config" / "risk_attribution_policy.yml",
]
|
||
|
||
|
||
def load_attribution_policy() -> Dict:
    """Load and cache the risk-attribution policy.

    Each path in ``_ATTR_POLICY_SEARCH_PATHS`` is tried in order; the first
    YAML file that exists and parses wins (an empty file yields ``{}``).
    A read/parse failure is logged and the next path is tried. When no file
    loads, the built-in defaults are used. The result is cached module-wide
    until ``_reload_attribution_policy()`` clears it.
    """
    global _ATTR_POLICY_CACHE
    if _ATTR_POLICY_CACHE is not None:
        return _ATTR_POLICY_CACHE

    for candidate in _ATTR_POLICY_SEARCH_PATHS:
        if not candidate.exists():
            continue
        try:
            with open(candidate) as fh:
                loaded = yaml.safe_load(fh) or {}
        except Exception as exc:
            logger.warning("Failed to load risk_attribution_policy from %s: %s", candidate, exc)
            continue
        _ATTR_POLICY_CACHE = loaded
        return loaded

    # Nothing on disk — fall back to the hard-coded defaults.
    _ATTR_POLICY_CACHE = _builtin_attr_defaults()
    return _ATTR_POLICY_CACHE
|
||
|
||
|
||
def _reload_attribution_policy() -> None:
    """Drop the cached policy so the next load_attribution_policy() re-reads disk."""
    global _ATTR_POLICY_CACHE
    _ATTR_POLICY_CACHE = None
|
||
|
||
|
||
def _builtin_attr_defaults() -> Dict:
|
||
return {
|
||
"defaults": {"lookback_hours": 24, "max_causes": 5, "llm_mode": "off",
|
||
"llm_max_chars_in": 3500, "llm_max_chars_out": 800},
|
||
"llm_triggers": {"risk_delta_warn": 10, "risk_delta_fail": 20,
|
||
"band_in": ["high", "critical"]},
|
||
"weights": {"deploy": 30, "dependency": 25, "drift": 25, "incident_storm": 20,
|
||
"slo_violation": 15, "followups_overdue": 10, "alert_loop_degraded": 10},
|
||
"signals": {
|
||
"deploy": {"kinds": ["deploy", "deployment", "rollout", "canary"]},
|
||
"dependency": {"release_gate_names": ["dependency_scan", "deps"]},
|
||
"drift": {"release_gate_names": ["drift", "config_drift"]},
|
||
"incident_storm": {"thresholds": {"occurrences_60m_warn": 10,
|
||
"escalations_24h_warn": 2}},
|
||
"slo": {"require_active_violation": True},
|
||
},
|
||
"output": {"confidence_bands": {"high": 60, "medium": 35}},
|
||
"timeline": {
|
||
"enabled": True,
|
||
"lookback_hours": 24,
|
||
"max_items": 30,
|
||
"include_types": ["deploy", "dependency", "drift", "incident", "slo",
|
||
"followup", "alert_loop", "release_gate"],
|
||
"time_bucket_minutes": 5,
|
||
},
|
||
"evidence_linking": {"enabled": True, "max_refs_per_cause": 10},
|
||
"llm_local": {
|
||
"endpoint": "http://localhost:11434/api/generate",
|
||
"model": "llama3",
|
||
"timeout_seconds": 15,
|
||
"model_allowlist": ["qwen2.5-coder:3b", "llama3.1:8b-instruct", "phi3:mini", "llama3"],
|
||
"max_calls_per_digest": 3,
|
||
"per_day_dedupe": True,
|
||
},
|
||
}
|
||
|
||
|
||
# ─── Confidence ───────────────────────────────────────────────────────────────
|
||
|
||
def _score_to_confidence(score: int, policy: Dict) -> str:
|
||
bands = policy.get("output", {}).get("confidence_bands", {})
|
||
high_t = int(bands.get("high", 60))
|
||
med_t = int(bands.get("medium", 35))
|
||
if score >= high_t:
|
||
return "high"
|
||
if score >= med_t:
|
||
return "medium"
|
||
return "low"
|
||
|
||
|
||
# ─── Signal detection helpers (now also return refs) ──────────────────────────
|
||
|
||
def _cap_refs(refs: List[Any], max_refs: int) -> List[Any]:
|
||
return refs[:max_refs]
|
||
|
||
|
||
def _detect_deploy(
|
||
alerts: List[Dict],
|
||
cutoff_iso: str,
|
||
policy: Dict,
|
||
max_refs: int = 10,
|
||
) -> Tuple[int, List[str], List[Dict]]:
|
||
"""Returns (score, evidence_list, refs)."""
|
||
kinds = set(policy.get("signals", {}).get("deploy", {}).get(
|
||
"kinds", ["deploy", "deployment", "rollout", "canary"]
|
||
))
|
||
deploy_alerts = [
|
||
a for a in alerts
|
||
if a.get("kind", "").lower() in kinds and a.get("created_at", "") >= cutoff_iso
|
||
]
|
||
if not deploy_alerts:
|
||
return 0, [], []
|
||
weight = int(policy.get("weights", {}).get("deploy", 30))
|
||
last_seen = max(a.get("created_at", "") for a in deploy_alerts)
|
||
evidence = [
|
||
f"deploy alerts: {len(deploy_alerts)} in last 24h",
|
||
f"last seen: {last_seen[:16] if last_seen else 'unknown'}",
|
||
]
|
||
refs = _cap_refs(
|
||
[{"alert_ref": a["alert_ref"], "kind": a.get("kind", "deploy"),
|
||
"ts": a.get("created_at", "")}
|
||
for a in deploy_alerts if a.get("alert_ref")],
|
||
max_refs,
|
||
)
|
||
return weight, evidence, refs
|
||
|
||
|
||
def _detect_dependency(
|
||
release_gate_results: List[Dict],
|
||
policy: Dict,
|
||
max_refs: int = 10,
|
||
) -> Tuple[int, List[str], List[Dict]]:
|
||
gate_names = set(policy.get("signals", {}).get("dependency", {}).get(
|
||
"release_gate_names", ["dependency_scan", "deps"]
|
||
))
|
||
failing = [
|
||
g for g in release_gate_results
|
||
if g.get("gate") in gate_names and g.get("status") in ("fail", "warn")
|
||
]
|
||
if not failing:
|
||
return 0, [], []
|
||
weight = int(policy.get("weights", {}).get("dependency", 25))
|
||
evidence = [f"dependency_scan gate: {g['gate']} = {g['status']}" for g in failing[:3]]
|
||
refs = _cap_refs(
|
||
[{"release_check_run_id": g.get("run_id"), "gate": g["gate"],
|
||
"artifact": g.get("artifact")}
|
||
for g in failing if g.get("run_id") or g.get("artifact")],
|
||
max_refs,
|
||
)
|
||
return weight, evidence, refs
|
||
|
||
|
||
def _detect_drift(
|
||
release_gate_results: List[Dict],
|
||
policy: Dict,
|
||
max_refs: int = 10,
|
||
) -> Tuple[int, List[str], List[Dict]]:
|
||
gate_names = set(policy.get("signals", {}).get("drift", {}).get(
|
||
"release_gate_names", ["drift", "config_drift"]
|
||
))
|
||
failing = [
|
||
g for g in release_gate_results
|
||
if g.get("gate") in gate_names and g.get("status") in ("fail", "warn")
|
||
]
|
||
if not failing:
|
||
return 0, [], []
|
||
weight = int(policy.get("weights", {}).get("drift", 25))
|
||
evidence = [f"drift gate: {g['gate']} = {g['status']}" for g in failing[:3]]
|
||
refs = _cap_refs(
|
||
[{"release_check_run_id": g.get("run_id"), "gate": g["gate"],
|
||
"artifact": g.get("artifact")}
|
||
for g in failing if g.get("run_id") or g.get("artifact")],
|
||
max_refs,
|
||
)
|
||
return weight, evidence, refs
|
||
|
||
|
||
def _detect_incident_storm(
|
||
occurrences_60m: int,
|
||
escalations_24h: int,
|
||
policy: Dict,
|
||
incident_ids: Optional[List[str]] = None,
|
||
max_refs: int = 10,
|
||
) -> Tuple[int, List[str], List[Dict]]:
|
||
storm_cfg = policy.get("signals", {}).get("incident_storm", {}).get("thresholds", {})
|
||
occ_warn = int(storm_cfg.get("occurrences_60m_warn", 10))
|
||
esc_warn = int(storm_cfg.get("escalations_24h_warn", 2))
|
||
|
||
triggered = (occurrences_60m >= occ_warn) or (escalations_24h >= esc_warn)
|
||
if not triggered:
|
||
return 0, [], []
|
||
|
||
weight = int(policy.get("weights", {}).get("incident_storm", 20))
|
||
evidence = []
|
||
if occurrences_60m >= occ_warn:
|
||
evidence.append(f"occurrences_60m={occurrences_60m} (≥{occ_warn})")
|
||
if escalations_24h >= esc_warn:
|
||
evidence.append(f"escalations_24h={escalations_24h} (≥{esc_warn})")
|
||
refs = _cap_refs(
|
||
[{"incident_id": iid} for iid in (incident_ids or [])],
|
||
max_refs,
|
||
)
|
||
return weight, evidence, refs
|
||
|
||
|
||
def _detect_slo(
|
||
slo_violations: int,
|
||
policy: Dict,
|
||
slo_metrics: Optional[List[str]] = None,
|
||
max_refs: int = 10,
|
||
) -> Tuple[int, List[str], List[Dict]]:
|
||
require_active = policy.get("signals", {}).get("slo", {}).get("require_active_violation", True)
|
||
if require_active and slo_violations == 0:
|
||
return 0, [], []
|
||
if slo_violations == 0:
|
||
return 0, [], []
|
||
weight = int(policy.get("weights", {}).get("slo_violation", 15))
|
||
evidence = [f"active SLO violations: {slo_violations}"]
|
||
refs = _cap_refs(
|
||
[{"metric": m} for m in (slo_metrics or [])],
|
||
max_refs,
|
||
)
|
||
return weight, evidence, refs
|
||
|
||
|
||
def _detect_followups_overdue(
|
||
overdue_count: int,
|
||
policy: Dict,
|
||
followup_refs: Optional[List[Dict]] = None,
|
||
max_refs: int = 10,
|
||
) -> Tuple[int, List[str], List[Dict]]:
|
||
if overdue_count == 0:
|
||
return 0, [], []
|
||
weight = int(policy.get("weights", {}).get("followups_overdue", 10))
|
||
evidence = [f"overdue follow-ups: {overdue_count}"]
|
||
refs = _cap_refs(followup_refs or [], max_refs)
|
||
return weight, evidence, refs
|
||
|
||
|
||
def _detect_alert_loop_degraded(
|
||
loop_slo_violations: int,
|
||
policy: Dict,
|
||
max_refs: int = 10,
|
||
) -> Tuple[int, List[str], List[Dict]]:
|
||
if loop_slo_violations == 0:
|
||
return 0, [], []
|
||
weight = int(policy.get("weights", {}).get("alert_loop_degraded", 10))
|
||
evidence = [f"alert-loop SLO violations: {loop_slo_violations}"]
|
||
refs: List[Dict] = []
|
||
return weight, evidence, refs
|
||
|
||
|
||
# ─── Timeline builder ────────────────────────────────────────────────────────
|
||
|
||
def _bucket_key(ts_iso: str, bucket_minutes: int) -> str:
|
||
"""Round timestamp down to the nearest bucket boundary."""
|
||
try:
|
||
dt = datetime.datetime.fromisoformat(ts_iso.rstrip("Z"))
|
||
total_mins = dt.hour * 60 + dt.minute
|
||
bucket_start = (total_mins // bucket_minutes) * bucket_minutes
|
||
return f"{dt.strftime('%Y-%m-%d')}T{bucket_start // 60:02d}:{bucket_start % 60:02d}"
|
||
except Exception:
|
||
return ts_iso[:13] # fallback: truncate to hour
|
||
|
||
|
||
def build_timeline(
    raw_events: List[Dict],
    policy: Optional[Dict] = None,
) -> List[Dict]:
    """
    Build an ordered Change Timeline from raw event dicts.

    raw_events is a list of:
        {ts, type, label, refs, ...}

    Returns newest-first list, bucketed and capped at max_items.
    Multiple same-type events in the same time bucket are coalesced into
    one "xN" item.
    """
    if policy is None:
        policy = load_attribution_policy()

    tl_cfg = policy.get("timeline", {})
    if not tl_cfg.get("enabled", True):
        return []

    max_items = int(tl_cfg.get("max_items", 30))
    bucket_minutes = int(tl_cfg.get("time_bucket_minutes", 5))
    include_types = set(tl_cfg.get("include_types", []))

    # Filter by allowed types (an empty include list admits every type)
    filtered = [
        e for e in raw_events
        if not include_types or e.get("type") in include_types
    ]

    # Sort newest-first (ISO timestamps compare lexicographically)
    filtered.sort(key=lambda e: e.get("ts", ""), reverse=True)

    # Bucket coalescing: same type + same bucket → single item with count
    seen: Dict[str, Dict] = {}  # key → accumulated item
    order: List[str] = []  # preserve insertion order

    for ev in filtered:
        bk = _bucket_key(ev.get("ts", ""), bucket_minutes)
        key = f"{ev.get('type', 'unknown')}:{bk}"
        if key not in seen:
            # First event for this (type, bucket): seed the item. refs may
            # arrive as a dict or a list; both are normalized to a list here
            # so later merges can uniformly .extend() it.
            seen[key] = {
                "ts": ev.get("ts", ""),
                "type": ev.get("type", "unknown"),
                "label": ev.get("label", ""),
                "refs": list(ev.get("refs", {}).items() if isinstance(ev.get("refs"), dict)
                             else ev.get("refs", [])),
                "_count": 1,
                "_latest_ts": ev.get("ts", ""),
            }
            order.append(key)
        else:
            seen[key]["_count"] += 1
            # Keep latest ts
            if ev.get("ts", "") > seen[key]["_latest_ts"]:
                seen[key]["_latest_ts"] = ev.get("ts", "")
                seen[key]["ts"] = ev.get("ts", "")
            # Merge refs (up to 5 per bucket)
            new_refs = (list(ev.get("refs", {}).items()) if isinstance(ev.get("refs"), dict)
                        else ev.get("refs", []))
            if len(seen[key]["refs"]) < 5:
                seen[key]["refs"].extend(new_refs[:5 - len(seen[key]["refs"])])

    # Build final items: strip the private accumulator fields, render the
    # coalesced count into the label, and restore dict-shaped refs.
    items = []
    for key in order:
        item = seen[key]
        count = item.pop("_count", 1)
        item.pop("_latest_ts", None)
        if count > 1:
            item["label"] = f"{item['label']} (×{count})"
        # Convert refs back to dict if needed (they were dict items originally);
        # note duplicate keys collapse here, which bounds ref growth per bucket.
        if isinstance(item["refs"], list) and item["refs"] and isinstance(item["refs"][0], tuple):
            item["refs"] = dict(item["refs"])
        items.append(item)

    return items[:max_items]
|
||
|
||
|
||
def _make_timeline_events_from_alerts(
|
||
alerts: List[Dict],
|
||
deploy_kinds: set,
|
||
cutoff_iso: str,
|
||
) -> List[Dict]:
|
||
"""Convert alert records to raw timeline events."""
|
||
events = []
|
||
for a in alerts:
|
||
if a.get("created_at", "") < cutoff_iso:
|
||
continue
|
||
kind = a.get("kind", "").lower()
|
||
ev_type = "deploy" if kind in deploy_kinds else "alert"
|
||
refs = {}
|
||
if a.get("alert_ref"):
|
||
refs["alert_ref"] = a["alert_ref"]
|
||
if a.get("service"):
|
||
refs["service"] = a["service"]
|
||
events.append({
|
||
"ts": a.get("created_at", ""),
|
||
"type": ev_type,
|
||
"label": f"Alert: {kind}" + (f" ({a.get('title', '')})"
|
||
if a.get("title") else ""),
|
||
"refs": refs,
|
||
})
|
||
return events
|
||
|
||
|
||
def _make_timeline_events_from_incidents(
|
||
incidents: List[Dict],
|
||
events_by_id: Dict[str, List[Dict]],
|
||
cutoff_iso: str,
|
||
) -> List[Dict]:
|
||
"""Convert incident + escalation events to raw timeline events."""
|
||
timeline_events = []
|
||
for inc in incidents:
|
||
inc_id = inc.get("id", "")
|
||
started = inc.get("started_at") or inc.get("created_at", "")
|
||
if started >= cutoff_iso:
|
||
timeline_events.append({
|
||
"ts": started,
|
||
"type": "incident",
|
||
"label": f"Incident started: {inc.get('title', inc_id)[:80]}",
|
||
"refs": {"incident_id": inc_id},
|
||
})
|
||
for ev in events_by_id.get(inc_id, []):
|
||
if (ev.get("type") == "decision"
|
||
and "Escalat" in (ev.get("message") or "")
|
||
and ev.get("ts", "") >= cutoff_iso):
|
||
timeline_events.append({
|
||
"ts": ev["ts"],
|
||
"type": "incident",
|
||
"label": f"Incident escalated: {inc_id}",
|
||
"refs": {"incident_id": inc_id,
|
||
"event_type": ev.get("type", "")},
|
||
})
|
||
return timeline_events
|
||
|
||
|
||
def _make_timeline_events_from_gates(
|
||
release_gate_results: List[Dict],
|
||
) -> List[Dict]:
|
||
"""Convert release gate results to raw timeline events."""
|
||
events = []
|
||
for g in release_gate_results:
|
||
if g.get("status") not in ("fail", "warn"):
|
||
continue
|
||
gate_type = "dependency" if "dep" in g.get("gate", "").lower() else "release_gate"
|
||
if "drift" in g.get("gate", "").lower():
|
||
gate_type = "drift"
|
||
refs: Dict = {}
|
||
if g.get("run_id"):
|
||
refs["release_check_run_id"] = g["run_id"]
|
||
if g.get("artifact"):
|
||
refs["artifact"] = g["artifact"]
|
||
events.append({
|
||
"ts": g.get("ts", datetime.datetime.utcnow().isoformat()),
|
||
"type": gate_type,
|
||
"label": f"Gate {g['gate']} = {g['status']}",
|
||
"refs": refs,
|
||
})
|
||
return events
|
||
|
||
|
||
# ─── Evidence refs builder ────────────────────────────────────────────────────
|
||
|
||
def build_evidence_refs(
    alerts_24h: List[Dict],
    incidents_24h: List[Dict],
    release_gate_results: List[Dict],
    followup_refs: Optional[List[Dict]] = None,
    policy: Optional[Dict] = None,
) -> Dict:
    """
    Collect top-level evidence_refs: alert_refs, incident_ids,
    release_check_run_ids, artifacts.

    Each category is de-duplicated where the source allows and capped at
    the policy's max_refs_per_cause.
    """
    if policy is None:
        policy = load_attribution_policy()

    cap = int(policy.get("evidence_linking", {}).get("max_refs_per_cause", 10))

    alert_refs = [a["alert_ref"] for a in alerts_24h if a.get("alert_ref")][:cap]
    incident_ids = list({inc.get("id", "") for inc in incidents_24h if inc.get("id")})[:cap]
    run_ids = list({g.get("run_id") for g in release_gate_results if g.get("run_id")})[:cap]
    artifact_paths = list({g.get("artifact") for g in release_gate_results
                           if g.get("artifact")})[:cap]
    fu = [ref for ref in (followup_refs or []) if ref][:cap]

    return {
        "alerts": alert_refs,
        "incidents": incident_ids,
        "release_checks": [r for r in run_ids if r],
        "artifacts": [a for a in artifact_paths if a],
        "followups": fu,
    }
|
||
|
||
|
||
# ─── Summary builder ──────────────────────────────────────────────────────────
|
||
|
||
_TYPE_LABELS = {
|
||
"deploy": "deploy activity",
|
||
"dependency": "dependency change",
|
||
"drift": "config/infrastructure drift",
|
||
"incident_storm": "incident storm",
|
||
"slo_violation": "SLO violation",
|
||
"followups_overdue": "overdue follow-ups",
|
||
"alert_loop_degraded": "alert-loop degradation",
|
||
}
|
||
|
||
|
||
def _build_summary(causes: List[Dict]) -> str:
|
||
if not causes:
|
||
return "No significant attribution signals detected."
|
||
labels = [_TYPE_LABELS.get(c["type"], c["type"]) for c in causes[:3]]
|
||
return "Likely causes: " + " + ".join(labels) + "."
|
||
|
||
|
||
# ─── Main attribution function ────────────────────────────────────────────────
|
||
|
||
def compute_attribution(
    service: str,
    env: str,
    *,
    risk_report: Optional[Dict] = None,
    # Signals (pre-fetched)
    alerts_24h: Optional[List[Dict]] = None,
    occurrences_60m: int = 0,
    escalations_24h: int = 0,
    release_gate_results: Optional[List[Dict]] = None,
    slo_violations: int = 0,
    slo_metrics: Optional[List[str]] = None,
    overdue_followup_count: int = 0,
    followup_refs: Optional[List[Dict]] = None,
    loop_slo_violations: int = 0,
    # For evidence + timeline
    incidents_24h: Optional[List[Dict]] = None,
    incident_events: Optional[Dict[str, List[Dict]]] = None,
    window_hours: int = 24,
    policy: Optional[Dict] = None,
) -> Dict:
    """
    Deterministic attribution: causes with evidence, refs, timeline, evidence_refs.

    All signal arguments default to safe empty values.
    Never raises (returns minimal report on any error).

    NOTE(review): there is no explicit try/except in this body — the
    "never raises" claim rests on the detectors operating only on plain
    dict/list values; confirm callers pass well-formed signals.
    """
    if policy is None:
        policy = load_attribution_policy()

    # ISO cutoff for "recent" signals; naive UTC, compared lexicographically
    # against the stores' ISO timestamp strings.
    cutoff = (
        datetime.datetime.utcnow() - datetime.timedelta(hours=window_hours)
    ).isoformat()

    max_causes = int(policy.get("defaults", {}).get("max_causes", 5))
    max_refs = int(policy.get("evidence_linking", {}).get("max_refs_per_cause", 10))
    # Normalize optional inputs to empty containers.
    risk_report = risk_report or {}
    alerts_24h = alerts_24h or []
    release_gate_results = release_gate_results or []
    incidents_24h = incidents_24h or []
    incident_events = incident_events or {}

    # Extract from risk_report.components when not explicitly provided
    # (a zero argument is treated as "not provided").
    if slo_violations == 0 and risk_report:
        slo_violations = (risk_report.get("components", {}).get("slo") or {}).get("violations", 0)
    if overdue_followup_count == 0 and risk_report:
        fu = risk_report.get("components", {}).get("followups") or {}
        overdue_followup_count = fu.get("P0", 0) + fu.get("P1", 0) + fu.get("other", 0)
    if loop_slo_violations == 0 and risk_report:
        loop_slo_violations = (
            risk_report.get("components", {}).get("alerts_loop") or {}
        ).get("violations", 0)

    # Incident IDs feed the incident_storm refs below.
    incident_ids = [inc.get("id", "") for inc in incidents_24h if inc.get("id")]

    # ── Score each signal (now with refs) ────────────────────────────────────
    candidates: List[Dict] = []

    score, evid, refs = _detect_deploy(alerts_24h, cutoff, policy, max_refs)
    if score:
        candidates.append({"type": "deploy", "score": score, "evidence": evid, "refs": refs})

    score, evid, refs = _detect_dependency(release_gate_results, policy, max_refs)
    if score:
        candidates.append({"type": "dependency", "score": score, "evidence": evid, "refs": refs})

    score, evid, refs = _detect_drift(release_gate_results, policy, max_refs)
    if score:
        candidates.append({"type": "drift", "score": score, "evidence": evid, "refs": refs})

    score, evid, refs = _detect_incident_storm(
        occurrences_60m, escalations_24h, policy, incident_ids, max_refs
    )
    if score:
        candidates.append({"type": "incident_storm", "score": score, "evidence": evid, "refs": refs})

    score, evid, refs = _detect_slo(slo_violations, policy, slo_metrics, max_refs)
    if score:
        candidates.append({"type": "slo_violation", "score": score, "evidence": evid, "refs": refs})

    score, evid, refs = _detect_followups_overdue(
        overdue_followup_count, policy, followup_refs, max_refs
    )
    if score:
        candidates.append({"type": "followups_overdue", "score": score,
                           "evidence": evid, "refs": refs})

    score, evid, refs = _detect_alert_loop_degraded(loop_slo_violations, policy, max_refs)
    if score:
        candidates.append({"type": "alert_loop_degraded", "score": score,
                           "evidence": evid, "refs": refs})

    # Sort desc, cap, add confidence
    candidates.sort(key=lambda c: -c["score"])
    causes = candidates[:max_causes]
    for c in causes:
        c["confidence"] = _score_to_confidence(c["score"], policy)

    delta_24h = (risk_report.get("trend") or {}).get("delta_24h")
    summary = _build_summary(causes)

    # ── Timeline ──────────────────────────────────────────────────────────────
    tl_cfg = policy.get("timeline", {})
    deploy_kinds = set(policy.get("signals", {}).get("deploy", {}).get(
        "kinds", ["deploy", "deployment", "rollout", "canary"]
    ))
    raw_events: List[Dict] = []
    raw_events.extend(_make_timeline_events_from_alerts(alerts_24h, deploy_kinds, cutoff))
    raw_events.extend(_make_timeline_events_from_incidents(incidents_24h, incident_events, cutoff))
    raw_events.extend(_make_timeline_events_from_gates(release_gate_results))
    timeline = build_timeline(raw_events, policy) if tl_cfg.get("enabled", True) else []

    # ── Evidence refs ─────────────────────────────────────────────────────────
    evidence_refs: Dict = {}
    if policy.get("evidence_linking", {}).get("enabled", True):
        evidence_refs = build_evidence_refs(
            alerts_24h, incidents_24h, release_gate_results,
            followup_refs=followup_refs, policy=policy,
        )

    return {
        "service": service,
        "env": env,
        "window_hours": window_hours,
        "delta_24h": delta_24h,
        "causes": causes,
        "summary": summary,
        "timeline": timeline,
        "evidence_refs": evidence_refs,
        # LLM enrichment is handled separately (llm_enrichment.py), off by default.
        "llm_enrichment": {"enabled": False, "text": None},
    }
|
||
|
||
|
||
# ─── Signal fetcher (for wiring in tool_manager/risk_engine) ─────────────────
|
||
|
||
def fetch_signals_from_stores(
    service: str,
    env: str,
    window_hours: int = 24,
    *,
    alert_store=None,
    incident_store=None,
    policy: Optional[Dict] = None,
) -> Dict:
    """
    Fetches raw signals from existing stores.
    Returns a dict ready to unpack into compute_attribution().
    Always non-fatal per store.

    alert_store is expected to expose list_alerts(limit=...) and
    top_signatures(window_minutes=..., limit=...); incident_store is expected
    to expose list_incidents(filter, limit=...) and get_events(id, limit=...).
    NOTE(review): *env* is accepted but not used in any filter here —
    filtering is by service only; confirm intended.
    """
    if policy is None:
        policy = load_attribution_policy()

    # Naive-UTC ISO cutoff, compared lexicographically with store timestamps.
    cutoff = (
        datetime.datetime.utcnow() - datetime.timedelta(hours=window_hours)
    ).isoformat()

    # ── Deploy + other alerts ─────────────────────────────────────────────────
    alerts_24h: List[Dict] = []
    try:
        if alert_store is not None:
            all_alerts = alert_store.list_alerts(limit=200)
            # Keep in-window alerts that are unscoped or scoped to this service.
            alerts_24h = [
                a for a in all_alerts
                if a.get("created_at", "") >= cutoff
                and (not a.get("service") or a.get("service") == service)
            ]
    except Exception as e:
        logger.warning("attribution fetch alerts failed: %s", e)

    # ── Incidents in window + event maps ──────────────────────────────────────
    incidents_24h: List[Dict] = []
    incident_events: Dict[str, List[Dict]] = {}
    occurrences_60m = 0
    escalations_24h = 0

    try:
        if incident_store is not None:
            # NOTE(review): cutoff_60m is computed but never used below —
            # occurrences are taken from top_signatures(window_minutes=60)
            # instead; candidate for removal.
            cutoff_60m = (
                datetime.datetime.utcnow() - datetime.timedelta(minutes=60)
            ).isoformat()

            # Count alert occurrences from alert_store top_signatures
            if alert_store is not None:
                try:
                    sigs = alert_store.top_signatures(window_minutes=60, limit=20)
                    occurrences_60m = sum(s.get("occurrences", 0) for s in sigs)
                except Exception:
                    pass

            incs = incident_store.list_incidents({"service": service}, limit=30)
            for inc in incs:
                inc_id = inc.get("id", "")
                inc_started = inc.get("started_at") or inc.get("created_at", "")
                try:
                    events = incident_store.get_events(inc_id, limit=50)
                    incident_events[inc_id] = events
                    # An escalation is a "decision" event mentioning Escalat*
                    # inside the lookback window.
                    for ev in events:
                        if (ev.get("type") == "decision"
                                and "Escalat" in (ev.get("message") or "")
                                and ev.get("ts", "") >= cutoff):
                            escalations_24h += 1
                except Exception:
                    pass
                # Include incident if started within window
                if inc_started >= cutoff:
                    incidents_24h.append(inc)
    except Exception as e:
        logger.warning("attribution fetch incident signals failed: %s", e)

    return {
        "alerts_24h": alerts_24h,
        "occurrences_60m": occurrences_60m,
        "escalations_24h": escalations_24h,
        "incidents_24h": incidents_24h,
        "incident_events": incident_events,
        "release_gate_results": [],  # caller can inject if persisted
    }
|