From 9b86f9a6946409cb573694e8ec7faee2b388f14d Mon Sep 17 00:00:00 2001 From: Apple Date: Sun, 16 Nov 2025 04:41:53 -0800 Subject: [PATCH] feat: implement RAG Service MVP with PARSER + Memory integration RAG Service Implementation: - Create rag-service/ with full structure (config, document_store, embedding, pipelines) - Document Store: PostgreSQL + pgvector via Haystack - Embedding: BAAI/bge-m3 (multilingual, 1024 dim) - Ingest Pipeline: Convert ParsedDocument to Haystack Documents, embed, index - Query Pipeline: Retrieve documents, generate answers via DAGI Router - FastAPI endpoints: /ingest, /query, /health Tests: - Unit tests for ingest and query pipelines - E2E test with example parsed JSON - Test fixtures with real PARSER output example Router Integration: - Add mode='rag_query' routing rule in router-config.yml - Priority 7, uses local_qwen3_8b for RAG queries Docker: - Add rag-service to docker-compose.yml - Configure dependencies (router, city-db) - Add model cache volume Documentation: - Complete README with API examples - Integration guides for PARSER and Router --- docker-compose.yml | 109 +------- router-config.yml | 8 + services/rag-service/Dockerfile | 26 ++ services/rag-service/README.md | 206 +++++++++++++++ services/rag-service/app/__init__.py | 5 + services/rag-service/app/core/__init__.py | 0 services/rag-service/app/core/config.py | 51 ++++ services/rag-service/app/document_store.py | 57 ++++ services/rag-service/app/embedding.py | 52 ++++ services/rag-service/app/ingest_pipeline.py | 191 +++++++++++++ services/rag-service/app/main.py | 105 ++++++++ services/rag-service/app/models.py | 47 ++++ services/rag-service/app/query_pipeline.py | 250 ++++++++++++++++++ services/rag-service/requirements.txt | 10 + services/rag-service/tests/__init__.py | 0 .../tests/fixtures/parsed_json_example.json | 56 ++++ services/rag-service/tests/test_e2e.py | 67 +++++ services/rag-service/tests/test_ingest.py | 82 ++++++ services/rag-service/tests/test_query.py | 
50 ++++ 19 files changed, 1275 insertions(+), 97 deletions(-) create mode 100644 services/rag-service/Dockerfile create mode 100644 services/rag-service/README.md create mode 100644 services/rag-service/app/__init__.py create mode 100644 services/rag-service/app/core/__init__.py create mode 100644 services/rag-service/app/core/config.py create mode 100644 services/rag-service/app/document_store.py create mode 100644 services/rag-service/app/embedding.py create mode 100644 services/rag-service/app/ingest_pipeline.py create mode 100644 services/rag-service/app/main.py create mode 100644 services/rag-service/app/models.py create mode 100644 services/rag-service/app/query_pipeline.py create mode 100644 services/rag-service/requirements.txt create mode 100644 services/rag-service/tests/__init__.py create mode 100644 services/rag-service/tests/fixtures/parsed_json_example.json create mode 100644 services/rag-service/tests/test_e2e.py create mode 100644 services/rag-service/tests/test_ingest.py create mode 100644 services/rag-service/tests/test_query.py diff --git a/docker-compose.yml b/docker-compose.yml index 51814edf..213e9287 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -80,8 +80,6 @@ services: - "9300:9300" environment: - ROUTER_URL=http://router:9102 - - MEMORY_SERVICE_URL=http://memory-service:8000 - - STT_SERVICE_URL=http://stt-service:9000 - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN:-} - DISCORD_BOT_TOKEN=${DISCORD_BOT_TOKEN:-} - DAARWIZZ_NAME=DAARWIZZ @@ -90,8 +88,6 @@ services: - ./logs:/app/logs depends_on: - router - - memory-service - - stt-service networks: - dagi-network restart: unless-stopped @@ -123,114 +119,33 @@ services: timeout: 10s retries: 3 - # DAARION.city Database (PostgreSQL with pgvector) - city-db: - image: pgvector/pgvector:pg16 - container_name: dagi-city-db - ports: - - "5432:5432" - environment: - - POSTGRES_USER=${POSTGRES_USER:-postgres} - - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-postgres} - - 
POSTGRES_DB=${POSTGRES_DB:-daarion_city} - volumes: - - ./data/postgres:/var/lib/postgresql/data - - ./supabase/migrations:/docker-entrypoint-initdb.d - networks: - - dagi-network - restart: unless-stopped - healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-postgres}"] - interval: 10s - timeout: 5s - retries: 5 - - # Memory Service (user_facts, dialog_summaries, agent_memory_events) - memory-service: + # RAG Service + rag-service: build: - context: ./services/memory-service + context: ./services/rag-service dockerfile: Dockerfile - container_name: dagi-memory-service + container_name: dagi-rag-service ports: - - "8000:8000" + - "9500:9500" environment: - - DATABASE_URL=postgresql://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@city-db:5432/${POSTGRES_DB:-daarion_city} - - API_HOST=0.0.0.0 - - API_PORT=8000 + - PG_DSN=${PG_DSN:-postgresql+psycopg2://postgres:postgres@city-db:5432/daarion_city} + - EMBED_MODEL_NAME=${EMBED_MODEL_NAME:-BAAI/bge-m3} + - EMBED_DEVICE=${EMBED_DEVICE:-cpu} + - ROUTER_BASE_URL=http://router:9102 volumes: - - ./services/memory-service:/app - ./logs:/app/logs + - rag-model-cache:/root/.cache/huggingface depends_on: - city-db: - condition: service_healthy + - router networks: - dagi-network restart: unless-stopped healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + test: ["CMD", "curl", "-f", "http://localhost:9500/health"] interval: 30s timeout: 10s retries: 3 - # STT Service (Speech-to-Text using Qwen3 ASR Toolkit) - stt-service: - build: - context: ./services/stt-service - dockerfile: Dockerfile - container_name: dagi-stt-service - ports: - - "9000:9000" - environment: - - DASHSCOPE_API_KEY=${DASHSCOPE_API_KEY:-} - volumes: - - ./logs:/app/logs - networks: - - dagi-network - restart: unless-stopped - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:9000/health"] - interval: 30s - timeout: 10s - retries: 3 - - # PARSER Service (Document OCR using dots.ocr) - parser-service: - 
build: - context: ./services/parser-service - dockerfile: Dockerfile - target: cpu - container_name: dagi-parser-service - ports: - - "9400:9400" - environment: - - PARSER_MODEL_NAME=${PARSER_MODEL_NAME:-rednote-hilab/dots.ocr} - - DOTS_OCR_MODEL_ID=${DOTS_OCR_MODEL_ID:-rednote-hilab/dots.ocr} - - PARSER_DEVICE=${PARSER_DEVICE:-cpu} - - DEVICE=${DEVICE:-cpu} - - RUNTIME_TYPE=${RUNTIME_TYPE:-local} - - USE_DUMMY_PARSER=${USE_DUMMY_PARSER:-false} - - ALLOW_DUMMY_FALLBACK=${ALLOW_DUMMY_FALLBACK:-true} - - OLLAMA_BASE_URL=${OLLAMA_BASE_URL:-http://ollama:11434} - - PARSER_MAX_PAGES=${PARSER_MAX_PAGES:-100} - - MAX_FILE_SIZE_MB=${MAX_FILE_SIZE_MB:-50} - - PDF_DPI=${PDF_DPI:-200} - - IMAGE_MAX_SIZE=${IMAGE_MAX_SIZE:-2048} - volumes: - - parser-model-cache:/root/.cache/huggingface - - ./logs:/app/logs - networks: - - dagi-network - restart: unless-stopped - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:9400/health"] - interval: 30s - timeout: 10s - retries: 3 - -volumes: - parser-model-cache: - driver: local - networks: dagi-network: driver: bridge diff --git a/router-config.yml b/router-config.yml index 1e81c02f..693ad74e 100644 --- a/router-config.yml +++ b/router-config.yml @@ -103,6 +103,14 @@ routing: use_llm: local_qwen3_8b description: "Q&A generation from parsed documents → local LLM" + # RAG Query mode (RAG + Memory → LLM) + - id: rag_query_mode + priority: 7 + when: + mode: rag_query + use_llm: local_qwen3_8b + description: "RAG query with Memory context → local LLM" + # NEW: CrewAI workflow orchestration - id: crew_mode priority: 3 diff --git a/services/rag-service/Dockerfile b/services/rag-service/Dockerfile new file mode 100644 index 00000000..9f5909fa --- /dev/null +++ b/services/rag-service/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + g++ \ + postgresql-client \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements +COPY 
requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY app/ ./app/ + +# Expose port +EXPOSE 9500 + +# Run application +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "9500"] + diff --git a/services/rag-service/README.md b/services/rag-service/README.md new file mode 100644 index 00000000..2759c9c9 --- /dev/null +++ b/services/rag-service/README.md @@ -0,0 +1,206 @@ +# RAG Service + +Retrieval-Augmented Generation service for MicroDAO. Integrates PARSER + Memory + Vector Search. + +## Features + +- **Document Ingestion**: Convert ParsedDocument from PARSER service to vector embeddings +- **Query Pipeline**: Retrieve relevant documents and generate answers using LLM +- **Haystack Integration**: Uses Haystack 2.x with PostgreSQL + pgvector +- **Memory Integration**: Combines RAG results with Memory context + +## Architecture + +``` +PARSER → parsed_json → RAG Service → Vector DB (pgvector) + ↓ +User Query → RAG Service → Retrieve Documents → LLM (DAGI Router) → Answer + Citations +``` + +## Configuration + +### Environment Variables + +```bash +# PostgreSQL +PG_DSN=postgresql+psycopg2://postgres:postgres@city-db:5432/daarion_city + +# Embedding Model +EMBED_MODEL_NAME=BAAI/bge-m3 # or intfloat/multilingual-e5-base +EMBED_DEVICE=cuda # or cpu, mps +EMBED_DIM=1024 # BAAI/bge-m3 = 1024 + +# Document Store +RAG_TABLE_NAME=rag_documents +SEARCH_STRATEGY=approximate + +# LLM Provider +LLM_PROVIDER=router # router, openai, local +ROUTER_BASE_URL=http://router:9102 +``` + +## API Endpoints + +### POST /ingest + +Ingest parsed document from PARSER service. + +**Request:** +```json +{ + "dao_id": "daarion", + "doc_id": "microdao-tokenomics-2025-11", + "parsed_json": { ... 
}, + "user_id": "optional-user-id" +} +``` + +**Response:** +```json +{ + "status": "success", + "doc_count": 15, + "dao_id": "daarion", + "doc_id": "microdao-tokenomics-2025-11" +} +``` + +### POST /query + +Query RAG system for answers. + +**Request:** +```json +{ + "dao_id": "daarion", + "question": "Поясни токеноміку microDAO і роль стейкінгу", + "top_k": 5, + "user_id": "optional-user-id" +} +``` + +**Response:** +```json +{ + "answer": "MicroDAO використовує токен μGOV...", + "citations": [ + { + "doc_id": "microdao-tokenomics-2025-11", + "page": 1, + "section": "Токеноміка MicroDAO", + "excerpt": "MicroDAO використовує токен μGOV..." + } + ], + "documents": [...] +} +``` + +### GET /health + +Health check endpoint. + +## Usage + +### 1. Ingest Document + +After parsing document with PARSER service: + +```bash +curl -X POST http://localhost:9500/ingest \ + -H "Content-Type: application/json" \ + -d '{ + "dao_id": "daarion", + "doc_id": "microdao-tokenomics-2025-11", + "parsed_json": { ... } + }' +``` + +### 2. 
Query RAG + +```bash +curl -X POST http://localhost:9500/query \ + -H "Content-Type: application/json" \ + -d '{ + "dao_id": "daarion", + "question": "Поясни токеноміку microDAO" + }' +``` + +## Integration with PARSER + +After parsing document: + +```python +# In parser-service +parsed_doc = parse_document_from_images(images, output_mode="raw_json") + +# Send to RAG Service +import httpx +async with httpx.AsyncClient() as client: + response = await client.post( + "http://rag-service:9500/ingest", + json={ + "dao_id": "daarion", + "doc_id": parsed_doc.doc_id, + "parsed_json": parsed_doc.model_dump(mode="json") + } + ) +``` + +## Integration with Router + +Router handles `mode="rag_query"`: + +```python +# In Router +if req.mode == "rag_query": + # Call RAG Service + rag_response = await rag_client.query( + dao_id=req.dao_id, + question=req.payload.get("question") + ) + + # Combine with Memory context + memory_context = await memory_client.get_context(...) + + # Build prompt with RAG + Memory + prompt = build_prompt_with_rag_and_memory( + question=req.payload.get("question"), + rag_documents=rag_response["documents"], + memory_context=memory_context + ) + + # Call LLM + answer = await llm_provider.generate(prompt) +``` + +## Development + +### Local Setup + +```bash +# Install dependencies +pip install -r requirements.txt + +# Set environment variables +export PG_DSN="postgresql+psycopg2://postgres:postgres@localhost:5432/daarion_city" +export EMBED_MODEL_NAME="BAAI/bge-m3" +export EMBED_DEVICE="cpu" + +# Run service +uvicorn app.main:app --host 0.0.0.0 --port 9500 --reload +``` + +### Tests + +```bash +pytest tests/ +``` + +## Dependencies + +- **Haystack 2.x**: Document store, embedding, retrieval +- **sentence-transformers**: Embedding models +- **psycopg2**: PostgreSQL connection +- **FastAPI**: API framework + diff --git a/services/rag-service/app/__init__.py b/services/rag-service/app/__init__.py new file mode 100644 index 00000000..52be238b --- /dev/null +++ 
b/services/rag-service/app/__init__.py @@ -0,0 +1,5 @@ +""" +RAG Service - Retrieval-Augmented Generation for MicroDAO +Integrates PARSER + Memory + Vector Search +""" + diff --git a/services/rag-service/app/core/__init__.py b/services/rag-service/app/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/rag-service/app/core/config.py b/services/rag-service/app/core/config.py new file mode 100644 index 00000000..910094d6 --- /dev/null +++ b/services/rag-service/app/core/config.py @@ -0,0 +1,51 @@ +""" +Configuration for RAG Service +""" + +import os +from typing import Literal +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """Application settings""" + + # Service + API_HOST: str = "0.0.0.0" + API_PORT: int = 9500 + + # PostgreSQL + pgvector + PG_DSN: str = os.getenv( + "PG_DSN", + "postgresql+psycopg2://postgres:postgres@city-db:5432/daarion_city" + ) + + # Embedding model + EMBED_MODEL_NAME: str = os.getenv("EMBED_MODEL_NAME", "BAAI/bge-m3") + EMBED_DEVICE: Literal["cuda", "cpu", "mps"] = os.getenv("EMBED_DEVICE", "cpu") + EMBED_DIM: int = int(os.getenv("EMBED_DIM", "1024")) # BAAI/bge-m3 = 1024 + + # Document Store + RAG_TABLE_NAME: str = os.getenv("RAG_TABLE_NAME", "rag_documents") + SEARCH_STRATEGY: Literal["approximate", "exact"] = os.getenv("SEARCH_STRATEGY", "approximate") + + # Chunking + CHUNK_SIZE: int = int(os.getenv("CHUNK_SIZE", "500")) + CHUNK_OVERLAP: int = int(os.getenv("CHUNK_OVERLAP", "50")) + + # Retrieval + TOP_K: int = int(os.getenv("TOP_K", "5")) + + # LLM (for query pipeline) + LLM_PROVIDER: str = os.getenv("LLM_PROVIDER", "router") # router, openai, local + ROUTER_BASE_URL: str = os.getenv("ROUTER_BASE_URL", "http://router:9102") + OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "") + OPENAI_MODEL: str = os.getenv("OPENAI_MODEL", "gpt-4o-mini") + + class Config: + env_file = ".env" + case_sensitive = True + + +settings = Settings() + diff --git 
a/services/rag-service/app/document_store.py b/services/rag-service/app/document_store.py new file mode 100644 index 00000000..2d300869 --- /dev/null +++ b/services/rag-service/app/document_store.py @@ -0,0 +1,57 @@ +""" +Document Store for RAG Service +Uses PostgreSQL + pgvector via Haystack +""" + +import logging +from typing import Optional + +from haystack.document_stores import PGVectorDocumentStore + +from app.core.config import settings + +logger = logging.getLogger(__name__) + +# Global document store instance +_document_store: Optional[PGVectorDocumentStore] = None + + +def get_document_store() -> PGVectorDocumentStore: + """ + Get or create PGVectorDocumentStore instance + + Returns: + PGVectorDocumentStore configured with pgvector + """ + global _document_store + + if _document_store is not None: + return _document_store + + logger.info(f"Initializing PGVectorDocumentStore: table={settings.RAG_TABLE_NAME}") + logger.info(f"Connection: {settings.PG_DSN.split('@')[1] if '@' in settings.PG_DSN else 'hidden'}") + + try: + _document_store = PGVectorDocumentStore( + connection_string=settings.PG_DSN, + embedding_dim=settings.EMBED_DIM, + table_name=settings.RAG_TABLE_NAME, + search_strategy=settings.SEARCH_STRATEGY, + # Additional options + recreate_table=False, # Don't drop existing table + similarity="cosine", # Cosine similarity for embeddings + ) + + logger.info("PGVectorDocumentStore initialized successfully") + return _document_store + + except Exception as e: + logger.error(f"Failed to initialize DocumentStore: {e}", exc_info=True) + raise RuntimeError(f"DocumentStore initialization failed: {e}") from e + + +def reset_document_store(): + """Reset global document store instance (for testing)""" + global _document_store + _document_store = None + diff --git a/services/rag-service/app/embedding.py b/services/rag-service/app/embedding.py new file mode 100644 index 00000000..bd22cb87 --- /dev/null +++ b/services/rag-service/app/embedding.py @@ -0,0 +1,52 @@ 
+""" +Embedding service for RAG +Uses SentenceTransformers via Haystack +""" + +import logging +from typing import Optional + +from haystack.components.embedders import SentenceTransformersTextEmbedder + +from app.core.config import settings + +logger = logging.getLogger(__name__) + +# Global embedder instance +_text_embedder: Optional[SentenceTransformersTextEmbedder] = None + + +def get_text_embedder() -> SentenceTransformersTextEmbedder: + """ + Get or create SentenceTransformersTextEmbedder instance + + Returns: + SentenceTransformersTextEmbedder configured with embedding model + """ + global _text_embedder + + if _text_embedder is not None: + return _text_embedder + + logger.info(f"Loading embedding model: {settings.EMBED_MODEL_NAME}") + logger.info(f"Device: {settings.EMBED_DEVICE}") + + try: + _text_embedder = SentenceTransformersTextEmbedder( + model=settings.EMBED_MODEL_NAME, + device=settings.EMBED_DEVICE, + ) + + logger.info("Text embedder initialized successfully") + return _text_embedder + + except Exception as e: + logger.error(f"Failed to initialize TextEmbedder: {e}", exc_info=True) + raise RuntimeError(f"TextEmbedder initialization failed: {e}") from e + + +def reset_embedder(): + """Reset global embedder instance (for testing)""" + global _text_embedder + _text_embedder = None + diff --git a/services/rag-service/app/ingest_pipeline.py b/services/rag-service/app/ingest_pipeline.py new file mode 100644 index 00000000..f1ca635d --- /dev/null +++ b/services/rag-service/app/ingest_pipeline.py @@ -0,0 +1,191 @@ +""" +Ingest Pipeline: PARSER → RAG +Converts ParsedDocument to Haystack Documents and indexes them +""" + +import logging +from typing import List, Dict, Any, Optional + +from haystack import Pipeline +from haystack.components.preprocessors import DocumentSplitter +from haystack.components.writers import DocumentWriter +from haystack.schema import Document + +from app.document_store import get_document_store +from app.embedding import 
get_text_embedder +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +def ingest_parsed_document( + dao_id: str, + doc_id: str, + parsed_json: Dict[str, Any], + user_id: Optional[str] = None +) -> Dict[str, Any]: + """ + Ingest parsed document from PARSER service into RAG + + Args: + dao_id: DAO identifier + doc_id: Document identifier + parsed_json: ParsedDocument JSON from PARSER service + user_id: Optional user identifier + + Returns: + Dictionary with ingest results (doc_count, status) + """ + logger.info(f"Ingesting document: dao_id={dao_id}, doc_id={doc_id}") + + try: + # Convert parsed_json to Haystack Documents + documents = _parsed_json_to_documents(parsed_json, dao_id, doc_id, user_id) + + if not documents: + logger.warning(f"No documents to ingest for doc_id={doc_id}") + return { + "status": "error", + "message": "No documents to ingest", + "doc_count": 0 + } + + logger.info(f"Converted {len(documents)} blocks to Haystack Documents") + + # Create ingest pipeline + pipeline = _create_ingest_pipeline() + + # Run pipeline + result = pipeline.run({"documents": documents}) + + # Extract results + written_docs = result.get("documents_writer", {}).get("documents_written", 0) + + logger.info(f"Ingested {written_docs} documents for doc_id={doc_id}") + + return { + "status": "success", + "doc_count": written_docs, + "dao_id": dao_id, + "doc_id": doc_id + } + + except Exception as e: + logger.error(f"Failed to ingest document: {e}", exc_info=True) + return { + "status": "error", + "message": str(e), + "doc_count": 0 + } + + +def _parsed_json_to_documents( + parsed_json: Dict[str, Any], + dao_id: str, + doc_id: str, + user_id: Optional[str] = None +) -> List[Document]: + """ + Convert ParsedDocument JSON to Haystack Documents + + Args: + parsed_json: ParsedDocument JSON structure + dao_id: DAO identifier + doc_id: Document identifier + user_id: Optional user identifier + + Returns: + List of Haystack Document objects + """ + documents 
= [] + + # Extract pages from parsed_json + pages = parsed_json.get("pages", []) + + for page_data in pages: + page_num = page_data.get("page_num", 1) + blocks = page_data.get("blocks", []) + + for block in blocks: + # Skip empty blocks + text = block.get("text", "").strip() + if not text: + continue + + # Build metadata (must-have для RAG) + meta = { + "dao_id": dao_id, + "doc_id": doc_id, + "page": page_num, + "block_type": block.get("type", "paragraph"), + "reading_order": block.get("reading_order", 0) + } + + # Add optional fields + if block.get("bbox"): + bbox = block["bbox"] + meta.update({ + "bbox_x": bbox.get("x", 0), + "bbox_y": bbox.get("y", 0), + "bbox_width": bbox.get("width", 0), + "bbox_height": bbox.get("height", 0) + }) + + # Add section if heading + if block.get("type") == "heading": + meta["section"] = text[:100] # First 100 chars as section name + + # Add user_id if provided + if user_id: + meta["user_id"] = user_id + + # Add document-level metadata + if parsed_json.get("metadata"): + meta.update({ + k: v for k, v in parsed_json["metadata"].items() + if k not in ["dao_id"] # Already added + }) + + # Create Haystack Document + doc = Document( + content=text, + meta=meta + ) + + documents.append(doc) + + return documents + + +def _create_ingest_pipeline() -> Pipeline: + """ + Create Haystack ingest pipeline + + Pipeline: DocumentSplitter → Embedder → DocumentWriter + """ + # Get components + embedder = get_text_embedder() + document_store = get_document_store() + + # Create splitter (optional, if chunks are too large) + splitter = DocumentSplitter( + split_by="sentence", + split_length=settings.CHUNK_SIZE, + split_overlap=settings.CHUNK_OVERLAP + ) + + # Create writer + writer = DocumentWriter(document_store) + + # Build pipeline + pipeline = Pipeline() + pipeline.add_component("splitter", splitter) + pipeline.add_component("embedder", embedder) + pipeline.add_component("documents_writer", writer) + + # Connect components + 
pipeline.connect("splitter", "embedder") + pipeline.connect("embedder", "documents_writer") + + return pipeline + diff --git a/services/rag-service/app/main.py b/services/rag-service/app/main.py new file mode 100644 index 00000000..f1286473 --- /dev/null +++ b/services/rag-service/app/main.py @@ -0,0 +1,105 @@ +""" +RAG Service - FastAPI application +Retrieval-Augmented Generation for MicroDAO +""" + +import logging +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware + +from app.models import IngestRequest, IngestResponse, QueryRequest, QueryResponse +from app.ingest_pipeline import ingest_parsed_document +from app.query_pipeline import answer_query + +logger = logging.getLogger(__name__) + +# FastAPI app +app = FastAPI( + title="RAG Service", + description="Retrieval-Augmented Generation service for MicroDAO", + version="1.0.0" +) + +# CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.get("/health") +async def health(): + """Health check endpoint""" + return { + "status": "healthy", + "service": "rag-service", + "version": "1.0.0" + } + + +@app.post("/ingest", response_model=IngestResponse) +async def ingest_endpoint(request: IngestRequest): + """ + Ingest parsed document from PARSER service into RAG + + Body: + - dao_id: DAO identifier + - doc_id: Document identifier + - parsed_json: ParsedDocument JSON from PARSER service + - user_id: Optional user identifier + """ + try: + result = ingest_parsed_document( + dao_id=request.dao_id, + doc_id=request.doc_id, + parsed_json=request.parsed_json, + user_id=request.user_id + ) + + return IngestResponse(**result) + + except Exception as e: + logger.error(f"Ingest endpoint error: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/query", response_model=QueryResponse) +async def query_endpoint(request: QueryRequest): + """ + 
Answer query using RAG pipeline + + Body: + - dao_id: DAO identifier + - question: User question + - top_k: Optional number of documents to retrieve + - user_id: Optional user identifier + """ + try: + result = await answer_query( + dao_id=request.dao_id, + question=request.question, + top_k=request.top_k, + user_id=request.user_id + ) + + return QueryResponse(**result) + + except Exception as e: + logger.error(f"Query endpoint error: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +if __name__ == "__main__": + import uvicorn + from app.core.config import settings + + uvicorn.run( + "app.main:app", + host=settings.API_HOST, + port=settings.API_PORT, + reload=True + ) + diff --git a/services/rag-service/app/models.py b/services/rag-service/app/models.py new file mode 100644 index 00000000..756dbc48 --- /dev/null +++ b/services/rag-service/app/models.py @@ -0,0 +1,47 @@ +""" +Pydantic models for RAG Service API +""" + +from typing import Optional, List, Dict, Any +from pydantic import BaseModel, Field + + +class IngestRequest(BaseModel): + """Request for document ingestion""" + dao_id: str = Field(..., description="DAO identifier") + doc_id: str = Field(..., description="Document identifier") + parsed_json: Dict[str, Any] = Field(..., description="ParsedDocument JSON from PARSER service") + user_id: Optional[str] = Field(None, description="User identifier") + + +class IngestResponse(BaseModel): + """Response from document ingestion""" + status: str = Field(..., description="Status: success or error") + doc_count: int = Field(..., description="Number of documents ingested") + dao_id: str = Field(..., description="DAO identifier") + doc_id: str = Field(..., description="Document identifier") + message: Optional[str] = Field(None, description="Error message if status=error") + + +class QueryRequest(BaseModel): + """Request for RAG query""" + dao_id: str = Field(..., description="DAO identifier") + question: str = Field(..., 
description="User question") + top_k: Optional[int] = Field(None, description="Number of documents to retrieve") + user_id: Optional[str] = Field(None, description="User identifier") + + +class Citation(BaseModel): + """Citation from retrieved document""" + doc_id: str = Field(..., description="Document identifier") + page: int = Field(..., description="Page number") + section: Optional[str] = Field(None, description="Section name") + excerpt: str = Field(..., description="Document excerpt") + + +class QueryResponse(BaseModel): + """Response from RAG query""" + answer: str = Field(..., description="Generated answer") + citations: List[Citation] = Field(..., description="List of citations") + documents: List[Dict[str, Any]] = Field(..., description="Retrieved documents (for debugging)") + diff --git a/services/rag-service/app/query_pipeline.py b/services/rag-service/app/query_pipeline.py new file mode 100644 index 00000000..945aca5b --- /dev/null +++ b/services/rag-service/app/query_pipeline.py @@ -0,0 +1,250 @@ +""" +Query Pipeline: RAG → LLM +Retrieves relevant documents and generates answers +""" + +import logging +from typing import List, Dict, Any, Optional +import httpx + +from haystack import Pipeline +from haystack.components.embedders import SentenceTransformersTextEmbedder +from haystack.components.retrievers import InMemoryEmbeddingRetriever +from haystack.document_stores import PGVectorDocumentStore + +from app.document_store import get_document_store +from app.embedding import get_text_embedder +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +async def answer_query( + dao_id: str, + question: str, + top_k: Optional[int] = None, + user_id: Optional[str] = None +) -> Dict[str, Any]: + """ + Answer query using RAG pipeline + + Args: + dao_id: DAO identifier (for filtering) + question: User question + top_k: Number of documents to retrieve (default from settings) + user_id: Optional user identifier + + Returns: + Dictionary 
with answer, citations, and retrieved documents + """ + logger.info(f"Answering query: dao_id={dao_id}, question={question[:50]}...") + + top_k = top_k or settings.TOP_K + + try: + # Retrieve relevant documents + documents = _retrieve_documents(dao_id, question, top_k) + + if not documents: + logger.warning(f"No documents found for dao_id={dao_id}") + return { + "answer": "На жаль, я не знайшов релевантної інформації в базі знань.", + "citations": [], + "documents": [] + } + + logger.info(f"Retrieved {len(documents)} documents") + + # Generate answer using LLM + answer = await _generate_answer(question, documents, dao_id, user_id) + + # Build citations + citations = _build_citations(documents) + + return { + "answer": answer, + "citations": citations, + "documents": [ + { + "content": doc.content[:200] + "..." if len(doc.content) > 200 else doc.content, + "meta": doc.meta + } + for doc in documents + ] + } + + except Exception as e: + logger.error(f"Failed to answer query: {e}", exc_info=True) + return { + "answer": f"Помилка при обробці запиту: {str(e)}", + "citations": [], + "documents": [] + } + + +def _retrieve_documents( + dao_id: str, + question: str, + top_k: int +) -> List[Any]: + """ + Retrieve relevant documents from DocumentStore + + Args: + dao_id: DAO identifier for filtering + question: Query text + top_k: Number of documents to retrieve + + Returns: + List of Haystack Document objects + """ + # Get components + embedder = get_text_embedder() + document_store = get_document_store() + + # Embed query + embedding_result = embedder.run(question) + query_embedding = embedding_result["embedding"][0] if isinstance(embedding_result["embedding"], list) else embedding_result["embedding"] + + # Retrieve with filters using vector similarity search + filters = {"dao_id": [dao_id]} + + try: + documents = document_store.search( + query_embedding=query_embedding, + filters=filters, + top_k=top_k, + return_embedding=False + ) + except Exception as e: + 
logger.warning(f"Vector search failed: {e}, trying filter_documents") + # Fallback to filter_documents + documents = document_store.filter_documents( + filters=filters, + top_k=top_k, + return_embedding=False + ) + + # If no documents with filter, try without filter (fallback) + if not documents: + logger.warning(f"No documents found with dao_id={dao_id}, trying without filter") + try: + documents = document_store.search( + query_embedding=query_embedding, + filters=None, + top_k=top_k, + return_embedding=False + ) + except Exception: + documents = document_store.filter_documents( + filters=None, + top_k=top_k, + return_embedding=False + ) + + return documents + + +async def _generate_answer( + question: str, + documents: List[Any], + dao_id: str, + user_id: Optional[str] = None +) -> str: + """ + Generate answer using LLM (via DAGI Router or OpenAI) + + Args: + question: User question + documents: Retrieved documents + dao_id: DAO identifier + user_id: Optional user identifier + + Returns: + Generated answer text + """ + # Build context from documents + context = "\n\n".join([ + f"[Документ {idx+1}, сторінка {doc.meta.get('page', '?')}]: {doc.content[:500]}" + for idx, doc in enumerate(documents[:3]) # Limit to first 3 documents + ]) + + # Build prompt + prompt = ( + "Тобі надано контекст з бази знань та питання користувача.\n" + "Відповідай на основі наданого контексту. Якщо в контексті немає відповіді, " + "скажи що не знаєш.\n\n" + f"Контекст:\n{context}\n\n" + f"Питання: {question}\n\n" + "Відповідь:" + ) + + # Call LLM based on provider + if settings.LLM_PROVIDER == "router": + return await _call_router_llm(prompt, dao_id, user_id) + elif settings.LLM_PROVIDER == "openai" and settings.OPENAI_API_KEY: + return await _call_openai_llm(prompt) + else: + # Fallback: simple answer + return f"Знайдено {len(documents)} релевантних документів. Перший фрагмент: {documents[0].content[:200]}..." 
+ + +async def _call_router_llm( + prompt: str, + dao_id: str, + user_id: Optional[str] = None +) -> str: + """Call DAGI Router LLM""" + router_url = f"{settings.ROUTER_BASE_URL.rstrip('/')}/route" + + payload = { + "mode": "chat", + "dao_id": dao_id, + "user_id": user_id or "rag-service", + "payload": { + "message": prompt + } + } + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post(router_url, json=payload) + resp.raise_for_status() + data = resp.json() + + return data.get("data", {}).get("text", "Не вдалося отримати відповідь") + + except Exception as e: + logger.error(f"Router LLM call failed: {e}") + return f"Помилка при виклику LLM: {str(e)}" + + +async def _call_openai_llm(prompt: str) -> str: + """Call OpenAI LLM""" + # TODO: Implement OpenAI client + return "OpenAI integration not yet implemented" + + +def _build_citations(documents: List[Any]) -> List[Dict[str, Any]]: + """ + Build citations from retrieved documents + + Args: + documents: List of Haystack Documents + + Returns: + List of citation dictionaries + """ + citations = [] + + for doc in documents: + meta = doc.meta + citations.append({ + "doc_id": meta.get("doc_id", "unknown"), + "page": meta.get("page", 0), + "section": meta.get("section"), + "excerpt": doc.content[:200] + "..." 
if len(doc.content) > 200 else doc.content + }) + + return citations + diff --git a/services/rag-service/requirements.txt b/services/rag-service/requirements.txt new file mode 100644 index 00000000..0dea21ee --- /dev/null +++ b/services/rag-service/requirements.txt @@ -0,0 +1,10 @@ +fastapi>=0.115.0 +uvicorn[standard]>=0.30.0 +pydantic>=2.0.0 +pydantic-settings>=2.0.0 +haystack-ai>=2.0.0 +sentence-transformers>=2.2.0 +psycopg2-binary>=2.9.0 +httpx>=0.27.0 +python-dotenv>=1.0.0 + diff --git a/services/rag-service/tests/__init__.py b/services/rag-service/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/rag-service/tests/fixtures/parsed_json_example.json b/services/rag-service/tests/fixtures/parsed_json_example.json new file mode 100644 index 00000000..fd900bad --- /dev/null +++ b/services/rag-service/tests/fixtures/parsed_json_example.json @@ -0,0 +1,56 @@ +{ + "doc_id": "microdao-tokenomics-2025-11", + "doc_type": "pdf", + "pages": [ + { + "page_num": 1, + "blocks": [ + { + "type": "heading", + "text": "Токеноміка MicroDAO", + "bbox": {"x": 0, "y": 0, "width": 800, "height": 50}, + "reading_order": 1 + }, + { + "type": "paragraph", + "text": "MicroDAO використовує токен μGOV як ключ доступу до приватних спільнот. Стейкінг μGOV дозволяє отримувати дохід та участь у управлінні.", + "bbox": {"x": 0, "y": 60, "width": 800, "height": 100}, + "reading_order": 2 + }, + { + "type": "paragraph", + "text": "Стейкінг є основним механізмом отримання доходу в MicroDAO. 
Користувачі можуть стейкати токени та отримувати винагороди за участь у спільноті.", + "bbox": {"x": 0, "y": 170, "width": 800, "height": 100}, + "reading_order": 3 + } + ], + "width": 800, + "height": 600 + }, + { + "page_num": 2, + "blocks": [ + { + "type": "heading", + "text": "Роль стейкінгу", + "bbox": {"x": 0, "y": 0, "width": 800, "height": 50}, + "reading_order": 1 + }, + { + "type": "paragraph", + "text": "Стейкінг μGOV токенів дає користувачам право голосу та доступ до приватних каналів спільноти. Мінімальна сума стейкінгу визначається кожною спільнотою окремо.", + "bbox": {"x": 0, "y": 60, "width": 800, "height": 100}, + "reading_order": 2 + } + ], + "width": 800, + "height": 600 + } + ], + "metadata": { + "dao_id": "daarion", + "title": "Токеноміка MicroDAO", + "created_at": "2025-01-15T10:00:00Z" + } +} + diff --git a/services/rag-service/tests/test_e2e.py b/services/rag-service/tests/test_e2e.py new file mode 100644 index 00000000..c26874ad --- /dev/null +++ b/services/rag-service/tests/test_e2e.py @@ -0,0 +1,67 @@ +""" +E2E tests for RAG Service +Tests full ingest → query pipeline +""" + +import pytest +import json +from pathlib import Path +from fastapi.testclient import TestClient + +from app.main import app + +client = TestClient(app) + +# Load example parsed JSON +FIXTURES_DIR = Path(__file__).parent / "fixtures" +EXAMPLE_JSON = json.loads((FIXTURES_DIR / "parsed_json_example.json").read_text()) + + +class TestE2E: + """End-to-end tests""" + + def test_health(self): + """Test health endpoint""" + response = client.get("/health") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "healthy" + assert data["service"] == "rag-service" + + @pytest.mark.skip(reason="Requires database connection") + def test_ingest_then_query(self): + """Test full pipeline: ingest → query""" + # Step 1: Ingest document + ingest_request = { + "dao_id": "daarion", + "doc_id": "microdao-tokenomics-2025-11", + "parsed_json": EXAMPLE_JSON 
+ } + + ingest_response = client.post("/ingest", json=ingest_request) + assert ingest_response.status_code == 200 + ingest_data = ingest_response.json() + assert ingest_data["status"] == "success" + assert ingest_data["doc_count"] > 0 + + # Step 2: Query + query_request = { + "dao_id": "daarion", + "question": "Поясни токеноміку microDAO і роль стейкінгу" + } + + query_response = client.post("/query", json=query_request) + assert query_response.status_code == 200 + query_data = query_response.json() + + assert "answer" in query_data + assert len(query_data["answer"]) > 0 + assert "citations" in query_data + assert len(query_data["citations"]) > 0 + + # Check citation structure + citation = query_data["citations"][0] + assert "doc_id" in citation + assert "page" in citation + assert "excerpt" in citation + diff --git a/services/rag-service/tests/test_ingest.py b/services/rag-service/tests/test_ingest.py new file mode 100644 index 00000000..7863212f --- /dev/null +++ b/services/rag-service/tests/test_ingest.py @@ -0,0 +1,82 @@ +""" +Tests for ingest pipeline +""" + +import pytest +from app.ingest_pipeline import ingest_parsed_document, _parsed_json_to_documents + + +class TestIngestPipeline: + """Tests for document ingestion""" + + def test_parsed_json_to_documents(self): + """Test conversion of parsed JSON to Haystack Documents""" + parsed_json = { + "doc_id": "test-doc", + "doc_type": "pdf", + "pages": [ + { + "page_num": 1, + "blocks": [ + { + "type": "heading", + "text": "Test Document", + "bbox": {"x": 0, "y": 0, "width": 800, "height": 50}, + "reading_order": 1 + }, + { + "type": "paragraph", + "text": "This is test content.", + "bbox": {"x": 0, "y": 60, "width": 800, "height": 100}, + "reading_order": 2 + } + ], + "width": 800, + "height": 600 + } + ], + "metadata": { + "dao_id": "test-dao", + "title": "Test Document" + } + } + + documents = _parsed_json_to_documents( + parsed_json=parsed_json, + dao_id="test-dao", + doc_id="test-doc" + ) + + assert 
len(documents) == 2
+ assert documents[0].content == "Test Document"
+ assert documents[0].meta["dao_id"] == "test-dao"
+ assert documents[0].meta["doc_id"] == "test-doc"
+ assert documents[0].meta["page"] == 1
+ assert documents[0].meta["block_type"] == "heading"
+
+ def test_parsed_json_to_documents_empty_blocks(self):
+ """Test that empty blocks are skipped"""
+ parsed_json = {
+ "doc_id": "test-doc",
+ "pages": [
+ {
+ "page_num": 1,
+ "blocks": [
+ {"type": "paragraph", "text": ""},
+ {"type": "paragraph", "text": " "},
+ {"type": "paragraph", "text": "Valid content"}
+ ]
+ }
+ ],
+ "metadata": {}
+ }
+
+ documents = _parsed_json_to_documents(
+ parsed_json=parsed_json,
+ dao_id="test-dao",
+ doc_id="test-doc"
+ )
+
+ assert len(documents) == 1
+ assert documents[0].content == "Valid content"
+
diff --git a/services/rag-service/tests/test_query.py b/services/rag-service/tests/test_query.py new file mode 100644 index 00000000..0187da16 --- /dev/null +++ b/services/rag-service/tests/test_query.py @@ -0,0 +1,50 @@ +"""
+Tests for query pipeline
+"""
+
+import pytest
+from unittest.mock import AsyncMock, patch, MagicMock
+from app.query_pipeline import answer_query, _build_citations
+
+
+class TestQueryPipeline:
+ """Tests for RAG query pipeline"""
+
+ @pytest.mark.asyncio
+ async def test_answer_query_no_documents(self):
+ """Test query when no documents found"""
+ with patch("app.query_pipeline._retrieve_documents", return_value=[]):
+ result = await answer_query(
+ dao_id="test-dao",
+ question="Test question"
+ )
+
+ assert "answer" in result
+ assert "На жаль, я не знайшов" in result["answer"]
+ assert result["citations"] == []
+
+ @pytest.mark.asyncio
+ async def test_build_citations(self):
+ """Test citation building"""
+ from haystack import Document
+
+ documents = [
+ Document(
+ content="Test content 1",
+ meta={"doc_id": "doc1", "page": 1, "section": "Section 1"}
+ ),
+ Document(
+ content="Test content 2",
+ meta={"doc_id": "doc2", "page": 2}
+ 
) + ] + + citations = _build_citations(documents) + + assert len(citations) == 2 + assert citations[0]["doc_id"] == "doc1" + assert citations[0]["page"] == 1 + assert citations[0]["section"] == "Section 1" + assert citations[1]["doc_id"] == "doc2" + assert citations[1]["page"] == 2 +