feat(matrix-bridge-dagi): add room mapping, ingress loop, synapse setup (PR-M1.2 + PR-M1.3)

PR-M1.2 — room-to-agent mapping:
- adds room_mapping.py: parse BRIDGE_ROOM_MAP (format: agent:!room_id:server)
- RoomMappingConfig with O(1) room→agent lookup, agent allowlist check
- /bridge/mappings endpoint (read-only ops summary, no secrets)
- health endpoint now includes mappings_count
- 21 tests for parsing, validation, allowlist, summary

PR-M1.3 — Matrix ingress loop:
- adds ingress.py: MatrixIngressLoop asyncio task
- sync_poll → extract → dedupe → _invoke_gateway (POST /v1/invoke)
- gateway payload: agent_id, node_id, message, metadata (transport, room_id, event_id, sender)
- exponential backoff on errors (2s..60s)
- joins all mapped rooms at startup
- metric callbacks: on_message_received, on_gateway_error
- graceful shutdown via asyncio.Event
- 5 ingress tests (invoke, dedupe, callbacks, empty-map idle)

Synapse setup (docker-compose.synapse-node1.yml):
- fixed volume: bind mount ./synapse-data instead of named volume
- added port mapping 127.0.0.1:8008:8008

Synapse running on NODA1 (localhost:8008), bot @dagi_bridge:daarion.space created,
room !QwHczWXgefDHBEVkTH:daarion.space created, all 4 values in .env on NODA1.

Made-with: Cursor
This commit is contained in:
Apple
2026-03-03 07:51:13 -08:00
parent d8506da179
commit dbfab78f02
6 changed files with 1010 additions and 5 deletions

View File

