#!/usr/bin/env python3
"""
Lightweight scheduled job runner for DAARION operational tasks.

Calls tools directly (no gateway required) and saves output artifacts to
ops/reports/{cost,privacy,drift}/.

Usage:
    python3 ops/scripts/schedule_jobs.py daily_cost_digest
    python3 ops/scripts/schedule_jobs.py daily_privacy_digest
    python3 ops/scripts/schedule_jobs.py weekly_drift_full

Environment variables:
    REPO_ROOT       — root of repo (default: inferred from script location)
    AUDIT_BACKEND   — auto|jsonl|postgres (default: auto)
    DATABASE_URL    — PostgreSQL DSN (required for backend=postgres/auto with DB)
    AUDIT_JSONL_DIR — override JSONL audit dir

Exit codes: 0 = success, 1 = error
"""
from __future__ import annotations

import datetime
import json
import os
import sys
from pathlib import Path

# ── Resolve repo root ─────────────────────────────────────────────────────────
_HERE = Path(__file__).resolve().parent
# Script lives at ops/scripts/, so repo root is two levels up unless overridden.
REPO_ROOT = Path(os.getenv("REPO_ROOT", str(_HERE.parent.parent)))
# Make the router tool modules (cost_analyzer, data_governance, drift_analyzer)
# importable without an installed package.
sys.path.insert(0, str(REPO_ROOT / "services" / "router"))


def _today() -> str:
    """Return today's date as an ISO-8601 string (used as daily artifact stem)."""
    return datetime.date.today().isoformat()


def _week_tag() -> str:
    """Return a 'week-YYYY-WW' tag (ISO week numbering) for weekly artifacts."""
    year, week, _weekday = datetime.date.today().isocalendar()
    return f"week-{year}-{week:02d}"


def _save_artifact(output_dir: Path, stem: str, data: dict) -> None:
    """Write *data* as {stem}.json under *output_dir*; also write {stem}.md
    when data['markdown'] is a non-empty string.

    Creates *output_dir* (and parents) if missing.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    json_path = output_dir / f"{stem}.json"
    md_path = output_dir / f"{stem}.md"
    with open(json_path, "w", encoding="utf-8") as fh:
        # default=str stringifies non-JSON-native values (e.g. dates) best-effort.
        json.dump(data, fh, indent=2, ensure_ascii=False, default=str)
    markdown = data.get("markdown", "")
    wrote_markdown = False
    if markdown:
        with open(md_path, "w", encoding="utf-8") as fh:
            fh.write(markdown)
        wrote_markdown = True
    print(f"[schedule_jobs] Artifacts saved: {json_path}")
    # BUG FIX: previously gated on md_path.exists(), which could report a stale
    # .md left over from an earlier run even when this run wrote no markdown.
    if wrote_markdown:
        print(f"[schedule_jobs] Markdown: {md_path}")


# ─── Task implementations ─────────────────────────────────────────────────────

def run_daily_cost_digest() -> int:
    """Run the 24h cost digest and save artifacts to ops/reports/cost/.

    Returns 0 on success, 1 on any failure (errors are logged to stderr,
    never raised — this is a cron boundary).
    """
    print(f"[schedule_jobs] Running daily_cost_digest ({_today()})")
    try:
        from cost_analyzer import analyze_cost_dict  # type: ignore

        result = analyze_cost_dict("digest", params={
            "window_hours": 24,      # analysis window
            "baseline_hours": 168,   # 7-day baseline for anomaly comparison
            "top_n": 10,
            "backend": os.getenv("AUDIT_BACKEND", "auto"),
        })
        output_dir = REPO_ROOT / "ops" / "reports" / "cost"
        _save_artifact(output_dir, _today(), result)
        anomalies = result.get("anomaly_count", 0)
        recs = result.get("recommendations") or []
        print(f"[schedule_jobs] Cost digest OK — anomalies={anomalies}, recs={len(recs)}")
        return 0
    except Exception as exc:  # top-level boundary: map any failure to exit 1
        print(f"[schedule_jobs] daily_cost_digest FAILED: {exc}", file=sys.stderr)
        return 1


def run_daily_privacy_digest() -> int:
    """Run the 24h privacy/audit digest and save artifacts to ops/reports/privacy/.

    Returns 0 on success, 1 on any failure (logged, never raised).
    """
    print(f"[schedule_jobs] Running daily_privacy_digest ({_today()})")
    try:
        from data_governance import scan_data_governance_dict  # type: ignore

        result = scan_data_governance_dict("digest_audit", params={
            "backend": os.getenv("AUDIT_BACKEND", "auto"),
            "time_window_hours": 24,
            "max_findings": 20,
        })
        output_dir = REPO_ROOT / "ops" / "reports" / "privacy"
        _save_artifact(output_dir, _today(), result)
        stats = result.get("stats") or {}
        print(
            f"[schedule_jobs] Privacy digest OK — "
            f"errors={stats.get('errors',0)}, warnings={stats.get('warnings',0)}"
        )
        return 0
    except Exception as exc:  # top-level boundary: map any failure to exit 1
        print(f"[schedule_jobs] daily_privacy_digest FAILED: {exc}", file=sys.stderr)
        return 1


def run_weekly_drift_full() -> int:
    """Run the full weekly drift analysis and save artifacts to ops/reports/drift/.

    Artifacts are keyed by ISO-week tag rather than date. Returns 0 on
    success, 1 on any failure (logged, never raised).
    """
    tag = _week_tag()
    print(f"[schedule_jobs] Running weekly_drift_full ({tag})")
    try:
        from drift_analyzer import analyze_drift_dict  # type: ignore

        result = analyze_drift_dict({
            "categories": ["services", "openapi", "nats", "tools"],
            "drift_profile": "dev",
        })
        output_dir = REPO_ROOT / "ops" / "reports" / "drift"
        _save_artifact(output_dir, tag, result)
        # Tool may nest payload under "data" or return stats at the top level.
        stats = (result.get("data") or result).get("stats") or {}
        print(
            f"[schedule_jobs] Drift full OK — "
            f"errors={stats.get('errors',0)}, warnings={stats.get('warnings',0)}"
        )
        return 0
    except Exception as exc:  # top-level boundary: map any failure to exit 1
        print(f"[schedule_jobs] weekly_drift_full FAILED: {exc}", file=sys.stderr)
        return 1


# ─── Dispatch ─────────────────────────────────────────────────────────────────
TASKS = {
    "daily_cost_digest": run_daily_cost_digest,
    "daily_privacy_digest": run_daily_privacy_digest,
    "weekly_drift_full": run_weekly_drift_full,
}


def main() -> None:
    """CLI entry point: dispatch sys.argv[1] to the matching task runner.

    Exits with the task's return code (0 = success, 1 = failure), or 1
    immediately on a missing/unknown task name.
    """
    if len(sys.argv) < 2 or sys.argv[1] not in TASKS:
        # BUG FIX: usage line previously read "Usage: {argv0} " with no task
        # placeholder (the "<task>" token was apparently lost to markup
        # stripping), leaving the message unhelpful.
        print(f"Usage: {sys.argv[0]} <task>", file=sys.stderr)
        print(f"  Available tasks: {', '.join(TASKS)}", file=sys.stderr)
        sys.exit(1)
    task_name = sys.argv[1]
    sys.exit(TASKS[task_name]())


if __name__ == "__main__":
    main()