# 📊 RAG Event-Driven Ingestion — Status
**Version:** 1.0.0

**Last updated:** 2025-01-17

**Status:** ✅ Wave 1, 2, 3 Complete

---
## 🎯 Overview
An event-driven architecture that automatically ingests content into the RAG system via NATS JetStream. The system subscribes to several event types across multiple streams and automatically indexes the content in Milvus and Neo4j.
**Documentation:**
- [Event Catalog](./docs/cursor/42_nats_event_streams_and_event_catalog.md) — Full catalog of NATS streams and events
- [Wave 1 Task](./docs/cursor/rag_ingestion_events_wave1_mvp_task.md) — Chat/Docs/Files ingestion
- [Wave 2 Task](./docs/cursor/rag_ingestion_events_wave2_workflows_task.md) — Tasks/Followups/Meetings ingestion
- [Wave 3 Task](./docs/cursor/rag_ingestion_events_wave3_governance_rwa_task.md) — Governance/RWA/Oracle ingestion
---
## ✅ Wave 1: Chat Messages, Documents, Files (MVP)
**Status:** ✅ Complete

**Completion date:** 2025-01-16
### Implemented Features
#### Event Handlers (rag-service/event_worker.py)
- ✅ `handle_document_parsed_event()` — handles `rag.document.parsed` from `STREAM_RAG`
- ✅ Automatic ingestion of parsed documents into Milvus + Neo4j
- ✅ Idempotency (already indexed documents are skipped)
- ✅ Publishes a `rag.document.indexed` event after successful indexing
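Here is a minimal sketch of the idempotent handler described above. It assumes the event payload carries a `doc_id` field. The `indexed` check is modelled as membership in an in-memory set that stands in for the Milvus + Neo4j state. `FakeIndex`, `handle_document_parsed` and `demo` are hypothetical names and are not the actual `event_worker.py` code.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable

Publish = Callable[[str, dict], Awaitable[None]]


@dataclass
class FakeIndex:
    """Hypothetical in-memory stand-in for the Milvus + Neo4j index state."""
    indexed_ids: set = field(default_factory=set)


async def handle_document_parsed(payload: dict, index: FakeIndex, publish: Publish) -> bool:
    """Ingest a parsed document once; return False when it was already indexed."""
    doc_id = payload["doc_id"]  # assumed payload field
    if doc_id in index.indexed_ids:
        # Idempotency: a redelivered or duplicate event is a no-op
        return False
    index.indexed_ids.add(doc_id)  # stand-in for the real Milvus + Neo4j writes
    # Announce success only after the write has completed
    await publish("rag.document.indexed", {"doc_id": doc_id})
    return True


async def demo() -> list:
    published = []

    async def publish(subject: str, data: dict) -> None:
        published.append((subject, data))

    index = FakeIndex()
    event = {"doc_id": "doc-1"}
    first = await handle_document_parsed(event, index, publish)
    second = await handle_document_parsed(event, index, publish)  # duplicate delivery
    return [first, second, published]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The `indexed` event is published only after the write succeeds, so downstream consumers never see it for a document that failed to index.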
#### Event Publishing (rag-service/events.py)
- ✅ `publish_document_indexed()` — publishes `rag.document.indexed`
- ✅ NATS connection management
- ✅ Retry logic on publish errors
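The retry behaviour can be sketched as exponential backoff around any async publish coroutine. This is an assumption about how `events.py` retries, not its actual code. `publish_with_retry`, `attempts` and `base_delay` are hypothetical. With nats-py, the wrapped call would be a JetStream `publish(subject, payload)`.

```python
import asyncio
from typing import Awaitable, Callable


async def publish_with_retry(
    publish: Callable[[str, bytes], Awaitable[None]],
    subject: str,
    payload: bytes,
    attempts: int = 3,
    base_delay: float = 0.1,
) -> int:
    """Call publish() until it succeeds; return the number of attempts used."""
    for attempt in range(1, attempts + 1):
        try:
            await publish(subject, payload)
            return attempt
        except Exception:
            if attempt == attempts:
                raise  # give up and surface the last error to the caller
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be >= 1")


async def demo() -> int:
    calls = {"n": 0}

    async def flaky_publish(subject: str, payload: bytes) -> None:
        calls["n"] += 1
        if calls["n"] < 3:  # fail the first two calls
            raise ConnectionError("NATS temporarily unavailable")

    return await publish_with_retry(flaky_publish, "rag.document.indexed", b"{}", base_delay=0.01)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```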
#### Event Publishing (parser-service/events.py)
- ✅ `publish_document_parsed()` — publishes `rag.document.parsed` after OCR
- ✅ Integrated into the API endpoints (`/ocr/parse`, `/ocr/parse_markdown`, etc.)
#### Infrastructure
- ✅ NATS JetStream service in `docker-compose.yml`
- ✅ `STREAM_RAG` created with subjects:
  - `rag.document.parsed`
  - `rag.document.indexed`
  - `rag.document.reindexed`
  - `rag.chat.message.created`
  - `rag.file.uploaded`
- ✅ Lifespan startup in `rag-service` — the event worker starts automatically
- ✅ Environment variables (`NATS_URL`) in the configuration
### Testing
- ✅ Unit tests for event publishing
- ✅ Unit tests for event consumption
- [ ] E2E smoke test (parser → NATS → rag-service)
---
## ✅ Wave 2: Tasks, Followups, Meetings
**Status:** ✅ Complete

**Completion date:** 2025-01-17
### Implemented Features
#### Event Handlers (rag-service/event_worker.py)
- ✅ `handle_task_created_event()` — handles `task.created` from `STREAM_TASK`
- ✅ `handle_task_updated_event()` — handles `task.updated` from `STREAM_TASK`
- ✅ `handle_meeting_transcript_event()` — handles `meeting.transcript.created` from `STREAM_MEETING`
- ✅ Automatic ingestion of tasks on creation and update
- ✅ Automatic ingestion of meeting transcripts
- ✅ Helper function `_ingest_content_to_rag()` for generic ingestion
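The created/updated split maps naturally onto an upsert. The first time a task is seen it is indexed, and later events for the same task re-index it. The sketch assumes a `task_id` field in the payload and uses a plain dict as the store. The returned subjects are the ones `publish_task_indexed()` and `publish_task_reindexed()` emit, but `upsert_task` itself is hypothetical.

```python
def upsert_task(store: dict, payload: dict) -> str:
    """Store the task and return the rag.* subject that should be published."""
    task_id = payload["task_id"]  # assumed payload field
    already_indexed = task_id in store
    store[task_id] = payload  # stand-in for _ingest_content_to_rag()
    return "rag.task.reindexed" if already_indexed else "rag.task.indexed"


if __name__ == "__main__":
    store: dict = {}
    print(upsert_task(store, {"task_id": "t-1", "title": "Draft budget"}))
    print(upsert_task(store, {"task_id": "t-1", "title": "Draft budget v2"}))
```

The sketch decides from the store's state rather than from the incoming subject. That keeps it tolerant of a `task.updated` event arriving for a task that was never indexed.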
#### Event Publishing (rag-service/events.py)
- ✅ `publish_task_indexed()` — publishes `rag.task.indexed`
- ✅ `publish_task_reindexed()` — publishes `rag.task.reindexed`
- ✅ `publish_meeting_indexed()` — publishes `rag.meeting.indexed`
#### Subscriptions
- ✅ `STREAM_TASK.task.created`
- ✅ `STREAM_TASK.task.updated`
- ✅ `STREAM_MEETING.meeting.transcript.created`
### Data Ingested
- Tasks: title, description, assignee, status, priority, labels, project_id
- Meetings: transcript, attendees, duration, summary, dao_id, team_id
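One way to turn these fields into something searchable is to embed the free text (title and description, or summary and transcript) and keep the structured fields as metadata for filtering. This split is an assumption, not the service's actual schema. `to_rag_document`, `TEXT_FIELDS` and `META_FIELDS` are hypothetical.

```python
TEXT_FIELDS = {
    "task": ("title", "description"),
    "meeting": ("summary", "transcript"),
}
META_FIELDS = {
    "task": ("assignee", "status", "priority", "labels", "project_id"),
    "meeting": ("attendees", "duration", "dao_id", "team_id"),
}


def to_rag_document(kind: str, payload: dict) -> tuple[str, dict]:
    """Split an event payload into (text to embed, metadata for filters)."""
    # Join the non-empty free-text fields with blank lines between them
    text = "\n\n".join(str(payload[f]) for f in TEXT_FIELDS[kind] if payload.get(f))
    # Copy only the structured fields that are actually present
    metadata = {f: payload[f] for f in META_FIELDS[kind] if f in payload}
    metadata["source"] = kind  # lets search results be filtered by entity type
    return text, metadata
```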
### Neo4j Graph Relations (Future)
- [ ] Task → User (assignee)
- [ ] Task → Project
- [ ] Meeting → User (attendees)
- [ ] Meeting → Team
---
## ✅ Wave 3: Governance, RWA, Oracle
**Status:** ✅ Complete

**Completion date:** 2025-01-17
### Implemented Features
#### Event Handlers (rag-service/event_worker.py)
- ✅ `handle_governance_policy_event()` — handles `governance.policy.created/updated` from `STREAM_GOVERNANCE`
- ✅ `handle_governance_proposal_event()` — handles `governance.proposal.created` from `STREAM_GOVERNANCE`
- ✅ `handle_rwa_inventory_event()` — handles `rwa.inventory.updated` from `STREAM_RWA`
- ✅ `handle_oracle_reading_event()` — handles `oracle.reading.published` from `STREAM_ORACLE`
- ✅ Only significant readings (critical changes) are ingested
#### Event Publishing (rag-service/events.py)
- ✅ `publish_governance_policy_indexed()` — publishes `rag.governance.policy.indexed`
- ✅ `publish_governance_proposal_indexed()` — publishes `rag.governance.proposal.indexed`
- ✅ `publish_rwa_inventory_indexed()` — publishes `rag.rwa.inventory.indexed`
- ✅ `publish_oracle_reading_indexed()` — publishes `rag.oracle.reading.indexed`
#### Subscriptions
- ✅ `STREAM_GOVERNANCE.governance.policy.*` (created/updated)
- ✅ `STREAM_GOVERNANCE.governance.proposal.created`
- ✅ `STREAM_RWA.rwa.inventory.updated`
- ✅ `STREAM_ORACLE.oracle.reading.published`
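`governance.policy.*` relies on NATS subject wildcards. In those, `*` matches exactly one dot-separated token and `>` matches one or more trailing tokens. The pure-Python matcher below mirrors those rules, which can help when unit-testing handler routing without a running server. `subject_matches` is illustrative and is not part of nats-py.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS subject matches a subscription pattern."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last token and needs at least one token to match
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without '>', pattern and subject must have the same number of tokens
    return len(p_tokens) == len(s_tokens)
```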
### Data Ingested
**Governance:**
- Policies: title, description, rules, enforcement_level, dao_id
- Proposals: title, description, proposer_id, vote_count, status
**RWA (Real World Assets):**
- Inventory updates: stock levels, locations, energy generation, water quality
- Platforms: GREENFOOD, Energy Union, Water Union
**Oracle:**
- Sensor readings (significant ones only): temperature thresholds, pressure alerts, quality changes
- Automatic filtering based on severity
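Severity-based filtering could look like the sketch below. The `severity` field, its three levels and the default threshold are assumptions made for illustration. This document does not specify the oracle payload schema or the actual cut-off.

```python
SEVERITY_ORDER = {"info": 0, "warning": 1, "critical": 2}  # assumed levels


def should_ingest(reading: dict, min_severity: str = "warning") -> bool:
    """Keep only readings at or above min_severity; unknown levels are dropped."""
    level = SEVERITY_ORDER.get(reading.get("severity", ""), -1)
    return level >= SEVERITY_ORDER[min_severity]


if __name__ == "__main__":
    readings = [
        {"sensor": "temp-1", "severity": "info"},
        {"sensor": "pressure-2", "severity": "critical"},
        {"sensor": "water-3", "severity": "warning"},
    ]
    print([r["sensor"] for r in readings if should_ingest(r)])
```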
### Neo4j Graph Relations (Future)
- [ ] Proposal → User (proposer)
- [ ] Proposal → DAO
- [ ] Policy → DAO
- [ ] RWA Asset → Platform
- [ ] Oracle Reading → Asset
---
## 🏗️ Architecture
### Event Flow
```
┌─────────────────┐
│ Parser Service  │
│ (OCR Pipeline)  │
└────────┬────────┘
         │ publish
         ▼
    ┌────────┐
    │  NATS  │
    │ Stream │ ← STREAM_RAG, STREAM_TASK, STREAM_MEETING,
    └────┬───┘   STREAM_GOVERNANCE, STREAM_RWA, STREAM_ORACLE
         │ subscribe
         ▼
┌─────────────────┐
│   RAG Service   │
│  Event Worker   │
│   ├ Wave 1      │
│   ├ Wave 2      │
│   └ Wave 3      │
└────────┬────────┘
         │ ingest
         ▼
┌────────────────┐
│ Milvus + Neo4j │
│   Vector DB    │
└────────┬───────┘
         │ publish
         ▼
    ┌────────┐
    │  NATS  │ ← rag.*.indexed events
    └────────┘
```
### Event Worker (rag-service/event_worker.py)
**Parallel Subscriptions:**
```python
import asyncio


async def run_event_worker(js):  # illustrative wrapper name; js is the JetStream context
    # Run all stream subscriptions concurrently in one event loop
    await asyncio.gather(
        subscribe_to_rag_events(js),         # Wave 1: STREAM_RAG
        subscribe_to_task_events(js),        # Wave 2: STREAM_TASK
        subscribe_to_meeting_events(js),     # Wave 2: STREAM_MEETING
        subscribe_to_governance_events(js),  # Wave 3: STREAM_GOVERNANCE
        subscribe_to_rwa_events(js),         # Wave 3: STREAM_RWA
        subscribe_to_oracle_events(js),      # Wave 3: STREAM_ORACLE
    )
```
**Graceful Handling:**
- ⚠️ Warning logs for missing streams (the worker does not crash)
- 🔄 Automatic retry on errors (the message is not acked)
- ✅ Idempotency via an `indexed` flag check
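Both behaviours can be sketched with plain asyncio. Acking only after the handler succeeds means JetStream redelivers the message once the consumer's ack wait expires. A subscription whose stream is missing logs a warning instead of taking down the other subscriptions. `FakeMsg`, `StreamMissing`, `process_message` and `run_subscriptions` are hypothetical stand-ins. With nats-py, the message object provides `ack()`, and a missing stream surfaces as an exception from nats-py rather than the class used here.

```python
import asyncio
import json
import logging
from typing import Awaitable, Callable

log = logging.getLogger("event_worker")


class StreamMissing(Exception):
    """Hypothetical stand-in for the error raised when a stream does not exist."""


class FakeMsg:
    """Minimal stand-in for a JetStream message (data + ack())."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.acked = False

    async def ack(self) -> None:
        self.acked = True


async def process_message(msg, handler: Callable[[dict], Awaitable[None]]) -> None:
    try:
        await handler(json.loads(msg.data))
    except Exception:
        # No ack: JetStream redelivers the message after the consumer's ack wait
        log.exception("Processing failed; leaving message unacked for retry")
        return
    await msg.ack()


async def run_subscriptions(subscribers: dict) -> list:
    """Start every subscriber concurrently; a missing stream only logs a warning."""

    async def guarded(name: str, start: Callable[[], Awaitable[None]]) -> str:
        try:
            await start()
            return name
        except StreamMissing:
            log.warning("Stream %s not found, skipping subscription", name)
            return ""

    results = await asyncio.gather(*(guarded(n, s) for n, s in subscribers.items()))
    return [name for name in results if name]
```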
---
## 📦 File Structure
```
services/
├── parser-service/
│   └── app/
│       └── events.py          # Event publishing (Wave 1)
│           ├── publish_document_parsed()
│           └── NATS connection management
└── rag-service/
    └── app/
        ├── events.py          # Event publishing (Waves 1, 2, 3)
        │   ├── Wave 1: publish_document_indexed()
        │   ├── Wave 2: publish_task_indexed(), publish_meeting_indexed()
        │   └── Wave 3: publish_governance_*(), publish_rwa_*(), publish_oracle_*()
        ├── event_worker.py    # Event handlers & subscriptions (Waves 1, 2, 3)
        │   ├── Wave 1: handle_document_parsed_event()
        │   ├── Wave 2: handle_task_*(), handle_meeting_*()
        │   ├── Wave 3: handle_governance_*(), handle_rwa_*(), handle_oracle_*()
        │   └── Helper: _ingest_content_to_rag()
        ├── worker.py          # Async ingestion jobs
        └── main.py            # Lifespan startup (starts the event worker automatically)
```
---
## 🔧 Configuration
### Environment Variables
```bash
# NATS Configuration
NATS_URL=nats://nats:4222
# RAG Service
RAG_SERVICE_URL=http://rag-service:9500
# Parser Service
PARSER_SERVICE_URL=http://parser-service:9400
# Milvus
MILVUS_HOST=milvus
MILVUS_PORT=19530
# Neo4j
NEO4J_URI=bolt://neo4j:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=password
```
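Inside the services, these variables might be read into a typed settings object. The `Settings` dataclass below is a hypothetical sketch that covers the NATS, Milvus and Neo4j variables, using the values above as defaults. The real services may load configuration differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    nats_url: str
    milvus_host: str
    milvus_port: int
    neo4j_uri: str
    neo4j_user: str
    neo4j_password: str

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "Settings":
        # Defaults mirror the docker-compose values documented above
        return cls(
            nats_url=env.get("NATS_URL", "nats://nats:4222"),
            milvus_host=env.get("MILVUS_HOST", "milvus"),
            milvus_port=int(env.get("MILVUS_PORT", "19530")),
            neo4j_uri=env.get("NEO4J_URI", "bolt://neo4j:7687"),
            neo4j_user=env.get("NEO4J_USER", "neo4j"),
            # No default: the documented "password" value is only a placeholder
            neo4j_password=env["NEO4J_PASSWORD"],
        )
```

Requiring `NEO4J_PASSWORD` explicitly is a design choice in this sketch. It makes a missing secret fail at startup instead of silently falling back to a weak default.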
### NATS Streams to Create
**Before running the system, create these streams:**
```bash
# Wave 1
python scripts/init_nats_streams.py STREAM_RAG
# Wave 2
python scripts/init_nats_streams.py STREAM_TASK
python scripts/init_nats_streams.py STREAM_MEETING
# Wave 3
python scripts/init_nats_streams.py STREAM_GOVERNANCE
python scripts/init_nats_streams.py STREAM_RWA
python scripts/init_nats_streams.py STREAM_ORACLE
```
**Or create all at once:**
```bash
python scripts/init_nats_streams.py --all
```
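Here is a possible core for `scripts/init_nats_streams.py`, assuming it uses nats-py's JetStream manager (`js.add_stream(name=..., subjects=[...])`). The subject lists come from the subscriptions documented above. The actual script may define them differently, for example with wildcards. `create_streams` and `_PrintingJS` are hypothetical. The `js` argument is injected so the logic can run without a server.

```python
import asyncio

# Subjects taken from the subscriptions listed in this document
STREAMS = {
    "STREAM_RAG": [
        "rag.document.parsed",
        "rag.document.indexed",
        "rag.document.reindexed",
        "rag.chat.message.created",
        "rag.file.uploaded",
    ],
    "STREAM_TASK": ["task.created", "task.updated"],
    "STREAM_MEETING": ["meeting.transcript.created"],
    "STREAM_GOVERNANCE": [
        "governance.policy.created",
        "governance.policy.updated",
        "governance.proposal.created",
    ],
    "STREAM_RWA": ["rwa.inventory.updated"],
    "STREAM_ORACLE": ["oracle.reading.published"],
}


async def create_streams(js, names) -> list:
    """Create each requested stream via a JetStream manager; return the names."""
    for name in names:
        await js.add_stream(name=name, subjects=STREAMS[name])
    return list(names)


class _PrintingJS:
    """Offline stand-in that prints instead of calling a NATS server."""

    async def add_stream(self, **kwargs) -> None:
        print("add_stream", kwargs["name"], kwargs["subjects"])


if __name__ == "__main__":
    asyncio.run(create_streams(_PrintingJS(), STREAMS))
```

Against a live server, the same function would receive `nc.jetstream()` after `nc = await nats.connect(NATS_URL)`. Passing `list(STREAMS)` corresponds to the `--all` option.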
---
## 🧪 Testing
### Unit Tests
**Parser Service:**
```bash
cd services/parser-service
python -m pytest tests/test_events.py
```
**RAG Service:**
```bash
cd services/rag-service
python -m pytest tests/test_events.py
python -m pytest tests/test_event_worker.py
```
### E2E Tests
**Wave 1 (Document Parsing):**
```bash
# 1. Upload a document via parser-service
curl -X POST http://localhost:9400/ocr/parse \
  -F "file=@test.pdf" \
  -F "dao_id=test-dao"
# 2. Check the rag-service logs for the document indexed event
docker-compose logs -f rag-service | grep "indexed"
# 3. Verify the document in Milvus (quote the URL so the shell does not treat & as a background operator)
curl "http://localhost:9500/search?query=test&dao_id=test-dao"
```
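Indexing happens asynchronously after the parse request returns, so an automated version of step 3 needs to poll rather than search once. Here is a minimal sketch, assuming `/search` returns JSON. `search_url`, `http_get_json` and `wait_for_hits` are hypothetical helpers, and `urlencode` takes care of escaping the query string.

```python
import json
import time
from typing import Callable
from urllib.parse import urlencode
from urllib.request import urlopen


def search_url(base: str, query: str, dao_id: str) -> str:
    # urlencode escapes spaces and '&' so the query string stays intact
    return f"{base}/search?{urlencode({'query': query, 'dao_id': dao_id})}"


def http_get_json(url: str):
    with urlopen(url, timeout=5) as resp:
        return json.load(resp)


def wait_for_hits(fetch: Callable[[], object], timeout: float = 30.0, interval: float = 1.0) -> object:
    """Poll fetch() until it returns a non-empty result or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        result = fetch()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError("document was not indexed in time")
        time.sleep(interval)
```

`fetch` should return only the list of hits. The exact response shape of `/search` is not documented here, so the extraction from `http_get_json(search_url(...))` has to be adapted to the real payload.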
**Wave 2 (Tasks):**
```bash
# 1. Create a task via the task service (or publish the event manually)
curl -X POST http://localhost:TASK_SERVICE_PORT/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Test task", "description": "Description", "dao_id": "test-dao"}'
# 2. Check the rag-service logs
docker-compose logs -f rag-service | grep "task.indexed"
# 3. Search for the task in RAG (URL quoted so & is not interpreted by the shell)
curl "http://localhost:9500/search?query=test+task&dao_id=test-dao"
```
**Wave 3 (Governance):**
```bash
# Similar flow for governance proposals, RWA updates, oracle readings
```
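If the Wave 3 producers are not wired up yet, an end-to-end check can publish a test event by hand. The sketch below assumes JSON payloads with the proposal fields listed under Wave 3, plus a hypothetical `proposal_id` and `dao_id`. `proposal_event`, `publish_event` and `_PrintingJS` are hypothetical. `js` is anything with an async `publish(subject, payload)` method; with nats-py, that is the JetStream context from `nc.jetstream()`.

```python
import asyncio
import json


def proposal_event(proposal_id: str, dao_id: str) -> dict:
    # Field names follow the Wave 3 "Data Ingested" list; values are test data
    return {
        "proposal_id": proposal_id,  # hypothetical identifier field
        "dao_id": dao_id,
        "title": "Test proposal",
        "description": "Smoke-test proposal for RAG ingestion",
        "proposer_id": "user-test",
        "vote_count": 0,
        "status": "open",
    }


async def publish_event(js, subject: str, event: dict):
    """Serialize the event as UTF-8 JSON and publish it to JetStream."""
    return await js.publish(subject, json.dumps(event).encode("utf-8"))


class _PrintingJS:
    """Offline stand-in that prints the message instead of sending it."""

    async def publish(self, subject: str, payload: bytes) -> str:
        print(subject, payload.decode("utf-8"))
        return "printed"


if __name__ == "__main__":
    asyncio.run(publish_event(_PrintingJS(), "governance.proposal.created", proposal_event("p-1", "test-dao")))
```

With the NATS CLI, `nats pub governance.proposal.created '<json>'` sends an equivalent message, which `STREAM_GOVERNANCE` stores as long as its subjects cover that subject.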
---
## 📊 Monitoring
### Health Checks
```bash
# NATS
curl http://localhost:8222/healthz
# RAG Service
curl http://localhost:9500/health
# Parser Service
curl http://localhost:9400/health
```
### Event Worker Status
```bash
# Check if event worker is running
docker-compose logs rag-service | grep "Event worker started"
# Check subscriptions
docker-compose logs rag-service | grep "Subscribed to"
# Check event processing
docker-compose logs rag-service | grep "Processing event"
```
### NATS Stream Status
```bash
# Using NATS CLI
nats stream list
nats stream info STREAM_RAG
nats stream info STREAM_TASK
nats stream info STREAM_MEETING
nats stream info STREAM_GOVERNANCE
nats stream info STREAM_RWA
nats stream info STREAM_ORACLE
```
---
## 🚀 Deployment
### Docker Compose
**services/rag-service/docker-compose.yml:**
```yaml
services:
  nats:
    image: nats:latest
    # -js enables JetStream; -m 8222 enables the HTTP monitoring port used by /healthz
    command: ["-js", "-m", "8222"]
    ports:
      - "4222:4222"
      - "8222:8222"
  rag-service:
    build: ./services/rag-service
    environment:
      - NATS_URL=nats://nats:4222
      - MILVUS_HOST=milvus
      - NEO4J_URI=bolt://neo4j:7687
    depends_on:
      - nats
      - milvus
      - neo4j
```
### Start Services
```bash
# Start all services
docker-compose up -d
# Check status
docker-compose ps
# Initialize NATS streams
python scripts/init_nats_streams.py --all
# View logs
docker-compose logs -f rag-service
```
---
## 📝 Next Steps
### Phase 1: Stabilization (Current Priority)
- [ ] **E2E smoke tests** for all 3 waves
- [ ] **Monitoring dashboard** (Prometheus + Grafana)
- [ ] **Alerting** on event-processing errors
- [ ] **Performance benchmarks** (throughput, latency)
### Phase 2: Enhancement
- [ ] **Neo4j graph relations** for all entity types
- [ ] **Search improvements** (hybrid search, re-ranking)
- [ ] **Batch ingestion** for bulk uploads
- [ ] **Dead letter queue** for failed events
### Phase 3: Advanced Features
- [ ] **Event replay** for re-indexing
- [ ] **Document versioning** (old vs new)
- [ ] **Access control** in RAG queries (RBAC integration)
- [ ] **Multi-modal search** (text + image + metadata)
---
## 🔗 Related Documentation
- [INFRASTRUCTURE.md](./INFRASTRUCTURE.md) — Server infrastructure, deployment
- [WARP.md](./WARP.md) — Developer guide, architecture overview
- [docs/agents.md](./docs/agents.md) — Agent hierarchy (A1-A4)
- [docs/cursor/42_nats_event_streams_and_event_catalog.md](./docs/cursor/42_nats_event_streams_and_event_catalog.md) — Event Catalog
- [TODO-PARSER-RAG.md](./TODO-PARSER-RAG.md) — Parser Agent implementation roadmap
---
**Status:** ✅ Wave 1, 2, 3 Complete
**Last Updated:** 2025-01-17 by WARP AI
**Maintained by:** Ivan Tytar & DAARION Team