"""
risk_digest.py — Daily Risk Digest generator (deterministic, no LLM).

Produces:
    ops/reports/risk/YYYY-MM-DD.json
    ops/reports/risk/YYYY-MM-DD.md

Content:
    - Top risky services (score desc)
    - Top regressions (delta_24h desc)
    - SLO violation summary
    - Deterministic action list based on risk state
"""
from __future__ import annotations

import datetime
import json
import logging
import math
import os
from pathlib import Path
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

# Deterministic action templates keyed by risk condition. The markdown/emoji
# text is user-facing (rendered into the .md digest) — keep wording stable.
_ACTION_TEMPLATES = {
    "regression_fail": (
        "🚨 **Regression detected**: {service} score +{delta} in 24h. "
        "Freeze deployments; inspect recent incidents/followups immediately."
    ),
    "regression_warn": (
        "⚠️ **Score rising**: {service} +{delta} in 24h. "
        "Review open incidents and overdue follow-ups."
    ),
    "critical_band": (
        "🔴 **Critical risk**: {service} (score {score}). "
        "Oncall review required within 2h."
    ),
    "high_band": (
        "🟠 **High risk**: {service} (score {score}). "
        "Coordinate with oncall before next release."
    ),
    "overdue_followups": (
        "📋 **Overdue follow-ups**: {service} has {count} overdue follow-up(s). "
        "Close them to reduce risk score."
    ),
    "slo_violation": (
        "📉 **SLO violation**: {service} has {count} active SLO violation(s). "
        "Avoid deploying until clear."
    ),
}

# Suffix appended to a digest that exceeded the policy's markdown_max_chars.
_TRUNCATION_NOTICE = "\n\n_[digest truncated to policy max_chars]_"


def _utcnow() -> datetime.datetime:
    """Return the current naive UTC datetime.

    Replaces the deprecated ``datetime.utcnow()``: we take a tz-aware "now"
    and strip tzinfo so ``isoformat()``/``strftime`` output stays byte-identical
    to the historical format (no ``+00:00`` suffix in the reports).
    """
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


def _now_date() -> str:
    """Current UTC date as ``YYYY-MM-DD`` (the digest's file-name stem)."""
    return _utcnow().strftime("%Y-%m-%d")


def _clamp(text: str, max_chars: int) -> str:
    """Clamp *text* to at most *max_chars* characters.

    If truncation happens, a visible notice is appended. BUGFIX: previously
    the notice was appended *after* cutting to ``max_chars``, so the result
    could exceed the policy limit; room for the notice is now reserved.
    (If ``max_chars`` is smaller than the notice itself, the notice alone is
    returned — a pathological config, kept non-crashing.)
    """
    if len(text) <= max_chars:
        return text
    keep = max(0, max_chars - len(_TRUNCATION_NOTICE))
    return text[:keep] + _TRUNCATION_NOTICE


def _build_action_list(reports: List[Dict]) -> List[str]:
    """Derive a deterministic, human-readable action list from risk reports.

    Only the first 10 reports are considered; per report, up to four actions
    may be emitted (regression, band, overdue follow-ups, SLO violations).
    The overall list is capped at 20 entries.
    """
    actions: List[str] = []
    for r in reports[:10]:
        service = r.get("service", "?")
        score = r.get("score", 0)
        band = r.get("band", "low")
        trend = r.get("trend") or {}
        comp = r.get("components", {})
        delta_24h = trend.get("delta_24h")
        reg = trend.get("regression", {})

        # Regression actions: only for a positive (worsening) 24h delta.
        if reg.get("fail") and delta_24h is not None and delta_24h > 0:
            actions.append(_ACTION_TEMPLATES["regression_fail"].format(
                service=service, delta=delta_24h))
        elif reg.get("warn") and delta_24h is not None and delta_24h > 0:
            actions.append(_ACTION_TEMPLATES["regression_warn"].format(
                service=service, delta=delta_24h))

        # Band actions: critical and high bands demand oncall attention.
        if band == "critical":
            actions.append(_ACTION_TEMPLATES["critical_band"].format(
                service=service, score=score))
        elif band == "high":
            actions.append(_ACTION_TEMPLATES["high_band"].format(
                service=service, score=score))

        # Overdue follow-ups across all priority buckets.
        followups = comp.get("followups") or {}
        overdue = (
            followups.get("P0", 0)
            + followups.get("P1", 0)
            + followups.get("other", 0)
        )
        if overdue:
            actions.append(_ACTION_TEMPLATES["overdue_followups"].format(
                service=service, count=overdue))

        # Active SLO violations.
        slo_count = (comp.get("slo") or {}).get("violations", 0)
        if slo_count:
            actions.append(_ACTION_TEMPLATES["slo_violation"].format(
                service=service, count=slo_count))

    return actions[:20]  # cap


