Files
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

385 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Parser Pipeline
===============
Асинхронний пайплайн для обробки документів.
Слухає події з NATS JetStream:
- attachment.created.document -> парсить документ
- attachment.created.image -> описує зображення
- attachment.created.audio -> транскрибує аудіо
Публікує результати:
- attachment.parsed.{type}
"""
import asyncio
import base64
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional

import httpx
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Configuration — all endpoints overridable via environment variables
NATS_URL = os.getenv("NATS_URL", "nats://nats:4222")  # JetStream broker
SWAPPER_URL = os.getenv("SWAPPER_URL", "http://swapper-service:8890")  # document/vision/STT backend
MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000")  # RAG store
COHERE_API_KEY = os.getenv("COHERE_API_KEY", "")  # empty -> RAG indexing is skipped
# Stream configuration: durable consumer on the ATTACHMENTS stream
STREAM_NAME = "ATTACHMENTS"
CONSUMER_NAME = "parser-pipeline"
class ProcessingResult:
    """Outcome of a single attachment-processing step.

    Attributes:
        success: True when parsing/description/transcription succeeded.
        content: Extracted text (empty string on failure).
        metadata: Extra details about the processing run (never None).
        error: Human-readable failure description, or None on success.
    """

    def __init__(self, success: bool, content: str = "",
                 metadata: Optional[Dict] = None, error: Optional[str] = None):
        self.success = success
        self.content = content
        # Guard against the shared-mutable-default pitfall: each instance
        # always owns its own metadata dict.
        self.metadata = metadata if metadata is not None else {}
        self.error = error

    def __repr__(self) -> str:
        status = "ok" if self.success else f"error={self.error!r}"
        return f"ProcessingResult({status}, content_len={len(self.content)})"
class ParserPipeline:
    """Asynchronous attachment processor.

    Subscribes to NATS JetStream and handles incoming files:
    - PDF, DOCX, TXT -> extracted text
    - images         -> description via the Vision endpoint
    - audio          -> transcription via the STT endpoint
    """

    def __init__(self):
        self.nc = None          # NATS connection (set by connect())
        self.js = None          # JetStream context (set by connect())
        self.http_client = httpx.AsyncClient(timeout=120.0)
        self.running = False    # main-loop flag; cleared by stop()
async def connect(self):
"""Підключення до NATS JetStream"""
try:
import nats
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
self.nc = await nats.connect(NATS_URL)
self.js = self.nc.jetstream()
# Ensure stream exists
try:
await self.js.stream_info(STREAM_NAME)
logger.info(f"Connected to stream: {STREAM_NAME}")
except Exception:
# Create stream if not exists
await self.js.add_stream(
name=STREAM_NAME,
subjects=["attachment.>"]
)
logger.info(f"Created stream: {STREAM_NAME}")
# Create durable consumer
try:
consumer_config = ConsumerConfig(
durable_name=CONSUMER_NAME,
deliver_policy=DeliverPolicy.ALL,
ack_policy=AckPolicy.EXPLICIT,
filter_subject="attachment.created.>",
max_deliver=3, # Retry up to 3 times
)
await self.js.add_consumer(STREAM_NAME, consumer_config)
logger.info(f"Consumer created: {CONSUMER_NAME}")
except Exception as e:
logger.info(f"Consumer exists or error: {e}")
return True
except Exception as e:
logger.error(f"NATS connection failed: {e}")
return False
async def start(self):
"""Запуск обробки повідомлень"""
if not await self.connect():
logger.error("Failed to connect to NATS")
return
self.running = True
logger.info("Parser Pipeline started")
# Subscribe to attachment events
sub = await self.js.pull_subscribe(
"attachment.created.>",
CONSUMER_NAME,
stream=STREAM_NAME
)
while self.running:
try:
# Fetch messages (batch of 10, wait 5 seconds)
messages = await sub.fetch(10, timeout=5)
for msg in messages:
try:
await self.process_message(msg)
await msg.ack()
except Exception as e:
logger.error(f"Message processing failed: {e}")
await msg.nak(delay=10) # Retry after 10 seconds
except asyncio.TimeoutError:
# No messages, continue polling
pass
except Exception as e:
logger.error(f"Fetch error: {e}")
await asyncio.sleep(1)
async def stop(self):
"""Зупинка пайплайну"""
self.running = False
if self.nc:
await self.nc.close()
await self.http_client.aclose()
logger.info("Parser Pipeline stopped")
async def process_message(self, msg):
"""Обробляє одне повідомлення"""
try:
data = json.loads(msg.data.decode())
file_type = data.get("file_type", "unknown")
file_id = data.get("file_id")
storage_path = data.get("storage_path")
agent_id = data.get("agent_id", "helion")
user_id = data.get("user_id")
logger.info(f"Processing: {file_id} ({file_type})")
# Process based on type
if file_type == "document":
result = await self.process_document(storage_path, data.get("mime_type"))
elif file_type == "image":
result = await self.process_image(storage_path)
elif file_type == "audio":
result = await self.process_audio(storage_path)
else:
logger.warning(f"Unknown file type: {file_type}")
return
if result.success:
# Index to RAG
await self.index_to_rag(
file_id=file_id,
content=result.content,
metadata=result.metadata,
agent_id=agent_id,
user_id=user_id
)
# Publish parsed event
await self.publish_result(file_id, file_type, result, agent_id)
logger.info(f"✅ Processed: {file_id}")
else:
logger.error(f"❌ Failed: {file_id} - {result.error}")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON: {e}")
async def process_document(self, path: str, mime_type: str) -> ProcessingResult:
"""Парсить документ через Swapper"""
try:
resp = await self.http_client.post(
f"{SWAPPER_URL}/document",
json={"file_path": path, "mime_type": mime_type},
timeout=60.0
)
if resp.status_code == 200:
data = resp.json()
return ProcessingResult(
success=True,
content=data.get("text", ""),
metadata={
"pages": data.get("pages", 1),
"mime_type": mime_type,
"processed_at": datetime.utcnow().isoformat()
}
)
else:
return ProcessingResult(success=False, error=f"API error: {resp.status_code}")
except Exception as e:
return ProcessingResult(success=False, error=str(e))
async def process_image(self, path: str) -> ProcessingResult:
"""Описує зображення через Vision"""
try:
import base64
with open(path, "rb") as f:
image_data = base64.b64encode(f.read()).decode()
resp = await self.http_client.post(
f"{SWAPPER_URL}/vision",
json={
"image_base64": image_data,
"prompt": "Опиши це зображення детально українською. Вкажи всі важливі деталі."
},
timeout=30.0
)
if resp.status_code == 200:
data = resp.json()
description = data.get("description", data.get("text", ""))
return ProcessingResult(
success=True,
content=description,
metadata={
"type": "image_description",
"processed_at": datetime.utcnow().isoformat()
}
)
else:
return ProcessingResult(success=False, error=f"Vision API error: {resp.status_code}")
except Exception as e:
return ProcessingResult(success=False, error=str(e))
async def process_audio(self, path: str) -> ProcessingResult:
"""Транскрибує аудіо через STT"""
try:
import base64
with open(path, "rb") as f:
audio_data = base64.b64encode(f.read()).decode()
resp = await self.http_client.post(
f"{SWAPPER_URL}/stt",
json={
"audio_base64": audio_data,
"language": "uk"
},
timeout=120.0
)
if resp.status_code == 200:
data = resp.json()
return ProcessingResult(
success=True,
content=data.get("text", ""),
metadata={
"language": data.get("language", "uk"),
"duration": data.get("duration"),
"processed_at": datetime.utcnow().isoformat()
}
)
else:
return ProcessingResult(success=False, error=f"STT API error: {resp.status_code}")
except Exception as e:
return ProcessingResult(success=False, error=str(e))
async def index_to_rag(self, file_id: str, content: str, metadata: Dict,
agent_id: str, user_id: str):
"""Індексує контент у RAG (Qdrant через Memory Service)"""
if not content or len(content) < 10:
return
try:
# Generate embedding via Cohere
if COHERE_API_KEY:
import cohere
co = cohere.Client(COHERE_API_KEY)
embed_resp = co.embed(
texts=[content[:8000]], # Limit text length
model="embed-multilingual-v3.0",
input_type="search_document"
)
vector = embed_resp.embeddings[0]
else:
logger.warning("No Cohere API key, skipping embedding")
return
# Store in Memory Service
resp = await self.http_client.post(
f"{MEMORY_SERVICE_URL}/artifacts/store",
json={
"artifact_id": file_id,
"content": content[:10000], # Limit content
"vector": vector,
"agent_id": agent_id,
"user_id": user_id,
"metadata": metadata,
"artifact_type": "document"
},
timeout=30.0
)
if resp.status_code in [200, 201]:
logger.info(f"Indexed to RAG: {file_id}")
else:
logger.warning(f"RAG indexing failed: {resp.status_code}")
except Exception as e:
logger.error(f"RAG indexing error: {e}")
async def publish_result(self, file_id: str, file_type: str,
result: ProcessingResult, agent_id: str):
"""Публікує результат у NATS"""
if not self.js:
return
event = {
"event_id": f"parsed-{file_id}",
"event_type": f"attachment.parsed.{file_type}",
"timestamp": datetime.utcnow().isoformat(),
"file_id": file_id,
"agent_id": agent_id,
"content_length": len(result.content),
"metadata": result.metadata
}
subject = f"attachment.parsed.{file_type}"
await self.js.publish(subject, json.dumps(event).encode())
# ==================== FastAPI App for Health Check ====================
from fastapi import FastAPI

app = FastAPI(title="Parser Pipeline", version="1.0.0")
pipeline = ParserPipeline()


@app.on_event("startup")
async def startup():
    # Keep a reference to the background task: per the asyncio docs, tasks
    # created with create_task() may be garbage-collected mid-flight if the
    # returned Task object is not stored anywhere.
    app.state.pipeline_task = asyncio.create_task(pipeline.start())


@app.on_event("shutdown")
async def shutdown():
    await pipeline.stop()


@app.get("/health")
async def health():
    """Liveness/readiness probe for orchestration."""
    return {
        "status": "healthy" if pipeline.running else "starting",
        "service": "parser-pipeline",
        "nats_connected": pipeline.nc is not None
    }


@app.get("/stats")
async def stats():
    """Report loop state and stream/consumer configuration."""
    return {
        "running": pipeline.running,
        "nats_connected": pipeline.nc is not None,
        "stream": STREAM_NAME,
        "consumer": CONSUMER_NAME
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8101)