Files
microdao-daarion/tests/test_risk_evidence_refs.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

204 lines
7.6 KiB
Python

"""
tests/test_risk_evidence_refs.py
Unit tests for evidence refs in risk_attribution.py:
- deploy cause includes alert_ref refs
- followups include dedupe_key / incident_id refs
- max_refs_per_cause enforced
- top-level evidence_refs built correctly
- incident_storm includes incident_ids
"""
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../services/router"))
import datetime
import pytest
from risk_attribution import (
compute_attribution,
build_evidence_refs,
_detect_deploy,
_detect_followups_overdue,
_detect_incident_storm,
_detect_slo,
)
def _cutoff(hours: int = 24) -> str:
return (datetime.datetime.utcnow() - datetime.timedelta(hours=hours)).isoformat()
def _ts(minutes_ago: int = 5) -> str:
return (datetime.datetime.utcnow() - datetime.timedelta(minutes=minutes_ago)).isoformat()
_POLICY = {
"weights": {
"deploy": 30, "dependency": 25, "drift": 25, "incident_storm": 20,
"slo_violation": 15, "followups_overdue": 10, "alert_loop_degraded": 10,
},
"signals": {
"deploy": {"kinds": ["deploy", "canary", "rollout"]},
"incident_storm": {"thresholds": {"occurrences_60m_warn": 10, "escalations_24h_warn": 2}},
"slo": {"require_active_violation": True},
},
"output": {"confidence_bands": {"high": 60, "medium": 35}},
"defaults": {"lookback_hours": 24, "max_causes": 5, "llm_mode": "off"},
"timeline": {"enabled": False},
"evidence_linking": {"enabled": True, "max_refs_per_cause": 5},
}
class TestDeployCauseRefs:
    """Evidence refs emitted by the deploy-cause detector."""

    def test_deploy_cause_includes_alert_refs(self):
        recent = [
            {
                "alert_ref": "alrt_001",
                "kind": "deploy",
                "created_at": _ts(5),
                "service": "gateway",
            },
            {
                "alert_ref": "alrt_002",
                "kind": "canary",
                "created_at": _ts(10),
                "service": "gateway",
            },
        ]
        score, _evid, refs = _detect_deploy(recent, _cutoff(), _POLICY, max_refs=10)
        assert score == 30
        seen = {r["alert_ref"] for r in refs if "alert_ref" in r}
        assert "alrt_001" in seen
        assert "alrt_002" in seen

    def test_deploy_no_alerts_no_refs(self):
        score, _evid, refs = _detect_deploy([], _cutoff(), _POLICY)
        assert score == 0
        assert refs == []

    def test_max_refs_per_cause_enforced(self):
        many = [
            {
                "alert_ref": f"alrt_{n}",
                "kind": "deploy",
                "created_at": _ts(n + 1),
                "service": "svc",
            }
            for n in range(20)
        ]
        score, _evid, refs = _detect_deploy(many, _cutoff(), _POLICY, max_refs=5)
        assert score == 30
        assert len(refs) <= 5
class TestFollowupRefs:
    """Evidence refs emitted by the overdue-followups detector."""

    def test_followups_include_provided_refs(self):
        provided = [
            {"incident_id": "inc_001", "dedupe_key": "fu_k1"},
            {"incident_id": "inc_002", "dedupe_key": "fu_k2"},
        ]
        score, _evid, refs = _detect_followups_overdue(
            2, _POLICY, followup_refs=provided
        )
        assert score == 10
        ids = {r.get("incident_id") for r in refs}
        assert "inc_001" in ids
        assert "inc_002" in ids

    def test_followups_max_refs(self):
        provided = [{"incident_id": f"inc_{n}"} for n in range(20)]
        _score, _evid, refs = _detect_followups_overdue(
            20, _POLICY, followup_refs=provided, max_refs=4
        )
        assert len(refs) <= 4

    def test_followups_zero_overdue_no_refs(self):
        score, _evid, refs = _detect_followups_overdue(0, _POLICY)
        assert score == 0
        assert refs == []
