Files
microdao-daarion/services/router/incident_intelligence.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

1150 lines
43 KiB
Python

"""
incident_intelligence.py — Incident Intelligence Layer (deterministic, no LLM).
Functions:
correlate_incident(incident_id, policy, store) -> related[]
detect_recurrence(window_days, policy, store) -> stats
weekly_digest(policy, store) -> {json, markdown}
Policy: config/incident_intelligence_policy.yml
"""
from __future__ import annotations
import datetime
import json
import logging
import os
import re
import textwrap
import yaml
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from incident_intel_utils import (
extract_kind,
incident_key_fields,
incidents_within_minutes,
mask_signature,
safe_truncate,
severity_rank,
format_duration,
parse_iso,
)
logger = logging.getLogger(__name__)
# ─── Policy ───────────────────────────────────────────────────────────────────
# Process-wide memoized policy dict; populated on first load_intel_policy() call.
_POLICY_CACHE: Optional[Dict] = None
# Candidate policy file locations, checked in order: CWD-relative path first,
# then a path resolved three levels up from this file's directory.
_POLICY_SEARCH_PATHS = [
    Path("config/incident_intelligence_policy.yml"),
    Path(__file__).resolve().parent.parent.parent / "config" / "incident_intelligence_policy.yml",
]
def load_intel_policy() -> Dict:
    """Load the incident-intelligence policy YAML, memoized for the process.

    Each search path is tried in order; an unreadable file is logged and
    skipped. When no file can be loaded, built-in defaults are cached and
    returned instead.
    """
    global _POLICY_CACHE
    if _POLICY_CACHE is not None:
        return _POLICY_CACHE
    for candidate in _POLICY_SEARCH_PATHS:
        if not candidate.exists():
            continue
        try:
            with open(candidate) as fh:
                loaded = yaml.safe_load(fh) or {}
        except Exception as e:
            logger.warning("Failed to load intel policy from %s: %s", candidate, e)
            continue
        _POLICY_CACHE = loaded
        return loaded
    logger.warning("incident_intelligence_policy.yml not found; using defaults")
    _POLICY_CACHE = _builtin_defaults()
    return _POLICY_CACHE
def _builtin_defaults() -> Dict:
return {
"correlation": {
"lookback_days": 30,
"max_related": 10,
"min_score": 20,
"rules": [
{"name": "same_signature", "weight": 100, "match": {"signature": True}},
{"name": "same_service_and_kind", "weight": 60,
"match": {"same_service": True, "same_kind": True}},
{"name": "same_service_time_cluster", "weight": 40,
"match": {"same_service": True, "within_minutes": 180}},
{"name": "same_kind_cross_service", "weight": 30,
"match": {"same_kind": True, "within_minutes": 120}},
],
},
"recurrence": {
"windows_days": [7, 30],
"thresholds": {
"signature": {"warn": 3, "high": 6},
"kind": {"warn": 5, "high": 10},
},
"top_n": 15,
"recommendations": {
"signature_high": "Create permanent fix: add regression test + SLO guard",
"signature_warn": "Review root cause history; consider monitoring threshold",
"kind_high": "Systemic issue with kind={kind}: review architecture",
"kind_warn": "Recurring kind={kind}: validate alert thresholds",
},
},
"digest": {
"weekly_day": "Mon",
"include_closed": True,
"include_open": True,
"output_dir": "ops/reports/incidents",
"markdown_max_chars": 8000,
"top_incidents": 20,
},
}
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _now_iso() -> str:
return datetime.datetime.utcnow().isoformat()
def _lookback_cutoff(days: int) -> str:
return (datetime.datetime.utcnow() - datetime.timedelta(days=days)).isoformat()
def _incidents_in_window(store, days: int, limit: int = 1000) -> List[Dict]:
    """Load all incidents (open+closed) in last N days.

    Queries each status class separately, filters by started_at against the
    lookback cutoff (ISO strings compare chronologically), and de-duplicates
    by id while preserving first-seen order.
    """
    cutoff = _lookback_cutoff(days)
    collected: List[Dict] = []
    for status in ("open", "mitigating", "closed", "resolved"):
        for inc in store.list_incidents({"status": status}, limit=limit):
            if inc.get("started_at", "") >= cutoff:
                collected.append(inc)
    deduped: List[Dict] = []
    seen_ids: set = set()
    for inc in collected:
        if inc["id"] in seen_ids:
            continue
        seen_ids.add(inc["id"])
        deduped.append(inc)
    return deduped
