"""Experience learner service.

Pull-consumes agent experience events from a NATS JetStream stream, derives
"lessons" from repeated errors, SILENT policy outcomes and latency spikes,
persists them to Postgres and optionally republishes them on a lesson subject.
Exposes /health and /metrics over FastAPI.
"""

from __future__ import annotations
import asyncio
import contextlib
import hashlib
import json
import logging
import os
import random
import re
import time
import uuid
from collections import OrderedDict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Deque, Dict, Optional, Tuple
import asyncpg
import nats
from fastapi import FastAPI, Response
from nats.aio.msg import Msg
from nats.js.api import AckPolicy, ConsumerConfig, DeliverPolicy
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest
# Root logging configuration: level is taken from LOG_LEVEL (default INFO).
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
    format=_LOG_FORMAT,
)

# Module-wide logger for the learner service.
logger = logging.getLogger("experience_learner")
# --- Prometheus metrics ------------------------------------------------------
# Lesson extraction outcomes: "ok" per extracted lesson, "err" per failed msg.
LESSONS_EXTRACTED = Counter(
    "lessons_extracted_total",
    "Total lessons extracted from experience events",
    ["status"],
)
# Insert attempts into agent_lessons ("ok" / "conflict" / "err").
LESSONS_INSERT = Counter(
    "lessons_insert_total",
    "Total lesson insert attempts",
    ["status"],
)
JS_MESSAGES_ACKED = Counter(
    "js_messages_acked_total",
    "Total JetStream messages acked by learner",
)
# Incremented when message metadata reports num_delivered > 1.
JS_MESSAGES_REDELIVERED = Counter(
    "js_messages_redelivered_total",
    "Total redelivered JetStream messages observed by learner",
)
# Why an event was kept (error / silent / http_5xx / latency_spike / ...).
EVENTS_SELECTED = Counter(
    "experience_learner_events_selected_total",
    "Events selected for learner processing",
    ["reason"],
)
# Why an event was discarded (invalid_payload / event_dedup / ok_sample_out).
EVENTS_DROPPED = Counter(
    "experience_learner_events_dropped_total",
    "Events dropped by learner filtering/dedup",
    ["reason"],
)
# Lesson publish attempts to JetStream ("ok" / "err" / "skipped").
LESSON_PUBLISH = Counter(
    "experience_learner_lessons_published_total",
    "Lesson publish attempts to JetStream",
    ["status"],
)
# Outcome label of each anti-silent tuning evaluation (ok / no_data / ...).
ANTI_SILENT_TUNING_EVALUATED = Counter(
    "experience_learner_anti_silent_tuning_evaluated_total",
    "Anti-silent tuning lesson generation evaluations",
    ["status"],
)
CONSUMER_RUNNING = Gauge(
    "experience_learner_consumer_running",
    "1 when learner consumer loop is running",
)
@dataclass
class EventSample:
    """One experience-event observation kept in a sliding time window."""

    # Monotonic capture time (time.monotonic()), used for window expiry.
    ts_mono: float
    # Whether the event's result reported ok.
    ok: bool
    # Reported LLM latency in milliseconds (0 when absent or unparsable).
    latency_ms: int
