feat: add Ollama runtime support and RAG implementation plan

Ollama Runtime:
- Add ollama_client.py for Ollama API integration
- Support for dots-ocr model via Ollama
- Add OLLAMA_BASE_URL configuration
- Update inference.py to support Ollama runtime (RUNTIME_TYPE=ollama)
- Update endpoints to handle async Ollama calls
- Alternative to local transformers model

RAG Implementation Plan:
- Create TODO-RAG.md with detailed Haystack integration plan
- Document Store setup (pgvector)
- Embedding model selection
- Ingest pipeline (PARSER → RAG)
- Query pipeline (RAG → LLM)
- Integration with DAGI Router
- Bot commands (/upload_doc, /ask_doc)
- Testing strategy

Now supports three runtime modes:
1. Local transformers (RUNTIME_TYPE=local)
2. Ollama (RUNTIME_TYPE=ollama)
3. Dummy (USE_DUMMY_PARSER=true)
This commit is contained in:
Apple
2025-11-16 02:56:36 -08:00
parent d56ff3493d
commit 00f9102e50
6 changed files with 607 additions and 9 deletions

369
TODO-RAG.md Normal file
View File

@@ -0,0 +1,369 @@
# TODO — RAG Stack (Haystack + PARSER Agent)
Цей план описує, як побудувати RAG-шар навколо PARSER (dots.ocr) та DAGI Router.
**Статус:** 🟡 Планування
---
## 1. Document Store (pgvector або Qdrant)
### 1.1. Вибір бекенду
- [ ] Обрати бекенд:
- [ ] `Postgres + pgvector` (рекомендовано, якщо в нас уже є Postgres)
- [ ] або `Qdrant` (docker-сервіс)
**Рекомендація:** Використати `pgvector` (вже є в `city-db`)
### 1.2. Ініціалізація Haystack DocumentStore
Приклад для PostgreSQL + pgvector:
```python
# services/rag-service/app/document_store.py
from haystack.document_stores import PGVectorDocumentStore
def get_document_store() -> PGVectorDocumentStore:
return PGVectorDocumentStore(
connection_string="postgresql+psycopg2://postgres:postgres@city-db:5432/daarion_city",
embedding_dim=1024, # залежить від моделі ембеддингів
table_name="rag_documents",
search_strategy="approximate",
)
```
**Завдання:**
- [ ] Створити `services/rag-service/` структуру
- [ ] Додати `app/document_store.py` з ініціалізацією
- [ ] Налаштувати підключення до `city-db`
---
## 2. Embedding-модель
### 2.1. Обрати модель
- [ ] Вибрати embedding-модель:
- [ ] `BAAI/bge-m3` (multilingual, 1024 dim)
- [ ] `sentence-transformers/all-MiniLM-L12-v2` (легка, 384 dim)
- [ ] `intfloat/multilingual-e5-base` (українська підтримка, 768 dim)
**Рекомендація:** `BAAI/bge-m3` для кращої підтримки української
### 2.2. Обгортка під Haystack
```python
# services/rag-service/app/embedding.py
from haystack.components.embedders import SentenceTransformersTextEmbedder
def get_text_embedder():
return SentenceTransformersTextEmbedder(
model="BAAI/bge-m3",
device="cuda" # або "cpu"
)
```
**Завдання:**
- [ ] Створити `app/embedding.py`
- [ ] Додати конфігурацію моделі через env
- [ ] Тестувати на українському тексті
---
## 3. Ingest-пайплайн: PARSER → RAG
### 3.1. Функція ingest_document
- [ ] Створити `services/rag-service/app/ingest_pipeline.py`:
```python
# services/rag-service/app/ingest_pipeline.py
from haystack import Pipeline
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.schema import Document
from .document_store import get_document_store
from .embedding import get_text_embedder
# 1) splitter — якщо треба додатково різати текст
splitter = DocumentSplitter(
split_by="sentence",
split_length=8,
split_overlap=1
)
embedder = get_text_embedder()
doc_store = get_document_store()
writer = DocumentWriter(document_store=doc_store)
ingest_pipeline = Pipeline()
ingest_pipeline.add_component("splitter", splitter)
ingest_pipeline.add_component("embedder", embedder)
ingest_pipeline.add_component("writer", writer)
ingest_pipeline.connect("splitter.documents", "embedder.documents")
ingest_pipeline.connect("embedder.documents", "writer.documents")
def ingest_parsed_document(dao_id: str, doc_id: str, parsed_json: dict):
"""
parsed_json — результат PARSER (mode=raw_json або qa_pairs/chunks).
Тут треба перетворити його у список haystack.Document.
"""
blocks = parsed_json.get("blocks", [])
docs = []
for b in blocks:
text = b.get("text") or ""
if not text.strip():
continue
meta = {
"dao_id": dao_id,
"doc_id": doc_id,
"page": b.get("page"),
"section_type": b.get("type"),
}
docs.append(Document(content=text, meta=meta))
if not docs:
return
ingest_pipeline.run(
{
"splitter": {"documents": docs}
}
)
```
**Завдання:**
- [ ] Створити ingest pipeline
- [ ] Додати конвертацію ParsedDocument → Haystack Documents
- [ ] Додати обробку chunks mode (якщо PARSER повертає готові chunks)
### 3.2. Інтеграція з PARSER Service
- [ ] Додати виклик `parser-service` у DevTools / CrewAI workflow:
- [ ] Завантажити файл
- [ ] Викликати `/ocr/parse?output_mode=raw_json` або `/ocr/parse_chunks`
- [ ] Передати `parsed_json` у `ingest_parsed_document`
**Завдання:**
- [ ] Створити `services/rag-service/app/parser_client.py` для виклику parser-service
- [ ] Додати endpoint `/rag/ingest` для завантаження документів
- [ ] Інтегрувати з Gateway для команди `/upload_doc`
---
## 4. Query-пайплайн: питання → RAG → LLM
### 4.1. Retriever + Generator
```python
# services/rag-service/app/query_pipeline.py
from haystack import Pipeline
from haystack.components.retrievers import DocumentRetriever
from haystack.components.generators import OpenAIGenerator # або свій LLM через DAGI Router
from .document_store import get_document_store
from .embedding import get_text_embedder
doc_store = get_document_store()
embedder = get_text_embedder()
retriever = DocumentRetriever(document_store=doc_store)
# У проді замінити на кастомний generator, що ходить у DAGI Router
generator = OpenAIGenerator(
api_key="DUMMY",
model="gpt-4o-mini"
)
query_pipeline = Pipeline()
query_pipeline.add_component("embedder", embedder)
query_pipeline.add_component("retriever", retriever)
query_pipeline.add_component("generator", generator)
query_pipeline.connect("embedder.documents", "retriever.documents")
query_pipeline.connect("retriever.documents", "generator.documents")
def answer_query(dao_id: str, question: str):
filters = {"dao_id": [dao_id]}
result = query_pipeline.run(
{
"embedder": {"texts": [question]},
"retriever": {"filters": filters},
"generator": {"prompt": question},
}
)
answer = result["generator"]["replies"][0]
documents = result["retriever"]["documents"]
return answer, documents
```
**У реальному стеку:**
- Генератором буде не OpenAI, а DAGI Router (через окремий компонент / кастомний генератор)
- Фільтри по `dao_id`, `roles`, `visibility` будуть інтегровані з RBAC
**Завдання:**
- [ ] Створити query pipeline
- [ ] Додати кастомний generator для DAGI Router
- [ ] Додати RBAC фільтри
- [ ] Створити endpoint `/rag/query`
---
## 5. Інтеграція з DAGI Router
### 5.1. Режим `mode=rag_query`
- [ ] Додати у `router-config.yml` rule:
```yaml
routing:
- id: rag_query
when:
mode: rag_query
use_provider: llm_local_qwen3_8b # або окремий RAG-provider
```
- [ ] Додати handler у `RouterApp`, який:
- До виклику LLM запускає `answer_query(dao_id, question)`
- В prompt LLM додає витягнуті документи як контекст
**Завдання:**
- [ ] Оновити `router-config.yml`
- [ ] Додати RAG provider в Router
- [ ] Створити handler для `mode=rag_query`
---
## 6. RAG Service (FastAPI)
### 6.1. Структура сервісу
- [ ] Створити `services/rag-service/`:
- [ ] `app/main.py` - FastAPI додаток
- [ ] `app/api/endpoints.py` - ендпоінти:
- [ ] `POST /rag/ingest` - інжест документу
- [ ] `POST /rag/query` - запит до RAG
- [ ] `GET /rag/health` - health check
- [ ] `app/schemas.py` - Pydantic моделі
- [ ] `requirements.txt` - залежності (haystack, pgvector, etc.)
- [ ] `Dockerfile`
### 6.2. Ендпоінти
```python
# services/rag-service/app/api/endpoints.py
@router.post("/rag/ingest")
async def ingest_document_endpoint(
doc_id: str,
dao_id: str,
parsed_doc: ParsedDocument # або doc_url для завантаження
):
"""Ingest parsed document into RAG"""
# Викликати ingest_parsed_document()
pass
@router.post("/rag/query")
async def query_endpoint(
dao_id: str,
question: str,
user_id: str
):
"""Query RAG and return answer with citations"""
# Викликати answer_query()
# Повернути відповідь + цитати
pass
```
---
## 7. Інтеграція з DAARWIZZBot / microDAO
### 7.1. Команди для бота
- [ ] Додати команди в `gateway-bot/http_api.py`:
- [ ] `/upload_doc` → інжест документу в RAG через PARSER
- [ ] Підтримка завантаження файлів через Telegram
- [ ] Виклик `parser-service``rag-service`
- [ ] `/ask_doc` → питання до бази документів DAO
- [ ] Виклик `rag-service` → DAGI Router
- [ ] Відправка відповіді з цитатами
### 7.2. RBAC
- [ ] Хто може інжестити документи (`role: admin`, `role: researcher`)
- [ ] Хто може ставити питання до приватних документів
- [ ] Перевірка прав в `microdao/rbac.py`
---
## 8. Тести
- [ ] Інжест одного PDF (наприклад, "Токеноміка MicroDAO") через PARSER → ingest
- [ ] Питання:
> "Поясни, як працює стейкінг у цьому microDAO."
- [ ] Перевірити, що Haystack знаходить потрібні фрагменти і LLM будує відповідь з цитатами
**Завдання:**
- [ ] Створити тестові фікстури (PDF документи)
- [ ] E2E тести для ingest → query
- [ ] Тести на RBAC фільтри
---
## Порядок виконання (рекомендований)
### Фаза 1: Document Store + Embeddings (1-2 дні)
1. Налаштувати pgvector в city-db
2. Створити Haystack DocumentStore
3. Вибрати та налаштувати embedding-модель
### Фаза 2: Ingest Pipeline (2-3 дні)
1. Створити ingest pipeline
2. Інтегрувати з PARSER Service
3. Створити RAG Service з endpoint `/rag/ingest`
### Фаза 3: Query Pipeline (2-3 дні)
1. Створити query pipeline
2. Інтегрувати з DAGI Router
3. Додати RBAC фільтри
### Фаза 4: Інтеграція з ботом (1-2 дні)
1. Додати команди `/upload_doc`, `/ask_doc`
2. Тестування E2E
**Загальний час:** ~6-10 днів
---
## Залежності
### Python пакети
- `haystack-ai>=2.0.0`
- `sentence-transformers>=2.2.0`
- `pgvector>=0.2.0`
- `psycopg2-binary>=2.9.0`
### Системні залежності
- PostgreSQL з pgvector (вже є в city-db)
---
## Посилання
- [PARSER Agent Documentation](./docs/agents/parser.md)
- [TODO: PARSER Implementation](./TODO-PARSER-RAG.md)
- [Haystack Documentation](https://docs.haystack.deepset.ai/)

View File

@@ -90,12 +90,23 @@ async def parse_document_endpoint(
# Parse document from images
logger.info(f"Parsing document: {len(images)} page(s), mode: {output_mode}")
parsed_doc = parse_document_from_images(
images=images,
output_mode=output_mode,
doc_id=doc_id or str(uuid.uuid4()),
doc_type=doc_type
)
# Check if using Ollama (async) or local model (sync)
from app.core.config import settings
if settings.RUNTIME_TYPE == "ollama":
from app.runtime.inference import parse_document_with_ollama
parsed_doc = await parse_document_with_ollama(
images=images,
output_mode=output_mode,
doc_id=doc_id or str(uuid.uuid4()),
doc_type=doc_type
)
else:
parsed_doc = parse_document_from_images(
images=images,
output_mode=output_mode,
doc_id=doc_id or str(uuid.uuid4()),
doc_type=doc_type
)
# Build response based on output_mode
response_data = {"metadata": {

View File

@@ -37,9 +37,12 @@ class Settings(BaseSettings):
ALLOW_DUMMY_FALLBACK: bool = os.getenv("ALLOW_DUMMY_FALLBACK", "true").lower() == "true"
# Runtime
RUNTIME_TYPE: Literal["local", "remote"] = os.getenv("RUNTIME_TYPE", "local")
RUNTIME_TYPE: Literal["local", "remote", "ollama"] = os.getenv("RUNTIME_TYPE", "local")
RUNTIME_URL: str = os.getenv("RUNTIME_URL", "http://parser-runtime:11435")
# Ollama configuration (if RUNTIME_TYPE=ollama)
OLLAMA_BASE_URL: str = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
class Config:
env_file = ".env"
case_sensitive = True

View File

@@ -16,11 +16,94 @@ from app.runtime.preprocessing import (
)
from app.runtime.postprocessing import build_parsed_document
from app.runtime.model_output_parser import parse_model_output_to_blocks
from app.runtime.ollama_client import (
call_ollama_vision, parse_ollama_response, OutputMode as OllamaOutputMode
)
from app.core.config import settings
logger = logging.getLogger(__name__)
async def parse_document_with_ollama(
images: List[Image.Image],
output_mode: Literal["raw_json", "markdown", "qa_pairs", "chunks"] = "raw_json",
doc_id: Optional[str] = None,
doc_type: Literal["pdf", "image"] = "image"
) -> ParsedDocument:
"""
Parse document using Ollama API
Args:
images: List of PIL Images
output_mode: Output format mode
doc_id: Document ID
doc_type: Document type
Returns:
ParsedDocument
"""
import io
# Convert output_mode to Ollama format
ollama_mode_map = {
"raw_json": OllamaOutputMode.raw_json,
"markdown": OllamaOutputMode.markdown,
"qa_pairs": OllamaOutputMode.qa_pairs,
"chunks": OllamaOutputMode.raw_json # Use raw_json for chunks, will be processed later
}
ollama_mode = ollama_mode_map.get(output_mode, OllamaOutputMode.raw_json)
pages_data = []
for idx, image in enumerate(images, start=1):
try:
# Convert image to PNG bytes
buf = io.BytesIO()
image.convert("RGB").save(buf, format="PNG")
png_bytes = buf.getvalue()
# Call Ollama
ollama_data = await call_ollama_vision(png_bytes, ollama_mode)
raw_text, parsed_json = parse_ollama_response(ollama_data, ollama_mode)
logger.debug(f"Ollama output for page {idx}: {raw_text[:100]}...")
# Parse into blocks
if parsed_json and isinstance(parsed_json, dict):
# Use structured JSON if available
blocks = parsed_json.get("blocks", [])
if not blocks:
# Fallback: create block from raw text
blocks = [{
"type": "paragraph",
"text": raw_text,
"bbox": {"x": 0, "y": 0, "width": image.width, "height": image.height},
"reading_order": 1
}]
else:
# Parse plain text output
blocks = parse_model_output_to_blocks(raw_text, image.size, page_num=idx)
pages_data.append({
"blocks": blocks,
"width": image.width,
"height": image.height
})
logger.info(f"Processed page {idx}/{len(images)} via Ollama")
except Exception as e:
logger.error(f"Error processing page {idx} with Ollama: {e}", exc_info=True)
continue
return build_parsed_document(
pages_data=pages_data,
doc_id=doc_id or "parsed-doc",
doc_type=doc_type,
metadata={"model": settings.PARSER_MODEL_NAME, "runtime": "ollama"}
)
def parse_document_from_images(
images: List[Image.Image],
output_mode: Literal["raw_json", "markdown", "qa_pairs", "chunks"] = "raw_json",
@@ -44,7 +127,12 @@ def parse_document_from_images(
logger.info("Using dummy parser (USE_DUMMY_PARSER=true)")
return dummy_parse_document_from_images(images, doc_id, doc_type)
# Try to get model
# Check if using Ollama runtime
if settings.RUNTIME_TYPE == "ollama":
logger.info("Using Ollama runtime")
return await parse_document_with_ollama(images, output_mode, doc_id, doc_type)
# Try to get local model
model = get_model()
if model is None:

View File

@@ -0,0 +1,127 @@
"""
Ollama client for dots.ocr model
Alternative runtime using Ollama API
"""
import base64
import json
import logging
from typing import Dict, Any, Optional
from enum import Enum
import httpx
from app.core.config import settings
logger = logging.getLogger(__name__)
class OutputMode(str, Enum):
raw_json = "raw_json"
markdown = "markdown"
qa_pairs = "qa_pairs"
def build_prompt(mode: OutputMode) -> str:
"""Build prompt for Ollama based on output mode"""
if mode == OutputMode.raw_json:
return (
"You are a document OCR and layout parser. "
"Extract all text, tables, formulas, and layout into a clean JSON structure with fields like "
"`blocks`, `tables`, `reading_order`, including bounding boxes and page numbers. "
"Respond with JSON only, no explanations."
)
elif mode == OutputMode.markdown:
return (
"You are a document OCR and layout parser. "
"Extract the document as Markdown, preserving headings, paragraphs, and tables. "
"Tables should be proper GitHub-flavored Markdown tables. "
"Respond with Markdown as plain text."
)
elif mode == OutputMode.qa_pairs:
return (
"You are a document OCR and knowledge extraction assistant. "
"Read the document and output a JSON array of Q&A pairs covering the key information. "
"Each item should be {\"question\": ..., \"answer\": ..., \"page\": ..., \"section\": ...}. "
"Respond with JSON only, no explanations."
)
return "You are a document OCR assistant. Extract text."
async def call_ollama_vision(
image_bytes: bytes,
mode: OutputMode,
model_name: Optional[str] = None
) -> Dict[str, Any]:
"""
Call Ollama vision API with image
Args:
image_bytes: PNG image bytes
mode: Output mode
model_name: Model name (defaults to PARSER_MODEL_NAME)
Returns:
Ollama response dictionary
"""
model_name = model_name or settings.PARSER_MODEL_NAME
# Encode image to base64
img_b64 = base64.b64encode(image_bytes).decode("ascii")
prompt = build_prompt(mode)
body = {
"model": model_name,
"prompt": prompt,
"images": [img_b64],
"stream": False,
}
url = f"{settings.OLLAMA_BASE_URL.rstrip('/')}/api/generate"
logger.info(f"Calling Ollama: {url}, model: {model_name}, mode: {mode}")
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(url, json=body)
resp.raise_for_status()
data = resp.json()
logger.debug(f"Ollama response: {data.get('response', '')[:100]}...")
return data
except httpx.HTTPError as e:
logger.error(f"Ollama HTTP error: {e}")
raise RuntimeError(f"Ollama API error: {e}") from e
except Exception as e:
logger.error(f"Ollama error: {e}", exc_info=True)
raise RuntimeError(f"Failed to call Ollama: {e}") from e
def parse_ollama_response(
ollama_data: Dict[str, Any],
mode: OutputMode
) -> tuple[str, Optional[Dict[str, Any]]]:
"""
Parse Ollama response
Args:
ollama_data: Response from Ollama API
mode: Output mode
Returns:
Tuple of (raw_text, parsed_json)
"""
raw_text = ollama_data.get("response", "").strip()
parsed_json: Optional[Dict[str, Any]] = None
# Try to parse as JSON for raw_json and qa_pairs modes
if mode in (OutputMode.raw_json, OutputMode.qa_pairs):
try:
parsed_json = json.loads(raw_text)
except (json.JSONDecodeError, ValueError):
logger.warning(f"Failed to parse response as JSON for mode {mode}")
parsed_json = None
return raw_text, parsed_json

View File

@@ -23,5 +23,5 @@ python-dotenv>=1.0.1
# Testing
pytest>=7.4.0
pytest-asyncio>=0.21.0
httpx>=0.25.0 # For TestClient
httpx>=0.25.0 # For TestClient and Ollama client