# ─── 1. Correlation ───────────────────────────────────────────────────────────
def correlate_incident(
    incident_id: str,
    policy: Optional[Dict] = None,
    store = None,
    append_note: bool = False,
) -> List[Dict]:
    """
    Score candidates against *incident_id* and return the related incidents,
    sorted by score descending (ties broken by started_at ascending).
    Optionally appends a timeline note listing the top matches.
    """
    if policy is None:
        policy = load_intel_policy()
    if store is None:
        from incident_store import get_incident_store
        store = get_incident_store()

    cfg = policy.get("correlation", {})
    lookback = int(cfg.get("lookback_days", 30))
    cap = int(cfg.get("max_related", 10))
    floor = int(cfg.get("min_score", 20))
    rules = cfg.get("rules", [])

    raw_target = store.get_incident(incident_id)
    if not raw_target:
        return []
    target_fields = incident_key_fields(raw_target)

    hits: List[Dict] = []
    for raw_cand in _incidents_in_window(store, lookback, limit=500):
        other_id = raw_cand.get("id", "")
        if other_id == incident_id:
            continue  # never correlate an incident with itself
        cand_fields = incident_key_fields(raw_cand)
        pts, why = _score_pair(target_fields, cand_fields, raw_target, raw_cand, rules)
        if pts < floor:
            continue
        hits.append({
            "incident_id": other_id,
            "score": pts,
            "reasons": why,
            "service": cand_fields["service"],
            "kind": cand_fields["kind"],
            "severity": cand_fields["severity"],
            "status": cand_fields["status"],
            "started_at": cand_fields["started_at"],
            "signature": mask_signature(cand_fields["signature"]),
        })

    hits.sort(key=lambda h: (-h["score"], h["started_at"]))
    related = hits[:cap]

    # Optionally record the top matches on the incident timeline (best-effort).
    if append_note and related:
        note = "Related incidents: " + "; ".join(
            f"`{h['incident_id']}` score={h['score']} ({', '.join(h['reasons'])})"
            for h in related[:5]
        )
        try:
            store.append_event(incident_id, "note", safe_truncate(note, 2000),
                               meta={"auto_created": True, "related_count": len(related)})
        except Exception as e:
            logger.warning("correlate_incident: append_note failed: %s", e)
    return related
def _score_pair(
target: Dict, cand: Dict,
target_raw: Dict, cand_raw: Dict,
rules: List[Dict],
) -> Tuple[int, List[str]]:
"""Compute correlation score and matching reasons for a candidate pair."""
score = 0
reasons: List[str] = []
t_sig = target.get("signature", "")
c_sig = cand.get("signature", "")
t_svc = target.get("service", "")
c_svc = cand.get("service", "")
t_kind = target.get("kind", "")
c_kind = cand.get("kind", "")
t_start = target.get("started_at", "")
c_start = cand.get("started_at", "")
for rule in rules:
m = rule.get("match", {})
w = int(rule.get("weight", 0))
name = rule.get("name", "rule")
# Signature-only rule: only fires when signatures are equal; skip otherwise.
if "signature" in m:
if t_sig and c_sig and t_sig == c_sig:
score += w
reasons.append(name)
continue # never fall through to combined-conditions for this rule
# Combined conditions (same_service / same_kind / within_minutes)
within = m.get("within_minutes")
matched = True
if m.get("same_service"):
if t_svc != c_svc:
matched = False
if m.get("same_kind"):
if t_kind != c_kind or not t_kind or t_kind == "custom":
matched = False
if within is not None:
if not incidents_within_minutes(target_raw, cand_raw, within):
matched = False
if matched:
score += w
reasons.append(name)
return score, reasons
# ─── 2. Recurrence Detection ──────────────────────────────────────────────────
def detect_recurrence(
    window_days: int = 7,
    policy: Optional[Dict] = None,
    store = None,
) -> Dict:
    """
    Analyze incident frequency for given window.
    Returns frequency tables and threshold classifications.

    Builds per-signature / per-kind / per-service / per-severity counts over
    all incidents started within the last *window_days* days, then classifies
    the top signatures and kinds into warn/high tiers using policy thresholds.
    NOTE: warn/high classification is derived from the top_n-truncated tables,
    so entries beyond top_n are never classified.
    """
    if policy is None:
        policy = load_intel_policy()
    if store is None:
        # Lazy import: avoids a hard dependency when a store is injected.
        from incident_store import get_incident_store
        store = get_incident_store()
    rec_cfg = policy.get("recurrence", {})
    thresholds = rec_cfg.get("thresholds", {})
    sig_thresh = thresholds.get("signature", {"warn": 3, "high": 6})
    kind_thresh = thresholds.get("kind", {"warn": 5, "high": 10})
    top_n = int(rec_cfg.get("top_n", 15))
    incidents = _incidents_in_window(store, window_days)
    # Frequency tables
    sig_count: Dict[str, Dict] = {}   # signature → {count, services, last_seen, severity_min}
    kind_count: Dict[str, Dict] = {}  # kind → {count, services}
    svc_count: Dict[str, int] = defaultdict(int)
    sev_count: Dict[str, int] = defaultdict(int)
    open_count = 0
    closed_count = 0
    for inc in incidents:
        fields = incident_key_fields(inc)
        sig = fields["signature"]
        kind = fields["kind"]
        svc = fields["service"]
        sev = fields["severity"]
        status = fields["status"]
        started_at = fields["started_at"]
        svc_count[svc] += 1
        sev_count[sev] += 1
        # Anything not open/mitigating counts as closed (closed, resolved, …).
        if status in ("open", "mitigating"):
            open_count += 1
        else:
            closed_count += 1
        if sig:
            if sig not in sig_count:
                sig_count[sig] = {"count": 0, "services": set(), "last_seen": "",
                                  "severity_min": sev}
            sig_count[sig]["count"] += 1
            sig_count[sig]["services"].add(svc)
            # ISO-8601 strings compare lexicographically == chronologically.
            if started_at > sig_count[sig]["last_seen"]:
                sig_count[sig]["last_seen"] = started_at
            # Lower severity_rank == more severe, so "min" tracks the worst seen.
            if severity_rank(sev) < severity_rank(sig_count[sig]["severity_min"]):
                sig_count[sig]["severity_min"] = sev
        # "custom" is a catch-all kind; excluding it avoids noisy clusters.
        if kind and kind != "custom":
            if kind not in kind_count:
                kind_count[kind] = {"count": 0, "services": set()}
            kind_count[kind]["count"] += 1
            kind_count[kind]["services"].add(svc)
    # Serialize sets (sorted for deterministic output) and rank by count desc.
    top_sigs = sorted(
        [{"signature": k, "count": v["count"],
          "services": sorted(v["services"]),
          "last_seen": v["last_seen"],
          "severity_min": v["severity_min"]}
         for k, v in sig_count.items()],
        key=lambda x: -x["count"],
    )[:top_n]
    top_kinds = sorted(
        [{"kind": k, "count": v["count"], "services": sorted(v["services"])}
         for k, v in kind_count.items()],
        key=lambda x: -x["count"],
    )[:top_n]
    top_services = sorted(
        [{"service": k, "count": v} for k, v in svc_count.items()],
        key=lambda x: -x["count"],
    )[:top_n]
    # Classify high recurrence: high is >= high threshold; warn is [warn, high).
    high_sigs = [s for s in top_sigs if s["count"] >= sig_thresh.get("high", 6)]
    warn_sigs = [s for s in top_sigs
                 if sig_thresh.get("warn", 3) <= s["count"] < sig_thresh.get("high", 6)]
    high_kinds = [k for k in top_kinds if k["count"] >= kind_thresh.get("high", 10)]
    warn_kinds = [k for k in top_kinds
                  if kind_thresh.get("warn", 5) <= k["count"] < kind_thresh.get("high", 10)]
    return {
        "window_days": window_days,
        "total_incidents": len(incidents),
        "open_count": open_count,
        "closed_count": closed_count,
        "severity_distribution": dict(sev_count),
        "top_signatures": top_sigs,
        "top_kinds": top_kinds,
        "top_services": top_services,
        "high_recurrence": {
            "signatures": high_sigs,
            "kinds": high_kinds,
        },
        "warn_recurrence": {
            "signatures": warn_sigs,
            "kinds": warn_kinds,
        },
    }
