From 0ebbb172f0b92b36501000ea111fe9fcde311324 Mon Sep 17 00:00:00 2001 From: Apple Date: Sat, 10 Jan 2026 10:41:41 -0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20Worker=20Daemon:=20=D0=B4=D0=BE?= =?UTF-8?q?=D0=B4=D0=B0=D0=BD=D0=BE=20Stream=20Creator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Автоматичне створення streams при старті worker - Перевірка наявності streams перед створенням - Підтримка всіх 4 streams (MM_ONLINE, MM_OFFLINE, MM_WRITE, MM_EVENTS) Це вирішує проблему з DNS в K8s Job --- infrastructure/worker-daemon/worker/main.py | 7 ++ .../worker-daemon/worker/stream_creator.py | 100 ++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 infrastructure/worker-daemon/worker/stream_creator.py diff --git a/infrastructure/worker-daemon/worker/main.py b/infrastructure/worker-daemon/worker/main.py index 78e54169..1105570d 100644 --- a/infrastructure/worker-daemon/worker/main.py +++ b/infrastructure/worker-daemon/worker/main.py @@ -14,6 +14,7 @@ from worker.registry import CapabilityRegistry from worker.nats_client import NATSClient from worker.job_executor import JobExecutor from worker.metrics import MetricsExporter +from worker.stream_creator import StreamCreator class WorkerDaemon: @@ -50,6 +51,12 @@ class WorkerDaemon: # Підключення до NATS await self.nats_client.connect() + # Створення streams якщо не існують + stream_creator = StreamCreator(self.nats_url) + await stream_creator.connect() + await stream_creator.create_streams_if_not_exist() + await stream_creator.disconnect() + # Підписка на streams await self.nats_client.subscribe_streams(self.job_executor) diff --git a/infrastructure/worker-daemon/worker/stream_creator.py b/infrastructure/worker-daemon/worker/stream_creator.py new file mode 100644 index 00000000..5949252c --- /dev/null +++ b/infrastructure/worker-daemon/worker/stream_creator.py @@ -0,0 +1,100 @@ +""" +Stream Creator — створення NATS JetStream streams при старті worker +""" + +import asyncio +from typing import Optional +from nats.js import api +from nats.aio.client import Client as NATS + + +class StreamCreator: + def __init__(self, nats_url: str): + self.nats_url = nats_url + self.nc: Optional[NATS] = None + self.js: Optional[api.JetStreamContext] = None + + async def connect(self): + """Підключення до NATS""" + try: + self.nc = NATS() + await self.nc.connect(self.nats_url) + self.js = self.nc.jetstream() + print(f"✅ Підключено до NATS для створення streams: {self.nats_url}") + except Exception as e: + print(f"❌ Помилка підключення до NATS: {e}") + raise + + async def disconnect(self): + """Відключення від NATS""" + if self.nc: + await self.nc.close() + + async def create_streams_if_not_exist(self): + """Створення streams якщо вони не існують""" + if not self.js: + raise RuntimeError("NATS не підключено") + + streams = [ + { + "name": "MM_ONLINE", + "subjects": ["mm.embed.online", "mm.retrieve.online", "mm.summarize.online"], + "retention": api.RetentionPolicy.LIMITS, + "max_age": 1800000000000, # 30 хв + "max_deliver": 3, + "storage": api.StorageType.FILE, + "replicas": 1, + "discard": api.DiscardPolicy.OLD, + "duplicate_window": 300000000000 # 5 хв + }, + { + "name": "MM_OFFLINE", + "subjects": ["mm.embed.offline", "mm.index.offline", "mm.backfill.offline"], + "retention": api.RetentionPolicy.LIMITS, + "max_age": 604800000000000, # 7 днів + "max_deliver": 10, + "storage": api.StorageType.FILE, + "replicas": 1, + "discard": api.DiscardPolicy.OLD + }, + { + "name": "MM_WRITE", + "subjects": ["mm.qdrant.upsert", "mm.pg.write", "mm.neo4j.write"], + "retention": api.RetentionPolicy.LIMITS, + "max_age": 604800000000000, # 7 днів + "max_deliver": 10, + "storage": api.StorageType.FILE, + "replicas": 1, + "discard": api.DiscardPolicy.OLD + }, + { + "name": "MM_EVENTS", + "subjects": ["mm.event.audit", "mm.event.status"], + "retention": api.RetentionPolicy.LIMITS, + "max_age": 2592000000000000, # 30 днів + "storage": api.StorageType.FILE, + "replicas": 1, + "discard": api.DiscardPolicy.OLD + } + ] + + for stream_config in streams: + name = stream_config.pop("name") + try: + await self.js.add_stream(name=name, **stream_config) + print(f"✅ Stream '{name}' створено") + except Exception as e: + if "already in use" in str(e).lower() or "stream name already" in str(e).lower(): + print(f"⚠️ Stream '{name}' вже існує") + else: + print(f"❌ Помилка створення stream '{name}': {e}") + + # Перевірка streams + print("\n📋 Створені streams:") + try: + stream_names = await self.js.stream_names() + for stream_name in stream_names: + info = await self.js.stream_info(stream_name) + print(f" - {stream_name}: {len(info.config.subjects)} subjects") + except Exception as e: + print(f"⚠️ Помилка перевірки streams: {e}")