Files
microdao-daarion/services/router/architecture_pressure.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

575 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
architecture_pressure.py — Architecture Pressure Index (APIx) Engine.
DAARION.city | deterministic, no LLM.
Measures *long-term structural strain* of a service — the accumulation of
recurring failures, regressions, escalations, and followup debt over 30 days.
Contrast with Risk Engine (short-term operational health).
Public API:
load_pressure_policy() -> Dict
compute_pressure(service, env, ...) -> PressureReport
compute_pressure_dashboard(env, services, ...) -> DashboardResult
list_known_services(policy) -> List[str]
"""
from __future__ import annotations
import datetime
import logging
import yaml
from pathlib import Path
from typing import Dict, List, Optional
logger = logging.getLogger(__name__)
# ─── Policy ───────────────────────────────────────────────────────────────────
# Process-wide cache of the parsed policy dict; populated lazily on the first
# load_pressure_policy() call and cleared by _reload_pressure_policy().
_PRESSURE_POLICY_CACHE: Optional[Dict] = None
# Candidate policy file locations, tried in order: CWD-relative first, then
# relative to the repository root (three directories above this module).
_PRESSURE_POLICY_PATHS = [
    Path("config/architecture_pressure_policy.yml"),
    Path(__file__).resolve().parent.parent.parent / "config" / "architecture_pressure_policy.yml",
]
def load_pressure_policy() -> Dict:
    """Return the Architecture Pressure policy, loading it lazily on first use.

    Candidate YAML paths are tried in order; the first readable file wins and
    is cached process-wide. When no path can be loaded, built-in defaults are
    cached instead. Load failures are logged, never raised.
    """
    global _PRESSURE_POLICY_CACHE
    if _PRESSURE_POLICY_CACHE is not None:
        return _PRESSURE_POLICY_CACHE
    for candidate in _PRESSURE_POLICY_PATHS:
        if not candidate.exists():
            continue
        try:
            with open(candidate) as fh:
                loaded = yaml.safe_load(fh) or {}
            _PRESSURE_POLICY_CACHE = loaded
            return loaded
        except Exception as exc:
            # Non-fatal: fall through to the next candidate path.
            logger.warning("Failed to load architecture_pressure_policy from %s: %s", candidate, exc)
    _PRESSURE_POLICY_CACHE = _builtin_pressure_defaults()
    return _PRESSURE_POLICY_CACHE
def _reload_pressure_policy() -> None:
    """Drop the cached policy so the next load_pressure_policy() re-reads disk."""
    global _PRESSURE_POLICY_CACHE
    _PRESSURE_POLICY_CACHE = None
def _builtin_pressure_defaults() -> Dict:
    """Fallback policy used when no architecture_pressure_policy.yml is found."""
    # Per-signal scoring weights (see _score_signals for how they combine).
    weights = {
        "recurrence_high_30d": 20,
        "recurrence_warn_30d": 10,
        "regressions_30d": 15,
        "escalations_30d": 12,
        "followups_created_30d": 8,
        "followups_overdue": 15,
        "drift_failures_30d": 10,
        "dependency_high_30d": 10,
    }
    # Rules driving automatic architecture-review follow-up creation.
    priority_rules = {
        "require_arch_review_at": 70,
        "auto_create_followup": True,
        "followup_priority": "P1",
        "followup_due_days": 14,
        "followup_owner": "cto",
    }
    return {
        "defaults": {"lookback_days": 30, "top_n": 10},
        "weights": weights,
        "bands": {"low_max": 20, "medium_max": 45, "high_max": 70},
        "priority_rules": priority_rules,
        "release_gate": {
            "platform_review_required": {"enabled": True, "warn_at": 60, "fail_at": 85}
        },
        "digest": {
            "output_dir": "ops/reports/platform",
            "max_chars": 12000,
            "top_n_in_digest": 10,
        },
    }
# ─── Band classifier ──────────────────────────────────────────────────────────
def classify_pressure_band(score: int, policy: Dict) -> str:
    """Map a numeric pressure score onto its policy-defined band name.

    Bands are inclusive upper thresholds; anything above high_max is critical.
    """
    band_cfg = policy.get("bands", {})
    thresholds = (
        (int(band_cfg.get("low_max", 20)), "low"),
        (int(band_cfg.get("medium_max", 45)), "medium"),
        (int(band_cfg.get("high_max", 70)), "high"),
    )
    for upper, name in thresholds:
        if score <= upper:
            return name
    return "critical"
# ─── Signal scoring helpers ───────────────────────────────────────────────────
def _score_signals(components: Dict, policy: Dict) -> int:
    """Additively score pressure components using policy weights.

    Boolean recurrence flags contribute their full weight when set. Count
    signals contribute the full weight for the first occurrence, then a
    diminishing increment (half the weight, at least 1) per extra occurrence.
    The result is clamped at zero.
    """
    weights = policy.get("weights", {})
    total = 0
    # Presence flags: weight applied once when truthy.
    for flag_key in ("recurrence_high_30d", "recurrence_warn_30d"):
        if components.get(flag_key, 0):
            total += int(weights.get(flag_key, 0))
    count_keys = (
        "regressions_30d", "escalations_30d", "followups_created_30d",
        "followups_overdue", "drift_failures_30d", "dependency_high_30d",
    )
    for key in count_keys:
        occurrences = int(components.get(key, 0))
        if not occurrences:
            continue
        weight = int(weights.get(key, 0))
        # Diminishing returns beyond the first occurrence.
        total += weight + (occurrences - 1) * max(1, weight // 2)
    return max(0, total)
def _signals_summary(components: Dict, policy: Dict) -> List[str]:
    """Render the non-zero pressure components as human-readable strings.

    Order is stable: recurrence flags first, then the count-based signals.
    `policy` is accepted for interface symmetry with the other helpers.
    """
    lines: List[str] = []
    if components.get("recurrence_high_30d"):
        lines.append("High-recurrence alert buckets in last 30d")
    if components.get("recurrence_warn_30d"):
        lines.append("Warn-level recurrence in last 30d")
    labeled_counts = (
        ("regressions_30d", "Risk regressions in 30d"),
        ("escalations_30d", "Escalations in 30d"),
        ("followups_created_30d", "Follow-ups created in 30d"),
        ("followups_overdue", "Overdue follow-ups"),
        ("drift_failures_30d", "Drift gate failures in 30d"),
        ("dependency_high_30d", "Dependency HIGH/CRITICAL findings in 30d"),
    )
    for key, label in labeled_counts:
        value = int(components.get(key, 0))
        if value:
            lines.append(f"{label}: {value}")
    return lines
# ─── Signal collection from stores ───────────────────────────────────────────
def fetch_pressure_signals(
    service: str,
    env: str,
    lookback_days: int = 30,
    *,
    incident_store=None,
    alert_store=None,
    risk_history_store=None,
    policy: Optional[Dict] = None,
) -> Dict:
    """
    Collect all signals needed for compute_pressure from existing stores.

    Every store access is best-effort: failures are logged and the affected
    components simply stay at zero, so one broken store never aborts scoring.
    The stores are duck-typed; they are expected to expose list_incidents /
    get_events, get_series, and top_signatures respectively (TODO: confirm
    against the concrete store implementations).

    Returns a components dict ready to pass to compute_pressure.

    Fixes vs. previous revision:
      - the regression scan compared consecutive snapshots once per in-window
        snapshot (nested loop), overcounting regressions quadratically; it is
        now a single pairwise pass over the time-ordered series;
      - removed the unused `cutoff_60m` and `inc_start` locals;
      - `today` is computed once instead of per event.
    """
    if policy is None:
        policy = load_pressure_policy()
    now = datetime.datetime.utcnow()
    # ISO-8601 strings compare correctly lexicographically.
    cutoff = (now - datetime.timedelta(days=lookback_days)).isoformat()
    today = now.strftime("%Y-%m-%d")
    components: Dict = {
        "recurrence_high_30d": 0,
        "recurrence_warn_30d": 0,
        "regressions_30d": 0,
        "escalations_30d": 0,
        "followups_created_30d": 0,
        "followups_overdue": 0,
        "drift_failures_30d": 0,
        "dependency_high_30d": 0,
    }
    # ── Escalations + followups from incident_store ───────────────────────────
    try:
        if incident_store is not None:
            incs = incident_store.list_incidents({"service": service}, limit=100)
            for inc in incs:
                inc_id = inc.get("id", "")
                try:
                    events = incident_store.get_events(inc_id, limit=200)
                    for ev in events:
                        if ev.get("ts", "") < cutoff:
                            continue
                        ev_type = ev.get("type", "")
                        msg = ev.get("message") or ""
                        # Escalation decisions ("Escalate"/"Escalated" in message).
                        if ev_type == "decision" and "Escalat" in msg:
                            components["escalations_30d"] += 1
                        # Follow-up creation (typed event or mentioned in message).
                        if ev_type in ("followup", "follow_up") or "followup" in msg.lower():
                            components["followups_created_30d"] += 1
                        # Overdue: still open and past its due date.
                        if ev_type == "followup":
                            due = ev.get("due_date", "")
                            if ev.get("status", "") == "open" and due and due < today:
                                components["followups_overdue"] += 1
                except Exception as e:
                    logger.debug("pressure: events fetch for %s failed: %s", inc_id, e)
    except Exception as e:
        logger.warning("pressure: incident_store fetch failed: %s", e)
    # ── Regressions from risk_history_store ───────────────────────────────────
    try:
        if risk_history_store is not None:
            series = risk_history_store.get_series(service, env, limit=90)
            # One pairwise pass over the time-ordered series: a regression is a
            # snapshot (inside the lookback window) whose score exceeds the
            # immediately preceding snapshot's score.
            ordered = sorted(series, key=lambda s: s.get("ts", ""))
            for prev, cur in zip(ordered, ordered[1:]):
                if (cur.get("ts", "") >= cutoff
                        and cur.get("score", 0) > prev.get("score", 0)):
                    components["regressions_30d"] += 1
    except Exception as e:
        logger.warning("pressure: risk_history_store fetch failed: %s", e)
    # ── Recurrence from alert_store top_signatures ───────────────────────────
    try:
        if alert_store is not None:
            # Approximate the 30-day window with a wide minute window.
            sigs = alert_store.top_signatures(
                window_minutes=lookback_days * 24 * 60, limit=30
            )
            # Simplified thresholds: >=6 occurrences is "high", >=3 is "warn".
            for sig in sigs:
                occ = int(sig.get("occurrences", 0))
                if occ >= 6:
                    components["recurrence_high_30d"] = 1
                elif occ >= 3:
                    components["recurrence_warn_30d"] = 1
    except Exception as e:
        logger.warning("pressure: alert_store recurrence fetch failed: %s", e)
    return components
# ─── Core engine ──────────────────────────────────────────────────────────────
def compute_pressure(
    service: str,
    env: str = "prod",
    *,
    components: Optional[Dict] = None,
    lookback_days: int = 30,
    policy: Optional[Dict] = None,
    # Optional stores for signal collection when components not pre-fetched
    incident_store=None,
    alert_store=None,
    risk_history_store=None,
) -> Dict:
    """
    Compute the Architecture Pressure score for one service.

    When `components` is supplied, no stores are touched; otherwise signals
    are collected from the given stores (each access is non-fatal).
    Returns a PressureReport dict.
    """
    if policy is None:
        policy = load_pressure_policy()
    # A falsy lookback (0/None) defers to the policy default.
    effective_days = lookback_days or int(
        policy.get("defaults", {}).get("lookback_days", 30)
    )
    if components is None:
        components = fetch_pressure_signals(
            service, env, effective_days,
            incident_store=incident_store,
            alert_store=alert_store,
            risk_history_store=risk_history_store,
            policy=policy,
        )
    else:
        # Copy so the caller's dict is never mutated by setdefault below.
        components = dict(components)
    # Guarantee every expected component key exists.
    for key in (
        "recurrence_high_30d", "recurrence_warn_30d", "regressions_30d",
        "escalations_30d", "followups_created_30d", "followups_overdue",
        "drift_failures_30d", "dependency_high_30d",
    ):
        components.setdefault(key, 0)
    score = _score_signals(components, policy)
    review_threshold = int(
        policy.get("priority_rules", {}).get("require_arch_review_at", 70)
    )
    return {
        "service": service,
        "env": env,
        "lookback_days": effective_days,
        "score": score,
        "band": classify_pressure_band(score, policy),
        "components": components,
        "signals_summary": _signals_summary(components, policy),
        "requires_arch_review": score >= review_threshold,
        "computed_at": datetime.datetime.utcnow().isoformat(),
    }
# ─── Dashboard ────────────────────────────────────────────────────────────────
def compute_pressure_dashboard(
    env: str = "prod",
    services: Optional[List[str]] = None,
    top_n: int = 10,
    *,
    policy: Optional[Dict] = None,
    incident_store=None,
    alert_store=None,
    risk_history_store=None,
    risk_reports: Optional[Dict[str, Dict]] = None,
) -> Dict:
    """
    Compute Architecture Pressure for multiple services and return a dashboard.

    `risk_reports` is an optional {service: RiskReport} dict used to enrich
    dashboard entries with current risk score/band for side-by-side comparison.
    Per-service failures are logged and skipped, never fatal.
    """
    if policy is None:
        policy = load_pressure_policy()
    effective_top_n = top_n or int(policy.get("defaults", {}).get("top_n", 10))
    if not services:
        # No explicit list — infer the service set from available stores.
        services = _list_services_from_stores(
            env=env, incident_store=incident_store, policy=policy
        )
    reports: List[Dict] = []
    for svc in services:
        try:
            entry = compute_pressure(
                svc, env,
                policy=policy,
                incident_store=incident_store,
                alert_store=alert_store,
                risk_history_store=risk_history_store,
            )
            if risk_reports and svc in risk_reports:
                rr = risk_reports[svc]
                entry["risk_score"] = rr.get("score")
                entry["risk_band"] = rr.get("band")
                entry["risk_delta_24h"] = (rr.get("trend") or {}).get("delta_24h")
            reports.append(entry)
        except Exception as e:
            logger.warning("pressure dashboard: compute_pressure failed for %s: %s", svc, e)
    # Highest pressure first (stable for ties).
    reports.sort(key=lambda r: r.get("score", 0), reverse=True)
    band_counts: Dict[str, int] = {"critical": 0, "high": 0, "medium": 0, "low": 0}
    for entry in reports:
        band = entry.get("band", "low")
        band_counts[band] = band_counts.get(band, 0) + 1
    return {
        "env": env,
        "computed_at": datetime.datetime.utcnow().isoformat(),
        "top_pressure_services": reports[:effective_top_n],
        "band_counts": band_counts,
        "critical_services": [r["service"] for r in reports if r.get("band") == "critical"],
        "high_services": [r["service"] for r in reports if r.get("band") in ("high", "critical")],
        "arch_review_required": [r["service"] for r in reports if r.get("requires_arch_review")],
        "total_services_evaluated": len(reports),
    }
def _list_services_from_stores(
    env: str,
    incident_store=None,
    policy: Optional[Dict] = None,
) -> List[str]:
    """Infer known service names, preferring the incident store over SLO policy.

    Returns a sorted list; empty when neither source yields anything.
    """
    found: set = set()
    try:
        if incident_store is not None:
            for inc in incident_store.list_incidents({}, limit=200):
                name = inc.get("service")
                if name:
                    found.add(name)
    except Exception as e:
        logger.warning("pressure: list_services from incident_store failed: %s", e)
    if not found:
        # Fallback: take the service keys declared in the SLO policy file.
        try:
            candidates = [
                Path("config/slo_policy.yml"),
                Path(__file__).resolve().parent.parent.parent / "config" / "slo_policy.yml",
            ]
            for path in candidates:
                if not path.exists():
                    continue
                import yaml as _yaml
                with open(path) as fh:
                    slo = _yaml.safe_load(fh) or {}
                found.update(slo.get("services", {}).keys())
                break
        except Exception:
            pass
    return sorted(found)
# ─── Auto followup creation ───────────────────────────────────────────────────
def maybe_create_arch_review_followup(
    pressure_report: Dict,
    *,
    incident_store=None,
    policy: Optional[Dict] = None,
    week_str: Optional[str] = None,
) -> Dict:
    """
    If pressure score >= require_arch_review_at and auto_create_followup=True,
    create an architecture-review follow-up on the latest open incident
    (or a synthetic architecture_review incident when none is open).

    Deduped by key: arch_review:{YYYY-WW}:{service} — at most one follow-up
    per service per ISO week.

    Returns: {"created": bool, "dedupe_key": str|None, "skipped_reason": str|None}
    plus incident_id/due_date/priority on success.

    Fix vs. previous revision: the dedupe scan ran only for incidents *before*
    the first open one (a `break` fired before the check), so an existing
    follow-up on the open incident itself was never detected and the weekly
    dedupe silently failed. The scan now covers all listed incidents before
    anything is created; the discarded "verify" get_events call was removed.
    """
    if policy is None:
        policy = load_pressure_policy()
    service = pressure_report.get("service", "")
    score = int(pressure_report.get("score", 0))
    rules = policy.get("priority_rules", {})
    review_at = int(rules.get("require_arch_review_at", 70))
    auto_create = bool(rules.get("auto_create_followup", True))
    # Guard clauses: nothing to do unless the threshold is crossed and
    # automatic creation is enabled with a usable store.
    if score < review_at:
        return {"created": False, "dedupe_key": None,
                "skipped_reason": f"score {score} < require_arch_review_at {review_at}"}
    if not auto_create:
        return {"created": False, "dedupe_key": None,
                "skipped_reason": "auto_create_followup disabled"}
    if incident_store is None:
        return {"created": False, "dedupe_key": None,
                "skipped_reason": "incident_store not available"}
    if week_str is None:
        week_str = datetime.datetime.utcnow().strftime("%Y-W%V")
    dedupe_key = f"arch_review:{week_str}:{service}"
    priority = rules.get("followup_priority", "P1")
    owner = rules.get("followup_owner", "cto")
    due_days = int(rules.get("followup_due_days", 14))
    due_date = (
        datetime.datetime.utcnow() + datetime.timedelta(days=due_days)
    ).strftime("%Y-%m-%d")
    try:
        incs = incident_store.list_incidents({"service": service}, limit=50)
        # Dedupe first: scan recent incidents' events for this week's key.
        for inc in incs:
            try:
                for ev in incident_store.get_events(inc.get("id", ""), limit=100):
                    if ev.get("dedupe_key") == dedupe_key:
                        return {"created": False, "dedupe_key": dedupe_key,
                                "skipped_reason": f"already exists: {dedupe_key}"}
            except Exception:
                # Per-incident event fetch is best-effort.
                pass
        # First open-ish incident (store order assumed most-recent-first —
        # TODO confirm against the incident_store implementation).
        open_inc = next(
            (inc for inc in incs
             if inc.get("status") in ("open", "triaged", "escalated")),
            None,
        )
        if open_inc is None:
            # No open incident — create a synthetic architecture_review incident
            open_inc = incident_store.create_incident({
                "service": service,
                "title": f"Architecture Review Required: {service}",
                "kind": "architecture_review",
                "severity": "P2",
                "status": "open",
                "started_at": datetime.datetime.utcnow().isoformat(),
                "source": "architecture_pressure_engine",
            })
        inc_id = open_inc.get("id", "")
        followup_event = {
            "type": "followup",
            "ts": datetime.datetime.utcnow().isoformat(),
            "message": (
                f"[Architecture Pressure] Score={score} >= {review_at}. "
                f"Schedule architecture review for '{service}'."
            ),
            "owner": owner,
            "priority": priority,
            "due_date": due_date,
            "status": "open",
            "dedupe_key": dedupe_key,
            "source": "architecture_pressure_engine",
        }
        if hasattr(incident_store, "add_event"):
            incident_store.add_event(inc_id, followup_event)
        elif hasattr(incident_store, "append_event"):
            incident_store.append_event(inc_id, followup_event)
        else:
            # Store exposes no event-append API: log the intent but still
            # report success so callers do not retry in a loop.
            logger.info(
                "pressure: would create followup for %s (inc=%s, key=%s)",
                service, inc_id, dedupe_key
            )
        return {"created": True, "dedupe_key": dedupe_key, "skipped_reason": None,
                "incident_id": inc_id, "due_date": due_date, "priority": priority}
    except Exception as e:
        logger.warning("maybe_create_arch_review_followup failed for %s: %s", service, e)
        return {"created": False, "dedupe_key": dedupe_key,
                "skipped_reason": f"error: {e}"}