# File: microdao-daarion/services/clan-consent-adapter/clan_consent_outbox_worker.py
# Listing metadata: 399 lines, 13 KiB, Python

import hashlib
import json
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import psycopg
from psycopg.rows import dict_row
# Connection string for the clan database; CLAN_PG_DSN overrides the fallback.
# NOTE(review): the fallback DSN embeds a password in source control. Consider
# requiring CLAN_PG_DSN in deployed environments and rotating this credential.
PG_DSN = os.environ.get("CLAN_PG_DSN", "postgresql://daarion:DaarionDB2026!@dagi-postgres:5432/daarion_main")

# Maximum number of pending outbox rows claimed per polling pass.
BATCH_SIZE = int(os.environ.get("CLAN_OUTBOX_BATCH_SIZE", "10"))

# Seconds to sleep when there is nothing to do or after a failed pass.
POLL_INTERVAL = float(os.environ.get("CLAN_OUTBOX_POLL_INTERVAL_SEC", "1.0"))

# Upper bound on compare-and-swap attempts per artifact update.
MAX_ARTIFACT_CAS_RETRIES = int(os.environ.get("CLAN_OUTBOX_MAX_CAS_RETRIES", "5"))

# Actor id stamped on every provenance entry written by this worker.
APPLIER_ACTOR_ID = os.environ.get("CLAN_CONSENT_APPLIER_ACTOR_ID", "system:consent-applier")

# (to_status, provenance op) produced by an "approve" decision, per artifact
# type. Only the approve outcome varies between artifact types.
_APPROVE_OUTCOMES = {
    "bridge_request_draft": ("approved_for_execution", "export_validated"),
    "allocation_proposal": ("approved_for_execution", "validated"),
    "access_grant_draft": ("approved_for_execution", "policy_checked"),
    "visibility_change_draft": ("approved_for_execution", "policy_checked"),
    "offline_merge_plan": ("approved_for_execution", "merged"),
    "core_change_draft": ("needs_confirmation", "policy_checked"),
    "testimony_draft": ("confirmed", "validated"),
}

# artifact_type -> decision type -> (to_status, provenance op).
# "reject" and "revoke" map to the same outcome for every artifact type.
CONSENT_TRANSITION_MAP = {
    artifact_type: {
        "approve": approve_outcome,
        "reject": ("rejected", "validated"),
        "revoke": ("revoked", "corrected"),
    }
    for artifact_type, approve_outcome in _APPROVE_OUTCOMES.items()
}
def _id(prefix: str, seed: str) -> str:
    """Derive a deterministic identifier from *seed*.

    The result is ``<prefix>_`` followed by the first 20 hexadecimal
    characters of the SHA-256 digest of the UTF-8 encoded seed, so equal
    seeds always yield equal identifiers.
    """
    digest = hashlib.sha256(seed.encode("utf-8")).hexdigest()
    return "{}_{}".format(prefix, digest[:20])
def _now_ts() -> int:
    """Return the current wall-clock time as whole seconds since the epoch."""
    seconds = time.time()
    return int(seconds)
def _provenance_has_consent(provenance: Any, consent_event_id: str) -> bool:
    """Tell whether *provenance* already records *consent_event_id* as applied.

    An entry counts when its ``context.consent_event_ref`` equals
    *consent_event_id* and its ``operation.op`` is one of the ops this worker
    writes when applying a consent decision.

    Args:
        provenance: Provenance value loaded from an artifact row. Anything
            other than a list is treated as "no provenance".
        consent_event_id: Consent event identifier to look for.

    Returns:
        True if a matching entry exists, False otherwise.
    """
    if not isinstance(provenance, list):
        return False

    applied_ops = {
        "validated",
        "export_validated",
        "policy_checked",
        "merged",
        "corrected",
    }
    for entry in provenance:
        # Provenance is free-form JSON: skip malformed entries instead of
        # raising, so one bad trail element cannot abort the whole batch.
        if not isinstance(entry, dict):
            continue
        context = entry.get("context") or {}
        operation = entry.get("operation") or {}
        if not (isinstance(context, dict) and isinstance(operation, dict)):
            continue
        op = operation.get("op")
        if not isinstance(op, str):
            continue
        if context.get("consent_event_ref") == consent_event_id and op in applied_ops:
            return True
    return False
