From 49272b66e6dab7f7d868aa3c775f4a1afa1bafa6 Mon Sep 17 00:00:00 2001
From: Apple
Date: Sun, 16 Nov 2025 03:03:20 -0800
Subject: [PATCH] feat: add RAG converter utilities and update integration guide

RAG Converter:
- Create app/utils/rag_converter.py with conversion functions
- parsed_doc_to_haystack_docs() - convert ParsedDocument to Haystack format
- parsed_chunks_to_haystack_docs() - convert ParsedChunk list to Haystack
- validate_parsed_doc_for_rag() - validate required fields before conversion
- Automatic metadata extraction (dao_id, doc_id, page, block_type)
- Preserve optional fields (bbox, section, reading_order)

Integration Guide:
- Update with ready-to-use converter functions
- Add validation examples
- Complete workflow examples
---
 services/parser-service/INTEGRATION.md        |  17 ++
 services/parser-service/app/utils/__init__.py |  16 ++
 .../parser-service/app/utils/rag_converter.py | 175 ++++++++++++++++++
 3 files changed, 208 insertions(+)
 create mode 100644 services/parser-service/app/utils/__init__.py
 create mode 100644 services/parser-service/app/utils/rag_converter.py

diff --git a/services/parser-service/INTEGRATION.md b/services/parser-service/INTEGRATION.md
index 499da2e3..71864339 100644
--- a/services/parser-service/INTEGRATION.md
+++ b/services/parser-service/INTEGRATION.md
@@ -174,6 +174,23 @@ async def route(request: RouterRequest):
 
 ### 1. Конвертація ParsedDocument → Haystack Documents
 
