🚀 NATS JetStream: K8s deployment + streams + job schema v1

- K8s deployment (2 replicas, PVC, initContainer для server_name)
- Streams definitions (MM_ONLINE, MM_OFFLINE, MM_WRITE, MM_EVENTS)
- Job payload schema (JSON v1 з idempotency)
- Worker contract (capabilities + ack/retry)
- Init streams script
- Оновлено ARCHITECTURE-150-NODES.md (Control-plane vs Data-plane)

TODO: Auth (nkeys), 3+ replicas для prod, worker-daemon implementation
This commit is contained in:
Apple
2026-01-10 10:02:25 -08:00
parent 3478dfce5f
commit 8fe0b58978
7 changed files with 1046 additions and 35 deletions

View File

@@ -0,0 +1,212 @@
---
# NATS JetStream Deployment
# Data-plane для 150 нод: fan-out/fan-in, online/offline пріоритети
apiVersion: v1
kind: Namespace
metadata:
name: nats
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: nats
namespace: nats
---
apiVersion: v1
kind: ConfigMap
metadata:
name: nats-config
namespace: nats
data:
nats.conf: |
# NATS JetStream Configuration
port: 4222
http_port: 8222
cluster {
port: 6222
routes = [
nats://nats-0.nats:6222
nats://nats-1.nats:6222
nats://nats-2.nats:6222
]
}
server_name: POD_NAME_PLACEHOLDER
cluster {
name: nats-cluster
port: 6222
routes = [
nats://nats-0.nats:6222
nats://nats-1.nats:6222
]
}
jetstream {
store_dir: /data/jetstream
max_mem_store: 2G
max_file_store: 50G
}
# TODO: Auth: nkeys (operator + system account)
# operator: /etc/nats/nats-operator.jwt
# system_account: SYSTEM
# resolver: MEMORY
# Для dev: auth вимкнено. Для prod: обов'язково увімкнути!
---
apiVersion: v1
kind: Secret
metadata:
name: nats-operator
namespace: nats
type: Opaque
data:
# TODO: Generate operator JWT and system account
# nats-operator.jwt: <base64>
# system-account.jwt: <base64>
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: nats
namespace: nats
labels:
app: nats
spec:
serviceName: nats
replicas: 2 # Для поточного кластера (2 ноди). Збільшити до 3+ для prod
selector:
matchLabels:
app: nats
template:
metadata:
labels:
app: nats
spec:
serviceAccountName: nats
initContainers:
- name: config-init
image: busybox:latest
command: ['sh', '-c']
args:
- |
sed "s/POD_NAME_PLACEHOLDER/${POD_NAME}/g" /etc/nats/nats.conf.template > /etc/nats/nats.conf
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
volumeMounts:
- name: config
mountPath: /etc/nats
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- nats
topologyKey: kubernetes.io/hostname
containers:
- name: nats
image: nats:2.10-alpine
ports:
- containerPort: 4222
name: client
- containerPort: 6222
name: cluster
- containerPort: 8222
name: monitor
args:
- -c
- /etc/nats/nats.conf
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
volumeMounts:
- name: config
mountPath: /etc/nats
- name: data
mountPath: /data
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
livenessProbe:
httpGet:
path: /healthz
port: 8222
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
httpGet:
path: /healthz
port: 8222
initialDelaySeconds: 5
periodSeconds: 5
volumes:
- name: config
configMap:
name: nats-config
items:
- key: nats.conf
path: nats.conf.template
# TODO: Додати Secret для operator JWT
# - name: operator-jwt
# secret:
# secretName: nats-operator
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: local-path
resources:
requests:
storage: 100Gi
---
apiVersion: v1
kind: Service
metadata:
name: nats
namespace: nats
spec:
type: ClusterIP
clusterIP: None
selector:
app: nats
ports:
- name: client
port: 4222
targetPort: 4222
- name: cluster
port: 6222
targetPort: 6222
- name: monitor
port: 8222
targetPort: 8222
---
apiVersion: v1
kind: Service
metadata:
name: nats-client
namespace: nats
spec:
type: ClusterIP
selector:
app: nats
ports:
- name: client
port: 4222
targetPort: 4222

