#!/usr/bin/env python3
"""
Qdrant Migration Script: Legacy Collections → Canonical

Migrates points from per-agent collections to canonical cm_text_* collection
with proper payload mapping.

Usage:
    python qdrant_migrate_to_canonical.py --dry-run
    python qdrant_migrate_to_canonical.py --collections helion_docs,nutra_messages
    python qdrant_migrate_to_canonical.py --all
"""

import argparse
import hashlib
import json
import logging
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from uuid import uuid4

try:
    from qdrant_client import QdrantClient
    from qdrant_client.models import PointStruct
except ImportError:
    print("Error: qdrant-client not installed. Run: pip install qdrant-client")
    sys.exit(1)

try:
    import ulid
    HAS_ULID = True
except ImportError:
    HAS_ULID = False

# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from services.memory.qdrant.payload_validation import validate_payload, PayloadValidationError
from services.memory.qdrant.collections import ensure_collection, get_canonical_collection_name

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)

# Default configuration (overridable via CLI flags in main()).
DEFAULT_CONFIG = {
    "tenant_id": "t_daarion",
    "team_id": "team_core",
    "text_dim": 1024,
    "text_metric": "cosine",
    "default_visibility": "confidential",
    "default_owner_kind": "agent",
    "batch_size": 100,
}

# Collection name patterns for mapping:
# (regex, agent_group_idx, scope, tags)
# agent_group_idx is the regex group holding the agent slug, or None when the
# collection is not agent-scoped by name.
COLLECTION_PATTERNS = [
    (r"^([a-z]+)_docs$", 1, "docs", []),
    (r"^([a-z]+)_messages$", 1, "messages", []),
    (r"^([a-z]+)_memory_items$", 1, "memory", []),
    (r"^([a-z]+)_artifacts$", 1, "artifacts", []),
    (r"^druid_legal_kb$", None, "docs", ["legal_kb"]),
    (r"^nutra_food_knowledge$", None, "docs", ["food_kb"]),
    (r"^memories$", None, "memory", []),
    (r"^messages$", None, "messages", []),
]

# Agent slug → canonical agent ID mapping.
AGENT_SLUGS = {
    "helion": "agt_helion",
    "nutra": "agt_nutra",
    "druid": "agt_druid",
    "greenfood": "agt_greenfood",
    "agromatrix": "agt_agromatrix",
    "daarwizz": "agt_daarwizz",
    "alateya": "agt_alateya",
}


def generate_id(prefix: str = "doc") -> str:
    """Generate a unique ID with the given prefix (ULID when available)."""
    if HAS_ULID:
        return f"{prefix}_{ulid.new().str}"
    return f"{prefix}_{uuid4().hex[:24].upper()}"


def compute_deterministic_id(
    source_collection: str,
    legacy_point_id: str,
    source_id: str,
    chunk_idx: int,
) -> str:
    """
    Compute deterministic point ID for migration (rerun-safe).

    ID is stable across reruns: same input = same output.
    This allows safe re-migration without duplicates.

    Uses 32 hex chars (128 bits) to minimize collision risk
    for large collections (10M+ points).
    """
    # Create stable hash from all identifying components
    content = f"{source_collection}|{legacy_point_id}|{source_id}|{chunk_idx}"
    hash_hex = hashlib.sha256(content.encode()).hexdigest()[:32]
    return hash_hex


def compute_fingerprint(source_id: str, chunk_idx: int, text: str = "") -> str:
    """
    Compute stable fingerprint for deduplication.

    IMPORTANT: Does NOT use timestamp or random values.
    Same content = same fingerprint (idempotent).
    """
    content = f"{source_id}:{chunk_idx}:{text}"
    return f"sha256:{hashlib.sha256(content.encode()).hexdigest()[:32]}"


