""" tests/test_risk_history_store.py — Unit tests for RiskHistoryStore backends. Tests: - write/get_latest/get_series/get_delta (Memory backend) - retention cleanup - AutoRiskHistoryStore memory fallback on Postgres error """ import datetime import sys import pytest from pathlib import Path from unittest.mock import MagicMock, patch sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "services" / "router")) from risk_history_store import ( RiskSnapshot, MemoryRiskHistoryStore, NullRiskHistoryStore, AutoRiskHistoryStore, set_risk_history_store, ) def _snap(service, env, score, band, hours_ago=0) -> RiskSnapshot: ts = (datetime.datetime.utcnow() - datetime.timedelta(hours=hours_ago)).isoformat() return RiskSnapshot(ts=ts, service=service, env=env, score=score, band=band) # ─── MemoryRiskHistoryStore ─────────────────────────────────────────────────── class TestMemoryStore: def test_write_and_get_latest(self): store = MemoryRiskHistoryStore() snap = _snap("gateway", "prod", 55, "high") store.write_snapshot([snap]) result = store.get_latest("gateway", "prod") assert result is not None assert result.score == 55 assert result.service == "gateway" def test_get_latest_none_if_empty(self): store = MemoryRiskHistoryStore() assert store.get_latest("gateway", "prod") is None def test_get_latest_returns_most_recent(self): store = MemoryRiskHistoryStore() store.write_snapshot([ _snap("gateway", "prod", 30, "medium", hours_ago=5), _snap("gateway", "prod", 60, "high", hours_ago=1), _snap("gateway", "prod", 70, "high", hours_ago=0), ]) latest = store.get_latest("gateway", "prod") assert latest.score == 70 def test_get_series_filters_by_hours(self): store = MemoryRiskHistoryStore() store.write_snapshot([ _snap("gateway", "prod", 20, "low", hours_ago=100), # outside window _snap("gateway", "prod", 40, "medium", hours_ago=10), _snap("gateway", "prod", 60, "high", hours_ago=1), ]) series = store.get_series("gateway", "prod", hours=24) assert len(series) == 2 assert all(s.score in (40, 60) for s in series) def test_get_series_sorted_desc(self): store = MemoryRiskHistoryStore() store.write_snapshot([ _snap("gateway", "prod", 40, "medium", hours_ago=10), _snap("gateway", "prod", 60, "high", hours_ago=1), ]) series = store.get_series("gateway", "prod", hours=48) assert series[0].score >= series[-1].score # newest first def test_get_delta_computes_difference(self): store = MemoryRiskHistoryStore() store.write_snapshot([ _snap("gateway", "prod", 30, "medium", hours_ago=25), # baseline _snap("gateway", "prod", 55, "high", hours_ago=1), # latest ]) delta = store.get_delta("gateway", "prod", hours=24) assert delta == 25 # 55 - 30 def test_get_delta_none_if_no_baseline(self): store = MemoryRiskHistoryStore() store.write_snapshot([_snap("gateway", "prod", 55, "high", hours_ago=1)]) # No snapshot before 24h ago delta = store.get_delta("gateway", "prod", hours=24) assert delta is None def test_get_delta_negative_when_improving(self): store = MemoryRiskHistoryStore() store.write_snapshot([ _snap("gateway", "prod", 70, "high", hours_ago=25), _snap("gateway", "prod", 40, "medium", hours_ago=1), ]) delta = store.get_delta("gateway", "prod", hours=24) assert delta == -30 # 40 - 70 def test_dashboard_series_returns_latest_per_service(self): store = MemoryRiskHistoryStore() store.write_snapshot([ _snap("gateway", "prod", 80, "critical", hours_ago=1), _snap("router", "prod", 40, "medium", hours_ago=2), _snap("gateway", "staging", 50, "medium", hours_ago=1), # different env ]) result = store.dashboard_series("prod", hours=24, top_n=10) services = [r["service"] for r in result] assert "gateway" in services assert "router" in services assert "staging" not in str(result) # env filtered def test_dashboard_series_sorted_by_score(self): store = MemoryRiskHistoryStore() store.write_snapshot([ _snap("gateway", "prod", 80, "critical", hours_ago=1), _snap("router", "prod", 20, "low", hours_ago=2), _snap("memory-service", "prod", 50, "medium", hours_ago=3), ]) result = store.dashboard_series("prod", hours=24, top_n=10) scores = [r["score"] for r in result] assert scores == sorted(scores, reverse=True) def test_cleanup_removes_old_records(self): store = MemoryRiskHistoryStore() store.write_snapshot([ _snap("gateway", "prod", 30, "low", hours_ago=24 * 100), # old _snap("gateway", "prod", 40, "medium", hours_ago=24 * 100), _snap("gateway", "prod", 60, "high", hours_ago=1), # recent ]) deleted = store.cleanup(retention_days=90) assert deleted == 2 series = store.get_series("gateway", "prod", hours=24 * 200) assert len(series) == 1 assert series[0].score == 60 # ─── NullRiskHistoryStore ───────────────────────────────────────────────────── class TestNullStore: def test_write_returns_zero(self): store = NullRiskHistoryStore() assert store.write_snapshot([_snap("g", "prod", 50, "medium")]) == 0 def test_get_latest_returns_none(self): store = NullRiskHistoryStore() assert store.get_latest("gateway", "prod") is None def test_get_series_returns_empty(self): store = NullRiskHistoryStore() assert store.get_series("gateway", "prod") == [] def test_cleanup_returns_zero(self): store = NullRiskHistoryStore() assert store.cleanup() == 0 # ─── AutoRiskHistoryStore fallback ──────────────────────────────────────────── class TestAutoStoreFallback: def test_postgres_error_falls_back_to_memory(self): """When Postgres raises, AutoStore uses memory buffer for reads.""" auto = AutoRiskHistoryStore(pg_dsn="postgresql://bad:5432/none") snap = _snap("gateway", "prod", 55, "high") # Write — Postgres will fail, but memory buffer gets the snap auto.write_snapshot([snap]) # get_latest — Postgres fails, falls back to memory result = auto.get_latest("gateway", "prod") assert result is not None assert result.score == 55 def test_series_falls_back_to_memory(self): auto = AutoRiskHistoryStore(pg_dsn="postgresql://bad:5432/none") snaps = [ _snap("router", "prod", 40, "medium", hours_ago=2), _snap("router", "prod", 60, "high", hours_ago=1), ] auto.write_snapshot(snaps) series = auto.get_series("router", "prod", hours=24) assert len(series) == 2 def test_get_delta_falls_back_to_memory(self): auto = AutoRiskHistoryStore(pg_dsn="postgresql://bad:5432/none") auto.write_snapshot([ _snap("gateway", "prod", 30, "medium", hours_ago=25), _snap("gateway", "prod", 55, "high", hours_ago=1), ]) delta = auto.get_delta("gateway", "prod", hours=24) assert delta == 25 # ─── RiskSnapshot serialisation ────────────────────────────────────────────── class TestRiskSnapshotSerde: def test_to_dict_roundtrip(self): snap = _snap("gateway", "prod", 72, "high") d = snap.to_dict() assert d["service"] == "gateway" assert d["score"] == 72 snap2 = RiskSnapshot.from_dict(d) assert snap2.score == 72 assert snap2.band == "high"