New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
175 lines
6.8 KiB
Python
175 lines
6.8 KiB
Python
"""
|
|
tests/test_risk_trend.py — Unit tests for compute_trend and enrich helpers.
|
|
|
|
Tests:
|
|
- delta_24h / delta_7d computed correctly
|
|
- volatility computed from daily series
|
|
- regression flags set by policy thresholds
|
|
- enrich_risk_report_with_trend: non-fatal on store error
|
|
"""
|
|
import datetime
|
|
import sys
|
|
import pytest
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "services" / "router"))
|
|
|
|
from risk_engine import compute_trend, enrich_risk_report_with_trend, _reload_policy, _builtin_defaults
|
|
from risk_history_store import MemoryRiskHistoryStore, RiskSnapshot
|
|
|
|
|
|
def _reload():
|
|
_reload_policy()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_policy_cache():
|
|
_reload_policy()
|
|
yield
|
|
_reload_policy()
|
|
|
|
|
|
@pytest.fixture
|
|
def policy():
|
|
return _builtin_defaults()
|
|
|
|
|
|
def _snap(service, env, score, hours_ago=0) -> RiskSnapshot:
|
|
from risk_engine import score_to_band, _builtin_defaults
|
|
p = _builtin_defaults()
|
|
ts = (datetime.datetime.utcnow() - datetime.timedelta(hours=hours_ago)).isoformat()
|
|
band = score_to_band(score, p)
|
|
return RiskSnapshot(ts=ts, service=service, env=env, score=score, band=band)
|
|
|
|
|
|
# ─── compute_trend ────────────────────────────────────────────────────────────
|
|
|
|
class TestComputeTrend:
|
|
def test_empty_series_returns_nulls(self, policy):
|
|
trend = compute_trend([], policy=policy)
|
|
assert trend["delta_24h"] is None
|
|
assert trend["delta_7d"] is None
|
|
assert trend["slope_per_day"] is None
|
|
assert trend["volatility"] is None
|
|
assert trend["regression"] == {"warn": False, "fail": False}
|
|
|
|
def test_delta_24h_computed(self, policy):
|
|
"""Latest score 60, baseline 40 → delta_24h = 20."""
|
|
series = [
|
|
_snap("gw", "prod", 60, hours_ago=0),
|
|
_snap("gw", "prod", 40, hours_ago=25), # baseline at -25h
|
|
]
|
|
trend = compute_trend(series, policy=policy)
|
|
assert trend["delta_24h"] == 20
|
|
|
|
def test_delta_7d_computed(self, policy):
|
|
series = [
|
|
_snap("gw", "prod", 65, hours_ago=0),
|
|
_snap("gw", "prod", 30, hours_ago=170), # baseline at ~7d
|
|
]
|
|
trend = compute_trend(series, policy=policy)
|
|
assert trend["delta_7d"] == 35
|
|
|
|
def test_delta_none_when_no_baseline_in_window(self, policy):
|
|
"""Only recent snaps — no baseline before 24h."""
|
|
series = [_snap("gw", "prod", 60, hours_ago=0)]
|
|
trend = compute_trend(series, policy=policy)
|
|
assert trend["delta_24h"] is None
|
|
|
|
def test_improving_negative_delta(self, policy):
|
|
series = [
|
|
_snap("gw", "prod", 20, hours_ago=0),
|
|
_snap("gw", "prod", 70, hours_ago=25),
|
|
]
|
|
trend = compute_trend(series, policy=policy)
|
|
assert trend["delta_24h"] == -50 # improving
|
|
|
|
def test_regression_warn_set_when_delta_exceeds_warn(self, policy):
|
|
"""delta_24h_warn = 10 by default; delta 15 → warn=True."""
|
|
series = [
|
|
_snap("gw", "prod", 55, hours_ago=0),
|
|
_snap("gw", "prod", 40, hours_ago=25), # delta_24h = 15
|
|
]
|
|
trend = compute_trend(series, policy=policy)
|
|
assert trend["regression"]["warn"] is True
|
|
assert trend["regression"]["fail"] is False
|
|
|
|
def test_regression_fail_set_when_delta_exceeds_fail(self, policy):
|
|
"""delta_24h_fail = 20 by default; delta 25 → fail=True."""
|
|
series = [
|
|
_snap("gw", "prod", 65, hours_ago=0),
|
|
_snap("gw", "prod", 40, hours_ago=25), # delta_24h = 25
|
|
]
|
|
trend = compute_trend(series, policy=policy)
|
|
assert trend["regression"]["fail"] is True
|
|
assert trend["regression"]["warn"] is True # fail implies warn
|
|
|
|
def test_no_regression_below_threshold(self, policy):
|
|
"""delta 5 < warn 10 → no flags."""
|
|
series = [
|
|
_snap("gw", "prod", 45, hours_ago=0),
|
|
_snap("gw", "prod", 40, hours_ago=25),
|
|
]
|
|
trend = compute_trend(series, policy=policy)
|
|
assert trend["regression"]["warn"] is False
|
|
assert trend["regression"]["fail"] is False
|
|
|
|
def test_slope_computed_with_multiple_points(self, policy):
|
|
"""Linear slope should reflect direction of change."""
|
|
# Increasing: 20, 40, 60 over hours 3, 2, 1 ago
|
|
series = [
|
|
_snap("gw", "prod", 60, hours_ago=1),
|
|
_snap("gw", "prod", 40, hours_ago=2),
|
|
_snap("gw", "prod", 20, hours_ago=3),
|
|
]
|
|
trend = compute_trend(series, policy=policy)
|
|
assert trend["slope_per_day"] is not None
|
|
assert trend["slope_per_day"] > 0 # rising
|
|
|
|
def test_volatility_with_daily_scores(self, policy):
|
|
"""Multiple daily snapshots → non-zero volatility."""
|
|
series = []
|
|
scores = [20, 50, 30, 80, 40, 60, 10]
|
|
for i, score in enumerate(scores):
|
|
series.append(_snap("gw", "prod", score, hours_ago=i * 24 + 1))
|
|
trend = compute_trend(series, policy=policy)
|
|
assert trend["volatility"] is not None
|
|
assert trend["volatility"] > 0
|
|
|
|
def test_volatility_none_if_single_day(self, policy):
|
|
series = [_snap("gw", "prod", 50, hours_ago=1)]
|
|
trend = compute_trend(series, policy=policy)
|
|
assert trend["volatility"] is None
|
|
|
|
|
|
# ─── enrich_risk_report_with_trend ───────────────────────────────────────────
|
|
|
|
class TestEnrichWithTrend:
|
|
def test_adds_trend_key_from_store(self, policy):
|
|
store = MemoryRiskHistoryStore()
|
|
store.write_snapshot([
|
|
_snap("gateway", "prod", 40, hours_ago=25),
|
|
_snap("gateway", "prod", 60, hours_ago=1),
|
|
])
|
|
report = {"service": "gateway", "env": "prod", "score": 60}
|
|
enrich_risk_report_with_trend(report, store, policy)
|
|
assert "trend" in report
|
|
assert report["trend"]["delta_24h"] == 20
|
|
|
|
def test_trend_null_on_store_error(self, policy):
|
|
"""Store raises → trend=None, never raises."""
|
|
broken_store = MagicMock()
|
|
broken_store.get_series.side_effect = RuntimeError("DB down")
|
|
report = {"service": "gateway", "env": "prod", "score": 60}
|
|
enrich_risk_report_with_trend(report, broken_store, policy)
|
|
assert report["trend"] is None
|
|
|
|
def test_trend_empty_when_no_history(self, policy):
|
|
store = MemoryRiskHistoryStore() # empty
|
|
report = {"service": "gateway", "env": "prod", "score": 60}
|
|
enrich_risk_report_with_trend(report, store, policy)
|
|
assert report["trend"]["delta_24h"] is None
|
|
assert report["trend"]["delta_7d"] is None
|
|
assert report["trend"]["regression"]["warn"] is False
|