From 5bed515852e2a7b8bc3ebc08ea2b1cbef2a0b8c8 Mon Sep 17 00:00:00 2001 From: Apple Date: Wed, 26 Nov 2025 14:43:46 -0800 Subject: [PATCH] feat: Upgrade Global Presence to SSE architecture - matrix-presence-aggregator v2 with SSE endpoint - Created @presence_daemon Matrix user - SSE proxy in Next.js /api/presence/stream - Updated frontend to use SSE instead of WebSocket - Real-time city online count and room presence --- apps/web/src/app/api/presence/stream/route.ts | 64 ++++++ apps/web/src/app/city/page.tsx | 19 +- apps/web/src/hooks/useGlobalPresence.ts | 22 +- apps/web/src/lib/global-presence.ts | 150 +++++++------ .../matrix-presence-aggregator/Dockerfile | 9 +- .../app/__init__.py | 2 + .../app/aggregator.py | 154 +++++++++++++ .../matrix-presence-aggregator/app/config.py | 36 ++++ .../matrix-presence-aggregator/app/main.py | 159 ++++++++++++++ .../app/matrix_client.py | 94 ++++++++ .../matrix-presence-aggregator/app/models.py | 24 +++ .../app/rooms_source.py | 69 ++++++ services/matrix-presence-aggregator/config.py | 25 --- services/matrix-presence-aggregator/main.py | 202 ------------------ .../matrix-presence-aggregator/matrix_sync.py | 174 --------------- services/matrix-presence-aggregator/models.py | 135 ------------ .../nats_publisher.py | 87 -------- .../requirements.txt | 13 +- 18 files changed, 709 insertions(+), 729 deletions(-) create mode 100644 apps/web/src/app/api/presence/stream/route.ts create mode 100644 services/matrix-presence-aggregator/app/__init__.py create mode 100644 services/matrix-presence-aggregator/app/aggregator.py create mode 100644 services/matrix-presence-aggregator/app/config.py create mode 100644 services/matrix-presence-aggregator/app/main.py create mode 100644 services/matrix-presence-aggregator/app/matrix_client.py create mode 100644 services/matrix-presence-aggregator/app/models.py create mode 100644 services/matrix-presence-aggregator/app/rooms_source.py delete mode 100644 services/matrix-presence-aggregator/config.py delete mode 100644 services/matrix-presence-aggregator/main.py delete mode 100644 services/matrix-presence-aggregator/matrix_sync.py delete mode 100644 services/matrix-presence-aggregator/models.py delete mode 100644 services/matrix-presence-aggregator/nats_publisher.py diff --git a/apps/web/src/app/api/presence/stream/route.ts b/apps/web/src/app/api/presence/stream/route.ts new file mode 100644 index 00000000..514d39c2 --- /dev/null +++ b/apps/web/src/app/api/presence/stream/route.ts @@ -0,0 +1,64 @@ +import { NextRequest } from "next/server"; + +export const runtime = "nodejs"; +export const dynamic = "force-dynamic"; + +const PRESENCE_AGGREGATOR_URL = process.env.PRESENCE_AGGREGATOR_URL || "http://localhost:8085/presence/stream"; + +export async function GET(req: NextRequest) { + try { + const upstream = await fetch(PRESENCE_AGGREGATOR_URL, { + headers: { + accept: "text/event-stream", + }, + }); + + if (!upstream.ok) { + return new Response( + JSON.stringify({ error: "Failed to connect to presence aggregator" }), + { status: 502, headers: { "Content-Type": "application/json" } } + ); + } + + const readable = new ReadableStream({ + start(controller) { + const reader = upstream.body!.getReader(); + + function push() { + reader.read().then(({ done, value }) => { + if (done) { + controller.close(); + return; + } + controller.enqueue(value); + push(); + }).catch((err) => { + console.error("SSE proxy error:", err); + controller.close(); + }); + } + + push(); + }, + cancel() { + upstream.body?.cancel(); + }, + }); + + return new Response(readable, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + }); + } catch (error) { + console.error("SSE proxy connection error:", error); + return new Response( + JSON.stringify({ error: "Presence aggregator unavailable" }), + { status: 503, headers: { "Content-Type": "application/json" } } + ); + } +} + diff --git a/apps/web/src/app/city/page.tsx b/apps/web/src/app/city/page.tsx index a52bf0fa..e0f64cc1 100644 --- a/apps/web/src/app/city/page.tsx +++ b/apps/web/src/app/city/page.tsx @@ -10,7 +10,7 @@ import { useGlobalPresence } from '@/hooks/useGlobalPresence' export default function CityPage() { const [rooms, setRooms] = useState([]) const [loading, setLoading] = useState(true) - const presence = useGlobalPresence() + const { cityOnline, roomsPresence } = useGlobalPresence() useEffect(() => { async function fetchRooms() { @@ -26,11 +26,12 @@ export default function CityPage() { fetchRooms() }, []) - // Calculate total online from presence or fallback to API data - const totalOnline = Object.values(presence).reduce((sum, p) => sum + p.online_count, 0) || - rooms.reduce((sum, r) => sum + r.members_online, 0) + // Use SSE presence data if available, otherwise fallback to API data + const totalOnline = cityOnline > 0 + ? cityOnline + : rooms.reduce((sum, r) => sum + r.members_online, 0) - const activeRooms = Object.values(presence).filter(p => p.online_count > 0).length || + const activeRooms = Object.values(roomsPresence).filter(p => p.online > 0).length || rooms.filter(r => r.members_online > 0).length if (loading) { @@ -84,7 +85,7 @@ export default function CityPage() { ))} @@ -124,13 +125,13 @@ export default function CityPage() { interface RoomCardProps { room: CityRoom - livePresence?: { online_count: number; typing_count: number } + livePresence?: { online: number; typing: number } } function RoomCard({ room, livePresence }: RoomCardProps) { // Use live presence if available, otherwise fallback to API data - const onlineCount = livePresence?.online_count ?? room.members_online - const typingCount = livePresence?.typing_count ?? 0 + const onlineCount = livePresence?.online ?? room.members_online + const typingCount = livePresence?.typing ?? 0 const isActive = onlineCount > 0 return ( diff --git a/apps/web/src/hooks/useGlobalPresence.ts b/apps/web/src/hooks/useGlobalPresence.ts index 2fb4c84a..6450ef89 100644 --- a/apps/web/src/hooks/useGlobalPresence.ts +++ b/apps/web/src/hooks/useGlobalPresence.ts @@ -4,27 +4,29 @@ import { useState, useEffect } from 'react' import { globalPresenceClient, RoomPresence } from '@/lib/global-presence' /** - * Hook for subscribing to global room presence updates + * Hook for subscribing to global room presence updates via SSE */ -export function useGlobalPresence(): Record { - const [presence, setPresence] = useState>({}) +export function useGlobalPresence() { + const [cityOnline, setCityOnline] = useState(0) + const [roomsPresence, setRoomsPresence] = useState>({}) useEffect(() => { - const unsubscribe = globalPresenceClient.subscribe((newPresence) => { - setPresence(newPresence) + const unsubscribe = globalPresenceClient.subscribe((newCityOnline, newRoomsPresence) => { + setCityOnline(newCityOnline) + setRoomsPresence(newRoomsPresence) }) return unsubscribe }, []) - return presence + return { cityOnline, roomsPresence } } /** - * Hook for getting presence of a specific room + * Hook for getting presence of a specific room by ID */ -export function useRoomPresence(slug: string): RoomPresence | null { - const allPresence = useGlobalPresence() - return allPresence[slug] || null +export function useRoomPresence(roomId: string): RoomPresence | null { + const { roomsPresence } = useGlobalPresence() + return roomsPresence[roomId] || null } diff --git a/apps/web/src/lib/global-presence.ts b/apps/web/src/lib/global-presence.ts index 7e3266f5..bd99403f 100644 --- a/apps/web/src/lib/global-presence.ts +++ b/apps/web/src/lib/global-presence.ts @@ -1,84 +1,96 @@ /** - * Global Presence WebSocket Client + * Global Presence SSE Client * - * Connects to /ws/city/global-presence for real-time room presence updates + * Connects to /api/presence/stream for real-time room presence updates via SSE */ export interface RoomPresence { - room_slug: string; - online_count: number; - typing_count: number; + room_id: string; + matrix_room_id?: string; + online: number; + typing: number; } -export type PresenceCallback = (presence: Record) => void; +export interface CityPresence { + online_total: number; + rooms_online: number; +} + +export interface PresenceEvent { + type: "presence_update"; + timestamp: string; + city: CityPresence; + rooms: RoomPresence[]; +} + +export type PresenceCallback = ( + cityOnline: number, + roomsPresence: Record +) => void; class GlobalPresenceClient { - private ws: WebSocket | null = null; - private presence: Record = {}; + private eventSource: EventSource | null = null; + private cityOnline: number = 0; + private roomsPresence: Record = {}; private listeners: Set = new Set(); private reconnectTimeout: NodeJS.Timeout | null = null; - private pingInterval: NodeJS.Timeout | null = null; private isConnecting = false; connect(): void { - if (this.ws?.readyState === WebSocket.OPEN || this.isConnecting) { + if (this.eventSource || this.isConnecting) { return; } + if (typeof window === 'undefined') { + return; // SSR - don't connect + } + this.isConnecting = true; - // Determine WebSocket URL - const protocol = typeof window !== 'undefined' && window.location.protocol === 'https:' ? 'wss:' : 'ws:'; - const host = typeof window !== 'undefined' ? window.location.host : 'localhost:7001'; - const wsUrl = `${protocol}//${host}/ws/city/global-presence`; - - console.log('[GlobalPresence] Connecting to', wsUrl); + const sseUrl = "/api/presence/stream"; + console.log('[GlobalPresence] Connecting to SSE:', sseUrl); try { - this.ws = new WebSocket(wsUrl); + this.eventSource = new EventSource(sseUrl); - this.ws.onopen = () => { - console.log('[GlobalPresence] Connected'); + this.eventSource.onopen = () => { + console.log('[GlobalPresence] SSE Connected'); this.isConnecting = false; - this.startPing(); }; - this.ws.onmessage = (event) => { + this.eventSource.onmessage = (event) => { try { - const data = JSON.parse(event.data); + const data: PresenceEvent = JSON.parse(event.data); this.handleMessage(data); } catch (e) { - console.error('[GlobalPresence] Failed to parse message:', e); + // Ignore keep-alive comments + if (!event.data.startsWith(':')) { + console.error('[GlobalPresence] Failed to parse message:', e); + } } }; - this.ws.onclose = () => { - console.log('[GlobalPresence] Disconnected'); + this.eventSource.onerror = (error) => { + console.error('[GlobalPresence] SSE error:', error); this.isConnecting = false; - this.stopPing(); + this.disconnect(); this.scheduleReconnect(); }; - - this.ws.onerror = (error) => { - console.error('[GlobalPresence] WebSocket error:', error); - this.isConnecting = false; - }; } catch (e) { - console.error('[GlobalPresence] Failed to create WebSocket:', e); + console.error('[GlobalPresence] Failed to create EventSource:', e); this.isConnecting = false; this.scheduleReconnect(); } } disconnect(): void { - this.stopPing(); if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } - if (this.ws) { - this.ws.close(); - this.ws = null; + if (this.eventSource) { + this.eventSource.close(); + this.eventSource = null; } } @@ -86,8 +98,8 @@ class GlobalPresenceClient { this.listeners.add(callback); // Send current state immediately - if (Object.keys(this.presence).length > 0) { - callback(this.presence); + if (this.cityOnline > 0 || Object.keys(this.roomsPresence).length > 0) { + callback(this.cityOnline, this.roomsPresence); } // Connect if not connected @@ -103,62 +115,44 @@ class GlobalPresenceClient { }; } - getPresence(slug: string): RoomPresence | null { - return this.presence[slug] || null; + getCityOnline(): number { + return this.cityOnline; } - getAllPresence(): Record { - return { ...this.presence }; + getRoomPresence(roomId: string): RoomPresence | null { + return this.roomsPresence[roomId] || null; } - private handleMessage(data: any): void { - if (data.type === 'snapshot') { - // Initial snapshot - this.presence = {}; - for (const room of data.rooms || []) { - this.presence[room.room_slug] = { - room_slug: room.room_slug, - online_count: room.online_count || 0, - typing_count: room.typing_count || 0 - }; - } - this.notifyListeners(); - } else if (data.type === 'room.presence') { - // Incremental update - this.presence[data.room_slug] = { - room_slug: data.room_slug, - online_count: data.online_count || 0, - typing_count: data.typing_count || 0 - }; - this.notifyListeners(); + getAllRoomsPresence(): Record { + return { ...this.roomsPresence }; + } + + private handleMessage(data: PresenceEvent): void { + if (data.type !== 'presence_update') return; + + // Update city stats + this.cityOnline = data.city?.online_total || 0; + + // Update rooms + const newRoomsPresence: Record = {}; + for (const room of data.rooms || []) { + newRoomsPresence[room.room_id] = room; } + this.roomsPresence = newRoomsPresence; + + this.notifyListeners(); } private notifyListeners(): void { for (const callback of this.listeners) { try { - callback(this.presence); + callback(this.cityOnline, this.roomsPresence); } catch (e) { console.error('[GlobalPresence] Listener error:', e); } } } - private startPing(): void { - this.pingInterval = setInterval(() => { - if (this.ws?.readyState === WebSocket.OPEN) { - this.ws.send('ping'); - } - }, 30000); - } - - private stopPing(): void { - if (this.pingInterval) { - clearInterval(this.pingInterval); - this.pingInterval = null; - } - } - private scheduleReconnect(): void { if (this.reconnectTimeout) return; diff --git a/services/matrix-presence-aggregator/Dockerfile b/services/matrix-presence-aggregator/Dockerfile index 86c8f8c0..fc173de5 100644 --- a/services/matrix-presence-aggregator/Dockerfile +++ b/services/matrix-presence-aggregator/Dockerfile @@ -2,13 +2,16 @@ FROM python:3.11-slim WORKDIR /app +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + # Install dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -# Copy source -COPY . . +# Copy application +COPY app ./app # Run the service -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7026"] +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8085"] diff --git a/services/matrix-presence-aggregator/app/__init__.py b/services/matrix-presence-aggregator/app/__init__.py new file mode 100644 index 00000000..9a467c9c --- /dev/null +++ b/services/matrix-presence-aggregator/app/__init__.py @@ -0,0 +1,2 @@ +# Matrix Presence Aggregator + diff --git a/services/matrix-presence-aggregator/app/aggregator.py b/services/matrix-presence-aggregator/app/aggregator.py new file mode 100644 index 00000000..1c929c1c --- /dev/null +++ b/services/matrix-presence-aggregator/app/aggregator.py @@ -0,0 +1,154 @@ +"""Presence aggregation logic with caching and broadcasting""" +import asyncio +from datetime import datetime, timezone +from typing import List, Optional +import logging + +from .models import PresenceSnapshot, RoomPresence, CityPresence +from .matrix_client import MatrixClient +from .rooms_source import RoomsSource + +logger = logging.getLogger(__name__) + + +class PresenceAggregator: + """ + Aggregates presence data from Matrix and broadcasts to subscribers. + + - Periodically polls Matrix for room members and presence + - Caches the latest snapshot + - Broadcasts updates to SSE subscribers + """ + + def __init__( + self, + matrix_client: MatrixClient, + rooms_source: RoomsSource, + poll_interval_seconds: int = 5, + ): + self.matrix_client = matrix_client + self.rooms_source = rooms_source + self.poll_interval_seconds = poll_interval_seconds + + self._snapshot: Optional[PresenceSnapshot] = None + self._subscribers: List[asyncio.Queue] = [] + self._running = False + + def get_snapshot(self) -> Optional[PresenceSnapshot]: + """Get the latest cached snapshot""" + return self._snapshot + + def register_subscriber(self) -> asyncio.Queue: + """Register a new SSE subscriber""" + q: asyncio.Queue = asyncio.Queue() + self._subscribers.append(q) + logger.info(f"Subscriber registered. Total: {len(self._subscribers)}") + return q + + def unregister_subscriber(self, q: asyncio.Queue): + """Unregister an SSE subscriber""" + if q in self._subscribers: + self._subscribers.remove(q) + logger.info(f"Subscriber unregistered. Total: {len(self._subscribers)}") + + async def _broadcast(self, snapshot: PresenceSnapshot): + """Broadcast snapshot to all subscribers""" + for q in list(self._subscribers): + try: + # Don't block if queue is full + if q.qsize() < 10: + await q.put(snapshot) + except asyncio.CancelledError: + pass + + async def _compute_snapshot(self) -> PresenceSnapshot: + """Compute a new presence snapshot from Matrix""" + rooms = self.rooms_source.get_rooms() + + if not rooms: + logger.warning("No rooms with matrix_room_id found") + + room_presences: List[RoomPresence] = [] + city_online_total = 0 + rooms_online = 0 + + for r in rooms: + matrix_room_id = r["matrix_room_id"] + + try: + # Get room members + members = await self.matrix_client.get_room_members(matrix_room_id) + + # Get presence for each member + online_count = 0 + for member in members: + user_id = member.get("user_id") + if not user_id: + continue + + presence = await self.matrix_client.get_presence(user_id) + if presence in ("online", "unavailable"): + online_count += 1 + + # Get typing (currently returns empty, needs sync loop) + typing_users = await self.matrix_client.get_room_typing(matrix_room_id) + typing_count = len(typing_users) + + if online_count > 0: + rooms_online += 1 + + city_online_total += online_count + + room_presences.append( + RoomPresence( + room_id=r["room_id"], + matrix_room_id=matrix_room_id, + online=online_count, + typing=typing_count, + ) + ) + + except Exception as e: + logger.error(f"Error processing room {r['room_id']}: {e}") + # Add room with 0 online + room_presences.append( + RoomPresence( + room_id=r["room_id"], + matrix_room_id=matrix_room_id, + online=0, + typing=0, + ) + ) + + snapshot = PresenceSnapshot( + timestamp=datetime.now(timezone.utc), + city=CityPresence( + online_total=city_online_total, + rooms_online=rooms_online, + ), + rooms=room_presences, + ) + + logger.info(f"Computed snapshot: {city_online_total} online in {rooms_online} rooms") + return snapshot + + async def run_forever(self): + """Main loop - continuously compute and broadcast snapshots""" + self._running = True + logger.info(f"Starting presence aggregator (poll interval: {self.poll_interval_seconds}s)") + + while self._running: + try: + snapshot = await self._compute_snapshot() + self._snapshot = snapshot + await self._broadcast(snapshot) + except Exception as e: + logger.error(f"Error in aggregator loop: {e}") + + await asyncio.sleep(self.poll_interval_seconds) + + def stop(self): + """Stop the aggregator loop""" + self._running = False + logger.info("Stopping presence aggregator") + diff --git a/services/matrix-presence-aggregator/app/config.py b/services/matrix-presence-aggregator/app/config.py new file mode 100644 index 00000000..57a11a53 --- /dev/null +++ b/services/matrix-presence-aggregator/app/config.py @@ -0,0 +1,36 @@ +"""Configuration for Matrix Presence Aggregator""" +from pydantic import BaseModel +import os + + +class Settings(BaseModel): + matrix_base_url: str + matrix_access_token: str + matrix_homeserver_domain: str = "daarion.space" + poll_interval_seconds: int = 5 + + rooms_source: str = "database" # "database" | "static" + db_dsn: str | None = None + rooms_config_path: str | None = None + + http_host: str = "0.0.0.0" + http_port: int = 8085 + + # Filter out presence daemon from member lists + presence_daemon_user: str = "@presence_daemon:daarion.space" + + +def load_settings() -> Settings: + return Settings( + matrix_base_url=os.getenv("MATRIX_BASE_URL", "https://app.daarion.space"), + matrix_access_token=os.getenv("MATRIX_ACCESS_TOKEN", ""), + matrix_homeserver_domain=os.getenv("MATRIX_HOMESERVER_DOMAIN", "daarion.space"), + poll_interval_seconds=int(os.getenv("POLL_INTERVAL_SECONDS", "5")), + rooms_source=os.getenv("ROOMS_SOURCE", "database"), + db_dsn=os.getenv("DB_DSN"), + rooms_config_path=os.getenv("ROOMS_CONFIG"), + http_host=os.getenv("PRESENCE_HTTP_HOST", "0.0.0.0"), + http_port=int(os.getenv("PRESENCE_HTTP_PORT", "8085")), + presence_daemon_user=os.getenv("PRESENCE_DAEMON_USER", "@presence_daemon:daarion.space"), + ) + diff --git a/services/matrix-presence-aggregator/app/main.py b/services/matrix-presence-aggregator/app/main.py new file mode 100644 index 00000000..f0859413 --- /dev/null +++ b/services/matrix-presence-aggregator/app/main.py @@ -0,0 +1,159 @@ +""" +Matrix Presence Aggregator - FastAPI Application + +Provides REST and SSE endpoints for real-time presence data. +""" +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse, StreamingResponse +from fastapi.middleware.cors import CORSMiddleware +import asyncio +import uvicorn +import logging + +from .config import load_settings +from .matrix_client import MatrixClient +from .rooms_source import RoomsSource, StaticRoomsSource +from .aggregator import PresenceAggregator + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +settings = load_settings() + +app = FastAPI( + title="Matrix Presence Aggregator", + description="Real-time presence aggregation for DAARION City", + version="2.0.0" +) + +# CORS +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Initialize components +matrix_client = MatrixClient( + base_url=settings.matrix_base_url, + access_token=settings.matrix_access_token, + daemon_user=settings.presence_daemon_user, +) + +# Choose rooms source +if settings.rooms_source == "database" and settings.db_dsn: + rooms_source = RoomsSource(db_dsn=settings.db_dsn) + logger.info(f"Using database rooms source: {settings.db_dsn[:30]}...") +elif settings.rooms_source == "static" and settings.rooms_config_path: + rooms_source = StaticRoomsSource(config_path=settings.rooms_config_path) + logger.info(f"Using static rooms source: {settings.rooms_config_path}") +else: + # Fallback to database with default DSN + rooms_source = RoomsSource(db_dsn=settings.db_dsn or "postgresql://postgres:postgres@localhost:5432/postgres") + logger.warning("No rooms source configured, using default database") + +aggregator = PresenceAggregator( + matrix_client=matrix_client, + rooms_source=rooms_source, + poll_interval_seconds=settings.poll_interval_seconds, +) + + +@app.on_event("startup") +async def startup_event(): + logger.info("Starting Matrix Presence Aggregator...") + asyncio.create_task(aggregator.run_forever()) + logger.info("Aggregator task started") + + +@app.on_event("shutdown") +async def shutdown_event(): + logger.info("Shutting down...") + aggregator.stop() + await matrix_client.close() + + +@app.get("/health") +async def health(): + """Health check endpoint""" + snapshot = aggregator.get_snapshot() + return { + "status": "healthy", + "service": "matrix-presence-aggregator", + "has_snapshot": snapshot is not None, + "subscribers": len(aggregator._subscribers), + } + + +@app.get("/presence/summary") +async def get_presence_summary(): + """ + Get current presence snapshot. + + Returns aggregated presence data for all rooms. + """ + snapshot = aggregator.get_snapshot() + if snapshot is None: + return JSONResponse( + content={"status": "initializing", "message": "Waiting for first poll"}, + status_code=503, + ) + return snapshot.model_dump() + + +@app.get("/presence/stream") +async def presence_stream(request: Request): + """ + SSE stream of presence updates. + + Clients receive real-time updates whenever presence changes. + """ + async def event_generator(): + q = aggregator.register_subscriber() + + # Send initial snapshot immediately + initial = aggregator.get_snapshot() + if initial is not None: + yield f"data: {initial.model_dump_json()}\n\n" + + try: + while True: + if await request.is_disconnected(): + break + + try: + snapshot = await asyncio.wait_for(q.get(), timeout=15.0) + yield f"data: {snapshot.model_dump_json()}\n\n" + except asyncio.TimeoutError: + # Keep connection alive + yield ": keep-alive\n\n" + continue + + finally: + aggregator.unregister_subscriber(q) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable nginx buffering + } + ) + + +if __name__ == "__main__": + uvicorn.run( + "app.main:app", + host=settings.http_host, + port=settings.http_port, + reload=True, + ) + diff --git a/services/matrix-presence-aggregator/app/matrix_client.py b/services/matrix-presence-aggregator/app/matrix_client.py new file mode 100644 index 00000000..4afc99d3 --- /dev/null +++ b/services/matrix-presence-aggregator/app/matrix_client.py @@ -0,0 +1,94 @@ +"""Matrix API client for presence aggregation""" +import httpx +from typing import List, Optional +import logging + +logger = logging.getLogger(__name__) + + +class MatrixClient: + """Simplified Matrix client for reading members, presence, and typing""" + + def __init__(self, base_url: str, access_token: str, daemon_user: str = ""): + self.base_url = base_url.rstrip("/") + self.access_token = access_token + self.daemon_user = daemon_user # Filter this user from lists + self._client = httpx.AsyncClient( + base_url=self.base_url, + headers={"Authorization": f"Bearer {self.access_token}"}, + timeout=30.0 + ) + + async def get_room_members(self, room_id: str) -> List[dict]: + """Get all members of a room""" + try: + # Use joined_members for efficiency + res = await self._client.get( + f"/_matrix/client/v3/rooms/{room_id}/joined_members" + ) + res.raise_for_status() + data = res.json() + + # joined_members returns: {"joined": {"@user:domain": {...}}} + joined = data.get("joined", {}) + members = [] + for user_id, info in joined.items(): + # Filter out presence daemon + if user_id == self.daemon_user: + continue + members.append({ + "user_id": user_id, + "display_name": info.get("display_name"), + "avatar_url": info.get("avatar_url"), + }) + return members + except httpx.HTTPError as e: + logger.error(f"Failed to get room members for {room_id}: {e}") + return [] + + async def get_room_typing(self, room_id: str) -> List[str]: + """Get list of currently typing users in a room""" + # Note: Matrix doesn't have a direct API for this + # Typing info comes from /sync, which we'd need to run continuously + # For now, return empty - we'll get typing from sync loop later + return [] + + async def get_presence(self, user_id: str) -> str: + """Get presence status for a user""" + try: + res = await self._client.get( + f"/_matrix/client/v3/presence/{user_id}/status" + ) + if res.status_code != 200: + return "offline" + data = res.json() + return data.get("presence", "offline") + except httpx.HTTPError: + return "offline" + + async def get_presence_batch(self, user_ids: List[str]) -> dict: + """Get presence for multiple users (with caching)""" + # For efficiency, we could batch these or use sync + # For now, simple sequential calls with error handling + result = {} + for user_id in user_ids: + result[user_id] = await self.get_presence(user_id) + return result + + async def join_room(self, room_id_or_alias: str) -> Optional[str]: + """Join a room and return the room_id""" + try: + res = await self._client.post( + f"/_matrix/client/v3/join/{room_id_or_alias}", + json={} + ) + res.raise_for_status() + data = res.json() + return data.get("room_id") + except httpx.HTTPError as e: + logger.error(f"Failed to join room {room_id_or_alias}: {e}") + return None + + async def close(self): + await self._client.aclose() + diff --git a/services/matrix-presence-aggregator/app/models.py b/services/matrix-presence-aggregator/app/models.py new file mode 100644 index 00000000..b54129b7 --- /dev/null +++ b/services/matrix-presence-aggregator/app/models.py @@ -0,0 +1,24 @@ +"""Data models for Presence Aggregator""" +from pydantic import BaseModel +from typing import List +from datetime import datetime + + +class RoomPresence(BaseModel): + room_id: str # internal room id from DB + matrix_room_id: str # Matrix room ID (!xxx:domain) + online: int + typing: int + + +class CityPresence(BaseModel): + online_total: int + rooms_online: int + + +class PresenceSnapshot(BaseModel): + type: str = "presence_update" + timestamp: datetime + city: CityPresence + rooms: List[RoomPresence] + diff --git a/services/matrix-presence-aggregator/app/rooms_source.py b/services/matrix-presence-aggregator/app/rooms_source.py new file mode 100644 index 00000000..1eedf6a8 --- /dev/null +++ b/services/matrix-presence-aggregator/app/rooms_source.py @@ -0,0 +1,69 @@ +"""Room source - reads rooms from database or static config""" +from sqlalchemy import create_engine, text +from typing import List, Dict +import logging +import yaml + +logger = logging.getLogger(__name__) + + +class RoomsSource: + """Reads room list from PostgreSQL database""" + + def __init__(self, db_dsn: str): + self.engine = create_engine(db_dsn) + + def get_rooms(self) -> List[Dict]: + """ + Get all rooms with matrix_room_id set. + + Expected table structure: + - id (text) + - slug (text) + - name (text) + - matrix_room_id (text, nullable) + """ + query = text( + """ + SELECT id, slug, name, matrix_room_id + FROM city_rooms + WHERE matrix_room_id IS NOT NULL + """ + ) + try: + with self.engine.connect() as conn: + rows = conn.execute(query).mappings().all() + + return [ + { + "room_id": str(r["id"]), + "slug": r["slug"], + "title": r["name"], + "matrix_room_id": r["matrix_room_id"], + } + for r in rows + ] + except Exception as e: + logger.error(f"Failed to get rooms from database: {e}") + return [] + + +class StaticRoomsSource: + """Reads room list from YAML config file""" + + def __init__(self, config_path: str): + self.config_path = config_path + self._rooms = self._load_config() + + def _load_config(self) -> List[Dict]: + try: + with open(self.config_path, 'r') as f: + data = yaml.safe_load(f) + return data.get('rooms', []) + except Exception as e: + logger.error(f"Failed to load rooms config: {e}") + return [] + + def get_rooms(self) -> List[Dict]: + return self._rooms + diff --git a/services/matrix-presence-aggregator/config.py b/services/matrix-presence-aggregator/config.py deleted file mode 100644 index 3dccf692..00000000 --- a/services/matrix-presence-aggregator/config.py +++ /dev/null @@ -1,25 +0,0 @@ -"""Configuration for Matrix Presence Aggregator""" -import os -from dotenv import load_dotenv - -load_dotenv() - -# Matrix settings -MATRIX_HS_URL = os.getenv("MATRIX_HS_URL", "https://app.daarion.space") -MATRIX_ACCESS_TOKEN = os.getenv("MATRIX_ACCESS_TOKEN", "") -MATRIX_USER_ID = os.getenv("MATRIX_USER_ID", "@presence_daemon:daarion.space") - -# City Service for room mapping -CITY_SERVICE_URL = os.getenv("CITY_SERVICE_URL", "http://localhost:7001") -INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "super-secret-internal-key") - -# NATS -NATS_URL = os.getenv("NATS_URL", "nats://localhost:4222") - -# Throttling -ROOM_PRESENCE_THROTTLE_MS = int(os.getenv("ROOM_PRESENCE_THROTTLE_MS", "3000")) - -# Sync settings -SYNC_TIMEOUT_MS = int(os.getenv("SYNC_TIMEOUT_MS", "30000")) -ROOM_MAPPING_REFRESH_INTERVAL_S = int(os.getenv("ROOM_MAPPING_REFRESH_INTERVAL_S", "300")) - diff --git a/services/matrix-presence-aggregator/main.py b/services/matrix-presence-aggregator/main.py deleted file mode 100644 index eda5c26b..00000000 --- a/services/matrix-presence-aggregator/main.py +++ /dev/null @@ -1,202 +0,0 @@ -""" -Matrix Presence Aggregator Service - -Aggregates Matrix presence/typing events and publishes to NATS -for real-time city presence in DAARION. -""" -import asyncio -import logging -from contextlib import asynccontextmanager -from typing import Dict - -import httpx -from fastapi import FastAPI - -from config import ( - CITY_SERVICE_URL, - INTERNAL_API_KEY, - ROOM_PRESENCE_THROTTLE_MS, - ROOM_MAPPING_REFRESH_INTERVAL_S -) -from models import PresenceState -from matrix_sync import MatrixSyncClient, get_room_members, join_room -from nats_publisher import PresencePublisher - -# Configure logging -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - -# Global state -state = PresenceState() -publisher = PresencePublisher() -sync_client: MatrixSyncClient = None - - -async def fetch_room_mappings() -> Dict[str, str]: - """Fetch room_id -> slug mappings from city-service""" - async with httpx.AsyncClient(timeout=30.0) as client: - try: - response = await client.get( - f"{CITY_SERVICE_URL}/api/city/rooms", - headers={"X-Internal-API-Key": INTERNAL_API_KEY} - ) - response.raise_for_status() - rooms = response.json() - - mappings = {} - for room in rooms: - matrix_room_id = room.get("matrix_room_id") - slug = room.get("slug") - if matrix_room_id and slug: - mappings[matrix_room_id] = slug - - logger.info(f"Fetched {len(mappings)} room mappings from city-service") - return mappings - except Exception as e: - logger.error(f"Failed to fetch room mappings: {e}") - return {} - - -async def refresh_room_mappings_loop(): - """Periodically refresh room mappings""" - while True: - try: - mappings = await fetch_room_mappings() - if mappings: - state.set_room_mapping(mappings) - - # Join all mapped rooms - for room_id in mappings.keys(): - await join_room(room_id) - # Fetch initial members - members = await get_room_members(room_id) - for user_id in members: - state.add_room_member(room_id, user_id) - except Exception as e: - logger.error(f"Error refreshing room mappings: {e}") - - await asyncio.sleep(ROOM_MAPPING_REFRESH_INTERVAL_S) - - -async def on_presence(user_id: str, status: str): - """Handle presence update from Matrix""" - affected_slugs = state.update_user_presence(user_id, status) - - # Publish updates for affected rooms - for slug in affected_slugs: - room_id = state.slug_to_room_id.get(slug) - if room_id: - room = state.get_room_presence(room_id) - if room and state.should_publish(room_id, ROOM_PRESENCE_THROTTLE_MS): - await publisher.publish_room_presence(room) - - -async def on_typing(room_id: str, typing_user_ids: list): - """Handle typing update from Matrix""" - slug = state.update_room_typing(room_id, typing_user_ids) - - if slug: - room = state.get_room_presence(room_id) - if room and state.should_publish(room_id, ROOM_PRESENCE_THROTTLE_MS): - await publisher.publish_room_presence(room) - - -async def on_room_member(room_id: str, user_id: str, membership: str): - """Handle membership change from Matrix""" - if membership == "join": - state.add_room_member(room_id, user_id) - else: - state.remove_room_member(room_id, user_id) - - -@asynccontextmanager -async def lifespan(app: FastAPI): - """Startup and shutdown events""" - global sync_client - - # Startup - logger.info("Starting Matrix Presence Aggregator") - - # Connect to NATS - await publisher.connect() - - # Initial room mapping fetch - mappings = await fetch_room_mappings() - if mappings: - state.set_room_mapping(mappings) - # Join all rooms and get initial members - for room_id in mappings.keys(): - await join_room(room_id) - members = await get_room_members(room_id) - for user_id in members: - state.add_room_member(room_id, user_id) - - # Start sync client - sync_client = MatrixSyncClient( - on_presence=on_presence, - on_typing=on_typing, - on_room_member=on_room_member - ) - - # Start background tasks - asyncio.create_task(sync_client.start()) - asyncio.create_task(refresh_room_mappings_loop()) - - logger.info("Matrix Presence Aggregator started successfully") - - yield - - # Shutdown - logger.info("Shutting down Matrix Presence Aggregator") - if sync_client: - await sync_client.stop() - await publisher.disconnect() - - -app = FastAPI( - title="Matrix Presence Aggregator", - description="Aggregates Matrix presence events for DAARION city", - version="1.0.0", - lifespan=lifespan -) - - -@app.get("/health") -async def health(): - """Health check endpoint""" - return { - "status": "healthy", - "service": "matrix-presence-aggregator", - "nats_connected": publisher.is_connected, - "rooms_tracked": len(state.rooms), - "users_tracked": len(state.users) - } - - -@app.get("/status") -async def status(): - """Detailed status endpoint""" - rooms = [] - for room in state.get_all_room_presences(): - rooms.append({ - "slug": room.city_room_slug, - "room_id": room.room_id, - "online_count": room.online_count, - "typing_count": len(room.typing_user_ids) - }) - - return { - "nats_connected": publisher.is_connected, - "sync_running": sync_client.is_running if sync_client else False, - "rooms": rooms, - "total_users_tracked": len(state.users) - } - - -if __name__ == "__main__": - import uvicorn - uvicorn.run(app, host="0.0.0.0", port=7026) - diff --git a/services/matrix-presence-aggregator/matrix_sync.py b/services/matrix-presence-aggregator/matrix_sync.py deleted file mode 100644 index b6309f32..00000000 --- a/services/matrix-presence-aggregator/matrix_sync.py +++ /dev/null @@ -1,174 +0,0 @@ -"""Matrix sync loop for presence aggregation""" -import asyncio -import logging -import httpx -from typing import Optional, Dict, Any, Callable, Awaitable - -from config import ( - MATRIX_HS_URL, - MATRIX_ACCESS_TOKEN, - MATRIX_USER_ID, - SYNC_TIMEOUT_MS -) - -logger = logging.getLogger(__name__) - - -class MatrixSyncClient: - """Client for Matrix /sync endpoint to get presence and typing events""" - - def __init__( - self, - on_presence: Callable[[str, str], Awaitable[None]], - on_typing: Callable[[str, list], Awaitable[None]], - on_room_member: Callable[[str, str, str], Awaitable[None]], - ): - self.base_url = MATRIX_HS_URL - self.access_token = MATRIX_ACCESS_TOKEN - self.user_id = MATRIX_USER_ID - self.since_token: Optional[str] = None - self.is_running = False - - # Callbacks - self.on_presence = on_presence # (user_id, status) - self.on_typing = on_typing # (room_id, typing_user_ids) - self.on_room_member = on_room_member # (room_id, user_id, membership) - - # Sync filter - self.filter = { - "presence": { - "types": ["m.presence"] - }, - "room": { - "timeline": {"limit": 0}, - "state": { - "types": ["m.room.member"], - "lazy_load_members": True - }, - "ephemeral": { - "types": ["m.typing"] - } - } - } - - async def start(self): - """Start the sync loop""" - self.is_running = True - logger.info(f"Starting Matrix sync loop as {self.user_id}") - - async with httpx.AsyncClient(timeout=60.0) as client: - while self.is_running: - try: - await self._sync_once(client) - except httpx.TimeoutException: - logger.debug("Sync timeout (normal for long-polling)") - except httpx.HTTPStatusError as e: - logger.error(f"HTTP error during sync: {e.response.status_code}") - await asyncio.sleep(5) - except Exception as e: - logger.error(f"Sync error: {e}") - await asyncio.sleep(5) - - async def stop(self): - """Stop the sync loop""" - self.is_running = False - logger.info("Stopping Matrix sync loop") - - async def _sync_once(self, client: httpx.AsyncClient): - """Perform one sync request""" - import json - - params = { - "timeout": str(SYNC_TIMEOUT_MS), - "filter": json.dumps(self.filter) - } - - if self.since_token: - params["since"] = self.since_token - - response = await client.get( - f"{self.base_url}/_matrix/client/v3/sync", - params=params, - headers={"Authorization": f"Bearer {self.access_token}"} - ) - response.raise_for_status() - data = response.json() - - # Update since token - self.since_token = data.get("next_batch") - - # Process presence events - await self._process_presence(data.get("presence", {}).get("events", [])) - - # Process room events - rooms = data.get("rooms", {}) - await self._process_rooms(rooms.get("join", {})) - - async def _process_presence(self, events: list): - """Process m.presence events""" - for event in events: - if event.get("type") != "m.presence": - continue - - user_id = event.get("sender") - content = event.get("content", {}) - status = content.get("presence", "offline") - - if user_id: - logger.debug(f"Presence update: {user_id} -> {status}") - await self.on_presence(user_id, status) - - async def _process_rooms(self, joined_rooms: Dict[str, Any]): - """Process room events (typing, membership)""" - for room_id, room_data in joined_rooms.items(): - # Process ephemeral events (typing) - ephemeral = room_data.get("ephemeral", {}).get("events", []) - for event in ephemeral: - if event.get("type") == "m.typing": - typing_users = event.get("content", {}).get("user_ids", []) - logger.debug(f"Typing in {room_id}: {typing_users}") - await self.on_typing(room_id, typing_users) - - # Process state events (membership) - state = room_data.get("state", {}).get("events", []) - for event in state: - if event.get("type") == "m.room.member": - user_id = event.get("state_key") - membership = event.get("content", {}).get("membership", "leave") - if user_id: - logger.debug(f"Membership: {user_id} in {room_id} -> {membership}") - await self.on_room_member(room_id, user_id, membership) - - -async def get_room_members(room_id: str) -> list: - """Get current members of a room""" - async with httpx.AsyncClient(timeout=30.0) as client: - try: - response = await client.get( - f"{MATRIX_HS_URL}/_matrix/client/v3/rooms/{room_id}/joined_members", - headers={"Authorization": f"Bearer {MATRIX_ACCESS_TOKEN}"} - ) - response.raise_for_status() - data = response.json() - return list(data.get("joined", {}).keys()) - except Exception as e: - logger.error(f"Failed to get room members for {room_id}: {e}") - return [] - - -async def join_room(room_id_or_alias: str) -> Optional[str]: - """Join a room and return the room_id""" - async with httpx.AsyncClient(timeout=30.0) as client: - try: - response = await client.post( - f"{MATRIX_HS_URL}/_matrix/client/v3/join/{room_id_or_alias}", - headers={"Authorization": f"Bearer {MATRIX_ACCESS_TOKEN}"}, - json={} - ) - response.raise_for_status() - data = response.json() - return data.get("room_id") - except Exception as e: - logger.error(f"Failed to join room {room_id_or_alias}: {e}") - return None - diff --git a/services/matrix-presence-aggregator/models.py b/services/matrix-presence-aggregator/models.py deleted file mode 100644 index cf8e8b0b..00000000 --- a/services/matrix-presence-aggregator/models.py +++ /dev/null @@ -1,135 +0,0 @@ -"""Data models for Presence Aggregator""" -from dataclasses import dataclass, field -from typing import Dict, List, Set, Optional -from datetime import datetime -import time - - -@dataclass -class UserPresence: - user_id: str # "@user:domain" - status: str # "online" | "offline" | "unavailable" - last_active_ts: float = field(default_factory=time.time) - - -@dataclass -class RoomPresence: - room_id: str # "!....:daarion.space" - alias: Optional[str] = None # "#city_energy:daarion.space" - city_room_slug: Optional[str] = None # "energy" - online_count: int = 0 - typing_user_ids: List[str] = field(default_factory=list) - last_event_ts: float = field(default_factory=time.time) - last_published_ts: float = 0 # For throttling - - -class PresenceState: - """In-memory state for presence aggregation""" - - def __init__(self): - self.users: Dict[str, UserPresence] = {} - self.rooms: Dict[str, RoomPresence] = {} - self.room_members: Dict[str, Set[str]] = {} # room_id -> set of user_ids - self.room_id_to_slug: Dict[str, str] = {} # matrix_room_id -> city_room_slug - self.slug_to_room_id: Dict[str, str] = {} # city_room_slug -> matrix_room_id - - def update_user_presence(self, user_id: str, status: str) -> List[str]: - """ - Update user presence and return list of affected room slugs - """ - prev_status = self.users.get(user_id, UserPresence(user_id, "offline")).status - self.users[user_id] = UserPresence(user_id, status) - - # Find rooms where this user is a member - affected_slugs = [] - for room_id, members in self.room_members.items(): - if user_id in members: - slug = self.room_id_to_slug.get(room_id) - if slug: - # Recalculate online count for this room - self._recalculate_room_online_count(room_id) - affected_slugs.append(slug) - - return affected_slugs - - def update_room_typing(self, room_id: str, typing_user_ids: List[str]) -> Optional[str]: - """ - Update typing users for a room and return the slug if changed - """ - if room_id not in self.rooms: - slug = self.room_id_to_slug.get(room_id) - if slug: - self.rooms[room_id] = RoomPresence(room_id, city_room_slug=slug) - else: - return None - - room = self.rooms[room_id] - if room.typing_user_ids != typing_user_ids: - room.typing_user_ids = typing_user_ids - room.last_event_ts = time.time() - return room.city_room_slug - - return None - - def add_room_member(self, room_id: str, user_id: str): - """Add a user to a room's member list""" - if room_id not in self.room_members: - self.room_members[room_id] = set() - self.room_members[room_id].add(user_id) - - def remove_room_member(self, room_id: str, user_id: str): - """Remove a user from a room's member list""" - if room_id in self.room_members: - self.room_members[room_id].discard(user_id) - - def _recalculate_room_online_count(self, room_id: str): - """Recalculate online count for a room based on member presence""" - if room_id not in self.rooms: - slug = self.room_id_to_slug.get(room_id) - if slug: - self.rooms[room_id] = RoomPresence(room_id, city_room_slug=slug) - else: - return - - members = self.room_members.get(room_id, set()) - online_count = 0 - for user_id in members: - user = self.users.get(user_id) - if user and user.status in ("online", "unavailable"): - online_count += 1 - - self.rooms[room_id].online_count = online_count - self.rooms[room_id].last_event_ts = time.time() - - def get_room_presence(self, room_id: str) -> Optional[RoomPresence]: - """Get presence info for a room""" - return self.rooms.get(room_id) - - def get_all_room_presences(self) -> List[RoomPresence]: - """Get presence info for all tracked rooms""" - return list(self.rooms.values()) - - def set_room_mapping(self, mappings: Dict[str, str]): - """Set room_id -> slug mapping""" - self.room_id_to_slug = mappings - self.slug_to_room_id = {v: k for k, v in mappings.items()} - - # Initialize RoomPresence for all mapped rooms - for room_id, slug in mappings.items(): - if room_id not in self.rooms: - self.rooms[room_id] = RoomPresence(room_id, city_room_slug=slug) - else: - self.rooms[room_id].city_room_slug = slug - - def should_publish(self, room_id: str, throttle_ms: int) -> bool: - """Check if we should publish an event (throttling)""" - room = self.rooms.get(room_id) - if not room: - return False - - now = time.time() * 1000 # ms - if now - room.last_published_ts >= throttle_ms: - room.last_published_ts = now - return True - return False - diff --git a/services/matrix-presence-aggregator/nats_publisher.py b/services/matrix-presence-aggregator/nats_publisher.py deleted file mode 100644 index 9b9bf9a8..00000000 --- a/services/matrix-presence-aggregator/nats_publisher.py +++ /dev/null @@ -1,87 +0,0 @@ -"""NATS publisher for presence events""" -import asyncio -import json -import logging -from typing import Optional -import nats -from nats.aio.client import Client as NATS - -from config import NATS_URL -from models import RoomPresence - -logger = logging.getLogger(__name__) - - -class PresencePublisher: - """Publishes presence events to NATS""" - - def __init__(self): - self.nc: Optional[NATS] = None - self.is_connected = False - - async def connect(self): - """Connect to NATS""" - try: - self.nc = await nats.connect(NATS_URL) - self.is_connected = True - logger.info(f"Connected to NATS at {NATS_URL}") - except Exception as e: - logger.error(f"Failed to connect to NATS: {e}") - self.is_connected = False - - async def disconnect(self): - """Disconnect from NATS""" - if self.nc: - await self.nc.drain() - self.is_connected = False - logger.info("Disconnected from NATS") - - async def publish_room_presence(self, room: RoomPresence): - """Publish room presence event""" - if not self.is_connected or not self.nc: - logger.warning("Not connected to NATS, skipping publish") - return - - if not room.city_room_slug: - logger.debug(f"Room {room.room_id} has no slug, skipping") - return - - subject = f"city.presence.room.{room.city_room_slug}" - payload = { - "type": "room.presence", - "room_slug": room.city_room_slug, - "matrix_room_id": room.room_id, - "matrix_room_alias": room.alias, - "online_count": room.online_count, - "typing_count": len(room.typing_user_ids), - "typing_users": room.typing_user_ids, - "last_event_ts": int(room.last_event_ts * 1000) - } - - try: - await self.nc.publish(subject, json.dumps(payload).encode()) - logger.debug(f"Published to {subject}: online={room.online_count}, typing={len(room.typing_user_ids)}") - except Exception as e: - logger.error(f"Failed to publish to NATS: {e}") - - async def publish_user_presence(self, user_id: str, status: str, last_active_ts: float): - """Publish user presence event""" - if not self.is_connected or not self.nc: - return - - # Extract localpart from @user:domain - localpart = user_id.split(":")[0].lstrip("@") - subject = f"city.presence.user.{localpart}" - payload = { - "type": "user.presence", - "matrix_user_id": user_id, - "status": status, - "last_active_ts": int(last_active_ts * 1000) - } - - try: - await self.nc.publish(subject, json.dumps(payload).encode()) - logger.debug(f"Published user presence: {user_id} -> {status}") - except Exception as e: - logger.error(f"Failed to publish user presence: {e}") - diff --git a/services/matrix-presence-aggregator/requirements.txt b/services/matrix-presence-aggregator/requirements.txt index 9af9b05c..35cfd989 100644 --- a/services/matrix-presence-aggregator/requirements.txt +++ b/services/matrix-presence-aggregator/requirements.txt @@ -1,7 +1,8 @@ -fastapi==0.109.0 -uvicorn==0.27.0 -httpx==0.26.0 -nats-py==2.6.0 -python-dotenv==1.0.0 -pydantic==2.5.3 +fastapi==0.115.0 +uvicorn[standard]==0.30.0 +httpx==0.27.0 +pydantic==2.9.0 +psycopg2-binary==2.9.9 +SQLAlchemy==2.0.35 +python-dotenv==1.0.1