def _build_trail(
    consent_event: Dict[str, Any],
    request_id: str,
    consent_event_id: str,
    decision_type: str,
    op: str,
    visibility_level: str,
    versions: Dict[str, Any],
) -> Dict[str, Any]:
    """Build the provenance entry recorded when a consent decision is applied.

    Args:
        consent_event: Consent event payload. Not read here; kept so the
            call signature stays stable.
        request_id: Request id copied into ``source.request_id``.
        consent_event_id: Stored as ``context.consent_event_ref`` and used in
            the seed of the entry's ``event_id``.
        decision_type: ``"approve"`` yields ``consent_status == "confirmed"``;
            any other value yields ``"none"``.
        op: Operation name stored under ``operation.op``.
        visibility_level: Copied into ``context.visibility_level``.
        versions: Stored as-is under ``versions``.

    Returns:
        A dict describing the provenance entry, attributed to the system
        actor ``APPLIER_ACTOR_ID``.
    """
    # Read the clock once so the timestamp hashed into event_id is exactly
    # the one stored in "ts", even across a second boundary.
    now = _now_ts()
    consent_status = "confirmed" if decision_type == "approve" else "none"
    return {
        "event_id": _id("prov", f"{consent_event_id}:{op}:{now}"),
        "ts": now,
        "actor": {"type": "system", "id": APPLIER_ACTOR_ID},
        "source": {"channel": "internal", "request_id": request_id},
        "context": {
            "visibility_level": visibility_level,
            "consent_status": consent_status,
            "consent_event_ref": consent_event_id,
        },
        "operation": {"op": op},
        "versions": versions,
        "links": {},
    }
@dataclass
class OutboxRow:
    """In-memory copy of the outbox columns this worker reads."""

    # Identifier of the outbox row itself.
    outbox_id: str
    # Identifier of the consent event this row asks the worker to apply.
    consent_event_id: str
    # Identifiers of the artifacts the decision should be applied to.
    target_artifact_ids: List[str]
    # Request id propagated into provenance and transition records.
    request_id: str
def _fetch_pending_outbox(cur, limit: int) -> List[OutboxRow]:
    """Claim up to *limit* pending outbox rows, oldest first.

    The query uses ``for update skip locked``, so rows already locked by
    another transaction are skipped rather than waited on. The row locks are
    held by the transaction that owns *cur*.

    Args:
        cur: Open cursor whose rows support access by column name.
        limit: Maximum number of rows to return.

    Returns:
        The claimed rows as ``OutboxRow`` objects; an empty list when none.
    """
    query = """
        select outbox_id, consent_event_id, target_artifact_ids, request_id
        from clan.clan_outbox
        where status='pending'
        order by created_ts
        limit %s
        for update skip locked
        """
    cur.execute(query, (limit,))

    claimed: List[OutboxRow] = []
    for record in cur.fetchall() or []:
        claimed.append(
            OutboxRow(
                outbox_id=record["outbox_id"],
                consent_event_id=record["consent_event_id"],
                # A NULL column becomes an empty list.
                target_artifact_ids=list(record["target_artifact_ids"] or []),
                request_id=record["request_id"],
            )
        )
    return claimed
def _set_outbox_error(cur, outbox_id: str, error: str) -> None:
    """Record a failed attempt on an outbox row.

    Increments ``attempts``, stores *error* truncated to 800 characters in
    ``last_error`` and refreshes ``updated_ts``. The row's ``status`` is not
    modified by this statement.
    """
    truncated = error[:800]
    statement = """
        update clan.clan_outbox
        set attempts = attempts + 1,
        last_error = %s,
        updated_ts = now()
        where outbox_id = %s
        """
    cur.execute(statement, (truncated, outbox_id))
