399 lines
13 KiB
Python
399 lines
13 KiB
Python
import hashlib
|
|
import json
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import psycopg
|
|
from psycopg.rows import dict_row
|
|
|
|
|
|
PG_DSN = os.getenv("CLAN_PG_DSN", "postgresql://daarion:DaarionDB2026!@dagi-postgres:5432/daarion_main")
|
|
BATCH_SIZE = int(os.getenv("CLAN_OUTBOX_BATCH_SIZE", "10"))
|
|
POLL_INTERVAL = float(os.getenv("CLAN_OUTBOX_POLL_INTERVAL_SEC", "1.0"))
|
|
MAX_ARTIFACT_CAS_RETRIES = int(os.getenv("CLAN_OUTBOX_MAX_CAS_RETRIES", "5"))
|
|
APPLIER_ACTOR_ID = os.getenv("CLAN_CONSENT_APPLIER_ACTOR_ID", "system:consent-applier")
|
|
|
|
CONSENT_TRANSITION_MAP = {
|
|
"bridge_request_draft": {
|
|
"approve": ("approved_for_execution", "export_validated"),
|
|
"reject": ("rejected", "validated"),
|
|
"revoke": ("revoked", "corrected"),
|
|
},
|
|
"allocation_proposal": {
|
|
"approve": ("approved_for_execution", "validated"),
|
|
"reject": ("rejected", "validated"),
|
|
"revoke": ("revoked", "corrected"),
|
|
},
|
|
"access_grant_draft": {
|
|
"approve": ("approved_for_execution", "policy_checked"),
|
|
"reject": ("rejected", "validated"),
|
|
"revoke": ("revoked", "corrected"),
|
|
},
|
|
"visibility_change_draft": {
|
|
"approve": ("approved_for_execution", "policy_checked"),
|
|
"reject": ("rejected", "validated"),
|
|
"revoke": ("revoked", "corrected"),
|
|
},
|
|
"offline_merge_plan": {
|
|
"approve": ("approved_for_execution", "merged"),
|
|
"reject": ("rejected", "validated"),
|
|
"revoke": ("revoked", "corrected"),
|
|
},
|
|
"core_change_draft": {
|
|
"approve": ("needs_confirmation", "policy_checked"),
|
|
"reject": ("rejected", "validated"),
|
|
"revoke": ("revoked", "corrected"),
|
|
},
|
|
"testimony_draft": {
|
|
"approve": ("confirmed", "validated"),
|
|
"reject": ("rejected", "validated"),
|
|
"revoke": ("revoked", "corrected"),
|
|
},
|
|
}
|
|
|
|
|
|
def _id(prefix: str, seed: str) -> str:
|
|
h = hashlib.sha256(seed.encode("utf-8")).hexdigest()[:20]
|
|
return f"{prefix}_{h}"
|
|
|
|
|
|
def _now_ts() -> int:
|
|
return int(time.time())
|
|
|
|
|
|
def _provenance_has_consent(provenance: Any, consent_event_id: str) -> bool:
|
|
if not isinstance(provenance, list):
|
|
return False
|
|
for tr in provenance:
|
|
ctx = (tr or {}).get("context") or {}
|
|
op = ((tr or {}).get("operation") or {}).get("op")
|
|
if ctx.get("consent_event_ref") == consent_event_id and op in {
|
|
"validated",
|
|
"export_validated",
|
|
"policy_checked",
|
|
"merged",
|
|
"corrected",
|
|
}:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _build_trail(
|
|
consent_event: Dict[str, Any],
|
|
request_id: str,
|
|
consent_event_id: str,
|
|
decision_type: str,
|
|
op: str,
|
|
visibility_level: str,
|
|
versions: Dict[str, Any],
|
|
) -> Dict[str, Any]:
|
|
return {
|
|
"event_id": _id("prov", f"{consent_event_id}:{op}:{_now_ts()}"),
|
|
"ts": _now_ts(),
|
|
"actor": {"type": "system", "id": APPLIER_ACTOR_ID},
|
|
"source": {"channel": "internal", "request_id": request_id},
|
|
"context": {
|
|
"visibility_level": visibility_level,
|
|
"consent_status": "confirmed" if decision_type == "approve" else "none",
|
|
"consent_event_ref": consent_event_id,
|
|
},
|
|
"operation": {"op": op},
|
|
"versions": versions,
|
|
"links": {},
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class OutboxRow:
|
|
outbox_id: str
|
|
consent_event_id: str
|
|
target_artifact_ids: List[str]
|
|
request_id: str
|
|
|
|
|
|
def _fetch_pending_outbox(cur, limit: int) -> List[OutboxRow]:
|
|
cur.execute(
|
|
"""
|
|
select outbox_id, consent_event_id, target_artifact_ids, request_id
|
|
from clan.clan_outbox
|
|
where status='pending'
|
|
order by created_ts
|
|
limit %s
|
|
for update skip locked
|
|
""",
|
|
(limit,),
|
|
)
|
|
rows = cur.fetchall() or []
|
|
return [
|
|
OutboxRow(
|
|
outbox_id=r["outbox_id"],
|
|
consent_event_id=r["consent_event_id"],
|
|
target_artifact_ids=list(r["target_artifact_ids"] or []),
|
|
request_id=r["request_id"],
|
|
)
|
|
for r in rows
|
|
]
|
|
|
|
|
|
def _set_outbox_error(cur, outbox_id: str, error: str) -> None:
|
|
cur.execute(
|
|
"""
|
|
update clan.clan_outbox
|
|
set attempts = attempts + 1,
|
|
last_error = %s,
|
|
updated_ts = now()
|
|
where outbox_id = %s
|
|
""",
|
|
(error[:800], outbox_id),
|
|
)
|
|
|
|
|
|
def _mark_outbox_done(cur, outbox_id: str) -> None:
|
|
cur.execute(
|
|
"""
|
|
update clan.clan_outbox
|
|
set status='done', updated_ts=now()
|
|
where outbox_id=%s and status='pending'
|
|
""",
|
|
(outbox_id,),
|
|
)
|
|
|
|
|
|
def _get_consent_event(cur, consent_event_id: str) -> Optional[Dict[str, Any]]:
|
|
cur.execute(
|
|
"select payload from clan.clan_consent_events where consent_event_id=%s",
|
|
(consent_event_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return row["payload"] if row else None
|
|
|
|
|
|
def _get_artifact(cur, artifact_id: str) -> Optional[Dict[str, Any]]:
|
|
cur.execute(
|
|
"""
|
|
select artifact_id, artifact_type, status, visibility_level, payload, provenance, version
|
|
from clan.clan_artifacts
|
|
where artifact_id=%s
|
|
""",
|
|
(artifact_id,),
|
|
)
|
|
return cur.fetchone()
|
|
|
|
|
|
def _cas_update_artifact(
|
|
cur,
|
|
artifact_id: str,
|
|
expected_version: int,
|
|
new_status: str,
|
|
new_payload: Dict[str, Any],
|
|
new_provenance: Any,
|
|
) -> bool:
|
|
cur.execute(
|
|
"""
|
|
update clan.clan_artifacts
|
|
set status=%s,
|
|
payload=%s::jsonb,
|
|
provenance=%s::jsonb,
|
|
version=version+1,
|
|
updated_ts=now()
|
|
where artifact_id=%s and version=%s
|
|
""",
|
|
(
|
|
new_status,
|
|
json.dumps(new_payload, ensure_ascii=False),
|
|
json.dumps(new_provenance, ensure_ascii=False),
|
|
artifact_id,
|
|
expected_version,
|
|
),
|
|
)
|
|
return cur.rowcount == 1
|
|
|
|
|
|
def _insert_transition(
|
|
cur,
|
|
*,
|
|
consent_event_id: str,
|
|
decision_type: str,
|
|
request_id: str,
|
|
artifact_id: str,
|
|
artifact_type: str,
|
|
visibility_level: str,
|
|
from_status: str,
|
|
to_status: str,
|
|
op: str,
|
|
versions: Dict[str, Any],
|
|
) -> None:
|
|
transition_id = _id("tr", f"{consent_event_id}:{artifact_id}:{decision_type}:{to_status}:{op}")
|
|
cur.execute(
|
|
"""
|
|
insert into clan.clan_state_transitions(
|
|
transition_id, artifact_id, artifact_type, from_status, to_status, op,
|
|
consent_event_id, decision_type, request_id, visibility_level, versions
|
|
) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s::jsonb)
|
|
on conflict (transition_id) do nothing
|
|
""",
|
|
(
|
|
transition_id,
|
|
artifact_id,
|
|
artifact_type,
|
|
from_status,
|
|
to_status,
|
|
op,
|
|
consent_event_id,
|
|
decision_type,
|
|
request_id,
|
|
visibility_level,
|
|
json.dumps(versions, ensure_ascii=False),
|
|
),
|
|
)
|
|
|
|
|
|
def _validate_consent_min(consent_event: Dict[str, Any]) -> Tuple[bool, str]:
|
|
try:
|
|
decision_type = consent_event["decision"]["type"]
|
|
if decision_type not in {"approve", "reject", "revoke"}:
|
|
return False, "invalid decision.type"
|
|
target_ids = (consent_event.get("target") or {}).get("artifact_ids") or []
|
|
if not target_ids:
|
|
return False, "empty target.artifact_ids"
|
|
|
|
expires_at = (consent_event.get("decision") or {}).get("expires_at")
|
|
if expires_at is not None and int(expires_at) < _now_ts():
|
|
return False, "consent expired"
|
|
|
|
if decision_type == "approve":
|
|
confirmations = consent_event.get("confirmations") or []
|
|
quorum = consent_event.get("quorum") or {}
|
|
required = int(quorum.get("required", 1))
|
|
present = int(quorum.get("present", 0))
|
|
if len(confirmations) < required or present < required:
|
|
return False, "quorum not met"
|
|
return True, ""
|
|
except Exception as e:
|
|
return False, f"validation_error:{e}"
|
|
|
|
|
|
def _apply_outbox(cur, outbox: OutboxRow) -> None:
|
|
consent = _get_consent_event(cur, outbox.consent_event_id)
|
|
if not consent:
|
|
_set_outbox_error(cur, outbox.outbox_id, "STOP_CONSENT_EVENT_MISSING: consent not found")
|
|
return
|
|
|
|
ok, err = _validate_consent_min(consent)
|
|
if not ok:
|
|
_set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_INVALID: {err}")
|
|
return
|
|
|
|
decision_type = (consent.get("decision") or {}).get("type")
|
|
versions = {
|
|
"constitution_version": str((consent.get("versions") or {}).get("constitution_version") or "unknown"),
|
|
"protocol_version": str((consent.get("versions") or {}).get("protocol_version") or "unknown"),
|
|
"router_guard_version": str((consent.get("versions") or {}).get("router_guard_version") or "unknown"),
|
|
}
|
|
|
|
for artifact_id in outbox.target_artifact_ids:
|
|
artifact = _get_artifact(cur, artifact_id)
|
|
if not artifact:
|
|
_set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_MISSING: artifact {artifact_id} not found")
|
|
return
|
|
|
|
if artifact["artifact_type"] not in CONSENT_TRANSITION_MAP:
|
|
_set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_INVALID: no transition for {artifact['artifact_type']}")
|
|
return
|
|
|
|
if decision_type not in CONSENT_TRANSITION_MAP[artifact["artifact_type"]]:
|
|
_set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_INVALID: decision not mapped {decision_type}")
|
|
return
|
|
|
|
from_status = artifact["status"]
|
|
if from_status == "rejected" and decision_type == "approve":
|
|
_set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_INVALID: one-way violation {artifact_id}")
|
|
return
|
|
|
|
provenance = artifact["provenance"]
|
|
if _provenance_has_consent(provenance, outbox.consent_event_id):
|
|
continue
|
|
|
|
to_status, op = CONSENT_TRANSITION_MAP[artifact["artifact_type"]][decision_type]
|
|
trail = _build_trail(
|
|
consent_event=consent,
|
|
request_id=outbox.request_id,
|
|
consent_event_id=outbox.consent_event_id,
|
|
decision_type=decision_type,
|
|
op=op,
|
|
visibility_level=artifact["visibility_level"],
|
|
versions=versions,
|
|
)
|
|
|
|
payload = artifact["payload"]
|
|
expected_version = int(artifact["version"])
|
|
new_provenance = (provenance if isinstance(provenance, list) else []) + [trail]
|
|
|
|
success = False
|
|
for _ in range(MAX_ARTIFACT_CAS_RETRIES):
|
|
success = _cas_update_artifact(
|
|
cur,
|
|
artifact_id=artifact_id,
|
|
expected_version=expected_version,
|
|
new_status=to_status,
|
|
new_payload=payload,
|
|
new_provenance=new_provenance,
|
|
)
|
|
if success:
|
|
break
|
|
|
|
current = _get_artifact(cur, artifact_id)
|
|
if not current:
|
|
break
|
|
expected_version = int(current["version"])
|
|
payload = current["payload"]
|
|
provenance = current["provenance"]
|
|
if _provenance_has_consent(provenance, outbox.consent_event_id):
|
|
success = True
|
|
break
|
|
new_provenance = (provenance if isinstance(provenance, list) else []) + [trail]
|
|
|
|
if not success:
|
|
_set_outbox_error(cur, outbox.outbox_id, f"CAS conflict exceeded for {artifact_id}")
|
|
return
|
|
|
|
_insert_transition(
|
|
cur,
|
|
consent_event_id=outbox.consent_event_id,
|
|
decision_type=decision_type,
|
|
request_id=outbox.request_id,
|
|
artifact_id=artifact_id,
|
|
artifact_type=artifact["artifact_type"],
|
|
visibility_level=artifact["visibility_level"],
|
|
from_status=from_status,
|
|
to_status=to_status,
|
|
op=op,
|
|
versions=versions,
|
|
)
|
|
|
|
_mark_outbox_done(cur, outbox.outbox_id)
|
|
|
|
|
|
def main() -> None:
|
|
while True:
|
|
try:
|
|
with psycopg.connect(PG_DSN, row_factory=dict_row) as conn:
|
|
conn.autocommit = False
|
|
with conn.cursor() as cur:
|
|
outboxes = _fetch_pending_outbox(cur, BATCH_SIZE)
|
|
if not outboxes:
|
|
conn.rollback()
|
|
time.sleep(POLL_INTERVAL)
|
|
continue
|
|
for ob in outboxes:
|
|
_apply_outbox(cur, ob)
|
|
conn.commit()
|
|
except Exception:
|
|
time.sleep(POLL_INTERVAL)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|