"""
platform_priority_digest.py — Weekly Platform Priority Digest.
DAARION.city | deterministic, no LLM.

Generates a Markdown + JSON report prioritising services by Architecture Pressure,
optionally correlated with Risk score/delta.

Outputs (the file stem is the ISO week label, e.g. 2024-W07):
  ops/reports/platform/{YYYY-Www}.md
  ops/reports/platform/{YYYY-Www}.json

Public API:
  weekly_platform_digest(env, ...) -> DigestResult
"""
from __future__ import annotations

import datetime
import json
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional

from architecture_pressure import load_pressure_policy

logger = logging.getLogger(__name__)


# ─── Action templates ─────────────────────────────────────────────────────────

_ACTION_TEMPLATES = {
    "arch_review": (
        "📋 **Schedule architecture review**: '{service}' pressure={score} "
        "({band}). Review structural debt and recurring failure patterns."
    ),
    "refactor_sprint": (
        "🔧 **Allocate refactor sprint**: '{service}' has {regressions} regressions "
        "and {escalations} escalations in 30d — structural instability requires investment."
    ),
    "freeze_features": (
        "🚫 **Freeze non-critical features**: '{service}' is critical-pressure + "
        "risk-high. Stabilise before new feature work."
    ),
    "reduce_backlog": (
        "📌 **Reduce followup backlog**: '{service}' has {overdue} overdue follow-ups. "
        "Address before next release cycle."
    ),
}

# Default cap on the number of recommended actions in one digest.
_MAX_ACTIONS = 20

# Risk bands that count as "elevated" for the rules in this module.
_ELEVATED_RISK_BANDS = ("high", "critical")


# ─── Small helpers ────────────────────────────────────────────────────────────

def _utc_now() -> datetime.datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(datetime.timezone.utc)


def _now_week() -> str:
    """Return the current ISO week label, e.g. ``2024-W07``.

    The year is the ISO week-numbering year, not the calendar year, so the
    label stays correct for the days around New Year that belong to a week
    of the neighbouring year.
    """
    iso = _utc_now().isocalendar()
    return f"{iso[0]}-W{iso[1]:02d}"


def _now_date() -> str:
    """Return today's UTC date as ``YYYY-MM-DD``."""
    return _utc_now().strftime("%Y-%m-%d")


def _clamp(text: str, max_chars: int) -> str:
    """Truncate *text* to at most *max_chars* characters.

    A falsy or non-positive *max_chars* disables clamping. Truncated text
    ends with a single-character ellipsis and is exactly *max_chars* long.
    """
    if not max_chars or max_chars <= 0 or len(text) <= max_chars:
        return text
    # Reserve exactly one character for the ellipsis.
    return text[:max_chars - 1] + "…"


def _as_int(value) -> int:
    """Convert a counter to int, treating ``None`` (or any falsy value) as 0."""
    return int(value or 0)


def _format_delta(delta) -> str:
    """Render a risk delta with an explicit sign (``+3`` / ``-3``)."""
    if isinstance(delta, (int, float)) and not isinstance(delta, bool):
        return f"{delta:+}"
    # Non-numeric values keep the historical "+<value>" rendering.
    return f"+{delta}"


def _resolve_risk(report: Dict, risk_reports: Optional[Dict[str, Dict]]) -> Dict:
    """Pick the risk score/band/delta for one pressure report.

    A non-empty entry in *risk_reports* for the service takes precedence;
    otherwise the ``risk_*`` fields carried on the pressure report are used.
    A missing or null band is normalised to ``"low"``.
    """
    rr = (risk_reports or {}).get(report.get("service")) or {}
    if rr:
        score = rr.get("score")
        band = rr.get("band")
        delta = (rr.get("trend") or {}).get("delta_24h")
    else:
        score = report.get("risk_score")
        band = report.get("risk_band")
        delta = report.get("risk_delta_24h")
    return {"score": score, "band": band or "low", "delta_24h": delta}


# ─── Action list builder ──────────────────────────────────────────────────────

