""" tests/test_incident_log.py ─────────────────────────── Tests for incident_store, incident_artifacts, and oncall_tool incident CRUD. """ from __future__ import annotations import base64 import json import os import sys import tempfile from pathlib import Path from typing import Dict from unittest.mock import AsyncMock, MagicMock, patch import pytest ROUTER = Path(__file__).resolve().parent.parent / "services" / "router" if str(ROUTER) not in sys.path: sys.path.insert(0, str(ROUTER)) # ─── incident_store tests ──────────────────────────────────────────────────── class TestMemoryIncidentStore: def setup_method(self): from incident_store import MemoryIncidentStore self.store = MemoryIncidentStore() def test_create_and_get_incident(self): inc = self.store.create_incident({ "service": "router", "severity": "P1", "title": "Router is down", "started_at": "2026-02-23T10:00:00Z", "created_by": "sofiia", }) assert inc["id"].startswith("inc_") assert inc["status"] == "open" assert inc["service"] == "router" fetched = self.store.get_incident(inc["id"]) assert fetched is not None assert fetched["id"] == inc["id"] assert "events" in fetched assert "artifacts" in fetched def test_list_incidents_with_filters(self): self.store.create_incident({"service": "router", "severity": "P1", "title": "A", "created_by": "x"}) self.store.create_incident({"service": "gateway", "severity": "P2", "title": "B", "created_by": "x"}) self.store.create_incident({"service": "router", "severity": "P2", "title": "C", "created_by": "x"}) all_inc = self.store.list_incidents() assert len(all_inc) == 3 router_only = self.store.list_incidents({"service": "router"}) assert len(router_only) == 2 p1_only = self.store.list_incidents({"severity": "P1"}) assert len(p1_only) == 1 def test_close_incident(self): inc = self.store.create_incident({"service": "router", "title": "Down", "created_by": "x"}) result = self.store.close_incident(inc["id"], "2026-02-23T12:00:00Z", "Restarted service") assert result is not None assert result["status"] == "closed" assert result["ended_at"] == "2026-02-23T12:00:00Z" events = self.store.get_events(inc["id"]) assert any(e["type"] == "status_change" for e in events) def test_close_nonexistent_returns_none(self): result = self.store.close_incident("inc_nonexistent", "", "") assert result is None def test_append_event(self): inc = self.store.create_incident({"service": "x", "title": "T", "created_by": "x"}) ev = self.store.append_event(inc["id"], "note", "Investigating logs") assert ev is not None assert ev["type"] == "note" assert "Investigating" in ev["message"] def test_append_event_nonexistent_returns_none(self): result = self.store.append_event("inc_nonexistent", "note", "msg") assert result is None def test_add_artifact(self): inc = self.store.create_incident({"service": "x", "title": "T", "created_by": "x"}) art = self.store.add_artifact(inc["id"], "triage_report", "json", "/path/to/file", "abc123", 1024) assert art is not None assert art["kind"] == "triage_report" artifacts = self.store.get_artifacts(inc["id"]) assert len(artifacts) == 1 def test_message_redaction(self): inc = self.store.create_incident({"service": "x", "title": "T", "created_by": "x"}) ev = self.store.append_event(inc["id"], "note", "Found token=sk-12345 in logs") assert "sk-12345" not in ev["message"] assert "token=***" in ev["message"] def test_full_lifecycle(self): """create → append events → attach artifact → close → get""" inc = self.store.create_incident({ "service": "gateway", "severity": "P0", "title": "Gateway OOM", "started_at": "2026-02-23T08:00:00Z", "created_by": "sofiia", }) self.store.append_event(inc["id"], "note", "Memory usage spiking") self.store.append_event(inc["id"], "action", "Restarting gateway pods") self.store.add_artifact(inc["id"], "triage_report", "json", "/tmp/triage.json", "sha", 500) self.store.close_incident(inc["id"], "2026-02-23T09:30:00Z", "OOM caused by memory leak in v2.3.1") final = self.store.get_incident(inc["id"]) assert final["status"] == "closed" assert len(final["events"]) >= 3 # 2 notes + 1 status_change assert len(final["artifacts"]) == 1 class TestJsonlIncidentStore: def test_create_and_get(self, tmp_path): from incident_store import JsonlIncidentStore store = JsonlIncidentStore(str(tmp_path)) inc = store.create_incident({"service": "svc", "title": "Test", "created_by": "x"}) fetched = store.get_incident(inc["id"]) assert fetched is not None assert fetched["service"] == "svc" def test_append_event_and_list(self, tmp_path): from incident_store import JsonlIncidentStore store = JsonlIncidentStore(str(tmp_path)) inc = store.create_incident({"service": "svc", "title": "T", "created_by": "x"}) store.append_event(inc["id"], "note", "test message") events = store.get_events(inc["id"]) assert len(events) == 1 assert events[0]["type"] == "note" def test_close_and_reopen(self, tmp_path): from incident_store import JsonlIncidentStore store = JsonlIncidentStore(str(tmp_path)) inc = store.create_incident({"service": "svc", "title": "T", "created_by": "x"}) store.close_incident(inc["id"], "2026-02-23T12:00:00Z", "Fixed") fetched = store.get_incident(inc["id"]) assert fetched["status"] == "closed" # ─── incident_artifacts tests ──────────────────────────────────────────────── class TestIncidentArtifacts: def test_write_artifact(self, tmp_path): from incident_artifacts import write_artifact content = b'{"summary": "test postmortem"}' result = write_artifact("inc_test_001", "postmortem_draft.json", content, base_dir=str(tmp_path)) assert result["size_bytes"] == len(content) assert result["sha256"] assert "inc_test_001" in result["path"] assert (tmp_path / "inc_test_001" / "postmortem_draft.json").exists() def test_write_artifact_md(self, tmp_path): from incident_artifacts import write_artifact content = b"# Postmortem\n\nSummary here." result = write_artifact("inc_test_002", "postmortem.md", content, base_dir=str(tmp_path)) assert result["size_bytes"] == len(content) def test_write_artifact_path_traversal_blocked(self, tmp_path): from incident_artifacts import write_artifact with pytest.raises(ValueError, match="Invalid incident_id"): write_artifact("../etc/passwd", "test.json", b"{}", base_dir=str(tmp_path)) def test_write_artifact_format_blocked(self, tmp_path): from incident_artifacts import write_artifact with pytest.raises(ValueError, match="not allowed"): write_artifact("inc_001", "script.py", b"import os", base_dir=str(tmp_path)) def test_write_artifact_too_large(self, tmp_path): from incident_artifacts import write_artifact big = b"x" * (3 * 1024 * 1024) # 3MB with pytest.raises(ValueError, match="too large"): write_artifact("inc_001", "big.json", big, base_dir=str(tmp_path)) def test_decode_content_valid(self): from incident_artifacts import decode_content original = b"hello world" encoded = base64.b64encode(original).decode("ascii") assert decode_content(encoded) == original def test_decode_content_invalid(self): from incident_artifacts import decode_content with pytest.raises(ValueError, match="Invalid base64"): decode_content("not-valid-base64!!!") # ─── RBAC tests ────────────────────────────────────────────────────────────── class TestIncidentRBAC: """Test that monitor/aistalk roles cannot write incidents.""" def test_monitor_role_is_read_only(self): """monitor role should NOT have incident_write entitlement.""" import yaml rbac_path = Path(__file__).parent.parent / "config" / "rbac_tools_matrix.yml" with open(rbac_path) as f: rbac = yaml.safe_load(f) monitor_ents = rbac.get("role_entitlements", {}).get("agent_monitor", []) assert "tools.oncall.incident_write" not in monitor_ents assert "tools.oncall.read" in monitor_ents def test_interface_role_is_read_only(self): """agent_interface (AISTALK) should have only read.""" import yaml rbac_path = Path(__file__).parent.parent / "config" / "rbac_tools_matrix.yml" with open(rbac_path) as f: rbac = yaml.safe_load(f) interface_ents = rbac.get("role_entitlements", {}).get("agent_interface", []) assert "tools.oncall.incident_write" not in interface_ents assert "tools.oncall.read" in interface_ents def test_cto_has_write(self): """agent_cto (sofiia) should have incident_write.""" import yaml rbac_path = Path(__file__).parent.parent / "config" / "rbac_tools_matrix.yml" with open(rbac_path) as f: rbac = yaml.safe_load(f) cto_ents = rbac.get("role_entitlements", {}).get("agent_cto", []) assert "tools.oncall.incident_write" in cto_ents def test_oncall_has_write(self): """agent_oncall (helion) should have incident_write.""" import yaml rbac_path = Path(__file__).parent.parent / "config" / "rbac_tools_matrix.yml" with open(rbac_path) as f: rbac = yaml.safe_load(f) oncall_ents = rbac.get("role_entitlements", {}).get("agent_oncall", []) assert "tools.oncall.incident_write" in oncall_ents # ─── Agent role mapping tests ──────────────────────────────────────────────── class TestAgentRoleMapping: def test_monitor_maps_to_agent_monitor(self): import yaml rollout_path = Path(__file__).parent.parent / "config" / "tools_rollout.yml" with open(rollout_path) as f: rollout = yaml.safe_load(f) assert rollout["agent_roles"]["monitor"] == "agent_monitor" def test_aistalk_maps_to_agent_interface(self): import yaml rollout_path = Path(__file__).parent.parent / "config" / "tools_rollout.yml" with open(rollout_path) as f: rollout = yaml.safe_load(f) assert rollout["agent_roles"]["aistalk"] == "agent_interface" def test_sofiia_still_cto(self): import yaml rollout_path = Path(__file__).parent.parent / "config" / "tools_rollout.yml" with open(rollout_path) as f: rollout = yaml.safe_load(f) assert rollout["agent_roles"]["sofiia"] == "agent_cto"