## Agents Added - Alateya: R&D, biotech, innovations - Clan (Spirit): Community spirit agent - Eonarch: Consciousness evolution agent ## Changes - docker-compose.node1.yml: Added tokens for all 3 new agents - gateway-bot/http_api.py: Added configs and webhook endpoints - gateway-bot/clan_prompt.txt: New prompt file - gateway-bot/eonarch_prompt.txt: New prompt file ## Fixes - Fixed ROUTER_URL from :9102 to :8000 (internal container port) - All 9 Telegram agents now working ## Documentation - Created PROJECT-MASTER-INDEX.md - single entry point - Added various status documents and scripts Tokens configured: - Helion, NUTRA, Agromatrix (existing) - Alateya, Clan, Eonarch (new) - Druid, GreenFood, DAARWIZZ (configured)
309 lines
9.7 KiB
Python
309 lines
9.7 KiB
Python
"""
|
||
Query Pipeline: RAG → LLM
|
||
Retrieves relevant documents and generates answers
|
||
"""
|
||
|
||
import logging
|
||
from typing import List, Dict, Any, Optional
|
||
import httpx
|
||
from app.document_store import get_document_store
|
||
from app.embedding import get_text_embedder
|
||
from app.core.config import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
async def answer_query(
|
||
dao_id: str,
|
||
question: str,
|
||
top_k: Optional[int] = None,
|
||
user_id: Optional[str] = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
Answer query using RAG pipeline
|
||
|
||
Args:
|
||
dao_id: DAO identifier (for filtering)
|
||
question: User question
|
||
top_k: Number of documents to retrieve (default from settings)
|
||
user_id: Optional user identifier
|
||
|
||
Returns:
|
||
Dictionary with answer, citations, and retrieved documents
|
||
"""
|
||
import time
|
||
start_time = time.time()
|
||
|
||
logger.info(f"Answering query: dao_id={dao_id}, question={question[:50]}...")
|
||
|
||
top_k = top_k or settings.TOP_K
|
||
|
||
try:
|
||
# Retrieve relevant documents
|
||
documents, retrieval_metrics = _retrieve_documents(dao_id, question, top_k)
|
||
|
||
if not documents:
|
||
logger.warning(f"No documents found for dao_id={dao_id}")
|
||
elapsed_time = time.time() - start_time
|
||
return {
|
||
"answer": "На жаль, я не знайшов релевантної інформації в базі знань.",
|
||
"citations": [],
|
||
"documents": [],
|
||
"metrics": {
|
||
"documents_retrieved": 0,
|
||
"citations_count": 0,
|
||
"doc_ids": [],
|
||
"query_time_seconds": round(elapsed_time, 2)
|
||
}
|
||
}
|
||
|
||
logger.info(f"Retrieved {len(documents)} documents (method: {retrieval_metrics.get('retrieval_method')})")
|
||
|
||
# Generate answer using LLM
|
||
answer = await _generate_answer(question, documents, dao_id, user_id)
|
||
|
||
# Build citations
|
||
citations = _build_citations(documents)
|
||
|
||
# Calculate metrics
|
||
elapsed_time = time.time() - start_time
|
||
doc_ids = list(set([doc.meta.get("doc_id", "unknown") for doc in documents]))
|
||
|
||
# Merge metrics
|
||
final_metrics = {
|
||
**retrieval_metrics,
|
||
"documents_retrieved": len(documents),
|
||
"citations_count": len(citations),
|
||
"doc_ids": doc_ids,
|
||
"total_query_time_seconds": round(elapsed_time, 2),
|
||
"answer_length": len(answer)
|
||
}
|
||
|
||
# Log metrics
|
||
logger.info(
|
||
f"RAG query completed: "
|
||
f"dao_id={dao_id}, "
|
||
f"documents_found={len(documents)}, "
|
||
f"citations={len(citations)}, "
|
||
f"doc_ids={doc_ids}, "
|
||
f"retrieval_time={retrieval_metrics.get('retrieval_time_seconds', 0):.2f}s, "
|
||
f"total_time={elapsed_time:.2f}s"
|
||
)
|
||
|
||
return {
|
||
"answer": answer,
|
||
"citations": citations,
|
||
"documents": [
|
||
{
|
||
"content": doc.content[:200] + "..." if len(doc.content) > 200 else doc.content,
|
||
"meta": doc.meta
|
||
}
|
||
for doc in documents
|
||
],
|
||
"metrics": final_metrics
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to answer query: {e}", exc_info=True)
|
||
elapsed_time = time.time() - start_time
|
||
logger.error(f"RAG query failed after {elapsed_time:.2f}s: {e}")
|
||
return {
|
||
"answer": f"Помилка при обробці запиту: {str(e)}",
|
||
"citations": [],
|
||
"documents": [],
|
||
"metrics": {
|
||
"documents_retrieved": 0,
|
||
"citations_count": 0,
|
||
"doc_ids": [],
|
||
"query_time_seconds": round(elapsed_time, 2),
|
||
"error": str(e)
|
||
}
|
||
}
|
||
|
||
|
||
def _retrieve_documents(
|
||
dao_id: str,
|
||
question: str,
|
||
top_k: int
|
||
) -> tuple[List[Any], Dict[str, Any]]:
|
||
"""
|
||
Retrieve relevant documents from DocumentStore
|
||
|
||
Args:
|
||
dao_id: DAO identifier for filtering
|
||
question: Query text
|
||
top_k: Number of documents to retrieve
|
||
|
||
Returns:
|
||
Tuple of (List of Haystack Document objects, metrics dict)
|
||
"""
|
||
import time
|
||
retrieval_start = time.time()
|
||
|
||
# Get components
|
||
embedder = get_text_embedder()
|
||
document_store = get_document_store()
|
||
|
||
# Embed query
|
||
embedding_result = embedder.run(text=question)
|
||
query_embedding = embedding_result["embedding"][0] if isinstance(embedding_result["embedding"], list) else embedding_result["embedding"]
|
||
|
||
# Retrieve with filters using vector similarity search
|
||
filters = {"dao_id": [dao_id]}
|
||
|
||
try:
|
||
documents = document_store.search(
|
||
query_embedding=query_embedding,
|
||
filters=filters,
|
||
top_k=top_k,
|
||
return_embedding=False
|
||
)
|
||
retrieval_method = "vector_search"
|
||
except Exception as e:
|
||
logger.warning(f"Vector search failed: {e}, trying filter_documents")
|
||
# Fallback to filter_documents
|
||
documents = document_store.filter_documents(
|
||
filters=filters,
|
||
top_k=top_k,
|
||
return_embedding=False
|
||
)
|
||
retrieval_method = "filter_documents"
|
||
|
||
# If no documents with filter, try without filter (fallback)
|
||
if not documents:
|
||
logger.warning(f"No documents found with dao_id={dao_id}, trying without filter")
|
||
try:
|
||
documents = document_store.search(
|
||
query_embedding=query_embedding,
|
||
filters=None,
|
||
top_k=top_k,
|
||
return_embedding=False
|
||
)
|
||
retrieval_method = "vector_search_no_filter"
|
||
except Exception:
|
||
documents = document_store.filter_documents(
|
||
filters=None,
|
||
top_k=top_k,
|
||
return_embedding=False
|
||
)
|
||
retrieval_method = "filter_documents_no_filter"
|
||
|
||
retrieval_time = time.time() - retrieval_start
|
||
|
||
# Build metrics
|
||
doc_ids = list(set([doc.meta.get("doc_id", "unknown") for doc in documents]))
|
||
metrics = {
|
||
"retrieval_method": retrieval_method,
|
||
"retrieval_time_seconds": round(retrieval_time, 2),
|
||
"documents_found": len(documents),
|
||
"doc_ids": doc_ids,
|
||
"filters_applied": {"dao_id": dao_id}
|
||
}
|
||
|
||
return documents, metrics
|
||
|
||
|
||
async def _generate_answer(
|
||
question: str,
|
||
documents: List[Any],
|
||
dao_id: str,
|
||
user_id: Optional[str] = None
|
||
) -> str:
|
||
"""
|
||
Generate answer using LLM (via DAGI Router or OpenAI)
|
||
|
||
Args:
|
||
question: User question
|
||
documents: Retrieved documents
|
||
dao_id: DAO identifier
|
||
user_id: Optional user identifier
|
||
|
||
Returns:
|
||
Generated answer text
|
||
"""
|
||
# Build context from documents
|
||
context = "\n\n".join([
|
||
f"[Документ {idx+1}, сторінка {doc.meta.get('page', '?')}]: {doc.content[:500]}"
|
||
for idx, doc in enumerate(documents[:3]) # Limit to first 3 documents
|
||
])
|
||
|
||
# Build prompt
|
||
prompt = (
|
||
"Тобі надано контекст з бази знань та питання користувача.\n"
|
||
"Відповідай на основі наданого контексту. Якщо в контексті немає відповіді, "
|
||
"скажи що не знаєш.\n\n"
|
||
f"Контекст:\n{context}\n\n"
|
||
f"Питання: {question}\n\n"
|
||
"Відповідь:"
|
||
)
|
||
|
||
# Call LLM based on provider
|
||
if settings.LLM_PROVIDER == "router":
|
||
return await _call_router_llm(prompt, dao_id, user_id)
|
||
elif settings.LLM_PROVIDER == "openai" and settings.OPENAI_API_KEY:
|
||
return await _call_openai_llm(prompt)
|
||
else:
|
||
# Fallback: simple answer
|
||
return f"Знайдено {len(documents)} релевантних документів. Перший фрагмент: {documents[0].content[:200]}..."
|
||
|
||
|
||
async def _call_router_llm(
|
||
prompt: str,
|
||
dao_id: str,
|
||
user_id: Optional[str] = None
|
||
) -> str:
|
||
"""Call DAGI Router LLM"""
|
||
router_url = f"{settings.ROUTER_BASE_URL.rstrip('/')}/route"
|
||
|
||
payload = {
|
||
"mode": "chat",
|
||
"dao_id": dao_id,
|
||
"user_id": user_id or "rag-service",
|
||
"payload": {
|
||
"message": prompt
|
||
}
|
||
}
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||
resp = await client.post(router_url, json=payload)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
|
||
return data.get("data", {}).get("text", "Не вдалося отримати відповідь")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Router LLM call failed: {e}")
|
||
return f"Помилка при виклику LLM: {str(e)}"
|
||
|
||
|
||
async def _call_openai_llm(prompt: str) -> str:
|
||
"""Call OpenAI LLM"""
|
||
# TODO: Implement OpenAI client
|
||
return "OpenAI integration not yet implemented"
|
||
|
||
|
||
def _build_citations(documents: List[Any]) -> List[Dict[str, Any]]:
|
||
"""
|
||
Build citations from retrieved documents
|
||
|
||
Args:
|
||
documents: List of Haystack Documents
|
||
|
||
Returns:
|
||
List of citation dictionaries
|
||
"""
|
||
citations = []
|
||
|
||
for doc in documents:
|
||
meta = doc.meta
|
||
citations.append({
|
||
"doc_id": meta.get("doc_id", "unknown"),
|
||
"page": meta.get("page", 0),
|
||
"section": meta.get("section"),
|
||
"excerpt": doc.content[:200] + "..." if len(doc.content) > 200 else doc.content
|
||
})
|
||
|
||
return citations
|
||
|