Files
microdao-daarion/telegram-infrastructure/CURSOR_INSTRUCTIONS.md

16 KiB
Raw Blame History

CURSOR_INSTRUCTIONS.md

telegram-gateway для DAARION / microdao

Мета: реалізувати сервіс telegram-gateway, який:

  • працює через Local Telegram Bot API (telegram-bot-api), без SSL і без публічних вебхуків;
  • отримує апдейти від декількох Telegram-ботів через long polling;
  • публікує вхідні повідомлення в NATS як події agent.telegram.update;
  • приймає HTTP-запити від DAGI/microdao для надсилання повідомлень назад у Telegram (agent.telegram.send);
  • інтегрується з існуючим Router (HTTP API http://router:9102), але не залежить від нього жорстко.

Інфраструктура (вже піднята Warp-ом):

  • telegram-bot-api (Local Bot API, http://telegram-bot-api:8081, всередині Docker-мережі).
  • nats (nats://nats:4222).
  • telegram-gateway (цей сервіс, FastAPI, порт 8000 всередині контейнера).

Все це описано в docker-compose.yml у каталозі telegram-infrastructure/.


0. Структура проєкту

Працюємо в каталозі:

telegram-infrastructure/telegram-gateway/

Очікувана структура:

telegram-infrastructure/
  docker-compose.yml
  .env

  telegram-gateway/
    Dockerfile
    requirements.txt  (або pyproject.toml — див. нижче)
    app/
      __init__.py
      main.py
      config.py
      nats_client.py
      telegram_listener.py
      models.py
      bots_registry.py

Завдання для Cursor: створити/оновити ці файли згідно інструкції нижче.


1. Залежності (requirements.txt)

Онови telegram-gateway/requirements.txt, додавши:

fastapi
uvicorn[standard]
aiogram==3.*
nats-py
pydantic-settings
httpx

Якщо вже існують інші залежності — збережи їх.


2. Конфігурація (app/config.py)

Створи файл app/config.py:

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Local Telegram Bot API (Docker service)
    TELEGRAM_API_BASE: str = "http://telegram-bot-api:8081"

    # NATS event bus
    NATS_URL: str = "nats://nats:4222"

    # Опційно: URL Router-а DAGI/microdao
    ROUTER_BASE_URL: str = "http://router:9102"

    # Через це поле можна включити debug-логування
    DEBUG: bool = False

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


settings = Settings()

3. Моделі подій і DTO (app/models.py)

Створи app/models.py з Pydantic-схемами:

from typing import Optional, Any, Dict
from pydantic import BaseModel


class TelegramUpdateEvent(BaseModel):
    """Подія 'agent.telegram.update' для NATS."""
    agent_id: str
    bot_id: str
    chat_id: int
    user_id: int
    text: Optional[str] = None
    raw_update: Dict[str, Any]


class TelegramSendCommand(BaseModel):
    """Команда від DAGI/microdao для надсилання повідомлення в Telegram."""
    agent_id: str
    chat_id: int
    text: str
    reply_to_message_id: Optional[int] = None


class BotRegistration(BaseModel):
    """HTTP payload для реєстрації нового бота/агента."""
    agent_id: str
    bot_token: str
    # опційно: allowed_chat_id, ім'я, тощо

4. Клієнт NATS (app/nats_client.py)

Створи app/nats_client.py з асинхронним клієнтом:

import json
from typing import Optional

import nats

from .config import settings


class NatsClient:
    def __init__(self, url: str):
        self._url = url
        self._nc: Optional[nats.NATS] = None

    async def connect(self) -> None:
        if self._nc is None or self._nc.is_closed:
            self._nc = await nats.connect(self._url)

    async def close(self) -> None:
        if self._nc and not self._nc.is_closed:
            await self._nc.drain()
            await self._nc.close()

    async def publish_json(self, subject: str, data: dict) -> None:
        if self._nc is None or self._nc.is_closed:
            await self.connect()
        await self._nc.publish(subject, json.dumps(data).encode("utf-8"))


nats_client = NatsClient(settings.NATS_URL)

На цьому етапі підписки NATS не потрібні — тільки publish. Підписуватися на agent.telegram.send можна пізніше, якщо буде потрібно. Зараз відправлення в Telegram робимо через HTTP /send.


5. Реєстр ботів (app/bots_registry.py)

Нам потрібна мапа agent_id → bot_token (і навпаки) для маршрутизації.

Створи app/bots_registry.py:

from typing import Dict, Optional

from .models import BotRegistration


class BotsRegistry:
    """
    Простий in-memory реєстр.
    TODO: замінити на персистентне сховище (PostgreSQL/microdao DB).
    """
    def __init__(self) -> None:
        self._agent_to_token: Dict[str, str] = {}
        self._token_to_agent: Dict[str, str] = {}

    def register(self, reg: BotRegistration) -> None:
        self._agent_to_token[reg.agent_id] = reg.bot_token
        self._token_to_agent[reg.bot_token] = reg.agent_id

    def get_token_by_agent(self, agent_id: str) -> Optional[str]:
        return self._agent_to_token.get(agent_id)

    def get_agent_by_token(self, bot_token: str) -> Optional[str]:
        return self._token_to_agent.get(bot_token)


bots_registry = BotsRegistry()

На MVP-дроті достатньо in-memory. Потім можна буде підключити БД microdao (таблиця telegram_bots).


6. Telegram listener (app/telegram_listener.py)

Задача: запускати long polling для кількох ботів через Local Bot API і на кожне вхідне повідомлення публікувати agent.telegram.update у NATS.

Створи app/telegram_listener.py:

import asyncio
import logging
from typing import Dict

from aiogram import Bot, Dispatcher, F
from aiogram.client.session.aiohttp import AiohttpSession
from aiogram.client.telegram import TelegramAPIServer
from aiogram.types import Message, Update

from .config import settings
from .models import TelegramUpdateEvent
from .nats_client import nats_client
from .bots_registry import bots_registry

logger = logging.getLogger(__name__)


class TelegramListener:
    def __init__(self) -> None:
        self._bots: Dict[str, Bot] = {}  # bot_token -> Bot
        self._dispatchers: Dict[str, Dispatcher] = {}
        self._tasks: Dict[str, asyncio.Task] = {}
        self._server = TelegramAPIServer.from_base(settings.TELEGRAM_API_BASE)

    async def _create_bot(self, bot_token: str) -> Bot:
        session = AiohttpSession(api=self._server)
        bot = Bot(token=bot_token, session=session)
        return bot

    async def add_bot(self, bot_token: str) -> None:
        if bot_token in self._bots:
            return

        bot = await self._create_bot(bot_token)
        dp = Dispatcher()

        @dp.message(F.text)
        async def on_message(message: Message) -> None:
            agent_id = bots_registry.get_agent_by_token(bot_token)
            if not agent_id:
                logger.warning("No agent_id for bot_token=%s", bot_token)
                return

            event = TelegramUpdateEvent(
                agent_id=agent_id,
                bot_id=f"bot:{bot_token[:8]}",
                chat_id=message.chat.id,
                user_id=message.from_user.id if message.from_user else 0,
                text=message.text,
                raw_update=message.model_dump()
            )
            await nats_client.publish_json(
                subject="agent.telegram.update",
                data=event.model_dump()
            )

        # Запускаємо polling у фоні
        async def _polling():
            try:
                logger.info("Start polling for bot %s", bot_token)
                await dp.start_polling(bot)
            except asyncio.CancelledError:
                logger.info("Polling cancelled for bot %s", bot_token)
            except Exception as e:
                logger.exception("Polling error for bot %s: %s", bot_token, e)
                raise

        task = asyncio.create_task(_polling())

        self._bots[bot_token] = bot
        self._dispatchers[bot_token] = dp
        self._tasks[bot_token] = task

    async def send_message(self, agent_id: str, chat_id: int, text: str, reply_to_message_id: int | None = None):
        bot_token = bots_registry.get_token_by_agent(agent_id)
        if not bot_token:
            raise RuntimeError(f"No bot token for agent_id={agent_id}")

        bot = self._bots.get(bot_token)
        if not bot:
            # Якщо бот ще не запущений (наприклад, перший виклик через /send)
            await self.add_bot(bot_token)
            bot = self._bots[bot_token]

        await bot.send_message(
            chat_id=chat_id,
            text=text,
            reply_to_message_id=reply_to_message_id
        )

    async def shutdown(self):
        # Завершити polling задачі
        for task in self._tasks.values():
            task.cancel()
        await asyncio.gather(*self._tasks.values(), return_exceptions=True)

        # Закрити бот-сесії
        for bot in self._bots.values():
            await bot.session.close()


telegram_listener = TelegramListener()

7. FastAPI: main + HTTP-ендпоінти (app/main.py)

Онови app/main.py так, щоб:

  • при старті сервісу:
    • підключатися до NATS;
    • за потреби — попередньо запускати polling для вже відомих ботів (MVP можна пропустити);
  • надавати HTTP-ендпоінти:
    • GET /healthz
    • POST /bots/register — реєстрація нового бота для агента
    • POST /send — надсилання повідомлення в Telegram від агента
import asyncio
import logging

from fastapi import FastAPI, HTTPException

from .config import settings
from .models import BotRegistration, TelegramSendCommand
from .bots_registry import bots_registry
from .nats_client import nats_client
from .telegram_listener import telegram_listener

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO if settings.DEBUG else logging.WARNING)

app = FastAPI(title="telegram-gateway", version="0.1.0")


@app.on_event("startup")
async def on_startup():
    # Підключаємося до NATS
    await nats_client.connect()
    logger.info("Connected to NATS at %s", settings.NATS_URL)

    # На цьому етапі список ботів пустий; їх додаватимуть через /bots/register.
    # За потреби сюди можна додати завантаження конфігів з БД.


@app.on_event("shutdown")
async def on_shutdown():
    await telegram_listener.shutdown()
    await nats_client.close()


@app.get("/healthz")
async def healthz():
    return {"status": "ok"}


@app.post("/bots/register")
async def register_bot(reg: BotRegistration):
    """
    Прив'язати Telegram-бота до agent_id.
    1) Зберегти в реєстрі (in-memory);
    2) Запустити polling для цього bot_token.
    3) Опційно: опублікувати подію bot.registered у NATS.
    """
    bots_registry.register(reg)

    # Запускаємо polling
    asyncio.create_task(telegram_listener.add_bot(reg.bot_token))

    # Публікуємо подію реєстрації (може ловити Router або інший сервіс)
    await nats_client.publish_json(
        subject="bot.registered",
        data={"agent_id": reg.agent_id, "bot_token": reg.bot_token}
    )

    return {"status": "registered"}


@app.post("/send")
async def send_message(cmd: TelegramSendCommand):
    """
    Відправити повідомлення в Telegram від імені агента.
    Викликається DAGI Router / microdao.
    """
    try:
        await telegram_listener.send_message(
            agent_id=cmd.agent_id,
            chat_id=cmd.chat_id,
            text=cmd.text,
            reply_to_message_id=cmd.reply_to_message_id,
        )
    except RuntimeError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return {"status": "sent"}

8. Dockerfile (telegram-gateway/Dockerfile)

Переконайся, що Dockerfile відповідає цим вимогам:

FROM python:3.11-slim

WORKDIR /app

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY app ./app

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

9. docker-compose.yml (корінь telegram-infrastructure)

Переконайся, що сервіс telegram-gateway описаний приблизно так:

services:
  telegram-bot-api:
    image: ghcr.io/tdlib/telegram-bot-api:latest
    container_name: telegram-bot-api
    restart: unless-stopped
    env_file:
      - .env
    command:
      - --local
      - --http-port=8081
      - --dir=/var/lib/telegram-bot-api
    volumes:
      - ./data/telegram-bot-api:/var/lib/telegram-bot-api
    ports:
      - "127.0.0.1:8081:8081"

  nats:
    image: nats:2
    container_name: nats
    restart: unless-stopped
    ports:
      - "127.0.0.1:4222:4222"

  telegram-gateway:
    build: ./telegram-gateway
    container_name: telegram-gateway
    restart: unless-stopped
    env_file:
      - .env
    depends_on:
      - telegram-bot-api
      - nats
    ports:
      - "127.0.0.1:8000:8000"

10. Послідовність дій (для запуску й тесту)

  1. Переконайся, що .env у корені telegram-infrastructure містить:

    TELEGRAM_API_ID=XXXXXXX
    TELEGRAM_API_HASH=XXXXXXXXXXXXXXXXXXXXXXXXXXXX
    NATS_URL=nats://nats:4222
    TELEGRAM_API_BASE=http://telegram-bot-api:8081
    
  2. Запусти стек:

    docker compose build telegram-gateway
    docker compose up -d telegram-bot-api nats telegram-gateway
    
  3. Перевір telegram-gateway:

    curl http://127.0.0.1:8000/healthz
    # очікується: {"status":"ok"}
    
  4. Перевір Local Telegram Bot API (з реальним BOT_TOKEN):

    curl http://127.0.0.1:8081/bot<YOUR_BOT_TOKEN>/getMe
    
  5. Зареєструй бота для агента:

    curl -X POST http://127.0.0.1:8000/bots/register \
      -H "Content-Type: application/json" \
      -d '{
        "agent_id": "ag_helion",
        "bot_token": "<YOUR_BOT_TOKEN>"
      }'
    
  6. Надішли тестове повідомлення з агента:

    curl -X POST http://127.0.0.1:8000/send \
      -H "Content-Type: application/json" \
      -d '{
        "agent_id": "ag_helion",
        "chat_id": <YOUR_CHAT_ID>,
        "text": "Привіт від агента Helion!",
        "reply_to_message_id": null
      }'
    
  7. Перевір у логах nats або через окремий subscriber, що події agent.telegram.update публікуються при вхідних повідомленнях.


11. Що далі (опційно)

Після того, як MVP працює:

  • підключити реальну БД microdao для таблиці telegram_bots замість in-memory BotsRegistry;
  • додати підписника NATS на agent.telegram.send, щоб можна було слати повідомлення не тільки через HTTP /send, а й через події;
  • розширити payload подій (mode, team_id, channel_id) під існуючий Event Catalog microdao;
  • додати обмеження / rate limiting / логування в стилі microdao.