feat: Implement Matrix Rooms Bridge

- MATRIX_ROOMS_BRIDGE_SPEC.md documentation
- Migration 012: Add matrix_room_id/alias to city_rooms
- Matrix Gateway service (port 7025)
- City-service: auto-create Matrix rooms on room creation
- Backfill endpoint for existing rooms
- API returns matrix_room_id/alias in room responses
This commit is contained in:
Apple
2025-11-26 12:33:54 -08:00
parent 29febee464
commit 984f67c26e
19 changed files with 3356 additions and 0 deletions

View File

@@ -0,0 +1,24 @@
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Expose port
EXPOSE 7001
# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:7001/health')"
# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7001"]

View File

@@ -0,0 +1,347 @@
# 🏙️ DAARION City Service
**Версія:** 1.0.0
**Статус:** Development (Mock Data)
**Порт:** 7001
---
## 📋 Опис
City Service — це агрегатор даних для City Dashboard в екосистемі DAARION. Збирає та об'єднує інформацію з різних джерел для створення повного знімку стану міста.
### Функціонал
- 📊 **City Snapshot** — повний знімок стану міста
- 👤 **User Context** — профіль користувача та archetype
- 🏛️ **MicroDAO State** — стан microDAO користувача
- 📈 **Metrics Aggregation** — глобальні метрики міста
- 🖥️ **Node Status** — стан усіх нод
- 🤖 **Agent Presence** — активні агенти
- 🎯 **Quests** — активні квести
- 📡 **Events Feed** — останні події міста
---
## 🚀 Швидкий старт
### Через Docker Compose
```bash
# З кореня проєкту
./scripts/start-city-space-services.sh
```
### Локально (Development)
```bash
cd services/city-service
# Створити віртуальне середовище
python -m venv venv
source venv/bin/activate # Linux/Mac
# або venv\Scripts\activate # Windows
# Встановити залежності
pip install -r requirements.txt
# Запустити сервіс
python main.py
# Або через uvicorn
uvicorn main:app --reload --port 7001
```
---
## 📡 API Endpoints
### **GET** `/health`
Health check endpoint
**Response:**
```json
{
"status": "healthy",
"service": "city-service"
}
```
---
### **GET** `/api/city/snapshot`
Повертає повний знімок стану міста DAARION
**Response:** `CitySnapshot`
```json
{
"user": {
"id": "user:93",
"handle": "@alice:daarion.city",
"archetype": "Explorer",
"microdaoId": "microdao:7"
},
"microdao": {
"id": "microdao:7",
"name": "Quantum Garden",
"members": 7,
"humans": 4,
"agents": 3,
"balanceDcr": 12820,
"activity24h": 0.84
},
"metrics": {
"activityIndex": 0.71,
"avgAgentLatencyMs": 13,
"natsTps": 48200,
"nodeAvgLoad": 0.66,
"errorRate": 0.009,
"questEngagement": 0.62
},
"nodes": [...],
"agents": [...],
"quests": [...],
"events": [...]
}
```
---
## 🗺️ Схема агрегації даних
```
┌─────────────────────────────────────────────────────────┐
│ City Service │
│ (Port: 7001) │
└─────────────────────────────────────────────────────────┘
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌──────────────┐
│ Auth / │ │ microDAO │ │ Metrics │
│ Profile │ │ Service │ │ Collector │
│ Service │ │ │ │ │
└─────────────┘ └─────────────┘ └──────────────┘
┌────────────────────────┤
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ NATS │ │ Redis / │
│ JetStream │ │ Timescale │
└──────────────┘ └──────────────┘
┌───────┴────────┐
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ NodeMetrics │ │ Agent │
│ Agent │ │ Registry │
└──────────────┘ └──────────────┘
```
---
## 📊 Джерела даних
| Поле | Джерело | NATS Subject |
| ---------- | ----------------------------------------- | ------------------------- |
| `user` | Auth / Profile service | `user.profile.*` |
| `microdao` | microDAO service | `microdao.state.*` |
| `metrics` | Metrics collector (NATS → Redis/TSDB) | `metrics.city.*` |
| `nodes` | NodeMetrics Agent (NATS `node.metrics.*`) | `node.metrics.*` |
| `agents` | Agent Registry | `agent.status.*` |
| `quests` | Quest Engine | `quest.active.*` |
| `events` | JetStream Stream `events.city.*` | `events.city.*` |
---
## 🔧 Конфігурація
### Environment Variables
```bash
# Service
LOG_LEVEL=INFO
ENVIRONMENT=development
# Redis (для кешу метрик)
REDIS_URL=redis://redis:6379
# NATS (для підписки на події)
NATS_URL=nats://nats:4222
# PostgreSQL (для user/microDAO даних)
DATABASE_URL=postgresql://postgres:postgres@postgres:5432/daarion
# CORS
CORS_ORIGINS=http://localhost:8899,https://daarion.city
```
---
## 🏗️ Структура проєкту
```
services/city-service/
├── main.py # FastAPI application
├── requirements.txt # Python dependencies
├── Dockerfile # Docker image
├── README.md # Ця документація
├── models/ # Pydantic models (TODO)
│ ├── __init__.py
│ ├── city.py
│ └── response.py
├── services/ # Business logic (TODO)
│ ├── __init__.py
│ ├── aggregator.py # Data aggregation
│ ├── nats_client.py # NATS integration
│ └── redis_client.py # Redis integration
└── tests/ # Unit tests (TODO)
├── __init__.py
└── test_api.py
```
---
## 🧪 Тестування
```bash
# Health check
curl http://localhost:7001/health
# Get city snapshot
curl http://localhost:7001/api/city/snapshot
# Через API Gateway
curl http://localhost:8080/api/city/snapshot
```
### Expected Response Time
- `/health`: < 10ms
- `/api/city/snapshot`: < 100ms (з кешем)
---
## 📈 Моніторинг
### Health Checks
```bash
# Docker health check
docker inspect daarion-city-service | grep Health
# Manual health check
curl -f http://localhost:7001/health || exit 1
```
### Logs
```bash
# Follow logs
docker logs -f daarion-city-service
# Last 100 lines
docker logs --tail 100 daarion-city-service
```
### Metrics (TODO)
- Prometheus endpoint: `/metrics`
- Grafana dashboard: City Service Overview
---
## 🚨 Troubleshooting
### Service not starting
```bash
# Check logs
docker logs daarion-city-service
# Rebuild
docker-compose -f docker-compose.city-space.yml up -d --build city-service
```
### CORS errors
```bash
# Check CORS_ORIGINS environment variable
docker exec daarion-city-service env | grep CORS
```
### Slow response times
- Перевір з'єднання з Redis
- Перевір з'єднання з NATS
- Переглянь логи для помилок агрегації
---
## 🗺️ Roadmap
### Phase 1: Mock Data ✅
- [x] FastAPI application
- [x] Mock city snapshot
- [x] OpenAPI documentation
- [x] Docker setup
### Phase 2: Real Data Integration (Current)
- [ ] NATS client integration
- [ ] Redis client integration
- [ ] PostgreSQL integration
- [ ] Real-time metrics aggregation
- [ ] User profile integration
- [ ] MicroDAO state integration
### Phase 3: WebSocket Support
- [ ] `/ws/city` — real-time city updates
- [ ] `/ws/events` — event stream
- [ ] `/ws/metrics` — live metrics
### Phase 4: Optimization
- [ ] Response caching
- [ ] Query optimization
- [ ] Load testing
- [ ] Horizontal scaling
---
## 📚 Документація
- **OpenAPI Docs:** http://localhost:7001/docs
- **ReDoc:** http://localhost:7001/redoc
- **OpenAPI JSON:** http://localhost:7001/openapi.json
---
## 🤝 Contributing
1. Створи feature branch
2. Додай тести
3. Оновити документацію
4. Створи PR
---
## 📄 License
Proprietary — DAARION Ecosystem
---
## 📞 Контакти
- **Maintainer:** DAARION Core Team
- **Issues:** GitHub Issues
- **Slack:** #city-service

