Files
microdao-daarion/services/city-service/routes_city.py

4727 lines
173 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
City Backend API Routes
"""
from fastapi import APIRouter, HTTPException, Depends, Body, Header, Query, Request, UploadFile, File, Form
from pydantic import BaseModel
from typing import List, Optional, Dict
from datetime import datetime, timezone
import logging
import httpx
import os
import io
import uuid
from PIL import Image
import shutil
# MinIO assets client (optional): when the import fails, MINIO_AVAILABLE is
# False and upload code is expected to fall back to local storage.
try:
    from lib.assets_client import upload_asset as minio_upload_asset
    MINIO_AVAILABLE = True
except ImportError as e:
    MINIO_AVAILABLE = False
    # The module-level `logger` is only assigned further down in this file, so
    # referencing it here would raise NameError during import. Fetch the same
    # named logger directly instead.
    logging.getLogger(__name__).warning(
        f"MinIO client not available, falling back to local storage: {e}"
    )
from models_city import (
CityRoomRead,
CityRoomCreate,
CityRoomDetail,
CityRoomMessageRead,
CityRoomMessageCreate,
CityFeedEventRead,
CityMapRoom,
CityMapConfig,
CityMapResponse,
AgentRead,
AgentPresence,
AgentSummary,
MicrodaoBadge,
HomeNodeView,
NodeProfile,
PublicCitizenSummary,
PublicCitizenProfile,
CitizenInteractionInfo,
CitizenAskRequest,
CitizenAskResponse,
AgentMicrodaoMembership,
MicrodaoSummary,
MicrodaoDetail,
MicrodaoAgentView,
MicrodaoChannelView,
MicrodaoCitizenView,
MicrodaoOption,
CityRoomSummary,
MicrodaoRoomsList,
MicrodaoRoomUpdate,
AttachExistingRoomRequest,
SwapperModel,
NodeSwapperDetail,
CreateAgentRequest,
CreateAgentResponse,
DeleteAgentResponse,
CreateMicrodaoRoomRequest
)
import repo_city
from common.redis_client import PresenceRedis, get_redis
from matrix_client import (
create_matrix_room,
find_matrix_room_by_alias,
join_user_to_room,
send_message_to_room,
get_room_messages,
check_matrix_gateway_health,
ensure_room_has_matrix
)
from dagi_router_client import get_dagi_router_client, DagiRouterClient
# Module-level logger used by the route handlers in this file.
logger = logging.getLogger(__name__)
# JWT validation (simplified for MVP)
# Base URLs of companion services; each one is read from the environment
# variable of the same name and falls back to the default given here.
AUTH_SERVICE_URL = os.getenv("AUTH_SERVICE_URL", "http://daarion-auth:7020")
MATRIX_GATEWAY_URL = os.getenv("MATRIX_GATEWAY_URL", "http://daarion-matrix-gateway:7025")
# Helper for image processing
def process_image(image_bytes: bytes, max_size: int = 1024, force_square: bool = False) -> tuple[bytes, bytes]:
    """
    Normalise an uploaded image and build a thumbnail from it.

    Steps:
    1. Decode the bytes with Pillow and convert to RGBA or RGB.
    2. Optionally centre-crop to a square (avatars/logos).
    3. Shrink to fit within ``max_size`` x ``max_size`` (aspect ratio preserved).
    4. Encode as PNG, then shrink further to fit 128x128 for the thumbnail.

    Args:
        image_bytes: Raw bytes of the source image.
        max_size: Maximum width/height of the processed image, in pixels.
        force_square: When True, centre-crop to a square before resizing.

    Returns:
        ``(processed_bytes, thumb_bytes)``, both PNG-encoded.

    Raises:
        Whatever Pillow raises when the bytes cannot be decoded or encoded.
    """
    with Image.open(io.BytesIO(image_bytes)) as img:
        # Palette, CMYK and grey+alpha images go to RGBA; everything else that
        # is not already RGBA goes to RGB.
        if img.mode in ('P', 'CMYK', 'LA'):
            img = img.convert('RGBA')
        elif img.mode != 'RGBA':
            img = img.convert('RGB')

        # Force square crop if needed (for avatars/logos)
        if force_square:
            min_dim = min(img.width, img.height)
            # Integer offsets: with float offsets Pillow rounds each box
            # coordinate on its own, so an odd size difference (e.g. 102x101)
            # could produce a crop that is one pixel off from square.
            left = (img.width - min_dim) // 2
            top = (img.height - min_dim) // 2
            img = img.crop((left, top, left + min_dim, top + min_dim))

        # Resize to fit within max_size (preserve aspect ratio)
        if img.width > max_size or img.height > max_size:
            img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

        # Save processed
        processed_io = io.BytesIO()
        img.save(processed_io, format='PNG', optimize=True)
        processed_bytes = processed_io.getvalue()

        # Thumbnail: thumbnail() shrinks in place, which is safe here because
        # the full-size PNG has already been encoded above.
        img.thumbnail((128, 128))
        thumb_io = io.BytesIO()
        img.save(thumb_io, format='PNG', optimize=True)
        thumb_bytes = thumb_io.getvalue()

    return processed_bytes, thumb_bytes
# Routers exposed by this module; handlers below attach to one of these three.
# Routes registered on `router` are served under the /city prefix.
router = APIRouter(prefix="/city", tags=["city"])
# Routes registered on `public_router` are served under the /public prefix.
public_router = APIRouter(prefix="/public", tags=["public"])
# Routes registered on `api_router` are served under the /api/v1 prefix.
api_router = APIRouter(prefix="/api/v1", tags=["api_v1"])
class MicrodaoMembershipPayload(BaseModel):
    # Request body of PUT /agents/{agent_id}/microdao-membership; the fields
    # are forwarded to repo_city.upsert_agent_microdao_membership.
    microdao_id: str  # MicroDAO to attach the agent to
    role: Optional[str] = None  # optional role; omitted -> None
    is_core: bool = False  # forwarded as the `is_core` flag; defaults to False
# =============================================================================
# Agents API (for Agent Console)
# =============================================================================
@public_router.get("/agents")
async def list_agents(
    kind: Optional[str] = Query(None, description="Filter by agent kind"),
    node_id: Optional[str] = Query(None, description="Filter by node_id"),
    microdao_id: Optional[str] = Query(None, description="Filter by microDAO id"),
    is_public: Optional[bool] = Query(None, description="Filter by public status"),
    visibility_scope: Optional[str] = Query(None, description="Filter by visibility: global, microdao, private"),
    include_system: bool = Query(True, description="Include system agents"),
    limit: int = Query(100, le=200),
    offset: int = Query(0, ge=0)
):
    """
    List all agents for the Agent Console (unified API).

    Forwards the filters and paging arguments to
    ``repo_city.list_agent_summaries`` and converts each returned record into
    an ``AgentSummary``.

    Returns:
        ``{"items": [AgentSummary, ...], "total": total}``.

    Raises:
        HTTPException(500): on any failure while loading or converting agents.
    """
    try:
        agent_rows, total = await repo_city.list_agent_summaries(
            node_id=node_id,
            microdao_id=microdao_id,
            is_public=is_public,
            visibility_scope=visibility_scope,
            # The repository takes a list of kinds; this endpoint filters by at most one.
            kinds=[kind] if kind else None,
            include_system=include_system,
            limit=limit,
            offset=offset,
        )

        items: List[AgentSummary] = []
        for row in agent_rows:
            # Home node view is only built when the record carries node data.
            node_info = row.get("home_node")
            home_node = None
            if node_info:
                home_node = HomeNodeView(
                    id=node_info.get("id"),
                    name=node_info.get("name"),
                    hostname=node_info.get("hostname"),
                    roles=node_info.get("roles", []),
                    environment=node_info.get("environment"),
                )

            # One badge per MicroDAO entry attached to the agent.
            badges = []
            for entry in row.get("microdaos", []):
                badges.append(
                    MicrodaoBadge(
                        id=entry.get("id", ""),
                        name=entry.get("name", ""),
                        slug=entry.get("slug"),
                        role=entry.get("role"),
                    )
                )

            summary = AgentSummary(
                id=row["id"],
                slug=row.get("slug"),
                display_name=row["display_name"],
                title=row.get("title"),
                tagline=row.get("tagline"),
                kind=row.get("kind", "assistant"),
                avatar_url=row.get("avatar_url"),
                status=row.get("status", "offline"),
                node_id=row.get("node_id"),
                node_label=row.get("node_label"),
                home_node=home_node,
                # Governance & DAIS (A1, A2)
                gov_level=row.get("gov_level"),
                dais_identity_id=row.get("dais_identity_id"),
                visibility_scope=row.get("visibility_scope", "city"),
                is_listed_in_directory=row.get("is_listed_in_directory", True),
                is_system=row.get("is_system", False),
                is_public=row.get("is_public", False),
                is_orchestrator=row.get("is_orchestrator", False),
                # MicroDAO (A3)
                primary_microdao_id=row.get("primary_microdao_id"),
                primary_microdao_name=row.get("primary_microdao_name"),
                primary_microdao_slug=row.get("primary_microdao_slug"),
                home_microdao_id=row.get("home_microdao_id"),
                home_microdao_name=row.get("home_microdao_name"),
                home_microdao_slug=row.get("home_microdao_slug"),
                district=row.get("district"),
                microdaos=badges,
                microdao_memberships=row.get("microdao_memberships", []),
                public_skills=row.get("public_skills", []),
            )
            items.append(summary)

        return {"items": items, "total": total}
    except Exception as e:
        logger.error(f"Failed to list agents: {e}")
        raise HTTPException(status_code=500, detail="Failed to list agents")
class AgentVisibilityPayload(BaseModel):
    """Agent visibility update payload (Task 039)"""
    # Request body of PUT /agents/{agent_id}/visibility.
    is_public: bool  # the endpoint requires public_slug when this is True
    public_title: Optional[str] = None  # None -> endpoint keeps the stored title
    public_tagline: Optional[str] = None  # None -> endpoint keeps the stored tagline
    public_slug: Optional[str] = None  # letters, digits, '_' and '-' only (validated by the endpoint)
    # The endpoint also accepts the legacy values 'city' and 'owner_only' and
    # maps them to 'global' and 'private' respectively.
    visibility_scope: Optional[str] = None  # 'global' | 'microdao' | 'private'
@router.put("/agents/{agent_id}/visibility")
async def update_agent_visibility_endpoint(
    agent_id: str,
    payload: AgentVisibilityPayload
):
    """
    Update an agent's visibility settings (Task 039).

    The public profile is rewritten through
    ``repo_city.update_agent_public_profile``. Title, tagline and slug fall
    back to the stored values when the payload omits them; skills, district
    and primary room are always carried over from the stored profile. When a
    visibility scope is supplied it is also written through
    ``repo_city.update_agent_visibility``.

    Args:
        agent_id: Identifier of the agent to update.
        payload: Requested visibility settings.

    Returns:
        The value returned by ``update_agent_public_profile``, with
        ``visibility_scope`` added when a scope was supplied.

    Raises:
        HTTPException(400): unknown ``visibility_scope``, missing slug for a
            public agent, or a slug containing disallowed characters.
        HTTPException(404): the agent has no stored public profile.
        HTTPException(500): any other failure.
    """
    try:
        # Validate visibility_scope if provided
        valid_scopes = ("global", "microdao", "private", "city", "owner_only")  # support legacy too
        if payload.visibility_scope and payload.visibility_scope not in valid_scopes:
            raise HTTPException(
                status_code=400,
                detail=f"visibility_scope must be one of: {', '.join(valid_scopes)}"
            )

        # Normalize legacy values
        scope = payload.visibility_scope
        if scope == "city":
            scope = "global"
        elif scope == "owner_only":
            scope = "private"

        # A public agent must come with a slug; none is generated here.
        if payload.is_public and not payload.public_slug:
            raise HTTPException(status_code=400, detail="public_slug is required when is_public is true")

        # Validate slug format if provided. The slug is lower-cased first and
        # the lower-cased form is what gets stored, so the persisted value
        # always satisfies the rule stated in the error message. fullmatch()
        # is used because match() with '$' would also accept a trailing newline.
        public_slug = payload.public_slug
        if public_slug:
            import re
            public_slug = public_slug.lower()
            if not re.fullmatch(r'[a-z0-9_-]+', public_slug):
                raise HTTPException(status_code=400, detail="public_slug must contain only lowercase letters, numbers, underscores, and hyphens")

        # update_agent_public_profile replaces every field it is given, so the
        # stored profile is loaded first and used for anything not supplied.
        current_profile = await repo_city.get_agent_public_profile(agent_id)
        if not current_profile:
            raise HTTPException(status_code=404, detail="Agent not found")

        result = await repo_city.update_agent_public_profile(
            agent_id=agent_id,
            is_public=payload.is_public,
            public_slug=public_slug or current_profile.get("public_slug"),
            public_title=payload.public_title if payload.public_title is not None else current_profile.get("public_title"),
            public_tagline=payload.public_tagline if payload.public_tagline is not None else current_profile.get("public_tagline"),
            public_skills=current_profile.get("public_skills"),  # Preserve skills
            public_district=current_profile.get("public_district"),  # Preserve district
            public_primary_room_slug=current_profile.get("public_primary_room_slug")  # Preserve room
        )

        # Also update visibility_scope if provided
        if scope:
            await repo_city.update_agent_visibility(
                agent_id=agent_id,
                is_public=payload.is_public,
                visibility_scope=scope
            )
            result["visibility_scope"] = scope

        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to update agent visibility: {e}")
        raise HTTPException(status_code=500, detail="Failed to update visibility")
# =============================================================================
# Assets & Branding API (Task 042)
# =============================================================================
@router.post("/assets/upload")
async def upload_asset(
    file: UploadFile = File(...),
    type: str = Form(...)  # microdao_logo, microdao_banner, room_logo, room_banner, agent_avatar
):
    """
    Upload an asset (logo/banner/avatar) with automatic processing.

    The upload is converted to PNG by ``process_image``: banners are limited
    to 1920px, everything else is square-cropped and limited to 512px. The
    result is stored in MinIO when the client is available; if MinIO is
    unavailable or the upload fails, files are written under
    ``static/uploads`` instead.

    Args:
        file: Uploaded image file.
        type: Asset type; one of the values listed in ``valid_types`` below.
            (The name shadows the builtin but is the form field name clients
            send, so it is kept.)

    Returns:
        Dict with ``original_url``, ``processed_url`` and ``thumb_url``.

    Raises:
        HTTPException(400): unknown asset type, file over 20MB, or image data
            that cannot be decoded.
        HTTPException(500): any other failure.
    """
    try:
        # Validate type
        valid_types = ['microdao_logo', 'microdao_banner', 'room_logo', 'room_banner', 'agent_avatar']
        if type not in valid_types:
            raise HTTPException(status_code=400, detail=f"Invalid asset type. Valid: {valid_types}")

        # Validate file size (20MB limit)
        content = await file.read()
        if len(content) > 20 * 1024 * 1024:
            raise HTTPException(status_code=400, detail="File too large (max 20MB)")

        # Process image based on type
        # Logos and avatars: square, max 512px
        # Banners: max 1920px width, preserve aspect ratio
        if 'banner' in type:
            max_size = 1920
            force_square = False
        else:
            max_size = 512
            force_square = True  # Square crop for logos/avatars

        try:
            processed_bytes, thumb_bytes = process_image(content, max_size=max_size, force_square=force_square)
        except (OSError, ValueError) as e:
            # Pillow reports unreadable or truncated image data as OSError
            # (UnidentifiedImageError is a subclass) or ValueError; that is a
            # client error, not a server fault.
            logger.warning(f"Uploaded file could not be processed as an image: {e}")
            raise HTTPException(status_code=400, detail="Invalid or unsupported image file") from e

        # Map type to prefix
        type_to_prefix = {
            'microdao_logo': 'microdao/logo',
            'microdao_banner': 'microdao/banner',
            'room_logo': 'rooms/logo',
            'room_banner': 'rooms/banner',
            'agent_avatar': 'agents/avatar',
        }
        prefix = type_to_prefix.get(type, 'uploads')

        # Upload to MinIO if available, otherwise fallback to local storage
        if MINIO_AVAILABLE:
            try:
                # Upload processed image
                processed_file = io.BytesIO(processed_bytes)
                processed_url = minio_upload_asset(
                    processed_file,
                    content_type="image/png",
                    prefix=prefix,
                    filename=file.filename
                )

                # Upload thumbnail if exists
                thumb_url = None
                if thumb_bytes:
                    thumb_file = io.BytesIO(thumb_bytes)
                    thumb_url = minio_upload_asset(
                        thumb_file,
                        content_type="image/png",
                        prefix=f"{prefix}/thumb",
                        filename=file.filename
                    )

                return {
                    "original_url": processed_url,
                    "processed_url": processed_url,
                    "thumb_url": thumb_url or processed_url
                }
            except Exception as e:
                # Deliberate best-effort: a MinIO failure must not fail the
                # upload, so fall through to local storage.
                logger.warning(f"MinIO upload failed, falling back to local: {e}")

        # Fallback: Save to local disk
        filename = f"{uuid.uuid4()}.png"
        filepath = f"static/uploads/{filename}"
        thumb_filepath = f"static/uploads/thumb_{filename}"

        os.makedirs("static/uploads", exist_ok=True)

        with open(filepath, "wb") as f:
            f.write(processed_bytes)

        if thumb_bytes:
            with open(thumb_filepath, "wb") as f:
                f.write(thumb_bytes)

        # Construct URLs
        base_url = "/api/static/uploads"

        return {
            "original_url": f"{base_url}/{filename}",
            "processed_url": f"{base_url}/{filename}",
            "thumb_url": f"{base_url}/thumb_{filename}" if thumb_bytes else f"{base_url}/{filename}"
        }
    except HTTPException:
        # Let the 400 responses raised above through instead of turning them
        # into a generic 500.
        raise
    except Exception as e:
        logger.error(f"Upload failed: {e}")
        raise HTTPException(status_code=500, detail="Upload failed")
class BrandingUpdatePayload(BaseModel):
    # Request body shared by the MicroDAO and room branding PATCH endpoints;
    # both values are passed as-is to the corresponding repo_city update call.
    logo_url: Optional[str] = None  # omitted -> None
    banner_url: Optional[str] = None  # omitted -> None
@router.patch("/microdao/{slug}/branding")
async def update_microdao_branding_endpoint(slug: str, payload: BrandingUpdatePayload):
    """
    Update the logo/banner URLs of a MicroDAO.

    Raises:
        HTTPException(404): no MicroDAO exists for ``slug``.
        HTTPException(500): any other failure.
    """
    try:
        # Refuse to update something that does not exist.
        existing = await repo_city.get_microdao_by_slug(slug)
        if existing:
            return await repo_city.update_microdao_branding(
                microdao_slug=slug,
                logo_url=payload.logo_url,
                banner_url=payload.banner_url,
            )
        raise HTTPException(status_code=404, detail="MicroDAO not found")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to update branding: {e}")
        raise HTTPException(status_code=500, detail="Failed to update branding")
@router.patch("/rooms/{room_id}/branding")
async def update_room_branding_endpoint(room_id: str, payload: BrandingUpdatePayload):
    """
    Update the logo/banner URLs of a room.

    Raises:
        HTTPException(404): no room exists for ``room_id``.
        HTTPException(500): any other failure.
    """
    try:
        # Refuse to update something that does not exist.
        existing = await repo_city.get_room_by_id(room_id)
        if existing:
            return await repo_city.update_room_branding(
                room_id=room_id,
                logo_url=payload.logo_url,
                banner_url=payload.banner_url,
            )
        raise HTTPException(status_code=404, detail="Room not found")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to update room branding: {e}")
        raise HTTPException(status_code=500, detail="Failed to update room branding")
# =============================================================================
# Nodes API (for Node Directory)
# =============================================================================
@public_router.get("/nodes/join/instructions")
async def get_node_join_instructions():
    """
    Return the instructions for connecting a new node.

    The guide is a static Markdown text returned under the ``content`` key.
    """
    guide_markdown = """
