"""
Tool Manager for Helion Agent
Implements OpenAI-compatible function calling for DeepSeek, Mistral, Grok
"""
import os
from agent_tools_config import get_agent_tools, is_tool_allowed
import json
import logging
import httpx
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
# Tool definitions in OpenAI function calling format
# ORDER MATTERS: Memory/Graph tools first, then web search as fallback
# OpenAI-compatible function-calling tool schemas exposed to the LLM.
# NOTE: descriptions are user-language (Ukrainian) prompts for the model and
# are part of runtime behavior — do not translate or reword them here.
TOOL_DEFINITIONS: List[Dict[str, Any]] = [
    # PRIORITY 1: Internal knowledge sources (use FIRST)
    {
        "type": "function",
        "function": {
            "name": "memory_search",
            "description": "🔍 ПЕРШИЙ КРОК для пошуку! Шукає в моїй пам'яті: збережені факти, документи, розмови. ЗАВЖДИ використовуй спочатку перед web_search!",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Що шукати в пам'яті"
                    }
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "graph_query",
            "description": "🔍 Пошук в Knowledge Graph - зв'язки між проєктами, людьми, темами Energy Union. Використовуй для питань про проєкти, партнерів, технології.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Що шукати (назва проєкту, людини, теми)"
                    },
                    "entity_type": {
                        "type": "string",
                        # Enum must stay in sync with the label whitelist in
                        # ToolManager._graph_query.
                        "enum": ["User", "Topic", "Project", "Fact"],
                        "description": "Тип сутності для пошуку"
                    }
                },
                "required": ["query"]
            }
        }
    },
    # PRIORITY 2: Web search (use ONLY if memory/graph don't have info)
    {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": "🌐 Пошук в інтернеті. Використовуй ТІЛЬКИ якщо memory_search і graph_query не знайшли потрібної інформації!",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Пошуковий запит"
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Максимальна кількість результатів (1-10)",
                        "default": 5
                    }
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "web_extract",
            "description": "Витягнути текстовий контент з веб-сторінки за URL",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "URL сторінки для читання"
                    }
                },
                "required": ["url"]
            }
        }
    },
    # PRIORITY 3: Generation tools
    {
        "type": "function",
        "function": {
            "name": "image_generate",
            "description": "🎨 Згенерувати зображення за текстовим описом (FLUX)",
            "parameters": {
                "type": "object",
                "properties": {
                    "prompt": {
                        "type": "string",
                        "description": "Опис зображення для генерації (англійською краще)"
                    },
                    "width": {
                        "type": "integer",
                        "description": "Ширина зображення",
                        "default": 512
                    },
                    "height": {
                        "type": "integer",
                        "description": "Висота зображення",
                        "default": 512
                    }
                },
                "required": ["prompt"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "remember_fact",
            "description": "Запам'ятати важливий факт про користувача або тему",
            "parameters": {
                "type": "object",
                "properties": {
                    "fact": {
                        "type": "string",
                        "description": "Факт для запам'ятовування"
                    },
                    "about": {
                        "type": "string",
                        "description": "Про кого/що цей факт (username або тема)"
                    },
                    "category": {
                        "type": "string",
                        "enum": ["personal", "technical", "preference", "project"],
                        "description": "Категорія факту"
                    }
                },
                "required": ["fact", "about"]
            }
        }
    },
    # PRIORITY 4: Document/Presentation tools
    {
        "type": "function",
        "function": {
            "name": "presentation_create",
            "description": "📊 Створити презентацію PowerPoint. Використовуй коли користувач просить 'створи презентацію', 'зроби презентацію', 'підготуй слайди'.",
            "parameters": {
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Назва презентації"
                    },
                    "slides": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "title": {"type": "string", "description": "Заголовок слайду"},
                                "content": {"type": "string", "description": "Контент слайду (markdown)"}
                            }
                        },
                        "description": "Масив слайдів: [{title, content}]"
                    },
                    "brand_id": {
                        "type": "string",
                        "description": "ID бренду для стилю (energyunion, greenfood, nutra)",
                        "default": "energyunion"
                    },
                    "theme_version": {
                        "type": "string",
                        "description": "Версія теми",
                        "default": "v1.0.0"
                    },
                    "language": {
                        "type": "string",
                        "enum": ["uk", "en", "ru"],
                        "description": "Мова презентації",
                        "default": "uk"
                    }
                },
                "required": ["title", "slides"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "presentation_status",
            "description": "📋 Перевірити статус створення презентації за job_id",
            "parameters": {
                "type": "object",
                "properties": {
                    "job_id": {
                        "type": "string",
                        "description": "ID завдання рендерингу"
                    }
                },
                "required": ["job_id"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "presentation_download",
            "description": "📥 Отримати посилання на готову презентацію за artifact_id",
            "parameters": {
                "type": "object",
                "properties": {
                    "artifact_id": {
                        "type": "string",
                        "description": "ID артефакту презентації"
                    },
                    "format": {
                        "type": "string",
                        "enum": ["pptx", "pdf"],
                        "description": "Формат файлу",
                        "default": "pptx"
                    }
                },
                "required": ["artifact_id"]
            }
        }
    },
    # PRIORITY 5: Web Scraping tools
    {
        "type": "function",
        "function": {
            "name": "crawl4ai_scrape",
            "description": "🕷️ Глибокий скрейпінг веб-сторінки через Crawl4AI. Витягує повний контент, структуровані дані, медіа. Використовуй для детального аналізу сайтів.",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "URL сторінки для скрейпінгу"
                    },
                    "extract_links": {
                        "type": "boolean",
                        "description": "Витягувати посилання зі сторінки",
                        "default": True
                    },
                    "extract_images": {
                        "type": "boolean",
                        "description": "Витягувати зображення",
                        "default": False
                    }
                },
                "required": ["url"]
            }
        }
    },
    # PRIORITY 6: TTS tools
    {
        "type": "function",
        "function": {
            "name": "tts_speak",
            "description": "🔊 Перетворити текст на аудіо (Text-to-Speech). Повертає аудіо файл. Використовуй коли користувач просить озвучити текст.",
            "parameters": {
                "type": "object",
                "properties": {
                    "text": {
                        "type": "string",
                        "description": "Текст для озвучення"
                    },
                    "language": {
                        "type": "string",
                        "enum": ["uk", "en", "ru"],
                        "description": "Мова озвучення",
                        "default": "uk"
                    }
                },
                "required": ["text"]
            }
        }
    }
]
@dataclass
class ToolResult:
    """Result of tool execution."""
    success: bool                       # True when the tool ran and produced a usable result
    result: Any                         # human-readable payload for the LLM/user; None on failure
    error: Optional[str] = None         # error description when success is False
    image_base64: Optional[str] = None  # For image generation results
