""" backlog_generator.py — Auto-generation of Engineering Backlog items from Platform Priority / Risk digests. DAARION.city | deterministic, no LLM. Public API: load_backlog_policy() -> Dict generate_from_pressure_digest(digest_data, env, ...) -> GenerateResult generate_from_risk_digest(digest_data, env, ...) -> GenerateResult _build_item_from_rule(service, rule, context, policy, week_str, env) -> BacklogItem | None _make_dedupe_key(prefix, week_str, env, service, category) -> str """ from __future__ import annotations import datetime import json import logging import yaml from pathlib import Path from typing import Any, Dict, List, Optional from backlog_store import ( BacklogItem, BacklogEvent, BacklogStore, _new_id, _now_iso, ) logger = logging.getLogger(__name__) # ─── Policy ─────────────────────────────────────────────────────────────────── _BACKLOG_POLICY_CACHE: Optional[Dict] = None _BACKLOG_POLICY_PATHS = [ Path("config/backlog_policy.yml"), Path(__file__).resolve().parent.parent.parent / "config" / "backlog_policy.yml", ] def load_backlog_policy() -> Dict: global _BACKLOG_POLICY_CACHE if _BACKLOG_POLICY_CACHE is not None: return _BACKLOG_POLICY_CACHE for p in _BACKLOG_POLICY_PATHS: if p.exists(): try: with open(p) as f: data = yaml.safe_load(f) or {} _BACKLOG_POLICY_CACHE = data return data except Exception as e: logger.warning("Failed to load backlog_policy from %s: %s", p, e) _BACKLOG_POLICY_CACHE = _builtin_backlog_defaults() return _BACKLOG_POLICY_CACHE def _reload_backlog_policy() -> None: global _BACKLOG_POLICY_CACHE _BACKLOG_POLICY_CACHE = None def _builtin_backlog_defaults() -> Dict: return { "defaults": {"env": "prod", "retention_days": 180, "max_items_per_run": 50}, "dedupe": { "scheme": "YYYY-WW", "key_fields": ["service", "category", "env"], "key_prefix": "platform_backlog", }, "categories": { "arch_review": {"priority": "P1", "due_days": 14}, "refactor": {"priority": "P1", "due_days": 21}, "slo_hardening": {"priority": "P2", "due_days": 30}, "cleanup_followups": {"priority": "P2", "due_days": 14}, "security": {"priority": "P0", "due_days": 7}, }, "generation": { "weekly_from_pressure_digest": True, "daily_from_risk_digest": False, "rules": [ { "name": "arch_review_required", "when": {"pressure_requires_arch_review": True}, "create": { "category": "arch_review", "title_template": "[ARCH] Review required: {service}", }, }, { "name": "high_pressure_refactor", "when": { "pressure_band_in": ["high", "critical"], "risk_band_in": ["high", "critical"], }, "create": { "category": "refactor", "title_template": "[REF] Reduce pressure & risk: {service}", }, }, { "name": "slo_violations", "when": {"risk_has_slo_violations": True}, "create": { "category": "slo_hardening", "title_template": "[SLO] Fix violations: {service}", }, }, { "name": "followup_backlog", "when": {"followups_overdue_gt": 0}, "create": { "category": "cleanup_followups", "title_template": "[OPS] Close overdue followups: {service}", }, }, ], }, "ownership": { "default_owner": "oncall", "overrides": {"gateway": "cto"}, }, "workflow": { "statuses": ["open", "in_progress", "blocked", "done", "canceled"], "allowed_transitions": { "open": ["in_progress", "blocked", "canceled"], "in_progress": ["blocked", "done", "canceled"], "blocked": ["open", "in_progress", "canceled"], "done": [], "canceled": [], }, }, } # ─── Helpers ────────────────────────────────────────────────────────────────── def _now_week() -> str: return datetime.datetime.utcnow().strftime("%Y-W%V") def _make_dedupe_key(prefix: str, week_str: str, env: str, service: str, category: str) -> str: return f"{prefix}:{week_str}:{env}:{service}:{category}" def _due_date(due_days: int) -> str: return ( datetime.datetime.utcnow() + datetime.timedelta(days=due_days) ).strftime("%Y-%m-%d") def _owner_for(service: str, policy: Dict) -> str: overrides = policy.get("ownership", {}).get("overrides", {}) return overrides.get(service, policy.get("ownership", {}).get("default_owner", "oncall")) def _match_rule(rule: Dict, ctx: Dict) -> bool: """ Evaluate a rule's `when` conditions against the service context dict. All conditions must hold (AND logic). """ when = rule.get("when", {}) for key, expected in when.items(): if key == "pressure_requires_arch_review": if bool(ctx.get("pressure_requires_arch_review")) is not bool(expected): return False elif key == "pressure_band_in": if ctx.get("pressure_band") not in expected: return False elif key == "risk_band_in": if ctx.get("risk_band") not in expected: return False elif key == "risk_has_slo_violations": slo_v = int(ctx.get("slo_violations", 0)) if (slo_v > 0) is not bool(expected): return False elif key == "followups_overdue_gt": overdue = int(ctx.get("followups_overdue", 0)) if not (overdue > int(expected)): return False return True def _build_description(service: str, ctx: Dict, rule: Dict) -> str: """Generate deterministic bullet-list description from context.""" lines = [f"Auto-generated by Engineering Backlog Bridge — rule: {rule.get('name', '?')}.", ""] p_score = ctx.get("pressure_score") p_band = ctx.get("pressure_band") r_score = ctx.get("risk_score") r_band = ctx.get("risk_band") r_delta = ctx.get("risk_delta_24h") if p_score is not None: lines.append(f"- Architecture Pressure: {p_score} ({p_band})") if r_score is not None: lines.append(f"- Risk Score: {r_score} ({r_band})" + (f" Δ24h: +{r_delta}" if r_delta else "")) slo_v = int(ctx.get("slo_violations", 0)) if slo_v: lines.append(f"- Active SLO violations: {slo_v}") overdue = int(ctx.get("followups_overdue", 0)) if overdue: lines.append(f"- Overdue follow-ups: {overdue}") if ctx.get("signals_summary"): lines.append(f"- Pressure signals: {'; '.join(ctx['signals_summary'][:3])}") if ctx.get("risk_reasons"): lines.append(f"- Risk signals: {'; '.join(ctx['risk_reasons'][:3])}") return "\n".join(lines) def _build_item_from_rule( service: str, rule: Dict, ctx: Dict, policy: Dict, week_str: str, env: str, ) -> Optional[BacklogItem]: """Build a BacklogItem from a matched rule and service context.""" create_cfg = rule.get("create", {}) category = create_cfg.get("category", "arch_review") title_template = create_cfg.get("title_template", "[BACKLOG] {service}") title = title_template.format(service=service) cat_cfg = policy.get("categories", {}).get(category, {}) priority = cat_cfg.get("priority", "P2") due_days = int(cat_cfg.get("due_days", 14)) owner = _owner_for(service, policy) prefix = policy.get("dedupe", {}).get("key_prefix", "platform_backlog") dedupe_key = _make_dedupe_key(prefix, week_str, env, service, category) description = _build_description(service, ctx, rule) # Gather evidence_refs from context evidence_refs = dict(ctx.get("evidence_refs") or {}) return BacklogItem( id=_new_id("bl"), created_at=_now_iso(), updated_at=_now_iso(), env=env, service=service, category=category, title=title, description=description, priority=priority, status="open", owner=owner, due_date=_due_date(due_days), source="digest", dedupe_key=dedupe_key, evidence_refs=evidence_refs, tags=["auto", f"week:{week_str}", f"rule:{rule.get('name', '?')}"], meta={ "rule_name": rule.get("name", ""), "pressure_score": ctx.get("pressure_score"), "risk_score": ctx.get("risk_score"), "week": week_str, }, ) # ─── Context builder from digest ────────────────────────────────────────────── def _build_service_context( service_entry: Dict, risk_entry: Optional[Dict] = None, ) -> Dict: """ Build a unified service context dict from a platform_priority_digest top_pressure_services entry plus an optional risk_digest service entry. """ p_score = service_entry.get("score") p_band = service_entry.get("band", "low") requires_review = bool(service_entry.get("requires_arch_review", False)) signals_summary = service_entry.get("signals_summary", []) comp = service_entry.get("components", {}) followups_overdue = int(comp.get("followups_overdue", 0)) evidence_refs = service_entry.get("evidence_refs") or {} ctx: Dict[str, Any] = { "pressure_score": p_score, "pressure_band": p_band, "pressure_requires_arch_review": requires_review, "signals_summary": signals_summary, "followups_overdue": followups_overdue, "evidence_refs": dict(evidence_refs), } # Merge risk data if risk_entry: ctx["risk_score"] = risk_entry.get("score") ctx["risk_band"] = risk_entry.get("band", "low") ctx["risk_delta_24h"] = (risk_entry.get("trend") or {}).get("delta_24h") slo_comp = (risk_entry.get("components") or {}).get("slo") or {} ctx["slo_violations"] = int(slo_comp.get("violations", 0)) ctx["risk_reasons"] = risk_entry.get("reasons", []) # Merge evidence_refs from risk risk_attrs = risk_entry.get("attribution") or {} risk_erefs = risk_attrs.get("evidence_refs") or {} for k, v in risk_erefs.items(): if k not in ctx["evidence_refs"]: ctx["evidence_refs"][k] = v else: ctx.setdefault("risk_band", service_entry.get("risk_band", "low")) ctx.setdefault("risk_score", service_entry.get("risk_score")) ctx.setdefault("risk_delta_24h", service_entry.get("risk_delta_24h")) ctx.setdefault("slo_violations", 0) return ctx # ─── Main generation function ───────────────────────────────────────────────── def generate_from_pressure_digest( digest_data: Dict, env: str = "prod", *, store: Optional[BacklogStore] = None, policy: Optional[Dict] = None, week_str: Optional[str] = None, risk_digest_data: Optional[Dict] = None, ) -> Dict: """ Generate backlog items from a weekly_platform_priority_digest JSON output. Args: digest_data: JSON dict from platform_priority_digest (top_pressure_services list) env: deployment environment store: backlog store (loaded from factory if None) policy: backlog_policy (loaded if None) week_str: override ISO week (defaults to digest's "week" field or current) risk_digest_data: optional daily risk digest JSON to enrich context Returns GenerateResult dict: created, updated, skipped, items """ if policy is None: policy = load_backlog_policy() if store is None: from backlog_store import get_backlog_store store = get_backlog_store() gen_cfg = policy.get("generation", {}) if not gen_cfg.get("weekly_from_pressure_digest", True): return {"created": 0, "updated": 0, "skipped": 0, "items": [], "skipped_reason": "weekly_from_pressure_digest disabled in policy"} effective_week = week_str or digest_data.get("week") or _now_week() max_items = int(policy.get("defaults", {}).get("max_items_per_run", 50)) rules = gen_cfg.get("rules", []) # Build risk_by_service lookup risk_by_service: Dict[str, Dict] = {} if risk_digest_data: for rs in (risk_digest_data.get("top_services") or []): svc = rs.get("service", "") if svc: risk_by_service[svc] = rs created = updated = skipped = 0 items_out: List[Dict] = [] total_written = 0 for svc_entry in (digest_data.get("top_pressure_services") or []): service = svc_entry.get("service", "") if not service: continue if total_written >= max_items: skipped += 1 continue ctx = _build_service_context(svc_entry, risk_by_service.get(service)) # Evaluate rules — one item per matched rule matched_categories: set = set() for rule in rules: try: if not _match_rule(rule, ctx): continue category = rule.get("create", {}).get("category", "") if category in matched_categories: continue # dedupe same category within a service matched_categories.add(category) item = _build_item_from_rule(service, rule, ctx, policy, effective_week, env) if item is None: continue result = store.upsert(item) action = result["action"] upserted = result["item"] # Emit event ev_type = "created" if action == "created" else "auto_update" store.add_event(BacklogEvent( id=_new_id("ev"), item_id=upserted.id, ts=_now_iso(), type=ev_type, message=f"Auto-generated by weekly digest — rule: {rule.get('name', '?')}", actor="backlog_generator", meta={"week": effective_week, "rule": rule.get("name", "")}, )) if action == "created": created += 1 else: updated += 1 total_written += 1 items_out.append({ "id": upserted.id, "service": service, "category": upserted.category, "status": upserted.status, "action": action, }) except Exception as e: logger.warning("backlog_generator: skip rule %s for %s: %s", rule.get("name"), service, e) skipped += 1 return { "created": created, "updated": updated, "skipped": skipped, "items": items_out, "week": effective_week, } def generate_from_risk_digest( risk_digest_data: Dict, env: str = "prod", *, store: Optional[BacklogStore] = None, policy: Optional[Dict] = None, week_str: Optional[str] = None, ) -> Dict: """ Optional: generate items from a daily risk digest JSON. Only active when generation.daily_from_risk_digest=true. """ if policy is None: policy = load_backlog_policy() gen_cfg = policy.get("generation", {}) if not gen_cfg.get("daily_from_risk_digest", False): return {"created": 0, "updated": 0, "skipped": 0, "items": [], "skipped_reason": "daily_from_risk_digest disabled in policy"} if store is None: from backlog_store import get_backlog_store store = get_backlog_store() # Convert risk digest top_services into pressure-like entries effective_week = week_str or _now_week() max_items = int(policy.get("defaults", {}).get("max_items_per_run", 50)) rules = gen_cfg.get("rules", []) created = updated = skipped = 0 items_out: List[Dict] = [] total_written = 0 for svc_entry in (risk_digest_data.get("top_services") or []): service = svc_entry.get("service", "") if not service or total_written >= max_items: skipped += 1 continue # Build a minimal pressure context from risk data ctx: Dict = { "pressure_score": None, "pressure_band": "low", "pressure_requires_arch_review": False, "signals_summary": [], "followups_overdue": 0, "risk_score": svc_entry.get("score"), "risk_band": svc_entry.get("band", "low"), "risk_delta_24h": (svc_entry.get("trend") or {}).get("delta_24h"), "slo_violations": (svc_entry.get("components") or {}).get("slo", {}).get("violations", 0) if svc_entry.get("components") else 0, "risk_reasons": svc_entry.get("reasons", []), "evidence_refs": (svc_entry.get("attribution") or {}).get("evidence_refs") or {}, } matched_categories: set = set() for rule in rules: try: if not _match_rule(rule, ctx): continue category = rule.get("create", {}).get("category", "") if category in matched_categories: continue matched_categories.add(category) item = _build_item_from_rule(service, rule, ctx, policy, effective_week, env) if item is None: continue result = store.upsert(item) action = result["action"] upserted = result["item"] store.add_event(BacklogEvent( id=_new_id("ev"), item_id=upserted.id, ts=_now_iso(), type="created" if action == "created" else "auto_update", message="Auto-generated from daily risk digest", actor="backlog_generator", meta={"week": effective_week}, )) if action == "created": created += 1 else: updated += 1 total_written += 1 items_out.append({ "id": upserted.id, "service": service, "category": upserted.category, "status": upserted.status, "action": action, }) except Exception as e: logger.warning("backlog_generator(risk): skip rule %s for %s: %s", rule.get("name"), service, e) skipped += 1 return {"created": created, "updated": updated, "skipped": skipped, "items": items_out, "week": effective_week}