7 Commits

26 changed files with 2845 additions and 23 deletions

View File

@@ -100,8 +100,11 @@ jobs:
"${SSH_USER}@${SSH_HOST}" \
"set -euo pipefail; \
cd /opt/microdao-daarion; \
origin_url=\$(git remote get-url origin 2>/dev/null || true); \
if [ -n \"\$(git status --porcelain)\" ]; then \
echo 'WARN: dirty git tree on NODA1; skip checkout/pull and continue with gate'; \
elif ! printf '%s' \"\$origin_url\" | grep -Eq 'daarion-admin/microdao-daarion(\\.git)?$'; then \
echo \"WARN: origin remote (\$origin_url) is not deploy-safe; skip checkout/pull and continue with gate\"; \
else \
git fetch origin; \
git checkout '${DEPLOY_REF:-main}'; \

View File

@@ -14,6 +14,7 @@ services:
- ROUTER_CONFIG_PATH=/app/router_config.yaml
- LOG_LEVEL=info
- NODE_ID=noda1
- EXPERIENCE_DATABASE_URL=postgresql://daarion:${POSTGRES_PASSWORD:?}@dagi-postgres:5432/daarion_memory # SECURITY: plaintext DB password was committed here — rotate it and supply via env
- MEMORY_SERVICE_URL=http://memory-service:8000
# Timeout policy: Gateway (180s) > Router (60s) > LLM (30s)
- ROUTER_TIMEOUT=180
@@ -64,7 +65,9 @@ services:
- ${DEPLOY_ROOT:-.}/gateway-bot:/app/prompts:ro
- ${DEPLOY_ROOT:-.}/logs:/app/logs
networks:
- dagi-network
dagi-network:
aliases:
- dagi-router
restart: unless-stopped
extra_hosts:
- "host.docker.internal:host-gateway"
@@ -179,7 +182,18 @@ services:
- BUILD_SHA=${BUILD_SHA:-dev}
- BUILD_TIME=${BUILD_TIME:-local}
- NODE_ID=NODA1
- ROUTER_URL=${ROUTER_URL:-http://dagi-staging-router:8000}
- ROUTER_URL=${ROUTER_URL:-http://dagi-router:8000}
- NATS_URL=nats://nats:4222
- EXPERIENCE_BUS_ENABLED=true
- EXPERIENCE_ENABLE_NATS=true
- EXPERIENCE_ENABLE_DB=true
- EXPERIENCE_STREAM_NAME=EXPERIENCE
- EXPERIENCE_SUBJECT_PREFIX=agent.experience.v1
- EXPERIENCE_DATABASE_URL=postgresql://daarion:${POSTGRES_PASSWORD:?}@dagi-postgres:5432/daarion_memory # SECURITY: plaintext DB password was committed here — rotate it and supply via env
- GATEWAY_USER_SIGNAL_RETRY_WINDOW_SECONDS=30
- ANTI_SILENT_TUNING_ENABLED=${ANTI_SILENT_TUNING_ENABLED:-false}
- ANTI_SILENT_TUNING_DB_TIMEOUT_MS=${ANTI_SILENT_TUNING_DB_TIMEOUT_MS:-40}
- ANTI_SILENT_TUNING_CACHE_TTL_SECONDS=${ANTI_SILENT_TUNING_CACHE_TTL_SECONDS:-60}
- GATEWAY_MAX_TOKENS_CONCISE=350
- GATEWAY_MAX_TOKENS_SENPAI_DEFAULT=700
- GATEWAY_MAX_TOKENS_DEFAULT=700
@@ -282,7 +296,7 @@ services:
container_name: dagi-gateway-worker-node1
command: ["python", "-m", "daarion_facade.worker"]
environment:
- ROUTER_BASE_URL=http://router:8000
- ROUTER_BASE_URL=http://dagi-router:8000
- REDIS_URL=redis://redis:6379/0
- ROUTER_WORKER_TIMEOUT=60
volumes:
@@ -350,6 +364,56 @@ services:
timeout: 5s
retries: 3
# Experience Learner (Phase-2): JetStream events -> aggregated lessons
experience-learner:
build:
context: ./services/experience-learner
dockerfile: Dockerfile
container_name: dagi-experience-learner-node1
ports:
- "127.0.0.1:9109:9109"
environment:
- NODE_ID=NODA1
- NATS_URL=nats://nats:4222
- EXPERIENCE_STREAM_NAME=EXPERIENCE
- EXPERIENCE_SUBJECT=agent.experience.v1.>
- EXPERIENCE_DURABLE=experience-learner-v1
- EXPERIENCE_DELIVER_POLICY=all
- EXPERIENCE_ACK_WAIT_SECONDS=30
- EXPERIENCE_MAX_DELIVER=20
- EXPERIENCE_FETCH_BATCH=64
- EXPERIENCE_FETCH_TIMEOUT_SECONDS=2
- EXPERIENCE_WINDOW_SECONDS=1800
- EXPERIENCE_OK_SAMPLE_PCT=10
- EXPERIENCE_LATENCY_SPIKE_MS=5000
- EXPERIENCE_ERROR_THRESHOLD=3
- EXPERIENCE_SILENT_THRESHOLD=5
- EXPERIENCE_LATENCY_THRESHOLD=3
- EXPERIENCE_EVENT_DEDUP_TTL_SECONDS=3600
- LEARNER_DATABASE_URL=postgresql://daarion:${POSTGRES_PASSWORD:?}@dagi-postgres:5432/daarion_memory # SECURITY: plaintext DB password was committed here — rotate it and supply via env
- LESSON_SUBJECT=agent.lesson.v1
- LESSON_PUBLISH_ENABLED=true
- ANTI_SILENT_TUNING_ENABLED=${ANTI_SILENT_TUNING_ENABLED:-true}
- ANTI_SILENT_TUNING_WINDOW_DAYS=${ANTI_SILENT_TUNING_WINDOW_DAYS:-7}
- ANTI_SILENT_TUNING_MIN_EVIDENCE=${ANTI_SILENT_TUNING_MIN_EVIDENCE:-20}
- ANTI_SILENT_TUNING_MIN_SCORE=${ANTI_SILENT_TUNING_MIN_SCORE:-0.75}
- ANTI_SILENT_TUNING_WEIGHT_RETRY=${ANTI_SILENT_TUNING_WEIGHT_RETRY:-0.6}
- ANTI_SILENT_TUNING_WEIGHT_NEGATIVE=${ANTI_SILENT_TUNING_WEIGHT_NEGATIVE:-0.3}
- ANTI_SILENT_TUNING_WEIGHT_SUPPRESSED=${ANTI_SILENT_TUNING_WEIGHT_SUPPRESSED:-0.1}
- ANTI_SILENT_TUNING_TTL_DAYS=${ANTI_SILENT_TUNING_TTL_DAYS:-7}
depends_on:
- nats
- dagi-postgres
networks:
- dagi-network
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://localhost:9109/health')\""]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
metrics-poller-node1:
build:
@@ -1228,7 +1292,7 @@ services:
- SOFIIA_DATA_DIR=/data/sofiia
- NODES_REGISTRY_PATH=/config/nodes_registry.yml
- NODES_NODA1_SSH_PASSWORD=${NODES_NODA1_SSH_PASSWORD:?} # SECURITY: a plaintext SSH password was committed on this line — rotate the credential and keep it out of VCS
- ROUTER_URL=http://dagi-router-node1:8000
- ROUTER_URL=http://dagi-router:8000
- GATEWAY_URL=http://dagi-gateway-node1:9300
- MEMORY_SERVICE_URL=http://dagi-memory-service-node1:8000
- OLLAMA_URL=http://172.18.0.1:11434
@@ -1238,6 +1302,8 @@ services:
- SOFIIA_CONSOLE_API_KEY=${SOFIIA_CONSOLE_API_KEY:-}
- SOFIIA_CONSOLE_TEAM_KEYS=${SOFIIA_CONSOLE_TEAM_KEYS:-}
- SOFIIA_INTERNAL_TOKEN=${SOFIIA_INTERNAL_TOKEN:-}
# M3.1: control token for bridge→console runbook runs (separate from audit token)
- SOFIIA_CONTROL_TOKEN=${SOFIIA_CONTROL_TOKEN:-}
# aurora-service not deployed on NODA1 — set explicit URL to avoid DNS lookup failure
- AURORA_SERVICE_URL=http://127.0.0.1:9401
volumes:

View File

@@ -32,6 +32,7 @@ Required repo secrets:
- `redeploy_runtime=false` only syncs git on NODA1 and runs gate checks.
- `redeploy_runtime=true` recreates `gateway` and `experience-learner` containers.
- If NODA1 git tree is dirty, workflow skips checkout/pull and still enforces `phase6_gate` (safe mode for live nodes).
- If NODA1 `origin` remote is not the expected deploy-safe repo, workflow skips checkout/pull and still enforces `phase6_gate` (prevents accidental downgrade from a stale remote).
- Workflow uses SSH key validation and `IdentitiesOnly=yes` to avoid host key collisions.
## Expected PASS

View File

@@ -0,0 +1,61 @@
# Experience Bus Phase-1 (Router First)
## Scope
- Source: router `/v1/agents/{id}/infer`
- Event subject: `agent.experience.v1.<agent_id>`
- JetStream stream: `EXPERIENCE` (`agent.experience.v1.>`)
- DB table: `agent_experience_events` (append-only)
- Controls: dedup + sampling
## Env knobs
- `EXPERIENCE_BUS_ENABLED=true`
- `EXPERIENCE_ENABLE_NATS=true`
- `EXPERIENCE_ENABLE_DB=true`
- `EXPERIENCE_DATABASE_URL=postgresql://<user>:<pass>@<host>:5432/daarion_memory`
- `EXPERIENCE_OK_SAMPLE_PCT=10`
- `EXPERIENCE_LATENCY_SPIKE_MS=5000`
- `EXPERIENCE_DEDUP_WINDOW_SECONDS=900`
- `EXPERIENCE_QUEUE_MAX=2000`
## Deploy
1. Apply migration `migrations/054_agent_experience_events.sql`.
2. Deploy router with updated `main.py`, `experience_bus.py`, `agent_metrics.py`.
3. Restart router service.
## Smoke (30 calls)
```bash
for i in $(seq 1 15); do
curl -sS -X POST http://127.0.0.1:9102/v1/agents/agromatrix/infer \
-H 'content-type: application/json' \
-d "{\"prompt\":\"experience smoke agromatrix $i $(date +%s%N)\"}" >/dev/null
done
for i in $(seq 1 15); do
curl -sS -X POST http://127.0.0.1:9102/v1/agents/stepan/infer \
-H 'content-type: application/json' \
-d "{\"prompt\":\"experience smoke stepan $i $(date +%s%N)\"}" >/dev/null
done
```
## Verify JetStream
```bash
# example (inside nats container with nats CLI)
nats stream info EXPERIENCE
nats stream view EXPERIENCE --count 5
```
## Verify DB
```sql
SELECT count(*)
FROM agent_experience_events
WHERE ts > now() - interval '10 minutes';
```
## Verify lifecycle guard unchanged
```bash
curl -sS -o /dev/null -w '%{http_code}\n' -X POST http://127.0.0.1:9102/v1/agents/aistalk/infer -H 'content-type: application/json' -d '{"prompt":"ping"}'
# expected: 410
curl -sS -o /dev/null -w '%{http_code}\n' -X POST http://127.0.0.1:9102/v1/agents/devtools/infer -H 'content-type: application/json' -d '{"prompt":"ping"}'
# expected: 404
```

View File

@@ -0,0 +1,71 @@
# Experience Bus Phase-2 (Lessons Extractor)
## Scope
- Source stream: `EXPERIENCE`
- Source subjects: `agent.experience.v1.>`
- Consumer mode: durable pull + explicit ack
- Output table: `agent_lessons` (append-only)
- Output subject: `agent.lesson.v1` (optional publish)
## Service
- Container: `dagi-experience-learner-node1`
- Endpoints:
- `GET /health`
- `GET /metrics`
## Environment
- `NATS_URL=nats://nats:4222`
- `EXPERIENCE_STREAM_NAME=EXPERIENCE`
- `EXPERIENCE_SUBJECT=agent.experience.v1.>`
- `EXPERIENCE_DURABLE=experience-learner-v1`
- `EXPERIENCE_ACK_WAIT_SECONDS=30`
- `EXPERIENCE_MAX_DELIVER=20`
- `EXPERIENCE_FETCH_BATCH=64`
- `EXPERIENCE_FETCH_TIMEOUT_SECONDS=2`
- `EXPERIENCE_WINDOW_SECONDS=1800`
- `EXPERIENCE_OK_SAMPLE_PCT=10`
- `EXPERIENCE_LATENCY_SPIKE_MS=5000`
- `EXPERIENCE_ERROR_THRESHOLD=3`
- `EXPERIENCE_SILENT_THRESHOLD=5`
- `EXPERIENCE_LATENCY_THRESHOLD=3`
- `EXPERIENCE_EVENT_DEDUP_TTL_SECONDS=3600`
- `LEARNER_DATABASE_URL=postgresql://<user>:<pass>@<host>:5432/daarion_memory`
- `LESSON_SUBJECT=agent.lesson.v1`
- `LESSON_PUBLISH_ENABLED=true`
## Deploy
1. Apply migration `migrations/055_agent_lessons.sql`.
2. Deploy service `experience-learner`.
3. Verify service health and metrics.
## Smoke
```bash
# Generate event traffic (Phase-1 router path)
# NOTE(review): aistalk/devtools are lifecycle-guarded (410/404 per the Phase-1 runbook) —
# confirm that error-class events from guarded agents are the intended learner input here
for i in $(seq 1 50); do
agent=$([ $((i%2)) -eq 0 ] && echo "aistalk" || echo "devtools")
curl -sS -m 8 -o /dev/null \
-X POST "http://127.0.0.1:9102/v1/agents/${agent}/infer" \
-H "content-type: application/json" \
-d "{\"prompt\":\"phase2-smoke-${agent}-${i}-$(date +%s%N)\"}" || true
done
```
## Verify
```bash
# Lessons rows
docker exec dagi-postgres psql -U daarion -d daarion_memory -tAc \
"SELECT count(*) FROM agent_lessons WHERE ts > now()-interval '30 minutes';"
# Idempotency check (run again, duplicates should not explode)
docker exec dagi-postgres psql -U daarion -d daarion_memory -tAc \
"SELECT count(*), count(distinct lesson_key) FROM agent_lessons;"
# Learner metrics
curl -sS http://127.0.0.1:9109/metrics | grep -E 'lessons_|js_messages_'
```
## Acceptance
- `agent_lessons` receives rows under live event flow.
- Reprocessing/redelivery does not duplicate lessons (`lesson_key` unique).
- `js_messages_acked_total` increases.
- `js_messages_redelivered_total` is observable when replay/redelivery occurs.

View File

@@ -0,0 +1,70 @@
# Experience Bus Phase-3 (Router Runtime Retrieval)
## Scope
- Read path only in `router` before `/v1/agents/{id}/infer`.
- Retrieves lessons from `agent_lessons` and injects a compact block:
- `Operational Lessons (apply if relevant)`
- Attach policy:
- after last error / latency spike: always-on, `K=7`
- otherwise sampled attach, default `10%`, `K=3`
## Environment
- `LESSONS_ATTACH_ENABLED=true`
- `LESSONS_DATABASE_URL=postgresql://<user>:<pass>@<host>:5432/daarion_memory`
- `LESSONS_ATTACH_MIN=3`
- `LESSONS_ATTACH_MAX=7`
- `LESSONS_ATTACH_SAMPLE_PCT=10`
- `LESSONS_ATTACH_TIMEOUT_MS=25`
- `LESSONS_ATTACH_MAX_CHARS=1200`
- `LESSONS_SIGNAL_CACHE_TTL_SECONDS=300`
- `EXPERIENCE_LATENCY_SPIKE_MS=5000`
## Metrics
- `lessons_retrieved_total{status="ok|timeout|err"}`
- `lessons_attached_total{count="0|1-3|4-7"}`
- `lessons_attach_latency_ms`
## Safety
- Lessons block never includes raw user text.
- Guard filters skip lessons containing prompt-injection-like markers:
- `ignore previous`, `system:`, `developer:`, fenced code blocks.
## Smoke
```bash
# 1) Seed synthetic lessons for one agent (example: agromatrix)
docker exec dagi-postgres psql -U daarion -d daarion_memory -c "
INSERT INTO agent_lessons (lesson_id, lesson_key, ts, scope, agent_id, task_type, trigger, action, avoid, signals, evidence, raw)
SELECT
gen_random_uuid(),
md5(random()::text || clock_timestamp()::text),
now() - (g * interval '1 minute'),
'agent',
'agromatrix',
'infer',
'when retrying after model timeout',
'switch provider or reduce token budget first',
'avoid repeating the same failed provider with same payload',
'{"error_class":"TimeoutError","provider":"deepseek","model":"deepseek-chat","profile":"reasoning"}'::jsonb,
'{"count":3}'::jsonb,
'{}'::jsonb
FROM generate_series(1,10) g;"
# 2) Send infer calls
for i in $(seq 1 20); do
curl -sS -m 12 -o /dev/null \
-X POST "http://127.0.0.1:9102/v1/agents/agromatrix/infer" \
-H "content-type: application/json" \
-d "{\"prompt\":\"phase3-smoke-${i}\",\"metadata\":{\"agent_id\":\"agromatrix\"}}" || true
done
# 3) Check metrics
curl -sS http://127.0.0.1:9102/metrics | grep -E 'lessons_retrieved_total|lessons_attached_total|lessons_attach_latency_ms'
# 4) Simulate DB issue (optional): lessons retrieval should fail-open and infer remains 200
# (temporarily point LESSONS_DATABASE_URL to bad DSN + restart router)
```
## Acceptance
- Router logs include `lessons_attached=<k>` during sampled or always-on retrieval.
- Infer path remains healthy when lessons DB is unavailable.
- p95 infer latency impact stays controlled at sampling `10%`.

View File

@@ -0,0 +1,99 @@
# Experience Bus Phase-4 (Gateway Hooks)
## Scope
- Source: `gateway` (Telegram webhook path).
- Emits `agent.experience.v1.<agent_id>` events with:
- `source="gateway"`
- `request_id`/`correlation_id`
- `policy.sowa_decision` + normalized `reason`
- `feedback.user_signal` (`none|positive|negative|retry|timeout`)
- Optional DB append to `agent_experience_events` (fail-open).
## Environment (gateway)
- `NATS_URL=nats://nats:4222`
- `EXPERIENCE_BUS_ENABLED=true`
- `EXPERIENCE_ENABLE_NATS=true`
- `EXPERIENCE_ENABLE_DB=true`
- `EXPERIENCE_STREAM_NAME=EXPERIENCE`
- `EXPERIENCE_SUBJECT_PREFIX=agent.experience.v1`
- `EXPERIENCE_DATABASE_URL=postgresql://<user>:<pass>@<host>:5432/daarion_memory`
- `GATEWAY_USER_SIGNAL_RETRY_WINDOW_SECONDS=30`
## Metrics
- `gateway_experience_published_total{status="ok|err"}`
- `gateway_policy_decisions_total{sowa_decision,reason}`
- `gateway_user_signal_total{user_signal}`
- `gateway_webhook_latency_ms`
## Correlation contract
- Gateway creates `request_id` (`correlation_id`) per webhook cycle.
- Gateway forwards it to router via:
- `metadata.request_id`
- `metadata.trace_id`
- `X-Request-Id` header
- Router writes same `request_id` in its event payload for join.
## Smoke
```bash
# 1) Send webhook payload (agent-specific endpoint)
curl -sS -X POST "http://127.0.0.1:9300/helion/telegram/webhook" \
-H "content-type: application/json" \
-d '{
"update_id": 900001,
"message": {
"message_id": 101,
"date": 1760000000,
"text": "дякую",
"chat": {"id": "smoke-chat-1", "type": "private"},
"from": {"id": 7001, "username": "smoke_user", "is_bot": false}
}
}'
# 2) Retry signal (same text quickly)
curl -sS -X POST "http://127.0.0.1:9300/helion/telegram/webhook" \
-H "content-type: application/json" \
-d '{
"update_id": 900002,
"message": {
"message_id": 102,
"date": 1760000005,
"text": "перевір",
"chat": {"id": "smoke-chat-1", "type": "private"},
"from": {"id": 7001, "username": "smoke_user", "is_bot": false}
}
}'
curl -sS -X POST "http://127.0.0.1:9300/helion/telegram/webhook" \
-H "content-type: application/json" \
-d '{
"update_id": 900003,
"message": {
"message_id": 103,
"date": 1760000010,
"text": "перевір",
"chat": {"id": "smoke-chat-1", "type": "private"},
"from": {"id": 7001, "username": "smoke_user", "is_bot": false}
}
}'
# 3) Verify metrics
curl -sS http://127.0.0.1:9300/metrics | grep -E 'gateway_experience_published_total|gateway_policy_decisions_total|gateway_user_signal_total|gateway_webhook_latency_ms'
# 4) Verify DB rows
docker exec dagi-postgres psql -U daarion -d daarion_memory -tAc \
"SELECT count(*) FROM agent_experience_events WHERE source='gateway' AND ts > now()-interval '10 minutes';"
# 5) Verify correlation join (gateway <-> router)
docker exec dagi-postgres psql -U daarion -d daarion_memory -P pager=off -c \
"SELECT source, agent_id, request_id, task_type, ts
FROM agent_experience_events
WHERE ts > now()-interval '10 minutes'
AND source IN ('gateway','router')
ORDER BY ts DESC LIMIT 40;"
```
## Acceptance
- Gateway publishes and stores events without blocking webhook path.
- `request_id` can join gateway and router records for same conversation turn.
- `policy.sowa_decision` and `feedback.user_signal` are present in gateway `raw` event.
- If NATS/DB unavailable, webhook still returns normal success path (fail-open telemetry).

View File

@@ -0,0 +1,18 @@
{
"update_id": 900002,
"message": {
"message_id": 1,
"date": 1760007001,
"chat": {
"id": 0,
"type": "private"
},
"from": {
"id": 0,
"is_bot": false,
"first_name": "Source",
"username": "lock_smoke"
},
"text": "source-lock-smoke"
}
}

View File

@@ -0,0 +1,13 @@
{
"update_id": 900001,
"inline_query": {
"id": "phase4-inline-1",
"query": "unsupported-smoke",
"offset": "",
"from": {
"id": 12345,
"is_bot": false,
"first_name": "Smoke"
}
}
}

View File

@@ -0,0 +1,17 @@
{
"update_id": 910002,
"message": {
"message_id": 1002,
"date": 1760010002,
"chat": {
"id": -1005001002,
"type": "group"
},
"from": {
"id": 551002,
"is_bot": false,
"username": "phase5_lock_user"
},
"text": "agromatrix, перевір lock smoke"
}
}

View File

@@ -0,0 +1,19 @@
{
"update_id": 910001,
"message": {
"message_id": 1001,
"date": 1760010001,
"chat": {
"id": -1005001001,
"type": "group"
},
"from": {
"id": 551001,
"is_bot": false,
"username": "phase5_group_user"
},
"sticker": {
"file_id": "dummy_sticker_file"
}
}
}

View File

@@ -0,0 +1,17 @@
{
"update_id": 910003,
"message": {
"message_id": 1003,
"date": 1760010003,
"chat": {
"id": 551003,
"type": "private"
},
"from": {
"id": 551003,
"is_bot": false,
"username": "phase5_private_user"
},
"text": "що на цьому фото?"
}
}

View File

@@ -0,0 +1,134 @@
# Phase-4.1 Gateway Early-Return Coverage
## Goal
Enforce the gateway telemetry invariant:
- 1 webhook call -> 1 `source="gateway"` event row.
- `request_id` always present.
- Early-return branches are emitted with deterministic reasons.
## Deploy
```bash
cd /opt/microdao-daarion
docker compose -f docker-compose.node1.yml up -d --no-deps --build --force-recreate gateway
```
## Seed / Precheck
```bash
export GATEWAY_WEBHOOK_URL='http://127.0.0.1:9300/agromatrix/telegram/webhook'
export PG_CONTAINER='dagi-postgres'
pre_rows_gateway=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \
"SELECT count(*) FROM agent_experience_events WHERE source='gateway' AND ts > now()-interval '10 minutes';")
pre_rows_join=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \
"SELECT count(*) FROM (
SELECT g.request_id
FROM agent_experience_events g
JOIN agent_experience_events r ON r.request_id=g.request_id
WHERE g.source='gateway' AND r.source='router' AND g.ts > now()-interval '10 minutes'
) x;")
pre_js=$(curl -sS 'http://127.0.0.1:8222/jsz?streams=true' | python3 -c '
import json,sys
j=json.load(sys.stdin)
d=(j.get("account_details") or [{}])[0].get("stream_detail") or []
print(next((s.get("state",{}).get("messages",0) for s in d if s.get("name")=="EXPERIENCE"),0))
')
echo "pre_rows_gateway=$pre_rows_gateway"
echo "pre_rows_join=$pre_rows_join"
echo "pre_js=$pre_js"
```
## Fixed Payload Replay
### 1) Unsupported (`unsupported_no_message`)
Payload file:
- `docs/ops/payloads/phase4_1_payload_unsupported_no_message.json`
```bash
curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \
-H 'content-type: application/json' \
-d @docs/ops/payloads/phase4_1_payload_unsupported_no_message.json
```
### 2) Source-lock (`source_lock_duplicate_update`)
Payload file:
- `docs/ops/payloads/phase4_1_payload_source_lock.json`
```bash
# first request
curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \
-H 'content-type: application/json' \
-d @docs/ops/payloads/phase4_1_payload_source_lock.json
# duplicate replay (same update_id)
curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \
-H 'content-type: application/json' \
-d @docs/ops/payloads/phase4_1_payload_source_lock.json
```
## Assertions
### A) Row delta strictness
```bash
post_rows_gateway=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \
"SELECT count(*) FROM agent_experience_events WHERE source='gateway' AND ts > now()-interval '10 minutes';")
delta_rows=$((post_rows_gateway-pre_rows_gateway))
echo "delta_rows=$delta_rows"
# expected: delta_rows == 3 for this replay batch
```
### B) Deterministic reasons + unknown policy on unsupported
```bash
docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -P pager=off -c "
SELECT request_id,
raw->'policy'->>'sowa_decision' as sowa,
raw->'policy'->>'reason' as reason,
raw->'feedback'->>'user_signal' as user_signal,
raw->'result'->>'http_status' as http_status,
ts
FROM agent_experience_events
WHERE source='gateway' AND ts > now()-interval '10 minutes'
ORDER BY ts DESC LIMIT 20;
"
# expected in this batch:
# - reason=unsupported_no_message (1 row), policy.sowa_decision=UNKNOWN
# - reason=source_lock_duplicate_update (1 row)
# - first source-lock request usually reason=prober_request (or normal path reason)
```
### C) Metrics assertions
```bash
curl -sS http://127.0.0.1:9300/metrics | grep -E \
'gateway_experience_emitted_total|gateway_early_return_total|gateway_event_finalize_latency_ms|gateway_experience_published_total'
# expected:
# - gateway_experience_emitted_total{path="early_return",status="ok"} increments
# - gateway_early_return_total{reason="unsupported_no_message"} increments
# - gateway_early_return_total{reason="source_lock_duplicate_update"} increments
```
### D) Join sanity (normal path unaffected)
```bash
post_rows_join=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \
"SELECT count(*) FROM (
SELECT g.request_id
FROM agent_experience_events g
JOIN agent_experience_events r ON r.request_id=g.request_id
WHERE g.source='gateway' AND r.source='router' AND g.ts > now()-interval '10 minutes'
) x;")
echo "post_rows_join=$post_rows_join"
# expected: join remains non-zero for normal webhook traffic
```
## PASS Criteria
- Strict `delta_rows == N_webhooks` for replay batch.
- Exactly one `source='gateway'` row per webhook call.
- `request_id` present on all new rows.
- Early-return reasons are deterministic (`unsupported_no_message`, `source_lock_duplicate_update`).
- Metrics counters for early-return/finalize are incrementing.
- Normal gateway<->router join remains healthy.

View File

@@ -0,0 +1,136 @@
# Phase-5 Anti-Silent / Group UX
## Goal
Reduce user-facing silent outcomes in group/private chat flows while avoiding spam.
## Invariants
- `I1`: no silent user-facing failure for `public_active` agents (`SILENT`/early-return -> short ACK).
- `I2`: one-message rule per webhook in group chats.
- `I3`: debounce by `(chat_id, agent_id, reason)` with cooldown.
- `I4`: evidence in gateway event (`anti_silent_action`, `anti_silent_template`, `chat_type`).
## Deploy
```bash
cd /opt/microdao-daarion
docker compose -f docker-compose.node1.yml up -d --no-deps --build --force-recreate gateway
```
## Seed / Precheck
```bash
export GATEWAY_WEBHOOK_URL='http://127.0.0.1:9300/agromatrix/telegram/webhook'
export PG_CONTAINER='dagi-postgres'
pre_rows_gateway=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \
"SELECT count(*) FROM agent_experience_events WHERE source='gateway' AND ts > now()-interval '10 minutes';")
pre_rows_join=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \
"SELECT count(*) FROM (
SELECT g.request_id
FROM agent_experience_events g
JOIN agent_experience_events r ON r.request_id=g.request_id
WHERE g.source='gateway' AND r.source='router' AND g.ts > now()-interval '10 minutes'
) x;")
pre_js=$(curl -sS 'http://127.0.0.1:8222/jsz?streams=true' | python3 -c '
import json,sys
j=json.load(sys.stdin)
d=(j.get("account_details") or [{}])[0].get("stream_detail") or []
print(next((s.get("state",{}).get("messages",0) for s in d if s.get("name")=="EXPERIENCE"),0))
')
echo "pre_rows_gateway=$pre_rows_gateway"
echo "pre_rows_join=$pre_rows_join"
echo "pre_js=$pre_js"
```
## Fixed Payload Replay
Payloads:
- `docs/ops/payloads/phase5_payload_group_unsupported_no_message.json`
- `docs/ops/payloads/phase5_payload_group_source_lock.json`
- `docs/ops/payloads/phase5_payload_private_photo_unsupported.json`
```bash
# 1) group unsupported_no_message
curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \
-H 'content-type: application/json' \
-d @docs/ops/payloads/phase5_payload_group_unsupported_no_message.json
# 2) group source_lock pair (same update_id)
curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \
-H 'content-type: application/json' \
-d @docs/ops/payloads/phase5_payload_group_source_lock.json
curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \
-H 'content-type: application/json' \
-d @docs/ops/payloads/phase5_payload_group_source_lock.json
# 3) private photo unsupported (photo follow-up without image context)
curl -sS -X POST "$GATEWAY_WEBHOOK_URL" \
-H 'content-type: application/json' \
-d @docs/ops/payloads/phase5_payload_private_photo_unsupported.json
```
## Assertions
### A) Strict row delta
```bash
post_rows_gateway=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \
"SELECT count(*) FROM agent_experience_events WHERE source='gateway' AND ts > now()-interval '10 minutes';")
delta_rows=$((post_rows_gateway-pre_rows_gateway))
echo "delta_rows=$delta_rows"
# expected: delta_rows == 4
```
### B) Anti-silent evidence in DB
```bash
docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -P pager=off -c "
SELECT request_id,
raw->>'chat_type' as chat_type,
raw->'policy'->>'sowa_decision' as sowa,
raw->'policy'->>'reason' as reason,
raw->>'anti_silent_action' as anti_silent_action,
raw->>'anti_silent_template' as anti_silent_template,
raw->'result'->>'http_status' as http_status,
ts
FROM agent_experience_events
WHERE source='gateway' AND agent_id='agromatrix' AND ts > now()-interval '10 minutes'
ORDER BY ts DESC LIMIT 25;
"
# expected:
# - reason=unsupported_no_message with sowa=UNKNOWN (group path)
# - reason=source_lock_duplicate_update with anti_silent_action in (ACK_EMITTED, ACK_SUPPRESSED_COOLDOWN)
# - reason=photo_followup_without_image_context with anti_silent_action=ACK_EMITTED
```
### C) Metrics
```bash
curl -sS http://127.0.0.1:9300/metrics | grep -E \
'gateway_anti_silent_total|gateway_ack_sent_total|gateway_experience_emitted_total|gateway_early_return_total'
# expected:
# - gateway_anti_silent_total{action="ACK_EMITTED",reason="...",chat_type="group|private"} increments
# - source-lock repeated request may produce ACK_SUPPRESSED_COOLDOWN depending on timing
```
### D) Join sanity (normal path still healthy)
```bash
post_rows_join=$(docker exec "$PG_CONTAINER" psql -U daarion -d daarion_memory -tAc \
"SELECT count(*) FROM (
SELECT g.request_id
FROM agent_experience_events g
JOIN agent_experience_events r ON r.request_id=g.request_id
WHERE g.source='gateway' AND r.source='router' AND g.ts > now()-interval '10 minutes'
) x;")
echo "post_rows_join=$post_rows_join"
# expected: non-zero with normal traffic
```
## PASS Criteria
- `delta_rows == 4` for fixed replay batch.
- Deterministic reason codes present (`unsupported_no_message`, `source_lock_duplicate_update`, `photo_followup_without_image_context`).
- `anti_silent_action` is present for anti-silent branches (`ACK_EMITTED` or `ACK_SUPPRESSED_COOLDOWN`).
- No evidence of double-event for one webhook request.

View File

@@ -28,7 +28,8 @@ RUN pip install --no-cache-dir \
pandas \
openpyxl \
python-docx \
redis==5.0.1
redis==5.0.1 \
asyncpg>=0.29.0
# Copy gateway code and DAARWIZZ prompt
COPY . .

View File

@@ -1,5 +1,7 @@
PyYAML>=6.0
prometheus-client>=0.20.0
PyPDF2>=3.0.0
nats-py>=2.6.0
asyncpg>=0.29.0
crewai

View File

@@ -75,6 +75,10 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]:
system_prompt = body.get("system_prompt") or context.get("system_prompt")
system_prompt = _apply_agent_style_guardrails(agent_id, system_prompt)
system_prompt = _apply_runtime_communication_guardrails(system_prompt, metadata)
request_id = str(metadata.get("request_id") or metadata.get("trace_id") or "").strip()
if request_id:
metadata["request_id"] = request_id
metadata["trace_id"] = request_id
if system_prompt:
logger.info(f"Using system prompt ({len(system_prompt)} chars) for agent {agent_id}")
@@ -124,7 +128,8 @@ async def send_to_router(body: Dict[str, Any]) -> Dict[str, Any]:
try:
async with httpx.AsyncClient(timeout=ROUTER_TIMEOUT) as client:
response = await client.post(infer_url, json=infer_body)
headers = {"X-Request-Id": request_id} if request_id else None
response = await client.post(infer_url, json=infer_body, headers=headers)
response.raise_for_status()
result = response.json()

View File

@@ -0,0 +1,38 @@
-- Phase-1: Router Experience Bus event store (append-only).
-- One row per inference event; producers are the router (Phase-1) and the
-- gateway (Phase-4), distinguished by `source`.
CREATE TABLE IF NOT EXISTS agent_experience_events (
    id BIGSERIAL PRIMARY KEY,
    event_id UUID NOT NULL UNIQUE,          -- producer-generated id; UNIQUE makes ingest idempotent
    ts TIMESTAMPTZ NOT NULL,                -- event time (producer clock)
    node_id TEXT NOT NULL,
    source TEXT NOT NULL,                   -- emitting component, e.g. 'router' or 'gateway'
    agent_id TEXT NOT NULL,
    task_type TEXT NOT NULL,
    request_id TEXT NULL,                   -- correlation id shared by gateway and router rows for one turn
    channel TEXT NOT NULL DEFAULT 'unknown',
    inputs_hash TEXT NOT NULL,              -- prompt/input fingerprint used for dedup windows
    provider TEXT NOT NULL,
    model TEXT NOT NULL,
    profile TEXT NULL,
    latency_ms INT NOT NULL,
    tokens_in INT NULL,
    tokens_out INT NULL,
    ok BOOLEAN NOT NULL,
    error_class TEXT NULL,
    error_msg_redacted TEXT NULL,           -- sanitized message only; never raw user text
    http_status INT NOT NULL,
    raw JSONB NOT NULL,                     -- full event payload (policy/feedback fields live here)
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_agent_experience_events_agent_ts
    ON agent_experience_events (agent_id, ts DESC);
CREATE INDEX IF NOT EXISTS idx_agent_experience_events_task_ts
    ON agent_experience_events (task_type, ts DESC);
CREATE INDEX IF NOT EXISTS idx_agent_experience_events_hash_ts
    ON agent_experience_events (inputs_hash, ts DESC);
CREATE INDEX IF NOT EXISTS idx_agent_experience_events_ok_ts
    ON agent_experience_events (ok, ts DESC);
-- The Phase-4 runbooks self-join gateway/router rows on request_id; without an
-- index that join degrades to seq scans as the table grows. Partial index skips
-- rows with no correlation id.
CREATE INDEX IF NOT EXISTS idx_agent_experience_events_request_id
    ON agent_experience_events (request_id)
    WHERE request_id IS NOT NULL;
-- Runbook verification queries filter `WHERE source='gateway' AND ts > now()-...`.
CREATE INDEX IF NOT EXISTS idx_agent_experience_events_source_ts
    ON agent_experience_events (source, ts DESC);

View File

@@ -0,0 +1,27 @@
-- Phase-2: Experience Learner lessons store (append-only)
-- One row per aggregated lesson extracted from EXPERIENCE stream events.
CREATE TABLE IF NOT EXISTS agent_lessons (
id BIGSERIAL PRIMARY KEY,
lesson_id UUID NOT NULL UNIQUE,
lesson_key TEXT NOT NULL UNIQUE, -- idempotency key: redelivery/reprocessing must not duplicate lessons
ts TIMESTAMPTZ NOT NULL,
scope TEXT NOT NULL, -- e.g. 'agent' (per Phase-3 seed); other scopes TBD — confirm with learner code
agent_id TEXT NULL, -- NULL presumably means a non-agent-scoped lesson — verify against learner
task_type TEXT NOT NULL,
trigger TEXT NOT NULL, -- human-readable condition ("when retrying after model timeout")
action TEXT NOT NULL, -- recommended action text injected by Phase-3 retrieval
avoid TEXT NOT NULL, -- anti-pattern text paired with the action
signals JSONB NOT NULL, -- structured evidence signals (error_class/provider/model/profile)
evidence JSONB NOT NULL, -- aggregate counts backing the lesson
raw JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Retrieval paths (Phase-3) read recent lessons by scope/agent/task, newest first.
CREATE INDEX IF NOT EXISTS idx_agent_lessons_scope_ts
ON agent_lessons (scope, ts DESC);
CREATE INDEX IF NOT EXISTS idx_agent_lessons_agent_ts
ON agent_lessons (agent_id, ts DESC);
CREATE INDEX IF NOT EXISTS idx_agent_lessons_task_ts
ON agent_lessons (task_type, ts DESC);

View File

@@ -0,0 +1,12 @@
# Experience Learner (Phase-2) image: single-module FastAPI service on :9109.
FROM python:3.11-slim
WORKDIR /app
# Copy requirements separately so the pip layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Service is a single module exposing a FastAPI `app` (matches "main:app" below).
COPY main.py .
EXPOSE 9109
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "9109"]

View File

@@ -0,0 +1,839 @@
from __future__ import annotations
import asyncio
import contextlib
import hashlib
import json
import logging
import os
import random
import re
import time
import uuid
from collections import OrderedDict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Deque, Dict, Optional, Tuple
import asyncpg
import nats
from fastapi import FastAPI, Response
from nats.aio.msg import Msg
from nats.js.api import AckPolicy, ConsumerConfig, DeliverPolicy
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO").upper(),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("experience_learner")
LESSONS_EXTRACTED = Counter(
"lessons_extracted_total",
"Total lessons extracted from experience events",
["status"],
)
LESSONS_INSERT = Counter(
"lessons_insert_total",
"Total lesson insert attempts",
["status"],
)
JS_MESSAGES_ACKED = Counter(
"js_messages_acked_total",
"Total JetStream messages acked by learner",
)
JS_MESSAGES_REDELIVERED = Counter(
"js_messages_redelivered_total",
"Total redelivered JetStream messages observed by learner",
)
EVENTS_SELECTED = Counter(
"experience_learner_events_selected_total",
"Events selected for learner processing",
["reason"],
)
EVENTS_DROPPED = Counter(
"experience_learner_events_dropped_total",
"Events dropped by learner filtering/dedup",
["reason"],
)
LESSON_PUBLISH = Counter(
"experience_learner_lessons_published_total",
"Lesson publish attempts to JetStream",
["status"],
)
ANTI_SILENT_TUNING_EVALUATED = Counter(
"experience_learner_anti_silent_tuning_evaluated_total",
"Anti-silent tuning lesson generation evaluations",
["status"],
)
CONSUMER_RUNNING = Gauge(
"experience_learner_consumer_running",
"1 when learner consumer loop is running",
)
@dataclass
class EventSample:
    """One experience event reduced to the fields sliding-window stats need."""

    # Capture time from time.monotonic(); used only for window pruning.
    ts_mono: float
    # Truthiness of result.ok on the source event.
    ok: bool
    # llm.latency_ms from the source event (0 when absent).
    latency_ms: int
class ExperienceLearner:
    """Distills agent experience events from JetStream into durable lessons.

    Pipeline: pull-subscribe on the EXPERIENCE stream -> per-event dedup by
    event_id -> keep/drop filtering -> sliding-window aggregation per bucket ->
    insert into the agent_lessons table (Postgres) and, optionally, re-publish
    each stored lesson to a lesson subject for downstream consumers.
    """

    def __init__(self) -> None:
        """Read all configuration from environment variables.

        Raises:
            RuntimeError: when no Postgres DSN is configured.
        """
        # Identity / transport configuration.
        self.node_id = os.getenv("NODE_ID", "NODA1")
        self.nats_url = os.getenv("NATS_URL", "nats://nats:4222")
        self.stream_name = os.getenv("EXPERIENCE_STREAM_NAME", "EXPERIENCE")
        self.subject = os.getenv("EXPERIENCE_SUBJECT", "agent.experience.v1.>")
        self.lesson_subject = os.getenv("LESSON_SUBJECT", "agent.lesson.v1")
        self.durable = os.getenv("EXPERIENCE_DURABLE", "experience-learner-v1")
        self.deliver_policy = os.getenv("EXPERIENCE_DELIVER_POLICY", "all").lower()
        self.ack_wait_s = float(os.getenv("EXPERIENCE_ACK_WAIT_SECONDS", "30"))
        self.max_deliver = int(os.getenv("EXPERIENCE_MAX_DELIVER", "20"))
        self.fetch_batch = int(os.getenv("EXPERIENCE_FETCH_BATCH", "64"))
        self.fetch_timeout_s = float(os.getenv("EXPERIENCE_FETCH_TIMEOUT_SECONDS", "2"))
        # Selection / aggregation thresholds.
        self.window_s = int(os.getenv("EXPERIENCE_WINDOW_SECONDS", "1800"))
        self.ok_sample_pct = float(os.getenv("EXPERIENCE_OK_SAMPLE_PCT", "10"))
        self.latency_spike_ms = int(os.getenv("EXPERIENCE_LATENCY_SPIKE_MS", "5000"))
        self.error_threshold = int(os.getenv("EXPERIENCE_ERROR_THRESHOLD", "3"))
        self.silent_threshold = int(os.getenv("EXPERIENCE_SILENT_THRESHOLD", "5"))
        self.latency_threshold = int(os.getenv("EXPERIENCE_LATENCY_THRESHOLD", "3"))
        self.event_dedup_ttl_s = int(os.getenv("EXPERIENCE_EVENT_DEDUP_TTL_SECONDS", "3600"))
        self.event_dedup_max = int(os.getenv("EXPERIENCE_EVENT_DEDUP_MAX", "100000"))
        self.publish_lessons = os.getenv("LESSON_PUBLISH_ENABLED", "true").lower() in {"1", "true", "yes"}
        # Anti-silent tuning knobs: weights/score are clamped into [0, 1],
        # day/count knobs forced to at least 1.
        self.anti_silent_tuning_enabled = os.getenv("ANTI_SILENT_TUNING_ENABLED", "true").lower() in {"1", "true", "yes"}
        self.anti_silent_window_days = max(1, int(os.getenv("ANTI_SILENT_TUNING_WINDOW_DAYS", "7")))
        self.anti_silent_min_evidence = max(1, int(os.getenv("ANTI_SILENT_TUNING_MIN_EVIDENCE", "20")))
        self.anti_silent_min_score = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_MIN_SCORE", "0.75"))))
        self.anti_silent_weight_retry = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_WEIGHT_RETRY", "0.6"))))
        self.anti_silent_weight_negative = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_WEIGHT_NEGATIVE", "0.3"))))
        self.anti_silent_weight_suppressed = max(0.0, min(1.0, float(os.getenv("ANTI_SILENT_TUNING_WEIGHT_SUPPRESSED", "0.1"))))
        self.anti_silent_ttl_days = max(1, int(os.getenv("ANTI_SILENT_TUNING_TTL_DAYS", "7")))
        # Postgres DSN: first non-empty of the three env vars wins.
        self.db_dsn = (
            os.getenv("LEARNER_DATABASE_URL")
            or os.getenv("EXPERIENCE_DATABASE_URL")
            or os.getenv("DATABASE_URL")
        )
        if not self.db_dsn:
            raise RuntimeError("LEARNER_DATABASE_URL (or EXPERIENCE_DATABASE_URL/DATABASE_URL) is required")
        # Runtime state, populated by start().
        self._running = False
        self._task: Optional[asyncio.Task[Any]] = None
        self._nc = None
        self._js = None
        self._sub = None
        self._pool: Optional[asyncpg.Pool] = None
        # event_id -> monotonic timestamp; insertion-ordered for TTL/size pruning.
        self._seen_events: "OrderedDict[str, float]" = OrderedDict()
        # bucket key -> recent samples inside the sliding window.
        self._buckets: Dict[str, Deque[EventSample]] = {}
        self._lock = asyncio.Lock()

    async def start(self) -> None:
        """Connect to Postgres and NATS, ensure the consumer, spawn the loop.

        Idempotent: a second call while running is a no-op.
        """
        if self._running:
            return
        self._pool = await asyncpg.create_pool(self.db_dsn, min_size=1, max_size=4)
        self._nc = await nats.connect(self.nats_url)
        self._js = self._nc.jetstream()
        await self._ensure_consumer()
        self._sub = await self._js.pull_subscribe(
            self.subject,
            durable=self.durable,
            stream=self.stream_name,
        )
        self._running = True
        CONSUMER_RUNNING.set(1)
        self._task = asyncio.create_task(self._consume_loop(), name="experience-learner")
        logger.info(
            "experience-learner started stream=%s subject=%s durable=%s",
            self.stream_name,
            self.subject,
            self.durable,
        )

    async def stop(self) -> None:
        """Cancel the consume loop and close NATS/DB resources."""
        self._running = False
        CONSUMER_RUNNING.set(0)
        if self._task:
            self._task.cancel()
            # Swallow CancelledError (and anything else) from the dying task.
            with contextlib.suppress(Exception):
                await self._task
            self._task = None
        if self._nc:
            await self._nc.close()
            self._nc = None
            self._js = None
            self._sub = None
        if self._pool:
            await self._pool.close()
            self._pool = None

    async def _ensure_consumer(self) -> None:
        """Create the durable pull consumer; tolerate it already existing."""
        if self._js is None:
            return
        # Anything other than "all" falls back to NEW (start at new messages).
        deliver_policy = DeliverPolicy.ALL if self.deliver_policy == "all" else DeliverPolicy.NEW
        cfg = ConsumerConfig(
            durable_name=self.durable,
            ack_policy=AckPolicy.EXPLICIT,
            ack_wait=self.ack_wait_s,
            max_deliver=self.max_deliver,
            deliver_policy=deliver_policy,
            filter_subject=self.subject,
        )
        try:
            await self._js.add_consumer(self.stream_name, config=cfg)
            logger.info("consumer created durable=%s stream=%s", self.durable, self.stream_name)
        except Exception as exc:
            # The server reports "already exists" with different wordings
            # across versions; match both known phrasings.
            msg = str(exc).lower()
            if "consumer name already in use" in msg or "consumer already exists" in msg:
                logger.info("consumer exists durable=%s stream=%s", self.durable, self.stream_name)
            else:
                raise

    async def _consume_loop(self) -> None:
        """Fetch batches from the pull subscription until stop() flips the flag.

        Fetch timeouts are expected idle behavior; other fetch errors back off
        for one second before retrying.
        """
        assert self._sub is not None
        while self._running:
            try:
                msgs = await self._sub.fetch(self.fetch_batch, timeout=self.fetch_timeout_s)
            except asyncio.TimeoutError:
                continue
            except Exception as exc:
                logger.warning("fetch failed: %s", exc)
                await asyncio.sleep(1.0)
                continue
            for msg in msgs:
                await self._handle_msg(msg)

    async def _handle_msg(self, msg: Msg) -> None:
        """Process one JetStream message; ack on every terminal path.

        Unexpected exceptions increment the err counter and nak the message so
        JetStream redelivers it (bounded by max_deliver on the consumer).
        """
        try:
            metadata = getattr(msg, "metadata", None)
            if metadata is not None and getattr(metadata, "num_delivered", 1) > 1:
                JS_MESSAGES_REDELIVERED.inc()
            event = json.loads(msg.data.decode("utf-8", errors="replace"))
            if not isinstance(event, dict):
                EVENTS_DROPPED.labels(reason="invalid_payload").inc()
                await msg.ack()
                JS_MESSAGES_ACKED.inc()
                return
            event_id = str(event.get("event_id") or "").strip()
            # Events without an event_id bypass dedup entirely.
            if event_id and await self._seen_event(event_id):
                EVENTS_DROPPED.labels(reason="event_dedup").inc()
                await msg.ack()
                JS_MESSAGES_ACKED.inc()
                return
            keep, reason = self._should_keep(event)
            if not keep:
                EVENTS_DROPPED.labels(reason=reason).inc()
                await msg.ack()
                JS_MESSAGES_ACKED.inc()
                return
            EVENTS_SELECTED.labels(reason=reason).inc()
            lessons = await self._extract_lessons(event)
            if not lessons:
                await msg.ack()
                JS_MESSAGES_ACKED.inc()
                return
            for lesson in lessons:
                LESSONS_EXTRACTED.labels(status="ok").inc()
                insert_status = await self._insert_lesson(lesson)
                LESSONS_INSERT.labels(status=insert_status).inc()
                # Only lessons newly written to the DB are re-published.
                if insert_status == "ok" and self.publish_lessons:
                    await self._publish_lesson(lesson)
            await msg.ack()
            JS_MESSAGES_ACKED.inc()
        except Exception as exc:
            LESSONS_EXTRACTED.labels(status="err").inc()
            logger.exception("message handling failed: %s", exc)
            with contextlib.suppress(Exception):
                await msg.nak()

    async def _seen_event(self, event_id: str) -> bool:
        """Return True if event_id was seen within the TTL; record it otherwise.

        The map is LRU-ordered and bounded by event_dedup_max entries.
        """
        now = time.monotonic()
        async with self._lock:
            self._prune_seen(now)
            seen_ts = self._seen_events.get(event_id)
            if seen_ts is not None and (now - seen_ts) < self.event_dedup_ttl_s:
                return True
            self._seen_events[event_id] = now
            self._seen_events.move_to_end(event_id, last=True)
            while len(self._seen_events) > self.event_dedup_max:
                self._seen_events.popitem(last=False)
            return False

    def _prune_seen(self, now: float) -> None:
        """Drop dedup entries older than the TTL from the front of the map."""
        threshold = now - self.event_dedup_ttl_s
        while self._seen_events:
            _, ts = next(iter(self._seen_events.items()))
            if ts >= threshold:
                break
            self._seen_events.popitem(last=False)

    def _should_keep(self, event: Dict[str, Any]) -> Tuple[bool, str]:
        """Decide whether to process an event; returns (keep, reason).

        Priority order: errors > anti-silent gateway signals > SILENT policy
        outcomes > HTTP 5xx > latency spikes > random ok-sampling.
        """
        result = event.get("result") or {}
        llm = event.get("llm") or {}
        policy = event.get("policy") or {}
        ok = bool(result.get("ok"))
        status = _as_int(result.get("http_status"), 0)
        latency_ms = _as_int(llm.get("latency_ms"), 0)
        sowa_decision = str(policy.get("sowa_decision") or "").upper()
        if not ok:
            return True, "error"
        if self._is_anti_silent_gateway_event(event):
            return True, "anti_silent_signal"
        if sowa_decision == "SILENT":
            return True, "silent"
        if status >= 500:
            return True, "http_5xx"
        if latency_ms >= self.latency_spike_ms:
            return True, "latency_spike"
        # Keep a random ok_sample_pct percent of otherwise-healthy events.
        if random.random() * 100.0 < self.ok_sample_pct:
            return True, "ok_sample_in"
        return False, "ok_sample_out"

    def _is_anti_silent_gateway_event(self, event: Dict[str, Any]) -> bool:
        """True for gateway events that carry a usable anti-silent ACK signal."""
        if not self.anti_silent_tuning_enabled:
            return False
        if str(event.get("source") or "").strip().lower() != "gateway":
            return False
        action = str(event.get("anti_silent_action") or "").strip().upper()
        if action not in {"ACK_EMITTED", "ACK_SUPPRESSED_COOLDOWN"}:
            return False
        # Both a policy reason and a template id are required for tuning.
        reason = str((event.get("policy") or {}).get("reason") or "").strip()
        template_id = str(event.get("anti_silent_template") or "").strip()
        return bool(reason and template_id)

    async def _extract_lessons(self, event: Dict[str, Any]) -> list[Dict[str, Any]]:
        """Collect all lessons derivable from one event (tuning + operational)."""
        lessons: list[Dict[str, Any]] = []
        tuning_lesson = await self._try_extract_anti_silent_tuning_lesson(event)
        if tuning_lesson is not None:
            lessons.append(tuning_lesson)
        operational_lesson = await self._try_extract_lesson(event)
        if operational_lesson is not None:
            lessons.append(operational_lesson)
        return lessons

    async def _try_extract_lesson(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Update window buckets for the event; emit at most one lesson.

        Returns the first category whose bucket count reaches its threshold,
        or None when no category qualifies.
        """
        categories = self._lesson_categories(event)
        if not categories:
            return None
        for category in categories:
            bucket_key = self._bucket_key(category, event)
            count, ok_rate, p95_latency = self._update_bucket(bucket_key, event)
            threshold = self._threshold_for(category)
            if count < threshold:
                continue
            lesson = self._build_lesson(
                category=category,
                event=event,
                count=count,
                ok_rate=ok_rate,
                p95_latency=p95_latency,
            )
            return lesson
        return None

    async def _try_extract_anti_silent_tuning_lesson(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Build an anti-silent template-preference lesson from DB statistics.

        Aggregates historical gateway ACK outcomes for the event's
        (policy reason, chat type) pair, scores each ACK template, and emits
        a 'prefer/avoid template' lesson when the best score clears the
        configured minimum. Every early exit increments a status label on
        ANTI_SILENT_TUNING_EVALUATED.
        """
        if not self._is_anti_silent_gateway_event(event):
            return None
        if self._pool is None:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="pool_missing").inc()
            return None
        policy = event.get("policy") or {}
        reason = _safe_token(policy.get("reason"))
        chat_type = _safe_token(event.get("chat_type"))
        if not reason or not chat_type:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="missing_fields").inc()
            return None
        stats = await self._anti_silent_stats(reason=reason, chat_type=chat_type)
        if not stats:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="no_data").inc()
            return None
        candidates = [item for item in stats if int(item.get("n") or 0) >= self.anti_silent_min_evidence]
        if not candidates:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="insufficient_evidence").inc()
            return None
        # Best: highest score, ties broken by larger sample size.
        best = max(candidates, key=lambda item: (float(item.get("score", 0.0)), int(item.get("n", 0))))
        best_score = float(best.get("score", 0.0))
        if best_score < self.anti_silent_min_score:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="below_score").inc()
            return None
        # Worst: lowest score, ties broken by larger sample size.
        worst = min(candidates, key=lambda item: (float(item.get("score", 0.0)), -int(item.get("n", 0))))
        best_template = str(best.get("template_id") or "").strip().upper()
        if not best_template:
            ANTI_SILENT_TUNING_EVALUATED.labels(status="bad_template").inc()
            return None
        trigger = f"reason={reason};chat_type={chat_type}"
        action = f"prefer_template={best_template}"
        avoid = ""
        worst_template = str(worst.get("template_id") or "").strip().upper()
        if worst_template and worst_template != best_template:
            avoid = f"avoid_template={worst_template}"
        if not avoid:
            avoid = "avoid_template=none"
        lesson_type = "anti_silent_tuning"
        # Deterministic key so repeats of the same recommendation upsert
        # rather than duplicate (see _insert_lesson tuning path).
        lesson_key_raw = "|".join([lesson_type, trigger, action])
        lesson_key = hashlib.sha256(lesson_key_raw.encode("utf-8")).hexdigest()
        now_dt = datetime.now(timezone.utc)
        expires_at = (now_dt + timedelta(days=self.anti_silent_ttl_days)).isoformat().replace("+00:00", "Z")
        evidence = {
            "n_best": int(best.get("n") or 0),
            "score_best": round(best_score, 6),
            "retry_rate": round(float(best.get("retry_rate", 0.0)), 6),
            "negative_rate": round(float(best.get("negative_rate", 0.0)), 6),
            "suppressed_rate": round(float(best.get("suppressed_rate", 0.0)), 6),
            "window_days": self.anti_silent_window_days,
            "weights": {
                "retry": self.anti_silent_weight_retry,
                "negative": self.anti_silent_weight_negative,
                "suppressed": self.anti_silent_weight_suppressed,
            },
            "candidates": stats,
        }
        signals = {
            "policy_reason": reason,
            "chat_type": chat_type,
            "lesson_type": lesson_type,
            "trigger_kind": "anti_silent_ack_template",
        }
        lesson: Dict[str, Any] = {
            "lesson_id": str(uuid.uuid4()),
            "lesson_key": lesson_key,
            "lesson_type": lesson_type,
            "ts": now_dt.isoformat().replace("+00:00", "Z"),
            "expires_at": expires_at,
            "scope": "global",
            "agent_id": None,
            "task_type": "webhook",
            "trigger": trigger,
            "action": action,
            "avoid": avoid,
            "signals": signals,
            "evidence": evidence,
        }
        # Snapshot of the lesson (without this key) kept under "raw".
        lesson["raw"] = dict(lesson)
        ANTI_SILENT_TUNING_EVALUATED.labels(status="ok").inc()
        return lesson

    async def _anti_silent_stats(self, *, reason: str, chat_type: str) -> list[Dict[str, Any]]:
        """Score ACK templates for (reason, chat_type) over the recent window.

        Score is 1 minus the weighted sum of retry/negative/suppressed rates,
        clamped to [0, 1]. Query failures degrade to an empty list.
        """
        if self._pool is None:
            return []
        query = """
            SELECT
                COALESCE(raw->>'anti_silent_template', '') AS template_id,
                COUNT(*)::int AS n,
                AVG(
                    CASE
                        WHEN COALESCE(raw->'feedback'->>'user_signal', 'none') = 'retry' THEN 1.0
                        ELSE 0.0
                    END
                )::float8 AS retry_rate,
                AVG(
                    CASE
                        WHEN COALESCE(raw->'feedback'->>'user_signal', 'none') = 'negative' THEN 1.0
                        ELSE 0.0
                    END
                )::float8 AS negative_rate,
                AVG(
                    CASE
                        WHEN COALESCE(raw->>'anti_silent_action', '') = 'ACK_SUPPRESSED_COOLDOWN' THEN 1.0
                        ELSE 0.0
                    END
                )::float8 AS suppressed_rate
            FROM agent_experience_events
            WHERE source = 'gateway'
              AND ts >= (now() - ($1::int * interval '1 day'))
              AND COALESCE(raw->'policy'->>'reason', '') = $2
              AND COALESCE(raw->>'chat_type', 'unknown') = $3
              AND COALESCE(raw->>'anti_silent_action', '') IN ('ACK_EMITTED', 'ACK_SUPPRESSED_COOLDOWN')
              AND COALESCE(raw->>'anti_silent_template', '') <> ''
            GROUP BY 1
            HAVING COUNT(*) >= $4
        """
        try:
            async with self._pool.acquire() as conn:
                rows = await conn.fetch(
                    query,
                    self.anti_silent_window_days,
                    reason,
                    chat_type,
                    self.anti_silent_min_evidence,
                )
        except Exception as exc:
            logger.warning("anti-silent stats query failed: %s", exc)
            return []
        results: list[Dict[str, Any]] = []
        for row in rows:
            template_id = str(row.get("template_id") or "").strip().upper()
            if not template_id:
                continue
            n = int(row.get("n") or 0)
            retry_rate = float(row.get("retry_rate") or 0.0)
            negative_rate = float(row.get("negative_rate") or 0.0)
            suppressed_rate = float(row.get("suppressed_rate") or 0.0)
            score = 1.0 - (
                self.anti_silent_weight_retry * retry_rate
                + self.anti_silent_weight_negative * negative_rate
                + self.anti_silent_weight_suppressed * suppressed_rate
            )
            score = max(0.0, min(1.0, score))
            results.append(
                {
                    "template_id": template_id,
                    "n": n,
                    "retry_rate": retry_rate,
                    "negative_rate": negative_rate,
                    "suppressed_rate": suppressed_rate,
                    "score": score,
                }
            )
        return results

    def _lesson_categories(self, event: Dict[str, Any]) -> list[str]:
        """Classify the event; an event may match several categories."""
        result = event.get("result") or {}
        llm = event.get("llm") or {}
        policy = event.get("policy") or {}
        categories: list[str] = []
        if not bool(result.get("ok")):
            categories.append("error_repeat")
        if str(policy.get("sowa_decision") or "").upper() == "SILENT":
            categories.append("silent_repeat")
        if _as_int(llm.get("latency_ms"), 0) >= self.latency_spike_ms:
            categories.append("latency_spike")
        return categories

    def _bucket_key(self, category: str, event: Dict[str, Any]) -> str:
        """Build a stable aggregation key for (category, agent/route pattern)."""
        llm = event.get("llm") or {}
        result = event.get("result") or {}
        policy = event.get("policy") or {}
        parts = [
            category,
            str(event.get("agent_id") or ""),
            str(event.get("task_type") or "infer"),
            str(result.get("error_class") or ""),
            str(policy.get("reason") or ""),
            str(llm.get("provider") or ""),
            str(llm.get("model") or ""),
            str(llm.get("profile") or ""),
        ]
        return "|".join(parts)

    def _update_bucket(self, bucket_key: str, event: Dict[str, Any]) -> Tuple[int, Optional[float], Optional[int]]:
        """Append the event sample and return (count, ok_rate, p95_latency_ms).

        Samples older than window_s are dropped from the front of the deque.
        NOTE(review): keys are only pruned on access, so buckets that stop
        receiving events retain their last samples indefinitely.
        """
        now = time.monotonic()
        llm = event.get("llm") or {}
        result = event.get("result") or {}
        sample = EventSample(
            ts_mono=now,
            ok=bool(result.get("ok")),
            latency_ms=_as_int(llm.get("latency_ms"), 0),
        )
        bucket = self._buckets.get(bucket_key)
        if bucket is None:
            bucket = deque()
            self._buckets[bucket_key] = bucket
        bucket.append(sample)
        cutoff = now - self.window_s
        while bucket and bucket[0].ts_mono < cutoff:
            bucket.popleft()
        if not bucket:
            return 0, None, None
        count = len(bucket)
        ok_count = sum(1 for item in bucket if item.ok)
        ok_rate = round(ok_count / count, 4) if count > 0 else None
        latencies = sorted(item.latency_ms for item in bucket)
        p95_latency = _p95(latencies)
        return count, ok_rate, p95_latency

    def _threshold_for(self, category: str) -> int:
        """Minimum bucket count before a lesson is emitted for the category."""
        if category == "error_repeat":
            return self.error_threshold
        if category == "silent_repeat":
            return self.silent_threshold
        return self.latency_threshold

    def _build_lesson(
        self,
        category: str,
        event: Dict[str, Any],
        count: int,
        ok_rate: Optional[float],
        p95_latency: Optional[int],
    ) -> Dict[str, Any]:
        """Materialize an operational lesson dict for DB insert / publish.

        lesson_key is a sha256 over the scope/agent/trigger/action/avoid/
        error/reason tuple so identical recommendations dedupe on insert.
        """
        llm = event.get("llm") or {}
        result = event.get("result") or {}
        policy = event.get("policy") or {}
        agent_id = str(event.get("agent_id") or "").strip() or None
        task_type = str(event.get("task_type") or "infer")
        error_class = _safe_token(result.get("error_class"))
        policy_reason = _safe_token(policy.get("reason"))
        sowa_decision = _safe_token(policy.get("sowa_decision"))
        # Category-specific advisory text.
        if category == "silent_repeat":
            trigger = "Frequent SILENT policy outcomes on active conversation flow."
            action = "Use short ACK/CHALLENGE clarification before silencing response."
            avoid = "Avoid immediate SILENT when user intent might target the agent."
        elif category == "latency_spike":
            trigger = "Repeated latency spikes above configured SLA threshold."
            action = "Prefer faster model/profile and reduce expensive tool rounds."
            avoid = "Avoid routing to slow provider/profile for same task pattern."
        else:
            trigger = f"Repeated inference failures of class '{error_class or 'unknown_error'}'."
            action = "Switch to stable provider/profile and constrain optional tool calls."
            avoid = "Avoid blind retries on the same failing route."
        scope = "agent" if agent_id else "global"
        lesson_key_raw = "|".join(
            [
                scope,
                str(agent_id or ""),
                trigger,
                action,
                avoid,
                str(error_class or ""),
                str(policy_reason or ""),
            ]
        )
        lesson_key = hashlib.sha256(lesson_key_raw.encode("utf-8")).hexdigest()
        lesson = {
            "lesson_id": str(uuid.uuid4()),
            "lesson_key": lesson_key,
            "ts": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "scope": scope,
            "agent_id": agent_id,
            "task_type": task_type,
            "trigger": trigger,
            "action": action,
            "avoid": avoid,
            "signals": {
                "policy_reason": policy_reason,
                "policy_decision": sowa_decision,
                "error_class": error_class,
                "provider": _safe_token(llm.get("provider")),
                "model": _safe_token(llm.get("model")),
                "profile": _safe_token(llm.get("profile")),
            },
            "evidence": {
                "count": count,
                "ok_rate": ok_rate,
                "p95_latency_ms": p95_latency,
            },
        }
        # Snapshot of the lesson (without this key) kept under "raw".
        lesson["raw"] = dict(lesson)
        return lesson

    async def _insert_lesson(self, lesson: Dict[str, Any]) -> str:
        """Persist a lesson; returns 'ok', 'conflict' (dup key), or 'err'.

        Operational lessons use insert-or-ignore on lesson_key; anti-silent
        tuning lessons upsert so the recommendation stays fresh.
        """
        if self._pool is None:
            return "err"
        query_insert = """
            INSERT INTO agent_lessons (
                lesson_id,
                lesson_key,
                ts,
                scope,
                agent_id,
                task_type,
                trigger,
                action,
                avoid,
                signals,
                evidence,
                raw
            ) VALUES (
                $1::uuid,
                $2,
                $3::timestamptz,
                $4,
                $5,
                $6,
                $7,
                $8,
                $9,
                $10::jsonb,
                $11::jsonb,
                $12::jsonb
            )
            ON CONFLICT (lesson_key) DO NOTHING
            RETURNING id
        """
        query_tuning_upsert = """
            INSERT INTO agent_lessons (
                lesson_id,
                lesson_key,
                ts,
                scope,
                agent_id,
                task_type,
                trigger,
                action,
                avoid,
                signals,
                evidence,
                raw
            ) VALUES (
                $1::uuid,
                $2,
                $3::timestamptz,
                $4,
                $5,
                $6,
                $7,
                $8,
                $9,
                $10::jsonb,
                $11::jsonb,
                $12::jsonb
            )
            ON CONFLICT (lesson_key) DO UPDATE SET
                ts = EXCLUDED.ts,
                scope = EXCLUDED.scope,
                agent_id = EXCLUDED.agent_id,
                task_type = EXCLUDED.task_type,
                trigger = EXCLUDED.trigger,
                action = EXCLUDED.action,
                avoid = EXCLUDED.avoid,
                signals = EXCLUDED.signals,
                evidence = EXCLUDED.evidence,
                raw = EXCLUDED.raw
            RETURNING id
        """
        try:
            lesson_id = uuid.UUID(str(lesson["lesson_id"]))
            ts_value = _as_timestamptz(lesson["ts"])
            lesson_type = str(lesson.get("lesson_type") or "").strip().lower()
            query = query_tuning_upsert if lesson_type == "anti_silent_tuning" else query_insert
            async with self._pool.acquire() as conn:
                row_id = await conn.fetchval(
                    query,
                    lesson_id,
                    lesson["lesson_key"],
                    ts_value,
                    lesson["scope"],
                    lesson.get("agent_id"),
                    lesson["task_type"],
                    lesson["trigger"],
                    lesson["action"],
                    lesson["avoid"],
                    json.dumps(lesson["signals"], ensure_ascii=False),
                    json.dumps(lesson["evidence"], ensure_ascii=False),
                    json.dumps(lesson, ensure_ascii=False),
                )
            # DO NOTHING yields no row on duplicate key.
            if row_id is None:
                return "conflict"
            return "ok"
        except Exception as exc:
            logger.warning("insert lesson failed: %s", exc)
            return "err"

    async def _publish_lesson(self, lesson: Dict[str, Any]) -> None:
        """Best-effort publish of the lesson to JetStream.

        lesson_id doubles as Nats-Msg-Id for server-side dedup; failures are
        logged and counted, never raised.
        """
        if self._js is None:
            LESSON_PUBLISH.labels(status="skipped").inc()
            return
        payload = json.dumps(lesson, ensure_ascii=False).encode("utf-8")
        headers = {"Nats-Msg-Id": str(lesson["lesson_id"])}
        try:
            await self._js.publish(self.lesson_subject, payload, headers=headers)
            LESSON_PUBLISH.labels(status="ok").inc()
        except Exception as exc:
            LESSON_PUBLISH.labels(status="err").inc()
            logger.warning("publish lesson failed: %s", exc)

    async def health(self) -> Dict[str, Any]:
        """Snapshot of learner state for the /health endpoint."""
        return {
            "status": "ok" if self._running else "starting",
            "node_id": self.node_id,
            "stream": self.stream_name,
            "subject": self.subject,
            "durable": self.durable,
            "nats_connected": self._nc is not None and self._nc.is_connected,
            "db_connected": self._pool is not None,
            "running": self._running,
        }
