- matrix-gateway: POST /internal/matrix/presence/online endpoint - usePresenceHeartbeat hook with activity tracking - Auto away after 5 min inactivity - Offline on page close/visibility change - Integrated in MatrixChatRoom component
178 lines
6.3 KiB
Python
178 lines
6.3 KiB
Python
"""
|
|
WebSocket — Live Agent Events Stream
|
|
Phase 6: Real-time event streaming
|
|
"""
|
|
from fastapi import WebSocket, WebSocketDisconnect, APIRouter
|
|
from typing import Set, Dict, Optional
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime
|
|
|
|
from models import WSAgentEvent
|
|
from repository_events import EventRepository
|
|
|
|
router = APIRouter(tags=["websocket"])
|
|
|
|
# Global state for WebSocket connections
|
|
class ConnectionManager:
|
|
def __init__(self):
|
|
self.active_connections: Dict[str, Set[WebSocket]] = {}
|
|
self.all_connections: Set[WebSocket] = set()
|
|
self.event_queue: asyncio.Queue = asyncio.Queue()
|
|
|
|
async def connect(self, websocket: WebSocket, agent_id: Optional[str] = None):
|
|
"""Accept WebSocket connection"""
|
|
await websocket.accept()
|
|
|
|
if agent_id:
|
|
if agent_id not in self.active_connections:
|
|
self.active_connections[agent_id] = set()
|
|
self.active_connections[agent_id].add(websocket)
|
|
else:
|
|
# Subscribe to all agents
|
|
self.all_connections.add(websocket)
|
|
|
|
print(f"✅ WS connected: {agent_id or 'ALL'} (total: {self.get_connection_count()})")
|
|
|
|
def disconnect(self, websocket: WebSocket, agent_id: Optional[str] = None):
|
|
"""Remove WebSocket connection"""
|
|
if agent_id and agent_id in self.active_connections:
|
|
self.active_connections[agent_id].discard(websocket)
|
|
if not self.active_connections[agent_id]:
|
|
del self.active_connections[agent_id]
|
|
else:
|
|
self.all_connections.discard(websocket)
|
|
|
|
print(f"❌ WS disconnected: {agent_id or 'ALL'} (total: {self.get_connection_count()})")
|
|
|
|
async def broadcast_event(self, agent_id: str, event: WSAgentEvent):
|
|
"""Broadcast event to subscribers"""
|
|
message = event.json()
|
|
|
|
# Send to agent-specific subscribers
|
|
if agent_id in self.active_connections:
|
|
dead_connections = set()
|
|
for connection in self.active_connections[agent_id]:
|
|
try:
|
|
await connection.send_text(message)
|
|
except Exception:
|
|
dead_connections.add(connection)
|
|
|
|
# Clean up dead connections
|
|
for conn in dead_connections:
|
|
self.disconnect(conn, agent_id)
|
|
|
|
# Send to "all agents" subscribers
|
|
dead_connections = set()
|
|
for connection in self.all_connections:
|
|
try:
|
|
await connection.send_text(message)
|
|
except Exception:
|
|
dead_connections.add(connection)
|
|
|
|
for conn in dead_connections:
|
|
self.disconnect(conn, None)
|
|
|
|
def get_connection_count(self) -> int:
|
|
"""Get total active connections"""
|
|
count = len(self.all_connections)
|
|
for connections in self.active_connections.values():
|
|
count += len(connections)
|
|
return count
|
|
|
|
async def push_event_to_queue(self, agent_id: str, event_kind: str, payload: dict):
|
|
"""Push event to queue (called from nats_subscriber or routes)"""
|
|
event = WSAgentEvent(
|
|
type="agent_event",
|
|
agent_id=agent_id,
|
|
ts=datetime.utcnow().isoformat(),
|
|
kind=event_kind,
|
|
payload=payload
|
|
)
|
|
await self.event_queue.put((agent_id, event))
|
|
|
|
|
|
manager = ConnectionManager()
|
|
|
|
# ============================================================================
|
|
# WebSocket Endpoint
|
|
# ============================================================================
|
|
|
|
@router.websocket("/ws/agents/stream")
|
|
async def websocket_agent_events(websocket: WebSocket, agent_id: Optional[str] = None):
|
|
"""
|
|
WebSocket endpoint for live agent events
|
|
|
|
Query params:
|
|
- agent_id: subscribe to specific agent (optional)
|
|
|
|
If agent_id is None, subscribe to all agents
|
|
"""
|
|
await manager.connect(websocket, agent_id)
|
|
|
|
try:
|
|
# Keep connection alive and send events
|
|
while True:
|
|
# Wait for events from queue
|
|
try:
|
|
event_agent_id, event = await asyncio.wait_for(
|
|
manager.event_queue.get(),
|
|
timeout=30.0 # 30-second timeout for ping
|
|
)
|
|
|
|
# If subscribed to specific agent, only send its events
|
|
if agent_id is None or event_agent_id == agent_id:
|
|
await websocket.send_text(event.json())
|
|
|
|
except asyncio.TimeoutError:
|
|
# Send ping to keep connection alive
|
|
await websocket.send_json({"type": "ping", "ts": datetime.utcnow().isoformat()})
|
|
|
|
except WebSocketDisconnect:
|
|
manager.disconnect(websocket, agent_id)
|
|
except Exception as e:
|
|
print(f"⚠️ WebSocket error: {e}")
|
|
manager.disconnect(websocket, agent_id)
|
|
|
|
# ============================================================================
|
|
# Background Task — Event Queue Consumer
|
|
# ============================================================================
|
|
|
|
async def event_queue_consumer():
|
|
"""
|
|
Background task that consumes events from queue and broadcasts to WS clients
|
|
|
|
This runs in background alongside the main FastAPI app
|
|
"""
|
|
print("🚀 Event queue consumer started")
|
|
|
|
while True:
|
|
try:
|
|
# Get event from queue (with timeout to prevent blocking)
|
|
agent_id, event = await asyncio.wait_for(
|
|
manager.event_queue.get(),
|
|
timeout=1.0
|
|
)
|
|
|
|
# Broadcast to WebSocket clients
|
|
await manager.broadcast_event(agent_id, event)
|
|
|
|
except asyncio.TimeoutError:
|
|
# No events in queue, continue
|
|
continue
|
|
except Exception as e:
|
|
print(f"⚠️ Event queue consumer error: {e}")
|
|
await asyncio.sleep(1)
|
|
|
|
# ============================================================================
|
|
# Helper Functions (for use in other modules)
|
|
# ============================================================================
|
|
|
|
async def push_event_to_ws(agent_id: str, event_kind: str, payload: dict = None):
|
|
"""
|
|
Push event to WebSocket stream
|
|
Called from routes_agents or nats_subscriber
|
|
"""
|
|
await manager.push_event_to_queue(agent_id, event_kind, payload or {})
|
|
|