From ef6ebe35837a17a5bd575d87bb93fdef01c15309 Mon Sep 17 00:00:00 2001 From: Apple Date: Thu, 5 Mar 2026 11:30:17 -0800 Subject: [PATCH] feat(runtime): sync experience bus and learner stack into main --- docker-compose.node1.yml | 74 +- docs/ops/experience_bus_phase1.md | 61 ++ docs/ops/experience_bus_phase2.md | 71 ++ docs/ops/experience_bus_phase3.md | 70 ++ docs/ops/experience_bus_phase4.md | 99 +++ .../phase4_1_payload_source_lock.json | 18 + ...ase4_1_payload_unsupported_no_message.json | 13 + .../phase5_payload_group_source_lock.json | 17 + ..._payload_group_unsupported_no_message.json | 19 + ...se5_payload_private_photo_unsupported.json | 17 + .../phase4_1_gateway_early_return_coverage.md | 134 +++ docs/ops/phase5_anti_silent_group_ux.md | 136 +++ gateway-bot/router_client.py | 7 +- migrations/054_agent_experience_events.sql | 38 + migrations/055_agent_lessons.sql | 27 + services/experience-learner/Dockerfile | 12 + services/experience-learner/main.py | 839 ++++++++++++++++++ services/experience-learner/requirements.txt | 5 + services/memory-service/app/database.py | 7 +- services/router/agent_metrics.py | 106 +++ services/router/experience_bus.py | 446 ++++++++++ services/router/main.py | 643 +++++++++++++- 22 files changed, 2837 insertions(+), 22 deletions(-) create mode 100644 docs/ops/experience_bus_phase1.md create mode 100644 docs/ops/experience_bus_phase2.md create mode 100644 docs/ops/experience_bus_phase3.md create mode 100644 docs/ops/experience_bus_phase4.md create mode 100644 docs/ops/payloads/phase4_1_payload_source_lock.json create mode 100644 docs/ops/payloads/phase4_1_payload_unsupported_no_message.json create mode 100644 docs/ops/payloads/phase5_payload_group_source_lock.json create mode 100644 docs/ops/payloads/phase5_payload_group_unsupported_no_message.json create mode 100644 docs/ops/payloads/phase5_payload_private_photo_unsupported.json create mode 100644 docs/ops/phase4_1_gateway_early_return_coverage.md create mode 100644 docs/ops/phase5_anti_silent_group_ux.md create mode 100644 migrations/054_agent_experience_events.sql create mode 100644 migrations/055_agent_lessons.sql create mode 100644 services/experience-learner/Dockerfile create mode 100644 services/experience-learner/main.py create mode 100644 services/experience-learner/requirements.txt create mode 100644 services/router/experience_bus.py diff --git a/docker-compose.node1.yml b/docker-compose.node1.yml index bbcbe3b5..1627787b 100644 --- a/docker-compose.node1.yml +++ b/docker-compose.node1.yml @@ -14,6 +14,7 @@ services: - ROUTER_CONFIG_PATH=/app/router_config.yaml - LOG_LEVEL=info - NODE_ID=noda1 + - EXPERIENCE_DATABASE_URL=postgresql://daarion:DaarionDB2026!@dagi-postgres:5432/daarion_memory - MEMORY_SERVICE_URL=http://memory-service:8000 # Timeout policy: Gateway (180s) > Router (60s) > LLM (30s) - ROUTER_TIMEOUT=180 @@ -64,7 +65,9 @@ services: - ${DEPLOY_ROOT:-.}/gateway-bot:/app/prompts:ro - ${DEPLOY_ROOT:-.}/logs:/app/logs networks: - - dagi-network + dagi-network: + aliases: + - dagi-router restart: unless-stopped extra_hosts: - "host.docker.internal:host-gateway" @@ -179,7 +182,18 @@ services: - BUILD_SHA=${BUILD_SHA:-dev} - BUILD_TIME=${BUILD_TIME:-local} - NODE_ID=NODA1 - - ROUTER_URL=${ROUTER_URL:-http://dagi-staging-router:8000} + - ROUTER_URL=${ROUTER_URL:-http://dagi-router:8000} + - NATS_URL=nats://nats:4222 + - EXPERIENCE_BUS_ENABLED=true + - EXPERIENCE_ENABLE_NATS=true + - EXPERIENCE_ENABLE_DB=true + - EXPERIENCE_STREAM_NAME=EXPERIENCE + - 
EXPERIENCE_SUBJECT_PREFIX=agent.experience.v1 + - EXPERIENCE_DATABASE_URL=postgresql://daarion:DaarionDB2026!@dagi-postgres:5432/daarion_memory + - GATEWAY_USER_SIGNAL_RETRY_WINDOW_SECONDS=30 + - ANTI_SILENT_TUNING_ENABLED=${ANTI_SILENT_TUNING_ENABLED:-false} + - ANTI_SILENT_TUNING_DB_TIMEOUT_MS=${ANTI_SILENT_TUNING_DB_TIMEOUT_MS:-40} + - ANTI_SILENT_TUNING_CACHE_TTL_SECONDS=${ANTI_SILENT_TUNING_CACHE_TTL_SECONDS:-60} - GATEWAY_MAX_TOKENS_CONCISE=350 - GATEWAY_MAX_TOKENS_SENPAI_DEFAULT=700 - GATEWAY_MAX_TOKENS_DEFAULT=700 @@ -282,7 +296,7 @@ services: container_name: dagi-gateway-worker-node1 command: ["python", "-m", "daarion_facade.worker"] environment: - - ROUTER_BASE_URL=http://router:8000 + - ROUTER_BASE_URL=http://dagi-router:8000 - REDIS_URL=redis://redis:6379/0 - ROUTER_WORKER_TIMEOUT=60 volumes: @@ -350,6 +364,56 @@ services: timeout: 5s retries: 3 + # Experience Learner (Phase-2): JetStream events -> aggregated lessons + experience-learner: + build: + context: ./services/experience-learner + dockerfile: Dockerfile + container_name: dagi-experience-learner-node1 + ports: + - "127.0.0.1:9109:9109" + environment: + - NODE_ID=NODA1 + - NATS_URL=nats://nats:4222 + - EXPERIENCE_STREAM_NAME=EXPERIENCE + - EXPERIENCE_SUBJECT=agent.experience.v1.> + - EXPERIENCE_DURABLE=experience-learner-v1 + - EXPERIENCE_DELIVER_POLICY=all + - EXPERIENCE_ACK_WAIT_SECONDS=30 + - EXPERIENCE_MAX_DELIVER=20 + - EXPERIENCE_FETCH_BATCH=64 + - EXPERIENCE_FETCH_TIMEOUT_SECONDS=2 + - EXPERIENCE_WINDOW_SECONDS=1800 + - EXPERIENCE_OK_SAMPLE_PCT=10 + - EXPERIENCE_LATENCY_SPIKE_MS=5000 + - EXPERIENCE_ERROR_THRESHOLD=3 + - EXPERIENCE_SILENT_THRESHOLD=5 + - EXPERIENCE_LATENCY_THRESHOLD=3 + - EXPERIENCE_EVENT_DEDUP_TTL_SECONDS=3600 + - LEARNER_DATABASE_URL=postgresql://daarion:DaarionDB2026!@dagi-postgres:5432/daarion_memory + - LESSON_SUBJECT=agent.lesson.v1 + - LESSON_PUBLISH_ENABLED=true + - ANTI_SILENT_TUNING_ENABLED=${ANTI_SILENT_TUNING_ENABLED:-true} + - ANTI_SILENT_TUNING_WINDOW_DAYS=${ANTI_SILENT_TUNING_WINDOW_DAYS:-7} + - ANTI_SILENT_TUNING_MIN_EVIDENCE=${ANTI_SILENT_TUNING_MIN_EVIDENCE:-20} + - ANTI_SILENT_TUNING_MIN_SCORE=${ANTI_SILENT_TUNING_MIN_SCORE:-0.75} + - ANTI_SILENT_TUNING_WEIGHT_RETRY=${ANTI_SILENT_TUNING_WEIGHT_RETRY:-0.6} + - ANTI_SILENT_TUNING_WEIGHT_NEGATIVE=${ANTI_SILENT_TUNING_WEIGHT_NEGATIVE:-0.3} + - ANTI_SILENT_TUNING_WEIGHT_SUPPRESSED=${ANTI_SILENT_TUNING_WEIGHT_SUPPRESSED:-0.1} + - ANTI_SILENT_TUNING_TTL_DAYS=${ANTI_SILENT_TUNING_TTL_DAYS:-7} + depends_on: + - nats + - dagi-postgres + networks: + - dagi-network + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://localhost:9109/health')\""] + interval: 30s + timeout: 10s + retries: 3 + start_period: 10s + metrics-poller-node1: build: @@ -1228,7 +1292,7 @@ services: - SOFIIA_DATA_DIR=/data/sofiia - NODES_REGISTRY_PATH=/config/nodes_registry.yml - NODES_NODA1_SSH_PASSWORD=bRhfV7uNY9m6er - - ROUTER_URL=http://dagi-router-node1:8000 + - ROUTER_URL=http://dagi-router:8000 - GATEWAY_URL=http://dagi-gateway-node1:9300 - MEMORY_SERVICE_URL=http://dagi-memory-service-node1:8000 - OLLAMA_URL=http://172.18.0.1:11434 @@ -1238,6 +1302,8 @@ services: - SOFIIA_CONSOLE_API_KEY=${SOFIIA_CONSOLE_API_KEY:-} - SOFIIA_CONSOLE_TEAM_KEYS=${SOFIIA_CONSOLE_TEAM_KEYS:-} - SOFIIA_INTERNAL_TOKEN=${SOFIIA_INTERNAL_TOKEN:-} + # M3.1: control token for bridge→console runbook runs (separate from audit token) + - SOFIIA_CONTROL_TOKEN=${SOFIIA_CONTROL_TOKEN:-} # aurora-service not deployed on 
NODA1 — set explicit URL to avoid DNS lookup failure - AURORA_SERVICE_URL=http://127.0.0.1:9401 volumes: diff --git a/docs/ops/experience_bus_phase1.md b/docs/ops/experience_bus_phase1.md new file mode 100644 index 00000000..0f7caf3b --- /dev/null +++ b/docs/ops/experience_bus_phase1.md @@ -0,0 +1,61 @@ +# Experience Bus Phase-1 (Router First) + +## Scope +- Source: router `/v1/agents/{id}/infer` +- Event subject: `agent.experience.v1.` +- JetStream stream: `EXPERIENCE` (`agent.experience.v1.>`) +- DB table: `agent_experience_events` (append-only) +- Controls: dedup + sampling + +## Env knobs +- `EXPERIENCE_BUS_ENABLED=true` +- `EXPERIENCE_ENABLE_NATS=true` +- `EXPERIENCE_ENABLE_DB=true` +- `EXPERIENCE_DATABASE_URL=postgresql://:@:5432/daarion_memory` +- `EXPERIENCE_OK_SAMPLE_PCT=10` +- `EXPERIENCE_LATENCY_SPIKE_MS=5000` +- `EXPERIENCE_DEDUP_WINDOW_SECONDS=900` +- `EXPERIENCE_QUEUE_MAX=2000` + +## Deploy +1. Apply migration `migrations/054_agent_experience_events.sql`. +2. Deploy router with updated `main.py`, `experience_bus.py`, `agent_metrics.py`. +3. Restart router service. + +## Smoke (30 calls) +```bash +for i in $(seq 1 15); do + curl -sS -X POST http://127.0.0.1:9102/v1/agents/agromatrix/infer \ + -H 'content-type: application/json' \ + -d "{\"prompt\":\"experience smoke agromatrix $i $(date +%s%N)\"}" >/dev/null +done + +for i in $(seq 1 15); do + curl -sS -X POST http://127.0.0.1:9102/v1/agents/stepan/infer \ + -H 'content-type: application/json' \ + -d "{\"prompt\":\"experience smoke stepan $i $(date +%s%N)\"}" >/dev/null +done +``` + +## Verify JetStream +```bash +# example (inside nats container with nats CLI) +nats stream info EXPERIENCE +nats stream view EXPERIENCE --count 5 +``` + +## Verify DB +```sql +SELECT count(*) +FROM agent_experience_events +WHERE ts > now() - interval '10 minutes'; +``` + +## Verify lifecycle guard unchanged +```bash +curl -sS -o /dev/null -w '%{http_code}\n' -X POST http://127.0.0.1:9102/v1/agents/aistalk/infer -H 'content-type: application/json' -d '{"prompt":"ping"}' +# expected: 410 + +curl -sS -o /dev/null -w '%{http_code}\n' -X POST http://127.0.0.1:9102/v1/agents/devtools/infer -H 'content-type: application/json' -d '{"prompt":"ping"}' +# expected: 404 +``` diff --git a/docs/ops/experience_bus_phase2.md b/docs/ops/experience_bus_phase2.md new file mode 100644 index 00000000..f237ec25 --- /dev/null +++ b/docs/ops/experience_bus_phase2.md @@ -0,0 +1,71 @@ +# Experience Bus Phase-2 (Lessons Extractor) + +## Scope +- Source stream: `EXPERIENCE` +- Source subjects: `agent.experience.v1.>` +- Consumer mode: durable pull + explicit ack +- Output table: `agent_lessons` (append-only) +- Output subject: `agent.lesson.v1` (optional publish) + +## Service +- Container: `dagi-experience-learner-node1` +- Endpoint: + - `GET /health` + - `GET /metrics` + +## Environment +- `NATS_URL=nats://nats:4222` +- `EXPERIENCE_STREAM_NAME=EXPERIENCE` +- `EXPERIENCE_SUBJECT=agent.experience.v1.>` +- `EXPERIENCE_DURABLE=experience-learner-v1` +- `EXPERIENCE_ACK_WAIT_SECONDS=30` +- `EXPERIENCE_MAX_DELIVER=20` +- `EXPERIENCE_FETCH_BATCH=64` +- `EXPERIENCE_FETCH_TIMEOUT_SECONDS=2` +- `EXPERIENCE_WINDOW_SECONDS=1800` +- `EXPERIENCE_OK_SAMPLE_PCT=10` +- `EXPERIENCE_LATENCY_SPIKE_MS=5000` +- `EXPERIENCE_ERROR_THRESHOLD=3` +- `EXPERIENCE_SILENT_THRESHOLD=5` +- `EXPERIENCE_LATENCY_THRESHOLD=3` +- `EXPERIENCE_EVENT_DEDUP_TTL_SECONDS=3600` +- `LEARNER_DATABASE_URL=postgresql://:@:5432/daarion_memory` +- `LESSON_SUBJECT=agent.lesson.v1` +- `LESSON_PUBLISH_ENABLED=true` + +## 
Deploy +1. Apply migration `migrations/055_agent_lessons.sql`. +2. Deploy service `experience-learner`. +3. Verify service health and metrics. + +## Smoke +```bash +# Generate event traffic (Phase-1 router path) +for i in $(seq 1 50); do + agent=$([ $((i%2)) -eq 0 ] && echo "aistalk" || echo "devtools") + curl -sS -m 8 -o /dev/null \ + -X POST "http://127.0.0.1:9102/v1/agents/${agent}/infer" \ + -H "content-type: application/json" \ + -d "{\"prompt\":\"phase2-smoke-${agent}-${i}-$(date +%s%N)\"}" || true +done +``` + +## Verify +```bash +# Lessons rows +docker exec dagi-postgres psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*) FROM agent_lessons WHERE ts > now()-interval '30 minutes';" + +# Idempotency check (run again, duplicates should not explode) +docker exec dagi-postgres psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*), count(distinct lesson_key) FROM agent_lessons;" + +# Learner metrics +curl -sS http://127.0.0.1:9109/metrics | grep -E 'lessons_|js_messages_' +``` + +## Acceptance +- `agent_lessons` receives rows under live event flow. +- Reprocessing/redelivery does not duplicate lessons (`lesson_key` unique). +- `js_messages_acked_total` increases. +- `js_messages_redelivered_total` is observable when replay/redelivery occurs. diff --git a/docs/ops/experience_bus_phase3.md b/docs/ops/experience_bus_phase3.md new file mode 100644 index 00000000..ccf11e72 --- /dev/null +++ b/docs/ops/experience_bus_phase3.md @@ -0,0 +1,70 @@ +# Experience Bus Phase-3 (Router Runtime Retrieval) + +## Scope +- Read path only in `router` before `/v1/agents/{id}/infer`. +- Retrieves lessons from `agent_lessons` and injects a compact block: + - `Operational Lessons (apply if relevant)` +- Attach policy: + - after last error / latency spike: always-on, `K=7` + - otherwise sampled attach, default `10%`, `K=3` + +## Environment +- `LESSONS_ATTACH_ENABLED=true` +- `LESSONS_DATABASE_URL=postgresql://:@:5432/daarion_memory` +- `LESSONS_ATTACH_MIN=3` +- `LESSONS_ATTACH_MAX=7` +- `LESSONS_ATTACH_SAMPLE_PCT=10` +- `LESSONS_ATTACH_TIMEOUT_MS=25` +- `LESSONS_ATTACH_MAX_CHARS=1200` +- `LESSONS_SIGNAL_CACHE_TTL_SECONDS=300` +- `EXPERIENCE_LATENCY_SPIKE_MS=5000` + +## Metrics +- `lessons_retrieved_total{status="ok|timeout|err"}` +- `lessons_attached_total{count="0|1-3|4-7"}` +- `lessons_attach_latency_ms` + +## Safety +- Lessons block never includes raw user text. +- Guard filters skip lessons containing prompt-injection-like markers: + - `ignore previous`, `system:`, `developer:`, fenced code blocks. 
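+
+A minimal sketch of such a guard filter, assuming a hypothetical `_is_safe_lesson`
+helper (the router's actual filter may differ):
+
+```python
+import re
+
+# Markers treated as prompt-injection attempts; fenced code blocks are rejected
+# because attached lessons must stay plain operational text.
+_INJECTION_MARKERS = re.compile(r"(?i)ignore previous|system:|developer:|`{3}")
+
+def _is_safe_lesson(text: str) -> bool:
+    """True when a lesson field is safe to inject into the prompt block."""
+    return not _INJECTION_MARKERS.search(text or "")
+
+# usage: lessons = [l for l in lessons if _is_safe_lesson(l["action"])]
+```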
+ +## Smoke +```bash +# 1) Seed synthetic lessons for one agent (example: agromatrix) +docker exec dagi-postgres psql -U daarion -d daarion_memory -c " +INSERT INTO agent_lessons (lesson_id, lesson_key, ts, scope, agent_id, task_type, trigger, action, avoid, signals, evidence, raw) +SELECT + gen_random_uuid(), + md5(random()::text || clock_timestamp()::text), + now() - (g * interval '1 minute'), + 'agent', + 'agromatrix', + 'infer', + 'when retrying after model timeout', + 'switch provider or reduce token budget first', + 'avoid repeating the same failed provider with same payload', + '{"error_class":"TimeoutError","provider":"deepseek","model":"deepseek-chat","profile":"reasoning"}'::jsonb, + '{"count":3}'::jsonb, + '{}'::jsonb +FROM generate_series(1,10) g;" + +# 2) Send infer calls +for i in $(seq 1 20); do + curl -sS -m 12 -o /dev/null \ + -X POST "http://127.0.0.1:9102/v1/agents/agromatrix/infer" \ + -H "content-type: application/json" \ + -d "{\"prompt\":\"phase3-smoke-${i}\",\"metadata\":{\"agent_id\":\"agromatrix\"}}" || true +done + +# 3) Check metrics +curl -sS http://127.0.0.1:9102/metrics | grep -E 'lessons_retrieved_total|lessons_attached_total|lessons_attach_latency_ms' + +# 4) Simulate DB issue (optional): lessons retrieval should fail-open and infer remains 200 +# (temporarily point LESSONS_DATABASE_URL to bad DSN + restart router) +``` + +## Acceptance +- Router logs include `lessons_attached=` during sampled or always-on retrieval. +- Infer path remains healthy when lessons DB is unavailable. +- p95 infer latency impact stays controlled at sampling `10%`. diff --git a/docs/ops/experience_bus_phase4.md b/docs/ops/experience_bus_phase4.md new file mode 100644 index 00000000..57e2ba09 --- /dev/null +++ b/docs/ops/experience_bus_phase4.md @@ -0,0 +1,99 @@ +# Experience Bus Phase-4 (Gateway Hooks) + +## Scope +- Source: `gateway` (Telegram webhook path). +- Emits `agent.experience.v1.` events with: + - `source="gateway"` + - `request_id`/`correlation_id` + - `policy.sowa_decision` + normalized `reason` + - `feedback.user_signal` (`none|positive|negative|retry|timeout`) +- Optional DB append to `agent_experience_events` (fail-open). + +## Environment (gateway) +- `NATS_URL=nats://nats:4222` +- `EXPERIENCE_BUS_ENABLED=true` +- `EXPERIENCE_ENABLE_NATS=true` +- `EXPERIENCE_ENABLE_DB=true` +- `EXPERIENCE_STREAM_NAME=EXPERIENCE` +- `EXPERIENCE_SUBJECT_PREFIX=agent.experience.v1` +- `EXPERIENCE_DATABASE_URL=postgresql://:@:5432/daarion_memory` +- `GATEWAY_USER_SIGNAL_RETRY_WINDOW_SECONDS=30` + +## Metrics +- `gateway_experience_published_total{status="ok|err"}` +- `gateway_policy_decisions_total{sowa_decision,reason}` +- `gateway_user_signal_total{user_signal}` +- `gateway_webhook_latency_ms` + +## Correlation contract +- Gateway creates `request_id` (`correlation_id`) per webhook cycle. +- Gateway forwards it to router via: + - `metadata.request_id` + - `metadata.trace_id` + - `X-Request-Id` header +- Router writes same `request_id` in its event payload for join. 
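+
+A minimal sketch of the forwarding step, assuming an httpx client (it mirrors the
+`gateway-bot/router_client.py` change in this patch; names are illustrative):
+
+```python
+import uuid
+import httpx
+
+async def forward_to_router(infer_url: str, infer_body: dict, metadata: dict) -> dict:
+    # Reuse an upstream id when present; otherwise mint one per webhook cycle.
+    request_id = str(metadata.get("request_id") or uuid.uuid4())
+    metadata["request_id"] = request_id
+    metadata["trace_id"] = request_id  # same value on both keys for the join
+    async with httpx.AsyncClient(timeout=60) as client:
+        resp = await client.post(
+            infer_url, json=infer_body, headers={"X-Request-Id": request_id}
+        )
+        resp.raise_for_status()
+        return resp.json()
+```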
+ +## Smoke +```bash +# 1) Send webhook payload (agent-specific endpoint) +curl -sS -X POST "http://127.0.0.1:9300/helion/telegram/webhook" \ + -H "content-type: application/json" \ + -d '{ + "update_id": 900001, + "message": { + "message_id": 101, + "date": 1760000000, + "text": "дякую", + "chat": {"id": "smoke-chat-1", "type": "private"}, + "from": {"id": 7001, "username": "smoke_user", "is_bot": false} + } + }' + +# 2) Retry signal (same text quickly) +curl -sS -X POST "http://127.0.0.1:9300/helion/telegram/webhook" \ + -H "content-type: application/json" \ + -d '{ + "update_id": 900002, + "message": { + "message_id": 102, + "date": 1760000005, + "text": "перевір", + "chat": {"id": "smoke-chat-1", "type": "private"}, + "from": {"id": 7001, "username": "smoke_user", "is_bot": false} + } + }' + +curl -sS -X POST "http://127.0.0.1:9300/helion/telegram/webhook" \ + -H "content-type: application/json" \ + -d '{ + "update_id": 900003, + "message": { + "message_id": 103, + "date": 1760000010, + "text": "перевір", + "chat": {"id": "smoke-chat-1", "type": "private"}, + "from": {"id": 7001, "username": "smoke_user", "is_bot": false} + } + }' + +# 3) Verify metrics +curl -sS http://127.0.0.1:9300/metrics | grep -E 'gateway_experience_published_total|gateway_policy_decisions_total|gateway_user_signal_total|gateway_webhook_latency_ms' + +# 4) Verify DB rows +docker exec dagi-postgres psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*) FROM agent_experience_events WHERE source='gateway' AND ts > now()-interval '10 minutes';" + +# 5) Verify correlation join (gateway <-> router) +docker exec dagi-postgres psql -U daarion -d daarion_memory -P pager=off -c \ + "SELECT source, agent_id, request_id, task_type, ts + FROM agent_experience_events + WHERE ts > now()-interval '10 minutes' + AND source IN ('gateway','router') + ORDER BY ts DESC LIMIT 40;" +``` + +## Acceptance +- Gateway publishes and stores events without blocking webhook path. +- `request_id` can join gateway and router records for same conversation turn. +- `policy.sowa_decision` and `feedback.user_signal` are present in gateway `raw` event. +- If NATS/DB unavailable, webhook still returns normal success path (fail-open telemetry). 
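+
+## Retry-signal sketch
+
+The `feedback.user_signal` value `retry` is tied to
+`GATEWAY_USER_SIGNAL_RETRY_WINDOW_SECONDS`, as the smoke above exercises with two
+identical messages a few seconds apart. A minimal sketch of that rule only
+(hypothetical helper; positive/negative classification is out of scope here):
+
+```python
+import time
+
+RETRY_WINDOW_S = 30  # GATEWAY_USER_SIGNAL_RETRY_WINDOW_SECONDS
+_last_text: dict[tuple[str, str], tuple[str, float]] = {}
+
+def classify_user_signal(chat_id: str, user_id: str, text: str) -> str:
+    """Label 'retry' when the same user repeats the same text inside the window."""
+    key = (chat_id, user_id)
+    now = time.monotonic()
+    prev = _last_text.get(key)
+    _last_text[key] = (text, now)
+    if prev and prev[0] == text and (now - prev[1]) <= RETRY_WINDOW_S:
+        return "retry"
+    return "none"
+```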
diff --git a/docs/ops/payloads/phase4_1_payload_source_lock.json b/docs/ops/payloads/phase4_1_payload_source_lock.json new file mode 100644 index 00000000..ba72bdde --- /dev/null +++ b/docs/ops/payloads/phase4_1_payload_source_lock.json @@ -0,0 +1,18 @@ +{ + "update_id": 900002, + "message": { + "message_id": 1, + "date": 1760007001, + "chat": { + "id": 0, + "type": "private" + }, + "from": { + "id": 0, + "is_bot": false, + "first_name": "Source", + "username": "lock_smoke" + }, + "text": "source-lock-smoke" + } +} diff --git a/docs/ops/payloads/phase4_1_payload_unsupported_no_message.json b/docs/ops/payloads/phase4_1_payload_unsupported_no_message.json new file mode 100644 index 00000000..61aaed43 --- /dev/null +++ b/docs/ops/payloads/phase4_1_payload_unsupported_no_message.json @@ -0,0 +1,13 @@ +{ + "update_id": 900001, + "inline_query": { + "id": "phase4-inline-1", + "query": "unsupported-smoke", + "offset": "", + "from": { + "id": 12345, + "is_bot": false, + "first_name": "Smoke" + } + } +} diff --git a/docs/ops/payloads/phase5_payload_group_source_lock.json b/docs/ops/payloads/phase5_payload_group_source_lock.json new file mode 100644 index 00000000..163c1f5b --- /dev/null +++ b/docs/ops/payloads/phase5_payload_group_source_lock.json @@ -0,0 +1,17 @@ +{ + "update_id": 910002, + "message": { + "message_id": 1002, + "date": 1760010002, + "chat": { + "id": -1005001002, + "type": "group" + }, + "from": { + "id": 551002, + "is_bot": false, + "username": "phase5_lock_user" + }, + "text": "agromatrix, перевір lock smoke" + } +} diff --git a/docs/ops/payloads/phase5_payload_group_unsupported_no_message.json b/docs/ops/payloads/phase5_payload_group_unsupported_no_message.json new file mode 100644 index 00000000..67ba0499 --- /dev/null +++ b/docs/ops/payloads/phase5_payload_group_unsupported_no_message.json @@ -0,0 +1,19 @@ +{ + "update_id": 910001, + "message": { + "message_id": 1001, + "date": 1760010001, + "chat": { + "id": -1005001001, + "type": "group" + }, + "from": { + "id": 551001, + "is_bot": false, + "username": "phase5_group_user" + }, + "sticker": { + "file_id": "dummy_sticker_file" + } + } +} diff --git a/docs/ops/payloads/phase5_payload_private_photo_unsupported.json b/docs/ops/payloads/phase5_payload_private_photo_unsupported.json new file mode 100644 index 00000000..4c1d3b5d --- /dev/null +++ b/docs/ops/payloads/phase5_payload_private_photo_unsupported.json @@ -0,0 +1,17 @@ +{ + "update_id": 910003, + "message": { + "message_id": 1003, + "date": 1760010003, + "chat": { + "id": 551003, + "type": "private" + }, + "from": { + "id": 551003, + "is_bot": false, + "username": "phase5_private_user" + }, + "text": "що на цьому фото?" + } +} diff --git a/docs/ops/phase4_1_gateway_early_return_coverage.md b/docs/ops/phase4_1_gateway_early_return_coverage.md new file mode 100644 index 00000000..567df57f --- /dev/null +++ b/docs/ops/phase4_1_gateway_early_return_coverage.md @@ -0,0 +1,134 @@ +# Phase-4.1 Gateway Early-Return Coverage + +## Goal +Enforce the gateway telemetry invariant: +- 1 webhook call -> 1 `source="gateway"` event row. +- `request_id` always present. +- Early-return branches are emitted with deterministic reasons. 
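+
+A sketch of the invariant as a try/finally finalizer (hypothetical shape; the
+real gateway handler and emitter differ):
+
+```python
+import uuid
+
+async def emit_gateway_event(request_id: str, reason: str) -> None:
+    # Stub: the real emitter publishes to NATS and appends to Postgres.
+    print(f"gateway event request_id={request_id} reason={reason}")
+
+async def handle_webhook(update: dict) -> dict:
+    request_id = str(uuid.uuid4())
+    reason = "ok"
+    try:
+        if "message" not in update:
+            reason = "unsupported_no_message"
+            return {"ok": True}
+        # ... normal processing; the source-lock branch sets its own reason ...
+        return {"ok": True}
+    finally:
+        # Exactly one emit per webhook cycle, early returns included.
+        await emit_gateway_event(request_id=request_id, reason=reason)
+```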
+ +## Deploy +```bash +cd /opt/microdao-daarion +docker compose -f docker-compose.node1.yml up -d --no-deps --build --force-recreate gateway +``` + +## Seed / Precheck +```bash +export GATEWAY_WEBHOOK_URL='http://127.0.0.1:9300/agromatrix/telegram/webhook' +export PG_CONTAINER='dagi-postgres' + +pre_rows_gateway=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*) FROM agent_experience_events WHERE source='gateway' AND ts > now()-interval '10 minutes';") + +pre_rows_join=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*) FROM ( + SELECT g.request_id + FROM agent_experience_events g + JOIN agent_experience_events r ON r.request_id=g.request_id + WHERE g.source='gateway' AND r.source='router' AND g.ts > now()-interval '10 minutes' + ) x;") + +pre_js=$(curl -sS http://127.0.0.1:8222/jsz?streams=true | python3 -c ' +import json,sys +j=json.load(sys.stdin) +d=(j.get("account_details") or [{}])[0].get("stream_detail") or [] +print(next((s.get("state",{}).get("messages",0) for s in d if s.get("name")=="EXPERIENCE"),0)) +') + +echo "pre_rows_gateway=$pre_rows_gateway" +echo "pre_rows_join=$pre_rows_join" +echo "pre_js=$pre_js" +``` + +## Fixed Payload Replay + +### 1) Unsupported (`unsupported_no_message`) +Payload file: +- `docs/ops/payloads/phase4_1_payload_unsupported_no_message.json` + +```bash +curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \ + -H 'content-type: application/json' \ + -d @docs/ops/payloads/phase4_1_payload_unsupported_no_message.json +``` + +### 2) Source-lock (`source_lock_duplicate_update`) +Payload file: +- `docs/ops/payloads/phase4_1_payload_source_lock.json` + +```bash +# first request +curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \ + -H 'content-type: application/json' \ + -d @docs/ops/payloads/phase4_1_payload_source_lock.json + +# duplicate replay (same update_id) +curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \ + -H 'content-type: application/json' \ + -d @docs/ops/payloads/phase4_1_payload_source_lock.json +``` + +## Assertions + +### A) Row delta strictness +```bash +post_rows_gateway=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*) FROM agent_experience_events WHERE source='gateway' AND ts > now()-interval '10 minutes';") + +delta_rows=$((post_rows_gateway-pre_rows_gateway)) +echo "delta_rows=$delta_rows" +# expected: delta_rows == 3 for this replay batch +``` + +### B) Deterministic reasons + unknown policy on unsupported +```bash +docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -P pager=off -c " +SELECT request_id, + raw->'policy'->>'sowa_decision' as sowa, + raw->'policy'->>'reason' as reason, + raw->'feedback'->>'user_signal' as user_signal, + raw->'result'->>'http_status' as http_status, + ts +FROM agent_experience_events +WHERE source='gateway' AND ts > now()-interval '10 minutes' +ORDER BY ts DESC LIMIT 20; +" + +# expected in this batch: +# - reason=unsupported_no_message (1 row), policy.sowa_decision=UNKNOWN +# - reason=source_lock_duplicate_update (1 row) +# - first source-lock request usually reason=prober_request (or normal path reason) +``` + +### C) Metrics assertions +```bash +curl -sS http://127.0.0.1:9300/metrics | grep -E \ +'gateway_experience_emitted_total|gateway_early_return_total|gateway_event_finalize_latency_ms|gateway_experience_published_total' + +# expected: +# - gateway_experience_emitted_total{path="early_return",status="ok"} increments +# - gateway_early_return_total{reason="unsupported_no_message"} increments +# - 
gateway_early_return_total{reason="source_lock_duplicate_update"} increments +``` + +### D) Join sanity (normal path unaffected) +```bash +post_rows_join=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*) FROM ( + SELECT g.request_id + FROM agent_experience_events g + JOIN agent_experience_events r ON r.request_id=g.request_id + WHERE g.source='gateway' AND r.source='router' AND g.ts > now()-interval '10 minutes' + ) x;") + +echo "post_rows_join=$post_rows_join" +# expected: join remains non-zero for normal webhook traffic +``` + +## PASS Criteria +- Strict `delta_rows == N_webhooks` for replay batch. +- Exactly one `source='gateway'` row per webhook call. +- `request_id` present on all new rows. +- Early-return reasons are deterministic (`unsupported_no_message`, `source_lock_duplicate_update`). +- Metrics counters for early-return/finalize are incrementing. +- Normal gateway<->router join remains healthy. diff --git a/docs/ops/phase5_anti_silent_group_ux.md b/docs/ops/phase5_anti_silent_group_ux.md new file mode 100644 index 00000000..73e2bc97 --- /dev/null +++ b/docs/ops/phase5_anti_silent_group_ux.md @@ -0,0 +1,136 @@ +# Phase-5 Anti-Silent / Group UX + +## Goal +Reduce user-facing silent outcomes in group/private chat flows while avoiding spam. + +## Invariants +- `I1`: no silent user-facing failure for `public_active` agents (`SILENT`/early-return -> short ACK). +- `I2`: one-message rule per webhook in group chats. +- `I3`: debounce by `(chat_id, agent_id, reason)` with cooldown. +- `I4`: evidence in gateway event (`anti_silent_action`, `anti_silent_template`, `chat_type`). + +## Deploy +```bash +cd /opt/microdao-daarion +docker compose -f docker-compose.node1.yml up -d --no-deps --build --force-recreate gateway +``` + +## Seed / Precheck +```bash +export GATEWAY_WEBHOOK_URL='http://127.0.0.1:9300/agromatrix/telegram/webhook' +export PG_CONTAINER='dagi-postgres' + +pre_rows_gateway=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*) FROM agent_experience_events WHERE source='gateway' AND ts > now()-interval '10 minutes';") + +pre_rows_join=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*) FROM ( + SELECT g.request_id + FROM agent_experience_events g + JOIN agent_experience_events r ON r.request_id=g.request_id + WHERE g.source='gateway' AND r.source='router' AND g.ts > now()-interval '10 minutes' + ) x;") + +pre_js=$(curl -sS http://127.0.0.1:8222/jsz?streams=true | python3 -c ' +import json,sys +j=json.load(sys.stdin) +d=(j.get("account_details") or [{}])[0].get("stream_detail") or [] +print(next((s.get("state",{}).get("messages",0) for s in d if s.get("name")=="EXPERIENCE"),0)) +') + +echo "pre_rows_gateway=$pre_rows_gateway" +echo "pre_rows_join=$pre_rows_join" +echo "pre_js=$pre_js" +``` + +## Fixed Payload Replay + +Payloads: +- `docs/ops/payloads/phase5_payload_group_unsupported_no_message.json` +- `docs/ops/payloads/phase5_payload_group_source_lock.json` +- `docs/ops/payloads/phase5_payload_private_photo_unsupported.json` + +```bash +# 1) group unsupported_no_message +curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \ + -H 'content-type: application/json' \ + -d @docs/ops/payloads/phase5_payload_group_unsupported_no_message.json + +# 2) group source_lock pair (same update_id) +curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \ + -H 'content-type: application/json' \ + -d @docs/ops/payloads/phase5_payload_group_source_lock.json + +curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \ + -H 
'content-type: application/json' \ + -d @docs/ops/payloads/phase5_payload_group_source_lock.json + +# 3) private photo unsupported (photo follow-up without image context) +curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \ + -H 'content-type: application/json' \ + -d @docs/ops/payloads/phase5_payload_private_photo_unsupported.json +``` + +## Assertions + +### A) Strict row delta +```bash +post_rows_gateway=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*) FROM agent_experience_events WHERE source='gateway' AND ts > now()-interval '10 minutes';") + +delta_rows=$((post_rows_gateway-pre_rows_gateway)) +echo "delta_rows=$delta_rows" +# expected: delta_rows == 4 +``` + +### B) Anti-silent evidence in DB +```bash +docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -P pager=off -c " +SELECT request_id, + raw->>'chat_type' as chat_type, + raw->'policy'->>'sowa_decision' as sowa, + raw->'policy'->>'reason' as reason, + raw->>'anti_silent_action' as anti_silent_action, + raw->>'anti_silent_template' as anti_silent_template, + raw->'result'->>'http_status' as http_status, + ts +FROM agent_experience_events +WHERE source='gateway' AND agent_id='agromatrix' AND ts > now()-interval '10 minutes' +ORDER BY ts DESC LIMIT 25; +" + +# expected: +# - reason=unsupported_no_message with sowa=UNKNOWN (group path) +# - reason=source_lock_duplicate_update with anti_silent_action in (ACK_EMITTED, ACK_SUPPRESSED_COOLDOWN) +# - reason=photo_followup_without_image_context with anti_silent_action=ACK_EMITTED +``` + +### C) Metrics +```bash +curl -sS http://127.0.0.1:9300/metrics | grep -E \ +'gateway_anti_silent_total|gateway_ack_sent_total|gateway_experience_emitted_total|gateway_early_return_total' + +# expected: +# - gateway_anti_silent_total{action="ACK_EMITTED",reason="...",chat_type="group|private"} increments +# - source-lock repeated request may produce ACK_SUPPRESSED_COOLDOWN depending on timing +``` + +### D) Join sanity (normal path still healthy) +```bash +post_rows_join=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \ + "SELECT count(*) FROM ( + SELECT g.request_id + FROM agent_experience_events g + JOIN agent_experience_events r ON r.request_id=g.request_id + WHERE g.source='gateway' AND r.source='router' AND g.ts > now()-interval '10 minutes' + ) x;") + +echo "post_rows_join=$post_rows_join" +# expected: non-zero with normal traffic +``` + +## PASS Criteria +- `delta_rows == 4` for fixed replay batch. +- Deterministic reason codes present (`unsupported_no_message`, `source_lock_duplicate_update`, `photo_followup_without_image_context`). +- `anti_silent_action` is present for anti-silent branches (`ACK_EMITTED` or `ACK_SUPPRESSED_COOLDOWN`). +- No evidence of double-event for one webhook request. 
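+
+## Debounce sketch
+
+Invariant `I3` debounces ACKs by `(chat_id, agent_id, reason)`. A minimal sketch
+of that decision (the cooldown value is an assumption, not the shipped default):
+
+```python
+import time
+
+ACK_COOLDOWN_S = 60  # assumed cooldown; tune per deployment
+_last_ack: dict[tuple[str, str, str], float] = {}
+
+def anti_silent_action(chat_id: str, agent_id: str, reason: str) -> str:
+    """One ACK per key per cooldown window; later hits are suppressed."""
+    key = (chat_id, agent_id, reason)
+    now = time.monotonic()
+    last = _last_ack.get(key)
+    if last is not None and (now - last) < ACK_COOLDOWN_S:
+        return "ACK_SUPPRESSED_COOLDOWN"
+    _last_ack[key] = now
+    return "ACK_EMITTED"
+```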
diff --git a/gateway-bot/router_client.py b/gateway-bot/router_client.py index b0a289ca..f618044a 100644 --- a/gateway-bot/router_client.py +++ b/gateway-bot/router_client.py @@ -75,6 +75,10 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]: system_prompt = body.get("system_prompt") or context.get("system_prompt") system_prompt = _apply_agent_style_guardrails(agent_id, system_prompt) system_prompt = _apply_runtime_communication_guardrails(system_prompt, metadata) + request_id = str(metadata.get("request_id") or metadata.get("trace_id") or "").strip() + if request_id: + metadata["request_id"] = request_id + metadata["trace_id"] = request_id if system_prompt: logger.info(f"Using system prompt ({len(system_prompt)} chars) for agent {agent_id}") @@ -124,7 +128,8 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]: try: async with httpx.AsyncClient(timeout=ROUTER_TIMEOUT) as client: - response = await client.post(infer_url, json=infer_body) + headers = {"X-Request-Id": request_id} if request_id else None + response = await client.post(infer_url, json=infer_body, headers=headers) response.raise_for_status() result = response.json() diff --git a/migrations/054_agent_experience_events.sql b/migrations/054_agent_experience_events.sql new file mode 100644 index 00000000..b9da1300 --- /dev/null +++ b/migrations/054_agent_experience_events.sql @@ -0,0 +1,38 @@ +-- Phase-1: Router Experience Bus event store (append-only) + +CREATE TABLE IF NOT EXISTS agent_experience_events ( + id BIGSERIAL PRIMARY KEY, + event_id UUID NOT NULL UNIQUE, + ts TIMESTAMPTZ NOT NULL, + node_id TEXT NOT NULL, + source TEXT NOT NULL, + agent_id TEXT NOT NULL, + task_type TEXT NOT NULL, + request_id TEXT NULL, + channel TEXT NOT NULL DEFAULT 'unknown', + inputs_hash TEXT NOT NULL, + provider TEXT NOT NULL, + model TEXT NOT NULL, + profile TEXT NULL, + latency_ms INT NOT NULL, + tokens_in INT NULL, + tokens_out INT NULL, + ok BOOLEAN NOT NULL, + error_class TEXT NULL, + error_msg_redacted TEXT NULL, + http_status INT NOT NULL, + raw JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_agent_experience_events_agent_ts + ON agent_experience_events (agent_id, ts DESC); + +CREATE INDEX IF NOT EXISTS idx_agent_experience_events_task_ts + ON agent_experience_events (task_type, ts DESC); + +CREATE INDEX IF NOT EXISTS idx_agent_experience_events_hash_ts + ON agent_experience_events (inputs_hash, ts DESC); + +CREATE INDEX IF NOT EXISTS idx_agent_experience_events_ok_ts + ON agent_experience_events (ok, ts DESC); diff --git a/migrations/055_agent_lessons.sql b/migrations/055_agent_lessons.sql new file mode 100644 index 00000000..8ea1ebaf --- /dev/null +++ b/migrations/055_agent_lessons.sql @@ -0,0 +1,27 @@ +-- Phase-2: Experience Learner lessons store (append-only) + +CREATE TABLE IF NOT EXISTS agent_lessons ( + id BIGSERIAL PRIMARY KEY, + lesson_id UUID NOT NULL UNIQUE, + lesson_key TEXT NOT NULL UNIQUE, + ts TIMESTAMPTZ NOT NULL, + scope TEXT NOT NULL, + agent_id TEXT NULL, + task_type TEXT NOT NULL, + trigger TEXT NOT NULL, + action TEXT NOT NULL, + avoid TEXT NOT NULL, + signals JSONB NOT NULL, + evidence JSONB NOT NULL, + raw JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_agent_lessons_scope_ts + ON agent_lessons (scope, ts DESC); + +CREATE INDEX IF NOT EXISTS idx_agent_lessons_agent_ts + ON agent_lessons (agent_id, ts DESC); + +CREATE INDEX IF NOT EXISTS idx_agent_lessons_task_ts + ON 
agent_lessons (task_type, ts DESC); diff --git a/services/experience-learner/Dockerfile b/services/experience-learner/Dockerfile new file mode 100644 index 00000000..e5bf3b57 --- /dev/null +++ b/services/experience-learner/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY main.py . + +EXPOSE 9109 + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "9109"] diff --git a/services/experience-learner/main.py b/services/experience-learner/main.py new file mode 100644 index 00000000..d9195338 --- /dev/null +++ b/services/experience-learner/main.py @@ -0,0 +1,839 @@ +from __future__ import annotations + +import asyncio +import contextlib +import hashlib +import json +import logging +import os +import random +import re +import time +import uuid +from collections import OrderedDict, deque +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, Deque, Dict, Optional, Tuple + +import asyncpg +import nats +from fastapi import FastAPI, Response +from nats.aio.msg import Msg +from nats.js.api import AckPolicy, ConsumerConfig, DeliverPolicy +from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest + + +logging.basicConfig( + level=os.getenv("LOG_LEVEL", "INFO").upper(), + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger("experience_learner") + + +LESSONS_EXTRACTED = Counter( + "lessons_extracted_total", + "Total lessons extracted from experience events", + ["status"], +) +LESSONS_INSERT = Counter( + "lessons_insert_total", + "Total lesson insert attempts", + ["status"], +) +JS_MESSAGES_ACKED = Counter( + "js_messages_acked_total", + "Total JetStream messages acked by learner", +) +JS_MESSAGES_REDELIVERED = Counter( + "js_messages_redelivered_total", + "Total redelivered JetStream messages observed by learner", +) +EVENTS_SELECTED = Counter( + "experience_learner_events_selected_total", + "Events selected for learner processing", + ["reason"], +) +EVENTS_DROPPED = Counter( + "experience_learner_events_dropped_total", + "Events dropped by learner filtering/dedup", + ["reason"], +) +LESSON_PUBLISH = Counter( + "experience_learner_lessons_published_total", + "Lesson publish attempts to JetStream", + ["status"], +) +ANTI_SILENT_TUNING_EVALUATED = Counter( + "experience_learner_anti_silent_tuning_evaluated_total", + "Anti-silent tuning lesson generation evaluations", + ["status"], +) +CONSUMER_RUNNING = Gauge( + "experience_learner_consumer_running", + "1 when learner consumer loop is running", +) + + +@dataclass +class EventSample: + ts_mono: float + ok: bool + latency_ms: int + + +class ExperienceLearner: + def __init__(self) -> None: + self.node_id = os.getenv("NODE_ID", "NODA1") + self.nats_url = os.getenv("NATS_URL", "nats://nats:4222") + self.stream_name = os.getenv("EXPERIENCE_STREAM_NAME", "EXPERIENCE") + self.subject = os.getenv("EXPERIENCE_SUBJECT", "agent.experience.v1.>") + self.lesson_subject = os.getenv("LESSON_SUBJECT", "agent.lesson.v1") + self.durable = os.getenv("EXPERIENCE_DURABLE", "experience-learner-v1") + self.deliver_policy = os.getenv("EXPERIENCE_DELIVER_POLICY", "all").lower() + self.ack_wait_s = float(os.getenv("EXPERIENCE_ACK_WAIT_SECONDS", "30")) + self.max_deliver = int(os.getenv("EXPERIENCE_MAX_DELIVER", "20")) + self.fetch_batch = int(os.getenv("EXPERIENCE_FETCH_BATCH", "64")) + self.fetch_timeout_s = 
float(os.getenv("EXPERIENCE_FETCH_TIMEOUT_SECONDS", "2")) + + self.window_s = int(os.getenv("EXPERIENCE_WINDOW_SECONDS", "1800")) + self.ok_sample_pct = float(os.getenv("EXPERIENCE_OK_SAMPLE_PCT", "10")) + self.latency_spike_ms = int(os.getenv("EXPERIENCE_LATENCY_SPIKE_MS", "5000")) + self.error_threshold = int(os.getenv("EXPERIENCE_ERROR_THRESHOLD", "3")) + self.silent_threshold = int(os.getenv("EXPERIENCE_SILENT_THRESHOLD", "5")) + self.latency_threshold = int(os.getenv("EXPERIENCE_LATENCY_THRESHOLD", "3")) + + self.event_dedup_ttl_s = int(os.getenv("EXPERIENCE_EVENT_DEDUP_TTL_SECONDS", "3600")) + self.event_dedup_max = int(os.getenv("EXPERIENCE_EVENT_DEDUP_MAX", "100000")) + self.publish_lessons = os.getenv("LESSON_PUBLISH_ENABLED", "true").lower() in {"1", "true", "yes"} + self.anti_silent_tuning_enabled = os.getenv("ANTI_SILENT_TUNING_ENABLED", "true").lower() in {"1", "true", "yes"} + self.anti_silent_window_days = max(1, int(os.getenv("ANTI_SILENT_TUNING_WINDOW_DAYS", "7"))) + self.anti_silent_min_evidence = max(1, int(os.getenv("ANTI_SILENT_TUNING_MIN_EVIDENCE", "20"))) + self.anti_silent_min_score = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_MIN_SCORE", "0.75")))) + self.anti_silent_weight_retry = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_WEIGHT_RETRY", "0.6")))) + self.anti_silent_weight_negative = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_WEIGHT_NEGATIVE", "0.3")))) + self.anti_silent_weight_suppressed = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_WEIGHT_SUPPRESSED", "0.1")))) + self.anti_silent_ttl_days = max(1, int(os.getenv("ANTI_SILENT_TUNING_TTL_DAYS", "7"))) + + self.db_dsn = ( + os.getenv("LEARNER_DATABASE_URL") + or os.getenv("EXPERIENCE_DATABASE_URL") + or os.getenv("DATABASE_URL") + ) + if not self.db_dsn: + raise RuntimeError("LEARNER_DATABASE_URL (or EXPERIENCE_DATABASE_URL/DATABASE_URL) is required") + + self._running = False + self._task: Optional[asyncio.Task[Any]] = None + self._nc = None + self._js = None + self._sub = None + self._pool: Optional[asyncpg.Pool] = None + + self._seen_events: "OrderedDict[str, float]" = OrderedDict() + self._buckets: Dict[str, Deque[EventSample]] = {} + self._lock = asyncio.Lock() + + async def start(self) -> None: + if self._running: + return + self._pool = await asyncpg.create_pool(self.db_dsn, min_size=1, max_size=4) + self._nc = await nats.connect(self.nats_url) + self._js = self._nc.jetstream() + await self._ensure_consumer() + self._sub = await self._js.pull_subscribe( + self.subject, + durable=self.durable, + stream=self.stream_name, + ) + self._running = True + CONSUMER_RUNNING.set(1) + self._task = asyncio.create_task(self._consume_loop(), name="experience-learner") + logger.info( + "experience-learner started stream=%s subject=%s durable=%s", + self.stream_name, + self.subject, + self.durable, + ) + + async def stop(self) -> None: + self._running = False + CONSUMER_RUNNING.set(0) + if self._task: + self._task.cancel() + with contextlib.suppress(Exception): + await self._task + self._task = None + if self._nc: + await self._nc.close() + self._nc = None + self._js = None + self._sub = None + if self._pool: + await self._pool.close() + self._pool = None + + async def _ensure_consumer(self) -> None: + if self._js is None: + return + deliver_policy = DeliverPolicy.ALL if self.deliver_policy == "all" else DeliverPolicy.NEW + cfg = ConsumerConfig( + durable_name=self.durable, + ack_policy=AckPolicy.EXPLICIT, + ack_wait=self.ack_wait_s, + max_deliver=self.max_deliver, + 
deliver_policy=deliver_policy, + filter_subject=self.subject, + ) + try: + await self._js.add_consumer(self.stream_name, config=cfg) + logger.info("consumer created durable=%s stream=%s", self.durable, self.stream_name) + except Exception as exc: + msg = str(exc).lower() + if "consumer name already in use" in msg or "consumer already exists" in msg: + logger.info("consumer exists durable=%s stream=%s", self.durable, self.stream_name) + else: + raise + + async def _consume_loop(self) -> None: + assert self._sub is not None + while self._running: + try: + msgs = await self._sub.fetch(self.fetch_batch, timeout=self.fetch_timeout_s) + except asyncio.TimeoutError: + continue + except Exception as exc: + logger.warning("fetch failed: %s", exc) + await asyncio.sleep(1.0) + continue + + for msg in msgs: + await self._handle_msg(msg) + + async def _handle_msg(self, msg: Msg) -> None: + try: + metadata = getattr(msg, "metadata", None) + if metadata is not None and getattr(metadata, "num_delivered", 1) > 1: + JS_MESSAGES_REDELIVERED.inc() + + event = json.loads(msg.data.decode("utf-8", errors="replace")) + if not isinstance(event, dict): + EVENTS_DROPPED.labels(reason="invalid_payload").inc() + await msg.ack() + JS_MESSAGES_ACKED.inc() + return + + event_id = str(event.get("event_id") or "").strip() + if event_id and await self._seen_event(event_id): + EVENTS_DROPPED.labels(reason="event_dedup").inc() + await msg.ack() + JS_MESSAGES_ACKED.inc() + return + + keep, reason = self._should_keep(event) + if not keep: + EVENTS_DROPPED.labels(reason=reason).inc() + await msg.ack() + JS_MESSAGES_ACKED.inc() + return + + EVENTS_SELECTED.labels(reason=reason).inc() + lessons = await self._extract_lessons(event) + if not lessons: + await msg.ack() + JS_MESSAGES_ACKED.inc() + return + + for lesson in lessons: + LESSONS_EXTRACTED.labels(status="ok").inc() + insert_status = await self._insert_lesson(lesson) + LESSONS_INSERT.labels(status=insert_status).inc() + if insert_status == "ok" and self.publish_lessons: + await self._publish_lesson(lesson) + + await msg.ack() + JS_MESSAGES_ACKED.inc() + except Exception as exc: + LESSONS_EXTRACTED.labels(status="err").inc() + logger.exception("message handling failed: %s", exc) + with contextlib.suppress(Exception): + await msg.nak() + + async def _seen_event(self, event_id: str) -> bool: + now = time.monotonic() + async with self._lock: + self._prune_seen(now) + seen_ts = self._seen_events.get(event_id) + if seen_ts is not None and (now - seen_ts) < self.event_dedup_ttl_s: + return True + self._seen_events[event_id] = now + self._seen_events.move_to_end(event_id, last=True) + while len(self._seen_events) > self.event_dedup_max: + self._seen_events.popitem(last=False) + return False + + def _prune_seen(self, now: float) -> None: + threshold = now - self.event_dedup_ttl_s + while self._seen_events: + _, ts = next(iter(self._seen_events.items())) + if ts >= threshold: + break + self._seen_events.popitem(last=False) + + def _should_keep(self, event: Dict[str, Any]) -> Tuple[bool, str]: + result = event.get("result") or {} + llm = event.get("llm") or {} + policy = event.get("policy") or {} + ok = bool(result.get("ok")) + status = _as_int(result.get("http_status"), 0) + latency_ms = _as_int(llm.get("latency_ms"), 0) + sowa_decision = str(policy.get("sowa_decision") or "").upper() + + if not ok: + return True, "error" + if self._is_anti_silent_gateway_event(event): + return True, "anti_silent_signal" + if sowa_decision == "SILENT": + return True, "silent" + if status >= 500: + 
return True, "http_5xx" + if latency_ms >= self.latency_spike_ms: + return True, "latency_spike" + if random.random() * 100.0 < self.ok_sample_pct: + return True, "ok_sample_in" + return False, "ok_sample_out" + + def _is_anti_silent_gateway_event(self, event: Dict[str, Any]) -> bool: + if not self.anti_silent_tuning_enabled: + return False + if str(event.get("source") or "").strip().lower() != "gateway": + return False + action = str(event.get("anti_silent_action") or "").strip().upper() + if action not in {"ACK_EMITTED", "ACK_SUPPRESSED_COOLDOWN"}: + return False + reason = str((event.get("policy") or {}).get("reason") or "").strip() + template_id = str(event.get("anti_silent_template") or "").strip() + return bool(reason and template_id) + + async def _extract_lessons(self, event: Dict[str, Any]) -> list[Dict[str, Any]]: + lessons: list[Dict[str, Any]] = [] + + tuning_lesson = await self._try_extract_anti_silent_tuning_lesson(event) + if tuning_lesson is not None: + lessons.append(tuning_lesson) + + operational_lesson = await self._try_extract_lesson(event) + if operational_lesson is not None: + lessons.append(operational_lesson) + + return lessons + + async def _try_extract_lesson(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]: + categories = self._lesson_categories(event) + if not categories: + return None + + for category in categories: + bucket_key = self._bucket_key(category, event) + count, ok_rate, p95_latency = self._update_bucket(bucket_key, event) + threshold = self._threshold_for(category) + if count < threshold: + continue + lesson = self._build_lesson( + category=category, + event=event, + count=count, + ok_rate=ok_rate, + p95_latency=p95_latency, + ) + return lesson + + return None + + async def _try_extract_anti_silent_tuning_lesson(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]: + if not self._is_anti_silent_gateway_event(event): + return None + if self._pool is None: + ANTI_SILENT_TUNING_EVALUATED.labels(status="pool_missing").inc() + return None + + policy = event.get("policy") or {} + reason = _safe_token(policy.get("reason")) + chat_type = _safe_token(event.get("chat_type")) + if not reason or not chat_type: + ANTI_SILENT_TUNING_EVALUATED.labels(status="missing_fields").inc() + return None + + stats = await self._anti_silent_stats(reason=reason, chat_type=chat_type) + if not stats: + ANTI_SILENT_TUNING_EVALUATED.labels(status="no_data").inc() + return None + + candidates = [item for item in stats if int(item.get("n") or 0) >= self.anti_silent_min_evidence] + if not candidates: + ANTI_SILENT_TUNING_EVALUATED.labels(status="insufficient_evidence").inc() + return None + + best = max(candidates, key=lambda item: (float(item.get("score", 0.0)), int(item.get("n", 0)))) + best_score = float(best.get("score", 0.0)) + if best_score < self.anti_silent_min_score: + ANTI_SILENT_TUNING_EVALUATED.labels(status="below_score").inc() + return None + + worst = min(candidates, key=lambda item: (float(item.get("score", 0.0)), -int(item.get("n", 0)))) + best_template = str(best.get("template_id") or "").strip().upper() + if not best_template: + ANTI_SILENT_TUNING_EVALUATED.labels(status="bad_template").inc() + return None + + trigger = f"reason={reason};chat_type={chat_type}" + action = f"prefer_template={best_template}" + avoid = "" + worst_template = str(worst.get("template_id") or "").strip().upper() + if worst_template and worst_template != best_template: + avoid = f"avoid_template={worst_template}" + if not avoid: + avoid = "avoid_template=none" + + lesson_type = 
"anti_silent_tuning" + lesson_key_raw = "|".join([lesson_type, trigger, action]) + lesson_key = hashlib.sha256(lesson_key_raw.encode("utf-8")).hexdigest() + now_dt = datetime.now(timezone.utc) + expires_at = (now_dt + timedelta(days=self.anti_silent_ttl_days)).isoformat().replace("+00:00", "Z") + evidence = { + "n_best": int(best.get("n") or 0), + "score_best": round(best_score, 6), + "retry_rate": round(float(best.get("retry_rate", 0.0)), 6), + "negative_rate": round(float(best.get("negative_rate", 0.0)), 6), + "suppressed_rate": round(float(best.get("suppressed_rate", 0.0)), 6), + "window_days": self.anti_silent_window_days, + "weights": { + "retry": self.anti_silent_weight_retry, + "negative": self.anti_silent_weight_negative, + "suppressed": self.anti_silent_weight_suppressed, + }, + "candidates": stats, + } + signals = { + "policy_reason": reason, + "chat_type": chat_type, + "lesson_type": lesson_type, + "trigger_kind": "anti_silent_ack_template", + } + lesson: Dict[str, Any] = { + "lesson_id": str(uuid.uuid4()), + "lesson_key": lesson_key, + "lesson_type": lesson_type, + "ts": now_dt.isoformat().replace("+00:00", "Z"), + "expires_at": expires_at, + "scope": "global", + "agent_id": None, + "task_type": "webhook", + "trigger": trigger, + "action": action, + "avoid": avoid, + "signals": signals, + "evidence": evidence, + } + lesson["raw"] = dict(lesson) + ANTI_SILENT_TUNING_EVALUATED.labels(status="ok").inc() + return lesson + + async def _anti_silent_stats(self, *, reason: str, chat_type: str) -> list[Dict[str, Any]]: + if self._pool is None: + return [] + query = """ + SELECT + COALESCE(raw->>'anti_silent_template', '') AS template_id, + COUNT(*)::int AS n, + AVG( + CASE + WHEN COALESCE(raw->'feedback'->>'user_signal', 'none') = 'retry' THEN 1.0 + ELSE 0.0 + END + )::float8 AS retry_rate, + AVG( + CASE + WHEN COALESCE(raw->'feedback'->>'user_signal', 'none') = 'negative' THEN 1.0 + ELSE 0.0 + END + )::float8 AS negative_rate, + AVG( + CASE + WHEN COALESCE(raw->>'anti_silent_action', '') = 'ACK_SUPPRESSED_COOLDOWN' THEN 1.0 + ELSE 0.0 + END + )::float8 AS suppressed_rate + FROM agent_experience_events + WHERE source = 'gateway' + AND ts >= (now() - ($1::int * interval '1 day')) + AND COALESCE(raw->'policy'->>'reason', '') = $2 + AND COALESCE(raw->>'chat_type', 'unknown') = $3 + AND COALESCE(raw->>'anti_silent_action', '') IN ('ACK_EMITTED', 'ACK_SUPPRESSED_COOLDOWN') + AND COALESCE(raw->>'anti_silent_template', '') <> '' + GROUP BY 1 + HAVING COUNT(*) >= $4 + """ + try: + async with self._pool.acquire() as conn: + rows = await conn.fetch( + query, + self.anti_silent_window_days, + reason, + chat_type, + self.anti_silent_min_evidence, + ) + except Exception as exc: + logger.warning("anti-silent stats query failed: %s", exc) + return [] + + results: list[Dict[str, Any]] = [] + for row in rows: + template_id = str(row.get("template_id") or "").strip().upper() + if not template_id: + continue + n = int(row.get("n") or 0) + retry_rate = float(row.get("retry_rate") or 0.0) + negative_rate = float(row.get("negative_rate") or 0.0) + suppressed_rate = float(row.get("suppressed_rate") or 0.0) + score = 1.0 - ( + self.anti_silent_weight_retry * retry_rate + + self.anti_silent_weight_negative * negative_rate + + self.anti_silent_weight_suppressed * suppressed_rate + ) + score = max(0.0, min(1.0, score)) + results.append( + { + "template_id": template_id, + "n": n, + "retry_rate": retry_rate, + "negative_rate": negative_rate, + "suppressed_rate": suppressed_rate, + "score": score, + } + ) + return 
results + + def _lesson_categories(self, event: Dict[str, Any]) -> list[str]: + result = event.get("result") or {} + llm = event.get("llm") or {} + policy = event.get("policy") or {} + categories: list[str] = [] + + if not bool(result.get("ok")): + categories.append("error_repeat") + if str(policy.get("sowa_decision") or "").upper() == "SILENT": + categories.append("silent_repeat") + if _as_int(llm.get("latency_ms"), 0) >= self.latency_spike_ms: + categories.append("latency_spike") + return categories + + def _bucket_key(self, category: str, event: Dict[str, Any]) -> str: + llm = event.get("llm") or {} + result = event.get("result") or {} + policy = event.get("policy") or {} + parts = [ + category, + str(event.get("agent_id") or ""), + str(event.get("task_type") or "infer"), + str(result.get("error_class") or ""), + str(policy.get("reason") or ""), + str(llm.get("provider") or ""), + str(llm.get("model") or ""), + str(llm.get("profile") or ""), + ] + return "|".join(parts) + + def _update_bucket(self, bucket_key: str, event: Dict[str, Any]) -> Tuple[int, Optional[float], Optional[int]]: + now = time.monotonic() + llm = event.get("llm") or {} + result = event.get("result") or {} + sample = EventSample( + ts_mono=now, + ok=bool(result.get("ok")), + latency_ms=_as_int(llm.get("latency_ms"), 0), + ) + bucket = self._buckets.get(bucket_key) + if bucket is None: + bucket = deque() + self._buckets[bucket_key] = bucket + bucket.append(sample) + + cutoff = now - self.window_s + while bucket and bucket[0].ts_mono < cutoff: + bucket.popleft() + + if not bucket: + return 0, None, None + + count = len(bucket) + ok_count = sum(1 for item in bucket if item.ok) + ok_rate = round(ok_count / count, 4) if count > 0 else None + latencies = sorted(item.latency_ms for item in bucket) + p95_latency = _p95(latencies) + return count, ok_rate, p95_latency + + def _threshold_for(self, category: str) -> int: + if category == "error_repeat": + return self.error_threshold + if category == "silent_repeat": + return self.silent_threshold + return self.latency_threshold + + def _build_lesson( + self, + category: str, + event: Dict[str, Any], + count: int, + ok_rate: Optional[float], + p95_latency: Optional[int], + ) -> Dict[str, Any]: + llm = event.get("llm") or {} + result = event.get("result") or {} + policy = event.get("policy") or {} + agent_id = str(event.get("agent_id") or "").strip() or None + task_type = str(event.get("task_type") or "infer") + error_class = _safe_token(result.get("error_class")) + policy_reason = _safe_token(policy.get("reason")) + sowa_decision = _safe_token(policy.get("sowa_decision")) + + if category == "silent_repeat": + trigger = "Frequent SILENT policy outcomes on active conversation flow." + action = "Use short ACK/CHALLENGE clarification before silencing response." + avoid = "Avoid immediate SILENT when user intent might target the agent." + elif category == "latency_spike": + trigger = "Repeated latency spikes above configured SLA threshold." + action = "Prefer faster model/profile and reduce expensive tool rounds." + avoid = "Avoid routing to slow provider/profile for same task pattern." + else: + trigger = f"Repeated inference failures of class '{error_class or 'unknown_error'}'." + action = "Switch to stable provider/profile and constrain optional tool calls." + avoid = "Avoid blind retries on the same failing route." 
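+
+        # The lesson_key below hashes the semantic identity of the lesson (scope,
+        # agent, trigger/action/avoid, error and policy classes) so replays and
+        # JetStream redeliveries dedup via ON CONFLICT (lesson_key) DO NOTHING.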
+ + scope = "agent" if agent_id else "global" + lesson_key_raw = "|".join( + [ + scope, + str(agent_id or ""), + trigger, + action, + avoid, + str(error_class or ""), + str(policy_reason or ""), + ] + ) + lesson_key = hashlib.sha256(lesson_key_raw.encode("utf-8")).hexdigest() + + lesson = { + "lesson_id": str(uuid.uuid4()), + "lesson_key": lesson_key, + "ts": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "scope": scope, + "agent_id": agent_id, + "task_type": task_type, + "trigger": trigger, + "action": action, + "avoid": avoid, + "signals": { + "policy_reason": policy_reason, + "policy_decision": sowa_decision, + "error_class": error_class, + "provider": _safe_token(llm.get("provider")), + "model": _safe_token(llm.get("model")), + "profile": _safe_token(llm.get("profile")), + }, + "evidence": { + "count": count, + "ok_rate": ok_rate, + "p95_latency_ms": p95_latency, + }, + } + lesson["raw"] = dict(lesson) + return lesson + + async def _insert_lesson(self, lesson: Dict[str, Any]) -> str: + if self._pool is None: + return "err" + query_insert = """ + INSERT INTO agent_lessons ( + lesson_id, + lesson_key, + ts, + scope, + agent_id, + task_type, + trigger, + action, + avoid, + signals, + evidence, + raw + ) VALUES ( + $1::uuid, + $2, + $3::timestamptz, + $4, + $5, + $6, + $7, + $8, + $9, + $10::jsonb, + $11::jsonb, + $12::jsonb + ) + ON CONFLICT (lesson_key) DO NOTHING + RETURNING id + """ + query_tuning_upsert = """ + INSERT INTO agent_lessons ( + lesson_id, + lesson_key, + ts, + scope, + agent_id, + task_type, + trigger, + action, + avoid, + signals, + evidence, + raw + ) VALUES ( + $1::uuid, + $2, + $3::timestamptz, + $4, + $5, + $6, + $7, + $8, + $9, + $10::jsonb, + $11::jsonb, + $12::jsonb + ) + ON CONFLICT (lesson_key) DO UPDATE SET + ts = EXCLUDED.ts, + scope = EXCLUDED.scope, + agent_id = EXCLUDED.agent_id, + task_type = EXCLUDED.task_type, + trigger = EXCLUDED.trigger, + action = EXCLUDED.action, + avoid = EXCLUDED.avoid, + signals = EXCLUDED.signals, + evidence = EXCLUDED.evidence, + raw = EXCLUDED.raw + RETURNING id + """ + + try: + lesson_id = uuid.UUID(str(lesson["lesson_id"])) + ts_value = _as_timestamptz(lesson["ts"]) + lesson_type = str(lesson.get("lesson_type") or "").strip().lower() + query = query_tuning_upsert if lesson_type == "anti_silent_tuning" else query_insert + async with self._pool.acquire() as conn: + row_id = await conn.fetchval( + query, + lesson_id, + lesson["lesson_key"], + ts_value, + lesson["scope"], + lesson.get("agent_id"), + lesson["task_type"], + lesson["trigger"], + lesson["action"], + lesson["avoid"], + json.dumps(lesson["signals"], ensure_ascii=False), + json.dumps(lesson["evidence"], ensure_ascii=False), + json.dumps(lesson, ensure_ascii=False), + ) + if row_id is None: + return "conflict" + return "ok" + except Exception as exc: + logger.warning("insert lesson failed: %s", exc) + return "err" + + async def _publish_lesson(self, lesson: Dict[str, Any]) -> None: + if self._js is None: + LESSON_PUBLISH.labels(status="skipped").inc() + return + payload = json.dumps(lesson, ensure_ascii=False).encode("utf-8") + headers = {"Nats-Msg-Id": str(lesson["lesson_id"])} + try: + await self._js.publish(self.lesson_subject, payload, headers=headers) + LESSON_PUBLISH.labels(status="ok").inc() + except Exception as exc: + LESSON_PUBLISH.labels(status="err").inc() + logger.warning("publish lesson failed: %s", exc) + + async def health(self) -> Dict[str, Any]: + return { + "status": "ok" if self._running else "starting", + "node_id": self.node_id, + 
"stream": self.stream_name, + "subject": self.subject, + "durable": self.durable, + "nats_connected": self._nc is not None and self._nc.is_connected, + "db_connected": self._pool is not None, + "running": self._running, + } + + +def _safe_token(value: Any) -> Optional[str]: + if value is None: + return None + text = str(value) + text = re.sub(r"(?i)bearer\s+[A-Za-z0-9._-]+", "bearer [redacted]", text) + text = re.sub(r"(?i)(api[_-]?key|token|password|secret)\s*[:=]\s*[^\s,;]+", r"\1=[redacted]", text) + text = re.sub(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", "[redacted-email]", text) + text = re.sub(r"https?://[^\s]+", "[redacted-url]", text) + text = re.sub(r"\s+", " ", text).strip() + return text[:180] if text else None + + +def _as_int(value: Any, default: int) -> int: + try: + return int(value) + except Exception: + return default + + +def _as_timestamptz(value: Any) -> datetime: + if isinstance(value, datetime): + return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc) + try: + parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00")) + return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc) + except Exception: + return datetime.now(timezone.utc) + + +def _p95(sorted_values: list[int]) -> Optional[int]: + if not sorted_values: + return None + idx = int(round(0.95 * (len(sorted_values) - 1))) + return sorted_values[min(max(idx, 0), len(sorted_values) - 1)] + +app = FastAPI(title="Experience Learner") +learner = ExperienceLearner() + + +@app.on_event("startup") +async def startup() -> None: + await learner.start() + + +@app.on_event("shutdown") +async def shutdown() -> None: + await learner.stop() + + +@app.get("/health") +async def health() -> Dict[str, Any]: + return await learner.health() + + +@app.get("/metrics") +async def metrics() -> Response: + return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) diff --git a/services/experience-learner/requirements.txt b/services/experience-learner/requirements.txt new file mode 100644 index 00000000..517e921e --- /dev/null +++ b/services/experience-learner/requirements.txt @@ -0,0 +1,5 @@ +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +nats-py==2.6.0 +asyncpg>=0.29.0 +prometheus-client>=0.20.0 diff --git a/services/memory-service/app/database.py b/services/memory-service/app/database.py index 65551815..4244b443 100644 --- a/services/memory-service/app/database.py +++ b/services/memory-service/app/database.py @@ -443,6 +443,9 @@ class Database: ) -> Dict[str, Any]: """Create or update a user fact (isolated by agent_id)""" import json + # Normalize NULL to empty string so ON CONFLICT matches (PostgreSQL: NULL != NULL in unique) + _agent_id = (agent_id or "").strip() + _team_id = team_id or "" # Convert dict to JSON string for asyncpg JSONB json_value = json.dumps(fact_value_json) if fact_value_json else None @@ -457,7 +460,7 @@ class Database: fact_value_json = EXCLUDED.fact_value_json, updated_at = NOW() RETURNING * - """, user_id, team_id, agent_id, fact_key, fact_value, json_value) + """, user_id, _team_id, _agent_id, fact_key, fact_value, json_value) except asyncpg.exceptions.InvalidColumnReferenceError: # Backward compatibility for DBs that only have UNIQUE(user_id, team_id, fact_key). 
row = await conn.fetchrow(""" @@ -470,7 +473,7 @@ class Database: fact_value_json = EXCLUDED.fact_value_json, updated_at = NOW() RETURNING * - """, user_id, team_id, agent_id, fact_key, fact_value, json_value) + """, user_id, _team_id, _agent_id, fact_key, fact_value, json_value) return dict(row) if row else {} diff --git a/services/router/agent_metrics.py b/services/router/agent_metrics.py index 29b38dda..53562281 100644 --- a/services/router/agent_metrics.py +++ b/services/router/agent_metrics.py @@ -209,6 +209,57 @@ if PROMETHEUS_AVAILABLE: registry=REGISTRY ) + # ==================== EXPERIENCE BUS METRICS ==================== + + EXPERIENCE_PUBLISHED = Counter( + 'experience_published_total', + 'Total experience events publish attempts', + ['source', 'transport', 'status'], # transport: jetstream|core|none + registry=REGISTRY + ) + + EXPERIENCE_DB_INSERT = Counter( + 'experience_db_insert_total', + 'Total experience event DB insert attempts', + ['source', 'status'], # status: ok|error|skipped + registry=REGISTRY + ) + + EXPERIENCE_DEDUP_DROPPED = Counter( + 'experience_dedup_dropped_total', + 'Total experience events dropped by dedup', + ['source'], + registry=REGISTRY + ) + + EXPERIENCE_SAMPLED = Counter( + 'experience_sampled_total', + 'Total experience events sampled in/out', + ['source', 'decision', 'reason'], # decision: in|out + registry=REGISTRY + ) + + LESSONS_RETRIEVED = Counter( + 'lessons_retrieved_total', + 'Total lessons retrieval attempts', + ['status'], # status: ok|timeout|err + registry=REGISTRY + ) + + LESSONS_ATTACHED = Counter( + 'lessons_attached_total', + 'Total lessons attached buckets', + ['count'], # count: 0|1-3|4-7 + registry=REGISTRY + ) + + LESSONS_ATTACH_LATENCY = Histogram( + 'lessons_attach_latency_ms', + 'Lessons retrieval latency in milliseconds', + buckets=(1, 2, 5, 10, 25, 50, 100, 250, 500, 1000, 2500), + registry=REGISTRY + ) + # ==================== METRIC HELPERS ==================== @@ -357,6 +408,61 @@ def track_agent_request(agent_id: str, operation: str): return decorator +def inc_experience_published(source: str, transport: str, status: str) -> None: + if not PROMETHEUS_AVAILABLE: + return + EXPERIENCE_PUBLISHED.labels(source=source, transport=transport, status=status).inc() + + +def inc_experience_db_insert(source: str, status: str) -> None: + if not PROMETHEUS_AVAILABLE: + return + EXPERIENCE_DB_INSERT.labels(source=source, status=status).inc() + + +def inc_experience_dedup_dropped(source: str) -> None: + if not PROMETHEUS_AVAILABLE: + return + EXPERIENCE_DEDUP_DROPPED.labels(source=source).inc() + + +def inc_experience_sampled(source: str, decision: str, reason: str) -> None: + if not PROMETHEUS_AVAILABLE: + return + EXPERIENCE_SAMPLED.labels(source=source, decision=decision, reason=reason).inc() + + +def inc_lessons_retrieved(status: str) -> None: + if not PROMETHEUS_AVAILABLE: + return + LESSONS_RETRIEVED.labels(status=status).inc() + + +def inc_lessons_attached(count: int) -> None: + if not PROMETHEUS_AVAILABLE: + return + try: + n = int(count) + except Exception: + n = 0 + if n <= 0: + bucket = "0" + elif n <= 3: + bucket = "1-3" + else: + bucket = "4-7" + LESSONS_ATTACHED.labels(count=bucket).inc() + + +def observe_lessons_attach_latency(latency_ms: float) -> None: + if not PROMETHEUS_AVAILABLE: + return + try: + LESSONS_ATTACH_LATENCY.observe(float(latency_ms)) + except Exception: + return + + # ==================== GPU METRICS COLLECTOR ==================== async def collect_gpu_metrics(node_id: str = "node1"): diff --git 
a/services/router/experience_bus.py b/services/router/experience_bus.py
new file mode 100644
index 00000000..87bd61ee
--- /dev/null
+++ b/services/router/experience_bus.py
@@ -0,0 +1,446 @@
+"""Router experience event bus (Phase-1).
+
+Collects inference outcome events, applies sampling and dedup, then
+persists them to JetStream and Postgres from an async background worker.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from datetime import datetime, timezone
+import json
+import logging
+import os
+import random
+import re
+import time
+import uuid
+from collections import OrderedDict
+from dataclasses import dataclass
+from typing import Any, Dict, Optional
+
+try:
+    import asyncpg
+except ImportError:  # pragma: no cover - runtime dependency in container
+    asyncpg = None
+
+try:
+    from agent_metrics import (
+        inc_experience_db_insert,
+        inc_experience_dedup_dropped,
+        inc_experience_published,
+        inc_experience_sampled,
+    )
+except Exception:  # pragma: no cover - keep router resilient
+    def inc_experience_published(*args: Any, **kwargs: Any) -> None:  # type: ignore[override]
+        return None
+
+    def inc_experience_db_insert(*args: Any, **kwargs: Any) -> None:  # type: ignore[override]
+        return None
+
+    def inc_experience_dedup_dropped(*args: Any, **kwargs: Any) -> None:  # type: ignore[override]
+        return None
+
+    def inc_experience_sampled(*args: Any, **kwargs: Any) -> None:  # type: ignore[override]
+        return None
+
+
+logger = logging.getLogger("experience_bus")
+
+
+@dataclass
+class ExperienceDecision:
+    keep: bool
+    reason: str
+
+
+class ExperienceBus:
+    def __init__(self) -> None:
+        self.enabled = os.getenv("EXPERIENCE_BUS_ENABLED", "true").lower() in {"1", "true", "yes"}
+        self.node_id = os.getenv("NODE_ID", "NODA1")
+
+        self.ok_sample_pct = float(os.getenv("EXPERIENCE_OK_SAMPLE_PCT", "10"))
+        self.latency_spike_ms = int(os.getenv("EXPERIENCE_LATENCY_SPIKE_MS", "5000"))
+        self.dedup_window_s = int(os.getenv("EXPERIENCE_DEDUP_WINDOW_SECONDS", "900"))
+        self.dedup_max_keys = int(os.getenv("EXPERIENCE_DEDUP_MAX_KEYS", "20000"))
+
+        self.queue_max = int(os.getenv("EXPERIENCE_QUEUE_MAX", "2000"))
+        self.publish_timeout_s = float(os.getenv("EXPERIENCE_PUBLISH_TIMEOUT_MS", "800") or 800) / 1000.0
+        self.db_timeout_s = float(os.getenv("EXPERIENCE_DB_TIMEOUT_MS", "1200") or 1200) / 1000.0
+
+        self.subject_prefix = os.getenv("EXPERIENCE_SUBJECT_PREFIX", "agent.experience.v1")
+        self.stream_name = os.getenv("EXPERIENCE_STREAM_NAME", "EXPERIENCE")
+        self.enable_nats = os.getenv("EXPERIENCE_ENABLE_NATS", "true").lower() in {"1", "true", "yes"}
+        self.enable_db = os.getenv("EXPERIENCE_ENABLE_DB", "true").lower() in {"1", "true", "yes"}
+
+        self.db_dsn = os.getenv("EXPERIENCE_DATABASE_URL") or os.getenv("DATABASE_URL")
+
+        self._queue: asyncio.Queue[Optional[Dict[str, Any]]] = asyncio.Queue(maxsize=self.queue_max)
+        self._worker_task: Optional[asyncio.Task[Any]] = None
+        self._running = False
+
+        self._dedup_lock = asyncio.Lock()
+        self._dedup: "OrderedDict[str, float]" = OrderedDict()
+
+        self._pool: Optional[Any] = None
+        self._nc: Any = None
+        self._js: Any = None
+
+    async def start(self, nats_client: Any = None) -> None:
+        if not self.enabled:
+            logger.info("ExperienceBus disabled by env")
+            return
+        if self._running:
+            return
+
+        if self.enable_db:
+            await self._init_db()
+
+        if self.enable_nats and nats_client is not None:
+            await self.set_nats_client(nats_client)
+
+        self._running = True
+        self._worker_task = asyncio.create_task(self._worker(), name="experience-bus-worker")
+
logger.info( + "ExperienceBus started (db=%s nats=%s queue_max=%s sample_ok=%s%% dedup_window=%ss)", + bool(self._pool), + bool(self._js or self._nc), + self.queue_max, + self.ok_sample_pct, + self.dedup_window_s, + ) + + async def stop(self) -> None: + if not self._running: + return + self._running = False + + try: + self._queue.put_nowait(None) + except asyncio.QueueFull: + pass + + if self._worker_task is not None: + try: + await asyncio.wait_for(self._worker_task, timeout=5.0) + except Exception: + self._worker_task.cancel() + self._worker_task = None + + if self._pool is not None: + try: + await self._pool.close() + except Exception as e: # pragma: no cover + logger.debug("ExperienceBus pool close error: %s", e) + self._pool = None + + self._js = None + self._nc = None + logger.info("ExperienceBus stopped") + + async def set_nats_client(self, nats_client: Any) -> None: + if not self.enabled or not self.enable_nats: + return + self._nc = nats_client + if self._nc is None: + self._js = None + return + try: + self._js = self._nc.jetstream() + await self._ensure_stream() + except Exception as e: + self._js = None + logger.warning("ExperienceBus JetStream unavailable: %s", e) + + async def capture(self, event: Dict[str, Any]) -> None: + """Apply sampling/dedup and enqueue for async persistence.""" + if not self.enabled or not self._running: + return + + decision = await self._decide(event) + if not decision.keep: + if decision.reason == "dedup": + inc_experience_dedup_dropped(source="router") + inc_experience_sampled(source="router", decision="out", reason=decision.reason) + return + + inc_experience_sampled(source="router", decision="in", reason=decision.reason) + + try: + self._queue.put_nowait(event) + except asyncio.QueueFull: + inc_experience_sampled(source="router", decision="out", reason="queue_full") + logger.warning("ExperienceBus queue full; dropping event") + + async def _init_db(self) -> None: + if asyncpg is None: + logger.warning("ExperienceBus DB disabled: asyncpg not installed") + return + if not self.db_dsn: + logger.warning("ExperienceBus DB disabled: DATABASE_URL missing") + return + try: + self._pool = await asyncpg.create_pool(self.db_dsn, min_size=1, max_size=3) + except Exception as e: + self._pool = None + logger.warning("ExperienceBus DB pool init failed: %s", e) + + async def _ensure_stream(self) -> None: + if self._js is None: + return + subjects = [f"{self.subject_prefix}.>"] + try: + info = await self._js.stream_info(self.stream_name) + stream_subjects = set(getattr(info.config, "subjects", []) or []) + if not stream_subjects.intersection(subjects): + logger.warning( + "ExperienceBus stream '%s' exists without subject %s; keeping as-is", + self.stream_name, + subjects[0], + ) + return + except Exception: + pass + + try: + await self._js.add_stream(name=self.stream_name, subjects=subjects) + logger.info("ExperienceBus stream ensured: %s subjects=%s", self.stream_name, subjects) + except Exception as e: + logger.warning("ExperienceBus stream ensure failed: %s", e) + + async def _decide(self, event: Dict[str, Any]) -> ExperienceDecision: + result = event.get("result") or {} + llm = event.get("llm") or {} + ok = bool(result.get("ok")) + latency_ms = int(llm.get("latency_ms") or 0) + + if not ok: + sample_reason = "error" + elif latency_ms >= self.latency_spike_ms: + sample_reason = "latency_spike" + else: + if random.random() * 100.0 >= self.ok_sample_pct: + return ExperienceDecision(False, "ok_sample_out") + sample_reason = "ok_sample_in" + + dedup_key = 
self._dedup_key(event) + now = time.monotonic() + + async with self._dedup_lock: + self._prune_dedup(now) + seen_at = self._dedup.get(dedup_key) + if seen_at is not None and (now - seen_at) < self.dedup_window_s: + return ExperienceDecision(False, "dedup") + self._dedup[dedup_key] = now + self._dedup.move_to_end(dedup_key, last=True) + while len(self._dedup) > self.dedup_max_keys: + self._dedup.popitem(last=False) + + return ExperienceDecision(True, sample_reason) + + def _prune_dedup(self, now: float) -> None: + if not self._dedup: + return + threshold = now - self.dedup_window_s + while self._dedup: + _, ts = next(iter(self._dedup.items())) + if ts >= threshold: + break + self._dedup.popitem(last=False) + + def _dedup_key(self, event: Dict[str, Any]) -> str: + result = event.get("result") or {} + return "|".join( + [ + str(event.get("agent_id") or ""), + str(event.get("task_type") or ""), + str(event.get("inputs_hash") or ""), + "1" if bool(result.get("ok")) else "0", + str(result.get("error_class") or ""), + ] + ) + + async def _worker(self) -> None: + while True: + event = await self._queue.get() + if event is None: + self._queue.task_done() + break + try: + await self._persist_event(event) + except Exception as e: # pragma: no cover + logger.warning("ExperienceBus persist error: %s", e) + finally: + self._queue.task_done() + + async def _persist_event(self, event: Dict[str, Any]) -> None: + await self._publish_nats(event) + await self._insert_db(event) + + async def _publish_nats(self, event: Dict[str, Any]) -> None: + subject = f"{self.subject_prefix}.{event.get('agent_id', 'unknown')}" + payload = json.dumps(event, ensure_ascii=False).encode("utf-8") + msg_id = str(event.get("event_id") or "").strip() + headers = {"Nats-Msg-Id": msg_id} if msg_id else None + + if self._js is not None: + try: + await asyncio.wait_for( + self._js.publish(subject, payload, headers=headers), + timeout=self.publish_timeout_s, + ) + inc_experience_published(source="router", transport="jetstream", status="ok") + return + except Exception as e: + inc_experience_published(source="router", transport="jetstream", status="error") + logger.debug("ExperienceBus JetStream publish failed: %s", e) + + if self._nc is not None: + try: + await asyncio.wait_for( + self._nc.publish(subject, payload, headers=headers), + timeout=self.publish_timeout_s, + ) + await asyncio.wait_for(self._nc.flush(), timeout=self.publish_timeout_s) + inc_experience_published(source="router", transport="core", status="ok") + return + except Exception as e: + inc_experience_published(source="router", transport="core", status="error") + logger.debug("ExperienceBus core NATS publish failed: %s", e) + + inc_experience_published(source="router", transport="none", status="skipped") + + async def _insert_db(self, event: Dict[str, Any]) -> None: + if self._pool is None: + inc_experience_db_insert(source="router", status="skipped") + return + + payload_json = json.dumps(event, ensure_ascii=False) + llm = event.get("llm") or {} + result = event.get("result") or {} + event_uuid = _as_uuid(event.get("event_id")) + event_ts = _as_timestamptz(event.get("ts")) + + query = """ + INSERT INTO agent_experience_events ( + event_id, + ts, + node_id, + source, + agent_id, + task_type, + request_id, + channel, + inputs_hash, + provider, + model, + profile, + latency_ms, + tokens_in, + tokens_out, + ok, + error_class, + error_msg_redacted, + http_status, + raw + ) VALUES ( + $1::uuid, + $2::timestamptz, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12, 
+ $13, + $14, + $15, + $16, + $17, + $18, + $19, + $20::jsonb + ) + ON CONFLICT (event_id) DO NOTHING + """ + + try: + async with self._pool.acquire() as conn: + await asyncio.wait_for( + conn.execute( + query, + event_uuid, + event_ts, + event.get("node_id"), + event.get("source"), + event.get("agent_id"), + event.get("task_type"), + event.get("request_id"), + event.get("channel", "unknown"), + event.get("inputs_hash"), + llm.get("provider", "unknown"), + llm.get("model", "unknown"), + llm.get("profile"), + int(llm.get("latency_ms") or 0), + _as_int_or_none(llm.get("tokens_in")), + _as_int_or_none(llm.get("tokens_out")), + bool(result.get("ok")), + result.get("error_class"), + result.get("error_msg_redacted"), + int(result.get("http_status") or 0), + payload_json, + ), + timeout=self.db_timeout_s, + ) + inc_experience_db_insert(source="router", status="ok") + except Exception as e: + inc_experience_db_insert(source="router", status="error") + logger.debug("ExperienceBus DB insert failed: %s", e) + + +def redact_error_message(value: Optional[str]) -> Optional[str]: + if value is None: + return None + text = str(value) + text = re.sub(r"(?i)(authorization\s*:\s*bearer)\s+[A-Za-z0-9._-]+", r"\1 [redacted]", text) + text = re.sub(r"(?i)(api[_-]?key|token|password|secret)\s*[:=]\s*[^\s,;]+", r"\1=[redacted]", text) + text = re.sub(r"\b[A-Za-z0-9_\-]{24,}\b", "[redacted]", text) + text = re.sub(r"\s+", " ", text).strip() + if len(text) > 300: + return text[:300] + return text + + +def normalize_input_for_hash(text: str) -> str: + value = re.sub(r"\s+", " ", (text or "").strip()).lower() + return value[:4000] + + +def _as_int_or_none(value: Any) -> Optional[int]: + try: + if value is None: + return None + return int(value) + except Exception: + return None + + +def _as_uuid(value: Any) -> uuid.UUID: + try: + return uuid.UUID(str(value)) + except Exception: + return uuid.uuid4() + + +def _as_timestamptz(value: Any) -> datetime: + if isinstance(value, datetime): + return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc) + try: + parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00")) + return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc) + except Exception: + return datetime.now(timezone.utc) diff --git a/services/router/main.py b/services/router/main.py index ff833086..91f3d190 100644 --- a/services/router/main.py +++ b/services/router/main.py @@ -1,10 +1,12 @@ from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse, Response from pydantic import BaseModel, ConfigDict -from typing import Literal, Optional, Dict, Any, List +from typing import Literal, Optional, Dict, Any, List, Tuple import asyncio +from collections import OrderedDict import json import os +import random as random_module import re import yaml import httpx @@ -12,6 +14,8 @@ import logging import hashlib import hmac import time # For latency metrics +import uuid +from datetime import datetime, timezone, timedelta from difflib import SequenceMatcher # CrewAI Integration @@ -62,6 +66,34 @@ except ImportError: global_capabilities_client = None # type: ignore[assignment] offload_client = None # type: ignore[assignment] +try: + from experience_bus import ExperienceBus, normalize_input_for_hash, redact_error_message + EXPERIENCE_BUS_AVAILABLE = True +except ImportError: + EXPERIENCE_BUS_AVAILABLE = False + ExperienceBus = None # type: ignore[assignment] + +try: + import asyncpg +except ImportError: + asyncpg = None # type: 
ignore[assignment] + +try: + from agent_metrics import ( + inc_lessons_retrieved, + inc_lessons_attached, + observe_lessons_attach_latency, + ) +except Exception: + def inc_lessons_retrieved(*args: Any, **kwargs: Any) -> None: + return None + + def inc_lessons_attached(*args: Any, **kwargs: Any) -> None: + return None + + def observe_lessons_attach_latency(*args: Any, **kwargs: Any) -> None: + return None + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) NEO4J_NOTIFICATIONS_LOG_LEVEL = os.getenv("NEO4J_NOTIFICATIONS_LOG_LEVEL", "ERROR").strip().upper() @@ -71,6 +103,29 @@ logging.getLogger("neo4j.notifications").setLevel(_neo4j_notifications_level) # If auto-router module is unavailable (or loaded later), inference must still work. SOFIIA_AUTO_ROUTER_AVAILABLE = False + +def _parse_agent_id_set(raw_value: Optional[str], default_csv: str = "") -> set[str]: + source = raw_value if (raw_value is not None and str(raw_value).strip() != "") else default_csv + out: set[str] = set() + for part in str(source or "").split(","): + token = part.strip().lower() + if token: + out.add(token) + return out + + +PLANNED_AGENT_IDS = _parse_agent_id_set(os.getenv("PLANNED_AGENT_IDS"), "aistalk") +DISABLED_AGENT_IDS = _parse_agent_id_set(os.getenv("DISABLED_AGENT_IDS"), "devtools") + + +def _inactive_agent_state(agent_id: str) -> Optional[str]: + aid = str(agent_id or "").strip().lower() + if aid in PLANNED_AGENT_IDS: + return "planned" + if aid in DISABLED_AGENT_IDS: + return "disabled" + return None + TRUSTED_DOMAINS_CONFIG_PATH = os.getenv("TRUSTED_DOMAINS_CONFIG_PATH", "./trusted_domains.yml") _trusted_domains_cache: Dict[str, Any] = {"mtime": None, "data": {}} @@ -894,6 +949,287 @@ def _select_default_llm(agent_id: str, metadata: Dict[str, Any], base_llm: str, return use_llm return base_llm + +def _safe_json_from_bytes(payload: bytes) -> Dict[str, Any]: + if not payload: + return {} + try: + decoded = payload.decode("utf-8", errors="ignore").strip() + if not decoded: + return {} + value = json.loads(decoded) + if isinstance(value, dict): + return value + except Exception: + return {} + return {} + + +def _extract_infer_agent_id(path: str) -> Optional[str]: + match = _INFER_PATH_RE.match(path or "") + if not match: + return None + return (match.group(1) or "").strip().lower() or None + + +def _infer_channel_from_metadata(metadata: Dict[str, Any]) -> str: + channel = str( + metadata.get("channel") + or metadata.get("channel_type") + or metadata.get("source") + or metadata.get("entrypoint") + or "unknown" + ).strip().lower() + if channel in {"telegram", "web", "api"}: + return channel + return "unknown" + + +def _derive_provider_from_backend_model(backend: str, model: str, profile: Optional[str]) -> str: + profiles = (router_config or {}).get("llm_profiles", {}) if isinstance(router_config, dict) else {} + if profile and isinstance(profiles, dict): + p = profiles.get(profile, {}) + if isinstance(p, dict) and p.get("provider"): + return str(p.get("provider")) + + b = str(backend or "").lower() + m = str(model or "").lower() + if "mistral" in b: + return "mistral" + if "deepseek" in b: + return "deepseek" + if "grok" in b: + return "grok" + if "anthropic" in b or "claude" in b: + return "anthropic" + if "openai" in b: + return "openai" + if "glm" in b: + return "glm" + if "nats-offload" in b: + return "remote" + if "ollama" in b or "local" in b: + return "local" + if any(m.startswith(prefix) for prefix in ("qwen", "gemma", "mistral", "deepseek", "glm")): + return "local" + return 
"other" + + +def _resolve_profile_for_event(agent_id: str, req_payload: Dict[str, Any]) -> Optional[str]: + if not isinstance(router_config, dict): + return None + metadata = req_payload.get("metadata") + if not isinstance(metadata, dict): + metadata = {} + agent_cfg = (router_config.get("agents") or {}).get(agent_id, {}) + if not isinstance(agent_cfg, dict): + return None + base_llm = str(agent_cfg.get("default_llm") or "").strip() + if not base_llm: + return None + rules = router_config.get("routing") or [] + if isinstance(rules, list): + return _select_default_llm(agent_id, metadata, base_llm, rules) + return base_llm + + +def _lesson_guarded_text(value: Any, max_len: int = 220) -> str: + text = re.sub(r"\s+", " ", str(value or "")).strip() + if not text: + return "" + lower = text.lower() + if any(marker in lower for marker in LESSONS_INJECTION_GUARDS): + return "" + if len(text) > max_len: + text = text[:max_len].rstrip() + return text + + +def _decode_lesson_signals(raw: Any) -> Dict[str, Any]: + if isinstance(raw, dict): + return dict(raw) + if isinstance(raw, str): + try: + parsed = json.loads(raw) + if isinstance(parsed, dict): + return parsed + except Exception: + return {} + return {} + + +def _score_lesson_record( + row: Dict[str, Any], + *, + agent_id: str, + provider: str, + model: str, + profile: str, + last_error_class: Optional[str], +) -> float: + score = 0.0 + row_agent_id = str(row.get("agent_id") or "").strip().lower() + if row_agent_id and row_agent_id == agent_id: + score += 3.0 + + signals = _decode_lesson_signals(row.get("signals")) + signal_error = str(signals.get("error_class") or "").strip().lower() + if last_error_class and signal_error and signal_error == last_error_class.lower(): + score += 2.0 + + signal_provider = str(signals.get("provider") or "").strip().lower() + signal_model = str(signals.get("model") or "").strip().lower() + signal_profile = str(signals.get("profile") or "").strip().lower() + if provider and signal_provider and signal_provider == provider: + score += 1.0 + if model and signal_model and signal_model == model: + score += 1.0 + if profile and signal_profile and signal_profile == profile: + score += 1.0 + + row_ts = row.get("ts") + if isinstance(row_ts, datetime): + dt = row_ts if row_ts.tzinfo else row_ts.replace(tzinfo=timezone.utc) + age_hours = max(0.0, (datetime.now(timezone.utc) - dt).total_seconds() / 3600.0) + score -= min(2.0, age_hours / 168.0) # down-rank lessons older than ~7 days + + return score + + +def _render_operational_lessons(lessons: List[Dict[str, Any]], max_chars: int) -> str: + if not lessons: + return "" + lines = ["Operational Lessons (apply if relevant):"] + for idx, lesson in enumerate(lessons, start=1): + trigger = _lesson_guarded_text(lesson.get("trigger"), max_len=220) + action = _lesson_guarded_text(lesson.get("action"), max_len=220) + avoid = _lesson_guarded_text(lesson.get("avoid"), max_len=220) + if not trigger or not action or not avoid: + continue + chunk = f"{idx}) Trigger: {trigger}\n Do: {action}\n Avoid: {avoid}" + candidate = "\n".join(lines + [chunk]) + if len(candidate) > max_chars: + break + lines.append(chunk) + + if len(lines) <= 1: + return "" + return "\n".join(lines) + + +async def _update_last_infer_signal(agent_id: str, *, ok: bool, error_class: Optional[str], latency_ms: int) -> None: + key = str(agent_id or "").strip().lower() + if not key: + return + now = time.monotonic() + async with _lessons_signal_lock: + _lessons_signal_cache[key] = { + "ok": bool(ok), + "error_class": 
str(error_class or "").strip() or None, + "latency_ms": int(max(0, latency_ms)), + "seen_at": now, + } + _lessons_signal_cache.move_to_end(key, last=True) + threshold = now - max(30, LESSONS_SIGNAL_CACHE_TTL_SECONDS) + stale_keys = [k for k, v in _lessons_signal_cache.items() if float(v.get("seen_at", 0.0)) < threshold] + for stale_key in stale_keys: + _lessons_signal_cache.pop(stale_key, None) + while len(_lessons_signal_cache) > 4000: + _lessons_signal_cache.popitem(last=False) + + +async def _get_last_infer_signal(agent_id: str) -> Optional[Dict[str, Any]]: + key = str(agent_id or "").strip().lower() + if not key: + return None + now = time.monotonic() + async with _lessons_signal_lock: + value = _lessons_signal_cache.get(key) + if not value: + return None + age = now - float(value.get("seen_at", 0.0)) + if age > LESSONS_SIGNAL_CACHE_TTL_SECONDS: + _lessons_signal_cache.pop(key, None) + return None + return dict(value) + + +async def _fetch_ranked_lessons( + *, + agent_id: str, + provider: str, + model: str, + profile: str, + last_error_class: Optional[str], + limit: int, +) -> Tuple[List[Dict[str, Any]], str, int]: + if lessons_db_pool is None: + return [], "err", 0 + + query = """ + SELECT lesson_key, ts, scope, agent_id, task_type, trigger, action, avoid, signals + FROM agent_lessons + WHERE (agent_id = $1 OR agent_id IS NULL) + AND task_type = 'infer' + ORDER BY (agent_id = $1) DESC, ts DESC + LIMIT 50 + """ + + started = time.time() + try: + async with lessons_db_pool.acquire() as conn: + rows = await asyncio.wait_for( + conn.fetch(query, str(agent_id).strip().lower()), + timeout=LESSONS_ATTACH_TIMEOUT_MS / 1000.0, + ) + except asyncio.TimeoutError: + elapsed = max(0, int((time.time() - started) * 1000)) + return [], "timeout", elapsed + except Exception as e: + logger.debug("Lessons retrieval failed: %s", e) + elapsed = max(0, int((time.time() - started) * 1000)) + return [], "err", elapsed + + ranked: List[Tuple[float, datetime, Dict[str, Any]]] = [] + for row in rows: + row_data = dict(row) + lesson = { + "lesson_key": row_data.get("lesson_key"), + "ts": row_data.get("ts"), + "scope": row_data.get("scope"), + "agent_id": row_data.get("agent_id"), + "task_type": row_data.get("task_type"), + "trigger": row_data.get("trigger"), + "action": row_data.get("action"), + "avoid": row_data.get("avoid"), + "signals": _decode_lesson_signals(row_data.get("signals")), + } + + if not ( + _lesson_guarded_text(lesson.get("trigger")) + and _lesson_guarded_text(lesson.get("action")) + and _lesson_guarded_text(lesson.get("avoid")) + ): + continue + + score = _score_lesson_record( + lesson, + agent_id=agent_id, + provider=(provider or "").strip().lower(), + model=(model or "").strip().lower(), + profile=(profile or "").strip().lower(), + last_error_class=last_error_class, + ) + ts = lesson.get("ts") + if not isinstance(ts, datetime): + ts = datetime.now(timezone.utc) - timedelta(days=365) + ranked.append((score, ts, lesson)) + + ranked.sort(key=lambda item: (item[0], item[1]), reverse=True) + selected = [item[2] for item in ranked[: max(1, limit)]] + elapsed = max(0, int((time.time() - started) * 1000)) + return selected, "ok", elapsed + app = FastAPI(title="DAARION Router", version="2.0.0") # Configuration @@ -907,6 +1243,27 @@ VISION_URL = os.getenv("VISION_URL", "http://host.docker.internal:11434") OCR_URL = os.getenv("OCR_URL", "http://swapper-service:8890") DOCUMENT_URL = os.getenv("DOCUMENT_URL", "http://swapper-service:8890") CITY_SERVICE_URL = os.getenv("CITY_SERVICE_URL", 
"http://daarion-city-service:7001") +LESSONS_ATTACH_ENABLED = os.getenv("LESSONS_ATTACH_ENABLED", "true").lower() in {"1", "true", "yes"} +LESSONS_ATTACH_MIN = max(1, int(os.getenv("LESSONS_ATTACH_MIN", "3"))) +LESSONS_ATTACH_MAX = max(LESSONS_ATTACH_MIN, int(os.getenv("LESSONS_ATTACH_MAX", "7"))) +LESSONS_ATTACH_TIMEOUT_MS = max(5, int(os.getenv("LESSONS_ATTACH_TIMEOUT_MS", "25"))) +LESSONS_ATTACH_SAMPLE_PCT = max(0.0, min(100.0, float(os.getenv("LESSONS_ATTACH_SAMPLE_PCT", "10")))) +LESSONS_ATTACH_MAX_CHARS = max(400, int(os.getenv("LESSONS_ATTACH_MAX_CHARS", "1200"))) +LESSONS_SIGNAL_CACHE_TTL_SECONDS = max(30, int(os.getenv("LESSONS_SIGNAL_CACHE_TTL_SECONDS", "300"))) +LESSONS_LATENCY_SPIKE_MS = max(250, int(os.getenv("EXPERIENCE_LATENCY_SPIKE_MS", "5000"))) +LESSONS_DATABASE_URL = ( + os.getenv("LESSONS_DATABASE_URL") + or os.getenv("EXPERIENCE_DATABASE_URL") + or os.getenv("DATABASE_URL") +) + +LESSONS_INJECTION_GUARDS = ( + "ignore previous", + "ignore all previous", + "system:", + "developer:", + "```", +) # CrewAI Routing Configuration CREWAI_ROUTING_ENABLED = os.getenv("CREWAI_ROUTING_ENABLED", "true").lower() == "true" @@ -947,6 +1304,12 @@ nats_available = False # Tool Manager tool_manager = None runtime_guard_engine = None +experience_bus = None +lessons_db_pool = None +_lessons_signal_cache: "OrderedDict[str, Dict[str, Any]]" = OrderedDict() +_lessons_signal_lock = asyncio.Lock() + +_INFER_PATH_RE = re.compile(r"^/v1/agents/([^/]+)/infer/?$") # Models class FilterDecision(BaseModel): @@ -999,10 +1362,146 @@ def load_router_config(): config = load_config() router_config = load_router_config() + +@app.middleware("http") +async def experience_capture_middleware(request: Request, call_next): + """Capture /infer outcomes and emit ExperienceEvent asynchronously.""" + infer_agent_id = _extract_infer_agent_id(request.url.path) + if ( + not infer_agent_id + or request.method.upper() != "POST" + or not EXPERIENCE_BUS_AVAILABLE + or experience_bus is None + ): + return await call_next(request) + + started_at = time.time() + req_body = await request.body() + + async def _receive() -> Dict[str, Any]: + return {"type": "http.request", "body": req_body, "more_body": False} + + wrapped_request = Request(request.scope, _receive) + + response = None + response_body = b"" + status_code = 500 + caught_exc: Optional[Exception] = None + + try: + response = await call_next(wrapped_request) + status_code = int(response.status_code) + chunks: List[bytes] = [] + async for chunk in response.body_iterator: + chunks.append(chunk) + response_body = b"".join(chunks) + except Exception as exc: # pragma: no cover - defensive capture path + caught_exc = exc + status_code = 500 + + latency_ms = max(0, int((time.time() - started_at) * 1000)) + + try: + req_payload = _safe_json_from_bytes(req_body) + resp_payload = _safe_json_from_bytes(response_body) + metadata = req_payload.get("metadata") + if not isinstance(metadata, dict): + metadata = {} + + prompt = str(req_payload.get("prompt") or "") + normalized_input = normalize_input_for_hash(prompt) + inputs_hash = hashlib.sha256(normalized_input.encode("utf-8")).hexdigest() + + profile = _resolve_profile_for_event(infer_agent_id, req_payload) + profile_cfg = {} + if profile and isinstance(router_config, dict): + profile_cfg = (router_config.get("llm_profiles") or {}).get(profile, {}) or {} + if not isinstance(profile_cfg, dict): + profile_cfg = {} + + model = str(resp_payload.get("model") or profile_cfg.get("model") or "unknown") + backend = 
str(resp_payload.get("backend") or "") + provider = _derive_provider_from_backend_model(backend, model, profile) + + tokens_total = resp_payload.get("tokens_used") + tokens_out = int(tokens_total) if isinstance(tokens_total, int) else None + request_id = str( + metadata.get("request_id") + or metadata.get("trace_id") + or request.headers.get("x-request-id") + or "" + ).strip() or None + + err_class: Optional[str] = None + err_msg: Optional[str] = None + detail_obj = resp_payload.get("detail") + if caught_exc is not None: + err_class = type(caught_exc).__name__ + err_msg = str(caught_exc) + elif status_code >= 400: + if isinstance(detail_obj, dict): + err_class = str(detail_obj.get("code") or detail_obj.get("error_class") or f"http_{status_code}") + err_msg = str(detail_obj.get("message") or detail_obj.get("detail") or json.dumps(detail_obj)) + elif isinstance(detail_obj, str): + err_class = f"http_{status_code}" + err_msg = detail_obj + else: + err_class = f"http_{status_code}" + err_msg = f"http_status={status_code}" + + await _update_last_infer_signal( + infer_agent_id, + ok=status_code < 400, + error_class=err_class, + latency_ms=latency_ms, + ) + + event = { + "event_id": str(uuid.uuid4()), + "ts": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "node_id": os.getenv("NODE_ID", "NODA1"), + "source": "router", + "agent_id": infer_agent_id, + "request_id": request_id, + "channel": _infer_channel_from_metadata(metadata), + "task_type": "infer", + "inputs_hash": inputs_hash, + "llm": { + "provider": provider, + "model": model, + "profile": profile, + "latency_ms": latency_ms, + "tokens_in": None, + "tokens_out": tokens_out, + }, + "result": { + "ok": status_code < 400, + "error_class": err_class, + "error_msg_redacted": redact_error_message(err_msg), + "http_status": status_code, + }, + } + await experience_bus.capture(event) + except Exception as exp_err: + logger.debug("Experience capture skipped: %s", exp_err) + + if caught_exc is not None: + raise caught_exc + + headers = dict(response.headers) if response is not None else {} + headers.pop("content-length", None) + return Response( + content=response_body, + status_code=status_code, + headers=headers, + media_type=response.media_type if response is not None else "application/json", + background=response.background if response is not None else None, + ) + @app.on_event("startup") async def startup_event(): """Initialize NATS connection and subscriptions""" - global nc, nats_available, http_client, neo4j_driver, neo4j_available, runtime_guard_engine + global nc, nats_available, http_client, neo4j_driver, neo4j_available, runtime_guard_engine, experience_bus, lessons_db_pool logger.info("🚀 DAGI Router v2.0.0 starting up...") # Initialize HTTP client @@ -1041,6 +1540,34 @@ async def startup_event(): logger.warning(f"⚠️ NATS not available: {e}") logger.warning("⚠️ Running in test mode (HTTP only)") nats_available = False + + # Initialize Experience Bus (Phase-1) + if EXPERIENCE_BUS_AVAILABLE and ExperienceBus is not None: + try: + experience_bus = ExperienceBus() + await experience_bus.start(nats_client=nc if nats_available else None) + logger.info("✅ Experience Bus initialized") + except Exception as e: + experience_bus = None + logger.warning(f"⚠️ Experience Bus init failed: {e}") + + # Initialize lessons retrieval pool (Phase-3 read path) + if LESSONS_ATTACH_ENABLED: + if asyncpg is None: + logger.warning("⚠️ Lessons attach enabled but asyncpg is unavailable") + elif not LESSONS_DATABASE_URL: + logger.warning("⚠️ Lessons 
attach enabled but LESSONS_DATABASE_URL is missing") + else: + try: + lessons_db_pool = await asyncpg.create_pool( + LESSONS_DATABASE_URL, + min_size=1, + max_size=3, + ) + logger.info("✅ Lessons DB pool initialized") + except Exception as e: + lessons_db_pool = None + logger.warning(f"⚠️ Lessons DB pool init failed: {e}") # Initialize Memory Retrieval Pipeline if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval: @@ -1765,6 +2292,24 @@ async def agent_infer(agent_id: str, request: InferRequest): """ logger.info(f"🔀 Inference request for agent: {agent_id}") logger.info(f"📝 Prompt: {request.prompt[:100]}...") + + inactive_state = _inactive_agent_state(agent_id) + if inactive_state is not None: + status_code = 410 if inactive_state == "planned" else 404 + logger.info( + "⛔ Agent unavailable by lifecycle state: agent=%s state=%s", + agent_id, + inactive_state, + ) + raise HTTPException( + status_code=status_code, + detail={ + "code": f"agent_{inactive_state}", + "agent_id": str(agent_id).strip().lower(), + "state": inactive_state, + "message": "Agent is not active in this environment", + }, + ) # ========================================================================= # MEMORY RETRIEVAL (v4.0 - Universal for all agents) @@ -2682,23 +3227,77 @@ async def agent_infer(agent_id: str, request: InferRequest): # SMART LLM ROUTER WITH AUTO-FALLBACK # Priority: DeepSeek → Mistral → Grok → Local Ollama # ========================================================================= - + + lessons_block = "" + lessons_attached_count = 0 + if LESSONS_ATTACH_ENABLED and not request.images: + retrieval_always_on = False + retrieval_limit = LESSONS_ATTACH_MIN + last_signal = await _get_last_infer_signal(request_agent_id) + last_error_class = None + if last_signal: + last_error_class = last_signal.get("error_class") + if (not bool(last_signal.get("ok", True))) or int(last_signal.get("latency_ms", 0) or 0) >= LESSONS_LATENCY_SPIKE_MS: + retrieval_always_on = True + retrieval_limit = LESSONS_ATTACH_MAX + + should_retrieve = retrieval_always_on or (random_module.random() * 100.0 < LESSONS_ATTACH_SAMPLE_PCT) + if should_retrieve: + lessons_rows, retrieval_status, retrieval_latency_ms = await _fetch_ranked_lessons( + agent_id=request_agent_id, + provider=str(provider or "").strip().lower(), + model=str(model or "").strip().lower(), + profile=str(default_llm or "").strip().lower(), + last_error_class=str(last_error_class or "").strip() or None, + limit=retrieval_limit, + ) + inc_lessons_retrieved(status=retrieval_status) + observe_lessons_attach_latency(latency_ms=float(retrieval_latency_ms)) + + if retrieval_status == "ok" and lessons_rows: + selected_lessons = lessons_rows[:retrieval_limit] + lessons_block = _render_operational_lessons(selected_lessons, LESSONS_ATTACH_MAX_CHARS) + if lessons_block: + lessons_attached_count = len(selected_lessons) + logger.info( + "🧠 lessons_attached=%s agent=%s mode=%s", + lessons_attached_count, + request_agent_id, + "always_on" if retrieval_always_on else "sampled", + ) + inc_lessons_attached(count=lessons_attached_count) + # Build messages array once for all providers messages = [] if system_prompt: + combined_parts: List[str] = [system_prompt] if memory_brief_text: - enhanced_prompt = f"{system_prompt}\n\n[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}" - messages.append({"role": "system", "content": enhanced_prompt}) - logger.info(f"📝 Added system message with prompt ({len(system_prompt)} chars) + memory ({len(memory_brief_text)} chars)") - else: - 
messages.append({"role": "system", "content": system_prompt}) - logger.info(f"📝 Added system message with prompt ({len(system_prompt)} chars)") - elif memory_brief_text: - messages.append({"role": "system", "content": f"[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}"}) - logger.warning(f"⚠️ No system_prompt! Using only memory brief ({len(memory_brief_text)} chars)") + combined_parts.append(f"[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}") + if lessons_block: + combined_parts.append(f"[OPERATIONAL LESSONS - INTERNAL]\n{lessons_block}") + enhanced_prompt = "\n\n".join(combined_parts) + messages.append({"role": "system", "content": enhanced_prompt}) + logger.info( + "📝 Added system message prompt=%s memory=%s lessons=%s", + len(system_prompt), + len(memory_brief_text or ""), + lessons_attached_count, + ) + elif memory_brief_text or lessons_block: + fallback_parts: List[str] = [] + if memory_brief_text: + fallback_parts.append(f"[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}") + if lessons_block: + fallback_parts.append(f"[OPERATIONAL LESSONS - INTERNAL]\n{lessons_block}") + messages.append({"role": "system", "content": "\n\n".join(fallback_parts)}) + logger.warning( + "⚠️ No system_prompt! Using fallback context memory=%s lessons=%s", + len(memory_brief_text or ""), + lessons_attached_count, + ) else: - logger.error(f"❌ No system_prompt AND no memory_brief! LLM will have no context!") - + logger.error("❌ No system_prompt, memory_brief, or lessons; LLM will have no context") + messages.append({"role": "user", "content": request.prompt}) logger.debug(f"📨 Messages array: {len(messages)} messages, system={len(messages[0].get('content', '')) if messages else 0} chars") @@ -4555,7 +5154,7 @@ async def sofiia_model_catalog(refresh_ollama: bool = False): @app.on_event("shutdown") async def shutdown_event(): """Cleanup connections on shutdown""" - global neo4j_driver, http_client, nc + global neo4j_driver, http_client, nc, experience_bus, lessons_db_pool # Close Memory Retrieval if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval: @@ -4576,3 +5175,17 @@ async def shutdown_event(): if nc: await nc.close() logger.info("🔌 NATS connection closed") + + if EXPERIENCE_BUS_AVAILABLE and experience_bus: + try: + await experience_bus.stop() + logger.info("🔌 Experience Bus closed") + except Exception as e: + logger.warning(f"⚠️ Experience Bus close error: {e}") + + if lessons_db_pool is not None: + try: + await lessons_db_pool.close() + logger.info("🔌 Lessons DB pool closed") + except Exception as e: + logger.warning(f"⚠️ Lessons DB pool close error: {e}")
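
Reviewer note (appended below the diff; not part of the patch itself): a minimal smoke-test sketch for the Phase-1 write path. It publishes one synthetic ExperienceEvent straight to JetStream so the experience-learner consumer can be watched end to end. Subject, stream, header, and payload fields mirror the defaults wired in above; the localhost NATS URL and the demo agent id are illustrative assumptions, so adjust them to the target node.

# smoke_publish_experience.py -- reviewer sketch, not shipped with this patch
import asyncio
import json
import uuid
from datetime import datetime, timezone

import nats  # nats-py, the same client the router and learner use


async def main() -> None:
    # Assumption: NATS port-forwarded to localhost; inside compose use nats://nats:4222.
    nc = await nats.connect("nats://127.0.0.1:4222")
    js = nc.jetstream()

    event = {
        "event_id": str(uuid.uuid4()),
        "ts": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "node_id": "NODA1",
        "source": "router",
        "agent_id": "demo-agent",  # hypothetical agent id
        "request_id": None,
        "channel": "api",
        "task_type": "infer",
        "inputs_hash": "0" * 64,
        "llm": {"provider": "local", "model": "unknown", "profile": None,
                "latency_ms": 6200, "tokens_in": None, "tokens_out": 42},
        "result": {"ok": False, "error_class": "http_502",
                   "error_msg_redacted": "upstream timeout", "http_status": 502},
    }

    # Nats-Msg-Id mirrors _publish_nats() so JetStream can de-duplicate replays.
    ack = await js.publish(
        f"agent.experience.v1.{event['agent_id']}",
        json.dumps(event, ensure_ascii=False).encode("utf-8"),
        headers={"Nats-Msg-Id": event["event_id"]},
    )
    print(f"stored: stream={ack.stream} seq={ack.seq}")
    await nc.close()


asyncio.run(main())

Error events bypass the OK-rate sampling, so once EXPERIENCE_ERROR_THRESHOLD of them (default 3) land in one window for the same agent, the learner should aggregate a row into agent_lessons; its /health endpoint on 127.0.0.1:9109 reports nats_connected and db_connected while it consumes.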