## Documentation (20 files)
- DAARION Ontology Core v1 (Agent → MicroDAO → Node → District)
- User Onboarding & Identity Layer (DAIS)
- Data Model UPDATE, Event Catalog, Governance & Permissions
- Rooms Layer, City/MicroDAO/Agents/Nodes Interface Architecture
- Helper files: ontology-summary, lifecycles, event-schemas

## Database Migration (027)
- DAIS tables: dais_identities, dais_emails, dais_wallets, dais_keys
- agent_assignments table for the Assignment Layer
- rooms table for the Rooms Layer
- event_outbox for NATS event delivery
- New enums: agent_role, microdao_type, node_kind, node_status, etc.
- Updated agents, microdaos, and nodes tables with ontology fields

## Backend
- DAIS service & routes (/api/v1/dais/*)
- Assignment service & routes (/api/v1/assignments/*)
- Domain types for DAIS and Ontology

## Frontend
- Ontology types (Agent, MicroDAO, Node, DAIS, Assignments)
- API clients for DAIS and Assignments
- UI components: DaisProfileCard, AssignmentsPanel, OntologyBadge

Non-breaking update — all existing functionality is preserved.
277 lines
8.4 KiB
Python
277 lines
8.4 KiB
Python
"""
Ingest Pipeline: PARSER → RAG

Converts ParsedDocument JSON produced by the PARSER service into Haystack
Documents (one per non-empty block), runs them through a
split → embed → write Haystack pipeline, and then publishes the
rag.document.ingested / rag.document.indexed events.
"""

import logging
from typing import List, Dict, Any, Optional

from haystack import Pipeline, Document
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter

from app.document_store import get_document_store
from app.embedding import get_text_embedder
from app.core.config import settings
from app.events import publish_document_ingested, publish_document_indexed

# Module-level logger shared by all functions in this file.
logger = logging.getLogger(__name__)


async def ingest_parsed_document(
    dao_id: str,
    doc_id: str,
    parsed_json: Dict[str, Any],
    user_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Ingest a parsed document from the PARSER service into RAG.

    Converts the parsed JSON into Haystack Documents, runs them through the
    split -> embed -> write pipeline, then publishes rag.document.* events.
    Any exception raised along the way is logged and reported in the
    returned dict instead of propagating to the caller.

    Args:
        dao_id: DAO identifier, passed through to document metadata.
        doc_id: Document identifier, passed through to document metadata.
        parsed_json: ParsedDocument JSON from the PARSER service; its
            "pages" list (each page holding "blocks") is what gets indexed.
        user_id: Optional user identifier added to document metadata.

    Returns:
        On success: {"status": "success", "doc_count", "dao_id", "doc_id",
        "metrics": {...}}. When there is nothing to ingest, or an exception
        occurs: {"status": "error", "message", "doc_count": 0, ...}.
    """
    import time

    # perf_counter() is monotonic; time.time() follows the wall clock and can
    # jump (NTP sync, manual changes), yielding negative or bogus durations.
    ingest_start = time.perf_counter()

    logger.info(f"Ingesting document: dao_id={dao_id}, doc_id={doc_id}")

    try:
        # Convert parsed_json to Haystack Documents (empty blocks are skipped)
        documents = _parsed_json_to_documents(parsed_json, dao_id, doc_id, user_id)

        if not documents:
            logger.warning(f"No documents to ingest for doc_id={doc_id}")
            return {
                "status": "error",
                "message": "No documents to ingest",
                "doc_count": 0
            }

        logger.info(f"Converted {len(documents)} blocks to Haystack Documents")

        # A fresh pipeline is built for every call.
        pipeline = _create_ingest_pipeline()

        # NOTE(review): pipeline.run() is synchronous, so it blocks the event
        # loop for the whole split/embed/write. Offloading it to a worker
        # thread requires the embedder and document store to be thread-safe
        # -- confirm before changing.
        pipeline_start = time.perf_counter()
        result = pipeline.run({"documents": documents})
        pipeline_time = time.perf_counter() - pipeline_start

        # Number of documents the "documents_writer" component reports as written
        written_docs = result.get("documents_writer", {}).get("documents_written", 0)

        # Calculate metrics; `or []` tolerates explicit nulls such as
        # "pages": null or "blocks": null in the parser output.
        total_time = time.perf_counter() - ingest_start
        pages = parsed_json.get("pages") or []
        pages_count = len(pages)
        blocks_count = sum(len(page.get("blocks") or []) for page in pages)

        logger.info(
            f"Ingested {written_docs} documents for doc_id={doc_id}: "
            f"pages={pages_count}, blocks={blocks_count}, "
            f"pipeline_time={pipeline_time:.2f}s, total_time={total_time:.2f}s"
        )

        # Best-effort: _publish_events_async logs and swallows its own errors.
        await _publish_events_async(
            dao_id=dao_id,
            doc_id=doc_id,
            written_docs=written_docs,
            pages_count=pages_count,
            blocks_count=blocks_count,
            pipeline_time=pipeline_time
        )

        return {
            "status": "success",
            "doc_count": written_docs,
            "dao_id": dao_id,
            "doc_id": doc_id,
            "metrics": {
                "pages_processed": pages_count,
                "blocks_processed": blocks_count,
                "documents_indexed": written_docs,
                "pipeline_time_seconds": round(pipeline_time, 2),
                "total_time_seconds": round(total_time, 2)
            }
        }

    except Exception as e:
        # Service boundary: report the failure to the caller instead of raising.
        logger.error(f"Failed to ingest document: {e}", exc_info=True)
        total_time = time.perf_counter() - ingest_start
        logger.error(f"Ingest failed after {total_time:.2f}s: {e}")
        return {
            "status": "error",
            "message": str(e),
            "doc_count": 0,
            "metrics": {
                "total_time_seconds": round(total_time, 2),
                "error": str(e)
            }
        }


def _parsed_json_to_documents(
    parsed_json: Dict[str, Any],
    dao_id: str,
    doc_id: str,
    user_id: Optional[str] = None
) -> List[Document]:
    """
    Convert ParsedDocument JSON to Haystack Documents, one per non-empty block.

    Per-block metadata (dao_id, doc_id, page, block_type, reading_order,
    optional bbox_* / section / user_id) always takes precedence.
    Document-level "metadata" entries only fill in keys that are not
    already set.

    Args:
        parsed_json: ParsedDocument JSON structure ("pages" -> "blocks",
            plus optional top-level "metadata").
        dao_id: DAO identifier.
        doc_id: Document identifier.
        user_id: Optional user identifier; added to metadata when truthy.

    Returns:
        List of Haystack Document objects (empty if no block has text).
    """
    documents = []

    # Document-level metadata is the same for every block; read it once.
    doc_meta = parsed_json.get("metadata") or {}

    # `or []` tolerates explicit nulls for "pages" / "blocks".
    for page_index, page_data in enumerate(parsed_json.get("pages") or [], start=1):
        # Fall back to the page's 1-based position instead of a constant 1,
        # so pages lacking "page_num" still get distinct page numbers.
        page_num = page_data.get("page_num", page_index)

        for block in page_data.get("blocks") or []:
            # Skip empty blocks; `or ""` also covers an explicit null text.
            text = (block.get("text") or "").strip()
            if not text:
                continue

            # Build metadata (required for RAG filtering and citations)
            meta = {
                "dao_id": dao_id,
                "doc_id": doc_id,
                "page": page_num,
                "block_type": block.get("type", "paragraph"),
                "reading_order": block.get("reading_order", 0)
            }

            # Add optional bounding-box fields (assumes a dict with x/y/width/height)
            bbox = block.get("bbox")
            if bbox:
                meta.update({
                    "bbox_x": bbox.get("x", 0),
                    "bbox_y": bbox.get("y", 0),
                    "bbox_width": bbox.get("width", 0),
                    "bbox_height": bbox.get("height", 0)
                })

            # Add section if heading
            if block.get("type") == "heading":
                meta["section"] = text[:100]  # First 100 chars as section name

            # Add user_id if provided
            if user_id:
                meta["user_id"] = user_id

            # Document-level metadata must not clobber the per-block fields
            # set above (previously only "dao_id" was protected, so e.g. a
            # top-level "doc_id" or "page" overwrote every block's value).
            for key, value in doc_meta.items():
                meta.setdefault(key, value)

            documents.append(Document(content=text, meta=meta))

    return documents


async def _publish_events_async(
    dao_id: str,
    doc_id: str,
    written_docs: int,
    pages_count: int,
    blocks_count: int,
    pipeline_time: float
) -> None:
    """
    Publish rag.document.ingested and rag.document.indexed events (best effort).

    Any exception is logged with its traceback and swallowed, so an event
    delivery failure never changes the caller's ingest result. The events are
    sent in order; if the first publish fails, the second is not attempted.

    Args:
        dao_id: DAO identifier; also sent as team_id.
        doc_id: Document identifier.
        written_docs: Number of documents written by the pipeline; sent as
            chunk_count and used to derive chunk_ids.
        pages_count: Pages in the parsed document (event metadata).
        blocks_count: Blocks in the parsed document (event metadata).
        pipeline_time: Pipeline run duration in seconds; reported in ms.
    """
    try:
        # Resolved once for both events, inside the try so a settings problem
        # stays best-effort like any other publish failure.
        embed_model = settings.EMBEDDING_MODEL or "bge-m3@v1"

        await publish_document_ingested(
            doc_id=doc_id,
            team_id=dao_id,
            dao_id=dao_id,
            chunk_count=written_docs,
            indexed=True,
            visibility="public",
            metadata={
                "ingestion_time_ms": round(pipeline_time * 1000),
                "embed_model": embed_model,
                "pages_processed": pages_count,
                "blocks_processed": blocks_count
            }
        )
        logger.info(f"Published rag.document.ingested event for doc_id={doc_id}")

        # NOTE(review): these IDs are synthesized from the written count, not
        # read back from the document store -- confirm consumers do not expect
        # the real stored chunk IDs.
        chunk_ids = [f"{doc_id}_chunk_{i}" for i in range(1, written_docs + 1)]

        await publish_document_indexed(
            doc_id=doc_id,
            team_id=dao_id,
            dao_id=dao_id,
            chunk_ids=chunk_ids,
            indexed=True,
            visibility="public",
            metadata={
                # NOTE(review): placeholder values -- indexing time is not
                # measured here and the collection name is hard-coded.
                "indexing_time_ms": 0,
                "milvus_collection": "documents_v1",
                "neo4j_nodes_created": len(chunk_ids),
                "embed_model": embed_model
            }
        )
        logger.info(f"Published rag.document.indexed event for doc_id={doc_id}")
    except Exception as e:
        # Deliberately swallowed (best effort), but keep the traceback so
        # publish failures are diagnosable.
        logger.error(f"Failed to publish RAG events for doc_id={doc_id}: {e}", exc_info=True)


def _create_ingest_pipeline() -> Pipeline:
    """
    Build a new Haystack pipeline: splitter -> embedder -> documents_writer.

    The component name "documents_writer" matters: ingest_parsed_document
    reads the written-document count from that entry of the run result.

    Returns:
        A freshly constructed, connected Pipeline (not yet run).
    """
    # Resolved in this order: embedder first, then the document store.
    text_embedder = get_text_embedder()
    store = get_document_store()

    # Stages in execution order; each one feeds the next.
    # NOTE(review): split_by="sentence" makes split_length a sentence count --
    # confirm settings.CHUNK_SIZE is expressed in sentences, not tokens/chars.
    # NOTE(review): the embedder must accept "documents" from the splitter; a
    # Haystack *Text*Embedder takes "text" instead -- confirm what
    # get_text_embedder() returns.
    stages = [
        ("splitter", DocumentSplitter(
            split_by="sentence",
            split_length=settings.CHUNK_SIZE,
            split_overlap=settings.CHUNK_OVERLAP
        )),
        ("embedder", text_embedder),
        ("documents_writer", DocumentWriter(store)),
    ]

    ingest = Pipeline()
    for stage_name, component in stages:
        ingest.add_component(stage_name, component)

    stage_names = [stage_name for stage_name, _ in stages]
    for upstream, downstream in zip(stage_names, stage_names[1:]):
        ingest.connect(upstream, downstream)

    return ingest