# microdao-daarion/services/router/tool_manager.py
"""
Tool Manager for Helion Agent
Implements OpenAI-compatible function calling for DeepSeek, Mistral, Grok
"""
import os
import asyncio
import uuid
from agent_tools_config import get_agent_tools, is_tool_allowed
import json
import logging
import hashlib
import base64
import csv
import httpx
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from io import BytesIO, StringIO
from pathlib import PurePath
from zipfile import ZIP_DEFLATED, ZipFile
logger = logging.getLogger(__name__)
# Tool definitions in OpenAI function calling format
# ORDER MATTERS: Memory/Graph tools first, then web search as fallback
# NOTE: "description" strings are sent verbatim to the model and are
# intentionally in Ukrainian (user-facing prompt text) - do not translate.
TOOL_DEFINITIONS = [
    # PRIORITY 1: Internal knowledge sources (use FIRST)
    {
        "type": "function",
        "function": {
            "name": "memory_search",
            "description": "🔍 ПЕРШИЙ КРОК для пошуку! Шукає в моїй пам'яті: збережені факти, документи, розмови. ЗАВЖДИ використовуй спочатку перед web_search!",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Що шукати в пам'яті"
                    }
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "graph_query",
            "description": "🔍 Пошук в Knowledge Graph - зв'язки між проєктами, людьми, темами Energy Union. Використовуй для питань про проєкти, партнерів, технології.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Що шукати (назва проєкту, людини, теми)"
                    },
                    "entity_type": {
                        "type": "string",
                        "enum": ["User", "Topic", "Project", "Fact"],
                        "description": "Тип сутності для пошуку"
                    }
                },
                "required": ["query"]
            }
        }
    },
    # PRIORITY 2: Web search (use ONLY if memory/graph don't have info)
    {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": "🌐 Пошук в інтернеті. Використовуй ТІЛЬКИ якщо memory_search і graph_query не знайшли потрібної інформації!",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Пошуковий запит"
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Максимальна кількість результатів (1-10)",
                        "default": 5
                    }
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "web_extract",
            "description": "Витягнути текстовий контент з веб-сторінки за URL",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "URL сторінки для читання"
                    }
                },
                "required": ["url"]
            }
        }
    },
    # PRIORITY 3: Generation tools
    {
        "type": "function",
        "function": {
            "name": "image_generate",
            "description": "🎨 Згенерувати зображення за текстовим описом (FLUX)",
            "parameters": {
                "type": "object",
                "properties": {
                    "prompt": {
                        "type": "string",
                        "description": "Опис зображення для генерації (англійською краще)"
                    },
                    "width": {
                        "type": "integer",
                        "description": "Ширина зображення",
                        "default": 512
                    },
                    "height": {
                        "type": "integer",
                        "description": "Висота зображення",
                        "default": 512
                    }
                },
                "required": ["prompt"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "comfy_generate_image",
            "description": "🖼️ Згенерувати зображення через ComfyUI (NODE3, Stable Diffusion). Для високої якості та детальних зображень.",
            "parameters": {
                "type": "object",
                "properties": {
                    "prompt": {
                        "type": "string",
                        "description": "Детальний опис зображення (англійською)"
                    },
                    "negative_prompt": {
                        "type": "string",
                        "description": "Що НЕ включати в зображення",
                        "default": "blurry, low quality, watermark"
                    },
                    "width": {
                        "type": "integer",
                        "description": "Ширина (512, 768, 1024)",
                        "default": 512
                    },
                    "height": {
                        "type": "integer",
                        "description": "Висота (512, 768, 1024)",
                        "default": 512
                    },
                    "steps": {
                        "type": "integer",
                        "description": "Кількість кроків генерації (20-50)",
                        "default": 28
                    }
                },
                "required": ["prompt"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "comfy_generate_video",
            "description": "🎬 Згенерувати відео через ComfyUI (NODE3, LTX-2). Text-to-video для коротких кліпів.",
            "parameters": {
                "type": "object",
                "properties": {
                    "prompt": {
                        "type": "string",
                        "description": "Детальний опис відео (англійською)"
                    },
                    "seconds": {
                        "type": "integer",
                        "description": "Тривалість в секундах (2-8)",
                        "default": 4
                    },
                    "fps": {
                        "type": "integer",
                        "description": "Кадри в секунду (24-30)",
                        "default": 24
                    },
                    "steps": {
                        "type": "integer",
                        "description": "Кількість кроків генерації (20-40)",
                        "default": 30
                    }
                },
                "required": ["prompt"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "remember_fact",
            "description": "Запам'ятати важливий факт про користувача або тему",
            "parameters": {
                "type": "object",
                "properties": {
                    "fact": {
                        "type": "string",
                        "description": "Факт для запам'ятовування"
                    },
                    "about": {
                        "type": "string",
                        "description": "Про кого/що цей факт (username або тема)"
                    },
                    "category": {
                        "type": "string",
                        "enum": ["personal", "technical", "preference", "project"],
                        "description": "Категорія факту"
                    }
                },
                "required": ["fact"]
            }
        }
    },
    # PRIORITY 4: Document/Presentation tools
    {
        "type": "function",
        "function": {
            "name": "presentation_create",
            "description": "📊 Створити презентацію PowerPoint. Використовуй коли користувач просить 'створи презентацію', 'зроби презентацію', 'підготуй слайди'.",
            "parameters": {
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Назва презентації"
                    },
                    "slides": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "title": {"type": "string", "description": "Заголовок слайду"},
                                "content": {"type": "string", "description": "Контент слайду (markdown)"}
                            }
                        },
                        "description": "Масив слайдів: [{title, content}]"
                    },
                    "brand_id": {
                        "type": "string",
                        "description": "ID бренду для стилю (energyunion, greenfood, nutra)",
                        "default": "energyunion"
                    },
                    "theme_version": {
                        "type": "string",
                        "description": "Версія теми",
                        "default": "v1.0.0"
                    },
                    "language": {
                        "type": "string",
                        "enum": ["uk", "en", "ru"],
                        "description": "Мова презентації",
                        "default": "uk"
                    }
                },
                "required": ["title", "slides"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "presentation_status",
            "description": "📋 Перевірити статус створення презентації за job_id",
            "parameters": {
                "type": "object",
                "properties": {
                    "job_id": {
                        "type": "string",
                        "description": "ID завдання рендерингу"
                    }
                },
                "required": ["job_id"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "presentation_download",
            "description": "📥 Отримати посилання на готову презентацію за artifact_id",
            "parameters": {
                "type": "object",
                "properties": {
                    "artifact_id": {
                        "type": "string",
                        "description": "ID артефакту презентації"
                    },
                    "format": {
                        "type": "string",
                        "enum": ["pptx", "pdf"],
                        "description": "Формат файлу",
                        "default": "pptx"
                    }
                },
                "required": ["artifact_id"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "file_tool",
            "description": "📁 Універсальний file tool для створення та оновлення CSV/JSON/YAML/ZIP і інших форматів через action-based API.",
            "parameters": {
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        # Keep this enum in sync with the dispatch in _file_tool.
                        "enum": [
                            "excel_create", "excel_update", "docx_create", "docx_update",
                            "csv_create", "csv_update", "pdf_fill", "pdf_merge", "pdf_split",
                            "json_export", "yaml_export", "zip_bundle"
                        ],
                        "description": "Дія file tool"
                    },
                    "file_name": {
                        "type": "string",
                        "description": "Назва файлу-результату"
                    },
                    "file_base64": {
                        "type": "string",
                        "description": "Вхідний файл у base64 для update-операцій"
                    },
                    "content": {
                        # Deliberately untyped: accepts any JSON value.
                        "description": "Контент для json/yaml export"
                    },
                    "headers": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Заголовки для CSV"
                    },
                    "rows": {
                        "type": "array",
                        "description": "Рядки для CSV"
                    },
                    "entries": {
                        "type": "array",
                        "description": "Елементи для zip_bundle"
                    },
                    "operation": {
                        "type": "string",
                        "enum": ["append", "replace"],
                        "description": "Режим csv_update"
                    }
                },
                "required": ["action"]
            }
        }
    },
    # PRIORITY 5: Web Scraping tools
    {
        "type": "function",
        "function": {
            "name": "crawl4ai_scrape",
            "description": "🕷️ Глибокий скрейпінг веб-сторінки через Crawl4AI. Витягує повний контент, структуровані дані, медіа. Використовуй для детального аналізу сайтів.",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "URL сторінки для скрейпінгу"
                    },
                    "extract_links": {
                        "type": "boolean",
                        "description": "Витягувати посилання зі сторінки",
                        "default": True
                    },
                    "extract_images": {
                        "type": "boolean",
                        "description": "Витягувати зображення",
                        "default": False
                    }
                },
                "required": ["url"]
            }
        }
    },
    # PRIORITY 6: TTS tools
    {
        "type": "function",
        "function": {
            "name": "tts_speak",
            "description": "🔊 Перетворити текст на аудіо (Text-to-Speech). Повертає аудіо файл. Використовуй коли користувач просить озвучити текст.",
            "parameters": {
                "type": "object",
                "properties": {
                    "text": {
                        "type": "string",
                        "description": "Текст для озвучення"
                    },
                    "language": {
                        "type": "string",
                        "enum": ["uk", "en", "ru"],
                        "description": "Мова озвучення",
                        "default": "uk"
                    }
                },
                "required": ["text"]
            }
        }
    },
    # PRIORITY 7: Market Data tools (SenpAI)
    {
        "type": "function",
        "function": {
            "name": "market_data",
            "description": "📊 Отримати real-time ринкові дані: поточну ціну, котирування, обсяги, аналітичні фічі (VWAP, spread, volatility, trade signals). Доступні символи: BTCUSDT, ETHUSDT.",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {
                        "type": "string",
                        "description": "Торговий символ (наприклад BTCUSDT, ETHUSDT)"
                    },
                    "query_type": {
                        "type": "string",
                        "enum": ["price", "features", "all"],
                        "description": "Тип запиту: price (ціна + котирування), features (аналітичні фічі), all (все разом)",
                        "default": "all"
                    }
                },
                "required": ["symbol"]
            }
        }
    }
]
@dataclass
class ToolResult:
    """Result of tool execution"""
    success: bool                       # True when the tool ran without error
    result: Any                         # tool-specific payload (str, dict, ...)
    error: Optional[str] = None         # human-readable error when success is False
    image_base64: Optional[str] = None  # For image generation results
    file_base64: Optional[str] = None   # generated file content, base64-encoded
    file_name: Optional[str] = None     # suggested download name for the file
    file_mime: Optional[str] = None     # MIME type of the generated file