View File

View File

@@ -0,0 +1,107 @@
"""
Redis Client для DAARION
Використовується для Presence System та інших real-time features
"""
import os
import redis.asyncio as aioredis
from typing import Optional
import logging
logger = logging.getLogger(__name__)
_redis_client: Optional[aioredis.Redis] = None
async def get_redis() -> aioredis.Redis:
"""
Отримати Redis клієнт (singleton)
"""
global _redis_client
if _redis_client is None:
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
try:
_redis_client = await aioredis.from_url(
redis_url,
encoding="utf-8",
decode_responses=True,
max_connections=10
)
logger.info(f"✅ Redis connected: {redis_url}")
except Exception as e:
logger.error(f"❌ Redis connection failed: {e}")
raise
return _redis_client
async def close_redis():
"""
Закрити Redis connection
"""
global _redis_client
if _redis_client is not None:
await _redis_client.close()
_redis_client = None
logger.info("❌ Redis connection closed")
class PresenceRedis:
"""
Helper для роботи з Presence System в Redis
"""
PREFIX = "presence:user:"
TTL = 40 # seconds
@staticmethod
async def set_online(user_id: str) -> None:
"""Встановити користувача онлайн"""
redis = await get_redis()
key = f"{PresenceRedis.PREFIX}{user_id}"
await redis.setex(key, PresenceRedis.TTL, "online")
@staticmethod
async def is_online(user_id: str) -> bool:
"""Перевірити чи користувач онлайн"""
redis = await get_redis()
key = f"{PresenceRedis.PREFIX}{user_id}"
value = await redis.get(key)
return value == "online"
@staticmethod
async def get_all_online() -> list[str]:
"""Отримати всіх онлайн користувачів"""
redis = await get_redis()
pattern = f"{PresenceRedis.PREFIX}*"
keys = []
async for key in redis.scan_iter(match=pattern, count=100):
user_id = key.replace(PresenceRedis.PREFIX, "")
keys.append(user_id)
return keys
@staticmethod
async def get_online_count() -> int:
"""Отримати кількість онлайн користувачів"""
users = await PresenceRedis.get_all_online()
return len(users)
@staticmethod
async def refresh_ttl(user_id: str) -> None:
"""Оновити TTL для користувача (heartbeat)"""
redis = await get_redis()
key = f"{PresenceRedis.PREFIX}{user_id}"
# Перевірити чи key існує
exists = await redis.exists(key)
if exists:
await redis.expire(key, PresenceRedis.TTL)
else:
# Якщо не існує — створити
await redis.setex(key, PresenceRedis.TTL, "online")

View File

