feat: add RAG converter utilities and update integration guide
RAG Converter: - Create app/utils/rag_converter.py with conversion functions - parsed_doc_to_haystack_docs() - convert ParsedDocument to Haystack format - parsed_chunks_to_haystack_docs() - convert ParsedChunk list to Haystack - validate_parsed_doc_for_rag() - validate required fields before conversion - Automatic metadata extraction (dao_id, doc_id, page, block_type) - Preserve optional fields (bbox, section, reading_order) Integration Guide: - Update with ready-to-use converter functions - Add validation examples - Complete workflow examples
This commit is contained in:
@@ -174,6 +174,23 @@ async def route(request: RouterRequest):
|
||||
|
||||
### 1. Конвертація ParsedDocument → Haystack Documents

**Готова функція:** `app/utils/rag_converter.py`

```python
from app.utils.rag_converter import parsed_doc_to_haystack_docs, validate_parsed_doc_for_rag

# Валідація перед конвертацією
is_valid, errors = validate_parsed_doc_for_rag(parsed_doc)
if not is_valid:
    logger.error(f"Document validation failed: {errors}")
    return

# Конвертація
haystack_docs = parsed_doc_to_haystack_docs(parsed_doc)
```

**Або вручну:**

```python
from haystack.schema import Document
|
||||
|
||||
|
||||
16
services/parser-service/app/utils/__init__.py
Normal file
16
services/parser-service/app/utils/__init__.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""
|
||||
Utility functions for PARSER Service
|
||||
"""
|
||||
|
||||
from app.utils.rag_converter import (
|
||||
parsed_doc_to_haystack_docs,
|
||||
parsed_chunks_to_haystack_docs,
|
||||
validate_parsed_doc_for_rag
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"parsed_doc_to_haystack_docs",
|
||||
"parsed_chunks_to_haystack_docs",
|
||||
"validate_parsed_doc_for_rag"
|
||||
]
|
||||
|
||||
175
services/parser-service/app/utils/rag_converter.py
Normal file
175
services/parser-service/app/utils/rag_converter.py
Normal file
@@ -0,0 +1,175 @@
|
||||
"""
|
||||
Utilities for converting ParsedDocument to RAG formats
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import List, Dict, Any
|
||||
|
||||
from app.schemas import ParsedDocument, ParsedBlock, ParsedPage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def parsed_doc_to_haystack_docs(parsed_doc: ParsedDocument) -> List[Dict[str, Any]]:
    """
    Convert ParsedDocument to Haystack Documents format.

    This function prepares documents for RAG indexing by:
    - Extracting all blocks with non-blank text
    - Adding required metadata (dao_id, doc_id, page, block_type)
    - Preserving optional fields (bbox, section, reading_order)
    - Merging document-level metadata without overwriting per-block keys

    Args:
        parsed_doc: ParsedDocument from PARSER service

    Returns:
        List of dictionaries compatible with Haystack Document format
        (``{"content": str, "meta": dict}``); empty list if doc_id is missing.
    """
    docs: List[Dict[str, Any]] = []

    # Validate required fields
    if not parsed_doc.doc_id:
        logger.warning("ParsedDocument missing doc_id, cannot create RAG documents")
        return []

    dao_id = parsed_doc.metadata.get("dao_id")
    if not dao_id:
        # Best-effort: we still emit documents (with dao_id="") but warn,
        # since RAG filtering relies on dao_id.
        logger.warning(f"ParsedDocument missing metadata.dao_id for doc_id={parsed_doc.doc_id}")

    # Document-level metadata is loop-invariant: build it once instead of
    # re-filtering parsed_doc.metadata for every block.
    # dao_id is excluded because it is always written explicitly below.
    doc_level_meta: Dict[str, Any] = {
        k: v for k, v in (parsed_doc.metadata or {}).items() if k != "dao_id"
    }

    for page in parsed_doc.pages:
        for block in page.blocks:
            # Skip empty blocks
            if not block.text or not block.text.strip():
                continue

            # Build required metadata (must-have for RAG filtering/citation)
            meta: Dict[str, Any] = {
                "dao_id": dao_id or "",
                "doc_id": parsed_doc.doc_id,
                "page": page.page_num,
                "block_type": block.type,
                "reading_order": block.reading_order
            }

            # Add optional layout fields
            if block.bbox:
                meta["bbox_x"] = block.bbox.x
                meta["bbox_y"] = block.bbox.y
                meta["bbox_width"] = block.bbox.width
                meta["bbox_height"] = block.bbox.height

            # Add section if it's a heading
            if block.type == "heading":
                meta["section"] = block.text[:100]  # First 100 chars as section name

            # Add table data if present
            if block.type == "table" and block.table_data:
                meta["table_rows"] = len(block.table_data.rows)
                meta["table_columns"] = len(block.table_data.columns)

            # Merge document-level metadata WITHOUT clobbering per-block keys.
            # Fix: the previous meta.update(...) excluded only "dao_id", so a
            # document-level "page"/"doc_id"/"block_type" entry would silently
            # overwrite the required per-block fields set above.
            for key, value in doc_level_meta.items():
                meta.setdefault(key, value)

            # Haystack Document-compatible dict
            docs.append({
                "content": block.text.strip(),
                "meta": meta
            })

    logger.info(f"Converted {len(docs)} blocks to Haystack documents for doc_id={parsed_doc.doc_id}")
    return docs
|
||||
|
||||
|
||||
def parsed_chunks_to_haystack_docs(chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Convert a list of ParsedChunk dictionaries into Haystack Document dicts.

    Chunks with blank/missing text are skipped silently; chunks missing the
    required ``dao_id``/``doc_id`` metadata are skipped with a warning.

    Args:
        chunks: List of ParsedChunk dictionaries

    Returns:
        List of Haystack Document dictionaries (``{"content": ..., "meta": ...}``)
    """
    converted: List[Dict[str, Any]] = []

    for chunk in chunks:
        text = chunk.get("text")
        if not text or not text.strip():
            continue

        metadata = chunk.get("metadata", {})
        dao_id = metadata.get("dao_id")
        doc_id = metadata.get("doc_id")
        if not dao_id or not doc_id:
            logger.warning(f"Chunk missing required metadata: {metadata}")
            continue

        # Required fields first; page defaults to 1, section may be None.
        meta: Dict[str, Any] = {
            "dao_id": dao_id,
            "doc_id": doc_id,
            "page": chunk.get("page", 1),
            "section": chunk.get("section"),
        }

        # Optional bounding box → flattened bbox_* keys
        bbox = chunk.get("bbox")
        if bbox:
            for side in ("x", "y", "width", "height"):
                meta[f"bbox_{side}"] = bbox.get(side)

        # Carry over any remaining chunk metadata (dao_id/doc_id already set)
        for key, value in metadata.items():
            if key not in ("dao_id", "doc_id"):
                meta[key] = value

        converted.append({"content": text.strip(), "meta": meta})

    return converted
|
||||
|
||||
|
||||
def validate_parsed_doc_for_rag(parsed_doc: ParsedDocument) -> tuple[bool, List[str]]:
    """
    Check that a ParsedDocument carries every field RAG indexing needs.

    Verifies doc_id, a non-empty pages list, metadata.dao_id, and that
    every page has blocks whose text is non-blank.

    Args:
        parsed_doc: ParsedDocument to validate

    Returns:
        Tuple of (is_valid, list_of_errors)
    """
    problems: List[str] = []

    # Required document identifier
    if not parsed_doc.doc_id:
        problems.append("doc_id is required")

    # Document must contain at least one page
    if not parsed_doc.pages:
        problems.append("pages list is empty")

    # dao_id drives per-tenant RAG filtering
    if not parsed_doc.metadata.get("dao_id"):
        problems.append("metadata.dao_id is required for RAG filtering")

    # Pages must have blocks, blocks must have text (1-based positions in messages)
    for page_no, page in enumerate(parsed_doc.pages, start=1):
        if not page.blocks:
            problems.append(f"Page {page_no} has no blocks")
            continue
        for block_no, block in enumerate(page.blocks, start=1):
            if not (block.text and block.text.strip()):
                problems.append(f"Page {page_no}, block {block_no} has no text")

    return not problems, problems
|
||||
|
||||
Reference in New Issue
Block a user