class ToolManager:
"""Manages tool execution for the agent"""
def __init__(self, config: Dict[str, Any]):
    """Initialize the manager with router config and external service endpoints."""
    self.config = config
    # One shared async HTTP client reused for every outbound tool call.
    self.http_client = httpx.AsyncClient(timeout=60.0)
    # Service endpoints, overridable via environment variables.
    self.swapper_url = os.getenv("SWAPPER_URL", "http://swapper-service:8890")
    self.comfy_agent_url = os.getenv("COMFY_AGENT_URL", "http://212.8.58.133:8880")
    # Per-tool endpoint overrides parsed from the router config.
    self.tools_config = self._load_tools_config()
def _load_tools_config(self) -> Dict[str, Dict]:
"""Load tool endpoints from config"""
tools = {}
agent_config = self.config.get("agents", {}).get("helion", {})
for tool in agent_config.get("tools", []):
if "endpoint" in tool:
tools[tool["id"]] = {
"endpoint": tool["endpoint"],
"method": tool.get("method", "POST")
}
return tools
def get_tool_definitions(self, agent_id: str = None) -> List[Dict]:
    """Return function-calling tool definitions, filtered by agent permissions."""
    if not agent_id:
        # No agent specified: expose the full catalogue.
        return TOOL_DEFINITIONS
    # Keep only the definitions this agent is allowed to call.
    allowed_tools = get_agent_tools(agent_id)
    filtered = [
        definition
        for definition in TOOL_DEFINITIONS
        if definition.get("function", {}).get("name") in allowed_tools
    ]
    tool_names = [t.get("function", {}).get("name") for t in filtered]
    logger.debug(f"Agent {agent_id} has {len(filtered)} tools: {tool_names}")
    return filtered
async def execute_tool(
    self,
    tool_name: str,
    arguments: Dict[str, Any],
    agent_id: str = None,
    chat_id: str = None,
    user_id: str = None,
) -> ToolResult:
    """Execute a tool by name and return a ToolResult.

    Args:
        tool_name: Name matching a TOOL_DEFINITIONS entry.
        arguments: Parsed function-call arguments from the model.
        agent_id: Optional agent id; when given, the tool must be allowed
            for that agent or execution is refused.
        chat_id / user_id: Optional conversation context forwarded to the
            memory-related tools.

    Never raises: failures are wrapped in ToolResult(success=False).
    """
    logger.info(f"🔧 Executing tool: {tool_name} for agent={agent_id} with args: {arguments}")
    # Check agent permission if agent_id provided
    if agent_id and not is_tool_allowed(agent_id, tool_name):
        logger.warning(f"⚠️ Tool {tool_name} not allowed for agent {agent_id}")
        return ToolResult(success=False, result=None, error=f"Tool {tool_name} not available for this agent")
    # Dispatch table: tool name -> zero-arg coroutine factory (replaces
    # the previous long if/elif chain).
    handlers = {
        # Priority 1: Memory/Knowledge tools
        "memory_search": lambda: self._memory_search(arguments, agent_id=agent_id, chat_id=chat_id, user_id=user_id),
        "graph_query": lambda: self._graph_query(arguments, agent_id=agent_id),
        # Priority 2: Web tools
        "web_search": lambda: self._web_search(arguments),
        "web_extract": lambda: self._web_extract(arguments),
        # Priority 3: Generation tools
        "image_generate": lambda: self._image_generate(arguments),
        "comfy_generate_image": lambda: self._comfy_generate_image(arguments),
        "comfy_generate_video": lambda: self._comfy_generate_video(arguments),
        "remember_fact": lambda: self._remember_fact(arguments, agent_id=agent_id, chat_id=chat_id, user_id=user_id),
        # Priority 4: Presentation tools
        "presentation_create": lambda: self._presentation_create(arguments),
        "presentation_status": lambda: self._presentation_status(arguments),
        "presentation_download": lambda: self._presentation_download(arguments),
        # Priority 5: Web scraping tools
        "crawl4ai_scrape": lambda: self._crawl4ai_scrape(arguments),
        # Priority 6: TTS tools (was labelled "Priority 6" twice before)
        "tts_speak": lambda: self._tts_speak(arguments),
        # Priority 7: File artifacts
        "file_tool": lambda: self._file_tool(arguments),
        # Priority 8: Market Data (SenpAI)
        "market_data": lambda: self._market_data(arguments),
    }
    handler = handlers.get(tool_name)
    if handler is None:
        return ToolResult(success=False, result=None, error=f"Unknown tool: {tool_name}")
    try:
        return await handler()
    except Exception as e:
        # logger.exception keeps the traceback (logger.error dropped it).
        logger.exception(f"Tool execution failed: {e}")
        return ToolResult(success=False, result=None, error=str(e))
@staticmethod
def _sanitize_file_name(name: Optional[str], default_name: str, force_ext: Optional[str] = None) -> str:
raw = (name or default_name).strip() or default_name
base = PurePath(raw).name
if not base:
base = default_name
if force_ext and not base.lower().endswith(force_ext):
base = f"{base}{force_ext}"
return base
@staticmethod
def _b64_from_bytes(data: bytes) -> str:
return base64.b64encode(data).decode("utf-8")
@staticmethod
def _bytes_from_b64(value: str) -> bytes:
return base64.b64decode(value)
@staticmethod
def _normalize_rows(rows: Any, headers: Optional[List[str]] = None) -> List[List[Any]]:
if not isinstance(rows, list):
return []
out: List[List[Any]] = []
for row in rows:
if isinstance(row, list):
out.append(row)
elif isinstance(row, dict):
keys = headers or list(row.keys())
out.append([row.get(k, "") for k in keys])
else:
out.append([row])
return out
@staticmethod
def _append_sheet_data(ws: Any, headers: List[str], rows: List[List[Any]]) -> None:
if headers:
ws.append(headers)
for row in rows:
ws.append(row)
async def _file_tool(self, args: Dict[str, Any]) -> ToolResult:
    """Route a file_tool call to the handler matching its 'action' argument."""
    action = str((args or {}).get("action") or "").strip().lower()
    if not action:
        return ToolResult(success=False, result=None, error="Missing action")
    # Keep this table in sync with the "action" enum in TOOL_DEFINITIONS.
    dispatch = {
        "excel_create": self._file_excel_create,
        "excel_update": self._file_excel_update,
        "csv_create": self._file_csv_create,
        "csv_update": self._file_csv_update,
        "json_export": self._file_json_export,
        "yaml_export": self._file_yaml_export,
        "zip_bundle": self._file_zip_bundle,
        "docx_create": self._file_docx_create,
        "docx_update": self._file_docx_update,
        "pdf_merge": self._file_pdf_merge,
        "pdf_split": self._file_pdf_split,
        "pdf_fill": self._file_pdf_fill,
    }
    handler = dispatch.get(action)
    if handler is None:
        return ToolResult(success=False, result=None, error=f"Action not implemented yet: {action}")
    return handler(args)