class ExperienceLearner:
    """Pull-consumes experience events from JetStream, aggregates them into
    sliding-window buckets, and persists/publishes derived "lessons".

    All configuration comes from environment variables; a Postgres DSN is
    mandatory (LEARNER_DATABASE_URL / EXPERIENCE_DATABASE_URL / DATABASE_URL)
    and its absence raises at construction time.
    """

    def __init__(self) -> None:
        # --- NATS / JetStream wiring ---
        self.node_id = os.getenv("NODE_ID", "NODA1")
        self.nats_url = os.getenv("NATS_URL", "nats://nats:4222")
        self.stream_name = os.getenv("EXPERIENCE_STREAM_NAME", "EXPERIENCE")
        self.subject = os.getenv("EXPERIENCE_SUBJECT", "agent.experience.v1.>")
        self.lesson_subject = os.getenv("LESSON_SUBJECT", "agent.lesson.v1")
        self.durable = os.getenv("EXPERIENCE_DURABLE", "experience-learner-v1")
        self.deliver_policy = os.getenv("EXPERIENCE_DELIVER_POLICY", "all").lower()
        self.ack_wait_s = float(os.getenv("EXPERIENCE_ACK_WAIT_SECONDS", "30"))
        self.max_deliver = int(os.getenv("EXPERIENCE_MAX_DELIVER", "20"))
        self.fetch_batch = int(os.getenv("EXPERIENCE_FETCH_BATCH", "64"))
        self.fetch_timeout_s = float(os.getenv("EXPERIENCE_FETCH_TIMEOUT_SECONDS", "2"))
        # --- Lesson extraction tuning ---
        # Sliding aggregation window (seconds) for per-bucket evidence.
        self.window_s = int(os.getenv("EXPERIENCE_WINDOW_SECONDS", "1800"))
        # Percentage of healthy ("ok") events sampled into processing.
        self.ok_sample_pct = float(os.getenv("EXPERIENCE_OK_SAMPLE_PCT", "10"))
        self.latency_spike_ms = int(os.getenv("EXPERIENCE_LATENCY_SPIKE_MS", "5000"))
        # Minimum bucket sizes before a lesson is emitted, per category.
        self.error_threshold = int(os.getenv("EXPERIENCE_ERROR_THRESHOLD", "3"))
        self.silent_threshold = int(os.getenv("EXPERIENCE_SILENT_THRESHOLD", "5"))
        self.latency_threshold = int(os.getenv("EXPERIENCE_LATENCY_THRESHOLD", "3"))
        # Event-id dedup cache: TTL plus hard size cap.
        self.event_dedup_ttl_s = int(os.getenv("EXPERIENCE_EVENT_DEDUP_TTL_SECONDS", "3600"))
        self.event_dedup_max = int(os.getenv("EXPERIENCE_EVENT_DEDUP_MAX", "100000"))
        self.publish_lessons = os.getenv("LESSON_PUBLISH_ENABLED", "true").lower() in {"1", "true", "yes"}
        # --- Anti-silent ACK template tuning (gateway feedback loop) ---
        self.anti_silent_tuning_enabled = os.getenv("ANTI_SILENT_TUNING_ENABLED", "true").lower() in {"1", "true", "yes"}
        self.anti_silent_window_days = max(1, int(os.getenv("ANTI_SILENT_TUNING_WINDOW_DAYS", "7")))
        self.anti_silent_min_evidence = max(1, int(os.getenv("ANTI_SILENT_TUNING_MIN_EVIDENCE", "20")))
        # Score threshold and weights are clamped into [0, 1].
        self.anti_silent_min_score = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_MIN_SCORE", "0.75"))))
        self.anti_silent_weight_retry = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_WEIGHT_RETRY", "0.6"))))
        self.anti_silent_weight_negative = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_WEIGHT_NEGATIVE", "0.3"))))
        self.anti_silent_weight_suppressed = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_WEIGHT_SUPPRESSED", "0.1"))))
        self.anti_silent_ttl_days = max(1, int(os.getenv("ANTI_SILENT_TUNING_TTL_DAYS", "7")))
        # --- Storage ---
        self.db_dsn = (
            os.getenv("LEARNER_DATABASE_URL")
            or os.getenv("EXPERIENCE_DATABASE_URL")
            or os.getenv("DATABASE_URL")
        )
        if not self.db_dsn:
            raise RuntimeError("LEARNER_DATABASE_URL (or EXPERIENCE_DATABASE_URL/DATABASE_URL) is required")
        # --- Runtime state (populated in start()) ---
        self._running = False
        self._task: Optional[asyncio.Task[Any]] = None
        self._nc = None  # NATS client
        self._js = None  # JetStream context
        self._sub = None  # pull subscription
        self._pool: Optional[asyncpg.Pool] = None
        # event_id -> monotonic timestamp, ordered oldest-first for pruning.
        self._seen_events: "OrderedDict[str, float]" = OrderedDict()
        # bucket key -> window of samples (see _bucket_key / _update_bucket).
        self._buckets: Dict[str, Deque[EventSample]] = {}
        # Guards _seen_events against concurrent access.
        self._lock = asyncio.Lock()
    async def start(self) -> None:
        """Connect to Postgres and NATS, ensure the durable consumer exists,
        and launch the background consume loop.

        No-op when already running. Raises on connection/consumer failures.
        """
        if self._running:
            return
        self._pool = await asyncpg.create_pool(self.db_dsn, min_size=1, max_size=4)
        self._nc = await nats.connect(self.nats_url)
        self._js = self._nc.jetstream()
        await self._ensure_consumer()
        self._sub = await self._js.pull_subscribe(
            self.subject,
            durable=self.durable,
            stream=self.stream_name,
        )
        self._running = True
        CONSUMER_RUNNING.set(1)
        self._task = asyncio.create_task(self._consume_loop(), name="experience-learner")
        logger.info(
            "experience-learner started stream=%s subject=%s durable=%s",
            self.stream_name,
            self.subject,
            self.durable,
        )
    async def stop(self) -> None:
        """Stop the consume loop and tear down NATS/DB resources.

        Cancellation errors from the loop task are deliberately swallowed;
        teardown is best-effort and idempotent.
        """
        self._running = False
        CONSUMER_RUNNING.set(0)
        if self._task:
            self._task.cancel()
            # Await the cancelled task; suppress CancelledError et al.
            with contextlib.suppress(Exception):
                await self._task
            self._task = None
        if self._nc:
            await self._nc.close()
            self._nc = None
            self._js = None
            self._sub = None
        if self._pool:
            await self._pool.close()
            self._pool = None
    async def _ensure_consumer(self) -> None:
        """Create the durable pull consumer if it is missing.

        Tolerates "already exists" races by string-matching the server error;
        anything else is re-raised. No-op when JetStream is not connected.
        """
        if self._js is None:
            return
        # Any value other than "all" is treated as NEW (start at stream tail).
        deliver_policy = DeliverPolicy.ALL if self.deliver_policy == "all" else DeliverPolicy.NEW
        cfg = ConsumerConfig(
            durable_name=self.durable,
            ack_policy=AckPolicy.EXPLICIT,
            ack_wait=self.ack_wait_s,
            max_deliver=self.max_deliver,
            deliver_policy=deliver_policy,
            filter_subject=self.subject,
        )
        try:
            await self._js.add_consumer(self.stream_name, config=cfg)
            logger.info("consumer created durable=%s stream=%s", self.durable, self.stream_name)
        except Exception as exc:
            # NATS server versions word the duplicate-consumer error differently.
            msg = str(exc).lower()
            if "consumer name already in use" in msg or "consumer already exists" in msg:
                logger.info("consumer exists durable=%s stream=%s", self.durable, self.stream_name)
            else:
                raise
    async def _consume_loop(self) -> None:
        """Fetch and process message batches until stop() clears _running.

        Fetch timeouts are normal idle behavior; other fetch errors back off
        for one second before retrying.
        """
        assert self._sub is not None
        while self._running:
            try:
                msgs = await self._sub.fetch(self.fetch_batch, timeout=self.fetch_timeout_s)
            except asyncio.TimeoutError:
                # No messages within fetch_timeout_s — poll again.
                continue
            except Exception as exc:
                logger.warning("fetch failed: %s", exc)
                await asyncio.sleep(1.0)
                continue
            for msg in msgs:
                await self._handle_msg(msg)
    async def _handle_msg(self, msg: Msg) -> None:
        """Process one JetStream message end-to-end.

        Invalid, duplicate or filtered-out events are acked (dropped). Kept
        events go through lesson extraction, DB insert and optional publish,
        then the message is acked. Any exception naks the message so JetStream
        redelivers it (bounded by the consumer's max_deliver).
        """
        try:
            metadata = getattr(msg, "metadata", None)
            if metadata is not None and getattr(metadata, "num_delivered", 1) > 1:
                JS_MESSAGES_REDELIVERED.inc()
            event = json.loads(msg.data.decode("utf-8", errors="replace"))
            if not isinstance(event, dict):
                EVENTS_DROPPED.labels(reason="invalid_payload").inc()
                await msg.ack()
                JS_MESSAGES_ACKED.inc()
                return
            # Application-level dedup on event_id (TTL- and size-bounded).
            event_id = str(event.get("event_id") or "").strip()
            if event_id and await self._seen_event(event_id):
                EVENTS_DROPPED.labels(reason="event_dedup").inc()
                await msg.ack()
                JS_MESSAGES_ACKED.inc()
                return
            keep, reason = self._should_keep(event)
            if not keep:
                EVENTS_DROPPED.labels(reason=reason).inc()
                await msg.ack()
                JS_MESSAGES_ACKED.inc()
                return
            EVENTS_SELECTED.labels(reason=reason).inc()
            lessons = await self._extract_lessons(event)
            if not lessons:
                await msg.ack()
                JS_MESSAGES_ACKED.inc()
                return
            for lesson in lessons:
                LESSONS_EXTRACTED.labels(status="ok").inc()
                insert_status = await self._insert_lesson(lesson)
                LESSONS_INSERT.labels(status=insert_status).inc()
                # Publish only lessons whose insert returned a row ("ok"),
                # and only when publishing is enabled.
                if insert_status == "ok" and self.publish_lessons:
                    await self._publish_lesson(lesson)
            await msg.ack()
            JS_MESSAGES_ACKED.inc()
        except Exception as exc:
            LESSONS_EXTRACTED.labels(status="err").inc()
            logger.exception("message handling failed: %s", exc)
            # Negative-ack for redelivery; ignore failures while nak'ing.
            with contextlib.suppress(Exception):
                await msg.nak()
