- matrix-gateway: POST /internal/matrix/presence/online endpoint - usePresenceHeartbeat hook with activity tracking - Auto away after 5 min inactivity - Offline on page close/visibility change - Integrated in MatrixChatRoom component
117 lines
3.8 KiB
Python
117 lines
3.8 KiB
Python
"""
|
|
NATS Subscriber for Living Map events
|
|
Phase 9: Living Map
|
|
"""
|
|
import json
|
|
import asyncio
|
|
from nats.aio.client import Client as NATS
|
|
from typing import Callable, Optional
|
|
from repository_history import HistoryRepository
|
|
|
|
class NATSSubscriber:
|
|
"""Subscribe to NATS subjects and log events"""
|
|
|
|
def __init__(self, nats_url: str, history_repo: HistoryRepository):
|
|
self.nats_url = nats_url
|
|
self.history_repo = history_repo
|
|
self.nc: Optional[NATS] = None
|
|
self.subscriptions = []
|
|
self.event_callback: Optional[Callable] = None
|
|
|
|
async def connect(self):
|
|
"""Connect to NATS"""
|
|
self.nc = NATS()
|
|
await self.nc.connect(self.nats_url)
|
|
print(f"✅ NATS connected: {self.nats_url}")
|
|
|
|
async def subscribe_all(self, event_callback: Optional[Callable] = None):
|
|
"""Subscribe to all Living Map relevant subjects"""
|
|
if not self.nc:
|
|
raise RuntimeError("NATS not connected")
|
|
|
|
self.event_callback = event_callback
|
|
|
|
subjects = [
|
|
"city.event.*",
|
|
"dao.event.*",
|
|
"microdao.event.*",
|
|
"node.metrics.*",
|
|
"agent.event.*",
|
|
"usage.llm.*",
|
|
"usage.agent.*",
|
|
"messaging.message.created"
|
|
]
|
|
|
|
for subject in subjects:
|
|
try:
|
|
sub = await self.nc.subscribe(subject, cb=self._handle_message)
|
|
self.subscriptions.append(sub)
|
|
print(f"📡 Subscribed to: {subject}")
|
|
except Exception as e:
|
|
print(f"⚠️ Failed to subscribe to {subject}: {e}")
|
|
|
|
async def _handle_message(self, msg):
|
|
"""Handle incoming NATS message"""
|
|
try:
|
|
# Decode payload
|
|
payload = json.loads(msg.data.decode())
|
|
subject = msg.subject
|
|
|
|
# Extract entity info
|
|
entity_id = payload.get("id") or payload.get("entity_id") or payload.get("agent_id") or payload.get("dao_id")
|
|
entity_type = self._infer_entity_type(subject)
|
|
|
|
# Log to history
|
|
await self.history_repo.add_event(
|
|
event_type=subject,
|
|
payload=payload,
|
|
source_service=self._extract_service(subject),
|
|
entity_id=entity_id,
|
|
entity_type=entity_type
|
|
)
|
|
|
|
# Notify callback (for WebSocket broadcast)
|
|
if self.event_callback:
|
|
await self.event_callback({
|
|
"kind": "event",
|
|
"event_type": subject,
|
|
"timestamp": payload.get("ts") or payload.get("timestamp"),
|
|
"payload": payload
|
|
})
|
|
|
|
print(f"📥 Event logged: {subject}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error handling message from {msg.subject}: {e}")
|
|
|
|
def _infer_entity_type(self, subject: str) -> str:
|
|
"""Infer entity type from NATS subject"""
|
|
if "agent" in subject:
|
|
return "agent"
|
|
elif "dao" in subject:
|
|
return "dao"
|
|
elif "microdao" in subject:
|
|
return "microdao"
|
|
elif "node" in subject:
|
|
return "node"
|
|
elif "city" in subject:
|
|
return "city"
|
|
elif "space" in subject:
|
|
return "space"
|
|
else:
|
|
return "unknown"
|
|
|
|
def _extract_service(self, subject: str) -> str:
|
|
"""Extract service name from subject"""
|
|
parts = subject.split(".")
|
|
if len(parts) > 0:
|
|
return f"{parts[0]}-service"
|
|
return "unknown"
|
|
|
|
async def close(self):
|
|
"""Close NATS connection"""
|
|
if self.nc:
|
|
await self.nc.close()
|
|
print("✅ NATS connection closed")
|
|
|