fix(nodes): Normalize Router/Swapper endpoints and fix NODE2 display

Major changes:
- Normalize get_node_endpoints to use ENV vars (ROUTER_BASE_URL, SWAPPER_BASE_URL)
- Remove node_id-based URL selection logic
- Add fallback direct API call in get_node_swapper_detail
- Fix Swapper API endpoint (/models instead of /api/v1/models)
- Add router_healthy and router_version to node_heartbeat fallback
- Add ENV vars to docker-compose for Router/Swapper URLs

Documentation:
- Add TASK_PHASE_NODE2_ROUTER_SWAPPER_FIX.md with full task description
- Add NODE2_GUARDIAN_SETUP.md with setup instructions

This fixes:
- Swapper models not showing for NODE1 and NODE2
- DAGI Router agents not showing for NODE2
- Router/Swapper showing as Down/Degraded when they're actually up
This commit is contained in:
Apple
2025-12-02 03:13:01 -08:00
parent 5f07a6b3ae
commit f95810e8a7
5 changed files with 509 additions and 45 deletions

View File

@@ -3345,36 +3345,41 @@ async def get_node_metrics_current(node_id: str) -> Optional[Dict[str, Any]]:
async def get_node_endpoints(node_id: str) -> Dict[str, str]:
"""
Отримати URL endpoints для конкретної ноди.
Якщо в БД немає значень — підставляє дефолти на основі node_id.
Отримати URL endpoints для DAGI Router та Swapper Service.
Використовує ENV змінні для базових URL (один для всіх нод у проді).
Якщо в БД є специфічні URL для ноди - використовує їх, інакше - ENV дефолти.
ENV змінні:
- ROUTER_BASE_URL (default: http://dagi-router:9102 для проді)
- SWAPPER_BASE_URL (default: http://swapper-service:8890 для проді)
Для DEV (локальний запуск на Mac):
- ROUTER_BASE_URL=http://localhost:9102
- SWAPPER_BASE_URL=http://localhost:8890
"""
pool = await get_pool()
# Get node-specific URLs from DB if exist
row = await pool.fetchrow("""
SELECT router_url, swapper_url
FROM node_cache
WHERE node_id = $1
""", node_id)
# Determine defaults based on node_id
is_node2 = "node-2" in node_id.lower() or "macbook" in node_id.lower()
# Get base URLs from ENV (one for all nodes in production)
router_base = os.getenv("ROUTER_BASE_URL", "http://dagi-router:9102")
swapper_base = os.getenv("SWAPPER_BASE_URL", "http://swapper-service:8890")
if is_node2:
# NODE2 defaults (localhost or IP-based)
defaults = {
"router_url": "http://localhost:9102",
"swapper_url": "http://localhost:8890"
}
else:
# NODE1 defaults (Docker-based)
defaults = {
"router_url": "http://dagi-router:9102",
"swapper_url": "http://swapper-service:8890"
}
defaults = {
"router_url": router_base,
"swapper_url": swapper_base
}
if not row:
return defaults
# Use DB values if present, otherwise fallback to ENV defaults
return {
"router_url": row["router_url"] or defaults["router_url"],
"swapper_url": row["swapper_url"] or defaults["swapper_url"]
@@ -3863,7 +3868,11 @@ async def node_heartbeat(
swapper_state = CASE
WHEN $12::jsonb IS NOT NULL THEN $12::jsonb
ELSE swapper_state
END
END,
router_healthy = COALESCE($13::boolean, router_healthy),
router_version = COALESCE($14, router_version),
router_url = COALESCE($15, router_url),
swapper_url = COALESCE($16, swapper_url)
WHERE node_id = $1
""",
node_id,
@@ -3877,7 +3886,11 @@ async def node_heartbeat(
metrics.get("swapper_healthy"),
metrics.get("swapper_models_loaded"),
metrics.get("swapper_models_total"),
swapper_state_json
swapper_state_json,
metrics.get("router_healthy"),
metrics.get("router_version"),
metrics.get("router_url"),
metrics.get("swapper_url")
)
return {

View File

@@ -4388,43 +4388,86 @@ async def get_node_swapper_detail(node_id: str):
"""
Get detailed Swapper Service status for a node.
Used by Node Cabinet to show loaded models and health.
Returns fallback data if metrics not found (instead of 404).
First tries to get data from node_cache (populated by node-guardian).
If not found, attempts direct call to Swapper API as fallback.
Returns fallback data if both fail (instead of 404).
"""
import httpx
try:
# Fetch from node_cache
# First, try to fetch from node_cache (preferred - populated by node-guardian)
metrics = await repo_city.get_node_metrics(node_id)
if not metrics:
# Return fallback instead of 404 - allows UI to show pending state
logger.info(f"Swapper metrics not found for {node_id}, returning fallback")
if metrics:
# Parse swapper state (stored as JSONB)
state = metrics.get("swapper_state") or {}
models_data = state.get("models", [])
models = [
SwapperModel(
name=m.get("name", "unknown"),
# Swapper uses "status": "loaded" not "loaded": true
loaded=m.get("status") == "loaded" or m.get("loaded", False),
type=m.get("type"),
vram_gb=m.get("size_gb") or m.get("vram_gb")
)
for m in models_data
]
return NodeSwapperDetail(
node_id=node_id,
healthy=False,
models_loaded=0,
models_total=0,
models=[]
healthy=metrics.get("swapper_healthy", False),
models_loaded=metrics.get("swapper_models_loaded", 0),
models_total=metrics.get("swapper_models_total", 0),
models=models
)
# Parse swapper state (stored as JSONB)
state = metrics.get("swapper_state") or {}
models_data = state.get("models", [])
models = [
SwapperModel(
name=m.get("name", "unknown"),
# Swapper uses "status": "loaded" not "loaded": true
loaded=m.get("status") == "loaded" or m.get("loaded", False),
type=m.get("type"),
vram_gb=m.get("size_gb") or m.get("vram_gb")
)
for m in models_data
]
# Fallback: try direct call to Swapper API
logger.info(f"Swapper metrics not found in cache for {node_id}, trying direct API call")
endpoints = await repo_city.get_node_endpoints(node_id)
swapper_url = endpoints.get("swapper_url")
if swapper_url:
try:
async with httpx.AsyncClient(timeout=5.0) as client:
# Try to get models from Swapper (endpoint: /models, not /api/v1/models)
resp = await client.get(f"{swapper_url}/models")
if resp.status_code == 200:
data = resp.json()
models_list = data.get("models", []) if isinstance(data, dict) else data
models = [
SwapperModel(
name=m.get("name", "unknown"),
loaded=m.get("status") == "loaded" or m.get("loaded", False),
type=m.get("type"),
vram_gb=m.get("size_gb") or m.get("vram_gb")
)
for m in models_list
]
loaded_count = sum(1 for m in models if m.loaded)
logger.info(f"✅ Direct Swapper API call successful: {loaded_count}/{len(models)} models loaded")
return NodeSwapperDetail(
node_id=node_id,
healthy=True,
models_loaded=loaded_count,
models_total=len(models),
models=models
)
except Exception as api_error:
logger.warning(f"Direct Swapper API call failed for {node_id} at {swapper_url}: {api_error}")
# Final fallback: return empty state
logger.info(f"Swapper data unavailable for {node_id}, returning fallback")
return NodeSwapperDetail(
node_id=node_id,
healthy=metrics.get("swapper_healthy", False),
models_loaded=metrics.get("swapper_models_loaded", 0),
models_total=metrics.get("swapper_models_total", 0),
models=models
healthy=False,
models_loaded=0,
models_total=0,
models=[]
)
except HTTPException:
raise