Files
microdao-daarion/tests/test_risk_attribution.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

299 lines
11 KiB
Python

"""
tests/test_risk_attribution.py — Unit tests for the Risk Attribution Engine.
Tests:
- deploy alerts → deploy cause
- occurrences/escalations → incident_storm cause
- SLO violations → slo_violation cause
- overdue followups → followups_overdue cause
- alert-loop degradation → alert_loop_degraded cause
- sort + max_causes + confidence bands
- release gate results → dependency + drift causes
"""
import datetime
import sys
import pytest
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "services" / "router"))
from risk_attribution import (
compute_attribution,
_detect_deploy,
_detect_dependency,
_detect_drift,
_detect_incident_storm,
_detect_slo,
_detect_followups_overdue,
_detect_alert_loop_degraded,
_score_to_confidence,
_build_summary,
_builtin_attr_defaults,
_reload_attribution_policy,
)
@pytest.fixture(autouse=True)
def reset_cache():
_reload_attribution_policy()
yield
_reload_attribution_policy()
@pytest.fixture
def policy():
return _builtin_attr_defaults()
def _alert(kind: str, hours_ago: float = 1.0) -> dict:
ts = (datetime.datetime.utcnow() - datetime.timedelta(hours=hours_ago)).isoformat()
return {"kind": kind, "created_at": ts, "service": "gateway"}
def _cutoff(hours: int = 24) -> str:
return (datetime.datetime.utcnow() - datetime.timedelta(hours=hours)).isoformat()
# ─── Individual signal detectors ─────────────────────────────────────────────
class TestDetectDeploy:
def test_deploy_alert_gives_score(self, policy):
alerts = [_alert("deploy", hours_ago=1)]
score, evidence, _ = _detect_deploy(alerts, _cutoff(), policy)
assert score == 30
assert "deploy alerts: 1" in evidence[0]
def test_no_deploy_alerts_zero_score(self, policy):
alerts = [_alert("cpu_high", hours_ago=1)]
score, evidence, _ = _detect_deploy(alerts, _cutoff(), policy)
assert score == 0
assert evidence == []
def test_multiple_deploy_alerts(self, policy):
alerts = [_alert("deploy"), _alert("rollout", hours_ago=2), _alert("canary", hours_ago=3)]
score, evidence, _ = _detect_deploy(alerts, _cutoff(), policy)
assert score == 30
assert "3" in evidence[0]
def test_old_deploy_ignored(self, policy):
old_ts = (datetime.datetime.utcnow() - datetime.timedelta(hours=30)).isoformat()
alerts = [{"kind": "deploy", "created_at": old_ts}]
score, evidence, _ = _detect_deploy(alerts, _cutoff(24), policy)
assert score == 0
class TestDetectDependency:
def test_dependency_scan_fail_gives_score(self, policy):
gates = [{"gate": "dependency_scan", "status": "fail"}]
score, evidence, _ = _detect_dependency(gates, policy)
assert score == 25
assert "dependency_scan" in evidence[0]
def test_dependency_scan_warn_gives_score(self, policy):
gates = [{"gate": "dependency_scan", "status": "warn"}]
score, evidence, _ = _detect_dependency(gates, policy)
assert score == 25
def test_dependency_scan_pass_zero(self, policy):
gates = [{"gate": "dependency_scan", "status": "pass"}]
score, evidence, _ = _detect_dependency(gates, policy)
assert score == 0
def test_no_gate_results_zero(self, policy):
score, evidence, _ = _detect_dependency([], policy)
assert score == 0
class TestDetectDrift:
def test_drift_fail_gives_score(self, policy):
gates = [{"gate": "drift", "status": "fail"}]
score, evidence, _ = _detect_drift(gates, policy)
assert score == 25
def test_drift_pass_zero(self, policy):
gates = [{"gate": "drift", "status": "pass"}]
score, evidence, _ = _detect_drift(gates, policy)
assert score == 0
class TestDetectIncidentStorm:
def test_high_occurrences_gives_score(self, policy):
score, evidence, _ = _detect_incident_storm(occurrences_60m=15, escalations_24h=0,
policy=policy)
assert score == 20
assert "occurrences_60m=15" in evidence[0]
def test_high_escalations_gives_score(self, policy):
score, evidence, _ = _detect_incident_storm(occurrences_60m=0, escalations_24h=3,
policy=policy)
assert score == 20
assert "escalations_24h=3" in evidence[0]
def test_both_signals_combined_evidence(self, policy):
score, evidence, _ = _detect_incident_storm(occurrences_60m=12, escalations_24h=4,
policy=policy)
assert score == 20
assert len(evidence) == 2
def test_below_threshold_zero(self, policy):
score, evidence, _ = _detect_incident_storm(occurrences_60m=5, escalations_24h=1,
policy=policy)
assert score == 0
class TestDetectSlo:
def test_one_violation_gives_score(self, policy):
score, evidence, _ = _detect_slo(slo_violations=1, policy=policy)
assert score == 15
def test_zero_violations_zero(self, policy):
score, evidence, _ = _detect_slo(slo_violations=0, policy=policy)
assert score == 0
class TestDetectFollowups:
def test_overdue_gives_score(self, policy):
score, evidence, _ = _detect_followups_overdue(overdue_count=2, policy=policy)
assert score == 10
assert "2" in evidence[0]
def test_zero_overdue_zero(self, policy):
score, evidence, _ = _detect_followups_overdue(overdue_count=0, policy=policy)
assert score == 0
class TestDetectAlertLoop:
def test_loop_degraded_gives_score(self, policy):
score, evidence, _ = _detect_alert_loop_degraded(loop_slo_violations=1, policy=policy)
assert score == 10
def test_no_violations_zero(self, policy):
score, evidence, _ = _detect_alert_loop_degraded(loop_slo_violations=0, policy=policy)
assert score == 0
# ─── Confidence bands ─────────────────────────────────────────────────────────
class TestConfidence:
def test_score_60_is_high(self, policy):
assert _score_to_confidence(60, policy) == "high"
def test_score_35_is_medium(self, policy):
assert _score_to_confidence(35, policy) == "medium"
def test_score_30_is_low(self, policy):
assert _score_to_confidence(30, policy) == "low"
def test_score_0_is_low(self, policy):
assert _score_to_confidence(0, policy) == "low"
# ─── Full compute_attribution ─────────────────────────────────────────────────
class TestComputeAttribution:
def test_no_signals_empty_causes(self, policy):
result = compute_attribution("gateway", "prod", policy=policy)
assert result["causes"] == []
assert result["service"] == "gateway"
assert result["summary"] == "No significant attribution signals detected."
assert result["llm_enrichment"]["enabled"] is False
def test_deploy_signal_produces_cause(self, policy):
alerts = [_alert("deploy", hours_ago=1)]
result = compute_attribution(
"gateway", "prod",
alerts_24h=alerts,
policy=policy,
)
types = [c["type"] for c in result["causes"]]
assert "deploy" in types
def test_multiple_causes_sorted_desc(self, policy):
# deploy=30, slo=15, followups=10
alerts = [_alert("deploy")]
result = compute_attribution(
"gateway", "prod",
alerts_24h=alerts,
slo_violations=1,
overdue_followup_count=2,
policy=policy,
)
scores = [c["score"] for c in result["causes"]]
assert scores == sorted(scores, reverse=True)
def test_max_causes_respected(self, policy):
# Inject all 7 signal types to exceed max_causes=5
alerts = [_alert("deploy")]
result = compute_attribution(
"gateway", "prod",
alerts_24h=alerts,
occurrences_60m=15,
escalations_24h=3,
release_gate_results=[
{"gate": "dependency_scan", "status": "fail"},
{"gate": "drift", "status": "warn"},
],
slo_violations=1,
overdue_followup_count=2,
loop_slo_violations=1,
policy=policy,
)
assert len(result["causes"]) <= 5
def test_causes_have_confidence(self, policy):
alerts = [_alert("deploy")]
result = compute_attribution("gateway", "prod", alerts_24h=alerts, policy=policy)
for cause in result["causes"]:
assert "confidence" in cause
assert cause["confidence"] in ("high", "medium", "low")
def test_causes_have_evidence(self, policy):
alerts = [_alert("rollout")]
result = compute_attribution("gateway", "prod", alerts_24h=alerts, policy=policy)
for cause in result["causes"]:
assert isinstance(cause.get("evidence"), list)
def test_slo_from_risk_report_components(self, policy):
"""If slo_violations=0 but risk_report has SLO data, it extracts from components."""
risk_report = {
"service": "gateway", "env": "prod",
"components": {"slo": {"violations": 2, "points": 20}},
}
result = compute_attribution(
"gateway", "prod",
risk_report=risk_report,
policy=policy,
)
types = [c["type"] for c in result["causes"]]
assert "slo_violation" in types
def test_followups_from_risk_report_components(self, policy):
risk_report = {
"components": {
"followups": {"P0": 1, "P1": 0, "other": 0, "points": 20}
}
}
result = compute_attribution(
"gateway", "prod",
risk_report=risk_report,
policy=policy,
)
types = [c["type"] for c in result["causes"]]
assert "followups_overdue" in types
def test_summary_template_filled(self, policy):
alerts = [_alert("deploy")]
result = compute_attribution("gateway", "prod", alerts_24h=alerts, policy=policy)
assert result["summary"].startswith("Likely causes:")
assert "deploy" in result["summary"].lower()
def test_incident_storm_cause(self, policy):
result = compute_attribution(
"router", "prod",
occurrences_60m=12,
escalations_24h=3,
policy=policy,
)
types = [c["type"] for c in result["causes"]]
assert "incident_storm" in types