From a92c4248452c39039fb90ff36492edad3ae6cd1f Mon Sep 17 00:00:00 2001 From: Apple Date: Fri, 27 Feb 2026 02:26:12 -0800 Subject: [PATCH] P2: Global multi-node model selection + NCS on NODA1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Architecture for 150+ nodes: - global_capabilities_client.py: NATS scatter-gather discovery using wildcard subject node.*.capabilities.get — zero static node lists. New nodes auto-register by deploying NCS and subscribing to NATS. Dead nodes expire from cache after 3x TTL automatically. Multi-node model_select.py: - ModelSelection now includes node, local, via_nats fields - select_best_model prefers local candidates, then remote - Prefer list resolution: local first, remote second - All logged per request: node, runtime, model, local/remote NODA1 compose: - Added node-capabilities service (NCS) to docker-compose.node1.yml - NATS subscription: node.noda1.capabilities.get - Router env: NODE_CAPABILITIES_URL + ENABLE_GLOBAL_CAPS_NATS=true NODA2 compose: - Router env: ENABLE_GLOBAL_CAPS_NATS=true Router main.py: - Startup: initializes global_capabilities_client (NATS connect + first discovery). Falls back to local-only capabilities_client if unavailable. - /infer: uses get_global_capabilities() for cross-node model pool - Offload support: send_offload_request(node_id, type, payload) via NATS Verified on NODA2: - Global caps: 1 node, 14 models (NODA1 not yet deployed) - Sofiia: cloud_grok → grok-4-1-fast-reasoning (OK) - Helion: NCS → qwen3:14b local (OK) - When NODA1 deploys NCS, its models appear automatically via NATS discovery Made-with: Cursor --- docker-compose.node1.yml | 256 +++++++++++++++++- docker-compose.node2-sofiia.yml | 3 +- services/router/global_capabilities_client.py | 245 +++++++++++++++++ services/router/main.py | 41 ++- services/router/model_select.py | 92 ++++--- 5 files changed, 575 insertions(+), 62 deletions(-) create mode 100644 services/router/global_capabilities_client.py diff --git a/docker-compose.node1.yml b/docker-compose.node1.yml index ca8c80ad..6e83c44e 100644 --- a/docker-compose.node1.yml +++ b/docker-compose.node1.yml @@ -27,7 +27,7 @@ services: - DEEPSEEK_API_KEY=sk-0db94e8193ec4a6e9acd593ee8d898e7 - MISTRAL_API_KEY=40Gwjo8nVBx4i4vIkgszvXw9bOwDOu4G - COHERE_API_KEY=nOdOXnuepLku2ipJWpe6acWgAsJCsDhMO0RnaEJB - - GROK_API_KEY=xai-69zEnDse8qRuQyZATs9jVKgfwdyvkHzgEVrTbV0OTAurZqsjHmvGepXG6H9GhVRYEC7E4NFl6iZeG0ww + - GROK_API_KEY=xai-CpoLMPgw91NP9AEdHPhIrvU4ZnhV1q1P8BJBKCpD5kTPFRXJmTOkgGNHwYdZpXMlRxBgHcgcSlIXccxh - VISION_ENCODER_URL=http://vision-encoder:8001 - SWAPPER_SERVICE_URL=http://swapper-service:8890 - IMAGE_GEN_URL=http://swapper-service:8890/image/generate @@ -35,12 +35,22 @@ services: - STT_SERVICE_UPLOAD_URL=http://swapper-service:8890/stt - OCR_SERVICE_URL=http://swapper-service:8890 - WEB_SEARCH_SERVICE_URL=http://swapper-service:8890 + - REDIS_URL=redis://redis:6379/0 + - CREWAI_SERVICE_URL=http://dagi-staging-crewai-service:9010 + - NATURE_ID_URL=http://plant-vision-node1:8085 + - NATURE_ID_MIN_CONFIDENCE=0.65 + - PLANTNET_API_KEY=${PLANTNET_API_KEY} - ONEOK_CRM_BASE_URL=http://oneok-crm-adapter:8088 - ONEOK_CALC_BASE_URL=http://oneok-calc-adapter:8089 - ONEOK_DOCS_BASE_URL=http://oneok-docs-adapter:8090 - ONEOK_SCHEDULE_BASE_URL=http://oneok-schedule-adapter:8091 - ONEOK_ADAPTER_API_KEY=${ONEOK_ADAPTER_API_KEY} - ROUTER_TOOL_MAX_ROUNDS=${ROUTER_TOOL_MAX_ROUNDS:-10} + - AGROMATRIX_REVIEW_AUTH_MODE=${AGROMATRIX_REVIEW_AUTH_MODE:-bearer} + - AGROMATRIX_REVIEW_BEARER_TOKENS=${AGROMATRIX_REVIEW_BEARER_TOKENS} + # ── Node Capabilities (multi-node model selection) ── + - NODE_CAPABILITIES_URL=http://node-capabilities:8099/capabilities + - ENABLE_GLOBAL_CAPS_NATS=true volumes: - ${DEPLOY_ROOT:-.}/services/router/router_config.yaml:/app/router_config.yaml:ro - ${DEPLOY_ROOT:-.}/services/router/router-config.yml:/app/router-config.yml:ro @@ -77,7 +87,7 @@ services: - CUDA_VISIBLE_DEVICES=0 - CRAWL4AI_URL=http://crawl4ai:11235 # Cloud API keys for video/image generation - - GROK_API_KEY=xai-69zEnDse8qRuQyZATs9jVKgfwdyvkHzgEVrTbV0OTAurZqsjHmvGepXG6H9GhVRYEC7E4NFl6iZeG0ww + - GROK_API_KEY=xai-CpoLMPgw91NP9AEdHPhIrvU4ZnhV1q1P8BJBKCpD5kTPFRXJmTOkgGNHwYdZpXMlRxBgHcgcSlIXccxh - MISTRAL_API_KEY=40Gwjo8nVBx4i4vIkgszvXw9bOwDOu4G volumes: - ${DEPLOY_ROOT:-.}/services/swapper-service/config/swapper_config_node1.yaml:/app/config/swapper_config.yaml:ro @@ -106,6 +116,28 @@ services: # Image Generation тепер інтегровано в Swapper Service (lazy loading) # Endpoint: POST /image/generate на swapper-service:8890 + # Plant Vision wrapper (local nature-id CLI -> HTTP) + plant-vision-node1: + build: + context: ./services/plant-vision-node1 + dockerfile: Dockerfile + container_name: plant-vision-node1 + environment: + - NATURE_ID_CMD=${NATURE_ID_CMD:-python /opt/nature-id/nature_id.py -m plants -l -r 5 -s {image_path}} + - NATURE_ID_TIMEOUT=40 + - DOWNLOAD_TIMEOUT=20 + networks: + - dagi-network + volumes: + - ${DEPLOY_ROOT:-.}/third_party/nature-id:/opt/nature-id:ro + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://localhost:8085/health')\""] + interval: 30s + timeout: 10s + retries: 3 + start_period: 15s + # Crawl4AI - Advanced Web Crawler with JavaScript support crawl4ai: image: unclecode/crawl4ai@sha256:4d8b065bf185962733cb5f9701f4122d03383fa1ab6b5f6a9873f04fa0416a84 @@ -134,7 +166,11 @@ services: ports: - "9300:9300" environment: - - ROUTER_URL=http://router:8000 + - ROUTER_URL=${ROUTER_URL:-http://dagi-staging-router:8000} + - GATEWAY_MAX_TOKENS_CONCISE=350 + - GATEWAY_MAX_TOKENS_SENPAI_DEFAULT=700 + - GATEWAY_MAX_TOKENS_DEFAULT=700 + - GATEWAY_MAX_TOKENS_DETAILED=1200 - SERVICE_ID=gateway - SERVICE_ROLE=gateway - BRAND_INTAKE_URL=http://brand-intake:9211 @@ -191,12 +227,25 @@ services: - STT_SERVICE_UPLOAD_URL=http://swapper-service:8890/stt - OCR_SERVICE_URL=http://swapper-service:8890 - WEB_SEARCH_SERVICE_URL=http://swapper-service:8890 + - REDIS_URL=redis://redis:6379/0 + - CREWAI_SERVICE_URL=http://dagi-staging-crewai-service:9010 + - AGROMATRIX_REVIEW_AUTH_MODE=${AGROMATRIX_REVIEW_AUTH_MODE:-bearer} + - AGROMATRIX_REVIEW_BEARER_TOKENS=${AGROMATRIX_REVIEW_BEARER_TOKENS} + # v4.3 FarmOS integration (fail-closed: якщо пусто — агент повідомить "не налаштований") + - FARMOS_BASE_URL=http://dagi-farmos-node1 + - FARMOS_TOKEN=${FARMOS_TOKEN:-} + - FARMOS_USER=${FARMOS_USER:-} + - FARMOS_PASS=${FARMOS_PASS:-} + - FARMOS_CLIENT_ID=${FARMOS_CLIENT_ID:-farm} + env_file: + - .env.stepan.node1 volumes: - ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro - ${DEPLOY_ROOT:-.}/logs:/app/logs depends_on: - router - memory-service + - redis networks: - dagi-network restart: unless-stopped @@ -207,6 +256,107 @@ services: retries: 3 start_period: 10s + + gateway-worker: + build: + context: ./gateway-bot + dockerfile: Dockerfile + container_name: dagi-gateway-worker-node1 + command: ["python", "-m", "daarion_facade.worker"] + environment: + - ROUTER_BASE_URL=http://router:8000 + - REDIS_URL=redis://redis:6379/0 + - ROUTER_WORKER_TIMEOUT=60 + volumes: + - ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro + - ${DEPLOY_ROOT:-.}/logs:/app/logs + depends_on: + - router + - redis + networks: + - dagi-network + restart: unless-stopped + healthcheck: + test: ["CMD", "python", "-c", "print(\"ok\")"] + interval: 30s + timeout: 5s + retries: 3 + + + gateway-reminder-worker: + build: + context: ./gateway-bot + dockerfile: Dockerfile + container_name: dagi-gateway-reminder-worker-node1 + command: ["python", "-m", "daarion_facade.reminder_worker"] + environment: + - REDIS_URL=redis://redis:6379/0 + - DAARION_REMINDER_POLL_SECONDS=${DAARION_REMINDER_POLL_SECONDS:-2} + - DAARION_REMINDER_TTL_SECONDS=${DAARION_REMINDER_TTL_SECONDS:-2592000} + - DAARION_REMINDER_DEFAULT_TZ=${DAARION_REMINDER_DEFAULT_TZ:-Europe/Kyiv} + - GLOBAL_RELAY_ALLOWED_USER_IDS=${GLOBAL_RELAY_ALLOWED_USER_IDS:-} + - MENTOR_PRIVATE_HANDLES=${MENTOR_PRIVATE_HANDLES:-ivantytar,archenvis,olegarch88} + - MENTOR_PRIVATE_NAMES=${MENTOR_PRIVATE_NAMES:-Іван Титар,Александр Вертій,Олег Ковальчук} + - MENTOR_DISCLOSURE_ALLOWED_USER_IDS=${MENTOR_DISCLOSURE_ALLOWED_USER_IDS:-} + - HELION_MENTOR_CHAT_IDS=${HELION_MENTOR_CHAT_IDS:-} + - HELION_RELAY_ALLOWED_USER_IDS=${HELION_RELAY_ALLOWED_USER_IDS:-} + - DAARWIZZ_TELEGRAM_BOT_TOKEN=${DAARWIZZ_TELEGRAM_BOT_TOKEN:-8323412397:AAGZbAR22LuOiGD8xVC3OXMjahQ8rs2lJwo} + - HELION_TELEGRAM_BOT_TOKEN=${HELION_TELEGRAM_BOT_TOKEN:-8112062582:AAGS-HwRLEI269lDutLtAJTFArsIq31YNhE} + - GREENFOOD_TELEGRAM_BOT_TOKEN=${GREENFOOD_TELEGRAM_BOT_TOKEN:-7495165343:AAGR1XEOzg7DkPFPCzL_eYLCJfxJuonCxug} + - AGROMATRIX_TELEGRAM_BOT_TOKEN=${AGROMATRIX_TELEGRAM_BOT_TOKEN:-8580290441:AAFuDBmFJtpl-3I_WfkH7Hkb59X0fhYNMOE} + - ALATEYA_TELEGRAM_BOT_TOKEN=${ALATEYA_TELEGRAM_BOT_TOKEN:-8436880945:AAEi-HS6GEctddoqBUd37MHfweZQP-OjRlo} + - NUTRA_TELEGRAM_BOT_TOKEN=${NUTRA_TELEGRAM_BOT_TOKEN:-8517315428:AAGTLcKxBAZDsMgx28agKTvl1SqJGi0utH4} + - DRUID_TELEGRAM_BOT_TOKEN=${DRUID_TELEGRAM_BOT_TOKEN:-8145618489:AAFR714mBsNmiuF-rjCw-295iORBReJQZ70} + - CLAN_TELEGRAM_BOT_TOKEN=${CLAN_TELEGRAM_BOT_TOKEN:-8516872152:AAHH26wU8hJZJbSCJXb4vbmPmakTP77ok5E} + - EONARCH_TELEGRAM_BOT_TOKEN=${EONARCH_TELEGRAM_BOT_TOKEN:-7962391584:AAFYkelLRG3VR_Lxuu6pEGG76t4vZdANtz4} + - SENPAI_TELEGRAM_BOT_TOKEN=${SENPAI_TELEGRAM_BOT_TOKEN:-8510265026:AAGFrFBIIEihsLptZSxuKdmW2RoRPQDY9FE} + - ONEOK_TELEGRAM_BOT_TOKEN=${ONEOK_TELEGRAM_BOT_TOKEN} + - SOUL_TELEGRAM_BOT_TOKEN=${SOUL_TELEGRAM_BOT_TOKEN:-8041596416:AAHhpfCtY8paCm_9AD-4stJJg-Vw-CBf6Qk} + - YAROMIR_TELEGRAM_BOT_TOKEN=${YAROMIR_TELEGRAM_BOT_TOKEN:-8128180674:AAGNZdG3LwECI4z_803smsuRHsK3nPdjMLY} + - SOFIIA_TELEGRAM_BOT_TOKEN=${SOFIIA_TELEGRAM_BOT_TOKEN:-8589292566:AAEmPvS6nY9e-Y-TZm04CAHWlaFnWVxajE4} + volumes: + - ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro + - ${DEPLOY_ROOT:-.}/logs:/app/logs + depends_on: + - redis + networks: + - dagi-network + restart: unless-stopped + healthcheck: + test: ["CMD", "python", "-c", "print(\"ok\")"] + interval: 30s + timeout: 5s + retries: 3 + + + metrics-poller-node1: + build: + context: ./gateway-bot + dockerfile: Dockerfile + container_name: dagi-metrics-poller-node1 + command: ["python", "-m", "daarion_facade.metrics_poller"] + environment: + - REDIS_URL=redis://redis:6379/0 + - MEMORY_SERVICE_URL=http://memory-service:8000 + - DAARION_METRICS_POLL_INTERVAL_SECONDS=${DAARION_METRICS_POLL_INTERVAL_SECONDS:-10} + - DAARION_METRICS_TTL_SECONDS=${DAARION_METRICS_TTL_SECONDS:-60} + - DAARION_METRICS_HTTP_CONNECT_TIMEOUT_SECONDS=${DAARION_METRICS_HTTP_CONNECT_TIMEOUT_SECONDS:-2} + - DAARION_METRICS_HTTP_TOTAL_TIMEOUT_SECONDS=${DAARION_METRICS_HTTP_TOTAL_TIMEOUT_SECONDS:-5} + - DAARION_NODE_COUNT=${DAARION_NODE_COUNT:-1} + volumes: + - ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro + - ${DEPLOY_ROOT:-.}/logs:/app/logs + depends_on: + - redis + - memory-service + networks: + - dagi-network + restart: unless-stopped + healthcheck: + test: ["CMD", "python", "-c", "print(\"ok\")"] + interval: 30s + timeout: 5s + retries: 3 # CLAN Consent Outbox Worker (Postgres event-store applier; no execute) clan-consent-outbox-worker: build: @@ -340,6 +490,29 @@ services: - dagi-network restart: unless-stopped + # Node Capabilities Service (model inventory for router) + node-capabilities: + build: + context: ./services/node-capabilities + dockerfile: Dockerfile + container_name: node-capabilities-node1 + environment: + - NODE_ID=noda1 + - OLLAMA_BASE_URL=http://host.docker.internal:11434 + - SWAPPER_URL=http://swapper-service:8890 + - CACHE_TTL_SEC=15 + - ENABLE_NATS_CAPS=true + - NATS_URL=nats://nats:4222 + extra_hosts: + - "host.docker.internal:host-gateway" + depends_on: + - nats + networks: + dagi-network: + aliases: + - node-capabilities + restart: unless-stopped + # NATS (JetStream) nats: image: nats:2.10-alpine @@ -736,10 +909,11 @@ services: ports: - "9108:9108" environment: - - GATEWAY_URL=http://172.18.0.18:9300 + - GATEWAY_URL=http://gateway:9300 - PROBE_INTERVAL=60 - PROBE_TIMEOUT=30 - METRICS_PORT=9108 + - SEMANTIC_AGENTS=clan,sofiia,monitor,helion,agromatrix,senpai networks: - dagi-network restart: unless-stopped @@ -819,6 +993,72 @@ services: retries: 5 start_period: 15s + binance-bot-monitor: + build: + context: ./services/binance-bot-monitor + dockerfile: Dockerfile + container_name: dagi-binance-bot-monitor-node1 + restart: unless-stopped + environment: + - REDIS_URL=redis://redis:6379/0 + - CRAWL4AI_URL=http://crawl4ai:11235 + - SWAPPER_URL=http://swapper-service:8890 + - BINANCE_CACHE_TTL=3600 + - BINANCE_REFRESH_INTERVAL=1800 + - BINANCE_API_KEY=${BINANCE_API_KEY:-} + - BINANCE_SECRET_KEY=${BINANCE_SECRET_KEY:-} + networks: + - dagi-network + + # ── FarmOS (v4.3 integration) ──────────────────────────────────────────────── + # PostgreSQL для farmOS (окрема БД, не чіпає dagi-postgres) + dagi-farmos-db-node1: + image: postgres:16-alpine + container_name: dagi-farmos-db-node1 + restart: unless-stopped + environment: + - POSTGRES_DB=farmos + - POSTGRES_USER=farmos + - POSTGRES_PASSWORD=${FARMOS_DB_PASS} + volumes: + - farmos-db-data-node1:/var/lib/postgresql/data + networks: + - dagi-network + healthcheck: + test: ["CMD-SHELL", "pg_isready -U farmos -d farmos"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 15s + + # farmOS Drupal application (4.x — актуальна стабільна, amd64 для x86_64 сервера) + dagi-farmos-node1: + image: farmos/farmos:4.x-amd64 + container_name: dagi-farmos-node1 + restart: unless-stopped + depends_on: + dagi-farmos-db-node1: + condition: service_healthy + environment: + - FARMOS_DB_HOST=dagi-farmos-db-node1 + - FARMOS_DB_NAME=farmos + - FARMOS_DB_USER=farmos + - FARMOS_DB_PASSWORD=${FARMOS_DB_PASS} + - FARMOS_DB_DRIVER=pgsql + volumes: + - farmos-sites-node1:/opt/drupal/web/sites + networks: + - dagi-network + ports: + # Доступний тільки локально; для браузерного setup — SSH tunnel: ssh -L 8088:localhost:8088 + - "127.0.0.1:8088:80" + healthcheck: + test: ["CMD-SHELL", "curl -fsS http://localhost:80 -o /dev/null || exit 1"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 60s + volumes: qdrant-data-node1: @@ -871,6 +1111,14 @@ volumes: name: oneok-crm-data-node1 driver: local + # farmOS persistent volumes (v4.3) + farmos-db-data-node1: + name: farmos-db-data-node1 + driver: local + farmos-sites-node1: + name: farmos-sites-node1 + driver: local + networks: dagi-network: external: true diff --git a/docker-compose.node2-sofiia.yml b/docker-compose.node2-sofiia.yml index 6ce9e5be..3bfd2ce4 100644 --- a/docker-compose.node2-sofiia.yml +++ b/docker-compose.node2-sofiia.yml @@ -25,8 +25,9 @@ services: - XAI_API_KEY=${XAI_API_KEY} - GROK_API_KEY=${XAI_API_KEY} - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY:-} - # ── Node Capabilities ───────────────────────────────────────────────── + # ── Node Capabilities (multi-node model selection) ──────────────────── - NODE_CAPABILITIES_URL=http://node-capabilities:8099/capabilities + - ENABLE_GLOBAL_CAPS_NATS=true # ── Persistence backends ────────────────────────────────────────────── - ALERT_BACKEND=postgres - ALERT_DATABASE_URL=${ALERT_DATABASE_URL:-${DATABASE_URL}} diff --git a/services/router/global_capabilities_client.py b/services/router/global_capabilities_client.py new file mode 100644 index 00000000..633d76d1 --- /dev/null +++ b/services/router/global_capabilities_client.py @@ -0,0 +1,245 @@ +"""Global Capabilities Client — aggregates model capabilities across all nodes. + +Design for 150+ nodes: +- Local NCS: HTTP (fast, always available) +- Remote nodes: NATS request/reply with wildcard discovery + - node.*.capabilities.get → each NCS replies with its capabilities + - No static node list needed — new nodes auto-register by subscribing + - scatter-gather pattern: send one request, collect N replies +- TTL cache per node, stale nodes expire automatically +""" +import asyncio +import json +import logging +import os +import time +from typing import Any, Dict, List, Optional + +import httpx + +logger = logging.getLogger("global_caps") + +LOCAL_NCS_URL = os.getenv("NODE_CAPABILITIES_URL", "") +LOCAL_NODE_ID = os.getenv("NODE_ID", "unknown") +NATS_URL = os.getenv("NATS_URL", "nats://nats:4222") +CACHE_TTL = int(os.getenv("GLOBAL_CAPS_TTL", "30")) +NATS_DISCOVERY_TIMEOUT_MS = int(os.getenv("NATS_DISCOVERY_TIMEOUT_MS", "500")) +NATS_ENABLED = os.getenv("ENABLE_GLOBAL_CAPS_NATS", "true").lower() in ("true", "1") + +CAPS_DISCOVERY_SUBJECT = "node.*.capabilities.get" +CAPS_INBOX_PREFIX = "_CAPS_REPLY" + +_node_cache: Dict[str, Dict[str, Any]] = {} +_node_timestamps: Dict[str, float] = {} +_nats_client = None +_initialized = False + + +async def initialize(): + """Connect to NATS for discovery. Called once at router startup.""" + global _nats_client, _initialized + if not NATS_ENABLED: + logger.info("Global caps NATS discovery disabled") + _initialized = True + return + try: + import nats as nats_lib + _nats_client = await nats_lib.connect(NATS_URL) + _initialized = True + logger.info(f"✅ Global caps NATS connected: {NATS_URL}") + except Exception as e: + logger.warning(f"⚠️ Global caps NATS init failed (non-fatal): {e}") + _nats_client = None + _initialized = True + + +async def shutdown(): + global _nats_client + if _nats_client: + try: + await _nats_client.close() + except Exception: + pass + _nats_client = None + + +async def _fetch_local() -> Optional[Dict[str, Any]]: + """Fetch capabilities from local NCS via HTTP.""" + if not LOCAL_NCS_URL: + return None + try: + async with httpx.AsyncClient(timeout=3) as c: + resp = await c.get(LOCAL_NCS_URL) + if resp.status_code == 200: + data = resp.json() + node_id = data.get("node_id", LOCAL_NODE_ID) + _node_cache[node_id] = data + _node_timestamps[node_id] = time.time() + return data + except Exception as e: + logger.warning(f"Local NCS fetch failed: {e}") + return _node_cache.get(LOCAL_NODE_ID) + + +async def _discover_remote_nodes() -> List[Dict[str, Any]]: + """Scatter-gather discovery: send to node.*.capabilities.get, collect replies. + + Each NCS on every node subscribes to node.{node_id}.capabilities.get. + NATS wildcard routing delivers our request to ALL of them. + We collect replies within NATS_DISCOVERY_TIMEOUT_MS. + + This scales to 150+ nodes with zero static configuration: + - New node deploys NCS → subscribes to its subject → automatically discovered. + - Dead node stops responding → its cache entry expires after TTL. + """ + if not _nats_client: + return [] + + collected: List[Dict[str, Any]] = [] + inbox = _nats_client.new_inbox() + sub = await _nats_client.subscribe(inbox) + + try: + await _nats_client.publish_request( + "node.*.capabilities.get", inbox, b"" + ) + await _nats_client.flush() + + deadline = time.time() + (NATS_DISCOVERY_TIMEOUT_MS / 1000.0) + while time.time() < deadline: + remaining = deadline - time.time() + if remaining <= 0: + break + try: + msg = await asyncio.wait_for( + sub.next_msg(), timeout=remaining, + ) + data = json.loads(msg.data) + node_id = data.get("node_id", "?") + if node_id != LOCAL_NODE_ID: + _node_cache[node_id] = data + _node_timestamps[node_id] = time.time() + collected.append(data) + except asyncio.TimeoutError: + break + except Exception as e: + logger.debug(f"Discovery parse error: {e}") + break + finally: + await sub.unsubscribe() + + if collected: + logger.info( + f"Discovered {len(collected)} remote node(s): " + f"{[c.get('node_id', '?') for c in collected]}" + ) + return collected + + +def _evict_stale(): + """Remove nodes that haven't refreshed within 3x TTL.""" + cutoff = time.time() - (CACHE_TTL * 3) + stale = [nid for nid, ts in _node_timestamps.items() if ts < cutoff] + for nid in stale: + _node_cache.pop(nid, None) + _node_timestamps.pop(nid, None) + logger.info(f"Evicted stale node: {nid}") + + +def _needs_refresh() -> bool: + """Check if any node cache is older than TTL.""" + if not _node_timestamps: + return True + oldest = min(_node_timestamps.values()) + return (time.time() - oldest) > CACHE_TTL + + +async def get_global_capabilities(force: bool = False) -> Dict[str, Any]: + """Return merged capabilities from all known nodes. + + Returns: + { + "local_node": "noda1", + "nodes": {"noda1": {...}, "noda2": {...}, ...}, + "served_models": [...], # all models with "node" field + "node_count": 2, + "updated_at": "...", + } + """ + if not force and not _needs_refresh(): + return _build_global_view() + + _evict_stale() + + tasks = [_fetch_local()] + if _nats_client: + tasks.append(_discover_remote_nodes()) + + await asyncio.gather(*tasks, return_exceptions=True) + + return _build_global_view() + + +def _build_global_view() -> Dict[str, Any]: + """Build a unified view from all cached node capabilities.""" + all_served: List[Dict[str, Any]] = [] + + for node_id, caps in _node_cache.items(): + is_local = (node_id.lower() == LOCAL_NODE_ID.lower()) + age = time.time() - _node_timestamps.get(node_id, 0) + for m in caps.get("served_models", []): + all_served.append({ + **m, + "node": node_id, + "local": is_local, + "node_age_s": round(age, 1), + }) + + all_served.sort(key=lambda m: (0 if m.get("local") else 1, m.get("name", ""))) + + return { + "local_node": LOCAL_NODE_ID, + "nodes": {nid: {"node_id": nid, "served_count": len(c.get("served_models", [])), + "age_s": round(time.time() - _node_timestamps.get(nid, 0), 1)} + for nid, c in _node_cache.items()}, + "served_models": all_served, + "served_count": len(all_served), + "node_count": len(_node_cache), + "updated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + } + + +def get_cached_global() -> Dict[str, Any]: + """Return cached global view without fetching.""" + return _build_global_view() + + +async def send_offload_request( + node_id: str, + request_type: str, + payload: Dict[str, Any], + timeout_s: float = 30.0, +) -> Optional[Dict[str, Any]]: + """Send an inference request to a remote node via NATS. + + Subject pattern: node.{node_id}.{type}.request + Reply: inline NATS request/reply + """ + if not _nats_client: + logger.warning("Cannot offload: NATS not connected") + return None + + subject = f"node.{node_id.lower()}.{request_type}.request" + try: + msg = await _nats_client.request( + subject, + json.dumps(payload).encode(), + timeout=timeout_s, + ) + return json.loads(msg.data) + except asyncio.TimeoutError: + logger.warning(f"Offload timeout: {subject} ({timeout_s}s)") + return None + except Exception as e: + logger.warning(f"Offload error: {subject}: {e}") + return None diff --git a/services/router/main.py b/services/router/main.py index 29f250f0..8ea94a40 100644 --- a/services/router/main.py +++ b/services/router/main.py @@ -46,14 +46,16 @@ except ImportError: RUNTIME_GUARD_AVAILABLE = False RuntimeGuard = None -# NCS-first model selection +# NCS-first model selection (multi-node global) try: import capabilities_client + import global_capabilities_client from model_select import select_model_for_agent, ModelSelection, CLOUD_PROVIDERS as NCS_CLOUD_PROVIDERS NCS_AVAILABLE = True except ImportError: NCS_AVAILABLE = False capabilities_client = None # type: ignore[assignment] + global_capabilities_client = None # type: ignore[assignment] logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -765,7 +767,7 @@ async def startup_event(): else: tool_manager = None - # Initialize Node Capabilities client + # Initialize Node Capabilities (local + global multi-node) if NCS_AVAILABLE and capabilities_client: ncs_cfg = router_config.get("node_capabilities", {}) ncs_url = ncs_cfg.get("url", "") or os.getenv("NODE_CAPABILITIES_URL", "") @@ -774,11 +776,17 @@ async def startup_event(): capabilities_client.configure(url=ncs_url, ttl=ncs_ttl) caps = await capabilities_client.fetch_capabilities() served = caps.get("served_count", 0) - logger.info(f"✅ NCS configured: url={ncs_url} ttl={ncs_ttl}s served={served} models") + logger.info(f"✅ NCS local configured: url={ncs_url} ttl={ncs_ttl}s served={served} models") else: logger.warning("⚠️ NCS url not configured; model selection will use static config only") - elif NCS_AVAILABLE: - logger.info("ℹ️ NCS modules loaded but capabilities_client is None") + + if global_capabilities_client: + await global_capabilities_client.initialize() + gcaps = await global_capabilities_client.get_global_capabilities() + logger.info( + f"✅ Global caps: {gcaps.get('node_count', 0)} node(s), " + f"{gcaps.get('served_count', 0)} total models" + ) else: logger.warning("⚠️ NCS modules not available (model_select / capabilities_client import failed)") @@ -1629,18 +1637,24 @@ async def agent_infer(agent_id: str, request: InferRequest): cloud_provider_names = {"deepseek", "mistral", "grok", "openai", "anthropic"} - # ── NCS-first model selection ──────────────────────────────────────── + # ── Global NCS-first model selection (multi-node) ─────────────────── ncs_selection = None - if NCS_AVAILABLE and capabilities_client: + if NCS_AVAILABLE and global_capabilities_client: + try: + gcaps = await global_capabilities_client.get_global_capabilities() + ncs_selection = await select_model_for_agent( + agent_id, agent_config, router_config, gcaps, request.model, + ) + except Exception as e: + logger.warning(f"⚠️ Global NCS selection error: {e}; falling back to static") + elif NCS_AVAILABLE and capabilities_client: try: caps = await capabilities_client.fetch_capabilities() - if caps: - caps["_fetch_ts"] = capabilities_client._cache_ts ncs_selection = await select_model_for_agent( agent_id, agent_config, router_config, caps, request.model, ) except Exception as e: - logger.warning(f"⚠️ NCS selection error: {e}; falling back to static config") + logger.warning(f"⚠️ NCS selection error: {e}; falling back to static") llm_profiles = router_config.get("llm_profiles", {}) @@ -1651,9 +1665,10 @@ async def agent_infer(agent_id: str, request: InferRequest): if ncs_selection.base_url and provider == "ollama": llm_profile = {**llm_profile, "base_url": ncs_selection.base_url} logger.info( - f"🎯 NCS select: agent={agent_id} profile={default_llm} " - f"→ runtime={ncs_selection.runtime} model={model} " - f"provider={provider} via_ncs={ncs_selection.via_ncs} " + f"🎯 Select: agent={agent_id} profile={default_llm} " + f"→ node={ncs_selection.node} runtime={ncs_selection.runtime} " + f"model={model} provider={provider} " + f"local={ncs_selection.local} via_nats={ncs_selection.via_nats} " f"caps_age={ncs_selection.caps_age_s}s " f"fallback={ncs_selection.fallback_reason or 'none'}" ) diff --git a/services/router/model_select.py b/services/router/model_select.py index ac174ccb..b784916b 100644 --- a/services/router/model_select.py +++ b/services/router/model_select.py @@ -1,8 +1,10 @@ -"""NCS-first model selection for DAGI Router. +"""NCS-first model selection for DAGI Router — multi-node aware. Resolves an agent's LLM profile into a concrete model+provider using live -capabilities from the Node Capabilities Service (NCS). Falls back to static -router-config.yml when NCS is unavailable. +capabilities from Node Capabilities Services across all nodes. +Falls back to static router-config.yml when NCS is unavailable. + +Scaling: works with 1 node or 150+. No static node lists. """ import logging import time @@ -31,7 +33,10 @@ class ModelSelection: model_type: str # llm | vision | code | … base_url: str = "" provider: str = "" # cloud provider name if applicable + node: str = "" # which node owns this model + local: bool = True # is it on the current node? via_ncs: bool = False + via_nats: bool = False fallback_reason: str = "" caps_age_s: float = 0.0 @@ -44,13 +49,11 @@ def resolve_effective_profile( router_cfg: Dict[str, Any], request_model: Optional[str] = None, ) -> str: - """Determine the effective LLM profile name for a request.""" if request_model: llm_profiles = router_cfg.get("llm_profiles", {}) for pname, pcfg in llm_profiles.items(): if pcfg.get("model") == request_model: return pname - return agent_cfg.get("default_llm", "local_default_coder") @@ -59,11 +62,6 @@ def profile_requirements( agent_cfg: Dict[str, Any], router_cfg: Dict[str, Any], ) -> ProfileRequirements: - """Build selection requirements from a profile definition. - - If the profile has `selection_policy` in config, use it directly. - Otherwise, infer from the legacy `provider`/`model` fields. - """ llm_profiles = router_cfg.get("llm_profiles", {}) selection_policies = router_cfg.get("selection_policies", {}) profile_cfg = llm_profiles.get(profile_name, {}) @@ -107,22 +105,23 @@ def profile_requirements( ) -# ── NCS-based selection ─────────────────────────────────────────────────────── +# ── Multi-node model selection ──────────────────────────────────────────────── def select_best_model( reqs: ProfileRequirements, capabilities: Dict[str, Any], ) -> Optional[ModelSelection]: - """Choose the best served model from NCS capabilities. + """Choose the best served model from global (multi-node) capabilities. - Returns None if no suitable model found (caller should try static fallback). + Selection order: + 1. Prefer list matches (local first, then remote) + 2. Best candidate by size (local first, then remote) + 3. None → caller should try static fallback """ served = capabilities.get("served_models", []) if not served: return None - caps_age = time.time() - capabilities.get("_fetch_ts", time.time()) - search_types = [reqs.required_type] if reqs.required_type == "code": search_types.append("llm") @@ -133,24 +132,30 @@ def select_best_model( if not candidates: return None + local_candidates = [m for m in candidates if m.get("local", False)] + remote_candidates = [m for m in candidates if not m.get("local", False)] + prefer = reqs.prefer if reqs.prefer else [] for pref in prefer: if pref == "*": break - for m in candidates: + for m in local_candidates: if pref == m.get("name") or pref in m.get("name", ""): - return _make_selection(m, capabilities, caps_age, reqs) + return _make_selection(m, capabilities) + for m in remote_candidates: + if pref == m.get("name") or pref in m.get("name", ""): + return _make_selection(m, capabilities) - if candidates: - best = _pick_best_candidate(candidates) - return _make_selection(best, capabilities, caps_age, reqs) + if local_candidates: + return _make_selection(_pick_best(local_candidates), capabilities) + if remote_candidates: + return _make_selection(_pick_best(remote_candidates), capabilities) return None -def _pick_best_candidate(candidates: List[Dict[str, Any]]) -> Dict[str, Any]: - """Prefer running models, then largest by size_gb.""" +def _pick_best(candidates: List[Dict[str, Any]]) -> Dict[str, Any]: running = [m for m in candidates if m.get("running")] pool = running if running else candidates return max(pool, key=lambda m: m.get("size_gb", 0)) @@ -159,15 +164,11 @@ def _pick_best_candidate(candidates: List[Dict[str, Any]]) -> Dict[str, Any]: def _make_selection( model: Dict[str, Any], capabilities: Dict[str, Any], - caps_age: float, - reqs: ProfileRequirements, ) -> ModelSelection: runtime = model.get("runtime", "ollama") + is_local = model.get("local", False) + node = model.get("node", capabilities.get("local_node", "")) base_url = model.get("base_url", "") - if not base_url: - runtimes = capabilities.get("runtimes", {}) - rt = runtimes.get(runtime, {}) - base_url = rt.get("base_url", "") return ModelSelection( runtime=runtime, @@ -175,18 +176,20 @@ def _make_selection( model_type=model.get("type", "llm"), base_url=base_url, provider="ollama" if runtime in ("ollama", "llama_server") else runtime, + node=node, + local=is_local, via_ncs=True, - caps_age_s=round(caps_age, 1), + via_nats=not is_local, + caps_age_s=model.get("node_age_s", 0.0), ) -# ── Static fallback (from router-config profiles) ──────────────────────────── +# ── Static fallback ────────────────────────────────────────────────────────── def static_fallback( profile_name: str, router_cfg: Dict[str, Any], ) -> Optional[ModelSelection]: - """Build a ModelSelection from the static llm_profiles config.""" llm_profiles = router_cfg.get("llm_profiles", {}) cfg = llm_profiles.get(profile_name, {}) if not cfg: @@ -200,6 +203,8 @@ def static_fallback( model_type="cloud_llm" if provider in CLOUD_PROVIDERS else "llm", base_url=cfg.get("base_url", ""), provider=provider, + node="local", + local=True, via_ncs=False, fallback_reason="NCS unavailable or no match; using static config", ) @@ -214,10 +219,7 @@ async def select_model_for_agent( capabilities: Optional[Dict[str, Any]], request_model: Optional[str] = None, ) -> ModelSelection: - """Full selection pipeline: resolve profile → NCS → static fallback. - - This is the single entry point the router calls for each request. - """ + """Full selection pipeline: resolve profile → NCS (multi-node) → static → hard default.""" profile = resolve_effective_profile( agent_id, agent_cfg, router_cfg, request_model, ) @@ -238,36 +240,36 @@ async def select_model_for_agent( sel = select_best_model(reqs, capabilities) if sel: logger.info( - f"[select] agent={agent_id} profile={profile} → NCS " - f"runtime={sel.runtime} model={sel.name} caps_age={sel.caps_age_s}s" + f"[select] agent={agent_id} profile={profile} → " + f"{'NCS' if sel.local else 'REMOTE'} " + f"node={sel.node} runtime={sel.runtime} " + f"model={sel.name} caps_age={sel.caps_age_s}s" ) return sel logger.warning( - f"[select] agent={agent_id} profile={profile} → NCS had no match " - f"for type={reqs.required_type}; trying static" + f"[select] agent={agent_id} profile={profile} → no match " + f"for type={reqs.required_type} across {capabilities.get('node_count', 0)} node(s)" ) static = static_fallback(profile, router_cfg) if static: logger.info( f"[select] agent={agent_id} profile={profile} → static " - f"provider={static.provider} model={static.name} " - f"reason={static.fallback_reason}" + f"provider={static.provider} model={static.name}" ) return static if reqs.fallback_profile and reqs.fallback_profile != profile: logger.warning( f"[select] agent={agent_id} profile={profile} not found → " - f"trying fallback_profile={reqs.fallback_profile}" + f"fallback_profile={reqs.fallback_profile}" ) return await select_model_for_agent( agent_id, agent_cfg, router_cfg, capabilities, ) logger.error( - f"[select] agent={agent_id} profile={profile} → ALL selection " - f"methods failed. Using hard default qwen3:14b" + f"[select] agent={agent_id} ALL methods failed → hard default" ) return ModelSelection( runtime="ollama", @@ -275,6 +277,8 @@ async def select_model_for_agent( model_type="llm", base_url="http://host.docker.internal:11434", provider="ollama", + node="local", + local=True, via_ncs=False, fallback_reason="all methods failed; hard default", )