From 21576f0ca3d30ae83a86c1d8f9988ba9183f62e2 Mon Sep 17 00:00:00 2001 From: Apple Date: Sun, 15 Feb 2026 01:50:37 -0800 Subject: [PATCH] node1: add universal file tool, gateway document delivery, and sync runbook --- docs/runbooks/NODE1_FILE_TOOL_SYNC_RUNBOOK.md | 124 +++ gateway-bot/http_api.py | 333 +++++- gateway-bot/router_client.py | 77 +- services/router/agent_tools_config.py | 37 +- services/router/main.py | 769 +++++++++++++- services/router/requirements.txt | 3 + services/router/tool_manager.py | 995 ++++++++++++++++-- 7 files changed, 2207 insertions(+), 131 deletions(-) create mode 100644 docs/runbooks/NODE1_FILE_TOOL_SYNC_RUNBOOK.md diff --git a/docs/runbooks/NODE1_FILE_TOOL_SYNC_RUNBOOK.md b/docs/runbooks/NODE1_FILE_TOOL_SYNC_RUNBOOK.md new file mode 100644 index 00000000..c277b00e --- /dev/null +++ b/docs/runbooks/NODE1_FILE_TOOL_SYNC_RUNBOOK.md @@ -0,0 +1,124 @@ +# NODE1 File Tool Sync Runbook + +## Scope +This runbook documents: +- how NODE1 runtime drift was identified and synchronized, +- how universal `file_tool` was introduced into the **actual** NODE1 router stack, +- how to deploy and rollback safely. + +## Canonical Runtime (NODE1) +- Host: `144.76.224.179` +- Runtime repo: `/opt/microdao-daarion` +- Compose: `/opt/microdao-daarion/docker-compose.node1.yml` +- Router container: `dagi-router-node1` +- Gateway container: `dagi-gateway-node1` +- Router API contract: `POST /v1/agents/{agent_id}/infer` (not `/route`) + +## Drift Findings (before sync) +The laptop had multiple repos; only this repo matches NODE1 architecture: +- `/Users/apple/github-projects/microdao-daarion` + +Critical files were drifted against NODE1 runtime (15 files): +- `docker-compose.node1.yml` +- `gateway-bot/agent_registry.json` +- `gateway-bot/http_api.py` +- `gateway-bot/router_client.py` +- `gateway-bot/senpai_prompt.txt` +- `http_api.py` +- `services/router/agent_tools_config.py` +- `services/router/main.py` +- `services/router/memory_retrieval.py` +- `services/router/requirements.txt` +- `services/router/router-config.yml` +- `services/router/tool_manager.py` +- `services/senpai-md-consumer/senpai/md_consumer/main.py` +- `services/senpai-md-consumer/senpai/md_consumer/publisher.py` +- `services/swapper-service/requirements.txt` + +## Sync Procedure (NODE1 -> Laptop) +1. Snapshot and compare hashes: + - compute SHA256 for the critical file list on laptop and NODE1. +2. Backup local copies: + - `rollback_backups/node1_sync_/...` +3. Copy runtime files from NODE1 to laptop: + - `scp root@144.76.224.179:/opt/microdao-daarion/ ...` +4. Verify 1:1 hash match. + +## File Tool Implementation +Implemented in actual NODE1 stack (`services/router/*` + gateway): + +### Added actions +- `csv_create` +- `csv_update` +- `json_export` +- `yaml_export` +- `zip_bundle` +- `docx_create` +- `docx_update` +- `pdf_merge` +- `pdf_split` +- `pdf_fill` + +### Standard output contract +For file-producing tool calls, router now propagates: +- `file_base64` +- `file_name` +- `file_mime` +- `message` (inside tool result payload) + +### Gateway behavior +`gateway-bot/http_api.py` now sends Telegram `sendDocument` when file fields are present. + +## Changed Files +- `services/router/requirements.txt` +- `services/router/agent_tools_config.py` +- `services/router/tool_manager.py` +- `services/router/main.py` +- `gateway-bot/router_client.py` +- `gateway-bot/http_api.py` + +## Deployment Steps (NODE1) +1. Backup target files on NODE1 before each step. +2. Sync updated files to `/opt/microdao-daarion`. +3. Compile checks: + - `python3 -m py_compile services/router/tool_manager.py services/router/main.py services/router/agent_tools_config.py gateway-bot/router_client.py gateway-bot/http_api.py` +4. Rebuild/restart runtime services: + - `docker compose -f docker-compose.node1.yml up -d --build --no-deps router gateway` +5. Health checks: + - `curl http://127.0.0.1:9102/health` + - `curl http://127.0.0.1:9300/health` + +## Smoke Tests +Run inside `dagi-router-node1` to validate actions deterministically: +- CSV create/update +- JSON/YAML export +- ZIP bundle +- DOCX create/update +- PDF merge/split/fill + +Also verify infer endpoint still works: +- `POST http://127.0.0.1:9102/v1/agents/devtools/infer` + +## Backups Created During This Work +- `rollback_backups/file_tool_step1_20260215_011637/...` +- `rollback_backups/file_tool_step2_tool_manager.py.bak_20260215_012029` +- `rollback_backups/file_tool_step3_tool_manager.py.bak_20260215_012200` +- `rollback_backups/file_tool_step4_tool_manager.py.bak_20260215_012309` + +## Rollback (NODE1) +```bash +cd /opt/microdao-daarion +cp rollback_backups/file_tool_step1_20260215_011637/services/router/requirements.txt services/router/requirements.txt +cp rollback_backups/file_tool_step1_20260215_011637/services/router/agent_tools_config.py services/router/agent_tools_config.py +cp rollback_backups/file_tool_step1_20260215_011637/services/router/tool_manager.py services/router/tool_manager.py +cp rollback_backups/file_tool_step1_20260215_011637/services/router/main.py services/router/main.py +cp rollback_backups/file_tool_step1_20260215_011637/gateway-bot/router_client.py gateway-bot/router_client.py +cp rollback_backups/file_tool_step1_20260215_011637/gateway-bot/http_api.py gateway-bot/http_api.py + +docker compose -f docker-compose.node1.yml up -d --build --no-deps router gateway +``` + +## Notes +- `docker-compose.node1.yml` may warn about deprecated `version` key; this is non-blocking. +- Avoid `--remove-orphans` unless explicitly intended. +- Use `--no-deps` for targeted router/gateway rollout to avoid unrelated service churn. diff --git a/gateway-bot/http_api.py b/gateway-bot/http_api.py index f64822fa..579218e9 100644 --- a/gateway-bot/http_api.py +++ b/gateway-bot/http_api.py @@ -57,6 +57,172 @@ LAST_PENDING_STATE: Dict[str, Dict[str, Any]] = {} PENDING_STATE_TTL = 1800 # 30 minutes +# Per-user language preference cache (chat_id:user_id -> {lang, ts}) +USER_LANGUAGE_PREFS: Dict[str, Dict[str, Any]] = {} +USER_LANGUAGE_PREF_TTL = 30 * 24 * 3600 # 30 days + +# Recent photo context for follow-up questions in chat (agent:chat:user -> {file_id, ts}) +RECENT_PHOTO_CONTEXT: Dict[str, Dict[str, Any]] = {} +RECENT_PHOTO_TTL = 30 * 60 # 30 minutes + + +def _cleanup_recent_photo_context() -> None: + now = time.time() + expired = [k for k, v in RECENT_PHOTO_CONTEXT.items() if now - float(v.get("ts", 0)) > RECENT_PHOTO_TTL] + for k in expired: + del RECENT_PHOTO_CONTEXT[k] + + +def _set_recent_photo_context(agent_id: str, chat_id: str, user_id: str, file_id: str) -> None: + _cleanup_recent_photo_context() + key = f"{agent_id}:{chat_id}:{user_id}" + RECENT_PHOTO_CONTEXT[key] = {"file_id": file_id, "ts": time.time()} + + +def _get_recent_photo_file_id(agent_id: str, chat_id: str, user_id: str) -> Optional[str]: + _cleanup_recent_photo_context() + key = f"{agent_id}:{chat_id}:{user_id}" + rec = RECENT_PHOTO_CONTEXT.get(key) + if not rec: + return None + return rec.get("file_id") + + +def _looks_like_photo_followup(text: str) -> bool: + if not text: + return False + t = text.strip().lower() + markers = [ + "що ти бачиш", "що на фото", "що на зображенні", "опиши фото", "подивись фото", + "what do you see", "what is in the image", "describe the photo", + "что ты видишь", "что на фото", "опиши фото", "посмотри фото", + ] + return any(m in t for m in markers) + + +def _cleanup_user_language_prefs() -> None: + now = time.time() + expired = [k for k, v in USER_LANGUAGE_PREFS.items() if now - float(v.get("ts", 0)) > USER_LANGUAGE_PREF_TTL] + for k in expired: + del USER_LANGUAGE_PREFS[k] + + +def _normalize_lang_code(raw: Optional[str]) -> Optional[str]: + if not raw: + return None + code = str(raw).strip().lower().replace("_", "-") + if code.startswith("uk"): + return "uk" + if code.startswith("ru"): + return "ru" + if code.startswith("en"): + return "en" + return None + + +def _detect_language_from_text(text: str) -> Optional[str]: + if not text: + return None + t = text.lower() + letters = [ch for ch in t if ch.isalpha()] + if not letters: + return None + + cyr = sum(1 for ch in letters if "а" <= ch <= "я" or ch in "іїєґё") + lat = sum(1 for ch in letters if "a" <= ch <= "z") + + if cyr >= 3 and cyr >= lat: + # Ukrainian-specific letters strongly indicate Ukrainian. + if any(ch in t for ch in "іїєґ"): + return "uk" + # Russian-specific letters/symbols. + if any(ch in t for ch in "ёыэъ"): + return "ru" + # Soft lexical preference. + uk_hits = sum(1 for w in ("що", "який", "дякую", "будь", "будь ласка", "привіт") if w in t) + ru_hits = sum(1 for w in ("что", "какой", "спасибо", "пожалуйста", "привет") if w in t) + if uk_hits > ru_hits: + return "uk" + if ru_hits > uk_hits: + return "ru" + return "uk" + + if lat >= 3 and lat > cyr: + return "en" + + return None + + +def resolve_preferred_language(chat_id: str, user_id: str, text: str, telegram_lang_code: Optional[str]) -> str: + _cleanup_user_language_prefs() + key = f"{chat_id}:{user_id}" + + text_lang = _detect_language_from_text(text) + tg_lang = _normalize_lang_code(telegram_lang_code) + cached_lang = USER_LANGUAGE_PREFS.get(key, {}).get("lang") + + preferred = text_lang or tg_lang or cached_lang or "uk" + USER_LANGUAGE_PREFS[key] = {"lang": preferred, "ts": time.time()} + return preferred + + +def preferred_language_label(lang: str) -> str: + return { + "uk": "Ukrainian", + "ru": "Russian", + "en": "English", + }.get((lang or "").lower(), "Ukrainian") + + +def _extract_preferred_language_from_profile_fact(fact: Optional[Dict[str, Any]]) -> Optional[str]: + if not isinstance(fact, dict): + return None + data = fact.get("fact_value_json") + if not isinstance(data, dict): + return None + preferred = _normalize_lang_code(data.get("preferred_language")) + if preferred: + return preferred + return _normalize_lang_code(data.get("language_code")) + + +async def resolve_preferred_language_persistent( + chat_id: str, + user_id: str, + text: str, + telegram_lang_code: Optional[str], + team_id: Optional[str] = None, +) -> str: + """Resolve language with memory-service fallback for post-restart continuity.""" + _cleanup_user_language_prefs() + key = f"{chat_id}:{user_id}" + + text_lang = _detect_language_from_text(text) + tg_lang = _normalize_lang_code(telegram_lang_code) + cached_lang = USER_LANGUAGE_PREFS.get(key, {}).get("lang") + + if text_lang or tg_lang or cached_lang: + preferred = text_lang or tg_lang or cached_lang or "uk" + USER_LANGUAGE_PREFS[key] = {"lang": preferred, "ts": time.time()} + return preferred + + try: + fact = await memory_client.get_fact( + user_id=f"tg:{user_id}", + fact_key="profile", + team_id=team_id, + ) + fact_lang = _extract_preferred_language_from_profile_fact(fact) + if fact_lang: + USER_LANGUAGE_PREFS[key] = {"lang": fact_lang, "ts": time.time()} + return fact_lang + except Exception as e: + logger.debug(f"preferred language fact lookup failed: {e}") + + USER_LANGUAGE_PREFS[key] = {"lang": "uk", "ts": time.time()} + return "uk" + + def _pending_state_cleanup(): now = time.time() expired = [cid for cid, rec in LAST_PENDING_STATE.items() if now - rec.get('ts', 0) > PENDING_STATE_TTL] @@ -483,9 +649,36 @@ async def agromatrix_telegram_webhook(update: TelegramUpdate): if user_id and user_id in op_ids: is_ops = True - # Operator NL or slash commands -> handle via Stepan handler - if is_slash or is_ops: + # Operator NL or operator slash commands -> handle via Stepan handler. + # Important: do NOT treat generic slash commands (/start, /agromatrix) as operator commands, + # otherwise regular users will see "Недостатньо прав" or Stepan errors. + operator_slash_cmds = { + "whoami", + "pending", + "pending_show", + "approve", + "reject", + "apply_dict", + "pending_stats", + } + slash_cmd = "" + if is_slash: + try: + slash_cmd = (msg_text.strip().split()[0].lstrip("/").strip().lower()) + except Exception: + slash_cmd = "" + is_operator_slash = bool(slash_cmd) and slash_cmd in operator_slash_cmds + + # Stepan handler currently depends on ChatOpenAI (OPENAI_API_KEY). If key is not configured, + # never route production traffic there (avoid "Помилка обробки..." and webhook 5xx). + stepan_enabled = bool(os.getenv("OPENAI_API_KEY", "").strip()) + if stepan_enabled and (is_ops or is_operator_slash): return await handle_stepan_message(update, AGROMATRIX_CONFIG) + if (is_ops or is_operator_slash) and not stepan_enabled: + logger.warning( + "Stepan handler disabled (OPENAI_API_KEY missing); falling back to Router pipeline " + f"for chat_id={chat_id}, user_id={user_id}, slash_cmd={slash_cmd!r}" + ) # General conversation -> standard Router pipeline (like all other agents) return await handle_telegram_webhook(AGROMATRIX_CONFIG, update) @@ -611,14 +804,37 @@ def extract_bot_mentions(text: str) -> List[str]: return mentions +def should_force_detailed_reply(text: str) -> bool: + """Soft signal: user explicitly asks for details/long format.""" + if not text: + return False + lower = text.strip().lower() + detail_markers = [ + "детально", "подробно", "розгорну", "розпиши", "по всіх пунктах", + "step by step", "покроково", "з прикладами", "глибоко", "deep dive", + "full", "повний розбір", "максимально детально", + ] + return any(m in lower for m in detail_markers) + + def should_force_concise_reply(text: str) -> bool: - """Якщо коротке або без питального знаку — просимо агента відповісти стисло.""" + """Soft concise mode by default, unless user asks for detailed answer.""" if not text: return True + stripped = text.strip() - if len(stripped) <= 120 and "?" not in stripped: + if not stripped: return True - return False + + if should_force_detailed_reply(stripped): + return False + + # Very long user request usually means they expect context-aware answer. + if len(stripped) > 700: + return False + + # For regular Q&A in chat keep first response concise by default. + return True COMPLEX_REASONING_KEYWORDS = [ @@ -808,7 +1024,9 @@ async def process_photo( user_id: str, username: str, dao_id: str, - photo: Dict[str, Any] + photo: Dict[str, Any], + caption_override: Optional[str] = None, + bypass_media_gate: bool = False, ) -> Dict[str, Any]: """ Універсальна функція для обробки фото для будь-якого агента. @@ -833,9 +1051,10 @@ async def process_photo( return {"ok": False, "error": "No file_id in photo"} logger.info(f"{agent_config.name}: Photo from {username} (tg:{user_id}), file_id: {file_id}") + _set_recent_photo_context(agent_config.agent_id, chat_id, user_id, file_id) # Get caption for media question check - caption = (update.message or {}).get("caption") or "" + caption = caption_override if caption_override is not None else ((update.message or {}).get("caption") or "") chat = (update.message or {}).get("chat", {}) chat_type = chat.get("type", "private") is_private_chat = chat_type == "private" @@ -843,7 +1062,7 @@ async def process_photo( # BEHAVIOR POLICY v1: Media-no-comment # Check if photo has a question/request in caption - if not is_private_chat and not is_training: + if not bypass_media_gate and not is_private_chat and not is_training: has_question = detect_media_question(caption) if not has_question: logger.info(f"🔇 MEDIA-NO-COMMENT: Photo without question. Agent {agent_config.agent_id} NOT responding.") @@ -961,10 +1180,10 @@ async def process_photo( else: await send_telegram_message( chat_id, - "Не вдалося отримати опис зображення.", + "Не вдалося коректно обробити фото. Спробуйте інше фото або додайте короткий опис, що саме перевірити.", telegram_token ) - return {"ok": False, "error": "No description in response"} + return {"ok": True, "handled": True, "reason": "vision_empty_response"} else: error_msg = response.get("error", "Unknown error") if isinstance(response, dict) else "Router error" logger.error(f"{agent_config.name}: Vision-8b error: {error_msg}") @@ -1338,6 +1557,13 @@ async def handle_telegram_webhook( # Get DAO ID for this chat dao_id = get_dao_id(chat_id, "telegram", agent_id=agent_config.agent_id) + initial_preferred_lang = resolve_preferred_language( + chat_id=chat_id, + user_id=user_id, + text=update.message.get("text", ""), + telegram_lang_code=from_user.get("language_code"), + ) + # Оновлюємо факти про користувача/агента для побудови графу пам'яті asyncio.create_task( memory_client.upsert_fact( @@ -1348,6 +1574,7 @@ async def handle_telegram_webhook( "first_name": first_name, "last_name": last_name, "language_code": from_user.get("language_code"), + "preferred_language": initial_preferred_lang, "is_bot": is_sender_bot, }, team_id=dao_id, @@ -1919,8 +2146,7 @@ async def handle_telegram_webhook( result = await process_photo( agent_config, update, chat_id, user_id, username, dao_id, photo ) - if result.get("ok"): - return result + return result # Check if it's a voice message voice = update.message.get("voice") @@ -1947,6 +2173,26 @@ async def handle_telegram_webhook( if not text: text = update.message.get("text", "") caption = update.message.get("caption", "") + + # If user asks about a recently sent photo, run vision on cached photo file_id. + if text and _looks_like_photo_followup(text): + recent_file_id = _get_recent_photo_file_id(agent_config.agent_id, chat_id, user_id) + if recent_file_id: + logger.info( + f"{agent_config.name}: Detected follow-up photo question; using cached file_id={recent_file_id}" + ) + followup_result = await process_photo( + agent_config=agent_config, + update=update, + chat_id=chat_id, + user_id=user_id, + username=username, + dao_id=dao_id, + photo={"file_id": recent_file_id}, + caption_override=text, + bypass_media_gate=True, + ) + return followup_result if not text and not caption: # Check for unsupported message types and silently ignore @@ -2149,9 +2395,10 @@ async def handle_telegram_webhook( return {"ok": True, "ack": True, "reason": respond_reason} # FULL: proceed with LLM/Router call - # For prober requests, respond but don't send to Telegram + # For prober requests, skip LLM/Router entirely to save tokens if is_prober: - logger.info(f"\U0001f9ea PROBER: Agent {agent_config.agent_id} responding to prober request. Reason: {respond_reason}") + logger.info(f"\U0001f9ea PROBER: Agent {agent_config.agent_id} responding to prober (no LLM call). Reason: {respond_reason}") + return {"ok": True, "agent": agent_config.agent_id, "prober": True, "response_preview": "[prober-skip-llm]"} else: logger.info(f"\u2705 SOWA: Agent {agent_config.agent_id} WILL respond (FULL). Reason: {respond_reason}") @@ -2183,6 +2430,15 @@ async def handle_telegram_webhook( else: message_with_context = f"{training_prefix}{text}" + preferred_lang = await resolve_preferred_language_persistent( + chat_id=chat_id, + user_id=user_id, + text=text or "", + telegram_lang_code=from_user.get("language_code"), + team_id=dao_id, + ) + preferred_lang_label = preferred_language_label(preferred_lang) + # Build request to Router system_prompt = agent_config.system_prompt logger.info(f"📝 {agent_config.name} system_prompt length: {len(system_prompt) if system_prompt else 0} chars") @@ -2206,6 +2462,9 @@ async def handle_telegram_webhook( "mentioned_bots": mentioned_bots, "requires_complex_reasoning": needs_complex_reasoning, "is_reply_to_agent": is_reply_to_agent, + "is_training_group": is_training_group, + "preferred_response_language": preferred_lang, + "preferred_response_language_label": preferred_lang_label, }, "context": { "agent_name": agent_config.name, @@ -2218,17 +2477,30 @@ async def handle_telegram_webhook( }, } + if should_force_detailed_reply(text): + router_request["metadata"]["force_detailed"] = True + if should_force_concise_reply(text): # IMPORTANT: preserve conversation context! Only append concise instruction + router_request["metadata"]["force_concise"] = True router_request["message"] = ( router_request["message"] - + "\n\n(Інструкція: дай максимально коротку відповідь, якщо не просили деталей " - "і дочекайся додаткового питання.)" + + "\n\n(Інструкція: спочатку дай коротку відповідь по суті (1-3 абзаци), " + "а якщо користувач попросить — розгорни детально.)" + + f"\n(Мова відповіді: {preferred_lang_label}.)" + + "\n(Не потрібно щоразу представлятися по імені або писати шаблонне: 'чим можу допомогти'.)" ) if needs_complex_reasoning: router_request["metadata"]["provider"] = "cloud_deepseek" router_request["metadata"]["reason"] = "auto_complex" + + if not should_force_concise_reply(text): + router_request["message"] = ( + router_request["message"] + + f"\n\n(Мова відповіді: {preferred_lang_label}.)" + + "\n(Не потрібно щоразу представлятися по імені або писати шаблонне: 'чим можу допомогти'.)" + ) # Send to Router logger.info(f"Sending to Router: agent={agent_config.agent_id}, dao={dao_id}, user=tg:{user_id}") @@ -2238,6 +2510,9 @@ async def handle_telegram_webhook( if isinstance(response, dict) and response.get("ok"): answer_text = response.get("data", {}).get("text") or response.get("response", "") image_base64 = response.get("image_base64") or response.get("data", {}).get("image_base64") + file_base64 = response.get("file_base64") or response.get("data", {}).get("file_base64") + file_name = response.get("file_name") or response.get("data", {}).get("file_name") or "artifact.bin" + file_mime = response.get("file_mime") or response.get("data", {}).get("file_mime") or "application/octet-stream" # Debug logging logger.info(f"📦 Router response: {len(answer_text)} chars, model={response.get('model')}, backend={response.get('backend')}") @@ -2246,7 +2521,9 @@ async def handle_telegram_webhook( logger.info(f"🖼️ Received image_base64: {len(image_base64)} chars") else: logger.debug("⚠️ No image_base64 in response") - + if file_base64: + logger.info(f"📄 Received file_base64: {len(file_base64)} chars ({file_name})") + # Check for NO_OUTPUT (LLM decided not to respond) if is_no_output_response(answer_text): logger.info(f"🔇 NO_OUTPUT: Agent {agent_config.agent_id} returned empty/NO_OUTPUT. Not sending to Telegram.") @@ -2305,8 +2582,27 @@ async def handle_telegram_webhook( logger.info(f"🧪 PROBER: Skipping Telegram send for prober request. Response: {answer_text[:100]}...") return {"ok": True, "agent": agent_config.agent_id, "prober": True, "response_preview": answer_text[:100]} + # Send file artifact if generated + if file_base64: + try: + file_bytes = base64.b64decode(file_base64) + token = telegram_token or os.getenv("TELEGRAM_BOT_TOKEN") + url = f"https://api.telegram.org/bot{token}/sendDocument" + caption = answer_text[:1024] if answer_text else "" + safe_name = str(file_name).split("/")[-1].split("\\")[-1] or "artifact.bin" + async with httpx.AsyncClient() as client: + files = {"document": (safe_name, BytesIO(file_bytes), file_mime)} + data = {"chat_id": chat_id} + if caption: + data["caption"] = caption + response_doc = await client.post(url, files=files, data=data, timeout=45.0) + response_doc.raise_for_status() + logger.info(f"✅ Sent generated document to Telegram chat {chat_id}: {safe_name}") + except Exception as e: + logger.error(f"❌ Failed to send document to Telegram: {e}") + await send_telegram_message(chat_id, answer_text or "Файл згенеровано, але не вдалося надіслати документ.", telegram_token) # Send image if generated - if image_base64: + elif image_base64: try: # Decode base64 image image_bytes = base64.b64decode(image_base64) @@ -2344,6 +2640,7 @@ async def handle_telegram_webhook( agent_metadata={ "mentioned_bots": mentioned_bots, "requires_complex_reasoning": needs_complex_reasoning, + "preferred_language": preferred_lang, }, username=username, ) diff --git a/gateway-bot/router_client.py b/gateway-bot/router_client.py index f6f63d35..1e01aad4 100644 --- a/gateway-bot/router_client.py +++ b/gateway-bot/router_client.py @@ -20,6 +20,46 @@ except ImportError: # Router configuration from environment ROUTER_BASE_URL = os.getenv("ROUTER_URL", "http://127.0.0.1:9102") ROUTER_TIMEOUT = float(os.getenv("ROUTER_TIMEOUT", "180.0")) +GATEWAY_MAX_TOKENS_DEFAULT = int(os.getenv("GATEWAY_MAX_TOKENS_DEFAULT", "700")) +GATEWAY_MAX_TOKENS_CONCISE = int(os.getenv("GATEWAY_MAX_TOKENS_CONCISE", "220")) +GATEWAY_MAX_TOKENS_TRAINING = int(os.getenv("GATEWAY_MAX_TOKENS_TRAINING", "900")) +GATEWAY_TEMPERATURE_DEFAULT = float(os.getenv("GATEWAY_TEMPERATURE_DEFAULT", "0.4")) +GATEWAY_MAX_TOKENS_SENPAI_DEFAULT = int(os.getenv("GATEWAY_MAX_TOKENS_SENPAI_DEFAULT", "320")) +GATEWAY_MAX_TOKENS_DETAILED = int(os.getenv("GATEWAY_MAX_TOKENS_DETAILED", "900")) + + +def _apply_runtime_communication_guardrails(system_prompt: str, metadata: Dict[str, Any]) -> str: + """Apply global communication constraints for all agents in Telegram flows.""" + if not system_prompt: + return system_prompt + + lang_label = (metadata or {}).get("preferred_response_language_label") or "user language" + guardrail = ( + "\n\n[GLOBAL COMMUNICATION POLICY]\n" + "1) Do not introduce yourself by name in every message.\n" + "2) Do not add repetitive generic closers like 'how can I help' unless user explicitly asks.\n" + "3) Continue the dialog naturally from context.\n" + f"4) Respond in {lang_label}, matching the user's latest language.\n" + ) + return system_prompt + guardrail + + +def _apply_agent_style_guardrails(agent_id: str, system_prompt: str) -> str: + """Apply lightweight runtime style constraints for specific agents.""" + if not system_prompt: + return system_prompt + + if agent_id == "nutra": + nutra_guardrail = ( + "\n\n[STYLE LOCK - NUTRA]\n" + "Always write in first-person singular and feminine form.\n" + "Use feminine wording in Ukrainian/Russian (e.g., 'я підготувала', 'я готова', " + "'я зрозуміла').\n" + "Never switch to masculine forms (e.g., 'понял', 'готов').\n" + ) + return system_prompt + nutra_guardrail + + return system_prompt async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]: @@ -32,6 +72,8 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]: context = body.get("context", {}) system_prompt = body.get("system_prompt") or context.get("system_prompt") + system_prompt = _apply_agent_style_guardrails(agent_id, system_prompt) + system_prompt = _apply_runtime_communication_guardrails(system_prompt, metadata) if system_prompt: logger.info(f"Using system prompt ({len(system_prompt)} chars) for agent {agent_id}") @@ -39,10 +81,28 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]: infer_url = f"{ROUTER_BASE_URL}/v1/agents/{agent_id}/infer" metadata["agent_id"] = agent_id + # Keep defaults moderate to avoid overly long replies while preserving flexibility. + max_tokens = GATEWAY_MAX_TOKENS_DEFAULT + + # Senpai tends to over-verbose responses in Telegram; use lower default unless user asked details. + if agent_id == "senpai": + max_tokens = GATEWAY_MAX_TOKENS_SENPAI_DEFAULT + + if metadata.get("is_training_group"): + max_tokens = GATEWAY_MAX_TOKENS_TRAINING + + if metadata.get("force_detailed"): + max_tokens = max(max_tokens, GATEWAY_MAX_TOKENS_DETAILED) + + if metadata.get("force_concise"): + max_tokens = min(max_tokens, GATEWAY_MAX_TOKENS_CONCISE) + infer_body = { "prompt": message, "system_prompt": system_prompt, - "metadata": metadata + "metadata": metadata, + "max_tokens": max_tokens, + "temperature": float(metadata.get("temperature_override", GATEWAY_TEMPERATURE_DEFAULT)), } images = context.get("images", []) @@ -54,7 +114,10 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]: infer_body["provider_override"] = metadata["provider"] prov = metadata.get("provider", "default") - logger.info(f"Sending to Router ({infer_url}): agent={agent_id}, provider={prov}, has_images={bool(images)}, prompt_len={len(message)}") + logger.info( + f"Sending to Router ({infer_url}): agent={agent_id}, provider={prov}, " + f"has_images={bool(images)}, prompt_len={len(message)}, max_tokens={max_tokens}" + ) try: async with httpx.AsyncClient(timeout=ROUTER_TIMEOUT) as client: @@ -74,12 +137,18 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]: "ok": True, "data": { "text": result.get("response", result.get("text", "")), - "image_base64": result.get("image_base64") + "image_base64": result.get("image_base64"), + "file_base64": result.get("file_base64"), + "file_name": result.get("file_name"), + "file_mime": result.get("file_mime"), }, "response": result.get("response", result.get("text", "")), "model": result.get("model"), "backend": result.get("backend"), - "image_base64": result.get("image_base64") + "image_base64": result.get("image_base64"), + "file_base64": result.get("file_base64"), + "file_name": result.get("file_name"), + "file_mime": result.get("file_mime"), } except httpx.TimeoutException as e: diff --git a/services/router/agent_tools_config.py b/services/router/agent_tools_config.py index 1c680936..6493e7e8 100644 --- a/services/router/agent_tools_config.py +++ b/services/router/agent_tools_config.py @@ -26,45 +26,64 @@ FULL_STANDARD_STACK = [ "presentation_create", "presentation_status", "presentation_download", + + # File artifacts + "file_tool", ] # Specialized tools per agent (on top of standard stack) AGENT_SPECIALIZED_TOOLS = { # Helion - Energy platform # Specialized: energy calculations, solar/wind analysis - "helion": [], + "helion": ['comfy_generate_image', 'comfy_generate_video'], # Alateya - R&D Lab OS # Specialized: experiment tracking, hypothesis testing - "alateya": [], + "alateya": ['comfy_generate_image', 'comfy_generate_video'], # Nutra - Health & Nutrition # Specialized: nutrition calculations, supplement analysis - "nutra": [], + "nutra": ['comfy_generate_image', 'comfy_generate_video'], # AgroMatrix - Agriculture # Specialized: crop analysis, weather integration, field mapping - "agromatrix": [], + "agromatrix": ['comfy_generate_image', 'comfy_generate_video'], # GreenFood - Food & Eco # Specialized: recipe analysis, eco-scoring - "greenfood": [], + "greenfood": ['comfy_generate_image', 'comfy_generate_video'], # Druid - Knowledge Search # Specialized: deep RAG, document comparison - "druid": [], + "druid": ['comfy_generate_image', 'comfy_generate_video'], # DaarWizz - DAO Coordination # Specialized: governance tools, voting, treasury - "daarwizz": [], + "daarwizz": ['comfy_generate_image', 'comfy_generate_video'], # Clan - Community # Specialized: event management, polls, member tracking - "clan": [], + "clan": ['comfy_generate_image', 'comfy_generate_video'], # Eonarch - Philosophy & Evolution # Specialized: concept mapping, timeline analysis - "eonarch": [], + "eonarch": ['comfy_generate_image', 'comfy_generate_video'], + + # SenpAI (Gordon Senpai) - Trading & Markets + # Specialized: real-time market data, features, signals + "senpai": ['market_data', 'comfy_generate_image', 'comfy_generate_video'], + + # Soul / Athena - Spiritual Mentor + "soul": ['comfy_generate_image', 'comfy_generate_video'], + + # Yaromir - Tech Lead + "yaromir": ['comfy_generate_image', 'comfy_generate_video'], + + # Sofiia - Chief AI Architect + "sofiia": ['comfy_generate_image', 'comfy_generate_video'], + + # Daarion - Media Generation + "daarion": ['comfy_generate_image', 'comfy_generate_video'], } # CrewAI team structure per agent (future implementation) diff --git a/services/router/main.py b/services/router/main.py index 8050a7dd..4056bf11 100644 --- a/services/router/main.py +++ b/services/router/main.py @@ -9,6 +9,7 @@ import re import yaml import httpx import logging +import hashlib import time # For latency metrics # CrewAI Integration @@ -40,6 +41,30 @@ except ImportError: logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) +TRUSTED_DOMAINS_CONFIG_PATH = os.getenv("TRUSTED_DOMAINS_CONFIG_PATH", "./trusted_domains.yml") +_trusted_domains_cache: Dict[str, Any] = {"mtime": None, "data": {}} + + +def _load_trusted_domains_overrides() -> Dict[str, Any]: + """Load optional trusted domains overrides editable by mentors.""" + global _trusted_domains_cache + try: + if not os.path.exists(TRUSTED_DOMAINS_CONFIG_PATH): + return {} + mtime = os.path.getmtime(TRUSTED_DOMAINS_CONFIG_PATH) + if _trusted_domains_cache.get("mtime") == mtime: + return _trusted_domains_cache.get("data") or {} + + with open(TRUSTED_DOMAINS_CONFIG_PATH, "r", encoding="utf-8") as f: + raw = yaml.safe_load(f) or {} + if not isinstance(raw, dict): + raw = {} + _trusted_domains_cache = {"mtime": mtime, "data": raw} + return raw + except Exception as e: + logger.warning(f"⚠️ Failed to load trusted domains overrides: {e}") + return {} + def _strip_dsml_keep_text_before(text: str) -> str: """If response contains DSML, return only the part before the first DSML-like tag. Otherwise return empty (caller will use fallback).""" @@ -69,6 +94,499 @@ def _strip_dsml_keep_text_before(text: str) -> str: return prefix if len(prefix) > 30 else "" + + + +def _vision_prompt_wants_web(prompt: str) -> bool: + if not prompt: + return False + p = prompt.lower() + markers = [ + "знайди", "пошукай", "пошук", "в інтернет", "в інтернеті", "у відкритих джерелах", + "що це", "що на фото", "який це", "яка це", "identify", "find online", "search web", + "назва", "бренд", "виробник", "інструкція", "дозування", "регламент", "де купити", "ціна", + ] + return any(m in p for m in markers) + + +def _vision_answer_uncertain(answer: str) -> bool: + if not answer: + return True + a = answer.lower() + uncertain_markers = [ + "ймовірно", "можливо", "схоже", "не впевнений", "не можу визначити", "важко сказати", + "probably", "maybe", "looks like", "not sure", "cannot identify" + ] + return any(m in a for m in uncertain_markers) + + +EMPTY_ANSWER_GUARD_AGENTS = {"devtools", "monitor"} + + +def _normalize_text_response(text: str) -> str: + return re.sub(r"\s+", " ", str(text or "")).strip() + + +def _needs_empty_answer_recovery(text: str) -> bool: + normalized = _normalize_text_response(text) + if not normalized: + return True + low = normalized.lower() + if len(normalized) < 8: + return True + meta_markers = ( + "the user", "user asked", "i need", "let me", "analysis", "thinking", + "користувач", "потрібно", "спочатку", "сначала" + ) + if any(m in low for m in meta_markers) and len(normalized) < 80: + return True + if normalized in {"...", "ok", "done"}: + return True + return False + + +def _image_response_needs_retry(text: str) -> bool: + normalized = _normalize_text_response(text) + if _needs_empty_answer_recovery(normalized): + return True + low = normalized.lower() + blocked_markers = ( + "не можу бачити", "не можу аналізувати зображення", "опишіть фото словами", + "cannot view images", "cannot analyze image", "as a text model" + ) + if any(m in low for m in blocked_markers): + return True + return len(normalized) < 24 + + +def _vision_response_is_blurry(text: str) -> bool: + low = _normalize_text_response(text).lower() + if not low: + return False + blurry_markers = ( + "розмит", "нечітк", "не дуже чітк", "blur", "blurry", "out of focus", "low quality" + ) + return any(m in low for m in blurry_markers) + + +def _build_image_fallback_response(agent_id: str, prompt: str = "") -> str: + if (agent_id or "").lower() == "agromatrix": + return ( + "Фото поки занадто нечітке, тому діагноз неточний. " + "Надішли, будь ласка, 2-3 чіткі фото: загальний вигляд рослини, крупний план проблемної ділянки " + "і (для листка) нижній бік. Якщо можеш, додай культуру та стадію росту." + ) + return "Я поки не бачу достатньо деталей на фото. Надішли, будь ласка, чіткіше фото або крупний план об'єкта." + + + +def _sanitize_vision_text_for_user(text: str) -> str: + if not text: + return "" + normalized = re.sub(r"\s+", " ", str(text)).strip() + if not normalized: + return "" + + sentences = [seg.strip() for seg in re.split(r"(?<=[.!?])\s+", normalized) if seg.strip()] + meta_markers = ( + "okay", "the user", "user sent", "they want", "i need", "let me", "i will", + "first, look at the image", "look at the image", "first, analyze", + "first, looking at the image", "looking at the image", + "хорошо", "користувач", "пользователь", "потрібно", "нужно", "спочатку", "сначала" + ) + cleaned = [sent for sent in sentences if not any(m in sent.lower() for m in meta_markers)] + if cleaned: + out = " ".join(cleaned[:3]).strip() + else: + # If text is only meta-reasoning, prefer empty over leaking service text to user. + if any(m in normalized.lower() for m in meta_markers): + return "" + out = " ".join(sentences[:3]).strip() + + if len(out) > 700: + out = out[:700].rsplit(" ", 1)[0] + "..." + return out + + +def _extract_vision_search_facts(text: str, max_chars: int = 220) -> str: + fact = _sanitize_vision_text_for_user(text) + + # If sanitizer dropped everything (meta-only), try to recover object phrase. + if not fact and text: + raw = re.sub(r"\s+", " ", str(text)).strip() + raw = re.sub(r"(?i)^okay,?\s*", "", raw) + raw = re.sub(r"(?i)^let\'s\s+see\.?\s*", "", raw) + raw = re.sub(r"(?i)^the user sent (an image|a photo|a picture) of\s+", "", raw) + raw = re.sub(r"(?i)^user sent (an image|a photo|a picture) of\s+", "", raw) + raw = re.sub(r"(?i)^an image of\s+", "", raw) + raw = re.sub(r"(?i)they want.*$", "", raw).strip(" .") + fact = raw + + if not fact: + return "" + + fact = re.sub(r"(?i)джерела\s*:\s*.*$", "", fact).strip() + fact = re.sub(r"[*_`#\[\]()]", "", fact) + fact = re.sub(r"\s{2,}", " ", fact).strip(" .,") + if len(fact) > max_chars: + fact = fact[:max_chars].rsplit(" ", 1)[0] + return fact + + +def _build_vision_web_query(prompt: str, vision_text: str) -> str: + # Keep query compact and deterministic for web_search tool. + source_intent = any(k in (prompt or "").lower() for k in ("джерел", "підтвердж", "source", "reference")) + prompt_part = (prompt or "").strip() + # Remove generic question wrappers that pollute search quality. + prompt_part = re.sub(r"(?i)що\s*це\s*на\s*фото\??", "", prompt_part).strip(" .") + prompt_part = re.sub(r"(?i)дай\s*2-?3\s*джерела", "", prompt_part).strip(" .") + prompt_part = re.sub(r"(?i)дай\s*\d+\s*джерел[а-я]*\s*для", "", prompt_part).strip(" .") + prompt_part = re.sub(r"(?i)дай\s*\d+\s*джерел[а-я]*", "", prompt_part).strip(" .") + prompt_part = re.sub(r"(?i)знайди\s*в\s*інтернеті", "", prompt_part).strip(" .") + prompt_part = re.sub(r"(?i)знайди\s*в\s*інтернеті\s*схожі\s*джерела", "", prompt_part).strip(" .") + prompt_part = re.sub(r"(?i)підтвердження", "", prompt_part).strip(" .") + prompt_part = re.sub(r"(?i)якщо\s*не\s*впевнений.*$", "", prompt_part).strip(" .") + prompt_part = re.sub(r"(?i)пошукай.*$", "", prompt_part).strip(" .") + prompt_part = re.sub(r"(?iu)\bі\b\.?$", "", prompt_part).strip(" .") + + vision_part = _extract_vision_search_facts(vision_text) + if vision_part: + tokens = re.findall(r"[a-zA-Zа-яА-ЯіїєІЇЄ0-9]{3,}", vision_part.lower()) + generic_tokens = { + "first", "look", "image", "photo", "picture", "context", "the", "and", + "спочатку", "подивись", "зображення", "фото", "картинка", "контекст", + } + if len(tokens) < 3 or len(vision_part) < 18 or all(t in generic_tokens for t in tokens): + # Too vague entity extraction (e.g., single word "rex") -> skip web enrichment. + return "" + if vision_part: + if prompt_part: + q = f"{vision_part}. контекст: {prompt_part}".strip(" .") + else: + q = vision_part + if source_intent: + q = f"{q} wikipedia encyclopedia" + return q.strip() + return "" + + +def _compact_web_search_result(raw: str, query: str = "", agent_id: str = "", max_chars: int = 900) -> str: + if not raw: + return "" + text = str(raw).strip() + if not text: + return "" + + def _extract_domain(url: str) -> str: + if not url: + return "" + d = url.lower().strip() + d = d.replace("https://", "").replace("http://", "") + d = d.split("/")[0] + if d.startswith("www."): + d = d[4:] + return d + + low_signal_tokens = ( + "grid maker", "converter", "convert", "download", "wallpaper", "stock photo", + "instagram", "pinterest", "tiktok", "youtube", "facebook", "generator", "meme", + ) + low_signal_domains = ( + "pinterest.com", "instagram.com", "tiktok.com", "youtube.com", + "facebook.com", "vk.com", "yandex.", "stackexchange.com", + "zhihu.com", "baidu.com", + ) + trusted_common_domains = ( + "wikipedia.org", "wikidata.org", "britannica.com", + "who.int", "fao.org", "oecd.org", "worldbank.org", "un.org", "europa.eu", + "nature.com", "science.org", "sciencedirect.com", "springer.com", + ) + trusted_agro_domains = ( + "fao.org", "europa.eu", "ec.europa.eu", "usda.gov", "nass.usda.gov", + "ukragroconsult.com", "minagro.gov.ua", "rada.gov.ua", "kmu.gov.ua", + "agroportal.ua", "latifundist.com", "kurkul.com", + ) + trusted_by_agent = { + "agromatrix": trusted_agro_domains, + "alateya": ( + "europa.eu", "un.org", "worldbank.org", "oecd.org", + ), + "clan": ( + "europa.eu", "un.org", "wikipedia.org", + ), + "daarwizz": ( + "openai.com", "anthropic.com", "mistral.ai", "huggingface.co", + "python.org", "github.com", + ), + "devtools": ( + "github.com", "docs.python.org", "pypi.org", "docker.com", + "kubernetes.io", "fastapi.tiangolo.com", "postgresql.org", + ), + "druid": ( + "who.int", "nih.gov", "ncbi.nlm.nih.gov", "wikipedia.org", + ), + "eonarch": ( + "iea.org", "irena.org", "entsoe.eu", "europa.eu", "worldbank.org", + ), + "greenfood": ( + "fao.org", "who.int", "efsa.europa.eu", "usda.gov", "ec.europa.eu", + ), + "senpai": ( + "binance.com", "bybit.com", "coinbase.com", "kraken.com", + "coindesk.com", "cointelegraph.com", "tradingview.com", + "cftc.gov", "sec.gov", "esma.europa.eu", + ), + "sofiia": ( + "who.int", "nih.gov", "ncbi.nlm.nih.gov", "ema.europa.eu", + "fda.gov", "mayoclinic.org", "nhs.uk", + ), + "helion": ( + "iea.org", "irena.org", "entsoe.eu", "europa.eu", "worldbank.org", + ), + "nutra": ( + "fao.org", "who.int", "efsa.europa.eu", "fda.gov", + ), + "microdao_orchestrator": ( + "openai.com", "anthropic.com", "mistral.ai", "github.com", + "europa.eu", "un.org", "worldbank.org", + ), + "monitor": ( + "grafana.com", "prometheus.io", "elastic.co", "datadoghq.com", + "opentelemetry.io", + ), + "soul": ( + "who.int", "nih.gov", "ncbi.nlm.nih.gov", "wikipedia.org", + ), + "yaromir": ( + "europa.eu", "un.org", "worldbank.org", "wikipedia.org", + ), + } + def _norm_domain_entry(value: Any) -> str: + if isinstance(value, dict): + value = value.get("url") or value.get("domain") or "" + value = str(value or "").strip().lower() + if not value: + return "" + value = value.replace("https://", "").replace("http://", "") + value = value.split("/")[0] + if value.startswith("www."): + value = value[4:] + return value + + def _norm_domain_list(values: Any) -> List[str]: + out: List[str] = [] + if not isinstance(values, list): + return out + for v in values: + d = _norm_domain_entry(v) + if d: + out.append(d) + return out + + overrides = _load_trusted_domains_overrides() + extra_low_signal = _norm_domain_list(overrides.get("low_signal_domains")) + if extra_low_signal: + low_signal_domains = tuple(dict.fromkeys([*low_signal_domains, *extra_low_signal])) + extra_common = _norm_domain_list(overrides.get("common_domains")) + if extra_common: + trusted_common_domains = tuple(dict.fromkeys([*trusted_common_domains, *extra_common])) + agents_overrides = overrides.get("agents") if isinstance(overrides.get("agents"), dict) else {} + for a, cfg in agents_overrides.items(): + if not isinstance(cfg, dict): + continue + doms = _norm_domain_list(cfg.get("domains")) + if doms: + base = trusted_by_agent.get(str(a).lower(), ()) + merged = tuple(dict.fromkeys([*base, *doms])) + trusted_by_agent[str(a).lower()] = merged + agro_query_terms = { + "агро", "agro", "crop", "crops", "fertilizer", "fertilizers", + "field", "soil", "harvest", "yield", "pesticide", "herbicide", + "farm", "farming", "tractor", "зерно", "пшениц", "кукурудз", + "соняшник", "ріпак", "врожай", "ґрунт", "поле", "добрив", + "насіння", "ззр", "фермер", + } + query_terms = {t for t in re.findall(r"[a-zA-Zа-яА-ЯіїєІЇЄ0-9]{3,}", (query or "").lower())} + agro_mode = any(any(k in term for k in agro_query_terms) for term in query_terms) + agent_trusted_domains = trusted_by_agent.get((agent_id or "").lower(), ()) + + # Parse bullet blocks from tool output. + chunks = [] + current = [] + for line in text.splitlines(): + ln = line.rstrip() + if ln.startswith("- ") and current: + chunks.append("\n".join(current)) + current = [ln] + else: + current.append(ln) + if current: + chunks.append("\n".join(current)) + + scored = [] + for chunk in chunks: + lines = [ln.strip() for ln in chunk.splitlines() if ln.strip()] + title = lines[0][2:].strip() if lines and lines[0].startswith("- ") else (lines[0] if lines else "") + url_line = next((ln for ln in lines if ln.lower().startswith("url:")), "") + url = url_line.split(":", 1)[1].strip() if ":" in url_line else "" + domain = _extract_domain(url) + text_blob = " ".join(lines).lower() + + if any(x in domain for x in low_signal_domains): + continue + + score = 0 + for t in query_terms: + if t in text_blob: + score += 2 + if any(tok in text_blob for tok in low_signal_tokens): + score -= 3 + if domain.endswith(".gov") or domain.endswith(".gov.ua") or domain.endswith(".edu"): + score += 2 + if any(domain == d or domain.endswith("." + d) for d in trusted_common_domains): + score += 2 + if any(domain == d or domain.endswith("." + d) for d in agent_trusted_domains): + score += 2 + if any(domain.endswith(d) for d in ("wikipedia.org", "wikidata.org", "fao.org", "europa.eu")): + score += 2 + if agro_mode: + if any(domain == d or domain.endswith("." + d) for d in trusted_agro_domains): + score += 3 + else: + score -= 1 + if not url: + score -= 1 + if len(title) < 6: + score -= 1 + + scored.append((score, domain, chunk)) + + def _is_trusted_agro(domain: str) -> bool: + if not domain: + return False + if any(domain == d or domain.endswith("." + d) for d in trusted_common_domains): + return True + return any(domain == d or domain.endswith("." + d) for d in trusted_agro_domains) + + scored.sort(key=lambda x: x[0], reverse=True) + kept = [] + seen_domains = set() + if agro_mode: + for s, domain, chunk in scored: + if s < 1 or not _is_trusted_agro(domain): + continue + if domain and domain in seen_domains: + continue + if domain: + seen_domains.add(domain) + kept.append(chunk) + if len(kept) >= 3: + break + + if kept: + compact = "\n\n".join(kept).strip() + if len(compact) > max_chars: + compact = compact[:max_chars].rstrip() + "..." + return compact + + for s, domain, chunk in scored: + if s < 2: + continue + if domain and domain in seen_domains: + continue + if domain: + seen_domains.add(domain) + kept.append(chunk) + if len(kept) >= 3: + break + if not kept: + return "" + + compact = "\n\n".join(kept).strip() + if len(compact) > max_chars: + compact = compact[:max_chars].rstrip() + "..." + return compact + + +def _extract_sources_from_compact(compact: str, max_items: int = 3) -> List[Dict[str, str]]: + if not compact: + return [] + items: List[Dict[str, str]] = [] + chunks = [c for c in compact.split("\n\n") if c.strip()] + for chunk in chunks: + lines = [ln.strip() for ln in chunk.splitlines() if ln.strip()] + if not lines: + continue + title = lines[0][2:].strip() if lines[0].startswith("- ") else lines[0] + url_line = next((ln for ln in lines if ln.lower().startswith("url:")), "") + url = url_line.split(":", 1)[1].strip() if ":" in url_line else "" + if not url: + continue + items.append({"title": title[:180], "url": url[:500]}) + if len(items) >= max_items: + break + return items + +def _condition_matches(cond: Dict[str, Any], agent_id: str, metadata: Dict[str, Any]) -> bool: + """Minimal matcher for router-config `when` conditions.""" + if not isinstance(cond, dict): + return True + + meta = metadata or {} + + if "agent" in cond and cond.get("agent") != agent_id: + return False + + if "mode" in cond and meta.get("mode") != cond.get("mode"): + return False + + if "metadata_has" in cond: + key = cond.get("metadata_has") + if key not in meta: + return False + + if "metadata_equals" in cond: + eq = cond.get("metadata_equals") or {} + for k, v in eq.items(): + if meta.get(k) != v: + return False + + if "task_type" in cond: + expected = cond.get("task_type") + actual = meta.get("task_type") + if isinstance(expected, list): + if actual not in expected: + return False + elif actual != expected: + return False + + if "api_key_available" in cond: + env_name = cond.get("api_key_available") + if not (isinstance(env_name, str) and os.getenv(env_name)): + return False + + if "and" in cond: + clauses = cond.get("and") or [] + if not isinstance(clauses, list): + return False + for clause in clauses: + if not _condition_matches(clause, agent_id, meta): + return False + + return True + + +def _select_default_llm(agent_id: str, metadata: Dict[str, Any], base_llm: str, routing_rules: List[Dict[str, Any]]) -> str: + """Select LLM by first matching routing rule with `use_llm`.""" + for rule in routing_rules: + when = rule.get("when", {}) + if _condition_matches(when, agent_id, metadata): + use_llm = rule.get("use_llm") + if use_llm: + logger.info(f"🎯 Agent {agent_id} routing rule {rule.get('id', '')} -> {use_llm}") + return use_llm + return base_llm + app = FastAPI(title="DAARION Router", version="2.0.0") # Configuration @@ -404,6 +922,9 @@ class InferResponse(BaseModel): tokens_used: Optional[int] = None backend: str image_base64: Optional[str] = None # Generated image in base64 format + file_base64: Optional[str] = None + file_name: Optional[str] = None + file_mime: Optional[str] = None @@ -675,13 +1196,14 @@ async def agent_infer(agent_id: str, request: InferRequest): # Get system prompt from database or config system_prompt = request.system_prompt - # Debug logging for system prompt + system_prompt_source = "request" if system_prompt: logger.info(f"📝 Received system_prompt from request: {len(system_prompt)} chars") logger.debug(f"System prompt preview: {system_prompt[:200]}...") else: - logger.warning(f"⚠️ No system_prompt in request for agent {agent_id}, trying to load...") - + system_prompt_source = "city_service" + logger.info(f"ℹ️ No system_prompt in request for agent {agent_id}, loading from configured sources") + if not system_prompt: try: from prompt_builder import get_agent_system_prompt @@ -694,8 +1216,26 @@ async def agent_infer(agent_id: str, request: InferRequest): except Exception as e: logger.warning(f"⚠️ Could not load prompt from database: {e}") # Fallback to config + system_prompt_source = "router_config" agent_config = router_config.get("agents", {}).get(agent_id, {}) system_prompt = agent_config.get("system_prompt") + + if not system_prompt: + system_prompt_source = "empty" + logger.warning(f"⚠️ System prompt unavailable for {agent_id}; continuing with provider defaults") + + system_prompt_hash = hashlib.sha256((system_prompt or "").encode("utf-8")).hexdigest()[:12] + effective_metadata = dict(metadata) + effective_metadata["system_prompt_hash"] = system_prompt_hash + effective_metadata["system_prompt_source"] = system_prompt_source + effective_metadata["system_prompt_version"] = ( + metadata.get("system_prompt_version") + or f"{agent_id}:{system_prompt_hash}" + ) + logger.info( + f"🧩 Prompt meta for {agent_id}: source={system_prompt_source}, " + f"version={effective_metadata['system_prompt_version']}, hash={system_prompt_hash}" + ) # Determine which backend to use # Use router config to get default model for agent, fallback to qwen3:8b @@ -713,8 +1253,8 @@ async def agent_infer(agent_id: str, request: InferRequest): agent_id=agent_id, prompt=request.prompt, agent_config=agent_config, - force_crewai=request.metadata.get("force_crewai", False) if request.metadata else False, - + metadata=effective_metadata, + force_crewai=effective_metadata.get("force_crewai", False), ) logger.info(f"🎭 CrewAI decision for {agent_id}: {use_crewai} ({crewai_reason})") @@ -727,7 +1267,12 @@ async def agent_infer(agent_id: str, request: InferRequest): context={ "memory_brief": memory_brief_text, "system_prompt": system_prompt, - "metadata": metadata, + "system_prompt_meta": { + "source": system_prompt_source, + "version": effective_metadata.get("system_prompt_version"), + "hash": system_prompt_hash, + }, + "metadata": effective_metadata, }, team=crewai_cfg.get("team") ) @@ -755,9 +1300,8 @@ async def agent_infer(agent_id: str, request: InferRequest): return InferResponse( response=crew_result["result"], model="crewai-" + agent_id, - provider="crewai", - tokens_used=0, - latency_ms=int(latency * 1000) + backend="crewai", + tokens_used=0 ) else: logger.warning(f"⚠️ CrewAI failed, falling back to direct LLM") @@ -765,15 +1309,9 @@ async def agent_infer(agent_id: str, request: InferRequest): logger.exception(f"❌ CrewAI error: {e}, falling back to direct LLM") default_llm = agent_config.get("default_llm", "qwen3:8b") - - # Check if there's a routing rule for this agent + routing_rules = router_config.get("routing", []) - for rule in routing_rules: - if rule.get("when", {}).get("agent") == agent_id: - if "use_llm" in rule: - default_llm = rule.get("use_llm") - logger.info(f"🎯 Agent {agent_id} routing to: {default_llm}") - break + default_llm = _select_default_llm(agent_id, metadata, default_llm, routing_rules) # Get LLM profile configuration llm_profiles = router_config.get("llm_profiles", {}) @@ -819,15 +1357,114 @@ async def agent_infer(agent_id: str, request: InferRequest): if vision_resp.status_code == 200: vision_data = vision_resp.json() - full_response = vision_data.get("text", "") - + raw_response = vision_data.get("text", "") + full_response = _sanitize_vision_text_for_user(raw_response) + vision_web_query = "" + vision_sources: List[Dict[str, str]] = [] + # Debug: log full response structure - logger.info(f"✅ Vision response: {len(full_response)} chars, success={vision_data.get('success')}, keys={list(vision_data.keys())}") + logger.info( + f"✅ Vision response: raw={len(raw_response)} chars, sanitized={len(full_response)} chars, " + f"success={vision_data.get('success')}, keys={list(vision_data.keys())}" + ) + if raw_response and not full_response: + full_response = _extract_vision_search_facts(raw_response, max_chars=280) if not full_response: logger.warning(f"⚠️ Empty vision response! Full data: {str(vision_data)[:500]}") + + # Optional vision -> web enrichment (soft policy): + # if prompt explicitly asks to search online OR vision answer is uncertain. + if (full_response or raw_response) and TOOL_MANAGER_AVAILABLE and tool_manager: + try: + wants_web = _vision_prompt_wants_web(request.prompt) + uncertain = _vision_answer_uncertain(full_response or raw_response) + if wants_web or uncertain: + query = _build_vision_web_query(request.prompt, full_response or raw_response) + if not query: + logger.info("🔎 Vision web enrich skipped: query not actionable") + else: + vision_web_query = query + search_result = await tool_manager.execute_tool( + "web_search", + {"query": query, "max_results": 3}, + agent_id=request_agent_id, + chat_id=chat_id, + user_id=user_id, + ) + if search_result and search_result.success and search_result.result: + + compact_search = _compact_web_search_result( + search_result.result, + query=query, + agent_id=request_agent_id, + ) + + if compact_search and "Нічого не знайдено" not in compact_search: + vision_sources = _extract_sources_from_compact(compact_search) + + base_text = full_response or "Не вдалося надійно ідентифікувати об'єкт на фото." + + full_response = ( + + f"{base_text}\n\n" + + f"Додатково знайшов у відкритих джерелах:\n{compact_search}" + + ) + + logger.info( + "🌐 Vision web enrichment applied " + f"for agent={request_agent_id}, wants_web={wants_web}, uncertain={uncertain}, " + f"sources={len(vision_sources)}" + ) + except Exception as e: + logger.warning(f"⚠️ Vision web enrichment failed: {e}") + + if vision_web_query: + logger.info( + f"🗂️ Vision enrichment metadata: agent={request_agent_id}, " + f"query='{vision_web_query[:180]}', sources={len(vision_sources)}" + ) + + # Image quality gate: one soft retry if response looks empty/meta. + if _image_response_needs_retry(full_response): + try: + logger.warning(f"⚠️ Vision quality gate triggered for agent={request_agent_id}, retrying once") + retry_payload = dict(vision_payload) + retry_payload["prompt"] = ( + "Опиши зображення по суті: що зображено, ключові деталі, можливий контекст. " + "Відповідай українською 2-4 реченнями, без службових фраз. " + f"Запит користувача: {request.prompt}" + ) + retry_resp = await http_client.post( + f"{SWAPPER_URL}/vision", + json=retry_payload, + timeout=120.0 + ) + if retry_resp.status_code == 200: + retry_data = retry_resp.json() + retry_raw = retry_data.get("text", "") + retry_text = _sanitize_vision_text_for_user(retry_raw) + if retry_raw and not retry_text: + retry_text = _extract_vision_search_facts(retry_raw, max_chars=280) + if retry_text and not _image_response_needs_retry(retry_text): + full_response = retry_text + logger.info(f"✅ Vision quality retry improved response for agent={request_agent_id}") + except Exception as e: + logger.warning(f"⚠️ Vision quality retry failed: {e}") + + if _image_response_needs_retry(full_response): + full_response = _build_image_fallback_response(request_agent_id, request.prompt) + elif request_agent_id == "agromatrix" and _vision_response_is_blurry(full_response): + full_response = _build_image_fallback_response(request_agent_id, request.prompt) # Store vision message in agent-specific memory if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id and full_response: + vision_meta: Dict[str, Any] = {} + if vision_web_query: + vision_meta["vision_search_query"] = vision_web_query[:500] + if vision_sources: + vision_meta["vision_sources"] = vision_sources asyncio.create_task( memory_retrieval.store_message( agent_id=request_agent_id, @@ -836,7 +1473,8 @@ async def agent_infer(agent_id: str, request: InferRequest): message_text=f"[Image] {request.prompt}", response_text=full_response, chat_id=chat_id, - message_type="vision" + message_type="vision", + metadata=vision_meta if vision_meta else None, ) ) @@ -848,11 +1486,21 @@ async def agent_infer(agent_id: str, request: InferRequest): ) else: logger.error(f"❌ Swapper vision error: {vision_resp.status_code} - {vision_resp.text[:200]}") - # Fall through to text processing + return InferResponse( + response=_build_image_fallback_response(request_agent_id, request.prompt), + model="qwen3-vl-8b", + tokens_used=None, + backend="swapper-vision-fallback" + ) except Exception as e: logger.error(f"❌ Vision processing failed: {e}", exc_info=True) - # Fall through to text processing + return InferResponse( + response=_build_image_fallback_response(request_agent_id, request.prompt), + model="qwen3-vl-8b", + tokens_used=None, + backend="swapper-vision-fallback" + ) # ========================================================================= # SMART LLM ROUTER WITH AUTO-FALLBACK @@ -881,6 +1529,10 @@ async def agent_infer(agent_id: str, request: InferRequest): max_tokens = request.max_tokens or llm_profile.get("max_tokens", 2048) temperature = request.temperature or llm_profile.get("temperature", 0.2) + cloud_provider_names = {"deepseek", "mistral", "grok", "openai", "anthropic"} + allow_cloud = provider in cloud_provider_names + if not allow_cloud: + logger.info(f"☁️ Cloud providers disabled for agent {agent_id}: provider={provider}") # Define cloud providers with fallback order cloud_providers = [ { @@ -905,7 +1557,10 @@ async def agent_infer(agent_id: str, request: InferRequest): "timeout": 60 } ] - + + if not allow_cloud: + cloud_providers = [] + # If specific provider requested, try it first if provider in ["deepseek", "mistral", "grok"]: # Reorder to put requested provider first @@ -916,7 +1571,7 @@ async def agent_infer(agent_id: str, request: InferRequest): # Get tool definitions if Tool Manager is available tools_payload = None if TOOL_MANAGER_AVAILABLE and tool_manager: - tools_payload = tool_manager.get_tool_definitions() + tools_payload = tool_manager.get_tool_definitions(request_agent_id) logger.debug(f"🔧 {len(tools_payload)} tools available for function calling") for cloud in cloud_providers: @@ -1034,14 +1689,23 @@ async def agent_infer(agent_id: str, request: InferRequest): except: tool_args = {} - result = await tool_manager.execute_tool(tool_name, tool_args, agent_id=request_agent_id) + result = await tool_manager.execute_tool( + tool_name, + tool_args, + agent_id=request_agent_id, + chat_id=chat_id, + user_id=user_id, + ) tool_result_dict = { "tool_call_id": tc.get("id", ""), "name": tool_name, "success": result.success, "result": result.result, "error": result.error, - "image_base64": result.image_base64 # Store image if generated + "image_base64": result.image_base64, # Store image if generated + "file_base64": result.file_base64, + "file_name": result.file_name, + "file_mime": result.file_mime, } if result.image_base64: logger.info(f"🖼️ Tool {tool_name} generated image: {len(result.image_base64)} chars") @@ -1149,14 +1813,22 @@ async def agent_infer(agent_id: str, request: InferRequest): # Check if any tool generated an image generated_image = None + generated_file_base64 = None + generated_file_name = None + generated_file_mime = None logger.debug(f"🔍 Checking {len(tool_results)} tool results for images...") for tr in tool_results: img_b64 = tr.get("image_base64") if img_b64: generated_image = img_b64 logger.info(f"🖼️ Image generated by tool: {tr['name']} ({len(img_b64)} chars)") - break - else: + file_b64 = tr.get("file_base64") + if file_b64 and not generated_file_base64: + generated_file_base64 = file_b64 + generated_file_name = tr.get("file_name") + generated_file_mime = tr.get("file_mime") + logger.info(f"📄 File generated by tool: {tr['name']} ({len(file_b64)} chars)") + if not img_b64: logger.debug(f" Tool {tr['name']}: no image_base64") logger.info(f"✅ {cloud['name'].upper()} response received, {tokens_used} tokens") @@ -1179,7 +1851,10 @@ async def agent_infer(agent_id: str, request: InferRequest): model=cloud["model"], tokens_used=tokens_used, backend=f"{cloud['name']}-cloud", - image_base64=generated_image + image_base64=generated_image, + file_base64=generated_file_base64, + file_name=generated_file_name, + file_mime=generated_file_mime, ) else: logger.warning(f"⚠️ {cloud['name'].upper()} returned empty response, trying next provider") @@ -1253,7 +1928,38 @@ async def agent_infer(agent_id: str, request: InferRequest): if generate_resp.status_code == 200: data = generate_resp.json() - local_response = data.get("response", "") + local_response = _normalize_text_response(data.get("response", "")) + + # Empty-answer gate for selected local top-level agents. + if request_agent_id in EMPTY_ANSWER_GUARD_AGENTS and _needs_empty_answer_recovery(local_response): + logger.warning(f"⚠️ Empty-answer gate triggered for {request_agent_id}, retrying local generate once") + retry_prompt = ( + f"{request.prompt}\n\n" + "Відповідай коротко і конкретно (2-5 речень), без службових або мета-фраз." + ) + retry_resp = await http_client.post( + f"{SWAPPER_URL}/generate", + json={ + "model": local_model, + "prompt": retry_prompt, + "system": system_prompt, + "max_tokens": request.max_tokens, + "temperature": request.temperature, + "stream": False + }, + timeout=300.0 + ) + if retry_resp.status_code == 200: + retry_data = retry_resp.json() + retry_text = _normalize_text_response(retry_data.get("response", "")) + if retry_text and not _needs_empty_answer_recovery(retry_text): + local_response = retry_text + + if _needs_empty_answer_recovery(local_response): + local_response = ( + "Я не отримав корисну відповідь з першої спроби. " + "Сформулюй запит коротко ще раз, і я відповім конкретно." + ) # Store in agent-specific memory if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id and local_response: @@ -1649,4 +2355,3 @@ async def shutdown_event(): if nc: await nc.close() logger.info("🔌 NATS connection closed") - diff --git a/services/router/requirements.txt b/services/router/requirements.txt index f9188ad1..4f915cbf 100644 --- a/services/router/requirements.txt +++ b/services/router/requirements.txt @@ -5,6 +5,9 @@ nats-py==2.6.0 PyYAML==6.0.1 httpx>=0.25.0 neo4j>=5.14.0 +openpyxl>=3.1.2 +python-docx>=1.1.2 +pypdf>=5.1.0 # Memory Retrieval v3.0 asyncpg>=0.29.0 diff --git a/services/router/tool_manager.py b/services/router/tool_manager.py index c6cc1208..b88470a0 100644 --- a/services/router/tool_manager.py +++ b/services/router/tool_manager.py @@ -4,12 +4,20 @@ Implements OpenAI-compatible function calling for DeepSeek, Mistral, Grok """ import os +import asyncio +import uuid from agent_tools_config import get_agent_tools, is_tool_allowed import json import logging +import hashlib +import base64 +import csv import httpx from typing import Dict, List, Any, Optional from dataclasses import dataclass +from io import BytesIO, StringIO +from pathlib import PurePath +from zipfile import ZIP_DEFLATED, ZipFile logger = logging.getLogger(__name__) @@ -124,6 +132,75 @@ TOOL_DEFINITIONS = [ } } }, + { + "type": "function", + "function": { + "name": "comfy_generate_image", + "description": "🖼️ Згенерувати зображення через ComfyUI (NODE3, Stable Diffusion). Для високої якості та детальних зображень.", + "parameters": { + "type": "object", + "properties": { + "prompt": { + "type": "string", + "description": "Детальний опис зображення (англійською)" + }, + "negative_prompt": { + "type": "string", + "description": "Що НЕ включати в зображення", + "default": "blurry, low quality, watermark" + }, + "width": { + "type": "integer", + "description": "Ширина (512, 768, 1024)", + "default": 512 + }, + "height": { + "type": "integer", + "description": "Висота (512, 768, 1024)", + "default": 512 + }, + "steps": { + "type": "integer", + "description": "Кількість кроків генерації (20-50)", + "default": 28 + } + }, + "required": ["prompt"] + } + } + }, + { + "type": "function", + "function": { + "name": "comfy_generate_video", + "description": "🎬 Згенерувати відео через ComfyUI (NODE3, LTX-2). Text-to-video для коротких кліпів.", + "parameters": { + "type": "object", + "properties": { + "prompt": { + "type": "string", + "description": "Детальний опис відео (англійською)" + }, + "seconds": { + "type": "integer", + "description": "Тривалість в секундах (2-8)", + "default": 4 + }, + "fps": { + "type": "integer", + "description": "Кадри в секунду (24-30)", + "default": 24 + }, + "steps": { + "type": "integer", + "description": "Кількість кроків генерації (20-40)", + "default": 30 + } + }, + "required": ["prompt"] + } + } + }, { "type": "function", "function": { @@ -146,7 +223,7 @@ TOOL_DEFINITIONS = [ "description": "Категорія факту" } }, - "required": ["fact", "about"] + "required": ["fact"] } } }, @@ -235,6 +312,57 @@ TOOL_DEFINITIONS = [ } } }, + { + "type": "function", + "function": { + "name": "file_tool", + "description": "📁 Універсальний file tool для створення та оновлення CSV/JSON/YAML/ZIP і інших форматів через action-based API.", + "parameters": { + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": [ + "excel_create", "excel_update", "docx_create", "docx_update", + "csv_create", "csv_update", "pdf_fill", "pdf_merge", "pdf_split", + "json_export", "yaml_export", "zip_bundle" + ], + "description": "Дія file tool" + }, + "file_name": { + "type": "string", + "description": "Назва файлу-результату" + }, + "file_base64": { + "type": "string", + "description": "Вхідний файл у base64 для update-операцій" + }, + "content": { + "description": "Контент для json/yaml export" + }, + "headers": { + "type": "array", + "items": {"type": "string"}, + "description": "Заголовки для CSV" + }, + "rows": { + "type": "array", + "description": "Рядки для CSV" + }, + "entries": { + "type": "array", + "description": "Елементи для zip_bundle" + }, + "operation": { + "type": "string", + "enum": ["append", "replace"], + "description": "Режим csv_update" + } + }, + "required": ["action"] + } + } + }, # PRIORITY 5: Web Scraping tools { "type": "function", @@ -286,6 +414,30 @@ TOOL_DEFINITIONS = [ "required": ["text"] } } + }, + # PRIORITY 7: Market Data tools (SenpAI) + { + "type": "function", + "function": { + "name": "market_data", + "description": "📊 Отримати real-time ринкові дані: поточну ціну, котирування, обсяги, аналітичні фічі (VWAP, spread, volatility, trade signals). Доступні символи: BTCUSDT, ETHUSDT.", + "parameters": { + "type": "object", + "properties": { + "symbol": { + "type": "string", + "description": "Торговий символ (наприклад BTCUSDT, ETHUSDT)" + }, + "query_type": { + "type": "string", + "enum": ["price", "features", "all"], + "description": "Тип запиту: price (ціна + котирування), features (аналітичні фічі), all (все разом)", + "default": "all" + } + }, + "required": ["symbol"] + } + } } ] @@ -297,6 +449,9 @@ class ToolResult: result: Any error: Optional[str] = None image_base64: Optional[str] = None # For image generation results + file_base64: Optional[str] = None + file_name: Optional[str] = None + file_mime: Optional[str] = None class ToolManager: @@ -306,6 +461,7 @@ class ToolManager: self.config = config self.http_client = httpx.AsyncClient(timeout=60.0) self.swapper_url = os.getenv("SWAPPER_URL", "http://swapper-service:8890") + self.comfy_agent_url = os.getenv("COMFY_AGENT_URL", "http://212.8.58.133:8880") self.tools_config = self._load_tools_config() def _load_tools_config(self) -> Dict[str, Dict]: @@ -339,7 +495,14 @@ class ToolManager: logger.debug(f"Agent {agent_id} has {len(filtered)} tools: {tool_names}") return filtered - async def execute_tool(self, tool_name: str, arguments: Dict[str, Any], agent_id: str = None) -> ToolResult: + async def execute_tool( + self, + tool_name: str, + arguments: Dict[str, Any], + agent_id: str = None, + chat_id: str = None, + user_id: str = None, + ) -> ToolResult: """Execute a tool and return result. Optionally checks agent permissions.""" logger.info(f"🔧 Executing tool: {tool_name} for agent={agent_id} with args: {arguments}") @@ -351,7 +514,7 @@ class ToolManager: try: # Priority 1: Memory/Knowledge tools if tool_name == "memory_search": - return await self._memory_search(arguments, agent_id=agent_id) + return await self._memory_search(arguments, agent_id=agent_id, chat_id=chat_id, user_id=user_id) elif tool_name == "graph_query": return await self._graph_query(arguments, agent_id=agent_id) # Priority 2: Web tools @@ -361,8 +524,12 @@ class ToolManager: return await self._web_extract(arguments) elif tool_name == "image_generate": return await self._image_generate(arguments) + elif tool_name == "comfy_generate_image": + return await self._comfy_generate_image(arguments) + elif tool_name == "comfy_generate_video": + return await self._comfy_generate_video(arguments) elif tool_name == "remember_fact": - return await self._remember_fact(arguments) + return await self._remember_fact(arguments, agent_id=agent_id, chat_id=chat_id, user_id=user_id) # Priority 4: Presentation tools elif tool_name == "presentation_create": return await self._presentation_create(arguments) @@ -376,13 +543,457 @@ class ToolManager: # Priority 6: TTS tools elif tool_name == "tts_speak": return await self._tts_speak(arguments) + # Priority 6: File artifacts + elif tool_name == "file_tool": + return await self._file_tool(arguments) + # Priority 7: Market Data (SenpAI) + elif tool_name == "market_data": + return await self._market_data(arguments) else: return ToolResult(success=False, result=None, error=f"Unknown tool: {tool_name}") except Exception as e: logger.error(f"Tool execution failed: {e}") return ToolResult(success=False, result=None, error=str(e)) + + @staticmethod + def _sanitize_file_name(name: Optional[str], default_name: str, force_ext: Optional[str] = None) -> str: + raw = (name or default_name).strip() or default_name + base = PurePath(raw).name + if not base: + base = default_name + if force_ext and not base.lower().endswith(force_ext): + base = f"{base}{force_ext}" + return base + + @staticmethod + def _b64_from_bytes(data: bytes) -> str: + return base64.b64encode(data).decode("utf-8") + + @staticmethod + def _bytes_from_b64(value: str) -> bytes: + return base64.b64decode(value) + + @staticmethod + def _normalize_rows(rows: Any, headers: Optional[List[str]] = None) -> List[List[Any]]: + if not isinstance(rows, list): + return [] + out: List[List[Any]] = [] + for row in rows: + if isinstance(row, list): + out.append(row) + elif isinstance(row, dict): + keys = headers or list(row.keys()) + out.append([row.get(k, "") for k in keys]) + else: + out.append([row]) + return out + + async def _file_tool(self, args: Dict[str, Any]) -> ToolResult: + action = str((args or {}).get("action") or "").strip().lower() + if not action: + return ToolResult(success=False, result=None, error="Missing action") + + if action == "csv_create": + return self._file_csv_create(args) + if action == "csv_update": + return self._file_csv_update(args) + if action == "json_export": + return self._file_json_export(args) + if action == "yaml_export": + return self._file_yaml_export(args) + if action == "zip_bundle": + return self._file_zip_bundle(args) + if action == "docx_create": + return self._file_docx_create(args) + if action == "docx_update": + return self._file_docx_update(args) + if action == "pdf_merge": + return self._file_pdf_merge(args) + if action == "pdf_split": + return self._file_pdf_split(args) + if action == "pdf_fill": + return self._file_pdf_fill(args) + + return ToolResult(success=False, result=None, error=f"Action not implemented yet: {action}") + + def _file_csv_create(self, args: Dict[str, Any]) -> ToolResult: + file_name = self._sanitize_file_name(args.get("file_name"), "export.csv", force_ext=".csv") + headers = args.get("headers") or [] + rows_raw = args.get("rows") or [] + rows = self._normalize_rows(rows_raw, headers=headers if headers else None) + if rows and not headers and isinstance(rows_raw[0], dict): + headers = list(rows_raw[0].keys()) + rows = self._normalize_rows(rows_raw, headers=headers) + + sio = StringIO(newline="") + writer = csv.writer(sio) + if headers: + writer.writerow(headers) + for row in rows: + writer.writerow(row) + + data = sio.getvalue().encode("utf-8") + return ToolResult( + success=True, + result={"message": f"CSV created: {file_name}"}, + file_base64=self._b64_from_bytes(data), + file_name=file_name, + file_mime="text/csv", + ) + + def _file_csv_update(self, args: Dict[str, Any]) -> ToolResult: + src_b64 = args.get("file_base64") + if not src_b64: + return ToolResult(success=False, result=None, error="file_base64 is required for csv_update") + file_name = self._sanitize_file_name(args.get("file_name"), "updated.csv", force_ext=".csv") + operation = str(args.get("operation") or "append").strip().lower() + if operation not in {"append", "replace"}: + return ToolResult(success=False, result=None, error="operation must be append|replace") + + headers = args.get("headers") or [] + rows_raw = args.get("rows") or [] + rows = self._normalize_rows(rows_raw, headers=headers if headers else None) + if rows and not headers and isinstance(rows_raw[0], dict): + headers = list(rows_raw[0].keys()) + rows = self._normalize_rows(rows_raw, headers=headers) + + existing_rows: List[List[str]] = [] + text = self._bytes_from_b64(src_b64).decode("utf-8") + if text.strip(): + existing_rows = [list(r) for r in csv.reader(StringIO(text))] + + out_rows: List[List[Any]] = [] + if operation == "replace": + if headers: + out_rows.append(headers) + out_rows.extend(rows) + else: + if existing_rows: + out_rows.extend(existing_rows) + elif headers: + out_rows.append(headers) + out_rows.extend(rows) + + sio = StringIO(newline="") + writer = csv.writer(sio) + for row in out_rows: + writer.writerow(row) + data = sio.getvalue().encode("utf-8") + return ToolResult( + success=True, + result={"message": f"CSV updated: {file_name}"}, + file_base64=self._b64_from_bytes(data), + file_name=file_name, + file_mime="text/csv", + ) + + def _file_json_export(self, args: Dict[str, Any]) -> ToolResult: + file_name = self._sanitize_file_name(args.get("file_name"), "export.json", force_ext=".json") + content = args.get("content") + indent = int(args.get("indent") or 2) + payload = json.dumps(content, indent=indent, ensure_ascii=False).encode("utf-8") + return ToolResult( + success=True, + result={"message": f"JSON exported: {file_name}"}, + file_base64=self._b64_from_bytes(payload), + file_name=file_name, + file_mime="application/json", + ) + + def _file_yaml_export(self, args: Dict[str, Any]) -> ToolResult: + file_name = self._sanitize_file_name(args.get("file_name"), "export.yaml", force_ext=".yaml") + content = args.get("content") + payload = json.dumps(content, ensure_ascii=False, indent=2).encode("utf-8") + try: + import yaml + payload = yaml.safe_dump(content, allow_unicode=True, sort_keys=False).encode("utf-8") + except Exception: + # Fallback to JSON serialization if PyYAML fails. + pass + return ToolResult( + success=True, + result={"message": f"YAML exported: {file_name}"}, + file_base64=self._b64_from_bytes(payload), + file_name=file_name, + file_mime="application/x-yaml", + ) + + def _file_zip_bundle(self, args: Dict[str, Any]) -> ToolResult: + file_name = self._sanitize_file_name(args.get("file_name"), "bundle.zip", force_ext=".zip") + entries = args.get("entries") or [] + if not isinstance(entries, list) or not entries: + return ToolResult(success=False, result=None, error="entries must be non-empty array") + + out = BytesIO() + with ZipFile(out, mode="w", compression=ZIP_DEFLATED) as zf: + for idx, entry in enumerate(entries, start=1): + if not isinstance(entry, dict): + return ToolResult(success=False, result=None, error=f"entry[{idx-1}] must be object") + ename = self._sanitize_file_name(entry.get("file_name"), f"file_{idx}.bin") + if entry.get("file_base64"): + zf.writestr(ename, self._bytes_from_b64(entry["file_base64"])) + elif "text" in entry: + zf.writestr(ename, str(entry["text"]).encode("utf-8")) + elif "content" in entry: + zf.writestr(ename, json.dumps(entry["content"], ensure_ascii=False, indent=2).encode("utf-8")) + else: + return ToolResult( + success=False, + result=None, + error=f"entry[{idx-1}] requires file_base64|text|content", + ) + + return ToolResult( + success=True, + result={"message": f"ZIP bundle created: {file_name}"}, + file_base64=self._b64_from_bytes(out.getvalue()), + file_name=file_name, + file_mime="application/zip", + ) + + def _file_docx_create(self, args: Dict[str, Any]) -> ToolResult: + from docx import Document + + file_name = self._sanitize_file_name(args.get("file_name"), "document.docx", force_ext=".docx") + doc = Document() + + title = args.get("title") + if title: + doc.add_heading(str(title), level=1) + + for item in args.get("paragraphs") or []: + doc.add_paragraph(str(item)) + + for table in args.get("tables") or []: + if not isinstance(table, dict): + continue + headers = [str(h) for h in (table.get("headers") or [])] + rows_raw = table.get("rows") or [] + rows = self._normalize_rows(rows_raw, headers=headers if headers else None) + if rows and not headers and isinstance(rows_raw[0], dict): + headers = list(rows_raw[0].keys()) + rows = self._normalize_rows(rows_raw, headers=headers) + + total_rows = len(rows) + (1 if headers else 0) + total_cols = len(headers) if headers else (len(rows[0]) if rows else 1) + t = doc.add_table(rows=max(total_rows, 1), cols=max(total_cols, 1)) + + row_offset = 0 + if headers: + for idx, value in enumerate(headers): + t.cell(0, idx).text = str(value) + row_offset = 1 + for ridx, row in enumerate(rows): + for cidx, value in enumerate(row): + if cidx < total_cols: + t.cell(ridx + row_offset, cidx).text = str(value) + + out = BytesIO() + doc.save(out) + return ToolResult( + success=True, + result={"message": f"DOCX created: {file_name}"}, + file_base64=self._b64_from_bytes(out.getvalue()), + file_name=file_name, + file_mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + + def _file_docx_update(self, args: Dict[str, Any]) -> ToolResult: + from docx import Document + + src_b64 = args.get("file_base64") + operations = args.get("operations") or [] + if not src_b64: + return ToolResult(success=False, result=None, error="file_base64 is required for docx_update") + if not isinstance(operations, list) or not operations: + return ToolResult(success=False, result=None, error="operations must be non-empty array") + + file_name = self._sanitize_file_name(args.get("file_name"), "updated.docx", force_ext=".docx") + doc = Document(BytesIO(self._bytes_from_b64(src_b64))) + + for op in operations: + if not isinstance(op, dict): + return ToolResult(success=False, result=None, error="Each operation must be object") + op_type = str(op.get("type") or "").strip().lower() + + if op_type == "append_paragraph": + doc.add_paragraph(str(op.get("text") or "")) + elif op_type == "append_heading": + level = int(op.get("level") or 1) + level = max(1, min(level, 9)) + doc.add_heading(str(op.get("text") or ""), level=level) + elif op_type == "replace_text": + old = str(op.get("old") or "") + new = str(op.get("new") or "") + if not old: + return ToolResult(success=False, result=None, error="replace_text requires old") + for p in doc.paragraphs: + if old in p.text: + p.text = p.text.replace(old, new) + for table in doc.tables: + for row in table.rows: + for cell in row.cells: + if old in cell.text: + cell.text = cell.text.replace(old, new) + elif op_type == "append_table": + headers = [str(h) for h in (op.get("headers") or [])] + rows_raw = op.get("rows") or [] + rows = self._normalize_rows(rows_raw, headers=headers if headers else None) + if rows and not headers and isinstance(rows_raw[0], dict): + headers = list(rows_raw[0].keys()) + rows = self._normalize_rows(rows_raw, headers=headers) + + total_rows = len(rows) + (1 if headers else 0) + total_cols = len(headers) if headers else (len(rows[0]) if rows else 1) + t = doc.add_table(rows=max(total_rows, 1), cols=max(total_cols, 1)) + + row_offset = 0 + if headers: + for idx, value in enumerate(headers): + t.cell(0, idx).text = str(value) + row_offset = 1 + for ridx, row in enumerate(rows): + for cidx, value in enumerate(row): + if cidx < total_cols: + t.cell(ridx + row_offset, cidx).text = str(value) + else: + return ToolResult(success=False, result=None, error=f"Unsupported docx_update operation: {op_type}") + + out = BytesIO() + doc.save(out) + return ToolResult( + success=True, + result={"message": f"DOCX updated: {file_name}"}, + file_base64=self._b64_from_bytes(out.getvalue()), + file_name=file_name, + file_mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + + def _file_pdf_merge(self, args: Dict[str, Any]) -> ToolResult: + from pypdf import PdfReader, PdfWriter + + file_name = self._sanitize_file_name(args.get("file_name"), "merged.pdf", force_ext=".pdf") + files = args.get("files") or [] + if not isinstance(files, list) or not files: + return ToolResult(success=False, result=None, error="files must be non-empty array for pdf_merge") + + writer = PdfWriter() + page_count = 0 + for item in files: + if not isinstance(item, dict) or not item.get("file_base64"): + return ToolResult(success=False, result=None, error="Each file entry must include file_base64") + reader = PdfReader(BytesIO(self._bytes_from_b64(item["file_base64"]))) + for page in reader.pages: + writer.add_page(page) + page_count += 1 + + out = BytesIO() + writer.write(out) + return ToolResult( + success=True, + result={"message": f"PDF merged: {file_name} ({page_count} pages)"}, + file_base64=self._b64_from_bytes(out.getvalue()), + file_name=file_name, + file_mime="application/pdf", + ) + + @staticmethod + def _parse_split_pages(pages: Any) -> Optional[List[int]]: + if not isinstance(pages, list) or not pages: + return None + parsed: List[int] = [] + for p in pages: + idx = int(p) + if idx < 1: + return None + parsed.append(idx) + return sorted(set(parsed)) + + def _file_pdf_split(self, args: Dict[str, Any]) -> ToolResult: + from pypdf import PdfReader, PdfWriter + + src_b64 = args.get("file_base64") + if not src_b64: + return ToolResult(success=False, result=None, error="file_base64 is required for pdf_split") + file_name = self._sanitize_file_name(args.get("file_name"), "split.zip", force_ext=".zip") + + reader = PdfReader(BytesIO(self._bytes_from_b64(src_b64))) + total = len(reader.pages) + if total == 0: + return ToolResult(success=False, result=None, error="Input PDF has no pages") + + groups = args.get("groups") + split_groups = [] + if groups: + if not isinstance(groups, list): + return ToolResult(success=False, result=None, error="groups must be array") + for idx, grp in enumerate(groups, start=1): + if not isinstance(grp, dict): + return ToolResult(success=False, result=None, error="Each group must be object") + gname = self._sanitize_file_name(grp.get("file_name"), f"part_{idx}.pdf", force_ext=".pdf") + pages = self._parse_split_pages(grp.get("pages")) + if not pages: + return ToolResult(success=False, result=None, error=f"Invalid pages in group {idx}") + split_groups.append((gname, pages)) + else: + split_groups = [(f"page_{i+1}.pdf", [i + 1]) for i in range(total)] + + out = BytesIO() + with ZipFile(out, mode="w", compression=ZIP_DEFLATED) as zf: + for gname, pages in split_groups: + writer = PdfWriter() + for p in pages: + if p > total: + return ToolResult(success=False, result=None, error=f"Page out of range: {p} > {total}") + writer.add_page(reader.pages[p - 1]) + part = BytesIO() + writer.write(part) + zf.writestr(gname, part.getvalue()) + + return ToolResult( + success=True, + result={"message": f"PDF split into {len(split_groups)} file(s): {file_name}"}, + file_base64=self._b64_from_bytes(out.getvalue()), + file_name=file_name, + file_mime="application/zip", + ) + + def _file_pdf_fill(self, args: Dict[str, Any]) -> ToolResult: + from pypdf import PdfReader, PdfWriter + + src_b64 = args.get("file_base64") + fields = args.get("fields") or {} + if not src_b64: + return ToolResult(success=False, result=None, error="file_base64 is required for pdf_fill") + if not isinstance(fields, dict) or not fields: + return ToolResult(success=False, result=None, error="fields must be a non-empty object") + + file_name = self._sanitize_file_name(args.get("file_name"), "filled.pdf", force_ext=".pdf") + reader = PdfReader(BytesIO(self._bytes_from_b64(src_b64))) + writer = PdfWriter() + writer.append(reader) + + filled = True + try: + for page in writer.pages: + writer.update_page_form_field_values(page, fields) + if hasattr(writer, "set_need_appearances_writer"): + writer.set_need_appearances_writer(True) + except Exception: + filled = False + + out = BytesIO() + writer.write(out) + msg = f"PDF form filled: {file_name}" if filled else f"PDF has no fillable form fields, returned unchanged: {file_name}" + return ToolResult( + success=True, + result={"message": msg}, + file_base64=self._b64_from_bytes(out.getvalue()), + file_name=file_name, + file_mime="application/pdf", + ) - async def _memory_search(self, args: Dict, agent_id: str = None) -> ToolResult: + async def _memory_search(self, args: Dict, agent_id: str = None, chat_id: str = None, user_id: str = None) -> ToolResult: """Search in Qdrant vector memory using Router's memory_retrieval - PRIORITY 1""" query = args.get("query") @@ -394,6 +1005,8 @@ class ToolManager: results = await memory_retrieval.search_memories( query=query, agent_id=agent_id or "helion", + chat_id=chat_id, + user_id=user_id, limit=5 ) @@ -419,6 +1032,8 @@ class ToolManager: """Execute web search - PRIORITY 2 (use after memory_search)""" query = args.get("query") max_results = args.get("max_results", 5) + if not query: + return ToolResult(success=False, result=None, error="query is required") try: resp = await self.http_client.post( @@ -427,11 +1042,67 @@ class ToolManager: ) if resp.status_code == 200: data = resp.json() - results = data.get("results", []) - # Format results for LLM + results = data.get("results", []) or [] + query_terms = {t for t in str(query).lower().replace("/", " ").replace("-", " ").split() if len(t) > 2} + + trusted_domains = { + "wikipedia.org", "wikidata.org", "europa.eu", "fao.org", "who.int", + "worldbank.org", "oecd.org", "un.org", "gov.ua", "rada.gov.ua", + "kmu.gov.ua", "minagro.gov.ua", "agroportal.ua", "latifundist.com", + } + low_signal_domains = { + "pinterest.com", "tiktok.com", "instagram.com", "facebook.com", + "youtube.com", "yandex.", "vk.com", + } + + def _extract_domain(url: str) -> str: + if not url: + return "" + u = url.lower().strip() + u = u.replace("https://", "").replace("http://", "") + u = u.split("/")[0] + if u.startswith("www."): + u = u[4:] + return u + + def _overlap_score(title: str, snippet: str, url: str) -> int: + text = " ".join([title or "", snippet or "", url or ""]).lower() + score = 0 + for t in query_terms: + if t in text: + score += 2 + return score + + ranked: List[Any] = [] + for r in results: + title = str(r.get("title", "") or "") + snippet = str(r.get("snippet", "") or "") + url = str(r.get("url", "") or "") + domain = _extract_domain(url) + score = _overlap_score(title, snippet, url) + + if any(x in domain for x in low_signal_domains): + score -= 2 + if any(domain == d or domain.endswith("." + d) for d in trusted_domains): + score += 2 + if not snippet: + score -= 1 + if len(title.strip()) < 5: + score -= 1 + + ranked.append((score, r)) + + ranked.sort(key=lambda x: x[0], reverse=True) + selected = [item for _, item in ranked[:max_results]] + + logger.info(f"🔎 web_search rerank: raw={len(results)} ranked={len(selected)} query='{query[:120]}'") + formatted = [] - for r in results[:max_results]: - formatted.append(f"- {r.get('title', 'No title')}\n {r.get('snippet', '')}\n URL: {r.get('url', '')}") + for r in selected: + formatted.append( + f"- {r.get('title', 'No title')}\n {r.get('snippet', '')}\n URL: {r.get('url', '')}" + ) + return ToolResult(success=True, result="\n".join(formatted) if formatted else "Нічого не знайдено") else: return ToolResult(success=False, result=None, error=f"Search failed: {resp.status_code}") @@ -492,55 +1163,130 @@ class ToolManager: logger.debug(f"Could not unload FLUX: {e}") async def _image_generate(self, args: Dict) -> ToolResult: - """Generate image with VRAM management""" + """Backward-compatible image generation entrypoint routed to Comfy (NODE3).""" + if not args.get("prompt"): + return ToolResult(success=False, result=None, error="prompt is required") + + comfy_args = dict(args) + comfy_args.setdefault("negative_prompt", "blurry, low quality, watermark") + comfy_args.setdefault("steps", 28) + comfy_args.setdefault("timeout_s", 180) + return await self._comfy_generate_image(comfy_args) + + async def _poll_comfy_job(self, job_id: str, timeout_s: int = 180) -> Dict[str, Any]: + """Poll Comfy Agent job status until terminal state or timeout.""" + loop = asyncio.get_running_loop() + deadline = loop.time() + max(10, timeout_s) + delay = 1.0 + last: Dict[str, Any] = {} + + while loop.time() < deadline: + resp = await self.http_client.get(f"{self.comfy_agent_url}/status/{job_id}", timeout=30.0) + if resp.status_code != 200: + raise RuntimeError(f"Comfy status failed: {resp.status_code}") + + data = resp.json() + last = data + status = (data.get("status") or "").lower() + + if status in {"succeeded", "finished"}: + return data + if status in {"failed", "canceled", "cancelled", "expired"}: + return data + + await asyncio.sleep(delay) + delay = min(delay * 1.5, 5.0) + + raise TimeoutError(f"Comfy job timeout after {timeout_s}s (job_id={job_id})") + + async def _comfy_generate_image(self, args: Dict) -> ToolResult: + """Generate image via Comfy Agent on NODE3 and return URL when ready.""" prompt = args.get("prompt") - # Use smaller sizes to fit in VRAM (20GB GPU shared with LLM) - width = min(args.get("width", 512), 512) - height = min(args.get("height", 512), 512) - + if not prompt: + return ToolResult(success=False, result=None, error="prompt is required") + + payload = { + "prompt": prompt, + "negative_prompt": args.get("negative_prompt", "blurry, low quality, watermark"), + "width": int(args.get("width", 512)), + "height": int(args.get("height", 512)), + "steps": int(args.get("steps", 28)), + } + if args.get("seed") is not None: + payload["seed"] = int(args["seed"]) + + timeout_s = int(args.get("timeout_s", 180)) + idem_key = args.get("idempotency_key") or f"router-{uuid.uuid4().hex}" + try: - # Step 1: Unload Ollama models to free VRAM for FLUX (~15GB needed) - logger.info("🔄 Preparing VRAM for FLUX image generation...") - await self._unload_ollama_models() - - # Step 2: Generate image resp = await self.http_client.post( - f"{self.swapper_url}/image/generate", - json={"prompt": prompt, "width": width, "height": height, "num_inference_steps": 8}, - timeout=180.0 # FLUX needs time + f"{self.comfy_agent_url}/generate/image", + json=payload, + headers={"Idempotency-Key": idem_key}, + timeout=30.0, ) - if resp.status_code == 200: - data = resp.json() - image_base64 = data.get("image_base64") - image_url = data.get("image_url") or data.get("url") - - # Step 3: Unload FLUX to free VRAM for other models (LLM, Vision) - logger.info("🔄 Image generated, unloading FLUX to free VRAM...") - await self._unload_flux() - - if image_base64: - # Return base64 image for Gateway to send - return ToolResult( - success=True, - result="✅ Зображення згенеровано", - image_base64=image_base64 - ) - elif image_url: - return ToolResult( - success=True, - result=f"✅ Зображення згенеровано: {image_url}", - image_base64=None - ) - else: - return ToolResult( - success=True, - result="✅ Зображення згенеровано (формат невідомий)", - image_base64=None - ) - else: - # Also unload FLUX on failure to free VRAM - await self._unload_flux() - return ToolResult(success=False, result=None, error=f"Generation failed: {resp.status_code}") + if resp.status_code != 200: + return ToolResult(success=False, result=None, error=f"Comfy image request failed: {resp.status_code}") + + created = resp.json() + job_id = created.get("job_id") + if not job_id: + return ToolResult(success=False, result=None, error="Comfy image request did not return job_id") + + final = await self._poll_comfy_job(job_id, timeout_s=timeout_s) + status = (final.get("status") or "").lower() + if status in {"succeeded", "finished"}: + result_url = final.get("result_url") + if result_url: + return ToolResult(success=True, result=f"✅ Зображення згенеровано: {result_url}") + return ToolResult(success=True, result=f"✅ Зображення згенеровано. job_id={job_id}") + + return ToolResult(success=False, result=None, error=final.get("error") or f"Comfy image failed (status={status})") + except Exception as e: + return ToolResult(success=False, result=None, error=str(e)) + + async def _comfy_generate_video(self, args: Dict) -> ToolResult: + """Generate video via Comfy Agent on NODE3 and return URL when ready.""" + prompt = args.get("prompt") + if not prompt: + return ToolResult(success=False, result=None, error="prompt is required") + + payload = { + "prompt": prompt, + "seconds": int(args.get("seconds", 4)), + "fps": int(args.get("fps", 24)), + "steps": int(args.get("steps", 30)), + } + if args.get("seed") is not None: + payload["seed"] = int(args["seed"]) + + timeout_s = int(args.get("timeout_s", 300)) + idem_key = args.get("idempotency_key") or f"router-{uuid.uuid4().hex}" + + try: + resp = await self.http_client.post( + f"{self.comfy_agent_url}/generate/video", + json=payload, + headers={"Idempotency-Key": idem_key}, + timeout=30.0, + ) + if resp.status_code != 200: + return ToolResult(success=False, result=None, error=f"Comfy video request failed: {resp.status_code}") + + created = resp.json() + job_id = created.get("job_id") + if not job_id: + return ToolResult(success=False, result=None, error="Comfy video request did not return job_id") + + final = await self._poll_comfy_job(job_id, timeout_s=timeout_s) + status = (final.get("status") or "").lower() + if status in {"succeeded", "finished"}: + result_url = final.get("result_url") + if result_url: + return ToolResult(success=True, result=f"✅ Відео згенеровано: {result_url}") + return ToolResult(success=True, result=f"✅ Відео згенеровано. job_id={job_id}") + + return ToolResult(success=False, result=None, error=final.get("error") or f"Comfy video failed (status={status})") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) @@ -580,27 +1326,58 @@ class ToolManager: except Exception as e: return ToolResult(success=False, result=None, error=str(e)) - async def _remember_fact(self, args: Dict) -> ToolResult: - """Store a fact in memory""" - fact = args.get("fact") - about = args.get("about") - category = args.get("category", "general") - + async def _remember_fact(self, args: Dict, agent_id: str = None, chat_id: str = None, user_id: str = None) -> ToolResult: + """Store a fact in memory with strict args validation.""" + if not isinstance(args, dict) or not args: + logger.warning("⚠️ remember_fact blocked: empty args") + return ToolResult(success=False, result=None, error="invalid_tool_args: remember_fact requires {fact: }.") + + fact_raw = args.get("fact") + if fact_raw is None: + fact_raw = args.get("text") + + if not isinstance(fact_raw, str): + logger.warning("⚠️ remember_fact blocked: fact/text must be string") + return ToolResult(success=False, result=None, error="invalid_tool_args: fact/text must be string.") + + fact = fact_raw.strip() + if not fact: + logger.warning("⚠️ remember_fact blocked: empty fact/text") + return ToolResult(success=False, result=None, error="invalid_tool_args: fact/text must be non-empty.") + + if len(fact) > 2000: + logger.warning("⚠️ remember_fact blocked: fact too long (%s)", len(fact)) + return ToolResult(success=False, result=None, error="invalid_tool_args: fact/text is too long (max 2000 chars).") + + category = str(args.get("category") or "general").strip() or "general" + runtime_user_id = (str(user_id or "").strip() or str(args.get("user_id") or "").strip() or str(args.get("about") or "").strip()) + if not runtime_user_id: + logger.warning("⚠️ remember_fact blocked: missing runtime user_id") + return ToolResult(success=False, result=None, error="invalid_tool_args: missing runtime user_id for memory write.") + + fact_hash = hashlib.sha1(fact.encode("utf-8")).hexdigest()[:12] + fact_key = f"{category}_{fact_hash}" + try: - # Store via Memory Service resp = await self.http_client.post( "http://memory-service:8000/facts/upsert", json={ - "user_id": about, - "fact_key": f"{category}_{hash(fact) % 10000}", + "user_id": runtime_user_id, + "fact_key": fact_key, "fact_value": fact, - "fact_value_json": {"text": fact, "category": category, "about": about} - } + "fact_value_json": { + "text": fact, + "category": category, + "about": runtime_user_id, + "agent_id": agent_id, + "chat_id": chat_id, + "source": "remember_fact_tool", + }, + }, ) if resp.status_code in [200, 201]: - return ToolResult(success=True, result=f"✅ Запам'ятовано факт про {about}") - else: - return ToolResult(success=False, result=None, error=f"Memory store failed: {resp.status_code}") + return ToolResult(success=True, result=f"✅ Запам'ятовано факт ({category})") + return ToolResult(success=False, result=None, error=f"memory_store_failed:{resp.status_code}:{resp.text[:160]}") except Exception as e: return ToolResult(success=False, result=None, error=str(e)) @@ -827,6 +1604,78 @@ class ToolManager: logger.error(f"TTS failed: {e}") return ToolResult(success=False, result=None, error=str(e)) + async def _market_data(self, args: Dict) -> ToolResult: + """Query real-time market data from Market Data Service and SenpAI MD Consumer.""" + symbol = str(args.get("symbol", "BTCUSDT")).upper() + query_type = str(args.get("query_type", "all")).lower() + + md_url = os.getenv("MARKET_DATA_URL", "http://dagi-market-data-node1:8891") + consumer_url = os.getenv("SENPAI_CONSUMER_URL", "http://dagi-senpai-md-consumer-node1:8892") + + results: Dict[str, Any] = {} + + try: + async with httpx.AsyncClient(timeout=8.0) as client: + if query_type in ("price", "all"): + try: + resp = await client.get(f"{md_url}/latest", params={"symbol": symbol}) + if resp.status_code == 200: + data = resp.json() + trade = data.get("latest_trade", {}) or {} + quote = data.get("latest_quote", {}) or {} + bid = quote.get("bid") + ask = quote.get("ask") + spread = None + if isinstance(bid, (int, float)) and isinstance(ask, (int, float)): + spread = round(ask - bid, 6) + results["price"] = { + "symbol": symbol, + "last_price": trade.get("price"), + "size": trade.get("size"), + "side": trade.get("side"), + "bid": bid, + "ask": ask, + "spread": spread, + "provider": trade.get("provider"), + "timestamp": trade.get("ts_recv"), + } + else: + results["price_error"] = f"market-data status={resp.status_code}" + except Exception as e: + results["price_error"] = str(e) + + if query_type in ("features", "all"): + try: + resp = await client.get(f"{consumer_url}/features/latest", params={"symbol": symbol}) + if resp.status_code == 200: + data = resp.json() + feats = data.get("features", {}) or {} + results["features"] = { + "symbol": symbol, + "mid_price": feats.get("mid"), + "spread_bps": round(float(feats.get("spread_bps", 0) or 0), 2), + "vwap_10s": round(float(feats.get("trade_vwap_10s", 0) or 0), 2), + "vwap_60s": round(float(feats.get("trade_vwap_60s", 0) or 0), 2), + "trade_count_10s": int(feats.get("trade_count_10s", 0) or 0), + "trade_volume_10s": round(float(feats.get("trade_volume_10s", 0) or 0), 4), + "return_10s_pct": round(float(feats.get("return_10s", 0) or 0) * 100, 4), + "realized_vol_60s_pct": round(float(feats.get("realized_vol_60s", 0) or 0) * 100, 6), + "latency_p50_ms": round(float(feats.get("latency_ms_p50", 0) or 0), 1), + "latency_p95_ms": round(float(feats.get("latency_ms_p95", 0) or 0), 1), + } + else: + results["features_error"] = f"senpai-consumer status={resp.status_code}" + except Exception as e: + results["features_error"] = str(e) + + if not results: + return ToolResult(success=False, result=None, error=f"No market data for {symbol}") + + return ToolResult(success=True, result=json.dumps(results, ensure_ascii=False)) + except Exception as e: + logger.error(f"Market data tool error: {e}") + return ToolResult(success=False, result=None, error=str(e)) + async def close(self): await self.http_client.aclose() @@ -890,6 +1739,16 @@ def format_tool_calls_for_response(tool_results: List[Dict], fallback_mode: str for tr in tool_results: if tr.get("name") == "image_generate" and tr.get("success"): return "✅ Зображення згенеровано!" + + if "comfy_generate_image" in tool_names: + for tr in tool_results: + if tr.get("name") == "comfy_generate_image" and tr.get("success"): + return str(tr.get("result", "✅ Зображення згенеровано через ComfyUI")) + + if "comfy_generate_video" in tool_names: + for tr in tool_results: + if tr.get("name") == "comfy_generate_video" and tr.get("success"): + return str(tr.get("result", "✅ Відео згенеровано через ComfyUI")) # Web search - show actual results to user if "web_search" in tool_names: