## Agents Added

- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes

- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes

- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation

- Created PROJECT-MASTER-INDEX.md — single entry point
- Added various status documents and scripts

Tokens configured:
- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
284 lines
9.7 KiB
Python
284 lines
9.7 KiB
Python
"""
|
|
Payload Validation for Co-Memory Qdrant
|
|
|
|
Validates payloads against cm_payload_v1 schema before upsert.
|
|
"""
|
|
|
|
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
|
|
|
# Prefer the jsonschema package when it is installed; otherwise the manual
# field-by-field validation below is used as a fallback.
try:
    import jsonschema
except ImportError:
    HAS_JSONSCHEMA = False
else:
    HAS_JSONSCHEMA = True
|
|
|
|
|
|
class PayloadValidationError(Exception):
    """Raised when payload validation fails.

    Attributes:
        errors: The individual validation error messages collected during
            the failed validation (empty list when none were supplied).
    """

    def __init__(self, message: str, errors: Optional[List[str]] = None):
        super().__init__(message)
        # Normalize a missing error list so callers can always iterate.
        self.errors = errors or []
|
|
|
|
|
|
# Enums
# Closed value sets accepted by the corresponding payload fields.
VALID_SCOPES = {"docs", "messages", "memory", "artifacts", "signals"}
VALID_VISIBILITY = {"public", "confidential", "private"}
VALID_OWNER_KINDS = {"user", "team", "agent"}
VALID_SOURCE_KINDS = {"document", "wiki", "message", "artifact", "web", "code"}
VALID_METRICS = {"cosine", "dot", "euclidean"}

# ID patterns
# Identifier formats: fixed prefix, underscore, then the id body.
# Tenant/team/project/agent ids are lowercase; source and chunk ids allow
# mixed-case alphanumerics.
TENANT_ID_PATTERN = re.compile(r"^t_[a-z0-9_]+$")
TEAM_ID_PATTERN = re.compile(r"^team_[a-z0-9_]+$")
PROJECT_ID_PATTERN = re.compile(r"^proj_[a-z0-9_]+$")
AGENT_ID_PATTERN = re.compile(r"^agt_[a-z0-9_]+$")
SOURCE_ID_PATTERN = re.compile(r"^(doc|msg|art|web|code)_[A-Za-z0-9]+$")
CHUNK_ID_PATTERN = re.compile(r"^chk_[A-Za-z0-9]+$")
|
|
|
|
|
|
def _load_json_schema() -> Optional[Dict]:
|
|
"""Load JSON schema from file if available."""
|
|
schema_path = Path(__file__).parent.parent.parent.parent / "docs" / "memory" / "cm_payload_v1.schema.json"
|
|
if schema_path.exists():
|
|
with open(schema_path) as f:
|
|
return json.load(f)
|
|
return None
|
|
|
|
|
|
# Parsed cm_payload_v1 schema, loaded once at import time. None when the
# schema file is absent, in which case validate_payload uses manual checks.
_SCHEMA = _load_json_schema()
|
|
|
|
|
|
def validate_payload(payload: Dict[str, Any], strict: bool = True) -> Dict[str, Any]:
    """
    Validate payload against cm_payload_v1 schema.

    Args:
        payload: The payload dictionary to validate
        strict: If True, raise exception on validation failure

    Returns:
        The validated payload (potentially with defaults added)

    Raises:
        PayloadValidationError: If validation fails and strict=True
    """
    problems: List[str] = []

    if HAS_JSONSCHEMA and _SCHEMA:
        # Delegate to the jsonschema library when both the package and the
        # schema file are available.
        try:
            jsonschema.validate(payload, _SCHEMA)
        except jsonschema.ValidationError as exc:
            problems.append(f"Schema validation: {exc.message}")
    else:
        # Fall back to the hand-written checks in this module.
        for check in (_validate_required_fields, _validate_field_values):
            problems.extend(check(payload))

    if problems and strict:
        raise PayloadValidationError(
            f"Payload validation failed: {len(problems)} error(s)",
            errors=problems,
        )

    return payload
|
|
|
|
|
|
def _validate_required_fields(payload: Dict[str, Any]) -> List[str]:
|
|
"""Validate required fields are present."""
|
|
errors = []
|
|
|
|
required = [
|
|
"schema_version",
|
|
"tenant_id",
|
|
"owner_kind",
|
|
"owner_id",
|
|
"scope",
|
|
"visibility",
|
|
"indexed",
|
|
"source_kind",
|
|
"source_id",
|
|
"chunk",
|
|
"fingerprint",
|
|
"created_at",
|
|
]
|
|
|
|
for field in required:
|
|
if field not in payload:
|
|
errors.append(f"Missing required field: {field}")
|
|
|
|
# Check nested required fields
|
|
if "chunk" in payload and isinstance(payload["chunk"], dict):
|
|
if "chunk_id" not in payload["chunk"]:
|
|
errors.append("Missing required field: chunk.chunk_id")
|
|
if "chunk_idx" not in payload["chunk"]:
|
|
errors.append("Missing required field: chunk.chunk_idx")
|
|
|
|
return errors
|
|
|
|
|
|
def _validate_field_values(payload: Dict[str, Any]) -> List[str]:
    """Validate field values match expected formats.

    Runs every check group in a fixed order and returns the accumulated
    error messages; an empty list means all present values are well-formed.
    Optional fields that are absent are skipped, never reported.
    """
    errors: List[str] = []
    # Order matters: callers may rely on the sequence of reported errors.
    errors.extend(_check_version_and_ids(payload))
    errors.extend(_check_enum_fields(payload))
    errors.extend(_check_source_and_chunk(payload))
    errors.extend(_check_indexed_and_created_at(payload))
    errors.extend(_check_optional_numerics(payload))
    errors.extend(_check_acl_and_tags(payload))
    return errors


def _check_version_and_ids(payload: Dict[str, Any]) -> List[str]:
    """Check schema_version and the prefixed identifier fields."""
    errors: List[str] = []

    if payload.get("schema_version") != "cm_payload_v1":
        errors.append(f"Invalid schema_version: {payload.get('schema_version')}, expected 'cm_payload_v1'")

    # tenant_id is required elsewhere; the other ids are optional, so all
    # four are only format-checked when truthy.
    for field, pattern in (
        ("tenant_id", TENANT_ID_PATTERN),
        ("team_id", TEAM_ID_PATTERN),
        ("project_id", PROJECT_ID_PATTERN),
        ("agent_id", AGENT_ID_PATTERN),
    ):
        value = payload.get(field)
        if value and not pattern.match(value):
            errors.append(f"Invalid {field} format: {value}")

    return errors


def _check_enum_fields(payload: Dict[str, Any]) -> List[str]:
    """Check the closed-set (enum) fields against their valid value sets."""
    errors: List[str] = []
    for field, valid in (
        ("scope", VALID_SCOPES),
        ("visibility", VALID_VISIBILITY),
        ("owner_kind", VALID_OWNER_KINDS),
        ("source_kind", VALID_SOURCE_KINDS),
    ):
        if payload.get(field) not in valid:
            errors.append(f"Invalid {field}: {payload.get(field)}, valid: {valid}")
    return errors


def _check_source_and_chunk(payload: Dict[str, Any]) -> List[str]:
    """Check source_id format and the chunk sub-object's id/index."""
    errors: List[str] = []

    source_id = payload.get("source_id")
    if source_id and not SOURCE_ID_PATTERN.match(source_id):
        errors.append(f"Invalid source_id format: {source_id}")

    chunk = payload.get("chunk", {})
    if isinstance(chunk, dict):
        chunk_id = chunk.get("chunk_id")
        if chunk_id and not CHUNK_ID_PATTERN.match(chunk_id):
            errors.append(f"Invalid chunk.chunk_id format: {chunk_id}")

        chunk_idx = chunk.get("chunk_idx")
        if chunk_idx is not None and (not isinstance(chunk_idx, int) or chunk_idx < 0):
            errors.append(f"Invalid chunk.chunk_idx: {chunk_idx}, must be non-negative integer")

    return errors


def _check_indexed_and_created_at(payload: Dict[str, Any]) -> List[str]:
    """Check the indexed flag and the created_at ISO 8601 timestamp."""
    errors: List[str] = []

    if not isinstance(payload.get("indexed"), bool):
        errors.append(f"Invalid indexed: {payload.get('indexed')}, must be boolean")

    created_at = payload.get("created_at")
    if created_at:
        try:
            # Accept a trailing "Z" by mapping it to an explicit UTC offset;
            # AttributeError covers non-string values.
            datetime.fromisoformat(created_at.replace("Z", "+00:00"))
        except (ValueError, AttributeError):
            errors.append(f"Invalid created_at format: {created_at}, expected ISO 8601")

    return errors