class ToolManager:
    """Manages tool execution for the agent"""
    def __init__(self, config: Dict[str, Any]):
        # Full service config; per-agent tool endpoints are extracted below.
        self.config = config
        # Shared async HTTP client reused by every tool call (closed in close()).
        self.http_client = httpx.AsyncClient(timeout=60.0)
        # Swapper hosts web search/extract, image generation, and TTS endpoints.
        self.swapper_url = os.getenv("SWAPPER_URL", "http://swapper-service:8890")
        self.tools_config = self._load_tools_config()
def _load_tools_config(self) -> Dict[str, Dict]:
"""Load tool endpoints from config"""
tools = {}
agent_config = self.config.get("agents", {}).get("helion", {})
for tool in agent_config.get("tools", []):
if "endpoint" in tool:
tools[tool["id"]] = {
"endpoint": tool["endpoint"],
"method": tool.get("method", "POST")
}
return tools
def get_tool_definitions(self, agent_id: str = None) -> List[Dict]:
"""Get tool definitions for function calling, filtered by agent permissions"""
if not agent_id:
return TOOL_DEFINITIONS
# Get allowed tools for this agent
allowed_tools = get_agent_tools(agent_id)
# Filter tool definitions
filtered = []
for tool_def in TOOL_DEFINITIONS:
tool_name = tool_def.get("function", {}).get("name")
if tool_name in allowed_tools:
filtered.append(tool_def)
tool_names = [t.get("function", {}).get("name") for t in filtered]
logger.debug(f"Agent {agent_id} has {len(filtered)} tools: {tool_names}")
return filtered
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any], agent_id: str = None) -> ToolResult:
"""Execute a tool and return result. Optionally checks agent permissions."""
logger.info(f"🔧 Executing tool: {tool_name} for agent={agent_id} with args: {arguments}")
# Check agent permission if agent_id provided
if agent_id and not is_tool_allowed(agent_id, tool_name):
logger.warning(f"⚠️ Tool {tool_name} not allowed for agent {agent_id}")
return ToolResult(success=False, result=None, error=f"Tool {tool_name} not available for this agent")
try:
# Priority 1: Memory/Knowledge tools
if tool_name == "memory_search":
return await self._memory_search(arguments, agent_id=agent_id)
elif tool_name == "graph_query":
return await self._graph_query(arguments, agent_id=agent_id)
# Priority 2: Web tools
elif tool_name == "web_search":
return await self._web_search(arguments)
elif tool_name == "web_extract":
return await self._web_extract(arguments)
elif tool_name == "image_generate":
return await self._image_generate(arguments)
elif tool_name == "remember_fact":
return await self._remember_fact(arguments)
# Priority 4: Presentation tools
elif tool_name == "presentation_create":
return await self._presentation_create(arguments)
elif tool_name == "presentation_status":
return await self._presentation_status(arguments)
elif tool_name == "presentation_download":
return await self._presentation_download(arguments)
# Priority 5: Web scraping tools
elif tool_name == "crawl4ai_scrape":
return await self._crawl4ai_scrape(arguments)
# Priority 6: TTS tools
elif tool_name == "tts_speak":
return await self._tts_speak(arguments)
else:
return ToolResult(success=False, result=None, error=f"Unknown tool: {tool_name}")
except Exception as e:
logger.error(f"Tool execution failed: {e}")
return ToolResult(success=False, result=None, error=str(e))
async def _memory_search(self, args: Dict, agent_id: str = None) -> ToolResult:
"""Search in Qdrant vector memory using Router's memory_retrieval - PRIORITY 1"""
query = args.get("query")
try:
# Use Router's memory_retrieval pipeline directly (has Qdrant connection)
from memory_retrieval import memory_retrieval
if memory_retrieval and memory_retrieval.qdrant_client:
results = await memory_retrieval.search_memories(
query=query,
agent_id=agent_id or "helion",
limit=5
)
if results:
formatted = []
for r in results:
text = r.get("text", "")
score = r.get("score", 0)
mem_type = r.get("type", "memory")
if text:
formatted.append(f"• [{mem_type}] {text[:200]}... (релевантність: {score:.2f})")
if formatted:
return ToolResult(success=True, result=f"🧠 Знайдено в пам'яті:\n" + "\n".join(formatted))
return ToolResult(success=True, result="🧠 В моїй пам'яті немає інформації про це.")
else:
return ToolResult(success=True, result="🧠 Пам'ять недоступна, спробую web_search.")
except Exception as e:
logger.warning(f"Memory search error: {e}")
return ToolResult(success=True, result="🧠 Не вдалося перевірити пам'ять. Спробую інші джерела.")
async def _web_search(self, args: Dict) -> ToolResult:
"""Execute web search - PRIORITY 2 (use after memory_search)"""
query = args.get("query")
max_results = args.get("max_results", 5)
try:
resp = await self.http_client.post(
f"{self.swapper_url}/web/search",
json={"query": query, "max_results": max_results}
)
if resp.status_code == 200:
data = resp.json()
results = data.get("results", [])
# Format results for LLM
formatted = []
for r in results[:max_results]:
formatted.append(f"- {r.get('title', 'No title')}\n {r.get('snippet', '')}\n URL: {r.get('url', '')}")
return ToolResult(success=True, result="\n".join(formatted) if formatted else "Нічого не знайдено")
else:
return ToolResult(success=False, result=None, error=f"Search failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _web_extract(self, args: Dict) -> ToolResult:
"""Extract content from URL"""
url = args.get("url")
try:
resp = await self.http_client.post(
f"{self.swapper_url}/web/extract",
json={"url": url}
)
if resp.status_code == 200:
data = resp.json()
content = data.get("content", "")
# Truncate if too long
if len(content) > 4000:
content = content[:4000] + "\n... (текст обрізано)"
return ToolResult(success=True, result=content)
else:
return ToolResult(success=False, result=None, error=f"Extract failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _unload_ollama_models(self):
"""Unload all Ollama models to free VRAM for heavy operations like FLUX"""
ollama_url = os.getenv("OLLAMA_BASE_URL", "http://172.18.0.1:11434")
models_to_unload = ["qwen3:8b", "qwen3-vl:8b"]
for model in models_to_unload:
try:
await self.http_client.post(
f"{ollama_url}/api/generate",
json={"model": model, "keep_alive": 0},
timeout=5.0
)
logger.info(f"🧹 Unloaded Ollama model: {model}")
except Exception as e:
logger.debug(f"Could not unload {model}: {e}")
# Give GPU time to release memory
import asyncio
await asyncio.sleep(1)
    async def _unload_flux(self):
        """Unload FLUX model after image generation to free VRAM.

        Best-effort: if the Swapper endpoint is unreachable or the model is
        not loaded, the error is only logged at debug level.
        """
        try:
            # Try to unload flux-klein-4b model
            await self.http_client.post(
                f"{self.swapper_url}/image/models/flux-klein-4b/unload",
                timeout=10.0
            )
            logger.info("🧹 Unloaded FLUX model from Swapper")
        except Exception as e:
            logger.debug(f"Could not unload FLUX: {e}")
async def _image_generate(self, args: Dict) -> ToolResult:
"""Generate image with VRAM management"""
prompt = args.get("prompt")
# Use smaller sizes to fit in VRAM (20GB GPU shared with LLM)
width = min(args.get("width", 512), 512)
height = min(args.get("height", 512), 512)
try:
# Step 1: Unload Ollama models to free VRAM for FLUX (~15GB needed)
logger.info("🔄 Preparing VRAM for FLUX image generation...")
await self._unload_ollama_models()
# Step 2: Generate image
resp = await self.http_client.post(
f"{self.swapper_url}/image/generate",
json={"prompt": prompt, "width": width, "height": height, "num_inference_steps": 8},
timeout=180.0 # FLUX needs time
)
if resp.status_code == 200:
data = resp.json()
image_base64 = data.get("image_base64")
image_url = data.get("image_url") or data.get("url")
# Step 3: Unload FLUX to free VRAM for other models (LLM, Vision)
logger.info("🔄 Image generated, unloading FLUX to free VRAM...")
await self._unload_flux()
if image_base64:
# Return base64 image for Gateway to send
return ToolResult(
success=True,
result="✅ Зображення згенеровано",
image_base64=image_base64
)
elif image_url:
return ToolResult(
success=True,
result=f"✅ Зображення згенеровано: {image_url}",
image_base64=None
)
else:
return ToolResult(
success=True,
result="✅ Зображення згенеровано (формат невідомий)",
image_base64=None
)
else:
# Also unload FLUX on failure to free VRAM
await self._unload_flux()
return ToolResult(success=False, result=None, error=f"Generation failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _graph_query(self, args: Dict, agent_id: str = None) -> ToolResult:
"""Query knowledge graph"""
query = args.get("query")
entity_type = args.get("entity_type")
# Simple natural language to Cypher conversion
cypher = f"""
MATCH (n)
WHERE toLower(n.name) CONTAINS toLower('{query}')
OR toLower(toString(n)) CONTAINS toLower('{query}')
RETURN labels(n)[0] as type, n.name as name, n.node_id as id
LIMIT 10
"""
if entity_type:
cypher = f"""
MATCH (n:{entity_type})
WHERE toLower(n.name) CONTAINS toLower('{query}')
RETURN n.name as name, n.node_id as id
LIMIT 10
"""
try:
# Execute via Router's graph endpoint
resp = await self.http_client.post(
"http://localhost:8000/v1/graph/query",
json={"query": cypher}
)
if resp.status_code == 200:
data = resp.json()
return ToolResult(success=True, result=json.dumps(data.get("results", []), ensure_ascii=False))
else:
return ToolResult(success=False, result=None, error=f"Graph query failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _remember_fact(self, args: Dict) -> ToolResult:
"""Store a fact in memory"""
fact = args.get("fact")
about = args.get("about")
category = args.get("category", "general")
try:
# Store via Memory Service
resp = await self.http_client.post(
"http://memory-service:8000/facts/upsert",
json={
"user_id": about,
"fact_key": f"{category}_{hash(fact) % 10000}",
"fact_value": fact,
"fact_value_json": {"text": fact, "category": category, "about": about}
}
)
if resp.status_code in [200, 201]:
return ToolResult(success=True, result=f"✅ Запам'ятовано факт про {about}")
else:
return ToolResult(success=False, result=None, error=f"Memory store failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _presentation_create(self, args: Dict) -> ToolResult:
"""Create a presentation via Presentation Renderer"""
title = args.get("title", "Презентація")
slides = args.get("slides", [])
brand_id = args.get("brand_id", "energyunion")
theme_version = args.get("theme_version", "v1.0.0")
language = args.get("language", "uk")
# Build SlideSpec
slidespec = {
"meta": {
"title": title,
"brand_id": brand_id,
"theme_version": theme_version,
"language": language
},
"slides": []
}
# Add title slide
slidespec["slides"].append({
"type": "title",
"title": title
})
# Add content slides
for slide in slides:
slide_obj = {
"type": "content",
"title": slide.get("title", ""),
"body": slide.get("content", "")
}
slidespec["slides"].append(slide_obj)
try:
renderer_url = os.getenv("PRESENTATION_RENDERER_URL", "http://presentation-renderer:9600")
resp = await self.http_client.post(
f"{renderer_url}/present/render",
json=slidespec,
timeout=120.0
)
if resp.status_code == 200:
data = resp.json()
job_id = data.get("job_id")
artifact_id = data.get("artifact_id")
return ToolResult(
success=True,
result=f"📊 Презентацію створено!\n\n🆔 Job ID: `{job_id}`\n📦 Artifact ID: `{artifact_id}`\n\nЩоб перевірити статус: використай presentation_status\nЩоб завантажити: використай presentation_download"
)
else:
error_text = resp.text[:200] if resp.text else "Unknown error"
return ToolResult(success=False, result=None, error=f"Render failed ({resp.status_code}): {error_text}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _presentation_status(self, args: Dict) -> ToolResult:
"""Check presentation job status"""
job_id = args.get("job_id")
try:
registry_url = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9700")
resp = await self.http_client.get(
f"{registry_url}/jobs/{job_id}",
timeout=10.0
)
if resp.status_code == 200:
data = resp.json()
status = data.get("status", "unknown")
artifact_id = data.get("artifact_id")
error = data.get("error_text", "")
status_emoji = {"queued": "⏳", "running": "🔄", "done": "✅", "failed": "❌"}.get(status, "❓")
result = f"{status_emoji} Статус: **{status}**\n"
if artifact_id:
result += f"📦 Artifact ID: `{artifact_id}`\n"
if status == "done":
result += "\n✅ Презентація готова! Використай presentation_download щоб отримати файл."
if status == "failed" and error:
result += f"\n❌ Помилка: {error[:200]}"
return ToolResult(success=True, result=result)
elif resp.status_code == 404:
return ToolResult(success=False, result=None, error="Job not found")
else:
return ToolResult(success=False, result=None, error=f"Status check failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _presentation_download(self, args: Dict) -> ToolResult:
"""Get download link for presentation"""
artifact_id = args.get("artifact_id")
file_format = args.get("format", "pptx")
try:
registry_url = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9700")
resp = await self.http_client.get(
f"{registry_url}/artifacts/{artifact_id}/download?format={file_format}",
timeout=10.0,
follow_redirects=False
)
if resp.status_code in [200, 302, 307]:
# Check for signed URL in response or Location header
if resp.status_code in [302, 307]:
download_url = resp.headers.get("Location")
else:
data = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {}
download_url = data.get("download_url") or data.get("url")
if download_url:
return ToolResult(
success=True,
result=f"📥 **Посилання для завантаження ({file_format.upper()}):**\n\n{download_url}\n\n⏰ Посилання дійсне 30 хвилин."
)
else:
# Direct binary response - artifact available
return ToolResult(
success=True,
result=f"✅ Файл {file_format.upper()} готовий! Завантажити можна через: {registry_url}/artifacts/{artifact_id}/download?format={file_format}"
)
elif resp.status_code == 404:
return ToolResult(success=False, result=None, error=f"Формат {file_format.upper()} ще не готовий. Спробуй пізніше.")
else:
return ToolResult(success=False, result=None, error=f"Download failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _crawl4ai_scrape(self, args: Dict) -> ToolResult:
"""Deep scrape a web page using Crawl4AI - PRIORITY 5"""
url = args.get("url")
extract_links = args.get("extract_links", True)
extract_images = args.get("extract_images", False)
if not url:
return ToolResult(success=False, result=None, error="URL is required")
try:
crawl4ai_url = os.getenv("CRAWL4AI_URL", "http://dagi-crawl4ai-node1:11235")
payload = {
"urls": [url],
"priority": 5,
"session_id": f"agent_scrape_{hash(url) % 10000}"
}
resp = await self.http_client.post(
f"{crawl4ai_url}/crawl",
json=payload,
timeout=60.0
)
if resp.status_code == 200:
data = resp.json()
results = data.get("results", []) if isinstance(data, dict) else []
if not results and isinstance(data, dict):
results = [data]
if results:
result = results[0] if isinstance(results, list) else results
markdown = result.get("markdown", "") or result.get("cleaned_html", "") or result.get("text", "")
title = result.get("title", url)
if len(markdown) > 3000:
markdown = markdown[:3000] + "... (скорочено)"
response_parts = [f"**{title}**", "", markdown]
if extract_links:
links = result.get("links", [])
if links:
response_parts.append("")
response_parts.append("**Посилання:**")
for link in links[:10]:
if isinstance(link, dict):
link_url = link.get("href", "")
else:
link_url = str(link)
if link_url:
response_parts.append(f"- {link_url}")
return ToolResult(success=True, result="\n".join(response_parts))
else:
return ToolResult(success=False, result=None, error="No content extracted")
else:
return ToolResult(success=False, result=None, error=f"Crawl failed: {resp.status_code}")
except Exception as e:
logger.error(f"Crawl4AI scrape failed: {e}")
return ToolResult(success=False, result=None, error=str(e))
async def _tts_speak(self, args: Dict) -> ToolResult:
"""Convert text to speech using Swapper TTS - PRIORITY 6"""
text = args.get("text")
language = args.get("language", "uk")
if not text:
return ToolResult(success=False, result=None, error="Text is required")
try:
if len(text) > 1000:
text = text[:1000]
resp = await self.http_client.post(
f"{self.swapper_url}/tts",
json={"text": text, "language": language},
timeout=60.0
)
if resp.status_code == 200:
data = resp.json()
audio_url = data.get("audio_url") or data.get("url")
if audio_url:
return ToolResult(success=True, result=f"Аудіо: {audio_url}")
else:
return ToolResult(success=True, result="TTS completed")
else:
return ToolResult(success=False, result=None, error=f"TTS failed: {resp.status_code}")
except Exception as e:
logger.error(f"TTS failed: {e}")
return ToolResult(success=False, result=None, error=str(e))
    async def close(self):
        """Close the shared async HTTP client; call once on agent shutdown."""
        await self.http_client.aclose()
def _strip_think_tags(text: str) -> str:
"""Remove ... tags from DeepSeek responses."""
import re
text = re.sub(r'.*?', '', text, flags=re.DOTALL)
text = re.sub(r'.*$', '', text, flags=re.DOTALL) # unclosed tag
return text.strip()
def format_tool_calls_for_response(tool_results: List[Dict], fallback_mode: str = "normal") -> str:
    """
    Format tool results in human-friendly way - NOT raw data!
    Args:
        tool_results: List of tool execution results
        fallback_mode: "normal" | "dsml_detected" | "empty_response"
    Returns:
        A user-facing message (Ukrainian) summarizing the tool outcomes.
    """
    # Special handling for DSML detection - LLM tried to use tools but got confused
    # If we have successful tool results, show them instead of generic fallback
    if fallback_mode == "dsml_detected":
        # Check if any tool succeeded with a useful result
        if tool_results:
            for tr in tool_results:
                if tr.get("success") and tr.get("result"):
                    # Avoid dumping raw retrieval/search payloads to the user.
                    # These often look like "memory dumps" and are perceived as incorrect answers.
                    tool_name = (tr.get("name") or "").strip()
                    if tool_name in {"memory_search", "web_search", "web_extract", "web_read"}:
                        continue
                    result = str(tr.get("result", ""))
                    # Heuristic: non-trivial length and no "error" text means usable.
                    if result and len(result) > 10 and "error" not in result.lower():
                        # We have a useful tool result - use it!
                        if len(result) > 600:
                            return result[:600] + "..."
                        return result
        # No useful tool results - give presence acknowledgment
        return "Вибач, відповідь згенерувалась некоректно. Спробуй ще раз (коротше/конкретніше) або повтори питання одним реченням."
    if not tool_results:
        if fallback_mode == "empty_response":
            return "Вибач, щось пішло не так. Спробуй ще раз."
        return "Вибач, не вдалося виконати запит."
    # Check what tools were used
    tool_names = [tr.get("name", "") for tr in tool_results]
    # Check if ANY tool succeeded
    any_success = any(tr.get("success") for tr in tool_results)
    if not any_success:
        # All tools failed - give helpful message
        errors = [tr.get("error", "unknown") for tr in tool_results if tr.get("error")]
        if errors:
            logger.warning(f"All tools failed: {errors}")
        return "Вибач, виникла технічна проблема. Спробуй ще раз або переформулюй питання."
    # Image generation - special handling
    if "image_generate" in tool_names:
        for tr in tool_results:
            if tr.get("name") == "image_generate" and tr.get("success"):
                return "✅ Зображення згенеровано!"
    # Web search - show actual results to user
    if "web_search" in tool_names:
        for tr in tool_results:
            if tr.get("name") == "web_search":
                if tr.get("success"):
                    result = tr.get("result", "")
                    if not result:
                        return "🔍 Не знайшов релевантної інформації в інтернеті."
                    # Parse and format results for user.
                    # Input format (from _web_search): "- title" lines followed by
                    # snippet lines and "URL: ..." lines; this loop accumulates one
                    # result at a time and flushes it when the next "- " line appears.
                    lines = result.strip().split("\n")
                    formatted = ["🔍 **Результати пошуку:**\n"]
                    current_title = ""
                    current_url = ""
                    current_snippet = ""
                    count = 0
                    for line in lines:
                        line = line.strip()
                        if line.startswith("- ") and not line.startswith("- URL:"):
                            # Flush the previously accumulated result before starting a new one.
                            if current_title and count < 3:  # Show max 3 results
                                formatted.append(f"**{count}. {current_title}**")
                                if current_snippet:
                                    formatted.append(f" {current_snippet[:150]}...")
                                if current_url:
                                    formatted.append(f" 🔗 {current_url}\n")
                            current_title = line[2:].strip()
                            current_snippet = ""
                            current_url = ""
                            count += 1
                        elif "URL:" in line:
                            current_url = line.split("URL:")[-1].strip()
                        elif line and not line.startswith("-"):
                            current_snippet = line
                    # Add last result
                    if current_title and count <= 3:
                        formatted.append(f"**{count}. {current_title}**")
                        if current_snippet:
                            formatted.append(f" {current_snippet[:150]}...")
                        if current_url:
                            formatted.append(f" 🔗 {current_url}")
                    if len(formatted) > 1:
                        return "\n".join(formatted)
                    else:
                        return "🔍 Не знайшов релевантної інформації в інтернеті."
                else:
                    return "🔍 Пошук в інтернеті не вдався. Спробуй ще раз."
    # Memory search
    if "memory_search" in tool_names:
        for tr in tool_results:
            if tr.get("name") == "memory_search" and tr.get("success"):
                result = tr.get("result", "")
                if "немає інформації" in result.lower() or not result:
                    return "🧠 В моїй пам'яті немає інформації про це."
                # Truncate if too long
                if len(result) > 500:
                    return result[:500] + "..."
                return result
    # Graph query
    if "graph_query" in tool_names:
        for tr in tool_results:
            if tr.get("name") == "graph_query" and tr.get("success"):
                result = tr.get("result", "")
                if not result or "не знайдено" in result.lower():
                    return "📊 В базі знань немає інформації про це."
                if len(result) > 500:
                    return result[:500] + "..."
                return result
    # Default fallback - check if we have any result to show
    for tr in tool_results:
        if tr.get("success") and tr.get("result"):
            result = str(tr.get("result", ""))
            if result and len(result) > 10:
                # We have something, show it
                if len(result) > 400:
                    return result[:400] + "..."
                return result
    # Really nothing useful - be honest
    return "Я обробив твій запит, але не знайшов корисної інформації. Можеш уточнити питання?"