"""
|
|
tests/test_risk_history_store.py — Unit tests for RiskHistoryStore backends.
|
|
|
|
Tests:
|
|
- write/get_latest/get_series/get_delta (Memory backend)
|
|
- retention cleanup
|
|
- AutoRiskHistoryStore memory fallback on Postgres error
|
|
"""
|
|
import datetime
|
|
import sys
|
|
import pytest
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "services" / "router"))
|
|
|
|
from risk_history_store import (
|
|
RiskSnapshot,
|
|
MemoryRiskHistoryStore,
|
|
NullRiskHistoryStore,
|
|
AutoRiskHistoryStore,
|
|
set_risk_history_store,
|
|
)
|
|
|
|
|
|
def _snap(service, env, score, band, hours_ago=0) -> RiskSnapshot:
    """Build a RiskSnapshot timestamped *hours_ago* hours in the past (UTC, naive ISO)."""
    when = datetime.datetime.utcnow() - datetime.timedelta(hours=hours_ago)
    return RiskSnapshot(
        ts=when.isoformat(),
        service=service,
        env=env,
        score=score,
        band=band,
    )
# ─── MemoryRiskHistoryStore ───────────────────────────────────────────────────

class TestMemoryStore:
    """In-memory backend: writes, latest/series/delta reads, retention cleanup."""

    def test_write_and_get_latest(self):
        """A written snapshot is returned by get_latest with its fields intact."""
        store = MemoryRiskHistoryStore()
        snap = _snap("gateway", "prod", 55, "high")
        store.write_snapshot([snap])
        result = store.get_latest("gateway", "prod")
        assert result is not None
        assert result.score == 55
        assert result.service == "gateway"

    def test_get_latest_none_if_empty(self):
        """An empty store has no latest snapshot."""
        store = MemoryRiskHistoryStore()
        assert store.get_latest("gateway", "prod") is None

    def test_get_latest_returns_most_recent(self):
        """get_latest picks the newest of several snapshots for the same key."""
        store = MemoryRiskHistoryStore()
        store.write_snapshot([
            _snap("gateway", "prod", 30, "medium", hours_ago=5),
            _snap("gateway", "prod", 60, "high", hours_ago=1),
            _snap("gateway", "prod", 70, "high", hours_ago=0),
        ])
        latest = store.get_latest("gateway", "prod")
        assert latest.score == 70

    def test_get_series_filters_by_hours(self):
        """get_series excludes snapshots older than the requested window."""
        store = MemoryRiskHistoryStore()
        store.write_snapshot([
            _snap("gateway", "prod", 20, "low", hours_ago=100),  # outside window
            _snap("gateway", "prod", 40, "medium", hours_ago=10),
            _snap("gateway", "prod", 60, "high", hours_ago=1),
        ])
        series = store.get_series("gateway", "prod", hours=24)
        assert len(series) == 2
        assert all(s.score in (40, 60) for s in series)

    def test_get_series_sorted_desc(self):
        """get_series returns newest-first ordering."""
        store = MemoryRiskHistoryStore()
        store.write_snapshot([
            _snap("gateway", "prod", 40, "medium", hours_ago=10),
            _snap("gateway", "prod", 60, "high", hours_ago=1),
        ])
        series = store.get_series("gateway", "prod", hours=48)
        assert series[0].score >= series[-1].score  # newest first

    def test_get_delta_computes_difference(self):
        """Delta is latest score minus the baseline from before the window."""
        store = MemoryRiskHistoryStore()
        store.write_snapshot([
            _snap("gateway", "prod", 30, "medium", hours_ago=25),  # baseline
            _snap("gateway", "prod", 55, "high", hours_ago=1),     # latest
        ])
        delta = store.get_delta("gateway", "prod", hours=24)
        assert delta == 25  # 55 - 30

    def test_get_delta_none_if_no_baseline(self):
        """Without a snapshot older than the window there is no baseline → None."""
        store = MemoryRiskHistoryStore()
        store.write_snapshot([_snap("gateway", "prod", 55, "high", hours_ago=1)])
        # No snapshot before 24h ago
        delta = store.get_delta("gateway", "prod", hours=24)
        assert delta is None

    def test_get_delta_negative_when_improving(self):
        """A falling score yields a negative delta."""
        store = MemoryRiskHistoryStore()
        store.write_snapshot([
            _snap("gateway", "prod", 70, "high", hours_ago=25),
            _snap("gateway", "prod", 40, "medium", hours_ago=1),
        ])
        delta = store.get_delta("gateway", "prod", hours=24)
        assert delta == -30  # 40 - 70

    def test_dashboard_series_returns_latest_per_service(self):
        """dashboard_series lists each service once, filtered to the given env."""
        store = MemoryRiskHistoryStore()
        store.write_snapshot([
            _snap("gateway", "prod", 80, "critical", hours_ago=1),
            _snap("router", "prod", 40, "medium", hours_ago=2),
            _snap("gateway", "staging", 50, "medium", hours_ago=1),  # different env
        ])
        result = store.dashboard_series("prod", hours=24, top_n=10)
        services = [r["service"] for r in result]
        assert "gateway" in services
        assert "router" in services
        assert "staging" not in str(result)  # env filtered

    def test_dashboard_series_sorted_by_score(self):
        """dashboard_series orders entries by score, highest first."""
        store = MemoryRiskHistoryStore()
        store.write_snapshot([
            _snap("gateway", "prod", 80, "critical", hours_ago=1),
            _snap("router", "prod", 20, "low", hours_ago=2),
            _snap("memory-service", "prod", 50, "medium", hours_ago=3),
        ])
        result = store.dashboard_series("prod", hours=24, top_n=10)
        scores = [r["score"] for r in result]
        assert scores == sorted(scores, reverse=True)

    def test_cleanup_removes_old_records(self):
        """cleanup deletes records older than retention_days and reports the count."""
        store = MemoryRiskHistoryStore()
        store.write_snapshot([
            _snap("gateway", "prod", 30, "low", hours_ago=24 * 100),  # old
            _snap("gateway", "prod", 40, "medium", hours_ago=24 * 100),
            _snap("gateway", "prod", 60, "high", hours_ago=1),  # recent
        ])
        deleted = store.cleanup(retention_days=90)
        assert deleted == 2
        series = store.get_series("gateway", "prod", hours=24 * 200)
        assert len(series) == 1
        assert series[0].score == 60