# ─── 3. Weekly Digest ─────────────────────────────────────────────────────────
def weekly_digest(
    policy: Optional[Dict] = None,
    store = None,
    save_artifacts: bool = True,
) -> Dict:
    """
    Generate weekly incident digest: markdown + JSON.

    Collects 7d/30d recurrence stats, open incidents, root-cause buckets and
    (best-effort) auto follow-ups, then renders a markdown report capped at
    the policy's markdown_max_chars. Saves to output_dir if
    save_artifacts=True.

    Returns {markdown, json_data, artifact_paths, week}.
    """
    if policy is None:
        policy = load_intel_policy()
    if store is None:
        from incident_store import get_incident_store
        store = get_incident_store()
    digest_cfg = policy.get("digest", {})
    max_chars = int(digest_cfg.get("markdown_max_chars", 8000))
    top_n_inc = int(digest_cfg.get("top_incidents", 20))
    output_dir = digest_cfg.get("output_dir", "ops/reports/incidents")
    now = datetime.datetime.utcnow()
    week_str = now.strftime("%Y-W%W")
    ts_str = now.strftime("%Y-%m-%d %H:%M UTC")
    # ── Collect data ──────────────────────────────────────────────────────────
    rec_7d = detect_recurrence(window_days=7, policy=policy, store=store)
    rec_30d = detect_recurrence(window_days=30, policy=policy, store=store)
    # Open + mitigating incidents, most severe first (lower rank == worse).
    open_incs = store.list_incidents({"status": "open"}, limit=100)
    mitigating = store.list_incidents({"status": "mitigating"}, limit=50)
    all_open = open_incs + mitigating
    all_open.sort(key=lambda i: severity_rank(i.get("severity", "P3")))
    # Last 7d incidents (sorted by severity then started_at)
    recent = _incidents_in_window(store, 7, limit=top_n_inc * 2)
    recent.sort(key=lambda i: (severity_rank(i.get("severity", "P3")), i.get("started_at", "")))
    recent = recent[:top_n_inc]
    # ── Root-cause buckets ────────────────────────────────────────────────────
    all_30d = _incidents_in_window(store, 30, limit=1000)
    buckets = build_root_cause_buckets(all_30d, policy=policy, windows=[7, 30])
    sig_high_thresh = int(policy.get("recurrence", {}).get("thresholds", {})
                          .get("signature", {}).get("high", 6))
    # A bucket is "high" when its 7d count crosses the signature-high threshold
    # or any of its kinds is already flagged high-recurrence for the week.
    high_buckets = [
        b for b in buckets
        if b["counts"]["7d"] >= sig_high_thresh
        or any(k in {ki["kind"] for ki in rec_7d.get("high_recurrence", {}).get("kinds", [])}
               for k in b.get("kinds", []))
    ]
    # ── Auto follow-ups (best-effort; digest generation must not fail) ────────
    autofollowup_result: Dict = {"created": [], "skipped": []}
    if policy.get("autofollowups", {}).get("enabled", True):
        try:
            autofollowup_result = create_autofollowups(
                buckets=buckets,
                rec_7d=rec_7d,
                policy=policy,
                store=store,
                week_str=week_str,
            )
        except Exception as e:
            logger.warning("weekly_digest: autofollowups error (non-fatal): %s", e)
    # ── Build recommendations ─────────────────────────────────────────────────
    recs = _build_recommendations(rec_7d, rec_30d, policy)
    # ── Build JSON payload ────────────────────────────────────────────────────
    json_data = {
        "generated_at": _now_iso(),
        "week": week_str,
        "open_incidents_count": len(all_open),
        "recent_7d_count": rec_7d["total_incidents"],
        "recent_30d_count": rec_30d["total_incidents"],
        "recurrence_7d": rec_7d,
        "recurrence_30d": rec_30d,
        "open_incidents": [_inc_summary(i) for i in all_open[:20]],
        "recent_incidents": [_inc_summary(i) for i in recent],
        "recommendations": recs,
        "buckets": {
            "top": buckets,
            "high": high_buckets,
        },
        "autofollowups": autofollowup_result,
    }
    # ── Build Markdown ────────────────────────────────────────────────────────
    md = _build_markdown(
        week_str=week_str,
        ts_str=ts_str,
        all_open=all_open,
        recent=recent,
        rec_7d=rec_7d,
        rec_30d=rec_30d,
        recs=recs,
        buckets=buckets,
        autofollowup_result=autofollowup_result,
    )
    if len(md) > max_chars:
        md = md[:max_chars - 80] + f"\n\n… *(digest truncated at {max_chars} chars)*"
    # ── Save artifacts ────────────────────────────────────────────────────────
    artifact_paths: List[str] = []
    if save_artifacts:
        artifact_paths = _save_digest_artifacts(output_dir, week_str, json_data, md)
    return {
        "markdown": md,
        "json_data": json_data,
        "artifact_paths": artifact_paths,
        "week": week_str,
    }
