from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Literal, Optional, Dict, Any, List
import asyncio
import json
import os
import re
import yaml
import httpx
import logging
from neo4j import AsyncGraphDatabase

# Memory Retrieval Pipeline v3.0 (optional dependency — degrade gracefully if absent)
try:
    from memory_retrieval import memory_retrieval, MemoryBrief
    MEMORY_RETRIEVAL_AVAILABLE = True
except ImportError:
    MEMORY_RETRIEVAL_AVAILABLE = False
    memory_retrieval = None

# Tool Manager for Function Calling (optional dependency — degrade gracefully if absent)
try:
    from tool_manager import ToolManager, ToolResult, format_tool_calls_for_response
    TOOL_MANAGER_AVAILABLE = True
except ImportError:
    TOOL_MANAGER_AVAILABLE = False
    ToolManager = None

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _strip_dsml_keep_text_before(text: str) -> str:
    """Return the part of *text* that precedes the first DSML-like tag.

    If no DSML marker is found, the stripped text itself is returned.
    Returns "" when the input is empty/too short, when DSML starts at
    position 0, or when the surviving prefix is 30 characters or fewer —
    callers treat "" as "discard and use a fallback response".
    """
    if not text or len(text.strip()) < 10:
        return ""
    # Patterns that mark the start of DSML-style markup.
    # BUGFIX: this list previously started with an empty pattern r"", which
    # re.search matches at position 0 of ANY string — `earliest` was always 0
    # and the function unconditionally returned "". Empty pattern removed.
    # NOTE(review): the empty alternation branches in (?:\|||) look like
    # mis-encoded Unicode separator characters (e.g. a fullwidth bar) —
    # verify against the original intent if DSML variants slip through.
    dsml_start_patterns = [
        r"<\s*(?:\|||)?\s*DSML",  # e.g. <DSML or <|DSML
        r"DSML\s*(?:\|||)",       # e.g. DSML| (empty branch also matches bare DSML)
        r"DSML\s*>\s*",           # e.g. DSML>
    ]
    # Locate the earliest DSML marker across all patterns.
    earliest = len(text)
    for pat in dsml_start_patterns:
        m = re.search(pat, text, re.IGNORECASE | re.DOTALL)
        if m:
            earliest = min(earliest, m.start())
    if earliest == 0:
        return ""  # response is pure DSML — nothing worth keeping
    prefix = text[:earliest].strip()
    # Remove a trailing incomplete tag such as "... <DS"
    prefix = re.sub(r"<[^>]*$", "", prefix).strip()
    return prefix if len(prefix) > 30 else ""


app = FastAPI(title="DAARION Router", version="2.0.0")

# Configuration
NATS_URL = os.getenv("NATS_URL", "nats://nats:4222")
SWAPPER_URL = os.getenv("SWAPPER_URL", "http://swapper-service:8890")
# All multimodal services now through Swapper
STT_URL = os.getenv("STT_URL", "http://swapper-service:8890")  # Swapper /stt endpoint
TTS_URL = os.getenv("TTS_URL", "http://swapper-service:8890")  # Swapper /tts endpoint
VISION_URL = os.getenv("VISION_URL", "http://172.18.0.1:11434")  # Host Ollama
OCR_URL = os.getenv("OCR_URL", "http://swapper-service:8890")  # Swapper /ocr endpoint
DOCUMENT_URL = os.getenv("DOCUMENT_URL", "http://swapper-service:8890")  # Swapper /document endpoint
CITY_SERVICE_URL = os.getenv("CITY_SERVICE_URL", "http://daarion-city-service:7001")

# Neo4j Configuration
NEO4J_URI = os.getenv("NEO4J_BOLT_URL", "bolt://neo4j:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
# NOTE(review): hardcoded default credential — should come from secrets
# management; left unchanged to preserve deployment behavior.
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "DaarionNeo4j2026!")

# HTTP client for backend services (created on startup)
http_client: Optional[httpx.AsyncClient] = None

# Neo4j driver (created on startup)
neo4j_driver = None
neo4j_available = False

# NATS client (created on startup)
nc = None
nats_available = False

# Tool Manager (created on startup)
tool_manager = None


# Models
class FilterDecision(BaseModel):
    # Decision emitted by the message filter for one Matrix event
    channel_id: str
    message_id: Optional[str] = None
    matrix_event_id: str
    microdao_id: str
    decision: Literal["allow", "deny", "modify"]
    target_agent_id: Optional[str] = None
    rewrite_prompt: Optional[str] = None


class AgentInvocation(BaseModel):
    # Invocation request published to router.invoke.agent
    agent_id: str
    entrypoint: Literal["channel_message", "direct", "cron"] = "channel_message"
    payload: Dict[str, Any]
# Load config
def load_config():
    """Load router_config.yaml (messaging routing config); return safe defaults if missing."""
    config_path = "router_config.yaml"
    if os.path.exists(config_path):
        with open(config_path, 'r') as f:
            return yaml.safe_load(f)
    # Defaults: inbound messaging routing enabled on the standard NATS subjects
    return {
        "messaging_inbound": {
            "enabled": True,
            "source_subject": "agent.filter.decision",
            "target_subject": "router.invoke.agent"
        }
    }


def load_router_config():
    """Load main router-config.yml with agents and LLM profiles"""
    # Try multiple locations
    paths = [
        "router-config.yml",
        "/app/router-config.yml",
        "../router-config.yml",
        "../../router-config.yml"
    ]
    for path in paths:
        if os.path.exists(path):
            with open(path, 'r') as f:
                logger.info(f"✅ Loaded router config from {path}")
                return yaml.safe_load(f)
    logger.warning("⚠️ router-config.yml not found, using empty config")
    return {"agents": {}}