def get_collection_vector_config(client: QdrantClient, collection_name: str) -> Dict[str, Any]:
    """
    Get vector configuration from a collection.

    Returns:
        Dict with 'dim' and 'metric' keys (both None when undeterminable).
    """
    try:
        info = client.get_collection(collection_name)
        vectors_config = info.config.params.vectors
        # Handle both named and unnamed vectors
        if hasattr(vectors_config, 'size'):
            # Unnamed vectors
            return {
                "dim": vectors_config.size,
                "metric": vectors_config.distance.value.lower(),
            }
        elif hasattr(vectors_config, '__iter__'):
            # Named vectors - get first one
            for name, config in vectors_config.items():
                return {
                    "dim": config.size,
                    "metric": config.distance.value.lower(),
                }
        return {"dim": None, "metric": None}
    except Exception as e:
        logger.warning(f"Could not get vector config for {collection_name}: {e}")
        return {"dim": None, "metric": None}


class DimMismatchError(Exception):
    """Raised when source and target collection dimensions don't match."""
    pass


def parse_collection_name(name: str) -> Optional[Dict[str, Any]]:
    """
    Parse legacy collection name to extract agent_id, scope, tags.

    Returns:
        Dict with agent_id, scope, tags or None if no match.
    """
    for pattern, agent_group, scope, tags in COLLECTION_PATTERNS:
        match = re.match(pattern, name.lower())
        if match:
            agent_id = None
            if agent_group is not None:
                agent_slug = match.group(agent_group).lower()
                # Unknown slugs fall back to a synthesized agt_<slug> ID.
                agent_id = AGENT_SLUGS.get(agent_slug, f"agt_{agent_slug}")
            elif "druid" in name.lower():
                agent_id = "agt_druid"
            elif "nutra" in name.lower():
                agent_id = "agt_nutra"
            return {
                "agent_id": agent_id,
                "scope": scope,
                # Copy so callers can't mutate the shared pattern table.
                "tags": tags.copy(),
            }
    return None


def build_canonical_payload(
    legacy_payload: Dict[str, Any],
    collection_info: Dict[str, Any],
    config: Dict[str, Any],
    point_id: str,
) -> Dict[str, Any]:
    """
    Build canonical cm_payload_v1 from legacy payload.

    Args:
        legacy_payload: Original payload from legacy collection
        collection_info: Parsed collection name info (agent_id, scope, tags)
        config: Migration configuration
        point_id: Original point ID

    Returns:
        Canonical payload
    """
    # Extract values from legacy payload or use defaults
    agent_id = collection_info.get("agent_id")
    scope = collection_info.get("scope", "docs")
    tags = collection_info.get("tags", [])

    # Try to get source_id from legacy payload
    source_id = (
        legacy_payload.get("source_id")
        or legacy_payload.get("document_id")
        or legacy_payload.get("doc_id")
        or legacy_payload.get("message_id")
        or generate_id("doc")
    )
    # Legacy IDs may be ints (e.g. numeric message_id) - coerce before regex/prefix checks.
    source_id = str(source_id)

    # Ensure source_id has proper prefix
    if not re.match(r"^(doc|msg|art|web|code)_", source_id):
        prefix = "msg" if scope == "messages" else "doc"
        source_id = f"{prefix}_{source_id}"

    # Chunk info
    chunk_idx = legacy_payload.get("chunk_idx", legacy_payload.get("chunk_index", 0))
    chunk_id = str(legacy_payload.get("chunk_id") or generate_id("chk"))
    if not chunk_id.startswith("chk_"):
        chunk_id = f"chk_{chunk_id}"

    # Fingerprint
    text = legacy_payload.get("text", legacy_payload.get("content", ""))
    fingerprint = (
        legacy_payload.get("fingerprint")
        or legacy_payload.get("hash")
        or compute_fingerprint(source_id, chunk_idx, text)
    )

    # Timestamp (timezone-aware UTC; keep "Z" suffix form for consistency)
    created_at = (
        legacy_payload.get("created_at")
        or legacy_payload.get("timestamp")
        or legacy_payload.get("indexed_at")
        or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    )

    # Build canonical payload
    payload = {
        "schema_version": "cm_payload_v1",
        "tenant_id": config["tenant_id"],
        "team_id": config.get("team_id"),
        "project_id": legacy_payload.get("project_id"),
        "agent_id": agent_id,
        "owner_kind": config["default_owner_kind"],
        "owner_id": agent_id or config["team_id"],
        "scope": scope,
        "visibility": legacy_payload.get("visibility", config["default_visibility"]),
        "indexed": legacy_payload.get("indexed", True),
        "source_kind": _infer_source_kind(scope),
        "source_id": source_id,
        "chunk": {
            "chunk_id": chunk_id,
            "chunk_idx": chunk_idx,
        },
        "fingerprint": fingerprint,
        "created_at": created_at,
    }

    # Add tags (pattern-derived + legacy, deduplicated)
    if tags:
        payload["tags"] = tags
    legacy_tags = legacy_payload.get("tags", [])
    if legacy_tags:
        payload["tags"] = list(set(payload.get("tags", []) + legacy_tags))

    # Preserve additional fields
    for field in ["lang", "importance", "ttl_days", "channel_id"]:
        if field in legacy_payload:
            payload[field] = legacy_payload[field]

    # Preserve text/content for debugging (optional)
    if text:
        payload["_text"] = text[:500]  # Truncate for safety

    return payload


