🔧 NATS: standalone режим + streams creation Job
- NATS працює в standalone режимі (1 replica) - Виправлено server_name через initContainer - Створено K8s Job для створення streams (через Python) - Створено create-streams.py скрипт TODO: Streams створити через worker-daemon або після виправлення DNS в Job
This commit is contained in:
65
infrastructure/kubernetes/nats/create-streams-job.yaml
Normal file
65
infrastructure/kubernetes/nats/create-streams-job.yaml
Normal file
@@ -0,0 +1,65 @@
|
||||
---
|
||||
# K8s Job для створення NATS JetStream streams
|
||||
apiVersion: batch/v1
|
||||
kind: Job
|
||||
metadata:
|
||||
name: nats-create-streams
|
||||
namespace: nats
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
containers:
|
||||
- name: create-streams
|
||||
image: python:3.11-slim
|
||||
command:
|
||||
- /bin/sh
|
||||
- -c
|
||||
- |
|
||||
pip install -q nats-py
|
||||
python3 << 'PYEOF'
|
||||
import asyncio
|
||||
from nats.js import api
|
||||
from nats.aio.client import Client as NATS
|
||||
|
||||
async def main():
|
||||
nc = NATS()
|
||||
await nc.connect("nats://nats-client.nats:4222")
|
||||
js = nc.jetstream()
|
||||
|
||||
streams = [
|
||||
{"name": "MM_ONLINE", "subjects": ["mm.embed.online", "mm.retrieve.online", "mm.summarize.online"], "max_age": 1800000000000, "max_deliver": 3},
|
||||
{"name": "MM_OFFLINE", "subjects": ["mm.embed.offline", "mm.index.offline", "mm.backfill.offline"], "max_age": 604800000000000, "max_deliver": 10},
|
||||
{"name": "MM_WRITE", "subjects": ["mm.qdrant.upsert", "mm.pg.write", "mm.neo4j.write"], "max_age": 604800000000000, "max_deliver": 10},
|
||||
{"name": "MM_EVENTS", "subjects": ["mm.event.audit", "mm.event.status"], "max_age": 2592000000000000}
|
||||
]
|
||||
|
||||
for s in streams:
|
||||
try:
|
||||
await js.add_stream(
|
||||
name=s["name"],
|
||||
subjects=s["subjects"],
|
||||
retention=api.RetentionPolicy.LIMITS,
|
||||
max_age=s["max_age"],
|
||||
storage=api.StorageType.FILE,
|
||||
replicas=1,
|
||||
discard=api.DiscardPolicy.OLD,
|
||||
max_deliver=s.get("max_deliver", 10)
|
||||
)
|
||||
print(f"✅ {s['name']}")
|
||||
except Exception as e:
|
||||
if "already in use" in str(e).lower():
|
||||
print(f"⚠️ {s['name']} вже існує")
|
||||
else:
|
||||
print(f"❌ {s['name']}: {e}")
|
||||
|
||||
print("\n📋 Створені streams:")
|
||||
for stream_name in await js.stream_names():
|
||||
info = await js.stream_info(stream_name)
|
||||
print(f" - {stream_name}: {len(info.config.subjects)} subjects")
|
||||
|
||||
await nc.close()
|
||||
|
||||
asyncio.run(main())
|
||||
PYEOF
|
||||
restartPolicy: Never
|
||||
backoffLimit: 3
|
||||
91
infrastructure/nats/create-streams.py
Executable file
91
infrastructure/nats/create-streams.py
Executable file
@@ -0,0 +1,91 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Створення NATS JetStream streams через Python API
|
||||
Використання: python3 create-streams.py <nats-url>
|
||||
"""
|
||||
|
||||
import sys
|
||||
import asyncio
|
||||
from nats.js import api
|
||||
from nats.aio.client import Client as NATS
|
||||
|
||||
|
||||
async def create_streams(nats_url: str):
|
||||
"""Створення всіх streams"""
|
||||
nc = NATS()
|
||||
try:
|
||||
await nc.connect(nats_url)
|
||||
js = nc.jetstream()
|
||||
|
||||
print("✅ Підключено до NATS")
|
||||
|
||||
streams = [
|
||||
{
|
||||
"name": "MM_ONLINE",
|
||||
"subjects": ["mm.embed.online", "mm.retrieve.online", "mm.summarize.online"],
|
||||
"retention": api.RetentionPolicy.LIMITS,
|
||||
"max_age": 1800000000000, # 30 хв
|
||||
"max_deliver": 3,
|
||||
"storage": api.StorageType.FILE,
|
||||
"replicas": 1,
|
||||
"discard": api.DiscardPolicy.OLD,
|
||||
"duplicate_window": 300000000000 # 5 хв
|
||||
},
|
||||
{
|
||||
"name": "MM_OFFLINE",
|
||||
"subjects": ["mm.embed.offline", "mm.index.offline", "mm.backfill.offline"],
|
||||
"retention": api.RetentionPolicy.LIMITS,
|
||||
"max_age": 604800000000000, # 7 днів
|
||||
"max_deliver": 10,
|
||||
"storage": api.StorageType.FILE,
|
||||
"replicas": 1,
|
||||
"discard": api.DiscardPolicy.OLD
|
||||
},
|
||||
{
|
||||
"name": "MM_WRITE",
|
||||
"subjects": ["mm.qdrant.upsert", "mm.pg.write", "mm.neo4j.write"],
|
||||
"retention": api.RetentionPolicy.LIMITS,
|
||||
"max_age": 604800000000000, # 7 днів
|
||||
"max_deliver": 10,
|
||||
"storage": api.StorageType.FILE,
|
||||
"replicas": 1,
|
||||
"discard": api.DiscardPolicy.OLD
|
||||
},
|
||||
{
|
||||
"name": "MM_EVENTS",
|
||||
"subjects": ["mm.event.audit", "mm.event.status"],
|
||||
"retention": api.RetentionPolicy.LIMITS,
|
||||
"max_age": 2592000000000000, # 30 днів
|
||||
"storage": api.StorageType.FILE,
|
||||
"replicas": 1,
|
||||
"discard": api.DiscardPolicy.OLD
|
||||
}
|
||||
]
|
||||
|
||||
for stream_config in streams:
|
||||
name = stream_config.pop("name")
|
||||
try:
|
||||
await js.add_stream(name=name, **stream_config)
|
||||
print(f"✅ Stream '{name}' створено")
|
||||
except Exception as e:
|
||||
if "stream name already in use" in str(e).lower():
|
||||
print(f"⚠️ Stream '{name}' вже існує")
|
||||
else:
|
||||
print(f"❌ Помилка створення stream '{name}': {e}")
|
||||
|
||||
# Перевірка streams
|
||||
print("\n📋 Створені streams:")
|
||||
for stream_name in await js.stream_names():
|
||||
info = await js.stream_info(stream_name)
|
||||
print(f" - {stream_name}: {len(info.config.subjects)} subjects")
|
||||
|
||||
await nc.close()
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Помилка: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
nats_url = sys.argv[1] if len(sys.argv) > 1 else "nats://nats-client.nats:4222"
|
||||
asyncio.run(create_streams(nats_url))
|
||||
Reference in New Issue
Block a user