def _inc_summary(inc: Dict) -> Dict:
    """Compact, digest-friendly projection of one raw incident record."""
    meta = inc.get("meta") or {}
    started = inc.get("started_at", "")
    return {
        "id": inc.get("id", ""),
        "service": inc.get("service", ""),
        "env": inc.get("env", "prod"),
        "severity": inc.get("severity", "P2"),
        "status": inc.get("status", ""),
        "kind": extract_kind(inc),
        "title": safe_truncate(inc.get("title", ""), 120),
        "started_at": started,
        "duration": format_duration(started, inc.get("ended_at")),
        "signature": mask_signature(meta.get("incident_signature", "")),
    }
def _build_recommendations(rec_7d: Dict, rec_30d: Dict, policy: Dict) -> List[Dict]:
recs_cfg = policy.get("recurrence", {}).get("recommendations", {})
recs: List[Dict] = []
for sig_item in rec_7d.get("high_recurrence", {}).get("signatures", []):
sig_short = mask_signature(sig_item["signature"])
msg_tmpl = recs_cfg.get("signature_high",
"Create permanent fix for signature {sig}")
recs.append({
"level": "high",
"category": "signature",
"target": sig_short,
"services": sig_item.get("services", []),
"count_7d": sig_item.get("count", 0),
"message": msg_tmpl.format(sig=sig_short, **sig_item),
})
for sig_item in rec_7d.get("warn_recurrence", {}).get("signatures", []):
sig_short = mask_signature(sig_item["signature"])
msg_tmpl = recs_cfg.get("signature_warn",
"Review root cause for signature {sig}")
recs.append({
"level": "warn",
"category": "signature",
"target": sig_short,
"services": sig_item.get("services", []),
"count_7d": sig_item.get("count", 0),
"message": msg_tmpl.format(sig=sig_short, **sig_item),
})
for kind_item in rec_7d.get("high_recurrence", {}).get("kinds", []):
kind = kind_item.get("kind", "?")
msg_tmpl = recs_cfg.get("kind_high", "Systemic issue with {kind}")
recs.append({
"level": "high",
"category": "kind",
"target": kind,
"services": kind_item.get("services", []),
"count_7d": kind_item.get("count", 0),
"message": msg_tmpl.format(kind=kind),
})
for kind_item in rec_7d.get("warn_recurrence", {}).get("kinds", []):
kind = kind_item.get("kind", "?")
msg_tmpl = recs_cfg.get("kind_warn", "Recurring {kind}")
recs.append({
"level": "warn",
"category": "kind",
"target": kind,
"services": kind_item.get("services", []),
"count_7d": kind_item.get("count", 0),
"message": msg_tmpl.format(kind=kind),
})
return recs
def _build_markdown(
week_str: str, ts_str: str,
all_open: List[Dict], recent: List[Dict],
rec_7d: Dict, rec_30d: Dict,
recs: List[Dict],
buckets: Optional[List[Dict]] = None,
autofollowup_result: Optional[Dict] = None,
) -> str:
lines = [
f"# Weekly Incident Digest — {week_str}",
f"*Generated: {ts_str}*",
"",
"---",
"",
f"## Summary",
f"| Metric | Value |",
f"|--------|-------|",
f"| Open incidents | {len(all_open)} |",
f"| Incidents (7d) | {rec_7d['total_incidents']} |",
f"| Incidents (30d) | {rec_30d['total_incidents']} |",
"",
]
# ── Open incidents ─────────────────────────────────────────────────────────
if all_open:
lines.append("## 🔴 Open Incidents")
for inc in all_open[:10]:
sev = inc.get("severity", "?")
svc = inc.get("service", "?")
title = safe_truncate(inc.get("title", ""), 80)
dur = format_duration(inc.get("started_at", ""), inc.get("ended_at"))
kind = extract_kind(inc)
lines.append(f"- **[{sev}]** `{inc.get('id','?')}` {svc}{title} *(kind: {kind}, {dur})*")
if len(all_open) > 10:
lines.append(f"- … and {len(all_open) - 10} more open incidents")
lines.append("")
# ── Recent 7d ─────────────────────────────────────────────────────────────
if recent:
lines.append("## 📋 Recent Incidents (7 days)")
for inc in recent[:15]:
sev = inc.get("severity", "?")
status = inc.get("status", "?")
svc = inc.get("service", "?")
title = safe_truncate(inc.get("title", ""), 80)
dur = format_duration(inc.get("started_at", ""), inc.get("ended_at"))
kind = extract_kind(inc)
lines.append(f"- **[{sev}/{status}]** {svc}{title} *(kind: {kind}, {dur})*")
lines.append("")
# ── Recurrence: 7d ────────────────────────────────────────────────────────
lines.append("## 🔁 Recurrence (7 days)")
if rec_7d["top_signatures"]:
lines.append("### Top Signatures")
for item in rec_7d["top_signatures"][:8]:
sig_s = mask_signature(item["signature"])
svcs = ", ".join(item.get("services", [])[:3])
lines.append(f"- `{sig_s}` — {item['count']}x ({svcs})")
if rec_7d["top_kinds"]:
lines.append("### Top Kinds")
for item in rec_7d["top_kinds"][:8]:
svcs = ", ".join(item.get("services", [])[:3])
lines.append(f"- `{item['kind']}` — {item['count']}x ({svcs})")
lines.append("")
# ── Recurrence: 30d ───────────────────────────────────────────────────────
if rec_30d["total_incidents"] > rec_7d["total_incidents"]:
lines.append("## 📈 Recurrence (30 days)")
if rec_30d["top_signatures"][:5]:
for item in rec_30d["top_signatures"][:5]:
sig_s = mask_signature(item["signature"])
lines.append(f"- `{sig_s}` — {item['count']}x")
lines.append("")
# ── Root-Cause Buckets ────────────────────────────────────────────────────
if buckets:
lines.append("## 🪣 Top Root-Cause Buckets (7d/30d)")
for b in buckets[:8]:
bkey = b["bucket_key"]
c7 = b["counts"]["7d"]
c30 = b["counts"]["30d"]
c_open = b["counts"]["open"]
svcs = ", ".join(b.get("services", [])[:3])
last = b.get("last_seen", "")[:10]
lines.append(f"### `{bkey}`")
lines.append(f"*7d: {c7} incidents | 30d: {c30} | open: {c_open} | last: {last} | svcs: {svcs}*")
recs_b = b.get("recommendations", [])[:3]
for rec in recs_b:
lines.append(f" - {rec}")
lines.append("")
# ── Auto Follow-ups summary ────────────────────────────────────────────────
if autofollowup_result:
created = autofollowup_result.get("created", [])
skipped = autofollowup_result.get("skipped", [])
if created:
lines.append("## 🔗 Auto Follow-ups Created")
for fu in created[:10]:
lines.append(
f"- `{fu['bucket_key']}` → incident `{fu['incident_id']}` "
f"({fu['priority']}, due {fu['due_date']})"
)
lines.append("")
elif skipped:
lines.append(f"*Auto follow-ups: {len(skipped)} skipped (no high recurrence or already exists)*\n")
# ── Recommendations ───────────────────────────────────────────────────────
if recs:
lines.append("## 💡 Recommendations")
for r in recs[:10]:
icon = "🔴" if r["level"] == "high" else "🟡"
svcs = ", ".join(r.get("services", [])[:3])
lines.append(f"{icon} **[{r['category']}:{r['target']}]** {r['message']} *(count_7d={r['count_7d']}, svcs: {svcs})*")
lines.append("")
return "\n".join(lines)
def _save_digest_artifacts(
output_dir: str, week_str: str,
json_data: Dict, md: str,
) -> List[str]:
"""Atomic write of digest artifacts. Returns list of written paths."""
paths: List[str] = []
try:
out = Path(output_dir) / "weekly"
out.mkdir(parents=True, exist_ok=True)
json_path = out / f"{week_str}.json"
md_path = out / f"{week_str}.md"
# Atomic write via temp file
import tempfile
for content, dest in [(json.dumps(json_data, indent=2, default=str), json_path),
(md, md_path)]:
tmp_fd, tmp_path = tempfile.mkstemp(dir=out, suffix=".tmp")
try:
with os.fdopen(tmp_fd, "w") as f:
f.write(content)
os.replace(tmp_path, dest)
paths.append(str(dest))
except Exception:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
except Exception as e:
logger.error("Failed to save digest artifacts: %s", e)
return paths
# ─── Root-Cause Buckets ───────────────────────────────────────────────────────
# Canned remediation suggestions per incident kind, used by
# bucket_recommendations(). Kinds in the same root-cause family
# (error_rate/slo_breach, oom/crashloop) intentionally share entries.
_KIND_RECS: Dict[str, List[str]] = {
    "error_rate": [
        "Add regression test for API contract & error mapping",
        "Review recent deploy diffs and dependency changes",
        "Add/adjust SLO thresholds & alert routing",
    ],
    "slo_breach": [
        "Add regression test for API contract & error mapping",
        "Review recent deploy diffs and dependency changes",
        "Add/adjust SLO thresholds & alert routing",
    ],
    "latency": [
        "Check p95 vs saturation; add perf budget",
        "Investigate DB/queue contention",
    ],
    "oom": [
        "Add memory profiling; set container limits; audit for leaks",
    ],
    "crashloop": [
        "Add memory profiling; set container limits; audit for leaks",
        "Check liveness/readiness probe configuration",
    ],
    "disk": [
        "Add retention/cleanup automation; verify volume provisioning",
    ],
    "security": [
        "Run dependency scanner + rotate secrets; verify gateway allowlists",
    ],
    "queue": [
        "Check consumer lag and partition count; add dead-letter queue",
    ],
    "network": [
        "Audit DNS configuration; verify network policies and ACLs",
    ],
}
# Fallback suggestions when none of a bucket's kinds has an entry above.
_DEFAULT_KIND_RECS = [
    "Review incident timeline and add regression test",
    "Check deployment diffs for correlated changes",
]
def bucket_recommendations(bucket: Dict) -> List[str]:
    """Deterministic per-bucket recommendations based on kind + open_count.

    Kinds are iterated in sorted order so the output really is deterministic:
    bucket["kinds"] is typically a set, and raw set iteration order varies
    with the interpreter's hash seed. Output is capped at 5 entries.
    """
    kinds = bucket.get("kinds", set())
    recs: List[str] = []
    seen: set = set()
    # sorted() gives a stable order regardless of set/list input.
    for kind in sorted(kinds):
        for r in _KIND_RECS.get(kind, []):
            if r not in seen:
                recs.append(r)
                seen.add(r)
    if not recs:
        recs = list(_DEFAULT_KIND_RECS)
    # Actionable warning if there are open incidents
    if bucket.get("counts", {}).get("open", 0) > 0:
        recs.append("⚠ Do not deploy risky changes until open incidents are mitigated")
    return recs[:5]
