🔧 Worker Daemon: додано Stream Creator

- Автоматичне створення streams при старті worker
- Перевірка наявності streams перед створенням
- Підтримка всіх 4 streams (MM_ONLINE, MM_OFFLINE, MM_WRITE, MM_EVENTS)

Це вирішує проблему з DNS в K8s Job
This commit is contained in:
Apple
2026-01-10 10:41:41 -08:00
parent a0c3c0cbb5
commit 0ebbb172f0
2 changed files with 107 additions and 0 deletions

View File

@@ -14,6 +14,7 @@ from worker.registry import CapabilityRegistry
from worker.nats_client import NATSClient
from worker.job_executor import JobExecutor
from worker.metrics import MetricsExporter
from worker.stream_creator import StreamCreator
class WorkerDaemon:
@@ -50,6 +51,12 @@ class WorkerDaemon:
# Підключення до NATS
await self.nats_client.connect()
# Створення streams якщо не існують
stream_creator = StreamCreator(self.nats_url)
await stream_creator.connect()
await stream_creator.create_streams_if_not_exist()
await stream_creator.disconnect()
# Підписка на streams
await self.nats_client.subscribe_streams(self.job_executor)

View File

@@ -0,0 +1,100 @@
"""
Stream Creator — створення NATS JetStream streams при старті worker
"""
import asyncio
from typing import Optional
from nats.js import api
from nats.aio.client import Client as NATS
class StreamCreator:
def __init__(self, nats_url: str):
self.nats_url = nats_url
self.nc: Optional[NATS] = None
self.js: Optional[api.JetStreamContext] = None
async def connect(self):
"""Підключення до NATS"""
try:
self.nc = NATS()
await self.nc.connect(self.nats_url)
self.js = self.nc.jetstream()
print(f"✅ Підключено до NATS для створення streams: {self.nats_url}")
except Exception as e:
print(f"❌ Помилка підключення до NATS: {e}")
raise
async def disconnect(self):
"""Відключення від NATS"""
if self.nc:
await self.nc.close()
async def create_streams_if_not_exist(self):
"""Створення streams якщо вони не існують"""
if not self.js:
raise RuntimeError("NATS не підключено")
streams = [
{
"name": "MM_ONLINE",
"subjects": ["mm.embed.online", "mm.retrieve.online", "mm.summarize.online"],
"retention": api.RetentionPolicy.LIMITS,
"max_age": 1800000000000, # 30 хв
"max_deliver": 3,
"storage": api.StorageType.FILE,
"replicas": 1,
"discard": api.DiscardPolicy.OLD,
"duplicate_window": 300000000000 # 5 хв
},
{
"name": "MM_OFFLINE",
"subjects": ["mm.embed.offline", "mm.index.offline", "mm.backfill.offline"],
"retention": api.RetentionPolicy.LIMITS,
"max_age": 604800000000000, # 7 днів
"max_deliver": 10,
"storage": api.StorageType.FILE,
"replicas": 1,
"discard": api.DiscardPolicy.OLD
},
{
"name": "MM_WRITE",
"subjects": ["mm.qdrant.upsert", "mm.pg.write", "mm.neo4j.write"],
"retention": api.RetentionPolicy.LIMITS,
"max_age": 604800000000000, # 7 днів
"max_deliver": 10,
"storage": api.StorageType.FILE,
"replicas": 1,
"discard": api.DiscardPolicy.OLD
},
{
"name": "MM_EVENTS",
"subjects": ["mm.event.audit", "mm.event.status"],
"retention": api.RetentionPolicy.LIMITS,
"max_age": 2592000000000000, # 30 днів
"storage": api.StorageType.FILE,
"replicas": 1,
"discard": api.DiscardPolicy.OLD
}
]
for stream_config in streams:
name = stream_config.pop("name")
try:
await self.js.add_stream(name=name, **stream_config)
print(f"✅ Stream '{name}' створено")
except Exception as e:
if "already in use" in str(e).lower() or "stream name already" in str(e).lower():
print(f"⚠️ Stream '{name}' вже існує")
else:
print(f"❌ Помилка створення stream '{name}': {e}")
# Перевірка streams
print("\n📋 Створені streams:")
try:
stream_names = await self.js.stream_names()
for stream_name in stream_names:
info = await self.js.stream_info(stream_name)
print(f" - {stream_name}: {len(info.config.subjects)} subjects")
except Exception as e:
print(f"⚠️ Помилка перевірки streams: {e}")