""" Tool Manager for Helion Agent Implements OpenAI-compatible function calling for DeepSeek, Mistral, Grok """ import os import asyncio import uuid from agent_tools_config import get_agent_tools, is_tool_allowed import json import logging import hashlib import base64 import csv import httpx from typing import Dict, List, Any, Optional from dataclasses import dataclass from io import BytesIO, StringIO from pathlib import PurePath from zipfile import ZIP_DEFLATED, ZipFile logger = logging.getLogger(__name__) # Tool definitions in OpenAI function calling format # ORDER MATTERS: Memory/Graph tools first, then web search as fallback TOOL_DEFINITIONS = [ # PRIORITY 1: Internal knowledge sources (use FIRST) { "type": "function", "function": { "name": "memory_search", "description": "🔍 ПЕРШИЙ КРОК для пошуку! Шукає в моїй пам'яті: збережені факти, документи, розмови. ЗАВЖДИ використовуй спочатку перед web_search!", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Що шукати в пам'яті" } }, "required": ["query"] } } }, { "type": "function", "function": { "name": "graph_query", "description": "🔍 Пошук в Knowledge Graph - зв'язки між проєктами, людьми, темами Energy Union. Використовуй для питань про проєкти, партнерів, технології.", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Що шукати (назва проєкту, людини, теми)" }, "entity_type": { "type": "string", "enum": ["User", "Topic", "Project", "Fact"], "description": "Тип сутності для пошуку" } }, "required": ["query"] } } }, # PRIORITY 2: Web search (use ONLY if memory/graph don't have info) { "type": "function", "function": { "name": "web_search", "description": "🌐 Пошук в інтернеті. 
Використовуй ТІЛЬКИ якщо memory_search і graph_query не знайшли потрібної інформації!", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Пошуковий запит" }, "max_results": { "type": "integer", "description": "Максимальна кількість результатів (1-10)", "default": 5 } }, "required": ["query"] } } }, { "type": "function", "function": { "name": "web_extract", "description": "Витягнути текстовий контент з веб-сторінки за URL", "parameters": { "type": "object", "properties": { "url": { "type": "string", "description": "URL сторінки для читання" } }, "required": ["url"] } } }, # PRIORITY 3: Generation tools { "type": "function", "function": { "name": "image_generate", "description": "🎨 Згенерувати зображення за текстовим описом (FLUX)", "parameters": { "type": "object", "properties": { "prompt": { "type": "string", "description": "Опис зображення для генерації (англійською краще)" }, "width": { "type": "integer", "description": "Ширина зображення", "default": 512 }, "height": { "type": "integer", "description": "Висота зображення", "default": 512 } }, "required": ["prompt"] } } }, { "type": "function", "function": { "name": "comfy_generate_image", "description": "🖼️ Згенерувати зображення через ComfyUI (NODE3, Stable Diffusion). 
Для високої якості та детальних зображень.", "parameters": { "type": "object", "properties": { "prompt": { "type": "string", "description": "Детальний опис зображення (англійською)" }, "negative_prompt": { "type": "string", "description": "Що НЕ включати в зображення", "default": "blurry, low quality, watermark" }, "width": { "type": "integer", "description": "Ширина (512, 768, 1024)", "default": 512 }, "height": { "type": "integer", "description": "Висота (512, 768, 1024)", "default": 512 }, "steps": { "type": "integer", "description": "Кількість кроків генерації (20-50)", "default": 28 } }, "required": ["prompt"] } } }, { "type": "function", "function": { "name": "comfy_generate_video", "description": "🎬 Згенерувати відео через ComfyUI (NODE3, LTX-2). Text-to-video для коротких кліпів.", "parameters": { "type": "object", "properties": { "prompt": { "type": "string", "description": "Детальний опис відео (англійською)" }, "seconds": { "type": "integer", "description": "Тривалість в секундах (2-8)", "default": 4 }, "fps": { "type": "integer", "description": "Кадри в секунду (24-30)", "default": 24 }, "steps": { "type": "integer", "description": "Кількість кроків генерації (20-40)", "default": 30 } }, "required": ["prompt"] } } }, { "type": "function", "function": { "name": "remember_fact", "description": "Запам'ятати важливий факт про користувача або тему", "parameters": { "type": "object", "properties": { "fact": { "type": "string", "description": "Факт для запам'ятовування" }, "about": { "type": "string", "description": "Про кого/що цей факт (username або тема)" }, "category": { "type": "string", "enum": ["personal", "technical", "preference", "project"], "description": "Категорія факту" } }, "required": ["fact"] } } }, # PRIORITY 4: Document/Presentation tools { "type": "function", "function": { "name": "presentation_create", "description": "📊 Створити презентацію PowerPoint. 
Використовуй коли користувач просить 'створи презентацію', 'зроби презентацію', 'підготуй слайди'.", "parameters": { "type": "object", "properties": { "title": { "type": "string", "description": "Назва презентації" }, "slides": { "type": "array", "items": { "type": "object", "properties": { "title": {"type": "string", "description": "Заголовок слайду"}, "content": {"type": "string", "description": "Контент слайду (markdown)"} } }, "description": "Масив слайдів: [{title, content}]" }, "brand_id": { "type": "string", "description": "ID бренду для стилю (energyunion, greenfood, nutra)", "default": "energyunion" }, "theme_version": { "type": "string", "description": "Версія теми", "default": "v1.0.0" }, "language": { "type": "string", "enum": ["uk", "en", "ru"], "description": "Мова презентації", "default": "uk" } }, "required": ["title", "slides"] } } }, { "type": "function", "function": { "name": "presentation_status", "description": "📋 Перевірити статус створення презентації за job_id", "parameters": { "type": "object", "properties": { "job_id": { "type": "string", "description": "ID завдання рендерингу" } }, "required": ["job_id"] } } }, { "type": "function", "function": { "name": "presentation_download", "description": "📥 Отримати посилання на готову презентацію за artifact_id", "parameters": { "type": "object", "properties": { "artifact_id": { "type": "string", "description": "ID артефакту презентації" }, "format": { "type": "string", "enum": ["pptx", "pdf"], "description": "Формат файлу", "default": "pptx" } }, "required": ["artifact_id"] } } }, { "type": "function", "function": { "name": "file_tool", "description": "📁 Універсальний file tool для створення та оновлення CSV/JSON/YAML/ZIP і інших форматів через action-based API.", "parameters": { "type": "object", "properties": { "action": { "type": "string", "enum": [ "excel_create", "excel_update", "docx_create", "docx_update", "csv_create", "csv_update", "pdf_fill", "pdf_merge", "pdf_split", "json_export", 
"yaml_export", "zip_bundle", "text_create", "text_update", "markdown_create", "markdown_update", "xml_export", "html_export" ], "description": "Дія file tool" }, "file_name": { "type": "string", "description": "Назва файлу-результату" }, "file_base64": { "type": "string", "description": "Вхідний файл у base64 для update-операцій" }, "content": { "description": "Контент для json/yaml export" }, "headers": { "type": "array", "items": {"type": "string"}, "description": "Заголовки для CSV" }, "rows": { "type": "array", "description": "Рядки для CSV" }, "entries": { "type": "array", "description": "Елементи для zip_bundle" }, "operation": { "type": "string", "enum": ["append", "replace"], "description": "Режим csv_update" } }, "required": ["action"] } } }, # PRIORITY 5: Web Scraping tools { "type": "function", "function": { "name": "crawl4ai_scrape", "description": "🕷️ Глибокий скрейпінг веб-сторінки через Crawl4AI. Витягує повний контент, структуровані дані, медіа. Використовуй для детального аналізу сайтів.", "parameters": { "type": "object", "properties": { "url": { "type": "string", "description": "URL сторінки для скрейпінгу" }, "extract_links": { "type": "boolean", "description": "Витягувати посилання зі сторінки", "default": True }, "extract_images": { "type": "boolean", "description": "Витягувати зображення", "default": False } }, "required": ["url"] } } }, # PRIORITY 6: TTS tools { "type": "function", "function": { "name": "tts_speak", "description": "🔊 Перетворити текст на аудіо (Text-to-Speech). Повертає аудіо файл. 
Використовуй коли користувач просить озвучити текст.", "parameters": { "type": "object", "properties": { "text": { "type": "string", "description": "Текст для озвучення" }, "language": { "type": "string", "enum": ["uk", "en", "ru"], "description": "Мова озвучення", "default": "uk" } }, "required": ["text"] } } }, # PRIORITY 7: Market Data tools (SenpAI) { "type": "function", "function": { "name": "market_data", "description": "📊 Отримати real-time ринкові дані: поточну ціну, котирування, обсяги, аналітичні фічі (VWAP, spread, volatility, trade signals). Доступні символи: BTCUSDT, ETHUSDT.", "parameters": { "type": "object", "properties": { "symbol": { "type": "string", "description": "Торговий символ (наприклад BTCUSDT, ETHUSDT)" }, "query_type": { "type": "string", "enum": ["price", "features", "all"], "description": "Тип запиту: price (ціна + котирування), features (аналітичні фічі), all (все разом)", "default": "all" } }, "required": ["symbol"] } } } ] @dataclass class ToolResult: """Result of tool execution""" success: bool result: Any error: Optional[str] = None image_base64: Optional[str] = None # For image generation results file_base64: Optional[str] = None file_name: Optional[str] = None file_mime: Optional[str] = None class ToolManager: """Manages tool execution for the agent""" def __init__(self, config: Dict[str, Any]): self.config = config self.http_client = httpx.AsyncClient(timeout=60.0) self.swapper_url = os.getenv("SWAPPER_URL", "http://swapper-service:8890") self.comfy_agent_url = os.getenv("COMFY_AGENT_URL", "http://212.8.58.133:8880") self.tools_config = self._load_tools_config() def _load_tools_config(self) -> Dict[str, Dict]: """Load tool endpoints from config""" tools = {} agent_config = self.config.get("agents", {}).get("helion", {}) for tool in agent_config.get("tools", []): if "endpoint" in tool: tools[tool["id"]] = { "endpoint": tool["endpoint"], "method": tool.get("method", "POST") } return tools def get_tool_definitions(self, agent_id: 
str = None) -> List[Dict]: """Get tool definitions for function calling, filtered by agent permissions""" if not agent_id: return TOOL_DEFINITIONS # Get allowed tools for this agent allowed_tools = get_agent_tools(agent_id) # Filter tool definitions filtered = [] for tool_def in TOOL_DEFINITIONS: tool_name = tool_def.get("function", {}).get("name") if tool_name in allowed_tools: filtered.append(tool_def) tool_names = [t.get("function", {}).get("name") for t in filtered] logger.debug(f"Agent {agent_id} has {len(filtered)} tools: {tool_names}") return filtered async def execute_tool( self, tool_name: str, arguments: Dict[str, Any], agent_id: str = None, chat_id: str = None, user_id: str = None, ) -> ToolResult: """Execute a tool and return result. Optionally checks agent permissions.""" logger.info(f"🔧 Executing tool: {tool_name} for agent={agent_id} with args: {arguments}") # Check agent permission if agent_id provided if agent_id and not is_tool_allowed(agent_id, tool_name): logger.warning(f"⚠️ Tool {tool_name} not allowed for agent {agent_id}") return ToolResult(success=False, result=None, error=f"Tool {tool_name} not available for this agent") try: # Priority 1: Memory/Knowledge tools if tool_name == "memory_search": return await self._memory_search(arguments, agent_id=agent_id, chat_id=chat_id, user_id=user_id) elif tool_name == "graph_query": return await self._graph_query(arguments, agent_id=agent_id) # Priority 2: Web tools elif tool_name == "web_search": return await self._web_search(arguments) elif tool_name == "web_extract": return await self._web_extract(arguments) elif tool_name == "image_generate": return await self._image_generate(arguments) elif tool_name == "comfy_generate_image": return await self._comfy_generate_image(arguments) elif tool_name == "comfy_generate_video": return await self._comfy_generate_video(arguments) elif tool_name == "remember_fact": return await self._remember_fact(arguments, agent_id=agent_id, chat_id=chat_id, 
user_id=user_id) # Priority 4: Presentation tools elif tool_name == "presentation_create": return await self._presentation_create(arguments) elif tool_name == "presentation_status": return await self._presentation_status(arguments) elif tool_name == "presentation_download": return await self._presentation_download(arguments) # Priority 5: Web scraping tools elif tool_name == "crawl4ai_scrape": return await self._crawl4ai_scrape(arguments) # Priority 6: TTS tools elif tool_name == "tts_speak": return await self._tts_speak(arguments) # Priority 6: File artifacts elif tool_name == "file_tool": return await self._file_tool(arguments) # Priority 7: Market Data (SenpAI) elif tool_name == "market_data": return await self._market_data(arguments) else: return ToolResult(success=False, result=None, error=f"Unknown tool: {tool_name}") except Exception as e: logger.error(f"Tool execution failed: {e}") return ToolResult(success=False, result=None, error=str(e)) @staticmethod def _sanitize_file_name(name: Optional[str], default_name: str, force_ext: Optional[str] = None) -> str: raw = (name or default_name).strip() or default_name base = PurePath(raw).name if not base: base = default_name if force_ext and not base.lower().endswith(force_ext): base = f"{base}{force_ext}" return base @staticmethod def _b64_from_bytes(data: bytes) -> str: return base64.b64encode(data).decode("utf-8") @staticmethod def _bytes_from_b64(value: str) -> bytes: return base64.b64decode(value) @staticmethod def _normalize_rows(rows: Any, headers: Optional[List[str]] = None) -> List[List[Any]]: if not isinstance(rows, list): return [] out: List[List[Any]] = [] for row in rows: if isinstance(row, list): out.append(row) elif isinstance(row, dict): keys = headers or list(row.keys()) out.append([row.get(k, "") for k in keys]) else: out.append([row]) return out @staticmethod def _append_sheet_data(ws: Any, headers: List[str], rows: List[List[Any]]) -> None: if headers: ws.append(headers) for row in rows: 
class ToolManager:
    # ToolManager (continued): file_tool action router plus CSV/JSON artifact
    # builders.

    async def _file_tool(self, args: Dict[str, Any]) -> ToolResult:
        """Route a file_tool call to the handler for its `action`.

        Handlers are synchronous and return a ToolResult; unknown or missing
        actions produce a failed result rather than raising.
        """
        action = str((args or {}).get("action") or "").strip().lower()
        if not action:
            return ToolResult(success=False, result=None, error="Missing action")

        dispatch = {
            "excel_create": self._file_excel_create,
            "excel_update": self._file_excel_update,
            "csv_create": self._file_csv_create,
            "csv_update": self._file_csv_update,
            "text_create": self._file_text_create,
            "text_update": self._file_text_update,
            "markdown_create": self._file_markdown_create,
            "markdown_update": self._file_markdown_update,
            "xml_export": self._file_xml_export,
            "html_export": self._file_html_export,
            "json_export": self._file_json_export,
            "yaml_export": self._file_yaml_export,
            "zip_bundle": self._file_zip_bundle,
            "docx_create": self._file_docx_create,
            "docx_update": self._file_docx_update,
            "pdf_merge": self._file_pdf_merge,
            "pdf_split": self._file_pdf_split,
            "pdf_fill": self._file_pdf_fill,
        }
        handler = dispatch.get(action)
        if handler is None:
            return ToolResult(success=False, result=None, error=f"Action not implemented yet: {action}")
        return handler(args)

    def _resolve_table(self, headers_raw: Any, rows_raw: Any):
        """Resolve (headers, rows) from raw input, inferring headers from the
        first dict row when none were supplied. Returns (headers, rows)."""
        headers = headers_raw or []
        rows = self._normalize_rows(rows_raw, headers=headers if headers else None)
        if rows and not headers and isinstance(rows_raw[0], dict):
            headers = list(rows_raw[0].keys())
            rows = self._normalize_rows(rows_raw, headers=headers)
        return headers, rows

    def _file_csv_create(self, args: Dict[str, Any]) -> ToolResult:
        """Create a CSV artifact from `headers` and `rows`."""
        file_name = self._sanitize_file_name(args.get("file_name"), "export.csv", force_ext=".csv")
        headers, rows = self._resolve_table(args.get("headers"), args.get("rows") or [])

        sio = StringIO(newline="")
        writer = csv.writer(sio)
        if headers:
            writer.writerow(headers)
        writer.writerows(rows)
        data = sio.getvalue().encode("utf-8")

        return ToolResult(
            success=True,
            result={"message": f"CSV created: {file_name}"},
            file_base64=self._b64_from_bytes(data),
            file_name=file_name,
            file_mime="text/csv",
        )

    def _file_csv_update(self, args: Dict[str, Any]) -> ToolResult:
        """Append to or replace an existing base64 CSV (`operation`: append|replace)."""
        src_b64 = args.get("file_base64")
        if not src_b64:
            return ToolResult(success=False, result=None, error="file_base64 is required for csv_update")

        file_name = self._sanitize_file_name(args.get("file_name"), "updated.csv", force_ext=".csv")
        operation = str(args.get("operation") or "append").strip().lower()
        if operation not in {"append", "replace"}:
            return ToolResult(success=False, result=None, error="operation must be append|replace")

        headers, rows = self._resolve_table(args.get("headers"), args.get("rows") or [])

        existing_rows: List[List[str]] = []
        text = self._bytes_from_b64(src_b64).decode("utf-8")
        if text.strip():
            existing_rows = [list(r) for r in csv.reader(StringIO(text))]

        out_rows: List[List[Any]] = []
        if operation == "replace":
            if headers:
                out_rows.append(headers)
            out_rows.extend(rows)
        else:
            # append mode: keep the existing content; a header row is only
            # written when the source file was effectively empty.
            if existing_rows:
                out_rows.extend(existing_rows)
            elif headers:
                out_rows.append(headers)
            out_rows.extend(rows)

        sio = StringIO(newline="")
        csv.writer(sio).writerows(out_rows)
        data = sio.getvalue().encode("utf-8")

        return ToolResult(
            success=True,
            result={"message": f"CSV updated: {file_name}"},
            file_base64=self._b64_from_bytes(data),
            file_name=file_name,
            file_mime="text/csv",
        )

    def _file_json_export(self, args: Dict[str, Any]) -> ToolResult:
        """Serialize `content` to a JSON artifact (optional `indent`, default 2)."""
        file_name = self._sanitize_file_name(args.get("file_name"), "export.json", force_ext=".json")
        content = args.get("content")
        # Respect an explicit indent of 0; only fall back to 2 when absent/blank.
        indent_arg = args.get("indent")
        indent = 2 if indent_arg in (None, "") else int(indent_arg)
        payload = json.dumps(content, indent=indent, ensure_ascii=False).encode("utf-8")
        return ToolResult(
            success=True,
            result={"message": f"JSON exported: {file_name}"},
            file_base64=self._b64_from_bytes(payload),
            file_name=file_name,
            file_mime="application/json",
        )
json.dumps(content, indent=indent, ensure_ascii=False).encode("utf-8") return ToolResult( success=True, result={"message": f"JSON exported: {file_name}"}, file_base64=self._b64_from_bytes(payload), file_name=file_name, file_mime="application/json", ) @staticmethod def _stringify_text_payload(args: Dict[str, Any], key: str = "text") -> str: value = args.get(key) if value is None and key != "content": value = args.get("content") if value is None: return "" if isinstance(value, (dict, list)): return json.dumps(value, ensure_ascii=False, indent=2) return str(value) def _file_text_create(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "note.txt", force_ext=".txt") text = self._stringify_text_payload(args, key="text") data = text.encode("utf-8") return ToolResult( success=True, result={"message": f"Text file created: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/plain", ) def _file_text_update(self, args: Dict[str, Any]) -> ToolResult: src_b64 = args.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for text_update") file_name = self._sanitize_file_name(args.get("file_name"), "updated.txt", force_ext=".txt") operation = str(args.get("operation") or "append").strip().lower() if operation not in {"append", "replace"}: return ToolResult(success=False, result=None, error="operation must be append|replace") incoming = self._stringify_text_payload(args, key="text") existing = self._bytes_from_b64(src_b64).decode("utf-8") updated = incoming if operation == "replace" else f"{existing}{incoming}" data = updated.encode("utf-8") return ToolResult( success=True, result={"message": f"Text file updated: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/plain", ) def _file_markdown_create(self, args: Dict[str, Any]) -> ToolResult: file_name = 
self._sanitize_file_name(args.get("file_name"), "document.md", force_ext=".md") text = self._stringify_text_payload(args, key="text") data = text.encode("utf-8") return ToolResult( success=True, result={"message": f"Markdown created: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/markdown", ) def _file_markdown_update(self, args: Dict[str, Any]) -> ToolResult: src_b64 = args.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for markdown_update") file_name = self._sanitize_file_name(args.get("file_name"), "updated.md", force_ext=".md") operation = str(args.get("operation") or "append").strip().lower() if operation not in {"append", "replace"}: return ToolResult(success=False, result=None, error="operation must be append|replace") incoming = self._stringify_text_payload(args, key="text") existing = self._bytes_from_b64(src_b64).decode("utf-8") updated = incoming if operation == "replace" else f"{existing}{incoming}" data = updated.encode("utf-8") return ToolResult( success=True, result={"message": f"Markdown updated: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="text/markdown", ) def _file_xml_export(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "export.xml", force_ext=".xml") xml = self._stringify_text_payload(args, key="xml") data = xml.encode("utf-8") return ToolResult( success=True, result={"message": f"XML exported: {file_name}"}, file_base64=self._b64_from_bytes(data), file_name=file_name, file_mime="application/xml", ) def _file_html_export(self, args: Dict[str, Any]) -> ToolResult: file_name = self._sanitize_file_name(args.get("file_name"), "export.html", force_ext=".html") html = self._stringify_text_payload(args, key="html") data = html.encode("utf-8") return ToolResult( success=True, result={"message": f"HTML exported: {file_name}"}, 
class ToolManager:
    # ToolManager (continued): YAML/ZIP exports and Excel workbook builders.

    def _resolve_table(self, headers_raw: Any, rows_raw: Any):
        """Resolve (headers, rows) from raw input, inferring headers from the
        first dict row when none were supplied. Returns (headers, rows)."""
        headers = headers_raw or []
        rows = self._normalize_rows(rows_raw, headers=headers if headers else None)
        if rows and not headers and isinstance(rows_raw[0], dict):
            headers = list(rows_raw[0].keys())
            rows = self._normalize_rows(rows_raw, headers=headers)
        return headers, rows

    def _file_yaml_export(self, args: Dict[str, Any]) -> ToolResult:
        """Serialize `content` to YAML, falling back to JSON when PyYAML is
        unavailable or the dump fails."""
        file_name = self._sanitize_file_name(args.get("file_name"), "export.yaml", force_ext=".yaml")
        content = args.get("content")
        try:
            import yaml  # optional dependency

            payload = yaml.safe_dump(content, allow_unicode=True, sort_keys=False).encode("utf-8")
        except Exception:
            # Fallback to JSON serialization if PyYAML fails.
            payload = json.dumps(content, ensure_ascii=False, indent=2).encode("utf-8")
        return ToolResult(
            success=True,
            result={"message": f"YAML exported: {file_name}"},
            file_base64=self._b64_from_bytes(payload),
            file_name=file_name,
            file_mime="application/x-yaml",
        )

    def _file_zip_bundle(self, args: Dict[str, Any]) -> ToolResult:
        """Bundle `entries` into a ZIP; each entry supplies file_base64, text,
        or content (JSON-serialized)."""
        file_name = self._sanitize_file_name(args.get("file_name"), "bundle.zip", force_ext=".zip")
        entries = args.get("entries") or []
        if not isinstance(entries, list) or not entries:
            return ToolResult(success=False, result=None, error="entries must be non-empty array")

        out = BytesIO()
        with ZipFile(out, mode="w", compression=ZIP_DEFLATED) as zf:
            for idx, entry in enumerate(entries):
                if not isinstance(entry, dict):
                    return ToolResult(success=False, result=None, error=f"entry[{idx}] must be object")
                ename = self._sanitize_file_name(entry.get("file_name"), f"file_{idx + 1}.bin")
                if entry.get("file_base64"):
                    zf.writestr(ename, self._bytes_from_b64(entry["file_base64"]))
                elif "text" in entry:
                    zf.writestr(ename, str(entry["text"]).encode("utf-8"))
                elif "content" in entry:
                    zf.writestr(ename, json.dumps(entry["content"], ensure_ascii=False, indent=2).encode("utf-8"))
                else:
                    return ToolResult(
                        success=False,
                        result=None,
                        error=f"entry[{idx}] requires file_base64|text|content",
                    )

        return ToolResult(
            success=True,
            result={"message": f"ZIP bundle created: {file_name}"},
            file_base64=self._b64_from_bytes(out.getvalue()),
            file_name=file_name,
            file_mime="application/zip",
        )

    def _file_excel_create(self, args: Dict[str, Any]) -> ToolResult:
        """Create an .xlsx workbook from a `sheets` array, or from top-level
        sheet_name/headers/rows for the single-sheet case."""
        import openpyxl

        file_name = self._sanitize_file_name(args.get("file_name"), "report.xlsx", force_ext=".xlsx")
        sheets = args.get("sheets")
        wb = openpyxl.Workbook()
        wb.remove(wb.active)  # drop the implicit default sheet

        if isinstance(sheets, list) and sheets:
            for idx, sheet in enumerate(sheets, start=1):
                if not isinstance(sheet, dict):
                    return ToolResult(success=False, result=None, error=f"sheets[{idx-1}] must be object")
                sheet_name = str(sheet.get("name") or f"Sheet{idx}")[:31]  # Excel's 31-char limit
                headers, rows = self._resolve_table(sheet.get("headers"), sheet.get("rows") or [])
                ws = wb.create_sheet(title=sheet_name)
                self._append_sheet_data(ws, headers, rows)
        else:
            sheet_name = str(args.get("sheet_name") or "Sheet1")[:31]
            headers, rows = self._resolve_table(args.get("headers"), args.get("rows") or [])
            ws = wb.create_sheet(title=sheet_name)
            self._append_sheet_data(ws, headers, rows)
        # NOTE: every non-error path above creates at least one sheet, so the
        # old `created` flag / empty-workbook fallback was dead code.

        out = BytesIO()
        wb.save(out)
        return ToolResult(
            success=True,
            result={"message": f"Excel created: {file_name}"},
            file_base64=self._b64_from_bytes(out.getvalue()),
            file_name=file_name,
            file_mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )

    def _file_excel_update(self, args: Dict[str, Any]) -> ToolResult:
        """Apply a list of `operations` (append_rows / set_cell / replace_sheet /
        rename_sheet) to an existing base64 .xlsx workbook."""
        import openpyxl

        src_b64 = args.get("file_base64")
        operations = args.get("operations") or []
        if not src_b64:
            return ToolResult(success=False, result=None, error="file_base64 is required for excel_update")
        if not isinstance(operations, list) or not operations:
            return ToolResult(success=False, result=None, error="operations must be non-empty array")

        file_name = self._sanitize_file_name(args.get("file_name"), "updated.xlsx", force_ext=".xlsx")
        wb = openpyxl.load_workbook(filename=BytesIO(self._bytes_from_b64(src_b64)))

        for op in operations:
            if not isinstance(op, dict):
                return ToolResult(success=False, result=None, error="Each operation must be object")
            op_type = str(op.get("type") or "").strip().lower()

            if op_type == "append_rows":
                sheet = str(op.get("sheet") or wb.sheetnames[0])[:31]
                if sheet not in wb.sheetnames:
                    wb.create_sheet(title=sheet)
                ws = wb[sheet]
                rows_raw = op.get("rows") or []
                # Reuse the existing first row as headers for dict rows.
                header_row = [c.value for c in ws[1]] if ws.max_row >= 1 else []
                rows = self._normalize_rows(rows_raw, headers=header_row if header_row else None)
                if rows and not header_row and isinstance(rows_raw[0], dict):
                    header_row = list(rows_raw[0].keys())
                    ws.append(header_row)
                    rows = self._normalize_rows(rows_raw, headers=header_row)
                for row in rows:
                    ws.append(row)

            elif op_type == "set_cell":
                sheet = str(op.get("sheet") or wb.sheetnames[0])[:31]
                cell = op.get("cell")
                if not cell:
                    return ToolResult(success=False, result=None, error="set_cell operation requires 'cell'")
                if sheet not in wb.sheetnames:
                    wb.create_sheet(title=sheet)
                wb[sheet][str(cell)] = op.get("value", "")

            elif op_type == "replace_sheet":
                sheet = str(op.get("sheet") or wb.sheetnames[0])[:31]
                if sheet in wb.sheetnames:
                    wb.remove(wb[sheet])
                ws = wb.create_sheet(title=sheet)
                headers, rows = self._resolve_table(op.get("headers"), op.get("rows") or [])
                self._append_sheet_data(ws, headers, rows)

            elif op_type == "rename_sheet":
                src = str(op.get("from") or "")
                dst = str(op.get("to") or "").strip()
                if not src or not dst:
                    return ToolResult(success=False, result=None, error="rename_sheet requires 'from' and 'to'")
                if src not in wb.sheetnames:
                    return ToolResult(success=False, result=None, error=f"Sheet not found: {src}")
                wb[src].title = dst[:31]

            else:
                return ToolResult(success=False, result=None, error=f"Unsupported excel_update operation: {op_type}")

        out = BytesIO()
        wb.save(out)
        return ToolResult(
            success=True,
            result={"message": f"Excel updated: {file_name}"},
            file_base64=self._b64_from_bytes(out.getvalue()),
            file_name=file_name,
            file_mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )
class ToolManager:
    # ToolManager (continued): DOCX artifact builders. The table-building code
    # was previously duplicated verbatim in create and update; it now lives in
    # two shared helpers with identical behavior.

    def _resolve_str_table(self, headers_raw: Any, rows_raw: Any):
        """Resolve (headers, rows) for DOCX tables: supplied headers are
        stringified; headers inferred from the first dict row keep their
        original key objects (str() is applied at cell-write time)."""
        headers = [str(h) for h in (headers_raw or [])]
        rows = self._normalize_rows(rows_raw, headers=headers if headers else None)
        if rows and not headers and isinstance(rows_raw[0], dict):
            headers = list(rows_raw[0].keys())
            rows = self._normalize_rows(rows_raw, headers=headers)
        return headers, rows

    @staticmethod
    def _docx_write_table(doc: Any, headers: List[Any], rows: List[List[Any]]) -> None:
        """Append one table to `doc`: optional header row, then data rows;
        cells beyond the column count are silently dropped."""
        total_rows = len(rows) + (1 if headers else 0)
        total_cols = len(headers) if headers else (len(rows[0]) if rows else 1)
        t = doc.add_table(rows=max(total_rows, 1), cols=max(total_cols, 1))
        row_offset = 0
        if headers:
            for idx, value in enumerate(headers):
                t.cell(0, idx).text = str(value)
            row_offset = 1
        for ridx, row in enumerate(rows):
            for cidx, value in enumerate(row):
                if cidx < total_cols:
                    t.cell(ridx + row_offset, cidx).text = str(value)

    def _file_docx_create(self, args: Dict[str, Any]) -> ToolResult:
        """Create a .docx from optional `title`, `paragraphs` and `tables`."""
        from docx import Document

        file_name = self._sanitize_file_name(args.get("file_name"), "document.docx", force_ext=".docx")
        doc = Document()

        title = args.get("title")
        if title:
            doc.add_heading(str(title), level=1)
        for item in args.get("paragraphs") or []:
            doc.add_paragraph(str(item))
        for table in args.get("tables") or []:
            if not isinstance(table, dict):
                continue  # silently skip malformed table specs
            headers, rows = self._resolve_str_table(table.get("headers"), table.get("rows") or [])
            self._docx_write_table(doc, headers, rows)

        out = BytesIO()
        doc.save(out)
        return ToolResult(
            success=True,
            result={"message": f"DOCX created: {file_name}"},
            file_base64=self._b64_from_bytes(out.getvalue()),
            file_name=file_name,
            file_mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        )

    def _file_docx_update(self, args: Dict[str, Any]) -> ToolResult:
        """Apply `operations` (append_paragraph / append_heading / replace_text /
        append_table) to an existing base64 .docx document."""
        from docx import Document

        src_b64 = args.get("file_base64")
        operations = args.get("operations") or []
        if not src_b64:
            return ToolResult(success=False, result=None, error="file_base64 is required for docx_update")
        if not isinstance(operations, list) or not operations:
            return ToolResult(success=False, result=None, error="operations must be non-empty array")

        file_name = self._sanitize_file_name(args.get("file_name"), "updated.docx", force_ext=".docx")
        doc = Document(BytesIO(self._bytes_from_b64(src_b64)))

        for op in operations:
            if not isinstance(op, dict):
                return ToolResult(success=False, result=None, error="Each operation must be object")
            op_type = str(op.get("type") or "").strip().lower()

            if op_type == "append_paragraph":
                doc.add_paragraph(str(op.get("text") or ""))

            elif op_type == "append_heading":
                level = int(op.get("level") or 1)
                level = max(1, min(level, 9))  # python-docx heading levels are 1..9
                doc.add_heading(str(op.get("text") or ""), level=level)

            elif op_type == "replace_text":
                old = str(op.get("old") or "")
                new = str(op.get("new") or "")
                if not old:
                    return ToolResult(success=False, result=None, error="replace_text requires old")
                # NOTE(review): assigning p.text / cell.text collapses character
                # runs, so inline formatting in touched paragraphs is lost.
                for p in doc.paragraphs:
                    if old in p.text:
                        p.text = p.text.replace(old, new)
                for table in doc.tables:
                    for row in table.rows:
                        for cell in row.cells:
                            if old in cell.text:
                                cell.text = cell.text.replace(old, new)

            elif op_type == "append_table":
                headers, rows = self._resolve_str_table(op.get("headers"), op.get("rows") or [])
                self._docx_write_table(doc, headers, rows)

            else:
                return ToolResult(success=False, result=None, error=f"Unsupported docx_update operation: {op_type}")

        out = BytesIO()
        doc.save(out)
        return ToolResult(
            success=True,
            result={"message": f"DOCX updated: {file_name}"},
            file_base64=self._b64_from_bytes(out.getvalue()),
            file_name=file_name,
            file_mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        )
idx).text = str(value) row_offset = 1 for ridx, row in enumerate(rows): for cidx, value in enumerate(row): if cidx < total_cols: t.cell(ridx + row_offset, cidx).text = str(value) else: return ToolResult(success=False, result=None, error=f"Unsupported docx_update operation: {op_type}") out = BytesIO() doc.save(out) return ToolResult( success=True, result={"message": f"DOCX updated: {file_name}"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document", ) def _file_pdf_merge(self, args: Dict[str, Any]) -> ToolResult: from pypdf import PdfReader, PdfWriter file_name = self._sanitize_file_name(args.get("file_name"), "merged.pdf", force_ext=".pdf") files = args.get("files") or [] if not isinstance(files, list) or not files: return ToolResult(success=False, result=None, error="files must be non-empty array for pdf_merge") writer = PdfWriter() page_count = 0 for item in files: if not isinstance(item, dict) or not item.get("file_base64"): return ToolResult(success=False, result=None, error="Each file entry must include file_base64") reader = PdfReader(BytesIO(self._bytes_from_b64(item["file_base64"]))) for page in reader.pages: writer.add_page(page) page_count += 1 out = BytesIO() writer.write(out) return ToolResult( success=True, result={"message": f"PDF merged: {file_name} ({page_count} pages)"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/pdf", ) @staticmethod def _parse_split_pages(pages: Any) -> Optional[List[int]]: if not isinstance(pages, list) or not pages: return None parsed: List[int] = [] for p in pages: idx = int(p) if idx < 1: return None parsed.append(idx) return sorted(set(parsed)) def _file_pdf_split(self, args: Dict[str, Any]) -> ToolResult: from pypdf import PdfReader, PdfWriter src_b64 = args.get("file_base64") if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for 
pdf_split") file_name = self._sanitize_file_name(args.get("file_name"), "split.zip", force_ext=".zip") reader = PdfReader(BytesIO(self._bytes_from_b64(src_b64))) total = len(reader.pages) if total == 0: return ToolResult(success=False, result=None, error="Input PDF has no pages") groups = args.get("groups") split_groups = [] if groups: if not isinstance(groups, list): return ToolResult(success=False, result=None, error="groups must be array") for idx, grp in enumerate(groups, start=1): if not isinstance(grp, dict): return ToolResult(success=False, result=None, error="Each group must be object") gname = self._sanitize_file_name(grp.get("file_name"), f"part_{idx}.pdf", force_ext=".pdf") pages = self._parse_split_pages(grp.get("pages")) if not pages: return ToolResult(success=False, result=None, error=f"Invalid pages in group {idx}") split_groups.append((gname, pages)) else: split_groups = [(f"page_{i+1}.pdf", [i + 1]) for i in range(total)] out = BytesIO() with ZipFile(out, mode="w", compression=ZIP_DEFLATED) as zf: for gname, pages in split_groups: writer = PdfWriter() for p in pages: if p > total: return ToolResult(success=False, result=None, error=f"Page out of range: {p} > {total}") writer.add_page(reader.pages[p - 1]) part = BytesIO() writer.write(part) zf.writestr(gname, part.getvalue()) return ToolResult( success=True, result={"message": f"PDF split into {len(split_groups)} file(s): {file_name}"}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/zip", ) def _file_pdf_fill(self, args: Dict[str, Any]) -> ToolResult: from pypdf import PdfReader, PdfWriter src_b64 = args.get("file_base64") fields = args.get("fields") or {} if not src_b64: return ToolResult(success=False, result=None, error="file_base64 is required for pdf_fill") if not isinstance(fields, dict) or not fields: return ToolResult(success=False, result=None, error="fields must be a non-empty object") file_name = 
self._sanitize_file_name(args.get("file_name"), "filled.pdf", force_ext=".pdf") reader = PdfReader(BytesIO(self._bytes_from_b64(src_b64))) writer = PdfWriter() writer.append(reader) filled = True try: for page in writer.pages: writer.update_page_form_field_values(page, fields) if hasattr(writer, "set_need_appearances_writer"): writer.set_need_appearances_writer(True) except Exception: filled = False out = BytesIO() writer.write(out) msg = f"PDF form filled: {file_name}" if filled else f"PDF has no fillable form fields, returned unchanged: {file_name}" return ToolResult( success=True, result={"message": msg}, file_base64=self._b64_from_bytes(out.getvalue()), file_name=file_name, file_mime="application/pdf", ) async def _memory_search(self, args: Dict, agent_id: str = None, chat_id: str = None, user_id: str = None) -> ToolResult: """Search in Qdrant vector memory using Router's memory_retrieval - PRIORITY 1""" query = args.get("query") try: # Use Router's memory_retrieval pipeline directly (has Qdrant connection) from memory_retrieval import memory_retrieval if memory_retrieval and memory_retrieval.qdrant_client: results = await memory_retrieval.search_memories( query=query, agent_id=agent_id or "helion", chat_id=chat_id, user_id=user_id, limit=5 ) if results: formatted = [] for r in results: text = r.get("text", "") score = r.get("score", 0) mem_type = r.get("type", "memory") if text: formatted.append(f"• [{mem_type}] {text[:200]}... (релевантність: {score:.2f})") if formatted: return ToolResult(success=True, result=f"🧠 Знайдено в пам'яті:\n" + "\n".join(formatted)) return ToolResult(success=True, result="🧠 В моїй пам'яті немає інформації про це.") else: return ToolResult(success=True, result="🧠 Пам'ять недоступна, спробую web_search.") except Exception as e: logger.warning(f"Memory search error: {e}") return ToolResult(success=True, result="🧠 Не вдалося перевірити пам'ять. 
Спробую інші джерела.") async def _web_search(self, args: Dict) -> ToolResult: """Execute web search - PRIORITY 2 (use after memory_search)""" query = args.get("query") max_results = args.get("max_results", 5) if not query: return ToolResult(success=False, result=None, error="query is required") try: resp = await self.http_client.post( f"{self.swapper_url}/web/search", json={"query": query, "max_results": max_results} ) if resp.status_code == 200: data = resp.json() results = data.get("results", []) or [] query_terms = {t for t in str(query).lower().replace("/", " ").replace("-", " ").split() if len(t) > 2} trusted_domains = { "wikipedia.org", "wikidata.org", "europa.eu", "fao.org", "who.int", "worldbank.org", "oecd.org", "un.org", "gov.ua", "rada.gov.ua", "kmu.gov.ua", "minagro.gov.ua", "agroportal.ua", "latifundist.com", } low_signal_domains = { "pinterest.com", "tiktok.com", "instagram.com", "facebook.com", "youtube.com", "yandex.", "vk.com", } def _extract_domain(url: str) -> str: if not url: return "" u = url.lower().strip() u = u.replace("https://", "").replace("http://", "") u = u.split("/")[0] if u.startswith("www."): u = u[4:] return u def _overlap_score(title: str, snippet: str, url: str) -> int: text = " ".join([title or "", snippet or "", url or ""]).lower() score = 0 for t in query_terms: if t in text: score += 2 return score ranked: List[Any] = [] for r in results: title = str(r.get("title", "") or "") snippet = str(r.get("snippet", "") or "") url = str(r.get("url", "") or "") domain = _extract_domain(url) score = _overlap_score(title, snippet, url) if any(x in domain for x in low_signal_domains): score -= 2 if any(domain == d or domain.endswith("." 
+ d) for d in trusted_domains): score += 2 if not snippet: score -= 1 if len(title.strip()) < 5: score -= 1 ranked.append((score, r)) ranked.sort(key=lambda x: x[0], reverse=True) selected = [item for _, item in ranked[:max_results]] logger.info(f"🔎 web_search rerank: raw={len(results)} ranked={len(selected)} query='{query[:120]}'") formatted = [] for r in selected: formatted.append( f"- {r.get('title', 'No title')}\n {r.get('snippet', '')}\n URL: {r.get('url', '')}" ) return ToolResult(success=True, result="\n".join(formatted) if formatted else "Нічого не знайдено") else: return ToolResult(success=False, result=None, error=f"Search failed: {resp.status_code}") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) async def _web_extract(self, args: Dict) -> ToolResult: """Extract content from URL""" url = args.get("url") try: resp = await self.http_client.post( f"{self.swapper_url}/web/extract", json={"url": url} ) if resp.status_code == 200: data = resp.json() content = data.get("content", "") # Truncate if too long if len(content) > 4000: content = content[:4000] + "\n... 
(текст обрізано)" return ToolResult(success=True, result=content) else: return ToolResult(success=False, result=None, error=f"Extract failed: {resp.status_code}") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) async def _unload_ollama_models(self): """Unload all Ollama models to free VRAM for heavy operations like FLUX""" ollama_url = os.getenv("OLLAMA_BASE_URL", "http://172.18.0.1:11434") models_to_unload = ["qwen3:8b", "qwen3-vl:8b"] for model in models_to_unload: try: await self.http_client.post( f"{ollama_url}/api/generate", json={"model": model, "keep_alive": 0}, timeout=5.0 ) logger.info(f"🧹 Unloaded Ollama model: {model}") except Exception as e: logger.debug(f"Could not unload {model}: {e}") # Give GPU time to release memory import asyncio await asyncio.sleep(1) async def _unload_flux(self): """Unload FLUX model after image generation to free VRAM""" try: # Try to unload flux-klein-4b model await self.http_client.post( f"{self.swapper_url}/image/models/flux-klein-4b/unload", timeout=10.0 ) logger.info("🧹 Unloaded FLUX model from Swapper") except Exception as e: logger.debug(f"Could not unload FLUX: {e}") async def _image_generate(self, args: Dict) -> ToolResult: """Backward-compatible image generation entrypoint routed to Comfy (NODE3).""" if not args.get("prompt"): return ToolResult(success=False, result=None, error="prompt is required") comfy_args = dict(args) comfy_args.setdefault("negative_prompt", "blurry, low quality, watermark") comfy_args.setdefault("steps", 28) comfy_args.setdefault("timeout_s", 180) return await self._comfy_generate_image(comfy_args) async def _poll_comfy_job(self, job_id: str, timeout_s: int = 180) -> Dict[str, Any]: """Poll Comfy Agent job status until terminal state or timeout.""" loop = asyncio.get_running_loop() deadline = loop.time() + max(10, timeout_s) delay = 1.0 last: Dict[str, Any] = {} while loop.time() < deadline: resp = await 
self.http_client.get(f"{self.comfy_agent_url}/status/{job_id}", timeout=30.0) if resp.status_code != 200: raise RuntimeError(f"Comfy status failed: {resp.status_code}") data = resp.json() last = data status = (data.get("status") or "").lower() if status in {"succeeded", "finished"}: return data if status in {"failed", "canceled", "cancelled", "expired"}: return data await asyncio.sleep(delay) delay = min(delay * 1.5, 5.0) raise TimeoutError(f"Comfy job timeout after {timeout_s}s (job_id={job_id})") async def _comfy_generate_image(self, args: Dict) -> ToolResult: """Generate image via Comfy Agent on NODE3 and return URL when ready.""" prompt = args.get("prompt") if not prompt: return ToolResult(success=False, result=None, error="prompt is required") payload = { "prompt": prompt, "negative_prompt": args.get("negative_prompt", "blurry, low quality, watermark"), "width": int(args.get("width", 512)), "height": int(args.get("height", 512)), "steps": int(args.get("steps", 28)), } if args.get("seed") is not None: payload["seed"] = int(args["seed"]) timeout_s = int(args.get("timeout_s", 180)) idem_key = args.get("idempotency_key") or f"router-{uuid.uuid4().hex}" try: resp = await self.http_client.post( f"{self.comfy_agent_url}/generate/image", json=payload, headers={"Idempotency-Key": idem_key}, timeout=30.0, ) if resp.status_code != 200: return ToolResult(success=False, result=None, error=f"Comfy image request failed: {resp.status_code}") created = resp.json() job_id = created.get("job_id") if not job_id: return ToolResult(success=False, result=None, error="Comfy image request did not return job_id") final = await self._poll_comfy_job(job_id, timeout_s=timeout_s) status = (final.get("status") or "").lower() if status in {"succeeded", "finished"}: result_url = final.get("result_url") if result_url: return ToolResult(success=True, result=f"✅ Зображення згенеровано: {result_url}") return ToolResult(success=True, result=f"✅ Зображення згенеровано. 
job_id={job_id}") return ToolResult(success=False, result=None, error=final.get("error") or f"Comfy image failed (status={status})") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) async def _comfy_generate_video(self, args: Dict) -> ToolResult: """Generate video via Comfy Agent on NODE3 and return URL when ready.""" prompt = args.get("prompt") if not prompt: return ToolResult(success=False, result=None, error="prompt is required") payload = { "prompt": prompt, "seconds": int(args.get("seconds", 4)), "fps": int(args.get("fps", 24)), "steps": int(args.get("steps", 30)), } if args.get("seed") is not None: payload["seed"] = int(args["seed"]) timeout_s = int(args.get("timeout_s", 300)) idem_key = args.get("idempotency_key") or f"router-{uuid.uuid4().hex}" try: resp = await self.http_client.post( f"{self.comfy_agent_url}/generate/video", json=payload, headers={"Idempotency-Key": idem_key}, timeout=30.0, ) if resp.status_code != 200: return ToolResult(success=False, result=None, error=f"Comfy video request failed: {resp.status_code}") created = resp.json() job_id = created.get("job_id") if not job_id: return ToolResult(success=False, result=None, error="Comfy video request did not return job_id") final = await self._poll_comfy_job(job_id, timeout_s=timeout_s) status = (final.get("status") or "").lower() if status in {"succeeded", "finished"}: result_url = final.get("result_url") if result_url: return ToolResult(success=True, result=f"✅ Відео згенеровано: {result_url}") return ToolResult(success=True, result=f"✅ Відео згенеровано. 
job_id={job_id}") return ToolResult(success=False, result=None, error=final.get("error") or f"Comfy video failed (status={status})") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) async def _graph_query(self, args: Dict, agent_id: str = None) -> ToolResult: """Query knowledge graph""" query = args.get("query") entity_type = args.get("entity_type") # Simple natural language to Cypher conversion cypher = f""" MATCH (n) WHERE toLower(n.name) CONTAINS toLower('{query}') OR toLower(toString(n)) CONTAINS toLower('{query}') RETURN labels(n)[0] as type, n.name as name, n.node_id as id LIMIT 10 """ if entity_type: cypher = f""" MATCH (n:{entity_type}) WHERE toLower(n.name) CONTAINS toLower('{query}') RETURN n.name as name, n.node_id as id LIMIT 10 """ try: # Execute via Router's graph endpoint resp = await self.http_client.post( "http://localhost:8000/v1/graph/query", json={"query": cypher} ) if resp.status_code == 200: data = resp.json() return ToolResult(success=True, result=json.dumps(data.get("results", []), ensure_ascii=False)) else: return ToolResult(success=False, result=None, error=f"Graph query failed: {resp.status_code}") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) async def _remember_fact(self, args: Dict, agent_id: str = None, chat_id: str = None, user_id: str = None) -> ToolResult: """Store a fact in memory with strict args validation.""" if not isinstance(args, dict) or not args: logger.warning("⚠️ remember_fact blocked: empty args") return ToolResult(success=False, result=None, error="invalid_tool_args: remember_fact requires {fact: }.") fact_raw = args.get("fact") if fact_raw is None: fact_raw = args.get("text") if not isinstance(fact_raw, str): logger.warning("⚠️ remember_fact blocked: fact/text must be string") return ToolResult(success=False, result=None, error="invalid_tool_args: fact/text must be string.") fact = fact_raw.strip() if not fact: logger.warning("⚠️ remember_fact 
blocked: empty fact/text") return ToolResult(success=False, result=None, error="invalid_tool_args: fact/text must be non-empty.") if len(fact) > 2000: logger.warning("⚠️ remember_fact blocked: fact too long (%s)", len(fact)) return ToolResult(success=False, result=None, error="invalid_tool_args: fact/text is too long (max 2000 chars).") category = str(args.get("category") or "general").strip() or "general" runtime_user_id = (str(user_id or "").strip() or str(args.get("user_id") or "").strip() or str(args.get("about") or "").strip()) if not runtime_user_id: logger.warning("⚠️ remember_fact blocked: missing runtime user_id") return ToolResult(success=False, result=None, error="invalid_tool_args: missing runtime user_id for memory write.") fact_hash = hashlib.sha1(fact.encode("utf-8")).hexdigest()[:12] fact_key = f"{category}_{fact_hash}" try: resp = await self.http_client.post( "http://memory-service:8000/facts/upsert", json={ "user_id": runtime_user_id, "fact_key": fact_key, "fact_value": fact, "fact_value_json": { "text": fact, "category": category, "about": runtime_user_id, "agent_id": agent_id, "chat_id": chat_id, "source": "remember_fact_tool", }, }, ) if resp.status_code in [200, 201]: return ToolResult(success=True, result=f"✅ Запам'ятовано факт ({category})") return ToolResult(success=False, result=None, error=f"memory_store_failed:{resp.status_code}:{resp.text[:160]}") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) async def _presentation_create(self, args: Dict) -> ToolResult: """Create a presentation via Presentation Renderer""" title = args.get("title", "Презентація") slides = args.get("slides", []) brand_id = args.get("brand_id", "energyunion") theme_version = args.get("theme_version", "v1.0.0") language = args.get("language", "uk") # Build SlideSpec slidespec = { "meta": { "title": title, "brand_id": brand_id, "theme_version": theme_version, "language": language }, "slides": [] } # Add title slide 
slidespec["slides"].append({ "type": "title", "title": title }) # Add content slides for slide in slides: slide_obj = { "type": "content", "title": slide.get("title", ""), "body": slide.get("content", "") } slidespec["slides"].append(slide_obj) try: renderer_url = os.getenv("PRESENTATION_RENDERER_URL", "http://presentation-renderer:9600") resp = await self.http_client.post( f"{renderer_url}/present/render", json=slidespec, timeout=120.0 ) if resp.status_code == 200: data = resp.json() job_id = data.get("job_id") artifact_id = data.get("artifact_id") return ToolResult( success=True, result=f"📊 Презентацію створено!\n\n🆔 Job ID: `{job_id}`\n📦 Artifact ID: `{artifact_id}`\n\nЩоб перевірити статус: використай presentation_status\nЩоб завантажити: використай presentation_download" ) else: error_text = resp.text[:200] if resp.text else "Unknown error" return ToolResult(success=False, result=None, error=f"Render failed ({resp.status_code}): {error_text}") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) async def _presentation_status(self, args: Dict) -> ToolResult: """Check presentation job status""" job_id = args.get("job_id") try: registry_url = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9700") resp = await self.http_client.get( f"{registry_url}/jobs/{job_id}", timeout=10.0 ) if resp.status_code == 200: data = resp.json() status = data.get("status", "unknown") artifact_id = data.get("artifact_id") error = data.get("error_text", "") status_emoji = {"queued": "⏳", "running": "🔄", "done": "✅", "failed": "❌"}.get(status, "❓") result = f"{status_emoji} Статус: **{status}**\n" if artifact_id: result += f"📦 Artifact ID: `{artifact_id}`\n" if status == "done": result += "\n✅ Презентація готова! Використай presentation_download щоб отримати файл." 
if status == "failed" and error: result += f"\n❌ Помилка: {error[:200]}" return ToolResult(success=True, result=result) elif resp.status_code == 404: return ToolResult(success=False, result=None, error="Job not found") else: return ToolResult(success=False, result=None, error=f"Status check failed: {resp.status_code}") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) async def _presentation_download(self, args: Dict) -> ToolResult: """Get download link for presentation""" artifact_id = args.get("artifact_id") file_format = args.get("format", "pptx") try: registry_url = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9700") resp = await self.http_client.get( f"{registry_url}/artifacts/{artifact_id}/download?format={file_format}", timeout=10.0, follow_redirects=False ) if resp.status_code in [200, 302, 307]: # Check for signed URL in response or Location header if resp.status_code in [302, 307]: download_url = resp.headers.get("Location") else: data = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {} download_url = data.get("download_url") or data.get("url") if download_url: return ToolResult( success=True, result=f"📥 **Посилання для завантаження ({file_format.upper()}):**\n\n{download_url}\n\n⏰ Посилання дійсне 30 хвилин." ) else: # Direct binary response - artifact available return ToolResult( success=True, result=f"✅ Файл {file_format.upper()} готовий! Завантажити можна через: {registry_url}/artifacts/{artifact_id}/download?format={file_format}" ) elif resp.status_code == 404: return ToolResult(success=False, result=None, error=f"Формат {file_format.upper()} ще не готовий. 
Спробуй пізніше.") else: return ToolResult(success=False, result=None, error=f"Download failed: {resp.status_code}") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) async def _crawl4ai_scrape(self, args: Dict) -> ToolResult: """Deep scrape a web page using Crawl4AI - PRIORITY 5""" url = args.get("url") extract_links = args.get("extract_links", True) extract_images = args.get("extract_images", False) if not url: return ToolResult(success=False, result=None, error="URL is required") try: crawl4ai_url = os.getenv("CRAWL4AI_URL", "http://dagi-crawl4ai-node1:11235") payload = { "urls": [url], "priority": 5, "session_id": f"agent_scrape_{hash(url) % 10000}" } resp = await self.http_client.post( f"{crawl4ai_url}/crawl", json=payload, timeout=60.0 ) if resp.status_code == 200: data = resp.json() results = data.get("results", []) if isinstance(data, dict) else [] if not results and isinstance(data, dict): results = [data] if results: result = results[0] if isinstance(results, list) else results markdown = result.get("markdown", "") or result.get("cleaned_html", "") or result.get("text", "") title = result.get("title", url) if len(markdown) > 3000: markdown = markdown[:3000] + "... 
(скорочено)" response_parts = [f"**{title}**", "", markdown] if extract_links: links = result.get("links", []) if links: response_parts.append("") response_parts.append("**Посилання:**") for link in links[:10]: if isinstance(link, dict): link_url = link.get("href", "") else: link_url = str(link) if link_url: response_parts.append(f"- {link_url}") return ToolResult(success=True, result="\n".join(response_parts)) else: return ToolResult(success=False, result=None, error="No content extracted") else: return ToolResult(success=False, result=None, error=f"Crawl failed: {resp.status_code}") except Exception as e: logger.error(f"Crawl4AI scrape failed: {e}") return ToolResult(success=False, result=None, error=str(e)) async def _tts_speak(self, args: Dict) -> ToolResult: """Convert text to speech using Swapper TTS - PRIORITY 6""" text = args.get("text") language = args.get("language", "uk") if not text: return ToolResult(success=False, result=None, error="Text is required") try: if len(text) > 1000: text = text[:1000] resp = await self.http_client.post( f"{self.swapper_url}/tts", json={"text": text, "language": language}, timeout=60.0 ) if resp.status_code == 200: data = resp.json() audio_url = data.get("audio_url") or data.get("url") if audio_url: return ToolResult(success=True, result=f"Аудіо: {audio_url}") else: return ToolResult(success=True, result="TTS completed") else: return ToolResult(success=False, result=None, error=f"TTS failed: {resp.status_code}") except Exception as e: logger.error(f"TTS failed: {e}") return ToolResult(success=False, result=None, error=str(e)) async def _market_data(self, args: Dict) -> ToolResult: """Query real-time market data from Market Data Service and SenpAI MD Consumer.""" symbol = str(args.get("symbol", "BTCUSDT")).upper() query_type = str(args.get("query_type", "all")).lower() md_url = os.getenv("MARKET_DATA_URL", "http://dagi-market-data-node1:8891") consumer_url = os.getenv("SENPAI_CONSUMER_URL", 
"http://dagi-senpai-md-consumer-node1:8892") results: Dict[str, Any] = {} try: async with httpx.AsyncClient(timeout=8.0) as client: if query_type in ("price", "all"): try: resp = await client.get(f"{md_url}/latest", params={"symbol": symbol}) if resp.status_code == 200: data = resp.json() trade = data.get("latest_trade", {}) or {} quote = data.get("latest_quote", {}) or {} bid = quote.get("bid") ask = quote.get("ask") spread = None if isinstance(bid, (int, float)) and isinstance(ask, (int, float)): spread = round(ask - bid, 6) results["price"] = { "symbol": symbol, "last_price": trade.get("price"), "size": trade.get("size"), "side": trade.get("side"), "bid": bid, "ask": ask, "spread": spread, "provider": trade.get("provider"), "timestamp": trade.get("ts_recv"), } else: results["price_error"] = f"market-data status={resp.status_code}" except Exception as e: results["price_error"] = str(e) if query_type in ("features", "all"): try: resp = await client.get(f"{consumer_url}/features/latest", params={"symbol": symbol}) if resp.status_code == 200: data = resp.json() feats = data.get("features", {}) or {} results["features"] = { "symbol": symbol, "mid_price": feats.get("mid"), "spread_bps": round(float(feats.get("spread_bps", 0) or 0), 2), "vwap_10s": round(float(feats.get("trade_vwap_10s", 0) or 0), 2), "vwap_60s": round(float(feats.get("trade_vwap_60s", 0) or 0), 2), "trade_count_10s": int(feats.get("trade_count_10s", 0) or 0), "trade_volume_10s": round(float(feats.get("trade_volume_10s", 0) or 0), 4), "return_10s_pct": round(float(feats.get("return_10s", 0) or 0) * 100, 4), "realized_vol_60s_pct": round(float(feats.get("realized_vol_60s", 0) or 0) * 100, 6), "latency_p50_ms": round(float(feats.get("latency_ms_p50", 0) or 0), 1), "latency_p95_ms": round(float(feats.get("latency_ms_p95", 0) or 0), 1), } else: results["features_error"] = f"senpai-consumer status={resp.status_code}" except Exception as e: results["features_error"] = str(e) if not results: return 
ToolResult(success=False, result=None, error=f"No market data for {symbol}") return ToolResult(success=True, result=json.dumps(results, ensure_ascii=False)) except Exception as e: logger.error(f"Market data tool error: {e}") return ToolResult(success=False, result=None, error=str(e)) async def close(self): await self.http_client.aclose() def _strip_think_tags(text: str) -> str: """Remove ... tags from DeepSeek responses.""" import re text = re.sub(r'.*?', '', text, flags=re.DOTALL) text = re.sub(r'.*$', '', text, flags=re.DOTALL) # unclosed tag return text.strip() def format_tool_calls_for_response(tool_results: List[Dict], fallback_mode: str = "normal") -> str: """ Format tool results in human-friendly way - NOT raw data! Args: tool_results: List of tool execution results fallback_mode: "normal" | "dsml_detected" | "empty_response" """ # Special handling for DSML detection - LLM tried to use tools but got confused # If we have successful tool results, show them instead of generic fallback if fallback_mode == "dsml_detected": # Check if any tool succeeded with a useful result if tool_results: for tr in tool_results: if tr.get("success") and tr.get("result"): # Avoid dumping raw retrieval/search payloads to the user. # These often look like "memory dumps" and are perceived as incorrect answers. tool_name = (tr.get("name") or "").strip() if tool_name in {"memory_search", "web_search", "web_extract", "web_read"}: continue result = str(tr.get("result", "")) if result and len(result) > 10 and "error" not in result.lower(): # We have a useful tool result - use it! if len(result) > 600: return result[:600] + "..." return result # No useful tool results - give presence acknowledgment return "Вибач, відповідь згенерувалась некоректно. Спробуй ще раз (коротше/конкретніше) або повтори питання одним реченням." if not tool_results: if fallback_mode == "empty_response": return "Вибач, щось пішло не так. Спробуй ще раз." return "Вибач, не вдалося виконати запит." 
# Check what tools were used tool_names = [tr.get("name", "") for tr in tool_results] # Check if ANY tool succeeded any_success = any(tr.get("success") for tr in tool_results) if not any_success: # All tools failed - give helpful message errors = [tr.get("error", "unknown") for tr in tool_results if tr.get("error")] if errors: logger.warning(f"All tools failed: {errors}") return "Вибач, виникла технічна проблема. Спробуй ще раз або переформулюй питання." # Image generation - special handling if "image_generate" in tool_names: for tr in tool_results: if tr.get("name") == "image_generate" and tr.get("success"): return "✅ Зображення згенеровано!" if "comfy_generate_image" in tool_names: for tr in tool_results: if tr.get("name") == "comfy_generate_image" and tr.get("success"): return str(tr.get("result", "✅ Зображення згенеровано через ComfyUI")) if "comfy_generate_video" in tool_names: for tr in tool_results: if tr.get("name") == "comfy_generate_video" and tr.get("success"): return str(tr.get("result", "✅ Відео згенеровано через ComfyUI")) # Web search - show actual results to user if "web_search" in tool_names: for tr in tool_results: if tr.get("name") == "web_search": if tr.get("success"): result = tr.get("result", "") if not result: return "🔍 Не знайшов релевантної інформації в інтернеті." # Parse and format results for user lines = result.strip().split("\n") formatted = ["🔍 **Результати пошуку:**\n"] current_title = "" current_url = "" current_snippet = "" count = 0 for line in lines: line = line.strip() if line.startswith("- ") and not line.startswith("- URL:"): if current_title and count < 3: # Show max 3 results formatted.append(f"**{count}. 
{current_title}**") if current_snippet: formatted.append(f" {current_snippet[:150]}...") if current_url: formatted.append(f" 🔗 {current_url}\n") current_title = line[2:].strip() current_snippet = "" current_url = "" count += 1 elif "URL:" in line: current_url = line.split("URL:")[-1].strip() elif line and not line.startswith("-"): current_snippet = line # Add last result if current_title and count <= 3: formatted.append(f"**{count}. {current_title}**") if current_snippet: formatted.append(f" {current_snippet[:150]}...") if current_url: formatted.append(f" 🔗 {current_url}") if len(formatted) > 1: return "\n".join(formatted) else: return "🔍 Не знайшов релевантної інформації в інтернеті." else: return "🔍 Пошук в інтернеті не вдався. Спробуй ще раз." # Memory search if "memory_search" in tool_names: for tr in tool_results: if tr.get("name") == "memory_search" and tr.get("success"): result = tr.get("result", "") if "немає інформації" in result.lower() or not result: return "🧠 В моїй пам'яті немає інформації про це." # Truncate if too long if len(result) > 500: return result[:500] + "..." return result # Graph query if "graph_query" in tool_names: for tr in tool_results: if tr.get("name") == "graph_query" and tr.get("success"): result = tr.get("result", "") if not result or "не знайдено" in result.lower(): return "📊 В базі знань немає інформації про це." if len(result) > 500: return result[:500] + "..." return result # Default fallback - check if we have any result to show for tr in tool_results: if tr.get("success") and tr.get("result"): result = str(tr.get("result", "")) if result and len(result) > 10: # We have something, show it if len(result) > 400: return result[:400] + "..." return result # Really nothing useful - be honest return "Я обробив твій запит, але не знайшов корисної інформації. Можеш уточнити питання?"