diff --git a/config/crewai_agents.json b/config/crewai_agents.json index e92a0a3e..160ab75f 100644 --- a/config/crewai_agents.json +++ b/config/crewai_agents.json @@ -159,6 +159,19 @@ "values", "wellbeing" ] + }, + { + "id": "sofiia", + "display_name": "SOFIIA", + "role": "Chief AI Architect & Monitor Orchestrator", + "can_orchestrate": true, + "domains": [ + "infrastructure", + "observability", + "runtime_guard", + "incident_response", + "node_orchestration" + ] } ], "workers": [ @@ -179,28 +192,27 @@ ], "teams": { "helion": { - "team_name": "Helion Team", + "team_name": "HELION Energy Council", "members": [ { - "role": "Energy Analyst", - "skills": [ - "market_research", - "data_analysis" - ] + "role": "Energy Researcher", + "skills": [] }, { - "role": "Biomass Specialist", - "skills": [ - "biomass_tech", - "processing" - ] + "role": "Systems Modeler", + "skills": [] }, { - "role": "Strategy Advisor", - "skills": [ - "investment", - "planning" - ] + "role": "Policy Analyst", + "skills": [] + }, + { + "role": "Risk Assessor", + "skills": [] + }, + { + "role": "Communicator", + "skills": [] } ] }, @@ -273,28 +285,75 @@ ] }, "nutra": { - "team_name": "NUTRA Team", + "team_name": "NUTRA Wellness Team", "members": [ { - "role": "Nutritional Scientist", - "skills": [ - "research", - "formulation" - ] + "role": "AI-Нутрициолог", + "skills": [] }, { - "role": "Lab Interpreter", - "skills": [ - "biomarkers", - "analysis" - ] + "role": "AI-Клінічний нутрициолог", + "skills": [] }, { - "role": "Protocol Designer", - "skills": [ - "supplementation", - "dosing" - ] + "role": "AI-Детокс-наставник", + "skills": [] + }, + { + "role": "AI-Ендокрин-гід", + "skills": [] + }, + { + "role": "AI-Фітнес-тренер", + "skills": [] + }, + { + "role": "AI-Гастро-асистент", + "skills": [] + }, + { + "role": "AI-Психолог-коуч", + "skills": [] + }, + { + "role": "AI-Косметолог-експерт", + "skills": [] + }, + { + "role": "AI-Трихолог", + "skills": [] + }, + { + "role": "AI-Сон-експерт", + "skills": [] + }, + { + "role": "AI-Фудхакер", + "skills": [] + }, + { + "role": "Фейс-Фітнес Тренер", + "skills": [] + }, + { + "role": "Тренер Тіла", + "skills": [] + }, + { + "role": "Наставниця Циклу", + "skills": [] + }, + { + "role": "Наставниця Материнства", + "skills": [] + }, + { + "role": "Цілителька", + "skills": [] + }, + { + "role": "AI-Аналітик Раціону", + "skills": [] } ] }, @@ -310,96 +369,19 @@ ] }, { - "role": "farmOS SoR Analyst", + "role": "Context/Memory Manager", "skills": [ - "farmos_api", - "field_management" + "context_memory", + "memory_brief", + "state_tracking" ] }, { - "role": "IoT/ThingsBoard Engineer", + "role": "Policy/Risk Manager", "skills": [ - "iot_ingestion", - "edge_polygon" - ] - }, - { - "role": "Excel/Document Engineer", - "skills": [ - "xlsx_processing", - "data_quality" - ] - }, - { - "role": "Finance & Costing", - "skills": [ - "budgeting", - "profitability", - "contracts" - ] - }, - { - "role": "Cadastre & GIS", - "skills": [ - "geo_cadastre", - "gis_integration" - ] - }, - { - "role": "LiteFarm Analytics", - "skills": [ - "bi_dashboards", - "analytics" - ] - }, - { - "role": "Platform DevOps", - "skills": [ - "sre", - "observability", - "ci_cd" - ] - }, - { - "role": "Supply & Warehouse", - "skills": [ - "inventory", - "procurement" - ] - }, - { - "role": "QA & Testing", - "skills": [ - "test_strategy", - "autotests" - ] - }, - { - "role": "Security & Access", - "skills": [ - "audit_compliance", - "access_control" - ] - }, - { - "role": "Event Bus Integrator", - "skills": [ - "nats", - "connectors" - ] - }, - { - "role": "Product/MVP", - "skills": [ - "product_strategy", - "ux" - ] - }, - { - "role": "Synthesis Core", - "skills": [ - "answer_synthesis", - "technical_clarity" + "policy_risk", + "constraints", + "risk_assessment" ] } ] @@ -527,6 +509,35 @@ ] } ] + }, + "sofiia": { + "team_name": "SOFIIA Monitor Orchestration Team", + "members": [ + { + "role": "Node Monitor Coordinator", + "skills": [ + "observability", + "incident_triage", + "runbook_coordination" + ] + }, + { + "role": "Monitor Handoff Controller", + "skills": [ + "multi_node_routing", + "escalation", + "consent_safe_dispatch" + ] + }, + { + "role": "Infrastructure Synthesizer", + "skills": [ + "summary", + "risk_reporting", + "next_steps" + ] + } + ] } } -} \ No newline at end of file +} diff --git a/config/crewai_teams.yml b/config/crewai_teams.yml index 0970f4f0..29bc4a12 100644 --- a/config/crewai_teams.yml +++ b/config/crewai_teams.yml @@ -799,6 +799,69 @@ clan: llm_profile: community delegation: enabled: false + zhos_mvp: + team_name: CLAN ZHOS Circle + parallel_roles: false + max_concurrency: 1 + synthesis: + role_context: Spirit-Orchestrator + system_prompt_ref: roles/clan/zhos/orchestrator.md + llm_profile: reasoning + team: + - id: privacy_sentinel + role_context: Privacy-Sentinel + system_prompt_ref: roles/clan/zhos/privacy_sentinel.md + llm_profile: reasoning + - id: memory + role_context: Agent-Memory + system_prompt_ref: roles/clan/zhos/memory.md + llm_profile: reasoning + - id: process + role_context: Agent-Process + system_prompt_ref: roles/clan/zhos/process.md + llm_profile: reasoning + - id: bridge + role_context: Agent-Bridge + system_prompt_ref: roles/clan/zhos/bridge.md + llm_profile: reasoning + - id: gifts + role_context: Agent-Gifts + system_prompt_ref: roles/clan/zhos/gifts.md + llm_profile: community + - id: core_guardian + role_context: Agent-Core-Guardian + system_prompt_ref: roles/clan/zhos/core_guardian.md + llm_profile: reasoning + - id: sync + role_context: Agent-Sync + system_prompt_ref: roles/clan/zhos/sync.md + llm_profile: reasoning + - id: identity + role_context: Agent-Identity + system_prompt_ref: roles/clan/zhos/identity.md + llm_profile: reasoning + - id: gate_policy + role_context: Agent-Gate-Policy + system_prompt_ref: roles/clan/zhos/gate_policy.md + llm_profile: reasoning + - id: audit_log + role_context: Agent-Audit-Log + system_prompt_ref: roles/clan/zhos/audit_log.md + llm_profile: reasoning + - id: infra_health + role_context: Agent-Infra-Health + system_prompt_ref: roles/clan/zhos/infra_health.md + llm_profile: reasoning + - id: research_scout + role_context: Agent-Research-Scout + system_prompt_ref: roles/clan/zhos/research_scout.md + llm_profile: reasoning + - id: ritual_field + role_context: Agent-Ritual-Field + system_prompt_ref: roles/clan/zhos/ritual_field.md + llm_profile: community + delegation: + enabled: false default_profile: default eonarch: profiles: @@ -902,3 +965,41 @@ soul: delegation: enabled: false default_profile: default +sofiia: + profiles: + default: + team_name: SOFIIA Monitor Orchestrator + parallel_roles: true + max_concurrency: 3 + synthesis: + role_context: SOFIIA Orchestrator + system_prompt_ref: roles/sofiia/orchestrator_synthesis.md + llm_profile: cloud_deepseek + team: + - id: system_architect + role_context: System Architect + system_prompt_ref: roles/sofiia/system_architect.md + llm_profile: local_qwen3_8b + - id: platform_integrator + role_context: Platform Integrator + system_prompt_ref: roles/sofiia/platform_integrator.md + llm_profile: local_qwen3_8b + - id: security_reviewer + role_context: Security Reviewer + system_prompt_ref: roles/sofiia/security_reviewer.md + llm_profile: local_qwen3_8b + - id: monitor_bridge + role_context: Monitor Bridge + system_prompt_ref: roles/sofiia/communicator.md + llm_profile: local_qwen3_8b + delegation: + enabled: true + mode: router_infer + selection_policy: router_by_id + max_hops: 1 + forbid_self: true + attach_headers: + handoff_from: sofiia + allow_top_level_agents: + - monitor + default_profile: default diff --git a/ops/canary_gateway_delivery_priority.sh b/ops/canary_gateway_delivery_priority.sh new file mode 100644 index 00000000..4bac7185 --- /dev/null +++ b/ops/canary_gateway_delivery_priority.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash +set -euo pipefail + +TARGET="/opt/microdao-daarion/gateway-bot/http_api.py" + +python3 - <<'PY' +from pathlib import Path +import re +p = Path('/opt/microdao-daarion/gateway-bot/http_api.py') +text = p.read_text(encoding='utf-8') + +anchors = { + 'file_base64': r'\n\s*if file_base64:\n', + 'image_base64': r'\n\s*elif image_base64:\n', + 'text_fallback': r'\n\s*else:\n\s*# Send text response only\n', +} + +pos = {} +for k, pat in anchors.items(): + m = re.search(pat, text) + if not m: + raise SystemExit(f"[FAIL] anchor not found: {k}") + pos[k] = m.start() + +expected = ['file_base64','image_base64','text_fallback'] +for a, b in zip(expected, expected[1:]): + if not (pos[a] < pos[b]): + raise SystemExit(f"[FAIL] priority order broken: {a} should be before {b}") + +print('[OK] gateway delivery priority order is correct') +for k in expected: + print(f' - {k}: {pos[k]}') +PY + +echo "[OK] gateway delivery priority canary passed" diff --git a/ops/canary_router_contract.sh b/ops/canary_router_contract.sh new file mode 100644 index 00000000..0f1ca28c --- /dev/null +++ b/ops/canary_router_contract.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROUTER_URL="http://127.0.0.1:9102" + +curl -fsS "$ROUTER_URL/health" >/dev/null + +echo "[INFO] Calling /v1/agents/devtools/infer for contract check" +resp=$(curl -fsS -X POST "$ROUTER_URL/v1/agents/devtools/infer" \ + -H "Content-Type: application/json" \ + -d '{"prompt":"Reply with: ok","max_tokens":32,"temperature":0.1}') + +RESP="$resp" python3 - <<'PY' +import json, os +obj = json.loads(os.environ['RESP']) +required = [ + 'response', 'model', 'backend', 'tokens_used', + 'image_base64', 'file_base64', 'file_name', 'file_mime' +] +missing = [k for k in required if k not in obj] +if missing: + raise SystemExit(f"Missing keys: {missing}; got keys={sorted(obj.keys())}") +print('[OK] Router infer contract keys present') +PY + +echo "[OK] router contract canary passed" diff --git a/ops/monitor_notify_sofiia.sh b/ops/monitor_notify_sofiia.sh new file mode 100644 index 00000000..cb9d0c88 --- /dev/null +++ b/ops/monitor_notify_sofiia.sh @@ -0,0 +1,129 @@ +#!/usr/bin/env bash +set -euo pipefail + +STATUS_JSON="${1:-/opt/microdao-daarion/ops/status/canary_all.latest.json}" +ROOT="/opt/microdao-daarion" +ROUTER_URL="${ROUTER_URL:-http://127.0.0.1:9102}" +REPORT_ENABLED="${SOFIIA_REPORTS_ENABLED:-true}" +REPORT_MODE="${SOFIIA_REPORT_MODE:-fail_only}" # fail_only | always +REPORT_TIMEOUT="${SOFIIA_REPORT_TIMEOUT:-180}" +REPORT_CHAT_ID="${SOFIIA_REPORT_CHAT_ID:-ops-monitor-sofiia}" +REPORT_USER_ID="${SOFIIA_REPORT_USER_ID:-ops-monitor-agent}" +REPORT_USERNAME="${SOFIIA_REPORT_USERNAME:-monitor-agent}" +REPORT_TELEGRAM_CHAT_ID="${SOFIIA_REPORT_TELEGRAM_CHAT_ID:-}" +SOFIIA_BOT_TOKEN="${SOFIIA_TELEGRAM_BOT_TOKEN:-${TELEGRAM_BOT_TOKEN:-}}" + +if [[ "${REPORT_ENABLED,,}" != "true" ]]; then + echo "[INFO] sofiia notify disabled" + exit 0 +fi + +if [[ ! -f "$STATUS_JSON" ]]; then + echo "[WARN] status json not found: $STATUS_JSON" + exit 0 +fi + +python3 - "$STATUS_JSON" "$ROOT" "$ROUTER_URL" "$REPORT_MODE" "$REPORT_TIMEOUT" "$REPORT_CHAT_ID" "$REPORT_USER_ID" "$REPORT_USERNAME" "$REPORT_TELEGRAM_CHAT_ID" "$SOFIIA_BOT_TOKEN" <<'PY' +import json +import sys +from pathlib import Path +from urllib import request as urlreq +from urllib.error import URLError, HTTPError + +status_json = Path(sys.argv[1]) +root = Path(sys.argv[2]) +router_url = sys.argv[3].rstrip('/') +report_mode = sys.argv[4] +timeout_s = int(sys.argv[5]) +chat_id = sys.argv[6] +user_id = sys.argv[7] +username = sys.argv[8] +tg_chat_id = sys.argv[9].strip() +tg_token = sys.argv[10].strip() + +payload = json.loads(status_json.read_text(encoding='utf-8')) +status = str(payload.get('status', 'unknown')).lower() + +if report_mode == 'fail_only' and status == 'ok': + print('[INFO] sofiia notify skipped: status=ok and mode=fail_only') + sys.exit(0) + +log_path = payload.get('log_path') +log_tail = '' +if log_path: + p = Path(log_path) + if p.exists(): + lines = p.read_text(encoding='utf-8', errors='ignore').splitlines() + log_tail = '\n'.join(lines[-40:]) + +prompt = ( + 'System monitoring report from NODE1 operator pipeline. '\ + 'Analyze briefly and return 3 sections: status, risks, actions.\\n\\n' + f"status={payload.get('status')}\\n" + f"exit_code={payload.get('exit_code')}\\n" + f"started_at={payload.get('started_at')}\\n" + f"ended_at={payload.get('ended_at')}\\n" + f"log_path={payload.get('log_path')}\\n\\n" + 'log_tail:\\n' + f"{log_tail[:6000]}" +) + +body = { + 'prompt': prompt, + 'max_tokens': 400, + 'temperature': 0.1, + 'metadata': { + 'source': 'ops-monitor-canary', + 'force_concise': True, + 'user_id': user_id, + 'chat_id': chat_id, + 'username': username, + 'session_id': f'{chat_id}:sofiia:monitor', + 'report_mode': report_mode, + } +} + +req = urlreq.Request( + url=f"{router_url}/v1/agents/sofiia/infer", + data=json.dumps(body).encode('utf-8'), + headers={'Content-Type': 'application/json'}, + method='POST', +) + +try: + with urlreq.urlopen(req, timeout=timeout_s) as resp: + raw = resp.read().decode('utf-8', errors='ignore') + data = json.loads(raw) + text = (data.get('response') or '').strip() + short = text[:200] + print(f"[OK] sofiia report sent: backend={data.get('backend')} model={data.get('model')} preview={short!r}") + + if tg_chat_id and tg_token and text: + msg = ( + "[NODE1 Monitor]\n" + f"status={payload.get('status')} exit_code={payload.get('exit_code')}\n\n" + f"{text[:3500]}" + ) + tg_req = urlreq.Request( + url=f"https://api.telegram.org/bot{tg_token}/sendMessage", + data=json.dumps({"chat_id": tg_chat_id, "text": msg}).encode('utf-8'), + headers={'Content-Type': 'application/json'}, + method='POST', + ) + try: + with urlreq.urlopen(tg_req, timeout=20) as tg_resp: + tg_data = json.loads(tg_resp.read().decode('utf-8', errors='ignore')) + if tg_data.get('ok'): + print(f"[OK] telegram report delivered: chat_id={tg_chat_id}") + else: + print(f"[WARN] telegram send not ok: {tg_data}") + except Exception as tg_e: + print(f"[WARN] telegram send failed: {tg_e}") + else: + print('[INFO] telegram delivery skipped (missing SOFIIA_REPORT_TELEGRAM_CHAT_ID or token or empty text)') +except HTTPError as e: + msg = e.read().decode('utf-8', errors='ignore')[:300] + raise SystemExit(f"[FAIL] sofiia report HTTPError {e.code}: {msg}") +except URLError as e: + raise SystemExit(f"[FAIL] sofiia report URLError: {e}") +PY diff --git a/services/router/crewai_client.py b/services/router/crewai_client.py index 0199db91..1ff4d6ba 100644 --- a/services/router/crewai_client.py +++ b/services/router/crewai_client.py @@ -13,6 +13,7 @@ logger = logging.getLogger(__name__) CREWAI_URL = os.getenv("CREWAI_URL", "http://dagi-staging-crewai-service:9010") CREWAI_ENABLED = os.getenv("CREWAI_ENABLED", "true").lower() == "true" +CREWAI_ORCHESTRATORS_ALWAYS = os.getenv("CREWAI_ORCHESTRATORS_ALWAYS", "true").lower() == "true" CREWAI_AGENTS_PATH = os.getenv("CREWAI_AGENTS_PATH", "/config/crewai_agents.json") FALLBACK_CREWAI_PATH = "/app/config/crewai_agents.json" @@ -66,7 +67,7 @@ def get_agent_crewai_info(agent_id): } -def should_use_crewai(agent_id, prompt, agent_config, force_crewai=False): +def should_use_crewai(agent_id, prompt, agent_config, metadata=None, force_crewai=False): """ Decide whether to use CrewAI orchestration or direct LLM. Returns: (use_crewai: bool, reason: str) @@ -88,6 +89,10 @@ def should_use_crewai(agent_id, prompt, agent_config, force_crewai=False): team = crewai_info.get("team", []) if not team: return False, "agent_has_no_team" + + # Architecture mode: top-level orchestrators go through CrewAI API by default. + if CREWAI_ORCHESTRATORS_ALWAYS: + return True, "orchestrator_default_crewai" if len(prompt) < MIN_PROMPT_LENGTH_FOR_CREW: return False, "prompt_too_short" @@ -101,18 +106,25 @@ def should_use_crewai(agent_id, prompt, agent_config, force_crewai=False): return False, "default_direct_llm" -async def call_crewai(agent_id, task, context=None, team=None): +async def call_crewai(agent_id, task, context=None, team=None, profile=None): try: if not team: crewai_info = get_agent_crewai_info(agent_id) team = crewai_info.get("team", []) - async with httpx.AsyncClient(timeout=180.0) as client: + async with httpx.AsyncClient(timeout=600.0) as client: + effective_context = context or {} + effective_profile = profile or (effective_context.get("metadata", {}) or {}).get("crewai_profile") + if not effective_profile and agent_id == "clan": + effective_profile = "zhos_mvp" + payload = { "task": task, - "orchestrator": agent_id, - "context": context or {}, + "orchestrator_id": agent_id, + "context": effective_context, } + if effective_profile: + payload["profile"] = effective_profile if team: payload["team"] = [ m.get("role", str(m)) if isinstance(m, dict) else m diff --git a/services/router/main.py b/services/router/main.py index 4056bf11..85aa19da 100644 --- a/services/router/main.py +++ b/services/router/main.py @@ -38,6 +38,14 @@ except ImportError: TOOL_MANAGER_AVAILABLE = False ToolManager = None +# Runtime Guard (Envelope/Artifact validation for CLAN orchestration) +try: + from runtime_guard import RuntimeGuard + RUNTIME_GUARD_AVAILABLE = True +except ImportError: + RUNTIME_GUARD_AVAILABLE = False + RuntimeGuard = None + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -603,6 +611,16 @@ CITY_SERVICE_URL = os.getenv("CITY_SERVICE_URL", "http://daarion-city-service:70 # CrewAI Routing Configuration CREWAI_ROUTING_ENABLED = os.getenv("CREWAI_ROUTING_ENABLED", "true").lower() == "true" CREWAI_URL = os.getenv("CREWAI_URL", "http://dagi-staging-crewai-service:9010") +CLAN_RUNTIME_GUARD_ENABLED = os.getenv("CLAN_RUNTIME_GUARD_ENABLED", "true").lower() == "true" +CLAN_RUNTIME_GUARD_MODE = os.getenv("CLAN_RUNTIME_GUARD_MODE", "soft").lower() +CLAN_GUARD_TEST_MODE = os.getenv("CLAN_GUARD_TEST_MODE", "false").lower() == "true" +CLAN_RUNTIME_REGISTRY_PATH = os.getenv("CLAN_RUNTIME_REGISTRY_PATH", "/app/config/roles/clan/zhos/agents_registry.yaml") +CLAN_RUNTIME_ENVELOPE_SCHEMA_PATH = os.getenv("CLAN_RUNTIME_ENVELOPE_SCHEMA_PATH", "/app/docs/contracts/clan-envelope.schema.json") +CLAN_RUNTIME_ARTIFACT_SCHEMA_PATH = os.getenv("CLAN_RUNTIME_ARTIFACT_SCHEMA_PATH", "/app/docs/contracts/clan-artifact.schema.json") +CLAN_RUNTIME_CONSENT_EVENT_SCHEMA_PATH = os.getenv( + "CLAN_RUNTIME_CONSENT_EVENT_SCHEMA_PATH", + "/app/docs/contracts/clan-consent-event.schema.json", +) # Neo4j Configuration NEO4J_URI = os.getenv("NEO4J_BOLT_URL", "bolt://neo4j:7687") @@ -622,6 +640,7 @@ nats_available = False # Tool Manager tool_manager = None +runtime_guard_engine = None # Models class FilterDecision(BaseModel): @@ -677,7 +696,7 @@ router_config = load_router_config() @app.on_event("startup") async def startup_event(): """Initialize NATS connection and subscriptions""" - global nc, nats_available, http_client, neo4j_driver, neo4j_available + global nc, nats_available, http_client, neo4j_driver, neo4j_available, runtime_guard_engine logger.info("🚀 DAGI Router v2.0.0 starting up...") # Initialize HTTP client @@ -736,6 +755,26 @@ async def startup_event(): tool_manager = None else: tool_manager = None + + # Initialize CLAN runtime guard + if RUNTIME_GUARD_AVAILABLE and RuntimeGuard and CLAN_RUNTIME_GUARD_ENABLED: + try: + runtime_guard_engine = RuntimeGuard( + registry_path=CLAN_RUNTIME_REGISTRY_PATH, + envelope_schema_path=CLAN_RUNTIME_ENVELOPE_SCHEMA_PATH, + artifact_schema_path=CLAN_RUNTIME_ARTIFACT_SCHEMA_PATH, + consent_event_schema_path=CLAN_RUNTIME_CONSENT_EVENT_SCHEMA_PATH, + mode=CLAN_RUNTIME_GUARD_MODE, + ) + logger.info( + "✅ CLAN Runtime Guard initialized " + f"(mode={CLAN_RUNTIME_GUARD_MODE}, registry={CLAN_RUNTIME_REGISTRY_PATH})" + ) + except Exception as e: + logger.warning(f"⚠️ Runtime Guard init failed: {e}") + runtime_guard_engine = None + else: + runtime_guard_engine = None # Log backend URLs logger.info(f"📡 Swapper URL: {SWAPPER_URL}") @@ -1099,11 +1138,14 @@ async def internal_llm_complete(request: InternalLLMRequest): {"name": "mistral", "api_key_env": "MISTRAL_API_KEY", "base_url": "https://api.mistral.ai", "model": "mistral-large-latest", "timeout": 60}, {"name": "grok", "api_key_env": "GROK_API_KEY", "base_url": "https://api.x.ai", "model": "grok-2-1212", "timeout": 60} ] - + + # Respect configured provider: local profiles should stay local. if provider in ["deepseek", "mistral", "grok"]: cloud_providers = sorted(cloud_providers, key=lambda x: 0 if x["name"] == provider else 1) - - # Try cloud providers + elif provider == "ollama": + cloud_providers = [] + + # Try cloud providers (only when provider is cloud) for cloud in cloud_providers: api_key = os.getenv(cloud["api_key_env"]) if not api_key: @@ -1129,18 +1171,19 @@ async def internal_llm_complete(request: InternalLLMRequest): logger.warning(f"Internal LLM {cloud['name']} failed: {e}") continue - # Fallback to Ollama + # Fallback/target local provider (Ollama) try: - logger.info("Internal LLM fallback to Ollama") + logger.info("Internal LLM to Ollama") + ollama_model = model or "qwen3:8b" ollama_resp = await http_client.post( "http://172.18.0.1:11434/api/generate", - json={"model": "qwen3:8b", "prompt": request.prompt, "system": request.system_prompt or "", "stream": False, "options": {"num_predict": max_tokens, "temperature": temperature}}, + json={"model": ollama_model, "prompt": request.prompt, "system": request.system_prompt or "", "stream": False, "options": {"num_predict": max_tokens, "temperature": temperature}}, timeout=120.0 ) if ollama_resp.status_code == 200: data = ollama_resp.json() latency = int((time_module.time() - t0) * 1000) - return InternalLLMResponse(text=data.get("response", ""), model="qwen3:8b", provider="ollama", tokens_used=0, latency_ms=latency) + return InternalLLMResponse(text=data.get("response", ""), model=ollama_model, provider="ollama", tokens_used=0, latency_ms=latency) except Exception as e: logger.error(f"Internal LLM Ollama failed: {e}") @@ -1246,6 +1289,55 @@ async def agent_infer(agent_id: str, request: InferRequest): # ========================================================================= if CREWAI_ROUTING_ENABLED and CREWAI_CLIENT_AVAILABLE: try: + runtime_envelope = None + if runtime_guard_engine and request_agent_id == "clan": + runtime_envelope = runtime_guard_engine.build_envelope( + agent_id=request_agent_id, + prompt=request.prompt, + metadata=effective_metadata, + ) + ok_pre, pre_info = runtime_guard_engine.pre_dispatch_checks(runtime_envelope) + if not ok_pre: + stop_payload = runtime_guard_engine.stop_payload(runtime_envelope, pre_info) + logger.warning( + "🛑 Runtime guard pre-dispatch stop: " + f"code={stop_payload.get('stop_code')} request_id={stop_payload.get('request_id')} " + f"input_hash={stop_payload.get('input_hash')}" + ) + return InferResponse( + response=json.dumps(stop_payload, ensure_ascii=False), + model="runtime-guard", + backend="runtime-guard", + tokens_used=0, + ) + if ( + CLAN_GUARD_TEST_MODE + and effective_metadata.get("guard_self_test") is True + and isinstance(effective_metadata.get("__inject_fake_agent_result"), dict) + ): + fake_result = effective_metadata.get("__inject_fake_agent_result") + ok_post, post_info = runtime_guard_engine.post_return_checks(runtime_envelope, fake_result) + if not ok_post: + stop_payload = runtime_guard_engine.stop_payload(runtime_envelope, post_info) + logger.warning( + "🧪 Runtime guard self-test stop: " + f"code={stop_payload.get('stop_code')} request_id={stop_payload.get('request_id')} " + f"input_hash={stop_payload.get('input_hash')}" + ) + return InferResponse( + response=json.dumps(stop_payload, ensure_ascii=False), + model="runtime-guard", + backend="runtime-guard", + tokens_used=0, + ) + logger.info("🧪 Runtime guard self-test passed (fake result accepted)") + return InferResponse( + response=json.dumps({"ok": True, "self_test": True}, ensure_ascii=False), + model="runtime-guard", + backend="runtime-guard", + tokens_used=0, + ) + # Get agent CrewAI config from registry (or router_config fallback) crewai_cfg = agent_config.get("crewai", {}) @@ -1273,13 +1365,52 @@ async def agent_infer(agent_id: str, request: InferRequest): "hash": system_prompt_hash, }, "metadata": effective_metadata, + "runtime_envelope": runtime_envelope, }, - team=crewai_cfg.get("team") + team=crewai_cfg.get("team"), + profile=effective_metadata.get("crewai_profile") ) latency = time.time() - t0 if crew_result.get("success") and crew_result.get("result"): + if runtime_guard_engine and request_agent_id == "clan" and runtime_envelope: + ok_post, post_info = runtime_guard_engine.post_return_checks(runtime_envelope, crew_result) + if not ok_post: + stop_payload = runtime_guard_engine.stop_payload(runtime_envelope, post_info) + logger.warning( + "🛑 Runtime guard post-return stop: " + f"code={stop_payload.get('stop_code')} request_id={stop_payload.get('request_id')} " + f"input_hash={stop_payload.get('input_hash')}" + ) + return InferResponse( + response=json.dumps(stop_payload, ensure_ascii=False), + model="runtime-guard", + backend="runtime-guard", + tokens_used=0, + ) + crew_result = runtime_guard_engine.stamp_result_artifacts(runtime_envelope, crew_result) + ok_stamp, stamp_info = runtime_guard_engine.ensure_stamped_trails(crew_result) + if not ok_stamp: + stop_payload = runtime_guard_engine.stop_payload(runtime_envelope, stamp_info) + logger.warning( + "🛑 Runtime guard stamped-trail stop: " + f"code={stop_payload.get('stop_code')} request_id={stop_payload.get('request_id')} " + f"input_hash={stop_payload.get('input_hash')}" + ) + return InferResponse( + response=json.dumps(stop_payload, ensure_ascii=False), + model="runtime-guard", + backend="runtime-guard", + tokens_used=0, + ) + for row in runtime_guard_engine.artifact_runtime_rows(runtime_envelope, crew_result): + logger.info(json.dumps(row, ensure_ascii=False)) + for row in runtime_guard_engine.consent_runtime_rows(runtime_envelope, crew_result): + logger.info(json.dumps(row, ensure_ascii=False)) + for row in (crew_result.get("artifact_state_transition_rows") or []): + if isinstance(row, dict): + logger.info(json.dumps(row, ensure_ascii=False)) logger.info(f"✅ CrewAI success for {agent_id}: {latency:.2f}s") # Store interaction in memory @@ -1677,127 +1808,224 @@ async def agent_infer(agent_id: str, request: InferRequest): logger.warning(f"🧹 Clearing DSML content from response ({len(response_text)} chars)") response_text = "" if tool_calls and tool_manager: - logger.info(f"🔧 LLM requested {len(tool_calls)} tool call(s)") - - # Execute each tool call - tool_results = [] - for tc in tool_calls: - func = tc.get("function", {}) - tool_name = func.get("name", "") - try: - tool_args = json.loads(func.get("arguments", "{}")) - except: - tool_args = {} - - result = await tool_manager.execute_tool( - tool_name, - tool_args, - agent_id=request_agent_id, - chat_id=chat_id, - user_id=user_id, - ) - tool_result_dict = { - "tool_call_id": tc.get("id", ""), - "name": tool_name, - "success": result.success, - "result": result.result, - "error": result.error, - "image_base64": result.image_base64, # Store image if generated - "file_base64": result.file_base64, - "file_name": result.file_name, - "file_mime": result.file_mime, - } - if result.image_base64: - logger.info(f"🖼️ Tool {tool_name} generated image: {len(result.image_base64)} chars") - tool_results.append(tool_result_dict) - - # Append tool results to messages and call LLM again - messages.append({"role": "assistant", "content": None, "tool_calls": tool_calls}) - - for tr in tool_results: - messages.append({ - "role": "tool", - "tool_call_id": tr["tool_call_id"], - "content": str(tr["result"]) if tr["success"] else f"Error: {tr['error']}" - }) - - # Second call to get final response - logger.info(f"🔄 Calling LLM again with tool results") - final_payload = { - "model": cloud["model"], - "messages": messages, - "max_tokens": max_tokens, - "temperature": temperature, - "stream": False + max_tool_rounds = int(os.getenv("ROUTER_TOOL_MAX_ROUNDS", "10")) + logger.info(f"🔧 LLM requested tool calls; running iterative mode up to {max_tool_rounds} rounds") + + all_tool_results = [] + current_tool_calls = tool_calls + rounds_done = 0 + oneok_ctx = { + "client_id": None, + "site_id": None, + "calc_result": None, + "quote_id": None, } - # Don't include tools in second call (some APIs don't support it) - # Tools are only needed in first call - - final_resp = await http_client.post( - f"{cloud['base_url']}/v1/chat/completions", - headers={ - "Authorization": f"Bearer {api_key}", - "Content-Type": "application/json" - }, - json=final_payload, - timeout=cloud["timeout"] - ) - - if final_resp.status_code == 200: - final_data = final_resp.json() - response_text = final_data.get("choices", [{}])[0].get("message", {}).get("content", "") - - # CRITICAL: Check for DSML in second response too! - if response_text and ("DSML" in response_text or "invoke name=" in response_text or "function_calls>" in response_text): - prefix_before_dsml = _strip_dsml_keep_text_before(response_text) - if prefix_before_dsml: - logger.warning(f"🧹 DSML in 2nd response: keeping text before DSML ({len(prefix_before_dsml)} chars), discarding {len(response_text) - len(prefix_before_dsml)} chars") - response_text = prefix_before_dsml + repeated_failures = {} + + while current_tool_calls and rounds_done < max_tool_rounds: + rounds_done += 1 + logger.info(f"🔁 Tool round {rounds_done}/{max_tool_rounds}: {len(current_tool_calls)} call(s)") + round_results = [] + abort_loop_due_repeats = False + + for tc in current_tool_calls: + func = tc.get("function", {}) + tool_name = func.get("name", "") + try: + tool_args = json.loads(func.get("arguments", "{}")) + except Exception: + tool_args = {} + + # Light auto-repair for 1OK multi-step flows when model omits required args. + if request_agent_id == "oneok": + if tool_name == "calc_window_quote": + ip = (tool_args or {}).get("input_payload") + if isinstance(ip, dict) and isinstance(ip.get("windows"), list) and "window_units" not in ip: + ip2 = dict(ip) + ip2["window_units"] = ip.get("windows") + tool_args = dict(tool_args or {}) + tool_args["input_payload"] = ip2 + elif tool_name == "crm_create_quote": + quote_payload = (tool_args or {}).get("quote_payload") + if not isinstance(quote_payload, dict): + calc_res = oneok_ctx.get("calc_result") or {} + line_items = calc_res.get("line_items") if isinstance(calc_res, dict) else None + totals = calc_res.get("totals") if isinstance(calc_res, dict) else None + if isinstance(line_items, list) and isinstance(totals, dict): + tool_args = { + "quote_payload": { + "client_id": oneok_ctx.get("client_id"), + "site_id": oneok_ctx.get("site_id"), + "currency": calc_res.get("currency", "UAH"), + "line_items": line_items, + "totals": totals, + "assumptions": calc_res.get("assumptions", []), + "validity_days": 14, + "lead_time_estimate": calc_res.get("lead_time_if_known") or calc_res.get("lead_time_estimate"), + } + } + logger.info("🛠️ oneok: auto-filled crm_create_quote.quote_payload from calc context") + elif tool_name == "docs_render_quote_pdf": + quote_id = (tool_args or {}).get("quote_id") + quote_payload = (tool_args or {}).get("quote_payload") + if not quote_id and not isinstance(quote_payload, dict) and oneok_ctx.get("quote_id"): + tool_args = {"quote_id": oneok_ctx.get("quote_id")} + logger.info("🛠️ oneok: auto-filled docs_render_quote_pdf.quote_id from quote context") + elif tool_name == "schedule_propose_slots": + params = (tool_args or {}).get("params") + if not isinstance(params, dict): + tool_args = {"params": {"count": 3, "timezone": "Europe/Kyiv"}} + logger.info("🛠️ oneok: auto-filled schedule_propose_slots.params") + + result = await tool_manager.execute_tool( + tool_name, + tool_args, + agent_id=request_agent_id, + chat_id=chat_id, + user_id=user_id, + ) + tool_result_dict = { + "tool_call_id": tc.get("id", ""), + "name": tool_name, + "success": result.success, + "result": result.result, + "error": result.error, + "image_base64": result.image_base64, + "file_base64": result.file_base64, + "file_name": result.file_name, + "file_mime": result.file_mime, + } + if result.image_base64: + logger.info(f"🖼️ Tool {tool_name} generated image: {len(result.image_base64)} chars") + round_results.append(tool_result_dict) + all_tool_results.append(tool_result_dict) + + # Track oneok context to help subsequent tool calls in the same request. + if request_agent_id == "oneok" and result.success and isinstance(result.result, dict): + if tool_name == "crm_upsert_client": + oneok_ctx["client_id"] = result.result.get("client_id") or oneok_ctx.get("client_id") + elif tool_name == "crm_upsert_site": + oneok_ctx["site_id"] = result.result.get("site_id") or oneok_ctx.get("site_id") + elif tool_name == "calc_window_quote": + oneok_ctx["calc_result"] = result.result + elif tool_name == "crm_create_quote": + oneok_ctx["quote_id"] = result.result.get("quote_id") or oneok_ctx.get("quote_id") + + # Guardrail: stop if model repeats same failing tool call too many times. + sig = f"{tool_name}:{json.dumps(tool_args, ensure_ascii=False, sort_keys=True, default=str)}" + if result.success: + repeated_failures.pop(sig, None) else: - logger.warning(f"🧹 DSML detected in 2nd LLM response, trying 3rd call ({len(response_text)} chars)") - # Third LLM call: explicitly ask to synthesize tool results - tool_summary_parts = [] - for tr in tool_results: - if tr.get("success") and tr.get("result"): - res_text = str(tr["result"])[:500] - tool_summary_parts.append(f"Tool '{tr['name']}' returned: {res_text}") - if tool_summary_parts: - synthesis_prompt = "Based on the following tool results, provide a helpful response to the user in their language. Do NOT use any markup or XML. Just respond naturally.\n\n" + "\n".join(tool_summary_parts) - try: - synth_resp = await http_client.post( - f"{cloud['base_url']}/v1/chat/completions", - headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, - json={"model": cloud["model"], "messages": [ - {"role": "system", "content": system_prompt or "You are a helpful assistant. Respond naturally."}, - {"role": "user", "content": synthesis_prompt} - ], "max_tokens": max_tokens, "temperature": 0.3, "stream": False}, - timeout=cloud["timeout"] - ) - if synth_resp.status_code == 200: - synth_data = synth_resp.json() - synth_text = synth_data.get("choices", [{}])[0].get("message", {}).get("content", "") - if synth_text and "DSML" not in synth_text and "invoke" not in synth_text: - response_text = synth_text - tokens_used += synth_data.get("usage", {}).get("total_tokens", 0) - logger.info("\u2705 3rd LLM call synthesized clean response from tool results") - else: - response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected") - else: - response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected") - except Exception as synth_err: - logger.warning(f"3rd LLM call failed: {synth_err}") - response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected") - else: - response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected") - - if not response_text: - logger.warning(f"⚠️ {cloud['name'].upper()} returned empty response after tool call") - # Fallback to tool result summary - response_text = format_tool_calls_for_response(tool_results, fallback_mode="empty_response") - tokens_used += final_data.get("usage", {}).get("total_tokens", 0) - else: - logger.error(f"❌ {cloud['name'].upper()} second call failed: {final_resp.status_code} - {final_resp.text[:200]}") - # Fallback to tool result summary + repeated_failures[sig] = repeated_failures.get(sig, 0) + 1 + if repeated_failures[sig] >= 3: + logger.warning(f"⚠️ Repeated failing tool call detected ({tool_name}) x{repeated_failures[sig]}; breaking iterative loop") + abort_loop_due_repeats = True + break + + if abort_loop_due_repeats: + current_tool_calls = [] + response_text = response_text or format_tool_calls_for_response(all_tool_results, fallback_mode="empty_response") + break + + messages.append({"role": "assistant", "content": None, "tool_calls": current_tool_calls}) + for tr in round_results: + messages.append({ + "role": "tool", + "tool_call_id": tr["tool_call_id"], + "content": str(tr["result"]) if tr["success"] else f"Error: {tr['error']}" + }) + + logger.info("🔄 Calling LLM again after tool round") + loop_payload = { + "model": cloud["model"], + "messages": messages, + "max_tokens": max_tokens, + "temperature": temperature, + "stream": False + } + # Keep tools enabled for multi-step chains. + if tools_payload and cloud["name"] in ["deepseek", "mistral", "grok"]: + loop_payload["tools"] = tools_payload + loop_payload["tool_choice"] = "auto" + + loop_resp = await http_client.post( + f"{cloud['base_url']}/v1/chat/completions", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + }, + json=loop_payload, + timeout=cloud["timeout"] + ) + + if loop_resp.status_code != 200: + logger.error(f"❌ {cloud['name'].upper()} loop call failed: {loop_resp.status_code} - {loop_resp.text[:200]}") + response_text = format_tool_calls_for_response(all_tool_results, fallback_mode="empty_response") + current_tool_calls = [] + break + + loop_data = loop_resp.json() + loop_message = loop_data.get("choices", [{}])[0].get("message", {}) + response_text = loop_message.get("content", "") or "" + tokens_used += loop_data.get("usage", {}).get("total_tokens", 0) + current_tool_calls = loop_message.get("tool_calls", []) + + # DSML fallback parsing for providers that return markup instead of tool_calls. + has_dsml_loop = False + if response_text: + dsml_patterns_check = [ + r'DSML', + r'function_calls>', + r'invoke\s*name\s*=', + r'parameter\s*name\s*=', + r'<[^>]*invoke[^>]*>', + r']*invoke[^>]*>', + ] + for pattern in dsml_patterns_check: + if re.search(pattern, response_text, re.IGNORECASE): + has_dsml_loop = True + logger.warning(f"⚠️ DSML detected in loop via pattern: {pattern}") + break + + if has_dsml_loop and not current_tool_calls: + dsml_patterns = [ + r'invoke name="(\w+)".*?parameter name="(\w+)"[^>]*>([^<]+)', + r'invoke\s+name="(\w+)".*?parameter\s+name="(\w+)"[^>]*>([^<]+)', + ] + dsml_match = None + for pattern in dsml_patterns: + dsml_match = re.search(pattern, response_text, re.DOTALL | re.IGNORECASE) + if dsml_match: + break + if dsml_match and len(dsml_match.groups()) >= 3: + import string + import random + tool_call_id = ''.join(random.choices(string.ascii_letters + string.digits, k=9)) + current_tool_calls = [{ + "id": tool_call_id, + "function": { + "name": dsml_match.group(1), + "arguments": json.dumps({dsml_match.group(2): dsml_match.group(3).strip()}) + } + }] + response_text = "" + + tool_results = all_tool_results + + if current_tool_calls: + logger.warning(f"⚠️ Reached max tool rounds ({max_tool_rounds}) for {request_agent_id}, returning summary") + response_text = response_text or format_tool_calls_for_response(tool_results, fallback_mode="empty_response") + + if response_text and ("DSML" in response_text or "invoke name=" in response_text or "function_calls>" in response_text): + prefix_before_dsml = _strip_dsml_keep_text_before(response_text) + if prefix_before_dsml: + logger.warning(f"🧹 DSML in loop final response: keeping text before DSML ({len(prefix_before_dsml)} chars)") + response_text = prefix_before_dsml + else: + response_text = format_tool_calls_for_response(tool_results, fallback_mode="dsml_detected") + + if not response_text: + logger.warning(f"⚠️ {cloud['name'].upper()} returned empty response after iterative tool calls") response_text = format_tool_calls_for_response(tool_results, fallback_mode="empty_response") if response_text: diff --git a/services/router/runtime_guard.py b/services/router/runtime_guard.py new file mode 100644 index 00000000..f4034103 --- /dev/null +++ b/services/router/runtime_guard.py @@ -0,0 +1,856 @@ +import hashlib +import json +import re +import time +import uuid +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import yaml + +VISIBILITY = ["public", "interclan", "incircle", "soulsafe", "sacred"] +VIS_RANK = {v: i for i, v in enumerate(VISIBILITY)} +CONSENT = {"none", "pending", "confirmed"} + +STOP_SCHEMA_ENVELOPE = "STOP_SCHEMA_ENVELOPE" +STOP_SCHEMA_ARTIFACT = "STOP_SCHEMA_ARTIFACT" +STOP_PROVENANCE_INVALID = "STOP_PROVENANCE_INVALID" +STOP_CONSENT_EVENT_INVALID = "STOP_CONSENT_EVENT_INVALID" +STOP_CONSENT_EVENT_MISSING = "STOP_CONSENT_EVENT_MISSING" +STOP_CONSENT_QUORUM_NOT_MET = "STOP_CONSENT_QUORUM_NOT_MET" +STOP_AGENT_UNKNOWN = "STOP_AGENT_UNKNOWN" +STOP_OUTPUT_NOT_ALLOWED = "STOP_OUTPUT_NOT_ALLOWED" +STOP_VISIBILITY_ESCALATION = "STOP_VISIBILITY_ESCALATION" +STOP_CONSENT_MISSING = "STOP_CONSENT_MISSING" +STOP_SECRETS_DETECTED = "STOP_SECRETS_DETECTED" +STOP_INJECTION_ATTEMPT = "STOP_INJECTION_ATTEMPT" +STOP_EXPORT_PAYLOAD_NOT_PUBLIC = "STOP_EXPORT_PAYLOAD_NOT_PUBLIC" + +CONSENT_TRANSITION_MAP: Dict[str, Dict[str, Dict[str, str]]] = { + "testimony_draft": { + "approve": {"to": "confirmed", "op": "validated"}, + "reject": {"to": "rejected", "op": "validated"}, + "revoke": {"to": "revoked", "op": "corrected"}, + }, + "decision_record_draft": { + "approve": {"to": "confirmed", "op": "validated"}, + "reject": {"to": "rejected", "op": "validated"}, + "revoke": {"to": "revoked", "op": "corrected"}, + }, + "bridge_request_draft": { + "approve": {"to": "approved_for_execution", "op": "export_validated"}, + "reject": {"to": "rejected", "op": "validated"}, + "revoke": {"to": "revoked", "op": "corrected"}, + }, + "export_payload_manifest": { + "approve": {"to": "approved_for_execution", "op": "export_validated"}, + "reject": {"to": "rejected", "op": "validated"}, + "revoke": {"to": "revoked", "op": "corrected"}, + }, + "allocation_proposal": { + "approve": {"to": "approved_for_execution", "op": "validated"}, + "reject": {"to": "rejected", "op": "validated"}, + "revoke": {"to": "revoked", "op": "corrected"}, + }, + "access_grant_draft": { + "approve": {"to": "approved_for_execution", "op": "policy_checked"}, + "reject": {"to": "rejected", "op": "validated"}, + "revoke": {"to": "revoked", "op": "corrected"}, + }, + "visibility_change_draft": { + "approve": {"to": "approved_for_execution", "op": "policy_checked"}, + "reject": {"to": "rejected", "op": "validated"}, + "revoke": {"to": "revoked", "op": "corrected"}, + }, + "offline_merge_plan": { + "approve": {"to": "approved_for_execution", "op": "merged"}, + "reject": {"to": "rejected", "op": "validated"}, + "revoke": {"to": "revoked", "op": "corrected"}, + }, + "core_change_draft": { + "approve": {"to": "needs_confirmation", "op": "policy_checked"}, + "reject": {"to": "rejected", "op": "validated"}, + "revoke": {"to": "revoked", "op": "corrected"}, + }, +} + + +class RuntimeGuard: + def __init__( + self, + registry_path: str, + envelope_schema_path: str, + artifact_schema_path: str, + consent_event_schema_path: Optional[str] = None, + mode: str = "soft", + ) -> None: + self.registry_path = Path(registry_path) + self.envelope_schema_path = Path(envelope_schema_path) + self.artifact_schema_path = Path(artifact_schema_path) + self.consent_event_schema_path = Path(consent_event_schema_path) if consent_event_schema_path else None + self.mode = mode if mode in {"soft", "strict"} else "soft" + self.registry = self._load_registry() + self.envelope_schema = self._load_json(self.envelope_schema_path) + self.artifact_schema = self._load_json(self.artifact_schema_path) + self.consent_event_schema = ( + self._load_json(self.consent_event_schema_path) if self.consent_event_schema_path else {} + ) + + def _load_json(self, path: Path) -> Dict[str, Any]: + try: + if path.exists(): + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + pass + return {} + + def _load_registry(self) -> Dict[str, Any]: + try: + if self.registry_path.exists(): + raw = yaml.safe_load(self.registry_path.read_text(encoding="utf-8")) or {} + if isinstance(raw, dict): + return raw + except Exception: + pass + return {} + + def reload(self) -> None: + self.registry = self._load_registry() + self.envelope_schema = self._load_json(self.envelope_schema_path) + self.artifact_schema = self._load_json(self.artifact_schema_path) + self.consent_event_schema = ( + self._load_json(self.consent_event_schema_path) if self.consent_event_schema_path else {} + ) + + def get_agent(self, agent_id: str) -> Optional[Dict[str, Any]]: + workers = self.registry.get("workers", []) + if not isinstance(workers, list): + return None + for w in workers: + if isinstance(w, dict) and w.get("agent_id") == agent_id: + return w + mgr = self.registry.get("manager") + if isinstance(mgr, dict) and mgr.get("agent_id") == agent_id: + return mgr + return None + + def _hash(self, text: str) -> str: + return hashlib.sha256((text or "").encode("utf-8")).hexdigest()[:12] + + def build_envelope(self, agent_id: str, prompt: str, metadata: Dict[str, Any]) -> Dict[str, Any]: + md = metadata or {} + request_id = str(md.get("request_id") or f"rt-{int(time.time() * 1000)}-{self._hash(prompt)}") + env = { + "request_id": request_id, + "agent_id": agent_id, + "circle_context": md.get("circle_context") if isinstance(md.get("circle_context"), dict) else {}, + "visibility_level_target": str(md.get("visibility_level_target") or "incircle"), + "sensitivity_flags": md.get("sensitivity_flags") if isinstance(md.get("sensitivity_flags"), list) else [], + "consent_status": str(md.get("consent_status") or "none"), + "allowed_actions": md.get("allowed_actions") if isinstance(md.get("allowed_actions"), list) else ["analyze"], + "expected_output": md.get("expected_output") or "__runtime_default__", + "input_text": prompt or "", + "requires_consent": bool(md.get("requires_consent")), + "export_intent": bool(md.get("export_intent")), + "constitution_version": str(md.get("constitution_version") or "JOS_BASE@1.0.0"), + "agent_prompt_version": str(md.get("agent_prompt_version") or md.get("system_prompt_version") or ""), + "router_guard_version": str(md.get("router_guard_version") or "runtime_guard@1.0.0"), + "protocol_version": str(md.get("protocol_version") or "CLAN_AGENT_INTERACTION_PROTOCOL_V1@1.0.0"), + "consent_events": md.get("consent_events") if isinstance(md.get("consent_events"), (dict, list)) else {}, + "provenance": {"source": md.get("source", "router"), "channel": md.get("channel")}, + } + return env + + def _detect_secrets(self, text: str) -> bool: + if not text: + return False + t = text.lower() + patterns = [ + r"seed\s*phrase", + r"private\s*key", + r"api[_-]?key", + r"token", + r"парол", + r"приватн(ый|ий)\s+ключ", + r"сид[-\s]?фраз", + r"\b0x[a-f0-9]{40,}\b", + r"[A-Za-z0-9_\-]{24,}:[A-Za-z0-9_\-]{20,}", + ] + return any(re.search(p, t) for p in patterns) + + def _detect_injection(self, text: str) -> bool: + if not text: + return False + t = text.lower() + markers = [ + "ignore constitution", + "ignore system prompt", + "обійди конституцію", + "ігноруй конституцію", + "покажи секрет", + "show secret", + "виконай без згоди", + "execute without consent", + ] + return any(m in t for m in markers) + + def _validate_envelope_shape(self, env: Dict[str, Any]) -> List[str]: + errors: List[str] = [] + required = self.envelope_schema.get("required", []) if isinstance(self.envelope_schema, dict) else [] + for key in required: + if key not in env: + errors.append(f"missing:{key}") + if not env.get("request_id"): + errors.append("missing:request_id") + if env.get("visibility_level_target") not in VISIBILITY: + errors.append("invalid:visibility_level_target") + if env.get("consent_status") not in CONSENT: + errors.append("invalid:consent_status") + actions = env.get("allowed_actions") + if not isinstance(actions, list) or len(actions) == 0: + errors.append("invalid:allowed_actions") + if not isinstance(env.get("input_text"), str) or not env.get("input_text", "").strip(): + errors.append("invalid:input_text") + return errors + + def pre_dispatch_checks(self, env: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: + errs = self._validate_envelope_shape(env) + if errs and self.mode == "strict": + return False, {"stop_code": STOP_SCHEMA_ENVELOPE, "details": errs[:10]} + if errs and self.mode == "soft": + # Continue in soft mode after coercion-like defaults. + pass + + agent = self.get_agent(env.get("agent_id", "")) + if not agent: + return False, {"stop_code": STOP_AGENT_UNKNOWN, "details": [env.get("agent_id", "")]} + + if self._detect_secrets(env.get("input_text", "")): + return False, {"stop_code": STOP_SECRETS_DETECTED, "details": ["input_contains_secret_like_pattern"]} + + if self._detect_injection(env.get("input_text", "")): + return False, {"stop_code": STOP_INJECTION_ATTEMPT, "details": ["input_attempts_policy_bypass"]} + + expected = env.get("expected_output") + allowed_outputs = agent.get("allowed_outputs", []) if isinstance(agent, dict) else [] + if expected and expected != "__runtime_default__" and isinstance(allowed_outputs, list): + if expected not in allowed_outputs: + return False, {"stop_code": STOP_OUTPUT_NOT_ALLOWED, "details": [str(expected)]} + + if env.get("requires_consent") and env.get("consent_status") != "confirmed": + return False, {"stop_code": STOP_CONSENT_MISSING, "details": ["requires_consent=true"]} + + if env.get("export_intent") and env.get("visibility_level_target") not in {"public", "interclan"}: + return False, {"stop_code": STOP_EXPORT_PAYLOAD_NOT_PUBLIC, "details": [env.get("visibility_level_target")]} + + return True, {"ok": True, "agent": agent} + + def _validate_artifact_shape(self, artifact: Dict[str, Any]) -> List[str]: + errors: List[str] = [] + required = self.artifact_schema.get("required", []) if isinstance(self.artifact_schema, dict) else [] + for key in required: + if key not in artifact: + errors.append(f"missing:{key}") + if artifact.get("visibility_level") not in VISIBILITY: + errors.append("invalid:visibility_level") + if artifact.get("status") not in { + "draft", + "needs_confirmation", + "waiting_for_consent", + "confirmed", + "proposed", + "approved_for_execution", + "rejected", + "revoked", + }: + errors.append("invalid:status") + return errors + + def _validate_provenance_min(self, provenance: Any) -> List[str]: + if not isinstance(provenance, list) or len(provenance) < 1: + return ["invalid:provenance"] + errors: List[str] = [] + for idx, trail in enumerate(provenance): + if not isinstance(trail, dict): + errors.append(f"invalid:provenance[{idx}]") + continue + if not trail.get("event_id"): + errors.append(f"missing:provenance[{idx}].event_id") + ts = trail.get("ts") + if not isinstance(ts, int) or ts < 0: + errors.append(f"invalid:provenance[{idx}].ts") + actor = trail.get("actor") + if not isinstance(actor, dict) or not actor.get("type") or not actor.get("id"): + errors.append(f"missing:provenance[{idx}].actor") + source = trail.get("source") + if not isinstance(source, dict) or not source.get("channel") or not source.get("request_id"): + errors.append(f"missing:provenance[{idx}].source") + operation = trail.get("operation") + if not isinstance(operation, dict) or not operation.get("op"): + errors.append(f"missing:provenance[{idx}].operation") + versions = trail.get("versions") + if not isinstance(versions, dict) or not versions.get("constitution_version"): + errors.append(f"missing:provenance[{idx}].versions.constitution_version") + return errors + + def post_return_checks(self, env: Dict[str, Any], result: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: + agent = self.get_agent(env.get("agent_id", "")) + allowed_outputs = agent.get("allowed_outputs", []) if isinstance(agent, dict) else [] + target = env.get("visibility_level_target", "incircle") + target_rank = VIS_RANK.get(target, VIS_RANK["incircle"]) + + text_result = str(result.get("result") or "") + if self._detect_secrets(text_result): + return False, {"stop_code": STOP_SECRETS_DETECTED, "details": ["output_contains_secret_like_pattern"]} + + artifacts = result.get("artifacts") + if artifacts is None: + return True, {"ok": True} + if not isinstance(artifacts, list): + return False, {"stop_code": STOP_SCHEMA_ARTIFACT, "details": ["artifacts_not_list"]} + + for art in artifacts: + if not isinstance(art, dict): + return False, {"stop_code": STOP_SCHEMA_ARTIFACT, "details": ["artifact_not_object"]} + errs = self._validate_artifact_shape(art) + if errs: + return False, {"stop_code": STOP_SCHEMA_ARTIFACT, "details": errs[:10]} + prov_errs = self._validate_provenance_min(art.get("provenance")) + if prov_errs: + return False, {"stop_code": STOP_PROVENANCE_INVALID, "details": prov_errs[:10]} + + if isinstance(allowed_outputs, list) and art.get("type") not in allowed_outputs: + return False, {"stop_code": STOP_OUTPUT_NOT_ALLOWED, "details": [str(art.get('type'))]} + + art_rank = VIS_RANK.get(art.get("visibility_level"), target_rank) + if art_rank < target_rank: + return False, { + "stop_code": STOP_VISIBILITY_ESCALATION, + "details": [str(art.get("visibility_level")), str(target)], + } + + if env.get("consent_status") != "confirmed" and art.get("status") == "confirmed": + return False, {"stop_code": STOP_CONSENT_MISSING, "details": ["artifact.confirmed_without_consent"]} + + ok_consent, consent_info = self._validate_confirmed_artifact_consent(env, art) + if not ok_consent: + return False, consent_info + + if self._detect_secrets(json.dumps(art.get("content", ""), ensure_ascii=False)): + return False, {"stop_code": STOP_SECRETS_DETECTED, "details": ["artifact_content_secret_like_pattern"]} + + return True, {"ok": True} + + def _artifact_is_confirmed(self, artifact: Dict[str, Any]) -> bool: + if str(artifact.get("status") or "") == "confirmed": + return True + prov = artifact.get("provenance") + if not isinstance(prov, list): + return False + for tr in prov: + if not isinstance(tr, dict): + continue + ctx = tr.get("context") if isinstance(tr.get("context"), dict) else {} + if ctx.get("consent_status") == "confirmed": + return True + return False + + def _extract_consent_ref(self, artifact: Dict[str, Any]) -> str: + prov = artifact.get("provenance") + if not isinstance(prov, list): + return "" + for tr in reversed(prov): + if not isinstance(tr, dict): + continue + ctx = tr.get("context") if isinstance(tr.get("context"), dict) else {} + ref = str(ctx.get("consent_event_ref") or "").strip() + if ref: + return ref + return "" + + def _get_consent_event(self, env: Dict[str, Any], consent_ref: str) -> Optional[Dict[str, Any]]: + source = env.get("consent_events") + if isinstance(source, dict): + event = source.get(consent_ref) + return event if isinstance(event, dict) else None + if isinstance(source, list): + for ev in source: + if not isinstance(ev, dict): + continue + if str(ev.get("consent_event_id") or "") == consent_ref: + return ev + return None + + def _validate_consent_event_min(self, event: Dict[str, Any]) -> List[str]: + errs: List[str] = [] + if not isinstance(event, dict): + return ["invalid:consent_event"] + if not str(event.get("consent_event_id") or "").startswith("ce_"): + errs.append("invalid:consent_event_id") + ts = event.get("ts") + if not isinstance(ts, int) or ts < 0: + errs.append("invalid:ts") + scope = event.get("scope") + if not isinstance(scope, dict) or not scope.get("circle_id"): + errs.append("missing:scope.circle_id") + decision = event.get("decision") + dtype = str((decision or {}).get("type") or "") + if dtype not in {"approve", "reject", "revoke"}: + errs.append("invalid:decision.type") + target = event.get("target") + if not isinstance(target, dict): + errs.append("invalid:target") + else: + if not target.get("target_type"): + errs.append("missing:target.target_type") + if not isinstance(target.get("artifact_ids"), list) or len(target.get("artifact_ids")) < 1: + errs.append("invalid:target.artifact_ids") + if not target.get("operation"): + errs.append("missing:target.operation") + confirmations = event.get("confirmations") + if dtype in {"approve", "reject", "revoke"}: + if not isinstance(confirmations, list) or len(confirmations) < 1: + errs.append("invalid:confirmations") + quorum = event.get("quorum") + if not isinstance(quorum, dict): + errs.append("invalid:quorum") + else: + required = quorum.get("required") + present = quorum.get("present") + if not isinstance(required, int) or required < 1: + errs.append("invalid:quorum.required") + if not isinstance(present, int) or present < 0: + errs.append("invalid:quorum.present") + versions = event.get("versions") + if not isinstance(versions, dict) or not versions.get("constitution_version"): + errs.append("missing:versions.constitution_version") + provenance = event.get("provenance") + if not isinstance(provenance, dict) or not provenance.get("request_id"): + errs.append("missing:provenance.request_id") + return errs + + def _validate_confirmed_artifact_consent(self, env: Dict[str, Any], artifact: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: + if not self._artifact_is_confirmed(artifact): + return True, {"ok": True} + consent_ref = self._extract_consent_ref(artifact) + if not consent_ref: + return False, {"stop_code": STOP_CONSENT_EVENT_MISSING, "details": ["missing:consent_event_ref"]} + event = self._get_consent_event(env, consent_ref) + if not event: + return False, {"stop_code": STOP_CONSENT_EVENT_MISSING, "details": [f"missing:consent_event:{consent_ref}"]} + event_errs = self._validate_consent_event_min(event) + if event_errs: + return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": event_errs[:10]} + if str((event.get("decision") or {}).get("type") or "") != "approve": + return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["invalid:decision.type_not_approve"]} + art_id = str(artifact.get("id") or artifact.get("artifact_id") or "").strip() + if not art_id: + return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["missing:artifact.id"]} + target_ids = (event.get("target") or {}).get("artifact_ids") + if not isinstance(target_ids, list) or art_id not in [str(x) for x in target_ids]: + return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["artifact_not_in_consent_target"]} + confirmations = event.get("confirmations") if isinstance(event.get("confirmations"), list) else [] + quorum = event.get("quorum") if isinstance(event.get("quorum"), dict) else {} + required = quorum.get("required") + present = quorum.get("present") + if not isinstance(required, int) or not isinstance(present, int): + return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["invalid:quorum"]} + if len(confirmations) < required or present < required: + return False, { + "stop_code": STOP_CONSENT_QUORUM_NOT_MET, + "details": [f"confirmations={len(confirmations)}", f"required={required}", f"present={present}"], + } + return True, {"ok": True} + + def _has_router_stamped_trail(self, provenance: Any) -> bool: + if not isinstance(provenance, list): + return False + for trail in provenance: + if not isinstance(trail, dict): + continue + actor = trail.get("actor") if isinstance(trail.get("actor"), dict) else {} + op = trail.get("operation") if isinstance(trail.get("operation"), dict) else {} + if actor.get("id") == "system:router" and op.get("op") == "stamped": + return True + return False + + def ensure_stamped_trails(self, result: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: + artifacts = result.get("artifacts") if isinstance(result, dict) else None + if artifacts is None: + return True, {"ok": True} + if not isinstance(artifacts, list): + return False, {"stop_code": STOP_SCHEMA_ARTIFACT, "details": ["artifacts_not_list"]} + for idx, art in enumerate(artifacts): + if not isinstance(art, dict): + return False, {"stop_code": STOP_SCHEMA_ARTIFACT, "details": [f"artifact_not_object:{idx}"]} + if not self._has_router_stamped_trail(art.get("provenance")): + return False, { + "stop_code": STOP_PROVENANCE_INVALID, + "details": [f"missing:provenance[{idx}].stamped_trail"], + } + return True, {"ok": True} + + def _normalize_provenance(self, provenance: Any) -> List[Dict[str, Any]]: + if not isinstance(provenance, list): + return [] + out: List[Dict[str, Any]] = [] + seen_ids = set() + seen_fallback = set() + for item in provenance: + if not isinstance(item, dict): + continue + event_id = str(item.get("event_id") or "").strip() + if event_id and event_id in seen_ids: + continue + if not event_id: + fallback_key = "|".join( + [ + str(item.get("ts", "")), + str((item.get("actor") or {}).get("id", "")), + str((item.get("operation") or {}).get("op", "")), + str((item.get("operation") or {}).get("input_hash", "")), + ] + ) + if fallback_key in seen_fallback: + continue + seen_fallback.add(fallback_key) + else: + seen_ids.add(event_id) + out.append(item) + return out + + def _build_stamp_trail(self, env: Dict[str, Any], artifact: Dict[str, Any]) -> Dict[str, Any]: + now_ts = int(time.time()) + rid = str(env.get("request_id") or "") + aid = str(env.get("agent_id") or "") + vlevel = str(artifact.get("visibility_level") or env.get("visibility_level_target") or "incircle") + raw = json.dumps(artifact, ensure_ascii=False, sort_keys=True) + return { + "event_id": f"prov_{uuid.uuid4().hex}", + "ts": now_ts, + "actor": { + "type": "system", + "id": "system:router", + "display": "Runtime Guard", + }, + "source": { + "channel": str((env.get("provenance") or {}).get("channel") or "internal"), + "request_id": rid, + "session_id": str(((env.get("source") or {}) if isinstance(env.get("source"), dict) else {}).get("session_id") or ""), + "message_id": str(((env.get("source") or {}) if isinstance(env.get("source"), dict) else {}).get("message_id") or ""), + }, + "context": { + "circle_id": str(((env.get("circle_context") or {}) if isinstance(env.get("circle_context"), dict) else {}).get("circle_id") or ""), + "circle_name": str(((env.get("circle_context") or {}) if isinstance(env.get("circle_context"), dict) else {}).get("circle_name") or ""), + "gate_level": str(((env.get("circle_context") or {}) if isinstance(env.get("circle_context"), dict) else {}).get("gate_level") or ""), + "visibility_level": vlevel, + "consent_status": str(env.get("consent_status") or "none"), + "consent_event_ref": str((env.get("provenance") or {}).get("consent_event_ref") or ""), + }, + "operation": { + "op": "stamped", + "input_hash": f"sha256:{self._hash(str(env.get('input_text', '')))}", + "output_hash": f"sha256:{self._hash(raw)}", + "notes": f"runtime stamping for agent:{aid}", + }, + "versions": { + "constitution_version": str(env.get("constitution_version") or "JOS_BASE@1.0.0"), + "agent_prompt_version": str(env.get("agent_prompt_version") or ""), + "router_guard_version": str(env.get("router_guard_version") or "runtime_guard@1.0.0"), + "protocol_version": str(env.get("protocol_version") or "CLAN_AGENT_INTERACTION_PROTOCOL_V1@1.0.0"), + }, + "links": { + "parent_artifact_ids": [], + "related_artifact_ids": [], + "external_refs": [], + }, + } + + def stamp_result_artifacts(self, env: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]: + if not isinstance(result, dict): + return result + artifacts = result.get("artifacts") + if not isinstance(artifacts, list): + return result + for artifact in artifacts: + if not isinstance(artifact, dict): + continue + existing = self._normalize_provenance(artifact.get("provenance")) + trail = self._build_stamp_trail(env, artifact) + artifact["provenance"] = self._normalize_provenance(existing + [trail]) + return result + + def artifact_runtime_rows(self, env: Dict[str, Any], result: Dict[str, Any]) -> List[Dict[str, Any]]: + rows: List[Dict[str, Any]] = [] + if not isinstance(result, dict): + return rows + artifacts = result.get("artifacts") + if not isinstance(artifacts, list): + return rows + for art in artifacts: + if not isinstance(art, dict): + continue + status = str(art.get("status") or "draft") + prov = art.get("provenance") + has_prov = isinstance(prov, list) and len(prov) > 0 + visibility = art.get("visibility_level") + has_vis = visibility in VISIBILITY + rows.append( + { + "event": "artifact_emitted", + "request_id": env.get("request_id"), + "agent_id": env.get("agent_id"), + "artifact_type": art.get("type"), + "visibility_level": visibility, + "has_provenance": has_prov, + "provenance_trails_count": len(prov) if isinstance(prov, list) else 0, + "status": status, + "needs_confirmation": status in {"needs_confirmation", "waiting_for_consent"}, + "has_visibility_and_provenance": bool(has_vis and has_prov), + "constitution_version": env.get("constitution_version"), + "router_guard_version": env.get("router_guard_version"), + } + ) + return rows + + def consent_runtime_rows(self, env: Dict[str, Any], result: Dict[str, Any]) -> List[Dict[str, Any]]: + rows: List[Dict[str, Any]] = [] + if not isinstance(result, dict): + return rows + artifacts = result.get("artifacts") + if not isinstance(artifacts, list): + return rows + now_ts = int(time.time()) + for art in artifacts: + if not isinstance(art, dict): + continue + if not self._artifact_is_confirmed(art): + continue + consent_ref = self._extract_consent_ref(art) + if not consent_ref: + continue + event = self._get_consent_event(env, consent_ref) + if not isinstance(event, dict): + continue + ok, _ = self._validate_confirmed_artifact_consent(env, art) + if not ok: + continue + target = event.get("target") if isinstance(event.get("target"), dict) else {} + quorum = event.get("quorum") if isinstance(event.get("quorum"), dict) else {} + versions = event.get("versions") if isinstance(event.get("versions"), dict) else {} + confirmations = event.get("confirmations") if isinstance(event.get("confirmations"), list) else [] + rows.append( + { + "event": "consent_applied", + "ts": now_ts, + "request_id": env.get("request_id"), + "agent_id": env.get("agent_id"), + "artifact_id": str(art.get("id") or art.get("artifact_id") or ""), + "artifact_type": art.get("type"), + "artifact_status": art.get("status"), + "visibility_level": art.get("visibility_level"), + "consent_event_id": str(event.get("consent_event_id") or consent_ref), + "consent_decision": "approve", + "operation": target.get("operation"), + "target_type": target.get("target_type"), + "confirmations_count": len(confirmations), + "quorum_required": quorum.get("required"), + "quorum_present": quorum.get("present"), + "constitution_version": versions.get("constitution_version") or env.get("constitution_version"), + "protocol_version": versions.get("protocol_version") or env.get("protocol_version"), + "router_guard_version": env.get("router_guard_version"), + } + ) + return rows + + def _consent_transition(self, artifact_type: str, decision_type: str) -> Optional[Dict[str, str]]: + if artifact_type in CONSENT_TRANSITION_MAP: + return CONSENT_TRANSITION_MAP[artifact_type].get(decision_type) + if decision_type == "approve": + return {"to": "needs_confirmation", "op": "validated"} + if decision_type == "reject": + return {"to": "rejected", "op": "validated"} + if decision_type == "revoke": + return {"to": "revoked", "op": "corrected"} + return None + + def _has_consent_application_trail(self, artifact: Dict[str, Any], consent_event_id: str) -> bool: + provenance = artifact.get("provenance") + if not isinstance(provenance, list): + return False + for trail in provenance: + if not isinstance(trail, dict): + continue + ctx = trail.get("context") if isinstance(trail.get("context"), dict) else {} + op = trail.get("operation") if isinstance(trail.get("operation"), dict) else {} + if ( + str(ctx.get("consent_event_ref") or "") == consent_event_id + and str(op.get("op") or "") in {"validated", "export_validated", "policy_checked", "merged", "corrected"} + ): + return True + return False + + def _has_prior_approve_trail(self, artifact: Dict[str, Any]) -> bool: + provenance = artifact.get("provenance") + if not isinstance(provenance, list): + return False + for trail in provenance: + if not isinstance(trail, dict): + continue + ctx = trail.get("context") if isinstance(trail.get("context"), dict) else {} + op = trail.get("operation") if isinstance(trail.get("operation"), dict) else {} + if ctx.get("consent_status") == "confirmed" and str(op.get("op") or "") in { + "validated", + "export_validated", + "policy_checked", + "merged", + }: + return True + return False + + def apply_consent_event( + self, + consent_event: Dict[str, Any], + artifacts_by_id: Dict[str, Dict[str, Any]], + now_ts: Optional[int] = None, + applier_actor_id: str = "system:consent-applier", + ) -> Tuple[bool, Dict[str, Any]]: + ts = int(now_ts or time.time()) + event_errs = self._validate_consent_event_min(consent_event) + if event_errs: + return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": event_errs[:10]} + + decision = consent_event.get("decision") if isinstance(consent_event.get("decision"), dict) else {} + decision_type = str(decision.get("type") or "") + expires_at = decision.get("expires_at") + if isinstance(expires_at, int) and expires_at < ts: + return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["consent_event_expired"]} + + confirmations = consent_event.get("confirmations") if isinstance(consent_event.get("confirmations"), list) else [] + quorum = consent_event.get("quorum") if isinstance(consent_event.get("quorum"), dict) else {} + required = quorum.get("required") + present = quorum.get("present") + if decision_type == "approve": + if not isinstance(required, int) or not isinstance(present, int): + return False, {"stop_code": STOP_CONSENT_EVENT_INVALID, "details": ["invalid:quorum"]} + if len(confirmations) < required or present < required: + return False, { + "stop_code": STOP_CONSENT_QUORUM_NOT_MET, + "details": [f"confirmations={len(confirmations)}", f"required={required}", f"present={present}"], + } + + target = consent_event.get("target") if isinstance(consent_event.get("target"), dict) else {} + target_ids = target.get("artifact_ids") if isinstance(target.get("artifact_ids"), list) else [] + consent_event_id = str(consent_event.get("consent_event_id") or "") + target_operation = str(target.get("operation") or "") + + updated: List[Dict[str, Any]] = [] + logs: List[Dict[str, Any]] = [] + + for artifact_id in [str(x) for x in target_ids]: + artifact = artifacts_by_id.get(artifact_id) + if not isinstance(artifact, dict): + return False, { + "stop_code": STOP_CONSENT_EVENT_MISSING, + "details": [f"artifact_missing:{artifact_id}"], + } + + if self._has_consent_application_trail(artifact, consent_event_id): + continue + + if decision_type == "approve" and str(artifact.get("status") or "") == "rejected": + return False, { + "stop_code": STOP_CONSENT_EVENT_INVALID, + "details": [f"one_way_violation:{artifact_id}"], + } + + if decision_type == "revoke" and not self._has_prior_approve_trail(artifact): + return False, { + "stop_code": STOP_CONSENT_EVENT_INVALID, + "details": [f"revoke_without_prior_approve:{artifact_id}"], + } + + transition = self._consent_transition(str(artifact.get("type") or ""), decision_type) + if not transition: + return False, { + "stop_code": STOP_CONSENT_EVENT_INVALID, + "details": [f"unsupported_transition:{artifact.get('type')}:{decision_type}"], + } + + from_status = str(artifact.get("status") or "draft") + to_status = transition["to"] + op = transition["op"] + consent_status = "confirmed" if decision_type == "approve" else "none" + + trail = { + "event_id": f"prov_{uuid.uuid4().hex}", + "ts": ts, + "actor": {"type": "system", "id": applier_actor_id}, + "source": { + "channel": str((consent_event.get("provenance") or {}).get("channel") or "internal"), + "request_id": str((consent_event.get("provenance") or {}).get("request_id") or ""), + }, + "context": { + "visibility_level": str(artifact.get("visibility_level") or "incircle"), + "consent_status": consent_status, + "consent_event_ref": consent_event_id, + }, + "operation": { + "op": op, + "input_hash": str((consent_event.get("provenance") or {}).get("input_hash") or ""), + "notes": f"consent decision={decision_type}; op={target_operation}", + }, + "versions": { + "constitution_version": str((consent_event.get("versions") or {}).get("constitution_version") or ""), + "protocol_version": str((consent_event.get("versions") or {}).get("protocol_version") or ""), + "router_guard_version": "runtime_guard@1.0.0", + }, + "links": { + "related_artifact_ids": [x for x in [str(i) for i in target_ids] if x != artifact_id], + "external_refs": [consent_event_id], + }, + } + + provenance = self._normalize_provenance(artifact.get("provenance")) + artifact["provenance"] = self._normalize_provenance(provenance + [trail]) + artifact["status"] = to_status + + updated.append(artifact) + logs.append( + { + "event": "artifact_state_transition", + "ts": ts, + "request_id": str((consent_event.get("provenance") or {}).get("request_id") or ""), + "artifact_id": artifact_id, + "artifact_type": artifact.get("type"), + "from_status": from_status, + "to_status": to_status, + "op": op, + "consent_event_id": consent_event_id, + } + ) + + return True, {"ok": True, "updated_artifacts": updated, "artifact_state_transition_rows": logs} + + def stop_payload(self, env: Dict[str, Any], stop: Dict[str, Any]) -> Dict[str, Any]: + stop_code = stop.get("stop_code", "STOP_UNKNOWN") + details = stop.get("details", []) + next_step = "Потрібне підтвердження кола або уточнення контексту." + if stop_code in {STOP_SECRETS_DETECTED, STOP_INJECTION_ATTEMPT}: + next_step = "Зупинено з міркувань безпеки. Приберіть секрети/небезпечні інструкції та повторіть запит." + elif stop_code in {STOP_CONSENT_MISSING, STOP_EXPORT_PAYLOAD_NOT_PUBLIC}: + next_step = "Потрібен Consent Event або підвищення рівня видимості за процедурою." + elif stop_code in {STOP_CONSENT_EVENT_INVALID, STOP_CONSENT_EVENT_MISSING, STOP_CONSENT_QUORUM_NOT_MET}: + next_step = "Потрібна валідна ConsentEvent-подія з коректним кворумом і прив’язкою до артефакта." + elif stop_code == STOP_OUTPUT_NOT_ALLOWED: + next_step = "Виправте expected_output відповідно до agents_registry.yaml." + return { + "ok": False, + "stop_code": stop_code, + "details": details, + "request_id": env.get("request_id"), + "agent_id": env.get("agent_id"), + "next_step": next_step, + "timestamp": int(time.time()), + "input_hash": self._hash(env.get("input_text", "")), + }