@@ -0,0 +1,351 @@
"""
DAARION City Service
Агрегатор даних для City Dashboard + City Rooms + Presence
"""
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional
import logging
import asyncio
# Import new modules
import routes_city
import ws_city
import repo_city
from common.redis_client import get_redis, close_redis
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(
title="DAARION City Service",
version="2.0.0",
description="City snapshot aggregator + Rooms + Presence for DAARION ecosystem"
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # TODO: обмежити в production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(routes_city.router)
# ============================================================================
# Models
# ============================================================================
class CityUser(BaseModel):
id: str
handle: str
archetype: str
microdaoId: Optional[str] = None
class CityMicroDAO(BaseModel):
id: str
name: str
members: int
humans: int
agents: int
balanceDcr: float
activity24h: float = Field(ge=0, le=1)
class CityMetrics(BaseModel):
activityIndex: float = Field(ge=0, le=1)
avgAgentLatencyMs: float
natsTps: int
nodeAvgLoad: float = Field(ge=0, le=1)
errorRate: float
questEngagement: float = Field(ge=0, le=1)
class CityNode(BaseModel):
id: str
label: str
gpuLoad: float = Field(ge=0, le=1)
latencyMs: float
agents: int
status: str = Field(pattern="^(healthy|warn|critical)$")
class CityAgentSummary(BaseModel):
id: str
name: str
role: str
status: str = Field(pattern="^(online|offline|busy)$")
lastAction: Optional[str] = None
class CityQuestSummary(BaseModel):
id: str
label: str
progress: float = Field(ge=0, le=1)
class CityEvent(BaseModel):
id: str
type: str = Field(pattern="^(dao|node|matrix|quest|system)$")
label: str
timestamp: str
severity: str = Field(pattern="^(info|warn|error)$")
class CitySnapshot(BaseModel):
user: CityUser
microdao: Optional[CityMicroDAO]
metrics: CityMetrics
nodes: List[CityNode]
agents: List[CityAgentSummary]
quests: List[CityQuestSummary]
events: List[CityEvent]
# ============================================================================
# Mock Data (тимчасово, до інтеграції з реальними джерелами)
# ============================================================================
MOCK_CITY_SNAPSHOT = CitySnapshot(
user=CityUser(
id="user:93",
handle="@alice:daarion.city",
archetype="Explorer",
microdaoId="microdao:7"
),
microdao=CityMicroDAO(
id="microdao:7",
name="Quantum Garden",
members=7,
humans=4,
agents=3,
balanceDcr=12820,
activity24h=0.84
),
metrics=CityMetrics(
activityIndex=0.71,
avgAgentLatencyMs=13,
natsTps=48200,
nodeAvgLoad=0.66,
errorRate=0.009,
questEngagement=0.62
),
nodes=[
CityNode(
id="node:03",
label="Quantum Relay",
gpuLoad=0.72,
latencyMs=14,
agents=14,
status="healthy"
),
CityNode(
id="node:04",
label="Atlas Engine",
gpuLoad=0.88,
latencyMs=19,
agents=11,
status="warn"
)
],
agents=[
CityAgentSummary(
id="agent:sofia",
name="Sofia-Prime",
role="System Architect",
status="online",
lastAction="Summarized DAO events 2m ago"
)
],
quests=[
CityQuestSummary(id="q1", label="Visit Space Map", progress=0.4),
CityQuestSummary(id="q2", label="Vote in DAO proposal", progress=0.0),
],
events=[
CityEvent(
id="evt-1133",
type="dao",
label="New proposal in Aurora Circle",
timestamp="2025-11-24T09:12:11Z",
severity="info"
),
CityEvent(
id="evt-1134",
type="node",
label="NODE-03 GPU spike",
timestamp="2025-11-24T09:12:14Z",
severity="warn"
)
]
)
# ============================================================================
# API Endpoints
# ============================================================================
@app.get("/health")
async def health():
"""Health check endpoint"""
return {"status": "healthy", "service": "city-service"}
@app.get("/api/city/snapshot", response_model=CitySnapshot)
async def get_city_snapshot():
"""
Повертає повний знімок стану міста DAARION
Агрегує дані з:
- Auth / Profile service
- microDAO service
- Metrics collector (NATS → Redis/TSDB)
- NodeMetrics Agent (NATS node.metrics.*)
- Agent Registry
- Quest Engine
- JetStream Stream events.city.*
"""
try:
# TODO: замінити на реальну агрегацію даних
logger.info("Fetching city snapshot")
return MOCK_CITY_SNAPSHOT
except Exception as e:
logger.error(f"Error fetching city snapshot: {e}")
raise HTTPException(status_code=500, detail="Failed to fetch city snapshot")
# ============================================================================
# WebSocket Endpoints
# ============================================================================
from websocket import (
manager,
city_updates_generator,
events_stream_generator,
metrics_stream_generator,
agents_presence_generator,
)
@app.websocket("/ws/city")
async def websocket_city(websocket: WebSocket):
"""
WebSocket для live оновлень City Dashboard
Надсилає оновлення метрик, нод, агентів кожні 5 секунд
"""
await manager.connect(websocket, "city")
try:
while True:
# Keep connection alive
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
manager.disconnect(websocket)
@app.websocket("/ws/events")
async def websocket_events(websocket: WebSocket):
"""
WebSocket для потоку подій міста
Надсилає нові події в реальному часі
"""
await manager.connect(websocket, "events")
try:
while True:
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
manager.disconnect(websocket)
@app.websocket("/ws/metrics")
async def websocket_metrics(websocket: WebSocket):
"""
WebSocket для live метрик
Надсилає оновлення метрик кожну секунду
"""
await manager.connect(websocket, "metrics")
try:
while True:
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
manager.disconnect(websocket)
@app.websocket("/ws/agents")
async def websocket_agents(websocket: WebSocket):
"""
WebSocket для присутності агентів
Надсилає оновлення присутності агентів
"""
await manager.connect(websocket, "agents")
try:
while True:
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
manager.disconnect(websocket)
@app.websocket("/ws/city/rooms/{room_id}")
async def websocket_room_endpoint(websocket: WebSocket, room_id: str):
"""WebSocket для City Room"""
await ws_city.websocket_city_room(websocket, room_id)
@app.websocket("/ws/city/presence")
async def websocket_presence_endpoint(websocket: WebSocket):
"""WebSocket для Presence System"""
await ws_city.websocket_city_presence(websocket)
@app.on_event("startup")
async def startup_event():
"""Запустити background tasks для WebSocket оновлень"""
logger.info("🚀 City Service starting...")
# Initialize Redis
try:
await get_redis()
logger.info("✅ Redis connection established")
except Exception as e:
logger.error(f"❌ Redis connection failed: {e}")
# Background tasks
asyncio.create_task(city_updates_generator())
asyncio.create_task(events_stream_generator())
asyncio.create_task(metrics_stream_generator())
asyncio.create_task(agents_presence_generator())
asyncio.create_task(ws_city.presence_cleanup_task())
logger.info("✅ WebSocket background tasks started")
@app.on_event("shutdown")
async def shutdown_event():
"""Cleanup при зупинці"""
logger.info("🛑 City Service shutting down...")
await repo_city.close_pool()
await close_redis()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7001)