def _safe_token(value: Any) -> Optional[str]:
if value is None:
return None
text = str(value)
text = re.sub(r"(?i)bearer\s+[A-Za-z0-9._-]+", "bearer [redacted]", text)
text = re.sub(r"(?i)(api[_-]?key|token|password|secret)\s*[:=]\s*[^\s,;]+", r"\1=[redacted]", text)
text = re.sub(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", "[redacted-email]", text)
text = re.sub(r"https?://[^\s]+", "[redacted-url]", text)
text = re.sub(r"\s+", " ", text).strip()
return text[:180] if text else None
def _as_int(value: Any, default: int) -> int:
try:
return int(value)
except Exception:
return default
def _as_timestamptz(value: Any) -> datetime:
if isinstance(value, datetime):
return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)
try:
parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
except Exception:
return datetime.now(timezone.utc)
def _p95(sorted_values: list[int]) -> Optional[int]:
if not sorted_values:
return None
idx = int(round(0.95 * (len(sorted_values) - 1)))
return sorted_values[min(max(idx, 0), len(sorted_values) - 1)]
learner = ExperienceLearner()


@contextlib.asynccontextmanager
async def _lifespan(app: FastAPI):
    """Run the learner for the lifetime of the application.

    Replaces the deprecated @app.on_event("startup"/"shutdown") hooks with
    FastAPI's lifespan protocol; the finally-block guarantees stop() runs
    even if startup later fails mid-request handling.
    """
    await learner.start()
    try:
        yield
    finally:
        await learner.stop()