def _file_csv_create(self, args: Dict[str, Any]) -> ToolResult:
    """Build a new CSV file from headers/rows and return it base64-encoded."""
    file_name = self._sanitize_file_name(args.get("file_name"), "export.csv", force_ext=".csv")
    headers = args.get("headers") or []
    rows_raw = args.get("rows") or []
    rows = self._normalize_rows(rows_raw, headers=headers if headers else None)
    # When rows are dicts and no headers were supplied, derive headers
    # from the first row's keys and re-project every row onto them.
    if rows and not headers and isinstance(rows_raw[0], dict):
        headers = list(rows_raw[0].keys())
        rows = self._normalize_rows(rows_raw, headers=headers)
    buffer = StringIO(newline="")
    csv.writer(buffer).writerows(([headers] if headers else []) + rows)
    payload = buffer.getvalue().encode("utf-8")
    return ToolResult(
        success=True,
        result={"message": f"CSV created: {file_name}"},
        file_base64=self._b64_from_bytes(payload),
        file_name=file_name,
        file_mime="text/csv",
    )
def _file_csv_update(self, args: Dict[str, Any]) -> ToolResult:
    """Append rows to (or fully replace) an existing base64-encoded CSV."""
    src_b64 = args.get("file_base64")
    if not src_b64:
        return ToolResult(success=False, result=None, error="file_base64 is required for csv_update")
    file_name = self._sanitize_file_name(args.get("file_name"), "updated.csv", force_ext=".csv")
    operation = str(args.get("operation") or "append").strip().lower()
    if operation not in {"append", "replace"}:
        return ToolResult(success=False, result=None, error="operation must be append|replace")
    headers = args.get("headers") or []
    rows_raw = args.get("rows") or []
    rows = self._normalize_rows(rows_raw, headers=headers if headers else None)
    if rows and not headers and isinstance(rows_raw[0], dict):
        # Derive headers from the first dict row, then re-project.
        headers = list(rows_raw[0].keys())
        rows = self._normalize_rows(rows_raw, headers=headers)
    text = self._bytes_from_b64(src_b64).decode("utf-8")
    existing_rows: List[List[str]] = []
    if text.strip():
        existing_rows = [list(r) for r in csv.reader(StringIO(text))]
    if operation == "replace":
        out_rows: List[List[Any]] = ([headers] if headers else []) + rows
    else:
        # append: keep existing content as-is; a header row is only
        # written when the source file was empty.
        prefix = existing_rows if existing_rows else ([headers] if headers else [])
        out_rows = prefix + rows
    buffer = StringIO(newline="")
    csv.writer(buffer).writerows(out_rows)
    payload = buffer.getvalue().encode("utf-8")
    return ToolResult(
        success=True,
        result={"message": f"CSV updated: {file_name}"},
        file_base64=self._b64_from_bytes(payload),
        file_name=file_name,
        file_mime="text/csv",
    )
def _file_json_export(self, args: Dict[str, Any]) -> ToolResult:
    """Serialize JSON-compatible content to a .json artifact.

    Args (inside *args*):
        file_name: Optional output name (".json" extension enforced).
        content: Any JSON-serializable value.
        indent: Optional indent width; defaults to 2 when omitted.
    """
    file_name = self._sanitize_file_name(args.get("file_name"), "export.json", force_ext=".json")
    content = args.get("content")
    raw_indent = args.get("indent")
    # Bug fix: `int(args.get("indent") or 2)` silently turned an explicit
    # indent of 0 into 2; treat only None as "use the default".
    indent = 2 if raw_indent is None else int(raw_indent)
    payload = json.dumps(content, indent=indent, ensure_ascii=False).encode("utf-8")
    return ToolResult(
        success=True,
        result={"message": f"JSON exported: {file_name}"},
        file_base64=self._b64_from_bytes(payload),
        file_name=file_name,
        file_mime="application/json",
    )
def _file_yaml_export(self, args: Dict[str, Any]) -> ToolResult:
    """Serialize content to a YAML artifact, falling back to JSON.

    Prefers PyYAML; when it is unavailable or cannot serialize the
    content, the payload is JSON (which is itself valid YAML).
    """
    file_name = self._sanitize_file_name(args.get("file_name"), "export.yaml", force_ext=".yaml")
    content = args.get("content")
    try:
        import yaml  # optional dependency
        payload = yaml.safe_dump(content, allow_unicode=True, sort_keys=False).encode("utf-8")
    except Exception:
        # Bug fix: previously json.dumps ran unconditionally BEFORE the
        # YAML attempt, so content that YAML can serialize but JSON
        # cannot (e.g. datetime) raised TypeError, and the JSON payload
        # was wasted work whenever YAML succeeded. Now JSON is only the
        # fallback path.
        payload = json.dumps(content, ensure_ascii=False, indent=2).encode("utf-8")
    return ToolResult(
        success=True,
        result={"message": f"YAML exported: {file_name}"},
        file_base64=self._b64_from_bytes(payload),
        file_name=file_name,
        file_mime="application/x-yaml",
    )
def _file_zip_bundle(self, args: Dict[str, Any]) -> ToolResult:
    """Pack entries (base64 files, raw text, or JSON content) into one ZIP."""
    file_name = self._sanitize_file_name(args.get("file_name"), "bundle.zip", force_ext=".zip")
    entries = args.get("entries") or []
    if not isinstance(entries, list) or not entries:
        return ToolResult(success=False, result=None, error="entries must be non-empty array")
    archive = BytesIO()
    with ZipFile(archive, mode="w", compression=ZIP_DEFLATED) as zf:
        for pos, entry in enumerate(entries):
            if not isinstance(entry, dict):
                return ToolResult(success=False, result=None, error=f"entry[{pos}] must be object")
            member_name = self._sanitize_file_name(entry.get("file_name"), f"file_{pos + 1}.bin")
            # Each entry provides exactly one payload form, checked in
            # priority order: base64 bytes, plain text, JSON content.
            if entry.get("file_base64"):
                zf.writestr(member_name, self._bytes_from_b64(entry["file_base64"]))
            elif "text" in entry:
                zf.writestr(member_name, str(entry["text"]).encode("utf-8"))
            elif "content" in entry:
                zf.writestr(member_name, json.dumps(entry["content"], ensure_ascii=False, indent=2).encode("utf-8"))
            else:
                return ToolResult(
                    success=False,
                    result=None,
                    error=f"entry[{pos}] requires file_base64|text|content",
                )
    return ToolResult(
        success=True,
        result={"message": f"ZIP bundle created: {file_name}"},
        file_base64=self._b64_from_bytes(archive.getvalue()),
        file_name=file_name,
        file_mime="application/zip",
    )
def _file_excel_create(self, args: Dict[str, Any]) -> ToolResult:
    """Create an .xlsx workbook from a 'sheets' array or flat headers/rows args."""
    import openpyxl

    file_name = self._sanitize_file_name(args.get("file_name"), "report.xlsx", force_ext=".xlsx")
    wb = openpyxl.Workbook()
    wb.remove(wb.active)  # drop the implicit default sheet; we create our own

    def prep(spec: Dict[str, Any]):
        """Normalize one sheet spec into (headers, rows)."""
        headers = spec.get("headers") or []
        rows_raw = spec.get("rows") or []
        rows = self._normalize_rows(rows_raw, headers=headers if headers else None)
        if rows and not headers and isinstance(rows_raw[0], dict):
            # Dict rows without headers: derive headers from the first
            # row's keys and re-project all rows onto them.
            headers = list(rows_raw[0].keys())
            rows = self._normalize_rows(rows_raw, headers=headers)
        return headers, rows

    sheets = args.get("sheets")
    created = False
    if isinstance(sheets, list) and sheets:
        for idx, sheet in enumerate(sheets, start=1):
            if not isinstance(sheet, dict):
                return ToolResult(success=False, result=None, error=f"sheets[{idx-1}] must be object")
            # Excel limits sheet titles to 31 characters.
            sheet_name = str(sheet.get("name") or f"Sheet{idx}")[:31]
            headers, rows = prep(sheet)
            self._append_sheet_data(wb.create_sheet(title=sheet_name), headers, rows)
            created = True
    else:
        sheet_name = str(args.get("sheet_name") or "Sheet1")[:31]
        headers, rows = prep(args)
        self._append_sheet_data(wb.create_sheet(title=sheet_name), headers, rows)
        created = True
    if not created:
        # A workbook must contain at least one sheet.
        wb.create_sheet(title="Sheet1")
    out = BytesIO()
    wb.save(out)
    return ToolResult(
        success=True,
        result={"message": f"Excel created: {file_name}"},
        file_base64=self._b64_from_bytes(out.getvalue()),
        file_name=file_name,
        file_mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    )