def _mark_outbox_done(cur, outbox_id: str) -> None:
    """Flip an outbox row from ``pending`` to ``done``.

    The ``status='pending'`` guard in the statement means a row in any other
    status is left untouched.
    """
    statement = """
        update clan.clan_outbox
        set status='done', updated_ts=now()
        where outbox_id=%s and status='pending'
        """
    params = (outbox_id,)
    cur.execute(statement, params)
def _get_consent_event(cur, consent_event_id: str) -> Optional[Dict[str, Any]]:
    """Load the stored payload of a consent event.

    Returns:
        The ``payload`` column of the matching row, or None when the cursor
        yields no row for *consent_event_id*.
    """
    cur.execute(
        "select payload from clan.clan_consent_events where consent_event_id=%s",
        (consent_event_id,),
    )
    record = cur.fetchone()
    if not record:
        return None
    return record["payload"]
def _get_artifact(cur, artifact_id: str) -> Optional[Dict[str, Any]]:
    """Read the current state of one artifact.

    Returns:
        Whatever the cursor's ``fetchone()`` yields for the lookup: the row
        with ``artifact_id``, ``artifact_type``, ``status``,
        ``visibility_level``, ``payload``, ``provenance`` and ``version``,
        or None when no row matches.
    """
    query = """
        select artifact_id, artifact_type, status, visibility_level, payload, provenance, version
        from clan.clan_artifacts
        where artifact_id=%s
        """
    cur.execute(query, (artifact_id,))
    row = cur.fetchone()
    return row
def _cas_update_artifact(
    cur,
    artifact_id: str,
    expected_version: int,
    new_status: str,
    new_payload: Dict[str, Any],
    new_provenance: Any,
) -> bool:
    """Update an artifact only if its version is still *expected_version*.

    On a match the row gets the new status, payload and provenance (both
    serialized as JSON), its ``version`` is incremented and ``updated_ts``
    refreshed.

    Returns:
        True when exactly one row was updated, False otherwise.
    """
    payload_json = json.dumps(new_payload, ensure_ascii=False)
    provenance_json = json.dumps(new_provenance, ensure_ascii=False)
    statement = """
        update clan.clan_artifacts
        set status=%s,
        payload=%s::jsonb,
        provenance=%s::jsonb,
        version=version+1,
        updated_ts=now()
        where artifact_id=%s and version=%s
        """
    cur.execute(
        statement,
        (new_status, payload_json, provenance_json, artifact_id, expected_version),
    )
    updated_rows = cur.rowcount
    return updated_rows == 1
def _insert_transition(
    cur,
    *,
    consent_event_id: str,
    decision_type: str,
    request_id: str,
    artifact_id: str,
    artifact_type: str,
    visibility_level: str,
    from_status: str,
    to_status: str,
    op: str,
    versions: Dict[str, Any],
) -> None:
    """Insert one row into ``clan.clan_state_transitions``.

    The transition id is derived from the consent event, artifact, decision,
    target status and op. Combined with ``on conflict (transition_id) do
    nothing``, inserting the same transition again is a no-op.
    """
    seed = f"{consent_event_id}:{artifact_id}:{decision_type}:{to_status}:{op}"
    transition_id = _id("tr", seed)
    versions_json = json.dumps(versions, ensure_ascii=False)
    statement = """
        insert into clan.clan_state_transitions(
        transition_id, artifact_id, artifact_type, from_status, to_status, op,
        consent_event_id, decision_type, request_id, visibility_level, versions
        ) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s::jsonb)
        on conflict (transition_id) do nothing
        """
    # Parameter order mirrors the column list in the statement above.
    row = (
        transition_id,
        artifact_id,
        artifact_type,
        from_status,
        to_status,
        op,
        consent_event_id,
        decision_type,
        request_id,
        visibility_level,
        versions_json,
    )
    cur.execute(statement, row)