View File

@@ -0,0 +1,100 @@
---
# NATS JetStream Streams Configuration
# Memory Module v1: 4 streams для різних класів даних та SLO
apiVersion: v1
kind: ConfigMap
metadata:
name: nats-streams-config
namespace: nats
data:
streams.json: |
{
"streams": [
{
"name": "MM_ONLINE",
"subjects": [
"mm.embed.online",
"mm.retrieve.online",
"mm.summarize.online"
],
"retention": "limits",
"max_age": 1800000000000,
"max_deliver": 3,
"ack_policy": "explicit",
"storage": "file",
"replicas": 3,
"discard": "old",
"duplicate_window": 300000000000
},
{
"name": "MM_OFFLINE",
"subjects": [
"mm.embed.offline",
"mm.index.offline",
"mm.backfill.offline"
],
"retention": "limits",
"max_age": 2592000000000000,
"max_deliver": 10,
"ack_policy": "explicit",
"storage": "file",
"replicas": 3,
"discard": "old"
},
{
"name": "MM_WRITE",
"subjects": [
"mm.qdrant.upsert",
"mm.pg.write",
"mm.neo4j.write"
],
"retention": "limits",
"max_age": 604800000000000,
"max_deliver": 10,
"ack_policy": "explicit",
"storage": "file",
"replicas": 3,
"discard": "old"
},
{
"name": "MM_EVENTS",
"subjects": [
"mm.event.audit",
"mm.event.status"
],
"retention": "limits",
"max_age": 2592000000000000,
"ack_policy": "explicit",
"storage": "file",
"replicas": 3,
"discard": "old"
}
],
"consumers": [
{
"stream_name": "MM_ONLINE",
"name": "online-worker-tier-a",
"durable_name": "online-worker-tier-a",
"ack_policy": "explicit",
"ack_wait": 30000000000,
"max_ack_pending": 5000,
"filter_subject": "mm.embed.online",
"deliver_policy": "all",
"max_deliver": 3,
"backoff": [1000000000, 5000000000, 30000000000]
},
{
"stream_name": "MM_OFFLINE",
"name": "offline-worker-tier-b",
"durable_name": "offline-worker-tier-b",
"ack_policy": "explicit",
"ack_wait": 300000000000,
"max_ack_pending": 10000,
"filter_subject": "mm.embed.offline",
"deliver_policy": "all",
"max_deliver": 10,
"backoff": [10000000000, 60000000000, 300000000000, 1800000000000]
}
]
}

View File

@@ -0,0 +1,95 @@
#!/bin/bash
# Ініціалізація NATS JetStream streams для Memory Module
# Використання: ./init-streams.sh <nats-url>
set -e
NATS_URL=${1:-"nats://nats-client.nats:4222"}
echo "🚀 Ініціалізація NATS JetStream streams..."
echo "NATS URL: $NATS_URL"
# Перевірка доступності NATS
if ! nats --server="$NATS_URL" server check jetstream 2>/dev/null; then
echo "❌ NATS JetStream не доступний. Перевірте deployment."
exit 1
fi
echo ""
echo "=== Створення Stream MM_ONLINE ==="
nats --server="$NATS_URL" stream add MM_ONLINE \
--subjects="mm.embed.online,mm.retrieve.online,mm.summarize.online" \
--storage=file \
--replicas=3 \
--max-age=30m \
--max-deliver=3 \
--ack \
--discard=old \
--duplicate-window=5m \
--retention=limits
echo ""
echo "=== Створення Stream MM_OFFLINE ==="
nats --server="$NATS_URL" stream add MM_OFFLINE \
--subjects="mm.embed.offline,mm.index.offline,mm.backfill.offline" \
--storage=file \
--replicas=3 \
--max-age=7d \
--max-deliver=10 \
--ack \
--discard=old \
--retention=limits
echo ""
echo "=== Створення Stream MM_WRITE ==="
nats --server="$NATS_URL" stream add MM_WRITE \
--subjects="mm.qdrant.upsert,mm.pg.write,mm.neo4j.write" \
--storage=file \
--replicas=3 \
--max-age=7d \
--max-deliver=10 \
--ack \
--discard=old \
--retention=limits
echo ""
echo "=== Створення Stream MM_EVENTS ==="
nats --server="$NATS_URL" stream add MM_EVENTS \
--subjects="mm.event.audit,mm.event.status" \
--storage=file \
--replicas=3 \
--max-age=30d \
--ack \
--discard=old \
--retention=limits
echo ""
echo "=== Створення Consumer online-worker-tier-a ==="
nats --server="$NATS_URL" consumer add MM_ONLINE online-worker-tier-a \
--durable=online-worker-tier-a \
--ack=explicit \
--ack-wait=30s \
--max-ack-pending=5000 \
--filter="mm.embed.online" \
--deliver=all \
--max-deliver=3 \
--backoff=1s,5s,30s
echo ""
echo "=== Створення Consumer offline-worker-tier-b ==="
nats --server="$NATS_URL" consumer add MM_OFFLINE offline-worker-tier-b \
--durable=offline-worker-tier-b \
--ack=explicit \
--ack-wait=5m \
--max-ack-pending=10000 \
--filter="mm.embed.offline" \
--deliver=all \
--max-deliver=10 \
--backoff=10s,1m,5m,30m
echo ""
echo "=== Перевірка streams ==="
nats --server="$NATS_URL" stream ls
echo ""
echo "✅ Streams створено успішно!"