def _file_excel_update(self, args: Dict[str, Any]) -> ToolResult:
    """Apply a list of operations to an existing base64-encoded .xlsx workbook.

    Supported operation types:
      - append_rows:   append rows to a sheet (creating it if missing);
        dict rows are projected onto the sheet's first row as headers.
      - set_cell:      write one value into a named cell (e.g. "B2").
      - replace_sheet: drop and recreate a sheet with new headers/rows.
      - rename_sheet:  retitle an existing sheet.

    Returns the updated workbook as a base64 attachment, or a failed
    ToolResult on the first invalid operation (earlier operations are
    then discarded, since the workbook is never saved).
    """
    import openpyxl
    src_b64 = args.get("file_base64")
    operations = args.get("operations") or []
    if not src_b64:
        return ToolResult(success=False, result=None, error="file_base64 is required for excel_update")
    if not isinstance(operations, list) or not operations:
        return ToolResult(success=False, result=None, error="operations must be non-empty array")
    file_name = self._sanitize_file_name(args.get("file_name"), "updated.xlsx", force_ext=".xlsx")
    wb = openpyxl.load_workbook(filename=BytesIO(self._bytes_from_b64(src_b64)))
    for op in operations:
        if not isinstance(op, dict):
            return ToolResult(success=False, result=None, error="Each operation must be object")
        op_type = str(op.get("type") or "").strip().lower()
        if op_type == "append_rows":
            # Excel limits sheet titles to 31 characters.
            sheet = str(op.get("sheet") or wb.sheetnames[0])[:31]
            if sheet not in wb.sheetnames:
                wb.create_sheet(title=sheet)
            ws = wb[sheet]
            rows_raw = op.get("rows") or []
            # Use the sheet's existing first row as headers for dict rows.
            header_row = [c.value for c in ws[1]] if ws.max_row >= 1 else []
            rows = self._normalize_rows(rows_raw, headers=header_row if header_row else None)
            if rows and not header_row and isinstance(rows_raw[0], dict):
                # Empty sheet + dict rows: write a derived header row first.
                header_row = list(rows_raw[0].keys())
                ws.append(header_row)
                rows = self._normalize_rows(rows_raw, headers=header_row)
            for row in rows:
                ws.append(row)
        elif op_type == "set_cell":
            sheet = str(op.get("sheet") or wb.sheetnames[0])[:31]
            cell = op.get("cell")
            if not cell:
                return ToolResult(success=False, result=None, error="set_cell operation requires 'cell'")
            if sheet not in wb.sheetnames:
                wb.create_sheet(title=sheet)
            wb[sheet][str(cell)] = op.get("value", "")
        elif op_type == "replace_sheet":
            sheet = str(op.get("sheet") or wb.sheetnames[0])[:31]
            if sheet in wb.sheetnames:
                wb.remove(wb[sheet])
            ws = wb.create_sheet(title=sheet)
            headers = op.get("headers") or []
            rows_raw = op.get("rows") or []
            rows = self._normalize_rows(rows_raw, headers=headers if headers else None)
            if rows and not headers and isinstance(rows_raw[0], dict):
                # Derive headers from the first dict row, then re-project.
                headers = list(rows_raw[0].keys())
                rows = self._normalize_rows(rows_raw, headers=headers)
            self._append_sheet_data(ws, headers, rows)
        elif op_type == "rename_sheet":
            src = str(op.get("from") or "")
            dst = str(op.get("to") or "").strip()
            if not src or not dst:
                return ToolResult(success=False, result=None, error="rename_sheet requires 'from' and 'to'")
            if src not in wb.sheetnames:
                return ToolResult(success=False, result=None, error=f"Sheet not found: {src}")
            wb[src].title = dst[:31]
        else:
            return ToolResult(success=False, result=None, error=f"Unsupported excel_update operation: {op_type}")
    out = BytesIO()
    wb.save(out)
    return ToolResult(
        success=True,
        result={"message": f"Excel updated: {file_name}"},
        file_base64=self._b64_from_bytes(out.getvalue()),
        file_name=file_name,
        file_mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    )
def _file_docx_create(self, args: Dict[str, Any]) -> ToolResult:
    """Create a .docx document with an optional title, paragraphs and tables."""
    from docx import Document

    file_name = self._sanitize_file_name(args.get("file_name"), "document.docx", force_ext=".docx")
    doc = Document()
    title = args.get("title")
    if title:
        doc.add_heading(str(title), level=1)
    for paragraph in args.get("paragraphs") or []:
        doc.add_paragraph(str(paragraph))
    for table_spec in args.get("tables") or []:
        if not isinstance(table_spec, dict):
            continue  # silently skip malformed table specs
        headers = [str(h) for h in (table_spec.get("headers") or [])]
        rows_raw = table_spec.get("rows") or []
        rows = self._normalize_rows(rows_raw, headers=headers if headers else None)
        if rows and not headers and isinstance(rows_raw[0], dict):
            # Derive headers from the first dict row, then re-project.
            headers = list(rows_raw[0].keys())
            rows = self._normalize_rows(rows_raw, headers=headers)
        n_rows = len(rows) + (1 if headers else 0)
        n_cols = len(headers) if headers else (len(rows[0]) if rows else 1)
        table = doc.add_table(rows=max(n_rows, 1), cols=max(n_cols, 1))
        offset = 0
        if headers:
            for col, value in enumerate(headers):
                table.cell(0, col).text = str(value)
            offset = 1
        for r, row in enumerate(rows):
            for c, value in enumerate(row):
                # Cells beyond the table width are ignored.
                if c < n_cols:
                    table.cell(r + offset, c).text = str(value)
    out = BytesIO()
    doc.save(out)
    return ToolResult(
        success=True,
        result={"message": f"DOCX created: {file_name}"},
        file_base64=self._b64_from_bytes(out.getvalue()),
        file_name=file_name,
        file_mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    )
def _file_docx_update(self, args: Dict[str, Any]) -> ToolResult:
    """Apply a list of operations to an existing base64-encoded .docx file.

    Supported operation types:
      - append_paragraph: add a plain paragraph at the end.
      - append_heading:   add a heading (level clamped to 1-9).
      - replace_text:     literal substring replace in paragraphs and
        table cells.
      - append_table:     add a table from headers/rows (dict rows are
        projected onto headers derived from the first row when needed).

    Returns the updated document as a base64 attachment, or a failed
    ToolResult on the first invalid operation.
    """
    from docx import Document
    src_b64 = args.get("file_base64")
    operations = args.get("operations") or []
    if not src_b64:
        return ToolResult(success=False, result=None, error="file_base64 is required for docx_update")
    if not isinstance(operations, list) or not operations:
        return ToolResult(success=False, result=None, error="operations must be non-empty array")
    file_name = self._sanitize_file_name(args.get("file_name"), "updated.docx", force_ext=".docx")
    doc = Document(BytesIO(self._bytes_from_b64(src_b64)))
    for op in operations:
        if not isinstance(op, dict):
            return ToolResult(success=False, result=None, error="Each operation must be object")
        op_type = str(op.get("type") or "").strip().lower()
        if op_type == "append_paragraph":
            doc.add_paragraph(str(op.get("text") or ""))
        elif op_type == "append_heading":
            level = int(op.get("level") or 1)
            # Word supports heading levels 1-9 only.
            level = max(1, min(level, 9))
            doc.add_heading(str(op.get("text") or ""), level=level)
        elif op_type == "replace_text":
            old = str(op.get("old") or "")
            new = str(op.get("new") or "")
            if not old:
                return ToolResult(success=False, result=None, error="replace_text requires old")
            # NOTE(review): assigning p.text rewrites the paragraph's runs,
            # which presumably drops character-level formatting - confirm.
            for p in doc.paragraphs:
                if old in p.text:
                    p.text = p.text.replace(old, new)
            # Also walk every table cell for occurrences.
            for table in doc.tables:
                for row in table.rows:
                    for cell in row.cells:
                        if old in cell.text:
                            cell.text = cell.text.replace(old, new)
        elif op_type == "append_table":
            headers = [str(h) for h in (op.get("headers") or [])]
            rows_raw = op.get("rows") or []
            rows = self._normalize_rows(rows_raw, headers=headers if headers else None)
            if rows and not headers and isinstance(rows_raw[0], dict):
                # Derive headers from the first dict row, then re-project.
                headers = list(rows_raw[0].keys())
                rows = self._normalize_rows(rows_raw, headers=headers)
            total_rows = len(rows) + (1 if headers else 0)
            total_cols = len(headers) if headers else (len(rows[0]) if rows else 1)
            t = doc.add_table(rows=max(total_rows, 1), cols=max(total_cols, 1))
            row_offset = 0
            if headers:
                for idx, value in enumerate(headers):
                    t.cell(0, idx).text = str(value)
                row_offset = 1
            for ridx, row in enumerate(rows):
                for cidx, value in enumerate(row):
                    # Cells beyond the table width are ignored.
                    if cidx < total_cols:
                        t.cell(ridx + row_offset, cidx).text = str(value)
        else:
            return ToolResult(success=False, result=None, error=f"Unsupported docx_update operation: {op_type}")
    out = BytesIO()
    doc.save(out)
    return ToolResult(
        success=True,
        result={"message": f"DOCX updated: {file_name}"},
        file_base64=self._b64_from_bytes(out.getvalue()),
        file_name=file_name,
        file_mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    )
