"""Capabilities client — fetches and caches live model inventory from Node Capabilities Service."""

import os
import time
import logging
from typing import Any, Dict, List, Optional

import httpx

logger = logging.getLogger("capabilities_client")

# Last successfully fetched capabilities payload. Only JSON objects (dicts)
# are ever stored here, so readers can always call ``.get`` on it.
_cache: Dict[str, Any] = {}
# time.monotonic() value of the last successful refresh (0.0 = never).
# Monotonic rather than wall-clock so clock adjustments cannot freeze or
# prematurely expire the cache.
_cache_ts: float = 0.0

NODE_CAPABILITIES_URL = os.getenv("NODE_CAPABILITIES_URL", "")
CACHE_TTL = 30


def configure(url: str = "", ttl: int = 30) -> None:
    """Override the capabilities endpoint and the cache TTL at runtime.

    Args:
        url: New endpoint URL. An empty string keeps the current URL.
        ttl: Cache lifetime in seconds. Always applied, so calling
            ``configure(url=...)`` without ``ttl`` resets the TTL to 30.
    """
    global NODE_CAPABILITIES_URL, CACHE_TTL
    if url:
        NODE_CAPABILITIES_URL = url
    CACHE_TTL = ttl


async def fetch_capabilities(force: bool = False) -> Dict[str, Any]:
    """Return the capabilities payload, refreshing it from the service if stale.

    Args:
        force: Bypass the TTL check and always attempt a refresh.

    Returns:
        ``{}`` when no endpoint is configured. Otherwise the freshly fetched
        payload on success, or the previous (possibly empty) cached payload
        when the fetch fails. Failures are logged, never raised.
    """
    global _cache, _cache_ts
    if not NODE_CAPABILITIES_URL:
        return {}
    if not force and _cache and (time.monotonic() - _cache_ts) < CACHE_TTL:
        return _cache

    try:
        async with httpx.AsyncClient(timeout=5) as client:
            resp = await client.get(NODE_CAPABILITIES_URL)
            if resp.status_code != 200:
                logger.warning("Capabilities fetch failed: HTTP %s", resp.status_code)
                return _cache
            payload = resp.json()
    except Exception as e:
        # Deliberate best-effort boundary: a refresh failure (network error,
        # invalid URL, malformed JSON) must not break callers; serve stale data.
        logger.warning("Capabilities fetch error: %s", e)
        return _cache

    # Only commit JSON objects; anything else would poison the cache and make
    # every later ``_cache.get`` raise.
    if not isinstance(payload, dict):
        logger.warning(
            "Capabilities fetch error: expected JSON object, got %s",
            type(payload).__name__,
        )
        return _cache

    _cache = payload
    _cache_ts = time.monotonic()
    logger.info("Capabilities refreshed: %s served models", _cache.get("served_count", 0))
    return _cache


def get_cached() -> Dict[str, Any]:
    """Return the last cached capabilities payload without any network access."""
    return _cache


def _served_models() -> List[Dict[str, Any]]:
    """Return the cached ``served_models`` entries, keeping only dict entries.

    Tolerates a missing or ``null`` ``served_models`` key and skips malformed
    entries so the lookup helpers never raise on a bad payload.
    """
    served = _cache.get("served_models")
    if not isinstance(served, list):
        return []
    return [m for m in served if isinstance(m, dict)]


def find_served_model(
    model_type: str = "llm",
    preferred_name: Optional[str] = None,
    runtime: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Find best served model matching criteria from cached capabilities.

    Args:
        model_type: Required value of the entry's ``type`` field.
        runtime: If truthy, required value of the entry's ``runtime`` field.
        preferred_name: If truthy, the first candidate whose ``name`` contains
            this substring is preferred.

    Returns:
        The preferred match if any. Otherwise the first candidate in
        ``served_models`` order. ``None`` when nothing matches.
    """
    candidates = [
        m
        for m in _served_models()
        if m.get("type") == model_type and (not runtime or m.get("runtime") == runtime)
    ]
    if not candidates:
        return None
    if preferred_name:
        for m in candidates:
            name = m.get("name")
            # Guard against null / non-string names, which would make ``in`` raise.
            if isinstance(name, str) and preferred_name in name:
                return m
    return candidates[0]


def list_served_by_type(model_type: str = "llm") -> List[Dict[str, Any]]:
    """Return every cached served model whose ``type`` equals *model_type*, in cache order."""
    return [m for m in _served_models() if m.get("type") == model_type]