Files
microdao-daarion/tests/test_incident_buckets.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

227 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Tests for Root-Cause Buckets: build_root_cause_buckets + bucket_recommendations.
"""
import sys, os, datetime
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "router"))
def _ts(days_ago: float = 0.0) -> str:
    """Return an ISO-8601 timestamp for ``days_ago`` days before the current UTC time.

    The result is a naive (offset-free) string, exactly the shape
    ``datetime.utcnow().isoformat()`` produced before.

    Args:
        days_ago: How far in the past the timestamp should lie, in days.
            Fractions are allowed; a negative value yields a future timestamp.

    Returns:
        The timestamp formatted with ``datetime.isoformat()``.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the emitted string carries no "+00:00" suffix.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return (now_utc - datetime.timedelta(days=days_ago)).isoformat()
def _make_inc(store, service, kind_tag, sig=None, days_ago=0.0, status="open", severity="P2"):
    """Create a prod incident in ``store`` and optionally close it.

    Args:
        store: Object exposing ``create_incident(dict)`` and
            ``close_incident(id, ts, resolution)``.
        service: Service name recorded on the incident.
        kind_tag: Stored under ``meta["kind"]`` when truthy; also used in the title.
        sig: Stored under ``meta["incident_signature"]`` when truthy.
        days_ago: Age of the incident start, in days before now.
        status: When equal to ``"closed"`` the incident is closed right after creation.
        severity: Severity label recorded on the incident.

    Returns:
        Whatever ``store.create_incident`` returned for the new incident.
    """
    # Only truthy values make it into meta; signature first, then kind.
    candidate_meta = (("incident_signature", sig), ("kind", kind_tag))
    meta = {key: value for key, value in candidate_meta if value}

    payload = {
        "service": service,
        "env": "prod",
        "severity": severity,
        "title": f"{kind_tag} on {service}",
        "started_at": _ts(days_ago),
        "created_by": "test",
        "meta": meta,
    }
    inc = store.create_incident(payload)

    if status != "closed":
        return inc

    # Close timestamp is 0.01 days later than the start timestamp.
    closed_at = _ts(days_ago - 0.01)
    store.close_incident(inc["id"], closed_at, "resolved")
    return inc
@pytest.fixture
def store():
    """Provide a fresh ``MemoryIncidentStore`` instance for each test."""
    # Imported lazily, inside the fixture, as in the rest of this module.
    import incident_store

    return incident_store.MemoryIncidentStore()
@pytest.fixture
def policy():
    """Provide the incident-intelligence policy dict used by the bucket tests.

    Also sets ``incident_intelligence._POLICY_CACHE`` to ``None`` before
    building the dict (presumably so a previously cached policy cannot leak
    into the test — confirm against the module).
    """
    import incident_intelligence as intel

    intel._POLICY_CACHE = None

    correlation = {"lookback_days": 30, "max_related": 10, "min_score": 20, "rules": []}
    recurrence = {
        "thresholds": {
            "signature": {"warn": 2, "high": 4},
            "kind": {"warn": 3, "high": 6},
        },
        "top_n": 15,
    }
    buckets = {
        "mode": "service_kind",
        "signature_prefix_len": 12,
        "top_n": 10,
        # Minimum incident count per window (keys are window lengths in days).
        "min_count": {"7": 2, "30": 3},
    }
    autofollowups = {
        "enabled": True,
        "only_when_high": True,
        "owner": "oncall",
        "priority": "P1",
        "due_days": 7,
        "max_followups_per_bucket_per_week": 1,
        "dedupe_key_prefix": "intel_recur",
    }
    digest = {
        "markdown_max_chars": 8000,
        "top_incidents": 20,
        "output_dir": "/tmp/test_bucket_reports",
        "include_closed": True,
        "include_open": True,
    }

    return {
        "correlation": correlation,
        "recurrence": recurrence,
        "buckets": buckets,
        "autofollowups": autofollowups,
        "digest": digest,
    }
