""" Repository для City Backend (PostgreSQL) """ import os import uuid import asyncpg import json from typing import Optional, List, Dict, Any, Tuple from datetime import datetime, timezone import secrets import httpx import logging logger = logging.getLogger(__name__) # Database connection _pool: Optional[asyncpg.Pool] = None MATRIX_GATEWAY_URL = os.getenv("MATRIX_GATEWAY_URL", "http://matrix-gateway:8000") async def get_pool() -> asyncpg.Pool: """Отримати connection pool""" global _pool if _pool is None: database_url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/daarion") _pool = await asyncpg.create_pool(database_url, min_size=2, max_size=10) return _pool async def close_pool(): """Закрити connection pool""" global _pool if _pool is not None: await _pool.close() _pool = None def generate_id(prefix: str) -> str: """Генерувати простий ID""" return f"{prefix}_{secrets.token_urlsafe(12)}" def _normalize_capabilities(value: Any) -> List[str]: """Ensure capabilities are returned as a list.""" if value is None: return [] if isinstance(value, list): return value if isinstance(value, str): import json try: return json.loads(value) except Exception: return [] return list(value) # ============================================================================= # City Rooms Repository # ============================================================================= async def get_all_rooms(limit: int = 100, offset: int = 0) -> List[dict]: """Отримати всі кімнати з додатковими полями""" pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.description, cr.room_type, cr.space_scope, cr.visibility, cr.is_default, cr.is_public, cr.sort_order, cr.created_at, cr.created_by, cr.matrix_room_id, cr.matrix_room_alias, cr.logo_url, cr.banner_url, cr.room_role FROM city_rooms cr WHERE cr.is_public = true OR cr.space_scope = 'city' ORDER BY cr.sort_order ASC, cr.is_default DESC, cr.created_at DESC LIMIT $1 OFFSET $2 """ rows = await pool.fetch(query, limit, offset) return [dict(row) for row in rows] async def get_city_rooms_for_list(limit: int = 100) -> List[dict]: """Отримати City Rooms для відображення у списку""" pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.description, cr.room_type, cr.space_scope, cr.visibility, cr.is_public, cr.sort_order, cr.matrix_room_id, cr.matrix_room_alias, cr.logo_url, cr.banner_url, cr.room_role, cr.created_at FROM city_rooms cr WHERE cr.space_scope = 'city' AND cr.is_public = true ORDER BY cr.sort_order ASC, cr.name ASC LIMIT $1 """ rows = await pool.fetch(query, limit) return [dict(row) for row in rows] async def get_room_by_id(room_id: str) -> Optional[dict]: """Отримати кімнату по ID""" pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.description, cr.is_default, cr.created_at, cr.created_by, cr.matrix_room_id, cr.matrix_room_alias, cr.logo_url, cr.banner_url, cr.microdao_id, m.name AS microdao_name, m.slug AS microdao_slug, m.logo_url AS microdao_logo_url FROM city_rooms cr LEFT JOIN microdaos m ON cr.microdao_id::text = m.id WHERE cr.id = $1 """ row = await pool.fetchrow(query, room_id) return dict(row) if row else None async def get_room_by_slug(slug: str) -> Optional[dict]: """Отримати кімнату по slug""" pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.description, cr.is_default, cr.created_at, cr.created_by, cr.matrix_room_id, cr.matrix_room_alias, cr.logo_url, cr.banner_url, cr.microdao_id, m.name AS microdao_name, m.slug AS microdao_slug, m.logo_url AS microdao_logo_url FROM city_rooms cr LEFT JOIN microdaos m ON cr.microdao_id::text = m.id WHERE cr.slug = $1 """ row = await pool.fetchrow(query, slug) return dict(row) if row else None async def get_room_by_id(room_id: str) -> Optional[dict]: """Отримати кімнату по ID (UUID)""" pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.description, cr.is_default, cr.created_at, cr.created_by, cr.matrix_room_id, cr.matrix_room_alias, cr.logo_url, cr.banner_url, cr.microdao_id, m.name AS microdao_name, m.slug AS microdao_slug, m.logo_url AS microdao_logo_url FROM city_rooms cr LEFT JOIN microdaos m ON cr.microdao_id::text = m.id WHERE cr.id = $1 """ row = await pool.fetchrow(query, room_id) return dict(row) if row else None async def create_room( slug: str, name: str, description: Optional[str], created_by: Optional[str], matrix_room_id: Optional[str] = None, matrix_room_alias: Optional[str] = None ) -> dict: """Створити кімнату""" pool = await get_pool() room_id = f"room_city_{slug}" query = """ INSERT INTO city_rooms (id, slug, name, description, created_by, matrix_room_id, matrix_room_alias) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias """ row = await pool.fetchrow(query, room_id, slug, name, description, created_by, matrix_room_id, matrix_room_alias) return dict(row) async def update_room_matrix(room_id: str, matrix_room_id: str, matrix_room_alias: str) -> Optional[dict]: """Оновити Matrix поля кімнати""" pool = await get_pool() query = """ UPDATE city_rooms SET matrix_room_id = $2, matrix_room_alias = $3 WHERE id = $1 RETURNING id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias """ row = await pool.fetchrow(query, room_id, matrix_room_id, matrix_room_alias) return dict(row) async def get_rooms_without_matrix() -> List[dict]: """Отримати кімнати без Matrix інтеграції""" pool = await get_pool() query = """ SELECT id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias FROM city_rooms WHERE matrix_room_id IS NULL ORDER BY created_at """ rows = await pool.fetch(query) return [dict(row) for row in rows] # ============================================================================= # City Room Messages Repository # ============================================================================= async def get_room_messages(room_id: str, limit: int = 50) -> List[dict]: """Отримати повідомлення кімнати""" pool = await get_pool() query = """ SELECT id, room_id, author_user_id, author_agent_id, body, created_at FROM city_room_messages WHERE room_id = $1 ORDER BY created_at DESC LIMIT $2 """ rows = await pool.fetch(query, room_id, limit) # Reverse для правильного порядку (старі → нові) return [dict(row) for row in reversed(rows)] async def create_room_message( room_id: str, body: str, author_user_id: Optional[str] = None, author_agent_id: Optional[str] = None ) -> dict: """Створити повідомлення в кімнаті""" pool = await get_pool() message_id = generate_id("m_city") query = """ INSERT INTO city_room_messages (id, room_id, author_user_id, author_agent_id, body) VALUES ($1, $2, $3, $4, $5) RETURNING id, room_id, author_user_id, author_agent_id, body, created_at """ row = await pool.fetchrow(query, message_id, room_id, author_user_id, author_agent_id, body) return dict(row) # ============================================================================= # City Feed Events Repository # ============================================================================= async def get_feed_events(limit: int = 20, offset: int = 0) -> List[dict]: """Отримати події feed""" pool = await get_pool() query = """ SELECT id, kind, room_id, user_id, agent_id, payload, created_at FROM city_feed_events ORDER BY created_at DESC LIMIT $1 OFFSET $2 """ rows = await pool.fetch(query, limit, offset) return [dict(row) for row in rows] async def create_feed_event( kind: str, payload: dict, room_id: Optional[str] = None, user_id: Optional[str] = None, agent_id: Optional[str] = None ) -> dict: """Створити подію в feed""" pool = await get_pool() event_id = generate_id("evt_city") query = """ INSERT INTO city_feed_events (id, kind, room_id, user_id, agent_id, payload) VALUES ($1, $2, $3, $4, $5, $6::jsonb) RETURNING id, kind, room_id, user_id, agent_id, payload, created_at """ import json payload_json = json.dumps(payload) row = await pool.fetchrow(query, event_id, kind, room_id, user_id, agent_id, payload_json) return dict(row) # ============================================================================= # City Map Repository # ============================================================================= async def get_map_config() -> dict: """Отримати конфігурацію мапи міста""" pool = await get_pool() query = """ SELECT id, grid_width, grid_height, cell_size, background_url, updated_at FROM city_map_config WHERE id = 'default' """ row = await pool.fetchrow(query) if row: return dict(row) # Повернути дефолтні значення якщо немає запису return { "id": "default", "grid_width": 6, "grid_height": 3, "cell_size": 100, "background_url": None } async def get_rooms_for_map() -> List[dict]: """Отримати кімнати з координатами для 2D мапи""" pool = await get_pool() query = """ SELECT id, slug, name, description, room_type, zone, icon, color, map_x, map_y, map_w, map_h, matrix_room_id FROM city_rooms ORDER BY map_y, map_x """ rows = await pool.fetch(query) return [dict(row) for row in rows] # ============================================================================= # Agents Repository # ============================================================================= async def list_agent_summaries( *, node_id: Optional[str] = None, microdao_id: Optional[str] = None, is_public: Optional[bool] = None, visibility_scope: Optional[str] = None, listed_only: Optional[bool] = None, kinds: Optional[List[str]] = None, include_system: bool = True, include_archived: bool = False, limit: int = 200, offset: int = 0 ) -> Tuple[List[dict], int]: """ Unified method to list agents with all necessary data. Used by both Agent Console and Citizens page. """ pool = await get_pool() params: List[Any] = [] where_clauses = [] # Always filter archived unless explicitly included if not include_archived: where_clauses.append("COALESCE(a.is_archived, false) = false") where_clauses.append("COALESCE(a.is_test, false) = false") where_clauses.append("a.deleted_at IS NULL") if node_id: params.append(node_id) where_clauses.append(f"a.node_id = ${len(params)}") if microdao_id: params.append(microdao_id) where_clauses.append(f"EXISTS (SELECT 1 FROM microdao_agents ma WHERE ma.agent_id = a.id AND ma.microdao_id = ${len(params)})") if is_public is not None: params.append(is_public) where_clauses.append(f"COALESCE(a.is_public, false) = ${len(params)}") if visibility_scope: params.append(visibility_scope) where_clauses.append(f"COALESCE(a.visibility_scope, 'city') = ${len(params)}") if listed_only is True: where_clauses.append("COALESCE(a.is_listed_in_directory, true) = true") elif listed_only is False: where_clauses.append("COALESCE(a.is_listed_in_directory, true) = false") if kinds: params.append(kinds) where_clauses.append(f"a.kind = ANY(${len(params)})") if not include_system: where_clauses.append("COALESCE(a.is_system, false) = false") where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" query = f""" SELECT a.id, COALESCE(a.slug, a.public_slug, LOWER(REPLACE(a.display_name, ' ', '-'))) AS slug, a.display_name, COALESCE(a.public_title, '') AS title, COALESCE(a.public_tagline, '') AS tagline, a.kind, a.avatar_url, COALESCE(a.status, 'offline') AS status, a.node_id, nc.node_name AS node_label, nc.hostname AS node_hostname, nc.roles AS node_roles, nc.environment AS node_environment, COALESCE(a.visibility_scope, 'city') AS visibility_scope, COALESCE(a.is_listed_in_directory, true) AS is_listed_in_directory, COALESCE(a.is_system, false) AS is_system, COALESCE(a.is_public, false) AS is_public, COALESCE(a.is_orchestrator, false) AS is_orchestrator, a.primary_microdao_id, pm.name AS primary_microdao_name, pm.slug AS primary_microdao_slug, pm.district AS district, COALESCE(a.public_skills, ARRAY[]::text[]) AS public_skills, -- DAIS & Governance fields (A1, A2) a.gov_level, a.dais_identity_id, a.home_microdao_id, hm.name AS home_microdao_name, hm.slug AS home_microdao_slug, COUNT(*) OVER() AS total_count FROM agents a LEFT JOIN node_cache nc ON a.node_id = nc.node_id LEFT JOIN microdaos pm ON a.primary_microdao_id = pm.id LEFT JOIN microdaos hm ON a.home_microdao_id = hm.id WHERE {where_sql} ORDER BY a.display_name LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} """ params.append(limit) params.append(offset) rows = await pool.fetch(query, *params) if not rows: return [], 0 total = rows[0]["total_count"] items = [] for row in rows: data = dict(row) data.pop("total_count", None) # Build home_node object if data.get("node_id"): data["home_node"] = { "id": data.get("node_id"), "name": data.get("node_label"), "hostname": data.get("node_hostname"), "roles": list(data.get("node_roles") or []), "environment": data.get("node_environment") } else: data["home_node"] = None # Clean up intermediate fields for key in ["node_hostname", "node_roles", "node_environment"]: data.pop(key, None) # Get MicroDAO memberships memberships = await get_agent_microdao_memberships(data["id"]) data["microdaos"] = [ { "id": m.get("microdao_id", ""), "name": m.get("name", ""), "slug": m.get("slug"), "role": m.get("role") } for m in memberships ] data["microdao_memberships"] = memberships # backward compatibility data["public_skills"] = list(data.get("public_skills") or []) # Populate crew_info (crew_team_key column may not exist) crew_team_key = data.pop("crew_team_key", None) if crew_team_key: # Try to find orchestrator team room for their primary microdao # This is a bit expensive for list view, so maybe just return basic info data["crew_info"] = { "has_crew_team": True, "crew_team_key": data["crew_team_key"], "matrix_room_id": None # Loaded lazily if needed } else: data["crew_info"] = { "has_crew_team": False, "crew_team_key": None, "matrix_room_id": None } items.append(data) return items, total async def get_all_agents() -> List[dict]: """Отримати всіх агентів (non-archived) - legacy method""" pool = await get_pool() query = """ SELECT id, display_name, kind, avatar_url, color, status, current_room_id, capabilities, created_at, updated_at FROM agents WHERE COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false AND deleted_at IS NULL ORDER BY display_name """ rows = await pool.fetch(query) return [dict(row) for row in rows] async def update_agent_visibility( agent_id: str, *, is_public: bool, visibility_scope: Optional[str] = None, ) -> Optional[dict]: """ Оновити налаштування видимості агента. Returns updated agent data or None if not found. """ pool = await get_pool() # Build dynamic update set_parts = ["is_public = $2", "updated_at = NOW()"] params = [agent_id, is_public] if visibility_scope is not None: params.append(visibility_scope) set_parts.append(f"visibility_scope = ${len(params)}") # Also update is_listed_in_directory based on is_public set_parts.append("is_listed_in_directory = $2") # same as is_public query = f""" UPDATE agents SET {', '.join(set_parts)} WHERE id = $1 AND COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false RETURNING id, display_name, is_public, visibility_scope, is_listed_in_directory """ result = await pool.fetchrow(query, *params) return dict(result) if result else None async def update_agent_visibility_legacy( agent_id: str, visibility_scope: str, is_listed_in_directory: bool ) -> bool: """Legacy: Оновити налаштування видимості агента (backward compatibility)""" pool = await get_pool() query = """ UPDATE agents SET visibility_scope = $2, is_listed_in_directory = $3, is_public = $3, updated_at = NOW() WHERE id = $1 AND COALESCE(is_archived, false) = false RETURNING id """ result = await pool.fetchrow(query, agent_id, visibility_scope, is_listed_in_directory) return result is not None async def get_agent_prompts(agent_id: str) -> dict: """Отримати системні промти агента""" pool = await get_pool() query = """ SELECT kind, content, version, created_at, note FROM agent_prompts WHERE agent_id = $1 AND is_active = true ORDER BY kind """ rows = await pool.fetch(query, agent_id) result = { "core": None, "safety": None, "governance": None, "tools": None } for row in rows: kind = row["kind"] if kind in result: result[kind] = { "content": row["content"], "version": row["version"], "created_at": row["created_at"].isoformat() if row["created_at"] else None, "note": row.get("note") } return result async def get_runtime_prompts(agent_id: str) -> Dict[str, Any]: """ Отримати системні промти агента для DAGI Router runtime. Returns: { "agent_id": str, "has_prompts": bool, "prompts": { "core": str | None, "safety": str | None, "governance": str | None, "tools": str | None } } """ pool = await get_pool() query = """ SELECT kind, content FROM agent_prompts WHERE agent_id = $1 AND is_active = true ORDER BY kind """ rows = await pool.fetch(query, agent_id) prompts = { "core": None, "safety": None, "governance": None, "tools": None } for row in rows: kind = row["kind"] if kind in prompts: prompts[kind] = row["content"] has_prompts = prompts["core"] is not None return { "agent_id": agent_id, "has_prompts": has_prompts, "prompts": prompts } def build_system_prompt( agent: Dict[str, Any], prompts: Dict[str, str], context: Optional[Dict[str, Any]] = None ) -> str: """ Побудувати повний system prompt для LLM виклику. Args: agent: dict з інформацією про агента (name, kind, node_id, district_id, etc.) prompts: dict з промтами {"core": str, "safety": str, "governance": str, "tools": str} context: додатковий контекст (node info, district info, user role, etc.) Returns: str - зібраний system prompt """ parts = [] # Core prompt (required) if prompts.get("core"): parts.append(prompts["core"]) else: # Fallback: basic prompt from agent info agent_name = agent.get("display_name") or agent.get("name") or "Agent" agent_kind = agent.get("kind") or "assistant" parts.append( f"You are {agent_name}, an AI {agent_kind} in DAARION.city ecosystem. " f"Be helpful, accurate, and follow ethical guidelines." ) # Governance rules if prompts.get("governance"): parts.append("\n\n## Governance\n" + prompts["governance"]) # Safety guidelines if prompts.get("safety"): parts.append("\n\n## Safety Guidelines\n" + prompts["safety"]) # Tools instructions if prompts.get("tools"): parts.append("\n\n## Tools & Capabilities\n" + prompts["tools"]) # Context additions if context: context_parts = [] if context.get("node"): node = context["node"] context_parts.append( f"**Node**: {node.get('name', 'Unknown')} ({node.get('environment', 'unknown')} environment)" ) if context.get("district"): district = context["district"] context_parts.append( f"**District**: {district.get('name', 'Unknown')}" ) if context.get("user_role"): context_parts.append( f"**User Role**: {context['user_role']}" ) if context.get("microdao"): microdao = context["microdao"] context_parts.append( f"**MicroDAO**: {microdao.get('name', 'Unknown')}" ) if context_parts: parts.append("\n\n## Current Context\n" + "\n".join(context_parts)) return "\n".join(parts) async def get_agent_with_runtime_prompt(agent_id: str) -> Optional[Dict[str, Any]]: """ Отримати агента з зібраним runtime system prompt. Використовується DAGI Router для inference. """ pool = await get_pool() # Get agent info agent_query = """ SELECT a.id, a.name, a.display_name, a.kind, a.status, a.node_id, a.district_id, a.microdao_id, a.external_id, a.public_slug FROM agents a WHERE a.id = $1 OR a.external_id = $2 OR a.public_slug = $3 LIMIT 1 """ agent_row = await pool.fetchrow(agent_query, agent_id, f"agent:{agent_id}", agent_id) if not agent_row: return None agent = dict(agent_row) # Get prompts runtime_data = await get_runtime_prompts(agent["id"]) # Build context context = {} # Add node context if agent has node_id if agent.get("node_id"): node = await get_node_by_id(agent["node_id"]) if node: context["node"] = { "name": node.get("name"), "environment": node.get("environment") } # Build full system prompt system_prompt = build_system_prompt(agent, runtime_data["prompts"], context) return { "agent_id": agent["id"], "agent_name": agent.get("display_name") or agent.get("name"), "agent_kind": agent.get("kind"), "has_prompts": runtime_data["has_prompts"], "system_prompt": system_prompt, "prompts": runtime_data["prompts"] } async def check_agents_prompts_status(agent_ids: List[str]) -> Dict[str, bool]: """ Перевірити наявність промтів для списку агентів. Використовується для індикаторів у UI. """ if not agent_ids: return {} pool = await get_pool() # Get all agents with at least core prompt query = """ SELECT DISTINCT agent_id FROM agent_prompts WHERE agent_id = ANY($1) AND kind = 'core' AND is_active = true """ rows = await pool.fetch(query, agent_ids) agents_with_prompts = {row["agent_id"] for row in rows} return { agent_id: agent_id in agents_with_prompts for agent_id in agent_ids } async def update_agent_prompt( agent_id: str, kind: str, content: str, created_by: str, note: Optional[str] = None ) -> dict: """ Оновити або створити системний промт агента. Деактивує попередню версію та створює нову. """ pool = await get_pool() valid_kinds = ("core", "safety", "governance", "tools") if kind not in valid_kinds: raise ValueError(f"Invalid kind: {kind}. Must be one of {valid_kinds}") async with pool.acquire() as conn: async with conn.transaction(): # Деактивувати попередню версію await conn.execute( """ UPDATE agent_prompts SET is_active = false WHERE agent_id = $1 AND kind = $2 AND is_active = true """, agent_id, kind ) # Отримати наступну версію max_version = await conn.fetchval( """ SELECT COALESCE(MAX(version), 0) FROM agent_prompts WHERE agent_id = $1 AND kind = $2 """, agent_id, kind ) new_version = max_version + 1 # Створити новий запис row = await conn.fetchrow( """ INSERT INTO agent_prompts ( agent_id, kind, content, version, created_by, note, is_active, created_at ) VALUES ($1, $2, $3, $4, $5, $6, true, NOW()) RETURNING id, agent_id, kind, content, version, created_at, created_by, note """, agent_id, kind, content, new_version, created_by, note ) return { "agent_id": row["agent_id"], "kind": row["kind"], "content": row["content"], "version": row["version"], "created_at": row["created_at"].isoformat() if row["created_at"] else None, "updated_at": row["created_at"].isoformat() if row["created_at"] else None, "updated_by": row["created_by"], "note": row["note"] } async def upsert_agent_prompts(agent_id: str, prompts: List[dict], created_by: str) -> List[dict]: """ Пакетне оновлення промтів агента. """ results = [] for p in prompts: res = await update_agent_prompt( agent_id=agent_id, kind=p["kind"], content=p["content"], created_by=created_by, note=p.get("note") ) results.append(res) return results async def get_agent_prompt_history(agent_id: str, kind: str, limit: int = 10) -> List[dict]: """ Отримати історію версій промту агента. """ pool = await get_pool() query = """ SELECT id, version, content, created_at, created_by, note, is_active FROM agent_prompts WHERE agent_id = $1 AND kind = $2 ORDER BY version DESC LIMIT $3 """ rows = await pool.fetch(query, agent_id, kind, limit) return [ { "id": str(row["id"]), "version": row["version"], "content": row["content"], "created_at": row["created_at"].isoformat() if row["created_at"] else None, "created_by": row["created_by"], "note": row["note"], "is_active": row["is_active"] } for row in rows ] async def get_agent_public_profile(agent_id: str) -> Optional[dict]: """Отримати публічний профіль агента""" pool = await get_pool() query = """ SELECT is_public, public_slug, public_title, public_tagline, COALESCE(public_skills, ARRAY[]::text[]) AS public_skills, public_district, public_primary_room_slug, COALESCE(visibility_scope, 'city') AS visibility_scope, COALESCE(is_listed_in_directory, true) AS is_listed_in_directory, COALESCE(is_system, false) AS is_system FROM agents WHERE id = $1 """ row = await pool.fetchrow(query, agent_id) if not row: return None return { "is_public": row["is_public"], "public_slug": row["public_slug"], "public_title": row["public_title"], "public_tagline": row["public_tagline"], "public_skills": list(row["public_skills"] or []), "public_district": row["public_district"], "public_primary_room_slug": row["public_primary_room_slug"], "visibility_scope": row["visibility_scope"], "is_listed_in_directory": row["is_listed_in_directory"], "is_system": row["is_system"] } async def get_agents_with_home_node( kind: Optional[str] = None, node_id: Optional[str] = None, limit: int = 100, offset: int = 0 ) -> Tuple[List[dict], int]: """Отримати агентів з інформацією про home_node""" pool = await get_pool() params: List[Any] = [] where_clauses = [ "COALESCE(a.is_archived, false) = false", "COALESCE(a.is_test, false) = false", "a.deleted_at IS NULL" ] if kind: params.append(kind) where_clauses.append(f"a.kind = ${len(params)}") if node_id: params.append(node_id) where_clauses.append(f"a.node_id = ${len(params)}") where_sql = " AND ".join(where_clauses) query = f""" SELECT a.id, a.display_name, a.kind, a.avatar_url, a.status, a.is_public, a.public_slug, a.public_title, a.public_district, a.node_id, nc.node_name AS home_node_name, nc.hostname AS home_node_hostname, nc.roles AS home_node_roles, nc.environment AS home_node_environment, COUNT(*) OVER() AS total_count FROM agents a LEFT JOIN node_cache nc ON a.node_id = nc.node_id WHERE {where_sql} ORDER BY a.display_name LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} """ params.append(limit) params.append(offset) rows = await pool.fetch(query, *params) if not rows: return [], 0 total = rows[0]["total_count"] items = [] for row in rows: data = dict(row) data.pop("total_count", None) # Build home_node object if data.get("node_id"): data["home_node"] = { "id": data.get("node_id"), "name": data.get("home_node_name"), "hostname": data.get("home_node_hostname"), "roles": list(data.get("home_node_roles") or []), "environment": data.get("home_node_environment") } else: data["home_node"] = None # Clean up intermediate fields for key in ["home_node_name", "home_node_hostname", "home_node_roles", "home_node_environment"]: data.pop(key, None) items.append(data) return items, total async def get_agents_by_room(room_id: str) -> List[dict]: """Отримати агентів у конкретній кімнаті""" pool = await get_pool() query = """ SELECT id, display_name, kind, avatar_url, color, status, current_room_id, capabilities FROM agents WHERE current_room_id = $1 AND status != 'offline' ORDER BY display_name """ rows = await pool.fetch(query, room_id) return [dict(row) for row in rows] async def get_online_agents() -> List[dict]: """Отримати всіх онлайн агентів""" pool = await get_pool() query = """ SELECT id, display_name, kind, avatar_url, color, status, current_room_id, capabilities FROM agents WHERE status IN ('online', 'busy') ORDER BY display_name """ rows = await pool.fetch(query) return [dict(row) for row in rows] async def update_agent_status(agent_id: str, status: str, room_id: Optional[str] = None) -> Optional[dict]: """Оновити статус агента""" pool = await get_pool() if room_id: query = """ UPDATE agents SET status = $2, current_room_id = $3, updated_at = NOW() WHERE id = $1 RETURNING id, display_name, kind, status, current_room_id """ row = await pool.fetchrow(query, agent_id, status, room_id) else: query = """ UPDATE agents SET status = $2, updated_at = NOW() WHERE id = $1 RETURNING id, display_name, kind, status, current_room_id """ row = await pool.fetchrow(query, agent_id, status) return dict(row) if row else None async def get_agent_by_id(agent_id: str) -> Optional[dict]: """Отримати агента по ID або public_slug""" pool = await get_pool() query = """ SELECT a.id, a.display_name, a.kind, a.status, a.node_id, a.role, a.avatar_url, COALESCE(a.color_hint, a.color, 'cyan') AS color, a.capabilities, a.primary_room_slug, a.public_primary_room_slug, a.public_district, a.public_title, a.public_tagline, a.public_skills, a.public_slug, a.is_public, a.district AS home_district, a.crew_team_key, a.dagi_status, a.last_seen_at, COALESCE(a.is_node_guardian, false) as is_node_guardian, COALESCE(a.is_node_steward, false) as is_node_steward FROM agents a WHERE a.id = $1 OR a.public_slug = $1 """ row = await pool.fetchrow(query, agent_id) if not row: return None agent = dict(row) agent["capabilities"] = _normalize_capabilities(agent.get("capabilities")) if agent.get("public_skills") is None: agent["public_skills"] = [] # Populate crew_info if agent.get("crew_team_key"): agent["crew_info"] = { "has_crew_team": True, "crew_team_key": agent["crew_team_key"], "matrix_room_id": None # Populated later if needed } # If orchestrator, verify if room exists # For detailed view, let's try to fetch it if agent.get("primary_room_slug"): # Just a placeholder check, logic should be outside or specific method pass else: agent["crew_info"] = { "has_crew_team": False, "crew_team_key": None, "matrix_room_id": None } return agent async def get_agent_public_profile(agent_id: str) -> Optional[dict]: """Отримати публічний профіль агента""" pool = await get_pool() query = """ SELECT is_public, public_slug, public_title, public_tagline, public_skills, public_district, public_primary_room_slug FROM agents WHERE id = $1 """ row = await pool.fetchrow(query, agent_id) if not row: return None result = dict(row) if result.get("public_skills") is None: result["public_skills"] = [] return result async def update_agent_public_profile( agent_id: str, is_public: bool, public_slug: Optional[str], public_title: Optional[str], public_tagline: Optional[str], public_skills: Optional[List[str]], public_district: Optional[str], public_primary_room_slug: Optional[str] ) -> Optional[dict]: """Оновити публічний профіль агента""" pool = await get_pool() query = """ UPDATE agents SET is_public = $2, public_slug = $3, public_title = $4, public_tagline = $5, public_skills = $6, public_district = $7, public_primary_room_slug = $8, updated_at = NOW() WHERE id = $1 RETURNING is_public, public_slug, public_title, public_tagline, public_skills, public_district, public_primary_room_slug """ row = await pool.fetchrow( query, agent_id, is_public, public_slug, public_title, public_tagline, public_skills, public_district, public_primary_room_slug ) if not row: return None result = dict(row) if result.get("public_skills") is None: result["public_skills"] = [] return result async def get_agent_rooms(agent_id: str) -> List[dict]: """Отримати список кімнат агента (primary/public)""" pool = await get_pool() query = """ SELECT primary_room_slug, public_primary_room_slug FROM agents WHERE id = $1 """ row = await pool.fetchrow(query, agent_id) if not row: return [] slugs = [] if row.get("primary_room_slug"): slugs.append(row["primary_room_slug"]) if row.get("public_primary_room_slug") and row["public_primary_room_slug"] not in slugs: slugs.append(row["public_primary_room_slug"]) if not slugs: return [] rooms_query = """ SELECT id, slug, name FROM city_rooms WHERE slug = ANY($1::text[]) """ rooms = await pool.fetch(rooms_query, slugs) return [dict(room) for room in rooms] async def get_agent_matrix_config(agent_id: str) -> Optional[dict]: """Отримати Matrix-конфіг агента""" pool = await get_pool() query = """ SELECT agent_id, matrix_user_id, primary_room_id FROM agent_matrix_config WHERE agent_id = $1 """ row = await pool.fetchrow(query, agent_id) return dict(row) if row else None async def get_public_agent_by_slug(slug: str) -> Optional[dict]: """Отримати базову інформацію про публічного агента""" pool = await get_pool() query = """ SELECT id, display_name, public_primary_room_slug, primary_room_slug, public_district, public_title, public_tagline FROM agents WHERE public_slug = $1 AND is_public = true LIMIT 1 """ row = await pool.fetchrow(query, slug) return dict(row) if row else None async def get_microdao_for_agent(agent_id: str) -> Optional[dict]: """Отримати MicroDAO для агента (аліас get_agent_microdao)""" return await get_agent_microdao(agent_id) # ============================================================================= # Citizens Repository # ============================================================================= async def get_public_citizens( district: Optional[str] = None, kind: Optional[str] = None, q: Optional[str] = None, limit: int = 50, offset: int = 0 ) -> Tuple[List[dict], int]: """Отримати публічних громадян""" pool = await get_pool() params: List[Any] = [] where_clauses = [ "a.is_public = true", "COALESCE(a.is_archived, false) = false", "COALESCE(a.is_test, false) = false", "a.deleted_at IS NULL", # ROOMS_LAYER_RESTORE: Include agents with gov_level or specific kinds as citizens "(a.public_slug IS NOT NULL OR a.gov_level IN ('city_governance', 'district_lead', 'orchestrator', 'core_team') OR a.kind IN ('civic', 'governance', 'orchestrator'))" ] if district: params.append(district) where_clauses.append(f"a.public_district = ${len(params)}") if kind: params.append(kind) where_clauses.append(f"a.kind = ${len(params)}") if q: params.append(f"%{q}%") where_clauses.append( f"(a.display_name ILIKE ${len(params)} OR a.public_title ILIKE ${len(params)} OR a.public_tagline ILIKE ${len(params)})" ) where_sql = " AND ".join(where_clauses) query = f""" SELECT a.id, a.public_slug, a.display_name, a.public_title, a.public_tagline, a.avatar_url, a.kind, a.public_district, a.public_primary_room_slug, COALESCE(a.public_skills, ARRAY[]::text[]) AS public_skills, COALESCE(a.status, 'unknown') AS status, a.node_id, nc.node_name AS home_node_name, nc.hostname AS home_node_hostname, nc.roles AS home_node_roles, nc.environment AS home_node_environment, -- MicroDAO info m.slug AS home_microdao_slug, m.name AS home_microdao_name, -- Room info cr.id AS room_id, cr.slug AS room_slug, cr.name AS room_name, cr.matrix_room_id AS room_matrix_id, COUNT(*) OVER() AS total_count FROM agents a LEFT JOIN node_cache nc ON a.node_id = nc.node_id -- Join primary MicroDAO LEFT JOIN LATERAL ( SELECT ma.agent_id, md.slug, md.name FROM microdao_agents ma JOIN microdaos md ON ma.microdao_id = md.id WHERE ma.agent_id = a.id ORDER BY ma.is_core DESC, md.name LIMIT 1 ) m ON true -- Join primary room (by public_primary_room_slug) LEFT JOIN city_rooms cr ON cr.slug = a.public_primary_room_slug WHERE {where_sql} ORDER BY a.display_name LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} """ params.append(limit) params.append(offset) rows = await pool.fetch(query, *params) if not rows: return [], 0 total = rows[0]["total_count"] items = [] for row in rows: data = dict(row) data.pop("total_count", None) data["public_skills"] = list(data.get("public_skills") or []) data["online_status"] = data.get("status") or "unknown" # Build home_node object if data.get("node_id"): data["home_node"] = { "id": data.get("node_id"), "name": data.get("home_node_name"), "hostname": data.get("home_node_hostname"), "roles": list(data.get("home_node_roles") or []), "environment": data.get("home_node_environment") } else: data["home_node"] = None # Build primary_city_room object if data.get("room_id"): data["primary_city_room"] = { "id": str(data["room_id"]), "slug": data["room_slug"], "name": data["room_name"], "matrix_room_id": data.get("room_matrix_id") } else: data["primary_city_room"] = None # Clean up intermediate fields for key in ["home_node_name", "home_node_hostname", "home_node_roles", "home_node_environment", "room_id", "room_slug", "room_name", "room_matrix_id"]: data.pop(key, None) items.append(data) return items, total async def get_agent_microdao(agent_id: str) -> Optional[dict]: """Отримати MicroDAO, до якого належить агент (перший збіг)""" pool = await get_pool() query = """ SELECT m.id, m.slug, m.name, m.district FROM microdao_agents ma JOIN microdaos m ON m.id = ma.microdao_id WHERE ma.agent_id = $1 ORDER BY ma.is_core DESC, m.name LIMIT 1 """ row = await pool.fetchrow(query, agent_id) return dict(row) if row else None async def get_microdao_public_citizens(microdao_id: str) -> List[dict]: """Отримати публічних громадян конкретного MicroDAO""" pool = await get_pool() query = """ SELECT a.public_slug AS slug, a.display_name, a.public_title, a.public_tagline, a.avatar_url, a.public_district, a.public_primary_room_slug FROM microdao_agents ma JOIN agents a ON a.id = ma.agent_id WHERE ma.microdao_id = $1 AND a.is_public = true AND a.public_slug IS NOT NULL ORDER BY a.display_name """ rows = await pool.fetch(query, microdao_id) result = [] for row in rows: data = dict(row) result.append(data) return result async def get_public_citizen_by_slug(slug: str) -> Optional[dict]: """Отримати детальний профіль громадянина""" pool = await get_pool() query = """ SELECT a.id, a.display_name, a.kind, a.status, a.node_id, a.avatar_url, a.public_slug, a.public_title, a.public_tagline, COALESCE(a.public_skills, ARRAY[]::text[]) AS public_skills, a.public_district, a.public_primary_room_slug, a.primary_room_slug, nc.node_name AS home_node_name, nc.hostname AS home_node_hostname, nc.roles AS home_node_roles, nc.environment AS home_node_environment FROM agents a LEFT JOIN node_cache nc ON a.node_id = nc.node_id WHERE a.public_slug = $1 AND a.is_public = true LIMIT 1 """ agent_row = await pool.fetchrow(query, slug) if not agent_row: return None agent = dict(agent_row) agent["public_skills"] = list(agent.get("public_skills") or []) # Build home_node object home_node = None if agent.get("node_id"): home_node = { "id": agent.get("node_id"), "name": agent.get("home_node_name"), "hostname": agent.get("home_node_hostname"), "roles": list(agent.get("home_node_roles") or []), "environment": agent.get("home_node_environment") } rooms = await get_agent_rooms(agent["id"]) primary_room = agent.get("public_primary_room_slug") or agent.get("primary_room_slug") city_presence = { "primary_room_slug": primary_room, "rooms": rooms } if rooms else { "primary_room_slug": primary_room, "rooms": [] } dais_public = { "core": { "archetype": agent.get("kind"), "bio_short": agent.get("public_tagline") }, "phenotype": { "visual": { "avatar_url": agent.get("avatar_url"), "color": None } }, "memex": {}, "economics": {} } interaction = { "matrix_user": None, "primary_room_slug": primary_room, "actions": ["chat", "ask_for_help"] } metrics_public: Dict[str, Any] = {} microdao = await get_agent_microdao(agent["id"]) return { "slug": agent["public_slug"], "display_name": agent["display_name"], "kind": agent.get("kind"), "public_title": agent.get("public_title"), "public_tagline": agent.get("public_tagline"), "district": agent.get("public_district"), "avatar_url": agent.get("avatar_url"), "status": agent.get("status"), "node_id": agent.get("node_id"), "public_skills": agent.get("public_skills"), "city_presence": city_presence, "dais_public": dais_public, "interaction": interaction, "metrics_public": metrics_public, "microdao": microdao, "admin_panel_url": f"/agents/{agent['id']}", "home_node": home_node } # ============================================================================= # MicroDAO Membership Repository # ============================================================================= async def get_microdao_options() -> List[dict]: """Отримати список активних MicroDAO для селектора""" pool = await get_pool() query = """ SELECT id, slug, name, district, is_active FROM microdaos WHERE is_active = true ORDER BY name """ rows = await pool.fetch(query) return [dict(row) for row in rows] async def get_agent_microdao_memberships(agent_id: str) -> List[dict]: """Отримати всі членства агента в MicroDAO""" pool = await get_pool() query = """ SELECT ma.microdao_id, m.slug AS microdao_slug, m.name AS microdao_name, m.logo_url, ma.role, ma.is_core FROM microdao_agents ma JOIN microdaos m ON m.id = ma.microdao_id WHERE ma.agent_id = $1 ORDER BY ma.is_core DESC, m.name """ rows = await pool.fetch(query, agent_id) return [dict(row) for row in rows] async def upsert_agent_microdao_membership( agent_id: str, microdao_id: str, role: Optional[str], is_core: bool ) -> Optional[dict]: """Призначити або оновити членство агента в MicroDAO""" pool = await get_pool() query = """ WITH upsert AS ( INSERT INTO microdao_agents (microdao_id, agent_id, role, is_core) VALUES ($1, $2, $3, $4) ON CONFLICT (microdao_id, agent_id) DO UPDATE SET role = EXCLUDED.role, is_core = EXCLUDED.is_core RETURNING microdao_id, agent_id, role, is_core ) SELECT u.microdao_id, m.slug AS microdao_slug, m.name AS microdao_name, u.role, u.is_core FROM upsert u JOIN microdaos m ON m.id = u.microdao_id """ row = await pool.fetchrow(query, microdao_id, agent_id, role, is_core) return dict(row) if row else None async def remove_agent_microdao_membership(agent_id: str, microdao_id: str) -> bool: """Видалити членство агента в MicroDAO""" pool = await get_pool() result = await pool.execute( "DELETE FROM microdao_agents WHERE agent_id = $1 AND microdao_id = $2", agent_id, microdao_id ) # asyncpg returns strings like "DELETE 1" return result.split(" ")[-1] != "0" # ============================================================================= # MicroDAO Repository # ============================================================================= async def get_microdaos(district: Optional[str] = None, q: Optional[str] = None, limit: int = 50, offset: int = 0) -> List[dict]: """Отримати список MicroDAOs з агрегованою статистикою""" pool = await get_pool() params = [] where_clauses = [ "m.is_public = true", "m.is_active = true", "COALESCE(m.is_archived, false) = false", "COALESCE(m.is_test, false) = false", "m.deleted_at IS NULL" ] if district: params.append(district) where_clauses.append(f"m.district = ${len(params)}") if q: params.append(f"%{q}%") where_clauses.append(f"(m.name ILIKE ${len(params)} OR m.description ILIKE ${len(params)})") where_sql = " AND ".join(where_clauses) query = f""" SELECT m.id, m.slug, m.name, m.description, m.district, COALESCE(m.orchestrator_agent_id, m.owner_agent_id) as orchestrator_agent_id, oa.display_name as orchestrator_agent_name, m.is_active, COALESCE(m.is_public, true) as is_public, COALESCE(m.is_platform, false) as is_platform, COALESCE(m.is_pinned, false) as is_pinned, COALESCE(m.pinned_weight, 0) as pinned_weight, m.parent_microdao_id, pm.slug as parent_microdao_slug, m.logo_url, m.banner_url, COUNT(DISTINCT ma.agent_id) AS agents_count, COUNT(DISTINCT ma.agent_id) AS member_count, COUNT(DISTINCT mc.id) AS channels_count, COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS rooms_count, COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS room_count FROM microdaos m LEFT JOIN microdao_agents ma ON ma.microdao_id = m.id LEFT JOIN microdao_channels mc ON mc.microdao_id = m.id LEFT JOIN agents oa ON COALESCE(m.orchestrator_agent_id, m.owner_agent_id) = oa.id LEFT JOIN microdaos pm ON m.parent_microdao_id = pm.id WHERE {where_sql} GROUP BY m.id, oa.display_name, pm.slug ORDER BY COALESCE(m.is_pinned, false) DESC, COALESCE(m.pinned_weight, 0) ASC, m.created_at ASC LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} """ # Append limit and offset to params params.append(limit) params.append(offset) rows = await pool.fetch(query, *params) return [dict(row) for row in rows] async def list_microdao_summaries( *, is_public: Optional[bool] = None, is_platform: Optional[bool] = None, district: Optional[str] = None, q: Optional[str] = None, limit: int = 50, offset: int = 0 ) -> List[dict]: """ Unified method to list microDAOs. Wraps get_microdaos with additional filtering. """ pool = await get_pool() params = [] where_clauses = [ "COALESCE(m.is_archived, false) = false", "COALESCE(m.is_test, false) = false", "m.deleted_at IS NULL", "m.is_active = true" ] if is_public is not None: params.append(is_public) where_clauses.append(f"COALESCE(m.is_public, true) = ${len(params)}") if is_platform is not None: params.append(is_platform) where_clauses.append(f"COALESCE(m.is_platform, false) = ${len(params)}") if district: params.append(district) where_clauses.append(f"m.district = ${len(params)}") if q: params.append(f"%{q}%") where_clauses.append(f"(m.name ILIKE ${len(params)} OR m.description ILIKE ${len(params)})") where_sql = " AND ".join(where_clauses) query = f""" SELECT m.id, m.slug, m.name, m.description, m.district, COALESCE(m.orchestrator_agent_id, m.owner_agent_id) as orchestrator_agent_id, oa.display_name as orchestrator_agent_name, m.is_active, COALESCE(m.is_public, true) as is_public, COALESCE(m.is_platform, false) as is_platform, COALESCE(m.is_pinned, false) as is_pinned, COALESCE(m.pinned_weight, 0) as pinned_weight, m.parent_microdao_id, pm.slug as parent_microdao_slug, m.logo_url, m.banner_url, COUNT(DISTINCT ma.agent_id) AS agents_count, COUNT(DISTINCT ma.agent_id) AS member_count, COUNT(DISTINCT mc.id) AS channels_count, COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS rooms_count, COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS room_count FROM microdaos m LEFT JOIN microdao_agents ma ON ma.microdao_id = m.id LEFT JOIN microdao_channels mc ON mc.microdao_id = m.id LEFT JOIN agents oa ON COALESCE(m.orchestrator_agent_id, m.owner_agent_id) = oa.id LEFT JOIN microdaos pm ON m.parent_microdao_id = pm.id WHERE {where_sql} GROUP BY m.id, oa.display_name, pm.slug ORDER BY COALESCE(m.is_pinned, false) DESC, COALESCE(m.pinned_weight, 0) ASC, m.created_at ASC LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} """ params.append(limit) params.append(offset) rows = await pool.fetch(query, *params) return [dict(row) for row in rows] async def get_microdao_detail(slug: str) -> Optional[dict]: """ Get detailed microDAO info including agents, channels, children. Alias for get_microdao_by_slug with clearer naming. """ return await get_microdao_by_slug(slug) async def get_microdao_by_slug(slug: str) -> Optional[dict]: """Отримати детальну інформацію про MicroDAO""" pool = await get_pool() # 1. Get main DAO info query_dao = """ SELECT m.id, m.slug, m.name, m.description, m.district, COALESCE(m.orchestrator_agent_id, m.owner_agent_id) as orchestrator_agent_id, a.display_name as orchestrator_display_name, m.is_active, COALESCE(m.is_public, true) as is_public, COALESCE(m.is_platform, false) as is_platform, m.parent_microdao_id, pm.slug as parent_microdao_slug, m.logo_url, m.banner_url FROM microdaos m LEFT JOIN agents a ON COALESCE(m.orchestrator_agent_id, m.owner_agent_id) = a.id LEFT JOIN microdaos pm ON m.parent_microdao_id = pm.id WHERE m.slug = $1 AND COALESCE(m.is_archived, false) = false AND COALESCE(m.is_test, false) = false AND m.deleted_at IS NULL """ dao_row = await pool.fetchrow(query_dao, slug) if not dao_row: return None result = dict(dao_row) dao_id = result["id"] # 2. Get Agents query_agents = """ SELECT ma.agent_id, ma.role, ma.is_core, a.display_name FROM microdao_agents ma JOIN agents a ON ma.agent_id = a.id WHERE ma.microdao_id = $1 AND COALESCE(a.is_archived, false) = false AND COALESCE(a.is_test, false) = false AND a.deleted_at IS NULL ORDER BY ma.is_core DESC, ma.role """ agents_rows = await pool.fetch(query_agents, dao_id) result["agents"] = [dict(row) for row in agents_rows] # 3. Get Channels query_channels = """ SELECT kind, ref_id, display_name, is_primary FROM microdao_channels WHERE microdao_id = $1 ORDER BY is_primary DESC, kind """ channels_rows = await pool.fetch(query_channels, dao_id) result["channels"] = [dict(row) for row in channels_rows] # 4. Get child microDAOs query_children = """ SELECT id, slug, name, COALESCE(is_public, true) as is_public, COALESCE(is_platform, false) as is_platform FROM microdaos WHERE parent_microdao_id = $1 AND COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false AND deleted_at IS NULL ORDER BY name """ children_rows = await pool.fetch(query_children, dao_id) result["child_microdaos"] = [dict(row) for row in children_rows] public_citizens = await get_microdao_public_citizens(dao_id) result["public_citizens"] = public_citizens return result async def update_microdao_branding( microdao_slug: str, logo_url: Optional[str] = None, banner_url: Optional[str] = None ) -> Optional[dict]: """Оновити брендинг MicroDAO""" pool = await get_pool() set_parts = ["updated_at = NOW()"] params = [microdao_slug] if logo_url is not None: params.append(logo_url) set_parts.append(f"logo_url = ${len(params)}") if banner_url is not None: params.append(banner_url) set_parts.append(f"banner_url = ${len(params)}") query = f""" UPDATE microdaos SET {', '.join(set_parts)} WHERE slug = $1 RETURNING id, slug, name, logo_url, banner_url """ row = await pool.fetchrow(query, *params) return dict(row) if row else None async def update_room_branding( room_id: str, logo_url: Optional[str] = None, banner_url: Optional[str] = None ) -> Optional[dict]: """Оновити брендинг кімнати""" pool = await get_pool() set_parts = ["updated_at = NOW()"] params = [room_id] if logo_url is not None: params.append(logo_url) set_parts.append(f"logo_url = ${len(params)}") if banner_url is not None: params.append(banner_url) set_parts.append(f"banner_url = ${len(params)}") query = f""" UPDATE city_rooms SET {', '.join(set_parts)} WHERE id = $1 RETURNING id, slug, name, logo_url, banner_url """ row = await pool.fetchrow(query, *params) return dict(row) if row else None # ============================================================================= # Nodes Repository # ============================================================================= async def get_all_nodes() -> List[dict]: """Отримати список всіх нод з кількістю агентів, Guardian/Steward та метриками. ДЖЕРЕЛО ІСТИНИ: 1. node_registry (якщо існує) + node_cache (метрики) 2. Fallback: тільки node_cache (для зворотної сумісності) """ pool = await get_pool() # Перевіримо чи існує node_registry try: exists = await pool.fetchval(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_name = 'node_registry' ) """) except Exception: exists = False if exists: # Використовуємо node_registry як джерело істини query = """ SELECT COALESCE(nr.id, nc.node_id) as node_id, COALESCE(nr.name, nc.node_name) AS name, COALESCE(nr.hostname, nc.hostname) as hostname, COALESCE(nr.roles, nc.roles) as roles, COALESCE(nr.environment, nc.environment) as environment, COALESCE(nc.status, 'unknown') as status, nc.gpu, COALESCE(nc.last_heartbeat, nc.last_sync) AS last_heartbeat, nc.guardian_agent_id, nc.steward_agent_id, -- Metrics nc.cpu_model, nc.cpu_cores, COALESCE(nc.cpu_usage, 0) as cpu_usage, nc.gpu_model, COALESCE(nc.gpu_vram_total, 0) as gpu_vram_total, COALESCE(nc.gpu_vram_used, 0) as gpu_vram_used, COALESCE(nc.ram_total, 0) as ram_total, COALESCE(nc.ram_used, 0) as ram_used, COALESCE(nc.disk_total, 0) as disk_total, COALESCE(nc.disk_used, 0) as disk_used, COALESCE(nc.agent_count_router, 0) as agent_count_router, COALESCE(nc.agent_count_system, 0) as agent_count_system, nc.last_heartbeat as metrics_heartbeat, nc.dagi_router_url, -- Self-healing status (may not exist yet) NULL as self_healing_status, -- Registry info nr.description as node_description, nr.is_active as registry_active, nr.last_self_registration, -- Agent counts (dynamic) (SELECT COUNT(*) FROM agents a WHERE a.node_id = COALESCE(nr.id, nc.node_id) AND COALESCE(a.is_archived, false) = false AND a.deleted_at IS NULL) AS agents_total, (SELECT COUNT(*) FROM agents a WHERE a.node_id = COALESCE(nr.id, nc.node_id) AND a.status = 'online' AND COALESCE(a.is_archived, false) = false) AS agents_online, ga.display_name AS guardian_name, ga.public_slug AS guardian_slug, sa.display_name AS steward_name, sa.public_slug AS steward_slug FROM node_registry nr LEFT JOIN node_cache nc ON nc.node_id = nr.id LEFT JOIN agents ga ON nc.guardian_agent_id = ga.id LEFT JOIN agents sa ON nc.steward_agent_id = sa.id WHERE nr.is_active = true ORDER BY nr.environment DESC, nr.name """ try: rows = await pool.fetch(query) except Exception as e: logger.warning(f"node_registry query failed: {e}") rows = [] else: rows = [] # Fallback: якщо node_registry не існує або порожній, використовуємо node_cache if not rows: logger.info("Using node_cache as fallback for get_all_nodes") query_fallback = """ SELECT nc.node_id, nc.node_name AS name, nc.hostname, nc.roles, nc.environment, nc.status, nc.gpu, COALESCE(nc.last_heartbeat, nc.last_sync) AS last_heartbeat, nc.guardian_agent_id, nc.steward_agent_id, nc.cpu_model, nc.cpu_cores, COALESCE(nc.cpu_usage, 0) as cpu_usage, nc.gpu_model, COALESCE(nc.gpu_vram_total, 0) as gpu_vram_total, COALESCE(nc.gpu_vram_used, 0) as gpu_vram_used, COALESCE(nc.ram_total, 0) as ram_total, COALESCE(nc.ram_used, 0) as ram_used, COALESCE(nc.disk_total, 0) as disk_total, COALESCE(nc.disk_used, 0) as disk_used, COALESCE(nc.agent_count_router, 0) as agent_count_router, COALESCE(nc.agent_count_system, 0) as agent_count_system, nc.last_heartbeat as metrics_heartbeat, nc.dagi_router_url, NULL as self_healing_status, NULL as node_description, true as registry_active, NULL as last_self_registration, (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id AND COALESCE(a.is_archived, false) = false AND a.deleted_at IS NULL) AS agents_total, (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id AND a.status = 'online' AND COALESCE(a.is_archived, false) = false) AS agents_online, ga.display_name AS guardian_name, ga.public_slug AS guardian_slug, sa.display_name AS steward_name, sa.public_slug AS steward_slug FROM node_cache nc LEFT JOIN agents ga ON nc.guardian_agent_id = ga.id LEFT JOIN agents sa ON nc.steward_agent_id = sa.id ORDER BY nc.environment DESC, nc.node_name """ try: rows = await pool.fetch(query_fallback) except Exception as e: logger.error(f"Fallback node_cache query also failed: {e}") rows = [] result = [] for row in rows: data = dict(row) # Build guardian_agent object if data.get("guardian_agent_id"): data["guardian_agent"] = { "id": data.get("guardian_agent_id"), "name": data.get("guardian_name"), "slug": data.get("guardian_slug"), } else: data["guardian_agent"] = None # Build steward_agent object if data.get("steward_agent_id"): data["steward_agent"] = { "id": data.get("steward_agent_id"), "name": data.get("steward_name"), "slug": data.get("steward_slug"), } else: data["steward_agent"] = None # Build metrics object data["metrics"] = { "cpu_model": data.get("cpu_model"), "cpu_cores": data.get("cpu_cores", 0), "cpu_usage": float(data.get("cpu_usage", 0)), "gpu_model": data.get("gpu_model"), "gpu_vram_total": data.get("gpu_vram_total", 0), "gpu_vram_used": data.get("gpu_vram_used", 0), "ram_total": data.get("ram_total", 0), "ram_used": data.get("ram_used", 0), "disk_total": data.get("disk_total", 0), "disk_used": data.get("disk_used", 0), "agent_count_router": data.get("agent_count_router", 0), "agent_count_system": data.get("agent_count_system", 0), "dagi_router_url": data.get("dagi_router_url"), } # Clean up internal fields data.pop("guardian_name", None) data.pop("steward_name", None) data.pop("guardian_slug", None) data.pop("steward_slug", None) data.pop("cpu_model", None) data.pop("cpu_cores", None) data.pop("cpu_usage", None) data.pop("gpu_model", None) data.pop("gpu_vram_total", None) data.pop("gpu_vram_used", None) data.pop("ram_total", None) data.pop("ram_used", None) data.pop("disk_total", None) data.pop("disk_used", None) data.pop("agent_count_router", None) data.pop("agent_count_system", None) data.pop("dagi_router_url", None) data.pop("metrics_heartbeat", None) result.append(data) return result async def get_node_by_id(node_id: str) -> Optional[dict]: """Отримати ноду по ID з Guardian та Steward агентами""" pool = await get_pool() query = """ SELECT nc.node_id, nc.node_name AS name, nc.hostname, nc.roles, nc.environment, nc.status, nc.gpu, nc.last_sync AS last_heartbeat, nc.guardian_agent_id, nc.steward_agent_id, (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id) AS agents_total, (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id AND a.status = 'online') AS agents_online, -- Guardian agent info ga.display_name AS guardian_name, ga.kind AS guardian_kind, ga.public_slug AS guardian_slug, -- Steward agent info sa.display_name AS steward_name, sa.kind AS steward_kind, sa.public_slug AS steward_slug FROM node_cache nc LEFT JOIN agents ga ON nc.guardian_agent_id = ga.id LEFT JOIN agents sa ON nc.steward_agent_id = sa.id WHERE nc.node_id = $1 """ row = await pool.fetchrow(query, node_id) if not row: return None data = dict(row) # Fetch MicroDAOs where orchestrator is on this node microdaos = await pool.fetch(""" SELECT m.id, m.slug, m.name, COUNT(cr.id) as rooms_count FROM microdaos m JOIN agents a ON m.orchestrator_agent_id = a.id LEFT JOIN city_rooms cr ON cr.microdao_id::text = m.id WHERE a.node_id = $1 GROUP BY m.id, m.slug, m.name ORDER BY m.name """, node_id) data["microdaos"] = [dict(m) for m in microdaos] # Build guardian_agent object if data.get("guardian_agent_id"): data["guardian_agent"] = { "id": data.get("guardian_agent_id"), "name": data.get("guardian_name"), "kind": data.get("guardian_kind"), "slug": data.get("guardian_slug"), } else: data["guardian_agent"] = None # Build steward_agent object if data.get("steward_agent_id"): data["steward_agent"] = { "id": data.get("steward_agent_id"), "name": data.get("steward_name"), "kind": data.get("steward_kind"), "slug": data.get("steward_slug"), } else: data["steward_agent"] = None # TASK 038: Dynamic discovery of Node Guardian / Steward if cache is empty if not data["guardian_agent"] or not data["steward_agent"]: dynamic_agents = await pool.fetch(""" SELECT id, display_name, kind, public_slug FROM agents WHERE node_id = $1 AND (kind IN ('node_guardian', 'node_steward') OR kind IN ('infra_monitor', 'infra_ops')) AND COALESCE(is_archived, false) = false """, node_id) if not data["guardian_agent"]: # Prefer 'node_guardian', fallback to 'infra_monitor' guardian = next((a for a in dynamic_agents if a['kind'] == 'node_guardian'), next((a for a in dynamic_agents if a['kind'] == 'infra_monitor'), None)) if guardian: data["guardian_agent"] = { "id": guardian["id"], "name": guardian["display_name"], "kind": guardian["kind"], "slug": guardian["public_slug"] } if not data["steward_agent"]: # Prefer 'node_steward', fallback to 'infra_ops' steward = next((a for a in dynamic_agents if a['kind'] == 'node_steward'), next((a for a in dynamic_agents if a['kind'] == 'infra_ops'), None)) if steward: data["steward_agent"] = { "id": steward["id"], "name": steward["display_name"], "kind": steward["kind"], "slug": steward["public_slug"] } # Clean up intermediate fields for key in ["guardian_name", "guardian_kind", "guardian_slug", "steward_name", "steward_kind", "steward_slug"]: data.pop(key, None) return data # ============================================================================= # MicroDAO Visibility & Creation (Task 029) # ============================================================================= async def update_microdao_visibility( microdao_id: str, *, is_public: bool, is_platform: Optional[bool] = None, ) -> Optional[dict]: """ Оновити налаштування видимості MicroDAO. Returns updated MicroDAO data or None if not found. """ pool = await get_pool() set_parts = ["is_public = $2", "updated_at = NOW()"] params = [microdao_id, is_public] if is_platform is not None: params.append(is_platform) set_parts.append(f"is_platform = ${len(params)}") query = f""" UPDATE microdaos SET {', '.join(set_parts)} WHERE id = $1 AND COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false RETURNING id, slug, name, is_public, is_platform """ result = await pool.fetchrow(query, *params) return dict(result) if result else None async def create_microdao_for_agent( orchestrator_agent_id: str, *, name: str, slug: str, description: Optional[str] = None, make_platform: bool = False, is_public: bool = True, parent_microdao_id: Optional[str] = None, ) -> Optional[dict]: """ Створює microDAO, прив'язує його до агента-оркестратора. 1. INSERT новий microDAO 2. Додати агента в microdao_agents 3. Оновити агента: primary_microdao_id, is_orchestrator = true 4. Повернути створений microDAO """ pool = await get_pool() import uuid microdao_id = str(uuid.uuid4()) async with pool.acquire() as conn: async with conn.transaction(): # 1. Create microDAO insert_dao_query = """ INSERT INTO microdaos ( id, slug, name, description, orchestrator_agent_id, is_public, is_platform, parent_microdao_id, is_active, created_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true, NOW()) RETURNING id, slug, name, description, is_public, is_platform """ dao_row = await conn.fetchrow( insert_dao_query, microdao_id, slug, name, description, orchestrator_agent_id, is_public, make_platform, parent_microdao_id ) if not dao_row: return None # 2. Add agent to microdao_agents as orchestrator insert_member_query = """ INSERT INTO microdao_agents (microdao_id, agent_id, role, is_core, joined_at) VALUES ($1, $2, 'orchestrator', true, NOW()) ON CONFLICT (microdao_id, agent_id) DO UPDATE SET role = 'orchestrator', is_core = true """ await conn.execute(insert_member_query, microdao_id, orchestrator_agent_id) # 3. Update agent: set primary_microdao_id if empty, set is_orchestrator = true # Also set public_slug if is_public, so orchestrator becomes a public citizen update_agent_query = """ UPDATE agents SET is_orchestrator = true, is_public = CASE WHEN $3 THEN true ELSE is_public END, public_slug = CASE WHEN $3 AND (public_slug IS NULL OR public_slug = '') THEN id ELSE public_slug END, primary_microdao_id = COALESCE(primary_microdao_id, $2), updated_at = NOW() WHERE id = $1 """ await conn.execute(update_agent_query, orchestrator_agent_id, microdao_id, is_public) return dict(dao_row) async def get_microdao_primary_room(microdao_id: str) -> Optional[dict]: """ Отримати основну кімнату MicroDAO для чату. Пріоритет: room_role='primary' → найнижчий sort_order → перша кімната. """ pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.matrix_room_id, cr.microdao_id, cr.room_role, cr.is_public, cr.sort_order FROM city_rooms cr WHERE cr.microdao_id::text = $1 ORDER BY CASE WHEN cr.room_role = 'primary' THEN 0 ELSE 1 END, cr.sort_order ASC, cr.name ASC LIMIT 1 """ row = await pool.fetchrow(query, microdao_id) if row: return { "id": str(row["id"]), "slug": row["slug"], "name": row["name"], "matrix_room_id": row.get("matrix_room_id"), "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, "room_role": row.get("room_role"), "is_public": row.get("is_public", True), "sort_order": row.get("sort_order", 100) } return None async def get_microdao_rooms(microdao_id: str) -> List[dict]: """ Отримати всі кімнати MicroDAO, впорядковані за sort_order. Шукає по microdao_id АБО owner_id (для нових кімнат з owner_type='microdao'). """ pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.matrix_room_id, COALESCE(cr.microdao_id::text, cr.owner_id) AS microdao_id, cr.room_role, cr.is_public, cr.sort_order, cr.logo_url, cr.banner_url, m.slug AS microdao_slug FROM city_rooms cr LEFT JOIN microdaos m ON COALESCE(cr.microdao_id::text, cr.owner_id) = m.id WHERE cr.microdao_id::text = $1 OR (cr.owner_type = 'microdao' AND cr.owner_id = $1) ORDER BY CASE WHEN cr.room_role = 'primary' THEN 0 ELSE 1 END, cr.sort_order ASC, cr.name ASC """ rows = await pool.fetch(query, microdao_id) return [ { "id": str(row["id"]), "slug": row["slug"], "name": row["name"], "matrix_room_id": row.get("matrix_room_id"), "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, "microdao_slug": row.get("microdao_slug"), "room_role": row.get("room_role"), "is_public": row.get("is_public", True), "sort_order": row.get("sort_order", 100), "logo_url": row.get("logo_url"), "banner_url": row.get("banner_url") } for row in rows ] async def get_microdao_rooms_by_slug(slug: str) -> Optional[dict]: """ Отримати MicroDAO та всі його кімнати за slug. """ pool = await get_pool() # Get microdao first microdao_query = """ SELECT id, slug FROM microdaos WHERE slug = $1 AND COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false """ microdao = await pool.fetchrow(microdao_query, slug) if not microdao: return None microdao_id = str(microdao["id"]) rooms = await get_microdao_rooms(microdao_id) return { "microdao_id": microdao_id, "microdao_slug": microdao["slug"], "rooms": rooms } async def attach_room_to_microdao( microdao_id: str, room_id: str, room_role: Optional[str] = None, is_public: bool = True, sort_order: int = 100 ) -> Optional[dict]: """ Прив'язати існуючу кімнату до MicroDAO. """ pool = await get_pool() query = """ UPDATE city_rooms SET microdao_id = $1, room_role = $2, is_public = $3, sort_order = $4 WHERE id = $5 RETURNING id, slug, name, matrix_room_id, microdao_id, room_role, is_public, sort_order, logo_url, banner_url """ row = await pool.fetchrow(query, microdao_id, room_role, is_public, sort_order, room_id) if row: return { "id": str(row["id"]), "slug": row["slug"], "name": row["name"], "matrix_room_id": row.get("matrix_room_id"), "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, "room_role": row.get("room_role"), "is_public": row.get("is_public", True), "sort_order": row.get("sort_order", 100), "logo_url": row.get("logo_url"), "banner_url": row.get("banner_url") } return None async def update_microdao_room( microdao_id: str, room_id: str, room_role: Optional[str] = None, is_public: Optional[bool] = None, sort_order: Optional[int] = None, set_primary: bool = False ) -> Optional[dict]: """ Оновити налаштування кімнати MicroDAO. Якщо set_primary=True, скидає роль 'primary' з інших кімнат. """ pool = await get_pool() async with pool.acquire() as conn: async with conn.transaction(): # If setting as primary, clear previous primary if set_primary: await conn.execute( """ UPDATE city_rooms SET room_role = NULL WHERE microdao_id = $1 AND room_role = 'primary' """, microdao_id ) room_role = 'primary' # Build update query set_parts = [] params = [room_id, microdao_id] param_idx = 3 if room_role is not None: set_parts.append(f"room_role = ${param_idx}") params.append(room_role) param_idx += 1 if is_public is not None: set_parts.append(f"is_public = ${param_idx}") params.append(is_public) param_idx += 1 if sort_order is not None: set_parts.append(f"sort_order = ${param_idx}") params.append(sort_order) param_idx += 1 if not set_parts: # Nothing to update, just return current state row = await conn.fetchrow( "SELECT * FROM city_rooms WHERE id = $1 AND microdao_id = $2", room_id, microdao_id ) else: query = f""" UPDATE city_rooms SET {', '.join(set_parts)} WHERE id = $1 AND microdao_id = $2 RETURNING id, slug, name, matrix_room_id, microdao_id, room_role, is_public, sort_order, logo_url, banner_url """ row = await conn.fetchrow(query, *params) if row: return { "id": str(row["id"]), "slug": row["slug"], "name": row["name"], "matrix_room_id": row.get("matrix_room_id"), "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, "room_role": row.get("room_role"), "is_public": row.get("is_public", True), "sort_order": row.get("sort_order", 100), "logo_url": row.get("logo_url"), "banner_url": row.get("banner_url") } return None # ============================================================================= # TASK 044: Orchestrator Crew Team Room # ============================================================================= async def create_matrix_room_for_microdao_orchestrator( microdao_id: str, microdao_name: str, orchestrator_agent_id: str ) -> Optional[dict]: """ Викликати Matrix Gateway для створення кімнати команди оркестратора. """ # TODO: This should ideally be done with a proper Matrix user (e.g. app bot or the orchestrator agent itself if possible) # For now, we'll use the system admin user logic in matrix-gateway or a specialized endpoint. # Since we are in repo, we don't have the user's token. We rely on matrix-gateway internal API. async with httpx.AsyncClient(timeout=30.0) as client: try: # Ensure matrix room alias is unique room_alias = f"orchestrator_team_{microdao_id[:8]}" room_name = f"{microdao_name} — Orchestrator Team" # Call Matrix Gateway to create room # Using /internal/matrix/rooms/create (assuming it exists or we reuse a similar logic) # If not, we might need to implement it in gateway-bot. # Let's assume we use a new endpoint or existing one. # Actually, we can reuse POST /internal/matrix/rooms if it exists or just use bot API. # NOTE: In real implementation, we need to authenticate this request or ensure network security. resp = await client.post( f"{MATRIX_GATEWAY_URL}/internal/matrix/rooms", json={ "alias": room_alias, "name": room_name, "topic": "Private team chat for MicroDAO Orchestrator", "preset": "private_chat", # or public_chat, but team chat usually private "initial_state": [] } ) if resp.status_code not in (200, 201): logger.error(f"Matrix Gateway failed to create room: {resp.text}") return None data = resp.json() return { "room_id": data["room_id"], "room_alias": data.get("room_alias", room_alias) } except Exception as e: logger.error(f"Failed to create matrix room via gateway: {e}") return None async def get_or_create_orchestrator_team_room(microdao_id: str) -> Optional[dict]: """ Знайти або створити кімнату команди оркестратора для MicroDAO. """ pool = await get_pool() # 1. Check if room exists in DB existing_room_query = """ SELECT cr.id, cr.slug, cr.name, cr.matrix_room_id, cr.microdao_id, cr.room_role, cr.is_public, cr.sort_order FROM city_rooms cr WHERE cr.microdao_id::text = $1 AND cr.room_role = 'orchestrator_team' LIMIT 1 """ room_row = await pool.fetchrow(existing_room_query, microdao_id) if room_row: return dict(room_row) # 2. If not, fetch MicroDAO details to create one microdao_query = """ SELECT id, name, slug, orchestrator_agent_id FROM microdaos WHERE id = $1 """ microdao = await pool.fetchrow(microdao_query, microdao_id) if not microdao or not microdao["orchestrator_agent_id"]: logger.warning(f"MicroDAO {microdao_id} not found or has no orchestrator") return None # 3. Create Matrix room matrix_info = await create_matrix_room_for_microdao_orchestrator( microdao_id=microdao_id, microdao_name=microdao["name"], orchestrator_agent_id=microdao["orchestrator_agent_id"] ) if not matrix_info: logger.error("Failed to create Matrix room for orchestrator team") # Fallback: Create DB record without Matrix ID if needed, or fail? # Let's fail for now as Matrix ID is crucial for this feature. return None # 4. Create DB record slug = f"{microdao['slug']}-team" # Ensure unique slug while True: check_slug = await pool.fetchrow("SELECT 1 FROM city_rooms WHERE slug = $1", slug) if not check_slug: break slug = f"{slug}-{secrets.token_hex(2)}" create_query = """ INSERT INTO city_rooms ( id, slug, name, description, created_by, matrix_room_id, matrix_room_alias, microdao_id, room_role, is_public, sort_order ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id, slug, name, matrix_room_id, microdao_id, room_role, is_public, sort_order """ room_id = f"room_city_{slug}" new_room = await pool.fetchrow( create_query, room_id, slug, f"{microdao['name']} Team", "Orchestrator Team Chat", "system", matrix_info["room_id"], matrix_info.get("room_alias"), microdao_id, "orchestrator_team", False, # Private by default 50 # Sort order (high priority) ) return dict(new_room) # ============================================================================= # Districts Repository (DB-based, no hardcodes) # ============================================================================= async def get_districts() -> List[Dict[str, Any]]: """ Отримати всі District-и з БД. District = microdao з dao_type = 'district' """ pool = await get_pool() query = """ SELECT id, slug, name, description, dao_type, orchestrator_agent_id, created_at FROM microdaos WHERE dao_type = 'district' ORDER BY name """ rows = await pool.fetch(query) return [dict(r) for r in rows] async def get_district_by_slug(slug: str) -> Optional[Dict[str, Any]]: """ Отримати District за slug. """ pool = await get_pool() query = """ SELECT id, slug, name, description, dao_type, orchestrator_agent_id, created_at FROM microdaos WHERE slug = $1 AND dao_type = 'district' """ row = await pool.fetchrow(query, slug) return dict(row) if row else None async def get_district_lead_agent(district_id: str) -> Optional[Dict[str, Any]]: """ Отримати lead agent District-а. Шукаємо спочатку role='district_lead', потім fallback на orchestrator. """ pool = await get_pool() # Try district_lead first query = """ SELECT a.id, a.display_name as name, a.kind, a.status, a.avatar_url, a.gov_level, ma.role as membership_role FROM agents a JOIN microdao_agents ma ON ma.agent_id = a.id WHERE ma.microdao_id = $1 AND ma.role = 'district_lead' LIMIT 1 """ row = await pool.fetchrow(query, district_id) if not row: # Fallback: orchestrator query = """ SELECT a.id, a.display_name as name, a.kind, a.status, a.avatar_url, a.gov_level, ma.role as membership_role FROM agents a JOIN microdao_agents ma ON ma.agent_id = a.id WHERE ma.microdao_id = $1 AND (ma.role = 'orchestrator' OR ma.is_core = true) ORDER BY ma.is_core DESC LIMIT 1 """ row = await pool.fetchrow(query, district_id) return dict(row) if row else None async def get_district_core_team(district_id: str) -> List[Dict[str, Any]]: """ Отримати core team District-а. """ pool = await get_pool() query = """ SELECT a.id, a.display_name as name, a.kind, a.status, a.avatar_url, a.gov_level, ma.role as membership_role FROM agents a JOIN microdao_agents ma ON ma.agent_id = a.id WHERE ma.microdao_id = $1 AND (ma.role = 'core_team' OR ma.is_core = true) AND ma.role != 'district_lead' AND ma.role != 'orchestrator' ORDER BY a.display_name """ rows = await pool.fetch(query, district_id) return [dict(r) for r in rows] async def get_district_agents(district_id: str) -> List[Dict[str, Any]]: """ Отримати всіх агентів District-а. """ pool = await get_pool() query = """ SELECT a.id, a.display_name as name, a.kind, a.status, a.avatar_url, a.gov_level, ma.role as membership_role, ma.is_core FROM agents a JOIN microdao_agents ma ON ma.agent_id = a.id WHERE ma.microdao_id = $1 ORDER BY CASE ma.role WHEN 'district_lead' THEN 0 WHEN 'orchestrator' THEN 1 WHEN 'core_team' THEN 2 ELSE 3 END, ma.is_core DESC, a.display_name """ rows = await pool.fetch(query, district_id) return [dict(r) for r in rows] async def get_district_rooms(district_slug: str) -> List[Dict[str, Any]]: """ Отримати кімнати District-а за slug-префіксом. Наприклад: soul-lobby, soul-events, greenfood-lobby """ pool = await get_pool() query = """ SELECT id, slug, name, description, matrix_room_id, matrix_room_alias, room_role, is_public FROM city_rooms WHERE slug LIKE $1 ORDER BY sort_order, name """ rows = await pool.fetch(query, f"{district_slug}-%") return [dict(r) for r in rows] async def get_district_nodes(district_id: str) -> List[Dict[str, Any]]: """ Отримати ноди District-а. """ pool = await get_pool() query = """ SELECT n.id, n.display_name as name, n.node_type as kind, n.status, n.hostname as location, n.guardian_agent_id, n.steward_agent_id FROM nodes n WHERE n.owner_microdao_id = $1 ORDER BY n.display_name """ rows = await pool.fetch(query, district_id) return [dict(r) for r in rows] async def get_district_stats(district_id: str, district_slug: str) -> Dict[str, Any]: """ Отримати статистику District-а. """ pool = await get_pool() # Count agents agents_count = await pool.fetchval( "SELECT COUNT(*) FROM microdao_agents WHERE microdao_id = $1", district_id ) # Count rooms rooms_count = await pool.fetchval( "SELECT COUNT(*) FROM city_rooms WHERE slug LIKE $1", f"{district_slug}-%" ) # Count nodes nodes_count = await pool.fetchval( "SELECT COUNT(*) FROM nodes WHERE owner_microdao_id = $1", district_id ) return { "agents_count": agents_count or 0, "rooms_count": rooms_count or 0, "nodes_count": nodes_count or 0 } # ============================================================================= # DAGI Agent Audit Repository # ============================================================================= async def get_agents_by_node_for_audit(node_id: str) -> List[Dict[str, Any]]: """ Отримати агентів для DAGI audit по node_id. """ pool = await get_pool() query = """ SELECT id::text, external_id, COALESCE(name, display_name) as name, kind, node_id, status, COALESCE(is_active, true) as is_active, last_seen_at, dagi_status, created_at, updated_at FROM agents WHERE node_id = $1 AND COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false AND deleted_at IS NULL ORDER BY name """ rows = await pool.fetch(query, node_id) return [ { "id": row["id"], "external_id": row["external_id"], "name": row["name"], "kind": row["kind"], "node_id": row["node_id"], "status": row["status"], "is_active": row["is_active"], "last_seen_at": row["last_seen_at"].isoformat() if row["last_seen_at"] else None, "dagi_status": row["dagi_status"], "created_at": row["created_at"].isoformat() if row["created_at"] else None, "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None } for row in rows ] async def get_all_agents_for_audit() -> List[Dict[str, Any]]: """ Отримати всіх активних агентів для DAGI audit. """ pool = await get_pool() query = """ SELECT id::text, external_id, COALESCE(name, display_name) as name, kind, node_id, status, COALESCE(is_active, true) as is_active, last_seen_at, dagi_status, created_at, updated_at FROM agents WHERE COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false AND deleted_at IS NULL ORDER BY name """ rows = await pool.fetch(query) return [ { "id": row["id"], "external_id": row["external_id"], "name": row["name"], "kind": row["kind"], "node_id": row["node_id"], "status": row["status"], "is_active": row["is_active"], "last_seen_at": row["last_seen_at"].isoformat() if row["last_seen_at"] else None, "dagi_status": row["dagi_status"], "created_at": row["created_at"].isoformat() if row["created_at"] else None, "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None } for row in rows ] async def update_agents_dagi_status( agent_ids: List[str], status: str, update_last_seen: bool = False ) -> int: """ Оновити dagi_status для групи агентів. Повертає кількість оновлених записів. """ if not agent_ids: return 0 pool = await get_pool() if update_last_seen: query = """ UPDATE agents SET dagi_status = $2, last_seen_at = NOW(), updated_at = NOW() WHERE id = ANY($1::uuid[]) """ else: query = """ UPDATE agents SET dagi_status = $2, updated_at = NOW() WHERE id = ANY($1::uuid[]) """ result = await pool.execute(query, agent_ids, status) # asyncpg returns "UPDATE N" return int(result.split(" ")[-1]) async def save_dagi_audit_report( node_id: str, report_data: Dict[str, Any], triggered_by: str = "api" ) -> Dict[str, Any]: """ Зберегти звіт DAGI audit. """ pool = await get_pool() import json summary = report_data.get("summary", {}) row = await pool.fetchrow(""" INSERT INTO dagi_audit_reports ( node_id, router_total, db_total, active_count, phantom_count, stale_count, report_data, triggered_by ) VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8) RETURNING id, node_id, timestamp, router_total, db_total, active_count, phantom_count, stale_count, triggered_by """, node_id, summary.get("router_total", 0), summary.get("db_total", 0), summary.get("active_count", 0), summary.get("phantom_count", 0), summary.get("stale_count", 0), json.dumps(report_data), triggered_by ) return { "id": str(row["id"]), "node_id": row["node_id"], "timestamp": row["timestamp"].isoformat(), "router_total": row["router_total"], "db_total": row["db_total"], "active_count": row["active_count"], "phantom_count": row["phantom_count"], "stale_count": row["stale_count"], "triggered_by": row["triggered_by"] } async def get_latest_dagi_audit(node_id: str) -> Optional[Dict[str, Any]]: """ Отримати останній DAGI audit звіт для ноди. """ pool = await get_pool() row = await pool.fetchrow(""" SELECT id, node_id, timestamp, router_total, db_total, active_count, phantom_count, stale_count, report_data, triggered_by FROM dagi_audit_reports WHERE node_id = $1 ORDER BY timestamp DESC LIMIT 1 """, node_id) if not row: return None return { "id": str(row["id"]), "node_id": row["node_id"], "timestamp": row["timestamp"].isoformat(), "router_total": row["router_total"], "db_total": row["db_total"], "active_count": row["active_count"], "phantom_count": row["phantom_count"], "stale_count": row["stale_count"], "report_data": row["report_data"], "triggered_by": row["triggered_by"] } async def get_dagi_audit_history( node_id: str, limit: int = 10 ) -> List[Dict[str, Any]]: """ Отримати історію DAGI audit звітів для ноди. """ pool = await get_pool() rows = await pool.fetch(""" SELECT id, node_id, timestamp, router_total, db_total, active_count, phantom_count, stale_count, triggered_by FROM dagi_audit_reports WHERE node_id = $1 ORDER BY timestamp DESC LIMIT $2 """, node_id, limit) return [ { "id": str(row["id"]), "node_id": row["node_id"], "timestamp": row["timestamp"].isoformat(), "router_total": row["router_total"], "db_total": row["db_total"], "active_count": row["active_count"], "phantom_count": row["phantom_count"], "stale_count": row["stale_count"], "triggered_by": row["triggered_by"] } for row in rows ] # ============================================================================= # Node Metrics Repository # ============================================================================= async def get_node_metrics_current(node_id: str) -> Optional[Dict[str, Any]]: """ Отримати поточні метрики ноди. """ pool = await get_pool() row = await pool.fetchrow(""" SELECT node_id, node_name, hostname, status, roles, environment, cpu_model, cpu_cores, COALESCE(cpu_usage, 0) as cpu_usage, gpu_model, COALESCE(gpu_vram_total, 0) as gpu_vram_total, COALESCE(gpu_vram_used, 0) as gpu_vram_used, COALESCE(ram_total, 0) as ram_total, COALESCE(ram_used, 0) as ram_used, COALESCE(disk_total, 0) as disk_total, COALESCE(disk_used, 0) as disk_used, COALESCE(agent_count_router, 0) as agent_count_router, COALESCE(agent_count_system, 0) as agent_count_system, last_heartbeat, dagi_router_url, COALESCE(swapper_healthy, false) as swapper_healthy, COALESCE(swapper_models_loaded, 0) as swapper_models_loaded, COALESCE(swapper_models_total, 0) as swapper_models_total, router_url, swapper_url, updated_at FROM node_cache WHERE node_id = $1 """, node_id) if not row: return None # Count agents from database agent_count = await pool.fetchval(""" SELECT COUNT(*) FROM agents WHERE (node_id = $1 OR node_id IS NULL) AND COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false AND deleted_at IS NULL """, node_id) result = { "node_id": row["node_id"], "node_name": row["node_name"], "hostname": row["hostname"], "status": row["status"], "roles": row["roles"] or [], "environment": row["environment"], "cpu_model": row["cpu_model"], "cpu_cores": row["cpu_cores"] or 0, "cpu_usage": float(row["cpu_usage"]) if row["cpu_usage"] else 0.0, "gpu_model": row["gpu_model"], "gpu_memory_total": row["gpu_vram_total"] or 0, "gpu_memory_used": row["gpu_vram_used"] or 0, "ram_total": row["ram_total"] or 0, "ram_used": row["ram_used"] or 0, "disk_total": row["disk_total"] or 0, "disk_used": row["disk_used"] or 0, "agent_count_router": row["agent_count_router"] or 0, "agent_count_system": agent_count or 0, "dagi_router_url": row["dagi_router_url"], "swapper_healthy": row["swapper_healthy"], "swapper_models_loaded": row["swapper_models_loaded"], "swapper_models_total": row["swapper_models_total"], "last_heartbeat": row["last_heartbeat"].isoformat() if row["last_heartbeat"] else None, "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None } # Add GPU info for compatibility if row["gpu_model"]: result["gpu_info"] = f"{row['gpu_model']} ({row['gpu_vram_total']}MB)" else: result["gpu_info"] = None return result async def get_node_endpoints(node_id: str) -> Dict[str, str]: """ Отримати URL endpoints для DAGI Router та Swapper Service. Використовує ENV змінні для базових URL (один для всіх нод у проді). Якщо в БД є специфічні URL для ноди - використовує їх, інакше - ENV дефолти. ENV змінні: - ROUTER_BASE_URL (default: http://dagi-router:9102 для проді) - SWAPPER_BASE_URL (default: http://swapper-service:8890 для проді) Для DEV (локальний запуск на Mac): - ROUTER_BASE_URL=http://localhost:9102 - SWAPPER_BASE_URL=http://localhost:8890 """ pool = await get_pool() # Get node-specific URLs from DB if exist row = await pool.fetchrow(""" SELECT router_url, swapper_url FROM node_cache WHERE node_id = $1 """, node_id) # Get base URLs from ENV (one for all nodes in production) router_base = os.getenv("ROUTER_BASE_URL", "http://dagi-router:9102") swapper_base = os.getenv("SWAPPER_BASE_URL", "http://swapper-service:8890") defaults = { "router_url": router_base, "swapper_url": swapper_base } if not row: return defaults # Use DB values if present, otherwise fallback to ENV defaults return { "router_url": row["router_url"] or defaults["router_url"], "swapper_url": row["swapper_url"] or defaults["swapper_url"] } async def get_node_metrics(node_id: str) -> Optional[Dict[str, Any]]: """ Отримати розширені метрики ноди (включаючи Swapper та Router). """ pool = await get_pool() row = await pool.fetchrow(""" SELECT node_id, swapper_healthy, swapper_models_loaded, swapper_models_total, swapper_state, router_url, swapper_url, router_healthy, router_version FROM node_cache WHERE node_id = $1 """, node_id) if not row: return None result = dict(row) if result.get("swapper_state"): try: if isinstance(result["swapper_state"], str): result["swapper_state"] = json.loads(result["swapper_state"]) except Exception: result["swapper_state"] = {} return result async def update_node_metrics( node_id: str, metrics: Dict[str, Any] ) -> bool: """ Оновити метрики ноди. """ pool = await get_pool() result = await pool.execute(""" UPDATE node_cache SET cpu_usage = COALESCE($2, cpu_usage), gpu_vram_used = COALESCE($3, gpu_vram_used), ram_used = COALESCE($4, ram_used), disk_used = COALESCE($5, disk_used), agent_count_router = COALESCE($6, agent_count_router), agent_count_system = COALESCE($7, agent_count_system), last_heartbeat = NOW(), updated_at = NOW() WHERE node_id = $1 """, node_id, metrics.get("cpu_usage"), metrics.get("gpu_vram_used"), metrics.get("ram_used"), metrics.get("disk_used"), metrics.get("agent_count_router"), metrics.get("agent_count_system") ) return "UPDATE 1" in result # ============================================================================= # DAGI Router Agents Repository # ============================================================================= async def get_dagi_router_agents_for_node(node_id: str) -> Dict[str, Any]: """ Отримати агентів DAGI Router для Node Cabinet таблиці. Поєднує дані з audit report та agents table. """ pool = await get_pool() # Отримати останній audit audit = await get_latest_dagi_audit(node_id) # Отримати метрики ноди для GPU/CPU info node_metrics = await get_node_metrics_current(node_id) # Отримати всіх агентів з БД для цієї ноди db_agents_rows = await pool.fetch(""" SELECT a.id::text, a.external_id, COALESCE(a.name, a.display_name) as name, a.kind, a.status, a.node_id, a.public_slug, a.dagi_status, a.last_seen_at, a.is_public FROM agents a WHERE COALESCE(a.is_archived, false) = false AND COALESCE(a.is_test, false) = false AND a.deleted_at IS NULL ORDER BY a.display_name """) # Map db agents by normalized name and external_id db_agents_map = {} for row in db_agents_rows: db_agents_map[row["id"]] = dict(row) if row["external_id"]: ext_id = row["external_id"].split(":")[-1].lower() if ":" in row["external_id"] else row["external_id"].lower() db_agents_map[ext_id] = dict(row) name_norm = row["name"].lower().replace(" ", "").replace("-", "").replace("_", "") if row["name"] else "" if name_norm: db_agents_map[name_norm] = dict(row) # Формуємо уніфікований список агентів agents = [] active_count = 0 phantom_count = 0 stale_count = 0 if audit and audit.get("report_data"): report = audit["report_data"] # Active agents for a in report.get("active_agents", []): db_agent = db_agents_map.get(a.get("db_id")) agents.append({ "id": a.get("db_id") or a.get("router_id"), "name": a.get("db_name") or a.get("router_name"), "role": db_agent.get("kind") if db_agent else None, "status": "active", "node_id": node_id, "models": [], # TODO: можна додати з router-config "gpu": node_metrics.get("gpu_model") if node_metrics else None, "cpu": f"{node_metrics.get('cpu_cores')} cores" if node_metrics else None, "last_seen_at": db_agent.get("last_seen_at").isoformat() if db_agent and db_agent.get("last_seen_at") else None, "has_cabinet": bool(db_agent and db_agent.get("public_slug")), "cabinet_slug": db_agent.get("public_slug") if db_agent else None }) active_count += 1 # Phantom agents for a in report.get("phantom_agents", []): agents.append({ "id": a.get("router_id"), "name": a.get("router_name"), "role": None, "status": "phantom", "node_id": node_id, "models": [], "gpu": node_metrics.get("gpu_model") if node_metrics else None, "cpu": f"{node_metrics.get('cpu_cores')} cores" if node_metrics else None, "last_seen_at": None, "has_cabinet": False, "cabinet_slug": None, "description": a.get("description") }) phantom_count += 1 # Stale agents for a in report.get("stale_agents", []): db_agent = db_agents_map.get(a.get("db_id")) agents.append({ "id": a.get("db_id"), "name": a.get("db_name"), "role": db_agent.get("kind") if db_agent else a.get("kind"), "status": "stale", "node_id": node_id, "models": [], "gpu": node_metrics.get("gpu_model") if node_metrics else None, "cpu": f"{node_metrics.get('cpu_cores')} cores" if node_metrics else None, "last_seen_at": db_agent.get("last_seen_at").isoformat() if db_agent and db_agent.get("last_seen_at") else None, "has_cabinet": bool(db_agent and db_agent.get("public_slug")), "cabinet_slug": db_agent.get("public_slug") if db_agent else None }) stale_count += 1 # Check prompts status for all agents agent_ids = [a["id"] for a in agents if a.get("id")] prompts_status = await check_agents_prompts_status(agent_ids) if agent_ids else {} # Add has_prompts to each agent for agent in agents: agent["has_prompts"] = prompts_status.get(agent.get("id"), False) return { "node_id": node_id, "last_audit_at": audit.get("timestamp") if audit else None, "summary": { "active": active_count, "phantom": phantom_count, "stale": stale_count, "router_total": audit.get("router_total", 0) if audit else 0, "system_total": audit.get("db_total", 0) if audit else len(db_agents_rows) }, "agents": agents } async def sync_phantom_agents( node_id: str, agent_ids: List[str], router_config: Dict[str, Any] ) -> List[Dict[str, Any]]: """ Синхронізувати phantom агентів (створити в БД). """ pool = await get_pool() created = [] agents_config = router_config.get("agents", {}) for agent_id in agent_ids: if agent_id not in agents_config: continue agent_data = agents_config[agent_id] # Створити агента в БД new_id = str(uuid.uuid4()) try: row = await pool.fetchrow(""" INSERT INTO agents ( id, external_id, name, display_name, kind, status, node_id, dagi_status, last_seen_at, is_public, public_slug, created_at, updated_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, 'active', NOW(), true, $8, NOW(), NOW()) ON CONFLICT (external_id) DO UPDATE SET dagi_status = 'active', last_seen_at = NOW(), updated_at = NOW() RETURNING id::text, name, external_id """, new_id, f"agent:{agent_id}", agent_id, agent_id.replace("_", " ").title(), "ai_agent", "online", node_id, agent_id ) if row: created.append({ "id": row["id"], "name": row["name"], "external_id": row["external_id"] }) except Exception as e: print(f"Error creating agent {agent_id}: {e}") return created async def mark_stale_agents(agent_ids: List[str]) -> int: """ Позначити агентів як stale. """ if not agent_ids: return 0 pool = await get_pool() result = await pool.execute(""" UPDATE agents SET dagi_status = 'stale', updated_at = NOW() WHERE id = ANY($1::uuid[]) """, agent_ids) return int(result.split(" ")[-1]) async def get_node_agents(node_id: str) -> List[Dict[str, Any]]: """ Отримати всіх агентів ноди (Guardian, Steward, runtime agents). """ pool = await get_pool() query = """ SELECT a.id, a.display_name, a.kind, a.status, a.node_id, a.public_slug FROM agents a WHERE a.node_id = $1 AND COALESCE(a.is_archived, false) = false ORDER BY a.display_name """ rows = await pool.fetch(query, node_id) return [dict(row) for row in rows] # Alias for DAGI Router integration async def get_agents_for_node(node_id: str) -> List[Dict[str, Any]]: """Alias for get_node_agents - used by DAGI Router endpoints.""" return await get_node_agents(node_id) # ============================================================================== # Node Self-Registration & Self-Healing # ============================================================================== async def node_self_register( node_id: str, name: str, hostname: Optional[str] = None, environment: str = "development", roles: Optional[List[str]] = None, description: Optional[str] = None ) -> Dict[str, Any]: """ Самореєстрація ноди. Викликається з Node Bootstrap або Node Guardian. Якщо нода вже існує — оновлює, інакше — створює. Також забезпечує наявність запису в node_cache. """ pool = await get_pool() roles = roles or [] try: # Використати SQL функцію для атомарної операції result = await pool.fetchval(""" SELECT fn_node_self_register($1, $2, $3, $4, $5) """, node_id, name, hostname, environment, roles) if result: import json return json.loads(result) except Exception as e: # Fallback якщо функція не існує (ще не запущена міграція) logger.warning(f"fn_node_self_register not available, using fallback: {e}") # Fallback: пряма вставка/оновлення try: # Check if exists existing = await pool.fetchval( "SELECT id FROM node_registry WHERE id = $1", node_id ) is_new = existing is None if is_new: await pool.execute(""" INSERT INTO node_registry (id, name, hostname, environment, roles, description, is_active, registered_at, updated_at, last_self_registration, self_registration_count) VALUES ($1, $2, $3, $4, $5, $6, true, NOW(), NOW(), NOW(), 1) """, node_id, name, hostname, environment, roles, description) else: await pool.execute(""" UPDATE node_registry SET name = COALESCE(NULLIF($2, ''), name), hostname = COALESCE($3, hostname), environment = COALESCE(NULLIF($4, ''), environment), roles = CASE WHEN array_length($5::text[], 1) > 0 THEN $5 ELSE roles END, description = COALESCE($6, description), is_active = true, updated_at = NOW(), last_self_registration = NOW(), self_registration_count = COALESCE(self_registration_count, 0) + 1 WHERE id = $1 """, node_id, name, hostname, environment, roles, description) # Ensure node_cache entry await pool.execute(""" INSERT INTO node_cache (node_id, last_heartbeat, self_healing_status) VALUES ($1, NOW(), 'healthy') ON CONFLICT (node_id) DO UPDATE SET last_heartbeat = NOW(), self_healing_status = 'healthy' """, node_id) return { "success": True, "node_id": node_id, "is_new": is_new, "message": "Node registered" if is_new else "Node updated" } except Exception as e: # Ultimate fallback: just update node_cache logger.warning(f"node_registry insert failed, updating node_cache: {e}") try: await pool.execute(""" INSERT INTO node_cache (node_id, node_name, hostname, environment, roles, last_heartbeat) VALUES ($1, $2, $3, $4, $5, NOW()) ON CONFLICT (node_id) DO UPDATE SET node_name = COALESCE(NULLIF($2, ''), node_cache.node_name), hostname = COALESCE($3, node_cache.hostname), environment = COALESCE(NULLIF($4, ''), node_cache.environment), roles = CASE WHEN array_length($5::text[], 1) > 0 THEN $5 ELSE node_cache.roles END, last_heartbeat = NOW() """, node_id, name, hostname, environment, roles) return { "success": True, "node_id": node_id, "is_new": False, "message": "Node updated (fallback to node_cache)" } except Exception as fallback_error: logger.error(f"Failed to register node {node_id}: {fallback_error}") return { "success": False, "node_id": node_id, "error": str(fallback_error) } async def node_heartbeat( node_id: str, metrics: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Heartbeat ноди з оновленням метрик. Повертає should_self_register=True якщо нода не зареєстрована. """ pool = await get_pool() metrics = metrics or {} try: # Використати SQL функцію result = await pool.fetchval(""" SELECT fn_node_heartbeat($1, $2) """, node_id, json.dumps(metrics) if metrics else None) if result: return json.loads(result) except Exception as e: logger.warning(f"fn_node_heartbeat not available, using fallback: {e}") # Fallback try: # Check if registered registered = await pool.fetchval(""" SELECT EXISTS(SELECT 1 FROM node_registry WHERE id = $1 AND is_active = true) """, node_id) if not registered: # Check node_cache as fallback cache_exists = await pool.fetchval(""" SELECT EXISTS(SELECT 1 FROM node_cache WHERE node_id = $1) """, node_id) if not cache_exists: return { "success": False, "error": "Node not registered", "should_self_register": True } # Update heartbeat swapper_state = metrics.get("swapper_state") # Always update swapper_state if provided (even if empty dict) # Use empty JSON object as default if swapper_state is None swapper_state_json = json.dumps(swapper_state) if swapper_state is not None else None await pool.execute(""" UPDATE node_cache SET last_heartbeat = NOW(), self_healing_status = 'healthy', cpu_usage = COALESCE($2::numeric, cpu_usage), gpu_vram_used = COALESCE($3::integer, gpu_vram_used), ram_used = COALESCE($4::integer, ram_used), disk_used = COALESCE($5::integer, disk_used), agent_count_router = COALESCE($6::integer, agent_count_router), agent_count_system = COALESCE($7::integer, agent_count_system), dagi_router_url = COALESCE($8, dagi_router_url), swapper_healthy = COALESCE($9::boolean, swapper_healthy), swapper_models_loaded = COALESCE($10::integer, swapper_models_loaded), swapper_models_total = COALESCE($11::integer, swapper_models_total), swapper_state = CASE WHEN $12::jsonb IS NOT NULL THEN $12::jsonb ELSE swapper_state END, router_healthy = COALESCE($13::boolean, router_healthy), router_version = COALESCE($14, router_version), router_url = COALESCE($15, router_url), swapper_url = COALESCE($16, swapper_url) WHERE node_id = $1 """, node_id, metrics.get("cpu_usage"), metrics.get("gpu_vram_used"), metrics.get("ram_used"), metrics.get("disk_used"), metrics.get("agent_count_router"), metrics.get("agent_count_system"), metrics.get("dagi_router_url"), metrics.get("swapper_healthy"), metrics.get("swapper_models_loaded"), metrics.get("swapper_models_total"), swapper_state_json, metrics.get("router_healthy"), metrics.get("router_version"), metrics.get("router_url"), metrics.get("swapper_url") ) return { "success": True, "node_id": node_id, "heartbeat_at": datetime.now(timezone.utc).isoformat() } except Exception as e: logger.error(f"Heartbeat failed for {node_id}: {e}") return { "success": False, "error": str(e) } async def check_node_in_directory(node_id: str) -> bool: """ Перевірити чи нода видима в Node Directory. Використовується Node Guardian для self-healing. """ pool = await get_pool() try: # Check node_registry first exists = await pool.fetchval(""" SELECT EXISTS( SELECT 1 FROM node_registry WHERE id = $1 AND is_active = true ) """, node_id) return bool(exists) except Exception: # Fallback to node_cache try: exists = await pool.fetchval(""" SELECT EXISTS(SELECT 1 FROM node_cache WHERE node_id = $1) """, node_id) return bool(exists) except Exception: return False async def get_node_self_healing_status(node_id: str) -> Dict[str, Any]: """ Отримати статус self-healing для ноди. """ pool = await get_pool() try: row = await pool.fetchrow(""" SELECT nr.id, nr.name, nr.is_active, nr.last_self_registration, nr.self_registration_count, nc.self_healing_status, nc.self_healing_last_check, nc.self_healing_errors, nc.last_heartbeat, nc.agent_count_router, nc.agent_count_system, nc.guardian_agent_id, nc.steward_agent_id FROM node_registry nr LEFT JOIN node_cache nc ON nc.node_id = nr.id WHERE nr.id = $1 """, node_id) if not row: return { "node_id": node_id, "registered": False, "status": "not_found" } return { "node_id": node_id, "registered": True, "is_active": row["is_active"], "name": row["name"], "self_healing_status": row["self_healing_status"] or "unknown", "last_heartbeat": row["last_heartbeat"].isoformat() if row["last_heartbeat"] else None, "last_self_registration": row["last_self_registration"].isoformat() if row["last_self_registration"] else None, "self_registration_count": row["self_registration_count"] or 0, "agent_count_router": row["agent_count_router"] or 0, "agent_count_system": row["agent_count_system"] or 0, "has_guardian": bool(row["guardian_agent_id"]), "has_steward": bool(row["steward_agent_id"]), "errors": row["self_healing_errors"] or [] } except Exception as e: logger.error(f"Failed to get self-healing status for {node_id}: {e}") return { "node_id": node_id, "registered": False, "status": "error", "error": str(e) } async def update_node_self_healing_status( node_id: str, status: str, error: Optional[str] = None ) -> bool: """ Оновити статус self-healing для ноди. """ pool = await get_pool() try: if error: await pool.execute(""" UPDATE node_cache SET self_healing_status = $2, self_healing_last_check = NOW(), self_healing_errors = COALESCE(self_healing_errors, '[]'::jsonb) || jsonb_build_object( 'timestamp', NOW(), 'error', $3 ) WHERE node_id = $1 """, node_id, status, error) else: await pool.execute(""" UPDATE node_cache SET self_healing_status = $2, self_healing_last_check = NOW() WHERE node_id = $1 """, node_id, status) return True except Exception as e: logger.error(f"Failed to update self-healing status for {node_id}: {e}") return False async def get_nodes_needing_healing() -> List[Dict[str, Any]]: """ Отримати список нод, які потребують self-healing. Критерії: - heartbeat старший за 10 хвилин - agent_count_router = 0 - немає guardian_agent_id - self_healing_status = 'error' """ pool = await get_pool() try: rows = await pool.fetch(""" SELECT nr.id as node_id, nr.name, nc.last_heartbeat, nc.agent_count_router, nc.agent_count_system, nc.guardian_agent_id, nc.steward_agent_id, nc.self_healing_status, CASE WHEN nc.last_heartbeat < NOW() - INTERVAL '10 minutes' THEN 'stale_heartbeat' WHEN nc.agent_count_router = 0 OR nc.agent_count_router IS NULL THEN 'no_router_agents' WHEN nc.guardian_agent_id IS NULL THEN 'no_guardian' WHEN nc.self_healing_status = 'error' THEN 'previous_error' ELSE 'unknown' END as healing_reason FROM node_registry nr LEFT JOIN node_cache nc ON nc.node_id = nr.id WHERE nr.is_active = true AND ( nc.last_heartbeat < NOW() - INTERVAL '10 minutes' OR nc.agent_count_router = 0 OR nc.agent_count_router IS NULL OR nc.guardian_agent_id IS NULL OR nc.self_healing_status = 'error' ) """) return [dict(row) for row in rows] except Exception as e: logger.error(f"Failed to get nodes needing healing: {e}") return [] # ============================================================================= # MicroDAO Activity Repository # ============================================================================= async def get_microdao_activity(slug: str, limit: int = 20) -> List[dict]: """Отримати активність MicroDAO (новини, події, оновлення)""" pool = await get_pool() query = """ SELECT id, microdao_slug, kind, title, body, author_agent_id, author_name, created_at FROM microdao_activity WHERE microdao_slug = $1 ORDER BY created_at DESC LIMIT $2 """ rows = await pool.fetch(query, slug, limit) return [dict(row) for row in rows] async def create_microdao_activity( slug: str, kind: str, body: str, title: Optional[str] = None, author_agent_id: Optional[str] = None, author_name: Optional[str] = None ) -> dict: """Створити новий запис активності для MicroDAO""" pool = await get_pool() # Перевірити, що MicroDAO існує microdao = await get_microdao_by_slug(slug) if not microdao: raise ValueError(f"MicroDAO {slug} not found") query = """ INSERT INTO microdao_activity ( microdao_slug, kind, title, body, author_agent_id, author_name ) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, microdao_slug, kind, title, body, author_agent_id, author_name, created_at """ row = await pool.fetchrow( query, slug, kind, title, body, author_agent_id, author_name ) return dict(row) async def get_citizens_for_microdao(slug: str, limit: int = 6) -> List[dict]: """Отримати громадян (публічних агентів) для MicroDAO""" pool = await get_pool() # Спочатку отримати microdao_id microdao = await get_microdao_by_slug(slug) if not microdao: return [] microdao_id = str(microdao["id"]) # Використати існуючу функцію або зробити простий SELECT query = """ SELECT DISTINCT a.public_slug AS slug, a.display_name, a.public_title, a.public_tagline, a.avatar_url, a.public_district AS district, a.public_primary_room_slug AS primary_room_slug, a.kind, a.node_id, COALESCE(a.public_skills, ARRAY[]::text[]) AS public_skills, a.status, m.slug AS home_microdao_slug, m.name AS home_microdao_name FROM microdao_agents ma JOIN agents a ON a.id = ma.agent_id LEFT JOIN microdaos m ON m.id = a.home_microdao_id WHERE ma.microdao_id = $1 AND a.is_public = true AND a.public_slug IS NOT NULL ORDER BY a.display_name LIMIT $2 """ rows = await pool.fetch(query, microdao_id, limit) result = [] for row in rows: data = dict(row) data["public_skills"] = list(data.get("public_skills") or []) result.append(data) return result async def count_agents_for_microdao(slug: str) -> int: """Підрахувати кількість агентів для MicroDAO""" pool = await get_pool() microdao = await get_microdao_by_slug(slug) if not microdao: return 0 microdao_id = str(microdao["id"]) query = """ SELECT COUNT(*) as count FROM microdao_agents WHERE microdao_id = $1 """ row = await pool.fetchrow(query, microdao_id) return row["count"] if row else 0 async def get_microdao_dashboard(slug: str) -> dict: """Отримати повний дашборд для MicroDAO""" pool = await get_pool() # Отримати MicroDAO microdao = await get_microdao_by_slug(slug) if not microdao: raise ValueError(f"MicroDAO {slug} not found") microdao_id = str(microdao["id"]) # Отримати кімнати rooms = await get_microdao_rooms(microdao_id) rooms_limited = rooms[:5] # Отримати громадян citizens = await get_citizens_for_microdao(slug, limit=6) # Отримати активність activity = await get_microdao_activity(slug, limit=10) # Підрахувати статистику rooms_count = len(rooms) citizens_count = len(citizens) agents_count = await count_agents_for_microdao(slug) # Конвертувати rooms в CityRoomSummary room_summaries = [] for room in rooms_limited: room_summaries.append({ "id": room["id"], "slug": room["slug"], "name": room["name"], "matrix_room_id": room.get("matrix_room_id"), "microdao_id": room.get("microdao_id"), "microdao_slug": room.get("microdao_slug"), "room_role": room.get("room_role"), "is_public": room.get("is_public", True), "sort_order": room.get("sort_order", 100), "logo_url": room.get("logo_url"), "banner_url": room.get("banner_url") }) # Конвертувати citizens в PublicCitizenSummary citizen_summaries = [] for citizen in citizens: # Переконатися що slug не None slug = citizen.get("slug") if not slug: continue # Пропустити громадян без slug citizen_summaries.append({ "slug": slug, "display_name": citizen["display_name"], "public_title": citizen.get("public_title"), "public_tagline": citizen.get("public_tagline"), "avatar_url": citizen.get("avatar_url"), "kind": citizen.get("kind"), "district": citizen.get("district"), "primary_room_slug": citizen.get("primary_room_slug"), "public_skills": list(citizen.get("public_skills", [])) if citizen.get("public_skills") else [], "online_status": citizen.get("status", "unknown"), "status": citizen.get("status"), "node_id": citizen.get("node_id"), "home_microdao_slug": citizen.get("home_microdao_slug"), "home_microdao_name": citizen.get("home_microdao_name") }) # Конвертувати activity в MicrodaoActivity activity_list = [] for act in activity: # Конвертувати UUID в string, якщо потрібно author_id = act.get("author_agent_id") if author_id: author_id = str(author_id) if not isinstance(author_id, str) else author_id activity_list.append({ "id": str(act["id"]), "microdao_slug": act["microdao_slug"], "kind": act["kind"], "title": act.get("title"), "body": act["body"], "author_agent_id": author_id, "author_name": act.get("author_name"), "created_at": act["created_at"].isoformat() if hasattr(act["created_at"], "isoformat") else str(act["created_at"]) }) # Створити MicrodaoSummary microdao_summary = { "id": str(microdao["id"]), "slug": microdao["slug"], "name": microdao["name"], "description": microdao.get("description"), "district": microdao.get("district"), "is_public": microdao.get("is_public", True), "is_platform": microdao.get("is_platform", False), "is_active": microdao.get("is_active", True), "is_pinned": microdao.get("is_pinned", False), "pinned_weight": microdao.get("pinned_weight", 0), "orchestrator_agent_id": microdao.get("orchestrator_agent_id"), "orchestrator_agent_name": microdao.get("orchestrator_agent_name"), "parent_microdao_id": microdao.get("parent_microdao_id"), "parent_microdao_slug": microdao.get("parent_microdao_slug"), "logo_url": microdao.get("logo_url"), "banner_url": microdao.get("banner_url"), "member_count": agents_count, "agents_count": agents_count, "room_count": rooms_count, "rooms_count": rooms_count, "channels_count": 0 } # Створити stats last_update = microdao.get("updated_at") if last_update and hasattr(last_update, "isoformat"): last_update = last_update.isoformat() elif last_update: last_update = str(last_update) stats = { "rooms_count": rooms_count, "citizens_count": citizens_count, "agents_count": agents_count, "last_update_at": last_update } return { "microdao": microdao_summary, "stats": stats, "recent_activity": activity_list, "rooms": room_summaries, "citizens": citizen_summaries }