- Vision Encoder Service (OpenCLIP ViT-L/14, GPU-accelerated)
  - FastAPI app with text/image embedding endpoints (768-dim)
  - Docker support with NVIDIA GPU runtime
  - Port 8001, health checks, model info API
- Qdrant Vector Database integration
  - Ports 6333/6334 (HTTP/gRPC)
  - Image embedding storage (768-dim, cosine distance)
  - Automatic collection creation
- Vision RAG implementation
  - VisionEncoderClient (Python client for the API)
  - Image Search module (text-to-image, image-to-image)
  - Vision RAG routing in the DAGI Router (mode: image_search)
  - VisionEncoderProvider integration
- Documentation (5000+ lines)
  - SYSTEM-INVENTORY.md — complete system inventory
  - VISION-ENCODER-STATUS.md — service status
  - VISION-RAG-IMPLEMENTATION.md — implementation details
  - vision_encoder_deployment_task.md — deployment checklist
  - services/vision-encoder/README.md — deployment guide
  - Updated WARP.md, INFRASTRUCTURE.md, and the Jupyter notebook
- Testing
  - test-vision-encoder.sh — smoke tests (6 tests)
  - Unit tests for the client, image search, and routing
- Services: 17 total (added Vision Encoder + Qdrant)
- AI models: 3 (qwen3:8b, OpenCLIP ViT-L/14, BAAI/bge-m3)
- GPU services: 2 (Vision Encoder, Ollama)
- VRAM usage: ~10 GB (concurrent)

Status: Production Ready ✅
Task: RAG ingestion worker (events → Milvus + Neo4j)
Goal
Design and scaffold a RAG ingestion worker that:
- Consumes domain events (messages, docs, files, RWA updates) from the existing event stream.
- Transforms them into normalized chunks/documents.
- Indexes them into Milvus (vector store) and Neo4j (graph store).
- Works idempotently and supports `reindex(team_id)`.
This worker complements the rag-gateway service (see docs/cursor/rag_gateway_task.md) by keeping its underlying stores up-to-date.
IMPORTANT: This task is about architecture, data flow and scaffolding. Concrete model choices and full schemas can be refined later.
Context
- Project root: `microdao-daarion/`.
- Planned/implemented RAG layer: see `docs/cursor/rag_gateway_task.md`.
- Existing docs:
  - `docs/cursor/42_nats_event_streams_and_event_catalog.md` – event stream & catalog.
  - `docs/cursor/34_internal_services_architecture.md` – internal services & topology.
We assume there is (or will be):
- An event bus (likely NATS) with domain events such as `message.created`, `doc.upsert`, `file.uploaded`, `rwa.energy.update`, `rwa.food.update`, etc.
- A Milvus cluster instance.
- A Neo4j instance.
The ingestion worker must not be called directly by agents. It is a back-office service that feeds RAG stores for the rag-gateway.
High-level design
1. Service placement & structure
Create a new service (or extend RAG-gateway repo structure) under, for example:
services/rag-ingest-worker/
Suggested files:
- `main.py` — entrypoint (CLI or long-running process).
- `config.py` — environment/config loader (event bus URL, Milvus/Neo4j URLs, batch sizes, etc.).
- `events/consumer.py` — NATS (or other) consumer logic.
- `pipeline/normalization.py` — turn events into normalized documents/chunks.
- `pipeline/embedding.py` — embedding model client/wrapper.
- `pipeline/index_milvus.py` — Milvus upsert logic.
- `pipeline/index_neo4j.py` — Neo4j graph updates.
- `api.py` — optional HTTP API for:
  - `POST /ingest/one` — ingest a single payload for debugging.
  - `POST /ingest/reindex/{team_id}` — trigger a reindex job.
  - `GET /health` — health check.
2. Event sources
The worker should subscribe to a small set of core event types (names to be aligned with the actual Event Catalog):
- `message.created` — messages in chats/channels (Telegram, internal UI, etc.).
- `doc.upsert` — wiki/docs/specs updates.
- `file.uploaded` — files (PDF, images) that have parsed text.
- `rwa.*` — events related to energy/food/water assets (optional, for later).
Implementation details:
- Use NATS (or another broker) subscription patterns from `docs/cursor/42_nats_event_streams_and_event_catalog.md`.
- Each event should carry at least:
  - `event_type`
  - `team_id` / `dao_id`
  - `user_id`
  - `channel_id` / `project_id` (if applicable)
  - `payload` with text/content and metadata.
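To make the contract concrete, a `message.created` event might look like the dict below. This is a hypothetical shape; all field names must be aligned with the actual Event Catalog before implementation.

```python
# Hypothetical `message.created` event shape; align field names with the
# real Event Catalog (docs/cursor/42_nats_event_streams_and_event_catalog.md).
example_event = {
    "event_type": "message.created",
    "team_id": "team-123",
    "user_id": "user-42",
    "channel_id": "channel-7",
    "payload": {
        "message_id": "msg-001",
        "text": "Solar panel maintenance is scheduled for Friday.",
        "created_at": "2025-01-15T10:30:00Z",
    },
}
```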
Normalized document/chunk model
Define a common internal model for what is sent to Milvus/Neo4j, e.g. `IngestChunk`:
Fields (minimum):
- `chunk_id` — deterministic ID (e.g. hash of `(team_id, source_type, source_id, chunk_index)`).
- `team_id` / `dao_id`.
- `project_id` (optional).
- `channel_id` (optional).
- `agent_id` (who generated it, if any).
- `source_type` — `"message" | "doc" | "file" | "wiki" | "rwa" | ...`.
- `source_id` — e.g. message ID, doc ID, file ID.
- `text` — the chunk content.
- `tags` — list of tags (topic, domain, etc.).
- `visibility` — `"public" | "confidential"`.
- `created_at` — timestamp.
Responsibilities:
- `pipeline/normalization.py`:
  - For each event type, map the event payload → one or more `IngestChunk` objects.
  - Handle splitting of long texts into smaller chunks if needed.
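A minimal sketch of the internal model and one normalizer, assuming dict-based events; the fixed-size splitter and the colon-joined `chunk_id` format are illustrative placeholders, not final choices:

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class IngestChunk:
    chunk_id: str                      # deterministic, see idempotency section
    team_id: str
    source_type: str                   # "message" | "doc" | "file" | "wiki" | "rwa" | ...
    source_id: str
    text: str
    project_id: Optional[str] = None
    channel_id: Optional[str] = None
    agent_id: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    visibility: str = "public"         # "public" | "confidential"
    created_at: Optional[str] = None

def split_text(text: str, max_chars: int = 1000) -> List[str]:
    """Naive fixed-size splitter; replace with a smarter chunker later."""
    return [text[i:i + max_chars] for i in range(0, len(text), max_chars)] or [""]

def normalize_message_created(event: dict) -> List[IngestChunk]:
    """Map one `message.created` event to one or more IngestChunk objects."""
    payload = event["payload"]
    chunks = []
    for idx, piece in enumerate(split_text(payload["text"])):
        chunks.append(IngestChunk(
            # Deterministic ID so that replaying the event upserts, not duplicates.
            chunk_id=f'{event["team_id"]}:message:{payload["message_id"]}:{idx}',
            team_id=event["team_id"],
            source_type="message",
            source_id=payload["message_id"],
            text=piece,
            channel_id=event.get("channel_id"),
            created_at=payload.get("created_at"),
        ))
    return chunks
```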
Embedding & Milvus indexing
1. Embedding
- Create an embedding component (`pipeline/embedding.py`) that:
  - Accepts `IngestChunk` objects.
  - Supports batch processing.
  - Uses either:
    - an existing LLM proxy/embedding service (preferred), or
    - a direct model (e.g. local `bge-m3`, `gte-large`, etc.).
- After embedding, each chunk should carry a vector + metadata per the schema in `rag_gateway_task`.
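The batching requirement can be kept independent of the model choice by injecting the embedding call as a function. A sketch, where `embed_fn` is a hypothetical wrapper around the LLM proxy or a local model:

```python
from typing import Callable, Iterable, List

def batched(items: List, batch_size: int) -> Iterable[List]:
    """Yield fixed-size batches from a list."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

def embed_texts(texts: List[str],
                embed_fn: Callable[[List[str]], List[List[float]]],
                batch_size: int = 32) -> List[List[float]]:
    """Embed texts batch by batch; `embed_fn` hides whether the vectors come
    from the embedding service or a direct local model (bge-m3, gte-large, ...)."""
    vectors: List[List[float]] = []
    for batch in batched(texts, batch_size):
        vectors.extend(embed_fn(batch))
    return vectors
```

Swapping providers then only requires a new `embed_fn`, not pipeline changes.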
2. Milvus indexing
- `pipeline/index_milvus.py` should:
  - Upsert chunks into Milvus.
  - Ensure idempotency using `chunk_id` as the primary key.
  - Store metadata: `team_id`, `project_id`, `channel_id`, `agent_id`, `source_type`, `source_id`, `visibility`, `tags`, `created_at`, `embed_model` version.
- Consider using one Milvus collection with a partition key (`team_id`), or per-DAO collections — but keep the code flexible.
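A sketch of the Milvus path that keeps row construction pure, so it can be unit-tested without a running cluster. The URI and collection name are assumptions; the commented-out upsert uses pymilvus's `MilvusClient`:

```python
from typing import List

def chunk_to_milvus_row(chunk: dict, vector: List[float], embed_model: str) -> dict:
    """Build one Milvus row; `chunk_id` is the primary key, so replays upsert in place."""
    return {
        "chunk_id": chunk["chunk_id"],          # primary key → idempotent upserts
        "vector": vector,
        "team_id": chunk["team_id"],            # candidate partition key
        "project_id": chunk.get("project_id", ""),
        "channel_id": chunk.get("channel_id", ""),
        "agent_id": chunk.get("agent_id", ""),
        "source_type": chunk["source_type"],
        "source_id": chunk["source_id"],
        "visibility": chunk.get("visibility", "public"),
        "tags": chunk.get("tags", []),
        "created_at": chunk.get("created_at", ""),
        "embed_model": embed_model,             # record the embedding model version
    }

# Against a live cluster (hypothetical URI and collection name):
# from pymilvus import MilvusClient
# client = MilvusClient(uri="http://localhost:19530")
# client.upsert(collection_name="rag_chunks",
#               data=[chunk_to_milvus_row(c, v, "bge-m3") for c, v in pairs])
```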
Neo4j graph updates
`pipeline/index_neo4j.py` should:
- For events that carry structural information (e.g. a project uses a resource, a doc mentions a topic):
  - Create or update nodes: `User`, `MicroDAO`, `Project`, `Channel`, `Topic`, `Resource`, `File`, `RWAObject`, `Doc`.
  - Create relationships such as:
    - `(:User)-[:MEMBER_OF]->(:MicroDAO)`
    - `(:Agent)-[:SERVES]->(:MicroDAO|:Project)`
    - `(:Doc)-[:MENTIONS]->(:Topic)`
    - `(:Project)-[:USES]->(:Resource)`
- All nodes/edges must include:
  - `team_id` / `dao_id`
  - `visibility`, where it matters.
- Operations should be upserts (`MERGE`) to avoid duplicates.
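For one of the patterns above, `(:Doc)-[:MENTIONS]->(:Topic)`, the upsert semantics can be expressed as parameterized Cypher that uses `MERGE` on natural keys. The property names are assumptions to be aligned with the final graph schema; execution via the official `neo4j` driver is sketched in the comments:

```python
# MERGE on (team_id + source_id / name) keeps replays idempotent:
# re-running the same event matches the existing nodes/edge instead of duplicating.
DOC_MENTIONS_TOPIC = """
MERGE (d:Doc {team_id: $team_id, source_id: $doc_id})
  SET d.visibility = $visibility
MERGE (t:Topic {team_id: $team_id, name: $topic})
MERGE (d)-[:MENTIONS]->(t)
"""

# Execution sketch (requires a running Neo4j and the official `neo4j` driver):
# from neo4j import GraphDatabase
# driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "secret"))
# with driver.session() as session:
#     session.run(DOC_MENTIONS_TOPIC, team_id="team-123", doc_id="doc-9",
#                 visibility="public", topic="solar")
```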
Idempotency & reindex
1. Idempotent semantics
- Use a deterministic `chunk_id` for Milvus records.
- Use Neo4j `MERGE` for nodes/edges based on natural keys (e.g. `(team_id, source_type, source_id, chunk_index)`).
- Replaying the same events should not corrupt or duplicate data.
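The deterministic ID can be a hash of the natural key, as this spec suggests; a minimal sketch (SHA-256 and the colon separator are assumptions):

```python
import hashlib

def make_chunk_id(team_id: str, source_type: str, source_id: str, chunk_index: int) -> str:
    """Deterministic chunk ID from the natural key: replaying the same event
    yields the same ID, so Milvus upserts overwrite instead of duplicating."""
    key = f"{team_id}:{source_type}:{source_id}:{chunk_index}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()
```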
2. Reindex API
- Provide a simple HTTP or CLI interface:
  - `POST /ingest/reindex/{team_id}` — schedule or start a reindex for a team/DAO.
- Reindex strategy:
  - Read documents/messages from the source of truth (DB or event replay).
  - Rebuild chunks and embeddings.
  - Upsert into Milvus & Neo4j (idempotently).
Implementation details (can be left as TODOs if backends are missing):
- If there is no convenient historic source yet, stub the reindex endpoint with a clear TODO and logging.
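Such a stub could look like the following; the function name and return shape are illustrative, and the real version would sit behind `POST /ingest/reindex/{team_id}`:

```python
import logging

logger = logging.getLogger("rag_ingest_worker")

def reindex_team(team_id: str) -> dict:
    """Stub for reindex(team_id) until a historic source of truth is wired up.

    TODO: read docs/messages from the source of truth (DB or event replay),
    rebuild chunks and embeddings, and upsert idempotently into Milvus/Neo4j.
    """
    logger.warning("reindex(%s) requested but not implemented yet", team_id)
    return {"team_id": team_id, "status": "not_implemented"}
```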
Monitoring & logging
Add basic observability:
- Structured logs for:
  - Each event type ingested.
  - Number of chunks produced.
  - Latency for embedding and indexing.
- (Optional) Metrics counters/gauges:
  - `ingest_events_total`
  - `ingest_chunks_total`
  - `ingest_errors_total`
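A minimal sketch covering both points: JSON-structured log lines plus in-process counters using the names above (a real deployment would likely swap the `Counter` for Prometheus or StatsD):

```python
import json
import logging
import time
from collections import Counter

logger = logging.getLogger("rag_ingest_worker")
metrics = Counter()  # in-process counters; swap for Prometheus/StatsD later

def record_ingest(event_type: str, n_chunks: int, started_at: float) -> None:
    """Emit one structured log line per ingested event and bump the counters."""
    metrics["ingest_events_total"] += 1
    metrics["ingest_chunks_total"] += n_chunks
    logger.info(json.dumps({
        "event_type": event_type,
        "chunks": n_chunks,
        "latency_ms": round((time.monotonic() - started_at) * 1000, 1),
    }))
```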
Files to create/modify (suggested)
Adjust exact paths if needed.
- `services/rag-ingest-worker/main.py`
  - Parse config, connect to the event bus, start consumers.
- `services/rag-ingest-worker/config.py`
  - Environment variables: `EVENT_BUS_URL`, `MILVUS_URL`, `NEO4J_URL`, `EMBEDDING_SERVICE_URL`, etc.
- `services/rag-ingest-worker/events/consumer.py`
  - NATS (or chosen bus) subscription logic.
- `services/rag-ingest-worker/pipeline/normalization.py`
  - Functions `normalize_message_created(event)`, `normalize_doc_upsert(event)`, `normalize_file_uploaded(event)`.
- `services/rag-ingest-worker/pipeline/embedding.py`
  - `embed_chunks(chunks: List[IngestChunk]) -> List[VectorChunk]`.
- `services/rag-ingest-worker/pipeline/index_milvus.py`
  - `upsert_chunks_to_milvus(chunks: List[VectorChunk])`.
- `services/rag-ingest-worker/pipeline/index_neo4j.py`
  - `update_graph_for_event(event, chunks: List[IngestChunk])`.
- Optional: `services/rag-ingest-worker/api.py`
  - FastAPI app with:
    - `GET /health`
    - `POST /ingest/one`
    - `POST /ingest/reindex/{team_id}`
- Integration docs:
  - Reference `docs/cursor/rag_gateway_task.md` and `docs/cursor/42_nats_event_streams_and_event_catalog.md` where appropriate.
Acceptance criteria
- A new `rag-ingest-worker` (or similarly named) module/service exists under `services/` with:
  - Clear directory structure (`events/`, `pipeline/`, `config.py`, `main.py`).
  - Stubs or initial implementations for consuming events and indexing to Milvus/Neo4j.
- A normalized internal model (`IngestChunk` or equivalent) is defined and used across pipelines.
- Milvus indexing code:
  - Uses idempotent upserts keyed by `chunk_id`.
  - Stores metadata compatible with the RAG-gateway schema.
- Neo4j update code:
  - Uses `MERGE` for nodes/relationships.
  - Encodes `team_id`/`dao_id` and privacy where relevant.
- The idempotency strategy and the `reindex(team_id)` path are present in code (even if reindex is initially a stub with a TODO).
- Basic logging is present for ingestion operations.
- This file (`docs/cursor/rag_ingestion_worker_task.md`) can be executed by Cursor as `cursor task < docs/cursor/rag_ingestion_worker_task.md`, and Cursor will use it as the single source of truth for implementing/refining the ingestion worker.