def _infer_source_kind(scope: str) -> str:
    """Infer source_kind from scope."""
    mapping = {
        "docs": "document",
        "messages": "message",
        "memory": "document",
        "artifacts": "artifact",
        "signals": "document",
    }
    return mapping.get(scope, "document")


class MigrationStats:
    """Track migration statistics."""

    def __init__(self):
        self.collections_processed = 0
        self.points_read = 0
        self.points_migrated = 0
        self.points_skipped = 0
        self.errors = []

    def add_error(self, collection: str, point_id: str, error: str):
        """Record a per-point (or per-collection) migration error."""
        self.errors.append({
            "collection": collection,
            "point_id": point_id,
            "error": error,
        })

    def summary(self) -> Dict[str, Any]:
        """Return a serializable summary; errors truncated to first 10."""
        return {
            "collections_processed": self.collections_processed,
            "points_read": self.points_read,
            "points_migrated": self.points_migrated,
            "points_skipped": self.points_skipped,
            "errors_count": len(self.errors),
            "errors": self.errors[:10] if self.errors else [],
        }


def migrate_collection(
    client: QdrantClient,
    source_collection: str,
    target_collection: str,
    config: Dict[str, Any],
    stats: MigrationStats,
    dry_run: bool = True,
    skip_dim_check: bool = False,
) -> None:
    """
    Migrate a single legacy collection to canonical collection.

    Args:
        client: Qdrant client
        source_collection: Legacy collection name
        target_collection: Canonical collection name
        config: Migration configuration
        stats: Statistics tracker
        dry_run: If True, don't actually write (counts are still tracked so
            the dry-run summary reflects what WOULD be migrated)
        skip_dim_check: Skip dimension/metric validation (dangerous!)

    Raises:
        DimMismatchError: If source dim/metric doesn't match target
    """
    logger.info(f"Migrating collection: {source_collection}")

    # === SECURITY: Verify dim/metric match ===
    source_config = get_collection_vector_config(client, source_collection)
    target_dim = config.get("text_dim")
    target_metric = config.get("text_metric", "cosine")

    logger.info(f"  Source: dim={source_config['dim']}, metric={source_config['metric']}")
    logger.info(f"  Target: dim={target_dim}, metric={target_metric}")

    if source_config["dim"] and source_config["dim"] != target_dim:
        msg = (
            f"Dimension mismatch: {source_collection} has dim={source_config['dim']}, "
            f"target {target_collection} expects dim={target_dim}"
        )
        if skip_dim_check:
            logger.warning(f"  WARNING: {msg} (skipping due to --skip-dim-check)")
        else:
            logger.error(f"  ERROR: {msg}")
            stats.add_error(source_collection, "", f"DimMismatch: {msg}")
            raise DimMismatchError(msg)

    if source_config["metric"] and source_config["metric"] != target_metric:
        msg = (
            f"Metric mismatch: {source_collection} has metric={source_config['metric']}, "
            f"target {target_collection} expects metric={target_metric}"
        )
        if skip_dim_check:
            logger.warning(f"  WARNING: {msg} (skipping due to --skip-dim-check)")
        else:
            logger.error(f"  ERROR: {msg}")
            stats.add_error(source_collection, "", f"MetricMismatch: {msg}")
            raise DimMismatchError(msg)

    # Parse collection name
    collection_info = parse_collection_name(source_collection)
    if not collection_info:
        logger.warning(f"  Could not parse collection name pattern: {source_collection}")
        collection_info = {"agent_id": None, "scope": "docs", "tags": []}

    logger.info(f"  Mapped to: agent={collection_info['agent_id']}, scope={collection_info['scope']}")

    # Scroll through all points
    batch_size = config.get("batch_size", 100)
    offset = None
    batch_count = 0
    collection_points_read = 0
    collection_points_migrated = 0

    while True:
        # Scroll points
        scroll_result = client.scroll(
            collection_name=source_collection,
            offset=offset,
            limit=batch_size,
            with_payload=True,
            with_vectors=True,
        )
        points, next_offset = scroll_result

        if not points:
            break

        batch_count += 1
        stats.points_read += len(points)
        collection_points_read += len(points)

        # Convert points
        canonical_points = []
        for point in points:
            try:
                legacy_payload = point.payload or {}

                # Build canonical payload
                canonical_payload = build_canonical_payload(
                    legacy_payload=legacy_payload,
                    collection_info=collection_info,
                    config=config,
                    point_id=str(point.id),
                )

                # Validate payload
                try:
                    validate_payload(canonical_payload)
                except PayloadValidationError as e:
                    stats.add_error(source_collection, str(point.id), str(e))
                    stats.points_skipped += 1
                    continue

                # Compute deterministic ID (rerun-safe)
                source_id = canonical_payload.get("source_id", "")
                chunk_idx = canonical_payload.get("chunk", {}).get("chunk_idx", 0)
                deterministic_id = compute_deterministic_id(
                    source_collection=source_collection,
                    legacy_point_id=str(point.id),
                    source_id=source_id,
                    chunk_idx=chunk_idx,
                )

                # Store original legacy ID in payload for traceability
                canonical_payload["_legacy_collection"] = source_collection
                canonical_payload["_legacy_point_id"] = str(point.id)

                # Create new point with deterministic ID
                canonical_points.append(PointStruct(
                    id=deterministic_id,
                    vector=point.vector,
                    payload=canonical_payload,
                ))
            except Exception as e:
                stats.add_error(source_collection, str(point.id), str(e))
                stats.points_skipped += 1

        # Upsert to canonical collection. Counters are updated even in dry-run
        # mode so the summary shows what WOULD be migrated; only the actual
        # write is skipped.
        if canonical_points:
            if not dry_run:
                client.upsert(
                    collection_name=target_collection,
                    points=canonical_points,
                )
            stats.points_migrated += len(canonical_points)
            collection_points_migrated += len(canonical_points)

        if batch_count % 10 == 0:
            logger.info(f"  Progress: {collection_points_read} read, {collection_points_migrated} migrated")

        # Continue scrolling
        offset = next_offset
        if offset is None:
            break

    stats.collections_processed += 1
    logger.info(f"  Completed: {collection_points_read} read, {collection_points_migrated} migrated")


