🔧 Worker Daemon: базова реалізація v1
Some checks failed
Update Documentation / update-repos-info (push) Has been cancelled
Some checks failed
Update Documentation / update-repos-info (push) Has been cancelled
- Capability Registry (Postgres heartbeat) - NATS Client (підписка на streams) - Job Executor (виконання jobs) - Metrics Exporter (Prometheus) - Dockerfile для deployment - Виправлено server_name в NATS (emptyDir) TODO: Реальна реалізація embed/retrieve/summarize, Matrix Gateway, Auth
This commit is contained in:
76
infrastructure/nats/init-streams-kubectl.sh
Executable file
76
infrastructure/nats/init-streams-kubectl.sh
Executable file
@@ -0,0 +1,76 @@
|
||||
#!/bin/bash
|
||||
# Ініціалізація NATS JetStream streams через kubectl exec
|
||||
# Використання: ./init-streams-kubectl.sh
|
||||
|
||||
set -e
|
||||
|
||||
echo "🚀 Ініціалізація NATS JetStream streams через kubectl..."
|
||||
|
||||
# Встановлюємо NATS CLI в pod (якщо потрібно)
|
||||
echo "=== Встановлення NATS CLI в pod ==="
|
||||
kubectl exec -n nats nats-0 -- sh -c 'apk add --no-cache curl && curl -L https://github.com/nats-io/natscli/releases/latest/download/nats-linux-amd64.zip -o /tmp/nats.zip && unzip -o /tmp/nats.zip -d /tmp && chmod +x /tmp/nats' || echo "NATS CLI встановлення..."
|
||||
|
||||
NATS_URL="nats://nats-client.nats:4222"
|
||||
|
||||
echo ""
|
||||
echo "=== Створення Stream MM_ONLINE ==="
|
||||
kubectl exec -n nats nats-0 -- /tmp/nats stream add MM_ONLINE \
|
||||
--server="$NATS_URL" \
|
||||
--subjects="mm.embed.online,mm.retrieve.online,mm.summarize.online" \
|
||||
--storage=file \
|
||||
--replicas=2 \
|
||||
--max-age=30m \
|
||||
--max-deliver=3 \
|
||||
--ack \
|
||||
--discard=old \
|
||||
--duplicate-window=5m \
|
||||
--retention=limits \
|
||||
--yes || echo "Stream вже існує або помилка"
|
||||
|
||||
echo ""
|
||||
echo "=== Створення Stream MM_OFFLINE ==="
|
||||
kubectl exec -n nats nats-0 -- /tmp/nats stream add MM_OFFLINE \
|
||||
--server="$NATS_URL" \
|
||||
--subjects="mm.embed.offline,mm.index.offline,mm.backfill.offline" \
|
||||
--storage=file \
|
||||
--replicas=2 \
|
||||
--max-age=7d \
|
||||
--max-deliver=10 \
|
||||
--ack \
|
||||
--discard=old \
|
||||
--retention=limits \
|
||||
--yes || echo "Stream вже існує або помилка"
|
||||
|
||||
echo ""
|
||||
echo "=== Створення Stream MM_WRITE ==="
|
||||
kubectl exec -n nats nats-0 -- /tmp/nats stream add MM_WRITE \
|
||||
--server="$NATS_URL" \
|
||||
--subjects="mm.qdrant.upsert,mm.pg.write,mm.neo4j.write" \
|
||||
--storage=file \
|
||||
--replicas=2 \
|
||||
--max-age=7d \
|
||||
--max-deliver=10 \
|
||||
--ack \
|
||||
--discard=old \
|
||||
--retention=limits \
|
||||
--yes || echo "Stream вже існує або помилка"
|
||||
|
||||
echo ""
|
||||
echo "=== Створення Stream MM_EVENTS ==="
|
||||
kubectl exec -n nats nats-0 -- /tmp/nats stream add MM_EVENTS \
|
||||
--server="$NATS_URL" \
|
||||
--subjects="mm.event.audit,mm.event.status" \
|
||||
--storage=file \
|
||||
--replicas=2 \
|
||||
--max-age=30d \
|
||||
--ack \
|
||||
--discard=old \
|
||||
--retention=limits \
|
||||
--yes || echo "Stream вже існує або помилка"
|
||||
|
||||
echo ""
|
||||
echo "=== Перевірка streams ==="
|
||||
kubectl exec -n nats nats-0 -- /tmp/nats stream ls --server="$NATS_URL" || echo "Помилка перевірки"
|
||||
|
||||
echo ""
|
||||
echo "✅ Streams створено!"
|
||||
93
infrastructure/nats/init-streams.py
Executable file
93
infrastructure/nats/init-streams.py
Executable file
@@ -0,0 +1,93 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Ініціалізація NATS JetStream streams для Memory Module
|
||||
Використання: python3 init-streams.py <nats-url>
|
||||
"""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import requests
|
||||
from typing import Dict, Any
|
||||
|
||||
NATS_URL = sys.argv[1] if len(sys.argv) > 1 else "nats://nats-client.nats:4222"
|
||||
NATS_HTTP = NATS_URL.replace("nats://", "http://").replace(":4222", ":8222")
|
||||
|
||||
def create_stream(stream_config: Dict[str, Any]) -> bool:
|
||||
"""Створює stream через NATS HTTP API"""
|
||||
url = f"{NATS_HTTP}/jsz?streams=1"
|
||||
|
||||
# NATS HTTP API використовує інший формат
|
||||
# Спробуємо через JetStream API
|
||||
try:
|
||||
# Для створення stream потрібен NATS CLI або Go/Python клієнт
|
||||
# Тимчасово використовуємо curl через kubectl exec
|
||||
print(f"⚠️ Stream '{stream_config['name']}' потрібно створити через NATS CLI")
|
||||
print(f" Використайте: kubectl exec -n nats nats-0 -- nats stream add {stream_config['name']} ...")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"❌ Помилка створення stream: {e}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
print(f"🚀 Ініціалізація NATS JetStream streams...")
|
||||
print(f"NATS URL: {NATS_URL}")
|
||||
print(f"NATS HTTP: {NATS_HTTP}")
|
||||
|
||||
streams = [
|
||||
{
|
||||
"name": "MM_ONLINE",
|
||||
"subjects": ["mm.embed.online", "mm.retrieve.online", "mm.summarize.online"],
|
||||
"retention": "limits",
|
||||
"max_age": 1800000000000, # 30 хв
|
||||
"max_deliver": 3,
|
||||
"ack_policy": "explicit",
|
||||
"storage": "file",
|
||||
"replicas": 2,
|
||||
"discard": "old",
|
||||
"duplicate_window": 300000000000
|
||||
},
|
||||
{
|
||||
"name": "MM_OFFLINE",
|
||||
"subjects": ["mm.embed.offline", "mm.index.offline", "mm.backfill.offline"],
|
||||
"retention": "limits",
|
||||
"max_age": 604800000000000, # 7 днів
|
||||
"max_deliver": 10,
|
||||
"ack_policy": "explicit",
|
||||
"storage": "file",
|
||||
"replicas": 2,
|
||||
"discard": "old"
|
||||
},
|
||||
{
|
||||
"name": "MM_WRITE",
|
||||
"subjects": ["mm.qdrant.upsert", "mm.pg.write", "mm.neo4j.write"],
|
||||
"retention": "limits",
|
||||
"max_age": 604800000000000, # 7 днів
|
||||
"max_deliver": 10,
|
||||
"ack_policy": "explicit",
|
||||
"storage": "file",
|
||||
"replicas": 2,
|
||||
"discard": "old"
|
||||
},
|
||||
{
|
||||
"name": "MM_EVENTS",
|
||||
"subjects": ["mm.event.audit", "mm.event.status"],
|
||||
"retention": "limits",
|
||||
"max_age": 2592000000000000, # 30 днів
|
||||
"ack_policy": "explicit",
|
||||
"storage": "file",
|
||||
"replicas": 2,
|
||||
"discard": "old"
|
||||
}
|
||||
]
|
||||
|
||||
print("\n📋 Streams для створення:")
|
||||
for stream in streams:
|
||||
print(f" - {stream['name']}: {', '.join(stream['subjects'])}")
|
||||
|
||||
print("\n⚠️ Для створення streams потрібен NATS CLI.")
|
||||
print(" Використайте init-streams.sh або встановіть NATS CLI в контейнер.")
|
||||
|
||||
return 0
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
13
infrastructure/worker-daemon/Dockerfile
Normal file
13
infrastructure/worker-daemon/Dockerfile
Normal file
@@ -0,0 +1,13 @@
|
||||
FROM python:3.11-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Встановлення залежностей
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# Копіювання коду
|
||||
COPY worker/ ./worker/
|
||||
|
||||
# Entrypoint
|
||||
ENTRYPOINT ["python3", "-m", "worker.main"]
|
||||
7
infrastructure/worker-daemon/requirements.txt
Normal file
7
infrastructure/worker-daemon/requirements.txt
Normal file
@@ -0,0 +1,7 @@
|
||||
nats-py==2.7.0
|
||||
asyncpg==0.29.0
|
||||
psycopg2-binary==2.9.9
|
||||
pydantic==2.5.3
|
||||
python-dotenv==1.0.1
|
||||
prometheus-client==0.19.0
|
||||
aiohttp==3.9.1
|
||||
1
infrastructure/worker-daemon/worker/__init__.py
Normal file
1
infrastructure/worker-daemon/worker/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Worker Daemon package
|
||||
128
infrastructure/worker-daemon/worker/job_executor.py
Normal file
128
infrastructure/worker-daemon/worker/job_executor.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""
|
||||
Job Executor — виконання jobs з NATS
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class JobExecutor:
|
||||
def __init__(self, node_id: str, tier: str):
|
||||
self.node_id = node_id
|
||||
self.tier = tier
|
||||
|
||||
async def execute_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Виконання job"""
|
||||
job_id = job_data.get("job_id", "unknown")
|
||||
job_type = job_data.get("type", "unknown")
|
||||
priority = job_data.get("priority", "offline")
|
||||
|
||||
print(f"📦 Виконання job: {job_id} (type: {job_type}, priority: {priority})")
|
||||
|
||||
# Перевірка requirements
|
||||
if not self._can_fulfill(job_data.get("requirements", {})):
|
||||
return {
|
||||
"success": False,
|
||||
"reason": "requirements_not_met"
|
||||
}
|
||||
|
||||
# Виконання залежно від типу
|
||||
try:
|
||||
if job_type == "embed":
|
||||
result = await self._execute_embed(job_data)
|
||||
elif job_type == "retrieve":
|
||||
result = await self._execute_retrieve(job_data)
|
||||
elif job_type == "summarize":
|
||||
result = await self._execute_summarize(job_data)
|
||||
elif job_type == "qdrant_upsert":
|
||||
result = await self._execute_qdrant_upsert(job_data)
|
||||
elif job_type == "pg_write":
|
||||
result = await self._execute_pg_write(job_data)
|
||||
elif job_type == "neo4j_write":
|
||||
result = await self._execute_neo4j_write(job_data)
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"reason": f"unknown_job_type: {job_type}"
|
||||
}
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"result": result
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Помилка виконання job {job_id}: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"reason": str(e),
|
||||
"backoff": 30 # Retry через 30 секунд
|
||||
}
|
||||
|
||||
def _can_fulfill(self, requirements: Dict[str, Any]) -> bool:
|
||||
"""Перевірка, чи воркер може виконати job"""
|
||||
# Перевірка tier
|
||||
required_tier = requirements.get("tier")
|
||||
if required_tier and required_tier != self.tier:
|
||||
return False
|
||||
|
||||
# Перевірка GPU
|
||||
needs_gpu = requirements.get("needs_gpu", False)
|
||||
if needs_gpu:
|
||||
# TODO: Перевірити наявність GPU
|
||||
pass
|
||||
|
||||
# Перевірка VRAM
|
||||
min_vram = requirements.get("min_vram_gb", 0)
|
||||
if min_vram > 0:
|
||||
# TODO: Перевірити доступну VRAM
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
async def _execute_embed(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Виконання embedding job"""
|
||||
input_data = job_data.get("input", {})
|
||||
texts = input_data.get("text", [])
|
||||
model = input_data.get("model", "cohere/embed-multilingual-v3.0")
|
||||
|
||||
# TODO: Реальна реалізація через Cohere API або локальну модель
|
||||
print(f" → Embedding {len(texts)} texts з {model}")
|
||||
|
||||
# Симуляція
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
embeddings = [[0.0] * 1024 for _ in texts] # Placeholder
|
||||
|
||||
return {
|
||||
"embeddings": embeddings,
|
||||
"model": model,
|
||||
"dims": 1024
|
||||
}
|
||||
|
||||
async def _execute_retrieve(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Виконання retrieval job"""
|
||||
# TODO: Реальна реалізація через Qdrant
|
||||
return {"results": []}
|
||||
|
||||
async def _execute_summarize(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Виконання summarization job"""
|
||||
# TODO: Реальна реалізація через LLM
|
||||
return {"summary": "Placeholder summary"}
|
||||
|
||||
async def _execute_qdrant_upsert(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Виконання Qdrant upsert"""
|
||||
# TODO: Реальна реалізація
|
||||
return {"status": "ok"}
|
||||
|
||||
async def _execute_pg_write(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Виконання PostgreSQL write"""
|
||||
# TODO: Реальна реалізація
|
||||
return {"status": "ok"}
|
||||
|
||||
async def _execute_neo4j_write(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Виконання Neo4j write"""
|
||||
# TODO: Реальна реалізація
|
||||
return {"status": "ok"}
|
||||
113
infrastructure/worker-daemon/worker/main.py
Normal file
113
infrastructure/worker-daemon/worker/main.py
Normal file
@@ -0,0 +1,113 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Worker Daemon для Memory Module
|
||||
Реєстрація capabilities, підписка на NATS streams, виконання jobs
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from worker.registry import CapabilityRegistry
|
||||
from worker.nats_client import NATSClient
|
||||
from worker.job_executor import JobExecutor
|
||||
from worker.metrics import MetricsExporter
|
||||
|
||||
|
||||
class WorkerDaemon:
|
||||
def __init__(self):
|
||||
self.node_id = os.getenv("NODE_ID", "unknown")
|
||||
self.tier = os.getenv("TIER", "C")
|
||||
self.region = os.getenv("REGION", "unknown")
|
||||
self.nats_url = os.getenv("NATS_URL", "nats://nats-client.nats:4222")
|
||||
self.postgres_url = os.getenv("CAPABILITY_REGISTRY", "")
|
||||
|
||||
self.registry: Optional[CapabilityRegistry] = None
|
||||
self.nats_client: Optional[NATSClient] = None
|
||||
self.job_executor: Optional[JobExecutor] = None
|
||||
self.metrics: Optional[MetricsExporter] = None
|
||||
|
||||
self.running = False
|
||||
|
||||
async def start(self):
|
||||
"""Запуск worker daemon"""
|
||||
print(f"🚀 Worker Daemon запускається...")
|
||||
print(f" Node ID: {self.node_id}")
|
||||
print(f" Tier: {self.tier}")
|
||||
print(f" NATS URL: {self.nats_url}")
|
||||
|
||||
# Ініціалізація компонентів
|
||||
self.registry = CapabilityRegistry(self.postgres_url, self.node_id, self.tier, self.region)
|
||||
self.nats_client = NATSClient(self.nats_url)
|
||||
self.job_executor = JobExecutor(self.node_id, self.tier)
|
||||
self.metrics = MetricsExporter(port=9090)
|
||||
|
||||
# Реєстрація capabilities
|
||||
await self.registry.register()
|
||||
|
||||
# Підключення до NATS
|
||||
await self.nats_client.connect()
|
||||
|
||||
# Підписка на streams
|
||||
await self.nats_client.subscribe_streams(self.job_executor)
|
||||
|
||||
# Запуск metrics server
|
||||
await self.metrics.start()
|
||||
|
||||
# Heartbeat loop
|
||||
self.running = True
|
||||
asyncio.create_task(self.heartbeat_loop())
|
||||
|
||||
print("✅ Worker Daemon запущено")
|
||||
|
||||
async def heartbeat_loop(self):
|
||||
"""Heartbeat кожні 30 секунд"""
|
||||
while self.running:
|
||||
await asyncio.sleep(30)
|
||||
if self.registry:
|
||||
await self.registry.update_heartbeat()
|
||||
|
||||
async def stop(self):
|
||||
"""Зупинка worker daemon"""
|
||||
print("🛑 Зупинка Worker Daemon...")
|
||||
self.running = False
|
||||
|
||||
if self.nats_client:
|
||||
await self.nats_client.disconnect()
|
||||
if self.registry:
|
||||
await self.registry.unregister()
|
||||
if self.metrics:
|
||||
await self.metrics.stop()
|
||||
|
||||
print("✅ Worker Daemon зупинено")
|
||||
|
||||
def setup_signal_handlers(self):
|
||||
"""Налаштування обробників сигналів"""
|
||||
def signal_handler(sig, frame):
|
||||
print(f"\n📡 Отримано сигнал {sig}")
|
||||
asyncio.create_task(self.stop())
|
||||
sys.exit(0)
|
||||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
|
||||
|
||||
async def main():
|
||||
daemon = WorkerDaemon()
|
||||
daemon.setup_signal_handlers()
|
||||
|
||||
try:
|
||||
await daemon.start()
|
||||
# Чекаємо поки працює
|
||||
while daemon.running:
|
||||
await asyncio.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
await daemon.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
69
infrastructure/worker-daemon/worker/metrics.py
Normal file
69
infrastructure/worker-daemon/worker/metrics.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""
|
||||
Metrics Exporter — Prometheus metrics для worker
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
from aiohttp import web
|
||||
from prometheus_client import Counter, Histogram, Gauge, generate_latest
|
||||
|
||||
|
||||
class MetricsExporter:
|
||||
def __init__(self, port: int = 9090):
|
||||
self.port = port
|
||||
self.app: Optional[web.Application] = None
|
||||
self.runner: Optional[web.AppRunner] = None
|
||||
|
||||
# Metrics
|
||||
self.jobs_processed = Counter(
|
||||
"worker_jobs_processed_total",
|
||||
"Total jobs processed",
|
||||
["type", "status"]
|
||||
)
|
||||
self.job_duration = Histogram(
|
||||
"worker_job_duration_seconds",
|
||||
"Job execution duration",
|
||||
["type"],
|
||||
buckets=[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0]
|
||||
)
|
||||
self.gpu_utilization = Gauge(
|
||||
"worker_gpu_utilization",
|
||||
"GPU utilization percentage",
|
||||
["node_id"]
|
||||
)
|
||||
self.vram_usage = Gauge(
|
||||
"worker_vram_usage_bytes",
|
||||
"VRAM usage in bytes",
|
||||
["node_id"]
|
||||
)
|
||||
self.errors_total = Counter(
|
||||
"worker_errors_total",
|
||||
"Total errors",
|
||||
["type", "error_type"]
|
||||
)
|
||||
|
||||
async def start(self):
|
||||
"""Запуск metrics server"""
|
||||
self.app = web.Application()
|
||||
self.app.router.add_get("/metrics", self.metrics_handler)
|
||||
|
||||
self.runner = web.AppRunner(self.app)
|
||||
await self.runner.setup()
|
||||
|
||||
site = web.TCPSite(self.runner, "0.0.0.0", self.port)
|
||||
await site.start()
|
||||
|
||||
print(f"✅ Metrics server запущено на порту {self.port}")
|
||||
|
||||
async def stop(self):
|
||||
"""Зупинка metrics server"""
|
||||
if self.runner:
|
||||
await self.runner.cleanup()
|
||||
print("✅ Metrics server зупинено")
|
||||
|
||||
async def metrics_handler(self, request):
|
||||
"""HTTP handler для /metrics"""
|
||||
return web.Response(
|
||||
text=generate_latest(),
|
||||
content_type="text/plain"
|
||||
)
|
||||
103
infrastructure/worker-daemon/worker/nats_client.py
Normal file
103
infrastructure/worker-daemon/worker/nats_client.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""
|
||||
NATS Client — підключення до NATS JetStream та підписка на streams
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Optional, Callable
|
||||
from nats.aio.client import Client as NATS
|
||||
from nats.js import JetStreamContext
|
||||
from nats.js.api import StreamConfig, ConsumerConfig
|
||||
|
||||
|
||||
class NATSClient:
|
||||
def __init__(self, nats_url: str):
|
||||
self.nats_url = nats_url
|
||||
self.nc: Optional[NATS] = None
|
||||
self.js: Optional[JetStreamContext] = None
|
||||
|
||||
async def connect(self):
|
||||
"""Підключення до NATS"""
|
||||
try:
|
||||
self.nc = NATS()
|
||||
await self.nc.connect(self.nats_url)
|
||||
self.js = self.nc.jetstream()
|
||||
print(f"✅ Підключено до NATS: {self.nats_url}")
|
||||
except Exception as e:
|
||||
print(f"❌ Помилка підключення до NATS: {e}")
|
||||
raise
|
||||
|
||||
async def disconnect(self):
|
||||
"""Відключення від NATS"""
|
||||
if self.nc:
|
||||
await self.nc.close()
|
||||
print("✅ Відключено від NATS")
|
||||
|
||||
async def subscribe_streams(self, job_executor):
|
||||
"""Підписка на streams для обробки jobs"""
|
||||
if not self.js:
|
||||
raise RuntimeError("NATS не підключено")
|
||||
|
||||
# Підписка на MM_ONLINE (Tier A)
|
||||
if job_executor.tier in ["A", "B"]:
|
||||
await self._subscribe_consumer(
|
||||
stream="MM_ONLINE",
|
||||
consumer="online-worker-tier-a",
|
||||
job_executor=job_executor
|
||||
)
|
||||
|
||||
# Підписка на MM_OFFLINE (Tier B/C)
|
||||
if job_executor.tier in ["B", "C"]:
|
||||
await self._subscribe_consumer(
|
||||
stream="MM_OFFLINE",
|
||||
consumer="offline-worker-tier-b",
|
||||
job_executor=job_executor
|
||||
)
|
||||
|
||||
async def _subscribe_consumer(self, stream: str, consumer: str, job_executor):
|
||||
"""Підписка на конкретний consumer"""
|
||||
try:
|
||||
psub = await self.js.pull_subscribe(
|
||||
subject=f"{stream}.*",
|
||||
durable=consumer,
|
||||
stream=stream
|
||||
)
|
||||
|
||||
print(f"✅ Підписано на {stream}/{consumer}")
|
||||
|
||||
# Обробка messages в окремому task
|
||||
asyncio.create_task(self._process_messages(psub, job_executor))
|
||||
|
||||
except Exception as e:
|
||||
print(f"⚠️ Помилка підписки на {stream}/{consumer}: {e}")
|
||||
|
||||
async def _process_messages(self, psub, job_executor):
|
||||
"""Обробка messages з consumer"""
|
||||
while True:
|
||||
try:
|
||||
msgs = await psub.fetch(batch=10, timeout=5)
|
||||
for msg in msgs:
|
||||
await self._handle_message(msg, job_executor)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"❌ Помилка обробки messages: {e}")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def _handle_message(self, msg, job_executor):
|
||||
"""Обробка одного message"""
|
||||
try:
|
||||
job_data = json.loads(msg.data.decode())
|
||||
|
||||
# Виконання job
|
||||
result = await job_executor.execute_job(job_data)
|
||||
|
||||
if result["success"]:
|
||||
await msg.ack()
|
||||
else:
|
||||
# NAK з backoff
|
||||
await msg.nak(delay=result.get("backoff", 5))
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Помилка обробки job: {e}")
|
||||
await msg.nak()
|
||||
175
infrastructure/worker-daemon/worker/registry.py
Normal file
175
infrastructure/worker-daemon/worker/registry.py
Normal file
@@ -0,0 +1,175 @@
|
||||
"""
|
||||
Capability Registry — реєстрація воркерів в Postgres
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import asyncpg
|
||||
import json
|
||||
import os
|
||||
import platform
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
|
||||
class CapabilityRegistry:
|
||||
def __init__(self, postgres_url: str, node_id: str, tier: str, region: str):
|
||||
self.postgres_url = postgres_url
|
||||
self.node_id = node_id
|
||||
self.tier = tier
|
||||
self.region = region
|
||||
self.pool: Optional[asyncpg.Pool] = None
|
||||
|
||||
async def connect(self):
|
||||
"""Підключення до Postgres"""
|
||||
if not self.postgres_url:
|
||||
print("⚠️ CAPABILITY_REGISTRY не встановлено, пропускаємо реєстрацію")
|
||||
return
|
||||
|
||||
try:
|
||||
self.pool = await asyncpg.create_pool(self.postgres_url)
|
||||
await self._ensure_table()
|
||||
except Exception as e:
|
||||
print(f"❌ Помилка підключення до Postgres: {e}")
|
||||
self.pool = None
|
||||
|
||||
async def _ensure_table(self):
|
||||
"""Створення таблиці worker_capabilities якщо не існує"""
|
||||
async with self.pool.acquire() as conn:
|
||||
await conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS worker_capabilities (
|
||||
node_id VARCHAR(255) PRIMARY KEY,
|
||||
tier VARCHAR(10) NOT NULL,
|
||||
region VARCHAR(50),
|
||||
trust_zone VARCHAR(50),
|
||||
hardware JSONB NOT NULL,
|
||||
capabilities JSONB NOT NULL,
|
||||
status VARCHAR(20) NOT NULL,
|
||||
last_heartbeat TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_worker_capabilities_tier
|
||||
ON worker_capabilities(tier);
|
||||
CREATE INDEX IF NOT EXISTS idx_worker_capabilities_status
|
||||
ON worker_capabilities(status);
|
||||
""")
|
||||
|
||||
async def _detect_hardware(self) -> Dict[str, Any]:
|
||||
"""Автоматичне визначення hardware capabilities"""
|
||||
hardware = {
|
||||
"cpu_cores": os.cpu_count() or 1,
|
||||
"ram_gb": self._get_ram_gb(),
|
||||
"gpu": False,
|
||||
"gpu_model": None,
|
||||
"vram_gb": 0,
|
||||
"cuda_version": None
|
||||
}
|
||||
|
||||
# Перевірка GPU (NVIDIA)
|
||||
try:
|
||||
result = subprocess.run(["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader"],
|
||||
capture_output=True, text=True, timeout=5)
|
||||
if result.returncode == 0:
|
||||
lines = result.stdout.strip().split("\n")
|
||||
if lines:
|
||||
gpu_info = lines[0].split(",")
|
||||
hardware["gpu"] = True
|
||||
hardware["gpu_model"] = gpu_info[0].strip()
|
||||
# Парсинг VRAM (формат: "XXXXX MiB")
|
||||
vram_str = gpu_info[1].strip().replace("MiB", "").strip()
|
||||
hardware["vram_gb"] = int(vram_str) // 1024 if vram_str.isdigit() else 0
|
||||
|
||||
# CUDA version
|
||||
cuda_result = subprocess.run(["nvcc", "--version"],
|
||||
capture_output=True, text=True, timeout=5)
|
||||
if cuda_result.returncode == 0:
|
||||
for line in cuda_result.stdout.split("\n"):
|
||||
if "release" in line.lower():
|
||||
# Парсинг версії CUDA
|
||||
parts = line.split()
|
||||
for i, part in enumerate(parts):
|
||||
if part.lower() == "release":
|
||||
if i + 1 < len(parts):
|
||||
hardware["cuda_version"] = parts[i + 1].rstrip(",")
|
||||
except Exception:
|
||||
pass # GPU не знайдено або помилка
|
||||
|
||||
return hardware
|
||||
|
||||
def _get_ram_gb(self) -> int:
|
||||
"""Отримання RAM в GB"""
|
||||
try:
|
||||
if platform.system() == "Linux":
|
||||
with open("/proc/meminfo") as f:
|
||||
for line in f:
|
||||
if line.startswith("MemTotal:"):
|
||||
return int(line.split()[1]) // (1024 * 1024)
|
||||
elif platform.system() == "Darwin": # macOS
|
||||
result = subprocess.run(["sysctl", "-n", "hw.memsize"],
|
||||
capture_output=True, text=True)
|
||||
if result.returncode == 0:
|
||||
return int(result.stdout.strip()) // (1024 ** 3)
|
||||
except Exception:
|
||||
pass
|
||||
return 8 # Default
|
||||
|
||||
async def register(self):
|
||||
"""Реєстрація воркера"""
|
||||
await self.connect()
|
||||
if not self.pool:
|
||||
return
|
||||
|
||||
hardware = await self._detect_hardware()
|
||||
capabilities = {
|
||||
"max_batch": 1000,
|
||||
"max_tokens": 8192,
|
||||
"models": ["cohere/embed-multilingual-v3.0"],
|
||||
"embedding_dim": 1024,
|
||||
"supported_jobs": ["embed", "summarize", "index"]
|
||||
}
|
||||
|
||||
async with self.pool.acquire() as conn:
|
||||
await conn.execute("""
|
||||
INSERT INTO worker_capabilities
|
||||
(node_id, tier, region, trust_zone, hardware, capabilities, status, last_heartbeat)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (node_id) DO UPDATE SET
|
||||
tier = EXCLUDED.tier,
|
||||
region = EXCLUDED.region,
|
||||
hardware = EXCLUDED.hardware,
|
||||
capabilities = EXCLUDED.capabilities,
|
||||
status = EXCLUDED.status,
|
||||
last_heartbeat = EXCLUDED.last_heartbeat,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
""", self.node_id, self.tier, self.region, "internal",
|
||||
json.dumps(hardware), json.dumps(capabilities), "ready", datetime.utcnow())
|
||||
|
||||
print(f"✅ Worker зареєстровано: {self.node_id} (Tier {self.tier})")
|
||||
|
||||
async def update_heartbeat(self):
|
||||
"""Оновлення heartbeat"""
|
||||
if not self.pool:
|
||||
return
|
||||
|
||||
async with self.pool.acquire() as conn:
|
||||
await conn.execute("""
|
||||
UPDATE worker_capabilities
|
||||
SET last_heartbeat = CURRENT_TIMESTAMP
|
||||
WHERE node_id = $1
|
||||
""", self.node_id)
|
||||
|
||||
async def unregister(self):
|
||||
"""Видалення реєстрації"""
|
||||
if not self.pool:
|
||||
return
|
||||
|
||||
async with self.pool.acquire() as conn:
|
||||
await conn.execute("""
|
||||
UPDATE worker_capabilities
|
||||
SET status = 'offline'
|
||||
WHERE node_id = $1
|
||||
""", self.node_id)
|
||||
|
||||
print(f"✅ Worker видалено з реєстру: {self.node_id}")
|
||||
Reference in New Issue
Block a user