- Vision Encoder Service (OpenCLIP ViT-L/14, GPU-accelerated)
  - FastAPI app with text/image embedding endpoints (768-dim)
  - Docker support with NVIDIA GPU runtime
  - Port 8001, health checks, model info API
- Qdrant Vector Database integration
  - Port 6333/6334 (HTTP/gRPC)
  - Image embeddings storage (768-dim, Cosine distance)
  - Auto collection creation
- Vision RAG implementation
  - VisionEncoderClient (Python client for API)
  - Image Search module (text-to-image, image-to-image)
  - Vision RAG routing in DAGI Router (mode: image_search)
  - VisionEncoderProvider integration
- Documentation (5000+ lines)
  - SYSTEM-INVENTORY.md - Complete system inventory
  - VISION-ENCODER-STATUS.md - Service status
  - VISION-RAG-IMPLEMENTATION.md - Implementation details
  - vision_encoder_deployment_task.md - Deployment checklist
  - services/vision-encoder/README.md - Deployment guide
  - Updated WARP.md, INFRASTRUCTURE.md, Jupyter Notebook
- Testing
  - test-vision-encoder.sh - Smoke tests (6 tests)
  - Unit tests for client, image search, routing
- Services: 17 total (added Vision Encoder + Qdrant)
- AI Models: 3 (qwen3:8b, OpenCLIP ViT-L/14, BAAI/bge-m3)
- GPU Services: 2 (Vision Encoder, Ollama)
- VRAM Usage: ~10 GB (concurrent)

Status: Production Ready ✅
# 📊 RAG Event-Driven Ingestion: Status

**Version:** 1.0.0
**Last updated:** 2025-01-17
**Status:** ✅ Wave 1, 2, 3 Complete

---

## 🎯 Overview

An event-driven architecture for automatically ingesting content into the RAG system through NATS JetStream. The system subscribes to multiple event types across several streams and automatically indexes the content in Milvus and Neo4j.

**Documentation:**
- [Event Catalog](./docs/cursor/42_nats_event_streams_and_event_catalog.md): full catalog of NATS streams and events
- [Wave 1 Task](./docs/cursor/rag_ingestion_events_wave1_mvp_task.md): Chat/Docs/Files ingestion
- [Wave 2 Task](./docs/cursor/rag_ingestion_events_wave2_workflows_task.md): Tasks/Followups/Meetings ingestion
- [Wave 3 Task](./docs/cursor/rag_ingestion_events_wave3_governance_rwa_task.md): Governance/RWA/Oracle ingestion

---

## ✅ Wave 1: Chat Messages, Documents, Files (MVP)

**Status:** ✅ Complete
**Completed:** 2025-01-16

### Implemented Features

#### Event Handlers (rag-service/event_worker.py)
- ✅ `handle_document_parsed_event()`: handles `rag.document.parsed` from `STREAM_RAG`
- ✅ Automatic ingestion of parsed documents into Milvus + Neo4j
- ✅ Idempotency (already-indexed documents are skipped)
- ✅ Publishes a `rag.document.indexed` event after successful indexing

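
The handler behavior listed above (ingest, skip documents that are already indexed, then publish `rag.document.indexed`) can be sketched roughly as follows. This is an illustration only, not the code in `event_worker.py`: the payload fields `doc_id` and `text`, the in-memory `indexed_docs` set, and the `ingest` / `publish` callables are assumptions standing in for the real Milvus/Neo4j writes and the NATS publisher.

```python
import json
from typing import Awaitable, Callable, Set

# Hypothetical stand-in for the real "already indexed" check.
indexed_docs: Set[str] = set()


async def handle_document_parsed(
    raw: bytes,
    ingest: Callable[[str, str], Awaitable[None]],
    publish: Callable[[str, dict], Awaitable[None]],
) -> bool:
    """Process one `rag.document.parsed` payload; return True if it was indexed."""
    event = json.loads(raw)
    doc_id = event["doc_id"]  # assumed field name

    # Idempotency: a redelivered event for an indexed document is a no-op.
    if doc_id in indexed_docs:
        return False

    await ingest(doc_id, event["text"])  # assumed: writes to Milvus + Neo4j
    indexed_docs.add(doc_id)

    # Announce success only after ingestion has completed.
    await publish("rag.document.indexed", {"doc_id": doc_id})
    return True
```

Publishing only after the ingest call returns means a crash in between causes a redelivery rather than a false "indexed" announcement, which is why the idempotency check matters.
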
#### Event Publishing (rag-service/events.py)
- ✅ `publish_document_indexed()`: publishes `rag.document.indexed`
- ✅ NATS connection management
- ✅ Retry logic on publish errors

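
The document does not say how the retry logic is implemented. One common shape is a bounded retry with exponential backoff, sketched below. The function name `publish_with_retry`, its parameters, and the choice of retryable exceptions are all hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type


async def publish_with_retry(
    publish: Callable[[], Awaitable[None]],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> int:
    """Call `publish` until it succeeds; return the number of attempts used.

    Re-raises the last error once `attempts` is exhausted.
    """
    for attempt in range(1, attempts + 1):
        try:
            await publish()
            return attempt
        except retry_on:
            if attempt == attempts:
                raise
            # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")
```

A caller would wrap the actual publish call in a zero-argument coroutine function (for example a small closure over the subject and payload) and pass it as `publish`.
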
#### Event Publishing (parser-service/events.py)
- ✅ `publish_document_parsed()`: publishes `rag.document.parsed` after OCR
- ✅ Integrated into the API endpoints (`/ocr/parse`, `/ocr/parse_markdown`, etc.)

#### Infrastructure
- ✅ NATS JetStream service in `docker-compose.yml`
- ✅ `STREAM_RAG` created with subjects:
  - `rag.document.parsed`
  - `rag.document.indexed`
  - `rag.document.reindexed`
  - `rag.chat.message.created`
  - `rag.file.uploaded`
- ✅ Lifespan startup in `rag-service`: the event worker starts automatically
- ✅ Environment variables (`NATS_URL`) in the configuration

### Testing
- ✅ Unit tests for event publishing
- ✅ Unit tests for event consumption
- [ ] E2E smoke test (parser → NATS → rag-service)

---

## ✅ Wave 2: Tasks, Followups, Meetings

**Status:** ✅ Complete
**Completed:** 2025-01-17

### Implemented Features

#### Event Handlers (rag-service/event_worker.py)
- ✅ `handle_task_created_event()`: handles `task.created` from `STREAM_TASK`
- ✅ `handle_task_updated_event()`: handles `task.updated` from `STREAM_TASK`
- ✅ `handle_meeting_transcript_event()`: handles `meeting.transcript.created` from `STREAM_MEETING`
- ✅ Automatic ingestion of tasks on creation and update
- ✅ Automatic ingestion of meeting transcripts
- ✅ Helper function `_ingest_content_to_rag()` for generic ingestion

#### Event Publishing (rag-service/events.py)
- ✅ `publish_task_indexed()`: publishes `rag.task.indexed`
- ✅ `publish_task_reindexed()`: publishes `rag.task.reindexed`
- ✅ `publish_meeting_indexed()`: publishes `rag.meeting.indexed`

#### Subscriptions
- ✅ `STREAM_TASK.task.created`
- ✅ `STREAM_TASK.task.updated`
- ✅ `STREAM_MEETING.meeting.transcript.created`

### Data Ingested
- Tasks: title, description, assignee, status, priority, labels, project_id
- Meetings: transcript, attendees, duration, summary, dao_id, team_id

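
To make the task field list concrete, here is one way a task payload might be split into embeddable text and filterable metadata before it reaches a helper like `_ingest_content_to_rag()`. Which fields are embedded and which are kept as metadata is an assumption made for this sketch, and `task_to_rag_document` is a hypothetical name.

```python
from typing import Any, Dict, Tuple

# Assumed split: free-text fields are embedded, the rest become metadata.
TASK_TEXT_FIELDS = ("title", "description")
TASK_META_FIELDS = ("assignee", "status", "priority", "labels", "project_id")


def task_to_rag_document(task: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Split a task payload into text for embedding and metadata for filtering."""
    # Skip missing or empty text fields so the text has no blank sections.
    text = "\n\n".join(str(task[f]) for f in TASK_TEXT_FIELDS if task.get(f))
    metadata = {f: task[f] for f in TASK_META_FIELDS if f in task}
    return text, metadata
```

Keeping status, priority, and similar fields out of the embedded text lets a `task.updated` event that only changes metadata be handled without producing a different embedding.
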
### Neo4j Graph Relations (Future)
- [ ] Task → User (assignee)
- [ ] Task → Project
- [ ] Meeting → User (attendees)
- [ ] Meeting → Team

---

## ✅ Wave 3: Governance, RWA, Oracle

**Status:** ✅ Complete
**Completed:** 2025-01-17

### Implemented Features

#### Event Handlers (rag-service/event_worker.py)
- ✅ `handle_governance_policy_event()`: handles `governance.policy.created/updated` from `STREAM_GOVERNANCE`
- ✅ `handle_governance_proposal_event()`: handles `governance.proposal.created` from `STREAM_GOVERNANCE`
- ✅ `handle_rwa_inventory_event()`: handles `rwa.inventory.updated` from `STREAM_RWA`
- ✅ `handle_oracle_reading_event()`: handles `oracle.reading.published` from `STREAM_ORACLE`
- ✅ Filters for significant readings only (critical changes)

#### Event Publishing (rag-service/events.py)
- ✅ `publish_governance_policy_indexed()`: publishes `rag.governance.policy.indexed`
- ✅ `publish_governance_proposal_indexed()`: publishes `rag.governance.proposal.indexed`
- ✅ `publish_rwa_inventory_indexed()`: publishes `rag.rwa.inventory.indexed`
- ✅ `publish_oracle_reading_indexed()`: publishes `rag.oracle.reading.indexed`

#### Subscriptions
- ✅ `STREAM_GOVERNANCE.governance.policy.*` (created/updated)
- ✅ `STREAM_GOVERNANCE.governance.proposal.created`
- ✅ `STREAM_RWA.rwa.inventory.updated`
- ✅ `STREAM_ORACLE.oracle.reading.published`

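
The `governance.policy.*` subscription relies on NATS subject wildcards: `*` matches exactly one token, and `>` matches one or more trailing tokens. The matcher below is a small self-contained model of those rules, useful for reasoning about which subjects a subscription covers. The function `subject_matches` is written for this document and is not part of any NATS client library.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subject pattern matches a concrete subject."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # `>` is only valid as the last token and needs at least one token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without a trailing `>`, the token counts must be equal.
    return len(p_tokens) == len(s_tokens)
```

For example, `subject_matches("governance.policy.*", "governance.policy.updated")` is true, while the same pattern does not match `governance.proposal.created`, which is why proposals need their own subscription.
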
### Data Ingested

**Governance:**
- Policies: title, description, rules, enforcement_level, dao_id
- Proposals: title, description, proposer_id, vote_count, status

**RWA (Real World Assets):**
- Inventory updates: stock levels, locations, energy generation, water quality
- Platform: GREENFOOD, Energy Union, Water Union

**Oracle:**
- Sensor readings (significant ones only): temperature thresholds, pressure alerts, quality changes
- Automatic filtering based on severity

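
The severity-based filtering of oracle readings could look something like the sketch below. The `severity` field, the three-level scale, and the default threshold are assumptions chosen for illustration; the actual criteria used by `handle_oracle_reading_event()` are not specified here.

```python
from typing import Any, Dict

# Assumed severity scale; unknown or missing values are treated as "info".
SEVERITY_ORDER = {"info": 0, "warning": 1, "critical": 2}


def should_index_reading(reading: Dict[str, Any], min_severity: str = "critical") -> bool:
    """Return True when a reading is severe enough to be worth indexing."""
    level = SEVERITY_ORDER.get(reading.get("severity", "info"), 0)
    return level >= SEVERITY_ORDER[min_severity]
```

Filtering before ingestion keeps routine sensor traffic out of the vector index, so only threshold crossings and alerts end up being searchable.
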
### Neo4j Graph Relations (Future)
- [ ] Proposal → User (proposer)
- [ ] Proposal → DAO
- [ ] Policy → DAO
- [ ] RWA Asset → Platform
- [ ] Oracle Reading → Asset

---

## 🏗️ Architecture

### Event Flow

```
┌─────────────────┐
│ Parser Service  │
│  (OCR Pipeline) │
└────────┬────────┘
         │ publish
         ▼
    ┌────────┐
    │  NATS  │
    │ Stream │ ← STREAM_RAG, STREAM_TASK, STREAM_MEETING,
    └────┬───┘   STREAM_GOVERNANCE, STREAM_RWA, STREAM_ORACLE
         │ subscribe
         ▼
┌─────────────────┐
│  RAG Service    │
│  Event Worker   │
│   ├ Wave 1      │
│   ├ Wave 2      │
│   └ Wave 3      │
└────────┬────────┘
         │ ingest
         ▼
┌─────────────────┐
│ Milvus + Neo4j  │
│   Vector DB     │
└────────┬────────┘
         │
         ▼ publish
    ┌────────┐
    │  NATS  │ ← rag.*.indexed events
    └────────┘
```

### Event Worker (rag-service/event_worker.py)

**Parallel Subscriptions:**
```python
await asyncio.gather(
    subscribe_to_rag_events(js),         # Wave 1: STREAM_RAG
    subscribe_to_task_events(js),        # Wave 2: STREAM_TASK
    subscribe_to_meeting_events(js),     # Wave 2: STREAM_MEETING
    subscribe_to_governance_events(js),  # Wave 3: STREAM_GOVERNANCE
    subscribe_to_rwa_events(js),         # Wave 3: STREAM_RWA
    subscribe_to_oracle_events(js),      # Wave 3: STREAM_ORACLE
)
```

**Graceful Handling:**
- ⚠️ Warning logs for missing streams (the worker does not crash)
- 🔄 Automatic retry on errors (the message is not acked, so it is redelivered)
- ✅ Idempotency via a check of the `indexed` flag

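
The first two graceful-handling points can be sketched as below, assuming message objects shaped like nats-py's `Msg` (a `data` attribute and an awaitable `ack()`). `StreamNotFound`, `subscribe_all`, and `process_message` are hypothetical names for this illustration; the real worker may use the client library's own error types and structure things differently.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Iterable, List

logger = logging.getLogger("event_worker")


class StreamNotFound(Exception):
    """Hypothetical error raised when a stream does not exist yet."""


async def subscribe_all(
    subscribers: Iterable[Callable[[], Awaitable[None]]],
) -> List[bool]:
    """Start every subscription concurrently; a missing stream only logs a warning."""

    async def guarded(subscribe: Callable[[], Awaitable[None]]) -> bool:
        try:
            await subscribe()
            return True
        except StreamNotFound as exc:
            logger.warning("Stream missing, subscription skipped: %s", exc)
            return False

    return list(await asyncio.gather(*(guarded(s) for s in subscribers)))


async def process_message(msg, handler: Callable[[bytes], Awaitable[None]]) -> bool:
    """Ack only after the handler succeeds, so failed messages are redelivered."""
    try:
        await handler(msg.data)
    except Exception:
        logger.exception("Handler failed; leaving message unacked for redelivery")
        return False
    await msg.ack()
    return True
```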
---

## 📦 File Structure

```
services/
├── parser-service/
│   └── app/
│       └── events.py              # Event publishing (Wave 1)
│           ├── publish_document_parsed()
│           └── NATS connection management
│
└── rag-service/
    └── app/
        ├── events.py              # Event publishing (Waves 1, 2, 3)
        │   ├── Wave 1: publish_document_indexed()
        │   ├── Wave 2: publish_task_indexed(), publish_meeting_indexed()
        │   └── Wave 3: publish_governance_*(), publish_rwa_*(), publish_oracle_*()
        │
        ├── event_worker.py        # Event handlers & subscriptions (Waves 1, 2, 3)
        │   ├── Wave 1: handle_document_parsed_event()
        │   ├── Wave 2: handle_task_*(), handle_meeting_*()
        │   ├── Wave 3: handle_governance_*(), handle_rwa_*(), handle_oracle_*()
        │   └── Helper: _ingest_content_to_rag()
        │
        ├── worker.py              # Async ingestion jobs
        └── main.py                # Lifespan startup (auto-starts the event worker)
```

---

## 🔧 Configuration

### Environment Variables

```bash
# NATS Configuration
NATS_URL=nats://nats:4222

# RAG Service
RAG_SERVICE_URL=http://rag-service:9500

# Parser Service
PARSER_SERVICE_URL=http://parser-service:9400

# Milvus
MILVUS_HOST=milvus
MILVUS_PORT=19530

# Neo4j
NEO4J_URI=bolt://neo4j:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=password
```

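
As an illustration of how a service might consume these variables, the sketch below loads them into a typed settings object. The `Settings` class and `load_settings` function are hypothetical and not taken from the services. The defaults mirror the values listed above, except that the Neo4j password is deliberately required.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    nats_url: str
    milvus_host: str
    milvus_port: int
    neo4j_uri: str
    neo4j_user: str
    neo4j_password: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables, falling back to the documented values."""
    return Settings(
        nats_url=env.get("NATS_URL", "nats://nats:4222"),
        milvus_host=env.get("MILVUS_HOST", "milvus"),
        milvus_port=int(env.get("MILVUS_PORT", "19530")),
        neo4j_uri=env.get("NEO4J_URI", "bolt://neo4j:7687"),
        neo4j_user=env.get("NEO4J_USER", "neo4j"),
        # No default on purpose: a real password should not be baked into code.
        neo4j_password=env["NEO4J_PASSWORD"],
    )
```

Passing the environment in as a mapping keeps the loader easy to test, for example `load_settings({"NEO4J_PASSWORD": "secret"})`.
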
### NATS Streams to Create

**Before running the system, create these streams:**

```bash
# Wave 1
python scripts/init_nats_streams.py STREAM_RAG

# Wave 2
python scripts/init_nats_streams.py STREAM_TASK
python scripts/init_nats_streams.py STREAM_MEETING

# Wave 3
python scripts/init_nats_streams.py STREAM_GOVERNANCE
python scripts/init_nats_streams.py STREAM_RWA
python scripts/init_nats_streams.py STREAM_ORACLE
```

**Or create all at once:**
```bash
python scripts/init_nats_streams.py --all
```

---

## 🧪 Testing

### Unit Tests

**Parser Service:**
```bash
cd services/parser-service
python -m pytest tests/test_events.py
```

**RAG Service:**
```bash
cd services/rag-service
python -m pytest tests/test_events.py
python -m pytest tests/test_event_worker.py
```

### E2E Tests

**Wave 1 (Document Parsing):**
```bash
# 1. Upload a document via parser-service
curl -X POST http://localhost:9400/ocr/parse \
  -F "file=@test.pdf" \
  -F "dao_id=test-dao"

# 2. Check the rag-service logs for the document indexed event
docker-compose logs -f rag-service | grep "indexed"

# 3. Verify the document in Milvus (the URL is quoted so the shell does not interpret `&`)
curl "http://localhost:9500/search?query=test&dao_id=test-dao"
```

**Wave 2 (Tasks):**
```bash
# 1. Create a task via the task service (or manually publish an event).
#    Replace TASK_SERVICE_PORT with the actual port.
curl -X POST http://localhost:TASK_SERVICE_PORT/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Test task", "description": "Description", "dao_id": "test-dao"}'

# 2. Check the rag-service logs
docker-compose logs -f rag-service | grep "task.indexed"

# 3. Search for the task in RAG (the URL is quoted so the shell does not interpret `&`)
curl "http://localhost:9500/search?query=test+task&dao_id=test-dao"
```

**Wave 3 (Governance):**
```bash
# Similar flow for governance proposals, RWA updates, oracle readings
```

---

## 📊 Monitoring

### Health Checks

```bash
# NATS
curl http://localhost:8222/healthz

# RAG Service
curl http://localhost:9500/health

# Parser Service
curl http://localhost:9400/health
```

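
For automated checks (for example in a smoke-test script), the same three endpoints can be polled from Python. The sketch below uses only the standard library. `check_health` and `http_status` are hypothetical helpers, and treating any non-200 response or connection error as unhealthy is an assumption.

```python
from typing import Callable, Dict, Mapping
from urllib.request import urlopen

# Endpoints taken from the curl commands above.
HEALTH_ENDPOINTS = {
    "nats": "http://localhost:8222/healthz",
    "rag-service": "http://localhost:9500/health",
    "parser-service": "http://localhost:9400/health",
}


def http_status(url: str, timeout: float = 2.0) -> int:
    """Return the HTTP status code for a GET request to `url`."""
    with urlopen(url, timeout=timeout) as resp:
        return resp.status


def check_health(
    endpoints: Mapping[str, str] = HEALTH_ENDPOINTS,
    fetch: Callable[[str], int] = http_status,
) -> Dict[str, bool]:
    """Map each service name to True if its health endpoint answered with 200."""
    results = {}
    for name, url in endpoints.items():
        try:
            results[name] = fetch(url) == 200
        except OSError:
            # URLError and HTTPError subclass OSError; this also covers
            # refused connections and timeouts.
            results[name] = False
    return results
```

The `fetch` parameter exists so the check can be exercised without running services, by passing a stub that returns a status code.
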
### Event Worker Status

```bash
# Check if event worker is running
docker-compose logs rag-service | grep "Event worker started"

# Check subscriptions
docker-compose logs rag-service | grep "Subscribed to"

# Check event processing
docker-compose logs rag-service | grep "Processing event"
```

### NATS Stream Status

```bash
# Using NATS CLI
nats stream list
nats stream info STREAM_RAG
nats stream info STREAM_TASK
nats stream info STREAM_MEETING
nats stream info STREAM_GOVERNANCE
nats stream info STREAM_RWA
nats stream info STREAM_ORACLE
```

---

## 🚀 Deployment

### Docker Compose

**services/rag-service/docker-compose.yml:**
```yaml
services:
  nats:
    image: nats:latest
    # -js enables JetStream; -m 8222 enables the HTTP monitoring port used by /healthz
    command: "-js -m 8222"
    ports:
      - "4222:4222"
      - "8222:8222"

  rag-service:
    build: ./services/rag-service
    environment:
      - NATS_URL=nats://nats:4222
      - MILVUS_HOST=milvus
      - NEO4J_URI=bolt://neo4j:7687
    depends_on:
      - nats
      - milvus
      - neo4j
```

### Start Services

```bash
# Start all services
docker-compose up -d

# Check status
docker-compose ps

# Initialize NATS streams
python scripts/init_nats_streams.py --all

# View logs
docker-compose logs -f rag-service
```

---

## 📝 Next Steps

### Phase 1: Stabilization (Current Priority)
- [ ] **E2E smoke tests** for all 3 waves
- [ ] **Monitoring dashboard** (Prometheus + Grafana)
- [ ] **Alerting** on event-processing errors
- [ ] **Performance benchmarks** (throughput, latency)

### Phase 2: Enhancement
- [ ] **Neo4j graph relations** for all entity types
- [ ] **Search improvements** (hybrid search, re-ranking)
- [ ] **Batch ingestion** for bulk uploads
- [ ] **Dead letter queue** for failed events

### Phase 3: Advanced Features
- [ ] **Event replay** for re-indexing
- [ ] **Versioning** of documents (old vs new)
- [ ] **Access control** in RAG queries (RBAC integration)
- [ ] **Multi-modal search** (text + image + metadata)

---

## 🔗 Related Documentation

- [INFRASTRUCTURE.md](./INFRASTRUCTURE.md): Server infrastructure, deployment
- [WARP.md](./WARP.md): Developer guide, architecture overview
- [docs/agents.md](./docs/agents.md): Agent hierarchy (A1-A4)
- [docs/cursor/42_nats_event_streams_and_event_catalog.md](./docs/cursor/42_nats_event_streams_and_event_catalog.md): Event Catalog
- [TODO-PARSER-RAG.md](./TODO-PARSER-RAG.md): Parser Agent implementation roadmap

---

**Status:** ✅ Wave 1, 2, 3 Complete
**Last Updated:** 2025-01-17 by WARP AI
**Maintained by:** Ivan Tytar & DAARION Team