import os import json import hmac import time import uuid import hashlib import requests from .audit import audit_tool_call INTEGRATION_BASE_URL = os.getenv("INTEGRATION_BASE_URL", "http://localhost:8800") AGX_HMAC_SECRET = os.getenv("AGX_HMAC_SECRET", "") def _sign(body: dict): if not AGX_HMAC_SECRET: return {}, json.dumps(body) ts = str(int(time.time() * 1000)) nonce = str(uuid.uuid4()) body_json = json.dumps(body, separators=(",", ":"), sort_keys=True) payload = f"{ts}.{nonce}.{body_json}" sig = hmac.new(AGX_HMAC_SECRET.encode(), payload.encode(), hashlib.sha256).hexdigest() headers = { "X-AGX-SIGNATURE": sig, "X-AGX-TIMESTAMP": ts, "X-AGX-NONCE": nonce, "Content-Type": "application/json" } trace_id = os.getenv("AGX_TRACE_ID", "") if trace_id: headers["X-AGX-TRACE-ID"] = trace_id return headers, body_json def write_observation(assetRef: dict, observation: dict): _t = time.time() payload = {"assetRef": assetRef, **observation} headers, body = _sign(payload) r = requests.post(f"{INTEGRATION_BASE_URL}/write/observation", data=body, headers=headers, timeout=20) r.raise_for_status() out = r.json() audit_tool_call("tool_integration_write.write_observation", {"assetRef": assetRef}, {"ok": True}, True, int((time.time()-_t)*1000)) return out def write_event(assetRef: dict, event: dict): _t = time.time() payload = {"assetRef": assetRef, **event} headers, body = _sign(payload) r = requests.post(f"{INTEGRATION_BASE_URL}/write/event", data=body, headers=headers, timeout=20) r.raise_for_status() out = r.json() audit_tool_call("tool_integration_write.write_event", {"assetRef": assetRef}, {"ok": True}, True, int((time.time()-_t)*1000)) return out def write_tasklog(assetRef: dict, tasklog: dict): _t = time.time() payload = {"assetRef": assetRef, **tasklog} headers, body = _sign(payload) r = requests.post(f"{INTEGRATION_BASE_URL}/write/tasklog", data=body, headers=headers, timeout=20) r.raise_for_status() out = r.json() audit_tool_call("tool_integration_write.write_tasklog", {"assetRef": assetRef}, {"ok": True}, True, int((time.time()-_t)*1000)) return out def write_inventory_movement(assetRef: dict, movement: dict): _t = time.time() payload = {"assetRef": assetRef, **movement} headers, body = _sign(payload) r = requests.post(f"{INTEGRATION_BASE_URL}/write/inventory", data=body, headers=headers, timeout=20) r.raise_for_status() out = r.json() audit_tool_call("tool_integration_write.write_inventory_movement", {"assetRef": assetRef}, {"ok": True}, True, int((time.time()-_t)*1000)) return out