gateway: add privacy guard plus reminders and mentor relay commands
This commit is contained in:
100
gateway-bot/daarion_facade/reminder_worker.py
Normal file
100
gateway-bot/daarion_facade/reminder_worker.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from typing import Dict
|
||||
|
||||
import httpx
|
||||
|
||||
from .reminders import close_redis, pop_due_reminders
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
|
||||
logger = logging.getLogger("daarion-reminder-worker")
|
||||
|
||||
POLL_SECONDS = float(os.getenv("DAARION_REMINDER_POLL_SECONDS", "2"))
|
||||
TELEGRAM_TIMEOUT = float(os.getenv("DAARION_REMINDER_TELEGRAM_TIMEOUT", "20"))
|
||||
|
||||
AGENT_TOKEN_ENV: Dict[str, str] = {
|
||||
"daarwizz": "DAARWIZZ_TELEGRAM_BOT_TOKEN",
|
||||
"helion": "HELION_TELEGRAM_BOT_TOKEN",
|
||||
"greenfood": "GREENFOOD_TELEGRAM_BOT_TOKEN",
|
||||
"agromatrix": "AGROMATRIX_TELEGRAM_BOT_TOKEN",
|
||||
"alateya": "ALATEYA_TELEGRAM_BOT_TOKEN",
|
||||
"nutra": "NUTRA_TELEGRAM_BOT_TOKEN",
|
||||
"druid": "DRUID_TELEGRAM_BOT_TOKEN",
|
||||
"clan": "CLAN_TELEGRAM_BOT_TOKEN",
|
||||
"eonarch": "EONARCH_TELEGRAM_BOT_TOKEN",
|
||||
"senpai": "SENPAI_TELEGRAM_BOT_TOKEN",
|
||||
"oneok": "ONEOK_TELEGRAM_BOT_TOKEN",
|
||||
"soul": "SOUL_TELEGRAM_BOT_TOKEN",
|
||||
"yaromir": "YAROMIR_TELEGRAM_BOT_TOKEN",
|
||||
"sofiia": "SOFIIA_TELEGRAM_BOT_TOKEN",
|
||||
}
|
||||
|
||||
|
||||
def _token_for_agent(agent_id: str) -> str:
|
||||
env = AGENT_TOKEN_ENV.get((agent_id or "").lower(), "")
|
||||
return os.getenv(env, "") if env else ""
|
||||
|
||||
|
||||
async def _send_reminder(item: Dict[str, str]) -> bool:
|
||||
agent_id = str(item.get("agent_id", ""))
|
||||
chat_id = str(item.get("chat_id", ""))
|
||||
reminder_text = str(item.get("text", "")).strip()
|
||||
due_at = str(item.get("due_at", ""))
|
||||
|
||||
token = _token_for_agent(agent_id)
|
||||
if not token:
|
||||
logger.warning("reminder_skip_no_token agent=%s reminder_id=%s", agent_id, item.get("reminder_id"))
|
||||
return False
|
||||
|
||||
if not chat_id or not reminder_text:
|
||||
logger.warning("reminder_skip_invalid_payload reminder_id=%s", item.get("reminder_id"))
|
||||
return False
|
||||
|
||||
body = {
|
||||
"chat_id": chat_id,
|
||||
"text": f"⏰ Нагадування ({agent_id})\n\n{reminder_text}\n\n🕒 {due_at}",
|
||||
}
|
||||
|
||||
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
||||
async with httpx.AsyncClient(timeout=TELEGRAM_TIMEOUT) as client:
|
||||
resp = await client.post(url, json=body)
|
||||
if resp.status_code != 200:
|
||||
logger.warning(
|
||||
"reminder_send_failed reminder_id=%s status=%s body=%s",
|
||||
item.get("reminder_id"),
|
||||
resp.status_code,
|
||||
resp.text[:300],
|
||||
)
|
||||
return False
|
||||
|
||||
logger.info("reminder_sent reminder_id=%s agent=%s chat=%s", item.get("reminder_id"), agent_id, chat_id)
|
||||
return True
|
||||
|
||||
|
||||
async def worker_loop() -> None:
|
||||
logger.info("reminder_worker_started poll_seconds=%s", POLL_SECONDS)
|
||||
while True:
|
||||
try:
|
||||
items = await pop_due_reminders(limit=20)
|
||||
if items:
|
||||
for item in items:
|
||||
try:
|
||||
await _send_reminder(item)
|
||||
except Exception:
|
||||
logger.exception("reminder_send_exception reminder_id=%s", item.get("reminder_id"))
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception:
|
||||
logger.exception("reminder_worker_cycle_failed")
|
||||
await asyncio.sleep(POLL_SECONDS)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
asyncio.run(worker_loop())
|
||||
finally:
|
||||
try:
|
||||
asyncio.run(close_redis())
|
||||
except Exception:
|
||||
pass
|
||||
154
gateway-bot/daarion_facade/reminders.py
Normal file
154
gateway-bot/daarion_facade/reminders.py
Normal file
@@ -0,0 +1,154 @@
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from redis.asyncio import Redis
|
||||
|
||||
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
|
||||
REMINDER_PREFIX = "daarion:reminders"
|
||||
REMINDER_BY_ID = f"{REMINDER_PREFIX}:by_id"
|
||||
REMINDER_SCHEDULE = f"{REMINDER_PREFIX}:schedule"
|
||||
REMINDER_TTL_SECONDS = int(os.getenv("DAARION_REMINDER_TTL_SECONDS", str(30 * 24 * 3600)))
|
||||
|
||||
_redis: Optional[Redis] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class Reminder:
|
||||
reminder_id: str
|
||||
agent_id: str
|
||||
chat_id: str
|
||||
user_id: str
|
||||
text: str
|
||||
due_ts: int
|
||||
created_at: str
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"reminder_id": self.reminder_id,
|
||||
"agent_id": self.agent_id,
|
||||
"chat_id": self.chat_id,
|
||||
"user_id": self.user_id,
|
||||
"text": self.text,
|
||||
"due_ts": self.due_ts,
|
||||
"created_at": self.created_at,
|
||||
}
|
||||
|
||||
|
||||
async def redis_client() -> Redis:
|
||||
global _redis
|
||||
if _redis is None:
|
||||
_redis = Redis.from_url(REDIS_URL, decode_responses=True)
|
||||
return _redis
|
||||
|
||||
|
||||
async def close_redis() -> None:
|
||||
global _redis
|
||||
if _redis is not None:
|
||||
await _redis.close()
|
||||
_redis = None
|
||||
|
||||
|
||||
def _iso_now() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def _iso_from_ts(ts: int) -> str:
|
||||
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
async def create_reminder(agent_id: str, chat_id: str, user_id: str, text: str, due_ts: int) -> Dict[str, Any]:
|
||||
reminder = Reminder(
|
||||
reminder_id=f"rem_{uuid.uuid4().hex[:16]}",
|
||||
agent_id=agent_id,
|
||||
chat_id=str(chat_id),
|
||||
user_id=str(user_id),
|
||||
text=text.strip(),
|
||||
due_ts=int(due_ts),
|
||||
created_at=_iso_now(),
|
||||
)
|
||||
|
||||
r = await redis_client()
|
||||
key = f"{REMINDER_BY_ID}:{reminder.reminder_id}"
|
||||
payload = json.dumps(reminder.to_dict(), ensure_ascii=False)
|
||||
|
||||
await r.set(key, payload, ex=REMINDER_TTL_SECONDS)
|
||||
await r.zadd(REMINDER_SCHEDULE, {reminder.reminder_id: float(reminder.due_ts)})
|
||||
|
||||
result = reminder.to_dict()
|
||||
result["due_at"] = _iso_from_ts(reminder.due_ts)
|
||||
return result
|
||||
|
||||
|
||||
async def list_reminders(agent_id: str, chat_id: str, user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
|
||||
r = await redis_client()
|
||||
now_ts = int(time.time())
|
||||
ids = await r.zrangebyscore(REMINDER_SCHEDULE, min=now_ts - 365 * 24 * 3600, max="+inf", start=0, num=max(1, limit * 5))
|
||||
|
||||
out: List[Dict[str, Any]] = []
|
||||
for reminder_id in ids:
|
||||
raw = await r.get(f"{REMINDER_BY_ID}:{reminder_id}")
|
||||
if not raw:
|
||||
continue
|
||||
try:
|
||||
item = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
if item.get("agent_id") != agent_id:
|
||||
continue
|
||||
if str(item.get("chat_id")) != str(chat_id):
|
||||
continue
|
||||
if str(item.get("user_id")) != str(user_id):
|
||||
continue
|
||||
item["due_at"] = _iso_from_ts(int(item.get("due_ts", 0)))
|
||||
out.append(item)
|
||||
if len(out) >= limit:
|
||||
break
|
||||
return out
|
||||
|
||||
|
||||
async def cancel_reminder(reminder_id: str, agent_id: str, chat_id: str, user_id: str) -> bool:
|
||||
r = await redis_client()
|
||||
key = f"{REMINDER_BY_ID}:{reminder_id}"
|
||||
raw = await r.get(key)
|
||||
if not raw:
|
||||
return False
|
||||
try:
|
||||
item = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
return False
|
||||
|
||||
if item.get("agent_id") != agent_id or str(item.get("chat_id")) != str(chat_id) or str(item.get("user_id")) != str(user_id):
|
||||
return False
|
||||
|
||||
await r.delete(key)
|
||||
await r.zrem(REMINDER_SCHEDULE, reminder_id)
|
||||
return True
|
||||
|
||||
|
||||
async def pop_due_reminders(limit: int = 20) -> List[Dict[str, Any]]:
|
||||
r = await redis_client()
|
||||
now_ts = int(time.time())
|
||||
ids = await r.zrangebyscore(REMINDER_SCHEDULE, min="-inf", max=now_ts, start=0, num=max(1, limit))
|
||||
out: List[Dict[str, Any]] = []
|
||||
|
||||
for reminder_id in ids:
|
||||
removed = await r.zrem(REMINDER_SCHEDULE, reminder_id)
|
||||
if removed == 0:
|
||||
continue
|
||||
raw = await r.get(f"{REMINDER_BY_ID}:{reminder_id}")
|
||||
if not raw:
|
||||
continue
|
||||
await r.delete(f"{REMINDER_BY_ID}:{reminder_id}")
|
||||
try:
|
||||
item = json.loads(raw)
|
||||
item["due_at"] = _iso_from_ts(int(item.get("due_ts", now_ts)))
|
||||
out.append(item)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
return out
|
||||
Reference in New Issue
Block a user