microdao-daarion/services/router/backlog_generator.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00


"""
backlog_generator.py — Auto-generation of Engineering Backlog items
from Platform Priority / Risk digests.
DAARION.city | deterministic, no LLM.
Public API:
load_backlog_policy() -> Dict
generate_from_pressure_digest(digest_data, env, ...) -> GenerateResult
generate_from_risk_digest(digest_data, env, ...) -> GenerateResult
_build_item_from_rule(service, rule, context, policy, week_str, env) -> BacklogItem | None
_make_dedupe_key(prefix, week_str, env, service, category) -> str
"""
from __future__ import annotations

import datetime
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml

from backlog_store import (
    BacklogItem, BacklogEvent, BacklogStore,
    _new_id, _now_iso,
)
logger = logging.getLogger(__name__)

# ─── Policy ───────────────────────────────────────────────────────────────────

_BACKLOG_POLICY_CACHE: Optional[Dict] = None
_BACKLOG_POLICY_PATHS = [
    Path("config/backlog_policy.yml"),
    Path(__file__).resolve().parent.parent.parent / "config" / "backlog_policy.yml",
]
def load_backlog_policy() -> Dict:
    global _BACKLOG_POLICY_CACHE
    if _BACKLOG_POLICY_CACHE is not None:
        return _BACKLOG_POLICY_CACHE
    for p in _BACKLOG_POLICY_PATHS:
        if p.exists():
            try:
                with open(p) as f:
                    data = yaml.safe_load(f) or {}
                _BACKLOG_POLICY_CACHE = data
                return data
            except Exception as e:
                logger.warning("Failed to load backlog_policy from %s: %s", p, e)
    _BACKLOG_POLICY_CACHE = _builtin_backlog_defaults()
    return _BACKLOG_POLICY_CACHE


def _reload_backlog_policy() -> None:
    global _BACKLOG_POLICY_CACHE
    _BACKLOG_POLICY_CACHE = None
def _builtin_backlog_defaults() -> Dict:
    return {
        "defaults": {"env": "prod", "retention_days": 180, "max_items_per_run": 50},
        "dedupe": {
            "scheme": "YYYY-WW",
            "key_fields": ["service", "category", "env"],
            "key_prefix": "platform_backlog",
        },
        "categories": {
            "arch_review": {"priority": "P1", "due_days": 14},
            "refactor": {"priority": "P1", "due_days": 21},
            "slo_hardening": {"priority": "P2", "due_days": 30},
            "cleanup_followups": {"priority": "P2", "due_days": 14},
            "security": {"priority": "P0", "due_days": 7},
        },
        "generation": {
            "weekly_from_pressure_digest": True,
            "daily_from_risk_digest": False,
            "rules": [
                {
                    "name": "arch_review_required",
                    "when": {"pressure_requires_arch_review": True},
                    "create": {
                        "category": "arch_review",
                        "title_template": "[ARCH] Review required: {service}",
                    },
                },
                {
                    "name": "high_pressure_refactor",
                    "when": {
                        "pressure_band_in": ["high", "critical"],
                        "risk_band_in": ["high", "critical"],
                    },
                    "create": {
                        "category": "refactor",
                        "title_template": "[REF] Reduce pressure & risk: {service}",
                    },
                },
                {
                    "name": "slo_violations",
                    "when": {"risk_has_slo_violations": True},
                    "create": {
                        "category": "slo_hardening",
                        "title_template": "[SLO] Fix violations: {service}",
                    },
                },
                {
                    "name": "followup_backlog",
                    "when": {"followups_overdue_gt": 0},
                    "create": {
                        "category": "cleanup_followups",
                        "title_template": "[OPS] Close overdue followups: {service}",
                    },
                },
            ],
        },
        "ownership": {
            "default_owner": "oncall",
            "overrides": {"gateway": "cto"},
        },
        "workflow": {
            "statuses": ["open", "in_progress", "blocked", "done", "canceled"],
            "allowed_transitions": {
                "open": ["in_progress", "blocked", "canceled"],
                "in_progress": ["blocked", "done", "canceled"],
                "blocked": ["open", "in_progress", "canceled"],
                "done": [],
                "canceled": [],
            },
        },
    }
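# A minimal config/backlog_policy.yml sketch mirroring the builtin defaults
# above (illustrative values, not a shipped config; load_backlog_policy()
# falls back to _builtin_backlog_defaults() when no file is found):
#
#   defaults:
#     env: prod
#     retention_days: 180
#     max_items_per_run: 50
#   categories:
#     security: {priority: P0, due_days: 7}
#     arch_review: {priority: P1, due_days: 14}
#   generation:
#     weekly_from_pressure_digest: true
#     daily_from_risk_digest: false
#     rules:
#       - name: arch_review_required
#         when: {pressure_requires_arch_review: true}
#         create:
#           category: arch_review
#           title_template: "[ARCH] Review required: {service}"
#   ownership:
#     default_owner: oncall
#     overrides: {gateway: cto}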
# ─── Helpers ──────────────────────────────────────────────────────────────────

def _now_week() -> str:
    # %G is the ISO week-numbering year matching %V; %Y would pair the wrong
    # year with the week at year boundaries (e.g. 2027-01-01 is ISO 2026-W53).
    return datetime.datetime.utcnow().strftime("%G-W%V")


def _make_dedupe_key(prefix: str, week_str: str, env: str,
                     service: str, category: str) -> str:
    return f"{prefix}:{week_str}:{env}:{service}:{category}"
def _due_date(due_days: int) -> str:
    return (
        datetime.datetime.utcnow() + datetime.timedelta(days=due_days)
    ).strftime("%Y-%m-%d")


def _owner_for(service: str, policy: Dict) -> str:
    overrides = policy.get("ownership", {}).get("overrides", {})
    return overrides.get(service, policy.get("ownership", {}).get("default_owner", "oncall"))


def _match_rule(rule: Dict, ctx: Dict) -> bool:
    """
    Evaluate a rule's `when` conditions against the service context dict.
    All conditions must hold (AND logic).
    """
    when = rule.get("when", {})
    for key, expected in when.items():
        if key == "pressure_requires_arch_review":
            if bool(ctx.get("pressure_requires_arch_review")) is not bool(expected):
                return False
        elif key == "pressure_band_in":
            if ctx.get("pressure_band") not in expected:
                return False
        elif key == "risk_band_in":
            if ctx.get("risk_band") not in expected:
                return False
        elif key == "risk_has_slo_violations":
            slo_v = int(ctx.get("slo_violations", 0))
            if (slo_v > 0) is not bool(expected):
                return False
        elif key == "followups_overdue_gt":
            overdue = int(ctx.get("followups_overdue", 0))
            if not (overdue > int(expected)):
                return False
    return True
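# Illustrative evaluation against a hypothetical context (doctest-style):
#
#   >>> rule = {"when": {"pressure_band_in": ["high", "critical"],
#   ...                  "risk_band_in": ["high", "critical"]}}
#   >>> _match_rule(rule, {"pressure_band": "high", "risk_band": "critical"})
#   True
#   >>> _match_rule(rule, {"pressure_band": "high", "risk_band": "low"})
#   False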
def _build_description(service: str, ctx: Dict, rule: Dict) -> str:
    """Generate deterministic bullet-list description from context."""
    lines = [f"Auto-generated by Engineering Backlog Bridge — rule: {rule.get('name', '?')}.", ""]
    p_score = ctx.get("pressure_score")
    p_band = ctx.get("pressure_band")
    r_score = ctx.get("risk_score")
    r_band = ctx.get("risk_band")
    r_delta = ctx.get("risk_delta_24h")
    if p_score is not None:
        lines.append(f"- Architecture Pressure: {p_score} ({p_band})")
    if r_score is not None:
        # {r_delta:+} keeps the sign correct for negative deltas
        # (a hardcoded "+" would render drops as "+-3")
        lines.append(f"- Risk Score: {r_score} ({r_band})"
                     + (f" Δ24h: {r_delta:+}" if r_delta else ""))
    slo_v = int(ctx.get("slo_violations", 0))
    if slo_v:
        lines.append(f"- Active SLO violations: {slo_v}")
    overdue = int(ctx.get("followups_overdue", 0))
    if overdue:
        lines.append(f"- Overdue follow-ups: {overdue}")
    if ctx.get("signals_summary"):
        lines.append(f"- Pressure signals: {'; '.join(ctx['signals_summary'][:3])}")
    if ctx.get("risk_reasons"):
        lines.append(f"- Risk signals: {'; '.join(ctx['risk_reasons'][:3])}")
    return "\n".join(lines)
def _build_item_from_rule(
    service: str,
    rule: Dict,
    ctx: Dict,
    policy: Dict,
    week_str: str,
    env: str,
) -> Optional[BacklogItem]:
    """Build a BacklogItem from a matched rule and service context."""
    create_cfg = rule.get("create", {})
    category = create_cfg.get("category", "arch_review")
    title_template = create_cfg.get("title_template", "[BACKLOG] {service}")
    title = title_template.format(service=service)
    cat_cfg = policy.get("categories", {}).get(category, {})
    priority = cat_cfg.get("priority", "P2")
    due_days = int(cat_cfg.get("due_days", 14))
    owner = _owner_for(service, policy)
    prefix = policy.get("dedupe", {}).get("key_prefix", "platform_backlog")
    dedupe_key = _make_dedupe_key(prefix, week_str, env, service, category)
    description = _build_description(service, ctx, rule)
    # Gather evidence_refs from context
    evidence_refs = dict(ctx.get("evidence_refs") or {})
    return BacklogItem(
        id=_new_id("bl"),
        created_at=_now_iso(),
        updated_at=_now_iso(),
        env=env,
        service=service,
        category=category,
        title=title,
        description=description,
        priority=priority,
        status="open",
        owner=owner,
        due_date=_due_date(due_days),
        source="digest",
        dedupe_key=dedupe_key,
        evidence_refs=evidence_refs,
        tags=["auto", f"week:{week_str}", f"rule:{rule.get('name', '?')}"],
        meta={
            "rule_name": rule.get("name", ""),
            "pressure_score": ctx.get("pressure_score"),
            "risk_score": ctx.get("risk_score"),
            "week": week_str,
        },
    )
# ─── Context builder from digest ──────────────────────────────────────────────

def _build_service_context(
    service_entry: Dict,
    risk_entry: Optional[Dict] = None,
) -> Dict:
    """
    Build a unified service context dict from a platform_priority_digest
    top_pressure_services entry plus an optional risk_digest service entry.
    """
    p_score = service_entry.get("score")
    p_band = service_entry.get("band", "low")
    requires_review = bool(service_entry.get("requires_arch_review", False))
    signals_summary = service_entry.get("signals_summary", [])
    comp = service_entry.get("components", {})
    followups_overdue = int(comp.get("followups_overdue", 0))
    evidence_refs = service_entry.get("evidence_refs") or {}
    ctx: Dict[str, Any] = {
        "pressure_score": p_score,
        "pressure_band": p_band,
        "pressure_requires_arch_review": requires_review,
        "signals_summary": signals_summary,
        "followups_overdue": followups_overdue,
        "evidence_refs": dict(evidence_refs),
    }
    # Merge risk data
    if risk_entry:
        ctx["risk_score"] = risk_entry.get("score")
        ctx["risk_band"] = risk_entry.get("band", "low")
        ctx["risk_delta_24h"] = (risk_entry.get("trend") or {}).get("delta_24h")
        slo_comp = (risk_entry.get("components") or {}).get("slo") or {}
        ctx["slo_violations"] = int(slo_comp.get("violations", 0))
        ctx["risk_reasons"] = risk_entry.get("reasons", [])
        # Merge evidence_refs from risk
        risk_attrs = risk_entry.get("attribution") or {}
        risk_erefs = risk_attrs.get("evidence_refs") or {}
        for k, v in risk_erefs.items():
            if k not in ctx["evidence_refs"]:
                ctx["evidence_refs"][k] = v
    else:
        ctx.setdefault("risk_band", service_entry.get("risk_band", "low"))
        ctx.setdefault("risk_score", service_entry.get("risk_score"))
        ctx.setdefault("risk_delta_24h", service_entry.get("risk_delta_24h"))
        ctx.setdefault("slo_violations", 0)
    return ctx
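# Sketch of the mapping for a hypothetical digest entry with no matching risk
# entry (risk fields fall back to the pressure entry / defaults):
#
#   _build_service_context({"service": "gateway", "score": 82,
#                           "band": "high", "requires_arch_review": True})
#   # -> {"pressure_score": 82, "pressure_band": "high",
#   #     "pressure_requires_arch_review": True, "signals_summary": [],
#   #     "followups_overdue": 0, "evidence_refs": {}, "risk_band": "low",
#   #     "risk_score": None, "risk_delta_24h": None, "slo_violations": 0}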
# ─── Main generation function ─────────────────────────────────────────────────

def generate_from_pressure_digest(
    digest_data: Dict,
    env: str = "prod",
    *,
    store: Optional[BacklogStore] = None,
    policy: Optional[Dict] = None,
    week_str: Optional[str] = None,
    risk_digest_data: Optional[Dict] = None,
) -> Dict:
    """
    Generate backlog items from a weekly_platform_priority_digest JSON output.

    Args:
        digest_data: JSON dict from platform_priority_digest (top_pressure_services list)
        env: deployment environment
        store: backlog store (loaded from factory if None)
        policy: backlog_policy (loaded if None)
        week_str: override ISO week (defaults to digest's "week" field or current)
        risk_digest_data: optional daily risk digest JSON to enrich context

    Returns GenerateResult dict: created, updated, skipped, items
    """
    if policy is None:
        policy = load_backlog_policy()
    if store is None:
        from backlog_store import get_backlog_store
        store = get_backlog_store()
    gen_cfg = policy.get("generation", {})
    if not gen_cfg.get("weekly_from_pressure_digest", True):
        return {"created": 0, "updated": 0, "skipped": 0, "items": [],
                "skipped_reason": "weekly_from_pressure_digest disabled in policy"}
    effective_week = week_str or digest_data.get("week") or _now_week()
    max_items = int(policy.get("defaults", {}).get("max_items_per_run", 50))
    rules = gen_cfg.get("rules", [])
    # Build risk_by_service lookup
    risk_by_service: Dict[str, Dict] = {}
    if risk_digest_data:
        for rs in (risk_digest_data.get("top_services") or []):
            svc = rs.get("service", "")
            if svc:
                risk_by_service[svc] = rs
    created = updated = skipped = 0
    items_out: List[Dict] = []
    total_written = 0
    for svc_entry in (digest_data.get("top_pressure_services") or []):
        service = svc_entry.get("service", "")
        if not service:
            continue
        if total_written >= max_items:
            skipped += 1
            continue
        ctx = _build_service_context(svc_entry, risk_by_service.get(service))
        # Evaluate rules — one item per matched rule
        matched_categories: set = set()
        for rule in rules:
            try:
                if not _match_rule(rule, ctx):
                    continue
                category = rule.get("create", {}).get("category", "")
                if category in matched_categories:
                    continue  # dedupe same category within a service
                matched_categories.add(category)
                item = _build_item_from_rule(service, rule, ctx, policy,
                                             effective_week, env)
                if item is None:
                    continue
                result = store.upsert(item)
                action = result["action"]
                upserted = result["item"]
                # Emit event
                ev_type = "created" if action == "created" else "auto_update"
                store.add_event(BacklogEvent(
                    id=_new_id("ev"),
                    item_id=upserted.id,
                    ts=_now_iso(),
                    type=ev_type,
                    message=f"Auto-generated by weekly digest — rule: {rule.get('name', '?')}",
                    actor="backlog_generator",
                    meta={"week": effective_week, "rule": rule.get("name", "")},
                ))
                if action == "created":
                    created += 1
                else:
                    updated += 1
                total_written += 1
                items_out.append({
                    "id": upserted.id,
                    "service": service,
                    "category": upserted.category,
                    "status": upserted.status,
                    "action": action,
                })
            except Exception as e:
                logger.warning("backlog_generator: skip rule %s for %s: %s",
                               rule.get("name"), service, e)
                skipped += 1
    return {
        "created": created,
        "updated": updated,
        "skipped": skipped,
        "items": items_out,
        "week": effective_week,
    }
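# Example invocation (hypothetical payload; assumes an empty backlog store and
# the builtin default policy, under which only "arch_review_required" fires):
#
#   digest = {
#       "week": "2026-W10",
#       "top_pressure_services": [
#           {"service": "gateway", "score": 82, "band": "high",
#            "requires_arch_review": True},
#       ],
#   }
#   result = generate_from_pressure_digest(digest, env="prod")
#   # -> {"created": 1, "updated": 0, "skipped": 0,
#   #     "items": [{"id": "...", "service": "gateway",
#   #                "category": "arch_review", "status": "open",
#   #                "action": "created"}],
#   #     "week": "2026-W10"}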
def generate_from_risk_digest(
    risk_digest_data: Dict,
    env: str = "prod",
    *,
    store: Optional[BacklogStore] = None,
    policy: Optional[Dict] = None,
    week_str: Optional[str] = None,
) -> Dict:
    """
    Optional: generate items from a daily risk digest JSON.
    Only active when generation.daily_from_risk_digest=true.
    """
    if policy is None:
        policy = load_backlog_policy()
    gen_cfg = policy.get("generation", {})
    if not gen_cfg.get("daily_from_risk_digest", False):
        return {"created": 0, "updated": 0, "skipped": 0, "items": [],
                "skipped_reason": "daily_from_risk_digest disabled in policy"}
    if store is None:
        from backlog_store import get_backlog_store
        store = get_backlog_store()
    # Convert risk digest top_services into pressure-like entries
    effective_week = week_str or _now_week()
    max_items = int(policy.get("defaults", {}).get("max_items_per_run", 50))
    rules = gen_cfg.get("rules", [])
    created = updated = skipped = 0
    items_out: List[Dict] = []
    total_written = 0
    for svc_entry in (risk_digest_data.get("top_services") or []):
        service = svc_entry.get("service", "")
        if not service or total_written >= max_items:
            skipped += 1
            continue
        # Build a minimal pressure context from risk data
        ctx: Dict = {
            "pressure_score": None,
            "pressure_band": "low",
            "pressure_requires_arch_review": False,
            "signals_summary": [],
            "followups_overdue": 0,
            "risk_score": svc_entry.get("score"),
            "risk_band": svc_entry.get("band", "low"),
            "risk_delta_24h": (svc_entry.get("trend") or {}).get("delta_24h"),
            # None-safe: components / slo may be absent or null in the digest
            "slo_violations": int(((svc_entry.get("components") or {}).get("slo") or {}).get("violations", 0)),
            "risk_reasons": svc_entry.get("reasons", []),
            "evidence_refs": (svc_entry.get("attribution") or {}).get("evidence_refs") or {},
        }
        matched_categories: set = set()
        for rule in rules:
            try:
                if not _match_rule(rule, ctx):
                    continue
                category = rule.get("create", {}).get("category", "")
                if category in matched_categories:
                    continue
                matched_categories.add(category)
                item = _build_item_from_rule(service, rule, ctx, policy,
                                             effective_week, env)
                if item is None:
                    continue
                result = store.upsert(item)
                action = result["action"]
                upserted = result["item"]
                store.add_event(BacklogEvent(
                    id=_new_id("ev"),
                    item_id=upserted.id,
                    ts=_now_iso(),
                    type="created" if action == "created" else "auto_update",
                    message="Auto-generated from daily risk digest",
                    actor="backlog_generator",
                    meta={"week": effective_week},
                ))
                if action == "created":
                    created += 1
                else:
                    updated += 1
                total_written += 1
                items_out.append({
                    "id": upserted.id, "service": service,
                    "category": upserted.category, "status": upserted.status,
                    "action": action,
                })
            except Exception as e:
                logger.warning("backlog_generator(risk): skip rule %s for %s: %s",
                               rule.get("name"), service, e)
                skipped += 1
    return {"created": created, "updated": updated, "skipped": skipped,
            "items": items_out, "week": effective_week}