# services/comfy-agent/app/worker.py
"""Background worker that drains the job queue and runs prompts on ComfyUI.

Jobs are submitted with :func:`enqueue` and consumed by :func:`worker_loop`,
which runs each job as its own asyncio task, bounded by
``settings.MAX_CONCURRENCY``.
"""
import asyncio
import json
import logging
import os
import uuid
from typing import Any, Dict, Optional, Set, Tuple

from .comfyui_client import ComfyUIClient
from .config import settings
from .jobs import JOB_STORE
from .storage import make_job_dir, public_url

logger = logging.getLogger(__name__)

# Pending jobs as (job_id, gen_type, prompt_graph) tuples. The queue is
# unbounded, so producers never block.
_queue: "asyncio.Queue[Tuple[str, str, Dict[str, Any]]]" = asyncio.Queue()


def enqueue(job_id: str, gen_type: str, prompt_graph: Dict[str, Any]) -> None:
    """Schedule a job for processing by :func:`worker_loop`.

    Args:
        job_id: Identifier of the job; later passed to ``JOB_STORE.update``.
        gen_type: Generation type label carried along with the job.
        prompt_graph: Prompt graph handed to ``ComfyUIClient.queue_prompt``.
    """
    _queue.put_nowait((job_id, gen_type, prompt_graph))


def _write_manifest(manifest_path: str, history: Dict[str, Any]) -> None:
    """Serialize *history* as pretty-printed UTF-8 JSON at *manifest_path*."""
    with open(manifest_path, "w", encoding="utf-8") as f:
        json.dump(history, f, ensure_ascii=False, indent=2)


async def _extract_first_output(history: Dict[str, Any], job_dir: str) -> Optional[str]:
    """Persist the job's ComfyUI history and return the produced file name.

    Args:
        history: History payload returned by ``ComfyUIClient.get_history``.
        job_dir: Directory in which the job's files are written.

    Returns:
        The file name, relative to *job_dir*, of the written artifact.
        Currently this is always ``"manifest.json"``.
    """
    # ComfyUI history structure can vary; implement a conservative extraction:
    # try to find any "images" or "gifs"/"videos" outputs and download via /view
    # For MVP: prefer /view?filename=...&type=output&subfolder=...
    # Here we return a "manifest.json" to unblock integration even if file
    # fetching needs refinement.
    manifest_path = os.path.join(job_dir, "manifest.json")
    # Serializing and writing a large history is blocking work; run it in the
    # default executor so other jobs' coroutines keep being serviced.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _write_manifest, manifest_path, history)
    return "manifest.json"


async def worker_loop() -> None:
    """Consume queued jobs forever, running each one as a separate task.

    At most ``settings.MAX_CONCURRENCY`` jobs execute their ComfyUI round trip
    at the same time; additional jobs wait on the semaphore. The coroutine
    never returns on its own and is expected to be cancelled by its owner.
    """
    client = ComfyUIClient()
    sem = asyncio.Semaphore(settings.MAX_CONCURRENCY)
    # The event loop only keeps weak references to tasks, so a task that is
    # not referenced anywhere else may be garbage-collected mid-execution.
    # Hold strong references here until each task has finished.
    in_flight: Set["asyncio.Task[None]"] = set()

    async def run_one(job_id: str, gen_type: str, prompt_graph: Dict[str, Any]) -> None:
        """Run a single job end to end and record its outcome in JOB_STORE."""
        async with sem:
            JOB_STORE.update(job_id, status="running", progress=0.01)
            # A unique client id per job is passed to both queue_prompt and
            # wait_progress so the two calls refer to the same submission.
            client_id = f"comfy-agent-{uuid.uuid4().hex}"

            def on_p(p: float, msg: str) -> None:
                JOB_STORE.update(job_id, progress=float(p), message=msg)

            try:
                prompt_id = await client.queue_prompt(prompt_graph, client_id=client_id)
                JOB_STORE.update(job_id, comfy_prompt_id=prompt_id)

                await client.wait_progress(
                    client_id=client_id, prompt_id=prompt_id, on_progress=on_p
                )

                hist = await client.get_history(prompt_id)
                job_dir = make_job_dir(job_id)
                fname = await _extract_first_output(hist, job_dir)
                if not fname:
                    JOB_STORE.update(
                        job_id,
                        status="failed",
                        error="No outputs found in ComfyUI history",
                    )
                    return

                url = public_url(job_id, fname)
                JOB_STORE.update(job_id, status="succeeded", progress=1.0, result_url=url)
            except Exception as e:
                # Top-level boundary for a job: record the failure instead of
                # letting it escape the task, and keep the traceback in logs.
                logger.exception("Job %s failed", job_id)
                # Some exceptions (e.g. asyncio.TimeoutError) stringify to "";
                # fall back to the class name so the stored error is never blank.
                JOB_STORE.update(
                    job_id, status="failed", error=str(e) or type(e).__name__
                )

    while True:
        job_id, gen_type, prompt_graph = await _queue.get()
        task = asyncio.create_task(run_one(job_id, gen_type, prompt_graph))
        in_flight.add(task)
        task.add_done_callback(in_flight.discard)