import os import time from .audit import audit_tool_call import asyncio import json from nats.aio.client import Client as NATS NATS_URL = os.getenv("NATS_URL", "nats://localhost:4222") async def publish(subject: str, payload: dict): _t = time.time() nc = NATS() await nc.connect(servers=[NATS_URL]) await nc.publish(subject, json.dumps(payload).encode()) await nc.flush(1) await nc.drain() audit_tool_call("tool_event_bus.publish", {"subject": subject}, {"ok": True}, True, int((time.time()-_t)*1000)) async def subscribe(subject: str, handler, duration: float = 5.0): nc = NATS() await nc.connect(servers=[NATS_URL]) async def cb(msg): data = msg.data.decode() handler(subject, data) await nc.subscribe(subject, cb=cb) await asyncio.sleep(duration) await nc.drain() audit_tool_call("tool_event_bus.publish", {"subject": subject}, {"ok": True}, True, int((time.time()-_t)*1000))