Files
microdao-daarion/gateway-bot/services/doc_service.py

625 lines
21 KiB
Python

"""
Document Workflow Service
Channel-agnostic service for document parsing, ingestion, and RAG queries.
This service can be used by:
- Telegram bots
- Web applications
- Mobile apps
- Any other client
"""
import os
import logging
import hashlib
from typing import Optional, Dict, Any, List
from pydantic import BaseModel
from datetime import datetime
from router_client import send_to_router
from memory_client import memory_client
logger = logging.getLogger(__name__)
class QAItem(BaseModel):
    """Single question/answer pair; the element type of ParsedResult.qa_pairs."""
    question: str  # question text
    answer: str  # answer text
class ParsedResult(BaseModel):
    """
    Result of document parsing (returned by DocumentService.parse_document).

    On failure, `success` is False and `error` carries the reason; the other
    fields keep their None defaults.
    """
    success: bool
    doc_id: Optional[str] = None  # short id generated for the parsed document
    qa_pairs: Optional[List[QAItem]] = None  # Q&A pairs; parse_document in this module leaves this None
    markdown: Optional[str] = None  # parsed content (markdown, fenced plain text, or fallback text)
    chunks_meta: Optional[Dict[str, Any]] = None  # chunk summary; parse_document in this module leaves this None
    raw: Optional[Dict[str, Any]] = None  # backend payload (Swapper JSON, or fallback model/page info)
    error: Optional[str] = None  # human-readable failure reason when success is False
class IngestResult(BaseModel):
    """Result of document ingestion to RAG (returned by DocumentService.ingest_document)."""
    success: bool
    doc_id: Optional[str] = None  # ingested document id, when known
    ingested_chunks: int = 0  # number of chunks reported back by the Router
    status: str = "unknown"  # "ingested" / "failed" as set by ingest_document; "unknown" otherwise
    error: Optional[str] = None  # failure reason when success is False
class QAResult(BaseModel):
    """Result of a RAG query about a document (returned by DocumentService.ask_about_document)."""
    success: bool
    answer: Optional[str] = None  # answer text from the Router
    doc_id: Optional[str] = None  # document the question was asked about
    sources: Optional[List[Dict[str, Any]]] = None  # citations/sources from the Router; None when empty
    error: Optional[str] = None  # failure reason when success is False
class DocContext(BaseModel):
    """
    Document context stored in Memory Service.

    Built from the "doc_context:<session_id>" fact value written by
    save_doc_context.
    """
    doc_id: str  # id of the session's most recently saved document
    dao_id: Optional[str] = None  # DAO the document belongs to, if given
    user_id: Optional[str] = None  # user id passed when saving (may be None)
    doc_url: Optional[str] = None  # source URL of the document, if given
    file_name: Optional[str] = None  # original file name, if given
    saved_at: Optional[str] = None  # ISO-8601 string from datetime.utcnow().isoformat() at save time