config = load_config()
router_config = load_router_config()


@app.on_event("startup")
async def startup_event():
    """Initialize HTTP client, Neo4j, NATS subscriptions, memory and tools."""
    global nc, nats_available, http_client, neo4j_driver, neo4j_available, tool_manager

    logger.info("🚀 DAGI Router v2.0.0 starting up...")

    # Initialize HTTP client
    http_client = httpx.AsyncClient(timeout=60.0)
    logger.info("✅ HTTP client initialized")

    # Initialize Neo4j connection (best-effort: service runs without it)
    try:
        neo4j_driver = AsyncGraphDatabase.driver(
            NEO4J_URI,
            auth=(NEO4J_USER, NEO4J_PASSWORD)
        )
        # Verify connection with a trivial query
        async with neo4j_driver.session() as session:
            result = await session.run("RETURN 1 as test")
            await result.consume()
        neo4j_available = True
        logger.info(f"✅ Connected to Neo4j at {NEO4J_URI}")
    except Exception as e:
        logger.warning(f"⚠️ Neo4j not available: {e}")
        neo4j_available = False

    # Try to connect to NATS (best-effort: HTTP-only mode otherwise)
    try:
        import nats
        nc = await nats.connect(NATS_URL)
        nats_available = True
        logger.info(f"✅ Connected to NATS at {NATS_URL}")
        # Subscribe to filter decisions if enabled
        if config.get("messaging_inbound", {}).get("enabled", True):
            asyncio.create_task(subscribe_to_filter_decisions())
        else:
            logger.warning("⚠️ Messaging inbound routing disabled in config")
    except Exception as e:
        logger.warning(f"⚠️ NATS not available: {e}")
        logger.warning("⚠️ Running in test mode (HTTP only)")
        nats_available = False

    # Initialize Memory Retrieval Pipeline
    if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval:
        try:
            await memory_retrieval.initialize()
            logger.info("✅ Memory Retrieval Pipeline initialized")
        except Exception as e:
            logger.warning(f"⚠️ Memory Retrieval init failed: {e}")

    # Initialize Tool Manager for function calling
    if TOOL_MANAGER_AVAILABLE and ToolManager:
        try:
            tool_manager = ToolManager(router_config)
            logger.info("✅ Tool Manager initialized with function calling")
        except Exception as e:
            logger.warning(f"⚠️ Tool Manager init failed: {e}")
            tool_manager = None
    else:
        tool_manager = None

    # Log backend URLs
    logger.info(f"📡 Swapper URL: {SWAPPER_URL}")
    logger.info(f"📡 STT URL: {STT_URL}")
    logger.info(f"📡 Vision URL: {VISION_URL}")
    logger.info(f"📡 OCR URL: {OCR_URL}")
    logger.info(f"📡 Neo4j URL: {NEO4J_URI}")


async def subscribe_to_filter_decisions():
    """Subscribe to agent.filter.decision events"""
    if not nc:
        return

    source_subject = config.get("messaging_inbound", {}).get(
        "source_subject", "agent.filter.decision"
    )
    try:
        sub = await nc.subscribe(source_subject)
        # CONSISTENCY FIX: use module logger instead of bare print()
        logger.info(f"✅ Subscribed to {source_subject}")
        async for msg in sub.messages:
            try:
                decision_data = json.loads(msg.data.decode())
                await handle_filter_decision(decision_data)
            except Exception as e:
                # logger.exception also records the traceback
                # (replaces print + traceback.print_exc())
                logger.exception(f"❌ Error processing decision: {e}")
    except Exception as e:
        logger.error(f"❌ Subscription error: {e}")


async def handle_filter_decision(decision_data: dict):
    """
    Process agent.filter.decision events
    Only processes 'allow' decisions
    Creates AgentInvocation and publishes to router.invoke.agent
    """
    try:
        logger.info("🔀 Processing filter decision")
        decision = FilterDecision(**decision_data)

        # Only process 'allow' decisions
        if decision.decision != "allow":
            logger.info(f"⏭️ Ignoring non-allow decision: {decision.decision}")
            return

        if not decision.target_agent_id:
            logger.warning("⚠️ No target agent specified, skipping")
            return

        logger.info("✅ Decision: allow")
        logger.info(f"📝 Target: {decision.target_agent_id}")
        logger.info(f"📝 Channel: {decision.channel_id}")

        # Create AgentInvocation
        invocation = AgentInvocation(
            agent_id=decision.target_agent_id,
            entrypoint="channel_message",
            payload={
                "channel_id": decision.channel_id,
                "message_id": decision.message_id,
                "matrix_event_id": decision.matrix_event_id,
                "microdao_id": decision.microdao_id,
                "rewrite_prompt": decision.rewrite_prompt
            }
        )
        logger.info(f"🚀 Created invocation for {invocation.agent_id}")

        # Publish to NATS
        await publish_agent_invocation(invocation)
    except Exception as e:
        logger.exception(f"❌ Error handling decision: {e}")


async def publish_agent_invocation(invocation: AgentInvocation):
    """Publish AgentInvocation to router.invoke.agent"""
    if nc and nats_available:
        target_subject = config.get("messaging_inbound", {}).get(
            "target_subject", "router.invoke.agent"
        )
        try:
            await nc.publish(target_subject, invocation.json().encode())
            logger.info(f"✅ Published invocation to {target_subject}")
        except Exception as e:
            logger.error(f"❌ Error publishing to NATS: {e}")
    else:
        logger.warning(f"⚠️ NATS not available, invocation not published: {invocation.json()}")


