Files
microdao-daarion/services/comfy-agent/app/worker.py
Apple c41c68dc08 feat: Add Comfy Agent service for NODE3 image/video generation
- Create comfy-agent service with FastAPI + NATS integration
- ComfyUI client with HTTP/WebSocket support
- REST API: /generate/image, /generate/video, /status, /result
- NATS subjects: agent.invoke.comfy, comfy.request.*
- Async job queue with progress tracking
- Docker compose configuration for NODE3
- Update PROJECT-MASTER-INDEX.md with NODE2/NODE3 docs

Co-Authored-By: Warp <agent@warp.dev>
2026-02-10 04:13:49 -08:00

61 lines
2.5 KiB
Python

# services/comfy-agent/app/worker.py
import asyncio
import uuid
import os
import json
from typing import Any, Dict, Optional, Tuple
from .jobs import JOB_STORE
from .storage import make_job_dir, public_url
from .comfyui_client import ComfyUIClient
from .config import settings
# In-process FIFO of pending jobs; each item is a (job_id, gen_type, prompt_graph)
# tuple produced by enqueue() and consumed by worker_loop().
# NOTE(review): the queue is constructed at import time. On Python < 3.10 an
# asyncio.Queue binds to the event loop that is current at construction —
# confirm the runtime version or that the same loop is used by the app.
_queue: "asyncio.Queue[Tuple[str, str, Dict[str, Any]]]" = asyncio.Queue()
def enqueue(job_id: str, gen_type: str, prompt_graph: Dict[str, Any]) -> None:
    """Hand a generation job over to the background worker queue.

    The job is stored on the module-level queue as a
    ``(job_id, gen_type, prompt_graph)`` tuple. The call does not await;
    the item is picked up later by ``worker_loop``.
    """
    pending_job = (job_id, gen_type, prompt_graph)
    _queue.put_nowait(pending_job)
async def _extract_first_output(history: Dict[str, Any], job_dir: str) -> Optional[str]:
    """Persist the ComfyUI history for a job and return the result file name.

    ComfyUI's history structure can vary, so this MVP implementation does not
    yet locate individual "images" / "gifs" / "videos" outputs or download
    them (the intended route is ``/view?filename=...&type=output&subfolder=...``).
    Instead the raw history is dumped to ``manifest.json`` inside ``job_dir``
    to unblock integration while file fetching is refined.

    Args:
        history: History payload obtained from ComfyUI for the prompt.
        job_dir: Directory in which the manifest is written; it must exist.

    Returns:
        ``"manifest.json"`` (a name relative to ``job_dir``) when a manifest
        was written, or ``None`` when ``history`` is empty.

    Raises:
        OSError: If the manifest file cannot be opened or written.
        TypeError: If ``history`` holds values that are not JSON-serializable.
    """
    # An empty history means there is nothing to publish. Return None so the
    # caller can mark the job as failed instead of exposing an empty manifest
    # as a successful result.
    if not history:
        return None

    manifest_name = "manifest.json"
    manifest_path = os.path.join(job_dir, manifest_name)
    with open(manifest_path, "w", encoding="utf-8") as f:
        json.dump(history, f, ensure_ascii=False, indent=2)
    return manifest_name
async def worker_loop() -> None:
    """Consume queued jobs forever and run each one against ComfyUI.

    Every dequeued ``(job_id, gen_type, prompt_graph)`` item is started as its
    own task. A semaphore sized by ``settings.MAX_CONCURRENCY`` limits how many
    jobs are actively being processed at once; the remaining tasks wait on it.
    Progress and the final outcome of each job are recorded in ``JOB_STORE``.

    The loop has no exit condition of its own: it runs until the coroutine is
    cancelled or ``_queue.get()`` raises.
    """
    client = ComfyUIClient()
    sem = asyncio.Semaphore(settings.MAX_CONCURRENCY)
    # The event loop only keeps weak references to tasks. Hold strong
    # references here so that an in-flight job cannot be garbage-collected
    # before it finishes; each task removes itself when done.
    in_flight = set()

    async def run_one(job_id: str, gen_type: str, prompt_graph: Dict[str, Any]) -> None:
        """Run one job end to end and store its result or failure in JOB_STORE."""
        async with sem:
            JOB_STORE.update(job_id, status="running", progress=0.01)
            # A unique client id per job, passed to both queue_prompt and
            # wait_progress so they refer to the same submission.
            client_id = f"comfy-agent-{uuid.uuid4().hex}"

            def on_p(p: float, msg: str) -> None:
                JOB_STORE.update(job_id, progress=float(p), message=msg)

            try:
                prompt_id = await client.queue_prompt(prompt_graph, client_id=client_id)
                JOB_STORE.update(job_id, comfy_prompt_id=prompt_id)
                await client.wait_progress(client_id=client_id, prompt_id=prompt_id, on_progress=on_p)
                hist = await client.get_history(prompt_id)
                job_dir = make_job_dir(job_id)
                fname = await _extract_first_output(hist, job_dir)
                if not fname:
                    JOB_STORE.update(job_id, status="failed", error="No outputs found in ComfyUI history")
                    return
                url = public_url(job_id, fname)
                JOB_STORE.update(job_id, status="succeeded", progress=1.0, result_url=url)
            except Exception as e:
                # Task boundary: a failing job must be recorded, not raised.
                # Some exceptions stringify to "", so fall back to repr() to
                # keep the stored error message informative.
                JOB_STORE.update(job_id, status="failed", error=str(e) or repr(e))

    while True:
        job_id, gen_type, prompt_graph = await _queue.get()
        task = asyncio.create_task(run_one(job_id, gen_type, prompt_graph))
        in_flight.add(task)
        task.add_done_callback(in_flight.discard)