- Vision Encoder Service (OpenCLIP ViT-L/14, GPU-accelerated)
  - FastAPI app with text/image embedding endpoints (768-dim)
  - Docker support with NVIDIA GPU runtime
  - Port 8001, health checks, model info API
- Qdrant Vector Database integration
  - Port 6333/6334 (HTTP/gRPC)
  - Image embeddings storage (768-dim, Cosine distance)
  - Auto collection creation
- Vision RAG implementation
  - VisionEncoderClient (Python client for API)
  - Image Search module (text-to-image, image-to-image)
  - Vision RAG routing in DAGI Router (mode: image_search)
  - VisionEncoderProvider integration
- Documentation (5000+ lines)
  - SYSTEM-INVENTORY.md - Complete system inventory
  - VISION-ENCODER-STATUS.md - Service status
  - VISION-RAG-IMPLEMENTATION.md - Implementation details
  - vision_encoder_deployment_task.md - Deployment checklist
  - services/vision-encoder/README.md - Deployment guide
  - Updated WARP.md, INFRASTRUCTURE.md, Jupyter Notebook
- Testing
  - test-vision-encoder.sh - Smoke tests (6 tests)
  - Unit tests for client, image search, routing

- Services: 17 total (added Vision Encoder + Qdrant)
- AI Models: 3 (qwen3:8b, OpenCLIP ViT-L/14, BAAI/bge-m3)
- GPU Services: 2 (Vision Encoder, Ollama)
- VRAM Usage: ~10 GB (concurrent)

Status: Production Ready ✅
556 lines
17 KiB
Python
556 lines
17 KiB
Python
"""
Document Workflow Service

Channel-agnostic service for document parsing, ingestion, and RAG queries.

This service can be used by:
- Telegram bots
- Web applications
- Mobile apps
- Any other client
"""

import logging
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List

from pydantic import BaseModel

from memory_client import memory_client
from router_client import send_to_router

# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)


class QAItem(BaseModel):
    """A single question/answer pair extracted from a document."""

    question: str  # question text
    answer: str  # answer text


class ParsedResult(BaseModel):
    """Outcome of a document-parse request.

    Every field except ``success`` is optional. Failed parses are reported
    with ``success=False`` and a human-readable ``error``.
    """

    success: bool  # whether parsing completed
    doc_id: Optional[str] = None  # document id reported by the parser, if any
    qa_pairs: Optional[List[QAItem]] = None  # extracted Q&A pairs, if any
    markdown: Optional[str] = None  # markdown rendering, if returned
    chunks_meta: Optional[Dict[str, Any]] = None  # chunk summary, if chunks were returned
    raw: Optional[Dict[str, Any]] = None  # unmodified parser payload
    error: Optional[str] = None  # failure description


class IngestResult(BaseModel):
    """Outcome of ingesting a document's chunks into RAG."""

    success: bool  # whether ingestion succeeded
    doc_id: Optional[str] = None  # id of the ingested document, if known
    ingested_chunks: int = 0  # number of chunks reported as ingested
    status: str = "unknown"  # e.g. "ingested" / "failed"; "unknown" when not set
    error: Optional[str] = None  # failure description


class QAResult(BaseModel):
    """Outcome of a RAG question asked about a document."""

    success: bool  # whether an answer was produced
    answer: Optional[str] = None  # answer text
    doc_id: Optional[str] = None  # document the question was asked about
    sources: Optional[List[Dict[str, Any]]] = None  # citations/sources, if any
    error: Optional[str] = None  # failure description


class DocContext(BaseModel):
    """Per-session "current document" record persisted in the Memory Service."""

    doc_id: str  # parser-assigned document id
    dao_id: Optional[str] = None  # owning DAO, if known
    user_id: Optional[str] = None  # owning user, if known
    doc_url: Optional[str] = None  # source URL of the document
    file_name: Optional[str] = None  # original file name
    saved_at: Optional[str] = None  # ISO-8601 timestamp string of when it was saved