View File

@@ -0,0 +1,203 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Memory Module Job Payload v1",
"description": "Стандартний формат job для NATS JetStream",
"type": "object",
"required": ["job_id", "idempotency_key", "type", "priority", "tenant", "timestamps"],
"properties": {
"job_id": {
"type": "string",
"format": "ulid",
"description": "Унікальний ID job (ULID)"
},
"idempotency_key": {
"type": "string",
"pattern": "^sha256:",
"description": "Idempotency key для safe retries (SHA256 hash)"
},
"type": {
"type": "string",
"enum": [
"embed",
"retrieve",
"summarize",
"qdrant_upsert",
"pg_write",
"neo4j_write",
"index"
],
"description": "Тип job"
},
"priority": {
"type": "string",
"enum": ["online", "offline"],
"description": "Пріоритет: online (low latency) або offline (batch)"
},
"tenant": {
"type": "object",
"required": ["org_id"],
"properties": {
"org_id": {
"type": "string",
"format": "uuid"
},
"workspace_id": {
"type": "string",
"format": "uuid"
}
}
},
"scope": {
"type": "object",
"properties": {
"user_id": {
"type": "string",
"format": "uuid"
},
"agent_id": {
"type": "string",
"format": "uuid"
},
"thread_id": {
"type": "string",
"format": "uuid"
}
}
},
"requirements": {
"type": "object",
"properties": {
"needs_gpu": {
"type": "boolean",
"default": false
},
"min_vram_gb": {
"type": "integer",
"minimum": 0
},
"max_latency_ms": {
"type": "integer",
"minimum": 0
},
"trust_zone": {
"type": "string",
"enum": ["A", "B", "C"]
},
"tier": {
"type": "string",
"enum": ["A", "B", "C"]
}
}
},
"input": {
"type": "object",
"properties": {
"text": {
"type": "array",
"items": {
"type": "string"
}
},
"input_type": {
"type": "string",
"enum": ["search_query", "search_document", "classification", "clustering"]
},
"model": {
"type": "string",
"enum": [
"cohere/embed-multilingual-v3.0",
"bge-m3",
"custom"
]
},
"dims": {
"type": "integer",
"enum": [1024, 768, 512]
}
}
},
"refs": {
"type": "object",
"properties": {
"event_id": {
"type": "string",
"format": "uuid"
},
"memory_id": {
"type": "string",
"format": "uuid"
},
"artifact_uri": {
"type": "string",
"format": "uri"
}
}
},
"trace": {
"type": "object",
"properties": {
"trace_id": {
"type": "string",
"format": "uuid"
},
"parent_span_id": {
"type": "string"
}
}
},
"timestamps": {
"type": "object",
"required": ["created_at"],
"properties": {
"created_at": {
"type": "string",
"format": "date-time"
},
"deadline": {
"type": "string",
"format": "date-time"
}
}
}
},
"examples": [
{
"job_id": "01J8X9Y2Z3A4B5C6D7E8F9G0H1",
"idempotency_key": "sha256:abc123...",
"type": "embed",
"priority": "online",
"tenant": {
"org_id": "550e8400-e29b-41d4-a716-446655440000",
"workspace_id": "660e8400-e29b-41d4-a716-446655440000"
},
"scope": {
"user_id": "770e8400-e29b-41d4-a716-446655440000",
"agent_id": "880e8400-e29b-41d4-a716-446655440000",
"thread_id": "990e8400-e29b-41d4-a716-446655440000"
},
"requirements": {
"needs_gpu": true,
"min_vram_gb": 8,
"max_latency_ms": 300,
"trust_zone": "A",
"tier": "A"
},
"input": {
"text": ["Привіт, це тестовий текст для embedding"],
"input_type": "search_document",
"model": "cohere/embed-multilingual-v3.0",
"dims": 1024
},
"refs": {
"event_id": "aa0e8400-e29b-41d4-a716-446655440000"
},
"trace": {
"trace_id": "bb0e8400-e29b-41d4-a716-446655440000"
},
"timestamps": {
"created_at": "2026-01-10T19:30:00Z",
"deadline": "2026-01-10T19:35:00Z"
}
}
]
}

