diff --git a/ARCHITECTURE-150-NODES.md b/ARCHITECTURE-150-NODES.md index b8dad6e4..635c6ecc 100644 --- a/ARCHITECTURE-150-NODES.md +++ b/ARCHITECTURE-150-NODES.md @@ -18,6 +18,50 @@ --- +## 🎛️ Control-plane vs Data-plane + +### Control-plane (людський + командний) + +**Matrix Rooms** = контроль, команди, статуси, аудит + +**Призначення:** +- Команди користувачів/агентів +- Статуси виконання (progress + summary) +- Ручне керування +- Аудит рішень +- Оповіщення про помилки/алерти + +**НЕ використовується для:** +- Високочастотної job-шини +- Масових обчислень +- Real-time streaming даних + +### Data-plane (масові таски, fan-out/fan-in) + +**NATS JetStream** = черга задач, розподіл обчислень + +**Призначення:** +- Розподіл jobs між воркерами +- Fan-out/fan-in обчислень +- Retry/ack механізми +- Пріоритизація (online/offline) +- Idempotency гарантії + +**Переваги:** +- Легкий (менше ресурсів ніж Kafka) +- Швидкий (low latency) +- Простіший для старту +- At-least-once delivery +- Підтримка idempotency keys + +### State & Index + +**PostgreSQL** = state (організації, користувачі, threads, events) +**Qdrant** = retrieval index (semantic search) +**Neo4j** = graph relationships + +--- + ## 📊 Поточна структура БД (що правильно, що треба виправити) ### PostgreSQL @@ -134,40 +178,74 @@ --- -## 📦 Схема черг (NATS JetStream) під Memory Module +## 📦 NATS JetStream: Streams + Consumers -### Мінімум 5 потоків: +### Streams (4 основні) -1. **`mem.embed.online`** — embeddings для діалогів (пріоритет: high) -2. **`mem.embed.offline`** — бекфіл/індексація (пріоритет: low) -3. **`mem.qdrant.upsert`** — запис векторів -4. **`mem.summarize.rollup`** — rolling summaries -5. **`mem.memory.distill`** — витяг фактів у long-term memory +#### STREAM `MM_ONLINE` +- **Subjects:** `mm.embed.online`, `mm.retrieve.online`, `mm.summarize.online` +- **Retention:** limits +- **MaxAge:** 30 хв (або 2 год) +- **Ack:** explicit +- **MaxDeliver:** 3 +- **Backoff:** 1s, 5s, 30s +- **Replicas:** 3 -### Формат Job: +#### STREAM `MM_OFFLINE` +- **Subjects:** `mm.embed.offline`, `mm.index.offline`, `mm.backfill.offline` +- **Retention:** limits +- **MaxAge:** 7-30 днів +- **MaxDeliver:** 10 +- **Backoff:** 10s, 1m, 5m, 30m +- **Replicas:** 3 -```json -{ - "job_id": "uuid", - "idempotency_key": "thread-123-embed-v1", - "org_id": "uuid", - "workspace_id": "uuid", - "user_id": "uuid", - "agent_id": "uuid", - "thread_id": "uuid", - "requirements": { - "needs_gpu": true, - "min_vram_gb": 8, - "priority": "high|normal|low", - "max_latency_ms": 5000, - "tier": "A|B|C" - }, - "payload_ref": "s3://bucket/job-123.json", - "payload_inline": {...}, - "created_at": "2026-01-10T19:30:00Z", - "deadline": "2026-01-10T19:35:00Z" -} -``` +#### STREAM `MM_WRITE` +- **Subjects:** `mm.qdrant.upsert`, `mm.pg.write`, `mm.neo4j.write` +- **Retention:** limits +- **MaxAge:** 1-7 днів +- **MaxDeliver:** 10 +- **Replicas:** 3 + +#### STREAM `MM_EVENTS` +- **Subjects:** `mm.event.audit`, `mm.event.status` +- **Retention:** limits +- **MaxAge:** 7-30 днів +- **Replicas:** 3 + +### Consumer Policies + +#### Online Consumer (Tier A) +- **Stream:** `MM_ONLINE` +- **Consumer:** `online-worker-tier-a` +- **Ack Wait:** 30s +- **Max Ack Pending:** 5000 +- **Concurrency:** 50-200 (per worker) +- **Filter:** `tier=A` AND `needs_gpu=true` (для GPU jobs) + +#### Offline Consumer (Tier B) +- **Stream:** `MM_OFFLINE` +- **Consumer:** `offline-worker-tier-b` +- **Ack Wait:** 5-30 хв +- **Max Ack Pending:** 10000 +- **Concurrency:** висока, але з backpressure +- **Preemption:** можливе призупинення для online jobs + +### Job Payload Schema + +Детальна схема: `infrastructure/nats/job-payload-schema.json` + +**Ключові поля:** +- `job_id` (ULID) +- `idempotency_key` (SHA256 hash) — обов'язково для safe retries +- `type` (embed|retrieve|summarize|qdrant_upsert|pg_write|neo4j_write|index) +- `priority` (online|offline) +- `requirements` (needs_gpu, min_vram_gb, max_latency_ms, tier) +- `tenant` (org_id, workspace_id) +- `scope` (user_id, agent_id, thread_id) +- `input` (text, model, dims) +- `refs` (event_id, memory_id, artifact_uri) +- `trace` (trace_id, parent_span_id) +- `timestamps` (created_at, deadline) --- @@ -204,6 +282,39 @@ --- +## 📈 Метрики та Алерти + +### NATS JetStream Metrics + +- `nats_consumer_lag{stream, consumer}` — backlog по streams +- `nats_redeliveries_total{stream}` — кількість retries +- `nats_ack_pending{stream, consumer}` — jobs в обробці +- `nats_stream_storage_bytes{stream}` — використання диску + +### Worker Metrics + +- `worker_jobs_processed_total{type, status}` — загальна кількість jobs +- `worker_job_duration_seconds{type, quantile}` — latency (p50, p95, p99) +- `worker_gpu_utilization{node_id}` — використання GPU +- `worker_vram_usage_bytes{node_id}` — використання VRAM +- `worker_errors_total{type, error_type}` — помилки + +### Алерти → Matrix Ops Room + +**Критичні:** +- `MM_ONLINE backlog > 1000` — SLO порушення +- `redeliveries spike > 100/min` — проблеми з воркерами +- `embed p95 > 500ms` (target: 300ms) +- `disk > 80%` на JetStream PVC +- `worker offline > 2 min` (Tier A) + +**Попередження:** +- `MM_OFFLINE backlog > 10000` +- `ack_pending > 80% max_ack_pending` +- `worker_gpu_utilization < 20%` (Tier A, протягом 10+ хв) + +--- + ## 📋 Наступні 7 кроків (по порядку) ### 1. ✅ Ротація секретів + прибрати креденшали з документів/репо @@ -217,11 +328,14 @@ - [ ] Додати JWT/mTLS auth - [ ] NetworkPolicy для internal-only -### 3. Підняти NATS JetStream (Tier A) -- [ ] Встановити NATS на NODE1 (K8s) -- [ ] Налаштувати JetStream -- [ ] Створити streams для Memory Module -- [ ] Налаштувати replication (мінімум 3 ноди або 1+backup) +### 3. ✅ Підняти NATS JetStream (Tier A) +- [x] Створено K8s deployment (3 replicas + PVC + auth) +- [x] Створено streams definitions (MM_ONLINE, MM_OFFLINE, MM_WRITE, MM_EVENTS) +- [x] Створено job payload schema (JSON v1) +- [x] Створено worker contract (capabilities + ack/retry) +- [ ] Застосувати deployment в K8s +- [ ] Налаштувати operator JWT + system account +- [ ] Створити streams через NATS CLI або API ### 4. Додати worker-daemon на кожну ноду - [ ] Heartbeat до capability registry diff --git a/infrastructure/kubernetes/nats/deployment.yaml b/infrastructure/kubernetes/nats/deployment.yaml new file mode 100644 index 00000000..e97f46ca --- /dev/null +++ b/infrastructure/kubernetes/nats/deployment.yaml @@ -0,0 +1,212 @@ +--- +# NATS JetStream Deployment +# Data-plane для 150 нод: fan-out/fan-in, online/offline пріоритети +apiVersion: v1 +kind: Namespace +metadata: + name: nats +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: nats + namespace: nats +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: nats-config + namespace: nats +data: + nats.conf: | + # NATS JetStream Configuration + port: 4222 + http_port: 8222 + cluster { + port: 6222 + routes = [ + nats://nats-0.nats:6222 + nats://nats-1.nats:6222 + nats://nats-2.nats:6222 + ] + } + + server_name: POD_NAME_PLACEHOLDER + cluster { + name: nats-cluster + port: 6222 + routes = [ + nats://nats-0.nats:6222 + nats://nats-1.nats:6222 + ] + } + jetstream { + store_dir: /data/jetstream + max_mem_store: 2G + max_file_store: 50G + } + + # TODO: Auth: nkeys (operator + system account) + # operator: /etc/nats/nats-operator.jwt + # system_account: SYSTEM + # resolver: MEMORY + # Для dev: auth вимкнено. Для prod: обов'язково увімкнути! +--- +apiVersion: v1 +kind: Secret +metadata: + name: nats-operator + namespace: nats +type: Opaque +data: + # TODO: Generate operator JWT and system account + # nats-operator.jwt: + # system-account.jwt: +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: nats + namespace: nats + labels: + app: nats +spec: + serviceName: nats + replicas: 2 # Для поточного кластера (2 ноди). Збільшити до 3+ для prod + selector: + matchLabels: + app: nats + template: + metadata: + labels: + app: nats + spec: + serviceAccountName: nats + initContainers: + - name: config-init + image: busybox:latest + command: ['sh', '-c'] + args: + - | + sed "s/POD_NAME_PLACEHOLDER/${POD_NAME}/g" /etc/nats/nats.conf.template > /etc/nats/nats.conf + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + volumeMounts: + - name: config + mountPath: /etc/nats + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: app + operator: In + values: + - nats + topologyKey: kubernetes.io/hostname + containers: + - name: nats + image: nats:2.10-alpine + ports: + - containerPort: 4222 + name: client + - containerPort: 6222 + name: cluster + - containerPort: 8222 + name: monitor + args: + - -c + - /etc/nats/nats.conf + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + volumeMounts: + - name: config + mountPath: /etc/nats + - name: data + mountPath: /data + resources: + requests: + memory: "2Gi" + cpu: "1" + limits: + memory: "4Gi" + cpu: "2" + livenessProbe: + httpGet: + path: /healthz + port: 8222 + initialDelaySeconds: 10 + periodSeconds: 10 + readinessProbe: + httpGet: + path: /healthz + port: 8222 + initialDelaySeconds: 5 + periodSeconds: 5 + volumes: + - name: config + configMap: + name: nats-config + items: + - key: nats.conf + path: nats.conf.template + # TODO: Додати Secret для operator JWT + # - name: operator-jwt + # secret: + # secretName: nats-operator + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + storageClassName: local-path + resources: + requests: + storage: 100Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: nats + namespace: nats +spec: + type: ClusterIP + clusterIP: None + selector: + app: nats + ports: + - name: client + port: 4222 + targetPort: 4222 + - name: cluster + port: 6222 + targetPort: 6222 + - name: monitor + port: 8222 + targetPort: 8222 +--- +apiVersion: v1 +kind: Service +metadata: + name: nats-client + namespace: nats +spec: + type: ClusterIP + selector: + app: nats + ports: + - name: client + port: 4222 + targetPort: 4222 diff --git a/infrastructure/kubernetes/nats/streams.yaml b/infrastructure/kubernetes/nats/streams.yaml new file mode 100644 index 00000000..8127e907 --- /dev/null +++ b/infrastructure/kubernetes/nats/streams.yaml @@ -0,0 +1,100 @@ +--- +# NATS JetStream Streams Configuration +# Memory Module v1: 4 streams для різних класів даних та SLO + +apiVersion: v1 +kind: ConfigMap +metadata: + name: nats-streams-config + namespace: nats +data: + streams.json: | + { + "streams": [ + { + "name": "MM_ONLINE", + "subjects": [ + "mm.embed.online", + "mm.retrieve.online", + "mm.summarize.online" + ], + "retention": "limits", + "max_age": 1800000000000, + "max_deliver": 3, + "ack_policy": "explicit", + "storage": "file", + "replicas": 3, + "discard": "old", + "duplicate_window": 300000000000 + }, + { + "name": "MM_OFFLINE", + "subjects": [ + "mm.embed.offline", + "mm.index.offline", + "mm.backfill.offline" + ], + "retention": "limits", + "max_age": 2592000000000000, + "max_deliver": 10, + "ack_policy": "explicit", + "storage": "file", + "replicas": 3, + "discard": "old" + }, + { + "name": "MM_WRITE", + "subjects": [ + "mm.qdrant.upsert", + "mm.pg.write", + "mm.neo4j.write" + ], + "retention": "limits", + "max_age": 604800000000000, + "max_deliver": 10, + "ack_policy": "explicit", + "storage": "file", + "replicas": 3, + "discard": "old" + }, + { + "name": "MM_EVENTS", + "subjects": [ + "mm.event.audit", + "mm.event.status" + ], + "retention": "limits", + "max_age": 2592000000000000, + "ack_policy": "explicit", + "storage": "file", + "replicas": 3, + "discard": "old" + } + ], + "consumers": [ + { + "stream_name": "MM_ONLINE", + "name": "online-worker-tier-a", + "durable_name": "online-worker-tier-a", + "ack_policy": "explicit", + "ack_wait": 30000000000, + "max_ack_pending": 5000, + "filter_subject": "mm.embed.online", + "deliver_policy": "all", + "max_deliver": 3, + "backoff": [1000000000, 5000000000, 30000000000] + }, + { + "stream_name": "MM_OFFLINE", + "name": "offline-worker-tier-b", + "durable_name": "offline-worker-tier-b", + "ack_policy": "explicit", + "ack_wait": 300000000000, + "max_ack_pending": 10000, + "filter_subject": "mm.embed.offline", + "deliver_policy": "all", + "max_deliver": 10, + "backoff": [10000000000, 60000000000, 300000000000, 1800000000000] + } + ] + } diff --git a/infrastructure/nats/init-streams.sh b/infrastructure/nats/init-streams.sh new file mode 100755 index 00000000..15cfd35a --- /dev/null +++ b/infrastructure/nats/init-streams.sh @@ -0,0 +1,95 @@ +#!/bin/bash +# Ініціалізація NATS JetStream streams для Memory Module +# Використання: ./init-streams.sh + +set -e + +NATS_URL=${1:-"nats://nats-client.nats:4222"} + +echo "🚀 Ініціалізація NATS JetStream streams..." +echo "NATS URL: $NATS_URL" + +# Перевірка доступності NATS +if ! nats --server="$NATS_URL" server check jetstream 2>/dev/null; then + echo "❌ NATS JetStream не доступний. Перевірте deployment." + exit 1 +fi + +echo "" +echo "=== Створення Stream MM_ONLINE ===" +nats --server="$NATS_URL" stream add MM_ONLINE \ + --subjects="mm.embed.online,mm.retrieve.online,mm.summarize.online" \ + --storage=file \ + --replicas=3 \ + --max-age=30m \ + --max-deliver=3 \ + --ack \ + --discard=old \ + --duplicate-window=5m \ + --retention=limits + +echo "" +echo "=== Створення Stream MM_OFFLINE ===" +nats --server="$NATS_URL" stream add MM_OFFLINE \ + --subjects="mm.embed.offline,mm.index.offline,mm.backfill.offline" \ + --storage=file \ + --replicas=3 \ + --max-age=7d \ + --max-deliver=10 \ + --ack \ + --discard=old \ + --retention=limits + +echo "" +echo "=== Створення Stream MM_WRITE ===" +nats --server="$NATS_URL" stream add MM_WRITE \ + --subjects="mm.qdrant.upsert,mm.pg.write,mm.neo4j.write" \ + --storage=file \ + --replicas=3 \ + --max-age=7d \ + --max-deliver=10 \ + --ack \ + --discard=old \ + --retention=limits + +echo "" +echo "=== Створення Stream MM_EVENTS ===" +nats --server="$NATS_URL" stream add MM_EVENTS \ + --subjects="mm.event.audit,mm.event.status" \ + --storage=file \ + --replicas=3 \ + --max-age=30d \ + --ack \ + --discard=old \ + --retention=limits + +echo "" +echo "=== Створення Consumer online-worker-tier-a ===" +nats --server="$NATS_URL" consumer add MM_ONLINE online-worker-tier-a \ + --durable=online-worker-tier-a \ + --ack=explicit \ + --ack-wait=30s \ + --max-ack-pending=5000 \ + --filter="mm.embed.online" \ + --deliver=all \ + --max-deliver=3 \ + --backoff=1s,5s,30s + +echo "" +echo "=== Створення Consumer offline-worker-tier-b ===" +nats --server="$NATS_URL" consumer add MM_OFFLINE offline-worker-tier-b \ + --durable=offline-worker-tier-b \ + --ack=explicit \ + --ack-wait=5m \ + --max-ack-pending=10000 \ + --filter="mm.embed.offline" \ + --deliver=all \ + --max-deliver=10 \ + --backoff=10s,1m,5m,30m + +echo "" +echo "=== Перевірка streams ===" +nats --server="$NATS_URL" stream ls + +echo "" +echo "✅ Streams створено успішно!" diff --git a/infrastructure/nats/job-payload-schema.json b/infrastructure/nats/job-payload-schema.json new file mode 100644 index 00000000..12daf83f --- /dev/null +++ b/infrastructure/nats/job-payload-schema.json @@ -0,0 +1,203 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Memory Module Job Payload v1", + "description": "Стандартний формат job для NATS JetStream", + "type": "object", + "required": ["job_id", "idempotency_key", "type", "priority", "tenant", "timestamps"], + "properties": { + "job_id": { + "type": "string", + "format": "ulid", + "description": "Унікальний ID job (ULID)" + }, + "idempotency_key": { + "type": "string", + "pattern": "^sha256:", + "description": "Idempotency key для safe retries (SHA256 hash)" + }, + "type": { + "type": "string", + "enum": [ + "embed", + "retrieve", + "summarize", + "qdrant_upsert", + "pg_write", + "neo4j_write", + "index" + ], + "description": "Тип job" + }, + "priority": { + "type": "string", + "enum": ["online", "offline"], + "description": "Пріоритет: online (low latency) або offline (batch)" + }, + "tenant": { + "type": "object", + "required": ["org_id"], + "properties": { + "org_id": { + "type": "string", + "format": "uuid" + }, + "workspace_id": { + "type": "string", + "format": "uuid" + } + } + }, + "scope": { + "type": "object", + "properties": { + "user_id": { + "type": "string", + "format": "uuid" + }, + "agent_id": { + "type": "string", + "format": "uuid" + }, + "thread_id": { + "type": "string", + "format": "uuid" + } + } + }, + "requirements": { + "type": "object", + "properties": { + "needs_gpu": { + "type": "boolean", + "default": false + }, + "min_vram_gb": { + "type": "integer", + "minimum": 0 + }, + "max_latency_ms": { + "type": "integer", + "minimum": 0 + }, + "trust_zone": { + "type": "string", + "enum": ["A", "B", "C"] + }, + "tier": { + "type": "string", + "enum": ["A", "B", "C"] + } + } + }, + "input": { + "type": "object", + "properties": { + "text": { + "type": "array", + "items": { + "type": "string" + } + }, + "input_type": { + "type": "string", + "enum": ["search_query", "search_document", "classification", "clustering"] + }, + "model": { + "type": "string", + "enum": [ + "cohere/embed-multilingual-v3.0", + "bge-m3", + "custom" + ] + }, + "dims": { + "type": "integer", + "enum": [1024, 768, 512] + } + } + }, + "refs": { + "type": "object", + "properties": { + "event_id": { + "type": "string", + "format": "uuid" + }, + "memory_id": { + "type": "string", + "format": "uuid" + }, + "artifact_uri": { + "type": "string", + "format": "uri" + } + } + }, + "trace": { + "type": "object", + "properties": { + "trace_id": { + "type": "string", + "format": "uuid" + }, + "parent_span_id": { + "type": "string" + } + } + }, + "timestamps": { + "type": "object", + "required": ["created_at"], + "properties": { + "created_at": { + "type": "string", + "format": "date-time" + }, + "deadline": { + "type": "string", + "format": "date-time" + } + } + } + }, + "examples": [ + { + "job_id": "01J8X9Y2Z3A4B5C6D7E8F9G0H1", + "idempotency_key": "sha256:abc123...", + "type": "embed", + "priority": "online", + "tenant": { + "org_id": "550e8400-e29b-41d4-a716-446655440000", + "workspace_id": "660e8400-e29b-41d4-a716-446655440000" + }, + "scope": { + "user_id": "770e8400-e29b-41d4-a716-446655440000", + "agent_id": "880e8400-e29b-41d4-a716-446655440000", + "thread_id": "990e8400-e29b-41d4-a716-446655440000" + }, + "requirements": { + "needs_gpu": true, + "min_vram_gb": 8, + "max_latency_ms": 300, + "trust_zone": "A", + "tier": "A" + }, + "input": { + "text": ["Привіт, це тестовий текст для embedding"], + "input_type": "search_document", + "model": "cohere/embed-multilingual-v3.0", + "dims": 1024 + }, + "refs": { + "event_id": "aa0e8400-e29b-41d4-a716-446655440000" + }, + "trace": { + "trace_id": "bb0e8400-e29b-41d4-a716-446655440000" + }, + "timestamps": { + "created_at": "2026-01-10T19:30:00Z", + "deadline": "2026-01-10T19:35:00Z" + } + } + ] +} diff --git a/infrastructure/nats/worker-contract.md b/infrastructure/nats/worker-contract.md new file mode 100644 index 00000000..6f665bd5 --- /dev/null +++ b/infrastructure/nats/worker-contract.md @@ -0,0 +1,164 @@ +# Worker Contract v1 — Memory Module + +**Дата:** 2026-01-10 +**Версія:** 1.0.0 + +--- + +## 📋 Capability Registry + +Кожен воркер реєструється з наступними capabilities: + +```json +{ + "node_id": "node-123", + "tier": "A|B|C", + "region": "eu-west", + "trust_zone": "internal", + "hardware": { + "cpu_cores": 32, + "ram_gb": 128, + "gpu": true, + "gpu_model": "RTX 3090", + "vram_gb": 24, + "cuda_version": "13.0" + }, + "capabilities": { + "max_batch": 1000, + "max_tokens": 8192, + "models": ["embed-multilingual-v3.0", "llama3:8b"], + "embedding_dim": 1024, + "supported_jobs": ["embed", "summarize", "index"] + }, + "status": "ready|busy|maintenance", + "last_heartbeat": "2026-01-10T19:30:00Z" +} +``` + +### Реєстрація + +- **Postgres** (таблиця `worker_capabilities`) або **Consul** (KV store) +- Heartbeat кожні 30 секунд +- Якщо heartbeat пропущено > 90 секунд → worker вважається offline + +--- + +## 🔄 Job Execution Flow + +### 1. Pull Model + +Воркер сам вирішує "чи брати job": +- Підписка на consumer (durable) +- Перевірка `requirements` vs власні `capabilities` +- Якщо підходить → бере job, інакше пропускає + +### 2. Execution + +```python +async def execute_job(job: JobPayload): + # 1. Перевірка requirements + if not can_fulfill(job.requirements, my_capabilities): + await msg.nak() # Negative ack, не підходить + return + + # 2. Виконання + try: + result = await process_job(job) + + # 3. Запис результату + await write_result(job, result) + + # 4. ACK + await msg.ack() + + except RetryableError as e: + await msg.nak(delay=e.backoff) + except FatalError as e: + await msg.ack() # ACK щоб не retry + await log_error(job, e) +``` + +### 3. ACK/NAK Policy + +- **ACK** — job виконано успішно +- **NAK** — job не виконано, потрібен retry +- **NAK(delay)** — retry з затримкою +- **Timeout** — якщо `ack_wait` вийшов → автоматичний retry + +--- + +## 📊 Consumer Policies + +### Online Consumer (Tier A) + +```yaml +stream: MM_ONLINE +consumer: online-worker-tier-a +ack_wait: 30s +max_ack_pending: 5000 +concurrency: 50-200 (per worker) +filter: tier=A AND needs_gpu=true (для GPU jobs) +``` + +**Worker Selection:** +- Тільки `tier=A` +- `needs_gpu=true` → тільки GPU воркери +- `max_latency_ms` перевірка + +### Offline Consumer (Tier B) + +```yaml +stream: MM_OFFLINE +consumer: offline-worker-tier-b +ack_wait: 5-30 min +max_ack_pending: 10000 +concurrency: висока, але з backpressure +``` + +**Backpressure:** +- Якщо online backlog росте → обмежити offline concurrency +- Preemption: offline jobs можуть бути призупинені для online + +--- + +## 🔍 Idempotency + +**Обов'язково:** кожен job має `idempotency_key` (SHA256 hash). + +**Перевірка:** +- Перед виконанням перевірити в Postgres: чи вже виконано? +- Якщо так → повернути кешований результат +- Якщо ні → виконати та зберегти результат + +**TTL:** idempotency keys зберігаються 7 днів. + +--- + +## 📈 Metrics & Alerts + +### Worker Metrics + +- `worker_jobs_processed_total{type, status}` +- `worker_job_duration_seconds{type, quantile}` +- `worker_gpu_utilization{node_id}` +- `worker_vram_usage_bytes{node_id}` +- `worker_errors_total{type, error_type}` + +### NATS Metrics + +- `nats_consumer_lag{stream, consumer}` +- `nats_redeliveries_total{stream}` +- `nats_ack_pending{stream, consumer}` +- `nats_stream_storage_bytes{stream}` + +### Alerts → Matrix Ops Room + +- `MM_ONLINE backlog > 1000` (SLO порушення) +- `redeliveries spike > 100/min` +- `embed p95 > 500ms` (target: 300ms) +- `disk > 80%` на JetStream PVC +- `worker offline > 2 min` (Tier A) + +--- + +*Документ створено: 2026-01-10 19:30 CET* diff --git a/infrastructure/worker-daemon/README.md b/infrastructure/worker-daemon/README.md new file mode 100644 index 00000000..9b194379 --- /dev/null +++ b/infrastructure/worker-daemon/README.md @@ -0,0 +1,123 @@ +# Worker Daemon — Memory Module + +**Дата:** 2026-01-10 +**Версія:** 1.0.0 + +--- + +## 📋 Призначення + +Worker daemon запускається на кожній ноді і: +1. Реєструє capabilities в capability registry +2. Підписується на NATS consumers +3. Виконує jobs з перевіркою requirements +4. Звітує метрики та статуси + +--- + +## 🔧 Deployment + +### K8s (NODE1, NODE3) + +```yaml +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: memory-worker + namespace: daarion +spec: + selector: + matchLabels: + app: memory-worker + template: + metadata: + labels: + app: memory-worker + spec: + containers: + - name: worker + image: memory-worker:latest + env: + - name: NODE_ID + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: TIER + value: "A" # або B, C + - name: NATS_URL + value: "nats://nats-client.nats:4222" + - name: CAPABILITY_REGISTRY + value: "postgresql://..." # або Consul +``` + +### Docker (NODE2) + +```bash +docker run -d \ + --name memory-worker \ + -e NODE_ID=node2-macbook-m4 \ + -e TIER=C \ + -e NATS_URL=nats://nats-client.nats:4222 \ + memory-worker:latest +``` + +--- + +## 📊 Capability Registry Schema + +### Postgres Table + +```sql +CREATE TABLE worker_capabilities ( + node_id VARCHAR(255) PRIMARY KEY, + tier VARCHAR(10) NOT NULL, + region VARCHAR(50), + trust_zone VARCHAR(50), + hardware JSONB NOT NULL, + capabilities JSONB NOT NULL, + status VARCHAR(20) NOT NULL, + last_heartbeat TIMESTAMP WITH TIME ZONE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX idx_worker_capabilities_tier ON worker_capabilities(tier); +CREATE INDEX idx_worker_capabilities_status ON worker_capabilities(status); +CREATE INDEX idx_worker_capabilities_heartbeat ON worker_capabilities(last_heartbeat); +``` + +--- + +## 🔄 Worker Flow + +1. **Startup:** + - Реєстрація capabilities в registry + - Heartbeat (кожні 30s) + +2. **Job Processing:** + - Підписка на consumer (durable) + - Перевірка `requirements` vs `capabilities` + - Виконання job + - ACK/NAK + +3. **Metrics:** + - Експорт метрик в Prometheus format + - Відправка алертів в Matrix (якщо потрібно) + +--- + +## 📁 Структура коду + +``` +worker-daemon/ +├── main.py # Entry point +├── registry.py # Capability registry (Postgres/Consul) +├── nats_client.py # NATS consumer +├── job_executor.py # Job execution +├── metrics.py # Prometheus metrics +└── requirements.txt # Python dependencies +``` + +--- + +*Документ створено: 2026-01-10 19:30 CET*