#!/usr/bin/env python3 """ Local RAG Router for microDAO Node-2 Routes queries to appropriate memory stores: - local.memory.qdrant - Fast RAG - local.memory.milvus - Heavy indexing - local.graph.neo4j - Graph queries - global.memory - Node-1 (optional) """ import logging from typing import Dict, List, Optional, Any from enum import Enum import httpx from pydantic import BaseModel logger = logging.getLogger(__name__) class MemoryTarget(str, Enum): """Memory store targets""" QDRANT = "local.memory.qdrant" MILVUS = "local.memory.milvus" NEO4J = "local.graph.neo4j" GLOBAL = "global.memory" class QueryType(str, Enum): """Query types""" VECTOR_SEARCH = "vector_search" GRAPH_QUERY = "graph_query" HYBRID = "hybrid" METADATA = "metadata" class RAGQuery(BaseModel): """RAG query model""" query: str query_type: QueryType = QueryType.VECTOR_SEARCH limit: int = 10 filters: Optional[Dict[str, Any]] = None target: Optional[MemoryTarget] = None use_global: bool = False class RAGRouter: """Local RAG Router for microDAO Node-2""" def __init__( self, qdrant_url: str = "http://localhost:6333", milvus_host: str = "localhost", milvus_port: int = 19530, neo4j_uri: str = "bolt://localhost:7687", neo4j_user: str = "neo4j", neo4j_password: str = "microdao-node2", global_memory_url: Optional[str] = None ): self.qdrant_url = qdrant_url self.milvus_host = milvus_host self.milvus_port = milvus_port self.neo4j_uri = neo4j_uri self.neo4j_user = neo4j_user self.neo4j_password = neo4j_password self.global_memory_url = global_memory_url self.http_client = httpx.AsyncClient(timeout=30.0) def _determine_target(self, query: RAGQuery) -> MemoryTarget: """Determine target memory store based on query""" # If target explicitly specified if query.target: return query.target # Route based on query type if query.query_type == QueryType.GRAPH_QUERY: return MemoryTarget.NEO4J if query.query_type == QueryType.VECTOR_SEARCH: # Use Qdrant for fast queries, Milvus for heavy indexing if query.limit <= 50 and not query.filters: return MemoryTarget.QDRANT else: return MemoryTarget.MILVUS if query.query_type == QueryType.HYBRID: # Hybrid queries can use both return MemoryTarget.QDRANT # Default to Qdrant for speed # Default to Qdrant return MemoryTarget.QDRANT async def _query_qdrant(self, query: RAGQuery) -> List[Dict[str, Any]]: """Query Qdrant vector database""" try: # First, get collection info collections_response = await self.http_client.get( f"{self.qdrant_url}/collections" ) collections = collections_response.json().get("result", {}).get("collections", []) if not collections: logger.warning("No collections found in Qdrant") return [] # Use first collection (or implement collection selection logic) collection_name = collections[0].get("name", "default") # For now, return empty - actual vector search requires embeddings # This is a placeholder for the actual implementation logger.info(f"Querying Qdrant collection: {collection_name}") return [] except Exception as e: logger.error(f"Error querying Qdrant: {e}") return [] async def _query_milvus(self, query: RAGQuery) -> List[Dict[str, Any]]: """Query Milvus vector database""" try: # Milvus query implementation # This requires pymilvus client logger.info(f"Querying Milvus (not yet implemented)") return [] except Exception as e: logger.error(f"Error querying Milvus: {e}") return [] async def _query_neo4j(self, query: RAGQuery) -> List[Dict[str, Any]]: """Query Neo4j graph database""" try: # Neo4j query implementation # This requires neo4j driver logger.info(f"Querying Neo4j (not yet implemented)") return [] except Exception as e: logger.error(f"Error querying Neo4j: {e}") return [] async def _query_global(self, query: RAGQuery) -> List[Dict[str, Any]]: """Query global memory on Node-1""" if not self.global_memory_url: logger.warning("Global memory URL not configured") return [] try: response = await self.http_client.post( f"{self.global_memory_url}/query", json=query.dict() ) return response.json().get("results", []) except Exception as e: logger.error(f"Error querying global memory: {e}") return [] async def route_query(self, query: RAGQuery) -> Dict[str, Any]: """Route query to appropriate memory store""" target = self._determine_target(query) logger.info(f"Routing query to: {target}") results = [] # Query primary target if target == MemoryTarget.QDRANT: results = await self._query_qdrant(query) elif target == MemoryTarget.MILVUS: results = await self._query_milvus(query) elif target == MemoryTarget.NEO4J: results = await self._query_neo4j(query) # Optionally query global memory if query.use_global and self.global_memory_url: global_results = await self._query_global(query) results.extend(global_results) return { "target": target.value, "results": results, "count": len(results) } async def health_check(self) -> Dict[str, bool]: """Check health of all memory stores""" health = { "qdrant": False, "milvus": False, "neo4j": False, "global": False } # Check Qdrant try: response = await self.http_client.get(f"{self.qdrant_url}/health") health["qdrant"] = response.status_code == 200 except: pass # Check Milvus (placeholder) health["milvus"] = True # TODO: Implement actual health check # Check Neo4j (placeholder) health["neo4j"] = True # TODO: Implement actual health check # Check Global (placeholder) if self.global_memory_url: try: response = await self.http_client.get(f"{self.global_memory_url}/health") health["global"] = response.status_code == 200 except: pass return health async def close(self): """Close HTTP client""" await self.http_client.aclose() # FastAPI integration from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse app = FastAPI(title="microDAO Node-2 RAG Router") # Global router instance router: Optional[RAGRouter] = None @app.on_event("startup") async def startup(): """Initialize router on startup""" global router router = RAGRouter( global_memory_url=None # Configure from environment ) @app.on_event("shutdown") async def shutdown(): """Close router on shutdown""" global router if router: await router.close() @app.post("/query") async def query_endpoint(query: RAGQuery): """Query endpoint""" if not router: raise HTTPException(status_code=503, detail="Router not initialized") try: result = await router.route_query(query) return JSONResponse(content=result) except Exception as e: logger.error(f"Error in query endpoint: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/health") async def health_endpoint(): """Health check endpoint""" if not router: return JSONResponse( status_code=503, content={"status": "unhealthy", "reason": "Router not initialized"} ) health = await router.health_check() all_healthy = all(health.values()) return JSONResponse( status_code=200 if all_healthy else 503, content={ "status": "healthy" if all_healthy else "degraded", "stores": health } ) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=9401)