diff --git a/apps/web/src/app/city/page.tsx b/apps/web/src/app/city/page.tsx index b08b042d..a52bf0fa 100644 --- a/apps/web/src/app/city/page.tsx +++ b/apps/web/src/app/city/page.tsx @@ -1,22 +1,45 @@ +'use client' + +import { useState, useEffect } from 'react' import Link from 'next/link' -import { Building2, Users, Star, MessageSquare, ArrowRight } from 'lucide-react' +import { Building2, Users, Star, MessageSquare, ArrowRight, Loader2 } from 'lucide-react' import { api, CityRoom } from '@/lib/api' import { cn } from '@/lib/utils' +import { useGlobalPresence } from '@/hooks/useGlobalPresence' -// Force dynamic rendering - don't prerender at build time -export const dynamic = 'force-dynamic' +export default function CityPage() { + const [rooms, setRooms] = useState([]) + const [loading, setLoading] = useState(true) + const presence = useGlobalPresence() -async function getCityRooms(): Promise { - try { - return await api.getCityRooms() - } catch (error) { - console.error('Failed to fetch city rooms:', error) - return [] + useEffect(() => { + async function fetchRooms() { + try { + const data = await api.getCityRooms() + setRooms(data) + } catch (error) { + console.error('Failed to fetch city rooms:', error) + } finally { + setLoading(false) + } + } + fetchRooms() + }, []) + + // Calculate total online from presence or fallback to API data + const totalOnline = Object.values(presence).reduce((sum, p) => sum + p.online_count, 0) || + rooms.reduce((sum, r) => sum + r.members_online, 0) + + const activeRooms = Object.values(presence).filter(p => p.online_count > 0).length || + rooms.filter(r => r.members_online > 0).length + + if (loading) { + return ( +
+ +
+ ) } -} - -export default async function CityPage() { - const rooms = await getCityRooms() return (
@@ -32,6 +55,16 @@ export default async function CityPage() {

Оберіть кімнату для спілкування

+ + {/* Live indicator */} + {totalOnline > 0 && ( +
+ + + {totalOnline} у місті зараз + +
+ )} {/* Rooms Grid */} @@ -48,7 +81,11 @@ export default async function CityPage() { ) : (
{rooms.map((room) => ( - + ))}
)} @@ -63,8 +100,9 @@ export default async function CityPage() { /> sum + r.members_online, 0)} + value={totalOnline} icon={Users} + highlight={totalOnline > 0} /> r.members_online > 0).length} + value={activeRooms} icon={MessageSquare} + highlight={activeRooms > 0} /> )} @@ -83,13 +122,24 @@ export default async function CityPage() { ) } -function RoomCard({ room }: { room: CityRoom }) { - const isActive = room.members_online > 0 +interface RoomCardProps { + room: CityRoom + livePresence?: { online_count: number; typing_count: number } +} + +function RoomCard({ room, livePresence }: RoomCardProps) { + // Use live presence if available, otherwise fallback to API data + const onlineCount = livePresence?.online_count ?? room.members_online + const typingCount = livePresence?.typing_count ?? 0 + const isActive = onlineCount > 0 return (

@@ -118,8 +168,19 @@ function RoomCard({ room }: { room: CityRoom }) { 'w-2 h-2 rounded-full', isActive ? 'bg-emerald-400 animate-pulse' : 'bg-slate-600' )} /> - {room.members_online} онлайн + {onlineCount} онлайн + + {typingCount > 0 && ( + + + + + + + друкує + + )}

@@ -131,16 +192,29 @@ function RoomCard({ room }: { room: CityRoom }) { function StatCard({ label, value, - icon: Icon + icon: Icon, + highlight = false }: { label: string value: number icon: React.ComponentType<{ className?: string }> + highlight?: boolean }) { return ( -
- -
{value}
+
+ +
+ {value} +
{label}
) diff --git a/apps/web/src/hooks/useGlobalPresence.ts b/apps/web/src/hooks/useGlobalPresence.ts new file mode 100644 index 00000000..2fb4c84a --- /dev/null +++ b/apps/web/src/hooks/useGlobalPresence.ts @@ -0,0 +1,30 @@ +'use client' + +import { useState, useEffect } from 'react' +import { globalPresenceClient, RoomPresence } from '@/lib/global-presence' + +/** + * Hook for subscribing to global room presence updates + */ +export function useGlobalPresence(): Record { + const [presence, setPresence] = useState>({}) + + useEffect(() => { + const unsubscribe = globalPresenceClient.subscribe((newPresence) => { + setPresence(newPresence) + }) + + return unsubscribe + }, []) + + return presence +} + +/** + * Hook for getting presence of a specific room + */ +export function useRoomPresence(slug: string): RoomPresence | null { + const allPresence = useGlobalPresence() + return allPresence[slug] || null +} + diff --git a/apps/web/src/lib/global-presence.ts b/apps/web/src/lib/global-presence.ts new file mode 100644 index 00000000..7e3266f5 --- /dev/null +++ b/apps/web/src/lib/global-presence.ts @@ -0,0 +1,177 @@ +/** + * Global Presence WebSocket Client + * + * Connects to /ws/city/global-presence for real-time room presence updates + */ + +export interface RoomPresence { + room_slug: string; + online_count: number; + typing_count: number; +} + +export type PresenceCallback = (presence: Record) => void; + +class GlobalPresenceClient { + private ws: WebSocket | null = null; + private presence: Record = {}; + private listeners: Set = new Set(); + private reconnectTimeout: NodeJS.Timeout | null = null; + private pingInterval: NodeJS.Timeout | null = null; + private isConnecting = false; + + connect(): void { + if (this.ws?.readyState === WebSocket.OPEN || this.isConnecting) { + return; + } + + this.isConnecting = true; + + // Determine WebSocket URL + const protocol = typeof window !== 'undefined' && window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const host = typeof window !== 'undefined' ? window.location.host : 'localhost:7001'; + const wsUrl = `${protocol}//${host}/ws/city/global-presence`; + + console.log('[GlobalPresence] Connecting to', wsUrl); + + try { + this.ws = new WebSocket(wsUrl); + + this.ws.onopen = () => { + console.log('[GlobalPresence] Connected'); + this.isConnecting = false; + this.startPing(); + }; + + this.ws.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + this.handleMessage(data); + } catch (e) { + console.error('[GlobalPresence] Failed to parse message:', e); + } + }; + + this.ws.onclose = () => { + console.log('[GlobalPresence] Disconnected'); + this.isConnecting = false; + this.stopPing(); + this.scheduleReconnect(); + }; + + this.ws.onerror = (error) => { + console.error('[GlobalPresence] WebSocket error:', error); + this.isConnecting = false; + }; + } catch (e) { + console.error('[GlobalPresence] Failed to create WebSocket:', e); + this.isConnecting = false; + this.scheduleReconnect(); + } + } + + disconnect(): void { + this.stopPing(); + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = null; + } + if (this.ws) { + this.ws.close(); + this.ws = null; + } + } + + subscribe(callback: PresenceCallback): () => void { + this.listeners.add(callback); + + // Send current state immediately + if (Object.keys(this.presence).length > 0) { + callback(this.presence); + } + + // Connect if not connected + this.connect(); + + return () => { + this.listeners.delete(callback); + + // Disconnect if no listeners + if (this.listeners.size === 0) { + this.disconnect(); + } + }; + } + + getPresence(slug: string): RoomPresence | null { + return this.presence[slug] || null; + } + + getAllPresence(): Record { + return { ...this.presence }; + } + + private handleMessage(data: any): void { + if (data.type === 'snapshot') { + // Initial snapshot + this.presence = {}; + for (const room of data.rooms || []) { + this.presence[room.room_slug] = { + room_slug: room.room_slug, + online_count: room.online_count || 0, + typing_count: room.typing_count || 0 + }; + } + this.notifyListeners(); + } else if (data.type === 'room.presence') { + // Incremental update + this.presence[data.room_slug] = { + room_slug: data.room_slug, + online_count: data.online_count || 0, + typing_count: data.typing_count || 0 + }; + this.notifyListeners(); + } + } + + private notifyListeners(): void { + for (const callback of this.listeners) { + try { + callback(this.presence); + } catch (e) { + console.error('[GlobalPresence] Listener error:', e); + } + } + } + + private startPing(): void { + this.pingInterval = setInterval(() => { + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws.send('ping'); + } + }, 30000); + } + + private stopPing(): void { + if (this.pingInterval) { + clearInterval(this.pingInterval); + this.pingInterval = null; + } + } + + private scheduleReconnect(): void { + if (this.reconnectTimeout) return; + + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null; + if (this.listeners.size > 0) { + console.log('[GlobalPresence] Reconnecting...'); + this.connect(); + } + }, 5000); + } +} + +// Singleton instance +export const globalPresenceClient = new GlobalPresenceClient(); + diff --git a/docs/realtime/GLOBAL_PRESENCE_AGGREGATOR_SPEC.md b/docs/realtime/GLOBAL_PRESENCE_AGGREGATOR_SPEC.md new file mode 100644 index 00000000..685e9469 --- /dev/null +++ b/docs/realtime/GLOBAL_PRESENCE_AGGREGATOR_SPEC.md @@ -0,0 +1,344 @@ +# GLOBAL PRESENCE AGGREGATOR — DAARION.city + +Version: 1.0.0 +Location: docs/realtime/GLOBAL_PRESENCE_AGGREGATOR_SPEC.md + +--- + +## 0. PURPOSE + +Зробити **єдиний центр правди про присутність (presence) та активність** у місті: + +- збирати Matrix presence/typing/room-activity на сервері, +- агрегувати їх на рівні кімнат (`city_room`), +- публікувати у NATS як події, +- транслювати у фронтенд через WebSocket з `city-service`. + +Результат: DAARION має **"живе місто"**: + +- список кімнат `/city` показує: + - скільки людей онлайн, + - активність у реальному часі, +- майбутня City Map (2D/2.5D) живиться цими даними. + +--- + +## 1. ARCHITECTURE OVERVIEW + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ DAARION PRESENCE SYSTEM │ +├─────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────┐ ┌──────────────────────┐ ┌─────────────────┐ │ +│ │ Matrix │────▶│ matrix-presence- │────▶│ NATS │ │ +│ │ Synapse │ │ aggregator │ │ JetStream │ │ +│ │ │ │ (sync loop) │ │ │ │ +│ └─────────────┘ └──────────────────────┘ └────────┬────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────┐ ┌──────────────────────┐ ┌─────────────────┐ │ +│ │ Browser │◀────│ city-service │◀────│ NATS Sub │ │ +│ │ (WS) │ │ /ws/city/presence │ │ │ │ +│ └─────────────┘ └──────────────────────┘ └─────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +### Компоненти + +1. **matrix-presence-aggregator (новий сервіс)** + - читає Matrix sync (presence, typing, room activity), + - тримає у пам'яті поточний стан присутності, + - публікує агреговані події в NATS. + +2. **NATS JetStream** + - канал для presence/events: + - `city.presence.room.*` + - `city.presence.user.*` + +3. **city-service (розширення)** + - підписується на NATS події, + - тримає WebSocket з'єднання з фронтендом, + - пушить presence/room-activity у браузер. + +4. **web (Next.js UI)** + - сторінка `/city`: + - показує `N online` по кожній кімнаті, + - highlight "active" кімнати. + +--- + +## 2. MATRIX SIDE — ЗВІДКИ БРАТИ ПОДІЇ + +### 2.1. Окремий Matrix-юзер для агрегації + +Спец-акаунт: +- `@presence_daemon:daarion.space` +- права: + - читати presence/typing у всіх `city_*` кімнатах, + - бути учасником цих кімнат. + +### 2.2. Sync-loop на сервері + +Сервіс `matrix-presence-aggregator`: + +- використовує `/sync` Matrix (як клієнт), +- фільтр: + +```json +{ + "presence": { + "types": ["m.presence"] + }, + "room": { + "timeline": { "limit": 0 }, + "state": { "limit": 0 }, + "ephemeral": { + "types": ["m.typing", "m.receipt"] + } + } +} +``` + +- робить long-polling з `since` + `timeout`, +- парсить: + - `presence.events` → `m.presence`, + - `rooms.join[roomId].ephemeral.events` → `m.typing`, `m.receipt`. + +--- + +## 3. DATA MODEL (IN-MEMORY AGGREGATOR) + +### 3.1. Room presence state + +```python +from dataclasses import dataclass +from typing import Dict, List, Set, Optional +from datetime import datetime + +@dataclass +class UserPresence: + user_id: str # "@user:domain" + status: str # "online" | "offline" | "unavailable" + last_active_ts: float # timestamp + +@dataclass +class RoomPresence: + room_id: str # "!....:daarion.space" + alias: Optional[str] # "#city_energy:daarion.space" + city_room_slug: Optional[str] # "energy" + online_count: int + typing_user_ids: List[str] + last_event_ts: float + +class PresenceState: + users: Dict[str, UserPresence] + rooms: Dict[str, RoomPresence] + room_members: Dict[str, Set[str]] # room_id -> set of user_ids +``` + +### 3.2. Мапінг Room → City Room + +`matrix-presence-aggregator` має знати `matrix_room_id` ↔ `city_room.slug`. + +**Pull-mode (MVP):** +- при старті сервісу: + - `GET /internal/city/rooms` + - зчитати всі `matrix_room_id` / `matrix_room_alias` / `slug`, + - зібрати мапу `roomId → slug`. +- періодично (кожні 5 хвилин) оновлювати. + +--- + +## 4. NATS EVENTS + +### 4.1. Room-level presence + +Subject: +``` +city.presence.room. +``` + +Event payload: +```json +{ + "type": "room.presence", + "room_slug": "energy", + "matrix_room_id": "!gykdLyazhkcSZGHmbG:daarion.space", + "matrix_room_alias": "#city_energy:daarion.space", + "online_count": 5, + "typing_count": 1, + "typing_users": ["@user1:daarion.space"], + "last_event_ts": 1732610000000 +} +``` + +### 4.2. User-level presence (опційний) + +Subject: +``` +city.presence.user. +``` + +Payload: +```json +{ + "type": "user.presence", + "matrix_user_id": "@user1:daarion.space", + "status": "online", + "last_active_ts": 1732610000000 +} +``` + +--- + +## 5. EVENT GENERATION LOGIC + +### 5.1. Обробка m.presence + +При кожному `m.presence`: +- оновити `PresenceState.users[userId]`, +- для всіх кімнат, де є цей юзер — перерахувати `onlineCount`, +- якщо `onlineCount` змінився — публікувати нову подію. + +### 5.2. Обробка m.typing + +При `m.typing`: +- `content.user_ids` → список typing у кімнаті. +- Зберегти в `RoomPresence.typing_user_ids`. +- Згенерувати івент `city.presence.room.`. + +### 5.3. Throttling + +- подію публікувати тільки якщо `onlineCount` змінився, +- або не частіше ніж раз на 3 секунди на кімнату. + +--- + +## 6. CITY REALTIME GATEWAY (WEBSOCKET) + +### 6.1. WebSocket endpoint + +``` +GET /ws/city/presence +``` + +Auth: JWT токен у query param або header. + +### 6.2. Формат повідомлень + +**Snapshot (при підключенні):** +```json +{ + "type": "snapshot", + "rooms": [ + { "room_slug": "general", "online_count": 3, "typing_count": 0 }, + { "room_slug": "welcome", "online_count": 1, "typing_count": 0 } + ] +} +``` + +**Incremental update:** +```json +{ + "type": "room.presence", + "room_slug": "energy", + "online_count": 5, + "typing_count": 1 +} +``` + +--- + +## 7. FRONTEND INTEGRATION + +### 7.1. Список кімнат `/city` + +State: +```typescript +type RoomPresenceUI = { + onlineCount: number; + typingCount: number; +}; + +const [presenceBySlug, setPresenceBySlug] = useState>({}); +``` + +WebSocket handler: +```typescript +ws.onmessage = (event) => { + const data = JSON.parse(event.data); + + if (data.type === 'snapshot') { + const presence: Record = {}; + data.rooms.forEach(r => { + presence[r.room_slug] = { + onlineCount: r.online_count, + typingCount: r.typing_count + }; + }); + setPresenceBySlug(presence); + } + + if (data.type === 'room.presence') { + setPresenceBySlug(prev => ({ + ...prev, + [data.room_slug]: { + onlineCount: data.online_count, + typingCount: data.typing_count + } + })); + } +}; +``` + +### 7.2. UI + +- Room card: `X online`, typing badge +- Active room: glow effect +- Typing animation + +--- + +## 8. CONFIG / ENV + +### matrix-presence-aggregator + +```env +MATRIX_HS_URL=https://app.daarion.space +MATRIX_ACCESS_TOKEN= +MATRIX_USER_ID=@presence_daemon:daarion.space +CITY_SERVICE_INTERNAL_URL=http://city-service:7001 +NATS_URL=nats://nats:4222 +ROOM_PRESENCE_THROTTLE_MS=3000 +``` + +### city-service (realtime gateway) + +```env +NATS_URL=nats://nats:4222 +JWT_SECRET= +``` + +--- + +## 9. ACCEPTANCE CRITERIA + +- [ ] matrix-presence-aggregator запущений і синхронізується з Matrix +- [ ] NATS отримує події `city.presence.room.*` +- [ ] city-service має endpoint `/ws/city/presence` +- [ ] При підключенні WS клієнт отримує snapshot +- [ ] При зміні presence клієнт отримує update +- [ ] UI `/city` показує online count для кожної кімнати +- [ ] Typing indicator відображається + +--- + +## 10. FUTURE ENHANCEMENTS + +1. **Agent presence** — окремі статуси для AI-агентів +2. **City Map** — візуалізація presence на 2D карті +3. **Push notifications** — сповіщення про активність +4. **Historical analytics** — статистика активності + diff --git a/services/city-service/main.py b/services/city-service/main.py index 220344b7..e39899e8 100644 --- a/services/city-service/main.py +++ b/services/city-service/main.py @@ -16,6 +16,11 @@ import routes_city import ws_city import repo_city from common.redis_client import get_redis, close_redis +from presence_gateway import ( + websocket_global_presence, + start_presence_gateway, + stop_presence_gateway +) # Logging logging.basicConfig(level=logging.INFO) @@ -311,10 +316,16 @@ async def websocket_room_endpoint(websocket: WebSocket, room_id: str): @app.websocket("/ws/city/presence") async def websocket_presence_endpoint(websocket: WebSocket): - """WebSocket для Presence System""" + """WebSocket для Presence System (user heartbeats)""" await ws_city.websocket_city_presence(websocket) +@app.websocket("/ws/city/global-presence") +async def websocket_global_presence_endpoint(websocket: WebSocket): + """WebSocket для Global Room Presence (aggregated from Matrix)""" + await websocket_global_presence(websocket) + + @app.on_event("startup") async def startup_event(): """Запустити background tasks для WebSocket оновлень""" @@ -334,6 +345,13 @@ async def startup_event(): asyncio.create_task(agents_presence_generator()) asyncio.create_task(ws_city.presence_cleanup_task()) + # Start global presence gateway (NATS subscriber) + try: + await start_presence_gateway() + logger.info("✅ Global presence gateway started") + except Exception as e: + logger.warning(f"⚠️ Global presence gateway failed to start: {e}") + logger.info("✅ WebSocket background tasks started") @@ -341,6 +359,7 @@ async def startup_event(): async def shutdown_event(): """Cleanup при зупинці""" logger.info("🛑 City Service shutting down...") + await stop_presence_gateway() await repo_city.close_pool() await close_redis() diff --git a/services/city-service/presence_gateway.py b/services/city-service/presence_gateway.py new file mode 100644 index 00000000..f009e76c --- /dev/null +++ b/services/city-service/presence_gateway.py @@ -0,0 +1,173 @@ +""" +Global Presence Gateway for City Service + +Subscribes to NATS presence events from matrix-presence-aggregator +and broadcasts to WebSocket clients. +""" +import asyncio +import json +import logging +from typing import Dict, Set, Optional +import os + +from fastapi import WebSocket, WebSocketDisconnect + +logger = logging.getLogger(__name__) + +# NATS URL +NATS_URL = os.getenv("NATS_URL", "nats://localhost:4222") + + +class GlobalPresenceManager: + """Manages WebSocket connections for global room presence""" + + def __init__(self): + self.connections: Set[WebSocket] = set() + self.room_presence: Dict[str, dict] = {} # slug -> {online_count, typing_count} + self.nc = None # NATS connection + self.is_running = False + + async def connect(self, websocket: WebSocket): + """Add a new WebSocket client""" + await websocket.accept() + self.connections.add(websocket) + + # Send initial snapshot + await self._send_snapshot(websocket) + + logger.info(f"Global presence client connected. Total: {len(self.connections)}") + + def disconnect(self, websocket: WebSocket): + """Remove a WebSocket client""" + self.connections.discard(websocket) + logger.info(f"Global presence client disconnected. Total: {len(self.connections)}") + + async def _send_snapshot(self, websocket: WebSocket): + """Send current presence snapshot to a client""" + rooms = [ + { + "room_slug": slug, + "online_count": data.get("online_count", 0), + "typing_count": data.get("typing_count", 0) + } + for slug, data in self.room_presence.items() + ] + + await websocket.send_json({ + "type": "snapshot", + "rooms": rooms + }) + + async def broadcast(self, message: dict): + """Broadcast a message to all connected clients""" + if not self.connections: + return + + disconnected = set() + + for websocket in self.connections: + try: + await websocket.send_json(message) + except Exception as e: + logger.error(f"Failed to send to websocket: {e}") + disconnected.add(websocket) + + # Remove disconnected clients + for ws in disconnected: + self.connections.discard(ws) + + def update_room_presence(self, slug: str, online_count: int, typing_count: int): + """Update cached presence for a room""" + self.room_presence[slug] = { + "online_count": online_count, + "typing_count": typing_count + } + + async def start_nats_subscriber(self): + """Start NATS subscription for presence events""" + try: + import nats + + self.nc = await nats.connect(NATS_URL) + self.is_running = True + logger.info(f"Connected to NATS at {NATS_URL} for presence events") + + # Subscribe to room presence events + await self.nc.subscribe("city.presence.room.*", cb=self._on_room_presence) + + logger.info("Subscribed to city.presence.room.*") + + except ImportError: + logger.warning("nats-py not installed, NATS presence disabled") + except Exception as e: + logger.error(f"Failed to connect to NATS: {e}") + + async def _on_room_presence(self, msg): + """Handle room presence event from NATS""" + try: + data = json.loads(msg.data.decode()) + + slug = data.get("room_slug") + online_count = data.get("online_count", 0) + typing_count = data.get("typing_count", 0) + + if slug: + # Update cache + self.update_room_presence(slug, online_count, typing_count) + + # Broadcast to WebSocket clients + await self.broadcast({ + "type": "room.presence", + "room_slug": slug, + "online_count": online_count, + "typing_count": typing_count + }) + + logger.debug(f"Room presence update: {slug} -> {online_count} online, {typing_count} typing") + + except Exception as e: + logger.error(f"Error processing NATS presence event: {e}") + + async def stop(self): + """Stop NATS subscription""" + self.is_running = False + if self.nc: + await self.nc.drain() + logger.info("NATS connection closed") + + +# Global instance +global_presence_manager = GlobalPresenceManager() + + +async def websocket_global_presence(websocket: WebSocket): + """ + WebSocket endpoint for global room presence + /ws/city/global-presence + + Sends: + - Initial snapshot of all room presence + - Real-time updates when presence changes + """ + await global_presence_manager.connect(websocket) + + try: + while True: + # Keep connection alive, handle pings + data = await websocket.receive_text() + if data == "ping": + await websocket.send_text("pong") + + except WebSocketDisconnect: + global_presence_manager.disconnect(websocket) + + +async def start_presence_gateway(): + """Start the global presence gateway (call on startup)""" + await global_presence_manager.start_nats_subscriber() + + +async def stop_presence_gateway(): + """Stop the global presence gateway (call on shutdown)""" + await global_presence_manager.stop() + diff --git a/services/matrix-presence-aggregator/Dockerfile b/services/matrix-presence-aggregator/Dockerfile new file mode 100644 index 00000000..86c8f8c0 --- /dev/null +++ b/services/matrix-presence-aggregator/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy source +COPY . . + +# Run the service +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7026"] + diff --git a/services/matrix-presence-aggregator/config.py b/services/matrix-presence-aggregator/config.py new file mode 100644 index 00000000..3dccf692 --- /dev/null +++ b/services/matrix-presence-aggregator/config.py @@ -0,0 +1,25 @@ +"""Configuration for Matrix Presence Aggregator""" +import os +from dotenv import load_dotenv + +load_dotenv() + +# Matrix settings +MATRIX_HS_URL = os.getenv("MATRIX_HS_URL", "https://app.daarion.space") +MATRIX_ACCESS_TOKEN = os.getenv("MATRIX_ACCESS_TOKEN", "") +MATRIX_USER_ID = os.getenv("MATRIX_USER_ID", "@presence_daemon:daarion.space") + +# City Service for room mapping +CITY_SERVICE_URL = os.getenv("CITY_SERVICE_URL", "http://localhost:7001") +INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "super-secret-internal-key") + +# NATS +NATS_URL = os.getenv("NATS_URL", "nats://localhost:4222") + +# Throttling +ROOM_PRESENCE_THROTTLE_MS = int(os.getenv("ROOM_PRESENCE_THROTTLE_MS", "3000")) + +# Sync settings +SYNC_TIMEOUT_MS = int(os.getenv("SYNC_TIMEOUT_MS", "30000")) +ROOM_MAPPING_REFRESH_INTERVAL_S = int(os.getenv("ROOM_MAPPING_REFRESH_INTERVAL_S", "300")) + diff --git a/services/matrix-presence-aggregator/main.py b/services/matrix-presence-aggregator/main.py new file mode 100644 index 00000000..eda5c26b --- /dev/null +++ b/services/matrix-presence-aggregator/main.py @@ -0,0 +1,202 @@ +""" +Matrix Presence Aggregator Service + +Aggregates Matrix presence/typing events and publishes to NATS +for real-time city presence in DAARION. +""" +import asyncio +import logging +from contextlib import asynccontextmanager +from typing import Dict + +import httpx +from fastapi import FastAPI + +from config import ( + CITY_SERVICE_URL, + INTERNAL_API_KEY, + ROOM_PRESENCE_THROTTLE_MS, + ROOM_MAPPING_REFRESH_INTERVAL_S +) +from models import PresenceState +from matrix_sync import MatrixSyncClient, get_room_members, join_room +from nats_publisher import PresencePublisher + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +# Global state +state = PresenceState() +publisher = PresencePublisher() +sync_client: MatrixSyncClient = None + + +async def fetch_room_mappings() -> Dict[str, str]: + """Fetch room_id -> slug mappings from city-service""" + async with httpx.AsyncClient(timeout=30.0) as client: + try: + response = await client.get( + f"{CITY_SERVICE_URL}/api/city/rooms", + headers={"X-Internal-API-Key": INTERNAL_API_KEY} + ) + response.raise_for_status() + rooms = response.json() + + mappings = {} + for room in rooms: + matrix_room_id = room.get("matrix_room_id") + slug = room.get("slug") + if matrix_room_id and slug: + mappings[matrix_room_id] = slug + + logger.info(f"Fetched {len(mappings)} room mappings from city-service") + return mappings + except Exception as e: + logger.error(f"Failed to fetch room mappings: {e}") + return {} + + +async def refresh_room_mappings_loop(): + """Periodically refresh room mappings""" + while True: + try: + mappings = await fetch_room_mappings() + if mappings: + state.set_room_mapping(mappings) + + # Join all mapped rooms + for room_id in mappings.keys(): + await join_room(room_id) + # Fetch initial members + members = await get_room_members(room_id) + for user_id in members: + state.add_room_member(room_id, user_id) + except Exception as e: + logger.error(f"Error refreshing room mappings: {e}") + + await asyncio.sleep(ROOM_MAPPING_REFRESH_INTERVAL_S) + + +async def on_presence(user_id: str, status: str): + """Handle presence update from Matrix""" + affected_slugs = state.update_user_presence(user_id, status) + + # Publish updates for affected rooms + for slug in affected_slugs: + room_id = state.slug_to_room_id.get(slug) + if room_id: + room = state.get_room_presence(room_id) + if room and state.should_publish(room_id, ROOM_PRESENCE_THROTTLE_MS): + await publisher.publish_room_presence(room) + + +async def on_typing(room_id: str, typing_user_ids: list): + """Handle typing update from Matrix""" + slug = state.update_room_typing(room_id, typing_user_ids) + + if slug: + room = state.get_room_presence(room_id) + if room and state.should_publish(room_id, ROOM_PRESENCE_THROTTLE_MS): + await publisher.publish_room_presence(room) + + +async def on_room_member(room_id: str, user_id: str, membership: str): + """Handle membership change from Matrix""" + if membership == "join": + state.add_room_member(room_id, user_id) + else: + state.remove_room_member(room_id, user_id) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Startup and shutdown events""" + global sync_client + + # Startup + logger.info("Starting Matrix Presence Aggregator") + + # Connect to NATS + await publisher.connect() + + # Initial room mapping fetch + mappings = await fetch_room_mappings() + if mappings: + state.set_room_mapping(mappings) + # Join all rooms and get initial members + for room_id in mappings.keys(): + await join_room(room_id) + members = await get_room_members(room_id) + for user_id in members: + state.add_room_member(room_id, user_id) + + # Start sync client + sync_client = MatrixSyncClient( + on_presence=on_presence, + on_typing=on_typing, + on_room_member=on_room_member + ) + + # Start background tasks + asyncio.create_task(sync_client.start()) + asyncio.create_task(refresh_room_mappings_loop()) + + logger.info("Matrix Presence Aggregator started successfully") + + yield + + # Shutdown + logger.info("Shutting down Matrix Presence Aggregator") + if sync_client: + await sync_client.stop() + await publisher.disconnect() + + +app = FastAPI( + title="Matrix Presence Aggregator", + description="Aggregates Matrix presence events for DAARION city", + version="1.0.0", + lifespan=lifespan +) + + +@app.get("/health") +async def health(): + """Health check endpoint""" + return { + "status": "healthy", + "service": "matrix-presence-aggregator", + "nats_connected": publisher.is_connected, + "rooms_tracked": len(state.rooms), + "users_tracked": len(state.users) + } + + +@app.get("/status") +async def status(): + """Detailed status endpoint""" + rooms = [] + for room in state.get_all_room_presences(): + rooms.append({ + "slug": room.city_room_slug, + "room_id": room.room_id, + "online_count": room.online_count, + "typing_count": len(room.typing_user_ids) + }) + + return { + "nats_connected": publisher.is_connected, + "sync_running": sync_client.is_running if sync_client else False, + "rooms": rooms, + "total_users_tracked": len(state.users) + } + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=7026) + diff --git a/services/matrix-presence-aggregator/matrix_sync.py b/services/matrix-presence-aggregator/matrix_sync.py new file mode 100644 index 00000000..b6309f32 --- /dev/null +++ b/services/matrix-presence-aggregator/matrix_sync.py @@ -0,0 +1,174 @@ +"""Matrix sync loop for presence aggregation""" +import asyncio +import logging +import httpx +from typing import Optional, Dict, Any, Callable, Awaitable + +from config import ( + MATRIX_HS_URL, + MATRIX_ACCESS_TOKEN, + MATRIX_USER_ID, + SYNC_TIMEOUT_MS +) + +logger = logging.getLogger(__name__) + + +class MatrixSyncClient: + """Client for Matrix /sync endpoint to get presence and typing events""" + + def __init__( + self, + on_presence: Callable[[str, str], Awaitable[None]], + on_typing: Callable[[str, list], Awaitable[None]], + on_room_member: Callable[[str, str, str], Awaitable[None]], + ): + self.base_url = MATRIX_HS_URL + self.access_token = MATRIX_ACCESS_TOKEN + self.user_id = MATRIX_USER_ID + self.since_token: Optional[str] = None + self.is_running = False + + # Callbacks + self.on_presence = on_presence # (user_id, status) + self.on_typing = on_typing # (room_id, typing_user_ids) + self.on_room_member = on_room_member # (room_id, user_id, membership) + + # Sync filter + self.filter = { + "presence": { + "types": ["m.presence"] + }, + "room": { + "timeline": {"limit": 0}, + "state": { + "types": ["m.room.member"], + "lazy_load_members": True + }, + "ephemeral": { + "types": ["m.typing"] + } + } + } + + async def start(self): + """Start the sync loop""" + self.is_running = True + logger.info(f"Starting Matrix sync loop as {self.user_id}") + + async with httpx.AsyncClient(timeout=60.0) as client: + while self.is_running: + try: + await self._sync_once(client) + except httpx.TimeoutException: + logger.debug("Sync timeout (normal for long-polling)") + except httpx.HTTPStatusError as e: + logger.error(f"HTTP error during sync: {e.response.status_code}") + await asyncio.sleep(5) + except Exception as e: + logger.error(f"Sync error: {e}") + await asyncio.sleep(5) + + async def stop(self): + """Stop the sync loop""" + self.is_running = False + logger.info("Stopping Matrix sync loop") + + async def _sync_once(self, client: httpx.AsyncClient): + """Perform one sync request""" + import json + + params = { + "timeout": str(SYNC_TIMEOUT_MS), + "filter": json.dumps(self.filter) + } + + if self.since_token: + params["since"] = self.since_token + + response = await client.get( + f"{self.base_url}/_matrix/client/v3/sync", + params=params, + headers={"Authorization": f"Bearer {self.access_token}"} + ) + response.raise_for_status() + data = response.json() + + # Update since token + self.since_token = data.get("next_batch") + + # Process presence events + await self._process_presence(data.get("presence", {}).get("events", [])) + + # Process room events + rooms = data.get("rooms", {}) + await self._process_rooms(rooms.get("join", {})) + + async def _process_presence(self, events: list): + """Process m.presence events""" + for event in events: + if event.get("type") != "m.presence": + continue + + user_id = event.get("sender") + content = event.get("content", {}) + status = content.get("presence", "offline") + + if user_id: + logger.debug(f"Presence update: {user_id} -> {status}") + await self.on_presence(user_id, status) + + async def _process_rooms(self, joined_rooms: Dict[str, Any]): + """Process room events (typing, membership)""" + for room_id, room_data in joined_rooms.items(): + # Process ephemeral events (typing) + ephemeral = room_data.get("ephemeral", {}).get("events", []) + for event in ephemeral: + if event.get("type") == "m.typing": + typing_users = event.get("content", {}).get("user_ids", []) + logger.debug(f"Typing in {room_id}: {typing_users}") + await self.on_typing(room_id, typing_users) + + # Process state events (membership) + state = room_data.get("state", {}).get("events", []) + for event in state: + if event.get("type") == "m.room.member": + user_id = event.get("state_key") + membership = event.get("content", {}).get("membership", "leave") + if user_id: + logger.debug(f"Membership: {user_id} in {room_id} -> {membership}") + await self.on_room_member(room_id, user_id, membership) + + +async def get_room_members(room_id: str) -> list: + """Get current members of a room""" + async with httpx.AsyncClient(timeout=30.0) as client: + try: + response = await client.get( + f"{MATRIX_HS_URL}/_matrix/client/v3/rooms/{room_id}/joined_members", + headers={"Authorization": f"Bearer {MATRIX_ACCESS_TOKEN}"} + ) + response.raise_for_status() + data = response.json() + return list(data.get("joined", {}).keys()) + except Exception as e: + logger.error(f"Failed to get room members for {room_id}: {e}") + return [] + + +async def join_room(room_id_or_alias: str) -> Optional[str]: + """Join a room and return the room_id""" + async with httpx.AsyncClient(timeout=30.0) as client: + try: + response = await client.post( + f"{MATRIX_HS_URL}/_matrix/client/v3/join/{room_id_or_alias}", + headers={"Authorization": f"Bearer {MATRIX_ACCESS_TOKEN}"}, + json={} + ) + response.raise_for_status() + data = response.json() + return data.get("room_id") + except Exception as e: + logger.error(f"Failed to join room {room_id_or_alias}: {e}") + return None + diff --git a/services/matrix-presence-aggregator/models.py b/services/matrix-presence-aggregator/models.py new file mode 100644 index 00000000..cf8e8b0b --- /dev/null +++ b/services/matrix-presence-aggregator/models.py @@ -0,0 +1,135 @@ +"""Data models for Presence Aggregator""" +from dataclasses import dataclass, field +from typing import Dict, List, Set, Optional +from datetime import datetime +import time + + +@dataclass +class UserPresence: + user_id: str # "@user:domain" + status: str # "online" | "offline" | "unavailable" + last_active_ts: float = field(default_factory=time.time) + + +@dataclass +class RoomPresence: + room_id: str # "!....:daarion.space" + alias: Optional[str] = None # "#city_energy:daarion.space" + city_room_slug: Optional[str] = None # "energy" + online_count: int = 0 + typing_user_ids: List[str] = field(default_factory=list) + last_event_ts: float = field(default_factory=time.time) + last_published_ts: float = 0 # For throttling + + +class PresenceState: + """In-memory state for presence aggregation""" + + def __init__(self): + self.users: Dict[str, UserPresence] = {} + self.rooms: Dict[str, RoomPresence] = {} + self.room_members: Dict[str, Set[str]] = {} # room_id -> set of user_ids + self.room_id_to_slug: Dict[str, str] = {} # matrix_room_id -> city_room_slug + self.slug_to_room_id: Dict[str, str] = {} # city_room_slug -> matrix_room_id + + def update_user_presence(self, user_id: str, status: str) -> List[str]: + """ + Update user presence and return list of affected room slugs + """ + prev_status = self.users.get(user_id, UserPresence(user_id, "offline")).status + self.users[user_id] = UserPresence(user_id, status) + + # Find rooms where this user is a member + affected_slugs = [] + for room_id, members in self.room_members.items(): + if user_id in members: + slug = self.room_id_to_slug.get(room_id) + if slug: + # Recalculate online count for this room + self._recalculate_room_online_count(room_id) + affected_slugs.append(slug) + + return affected_slugs + + def update_room_typing(self, room_id: str, typing_user_ids: List[str]) -> Optional[str]: + """ + Update typing users for a room and return the slug if changed + """ + if room_id not in self.rooms: + slug = self.room_id_to_slug.get(room_id) + if slug: + self.rooms[room_id] = RoomPresence(room_id, city_room_slug=slug) + else: + return None + + room = self.rooms[room_id] + if room.typing_user_ids != typing_user_ids: + room.typing_user_ids = typing_user_ids + room.last_event_ts = time.time() + return room.city_room_slug + + return None + + def add_room_member(self, room_id: str, user_id: str): + """Add a user to a room's member list""" + if room_id not in self.room_members: + self.room_members[room_id] = set() + self.room_members[room_id].add(user_id) + + def remove_room_member(self, room_id: str, user_id: str): + """Remove a user from a room's member list""" + if room_id in self.room_members: + self.room_members[room_id].discard(user_id) + + def _recalculate_room_online_count(self, room_id: str): + """Recalculate online count for a room based on member presence""" + if room_id not in self.rooms: + slug = self.room_id_to_slug.get(room_id) + if slug: + self.rooms[room_id] = RoomPresence(room_id, city_room_slug=slug) + else: + return + + members = self.room_members.get(room_id, set()) + online_count = 0 + for user_id in members: + user = self.users.get(user_id) + if user and user.status in ("online", "unavailable"): + online_count += 1 + + self.rooms[room_id].online_count = online_count + self.rooms[room_id].last_event_ts = time.time() + + def get_room_presence(self, room_id: str) -> Optional[RoomPresence]: + """Get presence info for a room""" + return self.rooms.get(room_id) + + def get_all_room_presences(self) -> List[RoomPresence]: + """Get presence info for all tracked rooms""" + return list(self.rooms.values()) + + def set_room_mapping(self, mappings: Dict[str, str]): + """Set room_id -> slug mapping""" + self.room_id_to_slug = mappings + self.slug_to_room_id = {v: k for k, v in mappings.items()} + + # Initialize RoomPresence for all mapped rooms + for room_id, slug in mappings.items(): + if room_id not in self.rooms: + self.rooms[room_id] = RoomPresence(room_id, city_room_slug=slug) + else: + self.rooms[room_id].city_room_slug = slug + + def should_publish(self, room_id: str, throttle_ms: int) -> bool: + """Check if we should publish an event (throttling)""" + room = self.rooms.get(room_id) + if not room: + return False + + now = time.time() * 1000 # ms + if now - room.last_published_ts >= throttle_ms: + room.last_published_ts = now + return True + return False + diff --git a/services/matrix-presence-aggregator/nats_publisher.py b/services/matrix-presence-aggregator/nats_publisher.py new file mode 100644 index 00000000..9b9bf9a8 --- /dev/null +++ b/services/matrix-presence-aggregator/nats_publisher.py @@ -0,0 +1,87 @@ +"""NATS publisher for presence events""" +import asyncio +import json +import logging +from typing import Optional +import nats +from nats.aio.client import Client as NATS + +from config import NATS_URL +from models import RoomPresence + +logger = logging.getLogger(__name__) + + +class PresencePublisher: + """Publishes presence events to NATS""" + + def __init__(self): + self.nc: Optional[NATS] = None + self.is_connected = False + + async def connect(self): + """Connect to NATS""" + try: + self.nc = await nats.connect(NATS_URL) + self.is_connected = True + logger.info(f"Connected to NATS at {NATS_URL}") + except Exception as e: + logger.error(f"Failed to connect to NATS: {e}") + self.is_connected = False + + async def disconnect(self): + """Disconnect from NATS""" + if self.nc: + await self.nc.drain() + self.is_connected = False + logger.info("Disconnected from NATS") + + async def publish_room_presence(self, room: RoomPresence): + """Publish room presence event""" + if not self.is_connected or not self.nc: + logger.warning("Not connected to NATS, skipping publish") + return + + if not room.city_room_slug: + logger.debug(f"Room {room.room_id} has no slug, skipping") + return + + subject = f"city.presence.room.{room.city_room_slug}" + payload = { + "type": "room.presence", + "room_slug": room.city_room_slug, + "matrix_room_id": room.room_id, + "matrix_room_alias": room.alias, + "online_count": room.online_count, + "typing_count": len(room.typing_user_ids), + "typing_users": room.typing_user_ids, + "last_event_ts": int(room.last_event_ts * 1000) + } + + try: + await self.nc.publish(subject, json.dumps(payload).encode()) + logger.debug(f"Published to {subject}: online={room.online_count}, typing={len(room.typing_user_ids)}") + except Exception as e: + logger.error(f"Failed to publish to NATS: {e}") + + async def publish_user_presence(self, user_id: str, status: str, last_active_ts: float): + """Publish user presence event""" + if not self.is_connected or not self.nc: + return + + # Extract localpart from @user:domain + localpart = user_id.split(":")[0].lstrip("@") + subject = f"city.presence.user.{localpart}" + payload = { + "type": "user.presence", + "matrix_user_id": user_id, + "status": status, + "last_active_ts": int(last_active_ts * 1000) + } + + try: + await self.nc.publish(subject, json.dumps(payload).encode()) + logger.debug(f"Published user presence: {user_id} -> {status}") + except Exception as e: + logger.error(f"Failed to publish user presence: {e}") + diff --git a/services/matrix-presence-aggregator/requirements.txt b/services/matrix-presence-aggregator/requirements.txt new file mode 100644 index 00000000..9af9b05c --- /dev/null +++ b/services/matrix-presence-aggregator/requirements.txt @@ -0,0 +1,7 @@ +fastapi==0.109.0 +uvicorn==0.27.0 +httpx==0.26.0 +nats-py==2.6.0 +python-dotenv==1.0.0 +pydantic==2.5.3 +