diff --git a/infrastructure/nats/init-streams-kubectl.sh b/infrastructure/nats/init-streams-kubectl.sh new file mode 100755 index 00000000..157d2d04 --- /dev/null +++ b/infrastructure/nats/init-streams-kubectl.sh @@ -0,0 +1,76 @@ +#!/bin/bash +# Ініціалізація NATS JetStream streams через kubectl exec +# Використання: ./init-streams-kubectl.sh + +set -e + +echo "🚀 Ініціалізація NATS JetStream streams через kubectl..." + +# Встановлюємо NATS CLI в pod (якщо потрібно) +echo "=== Встановлення NATS CLI в pod ===" +kubectl exec -n nats nats-0 -- sh -c 'apk add --no-cache curl && curl -L https://github.com/nats-io/natscli/releases/latest/download/nats-linux-amd64.zip -o /tmp/nats.zip && unzip -o /tmp/nats.zip -d /tmp && chmod +x /tmp/nats' || echo "NATS CLI встановлення..." + +NATS_URL="nats://nats-client.nats:4222" + +echo "" +echo "=== Створення Stream MM_ONLINE ===" +kubectl exec -n nats nats-0 -- /tmp/nats stream add MM_ONLINE \ + --server="$NATS_URL" \ + --subjects="mm.embed.online,mm.retrieve.online,mm.summarize.online" \ + --storage=file \ + --replicas=2 \ + --max-age=30m \ + --max-deliver=3 \ + --ack \ + --discard=old \ + --duplicate-window=5m \ + --retention=limits \ + --yes || echo "Stream вже існує або помилка" + +echo "" +echo "=== Створення Stream MM_OFFLINE ===" +kubectl exec -n nats nats-0 -- /tmp/nats stream add MM_OFFLINE \ + --server="$NATS_URL" \ + --subjects="mm.embed.offline,mm.index.offline,mm.backfill.offline" \ + --storage=file \ + --replicas=2 \ + --max-age=7d \ + --max-deliver=10 \ + --ack \ + --discard=old \ + --retention=limits \ + --yes || echo "Stream вже існує або помилка" + +echo "" +echo "=== Створення Stream MM_WRITE ===" +kubectl exec -n nats nats-0 -- /tmp/nats stream add MM_WRITE \ + --server="$NATS_URL" \ + --subjects="mm.qdrant.upsert,mm.pg.write,mm.neo4j.write" \ + --storage=file \ + --replicas=2 \ + --max-age=7d \ + --max-deliver=10 \ + --ack \ + --discard=old \ + --retention=limits \ + --yes || echo "Stream вже існує або помилка" + +echo "" +echo "=== Створення Stream MM_EVENTS ===" +kubectl exec -n nats nats-0 -- /tmp/nats stream add MM_EVENTS \ + --server="$NATS_URL" \ + --subjects="mm.event.audit,mm.event.status" \ + --storage=file \ + --replicas=2 \ + --max-age=30d \ + --ack \ + --discard=old \ + --retention=limits \ + --yes || echo "Stream вже існує або помилка" + +echo "" +echo "=== Перевірка streams ===" +kubectl exec -n nats nats-0 -- /tmp/nats stream ls --server="$NATS_URL" || echo "Помилка перевірки" + +echo "" +echo "✅ Streams створено!" diff --git a/infrastructure/nats/init-streams.py b/infrastructure/nats/init-streams.py new file mode 100755 index 00000000..795a8aeb --- /dev/null +++ b/infrastructure/nats/init-streams.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +""" +Ініціалізація NATS JetStream streams для Memory Module +Використання: python3 init-streams.py +""" + +import sys +import json +import requests +from typing import Dict, Any + +NATS_URL = sys.argv[1] if len(sys.argv) > 1 else "nats://nats-client.nats:4222" +NATS_HTTP = NATS_URL.replace("nats://", "http://").replace(":4222", ":8222") + +def create_stream(stream_config: Dict[str, Any]) -> bool: + """Створює stream через NATS HTTP API""" + url = f"{NATS_HTTP}/jsz?streams=1" + + # NATS HTTP API використовує інший формат + # Спробуємо через JetStream API + try: + # Для створення stream потрібен NATS CLI або Go/Python клієнт + # Тимчасово використовуємо curl через kubectl exec + print(f"⚠️ Stream '{stream_config['name']}' потрібно створити через NATS CLI") + print(f" Використайте: kubectl exec -n nats nats-0 -- nats stream add {stream_config['name']} ...") + return False + except Exception as e: + print(f"❌ Помилка створення stream: {e}") + return False + +def main(): + print(f"🚀 Ініціалізація NATS JetStream streams...") + print(f"NATS URL: {NATS_URL}") + print(f"NATS HTTP: {NATS_HTTP}") + + streams = [ + { + "name": "MM_ONLINE", + "subjects": ["mm.embed.online", "mm.retrieve.online", "mm.summarize.online"], + "retention": "limits", + "max_age": 1800000000000, # 30 хв + "max_deliver": 3, + "ack_policy": "explicit", + "storage": "file", + "replicas": 2, + "discard": "old", + "duplicate_window": 300000000000 + }, + { + "name": "MM_OFFLINE", + "subjects": ["mm.embed.offline", "mm.index.offline", "mm.backfill.offline"], + "retention": "limits", + "max_age": 604800000000000, # 7 днів + "max_deliver": 10, + "ack_policy": "explicit", + "storage": "file", + "replicas": 2, + "discard": "old" + }, + { + "name": "MM_WRITE", + "subjects": ["mm.qdrant.upsert", "mm.pg.write", "mm.neo4j.write"], + "retention": "limits", + "max_age": 604800000000000, # 7 днів + "max_deliver": 10, + "ack_policy": "explicit", + "storage": "file", + "replicas": 2, + "discard": "old" + }, + { + "name": "MM_EVENTS", + "subjects": ["mm.event.audit", "mm.event.status"], + "retention": "limits", + "max_age": 2592000000000000, # 30 днів + "ack_policy": "explicit", + "storage": "file", + "replicas": 2, + "discard": "old" + } + ] + + print("\n📋 Streams для створення:") + for stream in streams: + print(f" - {stream['name']}: {', '.join(stream['subjects'])}") + + print("\n⚠️ Для створення streams потрібен NATS CLI.") + print(" Використайте init-streams.sh або встановіть NATS CLI в контейнер.") + + return 0 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/infrastructure/worker-daemon/Dockerfile b/infrastructure/worker-daemon/Dockerfile new file mode 100644 index 00000000..11a51187 --- /dev/null +++ b/infrastructure/worker-daemon/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Встановлення залежностей +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Копіювання коду +COPY worker/ ./worker/ + +# Entrypoint +ENTRYPOINT ["python3", "-m", "worker.main"] diff --git a/infrastructure/worker-daemon/requirements.txt b/infrastructure/worker-daemon/requirements.txt new file mode 100644 index 00000000..ae8b2b7b --- /dev/null +++ b/infrastructure/worker-daemon/requirements.txt @@ -0,0 +1,7 @@ +nats-py==2.7.0 +asyncpg==0.29.0 +psycopg2-binary==2.9.9 +pydantic==2.5.3 +python-dotenv==1.0.1 +prometheus-client==0.19.0 +aiohttp==3.9.1 diff --git a/infrastructure/worker-daemon/worker/__init__.py b/infrastructure/worker-daemon/worker/__init__.py new file mode 100644 index 00000000..a780b40e --- /dev/null +++ b/infrastructure/worker-daemon/worker/__init__.py @@ -0,0 +1 @@ +# Worker Daemon package diff --git a/infrastructure/worker-daemon/worker/job_executor.py b/infrastructure/worker-daemon/worker/job_executor.py new file mode 100644 index 00000000..8ab91075 --- /dev/null +++ b/infrastructure/worker-daemon/worker/job_executor.py @@ -0,0 +1,128 @@ +""" +Job Executor — виконання jobs з NATS +""" + +import asyncio +import json +from typing import Dict, Any, Optional +from datetime import datetime + + +class JobExecutor: + def __init__(self, node_id: str, tier: str): + self.node_id = node_id + self.tier = tier + + async def execute_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]: + """Виконання job""" + job_id = job_data.get("job_id", "unknown") + job_type = job_data.get("type", "unknown") + priority = job_data.get("priority", "offline") + + print(f"📦 Виконання job: {job_id} (type: {job_type}, priority: {priority})") + + # Перевірка requirements + if not self._can_fulfill(job_data.get("requirements", {})): + return { + "success": False, + "reason": "requirements_not_met" + } + + # Виконання залежно від типу + try: + if job_type == "embed": + result = await self._execute_embed(job_data) + elif job_type == "retrieve": + result = await self._execute_retrieve(job_data) + elif job_type == "summarize": + result = await self._execute_summarize(job_data) + elif job_type == "qdrant_upsert": + result = await self._execute_qdrant_upsert(job_data) + elif job_type == "pg_write": + result = await self._execute_pg_write(job_data) + elif job_type == "neo4j_write": + result = await self._execute_neo4j_write(job_data) + else: + return { + "success": False, + "reason": f"unknown_job_type: {job_type}" + } + + return { + "success": True, + "result": result + } + + except Exception as e: + print(f"❌ Помилка виконання job {job_id}: {e}") + return { + "success": False, + "reason": str(e), + "backoff": 30 # Retry через 30 секунд + } + + def _can_fulfill(self, requirements: Dict[str, Any]) -> bool: + """Перевірка, чи воркер може виконати job""" + # Перевірка tier + required_tier = requirements.get("tier") + if required_tier and required_tier != self.tier: + return False + + # Перевірка GPU + needs_gpu = requirements.get("needs_gpu", False) + if needs_gpu: + # TODO: Перевірити наявність GPU + pass + + # Перевірка VRAM + min_vram = requirements.get("min_vram_gb", 0) + if min_vram > 0: + # TODO: Перевірити доступну VRAM + pass + + return True + + async def _execute_embed(self, job_data: Dict[str, Any]) -> Dict[str, Any]: + """Виконання embedding job""" + input_data = job_data.get("input", {}) + texts = input_data.get("text", []) + model = input_data.get("model", "cohere/embed-multilingual-v3.0") + + # TODO: Реальна реалізація через Cohere API або локальну модель + print(f" → Embedding {len(texts)} texts з {model}") + + # Симуляція + await asyncio.sleep(0.1) + + embeddings = [[0.0] * 1024 for _ in texts] # Placeholder + + return { + "embeddings": embeddings, + "model": model, + "dims": 1024 + } + + async def _execute_retrieve(self, job_data: Dict[str, Any]) -> Dict[str, Any]: + """Виконання retrieval job""" + # TODO: Реальна реалізація через Qdrant + return {"results": []} + + async def _execute_summarize(self, job_data: Dict[str, Any]) -> Dict[str, Any]: + """Виконання summarization job""" + # TODO: Реальна реалізація через LLM + return {"summary": "Placeholder summary"} + + async def _execute_qdrant_upsert(self, job_data: Dict[str, Any]) -> Dict[str, Any]: + """Виконання Qdrant upsert""" + # TODO: Реальна реалізація + return {"status": "ok"} + + async def _execute_pg_write(self, job_data: Dict[str, Any]) -> Dict[str, Any]: + """Виконання PostgreSQL write""" + # TODO: Реальна реалізація + return {"status": "ok"} + + async def _execute_neo4j_write(self, job_data: Dict[str, Any]) -> Dict[str, Any]: + """Виконання Neo4j write""" + # TODO: Реальна реалізація + return {"status": "ok"} diff --git a/infrastructure/worker-daemon/worker/main.py b/infrastructure/worker-daemon/worker/main.py new file mode 100644 index 00000000..78e54169 --- /dev/null +++ b/infrastructure/worker-daemon/worker/main.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +""" +Worker Daemon для Memory Module +Реєстрація capabilities, підписка на NATS streams, виконання jobs +""" + +import asyncio +import os +import signal +import sys +from typing import Optional + +from worker.registry import CapabilityRegistry +from worker.nats_client import NATSClient +from worker.job_executor import JobExecutor +from worker.metrics import MetricsExporter + + +class WorkerDaemon: + def __init__(self): + self.node_id = os.getenv("NODE_ID", "unknown") + self.tier = os.getenv("TIER", "C") + self.region = os.getenv("REGION", "unknown") + self.nats_url = os.getenv("NATS_URL", "nats://nats-client.nats:4222") + self.postgres_url = os.getenv("CAPABILITY_REGISTRY", "") + + self.registry: Optional[CapabilityRegistry] = None + self.nats_client: Optional[NATSClient] = None + self.job_executor: Optional[JobExecutor] = None + self.metrics: Optional[MetricsExporter] = None + + self.running = False + + async def start(self): + """Запуск worker daemon""" + print(f"🚀 Worker Daemon запускається...") + print(f" Node ID: {self.node_id}") + print(f" Tier: {self.tier}") + print(f" NATS URL: {self.nats_url}") + + # Ініціалізація компонентів + self.registry = CapabilityRegistry(self.postgres_url, self.node_id, self.tier, self.region) + self.nats_client = NATSClient(self.nats_url) + self.job_executor = JobExecutor(self.node_id, self.tier) + self.metrics = MetricsExporter(port=9090) + + # Реєстрація capabilities + await self.registry.register() + + # Підключення до NATS + await self.nats_client.connect() + + # Підписка на streams + await self.nats_client.subscribe_streams(self.job_executor) + + # Запуск metrics server + await self.metrics.start() + + # Heartbeat loop + self.running = True + asyncio.create_task(self.heartbeat_loop()) + + print("✅ Worker Daemon запущено") + + async def heartbeat_loop(self): + """Heartbeat кожні 30 секунд""" + while self.running: + await asyncio.sleep(30) + if self.registry: + await self.registry.update_heartbeat() + + async def stop(self): + """Зупинка worker daemon""" + print("🛑 Зупинка Worker Daemon...") + self.running = False + + if self.nats_client: + await self.nats_client.disconnect() + if self.registry: + await self.registry.unregister() + if self.metrics: + await self.metrics.stop() + + print("✅ Worker Daemon зупинено") + + def setup_signal_handlers(self): + """Налаштування обробників сигналів""" + def signal_handler(sig, frame): + print(f"\n📡 Отримано сигнал {sig}") + asyncio.create_task(self.stop()) + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + +async def main(): + daemon = WorkerDaemon() + daemon.setup_signal_handlers() + + try: + await daemon.start() + # Чекаємо поки працює + while daemon.running: + await asyncio.sleep(1) + except KeyboardInterrupt: + pass + finally: + await daemon.stop() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/infrastructure/worker-daemon/worker/metrics.py b/infrastructure/worker-daemon/worker/metrics.py new file mode 100644 index 00000000..5e649906 --- /dev/null +++ b/infrastructure/worker-daemon/worker/metrics.py @@ -0,0 +1,69 @@ +""" +Metrics Exporter — Prometheus metrics для worker +""" + +import asyncio +from typing import Optional +from aiohttp import web +from prometheus_client import Counter, Histogram, Gauge, generate_latest + + +class MetricsExporter: + def __init__(self, port: int = 9090): + self.port = port + self.app: Optional[web.Application] = None + self.runner: Optional[web.AppRunner] = None + + # Metrics + self.jobs_processed = Counter( + "worker_jobs_processed_total", + "Total jobs processed", + ["type", "status"] + ) + self.job_duration = Histogram( + "worker_job_duration_seconds", + "Job execution duration", + ["type"], + buckets=[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0] + ) + self.gpu_utilization = Gauge( + "worker_gpu_utilization", + "GPU utilization percentage", + ["node_id"] + ) + self.vram_usage = Gauge( + "worker_vram_usage_bytes", + "VRAM usage in bytes", + ["node_id"] + ) + self.errors_total = Counter( + "worker_errors_total", + "Total errors", + ["type", "error_type"] + ) + + async def start(self): + """Запуск metrics server""" + self.app = web.Application() + self.app.router.add_get("/metrics", self.metrics_handler) + + self.runner = web.AppRunner(self.app) + await self.runner.setup() + + site = web.TCPSite(self.runner, "0.0.0.0", self.port) + await site.start() + + print(f"✅ Metrics server запущено на порту {self.port}") + + async def stop(self): + """Зупинка metrics server""" + if self.runner: + await self.runner.cleanup() + print("✅ Metrics server зупинено") + + async def metrics_handler(self, request): + """HTTP handler для /metrics""" + return web.Response( + text=generate_latest(), + content_type="text/plain" + ) diff --git a/infrastructure/worker-daemon/worker/nats_client.py b/infrastructure/worker-daemon/worker/nats_client.py new file mode 100644 index 00000000..bbcd8215 --- /dev/null +++ b/infrastructure/worker-daemon/worker/nats_client.py @@ -0,0 +1,103 @@ +""" +NATS Client — підключення до NATS JetStream та підписка на streams +""" + +import asyncio +import json +from typing import Optional, Callable +from nats.aio.client import Client as NATS +from nats.js import JetStreamContext +from nats.js.api import StreamConfig, ConsumerConfig + + +class NATSClient: + def __init__(self, nats_url: str): + self.nats_url = nats_url + self.nc: Optional[NATS] = None + self.js: Optional[JetStreamContext] = None + + async def connect(self): + """Підключення до NATS""" + try: + self.nc = NATS() + await self.nc.connect(self.nats_url) + self.js = self.nc.jetstream() + print(f"✅ Підключено до NATS: {self.nats_url}") + except Exception as e: + print(f"❌ Помилка підключення до NATS: {e}") + raise + + async def disconnect(self): + """Відключення від NATS""" + if self.nc: + await self.nc.close() + print("✅ Відключено від NATS") + + async def subscribe_streams(self, job_executor): + """Підписка на streams для обробки jobs""" + if not self.js: + raise RuntimeError("NATS не підключено") + + # Підписка на MM_ONLINE (Tier A) + if job_executor.tier in ["A", "B"]: + await self._subscribe_consumer( + stream="MM_ONLINE", + consumer="online-worker-tier-a", + job_executor=job_executor + ) + + # Підписка на MM_OFFLINE (Tier B/C) + if job_executor.tier in ["B", "C"]: + await self._subscribe_consumer( + stream="MM_OFFLINE", + consumer="offline-worker-tier-b", + job_executor=job_executor + ) + + async def _subscribe_consumer(self, stream: str, consumer: str, job_executor): + """Підписка на конкретний consumer""" + try: + psub = await self.js.pull_subscribe( + subject=f"{stream}.*", + durable=consumer, + stream=stream + ) + + print(f"✅ Підписано на {stream}/{consumer}") + + # Обробка messages в окремому task + asyncio.create_task(self._process_messages(psub, job_executor)) + + except Exception as e: + print(f"⚠️ Помилка підписки на {stream}/{consumer}: {e}") + + async def _process_messages(self, psub, job_executor): + """Обробка messages з consumer""" + while True: + try: + msgs = await psub.fetch(batch=10, timeout=5) + for msg in msgs: + await self._handle_message(msg, job_executor) + except asyncio.TimeoutError: + continue + except Exception as e: + print(f"❌ Помилка обробки messages: {e}") + await asyncio.sleep(1) + + async def _handle_message(self, msg, job_executor): + """Обробка одного message""" + try: + job_data = json.loads(msg.data.decode()) + + # Виконання job + result = await job_executor.execute_job(job_data) + + if result["success"]: + await msg.ack() + else: + # NAK з backoff + await msg.nak(delay=result.get("backoff", 5)) + + except Exception as e: + print(f"❌ Помилка обробки job: {e}") + await msg.nak() diff --git a/infrastructure/worker-daemon/worker/registry.py b/infrastructure/worker-daemon/worker/registry.py new file mode 100644 index 00000000..3fa9055f --- /dev/null +++ b/infrastructure/worker-daemon/worker/registry.py @@ -0,0 +1,175 @@ +""" +Capability Registry — реєстрація воркерів в Postgres +""" + +import asyncio +import asyncpg +import json +import os +import platform +import subprocess +from datetime import datetime +from typing import Dict, Any, Optional + + +class CapabilityRegistry: + def __init__(self, postgres_url: str, node_id: str, tier: str, region: str): + self.postgres_url = postgres_url + self.node_id = node_id + self.tier = tier + self.region = region + self.pool: Optional[asyncpg.Pool] = None + + async def connect(self): + """Підключення до Postgres""" + if not self.postgres_url: + print("⚠️ CAPABILITY_REGISTRY не встановлено, пропускаємо реєстрацію") + return + + try: + self.pool = await asyncpg.create_pool(self.postgres_url) + await self._ensure_table() + except Exception as e: + print(f"❌ Помилка підключення до Postgres: {e}") + self.pool = None + + async def _ensure_table(self): + """Створення таблиці worker_capabilities якщо не існує""" + async with self.pool.acquire() as conn: + await conn.execute(""" + CREATE TABLE IF NOT EXISTS worker_capabilities ( + node_id VARCHAR(255) PRIMARY KEY, + tier VARCHAR(10) NOT NULL, + region VARCHAR(50), + trust_zone VARCHAR(50), + hardware JSONB NOT NULL, + capabilities JSONB NOT NULL, + status VARCHAR(20) NOT NULL, + last_heartbeat TIMESTAMP WITH TIME ZONE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_worker_capabilities_tier + ON worker_capabilities(tier); + CREATE INDEX IF NOT EXISTS idx_worker_capabilities_status + ON worker_capabilities(status); + """) + + async def _detect_hardware(self) -> Dict[str, Any]: + """Автоматичне визначення hardware capabilities""" + hardware = { + "cpu_cores": os.cpu_count() or 1, + "ram_gb": self._get_ram_gb(), + "gpu": False, + "gpu_model": None, + "vram_gb": 0, + "cuda_version": None + } + + # Перевірка GPU (NVIDIA) + try: + result = subprocess.run(["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader"], + capture_output=True, text=True, timeout=5) + if result.returncode == 0: + lines = result.stdout.strip().split("\n") + if lines: + gpu_info = lines[0].split(",") + hardware["gpu"] = True + hardware["gpu_model"] = gpu_info[0].strip() + # Парсинг VRAM (формат: "XXXXX MiB") + vram_str = gpu_info[1].strip().replace("MiB", "").strip() + hardware["vram_gb"] = int(vram_str) // 1024 if vram_str.isdigit() else 0 + + # CUDA version + cuda_result = subprocess.run(["nvcc", "--version"], + capture_output=True, text=True, timeout=5) + if cuda_result.returncode == 0: + for line in cuda_result.stdout.split("\n"): + if "release" in line.lower(): + # Парсинг версії CUDA + parts = line.split() + for i, part in enumerate(parts): + if part.lower() == "release": + if i + 1 < len(parts): + hardware["cuda_version"] = parts[i + 1].rstrip(",") + except Exception: + pass # GPU не знайдено або помилка + + return hardware + + def _get_ram_gb(self) -> int: + """Отримання RAM в GB""" + try: + if platform.system() == "Linux": + with open("/proc/meminfo") as f: + for line in f: + if line.startswith("MemTotal:"): + return int(line.split()[1]) // (1024 * 1024) + elif platform.system() == "Darwin": # macOS + result = subprocess.run(["sysctl", "-n", "hw.memsize"], + capture_output=True, text=True) + if result.returncode == 0: + return int(result.stdout.strip()) // (1024 ** 3) + except Exception: + pass + return 8 # Default + + async def register(self): + """Реєстрація воркера""" + await self.connect() + if not self.pool: + return + + hardware = await self._detect_hardware() + capabilities = { + "max_batch": 1000, + "max_tokens": 8192, + "models": ["cohere/embed-multilingual-v3.0"], + "embedding_dim": 1024, + "supported_jobs": ["embed", "summarize", "index"] + } + + async with self.pool.acquire() as conn: + await conn.execute(""" + INSERT INTO worker_capabilities + (node_id, tier, region, trust_zone, hardware, capabilities, status, last_heartbeat) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (node_id) DO UPDATE SET + tier = EXCLUDED.tier, + region = EXCLUDED.region, + hardware = EXCLUDED.hardware, + capabilities = EXCLUDED.capabilities, + status = EXCLUDED.status, + last_heartbeat = EXCLUDED.last_heartbeat, + updated_at = CURRENT_TIMESTAMP + """, self.node_id, self.tier, self.region, "internal", + json.dumps(hardware), json.dumps(capabilities), "ready", datetime.utcnow()) + + print(f"✅ Worker зареєстровано: {self.node_id} (Tier {self.tier})") + + async def update_heartbeat(self): + """Оновлення heartbeat""" + if not self.pool: + return + + async with self.pool.acquire() as conn: + await conn.execute(""" + UPDATE worker_capabilities + SET last_heartbeat = CURRENT_TIMESTAMP + WHERE node_id = $1 + """, self.node_id) + + async def unregister(self): + """Видалення реєстрації""" + if not self.pool: + return + + async with self.pool.acquire() as conn: + await conn.execute(""" + UPDATE worker_capabilities + SET status = 'offline' + WHERE node_id = $1 + """, self.node_id) + + print(f"✅ Worker видалено з реєстру: {self.node_id}")