Files
microdao-daarion/services/memory-service/outbox.py
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

401 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Outbox Pattern Implementation
=============================
Забезпечує надійну публікацію подій через Postgres + NATS.
Принцип:
1. Записуємо подію в outbox таблицю (в тій же транзакції що і дані)
2. Publisher воркер забирає pending події
3. Публікуємо в NATS JetStream
4. Помічаємо як published
Переваги:
- Атомарність: дані + подія в одній транзакції
- Надійність: якщо NATS недоступний, події залишаються в outbox
- Ідемпотентність: подія публікується тільки один раз
"""
import json
import asyncio
import logging
from datetime import datetime
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from enum import Enum
import uuid
logger = logging.getLogger(__name__)
class OutboxStatus(str, Enum):
PENDING = "pending"
PUBLISHED = "published"
FAILED = "failed"
@dataclass
class OutboxEvent:
"""Подія в outbox"""
id: str
aggregate_type: str # "message", "attachment", "memory", etc.
aggregate_id: str # ID об'єкту
event_type: str # "created", "updated", "deleted"
payload: Dict[str, Any]
created_at: datetime = field(default_factory=datetime.utcnow)
published_at: Optional[datetime] = None
status: OutboxStatus = OutboxStatus.PENDING
retry_count: int = 0
last_error: Optional[str] = None
def to_nats_subject(self) -> str:
"""Генерує NATS subject для події"""
# Format: {aggregate_type}.{event_type}.{aggregate_id}
return f"{self.aggregate_type}.{self.event_type}"
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"aggregate_type": self.aggregate_type,
"aggregate_id": self.aggregate_id,
"event_type": self.event_type,
"payload": self.payload,
"created_at": self.created_at.isoformat(),
"status": self.status.value,
}
# SQL для створення outbox таблиці
OUTBOX_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
published_at TIMESTAMPTZ,
status VARCHAR(20) DEFAULT 'pending',
retry_count INTEGER DEFAULT 0,
last_error TEXT,
-- Indexes for efficient polling
CONSTRAINT outbox_status_check CHECK (status IN ('pending', 'published', 'failed'))
);
-- Index for efficient pending event polling
CREATE INDEX IF NOT EXISTS idx_outbox_pending
ON outbox(status, created_at)
WHERE status = 'pending';
-- Index for cleanup of old published events
CREATE INDEX IF NOT EXISTS idx_outbox_published
ON outbox(published_at)
WHERE status = 'published';
"""
class OutboxWriter:
"""
Записує події в outbox таблицю.
Використовується в Memory Service для запису подій.
"""
def __init__(self, db_pool):
"""
Args:
db_pool: asyncpg connection pool
"""
self.db_pool = db_pool
async def init_table(self):
"""Створює outbox таблицю якщо не існує"""
async with self.db_pool.acquire() as conn:
await conn.execute(OUTBOX_TABLE_SQL)
logger.info("Outbox table initialized")
async def write(self,
aggregate_type: str,
aggregate_id: str,
event_type: str,
payload: Dict[str, Any],
conn=None) -> str:
"""
Записує подію в outbox.
ВАЖЛИВО: Викликати в тій же транзакції, що і основні дані!
Args:
aggregate_type: Тип агрегату (message, attachment, memory)
aggregate_id: ID агрегату
event_type: Тип події (created, updated, deleted)
payload: Дані події
conn: Опціонально - існуюче з'єднання для транзакції
Returns:
ID події
"""
event_id = str(uuid.uuid4())
sql = """
INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4, $5)
RETURNING id
"""
if conn:
# Використовуємо існуюче з'єднання (для транзакції)
await conn.execute(sql, event_id, aggregate_type, aggregate_id,
event_type, json.dumps(payload))
else:
# Створюємо нове з'єднання
async with self.db_pool.acquire() as conn:
await conn.execute(sql, event_id, aggregate_type, aggregate_id,
event_type, json.dumps(payload))
logger.debug(f"Outbox event written: {aggregate_type}.{event_type} [{event_id}]")
return event_id
class OutboxPublisher:
"""
Публікує події з outbox в NATS.
Запускається як background worker.
"""
def __init__(self, db_pool, nats_client,
batch_size: int = 100,
poll_interval: float = 1.0,
max_retries: int = 5):
"""
Args:
db_pool: asyncpg connection pool
nats_client: NATS client (з JetStream)
batch_size: Кількість подій за раз
poll_interval: Інтервал опитування (секунди)
max_retries: Максимум спроб публікації
"""
self.db_pool = db_pool
self.nats = nats_client
self.js = None # JetStream context
self.batch_size = batch_size
self.poll_interval = poll_interval
self.max_retries = max_retries
self._running = False
async def start(self):
"""Запускає publisher worker"""
self._running = True
self.js = self.nats.jetstream()
logger.info("Outbox publisher started")
while self._running:
try:
await self._process_batch()
except Exception as e:
logger.error(f"Outbox publisher error: {e}")
await asyncio.sleep(self.poll_interval)
async def stop(self):
"""Зупиняє publisher worker"""
self._running = False
logger.info("Outbox publisher stopped")
async def _process_batch(self):
"""Обробляє batch pending подій"""
async with self.db_pool.acquire() as conn:
# Fetch pending events
events = await conn.fetch("""
SELECT id, aggregate_type, aggregate_id, event_type,
payload, created_at, retry_count
FROM outbox
WHERE status = 'pending'
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED
""", self.batch_size)
if not events:
return
logger.debug(f"Processing {len(events)} outbox events")
for event in events:
await self._publish_event(conn, event)
async def _publish_event(self, conn, event):
"""Публікує одну подію в NATS"""
event_id = str(event['id'])
subject = f"{event['aggregate_type']}.{event['event_type']}"
try:
# Prepare message
message = {
"event_id": event_id,
"aggregate_type": event['aggregate_type'],
"aggregate_id": event['aggregate_id'],
"event_type": event['event_type'],
"payload": json.loads(event['payload']) if isinstance(event['payload'], str) else event['payload'],
"timestamp": event['created_at'].isoformat(),
}
# Publish to JetStream
ack = await self.js.publish(
subject,
json.dumps(message).encode(),
headers={"Nats-Msg-Id": event_id} # For idempotency
)
# Mark as published
await conn.execute("""
UPDATE outbox
SET status = 'published', published_at = NOW()
WHERE id = $1
""", event['id'])
logger.debug(f"Published: {subject} [{event_id}] -> seq={ack.seq}")
except Exception as e:
retry_count = event['retry_count'] + 1
if retry_count >= self.max_retries:
# Mark as failed
await conn.execute("""
UPDATE outbox
SET status = 'failed', retry_count = $2, last_error = $3
WHERE id = $1
""", event['id'], retry_count, str(e))
logger.error(f"Outbox event failed permanently: {event_id} - {e}")
else:
# Increment retry count
await conn.execute("""
UPDATE outbox
SET retry_count = $2, last_error = $3
WHERE id = $1
""", event['id'], retry_count, str(e))
logger.warning(f"Outbox event retry {retry_count}: {event_id} - {e}")
class OutboxCleaner:
"""
Очищає старі опубліковані події.
Запускається періодично.
"""
def __init__(self, db_pool, retention_days: int = 7):
self.db_pool = db_pool
self.retention_days = retention_days
async def cleanup(self) -> int:
"""
Видаляє старі опубліковані події.
Returns:
Кількість видалених подій
"""
async with self.db_pool.acquire() as conn:
result = await conn.execute("""
DELETE FROM outbox
WHERE status = 'published'
AND published_at < NOW() - INTERVAL '%s days'
""" % self.retention_days)
count = int(result.split()[-1]) if result else 0
if count > 0:
logger.info(f"Cleaned {count} old outbox events")
return count
# ==================== INTEGRATION HELPERS ====================
async def create_outbox_infrastructure(db_pool, nats_url: str = "nats://nats:4222"):
"""
Створює всю інфраструктуру outbox.
Usage:
writer, publisher = await create_outbox_infrastructure(db_pool)
# В основному коді:
await writer.write("message", "123", "created", {"text": "Hello"})
# Запуск publisher як background task:
asyncio.create_task(publisher.start())
"""
import nats
# Initialize table
writer = OutboxWriter(db_pool)
await writer.init_table()
# Connect to NATS
nc = await nats.connect(nats_url)
# Create publisher
publisher = OutboxPublisher(db_pool, nc)
return writer, publisher, nc
# ==================== MEMORY SERVICE INTEGRATION ====================
class MemoryOutboxMixin:
"""
Mixin для Memory Service з outbox підтримкою.
Додає автоматичну публікацію подій при операціях з пам'яттю.
"""
async def store_fact_with_event(self,
user_id: str,
fact: str,
metadata: Dict[str, Any] = None) -> str:
"""
Зберігає факт і публікує подію.
"""
async with self.db_pool.acquire() as conn:
async with conn.transaction():
# Store fact
fact_id = await self._store_fact(conn, user_id, fact, metadata)
# Write outbox event
await self.outbox_writer.write(
aggregate_type="memory",
aggregate_id=fact_id,
event_type="fact.created",
payload={
"fact_id": fact_id,
"user_id": user_id,
"fact_preview": fact[:100], # Preview only
"metadata": metadata,
},
conn=conn
)
return fact_id
async def store_vector_with_event(self,
collection: str,
vector_id: str,
vector: List[float],
payload: Dict[str, Any]) -> str:
"""
Зберігає вектор і публікує подію.
"""
# Store vector in Qdrant
await self._store_vector(collection, vector_id, vector, payload)
# Write outbox event
await self.outbox_writer.write(
aggregate_type="memory",
aggregate_id=vector_id,
event_type="vector.indexed",
payload={
"vector_id": vector_id,
"collection": collection,
"payload_keys": list(payload.keys()),
}
)
return vector_id