microdao-daarion/ops/scripts/schedule_jobs.py
Apple 67225a39fa (2026-03-03 07:14:53 -08:00)
docs(platform): add policy configs, runbooks, ops scripts and platform documentation
Config policies (16 files): alert_routing, architecture_pressure, backlog,
cost_weights, data_governance, incident_escalation, incident_intelligence,
network_allowlist, nodes_registry, observability_sources, rbac_tools_matrix,
release_gate, risk_attribution, risk_policy, slo_policy, tool_limits, tools_rollout

Ops (22 files): Caddyfile, calendar compose, grafana voice dashboard,
deployments/incidents logs, runbooks for alerts/audit/backlog/incidents/sofiia/voice,
cron jobs, scripts (alert_triage, audit_cleanup, migrate_*, governance, schedule),
task_registry, voice alerts/ha/latency/policy

Docs (30+ files): HUMANIZED_STEPAN v2.7-v3 changelogs and runbooks,
NODA1/NODA2 status and setup, audit index and traces, backlog, incident,
supervisor, tools, voice, opencode, release, risk, aistalk, spacebot

#!/usr/bin/env python3
"""
Lightweight scheduled job runner for DAARION operational tasks.
Calls tools directly (no gateway required) and saves output artifacts to
ops/reports/{cost,privacy,drift}/.
Usage:
python3 ops/scripts/schedule_jobs.py daily_cost_digest
python3 ops/scripts/schedule_jobs.py daily_privacy_digest
python3 ops/scripts/schedule_jobs.py weekly_drift_full
Environment variables:
REPO_ROOT — root of repo (default: inferred from script location)
AUDIT_BACKEND — auto|jsonl|postgres (default: auto)
DATABASE_URL — PostgreSQL DSN (required for backend=postgres/auto with DB)
AUDIT_JSONL_DIR — override JSONL audit dir
Exit codes: 0 = success, 1 = error
"""

from __future__ import annotations

import datetime
import json
import os
import sys
from pathlib import Path

# ── Resolve repo root ─────────────────────────────────────────────────────────
_HERE = Path(__file__).resolve().parent
REPO_ROOT = Path(os.getenv("REPO_ROOT", str(_HERE.parent.parent)))
sys.path.insert(0, str(REPO_ROOT / "services" / "router"))
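# cost_analyzer, data_governance and drift_analyzer are imported lazily inside
# each task's try/except below, so a missing or broken tool fails only its own
# job instead of the whole runner.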


def _today() -> str:
    return datetime.date.today().isoformat()


def _week_tag() -> str:
    d = datetime.date.today()
    return f"week-{d.isocalendar()[0]}-{d.isocalendar()[1]:02d}"


def _save_artifact(output_dir: Path, stem: str, data: dict) -> None:
    output_dir.mkdir(parents=True, exist_ok=True)
    json_path = output_dir / f"{stem}.json"
    md_path = output_dir / f"{stem}.md"
    with open(json_path, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2, ensure_ascii=False, default=str)
    markdown = data.get("markdown", "")
    if markdown:
        with open(md_path, "w", encoding="utf-8") as fh:
            fh.write(markdown)
    print(f"[schedule_jobs] Artifacts saved: {json_path}")
    if md_path.exists():
        print(f"[schedule_jobs] Markdown: {md_path}")

# ─── Task implementations ─────────────────────────────────────────────────────
def run_daily_cost_digest() -> int:
    print(f"[schedule_jobs] Running daily_cost_digest ({_today()})")
    try:
        from cost_analyzer import analyze_cost_dict  # type: ignore
        result = analyze_cost_dict("digest", params={
            "window_hours": 24,
            "baseline_hours": 168,
            "top_n": 10,
            "backend": os.getenv("AUDIT_BACKEND", "auto"),
        })
        output_dir = REPO_ROOT / "ops" / "reports" / "cost"
        _save_artifact(output_dir, _today(), result)
        anomalies = result.get("anomaly_count", 0)
        recs = result.get("recommendations") or []
        print(f"[schedule_jobs] Cost digest OK — anomalies={anomalies}, recs={len(recs)}")
        return 0
    except Exception as exc:
        print(f"[schedule_jobs] daily_cost_digest FAILED: {exc}", file=sys.stderr)
        return 1


def run_daily_privacy_digest() -> int:
    print(f"[schedule_jobs] Running daily_privacy_digest ({_today()})")
    try:
        from data_governance import scan_data_governance_dict  # type: ignore
        result = scan_data_governance_dict("digest_audit", params={
            "backend": os.getenv("AUDIT_BACKEND", "auto"),
            "time_window_hours": 24,
            "max_findings": 20,
        })
        output_dir = REPO_ROOT / "ops" / "reports" / "privacy"
        _save_artifact(output_dir, _today(), result)
        stats = result.get("stats") or {}
        print(
            f"[schedule_jobs] Privacy digest OK — "
            f"errors={stats.get('errors', 0)}, warnings={stats.get('warnings', 0)}"
        )
        return 0
    except Exception as exc:
        print(f"[schedule_jobs] daily_privacy_digest FAILED: {exc}", file=sys.stderr)
        return 1


def run_weekly_drift_full() -> int:
    tag = _week_tag()
    print(f"[schedule_jobs] Running weekly_drift_full ({tag})")
    try:
        from drift_analyzer import analyze_drift_dict  # type: ignore
        result = analyze_drift_dict({
            "categories": ["services", "openapi", "nats", "tools"],
            "drift_profile": "dev",
        })
        output_dir = REPO_ROOT / "ops" / "reports" / "drift"
        _save_artifact(output_dir, tag, result)
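        # some tools return a flat dict while others wrap the payload in
        # "data"; tolerate both shapes when pulling stats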
stats = (result.get("data") or result).get("stats") or {}
print(
f"[schedule_jobs] Drift full OK — "
f"errors={stats.get('errors',0)}, warnings={stats.get('warnings',0)}"
)
return 0
except Exception as exc:
print(f"[schedule_jobs] weekly_drift_full FAILED: {exc}", file=sys.stderr)
return 1


# ─── Dispatch ─────────────────────────────────────────────────────────────────
TASKS = {
    "daily_cost_digest": run_daily_cost_digest,
    "daily_privacy_digest": run_daily_privacy_digest,
    "weekly_drift_full": run_weekly_drift_full,
}


def main() -> None:
    if len(sys.argv) < 2 or sys.argv[1] not in TASKS:
        print(f"Usage: {sys.argv[0]} <task>", file=sys.stderr)
        print(f"  Available tasks: {', '.join(TASKS)}", file=sys.stderr)
        sys.exit(1)
    task_name = sys.argv[1]
    sys.exit(TASKS[task_name]())


if __name__ == "__main__":
    main()
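
# An ad-hoc run against the Postgres audit backend might look like this
# (the DSN is a placeholder, not a value from the repo):
#
#   DATABASE_URL=postgres://user:pass@localhost:5432/audit \
#   AUDIT_BACKEND=postgres \
#   python3 ops/scripts/schedule_jobs.py daily_privacy_digest
#
# Each task exits 0 on success and 1 on failure, so cron mail or a CI step can
# alert on a non-zero status without parsing log output.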