Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.
Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles
Excluded from snapshot: venv/, .env, data/, backups, .tgz archives
Co-authored-by: Cursor <cursoragent@cursor.com>
381 lines
13 KiB
Plaintext
381 lines
13 KiB
Plaintext
"""
|
|
CrewAI Orchestrator Service
|
|
Manages multi-agent teams for complex tasks
|
|
|
|
Orchestrators: Helion, Daarwizz, Yaromir
|
|
Workers: Greenfood, Druid, Nutra, Clan, Soul, Eonarch, Monitor
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import Dict, List, Optional, Any
|
|
from fastapi import FastAPI, HTTPException
|
|
from pydantic import BaseModel
|
|
import httpx
|
|
import yaml
|
|
|
|
# Registry loader for dynamic agent profiles
|
|
try:
|
|
from registry_loader import load_registry, get_orchestrators, get_team, is_orchestrator, get_agent_profile
|
|
REGISTRY_AVAILABLE = True
|
|
except ImportError:
|
|
REGISTRY_AVAILABLE = False
|
|
load_registry = None
|
|
|
|
# CrewAI imports
|
|
try:
|
|
from crewai import Agent, Task, Crew, Process
|
|
from crewai.tools import BaseTool
|
|
CREWAI_AVAILABLE = True
|
|
except ImportError:
|
|
CREWAI_AVAILABLE = False
|
|
logging.warning("CrewAI not installed, running in mock mode")
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
app = FastAPI(title="CrewAI Orchestrator", version="1.0.0")
|
|
|
|
# Configuration
|
|
ROUTER_URL = os.getenv("ROUTER_URL", "http://router:8000")
|
|
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY", "")
|
|
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY", "")
|
|
|
|
# Agent definitions
|
|
AGENT_PROFILES = {
|
|
"helion": {
|
|
"role": "Energy Research Lead & Orchestrator",
|
|
"goal": "Coordinate energy research, analyze biomass potential, manage BioMiner deployment strategy",
|
|
"backstory": "You are Helion, the lead AI researcher for Energy Union. You coordinate teams of specialists to analyze energy markets, biomass resources, and deployment opportunities.",
|
|
"can_orchestrate": True,
|
|
"specialties": ["energy", "biomass", "sustainability", "market_analysis"]
|
|
},
|
|
"daarwizz": {
|
|
"role": "DAO Strategy Architect & Orchestrator",
|
|
"goal": "Design tokenomics, governance structures, and coordinate strategic planning",
|
|
"backstory": "You are Daarwizz, the strategic mastermind of DAARION ecosystem. You design decentralized governance and coordinate complex multi-stakeholder initiatives.",
|
|
"can_orchestrate": True,
|
|
"specialties": ["dao", "tokenomics", "governance", "strategy"]
|
|
},
|
|
"yaromir": {
|
|
"role": "Technical Lead & Orchestrator",
|
|
"goal": "Coordinate technical teams, architect solutions, manage development workflows",
|
|
"backstory": "You are Yaromir, the technical architect of DAARION. You lead development teams and ensure technical excellence.",
|
|
"can_orchestrate": True,
|
|
"specialties": ["development", "architecture", "devops", "security"]
|
|
},
|
|
"greenfood": {
|
|
"role": "Organic Food & Agriculture Specialist",
|
|
"goal": "Analyze organic food markets, sustainable agriculture practices",
|
|
"backstory": "You are Greenfood, specialist in organic agriculture and sustainable food systems.",
|
|
"can_orchestrate": False,
|
|
"specialties": ["agriculture", "organic", "food", "sustainability"]
|
|
},
|
|
"druid": {
|
|
"role": "Environmental Data Analyst",
|
|
"goal": "Analyze environmental data, climate patterns, ecological systems",
|
|
"backstory": "You are Druid, the environmental intelligence specialist.",
|
|
"can_orchestrate": False,
|
|
"specialties": ["environment", "climate", "ecology", "data_analysis"]
|
|
},
|
|
"nutra": {
|
|
"role": "Nutrition & Health Researcher",
|
|
"goal": "Research nutrition science, health impacts of food systems",
|
|
"backstory": "You are Nutra, specialist in nutritional science and health.",
|
|
"can_orchestrate": False,
|
|
"specialties": ["nutrition", "health", "science", "research"]
|
|
},
|
|
"clan": {
|
|
"role": "Community & Partnership Manager",
|
|
"goal": "Build communities, manage partnerships, coordinate stakeholders",
|
|
"backstory": "You are Clan, the community builder and partnership coordinator.",
|
|
"can_orchestrate": False,
|
|
"specialties": ["community", "partnerships", "stakeholders", "communication"]
|
|
},
|
|
"monitor": {
|
|
"role": "Systems Monitor & Analytics",
|
|
"goal": "Monitor system health, analyze metrics, report anomalies",
|
|
"backstory": "You are Monitor, the watchful guardian of DAARION systems.",
|
|
"can_orchestrate": False,
|
|
"specialties": ["monitoring", "analytics", "alerts", "reporting"]
|
|
}
|
|
}
|
|
|
|
# Dynamic profile getter (registry first, fallback to hardcoded)
|
|
def get_agent_profile_dynamic(agent_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get agent profile from registry or fallback to AGENT_PROFILES."""
|
|
if REGISTRY_AVAILABLE:
|
|
registry = load_registry()
|
|
if registry:
|
|
# Try to find in agents dict
|
|
agents = registry.get("agents", {})
|
|
if agent_id in agents:
|
|
agent = agents[agent_id]
|
|
return {
|
|
"role": agent.get("role", agent.get("canonical_role", "Agent")),
|
|
"goal": agent.get("mission", ""),
|
|
"backstory": f"You are {agent.get('display_name', agent_id)}.",
|
|
"can_orchestrate": agent.get("crewai", {}).get("orchestrator", False),
|
|
"specialties": agent.get("domains", [])
|
|
}
|
|
|
|
# Try orchestrators list
|
|
for o in registry.get("orchestrators", []):
|
|
if o.get("id") == agent_id:
|
|
return {
|
|
"role": o.get("role", "Orchestrator"),
|
|
"goal": "",
|
|
"backstory": f"You are {o.get('display_name', agent_id)}.",
|
|
"can_orchestrate": o.get("can_orchestrate", True),
|
|
"specialties": o.get("domains", [])
|
|
}
|
|
|
|
# Fallback to hardcoded
|
|
return AGENT_PROFILES.get(agent_id)
|
|
|
|
|
|
def get_team_dynamic(orchestrator_id: str) -> List[Dict[str, str]]:
|
|
"""Get team for orchestrator from registry or empty list."""
|
|
if REGISTRY_AVAILABLE:
|
|
team = get_team(orchestrator_id)
|
|
if team:
|
|
return team
|
|
return []
|
|
|
|
|
|
|
|
# Request/Response models
|
|
class CrewRequest(BaseModel):
|
|
task: str
|
|
orchestrator: str = "helion"
|
|
team: Optional[List[str]] = None
|
|
context: Optional[Dict[str, Any]] = None
|
|
max_iterations: int = 5
|
|
verbose: bool = False
|
|
|
|
class CrewResponse(BaseModel):
|
|
success: bool
|
|
result: Optional[str] = None
|
|
agents_used: List[str] = []
|
|
iterations: int = 0
|
|
error: Optional[str] = None
|
|
|
|
class AgentRequest(BaseModel):
|
|
agent_id: str
|
|
message: str
|
|
context: Optional[Dict[str, Any]] = None
|
|
|
|
# Router client for calling individual agents
|
|
async def call_agent(agent_id: str, message: str, context: Dict = None) -> str:
|
|
"""Call an individual agent via Router"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=60.0) as client:
|
|
response = await client.post(
|
|
f"{ROUTER_URL}/v1/agents/{agent_id}/infer",
|
|
json={
|
|
"prompt": message,
|
|
"metadata": {**(context or {}), "from_crewai": True}
|
|
}
|
|
)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return data.get("response", "")
|
|
else:
|
|
logger.error(f"Agent {agent_id} call failed: {response.status_code}")
|
|
return f"Error calling {agent_id}"
|
|
except Exception as e:
|
|
logger.error(f"Agent call error: {e}")
|
|
return f"Error: {e}"
|
|
|
|
|
|
# Custom tool for calling DAARION agents
|
|
class DaarionAgentTool(BaseTool):
|
|
name: str = "daarion_agent"
|
|
description: str = "Call another DAARION agent for specialized tasks"
|
|
agent_id: str = ""
|
|
|
|
def _run(self, query: str) -> str:
|
|
import asyncio
|
|
return asyncio.run(call_agent(self.agent_id, query))
|
|
|
|
|
|
def create_crewai_agent(agent_id: str) -> Optional[Agent]:
|
|
"""Create a CrewAI agent from profile"""
|
|
if not CREWAI_AVAILABLE:
|
|
return None
|
|
|
|
profile = get_agent_profile_dynamic(agent_id)
|
|
if not profile:
|
|
return None
|
|
|
|
# Use DeepSeek or Mistral as LLM
|
|
llm_config = {
|
|
"model": "deepseek-chat",
|
|
"api_key": DEEPSEEK_API_KEY,
|
|
"base_url": "https://api.deepseek.com"
|
|
} if DEEPSEEK_API_KEY else {
|
|
"model": "mistral-large-latest",
|
|
"api_key": MISTRAL_API_KEY,
|
|
"base_url": "https://api.mistral.ai/v1"
|
|
}
|
|
|
|
return Agent(
|
|
role=profile["role"],
|
|
goal=profile["goal"],
|
|
backstory=profile["backstory"],
|
|
verbose=True,
|
|
allow_delegation=profile["can_orchestrate"],
|
|
llm=llm_config
|
|
)
|
|
|
|
|
|
def select_team_for_task(task: str, orchestrator: str) -> List[str]:
|
|
"""Automatically select best team for a task"""
|
|
task_lower = task.lower()
|
|
team = []
|
|
|
|
# Always include orchestrator
|
|
if orchestrator in AGENT_PROFILES:
|
|
team.append(orchestrator)
|
|
|
|
# Select specialists based on keywords
|
|
keyword_mapping = {
|
|
"greenfood": ["food", "agriculture", "organic", "farming", "crop"],
|
|
"druid": ["environment", "climate", "ecology", "nature", "forest", "biomass"],
|
|
"nutra": ["nutrition", "health", "diet", "vitamin", "supplement"],
|
|
"clan": ["community", "partner", "stakeholder", "team", "collaboration"],
|
|
"monitor": ["monitor", "metric", "alert", "status", "performance", "health"]
|
|
}
|
|
|
|
for agent_id, keywords in keyword_mapping.items():
|
|
if any(kw in task_lower for kw in keywords):
|
|
if agent_id not in team:
|
|
team.append(agent_id)
|
|
|
|
# Limit team size
|
|
return team[:4]
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {
|
|
"status": "healthy",
|
|
"crewai_available": CREWAI_AVAILABLE,
|
|
"agents": list(AGENT_PROFILES.keys())
|
|
}
|
|
|
|
|
|
@app.get("/agents")
|
|
async def list_agents():
|
|
"""List all available agents"""
|
|
return {
|
|
"orchestrators": [k for k, v in AGENT_PROFILES.items() if v["can_orchestrate"]],
|
|
"workers": [k for k, v in AGENT_PROFILES.items() if not v["can_orchestrate"]],
|
|
"profiles": AGENT_PROFILES
|
|
}
|
|
|
|
|
|
@app.post("/crew/run", response_model=CrewResponse)
|
|
async def run_crew(request: CrewRequest):
|
|
"""Run a crew of agents to complete a task"""
|
|
|
|
# Simple mode: use Router to call orchestrator agent
|
|
# This works without external API keys
|
|
USE_SIMPLE_MODE = os.getenv("CREWAI_SIMPLE_MODE", "true").lower() == "true"
|
|
|
|
if USE_SIMPLE_MODE or not CREWAI_AVAILABLE:
|
|
try:
|
|
# Get team for logging
|
|
team = request.team or select_team_for_task(request.task, request.orchestrator)
|
|
logger.info(f"🚀 Simple mode crew: orchestrator={request.orchestrator}, team={team}")
|
|
|
|
# Call orchestrator agent via Router (it has full LLM access)
|
|
result = await call_agent(request.orchestrator, request.task, request.context)
|
|
|
|
if result and not result.startswith("Error"):
|
|
return CrewResponse(
|
|
success=True,
|
|
result=result,
|
|
agents_used=[request.orchestrator],
|
|
iterations=1
|
|
)
|
|
else:
|
|
return CrewResponse(
|
|
success=False,
|
|
result=None,
|
|
agents_used=[],
|
|
error=result or "Empty response from orchestrator"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Simple mode crew failed: {e}")
|
|
return CrewResponse(
|
|
success=False,
|
|
result=None,
|
|
agents_used=[],
|
|
error=str(e)
|
|
)
|
|
|
|
try:
|
|
# Full CrewAI mode (requires API keys)
|
|
team = request.team or select_team_for_task(request.task, request.orchestrator)
|
|
logger.info(f"🚀 Starting crew: {team} for task: {request.task[:50]}...")
|
|
|
|
# Create agents
|
|
agents = []
|
|
for agent_id in team:
|
|
agent = create_crewai_agent(agent_id)
|
|
if agent:
|
|
agents.append(agent)
|
|
|
|
if not agents:
|
|
raise HTTPException(status_code=400, detail="No valid agents in team")
|
|
|
|
# Create task
|
|
task = Task(
|
|
description=request.task,
|
|
expected_output="Detailed analysis and recommendations",
|
|
agent=agents[0] # Lead agent
|
|
)
|
|
|
|
# Create and run crew
|
|
crew = Crew(
|
|
agents=agents,
|
|
tasks=[task],
|
|
process=Process.hierarchical if len(agents) > 2 else Process.sequential,
|
|
verbose=request.verbose,
|
|
max_iter=request.max_iterations
|
|
)
|
|
|
|
result = crew.kickoff()
|
|
|
|
return CrewResponse(
|
|
success=True,
|
|
result=str(result),
|
|
agents_used=team,
|
|
iterations=request.max_iterations
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Crew execution failed: {e}")
|
|
return CrewResponse(
|
|
success=False,
|
|
error=str(e),
|
|
agents_used=[]
|
|
)
|
|
|
|
|
|
@app.post("/agent/call")
|
|
async def call_single_agent(request: AgentRequest):
|
|
"""Call a single agent directly"""
|
|
result = await call_agent(request.agent_id, request.message, request.context)
|
|
return {
|
|
"success": True,
|
|
"agent": request.agent_id,
|
|
"response": result
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=9010)
|