Files

64 lines
2.3 KiB
Python

# services/comfy-agent/app/nats_client.py
import json
import asyncio
import hashlib
from nats.aio.client import Client as NATS
from .config import settings
from .jobs import JOB_STORE
from .worker import enqueue
from . import idempotency
def _hash_payload(gen_type: str, workflow: dict) -> str:
normalized = json.dumps(
{"type": gen_type, "workflow": workflow},
sort_keys=True,
separators=(",", ":"),
)
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
async def start_nats() -> NATS:
nc = NATS()
await nc.connect(servers=[settings.NATS_URL])
async def handle(msg):
subj = msg.subject
reply = msg.reply
payload = json.loads(msg.data.decode("utf-8"))
# payload contract (MVP):
# { "type": "text-to-image|text-to-video", "workflow": {...} }
gen_type = payload.get("type", "text-to-image")
workflow = payload.get("workflow")
if not workflow:
if reply:
await nc.publish(reply, json.dumps({"error": "missing_workflow"}).encode())
return
idem_key = (payload.get("idempotency_key") or "").strip()
if idem_key and idempotency.IDEMPOTENCY_STORE is not None:
result = idempotency.IDEMPOTENCY_STORE.reserve(
idem_key=idem_key,
gen_type=gen_type,
req_hash=_hash_payload(gen_type, workflow),
job_id=JOB_STORE.new_job_id(),
)
if result.decision == "conflict":
if reply:
await nc.publish(reply, json.dumps({"error": "idempotency_key_reused_with_different_payload"}).encode())
return
job = JOB_STORE.get(result.job_id) or JOB_STORE.create(gen_type, job_id=result.job_id)
if result.decision == "created":
enqueue(job.job_id, gen_type, workflow)
else:
job = JOB_STORE.create(gen_type)
enqueue(job.job_id, gen_type, workflow)
if reply:
await nc.publish(reply, json.dumps({"job_id": job.job_id}).encode())
await nc.subscribe(settings.NATS_SUBJECT_INVOKE, cb=handle)
await nc.subscribe(settings.NATS_SUBJECT_IMAGE, cb=handle)
await nc.subscribe(settings.NATS_SUBJECT_VIDEO, cb=handle)
return nc