app = FastAPI(title="Experience Learner", lifespan=_lifespan)


@app.get("/health")
async def health() -> Dict[str, Any]:
    """Liveness/readiness snapshot taken from the learner."""
    return await learner.health()


@app.get("/metrics")
async def metrics() -> Response:
    """Prometheus exposition endpoint."""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

View File

@@ -0,0 +1,5 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
nats-py==2.6.0
asyncpg>=0.29.0
prometheus-client>=0.20.0

View File

@@ -443,6 +443,9 @@ class Database:
) -> Dict[str, Any]:
"""Create or update a user fact (isolated by agent_id)"""
import json
# Normalize NULL to empty string so ON CONFLICT matches (PostgreSQL: NULL != NULL in unique)
_agent_id = (agent_id or "").strip()
_team_id = team_id or ""
# Convert dict to JSON string for asyncpg JSONB
json_value = json.dumps(fact_value_json) if fact_value_json else None
@@ -457,7 +460,7 @@ class Database:
fact_value_json = EXCLUDED.fact_value_json,
updated_at = NOW()
RETURNING *
""", user_id, team_id, agent_id, fact_key, fact_value, json_value)
""", user_id, _team_id, _agent_id, fact_key, fact_value, json_value)
except asyncpg.exceptions.InvalidColumnReferenceError:
# Backward compatibility for DBs that only have UNIQUE(user_id, team_id, fact_key).
row = await conn.fetchrow("""
@@ -470,7 +473,7 @@ class Database:
fact_value_json = EXCLUDED.fact_value_json,
updated_at = NOW()
RETURNING *
""", user_id, team_id, agent_id, fact_key, fact_value, json_value)
""", user_id, _team_id, _agent_id, fact_key, fact_value, json_value)
return dict(row) if row else {}

View File

@@ -209,6 +209,57 @@ if PROMETHEUS_AVAILABLE:
registry=REGISTRY
)
# ==================== EXPERIENCE BUS METRICS ====================
EXPERIENCE_PUBLISHED = Counter(
'experience_published_total',
'Total experience events publish attempts',
['source', 'transport', 'status'], # transport: jetstream|core|none
registry=REGISTRY
)
EXPERIENCE_DB_INSERT = Counter(
'experience_db_insert_total',
'Total experience event DB insert attempts',
['source', 'status'], # status: ok|error|skipped
registry=REGISTRY
)
EXPERIENCE_DEDUP_DROPPED = Counter(
'experience_dedup_dropped_total',
'Total experience events dropped by dedup',
['source'],
registry=REGISTRY
)
EXPERIENCE_SAMPLED = Counter(
'experience_sampled_total',
'Total experience events sampled in/out',
['source', 'decision', 'reason'], # decision: in|out
registry=REGISTRY
)
LESSONS_RETRIEVED = Counter(
'lessons_retrieved_total',
'Total lessons retrieval attempts',
['status'], # status: ok|timeout|err
registry=REGISTRY
)
LESSONS_ATTACHED = Counter(
'lessons_attached_total',
'Total lessons attached buckets',
['count'], # count: 0|1-3|4-7
registry=REGISTRY
)
LESSONS_ATTACH_LATENCY = Histogram(
'lessons_attach_latency_ms',
'Lessons retrieval latency in milliseconds',
buckets=(1, 2, 5, 10, 25, 50, 100, 250, 500, 1000, 2500),
registry=REGISTRY
)
# ==================== METRIC HELPERS ====================
@@ -357,6 +408,61 @@ def track_agent_request(agent_id: str, operation: str):
return decorator
def inc_experience_published(source: str, transport: str, status: str) -> None:
    """Count one experience publish attempt; no-op without prometheus."""
    if not PROMETHEUS_AVAILABLE:
        return
    EXPERIENCE_PUBLISHED.labels(source=source, transport=transport, status=status).inc()
def inc_experience_db_insert(source: str, status: str) -> None:
    """Count one experience DB insert attempt; no-op without prometheus."""
    if not PROMETHEUS_AVAILABLE:
        return
    EXPERIENCE_DB_INSERT.labels(source=source, status=status).inc()
def inc_experience_dedup_dropped(source: str) -> None:
    """Count one event dropped by dedup; no-op without prometheus."""
    if not PROMETHEUS_AVAILABLE:
        return
    EXPERIENCE_DEDUP_DROPPED.labels(source=source).inc()
def inc_experience_sampled(source: str, decision: str, reason: str) -> None:
    """Count one sampling decision (decision: in|out); no-op without prometheus."""
    if not PROMETHEUS_AVAILABLE:
        return
    EXPERIENCE_SAMPLED.labels(source=source, decision=decision, reason=reason).inc()
def inc_lessons_retrieved(status: str) -> None:
    """Count one lessons-retrieval attempt; no-op without prometheus."""
    if not PROMETHEUS_AVAILABLE:
        return
    LESSONS_RETRIEVED.labels(status=status).inc()
def inc_lessons_attached(count: int) -> None:
    """Count an attach event, bucketing the lesson count into a label.

    Non-numeric input is treated as 0. NOTE: the else branch means any
    count above 3 lands in the "4-7" bucket, including counts above 7.
    """
    if not PROMETHEUS_AVAILABLE:
        return
    try:
        n = int(count)
    except Exception:
        n = 0
    if n <= 0:
        bucket = "0"
    elif n <= 3:
        bucket = "1-3"
    else:
        bucket = "4-7"
    LESSONS_ATTACHED.labels(count=bucket).inc()
def observe_lessons_attach_latency(latency_ms: float) -> None:
    """Record one attach-latency observation; swallows conversion errors."""
    if not PROMETHEUS_AVAILABLE:
        return
    try:
        LESSONS_ATTACH_LATENCY.observe(float(latency_ms))
    except Exception:
        return
# ==================== GPU METRICS COLLECTOR ====================
async def collect_gpu_metrics(node_id: str = "node1"):

View File

@@ -0,0 +1,446 @@
"""Router experience event bus (Phase-1).
Collects inference outcome events, applies sampling + dedup, then
persists to JetStream and Postgres in async background worker.
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
import json
import logging
import os
import random
import re
import time
import uuid
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
try:
import asyncpg
except ImportError: # pragma: no cover - runtime dependency in container
asyncpg = None
try:
from agent_metrics import (
inc_experience_db_insert,
inc_experience_dedup_dropped,
inc_experience_published,
inc_experience_sampled,
)
except Exception: # pragma: no cover - keep router resilient
def inc_experience_published(*args: Any, **kwargs: Any) -> None: # type: ignore[override]
return None
def inc_experience_db_insert(*args: Any, **kwargs: Any) -> None: # type: ignore[override]
return None
def inc_experience_dedup_dropped(*args: Any, **kwargs: Any) -> None: # type: ignore[override]
return None
def inc_experience_sampled(*args: Any, **kwargs: Any) -> None: # type: ignore[override]
return None
logger = logging.getLogger("experience_bus")
@dataclass
class ExperienceDecision:
    """Outcome of sampling/dedup for one event."""

    # True when the event should be enqueued for persistence.
    keep: bool
    # Why it was kept or dropped (e.g. error, latency_spike, dedup).
    reason: str