class DocumentService:
    """
    Channel-agnostic service for document operations.

    Handles:
    - Document parsing (PDF, images)
    - Document ingestion to RAG
    - RAG queries about documents

    Session identifiers are expected to look like ``"<channel>:<identifier>"``
    (e.g. ``"telegram:123"``). The channel becomes the request ``source``;
    the identifier is used as the Memory Service user id and as the fallback
    ``user_id`` when a caller does not supply one.
    """

    def __init__(self):
        """Initialize document service with the shared Memory Service client."""
        self.memory_client = memory_client

    async def save_doc_context(
        self,
        session_id: str,
        doc_id: str,
        doc_url: Optional[str] = None,
        file_name: Optional[str] = None,
        dao_id: Optional[str] = None,
    ) -> bool:
        """
        Save document context for a session.

        Uses Memory Service to persist document context across channels.
        The context is stored as the fact ``doc_context:<session_id>``.

        Args:
            session_id: Session identifier (e.g., "telegram:123", "web:user456")
            doc_id: Document ID from parser
            doc_url: Optional document URL
            file_name: Optional file name
            dao_id: Optional DAO ID (also passed to Memory Service as team_id)

        Returns:
            True if the Memory Service returned a truthy result, False if it
            returned a falsy result or the call raised.
        """
        try:
            fact_value_json = {
                "doc_id": doc_id,
                "doc_url": doc_url,
                "file_name": file_name,
                "dao_id": dao_id,
                # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
                "saved_at": datetime.now(timezone.utc).isoformat(),
            }
            result = await self.memory_client.upsert_fact(
                user_id=self._extract_user_id(session_id),
                fact_key=f"doc_context:{session_id}",
                fact_value_json=fact_value_json,
                team_id=dao_id,
            )
        except Exception as e:
            logger.exception("Failed to save doc context: %s", e)
            return False

        # Honour the documented bool contract instead of leaking the raw
        # client response to callers.
        saved = bool(result)
        if saved:
            logger.info("Saved doc context for session %s: doc_id=%s", session_id, doc_id)
        else:
            logger.warning(
                "Memory service did not confirm doc context for session %s: doc_id=%s",
                session_id,
                doc_id,
            )
        return saved

    async def get_doc_context(self, session_id: str) -> Optional[DocContext]:
        """
        Get document context for a session.

        Args:
            session_id: Session identifier

        Returns:
            DocContext built from the stored fact, or None when nothing is
            stored or the lookup fails (failures are logged, not raised).
        """
        try:
            fact = await self.memory_client.get_fact(
                user_id=self._extract_user_id(session_id),
                fact_key=f"doc_context:{session_id}",
            )
            ctx_data = fact.get("fact_value_json") if fact else None
            if not ctx_data:
                return None

            logger.debug("Retrieved doc context for session %s", session_id)
            return DocContext(**ctx_data)
        except Exception as e:
            logger.exception("Failed to get doc context: %s", e)
            return None

    async def parse_document(
        self,
        session_id: str,
        doc_url: str,
        file_name: str,
        dao_id: str,
        user_id: str,
        output_mode: str = "qa_pairs",
        metadata: Optional[Dict[str, Any]] = None,
    ) -> ParsedResult:
        """
        Parse a document through DAGI Router.

        On success the returned doc_id (if any) is saved as the session's
        document context so follow-up questions can omit it.

        Args:
            session_id: Session identifier (e.g., "telegram:123", "web:user456")
            doc_url: URL to the document file
            file_name: Name of the file
            dao_id: DAO identifier
            user_id: User identifier
            output_mode: Output format ("qa_pairs", "markdown", "chunks")
            metadata: Optional additional metadata, merged into the request
                metadata last (so its keys override the defaults)

        Returns:
            ParsedResult with parsed data; errors are reported via
            ``success=False`` and ``error`` rather than raised.
        """
        try:
            router_request = {
                "mode": "doc_parse",
                "agent": "parser",
                "metadata": self._router_metadata(session_id, dao_id, user_id, metadata),
                "payload": {
                    "doc_url": doc_url,
                    "file_name": file_name,
                    "output_mode": output_mode,
                    "dao_id": dao_id,
                    "user_id": user_id,
                },
            }

            logger.info(
                "Parsing document: session=%s, file=%s, mode=%s",
                session_id,
                file_name,
                output_mode,
            )
            response = await send_to_router(router_request)

            if not isinstance(response, dict):
                return ParsedResult(success=False, error="Invalid response from router")

            # "data"/"metadata" may be absent OR explicitly null; treat both as empty.
            data = response.get("data") or {}
            doc_id = data.get("doc_id") or (data.get("metadata") or {}).get("doc_id")

            # Save document context for follow-up queries
            if doc_id:
                await self.save_doc_context(
                    session_id=session_id,
                    doc_id=doc_id,
                    doc_url=doc_url,
                    file_name=file_name,
                    dao_id=dao_id,
                )

            qa_pairs: Optional[List[QAItem]] = None
            qa_pairs_raw = data.get("qa_pairs")
            if qa_pairs_raw:
                try:
                    qa_pairs = self._to_qa_items(qa_pairs_raw)
                except Exception as e:
                    # Malformed pairs should not fail the whole parse.
                    logger.warning("Failed to parse qa_pairs: %s", e)

            chunks = data.get("chunks") or []
            chunks_meta = None
            if chunks:
                # Only a small sample is kept; the full list stays in ``raw``.
                chunks_meta = {"count": len(chunks), "chunks": chunks[:3]}

            return ParsedResult(
                success=True,
                doc_id=doc_id,
                qa_pairs=qa_pairs,
                markdown=data.get("markdown"),
                chunks_meta=chunks_meta,
                raw=data,
                error=None,
            )

        except Exception as e:
            logger.exception("Document parsing failed: %s", e)
            return ParsedResult(success=False, error=str(e))

    async def ingest_document(
        self,
        session_id: str,
        doc_id: Optional[str] = None,
        doc_url: Optional[str] = None,
        file_name: Optional[str] = None,
        dao_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> IngestResult:
        """
        Ingest document chunks into RAG/Memory.

        When doc_id is not given, the session's saved document context is
        used to fill doc_id and any missing doc_url/file_name/dao_id.

        Args:
            session_id: Session identifier
            doc_id: Document ID (if already parsed)
            doc_url: Document URL (if need to parse first)
            file_name: File name (defaults to "document.pdf" when a URL is sent)
            dao_id: DAO identifier
            user_id: User identifier (derived from session_id if omitted)

        Returns:
            IngestResult with ingestion status; every failure is reported
            with ``success=False`` and ``status="failed"``.
        """
        try:
            # If doc_id not provided, try to get from context
            if not doc_id:
                doc_context = await self.get_doc_context(session_id)
                if doc_context:
                    doc_id = doc_context.doc_id
                    doc_url = doc_url or doc_context.doc_url
                    file_name = file_name or doc_context.file_name
                    dao_id = dao_id or doc_context.dao_id

            if not doc_id and not doc_url:
                return IngestResult(
                    success=False,
                    status="failed",
                    error="No document ID or URL provided",
                )

            # Same fallback as ask_about_document.
            if not user_id:
                user_id = self._extract_user_id(session_id)

            payload: Dict[str, Any] = {
                "output_mode": "chunks",  # Use chunks for RAG ingestion
                "dao_id": dao_id,
                "user_id": user_id,
                "ingest": True,  # Flag for ingestion
            }
            if doc_url:
                payload["doc_url"] = doc_url
                payload["file_name"] = file_name or "document.pdf"
            if doc_id:
                payload["doc_id"] = doc_id

            router_request = {
                "mode": "doc_parse",
                "agent": "parser",
                "metadata": self._router_metadata(session_id, dao_id, user_id),
                "payload": payload,
            }

            logger.info("Ingesting document: session=%s, doc_id=%s", session_id, doc_id)
            response = await send_to_router(router_request)

            if not isinstance(response, dict):
                return IngestResult(
                    success=False,
                    status="failed",
                    error="Invalid response from router",
                )

            data = response.get("data") or {}
            chunks = data.get("chunks") or []

            if not chunks:
                return IngestResult(
                    success=False,
                    status="failed",
                    error="No chunks to ingest",
                )

            return IngestResult(
                success=True,
                doc_id=doc_id or data.get("doc_id"),
                ingested_chunks=len(chunks),
                status="ingested",
            )

        except Exception as e:
            logger.exception("Document ingestion failed: %s", e)
            return IngestResult(success=False, status="failed", error=str(e))

    async def ask_about_document(
        self,
        session_id: str,
        question: str,
        doc_id: Optional[str] = None,
        dao_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> QAResult:
        """
        Ask a question about a document using RAG query.

        Args:
            session_id: Session identifier
            question: Question text
            doc_id: Document ID (if None, tries to get from context)
            dao_id: DAO identifier (falls back to the saved context's dao_id)
            user_id: User identifier (derived from session_id if omitted)

        Returns:
            QAResult with answer and citations; errors are reported via
            ``success=False`` and ``error`` rather than raised.
        """
        try:
            # If doc_id not provided, try to get from context
            if not doc_id:
                doc_context = await self.get_doc_context(session_id)
                if doc_context:
                    doc_id = doc_context.doc_id
                    dao_id = dao_id or doc_context.dao_id

            if not doc_id:
                return QAResult(
                    success=False,
                    error="No document context found. Parse a document first.",
                )

            if not user_id:
                user_id = self._extract_user_id(session_id)

            router_request = {
                "mode": "rag_query",
                "agent": "daarwizz",
                "metadata": self._router_metadata(session_id, dao_id, user_id),
                "payload": {
                    "question": question,
                    "dao_id": dao_id,
                    "user_id": user_id,
                    "doc_id": doc_id,
                },
            }

            logger.info(
                "RAG query: session=%s, question=%s, doc_id=%s",
                session_id,
                question[:50],
                doc_id,
            )
            response = await send_to_router(router_request)

            if not isinstance(response, dict):
                return QAResult(success=False, error="Invalid response from router")

            data = response.get("data") or {}
            answer = data.get("answer") or data.get("text")
            if not answer:
                return QAResult(success=False, error="No answer from RAG query")

            # Prefer "citations"; empty lists collapse to None.
            sources = data.get("citations") or data.get("sources") or None
            return QAResult(success=True, answer=answer, doc_id=doc_id, sources=sources)

        except Exception as e:
            logger.exception("RAG query failed: %s", e)
            return QAResult(success=False, error=str(e))

    @staticmethod
    def _to_qa_items(raw_pairs: List[Any]) -> List[QAItem]:
        """Convert Router-provided Q&A entries into QAItem models.

        Dict entries are validated as QAItem fields directly; any other entry
        is read through its ``question``/``answer`` attributes (missing ones
        default to ""). Validation errors propagate to the caller.
        """
        items: List[QAItem] = []
        for entry in raw_pairs:
            if isinstance(entry, dict):
                items.append(QAItem(**entry))
            else:
                # Non-dict entries have no .get(); read attributes instead.
                items.append(
                    QAItem(
                        question=getattr(entry, "question", ""),
                        answer=getattr(entry, "answer", ""),
                    )
                )
        return items

    def _router_metadata(
        self,
        session_id: str,
        dao_id: Optional[str],
        user_id: Optional[str],
        extra: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Build the request metadata shared by every Router call.

        Keys from ``extra`` are merged last, so they override the defaults.
        """
        return {
            "source": self._extract_source(session_id),
            "dao_id": dao_id,
            "user_id": user_id,
            "session_id": session_id,
            **(extra or {}),
        }

    def _extract_source(self, session_id: str) -> str:
        """Extract source channel from session_id ("telegram:123" -> "telegram").

        Returns "unknown" when session_id contains no ":".
        """
        channel, separator, _ = session_id.partition(":")
        return channel if separator else "unknown"

    def _extract_user_id(self, session_id: str) -> str:
        """Extract the identifier part of session_id ("telegram:123" -> "123").

        Everything after the first ":" is returned; the whole session_id is
        returned when it contains no ":".
        """
        _, separator, identifier = session_id.partition(":")
        return identifier if separator else session_id


# Shared service instance used by the module-level convenience functions below.
doc_service: DocumentService = DocumentService()

# Export functions for convenience
async def parse_document(
    session_id: str,
    doc_url: str,
    file_name: str,
    dao_id: str,
    user_id: str,
    output_mode: str = "qa_pairs",
    metadata: Optional[Dict[str, Any]] = None
) -> ParsedResult:
    """Parse a document through DAGI Router.

    Thin module-level shortcut: every argument is forwarded unchanged to
    ``doc_service.parse_document``.
    """
    service = doc_service
    result = await service.parse_document(
        session_id=session_id,
        doc_url=doc_url,
        file_name=file_name,
        dao_id=dao_id,
        user_id=user_id,
        output_mode=output_mode,
        metadata=metadata,
    )
    return result


async def ingest_document(
    session_id: str,
    doc_id: Optional[str] = None,
    doc_url: Optional[str] = None,
    file_name: Optional[str] = None,
    dao_id: Optional[str] = None,
    user_id: Optional[str] = None
) -> IngestResult:
    """Ingest document chunks into RAG/Memory.

    Thin module-level shortcut: every argument is forwarded unchanged to
    ``doc_service.ingest_document``.
    """
    service = doc_service
    result = await service.ingest_document(
        session_id=session_id,
        doc_id=doc_id,
        doc_url=doc_url,
        file_name=file_name,
        dao_id=dao_id,
        user_id=user_id,
    )
    return result


async def ask_about_document(
    session_id: str,
    question: str,
    doc_id: Optional[str] = None,
    dao_id: Optional[str] = None,
    user_id: Optional[str] = None
) -> QAResult:
    """Ask a question about a document using RAG query.

    Thin module-level shortcut: every argument is forwarded unchanged to
    ``doc_service.ask_about_document``.
    """
    service = doc_service
    result = await service.ask_about_document(
        session_id=session_id,
        question=question,
        doc_id=doc_id,
        dao_id=dao_id,
        user_id=user_id,
    )
    return result


async def save_doc_context(
    session_id: str,
    doc_id: str,
    doc_url: Optional[str] = None,
    file_name: Optional[str] = None,
    dao_id: Optional[str] = None
) -> bool:
    """Save document context for a session.

    Thin module-level shortcut: every argument is forwarded unchanged to
    ``doc_service.save_doc_context``.
    """
    service = doc_service
    result = await service.save_doc_context(
        session_id=session_id,
        doc_id=doc_id,
        doc_url=doc_url,
        file_name=file_name,
        dao_id=dao_id,
    )
    return result


async def get_doc_context(session_id: str) -> Optional[DocContext]:
    """Get document context for a session.

    Thin module-level shortcut for ``doc_service.get_doc_context``.
    """
    service = doc_service
    context = await service.get_doc_context(session_id)
    return context