""" CrewAI Orchestrator Backend MVP FastAPI service that manages multi-agent workflows using CrewAI framework. For MVP: simulated workflow execution For production: integrate real CrewAI crews with DAGI Router as LLM provider """ import logging from typing import Dict, Any, Optional, List from datetime import datetime from fastapi import FastAPI, HTTPException from pydantic import BaseModel logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" ) logger = logging.getLogger(__name__) app = FastAPI( title="CrewAI Orchestrator Backend", version="1.0.0", description="Multi-agent workflow orchestration for DAGI Router" ) # ======================================== # Request Models # ======================================== class WorkflowMeta(BaseModel): mode: Optional[str] = None agent: Optional[str] = None dao_id: Optional[str] = None user_id: Optional[str] = None source: Optional[str] = None session_id: Optional[str] = None class WorkflowRunRequest(BaseModel): workflow: str input: Dict[str, Any] meta: Optional[WorkflowMeta] = None # ======================================== # Workflow Registry (MVP - simulated) # ======================================== WORKFLOWS = { "microdao_onboarding": { "description": "Onboard new member to microDAO", "agents": ["welcomer", "role_assigner", "guide"], "steps": [ "Send welcome message", "Explain DAO structure", "Assign initial role", "Provide getting started guide" ] }, "code_review": { "description": "Multi-agent code review workflow", "agents": ["reviewer", "security_checker", "performance_analyzer"], "steps": [ "Analyze code structure", "Check for security vulnerabilities", "Review performance implications", "Generate review summary" ] }, "proposal_review": { "description": "DAO proposal multi-agent review", "agents": ["legal_checker", "financial_analyzer", "impact_assessor"], "steps": [ "Review legal compliance", "Analyze financial impact", "Assess community impact", "Generate recommendation" ] }, "task_decomposition": { "description": "Break down complex task into subtasks", "agents": ["planner", "estimator", "dependency_analyzer"], "steps": [ "Analyze task requirements", "Break into subtasks", "Estimate effort", "Identify dependencies" ] } } # ======================================== # Endpoints # ======================================== @app.get("/") async def root(): return { "service": "crewai-orchestrator", "version": "1.0.0", "endpoints": [ "POST /workflow/run", "GET /workflow/list", "GET /health" ], "workflows_available": len(WORKFLOWS) } @app.get("/health") async def health(): return { "status": "healthy", "service": "crewai-orchestrator", "workflows": len(WORKFLOWS) } @app.get("/workflow/list") async def list_workflows(): """List all available workflows""" return { "workflows": [ { "id": wf_id, "description": wf_data["description"], "agents": wf_data["agents"], "steps_count": len(wf_data["steps"]) } for wf_id, wf_data in WORKFLOWS.items() ] } @app.post("/workflow/run") async def run_workflow(req: WorkflowRunRequest): """ Execute a multi-agent workflow. For MVP: simulates workflow execution For production: - Initialize CrewAI crew - Configure agents to use DAGI Router as LLM endpoint - Execute workflow - Return results """ try: # Validate workflow exists if req.workflow not in WORKFLOWS: raise HTTPException( status_code=404, detail=f"Workflow '{req.workflow}' not found. Available: {list(WORKFLOWS.keys())}" ) workflow_def = WORKFLOWS[req.workflow] logger.info(f"Executing workflow: {req.workflow}") logger.info(f" Input: {req.input}") logger.info(f" Meta: {req.meta}") # MVP: Simulate workflow execution # TODO: Replace with real CrewAI integration # Simulate agent execution steps execution_log = [] for idx, step in enumerate(workflow_def["steps"], 1): agent = workflow_def["agents"][idx - 1] if idx - 1 < len(workflow_def["agents"]) else "coordinator" execution_log.append({ "step": idx, "agent": agent, "action": step, "status": "completed", "timestamp": datetime.now().isoformat() }) # Simulate workflow result result = { "workflow": req.workflow, "status": "completed", "execution_time_ms": len(workflow_def["steps"]) * 250, # Simulated "agents_used": workflow_def["agents"], "steps_completed": len(workflow_def["steps"]), "execution_log": execution_log, "output": { "summary": f"Workflow '{req.workflow}' completed successfully (SIMULATED)", "input_processed": req.input, "recommendations": [ "This is a simulated workflow result", "In production, CrewAI agents will process the request", "Agents will use DAGI Router for LLM calls" ] } } # Include metadata in response if req.meta: result["meta"] = req.meta.dict() logger.info(f"Workflow {req.workflow} completed: {len(execution_log)} steps") return result except HTTPException: raise except Exception as e: logger.error(f"Workflow execution error: {e}") raise HTTPException(status_code=500, detail=str(e)) # ======================================== # Main # ======================================== if __name__ == "__main__": import uvicorn import argparse parser = argparse.ArgumentParser(description="CrewAI Orchestrator Backend") parser.add_argument("--host", default="127.0.0.1", help="Host to bind to") parser.add_argument("--port", type=int, default=9010, help="Port to bind to") parser.add_argument("--reload", action="store_true", help="Enable auto-reload") args = parser.parse_args() logger.info(f"Starting CrewAI Orchestrator on {args.host}:{args.port}") uvicorn.run( "crewai_backend:app", host=args.host, port=args.port, reload=args.reload, log_level="info" )