""" Tool Manager for Helion Agent Implements OpenAI-compatible function calling for DeepSeek, Mistral, Grok """ import os import asyncio import uuid from agent_tools_config import get_agent_tools, is_tool_allowed import json import logging import hashlib import base64 import csv import tempfile import subprocess import httpx from typing import Dict, List, Any, Optional from dataclasses import dataclass from io import BytesIO, StringIO from pathlib import PurePath import xml.etree.ElementTree as ET from xml.sax.saxutils import escape as xml_escape from zipfile import ZIP_DEFLATED, ZipFile logger = logging.getLogger(__name__) # Tool definitions in OpenAI function calling format # ORDER MATTERS: Memory/Graph tools first, then web search as fallback TOOL_DEFINITIONS = [ # PRIORITY 1: Internal knowledge sources (use FIRST) { "type": "function", "function": { "name": "memory_search", "description": "🔍 ПЕРШИЙ КРОК для пошуку! Шукає в моїй пам'яті: збережені факти, документи, розмови. ЗАВЖДИ використовуй спочатку перед web_search!", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Що шукати в пам'яті" } }, "required": ["query"] } } }, { "type": "function", "function": { "name": "graph_query", "description": "🔍 Пошук в Knowledge Graph - зв'язки між проєктами, людьми, темами Energy Union. Використовуй для питань про проєкти, партнерів, технології.", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Що шукати (назва проєкту, людини, теми)" }, "entity_type": { "type": "string", "enum": ["User", "Topic", "Project", "Fact"], "description": "Тип сутності для пошуку" } }, "required": ["query"] } } }, # PRIORITY 2: Web search (use ONLY if memory/graph don't have info) { "type": "function", "function": { "name": "web_search", "description": "🌐 Пошук в інтернеті. 
Використовуй ТІЛЬКИ якщо memory_search і graph_query не знайшли потрібної інформації!", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Пошуковий запит" }, "max_results": { "type": "integer", "description": "Максимальна кількість результатів (1-10)", "default": 5 } }, "required": ["query"] } } }, { "type": "function", "function": { "name": "web_extract", "description": "Витягнути текстовий контент з веб-сторінки за URL", "parameters": { "type": "object", "properties": { "url": { "type": "string", "description": "URL сторінки для читання" } }, "required": ["url"] } } }, # PRIORITY 3: Generation tools { "type": "function", "function": { "name": "image_generate", "description": "🎨 Згенерувати зображення за текстовим описом (FLUX)", "parameters": { "type": "object", "properties": { "prompt": { "type": "string", "description": "Опис зображення для генерації (англійською краще)" }, "width": { "type": "integer", "description": "Ширина зображення", "default": 512 }, "height": { "type": "integer", "description": "Висота зображення", "default": 512 } }, "required": ["prompt"] } } }, { "type": "function", "function": { "name": "comfy_generate_image", "description": "🖼️ Згенерувати зображення через ComfyUI (NODE3, Stable Diffusion). 
Для високої якості та детальних зображень.", "parameters": { "type": "object", "properties": { "prompt": { "type": "string", "description": "Детальний опис зображення (англійською)" }, "negative_prompt": { "type": "string", "description": "Що НЕ включати в зображення", "default": "blurry, low quality, watermark" }, "width": { "type": "integer", "description": "Ширина (512, 768, 1024)", "default": 512 }, "height": { "type": "integer", "description": "Висота (512, 768, 1024)", "default": 512 }, "steps": { "type": "integer", "description": "Кількість кроків генерації (20-50)", "default": 28 } }, "required": ["prompt"] } } }, { "type": "function", "function": { "name": "comfy_generate_video", "description": "🎬 Згенерувати відео через ComfyUI (NODE3, LTX-2). Text-to-video для коротких кліпів.", "parameters": { "type": "object", "properties": { "prompt": { "type": "string", "description": "Детальний опис відео (англійською)" }, "seconds": { "type": "integer", "description": "Тривалість в секундах (2-8)", "default": 4 }, "fps": { "type": "integer", "description": "Кадри в секунду (24-30)", "default": 24 }, "steps": { "type": "integer", "description": "Кількість кроків генерації (20-40)", "default": 30 } }, "required": ["prompt"] } } }, { "type": "function", "function": { "name": "remember_fact", "description": "Запам'ятати важливий факт про користувача або тему", "parameters": { "type": "object", "properties": { "fact": { "type": "string", "description": "Факт для запам'ятовування" }, "about": { "type": "string", "description": "Про кого/що цей факт (username або тема)" }, "category": { "type": "string", "enum": ["personal", "technical", "preference", "project"], "description": "Категорія факту" } }, "required": ["fact"] } } }, # PRIORITY 4: Document/Presentation tools { "type": "function", "function": { "name": "presentation_create", "description": "📊 Створити презентацію PowerPoint. 
Використовуй коли користувач просить 'створи презентацію', 'зроби презентацію', 'підготуй слайди'.", "parameters": { "type": "object", "properties": { "title": { "type": "string", "description": "Назва презентації" }, "slides": { "type": "array", "items": { "type": "object", "properties": { "title": {"type": "string", "description": "Заголовок слайду"}, "content": {"type": "string", "description": "Контент слайду (markdown)"} } }, "description": "Масив слайдів: [{title, content}]" }, "brand_id": { "type": "string", "description": "ID бренду для стилю (energyunion, greenfood, nutra)", "default": "energyunion" }, "theme_version": { "type": "string", "description": "Версія теми", "default": "v1.0.0" }, "language": { "type": "string", "enum": ["uk", "en", "ru"], "description": "Мова презентації", "default": "uk" } }, "required": ["title", "slides"] } } }, { "type": "function", "function": { "name": "presentation_status", "description": "📋 Перевірити статус створення презентації за job_id", "parameters": { "type": "object", "properties": { "job_id": { "type": "string", "description": "ID завдання рендерингу" } }, "required": ["job_id"] } } }, { "type": "function", "function": { "name": "presentation_download", "description": "📥 Отримати посилання на готову презентацію за artifact_id", "parameters": { "type": "object", "properties": { "artifact_id": { "type": "string", "description": "ID артефакту презентації" }, "format": { "type": "string", "enum": ["pptx", "pdf"], "description": "Формат файлу", "default": "pptx" } }, "required": ["artifact_id"] } } }, { "type": "function", "function": { "name": "file_tool", "description": "📁 Універсальний file tool для створення та оновлення CSV/JSON/YAML/ZIP і інших форматів через action-based API.", "parameters": { "type": "object", "properties": { "action": { "type": "string", "enum": [ "excel_create", "excel_update", "docx_create", "docx_update", "pptx_create", "pptx_update", "ods_create", "ods_update", "parquet_create", 
"parquet_update", "csv_create", "csv_update", "pdf_fill", "pdf_merge", "pdf_split", "pdf_update", "djvu_to_pdf", "djvu_extract_text", "json_export", "yaml_export", "zip_bundle", "text_create", "text_update", "markdown_create", "markdown_update", "xml_export", "html_export", "image_create", "image_edit", "image_convert", "image_bundle", "svg_export", "svg_to_png" ], "description": "Дія file tool" }, "file_name": { "type": "string", "description": "Назва файлу-результату" }, "file_base64": { "type": "string", "description": "Вхідний файл у base64 для update-операцій" }, "content": { "description": "Контент для json/yaml export" }, "headers": { "type": "array", "items": {"type": "string"}, "description": "Заголовки для CSV" }, "rows": { "type": "array", "description": "Рядки для CSV" }, "entries": { "type": "array", "description": "Елементи для zip_bundle" }, "operation": { "type": "string", "enum": ["append", "replace"], "description": "Режим csv_update" } }, "required": ["action"] } } }, # PRIORITY 5: Web Scraping tools { "type": "function", "function": { "name": "crawl4ai_scrape", "description": "🕷️ Глибокий скрейпінг веб-сторінки через Crawl4AI. Витягує повний контент, структуровані дані, медіа. Використовуй для детального аналізу сайтів.", "parameters": { "type": "object", "properties": { "url": { "type": "string", "description": "URL сторінки для скрейпінгу" }, "extract_links": { "type": "boolean", "description": "Витягувати посилання зі сторінки", "default": True }, "extract_images": { "type": "boolean", "description": "Витягувати зображення", "default": False } }, "required": ["url"] } } }, # PRIORITY 6: TTS tools { "type": "function", "function": { "name": "tts_speak", "description": "🔊 Перетворити текст на аудіо (Text-to-Speech). Повертає аудіо файл. 
Використовуй коли користувач просить озвучити текст.", "parameters": { "type": "object", "properties": { "text": { "type": "string", "description": "Текст для озвучення" }, "language": { "type": "string", "enum": ["uk", "en", "ru"], "description": "Мова озвучення", "default": "uk" } }, "required": ["text"] } } }, # PRIORITY 7: Market Data tools (SenpAI) { "type": "function", "function": { "name": "market_data", "description": "📊 Отримати real-time ринкові дані: поточну ціну, котирування, обсяги, аналітичні фічі (VWAP, spread, volatility, trade signals). Доступні символи: BTCUSDT, ETHUSDT.", "parameters": { "type": "object", "properties": { "symbol": { "type": "string", "description": "Торговий символ (наприклад BTCUSDT, ETHUSDT)" }, "query_type": { "type": "string", "enum": ["price", "features", "all"], "description": "Тип запиту: price (ціна + котирування), features (аналітичні фічі), all (все разом)", "default": "all" } }, "required": ["symbol"] } } } ] @dataclass class ToolResult: """Result of tool execution""" success: bool result: Any error: Optional[str] = None image_base64: Optional[str] = None # For image generation results file_base64: Optional[str] = None file_name: Optional[str] = None file_mime: Optional[str] = None class ToolManager: """Manages tool execution for the agent""" def __init__(self, config: Dict[str, Any]): self.config = config self.http_client = httpx.AsyncClient(timeout=60.0) self.swapper_url = os.getenv("SWAPPER_URL", "http://swapper-service:8890") self.comfy_agent_url = os.getenv("COMFY_AGENT_URL", "http://212.8.58.133:8880") self.tools_config = self._load_tools_config() def _load_tools_config(self) -> Dict[str, Dict]: """Load tool endpoints from config""" tools = {} agent_config = self.config.get("agents", {}).get("helion", {}) for tool in agent_config.get("tools", []): if "endpoint" in tool: tools[tool["id"]] = { "endpoint": tool["endpoint"], "method": tool.get("method", "POST") } return tools def get_tool_definitions(self, agent_id: 
str = None) -> List[Dict]: """Get tool definitions for function calling, filtered by agent permissions""" if not agent_id: return TOOL_DEFINITIONS # Get allowed tools for this agent allowed_tools = get_agent_tools(agent_id) # Filter tool definitions filtered = [] for tool_def in TOOL_DEFINITIONS: tool_name = tool_def.get("function", {}).get("name") if tool_name in allowed_tools: filtered.append(tool_def) tool_names = [t.get("function", {}).get("name") for t in filtered] logger.debug(f"Agent {agent_id} has {len(filtered)} tools: {tool_names}") return filtered async def execute_tool( self, tool_name: str, arguments: Dict[str, Any], agent_id: str = None, chat_id: str = None, user_id: str = None, ) -> ToolResult: """Execute a tool and return result. Optionally checks agent permissions.""" logger.info(f"🔧 Executing tool: {tool_name} for agent={agent_id} with args: {arguments}") # Check agent permission if agent_id provided if agent_id and not is_tool_allowed(agent_id, tool_name): logger.warning(f"⚠️ Tool {tool_name} not allowed for agent {agent_id}") return ToolResult(success=False, result=None, error=f"Tool {tool_name} not available for this agent") try: # Priority 1: Memory/Knowledge tools if tool_name == "memory_search": return await self._memory_search(arguments, agent_id=agent_id, chat_id=chat_id, user_id=user_id) elif tool_name == "graph_query": return await self._graph_query(arguments, agent_id=agent_id) # Priority 2: Web tools elif tool_name == "web_search": return await self._web_search(arguments) elif tool_name == "web_extract": return await self._web_extract(arguments) elif tool_name == "image_generate": return await self._image_generate(arguments) elif tool_name == "comfy_generate_image": return await self._comfy_generate_image(arguments) elif tool_name == "comfy_generate_video": return await self._comfy_generate_video(arguments) elif tool_name == "remember_fact": return await self._remember_fact(arguments, agent_id=agent_id, chat_id=chat_id, 
user_id=user_id) # Priority 4: Presentation tools elif tool_name == "presentation_create": return await self._presentation_create(arguments) elif tool_name == "presentation_status": return await self._presentation_status(arguments) elif tool_name == "presentation_download": return await self._presentation_download(arguments) # Priority 5: Web scraping tools elif tool_name == "crawl4ai_scrape": return await self._crawl4ai_scrape(arguments) # Priority 6: TTS tools elif tool_name == "tts_speak": return await self._tts_speak(arguments) # Priority 6: File artifacts elif tool_name == "file_tool": return await self._file_tool(arguments) # Priority 7: Market Data (SenpAI) elif tool_name == "market_data": return await self._market_data(arguments) else: return ToolResult(success=False, result=None, error=f"Unknown tool: {tool_name}") except Exception as e: logger.error(f"Tool execution failed: {e}") return ToolResult(success=False, result=None, error=str(e)) @staticmethod def _sanitize_file_name(name: Optional[str], default_name: str, force_ext: Optional[str] = None) -> str: raw = (name or default_name).strip() or default_name base = PurePath(raw).name if not base: base = default_name if force_ext and not base.lower().endswith(force_ext): base = f"{base}{force_ext}" return base @staticmethod def _b64_from_bytes(data: bytes) -> str: return base64.b64encode(data).decode("utf-8") @staticmethod def _bytes_from_b64(value: str) -> bytes: return base64.b64decode(value) @staticmethod def _normalize_rows(rows: Any, headers: Optional[List[str]] = None) -> List[List[Any]]: if not isinstance(rows, list): return [] out: List[List[Any]] = [] for row in rows: if isinstance(row, list): out.append(row) elif isinstance(row, dict): keys = headers or list(row.keys()) out.append([row.get(k, "") for k in keys]) else: out.append([row]) return out @staticmethod def _append_sheet_data(ws: Any, headers: List[str], rows: List[List[Any]]) -> None: if headers: ws.append(headers) for row in rows: 
ws.append(row) async def _file_tool(self, args: Dict[str, Any]) -> ToolResult: action = str((args or {}).get("action") or "").strip().lower() if not action: return ToolResult(success=False, result=None, error="Missing action") if action == "excel_create": return self._file_excel_create(args) if action == "excel_update": return self._file_excel_update(args) if action == "csv_create": return self._file_csv_create(args) if action == "csv_update": return self._file_csv_update(args) if action == "ods_create": return self._file_ods_create(args) if action == "ods_update": return self._file_ods_update(args) if action == "parquet_create": return self._file_parquet_create(args) if action == "parquet_update": return self._file_parquet_update(args) if action == "text_create": return self._file_text_create(args) if action == "text_update": return self._file_text_update(args) if action == "markdown_create": return self._file_markdown_create(args) if action == "markdown_update": return self._file_markdown_update(args) if action == "xml_export": return self._file_xml_export(args) if action == "html_export": return self._file_html_export(args) if action == "image_create": return self._file_image_create(args) if action == "image_edit": return self._file_image_edit(args) if action == "image_convert": return self._file_image_convert(args) if action == "image_bundle": return self._file_image_bundle(args) if action == "svg_export": return self._file_svg_export(args) if action == "svg_to_png": return self._file_svg_to_png(args) if action == "json_export": return self._file_json_export(args) if action == "yaml_export": return self._file_yaml_export(args) if action == "zip_bundle": return self._file_zip_bundle(args) if action == "docx_create": return self._file_docx_create(args) if action == "docx_update": return self._file_docx_update(args) if action == "pptx_create": return self._file_pptx_create(args) if action == "pptx_update": return self._file_pptx_update(args) if action == 
"pdf_merge": return self._file_pdf_merge(args) if action == "pdf_split": return self._file_pdf_split(args) if action == "pdf_update": return self._file_pdf_update(args) if action == "pdf_fill": return self._file_pdf_fill(args) if action == "djvu_to_pdf": return self._file_djvu_to_pdf(args) if action == "djvu_extract_text": return self._file_djvu_extract_text(args) return ToolResult(success=False, result=None, error=f"Action not implemented yet: {action}") def _file_csv_create(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "export.csv", force_ext=".csv") headers = args.get("headers") or [] rows_raw = args.get("rows") or [] rows = self._normalize_rows(rows_raw, headers=headers if headers else None) if rows and not headers and isinstance(rows_raw[0], dict): headers = list(rows_raw[0].keys()) rows = self._normalize_rows(rows_raw, headers=headers) sio = StringIO(newline="") writer = csv.writer(sio) if headers: writer.writerow(headers) for row in rows: writer.writerow(row) data = sio.getvalue().encode("utf-8") return ToolResult( success=True, result={"message": f"CSV created: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/csv", ) def _file_csv_update(self, args: Dict[str, Any]) -> ToolResult: src_b64 = args.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for csv_update") file_name = self._sanitize_file_name(args.get("file_name"), "updated.csv", force_ext=".csv") operation = str(args.get("operation") or "append").strip().lower() if operation not in {"append", "replace"}: return ToolResult(success=False, result=None, error="operation must be append|replace") headers = args.get("headers") or [] rows_raw = args.get("rows") or [] rows = self._normalize_rows(rows_raw, headers=headers if headers else None) if rows and not headers and isinstance(rows_raw[0], dict): headers = list(rows_raw[0].keys()) rows = 
self._normalize_rows(rows_raw, headers=headers) existing_rows: List[List[str]] = [] text = self._bytes_from_b64(src_b64).decode("utf-8") if text.strip(): existing_rows = [list(r) for r in csv.reader(StringIO(text))] out_rows: List[List[Any]] = [] if operation == "replace": if headers: out_rows.append(headers) out_rows.extend(rows) else: if existing_rows: out_rows.extend(existing_rows) elif headers: out_rows.append(headers) out_rows.extend(rows) sio = StringIO(newline="") writer = csv.writer(sio) for row in out_rows: writer.writerow(row) data = sio.getvalue().encode("utf-8") return ToolResult( success=True, result={"message": f"CSV updated: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/csv", ) @staticmethod def _rows_to_objects(rows_raw: Any, headers: Optional[List[str]] = None) -> List[Dict[str, Any]]: if not isinstance(rows_raw, list): return [] result: List[Dict[str, Any]] = [] for idx, row in enumerate(rows_raw): if isinstance(row, dict): result.append(dict(row)) continue if isinstance(row, list): if headers: obj = {str(headers[i]): row[i] if i < len(row) else None for i in range(len(headers))} else: obj = {f"col_{i+1}": v for i, v in enumerate(row)} result.append(obj) continue key = headers[0] if headers else "value" result.append({str(key): row}) return result def _file_ods_create(self, args: Dict[str, Any]) -> ToolResult: from odf.opendocument import OpenDocumentSpreadsheet from odf.table import Table, TableCell, TableRow from odf.text import P file_name = self._sanitize_file_name(args.get("file_name"), "sheet.ods", force_ext=".ods") headers = args.get("headers") or [] rows_raw = args.get("rows") or [] rows = self._normalize_rows(rows_raw, headers=headers if headers else None) if rows and not headers and isinstance(rows_raw[0], dict): headers = list(rows_raw[0].keys()) rows = self._normalize_rows(rows_raw, headers=headers) doc = OpenDocumentSpreadsheet() table = Table(name=str(args.get("sheet_name") or "Sheet1")) 
if headers: hrow = TableRow() for value in headers: cell = TableCell(valuetype="string") cell.addElement(P(text=str(value))) hrow.addElement(cell) table.addElement(hrow) for row in rows: trow = TableRow() for value in row: cell = TableCell(valuetype="string") cell.addElement(P(text="" if value is None else str(value))) trow.addElement(cell) table.addElement(trow) doc.spreadsheet.addElement(table) with tempfile.NamedTemporaryFile(suffix=".ods") as tmp: doc.save(tmp.name) tmp.seek(0) payload = tmp.read() return ToolResult( success=True, result={"message": f"ODS created: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="application/vnd.oasis.opendocument.spreadsheet", ) def _file_ods_update(self, args: Dict[str, Any]) -> ToolResult: from odf.opendocument import OpenDocumentSpreadsheet, load from odf.table import Table, TableCell, TableRow from odf.text import P src_b64 = args.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for ods_update") file_name = self._sanitize_file_name(args.get("file_name"), "updated.ods", force_ext=".ods") operation = str(args.get("operation") or "append").strip().lower() if operation not in {"append", "replace"}: return ToolResult(success=False, result=None, error="operation must be append|replace") headers = args.get("headers") or [] rows_raw = args.get("rows") or [] rows = self._normalize_rows(rows_raw, headers=headers if headers else None) if rows and not headers and isinstance(rows_raw[0], dict): headers = list(rows_raw[0].keys()) rows = self._normalize_rows(rows_raw, headers=headers) with tempfile.NamedTemporaryFile(suffix=".ods") as src: src.write(self._bytes_from_b64(src_b64)) src.flush() doc = load(src.name) # Rebuild first table to keep update deterministic. 
tables = doc.spreadsheet.getElementsByType(Table) existing: List[List[str]] = [] if tables and operation == "append": first = tables[0] for r in first.getElementsByType(TableRow): vals = [] for c in r.getElementsByType(TableCell): text_nodes = c.getElementsByType(P) vals.append("".join((p.firstChild.data if p.firstChild else "") for p in text_nodes)) existing.append(vals) doc.spreadsheet.removeChild(first) elif tables: doc.spreadsheet.removeChild(tables[0]) table = Table(name=str(args.get("sheet_name") or "Sheet1")) out_rows: List[List[Any]] = [] if operation == "append" and existing: out_rows.extend(existing) out_rows.extend(rows) else: if headers: out_rows.append(headers) out_rows.extend(rows) for row in out_rows: trow = TableRow() for value in row: cell = TableCell(valuetype="string") cell.addElement(P(text="" if value is None else str(value))) trow.addElement(cell) table.addElement(trow) doc.spreadsheet.addElement(table) with tempfile.NamedTemporaryFile(suffix=".ods") as dst: doc.save(dst.name) dst.seek(0) payload = dst.read() return ToolResult( success=True, result={"message": f"ODS updated: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="application/vnd.oasis.opendocument.spreadsheet", ) def _file_parquet_create(self, args: Dict[str, Any]) -> ToolResult: import pyarrow as pa import pyarrow.parquet as pq file_name = self._sanitize_file_name(args.get("file_name"), "data.parquet", force_ext=".parquet") headers = args.get("headers") or [] rows_raw = args.get("rows") or [] objects = self._rows_to_objects(rows_raw, headers=headers if headers else None) table = pa.Table.from_pylist(objects if objects else [{"value": None}]) out = BytesIO() pq.write_table(table, out) payload = out.getvalue() return ToolResult( success=True, result={"message": f"Parquet created: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="application/vnd.apache.parquet", ) def _file_parquet_update(self, args: 
Dict[str, Any]) -> ToolResult: import pyarrow as pa import pyarrow.parquet as pq src_b64 = args.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for parquet_update") file_name = self._sanitize_file_name(args.get("file_name"), "updated.parquet", force_ext=".parquet") operation = str(args.get("operation") or "append").strip().lower() if operation not in {"append", "replace"}: return ToolResult(success=False, result=None, error="operation must be append|replace") headers = args.get("headers") or [] rows_raw = args.get("rows") or [] new_rows = self._rows_to_objects(rows_raw, headers=headers if headers else None) existing_rows: List[Dict[str, Any]] = [] if operation == "append": table = pq.read_table(BytesIO(self._bytes_from_b64(src_b64))) existing_rows = table.to_pylist() merged = new_rows if operation == "replace" else (existing_rows + new_rows) table = pa.Table.from_pylist(merged if merged else [{"value": None}]) out = BytesIO() pq.write_table(table, out) payload = out.getvalue() return ToolResult( success=True, result={"message": f"Parquet updated: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="application/vnd.apache.parquet", ) def _file_json_export(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "export.json", force_ext=".json") content = args.get("content") indent = int(args.get("indent") or 2) payload = json.dumps(content, indent=indent, ensure_ascii=False).encode("utf-8") return ToolResult( success=True, result={"message": f"JSON exported: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="application/json", ) @staticmethod def _stringify_text_payload(args: Dict[str, Any], key: str = "text") -> str: value = args.get(key) if value is None and key != "content": value = args.get("content") if value is None: return "" if isinstance(value, (dict, list)): return 
json.dumps(value, ensure_ascii=False, indent=2) return str(value) def _file_text_create(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "note.txt", force_ext=".txt") text = self._stringify_text_payload(args, key="text") data = text.encode("utf-8") return ToolResult( success=True, result={"message": f"Text file created: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/plain", ) def _file_text_update(self, args: Dict[str, Any]) -> ToolResult: src_b64 = args.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for text_update") file_name = self._sanitize_file_name(args.get("file_name"), "updated.txt", force_ext=".txt") operation = str(args.get("operation") or "append").strip().lower() if operation not in {"append", "replace"}: return ToolResult(success=False, result=None, error="operation must be append|replace") incoming = self._stringify_text_payload(args, key="text") existing = self._bytes_from_b64(src_b64).decode("utf-8") updated = incoming if operation == "replace" else f"{existing}{incoming}" data = updated.encode("utf-8") return ToolResult( success=True, result={"message": f"Text file updated: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/plain", ) def _file_markdown_create(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "document.md", force_ext=".md") text = self._stringify_text_payload(args, key="text") data = text.encode("utf-8") return ToolResult( success=True, result={"message": f"Markdown created: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/markdown", ) def _file_markdown_update(self, args: Dict[str, Any]) -> ToolResult: src_b64 = args.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for 
markdown_update") file_name = self._sanitize_file_name(args.get("file_name"), "updated.md", force_ext=".md") operation = str(args.get("operation") or "append").strip().lower() if operation not in {"append", "replace"}: return ToolResult(success=False, result=None, error="operation must be append|replace") incoming = self._stringify_text_payload(args, key="text") existing = self._bytes_from_b64(src_b64).decode("utf-8") updated = incoming if operation == "replace" else f"{existing}{incoming}" data = updated.encode("utf-8") return ToolResult( success=True, result={"message": f"Markdown updated: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/markdown", ) def _file_xml_export(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "export.xml", force_ext=".xml") xml = self._stringify_text_payload(args, key="xml") data = xml.encode("utf-8") return ToolResult( success=True, result={"message": f"XML exported: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="application/xml", ) def _file_html_export(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "export.html", force_ext=".html") html = self._stringify_text_payload(args, key="html") data = html.encode("utf-8") return ToolResult( success=True, result={"message": f"HTML exported: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/html", ) @staticmethod def _image_format_for_name(file_name: str, fallback: str = "PNG") -> str: suffix = PurePath(file_name).suffix.lower() mapping = { ".png": "PNG", ".jpg": "JPEG", ".jpeg": "JPEG", ".webp": "WEBP", ".gif": "GIF", ".bmp": "BMP", ".tif": "TIFF", ".tiff": "TIFF", } return mapping.get(suffix, fallback) @staticmethod def _mime_for_image_format(fmt: str) -> str: mapping = { "PNG": "image/png", "JPEG": "image/jpeg", "WEBP": "image/webp", "GIF": "image/gif", "BMP": 
"image/bmp", "TIFF": "image/tiff", } return mapping.get(fmt.upper(), "application/octet-stream") def _file_image_create(self, args: Dict[str, Any]) -> ToolResult: from PIL import Image, ImageDraw file_name = self._sanitize_file_name(args.get("file_name"), "image.png") fmt = self._image_format_for_name(file_name, fallback=str(args.get("format") or "PNG")) width = max(1, int(args.get("width") or 1024)) height = max(1, int(args.get("height") or 1024)) color = args.get("background_color") or args.get("color") or "white" image = Image.new("RGB", (width, height), color=color) text = args.get("text") if text: draw = ImageDraw.Draw(image) draw.text((20, 20), str(text), fill=args.get("text_color") or "black") out = BytesIO() image.save(out, format=fmt) payload = out.getvalue() return ToolResult( success=True, result={"message": f"Image created: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime=self._mime_for_image_format(fmt), ) def _file_image_edit(self, args: Dict[str, Any]) -> ToolResult: from PIL import Image, ImageDraw src_b64 = args.get("file_base64") operations = args.get("operations") or [] if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for image_edit") if not isinstance(operations, list) or not operations: return ToolResult(success=False, result=None, error="operations must be non-empty array") file_name = self._sanitize_file_name(args.get("file_name"), "edited.png") fmt = self._image_format_for_name(file_name, fallback=str(args.get("format") or "PNG")) image = Image.open(BytesIO(self._bytes_from_b64(src_b64))) for op in operations: if not isinstance(op, dict): return ToolResult(success=False, result=None, error="Each operation must be object") op_type = str(op.get("type") or "").strip().lower() if op_type == "resize": width = max(1, int(op.get("width") or image.width)) height = max(1, int(op.get("height") or image.height)) image = image.resize((width, height)) elif op_type == 
"crop": left = int(op.get("left") or 0) top = int(op.get("top") or 0) right = int(op.get("right") or image.width) bottom = int(op.get("bottom") or image.height) image = image.crop((left, top, right, bottom)) elif op_type == "rotate": angle = float(op.get("angle") or 0) image = image.rotate(angle, expand=bool(op.get("expand", True))) elif op_type == "flip_horizontal": image = image.transpose(Image.FLIP_LEFT_RIGHT) elif op_type == "flip_vertical": image = image.transpose(Image.FLIP_TOP_BOTTOM) elif op_type == "draw_text": draw = ImageDraw.Draw(image) x = int(op.get("x") or 0) y = int(op.get("y") or 0) draw.text((x, y), str(op.get("text") or ""), fill=op.get("color") or "black") else: return ToolResult(success=False, result=None, error=f"Unsupported image_edit operation: {op_type}") out = BytesIO() image.save(out, format=fmt) payload = out.getvalue() return ToolResult( success=True, result={"message": f"Image edited: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime=self._mime_for_image_format(fmt), ) def _file_image_convert(self, args: Dict[str, Any]) -> ToolResult: from PIL import Image src_b64 = args.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for image_convert") file_name = self._sanitize_file_name(args.get("file_name"), "converted.png") fmt = str(args.get("target_format") or self._image_format_for_name(file_name)).upper() if fmt == "JPG": fmt = "JPEG" image = Image.open(BytesIO(self._bytes_from_b64(src_b64))) if fmt in {"JPEG"} and image.mode not in {"RGB", "L"}: image = image.convert("RGB") out = BytesIO() save_kwargs: Dict[str, Any] = {} if fmt in {"JPEG", "WEBP"} and args.get("quality") is not None: save_kwargs["quality"] = int(args.get("quality")) image.save(out, format=fmt, **save_kwargs) payload = out.getvalue() return ToolResult( success=True, result={"message": f"Image converted: {file_name}"}, file_base64=self._b64_from_bytes(payload), 
file_name=file_name, file_mime=self._mime_for_image_format(fmt), ) def _file_image_bundle(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "images.zip", force_ext=".zip") images = args.get("images") or args.get("entries") or [] if not isinstance(images, list) or not images: return ToolResult(success=False, result=None, error="images must be non-empty array") out = BytesIO() with ZipFile(out, mode="w", compression=ZIP_DEFLATED) as zf: for idx, item in enumerate(images, start=1): if not isinstance(item, dict): return ToolResult(success=False, result=None, error=f"images[{idx-1}] must be object") src_b64 = item.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error=f"images[{idx-1}].file_base64 is required") in_name = self._sanitize_file_name(item.get("file_name"), f"image_{idx}.bin") zf.writestr(in_name, self._bytes_from_b64(src_b64)) payload = out.getvalue() return ToolResult( success=True, result={"message": f"Image bundle created: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="application/zip", ) @staticmethod def _strip_ns(tag: str) -> str: return tag.split("}", 1)[1] if "}" in tag else tag @staticmethod def _safe_int(value: Any, default: int = 0) -> int: try: text = str(value).strip() if text.endswith("px"): text = text[:-2] return int(float(text)) except Exception: return default @staticmethod def _safe_float(value: Any, default: float = 0.0) -> float: try: text = str(value).strip() if text.endswith("px"): text = text[:-2] return float(text) except Exception: return default @staticmethod def _svg_style_map(elem: Any) -> Dict[str, str]: style = str(elem.attrib.get("style") or "") out: Dict[str, str] = {} for chunk in style.split(";"): if ":" not in chunk: continue k, v = chunk.split(":", 1) out[k.strip()] = v.strip() return out def _svg_paint(self, elem: Any, key: str, default: Optional[str]) -> Optional[str]: style = 
self._svg_style_map(elem) value = elem.attrib.get(key, style.get(key, default)) if value is None: return None text = str(value).strip() if not text or text.lower() == "none": return None return text @staticmethod def _svg_color(value: Optional[str], fallback: Optional[tuple[int, int, int]] = None) -> Optional[tuple[int, int, int]]: if value is None: return fallback try: from PIL import ImageColor return ImageColor.getrgb(value) except Exception: return fallback @staticmethod def _svg_points(raw: Any) -> List[tuple[float, float]]: text = str(raw or "").replace(",", " ") nums: List[float] = [] for token in text.split(): try: nums.append(float(token)) except Exception: continue pts: List[tuple[float, float]] = [] for i in range(0, len(nums) - 1, 2): pts.append((nums[i], nums[i + 1])) return pts def _file_svg_export(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "image.svg", force_ext=".svg") svg_raw = args.get("svg") if svg_raw is not None: payload = str(svg_raw).encode("utf-8") return ToolResult( success=True, result={"message": f"SVG exported: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="image/svg+xml", ) width = max(1, int(args.get("width") or 1024)) height = max(1, int(args.get("height") or 1024)) bg = str(args.get("background_color") or "white") text = xml_escape(str(args.get("text") or "")) text_color = str(args.get("text_color") or "black") text_x = int(args.get("text_x") or 20) text_y = int(args.get("text_y") or 40) # Minimal deterministic SVG template for safe generation. 
svg = ( f'' f'' f'{text}' f"" ) payload = svg.encode("utf-8") return ToolResult( success=True, result={"message": f"SVG exported: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="image/svg+xml", ) def _file_svg_to_png(self, args: Dict[str, Any]) -> ToolResult: from PIL import Image, ImageDraw file_name = self._sanitize_file_name(args.get("file_name"), "converted.png", force_ext=".png") src_b64 = args.get("file_base64") svg_text = args.get("svg") if src_b64: svg_text = self._bytes_from_b64(src_b64).decode("utf-8", errors="ignore") if not svg_text: return ToolResult(success=False, result=None, error="file_base64 or svg is required for svg_to_png") try: root = ET.fromstring(svg_text) except Exception as exc: return ToolResult(success=False, result=None, error=f"Invalid SVG: {exc}") width = self._safe_int(root.attrib.get("width"), 1024) height = self._safe_int(root.attrib.get("height"), 1024) width = max(1, width) height = max(1, height) image = Image.new("RGBA", (width, height), color="white") draw = ImageDraw.Draw(image) for elem in root.iter(): tag = self._strip_ns(elem.tag) if tag == "rect": x = self._safe_int(elem.attrib.get("x"), 0) y = self._safe_int(elem.attrib.get("y"), 0) w = self._safe_int(elem.attrib.get("width"), width) h = self._safe_int(elem.attrib.get("height"), height) fill = self._svg_paint(elem, "fill", "white") stroke = self._svg_paint(elem, "stroke", None) stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1)) color = self._svg_color(fill, (255, 255, 255) if fill else None) outline = None if stroke: outline = self._svg_color(stroke, (0, 0, 0)) draw.rectangle([x, y, x + max(0, w), y + max(0, h)], fill=color, outline=outline, width=stroke_width) elif tag == "circle": cx = self._safe_float(elem.attrib.get("cx"), width / 2.0) cy = self._safe_float(elem.attrib.get("cy"), height / 2.0) r = max(0.0, self._safe_float(elem.attrib.get("r"), 0.0)) fill = self._svg_paint(elem, "fill", None) stroke = 
self._svg_paint(elem, "stroke", None) stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1)) fill_color = self._svg_color(fill, None) outline = self._svg_color(stroke, None) draw.ellipse([cx - r, cy - r, cx + r, cy + r], fill=fill_color, outline=outline, width=stroke_width) elif tag == "ellipse": cx = self._safe_float(elem.attrib.get("cx"), width / 2.0) cy = self._safe_float(elem.attrib.get("cy"), height / 2.0) rx = max(0.0, self._safe_float(elem.attrib.get("rx"), 0.0)) ry = max(0.0, self._safe_float(elem.attrib.get("ry"), 0.0)) fill = self._svg_paint(elem, "fill", None) stroke = self._svg_paint(elem, "stroke", None) stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1)) fill_color = self._svg_color(fill, None) outline = self._svg_color(stroke, None) draw.ellipse([cx - rx, cy - ry, cx + rx, cy + ry], fill=fill_color, outline=outline, width=stroke_width) elif tag == "line": x1 = self._safe_float(elem.attrib.get("x1"), 0.0) y1 = self._safe_float(elem.attrib.get("y1"), 0.0) x2 = self._safe_float(elem.attrib.get("x2"), 0.0) y2 = self._safe_float(elem.attrib.get("y2"), 0.0) stroke = self._svg_paint(elem, "stroke", "black") stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1)) color = self._svg_color(stroke, (0, 0, 0)) or (0, 0, 0) draw.line([(x1, y1), (x2, y2)], fill=color, width=stroke_width) elif tag == "polyline": points = self._svg_points(elem.attrib.get("points")) if len(points) >= 2: stroke = self._svg_paint(elem, "stroke", "black") stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1)) color = self._svg_color(stroke, (0, 0, 0)) or (0, 0, 0) draw.line(points, fill=color, width=stroke_width) elif tag == "polygon": points = self._svg_points(elem.attrib.get("points")) if len(points) >= 3: fill = self._svg_paint(elem, "fill", None) stroke = self._svg_paint(elem, "stroke", None) stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1)) fill_color = self._svg_color(fill, None) 
outline = self._svg_color(stroke, None) draw.polygon(points, fill=fill_color, outline=outline) # Pillow polygon has no width support; emulate thicker stroke if outline and stroke_width > 1: draw.line(points + [points[0]], fill=outline, width=stroke_width) elif tag == "text": x = self._safe_int(elem.attrib.get("x"), 0) y = self._safe_int(elem.attrib.get("y"), 0) fill = self._svg_paint(elem, "fill", "black") color = self._svg_color(fill, (0, 0, 0)) or (0, 0, 0) draw.text((x, y), elem.text or "", fill=color) out = BytesIO() image.convert("RGB").save(out, format="PNG") payload = out.getvalue() return ToolResult( success=True, result={"message": f"SVG converted to PNG: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="image/png", ) def _file_yaml_export(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "export.yaml", force_ext=".yaml") content = args.get("content") payload = json.dumps(content, ensure_ascii=False, indent=2).encode("utf-8") try: import yaml payload = yaml.safe_dump(content, allow_unicode=True, sort_keys=False).encode("utf-8") except Exception: # Fallback to JSON serialization if PyYAML fails. 
pass return ToolResult( success=True, result={"message": f"YAML exported: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="application/x-yaml", ) def _file_zip_bundle(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "bundle.zip", force_ext=".zip") entries = args.get("entries") or [] if not isinstance(entries, list) or not entries: return ToolResult(success=False, result=None, error="entries must be non-empty array") out = BytesIO() with ZipFile(out, mode="w", compression=ZIP_DEFLATED) as zf: for idx, entry in enumerate(entries, start=1): if not isinstance(entry, dict): return ToolResult(success=False, result=None, error=f"entry[{idx-1}] must be object") ename = self._sanitize_file_name(entry.get("file_name"), f"file_{idx}.bin") if entry.get("file_base64"): zf.writestr(ename, self._bytes_from_b64(entry["file_base64"])) elif "text" in entry: zf.writestr(ename, str(entry["text"]).encode("utf-8")) elif "content" in entry: zf.writestr(ename, json.dumps(entry["content"], ensure_ascii=False, indent=2).encode("utf-8")) else: return ToolResult( success=False, result=None, error=f"entry[{idx-1}] requires file_base64|text|content", ) return ToolResult( success=True, result={"message": f"ZIP bundle created: {file_name}"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/zip", ) def _file_excel_create(self, args: Dict[str, Any]) -> ToolResult: import openpyxl file_name = self._sanitize_file_name(args.get("file_name"), "report.xlsx", force_ext=".xlsx") sheets = args.get("sheets") wb = openpyxl.Workbook() wb.remove(wb.active) created = False if isinstance(sheets, list) and sheets: for idx, sheet in enumerate(sheets, start=1): if not isinstance(sheet, dict): return ToolResult(success=False, result=None, error=f"sheets[{idx-1}] must be object") sheet_name = str(sheet.get("name") or f"Sheet{idx}")[:31] headers = sheet.get("headers") or [] 
rows_raw = sheet.get("rows") or [] rows = self._normalize_rows(rows_raw, headers=headers if headers else None) if rows and not headers and isinstance(rows_raw[0], dict): headers = list(rows_raw[0].keys()) rows = self._normalize_rows(rows_raw, headers=headers) ws = wb.create_sheet(title=sheet_name) self._append_sheet_data(ws, headers, rows) created = True else: sheet_name = str(args.get("sheet_name") or "Sheet1")[:31] headers = args.get("headers") or [] rows_raw = args.get("rows") or [] rows = self._normalize_rows(rows_raw, headers=headers if headers else None) if rows and not headers and isinstance(rows_raw[0], dict): headers = list(rows_raw[0].keys()) rows = self._normalize_rows(rows_raw, headers=headers) ws = wb.create_sheet(title=sheet_name) self._append_sheet_data(ws, headers, rows) created = True if not created: wb.create_sheet(title="Sheet1") out = BytesIO() wb.save(out) return ToolResult( success=True, result={"message": f"Excel created: {file_name}"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ) def _file_excel_update(self, args: Dict[str, Any]) -> ToolResult: import openpyxl src_b64 = args.get("file_base64") operations = args.get("operations") or [] if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for excel_update") if not isinstance(operations, list) or not operations: return ToolResult(success=False, result=None, error="operations must be non-empty array") file_name = self._sanitize_file_name(args.get("file_name"), "updated.xlsx", force_ext=".xlsx") wb = openpyxl.load_workbook(filename=BytesIO(self._bytes_from_b64(src_b64))) for op in operations: if not isinstance(op, dict): return ToolResult(success=False, result=None, error="Each operation must be object") op_type = str(op.get("type") or "").strip().lower() if op_type == "append_rows": sheet = str(op.get("sheet") or wb.sheetnames[0])[:31] if sheet not in 
wb.sheetnames: wb.create_sheet(title=sheet) ws = wb[sheet] rows_raw = op.get("rows") or [] header_row = [c.value for c in ws[1]] if ws.max_row >= 1 else [] rows = self._normalize_rows(rows_raw, headers=header_row if header_row else None) if rows and not header_row and isinstance(rows_raw[0], dict): header_row = list(rows_raw[0].keys()) ws.append(header_row) rows = self._normalize_rows(rows_raw, headers=header_row) for row in rows: ws.append(row) elif op_type == "set_cell": sheet = str(op.get("sheet") or wb.sheetnames[0])[:31] cell = op.get("cell") if not cell: return ToolResult(success=False, result=None, error="set_cell operation requires 'cell'") if sheet not in wb.sheetnames: wb.create_sheet(title=sheet) wb[sheet][str(cell)] = op.get("value", "") elif op_type == "replace_sheet": sheet = str(op.get("sheet") or wb.sheetnames[0])[:31] if sheet in wb.sheetnames: wb.remove(wb[sheet]) ws = wb.create_sheet(title=sheet) headers = op.get("headers") or [] rows_raw = op.get("rows") or [] rows = self._normalize_rows(rows_raw, headers=headers if headers else None) if rows and not headers and isinstance(rows_raw[0], dict): headers = list(rows_raw[0].keys()) rows = self._normalize_rows(rows_raw, headers=headers) self._append_sheet_data(ws, headers, rows) elif op_type == "rename_sheet": src = str(op.get("from") or "") dst = str(op.get("to") or "").strip() if not src or not dst: return ToolResult(success=False, result=None, error="rename_sheet requires 'from' and 'to'") if src not in wb.sheetnames: return ToolResult(success=False, result=None, error=f"Sheet not found: {src}") wb[src].title = dst[:31] else: return ToolResult(success=False, result=None, error=f"Unsupported excel_update operation: {op_type}") out = BytesIO() wb.save(out) return ToolResult( success=True, result={"message": f"Excel updated: {file_name}"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ) def 
_file_docx_create(self, args: Dict[str, Any]) -> ToolResult: from docx import Document file_name = self._sanitize_file_name(args.get("file_name"), "document.docx", force_ext=".docx") doc = Document() title = args.get("title") if title: doc.add_heading(str(title), level=1) for item in args.get("paragraphs") or []: doc.add_paragraph(str(item)) for table in args.get("tables") or []: if not isinstance(table, dict): continue headers = [str(h) for h in (table.get("headers") or [])] rows_raw = table.get("rows") or [] rows = self._normalize_rows(rows_raw, headers=headers if headers else None) if rows and not headers and isinstance(rows_raw[0], dict): headers = list(rows_raw[0].keys()) rows = self._normalize_rows(rows_raw, headers=headers) total_rows = len(rows) + (1 if headers else 0) total_cols = len(headers) if headers else (len(rows[0]) if rows else 1) t = doc.add_table(rows=max(total_rows, 1), cols=max(total_cols, 1)) row_offset = 0 if headers: for idx, value in enumerate(headers): t.cell(0, idx).text = str(value) row_offset = 1 for ridx, row in enumerate(rows): for cidx, value in enumerate(row): if cidx < total_cols: t.cell(ridx + row_offset, cidx).text = str(value) out = BytesIO() doc.save(out) return ToolResult( success=True, result={"message": f"DOCX created: {file_name}"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document", ) def _file_docx_update(self, args: Dict[str, Any]) -> ToolResult: from docx import Document src_b64 = args.get("file_base64") operations = args.get("operations") or [] if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for docx_update") if not isinstance(operations, list) or not operations: return ToolResult(success=False, result=None, error="operations must be non-empty array") file_name = self._sanitize_file_name(args.get("file_name"), "updated.docx", force_ext=".docx") doc = 
Document(BytesIO(self._bytes_from_b64(src_b64))) for op in operations: if not isinstance(op, dict): return ToolResult(success=False, result=None, error="Each operation must be object") op_type = str(op.get("type") or "").strip().lower() if op_type == "append_paragraph": doc.add_paragraph(str(op.get("text") or "")) elif op_type == "append_heading": level = int(op.get("level") or 1) level = max(1, min(level, 9)) doc.add_heading(str(op.get("text") or ""), level=level) elif op_type == "replace_text": old = str(op.get("old") or "") new = str(op.get("new") or "") if not old: return ToolResult(success=False, result=None, error="replace_text requires old") for p in doc.paragraphs: if old in p.text: p.text = p.text.replace(old, new) for table in doc.tables: for row in table.rows: for cell in row.cells: if old in cell.text: cell.text = cell.text.replace(old, new) elif op_type == "append_table": headers = [str(h) for h in (op.get("headers") or [])] rows_raw = op.get("rows") or [] rows = self._normalize_rows(rows_raw, headers=headers if headers else None) if rows and not headers and isinstance(rows_raw[0], dict): headers = list(rows_raw[0].keys()) rows = self._normalize_rows(rows_raw, headers=headers) total_rows = len(rows) + (1 if headers else 0) total_cols = len(headers) if headers else (len(rows[0]) if rows else 1) t = doc.add_table(rows=max(total_rows, 1), cols=max(total_cols, 1)) row_offset = 0 if headers: for idx, value in enumerate(headers): t.cell(0, idx).text = str(value) row_offset = 1 for ridx, row in enumerate(rows): for cidx, value in enumerate(row): if cidx < total_cols: t.cell(ridx + row_offset, cidx).text = str(value) else: return ToolResult(success=False, result=None, error=f"Unsupported docx_update operation: {op_type}") out = BytesIO() doc.save(out) return ToolResult( success=True, result={"message": f"DOCX updated: {file_name}"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, 
file_mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document", ) def _file_pptx_create(self, args: Dict[str, Any]) -> ToolResult: from pptx import Presentation file_name = self._sanitize_file_name(args.get("file_name"), "slides.pptx", force_ext=".pptx") prs = Presentation() title = str(args.get("title") or "").strip() subtitle = str(args.get("subtitle") or "").strip() if title or subtitle: slide = prs.slides.add_slide(prs.slide_layouts[0]) if title and slide.shapes.title: slide.shapes.title.text = title if subtitle and len(slide.placeholders) > 1: slide.placeholders[1].text = subtitle for entry in args.get("slides") or []: if not isinstance(entry, dict): continue slide = prs.slides.add_slide(prs.slide_layouts[1]) if slide.shapes.title: slide.shapes.title.text = str(entry.get("title") or "") body = None if len(slide.placeholders) > 1: body = slide.placeholders[1].text_frame lines = entry.get("bullets") if lines is None: lines = entry.get("lines") if lines is None: lines = [entry.get("text")] if entry.get("text") is not None else [] if body is not None: body.clear() first = True for line in lines: if first: body.text = str(line) first = False else: p = body.add_paragraph() p.text = str(line) out = BytesIO() prs.save(out) return ToolResult( success=True, result={"message": f"PPTX created: {file_name}"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/vnd.openxmlformats-officedocument.presentationml.presentation", ) def _file_pptx_update(self, args: Dict[str, Any]) -> ToolResult: from pptx import Presentation from pptx.util import Inches src_b64 = args.get("file_base64") operations = args.get("operations") or [] if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for pptx_update") if not isinstance(operations, list) or not operations: return ToolResult(success=False, result=None, error="operations must be non-empty array") file_name = 
self._sanitize_file_name(args.get("file_name"), "updated.pptx", force_ext=".pptx") prs = Presentation(BytesIO(self._bytes_from_b64(src_b64))) for op in operations: if not isinstance(op, dict): return ToolResult(success=False, result=None, error="Each operation must be object") op_type = str(op.get("type") or "").strip().lower() if op_type == "append_slide": layout_idx = int(op.get("layout") or 1) if layout_idx < 0 or layout_idx >= len(prs.slide_layouts): layout_idx = 1 slide = prs.slides.add_slide(prs.slide_layouts[layout_idx]) if slide.shapes.title: slide.shapes.title.text = str(op.get("title") or "") lines = op.get("bullets") if lines is None: lines = op.get("lines") if lines is None: lines = [op.get("text")] if op.get("text") is not None else [] if len(slide.placeholders) > 1: body = slide.placeholders[1].text_frame body.clear() first = True for line in lines: if first: body.text = str(line) first = False else: p = body.add_paragraph() p.text = str(line) elif op_type == "add_table": slide_index = int(op.get("slide_index") or len(prs.slides)) if slide_index < 0: slide_index = 0 while len(prs.slides) <= slide_index: prs.slides.add_slide(prs.slide_layouts[1]) slide = prs.slides[slide_index] headers = [str(h) for h in (op.get("headers") or [])] rows_raw = op.get("rows") or [] rows = self._normalize_rows(rows_raw, headers=headers if headers else None) if rows and not headers and isinstance(rows_raw[0], dict): headers = list(rows_raw[0].keys()) rows = self._normalize_rows(rows_raw, headers=headers) row_count = len(rows) + (1 if headers else 0) col_count = len(headers) if headers else (len(rows[0]) if rows else 1) left = Inches(float(op.get("left_inches") or 1.0)) top = Inches(float(op.get("top_inches") or 1.5)) width = Inches(float(op.get("width_inches") or 8.0)) height = Inches(float(op.get("height_inches") or 3.0)) table = slide.shapes.add_table(max(1, row_count), max(1, col_count), left, top, width, height).table offset = 0 if headers: for idx, value in 
enumerate(headers): table.cell(0, idx).text = str(value) offset = 1 for r_idx, row in enumerate(rows): for c_idx, value in enumerate(row): if c_idx < col_count: table.cell(r_idx + offset, c_idx).text = str(value) elif op_type == "replace_text": old = str(op.get("old") or "") new = str(op.get("new") or "") if not old: return ToolResult(success=False, result=None, error="replace_text requires old") for slide in prs.slides: for shape in slide.shapes: if not hasattr(shape, "text"): continue text = shape.text or "" if old in text: shape.text = text.replace(old, new) elif op_type == "replace_text_preserve_layout": old = str(op.get("old") or "") new = str(op.get("new") or "") if not old: return ToolResult(success=False, result=None, error="replace_text_preserve_layout requires old") for slide in prs.slides: for shape in slide.shapes: if hasattr(shape, "text_frame") and shape.text_frame: for paragraph in shape.text_frame.paragraphs: for run in paragraph.runs: if old in run.text: run.text = run.text.replace(old, new) if getattr(shape, "has_table", False): for row in shape.table.rows: for cell in row.cells: for paragraph in cell.text_frame.paragraphs: for run in paragraph.runs: if old in run.text: run.text = run.text.replace(old, new) else: return ToolResult(success=False, result=None, error=f"Unsupported pptx_update operation: {op_type}") out = BytesIO() prs.save(out) return ToolResult( success=True, result={"message": f"PPTX updated: {file_name}"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/vnd.openxmlformats-officedocument.presentationml.presentation", ) def _file_pdf_merge(self, args: Dict[str, Any]) -> ToolResult: from pypdf import PdfReader, PdfWriter file_name = self._sanitize_file_name(args.get("file_name"), "merged.pdf", force_ext=".pdf") files = args.get("files") or [] if not isinstance(files, list) or not files: return ToolResult(success=False, result=None, error="files must be non-empty array for pdf_merge") 
writer = PdfWriter() page_count = 0 for item in files: if not isinstance(item, dict) or not item.get("file_base64"): return ToolResult(success=False, result=None, error="Each file entry must include file_base64") reader = PdfReader(BytesIO(self._bytes_from_b64(item["file_base64"]))) for page in reader.pages: writer.add_page(page) page_count += 1 out = BytesIO() writer.write(out) return ToolResult( success=True, result={"message": f"PDF merged: {file_name} ({page_count} pages)"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/pdf", ) @staticmethod def _parse_split_pages(pages: Any) -> Optional[List[int]]: if not isinstance(pages, list) or not pages: return None parsed: List[int] = [] for p in pages: idx = int(p) if idx < 1: return None parsed.append(idx) return sorted(set(parsed)) def _file_pdf_split(self, args: Dict[str, Any]) -> ToolResult: from pypdf import PdfReader, PdfWriter src_b64 = args.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for pdf_split") file_name = self._sanitize_file_name(args.get("file_name"), "split.zip", force_ext=".zip") reader = PdfReader(BytesIO(self._bytes_from_b64(src_b64))) total = len(reader.pages) if total == 0: return ToolResult(success=False, result=None, error="Input PDF has no pages") groups = args.get("groups") split_groups = [] if groups: if not isinstance(groups, list): return ToolResult(success=False, result=None, error="groups must be array") for idx, grp in enumerate(groups, start=1): if not isinstance(grp, dict): return ToolResult(success=False, result=None, error="Each group must be object") gname = self._sanitize_file_name(grp.get("file_name"), f"part_{idx}.pdf", force_ext=".pdf") pages = self._parse_split_pages(grp.get("pages")) if not pages: return ToolResult(success=False, result=None, error=f"Invalid pages in group {idx}") split_groups.append((gname, pages)) else: split_groups = [(f"page_{i+1}.pdf", [i + 
1]) for i in range(total)] out = BytesIO() with ZipFile(out, mode="w", compression=ZIP_DEFLATED) as zf: for gname, pages in split_groups: writer = PdfWriter() for p in pages: if p > total: return ToolResult(success=False, result=None, error=f"Page out of range: {p} > {total}") writer.add_page(reader.pages[p - 1]) part = BytesIO() writer.write(part) zf.writestr(gname, part.getvalue()) return ToolResult( success=True, result={"message": f"PDF split into {len(split_groups)} file(s): {file_name}"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/zip", ) def _file_pdf_fill(self, args: Dict[str, Any]) -> ToolResult: from pypdf import PdfReader, PdfWriter src_b64 = args.get("file_base64") fields = args.get("fields") or {} if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for pdf_fill") if not isinstance(fields, dict) or not fields: return ToolResult(success=False, result=None, error="fields must be a non-empty object") file_name = self._sanitize_file_name(args.get("file_name"), "filled.pdf", force_ext=".pdf") reader = PdfReader(BytesIO(self._bytes_from_b64(src_b64))) writer = PdfWriter() writer.append(reader) filled = True try: for page in writer.pages: writer.update_page_form_field_values(page, fields) if hasattr(writer, "set_need_appearances_writer"): writer.set_need_appearances_writer(True) except Exception: filled = False out = BytesIO() writer.write(out) msg = f"PDF form filled: {file_name}" if filled else f"PDF has no fillable form fields, returned unchanged: {file_name}" return ToolResult( success=True, result={"message": msg}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/pdf", ) def _file_pdf_update(self, args: Dict[str, Any]) -> ToolResult: from pypdf import PdfReader, PdfWriter src_b64 = args.get("file_base64") operations = args.get("operations") or [] if not src_b64: return ToolResult(success=False, result=None, 
error="file_base64 is required for pdf_update") if not isinstance(operations, list) or not operations: return ToolResult(success=False, result=None, error="operations must be non-empty array") file_name = self._sanitize_file_name(args.get("file_name"), "updated.pdf", force_ext=".pdf") reader = PdfReader(BytesIO(self._bytes_from_b64(src_b64))) pages = [reader.pages[i] for i in range(len(reader.pages))] total = len(pages) if total == 0: return ToolResult(success=False, result=None, error="Input PDF has no pages") def parse_pages_list(raw: Any, allow_empty: bool = False) -> Optional[List[int]]: if raw is None: return [] if allow_empty else None if not isinstance(raw, list) or (not raw and not allow_empty): return None out: List[int] = [] for val in raw: try: idx = int(val) except Exception: return None if idx < 1 or idx > len(pages): return None out.append(idx) return out for op in operations: if not isinstance(op, dict): return ToolResult(success=False, result=None, error="Each operation must be object") op_type = str(op.get("type") or "").strip().lower() if op_type == "rotate_pages": angle = int(op.get("angle") or 90) if angle not in {90, 180, 270}: return ToolResult(success=False, result=None, error="rotate_pages angle must be 90|180|270") target = parse_pages_list(op.get("pages"), allow_empty=True) if target is None: return ToolResult(success=False, result=None, error="Invalid pages for rotate_pages") targets = target or list(range(1, len(pages) + 1)) for p in targets: page = pages[p - 1] try: pages[p - 1] = page.rotate(angle) except Exception: if hasattr(page, "rotate_clockwise"): page.rotate_clockwise(angle) pages[p - 1] = page else: return ToolResult(success=False, result=None, error="PDF rotation not supported by library") elif op_type == "remove_pages": target = parse_pages_list(op.get("pages")) if not target: return ToolResult(success=False, result=None, error="remove_pages requires pages") drop = set(target) pages = [p for idx, p in enumerate(pages, 
def _file_djvu_to_pdf(self, args: Dict[str, Any]) -> ToolResult:
    """Convert a base64-encoded DJVU document to PDF via the `ddjvu` binary.

    Expects ``args`` with:
      - file_base64: source DJVU (required)
      - file_name: optional output name (forced to .pdf)
      - timeout_sec: conversion timeout, clamped to 5..300 seconds

    Returns the converted PDF as base64, or a descriptive error when the
    binary is missing, times out, fails, or produces an empty file.

    Fix: the output file is now read through a context manager — the
    original `open(out_pdf, "rb").read()` never closed the descriptor.
    """
    src_b64 = args.get("file_base64")
    if not src_b64:
        return ToolResult(success=False, result=None, error="file_base64 is required for djvu_to_pdf")
    file_name = self._sanitize_file_name(args.get("file_name"), "converted.pdf", force_ext=".pdf")
    # Clamp the caller-supplied timeout to a sane window.
    timeout_sec = max(5, min(int(args.get("timeout_sec") or 60), 300))
    with tempfile.TemporaryDirectory(prefix="djvu2pdf_") as tmpdir:
        src = os.path.join(tmpdir, "input.djvu")
        out_pdf = os.path.join(tmpdir, "output.pdf")
        with open(src, "wb") as f:
            f.write(self._bytes_from_b64(src_b64))
        try:
            # List-form argv (shell=False): no shell-injection surface.
            proc = subprocess.run(
                ["ddjvu", "-format=pdf", src, out_pdf],
                capture_output=True,
                text=True,
                timeout=timeout_sec,
                check=False,
            )
        except FileNotFoundError:
            return ToolResult(success=False, result=None, error="ddjvu not found in runtime image")
        except subprocess.TimeoutExpired:
            return ToolResult(success=False, result=None, error=f"DJVU conversion timed out ({timeout_sec}s)")
        if proc.returncode != 0 or not os.path.exists(out_pdf):
            stderr = (proc.stderr or "").strip()
            return ToolResult(success=False, result=None, error=f"ddjvu failed: {stderr or 'unknown error'}")
        # FIX: context manager instead of a leaked bare open().read().
        with open(out_pdf, "rb") as fh:
            data = fh.read()
        if not data:
            return ToolResult(success=False, result=None, error="ddjvu produced empty PDF")
        return ToolResult(
            success=True,
            result={"message": f"DJVU converted to PDF: {file_name}"},
            file_base64=self._b64_from_bytes(data),
            file_name=file_name,
            file_mime="application/pdf",
        )
async def _memory_search(self, args: Dict, agent_id: str = None, chat_id: str = None, user_id: str = None) -> ToolResult:
    """Search Qdrant vector memory through the Router's memory_retrieval pipeline.

    PRIORITY 1 source: consulted before any web lookup. Always returns
    success=True so the agent can gracefully fall through to other tools.
    """
    query = args.get("query")
    try:
        # The Router's pipeline already holds the Qdrant connection — reuse it.
        from memory_retrieval import memory_retrieval

        if not (memory_retrieval and memory_retrieval.qdrant_client):
            return ToolResult(success=True, result="🧠 Пам'ять недоступна, спробую web_search.")
        hits = await memory_retrieval.search_memories(
            query=query,
            agent_id=agent_id or "helion",
            chat_id=chat_id,
            user_id=user_id,
            limit=5
        )
        # Render non-empty hits as short bullet lines with type and relevance.
        entries = [
            f"• [{hit.get('type', 'memory')}] {hit.get('text', '')[:200]}... (релевантність: {hit.get('score', 0):.2f})"
            for hit in (hits or [])
            if hit.get("text", "")
        ]
        if entries:
            return ToolResult(success=True, result="🧠 Знайдено в пам'яті:\n" + "\n".join(entries))
        return ToolResult(success=True, result="🧠 В моїй пам'яті немає інформації про це.")
    except Exception as e:
        logger.warning(f"Memory search error: {e}")
        return ToolResult(success=True, result="🧠 Не вдалося перевірити пам'ять. Спробую інші джерела.")
async def _web_search(self, args: Dict) -> ToolResult:
    """Execute web search - PRIORITY 2 (use after memory_search).

    Results from the Swapper search endpoint are re-ranked locally:
    +2 per query-term overlap, +2 for trusted domains, -2 for low-signal
    social domains, -1 for missing snippet or a too-short title.

    Fix: ``max_results`` is now coerced to int and clamped to the 1-10
    range declared in the tool schema — the model occasionally sends 0,
    negatives, non-ints or huge values, which broke the final slice or
    bloated the response.
    """
    query = args.get("query")
    max_results = args.get("max_results", 5)
    if not query:
        return ToolResult(success=False, result=None, error="query is required")
    # Schema says 1-10; fall back to the default on garbage input.
    try:
        max_results = int(max_results)
    except (TypeError, ValueError):
        max_results = 5
    max_results = max(1, min(max_results, 10))
    try:
        resp = await self.http_client.post(
            f"{self.swapper_url}/web/search",
            json={"query": query, "max_results": max_results}
        )
        if resp.status_code != 200:
            return ToolResult(success=False, result=None, error=f"Search failed: {resp.status_code}")
        data = resp.json()
        results = data.get("results", []) or []
        # Only terms longer than 2 chars count toward the overlap score.
        query_terms = {t for t in str(query).lower().replace("/", " ").replace("-", " ").split() if len(t) > 2}
        trusted_domains = {
            "wikipedia.org", "wikidata.org", "europa.eu", "fao.org", "who.int",
            "worldbank.org", "oecd.org", "un.org", "gov.ua", "rada.gov.ua",
            "kmu.gov.ua", "minagro.gov.ua", "agroportal.ua", "latifundist.com",
        }
        low_signal_domains = {
            "pinterest.com", "tiktok.com", "instagram.com", "facebook.com",
            "youtube.com", "yandex.", "vk.com",
        }

        def _extract_domain(url: str) -> str:
            # Cheap scheme/www stripping — avoids urllib on a hot path.
            if not url:
                return ""
            u = url.lower().strip()
            u = u.replace("https://", "").replace("http://", "")
            u = u.split("/")[0]
            if u.startswith("www."):
                u = u[4:]
            return u

        def _overlap_score(title: str, snippet: str, url: str) -> int:
            text = " ".join([title or "", snippet or "", url or ""]).lower()
            score = 0
            for t in query_terms:
                if t in text:
                    score += 2
            return score

        ranked: List[Any] = []
        for r in results:
            title = str(r.get("title", "") or "")
            snippet = str(r.get("snippet", "") or "")
            url = str(r.get("url", "") or "")
            domain = _extract_domain(url)
            score = _overlap_score(title, snippet, url)
            if any(x in domain for x in low_signal_domains):
                score -= 2
            if any(domain == d or domain.endswith("." + d) for d in trusted_domains):
                score += 2
            if not snippet:
                score -= 1
            if len(title.strip()) < 5:
                score -= 1
            ranked.append((score, r))
        ranked.sort(key=lambda x: x[0], reverse=True)
        selected = [item for _, item in ranked[:max_results]]
        logger.info(f"🔎 web_search rerank: raw={len(results)} ranked={len(selected)} query='{query[:120]}'")
        formatted = []
        for r in selected:
            formatted.append(
                f"- {r.get('title', 'No title')}\n {r.get('snippet', '')}\n URL: {r.get('url', '')}"
            )
        return ToolResult(success=True, result="\n".join(formatted) if formatted else "Нічого не знайдено")
    except Exception as e:
        return ToolResult(success=False, result=None, error=str(e))
async def _unload_ollama_models(self):
    """Unload all Ollama models to free VRAM for heavy operations like FLUX.

    Best-effort: per-model failures are logged at debug level and never
    propagate. A one-second sleep afterwards gives the GPU time to
    actually release memory before the heavy workload starts.

    Fix: removed the redundant function-local ``import asyncio`` — the
    module already imports asyncio at the top.
    """
    ollama_url = os.getenv("OLLAMA_BASE_URL", "http://172.18.0.1:11434")
    models_to_unload = ["qwen3:8b", "qwen3-vl:8b"]
    for model in models_to_unload:
        try:
            # keep_alive=0 tells Ollama to evict the model immediately.
            await self.http_client.post(
                f"{ollama_url}/api/generate",
                json={"model": model, "keep_alive": 0},
                timeout=5.0
            )
            logger.info(f"🧹 Unloaded Ollama model: {model}")
        except Exception as e:
            logger.debug(f"Could not unload {model}: {e}")
    # Give GPU time to release memory
    await asyncio.sleep(1)
async def _comfy_generate_image(self, args: Dict) -> ToolResult:
    """Generate image via Comfy Agent on NODE3 and return URL when ready.

    Submits the job with an Idempotency-Key, then polls until a terminal
    state or the timeout budget is exhausted.
    """
    prompt = args.get("prompt")
    if not prompt:
        return ToolResult(success=False, result=None, error="prompt is required")
    request_body = {
        "prompt": prompt,
        "negative_prompt": args.get("negative_prompt", "blurry, low quality, watermark"),
        "width": int(args.get("width", 512)),
        "height": int(args.get("height", 512)),
        "steps": int(args.get("steps", 28)),
    }
    if args.get("seed") is not None:
        request_body["seed"] = int(args["seed"])
    wait_budget = int(args.get("timeout_s", 180))
    dedupe_key = args.get("idempotency_key") or f"router-{uuid.uuid4().hex}"
    try:
        submit = await self.http_client.post(
            f"{self.comfy_agent_url}/generate/image",
            json=request_body,
            headers={"Idempotency-Key": dedupe_key},
            timeout=30.0,
        )
        if submit.status_code != 200:
            return ToolResult(success=False, result=None, error=f"Comfy image request failed: {submit.status_code}")
        job_id = submit.json().get("job_id")
        if not job_id:
            return ToolResult(success=False, result=None, error="Comfy image request did not return job_id")
        final = await self._poll_comfy_job(job_id, timeout_s=wait_budget)
        status = (final.get("status") or "").lower()
        if status not in {"succeeded", "finished"}:
            return ToolResult(success=False, result=None, error=final.get("error") or f"Comfy image failed (status={status})")
        result_url = final.get("result_url")
        if result_url:
            return ToolResult(success=True, result=f"✅ Зображення згенеровано: {result_url}")
        return ToolResult(success=True, result=f"✅ Зображення згенеровано. job_id={job_id}")
    except Exception as e:
        return ToolResult(success=False, result=None, error=str(e))
async def _graph_query(self, args: Dict, agent_id: str = None) -> ToolResult:
    """Query the knowledge graph via the Router's graph endpoint.

    Builds a simple CONTAINS-based Cypher lookup from the natural-language
    query, optionally narrowed to one node label.

    Security fix: the user-supplied query is escaped before being embedded
    in a Cypher string literal, and ``entity_type`` (which becomes a node
    LABEL in the query text) is validated against the tool schema's enum —
    both were previously interpolated raw, allowing Cypher injection.
    """
    query = args.get("query")
    entity_type = args.get("entity_type")
    # Escape backslashes first, then single quotes, so the value cannot
    # break out of the Cypher '...' string literal.
    safe_query = str(query or "").replace("\\", "\\\\").replace("'", "\\'")
    # Labels cannot be parameterized/escaped as literals — whitelist them.
    if entity_type not in {"User", "Topic", "Project", "Fact"}:
        entity_type = None  # fall back to the label-agnostic query
    cypher = f"""
    MATCH (n)
    WHERE toLower(n.name) CONTAINS toLower('{safe_query}')
       OR toLower(toString(n)) CONTAINS toLower('{safe_query}')
    RETURN labels(n)[0] as type, n.name as name, n.node_id as id
    LIMIT 10
    """
    if entity_type:
        cypher = f"""
        MATCH (n:{entity_type})
        WHERE toLower(n.name) CONTAINS toLower('{safe_query}')
        RETURN n.name as name, n.node_id as id
        LIMIT 10
        """
    try:
        # Execute via Router's graph endpoint
        resp = await self.http_client.post(
            "http://localhost:8000/v1/graph/query",
            json={"query": cypher}
        )
        if resp.status_code == 200:
            data = resp.json()
            return ToolResult(success=True, result=json.dumps(data.get("results", []), ensure_ascii=False))
        else:
            return ToolResult(success=False, result=None, error=f"Graph query failed: {resp.status_code}")
    except Exception as e:
        return ToolResult(success=False, result=None, error=str(e))
async def _presentation_create(self, args: Dict) -> ToolResult:
    """Create a presentation via Presentation Renderer.

    Builds a SlideSpec (title slide + one content slide per entry) and
    submits it to the renderer; returns job/artifact IDs for follow-up.
    """
    title = args.get("title", "Презентація")
    slides = args.get("slides", [])
    brand_id = args.get("brand_id", "energyunion")
    theme_version = args.get("theme_version", "v1.0.0")
    language = args.get("language", "uk")
    # First slide is always the title slide; the rest come from `slides`.
    deck = [{"type": "title", "title": title}]
    deck.extend(
        {
            "type": "content",
            "title": slide.get("title", ""),
            "body": slide.get("content", ""),
        }
        for slide in slides
    )
    slidespec = {
        "meta": {
            "title": title,
            "brand_id": brand_id,
            "theme_version": theme_version,
            "language": language
        },
        "slides": deck,
    }
    try:
        renderer_url = os.getenv("PRESENTATION_RENDERER_URL", "http://presentation-renderer:9600")
        resp = await self.http_client.post(
            f"{renderer_url}/present/render",
            json=slidespec,
            timeout=120.0
        )
        if resp.status_code != 200:
            error_text = resp.text[:200] if resp.text else "Unknown error"
            return ToolResult(success=False, result=None, error=f"Render failed ({resp.status_code}): {error_text}")
        data = resp.json()
        job_id = data.get("job_id")
        artifact_id = data.get("artifact_id")
        return ToolResult(
            success=True,
            result=f"📊 Презентацію створено!\n\n🆔 Job ID: `{job_id}`\n📦 Artifact ID: `{artifact_id}`\n\nЩоб перевірити статус: використай presentation_status\nЩоб завантажити: використай presentation_download"
        )
    except Exception as e:
        return ToolResult(success=False, result=None, error=str(e))
async def _presentation_download(self, args: Dict) -> ToolResult:
    """Get download link for presentation.

    Tries the Artifact Registry download endpoint without following
    redirects: a 302/307 Location header or a JSON body supplies the
    signed URL; otherwise the direct endpoint URL is returned.
    """
    artifact_id = args.get("artifact_id")
    file_format = args.get("format", "pptx")
    try:
        registry_url = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9700")
        endpoint = f"{registry_url}/artifacts/{artifact_id}/download?format={file_format}"
        resp = await self.http_client.get(endpoint, timeout=10.0, follow_redirects=False)
        if resp.status_code == 404:
            return ToolResult(success=False, result=None, error=f"Формат {file_format.upper()} ще не готовий. Спробуй пізніше.")
        if resp.status_code not in [200, 302, 307]:
            return ToolResult(success=False, result=None, error=f"Download failed: {resp.status_code}")
        # Signed URL lives either in the redirect Location or the JSON body.
        if resp.status_code in [302, 307]:
            download_url = resp.headers.get("Location")
        else:
            is_json = resp.headers.get("content-type", "").startswith("application/json")
            payload = resp.json() if is_json else {}
            download_url = payload.get("download_url") or payload.get("url")
        if download_url:
            return ToolResult(
                success=True,
                result=f"📥 **Посилання для завантаження ({file_format.upper()}):**\n\n{download_url}\n\n⏰ Посилання дійсне 30 хвилин."
            )
        # Direct binary response - artifact available at the endpoint itself.
        return ToolResult(
            success=True,
            result=f"✅ Файл {file_format.upper()} готовий! Завантажити можна через: {registry_url}/artifacts/{artifact_id}/download?format={file_format}"
        )
    except Exception as e:
        return ToolResult(success=False, result=None, error=str(e))
async def _tts_speak(self, args: Dict) -> ToolResult:
    """Convert text to speech using Swapper TTS - PRIORITY 6.

    Truncates input to 1000 characters and returns the audio URL when the
    service provides one.
    """
    text = args.get("text")
    language = args.get("language", "uk")
    if not text:
        return ToolResult(success=False, result=None, error="Text is required")
    try:
        # Keep requests bounded for the TTS backend.
        if len(text) > 1000:
            text = text[:1000]
        resp = await self.http_client.post(
            f"{self.swapper_url}/tts",
            json={"text": text, "language": language},
            timeout=60.0
        )
        if resp.status_code != 200:
            return ToolResult(success=False, result=None, error=f"TTS failed: {resp.status_code}")
        data = resp.json()
        audio_url = data.get("audio_url") or data.get("url")
        if audio_url:
            return ToolResult(success=True, result=f"Аудіо: {audio_url}")
        return ToolResult(success=True, result="TTS completed")
    except Exception as e:
        logger.error(f"TTS failed: {e}")
        return ToolResult(success=False, result=None, error=str(e))
async def _market_data(self, args: Dict) -> ToolResult:
    """Query real-time market data from Market Data Service and SenpAI MD Consumer.

    args:
      - symbol: instrument symbol (default "BTCUSDT", upper-cased)
      - query_type: "price" | "features" | "all" (default "all")

    Each upstream call is best-effort: failures land in the result payload
    as "price_error" / "features_error" rather than failing the whole tool.
    Returns a JSON string with whichever sections succeeded.
    """
    symbol = str(args.get("symbol", "BTCUSDT")).upper()
    query_type = str(args.get("query_type", "all")).lower()
    md_url = os.getenv("MARKET_DATA_URL", "http://dagi-market-data-node1:8891")
    consumer_url = os.getenv("SENPAI_CONSUMER_URL", "http://dagi-senpai-md-consumer-node1:8892")
    results: Dict[str, Any] = {}
    try:
        async with httpx.AsyncClient(timeout=8.0) as client:
            if query_type in ("price", "all"):
                try:
                    resp = await client.get(f"{md_url}/latest", params={"symbol": symbol})
                    if resp.status_code == 200:
                        data = resp.json()
                        trade = data.get("latest_trade", {}) or {}
                        quote = data.get("latest_quote", {}) or {}
                        bid = quote.get("bid")
                        ask = quote.get("ask")
                        spread = None
                        # Spread only when both sides are numeric.
                        if isinstance(bid, (int, float)) and isinstance(ask, (int, float)):
                            spread = round(ask - bid, 6)
                        results["price"] = {
                            "symbol": symbol,
                            "last_price": trade.get("price"),
                            "size": trade.get("size"),
                            "side": trade.get("side"),
                            "bid": bid,
                            "ask": ask,
                            "spread": spread,
                            "provider": trade.get("provider"),
                            "timestamp": trade.get("ts_recv"),
                        }
                    else:
                        results["price_error"] = f"market-data status={resp.status_code}"
                except Exception as e:
                    results["price_error"] = str(e)
            if query_type in ("features", "all"):
                try:
                    resp = await client.get(f"{consumer_url}/features/latest", params={"symbol": symbol})
                    if resp.status_code == 200:
                        data = resp.json()
                        feats = data.get("features", {}) or {}
                        # "or 0" guards against None values before float().
                        results["features"] = {
                            "symbol": symbol,
                            "mid_price": feats.get("mid"),
                            "spread_bps": round(float(feats.get("spread_bps", 0) or 0), 2),
                            "vwap_10s": round(float(feats.get("trade_vwap_10s", 0) or 0), 2),
                            "vwap_60s": round(float(feats.get("trade_vwap_60s", 0) or 0), 2),
                            "trade_count_10s": int(feats.get("trade_count_10s", 0) or 0),
                            "trade_volume_10s": round(float(feats.get("trade_volume_10s", 0) or 0), 4),
                            "return_10s_pct": round(float(feats.get("return_10s", 0) or 0) * 100, 4),
                            "realized_vol_60s_pct": round(float(feats.get("realized_vol_60s", 0) or 0) * 100, 6),
                            "latency_p50_ms": round(float(feats.get("latency_ms_p50", 0) or 0), 1),
                            "latency_p95_ms": round(float(feats.get("latency_ms_p95", 0) or 0), 1),
                        }
                    else:
                        results["features_error"] = f"senpai-consumer status={resp.status_code}"
                except Exception as e:
                    results["features_error"] = str(e)
        if not results:
            return ToolResult(success=False, result=None, error=f"No market data for {symbol}")
        return ToolResult(success=True, result=json.dumps(results, ensure_ascii=False))
    except Exception as e:
        logger.error(f"Market data tool error: {e}")
        return ToolResult(success=False, result=None, error=str(e))
ToolResult(success=False, result=None, error=f"No market data for {symbol}") return ToolResult(success=True, result=json.dumps(results, ensure_ascii=False)) except Exception as e: logger.error(f"Market data tool error: {e}") return ToolResult(success=False, result=None, error=str(e)) async def close(self): await self.http_client.aclose() def _strip_think_tags(text: str) -> str: """Remove ... tags from DeepSeek responses.""" import re text = re.sub(r'.*?', '', text, flags=re.DOTALL) text = re.sub(r'.*$', '', text, flags=re.DOTALL) # unclosed tag return text.strip() def format_tool_calls_for_response(tool_results: List[Dict], fallback_mode: str = "normal") -> str: """ Format tool results in human-friendly way - NOT raw data! Args: tool_results: List of tool execution results fallback_mode: "normal" | "dsml_detected" | "empty_response" """ # Special handling for DSML detection - LLM tried to use tools but got confused # If we have successful tool results, show them instead of generic fallback if fallback_mode == "dsml_detected": # Check if any tool succeeded with a useful result if tool_results: for tr in tool_results: if tr.get("success") and tr.get("result"): # Avoid dumping raw retrieval/search payloads to the user. # These often look like "memory dumps" and are perceived as incorrect answers. tool_name = (tr.get("name") or "").strip() if tool_name in {"memory_search", "web_search", "web_extract", "web_read"}: continue result = str(tr.get("result", "")) if result and len(result) > 10 and "error" not in result.lower(): # We have a useful tool result - use it! if len(result) > 600: return result[:600] + "..." return result # No useful tool results - give presence acknowledgment return "Вибач, відповідь згенерувалась некоректно. Спробуй ще раз (коротше/конкретніше) або повтори питання одним реченням." if not tool_results: if fallback_mode == "empty_response": return "Вибач, щось пішло не так. Спробуй ще раз." return "Вибач, не вдалося виконати запит." 
def format_tool_calls_for_response(tool_results: List[Dict], fallback_mode: str = "normal") -> str:
    """
    Format tool results in human-friendly way - NOT raw data!

    Args:
        tool_results: List of tool execution results (dicts with "name",
            "success", "result", "error" keys)
        fallback_mode: "normal" | "dsml_detected" | "empty_response"

    Returns a user-facing string. Branch priority: DSML fallback, empty
    input, all-failed, then per-tool formatting (image/video, web search,
    memory, graph), and finally a generic "show whatever we have" fallback.
    """
    # Special handling for DSML detection - LLM tried to use tools but got confused
    # If we have successful tool results, show them instead of generic fallback
    if fallback_mode == "dsml_detected":
        # Check if any tool succeeded with a useful result
        if tool_results:
            for tr in tool_results:
                if tr.get("success") and tr.get("result"):
                    # Avoid dumping raw retrieval/search payloads to the user.
                    # These often look like "memory dumps" and are perceived as incorrect answers.
                    tool_name = (tr.get("name") or "").strip()
                    if tool_name in {"memory_search", "web_search", "web_extract", "web_read"}:
                        continue
                    result = str(tr.get("result", ""))
                    if result and len(result) > 10 and "error" not in result.lower():
                        # We have a useful tool result - use it!
                        if len(result) > 600:
                            return result[:600] + "..."
                        return result
        # No useful tool results - give presence acknowledgment
        return "Вибач, відповідь згенерувалась некоректно. Спробуй ще раз (коротше/конкретніше) або повтори питання одним реченням."
    if not tool_results:
        if fallback_mode == "empty_response":
            return "Вибач, щось пішло не так. Спробуй ще раз."
        return "Вибач, не вдалося виконати запит."
    # Check what tools were used
    tool_names = [tr.get("name", "") for tr in tool_results]
    # Check if ANY tool succeeded
    any_success = any(tr.get("success") for tr in tool_results)
    if not any_success:
        # All tools failed - give helpful message
        errors = [tr.get("error", "unknown") for tr in tool_results if tr.get("error")]
        if errors:
            logger.warning(f"All tools failed: {errors}")
        return "Вибач, виникла технічна проблема. Спробуй ще раз або переформулюй питання."
    # Image generation - special handling
    if "image_generate" in tool_names:
        for tr in tool_results:
            if tr.get("name") == "image_generate" and tr.get("success"):
                return "✅ Зображення згенеровано!"
    if "comfy_generate_image" in tool_names:
        for tr in tool_results:
            if tr.get("name") == "comfy_generate_image" and tr.get("success"):
                return str(tr.get("result", "✅ Зображення згенеровано через ComfyUI"))
    if "comfy_generate_video" in tool_names:
        for tr in tool_results:
            if tr.get("name") == "comfy_generate_video" and tr.get("success"):
                return str(tr.get("result", "✅ Відео згенеровано через ComfyUI"))
    # Web search - show actual results to user
    if "web_search" in tool_names:
        for tr in tool_results:
            if tr.get("name") == "web_search":
                if tr.get("success"):
                    result = tr.get("result", "")
                    if not result:
                        return "🔍 Не знайшов релевантної інформації в інтернеті."
                    # Parse and format results for user.
                    # Input format (from _web_search): "- <title>\n <snippet>\n URL: <url>"
                    lines = result.strip().split("\n")
                    formatted = ["🔍 **Результати пошуку:**\n"]
                    current_title = ""
                    current_url = ""
                    current_snippet = ""
                    count = 0
                    for line in lines:
                        line = line.strip()
                        if line.startswith("- ") and not line.startswith("- URL:"):
                            # A new "- title" line flushes the previously
                            # accumulated result (count is 1-based by then).
                            if current_title and count < 3:  # Show max 3 results
                                formatted.append(f"**{count}. {current_title}**")
                                if current_snippet:
                                    formatted.append(f" {current_snippet[:150]}...")
                                if current_url:
                                    formatted.append(f" 🔗 {current_url}\n")
                            current_title = line[2:].strip()
                            current_snippet = ""
                            current_url = ""
                            count += 1
                        elif "URL:" in line:
                            current_url = line.split("URL:")[-1].strip()
                        elif line and not line.startswith("-"):
                            current_snippet = line
                    # Add last result
                    if current_title and count <= 3:
                        formatted.append(f"**{count}. {current_title}**")
                        if current_snippet:
                            formatted.append(f" {current_snippet[:150]}...")
                        if current_url:
                            formatted.append(f" 🔗 {current_url}")
                    if len(formatted) > 1:
                        return "\n".join(formatted)
                    else:
                        return "🔍 Не знайшов релевантної інформації в інтернеті."
                else:
                    return "🔍 Пошук в інтернеті не вдався. Спробуй ще раз."
    # Memory search
    if "memory_search" in tool_names:
        for tr in tool_results:
            if tr.get("name") == "memory_search" and tr.get("success"):
                result = tr.get("result", "")
                if "немає інформації" in result.lower() or not result:
                    return "🧠 В моїй пам'яті немає інформації про це."
                # Truncate if too long
                if len(result) > 500:
                    return result[:500] + "..."
                return result
    # Graph query
    if "graph_query" in tool_names:
        for tr in tool_results:
            if tr.get("name") == "graph_query" and tr.get("success"):
                result = tr.get("result", "")
                if not result or "не знайдено" in result.lower():
                    return "📊 В базі знань немає інформації про це."
                if len(result) > 500:
                    return result[:500] + "..."
                return result
    # Default fallback - check if we have any result to show
    for tr in tool_results:
        if tr.get("success") and tr.get("result"):
            result = str(tr.get("result", ""))
            if result and len(result) > 10:
                # We have something, show it
                if len(result) > 400:
                    return result[:400] + "..."
                return result
    # Really nothing useful - be honest
    return "Я обробив твій запит, але не знайшов корисної інформації. Можеш уточнити питання?"