Files
microdao-daarion/services/matrix-bridge-dagi/app/main.py
Apple 1d8482f4c1 feat(matrix-bridge-dagi): scaffold service with health, metrics and config (PR-M1.0)
New service: services/matrix-bridge-dagi/
- app/config.py: BridgeConfig dataclass, load_config() with full env validation
  (MATRIX_HOMESERVER_URL, MATRIX_ACCESS_TOKEN, MATRIX_USER_ID, SOFIIA_ROOM_ID,
   DAGI_GATEWAY_URL, SOFIIA_CONSOLE_URL, SOFIIA_INTERNAL_TOKEN, rate limits)
- app/main.py: FastAPI app with lifespan, GET /health, GET /metrics (prometheus)
  health returns: ok, node_id, homeserver, bridge_user, sofiia_room_id,
  allowed_agents, gateway, uptime_s; graceful error state when config missing
- requirements.txt: fastapi, uvicorn, httpx, prometheus-client, pyyaml
- Dockerfile: python:3.11-slim, port 7030, BUILD_SHA/BUILD_TIME args

docker-compose.matrix-bridge-node1.yml:
- standalone override file (node1 network, port 127.0.0.1:7030)
- all env vars wired: MATRIX_*, SOFIIA_ROOM_ID, DAGI_GATEWAY_URL,
  SOFIIA_CONSOLE_URL, SOFIIA_INTERNAL_TOKEN, rate limit policy
- healthcheck, restart: unless-stopped

DoD: config validates, health/metrics respond, imports clean
Made-with: Cursor
2026-03-03 07:28:24 -08:00

137 lines
5.0 KiB
Python

"""
matrix-bridge-dagi — Phase M1 scaffold
Bridges Matrix/Element rooms to DAGI agents via Gateway.
M1 scope: 1 room ↔ 1 agent (Sofiia), audit via sofiia-console internal endpoint.
"""
import logging
import os
import time
from contextlib import asynccontextmanager
from typing import Any, Dict
from fastapi import FastAPI, Response
from fastapi.middleware.cors import CORSMiddleware
try:
from prometheus_client import (
Counter, Histogram, Gauge,
generate_latest, CONTENT_TYPE_LATEST,
CollectorRegistry, REGISTRY,
)
_PROM_OK = True
except ImportError: # pragma: no cover
_PROM_OK = False
from .config import BridgeConfig, load_config
# Configure root logging once at import time: INFO and above, with each record
# rendered as "<timestamp> <level> <logger name> <message>".
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
    level=logging.INFO,
)

# Module-level logger for this service.
logger = logging.getLogger("matrix-bridge-dagi")
# ── Prometheus metrics ────────────────────────────────────────────────────────
if _PROM_OK:
    # Metric objects are created only when the prometheus_client import
    # succeeded; every use of them elsewhere is guarded by the same flag.

    # Inbound Matrix messages, labelled by room and target agent.
    _messages_received = Counter(
        name="matrix_bridge_messages_received_total",
        documentation="Total Matrix messages received",
        labelnames=["room_id", "agent_id"],
    )

    # Agent replies posted back to Matrix, labelled additionally by status.
    _messages_replied = Counter(
        name="matrix_bridge_messages_replied_total",
        documentation="Total agent replies sent to Matrix",
        labelnames=["room_id", "agent_id", "status"],
    )

    # Failed calls to the DAGI gateway, labelled by error type.
    _gateway_errors = Counter(
        name="matrix_bridge_gateway_errors_total",
        documentation="Errors calling DAGI gateway",
        labelnames=["error_type"],
    )

    # Duration of a DAGI invoke call, per agent.
    _invoke_latency = Histogram(
        name="matrix_bridge_invoke_duration_seconds",
        documentation="Duration of DAGI invoke call",
        labelnames=["agent_id"],
    )

    # Set to 1 or 0 by the lifespan hook depending on whether config loaded.
    _bridge_up = Gauge(
        name="matrix_bridge_up",
        documentation="1 if bridge started successfully",
    )
