From a85a11984bf4a6f3effbe38d411d1a70fc3e016d Mon Sep 17 00:00:00 2001 From: Apple Date: Thu, 5 Mar 2026 01:29:18 -0800 Subject: [PATCH] feat(matrix-bridge-dagi): add mixed-room routing by slash/mention (M2.1) - mixed_routing.py: parse BRIDGE_MIXED_ROOM_MAP, route by /slash > @mention > name: > default - ingress.py: _try_enqueue_mixed for mixed rooms, session isolation {room}:{agent}, reply tagging - config.py: bridge_mixed_room_map + bridge_mixed_defaults fields - main.py: parse mixed config, pass to MatrixIngressLoop, expose in /health + /bridge/mappings - docker-compose: BRIDGE_MIXED_ROOM_MAP / BRIDGE_MIXED_DEFAULTS env vars, BRIDGE_ALLOWED_AGENTS multi-value - tests: 25 routing unit tests + 10 ingress integration tests (94 total pass) Made-with: Cursor --- docker-compose.matrix-bridge-node1.yml | 18 +- services/matrix-bridge-dagi/app/config.py | 10 +- services/matrix-bridge-dagi/app/ingress.py | 160 ++++++++- services/matrix-bridge-dagi/app/main.py | 38 +- .../matrix-bridge-dagi/app/mixed_routing.py | 279 +++++++++++++++ tests/test_matrix_bridge_mixed_ingress.py | 338 ++++++++++++++++++ tests/test_matrix_bridge_mixed_routing.py | 227 ++++++++++++ 7 files changed, 1049 insertions(+), 21 deletions(-) create mode 100644 services/matrix-bridge-dagi/app/mixed_routing.py create mode 100644 tests/test_matrix_bridge_mixed_ingress.py create mode 100644 tests/test_matrix_bridge_mixed_routing.py diff --git a/docker-compose.matrix-bridge-node1.yml b/docker-compose.matrix-bridge-node1.yml index 9b9cccf5..9239f9f4 100644 --- a/docker-compose.matrix-bridge-node1.yml +++ b/docker-compose.matrix-bridge-node1.yml @@ -1,4 +1,4 @@ -# Matrix Bridge DAGI — Phase M1 +# Matrix Bridge DAGI — Phase M2.1 (multi-room + mixed routing) # Include into the main NODA1 stack or run standalone: # docker compose -f docker-compose.node1.yml -f docker-compose.matrix-bridge-node1.yml up -d matrix-bridge-dagi @@ -40,12 +40,26 @@ services: - SOFIIA_CONSOLE_URL=http://dagi-sofiia-console-node1:8002 - SOFIIA_INTERNAL_TOKEN=${SOFIIA_INTERNAL_TOKEN:-} + # ── H2: Backpressure queue ─────────────────────────────────────────── + - QUEUE_MAX_EVENTS=100 + - WORKER_CONCURRENCY=2 + - QUEUE_DRAIN_TIMEOUT_S=5 + # ── Policy ─────────────────────────────────────────────────────────── - - BRIDGE_ALLOWED_AGENTS=sofiia + # M2.0+: multiple agents separated by comma + - BRIDGE_ALLOWED_AGENTS=${BRIDGE_ALLOWED_AGENTS:-sofiia} + # M2.0: "sofiia:!room1:server,helion:!room2:server" (1 room → 1 agent) - BRIDGE_ROOM_MAP=${BRIDGE_ROOM_MAP:-} - RATE_LIMIT_ROOM_RPM=20 - RATE_LIMIT_SENDER_RPM=10 + # ── M2.1: Mixed rooms (1 room → N agents) ─────────────────────────── + # Format: "!roomX:server=sofiia,helion;!roomY:server=druid" + - BRIDGE_MIXED_ROOM_MAP=${BRIDGE_MIXED_ROOM_MAP:-} + # Override default agent per mixed room (optional): + # "!roomX:server=helion;!roomY:server=druid" + - BRIDGE_MIXED_DEFAULTS=${BRIDGE_MIXED_DEFAULTS:-} + healthcheck: test: - "CMD" diff --git a/services/matrix-bridge-dagi/app/config.py b/services/matrix-bridge-dagi/app/config.py index 3efb39b9..c5267f15 100644 --- a/services/matrix-bridge-dagi/app/config.py +++ b/services/matrix-bridge-dagi/app/config.py @@ -1,5 +1,5 @@ """ -matrix-bridge-dagi — configuration and validation +matrix-bridge-dagi — configuration and validation (M2.1: mixed rooms) """ import os from dataclasses import dataclass, field @@ -34,6 +34,12 @@ class BridgeConfig: worker_concurrency: int # parallel invoke workers queue_drain_timeout_s: float # graceful shutdown drain timeout + # M2.1: Mixed rooms + # "!roomX:server=sofiia,helion;!roomY:server=druid" + bridge_mixed_room_map: str + # "!roomX:server=helion" — explicit default per mixed room (optional) + bridge_mixed_defaults: str + # Service identity node_id: str build_sha: str @@ -70,6 +76,8 @@ def load_config() -> BridgeConfig: queue_max_events=max(1, int(_optional("QUEUE_MAX_EVENTS", "100"))), worker_concurrency=max(1, int(_optional("WORKER_CONCURRENCY", "2"))), queue_drain_timeout_s=max(1.0, float(_optional("QUEUE_DRAIN_TIMEOUT_S", "5"))), + bridge_mixed_room_map=_optional("BRIDGE_MIXED_ROOM_MAP", ""), + bridge_mixed_defaults=_optional("BRIDGE_MIXED_DEFAULTS", ""), node_id=_optional("NODE_ID", "NODA1"), build_sha=_optional("BUILD_SHA", "dev"), build_time=_optional("BUILD_TIME", "local"), diff --git a/services/matrix-bridge-dagi/app/ingress.py b/services/matrix-bridge-dagi/app/ingress.py index ded027dc..e0d46419 100644 --- a/services/matrix-bridge-dagi/app/ingress.py +++ b/services/matrix-bridge-dagi/app/ingress.py @@ -1,5 +1,5 @@ """ -Matrix Ingress + Egress Loop — Phase M1.4 + H1 + H2 + H3 +Matrix Ingress + Egress Loop — Phase M1.4 + H1 + H2 + H3 + M2.1 (mixed rooms) Architecture (H2): Reader task → asyncio.Queue(maxsize) → N Worker tasks @@ -22,18 +22,19 @@ Shutdown: 2. queue.join() with drain_timeout → workers finish in-flight 3. worker tasks cancelled -Queue entry: _QueueEntry(event, room_id, agent_id, enqueue_time) +Queue entry: _QueueEntry(event, room_id, agent_id, enqueue_time, routing_reason, is_mixed) """ import asyncio import logging import time -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional import httpx from .matrix_client import MatrixClient +from .mixed_routing import MixedRoomConfig, route_message, reply_prefix from .rate_limit import InMemoryRateLimiter from .room_mapping import RoomMappingConfig, RoomMapping @@ -56,7 +57,9 @@ class _QueueEntry: event: Dict[str, Any] room_id: str agent_id: str - enqueue_time: float # time.monotonic() at enqueue + enqueue_time: float # time.monotonic() at enqueue + routing_reason: str = "direct" + is_mixed: bool = False # True for mixed-room entries (reply tagging, session isolation) # ── Router invoke ────────────────────────────────────────────────────────────── @@ -165,6 +168,7 @@ class MatrixIngressLoop: queue_max_events: int = 100, worker_concurrency: int = 2, queue_drain_timeout_s: float = 5.0, + mixed_room_config: Optional[MixedRoomConfig] = None, # Callbacks on_message_received: Optional[Callable[[str, str], None]] = None, on_message_replied: Optional[Callable[[str, str, str], None]] = None, @@ -198,6 +202,7 @@ class MatrixIngressLoop: self._on_invoke_latency = on_invoke_latency self._on_send_latency = on_send_latency self._on_queue_wait = on_queue_wait + self._mixed_room_config = mixed_room_config self._next_batch: Optional[str] = None self._queue: Optional[asyncio.Queue] = None # exposed for /health @@ -216,14 +221,15 @@ class MatrixIngressLoop: # ── Public run ───────────────────────────────────────────────────────────── async def run(self, stop_event: asyncio.Event) -> None: + mixed_rooms_count = self._mixed_room_config.total_rooms if self._mixed_room_config else 0 logger.info( - "Matrix ingress loop started | hs=%s node=%s mappings=%d " + "Matrix ingress loop started | hs=%s node=%s mappings=%d mixed_rooms=%d " "queue_max=%d workers=%d", - self._hs_url, self._node_id, self._room_map.total_mappings, + self._hs_url, self._node_id, self._room_map.total_mappings, mixed_rooms_count, self._queue_max, self._worker_count, ) - if self._room_map.total_mappings == 0: + if self._room_map.total_mappings == 0 and mixed_rooms_count == 0: logger.warning("No room mappings — ingress loop is idle") queue: asyncio.Queue[Optional[_QueueEntry]] = asyncio.Queue( @@ -238,6 +244,12 @@ class MatrixIngressLoop: await client.join_room(mapping.room_id) except Exception as exc: logger.warning("Could not join room %s: %s", mapping.room_id, exc) + if self._mixed_room_config: + for room_id in self._mixed_room_config.rooms: + try: + await client.join_room(room_id) + except Exception as exc: + logger.warning("Could not join mixed room %s: %s", room_id, exc) async with httpx.AsyncClient() as http_client: # Start workers @@ -312,6 +324,7 @@ class MatrixIngressLoop: http_client: httpx.AsyncClient, sync_resp: Dict[str, Any], ) -> None: + # Regular rooms: 1 room → 1 agent (M1 / M2.0) for mapping in self._room_map.mappings: if mapping.agent_id not in self._room_map.allowed_agents: continue @@ -319,6 +332,13 @@ class MatrixIngressLoop: for event in messages: await self._try_enqueue(client, queue, http_client, event, mapping) + # Mixed rooms: 1 room → N agents, routing per message (M2.1) + if self._mixed_room_config: + for room_id in self._mixed_room_config.rooms: + messages = client.extract_room_messages(sync_resp, room_id) + for event in messages: + await self._try_enqueue_mixed(client, queue, http_client, event, room_id) + async def _try_enqueue( self, client: MatrixClient, @@ -388,6 +408,103 @@ class MatrixIngressLoop: data={"queue_max": self._queue_max, "sender": sender}, ) + async def _try_enqueue_mixed( + self, + client: MatrixClient, + queue: "asyncio.Queue[Optional[_QueueEntry]]", + http_client: httpx.AsyncClient, + event: Dict[str, Any], + room_id: str, + ) -> None: + """Enqueue a message from a mixed room, routing to the appropriate agent.""" + assert self._mixed_room_config is not None + event_id = event.get("event_id", "") + sender = event.get("sender", "") + text = event.get("content", {}).get("body", "").strip() + + if not text: + return + + # Route message to determine target agent + agent_id, routing_reason, effective_text = route_message( + text, room_id, self._mixed_room_config, self._room_map.allowed_agents, + ) + + if agent_id is None: + logger.warning( + "Mixed room %s: unresolvable routing reason=%s event=%s — skipping", + room_id, routing_reason, event_id, + ) + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.error", + agent_id="unknown", node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="error", error_code="no_agent_for_message", + data={"routing_reason": routing_reason, "sender": sender}, + ) + return + + # H1: Rate limit (uses final agent_id for metric tagging) + if self._rate_limiter is not None: + allowed, limit_type = self._rate_limiter.check(room_id=room_id, sender=sender) + if not allowed: + logger.warning( + "Rate limited (mixed): room=%s sender=%s agent=%s limit_type=%s", + room_id, sender, agent_id, limit_type, + ) + if self._on_rate_limited: + self._on_rate_limited(room_id, agent_id, limit_type or "unknown") + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.rate_limited", + agent_id=agent_id, node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="error", error_code=f"rate_limit_{limit_type}", + data={"sender": sender, "limit_type": limit_type, "routing_reason": routing_reason}, + ) + return + + client.mark_seen(event_id) + + # Store effective_text (stripped of routing token) in a patched event copy + effective_event = dict(event) + effective_event["content"] = dict(event.get("content", {})) + effective_event["content"]["body"] = effective_text + + entry = _QueueEntry( + event=effective_event, + room_id=room_id, + agent_id=agent_id, + enqueue_time=time.monotonic(), + routing_reason=routing_reason, + is_mixed=True, + ) + try: + queue.put_nowait(entry) + qsize = queue.qsize() + logger.debug( + "Enqueued (mixed): event=%s agent=%s reason=%s qsize=%d", + event_id, agent_id, routing_reason, qsize, + ) + if self._on_queue_size: + self._on_queue_size(qsize) + except asyncio.QueueFull: + logger.warning( + "Queue full (max=%d): dropping mixed event=%s room=%s agent=%s", + self._queue_max, event_id, room_id, agent_id, + ) + if self._on_queue_dropped: + self._on_queue_dropped(room_id, agent_id) + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.queue_full", + agent_id=agent_id, node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="error", error_code="queue_full", + data={"queue_max": self._queue_max, "sender": sender}, + ) + # ── Worker ───────────────────────────────────────────────────────────────── async def _worker( @@ -428,9 +545,12 @@ class MatrixIngressLoop: if self._on_queue_wait: self._on_queue_wait(agent_id, wait_s) + routing_reason = entry.routing_reason + is_mixed = entry.is_mixed + logger.info( - "Processing: room=%s agent=%s event=%s len=%d wait=%.3fs", - room_id, agent_id, event_id, len(text), wait_s, + "Processing: room=%s agent=%s event=%s len=%d wait=%.3fs mixed=%s reason=%s", + room_id, agent_id, event_id, len(text), wait_s, is_mixed, routing_reason, ) if self._on_message_received: @@ -442,10 +562,21 @@ class MatrixIngressLoop: agent_id=agent_id, node_id=self._node_id, room_id=room_id, event_id=event_id, status="ok", - data={"sender": sender, "text_len": len(text), "queue_wait_ms": int(wait_s * 1000)}, + data={ + "sender": sender, + "text_len": len(text), + "queue_wait_ms": int(wait_s * 1000), + "routing_reason": routing_reason, + "is_mixed": is_mixed, + }, ) - session_id = f"matrix:{room_id.replace('!', '').replace(':', '_')}" + # M2.1: session isolation per (room, agent) for mixed rooms + room_key = room_id.replace("!", "").replace(":", "_") + if is_mixed: + session_id = f"matrix:{room_key}:{agent_id}" + else: + session_id = f"matrix:{room_key}" # H3: Invoke with latency t0 = time.monotonic() @@ -516,7 +647,10 @@ class MatrixIngressLoop: return # H3: Send with latency - send_text = reply_text[:_REPLY_TEXT_MAX] + # M2.1: prefix reply with agent identity in mixed rooms ("Sofiia: ...") + prefix = reply_prefix(agent_id, is_mixed) + raw_reply = reply_text[:_REPLY_TEXT_MAX - len(prefix)] + send_text = prefix + raw_reply txn_id = MatrixClient.make_txn_id(room_id, event_id) send_t0 = time.monotonic() @@ -537,6 +671,8 @@ class MatrixIngressLoop: "truncated": len(reply_text) > _REPLY_TEXT_MAX, "router_duration_ms": int(invoke_duration_s * 1000), "queue_wait_ms": int(wait_s * 1000), + "routing_reason": routing_reason, + "is_mixed": is_mixed, }, ) logger.info( diff --git a/services/matrix-bridge-dagi/app/main.py b/services/matrix-bridge-dagi/app/main.py index 65989be8..ac318bb0 100644 --- a/services/matrix-bridge-dagi/app/main.py +++ b/services/matrix-bridge-dagi/app/main.py @@ -33,6 +33,7 @@ except ImportError: # pragma: no cover from .config import BridgeConfig, load_config from .ingress import MatrixIngressLoop +from .mixed_routing import MixedRoomConfig, parse_mixed_room_map from .rate_limit import InMemoryRateLimiter from .room_mapping import RoomMappingConfig, parse_room_map @@ -112,6 +113,7 @@ _config_error: Optional[str] = None _matrix_reachable: Optional[bool] = None _gateway_reachable: Optional[bool] = None _room_map: Optional[RoomMappingConfig] = None +_mixed_room_config: Optional[MixedRoomConfig] = None _rate_limiter: Optional[InMemoryRateLimiter] = None _ingress_loop: Optional["MatrixIngressLoop"] = None # for /health queue_size _ingress_task: Optional[asyncio.Task] = None @@ -133,16 +135,31 @@ async def _probe_url(url: str, timeout: float = 5.0) -> bool: @asynccontextmanager async def lifespan(app_: Any): global _cfg, _config_error, _matrix_reachable, _gateway_reachable - global _room_map, _rate_limiter, _ingress_loop + global _room_map, _mixed_room_config, _rate_limiter, _ingress_loop try: _cfg = load_config() - # Parse room mapping + # Parse regular room mapping (M1/M2.0: 1 room → 1 agent) _room_map = parse_room_map( os.getenv("BRIDGE_ROOM_MAP", ""), _cfg.bridge_allowed_agents, ) + # Parse mixed room mapping (M2.1: 1 room → N agents) + if _cfg.bridge_mixed_room_map: + _mixed_room_config = parse_mixed_room_map( + _cfg.bridge_mixed_room_map, + _cfg.bridge_mixed_defaults, + _cfg.bridge_allowed_agents, + ) + logger.info( + "✅ Mixed room config: %d rooms, agents=%s", + _mixed_room_config.total_rooms, + [a for r in _mixed_room_config.rooms.values() for a in r.agents], + ) + else: + _mixed_room_config = None + # H1: Rate limiter (inmemory, per config) _rate_limiter = InMemoryRateLimiter( room_rpm=_cfg.rate_limit_room_rpm, @@ -153,12 +170,13 @@ async def lifespan(app_: Any): _cfg.rate_limit_room_rpm, _cfg.rate_limit_sender_rpm, ) + mixed_count = _mixed_room_config.total_rooms if _mixed_room_config else 0 logger.info( "✅ matrix-bridge-dagi started | node=%s build=%s homeserver=%s " - "room=%s agents=%s mappings=%d", + "agents=%s mappings=%d mixed_rooms=%d", _cfg.node_id, _cfg.build_sha, _cfg.matrix_homeserver_url, - _cfg.sofiia_room_id, list(_cfg.bridge_allowed_agents), - _room_map.total_mappings, + list(_cfg.bridge_allowed_agents), + _room_map.total_mappings, mixed_count, ) # Connectivity smoke probes (non-blocking failures) @@ -180,7 +198,10 @@ async def lifespan(app_: Any): _bridge_up.set(1) # Start ingress loop (fire-and-forget asyncio task) - if _room_map and _room_map.total_mappings > 0: + _has_rooms = (_room_map and _room_map.total_mappings > 0) or ( + _mixed_room_config and _mixed_room_config.total_rooms > 0 + ) + if _has_rooms: _ingress_stop = asyncio.Event() def _on_msg(room_id: str, agent_id: str) -> None: @@ -241,6 +262,7 @@ async def lifespan(app_: Any): queue_max_events=_cfg.queue_max_events, worker_concurrency=_cfg.worker_concurrency, queue_drain_timeout_s=_cfg.queue_drain_timeout_s, + mixed_room_config=_mixed_room_config, on_message_received=_on_msg, on_message_replied=_on_replied, on_gateway_error=_on_gw_error, @@ -328,6 +350,7 @@ async def health() -> Dict[str, Any]: "gateway": _cfg.dagi_gateway_url, "gateway_reachable": _gateway_reachable, "mappings_count": _room_map.total_mappings if _room_map else 0, + "mixed_rooms_count": _mixed_room_config.total_rooms if _mixed_room_config else 0, "config_ok": True, "rate_limiter": _rate_limiter.stats() if _rate_limiter else None, "queue": { @@ -350,12 +373,15 @@ async def bridge_mappings() -> Dict[str, Any]: "ok": False, "error": _config_error or "service not initialised", "mappings": [], + "mixed_rooms": [], } return { "ok": True, "total": _room_map.total_mappings, "allowed_agents": list(_cfg.bridge_allowed_agents), "mappings": _room_map.as_summary(), + "mixed_rooms_total": _mixed_room_config.total_rooms if _mixed_room_config else 0, + "mixed_rooms": _mixed_room_config.as_summary() if _mixed_room_config else [], } diff --git a/services/matrix-bridge-dagi/app/mixed_routing.py b/services/matrix-bridge-dagi/app/mixed_routing.py new file mode 100644 index 00000000..5c84a0c3 --- /dev/null +++ b/services/matrix-bridge-dagi/app/mixed_routing.py @@ -0,0 +1,279 @@ +""" +Mixed-Room Routing — Phase M2.1 + +Supports 1 room → N agents with deterministic message routing. + +Env: + BRIDGE_MIXED_ROOM_MAP=!roomX:server=sofiia,helion;!roomY:server=druid,nutra + BRIDGE_MIXED_DEFAULTS=!roomX:server=sofiia;!roomY:server=druid (optional) + +Routing priority (per message): + 1. Slash command: /sofiia message text → agent=sofiia + 2. Mention @: @sofiia message text → agent=sofiia + 3. Mention name: sofiia: message text → agent=sofiia + 4. Fallback: default_agent_for_room (first in list, or explicit BRIDGE_MIXED_DEFAULTS) + +Reply tagging (mixed room only): + Worker prepends "Agentname: " to reply so users see who answered. + Single-agent rooms are unaffected. +""" + +import logging +import re +from dataclasses import dataclass, field +from typing import Dict, FrozenSet, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +# Room ID format: !: +_ROOM_ID_RE = re.compile(r"^![A-Za-z0-9\-_.]+:[A-Za-z0-9\-_.]+$") + +# Routing patterns (compiled once) +_SLASH_RE = re.compile(r"^/([A-Za-z0-9_\-]+)\s*(.*)", re.DOTALL) +_MENTION_AT_RE = re.compile(r"^@([A-Za-z0-9_\-]+)\s*(.*)", re.DOTALL) +_MENTION_COLON_RE = re.compile(r"^([A-Za-z0-9_\-]+):\s+(.*)", re.DOTALL) + +# Routing reason labels +REASON_SLASH = "slash_command" +REASON_AT_MENTION = "at_mention" +REASON_COLON_MENTION = "colon_mention" +REASON_DEFAULT = "default" + + +# ── Data structures ──────────────────────────────────────────────────────────── + +@dataclass +class MixedRoom: + """A single mixed room with its ordered agent list and default agent.""" + room_id: str + agents: List[str] # ordered; first = default if not overridden + default_agent: str # explicit default (from BRIDGE_MIXED_DEFAULTS or first agent) + + def __post_init__(self) -> None: + if self.default_agent not in self.agents: + raise ValueError( + f"MixedRoom {self.room_id!r}: default_agent {self.default_agent!r} " + f"not in agents list {self.agents}" + ) + + +@dataclass +class MixedRoomConfig: + """Parsed configuration for all mixed rooms.""" + rooms: Dict[str, MixedRoom] = field(default_factory=dict) # room_id → MixedRoom + + @property + def total_rooms(self) -> int: + return len(self.rooms) + + def is_mixed(self, room_id: str) -> bool: + return room_id in self.rooms + + def agents_for_room(self, room_id: str) -> List[str]: + room = self.rooms.get(room_id) + return list(room.agents) if room else [] + + def default_agent(self, room_id: str) -> Optional[str]: + room = self.rooms.get(room_id) + return room.default_agent if room else None + + def as_summary(self) -> List[Dict]: + return [ + { + "room_id": room_id, + "agents": list(room.agents), + "default_agent": room.default_agent, + } + for room_id, room in self.rooms.items() + ] + + +# ── Parsers ──────────────────────────────────────────────────────────────────── + +def parse_mixed_room_map( + raw_map: str, + raw_defaults: str, + allowed_agents: FrozenSet[str], +) -> MixedRoomConfig: + """ + Parse BRIDGE_MIXED_ROOM_MAP and BRIDGE_MIXED_DEFAULTS into MixedRoomConfig. + + Map format: "!room1:server=sofiia,helion;!room2:server=druid" + Defaults fmt: "!room1:server=sofiia;!room2:server=druid" + + Raises ValueError on: + - Malformed room_id + - Empty agent list + - Agent not in allowed_agents + - Duplicate room_id in map + """ + if not raw_map or not raw_map.strip(): + return MixedRoomConfig() + + # Parse explicit defaults first + explicit_defaults: Dict[str, str] = {} + if raw_defaults and raw_defaults.strip(): + for entry in raw_defaults.split(";"): + entry = entry.strip() + if not entry: + continue + if "=" not in entry: + raise ValueError(f"BRIDGE_MIXED_DEFAULTS bad entry (no '='): {entry!r}") + rid, agent = entry.split("=", 1) + rid, agent = rid.strip(), agent.strip() + if not _ROOM_ID_RE.match(rid): + raise ValueError(f"BRIDGE_MIXED_DEFAULTS bad room_id: {rid!r}") + explicit_defaults[rid] = agent + + rooms: Dict[str, MixedRoom] = {} + errors: List[str] = [] + + for entry in raw_map.split(";"): + entry = entry.strip() + if not entry: + continue + + if "=" not in entry: + errors.append(f"BRIDGE_MIXED_ROOM_MAP bad entry (no '='): {entry!r}") + continue + + room_id, agents_raw = entry.split("=", 1) + room_id = room_id.strip() + agents_raw = agents_raw.strip() + + if not _ROOM_ID_RE.match(room_id): + errors.append(f"Invalid room_id format: {room_id!r}") + continue + + if room_id in rooms: + errors.append(f"Duplicate room_id in BRIDGE_MIXED_ROOM_MAP: {room_id!r}") + continue + + agents = [a.strip() for a in agents_raw.split(",") if a.strip()] + if not agents: + errors.append(f"Empty agent list for room {room_id!r}") + continue + + invalid = [a for a in agents if a not in allowed_agents] + if invalid: + errors.append( + f"Agents {invalid} for room {room_id!r} not in allowed_agents {set(allowed_agents)}" + ) + continue + + default = explicit_defaults.get(room_id, agents[0]) + if default not in agents: + errors.append( + f"Default agent {default!r} for room {room_id!r} not in agents list {agents}" + ) + continue + + rooms[room_id] = MixedRoom(room_id=room_id, agents=agents, default_agent=default) + + if errors: + raise ValueError(f"BRIDGE_MIXED_ROOM_MAP parse errors: {'; '.join(errors)}") + + config = MixedRoomConfig(rooms=rooms) + logger.info( + "Mixed room config loaded: %d rooms, total agents=%d", + config.total_rooms, + sum(len(r.agents) for r in rooms.values()), + ) + return config + + +# ── Routing ──────────────────────────────────────────────────────────────────── + +def route_message( + text: str, + room_id: str, + config: MixedRoomConfig, + allowed_agents: FrozenSet[str], +) -> Tuple[Optional[str], str, str]: + """ + Determine which agent should handle this message. + + Returns: + (agent_id, routing_reason, effective_text) + + agent_id: matched agent or None if unresolvable + routing_reason: one of REASON_* constants + effective_text: text with routing prefix stripped (for cleaner invoke) + + Priority: + 1. /agentname ... (slash command) + 2. @agentname ... (at-mention) + 3. agentname: ... (colon-mention) + 4. default agent for room (fallback) + """ + room = config.rooms.get(room_id) + if room is None: + return None, "no_mapping", text + + stripped = text.strip() + + # 1. Slash: /sofiia hello world + m = _SLASH_RE.match(stripped) + if m: + candidate = m.group(1).lower() + body = m.group(2).strip() or stripped # keep original if body empty + agent = _resolve_agent(candidate, room, allowed_agents) + if agent: + logger.debug("Slash route: /%s → %s", candidate, agent) + return agent, REASON_SLASH, body + # Unknown agent → return None + log; do not fall through to default + logger.warning( + "Slash command /%s in room %s: agent not recognised or not allowed", + candidate, room_id, + ) + return None, f"unknown_slash_{candidate}", text + + # 2. @mention: @sofiia hello + m = _MENTION_AT_RE.match(stripped) + if m: + candidate = m.group(1).lower() + body = m.group(2).strip() or stripped + agent = _resolve_agent(candidate, room, allowed_agents) + if agent: + logger.debug("@mention route: @%s → %s", candidate, agent) + return agent, REASON_AT_MENTION, body + + # 3. colon-mention: sofiia: hello + m = _MENTION_COLON_RE.match(stripped) + if m: + candidate = m.group(1).lower() + body = m.group(2).strip() or stripped + agent = _resolve_agent(candidate, room, allowed_agents) + if agent: + logger.debug("Colon-mention route: %s: → %s", candidate, agent) + return agent, REASON_COLON_MENTION, body + + # 4. Default fallback + return room.default_agent, REASON_DEFAULT, stripped + + +def _resolve_agent( + candidate: str, + room: MixedRoom, + allowed_agents: FrozenSet[str], +) -> Optional[str]: + """ + Return agent_id if candidate matches an allowed agent in this room. + Matching is case-insensitive against agent ids and their base names. + """ + for agent in room.agents: + if candidate == agent.lower(): + if agent in allowed_agents: + return agent + return None + + +def reply_prefix(agent_id: str, is_mixed: bool) -> str: + """ + Return reply prefix string for mixed rooms. + Single-agent rooms get empty prefix (no change to M1 behaviour). + """ + if not is_mixed: + return "" + # Capitalise first letter of agent name: "sofiia" → "Sofiia" + return f"{agent_id.capitalize()}: " diff --git a/tests/test_matrix_bridge_mixed_ingress.py b/tests/test_matrix_bridge_mixed_ingress.py new file mode 100644 index 00000000..b558a762 --- /dev/null +++ b/tests/test_matrix_bridge_mixed_ingress.py @@ -0,0 +1,338 @@ +""" +Tests for mixed-room routing in MatrixIngressLoop (M2.1). + +Covers: + - Slash command routes to correct agent in mixed room + - @mention routes to correct agent in mixed room + - Default fallback routes to first agent + - Unknown /slash returns no invoke + audit error + - Reply is prefixed with agent name in mixed room + - Session isolation: different agents get different session_ids + - Multi-room: regular room and mixed room coexist correctly + - Rate-limited message in mixed room is dropped + - Direct (single-agent) room reply has no prefix +""" + +import asyncio +import sys +from pathlib import Path +from typing import Any, Dict, List +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +_BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi" +if str(_BRIDGE) not in sys.path: + sys.path.insert(0, str(_BRIDGE)) + +from app.ingress import MatrixIngressLoop, _QueueEntry # noqa: E402 +from app.mixed_routing import parse_mixed_room_map # noqa: E402 +from app.room_mapping import parse_room_map # noqa: E402 + +# ── Constants ──────────────────────────────────────────────────────────────── + +ROOM_MIXED = "!mixedRoom:daarion.space" +ROOM_DIRECT = "!directRoom:daarion.space" +ALLOWED = frozenset({"sofiia", "helion", "druid"}) + + +# ── Helpers ────────────────────────────────────────────────────────────────── + +def run(coro): + return asyncio.run(coro) + + +def _make_event(body: str, event_id: str = "evt1", sender: str = "@user:test") -> Dict[str, Any]: + return { + "event_id": event_id, + "sender": sender, + "type": "m.room.message", + "content": {"msgtype": "m.text", "body": body}, + } + + +def _make_ingress( + mixed_raw: str = "", + direct_raw: str = "", + allowed: frozenset = ALLOWED, +) -> MatrixIngressLoop: + room_map = parse_room_map(direct_raw, allowed) if direct_raw else parse_room_map("", allowed) + mixed_cfg = parse_mixed_room_map(mixed_raw, "", allowed) if mixed_raw else None + return MatrixIngressLoop( + matrix_homeserver_url="https://matrix.test", + matrix_access_token="tok_test", + matrix_user_id="@bridge:test", + router_url="http://router:8000", + node_id="test_node", + room_map=room_map, + mixed_room_config=mixed_cfg, + queue_max_events=50, + worker_concurrency=1, + ) + + +def _fake_client(room_events: Dict[str, List[Dict[str, Any]]]) -> MagicMock: + """Return a mock MatrixClient that yields pre-set events per room.""" + c = MagicMock() + c.extract_room_messages.side_effect = lambda sync_resp, room_id: room_events.get(room_id, []) + c.mark_seen = MagicMock() + c.send_text = AsyncMock(return_value=None) + return c + + +# ── Tests ──────────────────────────────────────────────────────────────────── + +def test_slash_command_routes_to_helion(): + """/helion in mixed room → agent=helion, body stripped.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + client = _fake_client({ROOM_MIXED: [_make_event("/helion what is the weather?", event_id="e1")]}) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + assert queue.qsize() == 1 + entry: _QueueEntry = queue.get_nowait() + assert entry.agent_id == "helion" + assert entry.is_mixed is True + assert entry.routing_reason == "slash_command" + assert entry.event["content"]["body"] == "what is the weather?" + + +def test_at_mention_routes_to_sofiia(): + """@sofiia in mixed room → agent=sofiia.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + client = _fake_client({ROOM_MIXED: [_make_event("@sofiia check status", event_id="e2")]}) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + entry: _QueueEntry = queue.get_nowait() + assert entry.agent_id == "sofiia" + assert entry.routing_reason == "at_mention" + + +def test_colon_mention_routes_to_sofiia(): + """'sofiia: help' in mixed room → agent=sofiia.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + client = _fake_client({ROOM_MIXED: [_make_event("sofiia: can you help?", event_id="e3")]}) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + entry: _QueueEntry = queue.get_nowait() + assert entry.agent_id == "sofiia" + assert entry.routing_reason == "colon_mention" + + +def test_default_fallback_routes_to_first_agent(): + """Plain text with no routing token → default (first in list = helion).""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=helion,sofiia") + client = _fake_client({ROOM_MIXED: [_make_event("plain message", event_id="e4")]}) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + entry: _QueueEntry = queue.get_nowait() + assert entry.agent_id == "helion" + assert entry.routing_reason == "default" + + +def test_unknown_slash_not_enqueued_and_audited(): + """/unknownbot in mixed room → NOT enqueued, audit error written.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + client = _fake_client({ROOM_MIXED: [_make_event("/unknownbot hello", event_id="e5")]}) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + audit_calls: List[str] = [] + + async def fake_audit(*args, **kwargs): + audit_calls.append(kwargs.get("event", "")) + + async def _run(): + with patch("app.ingress._write_audit", side_effect=fake_audit): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + assert queue.qsize() == 0 + assert len(audit_calls) >= 1 + assert any("matrix" in e for e in audit_calls) + + +def test_reply_prefixed_with_agent_name_in_mixed_room(): + """Reply in mixed room must start with 'Helion: '.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + sent_texts: List[str] = [] + + async def fake_send(room_id, text, txn_id=None): + sent_texts.append(text) + + async def fake_invoke(http_client, router_url, agent_id, node_id, prompt, session_id): + return "The weather is sunny" + + entry = _QueueEntry( + event=_make_event("/helion weather", event_id="e6"), + room_id=ROOM_MIXED, + agent_id="helion", + enqueue_time=0.0, + routing_reason="slash_command", + is_mixed=True, + ) + fake_client = MagicMock() + fake_client.send_text = AsyncMock(side_effect=fake_send) + + async def _run(): + with patch("app.ingress._invoke_router", side_effect=fake_invoke), \ + patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._process_entry(fake_client, AsyncMock(), entry) + + run(_run()) + + assert len(sent_texts) == 1 + assert sent_texts[0].startswith("Helion: ") + assert "The weather is sunny" in sent_texts[0] + + +def test_session_isolation_per_agent(): + """Two agents in same mixed room must get different session_ids.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + sessions: List[str] = [] + + async def fake_invoke(http_client, router_url, agent_id, node_id, prompt, session_id): + sessions.append(session_id) + return f"reply from {agent_id}" + + entries = [ + _QueueEntry( + event=_make_event("msg", event_id="s1"), + room_id=ROOM_MIXED, agent_id="sofiia", + enqueue_time=0.0, routing_reason="default", is_mixed=True, + ), + _QueueEntry( + event=_make_event("msg", event_id="h1"), + room_id=ROOM_MIXED, agent_id="helion", + enqueue_time=0.0, routing_reason="slash_command", is_mixed=True, + ), + ] + fake_client = MagicMock() + fake_client.send_text = AsyncMock() + + async def _run(): + with patch("app.ingress._invoke_router", side_effect=fake_invoke), \ + patch("app.ingress._write_audit", new=AsyncMock()): + for e in entries: + await ingress._process_entry(fake_client, AsyncMock(), e) + + run(_run()) + + assert len(sessions) == 2 + assert sessions[0] != sessions[1], "Session IDs must differ per agent" + assert "sofiia" in sessions[0] + assert "helion" in sessions[1] + + +def test_direct_room_and_mixed_room_coexist(): + """Regular direct room and mixed room both processed in same sync.""" + ingress = _make_ingress( + direct_raw=f"druid:{ROOM_DIRECT}", + mixed_raw=f"{ROOM_MIXED}=sofiia,helion", + allowed=frozenset({"sofiia", "helion", "druid"}), + ) + client = _fake_client({ + ROOM_DIRECT: [_make_event("direct msg", event_id="d1")], + ROOM_MIXED: [_make_event("/helion mixed msg", event_id="m1")], + }) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + assert queue.qsize() == 2 + entries_got = [queue.get_nowait() for _ in range(2)] + agents = {e.agent_id for e in entries_got} + assert agents == {"druid", "helion"} + mixed_map = {e.agent_id: e.is_mixed for e in entries_got} + assert mixed_map["druid"] is False + assert mixed_map["helion"] is True + + +def test_rate_limited_mixed_room_event_dropped(): + """Rate-limited sender in mixed room: only first message passes.""" + from app.rate_limit import InMemoryRateLimiter + + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + ingress._rate_limiter = InMemoryRateLimiter(room_rpm=100, sender_rpm=1) + + events = [ + _make_event("hello", event_id=f"rl{i}", sender="@spammer:test") + for i in range(3) + ] + client = _fake_client({ROOM_MIXED: events}) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + dropped: List[str] = [] + ingress._on_rate_limited = lambda room, agent, kind: dropped.append(kind) + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + assert queue.qsize() == 1 # only first passes + assert len(dropped) == 2 # two dropped by rate limiter + + +def test_direct_room_reply_has_no_prefix(): + """Reply in single-agent (direct) room must NOT have a prefix.""" + ingress = _make_ingress(direct_raw=f"druid:{ROOM_DIRECT}", allowed=frozenset({"druid"})) + sent_texts: List[str] = [] + + async def fake_send(room_id, text, txn_id=None): + sent_texts.append(text) + + async def fake_invoke(http_client, router_url, agent_id, node_id, prompt, session_id): + return "direct reply no prefix" + + entry = _QueueEntry( + event=_make_event("hello", event_id="dr1"), + room_id=ROOM_DIRECT, agent_id="druid", + enqueue_time=0.0, routing_reason="direct", is_mixed=False, + ) + fake_client = MagicMock() + fake_client.send_text = AsyncMock(side_effect=fake_send) + + async def _run(): + with patch("app.ingress._invoke_router", side_effect=fake_invoke), \ + patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._process_entry(fake_client, AsyncMock(), entry) + + run(_run()) + + assert len(sent_texts) == 1 + assert sent_texts[0] == "direct reply no prefix" diff --git a/tests/test_matrix_bridge_mixed_routing.py b/tests/test_matrix_bridge_mixed_routing.py new file mode 100644 index 00000000..9bf3888f --- /dev/null +++ b/tests/test_matrix_bridge_mixed_routing.py @@ -0,0 +1,227 @@ +""" +Tests for services/matrix-bridge-dagi/app/mixed_routing.py + +Covers: + - parse_mixed_room_map: valid, errors, defaults + - route_message: slash, @mention, colon-mention, fallback, unknown agent + - reply_prefix: mixed vs single-agent rooms +""" + +import sys +from pathlib import Path + +import pytest + +_BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi" +if str(_BRIDGE) not in sys.path: + sys.path.insert(0, str(_BRIDGE)) + +from app.mixed_routing import ( # noqa: E402 + MixedRoomConfig, + MixedRoom, + parse_mixed_room_map, + route_message, + reply_prefix, + REASON_SLASH, + REASON_AT_MENTION, + REASON_COLON_MENTION, + REASON_DEFAULT, +) + +ROOM_X = "!roomX:daarion.space" +ROOM_Y = "!roomY:daarion.space" +ALLOWED = frozenset({"sofiia", "helion", "druid", "nutra"}) + + +# ── Parsing ──────────────────────────────────────────────────────────────────── + +def test_parse_single_mixed_room(): + raw = f"{ROOM_X}=sofiia,helion" + cfg = parse_mixed_room_map(raw, "", ALLOWED) + assert cfg.total_rooms == 1 + assert cfg.agents_for_room(ROOM_X) == ["sofiia", "helion"] + assert cfg.default_agent(ROOM_X) == "sofiia" # first in list + + +def test_parse_two_mixed_rooms(): + raw = f"{ROOM_X}=sofiia,helion;{ROOM_Y}=druid,nutra" + cfg = parse_mixed_room_map(raw, "", ALLOWED) + assert cfg.total_rooms == 2 + assert cfg.agents_for_room(ROOM_Y) == ["druid", "nutra"] + assert cfg.default_agent(ROOM_Y) == "druid" + + +def test_parse_explicit_default(): + raw = f"{ROOM_X}=sofiia,helion" + defaults = f"{ROOM_X}=helion" + cfg = parse_mixed_room_map(raw, defaults, ALLOWED) + assert cfg.default_agent(ROOM_X) == "helion" + + +def test_parse_explicit_default_not_in_agents_raises(): + raw = f"{ROOM_X}=sofiia,helion" + defaults = f"{ROOM_X}=druid" # druid not in agents for ROOM_X + with pytest.raises(ValueError, match="Default agent"): + parse_mixed_room_map(raw, defaults, ALLOWED) + + +def test_parse_duplicate_room_raises(): + raw = f"{ROOM_X}=sofiia;{ROOM_X}=helion" + with pytest.raises(ValueError, match="Duplicate room_id"): + parse_mixed_room_map(raw, "", ALLOWED) + + +def test_parse_unknown_agent_raises(): + raw = f"{ROOM_X}=sofiia,unknown_bot" + with pytest.raises(ValueError, match="not in allowed_agents"): + parse_mixed_room_map(raw, "", ALLOWED) + + +def test_parse_bad_room_id_raises(): + raw = "not-a-room-id=sofiia" + with pytest.raises(ValueError, match="Invalid room_id"): + parse_mixed_room_map(raw, "", ALLOWED) + + +def test_parse_empty_map_returns_empty(): + cfg = parse_mixed_room_map("", "", ALLOWED) + assert cfg.total_rooms == 0 + + +def test_parse_semicolons_with_spaces(): + raw = f" {ROOM_X}=sofiia,helion ; {ROOM_Y}=druid " + cfg = parse_mixed_room_map(raw, "", ALLOWED) + assert cfg.total_rooms == 2 + + +def test_is_mixed_true_false(): + raw = f"{ROOM_X}=sofiia,helion" + cfg = parse_mixed_room_map(raw, "", ALLOWED) + assert cfg.is_mixed(ROOM_X) is True + assert cfg.is_mixed(ROOM_Y) is False + + +def test_as_summary_shape(): + raw = f"{ROOM_X}=sofiia,helion;{ROOM_Y}=druid" + cfg = parse_mixed_room_map(raw, "", ALLOWED) + summary = cfg.as_summary() + assert len(summary) == 2 + for entry in summary: + assert "room_id" in entry + assert "agents" in entry + assert "default_agent" in entry + + +# ── Routing — slash command ──────────────────────────────────────────────────── + +def _make_cfg(room_id: str = ROOM_X, agents=("sofiia", "helion")) -> MixedRoomConfig: + raw = f"{room_id}={','.join(agents)}" + return parse_mixed_room_map(raw, "", frozenset(agents)) + + +def test_slash_routes_to_correct_agent(): + cfg = _make_cfg() + agent, reason, body = route_message("/helion tell me the weather", ROOM_X, cfg, frozenset({"sofiia", "helion"})) + assert agent == "helion" + assert reason == REASON_SLASH + assert body == "tell me the weather" + + +def test_slash_case_insensitive(): + cfg = _make_cfg() + agent, reason, _ = route_message("/Sofiia hello", ROOM_X, cfg, frozenset({"sofiia", "helion"})) + assert agent == "sofiia" + assert reason == REASON_SLASH + + +def test_slash_empty_body_keeps_original(): + cfg = _make_cfg() + agent, reason, body = route_message("/helion", ROOM_X, cfg, frozenset({"sofiia", "helion"})) + assert agent == "helion" + # body fallback: original text + assert "/helion" in body + + +def test_slash_unknown_agent_returns_none(): + cfg = _make_cfg() + agent, reason, _ = route_message("/druid hello", ROOM_X, cfg, frozenset({"sofiia", "helion"})) + assert agent is None + assert "unknown_slash_druid" in reason + + +# ── Routing — @mention ──────────────────────────────────────────────────────── + +def test_at_mention_routes_correctly(): + cfg = _make_cfg() + agent, reason, body = route_message("@sofiia what is the status?", ROOM_X, cfg, frozenset({"sofiia", "helion"})) + assert agent == "sofiia" + assert reason == REASON_AT_MENTION + assert body == "what is the status?" + + +def test_at_mention_unknown_falls_through_to_default(): + cfg = _make_cfg() + # @unknown_bot — not in agents → falls through to colon check, then default + agent, reason, _ = route_message("@unknown_bot hello", ROOM_X, cfg, frozenset({"sofiia", "helion"})) + assert agent == "sofiia" # default + assert reason == REASON_DEFAULT + + +# ── Routing — colon mention ─────────────────────────────────────────────────── + +def test_colon_mention_routes_correctly(): + cfg = _make_cfg() + agent, reason, body = route_message("sofiia: can you help?", ROOM_X, cfg, frozenset({"sofiia", "helion"})) + assert agent == "sofiia" + assert reason == REASON_COLON_MENTION + assert body == "can you help?" + + +def test_colon_mention_unknown_falls_to_default(): + cfg = _make_cfg() + agent, reason, _ = route_message("druid: hello", ROOM_X, cfg, frozenset({"sofiia", "helion"})) + assert agent == "sofiia" + assert reason == REASON_DEFAULT + + +# ── Routing — priority order ────────────────────────────────────────────────── + +def test_slash_beats_at_mention(): + """If text starts with slash, it should be slash-routed even if it also mentions @.""" + cfg = _make_cfg() + agent, reason, _ = route_message("/helion @sofiia hello", ROOM_X, cfg, frozenset({"sofiia", "helion"})) + assert reason == REASON_SLASH + assert agent == "helion" + + +# ── Routing — default fallback ──────────────────────────────────────────────── + +def test_plain_message_routes_to_default(): + cfg = _make_cfg() + agent, reason, body = route_message("plain message no routing token", ROOM_X, cfg, frozenset({"sofiia", "helion"})) + assert agent == "sofiia" + assert reason == REASON_DEFAULT + assert body == "plain message no routing token" + + +def test_no_mapping_for_room_returns_none(): + cfg = _make_cfg(room_id=ROOM_X) + agent, reason, _ = route_message("hello", ROOM_Y, cfg, ALLOWED) # ROOM_Y not in config + assert agent is None + assert reason == "no_mapping" + + +# ── Reply prefix ────────────────────────────────────────────────────────────── + +def test_reply_prefix_mixed_room(): + assert reply_prefix("sofiia", is_mixed=True) == "Sofiia: " + assert reply_prefix("helion", is_mixed=True) == "Helion: " + + +def test_reply_prefix_single_room_empty(): + assert reply_prefix("sofiia", is_mixed=False) == "" + + +def test_reply_prefix_capitalises_first_letter(): + assert reply_prefix("druid", is_mixed=True) == "Druid: " + assert reply_prefix("NUTRA", is_mixed=True) == "Nutra: " # capitalize() normalises case