# Як підключити нову ноду до DAARION
Вітаємо! Ви вирішили розширити обчислювальну потужність мережі DAARION.
Цей гайд допоможе вам розгорнути власну ноду та підключити її до кластера.
## Вимоги до заліза (Мінімальні)
- **CPU**: 4 cores
- **RAM**: 16 GB (рекомендовано 32+ GB для LLM)
- **Disk**: 100 GB SSD
- **OS**: Ubuntu 22.04 LTS / Debian 11+
- **Network**: Статична IP адреса, відкриті порти
## Крок 1: Підготовка сервера
Встановіть Docker та Docker Compose:
```bash
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
```
## Крок 2: Отримання токенів
Для підключення вам знадобляться:
1. **NATS Connection URL** (від адміністратора)
2. **NATS Credentials File** (`.creds`) (від адміністратора)
Зверніться до адміністраторів мережі у [Discord/Matrix], щоб отримати доступ.
## Крок 3: Розгортання Node Runtime
Створіть директорію `daarion-node` та файл `docker-compose.yml`:
```yaml
version: '3.8'
services:
# 1. NATS Leaf Node (міст до ядра)
nats-leaf:
image: nats:2.10-alpine
volumes:
- ./nats.conf:/etc/nats/nats.conf
- ./creds:/etc/nats/creds
ports:
- "4222:4222"
# 2. Node Registry (реєстрація в мережі)
node-registry:
image: daarion/node-registry:latest
environment:
- NODE_ID=my-node-01 # Змініть на унікальне ім'я
- NATS_URL=nats://nats-leaf:4222
- REGION=eu-central
depends_on:
- nats-leaf
# 3. Ollama (AI Runtime)
ollama:
image: ollama/ollama:latest
volumes:
- ollama_data:/root/.ollama
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
volumes:
ollama_data:
```
## Крок 4: Запуск
```bash
docker compose up -d
```
## Крок 5: Перевірка
Перейдіть у консоль **Nodes** на https://app.daarion.space/nodes.
Ваша нода має з'явитися у списку зі статусом **Online**.
"""
    response = {"content": guide_markdown}
    return response
@public_router.get("/nodes")
async def list_nodes():
    """
    List all nodes of the network together with their metrics.

    Each record from ``repo_city.get_all_nodes`` is converted to a
    ``NodeProfile``; guardian/steward agents and metrics are included only
    when the record carries them.

    Returns:
        ``{"items": [NodeProfile, ...], "total": len(items)}``.

    Raises:
        HTTPException(500): on any failure while loading or converting nodes.
    """
    try:
        from models_city import NodeMetrics, NodeAgentSummary

        def _agent_summary(agent_data):
            """Build a NodeAgentSummary from an agent record, or None when absent."""
            if not agent_data:
                return None
            return NodeAgentSummary(
                id=agent_data["id"],
                name=agent_data.get("name"),
                slug=agent_data.get("slug")
            )

        nodes = await repo_city.get_all_nodes()

        items: List[NodeProfile] = []
        for node in nodes:
            guardian_agent = _agent_summary(node.get("guardian_agent"))
            steward_agent = _agent_summary(node.get("steward_agent"))

            # Build metrics
            metrics = None
            if node.get("metrics"):
                m = node["metrics"]
                metrics = NodeMetrics(
                    cpu_model=m.get("cpu_model"),
                    cpu_cores=m.get("cpu_cores", 0),
                    cpu_usage=m.get("cpu_usage", 0.0),
                    gpu_model=m.get("gpu_model"),
                    gpu_vram_total=m.get("gpu_vram_total", 0),
                    gpu_vram_used=m.get("gpu_vram_used", 0),
                    ram_total=m.get("ram_total", 0),
                    ram_used=m.get("ram_used", 0),
                    disk_total=m.get("disk_total", 0),
                    disk_used=m.get("disk_used", 0),
                    agent_count_router=m.get("agent_count_router", 0),
                    agent_count_system=m.get("agent_count_system", 0),
                    dagi_router_url=m.get("dagi_router_url")
                )

            items.append(NodeProfile(
                node_id=node["node_id"],
                name=node["name"],
                hostname=node.get("hostname"),
                # `or []` covers a roles value that is present but None.
                roles=list(node.get("roles") or []),
                environment=node.get("environment", "unknown"),
                status=node.get("status", "offline"),
                gpu_info=node.get("gpu"),
                agents_total=node.get("agents_total", 0),
                agents_online=node.get("agents_online", 0),
                last_heartbeat=str(node["last_heartbeat"]) if node.get("last_heartbeat") else None,
                guardian_agent=guardian_agent,
                steward_agent=steward_agent,
                metrics=metrics
            ))

        return {"items": items, "total": len(items)}
    except Exception as e:
        # Send the traceback through the module logger (same channel as the
        # other handlers) rather than printing it to stderr.
        logger.error(f"Failed to list nodes: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to list nodes")
@public_router.get("/nodes/{node_id}")
async def get_node_profile(node_id: str):
    """
    Return a node profile including its Guardian and Steward agents.

    Raises:
        HTTPException(404): no node exists for ``node_id``.
        HTTPException(500): any other failure.
    """

    def _summarize(agent_data):
        """Convert an agent record into a NodeAgentSummary, or None when absent."""
        if not agent_data:
            return None
        from models_city import NodeAgentSummary
        return NodeAgentSummary(
            id=agent_data["id"],
            name=agent_data["name"],
            kind=agent_data.get("kind"),
            slug=agent_data.get("slug"),
        )

    try:
        node = await repo_city.get_node_by_id(node_id)
        if not node:
            raise HTTPException(status_code=404, detail="Node not found")

        guardian = _summarize(node.get("guardian_agent"))
        steward = _summarize(node.get("steward_agent"))

        heartbeat = None
        if node.get("last_heartbeat"):
            heartbeat = str(node["last_heartbeat"])

        return NodeProfile(
            node_id=node["node_id"],
            name=node["name"],
            hostname=node.get("hostname"),
            roles=list(node.get("roles") or []),
            environment=node.get("environment", "unknown"),
            status=node.get("status", "offline"),
            gpu_info=node.get("gpu"),
            agents_total=node.get("agents_total", 0),
            agents_online=node.get("agents_online", 0),
            last_heartbeat=heartbeat,
            guardian_agent_id=node.get("guardian_agent_id"),
            steward_agent_id=node.get("steward_agent_id"),
            guardian_agent=guardian,
            steward_agent=steward,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get node {node_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get node")
# =============================================================================
# Public Citizens API
# =============================================================================
@public_router.get("/citizens")
async def list_public_citizens(
    district: Optional[str] = Query(None, description="Filter by district"),
    kind: Optional[str] = Query(None, description="Filter by agent kind"),
    q: Optional[str] = Query(None, description="Search by display name or title"),
    limit: int = Query(50, le=100),
    offset: int = Query(0, ge=0)
):
    """
    Public list of citizens with filters.

    Returns:
        ``{"items": [PublicCitizenSummary, ...], "total": total}``.

    Raises:
        HTTPException(500): on any failure while loading or converting citizens.
    """
    try:
        rows, total = await repo_city.get_public_citizens(
            district=district,
            kind=kind,
            q=q,
            limit=limit,
            offset=offset,
        )

        items: List[PublicCitizenSummary] = []
        for row in rows:
            # Home node view is only built when the record carries node data.
            node_info = row.get("home_node")
            home_node = None
            if node_info:
                home_node = HomeNodeView(
                    id=node_info.get("id"),
                    name=node_info.get("name"),
                    hostname=node_info.get("hostname"),
                    roles=node_info.get("roles", []),
                    environment=node_info.get("environment"),
                )

            summary = PublicCitizenSummary(
                # The record id stands in when no public slug is set.
                slug=row.get("public_slug") or row.get("id"),
                display_name=row["display_name"],
                public_title=row.get("public_title"),
                public_tagline=row.get("public_tagline"),
                avatar_url=row.get("avatar_url"),
                kind=row.get("kind"),
                district=row.get("public_district"),
                primary_room_slug=row.get("public_primary_room_slug"),
                public_skills=row.get("public_skills", []),
                online_status=row.get("online_status"),
                status=row.get("status"),
                home_node=home_node,
            )
            items.append(summary)

        return {"items": items, "total": total}
    except Exception as e:
        logger.error(f"Failed to list public citizens: {e}")
        raise HTTPException(status_code=500, detail="Failed to list public citizens")
@public_router.get("/citizens/{slug}")
async def get_public_citizen(slug: str, request: Request):
    """
    Return the public profile of a citizen.

    ``admin_panel_url`` is blanked out unless the request carries an
    ``Authorization`` header that ``validate_jwt_token`` resolves to a user
    with the ``admin`` or ``architect`` role.

    Raises:
        HTTPException(404): no citizen exists for ``slug``.
        HTTPException(500): any other failure.
    """
    try:
        authorization = request.headers.get("Authorization")
        user_info = await validate_jwt_token(authorization) if authorization else None

        include_admin_url = False
        if user_info:
            user_roles = user_info.get("roles", [])
            for role in user_roles:
                if role in ["admin", "architect"]:
                    include_admin_url = True
                    break

        citizen = await repo_city.get_public_citizen_by_slug(slug)
        if not citizen:
            raise HTTPException(status_code=404, detail=f"Citizen not found: {slug}")

        # Hide the admin link from callers without a privileged role.
        if not include_admin_url:
            citizen["admin_panel_url"] = None

        return PublicCitizenProfile(**citizen)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get public citizen {slug}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get citizen")
@public_router.get("/citizens/{slug}/interaction", response_model=CitizenInteractionInfo)
async def get_citizen_interaction_info(slug: str):
    """
    Return the information needed to interact with a citizen.

    Combines the public agent record, its Matrix configuration, its primary
    room (looked up by id when the Matrix configuration has one, otherwise by
    slug) and the MicroDAO the agent belongs to.

    Raises:
        HTTPException(404): no citizen exists for ``slug``.
        HTTPException(500): any other failure.
    """
    try:
        agent = await repo_city.get_public_agent_by_slug(slug)
        if not agent:
            raise HTTPException(status_code=404, detail=f"Citizen not found: {slug}")

        agent_id = agent["id"]
        matrix_config = await repo_city.get_agent_matrix_config(agent_id)

        matrix_user_id = None
        primary_room_id = None
        if matrix_config:
            matrix_user_id = matrix_config.get("matrix_user_id")
            primary_room_id = matrix_config.get("primary_room_id")

        primary_room_slug = agent.get("public_primary_room_slug") or agent.get("primary_room_slug")
        primary_room_name = None

        # The room id from the Matrix config wins over the slug.
        if primary_room_id:
            room_record = await repo_city.get_room_by_id(primary_room_id)
        elif primary_room_slug:
            room_record = await repo_city.get_room_by_slug(primary_room_slug)
        else:
            room_record = None

        # When a room record was found, its values replace the ones derived above.
        if room_record:
            primary_room_id = room_record.get("id")
            primary_room_name = room_record.get("name")
            primary_room_slug = room_record.get("slug") or primary_room_slug

        microdao = await repo_city.get_microdao_for_agent(agent_id)
        if microdao:
            microdao_slug = microdao.get("slug")
            microdao_name = microdao.get("name")
        else:
            microdao_slug = None
            microdao_name = None

        return CitizenInteractionInfo(
            slug=slug,
            display_name=agent["display_name"],
            primary_room_slug=primary_room_slug,
            primary_room_id=primary_room_id,
            primary_room_name=primary_room_name,
            matrix_user_id=matrix_user_id,
            district=agent.get("public_district"),
            microdao_slug=microdao_slug,
            microdao_name=microdao_name,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get interaction info for citizen {slug}: {e}")
        raise HTTPException(status_code=500, detail="Failed to load interaction info")
@public_router.post("/citizens/{slug}/ask", response_model=CitizenAskResponse)
async def ask_citizen(
    slug: str,
    payload: CitizenAskRequest,
    router_client: DagiRouterClient = Depends(get_dagi_router_client),
):
    """
    Send a question to a citizen through the DAGI Router.

    The answer is taken from the first non-empty of the router response keys
    ``response``, ``answer``, ``result``; a fixed apology text is returned
    when none of them yields text.

    Raises:
        HTTPException(400): the question is empty after trimming.
        HTTPException(404): no citizen exists for ``slug``.
        HTTPException(502): the router call raised ``httpx.HTTPError``.
        HTTPException(500): any other failure.
    """
    question = (payload.question or "").strip()
    if question == "":
        raise HTTPException(status_code=400, detail="Question is required")

    try:
        agent = await repo_city.get_public_agent_by_slug(slug)
        if not agent:
            raise HTTPException(status_code=404, detail=f"Citizen not found: {slug}")

        router_response = await router_client.ask_agent(
            agent_id=agent["id"],
            prompt=question,
            system_prompt=payload.context,
        )

        # Try the known response keys in order and stop at the first truthy one.
        answer = None
        for key in ("response", "answer", "result"):
            candidate = router_response.get(key)
            if candidate:
                answer = candidate.strip()
                break

        # Covers both "no key had a value" and "value was only whitespace".
        if not answer:
            answer = "Вибач, агент наразі не може відповісти."

        return CitizenAskResponse(
            answer=answer,
            agent_display_name=agent["display_name"],
            agent_id=agent["id"],
        )
    except HTTPException:
        raise
    except httpx.HTTPError as e:
        logger.error(f"DAGI Router request failed for citizen {slug}: {e}")
        raise HTTPException(status_code=502, detail="Citizen is temporarily unavailable")
    except Exception as e:
        logger.error(f"Failed to ask citizen {slug}: {e}")
        raise HTTPException(status_code=500, detail="Failed to ask citizen")
# =============================================================================
# API v1 — Node Dashboard
# =============================================================================
@api_router.get("/nodes/{node_id}/dashboard")
async def get_node_dashboard(node_id: str):
    """
    Return the minimal dashboard for a node (MVP).

    Contains basic information plus placeholders for metrics:
    - node_id, name, kind, status
    - tags (roles)
    - agents_total, agents_online
    - uptime (null, placeholder)
    - metrics_available (false, placeholder)

    Raises:
        HTTPException(404): no node exists for ``node_id``.
        HTTPException(500): any other failure.
    """
    try:
        node = await repo_city.get_node_by_id(node_id)
        if not node:
            raise HTTPException(status_code=404, detail=f"Node not found: {node_id}")

        dashboard = {}
        dashboard["node_id"] = node["node_id"]
        dashboard["name"] = node["name"]
        dashboard["kind"] = node.get("kind")
        dashboard["status"] = node.get("status", "unknown")
        dashboard["tags"] = list(node.get("roles") or [])
        dashboard["agents_total"] = node.get("agents_total", 0)
        dashboard["agents_online"] = node.get("agents_online", 0)
        # Placeholder: to be filled in once Prometheus/NATS data is available.
        dashboard["uptime"] = None
        # Flag telling clients that the extended dashboard does not exist yet.
        dashboard["metrics_available"] = False
        return dashboard
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get node dashboard for {node_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get node dashboard")
# Legacy endpoint for frontend compatibility
@api_router.get("/node/dashboard")
async def get_node_dashboard_legacy(node_id: str = Query(..., description="Node ID")):
    """
    Legacy endpoint kept for frontend compatibility: /api/v1/node/dashboard

    Takes the node id as a required query parameter and returns the result of
    the canonical handler for /api/v1/nodes/{node_id}/dashboard by awaiting it
    directly (no HTTP redirect is issued).

    NOTE(review): this docstring previously described the query string as
    ``?nodeId=...``, but the parameter is declared as ``node_id`` with no
    alias. Confirm which key the frontend actually sends.
    """
    return await get_node_dashboard(node_id)
# =============================================================================
# API v1 — MicroDAO Membership
# =============================================================================
@api_router.get("/microdao/options")
async def get_microdao_options():
    """
    Return the list of MicroDAOs for a selector.

    Returns:
        ``{"items": [MicrodaoOption, ...]}``.

    Raises:
        HTTPException(500): on any failure while loading or converting options.
    """
    try:
        rows = await repo_city.get_microdao_options()
        items = []
        for row in rows:
            items.append(MicrodaoOption(**row))
        return {"items": items}
    except Exception as e:
        logger.error(f"Failed to get microdao options: {e}")
        raise HTTPException(status_code=500, detail="Failed to get microdao options")
@api_router.put("/agents/{agent_id}/microdao-membership")
async def assign_agent_microdao_membership(
    agent_id: str,
    payload: MicrodaoMembershipPayload,
    authorization: Optional[str] = Header(None)
):
    """
    Assign or update an agent's membership in a MicroDAO.

    ``ensure_architect_or_admin`` is awaited with the ``Authorization`` header
    before anything else and outside the error handling below, so whatever it
    raises propagates unchanged.

    Raises:
        HTTPException(404): the upsert returned nothing.
        HTTPException(500): any other failure during the upsert.
    """
    await ensure_architect_or_admin(authorization)

    try:
        membership = await repo_city.upsert_agent_microdao_membership(
            agent_id=agent_id,
            microdao_id=payload.microdao_id,
            role=payload.role,
            is_core=payload.is_core,
        )
        if membership:
            return membership
        raise HTTPException(status_code=404, detail="MicroDAO not found")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to assign microdao membership: {e}")
        raise HTTPException(status_code=500, detail="Failed to assign microdao membership")
@api_router.delete("/agents/{agent_id}/microdao-membership/{microdao_id}")
async def delete_agent_microdao_membership(
    agent_id: str,
    microdao_id: str,
    authorization: Optional[str] = Header(None)
):
    """
    Remove an agent's membership in a MicroDAO.

    ``ensure_architect_or_admin`` is awaited with the ``Authorization`` header
    before anything else and outside the error handling below, so whatever it
    raises propagates unchanged.

    Raises:
        HTTPException(404): the repository reported that nothing was removed.
        HTTPException(500): any other failure during removal.
    """
    await ensure_architect_or_admin(authorization)

    try:
        was_removed = await repo_city.remove_agent_microdao_membership(agent_id, microdao_id)
        if was_removed:
            return {"status": "deleted"}
        raise HTTPException(status_code=404, detail="Membership not found")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to delete microdao membership: {e}")
        raise HTTPException(status_code=500, detail="Failed to delete microdao membership")
# =============================================================================
# City Rooms API
# =============================================================================
@router.post("/rooms/sync/matrix")
async def sync_rooms_with_matrix():
    """
    Sync all rooms with Matrix.

    Creates a Matrix room for every room returned by
    ``repo_city.get_rooms_without_matrix`` and stores the new Matrix id and
    alias on it. A failure on one room is recorded and does not stop the rest.
    Described as an admin endpoint for initial setup or recovery.

    Returns:
        Summary dict with per-outcome counts and a ``details`` breakdown of
        synced, failed and skipped rooms.

    Raises:
        HTTPException(503): the Matrix Gateway health check failed.
        HTTPException(500): any other failure outside the per-room handling.
    """
    try:
        # Nothing can be created while the gateway is down.
        if not await check_matrix_gateway_health():
            raise HTTPException(status_code=503, detail="Matrix Gateway unavailable")

        rooms = await repo_city.get_rooms_without_matrix()

        synced = []
        failed = []
        skipped = []

        for room in rooms:
            room_slug = room.get("slug")
            room_name = room.get("name")

            # Both slug and name are needed to create the Matrix room.
            if not (room_slug and room_name):
                skipped.append({"id": room.get("id"), "reason": "missing slug or name"})
                continue

            try:
                visibility = "public" if room.get("is_public", True) else "private"
                matrix_room_id, matrix_room_alias = await create_matrix_room(
                    slug=room_slug,
                    name=room_name,
                    visibility=visibility,
                )

                if not matrix_room_id:
                    failed.append({
                        "id": room.get("id"),
                        "slug": room_slug,
                        "reason": "Matrix room creation failed",
                    })
                    continue

                # Persist the Matrix identifiers on the room record.
                await repo_city.update_room_matrix(
                    room_id=room.get("id"),
                    matrix_room_id=matrix_room_id,
                    matrix_room_alias=matrix_room_alias,
                )
                synced.append({
                    "id": room.get("id"),
                    "slug": room_slug,
                    "matrix_room_id": matrix_room_id,
                })
                logger.info(f"Synced room {room_slug} with Matrix: {matrix_room_id}")
            except Exception as e:
                logger.error(f"Failed to sync room {room_slug}: {e}")
                failed.append({
                    "id": room.get("id"),
                    "slug": room_slug,
                    "reason": str(e),
                })

        return {
            "status": "completed",
            "total_rooms": len(rooms),
            "synced_count": len(synced),
            "failed_count": len(failed),
            "skipped_count": len(skipped),
            "details": {
                "synced": synced,
                "failed": failed,
                "skipped": skipped,
            },
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to sync rooms with Matrix: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to sync rooms: {str(e)}")
@router.get("/rooms", response_model=List[CityRoomRead])
async def get_city_rooms(limit: int = 100, offset: int = 0):
    """
    Return the list of all City Rooms.

    Each room is returned with an approximate ``members_online`` value: the
    global online count for default rooms, half of it (at least 1) for the
    others. ``last_event`` is always ``None`` for now.

    Raises:
        HTTPException(500): on any failure while loading rooms or presence.
    """
    try:
        rooms = await repo_city.get_all_rooms(limit=limit, offset=offset)

        # Approximate online count shared across all rooms.
        online_count = await PresenceRedis.get_online_count()

        def _estimate_online(room):
            """Approximate the online member count for a single room."""
            if room.get("is_default"):
                return online_count
            return max(1, online_count // 2)

        return [
            {
                **room,
                "members_online": _estimate_online(room),
                "last_event": None,  # TODO: take from the latest message
            }
            for room in rooms
        ]
    except Exception as e:
        logger.error(f"Failed to get city rooms: {e}")
        raise HTTPException(status_code=500, detail="Failed to get city rooms")
@router.post("/rooms", response_model=CityRoomRead)
async def create_city_room(payload: CityRoomCreate):
    """
    Create a new City Room (a Matrix room is created automatically).

    If the Matrix room cannot be created the city room is still stored,
    just without Matrix identifiers.

    Raises:
        HTTPException: 409 when a room with the same slug already exists,
            500 on any other failure.
    """
    try:
        # TODO: extract user_id from the JWT
        creator = "u_system"  # Mock for MVP

        # Reject duplicates by slug.
        if await repo_city.get_room_by_slug(payload.slug):
            raise HTTPException(status_code=409, detail="Room with this slug already exists")

        # Create the Matrix room first so its IDs can be stored with the room.
        mx_id, mx_alias = await create_matrix_room(
            slug=payload.slug,
            name=payload.name,
            visibility="public",
        )
        if not mx_id:
            logger.warning(f"Failed to create Matrix room for {payload.slug}, proceeding without Matrix")

        new_room = await repo_city.create_room(
            slug=payload.slug,
            name=payload.name,
            description=payload.description,
            created_by=creator,
            matrix_room_id=mx_id,
            matrix_room_alias=mx_alias,
        )
        new_room_id = new_room["id"]

        # Seed the room with a welcome message.
        await repo_city.create_room_message(
            room_id=new_room_id,
            body=f"Кімната '{payload.name}' створена! Ласкаво просимо! 🎉",
            author_agent_id="ag_system",
        )

        # Announce the new room in the city feed.
        feed_payload = {
            "action": "room_created",
            "room_name": payload.name,
            "matrix_room_id": mx_id,
        }
        await repo_city.create_feed_event(
            kind="system",
            room_id=new_room_id,
            payload=feed_payload,
        )

        return {**new_room, "members_online": 1, "last_event": None}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to create city room: {e}")
        raise HTTPException(status_code=500, detail="Failed to create city room")
@api_router.get("/city/rooms")
async def get_city_rooms_api():
    """
    Return the list of all City Rooms for the API.

    Loads up to 100 rooms and projects each one onto a fixed set of fields.

    Raises:
        HTTPException: 500 when the rooms cannot be loaded.
    """
    try:
        room_rows = await repo_city.get_all_rooms(limit=100, offset=0)
        return [
            {
                "id": row.get("id"),
                "slug": row.get("slug"),
                "name": row.get("name"),
                "description": row.get("description"),
                "scope": row.get("owner_type", "city"),
                "matrix_room_id": row.get("matrix_room_id"),
                "is_public": row.get("is_public", True),
                "room_role": row.get("room_role"),
                "members_online": row.get("members_online") or 0,
                "zone": row.get("zone"),
                "room_type": row.get("room_type", "city"),
            }
            for row in room_rows
        ]
    except Exception as e:
        logger.error(f"Failed to get city rooms: {e}")
        raise HTTPException(status_code=500, detail="Failed to get city rooms")
@api_router.get("/city/rooms/{slug}")
async def get_city_room_by_slug(slug: str):
    """
    Return the details of a City Room looked up by slug.

    The response also lists the default host agents that exist in the DB.
    Failing to load hosts is logged and does not fail the request.

    Raises:
        HTTPException: 404 when no room has this slug, 500 on other errors.
    """
    try:
        found = await repo_city.get_room_by_slug(slug)
        if not found:
            raise HTTPException(status_code=404, detail=f"Room not found: {slug}")

        # Host agents for this room (public agents assigned to city rooms).
        hosts = []
        try:
            # DARIO, DARIA and DAARWIZZ are the default hosts for city rooms.
            for host_id in ("dario", "daria", "daarwizz"):
                host = await repo_city.get_agent_by_id(host_id)
                if not host:
                    continue
                hosts.append({
                    "id": host.get("id"),
                    "display_name": host.get("display_name"),
                    "avatar_url": host.get("avatar_url"),
                    "kind": host.get("kind"),
                    "role": "host",
                })
        except Exception as e:
            logger.warning(f"Failed to get host agents for room {slug}: {e}")

        return {
            "id": found.get("id"),
            "slug": found.get("slug"),
            "name": found.get("name"),
            "description": found.get("description"),
            "scope": found.get("owner_type", "city"),
            "matrix_room_id": found.get("matrix_room_id"),
            "matrix_room_alias": found.get("matrix_room_alias"),
            "is_public": found.get("is_public", True),
            "room_role": found.get("room_role"),
            "host_agents": hosts,
            "chat_available": found.get("matrix_room_id") is not None,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get city room by slug {slug}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get city room")
# =============================================================================
# Districts API (DB-based, no hardcodes)
# =============================================================================
@api_router.get("/districts")
async def get_districts():
    """
    Return the list of all Districts.

    A District is a microdao with dao_type = 'district'. Each entry carries
    its lead agent (if any), the total room count and up to three rooms.

    Raises:
        HTTPException: 500 when the districts cannot be loaded.
    """
    try:
        overview = []
        for district in await repo_city.get_districts():
            # Lead agent of this district, if one is assigned.
            lead = await repo_city.get_district_lead_agent(district["id"])
            lead_view = None
            if lead:
                lead_view = {
                    "id": lead["id"],
                    "name": lead["name"],
                    "avatar_url": lead.get("avatar_url"),
                }

            # Rooms are used both for the count and for a short preview.
            district_rooms = await repo_city.get_district_rooms(district["slug"])
            preview = [
                {"id": item["id"], "slug": item["slug"], "name": item["name"]}
                for item in district_rooms[:3]
            ]

            overview.append({
                "id": district["id"],
                "slug": district["slug"],
                "name": district["name"],
                "description": district.get("description"),
                "dao_type": district["dao_type"],
                "lead_agent": lead_view,
                "rooms_count": len(district_rooms),
                "rooms": preview,
            })
        return overview
    except Exception as e:
        logger.error(f"Failed to get districts: {e}")
        raise HTTPException(status_code=500, detail="Failed to get districts")
@api_router.get("/districts/{slug}")
async def get_district_detail(slug: str):
    """
    Return the details of a District looked up by slug.

    The response bundles the district itself, its lead agent, core team,
    all agents, rooms, nodes and stats.

    Raises:
        HTTPException: 404 when the district does not exist, 500 otherwise.
    """
    try:
        district = await repo_city.get_district_by_slug(slug)
        if not district:
            raise HTTPException(status_code=404, detail=f"District not found: {slug}")

        district_id = district["id"]
        district_slug = district["slug"]

        # Load every related collection in turn.
        lead = await repo_city.get_district_lead_agent(district_id)
        team = await repo_city.get_district_core_team(district_id)
        members = await repo_city.get_district_agents(district_id)
        district_rooms = await repo_city.get_district_rooms(district_slug)
        district_nodes = await repo_city.get_district_nodes(district_id)
        district_stats = await repo_city.get_district_stats(district_id, district_slug)

        lead_view = None
        if lead:
            lead_view = {
                "id": lead["id"],
                "name": lead["name"],
                "kind": lead.get("kind"),
                "status": lead.get("status"),
                "avatar_url": lead.get("avatar_url"),
                "gov_level": lead.get("gov_level"),
            }

        team_view = [
            {
                "id": member["id"],
                "name": member["name"],
                "kind": member.get("kind"),
                "status": member.get("status"),
                "avatar_url": member.get("avatar_url"),
                "role": member.get("membership_role"),
            }
            for member in team
        ]

        members_view = [
            {
                "id": member["id"],
                "name": member["name"],
                "kind": member.get("kind"),
                "status": member.get("status"),
                "avatar_url": member.get("avatar_url"),
                "role": member.get("membership_role"),
                "is_core": member.get("is_core", False),
            }
            for member in members
        ]

        rooms_view = [
            {
                "id": item["id"],
                "slug": item["slug"],
                "name": item["name"],
                "description": item.get("description"),
                "matrix_room_id": item.get("matrix_room_id"),
                "room_role": item.get("room_role"),
                "is_public": item.get("is_public", True),
            }
            for item in district_rooms
        ]

        nodes_view = [
            {
                "id": item["id"],
                "name": item["name"],
                "kind": item.get("kind"),
                "status": item.get("status"),
                "location": item.get("location"),
            }
            for item in district_nodes
        ]

        return {
            "district": {
                "id": district_id,
                "slug": district_slug,
                "name": district["name"],
                "description": district.get("description"),
                "dao_type": district["dao_type"],
            },
            "lead_agent": lead_view,
            "core_team": team_view,
            "agents": members_view,
            "rooms": rooms_view,
            "nodes": nodes_view,
            "stats": district_stats,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get district {slug}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get district")
@router.get("/rooms/{room_id}", response_model=CityRoomDetail)
async def get_city_room(room_id: str):
    """
    Return the details of a City Room together with its messages.

    Loads up to 50 messages, attaches a display ``username`` to each one,
    and includes the online users reported by ``PresenceRedis``.

    Raises:
        HTTPException: 404 when the room does not exist, 500 otherwise.
    """
    try:
        found = await repo_city.get_room_by_id(room_id)
        if not found:
            raise HTTPException(status_code=404, detail="Room not found")

        history = await repo_city.get_room_messages(room_id, limit=50)

        # Attach a username to every message.
        for item in history:
            user_ref = item.get("author_user_id")
            if user_ref:
                item["username"] = f"User-{user_ref[-4:]}"  # Mock
            elif item.get("author_agent_id"):
                item["username"] = "System Agent"
            else:
                item["username"] = "Anonymous"

        online_now = await PresenceRedis.get_all_online()

        return {
            **found,
            "members_online": len(online_now),
            "last_event": None,
            "messages": history,
            "online_members": online_now[:20],  # First 20 only
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get city room: {e}")
        raise HTTPException(status_code=500, detail="Failed to get city room")
@router.post("/rooms/{room_id}/messages", response_model=CityRoomMessageRead)
async def send_city_room_message(room_id: str, payload: CityRoomMessageCreate):
    """
    Send a message to a City Room.

    Stores the message, records a ``room_message`` feed event carrying the
    first 100 characters of the body, and returns the stored message with
    a display ``username`` attached.

    Raises:
        HTTPException: 404 when the room does not exist, 500 otherwise.
    """
    try:
        # The room must exist before anything is written.
        if not await repo_city.get_room_by_id(room_id):
            raise HTTPException(status_code=404, detail="Room not found")

        # TODO: extract user_id from the JWT
        sender_id = "u_mock_user"  # Mock for MVP

        stored = await repo_city.create_room_message(
            room_id=room_id,
            body=payload.body,
            author_user_id=sender_id,
        )

        # Mirror the message into the city feed.
        feed_payload = {"body": payload.body[:100], "message_id": stored["id"]}
        await repo_city.create_feed_event(
            kind="room_message",
            room_id=room_id,
            user_id=sender_id,
            payload=feed_payload,
        )

        # TODO: broadcast a "room.message" WS event carrying the stored
        # message to the room via ws_manager.broadcast_to_room.

        stored["username"] = f"User-{sender_id[-4:]}"
        return stored
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to send room message: {e}")
        raise HTTPException(status_code=500, detail="Failed to send message")
@router.post("/rooms/{room_id}/join")
async def join_city_room(room_id: str):
    """
    Join a City Room (for tracking).

    For the MVP this only logs the event and reports success; in production
    active memberships could be stored in Redis.
    """
    # TODO: extract user_id from the JWT
    current_user = "u_mock_user"

    logger.info(f"User {current_user} joined room {room_id}")

    response = {"status": "joined", "room_id": room_id}
    return response
@router.post("/rooms/{room_id}/leave")
async def leave_city_room(room_id: str):
    """
    Leave a City Room.

    Only logs the event and reports success; no membership state is changed.
    """
    # TODO: extract user_id from the JWT
    current_user = "u_mock_user"

    logger.info(f"User {current_user} left room {room_id}")

    response = {"status": "left", "room_id": room_id}
    return response
# =============================================================================
# Matrix Backfill API (Internal)
# =============================================================================
@router.post("/matrix/backfill")
async def backfill_matrix_rooms():
    """
    Backfill Matrix rooms for existing City Rooms without Matrix integration.

    For every room returned by ``repo_city.get_rooms_without_matrix()`` the
    endpoint first looks for an existing Matrix room by alias and links it;
    otherwise it creates a new public Matrix room. Internal endpoint for
    admin use.

    A failure while processing one room is logged and recorded as
    ``failed`` for that room; the remaining rooms are still processed.

    Returns:
        A dict with ``processed`` / ``created`` / ``found`` / ``failed``
        counters and a per-room ``details`` list.

    Raises:
        HTTPException: 500 when the list of rooms cannot be loaded or
            another unexpected error occurs outside per-room processing.
    """
    try:
        rooms_without_matrix = await repo_city.get_rooms_without_matrix()
        results = {
            "processed": 0,
            "created": 0,
            "found": 0,
            "failed": 0,
            "details": []
        }

        for room in rooms_without_matrix:
            results["processed"] += 1
            try:
                slug = room["slug"]
                name = room["name"]
                room_id = room["id"]

                # First try to find an already existing Matrix room.
                alias = f"#city_{slug}:daarion.space"
                matrix_room_id, matrix_room_alias = await find_matrix_room_by_alias(alias)
                status = "found"

                if not matrix_room_id:
                    # None found: create a new one.
                    matrix_room_id, matrix_room_alias = await create_matrix_room(slug, name, "public")
                    status = "created"

                if matrix_room_id:
                    await repo_city.update_room_matrix(room_id, matrix_room_id, matrix_room_alias)
                    results[status] += 1
                    results["details"].append({
                        "room_id": room_id,
                        "slug": slug,
                        "status": status,
                        "matrix_room_id": matrix_room_id
                    })
                else:
                    results["failed"] += 1
                    results["details"].append({
                        "room_id": room_id,
                        "slug": slug,
                        "status": "failed",
                        "error": "Could not create Matrix room"
                    })
            except Exception as e:
                # Isolate per-room failures so one bad room does not abort
                # the whole backfill and lose the results gathered so far.
                logger.error(f"Matrix backfill failed for room {room.get('slug')}: {e}")
                results["failed"] += 1
                results["details"].append({
                    "room_id": room.get("id"),
                    "slug": room.get("slug"),
                    "status": "failed",
                    "error": str(e)
                })

        logger.info(f"Matrix backfill completed: {results['processed']} processed, "
                    f"{results['created']} created, {results['found']} found, {results['failed']} failed")
        return results
    except Exception as e:
        logger.error(f"Matrix backfill failed: {e}")
        raise HTTPException(status_code=500, detail=f"Backfill failed: {str(e)}")
# =============================================================================
# Chat Bootstrap API (Matrix Integration)
# =============================================================================
async def validate_jwt_token(authorization: str) -> Optional[dict]:
    """
    Validate a JWT via the auth-service introspect endpoint.

    Args:
        authorization: Raw ``Authorization`` header value, expected in the
            form ``"Bearer <token>"``.

    Returns:
        A dict with ``user_id``, ``email`` and ``roles`` when the token is
        reported as active; ``None`` when the header is missing, is not a
        Bearer header, carries an empty token, the token is not active, or
        the introspection request fails.
    """
    bearer_prefix = "Bearer "
    if not authorization or not authorization.startswith(bearer_prefix):
        return None

    # Strip only the leading scheme; str.replace would also remove any later
    # occurrence of "Bearer " inside the value.
    token = authorization[len(bearer_prefix):]
    if not token:
        # Nothing to introspect, so skip the network round-trip.
        return None

    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.post(
                f"{AUTH_SERVICE_URL}/api/auth/introspect",
                json={"token": token}
            )
            if resp.status_code == 200:
                data = resp.json()
                if data.get("active"):
                    return {
                        "user_id": data.get("sub"),
                        "email": data.get("email"),
                        "roles": data.get("roles", []),
                    }
            return None
        except Exception as e:
            # Any transport or parsing failure is treated as "not authenticated".
            logger.error(f"JWT validation error: {e}")
            return None
async def ensure_architect_or_admin(authorization: Optional[str]) -> dict:
    """
    Ensure the caller holds the architect or admin role.

    Args:
        authorization: Raw ``Authorization`` header value, or ``None``.

    Returns:
        The user info dict produced by ``validate_jwt_token``.

    Raises:
        HTTPException: 403 when the header is missing, the token is
            invalid, or the user has neither required role.
    """
    if not authorization:
        raise HTTPException(status_code=403, detail="Missing authorization token")

    caller = await validate_jwt_token(authorization)
    if not caller:
        raise HTTPException(status_code=403, detail="Invalid authorization token")

    privileged = ("admin", "architect")
    has_privilege = any(granted in privileged for granted in caller.get("roles", []))
    if not has_privilege:
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return caller
@router.get("/chat/bootstrap")
async def chat_bootstrap(
    room_slug: str = Query(..., description="City room slug"),
    authorization: Optional[str] = Header(None)
):
    """
    Bootstrap Matrix chat for a city room.

    Validates the caller's JWT, resolves the room by slug, obtains a Matrix
    user token from the matrix-gateway and returns the Matrix credentials
    together with basic room info.

    Raises:
        HTTPException:
            401 when the token is missing/invalid or carries no user id;
            404 when the room does not exist;
            400 when the room has no Matrix integration;
            503 when the matrix-gateway cannot be reached;
            500 when the gateway rejects the request or returns a
            malformed credentials payload.
    """
    # Validate JWT
    user_info = await validate_jwt_token(authorization)
    if not user_info:
        raise HTTPException(status_code=401, detail="Invalid or missing authorization token")

    user_id = user_info.get("user_id")
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid token: missing user_id")

    # Get room by slug
    room = await repo_city.get_room_by_slug(room_slug)
    if not room:
        raise HTTPException(status_code=404, detail=f"Room '{room_slug}' not found")

    # Check if room has Matrix integration
    matrix_room_id = room.get("matrix_room_id")
    matrix_room_alias = room.get("matrix_room_alias")
    if not matrix_room_id:
        raise HTTPException(
            status_code=400,
            detail="Room does not have Matrix integration. Run /city/matrix/backfill first."
        )

    # Get Matrix user token from matrix-gateway
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            token_resp = await client.post(
                f"{MATRIX_GATEWAY_URL}/internal/matrix/users/token",
                json={"user_id": user_id}
            )
        except httpx.RequestError as e:
            logger.error(f"Matrix gateway request error: {e}")
            raise HTTPException(status_code=503, detail="Matrix service unavailable")

    if token_resp.status_code != 200:
        # Log the raw body: an error response is not guaranteed to be JSON,
        # and decoding it must not mask the intended 500 below.
        logger.error(f"Failed to get Matrix token: {token_resp.text}")
        raise HTTPException(status_code=500, detail="Failed to get Matrix credentials")

    try:
        matrix_creds = token_resp.json()
        matrix_user_id = matrix_creds["matrix_user_id"]
        matrix_access_token = matrix_creds["access_token"]
        matrix_device_id = matrix_creds["device_id"]
    except (ValueError, KeyError, TypeError) as e:
        # Non-JSON body or missing fields in an otherwise successful response.
        logger.error(f"Malformed Matrix credentials response: {e}")
        raise HTTPException(status_code=500, detail="Failed to get Matrix credentials")

    # Return bootstrap data
    return {
        "matrix_hs_url": "https://app.daarion.space",  # Through nginx proxy
        "matrix_user_id": matrix_user_id,
        "matrix_access_token": matrix_access_token,
        "matrix_device_id": matrix_device_id,
        "matrix_room_id": matrix_room_id,
        "matrix_room_alias": matrix_room_alias,
        "room": {
            "id": room["id"],
            "slug": room["slug"],
            "name": room["name"],
            "description": room.get("description")
        }
    }
# =============================================================================
# Chat Room API (TASK_PHASE_AGENT_CHAT_WIDGET_MVP)
# =============================================================================
@api_router.get("/agents/{agent_id}/chat-room")
async def get_agent_chat_room(agent_id: str):
    """
    Return chat-room information for an agent.

    The room record is created in the DB when it is missing, and a Matrix
    room is created when the room has none. A Matrix failure is logged and
    reported as ``chat_available: False`` rather than failing the request.

    Raises:
        HTTPException: 404 when the agent does not exist, 500 otherwise.
    """
    try:
        agent = await repo_city.get_agent_by_id(agent_id)
        if not agent:
            raise HTTPException(status_code=404, detail=f"Agent not found: {agent_id}")

        # Room slug is derived from the public slug, or from the agent ID.
        slug_part = agent.get('public_slug') or agent_id.replace('ag_', '').replace('-', '_')
        room_slug = f"agent-console-{slug_part}"
        console_name = f"{agent.get('display_name')} Console"

        room = await repo_city.get_room_by_slug(room_slug)
        if not room:
            # No DB record yet: create it.
            room = await repo_city.create_room(
                slug=room_slug,
                name=console_name,
                description=f"Chat room for agent {agent.get('display_name')}",
                created_by="system",
            )
            logger.info(f"Created agent chat room in DB: {room_slug}")

        matrix_room_id = room.get("matrix_room_id") if room else None

        if not matrix_room_id:
            # Best effort: the response is still returned if Matrix fails.
            try:
                matrix_room_id, matrix_room_alias = await ensure_room_has_matrix(
                    room_slug=room_slug,
                    room_name=console_name,
                    visibility="private",
                )
                if matrix_room_id and room:
                    await repo_city.update_room_matrix(
                        room_id=room.get("id"),
                        matrix_room_id=matrix_room_id,
                        matrix_room_alias=matrix_room_alias,
                    )
                    logger.info(f"Created Matrix room for agent {agent_id}: {matrix_room_id}")
            except Exception as e:
                logger.warning(f"Could not create Matrix room for agent {agent_id}: {e}")

        return {
            "agent_id": agent_id,
            "agent_display_name": agent.get("display_name"),
            "agent_avatar_url": agent.get("avatar_url"),
            "agent_status": agent.get("status", "offline"),
            "agent_kind": agent.get("kind"),
            "room_slug": room_slug,
            "room_id": room.get("id") if room else None,
            "matrix_room_id": matrix_room_id,
            "chat_available": matrix_room_id is not None,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get agent chat room for {agent_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get agent chat room")
@api_router.get("/nodes/{node_id}/chat-room")
async def get_node_chat_room(node_id: str):
    """
    Return chat-room information for a node.

    The support room record is created in the DB when it is missing, and a
    Matrix room is created when the room has none. The response lists the
    node's guardian and steward agents when present.

    Raises:
        HTTPException: 404 when the node does not exist, 500 otherwise.
    """
    try:
        node = await repo_city.get_node_by_id(node_id)
        if not node:
            raise HTTPException(status_code=404, detail=f"Node not found: {node_id}")

        # Support room slug is derived from the node ID.
        slug_part = node_id.replace("node-", "").replace("-", "_")
        room_slug = f"node-support-{slug_part}"
        support_name = f"{node.get('name', node_id)} Support"

        room = await repo_city.get_room_by_slug(room_slug)
        if not room:
            # No DB record yet: create it.
            room = await repo_city.create_room(
                slug=room_slug,
                name=support_name,
                description=f"Support room for node {node.get('name', node_id)}",
                created_by="system",
            )
            logger.info(f"Created node support room in DB: {room_slug}")

        matrix_room_id = room.get("matrix_room_id") if room else None

        if not matrix_room_id:
            # Best effort: the response is still returned if Matrix fails.
            try:
                matrix_room_id, matrix_room_alias = await ensure_room_has_matrix(
                    room_slug=room_slug,
                    room_name=support_name,
                    visibility="private",
                )
                if matrix_room_id and room:
                    await repo_city.update_room_matrix(
                        room_id=room.get("id"),
                        matrix_room_id=matrix_room_id,
                        matrix_room_alias=matrix_room_alias,
                    )
                    logger.info(f"Created Matrix room for node {node_id}: {matrix_room_id}")
            except Exception as e:
                logger.warning(f"Could not create Matrix room for node {node_id}: {e}")

        # Guardian first, then steward; absent agents are left out.
        node_agents = []
        for field, role in (("guardian_agent", "guardian"), ("steward_agent", "steward")):
            linked = node.get(field)
            if linked:
                node_agents.append({
                    "id": linked.get("id"),
                    "display_name": linked.get("name"),
                    "kind": linked.get("kind"),
                    "role": role,
                })

        return {
            "node_id": node_id,
            "node_name": node.get("name"),
            "node_status": node.get("status", "offline"),
            "room_slug": room_slug,
            "room_id": room.get("id") if room else None,
            "matrix_room_id": matrix_room_id,
            "chat_available": matrix_room_id is not None,
            "agents": node_agents,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get node chat room for {node_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get node chat room")
@api_router.get("/microdaos/{slug}/chat-room")
async def get_microdao_chat_room(slug: str):
    """
    Return chat-room information for a MicroDAO.

    Room resolution order: the ``{slug}-lobby`` room, then the MicroDAO's
    primary room (or its first room), and finally a newly created lobby.
    A Matrix room is created when the chosen room has none.

    Raises:
        HTTPException: 404 when the MicroDAO does not exist, 500 otherwise.
    """
    try:
        dao = await repo_city.get_microdao_by_slug(slug)
        if not dao:
            raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}")

        lobby_name = f"{dao.get('name', slug)} Lobby"

        # Prefer the {slug}-lobby room.
        room_slug = f"{slug}-lobby"
        room = await repo_city.get_room_by_slug(room_slug)

        if not room:
            # Fall back to the primary room, or the first room available.
            dao_rooms = await repo_city.get_microdao_rooms(dao["id"])
            if dao_rooms:
                room = next(
                    (item for item in dao_rooms if item.get("room_role") == "primary"),
                    dao_rooms[0],
                )
                room_slug = room.get("slug", room_slug)

        if not room:
            # Still nothing: create the lobby.
            room = await repo_city.create_room(
                slug=room_slug,
                name=lobby_name,
                description=f"Lobby for {dao.get('name', slug)} MicroDAO",
                created_by="system",
            )
            logger.info(f"Created MicroDAO lobby room in DB: {room_slug}")

        matrix_room_id = room.get("matrix_room_id") if room else None

        if not matrix_room_id:
            # Best effort: the response is still returned if Matrix fails.
            try:
                matrix_room_id, matrix_room_alias = await ensure_room_has_matrix(
                    room_slug=room_slug,
                    room_name=lobby_name,
                    visibility="public",
                )
                if matrix_room_id and room:
                    await repo_city.update_room_matrix(
                        room_id=room.get("id"),
                        matrix_room_id=matrix_room_id,
                        matrix_room_alias=matrix_room_alias,
                    )
                    logger.info(f"Created Matrix room for MicroDAO {slug}: {matrix_room_id}")
            except Exception as e:
                logger.warning(f"Could not create Matrix room for MicroDAO {slug}: {e}")

        # Orchestrator agent, when one is configured and exists.
        orchestrator = None
        orchestrator_id = dao.get("orchestrator_agent_id")
        if orchestrator_id:
            orch_record = await repo_city.get_agent_by_id(orchestrator_id)
            if orch_record:
                orchestrator = {
                    "id": orchestrator_id,
                    "display_name": orch_record.get("display_name"),
                    "avatar_url": orch_record.get("avatar_url"),
                    "kind": orch_record.get("kind"),
                    "role": "orchestrator",
                }

        return {
            "microdao_id": dao["id"],
            "microdao_slug": slug,
            "microdao_name": dao.get("name"),
            "room_slug": room_slug,
            "room_id": room.get("id") if room else None,
            "matrix_room_id": matrix_room_id,
            "chat_available": matrix_room_id is not None,
            "orchestrator": orchestrator,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get microdao chat room for {slug}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get microdao chat room")
# =============================================================================
# Chat API (TASK_PHASE_MATRIX_FINALIZE_v2)
# =============================================================================
# Request body for posting a chat message to a room.
class SendMessageRequest(BaseModel):
    # Text of the message to send.
    body: str
    # Optional sender value; None when the client does not supply one.
    sender: Optional[str] = None
# Shape of a single chat message: event ID, sender, text body and timestamp.
class ChatMessage(BaseModel):
    event_id: str
    sender: str
    body: str
    # NOTE(review): integer timestamp; unit (seconds vs. milliseconds) is not
    # visible here - confirm against the message source.
    timestamp: int
@api_router.get("/chat/rooms/{room_id}/messages")
async def get_chat_messages(room_id: str, limit: int = 50):
    """
    Return the message history of a Matrix room.

    ``room_id`` may be either the room's ID or its slug; the ID lookup is
    tried first.

    Raises:
        HTTPException: 404 when no room matches, 400 when the room has no
            Matrix integration, 500 on any other failure.
    """
    try:
        # Look up by ID first, then fall back to slug.
        found = (
            await repo_city.get_room_by_id(room_id)
            or await repo_city.get_room_by_slug(room_id)
        )
        if not found:
            raise HTTPException(status_code=404, detail=f"Room not found: {room_id}")

        mx_room = found.get("matrix_room_id")
        if not mx_room:
            raise HTTPException(status_code=400, detail="Room has no Matrix integration")

        # Fetch the history from Matrix through the gateway.
        history = await get_room_messages(mx_room, limit=limit)

        return {
            "room_id": found.get("id"),
            "room_slug": found.get("slug"),
            "matrix_room_id": mx_room,
            "messages": history,
            "count": len(history),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get chat messages for {room_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get chat messages")
@api_router.post("/chat/rooms/{room_id}/messages")
async def send_chat_message(room_id: str, payload: SendMessageRequest):
    """
    Send a message to a Matrix room.

    ``room_id`` may be either the room's ID or its slug; the ID lookup is
    tried first.

    Raises:
        HTTPException: 404 when no room matches, 400 when the room has no
            Matrix integration, 500 when sending fails.
    """
    try:
        # Look up by ID first, then fall back to slug.
        found = (
            await repo_city.get_room_by_id(room_id)
            or await repo_city.get_room_by_slug(room_id)
        )
        if not found:
            raise HTTPException(status_code=404, detail=f"Room not found: {room_id}")

        mx_room = found.get("matrix_room_id")
        if not mx_room:
            raise HTTPException(status_code=400, detail="Room has no Matrix integration")

        # Deliver the message through the gateway.
        sent_event = await send_message_to_room(
            room_id=mx_room,
            body=payload.body,
            sender=payload.sender,
        )
        if not sent_event:
            raise HTTPException(status_code=500, detail="Failed to send message to Matrix")

        return {
            "ok": True,
            "event_id": sent_event,
            "room_id": found.get("id"),
            "matrix_room_id": mx_room,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to send chat message to {room_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to send chat message")
# =============================================================================
# Presence API (TASK_PHASE_AGENT_PRESENCE_INDICATORS_MVP)
# =============================================================================
@api_router.get("/agents/presence")
async def get_agents_presence():
    """
    Return the presence status of all agents.

    Combines Matrix presence (keyed by agent ID) with DAGI router health
    (keyed by node ID) for up to 1000 agent summaries.

    Raises:
        HTTPException: 500 when the presence data cannot be assembled.
    """
    try:
        agent_rows = await repo_city.list_agent_summaries(limit=1000)

        # Matrix presence from the matrix-presence-aggregator.
        matrix_map = await get_matrix_presence_status()

        # DAGI router health (simplified for MVP).
        router_map = await get_dagi_router_health()

        combined = []
        for row in agent_rows:
            agent_id = row["id"]
            node_id = row.get("node_id")
            matrix_entry = matrix_map.get(agent_id, {})

            # Router presence is node-level; unknown when no node matches.
            if node_id and node_id in router_map:
                router_state = router_map[node_id].get("router_status", "unknown")
            else:
                router_state = "unknown"

            combined.append({
                "agent_id": agent_id,
                "display_name": row.get("display_name", agent_id),
                "matrix_presence": matrix_entry.get("status", "offline"),
                "dagi_router_presence": router_state,
                "last_seen": matrix_entry.get("last_seen"),
                "node_id": node_id,
            })

        return {"presence": combined}
    except Exception as e:
        logger.error(f"Failed to get agents presence: {e}")
        raise HTTPException(status_code=500, detail="Failed to get agents presence")
@api_router.get("/agents/{agent_id}/presence")
async def get_single_agent_presence(agent_id: str):
    """
    Return the presence status of a single agent.

    The agent's Matrix user ID is built as ``@agent_{slug}:daarion.space``
    and its presence is requested via ``get_matrix_presence_for_user``.

    Raises:
        HTTPException: 404 when the agent does not exist, 500 otherwise.
    """
    try:
        record = await repo_city.get_agent_by_id(agent_id)
        if not record:
            raise HTTPException(status_code=404, detail=f"Agent not found: {agent_id}")

        # Derive the Matrix user ID from the public slug or the agent ID.
        slug_part = record.get("public_slug") or agent_id.replace("ag_", "").replace("-", "_")
        mxid = f"@agent_{slug_part}:daarion.space"

        presence_info = await get_matrix_presence_for_user(mxid)

        return {
            "agent_id": agent_id,
            "display_name": record.get("display_name"),
            "matrix_user_id": mxid,
            "presence": presence_info.get("presence", "offline"),
            "last_active_ago_ms": presence_info.get("last_active_ago_ms"),
            "status_msg": presence_info.get("status_msg"),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get agent presence for {agent_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get agent presence")
async def get_matrix_presence_for_user(matrix_user_id: str) -> dict:
    """
    Get Matrix presence for a single user via the Matrix Gateway.

    Args:
        matrix_user_id: Full Matrix user ID, e.g. ``@agent_x:daarion.space``.

    Returns:
        The JSON body returned by the gateway on HTTP 200; otherwise
        ``{"presence": "offline"}`` (non-200 status or any error).
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    try:
        gateway_url = os.getenv("MATRIX_GATEWAY_URL", "http://daarion-matrix-gateway:7025")
        # Percent-encode every reserved character, including "/", so the ID
        # always stays a single path segment.
        encoded_mxid = quote(matrix_user_id, safe="")
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"{gateway_url}/internal/matrix/presence/{encoded_mxid}")
            if response.status_code == 200:
                return response.json()
            logger.warning(f"Matrix gateway presence returned {response.status_code}")
            return {"presence": "offline"}
    except Exception as e:
        # Presence is best-effort: report offline rather than fail the caller.
        logger.warning(f"Failed to get Matrix presence for {matrix_user_id}: {e}")
        return {"presence": "offline"}
