Some checks failed
Update Documentation / update-repos-info (push) Has been cancelled
- Capability Registry (Postgres heartbeat) - NATS Client (підписка на streams) - Job Executor (виконання jobs) - Metrics Exporter (Prometheus) - Dockerfile для deployment - Виправлено server_name в NATS (emptyDir) TODO: Реальна реалізація embed/retrieve/summarize, Matrix Gateway, Auth
104 lines
3.7 KiB
Python
104 lines
3.7 KiB
Python
"""
|
|
NATS Client — підключення до NATS JetStream та підписка на streams
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
from typing import Optional, Callable
|
|
from nats.aio.client import Client as NATS
|
|
from nats.js import JetStreamContext
|
|
from nats.js.api import StreamConfig, ConsumerConfig
|
|
|
|
|
|
class NATSClient:
|
|
def __init__(self, nats_url: str):
|
|
self.nats_url = nats_url
|
|
self.nc: Optional[NATS] = None
|
|
self.js: Optional[JetStreamContext] = None
|
|
|
|
async def connect(self):
|
|
"""Підключення до NATS"""
|
|
try:
|
|
self.nc = NATS()
|
|
await self.nc.connect(self.nats_url)
|
|
self.js = self.nc.jetstream()
|
|
print(f"✅ Підключено до NATS: {self.nats_url}")
|
|
except Exception as e:
|
|
print(f"❌ Помилка підключення до NATS: {e}")
|
|
raise
|
|
|
|
async def disconnect(self):
|
|
"""Відключення від NATS"""
|
|
if self.nc:
|
|
await self.nc.close()
|
|
print("✅ Відключено від NATS")
|
|
|
|
async def subscribe_streams(self, job_executor):
|
|
"""Підписка на streams для обробки jobs"""
|
|
if not self.js:
|
|
raise RuntimeError("NATS не підключено")
|
|
|
|
# Підписка на MM_ONLINE (Tier A)
|
|
if job_executor.tier in ["A", "B"]:
|
|
await self._subscribe_consumer(
|
|
stream="MM_ONLINE",
|
|
consumer="online-worker-tier-a",
|
|
job_executor=job_executor
|
|
)
|
|
|
|
# Підписка на MM_OFFLINE (Tier B/C)
|
|
if job_executor.tier in ["B", "C"]:
|
|
await self._subscribe_consumer(
|
|
stream="MM_OFFLINE",
|
|
consumer="offline-worker-tier-b",
|
|
job_executor=job_executor
|
|
)
|
|
|
|
async def _subscribe_consumer(self, stream: str, consumer: str, job_executor):
|
|
"""Підписка на конкретний consumer"""
|
|
try:
|
|
psub = await self.js.pull_subscribe(
|
|
subject=f"{stream}.*",
|
|
durable=consumer,
|
|
stream=stream
|
|
)
|
|
|
|
print(f"✅ Підписано на {stream}/{consumer}")
|
|
|
|
# Обробка messages в окремому task
|
|
asyncio.create_task(self._process_messages(psub, job_executor))
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Помилка підписки на {stream}/{consumer}: {e}")
|
|
|
|
async def _process_messages(self, psub, job_executor):
|
|
"""Обробка messages з consumer"""
|
|
while True:
|
|
try:
|
|
msgs = await psub.fetch(batch=10, timeout=5)
|
|
for msg in msgs:
|
|
await self._handle_message(msg, job_executor)
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
except Exception as e:
|
|
print(f"❌ Помилка обробки messages: {e}")
|
|
await asyncio.sleep(1)
|
|
|
|
async def _handle_message(self, msg, job_executor):
|
|
"""Обробка одного message"""
|
|
try:
|
|
job_data = json.loads(msg.data.decode())
|
|
|
|
# Виконання job
|
|
result = await job_executor.execute_job(job_data)
|
|
|
|
if result["success"]:
|
|
await msg.ack()
|
|
else:
|
|
# NAK з backoff
|
|
await msg.nak(delay=result.get("backoff", 5))
|
|
|
|
except Exception as e:
|
|
print(f"❌ Помилка обробки job: {e}")
|
|
await msg.nak()
|