def build_root_cause_buckets(
    incidents: List[Dict],
    policy: Optional[Dict] = None,
    windows: Optional[List[int]] = None,
) -> List[Dict]:
    """
    Cluster incidents into root-cause buckets by service|kind or signature_prefix.

    Args:
        incidents: raw incident dicts (normally the last 30 days).
        policy: intel policy; loaded lazily when None. Reads the "buckets"
            section (mode, signature_prefix_len, top_n, min_count).
        windows: day-window sizes to count per bucket (default [7, 30]).

    Returns top_n buckets sorted by (count_7d desc, count_30d desc, last_seen
    desc). Only buckets meeting min_count thresholds are returned.
    """
    if policy is None:
        policy = load_intel_policy()
    if windows is None:
        windows = [7, 30]
    buck_cfg = policy.get("buckets", {})
    mode = buck_cfg.get("mode", "service_kind")
    prefix_len = int(buck_cfg.get("signature_prefix_len", 12))
    top_n = int(buck_cfg.get("top_n", 10))
    min_count = buck_cfg.get("min_count", {"7": 3, "30": 6})
    # min_count keys may arrive as ints (YAML) or strings; accept both.
    min_7 = int(min_count.get(7, min_count.get("7", 3)))
    min_30 = int(min_count.get(30, min_count.get("30", 6)))
    now = datetime.datetime.utcnow()
    cutoffs: Dict[int, str] = {
        w: (now - datetime.timedelta(days=w)).isoformat() for w in windows
    }
    # Build bucket map
    buckets: Dict[str, Dict] = {}
    for inc in incidents:
        fields = incident_key_fields(inc)
        sig = fields["signature"]
        kind = fields["kind"]
        svc = fields["service"]
        started = fields["started_at"]
        status = fields["status"]
        sev = fields["severity"]
        if mode == "signature_prefix" and sig:
            bkey = sig[:prefix_len]
        else:
            # Default clustering: one bucket per (service, kind) pair.
            bkey = f"{svc}|{kind}"
        if bkey not in buckets:
            buckets[bkey] = {
                "bucket_key": bkey,
                "counts": {"open": 0},
                "last_seen": "",
                "first_seen": started,
                "services": set(),
                "kinds": set(),
                "sig_counts": defaultdict(int),
                "sev_mix": defaultdict(int),
                "sample_incidents": [],
            }
        b = buckets[bkey]
        b["services"].add(svc)
        b["kinds"].add(kind)
        if sig:
            b["sig_counts"][sig] += 1
        # ISO-8601 strings compare lexicographically == chronologically.
        if started > b["last_seen"]:
            b["last_seen"] = started
        if started < b["first_seen"] or not b["first_seen"]:
            b["first_seen"] = started
        b["sev_mix"][sev] += 1
        if status in ("open", "mitigating"):
            b["counts"]["open"] += 1
        # Count by window
        for w, cutoff in cutoffs.items():
            key_w = f"{w}d"
            if started >= cutoff:
                b["counts"][key_w] = b["counts"].get(key_w, 0) + 1
        # Keep up to 5 sample incidents
        if len(b["sample_incidents"]) < 5:
            b["sample_incidents"].append({
                "id": inc.get("id", ""),
                "started_at": started,
                "status": status,
                "title": safe_truncate(inc.get("title", ""), 80),
            })
    # Serialize and filter
    result = []
    for bkey, b in buckets.items():
        count_7d = b["counts"].get("7d", 0)
        count_30d = b["counts"].get("30d", 0)
        if count_7d < min_7 and count_30d < min_30:
            continue
        top_sigs = sorted(
            [{"signature": mask_signature(s), "count": c}
             for s, c in b["sig_counts"].items()],
            key=lambda x: -x["count"],
        )[:5]
        recs_data = bucket_recommendations({
            "kinds": b["kinds"],
            "counts": b["counts"],
        })
        result.append({
            "bucket_key": bkey,
            "counts": {
                "7d": count_7d,
                "30d": count_30d,
                "open": b["counts"]["open"],
            },
            "last_seen": b["last_seen"],
            "first_seen": b["first_seen"],
            "services": sorted(b["services"]),
            "kinds": sorted(b["kinds"]),
            "top_signatures": top_sigs,
            "severity_mix": dict(b["sev_mix"]),
            "sample_incidents": sorted(
                b["sample_incidents"], key=lambda x: x["started_at"], reverse=True
            )[:5],
            "recommendations": recs_data,
        })
    # Sort: count_7d desc, count_30d desc, last_seen desc. Two stable passes
    # are needed because last_seen is a string and cannot be negated; the old
    # code's first pass left ties ordered oldest-first, contradicting the
    # documented "last_seen desc" order.
    result.sort(key=lambda x: x["last_seen"], reverse=True)
    result.sort(key=lambda x: (-x["counts"]["7d"], -x["counts"]["30d"]))
    return result[:top_n]
