Files
microdao-daarion/services/agents-service/ws_events.py
Apple 3de3c8cb36 feat: Add presence heartbeat for Matrix online status
- matrix-gateway: POST /internal/matrix/presence/online endpoint
- usePresenceHeartbeat hook with activity tracking
- Auto away after 5 min inactivity
- Offline on page close/visibility change
- Integrated in MatrixChatRoom component
2025-11-27 00:19:40 -08:00

178 lines
6.3 KiB
Python

"""
WebSocket — Live Agent Events Stream
Phase 6: Real-time event streaming
"""
from fastapi import WebSocket, WebSocketDisconnect, APIRouter
from typing import Set, Dict, Optional
import asyncio
import json
from datetime import datetime
from models import WSAgentEvent
from repository_events import EventRepository
router = APIRouter(tags=["websocket"])
# Global state for WebSocket connections
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, Set[WebSocket]] = {}
self.all_connections: Set[WebSocket] = set()
self.event_queue: asyncio.Queue = asyncio.Queue()
async def connect(self, websocket: WebSocket, agent_id: Optional[str] = None):
"""Accept WebSocket connection"""
await websocket.accept()
if agent_id:
if agent_id not in self.active_connections:
self.active_connections[agent_id] = set()
self.active_connections[agent_id].add(websocket)
else:
# Subscribe to all agents
self.all_connections.add(websocket)
print(f"✅ WS connected: {agent_id or 'ALL'} (total: {self.get_connection_count()})")
def disconnect(self, websocket: WebSocket, agent_id: Optional[str] = None):
"""Remove WebSocket connection"""
if agent_id and agent_id in self.active_connections:
self.active_connections[agent_id].discard(websocket)
if not self.active_connections[agent_id]:
del self.active_connections[agent_id]
else:
self.all_connections.discard(websocket)
print(f"❌ WS disconnected: {agent_id or 'ALL'} (total: {self.get_connection_count()})")
async def broadcast_event(self, agent_id: str, event: WSAgentEvent):
"""Broadcast event to subscribers"""
message = event.json()
# Send to agent-specific subscribers
if agent_id in self.active_connections:
dead_connections = set()
for connection in self.active_connections[agent_id]:
try:
await connection.send_text(message)
except Exception:
dead_connections.add(connection)
# Clean up dead connections
for conn in dead_connections:
self.disconnect(conn, agent_id)
# Send to "all agents" subscribers
dead_connections = set()
for connection in self.all_connections:
try:
await connection.send_text(message)
except Exception:
dead_connections.add(connection)
for conn in dead_connections:
self.disconnect(conn, None)
def get_connection_count(self) -> int:
"""Get total active connections"""
count = len(self.all_connections)
for connections in self.active_connections.values():
count += len(connections)
return count
async def push_event_to_queue(self, agent_id: str, event_kind: str, payload: dict):
"""Push event to queue (called from nats_subscriber or routes)"""
event = WSAgentEvent(
type="agent_event",
agent_id=agent_id,
ts=datetime.utcnow().isoformat(),
kind=event_kind,
payload=payload
)
await self.event_queue.put((agent_id, event))
manager = ConnectionManager()
# ============================================================================
# WebSocket Endpoint
# ============================================================================
@router.websocket("/ws/agents/stream")
async def websocket_agent_events(websocket: WebSocket, agent_id: Optional[str] = None):
"""
WebSocket endpoint for live agent events
Query params:
- agent_id: subscribe to specific agent (optional)
If agent_id is None, subscribe to all agents
"""
await manager.connect(websocket, agent_id)
try:
# Keep connection alive and send events
while True:
# Wait for events from queue
try:
event_agent_id, event = await asyncio.wait_for(
manager.event_queue.get(),
timeout=30.0 # 30-second timeout for ping
)
# If subscribed to specific agent, only send its events
if agent_id is None or event_agent_id == agent_id:
await websocket.send_text(event.json())
except asyncio.TimeoutError:
# Send ping to keep connection alive
await websocket.send_json({"type": "ping", "ts": datetime.utcnow().isoformat()})
except WebSocketDisconnect:
manager.disconnect(websocket, agent_id)
except Exception as e:
print(f"⚠️ WebSocket error: {e}")
manager.disconnect(websocket, agent_id)
# ============================================================================
# Background Task — Event Queue Consumer
# ============================================================================
async def event_queue_consumer():
"""
Background task that consumes events from queue and broadcasts to WS clients
This runs in background alongside the main FastAPI app
"""
print("🚀 Event queue consumer started")
while True:
try:
# Get event from queue (with timeout to prevent blocking)
agent_id, event = await asyncio.wait_for(
manager.event_queue.get(),
timeout=1.0
)
# Broadcast to WebSocket clients
await manager.broadcast_event(agent_id, event)
except asyncio.TimeoutError:
# No events in queue, continue
continue
except Exception as e:
print(f"⚠️ Event queue consumer error: {e}")
await asyncio.sleep(1)
# ============================================================================
# Helper Functions (for use in other modules)
# ============================================================================
async def push_event_to_ws(agent_id: str, event_kind: str, payload: dict = None):
"""
Push event to WebSocket stream
Called from routes_agents or nats_subscriber
"""
await manager.push_event_to_queue(agent_id, event_kind, payload or {})