+**Готова функція:** `app/utils/rag_converter.py`
+
+```python
+from app.utils.rag_converter import parsed_doc_to_haystack_docs, validate_parsed_doc_for_rag
+
+# Валідація перед конвертацією
+is_valid, errors = validate_parsed_doc_for_rag(parsed_doc)
+if not is_valid:
+    logger.error(f"Document validation failed: {errors}")
+    return
+
+# Конвертація
+haystack_docs = parsed_doc_to_haystack_docs(parsed_doc)
+```
+
+**Або вручну:**
+
 ```python
 from haystack.schema import Document
 
diff --git a/services/parser-service/app/utils/__init__.py b/services/parser-service/app/utils/__init__.py
new file mode 100644
index 00000000..73ebb923
--- /dev/null
+++ b/services/parser-service/app/utils/__init__.py
@@ -0,0 +1,16 @@
+"""
+Utility functions for PARSER Service
+"""
+
+from app.utils.rag_converter import (
+    parsed_doc_to_haystack_docs,
+    parsed_chunks_to_haystack_docs,
+    validate_parsed_doc_for_rag
+)
+
+__all__ = [
+    "parsed_doc_to_haystack_docs",
+    "parsed_chunks_to_haystack_docs",
+    "validate_parsed_doc_for_rag"
+]
+
diff --git a/services/parser-service/app/utils/rag_converter.py b/services/parser-service/app/utils/rag_converter.py
new file mode 100644
index 00000000..fa243490
--- /dev/null
+++ b/services/parser-service/app/utils/rag_converter.py
@@ -0,0 +1,175 @@
+"""
+Utilities for converting ParsedDocument to RAG formats
+"""
+
+import logging
+from typing import List, Dict, Any
+
+from app.schemas import ParsedDocument, ParsedBlock, ParsedPage
+
+logger = logging.getLogger(__name__)
+
+
+def parsed_doc_to_haystack_docs(parsed_doc: ParsedDocument) -> List[Dict[str, Any]]:
+    """
+    Convert a ParsedDocument into Haystack-compatible document dicts.
+
+    One dict is emitted per block with non-blank text. Document-level
+    metadata is applied first, so the per-block fields (dao_id, doc_id,
+    page, block_type, reading_order, bbox_*, section, table_*) can never
+    be overwritten by a same-named metadata key.
+
+    Args:
+        parsed_doc: ParsedDocument from PARSER service
+
+    Returns:
+        List of {"content": ..., "meta": {...}} dictionaries; empty when
+        doc_id is missing.
+    """
+    # Validate required fields
+    if not parsed_doc.doc_id:
+        logger.warning("ParsedDocument missing doc_id, cannot create RAG documents")
+        return []
+
+    # metadata may be None/empty: normalise once instead of once per block
+    doc_meta = parsed_doc.metadata or {}
+    dao_id = doc_meta.get("dao_id")
+    if not dao_id:
+        logger.warning(f"ParsedDocument missing metadata.dao_id for doc_id={parsed_doc.doc_id}")
+    # Every other document-level key is passed through to each block
+    extra_meta = {k: v for k, v in doc_meta.items() if k != "dao_id"}
+
+    docs = []
+    for page in parsed_doc.pages or []:
+        for block in page.blocks or []:
+            # Skip empty blocks
+            if not block.text or not block.text.strip():
+                continue
+
+            # Required RAG metadata, layered on top of document-level metadata
+            meta: Dict[str, Any] = {
+                **extra_meta,
+                "dao_id": dao_id or "",
+                "doc_id": parsed_doc.doc_id,
+                "page": page.page_num,
+                "block_type": block.type,
+                "reading_order": block.reading_order,
+            }
+
+            # Optional bounding box, flattened into separate bbox_* keys
+            if block.bbox:
+                meta["bbox_x"] = block.bbox.x
+                meta["bbox_y"] = block.bbox.y
+                meta["bbox_width"] = block.bbox.width
+                meta["bbox_height"] = block.bbox.height
+
+            # A heading names its own section (first 100 chars)
+            if block.type == "heading":
+                meta["section"] = block.text[:100]
+
+            # Table dimensions, when table data is present
+            if block.type == "table" and block.table_data:
+                meta["table_rows"] = len(block.table_data.rows)
+                meta["table_columns"] = len(block.table_data.columns)
+
+            docs.append({"content": block.text.strip(), "meta": meta})
+
+    logger.info(f"Converted {len(docs)} blocks to Haystack documents for doc_id={parsed_doc.doc_id}")
+    return docs
+
+
+def parsed_chunks_to_haystack_docs(chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    """
+    Convert ParsedChunk dictionaries into Haystack-compatible document dicts.
+
+    Chunks with blank text, or without metadata.dao_id / metadata.doc_id,
+    are skipped. The remaining metadata keys are copied into "meta" last,
+    so they override page/section/bbox_* taken from the chunk itself.
+
+    Args:
+        chunks: List of ParsedChunk dictionaries
+
+    Returns:
+        List of Haystack Document dictionaries
+    """
+    docs = []
+
+    for chunk in chunks:
+        # Validate required fields
+        text = chunk.get("text")
+        if not text or not text.strip():
+            continue
+
+        # "metadata" may be present but None; treat that like a missing key
+        metadata = chunk.get("metadata") or {}
+        if not metadata.get("dao_id") or not metadata.get("doc_id"):
+            logger.warning(f"Chunk missing required metadata: {metadata}")
+            continue
+
+        meta: Dict[str, Any] = {
+            "dao_id": metadata["dao_id"],
+            "doc_id": metadata["doc_id"],
+            "page": chunk.get("page", 1),
+            "section": chunk.get("section"),
+        }
+
+        # Add bbox if present
+        bbox = chunk.get("bbox")
+        if bbox:
+            meta["bbox_x"] = bbox.get("x")
+            meta["bbox_y"] = bbox.get("y")
+            meta["bbox_width"] = bbox.get("width")
+            meta["bbox_height"] = bbox.get("height")
+
+        # Add other metadata
+        meta.update({
+            k: v for k, v in metadata.items()
+            if k not in ("dao_id", "doc_id")
+        })
+
+        docs.append({"content": text.strip(), "meta": meta})
+
+    return docs
+
+
+def validate_parsed_doc_for_rag(parsed_doc: ParsedDocument) -> tuple[bool, List[str]]:
+    """
+    Validate that a ParsedDocument has all fields required for RAG.
+
+    Missing (None) metadata, pages or blocks are reported as validation
+    errors instead of raising, so this is safe as a pre-flight check.
+
+    Args:
+        parsed_doc: ParsedDocument to validate
+
+    Returns:
+        Tuple of (is_valid, list_of_errors)
+    """
+    errors = []
+
+    # Check doc_id
+    if not parsed_doc.doc_id:
+        errors.append("doc_id is required")
+
+    # Check pages
+    pages = parsed_doc.pages or []
+    if not pages:
+        errors.append("pages list is empty")
+
+    # Check metadata.dao_id
+    if not (parsed_doc.metadata or {}).get("dao_id"):
+        errors.append("metadata.dao_id is required for RAG filtering")
+
+    # Page/block numbers in messages are 1-based positions, not page_num
+    for idx, page in enumerate(pages, start=1):
+        blocks = page.blocks or []
+        if not blocks:
+            errors.append(f"Page {idx} has no blocks")
+
+        # Check blocks have text
+        for block_idx, block in enumerate(blocks, start=1):
+            if not block.text or not block.text.strip():
+                errors.append(f"Page {idx}, block {block_idx} has no text")
+
+    return not errors, errors
+