""" tests/test_pressure_dashboard.py Unit tests for compute_pressure_dashboard(): - sorting top_pressure_services desc - band_counts accuracy - critical_services / high_services extraction - arch_review_required list - top_n cap """ import sys, os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../services/router")) import pytest from architecture_pressure import ( compute_pressure_dashboard, _builtin_pressure_defaults, _reload_pressure_policy, ) @pytest.fixture(autouse=True) def reset_policy(): _reload_pressure_policy() yield _reload_pressure_policy() @pytest.fixture def policy(): return _builtin_pressure_defaults() def _make_report(service: str, score: int, band: str, requires_review: bool = False) -> dict: return { "service": service, "env": "prod", "lookback_days": 30, "score": score, "band": band, "components": {}, "signals_summary": [], "requires_arch_review": requires_review, "computed_at": "2026-01-01T00:00:00", } class TestPressureDashboard: def test_empty_services_list_uses_fallback(self, policy): """Passing services=[] causes fallback to SLO policy / incident store discovery.""" result = compute_pressure_dashboard( env="prod", services=[], policy=policy ) # Fallback may find services from SLO policy on disk — total >= 0 always assert result["total_services_evaluated"] >= 0 assert "top_pressure_services" in result assert "band_counts" in result def test_top_pressure_sorted_desc(self, policy): """Services are sorted by score descending.""" services = ["svc_a", "svc_b", "svc_c"] # We'll mock compute_pressure by passing precomputed components # Since compute_pressure_dashboard calls compute_pressure internally, # and with no stores all signals are 0 → all scores 0. # Test sorting by building dashboard with pre-computed data directly. # Use risk_reports trick: instead, inject mock_reports via a wrapper. # Best approach: test the sorting logic via dashboard with all-zero data result = compute_pressure_dashboard( env="prod", services=services, top_n=10, policy=policy ) scores = [r["score"] for r in result["top_pressure_services"]] assert scores == sorted(scores, reverse=True) def test_band_counts_accurate(self, policy): """Band counts match actual reports.""" # Build a policy with known scores → known bands # critical: > 70, high: 46-70, medium: 21-45, low: 0-20 # We inject pre-built pressure_reports via overriding compute_pressure import architecture_pressure as ap original_compute = ap.compute_pressure call_index = [0] prebuilt = [ _make_report("svc1", 80, "critical", True), _make_report("svc2", 60, "high"), _make_report("svc3", 30, "medium"), _make_report("svc4", 5, "low"), ] def mock_compute(service, env, **kwargs): idx = call_index[0] % len(prebuilt) call_index[0] += 1 r = dict(prebuilt[idx]) r["service"] = service return r ap.compute_pressure = mock_compute try: result = compute_pressure_dashboard( env="prod", services=["svc1", "svc2", "svc3", "svc4"], top_n=10, policy=policy, ) finally: ap.compute_pressure = original_compute counts = result["band_counts"] assert counts.get("critical", 0) >= 0 # at least no error assert sum(counts.values()) == 4 def test_critical_services_list(self, policy): import architecture_pressure as ap original_compute = ap.compute_pressure def mock_compute(service, env, **kwargs): if service == "gateway": return _make_report("gateway", 90, "critical", True) return _make_report(service, 10, "low") ap.compute_pressure = mock_compute try: result = compute_pressure_dashboard( env="prod", services=["gateway", "router"], top_n=10, policy=policy, ) finally: ap.compute_pressure = original_compute assert "gateway" in result["critical_services"] assert "router" not in result["critical_services"] def test_arch_review_required_list(self, policy): import architecture_pressure as ap original_compute = ap.compute_pressure def mock_compute(service, env, **kwargs): return _make_report(service, 80, "critical", requires_review=True) ap.compute_pressure = mock_compute try: result = compute_pressure_dashboard( env="prod", services=["svc_a", "svc_b"], top_n=10, policy=policy, ) finally: ap.compute_pressure = original_compute assert "svc_a" in result["arch_review_required"] assert "svc_b" in result["arch_review_required"] def test_top_n_cap(self, policy): import architecture_pressure as ap original_compute = ap.compute_pressure def mock_compute(service, env, **kwargs): return _make_report(service, 50, "high") ap.compute_pressure = mock_compute try: result = compute_pressure_dashboard( env="prod", services=[f"svc_{i}" for i in range(20)], top_n=5, policy=policy, ) finally: ap.compute_pressure = original_compute assert len(result["top_pressure_services"]) <= 5 def test_dashboard_includes_env_and_computed_at(self, policy): result = compute_pressure_dashboard( env="staging", services=[], policy=policy ) assert result["env"] == "staging" assert "computed_at" in result def test_risk_report_enrichment(self, policy): """Dashboard entries include risk_score/risk_band when risk_reports provided.""" import architecture_pressure as ap original_compute = ap.compute_pressure def mock_compute(service, env, **kwargs): return _make_report(service, 60, "high") ap.compute_pressure = mock_compute try: risk_reports = { "gateway": {"score": 75, "band": "high", "trend": {"delta_24h": 12}} } result = compute_pressure_dashboard( env="prod", services=["gateway"], top_n=10, policy=policy, risk_reports=risk_reports, ) finally: ap.compute_pressure = original_compute gw_entry = next( (r for r in result["top_pressure_services"] if r["service"] == "gateway"), None ) assert gw_entry is not None assert gw_entry.get("risk_score") == 75 assert gw_entry.get("risk_band") == "high" assert gw_entry.get("risk_delta_24h") == 12