async def get_matrix_presence_status():
    """
    Fetch Matrix presence from the matrix-presence-aggregator.

    Returns:
        The ``agents`` mapping from the aggregator response (documented as
        agent_id -> {status: 'online'|'unavailable'|'offline', last_seen:
        timestamp}); an empty dict on a non-200 status or any error.
    """
    aggregator_url = "http://matrix-presence-aggregator:8080/api/presence/agents"
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(aggregator_url)
            if response.status_code != 200:
                logger.warning(f"Matrix presence aggregator returned {response.status_code}")
                return {}
            payload = response.json()
            return payload.get("agents", {})
    except Exception as e:
        # Best-effort lookup: callers treat missing data as offline.
        logger.warning(f"Failed to get Matrix presence: {e}")
        return {}
async def get_dagi_router_health():
    """
    Get DAGI router health from the node-registry, or by direct ping.

    The node-registry is asked first. When it answers with a non-200 status
    or the request fails, each known node's router is pinged directly.

    Returns:
        A dict of node_id -> {router_status: ...}. From the fallback the
        status is 'healthy' (HTTP 200), 'degraded' (other status) or
        'offline' (request failed). Empty dict if the fallback itself fails.
    """
    # Preferred source: the node-registry.
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            registry_resp = await client.get("http://dagi-node-registry:9205/api/v1/health")
            if registry_resp.status_code == 200:
                return registry_resp.json().get("nodes", {})
            logger.warning(f"Node registry returned {registry_resp.status_code}")
    except Exception as e:
        logger.warning(f"Failed to get DAGI router health from node-registry: {e}")

    # Fallback: ping the routers of the known nodes directly.
    try:
        statuses = {}
        for node_id in ("node-1-hetzner-gex44", "node-2-macbook-m4max"):
            try:
                async with httpx.AsyncClient(timeout=2.0) as client:
                    ping = await client.get(f"http://dagi-router-{node_id}:8080/health")
                    state = "healthy" if ping.status_code == 200 else "degraded"
                    statuses[node_id] = {"router_status": state}
            except Exception:
                # Unreachable router counts as offline.
                statuses[node_id] = {"router_status": "offline"}
        return statuses
    except Exception as e:
        logger.warning(f"Failed to get DAGI router health: {e}")
        return {}