class TestBuildRootCauseBuckets:
    """Tests for ``build_root_cause_buckets``.

    Each test seeds the in-memory ``store`` fixture through ``_make_inc``,
    reads the incidents back with ``store.list_incidents`` and checks the
    buckets built under the ``policy`` fixture.
    """

    def test_groups_by_service_kind(self, store, policy):
        """Bucket keys have the form ``service|kind`` in ``service_kind`` mode."""
        from incident_intelligence import build_root_cause_buckets
        for _ in range(4):
            _make_inc(store, "gateway", "error_rate", days_ago=1.0)
        for _ in range(3):
            _make_inc(store, "router", "latency", days_ago=2.0)
        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])
        bkeys = [b["bucket_key"] for b in buckets]
        assert "gateway|error_rate" in bkeys
        assert "router|latency" in bkeys

    def test_min_count_filter_7d(self, store, policy):
        """A bucket below ``min_count["7"]`` is dropped; one at or above is kept."""
        from incident_intelligence import build_root_cause_buckets
        # 1 incident only (below min_count["7"]=2) — should not appear
        _make_inc(store, "svc", "latency", days_ago=1.0)
        # 3 incidents — should appear
        for _ in range(3):
            _make_inc(store, "svc2", "error_rate", days_ago=1.0)
        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])
        bkeys = [b["bucket_key"] for b in buckets]
        assert "svc2|error_rate" in bkeys
        assert "svc|latency" not in bkeys

    def test_min_count_filter_30d(self, store, policy):
        """A bucket with no 7d incidents still appears when its 30d count suffices."""
        from incident_intelligence import build_root_cause_buckets
        # 4 incidents aged 8-11 days (beyond the 7d window but within 30d):
        # count_30d=4 >= min_count["30"]=3
        for i in range(4):
            _make_inc(store, "gateway", "oom", days_ago=8.0 + i)
        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])
        bkeys = [b["bucket_key"] for b in buckets]
        assert "gateway|oom" in bkeys

    def test_top_n_enforced(self, store, policy):
        """No more than ``buckets.top_n`` buckets are returned."""
        from incident_intelligence import build_root_cause_buckets
        # 15 distinct services with 3 recent incidents each -> 15 candidate buckets.
        for i in range(15):
            for j in range(3):
                _make_inc(store, f"svc{i}", "latency", days_ago=float(j) * 0.5)
        policy["buckets"]["top_n"] = 5
        incidents = store.list_incidents(limit=200)
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])
        assert len(buckets) <= 5

    def test_counts_correct(self, store, policy):
        """The ``7d`` count covers recent incidents only; ``30d`` covers both groups."""
        from incident_intelligence import build_root_cause_buckets
        # 5 incidents in the 7d window, 2 more at 10 days old (30d window only)
        for _ in range(5):
            _make_inc(store, "gateway", "error_rate", days_ago=2.0)
        for _ in range(2):
            _make_inc(store, "gateway", "error_rate", days_ago=10.0)
        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])
        gw = next(b for b in buckets if b["bucket_key"] == "gateway|error_rate")
        assert gw["counts"]["7d"] == 5
        assert gw["counts"]["30d"] == 7

    def test_open_count_only_includes_open_mitigating(self, store, policy):
        """Closed incidents do not contribute to a bucket's ``open`` count."""
        from incident_intelligence import build_root_cause_buckets
        _make_inc(store, "svc", "latency", days_ago=1.0, status="open")
        _make_inc(store, "svc", "latency", days_ago=1.5, status="closed")
        _make_inc(store, "svc", "latency", days_ago=2.0, status="open")
        # The closed incident is fetched explicitly in addition to the default
        # listing. NOTE(review): if the default listing already returns closed
        # incidents it will appear twice here; that does not affect "open".
        incidents = store.list_incidents(limit=100) + list(
            store.list_incidents({"status": "closed"}, limit=10)
        )
        buckets = build_root_cause_buckets(incidents, policy=policy, windows=[7, 30])
        svc_b = next((b for b in buckets if b["bucket_key"] == "svc|latency"), None)
        # Fail loudly if the bucket is missing instead of silently skipping
        # the only assertion in this test.
        assert svc_b is not None, "expected a 'svc|latency' bucket"
        assert svc_b["counts"]["open"] == 2

    def test_recommendations_are_deterministic(self, store, policy):
        """Two runs over the same incidents give identical recommendations."""
        from incident_intelligence import build_root_cause_buckets
        for _ in range(5):
            _make_inc(store, "gateway", "latency", days_ago=1.0)
        incidents = store.list_incidents(limit=100)
        b1 = build_root_cause_buckets(incidents, policy=policy)
        b2 = build_root_cause_buckets(incidents, policy=policy)
        # Guard the [0] lookups so an empty result is a clear assertion failure.
        assert b1 and b2
        assert b1[0]["recommendations"] == b2[0]["recommendations"]

    def test_signature_mode(self, store, policy):
        """In ``signature_prefix`` mode some bucket key starts with the signature prefix."""
        from incident_intelligence import build_root_cause_buckets
        SIG = "aabbccddee112233" * 2
        for _ in range(3):
            _make_inc(store, "gateway", "error_rate", sig=SIG, days_ago=1.0)
        policy["buckets"]["mode"] = "signature_prefix"
        policy["buckets"]["signature_prefix_len"] = 12
        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy)
        bkeys = [b["bucket_key"] for b in buckets]
        assert any(b.startswith(SIG[:12]) for b in bkeys)

    def test_sorted_by_count_7d_desc(self, store, policy):
        """The first bucket's ``7d`` count is not smaller than the second's."""
        from incident_intelligence import build_root_cause_buckets
        for _ in range(6):
            _make_inc(store, "svc_a", "error_rate", days_ago=1.0)
        for _ in range(3):
            _make_inc(store, "svc_b", "latency", days_ago=1.0)
        incidents = store.list_incidents(limit=100)
        buckets = build_root_cause_buckets(incidents, policy=policy)
        assert len(buckets) >= 2
        assert buckets[0]["counts"]["7d"] >= buckets[1]["counts"]["7d"]
