Files
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

83 lines
2.2 KiB
Plaintext

"""
CrewAI NATS Worker
==================
Listens on NATS JetStream for async workflow tasks.
Subjects:
- agent.run.requested -> start a crew task
- workflow.execute -> execute a workflow
Publishes:
- agent.run.completed
- agent.run.failed
"""
import os
import asyncio
import logging
import json
from datetime import datetime
from typing import Dict, Any, Optional
from dataclasses import dataclass
import uuid
# Logging: configure the root logger at INFO and obtain this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Connection settings; each value may be overridden through the environment.
NATS_URL = os.environ.get("NATS_URL", "nats://nats:4222")
ROUTER_URL = os.environ.get("ROUTER_URL", "http://dagi-router-node1:8000")
# Falls back to an empty string when the variable is not set.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "")

# JetStream stream and consumer names used by the worker.
STREAM_NAME = "AGENT_RUNS"
CONSUMER_NAME = "crewai-worker"
@dataclass
class WorkflowTask:
    """Represents a workflow task to execute.

    ``created_at`` is filled in automatically with the current UTC time
    (naive ISO-8601 string) when the caller leaves it empty.
    """

    task_id: str
    workflow_type: str
    agent_id: str
    payload: Dict[str, Any]
    priority: int = 1
    timeout: int = 300  # 5 minutes default
    created_at: str = ""  # ISO-8601 timestamp; empty means "stamp on creation"

    def __post_init__(self) -> None:
        """Stamp the task with the current UTC time if none was supplied."""
        if not self.created_at:
            # datetime.utcnow() is deprecated since Python 3.12. Take an aware
            # UTC "now" and drop tzinfo so the string keeps the same naive
            # ISO-8601 shape that utcnow().isoformat() produced.
            from datetime import timezone

            self.created_at = (
                datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
            )
class CrewAINATSWorker:
"""
NATS Worker for CrewAI async workflows.
Pulls tasks from JetStream and executes them via CrewAI or Router.
"""
def __init__(self):
self.nc = None
self.js = None
self.running = False
self.http_client = None
async def connect(self):
"""Connect to NATS JetStream"""
try:
import nats
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy
import httpx
self.nc = await nats.connect(NATS_URL)
self.js = self.nc.jetstream()
# Skip stream creation, use existing
try:
await self.js.stream_info(STREAM_NAME)
logger.info(f"Using existing stream: {STREAM_NAME}")
except:
(logger\.info\(f\"Stream \{STREAM_NAME\} exists\"\)|logger\.warning\(f\"Stream \{STREAM_NAME\} not found)
self.http_client = httpx.AsyncClient(timeout=120.0)