class ExperienceBus:
    def __init__(self) -> None:
        """Read all bus configuration from environment variables.

        Millisecond timeout env vars are converted to seconds; empty values
        fall back to the documented defaults.
        """
        self.enabled = os.getenv("EXPERIENCE_BUS_ENABLED", "true").lower() in {"1", "true", "yes"}
        self.node_id = os.getenv("NODE_ID", "NODA1")
        self.ok_sample_pct = float(os.getenv("EXPERIENCE_OK_SAMPLE_PCT", "10"))
        self.latency_spike_ms = int(os.getenv("EXPERIENCE_LATENCY_SPIKE_MS", "5000"))
        self.dedup_window_s = int(os.getenv("EXPERIENCE_DEDUP_WINDOW_SECONDS", "900"))
        self.dedup_max_keys = int(os.getenv("EXPERIENCE_DEDUP_MAX_KEYS", "20000"))
        self.queue_max = int(os.getenv("EXPERIENCE_QUEUE_MAX", "2000"))
        self.publish_timeout_s = float(os.getenv("EXPERIENCE_PUBLISH_TIMEOUT_MS", "800") or 800) / 1000.0
        self.db_timeout_s = float(os.getenv("EXPERIENCE_DB_TIMEOUT_MS", "1200") or 1200) / 1000.0
        self.subject_prefix = os.getenv("EXPERIENCE_SUBJECT_PREFIX", "agent.experience.v1")
        self.stream_name = os.getenv("EXPERIENCE_STREAM_NAME", "EXPERIENCE")
        self.enable_nats = os.getenv("EXPERIENCE_ENABLE_NATS", "true").lower() in {"1", "true", "yes"}
        self.enable_db = os.getenv("EXPERIENCE_ENABLE_DB", "true").lower() in {"1", "true", "yes"}
        self.db_dsn = os.getenv("EXPERIENCE_DATABASE_URL") or os.getenv("DATABASE_URL")
        # Bounded queue between capture() and the background worker; None is
        # the shutdown sentinel.
        self._queue: asyncio.Queue[Optional[Dict[str, Any]]] = asyncio.Queue(maxsize=self.queue_max)
        self._worker_task: Optional[asyncio.Task[Any]] = None
        self._running = False
        self._dedup_lock = asyncio.Lock()
        # dedup key -> monotonic timestamp, insertion-ordered for pruning.
        self._dedup: "OrderedDict[str, float]" = OrderedDict()
        self._pool: Optional[Any] = None
        self._nc: Any = None
        self._js: Any = None
    async def start(self, nats_client: Any = None) -> None:
        """Initialize optional DB/NATS backends and spawn the worker task.

        No-op when disabled by env or already running; a missing nats_client
        simply leaves JetStream publishing off until set_nats_client().
        """
        if not self.enabled:
            logger.info("ExperienceBus disabled by env")
            return
        if self._running:
            return
        if self.enable_db:
            await self._init_db()
        if self.enable_nats and nats_client is not None:
            await self.set_nats_client(nats_client)
        self._running = True
        self._worker_task = asyncio.create_task(self._worker(), name="experience-bus-worker")
        logger.info(
            "ExperienceBus started (db=%s nats=%s queue_max=%s sample_ok=%s%% dedup_window=%ss)",
            bool(self._pool),
            bool(self._js or self._nc),
            self.queue_max,
            self.ok_sample_pct,
            self.dedup_window_s,
        )
    async def stop(self) -> None:
        """Drain the worker via a None sentinel, then release DB/NATS handles.

        Waits up to 5s for the worker to finish before cancelling it.
        """
        if not self._running:
            return
        self._running = False
        try:
            # Sentinel tells the worker to exit; a full queue means it will
            # notice _running=False on its own instead.
            self._queue.put_nowait(None)
        except asyncio.QueueFull:
            pass
        if self._worker_task is not None:
            try:
                await asyncio.wait_for(self._worker_task, timeout=5.0)
            except Exception:
                self._worker_task.cancel()
            self._worker_task = None
        if self._pool is not None:
            try:
                await self._pool.close()
            except Exception as e:  # pragma: no cover
                logger.debug("ExperienceBus pool close error: %s", e)
            self._pool = None
        self._js = None
        self._nc = None
        logger.info("ExperienceBus stopped")
    async def set_nats_client(self, nats_client: Any) -> None:
        """Attach (or detach with None) a NATS client and ensure the stream.

        JetStream failures are logged and leave publishing disabled rather
        than raising into the caller.
        """
        if not self.enabled or not self.enable_nats:
            return
        self._nc = nats_client
        if self._nc is None:
            self._js = None
            return
        try:
            self._js = self._nc.jetstream()
            await self._ensure_stream()
        except Exception as e:
            self._js = None
            logger.warning("ExperienceBus JetStream unavailable: %s", e)
    async def capture(self, event: Dict[str, Any]) -> None:
        """Apply sampling/dedup and enqueue for async persistence.

        Never blocks the caller: a full queue drops the event with a metric
        and a warning instead of applying backpressure.
        """
        if not self.enabled or not self._running:
            return
        decision = await self._decide(event)
        if not decision.keep:
            if decision.reason == "dedup":
                inc_experience_dedup_dropped(source="router")
            inc_experience_sampled(source="router", decision="out", reason=decision.reason)
            return
        inc_experience_sampled(source="router", decision="in", reason=decision.reason)
        try:
            self._queue.put_nowait(event)
        except asyncio.QueueFull:
            inc_experience_sampled(source="router", decision="out", reason="queue_full")
            logger.warning("ExperienceBus queue full; dropping event")
    async def _init_db(self) -> None:
        """Create the asyncpg pool; degrade to DB-off on any failure."""
        if asyncpg is None:
            logger.warning("ExperienceBus DB disabled: asyncpg not installed")
            return
        if not self.db_dsn:
            logger.warning("ExperienceBus DB disabled: DATABASE_URL missing")
            return
        try:
            self._pool = await asyncpg.create_pool(self.db_dsn, min_size=1, max_size=3)
        except Exception as e:
            self._pool = None
            logger.warning("ExperienceBus DB pool init failed: %s", e)
    async def _ensure_stream(self) -> None:
        """Ensure the JetStream stream exists with our subject wildcard.

        An existing stream is never mutated: if its subjects do not cover
        ours we only warn and keep it as-is. The stream is created only when
        ``stream_info`` fails (typically "stream not found").
        """
        if self._js is None:
            return
        subjects = [f"{self.subject_prefix}.>"]
        try:
            info = await self._js.stream_info(self.stream_name)
            stream_subjects = set(getattr(info.config, "subjects", []) or [])
            if not stream_subjects.intersection(subjects):
                # Stream owned by someone else / misconfigured: warn, don't touch.
                logger.warning(
                    "ExperienceBus stream '%s' exists without subject %s; keeping as-is",
                    self.stream_name,
                    subjects[0],
                )
            # Stream already exists — nothing to create.
            return
        except Exception:
            # stream_info failed; fall through and attempt creation.
            pass
        try:
            await self._js.add_stream(name=self.stream_name, subjects=subjects)
            logger.info("ExperienceBus stream ensured: %s subjects=%s", self.stream_name, subjects)
        except Exception as e:
            logger.warning("ExperienceBus stream ensure failed: %s", e)
    async def _decide(self, event: Dict[str, Any]) -> ExperienceDecision:
        """Decide whether to keep an event: sampling first, then dedup.

        Errors and latency spikes are always kept; healthy fast events are
        kept at ``ok_sample_pct`` percent. Kept events then pass a
        time-windowed, size-capped LRU dedup keyed by agent/task/input/outcome.
        """
        result = event.get("result") or {}
        llm = event.get("llm") or {}
        ok = bool(result.get("ok"))
        latency_ms = int(llm.get("latency_ms") or 0)
        if not ok:
            sample_reason = "error"
        elif latency_ms >= self.latency_spike_ms:
            sample_reason = "latency_spike"
        else:
            # Percentage-based sampling of healthy events.
            if random.random() * 100.0 >= self.ok_sample_pct:
                return ExperienceDecision(False, "ok_sample_out")
            sample_reason = "ok_sample_in"
        dedup_key = self._dedup_key(event)
        now = time.monotonic()
        async with self._dedup_lock:
            self._prune_dedup(now)
            seen_at = self._dedup.get(dedup_key)
            if seen_at is not None and (now - seen_at) < self.dedup_window_s:
                return ExperienceDecision(False, "dedup")
            self._dedup[dedup_key] = now
            # Refresh recency so insertion order doubles as LRU order.
            self._dedup.move_to_end(dedup_key, last=True)
            while len(self._dedup) > self.dedup_max_keys:
                self._dedup.popitem(last=False)
        return ExperienceDecision(True, sample_reason)