def _check_optional_numerics(payload: Dict[str, Any]) -> List[str]:
    """Check embedding metadata, the importance score, and ttl_days."""
    errors: List[str] = []

    embedding = payload.get("embedding", {})
    if isinstance(embedding, dict):
        if "metric" in embedding and embedding["metric"] not in VALID_METRICS:
            errors.append(f"Invalid embedding.metric: {embedding['metric']}, valid: {VALID_METRICS}")
        if "dim" in embedding and (not isinstance(embedding["dim"], int) or embedding["dim"] < 1):
            errors.append(f"Invalid embedding.dim: {embedding['dim']}, must be positive integer")

    importance = payload.get("importance")
    if importance is not None and (not isinstance(importance, (int, float)) or importance < 0 or importance > 1):
        errors.append(f"Invalid importance: {importance}, must be 0-1")

    ttl_days = payload.get("ttl_days")
    if ttl_days is not None and (not isinstance(ttl_days, int) or ttl_days < 1):
        errors.append(f"Invalid ttl_days: {ttl_days}, must be positive integer")

    return errors


def _check_acl_and_tags(payload: Dict[str, Any]) -> List[str]:
    """Check acl read lists and tags: arrays of non-empty strings only."""
    errors: List[str] = []

    # A missing "acl" key defaults to {} and passes; an explicit null is
    # tolerated (treated as absent), but any other non-dict value is an error.
    acl = payload.get("acl", {})
    if isinstance(acl, dict):
        for acl_field in ["read_team_ids", "read_agent_ids", "read_role_ids"]:
            value = acl.get(acl_field)
            if value is not None:
                if not isinstance(value, list):
                    errors.append(f"Invalid acl.{acl_field}: must be array, got {type(value).__name__}")
                elif not all(isinstance(item, str) and item for item in value):
                    # All items must be non-empty strings (no None, no "").
                    errors.append(f"Invalid acl.{acl_field}: all items must be non-empty strings (no null/empty)")
    elif acl is not None:
        errors.append(f"Invalid acl: must be object, got {type(acl).__name__}")

    tags = payload.get("tags")
    if tags is not None:
        if not isinstance(tags, list):
            errors.append(f"Invalid tags: must be array, got {type(tags).__name__}")
        elif not all(isinstance(item, str) and item for item in tags):
            errors.append(f"Invalid tags: all items must be non-empty strings (no null/empty)")

    return errors
|
|
|
|
|
|
def create_minimal_payload(
    tenant_id: str,
    source_id: str,
    chunk_id: str,
    chunk_idx: int,
    fingerprint: str,
    scope: str = "docs",
    visibility: str = "confidential",
    owner_kind: str = "team",
    owner_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    team_id: Optional[str] = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Create a minimal valid payload with required fields.

    Args:
        tenant_id: Tenant identifier (expected format "t_...").
        source_id: Source identifier (e.g. "doc_...").
        chunk_id: Chunk identifier (expected format "chk_...").
        chunk_idx: Zero-based index of the chunk within its source.
        fingerprint: Content fingerprint of the chunk.
        scope: Payload scope enum value; defaults to "docs".
        visibility: Visibility enum value; defaults to "confidential".
        owner_kind: Owner kind enum value; defaults to "team".
        owner_id: Explicit owner id; falls back to team_id, then tenant_id.
        agent_id: Optional agent association (may be None).
        team_id: Optional team association (may be None).
        **kwargs: Extra fields merged into the payload last, so they may
            override any generated key.

    Returns a payload that passes validation.
    """
    payload = {
        "schema_version": "cm_payload_v1",
        "tenant_id": tenant_id,
        "team_id": team_id,
        "agent_id": agent_id,
        "owner_kind": owner_kind,
        "owner_id": owner_id or team_id or tenant_id,
        "scope": scope,
        "visibility": visibility,
        "indexed": True,
        "source_kind": "document",
        "source_id": source_id,
        "chunk": {
            "chunk_id": chunk_id,
            "chunk_idx": chunk_idx,
        },
        "fingerprint": fingerprint,
        # Timezone-aware now() instead of the deprecated datetime.utcnow();
        # normalize the "+00:00" offset to the schema's "Z" suffix form.
        "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    }

    # Add optional fields
    payload.update(kwargs)

    return payload
|