class TestIncidentStormRefs:
    """Evidence refs emitted by the incident-storm detector."""

    def test_storm_includes_incident_ids(self):
        score, _evid, refs = _detect_incident_storm(
            occurrences_60m=15,
            escalations_24h=3,
            policy=_POLICY,
            incident_ids=["inc_001", "inc_002"],
            max_refs=10,
        )
        assert score == 20
        ids = {r["incident_id"] for r in refs}
        assert "inc_001" in ids
        assert "inc_002" in ids

    def test_storm_max_refs(self):
        many = [f"inc_{n}" for n in range(20)]
        _score, _evid, refs = _detect_incident_storm(
            15, 3, _POLICY, incident_ids=many, max_refs=3
        )
        assert len(refs) <= 3
class TestSloRefs:
    """Evidence refs emitted by the SLO-violation detector."""

    def test_slo_includes_metric_names(self):
        active = ["error_rate:gateway", "latency_p99:gateway"]
        score, _evid, refs = _detect_slo(2, _POLICY, slo_metrics=active)
        assert score == 15
        names = [r["metric"] for r in refs]
        assert "error_rate:gateway" in names

    def test_slo_max_refs(self):
        active = [f"metric_{n}" for n in range(20)]
        _score, _evid, refs = _detect_slo(5, _POLICY, slo_metrics=active, max_refs=3)
        assert len(refs) <= 3
class TestTopLevelEvidenceRefs:
    """Shape and contents of the top-level evidence_refs mapping."""

    def test_build_evidence_refs_structure(self):
        alerts = [{"alert_ref": "alrt_1"}, {"alert_ref": "alrt_2"}]
        incidents = [{"id": "inc_1"}, {"id": "inc_2"}]
        gates = [
            {
                "run_id": "rc_001",
                "gate": "dependency_scan",
                "status": "fail",
                "artifact": "ops/reports/scan.md",
            }
        ]
        followups = [{"incident_id": "inc_1", "dedupe_key": "fu_k1"}]
        refs = build_evidence_refs(
            alerts, incidents, gates, followup_refs=followups, policy=_POLICY
        )
        for expected in ("alrt_1", "alrt_2"):
            assert expected in refs["alerts"]
        assert "inc_1" in refs["incidents"]
        assert "rc_001" in refs["release_checks"]
        assert "ops/reports/scan.md" in refs["artifacts"]
        assert len(refs["followups"]) == 1

    def test_evidence_refs_max_refs(self):
        alerts = [{"alert_ref": f"a_{n}"} for n in range(30)]
        refs = build_evidence_refs(alerts, [], [], policy=_POLICY)
        # _POLICY caps max_refs_per_cause at 5.
        assert len(refs["alerts"]) <= 5

    def test_empty_inputs(self):
        refs = build_evidence_refs([], [], [], policy=_POLICY)
        for bucket in ("alerts", "incidents", "release_checks", "artifacts"):
            assert refs[bucket] == []
class TestComputeAttributionRefsIntegration:
    """End-to-end refs behaviour through compute_attribution."""

    def test_attribution_includes_cause_refs(self):
        alerts = [
            {
                "alert_ref": "alrt_a1",
                "kind": "deploy",
                "created_at": _ts(5),
                "service": "gateway",
            },
        ]
        result = compute_attribution(
            "gateway", "prod", alerts_24h=alerts, policy=_POLICY
        )
        deploy_cause = next(
            (c for c in result["causes"] if c["type"] == "deploy"), None
        )
        assert deploy_cause is not None
        assert "alrt_a1" in str(deploy_cause.get("refs", []))

    def test_attribution_includes_evidence_refs_top_level(self):
        alerts = [
            {
                "alert_ref": "alrt_x",
                "kind": "deploy",
                "created_at": _ts(5),
                "service": "svc",
            },
        ]
        incidents = [{"id": "inc_42", "started_at": _ts(10), "service": "svc"}]
        policy = dict(
            _POLICY,
            timeline={"enabled": False},
            evidence_linking={"enabled": True, "max_refs_per_cause": 10},
        )
        result = compute_attribution(
            "svc", "prod", alerts_24h=alerts, incidents_24h=incidents, policy=policy
        )
        assert "evidence_refs" in result
        assert "alrt_x" in result["evidence_refs"]["alerts"]
        assert "inc_42" in result["evidence_refs"]["incidents"]

    def test_attribution_evidence_refs_disabled(self):
        policy = dict(
            _POLICY,
            timeline={"enabled": False},
            evidence_linking={"enabled": False, "max_refs_per_cause": 10},
        )
        result = compute_attribution("svc", "prod", policy=policy)
        assert result.get("evidence_refs") == {}