# ─── NullRiskHistoryStore ─────────────────────────────────────────────────────

class TestNullStore:
    """No-op backend: every operation succeeds and returns an empty result."""

    def test_write_returns_zero(self):
        """Writes are discarded; the reported write count is 0."""
        store = NullRiskHistoryStore()
        assert store.write_snapshot([_snap("g", "prod", 50, "medium")]) == 0

    def test_get_latest_returns_none(self):
        """There is never a latest snapshot."""
        store = NullRiskHistoryStore()
        assert store.get_latest("gateway", "prod") is None

    def test_get_series_returns_empty(self):
        """Series queries always yield an empty list."""
        store = NullRiskHistoryStore()
        assert store.get_series("gateway", "prod") == []

    def test_cleanup_returns_zero(self):
        """Cleanup has nothing to delete."""
        store = NullRiskHistoryStore()
        assert store.cleanup() == 0
# ─── AutoRiskHistoryStore fallback ────────────────────────────────────────────

class TestAutoStoreFallback:
    """Auto backend: an unreachable Postgres DSN degrades to the memory buffer."""

    def test_postgres_error_falls_back_to_memory(self):
        """When Postgres raises, AutoStore uses memory buffer for reads."""
        auto = AutoRiskHistoryStore(pg_dsn="postgresql://bad:5432/none")
        snap = _snap("gateway", "prod", 55, "high")
        # Write — Postgres will fail, but memory buffer gets the snap
        auto.write_snapshot([snap])
        # get_latest — Postgres fails, falls back to memory
        result = auto.get_latest("gateway", "prod")
        assert result is not None
        assert result.score == 55

    def test_series_falls_back_to_memory(self):
        """get_series served from the memory buffer when Postgres is down."""
        auto = AutoRiskHistoryStore(pg_dsn="postgresql://bad:5432/none")
        snaps = [
            _snap("router", "prod", 40, "medium", hours_ago=2),
            _snap("router", "prod", 60, "high", hours_ago=1),
        ]
        auto.write_snapshot(snaps)
        series = auto.get_series("router", "prod", hours=24)
        assert len(series) == 2

    def test_get_delta_falls_back_to_memory(self):
        """get_delta computed from the memory buffer when Postgres is down."""
        auto = AutoRiskHistoryStore(pg_dsn="postgresql://bad:5432/none")
        auto.write_snapshot([
            _snap("gateway", "prod", 30, "medium", hours_ago=25),
            _snap("gateway", "prod", 55, "high", hours_ago=1),
        ])
        delta = auto.get_delta("gateway", "prod", hours=24)
        assert delta == 25
# ─── RiskSnapshot serialisation ──────────────────────────────────────────────

class TestRiskSnapshotSerde:
    """RiskSnapshot to_dict/from_dict round-trip preserves fields."""

    def test_to_dict_roundtrip(self):
        """to_dict exposes the fields; from_dict rebuilds an equivalent snapshot."""
        snap = _snap("gateway", "prod", 72, "high")
        d = snap.to_dict()
        assert d["service"] == "gateway"
        assert d["score"] == 72
        snap2 = RiskSnapshot.from_dict(d)
        assert snap2.score == 72
        assert snap2.band == "high"