feat(sofiia-console): add guided runbook runner with http checks and audit integration
adds runbook_runs/runbook_steps state machine parses markdown runbooks into guided steps supports allowlisted http_check (health/metrics/audit) integrates runbook execution with audit trail exposes authenticated runbook runs API Made-with: Cursor
This commit is contained in:
337
services/sofiia-console/app/runbook_runner.py
Normal file
337
services/sofiia-console/app/runbook_runner.py
Normal file
@@ -0,0 +1,337 @@
|
||||
"""
|
||||
Runbook runner — create run, next_step (execute http_check or return manual), complete_step, abort.
|
||||
PR2: guided execution, allowlisted HTTP only; audit integration.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import httpx
|
||||
|
||||
from . import db as _db
|
||||
from . import docs_store as _docs_store
|
||||
from .audit import audit_log, AuditEvent
|
||||
from .runbook_parser import RunbookStep, parse_runbook
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_ALLOWED_HTTP_PATHS = {"/api/health", "/metrics", "/api/audit"}
|
||||
|
||||
|
||||
def _now_ts() -> float:
|
||||
return time.time()
|
||||
|
||||
|
||||
async def create_run(
|
||||
runbook_path: str,
|
||||
operator_id: str = "",
|
||||
node_id: Optional[str] = None,
|
||||
sofiia_url: Optional[str] = None,
|
||||
data_json: Optional[Dict[str, Any]] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Parse runbook, insert run + steps, audit. Returns run_id, status, current_step, steps_total."""
|
||||
raw = await _docs_store.get_raw(runbook_path)
|
||||
if not raw:
|
||||
raise ValueError(f"Runbook not found: {runbook_path}")
|
||||
base_url = sofiia_url or "http://127.0.0.1:8002"
|
||||
steps = parse_runbook(runbook_path, raw, sofiia_url=base_url)
|
||||
if not steps:
|
||||
raise ValueError("Runbook produced no steps")
|
||||
|
||||
run_id = str(uuid.uuid4())
|
||||
now = _now_ts()
|
||||
conn = await _db.get_db()
|
||||
await conn.execute(
|
||||
"""INSERT INTO runbook_runs(run_id, runbook_path, status, current_step, created_at, started_at,
|
||||
finished_at, operator_id, node_id, sofiia_url, data_json, evidence_path)
|
||||
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
|
||||
(
|
||||
run_id,
|
||||
runbook_path,
|
||||
"running",
|
||||
0,
|
||||
now,
|
||||
now,
|
||||
None,
|
||||
operator_id or None,
|
||||
node_id,
|
||||
base_url,
|
||||
json.dumps(data_json or {}, separators=(",", ":")) if data_json else None,
|
||||
None,
|
||||
),
|
||||
)
|
||||
for s in steps:
|
||||
await conn.execute(
|
||||
"""INSERT INTO runbook_steps(run_id, step_index, title, section, action_type, action_json,
|
||||
status, result_json, started_at, finished_at) VALUES (?,?,?,?,?,?,?,?,?,?)""",
|
||||
(
|
||||
run_id,
|
||||
s.step_index,
|
||||
s.title,
|
||||
s.section,
|
||||
s.action_type,
|
||||
json.dumps(s.action_json, separators=(",", ":")),
|
||||
s.status,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
),
|
||||
)
|
||||
await conn.commit()
|
||||
|
||||
await audit_log(
|
||||
AuditEvent(
|
||||
event="runbook.run.created",
|
||||
operator_id=operator_id or "unknown",
|
||||
operator_id_missing=not operator_id,
|
||||
node_id=node_id,
|
||||
status="ok",
|
||||
data={"run_id": run_id, "runbook_path": runbook_path, "steps_total": len(steps)},
|
||||
)
|
||||
)
|
||||
return {
|
||||
"run_id": run_id,
|
||||
"status": "running",
|
||||
"current_step": 0,
|
||||
"steps_total": len(steps),
|
||||
}
|
||||
|
||||
|
||||
async def get_run(run_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Return run row + steps (light: no large action_json/result_json)."""
|
||||
conn = await _db.get_db()
|
||||
async with conn.execute(
|
||||
"SELECT run_id, runbook_path, status, current_step, created_at, started_at, finished_at,"
|
||||
" operator_id, node_id, sofiia_url, evidence_path FROM runbook_runs WHERE run_id = ?",
|
||||
(run_id,),
|
||||
) as cur:
|
||||
row = await cur.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
run = {
|
||||
"run_id": row[0],
|
||||
"runbook_path": row[1],
|
||||
"status": row[2],
|
||||
"current_step": row[3],
|
||||
"created_at": row[4],
|
||||
"started_at": row[5],
|
||||
"finished_at": row[6],
|
||||
"operator_id": row[7],
|
||||
"node_id": row[8],
|
||||
"sofiia_url": row[9],
|
||||
"evidence_path": row[10],
|
||||
}
|
||||
async with conn.execute(
|
||||
"SELECT step_index, title, section, action_type, status, started_at, finished_at "
|
||||
"FROM runbook_steps WHERE run_id = ? ORDER BY step_index",
|
||||
(run_id,),
|
||||
) as cur:
|
||||
step_rows = await cur.fetchall()
|
||||
run["steps"] = [
|
||||
{
|
||||
"step_index": r[0],
|
||||
"title": r[1],
|
||||
"section": r[2],
|
||||
"action_type": r[3],
|
||||
"status": r[4],
|
||||
"started_at": r[5],
|
||||
"finished_at": r[6],
|
||||
}
|
||||
for r in (step_rows or [])
|
||||
]
|
||||
return run
|
||||
|
||||
|
||||
async def _execute_http_check(base_url: str, action: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Allowlisted GET only; returns {status, status_code, ok, error}."""
|
||||
path = (action.get("url_path") or "").strip()
|
||||
if not path.startswith("/"):
|
||||
path = "/" + path
|
||||
if path not in _ALLOWED_HTTP_PATHS:
|
||||
return {"ok": False, "error": "path not allowlisted", "status_code": None}
|
||||
url = (base_url or "http://127.0.0.1:8002").rstrip("/") + path
|
||||
expect_statuses = action.get("expect", {}).get("status") or [200]
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15.0) as client:
|
||||
r = await client.get(url, headers=action.get("headers") or {})
|
||||
ok = r.status_code in expect_statuses
|
||||
return {
|
||||
"ok": ok,
|
||||
"status_code": r.status_code,
|
||||
"expected": expect_statuses,
|
||||
"status": "ok" if ok else "fail",
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning("http_check %s failed: %s", url, e)
|
||||
return {"ok": False, "error": str(e)[:200], "status_code": None, "status": "fail"}
|
||||
|
||||
|
||||
async def next_step(run_id: str, operator_id: str = "") -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Get current step; if http_check execute and advance; if manual return instructions.
|
||||
Returns either {type: "http_check", step_index, result, ...} or {type: "manual", step_index, title, instructions_md}.
|
||||
"""
|
||||
conn = await _db.get_db()
|
||||
async with conn.execute(
|
||||
"SELECT runbook_path, status, current_step, sofiia_url FROM runbook_runs WHERE run_id = ?",
|
||||
(run_id,),
|
||||
) as cur:
|
||||
row = await cur.fetchone()
|
||||
if not row or row[1] not in ("running", "paused"):
|
||||
return None
|
||||
runbook_path, status, current_step, sofiia_url = row[0], row[1], row[2], row[3]
|
||||
|
||||
async with conn.execute(
|
||||
"SELECT step_index, title, section, action_type, action_json, status FROM runbook_steps "
|
||||
"WHERE run_id = ? AND step_index = ?",
|
||||
(run_id, current_step),
|
||||
) as cur:
|
||||
step_row = await cur.fetchone()
|
||||
if not step_row:
|
||||
return None
|
||||
step_index, title, section, action_type, action_json_str, step_status = step_row
|
||||
action_json = json.loads(action_json_str) if action_json_str else {}
|
||||
|
||||
started_at = _now_ts()
|
||||
await conn.execute(
|
||||
"UPDATE runbook_steps SET status = ?, started_at = ? WHERE run_id = ? AND step_index = ?",
|
||||
("running", started_at, run_id, step_index),
|
||||
)
|
||||
await conn.commit()
|
||||
await audit_log(
|
||||
AuditEvent(
|
||||
event="runbook.step.started",
|
||||
operator_id=operator_id or "unknown",
|
||||
node_id=None,
|
||||
status="ok",
|
||||
data={"run_id": run_id, "step_index": step_index, "action_type": action_type, "title": title},
|
||||
)
|
||||
)
|
||||
|
||||
if action_type == "http_check":
|
||||
result = await _execute_http_check(sofiia_url or "http://127.0.0.1:8002", action_json)
|
||||
finished_at = _now_ts()
|
||||
duration_ms = int((finished_at - started_at) * 1000)
|
||||
step_status = "ok" if result.get("ok") else "fail"
|
||||
await conn.execute(
|
||||
"UPDATE runbook_steps SET status = ?, result_json = ?, finished_at = ? WHERE run_id = ? AND step_index = ?",
|
||||
(step_status, json.dumps(result, separators=(",", ":")), finished_at, run_id, step_index),
|
||||
)
|
||||
next_current = current_step + 1
|
||||
async with conn.execute("SELECT COUNT(*) FROM runbook_steps WHERE run_id = ?", (run_id,)) as cur:
|
||||
total = (await cur.fetchone())[0]
|
||||
if next_current >= total:
|
||||
await conn.execute(
|
||||
"UPDATE runbook_runs SET current_step = ?, status = ?, finished_at = ? WHERE run_id = ?",
|
||||
(next_current, "completed", finished_at, run_id),
|
||||
)
|
||||
else:
|
||||
await conn.execute("UPDATE runbook_runs SET current_step = ? WHERE run_id = ?", (next_current, run_id))
|
||||
await conn.commit()
|
||||
await audit_log(
|
||||
AuditEvent(
|
||||
event="runbook.step.completed",
|
||||
operator_id=operator_id or "unknown",
|
||||
node_id=None,
|
||||
status=step_status,
|
||||
duration_ms=duration_ms,
|
||||
data={
|
||||
"run_id": run_id,
|
||||
"step_index": step_index,
|
||||
"action_type": action_type,
|
||||
"result_ok": result.get("ok"),
|
||||
},
|
||||
)
|
||||
)
|
||||
return {
|
||||
"type": "http_check",
|
||||
"step_index": step_index,
|
||||
"title": title,
|
||||
"result": result,
|
||||
"next_step": next_current,
|
||||
"completed": next_current >= total,
|
||||
}
|
||||
|
||||
# manual
|
||||
instructions = action_json.get("instructions_md") or title
|
||||
return {
|
||||
"type": "manual",
|
||||
"step_index": step_index,
|
||||
"title": title,
|
||||
"section": section,
|
||||
"instructions_md": instructions,
|
||||
}
|
||||
|
||||
|
||||
async def complete_step(
|
||||
run_id: str,
|
||||
step_index: int,
|
||||
status: str = "ok",
|
||||
notes: str = "",
|
||||
data: Optional[Dict[str, Any]] = None,
|
||||
operator_id: str = "",
|
||||
) -> bool:
|
||||
"""Record manual step completion, advance current_step."""
|
||||
if status not in ("ok", "warn", "fail", "skipped"):
|
||||
status = "ok"
|
||||
conn = await _db.get_db()
|
||||
async with conn.execute(
|
||||
"SELECT current_step FROM runbook_runs WHERE run_id = ? AND runbook_runs.status IN ('running','paused')",
|
||||
(run_id,),
|
||||
) as cur:
|
||||
row = await cur.fetchone()
|
||||
if not row or row[0] != step_index:
|
||||
return False
|
||||
finished_at = _now_ts()
|
||||
result_json = json.dumps({"status": status, "notes": notes[:500], **(data or {})}, separators=(",", ":"))
|
||||
await conn.execute(
|
||||
"UPDATE runbook_steps SET status = ?, result_json = ?, finished_at = ? WHERE run_id = ? AND step_index = ?",
|
||||
(status, result_json, finished_at, run_id, step_index),
|
||||
)
|
||||
next_current = step_index + 1
|
||||
async with conn.execute("SELECT COUNT(*) FROM runbook_steps WHERE run_id = ?", (run_id,)) as cur:
|
||||
total = (await cur.fetchone())[0]
|
||||
if next_current >= total:
|
||||
await conn.execute(
|
||||
"UPDATE runbook_runs SET current_step = ?, status = ?, finished_at = ? WHERE run_id = ?",
|
||||
(next_current, "completed", finished_at, run_id),
|
||||
)
|
||||
else:
|
||||
await conn.execute("UPDATE runbook_runs SET current_step = ? WHERE run_id = ?", (next_current, run_id))
|
||||
await conn.commit()
|
||||
await audit_log(
|
||||
AuditEvent(
|
||||
event="runbook.step.completed",
|
||||
operator_id=operator_id or "unknown",
|
||||
status=status,
|
||||
data={"run_id": run_id, "step_index": step_index, "action_type": "manual"},
|
||||
)
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
async def abort_run(run_id: str, operator_id: str = "") -> bool:
|
||||
"""Set status=aborted, audit."""
|
||||
conn = await _db.get_db()
|
||||
async with conn.execute("SELECT 1 FROM runbook_runs WHERE run_id = ?", (run_id,)) as cur:
|
||||
if not await cur.fetchone():
|
||||
return False
|
||||
now = _now_ts()
|
||||
await conn.execute(
|
||||
"UPDATE runbook_runs SET status = ?, finished_at = ? WHERE run_id = ?",
|
||||
("aborted", now, run_id),
|
||||
)
|
||||
await conn.commit()
|
||||
await audit_log(
|
||||
AuditEvent(
|
||||
event="runbook.run.aborted",
|
||||
operator_id=operator_id or "unknown",
|
||||
status="ok",
|
||||
data={"run_id": run_id},
|
||||
)
|
||||
)
|
||||
return True
|
||||
Reference in New Issue
Block a user