microdao-daarion/rag_client.py
Apple 382e661f1f feat: complete RAG pipeline integration (ingest + query + Memory)
Parser Service:
- Add /ocr/ingest endpoint (PARSER → RAG in one call)
- Add RAG_BASE_URL and RAG_TIMEOUT to config
- Add OcrIngestResponse schema
- Create file_converter utility for PDF/image → PNG bytes
- Endpoint accepts file, dao_id, doc_id, user_id
- Automatically parses with dots.ocr and sends to RAG Service

Router Integration:
- Add _handle_rag_query() method in RouterApp
- Combines Memory + RAG → LLM pipeline
- Get Memory context (facts, events, summaries)
- Query RAG Service for documents
- Build prompt with Memory + RAG documents
- Call LLM provider with combined context
- Return answer with citations

Clients:
- Create rag_client.py for Router (query RAG Service)
- Create memory_client.py for Router (get Memory context)

E2E Tests:
- Create e2e_rag_pipeline.sh script for full pipeline test
- Test ingest → query → router query flow
- Add E2E_RAG_README.md with usage examples

Docker:
- Add RAG_SERVICE_URL and MEMORY_SERVICE_URL to router environment
2025-11-16 05:02:14 -08:00
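The Router flow listed in the commit message (Memory context + RAG documents → prompt → LLM → answer with citations) can be sketched in plain Python. This is a minimal illustration, not the actual `_handle_rag_query()` code. The function name `build_rag_prompt`, the Memory context shape (`facts`, `events`, `summaries` as lists of strings), and the per-document keys `doc_id` and `text` are all assumptions. Only the top-level `answer` / `citations` / `documents` keys come from `rag_client.py`.

```python
from typing import Any, Dict, List


def build_rag_prompt(
    question: str,
    memory_context: Dict[str, List[str]],
    rag_result: Dict[str, Any],
    max_docs: int = 5,
) -> str:
    """Hypothetical prompt builder combining Memory context and RAG documents.

    Assumes memory_context has optional "facts", "events", "summaries" lists of
    strings, and that each RAG document is a dict with "doc_id" and "text" keys.
    """
    sections: List[str] = []

    # Memory context: one labelled block per non-empty category
    for key in ("facts", "events", "summaries"):
        items = memory_context.get(key) or []
        if items:
            lines = "\n".join(f"- {item}" for item in items)
            sections.append(f"{key.capitalize()}:\n{lines}")

    # RAG documents, numbered so the LLM can cite them as [1], [2], ...
    docs = rag_result.get("documents") or []
    for i, doc in enumerate(docs[:max_docs], start=1):
        sections.append(f"[{i}] ({doc.get('doc_id', 'unknown')}) {doc.get('text', '')}")

    context = "\n\n".join(sections) if sections else "(no context available)"
    return (
        "Answer the question using only the context below. "
        "Cite documents by their [number].\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\nAnswer:"
    )


# Illustrative inputs only
prompt = build_rag_prompt(
    "What is the quorum for proposals?",
    {"facts": ["User is a DAO member"], "events": [], "summaries": []},
    {"answer": "", "citations": [], "documents": [
        {"doc_id": "charter.pdf", "text": "Quorum is 10% of tokens."},
    ]},
)
print(prompt)
```

Numbering the documents in the prompt is one way to let the LLM's answer be mapped back to `citations`; the real Router may format context differently.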

"""
RAG Service Client for Router
Used to query RAG Service for document retrieval
"""
import os
import logging
from typing import Optional, Dict, Any

import httpx

logger = logging.getLogger(__name__)

# Base URL of the RAG Service; override via the RAG_SERVICE_URL environment variable
RAG_SERVICE_URL = os.getenv("RAG_SERVICE_URL", "http://rag-service:9500")


class RAGClient:
    """Client for RAG Service"""

    def __init__(self, base_url: str = RAG_SERVICE_URL):
        self.base_url = base_url.rstrip("/")
        # Seconds; a single float applies to connect, read, write and pool timeouts
        self.timeout = 30.0

    async def query(
        self,
        dao_id: str,
        question: str,
        top_k: Optional[int] = None,
        user_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Query RAG Service for answer and documents

        Args:
            dao_id: DAO identifier
            question: User question
            top_k: Number of documents to retrieve
            user_id: Optional user identifier

        Returns:
            Dictionary with answer, citations, and documents.
            On any failure, a fallback dictionary with an error answer and
            empty citations/documents is returned instead of raising.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/query",
                    json={
                        "dao_id": dao_id,
                        "question": question,
                        "top_k": top_k,
                        "user_id": user_id
                    }
                )
                response.raise_for_status()
                return response.json()
        except httpx.HTTPError as e:
            # Network errors, timeouts and non-2xx status codes
            logger.error("RAG query failed: %s", e)
        except Exception as e:
            # Anything else, e.g. a response body that is not valid JSON
            logger.error("RAG query error: %s", e, exc_info=True)

        # Fallback with the same shape as a successful response, so callers
        # never have to handle exceptions. The answer text is Ukrainian for
        # "Error while querying the knowledge base."
        return {
            "answer": "Помилка при запиті до бази знань.",
            "citations": [],
            "documents": []
        }


# Global client instance
rag_client = RAGClient()
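`query()` always includes `top_k` and `user_id` in the request body, so an omitted argument is sent as JSON `null`. Whether the RAG Service's `/query` endpoint treats `null` the same as a missing key depends on its request schema, which is not shown here. If it does not, a small helper like the hypothetical `build_query_payload()` below could drop unset optional fields before posting. It uses only the standard library, so it can be checked without a running service.

```python
import json
from typing import Any, Dict, Optional


def build_query_payload(
    dao_id: str,
    question: str,
    top_k: Optional[int] = None,
    user_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build the /query body, omitting unset optional fields."""
    payload: Dict[str, Any] = {"dao_id": dao_id, "question": question}
    # Include optional fields only when provided, so any server-side
    # defaults apply. `is not None` keeps legitimate falsy values like 0.
    if top_k is not None:
        payload["top_k"] = top_k
    if user_id is not None:
        payload["user_id"] = user_id
    return payload


print(json.dumps(build_query_payload("dao-1", "What is the quorum?")))
# {"dao_id": "dao-1", "question": "What is the quorum?"}
print(json.dumps(build_query_payload("dao-1", "What is the quorum?", top_k=3)))
# {"dao_id": "dao-1", "question": "What is the quorum?", "top_k": 3}
```

Inside `query()`, this would replace the inline dictionary, e.g. `json=build_query_payload(dao_id, question, top_k, user_id)`.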