## Agents Added

- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes

- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes

- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation

- Created PROJECT-MASTER-INDEX.md - single entry point
- Added various status documents and scripts

Tokens configured:
- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
670 lines
22 KiB
Python
Executable File
670 lines
22 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Qdrant Migration Script: Legacy Collections → Canonical
|
|
|
|
Migrates points from per-agent collections to canonical cm_text_* collection
|
|
with proper payload mapping.
|
|
|
|
Usage:
|
|
python qdrant_migrate_to_canonical.py --dry-run
|
|
python qdrant_migrate_to_canonical.py --collections helion_docs,nutra_messages
|
|
python qdrant_migrate_to_canonical.py --all
|
|
"""
|
|
|
|
import argparse
import hashlib
import json
import logging
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from uuid import uuid4
|
|
|
|
try:
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.models import PointStruct
|
|
except ImportError:
|
|
print("Error: qdrant-client not installed. Run: pip install qdrant-client")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
import ulid
|
|
HAS_ULID = True
|
|
except ImportError:
|
|
HAS_ULID = False
|
|
|
|
|
|
# Add parent to path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from services.memory.qdrant.payload_validation import validate_payload, PayloadValidationError
|
|
from services.memory.qdrant.collections import ensure_collection, get_canonical_collection_name
|
|
|
|
|
|
# Module-wide logging setup: timestamped, INFO-level messages.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


# Default configuration; tenant_id/team_id/text_dim are overridable via CLI flags in main().
DEFAULT_CONFIG = {
    "tenant_id": "t_daarion",
    "team_id": "team_core",
    "text_dim": 1024,                 # expected vector dimension of the canonical collection
    "text_metric": "cosine",          # expected distance metric of the canonical collection
    "default_visibility": "confidential",
    "default_owner_kind": "agent",
    "batch_size": 100,                # points per scroll/upsert batch
}

# Collection name patterns for mapping legacy names to (agent, scope, tags).
# Checked in order by parse_collection_name(); first match wins.
COLLECTION_PATTERNS = [
    # (regex, agent_group_idx, scope, tags)
    (r"^([a-z]+)_docs$", 1, "docs", []),
    (r"^([a-z]+)_messages$", 1, "messages", []),
    (r"^([a-z]+)_memory_items$", 1, "memory", []),
    (r"^([a-z]+)_artifacts$", 1, "artifacts", []),
    (r"^druid_legal_kb$", None, "docs", ["legal_kb"]),
    (r"^nutra_food_knowledge$", None, "docs", ["food_kb"]),
    (r"^memories$", None, "memory", []),
    (r"^messages$", None, "messages", []),
]

# Agent slug -> canonical agent_id mapping.
# Unknown slugs fall back to "agt_<slug>" in parse_collection_name().
AGENT_SLUGS = {
    "helion": "agt_helion",
    "nutra": "agt_nutra",
    "druid": "agt_druid",
    "greenfood": "agt_greenfood",
    "agromatrix": "agt_agromatrix",
    "daarwizz": "agt_daarwizz",
    "alateya": "agt_alateya",
}
|
|
|
|
|
|
def generate_id(prefix: str = "doc") -> str:
    """Return a fresh unique identifier of the form ``<prefix>_<random>``.

    Uses a ULID suffix when the optional ``ulid`` package is installed;
    otherwise falls back to 24 upper-cased hex characters from a UUID4.
    """
    suffix = ulid.new().str if HAS_ULID else uuid4().hex[:24].upper()
    return f"{prefix}_{suffix}"
|
|
|
|
|
|
def compute_deterministic_id(
    source_collection: str,
    legacy_point_id: str,
    source_id: str,
    chunk_idx: int,
) -> str:
    """Derive a rerun-safe point ID for a migrated point.

    The ID is a pure function of its inputs (same input -> same output),
    so re-running a migration produces the same IDs and upserts overwrite
    rather than duplicate.

    Uses 32 hex characters (128 bits of SHA-256) to keep collision risk
    negligible even for very large collections (10M+ points).
    """
    key = "|".join(
        (source_collection, legacy_point_id, source_id, str(chunk_idx))
    )
    return hashlib.sha256(key.encode()).hexdigest()[:32]
|
|
|
|
|
|
def compute_fingerprint(source_id: str, chunk_idx: int, text: str = "") -> str:
    """Return a stable ``sha256:<32 hex>`` fingerprint for deduplication.

    IMPORTANT: deliberately avoids timestamps and randomness so that
    identical content always yields the identical fingerprint (idempotent).
    """
    digest = hashlib.sha256(f"{source_id}:{chunk_idx}:{text}".encode()).hexdigest()
    return "sha256:" + digest[:32]
|
|
|
|
|
|
def get_collection_vector_config(client: QdrantClient, collection_name: str) -> Dict[str, Any]:
    """Read the vector dimension and distance metric of a collection.

    Returns:
        Dict with ``dim`` and ``metric`` keys; both are ``None`` when the
        layout is unrecognized or the collection cannot be inspected.
    """
    unknown = {"dim": None, "metric": None}
    try:
        vectors = client.get_collection(collection_name).config.params.vectors

        if hasattr(vectors, 'size'):
            # Single unnamed vector configuration.
            return {
                "dim": vectors.size,
                "metric": vectors.distance.value.lower(),
            }

        if hasattr(vectors, '__iter__'):
            # Named vectors: report the first entry only.
            for cfg in vectors.values():
                return {
                    "dim": cfg.size,
                    "metric": cfg.distance.value.lower(),
                }

        return unknown
    except Exception as e:
        logger.warning(f"Could not get vector config for {collection_name}: {e}")
        return unknown
|
|
|
|
|
|
class DimMismatchError(Exception):
    """Raised when a source collection's vector dim/metric disagrees with the target."""
