Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.
Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles
Excluded from snapshot: venv/, .env, data/, backups, .tgz archives
Co-authored-by: Cursor <cursoragent@cursor.com>
401 lines
14 KiB
Python
"""
|
||
Outbox Pattern Implementation
|
||
=============================
|
||
Забезпечує надійну публікацію подій через Postgres + NATS.
|
||
|
||
Принцип:
|
||
1. Записуємо подію в outbox таблицю (в тій же транзакції що і дані)
|
||
2. Publisher воркер забирає pending події
|
||
3. Публікуємо в NATS JetStream
|
||
4. Помічаємо як published
|
||
|
||
Переваги:
|
||
- Атомарність: дані + подія в одній транзакції
|
||
- Надійність: якщо NATS недоступний, події залишаються в outbox
|
||
- Ідемпотентність: подія публікується тільки один раз
|
||
"""
|
||
|
||
import json
|
||
import asyncio
|
||
import logging
|
||
from datetime import datetime
|
||
from typing import Dict, Any, Optional, List
|
||
from dataclasses import dataclass, field
|
||
from enum import Enum
|
||
import uuid
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class OutboxStatus(str, Enum):
    """Lifecycle states of an outbox row (the `status` column).

    Values must stay in sync with the `outbox_status_check` CHECK
    constraint in OUTBOX_TABLE_SQL.
    """

    PENDING = "pending"      # written, not yet delivered to NATS
    PUBLISHED = "published"  # successfully published to JetStream
    FAILED = "failed"        # exceeded max retries; needs manual attention
|
||
|
||
@dataclass
class OutboxEvent:
    """A single event row in the outbox table.

    Mirrors the columns of the ``outbox`` table and carries everything
    needed to publish the event to NATS JetStream.
    """

    id: str
    aggregate_type: str  # e.g. "message", "attachment", "memory"
    aggregate_id: str    # ID of the aggregate the event belongs to
    event_type: str      # e.g. "created", "updated", "deleted"
    payload: Dict[str, Any]
    # NOTE(review): naive UTC timestamp (datetime.utcnow) is kept deliberately;
    # switching to an aware datetime would change isoformat() output and what
    # gets stored/published.
    created_at: datetime = field(default_factory=datetime.utcnow)
    published_at: Optional[datetime] = None
    status: OutboxStatus = OutboxStatus.PENDING
    retry_count: int = 0
    last_error: Optional[str] = None

    def to_nats_subject(self) -> str:
        """Build the NATS subject for this event.

        Format: ``{aggregate_type}.{event_type}``. The aggregate_id is NOT
        part of the subject — it travels in the message body. (The previous
        comment claiming a three-part subject was wrong; this matches the
        subject built by OutboxPublisher._publish_event.)
        """
        return f"{self.aggregate_type}.{self.event_type}"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict (retry/error fields omitted)."""
        return {
            "id": self.id,
            "aggregate_type": self.aggregate_type,
            "aggregate_id": self.aggregate_id,
            "event_type": self.event_type,
            "payload": self.payload,
            "created_at": self.created_at.isoformat(),
            "status": self.status.value,
        }
|
||
|
||
|
||
# DDL for the outbox table (idempotent: IF NOT EXISTS throughout).
# Executed verbatim by OutboxWriter.init_table(). Keep the status CHECK
# constraint in sync with the OutboxStatus enum values.
OUTBOX_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    published_at TIMESTAMPTZ,
    status VARCHAR(20) DEFAULT 'pending',
    retry_count INTEGER DEFAULT 0,
    last_error TEXT,

    -- Indexes for efficient polling
    CONSTRAINT outbox_status_check CHECK (status IN ('pending', 'published', 'failed'))
);

-- Index for efficient pending event polling
CREATE INDEX IF NOT EXISTS idx_outbox_pending
ON outbox(status, created_at)
WHERE status = 'pending';

-- Index for cleanup of old published events
CREATE INDEX IF NOT EXISTS idx_outbox_published
ON outbox(published_at)
WHERE status = 'published';
"""
|
||
|
||
|
||
class OutboxWriter:
    """
    Writes events into the outbox table.

    Used by the Memory Service to record events in the same database
    transaction as the data they describe.
    """

    def __init__(self, db_pool):
        """
        Args:
            db_pool: asyncpg connection pool
        """
        self.db_pool = db_pool

    async def init_table(self):
        """Create the outbox table (and its indexes) if they do not exist."""
        async with self.db_pool.acquire() as conn:
            await conn.execute(OUTBOX_TABLE_SQL)
        logger.info("Outbox table initialized")

    async def write(self,
                    aggregate_type: str,
                    aggregate_id: str,
                    event_type: str,
                    payload: Dict[str, Any],
                    conn=None) -> str:
        """
        Write an event into the outbox.

        IMPORTANT: call this inside the same transaction as the data change
        it describes, by passing that transaction's connection via ``conn``.

        Args:
            aggregate_type: Aggregate type (message, attachment, memory)
            aggregate_id: Aggregate ID
            event_type: Event type (created, updated, deleted)
            payload: Event data; must be JSON-serializable
            conn: Optional existing connection to join a transaction

        Returns:
            The generated event ID (UUID string)
        """
        event_id = str(uuid.uuid4())

        sql = """
            INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload)
            VALUES ($1, $2, $3, $4, $5)
            RETURNING id
        """
        args = (event_id, aggregate_type, aggregate_id, event_type,
                json.dumps(payload))

        # BUGFIX: compare with `is not None` instead of truthiness — connection
        # objects may define custom __bool__, and a valid-but-falsy conn must
        # still be used (otherwise the event would escape the transaction).
        if conn is not None:
            # Reuse the caller's connection (joins its transaction).
            await conn.execute(sql, *args)
        else:
            # No transaction to join — use a pooled connection.
            async with self.db_pool.acquire() as pooled_conn:
                await pooled_conn.execute(sql, *args)

        logger.debug(f"Outbox event written: {aggregate_type}.{event_type} [{event_id}]")
        return event_id
