Guard rails (mixed_routing.py):
- MAX_AGENTS_PER_MIXED_ROOM (default 5): fail-fast at parse time
- MAX_SLASH_LEN (default 32): reject garbage/injection slash tokens
- Unified rejection reasons: unknown_agent, slash_too_long, no_mapping
- REASON_REJECTED_* constants (separate from success REASON_*)
Ingress (ingress.py):
- per-room-agent concurrency semaphore (MIXED_CONCURRENCY_CAP, default 1)
- active_lock_count property for /health + prometheus
- UNKNOWN_AGENT_BEHAVIOR: "ignore" (silent) | "reply_error" (inform user)
- on_routed(agent_id, reason) callback for routing metrics
- on_route_rejected(room_id, reason) callback for rejection metrics
- matrix.route.rejected audit event on every rejection
Config + main:
- max_agents_per_mixed_room, max_slash_len, unknown_agent_behavior, mixed_concurrency_cap
- matrix_bridge_routed_total{agent_id, reason} counter
- matrix_bridge_route_rejected_total{room_id, reason} counter
- matrix_bridge_active_room_agent_locks gauge
- /health: mixed_guard_rails section + total_agents_in_mixed_rooms
- docker-compose: all 4 new guard rail env vars
Runbook: section 9 — mixed room debug guide (6 acceptance tests, routing metrics, session isolation, lock hang, config guard)
Tests: 108 pass (94 → 108, +14 new tests for guard rails + callbacks + concurrency)
Made-with: Cursor
780 lines
32 KiB
Python
780 lines
32 KiB
Python
"""
Matrix Ingress + Egress Loop — Phase M1.4 + H1 + H2 + H3 + M2.1 + M2.2 (mixed rooms hardening)

Architecture (H2):
  Reader task → asyncio.Queue(maxsize) → N Worker tasks
  ─────────────────────────────────────────────────────────
  Reader:
    sync_poll() → extract_room_messages()
    → rate_limit check (H1)
    → mark_seen / dedupe
    → queue.put_nowait() or DROP (audit matrix.queue_full + metric)

  Workers (N concurrent):
    queue.get() → measure wait latency (H3)
    → audit matrix.message.received
    → invoke Router (timed, H3)
    → send_text() (timed, H3)
    → audit matrix.agent.replied | matrix.error

  Shutdown:
    1. stop_event set → reader exits loop
    2. queue.join() with drain_timeout → workers finish in-flight
    3. worker tasks cancelled

Queue entry: _QueueEntry(event, room_id, agent_id, enqueue_time, routing_reason, is_mixed)
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Callable, Dict, List, Optional
|
|
|
|
import httpx
|
|
|
|
from .matrix_client import MatrixClient
|
|
from .mixed_routing import (
|
|
MixedRoomConfig, route_message, reply_prefix,
|
|
REASON_REJECTED_UNKNOWN_AGENT, REASON_REJECTED_SLASH_TOO_LONG, REASON_REJECTED_NO_MAPPING,
|
|
)
|
|
from .rate_limit import InMemoryRateLimiter
|
|
from .room_mapping import RoomMappingConfig, RoomMapping
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# ── Constants ──────────────────────────────────────────────────────────────────

# Reader retry backoff, in seconds: the reader starts at _INIT_RETRY_BACKOFF,
# doubles the delay after each failed sync, and never waits longer than
# _MAX_RETRY_BACKOFF.
_MAX_RETRY_BACKOFF = 60.0
_INIT_RETRY_BACKOFF = 2.0
# Per-request timeout (seconds) passed to the Router infer POST.
_ROUTER_TIMEOUT_S = 45.0
# Per-request timeout (seconds) passed to the audit POST.
_AUDIT_TIMEOUT_S = 5.0
# Upper bound, in characters, on the text sent back to the Matrix room.
_REPLY_TEXT_MAX = 4000
# NOTE(review): not referenced by the code visible in this file — the worker
# blocks on queue.get() without a timeout. Confirm whether it is still needed.
_WORKER_GET_TIMEOUT_S = 1.0 # how long a worker waits on empty queue before re-checking
|
|
|
|
|
|
# ── Queue entry ────────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class _QueueEntry:
    """One unit of work handed from the reader to a worker.

    Attributes:
        event: Matrix event dict. Mixed-room entries carry a patched copy
            whose body is the text left after routing.
        room_id: Room the event was read from.
        agent_id: Agent the message will be sent to.
        enqueue_time: ``time.monotonic()`` reading taken at enqueue; workers
            subtract it from the current reading to get the queue wait.
        routing_reason: Why this agent was selected. Entries from
            single-agent rooms keep the default ``"direct"``.
        is_mixed: True for mixed-room entries (reply tagging, session
            isolation).
    """

    event: Dict[str, Any]
    room_id: str
    agent_id: str
    enqueue_time: float
    routing_reason: str = "direct"
    is_mixed: bool = False
|
|
|
|
|
|
# ── Router invoke ──────────────────────────────────────────────────────────────
|
|
|
|
async def _invoke_router(
    http_client: httpx.AsyncClient,
    router_url: str,
    agent_id: str,
    node_id: str,
    prompt: str,
    session_id: str,
) -> str:
    """Ask the Router for an agent reply and return it as stripped text.

    Sends ``POST {router_url}/v1/agents/{agent_id}/infer`` with the prompt,
    the session id and Matrix transport metadata, using ``_ROUTER_TIMEOUT_S``
    as the request timeout.

    Returns:
        The first truthy value among the ``response``, ``text``, ``content``
        and ``message`` keys of the JSON body, converted to ``str`` if needed
        and stripped. An empty string when none of them is truthy.

    Raises:
        httpx.HTTPStatusError: via ``raise_for_status()`` on an error status.
        Other errors raised by the HTTP client or by JSON decoding propagate
        unchanged.
    """
    base = router_url.rstrip("/")
    endpoint = f"{base}/v1/agents/{agent_id}/infer"

    request_body = {
        "prompt": prompt,
        "session_id": session_id,
        "user_id": "matrix_bridge",
        "metadata": {"transport": "matrix", "node_id": node_id},
    }

    response = await http_client.post(endpoint, json=request_body, timeout=_ROUTER_TIMEOUT_S)
    response.raise_for_status()
    body = response.json()

    # The Router may answer under any of these keys; the first truthy one wins.
    reply: Any = ""
    for key in ("response", "text", "content", "message"):
        candidate = body.get(key)
        if candidate:
            reply = candidate
            break

    if not isinstance(reply, str):
        reply = str(reply)
    return reply.strip()
|
|
|
|
|
|
# ── Audit write ────────────────────────────────────────────────────────────────
|
|
|
|
async def _write_audit(
    http_client: httpx.AsyncClient,
    console_url: str,
    internal_token: str,
    event: str,
    agent_id: str,
    node_id: str,
    room_id: str,
    event_id: str,
    status: str = "ok",
    error_code: Optional[str] = None,
    duration_ms: Optional[int] = None,
    data: Optional[Dict[str, Any]] = None,
) -> None:
    """Best-effort audit write to ``{console_url}/api/audit/internal``.

    Does nothing when either ``console_url`` or ``internal_token`` is empty.
    Otherwise posts one audit record, authenticated with the
    ``X-Internal-Service-Token`` header and bounded by ``_AUDIT_TIMEOUT_S``.
    The record's ``data`` always carries the Matrix event and room ids; keys
    from the ``data`` argument are merged on top of them.

    Any ``Exception`` raised while building or sending the record is logged
    as a warning and swallowed, so auditing never interrupts message handling.
    The HTTP response status is not inspected.
    """
    if not (console_url and internal_token):
        return

    try:
        endpoint = f"{console_url.rstrip('/')}/api/audit/internal"
        details = {"matrix_event_id": event_id, "matrix_room_id": room_id, **(data or {})}
        record = {
            "event": event,
            "operator_id": "matrix_bridge",
            "node_id": node_id,
            "agent_id": agent_id,
            "chat_id": room_id,
            "status": status,
            "error_code": error_code,
            "duration_ms": duration_ms,
            "data": details,
        }
        auth_headers = {"X-Internal-Service-Token": internal_token}
        await http_client.post(
            endpoint,
            json=record,
            headers=auth_headers,
            timeout=_AUDIT_TIMEOUT_S,
        )
    except Exception as exc:
        logger.warning("Audit write failed (non-blocking): %s", exc)
|
|
|
|
|
|
# ── Ingress loop (reader + workers) ───────────────────────────────────────────
|
|
|
|
class MatrixIngressLoop:
    """
    Drives the full Matrix → Router → Matrix pipeline with backpressure.

    Reader task: sync → extract → rate_check → dedupe → queue.put_nowait
    Worker tasks: queue.get → invoke → send → audit

    Metric callbacks (all optional, called synchronously):
        on_message_received(room_id, agent_id)
        on_message_replied(room_id, agent_id, status)
        on_gateway_error(error_type)
        on_rate_limited(room_id, agent_id, limit_type)
        on_queue_dropped(room_id, agent_id)
        on_queue_size(current_size: int)
        on_invoke_latency(agent_id, duration_seconds)
        on_send_latency(agent_id, duration_seconds)
        on_queue_wait(agent_id, wait_seconds)
        on_routed(agent_id, reason)            M2.2: successful routing
        on_route_rejected(room_id, reason)     M2.2: routing rejection
    """

    def __init__(
        self,
        matrix_homeserver_url: str,
        matrix_access_token: str,
        matrix_user_id: str,
        router_url: str,
        node_id: str,
        room_map: RoomMappingConfig,
        sofiia_console_url: str = "",
        sofiia_internal_token: str = "",
        rate_limiter: Optional[InMemoryRateLimiter] = None,
        queue_max_events: int = 100,
        worker_concurrency: int = 2,
        queue_drain_timeout_s: float = 5.0,
        mixed_room_config: Optional[MixedRoomConfig] = None,
        # M2.2: guard rails
        unknown_agent_behavior: str = "ignore",  # "ignore" | "reply_error"
        max_slash_len: int = 32,
        mixed_concurrency_cap: int = 1,  # 0 = unlimited
        # Callbacks
        on_message_received: Optional[Callable[[str, str], None]] = None,
        on_message_replied: Optional[Callable[[str, str, str], None]] = None,
        on_gateway_error: Optional[Callable[[str], None]] = None,
        on_rate_limited: Optional[Callable[[str, str, str], None]] = None,
        on_queue_dropped: Optional[Callable[[str, str], None]] = None,
        on_queue_size: Optional[Callable[[int], None]] = None,
        on_invoke_latency: Optional[Callable[[str, float], None]] = None,
        on_send_latency: Optional[Callable[[str, float], None]] = None,
        on_queue_wait: Optional[Callable[[str, float], None]] = None,
        on_routed: Optional[Callable[[str, str], None]] = None,
        on_route_rejected: Optional[Callable[[str, str], None]] = None,
    ) -> None:
        """Store configuration; no I/O happens until ``run()`` is awaited.

        Args:
            matrix_homeserver_url, matrix_access_token, matrix_user_id:
                Passed to ``MatrixClient`` when the loop starts.
            router_url: Base URL of the Router used for agent invocations.
            node_id: Included in Router request metadata and audit records.
            room_map: Single-agent room mappings plus the allowed-agent set.
            sofiia_console_url, sofiia_internal_token: Audit endpoint base URL
                and service token; audit writes are skipped if either is empty.
            rate_limiter: Optional limiter consulted per (room, sender) before
                an event is enqueued.
            queue_max_events: ``maxsize`` of the reader → worker queue.
            worker_concurrency: Number of worker tasks started by ``run()``.
            queue_drain_timeout_s: How long ``run()`` waits for the queue to
                drain after the reader stops.
            mixed_room_config: Optional configuration of rooms served by
                several agents.
            unknown_agent_behavior: ``"reply_error"`` posts an error message to
                the room when routing is rejected for an unknown agent; any
                other value sends nothing.
            max_slash_len: Forwarded to ``route_message`` as ``max_slash_len``.
            mixed_concurrency_cap: Maximum concurrent invocations per
                (room, agent) pair in mixed rooms; ``0`` disables the cap.
            on_*: Optional metric callbacks, see the class docstring.
        """
        self._hs_url = matrix_homeserver_url
        self._token = matrix_access_token
        self._user_id = matrix_user_id
        self._router_url = router_url
        self._node_id = node_id
        self._room_map = room_map
        self._console_url = sofiia_console_url
        self._internal_token = sofiia_internal_token
        self._rate_limiter = rate_limiter
        self._queue_max = queue_max_events
        self._worker_count = worker_concurrency
        self._drain_timeout_s = queue_drain_timeout_s
        # Callbacks
        self._on_message_received = on_message_received
        self._on_message_replied = on_message_replied
        self._on_gateway_error = on_gateway_error
        self._on_rate_limited = on_rate_limited
        self._on_queue_dropped = on_queue_dropped
        self._on_queue_size = on_queue_size
        self._on_invoke_latency = on_invoke_latency
        self._on_send_latency = on_send_latency
        self._on_queue_wait = on_queue_wait
        self._mixed_room_config = mixed_room_config
        self._unknown_agent_behavior = unknown_agent_behavior
        self._max_slash_len = max_slash_len
        self._mixed_concurrency_cap = mixed_concurrency_cap
        self._on_routed = on_routed
        self._on_route_rejected = on_route_rejected
        # Lazily populated semaphores keyed by "{room_id}:{agent_id}"
        self._concurrency_locks: Dict[str, asyncio.Semaphore] = {}
        self._next_batch: Optional[str] = None
        self._queue: Optional[asyncio.Queue] = None  # exposed for /health

    @property
    def next_batch(self) -> Optional[str]:
        """Sync token returned by the most recent successful sync, if any."""
        return self._next_batch

    @property
    def queue_size(self) -> int:
        """Current queue depth, or 0 while the loop is not running."""
        return self._queue.qsize() if self._queue else 0

    @property
    def worker_count(self) -> int:
        """Configured number of worker tasks."""
        return self._worker_count

    @property
    def active_lock_count(self) -> int:
        """Number of room-agent pairs currently holding a concurrency lock.

        NOTE: this counts semaphores whose ``locked()`` is true, i.e. pairs
        with no free permit left. With ``mixed_concurrency_cap == 1`` that is
        exactly "currently held"; with a larger cap a pair is only counted
        once every permit is taken.
        """
        return sum(1 for lock in self._concurrency_locks.values() if lock.locked())

    def _get_concurrency_lock(self, room_id: str, agent_id: str) -> asyncio.Semaphore:
        """Lazily create and return the semaphore for a (room, agent) pair."""
        key = f"{room_id}:{agent_id}"
        if key not in self._concurrency_locks:
            # A cap of 0 means "unlimited": use a permit count that is never exhausted.
            cap = self._mixed_concurrency_cap if self._mixed_concurrency_cap > 0 else 2 ** 31
            self._concurrency_locks[key] = asyncio.Semaphore(cap)
        return self._concurrency_locks[key]

    @staticmethod
    def _event_body(event: Dict[str, Any]) -> str:
        """Return the stripped text body of a Matrix event, or "" if unusable.

        Event content comes from the homeserver and is not validated here, so
        a missing/non-dict ``content`` or a non-string ``body`` is treated as
        an empty message instead of raising inside the reader loop.
        """
        content = event.get("content")
        if not isinstance(content, dict):
            return ""
        body = content.get("body")
        return body.strip() if isinstance(body, str) else ""

    @staticmethod
    def _build_reply(prefix: str, reply_text: str, limit: int) -> tuple:
        """Return ``(text_to_send, truncated)`` for a reply limited to *limit* chars.

        The prefix counts towards the limit, so ``truncated`` is true whenever
        any part of ``reply_text`` had to be cut to make room for it.
        """
        full_text = prefix + reply_text
        send_text = full_text[:limit]
        return send_text, len(send_text) < len(full_text)

    # ── Public run ─────────────────────────────────────────────────────────────

    async def run(self, stop_event: asyncio.Event) -> None:
        """Run reader and workers until *stop_event* is set, then drain and stop.

        Joins every configured room (failures are logged, not fatal), starts
        the worker tasks, runs the reader, waits up to the drain timeout for
        queued entries to finish, and finally cancels the workers. Workers are
        cancelled and the exposed queue is reset even if this coroutine is
        cancelled or fails part-way, so no task outlives the HTTP clients.
        """
        mixed_rooms_count = self._mixed_room_config.total_rooms if self._mixed_room_config else 0
        logger.info(
            "Matrix ingress loop started | hs=%s node=%s mappings=%d mixed_rooms=%d "
            "queue_max=%d workers=%d",
            self._hs_url, self._node_id, self._room_map.total_mappings, mixed_rooms_count,
            self._queue_max, self._worker_count,
        )

        if self._room_map.total_mappings == 0 and mixed_rooms_count == 0:
            logger.warning("No room mappings — ingress loop is idle")

        queue: "asyncio.Queue[Optional[_QueueEntry]]" = asyncio.Queue(
            maxsize=self._queue_max
        )
        self._queue = queue

        try:
            async with MatrixClient(self._hs_url, self._token, self._user_id) as client:
                for mapping in self._room_map.mappings:
                    if mapping.agent_id in self._room_map.allowed_agents:
                        try:
                            await client.join_room(mapping.room_id)
                        except Exception as exc:
                            logger.warning("Could not join room %s: %s", mapping.room_id, exc)
                if self._mixed_room_config:
                    for room_id in self._mixed_room_config.rooms:
                        try:
                            await client.join_room(room_id)
                        except Exception as exc:
                            logger.warning("Could not join mixed room %s: %s", room_id, exc)

                async with httpx.AsyncClient() as http_client:
                    # Start workers
                    worker_tasks = [
                        asyncio.create_task(
                            self._worker(queue, client, http_client),
                            name=f"matrix_worker_{i}",
                        )
                        for i in range(self._worker_count)
                    ]

                    try:
                        # Run reader until stop_event
                        await self._reader(client, queue, http_client, stop_event)

                        # Drain: wait for all enqueued items to be processed
                        logger.info(
                            "Reader stopped. Draining queue (%d items, timeout=%.1fs)...",
                            queue.qsize(), self._drain_timeout_s,
                        )
                        try:
                            await asyncio.wait_for(queue.join(), timeout=self._drain_timeout_s)
                            logger.info("Queue drained successfully")
                        except asyncio.TimeoutError:
                            remaining = queue.qsize()
                            logger.warning(
                                "Drain timeout (%.1fs): %d items not processed",
                                self._drain_timeout_s, remaining,
                            )
                    finally:
                        # Cancel workers on every exit path, before the clients close.
                        for task in worker_tasks:
                            task.cancel()
                        results = await asyncio.gather(*worker_tasks, return_exceptions=True)
                        cancelled = sum(1 for r in results if isinstance(r, asyncio.CancelledError))
                        logger.info("Workers stopped (%d cancelled)", cancelled)
        finally:
            self._queue = None

        logger.info("Matrix ingress loop stopped")

    # ── Reader ─────────────────────────────────────────────────────────────────

    async def _reader(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        stop_event: asyncio.Event,
    ) -> None:
        """Poll sync and enqueue messages until *stop_event* is set.

        On any error the reader reports ``sync_error``, waits for the current
        backoff (or until stop is requested) and doubles the backoff up to
        ``_MAX_RETRY_BACKOFF``. A successful sync resets the backoff.
        """
        backoff = _INIT_RETRY_BACKOFF
        while not stop_event.is_set():
            try:
                sync_resp = await client.sync_poll(since=self._next_batch)
                self._next_batch = sync_resp.get("next_batch")
                backoff = _INIT_RETRY_BACKOFF
                await self._enqueue_from_sync(client, queue, http_client, sync_resp)
            except asyncio.CancelledError:
                break
            except Exception as exc:
                logger.error("Reader error (retry in %.1fs): %s", backoff, exc)
                if self._on_gateway_error:
                    self._on_gateway_error("sync_error")
                try:
                    # Sleep for the backoff, but wake immediately on shutdown.
                    await asyncio.wait_for(stop_event.wait(), timeout=backoff)
                except asyncio.TimeoutError:
                    pass
                backoff = min(backoff * 2, _MAX_RETRY_BACKOFF)

    async def _enqueue_from_sync(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        sync_resp: Dict[str, Any],
    ) -> None:
        """Extract messages for every configured room from one sync response."""
        # Regular rooms: 1 room → 1 agent (M1 / M2.0)
        for mapping in self._room_map.mappings:
            if mapping.agent_id not in self._room_map.allowed_agents:
                continue
            messages = client.extract_room_messages(sync_resp, mapping.room_id)
            for event in messages:
                await self._try_enqueue(client, queue, http_client, event, mapping)

        # Mixed rooms: 1 room → N agents, routing per message (M2.1)
        if self._mixed_room_config:
            for room_id in self._mixed_room_config.rooms:
                messages = client.extract_room_messages(sync_resp, room_id)
                for event in messages:
                    await self._try_enqueue_mixed(client, queue, http_client, event, room_id)

    async def _try_enqueue(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        event: Dict[str, Any],
        mapping: RoomMapping,
    ) -> None:
        """Rate-check, dedupe and enqueue one message from a single-agent room.

        Events without a usable text body are ignored. Rate-limited events and
        events dropped because the queue is full are audited and reported
        through the matching callbacks.
        """
        event_id = event.get("event_id", "")
        sender = event.get("sender", "")
        text = self._event_body(event)
        room_id = mapping.room_id
        agent_id = mapping.agent_id

        if not text:
            return

        # H1: Rate limit (before mark_seen — don't charge quota on drop)
        if self._rate_limiter is not None:
            allowed, limit_type = self._rate_limiter.check(room_id=room_id, sender=sender)
            if not allowed:
                logger.warning(
                    "Rate limited: room=%s sender=%s limit_type=%s event=%s",
                    room_id, sender, limit_type, event_id,
                )
                if self._on_rate_limited:
                    self._on_rate_limited(room_id, agent_id, limit_type or "unknown")
                await _write_audit(
                    http_client, self._console_url, self._internal_token,
                    event="matrix.rate_limited",
                    agent_id=agent_id, node_id=self._node_id,
                    room_id=room_id, event_id=event_id,
                    status="error", error_code=f"rate_limit_{limit_type}",
                    data={"sender": sender, "limit_type": limit_type},
                )
                return

        # Dedupe — mark before enqueue (prevents double-enqueue on retry)
        client.mark_seen(event_id)

        # H2: Enqueue or drop
        entry = _QueueEntry(
            event=event,
            room_id=room_id,
            agent_id=agent_id,
            enqueue_time=time.monotonic(),
        )
        try:
            queue.put_nowait(entry)
            qsize = queue.qsize()
            logger.debug("Enqueued event=%s qsize=%d", event_id, qsize)
            if self._on_queue_size:
                self._on_queue_size(qsize)
        except asyncio.QueueFull:
            logger.warning(
                "Queue full (max=%d): dropping event=%s room=%s agent=%s",
                self._queue_max, event_id, room_id, agent_id,
            )
            if self._on_queue_dropped:
                self._on_queue_dropped(room_id, agent_id)
            await _write_audit(
                http_client, self._console_url, self._internal_token,
                event="matrix.queue_full",
                agent_id=agent_id, node_id=self._node_id,
                room_id=room_id, event_id=event_id,
                status="error", error_code="queue_full",
                data={"queue_max": self._queue_max, "sender": sender},
            )

    async def _try_enqueue_mixed(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        event: Dict[str, Any],
        room_id: str,
    ) -> None:
        """Enqueue a message from a mixed room, routing to the appropriate agent.

        A rejected route is logged, reported via ``on_route_rejected`` and
        audited as ``matrix.route.rejected``; for unknown-agent rejections an
        error reply is posted to the room when ``unknown_agent_behavior`` is
        ``"reply_error"``. A routed message then follows the same rate-limit,
        dedupe and enqueue-or-drop steps as single-agent rooms.
        """
        assert self._mixed_room_config is not None
        event_id = event.get("event_id", "")
        sender = event.get("sender", "")
        text = self._event_body(event)

        if not text:
            return

        # Route message to determine target agent
        agent_id, routing_reason, effective_text = route_message(
            text, room_id, self._mixed_room_config, self._room_map.allowed_agents,
            max_slash_len=self._max_slash_len,
        )

        if agent_id is None:
            # M2.2: routing rejected — audit + metric + optional error reply
            logger.warning(
                "Mixed room %s: routing rejected reason=%s event=%s",
                room_id, routing_reason, event_id,
            )
            if self._on_route_rejected:
                self._on_route_rejected(room_id, routing_reason)
            await _write_audit(
                http_client, self._console_url, self._internal_token,
                event="matrix.route.rejected",
                agent_id="unknown", node_id=self._node_id,
                room_id=room_id, event_id=event_id,
                status="error", error_code=routing_reason,
                data={"routing_reason": routing_reason, "sender": sender, "text_len": len(text)},
            )
            # M2.2: optional user-facing error reply in room
            if self._unknown_agent_behavior == "reply_error" and routing_reason == REASON_REJECTED_UNKNOWN_AGENT:
                available = self._mixed_room_config.agents_for_room(room_id)
                # Extract agent name from text (first slash token, if any);
                # `text` is already stripped and non-empty at this point.
                slash_token = text.split()[0].lstrip("/") if text.startswith("/") else ""
                label = f"`/{slash_token}`" if slash_token else "this command"
                error_msg = (
                    f"⚠️ Unknown agent {label}. "
                    f"Available in this room: {', '.join(available)}"
                )
                # Distinct transaction id so it cannot collide with a normal reply.
                txn_id = MatrixClient.make_txn_id(room_id, event_id + "_reject")
                try:
                    await client.send_text(room_id, error_msg, txn_id)
                except Exception as exc:
                    logger.warning("Could not send route-error reply: %s", exc)
            return

        # M2.2: successful route — fire metric callback
        if self._on_routed:
            self._on_routed(agent_id, routing_reason)

        # H1: Rate limit (uses final agent_id for metric tagging)
        if self._rate_limiter is not None:
            allowed, limit_type = self._rate_limiter.check(room_id=room_id, sender=sender)
            if not allowed:
                logger.warning(
                    "Rate limited (mixed): room=%s sender=%s agent=%s limit_type=%s",
                    room_id, sender, agent_id, limit_type,
                )
                if self._on_rate_limited:
                    self._on_rate_limited(room_id, agent_id, limit_type or "unknown")
                await _write_audit(
                    http_client, self._console_url, self._internal_token,
                    event="matrix.rate_limited",
                    agent_id=agent_id, node_id=self._node_id,
                    room_id=room_id, event_id=event_id,
                    status="error", error_code=f"rate_limit_{limit_type}",
                    data={"sender": sender, "limit_type": limit_type, "routing_reason": routing_reason},
                )
                return

        client.mark_seen(event_id)

        # Store effective_text (stripped of routing token) in a patched event copy
        effective_event = dict(event)
        effective_event["content"] = dict(event.get("content", {}))
        effective_event["content"]["body"] = effective_text

        entry = _QueueEntry(
            event=effective_event,
            room_id=room_id,
            agent_id=agent_id,
            enqueue_time=time.monotonic(),
            routing_reason=routing_reason,
            is_mixed=True,
        )
        try:
            queue.put_nowait(entry)
            qsize = queue.qsize()
            logger.debug(
                "Enqueued (mixed): event=%s agent=%s reason=%s qsize=%d",
                event_id, agent_id, routing_reason, qsize,
            )
            if self._on_queue_size:
                self._on_queue_size(qsize)
        except asyncio.QueueFull:
            logger.warning(
                "Queue full (max=%d): dropping mixed event=%s room=%s agent=%s",
                self._queue_max, event_id, room_id, agent_id,
            )
            if self._on_queue_dropped:
                self._on_queue_dropped(room_id, agent_id)
            await _write_audit(
                http_client, self._console_url, self._internal_token,
                event="matrix.queue_full",
                agent_id=agent_id, node_id=self._node_id,
                room_id=room_id, event_id=event_id,
                status="error", error_code="queue_full",
                data={"queue_max": self._queue_max, "sender": sender},
            )

    # ── Worker ─────────────────────────────────────────────────────────────────

    async def _worker(
        self,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        client: MatrixClient,
        http_client: httpx.AsyncClient,
    ) -> None:
        """Consume queue entries until cancelled."""
        while True:
            entry = await queue.get()  # blocks until item available; raises CancelledError on cancel
            try:
                await self._process_entry(client, http_client, entry)
            except Exception as exc:
                # Keep the worker alive, but record the traceback for debugging.
                logger.exception("Worker unhandled error: %s", exc)
            finally:
                # Always balance queue.get() so queue.join() can complete.
                queue.task_done()
                if self._on_queue_size:
                    self._on_queue_size(queue.qsize())

    # ── Process (invoke + send + audit) ───────────────────────────────────────

    async def _process_entry(
        self,
        client: MatrixClient,
        http_client: httpx.AsyncClient,
        entry: _QueueEntry,
    ) -> None:
        """Handle one dequeued entry: metrics, received-audit, then invoke + send.

        For mixed rooms the Router session is scoped to (room, agent) and the
        invoke + send step runs under the per-(room, agent) concurrency cap,
        unless the cap is disabled.
        """
        event = entry.event
        event_id = event.get("event_id", "")
        sender = event.get("sender", "")
        text = self._event_body(event)
        room_id = entry.room_id
        agent_id = entry.agent_id

        # H3: Queue wait latency
        wait_s = time.monotonic() - entry.enqueue_time
        if self._on_queue_wait:
            self._on_queue_wait(agent_id, wait_s)

        routing_reason = entry.routing_reason
        is_mixed = entry.is_mixed

        logger.info(
            "Processing: room=%s agent=%s event=%s len=%d wait=%.3fs mixed=%s reason=%s",
            room_id, agent_id, event_id, len(text), wait_s, is_mixed, routing_reason,
        )

        if self._on_message_received:
            self._on_message_received(room_id, agent_id)

        await _write_audit(
            http_client, self._console_url, self._internal_token,
            event="matrix.message.received",
            agent_id=agent_id, node_id=self._node_id,
            room_id=room_id, event_id=event_id,
            status="ok",
            data={
                "sender": sender,
                "text_len": len(text),
                "queue_wait_ms": int(wait_s * 1000),
                "routing_reason": routing_reason,
                "is_mixed": is_mixed,
            },
        )

        # M2.1: session isolation per (room, agent) for mixed rooms
        room_key = room_id.replace("!", "").replace(":", "_")
        if is_mixed:
            session_id = f"matrix:{room_key}:{agent_id}"
        else:
            session_id = f"matrix:{room_key}"

        # M2.2: per-room-agent concurrency cap (only for mixed rooms; single-agent rooms unaffected)
        lock = (
            self._get_concurrency_lock(room_id, agent_id)
            if is_mixed and self._mixed_concurrency_cap > 0
            else None
        )
        if lock is None:
            await self._invoke_and_send(
                client, http_client, entry, session_id, wait_s, is_mixed, routing_reason,
            )
        else:
            async with lock:
                await self._invoke_and_send(
                    client, http_client, entry, session_id, wait_s, is_mixed, routing_reason,
                )

    async def _invoke_and_send(
        self,
        client: MatrixClient,
        http_client: httpx.AsyncClient,
        entry: _QueueEntry,
        session_id: str,
        wait_s: float,
        is_mixed: bool,
        routing_reason: str,
    ) -> None:
        """Inner: invoke Router + send reply (separated for concurrency lock wrapping).

        Router failures are logged, reported via ``on_gateway_error`` and
        audited as ``matrix.error``; nothing is sent in that case or when the
        Router returns an empty reply. Send failures are handled the same way
        and additionally reported via ``on_message_replied(..., "error")``.
        """
        event = entry.event
        event_id = event.get("event_id", "")
        text = self._event_body(event)
        room_id = entry.room_id
        agent_id = entry.agent_id

        # H3: Invoke with latency
        t0 = time.monotonic()
        reply_text: Optional[str] = None
        invoke_ok = False
        invoke_duration_s = 0.0

        try:
            reply_text = await _invoke_router(
                http_client, self._router_url,
                agent_id=agent_id, node_id=self._node_id,
                prompt=text, session_id=session_id,
            )
            invoke_ok = True
            invoke_duration_s = time.monotonic() - t0
            if self._on_invoke_latency:
                self._on_invoke_latency(agent_id, invoke_duration_s)
            logger.info(
                "Invoke ok: agent=%s event=%s reply_len=%d duration=%dms",
                agent_id, event_id, len(reply_text or ""), int(invoke_duration_s * 1000),
            )

        except httpx.HTTPStatusError as exc:
            invoke_duration_s = time.monotonic() - t0
            logger.error(
                "Router HTTP %d agent=%s event=%s duration=%dms",
                exc.response.status_code, agent_id, event_id, int(invoke_duration_s * 1000),
            )
            if self._on_gateway_error:
                self._on_gateway_error(f"http_{exc.response.status_code}")
            await _write_audit(
                http_client, self._console_url, self._internal_token,
                event="matrix.error", agent_id=agent_id, node_id=self._node_id,
                room_id=room_id, event_id=event_id,
                status="error", error_code=f"router_http_{exc.response.status_code}",
                duration_ms=int(invoke_duration_s * 1000),
            )

        except (httpx.ConnectError, httpx.TimeoutException) as exc:
            invoke_duration_s = time.monotonic() - t0
            logger.error("Router network error agent=%s event=%s: %s", agent_id, event_id, exc)
            if self._on_gateway_error:
                self._on_gateway_error("network_error")
            await _write_audit(
                http_client, self._console_url, self._internal_token,
                event="matrix.error", agent_id=agent_id, node_id=self._node_id,
                room_id=room_id, event_id=event_id,
                status="error", error_code="router_network_error",
                duration_ms=int(invoke_duration_s * 1000),
            )

        except Exception as exc:
            invoke_duration_s = time.monotonic() - t0
            logger.error("Unexpected invoke error agent=%s event=%s: %s", agent_id, event_id, exc)
            if self._on_gateway_error:
                self._on_gateway_error("unexpected")
            await _write_audit(
                http_client, self._console_url, self._internal_token,
                event="matrix.error", agent_id=agent_id, node_id=self._node_id,
                room_id=room_id, event_id=event_id,
                status="error", error_code="router_unexpected",
                duration_ms=int(invoke_duration_s * 1000),
            )

        if not invoke_ok or not reply_text:
            if invoke_ok:
                logger.warning("Empty reply from router agent=%s event=%s", agent_id, event_id)
            return

        # H3: Send with latency
        # M2.1: prefix reply with agent identity in mixed rooms ("Sofiia: ...")
        prefix = reply_prefix(agent_id, is_mixed)
        # The prefix counts towards the limit; `truncated` reflects the real cut.
        send_text, truncated = self._build_reply(prefix, reply_text, _REPLY_TEXT_MAX)
        txn_id = MatrixClient.make_txn_id(room_id, event_id)
        send_t0 = time.monotonic()

        try:
            await client.send_text(room_id, send_text, txn_id)
            send_duration_s = time.monotonic() - send_t0
            if self._on_send_latency:
                self._on_send_latency(agent_id, send_duration_s)
            if self._on_message_replied:
                self._on_message_replied(room_id, agent_id, "ok")
            await _write_audit(
                http_client, self._console_url, self._internal_token,
                event="matrix.agent.replied", agent_id=agent_id, node_id=self._node_id,
                room_id=room_id, event_id=event_id, status="ok",
                duration_ms=int(send_duration_s * 1000),
                data={
                    "reply_len": len(send_text),
                    "truncated": truncated,
                    "router_duration_ms": int(invoke_duration_s * 1000),
                    "queue_wait_ms": int(wait_s * 1000),
                    "routing_reason": routing_reason,
                    "is_mixed": is_mixed,
                },
            )
            logger.info(
                "Reply sent: agent=%s event=%s reply_len=%d send_ms=%d",
                agent_id, event_id, len(send_text), int(send_duration_s * 1000),
            )

        except Exception as exc:
            send_duration_s = time.monotonic() - send_t0
            logger.error("Send failed agent=%s event=%s: %s", agent_id, event_id, exc)
            if self._on_message_replied:
                self._on_message_replied(room_id, agent_id, "error")
            if self._on_gateway_error:
                self._on_gateway_error("matrix_send_error")
            await _write_audit(
                http_client, self._console_url, self._internal_token,
                event="matrix.error", agent_id=agent_id, node_id=self._node_id,
                room_id=room_id, event_id=event_id,
                status="error", error_code="matrix_send_failed",
                duration_ms=int(send_duration_s * 1000),
            )
|