@app.get("/health")
async def health():
    """Health check endpoint"""
    return {
        "status": "ok",
        "service": "router",
        # BUGFIX: previously reported "1.0.0" while the app is created and
        # logged as version 2.0.0 — keep in sync with FastAPI(version=...).
        "version": "2.0.0",
        "nats_connected": nats_available,
        "messaging_inbound_enabled": config.get("messaging_inbound", {}).get("enabled", True)
    }


@app.post("/internal/router/test-messaging", response_model=AgentInvocation)
async def test_messaging_route(decision: FilterDecision):
    """
    Test endpoint for routing logic
    Tests filter decision → agent invocation mapping without NATS
    """
    logger.info("🧪 Test routing request")

    if decision.decision != "allow" or not decision.target_agent_id:
        raise HTTPException(
            status_code=400,
            detail=f"Decision not routable: {decision.decision}, agent: {decision.target_agent_id}"
        )

    invocation = AgentInvocation(
        agent_id=decision.target_agent_id,
        entrypoint="channel_message",
        payload={
            "channel_id": decision.channel_id,
            "message_id": decision.message_id,
            "matrix_event_id": decision.matrix_event_id,
            "microdao_id": decision.microdao_id,
            "rewrite_prompt": decision.rewrite_prompt
        }
    )
    logger.info(f"✅ Test invocation created for {invocation.agent_id}")
    return invocation


@app.on_event("shutdown")
async def shutdown_event():
    """Clean shutdown"""
    global nc, http_client, neo4j_driver
    if nc:
        await nc.close()
        logger.info("✅ NATS connection closed")
    if http_client:
        await http_client.aclose()
        logger.info("✅ HTTP client closed")
    # BUGFIX: the Neo4j driver was created on startup but never closed,
    # leaking its connection pool on shutdown.
    if neo4j_driver:
        await neo4j_driver.close()
        logger.info("✅ Neo4j driver closed")


# ============================================================================
# Backend Integration Endpoints
# ============================================================================

class InferRequest(BaseModel):
    """Request for agent inference"""
    prompt: str
    model: Optional[str] = None
    max_tokens: Optional[int] = 2048
    temperature: Optional[float] = 0.7
    system_prompt: Optional[str] = None
    images: Optional[List[str]] = None  # List of base64 data URLs for vision
    metadata: Optional[Dict[str, Any]] = None  # Additional metadata (user_id, chat_id, etc.)
