Files
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

285 lines
8.8 KiB
Python

"""
Sofiia Supervisor — FastAPI Application
HTTP API for launching and monitoring LangGraph runs.
Endpoints:
POST /v1/graphs/{graph_name}/runs — start a new run (async)
GET /v1/runs/{run_id} — get run status + result
POST /v1/runs/{run_id}/cancel — cancel a running run
GET /healthz — health check
"""
from __future__ import annotations

import asyncio
import datetime
import hashlib
import hmac
import logging
import uuid
from typing import Any, Dict, Optional

from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware

from .config import settings
from .graphs import GRAPH_REGISTRY
from .models import (
    CancelRunResponse,
    EventType,
    GetRunResponse,
    RunEvent,
    RunRecord,
    RunStatus,
    StartRunRequest,
    StartRunResponse,
)
from .state_backend import StateBackend, create_state_backend
# Process-wide logging: INFO level, timestamped "[LEVEL] logger: message" lines.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
# Module logger used by every helper and endpoint below.
logger = logging.getLogger(__name__)
# ─── App ──────────────────────────────────────────────────────────────────────
app = FastAPI(
    title="Sofiia Supervisor",
    description="LangGraph orchestration service for DAARION.city",
    version="1.0.0",
    docs_url="/docs",
    redoc_url=None,  # only the Swagger UI is served
)

# NOTE(review): wildcard origins let any browser page call this API; combined
# with an unset SUPERVISOR_INTERNAL_KEY (auth becomes a no-op) this weakens the
# "rely on network-level protection" assumption — confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["POST", "GET"],
    allow_origins=["*"],
)

# Created lazily by get_state_backend() on first use; None until then.
_state_backend: Optional[StateBackend] = None
def get_state_backend() -> StateBackend:
    """Return the module-wide state backend, building it on the first call.

    Later calls reuse the instance cached in ``_state_backend``.
    """
    global _state_backend
    if _state_backend is not None:
        return _state_backend
    _state_backend = create_state_backend()
    return _state_backend
# ─── Auth middleware ──────────────────────────────────────────────────────────
def _check_internal_key(request: Request):
    """Enforce the shared internal key on a request.

    When ``settings.SUPERVISOR_INTERNAL_KEY`` is empty/unset the check is a
    no-op (access control is left to the network layer). Otherwise the
    ``Authorization`` header value, with an optional leading ``"Bearer "``
    removed and surrounding whitespace stripped, must equal the key.

    Args:
        request: incoming request whose headers are inspected.

    Raises:
        HTTPException: 401 (with ``WWW-Authenticate: Bearer``) when a key is
            configured and the header does not match it.
    """
    key = settings.SUPERVISOR_INTERNAL_KEY
    if not key:
        return  # no key configured → open (rely on network-level protection)
    auth = request.headers.get("Authorization", "")
    provided = auth.removeprefix("Bearer ").strip()
    # Constant-time comparison: a plain `!=` returns faster the earlier the
    # first mismatching character is, which leaks the key prefix via timing.
    # Compare bytes because compare_digest raises TypeError for non-ASCII str.
    if not hmac.compare_digest(provided.encode("utf-8"), key.encode("utf-8")):
        raise HTTPException(
            status_code=401,
            detail="Unauthorized",
            # RFC 7235 requires a challenge on 401 responses.
            headers={"WWW-Authenticate": "Bearer"},
        )
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _new_run_id() -> str:
    """Return a new run id: the prefix ``gr_`` plus 20 random hex characters."""
    random_hex = uuid.uuid4().hex
    return f"gr_{random_hex[:20]}"