# =============================================================================
# City Feed API
# =============================================================================
@router.get("/feed", response_model=List[CityFeedEventRead])
async def get_city_feed(limit: int = 20, offset: int = 0):
    """
    Return the City Feed (latest events).

    Raises:
        HTTPException: 500 when the feed cannot be loaded.
    """
    try:
        return await repo_city.get_feed_events(limit=limit, offset=offset)
    except Exception as e:
        logger.error(f"Failed to get city feed: {e}")
        raise HTTPException(status_code=500, detail="Failed to get city feed")
# =============================================================================
# City Map API (2D Map)
# =============================================================================
@router.get("/map", response_model=CityMapResponse)
async def get_city_map():
    """
    Return the data for the 2D city map.

    Returns:
        A ``CityMapResponse`` with:
        - config: grid dimensions and settings
        - rooms: the list of rooms with their coordinates

    Raises:
        HTTPException: 500 when the map data cannot be loaded.
    """
    try:
        # Map configuration, with defaults for missing values.
        raw_config = await repo_city.get_map_config()
        map_config = CityMapConfig(
            grid_width=raw_config.get("grid_width", 6),
            grid_height=raw_config.get("grid_height", 3),
            cell_size=raw_config.get("cell_size", 100),
            background_url=raw_config.get("background_url"),
        )

        # Rooms together with their position and size on the grid.
        room_rows = await repo_city.get_rooms_for_map()
        map_rooms = [
            CityMapRoom(
                id=row["id"],
                slug=row["slug"],
                name=row["name"],
                description=row.get("description"),
                room_type=row.get("room_type", "public"),
                zone=row.get("zone", "central"),
                icon=row.get("icon"),
                color=row.get("color"),
                x=row.get("map_x", 0),
                y=row.get("map_y", 0),
                w=row.get("map_w", 1),
                h=row.get("map_h", 1),
                matrix_room_id=row.get("matrix_room_id"),
            )
            for row in room_rows
        ]

        return CityMapResponse(config=map_config, rooms=map_rooms)
    except Exception as e:
        logger.error(f"Failed to get city map: {e}")
        raise HTTPException(status_code=500, detail="Failed to get city map")
# =============================================================================
# Agents API
# =============================================================================
@router.put("/agents/{agent_id}/public-profile")
async def update_agent_public_profile(agent_id: str, request: Request):
    """
    Update an agent's public profile.

    Intended for Architect/Admin use; no permission check is performed in this
    handler itself.

    The JSON body may contain: is_public, public_slug, public_title,
    public_tagline, public_skills, public_district, public_primary_room_slug.

    Validation:
    - the body must be a JSON object
    - public_slug is required when is_public is true
    - public_slug is lower-cased before it is validated and stored, and may
      contain only letters, digits, underscores and hyphens
    - public_skills must be a list of strings; it is truncated to 10 items of
      at most 64 characters each

    Raises:
        HTTPException 404: the agent does not exist.
        HTTPException 400: validation failed, the body is not valid JSON, or
            the repository raised ValueError.
        HTTPException 500: any other failure.
    """
    import re

    try:
        # Check agent exists
        agent = await repo_city.get_agent_by_id(agent_id)
        if not agent:
            raise HTTPException(status_code=404, detail=f"Agent not found: {agent_id}")

        # Parse body (malformed JSON raises a ValueError subclass -> 400 below)
        body = await request.json()
        if not isinstance(body, dict):
            raise HTTPException(status_code=400, detail="Request body must be a JSON object")

        is_public = body.get("is_public", False)
        public_slug = body.get("public_slug")
        public_title = body.get("public_title")
        public_tagline = body.get("public_tagline")
        public_skills = body.get("public_skills", [])
        public_district = body.get("public_district")
        public_primary_room_slug = body.get("public_primary_room_slug")

        # Validate: if is_public, slug is required
        if is_public and not public_slug:
            raise HTTPException(status_code=400, detail="public_slug is required when is_public is true")

        # Validate slug format. The slug is normalised to lower case so the
        # stored value actually satisfies the rule quoted in the error message;
        # fullmatch avoids accepting a trailing newline.
        if public_slug:
            if not isinstance(public_slug, str):
                raise HTTPException(status_code=400, detail="public_slug must be a string")
            public_slug = public_slug.lower()
            if not re.fullmatch(r'[a-z0-9_-]+', public_slug):
                raise HTTPException(status_code=400, detail="public_slug must contain only lowercase letters, numbers, underscores, and hyphens")

        # Validate skills (max 10, max 64 chars each)
        if public_skills:
            if not isinstance(public_skills, list) or not all(isinstance(s, str) for s in public_skills):
                raise HTTPException(status_code=400, detail="public_skills must be a list of strings")
            public_skills = [s[:64] for s in public_skills[:10]]

        # Update
        result = await repo_city.update_agent_public_profile(
            agent_id=agent_id,
            is_public=is_public,
            public_slug=public_slug,
            public_title=public_title,
            public_tagline=public_tagline,
            public_skills=public_skills,
            public_district=public_district,
            public_primary_room_slug=public_primary_room_slug
        )
        logger.info(f"Updated public profile for agent {agent_id}: is_public={is_public}, slug={public_slug}")
        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception records the traceback through the logging pipeline
        logger.exception(f"Failed to update agent public profile: {e}")
        raise HTTPException(status_code=500, detail="Failed to update agent public profile")
@router.get("/citizens")
async def get_public_citizens_legacy(limit: int = 50, offset: int = 0):
    """
    Return a page of public DAARION City citizens.

    The response has the shape {"citizens": [...], "total": <count>}, where
    both values come from `repo_city.get_public_citizens`.
    """
    try:
        page, total_count = await repo_city.get_public_citizens(limit=limit, offset=offset)
        payload = {"citizens": page, "total": total_count}
        return payload
    except Exception as exc:
        logger.error(f"Failed to get public citizens: {exc}")
        raise HTTPException(status_code=500, detail="Failed to get public citizens")
