diff --git a/services/comfy-agent/Dockerfile b/services/comfy-agent/Dockerfile index 05fc0d9c..4ec2bae5 100644 --- a/services/comfy-agent/Dockerfile +++ b/services/comfy-agent/Dockerfile @@ -1,5 +1,6 @@ # services/comfy-agent/Dockerfile -FROM python:3.11-slim +ARG BASE_IMAGE=python:3.11-slim +FROM ${BASE_IMAGE} WORKDIR /app COPY requirements.txt /app/requirements.txt diff --git a/services/comfy-agent/app/api.py b/services/comfy-agent/app/api.py index 88133549..7b602aad 100644 --- a/services/comfy-agent/app/api.py +++ b/services/comfy-agent/app/api.py @@ -1,11 +1,64 @@ # services/comfy-agent/app/api.py -from fastapi import APIRouter, HTTPException +import hashlib +import json + +from fastapi import APIRouter, Header, HTTPException from .models import GenerateImageRequest, GenerateVideoRequest, JobStatus from .jobs import JOB_STORE from .worker import enqueue +from . import idempotency +from .config import settings router = APIRouter() + +def _req_hash(gen_type: str, payload: dict) -> str: + normalized = json.dumps({"type": gen_type, "payload": payload}, sort_keys=True, separators=(",", ":")) + return hashlib.sha256(normalized.encode("utf-8")).hexdigest() + + +def _resolve_idempotency_key( + *, + header_key: str | None, + body_key: str | None, +) -> str | None: + key = (header_key or body_key or "").strip() + return key or None + + +def _create_job_with_idempotency( + *, + gen_type: str, + idem_key: str | None, + req_hash: str, +) -> tuple[JobStatus, bool]: + """ + Returns: + (job_status, should_enqueue) + """ + if not idem_key or idempotency.IDEMPOTENCY_STORE is None: + return JOB_STORE.create(gen_type), True + + candidate_job_id = JOB_STORE.new_job_id() + result = idempotency.IDEMPOTENCY_STORE.reserve( + idem_key=idem_key, + gen_type=gen_type, + req_hash=req_hash, + job_id=candidate_job_id, + ) + + if result.decision == "conflict": + raise HTTPException(status_code=409, detail="idempotency_key_reused_with_different_payload") + + if result.decision == "exists": + 
existing = JOB_STORE.get(result.job_id) + if existing: + return existing, False + # If process was restarted and in-memory JOB_STORE was lost, return queued placeholder. + return JOB_STORE.create(gen_type, job_id=result.job_id), False + + return JOB_STORE.create(gen_type, job_id=result.job_id), True + def _build_workflow_t2i(req: GenerateImageRequest) -> dict: # Basic SD 1.5 workflow # Node structure: CheckpointLoader -> CLIP Encode -> KSampler -> VAE Decode -> SaveImage @@ -70,24 +123,147 @@ def _build_workflow_t2i(req: GenerateImageRequest) -> dict: } def _build_workflow_t2v(req: GenerateVideoRequest) -> dict: - # MVP placeholder for LTX-2 pipeline; replace with actual LTX-2 workflow. + if not settings.LTX_TEXT_ENCODER: + raise HTTPException(status_code=503, detail="ltx_text_encoder_not_configured") + + frame_rate = float(max(1, req.fps)) + length = req.frames if req.frames and req.frames > 0 else (max(1, req.seconds) * max(1, req.fps) + 1) + neg = req.negative_prompt if req.negative_prompt else "low quality, worst quality, deformed, distorted, disfigured, motion artifacts" + + # LTX-2 text-to-video pipeline with SaveVideo output node. 
return { - "1": {"class_type": "CLIPTextEncode", "inputs": {"text": req.prompt, "clip": ["2", 0]}}, - # TODO: Add complete workflow JSON for text-to-video with LTX-2 + "4": { + "class_type": "CheckpointLoaderSimple", + "inputs": { + "ckpt_name": settings.LTX_CKPT_NAME, + }, + }, + "5": { + "class_type": "LTXAVTextEncoderLoader", + "inputs": { + "text_encoder": settings.LTX_TEXT_ENCODER, + "ckpt_name": settings.LTX_CKPT_NAME, + "device": settings.LTX_DEVICE, + }, + }, + "6": { + "class_type": "CLIPTextEncode", + "inputs": { + "text": req.prompt, + "clip": ["5", 0], + }, + }, + "7": { + "class_type": "CLIPTextEncode", + "inputs": { + "text": neg, + "clip": ["5", 0], + }, + }, + "69": { + "class_type": "LTXVConditioning", + "inputs": { + "positive": ["6", 0], + "negative": ["7", 0], + "frame_rate": frame_rate, + }, + }, + "70": { + "class_type": "EmptyLTXVLatentVideo", + "inputs": { + "width": req.width, + "height": req.height, + "length": length, + "batch_size": 1, + }, + }, + "73": { + "class_type": "KSamplerSelect", + "inputs": { + "sampler_name": settings.LTX_SAMPLER, + }, + }, + "71": { + "class_type": "LTXVScheduler", + "inputs": { + "steps": req.steps, + "max_shift": settings.LTX_MAX_SHIFT, + "base_shift": settings.LTX_BASE_SHIFT, + "stretch": settings.LTX_STRETCH, + "terminal": settings.LTX_TERMINAL, + }, + }, + "72": { + "class_type": "SamplerCustom", + "inputs": { + "model": ["4", 0], + "add_noise": True, + "noise_seed": req.seed if req.seed is not None else 42, + "cfg": req.cfg, + "positive": ["69", 0], + "negative": ["69", 1], + "sampler": ["73", 0], + "sigmas": ["71", 0], + "latent_image": ["70", 0], + }, + }, + "8": { + "class_type": "VAEDecode", + "inputs": { + "samples": ["72", 0], + "vae": ["4", 2], + }, + }, + "78": { + "class_type": "CreateVideo", + "inputs": { + "images": ["8", 0], + "fps": frame_rate, + }, + }, + "79": { + "class_type": "SaveVideo", + "inputs": { + "video": ["78", 0], + "filename_prefix": "comfy-agent/video", + "format": 
req.format, + "codec": req.codec, + }, + }, } @router.post("/generate/image", response_model=JobStatus) -async def generate_image(req: GenerateImageRequest): - job = JOB_STORE.create("text-to-image") +async def generate_image( + req: GenerateImageRequest, + idempotency_key: str | None = Header(default=None, alias="Idempotency-Key"), +): + idem_key = _resolve_idempotency_key(header_key=idempotency_key, body_key=req.idempotency_key) + req_hash = _req_hash("text-to-image", req.model_dump(mode="json", exclude={"idempotency_key"})) + job, should_enqueue = _create_job_with_idempotency( + gen_type="text-to-image", + idem_key=idem_key, + req_hash=req_hash, + ) graph = _build_workflow_t2i(req) - enqueue(job.job_id, "text-to-image", graph) + if should_enqueue: + enqueue(job.job_id, "text-to-image", graph) return JOB_STORE.get(job.job_id) @router.post("/generate/video", response_model=JobStatus) -async def generate_video(req: GenerateVideoRequest): - job = JOB_STORE.create("text-to-video") +async def generate_video( + req: GenerateVideoRequest, + idempotency_key: str | None = Header(default=None, alias="Idempotency-Key"), +): + idem_key = _resolve_idempotency_key(header_key=idempotency_key, body_key=req.idempotency_key) + req_hash = _req_hash("text-to-video", req.model_dump(mode="json", exclude={"idempotency_key"})) + job, should_enqueue = _create_job_with_idempotency( + gen_type="text-to-video", + idem_key=idem_key, + req_hash=req_hash, + ) graph = _build_workflow_t2v(req) - enqueue(job.job_id, "text-to-video", graph) + if should_enqueue: + enqueue(job.job_id, "text-to-video", graph) return JOB_STORE.get(job.job_id) @router.get("/status/{job_id}", response_model=JobStatus) diff --git a/services/comfy-agent/app/config.py b/services/comfy-agent/app/config.py index d4b5a01b..ed5526f7 100644 --- a/services/comfy-agent/app/config.py +++ b/services/comfy-agent/app/config.py @@ -16,7 +16,30 @@ class Settings(BaseSettings): STORAGE_PATH: str = "/data/comfy-results" PUBLIC_BASE_URL: 
str = "http://212.8.58.133:8880/files" # NODE3 IP + S3_ENDPOINT: str = "" + S3_BUCKET: str = "" + S3_ACCESS_KEY: str = "" + S3_SECRET_KEY: str = "" + S3_REGION: str = "us-east-1" + S3_SECURE: bool = False + S3_URL_TTL_S: int = 900 + S3_PREFIX: str = "comfy-results" + S3_FORCE_PATH_STYLE: bool = True + MINIO_ENDPOINT: str = "" + MINIO_BUCKET: str = "" + MINIO_ACCESS_KEY: str = "" + MINIO_SECRET_KEY: str = "" + LTX_CKPT_NAME: str = "ltx-2-19b-distilled-fp8.safetensors" + LTX_TEXT_ENCODER: str = "" + LTX_DEVICE: str = "default" + LTX_SAMPLER: str = "euler" + LTX_MAX_SHIFT: float = 2.05 + LTX_BASE_SHIFT: float = 0.95 + LTX_TERMINAL: float = 0.1 + LTX_STRETCH: bool = True MAX_CONCURRENCY: int = 1 # для LTX-2 стартово краще 1 + IDEMPOTENCY_TTL_S: int = 24 * 60 * 60 + IDEMPOTENCY_DB_PATH: str = "" settings = Settings() diff --git a/services/comfy-agent/app/idempotency.py b/services/comfy-agent/app/idempotency.py new file mode 100644 index 00000000..aab2ae04 --- /dev/null +++ b/services/comfy-agent/app/idempotency.py @@ -0,0 +1,111 @@ +import sqlite3 +import threading +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Literal, Optional + +from .config import settings + + +Decision = Literal["created", "exists", "conflict"] + + +@dataclass +class IdempotencyResult: + decision: Decision + job_id: str + + +class IdempotencyStore: + def __init__(self, db_path: str, ttl_s: int) -> None: + self.db_path = db_path + self.ttl_s = max(60, int(ttl_s)) + self._lock = threading.Lock() + self._init_db() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.db_path, check_same_thread=False) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + return conn + + def _init_db(self) -> None: + Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) + with self._connect() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS idempotency_jobs ( + idem_key TEXT NOT NULL, + gen_type TEXT 
NOT NULL, + req_hash TEXT NOT NULL, + job_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + expires_at INTEGER NOT NULL, + PRIMARY KEY (idem_key, gen_type) + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_idem_expires_at ON idempotency_jobs(expires_at)" + ) + conn.commit() + + def reserve( + self, + *, + idem_key: str, + gen_type: str, + req_hash: str, + job_id: str, + ) -> IdempotencyResult: + now = int(time.time()) + expires = now + self.ttl_s + + with self._lock, self._connect() as conn: + conn.execute("DELETE FROM idempotency_jobs WHERE expires_at < ?", (now,)) + row = conn.execute( + """ + SELECT req_hash, job_id, expires_at + FROM idempotency_jobs + WHERE idem_key = ? AND gen_type = ? + """, + (idem_key, gen_type), + ).fetchone() + + if row: + existing_hash, existing_job_id, existing_exp = row + if existing_exp >= now: + if existing_hash == req_hash: + return IdempotencyResult(decision="exists", job_id=existing_job_id) + return IdempotencyResult(decision="conflict", job_id=existing_job_id) + + conn.execute( + "DELETE FROM idempotency_jobs WHERE idem_key = ? AND gen_type = ?", + (idem_key, gen_type), + ) + + conn.execute( + """ + INSERT INTO idempotency_jobs + (idem_key, gen_type, req_hash, job_id, created_at, expires_at) + VALUES (?, ?, ?, ?, ?, ?) 
+ """, + (idem_key, gen_type, req_hash, job_id, now, expires), + ) + conn.commit() + return IdempotencyResult(decision="created", job_id=job_id) + + +IDEMPOTENCY_STORE: Optional[IdempotencyStore] = None + + +def init_idempotency_store() -> None: + global IDEMPOTENCY_STORE + if IDEMPOTENCY_STORE is not None: + return + + db_path = settings.IDEMPOTENCY_DB_PATH + if not db_path: + db_path = str(Path(settings.STORAGE_PATH) / "idempotency.sqlite3") + IDEMPOTENCY_STORE = IdempotencyStore(db_path=db_path, ttl_s=settings.IDEMPOTENCY_TTL_S) diff --git a/services/comfy-agent/app/jobs.py b/services/comfy-agent/app/jobs.py index 272f7c46..1b33b5fa 100644 --- a/services/comfy-agent/app/jobs.py +++ b/services/comfy-agent/app/jobs.py @@ -7,8 +7,12 @@ class JobStore: def __init__(self) -> None: self._jobs: Dict[str, JobStatus] = {} - def create(self, gen_type: GenType) -> JobStatus: - job_id = f"job_{uuid.uuid4().hex}" + @staticmethod + def new_job_id() -> str: + return f"job_{uuid.uuid4().hex}" + + def create(self, gen_type: GenType, job_id: Optional[str] = None) -> JobStatus: + job_id = job_id or self.new_job_id() js = JobStatus(job_id=job_id, type=gen_type, status="queued", progress=0.0) self._jobs[job_id] = js return js diff --git a/services/comfy-agent/app/main.py b/services/comfy-agent/app/main.py index f7d4f504..f4e83645 100644 --- a/services/comfy-agent/app/main.py +++ b/services/comfy-agent/app/main.py @@ -6,7 +6,8 @@ from .config import settings from .api import router from .worker import worker_loop from .nats_client import start_nats -from .storage import ensure_storage +from .storage import ensure_storage, init_object_storage +from .idempotency import init_idempotency_store app = FastAPI(title="Comfy Agent Service", version="0.1.0") app.include_router(router) @@ -14,6 +15,8 @@ app.include_router(router) @app.on_event("startup") async def startup(): ensure_storage() + init_object_storage() + init_idempotency_store() # Static files for result URLs: /files/{job_id}/... 
app.mount("/files", StaticFiles(directory=settings.STORAGE_PATH), name="files") diff --git a/services/comfy-agent/app/models.py b/services/comfy-agent/app/models.py index 4e1001ef..351d779d 100644 --- a/services/comfy-agent/app/models.py +++ b/services/comfy-agent/app/models.py @@ -11,15 +11,24 @@ class GenerateImageRequest(BaseModel): height: int = 1024 steps: int = 28 seed: Optional[int] = None + idempotency_key: Optional[str] = None workflow: Optional[str] = None workflow_params: Dict[str, Any] = Field(default_factory=dict) class GenerateVideoRequest(BaseModel): prompt: str = Field(min_length=1) + negative_prompt: Optional[str] = None + width: int = 768 + height: int = 512 + frames: Optional[int] = None seconds: int = 4 fps: int = 24 steps: int = 30 + cfg: float = 2.5 seed: Optional[int] = None + format: str = "mp4" + codec: str = "h264" + idempotency_key: Optional[str] = None workflow: Optional[str] = None workflow_params: Dict[str, Any] = Field(default_factory=dict) diff --git a/services/comfy-agent/app/nats_client.py b/services/comfy-agent/app/nats_client.py index bdf3afe6..463a24c1 100644 --- a/services/comfy-agent/app/nats_client.py +++ b/services/comfy-agent/app/nats_client.py @@ -1,10 +1,21 @@ # services/comfy-agent/app/nats_client.py import json import asyncio +import hashlib from nats.aio.client import Client as NATS from .config import settings from .jobs import JOB_STORE from .worker import enqueue +from . 
import idempotency + + +def _hash_payload(gen_type: str, workflow: dict) -> str: + normalized = json.dumps( + {"type": gen_type, "workflow": workflow}, + sort_keys=True, + separators=(",", ":"), + ) + return hashlib.sha256(normalized.encode("utf-8")).hexdigest() async def start_nats() -> NATS: nc = NATS() @@ -24,8 +35,24 @@ async def start_nats() -> NATS: await nc.publish(reply, json.dumps({"error": "missing_workflow"}).encode()) return - job = JOB_STORE.create(gen_type) - enqueue(job.job_id, gen_type, workflow) + idem_key = (payload.get("idempotency_key") or "").strip() + if idem_key and idempotency.IDEMPOTENCY_STORE is not None: + result = idempotency.IDEMPOTENCY_STORE.reserve( + idem_key=idem_key, + gen_type=gen_type, + req_hash=_hash_payload(gen_type, workflow), + job_id=JOB_STORE.new_job_id(), + ) + if result.decision == "conflict": + if reply: + await nc.publish(reply, json.dumps({"error": "idempotency_key_reused_with_different_payload"}).encode()) + return + job = JOB_STORE.get(result.job_id) or JOB_STORE.create(gen_type, job_id=result.job_id) + if result.decision == "created": + enqueue(job.job_id, gen_type, workflow) + else: + job = JOB_STORE.create(gen_type) + enqueue(job.job_id, gen_type, workflow) if reply: await nc.publish(reply, json.dumps({"job_id": job.job_id}).encode()) diff --git a/services/comfy-agent/app/storage.py b/services/comfy-agent/app/storage.py index 960fe6fa..57192da5 100644 --- a/services/comfy-agent/app/storage.py +++ b/services/comfy-agent/app/storage.py @@ -1,16 +1,117 @@ # services/comfy-agent/app/storage.py +import mimetypes import os from pathlib import Path + from .config import settings +try: + import boto3 + from botocore.client import Config +except Exception: # pragma: no cover - optional runtime dependency fallback + boto3 = None + Config = None + +_s3_client = None +_s3_bucket: str | None = None +_s3_prefix: str | None = None + + +def _s3_value(primary: str, fallback: str) -> str: + return (primary or fallback).strip() + + 
+def _build_endpoint_url(endpoint: str) -> str: + if endpoint.startswith("http://") or endpoint.startswith("https://"): + return endpoint + scheme = "https" if settings.S3_SECURE else "http" + return f"{scheme}://{endpoint}" + + +def _ensure_bucket(client, bucket: str) -> None: + try: + client.head_bucket(Bucket=bucket) + return + except Exception: + pass + client.create_bucket(Bucket=bucket) + + +def init_object_storage() -> None: + global _s3_client, _s3_bucket, _s3_prefix + + endpoint = _s3_value(settings.S3_ENDPOINT, settings.MINIO_ENDPOINT) + bucket = _s3_value(settings.S3_BUCKET, settings.MINIO_BUCKET) + access_key = _s3_value(settings.S3_ACCESS_KEY, settings.MINIO_ACCESS_KEY) + secret_key = _s3_value(settings.S3_SECRET_KEY, settings.MINIO_SECRET_KEY) + + if not endpoint or not bucket or not access_key or not secret_key: + _s3_client = None + _s3_bucket = None + _s3_prefix = None + return + if boto3 is None or Config is None: + print("S3 storage disabled: boto3 is not installed") + return + + endpoint_url = _build_endpoint_url(endpoint) + s3_config = Config( + signature_version="s3v4", + s3={"addressing_style": "path" if settings.S3_FORCE_PATH_STYLE else "auto"}, + ) + client = boto3.client( + "s3", + endpoint_url=endpoint_url, + region_name=settings.S3_REGION, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + use_ssl=settings.S3_SECURE, + config=s3_config, + ) + + _ensure_bucket(client, bucket) + _s3_client = client + _s3_bucket = bucket + _s3_prefix = settings.S3_PREFIX.strip("/") + print(f"S3 storage enabled: endpoint={endpoint_url} bucket={bucket}") + + def ensure_storage() -> None: Path(settings.STORAGE_PATH).mkdir(parents=True, exist_ok=True) + def make_job_dir(job_id: str) -> str: ensure_storage() d = os.path.join(settings.STORAGE_PATH, job_id) Path(d).mkdir(parents=True, exist_ok=True) return d + def public_url(job_id: str, filename: str) -> str: return f"{settings.PUBLIC_BASE_URL}/{job_id}/{filename}" + + +def 
publish_result_url(job_id: str, filename: str, local_path: str) -> str: + if _s3_client is None or _s3_bucket is None: + return public_url(job_id, filename) + + object_key = f"{job_id}/{filename}" + if _s3_prefix: + object_key = f"{_s3_prefix}/{object_key}" + + content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream" + try: + _s3_client.upload_file( + local_path, + _s3_bucket, + object_key, + ExtraArgs={"ContentType": content_type}, + ) + return _s3_client.generate_presigned_url( + ClientMethod="get_object", + Params={"Bucket": _s3_bucket, "Key": object_key}, + ExpiresIn=settings.S3_URL_TTL_S, + ) + except Exception as e: + print(f"S3 upload/presign failed for job={job_id}: {e}") + return public_url(job_id, filename) diff --git a/services/comfy-agent/app/worker.py b/services/comfy-agent/app/worker.py index 0d9c47c3..ab93d5f3 100644 --- a/services/comfy-agent/app/worker.py +++ b/services/comfy-agent/app/worker.py @@ -5,7 +5,7 @@ import os import json from typing import Any, Dict, Optional, Tuple from .jobs import JOB_STORE -from .storage import make_job_dir, public_url +from .storage import make_job_dir, publish_result_url from .comfyui_client import ComfyUIClient from .config import settings @@ -14,15 +14,67 @@ _queue: "asyncio.Queue[Tuple[str, str, Dict[str, Any]]]" = asyncio.Queue() def enqueue(job_id: str, gen_type: str, prompt_graph: Dict[str, Any]) -> None: _queue.put_nowait((job_id, gen_type, prompt_graph)) -async def _extract_first_output(history: Dict[str, Any], job_dir: str) -> Optional[str]: - # ComfyUI history structure can vary; implement a conservative extraction: - # try to find any "images" or "gifs"/"videos" outputs and download via /view - # For MVP: prefer /view?filename=...&type=output&subfolder=... - # Here we return a "manifest.json" to unblock integration even if file fetching needs refinement. 
+async def _extract_first_output(history: Dict[str, Any], job_dir: str, client: ComfyUIClient) -> Optional[str]: + # Keep full history for debugging/reproducibility. manifest_path = os.path.join(job_dir, "manifest.json") with open(manifest_path, "w", encoding="utf-8") as f: json.dump(history, f, ensure_ascii=False, indent=2) - return "manifest.json" + + def _iter_assets() -> list[Dict[str, Any]]: + assets: list[Dict[str, Any]] = [] + for prompt_data in history.values(): + outputs = prompt_data.get("outputs", {}) if isinstance(prompt_data, dict) else {} + for node_out in outputs.values(): + if not isinstance(node_out, dict): + continue + for key in ("images", "gifs", "videos"): + for item in node_out.get(key, []) or []: + if isinstance(item, dict) and item.get("filename"): + assets.append(item) + return assets + + assets = _iter_assets() + if not assets: + return None + + first = assets[0] + filename = os.path.basename(first.get("filename", "output.bin")) + params = { + "filename": first.get("filename"), + "subfolder": first.get("subfolder", ""), + "type": first.get("type", "output"), + } + + try: + resp = await client.http.get("/view", params=params) + resp.raise_for_status() + out_path = os.path.join(job_dir, filename) + with open(out_path, "wb") as f: + f.write(resp.content) + return filename + except Exception: + # Fallback remains manifest-only if /view download fails. 
+ return None + + +def _extract_history_error(history: Dict[str, Any]) -> Optional[str]: + for prompt_data in history.values(): + if not isinstance(prompt_data, dict): + continue + status = prompt_data.get("status", {}) + if isinstance(status, dict) and status.get("status_str") == "error": + messages = status.get("messages", []) + for item in messages: + if not (isinstance(item, list) and len(item) >= 2): + continue + if item[0] != "execution_error": + continue + payload = item[1] if isinstance(item[1], dict) else {} + msg = payload.get("exception_message") or payload.get("exception_type") + if msg: + return str(msg).strip() + return "comfy_execution_error" + return None async def worker_loop() -> None: client = ComfyUIClient() @@ -44,16 +96,22 @@ async def worker_loop() -> None: hist = await client.get_history(prompt_id) job_dir = make_job_dir(job_id) - fname = await _extract_first_output(hist, job_dir) + hist_error = _extract_history_error(hist) + if hist_error: + await _extract_first_output(hist, job_dir, client) + JOB_STORE.update(job_id, status="failed", message="failed", error=hist_error) + return + fname = await _extract_first_output(hist, job_dir, client) if not fname: - JOB_STORE.update(job_id, status="failed", error="No outputs found in ComfyUI history") + JOB_STORE.update(job_id, status="failed", message="failed", error="No outputs found in ComfyUI history") return - url = public_url(job_id, fname) + local_path = os.path.join(job_dir, fname) + url = publish_result_url(job_id, fname, local_path) JOB_STORE.update(job_id, status="succeeded", progress=1.0, result_url=url) except Exception as e: - JOB_STORE.update(job_id, status="failed", error=str(e)) + JOB_STORE.update(job_id, status="failed", message="failed", error=str(e)) while True: job_id, gen_type, prompt_graph = await _queue.get() diff --git a/services/comfy-agent/requirements.txt b/services/comfy-agent/requirements.txt index caea14ce..9803932e 100644 --- a/services/comfy-agent/requirements.txt +++ 
b/services/comfy-agent/requirements.txt @@ -7,3 +7,4 @@ websockets==12.0 nats-py==2.7.2 python-multipart==0.0.9 orjson==3.10.7 +boto3==1.35.36 diff --git a/services/senpai-md-consumer/senpai/md_consumer/api.py b/services/senpai-md-consumer/senpai/md_consumer/api.py index c0bc5e9a..4ca0a0de 100644 --- a/services/senpai-md-consumer/senpai/md_consumer/api.py +++ b/services/senpai-md-consumer/senpai/md_consumer/api.py @@ -25,6 +25,7 @@ logger = logging.getLogger(__name__) # These are set by main.py at startup _state: LatestState | None = None _stats_fn = None # callable → dict +_features_cache: dict[str, dict] = {} # symbol → last computed features def set_state(state: LatestState) -> None: @@ -37,6 +38,11 @@ def set_stats_fn(fn) -> None: _stats_fn = fn +def cache_features(symbol: str, features: dict) -> None: + """Cache pre-computed features for fast API responses.""" + _features_cache[symbol] = features + + async def _handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): """Minimal HTTP request handler.""" try: @@ -117,15 +123,19 @@ async def _route( return body, "application/json", "200 OK" elif path == "/features/latest": - symbol = params.get("symbol", "") + symbol = params.get("symbol", "").upper() if not symbol: body = json.dumps({"error": "missing ?symbol=XXX"}).encode() return body, "application/json", "400 Bad Request" - if not _state: + cached = _features_cache.get(symbol) + if cached: + data = {"symbol": symbol, "features": cached} + elif _state: + # Fallback to live compute (slower) + data = {"symbol": symbol, "features": compute_features(_state, symbol)} + else: body = json.dumps({"error": "not initialized"}).encode() return body, "application/json", "503 Service Unavailable" - features = compute_features(_state, symbol) - data = {"symbol": symbol.upper(), "features": features} body = json.dumps(data, ensure_ascii=False).encode() return body, "application/json", "200 OK" diff --git a/services/senpai-md-consumer/senpai/md_consumer/main.py 
b/services/senpai-md-consumer/senpai/md_consumer/main.py index 729d92f7..5209fb53 100644 --- a/services/senpai-md-consumer/senpai/md_consumer/main.py +++ b/services/senpai-md-consumer/senpai/md_consumer/main.py @@ -79,12 +79,24 @@ async def process_events( events_per_sec_count = 0 time.monotonic() + feature_compute_interval = 1.0 / max(settings.features_pub_rate_hz, 1.0) + next_feature_compute: dict[str, float] = {} + next_signal_emit: dict[str, float] = {} + signal_cooldown_sec = 1.0 + + batch_counter = 0 while True: try: event = await consumer.queue.get() except asyncio.CancelledError: break + # Yield to event loop every N events so HTTP API stays responsive + batch_counter += 1 + if batch_counter >= 5: + batch_counter = 0 + await asyncio.sleep(0) + proc_start = time.monotonic() try: @@ -110,15 +122,24 @@ async def process_events( else: symbol = None - # Compute features + publish (only for trade/quote events) + # Compute features + publish with per-symbol throttling if symbol and settings.features_enabled: - snapshot = make_feature_snapshot(state, symbol) - await publisher.publish_features(snapshot) + now_mono = time.monotonic() + due = next_feature_compute.get(symbol, 0.0) + if now_mono >= due: + snapshot = make_feature_snapshot(state, symbol) + # Cache for fast HTTP API responses + api.cache_features(symbol, snapshot.features) + await publisher.publish_features(snapshot) - # Check for trade signal - sig = check_signal(snapshot.features, symbol) - if sig: - await publisher.publish_signal(sig) + # Check for trade signal with cooldown to avoid flood + sig = check_signal(snapshot.features, symbol) + sig_due = next_signal_emit.get(symbol, 0.0) + if sig and now_mono >= sig_due: + await publisher.publish_signal(sig) + next_signal_emit[symbol] = now_mono + signal_cooldown_sec + + next_feature_compute[symbol] = now_mono + feature_compute_interval # Processing latency metric proc_ms = (time.monotonic() - proc_start) * 1000 diff --git 
a/services/senpai-md-consumer/senpai/md_consumer/publisher.py b/services/senpai-md-consumer/senpai/md_consumer/publisher.py index fc761d1d..53585bb9 100644 --- a/services/senpai-md-consumer/senpai/md_consumer/publisher.py +++ b/services/senpai-md-consumer/senpai/md_consumer/publisher.py @@ -78,7 +78,7 @@ class Publisher: symbol=signal.symbol, direction=signal.direction, ).inc() - logger.info( + logger.debug( "publisher.signal_emitted", extra={ "symbol": signal.symbol, diff --git a/services/swapper-service/requirements.txt b/services/swapper-service/requirements.txt index 47e62ed8..e15ea696 100644 --- a/services/swapper-service/requirements.txt +++ b/services/swapper-service/requirements.txt @@ -14,6 +14,7 @@ pillow>=10.0.0 tiktoken>=0.5.0 sentencepiece>=0.1.99 einops>=0.7.0 +verovio>=4.3.1 # STT (Speech-to-Text) dependencies faster-whisper>=1.0.0 @@ -41,4 +42,4 @@ googlesearch-python>=1.2.0 pdfplumber>=0.10.0 python-docx>=1.1.0 openpyxl>=3.1.2 -chardet>=5.2.0 \ No newline at end of file +chardet>=5.2.0