|
||
|
||
|
||
class OutboxPublisher:
    """
    Publishes pending outbox events to NATS JetStream.

    Runs as a background worker: polls the outbox table, publishes each
    pending event, then marks it published (or schedules a retry).
    """

    def __init__(self, db_pool, nats_client,
                 batch_size: int = 100,
                 poll_interval: float = 1.0,
                 max_retries: int = 5):
        """
        Args:
            db_pool: asyncpg connection pool
            nats_client: connected NATS client (JetStream-capable)
            batch_size: number of events fetched per poll
            poll_interval: polling interval in seconds
            max_retries: publish attempts before an event is marked failed
        """
        self.db_pool = db_pool
        self.nats = nats_client
        self.js = None  # JetStream context, created in start()
        self.batch_size = batch_size
        self.poll_interval = poll_interval
        self.max_retries = max_retries
        self._running = False

    async def start(self):
        """Run the publisher loop until stop() is called."""
        self._running = True
        self.js = self.nats.jetstream()
        logger.info("Outbox publisher started")

        while self._running:
            try:
                await self._process_batch()
            except Exception as e:
                # Keep the worker alive: one bad batch must not kill the loop.
                logger.error(f"Outbox publisher error: {e}")

            await asyncio.sleep(self.poll_interval)

    async def stop(self):
        """Signal the publisher loop to exit after the current iteration."""
        self._running = False
        logger.info("Outbox publisher stopped")

    async def _process_batch(self):
        """Fetch and publish one batch of pending events."""
        async with self.db_pool.acquire() as conn:
            # BUGFIX: SELECT ... FOR UPDATE SKIP LOCKED must run inside an
            # explicit transaction. Without one, asyncpg autocommits each
            # statement, the row locks are released as soon as the SELECT
            # returns, and a concurrent publisher can pick up (and
            # double-publish) the same events.
            async with conn.transaction():
                events = await conn.fetch("""
                    SELECT id, aggregate_type, aggregate_id, event_type,
                           payload, created_at, retry_count
                    FROM outbox
                    WHERE status = 'pending'
                    ORDER BY created_at
                    LIMIT $1
                    FOR UPDATE SKIP LOCKED
                """, self.batch_size)

                if not events:
                    return

                logger.debug(f"Processing {len(events)} outbox events")

                for event in events:
                    await self._publish_event(conn, event)

    async def _publish_event(self, conn, event):
        """Publish a single event to NATS and update its outbox row.

        On publish failure the retry counter is bumped; after max_retries
        the row is marked 'failed' with the last error.
        """
        event_id = str(event['id'])
        # Subject format: {aggregate_type}.{event_type}
        subject = f"{event['aggregate_type']}.{event['event_type']}"

        try:
            # Envelope sent to subscribers. Payload may arrive from the DB
            # either as a JSON string or already decoded.
            message = {
                "event_id": event_id,
                "aggregate_type": event['aggregate_type'],
                "aggregate_id": event['aggregate_id'],
                "event_type": event['event_type'],
                "payload": json.loads(event['payload']) if isinstance(event['payload'], str) else event['payload'],
                "timestamp": event['created_at'].isoformat(),
            }

            # Nats-Msg-Id lets JetStream deduplicate redeliveries, so an
            # event published twice (e.g. after a crash between publish and
            # the UPDATE below) is still delivered at most once per window.
            ack = await self.js.publish(
                subject,
                json.dumps(message).encode(),
                headers={"Nats-Msg-Id": event_id}
            )

            # Mark as published only after a successful JetStream ack.
            await conn.execute("""
                UPDATE outbox
                SET status = 'published', published_at = NOW()
                WHERE id = $1
            """, event['id'])

            logger.debug(f"Published: {subject} [{event_id}] -> seq={ack.seq}")

        except Exception as e:
            retry_count = event['retry_count'] + 1

            if retry_count >= self.max_retries:
                # Give up: record the last error for manual inspection.
                await conn.execute("""
                    UPDATE outbox
                    SET status = 'failed', retry_count = $2, last_error = $3
                    WHERE id = $1
                """, event['id'], retry_count, str(e))
                logger.error(f"Outbox event failed permanently: {event_id} - {e}")
            else:
                # Status stays 'pending' so the next poll retries it.
                await conn.execute("""
                    UPDATE outbox
                    SET retry_count = $2, last_error = $3
                    WHERE id = $1
                """, event['id'], retry_count, str(e))
                logger.warning(f"Outbox event retry {retry_count}: {event_id} - {e}")