View File

@@ -0,0 +1,164 @@
# Worker Contract v1 — Memory Module
**Дата:** 2026-01-10
**Версія:** 1.0.0
---
## 📋 Capability Registry
Кожен воркер реєструється з наступними capabilities:
```json
{
"node_id": "node-123",
"tier": "A|B|C",
"region": "eu-west",
"trust_zone": "internal",
"hardware": {
"cpu_cores": 32,
"ram_gb": 128,
"gpu": true,
"gpu_model": "RTX 3090",
"vram_gb": 24,
"cuda_version": "13.0"
},
"capabilities": {
"max_batch": 1000,
"max_tokens": 8192,
"models": ["embed-multilingual-v3.0", "llama3:8b"],
"embedding_dim": 1024,
"supported_jobs": ["embed", "summarize", "index"]
},
"status": "ready|busy|maintenance",
"last_heartbeat": "2026-01-10T19:30:00Z"
}
```
### Реєстрація
- **Postgres** (таблиця `worker_capabilities`) або **Consul** (KV store)
- Heartbeat кожні 30 секунд
- Якщо heartbeat пропущено > 90 секунд → worker вважається offline
---
## 🔄 Job Execution Flow
### 1. Pull Model
Воркер сам вирішує "чи брати job":
- Підписка на consumer (durable)
- Перевірка `requirements` vs власні `capabilities`
- Якщо підходить → бере job, інакше пропускає
### 2. Execution
```python
async def execute_job(job: JobPayload):
# 1. Перевірка requirements
if not can_fulfill(job.requirements, my_capabilities):
await msg.nak() # Negative ack, не підходить
return
# 2. Виконання
try:
result = await process_job(job)
# 3. Запис результату
await write_result(job, result)
# 4. ACK
await msg.ack()
except RetryableError as e:
await msg.nak(delay=e.backoff)
except FatalError as e:
await msg.ack() # ACK щоб не retry
await log_error(job, e)
```
### 3. ACK/NAK Policy
- **ACK** — job виконано успішно
- **NAK** — job не виконано, потрібен retry
- **NAK(delay)** — retry з затримкою
- **Timeout** — якщо `ack_wait` вийшов → автоматичний retry
---
## 📊 Consumer Policies
### Online Consumer (Tier A)
```yaml
stream: MM_ONLINE
consumer: online-worker-tier-a
ack_wait: 30s
max_ack_pending: 5000
concurrency: 50-200 (per worker)
filter: tier=A AND needs_gpu=true (для GPU jobs)
```
**Worker Selection:**
- Тільки `tier=A`
- `needs_gpu=true` → тільки GPU воркери
- `max_latency_ms` перевірка
### Offline Consumer (Tier B)
```yaml
stream: MM_OFFLINE
consumer: offline-worker-tier-b
ack_wait: 5-30 min
max_ack_pending: 10000
concurrency: висока, але з backpressure
```
**Backpressure:**
- Якщо online backlog росте → обмежити offline concurrency
- Preemption: offline jobs можуть бути призупинені для online
---
## 🔍 Idempotency
**Обов'язково:** кожен job має `idempotency_key` (SHA256 hash).
**Перевірка:**
- Перед виконанням перевірити в Postgres: чи вже виконано?
- Якщо так → повернути кешований результат
- Якщо ні → виконати та зберегти результат
**TTL:** idempotency keys зберігаються 7 днів.
---
## 📈 Metrics & Alerts
### Worker Metrics
- `worker_jobs_processed_total{type, status}`
- `worker_job_duration_seconds{type, quantile}`
- `worker_gpu_utilization{node_id}`
- `worker_vram_usage_bytes{node_id}`
- `worker_errors_total{type, error_type}`
### NATS Metrics
- `nats_consumer_lag{stream, consumer}`
- `nats_redeliveries_total{stream}`
- `nats_ack_pending{stream, consumer}`
- `nats_stream_storage_bytes{stream}`
### Alerts → Matrix Ops Room
- `MM_ONLINE backlog > 1000` (SLO порушення)
- `redeliveries spike > 100/min`
- `embed p95 > 500ms` (target: 300ms)
- `disk > 80%` на JetStream PVC
- `worker offline > 2 min` (Tier A)
---
*Документ створено: 2026-01-10 19:30 CET*

