- matrix-gateway: POST /internal/matrix/presence/online endpoint - usePresenceHeartbeat hook with activity tracking - Auto away after 5 min inactivity - Offline on page close/visibility change - Integrated in MatrixChatRoom component
281 lines
8.8 KiB
Python
281 lines
8.8 KiB
Python
"""
|
|
NATS Subscriber — Listen to agent activity events
|
|
Phase 6: Subscribe to usage.agent, usage.llm, agent.reply.sent, agent.error
|
|
"""
|
|
import asyncio
|
|
import json
|
|
from typing import Optional
|
|
from nats.aio.client import Client as NATS
|
|
from repository_events import EventRepository
|
|
from ws_events import push_event_to_ws
|
|
|
|
class NATSSubscriber:
    """Listen to agent activity subjects on NATS and forward each event.

    Every handler decodes the message body as JSON, ignores payloads that
    carry no ``agent_id``, and then forwards the event: most handlers write
    a row through ``event_repo.log_event`` and push a trimmed payload with
    ``push_event_to_ws``; ``usage.llm`` events are only pushed.

    Handlers never raise: any failure is printed and the message is dropped,
    so one malformed event cannot break the subscription callback.
    """

    def __init__(self, nats_url: str, event_repo: "EventRepository"):
        """Store the connection settings; no network I/O happens here.

        Args:
            nats_url: URL passed to the NATS client in ``connect()``.
            event_repo: Repository whose ``log_event`` coroutine persists
                events.
        """
        self.nats_url = nats_url
        self.event_repo = event_repo
        # Set by connect(); stays None until then.
        self.nc: Optional[NATS] = None

    async def connect(self):
        """Connect to NATS"""
        self.nc = NATS()
        await self.nc.connect(self.nats_url)
        print(f"✅ NATS connected: {self.nats_url}")

    async def subscribe_all(self):
        """Subscribe to all agent-related subjects.

        Raises:
            RuntimeError: If ``connect()`` has not been called yet.
        """
        if not self.nc:
            raise RuntimeError("NATS not connected")

        # Subscribe to usage.agent (invocations)
        await self.nc.subscribe("usage.agent", cb=self._handle_usage_agent)
        print("✅ Subscribed to: usage.agent")

        # Subscribe to usage.llm (LLM calls)
        await self.nc.subscribe("usage.llm", cb=self._handle_usage_llm)
        print("✅ Subscribed to: usage.llm")

        # Subscribe to usage.tool (tool calls)
        await self.nc.subscribe("usage.tool", cb=self._handle_usage_tool)
        print("✅ Subscribed to: usage.tool")

        # Subscribe to agent.reply.sent (replies)
        await self.nc.subscribe("agent.reply.sent", cb=self._handle_agent_reply)
        print("✅ Subscribed to: agent.reply.sent")

        # Subscribe to agent.error (errors)
        await self.nc.subscribe("agent.error", cb=self._handle_agent_error)
        print("✅ Subscribed to: agent.error")

    # ========================================================================
    # Helpers
    # ========================================================================

    @staticmethod
    def _decode(msg) -> Optional[dict]:
        """Decode ``msg.data`` as a JSON object.

        Returns:
            The decoded dict, or ``None`` when it has no (or an empty)
            ``agent_id`` and should be silently ignored.

        Raises:
            ValueError: If the body is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError`` subclass) or
                is valid JSON that is not an object.
            UnicodeDecodeError: If the body cannot be decoded by
                ``bytes.decode()``.
        """
        data = json.loads(msg.data.decode())
        if not isinstance(data, dict):
            # Fail with a clear message instead of an AttributeError on .get.
            raise ValueError("payload is not a JSON object")
        if not data.get("agent_id"):
            return None
        return data

    @staticmethod
    def _clip(value, limit: int) -> str:
        """Return ``value`` as text truncated to ``limit`` characters.

        ``None`` becomes ``""``: ``dict.get(key, "")`` only covers a missing
        key, so an explicit JSON ``null`` used to reach the slice and raise
        ``TypeError``.
        """
        if value is None:
            return ""
        return str(value)[:limit]

    @staticmethod
    def _token_total(data: dict) -> int:
        """Sum input and output token counts, treating missing/null as 0."""
        return (data.get("tokens_input") or 0) + (data.get("tokens_output") or 0)

    # ========================================================================
    # Handlers
    # ========================================================================

    async def _handle_usage_agent(self, msg):
        """
        Handle usage.agent events (invocations)

        Example payload:
        {
            "agent_id": "agent:sofia",
            "ts": "2025-11-24T10:30:00Z",
            "kind": "invocation",
            "channel_id": "channel:123",
            "microdao_id": "microdao:7"
        }
        """
        try:
            data = self._decode(msg)
            if data is None:
                return
            agent_id = data["agent_id"]

            # Log to DB
            await self.event_repo.log_event(
                agent_external_id=agent_id,
                kind="invocation",
                channel_id=data.get("channel_id"),
                payload={
                    "microdao_id": data.get("microdao_id"),
                    "ts": data.get("ts"),
                },
            )

            # Push to WebSocket
            await push_event_to_ws(
                agent_id=agent_id,
                event_kind="invocation",
                payload={
                    "channel_id": data.get("channel_id"),
                    "ts": data.get("ts"),
                },
            )

            print(f"📥 Event: {agent_id} → invocation")

        except Exception as e:
            # Callback boundary: report and drop, never propagate.
            print(f"⚠️ Error handling usage.agent: {e}")

    async def _handle_usage_llm(self, msg):
        """
        Handle usage.llm events (LLM calls)

        Example payload:
        {
            "agent_id": "agent:sofia",
            "model": "gpt-4.1-mini",
            "tokens_input": 150,
            "tokens_output": 80,
            "latency_ms": 320
        }
        """
        try:
            data = self._decode(msg)
            if data is None:
                return
            agent_id = data["agent_id"]

            # NOTE: LLM calls are deliberately not written to the DB (the
            # original author flagged them as possibly too verbose); they
            # are only pushed as live activity.
            await push_event_to_ws(
                agent_id=agent_id,
                event_kind="llm_call",
                payload={
                    "model": data.get("model"),
                    "tokens": self._token_total(data),
                    "latency_ms": data.get("latency_ms"),
                },
            )

        except Exception as e:
            print(f"⚠️ Error handling usage.llm: {e}")

    async def _handle_usage_tool(self, msg):
        """
        Handle usage.tool events (tool calls)

        Example payload:
        {
            "agent_id": "agent:sofia",
            "tool_id": "projects.list",
            "success": true,
            "latency_ms": 50
        }
        """
        try:
            data = self._decode(msg)
            if data is None:
                return
            agent_id = data["agent_id"]

            # Log to DB
            await self.event_repo.log_event(
                agent_external_id=agent_id,
                kind="tool_call",
                payload={
                    "tool_id": data.get("tool_id"),
                    "success": data.get("success"),
                    "latency_ms": data.get("latency_ms"),
                },
            )

            # Push to WebSocket
            await push_event_to_ws(
                agent_id=agent_id,
                event_kind="tool_call",
                payload={
                    "tool_id": data.get("tool_id"),
                    "success": data.get("success"),
                },
            )

            print(f"📥 Event: {agent_id} → tool_call ({data.get('tool_id')})")

        except Exception as e:
            print(f"⚠️ Error handling usage.tool: {e}")

    async def _handle_agent_reply(self, msg):
        """
        Handle agent.reply.sent events

        Example payload:
        {
            "agent_id": "agent:sofia",
            "channel_id": "channel:123",
            "message_preview": "Here are your projects...",
            "ts": "2025-11-24T10:30:05Z"
        }
        """
        try:
            data = self._decode(msg)
            if data is None:
                return
            agent_id = data["agent_id"]
            preview = data.get("message_preview")

            # Log to DB (longer preview than the live feed gets)
            await self.event_repo.log_event(
                agent_external_id=agent_id,
                kind="reply_sent",
                channel_id=data.get("channel_id"),
                payload={
                    "message_preview": self._clip(preview, 200),
                    "ts": data.get("ts"),
                },
            )

            # Push to WebSocket
            await push_event_to_ws(
                agent_id=agent_id,
                event_kind="reply_sent",
                payload={
                    "channel_id": data.get("channel_id"),
                    "message_preview": self._clip(preview, 50),
                },
            )

            print(f"📥 Event: {agent_id} → reply_sent")

        except Exception as e:
            print(f"⚠️ Error handling agent.reply.sent: {e}")

    async def _handle_agent_error(self, msg):
        """
        Handle agent.error events

        Example payload:
        {
            "agent_id": "agent:sofia",
            "error_type": "LLM_ERROR",
            "error_message": "Rate limit exceeded",
            "ts": "2025-11-24T10:30:00Z"
        }
        """
        try:
            data = self._decode(msg)
            if data is None:
                return
            agent_id = data["agent_id"]

            # Log to DB (full error message is stored untruncated)
            await self.event_repo.log_event(
                agent_external_id=agent_id,
                kind="error",
                payload={
                    "error_type": data.get("error_type"),
                    "error_message": data.get("error_message"),
                    "ts": data.get("ts"),
                },
            )

            # Push to WebSocket
            await push_event_to_ws(
                agent_id=agent_id,
                event_kind="error",
                payload={
                    "error_type": data.get("error_type"),
                    "error_message": self._clip(data.get("error_message"), 100),
                },
            )

            print(f"📥 Event: {agent_id} → error ({data.get('error_type')})")

        except Exception as e:
            print(f"⚠️ Error handling agent.error: {e}")

    async def close(self):
        """Close NATS connection"""
        if self.nc:
            await self.nc.close()
            print("❌ NATS closed")
|
|
|