def _build_markdown(
    date_str: str,
    env: str,
    reports: List[Dict],
    top_regressions: List[Dict],
    improving: List[Dict],
    actions: List[str],
    band_counts: Dict,
) -> str:
    """Render the digest as a Markdown document.

    Sections: band summary table, top-risky-services table, top regressions,
    likely causes (attribution, when present on a report), change timeline,
    improving services, and the action list. Missing deltas render as "—";
    positive deltas get an explicit "+" sign.
    """
    lines = [
        f"# Risk Digest — {date_str} ({env})",
        "",
        f"Generated: {_utcnow().isoformat()} UTC",
        "",
        "## Band Summary",
        "",
        "| Band | Count |",
        "|------|-------|",
    ]
    for band in ("critical", "high", "medium", "low"):
        lines.append(f"| {band} | {band_counts.get(band, 0)} |")

    lines += [
        "",
        "## Top Risky Services",
        "",
        "| Service | Score | Band | Δ24h | Δ7d |",
        "|---------|-------|------|------|-----|",
    ]
    for r in reports:
        t = r.get("trend") or {}
        d24 = t.get("delta_24h")
        d7 = t.get("delta_7d")
        d24_str = (f"+{d24}" if d24 and d24 > 0 else str(d24)) if d24 is not None else "—"
        d7_str = (f"+{d7}" if d7 and d7 > 0 else str(d7)) if d7 is not None else "—"
        lines.append(
            f"| {r['service']} | {r.get('score', 0)} | {r.get('band', '?')} "
            f"| {d24_str} | {d7_str} |"
        )

    if top_regressions:
        lines += ["", "## Top Regressions (Δ24h)", ""]
        for item in top_regressions:
            delta = item.get("delta_24h", 0)
            lines.append(f"- **{item['service']}**: +{delta} points in 24h")

    # ── Likely Causes (Attribution) ───────────────────────────────────────
    # Reports that regressed in the last 24h AND carry attribution causes.
    regressions_with_attribution = [
        r for r in reports
        if (r.get("trend") or {}).get("delta_24h") is not None
        and r["trend"]["delta_24h"] > 0
        and r.get("attribution") is not None
        and r["attribution"].get("causes")
    ]
    regressions_with_attribution = sorted(
        regressions_with_attribution,
        key=lambda r: -(r.get("trend") or {}).get("delta_24h", 0),
    )[:5]
    if regressions_with_attribution:
        lines += ["", "## Likely Causes (Top Regressions)", ""]
        for r in regressions_with_attribution:
            svc = r["service"]
            attr = r["attribution"]
            delta = r["trend"]["delta_24h"]
            summary = attr.get("summary", "")
            lines.append(f"### {svc} (+{delta} pts)")
            if summary:
                lines.append(f"> {summary}")
            causes = attr.get("causes", [])[:2]
            for c in causes:
                evid = "; ".join(c.get("evidence", []))
                lines.append(
                    f"- **{c['type']}** (confidence: {c.get('confidence', '?')}): {evid}"
                )
            # LLM text if available (optional enrichment; capped at 400 chars).
            llm = attr.get("llm_enrichment") or {}
            if llm.get("enabled") and llm.get("text"):
                lines += ["", f"  _LLM insight_: {llm['text'][:400]}"]
            lines.append("")

    # ── Change Timeline (Top Regressions) ─────────────────────────────────
    regressions_with_timeline = [
        r for r in regressions_with_attribution
        if r.get("attribution") and r["attribution"].get("timeline")
    ]
    if regressions_with_timeline:
        lines += ["", "## Change Timeline (Top Regressions)", ""]
        for r in regressions_with_timeline:
            svc = r["service"]
            timeline = r["attribution"]["timeline"][:5]  # top 5 per service
            lines.append(f"### {svc}")
            for item in timeline:
                ts = (item.get("ts") or "")[:16]  # trim to YYYY-MM-DDTHH:MM
                label = item.get("label", "")
                ev_type = item.get("type", "")
                lines.append(f"- `{ts}` [{ev_type}] {label}")
            lines.append("")

    if improving:
        lines += ["", "## Improving Services (Δ7d)", ""]
        for item in improving:
            delta = item.get("delta_7d", 0)
            lines.append(f"- **{item['service']}**: {delta} points over 7d")

    if actions:
        lines += ["", "## Action List", ""]
        for action in actions:
            lines.append(f"- {action}")

    lines += ["", "---", "_Generated by DAARION.city Risk Digest (deterministic, no LLM by default)_"]
    return "\n".join(lines)


