- Vision Encoder Service (OpenCLIP ViT-L/14, GPU-accelerated)
- FastAPI app with text/image embedding endpoints (768-dim)
- Docker support with NVIDIA GPU runtime
- Port 8001, health checks, model info API
- Qdrant Vector Database integration
- Port 6333/6334 (HTTP/gRPC)
- Image embeddings storage (768-dim, Cosine distance)
- Auto collection creation
- Vision RAG implementation
- VisionEncoderClient (Python client for API)
- Image Search module (text-to-image, image-to-image)
- Vision RAG routing in DAGI Router (mode: image_search)
- VisionEncoderProvider integration
- Documentation (5000+ lines)
- SYSTEM-INVENTORY.md - Complete system inventory
- VISION-ENCODER-STATUS.md - Service status
- VISION-RAG-IMPLEMENTATION.md - Implementation details
- vision_encoder_deployment_task.md - Deployment checklist
- services/vision-encoder/README.md - Deployment guide
- Updated WARP.md, INFRASTRUCTURE.md, Jupyter Notebook
- Testing
- test-vision-encoder.sh - Smoke tests (6 tests)
- Unit tests for client, image search, routing
- Services: 17 total (added Vision Encoder + Qdrant)
- AI Models: 3 (qwen3:8b, OpenCLIP ViT-L/14, BAAI/bge-m3)
- GPU Services: 2 (Vision Encoder, Ollama)
- VRAM Usage: ~10 GB (concurrent)
Status: Production Ready ✅
📊 RAG Event-Driven Ingestion — Status
Version: 1.0.0
Last updated: 2025-01-17
Status: ✅ Waves 1, 2, 3 Complete
🎯 Overview
Event-driven architecture for automatically ingesting content into the RAG system via NATS JetStream. The system subscribes to several event types across multiple streams and automatically indexes the content in Milvus and Neo4j.
Documentation:
- Event Catalog: full catalog of NATS streams and events
- Wave 1 Task: Chat/Docs/Files ingestion
- Wave 2 Task: Tasks/Followups/Meetings ingestion
- Wave 3 Task: Governance/RWA/Oracle ingestion
✅ Wave 1: Chat Messages, Documents, Files (MVP)
Status: ✅ Complete
Completion date: 2025-01-16
Implemented Features
Event Handlers (rag-service/event_worker.py)
- ✅ handle_document_parsed_event(): handles rag.document.parsed from STREAM_RAG
- ✅ Automatic ingestion of parsed documents into Milvus + Neo4j
- ✅ Idempotency (already-indexed documents are skipped)
- ✅ Publishes the rag.document.indexed event after successful indexing
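The handler behaviour listed above (skip documents that are already indexed, ingest, then publish rag.document.indexed) can be sketched as follows. This is a minimal illustration, not the code in event_worker.py: the in-memory `indexed_ids` set, the `ingest` and `publish` callables, and the `doc_id` payload field are all hypothetical stand-ins.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical stand-in for the "already indexed" check; the real service
# presumably consults its own storage or a flag on the document record.
indexed_ids: set[str] = set()


async def handle_document_parsed(
    payload: dict,
    ingest: Callable[[dict], Awaitable[None]],
    publish: Callable[[str, dict], Awaitable[None]],
) -> bool:
    """Ingest a parsed document once; return True if it was indexed now."""
    doc_id = payload["doc_id"]  # assumed field name
    if doc_id in indexed_ids:
        return False  # idempotency: a redelivered event is a no-op
    await ingest(payload)
    indexed_ids.add(doc_id)
    # Announce success only after ingestion has completed.
    await publish("rag.document.indexed", {"doc_id": doc_id})
    return True
```

Marking the document as indexed only after `ingest` returns means a crash mid-ingestion leaves the event eligible for redelivery.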
Event Publishing (rag-service/events.py)
- ✅ publish_document_indexed(): publishes rag.document.indexed
- ✅ Connection management with NATS
- ✅ Retry logic on publish failures
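One common way to implement retry logic around publishing is exponential backoff. The sketch below assumes a generic async `publish(subject, data)` callable; the function name, the default attempt count and the delays are illustrative choices, not the values used in events.py.

```python
import asyncio
from typing import Awaitable, Callable


async def publish_with_retry(
    publish: Callable[[str, bytes], Awaitable[None]],
    subject: str,
    data: bytes,
    attempts: int = 3,
    base_delay: float = 0.5,
) -> int:
    """Call publish() until it succeeds; return the number of attempts used."""
    for attempt in range(1, attempts + 1):
        try:
            await publish(subject, data)
            return attempt
        except Exception:
            if attempt == attempts:
                raise  # out of retries: let the caller see the failure
            # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be >= 1")
```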
Event Publishing (parser-service/events.py)
- ✅ publish_document_parsed(): publishes rag.document.parsed after OCR
- ✅ Integrated into the API endpoints (/ocr/parse, /ocr/parse_markdown, etc.)
Infrastructure
- ✅ NATS JetStream service in docker-compose.yml
- ✅ STREAM_RAG created with subjects:
  - rag.document.parsed
  - rag.document.indexed
  - rag.document.reindexed
  - rag.chat.message.created
  - rag.file.uploaded
- ✅ Lifespan startup in rag-service: the event worker starts automatically
- ✅ Environment variables (NATS_URL) in the configuration
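The lifespan startup mentioned above can be pictured as an async context manager that launches the worker as a background task and cancels it on shutdown. This is a sketch under assumptions: `event_worker` here is a placeholder loop, not the real worker, and the `started` event exists only to make the example deterministic.

```python
import asyncio
import contextlib
from typing import AsyncIterator


async def event_worker(started: asyncio.Event) -> None:
    """Hypothetical worker loop; the real one subscribes to NATS streams."""
    started.set()
    await asyncio.Event().wait()  # block until cancelled


@contextlib.asynccontextmanager
async def lifespan(app: object) -> AsyncIterator[None]:
    started = asyncio.Event()
    task = asyncio.create_task(event_worker(started))
    await started.wait()  # worker is running before requests are served
    try:
        yield
    finally:
        task.cancel()  # stop the worker on shutdown
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

With FastAPI, a context manager of this shape is passed as `FastAPI(lifespan=lifespan)`.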
Testing
- ✅ Unit tests for event publishing
- ✅ Unit tests for event consumption
- E2E smoke test (parser → NATS → rag-service)
✅ Wave 2: Tasks, Followups, Meetings
Status: ✅ Complete
Completion date: 2025-01-17
Implemented Features
Event Handlers (rag-service/event_worker.py)
- ✅ handle_task_created_event(): handles task.created from STREAM_TASK
- ✅ handle_task_updated_event(): handles task.updated from STREAM_TASK
- ✅ handle_meeting_transcript_event(): handles meeting.transcript.created from STREAM_MEETING
- ✅ Automatic ingestion of tasks on creation/update
- ✅ Automatic ingestion of meeting transcripts
- ✅ Helper function _ingest_content_to_rag() for generic ingestion
Event Publishing (rag-service/events.py)
- ✅ publish_task_indexed(): publishes rag.task.indexed
- ✅ publish_task_reindexed(): publishes rag.task.reindexed
- ✅ publish_meeting_indexed(): publishes rag.meeting.indexed
Subscriptions
- ✅ STREAM_TASK.task.created
- ✅ STREAM_TASK.task.updated
- ✅ STREAM_MEETING.meeting.transcript.created
Data Ingested
- Tasks: title, description, assignee, status, priority, labels, project_id
- Meetings: transcript, attendees, duration, summary, dao_id, team_id
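To illustrate how the task fields listed above might be prepared for ingestion, the sketch below splits a task payload into embeddable text and filterable metadata. The split itself, the function name and the `source_type` field are assumptions made for illustration; the actual input shape of _ingest_content_to_rag() is not documented here.

```python
from typing import Any

TASK_TEXT_FIELDS = ("title", "description")
TASK_META_FIELDS = ("assignee", "status", "priority", "labels", "project_id")


def task_to_rag_document(task: dict[str, Any]) -> dict[str, Any]:
    """Split a task payload into embeddable text and filterable metadata."""
    text = "\n\n".join(str(task[f]) for f in TASK_TEXT_FIELDS if task.get(f))
    metadata = {f: task[f] for f in TASK_META_FIELDS if f in task}
    metadata["source_type"] = "task"  # hypothetical discriminator field
    return {"text": text, "metadata": metadata}
```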
Neo4j Graph Relations (Future)
- Task → User (assignee)
- Task → Project
- Meeting → User (attendees)
- Meeting → Team
✅ Wave 3: Governance, RWA, Oracle
Status: ✅ Complete
Completion date: 2025-01-17
Implemented Features
Event Handlers (rag-service/event_worker.py)
- ✅ handle_governance_policy_event(): handles governance.policy.created/updated from STREAM_GOVERNANCE
- ✅ handle_governance_proposal_event(): handles governance.proposal.created from STREAM_GOVERNANCE
- ✅ handle_rwa_inventory_event(): handles rwa.inventory.updated from STREAM_RWA
- ✅ handle_oracle_reading_event(): handles oracle.reading.published from STREAM_ORACLE
  - ✅ Filters for important readings only (critical changes)
Event Publishing (rag-service/events.py)
- ✅ publish_governance_policy_indexed(): publishes rag.governance.policy.indexed
- ✅ publish_governance_proposal_indexed(): publishes rag.governance.proposal.indexed
- ✅ publish_rwa_inventory_indexed(): publishes rag.rwa.inventory.indexed
- ✅ publish_oracle_reading_indexed(): publishes rag.oracle.reading.indexed
Subscriptions
- ✅ STREAM_GOVERNANCE.governance.policy.* (created/updated)
- ✅ STREAM_GOVERNANCE.governance.proposal.created
- ✅ STREAM_RWA.rwa.inventory.updated
- ✅ STREAM_ORACLE.oracle.reading.published
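The governance.policy.* subscription above relies on NATS subject wildcards: `*` matches exactly one token and `>` matches one or more trailing tokens. The NATS server performs this matching itself; the pure-Python function below only illustrates the rule and is not part of the service.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS subject matches a subscription pattern."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # ">" must be last and needs at least one remaining token.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)
```

Under this rule governance.policy.* covers governance.policy.created and governance.policy.updated but not governance.proposal.created, which is why proposals have their own subscription.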
Data Ingested
Governance:
- Policies: title, description, rules, enforcement_level, dao_id
- Proposals: title, description, proposer_id, vote_count, status
RWA (Real World Assets):
- Inventory updates: stock levels, locations, energy generation, water quality
- Platform: GREENFOOD, Energy Union, Water Union
Oracle:
- Sensor readings (important ones only): temperature thresholds, pressure alerts, quality changes
- Automatic filtering based on severity
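Severity-based filtering of oracle readings could look roughly like the sketch below. The `severity` field, the three levels and the default threshold are assumptions; the document does not specify how handle_oracle_reading_event() decides which readings count as important.

```python
SEVERITY_RANK = {"info": 0, "warning": 1, "critical": 2}  # assumed levels


def should_index_reading(reading: dict, min_severity: str = "critical") -> bool:
    """Keep only readings at or above the configured severity."""
    level = str(reading.get("severity", "info")).lower()
    # Unknown severities rank lowest so they are never indexed by accident.
    return SEVERITY_RANK.get(level, -1) >= SEVERITY_RANK[min_severity]
```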
Neo4j Graph Relations (Future)
- Proposal → User (proposer)
- Proposal → DAO
- Policy → DAO
- RWA Asset → Platform
- Oracle Reading → Asset
🏗️ Architecture
Event Flow
┌─────────────────┐
│ Parser Service  │
│ (OCR Pipeline)  │
└────────┬────────┘
         │ publish
         ▼
    ┌────────┐
    │  NATS  │
    │ Stream │ ← STREAM_RAG, STREAM_TASK, STREAM_MEETING,
    └────┬───┘   STREAM_GOVERNANCE, STREAM_RWA, STREAM_ORACLE
         │ subscribe
         ▼
┌─────────────────┐
│  RAG Service    │
│  Event Worker   │
│  ├ Wave 1       │
│  ├ Wave 2       │
│  └ Wave 3       │
└────────┬────────┘
         │ ingest
         ▼
┌─────────────────┐
│ Milvus + Neo4j  │
│ Vector DB       │
└────────┬────────┘
         │ publish
         ▼
    ┌────────┐
    │  NATS  │ ← rag.*.indexed events
    └────────┘
Event Worker (rag-service/event_worker.py)
Parallel Subscriptions:
await asyncio.gather(
    subscribe_to_rag_events(js),         # Wave 1: STREAM_RAG
    subscribe_to_task_events(js),        # Wave 2: STREAM_TASK
    subscribe_to_meeting_events(js),     # Wave 2: STREAM_MEETING
    subscribe_to_governance_events(js),  # Wave 3: STREAM_GOVERNANCE
    subscribe_to_rwa_events(js),         # Wave 3: STREAM_RWA
    subscribe_to_oracle_events(js),      # Wave 3: STREAM_ORACLE
)
Graceful Handling:
- ⚠️ Warning logs for missing streams (the worker does not crash)
- 🔄 Automatic retry on errors (the message is not acked)
- ✅ Idempotency via a check of the indexed flag
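The first two behaviours above can be sketched as a pair of small wrappers: one that turns a missing stream into a warning, and one that acks a message only after its handler succeeds so that JetStream redelivers it otherwise. `StreamNotFound` and both function names are hypothetical; the message object is only assumed to expose `data` and an async `ack()`, as nats-py messages do.

```python
import asyncio
import logging

logger = logging.getLogger("event_worker")


class StreamNotFound(Exception):
    """Hypothetical error raised when a JetStream stream does not exist."""


async def subscribe_safely(name: str, subscribe) -> bool:
    """Run one subscription; warn instead of crashing if the stream is missing."""
    try:
        await subscribe()
        return True
    except StreamNotFound:
        logger.warning("Stream %s not found, skipping subscription", name)
        return False


async def process_message(msg, handler) -> bool:
    """Ack only after the handler succeeds, so failures are redelivered."""
    try:
        await handler(msg.data)
    except Exception:
        logger.exception("Handler failed; leaving message unacked for retry")
        return False
    await msg.ack()
    return True
```

Because a failed message is redelivered rather than dropped, the handlers need the idempotency check so that a retry after a partial success does not index the same content twice.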
📦 File Structure
services/
├── parser-service/
│   └── app/
│       └── events.py              # Event publishing (Wave 1)
│           ├── publish_document_parsed()
│           └── NATS connection management
│
└── rag-service/
    └── app/
        ├── events.py              # Event publishing (Waves 1, 2, 3)
        │   ├── Wave 1: publish_document_indexed()
        │   ├── Wave 2: publish_task_indexed(), publish_meeting_indexed()
        │   └── Wave 3: publish_governance_*(), publish_rwa_*(), publish_oracle_*()
        │
        ├── event_worker.py        # Event handlers & subscriptions (Waves 1, 2, 3)
        │   ├── Wave 1: handle_document_parsed_event()
        │   ├── Wave 2: handle_task_*(), handle_meeting_*()
        │   ├── Wave 3: handle_governance_*(), handle_rwa_*(), handle_oracle_*()
        │   └── Helper: _ingest_content_to_rag()
        │
        ├── worker.py              # Async ingestion jobs
        └── main.py                # Lifespan startup (auto-starts the event worker)
🔧 Configuration
Environment Variables
# NATS Configuration
NATS_URL=nats://nats:4222
# RAG Service
RAG_SERVICE_URL=http://rag-service:9500
# Parser Service
PARSER_SERVICE_URL=http://parser-service:9400
# Milvus
MILVUS_HOST=milvus
MILVUS_PORT=19530
# Neo4j
NEO4J_URI=bolt://neo4j:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=password
NATS Streams to Create
Before running the system, create these streams:
# Wave 1
python scripts/init_nats_streams.py STREAM_RAG
# Wave 2
python scripts/init_nats_streams.py STREAM_TASK
python scripts/init_nats_streams.py STREAM_MEETING
# Wave 3
python scripts/init_nats_streams.py STREAM_GOVERNANCE
python scripts/init_nats_streams.py STREAM_RWA
python scripts/init_nats_streams.py STREAM_ORACLE
Or create all at once:
python scripts/init_nats_streams.py --all
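For orientation, a stream-initialization step of this kind might look like the sketch below. It is not the contents of scripts/init_nats_streams.py: the `STREAMS` mapping only collects the subjects named in this document (the real script may define more, for example the rag.*.indexed subjects from Waves 2 and 3), and `create_streams` is a hypothetical helper that expects a JetStream context such as the one nats-py returns from `nc.jetstream()`.

```python
import asyncio
from typing import Optional

# Subjects per stream, as listed in this document (assumed to be complete
# only for illustration purposes).
STREAMS: dict[str, list[str]] = {
    "STREAM_RAG": [
        "rag.document.parsed", "rag.document.indexed", "rag.document.reindexed",
        "rag.chat.message.created", "rag.file.uploaded",
    ],
    "STREAM_TASK": ["task.created", "task.updated"],
    "STREAM_MEETING": ["meeting.transcript.created"],
    "STREAM_GOVERNANCE": [
        "governance.policy.created", "governance.policy.updated",
        "governance.proposal.created",
    ],
    "STREAM_RWA": ["rwa.inventory.updated"],
    "STREAM_ORACLE": ["oracle.reading.published"],
}


async def create_streams(js, names: Optional[list[str]] = None) -> list[str]:
    """Create the named streams (all by default) on a JetStream context."""
    created = []
    for name in names or list(STREAMS):
        # nats-py exposes JetStreamContext.add_stream(name=..., subjects=[...])
        await js.add_stream(name=name, subjects=STREAMS[name])
        created.append(name)
    return created
```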
🧪 Testing
Unit Tests
Parser Service:
cd services/parser-service
python -m pytest tests/test_events.py
RAG Service:
cd services/rag-service
python -m pytest tests/test_events.py
python -m pytest tests/test_event_worker.py
E2E Tests
Wave 1 (Document Parsing):
# 1. Upload a document via parser-service
curl -X POST http://localhost:9400/ocr/parse \
  -F "file=@test.pdf" \
  -F "dao_id=test-dao"
# 2. Check the rag-service logs for the document indexed event
docker-compose logs -f rag-service | grep "indexed"
# 3. Verify the document in Milvus (quote the URL so the shell does not treat & as a background operator)
curl "http://localhost:9500/search?query=test&dao_id=test-dao"
Wave 2 (Tasks):
# 1. Create a task via the task service (or manually publish an event)
curl -X POST http://localhost:TASK_SERVICE_PORT/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Test task", "description": "Description", "dao_id": "test-dao"}'
# 2. Check the rag-service logs
docker-compose logs -f rag-service | grep "task.indexed"
# 3. Search for the task in RAG (quote the URL so the shell does not treat & as a background operator)
curl "http://localhost:9500/search?query=test+task&dao_id=test-dao"
Wave 3 (Governance):
# Similar flow for governance proposals, RWA updates, oracle readings
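When the verification step is scripted rather than run by hand, the search URL is easier to build with proper query-string encoding. The sketch below covers URL construction only; the /search path and the query and dao_id parameters are taken from the curl examples in this section, while the helper name is hypothetical.

```python
from urllib.parse import urlencode

RAG_SERVICE_URL = "http://localhost:9500"  # host port used in the curl examples


def search_url(query: str, dao_id: str, base: str = RAG_SERVICE_URL) -> str:
    """Build a /search URL with properly encoded query parameters."""
    return f"{base}/search?{urlencode({'query': query, 'dao_id': dao_id})}"
```

The resulting string can be passed to `urllib.request.urlopen` or any HTTP client once the services are running.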
📊 Monitoring
Health Checks
# NATS
curl http://localhost:8222/healthz
# RAG Service
curl http://localhost:9500/health
# Parser Service
curl http://localhost:9400/health
Event Worker Status
# Check if event worker is running
docker-compose logs rag-service | grep "Event worker started"
# Check subscriptions
docker-compose logs rag-service | grep "Subscribed to"
# Check event processing
docker-compose logs rag-service | grep "Processing event"
NATS Stream Status
# Using NATS CLI
nats stream list
nats stream info STREAM_RAG
nats stream info STREAM_TASK
nats stream info STREAM_MEETING
nats stream info STREAM_GOVERNANCE
nats stream info STREAM_RWA
nats stream info STREAM_ORACLE
🚀 Deployment
Docker Compose
services/rag-service/docker-compose.yml:
services:
  nats:
    image: nats:latest
    command: "-js"
    ports:
      - "4222:4222"
      - "8222:8222"
  rag-service:
    build: ./services/rag-service
    environment:
      - NATS_URL=nats://nats:4222
      - MILVUS_HOST=milvus
      - NEO4J_URI=bolt://neo4j:7687
    depends_on:
      - nats
      - milvus
      - neo4j
Start Services
# Start all services
docker-compose up -d
# Check status
docker-compose ps
# Initialize NATS streams
python scripts/init_nats_streams.py --all
# View logs
docker-compose logs -f rag-service
📝 Next Steps
Phase 1: Stabilization (Current Priority)
- E2E smoke tests for all 3 waves
- Monitoring dashboard (Prometheus + Grafana)
- Alerting on event-processing errors
- Performance benchmarks (throughput, latency)
Phase 2: Enhancement
- Neo4j graph relations for all entity types
- Search improvements (hybrid search, re-ranking)
- Batch ingestion for bulk uploads
- Dead letter queue for failed events
Phase 3: Advanced Features
- Event replay for re-indexing
- Document versioning (old vs new)
- Access control in RAG queries (RBAC integration)
- Multi-modal search (text + image + metadata)
🔗 Related Documentation
- INFRASTRUCTURE.md — Server infrastructure, deployment
- WARP.md — Developer guide, architecture overview
- docs/agents.md — Agent hierarchy (A1-A4)
- docs/cursor/42_nats_event_streams_and_event_catalog.md — Event Catalog
- TODO-PARSER-RAG.md — Parser Agent implementation roadmap
Status: ✅ Waves 1, 2, 3 Complete
Last Updated: 2025-01-17 by WARP AI
Maintained by: Ivan Tytar & DAARION Team