def _validate_consent_min(consent_event: Dict[str, Any]) -> Tuple[bool, str]:
    """Run the minimal checks a consent event must pass before it is applied.

    Checks, in order: the decision type is approve/reject/revoke; the event
    names at least one target artifact; the decision has not expired; and,
    for approvals only, both the number of confirmations and
    ``quorum.present`` reach ``quorum.required`` (default 1).

    Returns:
        ``(True, "")`` when the event passes, otherwise ``(False, reason)``.
        Any exception raised while inspecting the event is reported as
        ``(False, "validation_error:<message>")`` instead of propagating.
    """
    try:
        decision = consent_event["decision"]
        kind = decision["type"]
        if kind not in {"approve", "reject", "revoke"}:
            return False, "invalid decision.type"

        targets = (consent_event.get("target") or {}).get("artifact_ids") or []
        if not targets:
            return False, "empty target.artifact_ids"

        expires_at = decision.get("expires_at")
        if expires_at is not None and int(expires_at) < _now_ts():
            return False, "consent expired"

        # Reject and revoke decisions need no quorum.
        if kind != "approve":
            return True, ""

        confirmations = consent_event.get("confirmations") or []
        quorum = consent_event.get("quorum") or {}
        required = int(quorum.get("required", 1))
        present = int(quorum.get("present", 0))
        quorum_met = len(confirmations) >= required and present >= required
        if not quorum_met:
            return False, "quorum not met"
        return True, ""
    except Exception as exc:
        return False, f"validation_error:{exc}"
def _violates_one_way(status: Any, decision_type: Any) -> bool:
    """Return True when *decision_type* would approve a rejected artifact."""
    return status == "rejected" and decision_type == "approve"


