Task: RAG ingestion worker (events → Milvus + Neo4j)

Goal

Design and scaffold a RAG ingestion worker that:

  • Consumes domain events (messages, docs, files, RWA updates) from the existing event stream.
  • Transforms them into normalized chunks/documents.
  • Indexes them into Milvus (vector store) and Neo4j (graph store).
  • Works idempotently and supports reindex(team_id).

This worker complements the rag-gateway service (see docs/cursor/rag_gateway_task.md) by keeping its underlying stores up-to-date.

IMPORTANT: This task is about architecture, data flow and scaffolding. Concrete model choices and full schemas can be refined later.


Context

  • Project root: microdao-daarion/.
  • Planned/implemented RAG layer: see docs/cursor/rag_gateway_task.md.
  • Existing docs:

    • docs/cursor/42_nats_event_streams_and_event_catalog.md – event stream & catalog.
    • docs/cursor/34_internal_services_architecture.md – internal services & topology.

We assume there is (or will be):

  • An event bus (likely NATS) with domain events such as:

    • message.created
    • doc.upsert
    • file.uploaded
    • rwa.energy.update, rwa.food.update, etc.
  • A Milvus instance (standalone or cluster).
  • A Neo4j instance.

The ingestion worker must not be called directly by agents. It is a back-office service that feeds RAG stores for the rag-gateway.


High-level design

1. Service placement & structure

Create a new service (or extend the rag-gateway repo structure) under, for example:

  • services/rag-ingest-worker/

Suggested files:

  • main.py — entrypoint (CLI or long-running process).
  • config.py — environment/config loader (event bus URL, Milvus/Neo4j URLs, batch sizes, etc.).
  • events/consumer.py — NATS (or other) consumer logic.
  • pipeline/normalization.py — turn events into normalized documents/chunks.
  • pipeline/embedding.py — embedding model client/wrapper.
  • pipeline/index_milvus.py — Milvus upsert logic.
  • pipeline/index_neo4j.py — Neo4j graph updates.
  • api.py — optional HTTP API for:

    • POST /ingest/one – ingest a single payload for debugging.
    • POST /ingest/reindex/{team_id} – trigger a reindex job.
    • GET /health – health check.

2. Event sources

The worker should subscribe to a small set of core event types (names to be aligned with the actual Event Catalog):

  • message.created — messages in chats/channels (Telegram, internal UI, etc.).
  • doc.upsert — wiki/docs/specs updates.
  • file.uploaded — files (PDF, images) that have parsed text.
  • rwa.* — events related to energy/food/water assets (optional, for later).

Implementation details:

  • Use NATS (or another broker) subscription patterns from docs/cursor/42_nats_event_streams_and_event_catalog.md (see the consumer sketch after this list).
  • Each event should carry at least:

    • event_type
    • team_id / dao_id
    • user_id
    • channel_id / project_id (if applicable)
    • payload with text/content and metadata.
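
A minimal consumer sketch, assuming the nats-py client and the subject names above; queue groups, durable consumers, and JetStream configuration are deliberately omitted and should follow the Event Catalog:

```python
# events/consumer.py: a minimal sketch using the nats-py client.
# Subject names and the handle_event signature are assumptions to be
# aligned with docs/cursor/42_nats_event_streams_and_event_catalog.md.
import asyncio
import json

import nats

SUBJECTS = ["message.created", "doc.upsert", "file.uploaded"]


async def run(event_bus_url: str, handle_event) -> None:
    nc = await nats.connect(event_bus_url)

    async def on_message(msg) -> None:
        # Decode the event payload and hand it to the pipeline.
        event = json.loads(msg.data)
        await handle_event(msg.subject, event)

    for subject in SUBJECTS:
        await nc.subscribe(subject, cb=on_message)

    # Block forever; shutdown/cancellation handling is left to main.py.
    await asyncio.Event().wait()
```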

3. Normalized document/chunk model

Define a common internal model for what is sent to Milvus/Neo4j, e.g. IngestChunk (a dataclass sketch follows the field list):

Fields (minimum):

  • chunk_id — deterministic ID (e.g. hash of (team_id, source_type, source_id, chunk_index)).
  • team_id / dao_id.
  • project_id (optional).
  • channel_id (optional).
  • agent_id (who generated it, if any).
  • source_type"message" | "doc" | "file" | "wiki" | "rwa" | ....
  • source_id — e.g. message ID, doc ID, file ID.
  • text — the chunk content.
  • tags — list of tags (topic, domain, etc.).
  • visibility"public" | "confidential".
  • created_at — timestamp.
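
A minimal sketch of this model as a Python dataclass; field types, defaults, and the SHA-256 ID scheme are assumptions to be refined against the rag-gateway schema:

```python
# A sketch of the internal chunk model. Field names follow this doc;
# types and defaults are assumptions.
import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class IngestChunk:
    team_id: str
    source_type: str              # "message" | "doc" | "file" | "wiki" | "rwa" | ...
    source_id: str
    chunk_index: int
    text: str
    project_id: Optional[str] = None
    channel_id: Optional[str] = None
    agent_id: Optional[str] = None
    tags: list[str] = field(default_factory=list)
    visibility: str = "public"    # "public" | "confidential"
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    @property
    def chunk_id(self) -> str:
        # Deterministic ID: replaying the same event yields the same chunk_id.
        key = f"{self.team_id}:{self.source_type}:{self.source_id}:{self.chunk_index}"
        return hashlib.sha256(key.encode()).hexdigest()
```

Making chunk_id a pure function of the natural key is what keeps Milvus upserts and event replays safe (see Idempotency & reindex below).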

Responsibilities:

  • pipeline/normalization.py:

    • For each event type, map the event payload → one or more IngestChunk objects.
    • Handle splitting of long texts into smaller chunks if needed (see the sketch after this list).
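
A sketch of one mapper, assuming the IngestChunk model above (imported from a hypothetical pipeline.models module) and a payload carrying message_id, text, and optional tags:

```python
# pipeline/normalization.py: one event-type mapper as a sketch.
# The payload field names (message_id, text, tags) are assumptions.
from pipeline.models import IngestChunk  # hypothetical module path


def split_text(text: str, max_chars: int = 1000) -> list[str]:
    # Naive fixed-size splitting; a sentence/token-aware splitter can be
    # swapped in later without changing this interface.
    return [text[i:i + max_chars] for i in range(0, len(text), max_chars)] or [""]


def normalize_message_created(event: dict) -> list[IngestChunk]:
    payload = event["payload"]
    return [
        IngestChunk(
            team_id=event["team_id"],
            source_type="message",
            source_id=str(payload["message_id"]),
            chunk_index=i,
            text=part,
            channel_id=event.get("channel_id"),
            agent_id=event.get("agent_id"),
            tags=payload.get("tags", []),
        )
        for i, part in enumerate(split_text(payload["text"]))
    ]
```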

Embedding & Milvus indexing

1. Embedding

  • Create an embedding component (pipeline/embedding.py) that:

    • Accepts IngestChunk objects.
    • Supports batch processing.
    • Uses either:

      • the existing LLM proxy/embedding service (preferred), or
      • a direct model (e.g. local bge-m3, gte-large, etc.).
  • After embedding, each chunk should carry its vector + metadata per the schema in rag_gateway_task.md (see the sketch after this list).
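
A batch sketch, assuming an HTTP embedding service that accepts {"input": [...]} and returns an embeddings array; the endpoint, the response field, and the (chunk, vector) pairs standing in for VectorChunk are all assumptions:

```python
# pipeline/embedding.py: batch embedding via an HTTP service (sketch).
# The service URL, request/response shape, and the returned
# (chunk, vector) pairs (standing in for VectorChunk) are assumptions.
import httpx

from pipeline.models import IngestChunk  # hypothetical module path

BATCH_SIZE = 64


def embed_chunks(
    chunks: list[IngestChunk],
    service_url: str = "http://llm-proxy/embeddings",  # from config in practice
) -> list[tuple[IngestChunk, list[float]]]:
    out: list[tuple[IngestChunk, list[float]]] = []
    with httpx.Client(timeout=30.0) as client:
        for i in range(0, len(chunks), BATCH_SIZE):
            batch = chunks[i:i + BATCH_SIZE]
            resp = client.post(service_url, json={"input": [c.text for c in batch]})
            resp.raise_for_status()
            vectors = resp.json()["embeddings"]  # assumed response field
            out.extend(zip(batch, vectors))
    return out
```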

2. Milvus indexing

  • pipeline/index_milvus.py should:

    • Upsert chunks into Milvus.
    • Ensure idempotency by using chunk_id as the primary key.
    • Store metadata:

      • team_id, project_id, channel_id, agent_id,
      • source_type, source_id,
      • visibility, tags, created_at,
      • embed_model version.
  • Consider using one Milvus collection with a partition key (team_id), or per-DAO collections, but keep the code flexible either way (see the sketch after this list).
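
An upsert sketch using pymilvus's MilvusClient, assuming a pre-created collection whose primary key is chunk_id; the URI, collection name, and embed_model value are placeholders for config:

```python
# pipeline/index_milvus.py: idempotent upsert keyed by chunk_id (sketch).
# Assumes a pre-created collection whose primary key field is chunk_id.
from pymilvus import MilvusClient

client = MilvusClient(uri="http://milvus:19530")  # from config in practice


def upsert_chunks_to_milvus(embedded, collection_name: str = "rag_chunks") -> None:
    # `embedded` is a list of (IngestChunk, vector) pairs from embed_chunks().
    rows = [
        {
            "chunk_id": chunk.chunk_id,   # primary key: replaying events is safe
            "vector": vector,
            "team_id": chunk.team_id,
            "project_id": chunk.project_id,
            "channel_id": chunk.channel_id,
            "agent_id": chunk.agent_id,
            "source_type": chunk.source_type,
            "source_id": chunk.source_id,
            "visibility": chunk.visibility,
            "tags": chunk.tags,
            "created_at": chunk.created_at.isoformat(),
            "embed_model": "bge-m3",      # version the embedding model explicitly
        }
        for chunk, vector in embedded
    ]
    client.upsert(collection_name=collection_name, data=rows)
```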


5. Neo4j graph updates

pipeline/index_neo4j.py should:

  • For events that carry structural information (e.g. project uses resource, doc mentions topic):

    • Create or update nodes: User, MicroDAO, Project, Channel, Topic, Resource, File, RWAObject, Doc.
    • Create relationships such as:

      • (:User)-[:MEMBER_OF]->(:MicroDAO)
      • (:Agent)-[:SERVES]->(:MicroDAO|:Project)
      • (:Doc)-[:MENTIONS]->(:Topic)
      • (:Project)-[:USES]->(:Resource)
  • All nodes/edges must include:

    • team_id / dao_id
    • visibility when it matters
  • Operations should be upserts (MERGE) to avoid duplicates (see the driver sketch after this list).
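
A MERGE-based sketch using the official neo4j Python driver; only the (:Doc)-[:MENTIONS]->(:Topic) edge is shown, and the node keys (source_id + team_id) are assumptions:

```python
# pipeline/index_neo4j.py: MERGE-based upserts with the neo4j driver
# (sketch). URI and credentials come from config in practice.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://neo4j:7687", auth=("neo4j", "password"))

MERGE_DOC_MENTIONS_TOPIC = """
MERGE (d:Doc {source_id: $source_id, team_id: $team_id})
MERGE (t:Topic {name: $topic, team_id: $team_id})
MERGE (d)-[:MENTIONS]->(t)
"""


def update_graph_for_event(event: dict, chunks) -> None:
    with driver.session() as session:
        for chunk in chunks:
            for topic in chunk.tags:
                # MERGE makes replays a no-op instead of creating duplicates.
                session.run(
                    MERGE_DOC_MENTIONS_TOPIC,
                    source_id=chunk.source_id,
                    team_id=chunk.team_id,
                    topic=topic,
                )
```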


Idempotency & reindex

1. Idempotent semantics

  • Use deterministic chunk_id for Milvus records.
  • Use Neo4j MERGE for nodes/edges based on natural keys (e.g. (team_id, source_type, source_id, chunk_index)).
  • Replaying the same events should not corrupt or duplicate data.

2. Reindex API

  • Provide a simple HTTP or CLI interface to:

    • POST /ingest/reindex/{team_id} — schedule or start a reindex for a team/DAO.
  • Reindex strategy:

    • Read documents/messages from the source of truth (DB or event replay).
    • Rebuild chunks and embeddings.
    • Upsert into Milvus & Neo4j (idempotently).

Implementation details (can be left as TODOs where backends are missing):

  • If there is no easy historic source yet, stub the reindex endpoint with a clear TODO and logging (see the sketch below).
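
A stub sketch of the optional FastAPI surface; route names follow this doc, everything else is an assumption:

```python
# api.py: optional FastAPI surface (sketch). The reindex body is a stub
# until a historic source of truth (DB or event replay) exists.
import logging

from fastapi import FastAPI

app = FastAPI(title="rag-ingest-worker")
log = logging.getLogger("rag-ingest-worker")


@app.get("/health")
async def health() -> dict:
    return {"status": "ok"}


@app.post("/ingest/reindex/{team_id}")
async def reindex(team_id: str) -> dict:
    # TODO: read documents/messages from the source of truth, rebuild
    # chunks and embeddings, and upsert into Milvus & Neo4j idempotently.
    log.info("reindex requested for team_id=%s", team_id)
    return {"team_id": team_id, "status": "accepted"}
```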

Monitoring & logging

Add basic observability:

  • Structured logs for:

    • each event type ingested,
    • the number of chunks produced,
    • latency for embedding and indexing.
  • (Optional) Metrics counters/gauges (see the sketch after this list):

    • ingest_events_total
    • ingest_chunks_total
    • ingest_errors_total
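
A minimal sketch combining stdlib logging with prometheus_client counters named as above; choosing prometheus_client is an assumption:

```python
# Observability sketch: stdlib logging plus prometheus_client counters
# using the metric names from this doc.
import logging

from prometheus_client import Counter

log = logging.getLogger("rag-ingest-worker")

ingest_events_total = Counter("ingest_events_total", "Events consumed", ["event_type"])
ingest_chunks_total = Counter("ingest_chunks_total", "Chunks produced", ["event_type"])
ingest_errors_total = Counter("ingest_errors_total", "Ingestion failures", ["event_type"])


def record_ingest(event_type: str, n_chunks: int, elapsed_ms: float) -> None:
    ingest_events_total.labels(event_type=event_type).inc()
    ingest_chunks_total.labels(event_type=event_type).inc(n_chunks)
    log.info(
        "ingested event_type=%s chunks=%d elapsed_ms=%.1f",
        event_type, n_chunks, elapsed_ms,
    )
```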

Files to create/modify (suggested)

Adjust exact paths if needed.

  • services/rag-ingest-worker/main.py

    • Parse config, connect to the event bus, start consumers (see the wiring sketch after this list).
  • services/rag-ingest-worker/config.py

    • Environment variables: EVENT_BUS_URL, MILVUS_URL, NEO4J_URL, EMBEDDING_SERVICE_URL, etc.
  • services/rag-ingest-worker/events/consumer.py

    • NATS (or chosen bus) subscription logic.
  • services/rag-ingest-worker/pipeline/normalization.py

    • Functions normalize_message_created(event), normalize_doc_upsert(event), normalize_file_uploaded(event).
  • services/rag-ingest-worker/pipeline/embedding.py

    • embed_chunks(chunks: List[IngestChunk]) -> List[VectorChunk].
  • services/rag-ingest-worker/pipeline/index_milvus.py

    • upsert_chunks_to_milvus(chunks: List[VectorChunk]).
  • services/rag-ingest-worker/pipeline/index_neo4j.py

    • update_graph_for_event(event, chunks: List[IngestChunk]).
  • Optional: services/rag-ingest-worker/api.py

    • FastAPI app with:

      • GET /health
      • POST /ingest/one
      • POST /ingest/reindex/{team_id}
  • Integration docs:

    • Reference docs/cursor/rag_gateway_task.md and docs/cursor/42_nats_event_streams_and_event_catalog.md where appropriate.
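
A wiring sketch for the entrypoint, under the module layout suggested above; all paths and function names are the suggested/hypothetical ones from this doc:

```python
# services/rag-ingest-worker/main.py: wiring sketch tying the pieces together.
import asyncio
import os

from events import consumer
from pipeline import embedding, index_milvus, index_neo4j, normalization

NORMALIZERS = {
    "message.created": normalization.normalize_message_created,
    # "doc.upsert": normalization.normalize_doc_upsert, etc.
}


async def handle_event(subject: str, event: dict) -> None:
    normalize = NORMALIZERS.get(subject)
    if normalize is None:
        return  # unknown event type: ignore rather than fail
    chunks = normalize(event)
    embedded = embedding.embed_chunks(chunks)  # blocking call; fine for a sketch
    index_milvus.upsert_chunks_to_milvus(embedded)
    index_neo4j.update_graph_for_event(event, chunks)


if __name__ == "__main__":
    asyncio.run(consumer.run(os.environ["EVENT_BUS_URL"], handle_event))
```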

Acceptance criteria

  1. A new rag-ingest-worker (or similarly named) module/service exists under services/ with:

    • a clear directory structure (events/, pipeline/, config.py, main.py),
    • stubs or initial implementations for consuming events and indexing to Milvus/Neo4j.

  2. A normalized internal model (IngestChunk or equivalent) is defined and used across pipelines.

  3. Milvus indexing code:

    • uses idempotent upserts keyed by chunk_id,
    • stores metadata compatible with the RAG-gateway schema.

  4. Neo4j update code:

    • uses MERGE for nodes/relationships,
    • encodes team_id/dao_id and privacy where relevant.

  5. An idempotency strategy and a reindex(team_id) path are present in code (even if reindex is initially a stub with a TODO).

  6. Basic logging is present for ingestion operations.

  7. This file (docs/cursor/rag_ingestion_worker_task.md) can be executed by Cursor as:

```bash
cursor task < docs/cursor/rag_ingestion_worker_task.md
```

and Cursor will use it as the single source of truth for implementing/refining the ingestion worker.