""" Outbox Pattern Implementation ============================= Забезпечує надійну публікацію подій через Postgres + NATS. Принцип: 1. Записуємо подію в outbox таблицю (в тій же транзакції що і дані) 2. Publisher воркер забирає pending події 3. Публікуємо в NATS JetStream 4. Помічаємо як published Переваги: - Атомарність: дані + подія в одній транзакції - Надійність: якщо NATS недоступний, події залишаються в outbox - Ідемпотентність: подія публікується тільки один раз """ import json import asyncio import logging from datetime import datetime from typing import Dict, Any, Optional, List from dataclasses import dataclass, field from enum import Enum import uuid logger = logging.getLogger(__name__) class OutboxStatus(str, Enum): PENDING = "pending" PUBLISHED = "published" FAILED = "failed" @dataclass class OutboxEvent: """Подія в outbox""" id: str aggregate_type: str # "message", "attachment", "memory", etc. aggregate_id: str # ID об'єкту event_type: str # "created", "updated", "deleted" payload: Dict[str, Any] created_at: datetime = field(default_factory=datetime.utcnow) published_at: Optional[datetime] = None status: OutboxStatus = OutboxStatus.PENDING retry_count: int = 0 last_error: Optional[str] = None def to_nats_subject(self) -> str: """Генерує NATS subject для події""" # Format: {aggregate_type}.{event_type}.{aggregate_id} return f"{self.aggregate_type}.{self.event_type}" def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "aggregate_type": self.aggregate_type, "aggregate_id": self.aggregate_id, "event_type": self.event_type, "payload": self.payload, "created_at": self.created_at.isoformat(), "status": self.status.value, } # SQL для створення outbox таблиці OUTBOX_TABLE_SQL = """ CREATE TABLE IF NOT EXISTS outbox ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), aggregate_type VARCHAR(100) NOT NULL, aggregate_id VARCHAR(255) NOT NULL, event_type VARCHAR(100) NOT NULL, payload JSONB NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW(), published_at TIMESTAMPTZ, status VARCHAR(20) DEFAULT 'pending', retry_count INTEGER DEFAULT 0, last_error TEXT, -- Indexes for efficient polling CONSTRAINT outbox_status_check CHECK (status IN ('pending', 'published', 'failed')) ); -- Index for efficient pending event polling CREATE INDEX IF NOT EXISTS idx_outbox_pending ON outbox(status, created_at) WHERE status = 'pending'; -- Index for cleanup of old published events CREATE INDEX IF NOT EXISTS idx_outbox_published ON outbox(published_at) WHERE status = 'published'; """ class OutboxWriter: """ Записує події в outbox таблицю. Використовується в Memory Service для запису подій. """ def __init__(self, db_pool): """ Args: db_pool: asyncpg connection pool """ self.db_pool = db_pool async def init_table(self): """Створює outbox таблицю якщо не існує""" async with self.db_pool.acquire() as conn: await conn.execute(OUTBOX_TABLE_SQL) logger.info("Outbox table initialized") async def write(self, aggregate_type: str, aggregate_id: str, event_type: str, payload: Dict[str, Any], conn=None) -> str: """ Записує подію в outbox. ВАЖЛИВО: Викликати в тій же транзакції, що і основні дані! Args: aggregate_type: Тип агрегату (message, attachment, memory) aggregate_id: ID агрегату event_type: Тип події (created, updated, deleted) payload: Дані події conn: Опціонально - існуюче з'єднання для транзакції Returns: ID події """ event_id = str(uuid.uuid4()) sql = """ INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload) VALUES ($1, $2, $3, $4, $5) RETURNING id """ if conn: # Використовуємо існуюче з'єднання (для транзакції) await conn.execute(sql, event_id, aggregate_type, aggregate_id, event_type, json.dumps(payload)) else: # Створюємо нове з'єднання async with self.db_pool.acquire() as conn: await conn.execute(sql, event_id, aggregate_type, aggregate_id, event_type, json.dumps(payload)) logger.debug(f"Outbox event written: {aggregate_type}.{event_type} [{event_id}]") return event_id class OutboxPublisher: """ Публікує події з outbox в NATS. Запускається як background worker. """ def __init__(self, db_pool, nats_client, batch_size: int = 100, poll_interval: float = 1.0, max_retries: int = 5): """ Args: db_pool: asyncpg connection pool nats_client: NATS client (з JetStream) batch_size: Кількість подій за раз poll_interval: Інтервал опитування (секунди) max_retries: Максимум спроб публікації """ self.db_pool = db_pool self.nats = nats_client self.js = None # JetStream context self.batch_size = batch_size self.poll_interval = poll_interval self.max_retries = max_retries self._running = False async def start(self): """Запускає publisher worker""" self._running = True self.js = self.nats.jetstream() logger.info("Outbox publisher started") while self._running: try: await self._process_batch() except Exception as e: logger.error(f"Outbox publisher error: {e}") await asyncio.sleep(self.poll_interval) async def stop(self): """Зупиняє publisher worker""" self._running = False logger.info("Outbox publisher stopped") async def _process_batch(self): """Обробляє batch pending подій""" async with self.db_pool.acquire() as conn: # Fetch pending events events = await conn.fetch(""" SELECT id, aggregate_type, aggregate_id, event_type, payload, created_at, retry_count FROM outbox WHERE status = 'pending' ORDER BY created_at LIMIT $1 FOR UPDATE SKIP LOCKED """, self.batch_size) if not events: return logger.debug(f"Processing {len(events)} outbox events") for event in events: await self._publish_event(conn, event) async def _publish_event(self, conn, event): """Публікує одну подію в NATS""" event_id = str(event['id']) subject = f"{event['aggregate_type']}.{event['event_type']}" try: # Prepare message message = { "event_id": event_id, "aggregate_type": event['aggregate_type'], "aggregate_id": event['aggregate_id'], "event_type": event['event_type'], "payload": json.loads(event['payload']) if isinstance(event['payload'], str) else event['payload'], "timestamp": event['created_at'].isoformat(), } # Publish to JetStream ack = await self.js.publish( subject, json.dumps(message).encode(), headers={"Nats-Msg-Id": event_id} # For idempotency ) # Mark as published await conn.execute(""" UPDATE outbox SET status = 'published', published_at = NOW() WHERE id = $1 """, event['id']) logger.debug(f"Published: {subject} [{event_id}] -> seq={ack.seq}") except Exception as e: retry_count = event['retry_count'] + 1 if retry_count >= self.max_retries: # Mark as failed await conn.execute(""" UPDATE outbox SET status = 'failed', retry_count = $2, last_error = $3 WHERE id = $1 """, event['id'], retry_count, str(e)) logger.error(f"Outbox event failed permanently: {event_id} - {e}") else: # Increment retry count await conn.execute(""" UPDATE outbox SET retry_count = $2, last_error = $3 WHERE id = $1 """, event['id'], retry_count, str(e)) logger.warning(f"Outbox event retry {retry_count}: {event_id} - {e}") class OutboxCleaner: """ Очищає старі опубліковані події. Запускається періодично. """ def __init__(self, db_pool, retention_days: int = 7): self.db_pool = db_pool self.retention_days = retention_days async def cleanup(self) -> int: """ Видаляє старі опубліковані події. Returns: Кількість видалених подій """ async with self.db_pool.acquire() as conn: result = await conn.execute(""" DELETE FROM outbox WHERE status = 'published' AND published_at < NOW() - INTERVAL '%s days' """ % self.retention_days) count = int(result.split()[-1]) if result else 0 if count > 0: logger.info(f"Cleaned {count} old outbox events") return count # ==================== INTEGRATION HELPERS ==================== async def create_outbox_infrastructure(db_pool, nats_url: str = "nats://nats:4222"): """ Створює всю інфраструктуру outbox. Usage: writer, publisher = await create_outbox_infrastructure(db_pool) # В основному коді: await writer.write("message", "123", "created", {"text": "Hello"}) # Запуск publisher як background task: asyncio.create_task(publisher.start()) """ import nats # Initialize table writer = OutboxWriter(db_pool) await writer.init_table() # Connect to NATS nc = await nats.connect(nats_url) # Create publisher publisher = OutboxPublisher(db_pool, nc) return writer, publisher, nc # ==================== MEMORY SERVICE INTEGRATION ==================== class MemoryOutboxMixin: """ Mixin для Memory Service з outbox підтримкою. Додає автоматичну публікацію подій при операціях з пам'яттю. """ async def store_fact_with_event(self, user_id: str, fact: str, metadata: Dict[str, Any] = None) -> str: """ Зберігає факт і публікує подію. """ async with self.db_pool.acquire() as conn: async with conn.transaction(): # Store fact fact_id = await self._store_fact(conn, user_id, fact, metadata) # Write outbox event await self.outbox_writer.write( aggregate_type="memory", aggregate_id=fact_id, event_type="fact.created", payload={ "fact_id": fact_id, "user_id": user_id, "fact_preview": fact[:100], # Preview only "metadata": metadata, }, conn=conn ) return fact_id async def store_vector_with_event(self, collection: str, vector_id: str, vector: List[float], payload: Dict[str, Any]) -> str: """ Зберігає вектор і публікує подію. """ # Store vector in Qdrant await self._store_vector(collection, vector_id, vector, payload) # Write outbox event await self.outbox_writer.write( aggregate_type="memory", aggregate_id=vector_id, event_type="vector.indexed", payload={ "vector_id": vector_id, "collection": collection, "payload_keys": list(payload.keys()), } ) return vector_id