class DocumentService:
    """
    Channel-agnostic service for document operations.

    Handles:
    - Document parsing (PDF, images) through the Swapper service, with a
      local PyPDF2 fallback for PDFs when Swapper fails
    - Document ingestion to RAG through the Router
    - RAG queries about documents through the Router

    The latest document of each session is remembered in the Memory Service
    (fact key "doc_context:<session_id>") so follow-up calls can omit doc_id.
    """

    def __init__(self):
        """Initialize document service with the shared Memory Service client."""
        self.memory_client = memory_client

    @staticmethod
    def _session_user_id(session_id: str) -> str:
        """Return the identifier half of a "channel:identifier" session id.

        Session ids without a ":" separator are returned unchanged.
        """
        parts = session_id.split(":", 1)
        return parts[1] if len(parts) > 1 else session_id

    @staticmethod
    def _new_doc_id(file_name: Optional[str]) -> str:
        """Build a short (12 hex chars) doc id from the file name and current UTC time."""
        stamp = datetime.utcnow().isoformat()
        return hashlib.md5(f"{file_name}:{stamp}".encode()).hexdigest()[:12]

    async def save_doc_context(
        self,
        session_id: str,
        doc_id: str,
        doc_url: Optional[str] = None,
        file_name: Optional[str] = None,
        dao_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> bool:
        """
        Save document context for a session.

        The context is stored as the Memory Service fact
        "doc_context:<session_id>". The fact is owned by `user_id` when given,
        otherwise by the identifier half of `session_id`; get_doc_context
        looks under both owners.

        Args:
            session_id: Session identifier (e.g., "telegram:123", "web:user456")
            doc_id: Document ID from parser
            doc_url: Optional document URL
            file_name: Optional file name
            dao_id: Optional DAO ID
            user_id: Optional user ID; also used as the fact owner when given
        Returns:
            True if the Memory Service upsert returned a truthy result; False
            otherwise. Errors are logged, never raised.
        """
        try:
            fact_user_id = user_id or self._session_user_id(session_id)
            fact_key = f"doc_context:{session_id}"
            fact_value_json = {
                "doc_id": doc_id,
                "doc_url": doc_url,
                "file_name": file_name,
                "dao_id": dao_id,
                "user_id": user_id,
                "saved_at": datetime.utcnow().isoformat()
            }
            result = await self.memory_client.upsert_fact(
                user_id=fact_user_id,
                fact_key=fact_key,
                fact_value_json=fact_value_json,
                # Keep doc context globally addressable for follow-up calls
                # that may not include dao_id/team_id in retrieval.
                team_id=None,
            )
            if result:
                logger.info(f"Saved doc context for session {session_id}: doc_id={doc_id}")
            else:
                logger.warning(
                    f"Memory Service did not confirm doc context for session {session_id}: doc_id={doc_id}"
                )
            return bool(result)
        except Exception as e:
            logger.error(f"Failed to save doc context: {e}", exc_info=True)
            return False

    async def get_doc_context(
        self,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> Optional[DocContext]:
        """
        Get document context for a session.

        save_doc_context stores the fact under the explicit user_id when one
        was given, otherwise under the session-derived id. This method tries
        `user_id` first (when given) and then the session-derived id, so
        contexts saved either way are found.

        Args:
            session_id: Session identifier
            user_id: Optional user the context may have been saved under
        Returns:
            DocContext, or None if nothing is stored or lookup fails
            (errors are logged, never raised)
        """
        try:
            fact_key = f"doc_context:{session_id}"
            owners: List[str] = []
            if user_id:
                owners.append(user_id)
            session_owner = self._session_user_id(session_id)
            if session_owner not in owners:
                owners.append(session_owner)

            for owner in owners:
                fact = await self.memory_client.get_fact(
                    user_id=owner,
                    fact_key=fact_key
                )
                ctx_data = fact.get("fact_value_json") if fact else None
                if ctx_data:
                    logger.debug(f"Retrieved doc context for session {session_id}")
                    return DocContext(**ctx_data)
            return None
        except Exception as e:
            logger.error(f"Failed to get doc context: {e}", exc_info=True)
            return None

    async def parse_document(
        self,
        session_id: str,
        doc_url: str,
        file_name: str,
        dao_id: str,
        user_id: str,
        output_mode: str = "qa_pairs",
        metadata: Optional[Dict[str, Any]] = None
    ) -> ParsedResult:
        """
        Parse a document directly through the Swapper service.

        The file is downloaded from `doc_url` and posted to Swapper's
        /document endpoint. If Swapper fails (non-200 status, non-object JSON
        payload, or any exception) and a PDF was downloaded, a local PyPDF2
        text extraction is attempted instead. On success the document context
        is saved for follow-up queries.

        Args:
            session_id: Session identifier (e.g., "telegram:123", "web:user456")
            doc_url: URL to the document file
            file_name: Name of the file
            dao_id: DAO identifier
            user_id: User identifier
            output_mode: Output format ("qa_pairs", "markdown", "chunks", "text")
            metadata: Optional additional metadata (currently unused)
        Returns:
            ParsedResult with parsed data; failures are reported through
            success=False / error rather than raised
        """
        import httpx
        import mimetypes

        swapper_url = os.getenv("SWAPPER_URL", "http://swapper-service:8890")
        # Stays None until the download succeeds, so the PDF fallback can tell
        # whether there is anything to parse locally.
        doc_content: Optional[bytes] = None
        try:
            logger.info(f"Parsing document: session={session_id}, file={file_name}, mode={output_mode}")

            # Download the document first
            async with httpx.AsyncClient(timeout=60.0) as client:
                doc_response = await client.get(doc_url)
                if doc_response.status_code != 200:
                    return ParsedResult(
                        success=False,
                        error=f"Failed to download document: {doc_response.status_code}"
                    )
                doc_content = doc_response.content

            # Swapper has no qa_pairs mode: request markdown for qa_pairs/markdown, text otherwise.
            swapper_mode = "markdown" if output_mode in ("qa_pairs", "markdown") else "text"
            mime_type = "application/octet-stream"
            if file_name:
                mime_type = mimetypes.guess_type(file_name)[0] or mime_type

            # Send directly to Swapper /document endpoint
            async with httpx.AsyncClient(timeout=30.0) as client:
                swapper_response = await client.post(
                    f"{swapper_url}/document",
                    files={"file": (file_name, doc_content, mime_type)},
                    data={"output_format": swapper_mode},
                )
                if swapper_response.status_code != 200:
                    logger.error(
                        f"Swapper document error: {swapper_response.status_code} - {swapper_response.text[:200]}"
                    )
                    return await self._pdf_fallback_or_error(
                        f"Document parsing failed: {swapper_response.status_code}",
                        doc_content, session_id, doc_url, file_name, dao_id, user_id,
                    )
                data = swapper_response.json()

            # Swapper returns: {success, model, output_format, result, filename, processing_time_ms}
            if not isinstance(data, dict):
                return await self._pdf_fallback_or_error(
                    "Invalid response from Swapper",
                    doc_content, session_id, doc_url, file_name, dao_id, user_id,
                )
            parsed_text = data.get("result") or ""
            output_format = data.get("output_format", "text")
            model_used = data.get("model", "unknown")
            logger.info(f"Document parsed: {len(parsed_text)} chars using {model_used}")

            doc_id = self._new_doc_id(file_name)
            # Save document context for follow-up queries
            await self.save_doc_context(
                session_id=session_id,
                doc_id=doc_id,
                doc_url=doc_url,
                file_name=file_name,
                dao_id=dao_id,
                user_id=user_id,
            )

            # Non-markdown output is fenced so it still renders as markdown.
            markdown = parsed_text if output_format == "markdown" else f"```\n{parsed_text}\n```"
            return ParsedResult(
                success=True,
                doc_id=doc_id,
                qa_pairs=None,  # direct parsing yields no Q&A pairs (that would need an LLM)
                markdown=markdown,
                chunks_meta=None,
                raw=data,
                error=None
            )
        except Exception as e:
            logger.error(f"Document parsing via Swapper failed: {e}", exc_info=True)
            return await self._pdf_fallback_or_error(
                str(e), doc_content, session_id, doc_url, file_name, dao_id, user_id,
            )

    async def _pdf_fallback_or_error(
        self,
        error: str,
        doc_content: Optional[bytes],
        session_id: str,
        doc_url: str,
        file_name: str,
        dao_id: str,
        user_id: str,
    ) -> ParsedResult:
        """Parse already-downloaded PDF bytes with PyPDF2, or report `error`.

        Used after Swapper failed. Returns a successful ParsedResult (and saves
        the doc context) when more than 30 characters of text are extracted;
        otherwise returns ParsedResult(success=False, error=error).
        """
        if doc_content is None or not (file_name and file_name.lower().endswith(".pdf")):
            return ParsedResult(success=False, error=error)
        try:
            logger.info(f"Fallback: parsing PDF with PyPDF2: {file_name}")
            import io
            import PyPDF2

            reader = PyPDF2.PdfReader(io.BytesIO(doc_content))
            parsed_text = "\n".join(page.extract_text() or "" for page in reader.pages).strip()
            page_count = len(reader.pages)
            # Near-empty extractions are treated as failures (presumably
            # image-only/scanned PDFs without a text layer).
            if len(parsed_text) > 30:
                logger.info(f"PyPDF2 fallback success: {len(parsed_text)} chars from {page_count} pages")
                doc_id = self._new_doc_id(file_name)
                await self.save_doc_context(
                    session_id=session_id,
                    doc_id=doc_id,
                    doc_url=doc_url,
                    file_name=file_name,
                    dao_id=dao_id,
                    user_id=user_id,
                )
                return ParsedResult(
                    success=True,
                    doc_id=doc_id,
                    qa_pairs=None,
                    markdown=parsed_text,
                    chunks_meta=None,
                    raw={"model": "PyPDF2-fallback", "pages": page_count},
                    error=None
                )
            logger.warning(f"PyPDF2 fallback: too little text ({len(parsed_text)} chars)")
        except Exception as pdf_err:
            logger.error(f"PyPDF2 fallback also failed: {pdf_err}")
        return ParsedResult(success=False, error=error)

    async def ingest_document(
        self,
        session_id: str,
        doc_id: Optional[str] = None,
        doc_url: Optional[str] = None,
        file_name: Optional[str] = None,
        dao_id: Optional[str] = None,
        user_id: Optional[str] = None
    ) -> IngestResult:
        """
        Ingest document chunks into RAG/Memory via the Router.

        When `doc_id` is missing, the session's saved doc context fills in
        doc_id and any missing doc_url / file_name / dao_id / user_id.

        Args:
            session_id: Session identifier
            doc_id: Document ID (if already parsed)
            doc_url: Document URL (if need to parse first)
            file_name: File name
            dao_id: DAO identifier
            user_id: User identifier
        Returns:
            IngestResult with ingestion status; errors are reported, never raised
        """
        try:
            # If doc_id not provided, try to get from context
            if not doc_id:
                doc_context = await self.get_doc_context(session_id, user_id=user_id)
                if doc_context:
                    doc_id = doc_context.doc_id
                    doc_url = doc_url or doc_context.doc_url
                    file_name = file_name or doc_context.file_name
                    dao_id = dao_id or doc_context.dao_id
                    user_id = user_id or doc_context.user_id
            if not doc_id and not doc_url:
                return IngestResult(
                    success=False,
                    error="No document ID or URL provided"
                )

            # Build request to Router with ingest flag
            router_request = {
                "mode": "doc_parse",
                "agent": "parser",
                "metadata": {
                    "source": self._extract_source(session_id),
                    "dao_id": dao_id,
                    "user_id": user_id,
                    "session_id": session_id,
                },
                "payload": {
                    "output_mode": "chunks",  # Use chunks for RAG ingestion
                    "dao_id": dao_id,
                    "user_id": user_id,
                    "ingest": True,  # Flag for ingestion
                },
            }
            if doc_url:
                router_request["payload"]["doc_url"] = doc_url
                router_request["payload"]["file_name"] = file_name or "document.pdf"
            if doc_id:
                router_request["payload"]["doc_id"] = doc_id

            logger.info(f"Ingesting document: session={session_id}, doc_id={doc_id}")
            response = await send_to_router(router_request)
            if not isinstance(response, dict):
                return IngestResult(
                    success=False,
                    error="Invalid response from router"
                )

            # Guard against an explicit null "data" field.
            data = response.get("data") or {}
            chunks = data.get("chunks") or []
            if chunks:
                return IngestResult(
                    success=True,
                    doc_id=doc_id or data.get("doc_id"),
                    ingested_chunks=len(chunks),
                    status="ingested"
                )
            return IngestResult(
                success=False,
                status="failed",
                error="No chunks to ingest"
            )
        except Exception as e:
            logger.error(f"Document ingestion failed: {e}", exc_info=True)
            return IngestResult(
                success=False,
                error=str(e)
            )

    async def ask_about_document(
        self,
        session_id: str,
        question: str,
        doc_id: Optional[str] = None,
        dao_id: Optional[str] = None,
        user_id: Optional[str] = None
    ) -> QAResult:
        """
        Ask a question about a document using RAG query via the Router.

        Args:
            session_id: Session identifier
            question: Question text
            doc_id: Document ID (if None, taken from the session's doc context)
            dao_id: DAO identifier (falls back to the doc context)
            user_id: User identifier (falls back to the doc context, then to
                the identifier half of session_id)
        Returns:
            QAResult with answer and citations; errors are reported, never raised
        """
        try:
            # If doc_id not provided, try to get from context
            if not doc_id:
                doc_context = await self.get_doc_context(session_id, user_id=user_id)
                if doc_context:
                    doc_id = doc_context.doc_id
                    dao_id = dao_id or doc_context.dao_id
                    user_id = user_id or doc_context.user_id
            if not doc_id:
                return QAResult(
                    success=False,
                    error="No document context found. Parse a document first."
                )

            if not user_id:
                user_id = self._session_user_id(session_id)

            router_request = {
                "mode": "rag_query",
                "agent": "daarwizz",
                "metadata": {
                    "source": self._extract_source(session_id),
                    "dao_id": dao_id,
                    "user_id": user_id,
                    "session_id": session_id,
                },
                "payload": {
                    "question": question,
                    "dao_id": dao_id,
                    "user_id": user_id,
                    "doc_id": doc_id,
                },
            }
            logger.info(f"RAG query: session={session_id}, question={question[:50]}, doc_id={doc_id}")
            response = await send_to_router(router_request)
            if not isinstance(response, dict):
                return QAResult(
                    success=False,
                    error="Invalid response from router"
                )

            # Guard against an explicit null "data" field.
            data = response.get("data") or {}
            answer = data.get("answer") or data.get("text")
            sources = data.get("citations", []) or data.get("sources", [])
            if answer:
                return QAResult(
                    success=True,
                    answer=answer,
                    doc_id=doc_id,
                    sources=sources if sources else None
                )
            return QAResult(
                success=False,
                error="No answer from RAG query"
            )
        except Exception as e:
            logger.error(f"RAG query failed: {e}", exc_info=True)
            return QAResult(
                success=False,
                error=str(e)
            )

    def _extract_source(self, session_id: str) -> str:
        """Extract the source channel ("telegram", "web", ...) from session_id; "unknown" if absent."""
        parts = session_id.split(":", 1)
        return parts[0] if len(parts) > 1 else "unknown"
# Global instance
# Shared DocumentService created at import time; the module-level convenience
# functions below all delegate to it.
doc_service = DocumentService()
# Export functions for convenience
async def parse_document(
    session_id: str,
    doc_url: str,
    file_name: str,
    dao_id: str,
    user_id: str,
    output_mode: str = "qa_pairs",
    metadata: Optional[Dict[str, Any]] = None
) -> ParsedResult:
    """
    Parse a document with the shared ``doc_service`` instance.

    Thin wrapper: every argument is forwarded unchanged to
    DocumentService.parse_document, and its ParsedResult is returned.
    """
    forwarded = dict(
        session_id=session_id,
        doc_url=doc_url,
        file_name=file_name,
        dao_id=dao_id,
        user_id=user_id,
        output_mode=output_mode,
        metadata=metadata,
    )
    parsed = await doc_service.parse_document(**forwarded)
    return parsed
async def ingest_document(
    session_id: str,
    doc_id: Optional[str] = None,
    doc_url: Optional[str] = None,
    file_name: Optional[str] = None,
    dao_id: Optional[str] = None,
    user_id: Optional[str] = None
) -> IngestResult:
    """
    Ingest document chunks into RAG/Memory with the shared ``doc_service``.

    Thin wrapper around DocumentService.ingest_document; all arguments are
    forwarded unchanged.
    """
    forwarded = dict(
        session_id=session_id,
        doc_id=doc_id,
        doc_url=doc_url,
        file_name=file_name,
        dao_id=dao_id,
        user_id=user_id,
    )
    outcome = await doc_service.ingest_document(**forwarded)
    return outcome
async def ask_about_document(
    session_id: str,
    question: str,
    doc_id: Optional[str] = None,
    dao_id: Optional[str] = None,
    user_id: Optional[str] = None
) -> QAResult:
    """
    Ask a RAG question about a document with the shared ``doc_service``.

    Thin wrapper around DocumentService.ask_about_document; all arguments
    are forwarded unchanged.
    """
    service = doc_service
    reply = await service.ask_about_document(
        session_id=session_id,
        question=question,
        doc_id=doc_id,
        dao_id=dao_id,
        user_id=user_id,
    )
    return reply
async def save_doc_context(
    session_id: str,
    doc_id: str,
    doc_url: Optional[str] = None,
    file_name: Optional[str] = None,
    dao_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> bool:
    """
    Persist a session's document context via the shared ``doc_service``.

    Thin wrapper around DocumentService.save_doc_context; all arguments are
    forwarded unchanged and its boolean outcome is returned.
    """
    forwarded = dict(
        session_id=session_id,
        doc_id=doc_id,
        doc_url=doc_url,
        file_name=file_name,
        dao_id=dao_id,
        user_id=user_id,
    )
    saved = await doc_service.save_doc_context(**forwarded)
    return saved
async def get_doc_context(session_id: str) -> Optional[DocContext]:
    """
    Look up a session's document context via the shared ``doc_service``.

    Returns the stored DocContext, or None when nothing is found.
    """
    service = doc_service
    context = await service.get_doc_context(session_id)
    return context