def discover_legacy_collections(client: QdrantClient) -> List[str]:
    """Discover all legacy (non-canonical, i.e. non-cm_*) collections."""
    collections = client.get_collections().collections
    legacy = []
    for col in collections:
        if not col.name.startswith("cm_"):
            legacy.append(col.name)
    return sorted(legacy)


def main():
    parser = argparse.ArgumentParser(
        description="Migrate Qdrant collections to canonical schema"
    )
    parser.add_argument(
        "--host",
        default=os.getenv("QDRANT_HOST", "localhost"),
        help="Qdrant host"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=int(os.getenv("QDRANT_PORT", "6333")),
        help="Qdrant port"
    )
    parser.add_argument(
        "--collections",
        help="Comma-separated list of collections to migrate"
    )
    parser.add_argument(
        "--all",
        action="store_true",
        help="Migrate all legacy collections"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be migrated without writing"
    )
    parser.add_argument(
        "--tenant-id",
        default=DEFAULT_CONFIG["tenant_id"],
        help="Tenant ID for payloads"
    )
    parser.add_argument(
        "--team-id",
        default=DEFAULT_CONFIG["team_id"],
        help="Team ID for payloads"
    )
    parser.add_argument(
        "--dim",
        type=int,
        default=DEFAULT_CONFIG["text_dim"],
        help="Vector dimension"
    )
    parser.add_argument(
        "--skip-dim-check",
        action="store_true",
        help="Skip dimension/metric validation (DANGEROUS - may corrupt data)"
    )
    parser.add_argument(
        "--continue-on-error",
        action="store_true",
        help="Continue migrating other collections if one fails"
    )
    args = parser.parse_args()

    # Configuration
    config = DEFAULT_CONFIG.copy()
    config["tenant_id"] = args.tenant_id
    config["team_id"] = args.team_id
    config["text_dim"] = args.dim

    # Connect to Qdrant
    logger.info(f"Connecting to Qdrant at {args.host}:{args.port}")
    client = QdrantClient(host=args.host, port=args.port)

    # Determine collections to migrate
    if args.collections:
        # Filter out empty entries from stray/trailing commas.
        collections = [c.strip() for c in args.collections.split(",") if c.strip()]
    elif args.all:
        collections = discover_legacy_collections(client)
        logger.info(f"Discovered {len(collections)} legacy collections")
    else:
        # List available collections
        legacy = discover_legacy_collections(client)
        print("\nLegacy collections available for migration:")
        for col in legacy:
            info = parse_collection_name(col)
            info_str = f"→ agent={info['agent_id']}, scope={info['scope']}" if info else "→ (unknown pattern)"
            print(f"  - {col} {info_str}")
        print("\nUse --collections or --all to migrate")
        return

    # Target collection
    target_collection = get_canonical_collection_name("text", config["text_dim"])

    # Ensure target collection exists
    if not args.dry_run:
        logger.info(f"Ensuring target collection: {target_collection}")
        ensure_collection(
            client,
            target_collection,
            config["text_dim"],
            config["text_metric"],
        )

    # Migrate
    stats = MigrationStats()
    logger.info(f"Starting migration ({'DRY RUN' if args.dry_run else 'LIVE'})")
    logger.info(f"Target collection: {target_collection}")

    failed_collections = []
    for collection in collections:
        try:
            migrate_collection(
                client=client,
                source_collection=collection,
                target_collection=target_collection,
                config=config,
                stats=stats,
                dry_run=args.dry_run,
                skip_dim_check=args.skip_dim_check,
            )
        except DimMismatchError as e:
            logger.error(f"Failed to migrate {collection}: {e}")
            stats.add_error(collection, "", str(e))
            failed_collections.append(collection)
            if not args.continue_on_error:
                logger.error("Stopping migration. Use --continue-on-error to skip failed collections.")
                break
        except Exception as e:
            logger.error(f"Failed to migrate {collection}: {e}")
            stats.add_error(collection, "", str(e))
            failed_collections.append(collection)
            if not args.continue_on_error:
                break

    # Summary
    print("\n" + "=" * 50)
    print("MIGRATION SUMMARY")
    print("=" * 50)
    summary = stats.summary()
    print(f"Collections processed: {summary['collections_processed']}/{len(collections)}")
    print(f"Points read: {summary['points_read']}")
    print(f"Points migrated: {summary['points_migrated']}")
    print(f"Points skipped: {summary['points_skipped']}")
    print(f"Errors: {summary['errors_count']}")

    if failed_collections:
        print(f"\nFailed collections ({len(failed_collections)}):")
        for col in failed_collections:
            print(f"  - {col}")

    if summary['errors']:
        print("\nFirst 10 errors:")
        for err in summary['errors']:
            print(f"  - {err['collection']}:{err['point_id']} - {err['error'][:100]}")

    if args.dry_run:
        print("\n[DRY RUN] No changes were made")

    # Exit with error code if there were failures
    if failed_collections and not args.continue_on_error:
        sys.exit(1)


if __name__ == "__main__":
    main()