From d42bb09912daf0892e8b749c4d85570ae4527293 Mon Sep 17 00:00:00 2001 From: Apple Date: Wed, 18 Feb 2026 09:36:16 -0800 Subject: [PATCH] helion: stabilize doc context, remove legacy webhook path, add stack smoke canary --- gateway-bot/http_api.py | 381 +--------------------------- gateway-bot/services/doc_service.py | 27 +- ops/canary_all.sh | 1 + ops/smoke_helion_stack.sh | 89 +++++++ 4 files changed, 111 insertions(+), 387 deletions(-) create mode 100755 ops/smoke_helion_stack.sh diff --git a/gateway-bot/http_api.py b/gateway-bot/http_api.py index e7f0b4e2..aed587dd 100644 --- a/gateway-bot/http_api.py +++ b/gateway-bot/http_api.py @@ -3620,384 +3620,11 @@ async def nutra_telegram_webhook(update: TelegramUpdate): raise HTTPException(status_code=500, detail=str(e)) -# Legacy code - will be removed after testing +# Legacy handler was removed. +# Keep a tiny sentinel for imports/tests that may still reference it. async def _old_helion_telegram_webhook(update: TelegramUpdate): - """Стара версія - використовується для тестування""" - try: - if not update.message: - raise HTTPException(status_code=400, detail="No message in update") - - # Extract message details - from_user = update.message.get("from", {}) - chat = update.message.get("chat", {}) - - user_id = str(from_user.get("id", "unknown")) - chat_id = str(chat.get("id", "unknown")) - username = from_user.get("username", "") - - # Get DAO ID for this chat (Energy Union specific) - dao_id = get_dao_id(chat_id, "telegram", agent_id=agent_config.agent_id) - - # Check for /ingest command - text = update.message.get("text", "") - if text and text.strip().startswith("/ingest"): - session_id = f"telegram:{chat_id}" - - # Check if there's a document in the message - document = update.message.get("document") - if document: - mime_type = document.get("mime_type", "") - file_name = document.get("file_name", "") - file_id = document.get("file_id") - - is_pdf = ( - mime_type == "application/pdf" or - (mime_type.startswith("application/") and file_name.lower().endswith(".pdf")) - ) - - if is_pdf and file_id: - try: - helion_token = os.getenv("HELION_TELEGRAM_BOT_TOKEN") - file_path = await get_telegram_file_path(file_id) - if file_path: - file_url = f"https://api.telegram.org/file/bot{helion_token}/{file_path}" - result = await ingest_document( - session_id=session_id, - doc_url=file_url, - file_name=file_name, - dao_id=dao_id, - user_id=f"tg:{user_id}" - ) - - if result.success: - await send_telegram_message( - chat_id, - f"✅ **Документ імпортовано у RAG**\n\n" - f"📊 Фрагментів: {result.ingested_chunks}\n" - f"📁 DAO: {dao_id}\n\n" - f"Тепер ти можеш задавати питання по цьому документу!", - helion_token - ) - return {"ok": True, "chunks_count": result.ingested_chunks} - else: - await send_telegram_message(chat_id, f"Вибач, не вдалося імпортувати: {result.error}", helion_token) - return {"ok": False, "error": result.error} - except Exception as e: - logger.error(f"Helion: Ingest failed: {e}", exc_info=True) - await send_telegram_message(chat_id, "Вибач, не вдалося імпортувати документ.", helion_token) - return {"ok": False, "error": "Ingest failed"} - - # Try to get last parsed doc_id from session context - helion_token = os.getenv("HELION_TELEGRAM_BOT_TOKEN") - result = await ingest_document( - session_id=session_id, - dao_id=dao_id, - user_id=f"tg:{user_id}" - ) - - if result.success: - await send_telegram_message( - chat_id, - f"✅ **Документ імпортовано у RAG**\n\n" - f"📊 Фрагментів: {result.ingested_chunks}\n" - f"📁 DAO: {dao_id}\n\n" - f"Тепер ти можеш задавати питання по цьому документу!", - helion_token - ) - return {"ok": True, "chunks_count": result.ingested_chunks} - else: - await send_telegram_message(chat_id, "Спочатку надішли PDF-документ, а потім використай /ingest", helion_token) - return {"ok": False, "error": result.error} - - # Check if it's a document (PDF) - document = update.message.get("document") - if document: - mime_type = document.get("mime_type", "") - file_name = document.get("file_name", "") - file_id = document.get("file_id") - - is_pdf = ( - mime_type == "application/pdf" or - (mime_type.startswith("application/") and file_name.lower().endswith(".pdf")) - ) - - if is_pdf and file_id: - logger.info(f"Helion: PDF document from {username} (tg:{user_id}), file_id: {file_id}, file_name: {file_name}") - - try: - helion_token = os.getenv("HELION_TELEGRAM_BOT_TOKEN") - file_path = await get_telegram_file_path(file_id) - if not file_path: - raise HTTPException(status_code=400, detail="Failed to get file from Telegram") - - file_url = f"https://api.telegram.org/file/bot{helion_token}/{file_path}" - - session_id = f"telegram:{chat_id}" - result = await parse_document( - session_id=session_id, - doc_url=file_url, - file_name=file_name, - dao_id=dao_id, - user_id=f"tg:{user_id}", - output_mode="qa_pairs", - metadata={"username": username, "chat_id": chat_id} - ) - - if not result.success: - await send_telegram_message(chat_id, f"Вибач, не вдалося обробити документ: {result.error}", helion_token) - return {"ok": False, "error": result.error} - - # Format response for Telegram - answer_text = "" - if result.qa_pairs: - qa_list = [{"question": qa.question, "answer": qa.answer} for qa in result.qa_pairs] - answer_text = format_qa_response(qa_list) - elif result.markdown: - answer_text = format_markdown_response(result.markdown) - elif result.chunks_meta and result.chunks_meta.get("chunks"): - chunks = result.chunks_meta.get("chunks", []) - answer_text = format_chunks_response(chunks) - else: - answer_text = "✅ Документ успішно оброблено, але формат відповіді не розпізнано." - - if not answer_text.endswith("_"): - answer_text += "\n\n💡 _Використай /ingest для імпорту документа у RAG_" - - logger.info(f"Helion: PDF parsing result: {len(answer_text)} chars, doc_id={result.doc_id}") - await send_telegram_message(chat_id, answer_text, helion_token) - return {"ok": True, "agent": "parser", "mode": "doc_parse", "doc_id": result.doc_id} - - except Exception as e: - logger.error(f"Helion: PDF processing failed: {e}", exc_info=True) - await send_telegram_message(chat_id, "Вибач, не вдалося обробити PDF-документ. Переконайся, що файл не пошкоджений.", helion_token) - return {"ok": False, "error": "PDF processing failed"} - elif document and not is_pdf: - helion_token = os.getenv("HELION_TELEGRAM_BOT_TOKEN") - await send_telegram_message(chat_id, "Наразі підтримуються тільки PDF-документи. Інші формати (docx, zip, тощо) будуть додані пізніше.", helion_token) - return {"ok": False, "error": "Unsupported document type"} - - # Check if it's a photo - photo = update.message.get("photo") - if photo: - # Telegram sends multiple sizes, get the largest one (last in array) - photo_obj = photo[-1] if isinstance(photo, list) else photo - file_id = photo_obj.get("file_id") if isinstance(photo_obj, dict) else None - - if file_id: - logger.info(f"Helion: Photo from {username} (tg:{user_id}), file_id: {file_id}") - - try: - # Get file path from Telegram - helion_token = os.getenv("HELION_TELEGRAM_BOT_TOKEN") - file_path = await get_telegram_file_path(file_id, helion_token) - if not file_path: - raise HTTPException(status_code=400, detail="Failed to get file from Telegram") - - # Build file URL - file_url = f"https://api.telegram.org/file/bot{helion_token}/{file_path}" - - # Send to Router with specialist_vision_8b model (Swapper) - # IMPORTANT: Request BRIEF description (2-3 sentences per v2.3 prompt rules) - router_request = { - "message": f"Коротко (2-3 речення максимум): що на цьому зображенні та яке його значення для Energy Union? {file_url}", - "mode": "chat", - "agent": "helion", - "metadata": { - "source": "telegram", - "dao_id": dao_id, - "user_id": f"tg:{user_id}", - "session_id": f"tg:{chat_id}:{dao_id}", - "username": username, - "chat_id": chat_id, - "file_id": file_id, - "file_url": file_url, - "has_image": True, - }, - "context": { - "agent_name": HELION_NAME, - "system_prompt": HELION_SYSTEM_PROMPT, - }, - } - - # Override LLM to use specialist_vision_8b for image understanding - router_request["metadata"]["use_llm"] = "specialist_vision_8b" - - # Send to Router - logger.info(f"Helion: Sending photo to Router with vision-8b: file_url={file_url[:50]}...") - response = await send_to_router(router_request) - - # Extract response - if isinstance(response, dict) and response.get("ok"): - answer_text = response.get("data", {}).get("text") or response.get("response", "") - - if answer_text: - # Photo processed - send LLM response directly WITHOUT prefix - await send_telegram_message( - chat_id, - answer_text, # No prefix, just the LLM response - helion_token - ) - - # Save to memory for context - await memory_client.save_chat_turn( - agent_id="helion", - team_id=dao_id, - user_id=f"tg:{user_id}", - message=f"[Photo: {file_id}]", - response=answer_text, - channel_id=chat_id, - scope="short_term", - save_agent_response=not is_service_response(answer_text), - agent_metadata={"context": "photo"}, - username=username, - ) - - return {"ok": True, "agent": "helion", "model": "specialist_vision_8b"} - else: - await send_telegram_message(chat_id, "Не вдалося отримати опис зображення.", helion_token) - return {"ok": False, "error": "No description in response"} - else: - error_msg = response.get("error", "Unknown error") if isinstance(response, dict) else "Router error" - logger.error(f"Helion: Vision-8b error: {error_msg}") - await send_telegram_message(chat_id, "Вибач, сталася помилка при обробці фото.", helion_token) - return {"ok": False, "error": error_msg} - - except Exception as e: - logger.error(f"Helion: Photo processing failed: {e}", exc_info=True) - helion_token = os.getenv("HELION_TELEGRAM_BOT_TOKEN") - await send_telegram_message(chat_id, "Вибач, сталася помилка при обробці фото.", helion_token) - return {"ok": False, "error": "Photo processing failed"} - - # Get message text - text = update.message.get("text", "") - if not text: - raise HTTPException(status_code=400, detail="No text in message") - - logger.info(f"Helion Telegram message from {username} (tg:{user_id}) in chat {chat_id}: {text[:50]}") - mentioned_bots = extract_bot_mentions(text) - needs_complex_reasoning = requires_complex_reasoning(text) - - # Check if there's a document context for follow-up questions - session_id = f"telegram:{chat_id}" - doc_context = await get_doc_context(session_id) - - # If there's a doc_id and the message looks like a question about the document - if doc_context and doc_context.doc_id: - # Check if it's a question (simple heuristic: contains question words or ends with ?) - is_question = ( - "?" in text or - any(word in text.lower() for word in ["що", "як", "чому", "коли", "де", "хто", "чи"]) - ) - - if is_question: - logger.info(f"Helion: Follow-up question detected for doc_id={doc_context.doc_id}") - # Try RAG query first - rag_result = await ask_about_document( - session_id=session_id, - question=text, - doc_id=doc_context.doc_id, - dao_id=dao_id or doc_context.dao_id, - user_id=f"tg:{user_id}" - ) - - if rag_result.success and rag_result.answer: - # Truncate if too long for Telegram - answer = rag_result.answer - if len(answer) > TELEGRAM_SAFE_LENGTH: - answer = answer[:TELEGRAM_SAFE_LENGTH] + "\n\n_... (відповідь обрізано)_" - - helion_token = os.getenv("HELION_TELEGRAM_BOT_TOKEN") - await send_telegram_message(chat_id, answer, helion_token) - return {"ok": True, "agent": "parser", "mode": "rag_query"} - # Fall through to regular chat if RAG query fails - - # Regular chat mode - # Fetch memory context (includes local context as fallback) - # All agents use limit=80 for full conversation history - memory_context = await memory_client.get_context( - user_id=f"tg:{user_id}", - agent_id="helion", - team_id=dao_id, - channel_id=chat_id, - limit=80 - ) - - # Build message with conversation context - local_history = memory_context.get("local_context_text", "") - - # Check if this is a training group - is_training_group = str(chat_id) in TRAINING_GROUP_IDS - training_prefix = "" - if is_training_group: - training_prefix = "[РЕЖИМ НАВЧАННЯ - відповідай на це повідомлення, ти в навчальній групі Agent Preschool]\n\n" - - if local_history: - # Add conversation history to message for better context understanding - message_with_context = f"{training_prefix}[Контекст розмови]\n{local_history}\n\n[Поточне повідомлення від {username}]\n{text}" - else: - message_with_context = f"{training_prefix}{text}" - - # Build request to Router with Helion context - router_request = { - "message": message_with_context, - "mode": "chat", - "agent": "helion", # Helion agent identifier - "metadata": { - "source": "telegram", - "dao_id": dao_id, - "user_id": f"tg:{user_id}", - "session_id": f"tg:{chat_id}:{dao_id}", - "username": username, - "chat_id": chat_id, - "mentioned_bots": mentioned_bots, - "requires_complex_reasoning": needs_complex_reasoning, - }, - "context": { - "agent_name": HELION_NAME, - "system_prompt": HELION_SYSTEM_PROMPT, - "memory": memory_context, - # RBAC context will be injected by Router - }, - } - - # Send to Router - logger.info(f"Sending to Router: agent=helion, dao={dao_id}, user=tg:{user_id}") - response = await send_to_router(router_request) - - # Extract response text - if isinstance(response, dict): - answer_text = response.get("data", {}).get("text") or response.get("response", "Вибач, я зараз не можу відповісти.") - else: - answer_text = "Вибач, сталася помилка." - - logger.info(f"Router response: {answer_text[:100]}") - - # Save chat turn to memory - await memory_client.save_chat_turn( - agent_id="helion", - team_id=dao_id, - user_id=f"tg:{user_id}", - message=text, - response=answer_text, - channel_id=chat_id, - scope="short_term", - save_agent_response=not is_service_response(answer_text), - agent_metadata={ - "context": "helion", - "mentioned_bots": mentioned_bots, - "requires_complex_reasoning": needs_complex_reasoning, - }, - username=username, - ) - - # Send response back to Telegram - await send_telegram_message(chat_id, answer_text, os.getenv("HELION_TELEGRAM_BOT_TOKEN")) - - return {"ok": True, "agent": "helion"} - - except Exception as e: - logger.error(f"Error handling Helion Telegram webhook: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=str(e)) + logger.warning("Deprecated handler _old_helion_telegram_webhook invoked; redirecting to unified handler") + return await handle_telegram_webhook(HELION_CONFIG, update) @router.get("/health") diff --git a/gateway-bot/services/doc_service.py b/gateway-bot/services/doc_service.py index 10303f33..3ae10754 100644 --- a/gateway-bot/services/doc_service.py +++ b/gateway-bot/services/doc_service.py @@ -86,7 +86,8 @@ class DocumentService: doc_id: str, doc_url: Optional[str] = None, file_name: Optional[str] = None, - dao_id: Optional[str] = None + dao_id: Optional[str] = None, + user_id: Optional[str] = None, ) -> bool: """ Save document context for a session. @@ -104,10 +105,10 @@ class DocumentService: True if saved successfully """ try: - # Extract user_id from session_id if possible + # Extract fallback user_id from session_id if not provided. # Format: "channel:identifier" or "channel:user_id" parts = session_id.split(":", 1) - user_id = parts[1] if len(parts) > 1 else session_id + fact_user_id = user_id or (parts[1] if len(parts) > 1 else session_id) # Save as fact in Memory Service fact_key = f"doc_context:{session_id}" @@ -116,14 +117,17 @@ class DocumentService: "doc_url": doc_url, "file_name": file_name, "dao_id": dao_id, + "user_id": user_id, "saved_at": datetime.utcnow().isoformat() } result = await self.memory_client.upsert_fact( - user_id=user_id, + user_id=fact_user_id, fact_key=fact_key, fact_value_json=fact_value_json, - team_id=dao_id + # Keep doc context globally addressable for follow-up calls + # that may not include dao_id/team_id in retrieval. + team_id=None, ) logger.info(f"Saved doc context for session {session_id}: doc_id={doc_id}") @@ -260,7 +264,8 @@ class DocumentService: doc_id=doc_id, doc_url=doc_url, file_name=file_name, - dao_id=dao_id + dao_id=dao_id, + user_id=user_id, ) # Convert text to markdown format @@ -312,7 +317,8 @@ class DocumentService: doc_id=doc_id, doc_url=doc_url, file_name=file_name, - dao_id=dao_id + dao_id=dao_id, + user_id=user_id, ) return ParsedResult( @@ -599,7 +605,8 @@ async def save_doc_context( doc_id: str, doc_url: Optional[str] = None, file_name: Optional[str] = None, - dao_id: Optional[str] = None + dao_id: Optional[str] = None, + user_id: Optional[str] = None, ) -> bool: """Save document context for a session""" return await doc_service.save_doc_context( @@ -607,11 +614,11 @@ async def save_doc_context( doc_id=doc_id, doc_url=doc_url, file_name=file_name, - dao_id=dao_id + dao_id=dao_id, + user_id=user_id, ) async def get_doc_context(session_id: str) -> Optional[DocContext]: """Get document context for a session""" return await doc_service.get_doc_context(session_id) - diff --git a/ops/canary_all.sh b/ops/canary_all.sh index 868d5ac5..e9b9389d 100755 --- a/ops/canary_all.sh +++ b/ops/canary_all.sh @@ -14,5 +14,6 @@ run() { run "gateway_delivery_priority" "$ROOT/ops/canary_gateway_delivery_priority.sh" run "router_contract" "$ROOT/ops/canary_router_contract.sh" run "daarwizz_awareness" "$ROOT/ops/check_daarwizz_awareness.sh" +run "helion_stack_smoke" "$ROOT/ops/smoke_helion_stack.sh" echo "[OK] all canaries passed" diff --git a/ops/smoke_helion_stack.sh b/ops/smoke_helion_stack.sh new file mode 100755 index 00000000..ff3d9fcf --- /dev/null +++ b/ops/smoke_helion_stack.sh @@ -0,0 +1,89 @@ +#!/usr/bin/env bash +set -euo pipefail + +GATEWAY_URL="${GATEWAY_URL:-http://localhost:9300}" +ROUTER_URL="${ROUTER_URL:-http://localhost:9102}" +MEMORY_URL="${MEMORY_URL:-http://localhost:8000}" +SWAPPER_URL="${SWAPPER_URL:-http://localhost:8890}" + +SMOKE_USER="${SMOKE_USER:-smoke-helion-user}" +SMOKE_CHAT="${SMOKE_CHAT:-smoke-helion-chat}" + +note() { echo "[INFO] $*"; } +ok() { echo "[OK] $*"; } +fail() { echo "[FAIL] $*" >&2; exit 1; } + +need_cmd() { + command -v "$1" >/dev/null 2>&1 || fail "missing command: $1" +} + +need_cmd curl +need_cmd python3 + +note "Health checks" +curl -fsS -m 10 "$GATEWAY_URL/health" >/dev/null || fail "gateway health failed" +curl -fsS -m 10 "$ROUTER_URL/health" >/dev/null || fail "router health failed" +curl -fsS -m 10 "$MEMORY_URL/health" >/dev/null || fail "memory health failed" +curl -fsS -m 10 "$SWAPPER_URL/health" >/dev/null || fail "swapper health failed" +ok "core services healthy" + +note "Helion text infer (deepseek-first path)" +TEXT_RESP="$(curl -fsS -m 45 -H 'Content-Type: application/json' \ + -d "{\"prompt\":\"Що таке тепловий насос? Коротко.\",\"agent\":\"helion\",\"metadata\":{\"source\":\"ops-smoke\",\"user_id\":\"$SMOKE_USER\",\"chat_id\":\"$SMOKE_CHAT\"}}" \ + "$ROUTER_URL/v1/agents/helion/infer")" + +python3 - "$TEXT_RESP" <<'PY' || fail "text infer validation failed" +import json, sys +data = json.loads(sys.argv[1]) +backend = str(data.get("backend", "")) +resp = str(data.get("response", "")) +if not backend: + raise SystemExit("backend missing") +if backend == "crewai": + raise SystemExit("unexpected crewai backend for short text") +if not resp: + raise SystemExit("empty response") +print(f"[INFO] text backend={backend}") +PY +ok "text infer passed" + +note "Memory write/read smoke" +curl -fsS -m 15 -H "Authorization: Bearer $SMOKE_USER" -H 'Content-Type: application/json' \ + -d "{\"agent_id\":\"helion\",\"team_id\":\"helion-dao\",\"channel_id\":\"$SMOKE_CHAT\",\"user_id\":\"$SMOKE_USER\",\"scope\":\"short_term\",\"kind\":\"message\",\"body_text\":\"smoke memory user turn\",\"body_json\":{\"type\":\"user_message\",\"source\":\"ops-smoke\"}}" \ + "$MEMORY_URL/agents/helion/memory" >/dev/null + +curl -fsS -m 15 -H "Authorization: Bearer $SMOKE_USER" -H 'Content-Type: application/json' \ + -d "{\"agent_id\":\"helion\",\"team_id\":\"helion-dao\",\"channel_id\":\"$SMOKE_CHAT\",\"user_id\":\"$SMOKE_USER\",\"scope\":\"short_term\",\"kind\":\"message\",\"body_text\":\"smoke memory assistant turn\",\"body_json\":{\"type\":\"agent_response\",\"source\":\"ops-smoke\"}}" \ + "$MEMORY_URL/agents/helion/memory" >/dev/null + +MEM_RESP="$(curl -fsS -m 15 -H "Authorization: Bearer $SMOKE_USER" \ + "$MEMORY_URL/agents/helion/memory?user_id=$SMOKE_USER&channel_id=$SMOKE_CHAT&limit=10")" + +python3 - "$MEM_RESP" <<'PY' || fail "memory read validation failed" +import json, sys +data = json.loads(sys.argv[1]) +events = data.get("events", []) +joined = "\n".join(str(e.get("content", "")) for e in events) +if "smoke memory user turn" not in joined or "smoke memory assistant turn" not in joined: + raise SystemExit("smoke events not found in memory") +print(f"[INFO] memory events={len(events)}") +PY +ok "memory smoke passed" + +note "Helion vision infer path" +PNG_B64='iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO5X7h8AAAAASUVORK5CYII=' +VISION_RESP="$(curl -fsS -m 90 -H 'Content-Type: application/json' \ + -d "{\"prompt\":\"Що зображено? Відповідай коротко.\",\"agent\":\"helion\",\"images\":[\"data:image/png;base64,$PNG_B64\"],\"metadata\":{\"source\":\"ops-smoke\",\"user_id\":\"$SMOKE_USER\",\"chat_id\":\"$SMOKE_CHAT\"}}" \ + "$ROUTER_URL/v1/agents/helion/infer")" + +python3 - "$VISION_RESP" <<'PY' || fail "vision infer validation failed" +import json, sys +data = json.loads(sys.argv[1]) +backend = str(data.get("backend", "")) +if not backend.startswith("swapper-vision"): + raise SystemExit(f"unexpected vision backend: {backend}") +print(f"[INFO] vision backend={backend}") +PY +ok "vision smoke passed" + +echo "[OK] helion stack smoke passed"