async def _seen_event(self, event_id: str) -> bool:
now = time.monotonic()
async with self._lock:
self._prune_seen(now)
seen_ts = self._seen_events.get(event_id)
if seen_ts is not None and (now - seen_ts) < self.event_dedup_ttl_s:
return True
self._seen_events[event_id] = now
self._seen_events.move_to_end(event_id, last=True)
while len(self._seen_events) > self.event_dedup_max:
self._seen_events.popitem(last=False)
return False
def _prune_seen(self, now: float) -> None:
threshold = now - self.event_dedup_ttl_s
while self._seen_events:
_, ts = next(iter(self._seen_events.items()))
if ts >= threshold:
break
self._seen_events.popitem(last=False)
def _should_keep(self, event: Dict[str, Any]) -> Tuple[bool, str]:
result = event.get("result") or {}
llm = event.get("llm") or {}
policy = event.get("policy") or {}
ok = bool(result.get("ok"))
status = _as_int(result.get("http_status"), 0)
latency_ms = _as_int(llm.get("latency_ms"), 0)
sowa_decision = str(policy.get("sowa_decision") or "").upper()
if not ok:
return True, "error"
if self._is_anti_silent_gateway_event(event):
return True, "anti_silent_signal"
if sowa_decision == "SILENT":
return True, "silent"
if status >= 500:
return True, "http_5xx"
if latency_ms >= self.latency_spike_ms:
return True, "latency_spike"
if random.random() * 100.0 < self.ok_sample_pct:
return True, "ok_sample_in"
return False, "ok_sample_out"
def _is_anti_silent_gateway_event(self, event: Dict[str, Any]) -> bool:
if not self.anti_silent_tuning_enabled:
return False
if str(event.get("source") or "").strip().lower() != "gateway":
return False
action = str(event.get("anti_silent_action") or "").strip().upper()
if action not in {"ACK_EMITTED", "ACK_SUPPRESSED_COOLDOWN"}:
return False
reason = str((event.get("policy") or {}).get("reason") or "").strip()
template_id = str(event.get("anti_silent_template") or "").strip()
return bool(reason and template_id)
async def _extract_lessons(self, event: Dict[str, Any]) -> list[Dict[str, Any]]:
lessons: list[Dict[str, Any]] = []
tuning_lesson = await self._try_extract_anti_silent_tuning_lesson(event)
if tuning_lesson is not None:
lessons.append(tuning_lesson)
operational_lesson = await self._try_extract_lesson(event)
if operational_lesson is not None:
lessons.append(operational_lesson)
return lessons
async def _try_extract_lesson(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
categories = self._lesson_categories(event)
if not categories:
return None
for category in categories:
bucket_key = self._bucket_key(category, event)
count, ok_rate, p95_latency = self._update_bucket(bucket_key, event)
threshold = self._threshold_for(category)
if count < threshold:
continue
lesson = self._build_lesson(
category=category,
event=event,
count=count,
ok_rate=ok_rate,
p95_latency=p95_latency,
)
return lesson
return None
    async def _try_extract_anti_silent_tuning_lesson(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Derive a template-preference lesson from gateway anti-silent stats.

        Looks up per-template feedback rates for this (policy reason, chat
        type) pair over the configured window. When the best-scoring template
        has enough evidence and a high enough score, emits an
        "anti_silent_tuning" lesson recommending it (and discouraging the
        worst-scoring one). Returns None when the event doesn't qualify or
        evidence is insufficient; every exit path increments the evaluation
        counter with a status label.
        """
        if not self._is_anti_silent_gateway_event(event):
            return None
        if self._pool is None:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="pool_missing").inc()
            return None
        policy = event.get("policy") or {}
        reason = _safe_token(policy.get("reason"))
        chat_type = _safe_token(event.get("chat_type"))
        if not reason or not chat_type:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="missing_fields").inc()
            return None
        stats = await self._anti_silent_stats(reason=reason, chat_type=chat_type)
        if not stats:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="no_data").inc()
            return None
        # Require a minimum per-template sample size.
        candidates = [item for item in stats if int(item.get("n") or 0) >= self.anti_silent_min_evidence]
        if not candidates:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="insufficient_evidence").inc()
            return None
        # Best: highest score, ties broken by larger sample.
        best = max(candidates, key=lambda item: (float(item.get("score", 0.0)), int(item.get("n", 0))))
        best_score = float(best.get("score", 0.0))
        if best_score < self.anti_silent_min_score:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="below_score").inc()
            return None
        # Worst: lowest score, ties broken by larger sample (note the minus).
        worst = min(candidates, key=lambda item: (float(item.get("score", 0.0)), -int(item.get("n", 0))))
        best_template = str(best.get("template_id") or "").strip().upper()
        if not best_template:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="bad_template").inc()
            return None
        trigger = f"reason={reason};chat_type={chat_type}"
        action = f"prefer_template={best_template}"
        avoid = ""
        worst_template = str(worst.get("template_id") or "").strip().upper()
        if worst_template and worst_template != best_template:
            avoid = f"avoid_template={worst_template}"
        if not avoid:
            avoid = "avoid_template=none"
        lesson_type = "anti_silent_tuning"
        # Stable key: the same (trigger, action) pair upserts one lesson row.
        lesson_key_raw = "|".join([lesson_type, trigger, action])
        lesson_key = hashlib.sha256(lesson_key_raw.encode("utf-8")).hexdigest()
        now_dt = datetime.now(timezone.utc)
        expires_at = (now_dt + timedelta(days=self.anti_silent_ttl_days)).isoformat().replace("+00:00", "Z")
        evidence = {
            "n_best": int(best.get("n") or 0),
            "score_best": round(best_score, 6),
            "retry_rate": round(float(best.get("retry_rate", 0.0)), 6),
            "negative_rate": round(float(best.get("negative_rate", 0.0)), 6),
            "suppressed_rate": round(float(best.get("suppressed_rate", 0.0)), 6),
            "window_days": self.anti_silent_window_days,
            "weights": {
                "retry": self.anti_silent_weight_retry,
                "negative": self.anti_silent_weight_negative,
                "suppressed": self.anti_silent_weight_suppressed,
            },
            "candidates": stats,
        }
        signals = {
            "policy_reason": reason,
            "chat_type": chat_type,
            "lesson_type": lesson_type,
            "trigger_kind": "anti_silent_ack_template",
        }
        lesson: Dict[str, Any] = {
            "lesson_id": str(uuid.uuid4()),
            "lesson_key": lesson_key,
            "lesson_type": lesson_type,
            "ts": now_dt.isoformat().replace("+00:00", "Z"),
            "expires_at": expires_at,
            "scope": "global",
            "agent_id": None,
            "task_type": "webhook",
            "trigger": trigger,
            "action": action,
            "avoid": avoid,
            "signals": signals,
            "evidence": evidence,
        }
        # Snapshot before adding "raw" so raw doesn't contain itself.
        lesson["raw"] = dict(lesson)
        ANTI_SILENT_TUNING_EVALUATED.labels(status="ok").inc()
        return lesson
    async def _anti_silent_stats(self, *, reason: str, chat_type: str) -> list[Dict[str, Any]]:
        """Aggregate per-template anti-silent feedback for (reason, chat_type).

        Queries agent_experience_events over the tuning window and computes,
        per template: sample size, retry/negative feedback rates and the
        suppressed-by-cooldown rate, then folds them into a score in [0, 1]
        (higher is better) via the configured penalty weights. Returns an
        empty list when the pool is missing or the query fails.
        """
        if self._pool is None:
            return []
        query = """
            SELECT
                COALESCE(raw->>'anti_silent_template', '') AS template_id,
                COUNT(*)::int AS n,
                AVG(
                    CASE
                        WHEN COALESCE(raw->'feedback'->>'user_signal', 'none') = 'retry' THEN 1.0
                        ELSE 0.0
                    END
                )::float8 AS retry_rate,
                AVG(
                    CASE
                        WHEN COALESCE(raw->'feedback'->>'user_signal', 'none') = 'negative' THEN 1.0
                        ELSE 0.0
                    END
                )::float8 AS negative_rate,
                AVG(
                    CASE
                        WHEN COALESCE(raw->>'anti_silent_action', '') = 'ACK_SUPPRESSED_COOLDOWN' THEN 1.0
                        ELSE 0.0
                    END
                )::float8 AS suppressed_rate
            FROM agent_experience_events
            WHERE source = 'gateway'
              AND ts >= (now() - ($1::int * interval '1 day'))
              AND COALESCE(raw->'policy'->>'reason', '') = $2
              AND COALESCE(raw->>'chat_type', 'unknown') = $3
              AND COALESCE(raw->>'anti_silent_action', '') IN ('ACK_EMITTED', 'ACK_SUPPRESSED_COOLDOWN')
              AND COALESCE(raw->>'anti_silent_template', '') <> ''
            GROUP BY 1
            HAVING COUNT(*) >= $4
        """
        try:
            async with self._pool.acquire() as conn:
                rows = await conn.fetch(
                    query,
                    self.anti_silent_window_days,
                    reason,
                    chat_type,
                    self.anti_silent_min_evidence,
                )
        except Exception as exc:
            logger.warning("anti-silent stats query failed: %s", exc)
            return []
        results: list[Dict[str, Any]] = []
        for row in rows:
            template_id = str(row.get("template_id") or "").strip().upper()
            if not template_id:
                continue
            n = int(row.get("n") or 0)
            retry_rate = float(row.get("retry_rate") or 0.0)
            negative_rate = float(row.get("negative_rate") or 0.0)
            suppressed_rate = float(row.get("suppressed_rate") or 0.0)
            # Weighted penalty model: each bad-signal rate subtracts from 1.0.
            score = 1.0 - (
                self.anti_silent_weight_retry * retry_rate
                + self.anti_silent_weight_negative * negative_rate
                + self.anti_silent_weight_suppressed * suppressed_rate
            )
            score = max(0.0, min(1.0, score))
            results.append(
                {
                    "template_id": template_id,
                    "n": n,
                    "retry_rate": retry_rate,
                    "negative_rate": negative_rate,
                    "suppressed_rate": suppressed_rate,
                    "score": score,
                }
            )
        return results
def _lesson_categories(self, event: Dict[str, Any]) -> list[str]:
result = event.get("result") or {}
llm = event.get("llm") or {}
policy = event.get("policy") or {}
categories: list[str] = []
if not bool(result.get("ok")):
categories.append("error_repeat")
if str(policy.get("sowa_decision") or "").upper() == "SILENT":
categories.append("silent_repeat")
if _as_int(llm.get("latency_ms"), 0) >= self.latency_spike_ms:
categories.append("latency_spike")
return categories
def _bucket_key(self, category: str, event: Dict[str, Any]) -> str:
llm = event.get("llm") or {}
result = event.get("result") or {}
policy = event.get("policy") or {}
parts = [
category,
str(event.get("agent_id") or ""),
str(event.get("task_type") or "infer"),
str(result.get("error_class") or ""),
str(policy.get("reason") or ""),
str(llm.get("provider") or ""),
str(llm.get("model") or ""),
str(llm.get("profile") or ""),
]
return "|".join(parts)
    def _update_bucket(self, bucket_key: str, event: Dict[str, Any]) -> Tuple[int, Optional[float], Optional[int]]:
        """Add the event to its sliding-window bucket and return aggregates.

        Returns (sample count, ok-rate rounded to 4 places, p95 latency ms)
        over the samples still inside window_s; (0, None, None) when the
        bucket ends up empty.
        """
        now = time.monotonic()
        llm = event.get("llm") or {}
        result = event.get("result") or {}
        sample = EventSample(
            ts_mono=now,
            ok=bool(result.get("ok")),
            latency_ms=_as_int(llm.get("latency_ms"), 0),
        )
        bucket = self._buckets.get(bucket_key)
        if bucket is None:
            bucket = deque()
            self._buckets[bucket_key] = bucket
        bucket.append(sample)
        # Expire samples older than the window; bucket is append-ordered,
        # so popping from the left is sufficient.
        cutoff = now - self.window_s
        while bucket and bucket[0].ts_mono < cutoff:
            bucket.popleft()
        if not bucket:
            return 0, None, None
        count = len(bucket)
        ok_count = sum(1 for item in bucket if item.ok)
        ok_rate = round(ok_count / count, 4) if count > 0 else None
        latencies = sorted(item.latency_ms for item in bucket)
        p95_latency = _p95(latencies)
        return count, ok_rate, p95_latency
def _threshold_for(self, category: str) -> int:
if category == "error_repeat":
return self.error_threshold
if category == "silent_repeat":
return self.silent_threshold
return self.latency_threshold
    def _build_lesson(
        self,
        category: str,
        event: Dict[str, Any],
        count: int,
        ok_rate: Optional[float],
        p95_latency: Optional[int],
    ) -> Dict[str, Any]:
        """Materialize an operational lesson dict for one bucket category.

        lesson_key is a sha256 over scope/agent/advice-texts/signals so the
        same recurring pattern maps onto one DB row (insert path is ON
        CONFLICT DO NOTHING); lesson_id is a fresh uuid per build.
        """
        llm = event.get("llm") or {}
        result = event.get("result") or {}
        policy = event.get("policy") or {}
        agent_id = str(event.get("agent_id") or "").strip() or None
        task_type = str(event.get("task_type") or "infer")
        # _safe_token redacts secrets/emails/URLs from free-form fields.
        error_class = _safe_token(result.get("error_class"))
        policy_reason = _safe_token(policy.get("reason"))
        sowa_decision = _safe_token(policy.get("sowa_decision"))
        # Category-specific advice texts.
        if category == "silent_repeat":
            trigger = "Frequent SILENT policy outcomes on active conversation flow."
            action = "Use short ACK/CHALLENGE clarification before silencing response."
            avoid = "Avoid immediate SILENT when user intent might target the agent."
        elif category == "latency_spike":
            trigger = "Repeated latency spikes above configured SLA threshold."
            action = "Prefer faster model/profile and reduce expensive tool rounds."
            avoid = "Avoid routing to slow provider/profile for same task pattern."
        else:
            trigger = f"Repeated inference failures of class '{error_class or 'unknown_error'}'."
            action = "Switch to stable provider/profile and constrain optional tool calls."
            avoid = "Avoid blind retries on the same failing route."
        scope = "agent" if agent_id else "global"
        lesson_key_raw = "|".join(
            [
                scope,
                str(agent_id or ""),
                trigger,
                action,
                avoid,
                str(error_class or ""),
                str(policy_reason or ""),
            ]
        )
        lesson_key = hashlib.sha256(lesson_key_raw.encode("utf-8")).hexdigest()
        lesson = {
            "lesson_id": str(uuid.uuid4()),
            "lesson_key": lesson_key,
            "ts": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "scope": scope,
            "agent_id": agent_id,
            "task_type": task_type,
            "trigger": trigger,
            "action": action,
            "avoid": avoid,
            "signals": {
                "policy_reason": policy_reason,
                "policy_decision": sowa_decision,
                "error_class": error_class,
                "provider": _safe_token(llm.get("provider")),
                "model": _safe_token(llm.get("model")),
                "profile": _safe_token(llm.get("profile")),
            },
            "evidence": {
                "count": count,
                "ok_rate": ok_rate,
                "p95_latency_ms": p95_latency,
            },
        }
        # Copy before inserting "raw" so the snapshot doesn't contain itself.
        lesson["raw"] = dict(lesson)
        return lesson
    async def _insert_lesson(self, lesson: Dict[str, Any]) -> str:
        """Persist a lesson; returns "ok", "conflict" or "err".

        Operational lessons use INSERT ... ON CONFLICT DO NOTHING keyed on
        lesson_key (a recurring pattern is stored once; "conflict" means the
        key already existed). "anti_silent_tuning" lessons upsert instead so
        the recommendation and evidence stay fresh on every evaluation.
        """
        if self._pool is None:
            return "err"
        # Insert-once variant: RETURNING id yields no row when the key exists.
        query_insert = """
            INSERT INTO agent_lessons (
                lesson_id,
                lesson_key,
                ts,
                scope,
                agent_id,
                task_type,
                trigger,
                action,
                avoid,
                signals,
                evidence,
                raw
            ) VALUES (
                $1::uuid,
                $2,
                $3::timestamptz,
                $4,
                $5,
                $6,
                $7,
                $8,
                $9,
                $10::jsonb,
                $11::jsonb,
                $12::jsonb
            )
            ON CONFLICT (lesson_key) DO NOTHING
            RETURNING id
        """
        # Upsert variant for tuning lessons: refresh all mutable columns.
        query_tuning_upsert = """
            INSERT INTO agent_lessons (
                lesson_id,
                lesson_key,
                ts,
                scope,
                agent_id,
                task_type,
                trigger,
                action,
                avoid,
                signals,
                evidence,
                raw
            ) VALUES (
                $1::uuid,
                $2,
                $3::timestamptz,
                $4,
                $5,
                $6,
                $7,
                $8,
                $9,
                $10::jsonb,
                $11::jsonb,
                $12::jsonb
            )
            ON CONFLICT (lesson_key) DO UPDATE SET
                ts = EXCLUDED.ts,
                scope = EXCLUDED.scope,
                agent_id = EXCLUDED.agent_id,
                task_type = EXCLUDED.task_type,
                trigger = EXCLUDED.trigger,
                action = EXCLUDED.action,
                avoid = EXCLUDED.avoid,
                signals = EXCLUDED.signals,
                evidence = EXCLUDED.evidence,
                raw = EXCLUDED.raw
            RETURNING id
        """
        try:
            lesson_id = uuid.UUID(str(lesson["lesson_id"]))
            ts_value = _as_timestamptz(lesson["ts"])
            lesson_type = str(lesson.get("lesson_type") or "").strip().lower()
            query = query_tuning_upsert if lesson_type == "anti_silent_tuning" else query_insert
            async with self._pool.acquire() as conn:
                row_id = await conn.fetchval(
                    query,
                    lesson_id,
                    lesson["lesson_key"],
                    ts_value,
                    lesson["scope"],
                    lesson.get("agent_id"),
                    lesson["task_type"],
                    lesson["trigger"],
                    lesson["action"],
                    lesson["avoid"],
                    json.dumps(lesson["signals"], ensure_ascii=False),
                    json.dumps(lesson["evidence"], ensure_ascii=False),
                    json.dumps(lesson, ensure_ascii=False),
                )
            if row_id is None:
                return "conflict"
            return "ok"
        except Exception as exc:
            logger.warning("insert lesson failed: %s", exc)
            return "err"
    async def _publish_lesson(self, lesson: Dict[str, Any]) -> None:
        """Publish the lesson JSON to the lesson subject on JetStream.

        The Nats-Msg-Id header carries lesson_id so the server can dedup
        duplicate publishes. Failures are counted and logged, not raised.
        """
        if self._js is None:
            LESSON_PUBLISH.labels(status="skipped").inc()
            return
        payload = json.dumps(lesson, ensure_ascii=False).encode("utf-8")
        headers = {"Nats-Msg-Id": str(lesson["lesson_id"])}
        try:
            await self._js.publish(self.lesson_subject, payload, headers=headers)
            LESSON_PUBLISH.labels(status="ok").inc()
        except Exception as exc:
            LESSON_PUBLISH.labels(status="err").inc()
            logger.warning("publish lesson failed: %s", exc)
async def health(self) -> Dict[str, Any]:
return {
"status": "ok" if self._running else "starting",
"node_id": self.node_id,
"stream": self.stream_name,
"subject": self.subject,
"durable": self.durable,
"nats_connected": self._nc is not None and self._nc.is_connected,
"db_connected": self._pool is not None,
"running": self._running,
}
def _safe_token(value: Any) -> Optional[str]:
    """Sanitize free-form text for storage in a lesson.

    Redacts bearer tokens, key/secret assignments, email addresses and URLs,
    collapses whitespace, and caps the result at 180 characters. Returns
    None for None input or when nothing remains after cleanup.
    """
    if value is None:
        return None
    text = str(value)
    # Ordered redaction passes (order matters: credentials before URLs).
    redactions = (
        (r"(?i)bearer\s+[A-Za-z0-9._-]+", "bearer [redacted]"),
        (r"(?i)(api[_-]?key|token|password|secret)\s*[:=]\s*[^\s,;]+", r"\1=[redacted]"),
        (r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", "[redacted-email]"),
        (r"https?://[^\s]+", "[redacted-url]"),
    )
    for pattern, replacement in redactions:
        text = re.sub(pattern, replacement, text)
    text = re.sub(r"\s+", " ", text).strip()
    if not text:
        return None
    return text[:180]
def _as_int(value: Any, default: int) -> int:
    """Best-effort conversion of *value* to int.

    Accepts ints, floats and numeric strings. Fix/generalization over the
    original: decimal strings such as "7.9" previously fell back to
    *default* because int("7.9") raises; they now go through float() and are
    truncated toward zero. None and anything unparsable (including
    "inf"/"nan") still return *default*.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        try:
            # int("7.9") raises ValueError; parse via float for decimal strings.
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return default
def _as_timestamptz(value: Any) -> datetime:
    """Coerce *value* to a timezone-aware datetime (naive values assume UTC).

    Accepts datetime objects or ISO-8601 strings (a trailing "Z" is mapped
    to "+00:00"); anything unparsable falls back to the current UTC time.
    """
    if isinstance(value, datetime):
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value
    try:
        parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
    except Exception:
        return datetime.now(timezone.utc)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed
def _p95(sorted_values: list[int]) -> Optional[int]:
    """Return the 95th-percentile element of a pre-sorted list, or None when
    the list is empty.

    Uses a rounded-rank index into the sorted values (no interpolation),
    clamped to the valid range.
    """
    if not sorted_values:
        return None
    last = len(sorted_values) - 1
    idx = int(round(0.95 * last))
    idx = min(max(idx, 0), last)
    return sorted_values[idx]
# --- FastAPI wiring ----------------------------------------------------------
app = FastAPI(title="Experience Learner")
# NOTE: constructing the learner at import time raises RuntimeError unless a
# database DSN env var is set (see ExperienceLearner.__init__).
learner = ExperienceLearner()


@app.on_event("startup")
async def startup() -> None:
    """Start the learner's consumer and DB pool with the app."""
    await learner.start()


@app.on_event("shutdown")
async def shutdown() -> None:
    """Stop the consumer loop and close connections on app shutdown."""
    await learner.stop()


@app.get("/health")
async def health() -> Dict[str, Any]:
    """Liveness/readiness snapshot from the learner."""
    return await learner.health()


@app.get("/metrics")
async def metrics() -> Response:
    """Prometheus metrics in text exposition format."""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)