Files
microdao-daarion/agromatrix_stepan_noda1_prod.patch
Apple fa749fa56c chore(infra): add NODA2 setup files, docker-compose configs and root config
- AGENTS.md: Sofiia Chief AI Architect role definition
- SOFIIA_IN_OPENCODE.md, SOFIIA_NODA2_SETUP.md: NODA2 setup documentation
- agromatrix_stepan_noda1_APPLY.md, agromatrix_stepan_noda1_prod.patch: AgroMatrix production patch
- docker-compose.memory-node2.yml: memory service for NODA2
- docker-compose.node2-sofiia-supervisor.yml: sofiia supervisor for NODA2
- gateway-bot/gateway_boot.py, monitor_prompt.txt, vision_guard.py: gateway extras
- models/Modelfile.qwen3.5-35b-a3b: Qwen model definition for NODA3
- opencode.json: OpenCode providers and agents config
- scripts/init-sofiia-memory.py, scripts/node2/*, start-memory-node2.sh: NODA2 init scripts
- setup_sofiia_node2.sh: NODA2 full setup script

Made-with: Cursor
2026-03-03 07:15:20 -08:00

469 lines
19 KiB
Diff
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
diff --git a/crews/agromatrix_crew/operator_commands.py b/crews/agromatrix_crew/operator_commands.py
index 8015539..194a5c5 100644
--- a/crews/agromatrix_crew/operator_commands.py
+++ b/crews/agromatrix_crew/operator_commands.py
@@ -1,3 +1,13 @@
+"""
+Operator commands for AgroMatrix (Stepan). Access control and slash commands.
+
+Access control (env, used by gateway and here):
+- AGX_OPERATOR_IDS: comma-separated Telegram user_id list; only these users are operators.
+- AGX_OPERATOR_CHAT_ID: optional; if set, operator actions allowed only in this chat_id.
+
+When is_operator(user_id, chat_id) is True, gateway routes any message (not only slash)
+to Stepan for human-friendly operator interaction.
+"""
import os
import re
import shlex
diff --git a/docker-compose.node1.yml b/docker-compose.node1.yml
index ca8c80a..662815f 100644
--- a/docker-compose.node1.yml
+++ b/docker-compose.node1.yml
@@ -191,8 +191,16 @@ services:
- STT_SERVICE_UPLOAD_URL=http://swapper-service:8890/stt
- OCR_SERVICE_URL=http://swapper-service:8890
- WEB_SEARCH_SERVICE_URL=http://swapper-service:8890
+ # Stepan (AgroMatrix) in-process
+ - PYTHONPATH=/app:/app/packages/agromatrix-tools
+ - AGX_REPO_ROOT=/app
+ - AGX_STEPAN_MODE=${AGX_STEPAN_MODE:-inproc}
+ - AGX_OPERATOR_IDS=${AGX_OPERATOR_IDS:-}
+ - AGX_OPERATOR_CHAT_ID=${AGX_OPERATOR_CHAT_ID:-}
volumes:
- ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro
+ - ${DEPLOY_ROOT:-.}/crews:/app/crews:ro
+ - ${DEPLOY_ROOT:-.}/packages/agromatrix-tools:/app/packages/agromatrix-tools:ro
- ${DEPLOY_ROOT:-.}/logs:/app/logs
depends_on:
- router
diff --git a/gateway-bot/app.py b/gateway-bot/app.py
index 0653724..97f7e3c 100644
--- a/gateway-bot/app.py
+++ b/gateway-bot/app.py
@@ -2,16 +2,23 @@
FastAPI app instance for Gateway Bot
"""
import logging
+import os
+import sys
+from pathlib import Path
+
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from http_api import router as gateway_router
from http_api_doc import router as doc_router
+import gateway_boot
+
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
+logger = logging.getLogger(__name__)
app = FastAPI(
title="Bot Gateway with DAARWIZZ",
@@ -32,6 +39,30 @@ app.add_middleware(
app.include_router(gateway_router, prefix="", tags=["gateway"])
app.include_router(doc_router, prefix="", tags=["docs"])
+
+@app.on_event("startup")
+async def startup_stepan_check():
+ """Check at startup whether crews + agromatrix_tools can be imported.
+
+ Reads AGX_REPO_ROOT (default "/opt/microdao-daarion"), makes sure the repo
+ root and its packages/agromatrix-tools directory are on sys.path, then tries
+ to import crews.agromatrix_crew.run and agromatrix_tools. The outcome is
+ recorded in gateway_boot.STEPAN_IMPORTS_OK. Do not crash gateway if missing:
+ any exception is logged and swallowed.
+ """
+ repo_root = os.getenv("AGX_REPO_ROOT", "/opt/microdao-daarion").strip()
+ # Prepend (index 0) so the mounted repo wins over anything already on the path.
+ if repo_root and repo_root not in sys.path:
+ sys.path.insert(0, repo_root)
+ # Presumably needed so that `import agromatrix_tools` below resolves as a top-level package.
+ tools_path = str(Path(repo_root) / "packages" / "agromatrix-tools")
+ if tools_path not in sys.path:
+ sys.path.insert(0, tools_path)
+ try:
+ # Imported only to prove availability; the names themselves are unused here.
+ import crews.agromatrix_crew.run # noqa: F401
+ import agromatrix_tools # noqa: F401
+ gateway_boot.STEPAN_IMPORTS_OK = True
+ logger.info("Stepan inproc: crews + agromatrix_tools OK; STEPAN_IMPORTS_OK=True")
+ # Broad catch is deliberate: any import-time failure must disable Stepan only, not the gateway.
+ except Exception as e:
+ logger.error(
+ "Stepan disabled: crews or agromatrix_tools not available: %s. "
+ "Set AGX_REPO_ROOT, mount crews and packages/agromatrix-tools.",
+ e,
+ )
+ gateway_boot.STEPAN_IMPORTS_OK = False
+
+
@app.get("/")
async def root():
return {
diff --git a/gateway-bot/gateway_boot.py b/gateway-bot/gateway_boot.py
new file mode 100644
index 0000000..05daab2
--- /dev/null
+++ b/gateway-bot/gateway_boot.py
@@ -0,0 +1,4 @@
+"""
+Boot-time state for Gateway. STEPAN_IMPORTS_OK is set by the app.py startup hook and read by http_api.
+"""
+STEPAN_IMPORTS_OK = False
diff --git a/gateway-bot/http_api.py b/gateway-bot/http_api.py
index 8bb526d..942f422 100644
--- a/gateway-bot/http_api.py
+++ b/gateway-bot/http_api.py
@@ -44,6 +44,7 @@ from behavior_policy import (
get_ack_text,
is_prober_request,
has_agent_chat_participation,
+ has_recent_interaction,
NO_OUTPUT,
BehaviorDecision,
AGENT_NAME_VARIANTS,
@@ -51,6 +52,16 @@ from behavior_policy import (
logger = logging.getLogger(__name__)
+
+def _safe_has_recent_interaction(agent_id: str, chat_id: str, user_id: str) -> bool:
+ """Guard: avoid 500 if has_recent_interaction raises. Returns False on any error.
+
+ chat_id and user_id are coerced to str before the call and the result is
+ coerced to bool. Any exception is logged at WARNING level and reported as
+ False (no recent interaction).
+
+ NOTE(review): has_recent_interaction is imported at module top from
+ behavior_policy, so a missing symbol would fail at import time rather than
+ here; this guard only covers errors raised while the call runs. Confirm.
+ """
+ try:
+ return bool(has_recent_interaction(agent_id, str(chat_id), str(user_id)))
+ except Exception as e:
+ logger.warning("has_recent_interaction failed, treating as False: %s", e)
+ return False
+
+
# Telegram message length limits
TELEGRAM_MAX_MESSAGE_LENGTH = 4096
TELEGRAM_SAFE_LENGTH = 3500 # Leave room for formatting
@@ -992,6 +1003,18 @@ async def druid_telegram_webhook(update: TelegramUpdate):
# AGROMATRIX webhook endpoint
+# AGX_STEPAN_MODE: inproc = run Crew in-process (default); http = call crewai-service (9010).
+# NOTE: handle_stepan_message currently rejects "http" with a "not implemented" reply.
+# Cached resolved mode; None means the environment has not been read yet.
+_STEPAN_MODE = None
+
+def _get_stepan_mode() -> str:
+ """Return the Stepan execution mode ("inproc" or "http") from AGX_STEPAN_MODE.
+
+ The env value is stripped and lower-cased; an unset, empty or unrecognised
+ value falls back to "inproc". The result is cached in the module-level
+ _STEPAN_MODE, so later changes to the environment are not picked up.
+ """
+ global _STEPAN_MODE
+ if _STEPAN_MODE is None:
+ # `or "inproc"` covers the variable being set but empty.
+ _STEPAN_MODE = (os.getenv("AGX_STEPAN_MODE", "inproc") or "inproc").strip().lower()
+ if _STEPAN_MODE not in ("inproc", "http"):
+ _STEPAN_MODE = "inproc"
+ logger.info("Stepan mode=%s (AGX_STEPAN_MODE)", _STEPAN_MODE)
+ return _STEPAN_MODE
+
async def handle_stepan_message(update: TelegramUpdate, agent_config: AgentConfig) -> Dict[str, Any]:
update_id = getattr(update, 'update_id', None) or update.update_id
@@ -1022,10 +1045,37 @@ async def handle_stepan_message(update: TelegramUpdate, agent_config: AgentConfi
ops_mode = True
trace_id = str(uuid.uuid4())
+ stepan_mode = _get_stepan_mode()
+
+ if stepan_mode == "http":
+ logger.warning("Stepan http mode not implemented; use AGX_STEPAN_MODE=inproc.")
+ bot_token = agent_config.get_telegram_token()
+ await send_telegram_message(
+ chat_id,
+ "Степан у режимі HTTP зараз недоступний. Встановіть AGX_STEPAN_MODE=inproc.",
+ bot_token=bot_token,
+ )
+ return {"ok": False, "status": "stepan_http_not_implemented"}
- # call Stepan directly
try:
- sys.path.insert(0, str(Path('/opt/microdao-daarion')))
+ import gateway_boot
+ except ImportError:
+ gateway_boot = type(sys)("gateway_boot")
+ gateway_boot.STEPAN_IMPORTS_OK = False
+ if not getattr(gateway_boot, "STEPAN_IMPORTS_OK", False):
+ logger.warning("Stepan inproc disabled: crews/agromatrix_tools not available at startup")
+ bot_token = agent_config.get_telegram_token()
+ await send_telegram_message(
+ chat_id,
+ "Степан тимчасово недоступний (не встановлено crews або agromatrix-tools).",
+ bot_token=bot_token,
+ )
+ return {"ok": False, "status": "stepan_disabled"}
+
+ try:
+ repo_root = os.getenv("AGX_REPO_ROOT", "/opt/microdao-daarion")
+ if repo_root and repo_root not in sys.path:
+ sys.path.insert(0, str(Path(repo_root)))
from crews.agromatrix_crew.run import handle_message
started = time.time()
last_pending = _get_last_pending(chat_id)
@@ -1078,35 +1128,14 @@ async def agromatrix_telegram_webhook(update: TelegramUpdate):
if user_id and user_id in op_ids:
is_ops = True
- # Operator NL or operator slash commands -> handle via Stepan handler.
- # Important: do NOT treat generic slash commands (/start, /agromatrix) as operator commands,
- # otherwise regular users will see "Недостатньо прав" or Stepan errors.
- operator_slash_cmds = {
- "whoami",
- "pending",
- "pending_show",
- "approve",
- "reject",
- "apply_dict",
- "pending_stats",
- }
- slash_cmd = ""
- if is_slash:
- try:
- slash_cmd = (msg_text.strip().split()[0].lstrip("/").strip().lower())
- except Exception:
- slash_cmd = ""
- is_operator_slash = bool(slash_cmd) and slash_cmd in operator_slash_cmds
-
- # Stepan handler currently depends on ChatOpenAI (OPENAI_API_KEY). If key is not configured,
- # never route production traffic there (avoid "Помилка обробки..." and webhook 5xx).
+ # Operator: any message (not only slash commands) goes to Stepan when is_ops and OPENAI_API_KEY is set; otherwise it falls through to the Router pipeline.
stepan_enabled = bool(os.getenv("OPENAI_API_KEY", "").strip())
- if stepan_enabled and (is_ops or is_operator_slash):
+ if stepan_enabled and is_ops:
return await handle_stepan_message(update, AGROMATRIX_CONFIG)
- if (is_ops or is_operator_slash) and not stepan_enabled:
+ if is_ops and not stepan_enabled:
logger.warning(
"Stepan handler disabled (OPENAI_API_KEY missing); falling back to Router pipeline "
- f"for chat_id={chat_id}, user_id={user_id}, slash_cmd={slash_cmd!r}"
+ f"for chat_id={chat_id}, user_id={user_id}"
)
# General conversation -> standard Router pipeline (like all other agents)
@@ -1911,7 +1940,8 @@ async def process_document(
dao_id=dao_id,
user_id=f"tg:{user_id}",
output_mode="qa_pairs",
- metadata={"username": username, "chat_id": chat_id}
+ metadata={"username": username, "chat_id": chat_id},
+ agent_id=agent_config.agent_id,
)
if not result.success:
@@ -3067,7 +3097,7 @@ async def handle_telegram_webhook(
# Check if there's a document context for follow-up questions
session_id = f"telegram:{chat_id}"
- doc_context = await get_doc_context(session_id)
+ doc_context = await get_doc_context(session_id, agent_id=agent_config.agent_id)
# If there's a doc_id and the message looks like a question about the document
if doc_context and doc_context.doc_id:
@@ -3756,7 +3786,8 @@ async def _old_telegram_webhook(update: TelegramUpdate):
dao_id=dao_id,
user_id=f"tg:{user_id}",
output_mode="qa_pairs",
- metadata={"username": username, "chat_id": chat_id}
+ metadata={"username": username, "chat_id": chat_id},
+ agent_id=agent_config.agent_id,
)
if not result.success:
@@ -3959,7 +3990,7 @@ async def _old_telegram_webhook(update: TelegramUpdate):
# Check if there's a document context for follow-up questions
session_id = f"telegram:{chat_id}"
- doc_context = await get_doc_context(session_id)
+ doc_context = await get_doc_context(session_id, agent_id=agent_config.agent_id)
# If there's a doc_id and the message looks like a question about the document
if doc_context and doc_context.doc_id:
diff --git a/gateway-bot/services/doc_service.py b/gateway-bot/services/doc_service.py
index da5a684..5f8f031 100644
--- a/gateway-bot/services/doc_service.py
+++ b/gateway-bot/services/doc_service.py
@@ -198,29 +198,20 @@ class DocumentService:
file_name: Optional[str] = None,
dao_id: Optional[str] = None,
user_id: Optional[str] = None,
+ agent_id: Optional[str] = None,
) -> bool:
"""
- Save document context for a session.
-
- Uses Memory Service to persist document context across channels.
+ Save document context for a session (scoped by agent_id to avoid cross-agent leak).
Args:
- session_id: Session identifier (e.g., "telegram:123", "web:user456")
+ session_id: Session identifier
doc_id: Document ID from parser
- doc_url: Optional document URL
- file_name: Optional file name
- dao_id: Optional DAO ID
-
- Returns:
- True if saved successfully
+ agent_id: Optional; lower-cased and used to isolate context per agent (key: doc_context:{agent_id}:{session_id}); "default" is used when omitted.
"""
try:
- # Use stable synthetic user key per session, so context can be
- # retrieved later using only session_id (without caller user_id).
- fact_user_id = f"session:{session_id}"
-
- # Save as fact in Memory Service
- fact_key = f"doc_context:{session_id}"
+ aid = (agent_id or "default").lower()
+ fact_user_id = f"session:{aid}:{session_id}"
+ fact_key = f"doc_context:{aid}:{session_id}"
fact_value_json = {
"doc_id": doc_id,
"doc_url": doc_url,
@@ -239,36 +230,28 @@ class DocumentService:
team_id=None,
)
- logger.info(f"Saved doc context for session {session_id}: doc_id={doc_id}")
+ logger.info(f"Saved doc context for session {session_id} agent={aid}: doc_id={doc_id}")
return result
except Exception as e:
logger.error(f"Failed to save doc context: {e}", exc_info=True)
return False
- async def get_doc_context(self, session_id: str) -> Optional[DocContext]:
+ async def get_doc_context(self, session_id: str, agent_id: Optional[str] = None) -> Optional[DocContext]:
"""
- Get document context for a session.
-
- Args:
- session_id: Session identifier
-
- Returns:
- DocContext or None
+ Get document context for a session (scoped by agent_id when provided).
+ Backward-compat: if the scoped key is missing, falls back to the legacy unscoped key doc_context:{session_id} (read-only; that key is shared across agents).
"""
try:
- user_id = f"session:{session_id}"
-
- fact_key = f"doc_context:{session_id}"
-
- # Get fact from Memory Service
+ aid = (agent_id or "default").lower()
+ user_id = f"session:{aid}:{session_id}"
+ fact_key = f"doc_context:{aid}:{session_id}"
fact = await self.memory_client.get_fact(
user_id=user_id,
fact_key=fact_key
)
-
if fact and fact.get("fact_value_json"):
- logger.debug(f"Retrieved doc context for session {session_id}")
+ logger.debug(f"Retrieved doc context for session {session_id} agent={aid}")
ctx_data = fact.get("fact_value_json")
if isinstance(ctx_data, str):
try:
@@ -277,9 +260,23 @@ class DocumentService:
logger.warning("doc_context fact_value_json is not valid JSON string")
return None
return DocContext(**ctx_data)
-
+ # Backward-compat: legacy key
+ legacy_user_id = f"session:{session_id}"
+ legacy_key = f"doc_context:{session_id}"
+ fact_legacy = await self.memory_client.get_fact(
+ user_id=legacy_user_id,
+ fact_key=legacy_key
+ )
+ if fact_legacy and fact_legacy.get("fact_value_json"):
+ logger.debug(f"Retrieved doc context from legacy key for session {session_id}")
+ ctx_data = fact_legacy.get("fact_value_json")
+ if isinstance(ctx_data, str):
+ try:
+ ctx_data = json.loads(ctx_data)
+ except Exception:
+ return None
+ return DocContext(**ctx_data)
return None
-
except Exception as e:
logger.error(f"Failed to get doc context: {e}", exc_info=True)
return None
@@ -292,7 +289,8 @@ class DocumentService:
dao_id: str,
user_id: str,
output_mode: str = "qa_pairs",
- metadata: Optional[Dict[str, Any]] = None
+ metadata: Optional[Dict[str, Any]] = None,
+ agent_id: Optional[str] = None,
) -> ParsedResult:
"""
Parse a document directly through Swapper service.
@@ -372,7 +370,6 @@ class DocumentService:
# Generate a simple doc_id based on filename and timestamp
doc_id = hashlib.md5(f"{file_name}:{datetime.utcnow().isoformat()}".encode()).hexdigest()[:12]
- # Save document context for follow-up queries
await self.save_doc_context(
session_id=session_id,
doc_id=doc_id,
@@ -380,6 +377,7 @@ class DocumentService:
file_name=file_name,
dao_id=dao_id,
user_id=user_id,
+ agent_id=agent_id,
)
# Convert text to markdown format
@@ -433,6 +431,7 @@ class DocumentService:
file_name=file_name,
dao_id=dao_id,
user_id=user_id,
+ agent_id=agent_id,
)
return ParsedResult(
@@ -697,9 +696,10 @@ async def parse_document(
dao_id: str,
user_id: str,
output_mode: str = "qa_pairs",
- metadata: Optional[Dict[str, Any]] = None
+ metadata: Optional[Dict[str, Any]] = None,
+ agent_id: Optional[str] = None,
) -> ParsedResult:
- """Parse a document through DAGI Router"""
+ """Parse a document (agent_id scopes doc_context key)."""
return await doc_service.parse_document(
session_id=session_id,
doc_url=doc_url,
@@ -707,7 +707,8 @@ async def parse_document(
dao_id=dao_id,
user_id=user_id,
output_mode=output_mode,
- metadata=metadata
+ metadata=metadata,
+ agent_id=agent_id,
)
@@ -756,8 +757,9 @@ async def save_doc_context(
file_name: Optional[str] = None,
dao_id: Optional[str] = None,
user_id: Optional[str] = None,
+ agent_id: Optional[str] = None,
) -> bool:
- """Save document context for a session"""
+ """Save document context for a session (scoped by agent_id when provided)."""
return await doc_service.save_doc_context(
session_id=session_id,
doc_id=doc_id,
@@ -765,9 +767,10 @@ async def save_doc_context(
file_name=file_name,
dao_id=dao_id,
user_id=user_id,
+ agent_id=agent_id,
)
-async def get_doc_context(session_id: str) -> Optional[DocContext]:
- """Get document context for a session"""
- return await doc_service.get_doc_context(session_id)
+async def get_doc_context(session_id: str, agent_id: Optional[str] = None) -> Optional[DocContext]:
+ """Get document context for a session (scoped by agent_id when provided).
+
+ Thin module-level wrapper that delegates to doc_service.get_doc_context.
+ """
+ return await doc_service.get_doc_context(session_id, agent_id=agent_id)