From 776ab3a7ba1553e6e87a5804d28a2927973ff789 Mon Sep 17 00:00:00 2001 From: Apple Date: Fri, 28 Nov 2025 05:23:14 -0800 Subject: [PATCH] feat: add node-registry dashboard, matrix-presence-aggregator, ocr-service updates --- scripts/sync_agents_from_config.py | 248 ++++++++++++ .../app/agents_source.py | 59 ++- .../app/aggregator.py | 7 +- .../matrix-presence-aggregator/app/main.py | 1 + .../matrix-presence-aggregator/app/models.py | 7 +- services/node-registry/app/dashboard.py | 369 ++++++++++++++++++ services/node-registry/app/database.py | 23 +- services/node-registry/app/main.py | 183 ++++++++- services/node-registry/app/models.py | 15 +- services/node-registry/requirements.txt | 2 + services/ocr-service/Dockerfile | 12 +- services/ocr-service/app/main.py | 10 +- services/ocr-service/requirements.txt | 10 +- services/router/main.py | 271 ++++++++++++- services/router/requirements.txt | 1 + 15 files changed, 1162 insertions(+), 56 deletions(-) create mode 100644 scripts/sync_agents_from_config.py create mode 100644 services/node-registry/app/dashboard.py diff --git a/scripts/sync_agents_from_config.py b/scripts/sync_agents_from_config.py new file mode 100644 index 00000000..61c25adf --- /dev/null +++ b/scripts/sync_agents_from_config.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 +""" +Sync Agents from YAML Config to DAARION City Database + +This script reads config/agents_city_mapping.yaml and syncs: +1. Agents to `agents` table +2. Agent-Room bindings to `agent_room_bindings` table +3. Validates node_id against Node Registry + +Usage: + python scripts/sync_agents_from_config.py + +Environment: + DATABASE_URL - PostgreSQL connection string for DAARION city DB + NODE_REGISTRY_URL - URL for Node Registry API (default: http://localhost:9205) +""" + +import os +import sys +import yaml +import logging +import httpx +from datetime import datetime +from pathlib import Path + +import psycopg2 +from psycopg2.extras import RealDictCursor + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s' +) +logger = logging.getLogger(__name__) + +# Configuration +DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://postgres:postgres@localhost:5432/daarion') +NODE_REGISTRY_URL = os.getenv('NODE_REGISTRY_URL', 'http://localhost:9205') +CONFIG_PATH = Path(__file__).parent.parent / 'config' / 'agents_city_mapping.yaml' + + +def load_config() -> dict: + """Load agents configuration from YAML file.""" + if not CONFIG_PATH.exists(): + raise FileNotFoundError(f"Config file not found: {CONFIG_PATH}") + + with open(CONFIG_PATH, 'r', encoding='utf-8') as f: + config = yaml.safe_load(f) + + logger.info(f"✅ Loaded config: {len(config.get('agents', []))} agents, {len(config.get('districts', []))} districts") + return config + + +def validate_node(node_id: str) -> bool: + """Check if node exists in Node Registry.""" + try: + response = httpx.get(f"{NODE_REGISTRY_URL}/api/v1/nodes/{node_id}", timeout=5.0) + if response.status_code == 200: + logger.debug(f"✅ Node validated: {node_id}") + return True + else: + logger.warning(f"⚠️ Node not found in registry: {node_id}") + return False + except Exception as e: + logger.warning(f"⚠️ Cannot validate node {node_id}: {e}") + return False + + +def get_room_id_by_slug(cursor, slug: str) -> str | None: + """Get room_id by slug from city_rooms.""" + cursor.execute("SELECT id FROM city_rooms WHERE slug = %s", (slug,)) + row = cursor.fetchone() + return row['id'] if row else None + + +def sync_agents(config: dict, conn) -> tuple[int, int, int]: + """ + Sync agents from config to database. + + Returns: + Tuple of (created, updated, errors) + """ + cursor = conn.cursor(cursor_factory=RealDictCursor) + created = 0 + updated = 0 + errors = 0 + + default_node_id = config.get('default_node_id', 'node-2-macbook-m4max') + + for agent in config.get('agents', []): + agent_id = agent['agent_id'] + node_id = agent.get('node_id', default_node_id) + + try: + # Validate node (optional - just warning) + validate_node(node_id) + + # Upsert agent (using 'id' as primary key, which is agent_id) + cursor.execute(""" + INSERT INTO agents ( + id, display_name, kind, role, avatar_url, color_hint, + is_active, node_id, district, primary_room_slug, model, priority, + status, created_at, updated_at + ) VALUES ( + %(agent_id)s, %(display_name)s, %(kind)s, %(role)s, %(avatar_url)s, %(color_hint)s, + true, %(node_id)s, %(district)s, %(primary_room_slug)s, %(model)s, %(priority)s, + 'online', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + ) + ON CONFLICT (id) DO UPDATE SET + display_name = EXCLUDED.display_name, + kind = EXCLUDED.kind, + role = EXCLUDED.role, + avatar_url = EXCLUDED.avatar_url, + color_hint = EXCLUDED.color_hint, + is_active = true, + node_id = EXCLUDED.node_id, + district = EXCLUDED.district, + primary_room_slug = EXCLUDED.primary_room_slug, + model = EXCLUDED.model, + priority = EXCLUDED.priority, + status = 'online', + updated_at = CURRENT_TIMESTAMP + RETURNING (xmax = 0) as is_insert + """, { + 'agent_id': agent_id, + 'display_name': agent.get('display_name', agent_id), + 'kind': agent.get('kind', 'agent'), + 'role': agent.get('role', ''), + 'avatar_url': agent.get('avatar_url'), + 'color_hint': agent.get('color_hint', '#6366F1'), + 'node_id': node_id, + 'district': agent.get('district'), + 'primary_room_slug': agent.get('primary_room_slug'), + 'model': agent.get('model'), + 'priority': agent.get('priority', 'medium'), + }) + + result = cursor.fetchone() + if result and result['is_insert']: + created += 1 + logger.info(f"✅ Created agent: {agent_id}") + else: + updated += 1 + logger.debug(f"🔄 Updated agent: {agent_id}") + + # Create room binding + room_slug = agent.get('primary_room_slug') + if room_slug: + room_id = get_room_id_by_slug(cursor, room_slug) + if room_id: + cursor.execute(""" + INSERT INTO agent_room_bindings (agent_id, room_id, role, is_primary) + VALUES (%(agent_id)s, %(room_id)s, 'resident', true) + ON CONFLICT (agent_id, room_id) DO UPDATE SET + is_primary = true, + updated_at = CURRENT_TIMESTAMP + """, {'agent_id': agent_id, 'room_id': room_id}) + else: + logger.warning(f"⚠️ Room not found for agent {agent_id}: {room_slug}") + + except Exception as e: + errors += 1 + logger.error(f"❌ Error syncing agent {agent_id}: {e}") + + conn.commit() + return created, updated, errors + + +def sync_districts(config: dict, conn) -> int: + """Sync districts from config to database.""" + cursor = conn.cursor() + count = 0 + + for district in config.get('districts', []): + try: + cursor.execute(""" + INSERT INTO city_districts (id, name, description, color, icon, room_slug) + VALUES (%(id)s, %(name)s, %(description)s, %(color)s, %(icon)s, %(room_slug)s) + ON CONFLICT (id) DO UPDATE SET + name = EXCLUDED.name, + description = EXCLUDED.description, + color = EXCLUDED.color, + icon = EXCLUDED.icon, + room_slug = EXCLUDED.room_slug, + updated_at = CURRENT_TIMESTAMP + """, { + 'id': district['id'], + 'name': district['name'], + 'description': district.get('description', ''), + 'color': district.get('color', '#6366F1'), + 'icon': district.get('icon', 'building'), + 'room_slug': district.get('room_slug'), + }) + count += 1 + except Exception as e: + logger.error(f"❌ Error syncing district {district['id']}: {e}") + + conn.commit() + logger.info(f"✅ Synced {count} districts") + return count + + +def main(): + """Main entry point.""" + logger.info("🚀 Starting Agent-City Sync") + logger.info(f"📁 Config: {CONFIG_PATH}") + logger.info(f"🗄️ Database: {DATABASE_URL.split('@')[-1] if '@' in DATABASE_URL else DATABASE_URL}") + logger.info(f"📡 Node Registry: {NODE_REGISTRY_URL}") + print() + + try: + # Load config + config = load_config() + + # Connect to database + conn = psycopg2.connect(DATABASE_URL) + logger.info("✅ Connected to database") + + # Sync districts + sync_districts(config, conn) + + # Sync agents + created, updated, errors = sync_agents(config, conn) + + # Summary + print() + logger.info("=" * 50) + logger.info("📊 SYNC SUMMARY") + logger.info("=" * 50) + logger.info(f"✅ Agents created: {created}") + logger.info(f"🔄 Agents updated: {updated}") + logger.info(f"❌ Errors: {errors}") + logger.info(f"📍 Total agents: {created + updated}") + logger.info("=" * 50) + + conn.close() + + if errors > 0: + sys.exit(1) + + except Exception as e: + logger.error(f"❌ Fatal error: {e}") + sys.exit(1) + + +if __name__ == '__main__': + main() + diff --git a/services/matrix-presence-aggregator/app/agents_source.py b/services/matrix-presence-aggregator/app/agents_source.py index 6b7446f2..dc139950 100644 --- a/services/matrix-presence-aggregator/app/agents_source.py +++ b/services/matrix-presence-aggregator/app/agents_source.py @@ -21,20 +21,33 @@ class AgentsSource: - display_name - kind - status - - room_id (current_room_id) + - room_id - color + - node_id + - district + - model + - role + - avatar_url """ query = text(""" SELECT - id as agent_id, - display_name, - kind, - status, - current_room_id as room_id, - color - FROM agents - WHERE status IN ('online', 'busy') - ORDER BY display_name + a.id as agent_id, + a.display_name, + a.kind, + a.status, + COALESCE(cr.id, a.current_room_id) as room_id, + COALESCE(a.color_hint, a.color, 'cyan') as color, + a.node_id, + a.district, + a.model, + a.role, + a.avatar_url, + a.primary_room_slug + FROM agents a + LEFT JOIN city_rooms cr ON cr.slug = a.primary_room_slug + WHERE a.status IN ('online', 'busy') + AND (a.is_active = true OR a.is_active IS NULL) + ORDER BY a.display_name """) try: @@ -69,17 +82,25 @@ class AgentsSource: return [] def get_all_agents(self) -> List[Dict]: - """Get all agents (including offline)""" + """Get all active agents (including offline)""" query = text(""" SELECT - id as agent_id, - display_name, - kind, - status, - current_room_id as room_id, - color - FROM agents - ORDER BY display_name + a.id as agent_id, + a.display_name, + a.kind, + a.status, + COALESCE(cr.id, a.current_room_id) as room_id, + COALESCE(a.color_hint, a.color, 'cyan') as color, + a.node_id, + a.district, + a.model, + a.role, + a.avatar_url, + a.primary_room_slug + FROM agents a + LEFT JOIN city_rooms cr ON cr.slug = a.primary_room_slug + WHERE a.is_active = true OR a.is_active IS NULL + ORDER BY a.display_name """) try: diff --git a/services/matrix-presence-aggregator/app/aggregator.py b/services/matrix-presence-aggregator/app/aggregator.py index d007465d..831099b5 100644 --- a/services/matrix-presence-aggregator/app/aggregator.py +++ b/services/matrix-presence-aggregator/app/aggregator.py @@ -86,7 +86,12 @@ class PresenceAggregator: kind=agent.get("kind", "assistant"), status=agent.get("status", "online"), room_id=agent.get("room_id"), - color=agent.get("color", "cyan") + color=agent.get("color", "cyan"), + node_id=agent.get("node_id"), + district=agent.get("district"), + model=agent.get("model"), + role=agent.get("role"), + avatar_url=agent.get("avatar_url"), ) all_agents.append(ap) diff --git a/services/matrix-presence-aggregator/app/main.py b/services/matrix-presence-aggregator/app/main.py index 64935ff3..8165d003 100644 --- a/services/matrix-presence-aggregator/app/main.py +++ b/services/matrix-presence-aggregator/app/main.py @@ -103,6 +103,7 @@ async def health(): @app.get("/presence/summary") +@app.get("/presence/snapshot") async def get_presence_summary(): """ Get current presence snapshot. diff --git a/services/matrix-presence-aggregator/app/models.py b/services/matrix-presence-aggregator/app/models.py index 9a3cecf7..ec7b147f 100644 --- a/services/matrix-presence-aggregator/app/models.py +++ b/services/matrix-presence-aggregator/app/models.py @@ -8,10 +8,15 @@ class AgentPresence(BaseModel): """Agent presence in a room""" agent_id: str display_name: str - kind: str = "assistant" # assistant, civic, oracle, builder + kind: str = "assistant" # assistant, civic, oracle, builder, vision, etc. status: str = "offline" # online, offline, busy room_id: Optional[str] = None color: Optional[str] = None + node_id: Optional[str] = None # Node where agent runs (node-1-hetzner, node-2-macbook) + district: Optional[str] = None # City district (leadership, engineering, etc.) + model: Optional[str] = None # LLM model used by agent + role: Optional[str] = None # Agent's role description + avatar_url: Optional[str] = None # Agent avatar URL class RoomPresence(BaseModel): diff --git a/services/node-registry/app/dashboard.py b/services/node-registry/app/dashboard.py new file mode 100644 index 00000000..0d3d62d2 --- /dev/null +++ b/services/node-registry/app/dashboard.py @@ -0,0 +1,369 @@ +""" +Node Dashboard API - Aggregator for node status and metrics +""" +import asyncio +import logging +import httpx +import psutil +from typing import Dict, Any, Optional, List +from datetime import datetime + +logger = logging.getLogger(__name__) + +# Probe timeout in seconds +PROBE_TIMEOUT = 0.5 +PROBE_TIMEOUT_LONG = 1.0 + + +class DashboardAggregator: + """Aggregates data from multiple services for node dashboard""" + + def __init__(self, node_ip: str = "localhost"): + self.node_ip = node_ip + self.client = httpx.AsyncClient(timeout=PROBE_TIMEOUT) + + async def close(self): + await self.client.aclose() + + async def _probe(self, url: str, timeout: float = PROBE_TIMEOUT) -> Dict[str, Any]: + """Execute HTTP probe with timeout""" + try: + resp = await self.client.get(url, timeout=timeout) + if resp.status_code == 200: + return {"status": "up", "data": resp.json(), "latency_ms": int(resp.elapsed.total_seconds() * 1000)} + else: + return {"status": "degraded", "error": f"HTTP {resp.status_code}"} + except httpx.TimeoutException: + return {"status": "down", "error": "timeout"} + except httpx.ConnectError: + return {"status": "down", "error": "connection refused"} + except Exception as e: + return {"status": "down", "error": str(e)} + + async def get_infra_metrics(self) -> Dict[str, Any]: + """Get infrastructure metrics using psutil""" + try: + cpu_pct = psutil.cpu_percent(interval=0.1) + mem = psutil.virtual_memory() + disk = psutil.disk_usage('/') + + result = { + "cpu_usage_pct": round(cpu_pct, 1), + "ram": { + "total_gb": round(mem.total / (1024**3), 1), + "used_gb": round(mem.used / (1024**3), 1) + }, + "disk": { + "total_gb": round(disk.total / (1024**3), 1), + "used_gb": round(disk.used / (1024**3), 1) + }, + "gpus": [] + } + + # Try to get GPU info (nvidia-smi or similar) + try: + import subprocess + nvidia_output = subprocess.run( + ['nvidia-smi', '--query-gpu=name,memory.total,memory.used,utilization.gpu', '--format=csv,noheader,nounits'], + capture_output=True, text=True, timeout=2 + ) + if nvidia_output.returncode == 0: + for line in nvidia_output.stdout.strip().split('\n'): + parts = [p.strip() for p in line.split(',')] + if len(parts) >= 4: + result["gpus"].append({ + "name": parts[0], + "vram_gb": round(float(parts[1]) / 1024, 1), + "used_gb": round(float(parts[2]) / 1024, 1), + "sm_util_pct": int(parts[3]) + }) + except: + pass + + return result + except Exception as e: + logger.error(f"Failed to get infra metrics: {e}") + return { + "cpu_usage_pct": 0, + "ram": {"total_gb": 0, "used_gb": 0}, + "disk": {"total_gb": 0, "used_gb": 0}, + "gpus": [] + } + + async def probe_swapper(self, port: int = 8890) -> Dict[str, Any]: + """Probe Swapper service""" + base_url = f"http://{self.node_ip}:{port}" + + health_result = await self._probe(f"{base_url}/health", PROBE_TIMEOUT_LONG) + models_result = await self._probe(f"{base_url}/models", PROBE_TIMEOUT_LONG) + + result = { + "status": health_result.get("status", "unknown"), + "endpoint": base_url, + "latency_ms": health_result.get("latency_ms", 0), + "storage": {"total_gb": 0, "used_gb": 0, "free_gb": 0}, + "models": [] + } + + if health_result.get("status") == "up": + data = health_result.get("data", {}) + result["active_model"] = data.get("active_model") + result["mode"] = data.get("mode") + + if models_result.get("status") == "up": + data = models_result.get("data", {}) + for m in data.get("models", []): + result["models"].append({ + "name": m.get("name"), + "size_gb": m.get("size_gb", 0), + "device": m.get("device", "disk"), + "state": m.get("status", "unloaded") + }) + + return result + + async def probe_router(self, port: int = 9102) -> Dict[str, Any]: + """Probe DAGI Router service""" + base_url = f"http://{self.node_ip}:{port}" + + health_result = await self._probe(f"{base_url}/health", PROBE_TIMEOUT_LONG) + backends_result = await self._probe(f"{base_url}/backends/status", PROBE_TIMEOUT_LONG) + + result = { + "status": health_result.get("status", "unknown"), + "endpoint": base_url, + "version": "unknown", + "backends": [], + "metrics": { + "requests_1m": 0, + "requests_1h": 0, + "error_rate_1h": 0, + "avg_latency_ms_1h": 0 + } + } + + if health_result.get("status") == "up": + data = health_result.get("data", {}) + result["version"] = data.get("version", "unknown") + result["nats_connected"] = data.get("nats_connected", False) + + if backends_result.get("status") == "up": + for backend in backends_result.get("data", []): + result["backends"].append({ + "name": backend.get("name"), + "status": backend.get("status"), + "latency_ms": backend.get("latency_ms", 0), + "error": backend.get("error") + }) + + return result + + async def probe_service(self, name: str, port: int, health_path: str = "/health") -> Dict[str, Any]: + """Probe generic AI service""" + base_url = f"http://{self.node_ip}:{port}" + + result = await self._probe(f"{base_url}{health_path}") + + return { + "status": result.get("status", "unknown"), + "endpoint": base_url, + "latency_ms": result.get("latency_ms", 0), + "error": result.get("error") + } + + async def probe_ollama(self, port: int = 11434) -> Dict[str, Any]: + """Probe Ollama service""" + base_url = f"http://{self.node_ip}:{port}" + + result = await self._probe(f"{base_url}/api/tags", PROBE_TIMEOUT_LONG) + + models = [] + if result.get("status") == "up": + data = result.get("data", {}) + for m in data.get("models", []): + models.append(m.get("name")) + + return { + "status": result.get("status", "unknown"), + "endpoint": base_url, + "latency_ms": result.get("latency_ms", 0), + "models": models[:10], # Limit to 10 models + "error": result.get("error") + } + + async def probe_matrix(self, synapse_port: int = 8018, presence_port: int = 8085) -> Dict[str, Any]: + """Probe Matrix services""" + synapse_result = await self._probe(f"http://{self.node_ip}:{synapse_port}/_matrix/client/versions") + presence_result = await self._probe(f"http://{self.node_ip}:{presence_port}/health") + + return { + "enabled": synapse_result.get("status") == "up", + "homeserver": f"http://{self.node_ip}:{synapse_port}", + "synapse": { + "status": synapse_result.get("status", "unknown"), + "latency_ms": synapse_result.get("latency_ms", 0) + }, + "presence_bridge": { + "status": presence_result.get("status", "unknown"), + "latency_ms": presence_result.get("latency_ms", 0) + } + } + + async def probe_monitoring(self, prometheus_port: int = 9090, grafana_port: int = 3001) -> Dict[str, Any]: + """Probe monitoring services""" + prometheus_result = await self._probe(f"http://{self.node_ip}:{prometheus_port}/-/ready") + grafana_result = await self._probe(f"http://{self.node_ip}:{grafana_port}/api/health") + + return { + "prometheus": { + "url": f"http://{self.node_ip}:{prometheus_port}", + "status": prometheus_result.get("status", "unknown") + }, + "grafana": { + "url": f"http://{self.node_ip}:{grafana_port}", + "status": grafana_result.get("status", "unknown") + }, + "logging": { + "loki": {"status": "unknown"} + } + } + + async def get_agents_summary(self, city_service_port: int = 7001) -> Dict[str, Any]: + """Get agents summary from city service""" + # City service uses /city/agents endpoint + result = await self._probe(f"http://{self.node_ip}:{city_service_port}/city/agents", PROBE_TIMEOUT_LONG) + + summary = { + "total": 0, + "running": 0, + "by_kind": {}, + "top": [] + } + + if result.get("status") == "up": + agents = result.get("data", []) + summary["total"] = len(agents) + + for agent in agents: + kind = agent.get("kind", "unknown") + summary["by_kind"][kind] = summary["by_kind"].get(kind, 0) + 1 + + if agent.get("status") in ["online", "busy"]: + summary["running"] += 1 + + # Top 5 agents + online_agents = [a for a in agents if a.get("status") in ["online", "busy"]][:5] + for agent in online_agents: + summary["top"].append({ + "agent_id": agent.get("id"), + "display_name": agent.get("display_name"), + "kind": agent.get("kind"), + "status": agent.get("status"), + "node_id": agent.get("node_id") + }) + + return summary + + +async def build_dashboard(node_profile: Dict[str, Any], node_ip: str = "localhost") -> Dict[str, Any]: + """ + Build complete dashboard from node profile. + + Args: + node_profile: Node profile from registry (with modules, gpu, roles) + node_ip: IP address to probe services + + Returns: + Complete dashboard JSON + """ + aggregator = DashboardAggregator(node_ip) + + try: + # Build module port map + module_ports = {} + for module in node_profile.get("modules", []): + if module.get("port"): + module_ports[module["id"]] = module["port"] + + # Parallel probes + tasks = { + "infra": aggregator.get_infra_metrics(), + } + + # Add probes based on modules + if "ai.swapper" in module_ports: + tasks["swapper"] = aggregator.probe_swapper(module_ports["ai.swapper"]) + + if "ai.router" in module_ports: + tasks["router"] = aggregator.probe_router(module_ports["ai.router"]) + + if "ai.ollama" in module_ports: + tasks["ollama"] = aggregator.probe_ollama(module_ports["ai.ollama"]) + + # Generic AI services + ai_services = ["ai.stt", "ai.tts", "ai.ocr", "ai.memory", "ai.crewai"] + for svc in ai_services: + if svc in module_ports: + svc_name = svc.replace("ai.", "") + tasks[f"svc_{svc_name}"] = aggregator.probe_service(svc_name, module_ports[svc]) + + # Matrix + synapse_port = module_ports.get("matrix.synapse", 8018) + presence_port = module_ports.get("matrix.presence", 8085) + if "matrix.synapse" in module_ports or "matrix.presence" in module_ports: + tasks["matrix"] = aggregator.probe_matrix(synapse_port, presence_port) + + # Monitoring + prometheus_port = module_ports.get("monitoring.prometheus", 9090) + tasks["monitoring"] = aggregator.probe_monitoring(prometheus_port) + + # Agents + city_port = module_ports.get("daarion.city", 7001) + if "daarion.city" in module_ports or "daarion.agents" in module_ports: + tasks["agents"] = aggregator.get_agents_summary(city_port) + + # Execute all probes in parallel + results = {} + for name, task in tasks.items(): + try: + results[name] = await task + except Exception as e: + logger.error(f"Probe {name} failed: {e}") + results[name] = {"status": "error", "error": str(e)} + + # Build dashboard response + dashboard = { + "node": { + "node_id": node_profile.get("node_id"), + "name": node_profile.get("name"), + "roles": node_profile.get("roles", []), + "status": node_profile.get("status", "unknown"), + "public_hostname": node_profile.get("ip_address"), + "environment": node_profile.get("role", "production"), + "gpu": node_profile.get("gpu"), + "modules": node_profile.get("modules", []), + "version": node_profile.get("version", "1.0.0") + }, + "infra": results.get("infra", {}), + "ai": { + "swapper": results.get("swapper", {"status": "not_installed"}), + "router": results.get("router", {"status": "not_installed"}), + "ollama": results.get("ollama", {"status": "not_installed"}), + "services": {} + }, + "agents": results.get("agents", {"total": 0, "running": 0, "by_kind": {}, "top": []}), + "matrix": results.get("matrix", {"enabled": False}), + "monitoring": results.get("monitoring", {}) + } + + # Add AI services + for key, value in results.items(): + if key.startswith("svc_"): + svc_name = key.replace("svc_", "") + dashboard["ai"]["services"][svc_name] = value + + return dashboard + + finally: + await aggregator.close() + diff --git a/services/node-registry/app/database.py b/services/node-registry/app/database.py index d5241e5f..9f3862eb 100644 --- a/services/node-registry/app/database.py +++ b/services/node-registry/app/database.py @@ -1,24 +1,24 @@ """ -SQLite Database connection for local development -Use this for testing without PostgreSQL +PostgreSQL Database connection for Node Registry """ import os -from sqlalchemy import create_engine +from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker, Session from contextlib import contextmanager import logging logger = logging.getLogger(__name__) -# SQLite database file -DB_FILE = os.getenv("NODE_REGISTRY_DB_FILE", "node_registry.db") -DATABASE_URL = f"sqlite:///{DB_FILE}" +# Database URL from environment +DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/node_registry") # Create engine engine = create_engine( DATABASE_URL, - connect_args={"check_same_thread": False}, # Required for SQLite - echo=os.getenv("NODE_REGISTRY_ENV") == "development", # Log SQL in dev + pool_pre_ping=True, + pool_size=5, + max_overflow=10, + echo=os.getenv("NODE_REGISTRY_ENV") == "development", ) # Create session factory @@ -65,7 +65,7 @@ def check_db_connection() -> bool: """Check if database connection is working""" try: with engine.connect() as conn: - conn.execute("SELECT 1") + conn.execute(text("SELECT 1")) return True except Exception as e: logger.error(f"Database connection failed: {e}") @@ -75,8 +75,7 @@ def check_db_connection() -> bool: def get_db_info() -> dict: """Get database connection information""" return { - "type": "sqlite", - "database": DB_FILE, + "type": "postgresql", + "url": DATABASE_URL.split("@")[-1] if "@" in DATABASE_URL else DATABASE_URL, "connected": check_db_connection(), } - diff --git a/services/node-registry/app/main.py b/services/node-registry/app/main.py index 2454cfbf..d1bd553b 100644 --- a/services/node-registry/app/main.py +++ b/services/node-registry/app/main.py @@ -488,6 +488,187 @@ async def discover_nodes(query: NodeDiscoveryQuery, db: Session = Depends(get_db raise HTTPException(status_code=500, detail=f"Discovery failed: {str(e)}") +# ============================================================================ +# Node Profile Endpoints (Standard v1) +# ============================================================================ + +from app.dashboard import build_dashboard + + +@app.get("/api/v1/nodes/self/dashboard") +async def get_self_dashboard(db: Session = Depends(get_db)): + """ + Get dashboard for current node (self). + + Uses the first node in registry as "self" for now. + In production, this would use JWT claims to identify the node. + """ + try: + from sqlalchemy import text + + # Get first node as "self" (simplified for v1) + result = db.execute(text(""" + SELECT node_id FROM nodes ORDER BY registered_at LIMIT 1 + """)) + + row = result.fetchone() + if not row: + raise HTTPException(status_code=404, detail="No nodes registered") + + # Delegate to node dashboard + return await get_node_dashboard(row[0], db) + + except HTTPException: + raise + except Exception as e: + logger.error(f"❌ Failed to get self dashboard: {e}") + raise HTTPException(status_code=500, detail=f"Dashboard failed: {str(e)}") + + +@app.get("/api/v1/nodes/{node_id}/dashboard") +async def get_node_dashboard(node_id: str, db: Session = Depends(get_db)): + """ + Get complete node dashboard with live status. + + Aggregates: + - Node profile (roles, modules, GPU) + - Infrastructure metrics (CPU, RAM, Disk, GPU) + - AI services status (Swapper, Router, STT, Vision, OCR) + - Agents summary + - Matrix integration status + - Monitoring status + """ + try: + from sqlalchemy import text + + # Get node profile + result = db.execute(text(""" + SELECT node_id, node_name, node_role, node_type, ip_address, hostname, + status, roles, gpu, modules, version, vpn_ip + FROM nodes + WHERE node_id = :node_id + """), {"node_id": node_id}) + + row = result.fetchone() + if not row: + raise HTTPException(status_code=404, detail=f"Node not found: {node_id}") + + profile = { + "node_id": row[0], + "name": row[1], + "role": row[2], + "type": row[3], + "ip_address": row[4], + "hostname": row[5], + "status": row[6], + "roles": list(row[7]) if row[7] else [], + "gpu": row[8], + "modules": row[9] if row[9] else [], + "version": row[10] or "1.0.0", + } + + # Build dashboard with probes + # For Docker network, use gateway IP to access host services + import os + + # Default to Docker gateway for dagi-network + node_ip = os.getenv("PROBE_HOST", "172.21.0.1") + + # For NODE2, use its actual IP (for remote probing) + if node_id == "node-2-macbook-m4max": + node_ip = row[4] or "192.168.1.33" + + dashboard = await build_dashboard(profile, node_ip) + + return dashboard + + except HTTPException: + raise + except Exception as e: + logger.error(f"❌ Failed to get node dashboard: {e}") + import traceback + traceback.print_exc() + raise HTTPException(status_code=500, detail=f"Dashboard failed: {str(e)}") + + +@app.get("/api/v1/nodes/{node_id}/profile") +async def get_node_profile(node_id: str, db: Session = Depends(get_db)): + """ + Get full node profile including modules, GPU, roles. + Node Profile Standard v1. + """ + try: + from sqlalchemy import text + + result = db.execute(text(""" + SELECT node_id, node_name, node_role, node_type, ip_address, hostname, + status, roles, gpu, modules, version, vpn_ip + FROM nodes + WHERE node_id = :node_id + """), {"node_id": node_id}) + + row = result.fetchone() + if not row: + raise HTTPException(status_code=404, detail=f"Node not found: {node_id}") + + return { + "node_id": row[0], + "name": row[1], + "role": row[2], + "type": row[3], + "ip_address": row[4], + "hostname": row[5], + "status": row[6], + "roles": list(row[7]) if row[7] else [], + "gpu": row[8], + "modules": row[9] if row[9] else [], + "version": row[10] or "1.0.0", + "vpn_ip": str(row[11]) if row[11] else None, + } + except HTTPException: + raise + except Exception as e: + logger.error(f"❌ Failed to get node profile: {e}") + raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") + + +@app.get("/api/v1/nodes/profiles") +async def get_all_node_profiles(db: Session = Depends(get_db)): + """ + Get all node profiles with modules. + """ + try: + from sqlalchemy import text + + result = db.execute(text(""" + SELECT node_id, node_name, node_role, node_type, ip_address, hostname, + status, roles, gpu, modules, version, vpn_ip + FROM nodes + ORDER BY node_id + """)) + + nodes = [] + for row in result.fetchall(): + nodes.append({ + "node_id": row[0], + "name": row[1], + "role": row[2], + "type": row[3], + "ip_address": row[4], + "hostname": row[5], + "status": row[6], + "roles": list(row[7]) if row[7] else [], + "gpu": row[8], + "modules": row[9] if row[9] else [], + "version": row[10] or "1.0.0", + }) + + return {"nodes": nodes, "total": len(nodes)} + except Exception as e: + logger.error(f"❌ Failed to get node profiles: {e}") + raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}") + + # ============================================================================ # Maintenance Endpoints # ============================================================================ @@ -518,7 +699,7 @@ if __name__ == "__main__": print(f"🚀 Starting {SERVICE_NAME} v{VERSION}") print(f"📊 Environment: {ENV}") print(f"🔌 Port: {HTTP_PORT}") - print(f"🗄️ Database: {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}") + print(f"🗄️ Database: {os.getenv('DATABASE_URL', 'not configured')}") print(f"📝 Log level: {LOG_LEVEL}") print() diff --git a/services/node-registry/app/models.py b/services/node-registry/app/models.py index 189c9c5b..a0c934dd 100644 --- a/services/node-registry/app/models.py +++ b/services/node-registry/app/models.py @@ -3,8 +3,8 @@ SQLAlchemy ORM Models for Node Registry """ from datetime import datetime from typing import Optional -from sqlalchemy import Column, String, DateTime, Boolean, ForeignKey, Text, Index -from sqlalchemy.dialects.postgresql import UUID as PG_UUID, INET, JSONB as PG_JSONB +from sqlalchemy import Column, String, DateTime, Boolean, ForeignKey, Text, Index, ARRAY +from sqlalchemy.dialects.postgresql import UUID as PG_UUID, INET, JSONB as PG_JSONB, ARRAY as PG_ARRAY from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship from sqlalchemy.types import TypeDecorator, String as SQLString, Text as SQLText @@ -90,6 +90,12 @@ class Node(Base): updated_at = Column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow) node_metadata = Column(JSONB, default={}) + # Node Profile Standard v1 fields + roles = Column(ARRAY(String), default=[]) # ['core', 'gateway', 'matrix', 'agents', 'gpu'] + gpu = Column(JSONB, default=None) # {"name": "NVIDIA RTX 4000", "vram_gb": 20} + modules = Column(JSONB, default=[]) # [{"id": "ai.router", "status": "up", "port": 9102}, ...] + version = Column(String(50), default='1.0.0') + # Relationships profiles = relationship("NodeProfile", back_populates="node", cascade="all, delete-orphan") heartbeats = relationship("HeartbeatLog", back_populates="node", cascade="all, delete-orphan") @@ -113,6 +119,11 @@ class Node(Base): "registered_at": self.registered_at.isoformat() if self.registered_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, "metadata": self.node_metadata or {}, + # Node Profile Standard v1 + "roles": self.roles or [], + "gpu": self.gpu, + "modules": self.modules or [], + "version": self.version or "1.0.0", } diff --git a/services/node-registry/requirements.txt b/services/node-registry/requirements.txt index 7204f218..6c8ece62 100644 --- a/services/node-registry/requirements.txt +++ b/services/node-registry/requirements.txt @@ -9,3 +9,5 @@ alembic==1.14.0 python-json-logger==3.2.1 prometheus-client==0.21.0 psycopg2-binary>=2.9.0 +psutil>=5.9.0 +requests>=2.28.0 diff --git a/services/ocr-service/Dockerfile b/services/ocr-service/Dockerfile index 7828b986..dac8fb7f 100644 --- a/services/ocr-service/Dockerfile +++ b/services/ocr-service/Dockerfile @@ -1,6 +1,6 @@ FROM python:3.11-slim -# Встановити системні залежності +# Встановити системні залежності для Tesseract RUN apt-get update && apt-get install -y \ tesseract-ocr \ tesseract-ocr-ukr \ @@ -16,16 +16,16 @@ WORKDIR /app # Копіювати requirements COPY requirements.txt . -# Встановити Python залежності +# Встановити Python залежності (без EasyOCR для швидкого білду) RUN pip install --no-cache-dir -r requirements.txt -# Завантажити EasyOCR моделі -RUN python -c "import easyocr; easyocr.Reader(['uk', 'en'], gpu=False)" - # Копіювати код COPY app/ ./app/ EXPOSE 8896 -CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8896"] +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8896/health || exit 1 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8896"] diff --git a/services/ocr-service/app/main.py b/services/ocr-service/app/main.py index a57e3cfa..7b85ec4a 100644 --- a/services/ocr-service/app/main.py +++ b/services/ocr-service/app/main.py @@ -92,11 +92,19 @@ async def root(): @app.get("/health") async def health(): """Health check endpoint""" + gpu_available = False + if EASYOCR_AVAILABLE: + try: + import torch + gpu_available = torch.cuda.is_available() + except: + pass + return { "status": "healthy" if (TESSERACT_AVAILABLE or EASYOCR_AVAILABLE) else "degraded", "tesseract": "available" if TESSERACT_AVAILABLE else "unavailable", "easyocr": "available" if EASYOCR_AVAILABLE else "unavailable", - "gpu": torch.cuda.is_available() if EASYOCR_AVAILABLE else False + "gpu": gpu_available } def preprocess_image(img: Image.Image) -> Image.Image: diff --git a/services/ocr-service/requirements.txt b/services/ocr-service/requirements.txt index 5d8cf946..bf5b715d 100644 --- a/services/ocr-service/requirements.txt +++ b/services/ocr-service/requirements.txt @@ -1,10 +1,14 @@ +# Core fastapi==0.104.1 uvicorn[standard]==0.24.0 python-multipart==0.0.6 + +# OCR - Tesseract only (EasyOCR optional) pytesseract==0.3.10 -easyocr==1.7.1 Pillow==10.1.0 numpy==1.24.3 -torch==2.1.0 -torchvision==0.16.0 +# Optional: EasyOCR (uncomment for full support, requires GPU) +# easyocr==1.7.1 +# torch==2.1.0 +# torchvision==0.16.0 diff --git a/services/router/main.py b/services/router/main.py index da3de0c3..249d341a 100644 --- a/services/router/main.py +++ b/services/router/main.py @@ -1,15 +1,27 @@ from fastapi import FastAPI, HTTPException from pydantic import BaseModel -from typing import Literal, Optional, Dict, Any +from typing import Literal, Optional, Dict, Any, List import asyncio import json import os import yaml +import httpx +import logging -app = FastAPI(title="DAARION Router", version="1.0.0") +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +app = FastAPI(title="DAARION Router", version="2.0.0") # Configuration NATS_URL = os.getenv("NATS_URL", "nats://nats:4222") +SWAPPER_URL = os.getenv("SWAPPER_URL", "http://192.168.1.33:8890") +STT_URL = os.getenv("STT_URL", "http://192.168.1.33:8895") +VISION_URL = os.getenv("VISION_URL", "http://192.168.1.33:11434") +OCR_URL = os.getenv("OCR_URL", "http://192.168.1.33:8896") + +# HTTP client for backend services +http_client: Optional[httpx.AsyncClient] = None # NATS client nc = None @@ -49,25 +61,35 @@ config = load_config() @app.on_event("startup") async def startup_event(): """Initialize NATS connection and subscriptions""" - global nc, nats_available - print("🚀 DAGI Router starting up...") + global nc, nats_available, http_client + logger.info("🚀 DAGI Router v2.0.0 starting up...") + + # Initialize HTTP client + http_client = httpx.AsyncClient(timeout=60.0) + logger.info("✅ HTTP client initialized") # Try to connect to NATS try: import nats nc = await nats.connect(NATS_URL) nats_available = True - print(f"✅ Connected to NATS at {NATS_URL}") + logger.info(f"✅ Connected to NATS at {NATS_URL}") # Subscribe to filter decisions if enabled if config.get("messaging_inbound", {}).get("enabled", True): asyncio.create_task(subscribe_to_filter_decisions()) else: - print("⚠️ Messaging inbound routing disabled in config") + logger.warning("⚠️ Messaging inbound routing disabled in config") except Exception as e: - print(f"⚠️ NATS not available: {e}") - print("⚠️ Running in test mode (HTTP only)") + logger.warning(f"⚠️ NATS not available: {e}") + logger.warning("⚠️ Running in test mode (HTTP only)") nats_available = False + + # Log backend URLs + logger.info(f"📡 Swapper URL: {SWAPPER_URL}") + logger.info(f"📡 STT URL: {STT_URL}") + logger.info(f"📡 Vision URL: {VISION_URL}") + logger.info(f"📡 OCR URL: {OCR_URL}") async def subscribe_to_filter_decisions(): """Subscribe to agent.filter.decision events""" @@ -201,10 +223,239 @@ async def test_messaging_route(decision: FilterDecision): @app.on_event("shutdown") async def shutdown_event(): """Clean shutdown""" - global nc + global nc, http_client if nc: await nc.close() - print("✅ NATS connection closed") + logger.info("✅ NATS connection closed") + if http_client: + await http_client.aclose() + logger.info("✅ HTTP client closed") + + +# ============================================================================ +# Backend Integration Endpoints +# ============================================================================ + +class InferRequest(BaseModel): + """Request for agent inference""" + prompt: str + model: Optional[str] = None + max_tokens: Optional[int] = 2048 + temperature: Optional[float] = 0.7 + system_prompt: Optional[str] = None + + +class InferResponse(BaseModel): + """Response from agent inference""" + response: str + model: str + tokens_used: Optional[int] = None + backend: str + + +class BackendStatus(BaseModel): + """Status of a backend service""" + name: str + url: str + status: str # online, offline, error + active_model: Optional[str] = None + error: Optional[str] = None + + +@app.get("/backends/status", response_model=List[BackendStatus]) +async def get_backends_status(): + """Get status of all backend services""" + backends = [] + + # Check Swapper + try: + resp = await http_client.get(f"{SWAPPER_URL}/health", timeout=5.0) + if resp.status_code == 200: + data = resp.json() + backends.append(BackendStatus( + name="swapper", + url=SWAPPER_URL, + status="online", + active_model=data.get("active_model") + )) + else: + backends.append(BackendStatus( + name="swapper", + url=SWAPPER_URL, + status="error", + error=f"HTTP {resp.status_code}" + )) + except Exception as e: + backends.append(BackendStatus( + name="swapper", + url=SWAPPER_URL, + status="offline", + error=str(e) + )) + + # Check STT + try: + resp = await http_client.get(f"{STT_URL}/health", timeout=5.0) + backends.append(BackendStatus( + name="stt", + url=STT_URL, + status="online" if resp.status_code == 200 else "error" + )) + except Exception as e: + backends.append(BackendStatus( + name="stt", + url=STT_URL, + status="offline", + error=str(e) + )) + + # Check Vision (Ollama) + try: + resp = await http_client.get(f"{VISION_URL}/api/tags", timeout=5.0) + if resp.status_code == 200: + data = resp.json() + models = [m.get("name") for m in data.get("models", [])] + backends.append(BackendStatus( + name="vision", + url=VISION_URL, + status="online", + active_model=", ".join(models[:3]) if models else None + )) + else: + backends.append(BackendStatus( + name="vision", + url=VISION_URL, + status="error" + )) + except Exception as e: + backends.append(BackendStatus( + name="vision", + url=VISION_URL, + status="offline", + error=str(e) + )) + + # Check OCR + try: + resp = await http_client.get(f"{OCR_URL}/health", timeout=5.0) + backends.append(BackendStatus( + name="ocr", + url=OCR_URL, + status="online" if resp.status_code == 200 else "error" + )) + except Exception as e: + backends.append(BackendStatus( + name="ocr", + url=OCR_URL, + status="offline", + error=str(e) + )) + + return backends + + +@app.post("/v1/agents/{agent_id}/infer", response_model=InferResponse) +async def agent_infer(agent_id: str, request: InferRequest): + """ + Route inference request to appropriate backend. + + Router decides which backend to use based on: + - Agent configuration (model, capabilities) + - Request type (text, vision, audio) + - Backend availability + """ + logger.info(f"🔀 Inference request for agent: {agent_id}") + logger.info(f"📝 Prompt: {request.prompt[:100]}...") + + # Determine which backend to use + model = request.model or "gpt-oss:latest" + + # Try Swapper first (for LLM models) + try: + # Check if Swapper is available + health_resp = await http_client.get(f"{SWAPPER_URL}/health", timeout=5.0) + if health_resp.status_code == 200: + # Load model if needed + load_resp = await http_client.post( + f"{SWAPPER_URL}/load", + json={"model": model}, + timeout=30.0 + ) + + if load_resp.status_code == 200: + # Generate response via Ollama + generate_resp = await http_client.post( + f"{VISION_URL}/api/generate", + json={ + "model": model, + "prompt": request.prompt, + "system": request.system_prompt, + "stream": False, + "options": { + "num_predict": request.max_tokens, + "temperature": request.temperature + } + }, + timeout=120.0 + ) + + if generate_resp.status_code == 200: + data = generate_resp.json() + return InferResponse( + response=data.get("response", ""), + model=model, + tokens_used=data.get("eval_count"), + backend="swapper+ollama" + ) + except Exception as e: + logger.error(f"❌ Swapper/Ollama error: {e}") + + # Fallback: return error + raise HTTPException( + status_code=503, + detail=f"No backend available for model: {model}" + ) + + +@app.get("/v1/models") +async def list_available_models(): + """List all available models across backends""" + models = [] + + # Get Swapper models + try: + resp = await http_client.get(f"{SWAPPER_URL}/models", timeout=5.0) + if resp.status_code == 200: + data = resp.json() + for m in data.get("models", []): + models.append({ + "id": m.get("name"), + "backend": "swapper", + "size_gb": m.get("size_gb"), + "status": m.get("status", "available") + }) + except Exception as e: + logger.warning(f"Cannot get Swapper models: {e}") + + # Get Ollama models + try: + resp = await http_client.get(f"{VISION_URL}/api/tags", timeout=5.0) + if resp.status_code == 200: + data = resp.json() + for m in data.get("models", []): + # Avoid duplicates + model_name = m.get("name") + if not any(x.get("id") == model_name for x in models): + models.append({ + "id": model_name, + "backend": "ollama", + "size_gb": round(m.get("size", 0) / 1e9, 1), + "status": "loaded" + }) + except Exception as e: + logger.warning(f"Cannot get Ollama models: {e}") + + return {"models": models, "total": len(models)} diff --git a/services/router/requirements.txt b/services/router/requirements.txt index 717822e1..037e33c6 100644 --- a/services/router/requirements.txt +++ b/services/router/requirements.txt @@ -3,6 +3,7 @@ uvicorn[standard]==0.24.0 pydantic==2.5.0 nats-py==2.6.0 PyYAML==6.0.1 +httpx>=0.25.0