feat: complete RAG pipeline integration (ingest + query + Memory)

Parser Service:
- Add /ocr/ingest endpoint (PARSER → RAG in one call)
- Add RAG_BASE_URL and RAG_TIMEOUT to config
- Add OcrIngestResponse schema
- Create file_converter utility for PDF/image → PNG bytes
- Endpoint accepts file, dao_id, doc_id, user_id
- Automatically parses with dots.ocr and sends to RAG Service

Router Integration:
- Add _handle_rag_query() method in RouterApp
- Combines Memory + RAG → LLM pipeline
- Get Memory context (facts, events, summaries)
- Query RAG Service for documents
- Build prompt with Memory + RAG documents
- Call LLM provider with combined context
- Return answer with citations

Clients:
- Create rag_client.py for Router (query RAG Service)
- Create memory_client.py for Router (get Memory context)

E2E Tests:
- Create e2e_rag_pipeline.sh script for full pipeline test
- Test ingest → query → router query flow
- Add E2E_RAG_README.md with usage examples

Docker:
- Add RAG_SERVICE_URL and MEMORY_SERVICE_URL to router environment
This commit is contained in:
Apple
2025-11-16 05:02:14 -08:00
parent 6d69f901f7
commit 382e661f1f
10 changed files with 719 additions and 1 deletions

View File

@@ -12,6 +12,8 @@ services:
- RBAC_BASE_URL=http://rbac:9200
- DEVTOOLS_BASE_URL=http://devtools:8008
- CREWAI_BASE_URL=http://crewai:9010
- RAG_SERVICE_URL=http://rag-service:9500
- MEMORY_SERVICE_URL=http://memory-service:8000
volumes:
- ./router-config.yml:/app/router-config.yml:ro
- ./logs:/app/logs

87
memory_client.py Normal file
View File

