microdao-daarion/ops/offload_routing.md
Apple c4b94a327d P2.2+P2.3: NATS offload node-worker + router offload integration
Node Worker (services/node-worker/):
- NATS subscriber for node.{NODE_ID}.llm.request / vision.request
- Canonical JobRequest/JobResponse envelope (Pydantic)
- Idempotency cache (TTL 10min) with inflight dedup
- Deadline enforcement (DEADLINE_EXCEEDED on expired jobs)
- Concurrency limiter (semaphore, returns busy)
- Ollama + Swapper vision providers

Router offload (services/router/offload_client.py):
- NATS req/reply with configurable retries
- Circuit breaker per node+type (3 fails/60s → open 120s)
- Concurrency semaphore for remote requests

Model selection (services/router/model_select.py):
- exclude_nodes parameter for circuit-broken nodes
- force_local flag for fallback re-selection
- Integrated circuit breaker state awareness

Router /infer pipeline:
- Remote offload path when NCS selects remote node
- Automatic fallback: exclude failed node → force_local re-select
- Deadline propagation from router to node-worker

Tests: 17 unit tests (idempotency, deadline, circuit breaker)
Docs: ops/offload_routing.md (subjects, envelope, verification)
Made-with: Cursor
2026-02-27 02:44:05 -08:00


NATS Offload Routing — Operations Guide

Architecture

 Router (NODA1/NODA2)
   │
   ├── model_select.py → selects best model from global capabilities pool
   │                      (local first, remote if needed, circuit breaker aware)
   │
   ├── offload_client.py → NATS req/reply to remote node-worker
   │                        (retries, deadlines, circuit breaker)
   │
   └── global_capabilities_client.py → scatter-gather discovery
                                        node.*.capabilities.get

NATS Subjects

Subject                            Direction   Description
node.{node_id}.capabilities.get    req/reply   NCS capabilities query
node.{node_id}.llm.request         req/reply   LLM inference offload
node.{node_id}.vision.request      req/reply   Vision inference offload
node.{node_id}.stt.request         req/reply   STT (scaffold)
node.{node_id}.tts.request         req/reply   TTS (scaffold)
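
As a minimal sketch of the naming scheme in the table above, a subject can be built from a node ID and a request kind. The helper name `subject_for` and the `KNOWN_KINDS` set are hypothetical and are not taken from the router code; the validation relies only on the NATS rule that `.` separates subject tokens and that `*` and `>` are wildcards.

```python
# Hypothetical helper illustrating the subject layout; not the router's actual code.
KNOWN_KINDS = {
    "capabilities.get",
    "llm.request",
    "vision.request",
    "stt.request",
    "tts.request",
}

# Wildcard subject used for scatter-gather discovery across all nodes.
DISCOVERY_WILDCARD = "node.*.capabilities.get"


def subject_for(node_id: str, kind: str) -> str:
    """Return the per-node subject, e.g. node.noda2.llm.request."""
    if kind not in KNOWN_KINDS:
        raise ValueError(f"unknown request kind: {kind!r}")
    # A node ID must be a single subject token: no separators, wildcards, or spaces.
    if not node_id or any(ch in node_id for ch in ".*> "):
        raise ValueError(f"invalid node_id: {node_id!r}")
    return f"node.{node_id}.{kind}"
```

Rejecting separators and wildcards in `node_id` keeps a misconfigured node from publishing on, or subscribing to, another node's subjects.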

Job Request Envelope

{
  "job_id": "uuid",
  "trace_id": "uuid",
  "actor_agent_id": "sofiia",
  "target_agent_id": "helion",
  "required_type": "llm",
  "deadline_ts": 1740000000000,
  "idempotency_key": "uuid",
  "payload": {
    "messages": [{"role": "user", "content": "..."}],
    "prompt": "...",
    "model": "qwen3:14b",
    "max_tokens": 2048,
    "temperature": 0.2
  },
  "hints": {
    "prefer_models": ["qwen3:14b"]
  }
}
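
The envelope above could be modelled roughly as follows. The service itself uses Pydantic; this sketch uses a stdlib dataclass to stay dependency-free. Which fields are optional is an assumption inferred from the verification commands, and `build_llm_request` is a hypothetical helper that derives `deadline_ts` as "now + timeout".

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional


def _new_id() -> str:
    return str(uuid.uuid4())


@dataclass
class JobRequest:
    """Illustrative mirror of the request envelope (optional fields are assumed)."""
    required_type: str
    payload: Dict[str, Any]
    job_id: str = field(default_factory=_new_id)
    trace_id: str = field(default_factory=_new_id)
    actor_agent_id: Optional[str] = None
    target_agent_id: Optional[str] = None
    deadline_ts: Optional[int] = None       # absolute Unix time in milliseconds
    idempotency_key: Optional[str] = None
    hints: Dict[str, Any] = field(default_factory=dict)

    def to_bytes(self) -> bytes:
        """Serialize to the JSON bytes sent as the NATS message body."""
        return json.dumps(asdict(self)).encode("utf-8")


def build_llm_request(prompt: str, model: str, timeout_ms: int,
                      now_ms: Optional[int] = None) -> JobRequest:
    """Hypothetical builder: sets deadline_ts to now + timeout_ms."""
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    return JobRequest(
        required_type="llm",
        payload={"prompt": prompt, "model": model},
        deadline_ts=now_ms + timeout_ms,
        idempotency_key=_new_id(),
        hints={"prefer_models": [model]},
    )
```

Passing `now_ms` explicitly makes the deadline arithmetic testable without patching the clock.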

Job Response

{
  "job_id": "uuid",
  "trace_id": "uuid",
  "node_id": "noda2",
  "status": "ok|busy|timeout|error",
  "provider": "ollama",
  "model": "qwen3:14b",
  "latency_ms": 5500,
  "result": {"text": "..."},
  "error": null,
  "cached": false
}

Circuit Breaker

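Breaker state is tracked per node_id:required_type pair. A breaker opens after 3 failures within 60 s and stays open for 120 s. While it is open, model selection excludes that node via exclude_nodes.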

Env                          Default   Description
ROUTER_OFFLOAD_CB_FAILS      3         Failures to trip
ROUTER_OFFLOAD_CB_WINDOW_S   60        Failure window (seconds)
ROUTER_OFFLOAD_CB_OPEN_S     120       Open duration (seconds)
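
The thresholds above can be illustrated with a small sliding-window breaker. This is a sketch under stated assumptions, not the code in offload_client.py: the class and method names are hypothetical, and clearing the failure history on success is an assumed policy that the guide does not specify. Only the env variable names and defaults come from the table.

```python
import os
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class CircuitBreaker:
    """Sliding-window breaker keyed by "node_id:required_type" (illustrative)."""

    def __init__(self, fails: int = 3, window_s: float = 60.0, open_s: float = 120.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fails, self.window_s, self.open_s = fails, window_s, open_s
        self._clock = clock
        self._failures: Dict[str, Deque[float]] = defaultdict(deque)
        self._open_until: Dict[str, float] = {}

    @classmethod
    def from_env(cls) -> "CircuitBreaker":
        """Read the documented env variables, falling back to their defaults."""
        return cls(
            fails=int(os.environ.get("ROUTER_OFFLOAD_CB_FAILS", "3")),
            window_s=float(os.environ.get("ROUTER_OFFLOAD_CB_WINDOW_S", "60")),
            open_s=float(os.environ.get("ROUTER_OFFLOAD_CB_OPEN_S", "120")),
        )

    @staticmethod
    def key(node_id: str, required_type: str) -> str:
        return f"{node_id}:{required_type}"

    def record_failure(self, node_id: str, required_type: str) -> None:
        k, now = self.key(node_id, required_type), self._clock()
        window = self._failures[k]
        window.append(now)
        # Drop failures that fell out of the sliding window.
        while window and now - window[0] > self.window_s:
            window.popleft()
        if len(window) >= self.fails:
            self._open_until[k] = now + self.open_s
            window.clear()

    def record_success(self, node_id: str, required_type: str) -> None:
        # Assumed policy: a success clears the failure history for that key.
        self._failures.pop(self.key(node_id, required_type), None)

    def is_open(self, node_id: str, required_type: str) -> bool:
        k = self.key(node_id, required_type)
        until = self._open_until.get(k)
        if until is None:
            return False
        if self._clock() >= until:
            del self._open_until[k]  # open period elapsed; allow traffic again
            return False
        return True
```

Injecting `clock` lets tests advance time deterministically instead of sleeping through the 120 s open period.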

Idempotency

Node-worker caches idempotency_key → response for 10 minutes. A duplicate request returns the cached response immediately (< 10 ms) with cached=true. In-flight dedup prevents parallel execution of the same job.
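
A minimal sketch of a TTL cache with in-flight dedup, assuming an asyncio-based worker. The class name `IdempotencyCache` is hypothetical. Two behaviours here are assumptions rather than documented facts: a caller that joins an in-flight job also gets cached=true, and jobs that raise are not cached.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Tuple

Response = Dict[str, Any]


class IdempotencyCache:
    """TTL cache of finished responses plus dedup of in-flight jobs (illustrative)."""

    def __init__(self, ttl_s: float = 600.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl_s = ttl_s
        self._clock = clock
        self._done: Dict[str, Tuple[float, Response]] = {}
        self._inflight: Dict[str, "asyncio.Future[Response]"] = {}

    async def run(self, key: str, job: Callable[[], Awaitable[Response]]) -> Response:
        hit = self._done.get(key)
        if hit is not None and hit[0] > self._clock():
            return {**hit[1], "cached": True}      # finished earlier, still fresh
        pending = self._inflight.get(key)
        if pending is not None:
            return {**(await pending), "cached": True}  # join the running job
        fut: "asyncio.Future[Response]" = asyncio.get_running_loop().create_future()
        self._inflight[key] = fut
        try:
            response = await job()
            self._done[key] = (self._clock() + self.ttl_s, response)
            fut.set_result(response)
            return response
        except Exception as exc:
            fut.set_exception(exc)
            fut.exception()  # mark as retrieved so asyncio does not log a warning
            raise
        finally:
            if not fut.done():
                fut.cancel()  # job was cancelled; release any waiters
            self._inflight.pop(key, None)
```

Two concurrent calls with the same key execute the job once: the first runs it, the second awaits the shared future.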

Deadline Enforcement

deadline_ts is an absolute Unix timestamp in milliseconds. If it has already expired on arrival, node-worker returns status=timeout with error.code=DEADLINE_EXCEEDED. During inference, asyncio.wait_for enforces the remaining time.
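
The two checks can be sketched as below. The function names are hypothetical, and returning the same timeout response when the deadline passes mid-inference is an assumption; the guide only states the response for a deadline that has expired on arrival.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Optional

DEADLINE_EXCEEDED = {"status": "timeout", "error": {"code": "DEADLINE_EXCEEDED"}}


def remaining_s(deadline_ts_ms: int, now_ms: Optional[int] = None) -> float:
    """Seconds left until an absolute Unix-millisecond deadline (may be negative)."""
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    return (deadline_ts_ms - now_ms) / 1000.0


async def run_with_deadline(
    deadline_ts_ms: int,
    infer: Callable[[], Awaitable[Dict[str, Any]]],
) -> Dict[str, Any]:
    left = remaining_s(deadline_ts_ms)
    if left <= 0:
        # Expired on arrival: reject without starting inference.
        return dict(DEADLINE_EXCEEDED)
    try:
        # wait_for cancels the inference coroutine once the remaining time runs out.
        result = await asyncio.wait_for(infer(), timeout=left)
    except asyncio.TimeoutError:
        return dict(DEADLINE_EXCEEDED)  # assumed to match the on-arrival response
    return {"status": "ok", "result": result, "error": None}
```

Because the deadline is absolute, time already spent in the router or in transit is deducted automatically when the worker computes the remaining budget.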

Fallback Chain

  1. NCS selects remote model (e.g. noda2)
  2. Router sends NATS offload request
  3. If remote fails (timeout/busy/error):
    • Record circuit breaker failure
    • Re-select with exclude_nodes={failed} + force_local=True
    • Execute locally
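
The steps above can be sketched as a single coroutine. Every callable passed in (`select`, `offload`, `run_local`, `record_failure`) is a hypothetical stand-in for the router's real components, and their signatures are assumptions; only the `exclude_nodes` and `force_local` parameter names come from this guide.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Set, Tuple

Job = Dict[str, Any]
Response = Dict[str, Any]


async def infer_with_fallback(
    job: Job,
    *,
    local_node: str,
    select: Callable[..., Tuple[str, str]],           # -> (node_id, model)
    offload: Callable[[str, Job], Awaitable[Response]],
    run_local: Callable[[str, Job], Awaitable[Response]],
    record_failure: Callable[[str, str], None],
) -> Response:
    # Step 1: initial selection may pick a remote node.
    node_id, model = select(exclude_nodes=set(), force_local=False)
    if node_id == local_node:
        return await run_local(model, job)

    # Step 2: try the remote node over NATS.
    try:
        resp = await offload(node_id, job)
    except Exception as exc:  # transport failure is treated like status=error
        resp = {"status": "error", "error": {"code": type(exc).__name__}}
    if resp.get("status") == "ok":
        return resp

    # Step 3: remote failed (timeout/busy/error), so record it and go local.
    record_failure(node_id, job["required_type"])
    excluded: Set[str] = {node_id}
    _, local_model = select(exclude_nodes=excluded, force_local=True)
    return await run_local(local_model, job)
```

The failure is recorded before re-selection so that a breaker tripping on this attempt already affects the next request.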

Environment Variables

Router

Env                                     Default   Description
ROUTER_INFER_TIMEOUT_MS                 25000     Total inference deadline (milliseconds)
ROUTER_OFFLOAD_RETRIES                  1         NATS retries on transient failure
ROUTER_OFFLOAD_MAX_CONCURRENCY_REMOTE   8         Max parallel offloads

Node Worker

Env                           Default                             Description
NODE_ID                       noda2                               Canonical node identifier
NATS_URL                      nats://dagi-nats:4222               NATS server
OLLAMA_BASE_URL               http://host.docker.internal:11434   Ollama API
SWAPPER_URL                   http://swapper-service:8890         Swapper API
NODE_DEFAULT_LLM              qwen3:14b                           Default LLM model
NODE_DEFAULT_VISION           llava:13b                           Default vision model
NODE_WORKER_MAX_CONCURRENCY   2                                   Max parallel inferences
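
NODE_WORKER_MAX_CONCURRENCY bounds parallel inferences, and a saturated worker answers with status=busy rather than queueing. A minimal sketch of that behaviour, assuming an asyncio semaphore; the class name `BusyLimiter` is hypothetical and the busy response carries only the status field for brevity.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Response = Dict[str, Any]


class BusyLimiter:
    """Reject instead of queueing when all slots are taken (illustrative)."""

    def __init__(self, max_concurrency: int = 2) -> None:
        self._sem = asyncio.Semaphore(max_concurrency)

    async def run(self, job: Callable[[], Awaitable[Response]]) -> Response:
        # locked() is True when no slot is free. There is no await between this
        # check and the acquire below, so the check cannot race within one loop.
        if self._sem.locked():
            return {"status": "busy"}
        async with self._sem:
            return await job()
```

Returning busy immediately lets the router fall back without waiting out its deadline on a queue.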

Verification Commands

# Node Worker health
curl -s http://localhost:8109/healthz | jq .

# Direct NATS LLM offload
nats request node.noda2.llm.request '{"job_id":"test","required_type":"llm","payload":{"prompt":"hi","max_tokens":10}}'

# Idempotency test (run twice with the same idempotency_key)
nats request node.noda2.llm.request '{"job_id":"test","idempotency_key":"test","required_type":"llm","payload":{"prompt":"hi"}}' # → cached=true on the repeat

# Deadline expired test
nats request node.noda2.llm.request '{"job_id":"expired","required_type":"llm","deadline_ts":1000,"payload":{"prompt":"hi"}}' # → DEADLINE_EXCEEDED

# Router logs (selection + offload)
docker logs dagi-router-node2 2>&1 | grep -E '\[select\]|\[offload\]|\[fallback\]'

Adding New Nodes

  1. Deploy node-capabilities + node-worker with unique NODE_ID
  2. Connect NATS (leafnode or cluster member)
  3. Router auto-discovers via node.*.capabilities.get scatter-gather
  4. No config changes on Router needed
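
Scatter-gather discovery amounts to publishing one request and collecting whatever replies arrive within a fixed window. The sketch below shows only the collection step and uses an `asyncio.Queue` as a stand-in for the NATS reply inbox, so it runs without a server. The function name, the window length, and the presence of a `node_id` field in each capabilities reply are assumptions.

```python
import asyncio
import json
from typing import Any, Dict


async def gather_replies(inbox: "asyncio.Queue[bytes]",
                         window_s: float) -> Dict[str, Dict[str, Any]]:
    """Collect capability replies until the window closes, keyed by node_id."""
    loop = asyncio.get_running_loop()
    closes_at = loop.time() + window_s
    nodes: Dict[str, Dict[str, Any]] = {}
    while True:
        left = closes_at - loop.time()
        if left <= 0:
            break
        try:
            raw = await asyncio.wait_for(inbox.get(), timeout=left)
        except asyncio.TimeoutError:
            break  # window closed with no further replies
        caps = json.loads(raw)
        nodes[caps["node_id"]] = caps  # a later reply from the same node wins
    return nodes
```

A node that joins NATS and answers on its capabilities subject simply shows up in the next gather, which is why the router needs no per-node configuration.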