Optimized Prompts: - Create utils/rag_prompt_builder.py with citation-optimized prompts - Specialized for DAO tokenomics and technical documentation - Proper citation format [1], [2] with doc_id, page, section - Memory context integration (facts, events, summaries) - Token count estimation RAG Service Metrics: - Add comprehensive logging in query_pipeline.py - Log: question, doc_ids, scores, retrieval method, timing - Track: retrieval_time, total_query_time, documents_found, citations_count - Add metrics in ingest_pipeline.py: pages_processed, blocks_processed, pipeline_time Router Improvements: - Use optimized prompt builder in _handle_rag_query() - Add graceful fallback: if RAG unavailable, use Memory only - Log prompt token count, RAG usage, Memory usage - Return detailed metadata (rag_used, memory_used, citations_count, metrics) Evaluation Tools: - Create tests/rag_eval.py for systematic quality testing - Test fixed questions with expected doc_ids - Save results to JSON and CSV - Compare RAG Service vs Router results - Track: citations, expected docs found, query times Documentation: - Create docs/RAG_METRICS_PLAN.md - Plan for Prometheus metrics collection - Grafana dashboard panels and alerts - Implementation guide for metrics
222 lines
6.6 KiB
Python
222 lines
6.6 KiB
Python
"""
|
|
Ingest Pipeline: PARSER → RAG
|
|
Converts ParsedDocument to Haystack Documents and indexes them
|
|
"""
|
|
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
from haystack import Pipeline
|
|
from haystack.components.preprocessors import DocumentSplitter
|
|
from haystack.components.writers import DocumentWriter
|
|
from haystack.schema import Document
|
|
|
|
from app.document_store import get_document_store
|
|
from app.embedding import get_text_embedder
|
|
from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def ingest_parsed_document(
    dao_id: str,
    doc_id: str,
    parsed_json: Dict[str, Any],
    user_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Ingest parsed document from PARSER service into RAG

    Converts the parsed JSON into Haystack Documents, runs them through the
    ingest pipeline and reports how many documents were written together
    with timing metrics. Exceptions raised during conversion or the pipeline
    run are caught, logged and reported through the returned dictionary.

    Args:
        dao_id: DAO identifier
        doc_id: Document identifier
        parsed_json: ParsedDocument JSON from PARSER service
        user_id: Optional user identifier

    Returns:
        Dictionary with ingest results:
        - on success: "status" ("success"), "doc_count", "dao_id", "doc_id"
          and "metrics" (pages_processed, blocks_processed,
          documents_indexed, pipeline_time_seconds, total_time_seconds)
        - on failure: "status" ("error"), "message", "doc_count" (0) and
          "metrics" (total_time_seconds, plus "error" when an exception
          was caught)
    """
    import time

    # perf_counter() is monotonic, so elapsed times cannot be skewed by
    # wall-clock adjustments the way time.time() differences can.
    ingest_start = time.perf_counter()

    logger.info(f"Ingesting document: dao_id={dao_id}, doc_id={doc_id}")

    try:
        # Convert parsed_json to Haystack Documents
        documents = _parsed_json_to_documents(parsed_json, dao_id, doc_id, user_id)

        if not documents:
            logger.warning(f"No documents to ingest for doc_id={doc_id}")
            return {
                "status": "error",
                "message": "No documents to ingest",
                "doc_count": 0,
                # Same shape as the other error result so callers can always
                # read metrics["total_time_seconds"].
                "metrics": {
                    "total_time_seconds": round(
                        time.perf_counter() - ingest_start, 2
                    )
                }
            }

        logger.info(f"Converted {len(documents)} blocks to Haystack Documents")

        # Create ingest pipeline
        pipeline = _create_ingest_pipeline()

        # Run pipeline
        pipeline_start = time.perf_counter()
        result = pipeline.run({"documents": documents})
        pipeline_time = time.perf_counter() - pipeline_start

        # Extract results
        written_docs = result.get("documents_writer", {}).get("documents_written", 0)

        # Calculate metrics. "pages"/"blocks" may be present but null; treat
        # that as empty so a successful write is not reported as a failure
        # because of metric counting.
        total_time = time.perf_counter() - ingest_start
        pages = parsed_json.get("pages") or []
        pages_count = len(pages)
        blocks_count = sum(len(page.get("blocks") or []) for page in pages)

        logger.info(
            f"Ingested {written_docs} documents for doc_id={doc_id}: "
            f"pages={pages_count}, blocks={blocks_count}, "
            f"pipeline_time={pipeline_time:.2f}s, total_time={total_time:.2f}s"
        )

        return {
            "status": "success",
            "doc_count": written_docs,
            "dao_id": dao_id,
            "doc_id": doc_id,
            "metrics": {
                "pages_processed": pages_count,
                "blocks_processed": blocks_count,
                "documents_indexed": written_docs,
                "pipeline_time_seconds": round(pipeline_time, 2),
                "total_time_seconds": round(total_time, 2)
            }
        }

    except Exception as e:
        # Top-level boundary: report the failure to the caller instead of
        # raising, keeping the traceback in the log.
        logger.error(f"Failed to ingest document: {e}", exc_info=True)
        total_time = time.perf_counter() - ingest_start
        logger.error(f"Ingest failed after {total_time:.2f}s: {e}")
        return {
            "status": "error",
            "message": str(e),
            "doc_count": 0,
            "metrics": {
                "total_time_seconds": round(total_time, 2),
                "error": str(e)
            }
        }
|
|
|
|
|
|
def _parsed_json_to_documents(
    parsed_json: Dict[str, Any],
    dao_id: str,
    doc_id: str,
    user_id: Optional[str] = None
) -> List[Document]:
    """
    Convert ParsedDocument JSON to Haystack Documents

    One Document is created per block whose stripped text is non-empty.
    Each Document carries block-level metadata (dao_id, doc_id, page,
    block_type, reading_order, optional bbox_* fields, "section" for heading
    blocks, optional user_id) plus any document-level metadata from
    parsed_json["metadata"] that does not collide with those keys.

    Args:
        parsed_json: ParsedDocument JSON structure
        dao_id: DAO identifier
        doc_id: Document identifier
        user_id: Optional user identifier

    Returns:
        List of Haystack Document objects (empty when there are no pages,
        no blocks, or only empty blocks)
    """
    documents = []

    # Document-level metadata is the same for every block, so read it once.
    doc_meta = parsed_json.get("metadata") or {}

    # "pages" / "blocks" / "text" may be present but null; treat as empty.
    for page_data in parsed_json.get("pages") or []:
        page_num = page_data.get("page_num", 1)

        for block in page_data.get("blocks") or []:
            # Skip empty blocks
            text = (block.get("text") or "").strip()
            if not text:
                continue

            # Build metadata (must-have for RAG)
            meta = {
                "dao_id": dao_id,
                "doc_id": doc_id,
                "page": page_num,
                "block_type": block.get("type", "paragraph"),
                "reading_order": block.get("reading_order", 0)
            }

            # Add optional fields
            bbox = block.get("bbox")
            if bbox:
                meta.update({
                    "bbox_x": bbox.get("x", 0),
                    "bbox_y": bbox.get("y", 0),
                    "bbox_width": bbox.get("width", 0),
                    "bbox_height": bbox.get("height", 0)
                })

            # Add section if heading
            if block.get("type") == "heading":
                meta["section"] = text[:100]  # First 100 chars as section name

            # Add user_id if provided
            if user_id:
                meta["user_id"] = user_id

            # Add document-level metadata, but never let it overwrite the
            # block-level keys set above (doc_id, page, ...): those identify
            # where the block came from.
            meta.update({
                k: v for k, v in doc_meta.items()
                if k not in meta
            })

            # Create Haystack Document
            documents.append(Document(content=text, meta=meta))

    return documents
|
|
|
|
|
|
def _create_ingest_pipeline() -> Pipeline:
    """
    Build the Haystack pipeline used for ingesting documents

    Stages, in order: "splitter" (DocumentSplitter) → "embedder" →
    "documents_writer" (DocumentWriter). Each stage is connected to the
    one that follows it.

    Returns:
        A new Pipeline with the three components added and connected
    """
    # Resolve the externally provided components first
    text_embedder = get_text_embedder()
    store = get_document_store()

    # NOTE(review): the embedder comes from get_text_embedder(); confirm it
    # accepts the splitter's output when connected below.
    stages = [
        (
            "splitter",
            # Sentence-based splitting sized by the configured chunk settings
            DocumentSplitter(
                split_by="sentence",
                split_length=settings.CHUNK_SIZE,
                split_overlap=settings.CHUNK_OVERLAP
            ),
        ),
        ("embedder", text_embedder),
        ("documents_writer", DocumentWriter(store)),
    ]

    ingest = Pipeline()

    # Register every stage under its name
    for stage_name, component in stages:
        ingest.add_component(stage_name, component)

    # Chain each stage to its successor
    for (upstream, _), (downstream, _) in zip(stages, stages[1:]):
        ingest.connect(upstream, downstream)

    return ingest
|
|
|