""" Repository для City Backend (PostgreSQL) """ import os import asyncpg from typing import Optional, List, Dict, Any, Tuple from datetime import datetime import secrets import httpx import logging logger = logging.getLogger(__name__) # Database connection _pool: Optional[asyncpg.Pool] = None MATRIX_GATEWAY_URL = os.getenv("MATRIX_GATEWAY_URL", "http://matrix-gateway:8000") async def get_pool() -> asyncpg.Pool: """Отримати connection pool""" global _pool if _pool is None: database_url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/daarion") _pool = await asyncpg.create_pool(database_url, min_size=2, max_size=10) return _pool async def close_pool(): """Закрити connection pool""" global _pool if _pool is not None: await _pool.close() _pool = None def generate_id(prefix: str) -> str: """Генерувати простий ID""" return f"{prefix}_{secrets.token_urlsafe(12)}" def _normalize_capabilities(value: Any) -> List[str]: """Ensure capabilities are returned as a list.""" if value is None: return [] if isinstance(value, list): return value if isinstance(value, str): import json try: return json.loads(value) except Exception: return [] return list(value) # ============================================================================= # City Rooms Repository # ============================================================================= async def get_all_rooms(limit: int = 100, offset: int = 0) -> List[dict]: """Отримати всі кімнати""" pool = await get_pool() query = """ SELECT id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias, logo_url, banner_url FROM city_rooms ORDER BY is_default DESC, created_at DESC LIMIT $1 OFFSET $2 """ rows = await pool.fetch(query, limit, offset) return [dict(row) for row in rows] async def get_room_by_id(room_id: str) -> Optional[dict]: """Отримати кімнату по ID""" pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.description, cr.is_default, cr.created_at, cr.created_by, cr.matrix_room_id, cr.matrix_room_alias, cr.logo_url, cr.banner_url, cr.microdao_id, m.name AS microdao_name, m.slug AS microdao_slug, m.logo_url AS microdao_logo_url FROM city_rooms cr LEFT JOIN microdaos m ON cr.microdao_id = m.id WHERE cr.id = $1 """ row = await pool.fetchrow(query, room_id) return dict(row) if row else None async def get_room_by_slug(slug: str) -> Optional[dict]: """Отримати кімнату по slug""" pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.description, cr.is_default, cr.created_at, cr.created_by, cr.matrix_room_id, cr.matrix_room_alias, cr.logo_url, cr.banner_url, cr.microdao_id, m.name AS microdao_name, m.slug AS microdao_slug, m.logo_url AS microdao_logo_url FROM city_rooms cr LEFT JOIN microdaos m ON cr.microdao_id = m.id WHERE cr.slug = $1 """ row = await pool.fetchrow(query, slug) return dict(row) if row else None async def create_room( slug: str, name: str, description: Optional[str], created_by: Optional[str], matrix_room_id: Optional[str] = None, matrix_room_alias: Optional[str] = None ) -> dict: """Створити кімнату""" pool = await get_pool() room_id = f"room_city_{slug}" query = """ INSERT INTO city_rooms (id, slug, name, description, created_by, matrix_room_id, matrix_room_alias) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias """ row = await pool.fetchrow(query, room_id, slug, name, description, created_by, matrix_room_id, matrix_room_alias) return dict(row) async def update_room_matrix(room_id: str, matrix_room_id: str, matrix_room_alias: str) -> Optional[dict]: """Оновити Matrix поля кімнати""" pool = await get_pool() query = """ UPDATE city_rooms SET matrix_room_id = $2, matrix_room_alias = $3 WHERE id = $1 RETURNING id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias """ row = await pool.fetchrow(query, room_id, matrix_room_id, matrix_room_alias) return dict(row) async def get_rooms_without_matrix() -> List[dict]: """Отримати кімнати без Matrix інтеграції""" pool = await get_pool() query = """ SELECT id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias FROM city_rooms WHERE matrix_room_id IS NULL ORDER BY created_at """ rows = await pool.fetch(query) return [dict(row) for row in rows] # ============================================================================= # City Room Messages Repository # ============================================================================= async def get_room_messages(room_id: str, limit: int = 50) -> List[dict]: """Отримати повідомлення кімнати""" pool = await get_pool() query = """ SELECT id, room_id, author_user_id, author_agent_id, body, created_at FROM city_room_messages WHERE room_id = $1 ORDER BY created_at DESC LIMIT $2 """ rows = await pool.fetch(query, room_id, limit) # Reverse для правильного порядку (старі → нові) return [dict(row) for row in reversed(rows)] async def create_room_message( room_id: str, body: str, author_user_id: Optional[str] = None, author_agent_id: Optional[str] = None ) -> dict: """Створити повідомлення в кімнаті""" pool = await get_pool() message_id = generate_id("m_city") query = """ INSERT INTO city_room_messages (id, room_id, author_user_id, author_agent_id, body) VALUES ($1, $2, $3, $4, $5) RETURNING id, room_id, author_user_id, author_agent_id, body, created_at """ row = await pool.fetchrow(query, message_id, room_id, author_user_id, author_agent_id, body) return dict(row) # ============================================================================= # City Feed Events Repository # ============================================================================= async def get_feed_events(limit: int = 20, offset: int = 0) -> List[dict]: """Отримати події feed""" pool = await get_pool() query = """ SELECT id, kind, room_id, user_id, agent_id, payload, created_at FROM city_feed_events ORDER BY created_at DESC LIMIT $1 OFFSET $2 """ rows = await pool.fetch(query, limit, offset) return [dict(row) for row in rows] async def create_feed_event( kind: str, payload: dict, room_id: Optional[str] = None, user_id: Optional[str] = None, agent_id: Optional[str] = None ) -> dict: """Створити подію в feed""" pool = await get_pool() event_id = generate_id("evt_city") query = """ INSERT INTO city_feed_events (id, kind, room_id, user_id, agent_id, payload) VALUES ($1, $2, $3, $4, $5, $6::jsonb) RETURNING id, kind, room_id, user_id, agent_id, payload, created_at """ import json payload_json = json.dumps(payload) row = await pool.fetchrow(query, event_id, kind, room_id, user_id, agent_id, payload_json) return dict(row) # ============================================================================= # City Map Repository # ============================================================================= async def get_map_config() -> dict: """Отримати конфігурацію мапи міста""" pool = await get_pool() query = """ SELECT id, grid_width, grid_height, cell_size, background_url, updated_at FROM city_map_config WHERE id = 'default' """ row = await pool.fetchrow(query) if row: return dict(row) # Повернути дефолтні значення якщо немає запису return { "id": "default", "grid_width": 6, "grid_height": 3, "cell_size": 100, "background_url": None } async def get_rooms_for_map() -> List[dict]: """Отримати кімнати з координатами для 2D мапи""" pool = await get_pool() query = """ SELECT id, slug, name, description, room_type, zone, icon, color, map_x, map_y, map_w, map_h, matrix_room_id FROM city_rooms ORDER BY map_y, map_x """ rows = await pool.fetch(query) return [dict(row) for row in rows] # ============================================================================= # Agents Repository # ============================================================================= async def list_agent_summaries( *, node_id: Optional[str] = None, microdao_id: Optional[str] = None, is_public: Optional[bool] = None, visibility_scope: Optional[str] = None, listed_only: Optional[bool] = None, kinds: Optional[List[str]] = None, include_system: bool = True, include_archived: bool = False, limit: int = 200, offset: int = 0 ) -> Tuple[List[dict], int]: """ Unified method to list agents with all necessary data. Used by both Agent Console and Citizens page. """ pool = await get_pool() params: List[Any] = [] where_clauses = [] # Always filter archived unless explicitly included if not include_archived: where_clauses.append("COALESCE(a.is_archived, false) = false") where_clauses.append("COALESCE(a.is_test, false) = false") where_clauses.append("a.deleted_at IS NULL") if node_id: params.append(node_id) where_clauses.append(f"a.node_id = ${len(params)}") if microdao_id: params.append(microdao_id) where_clauses.append(f"EXISTS (SELECT 1 FROM microdao_agents ma WHERE ma.agent_id = a.id AND ma.microdao_id = ${len(params)})") if is_public is not None: params.append(is_public) where_clauses.append(f"COALESCE(a.is_public, false) = ${len(params)}") if visibility_scope: params.append(visibility_scope) where_clauses.append(f"COALESCE(a.visibility_scope, 'city') = ${len(params)}") if listed_only is True: where_clauses.append("COALESCE(a.is_listed_in_directory, true) = true") elif listed_only is False: where_clauses.append("COALESCE(a.is_listed_in_directory, true) = false") if kinds: params.append(kinds) where_clauses.append(f"a.kind = ANY(${len(params)})") if not include_system: where_clauses.append("COALESCE(a.is_system, false) = false") where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" query = f""" SELECT a.id, COALESCE(a.slug, a.public_slug, LOWER(REPLACE(a.display_name, ' ', '-'))) AS slug, a.display_name, COALESCE(a.public_title, '') AS title, COALESCE(a.public_tagline, '') AS tagline, a.kind, a.avatar_url, COALESCE(a.status, 'offline') AS status, a.node_id, nc.node_name AS node_label, nc.hostname AS node_hostname, nc.roles AS node_roles, nc.environment AS node_environment, COALESCE(a.visibility_scope, 'city') AS visibility_scope, COALESCE(a.is_listed_in_directory, true) AS is_listed_in_directory, COALESCE(a.is_system, false) AS is_system, COALESCE(a.is_public, false) AS is_public, COALESCE(a.is_orchestrator, false) AS is_orchestrator, a.primary_microdao_id, pm.name AS primary_microdao_name, pm.slug AS primary_microdao_slug, pm.district AS district, COALESCE(a.public_skills, ARRAY[]::text[]) AS public_skills, a.crew_team_key, COUNT(*) OVER() AS total_count FROM agents a LEFT JOIN node_cache nc ON a.node_id = nc.node_id LEFT JOIN microdaos pm ON a.primary_microdao_id = pm.id WHERE {where_sql} ORDER BY a.display_name LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} """ params.append(limit) params.append(offset) rows = await pool.fetch(query, *params) if not rows: return [], 0 total = rows[0]["total_count"] items = [] for row in rows: data = dict(row) data.pop("total_count", None) # Build home_node object if data.get("node_id"): data["home_node"] = { "id": data.get("node_id"), "name": data.get("node_label"), "hostname": data.get("node_hostname"), "roles": list(data.get("node_roles") or []), "environment": data.get("node_environment") } else: data["home_node"] = None # Clean up intermediate fields for key in ["node_hostname", "node_roles", "node_environment"]: data.pop(key, None) # Get MicroDAO memberships memberships = await get_agent_microdao_memberships(data["id"]) data["microdaos"] = [ { "id": m.get("microdao_id", ""), "name": m.get("name", ""), "slug": m.get("slug"), "role": m.get("role") } for m in memberships ] data["microdao_memberships"] = memberships # backward compatibility data["public_skills"] = list(data.get("public_skills") or []) # Populate crew_info if data.get("crew_team_key"): # Try to find orchestrator team room for their primary microdao # This is a bit expensive for list view, so maybe just return basic info data["crew_info"] = { "has_crew_team": True, "crew_team_key": data["crew_team_key"], "matrix_room_id": None # Loaded lazily if needed } else: data["crew_info"] = { "has_crew_team": False, "crew_team_key": None, "matrix_room_id": None } items.append(data) return items, total async def get_all_agents() -> List[dict]: """Отримати всіх агентів (non-archived) - legacy method""" pool = await get_pool() query = """ SELECT id, display_name, kind, avatar_url, color, status, current_room_id, capabilities, created_at, updated_at FROM agents WHERE COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false AND deleted_at IS NULL ORDER BY display_name """ rows = await pool.fetch(query) return [dict(row) for row in rows] async def update_agent_visibility( agent_id: str, *, is_public: bool, visibility_scope: Optional[str] = None, ) -> Optional[dict]: """ Оновити налаштування видимості агента. Returns updated agent data or None if not found. """ pool = await get_pool() # Build dynamic update set_parts = ["is_public = $2", "updated_at = NOW()"] params = [agent_id, is_public] if visibility_scope is not None: params.append(visibility_scope) set_parts.append(f"visibility_scope = ${len(params)}") # Also update is_listed_in_directory based on is_public set_parts.append("is_listed_in_directory = $2") # same as is_public query = f""" UPDATE agents SET {', '.join(set_parts)} WHERE id = $1 AND COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false RETURNING id, display_name, is_public, visibility_scope, is_listed_in_directory """ result = await pool.fetchrow(query, *params) return dict(result) if result else None async def update_agent_visibility_legacy( agent_id: str, visibility_scope: str, is_listed_in_directory: bool ) -> bool: """Legacy: Оновити налаштування видимості агента (backward compatibility)""" pool = await get_pool() query = """ UPDATE agents SET visibility_scope = $2, is_listed_in_directory = $3, is_public = $3, updated_at = NOW() WHERE id = $1 AND COALESCE(is_archived, false) = false RETURNING id """ result = await pool.fetchrow(query, agent_id, visibility_scope, is_listed_in_directory) return result is not None async def get_agent_prompts(agent_id: str) -> dict: """Отримати системні промти агента""" pool = await get_pool() query = """ SELECT kind, content, version, created_at, note FROM agent_prompts WHERE agent_id = $1 AND is_active = true ORDER BY kind """ rows = await pool.fetch(query, agent_id) result = { "core": None, "safety": None, "governance": None, "tools": None } for row in rows: kind = row["kind"] if kind in result: result[kind] = { "content": row["content"], "version": row["version"], "created_at": row["created_at"].isoformat() if row["created_at"] else None, "note": row.get("note") } return result async def get_agent_public_profile(agent_id: str) -> Optional[dict]: """Отримати публічний профіль агента""" pool = await get_pool() query = """ SELECT is_public, public_slug, public_title, public_tagline, COALESCE(public_skills, ARRAY[]::text[]) AS public_skills, public_district, public_primary_room_slug, COALESCE(visibility_scope, 'city') AS visibility_scope, COALESCE(is_listed_in_directory, true) AS is_listed_in_directory, COALESCE(is_system, false) AS is_system FROM agents WHERE id = $1 """ row = await pool.fetchrow(query, agent_id) if not row: return None return { "is_public": row["is_public"], "public_slug": row["public_slug"], "public_title": row["public_title"], "public_tagline": row["public_tagline"], "public_skills": list(row["public_skills"] or []), "public_district": row["public_district"], "public_primary_room_slug": row["public_primary_room_slug"], "visibility_scope": row["visibility_scope"], "is_listed_in_directory": row["is_listed_in_directory"], "is_system": row["is_system"] } async def get_agents_with_home_node( kind: Optional[str] = None, node_id: Optional[str] = None, limit: int = 100, offset: int = 0 ) -> Tuple[List[dict], int]: """Отримати агентів з інформацією про home_node""" pool = await get_pool() params: List[Any] = [] where_clauses = [ "COALESCE(a.is_archived, false) = false", "COALESCE(a.is_test, false) = false", "a.deleted_at IS NULL" ] if kind: params.append(kind) where_clauses.append(f"a.kind = ${len(params)}") if node_id: params.append(node_id) where_clauses.append(f"a.node_id = ${len(params)}") where_sql = " AND ".join(where_clauses) query = f""" SELECT a.id, a.display_name, a.kind, a.avatar_url, a.status, a.is_public, a.public_slug, a.public_title, a.public_district, a.node_id, nc.node_name AS home_node_name, nc.hostname AS home_node_hostname, nc.roles AS home_node_roles, nc.environment AS home_node_environment, COUNT(*) OVER() AS total_count FROM agents a LEFT JOIN node_cache nc ON a.node_id = nc.node_id WHERE {where_sql} ORDER BY a.display_name LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} """ params.append(limit) params.append(offset) rows = await pool.fetch(query, *params) if not rows: return [], 0 total = rows[0]["total_count"] items = [] for row in rows: data = dict(row) data.pop("total_count", None) # Build home_node object if data.get("node_id"): data["home_node"] = { "id": data.get("node_id"), "name": data.get("home_node_name"), "hostname": data.get("home_node_hostname"), "roles": list(data.get("home_node_roles") or []), "environment": data.get("home_node_environment") } else: data["home_node"] = None # Clean up intermediate fields for key in ["home_node_name", "home_node_hostname", "home_node_roles", "home_node_environment"]: data.pop(key, None) items.append(data) return items, total async def get_agents_by_room(room_id: str) -> List[dict]: """Отримати агентів у конкретній кімнаті""" pool = await get_pool() query = """ SELECT id, display_name, kind, avatar_url, color, status, current_room_id, capabilities FROM agents WHERE current_room_id = $1 AND status != 'offline' ORDER BY display_name """ rows = await pool.fetch(query, room_id) return [dict(row) for row in rows] async def get_online_agents() -> List[dict]: """Отримати всіх онлайн агентів""" pool = await get_pool() query = """ SELECT id, display_name, kind, avatar_url, color, status, current_room_id, capabilities FROM agents WHERE status IN ('online', 'busy') ORDER BY display_name """ rows = await pool.fetch(query) return [dict(row) for row in rows] async def update_agent_status(agent_id: str, status: str, room_id: Optional[str] = None) -> Optional[dict]: """Оновити статус агента""" pool = await get_pool() if room_id: query = """ UPDATE agents SET status = $2, current_room_id = $3, updated_at = NOW() WHERE id = $1 RETURNING id, display_name, kind, status, current_room_id """ row = await pool.fetchrow(query, agent_id, status, room_id) else: query = """ UPDATE agents SET status = $2, updated_at = NOW() WHERE id = $1 RETURNING id, display_name, kind, status, current_room_id """ row = await pool.fetchrow(query, agent_id, status) return dict(row) if row else None async def get_agent_by_id(agent_id: str) -> Optional[dict]: """Отримати агента по ID""" pool = await get_pool() query = """ SELECT a.id, a.display_name, a.kind, a.status, a.node_id, a.role, a.avatar_url, COALESCE(a.color_hint, a.color, 'cyan') AS color, a.capabilities, a.primary_room_slug, a.public_primary_room_slug, a.public_district, a.public_title, a.public_tagline, a.public_skills, a.public_slug, a.is_public, a.district AS home_district, a.crew_team_key FROM agents a WHERE a.id = $1 """ row = await pool.fetchrow(query, agent_id) if not row: return None agent = dict(row) agent["capabilities"] = _normalize_capabilities(agent.get("capabilities")) if agent.get("public_skills") is None: agent["public_skills"] = [] # Populate crew_info if agent.get("crew_team_key"): agent["crew_info"] = { "has_crew_team": True, "crew_team_key": agent["crew_team_key"], "matrix_room_id": None # Populated later if needed } # If orchestrator, verify if room exists # For detailed view, let's try to fetch it if agent.get("primary_room_slug"): # Just a placeholder check, logic should be outside or specific method pass else: agent["crew_info"] = { "has_crew_team": False, "crew_team_key": None, "matrix_room_id": None } return agent async def get_agent_public_profile(agent_id: str) -> Optional[dict]: """Отримати публічний профіль агента""" pool = await get_pool() query = """ SELECT is_public, public_slug, public_title, public_tagline, public_skills, public_district, public_primary_room_slug FROM agents WHERE id = $1 """ row = await pool.fetchrow(query, agent_id) if not row: return None result = dict(row) if result.get("public_skills") is None: result["public_skills"] = [] return result async def update_agent_public_profile( agent_id: str, is_public: bool, public_slug: Optional[str], public_title: Optional[str], public_tagline: Optional[str], public_skills: Optional[List[str]], public_district: Optional[str], public_primary_room_slug: Optional[str] ) -> Optional[dict]: """Оновити публічний профіль агента""" pool = await get_pool() query = """ UPDATE agents SET is_public = $2, public_slug = $3, public_title = $4, public_tagline = $5, public_skills = $6, public_district = $7, public_primary_room_slug = $8, updated_at = NOW() WHERE id = $1 RETURNING is_public, public_slug, public_title, public_tagline, public_skills, public_district, public_primary_room_slug """ row = await pool.fetchrow( query, agent_id, is_public, public_slug, public_title, public_tagline, public_skills, public_district, public_primary_room_slug ) if not row: return None result = dict(row) if result.get("public_skills") is None: result["public_skills"] = [] return result async def get_agent_rooms(agent_id: str) -> List[dict]: """Отримати список кімнат агента (primary/public)""" pool = await get_pool() query = """ SELECT primary_room_slug, public_primary_room_slug FROM agents WHERE id = $1 """ row = await pool.fetchrow(query, agent_id) if not row: return [] slugs = [] if row.get("primary_room_slug"): slugs.append(row["primary_room_slug"]) if row.get("public_primary_room_slug") and row["public_primary_room_slug"] not in slugs: slugs.append(row["public_primary_room_slug"]) if not slugs: return [] rooms_query = """ SELECT id, slug, name FROM city_rooms WHERE slug = ANY($1::text[]) """ rooms = await pool.fetch(rooms_query, slugs) return [dict(room) for room in rooms] async def get_agent_matrix_config(agent_id: str) -> Optional[dict]: """Отримати Matrix-конфіг агента""" pool = await get_pool() query = """ SELECT agent_id, matrix_user_id, primary_room_id FROM agent_matrix_config WHERE agent_id = $1 """ row = await pool.fetchrow(query, agent_id) return dict(row) if row else None async def get_public_agent_by_slug(slug: str) -> Optional[dict]: """Отримати базову інформацію про публічного агента""" pool = await get_pool() query = """ SELECT id, display_name, public_primary_room_slug, primary_room_slug, public_district, public_title, public_tagline FROM agents WHERE public_slug = $1 AND is_public = true LIMIT 1 """ row = await pool.fetchrow(query, slug) return dict(row) if row else None async def get_microdao_for_agent(agent_id: str) -> Optional[dict]: """Отримати MicroDAO для агента (аліас get_agent_microdao)""" return await get_agent_microdao(agent_id) # ============================================================================= # Citizens Repository # ============================================================================= async def get_public_citizens( district: Optional[str] = None, kind: Optional[str] = None, q: Optional[str] = None, limit: int = 50, offset: int = 0 ) -> Tuple[List[dict], int]: """Отримати публічних громадян""" pool = await get_pool() params: List[Any] = [] where_clauses = [ "a.is_public = true", "a.public_slug IS NOT NULL", "COALESCE(a.is_archived, false) = false", "COALESCE(a.is_test, false) = false", "a.deleted_at IS NULL", # TASK 037A: Stricter filtering for Citizens Layer "a.node_id IS NOT NULL", "EXISTS (SELECT 1 FROM microdao_agents ma WHERE ma.agent_id = a.id)" ] if district: params.append(district) where_clauses.append(f"a.public_district = ${len(params)}") if kind: params.append(kind) where_clauses.append(f"a.kind = ${len(params)}") if q: params.append(f"%{q}%") where_clauses.append( f"(a.display_name ILIKE ${len(params)} OR a.public_title ILIKE ${len(params)} OR a.public_tagline ILIKE ${len(params)})" ) where_sql = " AND ".join(where_clauses) query = f""" SELECT a.id, a.public_slug, a.display_name, a.public_title, a.public_tagline, a.avatar_url, a.kind, a.public_district, a.public_primary_room_slug, COALESCE(a.public_skills, ARRAY[]::text[]) AS public_skills, COALESCE(a.status, 'unknown') AS status, a.node_id, nc.node_name AS home_node_name, nc.hostname AS home_node_hostname, nc.roles AS home_node_roles, nc.environment AS home_node_environment, -- MicroDAO info m.slug AS home_microdao_slug, m.name AS home_microdao_name, -- Room info cr.id AS room_id, cr.slug AS room_slug, cr.name AS room_name, cr.matrix_room_id AS room_matrix_id, COUNT(*) OVER() AS total_count FROM agents a LEFT JOIN node_cache nc ON a.node_id = nc.node_id -- Join primary MicroDAO LEFT JOIN LATERAL ( SELECT ma.agent_id, md.slug, md.name FROM microdao_agents ma JOIN microdaos md ON ma.microdao_id = md.id WHERE ma.agent_id = a.id ORDER BY ma.is_core DESC, md.name LIMIT 1 ) m ON true -- Join primary room (by public_primary_room_slug) LEFT JOIN city_rooms cr ON cr.slug = a.public_primary_room_slug WHERE {where_sql} ORDER BY a.display_name LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} """ params.append(limit) params.append(offset) rows = await pool.fetch(query, *params) if not rows: return [], 0 total = rows[0]["total_count"] items = [] for row in rows: data = dict(row) data.pop("total_count", None) data["public_skills"] = list(data.get("public_skills") or []) data["online_status"] = data.get("status") or "unknown" # Build home_node object if data.get("node_id"): data["home_node"] = { "id": data.get("node_id"), "name": data.get("home_node_name"), "hostname": data.get("home_node_hostname"), "roles": list(data.get("home_node_roles") or []), "environment": data.get("home_node_environment") } else: data["home_node"] = None # Build primary_city_room object if data.get("room_id"): data["primary_city_room"] = { "id": str(data["room_id"]), "slug": data["room_slug"], "name": data["room_name"], "matrix_room_id": data.get("room_matrix_id") } else: data["primary_city_room"] = None # Clean up intermediate fields for key in ["home_node_name", "home_node_hostname", "home_node_roles", "home_node_environment", "room_id", "room_slug", "room_name", "room_matrix_id"]: data.pop(key, None) items.append(data) return items, total async def get_agent_microdao(agent_id: str) -> Optional[dict]: """Отримати MicroDAO, до якого належить агент (перший збіг)""" pool = await get_pool() query = """ SELECT m.id, m.slug, m.name, m.district FROM microdao_agents ma JOIN microdaos m ON m.id = ma.microdao_id WHERE ma.agent_id = $1 ORDER BY ma.is_core DESC, m.name LIMIT 1 """ row = await pool.fetchrow(query, agent_id) return dict(row) if row else None async def get_microdao_public_citizens(microdao_id: str) -> List[dict]: """Отримати публічних громадян конкретного MicroDAO""" pool = await get_pool() query = """ SELECT a.public_slug AS slug, a.display_name, a.public_title, a.public_tagline, a.avatar_url, a.public_district, a.public_primary_room_slug FROM microdao_agents ma JOIN agents a ON a.id = ma.agent_id WHERE ma.microdao_id = $1 AND a.is_public = true AND a.public_slug IS NOT NULL ORDER BY a.display_name """ rows = await pool.fetch(query, microdao_id) result = [] for row in rows: data = dict(row) result.append(data) return result async def get_public_citizen_by_slug(slug: str) -> Optional[dict]: """Отримати детальний профіль громадянина""" pool = await get_pool() query = """ SELECT a.id, a.display_name, a.kind, a.status, a.node_id, a.avatar_url, a.public_slug, a.public_title, a.public_tagline, COALESCE(a.public_skills, ARRAY[]::text[]) AS public_skills, a.public_district, a.public_primary_room_slug, a.primary_room_slug, nc.node_name AS home_node_name, nc.hostname AS home_node_hostname, nc.roles AS home_node_roles, nc.environment AS home_node_environment FROM agents a LEFT JOIN node_cache nc ON a.node_id = nc.node_id WHERE a.public_slug = $1 AND a.is_public = true LIMIT 1 """ agent_row = await pool.fetchrow(query, slug) if not agent_row: return None agent = dict(agent_row) agent["public_skills"] = list(agent.get("public_skills") or []) # Build home_node object home_node = None if agent.get("node_id"): home_node = { "id": agent.get("node_id"), "name": agent.get("home_node_name"), "hostname": agent.get("home_node_hostname"), "roles": list(agent.get("home_node_roles") or []), "environment": agent.get("home_node_environment") } rooms = await get_agent_rooms(agent["id"]) primary_room = agent.get("public_primary_room_slug") or agent.get("primary_room_slug") city_presence = { "primary_room_slug": primary_room, "rooms": rooms } if rooms else { "primary_room_slug": primary_room, "rooms": [] } dais_public = { "core": { "archetype": agent.get("kind"), "bio_short": agent.get("public_tagline") }, "phenotype": { "visual": { "avatar_url": agent.get("avatar_url"), "color": None } }, "memex": {}, "economics": {} } interaction = { "matrix_user": None, "primary_room_slug": primary_room, "actions": ["chat", "ask_for_help"] } metrics_public: Dict[str, Any] = {} microdao = await get_agent_microdao(agent["id"]) return { "slug": agent["public_slug"], "display_name": agent["display_name"], "kind": agent.get("kind"), "public_title": agent.get("public_title"), "public_tagline": agent.get("public_tagline"), "district": agent.get("public_district"), "avatar_url": agent.get("avatar_url"), "status": agent.get("status"), "node_id": agent.get("node_id"), "public_skills": agent.get("public_skills"), "city_presence": city_presence, "dais_public": dais_public, "interaction": interaction, "metrics_public": metrics_public, "microdao": microdao, "admin_panel_url": f"/agents/{agent['id']}", "home_node": home_node } # ============================================================================= # MicroDAO Membership Repository # ============================================================================= async def get_microdao_options() -> List[dict]: """Отримати список активних MicroDAO для селектора""" pool = await get_pool() query = """ SELECT id, slug, name, district, is_active FROM microdaos WHERE is_active = true ORDER BY name """ rows = await pool.fetch(query) return [dict(row) for row in rows] async def get_agent_microdao_memberships(agent_id: str) -> List[dict]: """Отримати всі членства агента в MicroDAO""" pool = await get_pool() query = """ SELECT ma.microdao_id, m.slug AS microdao_slug, m.name AS microdao_name, m.logo_url, ma.role, ma.is_core FROM microdao_agents ma JOIN microdaos m ON m.id = ma.microdao_id WHERE ma.agent_id = $1 ORDER BY ma.is_core DESC, m.name """ rows = await pool.fetch(query, agent_id) return [dict(row) for row in rows] async def upsert_agent_microdao_membership( agent_id: str, microdao_id: str, role: Optional[str], is_core: bool ) -> Optional[dict]: """Призначити або оновити членство агента в MicroDAO""" pool = await get_pool() query = """ WITH upsert AS ( INSERT INTO microdao_agents (microdao_id, agent_id, role, is_core) VALUES ($1, $2, $3, $4) ON CONFLICT (microdao_id, agent_id) DO UPDATE SET role = EXCLUDED.role, is_core = EXCLUDED.is_core RETURNING microdao_id, agent_id, role, is_core ) SELECT u.microdao_id, m.slug AS microdao_slug, m.name AS microdao_name, u.role, u.is_core FROM upsert u JOIN microdaos m ON m.id = u.microdao_id """ row = await pool.fetchrow(query, microdao_id, agent_id, role, is_core) return dict(row) if row else None async def remove_agent_microdao_membership(agent_id: str, microdao_id: str) -> bool: """Видалити членство агента в MicroDAO""" pool = await get_pool() result = await pool.execute( "DELETE FROM microdao_agents WHERE agent_id = $1 AND microdao_id = $2", agent_id, microdao_id ) # asyncpg returns strings like "DELETE 1" return result.split(" ")[-1] != "0" # ============================================================================= # MicroDAO Repository # ============================================================================= async def get_microdaos(district: Optional[str] = None, q: Optional[str] = None, limit: int = 50, offset: int = 0) -> List[dict]: """Отримати список MicroDAOs з агрегованою статистикою""" pool = await get_pool() params = [] where_clauses = [ "m.is_public = true", "m.is_active = true", "COALESCE(m.is_archived, false) = false", "COALESCE(m.is_test, false) = false", "m.deleted_at IS NULL" ] if district: params.append(district) where_clauses.append(f"m.district = ${len(params)}") if q: params.append(f"%{q}%") where_clauses.append(f"(m.name ILIKE ${len(params)} OR m.description ILIKE ${len(params)})") where_sql = " AND ".join(where_clauses) query = f""" SELECT m.id, m.slug, m.name, m.description, m.district, COALESCE(m.orchestrator_agent_id, m.owner_agent_id) as orchestrator_agent_id, oa.display_name as orchestrator_agent_name, m.is_active, COALESCE(m.is_public, true) as is_public, COALESCE(m.is_platform, false) as is_platform, m.parent_microdao_id, pm.slug as parent_microdao_slug, m.logo_url, m.banner_url, COUNT(DISTINCT ma.agent_id) AS agents_count, COUNT(DISTINCT ma.agent_id) AS member_count, COUNT(DISTINCT mc.id) AS channels_count, COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS rooms_count, COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS room_count FROM microdaos m LEFT JOIN microdao_agents ma ON ma.microdao_id = m.id LEFT JOIN microdao_channels mc ON mc.microdao_id = m.id LEFT JOIN agents oa ON COALESCE(m.orchestrator_agent_id, m.owner_agent_id) = oa.id LEFT JOIN microdaos pm ON m.parent_microdao_id = pm.id WHERE {where_sql} GROUP BY m.id, oa.display_name, pm.slug ORDER BY m.name LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} """ # Append limit and offset to params params.append(limit) params.append(offset) rows = await pool.fetch(query, *params) return [dict(row) for row in rows] async def list_microdao_summaries( *, is_public: Optional[bool] = None, is_platform: Optional[bool] = None, district: Optional[str] = None, q: Optional[str] = None, limit: int = 50, offset: int = 0 ) -> List[dict]: """ Unified method to list microDAOs. Wraps get_microdaos with additional filtering. """ pool = await get_pool() params = [] where_clauses = [ "COALESCE(m.is_archived, false) = false", "COALESCE(m.is_test, false) = false", "m.deleted_at IS NULL", "m.is_active = true" ] if is_public is not None: params.append(is_public) where_clauses.append(f"COALESCE(m.is_public, true) = ${len(params)}") if is_platform is not None: params.append(is_platform) where_clauses.append(f"COALESCE(m.is_platform, false) = ${len(params)}") if district: params.append(district) where_clauses.append(f"m.district = ${len(params)}") if q: params.append(f"%{q}%") where_clauses.append(f"(m.name ILIKE ${len(params)} OR m.description ILIKE ${len(params)})") where_sql = " AND ".join(where_clauses) query = f""" SELECT m.id, m.slug, m.name, m.description, m.district, COALESCE(m.orchestrator_agent_id, m.owner_agent_id) as orchestrator_agent_id, oa.display_name as orchestrator_agent_name, m.is_active, COALESCE(m.is_public, true) as is_public, COALESCE(m.is_platform, false) as is_platform, m.parent_microdao_id, pm.slug as parent_microdao_slug, m.logo_url, m.banner_url, COUNT(DISTINCT ma.agent_id) AS agents_count, COUNT(DISTINCT ma.agent_id) AS member_count, COUNT(DISTINCT mc.id) AS channels_count, COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS rooms_count, COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS room_count FROM microdaos m LEFT JOIN microdao_agents ma ON ma.microdao_id = m.id LEFT JOIN microdao_channels mc ON mc.microdao_id = m.id LEFT JOIN agents oa ON COALESCE(m.orchestrator_agent_id, m.owner_agent_id) = oa.id LEFT JOIN microdaos pm ON m.parent_microdao_id = pm.id WHERE {where_sql} GROUP BY m.id, oa.display_name, pm.slug ORDER BY m.name LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} """ params.append(limit) params.append(offset) rows = await pool.fetch(query, *params) return [dict(row) for row in rows] async def get_microdao_detail(slug: str) -> Optional[dict]: """ Get detailed microDAO info including agents, channels, children. Alias for get_microdao_by_slug with clearer naming. """ return await get_microdao_by_slug(slug) async def get_microdao_by_slug(slug: str) -> Optional[dict]: """Отримати детальну інформацію про MicroDAO""" pool = await get_pool() # 1. Get main DAO info query_dao = """ SELECT m.id, m.slug, m.name, m.description, m.district, COALESCE(m.orchestrator_agent_id, m.owner_agent_id) as orchestrator_agent_id, a.display_name as orchestrator_display_name, m.is_active, COALESCE(m.is_public, true) as is_public, COALESCE(m.is_platform, false) as is_platform, m.parent_microdao_id, pm.slug as parent_microdao_slug, m.logo_url, m.banner_url FROM microdaos m LEFT JOIN agents a ON COALESCE(m.orchestrator_agent_id, m.owner_agent_id) = a.id LEFT JOIN microdaos pm ON m.parent_microdao_id = pm.id WHERE m.slug = $1 AND COALESCE(m.is_archived, false) = false AND COALESCE(m.is_test, false) = false AND m.deleted_at IS NULL """ dao_row = await pool.fetchrow(query_dao, slug) if not dao_row: return None result = dict(dao_row) dao_id = result["id"] # 2. Get Agents query_agents = """ SELECT ma.agent_id, ma.role, ma.is_core, a.display_name FROM microdao_agents ma JOIN agents a ON ma.agent_id = a.id WHERE ma.microdao_id = $1 AND COALESCE(a.is_archived, false) = false AND COALESCE(a.is_test, false) = false AND a.deleted_at IS NULL ORDER BY ma.is_core DESC, ma.role """ agents_rows = await pool.fetch(query_agents, dao_id) result["agents"] = [dict(row) for row in agents_rows] # 3. Get Channels query_channels = """ SELECT kind, ref_id, display_name, is_primary FROM microdao_channels WHERE microdao_id = $1 ORDER BY is_primary DESC, kind """ channels_rows = await pool.fetch(query_channels, dao_id) result["channels"] = [dict(row) for row in channels_rows] # 4. Get child microDAOs query_children = """ SELECT id, slug, name, COALESCE(is_public, true) as is_public, COALESCE(is_platform, false) as is_platform FROM microdaos WHERE parent_microdao_id = $1 AND COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false AND deleted_at IS NULL ORDER BY name """ children_rows = await pool.fetch(query_children, dao_id) result["child_microdaos"] = [dict(row) for row in children_rows] public_citizens = await get_microdao_public_citizens(dao_id) result["public_citizens"] = public_citizens return result async def update_microdao_branding( microdao_slug: str, logo_url: Optional[str] = None, banner_url: Optional[str] = None ) -> Optional[dict]: """Оновити брендинг MicroDAO""" pool = await get_pool() set_parts = ["updated_at = NOW()"] params = [microdao_slug] if logo_url is not None: params.append(logo_url) set_parts.append(f"logo_url = ${len(params)}") if banner_url is not None: params.append(banner_url) set_parts.append(f"banner_url = ${len(params)}") query = f""" UPDATE microdaos SET {', '.join(set_parts)} WHERE slug = $1 RETURNING id, slug, name, logo_url, banner_url """ row = await pool.fetchrow(query, *params) return dict(row) if row else None async def update_room_branding( room_id: str, logo_url: Optional[str] = None, banner_url: Optional[str] = None ) -> Optional[dict]: """Оновити брендинг кімнати""" pool = await get_pool() set_parts = ["updated_at = NOW()"] params = [room_id] if logo_url is not None: params.append(logo_url) set_parts.append(f"logo_url = ${len(params)}") if banner_url is not None: params.append(banner_url) set_parts.append(f"banner_url = ${len(params)}") query = f""" UPDATE city_rooms SET {', '.join(set_parts)} WHERE id = $1 RETURNING id, slug, name, logo_url, banner_url """ row = await pool.fetchrow(query, *params) return dict(row) if row else None # ============================================================================= # Nodes Repository # ============================================================================= async def get_all_nodes() -> List[dict]: """Отримати список всіх нод з кількістю агентів та Guardian/Steward""" pool = await get_pool() query = """ SELECT nc.node_id, nc.node_name AS name, nc.hostname, nc.roles, nc.environment, nc.status, nc.gpu, nc.last_sync AS last_heartbeat, nc.guardian_agent_id, nc.steward_agent_id, (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id) AS agents_total, (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id AND a.status = 'online') AS agents_online, ga.display_name AS guardian_name, sa.display_name AS steward_name FROM node_cache nc LEFT JOIN agents ga ON nc.guardian_agent_id = ga.id LEFT JOIN agents sa ON nc.steward_agent_id = sa.id ORDER BY nc.environment DESC, nc.node_name """ rows = await pool.fetch(query) result = [] for row in rows: data = dict(row) # Build guardian_agent object if data.get("guardian_agent_id"): data["guardian_agent"] = { "id": data.get("guardian_agent_id"), "name": data.get("guardian_name"), } else: data["guardian_agent"] = None # Build steward_agent object if data.get("steward_agent_id"): data["steward_agent"] = { "id": data.get("steward_agent_id"), "name": data.get("steward_name"), } else: data["steward_agent"] = None # Clean up data.pop("guardian_name", None) data.pop("steward_name", None) result.append(data) return result async def get_node_by_id(node_id: str) -> Optional[dict]: """Отримати ноду по ID з Guardian та Steward агентами""" pool = await get_pool() query = """ SELECT nc.node_id, nc.node_name AS name, nc.hostname, nc.roles, nc.environment, nc.status, nc.gpu, nc.last_sync AS last_heartbeat, nc.guardian_agent_id, nc.steward_agent_id, (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id) AS agents_total, (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id AND a.status = 'online') AS agents_online, -- Guardian agent info ga.display_name AS guardian_name, ga.kind AS guardian_kind, ga.public_slug AS guardian_slug, -- Steward agent info sa.display_name AS steward_name, sa.kind AS steward_kind, sa.public_slug AS steward_slug FROM node_cache nc LEFT JOIN agents ga ON nc.guardian_agent_id = ga.id LEFT JOIN agents sa ON nc.steward_agent_id = sa.id WHERE nc.node_id = $1 """ row = await pool.fetchrow(query, node_id) if not row: return None data = dict(row) # Fetch MicroDAOs where orchestrator is on this node microdaos = await pool.fetch(""" SELECT m.id, m.slug, m.name, COUNT(cr.id) as rooms_count FROM microdaos m JOIN agents a ON m.orchestrator_agent_id = a.id LEFT JOIN city_rooms cr ON cr.microdao_id = m.id WHERE a.node_id = $1 GROUP BY m.id, m.slug, m.name ORDER BY m.name """, node_id) data["microdaos"] = [dict(m) for m in microdaos] # Build guardian_agent object if data.get("guardian_agent_id"): data["guardian_agent"] = { "id": data.get("guardian_agent_id"), "name": data.get("guardian_name"), "kind": data.get("guardian_kind"), "slug": data.get("guardian_slug"), } else: data["guardian_agent"] = None # Build steward_agent object if data.get("steward_agent_id"): data["steward_agent"] = { "id": data.get("steward_agent_id"), "name": data.get("steward_name"), "kind": data.get("steward_kind"), "slug": data.get("steward_slug"), } else: data["steward_agent"] = None # TASK 038: Dynamic discovery of Node Guardian / Steward if cache is empty if not data["guardian_agent"] or not data["steward_agent"]: dynamic_agents = await pool.fetch(""" SELECT id, display_name, kind, public_slug FROM agents WHERE node_id = $1 AND (kind IN ('node_guardian', 'node_steward') OR kind IN ('infra_monitor', 'infra_ops')) AND COALESCE(is_archived, false) = false """, node_id) if not data["guardian_agent"]: # Prefer 'node_guardian', fallback to 'infra_monitor' guardian = next((a for a in dynamic_agents if a['kind'] == 'node_guardian'), next((a for a in dynamic_agents if a['kind'] == 'infra_monitor'), None)) if guardian: data["guardian_agent"] = { "id": guardian["id"], "name": guardian["display_name"], "kind": guardian["kind"], "slug": guardian["public_slug"] } if not data["steward_agent"]: # Prefer 'node_steward', fallback to 'infra_ops' steward = next((a for a in dynamic_agents if a['kind'] == 'node_steward'), next((a for a in dynamic_agents if a['kind'] == 'infra_ops'), None)) if steward: data["steward_agent"] = { "id": steward["id"], "name": steward["display_name"], "kind": steward["kind"], "slug": steward["public_slug"] } # Clean up intermediate fields for key in ["guardian_name", "guardian_kind", "guardian_slug", "steward_name", "steward_kind", "steward_slug"]: data.pop(key, None) return data # ============================================================================= # MicroDAO Visibility & Creation (Task 029) # ============================================================================= async def update_microdao_visibility( microdao_id: str, *, is_public: bool, is_platform: Optional[bool] = None, ) -> Optional[dict]: """ Оновити налаштування видимості MicroDAO. Returns updated MicroDAO data or None if not found. """ pool = await get_pool() set_parts = ["is_public = $2", "updated_at = NOW()"] params = [microdao_id, is_public] if is_platform is not None: params.append(is_platform) set_parts.append(f"is_platform = ${len(params)}") query = f""" UPDATE microdaos SET {', '.join(set_parts)} WHERE id = $1 AND COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false RETURNING id, slug, name, is_public, is_platform """ result = await pool.fetchrow(query, *params) return dict(result) if result else None async def create_microdao_for_agent( orchestrator_agent_id: str, *, name: str, slug: str, description: Optional[str] = None, make_platform: bool = False, is_public: bool = True, parent_microdao_id: Optional[str] = None, ) -> Optional[dict]: """ Створює microDAO, прив'язує його до агента-оркестратора. 1. INSERT новий microDAO 2. Додати агента в microdao_agents 3. Оновити агента: primary_microdao_id, is_orchestrator = true 4. Повернути створений microDAO """ pool = await get_pool() import uuid microdao_id = str(uuid.uuid4()) async with pool.acquire() as conn: async with conn.transaction(): # 1. Create microDAO insert_dao_query = """ INSERT INTO microdaos ( id, slug, name, description, orchestrator_agent_id, is_public, is_platform, parent_microdao_id, is_active, created_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true, NOW()) RETURNING id, slug, name, description, is_public, is_platform """ dao_row = await conn.fetchrow( insert_dao_query, microdao_id, slug, name, description, orchestrator_agent_id, is_public, make_platform, parent_microdao_id ) if not dao_row: return None # 2. Add agent to microdao_agents as orchestrator insert_member_query = """ INSERT INTO microdao_agents (microdao_id, agent_id, role, is_core, joined_at) VALUES ($1, $2, 'orchestrator', true, NOW()) ON CONFLICT (microdao_id, agent_id) DO UPDATE SET role = 'orchestrator', is_core = true """ await conn.execute(insert_member_query, microdao_id, orchestrator_agent_id) # 3. Update agent: set primary_microdao_id if empty, set is_orchestrator = true # Also set public_slug if is_public, so orchestrator becomes a public citizen update_agent_query = """ UPDATE agents SET is_orchestrator = true, is_public = CASE WHEN $3 THEN true ELSE is_public END, public_slug = CASE WHEN $3 AND (public_slug IS NULL OR public_slug = '') THEN id ELSE public_slug END, primary_microdao_id = COALESCE(primary_microdao_id, $2), updated_at = NOW() WHERE id = $1 """ await conn.execute(update_agent_query, orchestrator_agent_id, microdao_id, is_public) return dict(dao_row) async def get_microdao_primary_room(microdao_id: str) -> Optional[dict]: """ Отримати основну кімнату MicroDAO для чату. Пріоритет: room_role='primary' → найнижчий sort_order → перша кімната. """ pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.matrix_room_id, cr.microdao_id, cr.room_role, cr.is_public, cr.sort_order FROM city_rooms cr WHERE cr.microdao_id = $1 ORDER BY CASE WHEN cr.room_role = 'primary' THEN 0 ELSE 1 END, cr.sort_order ASC, cr.name ASC LIMIT 1 """ row = await pool.fetchrow(query, microdao_id) if row: return { "id": str(row["id"]), "slug": row["slug"], "name": row["name"], "matrix_room_id": row.get("matrix_room_id"), "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, "room_role": row.get("room_role"), "is_public": row.get("is_public", True), "sort_order": row.get("sort_order", 100) } return None async def get_microdao_rooms(microdao_id: str) -> List[dict]: """ Отримати всі кімнати MicroDAO, впорядковані за sort_order. """ pool = await get_pool() query = """ SELECT cr.id, cr.slug, cr.name, cr.matrix_room_id, cr.microdao_id, cr.room_role, cr.is_public, cr.sort_order, cr.logo_url, cr.banner_url, m.slug AS microdao_slug FROM city_rooms cr LEFT JOIN microdaos m ON cr.microdao_id = m.id WHERE cr.microdao_id = $1 ORDER BY CASE WHEN cr.room_role = 'primary' THEN 0 ELSE 1 END, cr.sort_order ASC, cr.name ASC """ rows = await pool.fetch(query, microdao_id) return [ { "id": str(row["id"]), "slug": row["slug"], "name": row["name"], "matrix_room_id": row.get("matrix_room_id"), "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, "microdao_slug": row.get("microdao_slug"), "room_role": row.get("room_role"), "is_public": row.get("is_public", True), "sort_order": row.get("sort_order", 100), "logo_url": row.get("logo_url"), "banner_url": row.get("banner_url") } for row in rows ] async def get_microdao_rooms_by_slug(slug: str) -> Optional[dict]: """ Отримати MicroDAO та всі його кімнати за slug. """ pool = await get_pool() # Get microdao first microdao_query = """ SELECT id, slug FROM microdaos WHERE slug = $1 AND COALESCE(is_archived, false) = false AND COALESCE(is_test, false) = false """ microdao = await pool.fetchrow(microdao_query, slug) if not microdao: return None microdao_id = str(microdao["id"]) rooms = await get_microdao_rooms(microdao_id) return { "microdao_id": microdao_id, "microdao_slug": microdao["slug"], "rooms": rooms } async def attach_room_to_microdao( microdao_id: str, room_id: str, room_role: Optional[str] = None, is_public: bool = True, sort_order: int = 100 ) -> Optional[dict]: """ Прив'язати існуючу кімнату до MicroDAO. """ pool = await get_pool() query = """ UPDATE city_rooms SET microdao_id = $1, room_role = $2, is_public = $3, sort_order = $4 WHERE id = $5 RETURNING id, slug, name, matrix_room_id, microdao_id, room_role, is_public, sort_order, logo_url, banner_url """ row = await pool.fetchrow(query, microdao_id, room_role, is_public, sort_order, room_id) if row: return { "id": str(row["id"]), "slug": row["slug"], "name": row["name"], "matrix_room_id": row.get("matrix_room_id"), "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, "room_role": row.get("room_role"), "is_public": row.get("is_public", True), "sort_order": row.get("sort_order", 100), "logo_url": row.get("logo_url"), "banner_url": row.get("banner_url") } return None async def update_microdao_room( microdao_id: str, room_id: str, room_role: Optional[str] = None, is_public: Optional[bool] = None, sort_order: Optional[int] = None, set_primary: bool = False ) -> Optional[dict]: """ Оновити налаштування кімнати MicroDAO. Якщо set_primary=True, скидає роль 'primary' з інших кімнат. """ pool = await get_pool() async with pool.acquire() as conn: async with conn.transaction(): # If setting as primary, clear previous primary if set_primary: await conn.execute( """ UPDATE city_rooms SET room_role = NULL WHERE microdao_id = $1 AND room_role = 'primary' """, microdao_id ) room_role = 'primary' # Build update query set_parts = [] params = [room_id, microdao_id] param_idx = 3 if room_role is not None: set_parts.append(f"room_role = ${param_idx}") params.append(room_role) param_idx += 1 if is_public is not None: set_parts.append(f"is_public = ${param_idx}") params.append(is_public) param_idx += 1 if sort_order is not None: set_parts.append(f"sort_order = ${param_idx}") params.append(sort_order) param_idx += 1 if not set_parts: # Nothing to update, just return current state row = await conn.fetchrow( "SELECT * FROM city_rooms WHERE id = $1 AND microdao_id = $2", room_id, microdao_id ) else: query = f""" UPDATE city_rooms SET {', '.join(set_parts)} WHERE id = $1 AND microdao_id = $2 RETURNING id, slug, name, matrix_room_id, microdao_id, room_role, is_public, sort_order, logo_url, banner_url """ row = await conn.fetchrow(query, *params) if row: return { "id": str(row["id"]), "slug": row["slug"], "name": row["name"], "matrix_room_id": row.get("matrix_room_id"), "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, "room_role": row.get("room_role"), "is_public": row.get("is_public", True), "sort_order": row.get("sort_order", 100), "logo_url": row.get("logo_url"), "banner_url": row.get("banner_url") } return None # ============================================================================= # TASK 044: Orchestrator Crew Team Room # ============================================================================= async def create_matrix_room_for_microdao_orchestrator( microdao_id: str, microdao_name: str, orchestrator_agent_id: str ) -> Optional[dict]: """ Викликати Matrix Gateway для створення кімнати команди оркестратора. """ # TODO: This should ideally be done with a proper Matrix user (e.g. app bot or the orchestrator agent itself if possible) # For now, we'll use the system admin user logic in matrix-gateway or a specialized endpoint. # Since we are in repo, we don't have the user's token. We rely on matrix-gateway internal API. async with httpx.AsyncClient(timeout=30.0) as client: try: # Ensure matrix room alias is unique room_alias = f"orchestrator_team_{microdao_id[:8]}" room_name = f"{microdao_name} — Orchestrator Team" # Call Matrix Gateway to create room # Using /internal/matrix/rooms/create (assuming it exists or we reuse a similar logic) # If not, we might need to implement it in gateway-bot. # Let's assume we use a new endpoint or existing one. # Actually, we can reuse POST /internal/matrix/rooms if it exists or just use bot API. # NOTE: In real implementation, we need to authenticate this request or ensure network security. resp = await client.post( f"{MATRIX_GATEWAY_URL}/internal/matrix/rooms", json={ "alias": room_alias, "name": room_name, "topic": "Private team chat for MicroDAO Orchestrator", "preset": "private_chat", # or public_chat, but team chat usually private "initial_state": [] } ) if resp.status_code not in (200, 201): logger.error(f"Matrix Gateway failed to create room: {resp.text}") return None data = resp.json() return { "room_id": data["room_id"], "room_alias": data.get("room_alias", room_alias) } except Exception as e: logger.error(f"Failed to create matrix room via gateway: {e}") return None async def get_or_create_orchestrator_team_room(microdao_id: str) -> Optional[dict]: """ Знайти або створити кімнату команди оркестратора для MicroDAO. """ pool = await get_pool() # 1. Check if room exists in DB existing_room_query = """ SELECT cr.id, cr.slug, cr.name, cr.matrix_room_id, cr.microdao_id, cr.room_role, cr.is_public, cr.sort_order FROM city_rooms cr WHERE cr.microdao_id = $1 AND cr.room_role = 'orchestrator_team' LIMIT 1 """ room_row = await pool.fetchrow(existing_room_query, microdao_id) if room_row: return dict(room_row) # 2. If not, fetch MicroDAO details to create one microdao_query = """ SELECT id, name, slug, orchestrator_agent_id FROM microdaos WHERE id = $1 """ microdao = await pool.fetchrow(microdao_query, microdao_id) if not microdao or not microdao["orchestrator_agent_id"]: logger.warning(f"MicroDAO {microdao_id} not found or has no orchestrator") return None # 3. Create Matrix room matrix_info = await create_matrix_room_for_microdao_orchestrator( microdao_id=microdao_id, microdao_name=microdao["name"], orchestrator_agent_id=microdao["orchestrator_agent_id"] ) if not matrix_info: logger.error("Failed to create Matrix room for orchestrator team") # Fallback: Create DB record without Matrix ID if needed, or fail? # Let's fail for now as Matrix ID is crucial for this feature. return None # 4. Create DB record slug = f"{microdao['slug']}-team" # Ensure unique slug while True: check_slug = await pool.fetchrow("SELECT 1 FROM city_rooms WHERE slug = $1", slug) if not check_slug: break slug = f"{slug}-{secrets.token_hex(2)}" create_query = """ INSERT INTO city_rooms ( id, slug, name, description, created_by, matrix_room_id, matrix_room_alias, microdao_id, room_role, is_public, sort_order ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id, slug, name, matrix_room_id, microdao_id, room_role, is_public, sort_order """ room_id = f"room_city_{slug}" new_room = await pool.fetchrow( create_query, room_id, slug, f"{microdao['name']} Team", "Orchestrator Team Chat", "system", matrix_info["room_id"], matrix_info.get("room_alias"), microdao_id, "orchestrator_team", False, # Private by default 50 # Sort order (high priority) ) return dict(new_room)