class InferResponse(BaseModel): """Response from agent inference""" response: str model: str tokens_used: Optional[int] = None backend: str image_base64: Optional[str] = None # Generated image in base64 format class BackendStatus(BaseModel): """Status of a backend service""" name: str url: str status: str # online, offline, error active_model: Optional[str] = None error: Optional[str] = None @app.get("/backends/status", response_model=List[BackendStatus]) async def get_backends_status(): """Get status of all backend services""" backends = [] # Check Swapper try: resp = await http_client.get(f"{SWAPPER_URL}/health", timeout=5.0) if resp.status_code == 200: data = resp.json() backends.append(BackendStatus( name="swapper", url=SWAPPER_URL, status="online", active_model=data.get("active_model") )) else: backends.append(BackendStatus( name="swapper", url=SWAPPER_URL, status="error", error=f"HTTP {resp.status_code}" )) except Exception as e: backends.append(BackendStatus( name="swapper", url=SWAPPER_URL, status="offline", error=str(e) )) # Check STT try: resp = await http_client.get(f"{STT_URL}/health", timeout=5.0) backends.append(BackendStatus( name="stt", url=STT_URL, status="online" if resp.status_code == 200 else "error" )) except Exception as e: backends.append(BackendStatus( name="stt", url=STT_URL, status="offline", error=str(e) )) # Check Vision (Ollama) try: resp = await http_client.get(f"{VISION_URL}/api/tags", timeout=5.0) if resp.status_code == 200: data = resp.json() models = [m.get("name") for m in data.get("models", [])] backends.append(BackendStatus( name="vision", url=VISION_URL, status="online", active_model=", ".join(models[:3]) if models else None )) else: backends.append(BackendStatus( name="vision", url=VISION_URL, status="error" )) except Exception as e: backends.append(BackendStatus( name="vision", url=VISION_URL, status="offline", error=str(e) )) # Check OCR try: resp = await http_client.get(f"{OCR_URL}/health", timeout=5.0) 
backends.append(BackendStatus( name="ocr", url=OCR_URL, status="online" if resp.status_code == 200 else "error" )) except Exception as e: backends.append(BackendStatus( name="ocr", url=OCR_URL, status="offline", error=str(e) )) return backends @app.post("/v1/agents/{agent_id}/infer", response_model=InferResponse) async def agent_infer(agent_id: str, request: InferRequest): """ Route inference request to appropriate backend. Router decides which backend to use based on: - Agent configuration (model, capabilities) - Request type (text, vision, audio) - Backend availability System prompt is fetched from database via city-service API. Memory context is retrieved via Memory Retrieval Pipeline v3.0. """ logger.info(f"🔀 Inference request for agent: {agent_id}") logger.info(f"📝 Prompt: {request.prompt[:100]}...") # ========================================================================= # MEMORY RETRIEVAL (v4.0 - Universal for all agents) # ========================================================================= memory_brief_text = "" # Extract metadata once for both retrieval and storage metadata = request.metadata or {} channel = "telegram" # Default chat_id = str(metadata.get("chat_id", "")) user_id = str(metadata.get("user_id", "")).replace("tg:", "") username = metadata.get("username") # Get agent_id from metadata or URL parameter request_agent_id = metadata.get("agent_id", agent_id).lower() if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval: try: if chat_id and user_id: brief = await memory_retrieval.retrieve( channel=channel, chat_id=chat_id, user_id=user_id, agent_id=request_agent_id, # Agent-specific collections username=username, message=request.prompt ) memory_brief_text = brief.to_text(max_lines=10) if memory_brief_text: logger.info(f"🧠 Memory brief for {request_agent_id}: {len(memory_brief_text)} chars") except Exception as e: logger.warning(f"⚠️ Memory retrieval failed for {request_agent_id}: {e}") # Get system prompt from database or config system_prompt = 
request.system_prompt # Debug logging for system prompt if system_prompt: logger.info(f"📝 Received system_prompt from request: {len(system_prompt)} chars") logger.debug(f"System prompt preview: {system_prompt[:200]}...") else: logger.warning(f"⚠️ No system_prompt in request for agent {agent_id}, trying to load...") if not system_prompt: try: from prompt_builder import get_agent_system_prompt system_prompt = await get_agent_system_prompt( agent_id, city_service_url=CITY_SERVICE_URL, router_config=router_config ) logger.info(f"✅ Loaded system prompt from database for {agent_id}") except Exception as e: logger.warning(f"⚠️ Could not load prompt from database: {e}") # Fallback to config agent_config = router_config.get("agents", {}).get(agent_id, {}) system_prompt = agent_config.get("system_prompt") # Determine which backend to use # Use router config to get default model for agent, fallback to qwen3-8b agent_config = router_config.get("agents", {}).get(agent_id, {}) default_llm = agent_config.get("default_llm", "qwen3-8b") # Check if there's a routing rule for this agent routing_rules = router_config.get("routing", []) for rule in routing_rules: if rule.get("when", {}).get("agent") == agent_id: if "use_llm" in rule: default_llm = rule.get("use_llm") logger.info(f"🎯 Agent {agent_id} routing to: {default_llm}") break # Get LLM profile configuration llm_profiles = router_config.get("llm_profiles", {}) llm_profile = llm_profiles.get(default_llm, {}) provider = llm_profile.get("provider", "ollama") # Determine model name if provider in ["deepseek", "openai", "anthropic", "mistral"]: model = llm_profile.get("model", "deepseek-chat") else: # For local ollama, use swapper model name format model = request.model or "qwen3-8b" # ========================================================================= # VISION PROCESSING (if images present) # ========================================================================= if request.images and len(request.images) > 0: logger.info(f"🖼️ 
Vision request: {len(request.images)} image(s)") try: # Use Swapper's /vision endpoint (manages model loading) vision_payload = { "model": "qwen3-vl-8b", "prompt": request.prompt, "images": request.images, # Swapper handles data URL conversion "max_tokens": request.max_tokens or 1024, "temperature": request.temperature or 0.7 } # Add system prompt if available if system_prompt: if memory_brief_text: vision_payload["system"] = f"{system_prompt}\n\n[Контекст пам'яті]\n{memory_brief_text}" else: vision_payload["system"] = system_prompt logger.info(f"🖼️ Sending to Swapper /vision: {SWAPPER_URL}/vision") vision_resp = await http_client.post( f"{SWAPPER_URL}/vision", json=vision_payload, timeout=120.0 ) if vision_resp.status_code == 200: vision_data = vision_resp.json() full_response = vision_data.get("text", "") # Debug: log full response structure logger.info(f"✅ Vision response: {len(full_response)} chars, success={vision_data.get('success')}, keys={list(vision_data.keys())}") if not full_response: logger.warning(f"⚠️ Empty vision response! 
Full data: {str(vision_data)[:500]}") # Store vision message in agent-specific memory if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id and full_response: asyncio.create_task( memory_retrieval.store_message( agent_id=request_agent_id, user_id=user_id, username=username, message_text=f"[Image] {request.prompt}", response_text=full_response, chat_id=chat_id, message_type="vision" ) ) return InferResponse( response=full_response, model="qwen3-vl-8b", tokens_used=None, backend="swapper-vision" ) else: logger.error(f"❌ Swapper vision error: {vision_resp.status_code} - {vision_resp.text[:200]}") # Fall through to text processing except Exception as e: logger.error(f"❌ Vision processing failed: {e}", exc_info=True) # Fall through to text processing # ========================================================================= # SMART LLM ROUTER WITH AUTO-FALLBACK # Priority: DeepSeek → Mistral → Grok → Local Ollama # ========================================================================= # Build messages array once for all providers messages = [] if system_prompt: if memory_brief_text: enhanced_prompt = f"{system_prompt}\n\n[Контекст пам'яті]\n{memory_brief_text}" messages.append({"role": "system", "content": enhanced_prompt}) logger.info(f"📝 Added system message with prompt ({len(system_prompt)} chars) + memory ({len(memory_brief_text)} chars)") else: messages.append({"role": "system", "content": system_prompt}) logger.info(f"📝 Added system message with prompt ({len(system_prompt)} chars)") elif memory_brief_text: messages.append({"role": "system", "content": f"[Контекст пам'яті]\n{memory_brief_text}"}) logger.warning(f"⚠️ No system_prompt! Using only memory brief ({len(memory_brief_text)} chars)") else: logger.error(f"❌ No system_prompt AND no memory_brief! 
LLM will have no context!") messages.append({"role": "user", "content": request.prompt}) logger.debug(f"📨 Messages array: {len(messages)} messages, system={len(messages[0].get('content', '')) if messages else 0} chars") max_tokens = request.max_tokens or llm_profile.get("max_tokens", 2048) temperature = request.temperature or llm_profile.get("temperature", 0.2) # Define cloud providers with fallback order cloud_providers = [ { "name": "deepseek", "api_key_env": "DEEPSEEK_API_KEY", "base_url": "https://api.deepseek.com", "model": "deepseek-chat", "timeout": 40 }, { "name": "mistral", "api_key_env": "MISTRAL_API_KEY", "base_url": "https://api.mistral.ai", "model": "mistral-large-latest", "timeout": 60 }, { "name": "grok", "api_key_env": "GROK_API_KEY", "base_url": "https://api.x.ai", "model": "grok-2-1212", "timeout": 60 } ] # If specific provider requested, try it first if provider in ["deepseek", "mistral", "grok"]: # Reorder to put requested provider first cloud_providers = sorted(cloud_providers, key=lambda x: 0 if x["name"] == provider else 1) last_error = None # Get tool definitions if Tool Manager is available tools_payload = None if TOOL_MANAGER_AVAILABLE and tool_manager: tools_payload = tool_manager.get_tool_definitions() logger.debug(f"🔧 {len(tools_payload)} tools available for function calling") for cloud in cloud_providers: api_key = os.getenv(cloud["api_key_env"]) if not api_key: logger.debug(f"⏭️ Skipping {cloud['name']}: API key not configured") continue try: logger.info(f"🌐 Trying {cloud['name'].upper()} API with model: {cloud['model']}") # Build request payload request_payload = { "model": cloud["model"], "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "stream": False } # Add tools for function calling (if available and supported) if tools_payload and cloud["name"] in ["deepseek", "mistral", "grok"]: request_payload["tools"] = tools_payload request_payload["tool_choice"] = "auto" cloud_resp = await http_client.post( 
f"{cloud['base_url']}/v1/chat/completions", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" }, json=request_payload, timeout=cloud["timeout"] ) if cloud_resp.status_code == 200: data = cloud_resp.json() choice = data.get("choices", [{}])[0] message = choice.get("message", {}) response_text = message.get("content", "") or "" tokens_used = data.get("usage", {}).get("total_tokens", 0) # Initialize tool_results to avoid UnboundLocalError tool_results = [] # Check for tool calls (standard format) tool_calls = message.get("tool_calls", []) # Also check for DSML format in content (DeepSeek sometimes returns this) # AGGRESSIVE check - any DSML-like pattern should be caught import re has_dsml = False if response_text: # Check for DSML patterns with regex (handles Unicode variations) dsml_patterns_check = [ r'DSML', # Any mention of DSML r'function_calls>', r'invoke\s*name\s*=', r'parameter\s*name\s*=', r'<[^>]*invoke[^>]*>', r']*invoke[^>]*>', ] for pattern in dsml_patterns_check: if re.search(pattern, response_text, re.IGNORECASE): has_dsml = True logger.warning(f"⚠️ DSML detected via pattern: {pattern}") break if has_dsml: logger.warning("⚠️ Detected DSML format in content, parsing...") # Extract tool name and parameters from DSML import re # Try multiple DSML patterns dsml_patterns = [ r'invoke name="(\w+)".*?parameter name="(\w+)"[^>]*>([^<]+)', r'invoke\s+name="(\w+)".*?parameter\s+name="(\w+)"[^>]*>([^<]+)', r'name="web_extract".*?url.*?>([^\s<]+)', ] dsml_match = None for pattern in dsml_patterns: dsml_match = re.search(pattern, response_text, re.DOTALL | re.IGNORECASE) if dsml_match: break if dsml_match and len(dsml_match.groups()) >= 3: tool_name = dsml_match.group(1) param_name = dsml_match.group(2) param_value = dsml_match.group(3).strip() # Create synthetic tool call with Mistral-compatible ID (exactly 9 chars, a-zA-Z0-9) import string import random tool_call_id = ''.join(random.choices(string.ascii_letters + string.digits, 
k=9)) tool_calls = [{ "id": tool_call_id, "function": { "name": tool_name, "arguments": json.dumps({param_name: param_value}) } }] logger.info(f"🔧 Parsed DSML tool call: {tool_name}({param_name}={param_value[:50]}...) id={tool_call_id}") # ALWAYS clear DSML content - never show raw DSML to user logger.warning(f"🧹 Clearing DSML content from response ({len(response_text)} chars)") response_text = "" if tool_calls and tool_manager: logger.info(f"🔧 LLM requested {len(tool_calls)} tool call(s)") # Execute each tool call tool_results = [] for tc in tool_calls: func = tc.get("function", {}) tool_name = func.get("name", "") try: tool_args = json.loads(func.get("arguments", "{}")) except: tool_args = {} result = await tool_manager.execute_tool(tool_name, tool_args) tool_result_dict = { "tool_call_id": tc.get("id", ""), "name": tool_name, "success": result.success, "result": result.result, "error": result.error, "image_base64": result.image_base64 # Store image if generated } if result.image_base64: logger.info(f"🖼️ Tool {tool_name} generated image: {len(result.image_base64)} chars") tool_results.append(tool_result_dict) # Append tool results to messages and call LLM again messages.append({"role": "assistant", "content": None, "tool_calls": tool_calls}) for tr in tool_results: messages.append({ "role": "tool", "tool_call_id": tr["tool_call_id"], "content": str(tr["result"]) if tr["success"] else f"Error: {tr['error']}" }) # Second call to get final response logger.info(f"🔄 Calling LLM again with tool results") final_payload = { "model": cloud["model"], "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "stream": False } # Don't include tools in second call (some APIs don't support it) # Tools are only needed in first call final_resp = await http_client.post( f"{cloud['base_url']}/v1/chat/completions", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" }, json=final_payload, timeout=cloud["timeout"] ) if 
final_resp.status_code == 200: final_data = final_resp.json() response_text = final_data.get("choices", [{}])[0].get("message", {}).get("content", "") # CRITICAL: Check for DSML in second response too! if response_text and ("DSML" in response_text or "invoke name=" in response_text or "function_calls>" in response_text): prefix_before_dsml = _strip_dsml_keep_text_before(response_text) if prefix_before_dsml: logger.warning(f"🧹 DSML in 2nd response: keeping text before DSML ({len(prefix_before_dsml)} chars), discarding {len(response_text) - len(prefix_before_dsml)} chars") response_text = prefix_before_dsml else: logger.warning(f"🧹 DSML detected in 2nd LLM response, trying 3rd call ({len(response_text)} chars)") # Third LLM call: explicitly ask to synthesize tool results tool_summary_parts = [] for tr in tool_results: if tr.get("success") and tr.get("result"): res_text = str(tr["result"])[:500] tool_summary_parts.append(f"Tool '{tr['name']}' returned: {res_text}") if tool_summary_parts: synthesis_prompt = "Based on the following tool results, provide a helpful response to the user in their language. Do NOT use any markup or XML. Just respond naturally.\n\n" + "\n".join(tool_summary_parts) try: synth_resp = await http_client.post( f"{cloud['base_url']}/v1/chat/completions", headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, json={"model": cloud["model"], "messages": [ {"role": "system", "content": system_prompt or "You are a helpful assistant. 
Respond naturally."}, {"role": "user", "content": synthesis_prompt} ], "max_tokens": max_tokens, "temperature": 0.3, "stream": False}, timeout=cloud["timeout"] ) if synth_resp.status_code == 200: synth_data = synth_resp.json() synth_text = synth_data.get("choices", [{}])[0].get("message", {}).get("content", "") if synth_text and "DSML" not in synth_text and "invoke" not in synth_text: response_text = synth_text tokens_used += synth_data.get("usage", {}).get("total_tokens", 0) logger.info("\u2705 3rd LLM call synthesized clean response from tool results") else: response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected") else: response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected") except Exception as synth_err: logger.warning(f"3rd LLM call failed: {synth_err}") response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected") else: response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected") if not response_text: logger.warning(f"⚠️ {cloud['name'].upper()} returned empty response after tool call") # Fallback to tool result summary response_text = format_tool_calls_for_response(tool_results, fallback_mode="empty_response") tokens_used += final_data.get("usage", {}).get("total_tokens", 0) else: logger.error(f"❌ {cloud['name'].upper()} second call failed: {final_resp.status_code} - {final_resp.text[:200]}") # Fallback to tool result summary response_text = format_tool_calls_for_response(tool_results, fallback_mode="empty_response") if response_text: # FINAL DSML check before returning - never show DSML to user if "DSML" in response_text or "invoke name=" in response_text or "function_calls>" in response_text: prefix_before_dsml = _strip_dsml_keep_text_before(response_text) if prefix_before_dsml: logger.warning(f"🧹 DSML in final response: keeping text before DSML ({len(prefix_before_dsml)} chars)") response_text = prefix_before_dsml else: 
logger.warning(f"🧹 DSML in final response! Replacing with fallback ({len(response_text)} chars)") response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected") # Check if any tool generated an image generated_image = None logger.debug(f"🔍 Checking {len(tool_results)} tool results for images...") for tr in tool_results: img_b64 = tr.get("image_base64") if img_b64: generated_image = img_b64 logger.info(f"🖼️ Image generated by tool: {tr['name']} ({len(img_b64)} chars)") break else: logger.debug(f" Tool {tr['name']}: no image_base64") logger.info(f"✅ {cloud['name'].upper()} response received, {tokens_used} tokens") # Store message in agent-specific memory (async, non-blocking) if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id: asyncio.create_task( memory_retrieval.store_message( agent_id=request_agent_id, user_id=user_id, username=username, message_text=request.prompt, response_text=response_text, chat_id=chat_id ) ) return InferResponse( response=response_text, model=cloud["model"], tokens_used=tokens_used, backend=f"{cloud['name']}-cloud", image_base64=generated_image ) else: logger.warning(f"⚠️ {cloud['name'].upper()} returned empty response, trying next provider") continue else: logger.warning(f"⚠️ {cloud['name'].upper()} returned {cloud_resp.status_code}, trying next...") last_error = f"{cloud['name']}: {cloud_resp.status_code}" continue except Exception as e: import traceback error_details = traceback.format_exc() logger.warning(f"⚠️ {cloud['name'].upper()} failed: {e}") if not str(e).strip(): # Empty error string logger.error(f"❌ {cloud['name'].upper()} failed with empty error! 
Check traceback:") logger.error(error_details) else: logger.debug(f"Full error traceback: {error_details}") last_error = f"{cloud['name']}: {str(e)}" continue # If all cloud providers failed, log and fall through to local if last_error: logger.warning(f"⚠️ All cloud providers failed ({last_error}), falling back to local Ollama") # ========================================================================= # LOCAL PROVIDERS (Ollama via Swapper) # ========================================================================= # Determine local model from config (not hardcoded) # Strategy: Use agent's default_llm if it's local (ollama), otherwise find first local model local_model = None # Check if default_llm is local if llm_profile.get("provider") == "ollama": # Extract model name and convert format (qwen3:8b → qwen3-8b for Swapper) ollama_model = llm_profile.get("model", "qwen3:8b") local_model = ollama_model.replace(":", "-") # qwen3:8b → qwen3-8b logger.debug(f"✅ Using agent's default local model: {local_model}") else: # Find first local model from config for profile_name, profile in llm_profiles.items(): if profile.get("provider") == "ollama": ollama_model = profile.get("model", "qwen3:8b") local_model = ollama_model.replace(":", "-") logger.info(f"🔄 Found fallback local model: {local_model} from profile {profile_name}") break # Final fallback if no local model found if not local_model: local_model = "qwen3-8b" logger.warning(f"⚠️ No local model in config, using hardcoded fallback: {local_model}") try: # Check if Swapper is available health_resp = await http_client.get(f"{SWAPPER_URL}/health", timeout=5.0) if health_resp.status_code == 200: logger.info(f"📡 Calling Swapper with local model: {local_model}") # Generate response via Swapper (which handles model loading) generate_resp = await http_client.post( f"{SWAPPER_URL}/generate", json={ "model": local_model, "prompt": request.prompt, "system": system_prompt, "max_tokens": request.max_tokens, "temperature": 
                    request.temperature,
                    "stream": False
                },
                timeout=300.0
            )
            if generate_resp.status_code == 200:
                data = generate_resp.json()
                local_response = data.get("response", "")

                # Store in agent-specific memory (fire-and-forget task so the
                # HTTP response is not delayed by the memory write)
                if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id and local_response:
                    asyncio.create_task(
                        memory_retrieval.store_message(
                            agent_id=request_agent_id,
                            user_id=user_id,
                            username=username,
                            message_text=request.prompt,
                            response_text=local_response,
                            chat_id=chat_id
                        )
                    )

                return InferResponse(
                    response=local_response,
                    model=local_model,
                    tokens_used=data.get("eval_count", 0),
                    backend="swapper+ollama"
                )
            else:
                logger.error(f"❌ Swapper error: {generate_resp.status_code} - {generate_resp.text}")
    except Exception as e:
        logger.error(f"❌ Swapper/Ollama error: {e}")

    # Fallback to direct Ollama if Swapper fails
    try:
        logger.info(f"🔄 Falling back to direct Ollama connection")
        generate_resp = await http_client.post(
            f"{VISION_URL}/api/generate",
            json={
                "model": "qwen3:8b",  # Use actual Ollama model name
                "prompt": request.prompt,
                "system": system_prompt,
                "stream": False,
                "options": {
                    "num_predict": request.max_tokens,
                    "temperature": request.temperature
                }
            },
            timeout=120.0
        )
        if generate_resp.status_code == 200:
            data = generate_resp.json()
            # NOTE(review): reports the originally requested `model` even though
            # this fallback always runs qwen3:8b — confirm this is intended.
            return InferResponse(
                response=data.get("response", ""),
                model=model,
                tokens_used=data.get("eval_count", 0),
                backend="ollama-direct"
            )
    except Exception as e2:
        logger.error(f"❌ Direct Ollama fallback also failed: {e2}")

    # Fallback: return error — every cloud and local backend failed
    raise HTTPException(
        status_code=503,
        detail=f"No backend available for model: {model}"
    )


