feat: Add Global Presence Aggregator system

- GLOBAL_PRESENCE_AGGREGATOR_SPEC.md documentation
- matrix-presence-aggregator service (Python/FastAPI)
  - Matrix sync loop for presence/typing
  - NATS publishing for room presence
- city-service: presence_gateway for WS broadcast
- Frontend: real-time online count in room list
  - useGlobalPresence hook
  - Live typing indicators
  - Active room highlighting
This commit is contained in:
Apple
2025-11-26 14:22:34 -08:00
parent a3e632b9e7
commit 78849cc108
13 changed files with 1486 additions and 25 deletions

View File

@@ -1,22 +1,45 @@
'use client'
import { useState, useEffect } from 'react'
import Link from 'next/link'
import { Building2, Users, Star, MessageSquare, ArrowRight } from 'lucide-react'
import { Building2, Users, Star, MessageSquare, ArrowRight, Loader2 } from 'lucide-react'
import { api, CityRoom } from '@/lib/api'
import { cn } from '@/lib/utils'
import { useGlobalPresence } from '@/hooks/useGlobalPresence'
// Force dynamic rendering - don't prerender at build time
export const dynamic = 'force-dynamic'
export default function CityPage() {
const [rooms, setRooms] = useState<CityRoom[]>([])
const [loading, setLoading] = useState(true)
const presence = useGlobalPresence()
async function getCityRooms(): Promise<CityRoom[]> {
try {
return await api.getCityRooms()
} catch (error) {
console.error('Failed to fetch city rooms:', error)
return []
useEffect(() => {
async function fetchRooms() {
try {
const data = await api.getCityRooms()
setRooms(data)
} catch (error) {
console.error('Failed to fetch city rooms:', error)
} finally {
setLoading(false)
}
}
fetchRooms()
}, [])
// Calculate total online from presence or fallback to API data
const totalOnline = Object.values(presence).reduce((sum, p) => sum + p.online_count, 0) ||
rooms.reduce((sum, r) => sum + r.members_online, 0)
const activeRooms = Object.values(presence).filter(p => p.online_count > 0).length ||
rooms.filter(r => r.members_online > 0).length
if (loading) {
return (
<div className="min-h-screen flex items-center justify-center">
<Loader2 className="w-8 h-8 text-cyan-400 animate-spin" />
</div>
)
}
}
export default async function CityPage() {
const rooms = await getCityRooms()
return (
<div className="min-h-screen px-4 py-8">
@@ -32,6 +55,16 @@ export default async function CityPage() {
<p className="text-slate-400">Оберіть кімнату для спілкування</p>
</div>
</div>
{/* Live indicator */}
{totalOnline > 0 && (
<div className="inline-flex items-center gap-2 px-3 py-1.5 bg-emerald-500/10 border border-emerald-500/20 rounded-full">
<span className="w-2 h-2 rounded-full bg-emerald-400 animate-pulse" />
<span className="text-sm text-emerald-400 font-medium">
{totalOnline} у місті зараз
</span>
</div>
)}
</div>
{/* Rooms Grid */}
@@ -48,7 +81,11 @@ export default async function CityPage() {
) : (
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4 sm:gap-6">
{rooms.map((room) => (
<RoomCard key={room.id} room={room} />
<RoomCard
key={room.id}
room={room}
livePresence={presence[room.slug]}
/>
))}
</div>
)}
@@ -63,8 +100,9 @@ export default async function CityPage() {
/>
<StatCard
label="Онлайн"
value={rooms.reduce((sum, r) => sum + r.members_online, 0)}
value={totalOnline}
icon={Users}
highlight={totalOnline > 0}
/>
<StatCard
label="За замовч."
@@ -73,8 +111,9 @@ export default async function CityPage() {
/>
<StatCard
label="Активних"
value={rooms.filter(r => r.members_online > 0).length}
value={activeRooms}
icon={MessageSquare}
highlight={activeRooms > 0}
/>
</div>
)}
@@ -83,13 +122,24 @@ export default async function CityPage() {
)
}
function RoomCard({ room }: { room: CityRoom }) {
const isActive = room.members_online > 0
interface RoomCardProps {
room: CityRoom
livePresence?: { online_count: number; typing_count: number }
}
function RoomCard({ room, livePresence }: RoomCardProps) {
// Use live presence if available, otherwise fallback to API data
const onlineCount = livePresence?.online_count ?? room.members_online
const typingCount = livePresence?.typing_count ?? 0
const isActive = onlineCount > 0
return (
<Link
href={`/city/${room.slug}`}
className="glass-panel-hover p-5 group block"
className={cn(
"glass-panel-hover p-5 group block transition-all",
isActive && "ring-1 ring-emerald-500/30"
)}
>
<div className="flex items-start justify-between mb-3">
<h3 className="text-lg font-semibold text-white group-hover:text-cyan-400 transition-colors">
@@ -118,8 +168,19 @@ function RoomCard({ room }: { room: CityRoom }) {
'w-2 h-2 rounded-full',
isActive ? 'bg-emerald-400 animate-pulse' : 'bg-slate-600'
)} />
{room.members_online} онлайн
{onlineCount} онлайн
</span>
{typingCount > 0 && (
<span className="flex items-center gap-1.5 text-cyan-400">
<span className="flex gap-0.5">
<span className="w-1 h-1 rounded-full bg-cyan-400 animate-bounce" style={{ animationDelay: '0ms' }} />
<span className="w-1 h-1 rounded-full bg-cyan-400 animate-bounce" style={{ animationDelay: '150ms' }} />
<span className="w-1 h-1 rounded-full bg-cyan-400 animate-bounce" style={{ animationDelay: '300ms' }} />
</span>
друкує
</span>
)}
</div>
<ArrowRight className="w-5 h-5 text-slate-500 group-hover:text-cyan-400 group-hover:translate-x-1 transition-all" />
@@ -131,16 +192,29 @@ function RoomCard({ room }: { room: CityRoom }) {
function StatCard({
label,
value,
icon: Icon
icon: Icon,
highlight = false
}: {
label: string
value: number
icon: React.ComponentType<{ className?: string }>
highlight?: boolean
}) {
return (
<div className="glass-panel p-4 text-center">
<Icon className="w-5 h-5 text-cyan-400 mx-auto mb-2" />
<div className="text-2xl font-bold text-white">{value}</div>
<div className={cn(
"glass-panel p-4 text-center transition-all",
highlight && "ring-1 ring-emerald-500/30"
)}>
<Icon className={cn(
"w-5 h-5 mx-auto mb-2",
highlight ? "text-emerald-400" : "text-cyan-400"
)} />
<div className={cn(
"text-2xl font-bold",
highlight ? "text-emerald-400" : "text-white"
)}>
{value}
</div>
<div className="text-xs text-slate-400">{label}</div>
</div>
)

View File

@@ -0,0 +1,30 @@
'use client'
import { useState, useEffect } from 'react'
import { globalPresenceClient, RoomPresence } from '@/lib/global-presence'
/**
* Hook for subscribing to global room presence updates
*/
export function useGlobalPresence(): Record<string, RoomPresence> {
const [presence, setPresence] = useState<Record<string, RoomPresence>>({})
useEffect(() => {
const unsubscribe = globalPresenceClient.subscribe((newPresence) => {
setPresence(newPresence)
})
return unsubscribe
}, [])
return presence
}
/**
* Hook for getting presence of a specific room
*/
export function useRoomPresence(slug: string): RoomPresence | null {
const allPresence = useGlobalPresence()
return allPresence[slug] || null
}

View File

@@ -0,0 +1,177 @@
/**
* Global Presence WebSocket Client
*
* Connects to /ws/city/global-presence for real-time room presence updates
*/
export interface RoomPresence {
room_slug: string;
online_count: number;
typing_count: number;
}
export type PresenceCallback = (presence: Record<string, RoomPresence>) => void;
class GlobalPresenceClient {
private ws: WebSocket | null = null;
private presence: Record<string, RoomPresence> = {};
private listeners: Set<PresenceCallback> = new Set();
private reconnectTimeout: NodeJS.Timeout | null = null;
private pingInterval: NodeJS.Timeout | null = null;
private isConnecting = false;
connect(): void {
if (this.ws?.readyState === WebSocket.OPEN || this.isConnecting) {
return;
}
this.isConnecting = true;
// Determine WebSocket URL
const protocol = typeof window !== 'undefined' && window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const host = typeof window !== 'undefined' ? window.location.host : 'localhost:7001';
const wsUrl = `${protocol}//${host}/ws/city/global-presence`;
console.log('[GlobalPresence] Connecting to', wsUrl);
try {
this.ws = new WebSocket(wsUrl);
this.ws.onopen = () => {
console.log('[GlobalPresence] Connected');
this.isConnecting = false;
this.startPing();
};
this.ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
this.handleMessage(data);
} catch (e) {
console.error('[GlobalPresence] Failed to parse message:', e);
}
};
this.ws.onclose = () => {
console.log('[GlobalPresence] Disconnected');
this.isConnecting = false;
this.stopPing();
this.scheduleReconnect();
};
this.ws.onerror = (error) => {
console.error('[GlobalPresence] WebSocket error:', error);
this.isConnecting = false;
};
} catch (e) {
console.error('[GlobalPresence] Failed to create WebSocket:', e);
this.isConnecting = false;
this.scheduleReconnect();
}
}
disconnect(): void {
this.stopPing();
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
if (this.ws) {
this.ws.close();
this.ws = null;
}
}
subscribe(callback: PresenceCallback): () => void {
this.listeners.add(callback);
// Send current state immediately
if (Object.keys(this.presence).length > 0) {
callback(this.presence);
}
// Connect if not connected
this.connect();
return () => {
this.listeners.delete(callback);
// Disconnect if no listeners
if (this.listeners.size === 0) {
this.disconnect();
}
};
}
getPresence(slug: string): RoomPresence | null {
return this.presence[slug] || null;
}
getAllPresence(): Record<string, RoomPresence> {
return { ...this.presence };
}
private handleMessage(data: any): void {
if (data.type === 'snapshot') {
// Initial snapshot
this.presence = {};
for (const room of data.rooms || []) {
this.presence[room.room_slug] = {
room_slug: room.room_slug,
online_count: room.online_count || 0,
typing_count: room.typing_count || 0
};
}
this.notifyListeners();
} else if (data.type === 'room.presence') {
// Incremental update
this.presence[data.room_slug] = {
room_slug: data.room_slug,
online_count: data.online_count || 0,
typing_count: data.typing_count || 0
};
this.notifyListeners();
}
}
private notifyListeners(): void {
for (const callback of this.listeners) {
try {
callback(this.presence);
} catch (e) {
console.error('[GlobalPresence] Listener error:', e);
}
}
}
private startPing(): void {
this.pingInterval = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send('ping');
}
}, 30000);
}
private stopPing(): void {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
private scheduleReconnect(): void {
if (this.reconnectTimeout) return;
this.reconnectTimeout = setTimeout(() => {
this.reconnectTimeout = null;
if (this.listeners.size > 0) {
console.log('[GlobalPresence] Reconnecting...');
this.connect();
}
}, 5000);
}
}
// Singleton instance
export const globalPresenceClient = new GlobalPresenceClient();

View File

@@ -0,0 +1,344 @@
# GLOBAL PRESENCE AGGREGATOR — DAARION.city
Version: 1.0.0
Location: docs/realtime/GLOBAL_PRESENCE_AGGREGATOR_SPEC.md
---
## 0. PURPOSE
Зробити **єдиний центр правди про присутність (presence) та активність** у місті:
- збирати Matrix presence/typing/room-activity на сервері,
- агрегувати їх на рівні кімнат (`city_room`),
- публікувати у NATS як події,
- транслювати у фронтенд через WebSocket з `city-service`.
Результат: DAARION має **"живе місто"**:
- список кімнат `/city` показує:
- скільки людей онлайн,
- активність у реальному часі,
- майбутня City Map (2D/2.5D) живиться цими даними.
---
## 1. ARCHITECTURE OVERVIEW
```
┌─────────────────────────────────────────────────────────────────────────┐
│ DAARION PRESENCE SYSTEM │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌──────────────────────┐ ┌─────────────────┐ │
│ │ Matrix │────▶│ matrix-presence- │────▶│ NATS │ │
│ │ Synapse │ │ aggregator │ │ JetStream │ │
│ │ │ │ (sync loop) │ │ │ │
│ └─────────────┘ └──────────────────────┘ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌──────────────────────┐ ┌─────────────────┐ │
│ │ Browser │◀────│ city-service │◀────│ NATS Sub │ │
│ │ (WS) │ │ /ws/city/presence │ │ │ │
│ └─────────────┘ └──────────────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
### Компоненти
1. **matrix-presence-aggregator (новий сервіс)**
- читає Matrix sync (presence, typing, room activity),
- тримає у пам'яті поточний стан присутності,
- публікує агреговані події в NATS.
2. **NATS JetStream**
- канал для presence/events:
- `city.presence.room.*`
- `city.presence.user.*`
3. **city-service (розширення)**
- підписується на NATS події,
- тримає WebSocket з'єднання з фронтендом,
- пушить presence/room-activity у браузер.
4. **web (Next.js UI)**
- сторінка `/city`:
- показує `N online` по кожній кімнаті,
- highlight "active" кімнати.
---
## 2. MATRIX SIDE — ЗВІДКИ БРАТИ ПОДІЇ
### 2.1. Окремий Matrix-юзер для агрегації
Спец-акаунт:
- `@presence_daemon:daarion.space`
- права:
- читати presence/typing у всіх `city_*` кімнатах,
- бути учасником цих кімнат.
### 2.2. Sync-loop на сервері
Сервіс `matrix-presence-aggregator`:
- використовує `/sync` Matrix (як клієнт),
- фільтр:
```json
{
"presence": {
"types": ["m.presence"]
},
"room": {
"timeline": { "limit": 0 },
"state": { "limit": 0 },
"ephemeral": {
"types": ["m.typing", "m.receipt"]
}
}
}
```
- робить long-polling з `since` + `timeout`,
- парсить:
- `presence.events``m.presence`,
- `rooms.join[roomId].ephemeral.events``m.typing`, `m.receipt`.
---
## 3. DATA MODEL (IN-MEMORY AGGREGATOR)
### 3.1. Room presence state
```python
from dataclasses import dataclass
from typing import Dict, List, Set, Optional
from datetime import datetime
@dataclass
class UserPresence:
user_id: str # "@user:domain"
status: str # "online" | "offline" | "unavailable"
last_active_ts: float # timestamp
@dataclass
class RoomPresence:
room_id: str # "!....:daarion.space"
alias: Optional[str] # "#city_energy:daarion.space"
city_room_slug: Optional[str] # "energy"
online_count: int
typing_user_ids: List[str]
last_event_ts: float
class PresenceState:
users: Dict[str, UserPresence]
rooms: Dict[str, RoomPresence]
room_members: Dict[str, Set[str]] # room_id -> set of user_ids
```
### 3.2. Мапінг Room → City Room
`matrix-presence-aggregator` має знати `matrix_room_id``city_room.slug`.
**Pull-mode (MVP):**
- при старті сервісу:
- `GET /internal/city/rooms`
- зчитати всі `matrix_room_id` / `matrix_room_alias` / `slug`,
- зібрати мапу `roomId → slug`.
- періодично (кожні 5 хвилин) оновлювати.
---
## 4. NATS EVENTS
### 4.1. Room-level presence
Subject:
```
city.presence.room.<slug>
```
Event payload:
```json
{
"type": "room.presence",
"room_slug": "energy",
"matrix_room_id": "!gykdLyazhkcSZGHmbG:daarion.space",
"matrix_room_alias": "#city_energy:daarion.space",
"online_count": 5,
"typing_count": 1,
"typing_users": ["@user1:daarion.space"],
"last_event_ts": 1732610000000
}
```
### 4.2. User-level presence (опційний)
Subject:
```
city.presence.user.<localpart>
```
Payload:
```json
{
"type": "user.presence",
"matrix_user_id": "@user1:daarion.space",
"status": "online",
"last_active_ts": 1732610000000
}
```
---
## 5. EVENT GENERATION LOGIC
### 5.1. Обробка m.presence
При кожному `m.presence`:
- оновити `PresenceState.users[userId]`,
- для всіх кімнат, де є цей юзер — перерахувати `onlineCount`,
- якщо `onlineCount` змінився — публікувати нову подію.
### 5.2. Обробка m.typing
При `m.typing`:
- `content.user_ids` → список typing у кімнаті.
- Зберегти в `RoomPresence.typing_user_ids`.
- Згенерувати івент `city.presence.room.<slug>`.
### 5.3. Throttling
- подію публікувати тільки якщо `onlineCount` змінився,
- або не частіше ніж раз на 3 секунди на кімнату.
---
## 6. CITY REALTIME GATEWAY (WEBSOCKET)
### 6.1. WebSocket endpoint
```
GET /ws/city/presence
```
Auth: JWT токен у query param або header.
### 6.2. Формат повідомлень
**Snapshot (при підключенні):**
```json
{
"type": "snapshot",
"rooms": [
{ "room_slug": "general", "online_count": 3, "typing_count": 0 },
{ "room_slug": "welcome", "online_count": 1, "typing_count": 0 }
]
}
```
**Incremental update:**
```json
{
"type": "room.presence",
"room_slug": "energy",
"online_count": 5,
"typing_count": 1
}
```
---
## 7. FRONTEND INTEGRATION
### 7.1. Список кімнат `/city`
State:
```typescript
type RoomPresenceUI = {
onlineCount: number;
typingCount: number;
};
const [presenceBySlug, setPresenceBySlug] = useState<Record<string, RoomPresenceUI>>({});
```
WebSocket handler:
```typescript
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'snapshot') {
const presence: Record<string, RoomPresenceUI> = {};
data.rooms.forEach(r => {
presence[r.room_slug] = {
onlineCount: r.online_count,
typingCount: r.typing_count
};
});
setPresenceBySlug(presence);
}
if (data.type === 'room.presence') {
setPresenceBySlug(prev => ({
...prev,
[data.room_slug]: {
onlineCount: data.online_count,
typingCount: data.typing_count
}
}));
}
};
```
### 7.2. UI
- Room card: `X online`, typing badge
- Active room: glow effect
- Typing animation
---
## 8. CONFIG / ENV
### matrix-presence-aggregator
```env
MATRIX_HS_URL=https://app.daarion.space
MATRIX_ACCESS_TOKEN=<presence_daemon_token>
MATRIX_USER_ID=@presence_daemon:daarion.space
CITY_SERVICE_INTERNAL_URL=http://city-service:7001
NATS_URL=nats://nats:4222
ROOM_PRESENCE_THROTTLE_MS=3000
```
### city-service (realtime gateway)
```env
NATS_URL=nats://nats:4222
JWT_SECRET=<secret>
```
---
## 9. ACCEPTANCE CRITERIA
- [ ] matrix-presence-aggregator запущений і синхронізується з Matrix
- [ ] NATS отримує події `city.presence.room.*`
- [ ] city-service має endpoint `/ws/city/presence`
- [ ] При підключенні WS клієнт отримує snapshot
- [ ] При зміні presence клієнт отримує update
- [ ] UI `/city` показує online count для кожної кімнати
- [ ] Typing indicator відображається
---
## 10. FUTURE ENHANCEMENTS
1. **Agent presence** — окремі статуси для AI-агентів
2. **City Map** — візуалізація presence на 2D карті
3. **Push notifications** — сповіщення про активність
4. **Historical analytics** — статистика активності

View File

@@ -16,6 +16,11 @@ import routes_city
import ws_city
import repo_city
from common.redis_client import get_redis, close_redis
from presence_gateway import (
websocket_global_presence,
start_presence_gateway,
stop_presence_gateway
)
# Logging
logging.basicConfig(level=logging.INFO)
@@ -311,10 +316,16 @@ async def websocket_room_endpoint(websocket: WebSocket, room_id: str):
@app.websocket("/ws/city/presence")
async def websocket_presence_endpoint(websocket: WebSocket):
"""WebSocket для Presence System"""
"""WebSocket для Presence System (user heartbeats)"""
await ws_city.websocket_city_presence(websocket)
@app.websocket("/ws/city/global-presence")
async def websocket_global_presence_endpoint(websocket: WebSocket):
"""WebSocket для Global Room Presence (aggregated from Matrix)"""
await websocket_global_presence(websocket)
@app.on_event("startup")
async def startup_event():
"""Запустити background tasks для WebSocket оновлень"""
@@ -334,6 +345,13 @@ async def startup_event():
asyncio.create_task(agents_presence_generator())
asyncio.create_task(ws_city.presence_cleanup_task())
# Start global presence gateway (NATS subscriber)
try:
await start_presence_gateway()
logger.info("✅ Global presence gateway started")
except Exception as e:
logger.warning(f"⚠️ Global presence gateway failed to start: {e}")
logger.info("✅ WebSocket background tasks started")
@@ -341,6 +359,7 @@ async def startup_event():
async def shutdown_event():
"""Cleanup при зупинці"""
logger.info("🛑 City Service shutting down...")
await stop_presence_gateway()
await repo_city.close_pool()
await close_redis()

View File

@@ -0,0 +1,173 @@
"""
Global Presence Gateway for City Service
Subscribes to NATS presence events from matrix-presence-aggregator
and broadcasts to WebSocket clients.
"""
import asyncio
import json
import logging
from typing import Dict, Set, Optional
import os
from fastapi import WebSocket, WebSocketDisconnect
logger = logging.getLogger(__name__)
# NATS URL
NATS_URL = os.getenv("NATS_URL", "nats://localhost:4222")
class GlobalPresenceManager:
"""Manages WebSocket connections for global room presence"""
def __init__(self):
self.connections: Set[WebSocket] = set()
self.room_presence: Dict[str, dict] = {} # slug -> {online_count, typing_count}
self.nc = None # NATS connection
self.is_running = False
async def connect(self, websocket: WebSocket):
"""Add a new WebSocket client"""
await websocket.accept()
self.connections.add(websocket)
# Send initial snapshot
await self._send_snapshot(websocket)
logger.info(f"Global presence client connected. Total: {len(self.connections)}")
def disconnect(self, websocket: WebSocket):
"""Remove a WebSocket client"""
self.connections.discard(websocket)
logger.info(f"Global presence client disconnected. Total: {len(self.connections)}")
async def _send_snapshot(self, websocket: WebSocket):
"""Send current presence snapshot to a client"""
rooms = [
{
"room_slug": slug,
"online_count": data.get("online_count", 0),
"typing_count": data.get("typing_count", 0)
}
for slug, data in self.room_presence.items()
]
await websocket.send_json({
"type": "snapshot",
"rooms": rooms
})
async def broadcast(self, message: dict):
"""Broadcast a message to all connected clients"""
if not self.connections:
return
disconnected = set()
for websocket in self.connections:
try:
await websocket.send_json(message)
except Exception as e:
logger.error(f"Failed to send to websocket: {e}")
disconnected.add(websocket)
# Remove disconnected clients
for ws in disconnected:
self.connections.discard(ws)
def update_room_presence(self, slug: str, online_count: int, typing_count: int):
"""Update cached presence for a room"""
self.room_presence[slug] = {
"online_count": online_count,
"typing_count": typing_count
}
async def start_nats_subscriber(self):
"""Start NATS subscription for presence events"""
try:
import nats
self.nc = await nats.connect(NATS_URL)
self.is_running = True
logger.info(f"Connected to NATS at {NATS_URL} for presence events")
# Subscribe to room presence events
await self.nc.subscribe("city.presence.room.*", cb=self._on_room_presence)
logger.info("Subscribed to city.presence.room.*")
except ImportError:
logger.warning("nats-py not installed, NATS presence disabled")
except Exception as e:
logger.error(f"Failed to connect to NATS: {e}")
async def _on_room_presence(self, msg):
"""Handle room presence event from NATS"""
try:
data = json.loads(msg.data.decode())
slug = data.get("room_slug")
online_count = data.get("online_count", 0)
typing_count = data.get("typing_count", 0)
if slug:
# Update cache
self.update_room_presence(slug, online_count, typing_count)
# Broadcast to WebSocket clients
await self.broadcast({
"type": "room.presence",
"room_slug": slug,
"online_count": online_count,
"typing_count": typing_count
})
logger.debug(f"Room presence update: {slug} -> {online_count} online, {typing_count} typing")
except Exception as e:
logger.error(f"Error processing NATS presence event: {e}")
async def stop(self):
"""Stop NATS subscription"""
self.is_running = False
if self.nc:
await self.nc.drain()
logger.info("NATS connection closed")
# Global instance
global_presence_manager = GlobalPresenceManager()
async def websocket_global_presence(websocket: WebSocket):
"""
WebSocket endpoint for global room presence
/ws/city/global-presence
Sends:
- Initial snapshot of all room presence
- Real-time updates when presence changes
"""
await global_presence_manager.connect(websocket)
try:
while True:
# Keep connection alive, handle pings
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
global_presence_manager.disconnect(websocket)
async def start_presence_gateway():
"""Start the global presence gateway (call on startup)"""
await global_presence_manager.start_nats_subscriber()
async def stop_presence_gateway():
"""Stop the global presence gateway (call on shutdown)"""
await global_presence_manager.stop()

View File

@@ -0,0 +1,14 @@
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy source
COPY . .
# Run the service
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7026"]

View File

@@ -0,0 +1,25 @@
"""Configuration for Matrix Presence Aggregator"""
import os
from dotenv import load_dotenv
load_dotenv()
# Matrix settings
MATRIX_HS_URL = os.getenv("MATRIX_HS_URL", "https://app.daarion.space")
MATRIX_ACCESS_TOKEN = os.getenv("MATRIX_ACCESS_TOKEN", "")
MATRIX_USER_ID = os.getenv("MATRIX_USER_ID", "@presence_daemon:daarion.space")
# City Service for room mapping
CITY_SERVICE_URL = os.getenv("CITY_SERVICE_URL", "http://localhost:7001")
INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "super-secret-internal-key")
# NATS
NATS_URL = os.getenv("NATS_URL", "nats://localhost:4222")
# Throttling
ROOM_PRESENCE_THROTTLE_MS = int(os.getenv("ROOM_PRESENCE_THROTTLE_MS", "3000"))
# Sync settings
SYNC_TIMEOUT_MS = int(os.getenv("SYNC_TIMEOUT_MS", "30000"))
ROOM_MAPPING_REFRESH_INTERVAL_S = int(os.getenv("ROOM_MAPPING_REFRESH_INTERVAL_S", "300"))

View File

@@ -0,0 +1,202 @@
"""
Matrix Presence Aggregator Service
Aggregates Matrix presence/typing events and publishes to NATS
for real-time city presence in DAARION.
"""
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Dict
import httpx
from fastapi import FastAPI
from config import (
CITY_SERVICE_URL,
INTERNAL_API_KEY,
ROOM_PRESENCE_THROTTLE_MS,
ROOM_MAPPING_REFRESH_INTERVAL_S
)
from models import PresenceState
from matrix_sync import MatrixSyncClient, get_room_members, join_room
from nats_publisher import PresencePublisher
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Global state
state = PresenceState()
publisher = PresencePublisher()
sync_client: MatrixSyncClient = None
async def fetch_room_mappings() -> Dict[str, str]:
"""Fetch room_id -> slug mappings from city-service"""
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.get(
f"{CITY_SERVICE_URL}/api/city/rooms",
headers={"X-Internal-API-Key": INTERNAL_API_KEY}
)
response.raise_for_status()
rooms = response.json()
mappings = {}
for room in rooms:
matrix_room_id = room.get("matrix_room_id")
slug = room.get("slug")
if matrix_room_id and slug:
mappings[matrix_room_id] = slug
logger.info(f"Fetched {len(mappings)} room mappings from city-service")
return mappings
except Exception as e:
logger.error(f"Failed to fetch room mappings: {e}")
return {}
async def refresh_room_mappings_loop():
"""Periodically refresh room mappings"""
while True:
try:
mappings = await fetch_room_mappings()
if mappings:
state.set_room_mapping(mappings)
# Join all mapped rooms
for room_id in mappings.keys():
await join_room(room_id)
# Fetch initial members
members = await get_room_members(room_id)
for user_id in members:
state.add_room_member(room_id, user_id)
except Exception as e:
logger.error(f"Error refreshing room mappings: {e}")
await asyncio.sleep(ROOM_MAPPING_REFRESH_INTERVAL_S)
async def on_presence(user_id: str, status: str):
"""Handle presence update from Matrix"""
affected_slugs = state.update_user_presence(user_id, status)
# Publish updates for affected rooms
for slug in affected_slugs:
room_id = state.slug_to_room_id.get(slug)
if room_id:
room = state.get_room_presence(room_id)
if room and state.should_publish(room_id, ROOM_PRESENCE_THROTTLE_MS):
await publisher.publish_room_presence(room)
async def on_typing(room_id: str, typing_user_ids: list):
"""Handle typing update from Matrix"""
slug = state.update_room_typing(room_id, typing_user_ids)
if slug:
room = state.get_room_presence(room_id)
if room and state.should_publish(room_id, ROOM_PRESENCE_THROTTLE_MS):
await publisher.publish_room_presence(room)
async def on_room_member(room_id: str, user_id: str, membership: str):
"""Handle membership change from Matrix"""
if membership == "join":
state.add_room_member(room_id, user_id)
else:
state.remove_room_member(room_id, user_id)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown events"""
global sync_client
# Startup
logger.info("Starting Matrix Presence Aggregator")
# Connect to NATS
await publisher.connect()
# Initial room mapping fetch
mappings = await fetch_room_mappings()
if mappings:
state.set_room_mapping(mappings)
# Join all rooms and get initial members
for room_id in mappings.keys():
await join_room(room_id)
members = await get_room_members(room_id)
for user_id in members:
state.add_room_member(room_id, user_id)
# Start sync client
sync_client = MatrixSyncClient(
on_presence=on_presence,
on_typing=on_typing,
on_room_member=on_room_member
)
# Start background tasks
asyncio.create_task(sync_client.start())
asyncio.create_task(refresh_room_mappings_loop())
logger.info("Matrix Presence Aggregator started successfully")
yield
# Shutdown
logger.info("Shutting down Matrix Presence Aggregator")
if sync_client:
await sync_client.stop()
await publisher.disconnect()
app = FastAPI(
title="Matrix Presence Aggregator",
description="Aggregates Matrix presence events for DAARION city",
version="1.0.0",
lifespan=lifespan
)
@app.get("/health")
async def health():
"""Health check endpoint"""
return {
"status": "healthy",
"service": "matrix-presence-aggregator",
"nats_connected": publisher.is_connected,
"rooms_tracked": len(state.rooms),
"users_tracked": len(state.users)
}
@app.get("/status")
async def status():
"""Detailed status endpoint"""
rooms = []
for room in state.get_all_room_presences():
rooms.append({
"slug": room.city_room_slug,
"room_id": room.room_id,
"online_count": room.online_count,
"typing_count": len(room.typing_user_ids)
})
return {
"nats_connected": publisher.is_connected,
"sync_running": sync_client.is_running if sync_client else False,
"rooms": rooms,
"total_users_tracked": len(state.users)
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7026)

View File

@@ -0,0 +1,174 @@
"""Matrix sync loop for presence aggregation"""
import asyncio
import logging
import httpx
from typing import Optional, Dict, Any, Callable, Awaitable
from config import (
MATRIX_HS_URL,
MATRIX_ACCESS_TOKEN,
MATRIX_USER_ID,
SYNC_TIMEOUT_MS
)
logger = logging.getLogger(__name__)
class MatrixSyncClient:
"""Client for Matrix /sync endpoint to get presence and typing events"""
def __init__(
self,
on_presence: Callable[[str, str], Awaitable[None]],
on_typing: Callable[[str, list], Awaitable[None]],
on_room_member: Callable[[str, str, str], Awaitable[None]],
):
self.base_url = MATRIX_HS_URL
self.access_token = MATRIX_ACCESS_TOKEN
self.user_id = MATRIX_USER_ID
self.since_token: Optional[str] = None
self.is_running = False
# Callbacks
self.on_presence = on_presence # (user_id, status)
self.on_typing = on_typing # (room_id, typing_user_ids)
self.on_room_member = on_room_member # (room_id, user_id, membership)
# Sync filter
self.filter = {
"presence": {
"types": ["m.presence"]
},
"room": {
"timeline": {"limit": 0},
"state": {
"types": ["m.room.member"],
"lazy_load_members": True
},
"ephemeral": {
"types": ["m.typing"]
}
}
}
async def start(self):
"""Start the sync loop"""
self.is_running = True
logger.info(f"Starting Matrix sync loop as {self.user_id}")
async with httpx.AsyncClient(timeout=60.0) as client:
while self.is_running:
try:
await self._sync_once(client)
except httpx.TimeoutException:
logger.debug("Sync timeout (normal for long-polling)")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error during sync: {e.response.status_code}")
await asyncio.sleep(5)
except Exception as e:
logger.error(f"Sync error: {e}")
await asyncio.sleep(5)
async def stop(self):
"""Stop the sync loop"""
self.is_running = False
logger.info("Stopping Matrix sync loop")
async def _sync_once(self, client: httpx.AsyncClient):
"""Perform one sync request"""
import json
params = {
"timeout": str(SYNC_TIMEOUT_MS),
"filter": json.dumps(self.filter)
}
if self.since_token:
params["since"] = self.since_token
response = await client.get(
f"{self.base_url}/_matrix/client/v3/sync",
params=params,
headers={"Authorization": f"Bearer {self.access_token}"}
)
response.raise_for_status()
data = response.json()
# Update since token
self.since_token = data.get("next_batch")
# Process presence events
await self._process_presence(data.get("presence", {}).get("events", []))
# Process room events
rooms = data.get("rooms", {})
await self._process_rooms(rooms.get("join", {}))
async def _process_presence(self, events: list):
"""Process m.presence events"""
for event in events:
if event.get("type") != "m.presence":
continue
user_id = event.get("sender")
content = event.get("content", {})
status = content.get("presence", "offline")
if user_id:
logger.debug(f"Presence update: {user_id} -> {status}")
await self.on_presence(user_id, status)
async def _process_rooms(self, joined_rooms: Dict[str, Any]):
"""Process room events (typing, membership)"""
for room_id, room_data in joined_rooms.items():
# Process ephemeral events (typing)
ephemeral = room_data.get("ephemeral", {}).get("events", [])
for event in ephemeral:
if event.get("type") == "m.typing":
typing_users = event.get("content", {}).get("user_ids", [])
logger.debug(f"Typing in {room_id}: {typing_users}")
await self.on_typing(room_id, typing_users)
# Process state events (membership)
state = room_data.get("state", {}).get("events", [])
for event in state:
if event.get("type") == "m.room.member":
user_id = event.get("state_key")
membership = event.get("content", {}).get("membership", "leave")
if user_id:
logger.debug(f"Membership: {user_id} in {room_id} -> {membership}")
await self.on_room_member(room_id, user_id, membership)
async def get_room_members(room_id: str) -> list:
"""Get current members of a room"""
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.get(
f"{MATRIX_HS_URL}/_matrix/client/v3/rooms/{room_id}/joined_members",
headers={"Authorization": f"Bearer {MATRIX_ACCESS_TOKEN}"}
)
response.raise_for_status()
data = response.json()
return list(data.get("joined", {}).keys())
except Exception as e:
logger.error(f"Failed to get room members for {room_id}: {e}")
return []
async def join_room(room_id_or_alias: str) -> Optional[str]:
"""Join a room and return the room_id"""
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.post(
f"{MATRIX_HS_URL}/_matrix/client/v3/join/{room_id_or_alias}",
headers={"Authorization": f"Bearer {MATRIX_ACCESS_TOKEN}"},
json={}
)
response.raise_for_status()
data = response.json()
return data.get("room_id")
except Exception as e:
logger.error(f"Failed to join room {room_id_or_alias}: {e}")
return None

View File

@@ -0,0 +1,135 @@
"""Data models for Presence Aggregator"""
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional
from datetime import datetime
import time
@dataclass
class UserPresence:
user_id: str # "@user:domain"
status: str # "online" | "offline" | "unavailable"
last_active_ts: float = field(default_factory=time.time)
@dataclass
class RoomPresence:
room_id: str # "!....:daarion.space"
alias: Optional[str] = None # "#city_energy:daarion.space"
city_room_slug: Optional[str] = None # "energy"
online_count: int = 0
typing_user_ids: List[str] = field(default_factory=list)
last_event_ts: float = field(default_factory=time.time)
last_published_ts: float = 0 # For throttling
class PresenceState:
"""In-memory state for presence aggregation"""
def __init__(self):
self.users: Dict[str, UserPresence] = {}
self.rooms: Dict[str, RoomPresence] = {}
self.room_members: Dict[str, Set[str]] = {} # room_id -> set of user_ids
self.room_id_to_slug: Dict[str, str] = {} # matrix_room_id -> city_room_slug
self.slug_to_room_id: Dict[str, str] = {} # city_room_slug -> matrix_room_id
def update_user_presence(self, user_id: str, status: str) -> List[str]:
"""
Update user presence and return list of affected room slugs
"""
prev_status = self.users.get(user_id, UserPresence(user_id, "offline")).status
self.users[user_id] = UserPresence(user_id, status)
# Find rooms where this user is a member
affected_slugs = []
for room_id, members in self.room_members.items():
if user_id in members:
slug = self.room_id_to_slug.get(room_id)
if slug:
# Recalculate online count for this room
self._recalculate_room_online_count(room_id)
affected_slugs.append(slug)
return affected_slugs
def update_room_typing(self, room_id: str, typing_user_ids: List[str]) -> Optional[str]:
"""
Update typing users for a room and return the slug if changed
"""
if room_id not in self.rooms:
slug = self.room_id_to_slug.get(room_id)
if slug:
self.rooms[room_id] = RoomPresence(room_id, city_room_slug=slug)
else:
return None
room = self.rooms[room_id]
if room.typing_user_ids != typing_user_ids:
room.typing_user_ids = typing_user_ids
room.last_event_ts = time.time()
return room.city_room_slug
return None
def add_room_member(self, room_id: str, user_id: str):
"""Add a user to a room's member list"""
if room_id not in self.room_members:
self.room_members[room_id] = set()
self.room_members[room_id].add(user_id)
def remove_room_member(self, room_id: str, user_id: str):
"""Remove a user from a room's member list"""
if room_id in self.room_members:
self.room_members[room_id].discard(user_id)
def _recalculate_room_online_count(self, room_id: str):
"""Recalculate online count for a room based on member presence"""
if room_id not in self.rooms:
slug = self.room_id_to_slug.get(room_id)
if slug:
self.rooms[room_id] = RoomPresence(room_id, city_room_slug=slug)
else:
return
members = self.room_members.get(room_id, set())
online_count = 0
for user_id in members:
user = self.users.get(user_id)
if user and user.status in ("online", "unavailable"):
online_count += 1
self.rooms[room_id].online_count = online_count
self.rooms[room_id].last_event_ts = time.time()
def get_room_presence(self, room_id: str) -> Optional[RoomPresence]:
"""Get presence info for a room"""
return self.rooms.get(room_id)
def get_all_room_presences(self) -> List[RoomPresence]:
"""Get presence info for all tracked rooms"""
return list(self.rooms.values())
def set_room_mapping(self, mappings: Dict[str, str]):
"""Set room_id -> slug mapping"""
self.room_id_to_slug = mappings
self.slug_to_room_id = {v: k for k, v in mappings.items()}
# Initialize RoomPresence for all mapped rooms
for room_id, slug in mappings.items():
if room_id not in self.rooms:
self.rooms[room_id] = RoomPresence(room_id, city_room_slug=slug)
else:
self.rooms[room_id].city_room_slug = slug
def should_publish(self, room_id: str, throttle_ms: int) -> bool:
"""Check if we should publish an event (throttling)"""
room = self.rooms.get(room_id)
if not room:
return False
now = time.time() * 1000 # ms
if now - room.last_published_ts >= throttle_ms:
room.last_published_ts = now
return True
return False

View File

@@ -0,0 +1,87 @@
"""NATS publisher for presence events"""
import asyncio
import json
import logging
from typing import Optional
import nats
from nats.aio.client import Client as NATS
from config import NATS_URL
from models import RoomPresence
logger = logging.getLogger(__name__)
class PresencePublisher:
"""Publishes presence events to NATS"""
def __init__(self):
self.nc: Optional[NATS] = None
self.is_connected = False
async def connect(self):
"""Connect to NATS"""
try:
self.nc = await nats.connect(NATS_URL)
self.is_connected = True
logger.info(f"Connected to NATS at {NATS_URL}")
except Exception as e:
logger.error(f"Failed to connect to NATS: {e}")
self.is_connected = False
async def disconnect(self):
"""Disconnect from NATS"""
if self.nc:
await self.nc.drain()
self.is_connected = False
logger.info("Disconnected from NATS")
async def publish_room_presence(self, room: RoomPresence):
"""Publish room presence event"""
if not self.is_connected or not self.nc:
logger.warning("Not connected to NATS, skipping publish")
return
if not room.city_room_slug:
logger.debug(f"Room {room.room_id} has no slug, skipping")
return
subject = f"city.presence.room.{room.city_room_slug}"
payload = {
"type": "room.presence",
"room_slug": room.city_room_slug,
"matrix_room_id": room.room_id,
"matrix_room_alias": room.alias,
"online_count": room.online_count,
"typing_count": len(room.typing_user_ids),
"typing_users": room.typing_user_ids,
"last_event_ts": int(room.last_event_ts * 1000)
}
try:
await self.nc.publish(subject, json.dumps(payload).encode())
logger.debug(f"Published to {subject}: online={room.online_count}, typing={len(room.typing_user_ids)}")
except Exception as e:
logger.error(f"Failed to publish to NATS: {e}")
async def publish_user_presence(self, user_id: str, status: str, last_active_ts: float):
"""Publish user presence event"""
if not self.is_connected or not self.nc:
return
# Extract localpart from @user:domain
localpart = user_id.split(":")[0].lstrip("@")
subject = f"city.presence.user.{localpart}"
payload = {
"type": "user.presence",
"matrix_user_id": user_id,
"status": status,
"last_active_ts": int(last_active_ts * 1000)
}
try:
await self.nc.publish(subject, json.dumps(payload).encode())
logger.debug(f"Published user presence: {user_id} -> {status}")
except Exception as e:
logger.error(f"Failed to publish user presence: {e}")

View File

@@ -0,0 +1,7 @@
fastapi==0.109.0
uvicorn==0.27.0
httpx==0.26.0
nats-py==2.6.0
python-dotenv==1.0.0
pydantic==2.5.3