View File

@@ -0,0 +1,123 @@
# Worker Daemon — Memory Module
**Дата:** 2026-01-10
**Версія:** 1.0.0
---
## 📋 Призначення
Worker daemon запускається на кожній ноді і:
1. Реєструє capabilities в capability registry
2. Підписується на NATS consumers
3. Виконує jobs з перевіркою requirements
4. Звітує метрики та статуси
---
## 🔧 Deployment
### K8s (NODE1, NODE3)
```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: memory-worker
namespace: daarion
spec:
selector:
matchLabels:
app: memory-worker
template:
metadata:
labels:
app: memory-worker
spec:
containers:
- name: worker
image: memory-worker:latest
env:
- name: NODE_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: TIER
value: "A" # або B, C
- name: NATS_URL
value: "nats://nats-client.nats:4222"
- name: CAPABILITY_REGISTRY
value: "postgresql://..." # або Consul
```
### Docker (NODE2)
```bash
docker run -d \
--name memory-worker \
-e NODE_ID=node2-macbook-m4 \
-e TIER=C \
-e NATS_URL=nats://nats-client.nats:4222 \
memory-worker:latest
```
---
## 📊 Capability Registry Schema
### Postgres Table
```sql
CREATE TABLE worker_capabilities (
node_id VARCHAR(255) PRIMARY KEY,
tier VARCHAR(10) NOT NULL,
region VARCHAR(50),
trust_zone VARCHAR(50),
hardware JSONB NOT NULL,
capabilities JSONB NOT NULL,
status VARCHAR(20) NOT NULL,
last_heartbeat TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_worker_capabilities_tier ON worker_capabilities(tier);
CREATE INDEX idx_worker_capabilities_status ON worker_capabilities(status);
CREATE INDEX idx_worker_capabilities_heartbeat ON worker_capabilities(last_heartbeat);
```
---
## 🔄 Worker Flow
1. **Startup:**
- Реєстрація capabilities в registry
- Heartbeat (кожні 30s)
2. **Job Processing:**
- Підписка на consumer (durable)
- Перевірка `requirements` vs `capabilities`
- Виконання job
- ACK/NAK
3. **Metrics:**
- Експорт метрик в Prometheus format
- Відправка алертів в Matrix (якщо потрібно)
---
## 📁 Структура коду
```
worker-daemon/
├── main.py # Entry point
├── registry.py # Capability registry (Postgres/Consul)
├── nats_client.py # NATS consumer
├── job_executor.py # Job execution
├── metrics.py # Prometheus metrics
└── requirements.txt # Python dependencies
```
---
*Документ створено: 2026-01-10 19:30 CET*