"""
Parser Pipeline
===============
Asynchronous pipeline for processing attachments.

Listens for events on NATS JetStream:
- attachment.created.document -> parses the document
- attachment.created.image -> describes the image
- attachment.created.audio -> transcribes the audio

Publishes results:
- attachment.parsed.{type}
"""

import asyncio
import base64
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional

import httpx
from fastapi import FastAPI

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Configuration
NATS_URL = os.getenv("NATS_URL", "nats://nats:4222")
SWAPPER_URL = os.getenv("SWAPPER_URL", "http://swapper-service:8890")
MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000")
COHERE_API_KEY = os.getenv("COHERE_API_KEY", "")

# Stream configuration
STREAM_NAME = "ATTACHMENTS"
CONSUMER_NAME = "parser-pipeline"


def _read_file_base64(path: str) -> str:
    """Read the file at *path* and return its content base64-encoded as text."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode()


class ProcessingResult:
    """Outcome of processing a single attachment.

    Attributes:
        success: True when the attachment was processed.
        content: Extracted text / description / transcript.
        metadata: Extra information about the processing run.
        error: Error description when ``success`` is False.
    """

    def __init__(
        self,
        success: bool,
        content: str = "",
        metadata: Optional[Dict] = None,
        error: Optional[str] = None,
    ):
        self.success = success
        self.content = content
        # A fresh dict per instance; never share a mutable default.
        self.metadata = metadata or {}
        self.error = error

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(success={self.success!r}, "
            f"content_length={len(self.content)}, error={self.error!r})"
        )


class ParserPipeline:
    """
    Asynchronous attachment processor.

    Subscribes to NATS JetStream and processes files:
    - documents -> text (via the Swapper /document endpoint)
    - images -> description (via the Swapper /vision endpoint)
    - audio -> transcript (via the Swapper /stt endpoint)
    """

    def __init__(self):
        self.nc = None
        self.js = None
        self.http_client = httpx.AsyncClient(timeout=120.0)
        self.running = False

    async def connect(self):
        """Connect to NATS JetStream and ensure the stream and consumer exist.

        Returns:
            True when the connection was established, False otherwise.
        """
        try:
            import nats
            from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy

            self.nc = await nats.connect(NATS_URL)
            self.js = self.nc.jetstream()

            # Ensure stream exists
            try:
                await self.js.stream_info(STREAM_NAME)
                logger.info(f"Connected to stream: {STREAM_NAME}")
            except Exception:
                # Create stream if not exists
                await self.js.add_stream(
                    name=STREAM_NAME,
                    subjects=["attachment.>"]
                )
                logger.info(f"Created stream: {STREAM_NAME}")

            # Create durable consumer
            try:
                consumer_config = ConsumerConfig(
                    durable_name=CONSUMER_NAME,
                    deliver_policy=DeliverPolicy.ALL,
                    ack_policy=AckPolicy.EXPLICIT,
                    filter_subject="attachment.created.>",
                    max_deliver=3,  # Retry up to 3 times
                )
                await self.js.add_consumer(STREAM_NAME, consumer_config)
                logger.info(f"Consumer created: {CONSUMER_NAME}")
            except Exception as e:
                # Best effort: the consumer may already exist.
                logger.info(f"Consumer exists or error: {e}")

            return True

        except Exception as e:
            logger.error(f"NATS connection failed: {e}")
            return False

    async def start(self):
        """Connect and run the fetch/process loop until ``stop()`` is called."""
        if not await self.connect():
            logger.error("Failed to connect to NATS")
            return

        self.running = True
        logger.info("Parser Pipeline started")

        # Subscribe to attachment events
        sub = await self.js.pull_subscribe(
            "attachment.created.>",
            CONSUMER_NAME,
            stream=STREAM_NAME
        )

        while self.running:
            try:
                # Fetch messages (batch of 10, wait 5 seconds)
                messages = await sub.fetch(10, timeout=5)

                for msg in messages:
                    try:
                        await self.process_message(msg)
                        await msg.ack()
                    except Exception as e:
                        logger.error(f"Message processing failed: {e}")
                        await msg.nak(delay=10)  # Retry after 10 seconds

            except asyncio.TimeoutError:
                # No messages, continue polling
                pass
            except Exception as e:
                logger.error(f"Fetch error: {e}")
                await asyncio.sleep(1)

    async def stop(self):
        """Stop the loop and release the NATS connection and HTTP client."""
        self.running = False
        try:
            if self.nc:
                await self.nc.close()
        finally:
            # Close the HTTP client even if closing NATS raised.
            await self.http_client.aclose()
        logger.info("Parser Pipeline stopped")

    async def process_message(self, msg):
        """Process a single JetStream message.

        Invalid payloads (not UTF-8, not JSON, not a JSON object) and unknown
        file types are logged and dropped: the method returns normally so the
        caller acknowledges the message instead of redelivering something that
        can never succeed.

        Args:
            msg: Message object exposing the raw payload as ``msg.data``.
        """
        try:
            data = json.loads(msg.data.decode())
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error(f"Invalid JSON: {e}")
            return

        if not isinstance(data, dict):
            logger.error(
                f"Invalid payload: expected JSON object, got {type(data).__name__}"
            )
            return

        file_type = data.get("file_type", "unknown")
        file_id = data.get("file_id")
        storage_path = data.get("storage_path")
        agent_id = data.get("agent_id", "helion")
        user_id = data.get("user_id")

        logger.info(f"Processing: {file_id} ({file_type})")

        # Process based on type
        if file_type == "document":
            result = await self.process_document(storage_path, data.get("mime_type"))
        elif file_type == "image":
            result = await self.process_image(storage_path)
        elif file_type == "audio":
            result = await self.process_audio(storage_path)
        else:
            logger.warning(f"Unknown file type: {file_type}")
            return

        if not result.success:
            # NOTE(review): a failed result is only logged; the caller still
            # ACKs the message, so processing failures are never retried.
            # Confirm this is intended given max_deliver=3 on the consumer.
            logger.error(f"❌ Failed: {file_id} - {result.error}")
            return

        # Index to RAG
        await self.index_to_rag(
            file_id=file_id,
            content=result.content,
            metadata=result.metadata,
            agent_id=agent_id,
            user_id=user_id
        )

        # Publish parsed event
        await self.publish_result(file_id, file_type, result, agent_id)
        logger.info(f"✅ Processed: {file_id}")

    async def _load_base64(self, path: str) -> str:
        """Return the file at *path* base64-encoded, read off the event loop."""
        loop = asyncio.get_running_loop()
        # File I/O and encoding are blocking; run them in the default executor
        # so other coroutines keep running.
        return await loop.run_in_executor(None, _read_file_base64, path)

    async def process_document(self, path: str, mime_type: str) -> ProcessingResult:
        """Parse a document through the Swapper ``/document`` endpoint.

        Args:
            path: File path sent to the service as ``file_path``.
            mime_type: MIME type sent to the service and stored in metadata.

        Returns:
            A ProcessingResult; any error is captured in ``error``, never raised.
        """
        try:
            resp = await self.http_client.post(
                f"{SWAPPER_URL}/document",
                json={"file_path": path, "mime_type": mime_type},
                timeout=60.0
            )

            if resp.status_code != 200:
                return ProcessingResult(success=False, error=f"API error: {resp.status_code}")

            data = resp.json()
            return ProcessingResult(
                success=True,
                content=data.get("text", ""),
                metadata={
                    "pages": data.get("pages", 1),
                    "mime_type": mime_type,
                    "processed_at": datetime.utcnow().isoformat()
                }
            )

        except Exception as e:
            return ProcessingResult(success=False, error=str(e))

    async def process_image(self, path: str) -> ProcessingResult:
        """Describe an image through the Swapper ``/vision`` endpoint.

        Args:
            path: Local path of the image file to read and upload as base64.

        Returns:
            A ProcessingResult; any error is captured in ``error``, never raised.
        """
        try:
            image_data = await self._load_base64(path)

            resp = await self.http_client.post(
                f"{SWAPPER_URL}/vision",
                json={
                    "image_base64": image_data,
                    "prompt": "Опиши це зображення детально українською. Вкажи всі важливі деталі."
                },
                timeout=30.0
            )

            if resp.status_code != 200:
                return ProcessingResult(success=False, error=f"Vision API error: {resp.status_code}")

            data = resp.json()
            # Prefer "description"; fall back to "text", then to an empty string.
            description = data.get("description", data.get("text", ""))
            return ProcessingResult(
                success=True,
                content=description,
                metadata={
                    "type": "image_description",
                    "processed_at": datetime.utcnow().isoformat()
                }
            )

        except Exception as e:
            return ProcessingResult(success=False, error=str(e))

    async def process_audio(self, path: str) -> ProcessingResult:
        """Transcribe audio through the Swapper ``/stt`` endpoint.

        Args:
            path: Local path of the audio file to read and upload as base64.

        Returns:
            A ProcessingResult; any error is captured in ``error``, never raised.
        """
        try:
            audio_data = await self._load_base64(path)

            resp = await self.http_client.post(
                f"{SWAPPER_URL}/stt",
                json={
                    "audio_base64": audio_data,
                    "language": "uk"
                },
                timeout=120.0
            )

            if resp.status_code != 200:
                return ProcessingResult(success=False, error=f"STT API error: {resp.status_code}")

            data = resp.json()
            return ProcessingResult(
                success=True,
                content=data.get("text", ""),
                metadata={
                    "language": data.get("language", "uk"),
                    "duration": data.get("duration"),
                    "processed_at": datetime.utcnow().isoformat()
                }
            )

        except Exception as e:
            return ProcessingResult(success=False, error=str(e))

    async def index_to_rag(self, file_id: str, content: str, metadata: Dict,
                           agent_id: str, user_id: str):
        """Embed the content with Cohere and store it via the Memory Service.

        Content shorter than 10 characters is skipped, as is everything when
        no Cohere API key is configured. Errors are logged, never raised.

        Args:
            file_id: Stored as ``artifact_id``.
            content: Text to embed (first 8000 chars) and store (first 10000).
            metadata: Stored alongside the artifact.
            agent_id: Owning agent identifier.
            user_id: Owning user identifier.
        """
        if not content or len(content) < 10:
            return

        try:
            # Generate embedding via Cohere
            if not COHERE_API_KEY:
                logger.warning("No Cohere API key, skipping embedding")
                return

            import cohere
            co = cohere.Client(COHERE_API_KEY)

            def _embed():
                return co.embed(
                    texts=[content[:8000]],  # Limit text length
                    model="embed-multilingual-v3.0",
                    input_type="search_document"
                )

            # The Cohere client call is synchronous; run it in the default
            # executor so it does not block the event loop.
            loop = asyncio.get_running_loop()
            embed_resp = await loop.run_in_executor(None, _embed)
            vector = embed_resp.embeddings[0]

            # Store in Memory Service
            # NOTE(review): artifact_type is "document" for images and audio
            # as well — confirm this is what the Memory Service expects.
            resp = await self.http_client.post(
                f"{MEMORY_SERVICE_URL}/artifacts/store",
                json={
                    "artifact_id": file_id,
                    "content": content[:10000],  # Limit content
                    "vector": vector,
                    "agent_id": agent_id,
                    "user_id": user_id,
                    "metadata": metadata,
                    "artifact_type": "document"
                },
                timeout=30.0
            )

            if resp.status_code in [200, 201]:
                logger.info(f"Indexed to RAG: {file_id}")
            else:
                logger.warning(f"RAG indexing failed: {resp.status_code}")

        except Exception as e:
            logger.error(f"RAG indexing error: {e}")

    async def publish_result(self, file_id: str, file_type: str,
                             result: ProcessingResult, agent_id: str):
        """Publish an ``attachment.parsed.{file_type}`` event to NATS.

        Does nothing when no JetStream context is available.

        Args:
            file_id: Identifier of the processed file.
            file_type: Suffix of the published subject and event type.
            result: Processing result; only its content length and metadata
                are included in the event.
            agent_id: Owning agent identifier.
        """
        if not self.js:
            return

        subject = f"attachment.parsed.{file_type}"
        event = {
            "event_id": f"parsed-{file_id}",
            "event_type": f"attachment.parsed.{file_type}",
            "timestamp": datetime.utcnow().isoformat(),
            "file_id": file_id,
            "agent_id": agent_id,
            "content_length": len(result.content),
            "metadata": result.metadata
        }

        await self.js.publish(subject, json.dumps(event).encode())


# ==================== FastAPI App for Health Check ====================

app = FastAPI(title="Parser Pipeline", version="1.0.0")
pipeline = ParserPipeline()


@app.on_event("startup")
async def startup():
    """Start the pipeline loop as a background task."""
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    app.state.pipeline_task = asyncio.create_task(pipeline.start())


@app.on_event("shutdown")
async def shutdown():
    """Stop the pipeline and wait for the background task to finish."""
    await pipeline.stop()

    task = getattr(app.state, "pipeline_task", None)
    if task is not None and not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


@app.get("/health")
async def health():
    """Report whether the pipeline loop is running and a NATS client exists."""
    return {
        "status": "healthy" if pipeline.running else "starting",
        "service": "parser-pipeline",
        "nats_connected": pipeline.nc is not None
    }


@app.get("/stats")
async def stats():
    """Return basic pipeline status and stream/consumer names."""
    return {
        "running": pipeline.running,
        "nats_connected": pipeline.nc is not None,
        "stream": STREAM_NAME,
        "consumer": CONSUMER_NAME
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8101)