Config policies (16 files): alert_routing, architecture_pressure, backlog, cost_weights, data_governance, incident_escalation, incident_intelligence, network_allowlist, nodes_registry, observability_sources, rbac_tools_matrix, release_gate, risk_attribution, risk_policy, slo_policy, tool_limits, tools_rollout Ops (22 files): Caddyfile, calendar compose, grafana voice dashboard, deployments/incidents logs, runbooks for alerts/audit/backlog/incidents/sofiia/voice, cron jobs, scripts (alert_triage, audit_cleanup, migrate_*, governance, schedule), task_registry, voice alerts/ha/latency/policy Docs (30+ files): HUMANIZED_STEPAN v2.7-v3 changelogs and runbooks, NODA1/NODA2 status and setup, audit index and traces, backlog, incident, supervisor, tools, voice, opencode, release, risk, aistalk, spacebot Made-with: Cursor
146 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Lightweight scheduled job runner for DAARION operational tasks.
|
|
|
|
Calls tools directly (no gateway required) and saves output artifacts to
|
|
ops/reports/{cost,privacy,drift}/.
|
|
|
|
Usage:
|
|
python3 ops/scripts/schedule_jobs.py daily_cost_digest
|
|
python3 ops/scripts/schedule_jobs.py daily_privacy_digest
|
|
python3 ops/scripts/schedule_jobs.py weekly_drift_full
|
|
|
|
Environment variables:
|
|
REPO_ROOT — root of repo (default: inferred from script location)
|
|
AUDIT_BACKEND — auto|jsonl|postgres (default: auto)
|
|
DATABASE_URL — PostgreSQL DSN (required for backend=postgres/auto with DB)
|
|
AUDIT_JSONL_DIR — override JSONL audit dir
|
|
|
|
Exit codes: 0 = success, 1 = error
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import json
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# ── Resolve repo root ─────────────────────────────────────────────────────────
# Default is two directory levels above this script (ops/scripts/ -> repo root);
# the REPO_ROOT environment variable overrides it.
_HERE = Path(__file__).resolve().parent
REPO_ROOT = Path(os.getenv("REPO_ROOT", str(_HERE.parent.parent)))
# Put the router service on sys.path so the task functions below can import
# cost_analyzer, data_governance and drift_analyzer (presumably located there
# — confirm against services/router contents).
sys.path.insert(0, str(REPO_ROOT / "services" / "router"))
def _today() -> str:
|
|
return datetime.date.today().isoformat()
|
|
|
|
|
|
def _week_tag() -> str:
|
|
d = datetime.date.today()
|
|
return f"week-{d.isocalendar()[0]}-{d.isocalendar()[1]:02d}"
|
|
|
|
|
|
def _save_artifact(output_dir: Path, stem: str, data: dict) -> None:
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
json_path = output_dir / f"{stem}.json"
|
|
md_path = output_dir / f"{stem}.md"
|
|
with open(json_path, "w", encoding="utf-8") as fh:
|
|
json.dump(data, fh, indent=2, ensure_ascii=False, default=str)
|
|
markdown = data.get("markdown", "")
|
|
if markdown:
|
|
with open(md_path, "w", encoding="utf-8") as fh:
|
|
fh.write(markdown)
|
|
print(f"[schedule_jobs] Artifacts saved: {json_path}")
|
|
if md_path.exists():
|
|
print(f"[schedule_jobs] Markdown: {md_path}")
|
|
|
|
|
|
# ─── Task implementations ─────────────────────────────────────────────────────
|
|
|
|
def run_daily_cost_digest() -> int:
    """Run the daily cost digest and save artifacts to ops/reports/cost/.

    Returns 0 on success, 1 on any failure (error reported on stderr).
    """
    print(f"[schedule_jobs] Running daily_cost_digest ({_today()})")
    try:
        # Imported lazily so a missing analyzer only fails this task.
        from cost_analyzer import analyze_cost_dict  # type: ignore

        digest_params = {
            "window_hours": 24,       # analyze the last day
            "baseline_hours": 168,    # against a one-week baseline
            "top_n": 10,
            "backend": os.getenv("AUDIT_BACKEND", "auto"),
        }
        digest = analyze_cost_dict("digest", params=digest_params)
        _save_artifact(REPO_ROOT / "ops" / "reports" / "cost", _today(), digest)
        n_anomalies = digest.get("anomaly_count", 0)
        recommendations = digest.get("recommendations") or []
        print(f"[schedule_jobs] Cost digest OK — anomalies={n_anomalies}, recs={len(recommendations)}")
        return 0
    except Exception as exc:
        print(f"[schedule_jobs] daily_cost_digest FAILED: {exc}", file=sys.stderr)
        return 1
