""" Ingest Pipeline: PARSER → RAG Converts ParsedDocument to Haystack Documents and indexes them """ import logging from typing import List, Dict, Any, Optional from app.document_store import get_document_store, _make_document from app.embedding import get_text_embedder from app.core.config import settings from app.events import publish_document_ingested, publish_document_indexed logger = logging.getLogger(__name__) async def ingest_parsed_document( dao_id: str, doc_id: str, parsed_json: Dict[str, Any], user_id: Optional[str] = None ) -> Dict[str, Any]: """ Ingest parsed document from PARSER service into RAG Args: dao_id: DAO identifier doc_id: Document identifier parsed_json: ParsedDocument JSON from PARSER service user_id: Optional user identifier Returns: Dictionary with ingest results (doc_count, status, metrics) """ import time ingest_start = time.time() logger.info(f"Ingesting document: dao_id={dao_id}, doc_id={doc_id}") try: # Convert parsed_json to Haystack Documents documents = _parsed_json_to_documents(parsed_json, dao_id, doc_id, user_id) if not documents: logger.warning(f"No documents to ingest for doc_id={doc_id}") return { "status": "error", "message": "No documents to ingest", "doc_count": 0 } logger.info(f"Converted {len(documents)} blocks to document chunks") embedder = get_text_embedder() texts = [doc["content"] for doc in documents] embedding_result = embedder.run(texts=texts) embeddings = embedding_result.get("embeddings", []) doc_objects = [] for idx, doc in enumerate(documents): embedding = embeddings[idx] if idx < len(embeddings) else None doc_objects.append( _make_document(content=doc["content"], meta=doc["meta"], embedding=embedding) ) pipeline_start = time.time() document_store = get_document_store() document_store.write_documents(doc_objects) pipeline_time = time.time() - pipeline_start written_docs = len(doc_objects) # Calculate metrics total_time = time.time() - ingest_start pages_count = len(parsed_json.get("pages", [])) blocks_count = sum( len(page.get("blocks", [])) for page in parsed_json.get("pages", []) ) logger.info( f"Ingested {written_docs} documents for doc_id={doc_id}: " f"pages={pages_count}, blocks={blocks_count}, " f"pipeline_time={pipeline_time:.2f}s, total_time={total_time:.2f}s" ) # Publish events await _publish_events_async( dao_id=dao_id, doc_id=doc_id, written_docs=written_docs, pages_count=pages_count, blocks_count=blocks_count, pipeline_time=pipeline_time ) return { "status": "success", "doc_count": written_docs, "dao_id": dao_id, "doc_id": doc_id, "metrics": { "pages_processed": pages_count, "blocks_processed": blocks_count, "documents_indexed": written_docs, "pipeline_time_seconds": round(pipeline_time, 2), "total_time_seconds": round(total_time, 2) } } except Exception as e: logger.error(f"Failed to ingest document: {e}", exc_info=True) total_time = time.time() - ingest_start logger.error(f"Ingest failed after {total_time:.2f}s: {e}") return { "status": "error", "message": str(e), "doc_count": 0, "metrics": { "total_time_seconds": round(total_time, 2), "error": str(e) } } def _parsed_json_to_documents( parsed_json: Dict[str, Any], dao_id: str, doc_id: str, user_id: Optional[str] = None ) -> List[Dict[str, Any]]: """ Convert ParsedDocument JSON to Haystack Documents Args: parsed_json: ParsedDocument JSON structure dao_id: DAO identifier doc_id: Document identifier user_id: Optional user identifier Returns: List of Haystack Document objects """ documents: List[Dict[str, Any]] = [] # Extract pages from parsed_json pages = parsed_json.get("pages", []) for page_data in pages: page_num = page_data.get("page_num", 1) blocks = page_data.get("blocks", []) for block in blocks: # Skip empty blocks text = block.get("text", "").strip() if not text: continue # Build metadata (must-have для RAG) meta = { "dao_id": dao_id, "doc_id": doc_id, "page": page_num, "block_type": block.get("type", "paragraph"), "reading_order": block.get("reading_order", 0) } # Add optional fields if block.get("bbox"): bbox = block["bbox"] meta.update({ "bbox_x": bbox.get("x", 0), "bbox_y": bbox.get("y", 0), "bbox_width": bbox.get("width", 0), "bbox_height": bbox.get("height", 0) }) # Add section if heading if block.get("type") == "heading": meta["section"] = text[:100] # First 100 chars as section name # Add user_id if provided if user_id: meta["user_id"] = user_id # Add document-level metadata if parsed_json.get("metadata"): meta.update({ k: v for k, v in parsed_json["metadata"].items() if k not in ["dao_id"] # Already added }) documents.append({"content": text, "meta": meta}) return documents async def _publish_events_async( dao_id: str, doc_id: str, written_docs: int, pages_count: int, blocks_count: int, pipeline_time: float ): try: await publish_document_ingested( doc_id=doc_id, team_id=dao_id, dao_id=dao_id, chunk_count=written_docs, indexed=True, visibility="public", metadata={ "ingestion_time_ms": round(pipeline_time * 1000), "embed_model": settings.EMBEDDING_MODEL or "bge-m3@v1", "pages_processed": pages_count, "blocks_processed": blocks_count } ) logger.info(f"Published rag.document.ingested event for doc_id={doc_id}") chunk_ids = [f"{doc_id}_chunk_{i+1}" for i in range(written_docs)] await publish_document_indexed( doc_id=doc_id, team_id=dao_id, dao_id=dao_id, chunk_ids=chunk_ids, indexed=True, visibility="public", metadata={ "indexing_time_ms": 0, "milvus_collection": "documents_v1", "neo4j_nodes_created": len(chunk_ids), "embed_model": settings.EMBEDDING_MODEL or "bge-m3@v1" } ) logger.info(f"Published rag.document.indexed event for doc_id={doc_id}") except Exception as e: logger.error(f"Failed to publish RAG events for doc_id={doc_id}: {e}") def _create_ingest_pipeline(): # Deprecated: no haystack pipeline in minimal PGVector mode. return None