""" architecture_pressure.py — Architecture Pressure Index (APIx) Engine. DAARION.city | deterministic, no LLM. Measures *long-term structural strain* of a service — the accumulation of recurring failures, regressions, escalations, and followup debt over 30 days. Contrast with Risk Engine (short-term operational health). Public API: load_pressure_policy() -> Dict compute_pressure(service, env, ...) -> PressureReport compute_pressure_dashboard(env, services, ...) -> DashboardResult list_known_services(policy) -> List[str] """ from __future__ import annotations import datetime import logging import yaml from pathlib import Path from typing import Dict, List, Optional logger = logging.getLogger(__name__) # ─── Policy ─────────────────────────────────────────────────────────────────── _PRESSURE_POLICY_CACHE: Optional[Dict] = None _PRESSURE_POLICY_PATHS = [ Path("config/architecture_pressure_policy.yml"), Path(__file__).resolve().parent.parent.parent / "config" / "architecture_pressure_policy.yml", ] def load_pressure_policy() -> Dict: global _PRESSURE_POLICY_CACHE if _PRESSURE_POLICY_CACHE is not None: return _PRESSURE_POLICY_CACHE for p in _PRESSURE_POLICY_PATHS: if p.exists(): try: with open(p) as f: data = yaml.safe_load(f) or {} _PRESSURE_POLICY_CACHE = data return data except Exception as e: logger.warning("Failed to load architecture_pressure_policy from %s: %s", p, e) _PRESSURE_POLICY_CACHE = _builtin_pressure_defaults() return _PRESSURE_POLICY_CACHE def _reload_pressure_policy() -> None: global _PRESSURE_POLICY_CACHE _PRESSURE_POLICY_CACHE = None def _builtin_pressure_defaults() -> Dict: return { "defaults": {"lookback_days": 30, "top_n": 10}, "weights": { "recurrence_high_30d": 20, "recurrence_warn_30d": 10, "regressions_30d": 15, "escalations_30d": 12, "followups_created_30d": 8, "followups_overdue": 15, "drift_failures_30d": 10, "dependency_high_30d": 10, }, "bands": {"low_max": 20, "medium_max": 45, "high_max": 70}, "priority_rules": { "require_arch_review_at": 70, "auto_create_followup": True, "followup_priority": "P1", "followup_due_days": 14, "followup_owner": "cto", }, "release_gate": { "platform_review_required": {"enabled": True, "warn_at": 60, "fail_at": 85} }, "digest": { "output_dir": "ops/reports/platform", "max_chars": 12000, "top_n_in_digest": 10, }, } # ─── Band classifier ────────────────────────────────────────────────────────── def classify_pressure_band(score: int, policy: Dict) -> str: bands = policy.get("bands", {}) low_max = int(bands.get("low_max", 20)) med_max = int(bands.get("medium_max", 45)) high_max = int(bands.get("high_max", 70)) if score <= low_max: return "low" if score <= med_max: return "medium" if score <= high_max: return "high" return "critical" # ─── Signal scoring helpers ─────────────────────────────────────────────────── def _score_signals(components: Dict, policy: Dict) -> int: """ Additive scoring: recurrence_high_30d, recurrence_warn_30d — boolean (1/0) regressions_30d, escalations_30d, ... — counts (capped internally) """ weights = policy.get("weights", {}) score = 0 # Boolean presence signals for bool_key in ("recurrence_high_30d", "recurrence_warn_30d"): if components.get(bool_key, 0): score += int(weights.get(bool_key, 0)) # Count-based signals: weight applied per unit, capped at 3× weight for count_key in ( "regressions_30d", "escalations_30d", "followups_created_30d", "followups_overdue", "drift_failures_30d", "dependency_high_30d", ): count = int(components.get(count_key, 0)) if count: w = int(weights.get(count_key, 0)) # First occurrence = full weight, subsequent = half (diminishing) score += w + (count - 1) * max(1, w // 2) return max(0, score) def _signals_summary(components: Dict, policy: Dict) -> List[str]: """Generate human-readable signal descriptions.""" summaries = [] if components.get("recurrence_high_30d"): summaries.append("High-recurrence alert buckets in last 30d") if components.get("recurrence_warn_30d"): summaries.append("Warn-level recurrence in last 30d") regressions = int(components.get("regressions_30d", 0)) if regressions: summaries.append(f"Risk regressions in 30d: {regressions}") escalations = int(components.get("escalations_30d", 0)) if escalations: summaries.append(f"Escalations in 30d: {escalations}") fu_created = int(components.get("followups_created_30d", 0)) if fu_created: summaries.append(f"Follow-ups created in 30d: {fu_created}") fu_overdue = int(components.get("followups_overdue", 0)) if fu_overdue: summaries.append(f"Overdue follow-ups: {fu_overdue}") drift = int(components.get("drift_failures_30d", 0)) if drift: summaries.append(f"Drift gate failures in 30d: {drift}") dep = int(components.get("dependency_high_30d", 0)) if dep: summaries.append(f"Dependency HIGH/CRITICAL findings in 30d: {dep}") return summaries # ─── Signal collection from stores ─────────────────────────────────────────── def fetch_pressure_signals( service: str, env: str, lookback_days: int = 30, *, incident_store=None, alert_store=None, risk_history_store=None, policy: Optional[Dict] = None, ) -> Dict: """ Collect all signals needed for compute_pressure from existing stores. Always non-fatal per store. Returns a components dict ready to pass to compute_pressure. """ if policy is None: policy = load_pressure_policy() cutoff = ( datetime.datetime.utcnow() - datetime.timedelta(days=lookback_days) ).isoformat() cutoff_60m = ( datetime.datetime.utcnow() - datetime.timedelta(minutes=60) ).isoformat() components: Dict = { "recurrence_high_30d": 0, "recurrence_warn_30d": 0, "regressions_30d": 0, "escalations_30d": 0, "followups_created_30d": 0, "followups_overdue": 0, "drift_failures_30d": 0, "dependency_high_30d": 0, } # ── Escalations + followups from incident_store ─────────────────────────── try: if incident_store is not None: incs = incident_store.list_incidents({"service": service}, limit=100) for inc in incs: inc_id = inc.get("id", "") inc_start = inc.get("started_at") or inc.get("created_at", "") try: events = incident_store.get_events(inc_id, limit=200) for ev in events: ev_ts = ev.get("ts", "") if ev_ts < cutoff: continue ev_type = ev.get("type", "") msg = ev.get("message") or "" # Escalation events if ev_type == "decision" and "Escalat" in msg: components["escalations_30d"] += 1 # Followup events if ev_type in ("followup", "follow_up") or "followup" in msg.lower(): components["followups_created_30d"] += 1 # Overdue followups (status=open + due_date passed) if ev_type == "followup": due = ev.get("due_date", "") status = ev.get("status", "") today = datetime.datetime.utcnow().strftime("%Y-%m-%d") if status == "open" and due and due < today: components["followups_overdue"] += 1 except Exception as e: logger.debug("pressure: events fetch for %s failed: %s", inc_id, e) except Exception as e: logger.warning("pressure: incident_store fetch failed: %s", e) # ── Regressions from risk_history_store ─────────────────────────────────── try: if risk_history_store is not None: series = risk_history_store.get_series(service, env, limit=90) # Count snapshots where delta_24h > 0 (regression events) for snap in series: snap_ts = snap.get("ts", "") if snap_ts < cutoff: continue # A regression occurred if score increased from previous snapshot # We use delta field if available, or compare consecutive # Simple heuristic: count snapshots where score > previous snapshot scores = sorted(series, key=lambda s: s.get("ts", "")) for i in range(1, len(scores)): if (scores[i].get("ts", "") >= cutoff and scores[i].get("score", 0) > scores[i - 1].get("score", 0)): components["regressions_30d"] += 1 except Exception as e: logger.warning("pressure: risk_history_store fetch failed: %s", e) # ── Recurrence from alert_store top_signatures ─────────────────────────── try: if alert_store is not None: # Use 30-day window approximation via large window sigs = alert_store.top_signatures( window_minutes=lookback_days * 24 * 60, limit=30 ) # Thresholds for high/warn recurrence (simplified) for sig in sigs: occ = int(sig.get("occurrences", 0)) if occ >= 6: components["recurrence_high_30d"] = 1 elif occ >= 3: components["recurrence_warn_30d"] = 1 except Exception as e: logger.warning("pressure: alert_store recurrence fetch failed: %s", e) return components # ─── Core engine ────────────────────────────────────────────────────────────── def compute_pressure( service: str, env: str = "prod", *, components: Optional[Dict] = None, lookback_days: int = 30, policy: Optional[Dict] = None, # Optional stores for signal collection when components not pre-fetched incident_store=None, alert_store=None, risk_history_store=None, ) -> Dict: """ Compute Architecture Pressure score for a service. If `components` is provided, no stores are accessed. Otherwise, signals are collected from stores (non-fatal fallbacks). Returns a PressureReport dict. """ if policy is None: policy = load_pressure_policy() effective_days = lookback_days or int( policy.get("defaults", {}).get("lookback_days", 30) ) if components is None: components = fetch_pressure_signals( service, env, effective_days, incident_store=incident_store, alert_store=alert_store, risk_history_store=risk_history_store, policy=policy, ) else: components = dict(components) # Ensure all keys present defaults_keys = [ "recurrence_high_30d", "recurrence_warn_30d", "regressions_30d", "escalations_30d", "followups_created_30d", "followups_overdue", "drift_failures_30d", "dependency_high_30d", ] for k in defaults_keys: components.setdefault(k, 0) score = _score_signals(components, policy) band = classify_pressure_band(score, policy) signals_summary = _signals_summary(components, policy) # Architecture review required? review_threshold = int( policy.get("priority_rules", {}).get("require_arch_review_at", 70) ) requires_arch_review = score >= review_threshold return { "service": service, "env": env, "lookback_days": effective_days, "score": score, "band": band, "components": components, "signals_summary": signals_summary, "requires_arch_review": requires_arch_review, "computed_at": datetime.datetime.utcnow().isoformat(), } # ─── Dashboard ──────────────────────────────────────────────────────────────── def compute_pressure_dashboard( env: str = "prod", services: Optional[List[str]] = None, top_n: int = 10, *, policy: Optional[Dict] = None, incident_store=None, alert_store=None, risk_history_store=None, risk_reports: Optional[Dict[str, Dict]] = None, ) -> Dict: """ Compute Architecture Pressure for multiple services and return a dashboard. `risk_reports` is an optional {service: RiskReport} dict to enrich dashboard entries with current risk score/band for side-by-side comparison. """ if policy is None: policy = load_pressure_policy() effective_top_n = top_n or int(policy.get("defaults", {}).get("top_n", 10)) # Determine services to evaluate if not services: services = _list_services_from_stores( env=env, incident_store=incident_store, policy=policy ) reports = [] for svc in services: try: report = compute_pressure( svc, env, policy=policy, incident_store=incident_store, alert_store=alert_store, risk_history_store=risk_history_store, ) # Optionally attach current risk info if risk_reports and svc in risk_reports: rr = risk_reports[svc] report["risk_score"] = rr.get("score") report["risk_band"] = rr.get("band") report["risk_delta_24h"] = (rr.get("trend") or {}).get("delta_24h") reports.append(report) except Exception as e: logger.warning("pressure dashboard: compute_pressure failed for %s: %s", svc, e) reports.sort(key=lambda r: -r.get("score", 0)) # Band counts band_counts: Dict[str, int] = {"critical": 0, "high": 0, "medium": 0, "low": 0} for r in reports: b = r.get("band", "low") band_counts[b] = band_counts.get(b, 0) + 1 critical_services = [r["service"] for r in reports if r.get("band") == "critical"] high_services = [r["service"] for r in reports if r.get("band") in ("high", "critical")] arch_review_services = [r["service"] for r in reports if r.get("requires_arch_review")] return { "env": env, "computed_at": datetime.datetime.utcnow().isoformat(), "top_pressure_services": reports[:effective_top_n], "band_counts": band_counts, "critical_services": critical_services, "high_services": high_services, "arch_review_required": arch_review_services, "total_services_evaluated": len(reports), } def _list_services_from_stores( env: str, incident_store=None, policy: Optional[Dict] = None, ) -> List[str]: """Infer known services from incident store, falling back to SLO policy.""" services: set = set() try: if incident_store is not None: incs = incident_store.list_incidents({}, limit=200) for inc in incs: svc = inc.get("service") if svc: services.add(svc) except Exception as e: logger.warning("pressure: list_services from incident_store failed: %s", e) if not services: # Fallback: read from SLO policy try: slo_paths = [ Path("config/slo_policy.yml"), Path(__file__).resolve().parent.parent.parent / "config" / "slo_policy.yml", ] for p in slo_paths: if p.exists(): import yaml as _yaml with open(p) as f: slo = _yaml.safe_load(f) or {} services.update(slo.get("services", {}).keys()) break except Exception: pass return sorted(services) # ─── Auto followup creation ─────────────────────────────────────────────────── def maybe_create_arch_review_followup( pressure_report: Dict, *, incident_store=None, policy: Optional[Dict] = None, week_str: Optional[str] = None, ) -> Dict: """ If pressure score >= require_arch_review_at and auto_create_followup=True, create an architecture-review follow-up on the latest open incident. Deduped by key: arch_review:{YYYY-WW}:{service} Returns: {"created": bool, "dedupe_key": str, "skipped_reason": str|None} """ if policy is None: policy = load_pressure_policy() service = pressure_report.get("service", "") score = int(pressure_report.get("score", 0)) rules = policy.get("priority_rules", {}) review_at = int(rules.get("require_arch_review_at", 70)) auto_create = bool(rules.get("auto_create_followup", True)) if score < review_at: return {"created": False, "dedupe_key": None, "skipped_reason": f"score {score} < require_arch_review_at {review_at}"} if not auto_create: return {"created": False, "dedupe_key": None, "skipped_reason": "auto_create_followup disabled"} if incident_store is None: return {"created": False, "dedupe_key": None, "skipped_reason": "incident_store not available"} if week_str is None: week_str = datetime.datetime.utcnow().strftime("%Y-W%V") dedupe_key = f"arch_review:{week_str}:{service}" priority = rules.get("followup_priority", "P1") owner = rules.get("followup_owner", "cto") due_days = int(rules.get("followup_due_days", 14)) due_date = ( datetime.datetime.utcnow() + datetime.timedelta(days=due_days) ).strftime("%Y-%m-%d") try: # Check if a follow-up with this dedupe_key already exists incs = incident_store.list_incidents({"service": service}, limit=50) open_inc = None for inc in incs: if inc.get("status") in ("open", "triaged", "escalated"): open_inc = inc break # Check events for existing dedupe_key try: events = incident_store.get_events(inc.get("id", ""), limit=100) for ev in events: if ev.get("dedupe_key") == dedupe_key: return {"created": False, "dedupe_key": dedupe_key, "skipped_reason": f"already exists: {dedupe_key}"} except Exception: pass if open_inc is None: # No open incident — create a synthetic architecture_review incident open_inc = incident_store.create_incident({ "service": service, "title": f"Architecture Review Required: {service}", "kind": "architecture_review", "severity": "P2", "status": "open", "started_at": datetime.datetime.utcnow().isoformat(), "source": "architecture_pressure_engine", }) # Add followup event to the incident inc_id = open_inc.get("id", "") incident_store.get_events(inc_id, limit=1) # verify inc exists # Write the followup event followup_event = { "type": "followup", "ts": datetime.datetime.utcnow().isoformat(), "message": ( f"[Architecture Pressure] Score={score} >= {review_at}. " f"Schedule architecture review for '{service}'." ), "owner": owner, "priority": priority, "due_date": due_date, "status": "open", "dedupe_key": dedupe_key, "source": "architecture_pressure_engine", } if hasattr(incident_store, "add_event"): incident_store.add_event(inc_id, followup_event) elif hasattr(incident_store, "append_event"): incident_store.append_event(inc_id, followup_event) else: # Fallback: write as a new incident event via create pattern logger.info( "pressure: would create followup for %s (inc=%s, key=%s)", service, inc_id, dedupe_key ) return {"created": True, "dedupe_key": dedupe_key, "skipped_reason": None, "incident_id": inc_id, "due_date": due_date, "priority": priority} except Exception as e: logger.warning("maybe_create_arch_review_followup failed for %s: %s", service, e) return {"created": False, "dedupe_key": dedupe_key, "skipped_reason": f"error: {e}"}