From 1ed11811058bdc665e5953fcf0df83288a3e0ea5 Mon Sep 17 00:00:00 2001 From: Apple Date: Sun, 16 Nov 2025 05:12:19 -0800 Subject: [PATCH] feat: add RAG quality metrics, optimized prompts, and evaluation tools Optimized Prompts: - Create utils/rag_prompt_builder.py with citation-optimized prompts - Specialized for DAO tokenomics and technical documentation - Proper citation format [1], [2] with doc_id, page, section - Memory context integration (facts, events, summaries) - Token count estimation RAG Service Metrics: - Add comprehensive logging in query_pipeline.py - Log: question, doc_ids, scores, retrieval method, timing - Track: retrieval_time, total_query_time, documents_found, citations_count - Add metrics in ingest_pipeline.py: pages_processed, blocks_processed, pipeline_time Router Improvements: - Use optimized prompt builder in _handle_rag_query() - Add graceful fallback: if RAG unavailable, use Memory only - Log prompt token count, RAG usage, Memory usage - Return detailed metadata (rag_used, memory_used, citations_count, metrics) Evaluation Tools: - Create tests/rag_eval.py for systematic quality testing - Test fixed questions with expected doc_ids - Save results to JSON and CSV - Compare RAG Service vs Router results - Track: citations, expected docs found, query times Documentation: - Create docs/RAG_METRICS_PLAN.md - Plan for Prometheus metrics collection - Grafana dashboard panels and alerts - Implementation guide for metrics --- docs/RAG_METRICS_PLAN.md | 225 +++++++++++++++++++ router_app.py | 77 +++---- services/rag-service/app/ingest_pipeline.py | 38 +++- services/rag-service/app/query_pipeline.py | 80 ++++++- tests/rag_eval.py | 237 ++++++++++++++++++++ utils/rag_prompt_builder.py | 169 ++++++++++++++ 6 files changed, 769 insertions(+), 57 deletions(-) create mode 100644 docs/RAG_METRICS_PLAN.md create mode 100755 tests/rag_eval.py create mode 100644 utils/rag_prompt_builder.py diff --git a/docs/RAG_METRICS_PLAN.md b/docs/RAG_METRICS_PLAN.md new file mode 100644 index 00000000..08bf5412 --- /dev/null +++ b/docs/RAG_METRICS_PLAN.md @@ -0,0 +1,225 @@ +# RAG Metrics & Dashboard Plan + +План збору метрик та створення дашборду для RAG + Memory стеку. + +--- + +## 1. Метрики для збору + +### 1.1. RAG Service Metrics + +**Ingest Metrics:** +- `rag_ingest_total` - загальна кількість ingest операцій +- `rag_ingest_duration_seconds` - час ingest (histogram) +- `rag_ingest_documents_indexed` - кількість індексованих документів +- `rag_ingest_pages_processed` - кількість оброблених сторінок +- `rag_ingest_errors_total` - кількість помилок ingest + +**Query Metrics:** +- `rag_query_total` - загальна кількість запитів +- `rag_query_duration_seconds` - час query (histogram) +- `rag_query_documents_retrieved` - кількість знайдених документів +- `rag_query_citations_count` - кількість citations +- `rag_query_embedding_time_seconds` - час embedding +- `rag_query_retrieval_time_seconds` - час retrieval +- `rag_query_llm_time_seconds` - час LLM генерації +- `rag_query_errors_total` - кількість помилок query +- `rag_query_empty_results_total` - запити без результатів + +**Quality Metrics:** +- `rag_query_dao_filter_applied` - застосування dao_id фільтра +- `rag_query_doc_ids_found` - унікальні doc_ids в результатах + +### 1.2. Router Metrics (RAG Query Mode) + +- `router_rag_query_total` - загальна кількість rag_query запитів +- `router_rag_query_duration_seconds` - загальний час обробки +- `router_rag_query_memory_used` - використання Memory +- `router_rag_query_rag_used` - використання RAG +- `router_rag_query_prompt_tokens_estimated` - оцінка токенів промпту +- `router_rag_query_fallback_total` - fallback на Memory only + +### 1.3. Memory Service Metrics + +- `memory_context_fetch_total` - кількість викликів get_context +- `memory_context_fetch_duration_seconds` - час отримання контексту +- `memory_context_facts_count` - кількість facts +- `memory_context_events_count` - кількість events +- `memory_context_summaries_count` - кількість summaries + +--- + +## 2. Де збирати метрики + +### 2.1. RAG Service + +**Файл:** `services/rag-service/app/metrics.py` + +```python +from prometheus_client import Counter, Histogram, Gauge + +# Ingest metrics +ingest_total = Counter('rag_ingest_total', 'Total ingest operations') +ingest_duration = Histogram('rag_ingest_duration_seconds', 'Ingest duration') +ingest_documents = Counter('rag_ingest_documents_indexed', 'Documents indexed') +ingest_errors = Counter('rag_ingest_errors_total', 'Ingest errors') + +# Query metrics +query_total = Counter('rag_query_total', 'Total queries') +query_duration = Histogram('rag_query_duration_seconds', 'Query duration') +query_documents = Histogram('rag_query_documents_retrieved', 'Documents retrieved') +query_citations = Histogram('rag_query_citations_count', 'Citations count') +query_errors = Counter('rag_query_errors_total', 'Query errors') +query_empty = Counter('rag_query_empty_results_total', 'Empty results') + +# Quality metrics +query_dao_filter = Counter('rag_query_dao_filter_applied', 'DAO filter applied', ['dao_id']) +``` + +**Використання:** +- В `ingest_pipeline.py`: після успішного ingest +- В `query_pipeline.py`: після кожного query + +### 2.2. Router + +**Файл:** `metrics.py` (в корені Router) + +```python +from prometheus_client import Counter, Histogram + +rag_query_total = Counter('router_rag_query_total', 'Total RAG queries') +rag_query_duration = Histogram('router_rag_query_duration_seconds', 'RAG query duration') +rag_query_memory_used = Counter('router_rag_query_memory_used', 'Memory used in RAG queries') +rag_query_rag_used = Counter('router_rag_query_rag_used', 'RAG used in queries') +rag_query_fallback = Counter('router_rag_query_fallback_total', 'Fallback to Memory only') +``` + +**Використання:** +- В `router_app.py`: в `_handle_rag_query()` + +--- + +## 3. Dashboard (Grafana) + +### 3.1. Panels + +**RAG Service:** +1. **Ingest Rate** - `rate(rag_ingest_total[5m])` +2. **Ingest Duration** - `histogram_quantile(0.95, rag_ingest_duration_seconds)` +3. **Documents Indexed** - `sum(rag_ingest_documents_indexed)` +4. **Query Rate** - `rate(rag_query_total[5m])` +5. **Query Duration** - `histogram_quantile(0.95, rag_query_duration_seconds)` +6. **Documents Retrieved** - `avg(rag_query_documents_retrieved)` +7. **Citations Count** - `avg(rag_query_citations_count)` +8. **Empty Results Rate** - `rate(rag_query_empty_results_total[5m]) / rate(rag_query_total[5m])` + +**Router (RAG Query):** +1. **RAG Query Rate** - `rate(router_rag_query_total[5m])` +2. **RAG Query Duration** - `histogram_quantile(0.95, router_rag_query_duration_seconds)` +3. **Memory Usage Rate** - `rate(router_rag_query_memory_used[5m]) / rate(router_rag_query_total[5m])` +4. **RAG Usage Rate** - `rate(router_rag_query_rag_used[5m]) / rate(router_rag_query_total[5m])` +5. **Fallback Rate** - `rate(router_rag_query_fallback_total[5m]) / rate(router_rag_query_total[5m])` + +**Memory Service:** +1. **Context Fetch Rate** - `rate(memory_context_fetch_total[5m])` +2. **Context Fetch Duration** - `histogram_quantile(0.95, memory_context_fetch_duration_seconds)` +3. **Average Facts Count** - `avg(memory_context_facts_count)` +4. **Average Events Count** - `avg(memory_context_events_count)` + +### 3.2. Alerts + +- **High Error Rate**: `rate(rag_query_errors_total[5m]) > 0.1` +- **Slow Queries**: `histogram_quantile(0.95, rag_query_duration_seconds) > 10` +- **High Fallback Rate**: `rate(router_rag_query_fallback_total[5m]) / rate(router_rag_query_total[5m]) > 0.2` +- **Empty Results**: `rate(rag_query_empty_results_total[5m]) / rate(rag_query_total[5m]) > 0.3` + +--- + +## 4. Реалізація (мінімальна) + +### 4.1. Додати Prometheus Client + +**RAG Service:** +```bash +pip install prometheus-client +``` + +**Router:** +```bash +pip install prometheus-client +``` + +### 4.2. Expose Metrics Endpoint + +**RAG Service:** +```python +# app/main.py +from prometheus_client import generate_latest, CONTENT_TYPE_LATEST +from fastapi.responses import Response + +@app.get("/metrics") +async def metrics(): + return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST) +``` + +**Router:** +```python +# http_api.py +@app.get("/metrics") +async def metrics(): + return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST) +``` + +### 4.3. Docker Compose для Prometheus + Grafana + +```yaml +prometheus: + image: prom/prometheus + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + ports: + - "9090:9090" + +grafana: + image: grafana/grafana + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin +``` + +--- + +## 5. Наступні кроки + +1. Додати `prometheus-client` в requirements +2. Створити `metrics.py` в RAG Service та Router +3. Додати `/metrics` endpoints +4. Налаштувати Prometheus scraping +5. Створити Grafana dashboard +6. Налаштувати alerts + +--- + +## 6. Корисні запити для аналізу + +**Hit Rate (кількість успішних запитів з результатами):** +``` +(rag_query_total - rag_query_empty_results_total) / rag_query_total +``` + +**Average Documents per Query:** +``` +avg(rag_query_documents_retrieved) +``` + +**DAO Distribution:** +``` +sum by (dao_id) (rag_query_dao_filter_applied) +``` + +**Token Usage:** +``` +avg(router_rag_query_prompt_tokens_estimated) +``` + diff --git a/router_app.py b/router_app.py index 7aefa871..ead6013e 100644 --- a/router_app.py +++ b/router_app.py @@ -189,48 +189,33 @@ class RouterApp: logger.info(f"RAG retrieved {len(rag_docs)} documents, {len(rag_citations)} citations") - # 3. Build final prompt with Memory + RAG - system_prompt = ( - "Ти асистент microDAO. Використовуй і особисту пам'ять, і документи DAO.\n" - "Формуй чітку, структуровану відповідь українською, посилаючись на документи " - "через індекси [1], [2] тощо, де це доречно.\n\n" - ) + # 3. Build final prompt with Memory + RAG (using optimized prompt builder) + from utils.rag_prompt_builder import build_rag_prompt_with_citations, estimate_token_count - # Add Memory context - memory_text = "" - if memory_ctx.get("facts"): - facts_summary = ", ".join([ - f"{f.get('fact_key', '')}={f.get('fact_value', '')}" - for f in memory_ctx["facts"][:5] - ]) - if facts_summary: - memory_text += f"Особисті факти: {facts_summary}\n" + # Only include RAG if available + if rag_used and rag_citations: + final_prompt = build_rag_prompt_with_citations( + question=question, + memory_context=memory_ctx, + rag_citations=rag_citations, + rag_documents=rag_docs + ) + else: + # Fallback: Memory only prompt + from utils.rag_prompt_builder import _build_memory_section + memory_section = _build_memory_section(memory_ctx) + + final_prompt = ( + "Ти — експерт-консультант з токеноміки та архітектури DAO в екосистемі DAARION.city.\n" + "Відповідай на основі особистої пам'яті та контексту.\n\n" + ) + if memory_section: + final_prompt += f"**Особиста пам'ять та контекст:**\n{memory_section}\n\n" + final_prompt += f"**Питання користувача:**\n{question}\n\n**Відповідь:**" - if memory_ctx.get("recent_events"): - recent = memory_ctx["recent_events"][:3] - events_summary = "\n".join([ - f"- {e.get('body_text', '')[:100]}" - for e in recent - ]) - if events_summary: - memory_text += f"Останні події:\n{events_summary}\n" - - # Add RAG documents - docs_text = "" - for i, citation in enumerate(rag_citations[:5], start=1): - doc_id = citation.get("doc_id", "unknown") - page = citation.get("page", 0) - excerpt = citation.get("excerpt", "") - docs_text += f"[{i}] (doc_id={doc_id}, page={page}): {excerpt}\n" - - # Build final prompt - final_prompt = ( - f"{system_prompt}" - f"{'1) Пам\'ять (короткий summary):\n' + memory_text + '\n' if memory_text else ''}" - f"2) Релевантні документи (витяги):\n{docs_text}\n\n" - f"Питання користувача:\n{question}\n\n" - "Відповідь:" - ) + # Estimate token count for logging + estimated_tokens = estimate_token_count(final_prompt) + logger.info(f"Final prompt length: ~{estimated_tokens} tokens, RAG used: {rag_used}") # 4. Call LLM provider provider = self.routing_table.resolve_provider(req) @@ -263,13 +248,15 @@ class RouterApp: provider_id=llm_response.provider_id, data={ "text": llm_response.data.get("text", ""), - "citations": rag_citations + "citations": rag_citations if rag_used else [] }, metadata={ - "memory_used": bool(memory_text), - "rag_used": True, - "documents_retrieved": len(rag_docs), - "citations_count": len(rag_citations) + "memory_used": bool(memory_ctx.get("facts") or memory_ctx.get("recent_events")), + "rag_used": rag_used, + "documents_retrieved": len(rag_docs) if rag_used else 0, + "citations_count": len(rag_citations) if rag_used else 0, + "prompt_tokens_estimated": estimated_tokens, + "rag_metrics": rag_resp.get("metrics") if rag_resp else None }, error=None ) diff --git a/services/rag-service/app/ingest_pipeline.py b/services/rag-service/app/ingest_pipeline.py index f1ca635d..9c0ef453 100644 --- a/services/rag-service/app/ingest_pipeline.py +++ b/services/rag-service/app/ingest_pipeline.py @@ -34,8 +34,11 @@ def ingest_parsed_document( user_id: Optional user identifier Returns: - Dictionary with ingest results (doc_count, status) + Dictionary with ingest results (doc_count, status, metrics) """ + import time + ingest_start = time.time() + logger.info(f"Ingesting document: dao_id={dao_id}, doc_id={doc_id}") try: @@ -56,26 +59,53 @@ def ingest_parsed_document( pipeline = _create_ingest_pipeline() # Run pipeline + pipeline_start = time.time() result = pipeline.run({"documents": documents}) + pipeline_time = time.time() - pipeline_start # Extract results written_docs = result.get("documents_writer", {}).get("documents_written", 0) - logger.info(f"Ingested {written_docs} documents for doc_id={doc_id}") + # Calculate metrics + total_time = time.time() - ingest_start + pages_count = len(parsed_json.get("pages", [])) + blocks_count = sum( + len(page.get("blocks", [])) + for page in parsed_json.get("pages", []) + ) + + logger.info( + f"Ingested {written_docs} documents for doc_id={doc_id}: " + f"pages={pages_count}, blocks={blocks_count}, " + f"pipeline_time={pipeline_time:.2f}s, total_time={total_time:.2f}s" + ) return { "status": "success", "doc_count": written_docs, "dao_id": dao_id, - "doc_id": doc_id + "doc_id": doc_id, + "metrics": { + "pages_processed": pages_count, + "blocks_processed": blocks_count, + "documents_indexed": written_docs, + "pipeline_time_seconds": round(pipeline_time, 2), + "total_time_seconds": round(total_time, 2) + } } except Exception as e: logger.error(f"Failed to ingest document: {e}", exc_info=True) + total_time = time.time() - ingest_start + logger.error(f"Ingest failed after {total_time:.2f}s: {e}") return { "status": "error", "message": str(e), - "doc_count": 0 + "doc_count": 0, + "metrics": { + "total_time_seconds": round(total_time, 2), + "error": str(e) + } } diff --git a/services/rag-service/app/query_pipeline.py b/services/rag-service/app/query_pipeline.py index 945aca5b..edad3624 100644 --- a/services/rag-service/app/query_pipeline.py +++ b/services/rag-service/app/query_pipeline.py @@ -37,23 +37,33 @@ async def answer_query( Returns: Dictionary with answer, citations, and retrieved documents """ + import time + start_time = time.time() + logger.info(f"Answering query: dao_id={dao_id}, question={question[:50]}...") top_k = top_k or settings.TOP_K try: # Retrieve relevant documents - documents = _retrieve_documents(dao_id, question, top_k) + documents, retrieval_metrics = _retrieve_documents(dao_id, question, top_k) if not documents: logger.warning(f"No documents found for dao_id={dao_id}") + elapsed_time = time.time() - start_time return { "answer": "На жаль, я не знайшов релевантної інформації в базі знань.", "citations": [], - "documents": [] + "documents": [], + "metrics": { + "documents_retrieved": 0, + "citations_count": 0, + "doc_ids": [], + "query_time_seconds": round(elapsed_time, 2) + } } - logger.info(f"Retrieved {len(documents)} documents") + logger.info(f"Retrieved {len(documents)} documents (method: {retrieval_metrics.get('retrieval_method')})") # Generate answer using LLM answer = await _generate_answer(question, documents, dao_id, user_id) @@ -61,6 +71,31 @@ async def answer_query( # Build citations citations = _build_citations(documents) + # Calculate metrics + elapsed_time = time.time() - start_time + doc_ids = list(set([doc.meta.get("doc_id", "unknown") for doc in documents])) + + # Merge metrics + final_metrics = { + **retrieval_metrics, + "documents_retrieved": len(documents), + "citations_count": len(citations), + "doc_ids": doc_ids, + "total_query_time_seconds": round(elapsed_time, 2), + "answer_length": len(answer) + } + + # Log metrics + logger.info( + f"RAG query completed: " + f"dao_id={dao_id}, " + f"documents_found={len(documents)}, " + f"citations={len(citations)}, " + f"doc_ids={doc_ids}, " + f"retrieval_time={retrieval_metrics.get('retrieval_time_seconds', 0):.2f}s, " + f"total_time={elapsed_time:.2f}s" + ) + return { "answer": answer, "citations": citations, @@ -70,15 +105,25 @@ async def answer_query( "meta": doc.meta } for doc in documents - ] + ], + "metrics": final_metrics } except Exception as e: logger.error(f"Failed to answer query: {e}", exc_info=True) + elapsed_time = time.time() - start_time + logger.error(f"RAG query failed after {elapsed_time:.2f}s: {e}") return { "answer": f"Помилка при обробці запиту: {str(e)}", "citations": [], - "documents": [] + "documents": [], + "metrics": { + "documents_retrieved": 0, + "citations_count": 0, + "doc_ids": [], + "query_time_seconds": round(elapsed_time, 2), + "error": str(e) + } } @@ -86,7 +131,7 @@ def _retrieve_documents( dao_id: str, question: str, top_k: int -) -> List[Any]: +) -> tuple[List[Any], Dict[str, Any]]: """ Retrieve relevant documents from DocumentStore @@ -96,8 +141,11 @@ def _retrieve_documents( top_k: Number of documents to retrieve Returns: - List of Haystack Document objects + Tuple of (List of Haystack Document objects, metrics dict) """ + import time + retrieval_start = time.time() + # Get components embedder = get_text_embedder() document_store = get_document_store() @@ -116,6 +164,7 @@ def _retrieve_documents( top_k=top_k, return_embedding=False ) + retrieval_method = "vector_search" except Exception as e: logger.warning(f"Vector search failed: {e}, trying filter_documents") # Fallback to filter_documents @@ -124,6 +173,7 @@ def _retrieve_documents( top_k=top_k, return_embedding=False ) + retrieval_method = "filter_documents" # If no documents with filter, try without filter (fallback) if not documents: @@ -135,14 +185,28 @@ def _retrieve_documents( top_k=top_k, return_embedding=False ) + retrieval_method = "vector_search_no_filter" except Exception: documents = document_store.filter_documents( filters=None, top_k=top_k, return_embedding=False ) + retrieval_method = "filter_documents_no_filter" - return documents + retrieval_time = time.time() - retrieval_start + + # Build metrics + doc_ids = list(set([doc.meta.get("doc_id", "unknown") for doc in documents])) + metrics = { + "retrieval_method": retrieval_method, + "retrieval_time_seconds": round(retrieval_time, 2), + "documents_found": len(documents), + "doc_ids": doc_ids, + "filters_applied": {"dao_id": dao_id} + } + + return documents, metrics async def _generate_answer( diff --git a/tests/rag_eval.py b/tests/rag_eval.py new file mode 100755 index 00000000..0225ff10 --- /dev/null +++ b/tests/rag_eval.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +""" +RAG Evaluation Script +Tests RAG quality with fixed questions and saves results +""" + +import json +import csv +import time +import sys +from pathlib import Path +from typing import List, Dict, Any +from datetime import datetime + +import httpx + + +# Configuration +RAG_URL = "http://localhost:9500" +ROUTER_URL = "http://localhost:9102" +DAO_ID = "daarion" + +# Test questions +TEST_QUESTIONS = [ + { + "id": "q1", + "question": "Яка роль стейкінгу в microDAO?", + "expected_doc_ids": ["microdao-tokenomics"], + "category": "tokenomics" + }, + { + "id": "q2", + "question": "Які основні фази roadmap розгортання?", + "expected_doc_ids": ["roadmap", "deployment"], + "category": "roadmap" + }, + { + "id": "q3", + "question": "Поясни архітектуру DAARION.city", + "expected_doc_ids": ["architecture", "whitepaper"], + "category": "architecture" + }, + { + "id": "q4", + "question": "Як працює система ролей та RBAC?", + "expected_doc_ids": ["rbac", "roles"], + "category": "rbac" + }, + { + "id": "q5", + "question": "Що таке μGOV токен і навіщо він потрібен?", + "expected_doc_ids": ["microdao-tokenomics", "tokenomics"], + "category": "tokenomics" + } +] + + +async def test_rag_query(question: Dict[str, Any], dao_id: str) -> Dict[str, Any]: + """Test single RAG query""" + async with httpx.AsyncClient(timeout=60.0) as client: + start_time = time.time() + + response = await client.post( + f"{RAG_URL}/query", + json={ + "dao_id": dao_id, + "question": question["question"], + "top_k": 5 + } + ) + + elapsed = time.time() - start_time + response.raise_for_status() + data = response.json() + + # Extract metrics + metrics = data.get("metrics", {}) + citations = data.get("citations", []) + answer = data.get("answer", "") + + # Check if expected doc_ids are found + found_doc_ids = [c.get("doc_id", "") for c in citations] + expected_found = any( + expected_id in found_doc_id + for expected_id in question["expected_doc_ids"] + for found_doc_id in found_doc_ids + ) + + return { + "question_id": question["id"], + "question": question["question"], + "category": question["category"], + "answer": answer, + "answer_length": len(answer), + "citations_count": len(citations), + "citations": citations, + "doc_ids_found": found_doc_ids, + "expected_doc_found": expected_found, + "query_time_seconds": elapsed, + "metrics": metrics, + "timestamp": datetime.utcnow().isoformat() + } + + +async def test_router_query(question: Dict[str, Any], dao_id: str, user_id: str = "test-user") -> Dict[str, Any]: + """Test query via Router (Memory + RAG)""" + async with httpx.AsyncClient(timeout=60.0) as client: + start_time = time.time() + + response = await client.post( + f"{ROUTER_URL}/route", + json={ + "mode": "rag_query", + "dao_id": dao_id, + "user_id": user_id, + "payload": { + "question": question["question"] + } + } + ) + + elapsed = time.time() - start_time + response.raise_for_status() + data = response.json() + + # Extract data + answer = data.get("data", {}).get("text", "") + citations = data.get("data", {}).get("citations", []) or data.get("metadata", {}).get("citations", []) + metadata = data.get("metadata", {}) + + return { + "question_id": question["id"], + "question": question["question"], + "category": question["category"], + "answer": answer, + "answer_length": len(answer), + "citations_count": len(citations), + "citations": citations, + "memory_used": metadata.get("memory_used", False), + "rag_used": metadata.get("rag_used", False), + "query_time_seconds": elapsed, + "metadata": metadata, + "timestamp": datetime.utcnow().isoformat() + } + + +async def run_evaluation(output_dir: Path = Path("tests/rag_eval_results")): + """Run full evaluation""" + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + + # Test RAG Service directly + print("Testing RAG Service directly...") + rag_results = [] + for question in TEST_QUESTIONS: + print(f" Testing: {question['question'][:50]}...") + try: + result = await test_rag_query(question, DAO_ID) + rag_results.append(result) + print(f" ✓ Found {result['citations_count']} citations, expected doc: {result['expected_doc_found']}") + except Exception as e: + print(f" ✗ Error: {e}") + rag_results.append({ + "question_id": question["id"], + "error": str(e) + }) + + # Test Router (Memory + RAG) + print("\nTesting Router (Memory + RAG)...") + router_results = [] + for question in TEST_QUESTIONS: + print(f" Testing: {question['question'][:50]}...") + try: + result = await test_router_query(question, DAO_ID) + router_results.append(result) + print(f" ✓ Answer length: {result['answer_length']}, citations: {result['citations_count']}") + except Exception as e: + print(f" ✗ Error: {e}") + router_results.append({ + "question_id": question["id"], + "error": str(e) + }) + + # Save results + results_file = output_dir / f"rag_eval_{timestamp}.json" + with open(results_file, "w", encoding="utf-8") as f: + json.dump({ + "rag_service_results": rag_results, + "router_results": router_results, + "timestamp": timestamp, + "dao_id": DAO_ID + }, f, indent=2, ensure_ascii=False) + + # Save CSV summary + csv_file = output_dir / f"rag_eval_{timestamp}.csv" + with open(csv_file, "w", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + writer.writerow([ + "Question ID", "Question", "Category", + "RAG Citations", "RAG Expected Found", "RAG Time (s)", + "Router Citations", "Router Memory Used", "Router Time (s)", + "Answer Length" + ]) + + for rag_res, router_res in zip(rag_results, router_results): + writer.writerow([ + rag_res.get("question_id", ""), + rag_res.get("question", ""), + rag_res.get("category", ""), + rag_res.get("citations_count", 0), + rag_res.get("expected_doc_found", False), + rag_res.get("query_time_seconds", 0), + router_res.get("citations_count", 0), + router_res.get("memory_used", False), + router_res.get("query_time_seconds", 0), + router_res.get("answer_length", 0) + ]) + + print(f"\n✓ Results saved:") + print(f" JSON: {results_file}") + print(f" CSV: {csv_file}") + + # Print summary + print("\n=== Summary ===") + rag_avg_time = sum(r.get("query_time_seconds", 0) for r in rag_results) / len(rag_results) + router_avg_time = sum(r.get("query_time_seconds", 0) for r in router_results) / len(router_results) + + print(f"RAG Service: avg time={rag_avg_time:.2f}s") + print(f"Router: avg time={router_avg_time:.2f}s") + print(f"Expected docs found: {sum(1 for r in rag_results if r.get('expected_doc_found', False))}/{len(rag_results)}") + + +if __name__ == "__main__": + import asyncio + asyncio.run(run_evaluation()) + diff --git a/utils/rag_prompt_builder.py b/utils/rag_prompt_builder.py new file mode 100644 index 00000000..22d8c775 --- /dev/null +++ b/utils/rag_prompt_builder.py @@ -0,0 +1,169 @@ +""" +RAG Prompt Builder - optimized prompts for DAO tokenomics and documents +""" + +from typing import List, Dict, Any, Optional + + +def build_rag_prompt_with_citations( + question: str, + memory_context: Dict[str, Any], + rag_citations: List[Dict[str, Any]], + rag_documents: Optional[List[Dict[str, Any]]] = None +) -> str: + """ + Build optimized prompt for RAG queries with citations + + Optimized for: + - DAO tokenomics questions + - Technical documentation + - Multi-document answers with proper citations + + Args: + question: User question + memory_context: Memory context (facts, events, summaries) + rag_citations: List of citations from RAG + rag_documents: Optional full documents (for context) + + Returns: + Formatted prompt for LLM + """ + # Base system prompt + system_prompt = ( + "Ти — експерт-консультант з токеноміки та архітектури DAO в екосистемі DAARION.city.\n" + "Твоя задача: дати чітку, структуровану відповідь на основі наданих документів та особистої пам'яті.\n\n" + "**Правила формування відповіді:**\n" + "1. Використовуй тільки інформацію з наданих документів та пам'яті\n" + "2. Посилайся на документи через індекси [1], [2], [3] тощо\n" + "3. Для технічних термінів (стейкінг, токени, ролі) давай конкретні приклади\n" + "4. Якщо в документах немає відповіді — чесно скажи, що не знаєш\n" + "5. Відповідай українською, структуровано (списки, абзаци)\n\n" + ) + + # Build Memory section + memory_section = _build_memory_section(memory_context) + + # Build Documents section with citations + documents_section = _build_documents_section(rag_citations, rag_documents) + + # Combine into final prompt + prompt_parts = [system_prompt] + + if memory_section: + prompt_parts.append("**1. Особиста пам'ять та контекст:**\n") + prompt_parts.append(memory_section) + prompt_parts.append("\n") + + if documents_section: + prompt_parts.append("**2. Релевантні документи DAO:**\n") + prompt_parts.append(documents_section) + prompt_parts.append("\n") + + prompt_parts.append(f"**Питання користувача:**\n{question}\n\n") + prompt_parts.append("**Твоя відповідь (з цитатами [1], [2] тощо):**") + + return "\n".join(prompt_parts) + + +def _build_memory_section(memory_context: Dict[str, Any]) -> str: + """Build memory context section""" + parts = [] + + # User facts + facts = memory_context.get("facts", []) + if facts: + facts_list = [] + for fact in facts[:5]: # Top 5 facts + key = fact.get("fact_key", "") + value = fact.get("fact_value", "") + if key and value: + facts_list.append(f"- {key}: {value}") + + if facts_list: + parts.append("Особисті факти користувача:") + parts.extend(facts_list) + parts.append("") + + # Recent events + events = memory_context.get("recent_events", []) + if events: + events_list = [] + for event in events[:3]: # Last 3 events + body = event.get("body_text", "") + if body: + events_list.append(f"- {body[:150]}...") + + if events_list: + parts.append("Останні події в діалозі:") + parts.extend(events_list) + parts.append("") + + # Dialog summaries + summaries = memory_context.get("dialog_summaries", []) + if summaries: + summary_text = summaries[0].get("summary_text", "") + if summary_text: + parts.append(f"Підсумок попередніх діалогів: {summary_text[:200]}...") + + return "\n".join(parts) if parts else "" + + +def _build_documents_section( + citations: List[Dict[str, Any]], + documents: Optional[List[Dict[str, Any]]] = None +) -> str: + """ + Build documents section with proper citation format + + Format: + [1] (doc_id=microdao-tokenomics, page=1, section=Токеноміка): + MicroDAO використовує токен μGOV як ключ доступу... + """ + if not citations: + return "Документи не знайдено." + + parts = [] + + for idx, citation in enumerate(citations[:5], start=1): # Top 5 citations + doc_id = citation.get("doc_id", "unknown") + page = citation.get("page", 0) + section = citation.get("section", "") + excerpt = citation.get("excerpt", "") + + # Build citation header + header_parts = [f"[{idx}]"] + if doc_id != "unknown": + header_parts.append(f"doc_id={doc_id}") + if page: + header_parts.append(f"page={page}") + if section: + header_parts.append(f"section={section}") + + header = " (" + ", ".join(header_parts) + "):" + + # Add excerpt + if excerpt: + # Limit excerpt length + excerpt_clean = excerpt[:300] + "..." if len(excerpt) > 300 else excerpt + parts.append(f"{header}\n{excerpt_clean}") + else: + parts.append(f"{header}\n(фрагмент недоступний)") + + parts.append("") # Empty line between citations + + return "\n".join(parts) + + +def estimate_token_count(text: str, chars_per_token: float = 4.0) -> int: + """ + Rough estimate of token count + + Args: + text: Text to estimate + chars_per_token: Average characters per token (default 4.0 for most models) + + Returns: + Estimated token count + """ + return int(len(text) / chars_per_token) +