node1: add universal file tool, gateway document delivery, and sync runbook

This commit is contained in:
Apple
2026-02-15 01:50:37 -08:00
parent dd4b466d79
commit 21576f0ca3
7 changed files with 2207 additions and 131 deletions

View File

@@ -0,0 +1,124 @@
# NODE1 File Tool Sync Runbook
## Scope
This runbook documents:
- how NODE1 runtime drift was identified and resolved,
- how the universal `file_tool` was introduced into the **actual** NODE1 router stack,
- how to deploy and roll back safely.
## Canonical Runtime (NODE1)
- Host: `144.76.224.179`
- Runtime repo: `/opt/microdao-daarion`
- Compose: `/opt/microdao-daarion/docker-compose.node1.yml`
- Router container: `dagi-router-node1`
- Gateway container: `dagi-gateway-node1`
- Router API contract: `POST /v1/agents/{agent_id}/infer` (not `/route`)
## Drift Findings (before sync)
The laptop had multiple repos; only this repo matches NODE1 architecture:
- `/Users/apple/github-projects/microdao-daarion`
Critical files had drifted from the NODE1 runtime (15 files):
- `docker-compose.node1.yml`
- `gateway-bot/agent_registry.json`
- `gateway-bot/http_api.py`
- `gateway-bot/router_client.py`
- `gateway-bot/senpai_prompt.txt`
- `http_api.py`
- `services/router/agent_tools_config.py`
- `services/router/main.py`
- `services/router/memory_retrieval.py`
- `services/router/requirements.txt`
- `services/router/router-config.yml`
- `services/router/tool_manager.py`
- `services/senpai-md-consumer/senpai/md_consumer/main.py`
- `services/senpai-md-consumer/senpai/md_consumer/publisher.py`
- `services/swapper-service/requirements.txt`
## Sync Procedure (NODE1 -> Laptop)
1. Snapshot and compare hashes:
- compute SHA256 for the critical file list on laptop and NODE1.
2. Backup local copies:
- `rollback_backups/node1_sync_<timestamp>/...`
3. Copy runtime files from NODE1 to laptop:
- `scp root@144.76.224.179:/opt/microdao-daarion/<file> ...`
4. Verify 1:1 hash match.
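The hash comparison in steps 1 and 4 can be sketched as follows (a minimal sketch; the helper names and manifest shape are illustrative, not part of the repo):

```python
import hashlib


def sha256_of(path: str, chunk_size: int = 1 << 20) -> str:
    """Stream the file in chunks so large artifacts are not loaded into memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def drifted_files(local: dict, remote: dict) -> list:
    """Return paths whose hashes differ, or that exist on only one side."""
    return sorted(p for p in set(local) | set(remote) if local.get(p) != remote.get(p))
```

Run it over the 15-file critical list on both sides; an empty `drifted_files` result corresponds to the 1:1 hash match required in step 4.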
## File Tool Implementation
Implemented in the actual NODE1 stack (`services/router/*` + gateway):
### Added actions
- `csv_create`
- `csv_update`
- `json_export`
- `yaml_export`
- `zip_bundle`
- `docx_create`
- `docx_update`
- `pdf_merge`
- `pdf_split`
- `pdf_fill`
### Standard output contract
For file-producing tool calls, the router now propagates:
- `file_base64`
- `file_name`
- `file_mime`
- `message` (inside tool result payload)
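As an illustration, a `csv_create`-style action could assemble a result that satisfies this contract (a sketch under assumptions; the helper name and message wording are not from the repo):

```python
import base64
import csv
import io


def csv_create_result(rows, file_name="export.csv"):
    """Build a tool result carrying the four contract fields."""
    buf = io.StringIO()
    csv.writer(buf).writerows(rows)
    payload = buf.getvalue().encode("utf-8")
    return {
        "file_base64": base64.b64encode(payload).decode("ascii"),
        "file_name": file_name,
        "file_mime": "text/csv",
        "message": f"Created {file_name} ({len(rows)} rows)",
    }
```

Any action that emits these four fields is deliverable by the gateway without action-specific handling.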
### Gateway behavior
`gateway-bot/http_api.py` now sends Telegram `sendDocument` when file fields are present.
## Changed Files
- `services/router/requirements.txt`
- `services/router/agent_tools_config.py`
- `services/router/tool_manager.py`
- `services/router/main.py`
- `gateway-bot/router_client.py`
- `gateway-bot/http_api.py`
## Deployment Steps (NODE1)
1. Backup target files on NODE1 before each step.
2. Sync updated files to `/opt/microdao-daarion`.
3. Compile checks:
- `python3 -m py_compile services/router/tool_manager.py services/router/main.py services/router/agent_tools_config.py gateway-bot/router_client.py gateway-bot/http_api.py`
4. Rebuild/restart runtime services:
- `docker compose -f docker-compose.node1.yml up -d --build --no-deps router gateway`
5. Health checks:
- `curl http://127.0.0.1:9102/health`
- `curl http://127.0.0.1:9300/health`
## Smoke Tests
Run inside `dagi-router-node1` to validate actions deterministically:
- CSV create/update
- JSON/YAML export
- ZIP bundle
- DOCX create/update
- PDF merge/split/fill
Also verify the infer endpoint still works:
- `POST http://127.0.0.1:9102/v1/agents/devtools/infer`
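Each file-producing smoke test can validate the tool result deterministically; a minimal checker might look like this (illustrative, not router code):

```python
import base64

# The four fields every file-producing tool result must carry (see output contract).
REQUIRED_FIELDS = ("file_base64", "file_name", "file_mime", "message")


def check_file_result(result: dict) -> bool:
    """True if all contract fields are present with decodable, non-empty content."""
    if not all(result.get(k) for k in REQUIRED_FIELDS):
        return False
    try:
        data = base64.b64decode(result["file_base64"], validate=True)
    except (ValueError, TypeError):
        return False
    return len(data) > 0 and "/" in result["file_mime"]
```

Running this check against each action's output keeps the smoke tests independent of LLM behavior.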
## Backups Created During This Work
- `rollback_backups/file_tool_step1_20260215_011637/...`
- `rollback_backups/file_tool_step2_tool_manager.py.bak_20260215_012029`
- `rollback_backups/file_tool_step3_tool_manager.py.bak_20260215_012200`
- `rollback_backups/file_tool_step4_tool_manager.py.bak_20260215_012309`
## Rollback (NODE1)
```bash
cd /opt/microdao-daarion
cp rollback_backups/file_tool_step1_20260215_011637/services/router/requirements.txt services/router/requirements.txt
cp rollback_backups/file_tool_step1_20260215_011637/services/router/agent_tools_config.py services/router/agent_tools_config.py
cp rollback_backups/file_tool_step1_20260215_011637/services/router/tool_manager.py services/router/tool_manager.py
cp rollback_backups/file_tool_step1_20260215_011637/services/router/main.py services/router/main.py
cp rollback_backups/file_tool_step1_20260215_011637/gateway-bot/router_client.py gateway-bot/router_client.py
cp rollback_backups/file_tool_step1_20260215_011637/gateway-bot/http_api.py gateway-bot/http_api.py
docker compose -f docker-compose.node1.yml up -d --build --no-deps router gateway
```
## Notes
- `docker-compose.node1.yml` may warn about deprecated `version` key; this is non-blocking.
- Avoid `--remove-orphans` unless explicitly intended.
- Use `--no-deps` for targeted router/gateway rollout to avoid unrelated service churn.

View File

@@ -57,6 +57,172 @@ LAST_PENDING_STATE: Dict[str, Dict[str, Any]] = {}
PENDING_STATE_TTL = 1800 # 30 minutes
# Per-user language preference cache (chat_id:user_id -> {lang, ts})
USER_LANGUAGE_PREFS: Dict[str, Dict[str, Any]] = {}
USER_LANGUAGE_PREF_TTL = 30 * 24 * 3600 # 30 days
# Recent photo context for follow-up questions in chat (agent:chat:user -> {file_id, ts})
RECENT_PHOTO_CONTEXT: Dict[str, Dict[str, Any]] = {}
RECENT_PHOTO_TTL = 30 * 60 # 30 minutes
def _cleanup_recent_photo_context() -> None:
now = time.time()
expired = [k for k, v in RECENT_PHOTO_CONTEXT.items() if now - float(v.get("ts", 0)) > RECENT_PHOTO_TTL]
for k in expired:
del RECENT_PHOTO_CONTEXT[k]
def _set_recent_photo_context(agent_id: str, chat_id: str, user_id: str, file_id: str) -> None:
_cleanup_recent_photo_context()
key = f"{agent_id}:{chat_id}:{user_id}"
RECENT_PHOTO_CONTEXT[key] = {"file_id": file_id, "ts": time.time()}
def _get_recent_photo_file_id(agent_id: str, chat_id: str, user_id: str) -> Optional[str]:
_cleanup_recent_photo_context()
key = f"{agent_id}:{chat_id}:{user_id}"
rec = RECENT_PHOTO_CONTEXT.get(key)
if not rec:
return None
return rec.get("file_id")
def _looks_like_photo_followup(text: str) -> bool:
if not text:
return False
t = text.strip().lower()
markers = [
"що ти бачиш", "що на фото", "що на зображенні", "опиши фото", "подивись фото",
"what do you see", "what is in the image", "describe the photo",
"что ты видишь", "что на фото", "опиши фото", "посмотри фото",
]
return any(m in t for m in markers)
def _cleanup_user_language_prefs() -> None:
now = time.time()
expired = [k for k, v in USER_LANGUAGE_PREFS.items() if now - float(v.get("ts", 0)) > USER_LANGUAGE_PREF_TTL]
for k in expired:
del USER_LANGUAGE_PREFS[k]
def _normalize_lang_code(raw: Optional[str]) -> Optional[str]:
if not raw:
return None
code = str(raw).strip().lower().replace("_", "-")
if code.startswith("uk"):
return "uk"
if code.startswith("ru"):
return "ru"
if code.startswith("en"):
return "en"
return None
def _detect_language_from_text(text: str) -> Optional[str]:
if not text:
return None
t = text.lower()
letters = [ch for ch in t if ch.isalpha()]
if not letters:
return None
cyr = sum(1 for ch in letters if "а" <= ch <= "я" or ch in "іїєґё")
lat = sum(1 for ch in letters if "a" <= ch <= "z")
if cyr >= 3 and cyr >= lat:
# Ukrainian-specific letters strongly indicate Ukrainian.
if any(ch in t for ch in "іїєґ"):
return "uk"
# Russian-specific letters/symbols.
if any(ch in t for ch in "ёыэъ"):
return "ru"
# Soft lexical preference.
uk_hits = sum(1 for w in ("що", "який", "дякую", "будь", "будь ласка", "привіт") if w in t)
ru_hits = sum(1 for w in ("что", "какой", "спасибо", "пожалуйста", "привет") if w in t)
if uk_hits > ru_hits:
return "uk"
if ru_hits > uk_hits:
return "ru"
return "uk"
if lat >= 3 and lat > cyr:
return "en"
return None
def resolve_preferred_language(chat_id: str, user_id: str, text: str, telegram_lang_code: Optional[str]) -> str:
_cleanup_user_language_prefs()
key = f"{chat_id}:{user_id}"
text_lang = _detect_language_from_text(text)
tg_lang = _normalize_lang_code(telegram_lang_code)
cached_lang = USER_LANGUAGE_PREFS.get(key, {}).get("lang")
preferred = text_lang or tg_lang or cached_lang or "uk"
USER_LANGUAGE_PREFS[key] = {"lang": preferred, "ts": time.time()}
return preferred
def preferred_language_label(lang: str) -> str:
return {
"uk": "Ukrainian",
"ru": "Russian",
"en": "English",
}.get((lang or "").lower(), "Ukrainian")
def _extract_preferred_language_from_profile_fact(fact: Optional[Dict[str, Any]]) -> Optional[str]:
if not isinstance(fact, dict):
return None
data = fact.get("fact_value_json")
if not isinstance(data, dict):
return None
preferred = _normalize_lang_code(data.get("preferred_language"))
if preferred:
return preferred
return _normalize_lang_code(data.get("language_code"))
async def resolve_preferred_language_persistent(
chat_id: str,
user_id: str,
text: str,
telegram_lang_code: Optional[str],
team_id: Optional[str] = None,
) -> str:
"""Resolve language with memory-service fallback for post-restart continuity."""
_cleanup_user_language_prefs()
key = f"{chat_id}:{user_id}"
text_lang = _detect_language_from_text(text)
tg_lang = _normalize_lang_code(telegram_lang_code)
cached_lang = USER_LANGUAGE_PREFS.get(key, {}).get("lang")
if text_lang or tg_lang or cached_lang:
preferred = text_lang or tg_lang or cached_lang or "uk"
USER_LANGUAGE_PREFS[key] = {"lang": preferred, "ts": time.time()}
return preferred
try:
fact = await memory_client.get_fact(
user_id=f"tg:{user_id}",
fact_key="profile",
team_id=team_id,
)
fact_lang = _extract_preferred_language_from_profile_fact(fact)
if fact_lang:
USER_LANGUAGE_PREFS[key] = {"lang": fact_lang, "ts": time.time()}
return fact_lang
except Exception as e:
logger.debug(f"preferred language fact lookup failed: {e}")
USER_LANGUAGE_PREFS[key] = {"lang": "uk", "ts": time.time()}
return "uk"
def _pending_state_cleanup():
now = time.time()
expired = [cid for cid, rec in LAST_PENDING_STATE.items() if now - rec.get('ts', 0) > PENDING_STATE_TTL]
@@ -483,9 +649,36 @@ async def agromatrix_telegram_webhook(update: TelegramUpdate):
if user_id and user_id in op_ids:
is_ops = True
# Operator NL or operator slash commands -> handle via Stepan handler.
# Important: do NOT treat generic slash commands (/start, /agromatrix) as operator commands,
# otherwise regular users will see "Недостатньо прав" or Stepan errors.
operator_slash_cmds = {
"whoami",
"pending",
"pending_show",
"approve",
"reject",
"apply_dict",
"pending_stats",
}
slash_cmd = ""
if is_slash:
try:
slash_cmd = (msg_text.strip().split()[0].lstrip("/").strip().lower())
except Exception:
slash_cmd = ""
is_operator_slash = bool(slash_cmd) and slash_cmd in operator_slash_cmds
# Stepan handler currently depends on ChatOpenAI (OPENAI_API_KEY). If key is not configured,
# never route production traffic there (avoid "Помилка обробки..." and webhook 5xx).
stepan_enabled = bool(os.getenv("OPENAI_API_KEY", "").strip())
if stepan_enabled and (is_ops or is_operator_slash):
return await handle_stepan_message(update, AGROMATRIX_CONFIG)
if (is_ops or is_operator_slash) and not stepan_enabled:
logger.warning(
"Stepan handler disabled (OPENAI_API_KEY missing); falling back to Router pipeline "
f"for chat_id={chat_id}, user_id={user_id}, slash_cmd={slash_cmd!r}"
)
# General conversation -> standard Router pipeline (like all other agents)
return await handle_telegram_webhook(AGROMATRIX_CONFIG, update)
@@ -611,14 +804,37 @@ def extract_bot_mentions(text: str) -> List[str]:
return mentions
def should_force_detailed_reply(text: str) -> bool:
"""Soft signal: user explicitly asks for details/long format."""
if not text:
return False
lower = text.strip().lower()
detail_markers = [
"детально", "подробно", "розгорну", "розпиши", "по всіх пунктах",
"step by step", "покроково", "з прикладами", "глибоко", "deep dive",
"full", "повний розбір", "максимально детально",
]
return any(m in lower for m in detail_markers)
def should_force_concise_reply(text: str) -> bool:
"""Soft concise mode by default, unless user asks for detailed answer."""
if not text:
return True
stripped = text.strip()
if not stripped:
return True
if should_force_detailed_reply(stripped):
return False
# Very long user request usually means they expect context-aware answer.
if len(stripped) > 700:
return False
# For regular Q&A in chat keep first response concise by default.
return True
COMPLEX_REASONING_KEYWORDS = [
@@ -808,7 +1024,9 @@ async def process_photo(
user_id: str,
username: str,
dao_id: str,
photo: Dict[str, Any],
caption_override: Optional[str] = None,
bypass_media_gate: bool = False,
) -> Dict[str, Any]:
"""
Universal photo-processing function for any agent.
@@ -833,9 +1051,10 @@ async def process_photo(
return {"ok": False, "error": "No file_id in photo"}
logger.info(f"{agent_config.name}: Photo from {username} (tg:{user_id}), file_id: {file_id}")
_set_recent_photo_context(agent_config.agent_id, chat_id, user_id, file_id)
# Get caption for media question check
caption = caption_override if caption_override is not None else ((update.message or {}).get("caption") or "")
chat = (update.message or {}).get("chat", {})
chat_type = chat.get("type", "private")
is_private_chat = chat_type == "private"
@@ -843,7 +1062,7 @@ async def process_photo(
# BEHAVIOR POLICY v1: Media-no-comment
# Check if photo has a question/request in caption
if not bypass_media_gate and not is_private_chat and not is_training:
has_question = detect_media_question(caption)
if not has_question:
logger.info(f"🔇 MEDIA-NO-COMMENT: Photo without question. Agent {agent_config.agent_id} NOT responding.")
@@ -961,10 +1180,10 @@ async def process_photo(
else:
await send_telegram_message(
chat_id,
"Не вдалося коректно обробити фото. Спробуйте інше фото або додайте короткий опис, що саме перевірити.",
telegram_token
)
return {"ok": True, "handled": True, "reason": "vision_empty_response"}
else:
error_msg = response.get("error", "Unknown error") if isinstance(response, dict) else "Router error"
logger.error(f"{agent_config.name}: Vision-8b error: {error_msg}")
@@ -1338,6 +1557,13 @@ async def handle_telegram_webhook(
# Get DAO ID for this chat
dao_id = get_dao_id(chat_id, "telegram", agent_id=agent_config.agent_id)
initial_preferred_lang = resolve_preferred_language(
chat_id=chat_id,
user_id=user_id,
text=update.message.get("text", ""),
telegram_lang_code=from_user.get("language_code"),
)
# Update user/agent facts to build the memory graph
asyncio.create_task(
memory_client.upsert_fact(
@@ -1348,6 +1574,7 @@ async def handle_telegram_webhook(
"first_name": first_name,
"last_name": last_name,
"language_code": from_user.get("language_code"),
"preferred_language": initial_preferred_lang,
"is_bot": is_sender_bot,
},
team_id=dao_id,
@@ -1919,8 +2146,7 @@ async def handle_telegram_webhook(
result = await process_photo(
agent_config, update, chat_id, user_id, username, dao_id, photo
)
return result
# Check if it's a voice message
voice = update.message.get("voice")
@@ -1947,6 +2173,26 @@ async def handle_telegram_webhook(
if not text:
text = update.message.get("text", "")
caption = update.message.get("caption", "")
# If user asks about a recently sent photo, run vision on cached photo file_id.
if text and _looks_like_photo_followup(text):
recent_file_id = _get_recent_photo_file_id(agent_config.agent_id, chat_id, user_id)
if recent_file_id:
logger.info(
f"{agent_config.name}: Detected follow-up photo question; using cached file_id={recent_file_id}"
)
followup_result = await process_photo(
agent_config=agent_config,
update=update,
chat_id=chat_id,
user_id=user_id,
username=username,
dao_id=dao_id,
photo={"file_id": recent_file_id},
caption_override=text,
bypass_media_gate=True,
)
return followup_result
if not text and not caption:
# Check for unsupported message types and silently ignore
@@ -2149,9 +2395,10 @@ async def handle_telegram_webhook(
return {"ok": True, "ack": True, "reason": respond_reason}
# FULL: proceed with LLM/Router call
# For prober requests, skip LLM/Router entirely to save tokens
if is_prober:
logger.info(f"\U0001f9ea PROBER: Agent {agent_config.agent_id} responding to prober (no LLM call). Reason: {respond_reason}")
return {"ok": True, "agent": agent_config.agent_id, "prober": True, "response_preview": "[prober-skip-llm]"}
else:
logger.info(f"\u2705 SOWA: Agent {agent_config.agent_id} WILL respond (FULL). Reason: {respond_reason}")
@@ -2183,6 +2430,15 @@ async def handle_telegram_webhook(
else:
message_with_context = f"{training_prefix}{text}"
preferred_lang = await resolve_preferred_language_persistent(
chat_id=chat_id,
user_id=user_id,
text=text or "",
telegram_lang_code=from_user.get("language_code"),
team_id=dao_id,
)
preferred_lang_label = preferred_language_label(preferred_lang)
# Build request to Router
system_prompt = agent_config.system_prompt
logger.info(f"📝 {agent_config.name} system_prompt length: {len(system_prompt) if system_prompt else 0} chars")
@@ -2206,6 +2462,9 @@ async def handle_telegram_webhook(
"mentioned_bots": mentioned_bots,
"requires_complex_reasoning": needs_complex_reasoning,
"is_reply_to_agent": is_reply_to_agent,
"is_training_group": is_training_group,
"preferred_response_language": preferred_lang,
"preferred_response_language_label": preferred_lang_label,
},
"context": {
"agent_name": agent_config.name,
@@ -2218,17 +2477,30 @@ async def handle_telegram_webhook(
},
}
if should_force_detailed_reply(text):
router_request["metadata"]["force_detailed"] = True
if should_force_concise_reply(text):
# IMPORTANT: preserve conversation context! Only append concise instruction
router_request["metadata"]["force_concise"] = True
router_request["message"] = (
router_request["message"]
+ "\n\n(Інструкція: спочатку дай коротку відповідь по суті (1-3 абзаци), "
"а якщо користувач попросить — розгорни детально.)"
+ f"\n(Мова відповіді: {preferred_lang_label}.)"
+ "\n(Не потрібно щоразу представлятися по імені або писати шаблонне: 'чим можу допомогти'.)"
)
if needs_complex_reasoning:
router_request["metadata"]["provider"] = "cloud_deepseek"
router_request["metadata"]["reason"] = "auto_complex"
if not should_force_concise_reply(text):
router_request["message"] = (
router_request["message"]
+ f"\n\n(Мова відповіді: {preferred_lang_label}.)"
+ "\n(Не потрібно щоразу представлятися по імені або писати шаблонне: 'чим можу допомогти'.)"
)
# Send to Router
logger.info(f"Sending to Router: agent={agent_config.agent_id}, dao={dao_id}, user=tg:{user_id}")
@@ -2238,6 +2510,9 @@ async def handle_telegram_webhook(
if isinstance(response, dict) and response.get("ok"):
answer_text = response.get("data", {}).get("text") or response.get("response", "")
image_base64 = response.get("image_base64") or response.get("data", {}).get("image_base64")
file_base64 = response.get("file_base64") or response.get("data", {}).get("file_base64")
file_name = response.get("file_name") or response.get("data", {}).get("file_name") or "artifact.bin"
file_mime = response.get("file_mime") or response.get("data", {}).get("file_mime") or "application/octet-stream"
# Debug logging
logger.info(f"📦 Router response: {len(answer_text)} chars, model={response.get('model')}, backend={response.get('backend')}")
@@ -2246,7 +2521,9 @@ async def handle_telegram_webhook(
logger.info(f"🖼️ Received image_base64: {len(image_base64)} chars")
else:
logger.debug("⚠️ No image_base64 in response")
if file_base64:
logger.info(f"📄 Received file_base64: {len(file_base64)} chars ({file_name})")
# Check for NO_OUTPUT (LLM decided not to respond)
if is_no_output_response(answer_text):
logger.info(f"🔇 NO_OUTPUT: Agent {agent_config.agent_id} returned empty/NO_OUTPUT. Not sending to Telegram.")
@@ -2305,8 +2582,27 @@ async def handle_telegram_webhook(
logger.info(f"🧪 PROBER: Skipping Telegram send for prober request. Response: {answer_text[:100]}...")
return {"ok": True, "agent": agent_config.agent_id, "prober": True, "response_preview": answer_text[:100]}
# Send file artifact if generated
if file_base64:
try:
file_bytes = base64.b64decode(file_base64)
token = telegram_token or os.getenv("TELEGRAM_BOT_TOKEN")
url = f"https://api.telegram.org/bot{token}/sendDocument"
caption = answer_text[:1024] if answer_text else ""
safe_name = str(file_name).split("/")[-1].split("\\")[-1] or "artifact.bin"
async with httpx.AsyncClient() as client:
files = {"document": (safe_name, BytesIO(file_bytes), file_mime)}
data = {"chat_id": chat_id}
if caption:
data["caption"] = caption
response_doc = await client.post(url, files=files, data=data, timeout=45.0)
response_doc.raise_for_status()
logger.info(f"✅ Sent generated document to Telegram chat {chat_id}: {safe_name}")
except Exception as e:
logger.error(f"❌ Failed to send document to Telegram: {e}")
await send_telegram_message(chat_id, answer_text or "Файл згенеровано, але не вдалося надіслати документ.", telegram_token)
# Send image if generated
elif image_base64:
try:
# Decode base64 image
image_bytes = base64.b64decode(image_base64)
@@ -2344,6 +2640,7 @@ async def handle_telegram_webhook(
agent_metadata={
"mentioned_bots": mentioned_bots,
"requires_complex_reasoning": needs_complex_reasoning,
"preferred_language": preferred_lang,
},
username=username,
)

View File

@@ -20,6 +20,46 @@ except ImportError:
# Router configuration from environment
ROUTER_BASE_URL = os.getenv("ROUTER_URL", "http://127.0.0.1:9102")
ROUTER_TIMEOUT = float(os.getenv("ROUTER_TIMEOUT", "180.0"))
GATEWAY_MAX_TOKENS_DEFAULT = int(os.getenv("GATEWAY_MAX_TOKENS_DEFAULT", "700"))
GATEWAY_MAX_TOKENS_CONCISE = int(os.getenv("GATEWAY_MAX_TOKENS_CONCISE", "220"))
GATEWAY_MAX_TOKENS_TRAINING = int(os.getenv("GATEWAY_MAX_TOKENS_TRAINING", "900"))
GATEWAY_TEMPERATURE_DEFAULT = float(os.getenv("GATEWAY_TEMPERATURE_DEFAULT", "0.4"))
GATEWAY_MAX_TOKENS_SENPAI_DEFAULT = int(os.getenv("GATEWAY_MAX_TOKENS_SENPAI_DEFAULT", "320"))
GATEWAY_MAX_TOKENS_DETAILED = int(os.getenv("GATEWAY_MAX_TOKENS_DETAILED", "900"))
def _apply_runtime_communication_guardrails(system_prompt: str, metadata: Dict[str, Any]) -> str:
"""Apply global communication constraints for all agents in Telegram flows."""
if not system_prompt:
return system_prompt
lang_label = (metadata or {}).get("preferred_response_language_label") or "user language"
guardrail = (
"\n\n[GLOBAL COMMUNICATION POLICY]\n"
"1) Do not introduce yourself by name in every message.\n"
"2) Do not add repetitive generic closers like 'how can I help' unless user explicitly asks.\n"
"3) Continue the dialog naturally from context.\n"
f"4) Respond in {lang_label}, matching the user's latest language.\n"
)
return system_prompt + guardrail
def _apply_agent_style_guardrails(agent_id: str, system_prompt: str) -> str:
"""Apply lightweight runtime style constraints for specific agents."""
if not system_prompt:
return system_prompt
if agent_id == "nutra":
nutra_guardrail = (
"\n\n[STYLE LOCK - NUTRA]\n"
"Always write in first-person singular and feminine form.\n"
"Use feminine wording in Ukrainian/Russian (e.g., 'я підготувала', 'я готова', "
"'я зрозуміла').\n"
"Never switch to masculine forms (e.g., 'понял', 'готов').\n"
)
return system_prompt + nutra_guardrail
return system_prompt
async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]:
@@ -32,6 +72,8 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]:
context = body.get("context", {})
system_prompt = body.get("system_prompt") or context.get("system_prompt")
system_prompt = _apply_agent_style_guardrails(agent_id, system_prompt)
system_prompt = _apply_runtime_communication_guardrails(system_prompt, metadata)
if system_prompt:
logger.info(f"Using system prompt ({len(system_prompt)} chars) for agent {agent_id}")
@@ -39,10 +81,28 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]:
infer_url = f"{ROUTER_BASE_URL}/v1/agents/{agent_id}/infer"
metadata["agent_id"] = agent_id
# Keep defaults moderate to avoid overly long replies while preserving flexibility.
max_tokens = GATEWAY_MAX_TOKENS_DEFAULT
# Senpai tends to over-verbose responses in Telegram; use lower default unless user asked details.
if agent_id == "senpai":
max_tokens = GATEWAY_MAX_TOKENS_SENPAI_DEFAULT
if metadata.get("is_training_group"):
max_tokens = GATEWAY_MAX_TOKENS_TRAINING
if metadata.get("force_detailed"):
max_tokens = max(max_tokens, GATEWAY_MAX_TOKENS_DETAILED)
if metadata.get("force_concise"):
max_tokens = min(max_tokens, GATEWAY_MAX_TOKENS_CONCISE)
infer_body = {
"prompt": message,
"system_prompt": system_prompt,
"metadata": metadata,
"max_tokens": max_tokens,
"temperature": float(metadata.get("temperature_override", GATEWAY_TEMPERATURE_DEFAULT)),
}
images = context.get("images", [])
@@ -54,7 +114,10 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]:
infer_body["provider_override"] = metadata["provider"]
prov = metadata.get("provider", "default")
logger.info(
f"Sending to Router ({infer_url}): agent={agent_id}, provider={prov}, "
f"has_images={bool(images)}, prompt_len={len(message)}, max_tokens={max_tokens}"
)
try:
async with httpx.AsyncClient(timeout=ROUTER_TIMEOUT) as client:
@@ -74,12 +137,18 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]:
"ok": True,
"data": {
"text": result.get("response", result.get("text", "")),
"image_base64": result.get("image_base64"),
"file_base64": result.get("file_base64"),
"file_name": result.get("file_name"),
"file_mime": result.get("file_mime"),
},
"response": result.get("response", result.get("text", "")),
"model": result.get("model"),
"backend": result.get("backend"),
"image_base64": result.get("image_base64"),
"file_base64": result.get("file_base64"),
"file_name": result.get("file_name"),
"file_mime": result.get("file_mime"),
} }
except httpx.TimeoutException as e:
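For reference, the response envelope built above can be sketched as a standalone helper. This is a minimal mirror for testing the mapping in isolation; the function name `normalize_router_result` is illustrative and not part of the runtime:

```python
from typing import Any, Dict

def normalize_router_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Map a router infer result into the gateway envelope: text plus optional
    image/file artifacts, with the text duplicated at top level for legacy callers."""
    text = result.get("response", result.get("text", ""))
    return {
        "ok": True,
        "data": {
            "text": text,
            "image_base64": result.get("image_base64"),
            "file_base64": result.get("file_base64"),
            "file_name": result.get("file_name"),
            "file_mime": result.get("file_mime"),
        },
        "response": text,
        "model": result.get("model"),
        "backend": result.get("backend"),
    }
```

Missing artifact keys come back as `None`, so downstream delivery code can check `file_base64` without a `KeyError`.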
@@ -26,45 +26,64 @@ FULL_STANDARD_STACK = [
"presentation_create",
"presentation_status",
"presentation_download",
# File artifacts
"file_tool",
]
# Specialized tools per agent (on top of standard stack)
AGENT_SPECIALIZED_TOOLS = {
    # Helion - Energy platform
    # Specialized: energy calculations, solar/wind analysis
    "helion": ['comfy_generate_image', 'comfy_generate_video'],
    # Alateya - R&D Lab OS
    # Specialized: experiment tracking, hypothesis testing
    "alateya": ['comfy_generate_image', 'comfy_generate_video'],
    # Nutra - Health & Nutrition
    # Specialized: nutrition calculations, supplement analysis
    "nutra": ['comfy_generate_image', 'comfy_generate_video'],
    # AgroMatrix - Agriculture
    # Specialized: crop analysis, weather integration, field mapping
    "agromatrix": ['comfy_generate_image', 'comfy_generate_video'],
    # GreenFood - Food & Eco
    # Specialized: recipe analysis, eco-scoring
    "greenfood": ['comfy_generate_image', 'comfy_generate_video'],
    # Druid - Knowledge Search
    # Specialized: deep RAG, document comparison
    "druid": ['comfy_generate_image', 'comfy_generate_video'],
    # DaarWizz - DAO Coordination
    # Specialized: governance tools, voting, treasury
    "daarwizz": ['comfy_generate_image', 'comfy_generate_video'],
    # Clan - Community
    # Specialized: event management, polls, member tracking
    "clan": ['comfy_generate_image', 'comfy_generate_video'],
    # Eonarch - Philosophy & Evolution
    # Specialized: concept mapping, timeline analysis
    "eonarch": ['comfy_generate_image', 'comfy_generate_video'],
    # SenpAI (Gordon Senpai) - Trading & Markets
    # Specialized: real-time market data, features, signals
    "senpai": ['market_data', 'comfy_generate_image', 'comfy_generate_video'],
    # Soul / Athena - Spiritual Mentor
    "soul": ['comfy_generate_image', 'comfy_generate_video'],
    # Yaromir - Tech Lead
    "yaromir": ['comfy_generate_image', 'comfy_generate_video'],
    # Sofiia - Chief AI Architect
    "sofiia": ['comfy_generate_image', 'comfy_generate_video'],
    # Daarion - Media Generation
    "daarion": ['comfy_generate_image', 'comfy_generate_video'],
}
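The effective toolset for an agent is the standard stack plus its specialized list. A minimal sketch of that merge, assuming specialized tools append after the standard stack with order-preserving de-duplication (the stack below is truncated for illustration and the helper name is hypothetical):

```python
# Truncated illustration of the real lists in agent_tools_config.py.
FULL_STANDARD_STACK = ["web_search", "presentation_create", "file_tool"]
AGENT_SPECIALIZED_TOOLS = {
    "senpai": ["market_data", "comfy_generate_image", "comfy_generate_video"],
    "helion": ["comfy_generate_image", "comfy_generate_video"],
}

def effective_tools(agent_id: str) -> list:
    """Standard stack first, then agent-specific extras, de-duplicated in order."""
    merged = FULL_STANDARD_STACK + AGENT_SPECIALIZED_TOOLS.get(agent_id, [])
    return list(dict.fromkeys(merged))
```

Unknown agents simply fall back to the standard stack, which is why `file_tool` is available to every agent once it is in `FULL_STANDARD_STACK`.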
# CrewAI team structure per agent (future implementation)
@@ -9,6 +9,7 @@ import re
import yaml
import httpx
import logging
import hashlib
import time  # For latency metrics

# CrewAI Integration
@@ -40,6 +41,30 @@ except ImportError:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
TRUSTED_DOMAINS_CONFIG_PATH = os.getenv("TRUSTED_DOMAINS_CONFIG_PATH", "./trusted_domains.yml")
_trusted_domains_cache: Dict[str, Any] = {"mtime": None, "data": {}}

def _load_trusted_domains_overrides() -> Dict[str, Any]:
    """Load optional trusted domains overrides editable by mentors."""
    global _trusted_domains_cache
    try:
        if not os.path.exists(TRUSTED_DOMAINS_CONFIG_PATH):
            return {}
        mtime = os.path.getmtime(TRUSTED_DOMAINS_CONFIG_PATH)
        if _trusted_domains_cache.get("mtime") == mtime:
            return _trusted_domains_cache.get("data") or {}
        with open(TRUSTED_DOMAINS_CONFIG_PATH, "r", encoding="utf-8") as f:
            raw = yaml.safe_load(f) or {}
        if not isinstance(raw, dict):
            raw = {}
        _trusted_domains_cache = {"mtime": mtime, "data": raw}
        return raw
    except Exception as e:
        logger.warning(f"⚠️ Failed to load trusted domains overrides: {e}")
        return {}
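The overrides file is optional. The sketch below shows the structure the loader is expected to return after `yaml.safe_load`, with keys taken from the merge logic in `_compact_web_search_result` (`low_signal_domains`, `common_domains`, `agents.<agent_id>.domains`); the sample domains and the `agent_override_domains` helper are illustrative, not part of the runtime:

```python
# Hypothetical trusted_domains.yml, shown as the parsed dict the loader returns.
overrides = {
    "low_signal_domains": ["example-spam.com"],
    "common_domains": ["wikipedia.org"],
    "agents": {
        "agromatrix": {"domains": ["fao.org", "usda.gov"]},
    },
}

def agent_override_domains(overrides: dict, agent_id: str) -> list:
    """Defensive lookup mirroring the router's handling of malformed overrides."""
    agents = overrides.get("agents") or {}
    cfg = agents.get(agent_id) or {}
    doms = cfg.get("domains")
    return doms if isinstance(doms, list) else []
```

Because every lookup tolerates missing or non-dict values, mentors can ship a partial file without breaking web-search filtering.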
def _strip_dsml_keep_text_before(text: str) -> str:
    """If response contains DSML, return only the part before the first DSML-like tag. Otherwise return empty (caller will use fallback)."""
@@ -69,6 +94,499 @@ def _strip_dsml_keep_text_before(text: str) -> str:
    return prefix if len(prefix) > 30 else ""
def _vision_prompt_wants_web(prompt: str) -> bool:
    if not prompt:
        return False
    p = prompt.lower()
    markers = [
        "знайди", "пошукай", "пошук", "в інтернет", "в інтернеті", "у відкритих джерелах",
        "що це", "що на фото", "який це", "яка це", "identify", "find online", "search web",
        "назва", "бренд", "виробник", "інструкція", "дозування", "регламент", "де купити", "ціна",
    ]
    return any(m in p for m in markers)

def _vision_answer_uncertain(answer: str) -> bool:
    if not answer:
        return True
    a = answer.lower()
    uncertain_markers = [
        "ймовірно", "можливо", "схоже", "не впевнений", "не можу визначити", "важко сказати",
        "probably", "maybe", "looks like", "not sure", "cannot identify"
    ]
    return any(m in a for m in uncertain_markers)

EMPTY_ANSWER_GUARD_AGENTS = {"devtools", "monitor"}

def _normalize_text_response(text: str) -> str:
    return re.sub(r"\s+", " ", str(text or "")).strip()

def _needs_empty_answer_recovery(text: str) -> bool:
    normalized = _normalize_text_response(text)
    if not normalized:
        return True
    low = normalized.lower()
    if len(normalized) < 8:
        return True
    meta_markers = (
        "the user", "user asked", "i need", "let me", "analysis", "thinking",
        "користувач", "потрібно", "спочатку", "сначала"
    )
    if any(m in low for m in meta_markers) and len(normalized) < 80:
        return True
    if normalized in {"...", "ok", "done"}:
        return True
    return False
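The empty-answer guard can be unit-tested outside the router with a compressed mirror. This sketch assumes the same thresholds as above (8 and 80 chars) and keeps only a subset of the meta markers; the name `needs_recovery` is illustrative:

```python
import re

def needs_recovery(text: str) -> bool:
    """Simplified mirror of _needs_empty_answer_recovery: flag empty, too-short,
    or meta-only responses that should trigger a retry or fallback."""
    normalized = re.sub(r"\s+", " ", str(text or "")).strip()
    if not normalized or len(normalized) < 8:
        return True
    meta = ("the user", "let me", "користувач")
    if any(m in normalized.lower() for m in meta) and len(normalized) < 80:
        return True
    return normalized in {"...", "ok", "done"}
```

The intent: leaked chain-of-thought fragments ("Let me think...") are short and meta-heavy, while a real answer is longer and marker-free.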
def _image_response_needs_retry(text: str) -> bool:
normalized = _normalize_text_response(text)
if _needs_empty_answer_recovery(normalized):
return True
low = normalized.lower()
blocked_markers = (
"не можу бачити", "не можу аналізувати зображення", "опишіть фото словами",
"cannot view images", "cannot analyze image", "as a text model"
)
if any(m in low for m in blocked_markers):
return True
return len(normalized) < 24
def _vision_response_is_blurry(text: str) -> bool:
low = _normalize_text_response(text).lower()
if not low:
return False
blurry_markers = (
"розмит", "нечітк", "не дуже чітк", "blur", "blurry", "out of focus", "low quality"
)
return any(m in low for m in blurry_markers)
def _build_image_fallback_response(agent_id: str, prompt: str = "") -> str:
if (agent_id or "").lower() == "agromatrix":
return (
"Фото поки занадто нечітке, тому діагноз неточний. "
"Надішли, будь ласка, 2-3 чіткі фото: загальний вигляд рослини, крупний план проблемної ділянки "
"і (для листка) нижній бік. Якщо можеш, додай культуру та стадію росту."
)
return "Я поки не бачу достатньо деталей на фото. Надішли, будь ласка, чіткіше фото або крупний план об'єкта."
def _sanitize_vision_text_for_user(text: str) -> str:
if not text:
return ""
normalized = re.sub(r"\s+", " ", str(text)).strip()
if not normalized:
return ""
sentences = [seg.strip() for seg in re.split(r"(?<=[.!?])\s+", normalized) if seg.strip()]
meta_markers = (
"okay", "the user", "user sent", "they want", "i need", "let me", "i will",
"first, look at the image", "look at the image", "first, analyze",
"first, looking at the image", "looking at the image",
"хорошо", "користувач", "пользователь", "потрібно", "нужно", "спочатку", "сначала"
)
cleaned = [sent for sent in sentences if not any(m in sent.lower() for m in meta_markers)]
if cleaned:
out = " ".join(cleaned[:3]).strip()
else:
# If text is only meta-reasoning, prefer empty over leaking service text to user.
if any(m in normalized.lower() for m in meta_markers):
return ""
out = " ".join(sentences[:3]).strip()
if len(out) > 700:
out = out[:700].rsplit(" ", 1)[0] + "..."
return out
def _extract_vision_search_facts(text: str, max_chars: int = 220) -> str:
fact = _sanitize_vision_text_for_user(text)
# If sanitizer dropped everything (meta-only), try to recover object phrase.
if not fact and text:
raw = re.sub(r"\s+", " ", str(text)).strip()
raw = re.sub(r"(?i)^okay,?\s*", "", raw)
raw = re.sub(r"(?i)^let\'s\s+see\.?\s*", "", raw)
raw = re.sub(r"(?i)^the user sent (an image|a photo|a picture) of\s+", "", raw)
raw = re.sub(r"(?i)^user sent (an image|a photo|a picture) of\s+", "", raw)
raw = re.sub(r"(?i)^an image of\s+", "", raw)
raw = re.sub(r"(?i)they want.*$", "", raw).strip(" .")
fact = raw
if not fact:
return ""
fact = re.sub(r"(?i)джерела\s*:\s*.*$", "", fact).strip()
fact = re.sub(r"[*_`#\[\]()]", "", fact)
fact = re.sub(r"\s{2,}", " ", fact).strip(" .,")
if len(fact) > max_chars:
fact = fact[:max_chars].rsplit(" ", 1)[0]
return fact
def _build_vision_web_query(prompt: str, vision_text: str) -> str:
# Keep query compact and deterministic for web_search tool.
source_intent = any(k in (prompt or "").lower() for k in ("джерел", "підтвердж", "source", "reference"))
prompt_part = (prompt or "").strip()
# Remove generic question wrappers that pollute search quality.
prompt_part = re.sub(r"(?i)що\s*це\s*на\s*фото\??", "", prompt_part).strip(" .")
prompt_part = re.sub(r"(?i)дай\s*2-?3\s*джерела", "", prompt_part).strip(" .")
prompt_part = re.sub(r"(?i)дай\s*\d+\s*джерел[а-я]*\s*для", "", prompt_part).strip(" .")
prompt_part = re.sub(r"(?i)дай\s*\d+\s*джерел[а-я]*", "", prompt_part).strip(" .")
prompt_part = re.sub(r"(?i)знайди\s*в\s*інтернеті", "", prompt_part).strip(" .")
prompt_part = re.sub(r"(?i)знайди\s*в\s*інтернеті\s*схожі\s*джерела", "", prompt_part).strip(" .")
prompt_part = re.sub(r"(?i)підтвердження", "", prompt_part).strip(" .")
prompt_part = re.sub(r"(?i)якщо\s*не\s*впевнений.*$", "", prompt_part).strip(" .")
prompt_part = re.sub(r"(?i)пошукай.*$", "", prompt_part).strip(" .")
prompt_part = re.sub(r"(?iu)\bі\b\.?$", "", prompt_part).strip(" .")
vision_part = _extract_vision_search_facts(vision_text)
if vision_part:
tokens = re.findall(r"[a-zA-Zа-яА-ЯіїєІЇЄ0-9]{3,}", vision_part.lower())
generic_tokens = {
"first", "look", "image", "photo", "picture", "context", "the", "and",
"спочатку", "подивись", "зображення", "фото", "картинка", "контекст",
}
if len(tokens) < 3 or len(vision_part) < 18 or all(t in generic_tokens for t in tokens):
# Too vague entity extraction (e.g., single word "rex") -> skip web enrichment.
return ""
if vision_part:
if prompt_part:
q = f"{vision_part}. контекст: {prompt_part}".strip(" .")
else:
q = vision_part
if source_intent:
q = f"{q} wikipedia encyclopedia"
return q.strip()
return ""
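The vagueness gate inside `_build_vision_web_query` can be isolated for testing as below. The generic-token set is truncated and `query_is_actionable` is an illustrative name; the thresholds (3 tokens, 18 chars) match the guard above:

```python
import re

# Truncated illustration of the generic-token set used by the router.
GENERIC = {"first", "look", "image", "photo", "фото", "зображення"}

def query_is_actionable(vision_part: str) -> bool:
    """Require at least 3 meaningful tokens and 18 chars before spending a web search."""
    tokens = re.findall(r"[a-zA-Zа-яА-ЯіїєІЇЄ0-9]{3,}", vision_part.lower())
    if len(tokens) < 3 or len(vision_part) < 18:
        return False
    return not all(t in GENERIC for t in tokens)
```

Single-word entities like "rex" fail the gate, so the router skips web enrichment rather than issue a junk query.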
def _compact_web_search_result(raw: str, query: str = "", agent_id: str = "", max_chars: int = 900) -> str:
if not raw:
return ""
text = str(raw).strip()
if not text:
return ""
def _extract_domain(url: str) -> str:
if not url:
return ""
d = url.lower().strip()
d = d.replace("https://", "").replace("http://", "")
d = d.split("/")[0]
if d.startswith("www."):
d = d[4:]
return d
low_signal_tokens = (
"grid maker", "converter", "convert", "download", "wallpaper", "stock photo",
"instagram", "pinterest", "tiktok", "youtube", "facebook", "generator", "meme",
)
low_signal_domains = (
"pinterest.com", "instagram.com", "tiktok.com", "youtube.com",
"facebook.com", "vk.com", "yandex.", "stackexchange.com",
"zhihu.com", "baidu.com",
)
trusted_common_domains = (
"wikipedia.org", "wikidata.org", "britannica.com",
"who.int", "fao.org", "oecd.org", "worldbank.org", "un.org", "europa.eu",
"nature.com", "science.org", "sciencedirect.com", "springer.com",
)
trusted_agro_domains = (
"fao.org", "europa.eu", "ec.europa.eu", "usda.gov", "nass.usda.gov",
"ukragroconsult.com", "minagro.gov.ua", "rada.gov.ua", "kmu.gov.ua",
"agroportal.ua", "latifundist.com", "kurkul.com",
)
trusted_by_agent = {
"agromatrix": trusted_agro_domains,
"alateya": (
"europa.eu", "un.org", "worldbank.org", "oecd.org",
),
"clan": (
"europa.eu", "un.org", "wikipedia.org",
),
"daarwizz": (
"openai.com", "anthropic.com", "mistral.ai", "huggingface.co",
"python.org", "github.com",
),
"devtools": (
"github.com", "docs.python.org", "pypi.org", "docker.com",
"kubernetes.io", "fastapi.tiangolo.com", "postgresql.org",
),
"druid": (
"who.int", "nih.gov", "ncbi.nlm.nih.gov", "wikipedia.org",
),
"eonarch": (
"iea.org", "irena.org", "entsoe.eu", "europa.eu", "worldbank.org",
),
"greenfood": (
"fao.org", "who.int", "efsa.europa.eu", "usda.gov", "ec.europa.eu",
),
"senpai": (
"binance.com", "bybit.com", "coinbase.com", "kraken.com",
"coindesk.com", "cointelegraph.com", "tradingview.com",
"cftc.gov", "sec.gov", "esma.europa.eu",
),
"sofiia": (
"who.int", "nih.gov", "ncbi.nlm.nih.gov", "ema.europa.eu",
"fda.gov", "mayoclinic.org", "nhs.uk",
),
"helion": (
"iea.org", "irena.org", "entsoe.eu", "europa.eu", "worldbank.org",
),
"nutra": (
"fao.org", "who.int", "efsa.europa.eu", "fda.gov",
),
"microdao_orchestrator": (
"openai.com", "anthropic.com", "mistral.ai", "github.com",
"europa.eu", "un.org", "worldbank.org",
),
"monitor": (
"grafana.com", "prometheus.io", "elastic.co", "datadoghq.com",
"opentelemetry.io",
),
"soul": (
"who.int", "nih.gov", "ncbi.nlm.nih.gov", "wikipedia.org",
),
"yaromir": (
"europa.eu", "un.org", "worldbank.org", "wikipedia.org",
),
}
def _norm_domain_entry(value: Any) -> str:
if isinstance(value, dict):
value = value.get("url") or value.get("domain") or ""
value = str(value or "").strip().lower()
if not value:
return ""
value = value.replace("https://", "").replace("http://", "")
value = value.split("/")[0]
if value.startswith("www."):
value = value[4:]
return value
def _norm_domain_list(values: Any) -> List[str]:
out: List[str] = []
if not isinstance(values, list):
return out
for v in values:
d = _norm_domain_entry(v)
if d:
out.append(d)
return out
overrides = _load_trusted_domains_overrides()
extra_low_signal = _norm_domain_list(overrides.get("low_signal_domains"))
if extra_low_signal:
low_signal_domains = tuple(dict.fromkeys([*low_signal_domains, *extra_low_signal]))
extra_common = _norm_domain_list(overrides.get("common_domains"))
if extra_common:
trusted_common_domains = tuple(dict.fromkeys([*trusted_common_domains, *extra_common]))
agents_overrides = overrides.get("agents") if isinstance(overrides.get("agents"), dict) else {}
for a, cfg in agents_overrides.items():
if not isinstance(cfg, dict):
continue
doms = _norm_domain_list(cfg.get("domains"))
if doms:
base = trusted_by_agent.get(str(a).lower(), ())
merged = tuple(dict.fromkeys([*base, *doms]))
trusted_by_agent[str(a).lower()] = merged
agro_query_terms = {
"агро", "agro", "crop", "crops", "fertilizer", "fertilizers",
"field", "soil", "harvest", "yield", "pesticide", "herbicide",
"farm", "farming", "tractor", "зерно", "пшениц", "кукурудз",
"соняшник", "ріпак", "врожай", "ґрунт", "поле", "добрив",
"насіння", "ззр", "фермер",
}
query_terms = {t for t in re.findall(r"[a-zA-Zа-яА-ЯіїєІЇЄ0-9]{3,}", (query or "").lower())}
agro_mode = any(any(k in term for k in agro_query_terms) for term in query_terms)
agent_trusted_domains = trusted_by_agent.get((agent_id or "").lower(), ())
# Parse bullet blocks from tool output.
chunks = []
current = []
for line in text.splitlines():
ln = line.rstrip()
if ln.startswith("- ") and current:
chunks.append("\n".join(current))
current = [ln]
else:
current.append(ln)
if current:
chunks.append("\n".join(current))
scored = []
for chunk in chunks:
lines = [ln.strip() for ln in chunk.splitlines() if ln.strip()]
title = lines[0][2:].strip() if lines and lines[0].startswith("- ") else (lines[0] if lines else "")
url_line = next((ln for ln in lines if ln.lower().startswith("url:")), "")
url = url_line.split(":", 1)[1].strip() if ":" in url_line else ""
domain = _extract_domain(url)
text_blob = " ".join(lines).lower()
if any(x in domain for x in low_signal_domains):
continue
score = 0
for t in query_terms:
if t in text_blob:
score += 2
if any(tok in text_blob for tok in low_signal_tokens):
score -= 3
if domain.endswith(".gov") or domain.endswith(".gov.ua") or domain.endswith(".edu"):
score += 2
if any(domain == d or domain.endswith("." + d) for d in trusted_common_domains):
score += 2
if any(domain == d or domain.endswith("." + d) for d in agent_trusted_domains):
score += 2
if any(domain.endswith(d) for d in ("wikipedia.org", "wikidata.org", "fao.org", "europa.eu")):
score += 2
if agro_mode:
if any(domain == d or domain.endswith("." + d) for d in trusted_agro_domains):
score += 3
else:
score -= 1
if not url:
score -= 1
if len(title) < 6:
score -= 1
scored.append((score, domain, chunk))
def _is_trusted_agro(domain: str) -> bool:
if not domain:
return False
if any(domain == d or domain.endswith("." + d) for d in trusted_common_domains):
return True
return any(domain == d or domain.endswith("." + d) for d in trusted_agro_domains)
scored.sort(key=lambda x: x[0], reverse=True)
kept = []
seen_domains = set()
if agro_mode:
for s, domain, chunk in scored:
if s < 1 or not _is_trusted_agro(domain):
continue
if domain and domain in seen_domains:
continue
if domain:
seen_domains.add(domain)
kept.append(chunk)
if len(kept) >= 3:
break
if kept:
compact = "\n\n".join(kept).strip()
if len(compact) > max_chars:
compact = compact[:max_chars].rstrip() + "..."
return compact
for s, domain, chunk in scored:
if s < 2:
continue
if domain and domain in seen_domains:
continue
if domain:
seen_domains.add(domain)
kept.append(chunk)
if len(kept) >= 3:
break
if not kept:
return ""
compact = "\n\n".join(kept).strip()
if len(compact) > max_chars:
compact = compact[:max_chars].rstrip() + "..."
return compact
def _extract_sources_from_compact(compact: str, max_items: int = 3) -> List[Dict[str, str]]:
if not compact:
return []
items: List[Dict[str, str]] = []
chunks = [c for c in compact.split("\n\n") if c.strip()]
for chunk in chunks:
lines = [ln.strip() for ln in chunk.splitlines() if ln.strip()]
if not lines:
continue
title = lines[0][2:].strip() if lines[0].startswith("- ") else lines[0]
url_line = next((ln for ln in lines if ln.lower().startswith("url:")), "")
url = url_line.split(":", 1)[1].strip() if ":" in url_line else ""
if not url:
continue
items.append({"title": title[:180], "url": url[:500]})
if len(items) >= max_items:
break
return items
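A self-contained sketch of the same `- Title` / `URL:` block parsing, handy for testing source extraction outside the router (simplified: the title/URL length caps are omitted):

```python
def extract_sources(compact: str, max_items: int = 3):
    """Parse '- Title' blocks with a 'URL:' line into {title, url} dicts,
    skipping entries without a URL."""
    items = []
    for chunk in [c for c in compact.split("\n\n") if c.strip()]:
        lines = [ln.strip() for ln in chunk.splitlines() if ln.strip()]
        if not lines:
            continue
        title = lines[0][2:].strip() if lines[0].startswith("- ") else lines[0]
        url_line = next((ln for ln in lines if ln.lower().startswith("url:")), "")
        url = url_line.split(":", 1)[1].strip() if ":" in url_line else ""
        if url:
            items.append({"title": title, "url": url})
        if len(items) >= max_items:
            break
    return items
```

Entries without a `URL:` line are dropped, which keeps the `vision_sources` metadata clickable end to end.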
def _condition_matches(cond: Dict[str, Any], agent_id: str, metadata: Dict[str, Any]) -> bool:
"""Minimal matcher for router-config `when` conditions."""
if not isinstance(cond, dict):
return True
meta = metadata or {}
if "agent" in cond and cond.get("agent") != agent_id:
return False
if "mode" in cond and meta.get("mode") != cond.get("mode"):
return False
if "metadata_has" in cond:
key = cond.get("metadata_has")
if key not in meta:
return False
if "metadata_equals" in cond:
eq = cond.get("metadata_equals") or {}
for k, v in eq.items():
if meta.get(k) != v:
return False
if "task_type" in cond:
expected = cond.get("task_type")
actual = meta.get("task_type")
if isinstance(expected, list):
if actual not in expected:
return False
elif actual != expected:
return False
if "api_key_available" in cond:
env_name = cond.get("api_key_available")
if not (isinstance(env_name, str) and os.getenv(env_name)):
return False
if "and" in cond:
clauses = cond.get("and") or []
if not isinstance(clauses, list):
return False
for clause in clauses:
if not _condition_matches(clause, agent_id, meta):
return False
return True
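A trimmed matcher covering just the `agent`, `metadata_equals`, and `and` branches is enough to validate router-config rules offline. This is illustrative and omits the other condition keys (`mode`, `metadata_has`, `task_type`, `api_key_available`):

```python
def condition_matches(cond: dict, agent_id: str, metadata: dict) -> bool:
    """Subset of the router's `when` matcher: agent equality, metadata equality,
    and recursive AND clauses. Non-dict conditions match everything."""
    if not isinstance(cond, dict):
        return True
    meta = metadata or {}
    if "agent" in cond and cond["agent"] != agent_id:
        return False
    for k, v in (cond.get("metadata_equals") or {}).items():
        if meta.get(k) != v:
            return False
    # all() over an empty list is True, so a missing `and` clause passes.
    return all(condition_matches(c, agent_id, meta) for c in cond.get("and") or [])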
def _select_default_llm(agent_id: str, metadata: Dict[str, Any], base_llm: str, routing_rules: List[Dict[str, Any]]) -> str:
"""Select LLM by first matching routing rule with `use_llm`."""
for rule in routing_rules:
when = rule.get("when", {})
if _condition_matches(when, agent_id, metadata):
use_llm = rule.get("use_llm")
if use_llm:
logger.info(f"🎯 Agent {agent_id} routing rule {rule.get('id', '<no-id>')} -> {use_llm}")
return use_llm
return base_llm
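Rule selection itself is first-match-wins. A sketch with agent-only matching (a deliberate simplification of the full `_condition_matches` logic; absent `when.agent` means the rule applies to every agent):

```python
def select_default_llm(agent_id, metadata, base_llm, routing_rules):
    """Return the `use_llm` of the first rule whose `when.agent` matches
    (or is absent); otherwise keep the agent's configured base model."""
    for rule in routing_rules:
        when = rule.get("when", {})
        if when.get("agent") in (None, agent_id) and rule.get("use_llm"):
            return rule["use_llm"]
    return base_llm
```

Because the first match wins, put the most specific rules (explicit `agent`) before any catch-all rule in `router-config.yml`.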
app = FastAPI(title="DAARION Router", version="2.0.0")

# Configuration
@@ -404,6 +922,9 @@ class InferResponse(BaseModel):
    tokens_used: Optional[int] = None
    backend: str
    image_base64: Optional[str] = None  # Generated image in base64 format
    file_base64: Optional[str] = None
    file_name: Optional[str] = None
    file_mime: Optional[str] = None
@@ -675,13 +1196,14 @@ async def agent_infer(agent_id: str, request: InferRequest):
    # Get system prompt from database or config
    system_prompt = request.system_prompt
    system_prompt_source = "request"
    if system_prompt:
        logger.info(f"📝 Received system_prompt from request: {len(system_prompt)} chars")
        logger.debug(f"System prompt preview: {system_prompt[:200]}...")
    else:
        system_prompt_source = "city_service"
        logger.info(f" No system_prompt in request for agent {agent_id}, loading from configured sources")

    if not system_prompt:
        try:
            from prompt_builder import get_agent_system_prompt
@@ -694,8 +1216,26 @@ async def agent_infer(agent_id: str, request: InferRequest):
        except Exception as e:
            logger.warning(f"⚠️ Could not load prompt from database: {e}")
            # Fallback to config
            system_prompt_source = "router_config"
            agent_config = router_config.get("agents", {}).get(agent_id, {})
            system_prompt = agent_config.get("system_prompt")

    if not system_prompt:
        system_prompt_source = "empty"
        logger.warning(f"⚠️ System prompt unavailable for {agent_id}; continuing with provider defaults")

    system_prompt_hash = hashlib.sha256((system_prompt or "").encode("utf-8")).hexdigest()[:12]
    effective_metadata = dict(metadata)
    effective_metadata["system_prompt_hash"] = system_prompt_hash
    effective_metadata["system_prompt_source"] = system_prompt_source
    effective_metadata["system_prompt_version"] = (
        metadata.get("system_prompt_version")
        or f"{agent_id}:{system_prompt_hash}"
    )
    logger.info(
        f"🧩 Prompt meta for {agent_id}: source={system_prompt_source}, "
        f"version={effective_metadata['system_prompt_version']}, hash={system_prompt_hash}"
    )
    # Determine which backend to use
    # Use router config to get default model for agent, fallback to qwen3:8b
@@ -713,8 +1253,8 @@ async def agent_infer(agent_id: str, request: InferRequest):
        agent_id=agent_id,
        prompt=request.prompt,
        agent_config=agent_config,
        metadata=effective_metadata,
        force_crewai=effective_metadata.get("force_crewai", False),
    )
    logger.info(f"🎭 CrewAI decision for {agent_id}: {use_crewai} ({crewai_reason})")
@@ -727,7 +1267,12 @@ async def agent_infer(agent_id: str, request: InferRequest):
        context={
            "memory_brief": memory_brief_text,
            "system_prompt": system_prompt,
            "system_prompt_meta": {
                "source": system_prompt_source,
                "version": effective_metadata.get("system_prompt_version"),
                "hash": system_prompt_hash,
            },
            "metadata": effective_metadata,
        },
        team=crewai_cfg.get("team")
    )
@@ -755,9 +1300,8 @@ async def agent_infer(agent_id: str, request: InferRequest):
                return InferResponse(
                    response=crew_result["result"],
                    model="crewai-" + agent_id,
                    backend="crewai",
                    tokens_used=0
                )
            else:
                logger.warning(f"⚠️ CrewAI failed, falling back to direct LLM")
@@ -765,15 +1309,9 @@ async def agent_infer(agent_id: str, request: InferRequest):
        logger.exception(f"❌ CrewAI error: {e}, falling back to direct LLM")

    default_llm = agent_config.get("default_llm", "qwen3:8b")
    routing_rules = router_config.get("routing", [])
    default_llm = _select_default_llm(agent_id, metadata, default_llm, routing_rules)

    # Get LLM profile configuration
    llm_profiles = router_config.get("llm_profiles", {})
@@ -819,15 +1357,114 @@ async def agent_infer(agent_id: str, request: InferRequest):
            if vision_resp.status_code == 200:
                vision_data = vision_resp.json()
                raw_response = vision_data.get("text", "")
                full_response = _sanitize_vision_text_for_user(raw_response)
                vision_web_query = ""
                vision_sources: List[Dict[str, str]] = []

                # Debug: log full response structure
                logger.info(
                    f"✅ Vision response: raw={len(raw_response)} chars, sanitized={len(full_response)} chars, "
                    f"success={vision_data.get('success')}, keys={list(vision_data.keys())}"
                )
                if raw_response and not full_response:
                    full_response = _extract_vision_search_facts(raw_response, max_chars=280)
                if not full_response:
                    logger.warning(f"⚠️ Empty vision response! Full data: {str(vision_data)[:500]}")
# Optional vision -> web enrichment (soft policy):
# if prompt explicitly asks to search online OR vision answer is uncertain.
if (full_response or raw_response) and TOOL_MANAGER_AVAILABLE and tool_manager:
try:
wants_web = _vision_prompt_wants_web(request.prompt)
uncertain = _vision_answer_uncertain(full_response or raw_response)
if wants_web or uncertain:
query = _build_vision_web_query(request.prompt, full_response or raw_response)
if not query:
logger.info("🔎 Vision web enrich skipped: query not actionable")
else:
vision_web_query = query
search_result = await tool_manager.execute_tool(
"web_search",
{"query": query, "max_results": 3},
agent_id=request_agent_id,
chat_id=chat_id,
user_id=user_id,
)
if search_result and search_result.success and search_result.result:
compact_search = _compact_web_search_result(
search_result.result,
query=query,
agent_id=request_agent_id,
)
if compact_search and "Нічого не знайдено" not in compact_search:
vision_sources = _extract_sources_from_compact(compact_search)
base_text = full_response or "Не вдалося надійно ідентифікувати об'єкт на фото."
full_response = (
f"{base_text}\n\n"
f"Додатково знайшов у відкритих джерелах:\n{compact_search}"
)
logger.info(
"🌐 Vision web enrichment applied "
f"for agent={request_agent_id}, wants_web={wants_web}, uncertain={uncertain}, "
f"sources={len(vision_sources)}"
)
except Exception as e:
logger.warning(f"⚠️ Vision web enrichment failed: {e}")
if vision_web_query:
logger.info(
f"🗂️ Vision enrichment metadata: agent={request_agent_id}, "
f"query='{vision_web_query[:180]}', sources={len(vision_sources)}"
)
# Image quality gate: one soft retry if response looks empty/meta.
if _image_response_needs_retry(full_response):
try:
logger.warning(f"⚠️ Vision quality gate triggered for agent={request_agent_id}, retrying once")
retry_payload = dict(vision_payload)
retry_payload["prompt"] = (
"Опиши зображення по суті: що зображено, ключові деталі, можливий контекст. "
"Відповідай українською 2-4 реченнями, без службових фраз. "
f"Запит користувача: {request.prompt}"
)
retry_resp = await http_client.post(
f"{SWAPPER_URL}/vision",
json=retry_payload,
timeout=120.0
)
if retry_resp.status_code == 200:
retry_data = retry_resp.json()
retry_raw = retry_data.get("text", "")
retry_text = _sanitize_vision_text_for_user(retry_raw)
if retry_raw and not retry_text:
retry_text = _extract_vision_search_facts(retry_raw, max_chars=280)
if retry_text and not _image_response_needs_retry(retry_text):
full_response = retry_text
logger.info(f"✅ Vision quality retry improved response for agent={request_agent_id}")
except Exception as e:
logger.warning(f"⚠️ Vision quality retry failed: {e}")
if _image_response_needs_retry(full_response):
full_response = _build_image_fallback_response(request_agent_id, request.prompt)
elif request_agent_id == "agromatrix" and _vision_response_is_blurry(full_response):
full_response = _build_image_fallback_response(request_agent_id, request.prompt)
                # Store vision message in agent-specific memory
                if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id and full_response:
                    vision_meta: Dict[str, Any] = {}
                    if vision_web_query:
                        vision_meta["vision_search_query"] = vision_web_query[:500]
                    if vision_sources:
                        vision_meta["vision_sources"] = vision_sources
                    asyncio.create_task(
                        memory_retrieval.store_message(
                            agent_id=request_agent_id,
@@ -836,7 +1473,8 @@ async def agent_infer(agent_id: str, request: InferRequest):
                            message_text=f"[Image] {request.prompt}",
                            response_text=full_response,
                            chat_id=chat_id,
                            message_type="vision",
                            metadata=vision_meta if vision_meta else None,
                        )
                    )
@@ -848,11 +1486,21 @@ async def agent_infer(agent_id: str, request: InferRequest):
                )
            else:
                logger.error(f"❌ Swapper vision error: {vision_resp.status_code} - {vision_resp.text[:200]}")
                return InferResponse(
                    response=_build_image_fallback_response(request_agent_id, request.prompt),
                    model="qwen3-vl-8b",
                    tokens_used=None,
                    backend="swapper-vision-fallback"
                )
        except Exception as e:
            logger.error(f"❌ Vision processing failed: {e}", exc_info=True)
            return InferResponse(
                response=_build_image_fallback_response(request_agent_id, request.prompt),
                model="qwen3-vl-8b",
                tokens_used=None,
                backend="swapper-vision-fallback"
            )
    # =========================================================================
# SMART LLM ROUTER WITH AUTO-FALLBACK # SMART LLM ROUTER WITH AUTO-FALLBACK
@@ -881,6 +1529,10 @@ async def agent_infer(agent_id: str, request: InferRequest):
max_tokens = request.max_tokens or llm_profile.get("max_tokens", 2048) max_tokens = request.max_tokens or llm_profile.get("max_tokens", 2048)
temperature = request.temperature or llm_profile.get("temperature", 0.2) temperature = request.temperature or llm_profile.get("temperature", 0.2)
cloud_provider_names = {"deepseek", "mistral", "grok", "openai", "anthropic"}
allow_cloud = provider in cloud_provider_names
if not allow_cloud:
logger.info(f"☁️ Cloud providers disabled for agent {agent_id}: provider={provider}")
# Define cloud providers with fallback order # Define cloud providers with fallback order
cloud_providers = [ cloud_providers = [
{ {
@@ -905,7 +1557,10 @@ async def agent_infer(agent_id: str, request: InferRequest):
"timeout": 60 "timeout": 60
} }
] ]
if not allow_cloud:
cloud_providers = []
# If specific provider requested, try it first # If specific provider requested, try it first
if provider in ["deepseek", "mistral", "grok"]: if provider in ["deepseek", "mistral", "grok"]:
# Reorder to put requested provider first # Reorder to put requested provider first
@@ -916,7 +1571,7 @@ async def agent_infer(agent_id: str, request: InferRequest):
        # Get tool definitions if Tool Manager is available
        tools_payload = None
        if TOOL_MANAGER_AVAILABLE and tool_manager:
            tools_payload = tool_manager.get_tool_definitions(request_agent_id)
            logger.debug(f"🔧 {len(tools_payload)} tools available for function calling")

        for cloud in cloud_providers:
@@ -1034,14 +1689,23 @@ async def agent_infer(agent_id: str, request: InferRequest):
                                    except:
                                        tool_args = {}
                                    result = await tool_manager.execute_tool(
                                        tool_name,
                                        tool_args,
                                        agent_id=request_agent_id,
                                        chat_id=chat_id,
                                        user_id=user_id,
                                    )
                                    tool_result_dict = {
                                        "tool_call_id": tc.get("id", ""),
                                        "name": tool_name,
                                        "success": result.success,
                                        "result": result.result,
                                        "error": result.error,
                                        "image_base64": result.image_base64,  # Store image if generated
                                        "file_base64": result.file_base64,
                                        "file_name": result.file_name,
                                        "file_mime": result.file_mime,
                                    }
                                    if result.image_base64:
                                        logger.info(f"🖼️ Tool {tool_name} generated image: {len(result.image_base64)} chars")
@@ -1149,14 +1813,22 @@ async def agent_infer(agent_id: str, request: InferRequest):
                            # Check if any tool generated an image
                            generated_image = None
                            generated_file_base64 = None
                            generated_file_name = None
                            generated_file_mime = None
                            logger.debug(f"🔍 Checking {len(tool_results)} tool results for images...")
                            for tr in tool_results:
                                img_b64 = tr.get("image_base64")
                                if img_b64:
                                    generated_image = img_b64
                                    logger.info(f"🖼️ Image generated by tool: {tr['name']} ({len(img_b64)} chars)")
                                file_b64 = tr.get("file_base64")
                                if file_b64 and not generated_file_base64:
                                    generated_file_base64 = file_b64
                                    generated_file_name = tr.get("file_name")
                                    generated_file_mime = tr.get("file_mime")
                                    logger.info(f"📄 File generated by tool: {tr['name']} ({len(file_b64)} chars)")
                                if not img_b64:
                                    logger.debug(f" Tool {tr['name']}: no image_base64")

                            logger.info(f"{cloud['name'].upper()} response received, {tokens_used} tokens")
@@ -1179,7 +1851,10 @@ async def agent_infer(agent_id: str, request: InferRequest):
                                model=cloud["model"],
                                tokens_used=tokens_used,
                                backend=f"{cloud['name']}-cloud",
                                image_base64=generated_image,
                                file_base64=generated_file_base64,
                                file_name=generated_file_name,
                                file_mime=generated_file_mime,
                            )
                        else:
                            logger.warning(f"⚠️ {cloud['name'].upper()} returned empty response, trying next provider")
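The `file_base64`/`file_name`/`file_mime` fields added above are what the gateway consumes for document delivery. A minimal sketch of how a gateway-side consumer might materialize such a payload into a local file before sending it as a document; the helper name and temp-dir layout are assumptions for illustration, not the actual gateway code:

```python
import base64
import os
import tempfile


def materialize_tool_file(file_base64: str, file_name: str) -> str:
    """Decode a base64 tool payload into a temp file and return its path."""
    out_dir = tempfile.mkdtemp(prefix="dagi-files-")
    # basename() strips any directory components a tool might have embedded
    path = os.path.join(out_dir, os.path.basename(file_name))
    with open(path, "wb") as fh:
        fh.write(base64.b64decode(file_base64))
    return path
```

The gateway would then hand this path (or the raw bytes) to its messaging client and delete the temp directory afterwards.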
@@ -1253,7 +1928,38 @@ async def agent_infer(agent_id: str, request: InferRequest):
            if generate_resp.status_code == 200:
                data = generate_resp.json()
                local_response = _normalize_text_response(data.get("response", ""))

                # Empty-answer gate for selected local top-level agents.
                if request_agent_id in EMPTY_ANSWER_GUARD_AGENTS and _needs_empty_answer_recovery(local_response):
                    logger.warning(f"⚠️ Empty-answer gate triggered for {request_agent_id}, retrying local generate once")
                    retry_prompt = (
                        f"{request.prompt}\n\n"
                        # UA: "Answer briefly and concretely (2-5 sentences), without service or meta phrases."
                        "Відповідай коротко і конкретно (2-5 речень), без службових або мета-фраз."
                    )
                    retry_resp = await http_client.post(
                        f"{SWAPPER_URL}/generate",
                        json={
                            "model": local_model,
                            "prompt": retry_prompt,
                            "system": system_prompt,
                            "max_tokens": request.max_tokens,
                            "temperature": request.temperature,
                            "stream": False
                        },
                        timeout=300.0
                    )
                    if retry_resp.status_code == 200:
                        retry_data = retry_resp.json()
                        retry_text = _normalize_text_response(retry_data.get("response", ""))
                        if retry_text and not _needs_empty_answer_recovery(retry_text):
                            local_response = retry_text
                    if _needs_empty_answer_recovery(local_response):
                        # UA fallback: "I did not get a useful answer on the first try.
                        # Rephrase the request briefly and I will answer concretely."
                        local_response = (
                            "Я не отримав корисну відповідь з першої спроби. "
                            "Сформулюй запит коротко ще раз, і я відповім конкретно."
                        )

                # Store in agent-specific memory
                if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id and local_response:
@@ -1649,4 +2355,3 @@ async def shutdown_event():
    if nc:
        await nc.close()
        logger.info("🔌 NATS connection closed")
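`_needs_empty_answer_recovery` is defined elsewhere in `main.py`; a plausible shape for that check, shown here only to document the gate's intent — the meta-phrase patterns are illustrative assumptions, not the deployed list:

```python
import re

# Illustrative meta-phrase patterns; the real list lives in main.py.
_META_PATTERNS = [
    re.compile(r"as an ai (language )?model", re.IGNORECASE),
    re.compile(r"^(ok(ay)?|sure)\.?$", re.IGNORECASE),
]


def needs_empty_answer_recovery(text: str) -> bool:
    """Return True when a local response is empty, whitespace, or pure meta-talk."""
    if not text or not text.strip():
        return True
    return any(p.search(text.strip()) for p in _META_PATTERNS)
```

The gate then retries generation once and, failing that, substitutes the fixed fallback message shown in the diff above.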
`services/router/requirements.txt`:
@@ -5,6 +5,9 @@ nats-py==2.6.0
PyYAML==6.0.1
httpx>=0.25.0
neo4j>=5.14.0
openpyxl>=3.1.2
python-docx>=1.1.2
pypdf>=5.1.0
# Memory Retrieval v3.0
asyncpg>=0.29.0
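The three new dependencies back the universal `file_tool`'s per-format extractors: `pypdf` for PDF, `python-docx` for DOCX, `openpyxl` for XLSX. A stdlib-only sketch of the extension dispatch this implies — the registry and function names are assumptions, not the actual `tool_manager.py` code:

```python
import os

# Assumed mapping from file extension to the library handling that format.
EXTRACTOR_LIBS = {
    ".pdf": "pypdf",
    ".docx": "python-docx",
    ".xlsx": "openpyxl",
}


def pick_extractor(file_name: str) -> str:
    """Return the library expected to handle this file, or 'plain-text'."""
    ext = os.path.splitext(file_name.lower())[1]
    return EXTRACTOR_LIBS.get(ext, "plain-text")
```

Unknown extensions fall through to a plain-text read, so the tool degrades gracefully instead of rejecting the file.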