|
|
|
|
|
|
def parse_collection_name(name: str) -> Optional[Dict[str, Any]]:
    """Map a legacy collection name to agent/scope/tags via COLLECTION_PATTERNS.

    Returns:
        Dict with ``agent_id``, ``scope`` and ``tags`` keys, or ``None``
        when no known pattern matches.
    """
    lowered = name.lower()
    for pattern, agent_group, scope, tags in COLLECTION_PATTERNS:
        m = re.match(pattern, lowered)
        if m is None:
            continue

        # Resolve the agent: capture group if the pattern has one,
        # otherwise a couple of hard-wired special cases.
        if agent_group is not None:
            slug = m.group(agent_group).lower()
            agent_id = AGENT_SLUGS.get(slug, f"agt_{slug}")
        elif "druid" in lowered:
            agent_id = "agt_druid"
        elif "nutra" in lowered:
            agent_id = "agt_nutra"
        else:
            agent_id = None

        return {
            "agent_id": agent_id,
            "scope": scope,
            "tags": tags.copy(),  # copy so callers can't mutate the pattern table
        }

    return None
|
|
|
|
|
|
def build_canonical_payload(
    legacy_payload: Dict[str, Any],
    collection_info: Dict[str, Any],
    config: Dict[str, Any],
    point_id: str,
) -> Dict[str, Any]:
    """
    Build canonical cm_payload_v1 from legacy payload.

    Args:
        legacy_payload: Original payload from legacy collection
        collection_info: Parsed collection name info (agent_id, scope, tags)
        config: Migration configuration
        point_id: Original point ID (kept for interface compatibility; the
            legacy ID itself is recorded by the caller, not here)

    Returns:
        Canonical payload
    """
    # Extract values from legacy payload or use defaults
    agent_id = collection_info.get("agent_id")
    scope = collection_info.get("scope", "docs")
    tags = collection_info.get("tags", [])

    # Try to get source_id from legacy payload (several historical field names).
    # Coerce to str: legacy payloads sometimes store numeric IDs, which would
    # otherwise crash re.match below.
    source_id = str(
        legacy_payload.get("source_id") or
        legacy_payload.get("document_id") or
        legacy_payload.get("doc_id") or
        legacy_payload.get("message_id") or
        generate_id("doc")
    )

    # Ensure source_id has proper prefix
    if not re.match(r"^(doc|msg|art|web|code)_", source_id):
        prefix = "msg" if scope == "messages" else "doc"
        source_id = f"{prefix}_{source_id}"

    # Chunk info (chunk_id coerced to str for the same reason as source_id)
    chunk_idx = legacy_payload.get("chunk_idx", legacy_payload.get("chunk_index", 0))
    chunk_id = str(legacy_payload.get("chunk_id") or generate_id("chk"))
    if not chunk_id.startswith("chk_"):
        chunk_id = f"chk_{chunk_id}"

    # Fingerprint: reuse any legacy hash, otherwise derive a stable one.
    text = legacy_payload.get("text", legacy_payload.get("content", ""))
    fingerprint = (
        legacy_payload.get("fingerprint") or
        legacy_payload.get("hash") or
        compute_fingerprint(source_id, chunk_idx, text)
    )

    # Timestamp: prefer any legacy value; otherwise "now".
    # datetime.utcnow() is deprecated (and naive); use an aware UTC timestamp
    # and normalize the "+00:00" offset to the historical "Z" suffix so the
    # emitted format is unchanged.
    created_at = (
        legacy_payload.get("created_at") or
        legacy_payload.get("timestamp") or
        legacy_payload.get("indexed_at") or
        datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    )

    # Build canonical payload
    payload = {
        "schema_version": "cm_payload_v1",
        "tenant_id": config["tenant_id"],
        "team_id": config.get("team_id"),
        "project_id": legacy_payload.get("project_id"),
        "agent_id": agent_id,
        "owner_kind": config["default_owner_kind"],
        "owner_id": agent_id or config["team_id"],
        "scope": scope,
        "visibility": legacy_payload.get("visibility", config["default_visibility"]),
        "indexed": legacy_payload.get("indexed", True),
        "source_kind": _infer_source_kind(scope),
        "source_id": source_id,
        "chunk": {
            "chunk_id": chunk_id,
            "chunk_idx": chunk_idx,
        },
        "fingerprint": fingerprint,
        "created_at": created_at,
    }

    # Merge tags from the collection mapping and the legacy payload (deduped).
    if tags:
        payload["tags"] = tags
    legacy_tags = legacy_payload.get("tags", [])
    if legacy_tags:
        payload["tags"] = list(set(payload.get("tags", []) + legacy_tags))

    # Preserve additional optional fields verbatim.
    for field in ["lang", "importance", "ttl_days", "channel_id"]:
        if field in legacy_payload:
            payload[field] = legacy_payload[field]

    # Preserve text/content for debugging (optional)
    if text:
        payload["_text"] = text[:500]  # Truncate for safety

    return payload