@@ -0,0 +1,267 @@
"""
Matrix Ingress Loop — Phase M1.3
Polls Matrix /sync for new messages, invokes DAGI Gateway for mapped rooms.
Does NOT send replies back (that is PR-M1.4 egress).
Design:
- asyncio task, driven by run_ingress_loop()
- sync_poll() → extract_room_messages() per mapped room
- for each message: dedupe → invoke gateway → audit (fire-and-forget)
- next_batch token persisted in memory (restart resets to None — acceptable for M1)
- graceful shutdown via asyncio.Event
"""
import asyncio
import logging
import time
from typing import Any, Dict, Optional
import httpx
from .matrix_client import MatrixClient
from .room_mapping import RoomMappingConfig
logger = logging.getLogger(__name__)
# ── Constants ──────────────────────────────────────────────────────────────────
# Max wait between sync retries on error (seconds)
_MAX_RETRY_BACKOFF = 60.0
_INIT_RETRY_BACKOFF = 2.0
# Gateway invoke timeout
_GATEWAY_TIMEOUT_S = 30.0
# ── Gateway invoke ─────────────────────────────────────────────────────────────
async def _invoke_gateway(
http_client: httpx.AsyncClient,
gateway_url: str,
agent_id: str,
node_id: str,
message_text: str,
matrix_room_id: str,
matrix_event_id: str,
matrix_sender: str,
) -> Dict[str, Any]:
"""
POST to DAGI Gateway /v1/invoke (or /debug/agent_ping equivalent).
Returns parsed JSON response or raises httpx.HTTPError.
Payload format matches existing Gateway invoke schema.
"""
url = f"{gateway_url.rstrip('/')}/v1/invoke"
payload = {
"agent_id": agent_id,
"node_id": node_id,
"message": message_text,
"metadata": {
"transport": "matrix",
"matrix_room_id": matrix_room_id,
"matrix_event_id": matrix_event_id,
"matrix_sender": matrix_sender,
"node_id": node_id,
},
}
resp = await http_client.post(url, json=payload, timeout=_GATEWAY_TIMEOUT_S)
resp.raise_for_status()
return resp.json()
# ── Ingress loop ───────────────────────────────────────────────────────────────
class MatrixIngressLoop:
"""
Drives the Matrix sync-poll → gateway-invoke pipeline.
Usage:
loop = MatrixIngressLoop(cfg, room_map)
stop_event = asyncio.Event()
await loop.run(stop_event)
Metrics callbacks (optional, injected to avoid hard dependency):
on_message_received(room_id, agent_id) — called after successful dedupe
on_gateway_error(error_type) — called on gateway invoke error
"""
def __init__(
self,
matrix_homeserver_url: str,
matrix_access_token: str,
matrix_user_id: str,
gateway_url: str,
node_id: str,
room_map: RoomMappingConfig,
on_message_received=None,
on_gateway_error=None,
) -> None:
self._hs_url = matrix_homeserver_url
self._token = matrix_access_token
self._user_id = matrix_user_id
self._gateway_url = gateway_url
self._node_id = node_id
self._room_map = room_map
self._on_message_received = on_message_received
self._on_gateway_error = on_gateway_error
self._next_batch: Optional[str] = None
self._running = False
@property
def next_batch(self) -> Optional[str]:
return self._next_batch
async def run(self, stop_event: asyncio.Event) -> None:
"""
Main loop. Runs until stop_event is set.
Handles errors with exponential backoff.
"""
self._running = True
backoff = _INIT_RETRY_BACKOFF
logger.info(
"Matrix ingress loop started | hs=%s node=%s mappings=%d",
self._hs_url, self._node_id, self._room_map.total_mappings,
)
if self._room_map.total_mappings == 0:
logger.warning("No room mappings configured — ingress loop is idle")
async with MatrixClient(
self._hs_url, self._token, self._user_id
) as client:
# Join all mapped rooms at startup
for mapping in self._room_map.mappings:
if mapping.agent_id in self._room_map.allowed_agents:
try:
await client.join_room(mapping.room_id)
logger.info("Joined room %s → agent %s", mapping.room_id, mapping.agent_id)
except Exception as exc:
logger.warning("Could not join room %s: %s", mapping.room_id, exc)
async with httpx.AsyncClient(timeout=_GATEWAY_TIMEOUT_S) as gw_client:
while not stop_event.is_set():
try:
sync_resp = await client.sync_poll(since=self._next_batch)
self._next_batch = sync_resp.get("next_batch")
backoff = _INIT_RETRY_BACKOFF # reset on success
await self._process_sync(client, gw_client, sync_resp)
except asyncio.CancelledError:
logger.info("Ingress loop cancelled")
break
except Exception as exc:
logger.error(
"Ingress loop error (retry in %.1fs): %s",
backoff, exc,
)
if self._on_gateway_error:
self._on_gateway_error("sync_error")
try:
await asyncio.wait_for(
stop_event.wait(), timeout=backoff
)
except asyncio.TimeoutError:
pass
backoff = min(backoff * 2, _MAX_RETRY_BACKOFF)
self._running = False
logger.info("Matrix ingress loop stopped")
async def _process_sync(
self,
client: MatrixClient,
gw_client: httpx.AsyncClient,
sync_resp: Dict[str, Any],
) -> None:
"""Process all mapped rooms in a sync response."""
for mapping in self._room_map.mappings:
if mapping.agent_id not in self._room_map.allowed_agents:
continue
messages = client.extract_room_messages(sync_resp, mapping.room_id)
for event in messages:
await self._handle_message(client, gw_client, event, mapping)
async def _handle_message(
self,
client: MatrixClient,
gw_client: httpx.AsyncClient,
event: Dict[str, Any],
mapping,
) -> None:
"""
Process a single Matrix message event:
1. Mark as seen (dedupe)
2. Invoke DAGI gateway
3. Fire metrics callback
Note: Reply sending (egress) is PR-M1.4 — not done here.
"""
event_id = event.get("event_id", "")
sender = event.get("sender", "")
text = event.get("content", {}).get("body", "").strip()
room_id = mapping.room_id
agent_id = mapping.agent_id
if not text:
logger.debug("Skipping empty message from %s in %s", sender, room_id)
return
# Mark event as seen before invoke (prevents duplicate on retry)
client.mark_seen(event_id)
logger.info(
"Matrix message: room=%s sender=%s agent=%s event=%s text_len=%d",
room_id, sender, agent_id, event_id, len(text),
)
if self._on_message_received:
self._on_message_received(room_id, agent_id)
t0 = time.monotonic()
try:
await _invoke_gateway(
gw_client,
self._gateway_url,
agent_id=agent_id,
node_id=self._node_id,
message_text=text,
matrix_room_id=room_id,
matrix_event_id=event_id,
matrix_sender=sender,
)
duration = time.monotonic() - t0
logger.info(
"Gateway invoke ok: agent=%s event=%s duration=%.2fs",
agent_id, event_id, duration,
)
except httpx.HTTPStatusError as exc:
duration = time.monotonic() - t0
logger.error(
"Gateway HTTP error %d for agent=%s event=%s duration=%.2fs",
exc.response.status_code, agent_id, event_id, duration,
)
if self._on_gateway_error:
self._on_gateway_error(f"http_{exc.response.status_code}")
except (httpx.ConnectError, httpx.TimeoutException) as exc:
duration = time.monotonic() - t0
logger.error(
"Gateway network error for agent=%s event=%s: %s duration=%.2fs",
agent_id, event_id, exc, duration,
)
if self._on_gateway_error:
self._on_gateway_error("network_error")
except Exception as exc:
duration = time.monotonic() - t0
logger.error(
"Unexpected error invoking gateway for agent=%s event=%s: %s",
agent_id, event_id, exc,
)
if self._on_gateway_error:
self._on_gateway_error("unexpected")