def daily_digest(
    env: str = "prod",
    *,
    service_reports: Optional[List[Dict]] = None,
    policy: Optional[Dict] = None,
    date_str: Optional[str] = None,
    output_dir: Optional[str] = None,
    write_files: bool = True,
) -> Dict:
    """Build and optionally persist the daily risk digest.

    Args:
        env: Environment label rendered into the digest (e.g. "prod").
        service_reports: Pre-fetched + enriched list of RiskReports (dicts
            with "service"/"score"/"band" and optional "trend"/"attribution").
        policy: Risk policy dict; loaded from ``risk_engine`` when omitted.
        date_str: Digest date (YYYY-MM-DD); defaults to today's UTC date.
        output_dir: Overrides the policy's ``digest.output_dir``.
        write_files: When True, write the .json and .md files (best-effort:
            a write failure is logged, not raised).

    Returns:
        {date, env, json_path, md_path, json_data, markdown} — paths are
        None when not written (write_files=False or a write error).
    """
    if policy is None:
        # Imported lazily so callers that supply a policy (and unit tests)
        # do not require risk_engine to be importable.
        from risk_engine import load_risk_policy
        policy = load_risk_policy()

    digest_cfg = policy.get("digest", {})
    top_n = int(digest_cfg.get("top_n", 10))
    max_chars = int(digest_cfg.get("markdown_max_chars", 8000))
    cfg_output_dir = digest_cfg.get("output_dir", "ops/reports/risk")
    effective_output_dir = output_dir or cfg_output_dir
    effective_date = date_str or _now_date()

    # Keep only the top-N services by descending score; all summaries below
    # (band counts, regressions, improving) cover this top-N slice only.
    reports = sorted(service_reports or [], key=lambda r: -r.get("score", 0))[:top_n]

    # Band counts
    band_counts: Dict[str, int] = {"critical": 0, "high": 0, "medium": 0, "low": 0}
    for r in reports:
        b = r.get("band", "low")
        band_counts[b] = band_counts.get(b, 0) + 1

    # Top regressions: positive 24h delta, worst first, capped at 5.
    top_regressions = sorted(
        [r for r in reports
         if (r.get("trend") or {}).get("delta_24h") is not None
         and r["trend"]["delta_24h"] > 0],
        key=lambda r: -r["trend"]["delta_24h"],
    )[:5]
    top_regressions_out = [
        {"service": r["service"],
         "delta_24h": r["trend"]["delta_24h"],
         "attribution_causes": [
             {"type": c["type"], "score": c["score"],
              "confidence": c.get("confidence", "low"),
              "evidence": c.get("evidence", [])[:2],
              "refs": c.get("refs", [])[:3]}
             for c in (r.get("attribution") or {}).get("causes", [])[:2]
         ],
         "timeline_preview": (r.get("attribution") or {}).get("timeline", [])[:3],
         }
        for r in top_regressions
    ]

    # Improving services: negative 7d delta, most-improved first, capped at 5.
    improving = sorted(
        [r for r in reports
         if (r.get("trend") or {}).get("delta_7d") is not None
         and r["trend"]["delta_7d"] < 0],
        key=lambda r: r["trend"]["delta_7d"],
    )[:5]
    improving_out = [
        {"service": r["service"], "delta_7d": r["trend"]["delta_7d"]}
        for r in improving
    ]

    actions = _build_action_list(reports)

    markdown_raw = _build_markdown(
        date_str=effective_date,
        env=env,
        reports=reports,
        top_regressions=top_regressions_out,
        improving=improving_out,
        actions=actions,
        band_counts=band_counts,
    )
    markdown = _clamp(markdown_raw, max_chars)

    json_data = {
        "date": effective_date,
        "env": env,
        "generated_at": _utcnow().isoformat(),
        "band_counts": band_counts,
        "top_services": [
            {
                "service": r.get("service"),
                "score": r.get("score"),
                "band": r.get("band"),
                "delta_24h": (r.get("trend") or {}).get("delta_24h"),
                "delta_7d": (r.get("trend") or {}).get("delta_7d"),
                "regression": (r.get("trend") or {}).get("regression"),
                "reasons": r.get("reasons", [])[:5],
                "attribution_summary": (r.get("attribution") or {}).get("summary"),
                "top_causes": [
                    {"type": c["type"], "score": c["score"],
                     "confidence": c.get("confidence", "low"),
                     "evidence": c.get("evidence", [])[:2],
                     "refs": c.get("refs", [])[:3]}
                    for c in (r.get("attribution") or {}).get("causes", [])[:2]
                ],
                "timeline_preview": (r.get("attribution") or {}).get("timeline", [])[:3],
                "evidence_refs": (r.get("attribution") or {}).get("evidence_refs", {}),
            }
            for r in reports
        ],
        "top_regressions": top_regressions_out,
        "improving_services": improving_out,
        "actions": actions,
    }

    json_path: Optional[str] = None
    md_path: Optional[str] = None
    if write_files:
        # Best-effort persistence: a digest that fails to write must not
        # break the caller; paths are reset to None on failure.
        try:
            out = Path(effective_output_dir)
            out.mkdir(parents=True, exist_ok=True)
            json_path = str(out / f"{effective_date}.json")
            md_path = str(out / f"{effective_date}.md")
            # encoding="utf-8" is required: the digest contains emoji, and a
            # locale-dependent default encoding (e.g. cp1252) would raise
            # UnicodeEncodeError and silently drop the report.
            with open(json_path, "w", encoding="utf-8") as f:
                json.dump(json_data, f, indent=2, ensure_ascii=False)
            with open(md_path, "w", encoding="utf-8") as f:
                f.write(markdown)
            logger.info("Risk digest written: %s, %s", json_path, md_path)
        except Exception as e:
            logger.warning("Risk digest write failed: %s", e)
            json_path = md_path = None

    return {
        "date": effective_date,
        "env": env,
        "json_path": json_path,
        "md_path": md_path,
        "json_data": json_data,
        "markdown": markdown,
    }