class TestBucketRecommendations:
    """Tests for the advice strings returned by ``bucket_recommendations``.

    Every test passes a minimal bucket dict with a ``kinds`` set and a
    ``counts["open"]`` value, then inspects the returned recommendations.
    """

    def test_error_rate_recommendations(self):
        """``error_rate`` advice mentions "regression" (any case) or "SLO"."""
        from incident_intelligence import bucket_recommendations

        bucket = {"kinds": {"error_rate"}, "counts": {"open": 0}}
        advice = bucket_recommendations(bucket)

        def _relevant(text):
            return "regression" in text.lower() or "SLO" in text

        assert any(_relevant(text) for text in advice)

    def test_latency_recommendations(self):
        """``latency`` advice mentions "p95" or "perf" (case-insensitive)."""
        from incident_intelligence import bucket_recommendations

        bucket = {"kinds": {"latency"}, "counts": {"open": 0}}
        advice = bucket_recommendations(bucket)
        keywords = ("p95", "perf")
        assert any(any(kw in text.lower() for kw in keywords) for text in advice)

    def test_security_recommendations(self):
        """``security`` advice mentions "secret", "scanner" or "rotate"."""
        from incident_intelligence import bucket_recommendations

        bucket = {"kinds": {"security"}, "counts": {"open": 0}}
        advice = bucket_recommendations(bucket)
        keywords = ("secret", "scanner", "rotate")
        assert any(any(kw in text.lower() for kw in keywords) for text in advice)

    def test_open_incident_adds_warning(self):
        """With open incidents, advice mentions "deploy" or "mitigat..."."""
        from incident_intelligence import bucket_recommendations

        bucket = {"kinds": {"latency"}, "counts": {"open": 2}}
        advice = bucket_recommendations(bucket)
        keywords = ("deploy", "mitigat")
        assert any(any(kw in text.lower() for kw in keywords) for text in advice)

    def test_unknown_kind_returns_defaults(self):
        """A kind not used elsewhere in these tests still yields some advice."""
        from incident_intelligence import bucket_recommendations

        bucket = {"kinds": {"custom"}, "counts": {"open": 0}}
        advice = bucket_recommendations(bucket)
        assert len(advice) >= 1

    def test_max_recs_capped(self):
        """Even with five kinds and open incidents, at most 5 items come back."""
        from incident_intelligence import bucket_recommendations

        many_kinds = {"error_rate", "latency", "oom", "disk", "security"}
        bucket = {"kinds": many_kinds, "counts": {"open": 3}}
        advice = bucket_recommendations(bucket)
        assert 5 >= len(advice)