Files
microdao-daarion/telegram-infrastructure/telegram-gateway/app/bots_registry.py
Apple 3de3c8cb36 feat: Add presence heartbeat for Matrix online status
- matrix-gateway: POST /internal/matrix/presence/online endpoint
- usePresenceHeartbeat hook with activity tracking
- Auto away after 5 min inactivity
- Offline on page close/visibility change
- Integrated in MatrixChatRoom component
2025-11-27 00:19:40 -08:00

60 lines
2.5 KiB
Python

import logging
from typing import Dict, Optional, List
from .models import BotRegistration
from .config import BotConfig
logger = logging.getLogger(__name__)
class BotsRegistry:
"""
Реєстр ботів з підтримкою ініціалізації з конфігурації.
TODO: замінити на персистентне сховище (PostgreSQL/microdao DB).
"""
def __init__(self) -> None:
self._agent_to_token: Dict[str, str] = {}
self._token_to_agent: Dict[str, str] = {}
def register(self, reg: BotRegistration) -> None:
"""Реєстрація бота через BotRegistration (HTTP API)"""
self._agent_to_token[reg.agent_id] = reg.bot_token
self._token_to_agent[reg.bot_token] = reg.agent_id
logger.info(f"Registered bot: agent_id={reg.agent_id}, token={reg.bot_token[:8]}...")
def register_from_config(self, bot_config: BotConfig) -> None:
"""Реєстрація бота з BotConfig (конфігурація)"""
if not bot_config.enabled:
logger.debug(f"Skipping disabled bot: agent_id={bot_config.agent_id}")
return
self._agent_to_token[bot_config.agent_id] = bot_config.bot_token
self._token_to_agent[bot_config.bot_token] = bot_config.agent_id
logger.info(f"Registered bot from config: agent_id={bot_config.agent_id}, token={bot_config.bot_token[:8]}...")
def register_batch(self, bot_configs: List[BotConfig]) -> None:
"""Масове реєстрування ботів з конфігурації"""
for bot_config in bot_configs:
self.register_from_config(bot_config)
def unregister(self, agent_id: str) -> Optional[str]:
"""Видалити бота за agent_id та повернути bot_token"""
bot_token = self._agent_to_token.pop(agent_id, None)
if bot_token:
self._token_to_agent.pop(bot_token, None)
logger.info(f"Unregistered bot: agent_id={agent_id}, token={bot_token[:8]}...")
return bot_token
def get_token_by_agent(self, agent_id: str) -> Optional[str]:
return self._agent_to_token.get(agent_id)
def get_agent_by_token(self, bot_token: str) -> Optional[str]:
return self._token_to_agent.get(bot_token)
def list_agents(self) -> List[str]:
"""Повернути список всіх зареєстрованих agent_id"""
return list(self._agent_to_token.keys())
bots_registry = BotsRegistry()