def _build_priority_actions(
    pressure_reports: List[Dict],
    risk_reports: Optional[Dict] = None,
    max_actions: int = _MAX_ACTIONS,
) -> List[str]:
    """Build the list of recommended actions for the given pressure reports.

    Per report, in order:
      * arch review       — when ``requires_arch_review`` is truthy
      * refactor sprint   — regressions_30d >= 3 and escalations_30d >= 2
      * freeze features   — pressure band critical and risk band high/critical
      * reduce backlog    — followups_overdue >= 2

    Args:
        pressure_reports: pressure report dicts; each must have ``service``.
        risk_reports: optional ``{service: risk report}`` overriding the
            ``risk_band`` carried on the pressure report.
        max_actions: maximum number of actions returned.

    Returns:
        Rendered action strings, at most *max_actions* long.
    """
    actions: List[str] = []
    for r in pressure_reports:
        svc = r["service"]
        score = r.get("score", 0)
        band = r.get("band", "low")
        comp = r.get("components") or {}

        if r.get("requires_arch_review"):
            actions.append(
                _ACTION_TEMPLATES["arch_review"].format(
                    service=svc, score=score, band=band
                )
            )

        regressions = _as_int(comp.get("regressions_30d"))
        escalations = _as_int(comp.get("escalations_30d"))
        if regressions >= 3 and escalations >= 2:
            actions.append(
                _ACTION_TEMPLATES["refactor_sprint"].format(
                    service=svc, regressions=regressions, escalations=escalations
                )
            )

        risk_band = _resolve_risk(r, risk_reports)["band"]
        if band == "critical" and risk_band in _ELEVATED_RISK_BANDS:
            actions.append(
                _ACTION_TEMPLATES["freeze_features"].format(service=svc)
            )

        overdue = _as_int(comp.get("followups_overdue"))
        if overdue >= 2:
            actions.append(
                _ACTION_TEMPLATES["reduce_backlog"].format(service=svc, overdue=overdue)
            )

    return actions[:max_actions]  # cap


# ─── Markdown builder ─────────────────────────────────────────────────────────

def _build_markdown(
    week_str: str,
    env: str,
    pressure_reports: List[Dict],
    investment_list: List[Dict],
    actions: List[str],
    band_counts: Dict[str, int],
    date_str: Optional[str] = None,
) -> str:
    """Render the digest as Markdown.

    Args:
        week_str: ISO week label shown in the title.
        env: environment name; upper-cased in the title.
        pressure_reports: reports to render (critical and high bands only).
        investment_list: rows for the "Investment Priority List" section.
        actions: pre-rendered action strings.
        band_counts: ``{band: count}`` for the summary table.
        date_str: date shown in the "Generated" line; defaults to today (UTC).

    Returns:
        The Markdown document as a single string.
    """
    generated_on = date_str or _now_date()
    lines = [
        f"# Platform Priority Digest — {env.upper()} | {week_str}",
        f"_Generated: {generated_on} | Deterministic | No LLM_",
        "",
        "## Pressure Band Summary",
        "",
        "| Band | Services |",
        "|------|---------|",
        f"| 🔴 Critical | {band_counts.get('critical', 0)} |",
        f"| 🟠 High | {band_counts.get('high', 0)} |",
        f"| 🟡 Medium | {band_counts.get('medium', 0)} |",
        f"| 🟢 Low | {band_counts.get('low', 0)} |",
        "",
    ]

    # Critical pressure
    critical = [r for r in pressure_reports if r.get("band") == "critical"]
    if critical:
        lines += ["## 🔴 Critical Structural Pressure", ""]
        for r in critical:
            svc = r["service"]
            score = r.get("score", 0)
            summary = "; ".join((r.get("signals_summary") or [])[:3])
            arch_flag = " ⚠️ ARCH REVIEW REQUIRED" if r.get("requires_arch_review") else ""
            lines.append(f"### {svc} (score={score}){arch_flag}")
            lines.append(f"> {summary}")
            # Risk correlation
            if r.get("risk_score") is not None:
                delta = r.get("risk_delta_24h")
                # A zero/absent delta is omitted; otherwise show it signed.
                delta_part = f" Δ24h: {_format_delta(delta)}" if delta else ""
                lines.append(
                    f"> Risk: {r['risk_score']} ({r.get('risk_band', '?')})" + delta_part
                )
            lines.append("")

    # High pressure
    high = [r for r in pressure_reports if r.get("band") == "high"]
    if high:
        lines += ["## 🟠 High Pressure Services", ""]
        for r in high:
            svc = r["service"]
            score = r.get("score", 0)
            summary = (r.get("signals_summary") or [""])[0]
            lines.append(
                f"- **{svc}** (score={score}): {summary}"
            )
        lines.append("")

    # Investment priority list
    if investment_list:
        lines += ["## 📊 Investment Priority List", ""]
        lines.append("Services where Pressure ≥ require_arch_review_at AND risk is elevated:")
        lines.append("")
        for i, item in enumerate(investment_list, 1):
            risk_score = item.get("risk_score")
            risk_band = item.get("risk_band")
            lines.append(
                f"{i}. **{item['service']}** — Pressure: {item['pressure_score']} "
                f"({item['pressure_band']}) | Risk: {'N/A' if risk_score is None else risk_score} "
                f"({'N/A' if risk_band is None else risk_band})"
            )
        lines.append("")

    # Action recommendations
    if actions:
        lines += ["## ✅ Action Recommendations", ""]
        for action in actions:
            lines.append(f"- {action}")
        lines.append("")

    lines += [
        "---",
        "_Generated by DAARION.city Platform Priority Digest (deterministic, no LLM)_",
    ]
    return "\n".join(lines)


