- Автоматичне створення streams при старті worker - Перевірка наявності streams перед створенням - Підтримка всіх 4 streams (MM_ONLINE, MM_OFFLINE, MM_WRITE, MM_EVENTS) Це вирішує проблему з DNS в K8s Job
101 lines
3.9 KiB
Python
101 lines
3.9 KiB
Python
"""
|
|
Stream Creator — створення NATS JetStream streams при старті worker
|
|
"""
|
|
|
|
import asyncio
|
|
from typing import Optional
|
|
from nats.js import api
|
|
from nats.aio.client import Client as NATS
|
|
|
|
|
|
class StreamCreator:
|
|
def __init__(self, nats_url: str):
|
|
self.nats_url = nats_url
|
|
self.nc: Optional[NATS] = None
|
|
self.js: Optional[api.JetStreamContext] = None
|
|
|
|
async def connect(self):
|
|
"""Підключення до NATS"""
|
|
try:
|
|
self.nc = NATS()
|
|
await self.nc.connect(self.nats_url)
|
|
self.js = self.nc.jetstream()
|
|
print(f"✅ Підключено до NATS для створення streams: {self.nats_url}")
|
|
except Exception as e:
|
|
print(f"❌ Помилка підключення до NATS: {e}")
|
|
raise
|
|
|
|
async def disconnect(self):
|
|
"""Відключення від NATS"""
|
|
if self.nc:
|
|
await self.nc.close()
|
|
|
|
async def create_streams_if_not_exist(self):
|
|
"""Створення streams якщо вони не існують"""
|
|
if not self.js:
|
|
raise RuntimeError("NATS не підключено")
|
|
|
|
streams = [
|
|
{
|
|
"name": "MM_ONLINE",
|
|
"subjects": ["mm.embed.online", "mm.retrieve.online", "mm.summarize.online"],
|
|
"retention": api.RetentionPolicy.LIMITS,
|
|
"max_age": 1800000000000, # 30 хв
|
|
"max_deliver": 3,
|
|
"storage": api.StorageType.FILE,
|
|
"replicas": 1,
|
|
"discard": api.DiscardPolicy.OLD,
|
|
"duplicate_window": 300000000000 # 5 хв
|
|
},
|
|
{
|
|
"name": "MM_OFFLINE",
|
|
"subjects": ["mm.embed.offline", "mm.index.offline", "mm.backfill.offline"],
|
|
"retention": api.RetentionPolicy.LIMITS,
|
|
"max_age": 604800000000000, # 7 днів
|
|
"max_deliver": 10,
|
|
"storage": api.StorageType.FILE,
|
|
"replicas": 1,
|
|
"discard": api.DiscardPolicy.OLD
|
|
},
|
|
{
|
|
"name": "MM_WRITE",
|
|
"subjects": ["mm.qdrant.upsert", "mm.pg.write", "mm.neo4j.write"],
|
|
"retention": api.RetentionPolicy.LIMITS,
|
|
"max_age": 604800000000000, # 7 днів
|
|
"max_deliver": 10,
|
|
"storage": api.StorageType.FILE,
|
|
"replicas": 1,
|
|
"discard": api.DiscardPolicy.OLD
|
|
},
|
|
{
|
|
"name": "MM_EVENTS",
|
|
"subjects": ["mm.event.audit", "mm.event.status"],
|
|
"retention": api.RetentionPolicy.LIMITS,
|
|
"max_age": 2592000000000000, # 30 днів
|
|
"storage": api.StorageType.FILE,
|
|
"replicas": 1,
|
|
"discard": api.DiscardPolicy.OLD
|
|
}
|
|
]
|
|
|
|
for stream_config in streams:
|
|
name = stream_config.pop("name")
|
|
try:
|
|
await self.js.add_stream(name=name, **stream_config)
|
|
print(f"✅ Stream '{name}' створено")
|
|
except Exception as e:
|
|
if "already in use" in str(e).lower() or "stream name already" in str(e).lower():
|
|
print(f"⚠️ Stream '{name}' вже існує")
|
|
else:
|
|
print(f"❌ Помилка створення stream '{name}': {e}")
|
|
|
|
# Перевірка streams
|
|
print("\n📋 Створені streams:")
|
|
try:
|
|
stream_names = await self.js.stream_names()
|
|
for stream_name in stream_names:
|
|
info = await self.js.stream_info(stream_name)
|
|
print(f" - {stream_name}: {len(info.config.subjects)} subjects")
|
|
except Exception as e:
|
|
print(f"⚠️ Помилка перевірки streams: {e}")
|