Router (main.py): when DSML is detected in the second LLM response after tool execution, make a third LLM call with an explicit synthesis prompt instead of returning raw tool results to the user; fall back to format_tool_calls_for_response only if the third call fails. Router (tool_manager.py): added _strip_think_tags() helper to remove <think>...</think> reasoning artifacts from DeepSeek output. Gateway (http_api.py): strip <think>...</think> tags and DSML/XML-like markup (function_calls, invoke, parameter tags) before sending to Telegram; ensure empty text after stripping gets a "..." fallback. Deployed to NODE1 and verified services running. Co-authored-by: Cursor <cursoragent@cursor.com>
1614 lines
66 KiB
Python
1614 lines
66 KiB
Python
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import Response
|
|
from pydantic import BaseModel
|
|
from typing import Literal, Optional, Dict, Any, List
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import yaml
|
|
import httpx
|
|
import logging
|
|
import time # For latency metrics
|
|
|
|
# CrewAI Integration — optional dependency. When the client module is absent
# the router falls back to direct LLM calls; the imported names are stubbed
# to None so later availability checks can reference them safely.
try:
    from crewai_client import should_use_crewai, call_crewai, get_crewai_health
    CREWAI_CLIENT_AVAILABLE = True
except ImportError:
    CREWAI_CLIENT_AVAILABLE = False
    should_use_crewai = None
    call_crewai = None
from neo4j import AsyncGraphDatabase

# Memory Retrieval Pipeline v3.0 — optional; when missing, inference proceeds
# without a memory brief.
try:
    from memory_retrieval import memory_retrieval, MemoryBrief
    MEMORY_RETRIEVAL_AVAILABLE = True
except ImportError:
    MEMORY_RETRIEVAL_AVAILABLE = False
    memory_retrieval = None

# Tool Manager for Function Calling — optional; when missing, no tools are
# attached to LLM requests.
try:
    from tool_manager import ToolManager, ToolResult, format_tool_calls_for_response
    TOOL_MANAGER_AVAILABLE = True
except ImportError:
    TOOL_MANAGER_AVAILABLE = False
    ToolManager = None

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="DAARION Router", version="2.0.0")

# Configuration — every backend URL is overridable via environment variable;
# defaults target docker-compose service names.
NATS_URL = os.getenv("NATS_URL", "nats://nats:4222")
SWAPPER_URL = os.getenv("SWAPPER_URL", "http://swapper-service:8890")
# All multimodal services now through Swapper
STT_URL = os.getenv("STT_URL", "http://swapper-service:8890")  # Swapper /stt endpoint
TTS_URL = os.getenv("TTS_URL", "http://swapper-service:8890")  # Swapper /tts endpoint
VISION_URL = os.getenv("VISION_URL", "http://172.18.0.1:11434")  # Host Ollama
OCR_URL = os.getenv("OCR_URL", "http://swapper-service:8890")  # Swapper /ocr endpoint
DOCUMENT_URL = os.getenv("DOCUMENT_URL", "http://swapper-service:8890")  # Swapper /document endpoint
CITY_SERVICE_URL = os.getenv("CITY_SERVICE_URL", "http://daarion-city-service:7001")

# CrewAI Routing Configuration
CREWAI_ROUTING_ENABLED = os.getenv("CREWAI_ROUTING_ENABLED", "true").lower() == "true"
CREWAI_URL = os.getenv("CREWAI_URL", "http://dagi-staging-crewai-service:9010")

# Neo4j Configuration
NEO4J_URI = os.getenv("NEO4J_BOLT_URL", "bolt://neo4j:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
# NOTE(review): hard-coded default credential — should come from a secret store.
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "DaarionNeo4j2026!")

# HTTP client for backend services (created in startup_event)
http_client: Optional[httpx.AsyncClient] = None

# Neo4j driver (created in startup_event; `neo4j_available` gates usage)
neo4j_driver = None
neo4j_available = False

# NATS client (created in startup_event; `nats_available` gates publishing)
nc = None
nats_available = False

# Tool Manager (created in startup_event when tool_manager module is importable)
tool_manager = None
|
|
|
|
# Models
|
|
class FilterDecision(BaseModel):
    """Decision emitted by the filter agent for one inbound channel message."""
    channel_id: str
    message_id: Optional[str] = None
    matrix_event_id: str
    microdao_id: str
    # Only "allow" decisions are routed onward to an agent.
    decision: Literal["allow", "deny", "modify"]
    # Required (despite being Optional) for routing; handlers skip decisions
    # without a target agent.
    target_agent_id: Optional[str] = None
    rewrite_prompt: Optional[str] = None
|
|
|
|
class AgentInvocation(BaseModel):
    """Invocation request published to the `router.invoke.agent` subject."""
    agent_id: str
    # How the agent was triggered; channel messages are the default path.
    entrypoint: Literal["channel_message", "direct", "cron"] = "channel_message"
    # Free-form context forwarded to the agent (channel/message identifiers etc.).
    payload: Dict[str, Any]
