Config policies (16 files): alert_routing, architecture_pressure, backlog, cost_weights, data_governance, incident_escalation, incident_intelligence, network_allowlist, nodes_registry, observability_sources, rbac_tools_matrix, release_gate, risk_attribution, risk_policy, slo_policy, tool_limits, tools_rollout Ops (22 files): Caddyfile, calendar compose, grafana voice dashboard, deployments/incidents logs, runbooks for alerts/audit/backlog/incidents/sofiia/voice, cron jobs, scripts (alert_triage, audit_cleanup, migrate_*, governance, schedule), task_registry, voice alerts/ha/latency/policy Docs (30+ files): HUMANIZED_STEPAN v2.7-v3 changelogs and runbooks, NODA1/NODA2 status and setup, audit index and traces, backlog, incident, supervisor, tools, voice, opencode, release, risk, aistalk, spacebot Made-with: Cursor
149 lines
4.6 KiB
Python
149 lines
4.6 KiB
Python
#!/usr/bin/env python3
"""
alert_triage_loop.py — Scheduled runner for the alert_triage_graph.

Calls the sofiia-supervisor API (POST /v1/graphs/alert_triage/runs) and
polls until the run completes, then prints the digest.

Usage:
    python3 ops/scripts/alert_triage_loop.py [--dry-run] [--supervisor-url URL]

Environment:
    SUPERVISOR_URL       default: http://sofiia-supervisor:8084
    SUPERVISOR_API_KEY   optional API key (Bearer token)
    ALERT_TRIAGE_WS_ID   workspace_id (default: "default")
    ALERT_TRIAGE_AGENT   agent_id (default: "sofiia")

Cron example (NODA2):
    */5 * * * * python3 /opt/daarion/ops/scripts/alert_triage_loop.py >> /var/log/alert_triage.log 2>&1
"""
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import urllib.error
|
|
|
|
# Root logging setup: timestamped, level-tagged lines at INFO and above.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Connection and identity settings, overridable through the environment.
SUPERVISOR_URL = os.environ.get("SUPERVISOR_URL", "http://sofiia-supervisor:8084")
API_KEY = os.environ.get("SUPERVISOR_API_KEY", "")  # empty => no Authorization header
WORKSPACE_ID = os.environ.get("ALERT_TRIAGE_WS_ID", "default")
AGENT_ID = os.environ.get("ALERT_TRIAGE_AGENT", "sofiia")

# Polling budget: total seconds to wait for a run, and the pause between polls.
MAX_POLL_SECONDS = 220
POLL_INTERVAL_SECONDS = 5
|
|
|
|
|
|
def _headers() -> dict:
    """Build the HTTP headers used for every supervisor request.

    The JSON ``Content-Type`` and ``Accept`` headers are always included.
    A Bearer ``Authorization`` header is appended only when the module-level
    ``API_KEY`` is non-empty.
    """
    auth = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}
    return {"Content-Type": "application/json", "Accept": "application/json", **auth}
|
|
|
|
|
|
def _http_post(url: str, body: dict) -> dict:
    """POST ``body`` as JSON to ``url`` and return the decoded JSON response.

    Uses a 30-second timeout. Errors raised by ``urllib`` or by JSON
    decoding propagate to the caller.
    """
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode(),
        headers=_headers(),
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        raw = response.read()
    return json.loads(raw)
|
|
|
|
|
|
def _http_get(url: str) -> dict:
    """GET ``url`` and return the decoded JSON response.

    Uses a 30-second timeout. Errors raised by ``urllib`` or by JSON
    decoding propagate to the caller.
    """
    request = urllib.request.Request(url, headers=_headers(), method="GET")
    with urllib.request.urlopen(request, timeout=30) as response:
        raw = response.read()
    return json.loads(raw)
|
|
|
|
|
|
def start_run(dry_run: bool = False) -> str:
    """Start an alert_triage graph run on the supervisor and return its id.

    Args:
        dry_run: Forwarded to the graph as ``input.dry_run``.

    Returns:
        The ``run_id`` reported by the supervisor.

    Raises:
        RuntimeError: If the response carries no (truthy) ``run_id``.
    """
    graph_input = {
        "policy_profile": "default",
        "dry_run": dry_run,
        "workspace_id": WORKSPACE_ID,
        "agent_id": AGENT_ID,
    }
    request_body = {
        "workspace_id": WORKSPACE_ID,
        "user_id": "scheduler",
        "agent_id": AGENT_ID,
        "input": graph_input,
    }
    endpoint = f"{SUPERVISOR_URL}/v1/graphs/alert_triage/runs"

    logger.info("Starting alert_triage run (dry_run=%s)", dry_run)
    resp = _http_post(endpoint, request_body)

    started_id = resp.get("run_id")
    if started_id:
        logger.info("Run started: %s (status=%s)", started_id, resp.get("status"))
        return started_id
    raise RuntimeError(f"No run_id in response: {resp}")
