Files
microdao-daarion/services/rag-service/app/ingest_pipeline.py
Apple 0c8bef82f4 feat: Add Alateya, Clan, Eonarch agents + fix gateway-router connection
## Agents Added
- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes
- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes
- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation
- Created PROJECT-MASTER-INDEX.md - single entry point
- Added various status documents and scripts

Tokens configured:
- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
2026-01-28 06:40:34 -08:00

246 lines
7.8 KiB
Python

"""
Ingest Pipeline: PARSER → RAG
Converts ParsedDocument to Haystack Documents and indexes them
"""
import logging
from typing import List, Dict, Any, Optional
from app.document_store import get_document_store, _make_document
from app.embedding import get_text_embedder
from app.core.config import settings
from app.events import publish_document_ingested, publish_document_indexed
logger = logging.getLogger(__name__)
async def ingest_parsed_document(
    dao_id: str,
    doc_id: str,
    parsed_json: Dict[str, Any],
    user_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Ingest parsed document from PARSER service into RAG

    The parsed blocks are converted to chunks, embedded in one batch, written
    to the document store, and the RAG events are then published. Any
    exception raised along the way is caught, logged, and reported through
    the returned dictionary instead of propagating to the caller.

    Args:
        dao_id: DAO identifier
        doc_id: Document identifier
        parsed_json: ParsedDocument JSON from PARSER service
        user_id: Optional user identifier

    Returns:
        Dictionary with ingest results. On success: "status" == "success",
        "doc_count", "dao_id", "doc_id" and "metrics". On failure (including
        a document with no non-empty blocks): "status" == "error", "message"
        and "doc_count" == 0.
    """
    import time

    # perf_counter is monotonic, so the reported durations cannot be skewed
    # (or go negative) when the system clock is adjusted mid-ingest.
    ingest_start = time.perf_counter()
    logger.info(f"Ingesting document: dao_id={dao_id}, doc_id={doc_id}")
    try:
        # Convert parsed_json to chunk dictionaries ({"content", "meta"})
        documents = _parsed_json_to_documents(parsed_json, dao_id, doc_id, user_id)
        if not documents:
            logger.warning(f"No documents to ingest for doc_id={doc_id}")
            return {
                "status": "error",
                "message": "No documents to ingest",
                "doc_count": 0
            }
        logger.info(f"Converted {len(documents)} blocks to document chunks")

        # NOTE(review): embedder.run and write_documents are plain (non-awaited)
        # calls inside a coroutine; if they block, the event loop is held for
        # their duration -- confirm this is acceptable for the service.
        embedder = get_text_embedder()
        texts = [doc["content"] for doc in documents]
        embedding_result = embedder.run(texts=texts)
        # `or []` also covers an explicit None under the "embeddings" key
        embeddings = embedding_result.get("embeddings") or []
        if len(embeddings) != len(documents):
            # Chunks without a matching vector are still stored (embedding=None),
            # but make the mismatch visible instead of passing it over silently.
            logger.warning(
                f"Embedding count mismatch for doc_id={doc_id}: "
                f"chunks={len(documents)}, embeddings={len(embeddings)}"
            )
        doc_objects = [
            _make_document(
                content=doc["content"],
                meta=doc["meta"],
                embedding=embeddings[idx] if idx < len(embeddings) else None
            )
            for idx, doc in enumerate(documents)
        ]

        # pipeline_time covers only the document store write, not embedding
        pipeline_start = time.perf_counter()
        document_store = get_document_store()
        document_store.write_documents(doc_objects)
        pipeline_time = time.perf_counter() - pipeline_start
        written_docs = len(doc_objects)

        # Calculate metrics
        total_time = time.perf_counter() - ingest_start
        # `or []` keeps the metrics working when the parser emits null lists
        pages = parsed_json.get("pages") or []
        pages_count = len(pages)
        blocks_count = sum(len(page.get("blocks") or []) for page in pages)
        logger.info(
            f"Ingested {written_docs} documents for doc_id={doc_id}: "
            f"pages={pages_count}, blocks={blocks_count}, "
            f"pipeline_time={pipeline_time:.2f}s, total_time={total_time:.2f}s"
        )

        # Publish events
        await _publish_events_async(
            dao_id=dao_id,
            doc_id=doc_id,
            written_docs=written_docs,
            pages_count=pages_count,
            blocks_count=blocks_count,
            pipeline_time=pipeline_time
        )
        return {
            "status": "success",
            "doc_count": written_docs,
            "dao_id": dao_id,
            "doc_id": doc_id,
            "metrics": {
                "pages_processed": pages_count,
                "blocks_processed": blocks_count,
                "documents_indexed": written_docs,
                "pipeline_time_seconds": round(pipeline_time, 2),
                "total_time_seconds": round(total_time, 2)
            }
        }
    except Exception as e:
        # Boundary handler: report the failure in the result instead of raising
        logger.error(f"Failed to ingest document: {e}", exc_info=True)
        total_time = time.perf_counter() - ingest_start
        logger.error(f"Ingest failed after {total_time:.2f}s: {e}")
        return {
            "status": "error",
            "message": str(e),
            "doc_count": 0,
            "metrics": {
                "total_time_seconds": round(total_time, 2),
                "error": str(e)
            }
        }
def _parsed_json_to_documents(
parsed_json: Dict[str, Any],
dao_id: str,
doc_id: str,
user_id: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Convert ParsedDocument JSON to Haystack Documents
Args:
parsed_json: ParsedDocument JSON structure
dao_id: DAO identifier
doc_id: Document identifier
user_id: Optional user identifier
Returns:
List of Haystack Document objects
"""
documents: List[Dict[str, Any]] = []
# Extract pages from parsed_json
pages = parsed_json.get("pages", [])
for page_data in pages:
page_num = page_data.get("page_num", 1)
blocks = page_data.get("blocks", [])
for block in blocks:
# Skip empty blocks
text = block.get("text", "").strip()
if not text:
continue
# Build metadata (must-have для RAG)
meta = {
"dao_id": dao_id,
"doc_id": doc_id,
"page": page_num,
"block_type": block.get("type", "paragraph"),
"reading_order": block.get("reading_order", 0)
}
# Add optional fields
if block.get("bbox"):
bbox = block["bbox"]
meta.update({
"bbox_x": bbox.get("x", 0),
"bbox_y": bbox.get("y", 0),
"bbox_width": bbox.get("width", 0),
"bbox_height": bbox.get("height", 0)
})
# Add section if heading
if block.get("type") == "heading":
meta["section"] = text[:100] # First 100 chars as section name
# Add user_id if provided
if user_id:
meta["user_id"] = user_id
# Add document-level metadata
if parsed_json.get("metadata"):
meta.update({
k: v for k, v in parsed_json["metadata"].items()
if k not in ["dao_id"] # Already added
})
documents.append({"content": text, "meta": meta})
return documents
async def _publish_events_async(
    dao_id: str,
    doc_id: str,
    written_docs: int,
    pages_count: int,
    blocks_count: int,
    pipeline_time: float
) -> None:
    """
    Publish the "ingested" and "indexed" events for a document (best-effort)

    Any exception raised while publishing is logged with its traceback and
    suppressed, so a publishing failure never propagates to the caller.
    Because both publishes share one try block, a failure of the first one
    also skips the second.

    Args:
        dao_id: DAO identifier (also sent as team_id)
        doc_id: Document identifier
        written_docs: Number of chunks written to the document store
        pages_count: Number of pages in the parsed document
        blocks_count: Number of blocks in the parsed document
        pipeline_time: Document store write time in seconds
    """
    try:
        await publish_document_ingested(
            doc_id=doc_id,
            team_id=dao_id,
            dao_id=dao_id,
            chunk_count=written_docs,
            indexed=True,
            visibility="public",
            metadata={
                "ingestion_time_ms": round(pipeline_time * 1000),
                "embed_model": settings.EMBEDDING_MODEL or "bge-m3@v1",
                "pages_processed": pages_count,
                "blocks_processed": blocks_count
            }
        )
        logger.info(f"Published rag.document.ingested event for doc_id={doc_id}")

        # NOTE(review): chunk ids are synthesized from the chunk count
        # (1-based), not read back from the document store -- confirm that
        # consumers do not expect the store's own ids.
        chunk_ids = [f"{doc_id}_chunk_{i+1}" for i in range(written_docs)]
        await publish_document_indexed(
            doc_id=doc_id,
            team_id=dao_id,
            dao_id=dao_id,
            chunk_ids=chunk_ids,
            indexed=True,
            visibility="public",
            metadata={
                # NOTE(review): indexing time, collection name and node count
                # are fixed/derived values here, not measurements -- verify.
                "indexing_time_ms": 0,
                "milvus_collection": "documents_v1",
                "neo4j_nodes_created": len(chunk_ids),
                "embed_model": settings.EMBEDDING_MODEL or "bge-m3@v1"
            }
        )
        logger.info(f"Published rag.document.indexed event for doc_id={doc_id}")
    except Exception as e:
        # Best-effort: keep the failure non-fatal, but record the traceback so
        # it can be diagnosed (matches the handler in ingest_parsed_document).
        logger.error(
            f"Failed to publish RAG events for doc_id={doc_id}: {e}",
            exc_info=True
        )
def _create_ingest_pipeline():
# Deprecated: no haystack pipeline in minimal PGVector mode.
return None