|
|
|
|
# Load config
|
|
def load_config():
    """Load the NATS messaging configuration from `router_config.yaml`.

    Returns:
        The parsed YAML mapping when the file exists in the working
        directory, otherwise a built-in default that enables inbound
        messaging on the standard subjects.
    """
    config_path = "router_config.yaml"
    if os.path.exists(config_path):
        # Explicit encoding avoids locale-dependent decoding on deploy hosts.
        with open(config_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    return {
        "messaging_inbound": {
            "enabled": True,
            "source_subject": "agent.filter.decision",
            "target_subject": "router.invoke.agent"
        }
    }
|
|
|
|
def load_router_config():
    """Load the main router-config.yml with agents and LLM profiles.

    Searches several candidate locations (container layout first, then
    repo-relative paths) and returns the first file that exists; falls
    back to an empty agent map so the service can still boot without
    configuration.
    """
    # Try multiple locations
    paths = [
        "router-config.yml",
        "/app/router-config.yml",
        "../router-config.yml",
        "../../router-config.yml"
    ]

    for path in paths:
        if os.path.exists(path):
            # Explicit encoding avoids locale-dependent decoding on deploy hosts.
            with open(path, 'r', encoding='utf-8') as f:
                logger.info(f"✅ Loaded router config from {path}")
                return yaml.safe_load(f)

    logger.warning("⚠️ router-config.yml not found, using empty config")
    return {"agents": {}}
|
|
|
|
# Load both configurations at import time so route handlers and the startup
# hook can read them as module globals.
config = load_config()
router_config = load_router_config()
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Initialize NATS connection and subscriptions.

    Also creates the shared HTTP client, verifies Neo4j connectivity, and
    initializes the optional Memory Retrieval and Tool Manager subsystems.
    Every external dependency is best-effort: failures are logged and the
    corresponding `*_available` flag is left False so the service still boots.
    """
    global nc, nats_available, http_client, neo4j_driver, neo4j_available
    logger.info("🚀 DAGI Router v2.0.0 starting up...")

    # Initialize HTTP client (shared by all backend calls)
    http_client = httpx.AsyncClient(timeout=60.0)
    logger.info("✅ HTTP client initialized")

    # Initialize Neo4j connection
    try:
        neo4j_driver = AsyncGraphDatabase.driver(
            NEO4J_URI,
            auth=(NEO4J_USER, NEO4J_PASSWORD)
        )
        # Verify connection with a trivial round-trip query.
        async with neo4j_driver.session() as session:
            result = await session.run("RETURN 1 as test")
            await result.consume()
        neo4j_available = True
        logger.info(f"✅ Connected to Neo4j at {NEO4J_URI}")
    except Exception as e:
        logger.warning(f"⚠️ Neo4j not available: {e}")
        neo4j_available = False

    # Try to connect to NATS
    try:
        import nats
        nc = await nats.connect(NATS_URL)
        nats_available = True
        logger.info(f"✅ Connected to NATS at {NATS_URL}")

        # Subscribe to filter decisions if enabled; runs as a background task
        # for the lifetime of the process.
        if config.get("messaging_inbound", {}).get("enabled", True):
            asyncio.create_task(subscribe_to_filter_decisions())
        else:
            logger.warning("⚠️ Messaging inbound routing disabled in config")
    except Exception as e:
        logger.warning(f"⚠️ NATS not available: {e}")
        logger.warning("⚠️ Running in test mode (HTTP only)")
        nats_available = False

    # Initialize Memory Retrieval Pipeline (optional subsystem)
    if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval:
        try:
            await memory_retrieval.initialize()
            logger.info("✅ Memory Retrieval Pipeline initialized")
        except Exception as e:
            logger.warning(f"⚠️ Memory Retrieval init failed: {e}")

    # Initialize Tool Manager for function calling (optional subsystem)
    global tool_manager
    if TOOL_MANAGER_AVAILABLE and ToolManager:
        try:
            tool_manager = ToolManager(router_config)
            logger.info("✅ Tool Manager initialized with function calling")
        except Exception as e:
            logger.warning(f"⚠️ Tool Manager init failed: {e}")
            tool_manager = None
    else:
        tool_manager = None

    # Log backend URLs for deploy-time diagnostics
    logger.info(f"📡 Swapper URL: {SWAPPER_URL}")
    logger.info(f"📡 STT URL: {STT_URL}")
    logger.info(f"📡 Vision URL: {VISION_URL}")
    logger.info(f"📡 OCR URL: {OCR_URL}")
    logger.info(f"📡 Neo4j URL: {NEO4J_URI}")
|
|
|
|
async def subscribe_to_filter_decisions():
    """Consume `agent.filter.decision` events from NATS for the process lifetime.

    Each message is decoded as JSON and passed to handle_filter_decision().
    A malformed or failing message is logged (with traceback) and skipped so
    one bad event cannot kill the subscription loop.
    """
    if not nc:
        return

    source_subject = config.get("messaging_inbound", {}).get(
        "source_subject",
        "agent.filter.decision"
    )

    try:
        sub = await nc.subscribe(source_subject)
        # Use the module logger (not print) so output goes through the
        # configured logging pipeline, consistent with the rest of the file.
        logger.info(f"✅ Subscribed to {source_subject}")

        async for msg in sub.messages:
            try:
                decision_data = json.loads(msg.data.decode())
                await handle_filter_decision(decision_data)
            except Exception:
                # logger.exception records the traceback; keep consuming.
                logger.exception("❌ Error processing decision")
    except Exception:
        logger.exception("❌ Subscription error")
|
|
|
|
async def handle_filter_decision(decision_data: dict):
    """
    Process agent.filter.decision events.

    Only processes 'allow' decisions that name a target agent.
    Creates an AgentInvocation and publishes it to router.invoke.agent.
    Errors (including validation failures) are logged and swallowed so the
    subscription loop keeps running.
    """
    try:
        # Use the module logger (not print) for consistency with the rest
        # of the service's logging.
        logger.info("🔀 Processing filter decision")
        decision = FilterDecision(**decision_data)

        # Only process 'allow' decisions
        if decision.decision != "allow":
            logger.info(f"⏭️ Ignoring non-allow decision: {decision.decision}")
            return

        if not decision.target_agent_id:
            logger.warning("⚠️ No target agent specified, skipping")
            return

        logger.info(f"✅ Decision: allow, target={decision.target_agent_id}, channel={decision.channel_id}")

        # Create AgentInvocation carrying the message context forward.
        invocation = AgentInvocation(
            agent_id=decision.target_agent_id,
            entrypoint="channel_message",
            payload={
                "channel_id": decision.channel_id,
                "message_id": decision.message_id,
                "matrix_event_id": decision.matrix_event_id,
                "microdao_id": decision.microdao_id,
                "rewrite_prompt": decision.rewrite_prompt
            }
        )

        logger.info(f"🚀 Created invocation for {invocation.agent_id}")

        # Publish to NATS
        await publish_agent_invocation(invocation)

    except Exception:
        # logger.exception records the traceback that the old
        # traceback.print_exc() emitted to stderr.
        logger.exception("❌ Error handling decision")
|
|
|
|
async def publish_agent_invocation(invocation: AgentInvocation):
    """Publish an AgentInvocation to the configured `router.invoke.agent` subject.

    When NATS is unavailable the invocation is logged and dropped —
    deliberately best-effort; callers do not retry.
    """
    if nc and nats_available:
        target_subject = config.get("messaging_inbound", {}).get(
            "target_subject",
            "router.invoke.agent"
        )

        try:
            await nc.publish(target_subject, invocation.json().encode())
            # Module logger instead of print, consistent with the service.
            logger.info(f"✅ Published invocation to {target_subject}")
        except Exception as e:
            logger.error(f"❌ Error publishing to NATS: {e}")
    else:
        logger.warning(f"⚠️ NATS not available, invocation not published: {invocation.json()}")
|
|
|
|
|
|
|
|
# ==============================================================
|
|
# PROMETHEUS METRICS ENDPOINT
|
|
# ==============================================================
|
|
@app.get("/metrics")
async def prometheus_metrics():
    """Prometheus metrics endpoint."""
    try:
        # agent_metrics is imported lazily so the endpoint degrades
        # gracefully when the module is absent.
        from agent_metrics import get_metrics, get_content_type
        payload = get_metrics()
        content_type = get_content_type()
    except Exception as e:
        logger.error(f"Metrics error: {e}")
        return Response(content=b"# Error generating metrics", media_type="text/plain")
    return Response(content=payload, media_type=content_type)
|
|
|
|
@app.get("/health")
async def health():
    """Health check endpoint.

    Reports NATS connectivity and whether inbound messaging routing is
    enabled, alongside the service identity.
    """
    return {
        "status": "ok",
        "service": "router",
        # Keep in sync with the FastAPI app version declared at module level
        # (was "1.0.0", inconsistent with app version "2.0.0").
        "version": "2.0.0",
        "nats_connected": nats_available,
        "messaging_inbound_enabled": config.get("messaging_inbound", {}).get("enabled", True)
    }
|
|
|
|
@app.post("/internal/router/test-messaging", response_model=AgentInvocation)
async def test_messaging_route(decision: FilterDecision):
    """
    Test endpoint for routing logic.

    Tests the filter decision → agent invocation mapping without NATS.

    Raises:
        HTTPException(400): when the decision is not routable (not "allow"
        or no target agent specified).
    """
    # Module logger instead of print, consistent with the rest of the file.
    logger.info("🧪 Test routing request")

    if decision.decision != "allow" or not decision.target_agent_id:
        raise HTTPException(
            status_code=400,
            detail=f"Decision not routable: {decision.decision}, agent: {decision.target_agent_id}"
        )

    # Mirrors the mapping performed by handle_filter_decision().
    invocation = AgentInvocation(
        agent_id=decision.target_agent_id,
        entrypoint="channel_message",
        payload={
            "channel_id": decision.channel_id,
            "message_id": decision.message_id,
            "matrix_event_id": decision.matrix_event_id,
            "microdao_id": decision.microdao_id,
            "rewrite_prompt": decision.rewrite_prompt
        }
    )

    logger.info(f"✅ Test invocation created for {invocation.agent_id}")
    return invocation
|
|
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Clean shutdown"""
    global nc, http_client

    # Close each long-lived client that startup created, logging as we go.
    if nc:
        await nc.close()
        logger.info("✅ NATS connection closed")

    if http_client:
        await http_client.aclose()
        logger.info("✅ HTTP client closed")
|
|
|
|
|
|
# ============================================================================
|
|
# Backend Integration Endpoints
|
|
# ============================================================================
|
|
|
|
class InferRequest(BaseModel):
    """Request for agent inference"""
    prompt: str
    # Optional model override; routing normally picks the model from config.
    model: Optional[str] = None
    max_tokens: Optional[int] = 2048
    temperature: Optional[float] = 0.7
    # When absent, the router loads the agent's prompt from the database/config.
    system_prompt: Optional[str] = None
    images: Optional[List[str]] = None  # List of base64 data URLs for vision
    metadata: Optional[Dict[str, Any]] = None  # Additional metadata (user_id, chat_id, etc.)
|
|
|
|
|
|
class InferResponse(BaseModel):
    """Response from agent inference"""
    response: str
    # Model that actually produced the answer (may differ from the request).
    model: str
    tokens_used: Optional[int] = None
    # Which backend served the request (e.g. "swapper-vision", a cloud name).
    backend: str
    image_base64: Optional[str] = None  # Generated image in base64 format
|
|
|
|
|
|
|
|
|
|
# =========================================================================
|
|
# INTERNAL LLM API (for CrewAI and internal services)
|
|
# =========================================================================
|
|
|
|
class InternalLLMRequest(BaseModel):
    """Request body for /internal/llm/complete (used by CrewAI)."""
    prompt: str
    system_prompt: Optional[str] = None
    # Named profile from router-config.yml's llm_profiles section.
    llm_profile: Optional[str] = "reasoning"
    model: Optional[str] = None
    max_tokens: Optional[int] = 2048
    temperature: Optional[float] = 0.2
    # When set, injected into the system message as "[Role: ...]".
    role_context: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class InternalLLMResponse(BaseModel):
    """Response body for /internal/llm/complete."""
    text: str
    model: str
    # Provider that served the completion ("deepseek", "mistral", "grok", "ollama").
    provider: str
    tokens_used: int = 0
    latency_ms: int = 0
|
|
|
|
|
|
class BackendStatus(BaseModel):
    """Status of a backend service"""
    name: str
    url: str
    status: str  # online, offline, error
    # Human-readable model info when the backend reports one.
    active_model: Optional[str] = None
    error: Optional[str] = None
|
|
|
|
|
|
async def _simple_health_status(name: str, url: str) -> BackendStatus:
    """Probe `{url}/health` and map the outcome to a BackendStatus.

    Shared by backends (STT, OCR) whose /health endpoint carries no extra
    metadata — deduplicates the previously copy-pasted try/except blocks.
    """
    try:
        resp = await http_client.get(f"{url}/health", timeout=5.0)
        return BackendStatus(
            name=name,
            url=url,
            status="online" if resp.status_code == 200 else "error"
        )
    except Exception as e:
        return BackendStatus(
            name=name,
            url=url,
            status="offline",
            error=str(e)
        )


@app.get("/backends/status", response_model=List[BackendStatus])
async def get_backends_status():
    """Get status of all backend services.

    Order is preserved for API consumers: swapper, stt, vision, ocr.
    """
    backends = []

    # Check Swapper — its /health additionally reports the active model.
    try:
        resp = await http_client.get(f"{SWAPPER_URL}/health", timeout=5.0)
        if resp.status_code == 200:
            data = resp.json()
            backends.append(BackendStatus(
                name="swapper",
                url=SWAPPER_URL,
                status="online",
                active_model=data.get("active_model")
            ))
        else:
            backends.append(BackendStatus(
                name="swapper",
                url=SWAPPER_URL,
                status="error",
                error=f"HTTP {resp.status_code}"
            ))
    except Exception as e:
        backends.append(BackendStatus(
            name="swapper",
            url=SWAPPER_URL,
            status="offline",
            error=str(e)
        ))

    # Check STT (plain /health)
    backends.append(await _simple_health_status("stt", STT_URL))

    # Check Vision (Ollama) — list installed models via /api/tags.
    try:
        resp = await http_client.get(f"{VISION_URL}/api/tags", timeout=5.0)
        if resp.status_code == 200:
            data = resp.json()
            models = [m.get("name") for m in data.get("models", [])]
            backends.append(BackendStatus(
                name="vision",
                url=VISION_URL,
                status="online",
                active_model=", ".join(models[:3]) if models else None
            ))
        else:
            backends.append(BackendStatus(
                name="vision",
                url=VISION_URL,
                status="error"
            ))
    except Exception as e:
        backends.append(BackendStatus(
            name="vision",
            url=VISION_URL,
            status="offline",
            error=str(e)
        ))

    # Check OCR (plain /health)
    backends.append(await _simple_health_status("ocr", OCR_URL))

    return backends
|
|
|
|
|
|
|
|
|
|
# =========================================================================
|
|
# INTERNAL LLM COMPLETE ENDPOINT (for CrewAI)
|
|
# =========================================================================
|
|
|
|
@app.post("/internal/llm/complete", response_model=InternalLLMResponse)
async def internal_llm_complete(request: InternalLLMRequest):
    """
    Internal LLM completion endpoint.
    NO routing, NO CrewAI decision, NO agent selection.
    Used by CrewAI service for multi-role orchestration.

    Tries cloud providers (the profile's preferred one first), then falls
    back to local Ollama.

    Raises:
        HTTPException(503): when every provider fails or is unconfigured.
    """
    # `time` is already imported at module level — the local
    # `import time as time_module` alias was redundant.
    t0 = time.time()

    logger.info(f"Internal LLM: profile={request.llm_profile}, role={request.role_context}")

    # Get LLM profile configuration
    llm_profiles = router_config.get("llm_profiles", {})
    profile_name = request.llm_profile or "reasoning"
    llm_profile = llm_profiles.get(profile_name, {})

    provider = llm_profile.get("provider", "deepseek")
    # NOTE(review): `model` is computed but the cloud loop below uses each
    # provider's own default model — confirm whether overrides should apply.
    model = request.model or llm_profile.get("model", "deepseek-chat")
    # Explicit None checks: `or` would silently discard legitimate falsy
    # values such as temperature=0.0 (deterministic) or max_tokens=0.
    max_tokens = request.max_tokens if request.max_tokens is not None else llm_profile.get("max_tokens", 2048)
    temperature = request.temperature if request.temperature is not None else llm_profile.get("temperature", 0.2)

    # Build messages: optional system message (with role prefix), then user.
    messages = []
    if request.system_prompt:
        system_content = request.system_prompt
        if request.role_context:
            system_content = f"[Role: {request.role_context}]\n\n{system_content}"
        messages.append({"role": "system", "content": system_content})
    elif request.role_context:
        messages.append({"role": "system", "content": f"You are acting as {request.role_context}. Respond professionally."})

    messages.append({"role": "user", "content": request.prompt})

    # Cloud providers (all expose OpenAI-compatible /v1/chat/completions)
    cloud_providers = [
        {"name": "deepseek", "api_key_env": "DEEPSEEK_API_KEY", "base_url": "https://api.deepseek.com", "model": "deepseek-chat", "timeout": 60},
        {"name": "mistral", "api_key_env": "MISTRAL_API_KEY", "base_url": "https://api.mistral.ai", "model": "mistral-large-latest", "timeout": 60},
        {"name": "grok", "api_key_env": "GROK_API_KEY", "base_url": "https://api.x.ai", "model": "grok-2-1212", "timeout": 60}
    ]

    if provider in ["deepseek", "mistral", "grok"]:
        # Stable sort: requested provider first, others keep declaration order.
        cloud_providers = sorted(cloud_providers, key=lambda x: 0 if x["name"] == provider else 1)

    # Try cloud providers in order; skip any without a configured API key.
    for cloud in cloud_providers:
        api_key = os.getenv(cloud["api_key_env"])
        if not api_key:
            continue

        try:
            logger.debug(f"Internal LLM trying {cloud['name']}")
            cloud_resp = await http_client.post(
                f"{cloud['base_url']}/v1/chat/completions",
                headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
                json={"model": cloud["model"], "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "stream": False},
                timeout=cloud["timeout"]
            )

            if cloud_resp.status_code == 200:
                data = cloud_resp.json()
                response_text = data.get("choices", [{}])[0].get("message", {}).get("content", "")
                tokens = data.get("usage", {}).get("total_tokens", 0)
                latency = int((time.time() - t0) * 1000)
                logger.info(f"Internal LLM success: {cloud['name']}, {tokens} tokens, {latency}ms")
                return InternalLLMResponse(text=response_text, model=cloud["model"], provider=cloud["name"], tokens_used=tokens, latency_ms=latency)
        except Exception as e:
            logger.warning(f"Internal LLM {cloud['name']} failed: {e}")
            continue

    # Fallback to Ollama on the docker host
    try:
        logger.info("Internal LLM fallback to Ollama")
        ollama_resp = await http_client.post(
            "http://172.18.0.1:11434/api/generate",
            json={"model": "qwen3:8b", "prompt": request.prompt, "system": request.system_prompt or "", "stream": False, "options": {"num_predict": max_tokens, "temperature": temperature}},
            timeout=120.0
        )
        if ollama_resp.status_code == 200:
            data = ollama_resp.json()
            latency = int((time.time() - t0) * 1000)
            return InternalLLMResponse(text=data.get("response", ""), model="qwen3:8b", provider="ollama", tokens_used=0, latency_ms=latency)
    except Exception as e:
        logger.error(f"Internal LLM Ollama failed: {e}")

    raise HTTPException(status_code=503, detail="All LLM providers unavailable")
|
|
|
|
|
|
@app.post("/v1/agents/{agent_id}/infer", response_model=InferResponse)
|
|
async def agent_infer(agent_id: str, request: InferRequest):
|
|
"""
|
|
Route inference request to appropriate backend.
|
|
|
|
Router decides which backend to use based on:
|
|
- Agent configuration (model, capabilities)
|
|
- Request type (text, vision, audio)
|
|
- Backend availability
|
|
|
|
System prompt is fetched from database via city-service API.
|
|
Memory context is retrieved via Memory Retrieval Pipeline v3.0.
|
|
"""
|
|
logger.info(f"🔀 Inference request for agent: {agent_id}")
|
|
logger.info(f"📝 Prompt: {request.prompt[:100]}...")
|
|
|
|
# =========================================================================
|
|
# MEMORY RETRIEVAL (v4.0 - Universal for all agents)
|
|
# =========================================================================
|
|
memory_brief_text = ""
|
|
# Extract metadata once for both retrieval and storage
|
|
metadata = request.metadata or {}
|
|
channel = "telegram" # Default
|
|
chat_id = str(metadata.get("chat_id", ""))
|
|
user_id = str(metadata.get("user_id", "")).replace("tg:", "")
|
|
username = metadata.get("username")
|
|
# Get agent_id from metadata or URL parameter
|
|
request_agent_id = metadata.get("agent_id", agent_id).lower()
|
|
|
|
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval:
|
|
try:
|
|
if chat_id and user_id:
|
|
brief = await memory_retrieval.retrieve(
|
|
channel=channel,
|
|
chat_id=chat_id,
|
|
user_id=user_id,
|
|
agent_id=request_agent_id, # Agent-specific collections
|
|
username=username,
|
|
message=request.prompt
|
|
)
|
|
memory_brief_text = brief.to_text(max_lines=10)
|
|
if memory_brief_text:
|
|
logger.info(f"🧠 Memory brief for {request_agent_id}: {len(memory_brief_text)} chars")
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ Memory retrieval failed for {request_agent_id}: {e}")
|
|
|
|
# Get system prompt from database or config
|
|
system_prompt = request.system_prompt
|
|
|
|
# Debug logging for system prompt
|
|
if system_prompt:
|
|
logger.info(f"📝 Received system_prompt from request: {len(system_prompt)} chars")
|
|
logger.debug(f"System prompt preview: {system_prompt[:200]}...")
|
|
else:
|
|
logger.warning(f"⚠️ No system_prompt in request for agent {agent_id}, trying to load...")
|
|
|
|
if not system_prompt:
|
|
try:
|
|
from prompt_builder import get_agent_system_prompt
|
|
system_prompt = await get_agent_system_prompt(
|
|
agent_id,
|
|
city_service_url=CITY_SERVICE_URL,
|
|
router_config=router_config
|
|
)
|
|
logger.info(f"✅ Loaded system prompt from database for {agent_id}")
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ Could not load prompt from database: {e}")
|
|
# Fallback to config
|
|
agent_config = router_config.get("agents", {}).get(agent_id, {})
|
|
system_prompt = agent_config.get("system_prompt")
|
|
|
|
# Determine which backend to use
|
|
# Use router config to get default model for agent, fallback to qwen3:8b
|
|
agent_config = router_config.get("agents", {}).get(agent_id, {})
|
|
|
|
# =========================================================================
|
|
# CREWAI DECISION: Use orchestration or direct LLM?
|
|
# =========================================================================
|
|
if CREWAI_ROUTING_ENABLED and CREWAI_CLIENT_AVAILABLE:
|
|
try:
|
|
# Get agent CrewAI config from registry (or router_config fallback)
|
|
crewai_cfg = agent_config.get("crewai", {})
|
|
|
|
use_crewai, crewai_reason = should_use_crewai(
|
|
agent_id=agent_id,
|
|
prompt=request.prompt,
|
|
agent_config=agent_config,
|
|
force_crewai=request.metadata.get("force_crewai", False) if request.metadata else False,
|
|
|
|
)
|
|
|
|
logger.info(f"🎭 CrewAI decision for {agent_id}: {use_crewai} ({crewai_reason})")
|
|
|
|
if use_crewai:
|
|
t0 = time.time()
|
|
crew_result = await call_crewai(
|
|
agent_id=agent_id,
|
|
task=request.prompt,
|
|
context={
|
|
"memory_brief": memory_brief_text,
|
|
"system_prompt": system_prompt,
|
|
"metadata": metadata,
|
|
},
|
|
team=crewai_cfg.get("team")
|
|
)
|
|
|
|
latency = time.time() - t0
|
|
|
|
if crew_result.get("success") and crew_result.get("result"):
|
|
logger.info(f"✅ CrewAI success for {agent_id}: {latency:.2f}s")
|
|
|
|
# Store interaction in memory
|
|
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id:
|
|
try:
|
|
await memory_retrieval.store_interaction(
|
|
channel=channel,
|
|
chat_id=chat_id,
|
|
user_id=user_id,
|
|
agent_id=request_agent_id,
|
|
username=username,
|
|
user_message=request.prompt,
|
|
assistant_response=crew_result["result"]
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ Memory storage failed: {e}")
|
|
|
|
return InferResponse(
|
|
response=crew_result["result"],
|
|
model="crewai-" + agent_id,
|
|
provider="crewai",
|
|
tokens_used=0,
|
|
latency_ms=int(latency * 1000)
|
|
)
|
|
else:
|
|
logger.warning(f"⚠️ CrewAI failed, falling back to direct LLM")
|
|
except Exception as e:
|
|
logger.exception(f"❌ CrewAI error: {e}, falling back to direct LLM")
|
|
|
|
default_llm = agent_config.get("default_llm", "qwen3:8b")
|
|
|
|
# Check if there's a routing rule for this agent
|
|
routing_rules = router_config.get("routing", [])
|
|
for rule in routing_rules:
|
|
if rule.get("when", {}).get("agent") == agent_id:
|
|
if "use_llm" in rule:
|
|
default_llm = rule.get("use_llm")
|
|
logger.info(f"🎯 Agent {agent_id} routing to: {default_llm}")
|
|
break
|
|
|
|
# Get LLM profile configuration
|
|
llm_profiles = router_config.get("llm_profiles", {})
|
|
llm_profile = llm_profiles.get(default_llm, {})
|
|
provider = llm_profile.get("provider", "ollama")
|
|
|
|
# Determine model name
|
|
if provider in ["deepseek", "openai", "anthropic", "mistral"]:
|
|
model = llm_profile.get("model", "deepseek-chat")
|
|
else:
|
|
# For local ollama, use swapper model name format
|
|
model = request.model or "qwen3:8b"
|
|
|
|
# =========================================================================
|
|
# VISION PROCESSING (if images present)
|
|
# =========================================================================
|
|
if request.images and len(request.images) > 0:
|
|
logger.info(f"🖼️ Vision request: {len(request.images)} image(s)")
|
|
try:
|
|
# Use Swapper's /vision endpoint (manages model loading)
|
|
vision_payload = {
|
|
"model": "qwen3-vl-8b",
|
|
"prompt": request.prompt,
|
|
"images": request.images, # Swapper handles data URL conversion
|
|
"max_tokens": request.max_tokens or 1024,
|
|
"temperature": request.temperature or 0.7
|
|
}
|
|
|
|
# Add system prompt if available
|
|
if system_prompt:
|
|
if memory_brief_text:
|
|
vision_payload["system"] = f"{system_prompt}\n\n[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}"
|
|
else:
|
|
vision_payload["system"] = system_prompt
|
|
|
|
logger.info(f"🖼️ Sending to Swapper /vision: {SWAPPER_URL}/vision")
|
|
|
|
vision_resp = await http_client.post(
|
|
f"{SWAPPER_URL}/vision",
|
|
json=vision_payload,
|
|
timeout=120.0
|
|
)
|
|
|
|
if vision_resp.status_code == 200:
|
|
vision_data = vision_resp.json()
|
|
full_response = vision_data.get("text", "")
|
|
|
|
# Debug: log full response structure
|
|
logger.info(f"✅ Vision response: {len(full_response)} chars, success={vision_data.get('success')}, keys={list(vision_data.keys())}")
|
|
if not full_response:
|
|
logger.warning(f"⚠️ Empty vision response! Full data: {str(vision_data)[:500]}")
|
|
|
|
# Store vision message in agent-specific memory
|
|
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id and full_response:
|
|
asyncio.create_task(
|
|
memory_retrieval.store_message(
|
|
agent_id=request_agent_id,
|
|
user_id=user_id,
|
|
username=username,
|
|
message_text=f"[Image] {request.prompt}",
|
|
response_text=full_response,
|
|
chat_id=chat_id,
|
|
message_type="vision"
|
|
)
|
|
)
|
|
|
|
return InferResponse(
|
|
response=full_response,
|
|
model="qwen3-vl-8b",
|
|
tokens_used=None,
|
|
backend="swapper-vision"
|
|
)
|
|
else:
|
|
logger.error(f"❌ Swapper vision error: {vision_resp.status_code} - {vision_resp.text[:200]}")
|
|
# Fall through to text processing
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Vision processing failed: {e}", exc_info=True)
|
|
# Fall through to text processing
|
|
|
|
# =========================================================================
|
|
# SMART LLM ROUTER WITH AUTO-FALLBACK
|
|
# Priority: DeepSeek → Mistral → Grok → Local Ollama
|
|
# =========================================================================
|
|
|
|
# Build messages array once for all providers
|
|
messages = []
|
|
if system_prompt:
|
|
if memory_brief_text:
|
|
enhanced_prompt = f"{system_prompt}\n\n[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}"
|
|
messages.append({"role": "system", "content": enhanced_prompt})
|
|
logger.info(f"📝 Added system message with prompt ({len(system_prompt)} chars) + memory ({len(memory_brief_text)} chars)")
|
|
else:
|
|
messages.append({"role": "system", "content": system_prompt})
|
|
logger.info(f"📝 Added system message with prompt ({len(system_prompt)} chars)")
|
|
elif memory_brief_text:
|
|
messages.append({"role": "system", "content": f"[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}"})
|
|
logger.warning(f"⚠️ No system_prompt! Using only memory brief ({len(memory_brief_text)} chars)")
|
|
else:
|
|
logger.error(f"❌ No system_prompt AND no memory_brief! LLM will have no context!")
|
|
|
|
messages.append({"role": "user", "content": request.prompt})
|
|
logger.debug(f"📨 Messages array: {len(messages)} messages, system={len(messages[0].get('content', '')) if messages else 0} chars")
|
|
|
|
max_tokens = request.max_tokens or llm_profile.get("max_tokens", 2048)
|
|
temperature = request.temperature or llm_profile.get("temperature", 0.2)
|
|
|
|
# Define cloud providers with fallback order
|
|
cloud_providers = [
|
|
{
|
|
"name": "deepseek",
|
|
"api_key_env": "DEEPSEEK_API_KEY",
|
|
"base_url": "https://api.deepseek.com",
|
|
"model": "deepseek-chat",
|
|
"timeout": 40
|
|
},
|
|
{
|
|
"name": "mistral",
|
|
"api_key_env": "MISTRAL_API_KEY",
|
|
"base_url": "https://api.mistral.ai",
|
|
"model": "mistral-large-latest",
|
|
"timeout": 60
|
|
},
|
|
{
|
|
"name": "grok",
|
|
"api_key_env": "GROK_API_KEY",
|
|
"base_url": "https://api.x.ai",
|
|
"model": "grok-2-1212",
|
|
"timeout": 60
|
|
}
|
|
]
|
|
|
|
# If specific provider requested, try it first
|
|
if provider in ["deepseek", "mistral", "grok"]:
|
|
# Reorder to put requested provider first
|
|
cloud_providers = sorted(cloud_providers, key=lambda x: 0 if x["name"] == provider else 1)
|
|
|
|
last_error = None
|
|
|
|
# Get tool definitions if Tool Manager is available
|
|
tools_payload = None
|
|
if TOOL_MANAGER_AVAILABLE and tool_manager:
|
|
tools_payload = tool_manager.get_tool_definitions()
|
|
logger.debug(f"🔧 {len(tools_payload)} tools available for function calling")
|
|
|
|
for cloud in cloud_providers:
|
|
api_key = os.getenv(cloud["api_key_env"])
|
|
if not api_key:
|
|
logger.debug(f"⏭️ Skipping {cloud['name']}: API key not configured")
|
|
continue
|
|
|
|
try:
|
|
logger.info(f"🌐 Trying {cloud['name'].upper()} API with model: {cloud['model']}")
|
|
|
|
# Build request payload
|
|
request_payload = {
|
|
"model": cloud["model"],
|
|
"messages": messages,
|
|
"max_tokens": max_tokens,
|
|
"temperature": temperature,
|
|
"stream": False
|
|
}
|
|
|
|
# Add tools for function calling (if available and supported)
|
|
if tools_payload and cloud["name"] in ["deepseek", "mistral", "grok"]:
|
|
request_payload["tools"] = tools_payload
|
|
request_payload["tool_choice"] = "auto"
|
|
|
|
cloud_resp = await http_client.post(
|
|
f"{cloud['base_url']}/v1/chat/completions",
|
|
headers={
|
|
"Authorization": f"Bearer {api_key}",
|
|
"Content-Type": "application/json"
|
|
},
|
|
json=request_payload,
|
|
timeout=cloud["timeout"]
|
|
)
|
|
|
|
if cloud_resp.status_code == 200:
|
|
data = cloud_resp.json()
|
|
choice = data.get("choices", [{}])[0]
|
|
message = choice.get("message", {})
|
|
response_text = message.get("content", "") or ""
|
|
tokens_used = data.get("usage", {}).get("total_tokens", 0)
|
|
|
|
# Initialize tool_results to avoid UnboundLocalError
|
|
tool_results = []
|
|
|
|
# Check for tool calls (standard format)
|
|
tool_calls = message.get("tool_calls", [])
|
|
|
|
# Also check for DSML format in content (DeepSeek sometimes returns this)
|
|
# AGGRESSIVE check - any DSML-like pattern should be caught
|
|
import re
|
|
has_dsml = False
|
|
if response_text:
|
|
# Check for DSML patterns with regex (handles Unicode variations)
|
|
dsml_patterns_check = [
|
|
r'DSML', # Any mention of DSML
|
|
r'function_calls>',
|
|
r'invoke\s*name\s*=',
|
|
r'parameter\s*name\s*=',
|
|
r'<[^>]*invoke[^>]*>',
|
|
r'</[^>]*invoke[^>]*>',
|
|
]
|
|
for pattern in dsml_patterns_check:
|
|
if re.search(pattern, response_text, re.IGNORECASE):
|
|
has_dsml = True
|
|
logger.warning(f"⚠️ DSML detected via pattern: {pattern}")
|
|
break
|
|
|
|
if has_dsml:
|
|
logger.warning("⚠️ Detected DSML format in content, parsing...")
|
|
# Extract tool name and parameters from DSML
|
|
import re
|
|
# Try multiple DSML patterns
|
|
dsml_patterns = [
|
|
r'invoke name="(\w+)".*?parameter name="(\w+)"[^>]*>([^<]+)',
|
|
r'invoke\s+name="(\w+)".*?parameter\s+name="(\w+)"[^>]*>([^<]+)',
|
|
r'name="web_extract".*?url.*?>([^\s<]+)',
|
|
]
|
|
dsml_match = None
|
|
for pattern in dsml_patterns:
|
|
dsml_match = re.search(pattern, response_text, re.DOTALL | re.IGNORECASE)
|
|
if dsml_match:
|
|
break
|
|
|
|
if dsml_match and len(dsml_match.groups()) >= 3:
|
|
tool_name = dsml_match.group(1)
|
|
param_name = dsml_match.group(2)
|
|
param_value = dsml_match.group(3).strip()
|
|
# Create synthetic tool call with Mistral-compatible ID (exactly 9 chars, a-zA-Z0-9)
|
|
import string
|
|
import random
|
|
tool_call_id = ''.join(random.choices(string.ascii_letters + string.digits, k=9))
|
|
tool_calls = [{
|
|
"id": tool_call_id,
|
|
"function": {
|
|
"name": tool_name,
|
|
"arguments": json.dumps({param_name: param_value})
|
|
}
|
|
}]
|
|
logger.info(f"🔧 Parsed DSML tool call: {tool_name}({param_name}={param_value[:50]}...) id={tool_call_id}")
|
|
|
|
# ALWAYS clear DSML content - never show raw DSML to user
|
|
logger.warning(f"🧹 Clearing DSML content from response ({len(response_text)} chars)")
|
|
response_text = ""
|
|
if tool_calls and tool_manager:
|
|
logger.info(f"🔧 LLM requested {len(tool_calls)} tool call(s)")
|
|
|
|
# Execute each tool call
|
|
tool_results = []
|
|
for tc in tool_calls:
|
|
func = tc.get("function", {})
|
|
tool_name = func.get("name", "")
|
|
try:
|
|
tool_args = json.loads(func.get("arguments", "{}"))
|
|
except:
|
|
tool_args = {}
|
|
|
|
result = await tool_manager.execute_tool(tool_name, tool_args, agent_id=request_agent_id)
|
|
tool_result_dict = {
|
|
"tool_call_id": tc.get("id", ""),
|
|
"name": tool_name,
|
|
"success": result.success,
|
|
"result": result.result,
|
|
"error": result.error,
|
|
"image_base64": result.image_base64 # Store image if generated
|
|
}
|
|
if result.image_base64:
|
|
logger.info(f"🖼️ Tool {tool_name} generated image: {len(result.image_base64)} chars")
|
|
tool_results.append(tool_result_dict)
|
|
|
|
# Append tool results to messages and call LLM again
|
|
messages.append({"role": "assistant", "content": None, "tool_calls": tool_calls})
|
|
|
|
for tr in tool_results:
|
|
messages.append({
|
|
"role": "tool",
|
|
"tool_call_id": tr["tool_call_id"],
|
|
"content": str(tr["result"]) if tr["success"] else f"Error: {tr['error']}"
|
|
})
|
|
|
|
# Second call to get final response
|
|
logger.info(f"🔄 Calling LLM again with tool results")
|
|
final_payload = {
|
|
"model": cloud["model"],
|
|
"messages": messages,
|
|
"max_tokens": max_tokens,
|
|
"temperature": temperature,
|
|
"stream": False
|
|
}
|
|
# Don't include tools in second call (some APIs don't support it)
|
|
# Tools are only needed in first call
|
|
|
|
final_resp = await http_client.post(
|
|
f"{cloud['base_url']}/v1/chat/completions",
|
|
headers={
|
|
"Authorization": f"Bearer {api_key}",
|
|
"Content-Type": "application/json"
|
|
},
|
|
json=final_payload,
|
|
timeout=cloud["timeout"]
|
|
)
|
|
|
|
if final_resp.status_code == 200:
|
|
final_data = final_resp.json()
|
|
response_text = final_data.get("choices", [{}])[0].get("message", {}).get("content", "")
|
|
|
|
# CRITICAL: Check for DSML in second response too!
|
|
if response_text and "DSML" in response_text:
|
|
logger.warning(f"🧹 DSML detected in 2nd LLM response, trying 3rd call ({len(response_text)} chars)")
|
|
# Third LLM call: explicitly ask to synthesize tool results
|
|
tool_summary_parts = []
|
|
for tr in tool_results:
|
|
if tr.get("success") and tr.get("result"):
|
|
res_text = str(tr["result"])[:500]
|
|
tool_summary_parts.append(f"Tool '{tr['name']}' returned: {res_text}")
|
|
if tool_summary_parts:
|
|
synthesis_prompt = "Based on the following tool results, provide a helpful response to the user in their language. Do NOT use any markup or XML. Just respond naturally.\n\n" + "\n".join(tool_summary_parts)
|
|
try:
|
|
synth_resp = await http_client.post(
|
|
f"{cloud['base_url']}/v1/chat/completions",
|
|
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
|
|
json={"model": cloud["model"], "messages": [
|
|
{"role": "system", "content": system_prompt or "You are a helpful assistant. Respond naturally."},
|
|
{"role": "user", "content": synthesis_prompt}
|
|
], "max_tokens": max_tokens, "temperature": 0.3, "stream": False},
|
|
timeout=cloud["timeout"]
|
|
)
|
|
if synth_resp.status_code == 200:
|
|
synth_data = synth_resp.json()
|
|
synth_text = synth_data.get("choices", [{}])[0].get("message", {}).get("content", "")
|
|
if synth_text and "DSML" not in synth_text and "invoke" not in synth_text:
|
|
response_text = synth_text
|
|
tokens_used += synth_data.get("usage", {}).get("total_tokens", 0)
|
|
logger.info("\u2705 3rd LLM call synthesized clean response from tool results")
|
|
else:
|
|
response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected")
|
|
else:
|
|
response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected")
|
|
except Exception as synth_err:
|
|
logger.warning(f"3rd LLM call failed: {synth_err}")
|
|
response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected")
|
|
else:
|
|
response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected")
|
|
|
|
if not response_text:
|
|
logger.warning(f"⚠️ {cloud['name'].upper()} returned empty response after tool call")
|
|
# Fallback to tool result summary
|
|
response_text = format_tool_calls_for_response(tool_results, fallback_mode="empty_response")
|
|
tokens_used += final_data.get("usage", {}).get("total_tokens", 0)
|
|
else:
|
|
logger.error(f"❌ {cloud['name'].upper()} second call failed: {final_resp.status_code} - {final_resp.text[:200]}")
|
|
# Fallback to tool result summary
|
|
response_text = format_tool_calls_for_response(tool_results, fallback_mode="empty_response")
|
|
|
|
if response_text:
|
|
# FINAL DSML check before returning - never show DSML to user
|
|
if "DSML" in response_text or "invoke name=" in response_text or "function_calls>" in response_text:
|
|
logger.warning(f"🧹 DSML in final response! Replacing with fallback ({len(response_text)} chars)")
|
|
# Use dsml_detected mode - LLM confused, just acknowledge presence
|
|
response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected")
|
|
|
|
# Check if any tool generated an image
|
|
generated_image = None
|
|
logger.debug(f"🔍 Checking {len(tool_results)} tool results for images...")
|
|
for tr in tool_results:
|
|
img_b64 = tr.get("image_base64")
|
|
if img_b64:
|
|
generated_image = img_b64
|
|
logger.info(f"🖼️ Image generated by tool: {tr['name']} ({len(img_b64)} chars)")
|
|
break
|
|
else:
|
|
logger.debug(f" Tool {tr['name']}: no image_base64")
|
|
|
|
logger.info(f"✅ {cloud['name'].upper()} response received, {tokens_used} tokens")
|
|
|
|
# Store message in agent-specific memory (async, non-blocking)
|
|
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id:
|
|
asyncio.create_task(
|
|
memory_retrieval.store_message(
|
|
agent_id=request_agent_id,
|
|
user_id=user_id,
|
|
username=username,
|
|
message_text=request.prompt,
|
|
response_text=response_text,
|
|
chat_id=chat_id
|
|
)
|
|
)
|
|
|
|
return InferResponse(
|
|
response=response_text,
|
|
model=cloud["model"],
|
|
tokens_used=tokens_used,
|
|
backend=f"{cloud['name']}-cloud",
|
|
image_base64=generated_image
|
|
)
|
|
else:
|
|
logger.warning(f"⚠️ {cloud['name'].upper()} returned empty response, trying next provider")
|
|
continue
|
|
else:
|
|
logger.warning(f"⚠️ {cloud['name'].upper()} returned {cloud_resp.status_code}, trying next...")
|
|
last_error = f"{cloud['name']}: {cloud_resp.status_code}"
|
|
continue
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
error_details = traceback.format_exc()
|
|
logger.warning(f"⚠️ {cloud['name'].upper()} failed: {e}")
|
|
if not str(e).strip(): # Empty error string
|
|
logger.error(f"❌ {cloud['name'].upper()} failed with empty error! Check traceback:")
|
|
logger.error(error_details)
|
|
else:
|
|
logger.debug(f"Full error traceback: {error_details}")
|
|
last_error = f"{cloud['name']}: {str(e)}"
|
|
continue
|
|
|
|
# If all cloud providers failed, log and fall through to local
|
|
if last_error:
|
|
logger.warning(f"⚠️ All cloud providers failed ({last_error}), falling back to local Ollama")
|
|
|
|
# =========================================================================
|
|
# LOCAL PROVIDERS (Ollama via Swapper)
|
|
# =========================================================================
|
|
# Determine local model from config (not hardcoded)
|
|
# Strategy: Use agent's default_llm if it's local (ollama), otherwise find first local model
|
|
local_model = None
|
|
|
|
# Check if default_llm is local
|
|
if llm_profile.get("provider") == "ollama":
|
|
# Extract model name and convert format (qwen3:8b → qwen3:8b for Swapper)
|
|
ollama_model = llm_profile.get("model", "qwen3:8b")
|
|
local_model = ollama_model.replace(":", "-") # qwen3:8b → qwen3:8b
|
|
logger.debug(f"✅ Using agent's default local model: {local_model}")
|
|
else:
|
|
# Find first local model from config
|
|
for profile_name, profile in llm_profiles.items():
|
|
if profile.get("provider") == "ollama":
|
|
ollama_model = profile.get("model", "qwen3:8b")
|
|
local_model = ollama_model.replace(":", "-")
|
|
logger.info(f"🔄 Found fallback local model: {local_model} from profile {profile_name}")
|
|
break
|
|
|
|
# Final fallback if no local model found
|
|
if not local_model:
|
|
local_model = "qwen3:8b"
|
|
logger.warning(f"⚠️ No local model in config, using hardcoded fallback: {local_model}")
|
|
|
|
try:
|
|
# Check if Swapper is available
|
|
health_resp = await http_client.get(f"{SWAPPER_URL}/health", timeout=5.0)
|
|
if health_resp.status_code == 200:
|
|
logger.info(f"📡 Calling Swapper with local model: {local_model}")
|
|
# Generate response via Swapper (which handles model loading)
|
|
generate_resp = await http_client.post(
|
|
f"{SWAPPER_URL}/generate",
|
|
json={
|
|
"model": local_model,
|
|
"prompt": request.prompt,
|
|
"system": system_prompt,
|
|
"max_tokens": request.max_tokens,
|
|
"temperature": request.temperature,
|
|
"stream": False
|
|
},
|
|
timeout=300.0
|
|
)
|
|
|
|
if generate_resp.status_code == 200:
|
|
data = generate_resp.json()
|
|
local_response = data.get("response", "")
|
|
|
|
# Store in agent-specific memory
|
|
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id and local_response:
|
|
asyncio.create_task(
|
|
memory_retrieval.store_message(
|
|
agent_id=request_agent_id,
|
|
user_id=user_id,
|
|
username=username,
|
|
message_text=request.prompt,
|
|
response_text=local_response,
|
|
chat_id=chat_id
|
|
)
|
|
)
|
|
|
|
return InferResponse(
|
|
response=local_response,
|
|
model=local_model,
|
|
tokens_used=data.get("eval_count", 0),
|
|
backend="swapper+ollama"
|
|
)
|
|
else:
|
|
logger.error(f"❌ Swapper error: {generate_resp.status_code} - {generate_resp.text}")
|
|
except Exception as e:
|
|
logger.error(f"❌ Swapper/Ollama error: {e}")
|
|
# Fallback to direct Ollama if Swapper fails
|
|
try:
|
|
logger.info(f"🔄 Falling back to direct Ollama connection")
|
|
generate_resp = await http_client.post(
|
|
f"{VISION_URL}/api/generate",
|
|
json={
|
|
"model": "qwen3:8b", # Use actual Ollama model name
|
|
"prompt": request.prompt,
|
|
"system": system_prompt,
|
|
"stream": False,
|
|
"options": {
|
|
"num_predict": request.max_tokens,
|
|
"temperature": request.temperature
|
|
}
|
|
},
|
|
timeout=120.0
|
|
)
|
|
|
|
if generate_resp.status_code == 200:
|
|
data = generate_resp.json()
|
|
return InferResponse(
|
|
response=data.get("response", ""),
|
|
model=model,
|
|
tokens_used=data.get("eval_count", 0),
|
|
backend="ollama-direct"
|
|
)
|
|
except Exception as e2:
|
|
logger.error(f"❌ Direct Ollama fallback also failed: {e2}")
|
|
|
|
# Fallback: return error
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail=f"No backend available for model: {model}"
|
|
)
|
|
|
|
|
|
@app.get("/v1/models")
async def list_available_models():
    """Aggregate the model catalog across backends.

    Queries the Swapper first, then Ollama, skipping Ollama entries whose
    id was already reported by the Swapper. Backends that cannot be reached
    are logged and skipped rather than failing the request.

    Returns:
        Dict with "models" (list of {id, backend, size_gb, status}) and "total".
    """
    catalog: List[Dict[str, Any]] = []

    # Swapper-managed models first.
    try:
        swapper_resp = await http_client.get(f"{SWAPPER_URL}/models", timeout=5.0)
        if swapper_resp.status_code == 200:
            for entry in swapper_resp.json().get("models", []):
                catalog.append({
                    "id": entry.get("name"),
                    "backend": "swapper",
                    "size_gb": entry.get("size_gb"),
                    "status": entry.get("status", "available")
                })
    except Exception as e:
        logger.warning(f"Cannot get Swapper models: {e}")

    # Ollama models, deduplicated against what is already in the catalog.
    try:
        ollama_resp = await http_client.get(f"{VISION_URL}/api/tags", timeout=5.0)
        if ollama_resp.status_code == 200:
            known_ids = {item.get("id") for item in catalog}
            for entry in ollama_resp.json().get("models", []):
                model_name = entry.get("name")
                if model_name in known_ids:
                    continue
                known_ids.add(model_name)
                catalog.append({
                    "id": model_name,
                    "backend": "ollama",
                    # Ollama reports size in bytes; convert to GB.
                    "size_gb": round(entry.get("size", 0) / 1e9, 1),
                    "status": "loaded"
                })
    except Exception as e:
        logger.warning(f"Cannot get Ollama models: {e}")

    return {"models": catalog, "total": len(catalog)}
|
|
|
|
|
|
# =============================================================================
|
|
# NEO4J GRAPH API ENDPOINTS
|
|
# =============================================================================
|
|
|
|
class GraphNode(BaseModel):
    """Request body for creating or updating a node in the knowledge graph.

    Used by POST /v1/graph/nodes, which MERGEs on ``node_id`` — posting the
    same id again updates the existing node's properties in place.
    """
    label: str  # Node type: User, Agent, Topic, Fact, Entity, etc. (used as the Neo4j label)
    properties: Dict[str, Any]  # Arbitrary key/value properties stored on the node
    node_id: Optional[str] = None  # If provided, update existing node; otherwise the endpoint generates a random id
|
|
|
|
class GraphRelationship(BaseModel):
    """Request body for creating a relationship between two existing nodes.

    Used by POST /v1/graph/relationships; both endpoint nodes are matched by
    their ``node_id`` property and the relationship is MERGEd (idempotent).
    """
    from_node_id: str  # node_id of the source node
    to_node_id: str  # node_id of the target node
    relationship_type: str  # KNOWS, MENTIONED, RELATED_TO, CREATED_BY, etc. (used as the Neo4j relationship type)
    properties: Optional[Dict[str, Any]] = None  # Optional properties set on the relationship
|
|
|
|
class GraphQuery(BaseModel):
    """Request body for POST /v1/graph/query.

    One query mode is used, in priority order: a raw read-only ``cypher``
    string, a single-node ``node_id`` lookup, or a ``node_label`` listing;
    with none set, the endpoint returns the most recently updated nodes.
    """
    cypher: Optional[str] = None  # Direct Cypher query (advanced; must be read-only, checked by the endpoint)
    # Or use structured query:
    node_label: Optional[str] = None  # List recent nodes carrying this label
    node_id: Optional[str] = None  # Fetch one node plus its direct connections
    relationship_type: Optional[str] = None  # NOTE(review): not used by the current /v1/graph/query handler
    depth: int = 1  # How many hops to traverse — NOTE(review): not used by the current handler
    limit: int = 50  # Max rows for list-style queries
|
|
|
|
class GraphSearchRequest(BaseModel):
    """Natural language search in graph.

    NOTE(review): GET /v1/graph/search takes plain query parameters
    (q, types, limit); this model appears unused by the handlers visible
    here — confirm against other callers before removing.
    """
    query: str  # Text to search for
    entity_types: Optional[List[str]] = None  # Filter by types (node labels)
    limit: int = 20  # Max results
|
|
|
|
|
|
@app.post("/v1/graph/nodes")
async def create_graph_node(node: GraphNode):
    """Create or update a node in the knowledge graph.

    MERGEs on ``node_id`` (generated when not supplied), so posting the
    same id again updates the existing node's properties in place and
    refreshes ``updated_at``.

    Raises:
        HTTPException 400: label is not an identifier-safe string.
        HTTPException 503: Neo4j is not available.
        HTTPException 500: the MERGE returned nothing or the driver raised.
    """
    if not neo4j_available or not neo4j_driver:
        raise HTTPException(status_code=503, detail="Neo4j not available")

    # Labels cannot be parameterized in Cypher, so the label is interpolated
    # into the query text. Restrict it to identifier-safe strings to prevent
    # Cypher injection via the request body.
    if not node.label.isidentifier():
        raise HTTPException(status_code=400, detail=f"Invalid node label: {node.label}")

    try:
        async with neo4j_driver.session() as session:
            # Generate a node_id if the caller did not provide one.
            node_id = node.node_id or f"{node.label.lower()}_{os.urandom(8).hex()}"

            # Create or merge node; updated_at is set server-side via datetime().
            # (Removed a dead local `props` dict that was built but never used.)
            cypher = f"""
            MERGE (n:{node.label} {{node_id: $node_id}})
            SET n += $properties
            SET n.updated_at = datetime()
            RETURN n
            """

            result = await session.run(cypher, node_id=node_id, properties=node.properties)
            record = await result.single()

            if record:
                created_node = dict(record["n"])
                logger.info(f"📊 Created/updated node: {node.label} - {node_id}")
                return {"status": "ok", "node_id": node_id, "node": created_node}

        raise HTTPException(status_code=500, detail="Failed to create node")

    except HTTPException:
        # Let deliberate HTTP errors (400/500 above) propagate unchanged
        # instead of being rewrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"❌ Neo4j error creating node: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/v1/graph/relationships")
async def create_graph_relationship(rel: GraphRelationship):
    """Create (MERGE) a relationship between two existing nodes.

    Both endpoints are matched by their ``node_id`` property; the MERGE is
    idempotent and ``created_at`` is refreshed on every call.

    Raises:
        HTTPException 400: relationship type is not identifier-safe.
        HTTPException 404: one or both nodes not found.
        HTTPException 503: Neo4j is not available.
        HTTPException 500: driver error.
    """
    if not neo4j_available or not neo4j_driver:
        raise HTTPException(status_code=503, detail="Neo4j not available")

    # Relationship types cannot be parameterized in Cypher; restrict to
    # identifier-safe strings to prevent Cypher injection.
    if not rel.relationship_type.isidentifier():
        raise HTTPException(status_code=400, detail=f"Invalid relationship type: {rel.relationship_type}")

    try:
        async with neo4j_driver.session() as session:
            props_clause = ""
            if rel.properties:
                props_clause = " SET r += $properties"

            cypher = f"""
            MATCH (a {{node_id: $from_id}})
            MATCH (b {{node_id: $to_id}})
            MERGE (a)-[r:{rel.relationship_type}]->(b)
            {props_clause}
            SET r.created_at = datetime()
            RETURN a.node_id as from_id, b.node_id as to_id, type(r) as rel_type
            """

            result = await session.run(
                cypher,
                from_id=rel.from_node_id,
                to_id=rel.to_node_id,
                properties=rel.properties or {}
            )
            record = await result.single()

            if record:
                logger.info(f"🔗 Created relationship: {rel.from_node_id} -[{rel.relationship_type}]-> {rel.to_node_id}")
                return {
                    "status": "ok",
                    "from_id": record["from_id"],
                    "to_id": record["to_id"],
                    "relationship_type": record["rel_type"]
                }

        raise HTTPException(status_code=404, detail="One or both nodes not found")

    except HTTPException:
        # Previously the 404 above was caught by the generic handler and
        # rewrapped as a 500; propagate deliberate HTTP errors unchanged.
        raise
    except Exception as e:
        logger.error(f"❌ Neo4j error creating relationship: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/v1/graph/query")
async def query_graph(query: GraphQuery):
    """Query the knowledge graph.

    Modes, in priority order:
      1. ``query.cypher``  — run a caller-supplied read-only Cypher query.
      2. ``query.node_id`` — fetch one node plus its direct connections.
      3. ``query.node_label`` — list recent nodes with that label.
    With none set, returns the most recently updated nodes of any label.

    Raises:
        HTTPException 400: write-keyword Cypher supplied, or invalid label.
        HTTPException 503: Neo4j is not available.
        HTTPException 500: driver error.
    """
    import re  # local import: only needed for the read-only keyword check

    if not neo4j_available or not neo4j_driver:
        raise HTTPException(status_code=503, detail="Neo4j not available")

    try:
        async with neo4j_driver.session() as session:
            # If direct Cypher provided, use it (with safety check)
            if query.cypher:
                # Basic safety: only allow read queries. Match whole words so
                # identifiers such as `created_at` (contains "CREATE") or
                # `offset` (contains "SET") are not falsely rejected, as the
                # previous substring check did.
                if re.search(r"\b(DELETE|REMOVE|DROP|CREATE|MERGE|SET)\b", query.cypher, re.IGNORECASE):
                    raise HTTPException(status_code=400, detail="Only read queries allowed via cypher parameter")

                result = await session.run(query.cypher)
                records = await result.data()
                return {"status": "ok", "results": records, "count": len(records)}

            # Build structured query
            if query.node_id:
                # Get specific node with relationships
                cypher = """
                MATCH (n {node_id: $node_id})
                OPTIONAL MATCH (n)-[r]-(related)
                RETURN n, collect({rel: type(r), node: related}) as connections
                LIMIT 1
                """
                result = await session.run(cypher, node_id=query.node_id)

            elif query.node_label:
                # Labels cannot be parameterized in Cypher; validate before
                # interpolating to prevent injection.
                if not query.node_label.isidentifier():
                    raise HTTPException(status_code=400, detail=f"Invalid node label: {query.node_label}")
                cypher = f"""
                MATCH (n:{query.node_label})
                RETURN n
                ORDER BY n.updated_at DESC
                LIMIT $limit
                """
                result = await session.run(cypher, limit=query.limit)

            else:
                # Get recent nodes
                cypher = """
                MATCH (n)
                RETURN n, labels(n) as labels
                ORDER BY n.updated_at DESC
                LIMIT $limit
                """
                result = await session.run(cypher, limit=query.limit)

            records = await result.data()
            return {"status": "ok", "results": records, "count": len(records)}

    except HTTPException:
        # Propagate the 400s above unchanged instead of rewrapping as 500.
        raise
    except Exception as e:
        logger.error(f"❌ Neo4j query error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.get("/v1/graph/search")
async def search_graph(q: str, types: Optional[str] = None, limit: int = 20):
    """Search nodes by substring match on common text properties.

    Args:
        q: substring to look for (case-sensitive Cypher CONTAINS).
        types: optional comma-separated node labels to restrict the search to.
        limit: maximum number of nodes returned.

    Raises:
        HTTPException 400: a label in ``types`` is not identifier-safe.
        HTTPException 503: Neo4j is not available.
        HTTPException 500: driver error.
    """
    if not neo4j_available or not neo4j_driver:
        raise HTTPException(status_code=503, detail="Neo4j not available")

    try:
        async with neo4j_driver.session() as session:
            # Build label filter. Labels cannot be parameterized in Cypher,
            # so validate each one before interpolating to prevent injection
            # via the `types` query parameter.
            label_filter = ""
            if types:
                labels = [t.strip() for t in types.split(",")]
                for lbl in labels:
                    if not lbl.isidentifier():
                        raise HTTPException(status_code=400, detail=f"Invalid label: {lbl}")
                label_filter = " AND (" + " OR ".join([f"n:{lbl}" for lbl in labels]) + ")"

            # Search in common text properties
            cypher = f"""
            MATCH (n)
            WHERE (
                n.name CONTAINS $query OR
                n.title CONTAINS $query OR
                n.text CONTAINS $query OR
                n.description CONTAINS $query OR
                n.content CONTAINS $query
            ){label_filter}
            RETURN n, labels(n) as labels
            ORDER BY n.updated_at DESC
            LIMIT $limit
            """

            result = await session.run(cypher, query=q, limit=limit)
            records = await result.data()

            return {"status": "ok", "query": q, "results": records, "count": len(records)}

    except HTTPException:
        # Propagate the 400 above unchanged instead of rewrapping as 500.
        raise
    except Exception as e:
        logger.error(f"❌ Neo4j search error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.get("/v1/graph/stats")
async def get_graph_stats():
    """Return knowledge graph statistics.

    Totals for nodes and relationships plus per-label node counts and
    per-type relationship counts. Tries an APOC-based label count first and
    falls back to a plain aggregation if APOC is not installed.

    Raises:
        HTTPException 503: Neo4j is not available.
        HTTPException 500: driver error.
    """
    if not neo4j_available or not neo4j_driver:
        raise HTTPException(status_code=503, detail="Neo4j not available")

    try:
        async with neo4j_driver.session() as session:
            # Get node counts by label (APOC fast path).
            labels_result = await session.run("""
                CALL db.labels() YIELD label
                CALL apoc.cypher.run('MATCH (n:`' + label + '`) RETURN count(n) as count', {}) YIELD value
                RETURN label, value.count as count
            """)

            # The APOC error only surfaces when results are consumed, so the
            # fallback triggers on .data(). Narrowed from a bare `except:` so
            # cancellation/system-exit are not swallowed.
            try:
                labels_data = await labels_result.data()
            except Exception:
                labels_result = await session.run("""
                    MATCH (n)
                    RETURN labels(n)[0] as label, count(*) as count
                    ORDER BY count DESC
                """)
                labels_data = await labels_result.data()

            # Get relationship counts
            rels_result = await session.run("""
                MATCH ()-[r]->()
                RETURN type(r) as type, count(*) as count
                ORDER BY count DESC
            """)
            rels_data = await rels_result.data()

            # Get total counts
            total_result = await session.run("""
                MATCH (n) RETURN count(n) as nodes
            """)
            total_nodes = (await total_result.single())["nodes"]

            total_rels_result = await session.run("""
                MATCH ()-[r]->() RETURN count(r) as relationships
            """)
            total_rels = (await total_rels_result.single())["relationships"]

            return {
                "status": "ok",
                "total_nodes": total_nodes,
                "total_relationships": total_rels,
                "nodes_by_label": labels_data,
                "relationships_by_type": rels_data
            }

    except Exception as e:
        logger.error(f"❌ Neo4j stats error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup connections on shutdown.

    Each resource is closed in its own try/except so a failure closing one
    connection no longer prevents the remaining ones from being released
    (previously an error from neo4j_driver.close() would leak the HTTP
    client and NATS connection).
    """
    global neo4j_driver, http_client, nc

    # Close Memory Retrieval
    if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval:
        try:
            await memory_retrieval.close()
            logger.info("🔌 Memory Retrieval closed")
        except Exception as e:
            logger.warning(f"⚠️ Memory Retrieval close error: {e}")

    if neo4j_driver:
        try:
            await neo4j_driver.close()
            logger.info("🔌 Neo4j connection closed")
        except Exception as e:
            logger.warning(f"⚠️ Neo4j close error: {e}")

    if http_client:
        try:
            await http_client.aclose()
            logger.info("🔌 HTTP client closed")
        except Exception as e:
            logger.warning(f"⚠️ HTTP client close error: {e}")

    if nc:
        try:
            await nc.close()
            logger.info("🔌 NATS connection closed")
        except Exception as e:
            logger.warning(f"⚠️ NATS close error: {e}")
|
|
|