Task: RAG ingestion worker (events → Milvus + Neo4j)¶
Goal¶
Design and scaffold a RAG ingestion worker that:
- Сonsumes domain events (messages, docs, files, RWA updates) from the existing event stream.
- Transforms them into normalized chunks/documents.
- Indexes them into Milvus (vector store) and Neo4j (graph store).
- Works idempotently and supports
reindex(team_id).
This worker complements the rag-gateway service (see docs/cursor/rag_gateway_task.md) by keeping its underlying stores up-to-date.
IMPORTANT: This task is about architecture, data flow and scaffolding. Concrete model choices and full schemas can be refined later.
Context¶
- Project root:
microdao-daarion/. - Planned/implemented RAG layer: see
docs/cursor/rag_gateway_task.md. - Existing docs:
docs/cursor/42_nats_event_streams_and_event_catalog.md– event stream & catalog.docs/cursor/34_internal_services_architecture.md– internal services & topology.
We assume there is (or will be):
- An event bus (likely NATS) with domain events such as:
message.createddoc.upsertfile.uploadedrwa.energy.update,rwa.food.update, etc.- A Milvus cluster instance.
- A Neo4j instance.
The ingestion worker must not be called directly by agents. It is a back-office service that feeds RAG stores for the rag-gateway.
High-level design¶
1. Service placement & structure¶
Create a new service (or extend RAG-gateway repo structure) under, for example:
services/rag-ingest-worker/
Suggested files:
main.py— entrypoint (CLI or long-running process).config.py— environment/config loader (event bus URL, Milvus/Neo4j URLs, batch sizes, etc.).events/consumer.py— NATS (or other) consumer logic.pipeline/normalization.py— turn events into normalized documents/chunks.pipeline/embedding.py— embedding model client/wrapper.pipeline/index_milvus.py— Milvus upsert logic.pipeline/index_neo4j.py— Neo4j graph updates.api.py— optional HTTP API for:POST /ingest/one– ingest single payload for debugging.POST /ingest/reindex/{team_id}– trigger reindex job.GET /health– health check.
2. Event sources¶
The worker should subscribe to a small set of core event types (names to be aligned with the actual Event Catalog):
message.created— messages in chats/channels (Telegram, internal UI, etc.).doc.upsert— wiki/docs/specs updates.file.uploaded— files (PDF, images) that have parsed text.rwa.*— events related to energy/food/water assets (optional, for later).
Implementation details:
- Use NATS (or another broker) subscription patterns from
docs/cursor/42_nats_event_streams_and_event_catalog.md. - Each event should carry at least:
event_typeteam_id/dao_iduser_idchannel_id/project_id(if applicable)payloadwith text/content and metadata.
Normalized document/chunk model¶
Define a common internal model for what is sent to Milvus/Neo4j, e.g. IngestChunk:
Fields (minimum):
chunk_id— deterministic ID (e.g. hash of (team_id, source_type, source_id, chunk_index)).team_id/dao_id.project_id(optional).channel_id(optional).agent_id(who generated it, if any).source_type—"message" | "doc" | "file" | "wiki" | "rwa" | ....source_id— e.g. message ID, doc ID, file ID.text— the chunk content.tags— list of tags (topic, domain, etc.).visibility—"public" | "confidential".created_at— timestamp.
Responsibilities:
pipeline/normalization.py:- For each event type, map event payload → one or more
IngestChunkobjects. - Handle splitting of long texts into smaller chunks if needed.
Embedding & Milvus indexing¶
1. Embedding¶
- Create an embedding component (
pipeline/embedding.py) that: - Accepts
IngestChunkobjects. - Supports batch processing.
-
Uses either:
- Existing LLM proxy/embedding service (preferred), or
- Direct model (e.g. local
bge-m3,gte-large, etc.).
-
Each chunk after embedding should have vector + metadata per schema in
rag_gateway_task.
2. Milvus indexing¶
pipeline/index_milvus.pyshould:- Upsert chunks into Milvus.
- Ensure idempotency using
chunk_idas primary key. -
Store metadata:
team_id,project_id,channel_id,agent_id,source_type,source_id,visibility,tags,created_at,embed_modelversion.
-
Consider using one Milvus collection with a partition key (
team_id), or per-DAO collections — but keep code flexible.
Neo4j graph updates¶
pipeline/index_neo4j.py should:
- For events that carry structural information (e.g. project uses resource, doc mentions topic):
- Create or update nodes:
User,MicroDAO,Project,Channel,Topic,Resource,File,RWAObject,Doc. -
Create relationships such as:
(:User)-[:MEMBER_OF]->(:MicroDAO)(:Agent)-[:SERVES]->(:MicroDAO|:Project)(:Doc)-[:MENTIONS]->(:Topic)(:Project)-[:USES]->(:Resource)
-
All nodes/edges must include:
team_id/dao_id-
visibilitywhen it matters -
Operations should be upserts (MERGE) to avoid duplicates.
Idempotency & reindex¶
1. Idempotent semantics¶
- Use deterministic
chunk_idfor Milvus records. - Use Neo4j
MERGEfor nodes/edges based on natural keys (e.g.(team_id, source_type, source_id, chunk_index)). - Replaying the same events should not corrupt or duplicate data.
2. Reindex API¶
-
Provide a simple HTTP or CLI interface to:
-
POST /ingest/reindex/{team_id}— schedule or start reindex for a team/DAO. -
Reindex strategy:
-
Read documents/messages from source-of-truth (DB or event replay).
- Rebuild chunks and embeddings.
- Upsert into Milvus & Neo4j (idempotently).
Implementation details (can be left as TODOs if missing backends):
- If there is no easy historic source yet, stub the reindex endpoint with clear TODO and logging.
Monitoring & logging¶
Add basic observability:
- Structured logs for:
- Each event type ingested.
- Number of chunks produced.
- Latency for embedding and indexing.
- (Optional) Metrics counters/gauges:
ingest_events_totalingest_chunks_totalingest_errors_total
Files to create/modify (suggested)¶
Adjust exact paths if needed.
services/rag-ingest-worker/main.py-
Parse config, connect to event bus, start consumers.
-
services/rag-ingest-worker/config.py -
Environment variables:
EVENT_BUS_URL,MILVUS_URL,NEO4J_URL,EMBEDDING_SERVICE_URL, etc. -
services/rag-ingest-worker/events/consumer.py -
NATS (or chosen bus) subscription logic.
-
services/rag-ingest-worker/pipeline/normalization.py -
Functions
normalize_message_created(event),normalize_doc_upsert(event),normalize_file_uploaded(event). -
services/rag-ingest-worker/pipeline/embedding.py -
embed_chunks(chunks: List[IngestChunk]) -> List[VectorChunk]. -
services/rag-ingest-worker/pipeline/index_milvus.py -
upsert_chunks_to_milvus(chunks: List[VectorChunk]). -
services/rag-ingest-worker/pipeline/index_neo4j.py -
update_graph_for_event(event, chunks: List[IngestChunk]). -
Optional:
services/rag-ingest-worker/api.py -
FastAPI app with:
GET /healthPOST /ingest/onePOST /ingest/reindex/{team_id}
-
Integration docs:
- Reference
docs/cursor/rag_gateway_task.mdanddocs/cursor/42_nats_event_streams_and_event_catalog.mdwhere appropriate.
Acceptance criteria¶
- A new
rag-ingest-worker(or similarly named) module/service exists underservices/with: - Clear directory structure (
events/,pipeline/,config.py,main.py). -
Stubs or initial implementations for consuming events and indexing to Milvus/Neo4j.
-
A normalized internal model (
IngestChunkor equivalent) is defined and used across pipelines. -
Milvus indexing code:
- Uses idempotent upserts keyed by
chunk_id. -
Stores metadata compatible with the RAG-gateway schema.
-
Neo4j update code:
- Uses MERGE for nodes/relationships.
-
Encodes
team_id/dao_idand privacy where relevant. -
Idempotency strategy and
reindex(team_id)path are present in code (even if reindex is initially a stub with TODO). -
Basic logging is present for ingestion operations.
-
This file (
docs/cursor/rag_ingestion_worker_task.md) can be executed by Cursor as:
bash
cursor task < docs/cursor/rag_ingestion_worker_task.md
and Cursor will use it as the single source of truth for implementing/refining the ingestion worker.