def _prune_dedup(self, now: float) -> None:
if not self._dedup:
return
threshold = now - self.dedup_window_s
while self._dedup:
_, ts = next(iter(self._dedup.items()))
if ts >= threshold:
break
self._dedup.popitem(last=False)
def _dedup_key(self, event: Dict[str, Any]) -> str:
result = event.get("result") or {}
return "|".join(
[
str(event.get("agent_id") or ""),
str(event.get("task_type") or ""),
str(event.get("inputs_hash") or ""),
"1" if bool(result.get("ok")) else "0",
str(result.get("error_class") or ""),
]
)
    async def _worker(self) -> None:
        """Background consumer: persist queued events until the sentinel.

        A ``None`` item is the shutdown sentinel enqueued by stop().
        Persistence errors are logged and swallowed so one bad event cannot
        kill the worker loop; task_done() is always called for queue joins.
        """
        while True:
            event = await self._queue.get()
            if event is None:
                self._queue.task_done()
                break
            try:
                await self._persist_event(event)
            except Exception as e:  # pragma: no cover
                logger.warning("ExperienceBus persist error: %s", e)
            finally:
                self._queue.task_done()
    async def _persist_event(self, event: Dict[str, Any]) -> None:
        """Persist one event: publish to NATS first, then insert into the DB."""
        await self._publish_nats(event)
        await self._insert_db(event)
    async def _publish_nats(self, event: Dict[str, Any]) -> None:
        """Publish the event to NATS: JetStream first, core NATS as fallback.

        The event_id doubles as Nats-Msg-Id so the broker can deduplicate
        redeliveries. Each transport attempt is bounded by
        ``publish_timeout_s``; when neither transport succeeds the event is
        counted as skipped (the DB insert still proceeds independently).
        """
        subject = f"{self.subject_prefix}.{event.get('agent_id', 'unknown')}"
        payload = json.dumps(event, ensure_ascii=False).encode("utf-8")
        msg_id = str(event.get("event_id") or "").strip()
        headers = {"Nats-Msg-Id": msg_id} if msg_id else None
        if self._js is not None:
            try:
                await asyncio.wait_for(
                    self._js.publish(subject, payload, headers=headers),
                    timeout=self.publish_timeout_s,
                )
                inc_experience_published(source="router", transport="jetstream", status="ok")
                return
            except Exception as e:
                # Fall through to core NATS on any JetStream failure.
                inc_experience_published(source="router", transport="jetstream", status="error")
                logger.debug("ExperienceBus JetStream publish failed: %s", e)
        if self._nc is not None:
            try:
                await asyncio.wait_for(
                    self._nc.publish(subject, payload, headers=headers),
                    timeout=self.publish_timeout_s,
                )
                # Flush so the core-NATS write goes out promptly.
                await asyncio.wait_for(self._nc.flush(), timeout=self.publish_timeout_s)
                inc_experience_published(source="router", transport="core", status="ok")
                return
            except Exception as e:
                inc_experience_published(source="router", transport="core", status="error")
                logger.debug("ExperienceBus core NATS publish failed: %s", e)
        inc_experience_published(source="router", transport="none", status="skipped")
    async def _insert_db(self, event: Dict[str, Any]) -> None:
        """Insert one experience event into ``agent_experience_events``.

        Skips (with a metric) when no pool is available. The statement is
        bounded by ``db_timeout_s`` and idempotent via
        ``ON CONFLICT (event_id) DO NOTHING``. Failures are counted and
        logged at debug — persistence must never disrupt the request path.
        """
        if self._pool is None:
            inc_experience_db_insert(source="router", status="skipped")
            return
        payload_json = json.dumps(event, ensure_ascii=False)
        llm = event.get("llm") or {}
        result = event.get("result") or {}
        # Unparsable ids/timestamps fall back to generated values (see helpers).
        event_uuid = _as_uuid(event.get("event_id"))
        event_ts = _as_timestamptz(event.get("ts"))
        query = """
        INSERT INTO agent_experience_events (
            event_id,
            ts,
            node_id,
            source,
            agent_id,
            task_type,
            request_id,
            channel,
            inputs_hash,
            provider,
            model,
            profile,
            latency_ms,
            tokens_in,
            tokens_out,
            ok,
            error_class,
            error_msg_redacted,
            http_status,
            raw
        ) VALUES (
            $1::uuid,
            $2::timestamptz,
            $3,
            $4,
            $5,
            $6,
            $7,
            $8,
            $9,
            $10,
            $11,
            $12,
            $13,
            $14,
            $15,
            $16,
            $17,
            $18,
            $19,
            $20::jsonb
        )
        ON CONFLICT (event_id) DO NOTHING
        """
        try:
            async with self._pool.acquire() as conn:
                await asyncio.wait_for(
                    conn.execute(
                        query,
                        event_uuid,
                        event_ts,
                        event.get("node_id"),
                        event.get("source"),
                        event.get("agent_id"),
                        event.get("task_type"),
                        event.get("request_id"),
                        event.get("channel", "unknown"),
                        event.get("inputs_hash"),
                        llm.get("provider", "unknown"),
                        llm.get("model", "unknown"),
                        llm.get("profile"),
                        int(llm.get("latency_ms") or 0),
                        _as_int_or_none(llm.get("tokens_in")),
                        _as_int_or_none(llm.get("tokens_out")),
                        bool(result.get("ok")),
                        result.get("error_class"),
                        result.get("error_msg_redacted"),
                        int(result.get("http_status") or 0),
                        payload_json,
                    ),
                    timeout=self.db_timeout_s,
                )
            inc_experience_db_insert(source="router", status="ok")
        except Exception as e:
            inc_experience_db_insert(source="router", status="error")
            logger.debug("ExperienceBus DB insert failed: %s", e)