@@ -0,0 +1,87 @@
"""
Memory Service Client for Router
Used to get memory context for RAG queries
"""
import os
import logging
from typing import Optional, Dict, Any
import httpx
logger = logging.getLogger(__name__)
MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000")
class MemoryClient:
    """Async HTTP client for the Memory Service.

    Fetches the pieces of conversational context the router needs
    (user facts, recent agent events, dialog summaries) and degrades
    gracefully to empty collections when the service is unreachable.
    """

    def __init__(self, base_url: str = MEMORY_SERVICE_URL):
        # Normalise once so path joins below never produce "//".
        self.base_url = base_url.rstrip("/")
        self.timeout = 10.0

    async def _get_json(self, client: httpx.AsyncClient, path: str, params: Dict[str, Any]):
        """GET one Memory endpoint; return parsed JSON on HTTP 200, else None."""
        response = await client.get(f"{self.base_url}{path}", params=params)
        return response.json() if response.status_code == 200 else None

    async def get_context(
        self,
        user_id: str,
        agent_id: str,
        team_id: str,
        channel_id: Optional[str] = None,
        limit: int = 10
    ) -> Dict[str, Any]:
        """
        Get memory context for dialogue.

        Returns:
            Dictionary with facts, recent_events, dialog_summaries.
            All three are empty lists if the Memory Service cannot be reached.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                # User facts are returned as a plain JSON list.
                raw_facts = await self._get_json(
                    client,
                    "/facts",
                    {"user_id": user_id, "team_id": team_id, "limit": limit},
                )
                # Recent short-term message events for this agent.
                # NOTE: httpx drops params whose value is None (channel_id).
                raw_events = await self._get_json(
                    client,
                    f"/agents/{agent_id}/memory",
                    {
                        "team_id": team_id,
                        "channel_id": channel_id,
                        "scope": "short_term",
                        "kind": "message",
                        "limit": limit,
                    },
                )
                # Dialog summaries are paginated under an "items" key.
                raw_summaries = await self._get_json(
                    client,
                    "/summaries",
                    {
                        "team_id": team_id,
                        "channel_id": channel_id,
                        "agent_id": agent_id,
                        "limit": 5,
                    },
                )
            return {
                "facts": raw_facts if raw_facts is not None else [],
                "recent_events": (raw_events or {}).get("items", []),
                "dialog_summaries": (raw_summaries or {}).get("items", []),
            }
        except Exception as e:
            # Memory is best-effort context: never let a fetch failure
            # break the caller's request, just log and return empty context.
            logger.warning(f"Memory context fetch failed: {e}")
            return {
                "facts": [],
                "recent_events": [],
                "dialog_summaries": []
            }
# Global client instance
memory_client = MemoryClient()

74
rag_client.py Normal file
View File

@@ -0,0 +1,74 @@
"""
RAG Service Client for Router
Used to query RAG Service for document retrieval
"""
import os
import logging
from typing import Optional, Dict, Any
import httpx
logger = logging.getLogger(__name__)
RAG_SERVICE_URL = os.getenv("RAG_SERVICE_URL", "http://rag-service:9500")
class RAGClient:
    """Async HTTP client for the RAG Service `/query` endpoint.

    On any failure the client logs and returns a user-facing fallback
    payload instead of raising, so the router can always render a reply.
    """

    def __init__(self, base_url: str = RAG_SERVICE_URL):
        # Normalise once so path joins below never produce "//".
        self.base_url = base_url.rstrip("/")
        self.timeout = 30.0

    @staticmethod
    def _fallback() -> Dict[str, Any]:
        """Build a fresh error payload (new dict per call so callers may mutate it)."""
        return {
            "answer": "Помилка при запиті до бази знань.",
            "citations": [],
            "documents": []
        }

    async def query(
        self,
        dao_id: str,
        question: str,
        top_k: Optional[int] = None,
        user_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Query RAG Service for answer and documents.

        Args:
            dao_id: DAO identifier
            question: User question
            top_k: Number of documents to retrieve
            user_id: Optional user identifier

        Returns:
            Dictionary with answer, citations, and documents.
            On error, a fallback dict with an error message and empty lists.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/query",
                    json={
                        "dao_id": dao_id,
                        "question": question,
                        "top_k": top_k,
                        "user_id": user_id
                    }
                )
                response.raise_for_status()
                return response.json()
        except httpx.HTTPError as e:
            # Expected transport/HTTP failures: log compactly, no traceback.
            logger.error(f"RAG query failed: {e}")
            return self._fallback()
        except Exception as e:
            # Unexpected failures: keep the traceback for diagnosis.
            logger.error(f"RAG query error: {e}", exc_info=True)
            return self._fallback()
# Global client instance
rag_client = RAGClient()

View File

@@ -55,6 +55,11 @@ class RouterApp:
async def handle(self, req: RouterRequest) -> RouterResponse:
"""Handle router request with RBAC context injection for chat mode"""
# Special handling for rag_query mode (RAG + Memory → LLM)
if req.mode == "rag_query":
return await self._handle_rag_query(req)
# 1. RBAC injection for microDAO chat
if req.mode == "chat" and req.dao_id and req.user_id:
try:
@@ -127,6 +132,156 @@ class RouterApp:
error=f"Internal error: {str(e)}"
)
async def _handle_rag_query(self, req: RouterRequest) -> RouterResponse:
    """
    Handle RAG query mode: combines Memory + RAG → LLM.

    Flow:
    1. Get Memory context (best-effort; failures are logged and ignored)
    2. Query RAG Service for documents
    3. Build prompt with Memory + RAG
    4. Call LLM provider
    5. Return answer with citations

    Args:
        req: Incoming router request; the question is read from
             req.payload["question"], falling back to req.message.

    Returns:
        RouterResponse with the LLM answer, RAG citations, and
        pipeline metadata; ok=False with an error message on failure.
    """
    from rag_client import rag_client
    from memory_client import memory_client
    logger.info(f"Handling RAG query: dao_id={req.dao_id}, user_id={req.user_id}")
    try:
        # Extract question (payload takes precedence over the raw message)
        question = req.payload.get("question") or req.message
        if not question:
            return RouterResponse(
                ok=False,
                provider_id="router",
                error="Missing 'question' in payload"
            )
        dao_id = req.dao_id or "daarion"
        user_id = req.user_id or "anonymous"
        # 1. Get Memory context — best-effort, never fails the request
        memory_ctx = {}
        try:
            memory_ctx = await memory_client.get_context(
                user_id=user_id,
                agent_id=req.agent or "daarwizz",
                team_id=dao_id,
                channel_id=req.payload.get("channel_id"),
                limit=10
            )
            logger.info(f"Memory context retrieved: {len(memory_ctx.get('facts', []))} facts, {len(memory_ctx.get('recent_events', []))} events")
        except Exception as e:
            logger.warning(f"Memory context fetch failed: {e}")
        # 2. Query RAG Service
        rag_resp = await rag_client.query(
            dao_id=dao_id,
            question=question,
            top_k=5,
            user_id=user_id
        )
        rag_answer = rag_resp.get("answer", "")
        rag_citations = rag_resp.get("citations", [])
        rag_docs = rag_resp.get("documents", [])
        logger.info(f"RAG retrieved {len(rag_docs)} documents, {len(rag_citations)} citations")
        # 3. Build final prompt with Memory + RAG
        system_prompt = (
            "Ти асистент microDAO. Використовуй і особисту пам'ять, і документи DAO.\n"
            "Формуй чітку, структуровану відповідь українською, посилаючись на документи "
            "через індекси [1], [2] тощо, де це доречно.\n\n"
        )
        # Add Memory context
        memory_text = ""
        if memory_ctx.get("facts"):
            facts_summary = ", ".join([
                f"{f.get('fact_key', '')}={f.get('fact_value', '')}"
                for f in memory_ctx["facts"][:5]
            ])
            if facts_summary:
                memory_text += f"Особисті факти: {facts_summary}\n"
        if memory_ctx.get("recent_events"):
            recent = memory_ctx["recent_events"][:3]
            events_summary = "\n".join([
                f"- {e.get('body_text', '')[:100]}"
                for e in recent
            ])
            if events_summary:
                memory_text += f"Останні події:\n{events_summary}\n"
        # Add RAG documents
        docs_text = ""
        for i, citation in enumerate(rag_citations[:5], start=1):
            doc_id = citation.get("doc_id", "unknown")
            page = citation.get("page", 0)
            excerpt = citation.get("excerpt", "")
            docs_text += f"[{i}] (doc_id={doc_id}, page={page}): {excerpt}\n"
        # Build the memory section outside the f-string: embedding the
        # conditional with backslash escapes (\' and \n) inside an f-string
        # expression is a SyntaxError on Python < 3.12 (lifted by PEP 701).
        memory_section = ""
        if memory_text:
            memory_section = "1) Пам'ять (короткий summary):\n" + memory_text + "\n"
        # Build final prompt
        final_prompt = (
            f"{system_prompt}"
            f"{memory_section}"
            f"2) Релевантні документи (витяги):\n{docs_text}\n\n"
            f"Питання користувача:\n{question}\n\n"
            "Відповідь:"
        )
        # 4. Call LLM provider
        provider = self.routing_table.resolve_provider(req)
        logger.info(f"Calling LLM provider: {provider.id}")
        # Create modified request with final prompt
        llm_req = RouterRequest(
            mode="chat",  # Use chat mode for LLM
            agent=req.agent,
            dao_id=req.dao_id,
            source=req.source,
            session_id=req.session_id,
            user_id=req.user_id,
            message=final_prompt,
            payload=req.payload
        )
        llm_response = await provider.call(llm_req)
        if not llm_response.ok:
            return RouterResponse(
                ok=False,
                provider_id="router",
                error=f"LLM call failed: {llm_response.error}"
            )
        # 5. Return response with citations
        return RouterResponse(
            ok=True,
            provider_id=llm_response.provider_id,
            data={
                "text": llm_response.data.get("text", ""),
                "citations": rag_citations
            },
            metadata={
                "memory_used": bool(memory_text),
                "rag_used": True,
                "documents_retrieved": len(rag_docs),
                "citations_count": len(rag_citations)
            },
            error=None
        )
    except Exception as e:
        logger.error(f"RAG query handler error: {e}", exc_info=True)
        return RouterResponse(
            ok=False,
            provider_id="router",
            error=f"RAG query failed: {str(e)}"
        )
def get_provider_info(self):
"""Get info about registered providers"""
return {

View File

@@ -4,14 +4,17 @@ API endpoints for PARSER Service
import logging
import uuid
import json
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, UploadFile, File, HTTPException, Form
from fastapi.responses import JSONResponse
import httpx
from app.schemas import (
ParseRequest, ParseResponse, ParsedDocument, ParsedChunk, QAPair, ChunksResponse
ParseRequest, ParseResponse, ParsedDocument, ParsedChunk, QAPair, ChunksResponse,
OcrIngestResponse
)
from app.core.config import settings
from app.runtime.inference import parse_document_from_images
@@ -22,6 +25,7 @@ from app.runtime.postprocessing import (
build_chunks, build_qa_pairs, build_markdown
)
from app.runtime.qa_builder import build_qa_pairs_via_router
from app.utils.file_converter import pdf_or_image_to_png_bytes
logger = logging.getLogger(__name__)
@@ -242,3 +246,101 @@ async def parse_chunks_endpoint(
dao_id=dao_id
)
@router.post("/ocr/ingest", response_model=OcrIngestResponse)
async def ocr_ingest_endpoint(
    file: UploadFile = File(...),
    dao_id: str = Form(...),
    doc_id: Optional[str] = Form(None),
    user_id: Optional[str] = Form(None)
):
    """
    Parse a document and ingest it into RAG in a single call.

    Flow:
    1. Accept PDF/image file
    2. Parse with dots.ocr (raw_json mode)
    3. Send parsed_json to RAG Service /ingest
    4. Return doc_id + raw_json

    Args:
        file: PDF or image file
        dao_id: DAO identifier (required)
        doc_id: Document identifier (optional, defaults to filename)
        user_id: User identifier (optional)

    Raises:
        HTTPException: 502 if the RAG Service rejects the ingest,
                       500 for any other processing error.
    """
    try:
        # Fall back to the upload's filename (or a random id) when the
        # caller did not supply an explicit document identifier.
        if not doc_id:
            doc_id = file.filename or f"doc-{uuid.uuid4().hex[:8]}"

        file_bytes = await file.read()
        validate_file_size(file_bytes)

        # PDFs expand into one image per page; anything else is treated
        # as a single-page image.
        doc_type = detect_file_type(file_bytes, file.filename)
        images = convert_pdf_to_images(file_bytes) if doc_type == "pdf" else [load_image(file_bytes)]
        pages_count = len(images)
        logger.info(f"Ingesting document: dao_id={dao_id}, doc_id={doc_id}, pages={pages_count}")

        # Run dots.ocr in raw_json mode and serialize the result.
        parsed_doc = parse_document_from_images(
            images=images,
            output_mode="raw_json",
            doc_id=doc_id,
            doc_type=doc_type
        )
        parsed_json = parsed_doc.model_dump(mode="json")

        # user_id is only attached when present so the RAG payload stays minimal.
        ingest_payload = {
            "dao_id": dao_id,
            "doc_id": doc_id,
            "parsed_json": parsed_json,
        }
        if user_id:
            ingest_payload["user_id"] = user_id

        rag_url = f"{settings.RAG_BASE_URL.rstrip('/')}/ingest"
        logger.info(f"Sending to RAG Service: {rag_url}")
        try:
            async with httpx.AsyncClient(timeout=settings.RAG_TIMEOUT) as client:
                rag_response = await client.post(rag_url, json=ingest_payload)
                rag_response.raise_for_status()
                rag_result = rag_response.json()
                logger.info(f"RAG ingest successful: {rag_result.get('doc_count', 0)} documents indexed")
        except httpx.HTTPError as e:
            # Surface downstream RAG failures as a gateway error.
            logger.error(f"RAG ingest failed: {e}")
            raise HTTPException(
                status_code=502,
                detail=f"RAG Service ingest failed: {str(e)}"
            )

        return OcrIngestResponse(
            dao_id=dao_id,
            doc_id=doc_id,
            pages_processed=pages_count,
            rag_ingested=True,
            raw_json=parsed_json
        )
    except HTTPException:
        # Re-raise framework errors untouched (e.g. the 502 above).
        raise
    except Exception as e:
        logger.error(f"Error in ocr_ingest: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Ingest failed: {str(e)}")

View File

@@ -47,6 +47,10 @@ class Settings(BaseSettings):
ROUTER_BASE_URL: str = os.getenv("ROUTER_BASE_URL", "http://router:9102")
ROUTER_TIMEOUT: int = int(os.getenv("ROUTER_TIMEOUT", "60"))
# RAG Service configuration (for ingest pipeline)
RAG_BASE_URL: str = os.getenv("RAG_BASE_URL", "http://rag-service:9500")
RAG_TIMEOUT: int = int(os.getenv("RAG_TIMEOUT", "120"))
class Config:
env_file = ".env"
case_sensitive = True

View File

@@ -141,3 +141,12 @@ class ChunksResponse(BaseModel):
doc_id: str = Field(..., description="Document ID")
dao_id: str = Field(..., description="DAO ID")
class OcrIngestResponse(BaseModel):
    """Response from /ocr/ingest endpoint.

    Returned after a document has been parsed with dots.ocr and its
    parsed JSON forwarded to the RAG Service for indexing.
    """
    # dao_id / doc_id echo the request so callers can correlate responses.
    dao_id: str = Field(..., description="DAO identifier")
    doc_id: str = Field(..., description="Document identifier")
    pages_processed: int = Field(..., description="Number of pages processed")
    rag_ingested: bool = Field(..., description="Whether document was ingested into RAG")
    raw_json: Dict[str, Any] = Field(..., description="Parsed document JSON")

View File

@@ -0,0 +1,59 @@
"""
Helper functions for file conversion (PDF/image → PNG bytes)
"""
import logging
from typing import Tuple, Optional
from io import BytesIO
from PIL import Image
from app.runtime.preprocessing import convert_pdf_to_images, load_image, detect_file_type
logger = logging.getLogger(__name__)
async def pdf_or_image_to_png_bytes(
    filename: Optional[str],
    file_bytes: bytes
) -> Tuple[bytes, int]:
    """
    Convert a PDF or image file to PNG bytes.

    For PDFs only the first page is encoded (multi-page documents are
    processed page-by-page elsewhere); the returned page count still
    reflects the whole document.

    Args:
        filename: Original filename (for type detection)
        file_bytes: File content as bytes

    Returns:
        Tuple of (PNG bytes of the first/only page, number of pages)

    Raises:
        ValueError: If file type is not supported or conversion fails
    """
    def _encode_png(img: Image.Image) -> bytes:
        """Re-encode a PIL image as RGB PNG bytes."""
        buffer = BytesIO()
        img.convert("RGB").save(buffer, format="PNG")
        return buffer.getvalue()

    doc_type = detect_file_type(file_bytes, filename)

    if doc_type != "pdf":
        # Plain image: normalise to PNG, single page.
        return _encode_png(load_image(file_bytes)), 1

    pages = convert_pdf_to_images(file_bytes)
    if not pages:
        raise ValueError("PDF conversion produced no images")
    # Encode only the first page; report the full page count.
    return _encode_png(pages[0]), len(pages)

125
tests/E2E_RAG_README.md Normal file
View File

@@ -0,0 +1,125 @@
# E2E RAG Pipeline Test
End-to-end тест для повного пайплайну: PARSER → RAG → Router (Memory + RAG).
## Підготовка
1. Запустити всі сервіси:
```bash
docker-compose up -d parser-service rag-service router memory-service city-db
```
2. Перевірити, що сервіси працюють:
```bash
curl http://localhost:9400/health # PARSER
curl http://localhost:9500/health # RAG
curl http://localhost:9102/health # Router
curl http://localhost:8000/health # Memory
```
## Тест 1: Ingest Document
```bash
curl -X POST http://localhost:9400/ocr/ingest \
-F "file=@tests/fixtures/parsed_json_example.json" \
-F "dao_id=daarion" \
-F "doc_id=microdao-tokenomics-2025-11"
```
**Очікуваний результат:**
```json
{
"dao_id": "daarion",
"doc_id": "microdao-tokenomics-2025-11",
"pages_processed": 2,
"rag_ingested": true,
"raw_json": { ... }
}
```
## Тест 2: Query RAG Service Directly
```bash
curl -X POST http://localhost:9500/query \
-H "Content-Type: application/json" \
-d '{
"dao_id": "daarion",
"question": "Поясни токеноміку microDAO і роль стейкінгу"
}'
```
**Очікуваний результат:**
```json
{
"answer": "MicroDAO використовує токен μGOV...",
"citations": [
{
"doc_id": "microdao-tokenomics-2025-11",
"page": 1,
"section": "Токеноміка MicroDAO",
"excerpt": "..."
}
],
"documents": [...]
}
```
## Тест 3: Query via Router (Memory + RAG)
```bash
curl -X POST http://localhost:9102/route \
-H "Content-Type: application/json" \
-d '{
"mode": "rag_query",
"dao_id": "daarion",
"user_id": "test-user",
"payload": {
"question": "Поясни токеноміку microDAO і роль стейкінгу"
}
}'
```
**Очікуваний результат:**
```json
{
"ok": true,
"provider_id": "llm_local_qwen3_8b",
"data": {
"text": "Відповідь з урахуванням Memory + RAG...",
"citations": [...]
},
"metadata": {
"memory_used": true,
"rag_used": true,
"documents_retrieved": 5,
"citations_count": 3
}
}
```
## Автоматичний E2E тест
Запустити скрипт:
```bash
./tests/e2e_rag_pipeline.sh
```
Скрипт перевіряє всі три кроки автоматично.
## Troubleshooting
### RAG Service не знаходить документи
- Перевірити, що документ був успішно індексований: `rag_ingested: true`
- Перевірити логи RAG Service: `docker-compose logs rag-service`
- Перевірити, що `dao_id` збігається в ingest та query
### Router повертає помилку
- Перевірити, що `mode="rag_query"` правильно обробляється
- Перевірити логи Router: `docker-compose logs router`
- Перевірити, що RAG та Memory сервіси доступні з Router
### Memory context порожній
- Перевірити, що Memory Service працює
- Перевірити, що `user_id` та `dao_id` правильні
- Memory може бути порожнім для нового користувача (це нормально)

101
tests/e2e_rag_pipeline.sh Executable file
View File

@@ -0,0 +1,101 @@
#!/bin/bash
# E2E test script for RAG pipeline: ingest → query
# Steps:
#   1. Ingest a fixture document via PARSER /ocr/ingest
#   2. Query the RAG Service directly
#   3. Query via the Router with mode="rag_query" (Memory + RAG → LLM)
#
# -e: abort on any command failure; -u: treat unset variables as errors;
# -o pipefail: a failure anywhere in a pipeline fails the pipeline.
set -euo pipefail

# The whole script depends on jq for JSON parsing — fail fast with a
# clear message instead of a confusing parse error mid-run.
command -v jq >/dev/null 2>&1 || { echo "ERROR: jq is required but not installed" >&2; exit 1; }

PARSER_URL="${PARSER_URL:-http://localhost:9400}"
RAG_URL="${RAG_URL:-http://localhost:9500}"
ROUTER_URL="${ROUTER_URL:-http://localhost:9102}"
echo "=== E2E RAG Pipeline Test ==="
echo ""
# Step 1: Ingest document
echo "Step 1: Ingesting document via /ocr/ingest..."
INGEST_RESPONSE=$(curl -s -X POST "${PARSER_URL}/ocr/ingest" \
  -F "file=@tests/fixtures/parsed_json_example.json" \
  -F "dao_id=daarion" \
  -F "doc_id=microdao-tokenomics-2025-11")
echo "Ingest response:"
echo "$INGEST_RESPONSE" | jq '.'
echo ""
DOC_ID=$(echo "$INGEST_RESPONSE" | jq -r '.doc_id')
RAG_INGESTED=$(echo "$INGEST_RESPONSE" | jq -r '.rag_ingested')
if [ "$RAG_INGESTED" != "true" ]; then
    echo "ERROR: Document was not ingested into RAG"
    exit 1
fi
echo "✓ Document ingested: doc_id=${DOC_ID}"
echo ""
# Step 2: Query via RAG Service directly
echo "Step 2: Querying RAG Service directly..."
RAG_QUERY_RESPONSE=$(curl -s -X POST "${RAG_URL}/query" \
  -H "Content-Type: application/json" \
  -d '{
    "dao_id": "daarion",
    "question": "Поясни токеноміку microDAO і роль стейкінгу"
  }')
echo "RAG query response:"
echo "$RAG_QUERY_RESPONSE" | jq '.'
echo ""
ANSWER=$(echo "$RAG_QUERY_RESPONSE" | jq -r '.answer')
CITATIONS_COUNT=$(echo "$RAG_QUERY_RESPONSE" | jq '.citations | length')
if [ -z "$ANSWER" ] || [ "$ANSWER" == "null" ]; then
    echo "ERROR: Empty answer from RAG Service"
    exit 1
fi
# Missing citations is suspicious but not fatal — the answer may still be valid.
if [ "$CITATIONS_COUNT" -eq 0 ]; then
    echo "WARNING: No citations returned"
fi
echo "✓ RAG query successful: answer length=${#ANSWER}, citations=${CITATIONS_COUNT}"
echo ""
# Step 3: Query via Router (mode="rag_query")
echo "Step 3: Querying via Router (mode=rag_query)..."
ROUTER_QUERY_RESPONSE=$(curl -s -X POST "${ROUTER_URL}/route" \
  -H "Content-Type: application/json" \
  -d '{
    "mode": "rag_query",
    "dao_id": "daarion",
    "user_id": "test-user",
    "payload": {
      "question": "Поясни токеноміку microDAO і роль стейкінгу"
    }
  }')
echo "Router query response:"
echo "$ROUTER_QUERY_RESPONSE" | jq '.'
echo ""
ROUTER_OK=$(echo "$ROUTER_QUERY_RESPONSE" | jq -r '.ok')
# Accept either data.text or data.answer; citations may live in data or metadata.
ROUTER_TEXT=$(echo "$ROUTER_QUERY_RESPONSE" | jq -r '.data.text // .data.answer // ""')
ROUTER_CITATIONS=$(echo "$ROUTER_QUERY_RESPONSE" | jq '.data.citations // .metadata.citations // []')
if [ "$ROUTER_OK" != "true" ]; then
    echo "ERROR: Router query failed"
    exit 1
fi
if [ -z "$ROUTER_TEXT" ] || [ "$ROUTER_TEXT" == "null" ]; then
    echo "ERROR: Empty answer from Router"
    exit 1
fi
ROUTER_CITATIONS_COUNT=$(echo "$ROUTER_CITATIONS" | jq 'length')
echo "✓ Router query successful: answer length=${#ROUTER_TEXT}, citations=${ROUTER_CITATIONS_COUNT}"
echo ""
echo "=== E2E Test Complete ==="
echo "All steps passed successfully!"