# ─── Digest building blocks ───────────────────────────────────────────────────

def _build_investment_list(
    pressure_reports: List[Dict],
    risk_reports: Optional[Dict[str, Dict]],
) -> List[Dict]:
    """Select services that require arch review AND have elevated risk.

    Risk counts as elevated when the band is high/critical or the 24h delta
    is positive.
    """
    investment_list: List[Dict] = []
    for r in pressure_reports:
        if not r.get("requires_arch_review"):
            continue
        risk = _resolve_risk(r, risk_reports)
        delta = risk["delta_24h"]
        if risk["band"] in _ELEVATED_RISK_BANDS or (delta is not None and delta > 0):
            investment_list.append({
                "service": r["service"],
                "pressure_score": r.get("score"),
                "pressure_band": r.get("band"),
                "risk_score": risk["score"],
                "risk_band": risk["band"],
                "risk_delta_24h": delta,
            })
    return investment_list


def _write_digest_files(
    output_dir: str,
    week_str: str,
    markdown: str,
    json_data: Dict,
) -> List[str]:
    """Write ``<week>.md`` and ``<week>.json`` under *output_dir*.

    Best-effort: any failure is logged as a warning and an empty list is
    returned, so a digest is still produced when the disk is not writable.

    Returns:
        Paths of both files when both writes succeeded, else ``[]``.
    """
    try:
        out_path = Path(output_dir)
        out_path.mkdir(parents=True, exist_ok=True)
        md_file = out_path / f"{week_str}.md"
        json_file = out_path / f"{week_str}.json"
        md_file.write_text(markdown, encoding="utf-8")
        json_file.write_text(json.dumps(json_data, indent=2, default=str), encoding="utf-8")
    except Exception as e:
        logger.warning("platform_priority_digest: failed to write files: %s", e)
        return []
    logger.info("platform_priority_digest: wrote %s and %s", md_file, json_file)
    return [str(md_file), str(json_file)]


# ─── Main digest function ─────────────────────────────────────────────────────