|
|
|
|
|
|
def _infer_source_kind(scope: str) -> str:
|
|
"""Infer source_kind from scope."""
|
|
mapping = {
|
|
"docs": "document",
|
|
"messages": "message",
|
|
"memory": "document",
|
|
"artifacts": "artifact",
|
|
"signals": "document",
|
|
}
|
|
return mapping.get(scope, "document")
|
|
|
|
|
|
class MigrationStats:
    """Accumulates counters and error records across a migration run."""

    def __init__(self):
        self.collections_processed = 0  # collections fully scanned
        self.points_read = 0            # points scrolled from legacy collections
        self.points_migrated = 0        # points converted (and upserted unless dry-run)
        self.points_skipped = 0         # points dropped due to validation/conversion errors
        self.errors = []                # list of {collection, point_id, error} dicts

    def add_error(self, collection: str, point_id: str, error: str):
        """Record one per-point (or per-collection, with empty point_id) failure."""
        self.errors.append(
            {"collection": collection, "point_id": point_id, "error": error}
        )

    def summary(self) -> Dict[str, Any]:
        """Return a report dict; includes at most the first 10 error records."""
        return {
            "collections_processed": self.collections_processed,
            "points_read": self.points_read,
            "points_migrated": self.points_migrated,
            "points_skipped": self.points_skipped,
            "errors_count": len(self.errors),
            "errors": self.errors[:10] if self.errors else [],
        }