@router.get("/citizens/{slug}")
async def get_citizen_by_slug(slug: str, request: Request):
    """
    Return a public citizen looked up by slug.

    The record from the repository is returned as-is, including
    admin_panel_url: the flag that would blank that field is currently
    hard-coded to True, because this legacy endpoint is expected to be reached
    only from the admin panel. `request` is accepted but not used.

    Raises:
        HTTPException 404: no citizen has this slug.
        HTTPException 500: any other failure.
    """
    try:
        # Legacy endpoint: reachable only from the admin panel, so always include the URL.
        include_admin_url = True
        citizen = await repo_city.get_public_citizen_by_slug(slug)
        if citizen:
            if not include_admin_url:
                citizen["admin_panel_url"] = None
            return citizen
        raise HTTPException(status_code=404, detail=f"Citizen not found: {slug}")
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to get citizen: {exc}")
        raise HTTPException(status_code=500, detail="Failed to get citizen")
@router.put("/agents/{agent_id}/prompts/{kind}")
async def update_agent_prompt(agent_id: str, kind: str, request: Request):
    """
    Update one of an agent's system prompts.

    Intended for Architect/Admin use. Permissions are not checked yet (see the
    TODO below); the author is recorded as the placeholder "ARCHITECT".

    kind: core | safety | governance | tools

    The JSON body must be an object with a non-blank string "content" and an
    optional "note". The content is stripped before it is stored.

    Raises:
        HTTPException 400: invalid kind, malformed or non-object body, or
            missing / blank / non-string content.
        HTTPException 404: the agent does not exist.
        HTTPException 500: any other failure.
    """
    try:
        # Validate kind
        valid_kinds = ["core", "safety", "governance", "tools"]
        if kind not in valid_kinds:
            raise HTTPException(status_code=400, detail=f"Invalid kind. Must be one of: {valid_kinds}")

        # Check agent exists
        agent = await repo_city.get_agent_by_id(agent_id)
        if not agent:
            raise HTTPException(status_code=404, detail=f"Agent not found: {agent_id}")

        # Parse body; malformed JSON is a client error, not a server error
        try:
            body = await request.json()
        except ValueError:
            raise HTTPException(status_code=400, detail="Request body must be valid JSON")
        if not isinstance(body, dict):
            raise HTTPException(status_code=400, detail="Request body must be a JSON object")

        content = body.get("content")
        note = body.get("note")
        # A non-string content (number, list, ...) would otherwise crash on .strip()
        if not isinstance(content, str) or not content.strip():
            raise HTTPException(status_code=400, detail="Content is required")

        # TODO: Get user from JWT and check permissions
        # For now, use a placeholder
        created_by = "ARCHITECT"  # Will be replaced with actual user from auth

        # Update prompt
        result = await repo_city.update_agent_prompt(
            agent_id=agent_id,
            kind=kind,
            content=content.strip(),
            created_by=created_by,
            note=note
        )
        logger.info(f"Updated {kind} prompt for agent {agent_id} to version {result['version']}")
        return result
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception records the traceback through the logging pipeline
        logger.exception(f"Failed to update agent prompt: {e}")
        raise HTTPException(status_code=500, detail="Failed to update agent prompt")
@router.get("/agents/{agent_id}/prompts/{kind}/history")
async def get_agent_prompt_history(agent_id: str, kind: str, limit: int = 10):
    """
    Return the version history of one of an agent's prompts.

    kind: core | safety | governance | tools

    Returns:
        {"agent_id": ..., "kind": ..., "history": [...]}, where the history
        comes from `repo_city.get_agent_prompt_history`.

    Raises:
        HTTPException 400: invalid kind.
        HTTPException 500: any other failure.
    """
    try:
        valid_kinds = ["core", "safety", "governance", "tools"]
        if kind not in valid_kinds:
            raise HTTPException(status_code=400, detail=f"Invalid kind. Must be one of: {valid_kinds}")
        history = await repo_city.get_agent_prompt_history(agent_id, kind, limit)
        return {"agent_id": agent_id, "kind": kind, "history": history}
    except HTTPException:
        # Let the 400 above through; without this clause the generic handler
        # below would turn it into a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to get prompt history: {e}")
        raise HTTPException(status_code=500, detail="Failed to get prompt history")
@router.get("/agents/{agent_id}/dashboard")
async def get_agent_dashboard(agent_id: str):
    """
    Return the full agent dashboard (DAIS profile + node + metrics).

    The response combines:
    - profile: identity, DAIS sections (core / vis / cog / act), city presence
    - node: details of the agent's node, or an "Unknown Node" stub when the
      node id is set but no node record is found
    - primary_city_room: the agent's first room, otherwise the primary room of
      the agent's primary MicroDAO
    - runtime / metrics / recent_activity: placeholder values for now
    - system_prompts, public_profile, microdao_memberships

    Raises:
        HTTPException 404: the agent does not exist.
        HTTPException 500: any other failure.
    """
    try:
        agent = await repo_city.get_agent_by_id(agent_id)
        if not agent:
            raise HTTPException(status_code=404, detail=f"Agent not found: {agent_id}")

        # Rooms the agent belongs to
        rooms = await repo_city.get_agent_rooms(agent_id)

        # DAIS profile
        role = agent.get("role")
        profile = {
            "agent_id": agent["id"],
            "display_name": agent["display_name"],
            "kind": agent.get("kind", "assistant"),
            "model": agent.get("model"),
            "avatar_url": agent.get("avatar_url"),
            "status": agent.get("status", "offline"),
            "node_id": agent.get("node_id"),
            "is_public": agent.get("is_public", False),
            "public_slug": agent.get("public_slug"),
            "is_orchestrator": agent.get("is_orchestrator", False),
            "primary_microdao_id": agent.get("primary_microdao_id"),
            "primary_microdao_name": agent.get("primary_microdao_name"),
            "primary_microdao_slug": agent.get("primary_microdao_slug"),
            "crew_info": agent.get("crew_info"),
            "roles": [role] if role else [],
            "tags": [],
            "dais": {
                "core": {
                    "title": agent.get("display_name"),
                    "bio": f"{agent.get('kind', 'assistant').title()} agent in DAARION",
                    "version": "1.0.0",
                },
                "vis": {
                    "avatar_url": agent.get("avatar_url"),
                    "color_primary": agent.get("color", "#22D3EE"),
                },
                "cog": {
                    "base_model": agent.get("model", "default"),
                    "provider": "ollama",
                    "node_id": agent.get("node_id"),
                },
                "act": {
                    "tools": agent.get("capabilities", []),
                },
            },
            "city_presence": {
                "primary_room_slug": agent.get("primary_room_slug"),
                "district": agent.get("home_district"),
                "rooms": rooms,
            },
        }

        # Node details (only when the agent is bound to a node)
        node_info = None
        if agent.get("node_id"):
            node_record = await repo_city.get_node_by_id(agent["node_id"])
            if not node_record:
                node_info = {
                    "node_id": agent["node_id"],
                    "status": "unknown",
                    "name": "Unknown Node",
                }
            else:
                node_info = {
                    "node_id": node_record["node_id"],
                    "name": node_record["name"],
                    "hostname": node_record.get("hostname"),
                    "roles": node_record.get("roles", []),
                    "environment": node_record.get("environment"),
                    "status": node_record.get("status", "offline"),
                    "guardian_agent": node_record.get("guardian_agent"),
                    "steward_agent": node_record.get("steward_agent"),
                }

        system_prompts = await repo_city.get_agent_prompts(agent_id)
        public_profile = await repo_city.get_agent_public_profile(agent_id)

        # MicroDAO memberships
        membership_rows = await repo_city.get_agent_microdao_memberships(agent_id)
        memberships = []
        for entry in membership_rows:
            memberships.append(
                AgentMicrodaoMembership(
                    microdao_id=entry["microdao_id"],
                    microdao_slug=entry.get("microdao_slug"),
                    microdao_name=entry.get("microdao_name"),
                    logo_url=entry.get("logo_url"),
                    role=entry.get("role"),
                    is_core=entry.get("is_core", False),
                )
            )

        # Primary city room.
        # Priority 1: the first of the agent's own rooms.
        # Priority 2: the main room of the agent's primary MicroDAO.
        primary_city_room = None
        if rooms:
            first_room = rooms[0]
            primary_city_room = {
                "id": first_room.get("id"),
                "slug": first_room.get("slug"),
                "name": first_room.get("name"),
                "matrix_room_id": first_room.get("matrix_room_id"),
            }
        elif agent.get("primary_microdao_id"):
            dao_room = await repo_city.get_microdao_primary_room(agent["primary_microdao_id"])
            if dao_room:
                primary_city_room = dao_room

        is_online = agent.get("status") == "online"
        return {
            "profile": profile,
            "node": node_info,
            "primary_city_room": primary_city_room,
            "runtime": {
                "health": "healthy" if is_online else "unknown",
                "last_success_at": None,
                "last_error_at": None,
            },
            "metrics": {
                "tasks_1h": 0,
                "tasks_24h": 0,
                "errors_24h": 0,
                "avg_latency_ms_1h": 0,
                "success_rate_24h": 1.0,
            },
            "recent_activity": [],
            "system_prompts": system_prompts,
            "public_profile": public_profile,
            "microdao_memberships": memberships,
        }
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to get agent dashboard: {exc}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Failed to get agent dashboard")
@router.get("/agents", response_model=List[AgentRead])
async def get_agents():
    """
    Return the list of all agents.

    `capabilities` may come back from the repository as a JSON-encoded string;
    in that case it is decoded before the `AgentRead` model is built.
    """
    def decode_capabilities(raw):
        # Decode only when the repository handed back a JSON string.
        if isinstance(raw, str):
            import json
            return json.loads(raw)
        return raw

    try:
        rows = await repo_city.get_all_agents()
        return [
            AgentRead(
                id=row["id"],
                display_name=row["display_name"],
                kind=row.get("kind", "assistant"),
                avatar_url=row.get("avatar_url"),
                color=row.get("color", "cyan"),
                status=row.get("status", "offline"),
                current_room_id=row.get("current_room_id"),
                capabilities=decode_capabilities(row.get("capabilities", [])),
            )
            for row in rows
        ]
    except Exception as exc:
        logger.error(f"Failed to get agents: {exc}")
        raise HTTPException(status_code=500, detail="Failed to get agents")
@router.post("/agents", response_model=CreateAgentResponse)
async def create_agent(body: CreateAgentRequest):
    """
    Create a new agent.

    The slug doubles as the agent id. The request is rejected with 400 when an
    agent already uses the slug either as its id or as its slug. When
    `home_node_id` is not supplied, `node_id` is stored in its place.

    Raises:
        HTTPException 400: an agent with this slug already exists.
        HTTPException 500: any other failure (the error text is included in
            the detail).
    """
    insert_sql = """
        INSERT INTO agents (
            id, slug, display_name, kind, role, model,
            node_id, home_node_id, home_microdao_id, district,
            primary_room_slug, avatar_url, color_hint,
            is_public, is_orchestrator, priority,
            created_at, updated_at
        ) VALUES (
            $1, $2, $3, $4, $5, $6,
            $7, $8, $9, $10,
            $11, $12, $13,
            $14, $15, $16,
            NOW(), NOW()
        )
        RETURNING id, slug, display_name, kind, node_id, home_microdao_id, district, created_at
    """
    try:
        pool = await repo_city.get_pool()

        # Reject duplicates: the slug may clash with an existing id or slug.
        duplicate = await pool.fetchrow(
            "SELECT id FROM agents WHERE id = $1 OR slug = $1",
            body.slug
        )
        if duplicate:
            raise HTTPException(status_code=400, detail=f"Agent with slug '{body.slug}' already exists")

        # The id is derived directly from the slug.
        agent_id = body.slug
        values = (
            agent_id,
            body.slug,
            body.display_name,
            body.kind,
            body.role,
            body.model,
            body.node_id,
            body.home_node_id or body.node_id,
            body.home_microdao_id,
            body.district,
            body.primary_room_slug,
            body.avatar_url,
            body.color_hint,
            body.is_public,
            body.is_orchestrator,
            body.priority,
        )
        created = await pool.fetchrow(insert_sql, *values)
        logger.info(f"Created agent: {agent_id}")
        return CreateAgentResponse(
            id=created["id"],
            slug=created["slug"],
            display_name=created["display_name"],
            kind=created["kind"],
            node_id=created["node_id"],
            home_microdao_id=created["home_microdao_id"],
            district=created["district"],
            created_at=created["created_at"],
        )
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to create agent: {exc}")
        raise HTTPException(status_code=500, detail=f"Failed to create agent: {str(exc)}")
@router.delete("/agents/{agent_id}", response_model=DeleteAgentResponse)
async def delete_agent(agent_id: str):
    """
    Delete an agent (soft delete: sets is_archived=true and deleted_at=now()).

    Raises:
        HTTPException 404: no agent with this id exists that is not already
            deleted.
        HTTPException 500: any other failure (the error text is included in
            the detail).
    """
    try:
        pool = await repo_city.get_pool()

        # Only agents that are not already soft-deleted can be deleted.
        target = await pool.fetchrow(
            "SELECT id, display_name FROM agents WHERE id = $1 AND deleted_at IS NULL",
            agent_id
        )
        if target is None or not target:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        archive_sql = """
            UPDATE agents
            SET is_archived = true,
            deleted_at = NOW(),
            updated_at = NOW()
            WHERE id = $1
        """
        await pool.execute(archive_sql, agent_id)
        logger.info(f"Deleted (archived) agent: {agent_id}")

        confirmation = f"Agent '{target['display_name']}' has been archived"
        return DeleteAgentResponse(ok=True, message=confirmation, agent_id=agent_id)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to delete agent: {exc}")
        raise HTTPException(status_code=500, detail=f"Failed to delete agent: {str(exc)}")
@router.post("/agents/{agent_id}/ensure-room")
async def ensure_agent_room_endpoint(agent_id: str):
    """
    Make sure the agent has a personal room (Task v3).

    If the agent already has `primary_room_slug`, it is returned. Otherwise a
    non-public personal room is created and linked to the agent. Both writes
    run in one transaction, so a failure cannot leave a room that no agent
    points to.

    Returns:
        {"room_slug": <slug>, "created": <bool>} - the slug to open the chat
        with, and whether the room was created by this call.

    Raises:
        HTTPException 404: no agent with this id exists that is not deleted.
        HTTPException 500: any other failure (the error text is included in
            the detail).
    """
    import re

    try:
        pool = await repo_city.get_pool()

        # Get agent
        agent = await pool.fetchrow("""
            SELECT id, display_name, primary_room_slug, district
            FROM agents
            WHERE id = $1 AND deleted_at IS NULL
        """, agent_id)
        if not agent:
            raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' not found")

        # If agent already has a room, return it
        if agent["primary_room_slug"]:
            return {"room_slug": agent["primary_room_slug"], "created": False}

        # Build the slug for the personal room. An id with no ASCII
        # alphanumerics sanitises to "", which would give the bare slug
        # "agent-"; use a random fragment in that case.
        slug_part = re.sub(r'[^a-z0-9]+', '-', agent_id.lower()).strip('-')
        if not slug_part:
            slug_part = str(uuid.uuid4())[:8]
        room_slug = f"agent-{slug_part}"

        # Check if slug exists
        existing = await pool.fetchrow("SELECT id FROM city_rooms WHERE slug = $1", room_slug)
        if existing:
            room_slug = f"{room_slug}-{str(uuid.uuid4())[:8]}"

        # Create the room and link it to the agent atomically
        async with pool.acquire() as conn:
            async with conn.transaction():
                await conn.execute("""
                    INSERT INTO city_rooms (
                        slug, name, description, owner_type, owner_id,
                        room_type, room_role, is_public, zone, space_scope
                    ) VALUES (
                        $1, $2, $3, 'agent', $4,
                        'agent', 'personal', FALSE, $5, 'personal'
                    )
                """,
                    room_slug,
                    f"Чат з {agent['display_name']}",
                    f"Персональна кімната агента {agent['display_name']}",
                    agent_id,
                    agent.get("district") or "agents"
                )
                await conn.execute("""
                    UPDATE agents SET primary_room_slug = $1, updated_at = NOW()
                    WHERE id = $2
                """, room_slug, agent_id)

        logger.info(f"Created personal room {room_slug} for agent {agent_id}")
        return {"room_slug": room_slug, "created": True}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to ensure room for agent {agent_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to ensure room: {str(e)}")
@router.get("/agents/online", response_model=List[AgentPresence])
async def get_online_agents():
    """
    Return the list of online agents (used for presence).

    Each record from `repo_city.get_online_agents` is mapped to an
    `AgentPresence`; missing kind / status / color fall back to
    "assistant" / "offline" / "cyan".
    """
    try:
        online = await repo_city.get_online_agents()
        return [
            AgentPresence(
                agent_id=entry["id"],
                display_name=entry["display_name"],
                kind=entry.get("kind", "assistant"),
                status=entry.get("status", "offline"),
                room_id=entry.get("current_room_id"),
                color=entry.get("color", "cyan"),
            )
            for entry in online
        ]
    except Exception as exc:
        logger.error(f"Failed to get online agents: {exc}")
        raise HTTPException(status_code=500, detail="Failed to get online agents")
@router.get("/rooms/{room_id}/agents", response_model=List[AgentPresence])
async def get_room_agents(room_id: str):
    """
    Return the agents present in a specific room.

    The `room_id` from the path is echoed into every `AgentPresence`, together
    with the agent's node, model and role.
    """
    try:
        occupants = await repo_city.get_agents_by_room(room_id)
        return [
            AgentPresence(
                agent_id=entry["id"],
                display_name=entry["display_name"],
                kind=entry.get("kind", "assistant"),
                status=entry.get("status", "offline"),
                room_id=room_id,
                color=entry.get("color", "cyan"),
                node_id=entry.get("node_id"),
                model=entry.get("model"),
                role=entry.get("role"),
            )
            for entry in occupants
        ]
    except Exception as exc:
        logger.error(f"Failed to get room agents: {exc}")
        raise HTTPException(status_code=500, detail="Failed to get room agents")
@router.get("/agents/presence-snapshot")
async def get_agents_presence_snapshot():
    """
    Return a presence snapshot of all agents (50 agents across 10 districts).

    The snapshot produced by `repo_city.get_agents_presence_snapshot` is
    returned unchanged; failures are logged and reported as HTTP 500.
    """
    try:
        return await repo_city.get_agents_presence_snapshot()
    except Exception as exc:
        logger.error(f"Failed to get agents presence snapshot: {exc}")
        raise HTTPException(status_code=500, detail="Failed to get agents presence snapshot")
# =============================================================================
# MicroDAO API
# =============================================================================
@router.get("/microdao", response_model=List[MicrodaoSummary])
async def get_microdaos(
    district: Optional[str] = Query(None, description="Filter by district"),
    is_public: Optional[bool] = Query(True, description="Filter by public status (default: True)"),
    is_platform: Optional[bool] = Query(None, description="Filter by platform status"),
    q: Optional[str] = Query(None, description="Search by name/description"),
    include_all: bool = Query(False, description="Include non-public (admin only)"),
    # ge=0: a negative limit is rejected at validation time instead of being
    # passed on to the repository.
    limit: int = Query(50, ge=0, le=100),
    offset: int = Query(0, ge=0)
):
    """
    Return the list of MicroDAOs.

    - **district**: filter by district (Core, Energy, Green, Labs, etc.)
    - **is_public**: filter by public status (default True)
    - **is_platform**: filter by type (platform/district)
    - **q**: search in name or description
    - **include_all**: include everything (for admins); when set, the
      is_public filter is not applied
    - **limit** / **offset**: paging (limit 0-100, offset >= 0)
    """
    try:
        # If include_all is True (admin mode), don't filter by is_public
        public_filter = None if include_all else is_public
        daos = await repo_city.list_microdao_summaries(
            district=district,
            is_public=public_filter,
            is_platform=is_platform,
            q=q,
            limit=limit,
            offset=offset
        )
        return [
            MicrodaoSummary(
                id=dao["id"],
                slug=dao["slug"],
                name=dao["name"],
                description=dao.get("description"),
                district=dao.get("district"),
                is_public=dao.get("is_public", True),
                is_platform=dao.get("is_platform", False),
                is_active=dao.get("is_active", True),
                is_pinned=dao.get("is_pinned", False),
                pinned_weight=dao.get("pinned_weight", 0),
                orchestrator_agent_id=dao.get("orchestrator_agent_id"),
                orchestrator_agent_name=dao.get("orchestrator_agent_name"),
                parent_microdao_id=dao.get("parent_microdao_id"),
                parent_microdao_slug=dao.get("parent_microdao_slug"),
                logo_url=dao.get("logo_url"),
                member_count=dao.get("member_count", 0),
                agents_count=dao.get("agents_count", 0),
                room_count=dao.get("room_count", 0),
                rooms_count=dao.get("rooms_count", 0),
                channels_count=dao.get("channels_count", 0)
            )
            for dao in daos
        ]
    except Exception as e:
        # logger.exception records the traceback through the logging pipeline
        logger.exception(f"Failed to get microdaos: {e}")
        raise HTTPException(status_code=500, detail="Failed to get microdaos")
@router.get("/microdao/{slug}", response_model=MicrodaoDetail)
async def get_microdao_by_slug(slug: str):
    """
    Return detailed information about a MicroDAO.

    Includes:
    - basic DAO information
    - the list of agents (with roles)
    - the list of channels (Telegram, Matrix, City rooms, CrewAI)
    - public citizens, child MicroDAOs and all rooms of the DAO

    The primary city room is the first room whose role is 'primary', or the
    first room in the list when none has that role.

    Raises:
        HTTPException 404: no MicroDAO has this slug.
        HTTPException 500: any other failure.
    """
    try:
        dao = await repo_city.get_microdao_by_slug(slug)
        if not dao:
            raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}")

        # Agents
        agent_views = [
            MicrodaoAgentView(
                agent_id=member["agent_id"],
                display_name=member.get("display_name", member["agent_id"]),
                role=member.get("role"),
                is_core=member.get("is_core", False),
            )
            for member in dao.get("agents", [])
        ]

        # Channels
        channel_views = [
            MicrodaoChannelView(
                kind=entry["kind"],
                ref_id=entry["ref_id"],
                display_name=entry.get("display_name"),
                is_primary=entry.get("is_primary", False),
            )
            for entry in dao.get("channels", [])
        ]

        # Public citizens
        citizen_views = [
            MicrodaoCitizenView(
                slug=person["slug"],
                display_name=person["display_name"],
                public_title=person.get("public_title"),
                public_tagline=person.get("public_tagline"),
                avatar_url=person.get("avatar_url"),
                district=person.get("public_district"),
                primary_room_slug=person.get("public_primary_room_slug"),
            )
            for person in dao.get("public_citizens", [])
        ]

        # Child MicroDAOs
        children = [
            MicrodaoSummary(
                id=sub["id"],
                slug=sub["slug"],
                name=sub["name"],
                is_public=sub.get("is_public", True),
                is_platform=sub.get("is_platform", False),
            )
            for sub in dao.get("child_microdaos", [])
        ]

        # All rooms of the MicroDAO (multi-room support)
        room_rows = await repo_city.get_microdao_rooms(dao["id"])
        room_summaries = []
        for row in room_rows:
            room_summaries.append(
                CityRoomSummary(
                    id=row["id"],
                    slug=row["slug"],
                    name=row["name"],
                    matrix_room_id=row.get("matrix_room_id"),
                    microdao_id=row.get("microdao_id"),
                    microdao_slug=row.get("microdao_slug"),
                    room_role=row.get("room_role"),
                    is_public=row.get("is_public", True),
                    sort_order=row.get("sort_order", 100),
                )
            )

        # Primary room: first with role 'primary', else the first room.
        primary_room_summary = None
        if room_summaries:
            primary_room_summary = next(
                (candidate for candidate in room_summaries if candidate.room_role == 'primary'),
                room_summaries[0],
            )

        return MicrodaoDetail(
            id=dao["id"],
            slug=dao["slug"],
            name=dao["name"],
            description=dao.get("description"),
            district=dao.get("district"),
            is_public=dao.get("is_public", True),
            is_platform=dao.get("is_platform", False),
            is_active=dao.get("is_active", True),
            orchestrator_agent_id=dao.get("orchestrator_agent_id"),
            orchestrator_display_name=dao.get("orchestrator_display_name"),
            parent_microdao_id=dao.get("parent_microdao_id"),
            parent_microdao_slug=dao.get("parent_microdao_slug"),
            child_microdaos=children,
            logo_url=dao.get("logo_url"),
            agents=agent_views,
            channels=channel_views,
            public_citizens=citizen_views,
            primary_city_room=primary_room_summary,
            rooms=room_summaries,
        )
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to get microdao {slug}: {exc}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Failed to get microdao")
# =============================================================================
# MicroDAO Multi-Room API (Task 034)
# =============================================================================
@router.get("/microdao/{slug}/rooms", response_model=MicrodaoRoomsList)
async def get_microdao_rooms_endpoint(slug: str):
    """
    Return all rooms of a MicroDAO (Task 034).

    The rooms are returned in the order supplied by
    `repo_city.get_microdao_rooms_by_slug` (ordered by sort_order per the
    original description).

    Raises:
        HTTPException 404: no MicroDAO has this slug.
        HTTPException 500: any other failure.
    """
    try:
        found = await repo_city.get_microdao_rooms_by_slug(slug)
        if not found:
            raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}")

        summaries = []
        for row in found["rooms"]:
            summaries.append(
                CityRoomSummary(
                    id=row["id"],
                    slug=row["slug"],
                    name=row["name"],
                    matrix_room_id=row.get("matrix_room_id"),
                    microdao_id=row.get("microdao_id"),
                    microdao_slug=row.get("microdao_slug"),
                    room_role=row.get("room_role"),
                    is_public=row.get("is_public", True),
                    sort_order=row.get("sort_order", 100),
                )
            )

        return MicrodaoRoomsList(
            microdao_id=found["microdao_id"],
            microdao_slug=found["microdao_slug"],
            rooms=summaries,
        )
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to get microdao rooms for {slug}: {exc}")
        raise HTTPException(status_code=500, detail="Failed to get microdao rooms")