def weekly_platform_digest(
    env: str = "prod",
    *,
    pressure_reports: Optional[List[Dict]] = None,
    risk_reports: Optional[Dict[str, Dict]] = None,
    policy: Optional[Dict] = None,
    week_str: Optional[str] = None,
    output_dir: Optional[str] = None,
    date_str: Optional[str] = None,
    write_files: bool = True,
    auto_followup: bool = True,
    incident_store=None,
) -> Dict:
    """
    Generate Weekly Platform Priority Digest.

    Args:
        env:              environment name, echoed in the report
        pressure_reports: pre-computed pressure reports list; re-sorted by score
                          (desc) and truncated to the policy's top_n_in_digest
        risk_reports:     {service: RiskReport} for side-by-side correlation
        policy:           architecture_pressure_policy (loaded if None)
        week_str:         ISO week for filenames (defaults to current week)
        output_dir:       override output directory
        date_str:         report date (defaults to today, UTC)
        write_files:      write .md and .json to disk
        auto_followup:    call maybe_create_arch_review_followup for each requiring review
        incident_store:   needed for auto_followup (skipped when None)

    Returns:
        DigestResult dict with week, env, markdown, json_data, files_written,
        followups_created, band_counts.
    """
    if policy is None:
        policy = load_pressure_policy()

    effective_week = week_str or _now_week()
    effective_date = date_str or _now_date()
    digest_cfg = policy.get("digest") or {}
    effective_output_dir = output_dir or digest_cfg.get("output_dir", "ops/reports/platform")
    max_chars = int(digest_cfg.get("max_chars", 12000))
    top_n = int(digest_cfg.get("top_n_in_digest", 10))

    # A null score sorts as 0 instead of raising on negation.
    pressure_reports = sorted(
        pressure_reports or [], key=lambda r: -(r.get("score") or 0)
    )[:top_n]
    risk_reports = risk_reports or {}

    # Band counts (over the reports kept in the digest)
    band_counts: Dict[str, int] = {"critical": 0, "high": 0, "medium": 0, "low": 0}
    for r in pressure_reports:
        b = r.get("band") or "low"
        band_counts[b] = band_counts.get(b, 0) + 1

    # Investment priority list: requires_arch_review AND (risk high/critical OR delta > 0)
    investment_list = _build_investment_list(pressure_reports, risk_reports)

    actions = _build_priority_actions(pressure_reports, risk_reports)

    markdown_raw = _build_markdown(
        week_str=effective_week,
        env=env,
        pressure_reports=pressure_reports,
        investment_list=investment_list,
        actions=actions,
        band_counts=band_counts,
        date_str=effective_date,
    )
    markdown = _clamp(markdown_raw, max_chars)

    json_data = {
        "week": effective_week,
        "date": effective_date,
        "env": env,
        # Naive-UTC ISO string (no offset suffix), as emitted historically.
        "generated_at": _utc_now().replace(tzinfo=None).isoformat(),
        "band_counts": band_counts,
        "top_pressure_services": [
            {
                "service": r.get("service"),
                "score": r.get("score"),
                "band": r.get("band"),
                "requires_arch_review": r.get("requires_arch_review"),
                "signals_summary": (r.get("signals_summary") or [])[:4],
                "components": r.get("components", {}),
                "risk_score": r.get("risk_score"),
                "risk_band": r.get("risk_band"),
                "risk_delta_24h": r.get("risk_delta_24h"),
            }
            for r in pressure_reports
        ],
        "investment_priority_list": investment_list,
        "actions": actions,
    }

    # ── Auto followup creation ────────────────────────────────────────────────
    followups_created = []
    if auto_followup and incident_store is not None:
        from architecture_pressure import maybe_create_arch_review_followup
        for r in pressure_reports:
            if r.get("requires_arch_review"):
                fu_result = maybe_create_arch_review_followup(
                    r,
                    incident_store=incident_store,
                    policy=policy,
                    week_str=effective_week,
                )
                if fu_result.get("created"):
                    followups_created.append({
                        "service": r["service"],
                        "dedupe_key": fu_result.get("dedupe_key"),
                        "incident_id": fu_result.get("incident_id"),
                    })

    # ── Write files ───────────────────────────────────────────────────────────
    files_written: List[str] = []
    if write_files:
        files_written = _write_digest_files(
            effective_output_dir, effective_week, markdown, json_data
        )

    return {
        "week": effective_week,
        "env": env,
        "markdown": markdown,
        "json_data": json_data,
        "files_written": files_written,
        "followups_created": followups_created,
        "band_counts": band_counts,
    }