microdao-daarion/services/rag-service/app/ingest_pipeline.py
Apple 7b91c8e83c feat(foundation): FOUNDATION_UPDATE implementation
## Documentation (20 files)
- DAARION Ontology Core v1 (Agent → MicroDAO → Node → District; see the sketch after this list)
- User Onboarding & Identity Layer (DAIS)
- Data Model UPDATE, Event Catalog, Governance & Permissions
- Rooms Layer, City/MicroDAO/Agents/Nodes Interface Architecture
- Helper files: ontology-summary, lifecycles, event-schemas
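
For orientation, a rough sketch of the containment hierarchy named above, written as Python dataclasses. Only the class names come from the ontology docs; every field and relationship shown here is an illustrative assumption, not the actual data model:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Agent:
    """Smallest actor in the ontology; assigned into a MicroDAO via the Assignment Layer."""
    agent_id: str
    role: str  # e.g. a value from the new agent_role enum


@dataclass
class MicroDAO:
    """Groups agents and operates one or more nodes (one plausible reading of the arrow)."""
    microdao_id: str
    agents: List[Agent] = field(default_factory=list)


@dataclass
class Node:
    """Node run by a MicroDAO, located in a district."""
    node_id: str
    microdao_id: str
    district: str  # districts kept as plain labels in this sketch
```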

## Database Migration (027)
- DAIS tables: dais_identities, dais_emails, dais_wallets, dais_keys
- agent_assignments table for Assignment Layer
- rooms table for Rooms Layer
- event_outbox for NATS event delivery (see the drain-loop sketch after this list)
- New enums: agent_role, microdao_type, node_kind, node_status, etc.
- Updated agents, microdaos, nodes tables with ontology fields
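
The event_outbox table points at a transactional-outbox pattern: events are written to Postgres alongside the data change and relayed to NATS by a separate loop. A minimal drain-loop sketch, assuming hypothetical columns id, subject, payload, and published_at and placeholder connection URLs; the real migration 027 schema and the service's driver may differ:

```python
import asyncio

import asyncpg  # assumed Postgres driver for this sketch
import nats


async def drain_outbox(dsn: str, nats_url: str) -> None:
    """Relay unpublished event_outbox rows to NATS, then mark them as published."""
    conn = await asyncpg.connect(dsn)
    nc = await nats.connect(nats_url)
    try:
        rows = await conn.fetch(
            "SELECT id, subject, payload FROM event_outbox "
            "WHERE published_at IS NULL ORDER BY id LIMIT 100"
        )
        for row in rows:
            # Publish first, then mark the row so a crash re-delivers rather than drops.
            await nc.publish(row["subject"], row["payload"].encode("utf-8"))
            await conn.execute(
                "UPDATE event_outbox SET published_at = now() WHERE id = $1", row["id"]
            )
    finally:
        await nc.drain()
        await conn.close()


if __name__ == "__main__":
    # Placeholder endpoints for illustration only.
    asyncio.run(drain_outbox("postgresql://localhost/daarion", "nats://localhost:4222"))
```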

## Backend
- DAIS service & routes (/api/v1/dais/*)
- Assignment service & routes (/api/v1/assignments/*)
- Domain types for DAIS and Ontology

## Frontend
- Ontology types (Agent, MicroDAO, Node, DAIS, Assignments)
- API clients for DAIS and Assignments
- UI components: DaisProfileCard, AssignmentsPanel, OntologyBadge

Non-breaking update - all existing functionality preserved.
2025-11-29 15:24:38 -08:00

277 lines
8.4 KiB
Python

"""
Ingest Pipeline: PARSER → RAG
Converts ParsedDocument to Haystack Documents and indexes them
"""
import logging
from typing import List, Dict, Any, Optional

from haystack import Pipeline, Document
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter

from app.document_store import get_document_store
from app.embedding import get_text_embedder
from app.core.config import settings
from app.events import publish_document_ingested, publish_document_indexed

logger = logging.getLogger(__name__)


async def ingest_parsed_document(
    dao_id: str,
    doc_id: str,
    parsed_json: Dict[str, Any],
    user_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Ingest parsed document from PARSER service into RAG

    Args:
        dao_id: DAO identifier
        doc_id: Document identifier
        parsed_json: ParsedDocument JSON from PARSER service
        user_id: Optional user identifier

    Returns:
        Dictionary with ingest results (doc_count, status, metrics)
    """
    import time
    ingest_start = time.time()

    logger.info(f"Ingesting document: dao_id={dao_id}, doc_id={doc_id}")

    try:
        # Convert parsed_json to Haystack Documents
        documents = _parsed_json_to_documents(parsed_json, dao_id, doc_id, user_id)
        if not documents:
            logger.warning(f"No documents to ingest for doc_id={doc_id}")
            return {
                "status": "error",
                "message": "No documents to ingest",
                "doc_count": 0
            }

        logger.info(f"Converted {len(documents)} blocks to Haystack Documents")

        # Create ingest pipeline
        pipeline = _create_ingest_pipeline()

        # Run pipeline
        pipeline_start = time.time()
        result = pipeline.run({"documents": documents})
        pipeline_time = time.time() - pipeline_start

        # Extract results
        written_docs = result.get("documents_writer", {}).get("documents_written", 0)

        # Calculate metrics
        total_time = time.time() - ingest_start
        pages_count = len(parsed_json.get("pages", []))
        blocks_count = sum(
            len(page.get("blocks", []))
            for page in parsed_json.get("pages", [])
        )

        logger.info(
            f"Ingested {written_docs} documents for doc_id={doc_id}: "
            f"pages={pages_count}, blocks={blocks_count}, "
            f"pipeline_time={pipeline_time:.2f}s, total_time={total_time:.2f}s"
        )

        # Publish events
        await _publish_events_async(
            dao_id=dao_id,
            doc_id=doc_id,
            written_docs=written_docs,
            pages_count=pages_count,
            blocks_count=blocks_count,
            pipeline_time=pipeline_time
        )

        return {
            "status": "success",
            "doc_count": written_docs,
            "dao_id": dao_id,
            "doc_id": doc_id,
            "metrics": {
                "pages_processed": pages_count,
                "blocks_processed": blocks_count,
                "documents_indexed": written_docs,
                "pipeline_time_seconds": round(pipeline_time, 2),
                "total_time_seconds": round(total_time, 2)
            }
        }
    except Exception as e:
        total_time = time.time() - ingest_start
        logger.error(f"Ingest failed after {total_time:.2f}s: {e}", exc_info=True)
        return {
            "status": "error",
            "message": str(e),
            "doc_count": 0,
            "metrics": {
                "total_time_seconds": round(total_time, 2),
                "error": str(e)
            }
        }


def _parsed_json_to_documents(
    parsed_json: Dict[str, Any],
    dao_id: str,
    doc_id: str,
    user_id: Optional[str] = None
) -> List[Document]:
    """
    Convert ParsedDocument JSON to Haystack Documents

    Args:
        parsed_json: ParsedDocument JSON structure
        dao_id: DAO identifier
        doc_id: Document identifier
        user_id: Optional user identifier

    Returns:
        List of Haystack Document objects
    """
    documents = []

    # Extract pages from parsed_json
    pages = parsed_json.get("pages", [])

    for page_data in pages:
        page_num = page_data.get("page_num", 1)
        blocks = page_data.get("blocks", [])

        for block in blocks:
            # Skip empty blocks
            text = block.get("text", "").strip()
            if not text:
                continue
            # Build metadata (required fields for RAG)
            meta = {
                "dao_id": dao_id,
                "doc_id": doc_id,
                "page": page_num,
                "block_type": block.get("type", "paragraph"),
                "reading_order": block.get("reading_order", 0)
            }

            # Add optional fields
            if block.get("bbox"):
                bbox = block["bbox"]
                meta.update({
                    "bbox_x": bbox.get("x", 0),
                    "bbox_y": bbox.get("y", 0),
                    "bbox_width": bbox.get("width", 0),
                    "bbox_height": bbox.get("height", 0)
                })

            # Use the heading text as the section name
            if block.get("type") == "heading":
                meta["section"] = text[:100]  # First 100 chars as section name

            # Add user_id if provided
            if user_id:
                meta["user_id"] = user_id

            # Add document-level metadata
            if parsed_json.get("metadata"):
                meta.update({
                    k: v for k, v in parsed_json["metadata"].items()
                    if k != "dao_id"  # Already added above
                })

            # Create Haystack Document
            doc = Document(
                content=text,
                meta=meta
            )
            documents.append(doc)

    return documents


async def _publish_events_async(
    dao_id: str,
    doc_id: str,
    written_docs: int,
    pages_count: int,
    blocks_count: int,
    pipeline_time: float
):
    """Publish rag.document.ingested and rag.document.indexed events; failures are logged, not raised."""
    try:
        await publish_document_ingested(
            doc_id=doc_id,
            team_id=dao_id,
            dao_id=dao_id,
            chunk_count=written_docs,
            indexed=True,
            visibility="public",
            metadata={
                "ingestion_time_ms": round(pipeline_time * 1000),
                "embed_model": settings.EMBEDDING_MODEL or "bge-m3@v1",
                "pages_processed": pages_count,
                "blocks_processed": blocks_count
            }
        )
        logger.info(f"Published rag.document.ingested event for doc_id={doc_id}")

        chunk_ids = [f"{doc_id}_chunk_{i+1}" for i in range(written_docs)]
        await publish_document_indexed(
            doc_id=doc_id,
            team_id=dao_id,
            dao_id=dao_id,
            chunk_ids=chunk_ids,
            indexed=True,
            visibility="public",
            metadata={
                "indexing_time_ms": 0,
                "milvus_collection": "documents_v1",
                "neo4j_nodes_created": len(chunk_ids),
                "embed_model": settings.EMBEDDING_MODEL or "bge-m3@v1"
            }
        )
        logger.info(f"Published rag.document.indexed event for doc_id={doc_id}")
    except Exception as e:
        logger.error(f"Failed to publish RAG events for doc_id={doc_id}: {e}")


def _create_ingest_pipeline() -> Pipeline:
    """
    Create Haystack ingest pipeline

    Pipeline: DocumentSplitter → Embedder → DocumentWriter
    """
    # Get components
    embedder = get_text_embedder()
    document_store = get_document_store()

    # Create splitter (optional, if chunks are too large)
    splitter = DocumentSplitter(
        split_by="sentence",
        split_length=settings.CHUNK_SIZE,
        split_overlap=settings.CHUNK_OVERLAP
    )

    # Create writer
    writer = DocumentWriter(document_store)

    # Build pipeline
    pipeline = Pipeline()
    pipeline.add_component("splitter", splitter)
    pipeline.add_component("embedder", embedder)
    pipeline.add_component("documents_writer", writer)

    # Connect components
    pipeline.connect("splitter", "embedder")
    pipeline.connect("embedder", "documents_writer")

    return pipeline
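
For reference, a minimal way to drive this module end to end. The shape of parsed_json is inferred from _parsed_json_to_documents, the sample values are made up, and the call assumes the rag-service settings, document store, and embedder are already configured:

```python
import asyncio

from app.ingest_pipeline import ingest_parsed_document

# Smallest ParsedDocument-like payload the pipeline accepts: one page, one text block.
sample_parsed_json = {
    "pages": [
        {
            "page_num": 1,
            "blocks": [
                {
                    "type": "heading",
                    "text": "DAARION Ontology Core",
                    "reading_order": 0,
                    "bbox": {"x": 72, "y": 90, "width": 450, "height": 24},
                }
            ],
        }
    ],
    "metadata": {"source": "example.pdf"},  # merged into every chunk's meta
}

result = asyncio.run(
    ingest_parsed_document(
        dao_id="dao-123",   # illustrative identifiers
        doc_id="doc-456",
        parsed_json=sample_parsed_json,
        user_id="user-789",
    )
)
print(result["status"], result.get("metrics"))
```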