def run_daily_privacy_digest() -> int:
    """Run the daily privacy digest and save artifacts to ops/reports/privacy/.

    Returns 0 on success, 1 on any failure (error reported on stderr).
    """
    print(f"[schedule_jobs] Running daily_privacy_digest ({_today()})")
    try:
        # Imported lazily so a missing scanner only fails this task.
        from data_governance import scan_data_governance_dict  # type: ignore

        scan_params = {
            "backend": os.getenv("AUDIT_BACKEND", "auto"),
            "time_window_hours": 24,  # scan the last day of audit data
            "max_findings": 20,
        }
        report = scan_data_governance_dict("digest_audit", params=scan_params)
        _save_artifact(REPO_ROOT / "ops" / "reports" / "privacy", _today(), report)
        stats = report.get("stats") or {}
        print(
            f"[schedule_jobs] Privacy digest OK — "
            f"errors={stats.get('errors',0)}, warnings={stats.get('warnings',0)}"
        )
        return 0
    except Exception as exc:
        print(f"[schedule_jobs] daily_privacy_digest FAILED: {exc}", file=sys.stderr)
        return 1
def run_weekly_drift_full() -> int:
    """Run the full weekly drift analysis and save artifacts to ops/reports/drift/.

    Artifacts are keyed by ISO-week tag rather than by date.
    Returns 0 on success, 1 on any failure (error reported on stderr).
    """
    week_stem = _week_tag()
    print(f"[schedule_jobs] Running weekly_drift_full ({week_stem})")
    try:
        # Imported lazily so a missing analyzer only fails this task.
        from drift_analyzer import analyze_drift_dict  # type: ignore

        report = analyze_drift_dict({
            "categories": ["services", "openapi", "nats", "tools"],
            "drift_profile": "dev",
        })
        _save_artifact(REPO_ROOT / "ops" / "reports" / "drift", week_stem, report)
        # Stats may be nested under "data" or live at the top level.
        payload = report.get("data") or report
        stats = payload.get("stats") or {}
        print(
            f"[schedule_jobs] Drift full OK — "
            f"errors={stats.get('errors',0)}, warnings={stats.get('warnings',0)}"
        )
        return 0
    except Exception as exc:
        print(f"[schedule_jobs] weekly_drift_full FAILED: {exc}", file=sys.stderr)
        return 1
# ─── Dispatch ─────────────────────────────────────────────────────────────────
|
|
|
|
# Dispatch table: CLI task name -> runner function returning a process exit code.
TASKS = {
    "daily_cost_digest": run_daily_cost_digest,
    "daily_privacy_digest": run_daily_privacy_digest,
    "weekly_drift_full": run_weekly_drift_full,
}
def main() -> None:
    """CLI entry point: dispatch to the named task, exit with its return code.

    Exits 1 with a usage message when the task name is missing or unknown.
    """
    args = sys.argv
    unknown = len(args) < 2 or args[1] not in TASKS
    if unknown:
        print(f"Usage: {args[0]} <task>", file=sys.stderr)
        print(f" Available tasks: {', '.join(TASKS)}", file=sys.stderr)
        sys.exit(1)
    runner = TASKS[args[1]]
    sys.exit(runner())


if __name__ == "__main__":
    main()