101 lines
3.3 KiB
Python
101 lines
3.3 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
from typing import Dict
|
|
|
|
import httpx
|
|
|
|
from .reminders import close_redis, pop_due_reminders
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
|
|
logger = logging.getLogger("daarion-reminder-worker")
|
|
|
|
POLL_SECONDS = float(os.getenv("DAARION_REMINDER_POLL_SECONDS", "2"))
|
|
TELEGRAM_TIMEOUT = float(os.getenv("DAARION_REMINDER_TELEGRAM_TIMEOUT", "20"))
|
|
|
|
AGENT_TOKEN_ENV: Dict[str, str] = {
|
|
"daarwizz": "DAARWIZZ_TELEGRAM_BOT_TOKEN",
|
|
"helion": "HELION_TELEGRAM_BOT_TOKEN",
|
|
"greenfood": "GREENFOOD_TELEGRAM_BOT_TOKEN",
|
|
"agromatrix": "AGROMATRIX_TELEGRAM_BOT_TOKEN",
|
|
"alateya": "ALATEYA_TELEGRAM_BOT_TOKEN",
|
|
"nutra": "NUTRA_TELEGRAM_BOT_TOKEN",
|
|
"druid": "DRUID_TELEGRAM_BOT_TOKEN",
|
|
"clan": "CLAN_TELEGRAM_BOT_TOKEN",
|
|
"eonarch": "EONARCH_TELEGRAM_BOT_TOKEN",
|
|
"senpai": "SENPAI_TELEGRAM_BOT_TOKEN",
|
|
"oneok": "ONEOK_TELEGRAM_BOT_TOKEN",
|
|
"soul": "SOUL_TELEGRAM_BOT_TOKEN",
|
|
"yaromir": "YAROMIR_TELEGRAM_BOT_TOKEN",
|
|
"sofiia": "SOFIIA_TELEGRAM_BOT_TOKEN",
|
|
}
|
|
|
|
|
|
def _token_for_agent(agent_id: str) -> str:
|
|
env = AGENT_TOKEN_ENV.get((agent_id or "").lower(), "")
|
|
return os.getenv(env, "") if env else ""
|
|
|
|
|
|
async def _send_reminder(item: Dict[str, str]) -> bool:
|
|
agent_id = str(item.get("agent_id", ""))
|
|
chat_id = str(item.get("chat_id", ""))
|
|
reminder_text = str(item.get("text", "")).strip()
|
|
due_at = str(item.get("due_at", ""))
|
|
|
|
token = _token_for_agent(agent_id)
|
|
if not token:
|
|
logger.warning("reminder_skip_no_token agent=%s reminder_id=%s", agent_id, item.get("reminder_id"))
|
|
return False
|
|
|
|
if not chat_id or not reminder_text:
|
|
logger.warning("reminder_skip_invalid_payload reminder_id=%s", item.get("reminder_id"))
|
|
return False
|
|
|
|
body = {
|
|
"chat_id": chat_id,
|
|
"text": f"⏰ Нагадування ({agent_id})\n\n{reminder_text}\n\n🕒 {due_at}",
|
|
}
|
|
|
|
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
|
async with httpx.AsyncClient(timeout=TELEGRAM_TIMEOUT) as client:
|
|
resp = await client.post(url, json=body)
|
|
if resp.status_code != 200:
|
|
logger.warning(
|
|
"reminder_send_failed reminder_id=%s status=%s body=%s",
|
|
item.get("reminder_id"),
|
|
resp.status_code,
|
|
resp.text[:300],
|
|
)
|
|
return False
|
|
|
|
logger.info("reminder_sent reminder_id=%s agent=%s chat=%s", item.get("reminder_id"), agent_id, chat_id)
|
|
return True
|
|
|
|
|
|
async def worker_loop() -> None:
|
|
logger.info("reminder_worker_started poll_seconds=%s", POLL_SECONDS)
|
|
while True:
|
|
try:
|
|
items = await pop_due_reminders(limit=20)
|
|
if items:
|
|
for item in items:
|
|
try:
|
|
await _send_reminder(item)
|
|
except Exception:
|
|
logger.exception("reminder_send_exception reminder_id=%s", item.get("reminder_id"))
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception:
|
|
logger.exception("reminder_worker_cycle_failed")
|
|
await asyncio.sleep(POLL_SECONDS)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
asyncio.run(worker_loop())
|
|
finally:
|
|
try:
|
|
asyncio.run(close_redis())
|
|
except Exception:
|
|
pass
|