From fe6e3d30aed2ef5cc6c808f79c03e53470a79dec Mon Sep 17 00:00:00 2001 From: Apple Date: Thu, 5 Mar 2026 01:50:04 -0800 Subject: [PATCH] feat(matrix-bridge-dagi): add operator allowlist for control commands (M3.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New: app/control.py - ControlConfig: operator_allowlist + control_rooms (frozensets) - parse_control_config(): validates @user:server + !room:server formats, fail-fast - parse_command(): parses !verb subcommand [args] [key=value] up to 512 chars - check_authorization(): AND(is_control_room, is_operator) → (bool, reason) - Reply helpers: not_implemented, unknown_command, unauthorized, help - KNOWN_VERBS: runbook, status, help (M3.1+ stubs) - MAX_CMD_LEN=512, MAX_CMD_TOKENS=20 ingress.py: - _try_control(): dispatch for control rooms (authorized → audit + reply, unauthorized → audit + optional ⛔) - join control rooms on startup - _enqueue_from_sync: control rooms processed first, never forwarded to agents - on_control_command(sender, verb, subcommand) metric callback - CONTROL_UNAUTHORIZED_BEHAVIOR: "ignore" | "reply_error" Audit events: matrix.control.command — authorised command (verb, subcommand, args, kwargs) matrix.control.unauthorized — rejected by allowlist (reason: not_operator | not_control_room) matrix.control.unknown_cmd — authorised but unrecognised verb Config + main: - bridge_operator_allowlist, bridge_control_rooms, control_unauthorized_behavior - matrix_bridge_control_commands_total{sender,verb,subcommand} counter - /health: control_channel section (enabled, rooms_count, operators_count, behavior) - /bridge/mappings: control_rooms + control_operators_count - docker-compose: BRIDGE_OPERATOR_ALLOWLIST, BRIDGE_CONTROL_ROOMS, CONTROL_UNAUTHORIZED_BEHAVIOR Tests: 40 new → 148 total pass Made-with: Cursor --- docker-compose.matrix-bridge-node1.yml | 8 + services/matrix-bridge-dagi/app/config.py | 13 +- services/matrix-bridge-dagi/app/control.py | 263 +++++++++++ services/matrix-bridge-dagi/app/ingress.py | 137 +++++- services/matrix-bridge-dagi/app/main.py | 43 +- tests/test_matrix_bridge_control.py | 486 +++++++++++++++++++++ 6 files changed, 945 insertions(+), 5 deletions(-) create mode 100644 services/matrix-bridge-dagi/app/control.py create mode 100644 tests/test_matrix_bridge_control.py diff --git a/docker-compose.matrix-bridge-node1.yml b/docker-compose.matrix-bridge-node1.yml index 1267fe35..4cd401f3 100644 --- a/docker-compose.matrix-bridge-node1.yml +++ b/docker-compose.matrix-bridge-node1.yml @@ -60,6 +60,14 @@ services: # "!roomX:server=helion;!roomY:server=druid" - BRIDGE_MIXED_DEFAULTS=${BRIDGE_MIXED_DEFAULTS:-} + # ── M3.0: Operator control channel ────────────────────────────────── + # Comma-separated Matrix user IDs allowed to issue !commands + - BRIDGE_OPERATOR_ALLOWLIST=${BRIDGE_OPERATOR_ALLOWLIST:-} + # Comma-separated room IDs designated as ops control channels + - BRIDGE_CONTROL_ROOMS=${BRIDGE_CONTROL_ROOMS:-} + # "ignore" (silent) | "reply_error" (⛔ reply to unauthorised attempts) + - CONTROL_UNAUTHORIZED_BEHAVIOR=${CONTROL_UNAUTHORIZED_BEHAVIOR:-ignore} + # ── M2.2: Mixed room guard rails ──────────────────────────────────── # Fail-fast if any room defines more agents than this - MAX_AGENTS_PER_MIXED_ROOM=${MAX_AGENTS_PER_MIXED_ROOM:-5} diff --git a/services/matrix-bridge-dagi/app/config.py b/services/matrix-bridge-dagi/app/config.py index 5d2eb883..c418a72f 100644 --- a/services/matrix-bridge-dagi/app/config.py +++ b/services/matrix-bridge-dagi/app/config.py @@ -1,5 +1,5 @@ """ -matrix-bridge-dagi — configuration and validation (M2.1 + M2.2: mixed rooms + guard rails) +matrix-bridge-dagi — configuration and validation (M2.1 + M2.2 + M3.0) """ import os from dataclasses import dataclass, field @@ -46,6 +46,14 @@ class BridgeConfig: unknown_agent_behavior: str # "ignore" | "reply_error" mixed_concurrency_cap: int # max parallel invokes per (room, agent); 0 = unlimited + # M3.0: Operator control channel + # "@ivan:daarion.space,@sergiy:daarion.space" + bridge_operator_allowlist: str + # "!opsroom:server,!opsroom2:server2" + bridge_control_rooms: str + # "ignore" | "reply_error" (send ⛔ to room on unauthorized attempt) + control_unauthorized_behavior: str + # Service identity node_id: str build_sha: str @@ -88,6 +96,9 @@ def load_config() -> BridgeConfig: max_slash_len=max(4, int(_optional("MAX_SLASH_LEN", "32"))), unknown_agent_behavior=_optional("UNKNOWN_AGENT_BEHAVIOR", "ignore"), mixed_concurrency_cap=max(0, int(_optional("MIXED_CONCURRENCY_CAP", "1"))), + bridge_operator_allowlist=_optional("BRIDGE_OPERATOR_ALLOWLIST", ""), + bridge_control_rooms=_optional("BRIDGE_CONTROL_ROOMS", ""), + control_unauthorized_behavior=_optional("CONTROL_UNAUTHORIZED_BEHAVIOR", "ignore"), node_id=_optional("NODE_ID", "NODA1"), build_sha=_optional("BUILD_SHA", "dev"), build_time=_optional("BUILD_TIME", "local"), diff --git a/services/matrix-bridge-dagi/app/control.py b/services/matrix-bridge-dagi/app/control.py new file mode 100644 index 00000000..3495843b --- /dev/null +++ b/services/matrix-bridge-dagi/app/control.py @@ -0,0 +1,263 @@ +""" +Matrix Bridge — Control Command Layer (M3.0) + +Handles operator commands from designated control rooms. + +Access policy (AND): + 1. Message arrives in a BRIDGE_CONTROL_ROOM + 2. Sender is in BRIDGE_OPERATOR_ALLOWLIST + 3. Message starts with "!" prefix (e.g. "!runbook start ...") + +Design principles: + - Bridge is a TRANSPORT only — it never executes scripts directly. + - All actions go via sofiia-console internal API (M3.1+). + - Every command attempt is audited regardless of authorization. + - Unknown commands acknowledged but not executed (forward-compatible). + +Audit events emitted: + matrix.control.command — authorised command recognised + matrix.control.unauthorized — command from non-operator or wrong room + matrix.control.unknown_cmd — authorised but unrecognised verb +""" + +import logging +import re +from dataclasses import dataclass, field +from typing import Dict, FrozenSet, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +# ── Constants ───────────────────────────────────────────────────────────────── + +# Supported control verbs (M3.1+ will implement them fully) +VERB_RUNBOOK = "runbook" +VERB_STATUS = "status" +VERB_HELP = "help" + +KNOWN_VERBS: FrozenSet[str] = frozenset({VERB_RUNBOOK, VERB_STATUS, VERB_HELP}) + +# Max command line length to guard against garbage injection +_MAX_CMD_LEN = 512 +# Max number of tokens in a single command +_MAX_CMD_TOKENS = 20 + +# Matrix user ID format: @localpart:server +_MATRIX_USER_RE = re.compile(r"^@[A-Za-z0-9._\-/=+]+:[A-Za-z0-9.\-]+$") +# Room ID format: !localpart:server +_ROOM_ID_RE = re.compile(r"^![A-Za-z0-9\-_.]+:[A-Za-z0-9\-_.]+$") + + +# ── Data structures ──────────────────────────────────────────────────────────── + +@dataclass(frozen=True) +class ControlCommand: + """Parsed control command from a Matrix message.""" + verb: str # e.g. "runbook" + subcommand: str # e.g. "start", "next", "complete", "evidence", "status" + args: Tuple[str, ...] # remaining positional args + kwargs: Dict[str, str] # key=value pairs parsed from args (e.g. node=NODA1) + raw: str # original message text + is_known: bool # True if verb in KNOWN_VERBS + + @classmethod + def from_tokens(cls, tokens: List[str], raw: str) -> "ControlCommand": + """Build ControlCommand from pre-split tokens (first token must not include '!').""" + verb = tokens[0].lower() if tokens else "" + subcommand = tokens[1].lower() if len(tokens) > 1 else "" + remaining = tokens[2:] if len(tokens) > 2 else [] + + positional: List[str] = [] + kw: Dict[str, str] = {} + for token in remaining: + if "=" in token: + k, _, v = token.partition("=") + kw[k.lower().strip()] = v.strip() + else: + positional.append(token) + + return cls( + verb=verb, + subcommand=subcommand, + args=tuple(positional), + kwargs=kw, + raw=raw, + is_known=verb in KNOWN_VERBS, + ) + + +@dataclass +class ControlConfig: + """ + Parsed operator access policy for the control channel. + + operator_allowlist: Frozenset of Matrix user IDs allowed to issue commands. + control_rooms: Frozenset of room IDs designated as control channels. + """ + operator_allowlist: FrozenSet[str] = field(default_factory=frozenset) + control_rooms: FrozenSet[str] = field(default_factory=frozenset) + + @property + def is_enabled(self) -> bool: + """Control channel is effective only when both sets are non-empty.""" + return bool(self.operator_allowlist and self.control_rooms) + + +# ── Parsers ──────────────────────────────────────────────────────────────────── + +def parse_control_config( + raw_allowlist: str, + raw_control_rooms: str, +) -> ControlConfig: + """ + Parse BRIDGE_OPERATOR_ALLOWLIST and BRIDGE_CONTROL_ROOMS. + + Allowlist format: "@ivan:daarion.space,@sergiy:daarion.space" + Control rooms fmt: "!opsroom:server,!opsroom2:server2" + + Raises ValueError on: + - Malformed Matrix user ID + - Malformed room ID + """ + operators: List[str] = [] + errors: List[str] = [] + + for entry in raw_allowlist.split(","): + uid = entry.strip() + if not uid: + continue + if not _MATRIX_USER_RE.match(uid): + errors.append(f"Invalid operator user_id: {uid!r}") + else: + operators.append(uid) + + rooms: List[str] = [] + for entry in raw_control_rooms.split(","): + rid = entry.strip() + if not rid: + continue + if not _ROOM_ID_RE.match(rid): + errors.append(f"Invalid control room_id: {rid!r}") + else: + rooms.append(rid) + + if errors: + raise ValueError(f"Control config parse errors: {'; '.join(errors)}") + + cfg = ControlConfig( + operator_allowlist=frozenset(operators), + control_rooms=frozenset(rooms), + ) + if cfg.is_enabled: + logger.info( + "Control channel enabled: %d operators, %d rooms", + len(operators), len(rooms), + ) + else: + logger.info("Control channel disabled (empty allowlist or no control rooms)") + return cfg + + +# ── Message inspection ──────────────────────────────────────────────────────── + +def is_control_message(text: str) -> bool: + """Returns True if message looks like a control command (starts with '!').""" + return bool(text and text.strip().startswith("!")) + + +def is_control_room(room_id: str, config: ControlConfig) -> bool: + return room_id in config.control_rooms + + +def is_operator(sender: str, config: ControlConfig) -> bool: + return sender in config.operator_allowlist + + +def parse_command(text: str) -> Optional[ControlCommand]: + """ + Parse a control message into a ControlCommand. + Returns None if text is not a control command or is malformed/too long. + """ + stripped = text.strip() + if not stripped.startswith("!"): + return None + if len(stripped) > _MAX_CMD_LEN: + logger.warning("Control command too long (%d chars) — rejected", len(stripped)) + return None + + # Strip leading '!' + body = stripped[1:] + tokens = body.split() + if not tokens: + return None + if len(tokens) > _MAX_CMD_TOKENS: + logger.warning("Control command has too many tokens (%d) — rejected", len(tokens)) + return None + + return ControlCommand.from_tokens(tokens, raw=stripped) + + +# ── Authorization check ─────────────────────────────────────────────────────── + +def check_authorization( + sender: str, + room_id: str, + config: ControlConfig, +) -> Tuple[bool, str]: + """ + Returns (authorized: bool, rejection_reason: str). + + Reasons: + - "not_operator": sender not in allowlist + - "not_control_room": room not in control_rooms + - "ok": authorized + """ + if not is_control_room(room_id, config): + return False, "not_control_room" + if not is_operator(sender, config): + logger.warning( + "Unauthorized control attempt: sender=%s room=%s not in allowlist", + sender, room_id, + ) + return False, "not_operator" + return True, "ok" + + +# ── Reply helpers ───────────────────────────────────────────────────────────── + +def not_implemented_reply(cmd: ControlCommand) -> str: + """Reply for known commands not yet implemented (M3.0 stub).""" + return ( + f"✅ Command acknowledged: `{cmd.raw}`\n" + f"⏳ `!{cmd.verb} {cmd.subcommand}` — implementation pending (M3.1+)." + ) + + +def unknown_command_reply(cmd: ControlCommand) -> str: + """Reply for unrecognised verbs.""" + return ( + f"⚠️ Unknown command: `{cmd.raw}`\n" + f"Known verbs: {', '.join(sorted(KNOWN_VERBS))}.\n" + f"Type `!help` for usage." + ) + + +def unauthorized_reply(reason: str) -> str: + """Reply for unauthorized command attempts (sent only when behavior=reply_error).""" + if reason == "not_operator": + return "⛔ Not authorised: your Matrix ID is not in the operator allowlist." + return "⛔ Not authorised: this room is not a control channel." + + +def help_reply() -> str: + """Brief help text.""" + return ( + "**DAGI Bridge — Control Commands**\n\n" + "`!runbook start [node=NODA1]` — Start a runbook run\n" + "`!runbook next ` — Advance to next step\n" + "`!runbook complete step= status=ok` — Mark step complete\n" + "`!runbook evidence ` — Get evidence artifact path\n" + "`!runbook status ` — Show current run state\n" + "`!status` — Bridge health summary\n" + "`!help` — This message\n\n" + "_Only authorised operators can issue control commands._" + ) diff --git a/services/matrix-bridge-dagi/app/ingress.py b/services/matrix-bridge-dagi/app/ingress.py index 13eccee3..8659c49b 100644 --- a/services/matrix-bridge-dagi/app/ingress.py +++ b/services/matrix-bridge-dagi/app/ingress.py @@ -1,5 +1,5 @@ """ -Matrix Ingress + Egress Loop — Phase M1.4 + H1 + H2 + H3 + M2.1 + M2.2 (mixed rooms hardening) +Matrix Ingress + Egress Loop — Phase M1.4 + H1 + H2 + H3 + M2.1 + M2.2 + M3.0 (control channel) Architecture (H2): Reader task → asyncio.Queue(maxsize) → N Worker tasks @@ -33,6 +33,12 @@ from typing import Any, Callable, Dict, List, Optional import httpx +from .control import ( + ControlConfig, ControlCommand, + check_authorization, parse_command, is_control_message, + not_implemented_reply, unknown_command_reply, unauthorized_reply, help_reply, + VERB_HELP, +) from .matrix_client import MatrixClient from .mixed_routing import ( MixedRoomConfig, route_message, reply_prefix, @@ -178,6 +184,9 @@ class MatrixIngressLoop: unknown_agent_behavior: str = "ignore", # "ignore" | "reply_error" max_slash_len: int = 32, mixed_concurrency_cap: int = 1, # 0 = unlimited + # M3.0: control channel + control_config: Optional[ControlConfig] = None, + control_unauthorized_behavior: str = "ignore", # "ignore" | "reply_error" # Callbacks on_message_received: Optional[Callable[[str, str], None]] = None, on_message_replied: Optional[Callable[[str, str, str], None]] = None, @@ -190,6 +199,7 @@ class MatrixIngressLoop: on_queue_wait: Optional[Callable[[str, float], None]] = None, on_routed: Optional[Callable[[str, str], None]] = None, on_route_rejected: Optional[Callable[[str, str], None]] = None, + on_control_command: Optional[Callable[[str, str, str], None]] = None, ) -> None: self._hs_url = matrix_homeserver_url self._token = matrix_access_token @@ -214,11 +224,14 @@ class MatrixIngressLoop: self._on_send_latency = on_send_latency self._on_queue_wait = on_queue_wait self._mixed_room_config = mixed_room_config + self._control_config = control_config + self._control_unauthorized_behavior = control_unauthorized_behavior self._unknown_agent_behavior = unknown_agent_behavior self._max_slash_len = max_slash_len self._mixed_concurrency_cap = mixed_concurrency_cap self._on_routed = on_routed self._on_route_rejected = on_route_rejected + self._on_control_command = on_control_command # Lazily populated semaphores keyed by "{room_id}:{agent_id}" self._concurrency_locks: Dict[str, asyncio.Semaphore] = {} self._next_batch: Optional[str] = None @@ -281,6 +294,17 @@ class MatrixIngressLoop: await client.join_room(room_id) except Exception as exc: logger.warning("Could not join mixed room %s: %s", room_id, exc) + if self._control_config and self._control_config.is_enabled: + for room_id in self._control_config.control_rooms: + try: + await client.join_room(room_id) + except Exception as exc: + logger.warning("Could not join control room %s: %s", room_id, exc) + logger.info( + "Control channel: %d rooms, %d operators", + len(self._control_config.control_rooms), + len(self._control_config.operator_allowlist), + ) async with httpx.AsyncClient() as http_client: # Start workers @@ -355,6 +379,13 @@ class MatrixIngressLoop: http_client: httpx.AsyncClient, sync_resp: Dict[str, Any], ) -> None: + # M3.0: Control rooms — handled first, not forwarded to agents + if self._control_config and self._control_config.is_enabled: + for room_id in self._control_config.control_rooms: + messages = client.extract_room_messages(sync_resp, room_id) + for event in messages: + await self._try_control(client, http_client, event, room_id) + # Regular rooms: 1 room → 1 agent (M1 / M2.0) for mapping in self._room_map.mappings: if mapping.agent_id not in self._room_map.allowed_agents: @@ -559,6 +590,110 @@ class MatrixIngressLoop: data={"queue_max": self._queue_max, "sender": sender}, ) + # ── Control command handler ──────────────────────────────────────────────── + + async def _try_control( + self, + client: MatrixClient, + http_client: httpx.AsyncClient, + event: Dict[str, Any], + room_id: str, + ) -> None: + """ + Process a message from a control room. + + Non-command messages (not starting with '!') are silently ignored. + All command attempts are audited regardless of authorization. + """ + assert self._control_config is not None + event_id = event.get("event_id", "") + sender = event.get("sender", "") + text = event.get("content", {}).get("body", "").strip() + + if not text or not is_control_message(text): + return # not a command, ignore + + client.mark_seen(event_id) + + # Authorization check + authorized, rejection_reason = check_authorization(sender, room_id, self._control_config) + + if not authorized: + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.control.unauthorized", + agent_id="control", node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="error", error_code=rejection_reason, + data={"sender": sender, "command_preview": text[:80]}, + ) + logger.warning( + "Unauthorized control command: sender=%s room=%s reason=%s cmd=%r", + sender, room_id, rejection_reason, text[:60], + ) + if self._control_unauthorized_behavior == "reply_error": + try: + txn_id = MatrixClient.make_txn_id(room_id, event_id + "_unauth") + await client.send_text(room_id, unauthorized_reply(rejection_reason), txn_id) + except Exception as exc: + logger.warning("Could not send unauthorized reply: %s", exc) + return + + # Parse command + cmd = parse_command(text) + if cmd is None: + logger.warning("Control message from %s could not be parsed: %r", sender, text[:60]) + return + + # Metric callback + if self._on_control_command: + self._on_control_command(sender, cmd.verb, cmd.subcommand) + + # Audit every authorized command + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.control.command", + agent_id="control", node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="ok", + data={ + "sender": sender, + "verb": cmd.verb, + "subcommand": cmd.subcommand, + "args": list(cmd.args), + "kwargs": dict(cmd.kwargs), + "is_known": cmd.is_known, + }, + ) + + logger.info( + "Control command: sender=%s verb=%s sub=%s args=%s", + sender, cmd.verb, cmd.subcommand, cmd.args, + ) + + # Build reply + txn_id = MatrixClient.make_txn_id(room_id, event_id + "_ctrl") + if cmd.verb == VERB_HELP: + reply_text = help_reply() + elif not cmd.is_known: + reply_text = unknown_command_reply(cmd) + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.control.unknown_cmd", + agent_id="control", node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="error", error_code="unknown_verb", + data={"verb": cmd.verb, "sender": sender}, + ) + else: + # M3.1+ will implement actual runbook/status commands + reply_text = not_implemented_reply(cmd) + + try: + await client.send_text(room_id, reply_text, txn_id) + except Exception as exc: + logger.error("Could not send control reply: %s", exc) + # ── Worker ───────────────────────────────────────────────────────────────── async def _worker( diff --git a/services/matrix-bridge-dagi/app/main.py b/services/matrix-bridge-dagi/app/main.py index d7f746f2..a297931e 100644 --- a/services/matrix-bridge-dagi/app/main.py +++ b/services/matrix-bridge-dagi/app/main.py @@ -32,6 +32,7 @@ except ImportError: # pragma: no cover _PROM_OK = False from .config import BridgeConfig, load_config +from .control import ControlConfig, parse_control_config from .ingress import MatrixIngressLoop from .mixed_routing import MixedRoomConfig, parse_mixed_room_map from .rate_limit import InMemoryRateLimiter @@ -120,6 +121,12 @@ if _PROM_OK: "matrix_bridge_active_room_agent_locks", "Number of room-agent pairs currently holding a concurrency lock", ) + # M3.0: Control channel + _control_commands_total = Counter( + "matrix_bridge_control_commands_total", + "Total control commands received from authorized operators", + ["sender", "verb", "subcommand"], + ) # ── Startup state ───────────────────────────────────────────────────────────── _START_TIME = time.monotonic() @@ -129,6 +136,7 @@ _matrix_reachable: Optional[bool] = None _gateway_reachable: Optional[bool] = None _room_map: Optional[RoomMappingConfig] = None _mixed_room_config: Optional[MixedRoomConfig] = None +_control_config: Optional[ControlConfig] = None _rate_limiter: Optional[InMemoryRateLimiter] = None _ingress_loop: Optional["MatrixIngressLoop"] = None # for /health queue_size _ingress_task: Optional[asyncio.Task] = None @@ -150,7 +158,7 @@ async def _probe_url(url: str, timeout: float = 5.0) -> bool: @asynccontextmanager async def lifespan(app_: Any): global _cfg, _config_error, _matrix_reachable, _gateway_reachable - global _room_map, _mixed_room_config, _rate_limiter, _ingress_loop + global _room_map, _mixed_room_config, _control_config, _rate_limiter, _ingress_loop try: _cfg = load_config() @@ -186,13 +194,24 @@ async def lifespan(app_: Any): _cfg.rate_limit_room_rpm, _cfg.rate_limit_sender_rpm, ) + # M3.0: Operator control channel + if _cfg.bridge_operator_allowlist or _cfg.bridge_control_rooms: + _control_config = parse_control_config( + _cfg.bridge_operator_allowlist, + _cfg.bridge_control_rooms, + ) + else: + _control_config = None + mixed_count = _mixed_room_config.total_rooms if _mixed_room_config else 0 + ctrl_rooms = len(_control_config.control_rooms) if _control_config else 0 + ctrl_ops = len(_control_config.operator_allowlist) if _control_config else 0 logger.info( "✅ matrix-bridge-dagi started | node=%s build=%s homeserver=%s " - "agents=%s mappings=%d mixed_rooms=%d", + "agents=%s mappings=%d mixed_rooms=%d ctrl_rooms=%d ctrl_operators=%d", _cfg.node_id, _cfg.build_sha, _cfg.matrix_homeserver_url, list(_cfg.bridge_allowed_agents), - _room_map.total_mappings, mixed_count, + _room_map.total_mappings, mixed_count, ctrl_rooms, ctrl_ops, ) # Connectivity smoke probes (non-blocking failures) @@ -274,6 +293,13 @@ async def lifespan(app_: Any): if _PROM_OK: _route_rejected_total.labels(room_id=room_id, reason=reason).inc() + # M3.0 callbacks + def _on_control_command(sender: str, verb: str, subcommand: str) -> None: + if _PROM_OK: + _control_commands_total.labels( + sender=sender, verb=verb, subcommand=subcommand + ).inc() + ingress = MatrixIngressLoop( matrix_homeserver_url=_cfg.matrix_homeserver_url, matrix_access_token=_cfg.matrix_access_token, @@ -302,6 +328,9 @@ async def lifespan(app_: Any): on_queue_wait=_on_queue_wait, on_routed=_on_routed, on_route_rejected=_on_route_rejected, + control_config=_control_config, + control_unauthorized_behavior=_cfg.control_unauthorized_behavior, + on_control_command=_on_control_command, ) logger.info( "✅ Backpressure queue: max=%d workers=%d drain_timeout=%.1fs", @@ -400,6 +429,12 @@ async def health() -> Dict[str, Any]: "concurrency_cap": _cfg.mixed_concurrency_cap, "active_room_agent_locks": _ingress_loop.active_lock_count if _ingress_loop else 0, }, + "control_channel": { + "enabled": _control_config.is_enabled if _control_config else False, + "control_rooms_count": len(_control_config.control_rooms) if _control_config else 0, + "operators_count": len(_control_config.operator_allowlist) if _control_config else 0, + "unauthorized_behavior": _cfg.control_unauthorized_behavior, + }, } @@ -424,6 +459,8 @@ async def bridge_mappings() -> Dict[str, Any]: "mappings": _room_map.as_summary(), "mixed_rooms_total": _mixed_room_config.total_rooms if _mixed_room_config else 0, "mixed_rooms": _mixed_room_config.as_summary() if _mixed_room_config else [], + "control_rooms": sorted(_control_config.control_rooms) if _control_config else [], + "control_operators_count": len(_control_config.operator_allowlist) if _control_config else 0, } diff --git a/tests/test_matrix_bridge_control.py b/tests/test_matrix_bridge_control.py new file mode 100644 index 00000000..90dcb35e --- /dev/null +++ b/tests/test_matrix_bridge_control.py @@ -0,0 +1,486 @@ +""" +Tests for services/matrix-bridge-dagi/app/control.py and M3.0 ingress control handling. + +Covers: + - parse_control_config: valid, invalid user_ids, invalid room_ids + - ControlConfig.is_enabled: empty sets + - is_control_message / is_control_room / is_operator + - parse_command: valid, too long, too many tokens, no subcommand + - ControlCommand.from_tokens: verb, subcommand, positional, kwargs + - check_authorization: ok, not_operator, not_control_room + - Reply helpers: not_implemented, unknown_command, unauthorized, help + - Ingress _try_control: authorized command audited, unauthorized audited, non-cmd ignored + - on_control_command callback fires + - CONTROL_UNAUTHORIZED_BEHAVIOR=reply_error sends ⛔ message +""" + +import asyncio +import sys +from pathlib import Path +from typing import Any, Dict, List +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +_BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi" +if str(_BRIDGE) not in sys.path: + sys.path.insert(0, str(_BRIDGE)) + +from app.control import ( # noqa: E402 + ControlConfig, + ControlCommand, + parse_control_config, + parse_command, + is_control_message, + is_control_room, + is_operator, + check_authorization, + not_implemented_reply, + unknown_command_reply, + unauthorized_reply, + help_reply, + VERB_RUNBOOK, + VERB_HELP, + KNOWN_VERBS, +) +from app.ingress import MatrixIngressLoop # noqa: E402 +from app.room_mapping import parse_room_map # noqa: E402 + +# ── Constants ──────────────────────────────────────────────────────────────── + +CTRL_ROOM = "!opsroom:daarion.space" +CTRL_ROOM2 = "!opsroom2:daarion.space" +OP1 = "@ivan:daarion.space" +OP2 = "@sergiy:daarion.space" +NON_OP = "@stranger:matrix.org" +ALLOWED_AGENTS = frozenset({"sofiia"}) + + +def run(coro): + return asyncio.run(coro) + + +def _make_event(body: str, event_id: str = "e1", sender: str = OP1) -> Dict[str, Any]: + return { + "event_id": event_id, + "sender": sender, + "type": "m.room.message", + "content": {"msgtype": "m.text", "body": body}, + } + + +def _make_ctrl_config( + allowlist: str = f"{OP1},{OP2}", + rooms: str = CTRL_ROOM, +) -> ControlConfig: + return parse_control_config(allowlist, rooms) + + +def _make_ingress(ctrl_config=None) -> MatrixIngressLoop: + room_map = parse_room_map("", ALLOWED_AGENTS) + return MatrixIngressLoop( + matrix_homeserver_url="https://matrix.test", + matrix_access_token="tok", + matrix_user_id="@bridge:test", + router_url="http://router:8000", + node_id="test", + room_map=room_map, + control_config=ctrl_config or _make_ctrl_config(), + queue_max_events=10, + worker_concurrency=1, + ) + + +# ── parse_control_config ───────────────────────────────────────────────────── + +def test_parse_valid_allowlist_and_rooms(): + cfg = parse_control_config(f"{OP1},{OP2}", CTRL_ROOM) + assert OP1 in cfg.operator_allowlist + assert OP2 in cfg.operator_allowlist + assert CTRL_ROOM in cfg.control_rooms + + +def test_parse_multiple_control_rooms(): + cfg = parse_control_config(OP1, f"{CTRL_ROOM},{CTRL_ROOM2}") + assert len(cfg.control_rooms) == 2 + + +def test_parse_empty_strings_return_empty_config(): + cfg = parse_control_config("", "") + assert cfg.operator_allowlist == frozenset() + assert cfg.control_rooms == frozenset() + assert cfg.is_enabled is False + + +def test_parse_invalid_user_id_raises(): + with pytest.raises(ValueError, match="Invalid operator user_id"): + parse_control_config("not-a-user-id", CTRL_ROOM) + + +def test_parse_invalid_room_id_raises(): + with pytest.raises(ValueError, match="Invalid control room_id"): + parse_control_config(OP1, "not-a-room-id") + + +def test_parse_whitespace_stripped(): + cfg = parse_control_config(f" {OP1} , {OP2} ", f" {CTRL_ROOM} ") + assert OP1 in cfg.operator_allowlist + assert CTRL_ROOM in cfg.control_rooms + + +def test_is_enabled_false_when_no_operators(): + cfg = parse_control_config("", CTRL_ROOM) + assert cfg.is_enabled is False + + +def test_is_enabled_false_when_no_rooms(): + cfg = parse_control_config(OP1, "") + assert cfg.is_enabled is False + + +def test_is_enabled_true_when_both_set(): + cfg = _make_ctrl_config() + assert cfg.is_enabled is True + + +# ── is_control_message ─────────────────────────────────────────────────────── + +def test_is_control_message_bang_prefix(): + assert is_control_message("!runbook start test.md") is True + + +def test_is_control_message_help(): + assert is_control_message("!help") is True + + +def test_is_control_message_no_bang(): + assert is_control_message("/sofiia hello") is False + assert is_control_message("hello") is False + + +def test_is_control_message_empty(): + assert is_control_message("") is False + + +# ── is_operator / is_control_room ──────────────────────────────────────────── + +def test_is_operator_true(): + cfg = _make_ctrl_config() + assert is_operator(OP1, cfg) is True + + +def test_is_operator_false(): + cfg = _make_ctrl_config() + assert is_operator(NON_OP, cfg) is False + + +def test_is_control_room_true(): + cfg = _make_ctrl_config() + assert is_control_room(CTRL_ROOM, cfg) is True + + +def test_is_control_room_false(): + cfg = _make_ctrl_config() + assert is_control_room("!other:server", cfg) is False + + +# ── parse_command ───────────────────────────────────────────────────────────── + +def test_parse_runbook_start(): + cmd = parse_command("!runbook start path/to/runbook.md node=NODA1") + assert cmd is not None + assert cmd.verb == VERB_RUNBOOK + assert cmd.subcommand == "start" + assert "path/to/runbook.md" in cmd.args + assert cmd.kwargs.get("node") == "NODA1" + assert cmd.is_known is True + + +def test_parse_runbook_next(): + cmd = parse_command("!runbook next run-abc123") + assert cmd is not None + assert cmd.subcommand == "next" + assert "run-abc123" in cmd.args + + +def test_parse_help(): + cmd = parse_command("!help") + assert cmd is not None + assert cmd.verb == VERB_HELP + assert cmd.subcommand == "" + assert cmd.is_known is True + + +def test_parse_unknown_verb(): + cmd = parse_command("!frobnicate do-something") + assert cmd is not None + assert cmd.is_known is False + + +def test_parse_too_long_returns_none(): + long_msg = "!" + "x" * 600 + assert parse_command(long_msg) is None + + +def test_parse_non_command_returns_none(): + assert parse_command("regular text") is None + assert parse_command("/slash command") is None + + +def test_parse_only_bang_returns_none(): + assert parse_command("!") is None + assert parse_command("! ") is None + + +def test_parse_kwargs_extracted(): + cmd = parse_command("!runbook complete run-1 step=3 status=ok notes=done") + assert cmd is not None + assert cmd.kwargs["step"] == "3" + assert cmd.kwargs["status"] == "ok" + assert cmd.kwargs["notes"] == "done" + + +# ── check_authorization ─────────────────────────────────────────────────────── + +def test_check_authorization_ok(): + cfg = _make_ctrl_config() + auth, reason = check_authorization(OP1, CTRL_ROOM, cfg) + assert auth is True + assert reason == "ok" + + +def test_check_authorization_not_operator(): + cfg = _make_ctrl_config() + auth, reason = check_authorization(NON_OP, CTRL_ROOM, cfg) + assert auth is False + assert reason == "not_operator" + + +def test_check_authorization_not_control_room(): + cfg = _make_ctrl_config() + auth, reason = check_authorization(OP1, "!wrong:server", cfg) + assert auth is False + assert reason == "not_control_room" + + +# ── Reply helpers ───────────────────────────────────────────────────────────── + +def test_not_implemented_reply_contains_verb(): + cmd = parse_command("!runbook start test.md") + reply = not_implemented_reply(cmd) + assert "runbook" in reply + assert "M3.1" in reply + + +def test_unknown_command_reply_contains_known_verbs(): + cmd = parse_command("!frobnicate") + reply = unknown_command_reply(cmd) + assert "runbook" in reply or "status" in reply or "help" in reply + + +def test_unauthorized_reply_not_operator(): + assert "operator allowlist" in unauthorized_reply("not_operator") + + +def test_help_reply_lists_commands(): + reply = help_reply() + assert "!runbook" in reply + assert "!status" in reply + assert "!help" in reply + + +# ── Ingress _try_control integration ───────────────────────────────────────── + +def test_authorized_command_audited(): + """Authorised !runbook start → audit matrix.control.command written.""" + ingress = _make_ingress() + audit_events: List[str] = [] + + async def fake_audit(*args, **kwargs): + audit_events.append(kwargs.get("event", "")) + + fake_client = MagicMock() + fake_client.mark_seen = MagicMock() + fake_client.send_text = AsyncMock() + + event = _make_event("!runbook start test.md node=NODA1", event_id="ctrl1", sender=OP1) + + async def _run(): + with patch("app.ingress._write_audit", side_effect=fake_audit): + await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) + + run(_run()) + + assert "matrix.control.command" in audit_events + fake_client.send_text.assert_called_once() + # Reply should mention pending implementation + sent_text = fake_client.send_text.call_args[0][1] + assert "runbook" in sent_text.lower() + + +def test_unauthorized_command_audited_silent_by_default(): + """Non-operator !command → audit unauthorized, no reply (default ignore).""" + ingress = _make_ingress() + audit_events: List[str] = [] + + async def fake_audit(*args, **kwargs): + audit_events.append(kwargs.get("event", "")) + + fake_client = MagicMock() + fake_client.mark_seen = MagicMock() + fake_client.send_text = AsyncMock() + + event = _make_event("!runbook start test.md", event_id="ctrl2", sender=NON_OP) + + async def _run(): + with patch("app.ingress._write_audit", side_effect=fake_audit): + await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) + + run(_run()) + + assert "matrix.control.unauthorized" in audit_events + fake_client.send_text.assert_not_called() # silent + + +def test_unauthorized_reply_error_sends_message(): + """CONTROL_UNAUTHORIZED_BEHAVIOR=reply_error → ⛔ message sent.""" + ingress = _make_ingress() + ingress._control_unauthorized_behavior = "reply_error" + + sent_texts: List[str] = [] + fake_client = MagicMock() + fake_client.mark_seen = MagicMock() + fake_client.send_text = AsyncMock( + side_effect=lambda room, text, txn_id=None: sent_texts.append(text) + ) + + event = _make_event("!runbook start test.md", event_id="ctrl3", sender=NON_OP) + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) + + run(_run()) + + assert len(sent_texts) == 1 + assert "Not authorised" in sent_texts[0] or "⛔" in sent_texts[0] + + +def test_non_command_message_ignored(): + """Regular text in control room (no '!') → silently ignored.""" + ingress = _make_ingress() + fake_client = MagicMock() + fake_client.mark_seen = MagicMock() + fake_client.send_text = AsyncMock() + + event = _make_event("just chatting in ops room", event_id="ctrl4", sender=OP1) + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) + + run(_run()) + + fake_client.mark_seen.assert_not_called() + fake_client.send_text.assert_not_called() + + +def test_on_control_command_callback_fires(): + """on_control_command callback receives (sender, verb, subcommand).""" + ingress = _make_ingress() + ctrl_calls: List[tuple] = [] + ingress._on_control_command = lambda s, v, sc: ctrl_calls.append((s, v, sc)) + + fake_client = MagicMock() + fake_client.mark_seen = MagicMock() + fake_client.send_text = AsyncMock() + + event = _make_event("!runbook next run-abc123", event_id="ctrl5", sender=OP1) + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) + + run(_run()) + + assert len(ctrl_calls) == 1 + sender, verb, subcommand = ctrl_calls[0] + assert sender == OP1 + assert verb == "runbook" + assert subcommand == "next" + + +def test_help_command_returns_help_text(): + """!help → help_reply() sent.""" + ingress = _make_ingress() + sent_texts: List[str] = [] + + fake_client = MagicMock() + fake_client.mark_seen = MagicMock() + fake_client.send_text = AsyncMock( + side_effect=lambda room, text, txn_id=None: sent_texts.append(text) + ) + + event = _make_event("!help", event_id="ctrl6", sender=OP2) + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) + + run(_run()) + + assert len(sent_texts) == 1 + assert "!runbook" in sent_texts[0] + + +def test_unknown_command_verb_audited_and_replied(): + """Unknown verb → matrix.control.unknown_cmd audited + reply.""" + ingress = _make_ingress() + audit_events: List[str] = [] + + async def fake_audit(*args, **kwargs): + audit_events.append(kwargs.get("event", "")) + + sent_texts: List[str] = [] + fake_client = MagicMock() + fake_client.mark_seen = MagicMock() + fake_client.send_text = AsyncMock( + side_effect=lambda room, text, txn_id=None: sent_texts.append(text) + ) + + event = _make_event("!frobnicate do-stuff", event_id="ctrl7", sender=OP1) + + async def _run(): + with patch("app.ingress._write_audit", side_effect=fake_audit): + await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) + + run(_run()) + + assert "matrix.control.unknown_cmd" in audit_events + assert len(sent_texts) == 1 + assert "Unknown command" in sent_texts[0] or "unknown" in sent_texts[0].lower() + + +def test_control_room_events_not_forwarded_to_agents(): + """Messages from control room must NOT be enqueued as agent messages.""" + ingress = _make_ingress() + queue: asyncio.Queue = asyncio.Queue(maxsize=10) + ingress._queue = queue + + def extract(sync_resp, room_id): + if room_id == CTRL_ROOM: + return [_make_event("!runbook start test.md", event_id="fwd1", sender=OP1)] + return [] + + fake_client = MagicMock() + fake_client.extract_room_messages.side_effect = extract + fake_client.mark_seen = MagicMock() + fake_client.send_text = AsyncMock() + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(fake_client, queue, AsyncMock(), {}) + + run(_run()) + + assert queue.qsize() == 0 # control commands never enqueued as agent work