View File

@@ -4,6 +4,7 @@ Bridges Matrix/Element rooms to DAGI agents via Gateway.
M1 scope: 1 room ↔ 1 agent (Sofiia), audit via sofiia-console internal endpoint.
"""
import asyncio
import logging
import os
import time
@@ -31,6 +32,8 @@ except ImportError: # pragma: no cover
_PROM_OK = False
from .config import BridgeConfig, load_config
from .ingress import MatrixIngressLoop
from .room_mapping import RoomMappingConfig, parse_room_map
logging.basicConfig(
level=logging.INFO,
@@ -71,6 +74,9 @@ _cfg: Optional[BridgeConfig] = None
_config_error: Optional[str] = None
_matrix_reachable: Optional[bool] = None # probed at startup
_gateway_reachable: Optional[bool] = None # probed at startup
_room_map: Optional[RoomMappingConfig] = None
_ingress_task: Optional[asyncio.Task] = None
_ingress_stop: Optional[asyncio.Event] = None
async def _probe_url(url: str, timeout: float = 5.0) -> bool:
@@ -87,14 +93,24 @@ async def _probe_url(url: str, timeout: float = 5.0) -> bool:
# ── Lifespan ──────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app_: Any):
global _cfg, _config_error, _matrix_reachable, _gateway_reachable
global _cfg, _config_error, _matrix_reachable, _gateway_reachable, _room_map
try:
_cfg = load_config()
# Parse room mapping
_room_map = parse_room_map(
os.getenv("BRIDGE_ROOM_MAP", ""),
_cfg.bridge_allowed_agents,
)
logger.info(
"✅ matrix-bridge-dagi started | node=%s build=%s homeserver=%s room=%s agents=%s",
"✅ matrix-bridge-dagi started | node=%s build=%s homeserver=%s "
"room=%s agents=%s mappings=%d",
_cfg.node_id, _cfg.build_sha, _cfg.matrix_homeserver_url,
_cfg.sofiia_room_id, list(_cfg.bridge_allowed_agents),
_room_map.total_mappings,
)
# Connectivity smoke probes (non-blocking failures)
_matrix_reachable = await _probe_url(
f"{_cfg.matrix_homeserver_url}/_matrix/client/versions"
@@ -112,12 +128,52 @@ async def lifespan(app_: Any):
logger.warning("⚠️ DAGI Gateway NOT reachable: %s", _cfg.dagi_gateway_url)
if _PROM_OK:
_bridge_up.set(1)
except RuntimeError as exc:
# Start ingress loop (fire-and-forget asyncio task)
if _room_map and _room_map.total_mappings > 0:
_ingress_stop = asyncio.Event()
def _on_msg(room_id: str, agent_id: str) -> None:
if _PROM_OK:
_messages_received.labels(room_id=room_id, agent_id=agent_id).inc()
def _on_gw_error(error_type: str) -> None:
if _PROM_OK:
_gateway_errors.labels(error_type=error_type).inc()
ingress = MatrixIngressLoop(
matrix_homeserver_url=_cfg.matrix_homeserver_url,
matrix_access_token=_cfg.matrix_access_token,
matrix_user_id=_cfg.matrix_user_id,
gateway_url=_cfg.dagi_gateway_url,
node_id=_cfg.node_id,
room_map=_room_map,
on_message_received=_on_msg,
on_gateway_error=_on_gw_error,
)
_ingress_task = asyncio.create_task(
ingress.run(_ingress_stop),
name="matrix_ingress_loop",
)
logger.info("✅ Ingress loop task started")
else:
logger.warning("⚠️ No room mappings — ingress loop NOT started")
except (RuntimeError, ValueError) as exc:
_config_error = str(exc)
logger.error("❌ Config error: %s", _config_error)
if _PROM_OK:
_bridge_up.set(0)
yield
# Shutdown: cancel ingress loop
if _ingress_stop:
_ingress_stop.set()
if _ingress_task and not _ingress_task.done():
_ingress_task.cancel()
try:
await asyncio.wait_for(_ingress_task, timeout=5.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
logger.info("matrix-bridge-dagi shutting down")
# ── App ───────────────────────────────────────────────────────────────────────
@@ -166,9 +222,32 @@ async def health() -> Dict[str, Any]:
"allowed_agents": list(_cfg.bridge_allowed_agents),
"gateway": _cfg.dagi_gateway_url,
"gateway_reachable": _gateway_reachable,
"mappings_count": _room_map.total_mappings if _room_map else 0,
"config_ok": True,
}
# ── Bridge Mappings (read-only ops endpoint) ───────────────────────────────────
@app.get("/bridge/mappings")
async def bridge_mappings() -> Dict[str, Any]:
"""
Returns room-to-agent mapping summary.
Safe for ops visibility — no secrets included.
"""
if _cfg is None or _room_map is None:
return {
"ok": False,
"error": _config_error or "service not initialised",
"mappings": [],
}
return {
"ok": True,
"total": _room_map.total_mappings,
"allowed_agents": list(_cfg.bridge_allowed_agents),
"mappings": _room_map.as_summary(),
}
# ── Metrics ───────────────────────────────────────────────────────────────────
@app.get("/metrics")
async def metrics():

View File

@@ -0,0 +1,156 @@
"""
Room-to-Agent Mapping — Phase M1
Parses BRIDGE_ROOM_MAP env var and provides:
- room_id → agent_id lookup
- agent_id allowlist validation (from BRIDGE_ALLOWED_AGENTS)
- summary for /bridge/mappings endpoint
Format of BRIDGE_ROOM_MAP:
"agent_id:!room_id:server,agent2:!room2:server"
e.g. "sofiia:!QwHczWXgefDHBEVkTH:daarion.space"
Multiple mappings separated by comma.
"""
import logging
import re
from dataclasses import dataclass, field
from typing import Dict, FrozenSet, List, Optional
logger = logging.getLogger(__name__)
# Room ID format: !<localpart>:<server>
_ROOM_ID_RE = re.compile(r"^![A-Za-z0-9\-_.]+:[A-Za-z0-9\-_.]+$")
@dataclass(frozen=True)
class RoomMapping:
"""Single room → agent binding."""
room_id: str # e.g. "!abc:daarion.space"
agent_id: str # e.g. "sofiia"
@dataclass
class RoomMappingConfig:
"""
Parsed mapping configuration.
Attributes:
mappings: List of RoomMapping (room_id → agent_id)
allowed_agents: Frozenset of allowlisted agent ids
"""
mappings: List[RoomMapping] = field(default_factory=list)
allowed_agents: FrozenSet[str] = field(default_factory=frozenset)
# Internal index for O(1) lookup
_room_to_agent: Dict[str, str] = field(default_factory=dict, repr=False, compare=False)
_agent_to_rooms: Dict[str, List[str]] = field(default_factory=dict, repr=False, compare=False)
def __post_init__(self) -> None:
self._rebuild_index()
def _rebuild_index(self) -> None:
self._room_to_agent = {m.room_id: m.agent_id for m in self.mappings}
self._agent_to_rooms = {}
for m in self.mappings:
self._agent_to_rooms.setdefault(m.agent_id, []).append(m.room_id)
def agent_for_room(self, room_id: str) -> Optional[str]:
"""
Returns agent_id for a room_id, or None if room is not mapped.
Also returns None if agent is not in allowed_agents.
"""
agent = self._room_to_agent.get(room_id)
if agent is None:
return None
if agent not in self.allowed_agents:
logger.warning("Room %s mapped to agent %s which is NOT in allowed_agents", room_id, agent)
return None
return agent
def rooms_for_agent(self, agent_id: str) -> List[str]:
"""Returns list of room_ids mapped to an agent_id."""
return list(self._agent_to_rooms.get(agent_id, []))
def as_summary(self) -> List[Dict]:
"""
Returns a safe summary list for the /bridge/mappings endpoint.
Room IDs are NOT secrets (they identify chat rooms), but tokens are never included.
"""
return [
{
"room_id": m.room_id,
"agent_id": m.agent_id,
"allowed": m.agent_id in self.allowed_agents,
}
for m in self.mappings
]
@property
def total_mappings(self) -> int:
return len(self.mappings)
def parse_room_map(raw: str, allowed_agents: FrozenSet[str]) -> RoomMappingConfig:
"""
Parse BRIDGE_ROOM_MAP string into RoomMappingConfig.
Format: "agent_id:!room_id:server[,agent2:!room2:server2,...]"
Raises ValueError on malformed entries (but skips warn-only issues).
"""
mappings: List[RoomMapping] = []
errors: List[str] = []
if not raw or not raw.strip():
return RoomMappingConfig(mappings=[], allowed_agents=allowed_agents)
for idx, entry in enumerate(raw.split(",")):
entry = entry.strip()
if not entry:
continue
# Find the colon that separates agent_id from room_id
# Room IDs look like !localpart:server — the separator colon is after agent_id
# Format: "sofiia:!QwHczWXgefDHBEVkTH:daarion.space"
# ^agent^:^------room_id---------^
colon_idx = entry.find(":")
if colon_idx < 1:
errors.append(f"Entry[{idx}] missing agent:room separator: {entry!r}")
continue
agent_id = entry[:colon_idx].strip()
room_id = entry[colon_idx + 1:].strip()
if not agent_id:
errors.append(f"Entry[{idx}] empty agent_id in: {entry!r}")
continue
if not room_id:
errors.append(f"Entry[{idx}] empty room_id in: {entry!r}")
continue
if not _ROOM_ID_RE.match(room_id):
errors.append(
f"Entry[{idx}] invalid room_id format (expected !localpart:server): {room_id!r}"
)
continue
if agent_id not in allowed_agents:
logger.warning(
"Entry[%d] agent %r not in allowed_agents %s — mapping accepted but will be rejected at runtime",
idx, agent_id, set(allowed_agents),
)
mappings.append(RoomMapping(room_id=room_id, agent_id=agent_id))
if errors:
raise ValueError(f"BRIDGE_ROOM_MAP parse errors: {'; '.join(errors)}")
config = RoomMappingConfig(mappings=mappings, allowed_agents=allowed_agents)
logger.info(
"Room mapping loaded: %d entries, allowed_agents=%s",
len(mappings), set(allowed_agents),
)
return config