|
||
|
||
|
||
class OutboxCleaner:
    """
    Deletes old published outbox events.

    Intended to run periodically (e.g. from a scheduled task).
    """

    def __init__(self, db_pool, retention_days: int = 7):
        """
        Args:
            db_pool: asyncpg connection pool
            retention_days: how long published events are retained
        """
        self.db_pool = db_pool
        self.retention_days = retention_days

    async def cleanup(self) -> int:
        """
        Delete published events older than the retention window.

        Returns:
            Number of deleted rows.
        """
        async with self.db_pool.acquire() as conn:
            # BUGFIX: bind the retention interval as a query parameter instead
            # of %-formatting it into the SQL text — no string-built SQL.
            result = await conn.execute("""
                DELETE FROM outbox
                WHERE status = 'published'
                  AND published_at < NOW() - ($1 * INTERVAL '1 day')
            """, self.retention_days)

        # asyncpg returns a command status tag like "DELETE 42".
        count = int(result.split()[-1]) if result else 0
        if count > 0:
            logger.info(f"Cleaned {count} old outbox events")
        return count
|
||
|
||
|
||
# ==================== INTEGRATION HELPERS ====================
|
||
|
||
async def create_outbox_infrastructure(db_pool, nats_url: str = "nats://nats:4222"):
    """
    Create the full outbox infrastructure: table, writer and publisher.

    Usage:
        writer, publisher, nc = await create_outbox_infrastructure(db_pool)

        # In application code:
        await writer.write("message", "123", "created", {"text": "Hello"})

        # Run the publisher as a background task:
        asyncio.create_task(publisher.start())

    Args:
        db_pool: asyncpg connection pool
        nats_url: NATS server URL

    Returns:
        Tuple (OutboxWriter, OutboxPublisher, NATS client connection).
        The caller owns the NATS connection — close it on shutdown.
    """
    # Imported lazily so the module can be used without the nats package
    # when only the writer side is needed.
    import nats

    # Initialize table
    writer = OutboxWriter(db_pool)
    await writer.init_table()

    # Connect to NATS
    nc = await nats.connect(nats_url)

    # Create publisher
    publisher = OutboxPublisher(db_pool, nc)

    return writer, publisher, nc
|
||
|
||
|
||
# ==================== MEMORY SERVICE INTEGRATION ====================
|
||
|
||
class MemoryOutboxMixin:
    """
    Mixin adding outbox event publication to the Memory Service.

    Host class must provide: ``db_pool`` (asyncpg pool), ``outbox_writer``
    (OutboxWriter), and the ``_store_fact`` / ``_store_vector`` coroutines.
    """

    async def store_fact_with_event(self,
                                    user_id: str,
                                    fact: str,
                                    metadata: Optional[Dict[str, Any]] = None) -> str:
        """
        Store a fact and record a "memory"/"fact.created" outbox event.

        Fact and event are written atomically in one database transaction.

        Args:
            user_id: owner of the fact
            fact: fact text
            metadata: optional extra data stored alongside the fact

        Returns:
            ID of the stored fact.
        """
        async with self.db_pool.acquire() as conn:
            async with conn.transaction():
                # Store the fact itself.
                fact_id = await self._store_fact(conn, user_id, fact, metadata)

                # Record the event on the same connection so it commits (or
                # rolls back) together with the fact — the outbox guarantee.
                await self.outbox_writer.write(
                    aggregate_type="memory",
                    aggregate_id=fact_id,
                    event_type="fact.created",
                    payload={
                        "fact_id": fact_id,
                        "user_id": user_id,
                        "fact_preview": fact[:100],  # preview only, not full text
                        "metadata": metadata,
                    },
                    conn=conn
                )

                return fact_id

    async def store_vector_with_event(self,
                                      collection: str,
                                      vector_id: str,
                                      vector: List[float],
                                      payload: Dict[str, Any]) -> str:
        """
        Store a vector and record a "memory"/"vector.indexed" outbox event.

        NOTE(review): unlike facts, this is NOT atomic — the vector store and
        Postgres cannot share a transaction, so a crash between the two awaits
        can leave an indexed vector without a corresponding event.

        Args:
            collection: vector collection name
            vector_id: ID under which the vector is stored
            vector: the embedding values
            payload: metadata stored with the vector

        Returns:
            The vector ID.
        """
        # Store the vector in the vector database.
        await self._store_vector(collection, vector_id, vector, payload)

        # Record the event (own pooled connection, outside any transaction).
        await self.outbox_writer.write(
            aggregate_type="memory",
            aggregate_id=vector_id,
            event_type="vector.indexed",
            payload={
                "vector_id": vector_id,
                "collection": collection,
                # Only key names: vector payloads may be large or sensitive.
                "payload_keys": list(payload.keys()),
            }
        )

        return vector_id
|