#!/usr/bin/env python3
"""
run_governance_job.py — Universal Governance Job Runner.
DAARION.city | used by cron to trigger scheduled governance jobs.

Usage:
    python3 ops/scripts/run_governance_job.py \\
        --tool risk_history_tool \\
        --action snapshot \\
        --params-json '{"env":"prod"}'

    python3 ops/scripts/run_governance_job.py \\
        --tool backlog_tool --action cleanup --params-json '{"env":"prod"}' \\
        --router-url http://localhost:8000 \\
        --agent-id scheduler

Exit codes:
    0 — success (HTTP 200, result.success=true)
    1 — HTTP error or tool returned success=false
    2 — usage / configuration error

Environment variables (read from .env if present):
    ROUTER_URL         — base URL of the router service (default: http://localhost:8000)
    SCHEDULER_API_KEY  — optional Bearer token for router auth
    GOVERNANCE_ENV     — when set, used as the "env" tool parameter if
                         --params-json does not supply one
"""

from __future__ import annotations

import argparse
import datetime
import http.client
import json
import logging
import os
import sys
import urllib.error
import urllib.request
from pathlib import Path


# ── Try loading .env from repo root ──────────────────────────────────────────

def _load_dotenv(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from *path* into ``os.environ``.

    Blank lines, ``#`` comments and lines without ``=`` are skipped. An
    optional leading ``export `` is accepted. Surrounding double quotes and
    then single quotes are stripped from the value. Variables that already
    exist in the environment are never overridden.

    Args:
        path: Location of the dotenv file. Nothing happens if it is not a
            regular file.
    """
    # is_file() (not exists()) so a directory named ".env" is ignored instead
    # of blowing up in open().
    if not path.is_file():
        return
    # Explicit encoding: cron often runs with a C/POSIX locale.
    with open(path, encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            key, _, value = line.partition("=")
            key = key.strip()
            value = value.strip().strip('"').strip("'")
            if key and key not in os.environ:  # don't override existing env vars
                os.environ[key] = value


_REPO_ROOT = Path(__file__).resolve().parent.parent.parent
_load_dotenv(_REPO_ROOT / ".env")
_load_dotenv(_REPO_ROOT / ".env.local")

# ── Logging ───────────────────────────────────────────────────────────────────

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [run_governance_job] %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("run_governance_job")

# Keys copied from the tool result into the one-line success summary.
_SUMMARY_KEYS = (
    "created", "updated", "skipped", "deleted", "snapshot_id",
    "services", "total", "week", "band", "score",
)


# ── HTTP helper ───────────────────────────────────────────────────────────────

def _post_json(url: str, payload: dict, api_key: str = "", timeout: int = 60) -> dict:
    """POST *payload* as JSON to *url* and return the parsed response object.

    Args:
        url: Full endpoint URL.
        payload: JSON-serialisable request body.
        api_key: If non-empty, sent as ``Authorization: Bearer <api_key>``.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The response body decoded as a JSON object.

    Raises:
        RuntimeError: On an HTTP error status, an unreachable host, a
            timeout or other connection failure, a malformed URL, or a
            response body that is not a JSON object.
    """
    body = json.dumps(payload).encode()
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    try:
        # Request() itself raises ValueError for a URL without a scheme, so it
        # lives inside the try block.
        req = urllib.request.Request(url, data=body, headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        body_txt = e.read().decode(errors="replace")[:500]
        raise RuntimeError(f"HTTP {e.code} from {url}: {body_txt}") from e
    except urllib.error.URLError as e:
        raise RuntimeError(f"Cannot reach {url}: {e.reason}") from e
    except (OSError, http.client.HTTPException) as e:
        # Read timeouts, connection resets and truncated responses are not
        # wrapped in URLError once the response has started.
        raise RuntimeError(f"Connection to {url} failed: {e!r}") from e
    except ValueError as e:
        raise RuntimeError(f"Invalid request URL {url!r}: {e}") from e

    try:
        parsed = json.loads(raw.decode())
    except ValueError as e:  # JSONDecodeError and UnicodeDecodeError
        snippet = raw.decode(errors="replace")[:500]
        raise RuntimeError(f"Non-JSON response from {url}: {snippet}") from e
    if not isinstance(parsed, dict):
        raise RuntimeError(
            f"Unexpected response from {url}: expected a JSON object, "
            f"got {type(parsed).__name__}"
        )
    return parsed


# ── Main ──────────────────────────────────────────────────────────────────────

def main(argv: list[str] | None = None) -> int:
    """Parse CLI arguments, call the router's tool endpoint, report the result.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 on success (or dry run), 1 when the request
        fails or the tool reports ``success=false``, 2 when ``--params-json``
        is not a valid JSON object.
    """
    parser = argparse.ArgumentParser(
        description="Trigger a governance tool action via the DAARION router API."
    )
    parser.add_argument("--tool", required=True, help="Tool name (e.g. risk_history_tool)")
    parser.add_argument("--action", required=True, help="Action (e.g. snapshot)")
    parser.add_argument(
        "--params-json",
        default="{}",
        help='JSON dict of extra parameters (e.g. \'{"env":"prod"}\')',
    )
    parser.add_argument(
        "--router-url",
        default=os.environ.get("ROUTER_URL", "http://localhost:8000"),
        help="Router base URL (default: $ROUTER_URL or http://localhost:8000)",
    )
    parser.add_argument(
        "--agent-id",
        default="scheduler",
        help='Agent identity for audit trail (default: scheduler)',
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=90,
        help="HTTP timeout in seconds (default: 90)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print the request payload without sending it",
    )
    args = parser.parse_args(argv)

    # Parse extra params
    try:
        extra_params = json.loads(args.params_json)
    except json.JSONDecodeError as e:
        logger.error("Invalid --params-json: %s", e)
        return 2
    # Valid JSON that is not an object (list, string, number, null) cannot be
    # merged into the payload; treat it as a usage error instead of crashing.
    if not isinstance(extra_params, dict):
        logger.error(
            "Invalid --params-json: expected a JSON object, got %s",
            type(extra_params).__name__,
        )
        return 2

    # Apply the documented GOVERNANCE_ENV default only when it is set and the
    # caller did not pass an explicit "env".
    default_env = os.environ.get("GOVERNANCE_ENV", "")
    if default_env and "env" not in extra_params:
        extra_params = {**extra_params, "env": default_env}

    api_key = os.environ.get("SCHEDULER_API_KEY", "")
    endpoint = f"{args.router_url.rstrip('/')}/v1/tools/execute"

    # Extra params are merged last, so they take precedence over the fixed
    # keys when names collide.
    payload = {
        "tool": args.tool,
        "action": args.action,
        "agent_id": args.agent_id,
        **extra_params,
    }

    # Timezone-aware replacement for the deprecated datetime.utcnow().
    ts = datetime.datetime.now(datetime.timezone.utc).isoformat()
    logger.info("Job: %s.%s params=%s ts=%s", args.tool, args.action, extra_params, ts)

    if args.dry_run:
        print("[dry-run] Would POST to:", endpoint)
        print("[dry-run] Payload:", json.dumps(payload, indent=2))
        return 0

    try:
        result = _post_json(endpoint, payload, api_key=api_key, timeout=args.timeout)
    except RuntimeError as e:
        logger.error("Request failed: %s", e)
        return 1

    # Normalise result — router returns {"success": bool, "result": ..., "error": ...}
    success = result.get("success", True)  # assume success if key absent
    error = result.get("error")
    res_data = result.get("result", result)

    if not success:
        logger.error("❌ %s.%s → FAIL error=%s", args.tool, args.action, error)
        return 1

    # Pretty-print a summary of the well-known result fields, if any.
    summary = {}
    if isinstance(res_data, dict):
        summary = {key: res_data[key] for key in _SUMMARY_KEYS if key in res_data}
    logger.info(
        "✅ %s.%s → OK %s",
        args.tool, args.action,
        json.dumps(summary) if summary else "(done)",
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())