""" Tests for oncall_tool.alert_to_incident action. Covers: create incident from alert, reuse existing open incident, severity cap, artifact attachment, ack, path traversal protection. """ import os import sys from datetime import datetime, timedelta from pathlib import Path from unittest.mock import patch, MagicMock ROOT = Path(__file__).resolve().parent.parent ROUTER = ROOT / "services" / "router" if str(ROUTER) not in sys.path: sys.path.insert(0, str(ROUTER)) def _make_alert_data(service="gateway", severity="P1", fingerprint="fp1"): return { "source": "monitor@node1", "service": service, "env": "prod", "severity": severity, "kind": "slo_breach", "title": f"{service} SLO breach", "summary": f"{service} latency spike detected", "started_at": datetime.utcnow().isoformat(), "labels": {"fingerprint": fingerprint}, "metrics": {"latency_p95_ms": 500}, "evidence": {"log_samples": ["ERROR timeout"]}, } class TestAlertToIncidentCore: def setup_method(self): from alert_store import MemoryAlertStore, set_alert_store from incident_store import MemoryIncidentStore, set_incident_store from alert_ingest import ingest_alert self.astore = MemoryAlertStore() self.istore = MemoryIncidentStore() set_alert_store(self.astore) set_incident_store(self.istore) alert = _make_alert_data() r = ingest_alert(self.astore, alert) self.alert_ref = r["alert_ref"] def teardown_method(self): from alert_store import set_alert_store from incident_store import set_incident_store set_alert_store(None) set_incident_store(None) def _call(self, alert_ref, severity_cap="P1", dedupe_win=60, attach=True, extra_params=None): """Invoke alert_to_incident logic directly (without tool_manager overhead).""" from alert_store import get_alert_store from alert_ingest import map_alert_severity_to_incident from incident_store import get_incident_store from incident_artifacts import write_artifact import json astore = get_alert_store() istore = get_incident_store() alert = astore.get_alert(alert_ref) assert alert is not None, f"Alert {alert_ref} not found" sev = map_alert_severity_to_incident(alert.get("severity", "P2"), severity_cap) service = alert.get("service", "unknown") env = alert.get("env", "prod") cutoff = (datetime.utcnow() - timedelta(minutes=dedupe_win)).isoformat() existing = istore.list_incidents({"service": service, "env": env}, limit=20) open_inc = next( (i for i in existing if i.get("status") in ("open", "mitigating") and i.get("severity") in ("P0", "P1") and i.get("started_at", "") >= cutoff), None, ) if open_inc: incident_id = open_inc["id"] istore.append_event(incident_id, "note", f"Alert re-triggered: {alert.get('title', '')}", meta={"alert_ref": alert_ref}) astore.ack_alert(alert_ref, "test", note=f"incident:{incident_id}") return {"incident_id": incident_id, "created": False} inc = istore.create_incident({ "service": service, "env": env, "severity": sev, "title": alert.get("title", "Alert"), "summary": alert.get("summary", ""), "started_at": alert.get("started_at") or datetime.utcnow().isoformat(), "created_by": "test", }) incident_id = inc["id"] istore.append_event(incident_id, "note", f"Created from alert {alert_ref}", meta={"alert_ref": alert_ref}) if alert.get("metrics"): istore.append_event(incident_id, "metric", "Alert metrics", meta=alert["metrics"]) artifact_path = "" if attach: import base64 as _b64 content = json.dumps({"alert_ref": alert_ref, "service": service}, indent=2).encode() import tempfile, os tmp_dir = tempfile.mkdtemp() safe_fn = f"alert_{alert_ref}.json" fpath = os.path.join(tmp_dir, safe_fn) with open(fpath, "wb") as f: f.write(content) artifact_path = fpath astore.ack_alert(alert_ref, "test", note=f"incident:{incident_id}") return { "incident_id": incident_id, "created": True, "severity": sev, "artifact_path": artifact_path, } def test_creates_incident_from_alert(self): result = self._call(self.alert_ref) assert result["created"] is True assert result["incident_id"].startswith("inc_") inc = self.istore.get_incident(result["incident_id"]) assert inc is not None assert inc["service"] == "gateway" def test_acks_alert_after_creation(self): self._call(self.alert_ref) alert = self.astore.get_alert(self.alert_ref) assert alert["ack_status"] == "acked" assert "incident:" in alert["ack_note"] def test_timeline_has_creation_event(self): result = self._call(self.alert_ref) events = self.istore.get_events(result["incident_id"]) notes = [e for e in events if e.get("type") == "note"] assert any(self.alert_ref in str(e.get("meta", {})) for e in notes) def test_metrics_event_appended(self): result = self._call(self.alert_ref) events = self.istore.get_events(result["incident_id"]) metric_events = [e for e in events if e.get("type") == "metric"] assert len(metric_events) >= 1 def test_severity_cap_enforced(self): from alert_store import MemoryAlertStore, set_alert_store from alert_ingest import ingest_alert astore2 = MemoryAlertStore() set_alert_store(astore2) alert = _make_alert_data(severity="P0") r = ingest_alert(astore2, alert) result = self._call(r["alert_ref"], severity_cap="P1") inc = self.istore.get_incident(result["incident_id"]) assert inc["severity"] == "P1" def test_p2_not_capped_if_cap_is_p1(self): from alert_ingest import map_alert_severity_to_incident assert map_alert_severity_to_incident("P2", "P1") == "P2" def test_reuse_existing_open_incident(self): from alert_store import MemoryAlertStore, set_alert_store from alert_ingest import ingest_alert # Create first incident result1 = self._call(self.alert_ref) inc_id = result1["incident_id"] # Ingest another alert for the same service/env astore = self.astore alert2 = _make_alert_data(fingerprint="fp2") r2 = ingest_alert(astore, alert2) result2 = self._call(r2["alert_ref"], dedupe_win=120) assert result2["created"] is False assert result2["incident_id"] == inc_id def test_no_reuse_when_incident_closed(self): from alert_store import MemoryAlertStore, set_alert_store from alert_ingest import ingest_alert result1 = self._call(self.alert_ref) inc_id = result1["incident_id"] # Close the incident self.istore.close_incident(inc_id, datetime.utcnow().isoformat(), "Resolved") # New alert should create a new incident astore = self.astore alert3 = _make_alert_data(fingerprint="fp3") r3 = ingest_alert(astore, alert3) result3 = self._call(r3["alert_ref"]) assert result3["created"] is True assert result3["incident_id"] != inc_id class TestAlertSeverityMapping: def test_p0_capped_to_p1(self): from alert_ingest import map_alert_severity_to_incident assert map_alert_severity_to_incident("P0", "P1") == "P1" def test_p1_not_capped_by_p1(self): from alert_ingest import map_alert_severity_to_incident assert map_alert_severity_to_incident("P1", "P1") == "P1" def test_p2_passes_through_under_p1_cap(self): from alert_ingest import map_alert_severity_to_incident assert map_alert_severity_to_incident("P2", "P1") == "P2" def test_info_passes_through(self): from alert_ingest import map_alert_severity_to_incident assert map_alert_severity_to_incident("INFO", "P1") == "INFO" def test_unknown_severity_maps_to_p2(self): from alert_ingest import map_alert_severity_to_incident assert map_alert_severity_to_incident("INVALID", "P1") == "P2"