@router.post("/microdao/{slug}/rooms", response_model=CityRoomSummary)
async def create_microdao_room_endpoint(slug: str, payload: CreateMicrodaoRoomRequest):
    """
    Create a new room for a MicroDAO (Task v3).

    Inserts a row into city_rooms owned by the MicroDAO. The room slug is
    "<microdao-slug>-<sanitised-name>". A name with no ASCII letters or digits
    (for example one written entirely in Cyrillic) sanitises to an empty
    string; a random "room-xxxxxxxx" fragment is used then, so the slug never
    ends in a bare hyphen. If the slug is already taken, a random suffix is
    appended.

    Raises:
        HTTPException 404: no MicroDAO has this slug.
        HTTPException 500: any other failure (the error text is included in
            the detail).
    """
    import re

    try:
        # Get microdao by slug
        dao = await repo_city.get_microdao_by_slug(slug)
        if not dao:
            raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}")

        pool = await repo_city.get_pool()

        # Generate slug from name, with a fallback for names that sanitise to ""
        name_part = re.sub(r'[^a-z0-9]+', '-', payload.name.lower()).strip('-')
        if not name_part:
            name_part = f"room-{str(uuid.uuid4())[:8]}"
        room_slug = f"{slug}-{name_part}"

        # Check if slug already exists
        existing = await pool.fetchrow("SELECT id FROM city_rooms WHERE slug = $1", room_slug)
        if existing:
            room_slug = f"{room_slug}-{str(uuid.uuid4())[:8]}"

        # Create room in city_rooms
        row = await pool.fetchrow("""
            INSERT INTO city_rooms (
                slug, name, description, owner_type, owner_id,
                room_type, room_role, is_public, zone, space_scope
            ) VALUES (
                $1, $2, $3, 'microdao', $4,
                'microdao', $5, $6, $7, 'microdao'
            )
            RETURNING id, slug, name, description, room_role, is_public, zone
        """,
            room_slug,
            payload.name,
            payload.description,
            dao["id"],
            payload.room_role,
            payload.is_public,
            payload.zone_key
        )
        logger.info(f"Created room {room_slug} for MicroDAO {slug}")
        return CityRoomSummary(
            id=str(row["id"]),
            slug=row["slug"],
            name=row["name"],
            microdao_id=dao["id"],
            microdao_slug=slug,
            room_role=row["room_role"],
            is_public=row["is_public"],
            sort_order=100
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to create room for microdao {slug}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create room: {str(e)}")
@router.delete("/microdao/{slug}/rooms/{room_id}")
async def delete_microdao_room_endpoint(slug: str, room_id: str):
    """
    Delete a MicroDAO room (Task v3).

    Soft delete: sets deleted_at. Only rooms owned by this MicroDAO that are
    not already deleted match, so repeating the call returns 404 and does not
    overwrite the original deletion timestamp (the same rule `delete_agent`
    applies to agents).

    Raises:
        HTTPException 404: the MicroDAO does not exist, or the room does not
            exist, is not owned by it, or is already deleted.
        HTTPException 500: any other failure (the error text is included in
            the detail).
    """
    try:
        # Get microdao by slug
        dao = await repo_city.get_microdao_by_slug(slug)
        if not dao:
            raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}")

        pool = await repo_city.get_pool()

        # Check that the room belongs to this microdao and is still live
        room = await pool.fetchrow("""
            SELECT id, slug FROM city_rooms
            WHERE id = $1 AND owner_id = $2 AND owner_type = 'microdao'
              AND deleted_at IS NULL
        """, room_id, dao["id"])
        if not room:
            raise HTTPException(status_code=404, detail="Room not found or not owned by this MicroDAO")

        # Soft delete
        await pool.execute("""
            UPDATE city_rooms
            SET deleted_at = NOW(), updated_at = NOW()
            WHERE id = $1 AND deleted_at IS NULL
        """, room_id)
        logger.info(f"Deleted room {room['slug']} from MicroDAO {slug}")
        return {"ok": True, "message": f"Room '{room['slug']}' deleted"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to delete room {room_id} from microdao {slug}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to delete room: {str(e)}")
@router.get("/microdao/{slug}/agents")
async def get_microdao_agents_endpoint(slug: str):
    """
    Return all agents of a MicroDAO, with their roles and statuses.

    Archived and deleted agents are excluded. Core members come first, then
    agents are ordered by role (orchestrator, district_lead, core_team,
    everything else) and finally by display name.

    Raises:
        HTTPException 404: no MicroDAO has this slug.
        HTTPException 500: any other failure.
    """
    members_sql = """
        SELECT
            a.id,
            a.display_name as name,
            a.kind,
            a.status,
            a.avatar_url,
            a.gov_level,
            ma.role,
            ma.is_core
        FROM agents a
        JOIN microdao_agents ma ON ma.agent_id = a.id
        WHERE ma.microdao_id = $1
            AND COALESCE(a.is_archived, false) = false
            AND a.deleted_at IS NULL
        ORDER BY
            ma.is_core DESC,
            CASE ma.role
                WHEN 'orchestrator' THEN 0
                WHEN 'district_lead' THEN 1
                WHEN 'core_team' THEN 2
                ELSE 3
            END,
            a.display_name
    """
    try:
        dao = await repo_city.get_microdao_by_slug(slug)
        if not dao:
            raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}")

        # Members are read from the microdao_agents join table.
        pool = await repo_city.get_pool()
        records = await pool.fetch(members_sql, dao["id"])

        agent_items = []
        for record in records:
            agent_items.append({
                "id": record["id"],
                "name": record["name"],
                "kind": record["kind"],
                "status": record["status"],
                "avatar_url": record.get("avatar_url"),
                "gov_level": record.get("gov_level"),
                "role": record["role"],
                "is_core": record["is_core"],
            })

        return {
            "microdao_id": dao["id"],
            "microdao_slug": dao["slug"],
            "agents": agent_items,
        }
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to get microdao agents for {slug}: {exc}")
        raise HTTPException(status_code=500, detail="Failed to get microdao agents")
@router.post("/microdao/{slug}/rooms/attach-existing", response_model=CityRoomSummary)
async def attach_existing_room_endpoint(
    slug: str,
    payload: AttachExistingRoomRequest
):
    """
    Attach an existing room to a MicroDAO (Task 036).

    Meant to require administrator or MicroDAO-orchestrator rights; that check
    is not implemented yet (see the TODO below).

    Raises:
        HTTPException 404: the MicroDAO or the room was not found.
        HTTPException 500: any other failure.
    """
    try:
        dao = await repo_city.get_microdao_by_slug(slug)
        if not dao:
            raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}")

        # TODO: Add authorization check (assert_can_manage_microdao)
        attached = await repo_city.attach_room_to_microdao(
            microdao_id=dao["id"],
            room_id=payload.room_id,
            room_role=payload.room_role,
            is_public=payload.is_public,
            sort_order=payload.sort_order,
        )
        if attached:
            return CityRoomSummary(
                id=attached["id"],
                slug=attached["slug"],
                name=attached["name"],
                matrix_room_id=attached.get("matrix_room_id"),
                microdao_id=attached.get("microdao_id"),
                room_role=attached.get("room_role"),
                is_public=attached.get("is_public", True),
                sort_order=attached.get("sort_order", 100),
            )
        raise HTTPException(status_code=404, detail="Room not found")
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to attach room to microdao {slug}: {exc}")
        raise HTTPException(status_code=500, detail="Failed to attach room")
@router.patch("/microdao/{slug}/rooms/{room_id}", response_model=CityRoomSummary)
async def update_microdao_room_endpoint(
    slug: str,
    room_id: str,
    payload: MicrodaoRoomUpdate
):
    """
    Update the settings of a MicroDAO room (Task 036).

    Meant to require administrator or MicroDAO-orchestrator rights; that check
    is not implemented yet (see the TODO below). A missing `set_primary` is
    treated as False.

    Raises:
        HTTPException 404: the MicroDAO was not found, or the room is missing
            or not attached to it.
        HTTPException 500: any other failure.
    """
    try:
        dao = await repo_city.get_microdao_by_slug(slug)
        if not dao:
            raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}")

        # TODO: Add authorization check (assert_can_manage_microdao)
        make_primary = payload.set_primary or False
        updated = await repo_city.update_microdao_room(
            microdao_id=dao["id"],
            room_id=room_id,
            room_role=payload.room_role,
            is_public=payload.is_public,
            sort_order=payload.sort_order,
            set_primary=make_primary,
        )
        if updated:
            return CityRoomSummary(
                id=updated["id"],
                slug=updated["slug"],
                name=updated["name"],
                matrix_room_id=updated.get("matrix_room_id"),
                microdao_id=updated.get("microdao_id"),
                room_role=updated.get("room_role"),
                is_public=updated.get("is_public", True),
                sort_order=updated.get("sort_order", 100),
            )
        raise HTTPException(status_code=404, detail="Room not found or not attached to this MicroDAO")
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to update room {room_id} for microdao {slug}: {exc}")
        raise HTTPException(status_code=500, detail="Failed to update room")
# =============================================================================
# MicroDAO Visibility & Creation (Task 029)
# =============================================================================
class MicrodaoVisibilityPayload(BaseModel):
    """Request body for changing a MicroDAO's visibility flags."""

    # Required: whether the MicroDAO is public.
    is_public: bool
    # Optional platform flag; None when the client does not send it.
    is_platform: Optional[bool] = None
@router.put("/microdao/{microdao_id}/visibility")
async def update_microdao_visibility_endpoint(
    microdao_id: str,
    payload: MicrodaoVisibilityPayload
):
    """
    Update the visibility settings of a MicroDAO (Task 029).

    Returns a status object that echoes the id, slug and the two visibility
    flags from the repository result.

    Raises:
        HTTPException 404: the MicroDAO was not found.
        HTTPException 500: any other failure.
    """
    try:
        updated = await repo_city.update_microdao_visibility(
            microdao_id=microdao_id,
            is_public=payload.is_public,
            is_platform=payload.is_platform,
        )
        if updated:
            return {
                "status": "ok",
                "microdao_id": updated.get("id"),
                "slug": updated.get("slug"),
                "is_public": updated.get("is_public"),
                "is_platform": updated.get("is_platform"),
            }
        raise HTTPException(status_code=404, detail="MicroDAO not found")
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to update microdao visibility: {exc}")
        raise HTTPException(status_code=500, detail="Failed to update visibility")
class MicrodaoCreatePayload(BaseModel):
    """Request body for creating a MicroDAO from an agent (orchestrator flow)."""
    name: str
    slug: str
    description: Optional[str] = None
    make_platform: bool = False
    is_public: bool = True
    parent_microdao_id: Optional[str] = None
    # Optional map of room flags; the create endpoint reads the keys
    # "primary_lobby", "governance" and "crew_team" and treats truthy values as "create".
    create_rooms: Optional[dict] = None  # {"primary_lobby": bool, "governance": bool, "crew_team": bool}
@router.post("/agents/{agent_id}/microdao", response_model=dict)
async def create_microdao_for_agent_endpoint(
    agent_id: str,
    payload: MicrodaoCreatePayload
):
    """
    Create a MicroDAO for an agent (making the agent its orchestrator).

    Steps visible in this endpoint:
    1. Verify that the agent exists and that the slug is not taken.
    2. Create the MicroDAO via ``repo_city.create_microdao_for_agent``.
    3. Optionally create and attach rooms (primary lobby / governance / crew
       team) according to ``payload.create_rooms``.

    Per the original notes, the repository call is also expected to assign the
    agent as orchestrator, add it as a DAO member and set its
    ``primary_microdao_id`` when empty; that logic is not visible here.

    Returns:
        dict with ``status``, the created ``microdao`` record, ``agent_id`` and
        the list of ``created_rooms`` (attachment records).

    Raises:
        HTTPException: 404 if the agent is missing, 400 if the slug is taken,
            500 on any other failure.
    """
    # (payload flag, slug suffix, display-name suffix, room role, is_public)
    room_specs = (
        ("primary_lobby", "lobby", "Lobby", "primary", True),
        ("governance", "gov", "Governance", "governance", True),
        # Team rooms are usually private/internal.
        ("crew_team", "team", "Team", "team", False),
    )

    try:
        # Only existence is checked here; whether archived agents are filtered
        # out depends on repo_city.get_agent_by_id (not visible in this block).
        agent = await repo_city.get_agent_by_id(agent_id)
        if not agent:
            raise HTTPException(status_code=404, detail="Agent not found")

        # Slug must be unique.
        existing = await repo_city.get_microdao_by_slug(payload.slug)
        if existing:
            raise HTTPException(status_code=400, detail=f"MicroDAO with slug '{payload.slug}' already exists")

        result = await repo_city.create_microdao_for_agent(
            orchestrator_agent_id=agent_id,
            name=payload.name,
            slug=payload.slug,
            description=payload.description,
            make_platform=payload.make_platform,
            is_public=payload.is_public,
            parent_microdao_id=payload.parent_microdao_id,
        )
        if not result:
            raise HTTPException(status_code=500, detail="Failed to create MicroDAO")

        microdao_id = result["id"]
        created_rooms = []

        async def create_room_helper(room_slug: str, name: str, role: str, is_public: bool = True):
            """Create the Matrix room and DB room, attach it to the new MicroDAO, return the attachment."""
            matrix_room_id, matrix_room_alias = await create_matrix_room(
                slug=room_slug,
                name=name,
                visibility="public" if is_public else "private"
            )
            room = await repo_city.create_room(
                slug=room_slug,
                name=name,
                description=f"{role.title()} room for {payload.name}",
                created_by="u_system",  # TODO: use actual user if available
                matrix_room_id=matrix_room_id,
                matrix_room_alias=matrix_room_alias
            )
            return await repo_city.attach_room_to_microdao(
                microdao_id=microdao_id,
                room_id=room["id"],
                room_role=role,
                is_public=is_public,
                sort_order=10 if role == 'primary' else 50
            )

        # NOTE(review): a failure while creating a room leaves the MicroDAO
        # (and any earlier rooms) in place while the client receives a 500.
        if payload.create_rooms:
            for flag, slug_suffix, name_suffix, role, room_is_public in room_specs:
                if not payload.create_rooms.get(flag):
                    continue
                attached = await create_room_helper(
                    room_slug=f"{payload.slug}-{slug_suffix}",
                    name=f"{payload.name} {name_suffix}",
                    role=role,
                    is_public=room_is_public
                )
                created_rooms.append(attached)

        return {
            "status": "ok",
            "microdao": result,
            "agent_id": agent_id,
            "created_rooms": created_rooms
        }
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception records the traceback through the logging config
        # instead of printing it to stderr.
        logger.exception(f"Failed to create microdao for agent {agent_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to create MicroDAO")
class AttachToMicrodaoPayload(BaseModel):
    """Request body for attaching an agent to an existing MicroDAO."""
    agent_id: str
    # The attach endpoint treats "orchestrator" as a core membership; the value is not validated here.
    role: str = "member"  # orchestrator | member
@router.post("/microdao/{slug}/attach-agent")
async def attach_agent_to_microdao_endpoint(
    slug: str,
    payload: AttachToMicrodaoPayload
):
    """
    Attach an agent to an existing MicroDAO (Task 040).

    Upserts the agent's membership with the requested role; the membership is
    flagged as core when the role is "orchestrator".

    Returns:
        Whatever ``repo_city.upsert_agent_microdao_membership`` returns.

    Raises:
        HTTPException: 404 if the MicroDAO or the agent is not found,
            500 on any other failure.
    """
    try:
        dao = await repo_city.get_microdao_by_slug(slug)
        if not dao:
            raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}")

        agent = await repo_city.get_agent_by_id(payload.agent_id)
        if not agent:
            raise HTTPException(status_code=404, detail="Agent not found")

        # Upsert membership; orchestrators are core members.
        result = await repo_city.upsert_agent_microdao_membership(
            agent_id=payload.agent_id,
            microdao_id=dao["id"],
            role=payload.role,
            is_core=payload.role == "orchestrator"
        )

        # TODO: when role is "orchestrator" and the MicroDAO record has no
        # orchestrator_agent_id yet, update the microdao table as well.
        # For now, memberships are the source of truth for the orchestrators list.
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to attach agent to microdao: {e}")
        raise HTTPException(status_code=500, detail="Failed to attach agent")