def _apply_outbox(cur, outbox: OutboxRow) -> None:
    """Apply one consent decision to every artifact targeted by *outbox*.

    For each target artifact the function looks up the (to_status, op) pair
    in ``CONSENT_TRANSITION_MAP``, appends a provenance entry, updates the
    artifact with an optimistic version check (retrying up to
    ``MAX_ARTIFACT_CAS_RETRIES`` times) and records a state transition.
    Artifacts whose provenance already references this consent event are
    skipped, so re-processing the same outbox row does not apply it twice.

    On any failure the reason is stored on the outbox row through
    ``_set_outbox_error`` and the function returns without marking the row
    done. The row is marked done only after every target was handled.

    NOTE(review): an early return after some artifacts were already updated
    leaves those updates in the transaction that owns *cur*; whether they are
    committed is decided by the caller.

    Args:
        cur: Open cursor; all statements run in its transaction.
        outbox: The claimed outbox row to process.
    """
    consent = _get_consent_event(cur, outbox.consent_event_id)
    if not consent:
        _set_outbox_error(cur, outbox.outbox_id, "STOP_CONSENT_EVENT_MISSING: consent not found")
        return

    ok, err = _validate_consent_min(consent)
    if not ok:
        _set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_INVALID: {err}")
        return

    decision_type = (consent.get("decision") or {}).get("type")
    consent_versions = consent.get("versions") or {}
    # Missing or empty version fields are recorded as "unknown".
    versions = {
        key: str(consent_versions.get(key) or "unknown")
        for key in ("constitution_version", "protocol_version", "router_guard_version")
    }

    for artifact_id in outbox.target_artifact_ids:
        artifact = _get_artifact(cur, artifact_id)
        if not artifact:
            _set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_MISSING: artifact {artifact_id} not found")
            return

        artifact_type = artifact["artifact_type"]
        transitions = CONSENT_TRANSITION_MAP.get(artifact_type)
        if transitions is None:
            _set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_INVALID: no transition for {artifact_type}")
            return
        if decision_type not in transitions:
            _set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_INVALID: decision not mapped {decision_type}")
            return

        from_status = artifact["status"]
        if _violates_one_way(from_status, decision_type):
            _set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_INVALID: one-way violation {artifact_id}")
            return

        provenance = artifact["provenance"]
        if _provenance_has_consent(provenance, outbox.consent_event_id):
            # Already applied to this artifact; nothing to do.
            continue

        to_status, op = transitions[decision_type]
        trail = _build_trail(
            consent_event=consent,
            request_id=outbox.request_id,
            consent_event_id=outbox.consent_event_id,
            decision_type=decision_type,
            op=op,
            visibility_level=artifact["visibility_level"],
            versions=versions,
        )
        payload = artifact["payload"]
        expected_version = int(artifact["version"])
        new_provenance = (provenance if isinstance(provenance, list) else []) + [trail]

        success = False
        for _ in range(MAX_ARTIFACT_CAS_RETRIES):
            success = _cas_update_artifact(
                cur,
                artifact_id=artifact_id,
                expected_version=expected_version,
                new_status=to_status,
                new_payload=payload,
                new_provenance=new_provenance,
            )
            if success:
                break

            # The version moved underneath us: reload and rebase the update.
            current = _get_artifact(cur, artifact_id)
            if not current:
                break
            expected_version = int(current["version"])
            payload = current["payload"]
            provenance = current["provenance"]
            if _provenance_has_consent(provenance, outbox.consent_event_id):
                # Someone else applied this very consent event meanwhile.
                success = True
                break

            # Re-validate against the reloaded row: a concurrent writer may
            # have rejected the artifact since the first read, and an
            # approval must not overwrite that. Also keep from_status in
            # step with the row the next CAS attempt is based on.
            from_status = current["status"]
            if _violates_one_way(from_status, decision_type):
                _set_outbox_error(cur, outbox.outbox_id, f"STOP_CONSENT_EVENT_INVALID: one-way violation {artifact_id}")
                return
            new_provenance = (provenance if isinstance(provenance, list) else []) + [trail]

        if not success:
            _set_outbox_error(cur, outbox.outbox_id, f"CAS conflict exceeded for {artifact_id}")
            return

        _insert_transition(
            cur,
            consent_event_id=outbox.consent_event_id,
            decision_type=decision_type,
            request_id=outbox.request_id,
            artifact_id=artifact_id,
            artifact_type=artifact_type,
            visibility_level=artifact["visibility_level"],
            from_status=from_status,
            to_status=to_status,
            op=op,
            versions=versions,
        )

    _mark_outbox_done(cur, outbox.outbox_id)
def main() -> None:
    """Run the outbox worker loop forever.

    Each pass opens a connection, claims up to ``BATCH_SIZE`` pending outbox
    rows, applies them and commits. When nothing is pending, or when a pass
    fails, the loop sleeps ``POLL_INTERVAL`` seconds before trying again.

    Every outbox row is applied inside its own nested transaction
    (savepoint). If applying one row raises, only that row's writes are
    rolled back, the failure is logged and stored on the row, and the rest of
    the batch is still committed.
    """
    # Local import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)

    while True:
        try:
            with psycopg.connect(PG_DSN, row_factory=dict_row) as conn:
                conn.autocommit = False
                with conn.cursor() as cur:
                    outboxes = _fetch_pending_outbox(cur, BATCH_SIZE)
                    if not outboxes:
                        # Release the implicit transaction before idling.
                        conn.rollback()
                        time.sleep(POLL_INTERVAL)
                        continue
                    for ob in outboxes:
                        try:
                            # A transaction is already open (the claiming
                            # SELECT), so this block is a savepoint.
                            with conn.transaction():
                                _apply_outbox(cur, ob)
                        except Exception as exc:
                            logger.exception("applying outbox %s failed", ob.outbox_id)
                            # If the connection itself is broken this raises
                            # and the outer handler takes over.
                            _set_outbox_error(cur, ob.outbox_id, f"APPLY_EXCEPTION: {exc!r}")
                    conn.commit()
        except Exception:
            # Top-level boundary: keep the worker alive, but never silently.
            logger.exception("outbox worker pass failed; retrying in %s s", POLL_INTERVAL)
            time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()