diff --git a/crews/agromatrix_crew/operator_commands.py b/crews/agromatrix_crew/operator_commands.py index 8015539..194a5c5 100644 --- a/crews/agromatrix_crew/operator_commands.py +++ b/crews/agromatrix_crew/operator_commands.py @@ -1,3 +1,13 @@ +""" +Operator commands for AgroMatrix (Stepan). Access control and slash commands. + +Access control (env, used by gateway and here): +- AGX_OPERATOR_IDS: comma-separated Telegram user_id list; only these users are operators. +- AGX_OPERATOR_CHAT_ID: optional; if set, operator actions allowed only in this chat_id. + +When is_operator(user_id, chat_id) is True, gateway routes any message (not only slash) +to Stepan for human-friendly operator interaction. +""" import os import re import shlex diff --git a/docker-compose.node1.yml b/docker-compose.node1.yml index ca8c80a..662815f 100644 --- a/docker-compose.node1.yml +++ b/docker-compose.node1.yml @@ -191,8 +191,16 @@ services: - STT_SERVICE_UPLOAD_URL=http://swapper-service:8890/stt - OCR_SERVICE_URL=http://swapper-service:8890 - WEB_SEARCH_SERVICE_URL=http://swapper-service:8890 + # Stepan (AgroMatrix) in-process + - PYTHONPATH=/app:/app/packages/agromatrix-tools + - AGX_REPO_ROOT=/app + - AGX_STEPAN_MODE=${AGX_STEPAN_MODE:-inproc} + - AGX_OPERATOR_IDS=${AGX_OPERATOR_IDS:-} + - AGX_OPERATOR_CHAT_ID=${AGX_OPERATOR_CHAT_ID:-} volumes: - ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro + - ${DEPLOY_ROOT:-.}/crews:/app/crews:ro + - ${DEPLOY_ROOT:-.}/packages/agromatrix-tools:/app/packages/agromatrix-tools:ro - ${DEPLOY_ROOT:-.}/logs:/app/logs depends_on: - router diff --git a/gateway-bot/app.py b/gateway-bot/app.py index 0653724..97f7e3c 100644 --- a/gateway-bot/app.py +++ b/gateway-bot/app.py @@ -2,16 +2,23 @@ FastAPI app instance for Gateway Bot """ import logging +import os +import sys +from pathlib import Path + from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from http_api import router as gateway_router from http_api_doc import router as 
doc_router +import gateway_boot + logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" ) +logger = logging.getLogger(__name__) app = FastAPI( title="Bot Gateway with DAARWIZZ", @@ -32,6 +39,30 @@ app.add_middleware( app.include_router(gateway_router, prefix="", tags=["gateway"]) app.include_router(doc_router, prefix="", tags=["docs"]) + +@app.on_event("startup") +async def startup_stepan_check(): + """Check crews + agromatrix_tools availability. Do not crash gateway if missing.""" + repo_root = os.getenv("AGX_REPO_ROOT", "/opt/microdao-daarion").strip() + if repo_root and repo_root not in sys.path: + sys.path.insert(0, repo_root) + tools_path = str(Path(repo_root) / "packages" / "agromatrix-tools") + if tools_path not in sys.path: + sys.path.insert(0, tools_path) + try: + import crews.agromatrix_crew.run # noqa: F401 + import agromatrix_tools # noqa: F401 + gateway_boot.STEPAN_IMPORTS_OK = True + logger.info("Stepan inproc: crews + agromatrix_tools OK; STEPAN_IMPORTS_OK=True") + except Exception as e: + logger.error( + "Stepan disabled: crews or agromatrix_tools not available: %s. " + "Set AGX_REPO_ROOT, mount crews and packages/agromatrix-tools.", + e, + ) + gateway_boot.STEPAN_IMPORTS_OK = False + + @app.get("/") async def root(): return { diff --git a/gateway-bot/gateway_boot.py b/gateway-bot/gateway_boot.py new file mode 100644 index 0000000..05daab2 --- /dev/null +++ b/gateway-bot/gateway_boot.py @@ -0,0 +1,4 @@ +""" +Boot-time state for Gateway. Set by app startup; read by http_api. 
+""" +STEPAN_IMPORTS_OK = False diff --git a/gateway-bot/http_api.py b/gateway-bot/http_api.py index 8bb526d..942f422 100644 --- a/gateway-bot/http_api.py +++ b/gateway-bot/http_api.py @@ -44,6 +44,7 @@ from behavior_policy import ( get_ack_text, is_prober_request, has_agent_chat_participation, + has_recent_interaction, NO_OUTPUT, BehaviorDecision, AGENT_NAME_VARIANTS, @@ -51,6 +52,16 @@ from behavior_policy import ( logger = logging.getLogger(__name__) + +def _safe_has_recent_interaction(agent_id: str, chat_id: str, user_id: str) -> bool: + """Guard: avoid 500 if has_recent_interaction is missing or raises. Returns False on any error.""" + try: + return bool(has_recent_interaction(agent_id, str(chat_id), str(user_id))) + except Exception as e: + logger.warning("has_recent_interaction failed, treating as False: %s", e) + return False + + # Telegram message length limits TELEGRAM_MAX_MESSAGE_LENGTH = 4096 TELEGRAM_SAFE_LENGTH = 3500 # Leave room for formatting @@ -992,6 +1003,18 @@ async def druid_telegram_webhook(update: TelegramUpdate): # AGROMATRIX webhook endpoint +# AGX_STEPAN_MODE: inproc = run Crew in-process (default; unrecognized values also fall back to inproc); http = reserved for crewai-service (9010) but not implemented yet: the handler replies with an error message and returns.
+_STEPAN_MODE = None + +def _get_stepan_mode() -> str: + global _STEPAN_MODE + if _STEPAN_MODE is None: + _STEPAN_MODE = (os.getenv("AGX_STEPAN_MODE", "inproc") or "inproc").strip().lower() + if _STEPAN_MODE not in ("inproc", "http"): + _STEPAN_MODE = "inproc" + logger.info("Stepan mode=%s (AGX_STEPAN_MODE)", _STEPAN_MODE) + return _STEPAN_MODE + async def handle_stepan_message(update: TelegramUpdate, agent_config: AgentConfig) -> Dict[str, Any]: update_id = getattr(update, 'update_id', None) or update.update_id @@ -1022,10 +1045,37 @@ async def handle_stepan_message(update: TelegramUpdate, agent_config: AgentConfi ops_mode = True trace_id = str(uuid.uuid4()) + stepan_mode = _get_stepan_mode() + + if stepan_mode == "http": + logger.warning("Stepan http mode not implemented; use AGX_STEPAN_MODE=inproc.") + bot_token = agent_config.get_telegram_token() + await send_telegram_message( + chat_id, + "Степан у режимі HTTP зараз недоступний. Встановіть AGX_STEPAN_MODE=inproc.", + bot_token=bot_token, + ) + return {"ok": False, "status": "stepan_http_not_implemented"} - # call Stepan directly try: - sys.path.insert(0, str(Path('/opt/microdao-daarion'))) + import gateway_boot + except ImportError: + gateway_boot = type(sys)("gateway_boot") + gateway_boot.STEPAN_IMPORTS_OK = False + if not getattr(gateway_boot, "STEPAN_IMPORTS_OK", False): + logger.warning("Stepan inproc disabled: crews/agromatrix_tools not available at startup") + bot_token = agent_config.get_telegram_token() + await send_telegram_message( + chat_id, + "Степан тимчасово недоступний (не встановлено crews або agromatrix-tools).", + bot_token=bot_token, + ) + return {"ok": False, "status": "stepan_disabled"} + + try: + repo_root = os.getenv("AGX_REPO_ROOT", "/opt/microdao-daarion") + if repo_root and repo_root not in sys.path: + sys.path.insert(0, str(Path(repo_root))) from crews.agromatrix_crew.run import handle_message started = time.time() last_pending = _get_last_pending(chat_id) @@ -1078,35 +1128,14 @@ 
async def agromatrix_telegram_webhook(update: TelegramUpdate): if user_id and user_id in op_ids: is_ops = True - # Operator NL or operator slash commands -> handle via Stepan handler. - # Important: do NOT treat generic slash commands (/start, /agromatrix) as operator commands, - # otherwise regular users will see "Недостатньо прав" or Stepan errors. - operator_slash_cmds = { - "whoami", - "pending", - "pending_show", - "approve", - "reject", - "apply_dict", - "pending_stats", - } - slash_cmd = "" - if is_slash: - try: - slash_cmd = (msg_text.strip().split()[0].lstrip("/").strip().lower()) - except Exception: - slash_cmd = "" - is_operator_slash = bool(slash_cmd) and slash_cmd in operator_slash_cmds - - # Stepan handler currently depends on ChatOpenAI (OPENAI_API_KEY). If key is not configured, - # never route production traffic there (avoid "Помилка обробки..." and webhook 5xx). + # Operator: any message (not only slash) goes to Stepan when is_ops. stepan_enabled = bool(os.getenv("OPENAI_API_KEY", "").strip()) - if stepan_enabled and (is_ops or is_operator_slash): + if stepan_enabled and is_ops: return await handle_stepan_message(update, AGROMATRIX_CONFIG) - if (is_ops or is_operator_slash) and not stepan_enabled: + if is_ops and not stepan_enabled: logger.warning( "Stepan handler disabled (OPENAI_API_KEY missing); falling back to Router pipeline " - f"for chat_id={chat_id}, user_id={user_id}, slash_cmd={slash_cmd!r}" + f"for chat_id={chat_id}, user_id={user_id}" ) # General conversation -> standard Router pipeline (like all other agents) @@ -1911,7 +1940,8 @@ async def process_document( dao_id=dao_id, user_id=f"tg:{user_id}", output_mode="qa_pairs", - metadata={"username": username, "chat_id": chat_id} + metadata={"username": username, "chat_id": chat_id}, + agent_id=agent_config.agent_id, ) if not result.success: @@ -3067,7 +3097,7 @@ async def handle_telegram_webhook( # Check if there's a document context for follow-up questions session_id = 
f"telegram:{chat_id}" - doc_context = await get_doc_context(session_id) + doc_context = await get_doc_context(session_id, agent_id=agent_config.agent_id) # If there's a doc_id and the message looks like a question about the document if doc_context and doc_context.doc_id: @@ -3756,7 +3786,8 @@ async def _old_telegram_webhook(update: TelegramUpdate): dao_id=dao_id, user_id=f"tg:{user_id}", output_mode="qa_pairs", - metadata={"username": username, "chat_id": chat_id} + metadata={"username": username, "chat_id": chat_id}, + agent_id=agent_config.agent_id, ) if not result.success: @@ -3959,7 +3990,7 @@ async def _old_telegram_webhook(update: TelegramUpdate): # Check if there's a document context for follow-up questions session_id = f"telegram:{chat_id}" - doc_context = await get_doc_context(session_id) + doc_context = await get_doc_context(session_id, agent_id=agent_config.agent_id) # If there's a doc_id and the message looks like a question about the document if doc_context and doc_context.doc_id: diff --git a/gateway-bot/services/doc_service.py b/gateway-bot/services/doc_service.py index da5a684..5f8f031 100644 --- a/gateway-bot/services/doc_service.py +++ b/gateway-bot/services/doc_service.py @@ -198,29 +198,20 @@ class DocumentService: file_name: Optional[str] = None, dao_id: Optional[str] = None, user_id: Optional[str] = None, + agent_id: Optional[str] = None, ) -> bool: """ - Save document context for a session. - - Uses Memory Service to persist document context across channels. + Save document context for a session (scoped by agent_id to avoid cross-agent leak). Args: - session_id: Session identifier (e.g., "telegram:123", "web:user456") + session_id: Session identifier doc_id: Document ID from parser - doc_url: Optional document URL - file_name: Optional file name - dao_id: Optional DAO ID - - Returns: - True if saved successfully + agent_id: Optional; if set, context is isolated per agent (key: doc_context:{agent_id}:{session_id}). 
""" try: - # Use stable synthetic user key per session, so context can be - # retrieved later using only session_id (without caller user_id). - fact_user_id = f"session:{session_id}" - - # Save as fact in Memory Service - fact_key = f"doc_context:{session_id}" + aid = (agent_id or "default").lower() + fact_user_id = f"session:{aid}:{session_id}" + fact_key = f"doc_context:{aid}:{session_id}" fact_value_json = { "doc_id": doc_id, "doc_url": doc_url, @@ -239,36 +230,28 @@ class DocumentService: team_id=None, ) - logger.info(f"Saved doc context for session {session_id}: doc_id={doc_id}") + logger.info(f"Saved doc context for session {session_id} agent={aid}: doc_id={doc_id}") return result except Exception as e: logger.error(f"Failed to save doc context: {e}", exc_info=True) return False - async def get_doc_context(self, session_id: str) -> Optional[DocContext]: + async def get_doc_context(self, session_id: str, agent_id: Optional[str] = None) -> Optional[DocContext]: """ - Get document context for a session. - - Args: - session_id: Session identifier - - Returns: - DocContext or None + Get document context for a session (scoped by agent_id when provided). + Backward-compat: if new key missing, tries legacy doc_context:{session_id} (read-only). 
""" try: - user_id = f"session:{session_id}" - - fact_key = f"doc_context:{session_id}" - - # Get fact from Memory Service + aid = (agent_id or "default").lower() + user_id = f"session:{aid}:{session_id}" + fact_key = f"doc_context:{aid}:{session_id}" fact = await self.memory_client.get_fact( user_id=user_id, fact_key=fact_key ) - if fact and fact.get("fact_value_json"): - logger.debug(f"Retrieved doc context for session {session_id}") + logger.debug(f"Retrieved doc context for session {session_id} agent={aid}") ctx_data = fact.get("fact_value_json") if isinstance(ctx_data, str): try: @@ -277,9 +260,23 @@ class DocumentService: logger.warning("doc_context fact_value_json is not valid JSON string") return None return DocContext(**ctx_data) - + # Backward-compat: legacy key + legacy_user_id = f"session:{session_id}" + legacy_key = f"doc_context:{session_id}" + fact_legacy = await self.memory_client.get_fact( + user_id=legacy_user_id, + fact_key=legacy_key + ) + if fact_legacy and fact_legacy.get("fact_value_json"): + logger.debug(f"Retrieved doc context from legacy key for session {session_id}") + ctx_data = fact_legacy.get("fact_value_json") + if isinstance(ctx_data, str): + try: + ctx_data = json.loads(ctx_data) + except Exception: + return None + return DocContext(**ctx_data) return None - except Exception as e: logger.error(f"Failed to get doc context: {e}", exc_info=True) return None @@ -292,7 +289,8 @@ class DocumentService: dao_id: str, user_id: str, output_mode: str = "qa_pairs", - metadata: Optional[Dict[str, Any]] = None + metadata: Optional[Dict[str, Any]] = None, + agent_id: Optional[str] = None, ) -> ParsedResult: """ Parse a document directly through Swapper service. 
@@ -372,7 +370,6 @@ class DocumentService: # Generate a simple doc_id based on filename and timestamp doc_id = hashlib.md5(f"{file_name}:{datetime.utcnow().isoformat()}".encode()).hexdigest()[:12] - # Save document context for follow-up queries await self.save_doc_context( session_id=session_id, doc_id=doc_id, @@ -380,6 +377,7 @@ class DocumentService: file_name=file_name, dao_id=dao_id, user_id=user_id, + agent_id=agent_id, ) # Convert text to markdown format @@ -433,6 +431,7 @@ class DocumentService: file_name=file_name, dao_id=dao_id, user_id=user_id, + agent_id=agent_id, ) return ParsedResult( @@ -697,9 +696,10 @@ async def parse_document( dao_id: str, user_id: str, output_mode: str = "qa_pairs", - metadata: Optional[Dict[str, Any]] = None + metadata: Optional[Dict[str, Any]] = None, + agent_id: Optional[str] = None, ) -> ParsedResult: - """Parse a document through DAGI Router""" + """Parse a document (agent_id scopes doc_context key).""" return await doc_service.parse_document( session_id=session_id, doc_url=doc_url, @@ -707,7 +707,8 @@ async def parse_document( dao_id=dao_id, user_id=user_id, output_mode=output_mode, - metadata=metadata + metadata=metadata, + agent_id=agent_id, ) @@ -756,8 +757,9 @@ async def save_doc_context( file_name: Optional[str] = None, dao_id: Optional[str] = None, user_id: Optional[str] = None, + agent_id: Optional[str] = None, ) -> bool: - """Save document context for a session""" + """Save document context for a session (scoped by agent_id when provided).""" return await doc_service.save_doc_context( session_id=session_id, doc_id=doc_id, @@ -765,9 +767,10 @@ async def save_doc_context( file_name=file_name, dao_id=dao_id, user_id=user_id, + agent_id=agent_id, ) -async def get_doc_context(session_id: str) -> Optional[DocContext]: - """Get document context for a session""" - return await doc_service.get_doc_context(session_id) +async def get_doc_context(session_id: str, agent_id: Optional[str] = None) -> Optional[DocContext]: + """Get 
document context for a session (scoped by agent_id when provided).""" + return await doc_service.get_doc_context(session_id, agent_id=agent_id)