Files
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

157 lines
5.0 KiB
Python

from fastapi import FastAPI
import asyncio
import nats
import logging
import json
from typing import Dict, Any
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Shared NATS state; worker_loop fills these in once it connects.
nc = None  # NATS client connection
js = None  # JetStream context obtained from nc
running = False  # the worker's fetch loop keeps going while this is True
# Replay-tracking headers copied from the incoming message onto the
# completion event so a replayed job can be correlated with its origin.
_REPLAY_HEADER_KEYS = ("replayed", "replay_count", "original_subject", "original_msg_id")


def _clean_headers(raw: Dict[str, Any]) -> Dict[str, str]:
    """Return a copy of *raw* suitable for ``js.publish(headers=...)``.

    Values are coerced to ``str``; entries whose value is ``None`` or blank
    are dropped. NATS header values are sent as text, so a ``None`` value
    (e.g. a message without ``job_id``) would make the publish fail, and the
    resulting NAK would get the same job redelivered over and over.
    """
    cleaned = {}
    for key, value in raw.items():
        if value is None:
            continue
        text = str(value)
        if text.strip():
            cleaned[str(key)] = text
    return cleaned


def _completion_headers(trace_id: Any, job_id: Any, agent_id: Any,
                        headers_lower: Dict[str, Any]) -> Dict[str, str]:
    """Build completion-event headers: job ids plus forwarded replay headers.

    ``headers_lower`` is the incoming message's headers with lower-cased
    keys; only the keys in ``_REPLAY_HEADER_KEYS`` are forwarded.
    """
    headers = {
        "Nats-Trace-ID": trace_id,
        "Nats-Job-ID": job_id,
        "Nats-Agent-ID": agent_id,
    }
    for key in _REPLAY_HEADER_KEYS:
        headers[key] = headers_lower.get(key)
    return _clean_headers(headers)


async def process_test_task(msg):
    """Mock-execute a test job and publish its outcome to JetStream.

    The message body is a JSON object. ``job_id`` (falling back to
    ``task_id``), ``trace_id`` and ``agent_id`` (default ``"helion"``) are
    read from it.

    If ``force_fail`` is set and the message is not a replay (``replayed``
    header equal to ``"true"``), the job goes to ``agent.run.failed.dlq``.
    This exercises the DLQ replay path. Otherwise a mock result is published
    to ``agent.run.completed.helion``.

    The message is ACKed on success and NAKed on any error.
    """
    try:
        data = json.loads(msg.data.decode())
        job_id = data.get("job_id") or data.get("task_id")
        trace_id = data.get("trace_id", "")
        agent_id = data.get("agent_id", "helion")
        logger.info(f"Processing TEST task: {job_id}")

        headers = dict(msg.headers) if msg.headers else {}
        headers_lower = {str(k).lower(): v for k, v in headers.items()}
        # Tolerate "True"/" true " etc. from whatever tool performed the replay.
        replayed = str(headers_lower.get("replayed", "")).strip().lower() == "true"

        # Forced fail path for DLQ replay validation (test_mode only)
        if data.get("force_fail") and not replayed:
            fail_payload = {
                "status": "failed",
                "job_id": job_id,
                "trace_id": trace_id,
                "agent_id": agent_id,
                "error": "forced_fail",
                "original_subject": msg.subject,
                "data": data,
            }
            await js.publish(
                "agent.run.failed.dlq",
                json.dumps(fail_payload).encode(),
                headers=_clean_headers({**headers, "replay_count": "0"}),
            )
            logger.info(f"⚠️ Forced fail sent to DLQ: {job_id}")
            await msg.ack()
            return

        # Mock execution
        await asyncio.sleep(0.1)
        result = {
            "status": "completed",
            "job_id": job_id,
            "trace_id": trace_id,
            "agent_id": agent_id,
            "result": "Mock test execution completed",
            "test_mode": True,
        }
        # NOTE(review): the completion subject is fixed to ".helion" even when
        # agent_id differs — confirm whether it should follow agent_id.
        await js.publish(
            "agent.run.completed.helion",
            json.dumps(result).encode(),
            headers=_completion_headers(trace_id, job_id, agent_id, headers_lower),
        )
        logger.info(f"✅ TEST task completed: {job_id}")
        await msg.ack()
    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception(f"Test task failed: {e}")
        try:
            await msg.nak()
        except Exception:
            # e.g. the connection is gone; the message simply stays unacknowledged.
            logger.exception("Failed to NAK test task message")