View File

@@ -0,0 +1,84 @@
"""
Matrix Gateway Client for City Service
"""
import os
import httpx
import logging
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
MATRIX_GATEWAY_URL = os.getenv("MATRIX_GATEWAY_URL", "http://daarion-matrix-gateway:7025")
async def create_matrix_room(slug: str, name: str, visibility: str = "public") -> Tuple[Optional[str], Optional[str]]:
"""
Create a Matrix room via Matrix Gateway.
Returns:
Tuple of (matrix_room_id, matrix_room_alias) or (None, None) on failure
"""
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.post(
f"{MATRIX_GATEWAY_URL}/internal/matrix/rooms/create",
json={
"slug": slug,
"name": name,
"visibility": visibility
}
)
if response.status_code == 200:
data = response.json()
logger.info(f"Matrix room created: {data['matrix_room_id']}")
return data["matrix_room_id"], data["matrix_room_alias"]
else:
logger.error(f"Failed to create Matrix room: {response.text}")
return None, None
except httpx.RequestError as e:
logger.error(f"Matrix Gateway request error: {e}")
return None, None
except Exception as e:
logger.error(f"Matrix room creation error: {e}")
return None, None
async def find_matrix_room_by_alias(alias: str) -> Tuple[Optional[str], Optional[str]]:
"""
Find a Matrix room by alias via Matrix Gateway.
Returns:
Tuple of (matrix_room_id, matrix_room_alias) or (None, None) if not found
"""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await client.get(
f"{MATRIX_GATEWAY_URL}/internal/matrix/rooms/find-by-alias",
params={"alias": alias}
)
if response.status_code == 200:
data = response.json()
return data["matrix_room_id"], data["matrix_room_alias"]
elif response.status_code == 404:
return None, None
else:
logger.error(f"Failed to find Matrix room: {response.text}")
return None, None
except httpx.RequestError as e:
logger.error(f"Matrix Gateway request error: {e}")
return None, None
async def check_matrix_gateway_health() -> bool:
"""Check if Matrix Gateway is available."""
async with httpx.AsyncClient(timeout=5.0) as client:
try:
response = await client.get(f"{MATRIX_GATEWAY_URL}/healthz")
return response.status_code == 200
except Exception:
return False

View File

@@ -0,0 +1,109 @@
"""
Pydantic Models для City Backend
"""
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
# =============================================================================
# City Rooms
# =============================================================================
class CityRoomBase(BaseModel):
slug: str
name: str
description: Optional[str] = None
class CityRoomCreate(CityRoomBase):
pass
class CityRoomRead(CityRoomBase):
id: str
is_default: bool
created_at: datetime
created_by: Optional[str] = None
members_online: int = 0
last_event: Optional[str] = None
# Matrix integration
matrix_room_id: Optional[str] = None
matrix_room_alias: Optional[str] = None
# =============================================================================
# City Room Messages
# =============================================================================
class CityRoomMessageBase(BaseModel):
body: str = Field(..., min_length=1, max_length=10000)
class CityRoomMessageCreate(CityRoomMessageBase):
pass
class CityRoomMessageRead(CityRoomMessageBase):
id: str
room_id: str
author_user_id: Optional[str] = None
author_agent_id: Optional[str] = None
username: Optional[str] = "Anonymous" # Для frontend
created_at: datetime
# =============================================================================
# City Room Detail (з повідомленнями)
# =============================================================================
class CityRoomDetail(CityRoomRead):
messages: List[CityRoomMessageRead] = []
online_members: List[str] = [] # user_ids
# =============================================================================
# City Feed Events
# =============================================================================
class CityFeedEventRead(BaseModel):
id: str
kind: str # 'room_message', 'agent_reply', 'system', 'dao_event'
room_id: Optional[str] = None
user_id: Optional[str] = None
agent_id: Optional[str] = None
payload: dict
created_at: datetime
# =============================================================================
# Presence
# =============================================================================
class PresenceUpdate(BaseModel):
user_id: str
status: str # 'online', 'offline', 'away'
last_seen: Optional[datetime] = None
class PresenceBulkUpdate(BaseModel):
users: List[PresenceUpdate]
# =============================================================================
# WebSocket Messages
# =============================================================================
class WSRoomMessage(BaseModel):
event: str # 'room.message', 'room.join', 'room.leave'
room_id: Optional[str] = None
user_id: Optional[str] = None
message: Optional[CityRoomMessageRead] = None
class WSPresenceMessage(BaseModel):
event: str # 'presence.heartbeat', 'presence.update'
user_id: str
status: Optional[str] = None

View File

