diff --git a/services/parser-service/INTEGRATION.md b/services/parser-service/INTEGRATION.md new file mode 100644 index 00000000..499da2e3 --- /dev/null +++ b/services/parser-service/INTEGRATION.md @@ -0,0 +1,415 @@ +# PARSER Service - Integration Guide + +Інтеграція PARSER-сервісу з DAGI Router та RAG-пайплайном. + +## Формат виводу dots.ocr → ParsedBlock + +### Очікувані формати виводу dots.ocr + +PARSER-сервіс підтримує кілька форматів виводу від dots.ocr моделі: + +#### 1. JSON зі структурованими блоками (preferred) + +```json +{ + "blocks": [ + { + "type": "heading", + "text": "Document Title", + "bbox": [0, 0, 800, 50], + "reading_order": 1 + }, + { + "type": "paragraph", + "text": "Document content...", + "bbox": [0, 60, 800, 100], + "reading_order": 2 + }, + { + "type": "table", + "text": "Table content", + "bbox": [0, 200, 800, 300], + "reading_order": 3, + "table_data": { + "rows": [["Header 1", "Header 2"], ["Value 1", "Value 2"]], + "columns": ["Header 1", "Header 2"] + } + } + ] +} +``` + +#### 2. JSON зі сторінками + +```json +{ + "pages": [ + { + "page_num": 1, + "blocks": [...] + } + ] +} +``` + +#### 3. Plain text / Markdown + +``` +# Document Title + +Document content paragraph... + +- List item 1 +- List item 2 +``` + +### Нормалізація в ParsedBlock + +`model_output_parser.py` автоматично нормалізує всі формати до стандартного `ParsedBlock`: + +```python +{ + "type": "paragraph" | "heading" | "table" | "formula" | "figure_caption" | "list", + "text": "Block text content", + "bbox": { + "x": 0.0, + "y": 0.0, + "width": 800.0, + "height": 50.0 + }, + "reading_order": 1, + "page_num": 1, + "table_data": {...}, # Optional, for table blocks + "metadata": {...} # Optional, additional metadata +} +``` + +## Must-have поля для RAG + +### ParsedDocument + +**Обов'язкові поля:** +- `doc_id: str` - Унікальний ідентифікатор документа (для індексації) +- `pages: List[ParsedPage]` - Список сторінок з блоками (контент) +- `doc_type: Literal["pdf", "image"]` - Тип документа + +**Рекомендовані поля в metadata:** +- `metadata.dao_id: str` - ID DAO (для фільтрації) +- `metadata.user_id: str` - ID користувача (для access control) +- `metadata.title: str` - Назва документа (для відображення) +- `metadata.created_at: datetime` - Час завантаження (для сортування) + +### ParsedChunk + +**Обов'язкові поля:** +- `text: str` - Текст фрагменту (для індексації) +- `metadata.dao_id: str` - ID DAO (для фільтрації) +- `metadata.doc_id: str` - ID документа (для citation) + +**Рекомендовані поля:** +- `page: int` - Номер сторінки (для citation) +- `section: str` - Назва секції (для контексту) +- `metadata.block_type: str` - Тип блоку (heading, paragraph, etc.) +- `metadata.reading_order: int` - Порядок читання (для сортування) +- `bbox: BBox` - Координати (для виділення в PDF) + +## Інтеграція з DAGI Router + +### 1. Додати provider в router-config.yml + +```yaml +providers: + parser: + type: ocr + base_url: "http://parser-service:9400" + timeout: 120 +``` + +### 2. Додати routing rule + +```yaml +routing: + - id: doc_parse + when: + mode: doc_parse + use_provider: parser +``` + +### 3. Розширити RouterRequest + +Додати в `router_client.py` або `types/api.ts`: + +```python +class RouterRequest(BaseModel): + mode: str + dao_id: str + user_id: str + payload: Dict[str, Any] + + # Нові поля для PARSER + doc_url: Optional[str] = None + doc_type: Optional[Literal["pdf", "image"]] = None + output_mode: Optional[Literal["raw_json", "markdown", "qa_pairs", "chunks"]] = "raw_json" +``` + +### 4. Handler в Router + +```python +@router.post("/route") +async def route(request: RouterRequest): + if request.mode == "doc_parse": + # Викликати parser-service + async with httpx.AsyncClient() as client: + files = {"file": await download_file(request.doc_url)} + response = await client.post( + "http://parser-service:9400/ocr/parse", + files=files, + data={"output_mode": request.output_mode} + ) + parsed_doc = response.json() + return {"data": parsed_doc} +``` + +## Інтеграція з RAG Pipeline + +### 1. Конвертація ParsedDocument → Haystack Documents + +```python +from haystack.schema import Document + +def parsed_doc_to_haystack_docs(parsed_doc: ParsedDocument) -> List[Document]: + """Convert ParsedDocument to Haystack Documents for RAG""" + docs = [] + + for page in parsed_doc.pages: + for block in page.blocks: + # Skip empty blocks + if not block.text or not block.text.strip(): + continue + + # Build metadata (must-have для RAG) + meta = { + "dao_id": parsed_doc.metadata.get("dao_id", ""), + "doc_id": parsed_doc.doc_id, + "page": page.page_num, + "block_type": block.type, + "reading_order": block.reading_order, + "section": block.type if block.type == "heading" else None + } + + # Add optional fields + if block.bbox: + meta["bbox_x"] = block.bbox.x + meta["bbox_y"] = block.bbox.y + meta["bbox_width"] = block.bbox.width + meta["bbox_height"] = block.bbox.height + + # Create Haystack Document + doc = Document( + content=block.text, + meta=meta + ) + docs.append(doc) + + return docs +``` + +### 2. Ingest Pipeline + +```python +from haystack import Pipeline +from haystack.components.embedders import SentenceTransformersTextEmbedder +from haystack.components.writers import DocumentWriter +from haystack.document_stores import PGVectorDocumentStore + +def create_ingest_pipeline(): + """Create RAG ingest pipeline""" + doc_store = PGVectorDocumentStore( + connection_string="postgresql+psycopg2://...", + embedding_dim=1024, + table_name="rag_documents" + ) + + embedder = SentenceTransformersTextEmbedder( + model="BAAI/bge-m3", + device="cuda" + ) + + writer = DocumentWriter(document_store=doc_store) + + pipeline = Pipeline() + pipeline.add_component("embedder", embedder) + pipeline.add_component("writer", writer) + pipeline.connect("embedder.documents", "writer.documents") + + return pipeline + +def ingest_parsed_document(parsed_doc: ParsedDocument): + """Ingest parsed document into RAG""" + # Convert to Haystack Documents + docs = parsed_doc_to_haystack_docs(parsed_doc) + + if not docs: + logger.warning(f"No documents to ingest for doc_id={parsed_doc.doc_id}") + return + + # Create pipeline + pipeline = create_ingest_pipeline() + + # Run ingest + result = pipeline.run({ + "embedder": {"documents": docs} + }) + + logger.info(f"Ingested {len(docs)} chunks for doc_id={parsed_doc.doc_id}") +``` + +### 3. Query Pipeline з фільтрами + +```python +def answer_query(dao_id: str, question: str, user_id: str): + """Query RAG with RBAC filters""" + # Build filters (must-have для ізоляції даних) + filters = { + "dao_id": [dao_id] # Фільтр по DAO + } + + # Optional: додати фільтри по roles через RBAC + # user_roles = get_user_roles(user_id, dao_id) + # if "admin" not in user_roles: + # filters["visibility"] = ["public"] + + # Query pipeline + pipeline = create_query_pipeline() + + result = pipeline.run({ + "embedder": {"texts": [question]}, + "retriever": {"filters": filters, "top_k": 5}, + "generator": {"prompt": question} + }) + + answer = result["generator"]["replies"][0] + citations = [ + { + "doc_id": doc.meta["doc_id"], + "page": doc.meta["page"], + "text": doc.content[:200], + "bbox": { + "x": doc.meta.get("bbox_x"), + "y": doc.meta.get("bbox_y") + } + } + for doc in result["retriever"]["documents"] + ] + + return { + "answer": answer, + "citations": citations + } +``` + +## Приклад повного workflow + +### 1. Завантаження документа + +```python +# Gateway отримує файл від користувача +file_bytes = await get_file_from_telegram(file_id) + +# Викликаємо PARSER +async with httpx.AsyncClient() as client: + response = await client.post( + "http://parser-service:9400/ocr/parse_chunks", + files={"file": ("doc.pdf", file_bytes)}, + data={ + "dao_id": "daarion", + "doc_id": "tokenomics_v1", + "output_mode": "chunks" + } + ) + result = response.json() + +# Конвертуємо в ParsedDocument +parsed_doc = ParsedDocument(**result["document"]) + +# Додаємо metadata +parsed_doc.metadata.update({ + "dao_id": "daarion", + "user_id": "user123", + "title": "Tokenomics v1" +}) + +# Інжестимо в RAG +ingest_parsed_document(parsed_doc) +``` + +### 2. Запит до RAG + +```python +# Користувач питає через бота +question = "Поясни токеноміку microDAO" + +# Викликаємо RAG через DAGI Router +router_request = { + "mode": "rag_query", + "dao_id": "daarion", + "user_id": "user123", + "payload": { + "question": question + } +} + +response = await send_to_router(router_request) +answer = response["data"]["answer"] +citations = response["data"]["citations"] + +# Відправляємо користувачу з цитатами +await send_message(f"{answer}\n\nДжерела: {len(citations)} документів") +``` + +## Рекомендації + +### Для RAG indexing + +1. **Обов'язкові поля:** + - `doc_id` - для унікальності + - `dao_id` - для фільтрації + - `text` - для індексації + +2. **Рекомендовані поля:** + - `page` - для citation + - `block_type` - для контексту + - `section` - для семантичної групи + +3. **Опційні поля:** + - `bbox` - для виділення в PDF + - `reading_order` - для сортування + +### Для DAGI Router + +1. **Обов'язкові поля в payload:** + - `doc_url` або `file` - для завантаження + - `output_mode` - для вибору формату + +2. **Рекомендовані поля:** + - `dao_id` - для контексту + - `doc_id` - для tracking + +### Для RBAC інтеграції + +1. **Фільтри в RAG:** + - `dao_id` - обов'язково + - `visibility` - для приватних документів + - `user_id` - для персональних документів + +2. **Перевірки в PARSER:** + - Перевірка прав на завантаження + - Перевірка обмежень по розміру + - Перевірка типу файлу + +## Посилання + +- [PARSER Agent Documentation](../docs/agents/parser.md) +- [TODO: RAG Implementation](./TODO-RAG.md) +- [Deployment Guide](./DEPLOYMENT.md) + diff --git a/services/parser-service/app/runtime/model_output_parser.py b/services/parser-service/app/runtime/model_output_parser.py index 7d8748fe..debb7059 100644 --- a/services/parser-service/app/runtime/model_output_parser.py +++ b/services/parser-service/app/runtime/model_output_parser.py @@ -1,11 +1,19 @@ """ Parser for dots.ocr model output Converts model output to structured blocks + +Expected dots.ocr output formats: +1. JSON with structured blocks (preferred) +2. Plain text with layout hints +3. Markdown-like structure + +This parser handles all formats and normalizes to ParsedBlock structure. """ import logging import json -from typing import List, Dict, Any, Optional +import re +from typing import List, Dict, Any, Optional, Tuple from PIL import Image logger = logging.getLogger(__name__) @@ -19,121 +27,311 @@ def parse_model_output_to_blocks( """ Parse dots.ocr model output into structured blocks + Handles multiple output formats: + 1. JSON with "blocks" array (preferred) + 2. JSON with "pages" array + 3. Plain text with layout hints + 4. Markdown-like structure + Args: - model_output: Raw text output from model (may be JSON or plain text) + model_output: Raw text output from model image_size: (width, height) of the image page_num: Page number Returns: - List of block dictionaries + List of block dictionaries with normalized structure """ blocks = [] try: - # Try to parse as JSON first (if model outputs structured JSON) - try: - output_data = json.loads(model_output) - if isinstance(output_data, dict) and "blocks" in output_data: - # Model outputs structured format - return output_data["blocks"] - elif isinstance(output_data, list): - # Model outputs list of blocks - return output_data - except (json.JSONDecodeError, KeyError): - # Not JSON, treat as plain text - pass + # Format 1: Try to parse as JSON (structured output) + parsed_json = _try_parse_json(model_output) + if parsed_json: + blocks = _extract_blocks_from_json(parsed_json, image_size, page_num) + if blocks: + logger.debug(f"Parsed {len(blocks)} blocks from JSON output") + return blocks - # Parse plain text output - # This is a simple heuristic - adjust based on actual dots.ocr output format - lines = model_output.strip().split('\n') + # Format 2: Try to parse as structured text (markdown-like) + blocks = _parse_structured_text(model_output, image_size, page_num) + if blocks: + logger.debug(f"Parsed {len(blocks)} blocks from structured text") + return blocks - current_block = None - reading_order = 1 - - for line in lines: - line = line.strip() - if not line: - continue - - # Heuristic: lines starting with # are headings - if line.startswith('#'): - # Save previous block - if current_block: - blocks.append(current_block) - - # New heading block - current_block = { - "type": "heading", - "text": line.lstrip('#').strip(), - "bbox": { - "x": 0, - "y": reading_order * 30, - "width": image_size[0], - "height": 30 - }, - "reading_order": reading_order - } - reading_order += 1 - else: - # Regular paragraph - if current_block and current_block["type"] == "paragraph": - # Append to existing paragraph - current_block["text"] += " " + line - else: - # Save previous block - if current_block: - blocks.append(current_block) - - # New paragraph block - current_block = { - "type": "paragraph", - "text": line, - "bbox": { - "x": 0, - "y": reading_order * 30, - "width": image_size[0], - "height": 30 - }, - "reading_order": reading_order - } - reading_order += 1 - - # Save last block - if current_block: - blocks.append(current_block) - - # If no blocks were created, create a single paragraph with all text - if not blocks: - blocks.append({ - "type": "paragraph", - "text": model_output.strip(), - "bbox": { - "x": 0, - "y": 0, - "width": image_size[0], - "height": image_size[1] - }, - "reading_order": 1 - }) + # Format 3: Fallback - plain text as single paragraph + blocks = _parse_plain_text(model_output, image_size, page_num) + logger.debug(f"Parsed {len(blocks)} blocks from plain text") except Exception as e: logger.error(f"Error parsing model output: {e}", exc_info=True) - # Fallback: create single block with raw output - blocks = [{ - "type": "paragraph", - "text": model_output.strip() if model_output else "", - "bbox": { - "x": 0, - "y": 0, - "width": image_size[0], - "height": image_size[1] - }, - "reading_order": 1 - }] + blocks = _create_fallback_block(model_output, image_size, page_num) return blocks +def _try_parse_json(text: str) -> Optional[Dict[str, Any]]: + """Try to parse text as JSON""" + try: + # Try to find JSON in text (might be wrapped in markdown code blocks) + json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL) + if json_match: + return json.loads(json_match.group(1)) + + # Try direct JSON parse + return json.loads(text) + except (json.JSONDecodeError, ValueError): + return None + + +def _extract_blocks_from_json( + data: Dict[str, Any], + image_size: tuple[int, int], + page_num: int +) -> List[Dict[str, Any]]: + """Extract blocks from JSON structure""" + blocks = [] + + # Format: {"blocks": [...]} + if "blocks" in data and isinstance(data["blocks"], list): + for idx, block_data in enumerate(data["blocks"], start=1): + block = _normalize_block(block_data, image_size, idx) + if block: + blocks.append(block) + + # Format: {"pages": [{"blocks": [...]}]} + elif "pages" in data and isinstance(data["pages"], list): + for page_data in data["pages"]: + if isinstance(page_data, dict) and "blocks" in page_data: + for idx, block_data in enumerate(page_data["blocks"], start=1): + block = _normalize_block(block_data, image_size, idx) + if block: + blocks.append(block) + + # Format: Direct array of blocks + elif isinstance(data, list): + for idx, block_data in enumerate(data, start=1): + block = _normalize_block(block_data, image_size, idx) + if block: + blocks.append(block) + + return blocks + + +def _normalize_block( + block_data: Dict[str, Any], + image_size: tuple[int, int], + reading_order: int +) -> Optional[Dict[str, Any]]: + """Normalize block data to standard format""" + if not isinstance(block_data, dict): + return None + + # Extract text + text = block_data.get("text") or block_data.get("content") or "" + if not text or not text.strip(): + return None + + # Extract type + block_type = block_data.get("type") or block_data.get("block_type") or "paragraph" + + # Normalize type + type_mapping = { + "heading": "heading", + "title": "heading", + "h1": "heading", + "h2": "heading", + "h3": "heading", + "paragraph": "paragraph", + "p": "paragraph", + "text": "paragraph", + "table": "table", + "formula": "formula", + "figure": "figure_caption", + "caption": "figure_caption", + "list": "list", + "li": "list" + } + block_type = type_mapping.get(block_type.lower(), "paragraph") + + # Extract bbox + bbox = block_data.get("bbox") or block_data.get("bounding_box") or {} + if isinstance(bbox, list) and len(bbox) >= 4: + # Format: [x, y, width, height] + bbox = { + "x": float(bbox[0]), + "y": float(bbox[1]), + "width": float(bbox[2]), + "height": float(bbox[3]) + } + elif isinstance(bbox, dict): + # Ensure all fields are present + bbox = { + "x": float(bbox.get("x", 0)), + "y": float(bbox.get("y", 0)), + "width": float(bbox.get("width", image_size[0])), + "height": float(bbox.get("height", 30)) + } + else: + # Default bbox + bbox = { + "x": 0, + "y": reading_order * 30, + "width": image_size[0], + "height": 30 + } + + # Build normalized block + normalized = { + "type": block_type, + "text": text.strip(), + "bbox": bbox, + "reading_order": block_data.get("reading_order") or reading_order + } + + # Add table data if present + if block_type == "table" and "table_data" in block_data: + normalized["table_data"] = block_data["table_data"] + + # Add metadata if present + if "metadata" in block_data: + normalized["metadata"] = block_data["metadata"] + + return normalized + + +def _parse_structured_text( + text: str, + image_size: tuple[int, int], + page_num: int +) -> List[Dict[str, Any]]: + """Parse structured text (markdown-like) into blocks""" + blocks = [] + lines = text.strip().split('\n') + + current_block = None + reading_order = 1 + + for line in lines: + line = line.strip() + if not line: + if current_block: + blocks.append(current_block) + current_block = None + continue + + # Detect heading (markdown style) + heading_match = re.match(r'^(#{1,6})\s+(.+)$', line) + if heading_match: + if current_block: + blocks.append(current_block) + + level = len(heading_match.group(1)) + heading_text = heading_match.group(2) + + current_block = { + "type": "heading", + "text": heading_text, + "bbox": { + "x": 0, + "y": reading_order * 30, + "width": image_size[0], + "height": 30 + }, + "reading_order": reading_order + } + reading_order += 1 + continue + + # Detect list item + if re.match(r'^[-*+]\s+', line) or re.match(r'^\d+\.\s+', line): + if current_block and current_block["type"] != "list": + blocks.append(current_block) + + list_text = re.sub(r'^[-*+]\s+', '', line) + list_text = re.sub(r'^\d+\.\s+', '', list_text) + + current_block = { + "type": "list", + "text": list_text, + "bbox": { + "x": 0, + "y": reading_order * 30, + "width": image_size[0], + "height": 30 + }, + "reading_order": reading_order + } + reading_order += 1 + continue + + # Regular paragraph + if current_block and current_block["type"] == "paragraph": + current_block["text"] += " " + line + else: + if current_block: + blocks.append(current_block) + + current_block = { + "type": "paragraph", + "text": line, + "bbox": { + "x": 0, + "y": reading_order * 30, + "width": image_size[0], + "height": 30 + }, + "reading_order": reading_order + } + reading_order += 1 + + if current_block: + blocks.append(current_block) + + return blocks + + +def _parse_plain_text( + text: str, + image_size: tuple[int, int], + page_num: int +) -> List[Dict[str, Any]]: + """Parse plain text as single paragraph""" + if not text or not text.strip(): + return [] + + return [{ + "type": "paragraph", + "text": text.strip(), + "bbox": { + "x": 0, + "y": 0, + "width": image_size[0], + "height": image_size[1] + }, + "reading_order": 1 + }] + + +def _create_fallback_block( + text: str, + image_size: tuple[int, int], + page_num: int +) -> List[Dict[str, Any]]: + """Create fallback block when parsing fails""" + return [{ + "type": "paragraph", + "text": text.strip() if text else f"Page {page_num} (parsing failed)", + "bbox": { + "x": 0, + "y": 0, + "width": image_size[0], + "height": image_size[1] + }, + "reading_order": 1, + "metadata": {"parsing_error": True} + }] + + def extract_layout_info(model_output: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ Extract layout information from model output (if available) diff --git a/services/parser-service/app/schemas.py b/services/parser-service/app/schemas.py index 25b0676a..cff5d3ec 100644 --- a/services/parser-service/app/schemas.py +++ b/services/parser-service/app/schemas.py @@ -53,12 +53,28 @@ class ParsedPage(BaseModel): class ParsedChunk(BaseModel): - """Semantic chunk for RAG""" - text: str = Field(..., description="Chunk text") - page: int = Field(..., description="Page number") - bbox: Optional[BBox] = Field(None, description="Bounding box") - section: Optional[str] = Field(None, description="Section name") - metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") + """ + Semantic chunk for RAG + + Must-have fields for RAG indexing: + - text: Chunk text content (required) + - metadata.dao_id: DAO identifier (required for filtering) + - metadata.doc_id: Document identifier (required for citation) + + Recommended fields: + - page: Page number (for citation) + - section: Section name (for context) + - metadata.block_type: Type of block (heading, paragraph, etc.) + - metadata.reading_order: Reading order (for sorting) + """ + text: str = Field(..., description="Chunk text (required for RAG)") + page: int = Field(..., description="Page number (for citation)") + bbox: Optional[BBox] = Field(None, description="Bounding box (for highlighting)") + section: Optional[str] = Field(None, description="Section name (for context)") + metadata: Dict[str, Any] = Field( + default_factory=dict, + description="Metadata (must include dao_id, doc_id for RAG)" + ) class QAPair(BaseModel): @@ -71,12 +87,28 @@ class QAPair(BaseModel): class ParsedDocument(BaseModel): - """Complete parsed document""" - doc_id: Optional[str] = Field(None, description="Document ID") + """ + Complete parsed document + + Must-have fields for RAG integration: + - doc_id: Unique document identifier (required for RAG indexing) + - pages: List of parsed pages with blocks (required for content) + - doc_type: Document type (required for processing) + + Recommended fields for RAG: + - metadata.dao_id: DAO identifier (for filtering) + - metadata.user_id: User who uploaded (for access control) + - metadata.title: Document title (for display) + - metadata.created_at: Upload timestamp (for sorting) + """ + doc_id: str = Field(..., description="Document ID (required for RAG)") doc_url: Optional[str] = Field(None, description="Document URL") doc_type: Literal["pdf", "image"] = Field(..., description="Document type") - pages: List[ParsedPage] = Field(..., description="Parsed pages") - metadata: Dict[str, Any] = Field(default_factory=dict, description="Document metadata") + pages: List[ParsedPage] = Field(..., description="Parsed pages (required for RAG)") + metadata: Dict[str, Any] = Field( + default_factory=dict, + description="Document metadata (should include dao_id, user_id for RAG)" + ) created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation timestamp")