Files
microdao-daarion/services/router/main.py
Apple 89c3f2ac66 P1: NCS-first model selection + NATS capabilities + Grok 4.1
Router model selection:
- New model_select.py: resolve_effective_profile → profile_requirements →
  select_best_model pipeline. NCS-first with graceful static fallback.
- selection_policies in router-config.node2.yml defines a prefer order per
  profile without hardcoding models (e.g. local_default_coder prefers
  qwen3:14b, then qwen3.5:35b-a3b); see the sketch after this list.
- Cloud profiles (cloud_grok, cloud_deepseek) skip NCS; on cloud failure
  use fallback_profile via NCS for local selection.
- Structured logs: selected_profile, required_type, runtime, model,
  caps_age_s, fallback_reason on every infer request.
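
A minimal sketch of one such policy entry, assuming the keys shown above
(only `prefer` and the two model names come from this commit; the surrounding
schema is an assumption, not copied from router-config.node2.yml):

    selection_policies:
      local_default_coder:
        prefer:
          - qwen3:14b
          - qwen3.5:35b-a3b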

Grok model fix:
- grok-2-1212 no longer exists on xAI API → updated to
  grok-4-1-fast-reasoning across all 3 hardcoded locations in main.py
  and router-config.node2.yml.

NCS NATS request/reply:
- node-capabilities subscribes to node.noda2.capabilities.get (NATS
  request/reply). Enabled via ENABLE_NATS_CAPS=true in compose.
- NODA1 router can query NODA2 capabilities over the NATS leafnode without
  HTTP connectivity; see the sketch below.
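
A minimal nats-py sketch of the NODA1-side query, assuming the subject above
replies with the capabilities JSON; the 2s timeout and empty-JSON payload are
assumptions:

    import asyncio
    import json
    import nats

    async def fetch_noda2_caps() -> dict:
        nc = await nats.connect("nats://nats:4222")
        try:
            # Request/reply over the leafnode link; no HTTP path needed.
            msg = await nc.request("node.noda2.capabilities.get", b"{}", timeout=2.0)
            return json.loads(msg.data)
        finally:
            await nc.close()

    caps = asyncio.run(fetch_noda2_caps())
    print(caps.get("served_count"))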

Verified:
- NCS: 14 served models from Ollama+Swapper+llama-server
- NATS: request/reply returns full capabilities JSON
- Sofiia: cloud_grok → grok-4-1-fast-reasoning (tested, 200 OK)
- Helion: NCS → qwen3:14b via Ollama (caps_age=23.7s cache hit)
- Router health: ok

Made-with: Cursor
2026-02-27 02:17:34 -08:00

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import Response
from pydantic import BaseModel, ConfigDict
from typing import Literal, Optional, Dict, Any, List
import asyncio
import json
import os
import re
import yaml
import httpx
import logging
import hashlib
import time # For latency metrics
# CrewAI Integration
try:
    from crewai_client import should_use_crewai, call_crewai, get_crewai_health
    CREWAI_CLIENT_AVAILABLE = True
except ImportError:
    CREWAI_CLIENT_AVAILABLE = False
    should_use_crewai = None
    call_crewai = None
from neo4j import AsyncGraphDatabase
# Memory Retrieval Pipeline v3.0
try:
    from memory_retrieval import memory_retrieval, MemoryBrief
    MEMORY_RETRIEVAL_AVAILABLE = True
except ImportError:
    MEMORY_RETRIEVAL_AVAILABLE = False
    memory_retrieval = None
# Tool Manager for Function Calling
try:
    from tool_manager import ToolManager, ToolResult, format_tool_calls_for_response
    TOOL_MANAGER_AVAILABLE = True
except ImportError:
    TOOL_MANAGER_AVAILABLE = False
    ToolManager = None
# Runtime Guard (Envelope/Artifact validation for CLAN orchestration)
try:
    from runtime_guard import RuntimeGuard
    RUNTIME_GUARD_AVAILABLE = True
except ImportError:
    RUNTIME_GUARD_AVAILABLE = False
    RuntimeGuard = None
# NCS-first model selection
try:
    import capabilities_client
    from model_select import select_model_for_agent, ModelSelection, CLOUD_PROVIDERS as NCS_CLOUD_PROVIDERS
    NCS_AVAILABLE = True
except ImportError:
    NCS_AVAILABLE = False
    capabilities_client = None  # type: ignore[assignment]
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
TRUSTED_DOMAINS_CONFIG_PATH = os.getenv("TRUSTED_DOMAINS_CONFIG_PATH", "./trusted_domains.yml")
_trusted_domains_cache: Dict[str, Any] = {"mtime": None, "data": {}}
def _load_trusted_domains_overrides() -> Dict[str, Any]:
    """Load optional trusted domains overrides editable by mentors."""
    global _trusted_domains_cache
    try:
        if not os.path.exists(TRUSTED_DOMAINS_CONFIG_PATH):
            return {}
        mtime = os.path.getmtime(TRUSTED_DOMAINS_CONFIG_PATH)
        if _trusted_domains_cache.get("mtime") == mtime:
            return _trusted_domains_cache.get("data") or {}
        with open(TRUSTED_DOMAINS_CONFIG_PATH, "r", encoding="utf-8") as f:
            raw = yaml.safe_load(f) or {}
        if not isinstance(raw, dict):
            raw = {}
        _trusted_domains_cache = {"mtime": mtime, "data": raw}
        return raw
    except Exception as e:
        logger.warning(f"⚠️ Failed to load trusted domains overrides: {e}")
        return {}
def _strip_dsml_keep_text_before(text: str) -> str:
    """If response contains DSML, return only the part before the first DSML-like tag. Otherwise return empty (caller will use fallback)."""
    if not text or len(text.strip()) < 10:
        return ""
    # Find first occurrence of DSML-like patterns (tag or keyword that starts markup)
    dsml_start_patterns = [
        r"<function_calls",
        r"<invoke\s",
        r"<parameter\s",
        r"<think>",
        # DSML variants (ASCII and Unicode separators, e.g. <DSMLinvoke ...>)
        r"<\s*(?:\||)?\s*DSML",
        r"DSML\s*(?:\||)",
        r"DSML\s*>\s*",
    ]
    earliest = len(text)
    for pat in dsml_start_patterns:
        m = re.search(pat, text, re.IGNORECASE | re.DOTALL)
        if m:
            earliest = min(earliest, m.start())
    if earliest == 0:
        return ""
    prefix = text[:earliest].strip()
    # Remove trailing incomplete tags
    prefix = re.sub(r"<[^>]*$", "", prefix).strip()
    return prefix if len(prefix) > 30 else ""
def _vision_prompt_wants_web(prompt: str) -> bool:
    if not prompt:
        return False
    p = prompt.lower()
    markers = [
        "знайди", "пошукай", "пошук", "в інтернет", "в інтернеті", "у відкритих джерелах",
        "що це", "що на фото", "який це", "яка це", "identify", "find online", "search web",
        "назва", "бренд", "виробник", "інструкція", "дозування", "регламент", "де купити", "ціна",
    ]
    return any(m in p for m in markers)

def _vision_answer_uncertain(answer: str) -> bool:
    if not answer:
        return True
    a = answer.lower()
    uncertain_markers = [
        "ймовірно", "можливо", "схоже", "не впевнений", "не можу визначити", "важко сказати",
        "probably", "maybe", "looks like", "not sure", "cannot identify"
    ]
    return any(m in a for m in uncertain_markers)
EMPTY_ANSWER_GUARD_AGENTS = {"devtools", "monitor"}
def _normalize_text_response(text: str) -> str:
    return re.sub(r"\s+", " ", str(text or "")).strip()

def _needs_empty_answer_recovery(text: str) -> bool:
    normalized = _normalize_text_response(text)
    if not normalized:
        return True
    low = normalized.lower()
    if len(normalized) < 8:
        return True
    meta_markers = (
        "the user", "user asked", "i need", "let me", "analysis", "thinking",
        "користувач", "потрібно", "спочатку", "сначала"
    )
    if any(m in low for m in meta_markers) and len(normalized) < 80:
        return True
    if normalized in {"...", "ok", "done"}:
        return True
    return False

def _image_response_needs_retry(text: str) -> bool:
    normalized = _normalize_text_response(text)
    if _needs_empty_answer_recovery(normalized):
        return True
    low = normalized.lower()
    blocked_markers = (
        "не можу бачити", "не можу аналізувати зображення", "опишіть фото словами",
        "cannot view images", "cannot analyze image", "as a text model"
    )
    if any(m in low for m in blocked_markers):
        return True
    return len(normalized) < 24

def _vision_response_is_blurry(text: str) -> bool:
    low = _normalize_text_response(text).lower()
    if not low:
        return False
    blurry_markers = (
        "розмит", "нечітк", "не дуже чітк", "blur", "blurry", "out of focus", "low quality"
    )
    return any(m in low for m in blurry_markers)
def _build_image_fallback_response(agent_id: str, prompt: str = "") -> str:
    if (agent_id or "").lower() == "agromatrix":
        return (
            "Фото поки занадто нечітке, тому діагноз неточний. "
            "Надішли, будь ласка, 2-3 чіткі фото: загальний вигляд рослини, крупний план проблемної ділянки "
            "і (для листка) нижній бік. Якщо можеш, додай культуру та стадію росту."
        )
    return "Я поки не бачу достатньо деталей на фото. Надішли, будь ласка, чіткіше фото або крупний план об'єкта."

def _sanitize_vision_text_for_user(text: str) -> str:
    if not text:
        return ""
    normalized = re.sub(r"\s+", " ", str(text)).strip()
    if not normalized:
        return ""
    sentences = [seg.strip() for seg in re.split(r"(?<=[.!?])\s+", normalized) if seg.strip()]
    meta_markers = (
        "okay", "the user", "user sent", "they want", "i need", "let me", "i will",
        "first, look at the image", "look at the image", "first, analyze",
        "first, looking at the image", "looking at the image",
        "хорошо", "користувач", "пользователь", "потрібно", "нужно", "спочатку", "сначала"
    )
    cleaned = [sent for sent in sentences if not any(m in sent.lower() for m in meta_markers)]
    if cleaned:
        out = " ".join(cleaned[:3]).strip()
    else:
        # If text is only meta-reasoning, prefer empty over leaking service text to user.
        if any(m in normalized.lower() for m in meta_markers):
            return ""
        out = " ".join(sentences[:3]).strip()
    if len(out) > 700:
        out = out[:700].rsplit(" ", 1)[0] + "..."
    return out
def _extract_vision_search_facts(text: str, max_chars: int = 220) -> str:
    fact = _sanitize_vision_text_for_user(text)
    # If sanitizer dropped everything (meta-only), try to recover object phrase.
    if not fact and text:
        raw = re.sub(r"\s+", " ", str(text)).strip()
        raw = re.sub(r"(?i)^okay,?\s*", "", raw)
        raw = re.sub(r"(?i)^let\'s\s+see\.?\s*", "", raw)
        raw = re.sub(r"(?i)^the user sent (an image|a photo|a picture) of\s+", "", raw)
        raw = re.sub(r"(?i)^user sent (an image|a photo|a picture) of\s+", "", raw)
        raw = re.sub(r"(?i)^an image of\s+", "", raw)
        raw = re.sub(r"(?i)they want.*$", "", raw).strip(" .")
        fact = raw
    if not fact:
        return ""
    fact = re.sub(r"(?i)джерела\s*:\s*.*$", "", fact).strip()
    fact = re.sub(r"[*_`#\[\]()]", "", fact)
    fact = re.sub(r"\s{2,}", " ", fact).strip(" .,")
    if len(fact) > max_chars:
        fact = fact[:max_chars].rsplit(" ", 1)[0]
    return fact
def _build_vision_web_query(prompt: str, vision_text: str) -> str:
    # Keep query compact and deterministic for web_search tool.
    source_intent = any(k in (prompt or "").lower() for k in ("джерел", "підтвердж", "source", "reference"))
    prompt_part = (prompt or "").strip()
    # Remove generic question wrappers that pollute search quality.
    prompt_part = re.sub(r"(?i)що\s*це\s*на\s*фото\??", "", prompt_part).strip(" .")
    prompt_part = re.sub(r"(?i)дай\s*2-?3\s*джерела", "", prompt_part).strip(" .")
    prompt_part = re.sub(r"(?i)дай\s*\d+\s*джерел[а-я]*\s*для", "", prompt_part).strip(" .")
    prompt_part = re.sub(r"(?i)дай\s*\d+\s*джерел[а-я]*", "", prompt_part).strip(" .")
    prompt_part = re.sub(r"(?i)знайди\s*в\s*інтернеті", "", prompt_part).strip(" .")
    prompt_part = re.sub(r"(?i)знайди\s*в\s*інтернеті\s*схожі\s*джерела", "", prompt_part).strip(" .")
    prompt_part = re.sub(r"(?i)підтвердження", "", prompt_part).strip(" .")
    prompt_part = re.sub(r"(?i)якщо\s*не\s*впевнений.*$", "", prompt_part).strip(" .")
    prompt_part = re.sub(r"(?i)пошукай.*$", "", prompt_part).strip(" .")
    prompt_part = re.sub(r"(?iu)\bі\b\.?$", "", prompt_part).strip(" .")
    vision_part = _extract_vision_search_facts(vision_text)
    if vision_part:
        tokens = re.findall(r"[a-zA-Zа-яА-ЯіїєІЇЄ0-9]{3,}", vision_part.lower())
        generic_tokens = {
            "first", "look", "image", "photo", "picture", "context", "the", "and",
            "спочатку", "подивись", "зображення", "фото", "картинка", "контекст",
        }
        if len(tokens) < 3 or len(vision_part) < 18 or all(t in generic_tokens for t in tokens):
            # Too vague entity extraction (e.g., single word "rex") -> skip web enrichment.
            return ""
    if vision_part:
        if prompt_part:
            q = f"{vision_part}. контекст: {prompt_part}".strip(" .")
        else:
            q = vision_part
        if source_intent:
            q = f"{q} wikipedia encyclopedia"
        return q.strip()
    return ""
def _compact_web_search_result(raw: str, query: str = "", agent_id: str = "", max_chars: int = 900) -> str:
    if not raw:
        return ""
    text = str(raw).strip()
    if not text:
        return ""

    def _extract_domain(url: str) -> str:
        if not url:
            return ""
        d = url.lower().strip()
        d = d.replace("https://", "").replace("http://", "")
        d = d.split("/")[0]
        if d.startswith("www."):
            d = d[4:]
        return d

    low_signal_tokens = (
        "grid maker", "converter", "convert", "download", "wallpaper", "stock photo",
        "instagram", "pinterest", "tiktok", "youtube", "facebook", "generator", "meme",
    )
    low_signal_domains = (
        "pinterest.com", "instagram.com", "tiktok.com", "youtube.com",
        "facebook.com", "vk.com", "yandex.", "stackexchange.com",
        "zhihu.com", "baidu.com",
    )
    trusted_common_domains = (
        "wikipedia.org", "wikidata.org", "britannica.com",
        "who.int", "fao.org", "oecd.org", "worldbank.org", "un.org", "europa.eu",
        "nature.com", "science.org", "sciencedirect.com", "springer.com",
    )
    trusted_agro_domains = (
        "fao.org", "europa.eu", "ec.europa.eu", "usda.gov", "nass.usda.gov",
        "ukragroconsult.com", "minagro.gov.ua", "rada.gov.ua", "kmu.gov.ua",
        "agroportal.ua", "latifundist.com", "kurkul.com",
    )
    trusted_by_agent = {
        "agromatrix": trusted_agro_domains,
        "alateya": (
            "europa.eu", "un.org", "worldbank.org", "oecd.org",
        ),
        "clan": (
            "europa.eu", "un.org", "wikipedia.org",
        ),
        "daarwizz": (
            "openai.com", "anthropic.com", "mistral.ai", "huggingface.co",
            "python.org", "github.com",
        ),
        "devtools": (
            "github.com", "docs.python.org", "pypi.org", "docker.com",
            "kubernetes.io", "fastapi.tiangolo.com", "postgresql.org",
        ),
        "druid": (
            "who.int", "nih.gov", "ncbi.nlm.nih.gov", "wikipedia.org",
        ),
        "eonarch": (
            "iea.org", "irena.org", "entsoe.eu", "europa.eu", "worldbank.org",
        ),
        "greenfood": (
            "fao.org", "who.int", "efsa.europa.eu", "usda.gov", "ec.europa.eu",
        ),
        "senpai": (
            "binance.com", "bybit.com", "coinbase.com", "kraken.com",
            "coindesk.com", "cointelegraph.com", "tradingview.com",
            "cftc.gov", "sec.gov", "esma.europa.eu",
        ),
        "sofiia": (
            "who.int", "nih.gov", "ncbi.nlm.nih.gov", "ema.europa.eu",
            "fda.gov", "mayoclinic.org", "nhs.uk",
        ),
        "helion": (
            "iea.org", "irena.org", "entsoe.eu", "europa.eu", "worldbank.org",
        ),
        "nutra": (
            "fao.org", "who.int", "efsa.europa.eu", "fda.gov",
        ),
        "microdao_orchestrator": (
            "openai.com", "anthropic.com", "mistral.ai", "github.com",
            "europa.eu", "un.org", "worldbank.org",
        ),
        "monitor": (
            "grafana.com", "prometheus.io", "elastic.co", "datadoghq.com",
            "opentelemetry.io",
        ),
        "soul": (
            "who.int", "nih.gov", "ncbi.nlm.nih.gov", "wikipedia.org",
        ),
        "yaromir": (
            "europa.eu", "un.org", "worldbank.org", "wikipedia.org",
        ),
    }

    def _norm_domain_entry(value: Any) -> str:
        if isinstance(value, dict):
            value = value.get("url") or value.get("domain") or ""
        value = str(value or "").strip().lower()
        if not value:
            return ""
        value = value.replace("https://", "").replace("http://", "")
        value = value.split("/")[0]
        if value.startswith("www."):
            value = value[4:]
        return value

    def _norm_domain_list(values: Any) -> List[str]:
        out: List[str] = []
        if not isinstance(values, list):
            return out
        for v in values:
            d = _norm_domain_entry(v)
            if d:
                out.append(d)
        return out

    overrides = _load_trusted_domains_overrides()
    extra_low_signal = _norm_domain_list(overrides.get("low_signal_domains"))
    if extra_low_signal:
        low_signal_domains = tuple(dict.fromkeys([*low_signal_domains, *extra_low_signal]))
    extra_common = _norm_domain_list(overrides.get("common_domains"))
    if extra_common:
        trusted_common_domains = tuple(dict.fromkeys([*trusted_common_domains, *extra_common]))
    agents_overrides = overrides.get("agents") if isinstance(overrides.get("agents"), dict) else {}
    for a, cfg in agents_overrides.items():
        if not isinstance(cfg, dict):
            continue
        doms = _norm_domain_list(cfg.get("domains"))
        if doms:
            base = trusted_by_agent.get(str(a).lower(), ())
            merged = tuple(dict.fromkeys([*base, *doms]))
            trusted_by_agent[str(a).lower()] = merged
    agro_query_terms = {
        "агро", "agro", "crop", "crops", "fertilizer", "fertilizers",
        "field", "soil", "harvest", "yield", "pesticide", "herbicide",
        "farm", "farming", "tractor", "зерно", "пшениц", "кукурудз",
        "соняшник", "ріпак", "врожай", "ґрунт", "поле", "добрив",
        "насіння", "ззр", "фермер",
    }
    query_terms = {t for t in re.findall(r"[a-zA-Zа-яА-ЯіїєІЇЄ0-9]{3,}", (query or "").lower())}
    agro_mode = any(any(k in term for k in agro_query_terms) for term in query_terms)
    agent_trusted_domains = trusted_by_agent.get((agent_id or "").lower(), ())
    # Parse bullet blocks from tool output.
    chunks = []
    current = []
    for line in text.splitlines():
        ln = line.rstrip()
        if ln.startswith("- ") and current:
            chunks.append("\n".join(current))
            current = [ln]
        else:
            current.append(ln)
    if current:
        chunks.append("\n".join(current))
    scored = []
    for chunk in chunks:
        lines = [ln.strip() for ln in chunk.splitlines() if ln.strip()]
        title = lines[0][2:].strip() if lines and lines[0].startswith("- ") else (lines[0] if lines else "")
        url_line = next((ln for ln in lines if ln.lower().startswith("url:")), "")
        url = url_line.split(":", 1)[1].strip() if ":" in url_line else ""
        domain = _extract_domain(url)
        text_blob = " ".join(lines).lower()
        if any(x in domain for x in low_signal_domains):
            continue
        score = 0
        for t in query_terms:
            if t in text_blob:
                score += 2
        if any(tok in text_blob for tok in low_signal_tokens):
            score -= 3
        if domain.endswith(".gov") or domain.endswith(".gov.ua") or domain.endswith(".edu"):
            score += 2
        if any(domain == d or domain.endswith("." + d) for d in trusted_common_domains):
            score += 2
        if any(domain == d or domain.endswith("." + d) for d in agent_trusted_domains):
            score += 2
        if any(domain.endswith(d) for d in ("wikipedia.org", "wikidata.org", "fao.org", "europa.eu")):
            score += 2
        if agro_mode:
            if any(domain == d or domain.endswith("." + d) for d in trusted_agro_domains):
                score += 3
            else:
                score -= 1
        if not url:
            score -= 1
        if len(title) < 6:
            score -= 1
        scored.append((score, domain, chunk))

    def _is_trusted_agro(domain: str) -> bool:
        if not domain:
            return False
        if any(domain == d or domain.endswith("." + d) for d in trusted_common_domains):
            return True
        return any(domain == d or domain.endswith("." + d) for d in trusted_agro_domains)

    scored.sort(key=lambda x: x[0], reverse=True)
    kept = []
    seen_domains = set()
    if agro_mode:
        for s, domain, chunk in scored:
            if s < 1 or not _is_trusted_agro(domain):
                continue
            if domain and domain in seen_domains:
                continue
            if domain:
                seen_domains.add(domain)
            kept.append(chunk)
            if len(kept) >= 3:
                break
        if kept:
            compact = "\n\n".join(kept).strip()
            if len(compact) > max_chars:
                compact = compact[:max_chars].rstrip() + "..."
            return compact
    for s, domain, chunk in scored:
        if s < 2:
            continue
        if domain and domain in seen_domains:
            continue
        if domain:
            seen_domains.add(domain)
        kept.append(chunk)
        if len(kept) >= 3:
            break
    if not kept:
        return ""
    compact = "\n\n".join(kept).strip()
    if len(compact) > max_chars:
        compact = compact[:max_chars].rstrip() + "..."
    return compact
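# Illustrative input for the parser above (an assumed web_search output shape,
# not a verbatim sample): bullet blocks, one per result, each with a "URL:"
# line that the scorer uses to extract the domain.
#   - FAOSTAT: crops and livestock products
#     URL: https://www.fao.org/faostat
#     Production statistics for wheat, maize...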
def _extract_sources_from_compact(compact: str, max_items: int = 3) -> List[Dict[str, str]]:
    if not compact:
        return []
    items: List[Dict[str, str]] = []
    chunks = [c for c in compact.split("\n\n") if c.strip()]
    for chunk in chunks:
        lines = [ln.strip() for ln in chunk.splitlines() if ln.strip()]
        if not lines:
            continue
        title = lines[0][2:].strip() if lines[0].startswith("- ") else lines[0]
        url_line = next((ln for ln in lines if ln.lower().startswith("url:")), "")
        url = url_line.split(":", 1)[1].strip() if ":" in url_line else ""
        if not url:
            continue
        items.append({"title": title[:180], "url": url[:500]})
        if len(items) >= max_items:
            break
    return items
def _condition_matches(cond: Dict[str, Any], agent_id: str, metadata: Dict[str, Any]) -> bool:
    """Minimal matcher for router-config `when` conditions."""
    if not isinstance(cond, dict):
        return True
    meta = metadata or {}
    if "agent" in cond and cond.get("agent") != agent_id:
        return False
    if "mode" in cond and meta.get("mode") != cond.get("mode"):
        return False
    if "metadata_has" in cond:
        key = cond.get("metadata_has")
        if key not in meta:
            return False
    if "metadata_equals" in cond:
        eq = cond.get("metadata_equals") or {}
        for k, v in eq.items():
            if meta.get(k) != v:
                return False
    if "task_type" in cond:
        expected = cond.get("task_type")
        actual = meta.get("task_type")
        if isinstance(expected, list):
            if actual not in expected:
                return False
        elif actual != expected:
            return False
    if "api_key_available" in cond:
        env_name = cond.get("api_key_available")
        if not (isinstance(env_name, str) and os.getenv(env_name)):
            return False
    if "and" in cond:
        clauses = cond.get("and") or []
        if not isinstance(clauses, list):
            return False
        for clause in clauses:
            if not _condition_matches(clause, agent_id, meta):
                return False
    return True
def _select_default_llm(agent_id: str, metadata: Dict[str, Any], base_llm: str, routing_rules: List[Dict[str, Any]]) -> str:
    """Select LLM by first matching routing rule with `use_llm`."""
    for rule in routing_rules:
        when = rule.get("when", {})
        if _condition_matches(when, agent_id, metadata):
            use_llm = rule.get("use_llm")
            if use_llm:
                logger.info(f"🎯 Agent {agent_id} routing rule {rule.get('id', '<no-id>')} -> {use_llm}")
                return use_llm
    return base_llm
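# Illustrative routing rule consumed by _select_default_llm (keys mirror
# _condition_matches above; the id, task_type values and profile name are
# hypothetical):
# {"id": "clan_code_tasks",
#  "when": {"agent": "clan", "task_type": ["code", "refactor"]},
#  "use_llm": "local_default_coder"}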
app = FastAPI(title="DAARION Router", version="2.0.0")
# Configuration
NATS_URL = os.getenv("NATS_URL", "nats://nats:4222")
SWAPPER_URL = os.getenv("SWAPPER_URL", "http://swapper-service:8890")
# All multimodal services now through Swapper
STT_URL = os.getenv("STT_URL", "http://swapper-service:8890") # Swapper /stt endpoint
TTS_URL = os.getenv("TTS_URL", "http://swapper-service:8890") # Swapper /tts endpoint
VISION_URL = os.getenv("VISION_URL", "http://172.18.0.1:11434") # Host Ollama
OCR_URL = os.getenv("OCR_URL", "http://swapper-service:8890") # Swapper /ocr endpoint
DOCUMENT_URL = os.getenv("DOCUMENT_URL", "http://swapper-service:8890") # Swapper /document endpoint
CITY_SERVICE_URL = os.getenv("CITY_SERVICE_URL", "http://daarion-city-service:7001")
# CrewAI Routing Configuration
CREWAI_ROUTING_ENABLED = os.getenv("CREWAI_ROUTING_ENABLED", "true").lower() == "true"
CREWAI_URL = os.getenv("CREWAI_URL", "http://dagi-staging-crewai-service:9010")
CLAN_RUNTIME_GUARD_ENABLED = os.getenv("CLAN_RUNTIME_GUARD_ENABLED", "true").lower() == "true"
CLAN_RUNTIME_GUARD_MODE = os.getenv("CLAN_RUNTIME_GUARD_MODE", "soft").lower()
CLAN_GUARD_TEST_MODE = os.getenv("CLAN_GUARD_TEST_MODE", "false").lower() == "true"
CLAN_RUNTIME_REGISTRY_PATH = os.getenv("CLAN_RUNTIME_REGISTRY_PATH", "/app/config/roles/clan/zhos/agents_registry.yaml")
CLAN_RUNTIME_ENVELOPE_SCHEMA_PATH = os.getenv("CLAN_RUNTIME_ENVELOPE_SCHEMA_PATH", "/app/docs/contracts/clan-envelope.schema.json")
CLAN_RUNTIME_ARTIFACT_SCHEMA_PATH = os.getenv("CLAN_RUNTIME_ARTIFACT_SCHEMA_PATH", "/app/docs/contracts/clan-artifact.schema.json")
CLAN_RUNTIME_CONSENT_EVENT_SCHEMA_PATH = os.getenv(
    "CLAN_RUNTIME_CONSENT_EVENT_SCHEMA_PATH",
    "/app/docs/contracts/clan-consent-event.schema.json",
)
# Neo4j Configuration
NEO4J_URI = os.getenv("NEO4J_BOLT_URL", "bolt://neo4j:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "DaarionNeo4j2026!")
# HTTP client for backend services
http_client: Optional[httpx.AsyncClient] = None
# Neo4j driver
neo4j_driver = None
neo4j_available = False
# NATS client
nc = None
nats_available = False
# Tool Manager
tool_manager = None
runtime_guard_engine = None
# Models
class FilterDecision(BaseModel):
    channel_id: str
    message_id: Optional[str] = None
    matrix_event_id: str
    microdao_id: str
    decision: Literal["allow", "deny", "modify"]
    target_agent_id: Optional[str] = None
    rewrite_prompt: Optional[str] = None

class AgentInvocation(BaseModel):
    agent_id: str
    entrypoint: Literal["channel_message", "direct", "cron"] = "channel_message"
    payload: Dict[str, Any]
# Load config
def load_config():
    config_path = "router_config.yaml"
    if os.path.exists(config_path):
        with open(config_path, 'r') as f:
            return yaml.safe_load(f)
    return {
        "messaging_inbound": {
            "enabled": True,
            "source_subject": "agent.filter.decision",
            "target_subject": "router.invoke.agent"
        }
    }

def load_router_config():
    """Load main router-config.yml with agents and LLM profiles"""
    # Try multiple locations
    paths = [
        "router-config.yml",
        "/app/router-config.yml",
        "../router-config.yml",
        "../../router-config.yml"
    ]
    for path in paths:
        if os.path.exists(path):
            with open(path, 'r') as f:
                logger.info(f"✅ Loaded router config from {path}")
                return yaml.safe_load(f)
    logger.warning("⚠️ router-config.yml not found, using empty config")
    return {"agents": {}}
config = load_config()
router_config = load_router_config()
@app.on_event("startup")
async def startup_event():
"""Initialize NATS connection and subscriptions"""
global nc, nats_available, http_client, neo4j_driver, neo4j_available, runtime_guard_engine
logger.info("🚀 DAGI Router v2.0.0 starting up...")
# Initialize HTTP client
http_client = httpx.AsyncClient(timeout=60.0)
logger.info("✅ HTTP client initialized")
# Initialize Neo4j connection
try:
neo4j_driver = AsyncGraphDatabase.driver(
NEO4J_URI,
auth=(NEO4J_USER, NEO4J_PASSWORD)
)
# Verify connection
async with neo4j_driver.session() as session:
result = await session.run("RETURN 1 as test")
await result.consume()
neo4j_available = True
logger.info(f"✅ Connected to Neo4j at {NEO4J_URI}")
except Exception as e:
logger.warning(f"⚠️ Neo4j not available: {e}")
neo4j_available = False
# Try to connect to NATS
try:
import nats
nc = await nats.connect(NATS_URL)
nats_available = True
logger.info(f"✅ Connected to NATS at {NATS_URL}")
# Subscribe to filter decisions if enabled
if config.get("messaging_inbound", {}).get("enabled", True):
asyncio.create_task(subscribe_to_filter_decisions())
else:
logger.warning("⚠️ Messaging inbound routing disabled in config")
except Exception as e:
logger.warning(f"⚠️ NATS not available: {e}")
logger.warning("⚠️ Running in test mode (HTTP only)")
nats_available = False
# Initialize Memory Retrieval Pipeline
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval:
try:
await memory_retrieval.initialize()
logger.info("✅ Memory Retrieval Pipeline initialized")
except Exception as e:
logger.warning(f"⚠️ Memory Retrieval init failed: {e}")
# Initialize Tool Manager for function calling
global tool_manager
if TOOL_MANAGER_AVAILABLE and ToolManager:
try:
tool_manager = ToolManager(router_config)
logger.info("✅ Tool Manager initialized with function calling")
except Exception as e:
logger.warning(f"⚠️ Tool Manager init failed: {e}")
tool_manager = None
else:
tool_manager = None
# Initialize Node Capabilities client
if NCS_AVAILABLE and capabilities_client:
ncs_cfg = router_config.get("node_capabilities", {})
ncs_url = ncs_cfg.get("url", "") or os.getenv("NODE_CAPABILITIES_URL", "")
ncs_ttl = ncs_cfg.get("cache_ttl_sec", 30)
if ncs_url:
capabilities_client.configure(url=ncs_url, ttl=ncs_ttl)
caps = await capabilities_client.fetch_capabilities()
served = caps.get("served_count", 0)
logger.info(f"✅ NCS configured: url={ncs_url} ttl={ncs_ttl}s served={served} models")
else:
logger.warning("⚠️ NCS url not configured; model selection will use static config only")
elif NCS_AVAILABLE:
logger.info(" NCS modules loaded but capabilities_client is None")
else:
logger.warning("⚠️ NCS modules not available (model_select / capabilities_client import failed)")
# Initialize CLAN runtime guard
if RUNTIME_GUARD_AVAILABLE and RuntimeGuard and CLAN_RUNTIME_GUARD_ENABLED:
try:
runtime_guard_engine = RuntimeGuard(
registry_path=CLAN_RUNTIME_REGISTRY_PATH,
envelope_schema_path=CLAN_RUNTIME_ENVELOPE_SCHEMA_PATH,
artifact_schema_path=CLAN_RUNTIME_ARTIFACT_SCHEMA_PATH,
consent_event_schema_path=CLAN_RUNTIME_CONSENT_EVENT_SCHEMA_PATH,
mode=CLAN_RUNTIME_GUARD_MODE,
)
logger.info(
"✅ CLAN Runtime Guard initialized "
f"(mode={CLAN_RUNTIME_GUARD_MODE}, registry={CLAN_RUNTIME_REGISTRY_PATH})"
)
except Exception as e:
logger.warning(f"⚠️ Runtime Guard init failed: {e}")
runtime_guard_engine = None
else:
runtime_guard_engine = None
# Log backend URLs
logger.info(f"📡 Swapper URL: {SWAPPER_URL}")
logger.info(f"📡 STT URL: {STT_URL}")
logger.info(f"📡 Vision URL: {VISION_URL}")
logger.info(f"📡 OCR URL: {OCR_URL}")
logger.info(f"📡 Neo4j URL: {NEO4J_URI}")
async def subscribe_to_filter_decisions():
    """Subscribe to agent.filter.decision events"""
    if not nc:
        return
    source_subject = config.get("messaging_inbound", {}).get(
        "source_subject",
        "agent.filter.decision"
    )
    try:
        sub = await nc.subscribe(source_subject)
        print(f"✅ Subscribed to {source_subject}")
        async for msg in sub.messages:
            try:
                decision_data = json.loads(msg.data.decode())
                await handle_filter_decision(decision_data)
            except Exception as e:
                print(f"❌ Error processing decision: {e}")
                import traceback
                traceback.print_exc()
    except Exception as e:
        print(f"❌ Subscription error: {e}")
async def handle_filter_decision(decision_data: dict):
    """
    Process agent.filter.decision events
    Only processes 'allow' decisions
    Creates AgentInvocation and publishes to router.invoke.agent
    """
    try:
        print(f"\n🔀 Processing filter decision")
        decision = FilterDecision(**decision_data)
        # Only process 'allow' decisions
        if decision.decision != "allow":
            print(f"⏭️ Ignoring non-allow decision: {decision.decision}")
            return
        if not decision.target_agent_id:
            print(f"⚠️ No target agent specified, skipping")
            return
        print(f"✅ Decision: allow")
        print(f"📝 Target: {decision.target_agent_id}")
        print(f"📝 Channel: {decision.channel_id}")
        # Create AgentInvocation
        invocation = AgentInvocation(
            agent_id=decision.target_agent_id,
            entrypoint="channel_message",
            payload={
                "channel_id": decision.channel_id,
                "message_id": decision.message_id,
                "matrix_event_id": decision.matrix_event_id,
                "microdao_id": decision.microdao_id,
                "rewrite_prompt": decision.rewrite_prompt
            }
        )
        print(f"🚀 Created invocation for {invocation.agent_id}")
        # Publish to NATS
        await publish_agent_invocation(invocation)
    except Exception as e:
        print(f"❌ Error handling decision: {e}")
        import traceback
        traceback.print_exc()
async def publish_agent_invocation(invocation: AgentInvocation):
    """Publish AgentInvocation to router.invoke.agent"""
    if nc and nats_available:
        target_subject = config.get("messaging_inbound", {}).get(
            "target_subject",
            "router.invoke.agent"
        )
        try:
            await nc.publish(target_subject, invocation.json().encode())
            print(f"✅ Published invocation to {target_subject}")
        except Exception as e:
            print(f"❌ Error publishing to NATS: {e}")
    else:
        print(f"⚠️ NATS not available, invocation not published: {invocation.json()}")
# ==============================================================
# PROMETHEUS METRICS ENDPOINT
# ==============================================================
@app.get("/metrics")
async def prometheus_metrics():
"""Prometheus metrics endpoint."""
try:
from agent_metrics import get_metrics, get_content_type
return Response(content=get_metrics(), media_type=get_content_type())
except Exception as e:
logger.error(f"Metrics error: {e}")
return Response(content=b"# Error generating metrics", media_type="text/plain")
@app.get("/health")
async def health():
"""Health check endpoint"""
return {
"status": "ok",
"service": "router",
"version": "1.0.0",
"nats_connected": nats_available,
"messaging_inbound_enabled": config.get("messaging_inbound", {}).get("enabled", True)
}
@app.get("/healthz")
async def healthz():
"""Alias /healthz → /health for BFF compatibility."""
return await health()
@app.get("/monitor/status")
async def monitor_status(request: Request = None):
"""
Node monitor status — read-only, safe, no secrets.
Returns: heartbeat, router/gateway health, open incidents,
alerts loop SLO, active backends, last artifact timestamps.
Rate limited: 60 rpm per IP (in-process bucket).
RBAC: requires tools.monitor.read entitlement (or tools.observability.read).
Auth: X-Monitor-Key header (same as SUPERVISOR_API_KEY, optional in dev).
"""
import collections as _collections
# ── Rate limit (60 rpm per IP) ────────────────────────────────────────
_now = time.monotonic()
client_ip = (
(request.client.host if request and request.client else None) or "unknown"
)
_bucket_key = f"monitor:{client_ip}"
if not hasattr(monitor_status, "_buckets"):
monitor_status._buckets = {}
dq = monitor_status._buckets.setdefault(_bucket_key, _collections.deque())
while dq and _now - dq[0] > 60:
dq.popleft()
if len(dq) >= 60:
from fastapi.responses import JSONResponse
return JSONResponse(status_code=429, content={"error": "rate_limit", "message": "60 rpm exceeded"})
dq.append(_now)
# ── Auth (optional in dev, enforced in prod) ──────────────────────────
_env = os.getenv("ENV", "dev").strip().lower()
_monitor_key = os.getenv("SUPERVISOR_API_KEY", "").strip()
if _env in ("prod", "production", "staging") and _monitor_key:
_req_key = ""
if request:
_req_key = (
request.headers.get("X-Monitor-Key", "")
or request.headers.get("Authorization", "").removeprefix("Bearer ").strip()
)
if _req_key != _monitor_key:
from fastapi.responses import JSONResponse
return JSONResponse(status_code=403, content={"error": "forbidden", "message": "X-Monitor-Key required"})
# ── Collect data (best-effort, non-fatal) ─────────────────────────────
warnings: list[str] = []
ts_now = __import__("datetime").datetime.now(
__import__("datetime").timezone.utc
).isoformat(timespec="seconds")
# uptime as heartbeat proxy
_proc_start = getattr(monitor_status, "_proc_start", None)
if _proc_start is None:
monitor_status._proc_start = time.monotonic()
_proc_start = monitor_status._proc_start
heartbeat_age_s = int(time.monotonic() - _proc_start)
# open incidents
open_incidents: int | None = None
try:
from incident_store import get_incident_store as _get_is
_istore = _get_is()
_open = _istore.list_incidents(filters={"status": "open"}, limit=500)
# include "mitigating" as still-open
open_incidents = sum(
1 for i in _open if (i.get("status") or "").lower() in ("open", "mitigating")
)
except Exception as _e:
warnings.append(f"incidents: {str(_e)[:80]}")
# alerts loop SLO
alerts_loop_slo: dict | None = None
try:
from alert_store import get_alert_store as _get_as
alerts_loop_slo = _get_as().compute_loop_slo(window_minutes=240)
# strip any internal keys that may contain infra details
_safe_keys = {"claim_to_ack_p95_seconds", "failed_rate_pct", "processing_stuck_count", "sample_count", "violations"}
alerts_loop_slo = {k: v for k, v in alerts_loop_slo.items() if k in _safe_keys}
except Exception as _e:
warnings.append(f"alerts_slo: {str(_e)[:80]}")
# backends (env vars only — no DSN, no passwords)
backends = {
"alerts": os.getenv("ALERT_BACKEND", "unknown"),
"audit": os.getenv("AUDIT_BACKEND", "unknown"),
"incidents": os.getenv("INCIDENT_BACKEND", "unknown"),
"risk_history": os.getenv("RISK_HISTORY_BACKEND", "unknown"),
"backlog": os.getenv("BACKLOG_BACKEND", "unknown"),
}
# last artifact timestamps (best-effort filesystem scan)
last_artifacts: dict = {}
_base = __import__("pathlib").Path("ops")
for _pattern, _key in [
("reports/risk/*.md", "risk_digest_ts"),
("reports/platform/*.md", "platform_digest_ts"),
("backlog/*.jsonl", "backlog_generate_ts"),
("reports/backlog/*.md", "backlog_report_ts"),
]:
try:
_files = sorted(_base.glob(_pattern))
if _files:
_mtime = _files[-1].stat().st_mtime
last_artifacts[_key] = __import__("datetime").datetime.fromtimestamp(
_mtime, tz=__import__("datetime").timezone.utc
).isoformat(timespec="seconds")
except Exception:
pass
return {
"node_id": os.getenv("NODE_ID", "NODA1"),
"ts": ts_now,
"heartbeat_age_s": heartbeat_age_s,
"router_ok": True, # we are the router; if we respond, we're ok
"gateway_ok": None, # gateway health not probed here (separate svc)
"open_incidents": open_incidents,
"alerts_loop_slo": alerts_loop_slo,
"backends": backends,
"last_artifacts": last_artifacts,
"warnings": warnings,
}
@app.post("/internal/router/test-messaging", response_model=AgentInvocation)
async def test_messaging_route(decision: FilterDecision):
"""
Test endpoint for routing logic
Tests filter decision → agent invocation mapping without NATS
"""
print(f"\n🧪 Test routing request")
if decision.decision != "allow" or not decision.target_agent_id:
raise HTTPException(
status_code=400,
detail=f"Decision not routable: {decision.decision}, agent: {decision.target_agent_id}"
)
invocation = AgentInvocation(
agent_id=decision.target_agent_id,
entrypoint="channel_message",
payload={
"channel_id": decision.channel_id,
"message_id": decision.message_id,
"matrix_event_id": decision.matrix_event_id,
"microdao_id": decision.microdao_id,
"rewrite_prompt": decision.rewrite_prompt
}
)
print(f"✅ Test invocation created for {invocation.agent_id}")
return invocation
@app.on_event("shutdown")
async def shutdown_event():
"""Clean shutdown"""
global nc, http_client
if nc:
await nc.close()
logger.info("✅ NATS connection closed")
if http_client:
await http_client.aclose()
logger.info("✅ HTTP client closed")
# ============================================================================
# Backend Integration Endpoints
# ============================================================================
class InferRequest(BaseModel):
    """Request for agent inference"""
    prompt: str
    model: Optional[str] = None
    max_tokens: Optional[int] = 2048
    temperature: Optional[float] = 0.7
    system_prompt: Optional[str] = None
    images: Optional[List[str]] = None  # List of base64 data URLs for vision
    metadata: Optional[Dict[str, Any]] = None  # Additional metadata (user_id, chat_id, etc.)

class InferResponse(BaseModel):
    """Response from agent inference"""
    response: str
    model: str
    tokens_used: Optional[int] = None
    backend: str
    image_base64: Optional[str] = None  # Generated image in base64 format
    file_base64: Optional[str] = None
    file_name: Optional[str] = None
    file_mime: Optional[str] = None

class ToolExecuteRequest(BaseModel):
    """External tool execution request used by console/ops APIs."""
    model_config = ConfigDict(extra="allow")
    tool: str
    action: Optional[str] = None
    agent_id: Optional[str] = "sofiia"
    metadata: Optional[Dict[str, Any]] = None
# =========================================================================
# INTERNAL LLM API (for CrewAI and internal services)
# =========================================================================
class InternalLLMRequest(BaseModel):
    prompt: str
    system_prompt: Optional[str] = None
    llm_profile: Optional[str] = "reasoning"
    model: Optional[str] = None
    max_tokens: Optional[int] = 2048
    temperature: Optional[float] = 0.2
    role_context: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

class InternalLLMResponse(BaseModel):
    text: str
    model: str
    provider: str
    tokens_used: int = 0
    latency_ms: int = 0

class BackendStatus(BaseModel):
    """Status of a backend service"""
    name: str
    url: str
    status: str  # online, offline, error
    active_model: Optional[str] = None
    error: Optional[str] = None
@app.get("/backends/status", response_model=List[BackendStatus])
async def get_backends_status():
"""Get status of all backend services"""
backends = []
# Check Swapper
try:
resp = await http_client.get(f"{SWAPPER_URL}/health", timeout=5.0)
if resp.status_code == 200:
data = resp.json()
backends.append(BackendStatus(
name="swapper",
url=SWAPPER_URL,
status="online",
active_model=data.get("active_model")
))
else:
backends.append(BackendStatus(
name="swapper",
url=SWAPPER_URL,
status="error",
error=f"HTTP {resp.status_code}"
))
except Exception as e:
backends.append(BackendStatus(
name="swapper",
url=SWAPPER_URL,
status="offline",
error=str(e)
))
# Check STT
try:
resp = await http_client.get(f"{STT_URL}/health", timeout=5.0)
backends.append(BackendStatus(
name="stt",
url=STT_URL,
status="online" if resp.status_code == 200 else "error"
))
except Exception as e:
backends.append(BackendStatus(
name="stt",
url=STT_URL,
status="offline",
error=str(e)
))
# Check Vision (Ollama)
try:
resp = await http_client.get(f"{VISION_URL}/api/tags", timeout=5.0)
if resp.status_code == 200:
data = resp.json()
models = [m.get("name") for m in data.get("models", [])]
backends.append(BackendStatus(
name="vision",
url=VISION_URL,
status="online",
active_model=", ".join(models[:3]) if models else None
))
else:
backends.append(BackendStatus(
name="vision",
url=VISION_URL,
status="error"
))
except Exception as e:
backends.append(BackendStatus(
name="vision",
url=VISION_URL,
status="offline",
error=str(e)
))
# Check OCR
try:
resp = await http_client.get(f"{OCR_URL}/health", timeout=5.0)
backends.append(BackendStatus(
name="ocr",
url=OCR_URL,
status="online" if resp.status_code == 200 else "error"
))
except Exception as e:
backends.append(BackendStatus(
name="ocr",
url=OCR_URL,
status="offline",
error=str(e)
))
return backends
# =========================================================================
# INTERNAL LLM COMPLETE ENDPOINT (for CrewAI)
# =========================================================================
@app.post("/internal/llm/complete", response_model=InternalLLMResponse)
async def internal_llm_complete(request: InternalLLMRequest):
"""
Internal LLM completion endpoint.
NO routing, NO CrewAI decision, NO agent selection.
Used by CrewAI service for multi-role orchestration.
"""
import time as time_module
t0 = time_module.time()
logger.info(f"Internal LLM: profile={request.llm_profile}, role={request.role_context}")
llm_profiles = router_config.get("llm_profiles", {})
profile_name = request.llm_profile or "reasoning"
llm_profile = llm_profiles.get(profile_name, {})
if not llm_profile:
fallback_name = "local_default_coder"
llm_profile = llm_profiles.get(fallback_name, {})
logger.warning(f"⚠️ Profile '{profile_name}' not found in llm_profiles → falling back to '{fallback_name}' (local)")
profile_name = fallback_name
provider = llm_profile.get("provider", "ollama")
model = request.model or llm_profile.get("model", "qwen3:14b")
max_tokens = request.max_tokens or llm_profile.get("max_tokens", 2048)
temperature = request.temperature or llm_profile.get("temperature", 0.2)
logger.info(f"🎯 Resolved: profile={profile_name} provider={provider} model={model}")
# Build messages
messages = []
if request.system_prompt:
system_content = request.system_prompt
if request.role_context:
system_content = f"[Role: {request.role_context}]\n\n{system_content}"
messages.append({"role": "system", "content": system_content})
elif request.role_context:
messages.append({"role": "system", "content": f"You are acting as {request.role_context}. Respond professionally."})
messages.append({"role": "user", "content": request.prompt})
# Cloud providers
cloud_providers = [
{"name": "deepseek", "api_key_env": "DEEPSEEK_API_KEY", "base_url": "https://api.deepseek.com", "model": "deepseek-chat", "timeout": 60},
{"name": "mistral", "api_key_env": "MISTRAL_API_KEY", "base_url": "https://api.mistral.ai", "model": "mistral-large-latest", "timeout": 60},
{"name": "grok", "api_key_env": "GROK_API_KEY", "base_url": "https://api.x.ai", "model": "grok-4-1-fast-reasoning", "timeout": 60}
]
# Respect configured provider: local profiles should stay local.
if provider in ["deepseek", "mistral", "grok"]:
cloud_providers = sorted(cloud_providers, key=lambda x: 0 if x["name"] == provider else 1)
elif provider == "ollama":
cloud_providers = []
# Try cloud providers (only when provider is cloud)
for cloud in cloud_providers:
api_key = os.getenv(cloud["api_key_env"])
if not api_key:
continue
try:
logger.debug(f"Internal LLM trying {cloud['name']}")
cloud_resp = await http_client.post(
f"{cloud['base_url']}/v1/chat/completions",
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
json={"model": cloud["model"], "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "stream": False},
timeout=cloud["timeout"]
)
if cloud_resp.status_code == 200:
data = cloud_resp.json()
response_text = data.get("choices", [{}])[0].get("message", {}).get("content", "")
tokens = data.get("usage", {}).get("total_tokens", 0)
latency = int((time_module.time() - t0) * 1000)
logger.info(f"Internal LLM success: {cloud['name']}, {tokens} tokens, {latency}ms")
return InternalLLMResponse(text=response_text, model=cloud["model"], provider=cloud["name"], tokens_used=tokens, latency_ms=latency)
except Exception as e:
logger.warning(f"Internal LLM {cloud['name']} failed: {e}")
continue
# Fallback/target local provider (Ollama)
try:
ollama_base = llm_profile.get("base_url", os.getenv("OLLAMA_BASE_URL", "http://host.docker.internal:11434"))
ollama_model = model or "qwen3:14b"
logger.info(f"Internal LLM to Ollama: model={ollama_model} url={ollama_base}")
ollama_resp = await http_client.post(
f"{ollama_base}/api/generate",
json={"model": ollama_model, "prompt": request.prompt, "system": request.system_prompt or "", "stream": False, "options": {"num_predict": max_tokens, "temperature": temperature}},
timeout=120.0
)
if ollama_resp.status_code == 200:
data = ollama_resp.json()
latency = int((time_module.time() - t0) * 1000)
return InternalLLMResponse(text=data.get("response", ""), model=ollama_model, provider="ollama", tokens_used=0, latency_ms=latency)
except Exception as e:
logger.error(f"Internal LLM Ollama failed: {e}")
raise HTTPException(status_code=503, detail="All LLM providers unavailable")
@app.post("/v1/agents/{agent_id}/infer", response_model=InferResponse)
async def agent_infer(agent_id: str, request: InferRequest):
"""
Route inference request to appropriate backend.
Router decides which backend to use based on:
- Agent configuration (model, capabilities)
- Request type (text, vision, audio)
- Backend availability
System prompt is fetched from database via city-service API.
Memory context is retrieved via Memory Retrieval Pipeline v3.0.
"""
logger.info(f"🔀 Inference request for agent: {agent_id}")
logger.info(f"📝 Prompt: {request.prompt[:100]}...")
# =========================================================================
# MEMORY RETRIEVAL (v4.0 - Universal for all agents)
# =========================================================================
memory_brief_text = ""
# Extract metadata once for both retrieval and storage
metadata = request.metadata or {}
channel = "telegram" # Default
chat_id = str(metadata.get("chat_id", ""))
user_id = str(metadata.get("user_id", "")).replace("tg:", "")
username = metadata.get("username")
# Get agent_id from metadata or URL parameter
request_agent_id = metadata.get("agent_id", agent_id).lower()
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval:
try:
if chat_id and user_id:
brief = await memory_retrieval.retrieve(
channel=channel,
chat_id=chat_id,
user_id=user_id,
agent_id=request_agent_id, # Agent-specific collections
username=username,
message=request.prompt
)
memory_brief_text = brief.to_text(max_lines=10)
if memory_brief_text:
logger.info(f"🧠 Memory brief for {request_agent_id}: {len(memory_brief_text)} chars")
except Exception as e:
logger.warning(f"⚠️ Memory retrieval failed for {request_agent_id}: {e}")
# Get system prompt from database or config
system_prompt = request.system_prompt
system_prompt_source = "request"
if system_prompt:
logger.info(f"📝 Received system_prompt from request: {len(system_prompt)} chars")
logger.debug(f"System prompt preview: {system_prompt[:200]}...")
else:
system_prompt_source = "city_service"
logger.info(f" No system_prompt in request for agent {agent_id}, loading from configured sources")
if not system_prompt:
try:
from prompt_builder import get_prompt_builder
prompt_builder = await get_prompt_builder(
city_service_url=CITY_SERVICE_URL,
router_config=router_config,
)
prompt_result = await prompt_builder.get_system_prompt(agent_id)
system_prompt = prompt_result.system_prompt
system_prompt_source = prompt_result.source
logger.info(f"✅ Loaded system prompt for {agent_id} from {system_prompt_source}")
except Exception as e:
logger.warning(f"⚠️ Could not load prompt from configured sources: {e}")
# Fallback to config
system_prompt_source = "router_config"
agent_config = router_config.get("agents", {}).get(agent_id, {})
system_prompt = agent_config.get("system_prompt")
if not system_prompt:
system_prompt_source = "empty"
logger.warning(f"⚠️ System prompt unavailable for {agent_id}; continuing with provider defaults")
system_prompt_hash = hashlib.sha256((system_prompt or "").encode("utf-8")).hexdigest()[:12]
effective_metadata = dict(metadata)
effective_metadata["system_prompt_hash"] = system_prompt_hash
effective_metadata["system_prompt_source"] = system_prompt_source
effective_metadata["system_prompt_version"] = (
metadata.get("system_prompt_version")
or f"{agent_id}:{system_prompt_hash}"
)
logger.info(
f"🧩 Prompt meta for {agent_id}: source={system_prompt_source}, "
f"version={effective_metadata['system_prompt_version']}, hash={system_prompt_hash}"
)
# Determine which backend to use
# Use router config to get default model for agent, fallback to qwen3:8b
agent_config = router_config.get("agents", {}).get(agent_id, {})
# =========================================================================
# CREWAI DECISION: Use orchestration or direct LLM?
# =========================================================================
if CREWAI_ROUTING_ENABLED and CREWAI_CLIENT_AVAILABLE:
try:
runtime_envelope = None
if runtime_guard_engine and request_agent_id == "clan":
runtime_envelope = runtime_guard_engine.build_envelope(
agent_id=request_agent_id,
prompt=request.prompt,
metadata=effective_metadata,
)
ok_pre, pre_info = runtime_guard_engine.pre_dispatch_checks(runtime_envelope)
if not ok_pre:
stop_payload = runtime_guard_engine.stop_payload(runtime_envelope, pre_info)
logger.warning(
"🛑 Runtime guard pre-dispatch stop: "
f"code={stop_payload.get('stop_code')} request_id={stop_payload.get('request_id')} "
f"input_hash={stop_payload.get('input_hash')}"
)
return InferResponse(
response=json.dumps(stop_payload, ensure_ascii=False),
model="runtime-guard",
backend="runtime-guard",
tokens_used=0,
)
if (
CLAN_GUARD_TEST_MODE
and effective_metadata.get("guard_self_test") is True
and isinstance(effective_metadata.get("__inject_fake_agent_result"), dict)
):
fake_result = effective_metadata.get("__inject_fake_agent_result")
ok_post, post_info = runtime_guard_engine.post_return_checks(runtime_envelope, fake_result)
if not ok_post:
stop_payload = runtime_guard_engine.stop_payload(runtime_envelope, post_info)
logger.warning(
"🧪 Runtime guard self-test stop: "
f"code={stop_payload.get('stop_code')} request_id={stop_payload.get('request_id')} "
f"input_hash={stop_payload.get('input_hash')}"
)
return InferResponse(
response=json.dumps(stop_payload, ensure_ascii=False),
model="runtime-guard",
backend="runtime-guard",
tokens_used=0,
)
logger.info("🧪 Runtime guard self-test passed (fake result accepted)")
return InferResponse(
response=json.dumps({"ok": True, "self_test": True}, ensure_ascii=False),
model="runtime-guard",
backend="runtime-guard",
tokens_used=0,
)
# Get agent CrewAI config from registry (or router_config fallback)
crewai_cfg = agent_config.get("crewai", {})
use_crewai, crewai_reason = should_use_crewai(
agent_id=agent_id,
prompt=request.prompt,
agent_config=agent_config,
metadata=effective_metadata,
force_crewai=effective_metadata.get("force_crewai", False),
)
logger.info(f"🎭 CrewAI decision for {agent_id}: {use_crewai} ({crewai_reason})")
if use_crewai:
t0 = time.time()
crew_result = await call_crewai(
agent_id=agent_id,
task=request.prompt,
context={
"memory_brief": memory_brief_text,
"system_prompt": system_prompt,
"system_prompt_meta": {
"source": system_prompt_source,
"version": effective_metadata.get("system_prompt_version"),
"hash": system_prompt_hash,
},
"metadata": effective_metadata,
"runtime_envelope": runtime_envelope,
},
team=crewai_cfg.get("team"),
profile=effective_metadata.get("crewai_profile")
)
latency = time.time() - t0
if crew_result.get("success") and crew_result.get("result"):
if runtime_guard_engine and request_agent_id == "clan" and runtime_envelope:
ok_post, post_info = runtime_guard_engine.post_return_checks(runtime_envelope, crew_result)
if not ok_post:
stop_payload = runtime_guard_engine.stop_payload(runtime_envelope, post_info)
logger.warning(
"🛑 Runtime guard post-return stop: "
f"code={stop_payload.get('stop_code')} request_id={stop_payload.get('request_id')} "
f"input_hash={stop_payload.get('input_hash')}"
)
return InferResponse(
response=json.dumps(stop_payload, ensure_ascii=False),
model="runtime-guard",
backend="runtime-guard",
tokens_used=0,
)
crew_result = runtime_guard_engine.stamp_result_artifacts(runtime_envelope, crew_result)
ok_stamp, stamp_info = runtime_guard_engine.ensure_stamped_trails(crew_result)
if not ok_stamp:
stop_payload = runtime_guard_engine.stop_payload(runtime_envelope, stamp_info)
logger.warning(
"🛑 Runtime guard stamped-trail stop: "
f"code={stop_payload.get('stop_code')} request_id={stop_payload.get('request_id')} "
f"input_hash={stop_payload.get('input_hash')}"
)
return InferResponse(
response=json.dumps(stop_payload, ensure_ascii=False),
model="runtime-guard",
backend="runtime-guard",
tokens_used=0,
)
for row in runtime_guard_engine.artifact_runtime_rows(runtime_envelope, crew_result):
logger.info(json.dumps(row, ensure_ascii=False))
for row in runtime_guard_engine.consent_runtime_rows(runtime_envelope, crew_result):
logger.info(json.dumps(row, ensure_ascii=False))
for row in (crew_result.get("artifact_state_transition_rows") or []):
if isinstance(row, dict):
logger.info(json.dumps(row, ensure_ascii=False))
logger.info(f"✅ CrewAI success for {agent_id}: {latency:.2f}s")
final_response_text = crew_result["result"]
# Helion: keep first-touch answers short by default, even after CrewAI.
if (
agent_id == "helion"
and isinstance(final_response_text, str)
and effective_metadata.get("force_concise")
and not effective_metadata.get("force_detailed")
):
parts = re.split(r"(?<=[.!?])\s+", final_response_text.strip())
if len(parts) > 3:
final_response_text = " ".join(parts[:3]).strip()
# Store interaction in memory
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id:
try:
await memory_retrieval.store_interaction(
channel=channel,
chat_id=chat_id,
user_id=user_id,
agent_id=request_agent_id,
username=username,
user_message=request.prompt,
assistant_response=final_response_text
)
except Exception as e:
logger.warning(f"⚠️ Memory storage failed: {e}")
return InferResponse(
response=final_response_text,
model="crewai-" + agent_id,
backend="crewai",
tokens_used=0
)
else:
logger.warning(f"⚠️ CrewAI failed, falling back to direct LLM")
except Exception as e:
logger.exception(f"❌ CrewAI error: {e}, falling back to direct LLM")
default_llm = agent_config.get("default_llm", "local_default_coder")
routing_rules = router_config.get("routing", [])
default_llm = _select_default_llm(agent_id, metadata, default_llm, routing_rules)
cloud_provider_names = {"deepseek", "mistral", "grok", "openai", "anthropic"}
# ── NCS-first model selection ────────────────────────────────────────
ncs_selection = None
if NCS_AVAILABLE and capabilities_client:
try:
caps = await capabilities_client.fetch_capabilities()
if caps:
caps["_fetch_ts"] = capabilities_client._cache_ts
ncs_selection = await select_model_for_agent(
agent_id, agent_config, router_config, caps, request.model,
)
except Exception as e:
logger.warning(f"⚠️ NCS selection error: {e}; falling back to static config")
llm_profiles = router_config.get("llm_profiles", {})
if ncs_selection and ncs_selection.name:
provider = ncs_selection.provider
model = ncs_selection.name
llm_profile = llm_profiles.get(default_llm, {})
if ncs_selection.base_url and provider == "ollama":
llm_profile = {**llm_profile, "base_url": ncs_selection.base_url}
logger.info(
f"🎯 NCS select: agent={agent_id} profile={default_llm} "
f"→ runtime={ncs_selection.runtime} model={model} "
f"provider={provider} via_ncs={ncs_selection.via_ncs} "
f"caps_age={ncs_selection.caps_age_s}s "
f"fallback={ncs_selection.fallback_reason or 'none'}"
)
else:
llm_profile = llm_profiles.get(default_llm, {})
if not llm_profile:
fallback_llm = agent_config.get("fallback_llm", "local_default_coder")
llm_profile = llm_profiles.get(fallback_llm, {})
logger.warning(
f"⚠️ Profile '{default_llm}' not found for agent={agent_id} "
f"→ fallback to '{fallback_llm}' (local). "
f"NOT defaulting to cloud silently."
)
default_llm = fallback_llm
provider = llm_profile.get("provider", "ollama")
if request.model:
for profile_name, profile in llm_profiles.items():
if profile.get("model") == request.model and profile.get("provider") in cloud_provider_names:
llm_profile = profile
provider = profile.get("provider", provider)
default_llm = profile_name
logger.info(f"🎛️ Matched request.model={request.model} to profile={profile_name} provider={provider}")
break
if provider in ["deepseek", "openai", "anthropic", "mistral"]:
model = llm_profile.get("model", "deepseek-chat")
elif provider == "grok":
model = llm_profile.get("model", "grok-4-1-fast-reasoning")
else:
model = request.model or llm_profile.get("model", "qwen3:14b")
logger.info(
f"🎯 Static select: agent={agent_id} profile={default_llm} "
f"provider={provider} model={model}"
)
# =========================================================================
# VISION PROCESSING (if images present)
# =========================================================================
if request.images and len(request.images) > 0:
logger.info(f"🖼️ Vision request: {len(request.images)} image(s)")
try:
# Use Swapper's /vision endpoint (manages model loading)
vision_payload = {
"model": "qwen3-vl-8b",
"prompt": request.prompt,
"images": request.images, # Swapper handles data URL conversion
"max_tokens": request.max_tokens or 1024,
"temperature": request.temperature or 0.7
}
# Add system prompt if available
if system_prompt:
if memory_brief_text:
vision_payload["system"] = f"{system_prompt}\n\n[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}"
else:
vision_payload["system"] = system_prompt
logger.info(f"🖼️ Sending to Swapper /vision: {SWAPPER_URL}/vision")
vision_resp = await http_client.post(
f"{SWAPPER_URL}/vision",
json=vision_payload,
timeout=120.0
)
if vision_resp.status_code == 200:
vision_data = vision_resp.json()
raw_response = vision_data.get("text", "")
full_response = _sanitize_vision_text_for_user(raw_response)
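# Assumed Swapper /vision response shape, inferred from the fields read
# below ("text", "success"); illustrative only:
#   {"success": true, "text": "На фото зображено ...", ...}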
vision_web_query = ""
vision_sources: List[Dict[str, str]] = []
# Debug: log full response structure
logger.info(
f"✅ Vision response: raw={len(raw_response)} chars, sanitized={len(full_response)} chars, "
f"success={vision_data.get('success')}, keys={list(vision_data.keys())}"
)
if raw_response and not full_response:
full_response = _extract_vision_search_facts(raw_response, max_chars=280)
if not full_response:
logger.warning(f"⚠️ Empty vision response! Full data: {str(vision_data)[:500]}")
# Optional vision -> web enrichment (soft policy):
# if prompt explicitly asks to search online OR vision answer is uncertain.
if (full_response or raw_response) and TOOL_MANAGER_AVAILABLE and tool_manager:
try:
wants_web = _vision_prompt_wants_web(request.prompt)
uncertain = _vision_answer_uncertain(full_response or raw_response)
if wants_web or uncertain:
query = _build_vision_web_query(request.prompt, full_response or raw_response)
if not query:
logger.info("🔎 Vision web enrich skipped: query not actionable")
else:
vision_web_query = query
search_result = await tool_manager.execute_tool(
"web_search",
{"query": query, "max_results": 3},
agent_id=request_agent_id,
chat_id=chat_id,
user_id=user_id,
)
if search_result and search_result.success and search_result.result:
compact_search = _compact_web_search_result(
search_result.result,
query=query,
agent_id=request_agent_id,
)
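# "Нічого не знайдено" ("nothing found") is treated here as the empty-result
# marker of _compact_web_search_result; such results are not appended.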
if compact_search and "Нічого не знайдено" not in compact_search:
vision_sources = _extract_sources_from_compact(compact_search)
base_text = full_response or "Не вдалося надійно ідентифікувати об'єкт на фото."
full_response = (
f"{base_text}\n\n"
f"Додатково знайшов у відкритих джерелах:\n{compact_search}"
)
logger.info(
"🌐 Vision web enrichment applied "
f"for agent={request_agent_id}, wants_web={wants_web}, uncertain={uncertain}, "
f"sources={len(vision_sources)}"
)
except Exception as e:
logger.warning(f"⚠️ Vision web enrichment failed: {e}")
if vision_web_query:
logger.info(
f"🗂️ Vision enrichment metadata: agent={request_agent_id}, "
f"query='{vision_web_query[:180]}', sources={len(vision_sources)}"
)
# Image quality gate: one soft retry if response looks empty/meta.
if _image_response_needs_retry(full_response):
try:
logger.warning(f"⚠️ Vision quality gate triggered for agent={request_agent_id}, retrying once")
retry_payload = dict(vision_payload)
retry_payload["prompt"] = (
"Опиши зображення по суті: що зображено, ключові деталі, можливий контекст. "
"Відповідай українською 2-4 реченнями, без службових фраз. "
f"Запит користувача: {request.prompt}"
)
retry_resp = await http_client.post(
f"{SWAPPER_URL}/vision",
json=retry_payload,
timeout=120.0
)
if retry_resp.status_code == 200:
retry_data = retry_resp.json()
retry_raw = retry_data.get("text", "")
retry_text = _sanitize_vision_text_for_user(retry_raw)
if retry_raw and not retry_text:
retry_text = _extract_vision_search_facts(retry_raw, max_chars=280)
if retry_text and not _image_response_needs_retry(retry_text):
full_response = retry_text
logger.info(f"✅ Vision quality retry improved response for agent={request_agent_id}")
except Exception as e:
logger.warning(f"⚠️ Vision quality retry failed: {e}")
if _image_response_needs_retry(full_response):
full_response = _build_image_fallback_response(request_agent_id, request.prompt)
elif request_agent_id == "agromatrix" and _vision_response_is_blurry(full_response):
full_response = _build_image_fallback_response(request_agent_id, request.prompt)
# Store vision message in agent-specific memory
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id and full_response:
vision_meta: Dict[str, Any] = {}
if vision_web_query:
vision_meta["vision_search_query"] = vision_web_query[:500]
if vision_sources:
vision_meta["vision_sources"] = vision_sources
asyncio.create_task(
memory_retrieval.store_message(
agent_id=request_agent_id,
user_id=user_id,
username=username,
message_text=f"[Image] {request.prompt}",
response_text=full_response,
chat_id=chat_id,
message_type="vision",
metadata=vision_meta if vision_meta else None,
)
)
return InferResponse(
response=full_response,
model="qwen3-vl-8b",
tokens_used=None,
backend="swapper-vision"
)
else:
logger.error(f"❌ Swapper vision error: {vision_resp.status_code} - {vision_resp.text[:200]}")
return InferResponse(
response=_build_image_fallback_response(request_agent_id, request.prompt),
model="qwen3-vl-8b",
tokens_used=None,
backend="swapper-vision-fallback"
)
except Exception as e:
logger.error(f"❌ Vision processing failed: {e}", exc_info=True)
return InferResponse(
response=_build_image_fallback_response(request_agent_id, request.prompt),
model="qwen3-vl-8b",
tokens_used=None,
backend="swapper-vision-fallback"
)
# =========================================================================
# SMART LLM ROUTER WITH AUTO-FALLBACK
# Priority: DeepSeek → Mistral → Grok → Local Ollama
# =========================================================================
# Build messages array once for all providers
messages = []
if system_prompt:
if memory_brief_text:
enhanced_prompt = f"{system_prompt}\n\n[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}"
messages.append({"role": "system", "content": enhanced_prompt})
logger.info(f"📝 Added system message with prompt ({len(system_prompt)} chars) + memory ({len(memory_brief_text)} chars)")
else:
messages.append({"role": "system", "content": system_prompt})
logger.info(f"📝 Added system message with prompt ({len(system_prompt)} chars)")
elif memory_brief_text:
messages.append({"role": "system", "content": f"[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}"})
logger.warning(f"⚠️ No system_prompt! Using only memory brief ({len(memory_brief_text)} chars)")
else:
logger.error(f"❌ No system_prompt AND no memory_brief! LLM will have no context!")
messages.append({"role": "user", "content": request.prompt})
logger.debug(f"📨 Messages array: {len(messages)} messages, system={len(messages[0].get('content', '')) if messages else 0} chars")
max_tokens = request.max_tokens or llm_profile.get("max_tokens", 2048)
temperature = request.temperature or llm_profile.get("temperature", 0.2)
allow_cloud = provider in cloud_provider_names
if not allow_cloud:
logger.info(f"☁️ Cloud providers disabled for agent {agent_id}: provider={provider}")
# Define cloud providers with fallback order
cloud_providers = [
{
"name": "deepseek",
"api_key_env": "DEEPSEEK_API_KEY",
"base_url": "https://api.deepseek.com",
"model": "deepseek-chat",
"timeout": 40
},
{
"name": "mistral",
"api_key_env": "MISTRAL_API_KEY",
"base_url": "https://api.mistral.ai",
"model": "mistral-large-latest",
"timeout": 60
},
{
"name": "grok",
"api_key_env": "GROK_API_KEY",
"base_url": "https://api.x.ai",
"model": "grok-4-1-fast-reasoning",
"timeout": 60
}
]
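# Illustrative fallback walk (assuming only GROK_API_KEY is set):
#   deepseek → skipped (DEEPSEEK_API_KEY missing)
#   mistral  → skipped (MISTRAL_API_KEY missing)
#   grok     → POST https://api.x.ai/v1/chat/completions, model=grok-4-1-fast-reasoning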
# Custom configured profile for OpenAI-compatible backends (e.g. local llama-server).
if provider == "openai":
cloud_providers = [
{
"name": "openai",
"api_key_env": llm_profile.get("api_key_env", "OPENAI_API_KEY"),
"base_url": llm_profile.get("base_url", "https://api.openai.com"),
"model": request.model or llm_profile.get("model", model),
"timeout": int(llm_profile.get("timeout_ms", 60000) / 1000),
}
]
if not allow_cloud:
cloud_providers = []
# If specific provider requested, try it first
if provider in ["deepseek", "mistral", "grok"]:
# Reorder to put requested provider first
cloud_providers = sorted(cloud_providers, key=lambda x: 0 if x["name"] == provider else 1)
last_error = None
# Get tool definitions if Tool Manager is available
tools_payload = None
if TOOL_MANAGER_AVAILABLE and tool_manager:
tools_payload = tool_manager.get_tool_definitions(request_agent_id)
logger.debug(f"🔧 {len(tools_payload)} tools available for function calling")
for cloud in cloud_providers:
api_key = os.getenv(cloud["api_key_env"], "")
base_url = cloud.get("base_url", "")
is_local_openai = (
cloud.get("name") == "openai"
and isinstance(base_url, str)
and any(host in base_url for host in ["host.docker.internal", "localhost", "127.0.0.1"])
)
if not api_key and not is_local_openai:
logger.debug(f"⏭️ Skipping {cloud['name']}: API key not configured")
continue
try:
logger.info(f"🌐 Trying {cloud['name'].upper()} API with model: {cloud['model']}")
# Build request payload
request_payload = {
"model": cloud["model"],
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature,
"stream": False
}
# Add tools for function calling (if available and supported)
if tools_payload and cloud["name"] in ["deepseek", "mistral", "grok"]:
request_payload["tools"] = tools_payload
request_payload["tool_choice"] = "auto"
headers = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
cloud_resp = await http_client.post(
f"{cloud['base_url']}/v1/chat/completions",
headers=headers,
json=request_payload,
timeout=cloud["timeout"]
)
if cloud_resp.status_code == 200:
data = cloud_resp.json()
choice = data.get("choices", [{}])[0]
message = choice.get("message", {})
response_text = message.get("content", "") or ""
if not response_text and message.get("reasoning_content"):
response_text = str(message.get("reasoning_content", "")).strip()
tokens_used = data.get("usage", {}).get("total_tokens", 0)
# Initialize tool_results to avoid UnboundLocalError
tool_results = []
# Check for tool calls (standard format)
tool_calls = message.get("tool_calls", [])
# Also check for DSML format in content (DeepSeek sometimes returns this)
# AGGRESSIVE check - any DSML-like pattern should be caught (re is imported at module level)
has_dsml = False
if response_text:
# Check for DSML patterns with regex (handles Unicode variations)
dsml_patterns_check = [
r'DSML', # Any mention of DSML
r'function_calls>',
r'invoke\s*name\s*=',
r'parameter\s*name\s*=',
r'<[^>]*invoke[^>]*>',
r'</[^>]*invoke[^>]*>',
]
for pattern in dsml_patterns_check:
if re.search(pattern, response_text, re.IGNORECASE):
has_dsml = True
logger.warning(f"⚠️ DSML detected via pattern: {pattern}")
break
if has_dsml:
logger.warning("⚠️ Detected DSML format in content, parsing...")
# Extract tool name and parameters from DSML
# Try multiple DSML patterns
dsml_patterns = [
r'invoke name="(\w+)".*?parameter name="(\w+)"[^>]*>([^<]+)',
r'invoke\s+name="(\w+)".*?parameter\s+name="(\w+)"[^>]*>([^<]+)',
r'name="web_extract".*?url.*?>([^\s<]+)',
]
dsml_match = None
for pattern in dsml_patterns:
dsml_match = re.search(pattern, response_text, re.DOTALL | re.IGNORECASE)
if dsml_match:
break
if dsml_match and len(dsml_match.groups()) >= 3:
tool_name = dsml_match.group(1)
param_name = dsml_match.group(2)
param_value = dsml_match.group(3).strip()
# Create synthetic tool call with Mistral-compatible ID (exactly 9 chars, a-zA-Z0-9)
import string
import random
tool_call_id = ''.join(random.choices(string.ascii_letters + string.digits, k=9))
tool_calls = [{
"id": tool_call_id,
"type": "function",  # OpenAI-compatible APIs expect this when tool_calls are echoed back in messages
"function": {
"name": tool_name,
"arguments": json.dumps({param_name: param_value})
}
}]
logger.info(f"🔧 Parsed DSML tool call: {tool_name}({param_name}={param_value[:50]}...) id={tool_call_id}")
# ALWAYS clear DSML content - never show raw DSML to user
logger.warning(f"🧹 Clearing DSML content from response ({len(response_text)} chars)")
response_text = ""
if tool_calls and tool_manager:
max_tool_rounds = int(os.getenv("ROUTER_TOOL_MAX_ROUNDS", "10"))
logger.info(f"🔧 LLM requested tool calls; running iterative mode up to {max_tool_rounds} rounds")
all_tool_results = []
current_tool_calls = tool_calls
rounds_done = 0
oneok_ctx = {
"client_id": None,
"site_id": None,
"calc_result": None,
"quote_id": None,
}
repeated_failures = {}
while current_tool_calls and rounds_done < max_tool_rounds:
rounds_done += 1
logger.info(f"🔁 Tool round {rounds_done}/{max_tool_rounds}: {len(current_tool_calls)} call(s)")
round_results = []
abort_loop_due_repeats = False
for tc in current_tool_calls:
func = tc.get("function", {})
tool_name = func.get("name", "")
try:
tool_args = json.loads(func.get("arguments", "{}"))
except Exception:
tool_args = {}
# Light auto-repair for 1OK multi-step flows when model omits required args.
if request_agent_id == "oneok":
if tool_name == "calc_window_quote":
ip = (tool_args or {}).get("input_payload")
if isinstance(ip, dict) and isinstance(ip.get("windows"), list) and "window_units" not in ip:
ip2 = dict(ip)
ip2["window_units"] = ip.get("windows")
tool_args = dict(tool_args or {})
tool_args["input_payload"] = ip2
elif tool_name == "crm_create_quote":
quote_payload = (tool_args or {}).get("quote_payload")
if not isinstance(quote_payload, dict):
calc_res = oneok_ctx.get("calc_result") or {}
line_items = calc_res.get("line_items") if isinstance(calc_res, dict) else None
totals = calc_res.get("totals") if isinstance(calc_res, dict) else None
if isinstance(line_items, list) and isinstance(totals, dict):
tool_args = {
"quote_payload": {
"client_id": oneok_ctx.get("client_id"),
"site_id": oneok_ctx.get("site_id"),
"currency": calc_res.get("currency", "UAH"),
"line_items": line_items,
"totals": totals,
"assumptions": calc_res.get("assumptions", []),
"validity_days": 14,
"lead_time_estimate": calc_res.get("lead_time_if_known") or calc_res.get("lead_time_estimate"),
}
}
logger.info("🛠️ oneok: auto-filled crm_create_quote.quote_payload from calc context")
elif tool_name == "docs_render_quote_pdf":
quote_id = (tool_args or {}).get("quote_id")
quote_payload = (tool_args or {}).get("quote_payload")
if not quote_id and not isinstance(quote_payload, dict) and oneok_ctx.get("quote_id"):
tool_args = {"quote_id": oneok_ctx.get("quote_id")}
logger.info("🛠️ oneok: auto-filled docs_render_quote_pdf.quote_id from quote context")
elif tool_name == "schedule_propose_slots":
params = (tool_args or {}).get("params")
if not isinstance(params, dict):
tool_args = {"params": {"count": 3, "timezone": "Europe/Kyiv"}}
logger.info("🛠️ oneok: auto-filled schedule_propose_slots.params")
result = await tool_manager.execute_tool(
tool_name,
tool_args,
agent_id=request_agent_id,
chat_id=chat_id,
user_id=user_id,
)
tool_result_dict = {
"tool_call_id": tc.get("id", ""),
"name": tool_name,
"success": result.success,
"result": result.result,
"error": result.error,
"image_base64": result.image_base64,
"file_base64": result.file_base64,
"file_name": result.file_name,
"file_mime": result.file_mime,
}
if result.image_base64:
logger.info(f"🖼️ Tool {tool_name} generated image: {len(result.image_base64)} chars")
round_results.append(tool_result_dict)
all_tool_results.append(tool_result_dict)
# Track oneok context to help subsequent tool calls in the same request.
if request_agent_id == "oneok" and result.success and isinstance(result.result, dict):
if tool_name == "crm_upsert_client":
oneok_ctx["client_id"] = result.result.get("client_id") or oneok_ctx.get("client_id")
elif tool_name == "crm_upsert_site":
oneok_ctx["site_id"] = result.result.get("site_id") or oneok_ctx.get("site_id")
elif tool_name == "calc_window_quote":
oneok_ctx["calc_result"] = result.result
elif tool_name == "crm_create_quote":
oneok_ctx["quote_id"] = result.result.get("quote_id") or oneok_ctx.get("quote_id")
# Guardrail: stop if model repeats same failing tool call too many times.
sig = f"{tool_name}:{json.dumps(tool_args, ensure_ascii=False, sort_keys=True, default=str)}"
if result.success:
repeated_failures.pop(sig, None)
else:
repeated_failures[sig] = repeated_failures.get(sig, 0) + 1
if repeated_failures[sig] >= 3:
logger.warning(f"⚠️ Repeated failing tool call detected ({tool_name}) x{repeated_failures[sig]}; breaking iterative loop")
abort_loop_due_repeats = True
break
if abort_loop_due_repeats:
current_tool_calls = []
response_text = response_text or format_tool_calls_for_response(all_tool_results, fallback_mode="empty_response")
break
messages.append({"role": "assistant", "content": None, "tool_calls": current_tool_calls})
for tr in round_results:
messages.append({
"role": "tool",
"tool_call_id": tr["tool_call_id"],
"content": str(tr["result"]) if tr["success"] else f"Error: {tr['error']}"
})
logger.info("🔄 Calling LLM again after tool round")
loop_payload = {
"model": cloud["model"],
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature,
"stream": False
}
# Keep tools enabled for multi-step chains.
if tools_payload and cloud["name"] in ["deepseek", "mistral", "grok"]:
loop_payload["tools"] = tools_payload
loop_payload["tool_choice"] = "auto"
loop_headers = {"Content-Type": "application/json"}
if api_key:
loop_headers["Authorization"] = f"Bearer {api_key}"
loop_resp = await http_client.post(
f"{cloud['base_url']}/v1/chat/completions",
headers=loop_headers,
json=loop_payload,
timeout=cloud["timeout"]
)
if loop_resp.status_code != 200:
logger.error(f"{cloud['name'].upper()} loop call failed: {loop_resp.status_code} - {loop_resp.text[:200]}")
response_text = format_tool_calls_for_response(all_tool_results, fallback_mode="empty_response")
current_tool_calls = []
break
loop_data = loop_resp.json()
loop_message = loop_data.get("choices", [{}])[0].get("message", {})
response_text = loop_message.get("content", "") or ""
if not response_text and loop_message.get("reasoning_content"):
response_text = str(loop_message.get("reasoning_content", "")).strip()
tokens_used += loop_data.get("usage", {}).get("total_tokens", 0)
current_tool_calls = loop_message.get("tool_calls", [])
# DSML fallback parsing for providers that return markup instead of tool_calls.
has_dsml_loop = False
if response_text:
dsml_patterns_check = [
r'DSML',
r'function_calls>',
r'invoke\s*name\s*=',
r'parameter\s*name\s*=',
r'<[^>]*invoke[^>]*>',
r'</[^>]*invoke[^>]*>',
]
for pattern in dsml_patterns_check:
if re.search(pattern, response_text, re.IGNORECASE):
has_dsml_loop = True
logger.warning(f"⚠️ DSML detected in loop via pattern: {pattern}")
break
if has_dsml_loop and not current_tool_calls:
dsml_patterns = [
r'invoke name="(\w+)".*?parameter name="(\w+)"[^>]*>([^<]+)',
r'invoke\s+name="(\w+)".*?parameter\s+name="(\w+)"[^>]*>([^<]+)',
]
dsml_match = None
for pattern in dsml_patterns:
dsml_match = re.search(pattern, response_text, re.DOTALL | re.IGNORECASE)
if dsml_match:
break
if dsml_match and len(dsml_match.groups()) >= 3:
import string
import random
tool_call_id = ''.join(random.choices(string.ascii_letters + string.digits, k=9))
current_tool_calls = [{
"id": tool_call_id,
"type": "function",  # keep shape consistent with provider-native tool_calls
"function": {
"name": dsml_match.group(1),
"arguments": json.dumps({dsml_match.group(2): dsml_match.group(3).strip()})
}
}]
response_text = ""
tool_results = all_tool_results
if current_tool_calls:
logger.warning(f"⚠️ Reached max tool rounds ({max_tool_rounds}) for {request_agent_id}, returning summary")
response_text = response_text or format_tool_calls_for_response(tool_results, fallback_mode="empty_response")
if response_text and ("DSML" in response_text or "invoke name=" in response_text or "function_calls>" in response_text):
prefix_before_dsml = _strip_dsml_keep_text_before(response_text)
if prefix_before_dsml:
logger.warning(f"🧹 DSML in loop final response: keeping text before DSML ({len(prefix_before_dsml)} chars)")
response_text = prefix_before_dsml
else:
response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected")
if not response_text:
logger.warning(f"⚠️ {cloud['name'].upper()} returned empty response after iterative tool calls")
response_text = format_tool_calls_for_response(tool_results, fallback_mode="empty_response")
if response_text:
# FINAL DSML check before returning - never show DSML to user
if "DSML" in response_text or "invoke name=" in response_text or "function_calls>" in response_text:
prefix_before_dsml = _strip_dsml_keep_text_before(response_text)
if prefix_before_dsml:
logger.warning(f"🧹 DSML in final response: keeping text before DSML ({len(prefix_before_dsml)} chars)")
response_text = prefix_before_dsml
else:
logger.warning(f"🧹 DSML in final response! Replacing with fallback ({len(response_text)} chars)")
response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected")
# Check if any tool generated an image
generated_image = None
generated_file_base64 = None
generated_file_name = None
generated_file_mime = None
logger.debug(f"🔍 Checking {len(tool_results)} tool results for images...")
for tr in tool_results:
img_b64 = tr.get("image_base64")
if img_b64:
generated_image = img_b64
logger.info(f"🖼️ Image generated by tool: {tr['name']} ({len(img_b64)} chars)")
file_b64 = tr.get("file_base64")
if file_b64 and not generated_file_base64:
generated_file_base64 = file_b64
generated_file_name = tr.get("file_name")
generated_file_mime = tr.get("file_mime")
logger.info(f"📄 File generated by tool: {tr['name']} ({len(file_b64)} chars)")
if not img_b64:
logger.debug(f" Tool {tr['name']}: no image_base64")
logger.info(f"{cloud['name'].upper()} response received, {tokens_used} tokens")
# Store message in agent-specific memory (async, non-blocking)
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id:
asyncio.create_task(
memory_retrieval.store_message(
agent_id=request_agent_id,
user_id=user_id,
username=username,
message_text=request.prompt,
response_text=response_text,
chat_id=chat_id
)
)
return InferResponse(
response=response_text,
model=cloud["model"],
tokens_used=tokens_used,
backend=f"{cloud['name']}-cloud",
image_base64=generated_image,
file_base64=generated_file_base64,
file_name=generated_file_name,
file_mime=generated_file_mime,
)
else:
logger.warning(f"⚠️ {cloud['name'].upper()} returned empty response, trying next provider")
continue
else:
logger.warning(f"⚠️ {cloud['name'].upper()} returned {cloud_resp.status_code}, trying next...")
last_error = f"{cloud['name']}: {cloud_resp.status_code}"
continue
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.warning(f"⚠️ {cloud['name'].upper()} failed: {e}")
if not str(e).strip(): # Empty error string
logger.error(f"{cloud['name'].upper()} failed with empty error! Check traceback:")
logger.error(error_details)
else:
logger.debug(f"Full error traceback: {error_details}")
last_error = f"{cloud['name']}: {str(e)}"
continue
# If all cloud providers failed, log and fall through to local
if last_error:
logger.warning(f"⚠️ All cloud providers failed ({last_error}), falling back to local Ollama")
# =========================================================================
# LOCAL PROVIDERS (Ollama via Swapper)
# =========================================================================
# Determine local model from config (not hardcoded)
# Strategy:
# 1) explicit request.model override
# 2) agent default_llm if it's local (ollama)
# 3) first local profile fallback
local_model = None
requested_local_model = (request.model or "").strip()
if requested_local_model:
local_model = requested_local_model.replace(":", "-")
logger.info(f"🎛️ Local model override requested: {requested_local_model} -> {local_model}")
# Check if default_llm is local
if not local_model and llm_profile.get("provider") == "ollama":
# Extract model name and convert to Swapper format (qwen3:8b → qwen3-8b)
ollama_model = llm_profile.get("model", "qwen3:8b")
local_model = ollama_model.replace(":", "-")  # qwen3:8b → qwen3-8b
logger.debug(f"✅ Using agent's default local model: {local_model}")
elif not local_model:
# Find first local model from config
for profile_name, profile in llm_profiles.items():
if profile.get("provider") == "ollama":
ollama_model = profile.get("model", "qwen3:8b")
local_model = ollama_model.replace(":", "-")
logger.info(f"🔄 Found fallback local model: {local_model} from profile {profile_name}")
break
# Final fallback if no local model found
if not local_model:
local_model = "qwen3:8b"
logger.warning(f"⚠️ No local model in config, using hardcoded fallback: {local_model}")
try:
# Check if Swapper is available
health_resp = await http_client.get(f"{SWAPPER_URL}/health", timeout=5.0)
if health_resp.status_code == 200:
logger.info(f"📡 Calling Swapper with local model: {local_model}")
# Generate response via Swapper (which handles model loading)
generate_resp = await http_client.post(
f"{SWAPPER_URL}/generate",
json={
"model": local_model,
"prompt": request.prompt,
"system": system_prompt,
"max_tokens": request.max_tokens,
"temperature": request.temperature,
"stream": False
},
timeout=300.0
)
if generate_resp.status_code == 200:
data = generate_resp.json()
local_response = _normalize_text_response(data.get("response", ""))
# Empty-answer gate for selected local top-level agents.
if request_agent_id in EMPTY_ANSWER_GUARD_AGENTS and _needs_empty_answer_recovery(local_response):
logger.warning(f"⚠️ Empty-answer gate triggered for {request_agent_id}, retrying local generate once")
retry_prompt = (
f"{request.prompt}\n\n"
"Відповідай коротко і конкретно (2-5 речень), без службових або мета-фраз."
)
retry_resp = await http_client.post(
f"{SWAPPER_URL}/generate",
json={
"model": local_model,
"prompt": retry_prompt,
"system": system_prompt,
"max_tokens": request.max_tokens,
"temperature": request.temperature,
"stream": False
},
timeout=300.0
)
if retry_resp.status_code == 200:
retry_data = retry_resp.json()
retry_text = _normalize_text_response(retry_data.get("response", ""))
if retry_text and not _needs_empty_answer_recovery(retry_text):
local_response = retry_text
if _needs_empty_answer_recovery(local_response):
local_response = (
"Я не отримав корисну відповідь з першої спроби. "
"Сформулюй запит коротко ще раз, і я відповім конкретно."
)
# Store in agent-specific memory
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval and chat_id and user_id and local_response:
asyncio.create_task(
memory_retrieval.store_message(
agent_id=request_agent_id,
user_id=user_id,
username=username,
message_text=request.prompt,
response_text=local_response,
chat_id=chat_id
)
)
return InferResponse(
response=local_response,
model=local_model,
tokens_used=data.get("eval_count", 0),
backend="swapper+ollama"
)
else:
logger.error(f"❌ Swapper error: {generate_resp.status_code} - {generate_resp.text}")
except Exception as e:
logger.error(f"❌ Swapper/Ollama error: {e}")
# Fallback to direct Ollama if Swapper fails
try:
logger.info(f"🔄 Falling back to direct Ollama connection")
generate_resp = await http_client.post(
f"{VISION_URL}/api/generate",
json={
"model": "qwen3:8b", # Use actual Ollama model name
"prompt": request.prompt,
"system": system_prompt,
"stream": False,
"options": {
"num_predict": request.max_tokens,
"temperature": request.temperature
}
},
timeout=120.0
)
if generate_resp.status_code == 200:
data = generate_resp.json()
return InferResponse(
response=data.get("response", ""),
model="qwen3:8b",  # report the model actually sent to Ollama above, not the selected cloud model
tokens_used=data.get("eval_count", 0),
backend="ollama-direct"
)
except Exception as e2:
logger.error(f"❌ Direct Ollama fallback also failed: {e2}")
# Fallback: return error
raise HTTPException(
status_code=503,
detail=f"No backend available for model: {model}"
)
@app.post("/v1/tools/execute")
async def tools_execute(request: ToolExecuteRequest):
"""
Execute a single tool call through ToolManager.
Returns console-compatible shape: {status, data, error}.
"""
if not tool_manager:
raise HTTPException(status_code=503, detail="Tool manager unavailable")
payload = request.model_dump(exclude_none=True)
tool_name = str(payload.pop("tool", "")).strip()
action = payload.pop("action", None)
agent_id = str(payload.pop("agent_id", "sofiia") or "sofiia").strip()
metadata = payload.pop("metadata", {}) or {}
if not tool_name:
raise HTTPException(status_code=422, detail="tool is required")
# Keep backward compatibility with sofiia-console calls
if action is not None:
payload["action"] = action
chat_id = str(metadata.get("chat_id", "") or "") or None
user_id = str(metadata.get("user_id", "") or "") or None
workspace_id = str(metadata.get("workspace_id", "default") or "default")
try:
result = await tool_manager.execute_tool(
tool_name=tool_name,
arguments=payload,
agent_id=agent_id,
chat_id=chat_id,
user_id=user_id,
workspace_id=workspace_id,
)
except Exception as e:
logger.exception("❌ Tool execution failed: %s", tool_name)
raise HTTPException(status_code=500, detail=f"Tool execution error: {str(e)[:200]}")
data: Dict[str, Any] = {"result": result.result}
if result.image_base64:
data["image_base64"] = result.image_base64
if result.file_base64:
data["file_base64"] = result.file_base64
if result.file_name:
data["file_name"] = result.file_name
if result.file_mime:
data["file_mime"] = result.file_mime
if result.success:
return {"status": "ok", "data": data, "error": None}
return {"status": "failed", "data": data, "error": {"message": result.error or "Tool failed"}}
@app.get("/v1/models")
async def list_available_models():
"""List all available models across backends"""
models = []
# Get Swapper models
try:
resp = await http_client.get(f"{SWAPPER_URL}/models", timeout=5.0)
if resp.status_code == 200:
data = resp.json()
for m in data.get("models", []):
models.append({
"id": m.get("name"),
"backend": "swapper",
"size_gb": m.get("size_gb"),
"status": m.get("status", "available")
})
except Exception as e:
logger.warning(f"Cannot get Swapper models: {e}")
# Get Ollama models
try:
resp = await http_client.get(f"{VISION_URL}/api/tags", timeout=5.0)
if resp.status_code == 200:
data = resp.json()
for m in data.get("models", []):
# Avoid duplicates
model_name = m.get("name")
if not any(x.get("id") == model_name for x in models):
models.append({
"id": model_name,
"backend": "ollama",
"size_gb": round(m.get("size", 0) / 1e9, 1),
"status": "loaded"
})
except Exception as e:
logger.warning(f"Cannot get Ollama models: {e}")
return {"models": models, "total": len(models)}
# =============================================================================
# NEO4J GRAPH API ENDPOINTS
# =============================================================================
class GraphNode(BaseModel):
"""Model for creating/updating a graph node"""
label: str # Node type: User, Agent, Topic, Fact, Entity, etc.
properties: Dict[str, Any]
node_id: Optional[str] = None # If provided, update existing node
class GraphRelationship(BaseModel):
"""Model for creating a relationship between nodes"""
from_node_id: str
to_node_id: str
relationship_type: str # KNOWS, MENTIONED, RELATED_TO, CREATED_BY, etc.
properties: Optional[Dict[str, Any]] = None
class GraphQuery(BaseModel):
"""Model for querying the graph"""
cypher: Optional[str] = None # Direct Cypher query (advanced)
# Or use structured query:
node_label: Optional[str] = None
node_id: Optional[str] = None
relationship_type: Optional[str] = None
depth: int = 1 # How many hops to traverse (accepted but not currently used by /v1/graph/query)
limit: int = 50
class GraphSearchRequest(BaseModel):
"""Natural language search in graph"""
query: str
entity_types: Optional[List[str]] = None # Filter by types
limit: int = 20
@app.post("/v1/graph/nodes")
async def create_graph_node(node: GraphNode):
"""Create or update a node in the knowledge graph"""
if not neo4j_available or not neo4j_driver:
raise HTTPException(status_code=503, detail="Neo4j not available")
try:
async with neo4j_driver.session() as session:
# Generate node_id if not provided
node_id = node.node_id or f"{node.label.lower()}_{os.urandom(8).hex()}"
# Create or merge node
cypher = f"""
MERGE (n:{node.label} {{node_id: $node_id}})
SET n += $properties
SET n.updated_at = datetime()
RETURN n
"""
result = await session.run(cypher, node_id=node_id, properties=node.properties)
record = await result.single()
if record:
created_node = dict(record["n"])
logger.info(f"📊 Created/updated node: {node.label} - {node_id}")
return {"status": "ok", "node_id": node_id, "node": created_node}
raise HTTPException(status_code=500, detail="Failed to create node")
except Exception as e:
logger.error(f"❌ Neo4j error creating node: {e}")
raise HTTPException(status_code=500, detail=str(e))
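# Example (illustrative):
#   POST /v1/graph/nodes  {"label": "Topic", "properties": {"name": "windows"}}
#   → {"status": "ok", "node_id": "topic_a1b2c3d4e5f6a7b8", "node": {...}}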
@app.post("/v1/graph/relationships")
async def create_graph_relationship(rel: GraphRelationship):
"""Create a relationship between two nodes"""
if not neo4j_available or not neo4j_driver:
raise HTTPException(status_code=503, detail="Neo4j not available")
try:
async with neo4j_driver.session() as session:
props_clause = ""
if rel.properties:
props_clause = " SET r += $properties"
cypher = f"""
MATCH (a {{node_id: $from_id}})
MATCH (b {{node_id: $to_id}})
MERGE (a)-[r:{rel.relationship_type}]->(b)
{props_clause}
SET r.created_at = datetime()
RETURN a.node_id as from_id, b.node_id as to_id, type(r) as rel_type
"""
result = await session.run(
cypher,
from_id=rel.from_node_id,
to_id=rel.to_node_id,
properties=rel.properties or {}
)
record = await result.single()
if record:
logger.info(f"🔗 Created relationship: {rel.from_node_id} -[{rel.relationship_type}]-> {rel.to_node_id}")
return {
"status": "ok",
"from_id": record["from_id"],
"to_id": record["to_id"],
"relationship_type": record["rel_type"]
}
raise HTTPException(status_code=404, detail="One or both nodes not found")
except Exception as e:
logger.error(f"❌ Neo4j error creating relationship: {e}")
raise HTTPException(status_code=500, detail=str(e))
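# Example (illustrative; both nodes must already exist):
#   POST /v1/graph/relationships
#   {"from_node_id": "user_…", "to_node_id": "topic_…",
#    "relationship_type": "MENTIONED"}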
@app.post("/v1/graph/query")
async def query_graph(query: GraphQuery):
"""Query the knowledge graph"""
if not neo4j_available or not neo4j_driver:
raise HTTPException(status_code=503, detail="Neo4j not available")
try:
async with neo4j_driver.session() as session:
# If direct Cypher provided, use it (with safety check)
if query.cypher:
# Basic safety: only allow read queries (word-boundary match so e.g. CREATED_BY is not blocked)
if any(re.search(rf"\b{kw}\b", query.cypher, re.IGNORECASE) for kw in ["DELETE", "REMOVE", "DROP", "CREATE", "MERGE", "SET"]):
raise HTTPException(status_code=400, detail="Only read queries allowed via cypher parameter")
result = await session.run(query.cypher)
records = await result.data()
return {"status": "ok", "results": records, "count": len(records)}
# Build structured query
if query.node_id:
# Get specific node with relationships
cypher = f"""
MATCH (n {{node_id: $node_id}})
OPTIONAL MATCH (n)-[r]-(related)
RETURN n, collect({{rel: type(r), node: related}}) as connections
LIMIT 1
"""
result = await session.run(cypher, node_id=query.node_id)
elif query.node_label:
# Get nodes by label
cypher = f"""
MATCH (n:{query.node_label})
RETURN n
ORDER BY n.updated_at DESC
LIMIT $limit
"""
result = await session.run(cypher, limit=query.limit)
else:
# Get recent nodes
cypher = """
MATCH (n)
RETURN n, labels(n) as labels
ORDER BY n.updated_at DESC
LIMIT $limit
"""
result = await session.run(cypher, limit=query.limit)
records = await result.data()
return {"status": "ok", "results": records, "count": len(records)}
except Exception as e:
logger.error(f"❌ Neo4j query error: {e}")
raise HTTPException(status_code=500, detail=str(e))
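# Example structured query (illustrative):
#   POST /v1/graph/query  {"node_label": "Fact", "limit": 10}
#   Read-only Cypher also works: {"cypher": "MATCH (n:Topic) RETURN n LIMIT 5"}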
@app.get("/v1/graph/search")
async def search_graph(q: str, types: Optional[str] = None, limit: int = 20):
"""Search nodes by text in properties"""
if not neo4j_available or not neo4j_driver:
raise HTTPException(status_code=503, detail="Neo4j not available")
try:
async with neo4j_driver.session() as session:
# Build label filter
label_filter = ""
if types:
labels = [t.strip() for t in types.split(",")]
label_filter = " AND (" + " OR ".join([f"n:{l}" for l in labels]) + ")"
# Search in common text properties
cypher = f"""
MATCH (n)
WHERE (
n.name CONTAINS $query OR
n.title CONTAINS $query OR
n.text CONTAINS $query OR
n.description CONTAINS $query OR
n.content CONTAINS $query
){label_filter}
RETURN n, labels(n) as labels
ORDER BY n.updated_at DESC
LIMIT $limit
"""
result = await session.run(cypher, query=q, limit=limit)
records = await result.data()
return {"status": "ok", "query": q, "results": records, "count": len(records)}
except Exception as e:
logger.error(f"❌ Neo4j search error: {e}")
raise HTTPException(status_code=500, detail=str(e))
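# Example (illustrative):
#   GET /v1/graph/search?q=vikna&types=Topic,Fact&limit=10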
@app.get("/v1/graph/stats")
async def get_graph_stats():
"""Get knowledge graph statistics"""
if not neo4j_available or not neo4j_driver:
raise HTTPException(status_code=503, detail="Neo4j not available")
try:
async with neo4j_driver.session() as session:
# Get node counts by label
labels_result = await session.run("""
CALL db.labels() YIELD label
CALL apoc.cypher.run('MATCH (n:`' + label + '`) RETURN count(n) as count', {}) YIELD value
RETURN label, value.count as count
""")
# If APOC not available, use simpler query
try:
labels_data = await labels_result.data()
except Exception:
labels_result = await session.run("""
MATCH (n)
RETURN labels(n)[0] as label, count(*) as count
ORDER BY count DESC
""")
labels_data = await labels_result.data()
# Get relationship counts
rels_result = await session.run("""
MATCH ()-[r]->()
RETURN type(r) as type, count(*) as count
ORDER BY count DESC
""")
rels_data = await rels_result.data()
# Get total counts
total_result = await session.run("""
MATCH (n) RETURN count(n) as nodes
""")
total_nodes = (await total_result.single())["nodes"]
total_rels_result = await session.run("""
MATCH ()-[r]->() RETURN count(r) as relationships
""")
total_rels = (await total_rels_result.single())["relationships"]
return {
"status": "ok",
"total_nodes": total_nodes,
"total_relationships": total_rels,
"nodes_by_label": labels_data,
"relationships_by_type": rels_data
}
except Exception as e:
logger.error(f"❌ Neo4j stats error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.on_event("shutdown")
async def shutdown_event():
"""Cleanup connections on shutdown"""
global neo4j_driver, http_client, nc
# Close Memory Retrieval
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval:
try:
await memory_retrieval.close()
logger.info("🔌 Memory Retrieval closed")
except Exception as e:
logger.warning(f"⚠️ Memory Retrieval close error: {e}")
if neo4j_driver:
await neo4j_driver.close()
logger.info("🔌 Neo4j connection closed")
if http_client:
await http_client.aclose()
logger.info("🔌 HTTP client closed")
if nc:
await nc.close()
logger.info("🔌 NATS connection closed")