@@ -0,0 +1,228 @@
"""
Repository для City Backend (PostgreSQL)
"""
import os
import asyncpg
from typing import Optional, List
from datetime import datetime
import secrets
# Database connection
_pool: Optional[asyncpg.Pool] = None
async def get_pool() -> asyncpg.Pool:
"""Отримати connection pool"""
global _pool
if _pool is None:
database_url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/daarion")
_pool = await asyncpg.create_pool(database_url, min_size=2, max_size=10)
return _pool
async def close_pool():
"""Закрити connection pool"""
global _pool
if _pool is not None:
await _pool.close()
_pool = None
def generate_id(prefix: str) -> str:
"""Генерувати простий ID"""
return f"{prefix}_{secrets.token_urlsafe(12)}"
# =============================================================================
# City Rooms Repository
# =============================================================================
async def get_all_rooms(limit: int = 100, offset: int = 0) -> List[dict]:
"""Отримати всі кімнати"""
pool = await get_pool()
query = """
SELECT id, slug, name, description, is_default, created_at, created_by,
matrix_room_id, matrix_room_alias
FROM city_rooms
ORDER BY is_default DESC, created_at DESC
LIMIT $1 OFFSET $2
"""
rows = await pool.fetch(query, limit, offset)
return [dict(row) for row in rows]
async def get_room_by_id(room_id: str) -> Optional[dict]:
"""Отримати кімнату по ID"""
pool = await get_pool()
query = """
SELECT id, slug, name, description, is_default, created_at, created_by,
matrix_room_id, matrix_room_alias
FROM city_rooms
WHERE id = $1
"""
row = await pool.fetchrow(query, room_id)
return dict(row) if row else None
async def get_room_by_slug(slug: str) -> Optional[dict]:
"""Отримати кімнату по slug"""
pool = await get_pool()
query = """
SELECT id, slug, name, description, is_default, created_at, created_by,
matrix_room_id, matrix_room_alias
FROM city_rooms
WHERE slug = $1
"""
row = await pool.fetchrow(query, slug)
return dict(row) if row else None
async def create_room(
slug: str,
name: str,
description: Optional[str],
created_by: Optional[str],
matrix_room_id: Optional[str] = None,
matrix_room_alias: Optional[str] = None
) -> dict:
"""Створити кімнату"""
pool = await get_pool()
room_id = f"room_city_{slug}"
query = """
INSERT INTO city_rooms (id, slug, name, description, created_by, matrix_room_id, matrix_room_alias)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias
"""
row = await pool.fetchrow(query, room_id, slug, name, description, created_by, matrix_room_id, matrix_room_alias)
return dict(row)
async def update_room_matrix(room_id: str, matrix_room_id: str, matrix_room_alias: str) -> Optional[dict]:
"""Оновити Matrix поля кімнати"""
pool = await get_pool()
query = """
UPDATE city_rooms
SET matrix_room_id = $2, matrix_room_alias = $3
WHERE id = $1
RETURNING id, slug, name, description, is_default, created_at, created_by, matrix_room_id, matrix_room_alias
"""
row = await pool.fetchrow(query, room_id, matrix_room_id, matrix_room_alias)
return dict(row) if row else None
async def get_rooms_without_matrix() -> List[dict]:
"""Отримати кімнати без Matrix інтеграції"""
pool = await get_pool()
query = """
SELECT id, slug, name, description, is_default, created_at, created_by,
matrix_room_id, matrix_room_alias
FROM city_rooms
WHERE matrix_room_id IS NULL
ORDER BY created_at
"""
rows = await pool.fetch(query)
return [dict(row) for row in rows]
# =============================================================================
# City Room Messages Repository
# =============================================================================
async def get_room_messages(room_id: str, limit: int = 50) -> List[dict]:
"""Отримати повідомлення кімнати"""
pool = await get_pool()
query = """
SELECT id, room_id, author_user_id, author_agent_id, body, created_at
FROM city_room_messages
WHERE room_id = $1
ORDER BY created_at DESC
LIMIT $2
"""
rows = await pool.fetch(query, room_id, limit)
# Reverse для правильного порядку (старі → нові)
return [dict(row) for row in reversed(rows)]
async def create_room_message(
room_id: str,
body: str,
author_user_id: Optional[str] = None,
author_agent_id: Optional[str] = None
) -> dict:
"""Створити повідомлення в кімнаті"""
pool = await get_pool()
message_id = generate_id("m_city")
query = """
INSERT INTO city_room_messages (id, room_id, author_user_id, author_agent_id, body)
VALUES ($1, $2, $3, $4, $5)
RETURNING id, room_id, author_user_id, author_agent_id, body, created_at
"""
row = await pool.fetchrow(query, message_id, room_id, author_user_id, author_agent_id, body)
return dict(row)
# =============================================================================
# City Feed Events Repository
# =============================================================================
async def get_feed_events(limit: int = 20, offset: int = 0) -> List[dict]:
"""Отримати події feed"""
pool = await get_pool()
query = """
SELECT id, kind, room_id, user_id, agent_id, payload, created_at
FROM city_feed_events
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
"""
rows = await pool.fetch(query, limit, offset)
return [dict(row) for row in rows]
async def create_feed_event(
kind: str,
payload: dict,
room_id: Optional[str] = None,
user_id: Optional[str] = None,
agent_id: Optional[str] = None
) -> dict:
"""Створити подію в feed"""
pool = await get_pool()
event_id = generate_id("evt_city")
query = """
INSERT INTO city_feed_events (id, kind, room_id, user_id, agent_id, payload)
VALUES ($1, $2, $3, $4, $5, $6::jsonb)
RETURNING id, kind, room_id, user_id, agent_id, payload, created_at
"""
import json
payload_json = json.dumps(payload)
row = await pool.fetchrow(query, event_id, kind, room_id, user_id, agent_id, payload_json)
return dict(row)

View File

@@ -0,0 +1,7 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
asyncpg==0.29.0
redis==5.0.1
websockets==12.0
requests==2.31.0

View File

