""" tests/test_monitor_status.py — Tests for GET /monitor/status on router. Covers: - test_monitor_status_basic : returns required fields, router_ok=True - test_monitor_status_no_secrets : no DSN/password/key in response - test_monitor_status_rbac_prod : 403 when wrong key in prod - test_monitor_status_rbac_dev : 200 even without key in dev - test_monitor_status_partial_fail : returns data even if incidents/slo unavailable - test_monitor_status_rate_limit : 429 after 60 rpm - test_healthz_alias : /healthz returns same as /health """ from __future__ import annotations import json import os import sys import importlib import unittest from pathlib import Path from unittest.mock import MagicMock, patch # Ensure router modules are importable sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "services" / "router")) # ── Minimal FastAPI test client setup ───────────────────────────────────────── def _make_test_client(): """Import router main and return TestClient (skips if deps missing).""" try: from fastapi.testclient import TestClient import main as router_main return TestClient(router_main.app, raise_server_exceptions=False) except Exception as exc: return None, str(exc) # ── Helpers ─────────────────────────────────────────────────────────────────── def _call_monitor(client, headers=None): return client.get("/monitor/status", headers=headers or {}) def _call_health(client): return client.get("/health") def _call_healthz(client): return client.get("/healthz") # ───────────────────────────────────────────────────────────────────────────── # Tests # ───────────────────────────────────────────────────────────────────────────── class TestMonitorStatusBasic(unittest.TestCase): def setUp(self): result = _make_test_client() if isinstance(result, tuple): self.skipTest(f"Cannot import router main: {result[1]}") self.client = result def test_returns_200(self): r = _call_monitor(self.client) self.assertEqual(r.status_code, 200, r.text[:200]) def test_required_fields_present(self): r = _call_monitor(self.client) d = r.json() required = ["node_id", "ts", "heartbeat_age_s", "router_ok", "backends"] for field in required: self.assertIn(field, d, f"missing field: {field}") def test_router_ok_true(self): """Router self-reports as OK if endpoint responds.""" r = _call_monitor(self.client) d = r.json() self.assertTrue(d["router_ok"]) def test_backends_has_alerts(self): r = _call_monitor(self.client) be = r.json().get("backends", {}) self.assertIn("alerts", be) self.assertIn("incidents", be) def test_heartbeat_age_nonnegative(self): r = _call_monitor(self.client) age = r.json().get("heartbeat_age_s") self.assertIsNotNone(age) self.assertGreaterEqual(age, 0) def test_warnings_is_list(self): r = _call_monitor(self.client) warnings = r.json().get("warnings", []) self.assertIsInstance(warnings, list) class TestMonitorStatusNoSecrets(unittest.TestCase): """Ensure no DSN, passwords, or keys leak in the response.""" FORBIDDEN_PATTERNS = [ "postgresql://", "postgres://", "mongodb://", "mysql://", "password", "passwd", "secret", "dsn", "DATABASE_URL", "QDRANT_", "AWS_SECRET", "API_KEY=", "token=", ] def setUp(self): result = _make_test_client() if isinstance(result, tuple): self.skipTest(f"Cannot import router main: {result[1]}") self.client = result def test_no_secrets_in_response(self): r = _call_monitor(self.client) body = r.text.lower() for pat in self.FORBIDDEN_PATTERNS: self.assertNotIn(pat.lower(), body, f"Potential secret pattern '{pat}' found in /monitor/status response") def test_backends_values_are_short_identifiers(self): """Backend values should be short labels like 'postgres', 'auto', 'memory' — not DSNs.""" r = _call_monitor(self.client) backends = r.json().get("backends", {}) for key, value in backends.items(): if value and value != "unknown": self.assertLess(len(str(value)), 64, f"backends.{key} value looks too long (possible DSN): {value[:80]}") self.assertNotIn("://", str(value), f"backends.{key} contains URL scheme (possible DSN): {value[:80]}") class TestMonitorStatusRBAC(unittest.TestCase): def setUp(self): result = _make_test_client() if isinstance(result, tuple): self.skipTest(f"Cannot import router main: {result[1]}") self.client = result def test_dev_no_key_returns_200(self): """In dev env (no API key set), /monitor/status is accessible without auth.""" with patch.dict(os.environ, {"ENV": "dev", "SUPERVISOR_API_KEY": ""}): r = _call_monitor(self.client) self.assertEqual(r.status_code, 200) def test_prod_no_key_still_200_when_no_key_configured(self): """If SUPERVISOR_API_KEY is not set, even prod allows access (graceful).""" with patch.dict(os.environ, {"ENV": "prod", "SUPERVISOR_API_KEY": ""}): r = _call_monitor(self.client) self.assertEqual(r.status_code, 200) def test_prod_wrong_key_returns_403(self): """In prod with a configured API key, wrong key → 403.""" with patch.dict(os.environ, {"ENV": "prod", "SUPERVISOR_API_KEY": "secret-key-123"}): r = _call_monitor(self.client, headers={"X-Monitor-Key": "wrong-key"}) self.assertEqual(r.status_code, 403) def test_prod_correct_key_returns_200(self): """In prod, correct X-Monitor-Key → 200.""" with patch.dict(os.environ, {"ENV": "prod", "SUPERVISOR_API_KEY": "secret-key-123"}): r = _call_monitor(self.client, headers={"X-Monitor-Key": "secret-key-123"}) self.assertEqual(r.status_code, 200) def test_prod_bearer_token_accepted(self): """In prod, Authorization: Bearer is also accepted.""" with patch.dict(os.environ, {"ENV": "prod", "SUPERVISOR_API_KEY": "secret-key-123"}): r = _call_monitor(self.client, headers={"Authorization": "Bearer secret-key-123"}) self.assertEqual(r.status_code, 200) class TestMonitorStatusPartialFail(unittest.TestCase): """Even if incidents or SLO store fails, /monitor/status must return 200 with partial data.""" def setUp(self): result = _make_test_client() if isinstance(result, tuple): self.skipTest(f"Cannot import router main: {result[1]}") self.client = result def test_incident_store_error_is_non_fatal(self): """If incident_store raises, open_incidents is None and warning is added.""" with patch.dict("sys.modules", {"incident_store": None}): # Force import error on incident_store within the endpoint r = _call_monitor(self.client) # Must still return 200 self.assertEqual(r.status_code, 200) d = r.json() # open_incidents can be null but endpoint must not crash self.assertIn("open_incidents", d) def test_alert_store_error_is_non_fatal(self): """If alert_store.compute_loop_slo raises, alerts_loop_slo is None and warning added.""" with patch.dict("sys.modules", {"alert_store": None}): r = _call_monitor(self.client) self.assertEqual(r.status_code, 200) d = r.json() self.assertIn("alerts_loop_slo", d) def test_partial_data_has_warnings(self): """When stores are unavailable, warnings list should be non-empty.""" # Simulate both stores failing by patching the imports inside the function import main as router_main orig_get_is = None orig_get_as = None try: import incident_store as _is_mod orig_get_is = _is_mod.get_incident_store def _bad_is(): raise RuntimeError("simulated incident_store failure") _is_mod.get_incident_store = _bad_is except ImportError: pass try: import alert_store as _as_mod orig_get_as = _as_mod.get_alert_store def _bad_as(): raise RuntimeError("simulated alert_store failure") _as_mod.get_alert_store = _bad_as except ImportError: pass try: r = _call_monitor(self.client) self.assertEqual(r.status_code, 200) d = r.json() warnings = d.get("warnings", []) self.assertIsInstance(warnings, list) finally: # Restore try: if orig_get_is: import incident_store as _is_mod _is_mod.get_incident_store = orig_get_is if orig_get_as: import alert_store as _as_mod _as_mod.get_alert_store = orig_get_as except Exception: pass class TestMonitorStatusRateLimit(unittest.TestCase): def setUp(self): result = _make_test_client() if isinstance(result, tuple): self.skipTest(f"Cannot import router main: {result[1]}") self.client = result def test_rate_limit_after_60_requests(self): """After 60 requests from same IP within 60s, should get 429.""" import main as router_main # Reset the rate bucket for test isolation if hasattr(router_main.monitor_status, "_buckets"): router_main.monitor_status._buckets.clear() # Fire 60 — all should pass for i in range(60): r = _call_monitor(self.client) self.assertIn(r.status_code, (200, 403), f"Expected 200/403 on request {i+1}, got {r.status_code}") # 61st should be rate limited r = _call_monitor(self.client) self.assertEqual(r.status_code, 429, "Expected 429 after 60 rpm") def tearDown(self): # Always reset bucket after test try: import main as router_main if hasattr(router_main.monitor_status, "_buckets"): router_main.monitor_status._buckets.clear() except Exception: pass class TestHealthzAlias(unittest.TestCase): """GET /healthz should return same structure as GET /health.""" def setUp(self): result = _make_test_client() if isinstance(result, tuple): self.skipTest(f"Cannot import router main: {result[1]}") self.client = result def test_healthz_returns_200(self): r = _call_healthz(self.client) self.assertEqual(r.status_code, 200) def test_healthz_has_status_ok(self): r = _call_healthz(self.client) self.assertEqual(r.json().get("status"), "ok") def test_healthz_same_fields_as_health(self): rh = _call_health(self.client) rz = _call_healthz(self.client) health_keys = set(rh.json().keys()) healthz_keys = set(rz.json().keys()) self.assertEqual(health_keys, healthz_keys, f"healthz keys differ from health: {health_keys ^ healthz_keys}") class TestMonitorRbacMatrixEntitlement(unittest.TestCase): """Verify rbac_tools_matrix.yml contains tools.monitor.read in correct roles.""" def _load_matrix(self): import yaml as _yaml paths = [ Path(__file__).resolve().parent.parent / "config" / "rbac_tools_matrix.yml", Path("config/rbac_tools_matrix.yml"), ] for p in paths: if p.exists(): with open(p) as f: return _yaml.safe_load(f) self.skipTest("rbac_tools_matrix.yml not found") def test_monitor_tool_defined(self): m = self._load_matrix() tools = m.get("tools", {}) self.assertIn("monitor_tool", tools, "monitor_tool missing from rbac matrix") def test_monitor_status_action_has_entitlement(self): m = self._load_matrix() ents = ( m.get("tools", {}) .get("monitor_tool", {}) .get("actions", {}) .get("status", {}) .get("entitlements", []) ) self.assertIn("tools.monitor.read", ents) def test_agent_cto_has_monitor_read(self): m = self._load_matrix() cto_ents = m.get("role_entitlements", {}).get("agent_cto", []) self.assertIn("tools.monitor.read", cto_ents) def test_agent_monitor_has_monitor_read(self): m = self._load_matrix() ents = m.get("role_entitlements", {}).get("agent_monitor", []) self.assertIn("tools.monitor.read", ents) def test_agent_oncall_has_monitor_read(self): m = self._load_matrix() ents = m.get("role_entitlements", {}).get("agent_oncall", []) self.assertIn("tools.monitor.read", ents) if __name__ == "__main__": unittest.main(verbosity=2)