"""Ops: run risk dashboard, pressure dashboard, backlog generate, release_check via router tools.""" import logging from typing import Any, Dict from .config import get_router_url from .router_client import execute_tool logger = logging.getLogger(__name__) # Map ops action id -> (tool, action, default params) OPS_ACTIONS: Dict[str, tuple] = { "risk_dashboard": ("risk_engine_tool", "dashboard", {"env": "prod"}), "pressure_dashboard": ("architecture_pressure_tool", "dashboard", {"env": "prod"}), "backlog_generate_weekly": ("backlog_tool", "auto_generate_weekly", {"env": "prod"}), "pieces_status": ("pieces_tool", "status", {}), "notion_status": ("notion_tool", "status", {}), "notion_create_task": ("notion_tool", "create_task", {}), "notion_create_page": ("notion_tool", "create_page", {}), "notion_update_page": ("notion_tool", "update_page", {}), "notion_create_database": ("notion_tool", "create_database", {}), "release_check": ( "job_orchestrator_tool", "start_task", {"task_id": "release_check", "inputs": {"gate_profile": "staging"}}, ), } async def run_ops_action( action_id: str, node_id: str, params_override: Dict[str, Any], *, agent_id: str = "sofiia", timeout: float = 90.0, api_key: str = "", ) -> Dict[str, Any]: """Run one ops action against the given node's router. Returns { status, data, error }.""" if action_id not in OPS_ACTIONS: return {"status": "failed", "data": None, "error": {"message": f"Unknown action: {action_id}"}} tool, action, default_params = OPS_ACTIONS[action_id] params = {**default_params, **params_override} base_url = get_router_url(node_id) try: out = await execute_tool( base_url, tool, action, params=params, agent_id=agent_id, timeout=timeout, api_key=api_key, ) return out except Exception as e: logger.exception("ops run failed: action=%s node=%s", action_id, node_id) return { "status": "failed", "data": None, "error": {"message": str(e)[:300], "retryable": True}, }