def _file_pdf_merge(self, args: Dict[str, Any]) -> ToolResult:
    """Concatenate several base64-encoded PDFs into a single document."""
    from pypdf import PdfReader, PdfWriter

    file_name = self._sanitize_file_name(args.get("file_name"), "merged.pdf", force_ext=".pdf")
    files = args.get("files") or []
    if not isinstance(files, list) or not files:
        return ToolResult(success=False, result=None, error="files must be non-empty array for pdf_merge")
    writer = PdfWriter()
    page_count = 0
    for item in files:
        if not isinstance(item, dict) or not item.get("file_base64"):
            return ToolResult(success=False, result=None, error="Each file entry must include file_base64")
        source = PdfReader(BytesIO(self._bytes_from_b64(item["file_base64"])))
        for page in source.pages:
            writer.add_page(page)
            page_count += 1
    merged = BytesIO()
    writer.write(merged)
    return ToolResult(
        success=True,
        result={"message": f"PDF merged: {file_name} ({page_count} pages)"},
        file_base64=self._b64_from_bytes(merged.getvalue()),
        file_name=file_name,
        file_mime="application/pdf",
    )
@staticmethod
def _parse_split_pages(pages: Any) -> Optional[List[int]]:
if not isinstance(pages, list) or not pages:
return None
parsed: List[int] = []
for p in pages:
idx = int(p)
if idx < 1:
return None
parsed.append(idx)
return sorted(set(parsed))
def _file_pdf_split(self, args: Dict[str, Any]) -> ToolResult:
    """Split a PDF into per-page files (or caller-defined page groups), zipped."""
    from pypdf import PdfReader, PdfWriter

    src_b64 = args.get("file_base64")
    if not src_b64:
        return ToolResult(success=False, result=None, error="file_base64 is required for pdf_split")
    file_name = self._sanitize_file_name(args.get("file_name"), "split.zip", force_ext=".zip")
    reader = PdfReader(BytesIO(self._bytes_from_b64(src_b64)))
    total = len(reader.pages)
    if total == 0:
        return ToolResult(success=False, result=None, error="Input PDF has no pages")
    groups = args.get("groups")
    if groups:
        if not isinstance(groups, list):
            return ToolResult(success=False, result=None, error="groups must be array")
        split_groups = []
        for pos, grp in enumerate(groups, start=1):
            if not isinstance(grp, dict):
                return ToolResult(success=False, result=None, error="Each group must be object")
            gname = self._sanitize_file_name(grp.get("file_name"), f"part_{pos}.pdf", force_ext=".pdf")
            pages = self._parse_split_pages(grp.get("pages"))
            if not pages:
                return ToolResult(success=False, result=None, error=f"Invalid pages in group {pos}")
            split_groups.append((gname, pages))
    else:
        # Default: one single-page PDF per source page.
        split_groups = [(f"page_{num}.pdf", [num]) for num in range(1, total + 1)]
    archive = BytesIO()
    with ZipFile(archive, mode="w", compression=ZIP_DEFLATED) as zf:
        for gname, pages in split_groups:
            writer = PdfWriter()
            for page_no in pages:
                if page_no > total:
                    return ToolResult(success=False, result=None, error=f"Page out of range: {page_no} > {total}")
                writer.add_page(reader.pages[page_no - 1])
            part = BytesIO()
            writer.write(part)
            zf.writestr(gname, part.getvalue())
    return ToolResult(
        success=True,
        result={"message": f"PDF split into {len(split_groups)} file(s): {file_name}"},
        file_base64=self._b64_from_bytes(archive.getvalue()),
        file_name=file_name,
        file_mime="application/zip",
    )
def _file_pdf_fill(self, args: Dict[str, Any]) -> ToolResult:
    """Fill AcroForm fields in a PDF; returns the document as-is when filling fails."""
    from pypdf import PdfReader, PdfWriter

    src_b64 = args.get("file_base64")
    fields = args.get("fields") or {}
    if not src_b64:
        return ToolResult(success=False, result=None, error="file_base64 is required for pdf_fill")
    if not isinstance(fields, dict) or not fields:
        return ToolResult(success=False, result=None, error="fields must be a non-empty object")
    file_name = self._sanitize_file_name(args.get("file_name"), "filled.pdf", force_ext=".pdf")
    reader = PdfReader(BytesIO(self._bytes_from_b64(src_b64)))
    writer = PdfWriter()
    writer.append(reader)
    filled = True
    try:
        for page in writer.pages:
            writer.update_page_form_field_values(page, fields)
        # Ask viewers to regenerate field appearances so values render.
        if hasattr(writer, "set_need_appearances_writer"):
            writer.set_need_appearances_writer(True)
    except Exception:
        # Best-effort: any failure (typically no form in the document)
        # downgrades to "returned unchanged" instead of erroring out.
        filled = False
    out = BytesIO()
    writer.write(out)
    if filled:
        message = f"PDF form filled: {file_name}"
    else:
        message = f"PDF has no fillable form fields, returned unchanged: {file_name}"
    return ToolResult(
        success=True,
        result={"message": message},
        file_base64=self._b64_from_bytes(out.getvalue()),
        file_name=file_name,
        file_mime="application/pdf",
    )
