From 465669fc1dfd4fdfd954ab910dca65ca46058dd6 Mon Sep 17 00:00:00 2001 From: Apple Date: Thu, 5 Mar 2026 09:19:25 -0800 Subject: [PATCH] feat(gateway): phase7 public access layer (entitlements, rate limits, public list) --- docs/ops/phase7_public_access.md | 106 ++ gateway-bot/gateway_experience_bus.py | 413 ++++++ gateway-bot/http_api.py | 1560 +++++++++++++++++++++- gateway-bot/metrics.py | 73 + migrations/056_agent_access_policies.sql | 61 + 5 files changed, 2187 insertions(+), 26 deletions(-) create mode 100644 docs/ops/phase7_public_access.md create mode 100644 gateway-bot/gateway_experience_bus.py create mode 100644 migrations/056_agent_access_policies.sql diff --git a/docs/ops/phase7_public_access.md b/docs/ops/phase7_public_access.md new file mode 100644 index 00000000..a32c2f9d --- /dev/null +++ b/docs/ops/phase7_public_access.md @@ -0,0 +1,106 @@ +# Phase-7 Public Access Layer + +## Scope +- Public discovery endpoint: `GET /v1/agents/public` +- Entitlements check in gateway before router call +- Rate limits in gateway for: + - `user_global` + - `user_agent` + - `group_agent` + +## Data Model +Migration: `migrations/056_agent_access_policies.sql` + +Tables: +- `agent_access_policies` +- `agent_allowlist` + +## Gateway Env +- `GATEWAY_PUBLIC_ACCESS_ENABLED=true` +- `GATEWAY_ACCESS_POLICY_CACHE_TTL_SECONDS=60` +- `GATEWAY_ALLOWLIST_CACHE_TTL_SECONDS=30` +- `GATEWAY_ACCESS_DB_TIMEOUT_MS=40` +- `GATEWAY_ACCESS_DENY_COOLDOWN_SECONDS=30` +- `GATEWAY_RL_USER_GLOBAL_LIMIT=60` +- `GATEWAY_RL_USER_GLOBAL_WINDOW_SECONDS=300` +- `GATEWAY_RL_USER_AGENT_LIMIT=20` +- `GATEWAY_RL_USER_AGENT_WINDOW_SECONDS=300` +- `GATEWAY_RL_GROUP_AGENT_LIMIT=10` +- `GATEWAY_RL_GROUP_AGENT_WINDOW_SECONDS=300` + +## Public Discovery +```bash +curl -sS http://127.0.0.1:9300/v1/agents/public | jq +``` + +Expected: +- `count` includes only `enabled && public_active` agents. +- planned/internal agents are excluded. + +## Entitlements Operations +Add whitelist user: +```sql +INSERT INTO agent_allowlist(platform, platform_user_id, agent_id) +VALUES ('telegram', '123456789', 'helion') +ON CONFLICT (platform, platform_user_id, agent_id) DO NOTHING; +``` + +Require whitelist for an agent: +```sql +UPDATE agent_access_policies +SET requires_whitelist = TRUE, updated_at = now() +WHERE agent_id = 'helion'; +``` + +Disable agent public access: +```sql +UPDATE agent_access_policies +SET enabled = FALSE, public_active = FALSE, updated_at = now() +WHERE agent_id = 'aistalk'; +``` + +## Rate-Limit Policy Update +```sql +UPDATE agent_access_policies +SET + user_global_limit = 30, + user_global_window_seconds = 300, + user_agent_limit = 10, + user_agent_window_seconds = 300, + group_agent_limit = 5, + group_agent_window_seconds = 300, + updated_at = now() +WHERE agent_id = 'agromatrix'; +``` + +## Fixed Smoke +1. Discovery: +```bash +curl -sS http://127.0.0.1:9300/v1/agents/public | jq '.count' +``` + +2. Whitelist deny: +- Set `requires_whitelist=true` for test agent. +- Replay webhook from user not in allowlist. +- Expected: deny ACK and event reason `access_whitelist_required`. + +3. Whitelist allow: +- Insert user to `agent_allowlist`. +- Replay webhook. +- Expected: request continues to normal processing path. + +4. Rate limit: +- Set low policy (`user_agent_limit=2`, window 60s). +- Send 3 quick webhooks from same user/agent. +- Expected: third request is `429`-style deny path and `reason=rate_limit_user_agent`. + +5. Event invariant: +- `1 webhook -> 1 gateway event` remains true. + +## PASS +- `/v1/agents/public` returns only public enabled agents. +- Entitlement decisions are deterministic (`allow|deny|rate_limited`). +- Metrics increment: + - `gateway_access_decisions_total` + - `gateway_rate_limited_total` +- No regression in webhook event finalize behavior. diff --git a/gateway-bot/gateway_experience_bus.py b/gateway-bot/gateway_experience_bus.py new file mode 100644 index 00000000..1283a7da --- /dev/null +++ b/gateway-bot/gateway_experience_bus.py @@ -0,0 +1,413 @@ +"""Gateway experience event publisher/store (Phase-4). + +Best-effort, fail-open telemetry for gateway webhook flow: +- publish to JetStream subject agent.experience.v1. +- optional DB append-only insert into agent_experience_events +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import uuid +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +try: + import asyncpg +except ImportError: # pragma: no cover + asyncpg = None # type: ignore[assignment] + +try: + import nats +except ImportError: # pragma: no cover + nats = None # type: ignore[assignment] + +try: + from metrics import GATEWAY_EXPERIENCE_PUBLISHED_TOTAL + METRICS_AVAILABLE = True +except Exception: # pragma: no cover + METRICS_AVAILABLE = False + GATEWAY_EXPERIENCE_PUBLISHED_TOTAL = None # type: ignore[assignment] + + +logger = logging.getLogger("gateway.experience_bus") + + +def _metric_publish(status: str) -> None: + if METRICS_AVAILABLE and GATEWAY_EXPERIENCE_PUBLISHED_TOTAL is not None: + GATEWAY_EXPERIENCE_PUBLISHED_TOTAL.labels(status=status).inc() + + +class GatewayExperienceBus: + def __init__(self) -> None: + self.enabled = os.getenv("EXPERIENCE_BUS_ENABLED", "true").lower() in {"1", "true", "yes"} + self.enable_nats = os.getenv("EXPERIENCE_ENABLE_NATS", "true").lower() in {"1", "true", "yes"} + self.enable_db = os.getenv("EXPERIENCE_ENABLE_DB", "true").lower() in {"1", "true", "yes"} + + self.node_id = os.getenv("NODE_ID", "NODA1") + self.nats_url = os.getenv("NATS_URL", "nats://nats:4222") + self.stream_name = os.getenv("EXPERIENCE_STREAM_NAME", "EXPERIENCE") + self.subject_prefix = os.getenv("EXPERIENCE_SUBJECT_PREFIX", "agent.experience.v1") + self.publish_timeout_s = float(os.getenv("EXPERIENCE_PUBLISH_TIMEOUT_MS", "800") or 800) / 1000.0 + self.db_timeout_s = float(os.getenv("EXPERIENCE_DB_TIMEOUT_MS", "1200") or 1200) / 1000.0 + + self.db_dsn = os.getenv("EXPERIENCE_DATABASE_URL") or os.getenv("DATABASE_URL") + + self._lock = asyncio.Lock() + self._nc: Any = None + self._js: Any = None + self._pool: Any = None + self._stream_ensured = False + + async def capture(self, event: Dict[str, Any]) -> None: + if not self.enabled: + return + + try: + await self._ensure_clients() + except Exception as e: # pragma: no cover + logger.debug("gateway experience ensure clients failed: %s", e) + + nats_ok = await self._publish_nats(event) + db_ok = await self._insert_db(event) + + if nats_ok or db_ok: + _metric_publish("ok") + else: + _metric_publish("err") + + async def get_anti_silent_tuning_lesson( + self, + *, + reason: str, + chat_type: str, + timeout_s: float = 0.04, + ) -> Optional[Dict[str, Any]]: + """Lookup active anti-silent tuning lesson for (reason, chat_type). + + Returns lesson raw payload or None. Fail-open by design. + """ + if not self.enabled or not self.enable_db: + return None + try: + await self._ensure_clients() + except Exception: + return None + if self._pool is None: + return None + + trigger = f"reason={reason};chat_type={chat_type}" + query = """ + SELECT raw + FROM agent_lessons + WHERE COALESCE(raw->>'lesson_type', '') = 'anti_silent_tuning' + AND trigger = $1 + AND ( + NULLIF(COALESCE(raw->>'expires_at', ''), '') IS NULL + OR (raw->>'expires_at')::timestamptz > now() + ) + ORDER BY ts DESC + LIMIT 1 + """ + try: + async with self._pool.acquire() as conn: + row = await asyncio.wait_for(conn.fetchrow(query, trigger), timeout=timeout_s) + if row is None: + return None + raw = row.get("raw") + if isinstance(raw, dict): + return raw + if isinstance(raw, str): + return json.loads(raw) + return None + except Exception: + return None + + async def get_agent_access_policy( + self, + *, + agent_id: str, + timeout_s: float = 0.04, + ) -> Optional[Dict[str, Any]]: + """Lookup access policy row for an agent. Returns None on miss/errors.""" + if not self.enabled or not self.enable_db: + return None + try: + await self._ensure_clients() + except Exception: + return None + if self._pool is None: + return None + + query = """ + SELECT + agent_id, + enabled, + public_active, + requires_whitelist, + user_global_limit, + user_global_window_seconds, + user_agent_limit, + user_agent_window_seconds, + group_agent_limit, + group_agent_window_seconds + FROM agent_access_policies + WHERE agent_id = $1 + LIMIT 1 + """ + try: + async with self._pool.acquire() as conn: + row = await asyncio.wait_for(conn.fetchrow(query, agent_id), timeout=timeout_s) + if row is None: + return None + return { + "agent_id": row.get("agent_id"), + "enabled": bool(row.get("enabled")), + "public_active": bool(row.get("public_active")), + "requires_whitelist": bool(row.get("requires_whitelist")), + "user_global_limit": int(row.get("user_global_limit") or 0), + "user_global_window_seconds": int(row.get("user_global_window_seconds") or 0), + "user_agent_limit": int(row.get("user_agent_limit") or 0), + "user_agent_window_seconds": int(row.get("user_agent_window_seconds") or 0), + "group_agent_limit": int(row.get("group_agent_limit") or 0), + "group_agent_window_seconds": int(row.get("group_agent_window_seconds") or 0), + } + except Exception: + return None + + async def is_allowlisted( + self, + *, + platform: str, + platform_user_id: str, + agent_id: str, + timeout_s: float = 0.04, + ) -> bool: + """Return True when (platform, user, agent) exists in allowlist.""" + if not self.enabled or not self.enable_db: + return False + try: + await self._ensure_clients() + except Exception: + return False + if self._pool is None: + return False + + query = """ + SELECT 1 + FROM agent_allowlist + WHERE platform = $1 + AND platform_user_id = $2 + AND agent_id = $3 + LIMIT 1 + """ + try: + async with self._pool.acquire() as conn: + row = await asyncio.wait_for( + conn.fetchrow(query, platform, platform_user_id, agent_id), + timeout=timeout_s, + ) + return row is not None + except Exception: + return False + + async def close(self) -> None: + if self._pool is not None: + try: + await self._pool.close() + except Exception: + pass + self._pool = None + + if self._nc is not None: + try: + await self._nc.close() + except Exception: + pass + self._nc = None + self._js = None + + async def _ensure_clients(self) -> None: + async with self._lock: + if self.enable_nats and self._nc is None and nats is not None: + try: + self._nc = await nats.connect(self.nats_url) + self._js = self._nc.jetstream() + except Exception as e: + logger.debug("gateway experience nats connect failed: %s", e) + self._nc = None + self._js = None + + if self.enable_db and self._pool is None and asyncpg is not None and self.db_dsn: + try: + self._pool = await asyncpg.create_pool(self.db_dsn, min_size=1, max_size=2) + except Exception as e: + logger.debug("gateway experience db pool failed: %s", e) + self._pool = None + + if self._js is not None and not self._stream_ensured: + await self._ensure_stream() + + async def _ensure_stream(self) -> None: + if self._js is None: + return + subjects = [f"{self.subject_prefix}.>"] + try: + await self._js.stream_info(self.stream_name) + self._stream_ensured = True + return + except Exception: + pass + + try: + await self._js.add_stream(name=self.stream_name, subjects=subjects) + self._stream_ensured = True + except Exception as e: + logger.debug("gateway experience ensure stream failed: %s", e) + + async def _publish_nats(self, event: Dict[str, Any]) -> bool: + if not self.enable_nats: + return False + if self._js is None: + return False + + subject = f"{self.subject_prefix}.{event.get('agent_id', 'unknown')}" + payload = json.dumps(event, ensure_ascii=False).encode("utf-8") + msg_id = str(event.get("event_id") or "").strip() + headers = {"Nats-Msg-Id": msg_id} if msg_id else None + + try: + await asyncio.wait_for(self._js.publish(subject, payload, headers=headers), timeout=self.publish_timeout_s) + return True + except Exception as e: + logger.debug("gateway experience nats publish failed: %s", e) + return False + + async def _insert_db(self, event: Dict[str, Any]) -> bool: + if not self.enable_db: + return False + if self._pool is None: + return False + + llm = event.get("llm") or {} + result = event.get("result") or {} + + query = """ + INSERT INTO agent_experience_events ( + event_id, + ts, + node_id, + source, + agent_id, + task_type, + request_id, + channel, + inputs_hash, + provider, + model, + profile, + latency_ms, + tokens_in, + tokens_out, + ok, + error_class, + error_msg_redacted, + http_status, + raw + ) VALUES ( + $1::uuid, + $2::timestamptz, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13, + $14, + $15, + $16, + $17, + $18, + $19, + $20::jsonb + ) + ON CONFLICT (event_id) DO NOTHING + """ + + try: + payload_json = json.dumps(event, ensure_ascii=False) + async with self._pool.acquire() as conn: + await asyncio.wait_for( + conn.execute( + query, + _as_uuid(event.get("event_id")), + _as_timestamptz(event.get("ts")), + event.get("node_id", self.node_id), + event.get("source", "gateway"), + event.get("agent_id"), + event.get("task_type", "webhook"), + event.get("request_id"), + event.get("channel", "telegram"), + event.get("inputs_hash"), + llm.get("provider", "gateway"), + llm.get("model", "gateway"), + llm.get("profile"), + int(llm.get("latency_ms") or 0), + _as_int_or_none(llm.get("tokens_in")), + _as_int_or_none(llm.get("tokens_out")), + bool(result.get("ok")), + result.get("error_class"), + result.get("error_msg_redacted"), + int(result.get("http_status") or 0), + payload_json, + ), + timeout=self.db_timeout_s, + ) + return True + except Exception as e: + logger.debug("gateway experience db insert failed: %s", e) + return False + + +def _as_int_or_none(value: Any) -> Optional[int]: + try: + if value is None: + return None + return int(value) + except Exception: + return None + + +def _as_uuid(value: Any) -> uuid.UUID: + try: + return uuid.UUID(str(value)) + except Exception: + return uuid.uuid4() + + +def _as_timestamptz(value: Any) -> datetime: + if isinstance(value, datetime): + return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc) + try: + parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00")) + return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc) + except Exception: + return datetime.now(timezone.utc) + + +_gateway_bus_singleton: Optional[GatewayExperienceBus] = None + + +def get_gateway_experience_bus() -> GatewayExperienceBus: + global _gateway_bus_singleton + if _gateway_bus_singleton is None: + _gateway_bus_singleton = GatewayExperienceBus() + return _gateway_bus_singleton diff --git a/gateway-bot/http_api.py b/gateway-bot/http_api.py index eacfd234..8a29e262 100644 --- a/gateway-bot/http_api.py +++ b/gateway-bot/http_api.py @@ -14,13 +14,14 @@ import sys import time import uuid import httpx +from contextvars import ContextVar from pathlib import Path from typing import Dict, Any, Optional, List, Tuple -from datetime import datetime +from datetime import datetime, timezone from dataclasses import dataclass from io import BytesIO -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Response from pydantic import BaseModel from router_client import send_to_router @@ -68,6 +69,46 @@ from behavior_policy import ( logger = logging.getLogger(__name__) +try: + from metrics import ( + GATEWAY_POLICY_DECISIONS_TOTAL, + GATEWAY_USER_SIGNAL_TOTAL, + GATEWAY_WEBHOOK_LATENCY_MS, + GATEWAY_EXPERIENCE_EMITTED_TOTAL, + GATEWAY_EARLY_RETURN_TOTAL, + GATEWAY_EVENT_FINALIZE_LATENCY_MS, + GATEWAY_ANTI_SILENT_TOTAL, + GATEWAY_ACK_SENT_TOTAL, + GATEWAY_ANTI_SILENT_TUNING_APPLIED_TOTAL, + GATEWAY_ACCESS_DECISIONS_TOTAL, + GATEWAY_RATE_LIMITED_TOTAL, + get_metrics as _gateway_get_metrics, + get_content_type as _gateway_metrics_content_type, + ) + GATEWAY_METRICS_AVAILABLE = True +except Exception: + GATEWAY_METRICS_AVAILABLE = False + GATEWAY_POLICY_DECISIONS_TOTAL = None # type: ignore[assignment] + GATEWAY_USER_SIGNAL_TOTAL = None # type: ignore[assignment] + GATEWAY_WEBHOOK_LATENCY_MS = None # type: ignore[assignment] + GATEWAY_EXPERIENCE_EMITTED_TOTAL = None # type: ignore[assignment] + GATEWAY_EARLY_RETURN_TOTAL = None # type: ignore[assignment] + GATEWAY_EVENT_FINALIZE_LATENCY_MS = None # type: ignore[assignment] + GATEWAY_ANTI_SILENT_TOTAL = None # type: ignore[assignment] + GATEWAY_ACK_SENT_TOTAL = None # type: ignore[assignment] + GATEWAY_ANTI_SILENT_TUNING_APPLIED_TOTAL = None # type: ignore[assignment] + GATEWAY_ACCESS_DECISIONS_TOTAL = None # type: ignore[assignment] + GATEWAY_RATE_LIMITED_TOTAL = None # type: ignore[assignment] + _gateway_get_metrics = None # type: ignore[assignment] + _gateway_metrics_content_type = None # type: ignore[assignment] + +try: + from gateway_experience_bus import get_gateway_experience_bus + GATEWAY_EXPERIENCE_AVAILABLE = True +except Exception: + GATEWAY_EXPERIENCE_AVAILABLE = False + get_gateway_experience_bus = None # type: ignore[assignment] + def _safe_has_recent_interaction(agent_id: str, chat_id: str, user_id: str) -> bool: """Guard: avoid 500 if has_recent_interaction is missing or raises. Returns False on any error.""" @@ -99,6 +140,36 @@ RECENT_PHOTO_CONTEXT: Dict[str, Dict[str, Any]] = {} RECENT_PHOTO_TTL = 30 * 60 # 30 minutes AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID = "agent:agromatrix:global" +# Phase-4: user-signal heuristics for gateway webhook events +USER_SIGNAL_RETRY_WINDOW_SECONDS = max(5, int(os.getenv("GATEWAY_USER_SIGNAL_RETRY_WINDOW_SECONDS", "30"))) +USER_SIGNAL_CACHE_MAX = max(1000, int(os.getenv("GATEWAY_USER_SIGNAL_CACHE_MAX", "50000"))) +_USER_SIGNAL_CACHE: Dict[str, Dict[str, Any]] = {} + +USER_SIGNAL_POSITIVE_MARKERS = ( + "дякую", + "дякс", + "ок", + "окей", + "супер", + "клас", + "дяку", + "thanks", + "thank you", + "great", +) + +USER_SIGNAL_NEGATIVE_MARKERS = ( + "не так", + "неправильно", + "невірно", + "погано", + "не працює", + "ти тупиш", + "wrong", + "bad", + "not correct", +) + def _cleanup_recent_photo_context() -> None: now = time.time() @@ -268,6 +339,898 @@ def _truncate_context_for_prompt(raw: str, *, max_chars: int = 2200, max_lines: return out.strip() +def _normalize_policy_reason(reason: str) -> str: + normalized = re.sub(r"[^a-z0-9_]+", "_", str(reason or "").strip().lower()) + normalized = normalized.strip("_") + return (normalized or "unknown")[:80] + + +def _derive_provider_from_backend(backend: str) -> str: + b = str(backend or "").lower() + if "mistral" in b: + return "mistral" + if "deepseek" in b: + return "deepseek" + if "grok" in b: + return "grok" + if "anthropic" in b or "claude" in b: + return "anthropic" + if "openai" in b: + return "openai" + if "glm" in b: + return "glm" + if "ollama" in b or "local" in b: + return "local" + return "other" + + +def _build_gateway_inputs_hash( + *, + agent_id: str, + channel: str, + message_length: int, + message_type: str, + attachment_count: int, + ts_bucket_min: int, +) -> str: + envelope = { + "agent_id": str(agent_id or "").strip().lower(), + "channel": str(channel or "telegram").strip().lower(), + "message_length": max(0, int(message_length or 0)), + "message_type": str(message_type or "text").strip().lower(), + "attachment_count": max(0, int(attachment_count or 0)), + "ts_bucket_min": int(ts_bucket_min), + } + raw = json.dumps(envelope, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + +def _normalize_signal_text(text: str) -> str: + return re.sub(r"\s+", " ", str(text or "").strip().lower())[:500] + + +def _detect_user_signal(agent_id: str, chat_id: str, user_id: str, text: str) -> str: + normalized = _normalize_signal_text(text) + if not normalized: + return "none" + + if any(marker in normalized for marker in USER_SIGNAL_NEGATIVE_MARKERS): + signal = "negative" + elif any(marker in normalized for marker in USER_SIGNAL_POSITIVE_MARKERS): + signal = "positive" + else: + signal = "none" + + key = f"{agent_id}:{chat_id}:{user_id}" + now = time.time() + prev = _USER_SIGNAL_CACHE.get(key) + if ( + signal == "none" + and prev + and prev.get("text") == normalized + and (now - float(prev.get("ts", 0.0))) <= USER_SIGNAL_RETRY_WINDOW_SECONDS + ): + signal = "retry" + + _USER_SIGNAL_CACHE[key] = {"text": normalized, "ts": now} + if len(_USER_SIGNAL_CACHE) > USER_SIGNAL_CACHE_MAX: + cutoff = now - (USER_SIGNAL_RETRY_WINDOW_SECONDS * 3) + stale_keys = [k for k, v in _USER_SIGNAL_CACHE.items() if float(v.get("ts", 0.0)) < cutoff] + for stale in stale_keys: + _USER_SIGNAL_CACHE.pop(stale, None) + while len(_USER_SIGNAL_CACHE) > USER_SIGNAL_CACHE_MAX: + _USER_SIGNAL_CACHE.pop(next(iter(_USER_SIGNAL_CACHE))) + + return signal + + +def _safe_redact_error(value: Optional[str]) -> Optional[str]: + if value is None: + return None + text = str(value) + text = re.sub(r"(?i)(authorization\s*:\s*bearer)\s+[A-Za-z0-9._-]+", r"\1 [redacted]", text) + text = re.sub(r"(?i)(api[_-]?key|token|password|secret)\s*[:=]\s*[^\s,;]+", r"\1=[redacted]", text) + text = re.sub(r"\s+", " ", text).strip() + return text[:300] if len(text) > 300 else text + + +def _agent_runtime_info(agent_id: str) -> Dict[str, Any]: + meta = AGENT_RUNTIME_META.get(agent_id, {}) + return { + "visibility": meta.get("visibility", "public"), + "lifecycle_status": meta.get("lifecycle_status", "active"), + "telegram_mode": meta.get("telegram_mode", "on"), + } + + +def _is_public_active_agent(agent_id: str) -> bool: + info = _agent_runtime_info(agent_id) + return ( + info.get("visibility") == "public" + and info.get("lifecycle_status") == "active" + and info.get("telegram_mode") != "off" + ) + + +def _normalize_chat_type(chat_type: Optional[str]) -> str: + ct = str(chat_type or "").strip().lower() + if ct in {"group", "supergroup"}: + return "group" + if ct in {"private", "channel"}: + return ct + return "unknown" + + +def _default_access_policy(agent_id: str) -> Dict[str, Any]: + runtime = _agent_runtime_info(agent_id) + lifecycle = str(runtime.get("lifecycle_status") or "active") + telegram_mode = str(runtime.get("telegram_mode") or "on") + enabled = lifecycle == "active" and telegram_mode != "off" + return { + "agent_id": agent_id, + "enabled": enabled, + "public_active": _is_public_active_agent(agent_id), + "requires_whitelist": False, + "user_global_limit": PUBLIC_ACCESS_DEFAULT_USER_GLOBAL_LIMIT, + "user_global_window_seconds": PUBLIC_ACCESS_DEFAULT_USER_GLOBAL_WINDOW, + "user_agent_limit": PUBLIC_ACCESS_DEFAULT_USER_AGENT_LIMIT, + "user_agent_window_seconds": PUBLIC_ACCESS_DEFAULT_USER_AGENT_WINDOW, + "group_agent_limit": PUBLIC_ACCESS_DEFAULT_GROUP_AGENT_LIMIT, + "group_agent_window_seconds": PUBLIC_ACCESS_DEFAULT_GROUP_AGENT_WINDOW, + } + + +def _cleanup_access_policy_cache(now_ts: float) -> None: + if len(_ACCESS_POLICY_CACHE) <= PUBLIC_ACCESS_CACHE_MAX: + return + cutoff = now_ts - max(PUBLIC_ACCESS_POLICY_CACHE_TTL_SECONDS * 4, 120) + stale_keys = [k for k, v in _ACCESS_POLICY_CACHE.items() if float(v.get("ts", 0.0)) < cutoff] + for key in stale_keys: + _ACCESS_POLICY_CACHE.pop(key, None) + while len(_ACCESS_POLICY_CACHE) > PUBLIC_ACCESS_CACHE_MAX: + _ACCESS_POLICY_CACHE.pop(next(iter(_ACCESS_POLICY_CACHE))) + + +def _cleanup_allowlist_cache(now_ts: float) -> None: + if len(_ALLOWLIST_CACHE) <= PUBLIC_ACCESS_CACHE_MAX: + return + cutoff = now_ts - max(PUBLIC_ACCESS_ALLOWLIST_CACHE_TTL_SECONDS * 4, 120) + stale_keys = [k for k, v in _ALLOWLIST_CACHE.items() if float(v.get("ts", 0.0)) < cutoff] + for key in stale_keys: + _ALLOWLIST_CACHE.pop(key, None) + while len(_ALLOWLIST_CACHE) > PUBLIC_ACCESS_CACHE_MAX: + _ALLOWLIST_CACHE.pop(next(iter(_ALLOWLIST_CACHE))) + + +def _cleanup_access_deny_cache(now_ts: float) -> None: + if len(_ACCESS_DENY_LAST_SENT) <= PUBLIC_ACCESS_CACHE_MAX: + return + cutoff = now_ts - max(ACCESS_DENY_COOLDOWN_SECONDS * 4, 120) + stale = [k for k, ts in _ACCESS_DENY_LAST_SENT.items() if ts < cutoff] + for key in stale: + _ACCESS_DENY_LAST_SENT.pop(key, None) + while len(_ACCESS_DENY_LAST_SENT) > PUBLIC_ACCESS_CACHE_MAX: + _ACCESS_DENY_LAST_SENT.pop(next(iter(_ACCESS_DENY_LAST_SENT))) + + +def _cleanup_access_rate_limit_counters(now_ts: float) -> None: + if len(_ACCESS_RATE_LIMIT_COUNTERS) <= PUBLIC_ACCESS_RATE_COUNTER_MAX: + return + stale_keys = [] + for key, payload in _ACCESS_RATE_LIMIT_COUNTERS.items(): + window_seconds = int(payload.get("window", 60) or 60) + window_start = float(payload.get("window_start", 0.0)) + if now_ts - window_start > max(window_seconds * 3, 120): + stale_keys.append(key) + for key in stale_keys: + _ACCESS_RATE_LIMIT_COUNTERS.pop(key, None) + while len(_ACCESS_RATE_LIMIT_COUNTERS) > PUBLIC_ACCESS_RATE_COUNTER_MAX: + _ACCESS_RATE_LIMIT_COUNTERS.pop(next(iter(_ACCESS_RATE_LIMIT_COUNTERS))) + + +async def _resolve_access_policy(agent_id: str) -> Dict[str, Any]: + policy = _default_access_policy(agent_id) + if not PUBLIC_ACCESS_CONTROL_ENABLED: + return policy + if not GATEWAY_EXPERIENCE_AVAILABLE or get_gateway_experience_bus is None: + return policy + + now_ts = time.time() + cached = _ACCESS_POLICY_CACHE.get(agent_id) + if cached and (now_ts - float(cached.get("ts", 0.0))) < PUBLIC_ACCESS_POLICY_CACHE_TTL_SECONDS: + cached_policy = cached.get("policy") + if isinstance(cached_policy, dict): + merged = dict(policy) + merged.update(cached_policy) + return merged + + fetched: Optional[Dict[str, Any]] = None + try: + bus = get_gateway_experience_bus() + fetched = await bus.get_agent_access_policy( + agent_id=agent_id, + timeout_s=PUBLIC_ACCESS_DB_TIMEOUT_MS / 1000.0, + ) + except Exception as exc: + logger.debug("access policy lookup failed agent=%s err=%s", agent_id, exc) + fetched = None + + if isinstance(fetched, dict): + merged = dict(policy) + merged.update(fetched) + policy = merged + _ACCESS_POLICY_CACHE[agent_id] = {"ts": now_ts, "policy": policy} + _cleanup_access_policy_cache(now_ts) + return policy + + +async def _resolve_allowlist( + *, + platform: str, + platform_user_id: str, + agent_id: str, +) -> bool: + if not PUBLIC_ACCESS_CONTROL_ENABLED: + return True + if not platform_user_id: + return False + if not GATEWAY_EXPERIENCE_AVAILABLE or get_gateway_experience_bus is None: + return False + + cache_key = f"{platform}:{platform_user_id}:{agent_id}" + now_ts = time.time() + cached = _ALLOWLIST_CACHE.get(cache_key) + if cached and (now_ts - float(cached.get("ts", 0.0))) < PUBLIC_ACCESS_ALLOWLIST_CACHE_TTL_SECONDS: + return bool(cached.get("allowed")) + + allowed = False + try: + bus = get_gateway_experience_bus() + allowed = await bus.is_allowlisted( + platform=platform, + platform_user_id=platform_user_id, + agent_id=agent_id, + timeout_s=PUBLIC_ACCESS_DB_TIMEOUT_MS / 1000.0, + ) + except Exception as exc: + logger.debug("allowlist lookup failed agent=%s user=%s err=%s", agent_id, platform_user_id, exc) + allowed = False + + _ALLOWLIST_CACHE[cache_key] = {"ts": now_ts, "allowed": bool(allowed)} + _cleanup_allowlist_cache(now_ts) + return bool(allowed) + + +def _consume_access_rate_limit( + *, + scope: str, + scope_key: str, + limit: int, + window_seconds: int, +) -> Tuple[bool, int]: + if limit <= 0: + return True, 0 + + now_ts = time.time() + key = f"{scope}:{scope_key}" + payload = _ACCESS_RATE_LIMIT_COUNTERS.get(key) + if payload is None: + _ACCESS_RATE_LIMIT_COUNTERS[key] = { + "count": 1.0, + "window_start": now_ts, + "window": float(window_seconds), + } + _cleanup_access_rate_limit_counters(now_ts) + return True, 0 + + window_start = float(payload.get("window_start", now_ts)) + elapsed = now_ts - window_start + if elapsed >= window_seconds: + _ACCESS_RATE_LIMIT_COUNTERS[key] = { + "count": 1.0, + "window_start": now_ts, + "window": float(window_seconds), + } + _cleanup_access_rate_limit_counters(now_ts) + return True, 0 + + count = int(payload.get("count", 0)) + if count >= limit: + retry_after_s = max(1, int(window_seconds - elapsed)) + return False, retry_after_s + + payload["count"] = float(count + 1) + payload["window"] = float(window_seconds) + _ACCESS_RATE_LIMIT_COUNTERS[key] = payload + _cleanup_access_rate_limit_counters(now_ts) + return True, 0 + + +def _access_template_id(reason_code: str) -> str: + normalized = _normalize_policy_reason(reason_code) + if normalized.startswith("rate_limit_"): + return "RATE_LIMIT" + if "whitelist" in normalized: + return "ACCESS_WHITELIST" + if "disabled" in normalized: + return "ACCESS_DISABLED" + return "ACCESS_DENIED" + + +def _access_template_text(template_id: str, chat_type: str, retry_after_s: int = 0) -> str: + group_mode = _normalize_chat_type(chat_type) == "group" + if template_id == "RATE_LIMIT": + if retry_after_s > 0: + return ( + f"Код: rate_limited. Повтори через {retry_after_s}с." + if group_mode + else f"Код: rate_limited. Запитів забагато. Спробуй через {retry_after_s} секунд." + ) + return ( + "Код: rate_limited. Запитів забагато." + if group_mode + else "Код: rate_limited. Запитів забагато. Спробуй трохи пізніше." + ) + if template_id == "ACCESS_WHITELIST": + return ( + "Код: access_denied. Потрібен інвайт." + if group_mode + else "Код: access_denied. Для цього агента потрібен інвайт. Попроси додати тебе в allowlist." + ) + if template_id == "ACCESS_DISABLED": + return ( + "Код: agent_disabled. Агент недоступний." + if group_mode + else "Код: agent_disabled. Цей агент тимчасово недоступний." + ) + return ( + "Код: access_denied. Доступ обмежений." + if group_mode + else "Код: access_denied. Доступ до цього агента обмежений." + ) + + +def _record_access_decision(decision: str, agent_id: str, chat_type: str) -> None: + if GATEWAY_METRICS_AVAILABLE and GATEWAY_ACCESS_DECISIONS_TOTAL is not None: + GATEWAY_ACCESS_DECISIONS_TOTAL.labels( + decision=decision, + agent_id=agent_id, + chat_type=_normalize_chat_type(chat_type), + ).inc() + + +async def _send_access_ack( + *, + chat_id: str, + chat_type: str, + agent_id: str, + telegram_token: Optional[str], + reason_code: str, + retry_after_s: int = 0, +) -> Tuple[str, str]: + normalized_chat_type = _normalize_chat_type(chat_type) + template_id = _access_template_id(reason_code) + if not telegram_token or str(chat_id or "").strip() in {"", "0", "unknown", "none"}: + return "ACK_SKIPPED_NO_CHANNEL", template_id + + normalized_reason = _normalize_policy_reason(reason_code) + now_ts = time.time() + if normalized_chat_type == "group": + cooldown_key = f"{chat_id}:{agent_id}:{normalized_reason}" + last_ts = _ACCESS_DENY_LAST_SENT.get(cooldown_key) + if last_ts and (now_ts - last_ts) < ACCESS_DENY_COOLDOWN_SECONDS: + return "ACK_SUPPRESSED_COOLDOWN", template_id + _ACCESS_DENY_LAST_SENT[cooldown_key] = now_ts + _cleanup_access_deny_cache(now_ts) + + text = _access_template_text(template_id, normalized_chat_type, retry_after_s=retry_after_s) + await send_telegram_message(chat_id, text, telegram_token) + return "ACK_EMITTED", template_id + + +async def _enforce_public_access( + *, + agent_config: "AgentConfig", + platform: str, + platform_user_id: str, + chat_id: str, + chat_type: str, + telegram_token: Optional[str], +) -> Optional[Dict[str, Any]]: + if not PUBLIC_ACCESS_CONTROL_ENABLED: + return None + + normalized_chat_type = _normalize_chat_type(chat_type) + policy = await _resolve_access_policy(agent_config.agent_id) + decision = "allow" + reason_code = "" + status_code = 200 + retry_after_s = 0 + rate_limit_scope: Optional[str] = None + + if not bool(policy.get("enabled", True)): + decision = "deny" + reason_code = "access_disabled" + status_code = 404 + elif not bool(policy.get("public_active", False)): + decision = "deny" + reason_code = "access_not_public" + status_code = 403 + elif bool(policy.get("requires_whitelist", False)): + allowed = await _resolve_allowlist( + platform=platform, + platform_user_id=platform_user_id, + agent_id=agent_config.agent_id, + ) + if not allowed: + decision = "deny" + reason_code = "access_whitelist_required" + status_code = 403 + + if decision == "allow": + checks = [ + ( + "user_global", + f"{platform}:{platform_user_id}", + int(policy.get("user_global_limit", PUBLIC_ACCESS_DEFAULT_USER_GLOBAL_LIMIT) or 0), + int(policy.get("user_global_window_seconds", PUBLIC_ACCESS_DEFAULT_USER_GLOBAL_WINDOW) or 0), + ), + ( + "user_agent", + f"{platform}:{platform_user_id}:{agent_config.agent_id}", + int(policy.get("user_agent_limit", PUBLIC_ACCESS_DEFAULT_USER_AGENT_LIMIT) or 0), + int(policy.get("user_agent_window_seconds", PUBLIC_ACCESS_DEFAULT_USER_AGENT_WINDOW) or 0), + ), + ] + if normalized_chat_type == "group" and chat_id: + checks.append( + ( + "group_agent", + f"{chat_id}:{agent_config.agent_id}", + int(policy.get("group_agent_limit", PUBLIC_ACCESS_DEFAULT_GROUP_AGENT_LIMIT) or 0), + int(policy.get("group_agent_window_seconds", PUBLIC_ACCESS_DEFAULT_GROUP_AGENT_WINDOW) or 0), + ) + ) + + for scope, key, limit, window_s in checks: + allowed, retry_after = _consume_access_rate_limit( + scope=scope, + scope_key=key, + limit=limit, + window_seconds=max(1, window_s), + ) + if not allowed: + decision = "rate_limited" + rate_limit_scope = scope + reason_code = f"rate_limit_{scope}" + status_code = 429 + retry_after_s = retry_after + break + + _record_access_decision(decision, agent_config.agent_id, normalized_chat_type) + if decision == "allow": + return None + + if decision == "rate_limited" and GATEWAY_METRICS_AVAILABLE and GATEWAY_RATE_LIMITED_TOTAL is not None: + GATEWAY_RATE_LIMITED_TOTAL.labels( + scope=rate_limit_scope or "unknown", + agent_id=agent_config.agent_id, + chat_type=normalized_chat_type, + ).inc() + + ack_action, template_id = await _send_access_ack( + chat_id=chat_id, + chat_type=normalized_chat_type, + agent_id=agent_config.agent_id, + telegram_token=telegram_token, + reason_code=reason_code, + retry_after_s=retry_after_s, + ) + return { + "ok": False, + "status": decision, + "action": "access_control", + "reason": reason_code, + "http_status": status_code, + "anti_silent_action": ack_action, + "template_id": template_id, + "chat_type": normalized_chat_type, + "retry_after": retry_after_s if retry_after_s > 0 else None, + } + + +def _anti_silent_template_id_for_reason(reason_code: str) -> str: + rc = _normalize_policy_reason(reason_code) + if "source_lock" in rc or "duplicate" in rc: + return "SOURCE_LOCK" + if "unsupported_no_message" in rc: + return "UNSUPPORTED_INPUT" + if "unsupported" in rc or "media" in rc or "photo" in rc: + return "UNSUPPORTED_MEDIA" + return "SILENT_POLICY" + + +def _anti_silent_template_text(template_id: str, chat_type: str) -> str: + variant = "group" if _normalize_chat_type(chat_type) == "group" else "private" + default_text = ANTI_SILENT_TEMPLATE_TEXTS["SILENT_POLICY"][variant] + return ANTI_SILENT_TEMPLATE_TEXTS.get(template_id, {}).get(variant, default_text) + + +def _extract_preferred_template_from_action(action: Optional[str]) -> Optional[str]: + if not action: + return None + text = str(action).strip() + prefix = "prefer_template=" + if not text.startswith(prefix): + return None + template_id = text[len(prefix) :].strip() + if not template_id: + return None + if not re.fullmatch(r"[A-Z0-9_]{2,64}", template_id): + return None + return template_id + + +def _cleanup_anti_silent_tuning_cache(now_ts: float) -> None: + if len(_ANTI_SILENT_TUNING_CACHE) <= _ANTI_SILENT_TUNING_CACHE_MAX: + return + cutoff = now_ts - max(ANTI_SILENT_TUNING_CACHE_TTL_SECONDS * 4, 300) + stale_keys = [ + key + for key, payload in _ANTI_SILENT_TUNING_CACHE.items() + if float(payload.get("ts", 0.0)) < cutoff + ] + for key in stale_keys: + _ANTI_SILENT_TUNING_CACHE.pop(key, None) + while len(_ANTI_SILENT_TUNING_CACHE) > _ANTI_SILENT_TUNING_CACHE_MAX: + _ANTI_SILENT_TUNING_CACHE.pop(next(iter(_ANTI_SILENT_TUNING_CACHE))) + + +async def _lookup_anti_silent_tuning_template(reason_code: str, chat_type: str) -> Optional[str]: + if not ANTI_SILENT_TUNING_ENABLED: + return None + if not GATEWAY_EXPERIENCE_AVAILABLE or get_gateway_experience_bus is None: + return None + + normalized_reason = _normalize_policy_reason(reason_code) + normalized_chat_type = _normalize_chat_type(chat_type) + cache_key = f"{normalized_reason}:{normalized_chat_type}" + now_ts = time.time() + cached = _ANTI_SILENT_TUNING_CACHE.get(cache_key) + if cached and (now_ts - float(cached.get("ts", 0.0))) < ANTI_SILENT_TUNING_CACHE_TTL_SECONDS: + return cached.get("template_id") + + template_id: Optional[str] = None + try: + bus = get_gateway_experience_bus() + lesson = await bus.get_anti_silent_tuning_lesson( + reason=normalized_reason, + chat_type=normalized_chat_type, + timeout_s=ANTI_SILENT_TUNING_DB_TIMEOUT_MS / 1000.0, + ) + if isinstance(lesson, dict): + template_id = _extract_preferred_template_from_action(lesson.get("action")) + except Exception as exc: + logger.debug("anti-silent tuning lookup failed reason=%s err=%s", normalized_reason, exc) + template_id = None + + _ANTI_SILENT_TUNING_CACHE[cache_key] = { + "ts": now_ts, + "template_id": template_id, + } + _cleanup_anti_silent_tuning_cache(now_ts) + return template_id + + +async def _resolve_anti_silent_template_id(reason_code: str, chat_type: str) -> Tuple[str, bool]: + default_template_id = _anti_silent_template_id_for_reason(reason_code) + tuned_template_id = await _lookup_anti_silent_tuning_template(reason_code, chat_type) + if not tuned_template_id: + return default_template_id, False + if tuned_template_id not in ANTI_SILENT_TEMPLATE_TEXTS: + return default_template_id, False + return tuned_template_id, True + + +def _cleanup_anti_silent_cache(now_ts: float) -> None: + if len(_ANTI_SILENT_LAST_SENT) <= _ANTI_SILENT_CACHE_MAX: + return + cutoff = now_ts - max(ANTI_SILENT_COOLDOWN_SECONDS * 4, 120) + stale = [k for k, ts in _ANTI_SILENT_LAST_SENT.items() if ts < cutoff] + for k in stale: + _ANTI_SILENT_LAST_SENT.pop(k, None) + while len(_ANTI_SILENT_LAST_SENT) > _ANTI_SILENT_CACHE_MAX: + _ANTI_SILENT_LAST_SENT.pop(next(iter(_ANTI_SILENT_LAST_SENT))) + + +async def _apply_anti_silent_ack( + *, + agent_config: "AgentConfig", + chat_id: str, + chat_type: str, + reason_code: str, +) -> Dict[str, Optional[str]]: + normalized_chat_type = _normalize_chat_type(chat_type) + normalized_reason = _normalize_policy_reason(reason_code) + result: Dict[str, Optional[str]] = { + "action": None, + "template_id": None, + "tuning_applied": "false", + } + + if not ANTI_SILENT_ENABLED: + return result + if not _is_public_active_agent(agent_config.agent_id): + return result + if str(chat_id or "").strip() in {"", "0", "unknown", "none"}: + return result + + template_id, tuning_applied = await _resolve_anti_silent_template_id(normalized_reason, normalized_chat_type) + result["tuning_applied"] = "true" if tuning_applied else "false" + cooldown_key = f"{chat_id}:{agent_config.agent_id}:{normalized_reason}" + now_ts = time.time() + last_ts = _ANTI_SILENT_LAST_SENT.get(cooldown_key) + if last_ts and (now_ts - last_ts) < ANTI_SILENT_COOLDOWN_SECONDS: + result["action"] = "ACK_SUPPRESSED_COOLDOWN" + result["template_id"] = template_id + if GATEWAY_METRICS_AVAILABLE and GATEWAY_ANTI_SILENT_TOTAL is not None: + GATEWAY_ANTI_SILENT_TOTAL.labels( + action="ACK_SUPPRESSED_COOLDOWN", + reason=normalized_reason, + chat_type=normalized_chat_type, + ).inc() + return result + + ack_text = _anti_silent_template_text(template_id, normalized_chat_type) + sent = await send_telegram_message(chat_id, ack_text, agent_config.get_telegram_token()) + _ANTI_SILENT_LAST_SENT[cooldown_key] = now_ts + _cleanup_anti_silent_cache(now_ts) + result["action"] = "ACK_EMITTED" + result["template_id"] = template_id + if GATEWAY_METRICS_AVAILABLE and GATEWAY_ANTI_SILENT_TOTAL is not None: + GATEWAY_ANTI_SILENT_TOTAL.labels( + action="ACK_EMITTED", + reason=normalized_reason, + chat_type=normalized_chat_type, + ).inc() + if ( + tuning_applied + and GATEWAY_METRICS_AVAILABLE + and GATEWAY_ANTI_SILENT_TUNING_APPLIED_TOTAL is not None + ): + GATEWAY_ANTI_SILENT_TUNING_APPLIED_TOTAL.labels( + reason=normalized_reason, + chat_type=normalized_chat_type, + template_id=template_id, + ).inc() + if sent and GATEWAY_METRICS_AVAILABLE and GATEWAY_ACK_SENT_TOTAL is not None: + GATEWAY_ACK_SENT_TOTAL.labels( + template_id=template_id, + chat_type=normalized_chat_type, + ).inc() + if not sent: + logger.info( + "Anti-silent ACK attempted but Telegram delivery failed: agent=%s chat_id=%s reason=%s", + agent_config.agent_id, + chat_id, + normalized_reason, + ) + logger.info( + "anti-silent ack action=%s reason=%s template=%s tuning_applied=%s chat_type=%s", + result.get("action"), + normalized_reason, + template_id, + result.get("tuning_applied"), + normalized_chat_type, + ) + return result + + +async def _emit_gateway_experience_event(event: Dict[str, Any]) -> None: + if not GATEWAY_EXPERIENCE_AVAILABLE or get_gateway_experience_bus is None: + return + try: + bus = get_gateway_experience_bus() + await bus.capture(event) + except Exception as e: + logger.debug("Gateway experience emit skipped: %s", e) + + +_GATEWAY_WEBHOOK_EVENT_CTX: ContextVar[Optional[Dict[str, Any]]] = ContextVar( + "gateway_webhook_event_ctx", + default=None, +) + + +def _derive_gateway_request_id(agent_id: str, update: "TelegramUpdate", fallback_ms: int) -> str: + msg = update.message or update.channel_post or {} + from_user = msg.get("from", {}) or msg.get("sender_chat", {}) + chat = msg.get("chat", {}) + user_id = str(from_user.get("id", "unknown")) + chat_id = str(chat.get("id", "unknown")) + update_part = update.update_id if update.update_id is not None else fallback_ms + seed = f"{agent_id}:{chat_id}:{user_id}:{update_part}" + return str(uuid.uuid5(uuid.NAMESPACE_URL, seed)) + + +def _message_type_from_update(msg: Dict[str, Any]) -> str: + if msg.get("document"): + return "document" + if msg.get("photo"): + return "photo" + if msg.get("voice") or msg.get("audio") or msg.get("video_note"): + return "voice" + return "text" + + +def _attachment_count_from_update(msg: Dict[str, Any]) -> int: + count = 0 + if msg.get("document"): + count += 1 + if msg.get("photo"): + count += len(msg.get("photo") or []) + if msg.get("voice") or msg.get("audio") or msg.get("video_note"): + count += 1 + if msg.get("animation"): + count += 1 + return count + + +def _build_default_gateway_event_ctx(agent_config: "AgentConfig", update: "TelegramUpdate") -> Dict[str, Any]: + now_ts = time.time() + msg = update.message or update.channel_post or {} + chat = msg.get("chat", {}) if isinstance(msg, dict) else {} + text = (msg.get("text") or msg.get("caption") or "") + message_type = _message_type_from_update(msg) + attachment_count = _attachment_count_from_update(msg) + request_id = str(uuid.uuid4()) + return { + "event_id": str(uuid.uuid4()), + "ts": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "node_id": os.getenv("NODE_ID", "NODA1"), + "source": "gateway", + "agent_id": agent_config.agent_id, + "request_id": request_id, + "channel": "telegram", + "chat_type": _normalize_chat_type(chat.get("type") if isinstance(chat, dict) else None), + "task_type": "webhook", + "inputs_hash": _build_gateway_inputs_hash( + agent_id=agent_config.agent_id, + channel="telegram", + message_length=len(text), + message_type=message_type, + attachment_count=attachment_count, + ts_bucket_min=int(now_ts // 60), + ), + "policy_decision": "UNKNOWN", + "policy_reason": "unknown", + "user_signal": "none", + "result_ok": True, + "http_status": 200, + "error_class": None, + "error_msg": None, + "llm_backend": None, + "llm_model": None, + "emit_path": "early_return", + "emitted": False, + "webhook_started_at": now_ts, + } + + +def _emit_gateway_event_from_ctx( + ctx: Dict[str, Any], + *, + policy_decision: str, + policy_reason: str, + result_ok: bool, + http_status: int, + error_class: Optional[str] = None, + error_msg: Optional[str] = None, + signal_override: Optional[str] = None, + llm_backend: Optional[str] = None, + llm_model: Optional[str] = None, + emit_path: Optional[str] = None, + early_reason: Optional[str] = None, + anti_silent_action: Optional[str] = None, + anti_silent_template: Optional[str] = None, + anti_silent_tuning_applied: Optional[str] = None, + chat_type: Optional[str] = None, +) -> None: + if ctx.get("emitted"): + return + + path = emit_path or ctx.get("emit_path") or ("exception" if not result_ok else "normal") + if path not in {"normal", "early_return", "exception"}: + path = "normal" + + reason_code = _normalize_policy_reason(policy_reason) + signal = signal_override or ctx.get("user_signal") or "none" + started_at = float(ctx.get("webhook_started_at") or time.time()) + latency_ms = max(0, int((time.time() - started_at) * 1000)) + + if GATEWAY_METRICS_AVAILABLE and GATEWAY_POLICY_DECISIONS_TOTAL is not None: + GATEWAY_POLICY_DECISIONS_TOTAL.labels( + sowa_decision=str(policy_decision or "UNKNOWN"), + reason=reason_code, + ).inc() + if GATEWAY_METRICS_AVAILABLE and GATEWAY_USER_SIGNAL_TOTAL is not None: + GATEWAY_USER_SIGNAL_TOTAL.labels(user_signal=signal).inc() + if GATEWAY_METRICS_AVAILABLE and GATEWAY_WEBHOOK_LATENCY_MS is not None: + GATEWAY_WEBHOOK_LATENCY_MS.observe(float(latency_ms)) + + if GATEWAY_METRICS_AVAILABLE and GATEWAY_EXPERIENCE_EMITTED_TOTAL is not None: + GATEWAY_EXPERIENCE_EMITTED_TOTAL.labels(status="ok", path=path).inc() + if ( + path == "early_return" + and GATEWAY_METRICS_AVAILABLE + and GATEWAY_EARLY_RETURN_TOTAL is not None + ): + early = _normalize_policy_reason(early_reason or reason_code or "unknown") + GATEWAY_EARLY_RETURN_TOTAL.labels(reason=early).inc() + if GATEWAY_METRICS_AVAILABLE and GATEWAY_EVENT_FINALIZE_LATENCY_MS is not None: + GATEWAY_EVENT_FINALIZE_LATENCY_MS.observe(float(latency_ms)) + + provider = _derive_provider_from_backend(llm_backend or ctx.get("llm_backend") or "") + ctx["policy_decision"] = str(policy_decision or "UNKNOWN") + ctx["policy_reason"] = reason_code + ctx["result_ok"] = bool(result_ok) + ctx["http_status"] = int(http_status) + ctx["error_class"] = error_class + ctx["error_msg"] = error_msg + ctx["user_signal"] = signal + ctx["llm_backend"] = llm_backend or ctx.get("llm_backend") + ctx["llm_model"] = llm_model or ctx.get("llm_model") + if anti_silent_action is not None: + ctx["anti_silent_action"] = anti_silent_action + if anti_silent_template is not None: + ctx["anti_silent_template"] = anti_silent_template + if anti_silent_tuning_applied is not None: + ctx["anti_silent_tuning_applied"] = anti_silent_tuning_applied + if chat_type is not None: + ctx["chat_type"] = _normalize_chat_type(chat_type) + + event = { + "event_id": str(ctx.get("event_id") or str(uuid.uuid4())), + "ts": str(ctx.get("ts") or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")), + "node_id": str(ctx.get("node_id") or os.getenv("NODE_ID", "NODA1")), + "source": "gateway", + "agent_id": str(ctx.get("agent_id") or "unknown"), + "request_id": str(ctx.get("request_id") or ""), + "channel": str(ctx.get("channel") or "telegram"), + "chat_type": str(ctx.get("chat_type") or "unknown"), + "task_type": str(ctx.get("task_type") or "webhook"), + "inputs_hash": str(ctx.get("inputs_hash") or ""), + "anti_silent_action": ctx.get("anti_silent_action"), + "anti_silent_template": ctx.get("anti_silent_template"), + "anti_silent_tuning_applied": ctx.get("anti_silent_tuning_applied"), + "policy": { + "sowa_decision": str(policy_decision or "UNKNOWN"), + "reason": reason_code, + }, + "feedback": { + "user_signal": signal, + "operator_tag": None, + }, + "llm": { + "provider": provider, + "model": llm_model or ctx.get("llm_model") or "gateway", + "profile": None, + "latency_ms": latency_ms, + "tokens_in": None, + "tokens_out": None, + }, + "result": { + "ok": bool(result_ok), + "http_status": int(http_status), + "error_class": error_class, + "error_msg_redacted": _safe_redact_error(error_msg), + }, + } + ctx["emitted"] = True + ctx["emit_path"] = path + asyncio.create_task(_emit_gateway_experience_event(event)) + + def _agromatrix_observation_doc_id(file_id: str, label: str) -> str: digest = hashlib.sha1(f"{file_id}:{label}".encode("utf-8")).hexdigest()[:16] return f"agromatrix-photo-{digest}" @@ -1039,6 +2002,63 @@ AGENT_REGISTRY: Dict[str, AgentConfig] = { "monitor": MONITOR_CONFIG, "aistalk": AISTALK_CONFIG, } + +# Runtime metadata used by health and anti-silent adapter. +AGENT_RUNTIME_META: Dict[str, Dict[str, Any]] = { + "monitor": {"badges": ["per-node", "ops"], "visibility": "internal", "telegram_mode": "off"}, + "aistalk": {"badges": ["cyber", "private"], "visibility": "private", "lifecycle_status": "planned"}, + "sofiia": {"badges": ["supervisor", "architect"]}, + "helion": {"badges": ["cto", "dao"]}, +} + +# Phase-7 public access layer +PUBLIC_ACCESS_CONTROL_ENABLED = os.getenv("GATEWAY_PUBLIC_ACCESS_ENABLED", "true").lower() in {"1", "true", "yes"} +PUBLIC_ACCESS_POLICY_CACHE_TTL_SECONDS = max(5, int(os.getenv("GATEWAY_ACCESS_POLICY_CACHE_TTL_SECONDS", "60"))) +PUBLIC_ACCESS_ALLOWLIST_CACHE_TTL_SECONDS = max(5, int(os.getenv("GATEWAY_ALLOWLIST_CACHE_TTL_SECONDS", "30"))) +PUBLIC_ACCESS_DB_TIMEOUT_MS = max(5, int(os.getenv("GATEWAY_ACCESS_DB_TIMEOUT_MS", "40"))) +ACCESS_DENY_COOLDOWN_SECONDS = max(5, int(os.getenv("GATEWAY_ACCESS_DENY_COOLDOWN_SECONDS", "30"))) +PUBLIC_ACCESS_DEFAULT_USER_GLOBAL_LIMIT = max(1, int(os.getenv("GATEWAY_RL_USER_GLOBAL_LIMIT", "60"))) +PUBLIC_ACCESS_DEFAULT_USER_GLOBAL_WINDOW = max(5, int(os.getenv("GATEWAY_RL_USER_GLOBAL_WINDOW_SECONDS", "300"))) +PUBLIC_ACCESS_DEFAULT_USER_AGENT_LIMIT = max(1, int(os.getenv("GATEWAY_RL_USER_AGENT_LIMIT", "20"))) +PUBLIC_ACCESS_DEFAULT_USER_AGENT_WINDOW = max(5, int(os.getenv("GATEWAY_RL_USER_AGENT_WINDOW_SECONDS", "300"))) +PUBLIC_ACCESS_DEFAULT_GROUP_AGENT_LIMIT = max(1, int(os.getenv("GATEWAY_RL_GROUP_AGENT_LIMIT", "10"))) +PUBLIC_ACCESS_DEFAULT_GROUP_AGENT_WINDOW = max(5, int(os.getenv("GATEWAY_RL_GROUP_AGENT_WINDOW_SECONDS", "300"))) +PUBLIC_ACCESS_CACHE_MAX = max(1000, int(os.getenv("GATEWAY_ACCESS_CACHE_MAX", "50000"))) +PUBLIC_ACCESS_RATE_COUNTER_MAX = max(1000, int(os.getenv("GATEWAY_ACCESS_RATE_COUNTER_MAX", "120000"))) + +_ACCESS_POLICY_CACHE: Dict[str, Dict[str, Any]] = {} +_ALLOWLIST_CACHE: Dict[str, Dict[str, Any]] = {} +_ACCESS_DENY_LAST_SENT: Dict[str, float] = {} +_ACCESS_RATE_LIMIT_COUNTERS: Dict[str, Dict[str, float]] = {} + +ANTI_SILENT_ENABLED = os.getenv("GATEWAY_ANTI_SILENT_ENABLED", "true").lower() in {"1", "true", "yes"} +ANTI_SILENT_COOLDOWN_SECONDS = max(5, int(os.getenv("GATEWAY_ANTI_SILENT_COOLDOWN_SECONDS", "30"))) +_ANTI_SILENT_CACHE_MAX = max(1000, int(os.getenv("GATEWAY_ANTI_SILENT_CACHE_MAX", "50000"))) +_ANTI_SILENT_LAST_SENT: Dict[str, float] = {} +ANTI_SILENT_TUNING_ENABLED = os.getenv("ANTI_SILENT_TUNING_ENABLED", "false").lower() in {"1", "true", "yes"} +ANTI_SILENT_TUNING_DB_TIMEOUT_MS = max(5, int(os.getenv("ANTI_SILENT_TUNING_DB_TIMEOUT_MS", "40"))) +ANTI_SILENT_TUNING_CACHE_TTL_SECONDS = max(5, int(os.getenv("ANTI_SILENT_TUNING_CACHE_TTL_SECONDS", "60"))) +_ANTI_SILENT_TUNING_CACHE_MAX = max(1000, int(os.getenv("ANTI_SILENT_TUNING_CACHE_MAX", "50000"))) +_ANTI_SILENT_TUNING_CACHE: Dict[str, Dict[str, Any]] = {} + +ANTI_SILENT_TEMPLATE_TEXTS: Dict[str, Dict[str, str]] = { + "SILENT_POLICY": { + "group": "Код: policy_block. Уточни запит одним реченням або звернись до мене явно.", + "private": "Код: policy_block. Уточни запит одним реченням або додай більше контексту — я одразу відповім.", + }, + "SOURCE_LOCK": { + "group": "Код: source_lock. Бачу дубль повідомлення. Якщо це новий запит — повтори через 5с.", + "private": "Код: source_lock. Отримав дубль повідомлення. Якщо це новий запит — повтори через 5 секунд.", + }, + "UNSUPPORTED_MEDIA": { + "group": "Код: unsupported_media. Цей формат поки не обробляю. Надішли короткий текст або документ.", + "private": "Код: unsupported_media. Цей формат поки не обробляю. Надішли короткий текст або документ.", + }, + "UNSUPPORTED_INPUT": { + "group": "Код: unsupported_input. Не бачу тексту запиту. Надішли повідомлення текстом.", + "private": "Код: unsupported_input. Не бачу тексту запиту. Надішли повідомлення текстом.", + }, +} # 3. Створіть endpoint (опціонально, якщо потрібен окремий webhook): # @router.post("/new_agent/telegram/webhook") # async def new_agent_telegram_webhook(update: TelegramUpdate): @@ -2637,7 +3657,7 @@ def _dedup_cleanup(): # === END DEDUPLICATION === -async def handle_telegram_webhook( +async def _handle_telegram_webhook_impl( agent_config: AgentConfig, update: TelegramUpdate ) -> Dict[str, Any]: @@ -2657,7 +3677,34 @@ async def handle_telegram_webhook( if update.update_id: if update.update_id in _PROCESSED_UPDATES: logger.info(f"🔄 Skipping duplicate update_id={update.update_id} for {agent_config.name}") - return {"status": "ok", "skipped": "duplicate_update"} + duplicate_msg = update.message or update.channel_post or {} + duplicate_chat = duplicate_msg.get("chat", {}) if isinstance(duplicate_msg, dict) else {} + duplicate_chat_id = str(duplicate_chat.get("id", "")) if isinstance(duplicate_chat, dict) else "" + duplicate_chat_type = _normalize_chat_type( + duplicate_chat.get("type") if isinstance(duplicate_chat, dict) else None + ) + anti = await _apply_anti_silent_ack( + agent_config=agent_config, + chat_id=duplicate_chat_id, + chat_type=duplicate_chat_type, + reason_code="source_lock_duplicate_update", + ) + dup_ctx = _GATEWAY_WEBHOOK_EVENT_CTX.get() + if dup_ctx is not None: + dup_ctx["policy_decision"] = "SILENT" + dup_ctx["policy_reason"] = "source_lock_duplicate_update" + dup_ctx["chat_type"] = duplicate_chat_type + dup_ctx["anti_silent_action"] = anti.get("action") + dup_ctx["anti_silent_template"] = anti.get("template_id") + dup_ctx["anti_silent_tuning_applied"] = anti.get("tuning_applied") + return { + "status": "ok", + "skipped": "duplicate_update", + "anti_silent_action": anti.get("action"), + "template_id": anti.get("template_id"), + "tuning_applied": anti.get("tuning_applied"), + "chat_type": duplicate_chat_type, + } _PROCESSED_UPDATES[update.update_id] = _time.time() if len(_PROCESSED_UPDATES) > _DEDUP_MAX_SIZE: _dedup_cleanup() @@ -2681,10 +3728,45 @@ async def handle_telegram_webhook( first_name = from_user.get("first_name") last_name = from_user.get("last_name") is_sender_bot = bool(from_user.get("is_bot") or (username and username.lower().endswith("bot"))) + runtime_event_ctx = _GATEWAY_WEBHOOK_EVENT_CTX.get() + webhook_started_at = float(runtime_event_ctx.get("webhook_started_at")) if runtime_event_ctx else time.time() + if runtime_event_ctx and runtime_event_ctx.get("request_id"): + correlation_id = str(runtime_event_ctx.get("request_id")) + else: + correlation_id = _derive_gateway_request_id( + agent_config.agent_id, + update, + fallback_ms=int(webhook_started_at * 1000), + ) + if runtime_event_ctx is not None: + runtime_event_ctx["request_id"] = correlation_id + telegram_token = agent_config.get_telegram_token() + if not telegram_token: + raise HTTPException(status_code=500, detail=f"Telegram token not configured for {agent_config.name}") + + access_outcome = await _enforce_public_access( + agent_config=agent_config, + platform="telegram", + platform_user_id=user_id, + chat_id=chat_id, + chat_type=chat.get("type"), + telegram_token=telegram_token, + ) + if access_outcome is not None: + runtime_event_ctx = _GATEWAY_WEBHOOK_EVENT_CTX.get() + if runtime_event_ctx is not None: + runtime_event_ctx["policy_decision"] = "ACK" + runtime_event_ctx["policy_reason"] = access_outcome.get("reason") or "access_denied" + runtime_event_ctx["chat_type"] = _normalize_chat_type(chat.get("type")) + runtime_event_ctx["anti_silent_action"] = access_outcome.get("anti_silent_action") + runtime_event_ctx["anti_silent_template"] = access_outcome.get("template_id") + runtime_event_ctx["anti_silent_tuning_applied"] = "false" + return access_outcome + # Get DAO ID for this chat dao_id = get_dao_id(chat_id, "telegram", agent_id=agent_config.agent_id) - + initial_preferred_lang = resolve_preferred_language( chat_id=chat_id, user_id=user_id, @@ -2709,10 +3791,6 @@ async def handle_telegram_webhook( ) ) - telegram_token = agent_config.get_telegram_token() - if not telegram_token: - raise HTTPException(status_code=500, detail=f"Telegram token not configured for {agent_config.name}") - # === REPLY-TO-AGENT DETECTION === # If user replies to a bot message → treat as direct mention (SOWA Priority 3) is_reply_to_agent = False @@ -3302,6 +4380,45 @@ async def handle_telegram_webhook( text = update.message.get("text", "") caption = update.message.get("caption", "") + # Deterministic unsupported branch for non-text payloads in user-facing chats. + unsupported_types = [ + "sticker", + "animation", + "video_note", + "contact", + "location", + "venue", + "poll", + "dice", + "game", + "new_chat_members", + "left_chat_member", + "new_chat_title", + "new_chat_photo", + "delete_chat_photo", + "pinned_message", + "message_auto_delete_timer_changed", + ] + if not text and not caption and not document and not photo and not voice and not audio and not video_note: + unsupported_hit = next((msg_type for msg_type in unsupported_types if update.message.get(msg_type)), None) + if unsupported_hit: + anti = await _apply_anti_silent_ack( + agent_config=agent_config, + chat_id=chat_id, + chat_type=chat.get("type", "private"), + reason_code="unsupported_no_message", + ) + return { + "ok": True, + "ignored": True, + "reason": "unsupported_no_message", + "unsupported_type": unsupported_hit, + "anti_silent_action": anti.get("action"), + "template_id": anti.get("template_id"), + "tuning_applied": anti.get("tuning_applied"), + "chat_type": chat.get("type", "private"), + } + # Friendly greeting fast-path for better UX and less mechanical replies. if _is_simple_greeting(text): greeting_reply = ( @@ -3501,12 +4618,19 @@ async def handle_telegram_webhook( # Hard guard: don't send photo-related requests to text LLM path when image context is missing. if _needs_photo_only_response(text): - await send_telegram_message( + sent = await send_telegram_message( chat_id, "Бачу питання про фото, але не знайшов зображення в історії сесії. Надішли фото ще раз з коротким питанням, і я одразу проаналізую.", telegram_token, ) - return {"ok": True, "handled": True, "reason": "photo_followup_without_image_context"} + anti_action = "ACK_EMITTED" + return { + "ok": True, + "handled": True, + "reason": "photo_followup_without_image_context", + "anti_silent_action": anti_action, + "template_id": "UNSUPPORTED_MEDIA", + } if not text and not caption: # Check for unsupported message types and silently ignore @@ -3517,11 +4641,37 @@ async def handle_telegram_webhook( for msg_type in unsupported_types: if update.message.get(msg_type): logger.debug(f"Ignoring unsupported message type: {msg_type}") - return {"ok": True, "ignored": True, "reason": f"Unsupported message type: {msg_type}"} + anti = await _apply_anti_silent_ack( + agent_config=agent_config, + chat_id=chat_id, + chat_type=chat.get("type", "private"), + reason_code=f"unsupported_message_type_{msg_type}", + ) + return { + "ok": True, + "ignored": True, + "reason": f"unsupported_message_type_{msg_type}", + "anti_silent_action": anti.get("action"), + "template_id": anti.get("template_id"), + "tuning_applied": anti.get("tuning_applied"), + } # If no supported content found, return silently logger.debug(f"Message without processable content from user {user_id}") - return {"ok": True, "ignored": True, "reason": "No processable content"} + anti = await _apply_anti_silent_ack( + agent_config=agent_config, + chat_id=chat_id, + chat_type=chat.get("type", "private"), + reason_code="unsupported_no_processable_content", + ) + return { + "ok": True, + "ignored": True, + "reason": "unsupported_no_processable_content", + "anti_silent_action": anti.get("action"), + "template_id": anti.get("template_id"), + "tuning_applied": anti.get("tuning_applied"), + } # Use caption if text is empty (for photos with captions that weren't processed) if not text and caption: @@ -3530,6 +4680,103 @@ async def handle_telegram_webhook( logger.info(f"{agent_config.name} Telegram message from {username} (tg:{user_id}) in chat {chat_id}: {text[:50]}") mentioned_bots = extract_bot_mentions(text) needs_complex_reasoning = requires_complex_reasoning(text) + attachment_count = 0 + if update.message.get("document"): + attachment_count += 1 + if update.message.get("photo"): + attachment_count += len(update.message.get("photo") or []) + if update.message.get("voice") or update.message.get("audio") or update.message.get("video_note"): + attachment_count += 1 + if update.message.get("animation"): + attachment_count += 1 + + message_type = "text" + if update.message.get("document"): + message_type = "document" + elif update.message.get("photo"): + message_type = "photo" + elif update.message.get("voice") or update.message.get("audio") or update.message.get("video_note"): + message_type = "voice" + + user_signal = _detect_user_signal(agent_config.agent_id, chat_id, user_id, text or "") + inputs_hash = _build_gateway_inputs_hash( + agent_id=agent_config.agent_id, + channel="telegram", + message_length=len(text or ""), + message_type=message_type, + attachment_count=attachment_count, + ts_bucket_min=int(time.time() // 60), + ) + if runtime_event_ctx is not None: + runtime_event_ctx["user_signal"] = user_signal + runtime_event_ctx["inputs_hash"] = inputs_hash + runtime_event_ctx["emit_path"] = "normal" + + def _queue_gateway_event( + *, + policy_decision: str, + policy_reason: str, + result_ok: bool, + http_status: int, + error_class: Optional[str] = None, + error_msg: Optional[str] = None, + signal_override: Optional[str] = None, + llm_backend: Optional[str] = None, + llm_model: Optional[str] = None, + emit_path: str = "normal", + anti_silent_action: Optional[str] = None, + anti_silent_template: Optional[str] = None, + anti_silent_tuning_applied: Optional[str] = None, + chat_type_override: Optional[str] = None, + ) -> None: + active_ctx = runtime_event_ctx + if active_ctx is None: + active_ctx = { + "event_id": str(uuid.uuid4()), + "ts": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "node_id": os.getenv("NODE_ID", "NODA1"), + "source": "gateway", + "agent_id": agent_config.agent_id, + "request_id": correlation_id, + "channel": "telegram", + "task_type": "webhook", + "inputs_hash": inputs_hash, + "user_signal": user_signal, + "webhook_started_at": webhook_started_at, + "emitted": False, + } + active_ctx["request_id"] = correlation_id + active_ctx["inputs_hash"] = inputs_hash + active_ctx["user_signal"] = signal_override or user_signal + active_ctx["llm_backend"] = llm_backend + active_ctx["llm_model"] = llm_model + active_ctx["emit_path"] = emit_path + if anti_silent_action is not None: + active_ctx["anti_silent_action"] = anti_silent_action + if anti_silent_template is not None: + active_ctx["anti_silent_template"] = anti_silent_template + if anti_silent_tuning_applied is not None: + active_ctx["anti_silent_tuning_applied"] = anti_silent_tuning_applied + if chat_type_override is not None: + active_ctx["chat_type"] = _normalize_chat_type(chat_type_override) + _emit_gateway_event_from_ctx( + active_ctx, + policy_decision=policy_decision or "UNKNOWN", + policy_reason=policy_reason or "unknown", + result_ok=result_ok, + http_status=http_status, + error_class=error_class, + error_msg=error_msg, + signal_override=signal_override, + llm_backend=llm_backend, + llm_model=llm_model, + emit_path=emit_path, + early_reason=policy_reason, + anti_silent_action=anti_silent_action, + anti_silent_template=anti_silent_template, + anti_silent_tuning_applied=anti_silent_tuning_applied, + chat_type=chat_type_override or active_ctx.get("chat_type"), + ) cached_answer = get_cached_response(agent_config.agent_id, chat_id, text) if cached_answer: @@ -3550,6 +4797,15 @@ async def handle_telegram_webhook( }, username=username, ) + _queue_gateway_event( + policy_decision="FULL", + policy_reason="cache_hit", + result_ok=True, + http_status=200, + llm_backend="gateway-cache", + llm_model="gateway-cache", + emit_path="early_return", + ) return {"ok": True, "agent": agent_config.agent_id, "cached": True} # Check if there's a document context for follow-up questions @@ -3610,6 +4866,8 @@ async def handle_telegram_webhook( # Gateway computes has_link and has_explicit_request (source of truth) # ======================================== chat_type = chat.get("type", "private") + if runtime_event_ctx is not None: + runtime_event_ctx["chat_type"] = _normalize_chat_type(chat_type) is_private_chat = chat_type == "private" # Gateway: compute has_link (single source of truth) @@ -3672,23 +4930,61 @@ async def handle_telegram_webhook( if sowa_decision.action == "SILENT": logger.info(f"\U0001f507 SOWA: Agent {agent_config.agent_id} NOT responding. Reason: {respond_reason}") + anti = await _apply_anti_silent_ack( + agent_config=agent_config, + chat_id=chat_id, + chat_type=chat_type, + reason_code=respond_reason, + ) + anti_action = anti.get("action") + anti_template = anti.get("template_id") + anti_tuning_applied = anti.get("tuning_applied") + saved_response = "" + save_agent_response = False + if anti_action == "ACK_EMITTED": + saved_response = _anti_silent_template_text( + str(anti_template or "SILENT_POLICY"), + _normalize_chat_type(chat_type), + ) + save_agent_response = True # Save to memory for context tracking, but don't respond await memory_client.save_chat_turn( agent_id=agent_config.agent_id, team_id=dao_id, user_id=f"tg:{user_id}", message=text, - response="", # No response + response=saved_response, channel_id=chat_id, scope="short_term", - save_agent_response=False, + save_agent_response=save_agent_response, agent_metadata={ "sowa_skipped": True, "skip_reason": respond_reason, + "anti_silent_action": anti_action, + "anti_silent_template": anti_template, + "anti_silent_tuning_applied": anti_tuning_applied, }, username=username, ) - return {"ok": True, "skipped": True, "reason": respond_reason} + _queue_gateway_event( + policy_decision="SILENT", + policy_reason=respond_reason, + result_ok=True, + http_status=200, + emit_path="early_return", + anti_silent_action=anti_action, + anti_silent_template=anti_template, + anti_silent_tuning_applied=anti_tuning_applied, + chat_type_override=chat_type, + ) + return { + "ok": True, + "skipped": True, + "reason": respond_reason, + "anti_silent_action": anti_action, + "template_id": anti_template, + "tuning_applied": anti_tuning_applied, + } # ACK: send short presence message WITHOUT calling LLM/Router if sowa_decision.action == "ACK": @@ -3739,12 +5035,28 @@ async def handle_telegram_webhook( }, username=username, ) + _queue_gateway_event( + policy_decision="ACK", + policy_reason=respond_reason, + result_ok=True, + http_status=200, + emit_path="early_return", + ) return {"ok": True, "ack": True, "reason": respond_reason} # FULL: proceed with LLM/Router call # For prober requests, skip LLM/Router entirely to save tokens if is_prober: logger.info(f"\U0001f9ea PROBER: Agent {agent_config.agent_id} responding to prober (no LLM call). Reason: {respond_reason}") + _queue_gateway_event( + policy_decision="FULL", + policy_reason=respond_reason, + result_ok=True, + http_status=200, + llm_backend="gateway-prober", + llm_model="gateway-prober", + emit_path="early_return", + ) return {"ok": True, "agent": agent_config.agent_id, "prober": True, "response_preview": "[prober-skip-llm]"} else: logger.info(f"\u2705 SOWA: Agent {agent_config.agent_id} WILL respond (FULL). Reason: {respond_reason}") @@ -3845,6 +5157,8 @@ async def handle_telegram_webhook( "username": username, "chat_id": chat_id, "raw_user_text": text, + "request_id": correlation_id, + "trace_id": correlation_id, "sender_is_bot": is_sender_bot, "mentioned_bots": mentioned_bots, "requires_complex_reasoning": needs_complex_reasoning, @@ -3905,7 +5219,36 @@ async def handle_telegram_webhook( # Send to Router logger.info(f"Sending to Router: agent={agent_config.agent_id}, dao={dao_id}, user=tg:{user_id}") - response = await send_to_router(router_request) + try: + response = await send_to_router(router_request) + except httpx.TimeoutException as e: + logger.error("Router timeout for request_id=%s: %s", correlation_id, e) + timeout_message = "Вибач, запит обробляється довше звичного. Спробуй ще раз через кілька секунд." + await send_telegram_message(chat_id, timeout_message, telegram_token) + _queue_gateway_event( + policy_decision="FULL", + policy_reason=respond_reason, + result_ok=False, + http_status=504, + error_class="router_timeout", + error_msg=str(e), + signal_override="timeout", + emit_path="exception", + ) + return {"ok": False, "error": "router_timeout"} + except Exception as e: + logger.error("Router call failed for request_id=%s: %s", correlation_id, e, exc_info=True) + await send_telegram_message(chat_id, "Вибач, сталася помилка маршрутизації запиту.", telegram_token) + _queue_gateway_event( + policy_decision="FULL", + policy_reason=respond_reason, + result_ok=False, + http_status=502, + error_class=type(e).__name__, + error_msg=str(e), + emit_path="exception", + ) + return {"ok": False, "error": str(e)} # Extract response if isinstance(response, dict) and response.get("ok"): @@ -3972,6 +5315,15 @@ async def handle_telegram_webhook( }, username=username, ) + _queue_gateway_event( + policy_decision="FULL", + policy_reason=respond_reason, + result_ok=True, + http_status=200, + llm_backend=response.get("backend") if isinstance(response, dict) else None, + llm_model=response.get("model") if isinstance(response, dict) else None, + emit_path="early_return", + ) return {"ok": True, "skipped": True, "reason": "no_output_from_llm"} # Retry policy: if response drifts from current intent, do one strict reroute. @@ -4079,15 +5431,128 @@ async def handle_telegram_webhook( ) store_response_cache(agent_config.agent_id, chat_id, text, answer_text) + _queue_gateway_event( + policy_decision="FULL", + policy_reason=respond_reason, + result_ok=True, + http_status=200, + llm_backend=response.get("backend"), + llm_model=response.get("model"), + ) return {"ok": True, "agent": agent_config.agent_id} else: error_msg = response.get("error", "Unknown error") if isinstance(response, dict) else "Router error" logger.error(f"Router error: {error_msg}") await send_telegram_message(chat_id, f"Вибач, сталася помилка: {error_msg}", telegram_token) + _queue_gateway_event( + policy_decision="FULL", + policy_reason=respond_reason, + result_ok=False, + http_status=502, + error_class="router_error", + error_msg=error_msg, + llm_backend=response.get("backend") if isinstance(response, dict) else None, + llm_model=response.get("model") if isinstance(response, dict) else None, + emit_path="exception", + ) return {"ok": False, "error": error_msg} +async def handle_telegram_webhook( + agent_config: AgentConfig, + update: TelegramUpdate, +) -> Dict[str, Any]: + """ + Phase-4.1 wrapper: + guarantee exactly one gateway experience event per webhook request. + """ + ctx = _build_default_gateway_event_ctx(agent_config, update) + token = _GATEWAY_WEBHOOK_EVENT_CTX.set(ctx) + result: Optional[Dict[str, Any]] = None + try: + result = await _handle_telegram_webhook_impl(agent_config, update) + return result + except HTTPException as e: + ctx["result_ok"] = False + ctx["http_status"] = int(getattr(e, "status_code", 500) or 500) + ctx["error_class"] = "HTTPException" + ctx["error_msg"] = str(getattr(e, "detail", "") or str(e)) + ctx["emit_path"] = "exception" + raise + except Exception as e: + ctx["result_ok"] = False + ctx["http_status"] = 500 + ctx["error_class"] = type(e).__name__ + ctx["error_msg"] = str(e) + ctx["emit_path"] = "exception" + raise + finally: + if result is not None and isinstance(result, dict): + has_ok_flag = "ok" in result + ctx["result_ok"] = bool(result.get("ok", True)) if has_ok_flag else True + + reason = ( + result.get("reason") + or result.get("skipped") + or result.get("action") + or result.get("mode") + or result.get("status") + ) + skipped = str(result.get("skipped") or "").strip().lower() + if skipped == "duplicate_update": + reason = "source_lock_duplicate_update" + elif skipped == "no_message": + reason = "unsupported_no_message" + elif str(result.get("ignored") or "").lower() in {"true", "1"} and not reason: + reason = "ignored" + if reason: + ctx["policy_reason"] = str(reason) + if result.get("anti_silent_action") is not None: + ctx["anti_silent_action"] = result.get("anti_silent_action") + if result.get("template_id") is not None: + ctx["anti_silent_template"] = result.get("template_id") + if result.get("tuning_applied") is not None: + ctx["anti_silent_tuning_applied"] = str(result.get("tuning_applied")) + if result.get("chat_type") is not None: + ctx["chat_type"] = _normalize_chat_type(result.get("chat_type")) + if "http_status" in result: + try: + ctx["http_status"] = int(result.get("http_status")) + except Exception: + pass + elif not ctx["result_ok"]: + ctx["http_status"] = int(ctx.get("http_status") or 500) + else: + ctx["http_status"] = int(ctx.get("http_status") or 200) + + if result.get("skipped") or result.get("ignored") or result.get("cached") or result.get("action") or result.get("mode"): + ctx["emit_path"] = "early_return" + elif ctx.get("emit_path") not in {"normal", "exception"}: + ctx["emit_path"] = "normal" + + if not ctx.get("emitted"): + _emit_gateway_event_from_ctx( + ctx, + policy_decision=str(ctx.get("policy_decision") or "UNKNOWN"), + policy_reason=str(ctx.get("policy_reason") or "unknown"), + result_ok=bool(ctx.get("result_ok", True)), + http_status=int(ctx.get("http_status") or (200 if ctx.get("result_ok", True) else 500)), + error_class=ctx.get("error_class"), + error_msg=ctx.get("error_msg"), + signal_override=ctx.get("user_signal") or "none", + llm_backend=ctx.get("llm_backend"), + llm_model=ctx.get("llm_model"), + emit_path=str(ctx.get("emit_path") or "early_return"), + early_reason=str(ctx.get("policy_reason") or "unknown"), + anti_silent_action=ctx.get("anti_silent_action"), + anti_silent_template=ctx.get("anti_silent_template"), + anti_silent_tuning_applied=ctx.get("anti_silent_tuning_applied"), + chat_type=ctx.get("chat_type"), + ) + _GATEWAY_WEBHOOK_EVENT_CTX.reset(token) + + # ======================================== # Endpoints # ======================================== @@ -5105,20 +6570,63 @@ async def _old_helion_telegram_webhook(update: TelegramUpdate): return await handle_telegram_webhook(HELION_CONFIG, update) +@router.get("/metrics") +async def prometheus_metrics(): + """Prometheus metrics endpoint for gateway.""" + if not GATEWAY_METRICS_AVAILABLE or _gateway_get_metrics is None or _gateway_metrics_content_type is None: + return Response(content=b"# gateway metrics unavailable\\n", media_type="text/plain") + try: + return Response(content=_gateway_get_metrics(), media_type=_gateway_metrics_content_type()) + except Exception as e: + logger.error("Gateway metrics error: %s", e) + return Response(content=b"# gateway metrics error\\n", media_type="text/plain") + + +@router.get("/v1/agents/public") +async def list_public_agents(): + """Public discovery endpoint backed by access policies.""" + agents: List[Dict[str, Any]] = [] + for agent_id, config in AGENT_REGISTRY.items(): + policy = await _resolve_access_policy(agent_id) + if not bool(policy.get("enabled", True)): + continue + if not bool(policy.get("public_active", False)): + continue + + runtime = _agent_runtime_info(agent_id) + meta = AGENT_RUNTIME_META.get(agent_id, {}) + badges = meta.get("badges", []) if isinstance(meta, dict) else [] + channels: List[str] = [] + if runtime.get("telegram_mode") != "off" and config.get_telegram_token(): + channels.append("telegram") + + agents.append( + { + "agent_id": agent_id, + "display_name": config.name, + "district": badges[0] if badges else "core", + "status": "active", + "channels_supported": channels, + "requires_whitelist": bool(policy.get("requires_whitelist", False)), + "public_active": True, + } + ) + + agents.sort(key=lambda item: item["agent_id"]) + return { + "status": "ok", + "count": len(agents), + "agents": agents, + "timestamp": datetime.utcnow().isoformat(), + } + + @router.get("/health") async def health(): """Health check endpoint""" - # Static metadata for agents that don't have Telegram — used by Sofiia console UI badges - _AGENT_META: Dict[str, Dict] = { - "monitor": {"badges": ["per-node", "ops"], "visibility": "internal", "telegram_mode": "off"}, - "aistalk": {"badges": ["cyber", "private"], "visibility": "private", "lifecycle_status": "planned"}, - "sofiia": {"badges": ["supervisor", "architect"]}, - "helion": {"badges": ["cto", "dao"]}, - } - agents_info = {} for agent_id, config in AGENT_REGISTRY.items(): - meta = _AGENT_META.get(agent_id, {}) + meta = AGENT_RUNTIME_META.get(agent_id, {}) agents_info[agent_id] = { "name": config.name, "prompt_loaded": len(config.system_prompt) > 0, diff --git a/gateway-bot/metrics.py b/gateway-bot/metrics.py index 24cd7366..e869509e 100644 --- a/gateway-bot/metrics.py +++ b/gateway-bot/metrics.py @@ -45,6 +45,79 @@ ROUTER_LATENCY = Histogram( buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0] ) +# === Experience Bus Phase-4 Metrics === +GATEWAY_EXPERIENCE_PUBLISHED_TOTAL = Counter( + "gateway_experience_published_total", + "Gateway experience event publish/store status", + ["status"] # ok, err +) + +GATEWAY_POLICY_DECISIONS_TOTAL = Counter( + "gateway_policy_decisions_total", + "Gateway policy (SOWA) decisions", + ["sowa_decision", "reason"] +) + +GATEWAY_USER_SIGNAL_TOTAL = Counter( + "gateway_user_signal_total", + "Detected user signals from webhook stream", + ["user_signal"] # none, positive, negative, retry, timeout +) + +GATEWAY_WEBHOOK_LATENCY_MS = Histogram( + "gateway_webhook_latency_ms", + "Gateway webhook end-to-end latency in milliseconds", + buckets=[5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000] +) + +GATEWAY_EXPERIENCE_EMITTED_TOTAL = Counter( + "gateway_experience_emitted_total", + "Gateway experience events emitted from webhook handler", + ["status", "path"] # status: ok|err, path: normal|early_return|exception +) + +GATEWAY_EARLY_RETURN_TOTAL = Counter( + "gateway_early_return_total", + "Gateway early return branches observed by reason", + ["reason"] +) + +GATEWAY_EVENT_FINALIZE_LATENCY_MS = Histogram( + "gateway_event_finalize_latency_ms", + "Gateway event finalize latency in milliseconds", + buckets=[1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000] +) + +GATEWAY_ANTI_SILENT_TOTAL = Counter( + "gateway_anti_silent_total", + "Gateway anti-silent actions by reason/chat type", + ["action", "reason", "chat_type"] # ACK_EMITTED, ACK_SUPPRESSED_COOLDOWN +) + +GATEWAY_ACK_SENT_TOTAL = Counter( + "gateway_ack_sent_total", + "Gateway ACK messages sent by template/chat type", + ["template_id", "chat_type"] +) + +GATEWAY_ANTI_SILENT_TUNING_APPLIED_TOTAL = Counter( + "gateway_anti_silent_tuning_applied_total", + "Gateway anti-silent tuning applications by reason/chat type/template", + ["reason", "chat_type", "template_id"] +) + +GATEWAY_ACCESS_DECISIONS_TOTAL = Counter( + "gateway_access_decisions_total", + "Gateway access decisions for public layer", + ["decision", "agent_id", "chat_type"] # allow, deny, rate_limited +) + +GATEWAY_RATE_LIMITED_TOTAL = Counter( + "gateway_rate_limited_total", + "Gateway rate limit hits by scope", + ["scope", "agent_id", "chat_type"] # user_global, user_agent, group_agent +) + # === Memory Service Metrics === MEMORY_CALLS_TOTAL = Counter( "gateway_memory_calls_total", diff --git a/migrations/056_agent_access_policies.sql b/migrations/056_agent_access_policies.sql new file mode 100644 index 00000000..2348558b --- /dev/null +++ b/migrations/056_agent_access_policies.sql @@ -0,0 +1,61 @@ +-- Phase-7 public access layer +-- Access policy + allowlist tables for gateway entitlements/rate-limits. + +CREATE TABLE IF NOT EXISTS agent_access_policies ( + agent_id TEXT PRIMARY KEY, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + public_active BOOLEAN NOT NULL DEFAULT TRUE, + requires_whitelist BOOLEAN NOT NULL DEFAULT FALSE, + user_global_limit INTEGER NOT NULL DEFAULT 60, + user_global_window_seconds INTEGER NOT NULL DEFAULT 300, + user_agent_limit INTEGER NOT NULL DEFAULT 20, + user_agent_window_seconds INTEGER NOT NULL DEFAULT 300, + group_agent_limit INTEGER NOT NULL DEFAULT 10, + group_agent_window_seconds INTEGER NOT NULL DEFAULT 300, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS agent_allowlist ( + id BIGSERIAL PRIMARY KEY, + platform TEXT NOT NULL, + platform_user_id TEXT NOT NULL, + agent_id TEXT NOT NULL REFERENCES agent_access_policies(agent_id) ON DELETE CASCADE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (platform, platform_user_id, agent_id) +); + +CREATE INDEX IF NOT EXISTS idx_agent_access_policies_enabled + ON agent_access_policies (enabled, public_active); + +CREATE INDEX IF NOT EXISTS idx_agent_allowlist_lookup + ON agent_allowlist (platform, platform_user_id, agent_id); + +INSERT INTO agent_access_policies ( + agent_id, + enabled, + public_active, + requires_whitelist +) +VALUES + ('daarwizz', TRUE, TRUE, FALSE), + ('helion', TRUE, TRUE, FALSE), + ('greenfood', TRUE, TRUE, FALSE), + ('agromatrix', TRUE, TRUE, FALSE), + ('alateya', TRUE, TRUE, FALSE), + ('nutra', TRUE, TRUE, FALSE), + ('druid', TRUE, TRUE, FALSE), + ('clan', TRUE, TRUE, FALSE), + ('eonarch', TRUE, TRUE, FALSE), + ('senpai', TRUE, TRUE, FALSE), + ('oneok', TRUE, TRUE, FALSE), + ('soul', TRUE, TRUE, FALSE), + ('yaromir', TRUE, TRUE, FALSE), + ('sofiia', TRUE, TRUE, FALSE), + ('monitor', FALSE, FALSE, TRUE), + ('aistalk', FALSE, FALSE, TRUE) +ON CONFLICT (agent_id) DO UPDATE +SET + enabled = EXCLUDED.enabled, + public_active = EXCLUDED.public_active, + requires_whitelist = EXCLUDED.requires_whitelist, + updated_at = now();