def _now() -> str:
    """Return the current time in UTC as an ISO-8601 string (``+00:00`` offset)."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc).isoformat()
def _input_hash(inp: Dict) -> str:
    """Return a short fingerprint of a run input for logs and events.

    The value is dumped as JSON with sorted keys (non-ASCII characters kept),
    UTF-8 encoded, SHA-256 hashed, and the first 12 hex digits are returned,
    so key order does not affect the result. Any failure along the way (e.g. a
    value JSON cannot serialise, or text UTF-8 cannot encode) yields ``"?"``
    instead of raising — the hash is diagnostic only.
    """
    import json
    try:
        canonical = json.dumps(inp, sort_keys=True, ensure_ascii=False)
        digest = hashlib.sha256(canonical.encode())
        return digest.hexdigest()[:12]
    except Exception:
        return "?"
# ─── Graph runner (background task) ──────────────────────────────────────────
async def _run_graph(run_id: str, graph_name: str, initial_state: Dict[str, Any]):
    """
    Execute the LangGraph ``graph_name`` for ``run_id`` as a background task.

    State transitions written to the state backend:
      * QUEUED -> RUNNING when execution starts (``started_at`` is refreshed);
      * -> SUCCEEDED / FAILED from the final state's ``graph_status`` (only if
        the record is not CANCELLED by then);
      * -> CANCELLED when this task itself is cancelled;
      * -> FAILED plus an ERROR event when the graph raises (again, not over a
        CANCELLED record).
    A run whose record is already CANCELLED when this task starts is skipped.

    Does NOT log payload — only the input hash and the final graph status are
    put into events.

    Raises:
        asyncio.CancelledError: re-raised after the run is marked CANCELLED, so
            the cancellation reaches whoever cancelled the task.
    """
    backend = get_state_backend()

    run = await backend.get_run(run_id)
    if not run:
        logger.error("_run_graph: run %s not found in state backend", run_id)
        return
    # A cancel request can land while the run is still QUEUED. The post-run
    # checks below already treat a CANCELLED record as final; without this
    # guard the status would be reset to RUNNING and the graph executed anyway.
    if run.status == RunStatus.CANCELLED:
        logger.info("run %s cancelled before start; skipping execution", run_id)
        return

    # Mark as running
    run.status = RunStatus.RUNNING
    run.started_at = _now()
    await backend.save_run(run)
    await backend.append_event(run_id, RunEvent(
        ts=_now(), type=EventType.NODE_START, node="graph_start",
        details={"input_hash": _input_hash(initial_state.get("input", {}))},
    ))

    try:
        compiled = GRAPH_REGISTRY[graph_name]()
        # Run graph asynchronously
        final_state = await compiled.ainvoke(initial_state)
        # NOTE(review): start_run seeds graph_status="running"; a graph that never
        # overwrites it is recorded as FAILED below — confirm every graph sets it.
        graph_status = final_state.get("graph_status", "succeeded")
        result = final_state.get("result")
        error = final_state.get("error")
        await backend.append_event(run_id, RunEvent(
            ts=_now(), type=EventType.NODE_END, node="graph_end",
            details={"graph_status": graph_status},
        ))
        # Re-read: the record may have been cancelled while the graph ran.
        run = await backend.get_run(run_id)
        if run and run.status != RunStatus.CANCELLED:
            run.status = RunStatus.SUCCEEDED if graph_status == "succeeded" else RunStatus.FAILED
            run.finished_at = _now()
            run.result = result
            run.error = error
            await backend.save_run(run)
    except asyncio.CancelledError:
        logger.info("run %s cancelled", run_id)
        run = await backend.get_run(run_id)
        if run:
            run.status = RunStatus.CANCELLED
            run.finished_at = _now()
            await backend.save_run(run)
        # Never swallow cancellation: the canceller (e.g. server shutdown)
        # must see the task end as cancelled, not as a normal completion.
        raise
    except Exception as e:
        logger.exception("run %s graph execution error: %s", run_id, str(e)[:200])
        run = await backend.get_run(run_id)
        if run and run.status != RunStatus.CANCELLED:
            run.status = RunStatus.FAILED
            run.finished_at = _now()
            run.error = str(e)[:500]
            await backend.save_run(run)
        await backend.append_event(run_id, RunEvent(
            ts=_now(), type=EventType.ERROR,
            details={"error": str(e)[:300]},
        ))
# ─── Endpoints ────────────────────────────────────────────────────────────────
@app.get("/healthz")
async def healthz():
    """Health check: service identity, registered graph names, backend config.

    Not guarded by the internal key.
    """
    # NOTE(review): this unauthenticated response exposes the gateway URL;
    # confirm that is acceptable for the deployment.
    registered = list(GRAPH_REGISTRY.keys())
    return dict(
        status="ok",
        service="sofiia-supervisor",
        graphs=registered,
        state_backend=settings.STATE_BACKEND,
        gateway_url=settings.GATEWAY_BASE_URL,
    )
@app.post("/v1/graphs/{graph_name}/runs", response_model=StartRunResponse)
async def start_run(
    graph_name: str,
    body: StartRunRequest,
    request: Request,
    background_tasks: BackgroundTasks,
):
    """
    Queue a new run of ``graph_name`` and return its id immediately.

    The run record is saved as QUEUED and ``_run_graph`` is scheduled as a
    background task. Poll GET /v1/runs/{run_id} for status and result.

    Raises:
        HTTPException: 401 on a bad internal key, 404 for an unknown graph.
    """
    _check_internal_key(request)

    if graph_name not in GRAPH_REGISTRY:
        available = list(GRAPH_REGISTRY.keys())
        raise HTTPException(
            status_code=404,
            detail=f"Unknown graph '{graph_name}'. Available: {available}",
        )

    run_id = _new_run_id()
    queued_at = _now()
    queued_record = RunRecord(
        run_id=run_id,
        graph=graph_name,
        status=RunStatus.QUEUED,
        agent_id=body.agent_id,
        workspace_id=body.workspace_id,
        user_id=body.user_id,
        started_at=queued_at,
    )
    await get_state_backend().save_run(queued_record)

    # Seed state handed to the compiled graph; "input" is the caller payload as-is.
    seed_state = dict(
        run_id=run_id,
        agent_id=body.agent_id,
        workspace_id=body.workspace_id,
        user_id=body.user_id,
        input=body.input,
        graph_status="running",
    )
    background_tasks.add_task(_run_graph, run_id, graph_name, seed_state)

    # Log only a hash of the input, never the payload itself.
    logger.info(
        "start_run graph=%s run=%s agent=%s input_hash=%s",
        graph_name, run_id, body.agent_id, _input_hash(body.input),
    )
    return StartRunResponse(run_id=run_id, status=RunStatus.QUEUED)
@app.get("/v1/runs/{run_id}", response_model=GetRunResponse)
async def get_run(run_id: str, request: Request):
    """Get run status, result, and event log.

    Raises:
        HTTPException: 401 on a bad internal key, 404 for an unknown run id.
    """
    _check_internal_key(request)
    record = await get_state_backend().get_run(run_id)
    if not record:
        raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
    # NOTE(review): record.error (set for FAILED runs) is not forwarded here;
    # confirm whether GetRunResponse is meant to expose it.
    fields = dict(
        run_id=record.run_id,
        graph=record.graph,
        status=record.status,
        started_at=record.started_at,
        finished_at=record.finished_at,
        result=record.result,
        events=record.events,
    )
    return GetRunResponse(**fields)
@app.post("/v1/runs/{run_id}/cancel", response_model=CancelRunResponse)
async def cancel_run(run_id: str, request: Request):
    """Request cancellation of a running/queued run.

    Delegates to ``backend.cancel_run``. If the backend refuses, the response
    reports the run's current status and a "cannot cancel" message. On success
    the response status is CANCELLED; in-flight tool calls may still complete.

    Raises:
        HTTPException: 401 on a bad internal key, 404 for an unknown run id.
    """
    _check_internal_key(request)
    backend = get_state_backend()
    run = await backend.get_run(run_id)
    if not run:
        raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")

    cancelled = await backend.cancel_run(run_id)
    if not cancelled:
        # The record read above may be stale: the run can reach a terminal
        # state between that read and cancel_run(). Re-read so the reported
        # status reflects why cancellation was refused; fall back to the
        # earlier record if it has disappeared meanwhile.
        current = await backend.get_run(run_id) or run
        return CancelRunResponse(
            run_id=run_id,
            status=current.status,
            message=f"Run is already {current.status.value}, cannot cancel",
        )

    logger.info("cancel_run run=%s requested", run_id)
    return CancelRunResponse(
        run_id=run_id,
        status=RunStatus.CANCELLED,
        message="Cancellation requested. In-flight tool calls may still complete.",
    )