def _decode_job(raw: bytes):
    """Parse a message body as a JSON object.

    Returns the decoded ``dict``, or ``None`` when the body is not valid
    UTF-8 JSON or decodes to something other than a JSON object.
    """
    try:
        data = json.loads(raw.decode())
    except ValueError:  # covers UnicodeDecodeError and json.JSONDecodeError
        return None
    return data if isinstance(data, dict) else None


def _is_test_job(data: Dict[str, Any]) -> bool:
    """Return True when the job is flagged as a test (``workflow_type`` or ``test_mode``)."""
    return data.get("workflow_type") == "test" or bool(data.get("test_mode"))


async def _dispatch(msg) -> None:
    """Route one ``agent.run.requested`` message: process test jobs, ACK the rest."""
    data = _decode_job(msg.data)
    if data is None:
        # ACK undecodable bodies; left unacknowledged they would be
        # redelivered again and again.
        logger.warning(f"Dropping malformed message on {msg.subject}")
        await msg.ack()
        return
    if _is_test_job(data):
        await process_test_task(msg)
    else:
        # ACK but don't process non-test.
        # NOTE(review): if the stream has work-queue retention this ACK deletes
        # the job for everyone — confirm the stream configuration.
        await msg.ack()


async def worker_loop():
    """Connect to NATS and consume ``agent.run.requested`` until stopped.

    Connection attempts are retried every 5 seconds. Once subscribed
    (ephemeral pull consumer), messages are fetched in batches of up to 5
    and routed by ``_dispatch``. The loop exits when ``running`` is cleared
    by the shutdown hook.
    """
    global nc, js, running
    try:
        # Retry in place rather than re-spawning worker_loop via create_task:
        # the event loop keeps only weak references to tasks, and the spawned
        # task was never stored anywhere.
        while True:
            try:
                nc = await nats.connect("nats://nats:4222")
                js = nc.jetstream()
                logger.info("✅ Connected to NATS")
                break
            except Exception as e:
                logger.warning(f"NATS connection issue (will retry): {e}")
                await asyncio.sleep(5)

        running = True
        try:
            sub = await js.pull_subscribe(
                "agent.run.requested",
                None,  # Ephemeral consumer
                stream=None,
            )
            logger.info("✅ Subscribed to agent.run.requested")
        except Exception as e:
            logger.warning(f"Subscription failed: {e}")
            # Don't let /health report a worker that can never receive messages.
            running = False
            return

        while running:
            try:
                msgs = await sub.fetch(5, timeout=5)
            except asyncio.TimeoutError:
                continue  # nothing arrived in this fetch window
            except Exception as e:
                logger.error(f"Worker loop error: {e}")
                await asyncio.sleep(1)
                continue
            for msg in msgs:
                # Isolate failures so one bad message doesn't leave the rest
                # of the batch unacknowledged.
                try:
                    await _dispatch(msg)
                except Exception as e:
                    logger.exception(f"Worker loop error: {e}")
    except Exception as e:
        logger.exception(f"Worker failed: {e}")
@app.on_event("startup")
async def startup():
    """Start the NATS worker loop as a background task.

    The task is stored on ``app.state`` because the event loop keeps only
    weak references to tasks. A task nobody references may be
    garbage-collected before it finishes.
    """
    app.state.worker_task = asyncio.create_task(worker_loop())
@app.on_event("shutdown")
async def shutdown():
    """Stop the worker loop and close the NATS connection.

    Clearing ``running`` makes the fetch loop in ``worker_loop`` exit the
    next time it checks its loop condition. The connection is closed only
    if one was opened.
    """
    global running
    running = False
    if not nc:
        return
    await nc.close()
@app.get("/health")
async def health():
    """Report worker liveness.

    ``running`` mirrors the worker's loop flag. ``connected`` is true only
    while a NATS client exists and reports an open connection; the client
    object itself stays assigned after the connection drops or is closed
    on shutdown.
    """
    connected = nc is not None and nc.is_connected
    return {"status": "ok", "running": running, "connected": connected}