From a001636c1199b14ee91d13e3f367b9061dcd0d29 Mon Sep 17 00:00:00 2001 From: Apple Date: Sat, 10 Jan 2026 10:32:44 -0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20NATS:=20standalone=20=D1=80?= =?UTF-8?q?=D0=B5=D0=B6=D0=B8=D0=BC=20+=20streams=20creation=20Job?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - NATS працює в standalone режимі (1 replica) - Виправлено server_name через initContainer - Створено K8s Job для створення streams (через Python) - Створено create-streams.py скрипт TODO: Streams створити через worker-daemon або після виправлення DNS в Job --- .../kubernetes/nats/create-streams-job.yaml | 65 +++++++++++++ infrastructure/nats/create-streams.py | 91 +++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 infrastructure/kubernetes/nats/create-streams-job.yaml create mode 100755 infrastructure/nats/create-streams.py diff --git a/infrastructure/kubernetes/nats/create-streams-job.yaml b/infrastructure/kubernetes/nats/create-streams-job.yaml new file mode 100644 index 00000000..7ba63f80 --- /dev/null +++ b/infrastructure/kubernetes/nats/create-streams-job.yaml @@ -0,0 +1,65 @@ +--- +# K8s Job для створення NATS JetStream streams +apiVersion: batch/v1 +kind: Job +metadata: + name: nats-create-streams + namespace: nats +spec: + template: + spec: + containers: + - name: create-streams + image: python:3.11-slim + command: + - /bin/sh + - -c + - | + pip install -q nats-py + python3 << 'PYEOF' + import asyncio + from nats.js import api + from nats.aio.client import Client as NATS + + async def main(): + nc = NATS() + await nc.connect("nats://nats-client.nats:4222") + js = nc.jetstream() + + streams = [ + {"name": "MM_ONLINE", "subjects": ["mm.embed.online", "mm.retrieve.online", "mm.summarize.online"], "max_age": 1800000000000, "max_deliver": 3}, + {"name": "MM_OFFLINE", "subjects": ["mm.embed.offline", "mm.index.offline", "mm.backfill.offline"], "max_age": 604800000000000, "max_deliver": 10}, + {"name": "MM_WRITE", "subjects": ["mm.qdrant.upsert", "mm.pg.write", "mm.neo4j.write"], "max_age": 604800000000000, "max_deliver": 10}, + {"name": "MM_EVENTS", "subjects": ["mm.event.audit", "mm.event.status"], "max_age": 2592000000000000} + ] + + for s in streams: + try: + await js.add_stream( + name=s["name"], + subjects=s["subjects"], + retention=api.RetentionPolicy.LIMITS, + max_age=s["max_age"], + storage=api.StorageType.FILE, + replicas=1, + discard=api.DiscardPolicy.OLD, + max_deliver=s.get("max_deliver", 10) + ) + print(f"✅ {s['name']}") + except Exception as e: + if "already in use" in str(e).lower(): + print(f"⚠️ {s['name']} вже існує") + else: + print(f"❌ {s['name']}: {e}") + + print("\n📋 Створені streams:") + for stream_name in await js.stream_names(): + info = await js.stream_info(stream_name) + print(f" - {stream_name}: {len(info.config.subjects)} subjects") + + await nc.close() + + asyncio.run(main()) + PYEOF + restartPolicy: Never + backoffLimit: 3 diff --git a/infrastructure/nats/create-streams.py b/infrastructure/nats/create-streams.py new file mode 100755 index 00000000..9c7ebbd7 --- /dev/null +++ b/infrastructure/nats/create-streams.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +""" +Створення NATS JetStream streams через Python API +Використання: python3 create-streams.py +""" + +import sys +import asyncio +from nats.js import api +from nats.aio.client import Client as NATS + + +async def create_streams(nats_url: str): + """Створення всіх streams""" + nc = NATS() + try: + await nc.connect(nats_url) + js = nc.jetstream() + + print("✅ Підключено до NATS") + + streams = [ + { + "name": "MM_ONLINE", + "subjects": ["mm.embed.online", "mm.retrieve.online", "mm.summarize.online"], + "retention": api.RetentionPolicy.LIMITS, + "max_age": 1800000000000, # 30 хв + "max_deliver": 3, + "storage": api.StorageType.FILE, + "replicas": 1, + "discard": api.DiscardPolicy.OLD, + "duplicate_window": 300000000000 # 5 хв + }, + { + "name": "MM_OFFLINE", + "subjects": ["mm.embed.offline", "mm.index.offline", "mm.backfill.offline"], + "retention": api.RetentionPolicy.LIMITS, + "max_age": 604800000000000, # 7 днів + "max_deliver": 10, + "storage": api.StorageType.FILE, + "replicas": 1, + "discard": api.DiscardPolicy.OLD + }, + { + "name": "MM_WRITE", + "subjects": ["mm.qdrant.upsert", "mm.pg.write", "mm.neo4j.write"], + "retention": api.RetentionPolicy.LIMITS, + "max_age": 604800000000000, # 7 днів + "max_deliver": 10, + "storage": api.StorageType.FILE, + "replicas": 1, + "discard": api.DiscardPolicy.OLD + }, + { + "name": "MM_EVENTS", + "subjects": ["mm.event.audit", "mm.event.status"], + "retention": api.RetentionPolicy.LIMITS, + "max_age": 2592000000000000, # 30 днів + "storage": api.StorageType.FILE, + "replicas": 1, + "discard": api.DiscardPolicy.OLD + } + ] + + for stream_config in streams: + name = stream_config.pop("name") + try: + await js.add_stream(name=name, **stream_config) + print(f"✅ Stream '{name}' створено") + except Exception as e: + if "stream name already in use" in str(e).lower(): + print(f"⚠️ Stream '{name}' вже існує") + else: + print(f"❌ Помилка створення stream '{name}': {e}") + + # Перевірка streams + print("\n📋 Створені streams:") + for stream_name in await js.stream_names(): + info = await js.stream_info(stream_name) + print(f" - {stream_name}: {len(info.config.subjects)} subjects") + + await nc.close() + + except Exception as e: + print(f"❌ Помилка: {e}") + sys.exit(1) + + +if __name__ == "__main__": + nats_url = sys.argv[1] if len(sys.argv) > 1 else "nats://nats-client.nats:4222" + asyncio.run(create_streams(nats_url))