|
|
|
|
|
|
def migrate_collection(
    client: QdrantClient,
    source_collection: str,
    target_collection: str,
    config: Dict[str, Any],
    stats: MigrationStats,
    dry_run: bool = True,
    skip_dim_check: bool = False,
) -> None:
    """
    Migrate a single legacy collection to canonical collection.

    Scrolls the source in batches, converts each point's payload to the
    canonical schema, validates it, assigns a deterministic point ID (so
    reruns overwrite rather than duplicate), and upserts into the target.

    Args:
        client: Qdrant client
        source_collection: Legacy collection name
        target_collection: Canonical collection name
        config: Migration configuration
        stats: Statistics tracker
        dry_run: If True, don't actually write (counters still advance as if
            the writes happened)
        skip_dim_check: Skip dimension/metric validation (dangerous!)

    Raises:
        DimMismatchError: If source dim/metric doesn't match target
    """
    logger.info(f"Migrating collection: {source_collection}")

    # === SECURITY: Verify dim/metric match ===
    # Upserting vectors of the wrong dimension/metric would corrupt search
    # results in the target, so mismatches are fatal unless explicitly waived.
    source_config = get_collection_vector_config(client, source_collection)
    target_dim = config.get("text_dim")
    target_metric = config.get("text_metric", "cosine")

    logger.info(f" Source: dim={source_config['dim']}, metric={source_config['metric']}")
    logger.info(f" Target: dim={target_dim}, metric={target_metric}")

    # NOTE: checks are skipped when the source config is unreadable
    # (dim/metric come back as None — truthiness guard below).
    if source_config["dim"] and source_config["dim"] != target_dim:
        msg = (
            f"Dimension mismatch: {source_collection} has dim={source_config['dim']}, "
            f"target {target_collection} expects dim={target_dim}"
        )
        if skip_dim_check:
            logger.warning(f" WARNING: {msg} (skipping due to --skip-dim-check)")
        else:
            logger.error(f" ERROR: {msg}")
            stats.add_error(source_collection, "", f"DimMismatch: {msg}")
            raise DimMismatchError(msg)

    if source_config["metric"] and source_config["metric"] != target_metric:
        msg = (
            f"Metric mismatch: {source_collection} has metric={source_config['metric']}, "
            f"target {target_collection} expects metric={target_metric}"
        )
        if skip_dim_check:
            logger.warning(f" WARNING: {msg} (skipping due to --skip-dim-check)")
        else:
            logger.error(f" ERROR: {msg}")
            stats.add_error(source_collection, "", f"MetricMismatch: {msg}")
            raise DimMismatchError(msg)

    # Parse collection name; unknown patterns fall back to generic "docs".
    collection_info = parse_collection_name(source_collection)
    if not collection_info:
        logger.warning(f" Could not parse collection name pattern: {source_collection}")
        collection_info = {"agent_id": None, "scope": "docs", "tags": []}

    logger.info(f" Mapped to: agent={collection_info['agent_id']}, scope={collection_info['scope']}")

    # Scroll through all points
    batch_size = config.get("batch_size", 100)
    offset = None  # scroll cursor; None means "start from the beginning"
    batch_count = 0
    collection_points_read = 0
    collection_points_migrated = 0

    while True:
        # Scroll points (vectors included — they are copied verbatim).
        scroll_result = client.scroll(
            collection_name=source_collection,
            offset=offset,
            limit=batch_size,
            with_payload=True,
            with_vectors=True,
        )

        points, next_offset = scroll_result

        if not points:
            break

        batch_count += 1
        stats.points_read += len(points)
        collection_points_read += len(points)

        # Convert points; failures are recorded and skipped, never fatal here.
        canonical_points = []
        for point in points:
            try:
                legacy_payload = point.payload or {}

                # Build canonical payload
                canonical_payload = build_canonical_payload(
                    legacy_payload=legacy_payload,
                    collection_info=collection_info,
                    config=config,
                    point_id=str(point.id),
                )

                # Validate payload against the canonical schema before writing.
                try:
                    validate_payload(canonical_payload)
                except PayloadValidationError as e:
                    stats.add_error(source_collection, str(point.id), str(e))
                    stats.points_skipped += 1
                    continue

                # Compute deterministic ID (rerun-safe)
                source_id = canonical_payload.get("source_id", "")
                chunk_idx = canonical_payload.get("chunk", {}).get("chunk_idx", 0)

                deterministic_id = compute_deterministic_id(
                    source_collection=source_collection,
                    legacy_point_id=str(point.id),
                    source_id=source_id,
                    chunk_idx=chunk_idx,
                )

                # Store original legacy ID in payload for traceability
                canonical_payload["_legacy_collection"] = source_collection
                canonical_payload["_legacy_point_id"] = str(point.id)

                # Create new point with deterministic ID
                canonical_points.append(PointStruct(
                    id=deterministic_id,
                    vector=point.vector,
                    payload=canonical_payload,
                ))

            except Exception as e:
                # Catch-all so one malformed point cannot abort the batch.
                stats.add_error(source_collection, str(point.id), str(e))
                stats.points_skipped += 1

        # Upsert to canonical collection (skipped entirely in dry-run).
        if canonical_points and not dry_run:
            client.upsert(
                collection_name=target_collection,
                points=canonical_points,
            )

        # NOTE(review): migrated counters advance even in dry-run — they
        # report what WOULD be written, not what was written.
        stats.points_migrated += len(canonical_points)
        collection_points_migrated += len(canonical_points)

        if batch_count % 10 == 0:
            logger.info(f" Progress: {collection_points_read} read, {collection_points_migrated} migrated")

        # Continue scrolling; a None cursor means the collection is exhausted.
        offset = next_offset
        if offset is None:
            break

    stats.collections_processed += 1
    logger.info(f" Completed: {collection_points_read} read, {collection_points_migrated} migrated")
