""" Document Workflow Service Channel-agnostic service for document parsing, ingestion, and RAG queries. This service can be used by: - Telegram bots - Web applications - Mobile apps - Any other client """ import os import logging import hashlib import json import re from typing import Optional, Dict, Any, List from pydantic import BaseModel from datetime import datetime from router_client import send_to_router from memory_client import memory_client logger = logging.getLogger(__name__) SHARED_EXCEL_POLICY_AGENTS = {"agromatrix", "helion", "nutra", "greenfood"} class QAItem(BaseModel): """Single Q&A pair""" question: str answer: str class ParsedResult(BaseModel): """Result of document parsing""" success: bool doc_id: Optional[str] = None qa_pairs: Optional[List[QAItem]] = None markdown: Optional[str] = None chunks_meta: Optional[Dict[str, Any]] = None raw: Optional[Dict[str, Any]] = None error: Optional[str] = None class IngestResult(BaseModel): """Result of document ingestion to RAG""" success: bool doc_id: Optional[str] = None ingested_chunks: int = 0 status: str = "unknown" error: Optional[str] = None class QAResult(BaseModel): """Result of RAG query about a document""" success: bool answer: Optional[str] = None doc_id: Optional[str] = None sources: Optional[List[Dict[str, Any]]] = None error: Optional[str] = None class DocContext(BaseModel): """Document context stored in Memory Service""" doc_id: str dao_id: Optional[str] = None user_id: Optional[str] = None doc_url: Optional[str] = None file_name: Optional[str] = None saved_at: Optional[str] = None class DocumentService: """ Channel-agnostic service for document operations. Handles: - Document parsing (PDF, images) - Document ingestion to RAG - RAG queries about documents """ def __init__(self): """Initialize document service""" self.memory_client = memory_client def _is_excel_filename(self, file_name: Optional[str]) -> bool: if not file_name: return False lower = file_name.lower() return lower.endswith(".xlsx") or lower.endswith(".xls") def _is_numeric_question(self, question: str) -> bool: t = (question or "").lower() if not t: return False markers = [ "скільки", "сума", "витрат", "добрив", "грн", "uah", "usd", "eur", "сколько", "amount", "total", "spent", "cost", "value", ] return any(m in t for m in markers) def _extract_query_tokens(self, question: str) -> List[str]: tokens = re.findall(r"[a-zA-Zа-яА-ЯіїєґІЇЄҐ0-9]{3,}", (question or "").lower()) stop = { "яка", "який", "яке", "which", "what", "скільки", "сума", "була", "витрачена", "write", "show", "give", "please", "мені", "будь", "ласка", "тому", "цьому", "цей", "this", "that", "for", "and", "the", } return [t for t in tokens if t not in stop] async def _try_answer_excel_question( self, question: str, doc_url: Optional[str], file_name: Optional[str], ) -> Optional[str]: if not doc_url or not self._is_numeric_question(question): return None try: import httpx from io import BytesIO import openpyxl except Exception: return None query_tokens = self._extract_query_tokens(question) if not query_tokens: query_tokens = ["сума", "витрати", "добрив"] try: async with httpx.AsyncClient(timeout=20.0) as client: resp = await client.get(doc_url) if resp.status_code != 200: return None content = resp.content wb = openpyxl.load_workbook(BytesIO(content), data_only=True, read_only=True) best = None best_score = -1 fallback = None for ws in wb.worksheets: for row_idx, row in enumerate(ws.iter_rows(values_only=True), start=1): label = "" numeric_value = None for cell in row: if isinstance(cell, (int, float)) and numeric_value is None: numeric_value = float(cell) elif isinstance(cell, str) and not label: label = cell.strip() if numeric_value is None: continue label_low = label.lower() score = sum(1 for t in query_tokens if t in label_low) if score > best_score: best_score = score best = { "sheet": ws.title, "row": row_idx, "label": label or "n/a", "value": numeric_value, } if fallback is None and any(m in label_low for m in ("добрив", "fertiliz", "удобр")): fallback = { "sheet": ws.title, "row": row_idx, "label": label or "n/a", "value": numeric_value, } picked = best if best and best_score > 0 else fallback if not picked: return None value = picked["value"] if abs(value - int(value)) < 1e-9: value_str = f"{int(value):,}".replace(",", " ") else: value_str = f"{value:,.2f}".replace(",", " ").replace(".", ",") unit = "грн" if self._is_numeric_question(question) else "" unit_part = f" {unit}" if unit else "" file_part = f' у файлі "{file_name}"' if file_name else "" return ( f"За{file_part}: {value_str}{unit_part}. " f"Джерело: лист {picked['sheet']}, рядок {picked['row']} ({picked['label']})." ) except Exception as e: logger.warning(f"Excel deterministic answer failed: {e}") return None async def save_doc_context( self, session_id: str, doc_id: str, doc_url: Optional[str] = None, file_name: Optional[str] = None, dao_id: Optional[str] = None, user_id: Optional[str] = None, ) -> bool: """ Save document context for a session. Uses Memory Service to persist document context across channels. Args: session_id: Session identifier (e.g., "telegram:123", "web:user456") doc_id: Document ID from parser doc_url: Optional document URL file_name: Optional file name dao_id: Optional DAO ID Returns: True if saved successfully """ try: # Use stable synthetic user key per session, so context can be # retrieved later using only session_id (without caller user_id). fact_user_id = f"session:{session_id}" # Save as fact in Memory Service fact_key = f"doc_context:{session_id}" fact_value_json = { "doc_id": doc_id, "doc_url": doc_url, "file_name": file_name, "dao_id": dao_id, "user_id": user_id, "saved_at": datetime.utcnow().isoformat() } result = await self.memory_client.upsert_fact( user_id=fact_user_id, fact_key=fact_key, fact_value_json=fact_value_json, # Keep doc context globally addressable for follow-up calls # that may not include dao_id/team_id in retrieval. team_id=None, ) logger.info(f"Saved doc context for session {session_id}: doc_id={doc_id}") return result except Exception as e: logger.error(f"Failed to save doc context: {e}", exc_info=True) return False async def get_doc_context(self, session_id: str) -> Optional[DocContext]: """ Get document context for a session. Args: session_id: Session identifier Returns: DocContext or None """ try: user_id = f"session:{session_id}" fact_key = f"doc_context:{session_id}" # Get fact from Memory Service fact = await self.memory_client.get_fact( user_id=user_id, fact_key=fact_key ) if fact and fact.get("fact_value_json"): logger.debug(f"Retrieved doc context for session {session_id}") ctx_data = fact.get("fact_value_json") if isinstance(ctx_data, str): try: ctx_data = json.loads(ctx_data) except Exception: logger.warning("doc_context fact_value_json is not valid JSON string") return None return DocContext(**ctx_data) return None except Exception as e: logger.error(f"Failed to get doc context: {e}", exc_info=True) return None async def parse_document( self, session_id: str, doc_url: str, file_name: str, dao_id: str, user_id: str, output_mode: str = "qa_pairs", metadata: Optional[Dict[str, Any]] = None ) -> ParsedResult: """ Parse a document directly through Swapper service. Args: session_id: Session identifier (e.g., "telegram:123", "web:user456") doc_url: URL to the document file file_name: Name of the file dao_id: DAO identifier user_id: User identifier output_mode: Output format ("qa_pairs", "markdown", "chunks", "text") metadata: Optional additional metadata Returns: ParsedResult with parsed data """ import httpx SWAPPER_URL = os.getenv("SWAPPER_URL", "http://swapper-service:8890") try: logger.info(f"Parsing document: session={session_id}, file={file_name}, mode={output_mode}") # Download the document first async with httpx.AsyncClient(timeout=60.0) as client: doc_response = await client.get(doc_url) if doc_response.status_code != 200: return ParsedResult( success=False, error=f"Failed to download document: {doc_response.status_code}" ) doc_content = doc_response.content # Send directly to Swapper /document endpoint async with httpx.AsyncClient(timeout=30.0) as client: # Map output_mode: qa_pairs -> text (Swapper doesn't support qa_pairs directly) swapper_mode = "markdown" if output_mode in ["qa_pairs", "markdown"] else "text" mime_type = "application/octet-stream" if file_name: import mimetypes mime_type = mimetypes.guess_type(file_name)[0] or mime_type files = {"file": (file_name, doc_content, mime_type)} data = {"output_format": swapper_mode} swapper_response = await client.post( f"{SWAPPER_URL}/document", files=files, data=data ) if swapper_response.status_code == 200: response = {"ok": True, "data": swapper_response.json()} else: logger.error(f"Swapper document error: {swapper_response.status_code} - {swapper_response.text[:200]}") return ParsedResult( success=False, error=f"Document parsing failed: {swapper_response.status_code}" ) if not isinstance(response, dict): return ParsedResult( success=False, error="Invalid response from Swapper" ) data = response.get("data", {}) # Swapper returns: {success, model, output_format, result, filename, processing_time_ms} parsed_text = data.get("result", "") output_format = data.get("output_format", "text") model_used = data.get("model", "unknown") logger.info(f"Document parsed: {len(parsed_text)} chars using {model_used}") # Generate a simple doc_id based on filename and timestamp doc_id = hashlib.md5(f"{file_name}:{datetime.utcnow().isoformat()}".encode()).hexdigest()[:12] # Save document context for follow-up queries await self.save_doc_context( session_id=session_id, doc_id=doc_id, doc_url=doc_url, file_name=file_name, dao_id=dao_id, user_id=user_id, ) # Convert text to markdown format markdown = parsed_text if output_format == "markdown" else f"```\n{parsed_text}\n```" # No QA pairs from direct parsing - would need LLM for that qa_pairs = None chunks = [] chunks_meta = None if chunks: chunks_meta = { "count": len(chunks), "chunks": chunks[:3] if len(chunks) > 3 else chunks # Sample } return ParsedResult( success=True, doc_id=doc_id, qa_pairs=qa_pairs, markdown=markdown, chunks_meta=chunks_meta, raw=data, error=None ) except Exception as e: logger.error(f"Document parsing via Swapper failed: {e}") # === FALLBACK: Try PyPDF2 for PDF files === if file_name and file_name.lower().endswith(".pdf"): try: logger.info(f"Fallback: parsing PDF with PyPDF2: {file_name}") import io import PyPDF2 reader = PyPDF2.PdfReader(io.BytesIO(doc_content)) parsed_text = "" for page in reader.pages: text = page.extract_text() or "" parsed_text += text + "\n" parsed_text = parsed_text.strip() if len(parsed_text) > 30: logger.info(f"PyPDF2 fallback success: {len(parsed_text)} chars from {len(reader.pages)} pages") doc_id = hashlib.md5(f"{file_name}:{datetime.utcnow().isoformat()}".encode()).hexdigest()[:12] await self.save_doc_context( session_id=session_id, doc_id=doc_id, doc_url=doc_url, file_name=file_name, dao_id=dao_id, user_id=user_id, ) return ParsedResult( success=True, doc_id=doc_id, qa_pairs=None, markdown=parsed_text, chunks_meta=None, raw={"model": "PyPDF2-fallback", "pages": len(reader.pages)}, error=None ) else: logger.warning(f"PyPDF2 fallback: too little text ({len(parsed_text)} chars)") except Exception as pdf_err: logger.error(f"PyPDF2 fallback also failed: {pdf_err}") # === END FALLBACK === return ParsedResult( success=False, error=str(e) ) async def ingest_document( self, session_id: str, doc_id: Optional[str] = None, doc_url: Optional[str] = None, file_name: Optional[str] = None, dao_id: str = None, user_id: str = None ) -> IngestResult: """ Ingest document chunks into RAG/Memory. Args: session_id: Session identifier doc_id: Document ID (if already parsed) doc_url: Document URL (if need to parse first) file_name: File name dao_id: DAO identifier user_id: User identifier Returns: IngestResult with ingestion status """ try: # If doc_id not provided, try to get from context if not doc_id: doc_context = await self.get_doc_context(session_id) if doc_context: doc_id = doc_context.doc_id doc_url = doc_url or doc_context.doc_url file_name = file_name or doc_context.file_name dao_id = dao_id or doc_context.dao_id if not doc_id and not doc_url: return IngestResult( success=False, error="No document ID or URL provided" ) # Build request to Router with ingest flag router_request = { "mode": "doc_parse", "agent": "parser", "metadata": { "source": self._extract_source(session_id), "dao_id": dao_id, "user_id": user_id, "session_id": session_id, }, "payload": { "output_mode": "chunks", # Use chunks for RAG ingestion "dao_id": dao_id, "user_id": user_id, "ingest": True, # Flag for ingestion }, } if doc_url: router_request["payload"]["doc_url"] = doc_url router_request["payload"]["file_name"] = file_name or "document.pdf" if doc_id: router_request["payload"]["doc_id"] = doc_id logger.info(f"Ingesting document: session={session_id}, doc_id={doc_id}") # Send to Router response = await send_to_router(router_request) if not isinstance(response, dict): return IngestResult( success=False, error="Invalid response from router" ) data = response.get("data", {}) chunks = data.get("chunks", []) if chunks: return IngestResult( success=True, doc_id=doc_id or data.get("doc_id"), ingested_chunks=len(chunks), status="ingested" ) else: return IngestResult( success=False, status="failed", error="No chunks to ingest" ) except Exception as e: logger.error(f"Document ingestion failed: {e}", exc_info=True) return IngestResult( success=False, error=str(e) ) async def ask_about_document( self, session_id: str, question: str, doc_id: Optional[str] = None, dao_id: Optional[str] = None, user_id: Optional[str] = None, agent_id: str = "daarwizz" ) -> QAResult: """ Ask a question about a document using RAG query. Args: session_id: Session identifier question: Question text doc_id: Document ID (if None, tries to get from context) dao_id: DAO identifier user_id: User identifier Returns: QAResult with answer and citations """ try: # If doc_id not provided, try to get from context doc_url = None file_name = None if not doc_id: doc_context = await self.get_doc_context(session_id) if doc_context: doc_id = doc_context.doc_id dao_id = dao_id or doc_context.dao_id doc_url = doc_context.doc_url file_name = doc_context.file_name else: doc_context = await self.get_doc_context(session_id) if doc_context: doc_url = doc_context.doc_url file_name = doc_context.file_name if not doc_id: return QAResult( success=False, error="No document context found. Parse a document first." ) # Extract user_id from session_id if not provided if not user_id: parts = session_id.split(":", 1) user_id = parts[1] if len(parts) > 1 else session_id # Shared deterministic Excel policy for top-level agrarian agents. if ( (agent_id or "").lower() in SHARED_EXCEL_POLICY_AGENTS and self._is_excel_filename(file_name) ): deterministic = await self._try_answer_excel_question( question=question, doc_url=doc_url, file_name=file_name, ) if deterministic: return QAResult( success=True, answer=deterministic, doc_id=doc_id, sources=[{ "type": "excel_deterministic", "file_name": file_name, }], ) # Build RAG query request router_request = { "mode": "rag_query", "agent": agent_id, "metadata": { "source": self._extract_source(session_id), "dao_id": dao_id, "user_id": user_id, "session_id": session_id, }, "payload": { "question": question, "dao_id": dao_id, "user_id": user_id, "doc_id": doc_id, }, } logger.info( f"RAG query: agent={agent_id}, session={session_id}, question={question[:50]}, doc_id={doc_id}" ) # Send to Router response = await send_to_router(router_request) if not isinstance(response, dict): return QAResult( success=False, error="Invalid response from router" ) data = response.get("data", {}) answer = data.get("answer") or data.get("text") sources = data.get("citations", []) or data.get("sources", []) if answer: return QAResult( success=True, answer=answer, doc_id=doc_id, sources=sources if sources else None ) else: return QAResult( success=False, error="No answer from RAG query" ) except Exception as e: logger.error(f"RAG query failed: {e}", exc_info=True) return QAResult( success=False, error=str(e) ) def _extract_source(self, session_id: str) -> str: """Extract source channel from session_id""" parts = session_id.split(":", 1) return parts[0] if len(parts) > 1 else "unknown" # Global instance doc_service = DocumentService() # Export functions for convenience async def parse_document( session_id: str, doc_url: str, file_name: str, dao_id: str, user_id: str, output_mode: str = "qa_pairs", metadata: Optional[Dict[str, Any]] = None ) -> ParsedResult: """Parse a document through DAGI Router""" return await doc_service.parse_document( session_id=session_id, doc_url=doc_url, file_name=file_name, dao_id=dao_id, user_id=user_id, output_mode=output_mode, metadata=metadata ) async def ingest_document( session_id: str, doc_id: Optional[str] = None, doc_url: Optional[str] = None, file_name: Optional[str] = None, dao_id: Optional[str] = None, user_id: Optional[str] = None ) -> IngestResult: """Ingest document chunks into RAG/Memory""" return await doc_service.ingest_document( session_id=session_id, doc_id=doc_id, doc_url=doc_url, file_name=file_name, dao_id=dao_id, user_id=user_id ) async def ask_about_document( session_id: str, question: str, doc_id: Optional[str] = None, dao_id: Optional[str] = None, user_id: Optional[str] = None, agent_id: str = "daarwizz" ) -> QAResult: """Ask a question about a document using RAG query""" return await doc_service.ask_about_document( session_id=session_id, question=question, doc_id=doc_id, dao_id=dao_id, user_id=user_id, agent_id=agent_id ) async def save_doc_context( session_id: str, doc_id: str, doc_url: Optional[str] = None, file_name: Optional[str] = None, dao_id: Optional[str] = None, user_id: Optional[str] = None, ) -> bool: """Save document context for a session""" return await doc_service.save_doc_context( session_id=session_id, doc_id=doc_id, doc_url=doc_url, file_name=file_name, dao_id=dao_id, user_id=user_id, ) async def get_doc_context(session_id: str) -> Optional[DocContext]: """Get document context for a session""" return await doc_service.get_doc_context(session_id)