diff --git a/TODO-RAG.md b/TODO-RAG.md new file mode 100644 index 00000000..2a0e83ea --- /dev/null +++ b/TODO-RAG.md @@ -0,0 +1,369 @@ +# TODO — RAG Stack (Haystack + PARSER Agent) + +Цей план описує, як побудувати RAG-шар навколо PARSER (dots.ocr) та DAGI Router. + +**Статус:** 🟡 Планування + +--- + +## 1. Document Store (pgvector або Qdrant) + +### 1.1. Вибір бекенду + +- [ ] Обрати бекенд: + - [ ] `Postgres + pgvector` (рекомендовано, якщо в нас уже є Postgres) + - [ ] або `Qdrant` (docker-сервіс) + +**Рекомендація:** Використати `pgvector` (вже є в `city-db`) + +### 1.2. Ініціалізація Haystack DocumentStore + +Приклад для PostgreSQL + pgvector: + +```python +# services/rag-service/app/document_store.py + +from haystack.document_stores import PGVectorDocumentStore + +def get_document_store() -> PGVectorDocumentStore: + return PGVectorDocumentStore( + connection_string="postgresql+psycopg2://postgres:postgres@city-db:5432/daarion_city", + embedding_dim=1024, # залежить від моделі ембеддингів + table_name="rag_documents", + search_strategy="approximate", + ) +``` + +**Завдання:** +- [ ] Створити `services/rag-service/` структуру +- [ ] Додати `app/document_store.py` з ініціалізацією +- [ ] Налаштувати підключення до `city-db` + +--- + +## 2. Embedding-модель + +### 2.1. Обрати модель + +- [ ] Вибрати embedding-модель: + - [ ] `BAAI/bge-m3` (multilingual, 1024 dim) + - [ ] `sentence-transformers/all-MiniLM-L12-v2` (легка, 384 dim) + - [ ] `intfloat/multilingual-e5-base` (українська підтримка, 768 dim) + +**Рекомендація:** `BAAI/bge-m3` для кращої підтримки української + +### 2.2. Обгортка під Haystack + +```python +# services/rag-service/app/embedding.py + +from haystack.components.embedders import SentenceTransformersTextEmbedder + +def get_text_embedder(): + return SentenceTransformersTextEmbedder( + model="BAAI/bge-m3", + device="cuda" # або "cpu" + ) +``` + +**Завдання:** +- [ ] Створити `app/embedding.py` +- [ ] Додати конфігурацію моделі через env +- [ ] Тестувати на українському тексті + +--- + +## 3. Ingest-пайплайн: PARSER → RAG + +### 3.1. Функція ingest_document + +- [ ] Створити `services/rag-service/app/ingest_pipeline.py`: + +```python +# services/rag-service/app/ingest_pipeline.py + +from haystack import Pipeline +from haystack.components.preprocessors import DocumentSplitter +from haystack.components.writers import DocumentWriter +from haystack.schema import Document + +from .document_store import get_document_store +from .embedding import get_text_embedder + +# 1) splitter — якщо треба додатково різати текст +splitter = DocumentSplitter( + split_by="sentence", + split_length=8, + split_overlap=1 +) + +embedder = get_text_embedder() +doc_store = get_document_store() +writer = DocumentWriter(document_store=doc_store) + +ingest_pipeline = Pipeline() +ingest_pipeline.add_component("splitter", splitter) +ingest_pipeline.add_component("embedder", embedder) +ingest_pipeline.add_component("writer", writer) + +ingest_pipeline.connect("splitter.documents", "embedder.documents") +ingest_pipeline.connect("embedder.documents", "writer.documents") + + +def ingest_parsed_document(dao_id: str, doc_id: str, parsed_json: dict): + """ + parsed_json — результат PARSER (mode=raw_json або qa_pairs/chunks). + Тут треба перетворити його у список haystack.Document. + """ + blocks = parsed_json.get("blocks", []) + docs = [] + + for b in blocks: + text = b.get("text") or "" + if not text.strip(): + continue + + meta = { + "dao_id": dao_id, + "doc_id": doc_id, + "page": b.get("page"), + "section_type": b.get("type"), + } + + docs.append(Document(content=text, meta=meta)) + + if not docs: + return + + ingest_pipeline.run( + { + "splitter": {"documents": docs} + } + ) +``` + +**Завдання:** +- [ ] Створити ingest pipeline +- [ ] Додати конвертацію ParsedDocument → Haystack Documents +- [ ] Додати обробку chunks mode (якщо PARSER повертає готові chunks) + +### 3.2. Інтеграція з PARSER Service + +- [ ] Додати виклик `parser-service` у DevTools / CrewAI workflow: + - [ ] Завантажити файл + - [ ] Викликати `/ocr/parse?output_mode=raw_json` або `/ocr/parse_chunks` + - [ ] Передати `parsed_json` у `ingest_parsed_document` + +**Завдання:** +- [ ] Створити `services/rag-service/app/parser_client.py` для виклику parser-service +- [ ] Додати endpoint `/rag/ingest` для завантаження документів +- [ ] Інтегрувати з Gateway для команди `/upload_doc` + +--- + +## 4. Query-пайплайн: питання → RAG → LLM + +### 4.1. Retriever + Generator + +```python +# services/rag-service/app/query_pipeline.py + +from haystack import Pipeline +from haystack.components.retrievers import DocumentRetriever +from haystack.components.generators import OpenAIGenerator # або свій LLM через DAGI Router +from .document_store import get_document_store +from .embedding import get_text_embedder + +doc_store = get_document_store() +embedder = get_text_embedder() + +retriever = DocumentRetriever(document_store=doc_store) +# У проді замінити на кастомний generator, що ходить у DAGI Router +generator = OpenAIGenerator( + api_key="DUMMY", + model="gpt-4o-mini" +) + +query_pipeline = Pipeline() +query_pipeline.add_component("embedder", embedder) +query_pipeline.add_component("retriever", retriever) +query_pipeline.add_component("generator", generator) + +query_pipeline.connect("embedder.documents", "retriever.documents") +query_pipeline.connect("retriever.documents", "generator.documents") + + +def answer_query(dao_id: str, question: str): + filters = {"dao_id": [dao_id]} + + result = query_pipeline.run( + { + "embedder": {"texts": [question]}, + "retriever": {"filters": filters}, + "generator": {"prompt": question}, + } + ) + + answer = result["generator"]["replies"][0] + documents = result["retriever"]["documents"] + return answer, documents +``` + +**У реальному стеку:** +- Генератором буде не OpenAI, а DAGI Router (через окремий компонент / кастомний генератор) +- Фільтри по `dao_id`, `roles`, `visibility` будуть інтегровані з RBAC + +**Завдання:** +- [ ] Створити query pipeline +- [ ] Додати кастомний generator для DAGI Router +- [ ] Додати RBAC фільтри +- [ ] Створити endpoint `/rag/query` + +--- + +## 5. Інтеграція з DAGI Router + +### 5.1. Режим `mode=rag_query` + +- [ ] Додати у `router-config.yml` rule: + +```yaml +routing: + - id: rag_query + when: + mode: rag_query + use_provider: llm_local_qwen3_8b # або окремий RAG-provider +``` + +- [ ] Додати handler у `RouterApp`, який: + - До виклику LLM запускає `answer_query(dao_id, question)` + - В prompt LLM додає витягнуті документи як контекст + +**Завдання:** +- [ ] Оновити `router-config.yml` +- [ ] Додати RAG provider в Router +- [ ] Створити handler для `mode=rag_query` + +--- + +## 6. RAG Service (FastAPI) + +### 6.1. Структура сервісу + +- [ ] Створити `services/rag-service/`: + - [ ] `app/main.py` - FastAPI додаток + - [ ] `app/api/endpoints.py` - ендпоінти: + - [ ] `POST /rag/ingest` - інжест документу + - [ ] `POST /rag/query` - запит до RAG + - [ ] `GET /rag/health` - health check + - [ ] `app/schemas.py` - Pydantic моделі + - [ ] `requirements.txt` - залежності (haystack, pgvector, etc.) + - [ ] `Dockerfile` + +### 6.2. Ендпоінти + +```python +# services/rag-service/app/api/endpoints.py + +@router.post("/rag/ingest") +async def ingest_document_endpoint( + doc_id: str, + dao_id: str, + parsed_doc: ParsedDocument # або doc_url для завантаження +): + """Ingest parsed document into RAG""" + # Викликати ingest_parsed_document() + pass + +@router.post("/rag/query") +async def query_endpoint( + dao_id: str, + question: str, + user_id: str +): + """Query RAG and return answer with citations""" + # Викликати answer_query() + # Повернути відповідь + цитати + pass +``` + +--- + +## 7. Інтеграція з DAARWIZZBot / microDAO + +### 7.1. Команди для бота + +- [ ] Додати команди в `gateway-bot/http_api.py`: + - [ ] `/upload_doc` → інжест документу в RAG через PARSER + - [ ] Підтримка завантаження файлів через Telegram + - [ ] Виклик `parser-service` → `rag-service` + - [ ] `/ask_doc` → питання до бази документів DAO + - [ ] Виклик `rag-service` → DAGI Router + - [ ] Відправка відповіді з цитатами + +### 7.2. RBAC + +- [ ] Хто може інжестити документи (`role: admin`, `role: researcher`) +- [ ] Хто може ставити питання до приватних документів +- [ ] Перевірка прав в `microdao/rbac.py` + +--- + +## 8. Тести + +- [ ] Інжест одного PDF (наприклад, "Токеноміка MicroDAO") через PARSER → ingest +- [ ] Питання: + > "Поясни, як працює стейкінг у цьому microDAO." +- [ ] Перевірити, що Haystack знаходить потрібні фрагменти і LLM будує відповідь з цитатами + +**Завдання:** +- [ ] Створити тестові фікстури (PDF документи) +- [ ] E2E тести для ingest → query +- [ ] Тести на RBAC фільтри + +--- + +## Порядок виконання (рекомендований) + +### Фаза 1: Document Store + Embeddings (1-2 дні) +1. Налаштувати pgvector в city-db +2. Створити Haystack DocumentStore +3. Вибрати та налаштувати embedding-модель + +### Фаза 2: Ingest Pipeline (2-3 дні) +1. Створити ingest pipeline +2. Інтегрувати з PARSER Service +3. Створити RAG Service з endpoint `/rag/ingest` + +### Фаза 3: Query Pipeline (2-3 дні) +1. Створити query pipeline +2. Інтегрувати з DAGI Router +3. Додати RBAC фільтри + +### Фаза 4: Інтеграція з ботом (1-2 дні) +1. Додати команди `/upload_doc`, `/ask_doc` +2. Тестування E2E + +**Загальний час:** ~6-10 днів + +--- + +## Залежності + +### Python пакети +- `haystack-ai>=2.0.0` +- `sentence-transformers>=2.2.0` +- `pgvector>=0.2.0` +- `psycopg2-binary>=2.9.0` + +### Системні залежності +- PostgreSQL з pgvector (вже є в city-db) + +--- + +## Посилання + +- [PARSER Agent Documentation](./docs/agents/parser.md) +- [TODO: PARSER Implementation](./TODO-PARSER-RAG.md) +- [Haystack Documentation](https://docs.haystack.deepset.ai/) + diff --git a/services/parser-service/app/api/endpoints.py b/services/parser-service/app/api/endpoints.py index 3a99a00a..005316e8 100644 --- a/services/parser-service/app/api/endpoints.py +++ b/services/parser-service/app/api/endpoints.py @@ -90,12 +90,23 @@ async def parse_document_endpoint( # Parse document from images logger.info(f"Parsing document: {len(images)} page(s), mode: {output_mode}") - parsed_doc = parse_document_from_images( - images=images, - output_mode=output_mode, - doc_id=doc_id or str(uuid.uuid4()), - doc_type=doc_type - ) + # Check if using Ollama (async) or local model (sync) + from app.core.config import settings + if settings.RUNTIME_TYPE == "ollama": + from app.runtime.inference import parse_document_with_ollama + parsed_doc = await parse_document_with_ollama( + images=images, + output_mode=output_mode, + doc_id=doc_id or str(uuid.uuid4()), + doc_type=doc_type + ) + else: + parsed_doc = parse_document_from_images( + images=images, + output_mode=output_mode, + doc_id=doc_id or str(uuid.uuid4()), + doc_type=doc_type + ) # Build response based on output_mode response_data = {"metadata": { diff --git a/services/parser-service/app/core/config.py b/services/parser-service/app/core/config.py index 08c3882b..a6909abc 100644 --- a/services/parser-service/app/core/config.py +++ b/services/parser-service/app/core/config.py @@ -37,9 +37,12 @@ class Settings(BaseSettings): ALLOW_DUMMY_FALLBACK: bool = os.getenv("ALLOW_DUMMY_FALLBACK", "true").lower() == "true" # Runtime - RUNTIME_TYPE: Literal["local", "remote"] = os.getenv("RUNTIME_TYPE", "local") + RUNTIME_TYPE: Literal["local", "remote", "ollama"] = os.getenv("RUNTIME_TYPE", "local") RUNTIME_URL: str = os.getenv("RUNTIME_URL", "http://parser-runtime:11435") + # Ollama configuration (if RUNTIME_TYPE=ollama) + OLLAMA_BASE_URL: str = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") + class Config: env_file = ".env" case_sensitive = True diff --git a/services/parser-service/app/runtime/inference.py b/services/parser-service/app/runtime/inference.py index 8483c34c..ec596672 100644 --- a/services/parser-service/app/runtime/inference.py +++ b/services/parser-service/app/runtime/inference.py @@ -16,11 +16,94 @@ from app.runtime.preprocessing import ( ) from app.runtime.postprocessing import build_parsed_document from app.runtime.model_output_parser import parse_model_output_to_blocks +from app.runtime.ollama_client import ( + call_ollama_vision, parse_ollama_response, OutputMode as OllamaOutputMode +) from app.core.config import settings logger = logging.getLogger(__name__) +async def parse_document_with_ollama( + images: List[Image.Image], + output_mode: Literal["raw_json", "markdown", "qa_pairs", "chunks"] = "raw_json", + doc_id: Optional[str] = None, + doc_type: Literal["pdf", "image"] = "image" +) -> ParsedDocument: + """ + Parse document using Ollama API + + Args: + images: List of PIL Images + output_mode: Output format mode + doc_id: Document ID + doc_type: Document type + + Returns: + ParsedDocument + """ + import io + + # Convert output_mode to Ollama format + ollama_mode_map = { + "raw_json": OllamaOutputMode.raw_json, + "markdown": OllamaOutputMode.markdown, + "qa_pairs": OllamaOutputMode.qa_pairs, + "chunks": OllamaOutputMode.raw_json # Use raw_json for chunks, will be processed later + } + ollama_mode = ollama_mode_map.get(output_mode, OllamaOutputMode.raw_json) + + pages_data = [] + + for idx, image in enumerate(images, start=1): + try: + # Convert image to PNG bytes + buf = io.BytesIO() + image.convert("RGB").save(buf, format="PNG") + png_bytes = buf.getvalue() + + # Call Ollama + ollama_data = await call_ollama_vision(png_bytes, ollama_mode) + raw_text, parsed_json = parse_ollama_response(ollama_data, ollama_mode) + + logger.debug(f"Ollama output for page {idx}: {raw_text[:100]}...") + + # Parse into blocks + if parsed_json and isinstance(parsed_json, dict): + # Use structured JSON if available + blocks = parsed_json.get("blocks", []) + if not blocks: + # Fallback: create block from raw text + blocks = [{ + "type": "paragraph", + "text": raw_text, + "bbox": {"x": 0, "y": 0, "width": image.width, "height": image.height}, + "reading_order": 1 + }] + else: + # Parse plain text output + blocks = parse_model_output_to_blocks(raw_text, image.size, page_num=idx) + + pages_data.append({ + "blocks": blocks, + "width": image.width, + "height": image.height + }) + + logger.info(f"Processed page {idx}/{len(images)} via Ollama") + + except Exception as e: + logger.error(f"Error processing page {idx} with Ollama: {e}", exc_info=True) + continue + + return build_parsed_document( + pages_data=pages_data, + doc_id=doc_id or "parsed-doc", + doc_type=doc_type, + metadata={"model": settings.PARSER_MODEL_NAME, "runtime": "ollama"} + ) + + def parse_document_from_images( images: List[Image.Image], output_mode: Literal["raw_json", "markdown", "qa_pairs", "chunks"] = "raw_json", @@ -44,7 +127,12 @@ def parse_document_from_images( logger.info("Using dummy parser (USE_DUMMY_PARSER=true)") return dummy_parse_document_from_images(images, doc_id, doc_type) - # Try to get model + # Check if using Ollama runtime + if settings.RUNTIME_TYPE == "ollama": + logger.info("Using Ollama runtime") + return await parse_document_with_ollama(images, output_mode, doc_id, doc_type) + + # Try to get local model model = get_model() if model is None: diff --git a/services/parser-service/app/runtime/ollama_client.py b/services/parser-service/app/runtime/ollama_client.py new file mode 100644 index 00000000..f67528bd --- /dev/null +++ b/services/parser-service/app/runtime/ollama_client.py @@ -0,0 +1,127 @@ +""" +Ollama client for dots.ocr model +Alternative runtime using Ollama API +""" + +import base64 +import json +import logging +from typing import Dict, Any, Optional +from enum import Enum + +import httpx + +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class OutputMode(str, Enum): + raw_json = "raw_json" + markdown = "markdown" + qa_pairs = "qa_pairs" + + +def build_prompt(mode: OutputMode) -> str: + """Build prompt for Ollama based on output mode""" + if mode == OutputMode.raw_json: + return ( + "You are a document OCR and layout parser. " + "Extract all text, tables, formulas, and layout into a clean JSON structure with fields like " + "`blocks`, `tables`, `reading_order`, including bounding boxes and page numbers. " + "Respond with JSON only, no explanations." + ) + elif mode == OutputMode.markdown: + return ( + "You are a document OCR and layout parser. " + "Extract the document as Markdown, preserving headings, paragraphs, and tables. " + "Tables should be proper GitHub-flavored Markdown tables. " + "Respond with Markdown as plain text." + ) + elif mode == OutputMode.qa_pairs: + return ( + "You are a document OCR and knowledge extraction assistant. " + "Read the document and output a JSON array of Q&A pairs covering the key information. " + "Each item should be {\"question\": ..., \"answer\": ..., \"page\": ..., \"section\": ...}. " + "Respond with JSON only, no explanations." + ) + return "You are a document OCR assistant. Extract text." + + +async def call_ollama_vision( + image_bytes: bytes, + mode: OutputMode, + model_name: Optional[str] = None +) -> Dict[str, Any]: + """ + Call Ollama vision API with image + + Args: + image_bytes: PNG image bytes + mode: Output mode + model_name: Model name (defaults to PARSER_MODEL_NAME) + + Returns: + Ollama response dictionary + """ + model_name = model_name or settings.PARSER_MODEL_NAME + + # Encode image to base64 + img_b64 = base64.b64encode(image_bytes).decode("ascii") + prompt = build_prompt(mode) + + body = { + "model": model_name, + "prompt": prompt, + "images": [img_b64], + "stream": False, + } + + url = f"{settings.OLLAMA_BASE_URL.rstrip('/')}/api/generate" + + logger.info(f"Calling Ollama: {url}, model: {model_name}, mode: {mode}") + + try: + async with httpx.AsyncClient(timeout=120.0) as client: + resp = await client.post(url, json=body) + resp.raise_for_status() + data = resp.json() + + logger.debug(f"Ollama response: {data.get('response', '')[:100]}...") + return data + + except httpx.HTTPError as e: + logger.error(f"Ollama HTTP error: {e}") + raise RuntimeError(f"Ollama API error: {e}") from e + except Exception as e: + logger.error(f"Ollama error: {e}", exc_info=True) + raise RuntimeError(f"Failed to call Ollama: {e}") from e + + +def parse_ollama_response( + ollama_data: Dict[str, Any], + mode: OutputMode +) -> tuple[str, Optional[Dict[str, Any]]]: + """ + Parse Ollama response + + Args: + ollama_data: Response from Ollama API + mode: Output mode + + Returns: + Tuple of (raw_text, parsed_json) + """ + raw_text = ollama_data.get("response", "").strip() + parsed_json: Optional[Dict[str, Any]] = None + + # Try to parse as JSON for raw_json and qa_pairs modes + if mode in (OutputMode.raw_json, OutputMode.qa_pairs): + try: + parsed_json = json.loads(raw_text) + except (json.JSONDecodeError, ValueError): + logger.warning(f"Failed to parse response as JSON for mode {mode}") + parsed_json = None + + return raw_text, parsed_json + diff --git a/services/parser-service/requirements.txt b/services/parser-service/requirements.txt index 98e75bca..84305172 100644 --- a/services/parser-service/requirements.txt +++ b/services/parser-service/requirements.txt @@ -23,5 +23,5 @@ python-dotenv>=1.0.1 # Testing pytest>=7.4.0 pytest-asyncio>=0.21.0 -httpx>=0.25.0 # For TestClient +httpx>=0.25.0 # For TestClient and Ollama client