Files
microdao-daarion/infrastructure/observability/matrix-alerts-bridge.py
Apple 70fd268a0d 🚀 Production-ready: Auth enforcement + Observability + Policy
- Atomic генерація всіх секретів (generate-all-secrets.sh)
- Auth enforcement перевірка (enforce-auth.sh)
- Оновлений full flow test (must-pass)
- Prometheus alerting rules для Memory Module
- Matrix alerts bridge (алерти в ops room)
- Policy engine документація для пам'яті

Готово до production deployment!
2026-01-10 10:56:05 -08:00

66 lines
2.2 KiB
Python

#!/usr/bin/env python3
"""
Matrix Alerts Bridge — відправка алертів з Prometheus в Matrix ops room
"""
import asyncio
import os
from nio import AsyncClient
from prometheus_client import start_http_server
from prometheus_client.core import Gauge, Counter
class MatrixAlertsBridge:
def __init__(self):
self.matrix_homeserver = os.getenv("MATRIX_HOMESERVER", "https://matrix.org")
self.matrix_user = os.getenv("MATRIX_USER", "")
self.matrix_password = os.getenv("MATRIX_PASSWORD", "")
self.ops_room_id = os.getenv("MATRIX_OPS_ROOM_ID", "")
self.client: AsyncClient = None
async def connect(self):
"""Підключення до Matrix"""
self.client = AsyncClient(self.matrix_homeserver, self.matrix_user)
await self.client.login(self.matrix_password)
print(f"✅ Підключено до Matrix: {self.matrix_user}")
async def send_alert(self, alert_name: str, severity: str, description: str):
"""Відправка алерту в Matrix ops room"""
emoji = "🔴" if severity == "critical" else "🟡"
message = f"{emoji} **{alert_name}** ({severity})\n\n{description}"
await self.client.room_send(
room_id=self.ops_room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": message,
"format": "org.matrix.custom.html",
"formatted_body": message.replace("\n", "<br>")
}
)
async def listen_prometheus_alerts(self):
"""Слухання алертів з Prometheus Alertmanager webhook"""
# TODO: Реалізація webhook listener для Prometheus Alertmanager
pass
async def main():
bridge = MatrixAlertsBridge()
await bridge.connect()
# Тестовий алерт
await bridge.send_alert(
"TestAlert",
"warning",
"Це тестовий алерт для перевірки Matrix bridge"
)
print("✅ Тестовий алерт відправлено")
if __name__ == "__main__":
asyncio.run(main())