|
|
|
|
|
|
def discover_legacy_collections(client: QdrantClient) -> List[str]:
    """Return the sorted names of all collections lacking the canonical ``cm_`` prefix."""
    names = (col.name for col in client.get_collections().collections)
    return sorted(name for name in names if not name.startswith("cm_"))
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, connect to Qdrant, and run the migration.

    With no --collections/--all flag, lists discoverable legacy collections
    and exits without migrating.
    """
    parser = argparse.ArgumentParser(
        description="Migrate Qdrant collections to canonical schema"
    )
    parser.add_argument(
        "--host",
        default=os.getenv("QDRANT_HOST", "localhost"),
        help="Qdrant host"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=int(os.getenv("QDRANT_PORT", "6333")),
        help="Qdrant port"
    )
    parser.add_argument(
        "--collections",
        help="Comma-separated list of collections to migrate"
    )
    parser.add_argument(
        "--all",
        action="store_true",
        help="Migrate all legacy collections"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be migrated without writing"
    )
    parser.add_argument(
        "--tenant-id",
        default=DEFAULT_CONFIG["tenant_id"],
        help="Tenant ID for payloads"
    )
    parser.add_argument(
        "--team-id",
        default=DEFAULT_CONFIG["team_id"],
        help="Team ID for payloads"
    )
    parser.add_argument(
        "--dim",
        type=int,
        default=DEFAULT_CONFIG["text_dim"],
        help="Vector dimension"
    )
    parser.add_argument(
        "--skip-dim-check",
        action="store_true",
        help="Skip dimension/metric validation (DANGEROUS - may corrupt data)"
    )
    parser.add_argument(
        "--continue-on-error",
        action="store_true",
        help="Continue migrating other collections if one fails"
    )

    args = parser.parse_args()

    # Configuration: overlay CLI values onto the module defaults.
    config = DEFAULT_CONFIG.copy()
    config["tenant_id"] = args.tenant_id
    config["team_id"] = args.team_id
    config["text_dim"] = args.dim

    # Connect to Qdrant
    logger.info(f"Connecting to Qdrant at {args.host}:{args.port}")
    client = QdrantClient(host=args.host, port=args.port)

    # Determine collections to migrate.
    # --collections takes precedence over --all when both are given.
    if args.collections:
        collections = [c.strip() for c in args.collections.split(",")]
    elif args.all:
        collections = discover_legacy_collections(client)
        logger.info(f"Discovered {len(collections)} legacy collections")
    else:
        # Neither flag: list available collections and exit without migrating.
        legacy = discover_legacy_collections(client)
        print("\nLegacy collections available for migration:")
        for col in legacy:
            info = parse_collection_name(col)
            info_str = f"→ agent={info['agent_id']}, scope={info['scope']}" if info else "→ (unknown pattern)"
            print(f" - {col} {info_str}")
        print("\nUse --collections <names> or --all to migrate")
        return

    # Target collection name is derived from modality + dimension.
    target_collection = get_canonical_collection_name("text", config["text_dim"])

    # Ensure target collection exists (creation is skipped in dry-run)
    if not args.dry_run:
        logger.info(f"Ensuring target collection: {target_collection}")
        ensure_collection(
            client,
            target_collection,
            config["text_dim"],
            config["text_metric"],
        )

    # Migrate
    stats = MigrationStats()

    logger.info(f"Starting migration ({'DRY RUN' if args.dry_run else 'LIVE'})")
    logger.info(f"Target collection: {target_collection}")

    failed_collections = []

    for collection in collections:
        try:
            migrate_collection(
                client=client,
                source_collection=collection,
                target_collection=target_collection,
                config=config,
                stats=stats,
                dry_run=args.dry_run,
                skip_dim_check=args.skip_dim_check,
            )
        except DimMismatchError as e:
            # Dim/metric mismatch: expected failure mode, with a hint message.
            logger.error(f"Failed to migrate {collection}: {e}")
            stats.add_error(collection, "", str(e))
            failed_collections.append(collection)
            if not args.continue_on_error:
                logger.error("Stopping migration. Use --continue-on-error to skip failed collections.")
                break
        except Exception as e:
            # Any other per-collection failure: same bookkeeping, no hint.
            logger.error(f"Failed to migrate {collection}: {e}")
            stats.add_error(collection, "", str(e))
            failed_collections.append(collection)
            if not args.continue_on_error:
                break

    # Summary
    print("\n" + "=" * 50)
    print("MIGRATION SUMMARY")
    print("=" * 50)
    summary = stats.summary()
    print(f"Collections processed: {summary['collections_processed']}/{len(collections)}")
    print(f"Points read: {summary['points_read']}")
    print(f"Points migrated: {summary['points_migrated']}")
    print(f"Points skipped: {summary['points_skipped']}")
    print(f"Errors: {summary['errors_count']}")

    if failed_collections:
        print(f"\nFailed collections ({len(failed_collections)}):")
        for col in failed_collections:
            print(f" - {col}")

    if summary['errors']:
        print("\nFirst 10 errors:")
        for err in summary['errors']:
            print(f" - {err['collection']}:{err['point_id']} - {err['error'][:100]}")

    if args.dry_run:
        print("\n[DRY RUN] No changes were made")

    # Exit with error code if there were failures.
    # NOTE(review): with --continue-on-error the script exits 0 even when
    # some collections failed — confirm this is the intended contract.
    if failed_collections and not args.continue_on_error:
        sys.exit(1)
|
|
|
|
|
|
# Script entry point.
if __name__ == "__main__":
    main()
|