@app.get("/v1/models")
async def list_available_models():
    """List all available models across backends (Swapper first, then Ollama)."""
    models = []

    # Get Swapper models
    try:
        resp = await http_client.get(f"{SWAPPER_URL}/models", timeout=5.0)
        if resp.status_code == 200:
            data = resp.json()
            for m in data.get("models", []):
                models.append({
                    "id": m.get("name"),
                    "backend": "swapper",
                    "size_gb": m.get("size_gb"),
                    "status": m.get("status",
"available") }) except Exception as e: logger.warning(f"Cannot get Swapper models: {e}") # Get Ollama models try: resp = await http_client.get(f"{VISION_URL}/api/tags", timeout=5.0) if resp.status_code == 200: data = resp.json() for m in data.get("models", []): # Avoid duplicates model_name = m.get("name") if not any(x.get("id") == model_name for x in models): models.append({ "id": model_name, "backend": "ollama", "size_gb": round(m.get("size", 0) / 1e9, 1), "status": "loaded" }) except Exception as e: logger.warning(f"Cannot get Ollama models: {e}") return {"models": models, "total": len(models)} # ============================================================================= # NEO4J GRAPH API ENDPOINTS # ============================================================================= class GraphNode(BaseModel): """Model for creating/updating a graph node""" label: str # Node type: User, Agent, Topic, Fact, Entity, etc. properties: Dict[str, Any] node_id: Optional[str] = None # If provided, update existing node class GraphRelationship(BaseModel): """Model for creating a relationship between nodes""" from_node_id: str to_node_id: str relationship_type: str # KNOWS, MENTIONED, RELATED_TO, CREATED_BY, etc. 
properties: Optional[Dict[str, Any]] = None class GraphQuery(BaseModel): """Model for querying the graph""" cypher: Optional[str] = None # Direct Cypher query (advanced) # Or use structured query: node_label: Optional[str] = None node_id: Optional[str] = None relationship_type: Optional[str] = None depth: int = 1 # How many hops to traverse limit: int = 50 class GraphSearchRequest(BaseModel): """Natural language search in graph""" query: str entity_types: Optional[List[str]] = None # Filter by types limit: int = 20 @app.post("/v1/graph/nodes") async def create_graph_node(node: GraphNode): """Create or update a node in the knowledge graph""" if not neo4j_available or not neo4j_driver: raise HTTPException(status_code=503, detail="Neo4j not available") try: async with neo4j_driver.session() as session: # Generate node_id if not provided node_id = node.node_id or f"{node.label.lower()}_{os.urandom(8).hex()}" # Build properties with node_id props = {**node.properties, "node_id": node_id, "updated_at": "datetime()"} # Create or merge node cypher = f""" MERGE (n:{node.label} {{node_id: $node_id}}) SET n += $properties SET n.updated_at = datetime() RETURN n """ result = await session.run(cypher, node_id=node_id, properties=node.properties) record = await result.single() if record: created_node = dict(record["n"]) logger.info(f"📊 Created/updated node: {node.label} - {node_id}") return {"status": "ok", "node_id": node_id, "node": created_node} raise HTTPException(status_code=500, detail="Failed to create node") except Exception as e: logger.error(f"❌ Neo4j error creating node: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/v1/graph/relationships") async def create_graph_relationship(rel: GraphRelationship): """Create a relationship between two nodes""" if not neo4j_available or not neo4j_driver: raise HTTPException(status_code=503, detail="Neo4j not available") try: async with neo4j_driver.session() as session: props_clause = "" if rel.properties: 
props_clause = " SET r += $properties" cypher = f""" MATCH (a {{node_id: $from_id}}) MATCH (b {{node_id: $to_id}}) MERGE (a)-[r:{rel.relationship_type}]->(b) {props_clause} SET r.created_at = datetime() RETURN a.node_id as from_id, b.node_id as to_id, type(r) as rel_type """ result = await session.run( cypher, from_id=rel.from_node_id, to_id=rel.to_node_id, properties=rel.properties or {} ) record = await result.single() if record: logger.info(f"🔗 Created relationship: {rel.from_node_id} -[{rel.relationship_type}]-> {rel.to_node_id}") return { "status": "ok", "from_id": record["from_id"], "to_id": record["to_id"], "relationship_type": record["rel_type"] } raise HTTPException(status_code=404, detail="One or both nodes not found") except Exception as e: logger.error(f"❌ Neo4j error creating relationship: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/v1/graph/query") async def query_graph(query: GraphQuery): """Query the knowledge graph""" if not neo4j_available or not neo4j_driver: raise HTTPException(status_code=503, detail="Neo4j not available") try: async with neo4j_driver.session() as session: # If direct Cypher provided, use it (with safety check) if query.cypher: # Basic safety: only allow read queries if any(kw in query.cypher.upper() for kw in ["DELETE", "REMOVE", "DROP", "CREATE", "MERGE", "SET"]): raise HTTPException(status_code=400, detail="Only read queries allowed via cypher parameter") result = await session.run(query.cypher) records = await result.data() return {"status": "ok", "results": records, "count": len(records)} # Build structured query if query.node_id: # Get specific node with relationships cypher = f""" MATCH (n {{node_id: $node_id}}) OPTIONAL MATCH (n)-[r]-(related) RETURN n, collect({{rel: type(r), node: related}}) as connections LIMIT 1 """ result = await session.run(cypher, node_id=query.node_id) elif query.node_label: # Get nodes by label cypher = f""" MATCH (n:{query.node_label}) RETURN n ORDER BY n.updated_at 
DESC LIMIT $limit """ result = await session.run(cypher, limit=query.limit) else: # Get recent nodes cypher = """ MATCH (n) RETURN n, labels(n) as labels ORDER BY n.updated_at DESC LIMIT $limit """ result = await session.run(cypher, limit=query.limit) records = await result.data() return {"status": "ok", "results": records, "count": len(records)} except Exception as e: logger.error(f"❌ Neo4j query error: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/v1/graph/search") async def search_graph(q: str, types: Optional[str] = None, limit: int = 20): """Search nodes by text in properties""" if not neo4j_available or not neo4j_driver: raise HTTPException(status_code=503, detail="Neo4j not available") try: async with neo4j_driver.session() as session: # Build label filter label_filter = "" if types: labels = [t.strip() for t in types.split(",")] label_filter = " AND (" + " OR ".join([f"n:{l}" for l in labels]) + ")" # Search in common text properties cypher = f""" MATCH (n) WHERE ( n.name CONTAINS $query OR n.title CONTAINS $query OR n.text CONTAINS $query OR n.description CONTAINS $query OR n.content CONTAINS $query ){label_filter} RETURN n, labels(n) as labels ORDER BY n.updated_at DESC LIMIT $limit """ result = await session.run(cypher, query=q, limit=limit) records = await result.data() return {"status": "ok", "query": q, "results": records, "count": len(records)} except Exception as e: logger.error(f"❌ Neo4j search error: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/v1/graph/stats") async def get_graph_stats(): """Get knowledge graph statistics""" if not neo4j_available or not neo4j_driver: raise HTTPException(status_code=503, detail="Neo4j not available") try: async with neo4j_driver.session() as session: # Get node counts by label labels_result = await session.run(""" CALL db.labels() YIELD label CALL apoc.cypher.run('MATCH (n:`' + label + '`) RETURN count(n) as count', {}) YIELD value RETURN label, value.count as count 
""") # If APOC not available, use simpler query try: labels_data = await labels_result.data() except: labels_result = await session.run(""" MATCH (n) RETURN labels(n)[0] as label, count(*) as count ORDER BY count DESC """) labels_data = await labels_result.data() # Get relationship counts rels_result = await session.run(""" MATCH ()-[r]->() RETURN type(r) as type, count(*) as count ORDER BY count DESC """) rels_data = await rels_result.data() # Get total counts total_result = await session.run(""" MATCH (n) RETURN count(n) as nodes """) total_nodes = (await total_result.single())["nodes"] total_rels_result = await session.run(""" MATCH ()-[r]->() RETURN count(r) as relationships """) total_rels = (await total_rels_result.single())["relationships"] return { "status": "ok", "total_nodes": total_nodes, "total_relationships": total_rels, "nodes_by_label": labels_data, "relationships_by_type": rels_data } except Exception as e: logger.error(f"❌ Neo4j stats error: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.on_event("shutdown") async def shutdown_event(): """Cleanup connections on shutdown""" global neo4j_driver, http_client, nc # Close Memory Retrieval if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval: try: await memory_retrieval.close() logger.info("🔌 Memory Retrieval closed") except Exception as e: logger.warning(f"⚠️ Memory Retrieval close error: {e}") if neo4j_driver: await neo4j_driver.close() logger.info("🔌 Neo4j connection closed") if http_client: await http_client.aclose() logger.info("🔌 HTTP client closed") if nc: await nc.close() logger.info("🔌 NATS connection closed")