# ── Startup state ─────────────────────────────────────────────────────────────
# Loaded configuration; stays None until the lifespan hook assigns it.
_cfg: BridgeConfig | None = None
# Message of the configuration error caught at startup, if any.
_config_error: str | None = None
# Monotonic clock reading taken at import time; /health derives uptime_s from it.
_START_TIME = time.monotonic()
# ── Lifespan ──────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app_: Any):
    """Load the bridge configuration on startup and log on shutdown.

    On success the loaded config is stored in the module-level ``_cfg`` and
    the ``matrix_bridge_up`` gauge is set to 1. If ``load_config()`` raises
    ``RuntimeError``, the message is stored in ``_config_error``, the gauge
    is set to 0, and the application still starts so that ``/health`` can
    report the failure instead of the process exiting.

    Args:
        app_: Application instance supplied by the framework; not used here.

    Yields:
        None, once startup handling has finished.
    """
    global _cfg, _config_error

    # Start from a clean slate: if this hook runs more than once in the same
    # process, a previous run's config or error must not leak into this one
    # (a stale _config_error would keep /health reporting a failure).
    _cfg = None
    _config_error = None

    try:
        # Keep the guarded region to the one call that signals config errors.
        _cfg = load_config()
    except RuntimeError as exc:
        _config_error = str(exc)
        logger.error("❌ Config error: %s", _config_error)
        if _PROM_OK:
            _bridge_up.set(0)
    else:
        logger.info(
            "✅ matrix-bridge-dagi started | node=%s build=%s homeserver=%s room=%s agents=%s",
            _cfg.node_id,
            _cfg.build_sha,
            _cfg.matrix_homeserver_url,
            _cfg.sofiia_room_id,
            list(_cfg.bridge_allowed_agents),
        )
        if _PROM_OK:
            _bridge_up.set(1)

    yield

    logger.info("matrix-bridge-dagi shutting down")
# ── App ───────────────────────────────────────────────────────────────────────
# The ASGI application; startup and shutdown handling is delegated to `lifespan`.
app = FastAPI(title="matrix-bridge-dagi", version="0.1.0", lifespan=lifespan)

# CORS policy: any origin and any request header, GET and POST methods only.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["GET", "POST"],
    allow_origins=["*"],
)
# ── Health ────────────────────────────────────────────────────────────────────
@app.get("/health")
async def health() -> Dict[str, Any]:
    """Report service status together with the active configuration.

    Returns:
        A dict with ``ok: True`` plus build, node, homeserver, room, agent and
        gateway details when a config is loaded and no config error was
        recorded; otherwise a dict with ``ok: False`` and an ``error`` message.
        Both shapes include ``uptime_s``, measured from module import.
    """
    uptime_s = int(time.monotonic() - _START_TIME)
    cfg = _cfg

    # Healthy only when a config object exists and no error was recorded.
    if cfg is not None and not _config_error:
        return {
            "ok": True,
            "service": "matrix-bridge-dagi",
            "version": "0.1.0",
            "build": cfg.build_sha,
            "build_time": cfg.build_time,
            "env": os.getenv("ENV", "dev"),
            "uptime_s": uptime_s,
            "node_id": cfg.node_id,
            "homeserver": cfg.matrix_homeserver_url,
            "bridge_user": cfg.matrix_user_id,
            "sofiia_room_id": cfg.sofiia_room_id,
            "allowed_agents": list(cfg.bridge_allowed_agents),
            "gateway": cfg.dagi_gateway_url,
            "config_ok": True,
        }

    # Degraded state: without a config the build id is read from the environment.
    return {
        "ok": False,
        "service": "matrix-bridge-dagi",
        "version": "0.1.0",
        "build": os.getenv("BUILD_SHA", "dev"),
        "uptime_s": uptime_s,
        "error": _config_error or "service not initialised",
    }
# ── Metrics ───────────────────────────────────────────────────────────────────
@app.get("/metrics")
async def metrics():
    """Serve the contents of the prometheus ``REGISTRY`` in text format.

    Returns:
        A ``Response`` carrying ``generate_latest(REGISTRY)`` when
        prometheus_client was imported, otherwise a plain-text placeholder
        comment saying the library is not available.
    """
    if _PROM_OK:
        return Response(
            content=generate_latest(REGISTRY),
            media_type=CONTENT_TYPE_LATEST,
        )
    return Response(
        content="# prometheus_client not available\n",
        media_type="text/plain",
    )