# ─── Auto Follow-ups ─────────────────────────────────────────────────────────
def _followup_dedupe_key(policy: Dict, week_str: str, bucket_key: str) -> str:
prefix = policy.get("autofollowups", {}).get("dedupe_key_prefix", "intel_recur")
return f"{prefix}:{week_str}:{bucket_key}"
def create_autofollowups(
    buckets: List[Dict],
    rec_7d: Dict,
    policy: Optional[Dict] = None,
    store = None,
    week_str: Optional[str] = None,
) -> Dict:
    """
    For each high-recurrence bucket, append a follow-up event to the most recent
    open incident in that bucket (deterministic dedupe by week+bucket_key).

    Args:
        buckets: output of build_root_cause_buckets().
        rec_7d: output of detect_recurrence(window_days=7).
        policy / store / week_str: resolved lazily when omitted.

    Returns {created: [...], skipped: [...]} where each entry carries the
    bucket key plus either the follow-up metadata or a skip reason.
    """
    if policy is None:
        policy = load_intel_policy()
    if store is None:
        from incident_store import get_incident_store
        store = get_incident_store()
    if week_str is None:
        week_str = datetime.datetime.utcnow().strftime("%Y-W%W")
    af_cfg = policy.get("autofollowups", {})
    if not af_cfg.get("enabled", True):
        return {"created": [], "skipped": [{"reason": "disabled"}]}
    only_when_high = bool(af_cfg.get("only_when_high", True))
    owner = af_cfg.get("owner", "oncall")
    priority = af_cfg.get("priority", "P1")
    due_days = int(af_cfg.get("due_days", 7))
    # The signature-high threshold doubles as a generic per-bucket 7d-count bar.
    rec_cfg = policy.get("recurrence", {})
    sig_high_thresh = int(rec_cfg.get("thresholds", {}).get("signature", {}).get("high", 6))
    # Signatures/kinds already flagged "high" for the week by detect_recurrence.
    high_sigs = {s["signature"] for s in rec_7d.get("high_recurrence", {}).get("signatures", [])}
    high_kinds = {k["kind"] for k in rec_7d.get("high_recurrence", {}).get("kinds", [])}
    created: List[Dict] = []
    skipped: List[Dict] = []
    due_date = (datetime.datetime.utcnow() + datetime.timedelta(days=due_days)).isoformat()[:10]
    for bucket in buckets:
        bkey = bucket["bucket_key"]
        count_7d = bucket["counts"]["7d"]
        # A bucket is "high" when any of its top signatures or kinds is flagged
        # high-recurrence, or its own 7d count crosses the threshold.
        is_high = False
        for ts in bucket.get("top_signatures", []):
            if ts.get("signature", "") in high_sigs:
                is_high = True
                break
        if not is_high:
            for kind in bucket.get("kinds", []):
                if kind in high_kinds:
                    is_high = True
                    break
        if not is_high and count_7d >= sig_high_thresh:
            is_high = True
        if only_when_high and not is_high:
            skipped.append({"bucket_key": bkey, "reason": "not_high", "count_7d": count_7d})
            continue
        dedupe_key = _followup_dedupe_key(policy, week_str, bkey)
        # Anchor the follow-up on the bucket's most recent sample incident.
        samples = sorted(
            bucket.get("sample_incidents", []),
            key=lambda x: x.get("started_at", ""),
            reverse=True,
        )
        if not samples:
            skipped.append({"bucket_key": bkey, "reason": "no_incidents"})
            continue
        anchor_id = samples[0]["id"]
        if not anchor_id:
            skipped.append({"bucket_key": bkey, "reason": "no_anchor_id"})
            continue
        try:
            # Dedupe: skip when the anchor already carries a follow-up for this
            # week+bucket. for/else — the else runs only when no break fired.
            existing_events = store.get_events(anchor_id, limit=100)
            for ev in existing_events:
                ev_meta = ev.get("meta") or {}
                if ev_meta.get("dedupe_key") == dedupe_key:
                    skipped.append({
                        "bucket_key": bkey, "reason": "already_exists",
                        "incident_id": anchor_id, "dedupe_key": dedupe_key,
                    })
                    break
            else:
                # Create follow-up event
                msg = (
                    f"[intel] Recurrence high: {bkey} "
                    f"(7d={count_7d}, 30d={bucket['counts']['30d']}, "
                    f"kinds={','.join(sorted(bucket.get('kinds', []))[:3])})"
                )
                store.append_event(
                    anchor_id, "followup",
                    safe_truncate(msg, 2000),
                    meta={
                        "title": f"[intel] Recurrence high: {bkey}",
                        "owner": owner,
                        "priority": priority,
                        "due_date": due_date,
                        "dedupe_key": dedupe_key,
                        "auto_created": True,
                        "bucket_key": bkey,
                        "count_7d": count_7d,
                    },
                )
                created.append({
                    "bucket_key": bkey,
                    "incident_id": anchor_id,
                    "dedupe_key": dedupe_key,
                    "priority": priority,
                    "due_date": due_date,
                })
        except Exception as e:
            # Per-bucket failures are recorded, not raised — other buckets proceed.
            logger.warning("autofollowup failed for bucket %s: %s", bkey, e)
            skipped.append({"bucket_key": bkey, "reason": f"error: {e}"})
    return {"created": created, "skipped": skipped}
