""" incident_intel_utils.py — Data helpers for Incident Intelligence Layer. Provides: - kind extraction from incident (signature, meta, title heuristics) - normalized key fields dict - time-proximity helpers - safe truncation/masking No external dependencies beyond stdlib. """ from __future__ import annotations import datetime import re from typing import Any, Dict, Optional, Tuple # ─── Kind heuristics ────────────────────────────────────────────────────────── _TITLE_KIND_PATTERNS = [ (re.compile(r'\b(latency|slow|timeout|p9[5-9]|p100)\b', re.I), "latency"), (re.compile(r'\b(error.?rate|5xx|http.?error|exception)\b', re.I), "error_rate"), (re.compile(r'\b(slo.?breach|slo)\b', re.I), "slo_breach"), (re.compile(r'\b(oom|out.?of.?memory|memory.?pressure)\b', re.I), "oom"), (re.compile(r'\b(disk|storage|volume.?full|inode)\b', re.I), "disk"), (re.compile(r'\b(security|intrusion|cve|vuln|unauthorized)\b', re.I), "security"), (re.compile(r'\b(deploy|rollout|release|canary)\b', re.I), "deploy"), (re.compile(r'\b(crash.?loop|crashloop|restart)\b', re.I), "crashloop"), (re.compile(r'\b(queue|lag|consumer|backlog)\b', re.I), "queue"), (re.compile(r'\b(network|connectivity|dns|unreachable)\b', re.I), "network"), ] _KNOWN_KINDS = frozenset([ "slo_breach", "crashloop", "latency", "error_rate", "disk", "oom", "deploy", "security", "custom", "network", "queue", ]) def extract_kind(incident: Dict) -> str: """ Best-effort kind extraction. Priority: 1. incident.meta.kind (if present) 2. incident.meta.alert_kind 3. Title heuristics 4. 'custom' """ meta = incident.get("meta") or {} # Direct meta fields for key in ("kind", "alert_kind"): v = meta.get(key) if v and v in _KNOWN_KINDS: return v # Title heuristics title = incident.get("title", "") or "" for pat, kind_name in _TITLE_KIND_PATTERNS: if pat.search(title): return kind_name return "custom" def incident_key_fields(incident: Dict) -> Dict: """Return a normalized dict of key fields used for correlation.""" meta = incident.get("meta") or {} return { "id": incident.get("id", ""), "service": incident.get("service", ""), "env": incident.get("env", "prod"), "severity": incident.get("severity", "P2"), "status": incident.get("status", "open"), "started_at": incident.get("started_at", ""), "signature": meta.get("incident_signature", ""), "kind": extract_kind(incident), } # ─── Time helpers ───────────────────────────────────────────────────────────── def parse_iso(ts: str) -> Optional[datetime.datetime]: """Parse ISO timestamp string to datetime, returns None on failure.""" if not ts: return None try: return datetime.datetime.fromisoformat(ts.rstrip("Z").split("+")[0]) except (ValueError, AttributeError): return None def minutes_apart(ts_a: str, ts_b: str) -> Optional[float]: """Return absolute minutes between two ISO timestamps, or None.""" a = parse_iso(ts_a) b = parse_iso(ts_b) if a is None or b is None: return None return abs((a - b).total_seconds()) / 60.0 def incidents_within_minutes(inc_a: Dict, inc_b: Dict, within: float) -> bool: """Return True if two incidents started within `within` minutes of each other.""" gap = minutes_apart( inc_a.get("started_at", ""), inc_b.get("started_at", ""), ) return gap is not None and gap <= within # ─── Text helpers ───────────────────────────────────────────────────────────── def safe_truncate(text: str, max_chars: int = 200) -> str: if not text: return "" return text[:max_chars] + ("…" if len(text) > max_chars else "") def mask_signature(sig: str, prefix_len: int = 8) -> str: """Show only first N chars of a SHA-256 signature for readability.""" if not sig: return "" return sig[:prefix_len] def severity_rank(sev: str) -> int: """Lower = more severe.""" return {"P0": 0, "P1": 1, "P2": 2, "P3": 3, "INFO": 4}.get(sev, 5) def format_duration(started_at: str, ended_at: Optional[str]) -> str: """Human-readable duration string.""" a = parse_iso(started_at) if a is None: return "unknown" if ended_at: b = parse_iso(ended_at) if b: secs = (b - a).total_seconds() if secs < 60: return f"{int(secs)}s" if secs < 3600: return f"{int(secs / 60)}m" return f"{secs / 3600:.1f}h" return "ongoing"