diff --git a/scripts/deploy-prod.sh b/scripts/deploy-prod.sh old mode 100644 new mode 100755 diff --git a/services/city-service/models_city.py b/services/city-service/models_city.py index 45f039bd..678563ab 100644 --- a/services/city-service/models_city.py +++ b/services/city-service/models_city.py @@ -1,621 +1,618 @@ - 1|""" - 2|Pydantic Models для City Backend - 3|""" - 4| - 5|from pydantic import BaseModel, Field - 6|from typing import Optional, List, Dict, Any - 7|from datetime import datetime - 8| - 9| - 10|# ============================================================================= - 11|# City Rooms - 12|# ============================================================================= - 13| - 14|class CityRoomBase(BaseModel): - 15| slug: str - 16| name: str - 17| description: Optional[str] = None - 18| - 19| - 20|class CityRoomCreate(CityRoomBase): - 21| pass - 22| - 23| - 24|class CityRoomRead(CityRoomBase): - 25| id: str - 26| is_default: bool - 27| created_at: datetime - 28| created_by: Optional[str] = None - 29| members_online: int = 0 - 30| last_event: Optional[str] = None - 31| # Branding - 32| logo_url: Optional[str] = None - 33| banner_url: Optional[str] = None - 34| # Context - 35| microdao_id: Optional[str] = None - 36| microdao_name: Optional[str] = None - 37| microdao_slug: Optional[str] = None - 38| microdao_logo_url: Optional[str] = None - 39| # Matrix integration - 40| matrix_room_id: Optional[str] = None - 41| matrix_room_alias: Optional[str] = None - 42| - 43| - 44|# ============================================================================= - 45|# City Room Messages - 46|# ============================================================================= - 47| - 48|class CityRoomMessageBase(BaseModel): - 49| body: str = Field(..., min_length=1, max_length=10000) - 50| - 51| - 52|class CityRoomMessageCreate(CityRoomMessageBase): - 53| pass - 54| - 55| - 56|class CityRoomMessageRead(CityRoomMessageBase): - 57| id: str - 58| room_id: str - 59| author_user_id: Optional[str] = None - 60| author_agent_id: Optional[str] = None - 61| username: Optional[str] = "Anonymous" # Для frontend - 62| created_at: datetime - 63| - 64| - 65|# ============================================================================= - 66|# City Room Detail (з повідомленнями) - 67|# ============================================================================= - 68| - 69|class CityRoomDetail(CityRoomRead): - 70| messages: List[CityRoomMessageRead] = [] - 71| online_members: List[str] = [] # user_ids - 72| - 73| - 74|# ============================================================================= - 75|# City Feed Events - 76|# ============================================================================= - 77| - 78|class CityFeedEventRead(BaseModel): - 79| id: str - 80| kind: str # 'room_message', 'agent_reply', 'system', 'dao_event' - 81| room_id: Optional[str] = None - 82| user_id: Optional[str] = None - 83| agent_id: Optional[str] = None - 84| payload: dict - 85| created_at: datetime - 86| - 87| - 88|# ============================================================================= - 89|# Presence - 90|# ============================================================================= - 91| - 92|class PresenceUpdate(BaseModel): - 93| user_id: str - 94| status: str # 'online', 'offline', 'away' - 95| last_seen: Optional[datetime] = None - 96| - 97| - 98|class PresenceBulkUpdate(BaseModel): - 99| users: List[PresenceUpdate] - 100| - 101| - 102|# ============================================================================= - 103|# WebSocket Messages - 104|# ============================================================================= - 105| - 106|class WSRoomMessage(BaseModel): - 107| event: str # 'room.message', 'room.join', 'room.leave' - 108| room_id: Optional[str] = None - 109| user_id: Optional[str] = None - 110| message: Optional[CityRoomMessageRead] = None - 111| - 112| - 113|class WSPresenceMessage(BaseModel): - 114| event: str # 'presence.heartbeat', 'presence.update' - 115| user_id: str - 116| status: Optional[str] = None - 117| - 118| - 119|# ============================================================================= - 120|# City Map (2D Map) - 121|# ============================================================================= - 122| - 123|class CityMapRoom(BaseModel): - 124| """Room representation on 2D city map""" - 125| id: str - 126| slug: str - 127| name: str - 128| description: Optional[str] = None - 129| room_type: str = "public" - 130| zone: str = "central" - 131| icon: Optional[str] = None - 132| color: Optional[str] = None - 133| # Map coordinates - 134| x: int = 0 - 135| y: int = 0 - 136| w: int = 1 - 137| h: int = 1 - 138| # Matrix integration - 139| matrix_room_id: Optional[str] = None - 140| - 141| - 142|class CityMapConfig(BaseModel): - 143| """Global city map configuration""" - 144| grid_width: int = 6 - 145| grid_height: int = 3 - 146| cell_size: int = 100 - 147| background_url: Optional[str] = None - 148| - 149| - 150|class CityMapResponse(BaseModel): - 151| """Full city map response""" - 152| config: CityMapConfig - 153| rooms: List[CityMapRoom] - 154| - 155| - 156|# ============================================================================= - 157|# Branding & Assets - 158|# ============================================================================= - 159| - 160|class BrandingUpdatePayload(BaseModel): - 161| logo_url: Optional[str] = None - 162| banner_url: Optional[str] = None - 163| - 164| - 165|class AssetUploadResponse(BaseModel): - 166| original_url: str - 167| processed_url: str - 168| thumb_url: Optional[str] = None - 169| - 170| - 171|# ============================================================================= - 172|# Agents (for Agent Presence) - 173|# ============================================================================= - 174| - 175|class AgentRead(BaseModel): - 176| """Agent representation""" - 177| id: str - 178| display_name: str - 179| kind: str = "assistant" # assistant, civic, oracle, builder - 180| avatar_url: Optional[str] = None - 181| color: str = "cyan" - 182| status: str = "offline" # online, offline, busy - 183| current_room_id: Optional[str] = None - 184| capabilities: List[str] = [] - 185| - 186| - 187|class AgentPresence(BaseModel): - 188| """Agent presence in a room""" - 189| agent_id: str - 190| display_name: str - 191| kind: str - 192| status: str - 193| room_id: Optional[str] = None - 194| color: Optional[str] = None - 195| node_id: Optional[str] = None - 196| district: Optional[str] = None - 197| model: Optional[str] = None - 198| role: Optional[str] = None - 199| avatar_url: Optional[str] = None - 200| - 201| - 202|# ============================================================================= - 203|# Citizens - 204|# ============================================================================= - 205| - 206|class CityPresenceRoomView(BaseModel): - 207| room_id: Optional[str] = None - 208| slug: Optional[str] = None - 209| name: Optional[str] = None - 210| - 211| - 212|class CityPresenceView(BaseModel): - 213| primary_room_slug: Optional[str] = None - 214| rooms: List[CityPresenceRoomView] = [] - 215| - 216| - 217|class HomeNodeView(BaseModel): - 218| """Home node information for agent/citizen""" - 219| id: Optional[str] = None - 220| name: Optional[str] = None - 221| hostname: Optional[str] = None - 222| roles: List[str] = [] - 223| environment: Optional[str] = None - 224| - 225| - 226|class NodeAgentSummary(BaseModel): - 227| """Summary of a node agent (Guardian or Steward)""" - 228| id: str - 229| name: Optional[str] = None - 230| kind: Optional[str] = None - 231| slug: Optional[str] = None - 232| - 233| - 234|class NodeMicrodaoSummary(BaseModel): - 235| """Summary of a MicroDAO hosted on a node (via orchestrator)""" - 236| id: str - 237| slug: str - 238| name: str - 239| rooms_count: int = 0 - 240| - 241| - 242|class NodeMetrics(BaseModel): - 243| """Node metrics for Node Directory cards""" - 244| cpu_model: Optional[str] = None - 245| cpu_cores: int = 0 - 246| cpu_usage: float = 0.0 - 247| gpu_model: Optional[str] = None - 248| gpu_vram_total: int = 0 - 249| gpu_vram_used: int = 0 - 250| ram_total: int = 0 - 251| ram_used: int = 0 - 252| disk_total: int = 0 - 253| disk_used: int = 0 - 254| agent_count_router: int = 0 - 255| agent_count_system: int = 0 - 256| dagi_router_url: Optional[str] = None - 257| swapper_healthy: bool = False - 258| swapper_models_loaded: int = 0 - 259| swapper_models_total: int = 0 - 260| - 261| - 262|class NodeProfile(BaseModel): - 263| """Node profile for Node Directory""" - 264| node_id: str - 265| name: str - 266| hostname: Optional[str] = None - 267| roles: List[str] = [] - 268| environment: str = "unknown" - 269| status: str = "offline" - 270| gpu_info: Optional[str] = None - 271| agents_total: int = 0 - 272| agents_online: int = 0 - 273| last_heartbeat: Optional[str] = None - 274| guardian_agent_id: Optional[str] = None - 275| steward_agent_id: Optional[str] = None - 276| guardian_agent: Optional[NodeAgentSummary] = None - 277| steward_agent: Optional[NodeAgentSummary] = None - 278| microdaos: List[NodeMicrodaoSummary] = [] - 279| metrics: Optional[NodeMetrics] = None - 280| - 281| - 282|class ModelBindings(BaseModel): - 283| """Agent model bindings for AI capabilities""" - 284| primary_model: Optional[str] = None # e.g., "qwen3:8b" - 285| supported_kinds: List[str] = [] # e.g., ["text", "vision", "audio"] - 286| - 287| - 288|class UsageStats(BaseModel): - 289| """Agent usage statistics""" - 290| tokens_total_24h: Optional[int] = None - 291| calls_total_24h: Optional[int] = None - 292| last_active: Optional[str] = None - 293| - 294| - 295|class MicrodaoBadge(BaseModel): - 296| """MicroDAO badge for agent display""" - 297| id: str - 298| name: str - 299| slug: Optional[str] = None - 300| role: Optional[str] = None # orchestrator, member, etc. - 301| is_public: bool = True - 302| is_platform: bool = False - 303| logo_url: Optional[str] = None - 304| banner_url: Optional[str] = None - 305| - 306| - 307|class AgentCrewInfo(BaseModel): - 308| """Information about agent's CrewAI team""" - 309| has_crew_team: bool - 310| crew_team_key: Optional[str] = None - 311| matrix_room_id: Optional[str] = None - 312| - 313| - 314|class AgentSummary(BaseModel): - 315| """Unified Agent summary for Agent Console and Citizens""" - 316| id: str - 317| slug: Optional[str] = None - 318| display_name: str - 319| title: Optional[str] = None # public_title - 320| tagline: Optional[str] = None # public_tagline - 321| kind: str = "assistant" - 322| avatar_url: Optional[str] = None - 323| status: str = "offline" - 324| - 325| # Node info - 326| node_id: Optional[str] = None - 327| node_label: Optional[str] = None # "НОДА1" / "НОДА2" - 328| home_node: Optional[HomeNodeView] = None - 329| - 330| # Governance & DAIS (A1, A2) - 331| gov_level: Optional[str] = None # personal, core_team, orchestrator, district_lead, city_governance - 332| dais_identity_id: Optional[str] = None # DAIS identity reference - 333| - 334| # Visibility & roles - 335| visibility_scope: str = "city" # global, microdao, private - 336| is_listed_in_directory: bool = True - 337| is_system: bool = False - 338| is_public: bool = False - 339| is_orchestrator: bool = False # Can create/manage microDAOs - 340| - 341| # MicroDAO (A3) - 342| primary_microdao_id: Optional[str] = None - 343| primary_microdao_name: Optional[str] = None - 344| primary_microdao_slug: Optional[str] = None - 345| home_microdao_id: Optional[str] = None # Owner microDAO - 346| home_microdao_name: Optional[str] = None - 347| home_microdao_slug: Optional[str] = None - 348| district: Optional[str] = None - 349| microdaos: List[MicrodaoBadge] = [] - 350| microdao_memberships: List[Dict[str, Any]] = [] # backward compatibility - 351| - 352| # Skills - 353| public_skills: List[str] = [] - 354| - 355| # CrewAI - 356| crew_info: Optional[AgentCrewInfo] = None - 357| - 358| # Future: model bindings and usage stats - 359| model_bindings: Optional[ModelBindings] = None - 360| usage_stats: Optional[UsageStats] = None - 361| - 362| - 363|class PublicCitizenSummary(BaseModel): - 364| slug: str - 365| display_name: str - 366| public_title: Optional[str] = None - 367| public_tagline: Optional[str] = None - 368| avatar_url: Optional[str] = None - 369| kind: Optional[str] = None - 370| district: Optional[str] = None - 371| primary_room_slug: Optional[str] = None - 372| public_skills: List[str] = [] - 373| online_status: Optional[str] = "unknown" - 374| status: Optional[str] = None # backward compatibility - 375| # Home node info - 376| home_node: Optional[HomeNodeView] = None - 377| node_id: Optional[str] = None - 378| - 379| # TASK 037A: Alignment - 380| home_microdao_slug: Optional[str] = None - 381| home_microdao_name: Optional[str] = None - 382| primary_city_room: Optional["CityRoomSummary"] = None - 383| - 384| - 385|class PublicCitizenProfile(BaseModel): - 386| slug: str - 387| display_name: str - 388| kind: Optional[str] = None - 389| public_title: Optional[str] = None - 390| public_tagline: Optional[str] = None - 391| district: Optional[str] = None - 392| avatar_url: Optional[str] = None - 393| status: Optional[str] = None - 394| node_id: Optional[str] = None - 395| public_skills: List[str] = [] - 396| city_presence: Optional[CityPresenceView] = None - 397| dais_public: Dict[str, Any] - 398| interaction: Dict[str, Any] - 399| metrics_public: Dict[str, Any] - 400| admin_panel_url: Optional[str] = None - 401| microdao: Optional[Dict[str, Any]] = None - 402| # Home node info - 403| home_node: Optional[HomeNodeView] = None - 404| - 405| - 406|class CitizenInteractionInfo(BaseModel): - 407| slug: str - 408| display_name: str - 409| primary_room_slug: Optional[str] = None - 410| primary_room_id: Optional[str] = None - 411| primary_room_name: Optional[str] = None - 412| matrix_user_id: Optional[str] = None - 413| district: Optional[str] = None - 414| microdao_slug: Optional[str] = None - 415| microdao_name: Optional[str] = None - 416| - 417| - 418|class CitizenAskRequest(BaseModel): - 419| question: str - 420| context: Optional[str] = None - 421| - 422| - 423|class CitizenAskResponse(BaseModel): - 424| answer: str - 425| agent_display_name: str - 426| agent_id: str - 427| - 428| - 429|# ============================================================================= - 430|# MicroDAO - 431|# ============================================================================= - 432| - 433|class MicrodaoCitizenView(BaseModel): - 434| slug: str - 435| display_name: str - 436| public_title: Optional[str] = None - 437| public_tagline: Optional[str] = None - 438| avatar_url: Optional[str] = None - 439| district: Optional[str] = None - 440| primary_room_slug: Optional[str] = None - 441| - 442| - 443|class MicrodaoSummary(BaseModel): - 444| """MicroDAO summary for list view""" - 445| id: str - 446| slug: str - 447| name: str - 448| description: Optional[str] = None - 449| district: Optional[str] = None - 450| - 451| # Visibility & type - 452| is_public: bool = True - 453| is_platform: bool = False # Is a platform/district - 454| is_active: bool = True - 455| - 456| # Orchestrator - 457| orchestrator_agent_id: Optional[str] = None - 458| orchestrator_agent_name: Optional[str] = None - 459| - 460| # Hierarchy - 461| parent_microdao_id: Optional[str] = None - 462| parent_microdao_slug: Optional[str] = None - 463| - 464| # Stats - 465| logo_url: Optional[str] = None - 466| banner_url: Optional[str] = None - 467| member_count: int = 0 # alias for agents_count - 468| agents_count: int = 0 # backward compatibility - 469| room_count: int = 0 # alias for rooms_count - 470| rooms_count: int = 0 # backward compatibility - 471| channels_count: int = 0 - 472| - 473| - 474|class MicrodaoChannelView(BaseModel): - 475| """Channel/integration view for MicroDAO""" - 476| kind: str # 'matrix' | 'telegram' | 'city_room' | 'crew' - 477| ref_id: str - 478| display_name: Optional[str] = None - 479| is_primary: bool - 480| - 481| - 482|class MicrodaoAgentView(BaseModel): - 483| """Agent view within MicroDAO""" - 484| agent_id: str - 485| display_name: str - 486| role: Optional[str] = None - 487| is_core: bool - 488| - 489| - 490|class CityRoomSummary(BaseModel): - 491| """Summary of a city room for chat embedding and multi-room support""" - 492| id: str - 493| slug: str - 494| name: str - 495| matrix_room_id: Optional[str] = None - 496| microdao_id: Optional[str] = None - 497| microdao_slug: Optional[str] = None - 498| room_role: Optional[str] = None # 'primary', 'lobby', 'team', 'research', 'security', 'governance', 'orchestrator_team' - 499| is_public: bool = True - 500| sort_order: int = 100 - 501| logo_url: Optional[str] = None - 502| banner_url: Optional[str] = None - 503| - 504| - 505|class MicrodaoRoomsList(BaseModel): - 506| """List of rooms belonging to a MicroDAO""" - 507| microdao_id: str - 508| microdao_slug: str - 509| rooms: List[CityRoomSummary] = [] - 510| - 511| - 512|class MicrodaoRoomUpdate(BaseModel): - 513| """Update request for MicroDAO room settings""" - 514| room_role: Optional[str] = None - 515| is_public: Optional[bool] = None - 516| sort_order: Optional[int] = None - 517| set_primary: Optional[bool] = None # if true, mark as primary - 518| - 519| - 520|class AttachExistingRoomRequest(BaseModel): - 521| """Request to attach an existing city room to a MicroDAO""" - 522| room_id: str - 523| room_role: Optional[str] = None - 524| is_public: bool = True - 525| sort_order: int = 100 - 526| - 527| - 528|class MicrodaoDetail(BaseModel): - 529| """Full MicroDAO detail view""" - 530| id: str - 531| slug: str - 532| name: str - 533| description: Optional[str] = None - 534| district: Optional[str] = None - 535| - 536| # Visibility & type - 537| is_public: bool = True - 538| is_platform: bool = False - 539| is_active: bool = True - 540| - 541| # Orchestrator - 542| orchestrator_agent_id: Optional[str] = None - 543| orchestrator_display_name: Optional[str] = None - 544| - 545| # Hierarchy - 546| parent_microdao_id: Optional[str] = None - 547| parent_microdao_slug: Optional[str] = None - 548| child_microdaos: List["MicrodaoSummary"] = [] - 549| - 550| # Content - 551| logo_url: Optional[str] = None - 552| banner_url: Optional[str] = None - 553| agents: List[MicrodaoAgentView] = [] - 554| channels: List[MicrodaoChannelView] = [] - 555| - 556| # Multi-room support - 557| rooms: List[CityRoomSummary] = [] - 558| public_citizens: List[MicrodaoCitizenView] = [] - 559| - 560| # Primary city room for chat - 561| primary_city_room: Optional[CityRoomSummary] = None - 562| - 563| - 564|class AgentMicrodaoMembership(BaseModel): - 565| microdao_id: str - 566| microdao_slug: str - 567| microdao_name: str - 568| logo_url: Optional[str] = None - 569| role: Optional[str] = None - 570| is_core: bool = False - 571| - 572| - 573|class MicrodaoOption(BaseModel): - 574| id: str - 575| slug: str - 576| name: str - 577| district: Optional[str] = None - 578| is_active: bool = True - 579| - 580| - 581|# ============================================================================= - 582|# Visibility Updates (Task 029) - 583|# ============================================================================= - 584| - 585|class AgentVisibilityUpdate(BaseModel): - 586| """Update agent visibility settings""" - 587| is_public: bool - 588| visibility_scope: Optional[str] = None # 'global' | 'microdao' | 'private' - 589| - 590| - 591|class MicrodaoVisibilityUpdate(BaseModel): - 592| """Update MicroDAO visibility settings""" - 593| is_public: bool - 594| is_platform: Optional[bool] = None # Upgrade to platform/district - 595| - 596| - 597|class MicrodaoCreateRequest(BaseModel): - 598| """Request to create MicroDAO from agent (orchestrator flow)""" - 599| name: str - 600| slug: str - 601| description: Optional[str] = None - 602| make_platform: bool = False # If true -> is_platform = true - 603| is_public: bool = True - 604| parent_microdao_id: Optional[str] = None - 605| - 606| - 607|class SwapperModel(BaseModel): - 608| """Model info from Swapper service""" - 609| name: str - 610| loaded: bool - 611| type: Optional[str] = None - 612| vram_gb: Optional[float] = None - 613| - 614| - 615|class NodeSwapperDetail(BaseModel): - 616| """Detailed Swapper info for Node Cabinet""" - 617| node_id: str - 618| healthy: bool - 619| models_loaded: int - 620| models_total: int - 621| models: List[SwapperModel] = [] +""" +Pydantic Models для City Backend +""" + +from pydantic import BaseModel, Field +from typing import Optional, List, Dict, Any +from datetime import datetime + + +# ============================================================================= +# City Rooms +# ============================================================================= + +class CityRoomBase(BaseModel): + slug: str + name: str + description: Optional[str] = None + + +class CityRoomCreate(CityRoomBase): + pass + + +class CityRoomRead(CityRoomBase): + id: str + is_default: bool + created_at: datetime + created_by: Optional[str] = None + members_online: int = 0 + last_event: Optional[str] = None + # Branding + logo_url: Optional[str] = None + banner_url: Optional[str] = None + # Context + microdao_id: Optional[str] = None + microdao_name: Optional[str] = None + microdao_slug: Optional[str] = None + microdao_logo_url: Optional[str] = None + # Matrix integration + matrix_room_id: Optional[str] = None + matrix_room_alias: Optional[str] = None + + +# ============================================================================= +# City Room Messages +# ============================================================================= + +class CityRoomMessageBase(BaseModel): + body: str = Field(..., min_length=1, max_length=10000) + + +class CityRoomMessageCreate(CityRoomMessageBase): + pass + + +class CityRoomMessageRead(CityRoomMessageBase): + id: str + room_id: str + author_user_id: Optional[str] = None + author_agent_id: Optional[str] = None + username: Optional[str] = "Anonymous" # Для frontend + created_at: datetime + + +# ============================================================================= +# City Room Detail (з повідомленнями) +# ============================================================================= + +class CityRoomDetail(CityRoomRead): + messages: List[CityRoomMessageRead] = [] + online_members: List[str] = [] # user_ids + + +# ============================================================================= +# City Feed Events +# ============================================================================= + +class CityFeedEventRead(BaseModel): + id: str + kind: str # 'room_message', 'agent_reply', 'system', 'dao_event' + room_id: Optional[str] = None + user_id: Optional[str] = None + agent_id: Optional[str] = None + payload: dict + created_at: datetime + + +# ============================================================================= +# Presence +# ============================================================================= + +class PresenceUpdate(BaseModel): + user_id: str + status: str # 'online', 'offline', 'away' + last_seen: Optional[datetime] = None + + +class PresenceBulkUpdate(BaseModel): + users: List[PresenceUpdate] + + +# ============================================================================= +# WebSocket Messages +# ============================================================================= + +class WSRoomMessage(BaseModel): + event: str # 'room.message', 'room.join', 'room.leave' + room_id: Optional[str] = None + user_id: Optional[str] = None + message: Optional[CityRoomMessageRead] = None + + +class WSPresenceMessage(BaseModel): + event: str # 'presence.heartbeat', 'presence.update' + user_id: str + status: Optional[str] = None + + +# ============================================================================= +# City Map (2D Map) +# ============================================================================= + +class CityMapRoom(BaseModel): + """Room representation on 2D city map""" + id: str + slug: str + name: str + description: Optional[str] = None + room_type: str = "public" + zone: str = "central" + icon: Optional[str] = None + color: Optional[str] = None + # Map coordinates + x: int = 0 + y: int = 0 + w: int = 1 + h: int = 1 + # Matrix integration + matrix_room_id: Optional[str] = None + + +class CityMapConfig(BaseModel): + """Global city map configuration""" + grid_width: int = 6 + grid_height: int = 3 + cell_size: int = 100 + background_url: Optional[str] = None + + +class CityMapResponse(BaseModel): + """Full city map response""" + config: CityMapConfig + rooms: List[CityMapRoom] + + +# ============================================================================= +# Branding & Assets +# ============================================================================= + +class BrandingUpdatePayload(BaseModel): + logo_url: Optional[str] = None + banner_url: Optional[str] = None + + +class AssetUploadResponse(BaseModel): + original_url: str + processed_url: str + thumb_url: Optional[str] = None + + +# ============================================================================= +# Agents (for Agent Presence) +# ============================================================================= + +class AgentRead(BaseModel): + """Agent representation""" + id: str + display_name: str + kind: str = "assistant" # assistant, civic, oracle, builder + avatar_url: Optional[str] = None + color: str = "cyan" + status: str = "offline" # online, offline, busy + current_room_id: Optional[str] = None + capabilities: List[str] = [] + + +class AgentPresence(BaseModel): + """Agent presence in a room""" + agent_id: str + display_name: str + kind: str + status: str + room_id: Optional[str] = None + color: Optional[str] = None + node_id: Optional[str] = None + district: Optional[str] = None + model: Optional[str] = None + role: Optional[str] = None + avatar_url: Optional[str] = None + + +# ============================================================================= +# Citizens +# ============================================================================= + +class CityPresenceRoomView(BaseModel): + room_id: Optional[str] = None + slug: Optional[str] = None + name: Optional[str] = None + + +class CityPresenceView(BaseModel): + primary_room_slug: Optional[str] = None + rooms: List[CityPresenceRoomView] = [] + + +class HomeNodeView(BaseModel): + """Home node information for agent/citizen""" + id: Optional[str] = None + name: Optional[str] = None + hostname: Optional[str] = None + roles: List[str] = [] + environment: Optional[str] = None + + +class NodeAgentSummary(BaseModel): + """Summary of a node agent (Guardian or Steward)""" + id: str + name: Optional[str] = None + kind: Optional[str] = None + slug: Optional[str] = None + + +class NodeMicrodaoSummary(BaseModel): + """Summary of a MicroDAO hosted on a node (via orchestrator)""" + id: str + slug: str + name: str + rooms_count: int = 0 + + +class NodeMetrics(BaseModel): + """Node metrics for Node Directory cards""" + cpu_model: Optional[str] = None + cpu_cores: int = 0 + cpu_usage: float = 0.0 + gpu_model: Optional[str] = None + gpu_vram_total: int = 0 + gpu_vram_used: int = 0 + ram_total: int = 0 + ram_used: int = 0 + disk_total: int = 0 + disk_used: int = 0 + agent_count_router: int = 0 + agent_count_system: int = 0 + dagi_router_url: Optional[str] = None + + +class NodeProfile(BaseModel): + """Node profile for Node Directory""" + node_id: str + name: str + hostname: Optional[str] = None + roles: List[str] = [] + environment: str = "unknown" + status: str = "offline" + gpu_info: Optional[str] = None + agents_total: int = 0 + agents_online: int = 0 + last_heartbeat: Optional[str] = None + guardian_agent_id: Optional[str] = None + steward_agent_id: Optional[str] = None + guardian_agent: Optional[NodeAgentSummary] = None + steward_agent: Optional[NodeAgentSummary] = None + microdaos: List[NodeMicrodaoSummary] = [] + metrics: Optional[NodeMetrics] = None + + +class ModelBindings(BaseModel): + """Agent model bindings for AI capabilities""" + primary_model: Optional[str] = None # e.g., "qwen3:8b" + supported_kinds: List[str] = [] # e.g., ["text", "vision", "audio"] + + +class UsageStats(BaseModel): + """Agent usage statistics""" + tokens_total_24h: Optional[int] = None + calls_total_24h: Optional[int] = None + last_active: Optional[str] = None + + +class MicrodaoBadge(BaseModel): + """MicroDAO badge for agent display""" + id: str + name: str + slug: Optional[str] = None + role: Optional[str] = None # orchestrator, member, etc. + is_public: bool = True + is_platform: bool = False + logo_url: Optional[str] = None + banner_url: Optional[str] = None + + +class AgentCrewInfo(BaseModel): + """Information about agent's CrewAI team""" + has_crew_team: bool + crew_team_key: Optional[str] = None + matrix_room_id: Optional[str] = None + + +class AgentSummary(BaseModel): + """Unified Agent summary for Agent Console and Citizens""" + id: str + slug: Optional[str] = None + display_name: str + title: Optional[str] = None # public_title + tagline: Optional[str] = None # public_tagline + kind: str = "assistant" + avatar_url: Optional[str] = None + status: str = "offline" + + # Node info + node_id: Optional[str] = None + node_label: Optional[str] = None # "НОДА1" / "НОДА2" + home_node: Optional[HomeNodeView] = None + + # Governance & DAIS (A1, A2) + gov_level: Optional[str] = None # personal, core_team, orchestrator, district_lead, city_governance + dais_identity_id: Optional[str] = None # DAIS identity reference + + # Visibility & roles + visibility_scope: str = "city" # global, microdao, private + is_listed_in_directory: bool = True + is_system: bool = False + is_public: bool = False + is_orchestrator: bool = False # Can create/manage microDAOs + + # MicroDAO (A3) + primary_microdao_id: Optional[str] = None + primary_microdao_name: Optional[str] = None + primary_microdao_slug: Optional[str] = None + home_microdao_id: Optional[str] = None # Owner microDAO + home_microdao_name: Optional[str] = None + home_microdao_slug: Optional[str] = None + district: Optional[str] = None + microdaos: List[MicrodaoBadge] = [] + microdao_memberships: List[Dict[str, Any]] = [] # backward compatibility + + # Skills + public_skills: List[str] = [] + + # CrewAI + crew_info: Optional[AgentCrewInfo] = None + + # Future: model bindings and usage stats + model_bindings: Optional[ModelBindings] = None + usage_stats: Optional[UsageStats] = None + + +class PublicCitizenSummary(BaseModel): + slug: str + display_name: str + public_title: Optional[str] = None + public_tagline: Optional[str] = None + avatar_url: Optional[str] = None + kind: Optional[str] = None + district: Optional[str] = None + primary_room_slug: Optional[str] = None + public_skills: List[str] = [] + online_status: Optional[str] = "unknown" + status: Optional[str] = None # backward compatibility + # Home node info + home_node: Optional[HomeNodeView] = None + node_id: Optional[str] = None + + # TASK 037A: Alignment + home_microdao_slug: Optional[str] = None + home_microdao_name: Optional[str] = None + primary_city_room: Optional["CityRoomSummary"] = None + + +class PublicCitizenProfile(BaseModel): + slug: str + display_name: str + kind: Optional[str] = None + public_title: Optional[str] = None + public_tagline: Optional[str] = None + district: Optional[str] = None + avatar_url: Optional[str] = None + status: Optional[str] = None + node_id: Optional[str] = None + public_skills: List[str] = [] + city_presence: Optional[CityPresenceView] = None + dais_public: Dict[str, Any] + interaction: Dict[str, Any] + metrics_public: Dict[str, Any] + admin_panel_url: Optional[str] = None + microdao: Optional[Dict[str, Any]] = None + # Home node info + home_node: Optional[HomeNodeView] = None + + +class CitizenInteractionInfo(BaseModel): + slug: str + display_name: str + primary_room_slug: Optional[str] = None + primary_room_id: Optional[str] = None + primary_room_name: Optional[str] = None + matrix_user_id: Optional[str] = None + district: Optional[str] = None + microdao_slug: Optional[str] = None + microdao_name: Optional[str] = None + + +class CitizenAskRequest(BaseModel): + question: str + context: Optional[str] = None + + +class CitizenAskResponse(BaseModel): + answer: str + agent_display_name: str + agent_id: str + + +# ============================================================================= +# MicroDAO +# ============================================================================= + +class MicrodaoCitizenView(BaseModel): + slug: str + display_name: str + public_title: Optional[str] = None + public_tagline: Optional[str] = None + avatar_url: Optional[str] = None + district: Optional[str] = None + primary_room_slug: Optional[str] = None + + +class MicrodaoSummary(BaseModel): + """MicroDAO summary for list view""" + id: str + slug: str + name: str + description: Optional[str] = None + district: Optional[str] = None + + # Visibility & type + is_public: bool = True + is_platform: bool = False # Is a platform/district + is_active: bool = True + + # Orchestrator + orchestrator_agent_id: Optional[str] = None + orchestrator_agent_name: Optional[str] = None + + # Hierarchy + parent_microdao_id: Optional[str] = None + parent_microdao_slug: Optional[str] = None + + # Stats + logo_url: Optional[str] = None + banner_url: Optional[str] = None + member_count: int = 0 # alias for agents_count + agents_count: int = 0 # backward compatibility + room_count: int = 0 # alias for rooms_count + rooms_count: int = 0 # backward compatibility + channels_count: int = 0 + + +class MicrodaoChannelView(BaseModel): + """Channel/integration view for MicroDAO""" + kind: str # 'matrix' | 'telegram' | 'city_room' | 'crew' + ref_id: str + display_name: Optional[str] = None + is_primary: bool + + +class MicrodaoAgentView(BaseModel): + """Agent view within MicroDAO""" + agent_id: str + display_name: str + role: Optional[str] = None + is_core: bool + + +class CityRoomSummary(BaseModel): + """Summary of a city room for chat embedding and multi-room support""" + id: str + slug: str + name: str + matrix_room_id: Optional[str] = None + microdao_id: Optional[str] = None + microdao_slug: Optional[str] = None + room_role: Optional[str] = None # 'primary', 'lobby', 'team', 'research', 'security', 'governance', 'orchestrator_team' + is_public: bool = True + sort_order: int = 100 + logo_url: Optional[str] = None + banner_url: Optional[str] = None + + +class MicrodaoRoomsList(BaseModel): + """List of rooms belonging to a MicroDAO""" + microdao_id: str + microdao_slug: str + rooms: List[CityRoomSummary] = [] + + +class MicrodaoRoomUpdate(BaseModel): + """Update request for MicroDAO room settings""" + room_role: Optional[str] = None + is_public: Optional[bool] = None + sort_order: Optional[int] = None + set_primary: Optional[bool] = None # if true, mark as primary + + +class AttachExistingRoomRequest(BaseModel): + """Request to attach an existing city room to a MicroDAO""" + room_id: str + room_role: Optional[str] = None + is_public: bool = True + sort_order: int = 100 + + +class MicrodaoDetail(BaseModel): + """Full MicroDAO detail view""" + id: str + slug: str + name: str + description: Optional[str] = None + district: Optional[str] = None + + # Visibility & type + is_public: bool = True + is_platform: bool = False + is_active: bool = True + + # Orchestrator + orchestrator_agent_id: Optional[str] = None + orchestrator_display_name: Optional[str] = None + + # Hierarchy + parent_microdao_id: Optional[str] = None + parent_microdao_slug: Optional[str] = None + child_microdaos: List["MicrodaoSummary"] = [] + + # Content + logo_url: Optional[str] = None + banner_url: Optional[str] = None + agents: List[MicrodaoAgentView] = [] + channels: List[MicrodaoChannelView] = [] + + # Multi-room support + rooms: List[CityRoomSummary] = [] + public_citizens: List[MicrodaoCitizenView] = [] + + # Primary city room for chat + primary_city_room: Optional[CityRoomSummary] = None + + +class AgentMicrodaoMembership(BaseModel): + microdao_id: str + microdao_slug: str + microdao_name: str + logo_url: Optional[str] = None + role: Optional[str] = None + is_core: bool = False + + +class MicrodaoOption(BaseModel): + id: str + slug: str + name: str + district: Optional[str] = None + is_active: bool = True + + +# ============================================================================= +# Visibility Updates (Task 029) +# ============================================================================= + +class AgentVisibilityUpdate(BaseModel): + """Update agent visibility settings""" + is_public: bool + visibility_scope: Optional[str] = None # 'global' | 'microdao' | 'private' + + +class MicrodaoVisibilityUpdate(BaseModel): + """Update MicroDAO visibility settings""" + is_public: bool + is_platform: Optional[bool] = None # Upgrade to platform/district + + +class MicrodaoCreateRequest(BaseModel): + """Request to create MicroDAO from agent (orchestrator flow)""" + name: str + slug: str + description: Optional[str] = None + make_platform: bool = False # If true -> is_platform = true + is_public: bool = True + parent_microdao_id: Optional[str] = None + + +class SwapperModel(BaseModel): + """Model info from Swapper service""" + name: str + loaded: bool + type: Optional[str] = None + vram_gb: Optional[float] = None + + +class NodeSwapperDetail(BaseModel): + """Detailed Swapper info for Node Cabinet""" + node_id: str + healthy: bool + models_loaded: int + models_total: int + models: List[SwapperModel] = [] diff --git a/services/city-service/repo_city.py b/services/city-service/repo_city.py index 7763cffe..32a47ec3 100644 --- a/services/city-service/repo_city.py +++ b/services/city-service/repo_city.py @@ -1,3 +1,3296 @@ +""" +Repository для City Backend (PostgreSQL) +""" + +import os +import uuid +import asyncpg +import json +from typing import Optional, List, Dict, Any, Tuple +from datetime import datetime, timezone +import secrets +import httpx +import logging + +logger = logging.getLogger(__name__) + +# Database connection +_pool: Optional[asyncpg.Pool] = None + +MATRIX_GATEWAY_URL = os.getenv("MATRIX_GATEWAY_URL", "http://matrix-gateway:8000") + +async def get_pool() -> asyncpg.Pool: + """Отримати connection pool""" + global _pool + + if _pool is None: + database_url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/daarion") + _pool = await asyncpg.create_pool(database_url, min_size=2, max_size=10) + + return _pool + + +async def close_pool(): + """Закрити connection pool""" + global _pool + if _pool is not None: + await _pool.close() + _pool = None + + +def generate_id(prefix: str) -> str: + """Генерувати простий ID""" + return f"{prefix}_{secrets.token_urlsafe(12)}" + + +def _normalize_capabilities(value: Any) -> List[str]: + """Ensure capabilities are returned as a list.""" + if value is None: + return [] + if isinstance(value, list): + return value + if isinstance(value, str): + import json + try: + return json.loads(value) + except Exception: + return [] + return list(value) + + +# ============================================================================= +# City Rooms Repository +# ============================================================================= + +async def get_all_rooms(limit: int = 100, offset: int = 0) -> List[dict]: + """Отримати всі кімнати з додатковими полями""" + pool = await get_pool() + + query = """ + SELECT + cr.id, cr.slug, cr.name, cr.description, + cr.room_type, cr.owner_type, cr.owner_id, cr.space_scope, cr.visibility, + cr.is_default, cr.is_public, cr.sort_order, + cr.created_at, cr.created_by, + cr.matrix_room_id, cr.matrix_room_alias, + cr.logo_url, cr.banner_url, + cr.room_role + FROM city_rooms cr + WHERE cr.is_public = true OR cr.space_scope = 'city' + ORDER BY cr.sort_order ASC, cr.is_default DESC, cr.created_at DESC + LIMIT $1 OFFSET $2 + """ + + rows = await pool.fetch(query, limit, offset) + return [dict(row) for row in rows] + + +async def get_city_rooms_for_list(limit: int = 100) -> List[dict]: + """Отримати City Rooms для відображення у списку""" + pool = await get_pool() + + query = """ + SELECT + cr.id, cr.slug, cr.name, cr.description, + cr.room_type, cr.owner_type, cr.owner_id, cr.space_scope, cr.visibility, + cr.is_public, cr.sort_order, + cr.matrix_room_id, cr.matrix_room_alias, + cr.logo_url, cr.banner_url, + cr.room_role, + cr.created_at + FROM city_rooms cr + WHERE cr.space_scope = 'city' AND cr.is_public = true + ORDER BY cr.sort_order ASC, cr.name ASC + LIMIT $1 + """ + + rows = await pool.fetch(query, limit) + return [dict(row) for row in rows] + + +async def get_room_by_id(room_id: str) -> Optional[dict]: + """Отримати кімнату по ID""" + pool = await get_pool() + + query = """ + SELECT + cr.id, cr.slug, cr.name, cr.description, cr.is_default, cr.created_at, cr.created_by, + cr.matrix_room_id, cr.matrix_room_alias, cr.logo_url, cr.banner_url, + cr.microdao_id, m.name AS microdao_name, m.slug AS microdao_slug, m.logo_url AS microdao_logo_url + FROM city_rooms cr + LEFT JOIN microdaos m ON cr.microdao_id::text = m.id + WHERE cr.id = $1 + """ + + row = await pool.fetchrow(query, room_id) + return dict(row) if row else None + + +async def get_room_by_slug(slug: str) -> Optional[dict]: + """Отримати кімнату по slug""" + pool = await get_pool() + + query = """ + SELECT + cr.id, cr.slug, cr.name, cr.description, cr.is_default, cr.created_at, cr.created_by, + cr.matrix_room_id, cr.matrix_room_alias, cr.logo_url, cr.banner_url, + cr.microdao_id, m.name AS microdao_name, m.slug AS microdao_slug, m.logo_url AS microdao_logo_url + FROM city_rooms cr + LEFT JOIN microdaos m ON cr.microdao_id::text = m.id + WHERE cr.slug = $1 + """ + + row = await pool.fetchrow(query, slug) + return dict(row) if row else None + + +async def get_room_by_id(room_id: str) -> Optional[dict]: + """Отримати кімнату по ID (UUID)""" + pool = await get_pool() + + query = """ + SELECT + cr.id, cr.slug, cr.name, cr.description, cr.is_default, cr.created_at, cr.created_by, + cr.matrix_room_id, cr.matrix_room_alias, cr.logo_url, cr.banner_url, + cr.microdao_id, m.name AS microdao_name, m.slug AS microdao_slug, m.logo_url AS microdao_logo_url + FROM city_rooms cr + LEFT JOIN microdaos m ON cr.microdao_id::text = m.id + WHERE cr.id = $1 + """ + + row = await pool.fetchrow(query, room_id) + return dict(row) if row else None + + +async def create_room( + slug: str, + name: str, + description: Optional[str], + created_by: Optional[str], + matrix_room_id: Optional[str] = None, + matrix_room_alias: Optional[str] = None +) -> dict: + """Створити кімнату""" + pool = await get_pool() + + room_id = f"room_city_{slug}" + + query = """ + INSERT INTO city_rooms (id, slug, name, description, created_by, matrix_room_id, matrix_room_alias) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias + """ + + row = await pool.fetchrow(query, room_id, slug, name, description, created_by, matrix_room_id, matrix_room_alias) + return dict(row) + + +async def update_room_matrix(room_id: str, matrix_room_id: str, matrix_room_alias: str) -> Optional[dict]: + """Оновити Matrix поля кімнати""" + pool = await get_pool() + + query = """ + UPDATE city_rooms + SET matrix_room_id = $2, matrix_room_alias = $3 + WHERE id = $1 + RETURNING id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias + """ + + row = await pool.fetchrow(query, room_id, matrix_room_id, matrix_room_alias) + return dict(row) + + +async def get_rooms_without_matrix() -> List[dict]: + """Отримати кімнати без Matrix інтеграції""" + pool = await get_pool() + + query = """ + SELECT id, slug, name, description, is_default, created_at, created_by, + matrix_room_id, matrix_room_alias + FROM city_rooms + WHERE matrix_room_id IS NULL + ORDER BY created_at + """ + + rows = await pool.fetch(query) + return [dict(row) for row in rows] + + +# ============================================================================= +# City Room Messages Repository +# ============================================================================= + +async def get_room_messages(room_id: str, limit: int = 50) -> List[dict]: + """Отримати повідомлення кімнати""" + pool = await get_pool() + + query = """ + SELECT id, room_id, author_user_id, author_agent_id, body, created_at + FROM city_room_messages + WHERE room_id = $1 + ORDER BY created_at DESC + LIMIT $2 + """ + + rows = await pool.fetch(query, room_id, limit) + # Reverse для правильного порядку (старі → нові) + return [dict(row) for row in reversed(rows)] + + +async def create_room_message( + room_id: str, + body: str, + author_user_id: Optional[str] = None, + author_agent_id: Optional[str] = None +) -> dict: + """Створити повідомлення в кімнаті""" + pool = await get_pool() + + message_id = generate_id("m_city") + + query = """ + INSERT INTO city_room_messages (id, room_id, author_user_id, author_agent_id, body) + VALUES ($1, $2, $3, $4, $5) + RETURNING id, room_id, author_user_id, author_agent_id, body, created_at + """ + + row = await pool.fetchrow(query, message_id, room_id, author_user_id, author_agent_id, body) + return dict(row) + + +# ============================================================================= +# City Feed Events Repository +# ============================================================================= + +async def get_feed_events(limit: int = 20, offset: int = 0) -> List[dict]: + """Отримати події feed""" + pool = await get_pool() + + query = """ + SELECT id, kind, room_id, user_id, agent_id, payload, created_at + FROM city_feed_events + ORDER BY created_at DESC + LIMIT $1 OFFSET $2 + """ + + rows = await pool.fetch(query, limit, offset) + return [dict(row) for row in rows] + + +async def create_feed_event( + kind: str, + payload: dict, + room_id: Optional[str] = None, + user_id: Optional[str] = None, + agent_id: Optional[str] = None +) -> dict: + """Створити подію в feed""" + pool = await get_pool() + + event_id = generate_id("evt_city") + + query = """ + INSERT INTO city_feed_events (id, kind, room_id, user_id, agent_id, payload) + VALUES ($1, $2, $3, $4, $5, $6::jsonb) + RETURNING id, kind, room_id, user_id, agent_id, payload, created_at + """ + + import json + payload_json = json.dumps(payload) + + row = await pool.fetchrow(query, event_id, kind, room_id, user_id, agent_id, payload_json) + return dict(row) + + +# ============================================================================= +# City Map Repository +# ============================================================================= + +async def get_map_config() -> dict: + """Отримати конфігурацію мапи міста""" + pool = await get_pool() + + query = """ + SELECT id, grid_width, grid_height, cell_size, background_url, updated_at + FROM city_map_config + WHERE id = 'default' + """ + + row = await pool.fetchrow(query) + if row: + return dict(row) + + # Повернути дефолтні значення якщо немає запису + return { + "id": "default", + "grid_width": 6, + "grid_height": 3, + "cell_size": 100, + "background_url": None + } + + +async def get_rooms_for_map() -> List[dict]: + """Отримати кімнати з координатами для 2D мапи""" + pool = await get_pool() + + query = """ + SELECT + id, slug, name, description, + room_type, zone, icon, color, + map_x, map_y, map_w, map_h, + matrix_room_id + FROM city_rooms + ORDER BY map_y, map_x + """ + + rows = await pool.fetch(query) + return [dict(row) for row in rows] + + +# ============================================================================= +# Agents Repository +# ============================================================================= + +async def list_agent_summaries( + *, + node_id: Optional[str] = None, + microdao_id: Optional[str] = None, + is_public: Optional[bool] = None, + visibility_scope: Optional[str] = None, + listed_only: Optional[bool] = None, + kinds: Optional[List[str]] = None, + include_system: bool = True, + include_archived: bool = False, + limit: int = 200, + offset: int = 0 +) -> Tuple[List[dict], int]: + """ + Unified method to list agents with all necessary data. + Used by both Agent Console and Citizens page. + """ + pool = await get_pool() + + params: List[Any] = [] + where_clauses = [] + + # Always filter archived unless explicitly included + if not include_archived: + where_clauses.append("COALESCE(a.is_archived, false) = false") + where_clauses.append("COALESCE(a.is_test, false) = false") + where_clauses.append("a.deleted_at IS NULL") + + if node_id: + params.append(node_id) + where_clauses.append(f"a.node_id = ${len(params)}") + + if microdao_id: + params.append(microdao_id) + where_clauses.append(f"EXISTS (SELECT 1 FROM microdao_agents ma WHERE ma.agent_id = a.id AND ma.microdao_id = ${len(params)})") + + if is_public is not None: + params.append(is_public) + where_clauses.append(f"COALESCE(a.is_public, false) = ${len(params)}") + + if visibility_scope: + params.append(visibility_scope) + where_clauses.append(f"COALESCE(a.visibility_scope, 'city') = ${len(params)}") + + if listed_only is True: + where_clauses.append("COALESCE(a.is_listed_in_directory, true) = true") + elif listed_only is False: + where_clauses.append("COALESCE(a.is_listed_in_directory, true) = false") + + if kinds: + params.append(kinds) + where_clauses.append(f"a.kind = ANY(${len(params)})") + + if not include_system: + where_clauses.append("COALESCE(a.is_system, false) = false") + + where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" + + query = f""" + SELECT + a.id, + COALESCE(a.slug, a.public_slug, LOWER(REPLACE(a.display_name, ' ', '-'))) AS slug, + a.display_name, + COALESCE(a.public_title, '') AS title, + COALESCE(a.public_tagline, '') AS tagline, + a.kind, + a.avatar_url, + COALESCE(a.status, 'offline') AS status, + a.node_id, + nc.node_name AS node_label, + nc.hostname AS node_hostname, + nc.roles AS node_roles, + nc.environment AS node_environment, + COALESCE(a.visibility_scope, 'city') AS visibility_scope, + COALESCE(a.is_listed_in_directory, true) AS is_listed_in_directory, + COALESCE(a.is_system, false) AS is_system, + COALESCE(a.is_public, false) AS is_public, + COALESCE(a.is_orchestrator, false) AS is_orchestrator, + a.primary_microdao_id, + pm.name AS primary_microdao_name, + pm.slug AS primary_microdao_slug, + pm.district AS district, + COALESCE(a.public_skills, ARRAY[]::text[]) AS public_skills, + a.crew_team_key, + -- DAIS & Governance fields (A1, A2) + a.gov_level, + a.dais_identity_id, + a.home_microdao_id, + hm.name AS home_microdao_name, + hm.slug AS home_microdao_slug, + COUNT(*) OVER() AS total_count + FROM agents a + LEFT JOIN node_cache nc ON a.node_id = nc.node_id + LEFT JOIN microdaos pm ON a.primary_microdao_id = pm.id + LEFT JOIN microdaos hm ON a.home_microdao_id = hm.id + WHERE {where_sql} + ORDER BY a.display_name + LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} + """ + + params.append(limit) + params.append(offset) + + rows = await pool.fetch(query, *params) + if not rows: + return [], 0 + + total = rows[0]["total_count"] + items = [] + + for row in rows: + data = dict(row) + data.pop("total_count", None) + + # Build home_node object + if data.get("node_id"): + data["home_node"] = { + "id": data.get("node_id"), + "name": data.get("node_label"), + "hostname": data.get("node_hostname"), + "roles": list(data.get("node_roles") or []), + "environment": data.get("node_environment") + } + else: + data["home_node"] = None + + # Clean up intermediate fields + for key in ["node_hostname", "node_roles", "node_environment"]: + data.pop(key, None) + + # Get MicroDAO memberships + memberships = await get_agent_microdao_memberships(data["id"]) + data["microdaos"] = [ + { + "id": m.get("microdao_id", ""), + "name": m.get("name", ""), + "slug": m.get("slug"), + "role": m.get("role") + } + for m in memberships + ] + data["microdao_memberships"] = memberships # backward compatibility + + data["public_skills"] = list(data.get("public_skills") or []) + + # Populate crew_info + if data.get("crew_team_key"): + # Try to find orchestrator team room for their primary microdao + # This is a bit expensive for list view, so maybe just return basic info + data["crew_info"] = { + "has_crew_team": True, + "crew_team_key": data["crew_team_key"], + "matrix_room_id": None # Loaded lazily if needed + } + else: + data["crew_info"] = { + "has_crew_team": False, + "crew_team_key": None, + "matrix_room_id": None + } + + items.append(data) + + return items, total + + +async def get_all_agents() -> List[dict]: + """Отримати всіх агентів (non-archived) - legacy method""" + pool = await get_pool() + + query = """ + SELECT id, display_name, kind, avatar_url, color, status, + current_room_id, capabilities, created_at, updated_at + FROM agents + WHERE COALESCE(is_archived, false) = false + AND COALESCE(is_test, false) = false + AND deleted_at IS NULL + ORDER BY display_name + """ + + rows = await pool.fetch(query) + return [dict(row) for row in rows] + + +async def update_agent_visibility( + agent_id: str, + *, + is_public: bool, + visibility_scope: Optional[str] = None, +) -> Optional[dict]: + """ + Оновити налаштування видимості агента. + Returns updated agent data or None if not found. + """ + pool = await get_pool() + + # Build dynamic update + set_parts = ["is_public = $2", "updated_at = NOW()"] + params = [agent_id, is_public] + + if visibility_scope is not None: + params.append(visibility_scope) + set_parts.append(f"visibility_scope = ${len(params)}") + + # Also update is_listed_in_directory based on is_public + set_parts.append("is_listed_in_directory = $2") # same as is_public + + query = f""" + UPDATE agents + SET {', '.join(set_parts)} + WHERE id = $1 + AND COALESCE(is_archived, false) = false + AND COALESCE(is_test, false) = false + RETURNING id, display_name, is_public, visibility_scope, is_listed_in_directory + """ + + result = await pool.fetchrow(query, *params) + return dict(result) if result else None + + +async def update_agent_visibility_legacy( + agent_id: str, + visibility_scope: str, + is_listed_in_directory: bool +) -> bool: + """Legacy: Оновити налаштування видимості агента (backward compatibility)""" + pool = await get_pool() + + query = """ + UPDATE agents + SET visibility_scope = $2, + is_listed_in_directory = $3, + is_public = $3, + updated_at = NOW() + WHERE id = $1 + AND COALESCE(is_archived, false) = false + RETURNING id + """ + + result = await pool.fetchrow(query, agent_id, visibility_scope, is_listed_in_directory) + return result is not None + + +async def get_agent_prompts(agent_id: str) -> dict: + """Отримати системні промти агента""" + pool = await get_pool() + + query = """ + SELECT kind, content, version, created_at, note + FROM agent_prompts + WHERE agent_id = $1 + AND is_active = true + ORDER BY kind + """ + + rows = await pool.fetch(query, agent_id) + + result = { + "core": None, + "safety": None, + "governance": None, + "tools": None + } + + for row in rows: + kind = row["kind"] + if kind in result: + result[kind] = { + "content": row["content"], + "version": row["version"], + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + "note": row.get("note") + } + + return result + + +async def get_runtime_prompts(agent_id: str) -> Dict[str, Any]: + """ + Отримати системні промти агента для DAGI Router runtime. + + Returns: + { + "agent_id": str, + "has_prompts": bool, + "prompts": { + "core": str | None, + "safety": str | None, + "governance": str | None, + "tools": str | None + } + } + """ + pool = await get_pool() + + query = """ + SELECT kind, content + FROM agent_prompts + WHERE agent_id = $1 + AND is_active = true + ORDER BY kind + """ + + rows = await pool.fetch(query, agent_id) + + prompts = { + "core": None, + "safety": None, + "governance": None, + "tools": None + } + + for row in rows: + kind = row["kind"] + if kind in prompts: + prompts[kind] = row["content"] + + has_prompts = prompts["core"] is not None + + return { + "agent_id": agent_id, + "has_prompts": has_prompts, + "prompts": prompts + } + + +def build_system_prompt( + agent: Dict[str, Any], + prompts: Dict[str, str], + context: Optional[Dict[str, Any]] = None +) -> str: + """ + Побудувати повний system prompt для LLM виклику. + + Args: + agent: dict з інформацією про агента (name, kind, node_id, district_id, etc.) + prompts: dict з промтами {"core": str, "safety": str, "governance": str, "tools": str} + context: додатковий контекст (node info, district info, user role, etc.) + + Returns: + str - зібраний system prompt + """ + parts = [] + + # Core prompt (required) + if prompts.get("core"): + parts.append(prompts["core"]) + else: + # Fallback: basic prompt from agent info + agent_name = agent.get("display_name") or agent.get("name") or "Agent" + agent_kind = agent.get("kind") or "assistant" + parts.append( + f"You are {agent_name}, an AI {agent_kind} in DAARION.city ecosystem. " + f"Be helpful, accurate, and follow ethical guidelines." + ) + + # Governance rules + if prompts.get("governance"): + parts.append("\n\n## Governance\n" + prompts["governance"]) + + # Safety guidelines + if prompts.get("safety"): + parts.append("\n\n## Safety Guidelines\n" + prompts["safety"]) + + # Tools instructions + if prompts.get("tools"): + parts.append("\n\n## Tools & Capabilities\n" + prompts["tools"]) + + # Context additions + if context: + context_parts = [] + + if context.get("node"): + node = context["node"] + context_parts.append( + f"**Node**: {node.get('name', 'Unknown')} ({node.get('environment', 'unknown')} environment)" + ) + + if context.get("district"): + district = context["district"] + context_parts.append( + f"**District**: {district.get('name', 'Unknown')}" + ) + + if context.get("user_role"): + context_parts.append( + f"**User Role**: {context['user_role']}" + ) + + if context.get("microdao"): + microdao = context["microdao"] + context_parts.append( + f"**MicroDAO**: {microdao.get('name', 'Unknown')}" + ) + + if context_parts: + parts.append("\n\n## Current Context\n" + "\n".join(context_parts)) + + return "\n".join(parts) + + +async def get_agent_with_runtime_prompt(agent_id: str) -> Optional[Dict[str, Any]]: + """ + Отримати агента з зібраним runtime system prompt. + Використовується DAGI Router для inference. + """ + pool = await get_pool() + + # Get agent info + agent_query = """ + SELECT + a.id, a.name, a.display_name, a.kind, a.status, + a.node_id, a.district_id, a.microdao_id, + a.external_id, a.public_slug + FROM agents a + WHERE a.id = $1 OR a.external_id = $2 OR a.public_slug = $3 + LIMIT 1 + """ + + agent_row = await pool.fetchrow(agent_query, agent_id, f"agent:{agent_id}", agent_id) + + if not agent_row: + return None + + agent = dict(agent_row) + + # Get prompts + runtime_data = await get_runtime_prompts(agent["id"]) + + # Build context + context = {} + + # Add node context if agent has node_id + if agent.get("node_id"): + node = await get_node_by_id(agent["node_id"]) + if node: + context["node"] = { + "name": node.get("name"), + "environment": node.get("environment") + } + + # Build full system prompt + system_prompt = build_system_prompt(agent, runtime_data["prompts"], context) + + return { + "agent_id": agent["id"], + "agent_name": agent.get("display_name") or agent.get("name"), + "agent_kind": agent.get("kind"), + "has_prompts": runtime_data["has_prompts"], + "system_prompt": system_prompt, + "prompts": runtime_data["prompts"] + } + + +async def check_agents_prompts_status(agent_ids: List[str]) -> Dict[str, bool]: + """ + Перевірити наявність промтів для списку агентів. + Використовується для індикаторів у UI. + """ + if not agent_ids: + return {} + + pool = await get_pool() + + # Get all agents with at least core prompt + query = """ + SELECT DISTINCT agent_id + FROM agent_prompts + WHERE agent_id = ANY($1) + AND kind = 'core' + AND is_active = true + """ + + rows = await pool.fetch(query, agent_ids) + agents_with_prompts = {row["agent_id"] for row in rows} + + return { + agent_id: agent_id in agents_with_prompts + for agent_id in agent_ids + } + + +async def update_agent_prompt( + agent_id: str, + kind: str, + content: str, + created_by: str, + note: Optional[str] = None +) -> dict: + """ + Оновити або створити системний промт агента. + Деактивує попередню версію та створює нову. + """ + pool = await get_pool() + + valid_kinds = ("core", "safety", "governance", "tools") + if kind not in valid_kinds: + raise ValueError(f"Invalid kind: {kind}. Must be one of {valid_kinds}") + + async with pool.acquire() as conn: + async with conn.transaction(): + # Деактивувати попередню версію + await conn.execute( + """ + UPDATE agent_prompts + SET is_active = false + WHERE agent_id = $1 AND kind = $2 AND is_active = true + """, + agent_id, kind + ) + + # Отримати наступну версію + max_version = await conn.fetchval( + """ + SELECT COALESCE(MAX(version), 0) FROM agent_prompts + WHERE agent_id = $1 AND kind = $2 + """, + agent_id, kind + ) + new_version = max_version + 1 + + # Створити новий запис + row = await conn.fetchrow( + """ + INSERT INTO agent_prompts ( + agent_id, kind, content, version, created_by, note, is_active, created_at + ) + VALUES ($1, $2, $3, $4, $5, $6, true, NOW()) + RETURNING id, agent_id, kind, content, version, created_at, created_by, note + """, + agent_id, kind, content, new_version, created_by, note + ) + + return { + "agent_id": row["agent_id"], + "kind": row["kind"], + "content": row["content"], + "version": row["version"], + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + "updated_at": row["created_at"].isoformat() if row["created_at"] else None, + "updated_by": row["created_by"], + "note": row["note"] + } + + +async def upsert_agent_prompts(agent_id: str, prompts: List[dict], created_by: str) -> List[dict]: + """ + Пакетне оновлення промтів агента. + """ + results = [] + for p in prompts: + res = await update_agent_prompt( + agent_id=agent_id, + kind=p["kind"], + content=p["content"], + created_by=created_by, + note=p.get("note") + ) + results.append(res) + return results + + +async def get_agent_prompt_history(agent_id: str, kind: str, limit: int = 10) -> List[dict]: + """ + Отримати історію версій промту агента. + """ + pool = await get_pool() + + query = """ + SELECT id, version, content, created_at, created_by, note, is_active + FROM agent_prompts + WHERE agent_id = $1 AND kind = $2 + ORDER BY version DESC + LIMIT $3 + """ + + rows = await pool.fetch(query, agent_id, kind, limit) + + return [ + { + "id": str(row["id"]), + "version": row["version"], + "content": row["content"], + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + "created_by": row["created_by"], + "note": row["note"], + "is_active": row["is_active"] + } + for row in rows + ] + + +async def get_agent_public_profile(agent_id: str) -> Optional[dict]: + """Отримати публічний профіль агента""" + pool = await get_pool() + + query = """ + SELECT + is_public, + public_slug, + public_title, + public_tagline, + COALESCE(public_skills, ARRAY[]::text[]) AS public_skills, + public_district, + public_primary_room_slug, + COALESCE(visibility_scope, 'city') AS visibility_scope, + COALESCE(is_listed_in_directory, true) AS is_listed_in_directory, + COALESCE(is_system, false) AS is_system + FROM agents + WHERE id = $1 + """ + + row = await pool.fetchrow(query, agent_id) + if not row: + return None + + return { + "is_public": row["is_public"], + "public_slug": row["public_slug"], + "public_title": row["public_title"], + "public_tagline": row["public_tagline"], + "public_skills": list(row["public_skills"] or []), + "public_district": row["public_district"], + "public_primary_room_slug": row["public_primary_room_slug"], + "visibility_scope": row["visibility_scope"], + "is_listed_in_directory": row["is_listed_in_directory"], + "is_system": row["is_system"] + } + + +async def get_agents_with_home_node( + kind: Optional[str] = None, + node_id: Optional[str] = None, + limit: int = 100, + offset: int = 0 +) -> Tuple[List[dict], int]: + """Отримати агентів з інформацією про home_node""" + pool = await get_pool() + + params: List[Any] = [] + where_clauses = [ + "COALESCE(a.is_archived, false) = false", + "COALESCE(a.is_test, false) = false", + "a.deleted_at IS NULL" + ] + + if kind: + params.append(kind) + where_clauses.append(f"a.kind = ${len(params)}") + + if node_id: + params.append(node_id) + where_clauses.append(f"a.node_id = ${len(params)}") + + where_sql = " AND ".join(where_clauses) + + query = f""" + SELECT + a.id, + a.display_name, + a.kind, + a.avatar_url, + a.status, + a.is_public, + a.public_slug, + a.public_title, + a.public_district, + a.node_id, + nc.node_name AS home_node_name, + nc.hostname AS home_node_hostname, + nc.roles AS home_node_roles, + nc.environment AS home_node_environment, + COUNT(*) OVER() AS total_count + FROM agents a + LEFT JOIN node_cache nc ON a.node_id = nc.node_id + WHERE {where_sql} + ORDER BY a.display_name + LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} + """ + + params.append(limit) + params.append(offset) + + rows = await pool.fetch(query, *params) + if not rows: + return [], 0 + + total = rows[0]["total_count"] + items = [] + + for row in rows: + data = dict(row) + data.pop("total_count", None) + + # Build home_node object + if data.get("node_id"): + data["home_node"] = { + "id": data.get("node_id"), + "name": data.get("home_node_name"), + "hostname": data.get("home_node_hostname"), + "roles": list(data.get("home_node_roles") or []), + "environment": data.get("home_node_environment") + } + else: + data["home_node"] = None + + # Clean up intermediate fields + for key in ["home_node_name", "home_node_hostname", "home_node_roles", "home_node_environment"]: + data.pop(key, None) + + items.append(data) + + return items, total + + +async def get_agents_by_room(room_id: str) -> List[dict]: + """Отримати агентів у конкретній кімнаті""" + pool = await get_pool() + + query = """ + SELECT id, display_name, kind, avatar_url, color, status, + current_room_id, capabilities + FROM agents + WHERE current_room_id = $1 AND status != 'offline' + ORDER BY display_name + """ + + rows = await pool.fetch(query, room_id) + return [dict(row) for row in rows] + + +async def get_online_agents() -> List[dict]: + """Отримати всіх онлайн агентів""" + pool = await get_pool() + + query = """ + SELECT id, display_name, kind, avatar_url, color, status, + current_room_id, capabilities + FROM agents + WHERE status IN ('online', 'busy') + ORDER BY display_name + """ + + rows = await pool.fetch(query) + return [dict(row) for row in rows] + + +async def update_agent_status(agent_id: str, status: str, room_id: Optional[str] = None) -> Optional[dict]: + """Оновити статус агента""" + pool = await get_pool() + + if room_id: + query = """ + UPDATE agents + SET status = $2, current_room_id = $3, updated_at = NOW() + WHERE id = $1 + RETURNING id, display_name, kind, status, current_room_id + """ + row = await pool.fetchrow(query, agent_id, status, room_id) + else: + query = """ + UPDATE agents + SET status = $2, updated_at = NOW() + WHERE id = $1 + RETURNING id, display_name, kind, status, current_room_id + """ + row = await pool.fetchrow(query, agent_id, status) + + return dict(row) if row else None + + +async def get_agent_by_id(agent_id: str) -> Optional[dict]: + """Отримати агента по ID або public_slug""" + pool = await get_pool() + + query = """ + SELECT + a.id, + a.display_name, + a.kind, + a.status, + a.node_id, + a.role, + a.avatar_url, + COALESCE(a.color_hint, a.color, 'cyan') AS color, + a.capabilities, + a.primary_room_slug, + a.public_primary_room_slug, + a.public_district, + a.public_title, + a.public_tagline, + a.public_skills, + a.public_slug, + a.is_public, + a.district AS home_district, + a.crew_team_key, + a.dagi_status, + a.last_seen_at, + COALESCE(a.is_node_guardian, false) as is_node_guardian, + COALESCE(a.is_node_steward, false) as is_node_steward + FROM agents a + WHERE a.id = $1 OR a.public_slug = $1 + """ + + row = await pool.fetchrow(query, agent_id) + if not row: + return None + + agent = dict(row) + agent["capabilities"] = _normalize_capabilities(agent.get("capabilities")) + if agent.get("public_skills") is None: + agent["public_skills"] = [] + + # Populate crew_info + if agent.get("crew_team_key"): + agent["crew_info"] = { + "has_crew_team": True, + "crew_team_key": agent["crew_team_key"], + "matrix_room_id": None # Populated later if needed + } + + # If orchestrator, verify if room exists + # For detailed view, let's try to fetch it + if agent.get("primary_room_slug"): + # Just a placeholder check, logic should be outside or specific method + pass + else: + agent["crew_info"] = { + "has_crew_team": False, + "crew_team_key": None, + "matrix_room_id": None + } + + return agent + + +async def get_agent_public_profile(agent_id: str) -> Optional[dict]: + """Отримати публічний профіль агента""" + pool = await get_pool() + + query = """ + SELECT + is_public, + public_slug, + public_title, + public_tagline, + public_skills, + public_district, + public_primary_room_slug + FROM agents + WHERE id = $1 + """ + + row = await pool.fetchrow(query, agent_id) + if not row: + return None + + result = dict(row) + if result.get("public_skills") is None: + result["public_skills"] = [] + return result + + +async def update_agent_public_profile( + agent_id: str, + is_public: bool, + public_slug: Optional[str], + public_title: Optional[str], + public_tagline: Optional[str], + public_skills: Optional[List[str]], + public_district: Optional[str], + public_primary_room_slug: Optional[str] +) -> Optional[dict]: + """Оновити публічний профіль агента""" + pool = await get_pool() + + query = """ + UPDATE agents + SET + is_public = $2, + public_slug = $3, + public_title = $4, + public_tagline = $5, + public_skills = $6, + public_district = $7, + public_primary_room_slug = $8, + updated_at = NOW() + WHERE id = $1 + RETURNING + is_public, + public_slug, + public_title, + public_tagline, + public_skills, + public_district, + public_primary_room_slug + """ + + row = await pool.fetchrow( + query, + agent_id, + is_public, + public_slug, + public_title, + public_tagline, + public_skills, + public_district, + public_primary_room_slug + ) + + if not row: + return None + + result = dict(row) + if result.get("public_skills") is None: + result["public_skills"] = [] + return result + + +async def get_agent_rooms(agent_id: str) -> List[dict]: + """Отримати список кімнат агента (primary/public)""" + pool = await get_pool() + + query = """ + SELECT primary_room_slug, public_primary_room_slug + FROM agents + WHERE id = $1 + """ + + row = await pool.fetchrow(query, agent_id) + if not row: + return [] + + slugs = [] + if row.get("primary_room_slug"): + slugs.append(row["primary_room_slug"]) + if row.get("public_primary_room_slug") and row["public_primary_room_slug"] not in slugs: + slugs.append(row["public_primary_room_slug"]) + + if not slugs: + return [] + + rooms_query = """ + SELECT id, slug, name + FROM city_rooms + WHERE slug = ANY($1::text[]) + """ + + rooms = await pool.fetch(rooms_query, slugs) + return [dict(room) for room in rooms] + + +async def get_agent_matrix_config(agent_id: str) -> Optional[dict]: + """Отримати Matrix-конфіг агента""" + pool = await get_pool() + + query = """ + SELECT agent_id, matrix_user_id, primary_room_id + FROM agent_matrix_config + WHERE agent_id = $1 + """ + + row = await pool.fetchrow(query, agent_id) + return dict(row) if row else None + + +async def get_public_agent_by_slug(slug: str) -> Optional[dict]: + """Отримати базову інформацію про публічного агента""" + pool = await get_pool() + + query = """ + SELECT + id, + display_name, + public_primary_room_slug, + primary_room_slug, + public_district, + public_title, + public_tagline + FROM agents + WHERE public_slug = $1 + AND is_public = true + LIMIT 1 + """ + + row = await pool.fetchrow(query, slug) + return dict(row) if row else None + + +async def get_microdao_for_agent(agent_id: str) -> Optional[dict]: + """Отримати MicroDAO для агента (аліас get_agent_microdao)""" + return await get_agent_microdao(agent_id) + + +# ============================================================================= +# Citizens Repository +# ============================================================================= + +async def get_public_citizens( + district: Optional[str] = None, + kind: Optional[str] = None, + q: Optional[str] = None, + limit: int = 50, + offset: int = 0 +) -> Tuple[List[dict], int]: + """Отримати публічних громадян""" + pool = await get_pool() + + params: List[Any] = [] + where_clauses = [ + "a.is_public = true", + "COALESCE(a.is_archived, false) = false", + "COALESCE(a.is_test, false) = false", + "a.deleted_at IS NULL", + # ROOMS_LAYER_RESTORE: Include agents with gov_level or specific kinds as citizens + "(a.public_slug IS NOT NULL OR a.gov_level IN ('city_governance', 'district_lead', 'orchestrator', 'core_team') OR a.kind IN ('civic', 'governance', 'orchestrator'))" + ] + + if district: + params.append(district) + where_clauses.append(f"a.public_district = ${len(params)}") + + if kind: + params.append(kind) + where_clauses.append(f"a.kind = ${len(params)}") + + if q: + params.append(f"%{q}%") + where_clauses.append( + f"(a.display_name ILIKE ${len(params)} OR a.public_title ILIKE ${len(params)} OR a.public_tagline ILIKE ${len(params)})" + ) + + where_sql = " AND ".join(where_clauses) + + query = f""" + SELECT + a.id, + a.public_slug, + a.display_name, + a.public_title, + a.public_tagline, + a.avatar_url, + a.kind, + a.public_district, + a.public_primary_room_slug, + COALESCE(a.public_skills, ARRAY[]::text[]) AS public_skills, + COALESCE(a.status, 'unknown') AS status, + a.node_id, + nc.node_name AS home_node_name, + nc.hostname AS home_node_hostname, + nc.roles AS home_node_roles, + nc.environment AS home_node_environment, + -- MicroDAO info + m.slug AS home_microdao_slug, + m.name AS home_microdao_name, + -- Room info + cr.id AS room_id, + cr.slug AS room_slug, + cr.name AS room_name, + cr.matrix_room_id AS room_matrix_id, + COUNT(*) OVER() AS total_count + FROM agents a + LEFT JOIN node_cache nc ON a.node_id = nc.node_id + -- Join primary MicroDAO + LEFT JOIN LATERAL ( + SELECT ma.agent_id, md.slug, md.name + FROM microdao_agents ma + JOIN microdaos md ON ma.microdao_id = md.id + WHERE ma.agent_id = a.id + ORDER BY ma.is_core DESC, md.name + LIMIT 1 + ) m ON true + -- Join primary room (by public_primary_room_slug) + LEFT JOIN city_rooms cr ON cr.slug = a.public_primary_room_slug + WHERE {where_sql} + ORDER BY a.display_name + LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} + """ + + params.append(limit) + params.append(offset) + + rows = await pool.fetch(query, *params) + if not rows: + return [], 0 + + total = rows[0]["total_count"] + items = [] + for row in rows: + data = dict(row) + data.pop("total_count", None) + data["public_skills"] = list(data.get("public_skills") or []) + data["online_status"] = data.get("status") or "unknown" + # Build home_node object + if data.get("node_id"): + data["home_node"] = { + "id": data.get("node_id"), + "name": data.get("home_node_name"), + "hostname": data.get("home_node_hostname"), + "roles": list(data.get("home_node_roles") or []), + "environment": data.get("home_node_environment") + } + else: + data["home_node"] = None + + # Build primary_city_room object + if data.get("room_id"): + data["primary_city_room"] = { + "id": str(data["room_id"]), + "slug": data["room_slug"], + "name": data["room_name"], + "matrix_room_id": data.get("room_matrix_id") + } + else: + data["primary_city_room"] = None + + # Clean up intermediate fields + for key in ["home_node_name", "home_node_hostname", "home_node_roles", "home_node_environment", + "room_id", "room_slug", "room_name", "room_matrix_id"]: + data.pop(key, None) + items.append(data) + + return items, total + + +async def get_agent_microdao(agent_id: str) -> Optional[dict]: + """Отримати MicroDAO, до якого належить агент (перший збіг)""" + pool = await get_pool() + + query = """ + SELECT + m.id, + m.slug, + m.name, + m.district + FROM microdao_agents ma + JOIN microdaos m ON m.id = ma.microdao_id + WHERE ma.agent_id = $1 + ORDER BY ma.is_core DESC, m.name + LIMIT 1 + """ + + row = await pool.fetchrow(query, agent_id) + return dict(row) if row else None + + +async def get_microdao_public_citizens(microdao_id: str) -> List[dict]: + """Отримати публічних громадян конкретного MicroDAO""" + pool = await get_pool() + + query = """ + SELECT + a.public_slug AS slug, + a.display_name, + a.public_title, + a.public_tagline, + a.avatar_url, + a.public_district, + a.public_primary_room_slug + FROM microdao_agents ma + JOIN agents a ON a.id = ma.agent_id + WHERE ma.microdao_id = $1 + AND a.is_public = true + AND a.public_slug IS NOT NULL + ORDER BY a.display_name + """ + + rows = await pool.fetch(query, microdao_id) + result = [] + for row in rows: + data = dict(row) + result.append(data) + return result + + +async def get_public_citizen_by_slug(slug: str) -> Optional[dict]: + """Отримати детальний профіль громадянина""" + pool = await get_pool() + + query = """ + SELECT + a.id, + a.display_name, + a.kind, + a.status, + a.node_id, + a.avatar_url, + a.public_slug, + a.public_title, + a.public_tagline, + COALESCE(a.public_skills, ARRAY[]::text[]) AS public_skills, + a.public_district, + a.public_primary_room_slug, + a.primary_room_slug, + nc.node_name AS home_node_name, + nc.hostname AS home_node_hostname, + nc.roles AS home_node_roles, + nc.environment AS home_node_environment + FROM agents a + LEFT JOIN node_cache nc ON a.node_id = nc.node_id + WHERE a.public_slug = $1 + AND a.is_public = true + LIMIT 1 + """ + + agent_row = await pool.fetchrow(query, slug) + if not agent_row: + return None + + agent = dict(agent_row) + agent["public_skills"] = list(agent.get("public_skills") or []) + + # Build home_node object + home_node = None + if agent.get("node_id"): + home_node = { + "id": agent.get("node_id"), + "name": agent.get("home_node_name"), + "hostname": agent.get("home_node_hostname"), + "roles": list(agent.get("home_node_roles") or []), + "environment": agent.get("home_node_environment") + } + + rooms = await get_agent_rooms(agent["id"]) + primary_room = agent.get("public_primary_room_slug") or agent.get("primary_room_slug") + city_presence = { + "primary_room_slug": primary_room, + "rooms": rooms + } if rooms else { + "primary_room_slug": primary_room, + "rooms": [] + } + + dais_public = { + "core": { + "archetype": agent.get("kind"), + "bio_short": agent.get("public_tagline") + }, + "phenotype": { + "visual": { + "avatar_url": agent.get("avatar_url"), + "color": None + } + }, + "memex": {}, + "economics": {} + } + + interaction = { + "matrix_user": None, + "primary_room_slug": primary_room, + "actions": ["chat", "ask_for_help"] + } + + metrics_public: Dict[str, Any] = {} + + microdao = await get_agent_microdao(agent["id"]) + + return { + "slug": agent["public_slug"], + "display_name": agent["display_name"], + "kind": agent.get("kind"), + "public_title": agent.get("public_title"), + "public_tagline": agent.get("public_tagline"), + "district": agent.get("public_district"), + "avatar_url": agent.get("avatar_url"), + "status": agent.get("status"), + "node_id": agent.get("node_id"), + "public_skills": agent.get("public_skills"), + "city_presence": city_presence, + "dais_public": dais_public, + "interaction": interaction, + "metrics_public": metrics_public, + "microdao": microdao, + "admin_panel_url": f"/agents/{agent['id']}", + "home_node": home_node + } + + +# ============================================================================= +# MicroDAO Membership Repository +# ============================================================================= + +async def get_microdao_options() -> List[dict]: + """Отримати список активних MicroDAO для селектора""" + pool = await get_pool() + + query = """ + SELECT id, slug, name, district, is_active + FROM microdaos + WHERE is_active = true + ORDER BY name + """ + + rows = await pool.fetch(query) + return [dict(row) for row in rows] + + +async def get_agent_microdao_memberships(agent_id: str) -> List[dict]: + """Отримати всі членства агента в MicroDAO""" + pool = await get_pool() + + query = """ + SELECT + ma.microdao_id, + m.slug AS microdao_slug, + m.name AS microdao_name, + m.logo_url, + ma.role, + ma.is_core + FROM microdao_agents ma + JOIN microdaos m ON m.id = ma.microdao_id + WHERE ma.agent_id = $1 + ORDER BY ma.is_core DESC, m.name + """ + + rows = await pool.fetch(query, agent_id) + return [dict(row) for row in rows] + + +async def upsert_agent_microdao_membership( + agent_id: str, + microdao_id: str, + role: Optional[str], + is_core: bool +) -> Optional[dict]: + """Призначити або оновити членство агента в MicroDAO""" + pool = await get_pool() + + query = """ + WITH upsert AS ( + INSERT INTO microdao_agents (microdao_id, agent_id, role, is_core) + VALUES ($1, $2, $3, $4) + ON CONFLICT (microdao_id, agent_id) + DO UPDATE SET role = EXCLUDED.role, is_core = EXCLUDED.is_core + RETURNING microdao_id, agent_id, role, is_core + ) + SELECT + u.microdao_id, + m.slug AS microdao_slug, + m.name AS microdao_name, + u.role, + u.is_core + FROM upsert u + JOIN microdaos m ON m.id = u.microdao_id + """ + + row = await pool.fetchrow(query, microdao_id, agent_id, role, is_core) + return dict(row) if row else None + + +async def remove_agent_microdao_membership(agent_id: str, microdao_id: str) -> bool: + """Видалити членство агента в MicroDAO""" + pool = await get_pool() + + result = await pool.execute( + "DELETE FROM microdao_agents WHERE agent_id = $1 AND microdao_id = $2", + agent_id, + microdao_id + ) + + # asyncpg returns strings like "DELETE 1" + return result.split(" ")[-1] != "0" + + +# ============================================================================= +# MicroDAO Repository +# ============================================================================= + +async def get_microdaos(district: Optional[str] = None, q: Optional[str] = None, limit: int = 50, offset: int = 0) -> List[dict]: + """Отримати список MicroDAOs з агрегованою статистикою""" + pool = await get_pool() + + params = [] + + where_clauses = [ + "m.is_public = true", + "m.is_active = true", + "COALESCE(m.is_archived, false) = false", + "COALESCE(m.is_test, false) = false", + "m.deleted_at IS NULL" + ] + + if district: + params.append(district) + where_clauses.append(f"m.district = ${len(params)}") + + if q: + params.append(f"%{q}%") + where_clauses.append(f"(m.name ILIKE ${len(params)} OR m.description ILIKE ${len(params)})") + + where_sql = " AND ".join(where_clauses) + + query = f""" + SELECT + m.id, + m.slug, + m.name, + m.description, + m.district, + COALESCE(m.orchestrator_agent_id, m.owner_agent_id) as orchestrator_agent_id, + oa.display_name as orchestrator_agent_name, + m.is_active, + COALESCE(m.is_public, true) as is_public, + COALESCE(m.is_platform, false) as is_platform, + m.parent_microdao_id, + pm.slug as parent_microdao_slug, + m.logo_url, + m.banner_url, + COUNT(DISTINCT ma.agent_id) AS agents_count, + COUNT(DISTINCT ma.agent_id) AS member_count, + COUNT(DISTINCT mc.id) AS channels_count, + COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS rooms_count, + COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS room_count + FROM microdaos m + LEFT JOIN microdao_agents ma ON ma.microdao_id = m.id + LEFT JOIN microdao_channels mc ON mc.microdao_id = m.id + LEFT JOIN agents oa ON COALESCE(m.orchestrator_agent_id, m.owner_agent_id) = oa.id + LEFT JOIN microdaos pm ON m.parent_microdao_id = pm.id + WHERE {where_sql} + GROUP BY m.id, oa.display_name, pm.slug + ORDER BY m.name + LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} + """ + + # Append limit and offset to params + params.append(limit) + params.append(offset) + + rows = await pool.fetch(query, *params) + return [dict(row) for row in rows] + + +async def list_microdao_summaries( + *, + is_public: Optional[bool] = None, + is_platform: Optional[bool] = None, + district: Optional[str] = None, + q: Optional[str] = None, + limit: int = 50, + offset: int = 0 +) -> List[dict]: + """ + Unified method to list microDAOs. + Wraps get_microdaos with additional filtering. + """ + pool = await get_pool() + + params = [] + where_clauses = [ + "COALESCE(m.is_archived, false) = false", + "COALESCE(m.is_test, false) = false", + "m.deleted_at IS NULL", + "m.is_active = true" + ] + + if is_public is not None: + params.append(is_public) + where_clauses.append(f"COALESCE(m.is_public, true) = ${len(params)}") + + if is_platform is not None: + params.append(is_platform) + where_clauses.append(f"COALESCE(m.is_platform, false) = ${len(params)}") + + if district: + params.append(district) + where_clauses.append(f"m.district = ${len(params)}") + + if q: + params.append(f"%{q}%") + where_clauses.append(f"(m.name ILIKE ${len(params)} OR m.description ILIKE ${len(params)})") + + where_sql = " AND ".join(where_clauses) + + query = f""" + SELECT + m.id, + m.slug, + m.name, + m.description, + m.district, + COALESCE(m.orchestrator_agent_id, m.owner_agent_id) as orchestrator_agent_id, + oa.display_name as orchestrator_agent_name, + m.is_active, + COALESCE(m.is_public, true) as is_public, + COALESCE(m.is_platform, false) as is_platform, + m.parent_microdao_id, + pm.slug as parent_microdao_slug, + m.logo_url, + m.banner_url, + COUNT(DISTINCT ma.agent_id) AS agents_count, + COUNT(DISTINCT ma.agent_id) AS member_count, + COUNT(DISTINCT mc.id) AS channels_count, + COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS rooms_count, + COUNT(DISTINCT CASE WHEN mc.kind = 'city_room' THEN mc.id END) AS room_count + FROM microdaos m + LEFT JOIN microdao_agents ma ON ma.microdao_id = m.id + LEFT JOIN microdao_channels mc ON mc.microdao_id = m.id + LEFT JOIN agents oa ON COALESCE(m.orchestrator_agent_id, m.owner_agent_id) = oa.id + LEFT JOIN microdaos pm ON m.parent_microdao_id = pm.id + WHERE {where_sql} + GROUP BY m.id, oa.display_name, pm.slug + ORDER BY m.name + LIMIT ${len(params) + 1} OFFSET ${len(params) + 2} + """ + + params.append(limit) + params.append(offset) + + rows = await pool.fetch(query, *params) + return [dict(row) for row in rows] + + +async def get_microdao_detail(slug: str) -> Optional[dict]: + """ + Get detailed microDAO info including agents, channels, children. + Alias for get_microdao_by_slug with clearer naming. + """ + return await get_microdao_by_slug(slug) + + +async def get_microdao_by_slug(slug: str) -> Optional[dict]: + """Отримати детальну інформацію про MicroDAO""" + pool = await get_pool() + + # 1. Get main DAO info + query_dao = """ + SELECT + m.id, + m.slug, + m.name, + m.description, + m.district, + COALESCE(m.orchestrator_agent_id, m.owner_agent_id) as orchestrator_agent_id, + a.display_name as orchestrator_display_name, + m.is_active, + COALESCE(m.is_public, true) as is_public, + COALESCE(m.is_platform, false) as is_platform, + m.parent_microdao_id, + pm.slug as parent_microdao_slug, + m.logo_url, + m.banner_url + FROM microdaos m + LEFT JOIN agents a ON COALESCE(m.orchestrator_agent_id, m.owner_agent_id) = a.id + LEFT JOIN microdaos pm ON m.parent_microdao_id = pm.id + WHERE m.slug = $1 + AND COALESCE(m.is_archived, false) = false + AND COALESCE(m.is_test, false) = false + AND m.deleted_at IS NULL + """ + + dao_row = await pool.fetchrow(query_dao, slug) + if not dao_row: + return None + + result = dict(dao_row) + dao_id = result["id"] + + # 2. Get Agents + query_agents = """ + SELECT + ma.agent_id, + ma.role, + ma.is_core, + a.display_name + FROM microdao_agents ma + JOIN agents a ON ma.agent_id = a.id + WHERE ma.microdao_id = $1 + AND COALESCE(a.is_archived, false) = false + AND COALESCE(a.is_test, false) = false + AND a.deleted_at IS NULL + ORDER BY ma.is_core DESC, ma.role + """ + agents_rows = await pool.fetch(query_agents, dao_id) + result["agents"] = [dict(row) for row in agents_rows] + + # 3. Get Channels + query_channels = """ + SELECT + kind, + ref_id, + display_name, + is_primary + FROM microdao_channels + WHERE microdao_id = $1 + ORDER BY is_primary DESC, kind + """ + channels_rows = await pool.fetch(query_channels, dao_id) + result["channels"] = [dict(row) for row in channels_rows] + + # 4. Get child microDAOs + query_children = """ + SELECT id, slug, name, COALESCE(is_public, true) as is_public, + COALESCE(is_platform, false) as is_platform + FROM microdaos + WHERE parent_microdao_id = $1 + AND COALESCE(is_archived, false) = false + AND COALESCE(is_test, false) = false + AND deleted_at IS NULL + ORDER BY name + """ + children_rows = await pool.fetch(query_children, dao_id) + result["child_microdaos"] = [dict(row) for row in children_rows] + + public_citizens = await get_microdao_public_citizens(dao_id) + result["public_citizens"] = public_citizens + + return result + + +async def update_microdao_branding( + microdao_slug: str, + logo_url: Optional[str] = None, + banner_url: Optional[str] = None +) -> Optional[dict]: + """Оновити брендинг MicroDAO""" + pool = await get_pool() + + set_parts = ["updated_at = NOW()"] + params = [microdao_slug] + + if logo_url is not None: + params.append(logo_url) + set_parts.append(f"logo_url = ${len(params)}") + + if banner_url is not None: + params.append(banner_url) + set_parts.append(f"banner_url = ${len(params)}") + + query = f""" + UPDATE microdaos + SET {', '.join(set_parts)} + WHERE slug = $1 + RETURNING id, slug, name, logo_url, banner_url + """ + + row = await pool.fetchrow(query, *params) + return dict(row) if row else None + + +async def update_room_branding( + room_id: str, + logo_url: Optional[str] = None, + banner_url: Optional[str] = None +) -> Optional[dict]: + """Оновити брендинг кімнати""" + pool = await get_pool() + + set_parts = ["updated_at = NOW()"] + params = [room_id] + + if logo_url is not None: + params.append(logo_url) + set_parts.append(f"logo_url = ${len(params)}") + + if banner_url is not None: + params.append(banner_url) + set_parts.append(f"banner_url = ${len(params)}") + + query = f""" + UPDATE city_rooms + SET {', '.join(set_parts)} + WHERE id = $1 + RETURNING id, slug, name, logo_url, banner_url + """ + + row = await pool.fetchrow(query, *params) + return dict(row) if row else None + + +# ============================================================================= +# Nodes Repository +# ============================================================================= + +async def get_all_nodes() -> List[dict]: + """Отримати список всіх нод з кількістю агентів, Guardian/Steward та метриками. + + ДЖЕРЕЛО ІСТИНИ: + 1. node_registry (якщо існує) + node_cache (метрики) + 2. Fallback: тільки node_cache (для зворотної сумісності) + """ + pool = await get_pool() + + # Перевіримо чи існує node_registry + try: + exists = await pool.fetchval(""" + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = 'node_registry' + ) + """) + except Exception: + exists = False + + if exists: + # Використовуємо node_registry як джерело істини + query = """ + SELECT + COALESCE(nr.id, nc.node_id) as node_id, + COALESCE(nr.name, nc.node_name) AS name, + COALESCE(nr.hostname, nc.hostname) as hostname, + COALESCE(nr.roles, nc.roles) as roles, + COALESCE(nr.environment, nc.environment) as environment, + COALESCE(nc.status, 'unknown') as status, + nc.gpu, + COALESCE(nc.last_heartbeat, nc.last_sync) AS last_heartbeat, + nc.guardian_agent_id, + nc.steward_agent_id, + -- Metrics + nc.cpu_model, + nc.cpu_cores, + COALESCE(nc.cpu_usage, 0) as cpu_usage, + nc.gpu_model, + COALESCE(nc.gpu_vram_total, 0) as gpu_vram_total, + COALESCE(nc.gpu_vram_used, 0) as gpu_vram_used, + COALESCE(nc.ram_total, 0) as ram_total, + COALESCE(nc.ram_used, 0) as ram_used, + COALESCE(nc.disk_total, 0) as disk_total, + COALESCE(nc.disk_used, 0) as disk_used, + COALESCE(nc.agent_count_router, 0) as agent_count_router, + COALESCE(nc.agent_count_system, 0) as agent_count_system, + nc.last_heartbeat as metrics_heartbeat, + nc.dagi_router_url, + -- Self-healing status (may not exist yet) + NULL as self_healing_status, + -- Registry info + nr.description as node_description, + nr.is_active as registry_active, + nr.last_self_registration, + -- Agent counts (dynamic) + (SELECT COUNT(*) FROM agents a WHERE a.node_id = COALESCE(nr.id, nc.node_id) AND COALESCE(a.is_archived, false) = false AND a.deleted_at IS NULL) AS agents_total, + (SELECT COUNT(*) FROM agents a WHERE a.node_id = COALESCE(nr.id, nc.node_id) AND a.status = 'online' AND COALESCE(a.is_archived, false) = false) AS agents_online, + ga.display_name AS guardian_name, + ga.public_slug AS guardian_slug, + sa.display_name AS steward_name, + sa.public_slug AS steward_slug + FROM node_registry nr + LEFT JOIN node_cache nc ON nc.node_id = nr.id + LEFT JOIN agents ga ON nc.guardian_agent_id = ga.id + LEFT JOIN agents sa ON nc.steward_agent_id = sa.id + WHERE nr.is_active = true + ORDER BY nr.environment DESC, nr.name + """ + try: + rows = await pool.fetch(query) + except Exception as e: + logger.warning(f"node_registry query failed: {e}") + rows = [] + else: + rows = [] + + # Fallback: якщо node_registry не існує або порожній, використовуємо node_cache + if not rows: + logger.info("Using node_cache as fallback for get_all_nodes") + query_fallback = """ + SELECT + nc.node_id, + nc.node_name AS name, + nc.hostname, + nc.roles, + nc.environment, + nc.status, + nc.gpu, + COALESCE(nc.last_heartbeat, nc.last_sync) AS last_heartbeat, + nc.guardian_agent_id, + nc.steward_agent_id, + nc.cpu_model, + nc.cpu_cores, + COALESCE(nc.cpu_usage, 0) as cpu_usage, + nc.gpu_model, + COALESCE(nc.gpu_vram_total, 0) as gpu_vram_total, + COALESCE(nc.gpu_vram_used, 0) as gpu_vram_used, + COALESCE(nc.ram_total, 0) as ram_total, + COALESCE(nc.ram_used, 0) as ram_used, + COALESCE(nc.disk_total, 0) as disk_total, + COALESCE(nc.disk_used, 0) as disk_used, + COALESCE(nc.agent_count_router, 0) as agent_count_router, + COALESCE(nc.agent_count_system, 0) as agent_count_system, + nc.last_heartbeat as metrics_heartbeat, + nc.dagi_router_url, + NULL as self_healing_status, + NULL as node_description, + true as registry_active, + NULL as last_self_registration, + (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id AND COALESCE(a.is_archived, false) = false AND a.deleted_at IS NULL) AS agents_total, + (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id AND a.status = 'online' AND COALESCE(a.is_archived, false) = false) AS agents_online, + ga.display_name AS guardian_name, + ga.public_slug AS guardian_slug, + sa.display_name AS steward_name, + sa.public_slug AS steward_slug + FROM node_cache nc + LEFT JOIN agents ga ON nc.guardian_agent_id = ga.id + LEFT JOIN agents sa ON nc.steward_agent_id = sa.id + ORDER BY nc.environment DESC, nc.node_name + """ + try: + rows = await pool.fetch(query_fallback) + except Exception as e: + logger.error(f"Fallback node_cache query also failed: {e}") + rows = [] + + result = [] + for row in rows: + data = dict(row) + # Build guardian_agent object + if data.get("guardian_agent_id"): + data["guardian_agent"] = { + "id": data.get("guardian_agent_id"), + "name": data.get("guardian_name"), + "slug": data.get("guardian_slug"), + } + else: + data["guardian_agent"] = None + # Build steward_agent object + if data.get("steward_agent_id"): + data["steward_agent"] = { + "id": data.get("steward_agent_id"), + "name": data.get("steward_name"), + "slug": data.get("steward_slug"), + } + else: + data["steward_agent"] = None + + # Build metrics object + data["metrics"] = { + "cpu_model": data.get("cpu_model"), + "cpu_cores": data.get("cpu_cores", 0), + "cpu_usage": float(data.get("cpu_usage", 0)), + "gpu_model": data.get("gpu_model"), + "gpu_vram_total": data.get("gpu_vram_total", 0), + "gpu_vram_used": data.get("gpu_vram_used", 0), + "ram_total": data.get("ram_total", 0), + "ram_used": data.get("ram_used", 0), + "disk_total": data.get("disk_total", 0), + "disk_used": data.get("disk_used", 0), + "agent_count_router": data.get("agent_count_router", 0), + "agent_count_system": data.get("agent_count_system", 0), + "dagi_router_url": data.get("dagi_router_url"), + } + + # Clean up internal fields + data.pop("guardian_name", None) + data.pop("steward_name", None) + data.pop("guardian_slug", None) + data.pop("steward_slug", None) + data.pop("cpu_model", None) + data.pop("cpu_cores", None) + data.pop("cpu_usage", None) + data.pop("gpu_model", None) + data.pop("gpu_vram_total", None) + data.pop("gpu_vram_used", None) + data.pop("ram_total", None) + data.pop("ram_used", None) + data.pop("disk_total", None) + data.pop("disk_used", None) + data.pop("agent_count_router", None) + data.pop("agent_count_system", None) + data.pop("dagi_router_url", None) + data.pop("metrics_heartbeat", None) + + result.append(data) + return result + + +async def get_node_by_id(node_id: str) -> Optional[dict]: + """Отримати ноду по ID з Guardian та Steward агентами""" + pool = await get_pool() + + query = """ + SELECT + nc.node_id, + nc.node_name AS name, + nc.hostname, + nc.roles, + nc.environment, + nc.status, + nc.gpu, + nc.last_sync AS last_heartbeat, + nc.guardian_agent_id, + nc.steward_agent_id, + (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id) AS agents_total, + (SELECT COUNT(*) FROM agents a WHERE a.node_id = nc.node_id AND a.status = 'online') AS agents_online, + -- Guardian agent info + ga.display_name AS guardian_name, + ga.kind AS guardian_kind, + ga.public_slug AS guardian_slug, + -- Steward agent info + sa.display_name AS steward_name, + sa.kind AS steward_kind, + sa.public_slug AS steward_slug + FROM node_cache nc + LEFT JOIN agents ga ON nc.guardian_agent_id = ga.id + LEFT JOIN agents sa ON nc.steward_agent_id = sa.id + WHERE nc.node_id = $1 + """ + + row = await pool.fetchrow(query, node_id) + if not row: + return None + + data = dict(row) + + # Fetch MicroDAOs where orchestrator is on this node + microdaos = await pool.fetch(""" + SELECT m.id, m.slug, m.name, COUNT(cr.id) as rooms_count + FROM microdaos m + JOIN agents a ON m.orchestrator_agent_id = a.id + LEFT JOIN city_rooms cr ON cr.microdao_id::text = m.id + WHERE a.node_id = $1 + GROUP BY m.id, m.slug, m.name + ORDER BY m.name + """, node_id) + + data["microdaos"] = [dict(m) for m in microdaos] + + # Build guardian_agent object + if data.get("guardian_agent_id"): + data["guardian_agent"] = { + "id": data.get("guardian_agent_id"), + "name": data.get("guardian_name"), + "kind": data.get("guardian_kind"), + "slug": data.get("guardian_slug"), + } + else: + data["guardian_agent"] = None + + # Build steward_agent object + if data.get("steward_agent_id"): + data["steward_agent"] = { + "id": data.get("steward_agent_id"), + "name": data.get("steward_name"), + "kind": data.get("steward_kind"), + "slug": data.get("steward_slug"), + } + else: + data["steward_agent"] = None + + # TASK 038: Dynamic discovery of Node Guardian / Steward if cache is empty + if not data["guardian_agent"] or not data["steward_agent"]: + dynamic_agents = await pool.fetch(""" + SELECT id, display_name, kind, public_slug + FROM agents + WHERE node_id = $1 + AND (kind IN ('node_guardian', 'node_steward') OR kind IN ('infra_monitor', 'infra_ops')) + AND COALESCE(is_archived, false) = false + """, node_id) + + if not data["guardian_agent"]: + # Prefer 'node_guardian', fallback to 'infra_monitor' + guardian = next((a for a in dynamic_agents if a['kind'] == 'node_guardian'), + next((a for a in dynamic_agents if a['kind'] == 'infra_monitor'), None)) + if guardian: + data["guardian_agent"] = { + "id": guardian["id"], + "name": guardian["display_name"], + "kind": guardian["kind"], + "slug": guardian["public_slug"] + } + + if not data["steward_agent"]: + # Prefer 'node_steward', fallback to 'infra_ops' + steward = next((a for a in dynamic_agents if a['kind'] == 'node_steward'), + next((a for a in dynamic_agents if a['kind'] == 'infra_ops'), None)) + if steward: + data["steward_agent"] = { + "id": steward["id"], + "name": steward["display_name"], + "kind": steward["kind"], + "slug": steward["public_slug"] + } + + # Clean up intermediate fields + for key in ["guardian_name", "guardian_kind", "guardian_slug", + "steward_name", "steward_kind", "steward_slug"]: + data.pop(key, None) + + return data + + +# ============================================================================= +# MicroDAO Visibility & Creation (Task 029) +# ============================================================================= + +async def update_microdao_visibility( + microdao_id: str, + *, + is_public: bool, + is_platform: Optional[bool] = None, +) -> Optional[dict]: + """ + Оновити налаштування видимості MicroDAO. + Returns updated MicroDAO data or None if not found. + """ + pool = await get_pool() + + set_parts = ["is_public = $2", "updated_at = NOW()"] + params = [microdao_id, is_public] + + if is_platform is not None: + params.append(is_platform) + set_parts.append(f"is_platform = ${len(params)}") + + query = f""" + UPDATE microdaos + SET {', '.join(set_parts)} + WHERE id = $1 + AND COALESCE(is_archived, false) = false + AND COALESCE(is_test, false) = false + RETURNING id, slug, name, is_public, is_platform + """ + + result = await pool.fetchrow(query, *params) + return dict(result) if result else None + + +async def create_microdao_for_agent( + orchestrator_agent_id: str, + *, + name: str, + slug: str, + description: Optional[str] = None, + make_platform: bool = False, + is_public: bool = True, + parent_microdao_id: Optional[str] = None, +) -> Optional[dict]: + """ + Створює microDAO, прив'язує його до агента-оркестратора. + + 1. INSERT новий microDAO + 2. Додати агента в microdao_agents + 3. Оновити агента: primary_microdao_id, is_orchestrator = true + 4. Повернути створений microDAO + """ + pool = await get_pool() + + import uuid + microdao_id = str(uuid.uuid4()) + + async with pool.acquire() as conn: + async with conn.transaction(): + # 1. Create microDAO + insert_dao_query = """ + INSERT INTO microdaos ( + id, slug, name, description, + orchestrator_agent_id, is_public, is_platform, + parent_microdao_id, is_active, created_at + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true, NOW()) + RETURNING id, slug, name, description, is_public, is_platform + """ + dao_row = await conn.fetchrow( + insert_dao_query, + microdao_id, slug, name, description, + orchestrator_agent_id, is_public, make_platform, + parent_microdao_id + ) + + if not dao_row: + return None + + # 2. Add agent to microdao_agents as orchestrator + insert_member_query = """ + INSERT INTO microdao_agents (microdao_id, agent_id, role, is_core, joined_at) + VALUES ($1, $2, 'orchestrator', true, NOW()) + ON CONFLICT (microdao_id, agent_id) DO UPDATE SET role = 'orchestrator', is_core = true + """ + await conn.execute(insert_member_query, microdao_id, orchestrator_agent_id) + + # 3. Update agent: set primary_microdao_id if empty, set is_orchestrator = true + # Also set public_slug if is_public, so orchestrator becomes a public citizen + update_agent_query = """ + UPDATE agents + SET is_orchestrator = true, + is_public = CASE WHEN $3 THEN true ELSE is_public END, + public_slug = CASE WHEN $3 AND (public_slug IS NULL OR public_slug = '') THEN id ELSE public_slug END, + primary_microdao_id = COALESCE(primary_microdao_id, $2), + updated_at = NOW() + WHERE id = $1 + """ + await conn.execute(update_agent_query, orchestrator_agent_id, microdao_id, is_public) + + return dict(dao_row) + + +async def get_microdao_primary_room(microdao_id: str) -> Optional[dict]: + """ + Отримати основну кімнату MicroDAO для чату. + Пріоритет: room_role='primary' → найнижчий sort_order → перша кімната. + """ + pool = await get_pool() + + query = """ + SELECT + cr.id, + cr.slug, + cr.name, + cr.matrix_room_id, + cr.microdao_id, + cr.room_role, + cr.is_public, + cr.sort_order + FROM city_rooms cr + WHERE cr.microdao_id::text = $1 + ORDER BY + CASE WHEN cr.room_role = 'primary' THEN 0 ELSE 1 END, + cr.sort_order ASC, + cr.name ASC + LIMIT 1 + """ + + row = await pool.fetchrow(query, microdao_id) + if row: + return { + "id": str(row["id"]), + "slug": row["slug"], + "name": row["name"], + "matrix_room_id": row.get("matrix_room_id"), + "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, + "room_role": row.get("room_role"), + "is_public": row.get("is_public", True), + "sort_order": row.get("sort_order", 100) + } + return None + + +async def get_microdao_rooms(microdao_id: str) -> List[dict]: + """ + Отримати всі кімнати MicroDAO, впорядковані за sort_order. + Шукає по microdao_id АБО owner_id (для нових кімнат з owner_type='microdao'). + """ + pool = await get_pool() + + query = """ + SELECT + cr.id, + cr.slug, + cr.name, + cr.matrix_room_id, + COALESCE(cr.microdao_id::text, cr.owner_id) AS microdao_id, + cr.room_role, + cr.is_public, + cr.sort_order, + cr.logo_url, + cr.banner_url, + m.slug AS microdao_slug + FROM city_rooms cr + LEFT JOIN microdaos m ON COALESCE(cr.microdao_id::text, cr.owner_id) = m.id + WHERE cr.microdao_id::text = $1 + OR (cr.owner_type = 'microdao' AND cr.owner_id = $1) + ORDER BY + CASE WHEN cr.room_role = 'primary' THEN 0 ELSE 1 END, + cr.sort_order ASC, + cr.name ASC + """ + + rows = await pool.fetch(query, microdao_id) + return [ + { + "id": str(row["id"]), + "slug": row["slug"], + "name": row["name"], + "matrix_room_id": row.get("matrix_room_id"), + "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, + "microdao_slug": row.get("microdao_slug"), + "room_role": row.get("room_role"), + "is_public": row.get("is_public", True), + "sort_order": row.get("sort_order", 100), + "logo_url": row.get("logo_url"), + "banner_url": row.get("banner_url") + } + for row in rows + ] + + +async def get_microdao_rooms_by_slug(slug: str) -> Optional[dict]: + """ + Отримати MicroDAO та всі його кімнати за slug. + """ + pool = await get_pool() + + # Get microdao first + microdao_query = """ + SELECT id, slug FROM microdaos + WHERE slug = $1 + AND COALESCE(is_archived, false) = false + AND COALESCE(is_test, false) = false + """ + microdao = await pool.fetchrow(microdao_query, slug) + if not microdao: + return None + + microdao_id = str(microdao["id"]) + rooms = await get_microdao_rooms(microdao_id) + + return { + "microdao_id": microdao_id, + "microdao_slug": microdao["slug"], + "rooms": rooms + } + + +async def attach_room_to_microdao( + microdao_id: str, + room_id: str, + room_role: Optional[str] = None, + is_public: bool = True, + sort_order: int = 100 +) -> Optional[dict]: + """ + Прив'язати існуючу кімнату до MicroDAO. + """ + pool = await get_pool() + + query = """ + UPDATE city_rooms + SET microdao_id = $1, + room_role = $2, + is_public = $3, + sort_order = $4 + WHERE id = $5 + RETURNING id, slug, name, matrix_room_id, microdao_id, room_role, is_public, sort_order, logo_url, banner_url + """ + + row = await pool.fetchrow(query, microdao_id, room_role, is_public, sort_order, room_id) + if row: + return { + "id": str(row["id"]), + "slug": row["slug"], + "name": row["name"], + "matrix_room_id": row.get("matrix_room_id"), + "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, + "room_role": row.get("room_role"), + "is_public": row.get("is_public", True), + "sort_order": row.get("sort_order", 100), + "logo_url": row.get("logo_url"), + "banner_url": row.get("banner_url") + } + return None + + +async def update_microdao_room( + microdao_id: str, + room_id: str, + room_role: Optional[str] = None, + is_public: Optional[bool] = None, + sort_order: Optional[int] = None, + set_primary: bool = False +) -> Optional[dict]: + """ + Оновити налаштування кімнати MicroDAO. + Якщо set_primary=True, скидає роль 'primary' з інших кімнат. + """ + pool = await get_pool() + + async with pool.acquire() as conn: + async with conn.transaction(): + # If setting as primary, clear previous primary + if set_primary: + await conn.execute( + """ + UPDATE city_rooms + SET room_role = NULL + WHERE microdao_id = $1 AND room_role = 'primary' + """, + microdao_id + ) + room_role = 'primary' + + # Build update query + set_parts = [] + params = [room_id, microdao_id] + param_idx = 3 + + if room_role is not None: + set_parts.append(f"room_role = ${param_idx}") + params.append(room_role) + param_idx += 1 + + if is_public is not None: + set_parts.append(f"is_public = ${param_idx}") + params.append(is_public) + param_idx += 1 + + if sort_order is not None: + set_parts.append(f"sort_order = ${param_idx}") + params.append(sort_order) + param_idx += 1 + + if not set_parts: + # Nothing to update, just return current state + row = await conn.fetchrow( + "SELECT * FROM city_rooms WHERE id = $1 AND microdao_id = $2", + room_id, microdao_id + ) + else: + query = f""" + UPDATE city_rooms + SET {', '.join(set_parts)} + WHERE id = $1 AND microdao_id = $2 + RETURNING id, slug, name, matrix_room_id, microdao_id, room_role, is_public, sort_order, logo_url, banner_url + """ + row = await conn.fetchrow(query, *params) + + if row: + return { + "id": str(row["id"]), + "slug": row["slug"], + "name": row["name"], + "matrix_room_id": row.get("matrix_room_id"), + "microdao_id": str(row["microdao_id"]) if row.get("microdao_id") else None, + "room_role": row.get("room_role"), + "is_public": row.get("is_public", True), + "sort_order": row.get("sort_order", 100), + "logo_url": row.get("logo_url"), + "banner_url": row.get("banner_url") + } + return None + + +# ============================================================================= +# TASK 044: Orchestrator Crew Team Room +# ============================================================================= + +async def create_matrix_room_for_microdao_orchestrator( + microdao_id: str, + microdao_name: str, + orchestrator_agent_id: str +) -> Optional[dict]: + """ + Викликати Matrix Gateway для створення кімнати команди оркестратора. + """ + # TODO: This should ideally be done with a proper Matrix user (e.g. app bot or the orchestrator agent itself if possible) + # For now, we'll use the system admin user logic in matrix-gateway or a specialized endpoint. + + # Since we are in repo, we don't have the user's token. We rely on matrix-gateway internal API. + async with httpx.AsyncClient(timeout=30.0) as client: + try: + # Ensure matrix room alias is unique + room_alias = f"orchestrator_team_{microdao_id[:8]}" + room_name = f"{microdao_name} — Orchestrator Team" + + # Call Matrix Gateway to create room + # Using /internal/matrix/rooms/create (assuming it exists or we reuse a similar logic) + # If not, we might need to implement it in gateway-bot. + # Let's assume we use a new endpoint or existing one. + # Actually, we can reuse POST /internal/matrix/rooms if it exists or just use bot API. + + # NOTE: In real implementation, we need to authenticate this request or ensure network security. + resp = await client.post( + f"{MATRIX_GATEWAY_URL}/internal/matrix/rooms", + json={ + "alias": room_alias, + "name": room_name, + "topic": "Private team chat for MicroDAO Orchestrator", + "preset": "private_chat", # or public_chat, but team chat usually private + "initial_state": [] + } + ) + + if resp.status_code not in (200, 201): + logger.error(f"Matrix Gateway failed to create room: {resp.text}") + return None + + data = resp.json() + return { + "room_id": data["room_id"], + "room_alias": data.get("room_alias", room_alias) + } + + except Exception as e: + logger.error(f"Failed to create matrix room via gateway: {e}") + return None + + +async def get_or_create_orchestrator_team_room(microdao_id: str) -> Optional[dict]: + """ + Знайти або створити кімнату команди оркестратора для MicroDAO. + """ + pool = await get_pool() + + # 1. Check if room exists in DB + existing_room_query = """ + SELECT + cr.id, cr.slug, cr.name, cr.matrix_room_id, cr.microdao_id, cr.room_role, cr.is_public, cr.sort_order + FROM city_rooms cr + WHERE cr.microdao_id::text = $1 AND cr.room_role = 'orchestrator_team' + LIMIT 1 + """ + room_row = await pool.fetchrow(existing_room_query, microdao_id) + + if room_row: + return dict(room_row) + + # 2. If not, fetch MicroDAO details to create one + microdao_query = """ + SELECT id, name, slug, orchestrator_agent_id + FROM microdaos + WHERE id = $1 + """ + microdao = await pool.fetchrow(microdao_query, microdao_id) + + if not microdao or not microdao["orchestrator_agent_id"]: + logger.warning(f"MicroDAO {microdao_id} not found or has no orchestrator") + return None + + # 3. Create Matrix room + matrix_info = await create_matrix_room_for_microdao_orchestrator( + microdao_id=microdao_id, + microdao_name=microdao["name"], + orchestrator_agent_id=microdao["orchestrator_agent_id"] + ) + + if not matrix_info: + logger.error("Failed to create Matrix room for orchestrator team") + # Fallback: Create DB record without Matrix ID if needed, or fail? + # Let's fail for now as Matrix ID is crucial for this feature. + return None + + # 4. Create DB record + slug = f"{microdao['slug']}-team" + # Ensure unique slug + while True: + check_slug = await pool.fetchrow("SELECT 1 FROM city_rooms WHERE slug = $1", slug) + if not check_slug: + break + slug = f"{slug}-{secrets.token_hex(2)}" + + create_query = """ + INSERT INTO city_rooms ( + id, slug, name, description, created_by, + matrix_room_id, matrix_room_alias, + microdao_id, room_role, is_public, sort_order + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + RETURNING id, slug, name, matrix_room_id, microdao_id, room_role, is_public, sort_order + """ + + room_id = f"room_city_{slug}" + + new_room = await pool.fetchrow( + create_query, + room_id, + slug, + f"{microdao['name']} Team", + "Orchestrator Team Chat", + "system", + matrix_info["room_id"], + matrix_info.get("room_alias"), + microdao_id, + "orchestrator_team", + False, # Private by default + 50 # Sort order (high priority) + ) + + return dict(new_room) + + +# ============================================================================= +# Districts Repository (DB-based, no hardcodes) +# ============================================================================= + +async def get_districts() -> List[Dict[str, Any]]: + """ + Отримати всі District-и з БД. + District = microdao з dao_type = 'district' + """ + pool = await get_pool() + query = """ + SELECT id, slug, name, description, dao_type, + orchestrator_agent_id, created_at + FROM microdaos + WHERE dao_type = 'district' + ORDER BY name + """ + rows = await pool.fetch(query) + return [dict(r) for r in rows] + + +async def get_district_by_slug(slug: str) -> Optional[Dict[str, Any]]: + """ + Отримати District за slug. + """ + pool = await get_pool() + query = """ + SELECT id, slug, name, description, dao_type, + orchestrator_agent_id, created_at + FROM microdaos + WHERE slug = $1 + AND dao_type = 'district' + """ + row = await pool.fetchrow(query, slug) + return dict(row) if row else None + + +async def get_district_lead_agent(district_id: str) -> Optional[Dict[str, Any]]: + """ + Отримати lead agent District-а. + Шукаємо спочатку role='district_lead', потім fallback на orchestrator. + """ + pool = await get_pool() + + # Try district_lead first + query = """ + SELECT a.id, a.display_name as name, a.kind, a.status, + a.avatar_url, a.gov_level, + ma.role as membership_role + FROM agents a + JOIN microdao_agents ma ON ma.agent_id = a.id + WHERE ma.microdao_id = $1 + AND ma.role = 'district_lead' + LIMIT 1 + """ + row = await pool.fetchrow(query, district_id) + + if not row: + # Fallback: orchestrator + query = """ + SELECT a.id, a.display_name as name, a.kind, a.status, + a.avatar_url, a.gov_level, + ma.role as membership_role + FROM agents a + JOIN microdao_agents ma ON ma.agent_id = a.id + WHERE ma.microdao_id = $1 + AND (ma.role = 'orchestrator' OR ma.is_core = true) + ORDER BY ma.is_core DESC + LIMIT 1 + """ + row = await pool.fetchrow(query, district_id) + + return dict(row) if row else None + + +async def get_district_core_team(district_id: str) -> List[Dict[str, Any]]: + """ + Отримати core team District-а. + """ + pool = await get_pool() + query = """ + SELECT a.id, a.display_name as name, a.kind, a.status, + a.avatar_url, a.gov_level, + ma.role as membership_role + FROM agents a + JOIN microdao_agents ma ON ma.agent_id = a.id + WHERE ma.microdao_id = $1 + AND (ma.role = 'core_team' OR ma.is_core = true) + AND ma.role != 'district_lead' + AND ma.role != 'orchestrator' + ORDER BY a.display_name + """ + rows = await pool.fetch(query, district_id) + return [dict(r) for r in rows] + + +async def get_district_agents(district_id: str) -> List[Dict[str, Any]]: + """ + Отримати всіх агентів District-а. + """ + pool = await get_pool() + query = """ + SELECT a.id, a.display_name as name, a.kind, a.status, + a.avatar_url, a.gov_level, + ma.role as membership_role, ma.is_core + FROM agents a + JOIN microdao_agents ma ON ma.agent_id = a.id + WHERE ma.microdao_id = $1 + ORDER BY + CASE ma.role + WHEN 'district_lead' THEN 0 + WHEN 'orchestrator' THEN 1 + WHEN 'core_team' THEN 2 + ELSE 3 + END, + ma.is_core DESC, + a.display_name + """ + rows = await pool.fetch(query, district_id) + return [dict(r) for r in rows] + + +async def get_district_rooms(district_slug: str) -> List[Dict[str, Any]]: + """ + Отримати кімнати District-а за slug-префіксом. + Наприклад: soul-lobby, soul-events, greenfood-lobby + """ + pool = await get_pool() + query = """ + SELECT id, slug, name, description, + matrix_room_id, matrix_room_alias, + room_role, is_public + FROM city_rooms + WHERE slug LIKE $1 + ORDER BY sort_order, name + """ + rows = await pool.fetch(query, f"{district_slug}-%") + return [dict(r) for r in rows] + + +async def get_district_nodes(district_id: str) -> List[Dict[str, Any]]: + """ + Отримати ноди District-а. + """ + pool = await get_pool() + query = """ + SELECT n.id, n.display_name as name, n.node_type as kind, + n.status, n.hostname as location, + n.guardian_agent_id, n.steward_agent_id + FROM nodes n + WHERE n.owner_microdao_id = $1 + ORDER BY n.display_name + """ + rows = await pool.fetch(query, district_id) + return [dict(r) for r in rows] + + +async def get_district_stats(district_id: str, district_slug: str) -> Dict[str, Any]: + """ + Отримати статистику District-а. + """ + pool = await get_pool() + + # Count agents + agents_count = await pool.fetchval( + "SELECT COUNT(*) FROM microdao_agents WHERE microdao_id = $1", + district_id + ) + + # Count rooms + rooms_count = await pool.fetchval( + "SELECT COUNT(*) FROM city_rooms WHERE slug LIKE $1", + f"{district_slug}-%" + ) + + # Count nodes + nodes_count = await pool.fetchval( + "SELECT COUNT(*) FROM nodes WHERE owner_microdao_id = $1", + district_id + ) + + return { + "agents_count": agents_count or 0, + "rooms_count": rooms_count or 0, + "nodes_count": nodes_count or 0 + } + + +# ============================================================================= +# DAGI Agent Audit Repository +# ============================================================================= + +async def get_agents_by_node_for_audit(node_id: str) -> List[Dict[str, Any]]: + """ + Отримати агентів для DAGI audit по node_id. + """ + pool = await get_pool() + + query = """ + SELECT + id::text, + external_id, + COALESCE(name, display_name) as name, + kind, + node_id, + status, + COALESCE(is_active, true) as is_active, + last_seen_at, + dagi_status, + created_at, + updated_at + FROM agents + WHERE node_id = $1 + AND COALESCE(is_archived, false) = false + AND COALESCE(is_test, false) = false + AND deleted_at IS NULL + ORDER BY name + """ + + rows = await pool.fetch(query, node_id) + + return [ + { + "id": row["id"], + "external_id": row["external_id"], + "name": row["name"], + "kind": row["kind"], + "node_id": row["node_id"], + "status": row["status"], + "is_active": row["is_active"], + "last_seen_at": row["last_seen_at"].isoformat() if row["last_seen_at"] else None, + "dagi_status": row["dagi_status"], + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None + } + for row in rows + ] + + +async def get_all_agents_for_audit() -> List[Dict[str, Any]]: + """ + Отримати всіх активних агентів для DAGI audit. + """ + pool = await get_pool() + + query = """ + SELECT + id::text, + external_id, + COALESCE(name, display_name) as name, + kind, + node_id, + status, + COALESCE(is_active, true) as is_active, + last_seen_at, + dagi_status, + created_at, + updated_at + FROM agents + WHERE COALESCE(is_archived, false) = false + AND COALESCE(is_test, false) = false + AND deleted_at IS NULL + ORDER BY name + """ + + rows = await pool.fetch(query) + + return [ + { + "id": row["id"], + "external_id": row["external_id"], + "name": row["name"], + "kind": row["kind"], + "node_id": row["node_id"], + "status": row["status"], + "is_active": row["is_active"], + "last_seen_at": row["last_seen_at"].isoformat() if row["last_seen_at"] else None, + "dagi_status": row["dagi_status"], + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None + } + for row in rows + ] + + +async def update_agents_dagi_status( + agent_ids: List[str], + status: str, + update_last_seen: bool = False +) -> int: + """ + Оновити dagi_status для групи агентів. + Повертає кількість оновлених записів. + """ + if not agent_ids: + return 0 + + pool = await get_pool() + + if update_last_seen: + query = """ + UPDATE agents + SET dagi_status = $2, + last_seen_at = NOW(), + updated_at = NOW() + WHERE id = ANY($1::uuid[]) + """ + else: + query = """ + UPDATE agents + SET dagi_status = $2, + updated_at = NOW() + WHERE id = ANY($1::uuid[]) + """ + + result = await pool.execute(query, agent_ids, status) + # asyncpg returns "UPDATE N" + return int(result.split(" ")[-1]) + + +async def save_dagi_audit_report( + node_id: str, + report_data: Dict[str, Any], + triggered_by: str = "api" +) -> Dict[str, Any]: + """ + Зберегти звіт DAGI audit. + """ + pool = await get_pool() + + import json + + summary = report_data.get("summary", {}) + + row = await pool.fetchrow(""" + INSERT INTO dagi_audit_reports ( + node_id, + router_total, + db_total, + active_count, + phantom_count, + stale_count, + report_data, + triggered_by + ) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8) + RETURNING id, node_id, timestamp, router_total, db_total, + active_count, phantom_count, stale_count, triggered_by + """, + node_id, + summary.get("router_total", 0), + summary.get("db_total", 0), + summary.get("active_count", 0), + summary.get("phantom_count", 0), + summary.get("stale_count", 0), + json.dumps(report_data), + triggered_by + ) + + return { + "id": str(row["id"]), + "node_id": row["node_id"], + "timestamp": row["timestamp"].isoformat(), + "router_total": row["router_total"], + "db_total": row["db_total"], + "active_count": row["active_count"], + "phantom_count": row["phantom_count"], + "stale_count": row["stale_count"], + "triggered_by": row["triggered_by"] + } + + +async def get_latest_dagi_audit(node_id: str) -> Optional[Dict[str, Any]]: + """ + Отримати останній DAGI audit звіт для ноди. + """ + pool = await get_pool() + + row = await pool.fetchrow(""" + SELECT id, node_id, timestamp, router_total, db_total, + active_count, phantom_count, stale_count, + report_data, triggered_by + FROM dagi_audit_reports + WHERE node_id = $1 + ORDER BY timestamp DESC + LIMIT 1 + """, node_id) + + if not row: + return None + + return { + "id": str(row["id"]), + "node_id": row["node_id"], + "timestamp": row["timestamp"].isoformat(), + "router_total": row["router_total"], + "db_total": row["db_total"], + "active_count": row["active_count"], + "phantom_count": row["phantom_count"], + "stale_count": row["stale_count"], + "report_data": row["report_data"], + "triggered_by": row["triggered_by"] + } + + +async def get_dagi_audit_history( + node_id: str, + limit: int = 10 +) -> List[Dict[str, Any]]: + """ + Отримати історію DAGI audit звітів для ноди. + """ + pool = await get_pool() + + rows = await pool.fetch(""" + SELECT id, node_id, timestamp, router_total, db_total, + active_count, phantom_count, stale_count, triggered_by + FROM dagi_audit_reports + WHERE node_id = $1 + ORDER BY timestamp DESC + LIMIT $2 + """, node_id, limit) + + return [ + { + "id": str(row["id"]), + "node_id": row["node_id"], + "timestamp": row["timestamp"].isoformat(), + "router_total": row["router_total"], + "db_total": row["db_total"], + "active_count": row["active_count"], + "phantom_count": row["phantom_count"], + "stale_count": row["stale_count"], + "triggered_by": row["triggered_by"] + } + for row in rows + ] + + +# ============================================================================= +# Node Metrics Repository +# ============================================================================= + +async def get_node_metrics_current(node_id: str) -> Optional[Dict[str, Any]]: + """ + Отримати поточні метрики ноди. + """ + pool = await get_pool() + + row = await pool.fetchrow(""" + SELECT + node_id, + node_name, + hostname, + status, + roles, + environment, + cpu_model, + cpu_cores, + COALESCE(cpu_usage, 0) as cpu_usage, + gpu_model, + COALESCE(gpu_vram_total, 0) as gpu_vram_total, + COALESCE(gpu_vram_used, 0) as gpu_vram_used, + COALESCE(ram_total, 0) as ram_total, + COALESCE(ram_used, 0) as ram_used, + COALESCE(disk_total, 0) as disk_total, + COALESCE(disk_used, 0) as disk_used, + COALESCE(agent_count_router, 0) as agent_count_router, + COALESCE(agent_count_system, 0) as agent_count_system, + last_heartbeat, + dagi_router_url, + COALESCE(swapper_healthy, false) as swapper_healthy, + COALESCE(swapper_models_loaded, 0) as swapper_models_loaded, + COALESCE(swapper_models_total, 0) as swapper_models_total, + updated_at + FROM node_cache + WHERE node_id = $1 + """, node_id) + + if not row: + return None + # Count agents from database agent_count = await pool.fetchval(""" SELECT COUNT(*) @@ -8,9 +3301,33 @@ AND deleted_at IS NULL """, node_id) - result = dict(row) - result["agents_total"] = agent_count or row["agent_count_router"] - + result = { + "node_id": row["node_id"], + "node_name": row["node_name"], + "hostname": row["hostname"], + "status": row["status"], + "roles": row["roles"] or [], + "environment": row["environment"], + "cpu_model": row["cpu_model"], + "cpu_cores": row["cpu_cores"] or 0, + "cpu_usage": float(row["cpu_usage"]) if row["cpu_usage"] else 0.0, + "gpu_model": row["gpu_model"], + "gpu_memory_total": row["gpu_vram_total"] or 0, + "gpu_memory_used": row["gpu_vram_used"] or 0, + "ram_total": row["ram_total"] or 0, + "ram_used": row["ram_used"] or 0, + "disk_total": row["disk_total"] or 0, + "disk_used": row["disk_used"] or 0, + "agent_count_router": row["agent_count_router"] or 0, + "agent_count_system": agent_count or 0, + "dagi_router_url": row["dagi_router_url"], + "swapper_healthy": row["swapper_healthy"], + "swapper_models_loaded": row["swapper_models_loaded"], + "swapper_models_total": row["swapper_models_total"], + "last_heartbeat": row["last_heartbeat"].isoformat() if row["last_heartbeat"] else None, + "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None + } + # Add GPU info for compatibility if row["gpu_model"]: result["gpu_info"] = f"{row['gpu_model']} ({row['gpu_vram_total']}MB)" @@ -49,3 +3366,652 @@ async def get_node_metrics(node_id: str) -> Optional[Dict[str, Any]]: result["swapper_state"] = {} return result + + +async def update_node_metrics( + node_id: str, + metrics: Dict[str, Any] +) -> bool: + """ + Оновити метрики ноди. + """ + pool = await get_pool() + + result = await pool.execute(""" + UPDATE node_cache SET + cpu_usage = COALESCE($2, cpu_usage), + gpu_vram_used = COALESCE($3, gpu_vram_used), + ram_used = COALESCE($4, ram_used), + disk_used = COALESCE($5, disk_used), + agent_count_router = COALESCE($6, agent_count_router), + agent_count_system = COALESCE($7, agent_count_system), + last_heartbeat = NOW(), + updated_at = NOW() + WHERE node_id = $1 + """, + node_id, + metrics.get("cpu_usage"), + metrics.get("gpu_vram_used"), + metrics.get("ram_used"), + metrics.get("disk_used"), + metrics.get("agent_count_router"), + metrics.get("agent_count_system") + ) + + return "UPDATE 1" in result + + +# ============================================================================= +# DAGI Router Agents Repository +# ============================================================================= + +async def get_dagi_router_agents_for_node(node_id: str) -> Dict[str, Any]: + """ + Отримати агентів DAGI Router для Node Cabinet таблиці. + Поєднує дані з audit report та agents table. + """ + pool = await get_pool() + + # Отримати останній audit + audit = await get_latest_dagi_audit(node_id) + + # Отримати метрики ноди для GPU/CPU info + node_metrics = await get_node_metrics_current(node_id) + + # Отримати всіх агентів з БД для цієї ноди + db_agents_rows = await pool.fetch(""" + SELECT + a.id::text, + a.external_id, + COALESCE(a.name, a.display_name) as name, + a.kind, + a.status, + a.node_id, + a.public_slug, + a.dagi_status, + a.last_seen_at, + a.is_public + FROM agents a + WHERE COALESCE(a.is_archived, false) = false + AND COALESCE(a.is_test, false) = false + AND a.deleted_at IS NULL + ORDER BY a.display_name + """) + + # Map db agents by normalized name and external_id + db_agents_map = {} + for row in db_agents_rows: + db_agents_map[row["id"]] = dict(row) + if row["external_id"]: + ext_id = row["external_id"].split(":")[-1].lower() if ":" in row["external_id"] else row["external_id"].lower() + db_agents_map[ext_id] = dict(row) + name_norm = row["name"].lower().replace(" ", "").replace("-", "").replace("_", "") if row["name"] else "" + if name_norm: + db_agents_map[name_norm] = dict(row) + + # Формуємо уніфікований список агентів + agents = [] + active_count = 0 + phantom_count = 0 + stale_count = 0 + + if audit and audit.get("report_data"): + report = audit["report_data"] + + # Active agents + for a in report.get("active_agents", []): + db_agent = db_agents_map.get(a.get("db_id")) + agents.append({ + "id": a.get("db_id") or a.get("router_id"), + "name": a.get("db_name") or a.get("router_name"), + "role": db_agent.get("kind") if db_agent else None, + "status": "active", + "node_id": node_id, + "models": [], # TODO: можна додати з router-config + "gpu": node_metrics.get("gpu_model") if node_metrics else None, + "cpu": f"{node_metrics.get('cpu_cores')} cores" if node_metrics else None, + "last_seen_at": db_agent.get("last_seen_at").isoformat() if db_agent and db_agent.get("last_seen_at") else None, + "has_cabinet": bool(db_agent and db_agent.get("public_slug")), + "cabinet_slug": db_agent.get("public_slug") if db_agent else None + }) + active_count += 1 + + # Phantom agents + for a in report.get("phantom_agents", []): + agents.append({ + "id": a.get("router_id"), + "name": a.get("router_name"), + "role": None, + "status": "phantom", + "node_id": node_id, + "models": [], + "gpu": node_metrics.get("gpu_model") if node_metrics else None, + "cpu": f"{node_metrics.get('cpu_cores')} cores" if node_metrics else None, + "last_seen_at": None, + "has_cabinet": False, + "cabinet_slug": None, + "description": a.get("description") + }) + phantom_count += 1 + + # Stale agents + for a in report.get("stale_agents", []): + db_agent = db_agents_map.get(a.get("db_id")) + agents.append({ + "id": a.get("db_id"), + "name": a.get("db_name"), + "role": db_agent.get("kind") if db_agent else a.get("kind"), + "status": "stale", + "node_id": node_id, + "models": [], + "gpu": node_metrics.get("gpu_model") if node_metrics else None, + "cpu": f"{node_metrics.get('cpu_cores')} cores" if node_metrics else None, + "last_seen_at": db_agent.get("last_seen_at").isoformat() if db_agent and db_agent.get("last_seen_at") else None, + "has_cabinet": bool(db_agent and db_agent.get("public_slug")), + "cabinet_slug": db_agent.get("public_slug") if db_agent else None + }) + stale_count += 1 + + # Check prompts status for all agents + agent_ids = [a["id"] for a in agents if a.get("id")] + prompts_status = await check_agents_prompts_status(agent_ids) if agent_ids else {} + + # Add has_prompts to each agent + for agent in agents: + agent["has_prompts"] = prompts_status.get(agent.get("id"), False) + + return { + "node_id": node_id, + "last_audit_at": audit.get("timestamp") if audit else None, + "summary": { + "active": active_count, + "phantom": phantom_count, + "stale": stale_count, + "router_total": audit.get("router_total", 0) if audit else 0, + "system_total": audit.get("db_total", 0) if audit else len(db_agents_rows) + }, + "agents": agents + } + + +async def sync_phantom_agents( + node_id: str, + agent_ids: List[str], + router_config: Dict[str, Any] +) -> List[Dict[str, Any]]: + """ + Синхронізувати phantom агентів (створити в БД). + """ + pool = await get_pool() + created = [] + + agents_config = router_config.get("agents", {}) + + for agent_id in agent_ids: + if agent_id not in agents_config: + continue + + agent_data = agents_config[agent_id] + + # Створити агента в БД + new_id = str(uuid.uuid4()) + + try: + row = await pool.fetchrow(""" + INSERT INTO agents ( + id, external_id, name, display_name, kind, + status, node_id, dagi_status, last_seen_at, + is_public, public_slug, created_at, updated_at + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, 'active', NOW(), true, $8, NOW(), NOW()) + ON CONFLICT (external_id) DO UPDATE SET + dagi_status = 'active', + last_seen_at = NOW(), + updated_at = NOW() + RETURNING id::text, name, external_id + """, + new_id, + f"agent:{agent_id}", + agent_id, + agent_id.replace("_", " ").title(), + "ai_agent", + "online", + node_id, + agent_id + ) + + if row: + created.append({ + "id": row["id"], + "name": row["name"], + "external_id": row["external_id"] + }) + except Exception as e: + print(f"Error creating agent {agent_id}: {e}") + + return created + + +async def mark_stale_agents(agent_ids: List[str]) -> int: + """ + Позначити агентів як stale. + """ + if not agent_ids: + return 0 + + pool = await get_pool() + + result = await pool.execute(""" + UPDATE agents + SET dagi_status = 'stale', + updated_at = NOW() + WHERE id = ANY($1::uuid[]) + """, agent_ids) + + return int(result.split(" ")[-1]) + + +async def get_node_agents(node_id: str) -> List[Dict[str, Any]]: + """ + Отримати всіх агентів ноди (Guardian, Steward, runtime agents). + """ + pool = await get_pool() + + query = """ + SELECT + a.id, + a.external_id, + COALESCE(a.display_name, a.name) as display_name, + a.kind, + a.status, + a.node_id, + a.public_slug, + a.dagi_status, + a.last_seen_at, + COALESCE(a.is_node_guardian, false) as is_node_guardian, + COALESCE(a.is_node_steward, false) as is_node_steward + FROM agents a + WHERE a.node_id = $1 + AND COALESCE(a.is_archived, false) = false + AND COALESCE(a.is_test, false) = false + AND a.deleted_at IS NULL + ORDER BY + CASE + WHEN a.kind = 'node_guardian' OR a.is_node_guardian THEN 1 + WHEN a.kind = 'node_steward' OR a.is_node_steward THEN 2 + ELSE 3 + END, + a.display_name + """ + + rows = await pool.fetch(query, node_id) + return [dict(row) for row in rows] + + +# ============================================================================== +# Node Self-Registration & Self-Healing +# ============================================================================== + +async def node_self_register( + node_id: str, + name: str, + hostname: Optional[str] = None, + environment: str = "development", + roles: Optional[List[str]] = None, + description: Optional[str] = None +) -> Dict[str, Any]: + """ + Самореєстрація ноди. Викликається з Node Bootstrap або Node Guardian. + + Якщо нода вже існує — оновлює, інакше — створює. + Також забезпечує наявність запису в node_cache. + """ + pool = await get_pool() + roles = roles or [] + + try: + # Використати SQL функцію для атомарної операції + result = await pool.fetchval(""" + SELECT fn_node_self_register($1, $2, $3, $4, $5) + """, node_id, name, hostname, environment, roles) + + if result: + import json + return json.loads(result) + except Exception as e: + # Fallback якщо функція не існує (ще не запущена міграція) + logger.warning(f"fn_node_self_register not available, using fallback: {e}") + + # Fallback: пряма вставка/оновлення + try: + # Check if exists + existing = await pool.fetchval( + "SELECT id FROM node_registry WHERE id = $1", + node_id + ) + is_new = existing is None + + if is_new: + await pool.execute(""" + INSERT INTO node_registry (id, name, hostname, environment, roles, description, is_active, registered_at, updated_at, last_self_registration, self_registration_count) + VALUES ($1, $2, $3, $4, $5, $6, true, NOW(), NOW(), NOW(), 1) + """, node_id, name, hostname, environment, roles, description) + else: + await pool.execute(""" + UPDATE node_registry SET + name = COALESCE(NULLIF($2, ''), name), + hostname = COALESCE($3, hostname), + environment = COALESCE(NULLIF($4, ''), environment), + roles = CASE WHEN array_length($5::text[], 1) > 0 THEN $5 ELSE roles END, + description = COALESCE($6, description), + is_active = true, + updated_at = NOW(), + last_self_registration = NOW(), + self_registration_count = COALESCE(self_registration_count, 0) + 1 + WHERE id = $1 + """, node_id, name, hostname, environment, roles, description) + + # Ensure node_cache entry + await pool.execute(""" + INSERT INTO node_cache (node_id, last_heartbeat, self_healing_status) + VALUES ($1, NOW(), 'healthy') + ON CONFLICT (node_id) DO UPDATE SET + last_heartbeat = NOW(), + self_healing_status = 'healthy' + """, node_id) + + return { + "success": True, + "node_id": node_id, + "is_new": is_new, + "message": "Node registered" if is_new else "Node updated" + } + except Exception as e: + # Ultimate fallback: just update node_cache + logger.warning(f"node_registry insert failed, updating node_cache: {e}") + try: + await pool.execute(""" + INSERT INTO node_cache (node_id, node_name, hostname, environment, roles, last_heartbeat) + VALUES ($1, $2, $3, $4, $5, NOW()) + ON CONFLICT (node_id) DO UPDATE SET + node_name = COALESCE(NULLIF($2, ''), node_cache.node_name), + hostname = COALESCE($3, node_cache.hostname), + environment = COALESCE(NULLIF($4, ''), node_cache.environment), + roles = CASE WHEN array_length($5::text[], 1) > 0 THEN $5 ELSE node_cache.roles END, + last_heartbeat = NOW() + """, node_id, name, hostname, environment, roles) + + return { + "success": True, + "node_id": node_id, + "is_new": False, + "message": "Node updated (fallback to node_cache)" + } + except Exception as fallback_error: + logger.error(f"Failed to register node {node_id}: {fallback_error}") + return { + "success": False, + "node_id": node_id, + "error": str(fallback_error) + } + + +async def node_heartbeat( + node_id: str, + metrics: Optional[Dict[str, Any]] = None +) -> Dict[str, Any]: + """ + Heartbeat ноди з оновленням метрик. + + Повертає should_self_register=True якщо нода не зареєстрована. + """ + pool = await get_pool() + metrics = metrics or {} + + try: + # Використати SQL функцію + result = await pool.fetchval(""" + SELECT fn_node_heartbeat($1, $2) + """, node_id, json.dumps(metrics) if metrics else None) + + if result: + return json.loads(result) + except Exception as e: + logger.warning(f"fn_node_heartbeat not available, using fallback: {e}") + + # Fallback + try: + # Check if registered + registered = await pool.fetchval(""" + SELECT EXISTS(SELECT 1 FROM node_registry WHERE id = $1 AND is_active = true) + """, node_id) + + if not registered: + # Check node_cache as fallback + cache_exists = await pool.fetchval(""" + SELECT EXISTS(SELECT 1 FROM node_cache WHERE node_id = $1) + """, node_id) + + if not cache_exists: + return { + "success": False, + "error": "Node not registered", + "should_self_register": True + } + + # Update heartbeat + swapper_state = metrics.get("swapper_state") + swapper_state_json = json.dumps(swapper_state) if swapper_state else None + + await pool.execute(""" + UPDATE node_cache SET + last_heartbeat = NOW(), + self_healing_status = 'healthy', + cpu_usage = COALESCE($2::numeric, cpu_usage), + gpu_vram_used = COALESCE($3::integer, gpu_vram_used), + ram_used = COALESCE($4::integer, ram_used), + disk_used = COALESCE($5::integer, disk_used), + agent_count_router = COALESCE($6::integer, agent_count_router), + agent_count_system = COALESCE($7::integer, agent_count_system), + dagi_router_url = COALESCE($8, dagi_router_url), + swapper_healthy = COALESCE($9::boolean, swapper_healthy), + swapper_models_loaded = COALESCE($10::integer, swapper_models_loaded), + swapper_models_total = COALESCE($11::integer, swapper_models_total), + swapper_state = COALESCE($12::jsonb, swapper_state) + WHERE node_id = $1 + """, + node_id, + metrics.get("cpu_usage"), + metrics.get("gpu_vram_used"), + metrics.get("ram_used"), + metrics.get("disk_used"), + metrics.get("agent_count_router"), + metrics.get("agent_count_system"), + metrics.get("dagi_router_url"), + metrics.get("swapper_healthy"), + metrics.get("swapper_models_loaded"), + metrics.get("swapper_models_total"), + swapper_state_json + ) + + return { + "success": True, + "node_id": node_id, + "heartbeat_at": datetime.now(timezone.utc).isoformat() + } + except Exception as e: + logger.error(f"Heartbeat failed for {node_id}: {e}") + return { + "success": False, + "error": str(e) + } + + +async def check_node_in_directory(node_id: str) -> bool: + """ + Перевірити чи нода видима в Node Directory. + Використовується Node Guardian для self-healing. + """ + pool = await get_pool() + + try: + # Check node_registry first + exists = await pool.fetchval(""" + SELECT EXISTS( + SELECT 1 FROM node_registry + WHERE id = $1 AND is_active = true + ) + """, node_id) + return bool(exists) + except Exception: + # Fallback to node_cache + try: + exists = await pool.fetchval(""" + SELECT EXISTS(SELECT 1 FROM node_cache WHERE node_id = $1) + """, node_id) + return bool(exists) + except Exception: + return False + + +async def get_node_self_healing_status(node_id: str) -> Dict[str, Any]: + """ + Отримати статус self-healing для ноди. + """ + pool = await get_pool() + + try: + row = await pool.fetchrow(""" + SELECT + nr.id, + nr.name, + nr.is_active, + nr.last_self_registration, + nr.self_registration_count, + nc.self_healing_status, + nc.self_healing_last_check, + nc.self_healing_errors, + nc.last_heartbeat, + nc.agent_count_router, + nc.agent_count_system, + nc.guardian_agent_id, + nc.steward_agent_id + FROM node_registry nr + LEFT JOIN node_cache nc ON nc.node_id = nr.id + WHERE nr.id = $1 + """, node_id) + + if not row: + return { + "node_id": node_id, + "registered": False, + "status": "not_found" + } + + return { + "node_id": node_id, + "registered": True, + "is_active": row["is_active"], + "name": row["name"], + "self_healing_status": row["self_healing_status"] or "unknown", + "last_heartbeat": row["last_heartbeat"].isoformat() if row["last_heartbeat"] else None, + "last_self_registration": row["last_self_registration"].isoformat() if row["last_self_registration"] else None, + "self_registration_count": row["self_registration_count"] or 0, + "agent_count_router": row["agent_count_router"] or 0, + "agent_count_system": row["agent_count_system"] or 0, + "has_guardian": bool(row["guardian_agent_id"]), + "has_steward": bool(row["steward_agent_id"]), + "errors": row["self_healing_errors"] or [] + } + except Exception as e: + logger.error(f"Failed to get self-healing status for {node_id}: {e}") + return { + "node_id": node_id, + "registered": False, + "status": "error", + "error": str(e) + } + + +async def update_node_self_healing_status( + node_id: str, + status: str, + error: Optional[str] = None +) -> bool: + """ + Оновити статус self-healing для ноди. + """ + pool = await get_pool() + + try: + if error: + await pool.execute(""" + UPDATE node_cache SET + self_healing_status = $2, + self_healing_last_check = NOW(), + self_healing_errors = COALESCE(self_healing_errors, '[]'::jsonb) || jsonb_build_object( + 'timestamp', NOW(), + 'error', $3 + ) + WHERE node_id = $1 + """, node_id, status, error) + else: + await pool.execute(""" + UPDATE node_cache SET + self_healing_status = $2, + self_healing_last_check = NOW() + WHERE node_id = $1 + """, node_id, status) + + return True + except Exception as e: + logger.error(f"Failed to update self-healing status for {node_id}: {e}") + return False + + +async def get_nodes_needing_healing() -> List[Dict[str, Any]]: + """ + Отримати список нод, які потребують self-healing. + + Критерії: + - heartbeat старший за 10 хвилин + - agent_count_router = 0 + - немає guardian_agent_id + - self_healing_status = 'error' + """ + pool = await get_pool() + + try: + rows = await pool.fetch(""" + SELECT + nr.id as node_id, + nr.name, + nc.last_heartbeat, + nc.agent_count_router, + nc.agent_count_system, + nc.guardian_agent_id, + nc.steward_agent_id, + nc.self_healing_status, + CASE + WHEN nc.last_heartbeat < NOW() - INTERVAL '10 minutes' THEN 'stale_heartbeat' + WHEN nc.agent_count_router = 0 OR nc.agent_count_router IS NULL THEN 'no_router_agents' + WHEN nc.guardian_agent_id IS NULL THEN 'no_guardian' + WHEN nc.self_healing_status = 'error' THEN 'previous_error' + ELSE 'unknown' + END as healing_reason + FROM node_registry nr + LEFT JOIN node_cache nc ON nc.node_id = nr.id + WHERE nr.is_active = true + AND ( + nc.last_heartbeat < NOW() - INTERVAL '10 minutes' + OR nc.agent_count_router = 0 + OR nc.agent_count_router IS NULL + OR nc.guardian_agent_id IS NULL + OR nc.self_healing_status = 'error' + ) + """) + + return [dict(row) for row in rows] + except Exception as e: + logger.error(f"Failed to get nodes needing healing: {e}") + return []