import hashlib import json import re import time import uuid from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import yaml VISIBILITY = ["public", "interclan", "incircle", "soulsafe", "sacred"] VIS_RANK = {v: i for i, v in enumerate(VISIBILITY)} CONSENT = {"none", "pending", "confirmed"} STOP_SCHEMA_ENVELOPE = "STOP_SCHEMA_ENVELOPE" STOP_SCHEMA_ARTIFACT = "STOP_SCHEMA_ARTIFACT" STOP_PROVENANCE_INVALID = "STOP_PROVENANCE_INVALID" STOP_CONSENT_EVENT_INVALID = "STOP_CONSENT_EVENT_INVALID" STOP_CONSENT_EVENT_MISSING = "STOP_CONSENT_EVENT_MISSING" STOP_CONSENT_QUORUM_NOT_MET = "STOP_CONSENT_QUORUM_NOT_MET" STOP_AGENT_UNKNOWN = "STOP_AGENT_UNKNOWN" STOP_OUTPUT_NOT_ALLOWED = "STOP_OUTPUT_NOT_ALLOWED" STOP_VISIBILITY_ESCALATION = "STOP_VISIBILITY_ESCALATION" STOP_CONSENT_MISSING = "STOP_CONSENT_MISSING" STOP_SECRETS_DETECTED = "STOP_SECRETS_DETECTED" STOP_INJECTION_ATTEMPT = "STOP_INJECTION_ATTEMPT" STOP_EXPORT_PAYLOAD_NOT_PUBLIC = "STOP_EXPORT_PAYLOAD_NOT_PUBLIC" CONSENT_TRANSITION_MAP: Dict[str, Dict[str, Dict[str, str]]] = { "testimony_draft": { "approve": {"to": "confirmed", "op": "validated"}, "reject": {"to": "rejected", "op": "validated"}, "revoke": {"to": "revoked", "op": "corrected"}, }, "decision_record_draft": { "approve": {"to": "confirmed", "op": "validated"}, "reject": {"to": "rejected", "op": "validated"}, "revoke": {"to": "revoked", "op": "corrected"}, }, "bridge_request_draft": { "approve": {"to": "approved_for_execution", "op": "export_validated"}, "reject": {"to": "rejected", "op": "validated"}, "revoke": {"to": "revoked", "op": "corrected"}, }, "export_payload_manifest": { "approve": {"to": "approved_for_execution", "op": "export_validated"}, "reject": {"to": "rejected", "op": "validated"}, "revoke": {"to": "revoked", "op": "corrected"}, }, "allocation_proposal": { "approve": {"to": "approved_for_execution", "op": "validated"}, "reject": {"to": "rejected", "op": "validated"}, "revoke": {"to": "revoked", "op": "corrected"}, }, "access_grant_draft": { "approve": {"to": "approved_for_execution", "op": "policy_checked"}, "reject": {"to": "rejected", "op": "validated"}, "revoke": {"to": "revoked", "op": "corrected"}, }, "visibility_change_draft": { "approve": {"to": "approved_for_execution", "op": "policy_checked"}, "reject": {"to": "rejected", "op": "validated"}, "revoke": {"to": "revoked", "op": "corrected"}, }, "offline_merge_plan": { "approve": {"to": "approved_for_execution", "op": "merged"}, "reject": {"to": "rejected", "op": "validated"}, "revoke": {"to": "revoked", "op": "corrected"}, }, "core_change_draft": { "approve": {"to": "needs_confirmation", "op": "policy_checked"}, "reject": {"to": "rejected", "op": "validated"}, "revoke": {"to": "revoked", "op": "corrected"}, }, } class RuntimeGuard: def __init__( self, registry_path: str, envelope_schema_path: str, artifact_schema_path: str, consent_event_schema_path: Optional[str] = None, mode: str = "soft", ) -> None: self.registry_path = Path(registry_path) self.envelope_schema_path = Path(envelope_schema_path) self.artifact_schema_path = Path(artifact_schema_path) self.consent_event_schema_path = Path(consent_event_schema_path) if consent_event_schema_path else None self.mode = mode if mode in {"soft", "strict"} else "soft" self.registry = self._load_registry() self.envelope_schema = self._load_json(self.envelope_schema_path) self.artifact_schema = self._load_json(self.artifact_schema_path) self.consent_event_schema = ( self._load_json(self.consent_event_schema_path) if self.consent_event_schema_path else {} ) def _load_json(self, path: Path) -> Dict[str, Any]: try: if path.exists(): return json.loads(path.read_text(encoding="utf-8")) except Exception: pass return {} def _load_registry(self) -> Dict[str, Any]: try: if self.registry_path.exists(): raw = yaml.safe_load(self.registry_path.read_text(encoding="utf-8")) or {} if isinstance(raw, dict): return raw except Exception: pass return {} def reload(self) -> None: self.registry = self._load_registry() self.envelope_schema = self._load_json(self.envelope_schema_path) self.artifact_schema = self._load_json(self.artifact_schema_path) self.consent_event_schema = ( self._load_json(self.consent_event_schema_path) if self.consent_event_schema_path else {} ) def get_agent(self, agent_id: str) -> Optional[Dict[str, Any]]: def _matches(entry: Dict[str, Any], requested_id: str) -> bool: if entry.get("agent_id") == requested_id: return True aliases = entry.get("aliases") if isinstance(aliases, list): return requested_id in {str(a) for a in aliases} return False workers = self.registry.get("workers", []) if not isinstance(workers, list): return None for w in workers: if isinstance(w, dict) and _matches(w, agent_id): return w mgr = self.registry.get("manager") if isinstance(mgr, dict) and _matches(mgr, agent_id): return mgr return None def _hash(self, text: str) -> str: return hashlib.sha256((text or "").encode("utf-8")).hexdigest()[:12] def build_envelope(self, agent_id: str, prompt: str, metadata: Dict[str, Any]) -> Dict[str, Any]: md = metadata or {} request_id = str(md.get("request_id") or f"rt-{int(time.time() * 1000)}-{self._hash(prompt)}") env = { "request_id": request_id, "agent_id": agent_id, "circle_context": md.get("circle_context") if isinstance(md.get("circle_context"), dict) else {}, "visibility_level_target": str(md.get("visibility_level_target") or "incircle"), "sensitivity_flags": md.get("sensitivity_flags") if isinstance(md.get("sensitivity_flags"), list) else [], "consent_status": str(md.get("consent_status") or "none"), "allowed_actions": md.get("allowed_actions") if isinstance(md.get("allowed_actions"), list) else ["analyze"], "expected_output": md.get("expected_output") or "__runtime_default__", "input_text": prompt or "", "requires_consent": bool(md.get("requires_consent")), "export_intent": bool(md.get("export_intent")), "constitution_version": str(md.get("constitution_version") or "JOS_BASE@1.0.0"), "agent_prompt_version": str(md.get("agent_prompt_version") or md.get("system_prompt_version") or ""), "router_guard_version": str(md.get("router_guard_version") or "runtime_guard@1.0.0"), "protocol_version": str(md.get("protocol_version") or "CLAN_AGENT_INTERACTION_PROTOCOL_V1@1.0.0"), "consent_events": md.get("consent_events") if isinstance(md.get("consent_events"), (dict, list)) else {}, "provenance": {"source": md.get("source", "router"), "channel": md.get("channel")}, } return env def _detect_secrets(self, text: str) -> bool: if not text: return False t = text.lower() patterns = [ r"seed\s*phrase", r"private\s*key", r"api[_-]?key", r"token", r"парол", r"приватн(ый|ий)\s+ключ", r"сид[-\s]?фраз", r"\b0x[a-f0-9]{40,}\b", r"[A-Za-z0-9_\-]{24,}:[A-Za-z0-9_\-]{20,}", ] return any(re.search(p, t) for p in patterns) def _detect_injection(self, text: str) -> bool: if not text: return False t = text.lower() markers = [ "ignore constitution", "ignore system prompt", "обійди конституцію", "ігноруй конституцію", "покажи секрет", "show secret", "виконай без згоди", "execute without consent", ] return any(m in t for m in markers) def _validate_envelope_shape(self, env: Dict[str, Any]) -> List[str]: errors: List[str] = [] required = self.envelope_schema.get("required", []) if isinstance(self.envelope_schema, dict) else [] for key in required: if key not in env: errors.append(f"missing:{key}") if not env.get("request_id"): errors.append("missing:request_id") if env.get("visibility_level_target") not in VISIBILITY: errors.append("invalid:visibility_level_target") if env.get("consent_status") not in CONSENT: errors.append("invalid:consent_status") actions = env.get("allowed_actions") if not isinstance(actions, list) or len(actions) == 0: errors.append("invalid:allowed_actions") if not isinstance(env.get("input_text"), str) or not env.get("input_text", "").strip(): errors.append("invalid:input_text") return errors def pre_dispatch_checks(self, env: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: errs = self._validate_envelope_shape(env) if errs and self.mode == "strict": return False, {"stop_code": STOP_SCHEMA_ENVELOPE, "details": errs[:10]} if errs and self.mode == "soft": # Continue in soft mode after coercion-like defaults. pass agent = self.get_agent(env.get("agent_id", "")) if not agent: return False, {"stop_code": STOP_AGENT_UNKNOWN, "details": [env.get("agent_id", "")]} if self._detect_secrets(env.get("input_text", "")): return False, {"stop_code": STOP_SECRETS_DETECTED, "details": ["input_contains_secret_like_pattern"]} if self._detect_injection(env.get("input_text", "")): return False, {"stop_code": STOP_INJECTION_ATTEMPT, "details": ["input_attempts_policy_bypass"]} expected = env.get("expected_output") allowed_outputs = agent.get("allowed_outputs", []) if isinstance(agent, dict) else [] if expected and expected != "__runtime_default__" and isinstance(allowed_outputs, list): if expected not in allowed_outputs: return False, {"stop_code": STOP_OUTPUT_NOT_ALLOWED, "details": [str(expected)]} if env.get("requires_consent") and env.get("consent_status") != "confirmed": return False, {"stop_code": STOP_CONSENT_MISSING, "details": ["requires_consent=true"]} if env.get("export_intent") and env.get("visibility_level_target") not in {"public", "interclan"}: return False, {"stop_code": STOP_EXPORT_PAYLOAD_NOT_PUBLIC, "details": [env.get("visibility_level_target")]} return True, {"ok": True, "agent": agent} def _validate_artifact_shape(self, artifact: Dict[str, Any]) -> List[str]: errors: List[str] = [] required = self.artifact_schema.get("required", []) if isinstance(self.artifact_schema, dict) else [] for key in required: if key not in artifact: errors.append(f"missing:{key}") if artifact.get("visibility_level") not in VISIBILITY: errors.append("invalid:visibility_level") if artifact.get("status") not in { "draft", "needs_confirmation", "waiting_for_consent", "confirmed", "proposed", "approved_for_execution", "rejected", "revoked", }: errors.append("invalid:status") return errors def _validate_provenance_min(self, provenance: Any) -> List[str]: if not isinstance(provenance, list) or len(provenance) < 1: return ["invalid:provenance"] errors: List[str] = [] for idx, trail in enumerate(provenance): if not isinstance(trail, dict): errors.append(f"invalid:provenance[{idx}]") continue if not trail.get("event_id"): errors.append(f"missing:provenance[{idx}].event_id") ts = trail.get("ts") if not isinstance(ts, int) or ts < 0: errors.append(f"invalid:provenance[{idx}].ts") actor = trail.get("actor") if not isinstance(actor, dict) or not actor.get("type") or not actor.get("id"): errors.append(f"missing:provenance[{idx}].actor") source = trail.get("source") if not isinstance(source, dict) or not source.get("channel") or not source.get("request_id"): errors.append(f"missing:provenance[{idx}].source") operation = trail.get("operation") if not isinstance(operation, dict) or not operation.get("op"): errors.append(f"missing:provenance[{idx}].operation") versions = trail.get("versions") if not isinstance(versions, dict) or not versions.get("constitution_version"): errors.append(f"missing:provenance[{idx}].versions.constitution_version") return errors def post_return_checks(self, env: Dict[str, Any], result: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: agent = self.get_agent(env.get("agent_id", "")) allowed_outputs = agent.get("allowed_outputs", []) if isinstance(agent, dict) else [] target = env.get("visibility_level_target", "incircle") target_rank = VIS_RANK.get(target, VIS_RANK["incircle"]) text_result = str(result.get("result") or "") if self._detect_secrets(text_result): return False, {"stop_code": STOP_SECRETS_DETECTED, "details": ["output_contains_secret_like_pattern"]} artifacts = result.get("artifacts") if artifacts is None: return True, {"ok": True} if not isinstance(artifacts, list): return False, {"stop_code": STOP_SCHEMA_ARTIFACT, "details": ["artifacts_not_list"]} for art in artifacts: if not isinstance(art, dict): return False, {"stop_code": STOP_SCHEMA_ARTIFACT, "details": ["artifact_not_object"]} errs = self._validate_artifact_shape(art) if errs: return False, {"stop_code": STOP_SCHEMA_ARTIFACT, "details": errs[:10]} prov_errs = self._validate_provenance_min(art.get("provenance")) if prov_errs: return False, {"stop_code": STOP_PROVENANCE_INVALID, "details": prov_errs[:10]} if isinstance(allowed_outputs, list) and art.get("type") not in allowed_outputs: return False, {"stop_code": STOP_OUTPUT_NOT_ALLOWED, "details": [str(art.get('type'))]} art_rank = VIS_RANK.get(art.get("visibility_level"), target_rank) if art_rank < target_rank: return False, { "stop_code": STOP_VISIBILITY_ESCALATION, "details": [str(art.get("visibility_level")), str(target)], } if env.get("consent_status") != "confirmed" and art.get("status") == "confirmed": return False, {"stop_code": STOP_CONSENT_MISSING, "details": ["artifact.confirmed_without_consent"]} ok_consent, consent_info = self._validate_confirmed_artifact_consent(env, art) if not ok_consent: return False, consent_info if self._detect_secrets(json.dumps(art.get("content", ""), ensure_ascii=False)): return False, {"stop_code": STOP_SECRETS_DETECTED, "details": ["artifact_content_secret_like_pattern"]} return True, {"ok": True} def _artifact_is_confirmed(self, artifact: Dict[str, Any]) -> bool: if str(artifact.get("status") or "") == "confirmed": return True prov = artifact.get("provenance") if not isinstance(prov, list): return False for tr in prov: if not isinstance(tr, dict): continue ctx = tr.get("context") if isinstance(tr.get("context"), dict) else {} if ctx.get("consent_status") == "confirmed": return True return False def _extract_consent_ref(self, artifact: Dict[str, Any]) -> str: prov = artifact.get("provenance") if not isinstance(prov, list): return "" for tr in reversed(prov): if not isinstance(tr, dict): continue ctx = tr.get("context") if isinstance(tr.get("context"), dict) else {} ref = str(ctx.get("consent_event_ref") or "").strip() if ref: return ref return "" def _get_consent_event(self, env: Dict[str, Any], consent_ref: str) -> Optional[Dict[str, Any]]: source = env.get("consent_events") if isinstance(source, dict): event = source.get(consent_ref) return event if isinstance(event, dict) else None if isinstance(source, list): for ev in source: if not isinstance(ev, dict): continue if str(ev.get("consent_event_id") or "") == consent_ref: return ev return None def _validate_consent_event_min(self, event: Dict[str, Any]) -> List[str]: errs: List[str] = [] if not isinstance(event, dict): return ["invalid:consent_event"] if not str(event.get("consent_event_id") or "").startswith("ce_"): errs.append("invalid:consent_event_id") ts = event.get("ts") if not isinstance(ts, int) or ts < 0: errs.append("invalid:ts") scope = event.get("scope") if not isinstance(scope, dict) or not scope.get("circle_id"): errs.append("missing:scope.circle_id") decision = event.get("decision") dtype = str((decision or {}).get("type") or "") if dtype not in {"approve", "reject", "revoke"}: errs.append("invalid:decision.type") target = event.get("target") if not isinstance(target, dict): errs.append("invalid:target") else: if not target.get("target_type"): errs.append("missing:target.target_type") if not isinstance(target.get("artifact_ids"), list) or len(target.get("artifact_ids")) < 1: errs.append("invalid:target.artifact_ids") if not target.get("operation"): errs.append("missing:target.operation") confirmations = event.get("confirmations") if dtype in {"approve", "reject", "revoke"}: if not isinstance(confirmations, list) or len(confirmations) < 1: errs.append("invalid:confirmations") quorum = event.get("quorum") if not isinstance(quorum, dict): errs.append("invalid:quorum") else: required = quorum.get("required") present = quorum.get("present") if not isinstance(required, int) or required < 1: errs.append("invalid:quorum.required") if not isinstance(present, int) or present < 0: errs.append("invalid:quorum.present") versions = event.get("versions") if not isinstance(versions, dict) or not versions.get("constitution_version"): errs.append("missing:versions.constitution_version") provenance = event.get("provenance") if not isinstance(provenance, dict) or not provenance.get("request_id"): errs.append("missing:provenance.request_id") return errs def _validate_confirmed_artifact_consent(self, env: Dict[str, Any], artifact: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: if not self._artifact_is_confirmed(artifact): return True, {"ok": True} consent_ref = self._extract_consent_ref(artifact) if not consent_ref: return False, {"stop_code": STOP_CONSENT_EVENT_MISSING, "details": ["missing:consent_event_ref"]} event = self._get_consent_event(env, consent_ref) if not event: return False, {"stop_code": STOP_CONSENT_EVENT_MISSING, "details": [f"missing:consent_event:{consent_ref}"]} event_errs = self._validate_consent_event_min(event) if event_errs: return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": event_errs[:10]} if str((event.get("decision") or {}).get("type") or "") != "approve": return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["invalid:decision.type_not_approve"]} art_id = str(artifact.get("id") or artifact.get("artifact_id") or "").strip() if not art_id: return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["missing:artifact.id"]} target_ids = (event.get("target") or {}).get("artifact_ids") if not isinstance(target_ids, list) or art_id not in [str(x) for x in target_ids]: return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["artifact_not_in_consent_target"]} confirmations = event.get("confirmations") if isinstance(event.get("confirmations"), list) else [] quorum = event.get("quorum") if isinstance(event.get("quorum"), dict) else {} required = quorum.get("required") present = quorum.get("present") if not isinstance(required, int) or not isinstance(present, int): return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["invalid:quorum"]} if len(confirmations) < required or present < required: return False, { "stop_code": STOP_CONSENT_QUORUM_NOT_MET, "details": [f"confirmations={len(confirmations)}", f"required={required}", f"present={present}"], } return True, {"ok": True} def _has_router_stamped_trail(self, provenance: Any) -> bool: if not isinstance(provenance, list): return False for trail in provenance: if not isinstance(trail, dict): continue actor = trail.get("actor") if isinstance(trail.get("actor"), dict) else {} op = trail.get("operation") if isinstance(trail.get("operation"), dict) else {} if actor.get("id") == "system:router" and op.get("op") == "stamped": return True return False def ensure_stamped_trails(self, result: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: artifacts = result.get("artifacts") if isinstance(result, dict) else None if artifacts is None: return True, {"ok": True} if not isinstance(artifacts, list): return False, {"stop_code": STOP_SCHEMA_ARTIFACT, "details": ["artifacts_not_list"]} for idx, art in enumerate(artifacts): if not isinstance(art, dict): return False, {"stop_code": STOP_SCHEMA_ARTIFACT, "details": [f"artifact_not_object:{idx}"]} if not self._has_router_stamped_trail(art.get("provenance")): return False, { "stop_code": STOP_PROVENANCE_INVALID, "details": [f"missing:provenance[{idx}].stamped_trail"], } return True, {"ok": True} def _normalize_provenance(self, provenance: Any) -> List[Dict[str, Any]]: if not isinstance(provenance, list): return [] out: List[Dict[str, Any]] = [] seen_ids = set() seen_fallback = set() for item in provenance: if not isinstance(item, dict): continue event_id = str(item.get("event_id") or "").strip() if event_id and event_id in seen_ids: continue if not event_id: fallback_key = "|".join( [ str(item.get("ts", "")), str((item.get("actor") or {}).get("id", "")), str((item.get("operation") or {}).get("op", "")), str((item.get("operation") or {}).get("input_hash", "")), ] ) if fallback_key in seen_fallback: continue seen_fallback.add(fallback_key) else: seen_ids.add(event_id) out.append(item) return out def _build_stamp_trail(self, env: Dict[str, Any], artifact: Dict[str, Any]) -> Dict[str, Any]: now_ts = int(time.time()) rid = str(env.get("request_id") or "") aid = str(env.get("agent_id") or "") vlevel = str(artifact.get("visibility_level") or env.get("visibility_level_target") or "incircle") raw = json.dumps(artifact, ensure_ascii=False, sort_keys=True) return { "event_id": f"prov_{uuid.uuid4().hex}", "ts": now_ts, "actor": { "type": "system", "id": "system:router", "display": "Runtime Guard", }, "source": { "channel": str((env.get("provenance") or {}).get("channel") or "internal"), "request_id": rid, "session_id": str(((env.get("source") or {}) if isinstance(env.get("source"), dict) else {}).get("session_id") or ""), "message_id": str(((env.get("source") or {}) if isinstance(env.get("source"), dict) else {}).get("message_id") or ""), }, "context": { "circle_id": str(((env.get("circle_context") or {}) if isinstance(env.get("circle_context"), dict) else {}).get("circle_id") or ""), "circle_name": str(((env.get("circle_context") or {}) if isinstance(env.get("circle_context"), dict) else {}).get("circle_name") or ""), "gate_level": str(((env.get("circle_context") or {}) if isinstance(env.get("circle_context"), dict) else {}).get("gate_level") or ""), "visibility_level": vlevel, "consent_status": str(env.get("consent_status") or "none"), "consent_event_ref": str((env.get("provenance") or {}).get("consent_event_ref") or ""), }, "operation": { "op": "stamped", "input_hash": f"sha256:{self._hash(str(env.get('input_text', '')))}", "output_hash": f"sha256:{self._hash(raw)}", "notes": f"runtime stamping for agent:{aid}", }, "versions": { "constitution_version": str(env.get("constitution_version") or "JOS_BASE@1.0.0"), "agent_prompt_version": str(env.get("agent_prompt_version") or ""), "router_guard_version": str(env.get("router_guard_version") or "runtime_guard@1.0.0"), "protocol_version": str(env.get("protocol_version") or "CLAN_AGENT_INTERACTION_PROTOCOL_V1@1.0.0"), }, "links": { "parent_artifact_ids": [], "related_artifact_ids": [], "external_refs": [], }, } def stamp_result_artifacts(self, env: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(result, dict): return result artifacts = result.get("artifacts") if not isinstance(artifacts, list): return result for artifact in artifacts: if not isinstance(artifact, dict): continue existing = self._normalize_provenance(artifact.get("provenance")) trail = self._build_stamp_trail(env, artifact) artifact["provenance"] = self._normalize_provenance(existing + [trail]) return result def artifact_runtime_rows(self, env: Dict[str, Any], result: Dict[str, Any]) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] if not isinstance(result, dict): return rows artifacts = result.get("artifacts") if not isinstance(artifacts, list): return rows for art in artifacts: if not isinstance(art, dict): continue status = str(art.get("status") or "draft") prov = art.get("provenance") has_prov = isinstance(prov, list) and len(prov) > 0 visibility = art.get("visibility_level") has_vis = visibility in VISIBILITY rows.append( { "event": "artifact_emitted", "request_id": env.get("request_id"), "agent_id": env.get("agent_id"), "artifact_type": art.get("type"), "visibility_level": visibility, "has_provenance": has_prov, "provenance_trails_count": len(prov) if isinstance(prov, list) else 0, "status": status, "needs_confirmation": status in {"needs_confirmation", "waiting_for_consent"}, "has_visibility_and_provenance": bool(has_vis and has_prov), "constitution_version": env.get("constitution_version"), "router_guard_version": env.get("router_guard_version"), } ) return rows def consent_runtime_rows(self, env: Dict[str, Any], result: Dict[str, Any]) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] if not isinstance(result, dict): return rows artifacts = result.get("artifacts") if not isinstance(artifacts, list): return rows now_ts = int(time.time()) for art in artifacts: if not isinstance(art, dict): continue if not self._artifact_is_confirmed(art): continue consent_ref = self._extract_consent_ref(art) if not consent_ref: continue event = self._get_consent_event(env, consent_ref) if not isinstance(event, dict): continue ok, _ = self._validate_confirmed_artifact_consent(env, art) if not ok: continue target = event.get("target") if isinstance(event.get("target"), dict) else {} quorum = event.get("quorum") if isinstance(event.get("quorum"), dict) else {} versions = event.get("versions") if isinstance(event.get("versions"), dict) else {} confirmations = event.get("confirmations") if isinstance(event.get("confirmations"), list) else [] rows.append( { "event": "consent_applied", "ts": now_ts, "request_id": env.get("request_id"), "agent_id": env.get("agent_id"), "artifact_id": str(art.get("id") or art.get("artifact_id") or ""), "artifact_type": art.get("type"), "artifact_status": art.get("status"), "visibility_level": art.get("visibility_level"), "consent_event_id": str(event.get("consent_event_id") or consent_ref), "consent_decision": "approve", "operation": target.get("operation"), "target_type": target.get("target_type"), "confirmations_count": len(confirmations), "quorum_required": quorum.get("required"), "quorum_present": quorum.get("present"), "constitution_version": versions.get("constitution_version") or env.get("constitution_version"), "protocol_version": versions.get("protocol_version") or env.get("protocol_version"), "router_guard_version": env.get("router_guard_version"), } ) return rows def _consent_transition(self, artifact_type: str, decision_type: str) -> Optional[Dict[str, str]]: if artifact_type in CONSENT_TRANSITION_MAP: return CONSENT_TRANSITION_MAP[artifact_type].get(decision_type) if decision_type == "approve": return {"to": "needs_confirmation", "op": "validated"} if decision_type == "reject": return {"to": "rejected", "op": "validated"} if decision_type == "revoke": return {"to": "revoked", "op": "corrected"} return None def _has_consent_application_trail(self, artifact: Dict[str, Any], consent_event_id: str) -> bool: provenance = artifact.get("provenance") if not isinstance(provenance, list): return False for trail in provenance: if not isinstance(trail, dict): continue ctx = trail.get("context") if isinstance(trail.get("context"), dict) else {} op = trail.get("operation") if isinstance(trail.get("operation"), dict) else {} if ( str(ctx.get("consent_event_ref") or "") == consent_event_id and str(op.get("op") or "") in {"validated", "export_validated", "policy_checked", "merged", "corrected"} ): return True return False def _has_prior_approve_trail(self, artifact: Dict[str, Any]) -> bool: provenance = artifact.get("provenance") if not isinstance(provenance, list): return False for trail in provenance: if not isinstance(trail, dict): continue ctx = trail.get("context") if isinstance(trail.get("context"), dict) else {} op = trail.get("operation") if isinstance(trail.get("operation"), dict) else {} if ctx.get("consent_status") == "confirmed" and str(op.get("op") or "") in { "validated", "export_validated", "policy_checked", "merged", }: return True return False def apply_consent_event( self, consent_event: Dict[str, Any], artifacts_by_id: Dict[str, Dict[str, Any]], now_ts: Optional[int] = None, applier_actor_id: str = "system:consent-applier", ) -> Tuple[bool, Dict[str, Any]]: ts = int(now_ts or time.time()) event_errs = self._validate_consent_event_min(consent_event) if event_errs: return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": event_errs[:10]} decision = consent_event.get("decision") if isinstance(consent_event.get("decision"), dict) else {} decision_type = str(decision.get("type") or "") expires_at = decision.get("expires_at") if isinstance(expires_at, int) and expires_at < ts: return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["consent_event_expired"]} confirmations = consent_event.get("confirmations") if isinstance(consent_event.get("confirmations"), list) else [] quorum = consent_event.get("quorum") if isinstance(consent_event.get("quorum"), dict) else {} required = quorum.get("required") present = quorum.get("present") if decision_type == "approve": if not isinstance(required, int) or not isinstance(present, int): return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["invalid:quorum"]} if len(confirmations) < required or present < required: return False, { "stop_code": STOP_CONSENT_QUORUM_NOT_MET, "details": [f"confirmations={len(confirmations)}", f"required={required}", f"present={present}"], } target = consent_event.get("target") if isinstance(consent_event.get("target"), dict) else {} target_ids = target.get("artifact_ids") if isinstance(target.get("artifact_ids"), list) else [] consent_event_id = str(consent_event.get("consent_event_id") or "") target_operation = str(target.get("operation") or "") updated: List[Dict[str, Any]] = [] logs: List[Dict[str, Any]] = [] for artifact_id in [str(x) for x in target_ids]: artifact = artifacts_by_id.get(artifact_id) if not isinstance(artifact, dict): return False, { "stop_code": STOP_CONSENT_EVENT_MISSING, "details": [f"artifact_missing:{artifact_id}"], } if self._has_consent_application_trail(artifact, consent_event_id): continue if decision_type == "approve" and str(artifact.get("status") or "") == "rejected": return False, { "stop_code": STOP_CONSENT_EVENT_INVALID, "details": [f"one_way_violation:{artifact_id}"], } if decision_type == "revoke" and not self._has_prior_approve_trail(artifact): return False, { "stop_code": STOP_CONSENT_EVENT_INVALID, "details": [f"revoke_without_prior_approve:{artifact_id}"], } transition = self._consent_transition(str(artifact.get("type") or ""), decision_type) if not transition: return False, { "stop_code": STOP_CONSENT_EVENT_INVALID, "details": [f"unsupported_transition:{artifact.get('type')}:{decision_type}"], } from_status = str(artifact.get("status") or "draft") to_status = transition["to"] op = transition["op"] consent_status = "confirmed" if decision_type == "approve" else "none" trail = { "event_id": f"prov_{uuid.uuid4().hex}", "ts": ts, "actor": {"type": "system", "id": applier_actor_id}, "source": { "channel": str((consent_event.get("provenance") or {}).get("channel") or "internal"), "request_id": str((consent_event.get("provenance") or {}).get("request_id") or ""), }, "context": { "visibility_level": str(artifact.get("visibility_level") or "incircle"), "consent_status": consent_status, "consent_event_ref": consent_event_id, }, "operation": { "op": op, "input_hash": str((consent_event.get("provenance") or {}).get("input_hash") or ""), "notes": f"consent decision={decision_type}; op={target_operation}", }, "versions": { "constitution_version": str((consent_event.get("versions") or {}).get("constitution_version") or ""), "protocol_version": str((consent_event.get("versions") or {}).get("protocol_version") or ""), "router_guard_version": "runtime_guard@1.0.0", }, "links": { "related_artifact_ids": [x for x in [str(i) for i in target_ids] if x != artifact_id], "external_refs": [consent_event_id], }, } provenance = self._normalize_provenance(artifact.get("provenance")) artifact["provenance"] = self._normalize_provenance(provenance + [trail]) artifact["status"] = to_status updated.append(artifact) logs.append( { "event": "artifact_state_transition", "ts": ts, "request_id": str((consent_event.get("provenance") or {}).get("request_id") or ""), "artifact_id": artifact_id, "artifact_type": artifact.get("type"), "from_status": from_status, "to_status": to_status, "op": op, "consent_event_id": consent_event_id, } ) return True, {"ok": True, "updated_artifacts": updated, "artifact_state_transition_rows": logs} def stop_payload(self, env: Dict[str, Any], stop: Dict[str, Any]) -> Dict[str, Any]: stop_code = stop.get("stop_code", "STOP_UNKNOWN") details = stop.get("details", []) next_step = "Потрібне підтвердження кола або уточнення контексту." if stop_code in {STOP_SECRETS_DETECTED, STOP_INJECTION_ATTEMPT}: next_step = "Зупинено з міркувань безпеки. Приберіть секрети/небезпечні інструкції та повторіть запит." elif stop_code in {STOP_CONSENT_MISSING, STOP_EXPORT_PAYLOAD_NOT_PUBLIC}: next_step = "Потрібен Consent Event або підвищення рівня видимості за процедурою." elif stop_code in {STOP_CONSENT_EVENT_INVALID, STOP_CONSENT_EVENT_MISSING, STOP_CONSENT_QUORUM_NOT_MET}: next_step = "Потрібна валідна ConsentEvent-подія з коректним кворумом і прив’язкою до артефакта." elif stop_code == STOP_OUTPUT_NOT_ALLOWED: next_step = "Виправте expected_output відповідно до agents_registry.yaml." return { "ok": False, "stop_code": stop_code, "details": details, "request_id": env.get("request_id"), "agent_id": env.get("agent_id"), "next_step": next_step, "timestamp": int(time.time()), "input_hash": self._hash(env.get("input_text", "")), }