Files
microdao-daarion/services/router/tool_manager.py
Apple acceac6929 fix: helion string literal + memory brief anti-echo in Router
- Fixed unquoted `helion` variable reference to string literal `"helion"`
  in tool_manager.py search_memories fallback
- Replaced `[Контекст пам'яті]` with `[INTERNAL MEMORY - do NOT repeat
  to user]` in all 3 injection points in main.py
- Verified: Senpai now responds without Helion contamination or memory
  brief leaking

Tested and deployed on NODE1.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 10:05:25 -08:00

966 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Tool Manager for Helion Agent
Implements OpenAI-compatible function calling for DeepSeek, Mistral, Grok
"""
import os
from agent_tools_config import get_agent_tools, is_tool_allowed
import json
import logging
import httpx
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
# Tool definitions in OpenAI function calling format
# ORDER MATTERS: Memory/Graph tools first, then web search as fallback
TOOL_DEFINITIONS = [
# PRIORITY 1: Internal knowledge sources (use FIRST)
{
"type": "function",
"function": {
"name": "memory_search",
"description": "🔍 ПЕРШИЙ КРОК для пошуку! Шукає в моїй пам'яті: збережені факти, документи, розмови. ЗАВЖДИ використовуй спочатку перед web_search!",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Що шукати в пам'яті"
}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "graph_query",
"description": "🔍 Пошук в Knowledge Graph - зв'язки між проєктами, людьми, темами Energy Union. Використовуй для питань про проєкти, партнерів, технології.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Що шукати (назва проєкту, людини, теми)"
},
"entity_type": {
"type": "string",
"enum": ["User", "Topic", "Project", "Fact"],
"description": "Тип сутності для пошуку"
}
},
"required": ["query"]
}
}
},
# PRIORITY 2: Web search (use ONLY if memory/graph don't have info)
{
"type": "function",
"function": {
"name": "web_search",
"description": "🌐 Пошук в інтернеті. Використовуй ТІЛЬКИ якщо memory_search і graph_query не знайшли потрібної інформації!",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Пошуковий запит"
},
"max_results": {
"type": "integer",
"description": "Максимальна кількість результатів (1-10)",
"default": 5
}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "web_extract",
"description": "Витягнути текстовий контент з веб-сторінки за URL",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "URL сторінки для читання"
}
},
"required": ["url"]
}
}
},
# PRIORITY 3: Generation tools
{
"type": "function",
"function": {
"name": "image_generate",
"description": "🎨 Згенерувати зображення за текстовим описом (FLUX)",
"parameters": {
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Опис зображення для генерації (англійською краще)"
},
"width": {
"type": "integer",
"description": "Ширина зображення",
"default": 512
},
"height": {
"type": "integer",
"description": "Висота зображення",
"default": 512
}
},
"required": ["prompt"]
}
}
},
{
"type": "function",
"function": {
"name": "remember_fact",
"description": "Запам'ятати важливий факт про користувача або тему",
"parameters": {
"type": "object",
"properties": {
"fact": {
"type": "string",
"description": "Факт для запам'ятовування"
},
"about": {
"type": "string",
"description": "Про кого/що цей факт (username або тема)"
},
"category": {
"type": "string",
"enum": ["personal", "technical", "preference", "project"],
"description": "Категорія факту"
}
},
"required": ["fact", "about"]
}
}
},
# PRIORITY 4: Document/Presentation tools
{
"type": "function",
"function": {
"name": "presentation_create",
"description": "📊 Створити презентацію PowerPoint. Використовуй коли користувач просить 'створи презентацію', 'зроби презентацію', 'підготуй слайди'.",
"parameters": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Назва презентації"
},
"slides": {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {"type": "string", "description": "Заголовок слайду"},
"content": {"type": "string", "description": "Контент слайду (markdown)"}
}
},
"description": "Масив слайдів: [{title, content}]"
},
"brand_id": {
"type": "string",
"description": "ID бренду для стилю (energyunion, greenfood, nutra)",
"default": "energyunion"
},
"theme_version": {
"type": "string",
"description": "Версія теми",
"default": "v1.0.0"
},
"language": {
"type": "string",
"enum": ["uk", "en", "ru"],
"description": "Мова презентації",
"default": "uk"
}
},
"required": ["title", "slides"]
}
}
},
{
"type": "function",
"function": {
"name": "presentation_status",
"description": "📋 Перевірити статус створення презентації за job_id",
"parameters": {
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "ID завдання рендерингу"
}
},
"required": ["job_id"]
}
}
},
{
"type": "function",
"function": {
"name": "presentation_download",
"description": "📥 Отримати посилання на готову презентацію за artifact_id",
"parameters": {
"type": "object",
"properties": {
"artifact_id": {
"type": "string",
"description": "ID артефакту презентації"
},
"format": {
"type": "string",
"enum": ["pptx", "pdf"],
"description": "Формат файлу",
"default": "pptx"
}
},
"required": ["artifact_id"]
}
}
},
# PRIORITY 5: Web Scraping tools
{
"type": "function",
"function": {
"name": "crawl4ai_scrape",
"description": "🕷️ Глибокий скрейпінг веб-сторінки через Crawl4AI. Витягує повний контент, структуровані дані, медіа. Використовуй для детального аналізу сайтів.",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "URL сторінки для скрейпінгу"
},
"extract_links": {
"type": "boolean",
"description": "Витягувати посилання зі сторінки",
"default": True
},
"extract_images": {
"type": "boolean",
"description": "Витягувати зображення",
"default": False
}
},
"required": ["url"]
}
}
},
# PRIORITY 6: TTS tools
{
"type": "function",
"function": {
"name": "tts_speak",
"description": "🔊 Перетворити текст на аудіо (Text-to-Speech). Повертає аудіо файл. Використовуй коли користувач просить озвучити текст.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "Текст для озвучення"
},
"language": {
"type": "string",
"enum": ["uk", "en", "ru"],
"description": "Мова озвучення",
"default": "uk"
}
},
"required": ["text"]
}
}
}
]
@dataclass
class ToolResult:
"""Result of tool execution"""
success: bool
result: Any
error: Optional[str] = None
image_base64: Optional[str] = None # For image generation results
class ToolManager:
"""Manages tool execution for the agent"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.http_client = httpx.AsyncClient(timeout=60.0)
self.swapper_url = os.getenv("SWAPPER_URL", "http://swapper-service:8890")
self.tools_config = self._load_tools_config()
def _load_tools_config(self) -> Dict[str, Dict]:
"""Load tool endpoints from config"""
tools = {}
agent_config = self.config.get("agents", {}).get("helion", {})
for tool in agent_config.get("tools", []):
if "endpoint" in tool:
tools[tool["id"]] = {
"endpoint": tool["endpoint"],
"method": tool.get("method", "POST")
}
return tools
def get_tool_definitions(self, agent_id: str = None) -> List[Dict]:
"""Get tool definitions for function calling, filtered by agent permissions"""
if not agent_id:
return TOOL_DEFINITIONS
# Get allowed tools for this agent
allowed_tools = get_agent_tools(agent_id)
# Filter tool definitions
filtered = []
for tool_def in TOOL_DEFINITIONS:
tool_name = tool_def.get("function", {}).get("name")
if tool_name in allowed_tools:
filtered.append(tool_def)
tool_names = [t.get("function", {}).get("name") for t in filtered]
logger.debug(f"Agent {agent_id} has {len(filtered)} tools: {tool_names}")
return filtered
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any], agent_id: str = None) -> ToolResult:
"""Execute a tool and return result. Optionally checks agent permissions."""
logger.info(f"🔧 Executing tool: {tool_name} for agent={agent_id} with args: {arguments}")
# Check agent permission if agent_id provided
if agent_id and not is_tool_allowed(agent_id, tool_name):
logger.warning(f"⚠️ Tool {tool_name} not allowed for agent {agent_id}")
return ToolResult(success=False, result=None, error=f"Tool {tool_name} not available for this agent")
try:
# Priority 1: Memory/Knowledge tools
if tool_name == "memory_search":
return await self._memory_search(arguments, agent_id=agent_id)
elif tool_name == "graph_query":
return await self._graph_query(arguments, agent_id=agent_id)
# Priority 2: Web tools
elif tool_name == "web_search":
return await self._web_search(arguments)
elif tool_name == "web_extract":
return await self._web_extract(arguments)
elif tool_name == "image_generate":
return await self._image_generate(arguments)
elif tool_name == "remember_fact":
return await self._remember_fact(arguments)
# Priority 4: Presentation tools
elif tool_name == "presentation_create":
return await self._presentation_create(arguments)
elif tool_name == "presentation_status":
return await self._presentation_status(arguments)
elif tool_name == "presentation_download":
return await self._presentation_download(arguments)
# Priority 5: Web scraping tools
elif tool_name == "crawl4ai_scrape":
return await self._crawl4ai_scrape(arguments)
# Priority 6: TTS tools
elif tool_name == "tts_speak":
return await self._tts_speak(arguments)
else:
return ToolResult(success=False, result=None, error=f"Unknown tool: {tool_name}")
except Exception as e:
logger.error(f"Tool execution failed: {e}")
return ToolResult(success=False, result=None, error=str(e))
async def _memory_search(self, args: Dict, agent_id: str = None) -> ToolResult:
"""Search in Qdrant vector memory using Router's memory_retrieval - PRIORITY 1"""
query = args.get("query")
try:
# Use Router's memory_retrieval pipeline directly (has Qdrant connection)
from memory_retrieval import memory_retrieval
if memory_retrieval and memory_retrieval.qdrant_client:
results = await memory_retrieval.search_memories(
query=query,
agent_id=agent_id or "helion",
limit=5
)
if results:
formatted = []
for r in results:
text = r.get("text", "")
score = r.get("score", 0)
mem_type = r.get("type", "memory")
if text:
formatted.append(f"• [{mem_type}] {text[:200]}... (релевантність: {score:.2f})")
if formatted:
return ToolResult(success=True, result=f"🧠 Знайдено в пам'яті:\n" + "\n".join(formatted))
return ToolResult(success=True, result="🧠 В моїй пам'яті немає інформації про це.")
else:
return ToolResult(success=True, result="🧠 Пам'ять недоступна, спробую web_search.")
except Exception as e:
logger.warning(f"Memory search error: {e}")
return ToolResult(success=True, result="🧠 Не вдалося перевірити пам'ять. Спробую інші джерела.")
async def _web_search(self, args: Dict) -> ToolResult:
"""Execute web search - PRIORITY 2 (use after memory_search)"""
query = args.get("query")
max_results = args.get("max_results", 5)
try:
resp = await self.http_client.post(
f"{self.swapper_url}/web/search",
json={"query": query, "max_results": max_results}
)
if resp.status_code == 200:
data = resp.json()
results = data.get("results", [])
# Format results for LLM
formatted = []
for r in results[:max_results]:
formatted.append(f"- {r.get('title', 'No title')}\n {r.get('snippet', '')}\n URL: {r.get('url', '')}")
return ToolResult(success=True, result="\n".join(formatted) if formatted else "Нічого не знайдено")
else:
return ToolResult(success=False, result=None, error=f"Search failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _web_extract(self, args: Dict) -> ToolResult:
"""Extract content from URL"""
url = args.get("url")
try:
resp = await self.http_client.post(
f"{self.swapper_url}/web/extract",
json={"url": url}
)
if resp.status_code == 200:
data = resp.json()
content = data.get("content", "")
# Truncate if too long
if len(content) > 4000:
content = content[:4000] + "\n... (текст обрізано)"
return ToolResult(success=True, result=content)
else:
return ToolResult(success=False, result=None, error=f"Extract failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _unload_ollama_models(self):
"""Unload all Ollama models to free VRAM for heavy operations like FLUX"""
ollama_url = os.getenv("OLLAMA_BASE_URL", "http://172.18.0.1:11434")
models_to_unload = ["qwen3:8b", "qwen3-vl:8b"]
for model in models_to_unload:
try:
await self.http_client.post(
f"{ollama_url}/api/generate",
json={"model": model, "keep_alive": 0},
timeout=5.0
)
logger.info(f"🧹 Unloaded Ollama model: {model}")
except Exception as e:
logger.debug(f"Could not unload {model}: {e}")
# Give GPU time to release memory
import asyncio
await asyncio.sleep(1)
async def _unload_flux(self):
"""Unload FLUX model after image generation to free VRAM"""
try:
# Try to unload flux-klein-4b model
await self.http_client.post(
f"{self.swapper_url}/image/models/flux-klein-4b/unload",
timeout=10.0
)
logger.info("🧹 Unloaded FLUX model from Swapper")
except Exception as e:
logger.debug(f"Could not unload FLUX: {e}")
async def _image_generate(self, args: Dict) -> ToolResult:
"""Generate image with VRAM management"""
prompt = args.get("prompt")
# Use smaller sizes to fit in VRAM (20GB GPU shared with LLM)
width = min(args.get("width", 512), 512)
height = min(args.get("height", 512), 512)
try:
# Step 1: Unload Ollama models to free VRAM for FLUX (~15GB needed)
logger.info("🔄 Preparing VRAM for FLUX image generation...")
await self._unload_ollama_models()
# Step 2: Generate image
resp = await self.http_client.post(
f"{self.swapper_url}/image/generate",
json={"prompt": prompt, "width": width, "height": height, "num_inference_steps": 8},
timeout=180.0 # FLUX needs time
)
if resp.status_code == 200:
data = resp.json()
image_base64 = data.get("image_base64")
image_url = data.get("image_url") or data.get("url")
# Step 3: Unload FLUX to free VRAM for other models (LLM, Vision)
logger.info("🔄 Image generated, unloading FLUX to free VRAM...")
await self._unload_flux()
if image_base64:
# Return base64 image for Gateway to send
return ToolResult(
success=True,
result="✅ Зображення згенеровано",
image_base64=image_base64
)
elif image_url:
return ToolResult(
success=True,
result=f"✅ Зображення згенеровано: {image_url}",
image_base64=None
)
else:
return ToolResult(
success=True,
result="✅ Зображення згенеровано (формат невідомий)",
image_base64=None
)
else:
# Also unload FLUX on failure to free VRAM
await self._unload_flux()
return ToolResult(success=False, result=None, error=f"Generation failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _graph_query(self, args: Dict, agent_id: str = None) -> ToolResult:
"""Query knowledge graph"""
query = args.get("query")
entity_type = args.get("entity_type")
# Simple natural language to Cypher conversion
cypher = f"""
MATCH (n)
WHERE toLower(n.name) CONTAINS toLower('{query}')
OR toLower(toString(n)) CONTAINS toLower('{query}')
RETURN labels(n)[0] as type, n.name as name, n.node_id as id
LIMIT 10
"""
if entity_type:
cypher = f"""
MATCH (n:{entity_type})
WHERE toLower(n.name) CONTAINS toLower('{query}')
RETURN n.name as name, n.node_id as id
LIMIT 10
"""
try:
# Execute via Router's graph endpoint
resp = await self.http_client.post(
"http://localhost:8000/v1/graph/query",
json={"query": cypher}
)
if resp.status_code == 200:
data = resp.json()
return ToolResult(success=True, result=json.dumps(data.get("results", []), ensure_ascii=False))
else:
return ToolResult(success=False, result=None, error=f"Graph query failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _remember_fact(self, args: Dict) -> ToolResult:
"""Store a fact in memory"""
fact = args.get("fact")
about = args.get("about")
category = args.get("category", "general")
try:
# Store via Memory Service
resp = await self.http_client.post(
"http://memory-service:8000/facts/upsert",
json={
"user_id": about,
"fact_key": f"{category}_{hash(fact) % 10000}",
"fact_value": fact,
"fact_value_json": {"text": fact, "category": category, "about": about}
}
)
if resp.status_code in [200, 201]:
return ToolResult(success=True, result=f"✅ Запам'ятовано факт про {about}")
else:
return ToolResult(success=False, result=None, error=f"Memory store failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _presentation_create(self, args: Dict) -> ToolResult:
"""Create a presentation via Presentation Renderer"""
title = args.get("title", "Презентація")
slides = args.get("slides", [])
brand_id = args.get("brand_id", "energyunion")
theme_version = args.get("theme_version", "v1.0.0")
language = args.get("language", "uk")
# Build SlideSpec
slidespec = {
"meta": {
"title": title,
"brand_id": brand_id,
"theme_version": theme_version,
"language": language
},
"slides": []
}
# Add title slide
slidespec["slides"].append({
"type": "title",
"title": title
})
# Add content slides
for slide in slides:
slide_obj = {
"type": "content",
"title": slide.get("title", ""),
"body": slide.get("content", "")
}
slidespec["slides"].append(slide_obj)
try:
renderer_url = os.getenv("PRESENTATION_RENDERER_URL", "http://presentation-renderer:9600")
resp = await self.http_client.post(
f"{renderer_url}/present/render",
json=slidespec,
timeout=120.0
)
if resp.status_code == 200:
data = resp.json()
job_id = data.get("job_id")
artifact_id = data.get("artifact_id")
return ToolResult(
success=True,
result=f"📊 Презентацію створено!\n\n🆔 Job ID: `{job_id}`\n📦 Artifact ID: `{artifact_id}`\n\nЩоб перевірити статус: використай presentation_status\nЩоб завантажити: використай presentation_download"
)
else:
error_text = resp.text[:200] if resp.text else "Unknown error"
return ToolResult(success=False, result=None, error=f"Render failed ({resp.status_code}): {error_text}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _presentation_status(self, args: Dict) -> ToolResult:
"""Check presentation job status"""
job_id = args.get("job_id")
try:
registry_url = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9700")
resp = await self.http_client.get(
f"{registry_url}/jobs/{job_id}",
timeout=10.0
)
if resp.status_code == 200:
data = resp.json()
status = data.get("status", "unknown")
artifact_id = data.get("artifact_id")
error = data.get("error_text", "")
status_emoji = {"queued": "", "running": "🔄", "done": "", "failed": ""}.get(status, "")
result = f"{status_emoji} Статус: **{status}**\n"
if artifact_id:
result += f"📦 Artifact ID: `{artifact_id}`\n"
if status == "done":
result += "\n✅ Презентація готова! Використай presentation_download щоб отримати файл."
if status == "failed" and error:
result += f"\n❌ Помилка: {error[:200]}"
return ToolResult(success=True, result=result)
elif resp.status_code == 404:
return ToolResult(success=False, result=None, error="Job not found")
else:
return ToolResult(success=False, result=None, error=f"Status check failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _presentation_download(self, args: Dict) -> ToolResult:
"""Get download link for presentation"""
artifact_id = args.get("artifact_id")
file_format = args.get("format", "pptx")
try:
registry_url = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9700")
resp = await self.http_client.get(
f"{registry_url}/artifacts/{artifact_id}/download?format={file_format}",
timeout=10.0,
follow_redirects=False
)
if resp.status_code in [200, 302, 307]:
# Check for signed URL in response or Location header
if resp.status_code in [302, 307]:
download_url = resp.headers.get("Location")
else:
data = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {}
download_url = data.get("download_url") or data.get("url")
if download_url:
return ToolResult(
success=True,
result=f"📥 **Посилання для завантаження ({file_format.upper()}):**\n\n{download_url}\n\n⏰ Посилання дійсне 30 хвилин."
)
else:
# Direct binary response - artifact available
return ToolResult(
success=True,
result=f"✅ Файл {file_format.upper()} готовий! Завантажити можна через: {registry_url}/artifacts/{artifact_id}/download?format={file_format}"
)
elif resp.status_code == 404:
return ToolResult(success=False, result=None, error=f"Формат {file_format.upper()} ще не готовий. Спробуй пізніше.")
else:
return ToolResult(success=False, result=None, error=f"Download failed: {resp.status_code}")
except Exception as e:
return ToolResult(success=False, result=None, error=str(e))
async def _crawl4ai_scrape(self, args: Dict) -> ToolResult:
"""Deep scrape a web page using Crawl4AI - PRIORITY 5"""
url = args.get("url")
extract_links = args.get("extract_links", True)
extract_images = args.get("extract_images", False)
if not url:
return ToolResult(success=False, result=None, error="URL is required")
try:
crawl4ai_url = os.getenv("CRAWL4AI_URL", "http://dagi-crawl4ai-node1:11235")
payload = {
"urls": [url],
"priority": 5,
"session_id": f"agent_scrape_{hash(url) % 10000}"
}
resp = await self.http_client.post(
f"{crawl4ai_url}/crawl",
json=payload,
timeout=60.0
)
if resp.status_code == 200:
data = resp.json()
results = data.get("results", []) if isinstance(data, dict) else []
if not results and isinstance(data, dict):
results = [data]
if results:
result = results[0] if isinstance(results, list) else results
markdown = result.get("markdown", "") or result.get("cleaned_html", "") or result.get("text", "")
title = result.get("title", url)
if len(markdown) > 3000:
markdown = markdown[:3000] + "... (скорочено)"
response_parts = [f"**{title}**", "", markdown]
if extract_links:
links = result.get("links", [])
if links:
response_parts.append("")
response_parts.append("**Посилання:**")
for link in links[:10]:
if isinstance(link, dict):
link_url = link.get("href", "")
else:
link_url = str(link)
if link_url:
response_parts.append(f"- {link_url}")
return ToolResult(success=True, result="\n".join(response_parts))
else:
return ToolResult(success=False, result=None, error="No content extracted")
else:
return ToolResult(success=False, result=None, error=f"Crawl failed: {resp.status_code}")
except Exception as e:
logger.error(f"Crawl4AI scrape failed: {e}")
return ToolResult(success=False, result=None, error=str(e))
async def _tts_speak(self, args: Dict) -> ToolResult:
"""Convert text to speech using Swapper TTS - PRIORITY 6"""
text = args.get("text")
language = args.get("language", "uk")
if not text:
return ToolResult(success=False, result=None, error="Text is required")
try:
if len(text) > 1000:
text = text[:1000]
resp = await self.http_client.post(
f"{self.swapper_url}/tts",
json={"text": text, "language": language},
timeout=60.0
)
if resp.status_code == 200:
data = resp.json()
audio_url = data.get("audio_url") or data.get("url")
if audio_url:
return ToolResult(success=True, result=f"Аудіо: {audio_url}")
else:
return ToolResult(success=True, result="TTS completed")
else:
return ToolResult(success=False, result=None, error=f"TTS failed: {resp.status_code}")
except Exception as e:
logger.error(f"TTS failed: {e}")
return ToolResult(success=False, result=None, error=str(e))
async def close(self):
await self.http_client.aclose()
def format_tool_calls_for_response(tool_results: List[Dict], fallback_mode: str = "normal") -> str:
"""
Format tool results in human-friendly way - NOT raw data!
Args:
tool_results: List of tool execution results
fallback_mode: "normal" | "dsml_detected" | "empty_response"
"""
# Special handling for DSML detection - LLM tried to use tools but got confused
# If we have successful tool results, show them instead of generic fallback
if fallback_mode == "dsml_detected":
# Check if any tool succeeded with a useful result
if tool_results:
for tr in tool_results:
if tr.get("success") and tr.get("result"):
result = str(tr.get("result", ""))
if result and len(result) > 10 and "error" not in result.lower():
# We have a useful tool result - use it!
if len(result) > 600:
return result[:600] + "..."
return result
# No useful tool results - give presence acknowledgment
return "Я тут. Чим можу допомогти?"
if not tool_results:
if fallback_mode == "empty_response":
return "Вибач, щось пішло не так. Спробуй ще раз."
return "Вибач, не вдалося виконати запит."
# Check what tools were used
tool_names = [tr.get("name", "") for tr in tool_results]
# Check if ANY tool succeeded
any_success = any(tr.get("success") for tr in tool_results)
if not any_success:
# All tools failed - give helpful message
errors = [tr.get("error", "unknown") for tr in tool_results if tr.get("error")]
if errors:
logger.warning(f"All tools failed: {errors}")
return "Вибач, виникла технічна проблема. Спробуй ще раз або переформулюй питання."
# Image generation - special handling
if "image_generate" in tool_names:
for tr in tool_results:
if tr.get("name") == "image_generate" and tr.get("success"):
return "✅ Зображення згенеровано!"
# Web search - show actual results to user
if "web_search" in tool_names:
for tr in tool_results:
if tr.get("name") == "web_search":
if tr.get("success"):
result = tr.get("result", "")
if not result:
return "🔍 Не знайшов релевантної інформації в інтернеті."
# Parse and format results for user
lines = result.strip().split("\n")
formatted = ["🔍 **Результати пошуку:**\n"]
current_title = ""
current_url = ""
current_snippet = ""
count = 0
for line in lines:
line = line.strip()
if line.startswith("- ") and not line.startswith("- URL:"):
if current_title and count < 3: # Show max 3 results
formatted.append(f"**{count}. {current_title}**")
if current_snippet:
formatted.append(f" {current_snippet[:150]}...")
if current_url:
formatted.append(f" 🔗 {current_url}\n")
current_title = line[2:].strip()
current_snippet = ""
current_url = ""
count += 1
elif "URL:" in line:
current_url = line.split("URL:")[-1].strip()
elif line and not line.startswith("-"):
current_snippet = line
# Add last result
if current_title and count <= 3:
formatted.append(f"**{count}. {current_title}**")
if current_snippet:
formatted.append(f" {current_snippet[:150]}...")
if current_url:
formatted.append(f" 🔗 {current_url}")
if len(formatted) > 1:
return "\n".join(formatted)
else:
return "🔍 Не знайшов релевантної інформації в інтернеті."
else:
return "🔍 Пошук в інтернеті не вдався. Спробуй ще раз."
# Memory search
if "memory_search" in tool_names:
for tr in tool_results:
if tr.get("name") == "memory_search" and tr.get("success"):
result = tr.get("result", "")
if "немає інформації" in result.lower() or not result:
return "🧠 В моїй пам'яті немає інформації про це."
# Truncate if too long
if len(result) > 500:
return result[:500] + "..."
return result
# Graph query
if "graph_query" in tool_names:
for tr in tool_results:
if tr.get("name") == "graph_query" and tr.get("success"):
result = tr.get("result", "")
if not result or "не знайдено" in result.lower():
return "📊 В базі знань немає інформації про це."
if len(result) > 500:
return result[:500] + "..."
return result
# Default fallback - check if we have any result to show
for tr in tool_results:
if tr.get("success") and tr.get("result"):
result = str(tr.get("result", ""))
if result and len(result) > 10:
# We have something, show it
if len(result) > 400:
return result[:400] + "..."
return result
# Really nothing useful - be honest
return "Я обробив твій запит, але не знайшов корисної інформації. Можеш уточнити питання?"