def redact_error_message(value: Optional[str]) -> Optional[str]:
    """Scrub credential-like material from an error message.

    Redacts bearer tokens, key/secret assignments, and any long opaque
    token (24+ word characters), then collapses whitespace and caps the
    result at 300 characters. Returns None for None input.
    """
    if value is None:
        return None
    cleaned = str(value)
    for pattern, repl in (
        # "Authorization: Bearer <token>"
        (r"(?i)(authorization\s*:\s*bearer)\s+[A-Za-z0-9._-]+", r"\1 [redacted]"),
        # "api_key: ..." / "token=..." style secret assignments
        (r"(?i)(api[_-]?key|token|password|secret)\s*[:=]\s*[^\s,;]+", r"\1=[redacted]"),
        # Anything that looks like a long opaque credential
        (r"\b[A-Za-z0-9_\-]{24,}\b", "[redacted]"),
    ):
        cleaned = re.sub(pattern, repl, cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    return cleaned[:300] if len(cleaned) > 300 else cleaned
def normalize_input_for_hash(text: str) -> str:
    """Canonicalize text for hashing: trim, collapse whitespace, lowercase, cap at 4000 chars."""
    collapsed = re.sub(r"\s+", " ", (text or "").strip())
    return collapsed.lower()[:4000]
def _as_int_or_none(value: Any) -> Optional[int]:
try:
if value is None:
return None
return int(value)
except Exception:
return None
def _as_uuid(value: Any) -> uuid.UUID:
try:
return uuid.UUID(str(value))
except Exception:
return uuid.uuid4()
def _as_timestamptz(value: Any) -> datetime:
if isinstance(value, datetime):
return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)
try:
parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
except Exception:
return datetime.now(timezone.utc)

