From 382e661f1f930ed082ea7cbeb30f88be77870357 Mon Sep 17 00:00:00 2001 From: Apple Date: Sun, 16 Nov 2025 05:02:14 -0800 Subject: [PATCH] feat: complete RAG pipeline integration (ingest + query + Memory) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Parser Service: - Add /ocr/ingest endpoint (PARSER → RAG in one call) - Add RAG_BASE_URL and RAG_TIMEOUT to config - Add OcrIngestResponse schema - Create file_converter utility for PDF/image → PNG bytes - Endpoint accepts file, dao_id, doc_id, user_id - Automatically parses with dots.ocr and sends to RAG Service Router Integration: - Add _handle_rag_query() method in RouterApp - Combines Memory + RAG → LLM pipeline - Get Memory context (facts, events, summaries) - Query RAG Service for documents - Build prompt with Memory + RAG documents - Call LLM provider with combined context - Return answer with citations Clients: - Create rag_client.py for Router (query RAG Service) - Create memory_client.py for Router (get Memory context) E2E Tests: - Create e2e_rag_pipeline.sh script for full pipeline test - Test ingest → query → router query flow - Add E2E_RAG_README.md with usage examples Docker: - Add RAG_SERVICE_URL and MEMORY_SERVICE_URL to router environment --- docker-compose.yml | 2 + memory_client.py | 87 ++++++++++ rag_client.py | 74 +++++++++ router_app.py | 155 ++++++++++++++++++ services/parser-service/app/api/endpoints.py | 104 +++++++++++- services/parser-service/app/core/config.py | 4 + services/parser-service/app/schemas.py | 9 + .../app/utils/file_converter.py | 59 +++++++ tests/E2E_RAG_README.md | 125 ++++++++++++++ tests/e2e_rag_pipeline.sh | 101 ++++++++++++ 10 files changed, 719 insertions(+), 1 deletion(-) create mode 100644 memory_client.py create mode 100644 rag_client.py create mode 100644 services/parser-service/app/utils/file_converter.py create mode 100644 tests/E2E_RAG_README.md create mode 100755 tests/e2e_rag_pipeline.sh diff --git a/docker-compose.yml b/docker-compose.yml index 075ddbe9..07f67dd4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,8 @@ services: - RBAC_BASE_URL=http://rbac:9200 - DEVTOOLS_BASE_URL=http://devtools:8008 - CREWAI_BASE_URL=http://crewai:9010 + - RAG_SERVICE_URL=http://rag-service:9500 + - MEMORY_SERVICE_URL=http://memory-service:8000 volumes: - ./router-config.yml:/app/router-config.yml:ro - ./logs:/app/logs diff --git a/memory_client.py b/memory_client.py new file mode 100644 index 00000000..2a73017b --- /dev/null +++ b/memory_client.py @@ -0,0 +1,87 @@ +""" +Memory Service Client for Router +Used to get memory context for RAG queries +""" + +import os +import logging +from typing import Optional, Dict, Any +import httpx + +logger = logging.getLogger(__name__) + +MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000") + + +class MemoryClient: + """Client for Memory Service""" + + def __init__(self, base_url: str = MEMORY_SERVICE_URL): + self.base_url = base_url.rstrip("/") + self.timeout = 10.0 + + async def get_context( + self, + user_id: str, + agent_id: str, + team_id: str, + channel_id: Optional[str] = None, + limit: int = 10 + ) -> Dict[str, Any]: + """ + Get memory context for dialogue + + Returns: + Dictionary with facts, recent_events, dialog_summaries + """ + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + # Get user facts + facts_response = await client.get( + f"{self.base_url}/facts", + params={"user_id": user_id, "team_id": team_id, "limit": limit} + ) + facts = facts_response.json() if facts_response.status_code == 200 else [] + + # Get recent memory events + events_response = await client.get( + f"{self.base_url}/agents/{agent_id}/memory", + params={ + "team_id": team_id, + "channel_id": channel_id, + "scope": "short_term", + "kind": "message", + "limit": limit + } + ) + events = events_response.json().get("items", []) if events_response.status_code == 200 else [] + + # Get dialog summaries + summaries_response = await client.get( + f"{self.base_url}/summaries", + params={ + "team_id": team_id, + "channel_id": channel_id, + "agent_id": agent_id, + "limit": 5 + } + ) + summaries = summaries_response.json().get("items", []) if summaries_response.status_code == 200 else [] + + return { + "facts": facts, + "recent_events": events, + "dialog_summaries": summaries + } + except Exception as e: + logger.warning(f"Memory context fetch failed: {e}") + return { + "facts": [], + "recent_events": [], + "dialog_summaries": [] + } + + +# Global client instance +memory_client = MemoryClient() + diff --git a/rag_client.py b/rag_client.py new file mode 100644 index 00000000..f06897e1 --- /dev/null +++ b/rag_client.py @@ -0,0 +1,74 @@ +""" +RAG Service Client for Router +Used to query RAG Service for document retrieval +""" + +import os +import logging +from typing import Optional, Dict, Any +import httpx + +logger = logging.getLogger(__name__) + +RAG_SERVICE_URL = os.getenv("RAG_SERVICE_URL", "http://rag-service:9500") + + +class RAGClient: + """Client for RAG Service""" + + def __init__(self, base_url: str = RAG_SERVICE_URL): + self.base_url = base_url.rstrip("/") + self.timeout = 30.0 + + async def query( + self, + dao_id: str, + question: str, + top_k: Optional[int] = None, + user_id: Optional[str] = None + ) -> Dict[str, Any]: + """ + Query RAG Service for answer and documents + + Args: + dao_id: DAO identifier + question: User question + top_k: Number of documents to retrieve + user_id: Optional user identifier + + Returns: + Dictionary with answer, citations, and documents + """ + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post( + f"{self.base_url}/query", + json={ + "dao_id": dao_id, + "question": question, + "top_k": top_k, + "user_id": user_id + } + ) + response.raise_for_status() + return response.json() + + except httpx.HTTPError as e: + logger.error(f"RAG query failed: {e}") + return { + "answer": "Помилка при запиті до бази знань.", + "citations": [], + "documents": [] + } + except Exception as e: + logger.error(f"RAG query error: {e}", exc_info=True) + return { + "answer": "Помилка при запиті до бази знань.", + "citations": [], + "documents": [] + } + + +# Global client instance +rag_client = RAGClient() + diff --git a/router_app.py b/router_app.py index 0d9f4098..7aefa871 100644 --- a/router_app.py +++ b/router_app.py @@ -55,6 +55,11 @@ class RouterApp: async def handle(self, req: RouterRequest) -> RouterResponse: """Handle router request with RBAC context injection for chat mode""" + + # Special handling for rag_query mode (RAG + Memory → LLM) + if req.mode == "rag_query": + return await self._handle_rag_query(req) + # 1. RBAC injection for microDAO chat if req.mode == "chat" and req.dao_id and req.user_id: try: @@ -127,6 +132,156 @@ class RouterApp: error=f"Internal error: {str(e)}" ) + async def _handle_rag_query(self, req: RouterRequest) -> RouterResponse: + """ + Handle RAG query mode: combines Memory + RAG → LLM + + Flow: + 1. Get Memory context + 2. Query RAG Service for documents + 3. Build prompt with Memory + RAG + 4. Call LLM provider + 5. Return answer with citations + """ + from rag_client import rag_client + from memory_client import memory_client + + logger.info(f"Handling RAG query: dao_id={req.dao_id}, user_id={req.user_id}") + + try: + # Extract question + question = req.payload.get("question") or req.message + if not question: + return RouterResponse( + ok=False, + provider_id="router", + error="Missing 'question' in payload" + ) + + dao_id = req.dao_id or "daarion" + user_id = req.user_id or "anonymous" + + # 1. Get Memory context + memory_ctx = {} + try: + memory_ctx = await memory_client.get_context( + user_id=user_id, + agent_id=req.agent or "daarwizz", + team_id=dao_id, + channel_id=req.payload.get("channel_id"), + limit=10 + ) + logger.info(f"Memory context retrieved: {len(memory_ctx.get('facts', []))} facts, {len(memory_ctx.get('recent_events', []))} events") + except Exception as e: + logger.warning(f"Memory context fetch failed: {e}") + + # 2. Query RAG Service + rag_resp = await rag_client.query( + dao_id=dao_id, + question=question, + top_k=5, + user_id=user_id + ) + + rag_answer = rag_resp.get("answer", "") + rag_citations = rag_resp.get("citations", []) + rag_docs = rag_resp.get("documents", []) + + logger.info(f"RAG retrieved {len(rag_docs)} documents, {len(rag_citations)} citations") + + # 3. Build final prompt with Memory + RAG + system_prompt = ( + "Ти асистент microDAO. Використовуй і особисту пам'ять, і документи DAO.\n" + "Формуй чітку, структуровану відповідь українською, посилаючись на документи " + "через індекси [1], [2] тощо, де це доречно.\n\n" + ) + + # Add Memory context + memory_text = "" + if memory_ctx.get("facts"): + facts_summary = ", ".join([ + f"{f.get('fact_key', '')}={f.get('fact_value', '')}" + for f in memory_ctx["facts"][:5] + ]) + if facts_summary: + memory_text += f"Особисті факти: {facts_summary}\n" + + if memory_ctx.get("recent_events"): + recent = memory_ctx["recent_events"][:3] + events_summary = "\n".join([ + f"- {e.get('body_text', '')[:100]}" + for e in recent + ]) + if events_summary: + memory_text += f"Останні події:\n{events_summary}\n" + + # Add RAG documents + docs_text = "" + for i, citation in enumerate(rag_citations[:5], start=1): + doc_id = citation.get("doc_id", "unknown") + page = citation.get("page", 0) + excerpt = citation.get("excerpt", "") + docs_text += f"[{i}] (doc_id={doc_id}, page={page}): {excerpt}\n" + + # Build final prompt + final_prompt = ( + f"{system_prompt}" + f"{'1) Пам\'ять (короткий summary):\n' + memory_text + '\n' if memory_text else ''}" + f"2) Релевантні документи (витяги):\n{docs_text}\n\n" + f"Питання користувача:\n{question}\n\n" + "Відповідь:" + ) + + # 4. Call LLM provider + provider = self.routing_table.resolve_provider(req) + logger.info(f"Calling LLM provider: {provider.id}") + + # Create modified request with final prompt + llm_req = RouterRequest( + mode="chat", # Use chat mode for LLM + agent=req.agent, + dao_id=req.dao_id, + source=req.source, + session_id=req.session_id, + user_id=req.user_id, + message=final_prompt, + payload=req.payload + ) + + llm_response = await provider.call(llm_req) + + if not llm_response.ok: + return RouterResponse( + ok=False, + provider_id="router", + error=f"LLM call failed: {llm_response.error}" + ) + + # 5. Return response with citations + return RouterResponse( + ok=True, + provider_id=llm_response.provider_id, + data={ + "text": llm_response.data.get("text", ""), + "citations": rag_citations + }, + metadata={ + "memory_used": bool(memory_text), + "rag_used": True, + "documents_retrieved": len(rag_docs), + "citations_count": len(rag_citations) + }, + error=None + ) + + except Exception as e: + logger.error(f"RAG query handler error: {e}", exc_info=True) + return RouterResponse( + ok=False, + provider_id="router", + error=f"RAG query failed: {str(e)}" + ) + def get_provider_info(self): """Get info about registered providers""" return { diff --git a/services/parser-service/app/api/endpoints.py b/services/parser-service/app/api/endpoints.py index fd9c72a9..9746a56c 100644 --- a/services/parser-service/app/api/endpoints.py +++ b/services/parser-service/app/api/endpoints.py @@ -4,14 +4,17 @@ API endpoints for PARSER Service import logging import uuid +import json from pathlib import Path from typing import Optional from fastapi import APIRouter, UploadFile, File, HTTPException, Form from fastapi.responses import JSONResponse +import httpx from app.schemas import ( - ParseRequest, ParseResponse, ParsedDocument, ParsedChunk, QAPair, ChunksResponse + ParseRequest, ParseResponse, ParsedDocument, ParsedChunk, QAPair, ChunksResponse, + OcrIngestResponse ) from app.core.config import settings from app.runtime.inference import parse_document_from_images @@ -22,6 +25,7 @@ from app.runtime.postprocessing import ( build_chunks, build_qa_pairs, build_markdown ) from app.runtime.qa_builder import build_qa_pairs_via_router +from app.utils.file_converter import pdf_or_image_to_png_bytes logger = logging.getLogger(__name__) @@ -242,3 +246,101 @@ async def parse_chunks_endpoint( dao_id=dao_id ) + +@router.post("/ocr/ingest", response_model=OcrIngestResponse) +async def ocr_ingest_endpoint( + file: UploadFile = File(...), + dao_id: str = Form(...), + doc_id: Optional[str] = Form(None), + user_id: Optional[str] = Form(None) +): + """ + Parse document and ingest into RAG in one call + + Flow: + 1. Accept PDF/image file + 2. Parse with dots.ocr (raw_json mode) + 3. Send parsed_json to RAG Service /ingest + 4. Return doc_id + raw_json + + Args: + file: PDF or image file + dao_id: DAO identifier (required) + doc_id: Document identifier (optional, defaults to filename) + user_id: User identifier (optional) + """ + try: + # Generate doc_id if not provided + if not doc_id: + doc_id = file.filename or f"doc-{uuid.uuid4().hex[:8]}" + + # Read and validate file + content = await file.read() + validate_file_size(content) + + # Detect file type + doc_type = detect_file_type(content, file.filename) + + # Convert to images + if doc_type == "pdf": + images = convert_pdf_to_images(content) + else: + image = load_image(content) + images = [image] + + pages_count = len(images) + logger.info(f"Ingesting document: dao_id={dao_id}, doc_id={doc_id}, pages={pages_count}") + + # Parse document (raw_json mode) + parsed_doc = parse_document_from_images( + images=images, + output_mode="raw_json", + doc_id=doc_id, + doc_type=doc_type + ) + + # Convert to JSON + parsed_json = parsed_doc.model_dump(mode="json") + + # Send to RAG Service + ingest_payload = { + "dao_id": dao_id, + "doc_id": doc_id, + "parsed_json": parsed_json, + } + + if user_id: + ingest_payload["user_id"] = user_id + + rag_url = f"{settings.RAG_BASE_URL.rstrip('/')}/ingest" + logger.info(f"Sending to RAG Service: {rag_url}") + + try: + async with httpx.AsyncClient(timeout=settings.RAG_TIMEOUT) as client: + resp = await client.post(rag_url, json=ingest_payload) + resp.raise_for_status() + rag_result = resp.json() + + logger.info(f"RAG ingest successful: {rag_result.get('doc_count', 0)} documents indexed") + + except httpx.HTTPError as e: + logger.error(f"RAG ingest failed: {e}") + raise HTTPException( + status_code=502, + detail=f"RAG Service ingest failed: {str(e)}" + ) + + return OcrIngestResponse( + dao_id=dao_id, + doc_id=doc_id, + pages_processed=pages_count, + rag_ingested=True, + raw_json=parsed_json + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error in ocr_ingest: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Ingest failed: {str(e)}") + diff --git a/services/parser-service/app/core/config.py b/services/parser-service/app/core/config.py index 2be4d6ab..4cc658f0 100644 --- a/services/parser-service/app/core/config.py +++ b/services/parser-service/app/core/config.py @@ -47,6 +47,10 @@ class Settings(BaseSettings): ROUTER_BASE_URL: str = os.getenv("ROUTER_BASE_URL", "http://router:9102") ROUTER_TIMEOUT: int = int(os.getenv("ROUTER_TIMEOUT", "60")) + # RAG Service configuration (for ingest pipeline) + RAG_BASE_URL: str = os.getenv("RAG_BASE_URL", "http://rag-service:9500") + RAG_TIMEOUT: int = int(os.getenv("RAG_TIMEOUT", "120")) + class Config: env_file = ".env" case_sensitive = True diff --git a/services/parser-service/app/schemas.py b/services/parser-service/app/schemas.py index 8076e31e..c8e0dcf5 100644 --- a/services/parser-service/app/schemas.py +++ b/services/parser-service/app/schemas.py @@ -141,3 +141,12 @@ class ChunksResponse(BaseModel): doc_id: str = Field(..., description="Document ID") dao_id: str = Field(..., description="DAO ID") + +class OcrIngestResponse(BaseModel): + """Response from /ocr/ingest endpoint""" + dao_id: str = Field(..., description="DAO identifier") + doc_id: str = Field(..., description="Document identifier") + pages_processed: int = Field(..., description="Number of pages processed") + rag_ingested: bool = Field(..., description="Whether document was ingested into RAG") + raw_json: Dict[str, Any] = Field(..., description="Parsed document JSON") + diff --git a/services/parser-service/app/utils/file_converter.py b/services/parser-service/app/utils/file_converter.py new file mode 100644 index 00000000..863327a7 --- /dev/null +++ b/services/parser-service/app/utils/file_converter.py @@ -0,0 +1,59 @@ +""" +Helper functions for file conversion (PDF/image → PNG bytes) +""" + +import logging +from typing import Tuple, Optional +from io import BytesIO + +from PIL import Image +from app.runtime.preprocessing import convert_pdf_to_images, load_image, detect_file_type + +logger = logging.getLogger(__name__) + + +async def pdf_or_image_to_png_bytes( + filename: Optional[str], + file_bytes: bytes +) -> Tuple[bytes, int]: + """ + Convert PDF or image file to PNG bytes + + Args: + filename: Original filename (for type detection) + file_bytes: File content as bytes + + Returns: + Tuple of (PNG bytes, number of pages) + + Raises: + ValueError: If file type is not supported or conversion fails + """ + # Detect file type + doc_type = detect_file_type(file_bytes, filename) + + if doc_type == "pdf": + # Convert PDF to images + images = convert_pdf_to_images(file_bytes) + + if not images: + raise ValueError("PDF conversion produced no images") + + # Convert first page to PNG bytes (for single-page processing) + # For multi-page, we'll process all pages separately + first_image = images[0] + buf = BytesIO() + first_image.convert("RGB").save(buf, format="PNG") + png_bytes = buf.getvalue() + + return png_bytes, len(images) + + else: + # Load image and convert to PNG + image = load_image(file_bytes) + buf = BytesIO() + image.convert("RGB").save(buf, format="PNG") + png_bytes = buf.getvalue() + + return png_bytes, 1 + diff --git a/tests/E2E_RAG_README.md b/tests/E2E_RAG_README.md new file mode 100644 index 00000000..f7681ec2 --- /dev/null +++ b/tests/E2E_RAG_README.md @@ -0,0 +1,125 @@ +# E2E RAG Pipeline Test + +End-to-end тест для повного пайплайну: PARSER → RAG → Router (Memory + RAG). + +## Підготовка + +1. Запустити всі сервіси: +```bash +docker-compose up -d parser-service rag-service router memory-service city-db +``` + +2. Перевірити, що сервіси працюють: +```bash +curl http://localhost:9400/health # PARSER +curl http://localhost:9500/health # RAG +curl http://localhost:9102/health # Router +curl http://localhost:8000/health # Memory +``` + +## Тест 1: Ingest Document + +```bash +curl -X POST http://localhost:9400/ocr/ingest \ + -F "file=@tests/fixtures/parsed_json_example.json" \ + -F "dao_id=daarion" \ + -F "doc_id=microdao-tokenomics-2025-11" +``` + +**Очікуваний результат:** +```json +{ + "dao_id": "daarion", + "doc_id": "microdao-tokenomics-2025-11", + "pages_processed": 2, + "rag_ingested": true, + "raw_json": { ... } +} +``` + +## Тест 2: Query RAG Service Directly + +```bash +curl -X POST http://localhost:9500/query \ + -H "Content-Type: application/json" \ + -d '{ + "dao_id": "daarion", + "question": "Поясни токеноміку microDAO і роль стейкінгу" + }' +``` + +**Очікуваний результат:** +```json +{ + "answer": "MicroDAO використовує токен μGOV...", + "citations": [ + { + "doc_id": "microdao-tokenomics-2025-11", + "page": 1, + "section": "Токеноміка MicroDAO", + "excerpt": "..." + } + ], + "documents": [...] +} +``` + +## Тест 3: Query via Router (Memory + RAG) + +```bash +curl -X POST http://localhost:9102/route \ + -H "Content-Type: application/json" \ + -d '{ + "mode": "rag_query", + "dao_id": "daarion", + "user_id": "test-user", + "payload": { + "question": "Поясни токеноміку microDAO і роль стейкінгу" + } + }' +``` + +**Очікуваний результат:** +```json +{ + "ok": true, + "provider_id": "llm_local_qwen3_8b", + "data": { + "text": "Відповідь з урахуванням Memory + RAG...", + "citations": [...] + }, + "metadata": { + "memory_used": true, + "rag_used": true, + "documents_retrieved": 5, + "citations_count": 3 + } +} +``` + +## Автоматичний E2E тест + +Запустити скрипт: +```bash +./tests/e2e_rag_pipeline.sh +``` + +Скрипт перевіряє всі три кроки автоматично. + +## Troubleshooting + +### RAG Service не знаходить документи +- Перевірити, що документ був успішно індексований: `rag_ingested: true` +- Перевірити логі RAG Service: `docker-compose logs rag-service` +- Перевірити, що `dao_id` збігається в ingest та query + +### Router повертає помилку +- Перевірити, що `mode="rag_query"` правильно обробляється +- Перевірити логі Router: `docker-compose logs router` +- Перевірити, що RAG та Memory сервіси доступні з Router + +### Memory context порожній +- Перевірити, що Memory Service працює +- Перевірити, що `user_id` та `dao_id` правильні +- Memory може бути порожнім для нового користувача (це нормально) + diff --git a/tests/e2e_rag_pipeline.sh b/tests/e2e_rag_pipeline.sh new file mode 100755 index 00000000..5757a627 --- /dev/null +++ b/tests/e2e_rag_pipeline.sh @@ -0,0 +1,101 @@ +#!/bin/bash +# E2E test script for RAG pipeline: ingest → query + +set -e + +PARSER_URL="${PARSER_URL:-http://localhost:9400}" +RAG_URL="${RAG_URL:-http://localhost:9500}" +ROUTER_URL="${ROUTER_URL:-http://localhost:9102}" + +echo "=== E2E RAG Pipeline Test ===" +echo "" + +# Step 1: Ingest document +echo "Step 1: Ingesting document via /ocr/ingest..." +INGEST_RESPONSE=$(curl -s -X POST "${PARSER_URL}/ocr/ingest" \ + -F "file=@tests/fixtures/parsed_json_example.json" \ + -F "dao_id=daarion" \ + -F "doc_id=microdao-tokenomics-2025-11") + +echo "Ingest response:" +echo "$INGEST_RESPONSE" | jq '.' +echo "" + +DOC_ID=$(echo "$INGEST_RESPONSE" | jq -r '.doc_id') +RAG_INGESTED=$(echo "$INGEST_RESPONSE" | jq -r '.rag_ingested') + +if [ "$RAG_INGESTED" != "true" ]; then + echo "ERROR: Document was not ingested into RAG" + exit 1 +fi + +echo "✓ Document ingested: doc_id=${DOC_ID}" +echo "" + +# Step 2: Query via RAG Service directly +echo "Step 2: Querying RAG Service directly..." +RAG_QUERY_RESPONSE=$(curl -s -X POST "${RAG_URL}/query" \ + -H "Content-Type: application/json" \ + -d '{ + "dao_id": "daarion", + "question": "Поясни токеноміку microDAO і роль стейкінгу" + }') + +echo "RAG query response:" +echo "$RAG_QUERY_RESPONSE" | jq '.' +echo "" + +ANSWER=$(echo "$RAG_QUERY_RESPONSE" | jq -r '.answer') +CITATIONS_COUNT=$(echo "$RAG_QUERY_RESPONSE" | jq '.citations | length') + +if [ -z "$ANSWER" ] || [ "$ANSWER" == "null" ]; then + echo "ERROR: Empty answer from RAG Service" + exit 1 +fi + +if [ "$CITATIONS_COUNT" -eq 0 ]; then + echo "WARNING: No citations returned" +fi + +echo "✓ RAG query successful: answer length=${#ANSWER}, citations=${CITATIONS_COUNT}" +echo "" + +# Step 3: Query via Router (mode="rag_query") +echo "Step 3: Querying via Router (mode=rag_query)..." +ROUTER_QUERY_RESPONSE=$(curl -s -X POST "${ROUTER_URL}/route" \ + -H "Content-Type: application/json" \ + -d '{ + "mode": "rag_query", + "dao_id": "daarion", + "user_id": "test-user", + "payload": { + "question": "Поясни токеноміку microDAO і роль стейкінгу" + } + }') + +echo "Router query response:" +echo "$ROUTER_QUERY_RESPONSE" | jq '.' +echo "" + +ROUTER_OK=$(echo "$ROUTER_QUERY_RESPONSE" | jq -r '.ok') +ROUTER_TEXT=$(echo "$ROUTER_QUERY_RESPONSE" | jq -r '.data.text // .data.answer // ""') +ROUTER_CITATIONS=$(echo "$ROUTER_QUERY_RESPONSE" | jq '.data.citations // .metadata.citations // []') + +if [ "$ROUTER_OK" != "true" ]; then + echo "ERROR: Router query failed" + exit 1 +fi + +if [ -z "$ROUTER_TEXT" ] || [ "$ROUTER_TEXT" == "null" ]; then + echo "ERROR: Empty answer from Router" + exit 1 +fi + +ROUTER_CITATIONS_COUNT=$(echo "$ROUTER_CITATIONS" | jq 'length') + +echo "✓ Router query successful: answer length=${#ROUTER_TEXT}, citations=${ROUTER_CITATIONS_COUNT}" +echo "" + +echo "=== E2E Test Complete ===" +echo "All steps passed successfully!" +