New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
1150 lines
43 KiB
Python
1150 lines
43 KiB
Python
"""
|
|
incident_intelligence.py — Incident Intelligence Layer (deterministic, no LLM).
|
|
|
|
Functions:
|
|
correlate_incident(incident_id, policy, store) -> related[]
|
|
detect_recurrence(window_days, policy, store) -> stats
|
|
weekly_digest(policy, store) -> {json, markdown}
|
|
|
|
Policy: config/incident_intelligence_policy.yml
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import textwrap
|
|
import yaml
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from incident_intel_utils import (
|
|
extract_kind,
|
|
incident_key_fields,
|
|
incidents_within_minutes,
|
|
mask_signature,
|
|
safe_truncate,
|
|
severity_rank,
|
|
format_duration,
|
|
parse_iso,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ─── Policy ───────────────────────────────────────────────────────────────────
|
|
|
|
# Process-wide cache for the parsed policy; populated lazily by
# load_intel_policy() and never invalidated for the lifetime of the process.
_POLICY_CACHE: Optional[Dict] = None

# Candidate policy file locations, tried in order: CWD-relative first, then
# the path resolved relative to this module (three levels up -> repo root).
_POLICY_SEARCH_PATHS = [
    Path("config/incident_intelligence_policy.yml"),
    Path(__file__).resolve().parent.parent.parent / "config" / "incident_intelligence_policy.yml",
]
|
|
|
|
|
|
def load_intel_policy() -> Dict:
    """Load and cache the incident-intelligence policy.

    Tries each path in _POLICY_SEARCH_PATHS in order; the first YAML file
    that parses wins. A path that exists but fails to parse is logged and
    skipped. When no usable file is found, built-in defaults are cached
    and returned instead.
    """
    global _POLICY_CACHE
    if _POLICY_CACHE is not None:
        return _POLICY_CACHE

    for candidate in _POLICY_SEARCH_PATHS:
        if not candidate.exists():
            continue
        try:
            with open(candidate) as fh:
                loaded = yaml.safe_load(fh) or {}
        except Exception as exc:
            # Unreadable/invalid file: log and try the next location.
            logger.warning("Failed to load intel policy from %s: %s", candidate, exc)
            continue
        _POLICY_CACHE = loaded
        return loaded

    logger.warning("incident_intelligence_policy.yml not found; using defaults")
    _POLICY_CACHE = _builtin_defaults()
    return _POLICY_CACHE
|
|
|
|
|
|
def _builtin_defaults() -> Dict:
|
|
return {
|
|
"correlation": {
|
|
"lookback_days": 30,
|
|
"max_related": 10,
|
|
"min_score": 20,
|
|
"rules": [
|
|
{"name": "same_signature", "weight": 100, "match": {"signature": True}},
|
|
{"name": "same_service_and_kind", "weight": 60,
|
|
"match": {"same_service": True, "same_kind": True}},
|
|
{"name": "same_service_time_cluster", "weight": 40,
|
|
"match": {"same_service": True, "within_minutes": 180}},
|
|
{"name": "same_kind_cross_service", "weight": 30,
|
|
"match": {"same_kind": True, "within_minutes": 120}},
|
|
],
|
|
},
|
|
"recurrence": {
|
|
"windows_days": [7, 30],
|
|
"thresholds": {
|
|
"signature": {"warn": 3, "high": 6},
|
|
"kind": {"warn": 5, "high": 10},
|
|
},
|
|
"top_n": 15,
|
|
"recommendations": {
|
|
"signature_high": "Create permanent fix: add regression test + SLO guard",
|
|
"signature_warn": "Review root cause history; consider monitoring threshold",
|
|
"kind_high": "Systemic issue with kind={kind}: review architecture",
|
|
"kind_warn": "Recurring kind={kind}: validate alert thresholds",
|
|
},
|
|
},
|
|
"digest": {
|
|
"weekly_day": "Mon",
|
|
"include_closed": True,
|
|
"include_open": True,
|
|
"output_dir": "ops/reports/incidents",
|
|
"markdown_max_chars": 8000,
|
|
"top_incidents": 20,
|
|
},
|
|
}
|
|
|
|
|
|
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.datetime.utcnow().isoformat()
|
|
|
|
|
|
def _lookback_cutoff(days: int) -> str:
|
|
return (datetime.datetime.utcnow() - datetime.timedelta(days=days)).isoformat()
|
|
|
|
|
|
def _incidents_in_window(store, days: int, limit: int = 1000) -> List[Dict]:
    """Load all incidents (open+closed) started in the last *days* days.

    Queries the store once per status value and de-duplicates by incident
    id, preserving first-seen order. `limit` applies per status query.
    """
    cutoff = _lookback_cutoff(days)
    collected: List[Dict] = []
    for status in ("open", "mitigating", "closed", "resolved"):
        for inc in store.list_incidents({"status": status}, limit=limit):
            if inc.get("started_at", "") >= cutoff:
                collected.append(inc)

    # Dict insertion order gives the same first-occurrence-wins dedupe
    # as a seen-set + append loop.
    deduped: Dict[str, Dict] = {}
    for inc in collected:
        deduped.setdefault(inc["id"], inc)
    return list(deduped.values())
|
|
|
|
|
|
# ─── 1. Correlation ───────────────────────────────────────────────────────────
|
|
|
|
def correlate_incident(
    incident_id: str,
    policy: Optional[Dict] = None,
    store = None,
    append_note: bool = False,
) -> List[Dict]:
    """
    Find related incidents for *incident_id* using weighted rule matching.

    Every incident in the correlation lookback window (except the target
    itself) is scored with _score_pair; candidates at or above min_score
    are kept, sorted by score desc with started_at asc as tiebreak, and
    capped at max_related. When append_note is set, a timeline note
    listing the top five matches is appended to the target incident
    (best-effort; failures are logged, not raised).
    """
    if policy is None:
        policy = load_intel_policy()
    if store is None:
        from incident_store import get_incident_store
        store = get_incident_store()

    cfg = policy.get("correlation", {})
    lookback_days = int(cfg.get("lookback_days", 30))
    max_related = int(cfg.get("max_related", 10))
    min_score = int(cfg.get("min_score", 20))
    rules = cfg.get("rules", [])

    target_raw = store.get_incident(incident_id)
    if not target_raw:
        return []

    target = incident_key_fields(target_raw)

    matches: List[Dict] = []
    for raw in _incidents_in_window(store, lookback_days, limit=500):
        other_id = raw.get("id", "")
        if other_id == incident_id:
            continue
        other = incident_key_fields(raw)
        score, reasons = _score_pair(target, other, target_raw, raw, rules)
        if score < min_score:
            continue
        matches.append({
            "incident_id": other_id,
            "score": score,
            "reasons": reasons,
            "service": other["service"],
            "kind": other["kind"],
            "severity": other["severity"],
            "status": other["status"],
            "started_at": other["started_at"],
            "signature": mask_signature(other["signature"]),
        })

    matches.sort(key=lambda m: (-m["score"], m["started_at"]))
    related = matches[:max_related]

    # Optionally record the correlation result on the incident timeline.
    if append_note and related:
        summary = "; ".join(
            f"`{m['incident_id']}` score={m['score']} ({', '.join(m['reasons'])})"
            for m in related[:5]
        )
        note = "Related incidents: " + summary
        try:
            store.append_event(incident_id, "note", safe_truncate(note, 2000),
                               meta={"auto_created": True, "related_count": len(related)})
        except Exception as e:
            logger.warning("correlate_incident: append_note failed: %s", e)

    return related
|
|
|
|
|
|
def _score_pair(
|
|
target: Dict, cand: Dict,
|
|
target_raw: Dict, cand_raw: Dict,
|
|
rules: List[Dict],
|
|
) -> Tuple[int, List[str]]:
|
|
"""Compute correlation score and matching reasons for a candidate pair."""
|
|
score = 0
|
|
reasons: List[str] = []
|
|
|
|
t_sig = target.get("signature", "")
|
|
c_sig = cand.get("signature", "")
|
|
t_svc = target.get("service", "")
|
|
c_svc = cand.get("service", "")
|
|
t_kind = target.get("kind", "")
|
|
c_kind = cand.get("kind", "")
|
|
t_start = target.get("started_at", "")
|
|
c_start = cand.get("started_at", "")
|
|
|
|
for rule in rules:
|
|
m = rule.get("match", {})
|
|
w = int(rule.get("weight", 0))
|
|
name = rule.get("name", "rule")
|
|
|
|
# Signature-only rule: only fires when signatures are equal; skip otherwise.
|
|
if "signature" in m:
|
|
if t_sig and c_sig and t_sig == c_sig:
|
|
score += w
|
|
reasons.append(name)
|
|
continue # never fall through to combined-conditions for this rule
|
|
|
|
# Combined conditions (same_service / same_kind / within_minutes)
|
|
within = m.get("within_minutes")
|
|
matched = True
|
|
|
|
if m.get("same_service"):
|
|
if t_svc != c_svc:
|
|
matched = False
|
|
|
|
if m.get("same_kind"):
|
|
if t_kind != c_kind or not t_kind or t_kind == "custom":
|
|
matched = False
|
|
|
|
if within is not None:
|
|
if not incidents_within_minutes(target_raw, cand_raw, within):
|
|
matched = False
|
|
|
|
if matched:
|
|
score += w
|
|
reasons.append(name)
|
|
|
|
return score, reasons
|
|
|
|
|
|
# ─── 2. Recurrence Detection ──────────────────────────────────────────────────
|
|
|
|
def detect_recurrence(
    window_days: int = 7,
    policy: Optional[Dict] = None,
    store = None,
) -> Dict:
    """
    Analyze incident frequency for given window.
    Returns frequency tables and threshold classifications.

    Builds per-signature, per-kind and per-service frequency tables over
    all incidents started in the last *window_days* days, then classifies
    the top signatures/kinds into "high" and "warn" buckets using the
    policy recurrence thresholds.

    Args:
        window_days: lookback window size in days.
        policy: parsed intelligence policy; loaded lazily when None.
        store: incident store; resolved via get_incident_store() when None.

    Returns:
        Dict with window_days, total/open/closed counts, severity
        distribution, top_signatures / top_kinds / top_services (each
        capped at policy top_n), and high_recurrence / warn_recurrence
        classifications.
    """
    if policy is None:
        policy = load_intel_policy()
    if store is None:
        from incident_store import get_incident_store
        store = get_incident_store()

    rec_cfg = policy.get("recurrence", {})
    thresholds = rec_cfg.get("thresholds", {})
    sig_thresh = thresholds.get("signature", {"warn": 3, "high": 6})
    kind_thresh = thresholds.get("kind", {"warn": 5, "high": 10})
    top_n = int(rec_cfg.get("top_n", 15))

    incidents = _incidents_in_window(store, window_days)

    # Frequency tables
    sig_count: Dict[str, Dict] = {}   # signature → {count, services, last_seen, severity_min}
    kind_count: Dict[str, Dict] = {}  # kind → {count, services}
    svc_count: Dict[str, int] = defaultdict(int)
    sev_count: Dict[str, int] = defaultdict(int)
    open_count = 0
    closed_count = 0

    # Single pass: every incident updates the service/severity/status
    # tallies; signature and kind tables only when those fields are usable.
    for inc in incidents:
        fields = incident_key_fields(inc)
        sig = fields["signature"]
        kind = fields["kind"]
        svc = fields["service"]
        sev = fields["severity"]
        status = fields["status"]
        started_at = fields["started_at"]

        svc_count[svc] += 1
        sev_count[sev] += 1
        if status in ("open", "mitigating"):
            open_count += 1
        else:
            closed_count += 1

        if sig:
            if sig not in sig_count:
                sig_count[sig] = {"count": 0, "services": set(), "last_seen": "",
                                  "severity_min": sev}
            sig_count[sig]["count"] += 1
            sig_count[sig]["services"].add(svc)
            # ISO timestamps compare lexicographically, so max via >.
            if started_at > sig_count[sig]["last_seen"]:
                sig_count[sig]["last_seen"] = started_at
            # severity_min tracks the *most severe* level seen (lowest rank).
            if severity_rank(sev) < severity_rank(sig_count[sig]["severity_min"]):
                sig_count[sig]["severity_min"] = sev

        # "custom" is a catch-all kind; excluding it keeps the table meaningful.
        if kind and kind != "custom":
            if kind not in kind_count:
                kind_count[kind] = {"count": 0, "services": set()}
            kind_count[kind]["count"] += 1
            kind_count[kind]["services"].add(svc)

    # Serialize sets (→ sorted lists) and keep the top_n rows per table.
    top_sigs = sorted(
        [{"signature": k, "count": v["count"],
          "services": sorted(v["services"]),
          "last_seen": v["last_seen"],
          "severity_min": v["severity_min"]}
         for k, v in sig_count.items()],
        key=lambda x: -x["count"],
    )[:top_n]

    top_kinds = sorted(
        [{"kind": k, "count": v["count"], "services": sorted(v["services"])}
         for k, v in kind_count.items()],
        key=lambda x: -x["count"],
    )[:top_n]

    top_services = sorted(
        [{"service": k, "count": v} for k, v in svc_count.items()],
        key=lambda x: -x["count"],
    )[:top_n]

    # Classify high recurrence: "high" is count >= high threshold,
    # "warn" is the half-open band [warn, high).
    high_sigs = [s for s in top_sigs if s["count"] >= sig_thresh.get("high", 6)]
    warn_sigs = [s for s in top_sigs
                 if sig_thresh.get("warn", 3) <= s["count"] < sig_thresh.get("high", 6)]
    high_kinds = [k for k in top_kinds if k["count"] >= kind_thresh.get("high", 10)]
    warn_kinds = [k for k in top_kinds
                  if kind_thresh.get("warn", 5) <= k["count"] < kind_thresh.get("high", 10)]

    return {
        "window_days": window_days,
        "total_incidents": len(incidents),
        "open_count": open_count,
        "closed_count": closed_count,
        "severity_distribution": dict(sev_count),
        "top_signatures": top_sigs,
        "top_kinds": top_kinds,
        "top_services": top_services,
        "high_recurrence": {
            "signatures": high_sigs,
            "kinds": high_kinds,
        },
        "warn_recurrence": {
            "signatures": warn_sigs,
            "kinds": warn_kinds,
        },
    }
|
|
|
|
|
|
# ─── 3. Weekly Digest ─────────────────────────────────────────────────────────
|
|
|
|
def weekly_digest(
    policy: Optional[Dict] = None,
    store = None,
    save_artifacts: bool = True,
) -> Dict:
    """
    Generate weekly incident digest: markdown + JSON.
    Saves to output_dir if save_artifacts=True.
    Returns {markdown, json_data, artifact_paths}.

    Pipeline: recurrence stats (7d/30d) → open + recent incidents →
    root-cause buckets → optional auto follow-ups → recommendations →
    JSON payload + Markdown rendering (truncated to the policy char
    budget) → optional atomic artifact write.

    Args:
        policy: parsed intelligence policy; loaded lazily when None.
        store: incident store; resolved via get_incident_store() when None.
        save_artifacts: write <output_dir>/weekly/<week>.{json,md} when True.
    """
    if policy is None:
        policy = load_intel_policy()
    if store is None:
        from incident_store import get_incident_store
        store = get_incident_store()

    digest_cfg = policy.get("digest", {})
    max_chars = int(digest_cfg.get("markdown_max_chars", 8000))
    top_n_inc = int(digest_cfg.get("top_incidents", 20))
    output_dir = digest_cfg.get("output_dir", "ops/reports/incidents")

    # Naive UTC now (utcnow() is deprecated since 3.12; format unchanged).
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    week_str = now.strftime("%Y-W%W")
    ts_str = now.strftime("%Y-%m-%d %H:%M UTC")

    # ── Collect data ──────────────────────────────────────────────────────────
    rec_7d = detect_recurrence(window_days=7, policy=policy, store=store)
    rec_30d = detect_recurrence(window_days=30, policy=policy, store=store)

    # Open incidents, most severe first (severity_rank: lower = more severe).
    open_incs = store.list_incidents({"status": "open"}, limit=100)
    mitigating = store.list_incidents({"status": "mitigating"}, limit=50)
    all_open = open_incs + mitigating
    all_open.sort(key=lambda i: severity_rank(i.get("severity", "P3")))

    # Last 7d incidents (sorted by severity then started_at)
    recent = _incidents_in_window(store, 7, limit=top_n_inc * 2)
    recent.sort(key=lambda i: (severity_rank(i.get("severity", "P3")), i.get("started_at", "")))
    recent = recent[:top_n_inc]

    # ── Root-cause buckets ────────────────────────────────────────────────────
    all_30d = _incidents_in_window(store, 30, limit=1000)
    buckets = build_root_cause_buckets(all_30d, policy=policy, windows=[7, 30])
    sig_high_thresh = int(policy.get("recurrence", {}).get("thresholds", {})
                          .get("signature", {}).get("high", 6))
    # A bucket is "high" when its 7d count crosses the signature threshold
    # or any of its kinds is already flagged high-recurrence for 7d.
    high_buckets = [
        b for b in buckets
        if b["counts"]["7d"] >= sig_high_thresh
        or any(k in {ki["kind"] for ki in rec_7d.get("high_recurrence", {}).get("kinds", [])}
               for k in b.get("kinds", []))
    ]

    # ── Auto follow-ups (best-effort: digest must not fail on them) ──────────
    autofollowup_result: Dict = {"created": [], "skipped": []}
    if policy.get("autofollowups", {}).get("enabled", True):
        try:
            autofollowup_result = create_autofollowups(
                buckets=buckets,
                rec_7d=rec_7d,
                policy=policy,
                store=store,
                week_str=week_str,
            )
        except Exception as e:
            logger.warning("weekly_digest: autofollowups error (non-fatal): %s", e)

    # ── Build recommendations ─────────────────────────────────────────────────
    recs = _build_recommendations(rec_7d, rec_30d, policy)

    # ── Build JSON payload ────────────────────────────────────────────────────
    json_data = {
        "generated_at": _now_iso(),
        "week": week_str,
        "open_incidents_count": len(all_open),
        "recent_7d_count": rec_7d["total_incidents"],
        "recent_30d_count": rec_30d["total_incidents"],
        "recurrence_7d": rec_7d,
        "recurrence_30d": rec_30d,
        "open_incidents": [_inc_summary(i) for i in all_open[:20]],
        "recent_incidents": [_inc_summary(i) for i in recent],
        "recommendations": recs,
        "buckets": {
            "top": buckets,
            "high": high_buckets,
        },
        "autofollowups": autofollowup_result,
    }

    # ── Build Markdown ────────────────────────────────────────────────────────
    md = _build_markdown(
        week_str=week_str,
        ts_str=ts_str,
        all_open=all_open,
        recent=recent,
        rec_7d=rec_7d,
        rec_30d=rec_30d,
        recs=recs,
        buckets=buckets,
        autofollowup_result=autofollowup_result,
    )

    if len(md) > max_chars:
        md = md[:max_chars - 80] + f"\n\n… *(digest truncated at {max_chars} chars)*"

    # ── Save artifacts ────────────────────────────────────────────────────────
    artifact_paths: List[str] = []
    if save_artifacts:
        artifact_paths = _save_digest_artifacts(output_dir, week_str, json_data, md)

    return {
        "markdown": md,
        "json_data": json_data,
        "artifact_paths": artifact_paths,
        "week": week_str,
    }
|
|
|
|
|
|
def _inc_summary(inc: Dict) -> Dict:
    """Project a raw incident record onto the compact digest shape.

    Key order matters (it is preserved in the JSON artifact), so fields
    are inserted in the same order as before.
    """
    meta = inc.get("meta") or {}
    summary: Dict = {
        "id": inc.get("id", ""),
        "service": inc.get("service", ""),
        "env": inc.get("env", "prod"),
        "severity": inc.get("severity", "P2"),
        "status": inc.get("status", ""),
        "kind": extract_kind(inc),
        "title": safe_truncate(inc.get("title", ""), 120),
        "started_at": inc.get("started_at", ""),
    }
    summary["duration"] = format_duration(inc.get("started_at", ""), inc.get("ended_at"))
    summary["signature"] = mask_signature(meta.get("incident_signature", ""))
    return summary
|
|
|
|
|
|
def _build_recommendations(rec_7d: Dict, rec_30d: Dict, policy: Dict) -> List[Dict]:
|
|
recs_cfg = policy.get("recurrence", {}).get("recommendations", {})
|
|
recs: List[Dict] = []
|
|
|
|
for sig_item in rec_7d.get("high_recurrence", {}).get("signatures", []):
|
|
sig_short = mask_signature(sig_item["signature"])
|
|
msg_tmpl = recs_cfg.get("signature_high",
|
|
"Create permanent fix for signature {sig}")
|
|
recs.append({
|
|
"level": "high",
|
|
"category": "signature",
|
|
"target": sig_short,
|
|
"services": sig_item.get("services", []),
|
|
"count_7d": sig_item.get("count", 0),
|
|
"message": msg_tmpl.format(sig=sig_short, **sig_item),
|
|
})
|
|
|
|
for sig_item in rec_7d.get("warn_recurrence", {}).get("signatures", []):
|
|
sig_short = mask_signature(sig_item["signature"])
|
|
msg_tmpl = recs_cfg.get("signature_warn",
|
|
"Review root cause for signature {sig}")
|
|
recs.append({
|
|
"level": "warn",
|
|
"category": "signature",
|
|
"target": sig_short,
|
|
"services": sig_item.get("services", []),
|
|
"count_7d": sig_item.get("count", 0),
|
|
"message": msg_tmpl.format(sig=sig_short, **sig_item),
|
|
})
|
|
|
|
for kind_item in rec_7d.get("high_recurrence", {}).get("kinds", []):
|
|
kind = kind_item.get("kind", "?")
|
|
msg_tmpl = recs_cfg.get("kind_high", "Systemic issue with {kind}")
|
|
recs.append({
|
|
"level": "high",
|
|
"category": "kind",
|
|
"target": kind,
|
|
"services": kind_item.get("services", []),
|
|
"count_7d": kind_item.get("count", 0),
|
|
"message": msg_tmpl.format(kind=kind),
|
|
})
|
|
|
|
for kind_item in rec_7d.get("warn_recurrence", {}).get("kinds", []):
|
|
kind = kind_item.get("kind", "?")
|
|
msg_tmpl = recs_cfg.get("kind_warn", "Recurring {kind}")
|
|
recs.append({
|
|
"level": "warn",
|
|
"category": "kind",
|
|
"target": kind,
|
|
"services": kind_item.get("services", []),
|
|
"count_7d": kind_item.get("count", 0),
|
|
"message": msg_tmpl.format(kind=kind),
|
|
})
|
|
|
|
return recs
|
|
|
|
|
|
def _build_markdown(
    week_str: str, ts_str: str,
    all_open: List[Dict], recent: List[Dict],
    rec_7d: Dict, rec_30d: Dict,
    recs: List[Dict],
    buckets: Optional[List[Dict]] = None,
    autofollowup_result: Optional[Dict] = None,
) -> str:
    """Render the weekly digest as a Markdown document.

    Sections (each emitted only when it has content): summary table, open
    incidents (max 10 listed), recent 7d incidents (max 15), 7d recurrence
    tables, a 30d recurrence section when the 30d window adds incidents
    beyond 7d, root-cause buckets (max 8), auto follow-up results, and
    recommendations (max 10). Returns the joined document; the caller
    applies the overall character budget.
    """
    lines = [
        f"# Weekly Incident Digest — {week_str}",
        f"*Generated: {ts_str}*",
        "",
        "---",
        "",
        f"## Summary",
        f"| Metric | Value |",
        f"|--------|-------|",
        f"| Open incidents | {len(all_open)} |",
        f"| Incidents (7d) | {rec_7d['total_incidents']} |",
        f"| Incidents (30d) | {rec_30d['total_incidents']} |",
        "",
    ]

    # ── Open incidents ─────────────────────────────────────────────────────
    if all_open:
        lines.append("## 🔴 Open Incidents")
        for inc in all_open[:10]:
            sev = inc.get("severity", "?")
            svc = inc.get("service", "?")
            title = safe_truncate(inc.get("title", ""), 80)
            dur = format_duration(inc.get("started_at", ""), inc.get("ended_at"))
            kind = extract_kind(inc)
            lines.append(f"- **[{sev}]** `{inc.get('id','?')}` {svc} — {title} *(kind: {kind}, {dur})*")
        if len(all_open) > 10:
            lines.append(f"- … and {len(all_open) - 10} more open incidents")
        lines.append("")

    # ── Recent 7d ─────────────────────────────────────────────────────────
    if recent:
        lines.append("## 📋 Recent Incidents (7 days)")
        for inc in recent[:15]:
            sev = inc.get("severity", "?")
            status = inc.get("status", "?")
            svc = inc.get("service", "?")
            title = safe_truncate(inc.get("title", ""), 80)
            dur = format_duration(inc.get("started_at", ""), inc.get("ended_at"))
            kind = extract_kind(inc)
            lines.append(f"- **[{sev}/{status}]** {svc} — {title} *(kind: {kind}, {dur})*")
        lines.append("")

    # ── Recurrence: 7d ────────────────────────────────────────────────────
    # Header is always emitted, even with empty tables.
    lines.append("## 🔁 Recurrence (7 days)")
    if rec_7d["top_signatures"]:
        lines.append("### Top Signatures")
        for item in rec_7d["top_signatures"][:8]:
            sig_s = mask_signature(item["signature"])
            svcs = ", ".join(item.get("services", [])[:3])
            lines.append(f"- `{sig_s}` — {item['count']}x ({svcs})")
    if rec_7d["top_kinds"]:
        lines.append("### Top Kinds")
        for item in rec_7d["top_kinds"][:8]:
            svcs = ", ".join(item.get("services", [])[:3])
            lines.append(f"- `{item['kind']}` — {item['count']}x ({svcs})")
    lines.append("")

    # ── Recurrence: 30d ───────────────────────────────────────────────────
    # Only worth showing when the 30d window contains more than the 7d one.
    if rec_30d["total_incidents"] > rec_7d["total_incidents"]:
        lines.append("## 📈 Recurrence (30 days)")
        if rec_30d["top_signatures"][:5]:
            for item in rec_30d["top_signatures"][:5]:
                sig_s = mask_signature(item["signature"])
                lines.append(f"- `{sig_s}` — {item['count']}x")
        lines.append("")

    # ── Root-Cause Buckets ────────────────────────────────────────────────
    if buckets:
        lines.append("## 🪣 Top Root-Cause Buckets (7d/30d)")
        for b in buckets[:8]:
            bkey = b["bucket_key"]
            c7 = b["counts"]["7d"]
            c30 = b["counts"]["30d"]
            c_open = b["counts"]["open"]
            svcs = ", ".join(b.get("services", [])[:3])
            # last_seen is an ISO timestamp; [:10] keeps just YYYY-MM-DD.
            last = b.get("last_seen", "")[:10]
            lines.append(f"### `{bkey}`")
            lines.append(f"*7d: {c7} incidents | 30d: {c30} | open: {c_open} | last: {last} | svcs: {svcs}*")
            recs_b = b.get("recommendations", [])[:3]
            for rec in recs_b:
                lines.append(f"  - {rec}")
        lines.append("")

    # ── Auto Follow-ups summary ────────────────────────────────────────────
    if autofollowup_result:
        created = autofollowup_result.get("created", [])
        skipped = autofollowup_result.get("skipped", [])
        if created:
            lines.append("## 🔗 Auto Follow-ups Created")
            for fu in created[:10]:
                lines.append(
                    f"- `{fu['bucket_key']}` → incident `{fu['incident_id']}` "
                    f"({fu['priority']}, due {fu['due_date']})"
                )
            lines.append("")
        elif skipped:
            lines.append(f"*Auto follow-ups: {len(skipped)} skipped (no high recurrence or already exists)*\n")

    # ── Recommendations ───────────────────────────────────────────────────
    if recs:
        lines.append("## 💡 Recommendations")
        for r in recs[:10]:
            icon = "🔴" if r["level"] == "high" else "🟡"
            svcs = ", ".join(r.get("services", [])[:3])
            lines.append(f"{icon} **[{r['category']}:{r['target']}]** {r['message']} *(count_7d={r['count_7d']}, svcs: {svcs})*")
        lines.append("")

    return "\n".join(lines)
|
|
|
|
|
|
def _save_digest_artifacts(
|
|
output_dir: str, week_str: str,
|
|
json_data: Dict, md: str,
|
|
) -> List[str]:
|
|
"""Atomic write of digest artifacts. Returns list of written paths."""
|
|
paths: List[str] = []
|
|
try:
|
|
out = Path(output_dir) / "weekly"
|
|
out.mkdir(parents=True, exist_ok=True)
|
|
|
|
json_path = out / f"{week_str}.json"
|
|
md_path = out / f"{week_str}.md"
|
|
|
|
# Atomic write via temp file
|
|
import tempfile
|
|
for content, dest in [(json.dumps(json_data, indent=2, default=str), json_path),
|
|
(md, md_path)]:
|
|
tmp_fd, tmp_path = tempfile.mkstemp(dir=out, suffix=".tmp")
|
|
try:
|
|
with os.fdopen(tmp_fd, "w") as f:
|
|
f.write(content)
|
|
os.replace(tmp_path, dest)
|
|
paths.append(str(dest))
|
|
except Exception:
|
|
try:
|
|
os.unlink(tmp_path)
|
|
except OSError:
|
|
pass
|
|
raise
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to save digest artifacts: %s", e)
|
|
return paths
|
|
|
|
|
|
# ─── Root-Cause Buckets ───────────────────────────────────────────────────────
|
|
|
|
# Static lookup: incident kind → ordered list of deterministic remediation
# suggestions. bucket_recommendations() unions these across a bucket's kinds
# (deduplicating while preserving order).
_KIND_RECS: Dict[str, List[str]] = {
    "error_rate": [
        "Add regression test for API contract & error mapping",
        "Review recent deploy diffs and dependency changes",
        "Add/adjust SLO thresholds & alert routing",
    ],
    # slo_breach deliberately shares error_rate's playbook.
    "slo_breach": [
        "Add regression test for API contract & error mapping",
        "Review recent deploy diffs and dependency changes",
        "Add/adjust SLO thresholds & alert routing",
    ],
    "latency": [
        "Check p95 vs saturation; add perf budget",
        "Investigate DB/queue contention",
    ],
    "oom": [
        "Add memory profiling; set container limits; audit for leaks",
    ],
    # crashloop shares oom's first suggestion plus a probe check.
    "crashloop": [
        "Add memory profiling; set container limits; audit for leaks",
        "Check liveness/readiness probe configuration",
    ],
    "disk": [
        "Add retention/cleanup automation; verify volume provisioning",
    ],
    "security": [
        "Run dependency scanner + rotate secrets; verify gateway allowlists",
    ],
    "queue": [
        "Check consumer lag and partition count; add dead-letter queue",
    ],
    "network": [
        "Audit DNS configuration; verify network policies and ACLs",
    ],
}

# Generic fallback used when none of a bucket's kinds has a dedicated entry.
_DEFAULT_KIND_RECS = [
    "Review incident timeline and add regression test",
    "Check deployment diffs for correlated changes",
]
|
|
|
|
|
|
def bucket_recommendations(bucket: Dict) -> List[str]:
    """Deterministic per-bucket recommendations based on kind + open_count.

    Unions _KIND_RECS entries across the bucket's kinds (dedup, order
    preserved), falls back to _DEFAULT_KIND_RECS when nothing matched,
    appends a deploy warning when the bucket has open incidents, and caps
    the result at five entries.
    """
    picked: List[str] = []
    already: set = set()

    for kind in bucket.get("kinds", set()):
        for suggestion in _KIND_RECS.get(kind, []):
            if suggestion in already:
                continue
            already.add(suggestion)
            picked.append(suggestion)

    if not picked:
        picked = list(_DEFAULT_KIND_RECS)

    # Actionable warning if there are open incidents
    if bucket.get("counts", {}).get("open", 0) > 0:
        picked.append("⚠ Do not deploy risky changes until open incidents are mitigated")

    return picked[:5]
|
|
|
|
|
|
def build_root_cause_buckets(
    incidents: List[Dict],
    policy: Optional[Dict] = None,
    windows: Optional[List[int]] = None,
) -> List[Dict]:
    """
    Cluster incidents into root-cause buckets by service|kind or signature_prefix.

    Returns top_n buckets sorted by (count_7d desc, count_30d desc, last_seen desc).
    Only buckets meeting min_count thresholds are returned.

    Args:
        incidents: raw incident records (typically the last 30 days).
        policy: parsed intelligence policy; loaded lazily when None.
        windows: window sizes in days for per-window counts (default [7, 30]).
    """
    if policy is None:
        policy = load_intel_policy()
    if windows is None:
        windows = [7, 30]

    buck_cfg = policy.get("buckets", {})
    mode = buck_cfg.get("mode", "service_kind")
    prefix_len = int(buck_cfg.get("signature_prefix_len", 12))
    top_n = int(buck_cfg.get("top_n", 10))
    min_count = buck_cfg.get("min_count", {"7": 3, "30": 6})
    # Policy files may carry int or str keys for the window thresholds.
    min_7 = int(min_count.get(7, min_count.get("7", 3)))
    min_30 = int(min_count.get(30, min_count.get("30", 6)))

    # Naive UTC "now": stored timestamps are naive ISO strings, so cutoffs
    # must be naive too for lexicographic comparison. (Aware now() +
    # replace() instead of the deprecated utcnow(); output is identical.)
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    cutoffs: Dict[int, str] = {
        w: (now - datetime.timedelta(days=w)).isoformat() for w in windows
    }

    # Build bucket map
    buckets: Dict[str, Dict] = {}

    for inc in incidents:
        fields = incident_key_fields(inc)
        sig = fields["signature"]
        kind = fields["kind"]
        svc = fields["service"]
        started = fields["started_at"]
        status = fields["status"]
        sev = fields["severity"]

        # Bucket key: signature prefix when configured and available,
        # otherwise the service|kind pair.
        if mode == "signature_prefix" and sig:
            bkey = sig[:prefix_len]
        else:
            bkey = f"{svc}|{kind}"

        if bkey not in buckets:
            buckets[bkey] = {
                "bucket_key": bkey,
                "counts": {"open": 0},
                "last_seen": "",
                "first_seen": started,
                "services": set(),
                "kinds": set(),
                "sig_counts": defaultdict(int),
                "sev_mix": defaultdict(int),
                "sample_incidents": [],
            }

        b = buckets[bkey]
        b["services"].add(svc)
        b["kinds"].add(kind)
        if sig:
            b["sig_counts"][sig] += 1

        # ISO timestamps compare lexicographically.
        if started > b["last_seen"]:
            b["last_seen"] = started
        if started < b["first_seen"] or not b["first_seen"]:
            b["first_seen"] = started

        b["sev_mix"][sev] += 1
        if status in ("open", "mitigating"):
            b["counts"]["open"] += 1

        # Count by window
        for w, cutoff in cutoffs.items():
            key_w = f"{w}d"
            if started >= cutoff:
                b["counts"][key_w] = b["counts"].get(key_w, 0) + 1

        # Keep up to 5 sample incidents
        if len(b["sample_incidents"]) < 5:
            b["sample_incidents"].append({
                "id": inc.get("id", ""),
                "started_at": started,
                "status": status,
                "title": safe_truncate(inc.get("title", ""), 80),
            })

    # Serialize and filter: drop buckets below both window thresholds.
    result = []
    for bkey, b in buckets.items():
        count_7d = b["counts"].get("7d", 0)
        count_30d = b["counts"].get("30d", 0)
        if count_7d < min_7 and count_30d < min_30:
            continue

        top_sigs = sorted(
            [{"signature": mask_signature(s), "count": c}
             for s, c in b["sig_counts"].items()],
            key=lambda x: -x["count"],
        )[:5]

        recs_data = bucket_recommendations({
            "kinds": b["kinds"],
            "counts": b["counts"],
        })

        result.append({
            "bucket_key": bkey,
            "counts": {
                "7d": count_7d,
                "30d": count_30d,
                "open": b["counts"]["open"],
            },
            "last_seen": b["last_seen"],
            "first_seen": b["first_seen"],
            "services": sorted(b["services"]),
            "kinds": sorted(b["kinds"]),
            "top_signatures": top_sigs,
            "severity_mix": dict(b["sev_mix"]),
            "sample_incidents": sorted(
                b["sample_incidents"], key=lambda x: x["started_at"], reverse=True
            )[:5],
            "recommendations": recs_data,
        })

    # Sort: count_7d desc, then count_30d desc, then last_seen desc.
    # BUGFIX: previously two consecutive sorts left the first one dead and
    # broke ties by last_seen ASCENDING, contradicting the documented
    # order. Two stable passes — last_seen desc first so it acts as the
    # tiebreak under the subsequent count sort — restore the intent.
    result.sort(key=lambda x: x["last_seen"], reverse=True)
    result.sort(key=lambda x: (-x["counts"]["7d"], -x["counts"]["30d"]))
    return result[:top_n]
|
|
|
|
|
|
# ─── Auto Follow-ups ─────────────────────────────────────────────────────────
|
|
|
|
def _followup_dedupe_key(policy: Dict, week_str: str, bucket_key: str) -> str:
|
|
prefix = policy.get("autofollowups", {}).get("dedupe_key_prefix", "intel_recur")
|
|
return f"{prefix}:{week_str}:{bucket_key}"
|
|
|
|
|
|
def create_autofollowups(
    buckets: List[Dict],
    rec_7d: Dict,
    policy: Optional[Dict] = None,
    store = None,
    week_str: Optional[str] = None,
) -> Dict:
    """
    For each high-recurrence bucket, append a "followup" event to the most
    recently started sample incident in that bucket.

    Dedupe is deterministic per (week, bucket_key): if the anchor incident
    already carries an event whose meta["dedupe_key"] matches, the bucket is
    skipped — re-running within the same week is idempotent.

    Args:
        buckets: Bucketed recurrence data; each item has "bucket_key",
            "counts", "kinds", "top_signatures", "sample_incidents".
        rec_7d: 7-day recurrence stats; its "high_recurrence" section lists
            signatures/kinds already flagged high.
        policy: Intelligence policy dict; loaded from disk when None.
        store: Incident store; resolved via get_incident_store() when None.
        week_str: Week label for the dedupe key (defaults to current UTC
            "%Y-W%W").

    Returns:
        {"created": [...], "skipped": [...]} — each entry records the bucket
        key plus reason/metadata.
    """
    if policy is None:
        policy = load_intel_policy()
    if store is None:
        from incident_store import get_incident_store
        store = get_incident_store()
    if week_str is None:
        week_str = datetime.datetime.utcnow().strftime("%Y-W%W")

    af_cfg = policy.get("autofollowups", {})
    if not af_cfg.get("enabled", True):
        return {"created": [], "skipped": [{"reason": "disabled"}]}

    only_when_high = bool(af_cfg.get("only_when_high", True))
    owner = af_cfg.get("owner", "oncall")
    priority = af_cfg.get("priority", "P1")
    due_days = int(af_cfg.get("due_days", 7))

    # Count threshold that marks a bucket "high" even without an explicit
    # high-recurrence signature/kind match.  (A kind-level "high" threshold
    # used to be read here as well but was never applied to counts; kind
    # matches go through rec_7d's high_recurrence set below.)
    rec_cfg = policy.get("recurrence", {})
    sig_high_thresh = int(rec_cfg.get("thresholds", {}).get("signature", {}).get("high", 6))

    # Signatures/kinds already flagged "high" by the 7-day recurrence pass.
    high_sigs = {s["signature"] for s in rec_7d.get("high_recurrence", {}).get("signatures", [])}
    high_kinds = {k["kind"] for k in rec_7d.get("high_recurrence", {}).get("kinds", [])}

    created: List[Dict] = []
    skipped: List[Dict] = []

    # Due date as YYYY-MM-DD, due_days from now (UTC).
    due_date = (datetime.datetime.utcnow() + datetime.timedelta(days=due_days)).isoformat()[:10]

    for bucket in buckets:
        bkey = bucket["bucket_key"]
        count_7d = bucket["counts"]["7d"]

        # A bucket is "high" when any top signature or kind is flagged high,
        # or its 7-day count alone crosses the signature threshold.
        is_high = (
            any(ts.get("signature", "") in high_sigs
                for ts in bucket.get("top_signatures", []))
            or any(kind in high_kinds for kind in bucket.get("kinds", []))
            or count_7d >= sig_high_thresh
        )

        if only_when_high and not is_high:
            skipped.append({"bucket_key": bkey, "reason": "not_high", "count_7d": count_7d})
            continue

        dedupe_key = _followup_dedupe_key(policy, week_str, bkey)

        # Anchor incident: the most recently started sample in the bucket.
        samples = sorted(
            bucket.get("sample_incidents", []),
            key=lambda x: x.get("started_at", ""),
            reverse=True,
        )
        if not samples:
            skipped.append({"bucket_key": bkey, "reason": "no_incidents"})
            continue

        anchor_id = samples[0]["id"]
        if not anchor_id:
            skipped.append({"bucket_key": bkey, "reason": "no_anchor_id"})
            continue

        try:
            # Dedupe check: does the anchor already have a followup with this
            # week's key?  The for/else creates the event only when no
            # existing event matched (loop completed without break).
            existing_events = store.get_events(anchor_id, limit=100)
            for ev in existing_events:
                ev_meta = ev.get("meta") or {}
                if ev_meta.get("dedupe_key") == dedupe_key:
                    skipped.append({
                        "bucket_key": bkey, "reason": "already_exists",
                        "incident_id": anchor_id, "dedupe_key": dedupe_key,
                    })
                    break
            else:
                # Create the follow-up event on the anchor incident.
                msg = (
                    f"[intel] Recurrence high: {bkey} "
                    f"(7d={count_7d}, 30d={bucket['counts']['30d']}, "
                    f"kinds={','.join(sorted(bucket.get('kinds', []))[:3])})"
                )
                store.append_event(
                    anchor_id, "followup",
                    safe_truncate(msg, 2000),
                    meta={
                        "title": f"[intel] Recurrence high: {bkey}",
                        "owner": owner,
                        "priority": priority,
                        "due_date": due_date,
                        "dedupe_key": dedupe_key,
                        "auto_created": True,
                        "bucket_key": bkey,
                        "count_7d": count_7d,
                    },
                )
                created.append({
                    "bucket_key": bkey,
                    "incident_id": anchor_id,
                    "dedupe_key": dedupe_key,
                    "priority": priority,
                    "due_date": due_date,
                })
        except Exception as e:
            # Best-effort per bucket: a store failure on one bucket must not
            # abort the others.
            logger.warning("autofollowup failed for bucket %s: %s", bkey, e)
            skipped.append({"bucket_key": bkey, "reason": f"error: {e}"})

    return {"created": created, "skipped": skipped}
|
|
|
|
|
|
# ─── Recurrence-Watch helper (for release gate) ───────────────────────────────
|
|
|
|
def recurrence_for_service(
    service: str,
    window_days: int = 7,
    policy: Optional[Dict] = None,
    store = None,
) -> Dict:
    """
    Focused recurrence analysis for a single service.

    Returns the same shape as detect_recurrence() but filtered to incidents
    whose "service" equals *service* (an empty service string disables the
    filter and analyzes the full window).

    Args:
        service: Service name to filter on.
        window_days: Look-back window in days.
        policy: Intelligence policy dict; loaded from disk when None.
        store: Incident store; resolved via get_incident_store() when None.

    Returns:
        Dict with totals, severity distribution, max severity seen, top
        signatures/kinds, and high/warn recurrence buckets.
    """
    if policy is None:
        policy = load_intel_policy()
    if store is None:
        from incident_store import get_incident_store
        store = get_incident_store()

    rec_cfg = policy.get("recurrence", {})
    thresholds = rec_cfg.get("thresholds", {})
    sig_thresh = thresholds.get("signature", {"warn": 3, "high": 6})
    kind_thresh = thresholds.get("kind", {"warn": 5, "high": 10})
    top_n = int(rec_cfg.get("top_n", 15))

    incidents = _incidents_in_window(store, window_days)
    if service:
        incidents = [i for i in incidents if i.get("service", "") == service]

    # Frequency tables for this service only.
    sig_count: Dict[str, Dict] = {}
    kind_count: Dict[str, Dict] = {}
    # Module-level `from collections import defaultdict` (see file head);
    # the previous in-function re-import was redundant.
    sev_count: Dict[str, int] = defaultdict(int)

    for inc in incidents:
        fields = incident_key_fields(inc)
        sig = fields["signature"]
        kind = fields["kind"]
        sev = fields["severity"]
        started_at = fields["started_at"]

        sev_count[sev] += 1

        if sig:
            entry = sig_count.get(sig)
            if entry is None:
                entry = sig_count[sig] = {
                    "count": 0,
                    "services": {service},
                    "last_seen": "",
                    "severity_min": sev,
                }
            entry["count"] += 1
            if started_at > entry["last_seen"]:
                entry["last_seen"] = started_at
            # FIX: keep the most severe (lowest-ranked) severity seen for this
            # signature; previously it was frozen at the first incident's
            # severity despite the "severity_min" name.
            if severity_rank(sev) < severity_rank(entry["severity_min"]):
                entry["severity_min"] = sev

        if kind and kind != "custom":
            if kind not in kind_count:
                kind_count[kind] = {"count": 0, "services": {service}}
            kind_count[kind]["count"] += 1

    top_sigs = sorted(
        [{"signature": k, "count": v["count"], "services": sorted(v["services"]),
          "last_seen": v["last_seen"], "severity_min": v["severity_min"]}
         for k, v in sig_count.items()],
        key=lambda x: -x["count"],
    )[:top_n]

    top_kinds = sorted(
        [{"kind": k, "count": v["count"], "services": sorted(v["services"])}
         for k, v in kind_count.items()],
        key=lambda x: -x["count"],
    )[:top_n]

    # Partition into high / warn bands by count thresholds.
    high_sigs = [s for s in top_sigs if s["count"] >= sig_thresh.get("high", 6)]
    warn_sigs = [s for s in top_sigs
                 if sig_thresh.get("warn", 3) <= s["count"] < sig_thresh.get("high", 6)]
    high_kinds = [k for k in top_kinds if k["count"] >= kind_thresh.get("high", 10)]
    warn_kinds = [k for k in top_kinds
                  if kind_thresh.get("warn", 5) <= k["count"] < kind_thresh.get("high", 10)]

    # Max severity seen across the window (lower severity_rank = more severe).
    max_sev = "P3"
    for inc in incidents:
        sev = inc.get("severity", "P3")
        if severity_rank(sev) < severity_rank(max_sev):
            max_sev = sev

    return {
        "service": service,
        "window_days": window_days,
        "total_incidents": len(incidents),
        "severity_distribution": dict(sev_count),
        "max_severity_seen": max_sev,
        "top_signatures": top_sigs,
        "top_kinds": top_kinds,
        "high_recurrence": {"signatures": high_sigs, "kinds": high_kinds},
        "warn_recurrence": {"signatures": warn_sigs, "kinds": warn_kinds},
    }
|