Files
Apple 0c8bef82f4 feat: Add Alateya, Clan, Eonarch agents + fix gateway-router connection
## Agents Added
- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes
- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes
- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation
- Created PROJECT-MASTER-INDEX.md - single entry point
- Added various status documents and scripts

Tokens configured:
- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
2026-01-28 06:40:34 -08:00

175 lines
4.6 KiB
Python

"""
Events module for rag-service
Publishes RAG events to NATS JetStream STREAM_RAG
"""
import json
import uuid
import logging
from datetime import datetime
from typing import Dict, Any, Optional
import asyncio
from app.core.config import settings
try:
import nats
NATS_AVAILABLE = True
except ImportError:
NATS_AVAILABLE = False
nats = None
logger = logging.getLogger(__name__)
# Connection to NATS
_nats_conn: Optional[nats.NATS] = None
async def is_nats_available():
"""Check if NATS is available"""
return NATS_AVAILABLE
async def get_nats_connection():
"""Initialize or return existing NATS connection"""
if not NATS_AVAILABLE:
logger.warning("NATS not available, events will be skipped")
return None
global _nats_conn
if _nats_conn is None:
_nats_conn = await nats.connect(settings.NATS_URL)
# Initialize JetStream context
js = _nats_conn.jetstream()
# Ensure STREAM_RAG exists
try:
await js.add_stream(
name="STREAM_RAG",
subjects=[
"parser.document.parsed",
"rag.document.ingested",
"rag.document.indexed"
],
retention=nats.RetentionPolicy.WORK_QUEUE,
storage=nats.StorageType.FILE,
replicas=3
)
logger.info("STREAM_RAG created or already exists")
except nats.js.errors.StreamAlreadyExists:
logger.info("STREAM_RAG already exists")
except Exception as e:
logger.error(f"Failed to create STREAM_RAG: {e}")
raise
return _nats_conn
async def publish_event(
subject: str,
payload: Dict[str, Any],
team_id: str,
trace_id: Optional[str] = None,
span_id: Optional[str] = None
):
"""Publish an event to NATS JetStream"""
try:
conn = await get_nats_connection()
event_envelope = {
"event_id": f"evt_{uuid.uuid4().hex[:8]}",
"ts": datetime.utcnow().isoformat() + "Z",
"domain": "rag",
"type": subject,
"version": 1,
"actor": {
"id": "rag-service",
"kind": "service"
},
"payload": payload,
"meta": {
"team_id": team_id,
"trace_id": trace_id or uuid.uuid4().hex[:8],
"span_id": span_id or uuid.uuid4().hex[:8]
}
}
# Publish to JetStream
js = conn.jetstream()
ack = await js.publish(subject, json.dumps(event_envelope))
logger.info(
f"Event published to {subject}: seq={ack.sequence}, stream_seq={ack.stream_seq}"
)
return ack
except Exception as e:
logger.error(f"Failed to publish event {subject}: {e}", exc_info=True)
raise
async def publish_document_ingested(
doc_id: str,
team_id: str,
dao_id: str,
chunk_count: int,
indexed: bool = True,
visibility: str = "public",
metadata: Optional[Dict[str, Any]] = None,
trace_id: Optional[str] = None,
span_id: Optional[str] = None
):
"""Publish rag.document.ingested event"""
payload = {
"doc_id": doc_id,
"team_id": team_id,
"dao_id": dao_id,
"chunk_count": chunk_count,
"indexed": indexed,
"visibility": visibility,
"metadata": metadata or {}
}
return await publish_event(
subject="rag.document.ingested",
payload=payload,
team_id=team_id,
trace_id=trace_id,
span_id=span_id
)
async def publish_document_indexed(
doc_id: str,
team_id: str,
dao_id: str,
chunk_ids: list[str],
indexed: bool = True,
visibility: str = "public",
metadata: Optional[Dict[str, Any]] = None,
trace_id: Optional[str] = None,
span_id: Optional[str] = None
):
"""Publish rag.document.indexed event"""
payload = {
"doc_id": doc_id,
"team_id": team_id,
"dao_id": dao_id,
"chunk_ids": chunk_ids,
"indexed": indexed,
"visibility": visibility,
"metadata": metadata or {}
}
return await publish_event(
subject="rag.document.indexed",
payload=payload,
team_id=team_id,
trace_id=trace_id,
span_id=span_id
)
async def close_nats():
"""Close NATS connection"""
global _nats_conn
if _nats_conn:
await _nats_conn.drain()
await _nats_conn.close()
_nats_conn = None
logger.info("NATS connection closed")