async def _memory_search(self, args: Dict, agent_id: str = None, chat_id: str = None, user_id: str = None) -> ToolResult:
"""Search in Qdrant vector memory using Router's memory_retrieval - PRIORITY 1"""
query = args.get("query")
try:
# Use Router's memory_retrieval pipeline directly (has Qdrant connection)
from memory_retrieval import memory_retrieval
if memory_retrieval and memory_retrieval.qdrant_client:
results = await memory_retrieval.search_memories(
query=query,
agent_id=agent_id or "helion",
chat_id=chat_id,
user_id=user_id,
limit=5
)
if results:
formatted = []
for r in results:
text = r.get("text", "")
score = r.get("score", 0)
mem_type = r.get("type", "memory")
if text:
formatted.append(f"• [{mem_type}] {text[:200]}... (релевантність: {score:.2f})")
if formatted:
return ToolResult(success=True, result=f"🧠 Знайдено в пам'яті:\n" + "\n".join(formatted))
return ToolResult(success=True, result="🧠 В моїй пам'яті немає інформації про це.")
else:
return ToolResult(success=True, result="🧠 Пам'ять недоступна, спробую web_search.")
except Exception as e:
logger.warning(f"Memory search error: {e}")
return ToolResult(success=True, result="🧠 Не вдалося перевірити пам'ять. Спробую інші джерела.")
async def _web_search(self, args: Dict) -> ToolResult:
"""Execute web search - PRIORITY 2 (use after memory_search)"""
query = args.get("query")
max_results = args.get("max_results", 5)
if not query:
return ToolResult(success=False, result=None, error="query is required")
try:
resp = await self.http_client.post(
f"{self.swapper_url}/web/search",
json={"query": query, "max_results": max_results}
)
if resp.status_code == 200:
data = resp.json()
results = data.get("results", []) or []
query_terms = {t for t in str(query).lower().replace("/", " ").replace("-", " ").split() if len(t) > 2}
trusted_domains = {
"wikipedia.org", "wikidata.org", "europa.eu", "fao.org", "who.int",
"worldbank.org", "oecd.org", "un.org", "gov.ua", "rada.gov.ua",
"kmu.gov.ua", "minagro.gov.ua", "agroportal.ua", "latifundist.com",
}
low_signal_domains = {
"pinterest.com", "tiktok.com", "instagram.com", "facebook.com",
"youtube.com", "yandex.", "vk.com",
}
def _extract_domain(url: str) -> str:
if not url:
return ""
u = url.lower().strip()
u = u.replace("https://", "").replace("http://", "")
u = u.split("/")[0]
if u.startswith("www."):
u = u[4:]
return u
def _overlap_score(title: str, snippet: str, url: str) -> int:
text = " ".join([title or "", snippet or "", url or ""]).lower()
score = 0
for t in query_terms:
if t in text:
score += 2
return score
ranked: List[Any] = []
for r in results:
title = str(r.get("title", "") or "")
snippet = str(r.get("snippet", "") or "")
url = str(r.get("url", "") or "")
domain = _extract_domain(url)
score = _overlap_score(title, snippet, url)
if any(x in domain for x in low_signal_domains):
score -= 2
if any(domain == d or domain.endswith("." + d) for d in trusted_domains):
score += 2
if not snippet:
score -= 1
if len(title.strip()) < 5:
score -= 1
ranked.append((score, r))
ranked.sort(key=lambda x: x[0], reverse=True)
selected = [item for _, item in ranked[:max_results]]
logger.info(f"🔎 web_search rerank: raw={len(results)} ranked={len(selected)} query='{query[:120]}'")
formatted = []
for r in selected:
formatted.append(
f"- {r.get('title', 'No title')}\n {r.get('snippet', '')}\n URL: {r.get('url', '')}"
)
return ToolResult(success=True, result="\n".join(formatted) if formatted else "Нічого не знайдено")
else:
return ToolResult(success=False, result=None, error=f"Search failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _web_extract(self, args: Dict) -> ToolResult:
"""Extract content from URL"""
url = args.get("url")
try:
resp = await self.http_client.post(
f"{self.swapper_url}/web/extract",
json={"url": url}
)
if resp.status_code == 200:
data = resp.json()
content = data.get("content", "")
# Truncate if too long
if len(content) > 4000:
content = content[:4000] + "\n... (текст обрізано)"
return ToolResult(success=True, result=content)
else:
return ToolResult(success=False, result=None, error=f"Extract failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _unload_ollama_models(self):
"""Unload all Ollama models to free VRAM for heavy operations like FLUX"""
ollama_url = os.getenv("OLLAMA_BASE_URL", "http://172.18.0.1:11434")
models_to_unload = ["qwen3:8b", "qwen3-vl:8b"]
for model in models_to_unload:
try:
await self.http_client.post(
f"{ollama_url}/api/generate",
json={"model": model, "keep_alive": 0},
timeout=5.0
)
logger.info(f"🧹 Unloaded Ollama model: {model}")
except Exception as e:
logger.debug(f"Could not unload {model}: {e}")
# Give GPU time to release memory
import asyncio
await asyncio.sleep(1)
async def _unload_flux(self):
"""Unload FLUX model after image generation to free VRAM"""
try:
# Try to unload flux-klein-4b model
await self.http_client.post(
f"{self.swapper_url}/image/models/flux-klein-4b/unload",
timeout=10.0
)
logger.info("🧹 Unloaded FLUX model from Swapper")
except Exception as e:
logger.debug(f"Could not unload FLUX: {e}")
async def _image_generate(self, args: Dict) -> ToolResult:
"""Backward-compatible image generation entrypoint routed to Comfy (NODE3)."""
if not args.get("prompt"):
return ToolResult(success=False, result=None, error="prompt is required")
comfy_args = dict(args)
comfy_args.setdefault("negative_prompt", "blurry, low quality, watermark")
comfy_args.setdefault("steps", 28)
comfy_args.setdefault("timeout_s", 180)
return await self._comfy_generate_image(comfy_args)
async def _poll_comfy_job(self, job_id: str, timeout_s: int = 180) -> Dict[str, Any]:
"""Poll Comfy Agent job status until terminal state or timeout."""
loop = asyncio.get_running_loop()
deadline = loop.time() + max(10, timeout_s)
delay = 1.0
last: Dict[str, Any] = {}
while loop.time() < deadline:
resp = await self.http_client.get(f"{self.comfy_agent_url}/status/{job_id}", timeout=30.0)
if resp.status_code != 200:
raise RuntimeError(f"Comfy status failed: {resp.status_code}")
data = resp.json()
last = data
status = (data.get("status") or "").lower()
if status in {"succeeded", "finished"}:
return data
if status in {"failed", "canceled", "cancelled", "expired"}:
return data
await asyncio.sleep(delay)
delay = min(delay * 1.5, 5.0)
raise TimeoutError(f"Comfy job timeout after {timeout_s}s (job_id={job_id})")
async def _comfy_generate_image(self, args: Dict) -> ToolResult:
"""Generate image via Comfy Agent on NODE3 and return URL when ready."""
prompt = args.get("prompt")
if not prompt:
return ToolResult(success=False, result=None, error="prompt is required")
payload = {
"prompt": prompt,
"negative_prompt": args.get("negative_prompt", "blurry, low quality, watermark"),
"width": int(args.get("width", 512)),
"height": int(args.get("height", 512)),
"steps": int(args.get("steps", 28)),
}
if args.get("seed") is not None:
payload["seed"] = int(args["seed"])
timeout_s = int(args.get("timeout_s", 180))
idem_key = args.get("idempotency_key") or f"router-{uuid.uuid4().hex}"
try:
resp = await self.http_client.post(
f"{self.comfy_agent_url}/generate/image",
json=payload,
headers={"Idempotency-Key": idem_key},
timeout=30.0,
)
if resp.status_code != 200:
return ToolResult(success=False, result=None, error=f"Comfy image request failed: {resp.status_code}")
created = resp.json()
job_id = created.get("job_id")
if not job_id:
return ToolResult(success=False, result=None, error="Comfy image request did not return job_id")
final = await self._poll_comfy_job(job_id, timeout_s=timeout_s)
status = (final.get("status") or "").lower()
if status in {"succeeded", "finished"}:
result_url = final.get("result_url")
if result_url:
return ToolResult(success=True, result=f"✅ Зображення згенеровано: {result_url}")
return ToolResult(success=True, result=f"✅ Зображення згенеровано. job_id={job_id}")
return ToolResult(success=False, result=None, error=final.get("error") or f"Comfy image failed (status={status})")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _comfy_generate_video(self, args: Dict) -> ToolResult:
"""Generate video via Comfy Agent on NODE3 and return URL when ready."""
prompt = args.get("prompt")
if not prompt:
return ToolResult(success=False, result=None, error="prompt is required")
payload = {
"prompt": prompt,
"seconds": int(args.get("seconds", 4)),
"fps": int(args.get("fps", 24)),
"steps": int(args.get("steps", 30)),
}
if args.get("seed") is not None:
payload["seed"] = int(args["seed"])
timeout_s = int(args.get("timeout_s", 300))
idem_key = args.get("idempotency_key") or f"router-{uuid.uuid4().hex}"
try:
resp = await self.http_client.post(
f"{self.comfy_agent_url}/generate/video",
json=payload,
headers={"Idempotency-Key": idem_key},
timeout=30.0,
)
if resp.status_code != 200:
return ToolResult(success=False, result=None, error=f"Comfy video request failed: {resp.status_code}")
created = resp.json()
job_id = created.get("job_id")
if not job_id:
return ToolResult(success=False, result=None, error="Comfy video request did not return job_id")
final = await self._poll_comfy_job(job_id, timeout_s=timeout_s)
status = (final.get("status") or "").lower()
if status in {"succeeded", "finished"}:
result_url = final.get("result_url")
if result_url:
return ToolResult(success=True, result=f"✅ Відео згенеровано: {result_url}")
return ToolResult(success=True, result=f"✅ Відео згенеровано. job_id={job_id}")
return ToolResult(success=False, result=None, error=final.get("error") or f"Comfy video failed (status={status})")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _graph_query(self, args: Dict, agent_id: str = None) -> ToolResult:
"""Query knowledge graph"""
query = args.get("query")
entity_type = args.get("entity_type")
# Simple natural language to Cypher conversion
cypher = f"""
MATCH (n)
WHERE toLower(n.name) CONTAINS toLower('{query}')
OR toLower(toString(n)) CONTAINS toLower('{query}')
RETURN labels(n)[0] as type, n.name as name, n.node_id as id
LIMIT 10
"""
if entity_type:
cypher = f"""
MATCH (n:{entity_type})
WHERE toLower(n.name) CONTAINS toLower('{query}')
RETURN n.name as name, n.node_id as id
LIMIT 10
"""
try:
# Execute via Router's graph endpoint
resp = await self.http_client.post(
"http://localhost:8000/v1/graph/query",
json={"query": cypher}
)
if resp.status_code == 200:
data = resp.json()
return ToolResult(success=True, result=json.dumps(data.get("results", []), ensure_ascii=False))
else:
return ToolResult(success=False, result=None, error=f"Graph query failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _remember_fact(self, args: Dict, agent_id: str = None, chat_id: str = None, user_id: str = None) -> ToolResult:
"""Store a fact in memory with strict args validation."""
if not isinstance(args, dict) or not args:
logger.warning("⚠️ remember_fact blocked: empty args")
return ToolResult(success=False, result=None, error="invalid_tool_args: remember_fact requires {fact: <non-empty string>}.")
fact_raw = args.get("fact")
if fact_raw is None:
fact_raw = args.get("text")
if not isinstance(fact_raw, str):
logger.warning("⚠️ remember_fact blocked: fact/text must be string")
return ToolResult(success=False, result=None, error="invalid_tool_args: fact/text must be string.")
fact = fact_raw.strip()
if not fact:
logger.warning("⚠️ remember_fact blocked: empty fact/text")
return ToolResult(success=False, result=None, error="invalid_tool_args: fact/text must be non-empty.")
if len(fact) > 2000:
logger.warning("⚠️ remember_fact blocked: fact too long (%s)", len(fact))
return ToolResult(success=False, result=None, error="invalid_tool_args: fact/text is too long (max 2000 chars).")
category = str(args.get("category") or "general").strip() or "general"
runtime_user_id = (str(user_id or "").strip() or str(args.get("user_id") or "").strip() or str(args.get("about") or "").strip())
if not runtime_user_id:
logger.warning("⚠️ remember_fact blocked: missing runtime user_id")
return ToolResult(success=False, result=None, error="invalid_tool_args: missing runtime user_id for memory write.")
fact_hash = hashlib.sha1(fact.encode("utf-8")).hexdigest()[:12]
fact_key = f"{category}_{fact_hash}"
try:
resp = await self.http_client.post(
"http://memory-service:8000/facts/upsert",
json={
"user_id": runtime_user_id,
"fact_key": fact_key,
"fact_value": fact,
"fact_value_json": {
"text": fact,
"category": category,
"about": runtime_user_id,
"agent_id": agent_id,
"chat_id": chat_id,
"source": "remember_fact_tool",
},
},
)
if resp.status_code in [200, 201]:
return ToolResult(success=True, result=f"✅ Запам'ятовано факт ({category})")
return ToolResult(success=False, result=None, error=f"memory_store_failed:{resp.status_code}:{resp.text[:160]}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _presentation_create(self, args: Dict) -> ToolResult:
"""Create a presentation via Presentation Renderer"""
title = args.get("title", "Презентація")
slides = args.get("slides", [])
brand_id = args.get("brand_id", "energyunion")
theme_version = args.get("theme_version", "v1.0.0")
language = args.get("language", "uk")
# Build SlideSpec
slidespec = {
"meta": {
"title": title,
"brand_id": brand_id,
"theme_version": theme_version,
"language": language
},
"slides": []
}
# Add title slide
slidespec["slides"].append({
"type": "title",
"title": title
})
# Add content slides
for slide in slides:
slide_obj = {
"type": "content",
"title": slide.get("title", ""),
"body": slide.get("content", "")
}
slidespec["slides"].append(slide_obj)
try:
renderer_url = os.getenv("PRESENTATION_RENDERER_URL", "http://presentation-renderer:9600")
resp = await self.http_client.post(
f"{renderer_url}/present/render",
json=slidespec,
timeout=120.0
)
if resp.status_code == 200:
data = resp.json()
job_id = data.get("job_id")
artifact_id = data.get("artifact_id")
return ToolResult(
success=True,
result=f"📊 Презентацію створено!\n\n🆔 Job ID: `{job_id}`\n📦 Artifact ID: `{artifact_id}`\n\nЩоб перевірити статус: використай presentation_status\nЩоб завантажити: використай presentation_download"
)
else:
error_text = resp.text[:200] if resp.text else "Unknown error"
return ToolResult(success=False, result=None, error=f"Render failed ({resp.status_code}): {error_text}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _presentation_status(self, args: Dict) -> ToolResult:
"""Check presentation job status"""
job_id = args.get("job_id")
try:
registry_url = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9700")
resp = await self.http_client.get(
f"{registry_url}/jobs/{job_id}",
timeout=10.0
)
if resp.status_code == 200:
data = resp.json()
status = data.get("status", "unknown")
artifact_id = data.get("artifact_id")
error = data.get("error_text", "")
status_emoji = {"queued": "", "running": "🔄", "done": "", "failed": ""}.get(status, "")
result = f"{status_emoji} Статус: **{status}**\n"
if artifact_id:
result += f"📦 Artifact ID: `{artifact_id}`\n"
if status == "done":
result += "\n✅ Презентація готова! Використай presentation_download щоб отримати файл."
if status == "failed" and error:
result += f"\n❌ Помилка: {error[:200]}"
return ToolResult(success=True, result=result)
elif resp.status_code == 404:
return ToolResult(success=False, result=None, error="Job not found")
else:
return ToolResult(success=False, result=None, error=f"Status check failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _presentation_download(self, args: Dict) -> ToolResult:
"""Get download link for presentation"""
artifact_id = args.get("artifact_id")
file_format = args.get("format", "pptx")
try:
registry_url = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9700")
resp = await self.http_client.get(
f"{registry_url}/artifacts/{artifact_id}/download?format={file_format}",
timeout=10.0,
follow_redirects=False
)
if resp.status_code in [200, 302, 307]:
# Check for signed URL in response or Location header
if resp.status_code in [302, 307]:
download_url = resp.headers.get("Location")
else:
data = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {}
download_url = data.get("download_url") or data.get("url")
if download_url:
return ToolResult(
success=True,
result=f"📥 **Посилання для завантаження ({file_format.upper()}):**\n\n{download_url}\n\n⏰ Посилання дійсне 30 хвилин."
)
else:
# Direct binary response - artifact available
return ToolResult(
success=True,
result=f"✅ Файл {file_format.upper()} готовий! Завантажити можна через: {registry_url}/artifacts/{artifact_id}/download?format={file_format}"
)
elif resp.status_code == 404:
return ToolResult(success=False, result=None, error=f"Формат {file_format.upper()} ще не готовий. Спробуй пізніше.")
else:
return ToolResult(success=False, result=None, error=f"Download failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _crawl4ai_scrape(self, args: Dict) -> ToolResult:
"""Deep scrape a web page using Crawl4AI - PRIORITY 5"""
url = args.get("url")
extract_links = args.get("extract_links", True)
extract_images = args.get("extract_images", False)
if not url:
return ToolResult(success=False, result=None, error="URL is required")
try:
crawl4ai_url = os.getenv("CRAWL4AI_URL", "http://dagi-crawl4ai-node1:11235")
payload = {
"urls": [url],
"priority": 5,
"session_id": f"agent_scrape_{hash(url) % 10000}"
}
resp = await self.http_client.post(
f"{crawl4ai_url}/crawl",
json=payload,
timeout=60.0
)
if resp.status_code == 200:
data = resp.json()
results = data.get("results", []) if isinstance(data, dict) else []
if not results and isinstance(data, dict):
results = [data]
if results:
result = results[0] if isinstance(results, list) else results
markdown = result.get("markdown", "") or result.get("cleaned_html", "") or result.get("text", "")
title = result.get("title", url)
if len(markdown) > 3000:
markdown = markdown[:3000] + "... (скорочено)"
response_parts = [f"**{title}**", "", markdown]
if extract_links:
links = result.get("links", [])
if links:
response_parts.append("")
response_parts.append("**Посилання:**")
for link in links[:10]:
if isinstance(link, dict):
link_url = link.get("href", "")
else:
link_url = str(link)
if link_url:
response_parts.append(f"- {link_url}")
return ToolResult(success=True, result="\n".join(response_parts))
else:
return ToolResult(success=False, result=None, error="No content extracted")
else:
return ToolResult(success=False, result=None, error=f"Crawl failed: {resp.status_code}")
except Exception as e:
logger.error(f"Crawl4AI scrape failed: {e}")
return ToolResult(success=False, result=None, error=str(e))
async def _tts_speak(self, args: Dict) -> ToolResult:
"""Convert text to speech using Swapper TTS - PRIORITY 6"""
text = args.get("text")
language = args.get("language", "uk")
if not text:
return ToolResult(success=False, result=None, error="Text is required")
try:
if len(text) > 1000:
text = text[:1000]
resp = await self.http_client.post(
f"{self.swapper_url}/tts",
json={"text": text, "language": language},
timeout=60.0
)
if resp.status_code == 200:
data = resp.json()
audio_url = data.get("audio_url") or data.get("url")
if audio_url:
return ToolResult(success=True, result=f"Аудіо: {audio_url}")
else:
return ToolResult(success=True, result="TTS completed")
else:
return ToolResult(success=False, result=None, error=f"TTS failed: {resp.status_code}")
except Exception as e:
logger.error(f"TTS failed: {e}")
return ToolResult(success=False, result=None, error=str(e))
async def _market_data(self, args: Dict) -> ToolResult:
"""Query real-time market data from Market Data Service and SenpAI MD Consumer."""
symbol = str(args.get("symbol", "BTCUSDT")).upper()
query_type = str(args.get("query_type", "all")).lower()
md_url = os.getenv("MARKET_DATA_URL", "http://dagi-market-data-node1:8891")
consumer_url = os.getenv("SENPAI_CONSUMER_URL", "http://dagi-senpai-md-consumer-node1:8892")
results: Dict[str, Any] = {}
try:
async with httpx.AsyncClient(timeout=8.0) as client:
if query_type in ("price", "all"):
try:
resp = await client.get(f"{md_url}/latest", params={"symbol": symbol})
if resp.status_code == 200:
data = resp.json()
trade = data.get("latest_trade", {}) or {}
quote = data.get("latest_quote", {}) or {}
bid = quote.get("bid")
ask = quote.get("ask")
spread = None
if isinstance(bid, (int, float)) and isinstance(ask, (int, float)):
spread = round(ask - bid, 6)
results["price"] = {
"symbol": symbol,
"last_price": trade.get("price"),
"size": trade.get("size"),
"side": trade.get("side"),
"bid": bid,
"ask": ask,
"spread": spread,
"provider": trade.get("provider"),
"timestamp": trade.get("ts_recv"),
}
else:
results["price_error"] = f"market-data status={resp.status_code}"
except Exception as e:
results["price_error"] = str(e)
if query_type in ("features", "all"):
try:
resp = await client.get(f"{consumer_url}/features/latest", params={"symbol": symbol})
if resp.status_code == 200:
data = resp.json()
feats = data.get("features", {}) or {}
results["features"] = {
"symbol": symbol,
"mid_price": feats.get("mid"),
"spread_bps": round(float(feats.get("spread_bps", 0) or 0), 2),
"vwap_10s": round(float(feats.get("trade_vwap_10s", 0) or 0), 2),
"vwap_60s": round(float(feats.get("trade_vwap_60s", 0) or 0), 2),
"trade_count_10s": int(feats.get("trade_count_10s", 0) or 0),
"trade_volume_10s": round(float(feats.get("trade_volume_10s", 0) or 0), 4),
"return_10s_pct": round(float(feats.get("return_10s", 0) or 0) * 100, 4),
"realized_vol_60s_pct": round(float(feats.get("realized_vol_60s", 0) or 0) * 100, 6),
"latency_p50_ms": round(float(feats.get("latency_ms_p50", 0) or 0), 1),
"latency_p95_ms": round(float(feats.get("latency_ms_p95", 0) or 0), 1),
}
else:
results["features_error"] = f"senpai-consumer status={resp.status_code}"
except Exception as e:
results["features_error"] = str(e)
if not results:
return ToolResult(success=False, result=None, error=f"No market data for {symbol}")
return ToolResult(success=True, result=json.dumps(results, ensure_ascii=False))
except Exception as e:
logger.error(f"Market data tool error: {e}")
return ToolResult(success=False, result=None, error=str(e))
async def close(self):
await self.http_client.aclose()
def _strip_think_tags(text: str) -> str:
"""Remove <think>...</think> tags from DeepSeek responses."""
import re
text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
text = re.sub(r'<think>.*$', '', text, flags=re.DOTALL) # unclosed tag
return text.strip()
def format_tool_calls_for_response(tool_results: List[Dict], fallback_mode: str = "normal") -> str:
"""
Format tool results in human-friendly way - NOT raw data!
Args:
tool_results: List of tool execution results
fallback_mode: "normal" | "dsml_detected" | "empty_response"
"""
# Special handling for DSML detection - LLM tried to use tools but got confused
# If we have successful tool results, show them instead of generic fallback
if fallback_mode == "dsml_detected":
# Check if any tool succeeded with a useful result
if tool_results:
for tr in tool_results:
if tr.get("success") and tr.get("result"):
# Avoid dumping raw retrieval/search payloads to the user.
# These often look like "memory dumps" and are perceived as incorrect answers.
tool_name = (tr.get("name") or "").strip()
if tool_name in {"memory_search", "web_search", "web_extract", "web_read"}:
continue
result = str(tr.get("result", ""))
if result and len(result) > 10 and "error" not in result.lower():
# We have a useful tool result - use it!
if len(result) > 600:
return result[:600] + "..."
return result
# No useful tool results - give presence acknowledgment
return "Вибач, відповідь згенерувалась некоректно. Спробуй ще раз (коротше/конкретніше) або повтори питання одним реченням."
if not tool_results:
if fallback_mode == "empty_response":
return "Вибач, щось пішло не так. Спробуй ще раз."
return "Вибач, не вдалося виконати запит."
# Check what tools were used
tool_names = [tr.get("name", "") for tr in tool_results]
# Check if ANY tool succeeded
any_success = any(tr.get("success") for tr in tool_results)
if not any_success:
# All tools failed - give helpful message
errors = [tr.get("error", "unknown") for tr in tool_results if tr.get("error")]
if errors:
logger.warning(f"All tools failed: {errors}")
return "Вибач, виникла технічна проблема. Спробуй ще раз або переформулюй питання."
# Image generation - special handling
if "image_generate" in tool_names:
for tr in tool_results:
if tr.get("name") == "image_generate" and tr.get("success"):
return "✅ Зображення згенеровано!"
if "comfy_generate_image" in tool_names:
for tr in tool_results:
if tr.get("name") == "comfy_generate_image" and tr.get("success"):
return str(tr.get("result", "✅ Зображення згенеровано через ComfyUI"))
if "comfy_generate_video" in tool_names:
for tr in tool_results:
if tr.get("name") == "comfy_generate_video" and tr.get("success"):
return str(tr.get("result", "✅ Відео згенеровано через ComfyUI"))
# Web search - show actual results to user
if "web_search" in tool_names:
for tr in tool_results:
if tr.get("name") == "web_search":
if tr.get("success"):
result = tr.get("result", "")
if not result:
return "🔍 Не знайшов релевантної інформації в інтернеті."
# Parse and format results for user
lines = result.strip().split("\n")
formatted = ["🔍 **Результати пошуку:**\n"]
current_title = ""
current_url = ""
current_snippet = ""
count = 0
for line in lines:
line = line.strip()
if line.startswith("- ") and not line.startswith("- URL:"):
if current_title and count < 3: # Show max 3 results
formatted.append(f"**{count}. {current_title}**")
if current_snippet:
formatted.append(f" {current_snippet[:150]}...")
if current_url:
formatted.append(f" 🔗 {current_url}\n")
current_title = line[2:].strip()
current_snippet = ""
current_url = ""
count += 1
elif "URL:" in line:
current_url = line.split("URL:")[-1].strip()
elif line and not line.startswith("-"):
current_snippet = line
# Add last result
if current_title and count <= 3:
formatted.append(f"**{count}. {current_title}**")
if current_snippet:
formatted.append(f" {current_snippet[:150]}...")
if current_url:
formatted.append(f" 🔗 {current_url}")
if len(formatted) > 1:
return "\n".join(formatted)
else:
return "🔍 Не знайшов релевантної інформації в інтернеті."
else:
return "🔍 Пошук в інтернеті не вдався. Спробуй ще раз."
# Memory search
if "memory_search" in tool_names:
for tr in tool_results:
if tr.get("name") == "memory_search" and tr.get("success"):
result = tr.get("result", "")
if "немає інформації" in result.lower() or not result:
return "🧠 В моїй пам'яті немає інформації про це."
# Truncate if too long
if len(result) > 500:
return result[:500] + "..."
return result
# Graph query
if "graph_query" in tool_names:
for tr in tool_results:
if tr.get("name") == "graph_query" and tr.get("success"):
result = tr.get("result", "")
if not result or "не знайдено" in result.lower():
return "📊 В базі знань немає інформації про це."
if len(result) > 500:
return result[:500] + "..."
return result
# Default fallback - check if we have any result to show
for tr in tool_results:
if tr.get("success") and tr.get("result"):
result = str(tr.get("result", ""))
if result and len(result) > 10:
# We have something, show it
if len(result) > 400:
return result[:400] + "..."
return result
# Really nothing useful - be honest
return "Я обробив твій запит, але не знайшов корисної інформації. Можеш уточнити питання?"