# ─── Recurrence-Watch helper (for release gate) ───────────────────────────────
def recurrence_for_service(
    service: str,
    window_days: int = 7,
    policy: Optional[Dict] = None,
    store = None,
) -> Dict:
    """
    Focused recurrence analysis for a single service.

    Returns the same shape as detect_recurrence but filtered to *service*,
    plus "service" and "max_severity_seen" keys. An empty *service* string
    disables the filter.
    """
    if policy is None:
        policy = load_intel_policy()
    if store is None:
        from incident_store import get_incident_store
        store = get_incident_store()
    rec_cfg = policy.get("recurrence", {})
    thresholds = rec_cfg.get("thresholds", {})
    sig_thresh = thresholds.get("signature", {"warn": 3, "high": 6})
    kind_thresh = thresholds.get("kind", {"warn": 5, "high": 10})
    top_n = int(rec_cfg.get("top_n", 15))
    incidents = _incidents_in_window(store, window_days)
    if service:
        incidents = [i for i in incidents if i.get("service", "") == service]
    # Frequency tables for this service only
    sig_count: Dict[str, Dict] = {}
    kind_count: Dict[str, Dict] = {}
    sev_count: Dict[str, int] = defaultdict(int)  # module-level defaultdict import
    for inc in incidents:
        fields = incident_key_fields(inc)
        sig = fields["signature"]
        kind = fields["kind"]
        sev = fields["severity"]
        started_at = fields["started_at"]
        sev_count[sev] += 1
        if sig:
            if sig not in sig_count:
                sig_count[sig] = {"count": 0, "services": {service}, "last_seen": "",
                                  "severity_min": sev}
            sig_count[sig]["count"] += 1
            if started_at > sig_count[sig]["last_seen"]:
                sig_count[sig]["last_seen"] = started_at
            # Track the most severe occurrence (lower rank == more severe),
            # mirroring detect_recurrence; previously severity_min stayed stuck
            # at the first-seen severity.
            if severity_rank(sev) < severity_rank(sig_count[sig]["severity_min"]):
                sig_count[sig]["severity_min"] = sev
        if kind and kind != "custom":
            if kind not in kind_count:
                kind_count[kind] = {"count": 0, "services": {service}}
            kind_count[kind]["count"] += 1
    top_sigs = sorted(
        [{"signature": k, "count": v["count"], "services": sorted(v["services"]),
          "last_seen": v["last_seen"], "severity_min": v["severity_min"]}
         for k, v in sig_count.items()],
        key=lambda x: -x["count"],
    )[:top_n]
    top_kinds = sorted(
        [{"kind": k, "count": v["count"], "services": sorted(v["services"])}
         for k, v in kind_count.items()],
        key=lambda x: -x["count"],
    )[:top_n]
    high_sigs = [s for s in top_sigs if s["count"] >= sig_thresh.get("high", 6)]
    warn_sigs = [s for s in top_sigs
                 if sig_thresh.get("warn", 3) <= s["count"] < sig_thresh.get("high", 6)]
    high_kinds = [k for k in top_kinds if k["count"] >= kind_thresh.get("high", 10)]
    warn_kinds = [k for k in top_kinds
                  if kind_thresh.get("warn", 5) <= k["count"] < kind_thresh.get("high", 10)]
    # Most severe severity across the window (lower rank == more severe).
    max_sev = "P3"
    for inc in incidents:
        sev = inc.get("severity", "P3")
        if severity_rank(sev) < severity_rank(max_sev):
            max_sev = sev
    return {
        "service": service,
        "window_days": window_days,
        "total_incidents": len(incidents),
        "severity_distribution": dict(sev_count),
        "max_severity_seen": max_sev,
        "top_signatures": top_sigs,
        "top_kinds": top_kinds,
        "high_recurrence": {"signatures": high_sigs, "kinds": high_kinds},
        "warn_recurrence": {"signatures": warn_sigs, "kinds": warn_kinds},
    }