View File

@@ -1,10 +1,12 @@
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel, ConfigDict
from typing import Literal, Optional, Dict, Any, List
from typing import Literal, Optional, Dict, Any, List, Tuple
import asyncio
from collections import OrderedDict
import json
import os
import random as random_module
import re
import yaml
import httpx
@@ -12,6 +14,8 @@ import logging
import hashlib
import hmac
import time # For latency metrics
import uuid
from datetime import datetime, timezone, timedelta
from difflib import SequenceMatcher
# CrewAI Integration
@@ -62,6 +66,34 @@ except ImportError:
global_capabilities_client = None # type: ignore[assignment]
offload_client = None # type: ignore[assignment]
try:
from experience_bus import ExperienceBus, normalize_input_for_hash, redact_error_message
EXPERIENCE_BUS_AVAILABLE = True
except ImportError:
EXPERIENCE_BUS_AVAILABLE = False
ExperienceBus = None # type: ignore[assignment]
try:
import asyncpg
except ImportError:
asyncpg = None # type: ignore[assignment]
try:
from agent_metrics import (
inc_lessons_retrieved,
inc_lessons_attached,
observe_lessons_attach_latency,
)
except Exception:
def inc_lessons_retrieved(*args: Any, **kwargs: Any) -> None:
return None
def inc_lessons_attached(*args: Any, **kwargs: Any) -> None:
return None
def observe_lessons_attach_latency(*args: Any, **kwargs: Any) -> None:
return None
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
NEO4J_NOTIFICATIONS_LOG_LEVEL = os.getenv("NEO4J_NOTIFICATIONS_LOG_LEVEL", "ERROR").strip().upper()
@@ -71,6 +103,29 @@ logging.getLogger("neo4j.notifications").setLevel(_neo4j_notifications_level)
# If auto-router module is unavailable (or loaded later), inference must still work.
SOFIIA_AUTO_ROUTER_AVAILABLE = False
def _parse_agent_id_set(raw_value: Optional[str], default_csv: str = "") -> set[str]:
source = raw_value if (raw_value is not None and str(raw_value).strip() != "") else default_csv
out: set[str] = set()
for part in str(source or "").split(","):
token = part.strip().lower()
if token:
out.add(token)
return out
# Lifecycle sets, overridable via env CSVs (ids are lowercased by the parser).
PLANNED_AGENT_IDS = _parse_agent_id_set(os.getenv("PLANNED_AGENT_IDS"), "aistalk")
DISABLED_AGENT_IDS = _parse_agent_id_set(os.getenv("DISABLED_AGENT_IDS"), "devtools")
def _inactive_agent_state(agent_id: str) -> Optional[str]:
    """Return 'planned' or 'disabled' for lifecycle-inactive agents, else None.

    'planned' takes precedence when an id appears in both sets.
    """
    aid = str(agent_id or "").strip().lower()
    if aid in PLANNED_AGENT_IDS:
        return "planned"
    if aid in DISABLED_AGENT_IDS:
        return "disabled"
    return None