@@ -0,0 +1,311 @@
"""
City Backend API Routes
"""
from fastapi import APIRouter, HTTPException, Depends, Body
from typing import List, Optional
import logging
from models_city import (
CityRoomRead,
CityRoomCreate,
CityRoomDetail,
CityRoomMessageRead,
CityRoomMessageCreate,
CityFeedEventRead
)
import repo_city
from common.redis_client import PresenceRedis, get_redis
from matrix_client import create_matrix_room, find_matrix_room_by_alias
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/city", tags=["city"])
# =============================================================================
# City Rooms API
# =============================================================================
@router.get("/rooms", response_model=List[CityRoomRead])
async def get_city_rooms(limit: int = 100, offset: int = 0):
"""
Отримати список всіх City Rooms
"""
try:
rooms = await repo_city.get_all_rooms(limit=limit, offset=offset)
# Додати online count (приблизно)
online_count = await PresenceRedis.get_online_count()
result = []
for room in rooms:
result.append({
**room,
"members_online": online_count if room.get("is_default") else max(1, online_count // 2),
"last_event": None # TODO: з останнього повідомлення
})
return result
except Exception as e:
logger.error(f"Failed to get city rooms: {e}")
raise HTTPException(status_code=500, detail="Failed to get city rooms")
@router.post("/rooms", response_model=CityRoomRead)
async def create_city_room(payload: CityRoomCreate):
"""
Створити нову City Room (автоматично створює Matrix room)
"""
try:
# TODO: витягнути user_id з JWT
created_by = "u_system" # Mock для MVP
# Перевірити чи не існує вже
existing = await repo_city.get_room_by_slug(payload.slug)
if existing:
raise HTTPException(status_code=409, detail="Room with this slug already exists")
# Створити Matrix room
matrix_room_id, matrix_room_alias = await create_matrix_room(
slug=payload.slug,
name=payload.name,
visibility="public"
)
if not matrix_room_id:
logger.warning(f"Failed to create Matrix room for {payload.slug}, proceeding without Matrix")
room = await repo_city.create_room(
slug=payload.slug,
name=payload.name,
description=payload.description,
created_by=created_by,
matrix_room_id=matrix_room_id,
matrix_room_alias=matrix_room_alias
)
# Додати початкове повідомлення
await repo_city.create_room_message(
room_id=room["id"],
body=f"Кімната '{payload.name}' створена! Ласкаво просимо! 🎉",
author_agent_id="ag_system"
)
# Додати в feed
await repo_city.create_feed_event(
kind="system",
room_id=room["id"],
payload={"action": "room_created", "room_name": payload.name, "matrix_room_id": matrix_room_id}
)
return {**room, "members_online": 1, "last_event": None}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to create city room: {e}")
raise HTTPException(status_code=500, detail="Failed to create city room")
@router.get("/rooms/{room_id}", response_model=CityRoomDetail)
async def get_city_room(room_id: str):
"""
Отримати деталі City Room з повідомленнями
"""
try:
room = await repo_city.get_room_by_id(room_id)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
messages = await repo_city.get_room_messages(room_id, limit=50)
# Додати username до повідомлень
for msg in messages:
if msg.get("author_user_id"):
msg["username"] = f"User-{msg['author_user_id'][-4:]}" # Mock
elif msg.get("author_agent_id"):
msg["username"] = "System Agent"
else:
msg["username"] = "Anonymous"
online_users = await PresenceRedis.get_all_online()
return {
**room,
"members_online": len(online_users),
"last_event": None,
"messages": messages,
"online_members": online_users[:20] # Перші 20
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get city room: {e}")
raise HTTPException(status_code=500, detail="Failed to get city room")
@router.post("/rooms/{room_id}/messages", response_model=CityRoomMessageRead)
async def send_city_room_message(room_id: str, payload: CityRoomMessageCreate):
"""
Надіслати повідомлення в City Room
"""
try:
# Перевірити чи кімната існує
room = await repo_city.get_room_by_id(room_id)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
# TODO: витягнути user_id з JWT
author_user_id = "u_mock_user" # Mock для MVP
# Створити повідомлення
message = await repo_city.create_room_message(
room_id=room_id,
body=payload.body,
author_user_id=author_user_id
)
# Додати в feed
await repo_city.create_feed_event(
kind="room_message",
room_id=room_id,
user_id=author_user_id,
payload={"body": payload.body[:100], "message_id": message["id"]}
)
# TODO: Broadcast WS event
# await ws_manager.broadcast_to_room(room_id, {
# "event": "room.message",
# "message": message
# })
# Додати username
message["username"] = f"User-{author_user_id[-4:]}"
return message
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to send room message: {e}")
raise HTTPException(status_code=500, detail="Failed to send message")
@router.post("/rooms/{room_id}/join")
async def join_city_room(room_id: str):
"""
Приєднатися до City Room (для tracking)
"""
# TODO: витягнути user_id з JWT
user_id = "u_mock_user"
# Для MVP просто повертаємо success
# У production можна зберігати active memberships в Redis
logger.info(f"User {user_id} joined room {room_id}")
return {"status": "joined", "room_id": room_id}
@router.post("/rooms/{room_id}/leave")
async def leave_city_room(room_id: str):
"""
Покинути City Room
"""
# TODO: витягнути user_id з JWT
user_id = "u_mock_user"
logger.info(f"User {user_id} left room {room_id}")
return {"status": "left", "room_id": room_id}
# =============================================================================
# Matrix Backfill API (Internal)
# =============================================================================
@router.post("/matrix/backfill")
async def backfill_matrix_rooms():
"""
Backfill Matrix rooms for existing City Rooms that don't have Matrix integration.
This is an internal endpoint for admin use.
"""
try:
rooms_without_matrix = await repo_city.get_rooms_without_matrix()
results = {
"processed": 0,
"created": 0,
"found": 0,
"failed": 0,
"details": []
}
for room in rooms_without_matrix:
results["processed"] += 1
slug = room["slug"]
name = room["name"]
room_id = room["id"]
# Спочатку спробувати знайти існуючу Matrix room
alias = f"#city_{slug}:daarion.space"
matrix_room_id, matrix_room_alias = await find_matrix_room_by_alias(alias)
if matrix_room_id:
# Знайдено існуючу
await repo_city.update_room_matrix(room_id, matrix_room_id, matrix_room_alias)
results["found"] += 1
results["details"].append({
"room_id": room_id,
"slug": slug,
"status": "found",
"matrix_room_id": matrix_room_id
})
else:
# Створити нову
matrix_room_id, matrix_room_alias = await create_matrix_room(slug, name, "public")
if matrix_room_id:
await repo_city.update_room_matrix(room_id, matrix_room_id, matrix_room_alias)
results["created"] += 1
results["details"].append({
"room_id": room_id,
"slug": slug,
"status": "created",
"matrix_room_id": matrix_room_id
})
else:
results["failed"] += 1
results["details"].append({
"room_id": room_id,
"slug": slug,
"status": "failed",
"error": "Could not create Matrix room"
})
logger.info(f"Matrix backfill completed: {results['processed']} processed, "
f"{results['created']} created, {results['found']} found, {results['failed']} failed")
return results
except Exception as e:
logger.error(f"Matrix backfill failed: {e}")
raise HTTPException(status_code=500, detail=f"Backfill failed: {str(e)}")
# =============================================================================
# City Feed API
# =============================================================================
@router.get("/feed", response_model=List[CityFeedEventRead])
async def get_city_feed(limit: int = 20, offset: int = 0):
"""
Отримати City Feed (останні події)
"""
try:
events = await repo_city.get_feed_events(limit=limit, offset=offset)
return events
except Exception as e:
logger.error(f"Failed to get city feed: {e}")
raise HTTPException(status_code=500, detail="Failed to get city feed")

View File

@@ -0,0 +1,162 @@
"""
WebSocket Support for City Service
Real-time updates для City Dashboard
"""
from fastapi import WebSocket, WebSocketDisconnect
from typing import List, Dict, Any
import asyncio
import json
import logging
logger = logging.getLogger(__name__)
class ConnectionManager:
"""Менеджер WebSocket з'єднань"""
def __init__(self):
self.active_connections: List[WebSocket] = []
self.subscriptions: Dict[str, List[WebSocket]] = {
"city": [],
"events": [],
"metrics": [],
"agents": [],
}
async def connect(self, websocket: WebSocket, channel: str = "city"):
"""Підключити WebSocket"""
await websocket.accept()
self.active_connections.append(websocket)
if channel in self.subscriptions:
self.subscriptions[channel].append(websocket)
logger.info(f"Client connected to channel: {channel}")
def disconnect(self, websocket: WebSocket):
"""Від'єднати WebSocket"""
self.active_connections.remove(websocket)
for channel in self.subscriptions.values():
if websocket in channel:
channel.remove(websocket)
logger.info("Client disconnected")
async def send_personal_message(self, message: str, websocket: WebSocket):
"""Надіслати повідомлення конкретному клієнту"""
await websocket.send_text(message)
async def broadcast(self, message: str, channel: str = "city"):
"""Надіслати повідомлення всім клієнтам каналу"""
if channel in self.subscriptions:
for connection in self.subscriptions[channel]:
try:
await connection.send_text(message)
except Exception as e:
logger.error(f"Error broadcasting to client: {e}")
async def broadcast_json(self, data: Dict[str, Any], channel: str = "city"):
"""Надіслати JSON всім клієнтам каналу"""
message = json.dumps(data)
await self.broadcast(message, channel)
# Глобальний instance
manager = ConnectionManager()
async def city_updates_generator():
"""
Генератор оновлень для City Dashboard
TODO: Підключити до реальних джерел (NATS, Redis)
"""
while True:
await asyncio.sleep(5) # Оновлення кожні 5 секунд
# Mock update
update = {
"type": "city_update",
"timestamp": "2025-11-24T10:00:00Z",
"data": {
"metrics": {
"activityIndex": 0.72,
"nodeAvgLoad": 0.65,
},
"nodes_online": 12
}
}
await manager.broadcast_json(update, "city")
async def events_stream_generator():
"""
Генератор потоку подій
TODO: Підключити до NATS JetStream events.city.*
"""
while True:
await asyncio.sleep(3) # Нові події кожні 3 секунди
# Mock event
event = {
"type": "city_event",
"timestamp": "2025-11-24T10:00:00Z",
"event": {
"id": f"evt-{asyncio.get_event_loop().time()}",
"type": "node",
"label": "Mock event for testing",
"severity": "info"
}
}
await manager.broadcast_json(event, "events")
async def metrics_stream_generator():
"""
Генератор live метрик
TODO: Підключити до Redis/Prometheus
"""
while True:
await asyncio.sleep(1) # Метрики кожну секунду
# Mock metrics
metrics = {
"type": "metrics_update",
"timestamp": "2025-11-24T10:00:00Z",
"metrics": {
"activityIndex": 0.71 + (asyncio.get_event_loop().time() % 10) / 100,
"natsTps": int(48000 + (asyncio.get_event_loop().time() % 1000)),
}
}
await manager.broadcast_json(metrics, "metrics")
async def agents_presence_generator():
"""
Генератор присутності агентів
TODO: Підключити до Agent Registry
"""
while True:
await asyncio.sleep(10) # Оновлення присутності кожні 10 секунд
# Mock agent presence
presence = {
"type": "agent_presence",
"timestamp": "2025-11-24T10:00:00Z",
"agents": {
"online": 42,
"offline": 3,
"busy": 5,
}
}
await manager.broadcast_json(presence, "agents")

View File

@@ -0,0 +1,222 @@
"""
WebSocket Endpoints для City Backend
Rooms + Presence System
"""
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Set, Optional
import json
import asyncio
import logging
from common.redis_client import PresenceRedis
logger = logging.getLogger(__name__)
# =============================================================================
# WebSocket Connection Manager
# =============================================================================
class CityWSManager:
"""Менеджер WebSocket підключень для City"""
def __init__(self):
# room_id -> set of websockets
self.room_connections: Dict[str, Set[WebSocket]] = {}
# presence connections
self.presence_connections: Set[WebSocket] = set()
async def connect_to_room(self, websocket: WebSocket, room_id: str):
"""Підключити клієнта до кімнати"""
await websocket.accept()
if room_id not in self.room_connections:
self.room_connections[room_id] = set()
self.room_connections[room_id].add(websocket)
logger.info(f"✅ Client connected to room {room_id}. Total: {len(self.room_connections[room_id])}")
def disconnect_from_room(self, websocket: WebSocket, room_id: str):
"""Від'єднати клієнта від кімнати"""
if room_id in self.room_connections:
self.room_connections[room_id].discard(websocket)
if len(self.room_connections[room_id]) == 0:
del self.room_connections[room_id]
logger.info(f"❌ Client disconnected from room {room_id}")
async def broadcast_to_room(self, room_id: str, message: dict):
"""Broadcast повідомлення всім клієнтам кімнати"""
if room_id not in self.room_connections:
return
disconnected = set()
for websocket in self.room_connections[room_id]:
try:
await websocket.send_json(message)
except Exception as e:
logger.error(f"Failed to send to websocket: {e}")
disconnected.add(websocket)
# Видалити disconnected
for ws in disconnected:
self.room_connections[room_id].discard(ws)
# Presence methods
async def connect_to_presence(self, websocket: WebSocket):
"""Підключити клієнта до Presence System"""
await websocket.accept()
self.presence_connections.add(websocket)
logger.info(f"✅ Client connected to presence. Total: {len(self.presence_connections)}")
def disconnect_from_presence(self, websocket: WebSocket):
"""Від'єднати клієнта від Presence System"""
self.presence_connections.discard(websocket)
logger.info(f"❌ Client disconnected from presence")
async def broadcast_presence_update(self, message: dict):
"""Broadcast presence update всім клієнтам"""
disconnected = set()
for websocket in self.presence_connections:
try:
await websocket.send_json(message)
except Exception as e:
logger.error(f"Failed to send presence update: {e}")
disconnected.add(websocket)
# Видалити disconnected
for ws in disconnected:
self.presence_connections.discard(ws)
# Global manager instance
ws_manager = CityWSManager()
# =============================================================================
# WebSocket Endpoints
# =============================================================================
async def websocket_city_room(websocket: WebSocket, room_id: str):
"""
WebSocket для City Room
/ws/city/rooms/{room_id}
"""
await ws_manager.connect_to_room(websocket, room_id)
try:
while True:
data = await websocket.receive_text()
try:
message = json.loads(data)
event = message.get("event")
if event == "room.join":
# User joined room
user_id = message.get("user_id", "anonymous")
await ws_manager.broadcast_to_room(room_id, {
"event": "room.join",
"room_id": room_id,
"user_id": user_id
})
elif event == "room.leave":
# User left room
user_id = message.get("user_id", "anonymous")
await ws_manager.broadcast_to_room(room_id, {
"event": "room.leave",
"room_id": room_id,
"user_id": user_id
})
elif event == "room.message.send":
# New message (але краще через HTTP API)
logger.info(f"Message via WS (should use HTTP): {message}")
else:
logger.warning(f"Unknown event: {event}")
except json.JSONDecodeError:
logger.error("Invalid JSON from client")
except WebSocketDisconnect:
ws_manager.disconnect_from_room(websocket, room_id)
async def websocket_city_presence(websocket: WebSocket):
"""
WebSocket для Presence System
/ws/city/presence
"""
await ws_manager.connect_to_presence(websocket)
current_user_id: Optional[str] = None
try:
while True:
data = await websocket.receive_text()
try:
message = json.loads(data)
event = message.get("event")
if event == "presence.heartbeat":
user_id = message.get("user_id")
if not user_id:
continue
current_user_id = user_id
# Оновити Redis
await PresenceRedis.set_online(user_id)
# Broadcast presence update
await ws_manager.broadcast_presence_update({
"event": "presence.update",
"user_id": user_id,
"status": "online"
})
logger.debug(f"Heartbeat from {user_id}")
else:
logger.warning(f"Unknown presence event: {event}")
except json.JSONDecodeError:
logger.error("Invalid JSON from client")
except WebSocketDisconnect:
ws_manager.disconnect_from_presence(websocket)
# Видалити з Redis
if current_user_id:
logger.info(f"User {current_user_id} disconnected, presence will expire via TTL")
# =============================================================================
# Background Task: Presence Cleanup
# =============================================================================
async def presence_cleanup_task():
"""
Background task для очищення offline користувачів
Запускається кожні 60 секунд
"""
while True:
try:
await asyncio.sleep(60)
# Redis автоматично видаляє keys з TTL
# Тут можна додати додаткову логіку якщо потрібно
online_users = await PresenceRedis.get_all_online()
logger.info(f"Presence cleanup: {len(online_users)} users online")
except Exception as e:
logger.error(f"Presence cleanup error: {e}")