diff --git a/gateway-bot/http_api.py b/gateway-bot/http_api.py index 8bb526d3..ca1c5fce 100644 --- a/gateway-bot/http_api.py +++ b/gateway-bot/http_api.py @@ -1871,23 +1871,53 @@ async def process_document( Dict з результатом обробки """ mime_type = document.get("mime_type", "") + mime_type_l = (mime_type or "").lower() file_name = document.get("file_name", "") file_id = document.get("file_id") file_name_lower = file_name.lower() - allowed_exts = {".pdf", ".docx", ".txt", ".md", ".csv", ".xlsx", ".zip"} + allowed_exts = { + ".pdf", ".doc", ".docx", ".rtf", ".odt", + ".txt", ".md", ".markdown", + ".csv", ".tsv", ".xls", ".xlsx", ".xlsm", ".ods", + ".ppt", ".pptx", ".odp", + ".json", ".yaml", ".yml", ".xml", ".html", ".htm", + ".zip", + ".jpg", ".jpeg", ".png", ".webp", ".gif", ".bmp", ".tiff", + } is_allowed = any(file_name_lower.endswith(ext) for ext in allowed_exts) - if mime_type == "application/pdf": + if mime_type_l == "application/pdf": is_allowed = True - if mime_type in { + if mime_type_l in { + "application/msword", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/rtf", + "text/rtf", + "application/vnd.oasis.opendocument.text", + "application/vnd.ms-excel", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/vnd.ms-excel.sheet.macroenabled.12", + "application/vnd.oasis.opendocument.spreadsheet", + "application/vnd.ms-powerpoint", + "application/vnd.openxmlformats-officedocument.presentationml.presentation", + "application/vnd.oasis.opendocument.presentation", "text/plain", "text/markdown", "text/csv", + "text/tab-separated-values", + "application/json", + "application/yaml", + "application/x-yaml", + "text/yaml", + "application/xml", + "text/xml", + "text/html", "application/zip", + "application/x-zip-compressed", }: is_allowed = True + if mime_type_l.startswith("image/"): + is_allowed = True if is_allowed and file_id: logger.info(f"{agent_config.name}: Document from {username} (tg:{user_id}), file_id: {file_id}, file_name: {file_name}") @@ -2027,7 +2057,7 @@ async def process_document( telegram_token = agent_config.get_telegram_token() await send_telegram_message( chat_id, - "Наразі підтримуються формати: PDF, DOCX, TXT, MD, CSV, XLSX, ZIP.", + "Підтримуються формати: PDF/DOC/DOCX/RTF/ODT, TXT/MD/CSV/TSV, XLS/XLSX/XLSM/ODS, PPT/PPTX/ODP, JSON/YAML/XML/HTML, ZIP, зображення.", telegram_token, ) return {"ok": False, "error": "Unsupported document type"} @@ -3681,7 +3711,8 @@ async def _old_telegram_webhook(update: TelegramUpdate): doc_url=file_url, file_name=file_name, dao_id=dao_id, - user_id=f"tg:{user_id}" + user_id=f"tg:{user_id}", + agent_id=agent_config.agent_id, ) if result.success: @@ -3705,7 +3736,8 @@ async def _old_telegram_webhook(update: TelegramUpdate): result = await ingest_document( session_id=session_id, dao_id=dao_id, - user_id=f"tg:{user_id}" + user_id=f"tg:{user_id}", + agent_id=agent_config.agent_id, ) if result.success: diff --git a/gateway-bot/services/doc_service.py b/gateway-bot/services/doc_service.py index da5a6843..ed936d69 100644 --- a/gateway-bot/services/doc_service.py +++ b/gateway-bot/services/doc_service.py @@ -17,12 +17,12 @@ from typing import Optional, Dict, Any, List from pydantic import BaseModel from datetime import datetime -from router_client import send_to_router from memory_client import memory_client logger = logging.getLogger(__name__) SHARED_EXCEL_POLICY_AGENTS = {"agromatrix", "helion", "nutra", "greenfood"} +ROUTER_URL = os.getenv("ROUTER_URL", "http://router:8000") class QAItem(BaseModel): @@ -84,6 +84,28 @@ class DocumentService: """Initialize document service""" self.memory_client = memory_client + async def _router_post_json( + self, + path: str, + payload: Dict[str, Any], + timeout: float = 45.0, + ) -> Dict[str, Any]: + import httpx + + base = ROUTER_URL.rstrip("/") + url = f"{base}{path}" + async with httpx.AsyncClient(timeout=timeout) as client: + resp = await client.post(url, json=payload) + body = {} + try: + body = resp.json() + except Exception: + body = {"ok": False, "error": f"Invalid JSON from router ({resp.status_code})"} + if resp.status_code >= 400: + err = body.get("detail") or body.get("error") or f"HTTP {resp.status_code}" + raise RuntimeError(f"Router error on {path}: {err}") + return body if isinstance(body, dict) else {"ok": False, "error": "Invalid router response type"} + def _is_excel_filename(self, file_name: Optional[str]) -> bool: if not file_name: return False @@ -462,7 +484,8 @@ class DocumentService: doc_url: Optional[str] = None, file_name: Optional[str] = None, dao_id: str = None, - user_id: str = None + user_id: str = None, + agent_id: str = "daarwizz", ) -> IngestResult: """ Ingest document chunks into RAG/Memory. @@ -488,64 +511,60 @@ class DocumentService: file_name = file_name or doc_context.file_name dao_id = dao_id or doc_context.dao_id - if not doc_id and not doc_url: + if not doc_url: return IngestResult( success=False, - error="No document ID or URL provided" + error="No document URL available for ingest" ) - - # Build request to Router with ingest flag - router_request = { - "mode": "doc_parse", - "agent": "parser", + + parsed = await self.parse_document( + session_id=session_id, + doc_url=doc_url, + file_name=file_name or "document", + dao_id=dao_id or "", + user_id=user_id or "", + output_mode="markdown", + metadata={"source": self._extract_source(session_id), "mode": "ingest"}, + ) + if not parsed.success: + return IngestResult(success=False, error=parsed.error or "Document parse failed") + + effective_doc_id = doc_id or parsed.doc_id + if not effective_doc_id: + effective_doc_id = hashlib.md5(f"{session_id}:{file_name}:{datetime.utcnow().isoformat()}".encode()).hexdigest()[:12] + + doc_text = (parsed.markdown or "").strip() + if not doc_text: + return IngestResult(success=False, error="No extractable text for ingestion") + + payload = { + "agent_id": (agent_id or "daarwizz").lower(), + "doc_id": effective_doc_id, + "file_name": file_name or "document", + "text": doc_text, + "dao_id": dao_id, + "user_id": user_id, "metadata": { - "source": self._extract_source(session_id), - "dao_id": dao_id, - "user_id": user_id, "session_id": session_id, - }, - "payload": { - "output_mode": "chunks", # Use chunks for RAG ingestion - "dao_id": dao_id, - "user_id": user_id, - "ingest": True, # Flag for ingestion + "source": self._extract_source(session_id), }, } - - if doc_url: - router_request["payload"]["doc_url"] = doc_url - router_request["payload"]["file_name"] = file_name or "document.pdf" - - if doc_id: - router_request["payload"]["doc_id"] = doc_id - - logger.info(f"Ingesting document: session={session_id}, doc_id={doc_id}") - - # Send to Router - response = await send_to_router(router_request) - - if not isinstance(response, dict): - return IngestResult( - success=False, - error="Invalid response from router" - ) - - data = response.get("data", {}) - chunks = data.get("chunks", []) - - if chunks: + response = await self._router_post_json("/v1/documents/ingest", payload, timeout=90.0) + + if response.get("ok"): return IngestResult( success=True, - doc_id=doc_id or data.get("doc_id"), - ingested_chunks=len(chunks), - status="ingested" - ) - else: - return IngestResult( - success=False, - status="failed", - error="No chunks to ingest" + doc_id=response.get("doc_id") or effective_doc_id, + ingested_chunks=int(response.get("chunks_stored", 0) or 0), + status="ingested", ) + + return IngestResult( + success=False, + doc_id=effective_doc_id, + status="failed", + error=response.get("error", "Router ingest failed"), + ) except Exception as e: logger.error(f"Document ingestion failed: {e}", exc_info=True) @@ -625,38 +644,30 @@ class DocumentService: }], ) - # Build RAG query request - router_request = { - "mode": "rag_query", - "agent": agent_id, - "metadata": { - "source": self._extract_source(session_id), - "dao_id": dao_id, - "user_id": user_id, - "session_id": session_id, - }, - "payload": { - "question": question, - "dao_id": dao_id, - "user_id": user_id, - "doc_id": doc_id, - }, - } - logger.info( f"RAG query: agent={agent_id}, session={session_id}, question={question[:50]}, doc_id={doc_id}" ) - - # Send to Router - response = await send_to_router(router_request) - - if not isinstance(response, dict): + + response = await self._router_post_json( + "/v1/documents/query", + { + "agent_id": (agent_id or "daarwizz").lower(), + "question": question, + "doc_id": doc_id, + "dao_id": dao_id, + "user_id": user_id, + "limit": 5, + }, + timeout=60.0, + ) + + if isinstance(response, dict) and not response.get("ok", False): return QAResult( success=False, - error="Invalid response from router" + error=response.get("error", "Document query failed"), ) - - data = response.get("data", {}) + + data = response.get("data", {}) if isinstance(response, dict) else {} answer = data.get("answer") or data.get("text") sources = data.get("citations", []) or data.get("sources", []) @@ -717,7 +728,8 @@ async def ingest_document( doc_url: Optional[str] = None, file_name: Optional[str] = None, dao_id: Optional[str] = None, - user_id: Optional[str] = None + user_id: Optional[str] = None, + agent_id: str = "daarwizz", ) -> IngestResult: """Ingest document chunks into RAG/Memory""" return await doc_service.ingest_document( @@ -726,7 +738,8 @@ async def ingest_document( doc_url=doc_url, file_name=file_name, dao_id=dao_id, - user_id=user_id + user_id=user_id, + agent_id=agent_id, ) diff --git a/services/router/main.py b/services/router/main.py index 3e16ad9b..8b8cb517 100644 --- a/services/router/main.py +++ b/services/router/main.py @@ -1235,6 +1235,27 @@ class InferResponse(BaseModel): file_mime: Optional[str] = None +class DocumentIngestRequest(BaseModel): + """Ingest document text into agent-specific docs collection.""" + agent_id: str + doc_id: str + file_name: Optional[str] = None + text: str + dao_id: Optional[str] = None + user_id: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None + + +class DocumentQueryRequest(BaseModel): + """Query document context from agent-specific docs collection.""" + agent_id: str + question: str + doc_id: Optional[str] = None + dao_id: Optional[str] = None + user_id: Optional[str] = None + limit: int = 5 + + class SharedMemoryReviewRequest(BaseModel): point_id: str approve: bool @@ -2867,6 +2888,149 @@ async def agent_infer(agent_id: str, request: InferRequest): ) +@app.post("/v1/documents/ingest") +async def documents_ingest(request: DocumentIngestRequest): + """ + Ingest raw document text into Qdrant {agent_id}_docs. + """ + if not MEMORY_RETRIEVAL_AVAILABLE or not memory_retrieval: + raise HTTPException(status_code=503, detail="Memory retrieval not available") + + agent_id = (request.agent_id or "").strip().lower() + if not agent_id: + raise HTTPException(status_code=400, detail="agent_id is required") + + text = (request.text or "").strip() + if not text: + raise HTTPException(status_code=400, detail="text is required") + + doc_id = (request.doc_id or "").strip() + if not doc_id: + # Fallback should be deterministic for same text + file + seed = f"{agent_id}:{request.file_name or ''}:{text[:400]}" + doc_id = hashlib.md5(seed.encode("utf-8")).hexdigest()[:16] + + result = await memory_retrieval.ingest_document_chunks( + agent_id=agent_id, + doc_id=doc_id, + file_name=request.file_name, + text=text, + dao_id=request.dao_id, + user_id=request.user_id, + metadata=request.metadata, + ) + if not result.get("ok"): + return { + "ok": False, + "error": result.get("error", "ingest_failed"), + "doc_id": doc_id, + "collection": result.get("collection"), + } + return result + + +@app.post("/v1/documents/query") +async def documents_query(request: DocumentQueryRequest): + """ + Query ingested document chunks and synthesize source-locked answer. + """ + if not MEMORY_RETRIEVAL_AVAILABLE or not memory_retrieval: + raise HTTPException(status_code=503, detail="Memory retrieval not available") + + agent_id = (request.agent_id or "").strip().lower() + if not agent_id: + raise HTTPException(status_code=400, detail="agent_id is required") + + question = (request.question or "").strip() + if not question: + raise HTTPException(status_code=400, detail="question is required") + + lookup = await memory_retrieval.query_document_chunks( + agent_id=agent_id, + question=question, + doc_id=request.doc_id, + dao_id=request.dao_id, + limit=request.limit, + ) + chunks = lookup.get("chunks") or [] + if not chunks: + return { + "ok": False, + "error": lookup.get("error", "no_relevant_chunks"), + "data": { + "answer": None, + "citations": [], + "doc_id": request.doc_id, + }, + } + + citations: List[Dict[str, Any]] = [] + context_blocks: List[str] = [] + for i, ch in enumerate(chunks, start=1): + c_doc_id = ch.get("doc_id") or request.doc_id + c_file = ch.get("file_name") + c_idx = ch.get("chunk_index") + c_score = float(ch.get("score", 0.0) or 0.0) + citations.append( + { + "doc_id": c_doc_id, + "file_name": c_file, + "chunk_index": c_idx, + "score": round(c_score, 4), + } + ) + src = [] + if c_file: + src.append(f"file={c_file}") + if c_idx is not None: + src.append(f"chunk={int(c_idx) + 1}") + src_label = ", ".join(src) if src else "chunk" + context_blocks.append(f"[{i}] ({src_label}) {str(ch.get('text') or '').strip()[:1400]}") + + answer_text = "" + try: + llm_req = InternalLLMRequest( + prompt=( + "Питання користувача:\n" + f"{question}\n\n" + "Контекст із документа (дозволено використовувати ТІЛЬКИ його):\n" + + "\n\n".join(context_blocks) + + "\n\n" + "Правила відповіді:\n" + "1) Відповідай лише на основі наведеного контексту.\n" + "2) Якщо даних недостатньо, прямо скажи: 'Недостатньо даних у документі'.\n" + "3) В кінці додай коротке посилання на джерело у форматі [source: N].\n" + ), + llm_profile="reasoning", + max_tokens=320, + temperature=0.1, + role_context="Document QA source-locked", + metadata={"agent_id": agent_id, "mode": "documents_query"}, + ) + llm_resp = await internal_llm_complete(llm_req) + answer_text = (llm_resp.text or "").strip() + except Exception as e: + logger.warning(f"documents_query LLM synthesis failed: {e}") + + if not answer_text: + top = chunks[0] + answer_text = ( + "Знайшов релевантний фрагмент у документі, але не вдалося сформувати підсумок. " + f"Ось ключовий уривок:\n{str(top.get('text') or '').strip()[:1200]}" + ) + + return { + "ok": True, + "data": { + "answer": answer_text, + "citations": citations, + "doc_id": request.doc_id or chunks[0].get("doc_id"), + "chunks_used": len(chunks), + "collection": lookup.get("collection"), + }, + } + + @app.get("/v1/models") async def list_available_models(): """List all available models across backends""" diff --git a/services/router/memory_retrieval.py b/services/router/memory_retrieval.py index d848fa04..aeb96b07 100644 --- a/services/router/memory_retrieval.py +++ b/services/router/memory_retrieval.py @@ -1237,6 +1237,234 @@ class MemoryRetrieval: logger.warning(f"review_shared_pending_case failed: {e}") return {"ok": False, "error": str(e)} + def _chunk_document_text( + self, + text: str, + chunk_chars: int = 1200, + overlap_chars: int = 180, + ) -> List[str]: + """ + Split document text into overlap-aware chunks for RAG indexing. + Keeps paragraph structure when possible. + """ + raw = re.sub(r"\r\n?", "\n", text or "").strip() + if not raw: + return [] + + paragraphs = [p.strip() for p in re.split(r"\n{2,}", raw) if p and p.strip()] + if not paragraphs: + return [] + + chunks: List[str] = [] + current = "" + max_hard = max(chunk_chars, 600) + + def _push_current() -> None: + nonlocal current + if current and len(current.strip()) >= 20: + chunks.append(current.strip()) + current = "" + + for para in paragraphs: + if len(para) > max_hard * 2: + _push_current() + i = 0 + step = max_hard - max(80, min(overlap_chars, max_hard // 2)) + while i < len(para): + part = para[i : i + max_hard] + if len(part.strip()) >= 20: + chunks.append(part.strip()) + i += max(1, step) + continue + + candidate = f"{current}\n\n{para}".strip() if current else para + if len(candidate) <= max_hard: + current = candidate + continue + + _push_current() + if overlap_chars > 0 and chunks: + tail = chunks[-1][-overlap_chars:] + current = f"{tail}\n\n{para}".strip() + if len(current) > max_hard: + _push_current() + current = para + else: + current = para + + _push_current() + return chunks + + async def ingest_document_chunks( + self, + agent_id: str, + doc_id: str, + file_name: Optional[str], + text: str, + dao_id: Optional[str] = None, + user_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """ + Ingest normalized document chunks into {agent_id}_docs collection. + """ + if not self.qdrant_client: + return {"ok": False, "error": "qdrant_unavailable"} + if not COHERE_API_KEY: + return {"ok": False, "error": "cohere_unavailable"} + + body = (text or "").strip() + if not body: + return {"ok": False, "error": "empty_document"} + + chunks = self._chunk_document_text(body) + if not chunks: + return {"ok": False, "error": "no_chunks"} + + collection = f"{(agent_id or 'daarwizz').lower()}_docs" + stored_points = [] + + try: + from qdrant_client.http import models as qmodels + import uuid + + try: + self.qdrant_client.get_collection(collection) + except Exception: + self.qdrant_client.create_collection( + collection_name=collection, + vectors_config=qmodels.VectorParams( + size=1024, + distance=qmodels.Distance.COSINE, + ), + ) + logger.info(f"✅ Created collection: {collection}") + + total = len(chunks) + for idx, chunk in enumerate(chunks): + emb = await self.get_embedding(chunk[:2000]) + if not emb: + continue + payload: Dict[str, Any] = { + "text": chunk[:6000], + "doc_id": doc_id, + "file_name": file_name, + "agent_id": (agent_id or "").lower(), + "dao_id": dao_id, + "user_id": user_id, + "chunk_index": idx, + "chunks_total": total, + "type": "document_chunk", + "timestamp": datetime.utcnow().isoformat(), + } + if isinstance(metadata, dict) and metadata: + payload["metadata"] = metadata + stored_points.append( + qmodels.PointStruct( + id=str(uuid.uuid4()), + vector=emb, + payload=payload, + ) + ) + + if not stored_points: + return {"ok": False, "error": "embedding_failed"} + + self.qdrant_client.upsert(collection_name=collection, points=stored_points) + return { + "ok": True, + "doc_id": doc_id, + "chunks_total": len(chunks), + "chunks_stored": len(stored_points), + "collection": collection, + } + except Exception as e: + logger.warning(f"ingest_document_chunks failed for {collection}: {e}") + return {"ok": False, "error": str(e)} + + async def query_document_chunks( + self, + agent_id: str, + question: str, + doc_id: Optional[str] = None, + dao_id: Optional[str] = None, + limit: int = 5, + ) -> Dict[str, Any]: + """ + Retrieve top document chunks from {agent_id}_docs for a question. + """ + if not self.qdrant_client: + return {"ok": False, "error": "qdrant_unavailable", "chunks": []} + if not COHERE_API_KEY: + return {"ok": False, "error": "cohere_unavailable", "chunks": []} + + q = (question or "").strip() + if not q: + return {"ok": False, "error": "empty_question", "chunks": []} + + embedding = await self.get_embedding(q[:2000]) + if not embedding: + return {"ok": False, "error": "embedding_failed", "chunks": []} + + collection = f"{(agent_id or 'daarwizz').lower()}_docs" + + try: + from qdrant_client.http import models as qmodels + must_conditions = [] + if doc_id: + must_conditions.append( + qmodels.FieldCondition( + key="doc_id", + match=qmodels.MatchValue(value=doc_id), + ) + ) + if dao_id: + must_conditions.append( + qmodels.FieldCondition( + key="dao_id", + match=qmodels.MatchValue(value=dao_id), + ) + ) + query_filter = qmodels.Filter(must=must_conditions) if must_conditions else None + + rows = self.qdrant_client.search( + collection_name=collection, + query_vector=embedding, + query_filter=query_filter, + limit=max(1, min(int(limit or 5), 12)), + with_payload=True, + ) + except Exception as e: + logger.debug(f"query_document_chunks search failed for {collection}: {e}") + return {"ok": False, "error": "search_failed", "chunks": [], "collection": collection} + + hits: List[Dict[str, Any]] = [] + for row in rows or []: + score = float(getattr(row, "score", 0.0) or 0.0) + if score < 0.30: + continue + payload = getattr(row, "payload", {}) or {} + text = str(payload.get("text") or "").strip() + if len(text) < 10: + continue + hits.append( + { + "text": text, + "score": score, + "doc_id": payload.get("doc_id"), + "file_name": payload.get("file_name"), + "chunk_index": payload.get("chunk_index"), + "chunks_total": payload.get("chunks_total"), + } + ) + + return { + "ok": bool(hits), + "chunks": hits, + "collection": collection, + "doc_id": doc_id, + } + async def store_interaction( self, channel: str, diff --git a/services/swapper-service/app/main.py b/services/swapper-service/app/main.py index bd2a4274..07591750 100644 --- a/services/swapper-service/app/main.py +++ b/services/swapper-service/app/main.py @@ -11,10 +11,13 @@ import os import asyncio import logging import base64 +import json +import re from typing import Optional, Dict, List, Any, Union from datetime import datetime, timedelta from enum import Enum from io import BytesIO +import xml.etree.ElementTree as ET from fastapi import FastAPI, HTTPException, BackgroundTasks, File, UploadFile, Form from fastapi.middleware.cors import CORSMiddleware @@ -56,16 +59,34 @@ def _csv_to_markdown(content: bytes) -> str: text = _decode_text_bytes(content) reader = csv.reader(text.splitlines()) rows = list(reader) + return _rows_to_markdown(rows) + + +def _tsv_to_markdown(content: bytes) -> str: + text = _decode_text_bytes(content) + reader = csv.reader(text.splitlines(), delimiter="\t") + rows = list(reader) + return _rows_to_markdown(rows) + + +def _rows_to_markdown(rows: List[List[Any]]) -> str: if not rows: return "" - header = rows[0] - body = rows[1:] + width = max(len(r) for r in rows) + norm_rows = [] + for r in rows: + rr = [str(c) if c is not None else "" for c in r] + if len(rr) < width: + rr.extend([""] * (width - len(rr))) + norm_rows.append(rr) + header = norm_rows[0] + body = norm_rows[1:] lines = [ "| " + " | ".join(header) + " |", "| " + " | ".join(["---"] * len(header)) + " |", ] for row in body: - lines.append("| " + " | ".join(row) + " |") + lines.append("| " + " | ".join([str(c) if c is not None else "" for c in row]) + " |") return "\n".join(lines) @@ -91,6 +112,69 @@ def _xlsx_to_markdown(content: bytes) -> str: return "\n".join(parts) +def _xls_to_markdown(content: bytes) -> str: + try: + import xlrd + except Exception as e: + raise HTTPException(status_code=500, detail=f"xlrd not available: {e}") + wb = xlrd.open_workbook(file_contents=content) + parts = [] + for s in wb.sheets(): + parts.append(f"## Sheet: {s.name}") + rows = [] + for r in range(s.nrows): + rows.append([s.cell_value(r, c) for c in range(s.ncols)]) + if not rows: + parts.append("_Empty sheet_") + continue + parts.append(_rows_to_markdown(rows)) + return "\n\n".join(parts) + + +def _ods_to_markdown(content: bytes) -> str: + try: + from odf.opendocument import load + from odf.table import Table, TableRow, TableCell + from odf.text import P + except Exception as e: + raise HTTPException(status_code=500, detail=f"odfpy not available: {e}") + + try: + doc = load(BytesIO(content)) + except Exception as e: + raise HTTPException(status_code=400, detail=f"Invalid ODS file: {e}") + + parts = [] + for table in doc.spreadsheet.getElementsByType(Table): + table_name = str(table.getAttribute("name") or "Sheet") + parts.append(f"## Sheet: {table_name}") + rows: List[List[str]] = [] + for row in table.getElementsByType(TableRow): + cells_out: List[str] = [] + for cell in row.getElementsByType(TableCell): + txt_parts = [] + for p in cell.getElementsByType(P): + txt_parts.extend( + [str(getattr(node, "data", "")).strip() for node in p.childNodes if getattr(node, "data", None)] + ) + cell_text = " ".join([t for t in txt_parts if t]).strip() + repeat_raw = cell.getAttribute("numbercolumnsrepeated") + try: + repeat = int(repeat_raw) if repeat_raw else 1 + except Exception: + repeat = 1 + repeat = max(1, min(repeat, 100)) + for _ in range(repeat): + cells_out.append(cell_text) + if cells_out: + rows.append(cells_out) + if not rows: + parts.append("_Empty sheet_") + continue + parts.append(_rows_to_markdown(rows)) + return "\n\n".join(parts) + + def _docx_to_text(content: bytes) -> str: try: from docx import Document @@ -115,18 +199,111 @@ def _pdf_to_text(content: bytes) -> str: return "\n\n".join(text_content) +def _pptx_to_text(content: bytes) -> str: + try: + from pptx import Presentation + except Exception as e: + raise HTTPException(status_code=500, detail=f"python-pptx not available: {e}") + prs = Presentation(BytesIO(content)) + parts = [] + for idx, slide in enumerate(prs.slides, start=1): + parts.append(f"## Slide {idx}") + slide_lines = [] + for shape in slide.shapes: + text = getattr(shape, "text", None) + if text and str(text).strip(): + slide_lines.append(str(text).strip()) + parts.extend(slide_lines if slide_lines else ["_No text on this slide_"]) + return "\n\n".join(parts) + + +def _json_to_text(content: bytes) -> str: + raw = _decode_text_bytes(content) + try: + parsed = json.loads(raw) + return json.dumps(parsed, ensure_ascii=False, indent=2) + except Exception: + return raw + + +def _yaml_to_text(content: bytes) -> str: + raw = _decode_text_bytes(content) + try: + parsed = yaml.safe_load(raw) + return yaml.safe_dump(parsed, allow_unicode=True, sort_keys=False) + except Exception: + return raw + + +def _xml_to_text(content: bytes) -> str: + raw = _decode_text_bytes(content) + try: + root = ET.fromstring(raw) + text = " ".join([t.strip() for t in root.itertext() if t and t.strip()]) + return text or raw + except Exception: + return raw + + +def _html_to_text(content: bytes) -> str: + raw = _decode_text_bytes(content) + try: + from bs4 import BeautifulSoup + + soup = BeautifulSoup(raw, "html.parser") + text = soup.get_text(separator="\n") + text = re.sub(r"\n{3,}", "\n\n", text) + return text.strip() or raw + except Exception: + # Minimal fallback if bs4 is unavailable + text = re.sub(r"<[^>]+>", " ", raw) + text = re.sub(r"\s+", " ", text) + return text.strip() + + +def _rtf_to_text(content: bytes) -> str: + raw = _decode_text_bytes(content) + try: + from striprtf.striprtf import rtf_to_text + return rtf_to_text(raw) + except Exception: + # Basic fallback: strip common RTF control tokens + text = re.sub(r"\\'[0-9a-fA-F]{2}", " ", raw) + text = re.sub(r"\\[a-zA-Z]+-?\d* ?", " ", text) + text = text.replace("{", " ").replace("}", " ") + return re.sub(r"\s+", " ", text).strip() + + def _extract_text_by_ext(filename: str, content: bytes) -> str: ext = filename.split(".")[-1].lower() if "." in filename else "" - if ext in ["txt", "md"]: + if ext in ["txt", "md", "markdown"]: return _decode_text_bytes(content) if ext == "csv": return _csv_to_markdown(content) - if ext == "xlsx": + if ext == "tsv": + return _tsv_to_markdown(content) + if ext in {"xlsx", "xlsm"}: return _xlsx_to_markdown(content) + if ext == "xls": + return _xls_to_markdown(content) + if ext == "ods": + return _ods_to_markdown(content) if ext == "docx": return _docx_to_text(content) if ext == "pdf": return _pdf_to_text(content) + if ext == "pptx": + return _pptx_to_text(content) + if ext == "json": + return _json_to_text(content) + if ext in {"yaml", "yml"}: + return _yaml_to_text(content) + if ext == "xml": + return _xml_to_text(content) + if ext in {"html", "htm"}: + return _html_to_text(content) + if ext == "rtf": + return _rtf_to_text(content) raise HTTPException(status_code=400, detail=f"Unsupported file type: .{ext}") @@ -139,7 +316,12 @@ def _zip_to_markdown(content: bytes, max_files: int = 50, max_total_mb: int = 10 if total_size > max_total_mb * 1024 * 1024: raise HTTPException(status_code=400, detail=f"ZIP слишком большой: {total_size / 1024 / 1024:.1f} MB") parts = [] - allowed_exts = {"txt", "md", "csv", "xlsx", "docx", "pdf"} + allowed_exts = { + "txt", "md", "markdown", "csv", "tsv", + "xls", "xlsx", "xlsm", "ods", + "docx", "pdf", "pptx", + "json", "yaml", "yml", "xml", "html", "htm", "rtf", + } processed = [] skipped = [] for member in members: @@ -1655,7 +1837,8 @@ async def document_endpoint( - json: Structured JSON with document elements - text: Plain text extraction - Supported files: PDF, DOCX, PPTX, images (PNG, JPG) + Supported files: + PDF, DOCX, XLS/XLSX/XLSM/ODS, PPTX, TXT/MD/CSV/TSV, JSON/YAML/XML/HTML, RTF, ZIP, images. """ try: import time @@ -1672,15 +1855,28 @@ async def document_endpoint( filename = file.filename if file else "document" file_ext = filename.split(".")[-1].lower() if "." in filename else "pdf" - # Handle text-based formats without Docling - if file_ext in ["txt", "md", "csv", "xlsx", "zip"]: + # Handle deterministic extraction for standard office/text formats + if file_ext in [ + "txt", "md", "markdown", "csv", "tsv", + "xlsx", "xls", "xlsm", "ods", + "json", "yaml", "yml", "xml", "html", "htm", "rtf", + "pptx", "zip", + ]: try: if file_ext == "zip": content = _zip_to_markdown(doc_data) output_format = "markdown" else: content = _extract_text_by_ext(filename, doc_data) - output_format = "markdown" if file_ext in ["md", "csv", "xlsx"] else "text" + output_format = ( + "markdown" + if file_ext in { + "md", "markdown", "csv", "tsv", + "xlsx", "xls", "xlsm", "ods", + "json", "yaml", "yml", "xml", "html", "htm", "pptx", + } + else "text" + ) processing_time_ms = (time.time() - start_time) * 1000 return { "success": True, @@ -1764,22 +1960,27 @@ async def document_endpoint( "device": swapper.device } - # For DOCX, try python-docx - if file_ext == "docx": + # For common office/text formats, try deterministic extractors. + if file_ext in { + "docx", "txt", "md", "markdown", "csv", "tsv", + "xlsx", "xls", "xlsm", "ods", + "pptx", "json", "yaml", "yml", "xml", "html", "htm", "rtf", + }: try: - content = _docx_to_text(doc_data) + content = _extract_text_by_ext(filename, doc_data) + out_fmt = "markdown" if file_ext not in {"txt", "rtf"} else "text" return { "success": True, - "model": "python-docx (fallback)", - "output_format": "text", + "model": "text-extract (fallback)", + "output_format": out_fmt, "result": content, "filename": filename, "processing_time_ms": (time.time() - start_time) * 1000, "device": swapper.device } except Exception as e: - logger.error(f"DOCX fallback failed: {e}") - raise HTTPException(status_code=500, detail="DOCX extraction failed") + logger.error(f"Text fallback failed for .{file_ext}: {e}") + raise HTTPException(status_code=500, detail=f"Extraction failed for .{file_ext}") # For PDFs, try pdfplumber if file_ext == "pdf": @@ -1807,7 +2008,7 @@ async def document_endpoint( # For other documents, return error raise HTTPException( status_code=503, - detail="Document processing not available. Supported: PDF (with pdfplumber), images (with OCR)" + detail="Document processing unavailable for this type. Supported: office/text/image/zip standard formats." ) finally: @@ -2312,4 +2513,3 @@ async def get_multimodal_stack(): if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8890) - diff --git a/services/swapper-service/app/requirements.txt b/services/swapper-service/app/requirements.txt index 6f40062a..75e38998 100644 --- a/services/swapper-service/app/requirements.txt +++ b/services/swapper-service/app/requirements.txt @@ -4,6 +4,15 @@ httpx==0.25.2 pydantic==2.5.0 pyyaml==6.0.1 python-multipart==0.0.6 +chardet>=5.2.0 +openpyxl>=3.1.2 +python-docx>=1.1.2 +pdfplumber>=0.11.0 +python-pptx>=0.6.23 +xlrd>=2.0.1 +odfpy>=1.4.1 +beautifulsoup4>=4.12.0 +striprtf>=0.0.26 # HuggingFace dependencies for OCR models torch>=2.0.0 @@ -25,4 +34,4 @@ safetensors>=0.4.0 # Web Scraping & Search trafilatura>=1.6.0 -duckduckgo-search>=4.0.0 \ No newline at end of file +duckduckgo-search>=4.0.0 diff --git a/services/swapper-service/requirements.txt b/services/swapper-service/requirements.txt index e15ea696..22acb316 100644 --- a/services/swapper-service/requirements.txt +++ b/services/swapper-service/requirements.txt @@ -43,3 +43,8 @@ pdfplumber>=0.10.0 python-docx>=1.1.0 openpyxl>=3.1.2 chardet>=5.2.0 +python-pptx>=0.6.23 +xlrd>=2.0.1 +odfpy>=1.4.1 +beautifulsoup4>=4.12.0 +striprtf>=0.0.26