feat: add RAG quality metrics, optimized prompts, and evaluation tools

Optimized Prompts:
- Create utils/rag_prompt_builder.py with citation-optimized prompts
- Specialized for DAO tokenomics and technical documentation
- Enforce citation format [1], [2] backed by doc_id, page, and section metadata
- Integrate Memory context (facts, events, summaries)
- Estimate prompt token counts (see the sketch after this list)
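
A minimal sketch of the builder's idea; build_rag_prompt, its argument names, and the 4-characters-per-token heuristic are illustrative assumptions, not the actual rag_prompt_builder API:

    # Illustrative sketch only: the names below are not the real
    # utils/rag_prompt_builder.py interface.
    from typing import Any, Dict, List, Optional

    def estimate_tokens(text: str) -> int:
        # Crude heuristic: roughly 4 characters per token.
        return len(text) // 4

    def build_rag_prompt(
        question: str,
        documents: List[Dict[str, Any]],
        memory_facts: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        # Number each source so the model can cite it as [1], [2], ...
        sources = []
        for i, doc in enumerate(documents, start=1):
            meta = doc.get("meta", {})
            sources.append(
                f"[{i}] doc_id={meta.get('doc_id')}, page={meta.get('page')}, "
                f"section={meta.get('section')}\n{doc.get('content', '')}"
            )
        memory_block = "\n".join(memory_facts or ["(no memory context)"])
        prompt = (
            "Answer using ONLY the numbered sources below and cite them "
            "inline as [1], [2].\n\n"
            f"Memory context:\n{memory_block}\n\n"
            "Sources:\n" + "\n\n".join(sources) +
            f"\n\nQuestion: {question}"
        )
        return {"prompt": prompt, "token_estimate": estimate_tokens(prompt)}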

RAG Service Metrics:
- Add comprehensive logging in query_pipeline.py
- Log: question, doc_ids, scores, retrieval method, timing
- Track: retrieval_time, total_query_time, documents_found, citations_count (consumer sketch after this list)
- Add metrics in ingest_pipeline.py: pages_processed, blocks_processed, pipeline_time
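
For illustration, a caller can read the new metrics block off the response; the payload keys mirror the diffs below, while the import path and values are assumptions:

    import asyncio

    from rag_service.query_pipeline import answer_query  # import path assumed

    async def main() -> None:
        result = await answer_query(dao_id="dao-1", question="What is the emission schedule?")
        m = result["metrics"]
        # Keys as introduced in the diffs below; printed values vary per query.
        print(m["retrieval_method"], m["documents_found"], m["total_query_time_seconds"])

    asyncio.run(main())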

Router Improvements:
- Use the optimized prompt builder in _handle_rag_query()
- Add graceful fallback: if RAG is unavailable, answer from Memory only (sketch after this list)
- Log prompt token count, RAG usage, Memory usage
- Return detailed metadata (rag_used, memory_used, citations_count, metrics)
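
A rough shape of that fallback; the RAG and Memory helpers are passed in as callables here because the router's real ones are not part of this diff:

    import logging
    from typing import Any, Awaitable, Callable, Dict

    logger = logging.getLogger(__name__)

    async def answer_with_fallback(
        question: str,
        dao_id: str,
        rag_answer: Callable[..., Awaitable[Dict[str, Any]]],     # RAG path; may raise
        memory_answer: Callable[..., Awaitable[Dict[str, Any]]],  # Memory-only path
    ) -> Dict[str, Any]:
        rag_used = True
        try:
            response = await rag_answer(dao_id=dao_id, question=question)
        except Exception as e:
            logger.warning(f"RAG unavailable, falling back to Memory only: {e}")
            rag_used = False
            response = await memory_answer(dao_id=dao_id, question=question)
        return {
            "answer": response["answer"],
            "rag_used": rag_used,
            "memory_used": not rag_used,  # simplified; the real router may use both
            "citations_count": len(response.get("citations", [])),
            "metrics": response.get("metrics", {}),
        }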

Evaluation Tools:
- Create tests/rag_eval.py for systematic quality testing
- Test fixed questions with expected doc_ids
- Save results to JSON and CSV
- Compare RAG Service vs Router results
- Track: citations, expected docs found, query times (loop sketched below)
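
The heart of the harness is a short loop over fixed cases; the fixture format here is invented, and answer_query is assumed to return the metrics shape shown in the diffs below:

    import csv
    import json

    # Invented fixture format: each case pairs a question with the doc_ids
    # the answer is expected to cite.
    CASES = [
        {"question": "What is the total token supply?",
         "expected_doc_ids": ["tokenomics-v2"]},
    ]

    async def run_eval(answer_query) -> None:
        rows = []
        for case in CASES:
            result = await answer_query(dao_id="dao-1", question=case["question"])
            m = result["metrics"]
            found = set(m.get("doc_ids", [])) & set(case["expected_doc_ids"])
            rows.append({
                "question": case["question"],
                "citations": m.get("citations_count", 0),
                "expected_docs_found": len(found),
                "query_time_s": m.get("total_query_time_seconds", 0),
            })
        with open("rag_eval_results.json", "w", encoding="utf-8") as f:
            json.dump(rows, f, ensure_ascii=False, indent=2)
        with open("rag_eval_results.csv", "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
            writer.writeheader()
            writer.writerows(rows)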

Documentation:
- Create docs/RAG_METRICS_PLAN.md
- Plan for Prometheus metrics collection (declaration sketch below)
- Grafana dashboard panels and alerts
- Implementation guide for metrics
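
As a starting point for that plan, the collectors could be declared with prometheus_client; the metric names below are suggestions, not what RAG_METRICS_PLAN.md prescribes:

    from prometheus_client import Counter, Histogram

    # Suggested names; the plan in docs/RAG_METRICS_PLAN.md may differ.
    RAG_QUERIES_TOTAL = Counter(
        "rag_queries_total", "RAG queries processed",
        ["status", "retrieval_method"],
    )
    RAG_QUERY_SECONDS = Histogram(
        "rag_query_seconds", "End-to-end RAG query latency in seconds",
    )
    RAG_DOCS_RETRIEVED = Histogram(
        "rag_documents_retrieved", "Documents returned per query",
        buckets=(0, 1, 2, 5, 10, 20),
    )

    # Sketch of use inside answer_query:
    #     with RAG_QUERY_SECONDS.time():
    #         ...  # run retrieval + generation
    #     RAG_QUERIES_TOTAL.labels(status="success", retrieval_method=method).inc()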
commit 1ed1181105
parent 382e661f1f
Author: Apple
Date:   2025-11-16 05:12:19 -08:00

6 changed files with 769 additions and 57 deletions

ingest_pipeline.py

@@ -34,8 +34,11 @@ def ingest_parsed_document(
         user_id: Optional user identifier

     Returns:
-        Dictionary with ingest results (doc_count, status)
+        Dictionary with ingest results (doc_count, status, metrics)
     """
+    import time
+
+    ingest_start = time.time()
     logger.info(f"Ingesting document: dao_id={dao_id}, doc_id={doc_id}")

     try:
@@ -56,26 +59,53 @@ def ingest_parsed_document(
         pipeline = _create_ingest_pipeline()

         # Run pipeline
+        pipeline_start = time.time()
         result = pipeline.run({"documents": documents})
+        pipeline_time = time.time() - pipeline_start

         # Extract results
         written_docs = result.get("documents_writer", {}).get("documents_written", 0)

-        logger.info(f"Ingested {written_docs} documents for doc_id={doc_id}")
+        # Calculate metrics
+        total_time = time.time() - ingest_start
+        pages_count = len(parsed_json.get("pages", []))
+        blocks_count = sum(
+            len(page.get("blocks", []))
+            for page in parsed_json.get("pages", [])
+        )
+
+        logger.info(
+            f"Ingested {written_docs} documents for doc_id={doc_id}: "
+            f"pages={pages_count}, blocks={blocks_count}, "
+            f"pipeline_time={pipeline_time:.2f}s, total_time={total_time:.2f}s"
+        )

         return {
             "status": "success",
             "doc_count": written_docs,
             "dao_id": dao_id,
-            "doc_id": doc_id
+            "doc_id": doc_id,
+            "metrics": {
+                "pages_processed": pages_count,
+                "blocks_processed": blocks_count,
+                "documents_indexed": written_docs,
+                "pipeline_time_seconds": round(pipeline_time, 2),
+                "total_time_seconds": round(total_time, 2)
+            }
         }

     except Exception as e:
-        logger.error(f"Failed to ingest document: {e}", exc_info=True)
+        total_time = time.time() - ingest_start
+        logger.error(f"Ingest failed after {total_time:.2f}s: {e}")
         return {
             "status": "error",
             "message": str(e),
-            "doc_count": 0
+            "doc_count": 0,
+            "metrics": {
+                "total_time_seconds": round(total_time, 2),
+                "error": str(e)
+            }
         }
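
For context, a caller can sanity-check the shape returned above; the import path and the parsed_json layout are assumptions read off the keys used in the diff:

    from rag_service.ingest_pipeline import ingest_parsed_document  # path assumed

    parsed = {"pages": [{"blocks": [{"text": "Total supply: 100M tokens"}]}]}
    result = ingest_parsed_document(dao_id="dao-1", doc_id="doc-1", parsed_json=parsed)
    if result["status"] == "success":
        m = result["metrics"]
        assert m["documents_indexed"] == result["doc_count"]
        print(f"{m['pages_processed']} pages, {m['blocks_processed']} blocks "
              f"in {m['total_time_seconds']}s")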

query_pipeline.py

@@ -37,23 +37,33 @@ async def answer_query(
     Returns:
         Dictionary with answer, citations, and retrieved documents
     """
+    import time
+
+    start_time = time.time()
     logger.info(f"Answering query: dao_id={dao_id}, question={question[:50]}...")

     top_k = top_k or settings.TOP_K

     try:
         # Retrieve relevant documents
-        documents = _retrieve_documents(dao_id, question, top_k)
+        documents, retrieval_metrics = _retrieve_documents(dao_id, question, top_k)

         if not documents:
             logger.warning(f"No documents found for dao_id={dao_id}")
+            elapsed_time = time.time() - start_time
             return {
                 "answer": "На жаль, я не знайшов релевантної інформації в базі знань.",
                 "citations": [],
-                "documents": []
+                "documents": [],
+                "metrics": {
+                    "documents_retrieved": 0,
+                    "citations_count": 0,
+                    "doc_ids": [],
+                    "query_time_seconds": round(elapsed_time, 2)
+                }
             }

-        logger.info(f"Retrieved {len(documents)} documents")
+        logger.info(f"Retrieved {len(documents)} documents (method: {retrieval_metrics.get('retrieval_method')})")

         # Generate answer using LLM
         answer = await _generate_answer(question, documents, dao_id, user_id)
@@ -61,6 +71,31 @@ async def answer_query(
         # Build citations
         citations = _build_citations(documents)

+        # Calculate metrics
+        elapsed_time = time.time() - start_time
+        doc_ids = list(set([doc.meta.get("doc_id", "unknown") for doc in documents]))
+
+        # Merge metrics
+        final_metrics = {
+            **retrieval_metrics,
+            "documents_retrieved": len(documents),
+            "citations_count": len(citations),
+            "doc_ids": doc_ids,
+            "total_query_time_seconds": round(elapsed_time, 2),
+            "answer_length": len(answer)
+        }
+
+        # Log metrics
+        logger.info(
+            f"RAG query completed: "
+            f"dao_id={dao_id}, "
+            f"documents_found={len(documents)}, "
+            f"citations={len(citations)}, "
+            f"doc_ids={doc_ids}, "
+            f"retrieval_time={retrieval_metrics.get('retrieval_time_seconds', 0):.2f}s, "
+            f"total_time={elapsed_time:.2f}s"
+        )
+
         return {
             "answer": answer,
             "citations": citations,
@@ -70,15 +105,25 @@ async def answer_query(
                     "meta": doc.meta
                 }
                 for doc in documents
-            ]
+            ],
+            "metrics": final_metrics
         }

     except Exception as e:
-        logger.error(f"Failed to answer query: {e}", exc_info=True)
+        elapsed_time = time.time() - start_time
+        logger.error(f"RAG query failed after {elapsed_time:.2f}s: {e}")
         return {
             "answer": f"Помилка при обробці запиту: {str(e)}",
             "citations": [],
-            "documents": []
+            "documents": [],
+            "metrics": {
+                "documents_retrieved": 0,
+                "citations_count": 0,
+                "doc_ids": [],
+                "query_time_seconds": round(elapsed_time, 2),
+                "error": str(e)
+            }
         }
@@ -86,7 +131,7 @@ def _retrieve_documents(
     dao_id: str,
     question: str,
     top_k: int
-) -> List[Any]:
+) -> tuple[List[Any], Dict[str, Any]]:
     """
     Retrieve relevant documents from DocumentStore

@@ -96,8 +141,11 @@ def _retrieve_documents(
         top_k: Number of documents to retrieve

     Returns:
-        List of Haystack Document objects
+        Tuple of (List of Haystack Document objects, metrics dict)
     """
+    import time
+
+    retrieval_start = time.time()
     # Get components
     embedder = get_text_embedder()
     document_store = get_document_store()
@@ -116,6 +164,7 @@ def _retrieve_documents(
             top_k=top_k,
             return_embedding=False
         )
+        retrieval_method = "vector_search"
     except Exception as e:
         logger.warning(f"Vector search failed: {e}, trying filter_documents")
         # Fallback to filter_documents
@@ -124,6 +173,7 @@ def _retrieve_documents(
             top_k=top_k,
             return_embedding=False
         )
+        retrieval_method = "filter_documents"

     # If no documents with filter, try without filter (fallback)
     if not documents:
@@ -135,14 +185,28 @@ def _retrieve_documents(
                 top_k=top_k,
                 return_embedding=False
             )
+            retrieval_method = "vector_search_no_filter"
         except Exception:
             documents = document_store.filter_documents(
                 filters=None,
                 top_k=top_k,
                 return_embedding=False
             )
+            retrieval_method = "filter_documents_no_filter"

-    return documents
+    retrieval_time = time.time() - retrieval_start
+
+    # Build metrics
+    doc_ids = list(set([doc.meta.get("doc_id", "unknown") for doc in documents]))
+    metrics = {
+        "retrieval_method": retrieval_method,
+        "retrieval_time_seconds": round(retrieval_time, 2),
+        "documents_found": len(documents),
+        "doc_ids": doc_ids,
+        "filters_applied": {"dao_id": dao_id}
+    }
+
+    return documents, metrics


 async def _generate_answer(