@router.post("/microdao/{slug}/ensure-orchestrator-room", response_model=CityRoomSummary)
async def ensure_orchestrator_room(
    slug: str,
    authorization: Optional[str] = Header(None)
):
    """
    Ensure the orchestrator team room exists for a MicroDAO.

    Delegates to ``repo_city.get_or_create_orchestrator_team_room``; per the
    original notes a new room is created in Matrix and the DB if missing.
    Intended for: admins and the MicroDAO orchestrator.

    Raises:
        HTTPException: 401 for a missing/invalid token or a token without
            ``user_id``, 404 if the MicroDAO is not found, 500 on failure.
    """
    # 1. Validate JWT
    user_info = await validate_jwt_token(authorization)
    if not user_info:
        raise HTTPException(status_code=401, detail="Invalid or missing authorization token")
    if not user_info.get("user_id"):
        raise HTTPException(status_code=401, detail="Invalid token: missing user_id")

    # 2. Get MicroDAO
    dao = await repo_city.get_microdao_by_slug(slug)
    if not dao:
        raise HTTPException(status_code=404, detail="MicroDAO not found")
    microdao_id = dao["id"]

    # 3. Permissions
    # TODO: check that the user is the owner/orchestrator or has an admin role.
    # Currently any caller with a valid token is accepted.
    try:
        room = await repo_city.get_or_create_orchestrator_team_room(microdao_id)
        if not room:
            raise HTTPException(status_code=500, detail="Failed to create or retrieve orchestrator room")
        return CityRoomSummary(
            id=room["id"],
            slug=room["slug"],
            name=room["name"],
            matrix_room_id=room.get("matrix_room_id"),
            microdao_id=room.get("microdao_id"),
            microdao_slug=slug,  # Ensure slug is passed
            room_role=room.get("room_role"),
            is_public=room.get("is_public", False),
            sort_order=room.get("sort_order", 100),
            logo_url=room.get("logo_url"),
            banner_url=room.get("banner_url")
        )
    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the log so the generic 500 stays debuggable.
        logger.exception(f"Error ensuring orchestrator room for {slug}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
# =============================================================================
# DAGI Agent Audit API
# =============================================================================
class DAGIAuditSummary(BaseModel):
    """Summary of a DAGI audit."""
    node_id: str
    timestamp: str
    # The counters below are filled from same-named keys of the stored audit record.
    router_total: int
    db_total: int
    active_count: int
    phantom_count: int
    stale_count: int
    # Optional; taken from the record's "triggered_by" value when present.
    triggered_by: Optional[str] = None
class DAGIAgentStatus(BaseModel):
    """Status of an agent in DAGI."""
    id: str
    name: str
    external_id: Optional[str] = None
    kind: Optional[str] = None
    status: str  # active, stale, phantom
    dagi_status: Optional[str] = None
    last_seen_at: Optional[str] = None
    router_id: Optional[str] = None
    # Optional free-text explanation of the status.
    reason: Optional[str] = None
class DAGIAuditResponse(BaseModel):
    """Full DAGI audit report: summary plus per-status agent lists."""
    summary: DAGIAuditSummary
    active_agents: List[DAGIAgentStatus]
    phantom_agents: List[DAGIAgentStatus]
    stale_agents: List[DAGIAgentStatus]
@router.get("/internal/node/{node_id}/dagi-audit", response_model=Optional[DAGIAuditSummary])
async def get_node_dagi_audit(node_id: str):
    """
    Get the latest DAGI audit report summary for a node.

    Returns ``None`` when no audit is stored for the node.

    Raises:
        HTTPException: 500 when the lookup or the summary construction fails.
    """
    required_keys = (
        "node_id",
        "timestamp",
        "router_total",
        "db_total",
        "active_count",
        "phantom_count",
        "stale_count",
    )
    try:
        audit = await repo_city.get_latest_dagi_audit(node_id)
        if audit:
            # Required keys use item access so a missing one surfaces as an error.
            fields = {key: audit[key] for key in required_keys}
            return DAGIAuditSummary(triggered_by=audit.get("triggered_by"), **fields)
        return None
    except Exception as e:
        logger.error(f"Error getting DAGI audit for {node_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get DAGI audit")
@router.get("/internal/node/{node_id}/dagi-audit/full")
async def get_node_dagi_audit_full(node_id: str):
    """
    Get the full DAGI audit report for a node (with details).

    Returns the stored audit record unchanged.

    Raises:
        HTTPException: 404 when the node has no audit, 500 on lookup failure.
    """
    try:
        report = await repo_city.get_latest_dagi_audit(node_id)
        if report:
            return report
        raise HTTPException(status_code=404, detail="No audit found for this node")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting full DAGI audit for {node_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get DAGI audit")
@router.get("/internal/node/{node_id}/dagi-audit/history")
async def get_node_dagi_audit_history(
    node_id: str,
    limit: int = Query(default=10, ge=1, le=100)
):
    """
    Get the history of DAGI audit reports for a node.

    Args:
        node_id: Node whose audit history is requested.
        limit: Maximum number of reports to return (1..100, default 10).
            The lower bound keeps zero/negative values away from the
            repository query; FastAPI rejects them with a 422.

    Returns:
        dict with ``node_id`` and ``history`` (as returned by the repository).

    Raises:
        HTTPException: 500 when the repository call fails.
    """
    try:
        history = await repo_city.get_dagi_audit_history(node_id, limit)
        return {"node_id": node_id, "history": history}
    except Exception as e:
        logger.error(f"Error getting DAGI audit history for {node_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get audit history")
@router.get("/internal/node/{node_id}/agents/system")
async def get_node_system_agents(node_id: str):
    """
    Get the node's agents from the DB (used by the DAGI audit).

    Returns a dict with ``node_id``, ``total`` and ``agents``.

    Raises:
        HTTPException: 500 when the repository call fails.
    """
    try:
        rows = await repo_city.get_agents_by_node_for_audit(node_id)
        total = len(rows)
        return {"node_id": node_id, "total": total, "agents": rows}
    except Exception as e:
        logger.error(f"Error getting system agents for {node_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get system agents")
def _dagi_audit_normalize_name(name) -> str:
    """Lower-case *name* and drop spaces, hyphens and underscores; None becomes ""."""
    return (name or "").lower().replace(" ", "").replace("-", "").replace("_", "")


def _dagi_audit_parse_router_agents(router_config) -> list:
    """Turn the ``agents`` mapping of a parsed router config into a list of id/name/description dicts."""
    # An empty YAML file parses to None and an empty `agents:` key to None as well.
    agents_section = (router_config or {}).get("agents") or {}
    return [
        {
            "id": agent_id,
            "name": agent_id,
            "description": (agent_data or {}).get("description", ""),
        }
        for agent_id, agent_data in agents_section.items()
    ]


def _dagi_audit_classify(router_agents, db_agents):
    """Match router agents against DB agents and return ``(active, phantom, stale)`` lists."""
    # Index DB agents by the last ":"-separated segment of external_id ...
    db_by_ext_id = {}
    for db_agent in db_agents:
        external_id = db_agent.get("external_id")
        if external_id:
            db_by_ext_id[external_id.split(":")[-1].lower()] = db_agent

    # ... and by normalized name (agents without a usable name are not indexed).
    db_by_name = {}
    for db_agent in db_agents:
        key = _dagi_audit_normalize_name(db_agent["name"])
        if key:
            db_by_name[key] = db_agent

    active, phantom, stale = [], [], []
    matched_db_ids = set()

    for r_agent in router_agents:
        r_id = str(r_agent["id"]).lower()
        r_name_norm = _dagi_audit_normalize_name(str(r_agent["name"]))
        db_match = db_by_ext_id.get(r_id) or db_by_name.get(r_name_norm)
        if db_match:
            active.append({
                "router_id": r_agent["id"],
                "router_name": r_agent["name"],
                "db_id": db_match["id"],
                "db_name": db_match["name"],
                "db_external_id": db_match.get("external_id"),
                "kind": db_match.get("kind"),
                "status": db_match.get("status", "unknown")
            })
            matched_db_ids.add(db_match["id"])
        else:
            phantom.append({
                "router_id": r_agent["id"],
                "router_name": r_agent["name"],
                "description": r_agent.get("description", ""),
                "reason": "In Router config but not in DB"
            })

    for db_agent in db_agents:
        if db_agent["id"] not in matched_db_ids:
            stale.append({
                "db_id": db_agent["id"],
                "db_name": db_agent["name"],
                "db_external_id": db_agent.get("external_id"),
                "kind": db_agent.get("kind"),
                "reason": "In DB but not in Router config"
            })

    return active, phantom, stale


@router.post("/internal/node/{node_id}/dagi-audit/run")
async def run_node_dagi_audit(
    node_id: str,
    request: Request
):
    """
    Run a DAGI audit for a node.

    Compares the agents listed in ``router-config.yml`` with the agents in the
    DB, stores the report and updates the agents' DAGI status:

    - active:  present in both the router config and the DB
    - phantom: in the router config but not in the DB
    - stale:   in the DB but not in the router config

    The audit logic runs inline (MVP); in production it is better to delegate
    it to a worker/celery. ``request`` is accepted for interface compatibility
    and is not used. The DB side is read via
    ``repo_city.get_all_agents_for_audit()``, which takes no node argument.

    Returns:
        dict with ``status``, ``report_id``, ``summary`` and ``message``.

    Raises:
        HTTPException: 404 when ``router-config.yml`` is missing,
            500 on any other failure.
    """
    import yaml
    from pathlib import Path
    from datetime import datetime, timezone

    try:
        # Locate router-config.yml three directory levels above this file.
        project_root = Path(__file__).parent.parent.parent
        config_path = project_root / "router-config.yml"
        if not config_path.exists():
            raise HTTPException(status_code=404, detail="router-config.yml not found")

        # Parse the router config.
        with open(config_path, 'r', encoding='utf-8') as f:
            router_config = yaml.safe_load(f)
        router_agents = _dagi_audit_parse_router_agents(router_config)

        # Fetch agents from the DB and match them against the router config.
        db_agents = await repo_city.get_all_agents_for_audit()
        active, phantom, stale = _dagi_audit_classify(router_agents, db_agents)

        # Build the report; the timestamp keeps the naive-UTC ISO format with
        # a trailing "Z" without relying on the deprecated datetime.utcnow().
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        report = {
            "node_id": node_id,
            "timestamp": now_utc.isoformat() + "Z",
            "summary": {
                "router_total": len(router_agents),
                "db_total": len(db_agents),
                "active_count": len(active),
                "phantom_count": len(phantom),
                "stale_count": len(stale)
            },
            "active_agents": active,
            "phantom_agents": phantom,
            "stale_agents": stale
        }

        # Persist the report.
        saved = await repo_city.save_dagi_audit_report(node_id, report, triggered_by="api")

        # Update agent statuses.
        if active:
            active_ids = [a["db_id"] for a in active]
            await repo_city.update_agents_dagi_status(active_ids, "active", update_last_seen=True)
        if stale:
            stale_ids = [a["db_id"] for a in stale]
            await repo_city.update_agents_dagi_status(stale_ids, "stale")

        return {
            "status": "completed",
            "report_id": saved["id"],
            "summary": report["summary"],
            "message": f"Audit completed: {len(active)} active, {len(phantom)} phantom, {len(stale)} stale"
        }
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception sends the traceback through logging instead of stderr.
        logger.exception(f"Error running DAGI audit for {node_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to run DAGI audit: {str(e)}")
# =============================================================================
# DAGI Router Agents API (for Node Cabinet Table)
# =============================================================================
class DAGIRouterAgentItem(BaseModel):
    """Agent row for the DAGI Router table."""
    id: str
    name: str
    role: Optional[str] = None
    status: str  # active, phantom, stale, error
    node_id: Optional[str] = None
    models: List[str] = []
    gpu: Optional[str] = None
    cpu: Optional[str] = None
    last_seen_at: Optional[str] = None
    has_cabinet: bool = False
    cabinet_slug: Optional[str] = None
    description: Optional[str] = None
    has_prompts: bool = False  # Whether system prompts exist in the DB
class DAGIRouterAgentsSummary(BaseModel):
    """Summary counters for DAGI Router agents (all default to 0)."""
    active: int = 0
    phantom: int = 0
    stale: int = 0
    router_total: int = 0
    system_total: int = 0
class DAGIRouterAgentsResponse(BaseModel):
    """API response for DAGI Router agents."""
    node_id: str
    last_audit_at: Optional[str] = None
    summary: DAGIRouterAgentsSummary
    agents: List[DAGIRouterAgentItem]
# NOTE: get_dagi_router_agents moved to internal endpoints section below
# =============================================================================
# Node Metrics API
# =============================================================================
class NodeMetricsResponse(BaseModel):
    """Node metrics."""
    # Only node_id is required; every other field has a default, which lets the
    # metrics endpoint return a minimal response for unknown nodes.
    node_id: str
    node_name: Optional[str] = None
    hostname: Optional[str] = None
    status: Optional[str] = "unknown"
    environment: Optional[str] = None
    cpu_model: Optional[str] = None
    cpu_cores: int = 0
    cpu_usage: float = 0.0
    gpu_model: Optional[str] = None
    # NOTE(review): units of the memory/disk counters are not stated here — confirm against the repository.
    gpu_memory_total: int = 0
    gpu_memory_used: int = 0
    ram_total: int = 0
    ram_used: int = 0
    disk_total: int = 0
    disk_used: int = 0
    agent_count_router: int = 0
    agent_count_system: int = 0
    last_heartbeat: Optional[str] = None
@router.get("/internal/node/{node_id}/metrics/current", response_model=NodeMetricsResponse)
async def get_node_metrics_current(node_id: str):
    """
    Get the current metrics of a node.

    Single source of truth for the Node Cabinet indicators. Never raises:
    an unknown node or a failed lookup yields a response carrying only
    ``node_id`` with default values.
    """
    try:
        row = await repo_city.get_node_metrics_current(node_id)
        if row:
            return NodeMetricsResponse(**row)
        # Minimal response for an unknown node.
        return NodeMetricsResponse(node_id=node_id)
    except Exception as e:
        logger.error(f"Error getting metrics for {node_id}: {e}")
        return NodeMetricsResponse(node_id=node_id)
class NodeMetricsUpdateRequest(BaseModel):
    """Request to update node metrics."""
    # All fields are optional; the update endpoint forwards only the fields the
    # client actually set (dict(exclude_unset=True)).
    cpu_usage: Optional[float] = None
    # NOTE(review): NodeMetricsResponse names the GPU counter gpu_memory_used —
    # confirm the repository maps gpu_vram_used accordingly.
    gpu_vram_used: Optional[int] = None
    ram_used: Optional[int] = None
    disk_used: Optional[int] = None
    agent_count_router: Optional[int] = None
    agent_count_system: Optional[int] = None
@router.post("/internal/node/{node_id}/metrics/update")
async def update_node_metrics(
    node_id: str,
    metrics: NodeMetricsUpdateRequest
):
    """
    Update node metrics (heartbeat).

    Called from the agent running on the node. Only fields explicitly set in
    the request are forwarded to the repository.

    Returns:
        dict with ``status`` ("updated" or "not_found") and ``node_id``.

    Raises:
        HTTPException: 500 when the repository call fails.
    """
    try:
        changed_fields = metrics.dict(exclude_unset=True)
        success = await repo_city.update_node_metrics(node_id, changed_fields)
        if success:
            outcome = "updated"
        else:
            outcome = "not_found"
        return {"status": outcome, "node_id": node_id}
    except Exception as e:
        logger.error(f"Error updating metrics for {node_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to update metrics")
# =============================================================================
# Phantom / Stale Autosync API
# =============================================================================
class PhantomSyncRequest(BaseModel):
    """Request to synchronize phantom agents."""
    # Ids of the agents to sync; passed through to the repository as-is.
    agent_ids: List[str]
@router.post("/internal/node/{node_id}/dagi-router/phantom/sync")
async def sync_phantom_agents(
    node_id: str,
    request: PhantomSyncRequest
):
    """
    Synchronize phantom agents: create them in the DB based on router-config.

    Reads ``router-config.yml`` and hands the parsed config together with the
    requested agent ids to ``repo_city.sync_phantom_agents``.

    Returns:
        dict with ``status``, ``node_id``, ``created_count`` and
        ``created_agents``.

    Raises:
        HTTPException: 404 when ``router-config.yml`` is missing,
            500 on any other failure.
    """
    import yaml
    from pathlib import Path

    try:
        # router-config.yml sits three directory levels above this file.
        config_path = Path(__file__).parent.parent.parent / "router-config.yml"
        if not config_path.exists():
            raise HTTPException(status_code=404, detail="router-config.yml not found")

        with open(config_path, 'r', encoding='utf-8') as config_file:
            router_config = yaml.safe_load(config_file)

        # Let the repository create the missing agents.
        created = await repo_city.sync_phantom_agents(
            node_id,
            request.agent_ids,
            router_config
        )
        response = {"status": "completed", "node_id": node_id}
        response["created_count"] = len(created)
        response["created_agents"] = created
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error syncing phantom agents for {node_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to sync phantom agents: {str(e)}")
class StaleSyncRequest(BaseModel):
    """Request to mark agents as stale."""
    # Ids of the agents to mark; passed through to the repository as-is.
    agent_ids: List[str]
@router.post("/internal/node/{node_id}/dagi-router/stale/mark")
async def mark_stale_agents(
    node_id: str,
    request: StaleSyncRequest
):
    """
    Mark agents as stale (present in the DB but not in the Router).

    Returns:
        dict with ``status``, ``node_id`` and ``marked_count`` (the value
        returned by the repository).

    Raises:
        HTTPException: 500 when the repository call fails.
    """
    try:
        marked = await repo_city.mark_stale_agents(request.agent_ids)
        return {"status": "completed", "node_id": node_id, "marked_count": marked}
    except Exception as e:
        logger.error(f"Error marking stale agents for {node_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to mark stale agents: {str(e)}")
# =============================================================================
# Node Agents API (for Node Cabinet)
# =============================================================================
class NodeAgentItem(BaseModel):
    """Node agent entry for the Node Cabinet."""
    id: str
    name: str
    slug: Optional[str] = None
    kind: Optional[str] = None
    role: Optional[str] = None  # node_guardian, node_steward, etc.
    status: str = "unknown"
    dagi_status: Optional[str] = None
    # ISO-formatted string; the node-agents endpoint fills it via .isoformat().
    last_seen_at: Optional[str] = None
    is_guardian: bool = False
    is_steward: bool = False
class NodeAgentsResponse(BaseModel):
    """List of a node's agents."""
    node_id: str
    total: int
    # The node-agents endpoint stores the first guardian / first steward it encounters here.
    guardian: Optional[NodeAgentItem] = None
    steward: Optional[NodeAgentItem] = None
    agents: List[NodeAgentItem]
@router.get("/internal/node/{node_id}/agents", response_model=NodeAgentsResponse)
async def get_node_agents(node_id: str):
    """
    Get all agents of a node (Guardian, Steward, runtime agents).

    The first agent flagged as guardian / steward is also exposed through the
    dedicated ``guardian`` / ``steward`` fields. Never raises: on any failure
    an empty list with ``total=0`` is returned.
    """
    try:
        rows = await repo_city.get_node_agents(node_id)
        items = []
        first_guardian = None
        first_steward = None

        for row in rows:
            kind = row.get("kind")
            seen = row.get("last_seen_at")
            entry = NodeAgentItem(
                id=row["id"],
                name=row.get("display_name") or row.get("name") or row["id"],
                slug=row.get("public_slug") or row["id"],
                kind=kind,
                role=kind,  # Use kind as role for now
                status=row.get("status", "unknown"),
                dagi_status=row.get("dagi_status"),
                last_seen_at=seen.isoformat() if seen else None,
                is_guardian=row.get("is_node_guardian", False) or kind == "node_guardian",
                is_steward=row.get("is_node_steward", False) or kind == "node_steward"
            )
            items.append(entry)

            # Remember only the first guardian / steward encountered.
            if entry.is_guardian and not first_guardian:
                first_guardian = entry
            if entry.is_steward and not first_steward:
                first_steward = entry

        return NodeAgentsResponse(
            node_id=node_id,
            total=len(items),
            guardian=first_guardian,
            steward=first_steward,
            agents=items
        )
    except Exception as e:
        logger.error(f"Error getting node agents for {node_id}: {e}")
        return NodeAgentsResponse(node_id=node_id, total=0, agents=[])
# =============================================================================
# Agent Runtime Prompts API (for DAGI Router integration)
# =============================================================================
class RuntimePromptsResponse(BaseModel):
    """Runtime prompts for DAGI Router"""
    agent_id: str
    has_prompts: bool
    # Prompt content keyed by prompt kind; the endpoint's fallback uses the
    # keys "core", "safety", "governance" and "tools" with None values.
    prompts: Dict[str, Optional[str]]
class RuntimeSystemPromptResponse(BaseModel):
    """Full runtime system prompt for DAGI Router"""
    agent_id: str
    agent_name: Optional[str] = None
    agent_kind: Optional[str] = None
    has_prompts: bool
    # Assembled prompt text; the endpoint substitutes a generic prompt when none is available.
    system_prompt: str
    # Prompt content keyed by prompt kind (values may be None).
    prompts: Dict[str, Optional[str]]
class AgentPromptsStatusRequest(BaseModel):
    """Request to check prompts status for multiple agents"""
    # Agent ids to look up.
    agent_ids: List[str]
class AgentPromptsStatusResponse(BaseModel):
    """Response with prompts status for multiple agents"""
    # Maps agent id -> whether prompts exist for that agent.
    status: Dict[str, bool]
@router.get("/internal/agents/{agent_id}/prompts/runtime", response_model=RuntimePromptsResponse)
async def get_agent_runtime_prompts(agent_id: str):
    """
    Get an agent's runtime prompts for DAGI Router.

    Returns only the prompt content, without metadata; DAGI Router uses it to
    build the system prompt. Never raises: on failure a response with
    ``has_prompts=False`` and all prompt kinds set to ``None`` is returned.
    """
    try:
        prompt_data = await repo_city.get_runtime_prompts(agent_id)
        return RuntimePromptsResponse(**prompt_data)
    except Exception as e:
        logger.error(f"Error getting runtime prompts for {agent_id}: {e}")
        empty_prompts = dict.fromkeys(("core", "safety", "governance", "tools"))
        return RuntimePromptsResponse(
            agent_id=agent_id,
            has_prompts=False,
            prompts=empty_prompts
        )
@router.get("/internal/agents/{agent_id}/system-prompt", response_model=RuntimeSystemPromptResponse)
async def get_agent_system_prompt(agent_id: str):
    """
    Get the assembled system prompt for an agent.

    DAGI Router uses this endpoint to obtain the full system prompt, which
    (per the original notes) includes core, safety, governance, tools and
    context. Never raises: an unknown agent or a failed lookup yields a
    generic fallback prompt with ``has_prompts=False``.
    """
    def _empty_prompts():
        # Fresh dict per response so fallbacks never share state.
        return {"core": None, "safety": None, "governance": None, "tools": None}

    try:
        data = await repo_city.get_agent_with_runtime_prompt(agent_id)
        if not data:
            # Fallback for unknown agent
            return RuntimeSystemPromptResponse(
                agent_id=agent_id,
                agent_name=None,
                agent_kind=None,
                has_prompts=False,
                system_prompt=f"You are an AI agent (ID: {agent_id}) in DAARION.city. Be helpful and accurate.",
                prompts=_empty_prompts()
            )
        return RuntimeSystemPromptResponse(**data)
    except Exception as e:
        # logger.exception keeps the traceback in the configured logs instead
        # of printing it to stderr.
        logger.exception(f"Error getting system prompt for {agent_id}: {e}")
        return RuntimeSystemPromptResponse(
            agent_id=agent_id,
            has_prompts=False,
            system_prompt="You are an AI agent in DAARION.city. Be helpful and accurate.",
            prompts=_empty_prompts()
        )
@router.post("/internal/agents/prompts/status", response_model=AgentPromptsStatusResponse)
async def check_agents_prompts_status(request: AgentPromptsStatusRequest):
    """
    Check whether prompts exist for a list of agents.

    Used by the UI to show ``has_prompts`` indicators in agent tables.
    Never raises: on failure every requested agent is reported as ``False``.
    """
    try:
        found = await repo_city.check_agents_prompts_status(request.agent_ids)
        return AgentPromptsStatusResponse(status=found)
    except Exception as e:
        logger.error(f"Error checking prompts status: {e}")
        all_missing = dict.fromkeys(request.agent_ids, False)
        return AgentPromptsStatusResponse(status=all_missing)
# =============================================================================
# Node Self-Registration & Self-Healing API
# =============================================================================
class NodeSelfRegisterRequest(BaseModel):
    """Request body for node self-registration"""
    # Node identifier; forwarded to the repository as node_id.
    id: str
    name: str
    hostname: Optional[str] = None
    environment: str = "development"
    roles: List[str] = []
    description: Optional[str] = None
class NodeSelfRegisterResponse(BaseModel):
    """Response for node self-registration"""
    success: bool
    node_id: str
    # Copied from the repository result's "is_new" value (False when absent).
    is_new: bool = False
    message: str = ""
    # Error text; the endpoint fills it with str(exception) when registration raises.
    error: Optional[str] = None
class NodeHeartbeatRequest(BaseModel):
    """Request body for node heartbeat"""
    # Optional free-form metrics dict; passed to the repository unchanged.
    metrics: Optional[Dict] = None
class NodeHeartbeatResponse(BaseModel):
    """Response for node heartbeat"""
    success: bool
    node_id: Optional[str] = None
    heartbeat_at: Optional[str] = None
    error: Optional[str] = None
    # Per the heartbeat endpoint's docstring: True signals that the node is not
    # registered and should perform self-registration.
    should_self_register: bool = False
class NodeSelfHealingStatusResponse(BaseModel):
    """Response for node self-healing status"""
    node_id: str
    registered: bool
    is_active: Optional[bool] = None
    name: Optional[str] = None
    self_healing_status: str = "unknown"
    last_heartbeat: Optional[str] = None
    last_self_registration: Optional[str] = None
    self_registration_count: int = 0
    agent_count_router: int = 0
    agent_count_system: int = 0
    has_guardian: bool = False
    has_steward: bool = False
    errors: List[Dict] = []
    # The status endpoint sets status="error" and error=str(exception) when the lookup fails.
    status: Optional[str] = None
    error: Optional[str] = None
class NodesNeedingHealingResponse(BaseModel):
    """Response listing nodes that need healing"""
    # Free-form node dicts; their schema is not defined in this model.
    nodes: List[Dict]
    total: int
@router.post("/internal/nodes/register-or-update", response_model=NodeSelfRegisterResponse)
async def node_self_register(request: NodeSelfRegisterRequest):
    """
    Node self-registration.

    This endpoint is called by:
    - the Node Bootstrap script when the node starts
    - Node Guardian when it detects the node has vanished from the Directory

    Per the original notes, an already registered node gets its data updated
    and a new node gets a record in node_registry (handled by the repository).
    Never raises: failures are reported via ``success=False`` and ``error``.
    """
    try:
        outcome = await repo_city.node_self_register(
            node_id=request.id,
            name=request.name,
            hostname=request.hostname,
            environment=request.environment,
            roles=request.roles,
            description=request.description
        )
        # Missing keys in the repository result fall back to safe defaults.
        response_fields = {
            "success": outcome.get("success", False),
            "node_id": outcome.get("node_id", request.id),
            "is_new": outcome.get("is_new", False),
            "message": outcome.get("message", ""),
            "error": outcome.get("error"),
        }
        return NodeSelfRegisterResponse(**response_fields)
    except Exception as e:
        logger.error(f"Node self-registration failed for {request.id}: {e}")
        return NodeSelfRegisterResponse(
            success=False,
            node_id=request.id,
            message="Registration failed",
            error=str(e)
        )
@router.post("/internal/node/{node_id}/heartbeat", response_model=NodeHeartbeatResponse)
async def node_heartbeat(node_id: str, request: NodeHeartbeatRequest = NodeHeartbeatRequest()):
    """
    Node heartbeat with a metrics update.

    Returns ``should_self_register=True`` when the node is not registered,
    which signals Node Guardian to perform self-registration.
    Never raises: failures are reported via ``success=False`` and ``error``.
    """
    try:
        beat = await repo_city.node_heartbeat(
            node_id=node_id,
            metrics=request.metrics
        )
        response_fields = {
            "success": beat.get("success", False),
            "node_id": beat.get("node_id"),
            "heartbeat_at": beat.get("heartbeat_at"),
            "error": beat.get("error"),
            "should_self_register": beat.get("should_self_register", False),
        }
        return NodeHeartbeatResponse(**response_fields)
    except Exception as e:
        logger.error(f"Heartbeat failed for {node_id}: {e}")
        return NodeHeartbeatResponse(success=False, node_id=node_id, error=str(e))
@router.get("/internal/node/{node_id}/self-healing/status", response_model=NodeSelfHealingStatusResponse)
async def get_node_self_healing_status(node_id: str):
    """
    Get the self-healing status of a node.

    Used by Node Guardian to monitor the node's state. Never raises: on
    failure the node is reported as unregistered with ``status="error"``.
    """
    try:
        healing_state = await repo_city.get_node_self_healing_status(node_id)
        return NodeSelfHealingStatusResponse(**healing_state)
    except Exception as e:
        logger.error(f"Failed to get self-healing status for {node_id}: {e}")
        failure = {
            "node_id": node_id,
            "registered": False,
            "status": "error",
            "error": str(e),
        }
        return NodeSelfHealingStatusResponse(**failure)
@router.get("/internal/node/{node_id}/swapper", response_model=NodeSwapperDetail)
async def get_node_swapper_detail(node_id: str):
    """
    Get detailed Swapper Service status for a node.

    Used by Node Cabinet to show loaded models and health.

    Raises:
        HTTPException: 404 when no metrics exist for the node. Any other
            failure is logged and answered with an "unhealthy, no models"
            response instead of an error.
    """
    try:
        # Fetch from node_cache
        metrics = await repo_city.get_node_metrics(node_id)
        if not metrics:
            raise HTTPException(status_code=404, detail="Node not found")

        # Swapper state (stored as JSONB); a missing/empty value means no models.
        swapper_state = metrics.get("swapper_state") or {}

        models = []
        for entry in swapper_state.get("models", []):
            # Swapper uses "status": "loaded" not "loaded": true
            is_loaded = entry.get("status") == "loaded" or entry.get("loaded", False)
            models.append(
                SwapperModel(
                    name=entry.get("name", "unknown"),
                    loaded=is_loaded,
                    type=entry.get("type"),
                    vram_gb=entry.get("size_gb") or entry.get("vram_gb")
                )
            )

        return NodeSwapperDetail(
            node_id=node_id,
            healthy=metrics.get("swapper_healthy", False),
            models_loaded=metrics.get("swapper_models_loaded", 0),
            models_total=metrics.get("swapper_models_total", 0),
            models=models
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get swapper detail for {node_id}: {e}")
        return NodeSwapperDetail(
            node_id=node_id,
            healthy=False,
            models_loaded=0,
            models_total=0,
            models=[]
        )
@router.get("/internal/node/{node_id}/dagi-router/health")
async def get_dagi_router_health(node_id: str):
    """
    Get DAGI Router health status for a node.

    First checks node_cache for a cached ``router_healthy`` status (populated
    by node-guardian). Falls back to a direct ``GET {router_url}/health`` call,
    which only works when the router is reachable from city-service (NODE1).

    Returns:
        dict with ``node_id``, ``status`` ("up" / "degraded" / "down"),
        ``version``, ``agent_count`` and ``latency_ms`` (milliseconds rounded
        to 2 decimals, or None). The cached path adds ``source``; the
        "no router URL" path adds ``error``.
    """
    import httpx
    import time

    def _down(latency_ms=None, error=None):
        # Single place that builds the "router is down" payload.
        payload = {
            "node_id": node_id,
            "status": "down",
            "version": None,
            "agent_count": 0,
            "latency_ms": latency_ms,
        }
        if error is not None:
            payload["error"] = error
        return payload

    # First, try the cached router health from node_cache.
    try:
        metrics = await repo_city.get_node_metrics(node_id)
        if metrics and metrics.get("router_healthy") is not None:
            return {
                "node_id": node_id,
                "status": "up" if metrics.get("router_healthy") else "down",
                "version": metrics.get("router_version"),
                "agent_count": 0,  # TODO: get from node_cache
                "latency_ms": None,
                "source": "node_cache"
            }
    except Exception as e:
        # Best effort: a cache failure just means we try the direct check.
        logger.debug(f"Failed to get cached router health for {node_id}: {e}")

    # Fallback: direct health check.
    endpoints = await repo_city.get_node_endpoints(node_id)
    # Tolerate a repository that returns None for an unknown node.
    base_url = (endpoints or {}).get("router_url")
    if not base_url:
        return _down(error="No router URL configured for this node")

    try:
        start = time.monotonic()
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(f"{base_url}/health")
            # Rounded once here so every response path reports the same precision.
            latency_ms = round((time.monotonic() - start) * 1000.0, 2)

        if resp.status_code != 200:
            return _down(latency_ms=latency_ms)

        data = resp.json()
        # Router can return "healthy" or "ok"
        status_val = data.get("status", "").lower()
        is_healthy = status_val in ("healthy", "ok")
        return {
            "node_id": node_id,
            "status": "up" if is_healthy else "degraded",
            "version": data.get("version"),
            "agent_count": data.get("agent_count", 0),
            "latency_ms": latency_ms
        }
    except Exception as e:
        logger.warning(f"DAGI Router health check failed for {node_id}: {e}")
        return _down()
@router.get("/internal/node/{node_id}/dagi-router/agents")
async def get_dagi_router_agents(node_id: str):
    """
    Get list of agents for a node.

    Since DAGI Router doesn't expose an /agents endpoint, the DB agents are
    used and the router's /health endpoint determines their status: an agent
    is "active" only when the router is healthy and the agent's own status is
    "online" or "active"; otherwise it is "stale". Uses the node-specific
    router_url from the repository.

    Returns:
        dict with ``node_id``, ``total``, per-status counts (``active``,
        ``phantom``, ``stale``) and the ``agents`` list.
    """
    import httpx

    # Get router URL from database (node-specific)
    endpoints = await repo_city.get_node_endpoints(node_id)
    base_url = endpoints.get("router_url")

    # Probe the router; any failure leaves it marked unhealthy.
    router_healthy = False
    if base_url:
        try:
            async with httpx.AsyncClient(timeout=3.0) as client:
                resp = await client.get(f"{base_url}/health")
                if resp.status_code == 200:
                    health = resp.json()
                    # Router can return "healthy" or "ok"
                    router_healthy = health.get("status", "").lower() in ("healthy", "ok")
        except Exception as e:
            logger.warning(f"Failed to check router health for {node_id} at {base_url}: {e}")

    # Get agents from DB for this node
    try:
        db_agents = await repo_city.get_agents_for_node(node_id)
    except Exception as e:
        logger.warning(f"Failed to get DB agents for {node_id}: {e}")
        db_agents = []

    # The runtime label depends only on router health, so compute it once.
    runtime_label = f"{node_id}-router" if router_healthy else None

    result_agents = []
    for record in db_agents:
        agent_id = record.get("id") or record.get("slug")
        if not agent_id:
            continue

        # "stale" covers both "router up but agent offline" and "router down".
        is_live = router_healthy and record.get("status", "offline") in ("online", "active")
        result_agents.append({
            "id": agent_id,
            "name": record.get("display_name") or record.get("name"),
            "kind": record.get("kind"),
            "runtime": runtime_label,
            "node_id": node_id,
            "last_seen_at": record.get("last_seen_at"),
            "status": "active" if is_live else "stale",
            "has_db_record": True
        })

    # Count by status
    def _count(label):
        return sum(1 for entry in result_agents if entry["status"] == label)

    return {
        "node_id": node_id,
        "total": len(result_agents),
        "active": _count("active"),
        "phantom": _count("phantom"),
        "stale": _count("stale"),
        "agents": result_agents
    }
@router.get("/internal/node/{node_id}/dagi-router/summary")
async def get_dagi_router_summary(node_id: str):
    """
    Return a combined DAGI Router status summary for a node.

    Merges the router health result, the agent counts, and the timestamp of
    the most recent row in dagi_audit_reports for the node. A failure while
    reading the audit timestamp is logged and reported as None.
    """
    health = await get_dagi_router_health(node_id)
    agents_info = await get_dagi_router_agents(node_id)

    # Latest audit timestamp (ISO 8601), if any audit exists for this node
    last_audit_at = None
    try:
        pool = await repo_city.get_pool()
        audit_row = await pool.fetchrow("""
SELECT MAX(created_at) as last_audit
FROM dagi_audit_reports
WHERE node_id = $1
""", node_id)
        latest_audit = audit_row["last_audit"] if audit_row else None
        if latest_audit:
            last_audit_at = latest_audit.isoformat()
    except Exception as e:
        logger.warning(f"Failed to get last audit for {node_id}: {e}")

    summary = {
        "node_id": node_id,
        "status": health.get("status", "down"),
        "version": health.get("version"),
        "latency_ms": health.get("latency_ms"),
        "router_agent_count": health.get("agent_count", 0),
        "db_agent_count": agents_info.get("total", 0),
    }
    # Per-status agent counts, each defaulting to 0 when absent
    for bucket in ("active", "phantom", "stale"):
        summary[bucket] = agents_info.get(bucket, 0)
    summary["last_audit_at"] = last_audit_at
    return summary
@router.get("/internal/node/{node_id}/directory-check")
async def check_node_in_directory(node_id: str):
    """
    Check whether the node is visible in the Node Directory.

    A simple endpoint for the Node Guardian self-healing loop. If the
    lookup fails, the error is logged and the node is reported as not
    visible, with the error text included in the response.
    """
    try:
        is_visible = await repo_city.check_node_in_directory(node_id)
        payload = {
            "node_id": node_id,
            "visible_in_directory": is_visible,
            "checked_at": datetime.now(timezone.utc).isoformat(),
        }
    except Exception as e:
        logger.error(f"Directory check failed for {node_id}: {e}")
        payload = {
            "node_id": node_id,
            "visible_in_directory": False,
            "error": str(e),
        }
    return payload
@router.get("/internal/nodes/needing-healing", response_model=NodesNeedingHealingResponse)
async def get_nodes_needing_healing():
    """
    Return the list of nodes that need self-healing.

    Used for monitoring and for automatic healing. Any failure while
    fetching the nodes or building the response is logged and an empty
    result is returned instead.
    """
    try:
        pending_nodes = await repo_city.get_nodes_needing_healing()
        response = NodesNeedingHealingResponse(
            nodes=pending_nodes,
            total=len(pending_nodes),
        )
    except Exception as e:
        logger.error(f"Failed to get nodes needing healing: {e}")
        response = NodesNeedingHealingResponse(nodes=[], total=0)
    return response
@router.post("/internal/node/{node_id}/self-healing/trigger")
async def trigger_node_self_healing(node_id: str):
    """
    Trigger self-healing for a node.

    Steps:
    1. Read the node's current self-healing status.
    2. If that status does not report the node as registered, perform
       self-registration.
    3. Check Node Directory visibility and store the resulting
       self-healing status ("healthy" or "needs_attention").

    Returns:
        A dict with the node id, trigger timestamp (UTC, ISO 8601), the
        list of actions taken, the final status and directory visibility.

    Raises:
        HTTPException: 500 if any step fails. The failure is also recorded
            as an "error" self-healing status on a best-effort basis.
    """
    try:
        # Check current state
        status = await repo_city.get_node_self_healing_status(node_id)
        actions_taken = []

        if not status.get("registered"):
            # Need to register.
            # NOTE(review): substring match, so ids such as "node-10" are also
            # treated as production — confirm this is intended.
            result = await repo_city.node_self_register(
                node_id=node_id,
                name=f"Auto-healed node {node_id}",
                environment="production" if "node-1" in node_id else "development"
            )
            actions_taken.append({
                "action": "self_register",
                "result": result
            })

        # Check if visible in directory
        visible = await repo_city.check_node_in_directory(node_id)
        if not visible:
            actions_taken.append({
                "action": "visibility_check",
                "result": {"visible": False, "needs_manual_intervention": True}
            })

        # Update healing status
        final_status = "healthy" if visible else "needs_attention"
        await repo_city.update_node_self_healing_status(
            node_id=node_id,
            status=final_status
        )

        return {
            "node_id": node_id,
            "triggered_at": datetime.now(timezone.utc).isoformat(),
            "actions_taken": actions_taken,
            "final_status": final_status,
            "visible_in_directory": visible
        }
    except Exception as e:
        logger.error(f"Self-healing trigger failed for {node_id}: {e}")
        # Record the error on a best-effort basis: if the status update itself
        # fails (e.g. the same outage that caused the original error), it must
        # not replace the HTTPException that reports the real failure.
        try:
            await repo_city.update_node_self_healing_status(
                node_id=node_id,
                status="error",
                error=str(e)
            )
        except Exception as record_error:
            logger.error(
                f"Failed to record self-healing error for {node_id}: {record_error}"
            )
        raise HTTPException(status_code=500, detail=f"Self-healing failed: {e}") from e