diff --git a/PROJECT-MASTER-INDEX.md b/PROJECT-MASTER-INDEX.md index f3cd67d4..31e0ad30 100644 --- a/PROJECT-MASTER-INDEX.md +++ b/PROJECT-MASTER-INDEX.md @@ -1,6 +1,6 @@ # 📚 MASTER INDEX — MicroDAO / DAARION / DAGI -**Оновлено:** 2026-01-29 +**Оновлено:** 2026-02-10 **Призначення:** Єдина точка входу до всієї документації проекту --- @@ -25,6 +25,94 @@ | **SSH** | `ssh root@144.76.224.179` | | **Project Root** | `/opt/microdao-daarion/` | | **Docker Network** | `dagi-network` | +| **Hardware** | Hetzner GEX44, NVIDIA RTX 4000 SFF Ada (20GB VRAM) | + +### Агент НОДА1 (Cursor) + +**Де створено:** Агент налаштований у проєкті `/Users/apple/github-projects/microdao-daarion/`, а **не** в `Desktop/MicroDAO` або `node1`. Тому раніше він не знаходився. + +| Що | Де | +|----|-----| +| **Cursor Rule** | `.cursor/rules/noda1-operations.mdc` | +| **Cursor Skill** | `.cursor/skills/noda1-operations/SKILL.md` | +| **Credentials** (локально, не в git) | `.cursor/noda1-credentials.local.mdc` | + +**Як запустити:** «Підключись до НОДА1» — правило/skill описують SSH та операції на сервері. + +**Host key (для верифікації):** +- RSA 3072: `OzbVMM7CC4SatdE2CSoxh5qgJdCyYO22MLjchXXBIro` +- ECDSA 256: `YPQUigtDm3HiEp4MYYeREE+M3ig/2CrZXy2ozr4OWQw` +- ED25519 256: `79LG0tKQ1B1DsdVZ/BhLYSX2v08eCWqqWihHtn+Y8FU` + +### NODA2 (Development Node) + +| Параметр | Значення | +|----------|----------| +| **Тип** | MacBook Pro M4 Max | +| **GPU** | Apple Silicon (40-core GPU, 64GB RAM) | +| **Project Root** | `/Users/apple/github-projects/microdao-daarion/` | +| **Ollama URL** | `http://localhost:11434` | +| **Metal Acceleration** | ✅ Enabled | + +**LLM Моделі (Ollama):** +- `gpt-oss:latest` (13 GB) - Fast LLM 20.9B params +- `phi3:latest` (2.2 GB) - Lightweight 3.8B params +- `starcoder2:3b` (1.7 GB) - Code specialist +- `mistral-nemo:12b` (7.1 GB) - Advanced reasoning +- `gemma2:27b` (15 GB) - Strategic reasoning +- `deepseek-coder:33b` (18 GB) - Advanced code +- `qwen2.5-coder:32b` (19 GB) - Code specialist +- `deepseek-r1:70b` (42 GB) - Strategic reasoning + +**Config:** `services/swapper-service/config/swapper_config_node2.yaml` + +### NODA3 (AI/ML Workstation) + +| Параметр | Значення | +|----------|----------| +| **IP** | `212.8.58.133` | +| **SSH** | `ssh zevs@212.8.58.133 -p33147` | +| **Hostname** | `llm80-che-1-1` | +| **CPU** | AMD Threadripper PRO | +| **GPU** | NVIDIA GeForce RTX 3090 (24GB VRAM) | +| **RAM** | 128GB | +| **Storage** | 1TB NVMe (374GB used, 593GB available) | +| **Project Root** | `/home/zevs/microdao-daarion/` | +| **Node ID** | `node-3-threadripper-rtx3090` | + +**Запущені сервіси (Docker):** +- `swapper-service-node3` (✅ healthy) - порти 8890-8891 +- `dagi-router-node3` (⚠️ unhealthy) - порт 9102 +- `postgres-daarion` - порт 5432 +- `neo4j-daarion` - порти 7474, 7687 +- `qdrant-daarion` - порти 6333-6334 +- `gitlab` - порти 8922, 8929, 8443 + +**LLM Моделі (Ollama):** +- `qwen3:32b` (20 GB) - Primary LLM, 32B params +- `llama3:latest` (4.7 GB) - Fast responses + +**ComfyUI:** +- **Path:** `/home/zevs/ComfyUI/` +- **Size:** 5.8 GB +- **Port:** 8188 +- **Status:** Встановлено (запускається вручну) +- **Purpose:** Image/Video generation workflows + +**LTX-2 Video Generation Model:** +- **Path:** `/home/zevs/models/LTX-2/` +- **Size:** 293 GB (!) +- **Type:** Diffusion audio-video foundation model +- **Model:** LTX-2 19B parameters +- **Variants:** + - `ltx-2-19b-distilled.safetensors` (full precision) + - `ltx-2-19b-distilled-fp8.safetensors` (quantized) +- **Capabilities:** Text-to-Video, Image-to-Video +- **Languages:** en, de, es, fr, ja, ko, zh, it, pt +- **License:** LTX-2 Community License +- **ArXiv:** 2601.03233 + +**Config:** `services/swapper-service/config/swapper_config_node3.yaml` --- @@ -162,6 +250,32 @@ python3 tools/agents smoke --id # Smoke test --- +## 🛠️ Зміни 2026-02-10 + +### ✅ Infrastructure Documentation Update + +**Що зроблено:** + +1. **Додано повну документацію NODA2 (Development Node):** + - MacBook Pro M4 Max конфігурація + - 8 LLM моделей Ollama (gpt-oss, phi3, deepseek-coder, etc.) + - Swapper config для NODE2 + +2. **Додано повну документацію NODA3 (AI/ML Workstation):** + - Hardware: AMD Threadripper PRO + RTX 3090 24GB + - SSH: `zevs@212.8.58.133 -p33147` + - Docker сервіси: swapper, router, postgres, neo4j, qdrant, gitlab + - LLM моделі: qwen3:32b (20GB), llama3:latest (4.7GB) + - **ComfyUI** (5.8 GB) - Image/Video generation на порту 8188 + - **LTX-2 Video Model** (293 GB!) - Text-to-Video, Image-to-Video generation + - Swapper config для NODE3 + +3. **Виправлено розбіжності між документацією та реальним стеком:** + - Оновлено реальні моделі замість документованих + - Додано інформацію про ComfyUI та LTX-2 + +--- + ## 🛠️ Зміни 2026-02-09 ### ✅ SenpAI Market Data Integration @@ -285,12 +399,29 @@ curl -s http://144.76.224.179:6333/collections | jq '.result.collections[] | {na --- +## 🔍 Логи агента Helion (NODA1) + +**Де дивитися:** Gateway приймає webhook, Router викликає LLM (DeepSeek) і tools. + +```bash +# Логи gateway (вхід повідомлень, prober) +ssh root@144.76.224.179 "docker logs dagi-gateway-node1 --tail 100" + +# Логи router (inference, tool calls, DSML) +ssh root@144.76.224.179 "docker logs dagi-router-node1 --tail 150" +``` + +**Типова причина некоректних відповідей у чаті:** у Router логах з’являється `DSML detected in 2nd LLM response` — DeepSeek іноді повертає розмітку DSML замість звичайного тексту після tool call. Router тоді робить 3-й виклик для синтезу відповіді або показує fallback. Покращення: у коді Router перед заміною відповіді спочатку вирізається тільки блок DSML, зберігається текст до нього (якщо є). + +--- + ## ⚠️ Відомі проблеми 1. ~~**gateway → router: "All connection attempts failed"**~~ — ✅ Виправлено (router підключено до dagi-network) 2. ~~**Alateya токен не був раніше доданий**~~ — ✅ Виправлено 3. ~~**Clan, Eonarch не були в production репо**~~ — ✅ Виправлено 4. ~~**Розбіжності в ролях агентів між Gateway/Router/CrewAI**~~ — ✅ Виправлено (Unified Registry) +5. **Helion іноді відповідає некоректно** — пов’язано з DSML у другій відповіді DeepSeek; у Router додано збереження тексту перед DSML-блоком (див. вище). --- diff --git a/docker-compose.node3.yml b/docker-compose.node3.yml new file mode 100644 index 00000000..d0aa169d --- /dev/null +++ b/docker-compose.node3.yml @@ -0,0 +1,103 @@ +version: '3.8' + +services: + # DAGI Router для NODE3 + dagi-router-node3: + build: + context: ./services/router + dockerfile: Dockerfile + container_name: dagi-router-node3 + ports: + - "9102:9102" + environment: + - NATS_URL=nats://144.76.224.179:4222 + - ROUTER_CONFIG_PATH=/app/router_config.yaml + - LOG_LEVEL=info + - NODE_ID=node-3-threadripper-rtx3090 + extra_hosts: + - "host.docker.internal:host-gateway" + volumes: + - ./services/router/router_config.yaml:/app/router_config.yaml:ro + - ./logs:/app/logs + networks: + - dagi-network + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9102/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 10s + + # Swapper Service для NODE3 + swapper-service-node3: + build: + context: ./services/swapper-service + dockerfile: Dockerfile + container_name: swapper-service-node3 + ports: + - "8890:8890" + - "8891:8891" # Metrics + environment: + - OLLAMA_BASE_URL=http://host.docker.internal:11434 + - SWAPPER_CONFIG_PATH=/app/config/swapper_config.yaml + - SWAPPER_MODE=single-active + - MAX_CONCURRENT_MODELS=1 + - MODEL_SWAP_TIMEOUT=300 + - GPU_ENABLED=true + - NODE_ID=node-3-threadripper-rtx3090 + volumes: + - ./services/swapper-service/config/swapper_config_node3.yaml:/app/config/swapper_config.yaml:ro + - ./logs:/app/logs + networks: + - dagi-network + restart: unless-stopped + extra_hosts: + - "host.docker.internal:host-gateway" + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:8890/health || exit 1"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 10s + + # Comfy Agent - Image & Video Generation Service + comfy-agent: + build: + context: ./services/comfy-agent + dockerfile: Dockerfile + container_name: comfy-agent-node3 + ports: + - "8880:8880" + environment: + - COMFYUI_HTTP=http://host.docker.internal:8188 + - COMFYUI_WS=ws://host.docker.internal:8188/ws + - NATS_URL=nats://144.76.224.179:4222 + - NATS_SUBJECT_INVOKE=agent.invoke.comfy + - NATS_SUBJECT_IMAGE=comfy.request.image + - NATS_SUBJECT_VIDEO=comfy.request.video + - STORAGE_PATH=/data/comfy-results + - PUBLIC_BASE_URL=http://212.8.58.133:8880/files + - MAX_CONCURRENCY=1 + volumes: + - comfy-results:/data/comfy-results + - ./logs:/app/logs + networks: + - dagi-network + restart: unless-stopped + extra_hosts: + - "host.docker.internal:host-gateway" + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:8880/healthz || exit 1"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 15s + +networks: + dagi-network: + external: true + +volumes: + comfy-results: + driver: local diff --git a/services/comfy-agent/.dockerignore b/services/comfy-agent/.dockerignore new file mode 100644 index 00000000..d2f60756 --- /dev/null +++ b/services/comfy-agent/.dockerignore @@ -0,0 +1,18 @@ +__pycache__ +*.pyc +*.pyo +*.pyd +.Python +venv/ +.venv/ +*.egg-info/ +.pytest_cache/ +.mypy_cache/ +.coverage +htmlcov/ +dist/ +build/ +*.log +.DS_Store +.env +.env.local diff --git a/services/comfy-agent/Dockerfile b/services/comfy-agent/Dockerfile new file mode 100644 index 00000000..05fc0d9c --- /dev/null +++ b/services/comfy-agent/Dockerfile @@ -0,0 +1,13 @@ +# services/comfy-agent/Dockerfile +FROM python:3.11-slim + +WORKDIR /app +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt + +COPY app /app/app + +ENV PYTHONUNBUFFERED=1 +EXPOSE 8880 + +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8880"] diff --git a/services/comfy-agent/README.md b/services/comfy-agent/README.md new file mode 100644 index 00000000..7f78650d --- /dev/null +++ b/services/comfy-agent/README.md @@ -0,0 +1,215 @@ +# Comfy Agent Service + +**Image & Video Generation Service for NODE3** + +## Overview + +Comfy Agent is a specialized service that interfaces with ComfyUI for AI-powered image and video generation. It provides both REST API and NATS messaging interfaces, enabling other agents in the DAARION ecosystem to request generation tasks. + +## Architecture + +``` +NODE1 Agents → NATS → Comfy Agent (NODE3) → ComfyUI (port 8188) + → LTX-2 Models (293 GB) +``` + +## Features + +- **REST API**: Synchronous HTTP endpoints for generation requests +- **NATS Integration**: Async message-based communication with other agents +- **Job Queue**: Handles concurrent generation requests with configurable concurrency +- **Progress Tracking**: Real-time progress updates via WebSocket monitoring +- **Result Storage**: File-based storage with URL-based result retrieval + +## API Endpoints + +### POST /generate/image +Generate an image from text prompt. + +**Request:** +```json +{ + "prompt": "a futuristic city of gifts, ultra-detailed, cinematic", + "negative_prompt": "blurry, low quality", + "width": 1024, + "height": 1024, + "steps": 28, + "seed": 12345 +} +``` + +**Response:** +```json +{ + "job_id": "job_abc123...", + "type": "text-to-image", + "status": "queued", + "progress": 0.0 +} +``` + +### POST /generate/video +Generate a video from text prompt using LTX-2. + +**Request:** +```json +{ + "prompt": "a cat walking on the moon, cinematic", + "seconds": 4, + "fps": 24, + "steps": 30 +} +``` + +### GET /status/{job_id} +Check the status of a generation job. + +**Response:** +```json +{ + "job_id": "job_abc123...", + "type": "text-to-image", + "status": "succeeded", + "progress": 1.0, + "result_url": "http://NODE3_IP:8880/files/job_abc123.../output.png" +} +``` + +### GET /result/{job_id} +Retrieve the final result (same as status). + +### GET /healthz +Health check endpoint. + +## NATS Integration + +### Subscribed Topics + +- `agent.invoke.comfy` - Main invocation channel from router +- `comfy.request.image` - Direct image generation requests +- `comfy.request.video` - Direct video generation requests + +### Message Format + +**Request:** +```json +{ + "type": "text-to-image", + "workflow": { + "1": {"class_type": "CLIPTextEncode", ...}, + "2": {"class_type": "CheckpointLoaderSimple", ...} + } +} +``` + +**Response:** +```json +{ + "job_id": "job_abc123..." +} +``` + +## Configuration + +Environment variables: + +- `COMFYUI_HTTP` - ComfyUI HTTP endpoint (default: `http://127.0.0.1:8188`) +- `COMFYUI_WS` - ComfyUI WebSocket endpoint (default: `ws://127.0.0.1:8188/ws`) +- `NATS_URL` - NATS server URL (default: `nats://144.76.224.179:4222`) +- `STORAGE_PATH` - Path for result storage (default: `/data/comfy-results`) +- `PUBLIC_BASE_URL` - Public URL for accessing results (default: `http://212.8.58.133:8880/files`) +- `MAX_CONCURRENCY` - Max concurrent generations (default: `1`) + +## Development + +### Local Setup + +```bash +cd services/comfy-agent +python -m venv venv +source venv/bin/activate # or `venv\Scripts\activate` on Windows +pip install -r requirements.txt + +# Run locally +uvicorn app.main:app --reload --port 8880 +``` + +### Docker Build + +```bash +docker build -t comfy-agent:latest . +``` + +### Testing + +```bash +# Test image generation +curl -X POST http://localhost:8880/generate/image \ + -H "Content-Type: application/json" \ + -d '{"prompt":"a futuristic city, cyberpunk style"}' + +# Check status +curl http://localhost:8880/status/job_abc123... + +# Health check +curl http://localhost:8880/healthz +``` + +## TODO / Roadmap + +1. **Workflow Templates**: Replace placeholder workflows with actual ComfyUI workflows + - SDXL text-to-image workflow + - LTX-2 text-to-video workflow + - Image-to-video workflow + +2. **Result Extraction**: Implement proper file extraction from ComfyUI history + - Download images/videos via `/view` endpoint + - Support multiple output formats (PNG, JPG, GIF, MP4) + - Handle batch outputs + +3. **Advanced Features**: + - Workflow library management + - Custom model loading + - LoRA/ControlNet support + - Batch processing + - Queue prioritization + +4. **Monitoring**: + - Prometheus metrics + - Grafana dashboards + - Alert on failures + - GPU usage tracking + +5. **Storage**: + - S3/MinIO integration for scalable storage + - Result expiration/cleanup + - Thumbnail generation + +## Integration with Agent Registry + +Add to `config/agent_registry.yml`: + +```yaml +comfy: + id: comfy + name: Comfy + role: Image & Video Generation Specialist + scope: node_local + node_id: node-3-threadripper-rtx3090 + capabilities: + - text-to-image + - text-to-video + - image-to-video + - workflow-execution + api_endpoint: http://212.8.58.133:8880 + nats_subject: agent.invoke.comfy +``` + +## License + +Part of the DAARION MicroDAO project. + +## Maintainers + +- DAARION Team +- Last Updated: 2026-02-10 diff --git a/services/comfy-agent/app/__init__.py b/services/comfy-agent/app/__init__.py new file mode 100644 index 00000000..f83cabd0 --- /dev/null +++ b/services/comfy-agent/app/__init__.py @@ -0,0 +1 @@ +# services/comfy-agent/app/__init__.py diff --git a/services/comfy-agent/app/api.py b/services/comfy-agent/app/api.py new file mode 100644 index 00000000..232890ae --- /dev/null +++ b/services/comfy-agent/app/api.py @@ -0,0 +1,51 @@ +# services/comfy-agent/app/api.py +from fastapi import APIRouter, HTTPException +from .models import GenerateImageRequest, GenerateVideoRequest, JobStatus +from .jobs import JOB_STORE +from .worker import enqueue + +router = APIRouter() + +def _build_workflow_t2i(req: GenerateImageRequest) -> dict: + # MVP: placeholder graph; you will replace with your canonical Comfy workflow JSON. + # Keep it deterministic and param-driven. + return { + "1": {"class_type": "CLIPTextEncode", "inputs": {"text": req.prompt, "clip": ["2", 0]}}, + "2": {"class_type": "CheckpointLoaderSimple", "inputs": {"ckpt_name": "sdxl.safetensors"}}, + # TODO: Add complete workflow JSON for text-to-image + } + +def _build_workflow_t2v(req: GenerateVideoRequest) -> dict: + # MVP placeholder for LTX-2 pipeline; replace with actual LTX-2 workflow. + return { + "1": {"class_type": "CLIPTextEncode", "inputs": {"text": req.prompt, "clip": ["2", 0]}}, + # TODO: Add complete workflow JSON for text-to-video with LTX-2 + } + +@router.post("/generate/image", response_model=JobStatus) +async def generate_image(req: GenerateImageRequest): + job = JOB_STORE.create("text-to-image") + graph = _build_workflow_t2i(req) + enqueue(job.job_id, "text-to-image", graph) + return JOB_STORE.get(job.job_id) + +@router.post("/generate/video", response_model=JobStatus) +async def generate_video(req: GenerateVideoRequest): + job = JOB_STORE.create("text-to-video") + graph = _build_workflow_t2v(req) + enqueue(job.job_id, "text-to-video", graph) + return JOB_STORE.get(job.job_id) + +@router.get("/status/{job_id}", response_model=JobStatus) +async def status(job_id: str): + job = JOB_STORE.get(job_id) + if not job: + raise HTTPException(status_code=404, detail="job_not_found") + return job + +@router.get("/result/{job_id}", response_model=JobStatus) +async def result(job_id: str): + job = JOB_STORE.get(job_id) + if not job: + raise HTTPException(status_code=404, detail="job_not_found") + return job diff --git a/services/comfy-agent/app/comfyui_client.py b/services/comfy-agent/app/comfyui_client.py new file mode 100644 index 00000000..e9f7adb5 --- /dev/null +++ b/services/comfy-agent/app/comfyui_client.py @@ -0,0 +1,54 @@ +# services/comfy-agent/app/comfyui_client.py +import asyncio +import httpx +import json +import websockets +from typing import Any, Dict, Optional, Callable +from .config import settings + +ProgressCb = Callable[[float, str], None] + +class ComfyUIClient: + def __init__(self) -> None: + self.http = httpx.AsyncClient(base_url=settings.COMFYUI_HTTP, timeout=60) + + async def queue_prompt(self, prompt_graph: Dict[str, Any], client_id: str) -> str: + # ComfyUI expects: {"prompt": {...}, "client_id": "..."} + r = await self.http.post("/prompt", json={"prompt": prompt_graph, "client_id": client_id}) + r.raise_for_status() + data = r.json() + # typically returns {"prompt_id": "...", "number": ...} + return data["prompt_id"] + + async def wait_progress(self, client_id: str, prompt_id: str, on_progress: Optional[ProgressCb] = None) -> None: + # WS emits progress/executing/status; keep generic handling + ws_url = f"{settings.COMFYUI_WS}?clientId={client_id}" + async with websockets.connect(ws_url, max_size=50_000_000) as ws: + while True: + msg = await ws.recv() + evt = json.loads(msg) + + # Best-effort progress mapping + if evt.get("type") == "progress": + data = evt.get("data", {}) + max_v = float(data.get("max", 1.0)) + val = float(data.get("value", 0.0)) + p = 0.0 if max_v <= 0 else min(1.0, val / max_v) + if on_progress: + on_progress(p, "progress") + + # completion signal varies; "executing" with node=None часто означає done + if evt.get("type") == "executing": + data = evt.get("data", {}) + if data.get("prompt_id") == prompt_id and data.get("node") is None: + if on_progress: + on_progress(1.0, "done") + return + + async def get_history(self, prompt_id: str) -> Dict[str, Any]: + r = await self.http.get(f"/history/{prompt_id}") + r.raise_for_status() + return r.json() + + async def close(self) -> None: + await self.http.aclose() diff --git a/services/comfy-agent/app/config.py b/services/comfy-agent/app/config.py new file mode 100644 index 00000000..d4b5a01b --- /dev/null +++ b/services/comfy-agent/app/config.py @@ -0,0 +1,22 @@ +# services/comfy-agent/app/config.py +from pydantic_settings import BaseSettings + +class Settings(BaseSettings): + SERVICE_NAME: str = "comfy-agent" + API_HOST: str = "0.0.0.0" + API_PORT: int = 8880 + + COMFYUI_HTTP: str = "http://127.0.0.1:8188" + COMFYUI_WS: str = "ws://127.0.0.1:8188/ws" + + NATS_URL: str = "nats://144.76.224.179:4222" # NODE1 production IP + NATS_SUBJECT_INVOKE: str = "agent.invoke.comfy" + NATS_SUBJECT_IMAGE: str = "comfy.request.image" + NATS_SUBJECT_VIDEO: str = "comfy.request.video" + + STORAGE_PATH: str = "/data/comfy-results" + PUBLIC_BASE_URL: str = "http://212.8.58.133:8880/files" # NODE3 IP + + MAX_CONCURRENCY: int = 1 # для LTX-2 стартово краще 1 + +settings = Settings() diff --git a/services/comfy-agent/app/jobs.py b/services/comfy-agent/app/jobs.py new file mode 100644 index 00000000..272f7c46 --- /dev/null +++ b/services/comfy-agent/app/jobs.py @@ -0,0 +1,25 @@ +# services/comfy-agent/app/jobs.py +import uuid +from typing import Dict, Optional +from .models import JobStatus, GenType + +class JobStore: + def __init__(self) -> None: + self._jobs: Dict[str, JobStatus] = {} + + def create(self, gen_type: GenType) -> JobStatus: + job_id = f"job_{uuid.uuid4().hex}" + js = JobStatus(job_id=job_id, type=gen_type, status="queued", progress=0.0) + self._jobs[job_id] = js + return js + + def get(self, job_id: str) -> Optional[JobStatus]: + return self._jobs.get(job_id) + + def update(self, job_id: str, **patch) -> JobStatus: + js = self._jobs[job_id] + updated = js.model_copy(update=patch) + self._jobs[job_id] = updated + return updated + +JOB_STORE = JobStore() diff --git a/services/comfy-agent/app/main.py b/services/comfy-agent/app/main.py new file mode 100644 index 00000000..f7d4f504 --- /dev/null +++ b/services/comfy-agent/app/main.py @@ -0,0 +1,26 @@ +# services/comfy-agent/app/main.py +import asyncio +from fastapi import FastAPI +from fastapi.staticfiles import StaticFiles +from .config import settings +from .api import router +from .worker import worker_loop +from .nats_client import start_nats +from .storage import ensure_storage + +app = FastAPI(title="Comfy Agent Service", version="0.1.0") +app.include_router(router) + +@app.on_event("startup") +async def startup(): + ensure_storage() + + # Static files for result URLs: /files/{job_id}/... + app.mount("/files", StaticFiles(directory=settings.STORAGE_PATH), name="files") + + asyncio.create_task(worker_loop()) + await start_nats() + +@app.get("/healthz") +async def healthz(): + return {"ok": True, "service": settings.SERVICE_NAME} diff --git a/services/comfy-agent/app/models.py b/services/comfy-agent/app/models.py new file mode 100644 index 00000000..4e1001ef --- /dev/null +++ b/services/comfy-agent/app/models.py @@ -0,0 +1,34 @@ +# services/comfy-agent/app/models.py +from pydantic import BaseModel, Field +from typing import Any, Dict, Optional, Literal + +GenType = Literal["text-to-image", "text-to-video", "image-to-video"] + +class GenerateImageRequest(BaseModel): + prompt: str = Field(min_length=1) + negative_prompt: Optional[str] = None + width: int = 1024 + height: int = 1024 + steps: int = 28 + seed: Optional[int] = None + workflow: Optional[str] = None + workflow_params: Dict[str, Any] = Field(default_factory=dict) + +class GenerateVideoRequest(BaseModel): + prompt: str = Field(min_length=1) + seconds: int = 4 + fps: int = 24 + steps: int = 30 + seed: Optional[int] = None + workflow: Optional[str] = None + workflow_params: Dict[str, Any] = Field(default_factory=dict) + +class JobStatus(BaseModel): + job_id: str + type: GenType + status: Literal["queued", "running", "succeeded", "failed"] + progress: float = 0.0 + message: Optional[str] = None + result_url: Optional[str] = None + error: Optional[str] = None + comfy_prompt_id: Optional[str] = None diff --git a/services/comfy-agent/app/nats_client.py b/services/comfy-agent/app/nats_client.py new file mode 100644 index 00000000..bdf3afe6 --- /dev/null +++ b/services/comfy-agent/app/nats_client.py @@ -0,0 +1,36 @@ +# services/comfy-agent/app/nats_client.py +import json +import asyncio +from nats.aio.client import Client as NATS +from .config import settings +from .jobs import JOB_STORE +from .worker import enqueue + +async def start_nats() -> NATS: + nc = NATS() + await nc.connect(servers=[settings.NATS_URL]) + + async def handle(msg): + subj = msg.subject + reply = msg.reply + payload = json.loads(msg.data.decode("utf-8")) + + # payload contract (MVP): + # { "type": "text-to-image|text-to-video", "workflow": {...} } + gen_type = payload.get("type", "text-to-image") + workflow = payload.get("workflow") + if not workflow: + if reply: + await nc.publish(reply, json.dumps({"error": "missing_workflow"}).encode()) + return + + job = JOB_STORE.create(gen_type) + enqueue(job.job_id, gen_type, workflow) + + if reply: + await nc.publish(reply, json.dumps({"job_id": job.job_id}).encode()) + + await nc.subscribe(settings.NATS_SUBJECT_INVOKE, cb=handle) + await nc.subscribe(settings.NATS_SUBJECT_IMAGE, cb=handle) + await nc.subscribe(settings.NATS_SUBJECT_VIDEO, cb=handle) + return nc diff --git a/services/comfy-agent/app/storage.py b/services/comfy-agent/app/storage.py new file mode 100644 index 00000000..960fe6fa --- /dev/null +++ b/services/comfy-agent/app/storage.py @@ -0,0 +1,16 @@ +# services/comfy-agent/app/storage.py +import os +from pathlib import Path +from .config import settings + +def ensure_storage() -> None: + Path(settings.STORAGE_PATH).mkdir(parents=True, exist_ok=True) + +def make_job_dir(job_id: str) -> str: + ensure_storage() + d = os.path.join(settings.STORAGE_PATH, job_id) + Path(d).mkdir(parents=True, exist_ok=True) + return d + +def public_url(job_id: str, filename: str) -> str: + return f"{settings.PUBLIC_BASE_URL}/{job_id}/{filename}" diff --git a/services/comfy-agent/app/worker.py b/services/comfy-agent/app/worker.py new file mode 100644 index 00000000..0d9c47c3 --- /dev/null +++ b/services/comfy-agent/app/worker.py @@ -0,0 +1,60 @@ +# services/comfy-agent/app/worker.py +import asyncio +import uuid +import os +import json +from typing import Any, Dict, Optional, Tuple +from .jobs import JOB_STORE +from .storage import make_job_dir, public_url +from .comfyui_client import ComfyUIClient +from .config import settings + +_queue: "asyncio.Queue[Tuple[str, str, Dict[str, Any]]]" = asyncio.Queue() + +def enqueue(job_id: str, gen_type: str, prompt_graph: Dict[str, Any]) -> None: + _queue.put_nowait((job_id, gen_type, prompt_graph)) + +async def _extract_first_output(history: Dict[str, Any], job_dir: str) -> Optional[str]: + # ComfyUI history structure can vary; implement a conservative extraction: + # try to find any "images" or "gifs"/"videos" outputs and download via /view + # For MVP: prefer /view?filename=...&type=output&subfolder=... + # Here we return a "manifest.json" to unblock integration even if file fetching needs refinement. + manifest_path = os.path.join(job_dir, "manifest.json") + with open(manifest_path, "w", encoding="utf-8") as f: + json.dump(history, f, ensure_ascii=False, indent=2) + return "manifest.json" + +async def worker_loop() -> None: + client = ComfyUIClient() + sem = asyncio.Semaphore(settings.MAX_CONCURRENCY) + + async def run_one(job_id: str, gen_type: str, prompt_graph: Dict[str, Any]) -> None: + async with sem: + JOB_STORE.update(job_id, status="running", progress=0.01) + + client_id = f"comfy-agent-{uuid.uuid4().hex}" + def on_p(p: float, msg: str) -> None: + JOB_STORE.update(job_id, progress=float(p), message=msg) + + try: + prompt_id = await client.queue_prompt(prompt_graph, client_id=client_id) + JOB_STORE.update(job_id, comfy_prompt_id=prompt_id) + + await client.wait_progress(client_id=client_id, prompt_id=prompt_id, on_progress=on_p) + + hist = await client.get_history(prompt_id) + job_dir = make_job_dir(job_id) + fname = await _extract_first_output(hist, job_dir) + if not fname: + JOB_STORE.update(job_id, status="failed", error="No outputs found in ComfyUI history") + return + + url = public_url(job_id, fname) + JOB_STORE.update(job_id, status="succeeded", progress=1.0, result_url=url) + + except Exception as e: + JOB_STORE.update(job_id, status="failed", error=str(e)) + + while True: + job_id, gen_type, prompt_graph = await _queue.get() + asyncio.create_task(run_one(job_id, gen_type, prompt_graph)) diff --git a/services/comfy-agent/requirements.txt b/services/comfy-agent/requirements.txt new file mode 100644 index 00000000..caea14ce --- /dev/null +++ b/services/comfy-agent/requirements.txt @@ -0,0 +1,9 @@ +fastapi==0.115.0 +uvicorn[standard]==0.30.6 +pydantic==2.8.2 +pydantic-settings==2.4.0 +httpx==0.27.2 +websockets==12.0 +nats-py==2.7.2 +python-multipart==0.0.9 +orjson==3.10.7