TRUSTED_DOMAINS_CONFIG_PATH = os.getenv("TRUSTED_DOMAINS_CONFIG_PATH", "./trusted_domains.yml")
_trusted_domains_cache: Dict[str, Any] = {"mtime": None, "data": {}}
@@ -894,6 +949,287 @@ def _select_default_llm(agent_id: str, metadata: Dict[str, Any], base_llm: str,
return use_llm
return base_llm
def _safe_json_from_bytes(payload: bytes) -> Dict[str, Any]:
if not payload:
return {}
try:
decoded = payload.decode("utf-8", errors="ignore").strip()
if not decoded:
return {}
value = json.loads(decoded)
if isinstance(value, dict):
return value
except Exception:
return {}
return {}
def _extract_infer_agent_id(path: str) -> Optional[str]:
    """Extract the lowercase agent id from a ``/v1/agents/{id}/infer`` path.

    Returns None for non-matching paths or an empty id segment.
    """
    match = _INFER_PATH_RE.match(path or "")
    if not match:
        return None
    return (match.group(1) or "").strip().lower() or None
def _infer_channel_from_metadata(metadata: Dict[str, Any]) -> str:
channel = str(
metadata.get("channel")
or metadata.get("channel_type")
or metadata.get("source")
or metadata.get("entrypoint")
or "unknown"
).strip().lower()
if channel in {"telegram", "web", "api"}:
return channel
return "unknown"
def _derive_provider_from_backend_model(backend: str, model: str, profile: Optional[str]) -> str:
    """Map a backend/model/profile triple to a coarse provider label.

    Precedence: explicit provider on the resolved llm_profile, then backend
    substring matching, then local-model name-prefix heuristics, else "other".
    """
    profiles = (router_config or {}).get("llm_profiles", {}) if isinstance(router_config, dict) else {}
    if profile and isinstance(profiles, dict):
        p = profiles.get(profile, {})
        if isinstance(p, dict) and p.get("provider"):
            return str(p.get("provider"))
    b = str(backend or "").lower()
    m = str(model or "").lower()
    if "mistral" in b:
        return "mistral"
    if "deepseek" in b:
        return "deepseek"
    if "grok" in b:
        return "grok"
    if "anthropic" in b or "claude" in b:
        return "anthropic"
    if "openai" in b:
        return "openai"
    if "glm" in b:
        return "glm"
    if "nats-offload" in b:
        return "remote"
    if "ollama" in b or "local" in b:
        return "local"
    # Known locally-hosted model families when the backend string is uninformative.
    if any(m.startswith(prefix) for prefix in ("qwen", "gemma", "mistral", "deepseek", "glm")):
        return "local"
    return "other"
def _resolve_profile_for_event(agent_id: str, req_payload: Dict[str, Any]) -> Optional[str]:
    """Resolve the llm profile the router would select for this request.

    Mirrors routing: the agent's default_llm, optionally overridden by
    routing rules evaluated against the request metadata. Returns None when
    the config or the agent entry is missing/malformed.
    """
    if not isinstance(router_config, dict):
        return None
    metadata = req_payload.get("metadata")
    if not isinstance(metadata, dict):
        metadata = {}
    agent_cfg = (router_config.get("agents") or {}).get(agent_id, {})
    if not isinstance(agent_cfg, dict):
        return None
    base_llm = str(agent_cfg.get("default_llm") or "").strip()
    if not base_llm:
        return None
    rules = router_config.get("routing") or []
    if isinstance(rules, list):
        return _select_default_llm(agent_id, metadata, base_llm, rules)
    return base_llm
def _lesson_guarded_text(value: Any, max_len: int = 220) -> str:
    """Sanitize lesson text for prompt injection: collapse whitespace, guard, cap.

    Returns "" when the text is empty or contains any marker from
    LESSONS_INJECTION_GUARDS (prompt-injection defense).
    """
    text = re.sub(r"\s+", " ", str(value or "")).strip()
    if not text:
        return ""
    lower = text.lower()
    # Reject text containing injection markers outright rather than trimming.
    if any(marker in lower for marker in LESSONS_INJECTION_GUARDS):
        return ""
    if len(text) > max_len:
        text = text[:max_len].rstrip()
    return text
def _decode_lesson_signals(raw: Any) -> Dict[str, Any]:
if isinstance(raw, dict):
return dict(raw)
if isinstance(raw, str):
try:
parsed = json.loads(raw)
if isinstance(parsed, dict):
return parsed
except Exception:
return {}
return {}
def _score_lesson_record(
    row: Dict[str, Any],
    *,
    agent_id: str,
    provider: str,
    model: str,
    profile: str,
    last_error_class: Optional[str],
) -> float:
    """Score a lesson row's relevance to the current request context.

    Additive weights: same agent +3, matching error class +2, matching
    provider/model/profile +1 each; a recency penalty subtracts up to 2
    points (one point per ~7 days of age). Keyword arguments are expected
    pre-lowercased by the caller; row/signal fields are normalized here.
    """
    score = 0.0
    row_agent_id = str(row.get("agent_id") or "").strip().lower()
    if row_agent_id and row_agent_id == agent_id:
        score += 3.0
    signals = _decode_lesson_signals(row.get("signals"))
    signal_error = str(signals.get("error_class") or "").strip().lower()
    if last_error_class and signal_error and signal_error == last_error_class.lower():
        score += 2.0
    signal_provider = str(signals.get("provider") or "").strip().lower()
    signal_model = str(signals.get("model") or "").strip().lower()
    signal_profile = str(signals.get("profile") or "").strip().lower()
    if provider and signal_provider and signal_provider == provider:
        score += 1.0
    if model and signal_model and signal_model == model:
        score += 1.0
    if profile and signal_profile and signal_profile == profile:
        score += 1.0
    row_ts = row.get("ts")
    if isinstance(row_ts, datetime):
        # Naive timestamps are assumed UTC for the age computation.
        dt = row_ts if row_ts.tzinfo else row_ts.replace(tzinfo=timezone.utc)
        age_hours = max(0.0, (datetime.now(timezone.utc) - dt).total_seconds() / 3600.0)
        score -= min(2.0, age_hours / 168.0)  # down-rank lessons older than ~7 days
    return score
def _render_operational_lessons(lessons: List[Dict[str, Any]], max_chars: int) -> str:
    """Render lessons into a compact numbered text block bounded by max_chars.

    A lesson is included only when all three fields (trigger/action/avoid)
    pass the injection guard. Rendering stops at the first lesson that would
    exceed the budget. Returns "" when no usable lesson fits.
    """
    if not lessons:
        return ""
    lines = ["Operational Lessons (apply if relevant):"]
    for idx, lesson in enumerate(lessons, start=1):
        trigger = _lesson_guarded_text(lesson.get("trigger"), max_len=220)
        action = _lesson_guarded_text(lesson.get("action"), max_len=220)
        avoid = _lesson_guarded_text(lesson.get("avoid"), max_len=220)
        if not trigger or not action or not avoid:
            continue
        chunk = f"{idx}) Trigger: {trigger}\n Do: {action}\n Avoid: {avoid}"
        # Measure the would-be result before committing the chunk.
        candidate = "\n".join(lines + [chunk])
        if len(candidate) > max_chars:
            break
        lines.append(chunk)
    if len(lines) <= 1:
        return ""
    return "\n".join(lines)
async def _update_last_infer_signal(agent_id: str, *, ok: bool, error_class: Optional[str], latency_ms: int) -> None:
    """Record the latest /infer outcome for an agent in the signal cache.

    The cache drives lessons retrieval (always-on after errors or latency
    spikes). Under the lock, stale entries are TTL-pruned and the map is
    LRU-capped at 4000 keys. Blank agent ids are ignored.
    """
    key = str(agent_id or "").strip().lower()
    if not key:
        return
    now = time.monotonic()
    async with _lessons_signal_lock:
        _lessons_signal_cache[key] = {
            "ok": bool(ok),
            "error_class": str(error_class or "").strip() or None,
            "latency_ms": int(max(0, latency_ms)),
            "seen_at": now,
        }
        _lessons_signal_cache.move_to_end(key, last=True)
        # TTL floor of 30s guards against a misconfigured tiny TTL.
        threshold = now - max(30, LESSONS_SIGNAL_CACHE_TTL_SECONDS)
        stale_keys = [k for k, v in _lessons_signal_cache.items() if float(v.get("seen_at", 0.0)) < threshold]
        for stale_key in stale_keys:
            _lessons_signal_cache.pop(stale_key, None)
        while len(_lessons_signal_cache) > 4000:
            _lessons_signal_cache.popitem(last=False)
async def _get_last_infer_signal(agent_id: str) -> Optional[Dict[str, Any]]:
    """Return a copy of the agent's last /infer signal, or None when absent/expired.

    Expired entries are lazily evicted on read.
    """
    key = str(agent_id or "").strip().lower()
    if not key:
        return None
    now = time.monotonic()
    async with _lessons_signal_lock:
        value = _lessons_signal_cache.get(key)
        if not value:
            return None
        age = now - float(value.get("seen_at", 0.0))
        if age > LESSONS_SIGNAL_CACHE_TTL_SECONDS:
            # Lazy expiry on read.
            _lessons_signal_cache.pop(key, None)
            return None
        # Copy so callers cannot mutate the cached entry.
        return dict(value)
async def _fetch_ranked_lessons(
    *,
    agent_id: str,
    provider: str,
    model: str,
    profile: str,
    last_error_class: Optional[str],
    limit: int,
) -> Tuple[List[Dict[str, Any]], str, int]:
    """Fetch and rank operational lessons for an agent from Postgres.

    Returns (lessons, status, elapsed_ms) where status is "ok", "timeout",
    or "err". Pulls up to 50 candidate rows (agent-specific plus global
    NULL-agent lessons, agent-specific first), drops rows failing the
    injection guard, ranks by _score_lesson_record (ties broken by
    recency), and returns at most ``limit`` (min 1) lessons. Bounded by
    LESSONS_ATTACH_TIMEOUT_MS; keyword args are expected pre-lowercased.
    """
    if lessons_db_pool is None:
        return [], "err", 0
    query = """
        SELECT lesson_key, ts, scope, agent_id, task_type, trigger, action, avoid, signals
        FROM agent_lessons
        WHERE (agent_id = $1 OR agent_id IS NULL)
          AND task_type = 'infer'
        ORDER BY (agent_id = $1) DESC, ts DESC
        LIMIT 50
    """
    started = time.time()
    try:
        async with lessons_db_pool.acquire() as conn:
            rows = await asyncio.wait_for(
                conn.fetch(query, str(agent_id).strip().lower()),
                timeout=LESSONS_ATTACH_TIMEOUT_MS / 1000.0,
            )
    except asyncio.TimeoutError:
        elapsed = max(0, int((time.time() - started) * 1000))
        return [], "timeout", elapsed
    except Exception as e:
        logger.debug("Lessons retrieval failed: %s", e)
        elapsed = max(0, int((time.time() - started) * 1000))
        return [], "err", elapsed
    ranked: List[Tuple[float, datetime, Dict[str, Any]]] = []
    for row in rows:
        row_data = dict(row)
        lesson = {
            "lesson_key": row_data.get("lesson_key"),
            "ts": row_data.get("ts"),
            "scope": row_data.get("scope"),
            "agent_id": row_data.get("agent_id"),
            "task_type": row_data.get("task_type"),
            "trigger": row_data.get("trigger"),
            "action": row_data.get("action"),
            "avoid": row_data.get("avoid"),
            "signals": _decode_lesson_signals(row_data.get("signals")),
        }
        # Every displayed field must pass the injection guard or the row is dropped.
        if not (
            _lesson_guarded_text(lesson.get("trigger"))
            and _lesson_guarded_text(lesson.get("action"))
            and _lesson_guarded_text(lesson.get("avoid"))
        ):
            continue
        score = _score_lesson_record(
            lesson,
            agent_id=agent_id,
            provider=(provider or "").strip().lower(),
            model=(model or "").strip().lower(),
            profile=(profile or "").strip().lower(),
            last_error_class=last_error_class,
        )
        ts = lesson.get("ts")
        if not isinstance(ts, datetime):
            # Missing timestamp: treat as very old so it sorts last on ties.
            ts = datetime.now(timezone.utc) - timedelta(days=365)
        ranked.append((score, ts, lesson))
    ranked.sort(key=lambda item: (item[0], item[1]), reverse=True)
    selected = [item[2] for item in ranked[: max(1, limit)]]
    elapsed = max(0, int((time.time() - started) * 1000))
    return selected, "ok", elapsed
app = FastAPI(title="DAARION Router", version="2.0.0")
# Configuration
@@ -907,6 +1243,27 @@ VISION_URL = os.getenv("VISION_URL", "http://host.docker.internal:11434")
OCR_URL = os.getenv("OCR_URL", "http://swapper-service:8890")
DOCUMENT_URL = os.getenv("DOCUMENT_URL", "http://swapper-service:8890")
CITY_SERVICE_URL = os.getenv("CITY_SERVICE_URL", "http://daarion-city-service:7001")
LESSONS_ATTACH_ENABLED = os.getenv("LESSONS_ATTACH_ENABLED", "true").lower() in {"1", "true", "yes"}
LESSONS_ATTACH_MIN = max(1, int(os.getenv("LESSONS_ATTACH_MIN", "3")))
LESSONS_ATTACH_MAX = max(LESSONS_ATTACH_MIN, int(os.getenv("LESSONS_ATTACH_MAX", "7")))
LESSONS_ATTACH_TIMEOUT_MS = max(5, int(os.getenv("LESSONS_ATTACH_TIMEOUT_MS", "25")))
LESSONS_ATTACH_SAMPLE_PCT = max(0.0, min(100.0, float(os.getenv("LESSONS_ATTACH_SAMPLE_PCT", "10"))))
LESSONS_ATTACH_MAX_CHARS = max(400, int(os.getenv("LESSONS_ATTACH_MAX_CHARS", "1200")))
LESSONS_SIGNAL_CACHE_TTL_SECONDS = max(30, int(os.getenv("LESSONS_SIGNAL_CACHE_TTL_SECONDS", "300")))
LESSONS_LATENCY_SPIKE_MS = max(250, int(os.getenv("EXPERIENCE_LATENCY_SPIKE_MS", "5000")))
LESSONS_DATABASE_URL = (
os.getenv("LESSONS_DATABASE_URL")
or os.getenv("EXPERIENCE_DATABASE_URL")
or os.getenv("DATABASE_URL")
)
LESSONS_INJECTION_GUARDS = (
"ignore previous",
"ignore all previous",
"system:",
"developer:",
"```",
)
# CrewAI Routing Configuration
CREWAI_ROUTING_ENABLED = os.getenv("CREWAI_ROUTING_ENABLED", "true").lower() == "true"
@@ -947,6 +1304,12 @@ nats_available = False
# Tool Manager
tool_manager = None
runtime_guard_engine = None
experience_bus = None
lessons_db_pool = None
_lessons_signal_cache: "OrderedDict[str, Dict[str, Any]]" = OrderedDict()
_lessons_signal_lock = asyncio.Lock()
_INFER_PATH_RE = re.compile(r"^/v1/agents/([^/]+)/infer/?$")
# Models
class FilterDecision(BaseModel):
@@ -999,10 +1362,146 @@ def load_router_config():
config = load_config()
router_config = load_router_config()
@app.middleware("http")
async def experience_capture_middleware(request: Request, call_next):
"""Capture /infer outcomes and emit ExperienceEvent asynchronously."""
infer_agent_id = _extract_infer_agent_id(request.url.path)
if (
not infer_agent_id
or request.method.upper() != "POST"
or not EXPERIENCE_BUS_AVAILABLE
or experience_bus is None
):
return await call_next(request)
started_at = time.time()
req_body = await request.body()
async def _receive() -> Dict[str, Any]:
return {"type": "http.request", "body": req_body, "more_body": False}
wrapped_request = Request(request.scope, _receive)
response = None
response_body = b""
status_code = 500
caught_exc: Optional[Exception] = None
try:
response = await call_next(wrapped_request)
status_code = int(response.status_code)
chunks: List[bytes] = []
async for chunk in response.body_iterator:
chunks.append(chunk)
response_body = b"".join(chunks)
except Exception as exc: # pragma: no cover - defensive capture path
caught_exc = exc
status_code = 500
latency_ms = max(0, int((time.time() - started_at) * 1000))
try:
req_payload = _safe_json_from_bytes(req_body)
resp_payload = _safe_json_from_bytes(response_body)
metadata = req_payload.get("metadata")
if not isinstance(metadata, dict):
metadata = {}
prompt = str(req_payload.get("prompt") or "")
normalized_input = normalize_input_for_hash(prompt)
inputs_hash = hashlib.sha256(normalized_input.encode("utf-8")).hexdigest()
profile = _resolve_profile_for_event(infer_agent_id, req_payload)
profile_cfg = {}
if profile and isinstance(router_config, dict):
profile_cfg = (router_config.get("llm_profiles") or {}).get(profile, {}) or {}
if not isinstance(profile_cfg, dict):
profile_cfg = {}
model = str(resp_payload.get("model") or profile_cfg.get("model") or "unknown")
backend = str(resp_payload.get("backend") or "")
provider = _derive_provider_from_backend_model(backend, model, profile)
tokens_total = resp_payload.get("tokens_used")
tokens_out = int(tokens_total) if isinstance(tokens_total, int) else None
request_id = str(
metadata.get("request_id")
or metadata.get("trace_id")
or request.headers.get("x-request-id")
or ""
).strip() or None
err_class: Optional[str] = None
err_msg: Optional[str] = None
detail_obj = resp_payload.get("detail")
if caught_exc is not None:
err_class = type(caught_exc).__name__
err_msg = str(caught_exc)
elif status_code >= 400:
if isinstance(detail_obj, dict):
err_class = str(detail_obj.get("code") or detail_obj.get("error_class") or f"http_{status_code}")
err_msg = str(detail_obj.get("message") or detail_obj.get("detail") or json.dumps(detail_obj))
elif isinstance(detail_obj, str):
err_class = f"http_{status_code}"
err_msg = detail_obj
else:
err_class = f"http_{status_code}"
err_msg = f"http_status={status_code}"
await _update_last_infer_signal(
infer_agent_id,
ok=status_code < 400,
error_class=err_class,
latency_ms=latency_ms,
)
event = {
"event_id": str(uuid.uuid4()),
"ts": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"node_id": os.getenv("NODE_ID", "NODA1"),
"source": "router",
"agent_id": infer_agent_id,
"request_id": request_id,
"channel": _infer_channel_from_metadata(metadata),
"task_type": "infer",
"inputs_hash": inputs_hash,
"llm": {
"provider": provider,
"model": model,
"profile": profile,
"latency_ms": latency_ms,
"tokens_in": None,
"tokens_out": tokens_out,
},
"result": {
"ok": status_code < 400,
"error_class": err_class,
"error_msg_redacted": redact_error_message(err_msg),
"http_status": status_code,
},
}
await experience_bus.capture(event)
except Exception as exp_err:
logger.debug("Experience capture skipped: %s", exp_err)
if caught_exc is not None:
raise caught_exc
headers = dict(response.headers) if response is not None else {}
headers.pop("content-length", None)
return Response(
content=response_body,
status_code=status_code,
headers=headers,
media_type=response.media_type if response is not None else "application/json",
background=response.background if response is not None else None,
)
@app.on_event("startup")
async def startup_event():
"""Initialize NATS connection and subscriptions"""
global nc, nats_available, http_client, neo4j_driver, neo4j_available, runtime_guard_engine
global nc, nats_available, http_client, neo4j_driver, neo4j_available, runtime_guard_engine, experience_bus, lessons_db_pool
logger.info("🚀 DAGI Router v2.0.0 starting up...")
# Initialize HTTP client
@@ -1042,6 +1541,34 @@ async def startup_event():
logger.warning("⚠️ Running in test mode (HTTP only)")
nats_available = False
# Initialize Experience Bus (Phase-1)
if EXPERIENCE_BUS_AVAILABLE and ExperienceBus is not None:
try:
experience_bus = ExperienceBus()
await experience_bus.start(nats_client=nc if nats_available else None)
logger.info("✅ Experience Bus initialized")
except Exception as e:
experience_bus = None
logger.warning(f"⚠️ Experience Bus init failed: {e}")
# Initialize lessons retrieval pool (Phase-3 read path)
if LESSONS_ATTACH_ENABLED:
if asyncpg is None:
logger.warning("⚠️ Lessons attach enabled but asyncpg is unavailable")
elif not LESSONS_DATABASE_URL:
logger.warning("⚠️ Lessons attach enabled but LESSONS_DATABASE_URL is missing")
else:
try:
lessons_db_pool = await asyncpg.create_pool(
LESSONS_DATABASE_URL,
min_size=1,
max_size=3,
)
logger.info("✅ Lessons DB pool initialized")
except Exception as e:
lessons_db_pool = None
logger.warning(f"⚠️ Lessons DB pool init failed: {e}")
# Initialize Memory Retrieval Pipeline
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval:
try:
@@ -1766,6 +2293,24 @@ async def agent_infer(agent_id: str, request: InferRequest):
logger.info(f"🔀 Inference request for agent: {agent_id}")
logger.info(f"📝 Prompt: {request.prompt[:100]}...")
inactive_state = _inactive_agent_state(agent_id)
if inactive_state is not None:
status_code = 410 if inactive_state == "planned" else 404
logger.info(
"⛔ Agent unavailable by lifecycle state: agent=%s state=%s",
agent_id,
inactive_state,
)
raise HTTPException(
status_code=status_code,
detail={
"code": f"agent_{inactive_state}",
"agent_id": str(agent_id).strip().lower(),
"state": inactive_state,
"message": "Agent is not active in this environment",
},
)
# =========================================================================
# MEMORY RETRIEVAL (v4.0 - Universal for all agents)
# =========================================================================
@@ -2683,21 +3228,75 @@ async def agent_infer(agent_id: str, request: InferRequest):
# Priority: DeepSeek → Mistral → Grok → Local Ollama
# =========================================================================
lessons_block = ""
lessons_attached_count = 0
if LESSONS_ATTACH_ENABLED and not request.images:
retrieval_always_on = False
retrieval_limit = LESSONS_ATTACH_MIN
last_signal = await _get_last_infer_signal(request_agent_id)
last_error_class = None
if last_signal:
last_error_class = last_signal.get("error_class")
if (not bool(last_signal.get("ok", True))) or int(last_signal.get("latency_ms", 0) or 0) >= LESSONS_LATENCY_SPIKE_MS:
retrieval_always_on = True
retrieval_limit = LESSONS_ATTACH_MAX
should_retrieve = retrieval_always_on or (random_module.random() * 100.0 < LESSONS_ATTACH_SAMPLE_PCT)
if should_retrieve:
lessons_rows, retrieval_status, retrieval_latency_ms = await _fetch_ranked_lessons(
agent_id=request_agent_id,
provider=str(provider or "").strip().lower(),
model=str(model or "").strip().lower(),
profile=str(default_llm or "").strip().lower(),
last_error_class=str(last_error_class or "").strip() or None,
limit=retrieval_limit,
)
inc_lessons_retrieved(status=retrieval_status)
observe_lessons_attach_latency(latency_ms=float(retrieval_latency_ms))
if retrieval_status == "ok" and lessons_rows:
selected_lessons = lessons_rows[:retrieval_limit]
lessons_block = _render_operational_lessons(selected_lessons, LESSONS_ATTACH_MAX_CHARS)
if lessons_block:
lessons_attached_count = len(selected_lessons)
logger.info(
"🧠 lessons_attached=%s agent=%s mode=%s",
lessons_attached_count,
request_agent_id,
"always_on" if retrieval_always_on else "sampled",
)
inc_lessons_attached(count=lessons_attached_count)
# Build messages array once for all providers
messages = []
if system_prompt:
combined_parts: List[str] = [system_prompt]
if memory_brief_text:
enhanced_prompt = f"{system_prompt}\n\n[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}"
messages.append({"role": "system", "content": enhanced_prompt})
logger.info(f"📝 Added system message with prompt ({len(system_prompt)} chars) + memory ({len(memory_brief_text)} chars)")
else:
messages.append({"role": "system", "content": system_prompt})
logger.info(f"📝 Added system message with prompt ({len(system_prompt)} chars)")
elif memory_brief_text:
messages.append({"role": "system", "content": f"[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}"})
logger.warning(f"⚠️ No system_prompt! Using only memory brief ({len(memory_brief_text)} chars)")
combined_parts.append(f"[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}")
if lessons_block:
combined_parts.append(f"[OPERATIONAL LESSONS - INTERNAL]\n{lessons_block}")
enhanced_prompt = "\n\n".join(combined_parts)
messages.append({"role": "system", "content": enhanced_prompt})
logger.info(
"📝 Added system message prompt=%s memory=%s lessons=%s",
len(system_prompt),
len(memory_brief_text or ""),
lessons_attached_count,
)
elif memory_brief_text or lessons_block:
fallback_parts: List[str] = []
if memory_brief_text:
fallback_parts.append(f"[INTERNAL MEMORY - do NOT repeat to user]\n{memory_brief_text}")
if lessons_block:
fallback_parts.append(f"[OPERATIONAL LESSONS - INTERNAL]\n{lessons_block}")
messages.append({"role": "system", "content": "\n\n".join(fallback_parts)})
logger.warning(
"⚠️ No system_prompt! Using fallback context memory=%s lessons=%s",
len(memory_brief_text or ""),
lessons_attached_count,
)
else:
logger.error(f"❌ No system_prompt AND no memory_brief! LLM will have no context!")
logger.error("❌ No system_prompt, memory_brief, or lessons; LLM will have no context")
messages.append({"role": "user", "content": request.prompt})
logger.debug(f"📨 Messages array: {len(messages)} messages, system={len(messages[0].get('content', '')) if messages else 0} chars")
@@ -4555,7 +5154,7 @@ async def sofiia_model_catalog(refresh_ollama: bool = False):
@app.on_event("shutdown")
async def shutdown_event():
"""Cleanup connections on shutdown"""
global neo4j_driver, http_client, nc
global neo4j_driver, http_client, nc, experience_bus, lessons_db_pool
# Close Memory Retrieval
if MEMORY_RETRIEVAL_AVAILABLE and memory_retrieval:
@@ -4576,3 +5175,17 @@ async def shutdown_event():
if nc:
await nc.close()
logger.info("🔌 NATS connection closed")
if EXPERIENCE_BUS_AVAILABLE and experience_bus:
try:
await experience_bus.stop()
logger.info("🔌 Experience Bus closed")
except Exception as e:
logger.warning(f"⚠️ Experience Bus close error: {e}")
if lessons_db_pool is not None:
try:
await lessons_db_pool.close()
logger.info("🔌 Lessons DB pool closed")
except Exception as e:
logger.warning(f"⚠️ Lessons DB pool close error: {e}")