|
|
|
|
|
|
def poll_run(run_id: str) -> dict:
    """Poll the supervisor until the run reaches a terminal status.

    The run document is fetched every ``POLL_INTERVAL_SECONDS`` until its
    ``status`` is ``succeeded``, ``failed`` or ``cancelled``.

    Args:
        run_id: Identifier returned when the run was started.

    Returns:
        The run document as returned by the supervisor at the terminal status.

    Raises:
        TimeoutError: If no terminal status is seen within ``MAX_POLL_SECONDS``.
    """
    terminal_states = ("succeeded", "failed", "cancelled")
    status_url = f"{SUPERVISOR_URL}/v1/runs/{run_id}"
    give_up_at = time.monotonic() + MAX_POLL_SECONDS

    while True:
        # The deadline is checked before each request, not during the sleep.
        if time.monotonic() >= give_up_at:
            raise TimeoutError(f"Run {run_id} did not complete in {MAX_POLL_SECONDS}s")

        snapshot = _http_get(status_url)
        state = snapshot.get("status", "unknown")
        if state in terminal_states:
            return snapshot

        logger.debug("Run %s status=%s — waiting…", run_id, state)
        time.sleep(POLL_INTERVAL_SECONDS)
|
|
|
|
|
|
def main():
    """Command-line entry point: start one alert_triage run and report on it.

    Parses ``--dry-run`` and ``--supervisor-url``, starts a run, polls it to
    completion, logs a one-line summary and prints the markdown digest (if
    any) to stdout.

    Exit codes:
        0  run reached a terminal status other than ``failed``
        1  run finished with status ``failed``
        2  supervisor unreachable or it answered with an HTTP error
        3  timeout
        4  any other unexpected error
    """
    # The `global` declaration must precede every use of the name inside this
    # function (including the argparse default below); otherwise Python
    # rejects the module with "name ... is used prior to global declaration".
    global SUPERVISOR_URL

    parser = argparse.ArgumentParser(description="Alert Triage Loop runner")
    parser.add_argument("--dry-run", action="store_true", help="Simulate without writes")
    parser.add_argument("--supervisor-url", default=SUPERVISOR_URL)
    args = parser.parse_args()

    # The helpers read the module-level URL, so the CLI override is applied there.
    SUPERVISOR_URL = args.supervisor_url

    try:
        run_id = start_run(dry_run=args.dry_run)
        result = poll_run(run_id)
        status = result.get("status")
        # `or {}` also covers an explicit null "result" / "result_summary".
        run_result = result.get("result") or {}

        digest = run_result.get("digest_md", "")
        summary = run_result.get("result_summary") or {}

        logger.info(
            "Alert triage run %s completed: status=%s processed=%s "
            "created=%s updated=%s skipped=%s errors=%s triages=%s",
            run_id, status,
            summary.get("processed", "?"),
            summary.get("created_incidents", "?"),
            summary.get("updated_incidents", "?"),
            summary.get("skipped", "?"),
            summary.get("errors", "?"),
            summary.get("triage_runs", "?"),
        )

        if digest:
            print("\n" + digest)

        # Only "failed" maps to a non-zero exit here. SystemExit is not an
        # Exception subclass, so the handlers below do not intercept it.
        if status == "failed":
            logger.error("Run %s FAILED", run_id)
            sys.exit(1)

    except urllib.error.HTTPError as e:
        # HTTPError subclasses URLError, so it must be handled first: the
        # supervisor was reached but rejected or failed the request.
        logger.error(
            "Supervisor at %s returned HTTP %s: %s", SUPERVISOR_URL, e.code, e.reason
        )
        sys.exit(2)
    except urllib.error.URLError as e:
        logger.error("Cannot reach supervisor at %s: %s", SUPERVISOR_URL, e)
        sys.exit(2)
    except TimeoutError as e:
        logger.error("Timeout: %s", e)
        sys.exit(3)
    except Exception as e:
        # Top-level boundary: log the traceback for diagnosis.
        logger.exception("Unexpected error: %s", e)
        sys.exit(4)


if __name__ == "__main__":
    main()
|