#!/usr/bin/env python3
"""
Qdrant Migration Script (REST API version)

No qdrant-client dependency - uses pure REST API.

Usage:
    python3 qdrant_migrate_rest.py --dry-run
    python3 qdrant_migrate_rest.py --all
"""
import argparse
import hashlib
import json
import logging
import re
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)

# Configuration defaults; --url and --dim can override at the CLI.
DEFAULT_CONFIG = {
    "qdrant_url": "http://localhost:6333",
    "tenant_id": "t_daarion",
    "team_id": "team_core",
    "text_dim": 1024,
    "text_metric": "Cosine",  # Qdrant uses capitalized
    "default_visibility": "confidential",
    "default_owner_kind": "agent",
    "batch_size": 100,
}

# Legacy collection-name patterns: (regex, agent capture-group or None, scope, extra tags).
# Order matters — first match wins in parse_collection_name().
COLLECTION_PATTERNS = [
    (r"^([a-z]+)_docs$", 1, "docs", []),
    (r"^([a-z]+)_messages$", 1, "messages", []),
    (r"^([a-z]+)_memory_items$", 1, "memory", []),
    (r"^([a-z]+)_artifacts$", 1, "artifacts", []),
    (r"^druid_legal_kb$", None, "docs", ["legal_kb"]),
    (r"^nutra_food_knowledge$", None, "docs", ["food_kb"]),
    (r"^memories$", None, "memory", []),
    (r"^messages$", None, "messages", []),
]

# Known agent slug -> canonical agent id; unknown slugs fall back to "agt_<slug>".
AGENT_SLUGS = {
    "helion": "agt_helion",
    "nutra": "agt_nutra",
    "druid": "agt_druid",
    "greenfood": "agt_greenfood",
    "agromatrix": "agt_agromatrix",
    "daarwizz": "agt_daarwizz",
    "alateya": "agt_alateya",
}


class QdrantError(Exception):
    """Raised when the Qdrant REST API returns an HTTP error."""


def qdrant_request(url: str, method: str = "GET", data: Optional[Dict] = None,
                   timeout: float = 30) -> Dict:
    """Make an HTTP request to Qdrant and return the decoded JSON response.

    Args:
        url: Full request URL.
        method: HTTP verb (GET/POST/PUT/...).
        data: Optional JSON-serializable request body.
        timeout: Socket timeout in seconds.

    Raises:
        QdrantError: On any HTTP error status, with the response body included.
    """
    req = urllib.request.Request(url, method=method)
    req.add_header("Content-Type", "application/json")
    body = json.dumps(data).encode("utf-8") if data else None
    try:
        with urllib.request.urlopen(req, body, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        error_body = e.read().decode("utf-8") if e.fp else ""
        # QdrantError subclasses Exception, so existing `except Exception`
        # handlers and the "already exists" message check keep working.
        raise QdrantError(f"Qdrant error {e.code}: {error_body}") from e


def get_collections(base_url: str) -> List[str]:
    """Get list of all collections."""
    resp = qdrant_request(f"{base_url}/collections")
    return [c["name"] for c in resp.get("result", {}).get("collections", [])]


def get_collection_info(base_url: str, name: str) -> Dict:
    """Get collection info including vector config (dim/metric/point count)."""
    resp = qdrant_request(f"{base_url}/collections/{name}")
    result = resp.get("result", {})
    config = result.get("config", {}).get("params", {}).get("vectors", {})
    return {
        "name": name,
        "points_count": result.get("points_count", 0),
        "dim": config.get("size"),
        "metric": config.get("distance"),
    }


def create_collection(base_url: str, name: str, dim: int, metric: str) -> bool:
    """Create a new collection. Returns False if it already existed."""
    data = {
        "vectors": {
            "size": dim,
            "distance": metric,
        }
    }
    try:
        qdrant_request(f"{base_url}/collections/{name}", "PUT", data)
        return True
    except Exception as e:
        if "already exists" in str(e).lower():
            return False
        raise


def scroll_points(base_url: str, collection: str, limit: int = 100,
                  offset: Optional[Any] = None) -> Tuple[List, Optional[Any]]:
    """Scroll through collection points.

    Note: the scroll offset is whatever Qdrant returned as
    ``next_page_offset`` — it may be an int or a string point id,
    hence Optional[Any] rather than Optional[str].
    """
    data = {
        "limit": limit,
        "with_payload": True,
        "with_vector": True,
    }
    if offset:
        data["offset"] = offset
    resp = qdrant_request(f"{base_url}/collections/{collection}/points/scroll", "POST", data)
    result = resp.get("result", {})
    points = result.get("points", [])
    next_offset = result.get("next_page_offset")
    return points, next_offset


def upsert_points(base_url: str, collection: str, points: List[Dict]) -> None:
    """Upsert points to collection."""
    data = {"points": points}
    qdrant_request(f"{base_url}/collections/{collection}/points", "PUT", data)


def compute_deterministic_id(source_collection: str, legacy_id: str,
                             source_id: str, chunk_idx: int) -> str:
    """Compute deterministic point ID (32 hex chars).

    Re-running the migration yields the same id for the same source point,
    making the upsert idempotent.
    """
    content = f"{source_collection}|{legacy_id}|{source_id}|{chunk_idx}"
    return hashlib.sha256(content.encode()).hexdigest()[:32]


def compute_fingerprint(source_id: str, chunk_idx: int, text: str = "") -> str:
    """Compute stable fingerprint of a chunk (sha256-prefixed, 32 hex chars)."""
    content = f"{source_id}:{chunk_idx}:{text}"
    return f"sha256:{hashlib.sha256(content.encode()).hexdigest()[:32]}"


def parse_collection_name(name: str) -> Optional[Dict]:
    """Parse a legacy collection name into agent/scope/tags, or None if unknown."""
    lowered = name.lower()  # hoisted: lower() once instead of per-pattern
    for pattern, agent_group, scope, tags in COLLECTION_PATTERNS:
        match = re.match(pattern, lowered)
        if not match:
            continue
        agent_id = None
        if agent_group is not None:
            agent_slug = match.group(agent_group).lower()
            agent_id = AGENT_SLUGS.get(agent_slug, f"agt_{agent_slug}")
        elif "druid" in lowered:
            agent_id = "agt_druid"
        elif "nutra" in lowered:
            agent_id = "agt_nutra"
        return {"agent_id": agent_id, "scope": scope, "tags": tags.copy()}
    return None


def build_canonical_payload(legacy_payload: Dict, collection_info: Dict,
                            config: Dict, point_id: str) -> Dict:
    """Build the canonical cm_payload_v1 payload from a legacy point payload.

    Args:
        legacy_payload: Payload dict from the legacy point (schema varies).
        collection_info: Output of parse_collection_name() plus "_collection".
        config: Migration config (tenant/team/defaults).
        point_id: Legacy point id, used for fallback ids.
    """
    agent_id = collection_info.get("agent_id")
    scope = collection_info.get("scope", "docs")
    tags = collection_info.get("tags", [])

    # Legacy payloads use several different keys for the source document id,
    # and some store it as a non-string — coerce before the regex check.
    source_id = str(
        legacy_payload.get("source_id")
        or legacy_payload.get("document_id")
        or legacy_payload.get("doc_id")
        or f"doc_{point_id}"
    )
    if not re.match(r"^(doc|msg|art|web|code)_", source_id):
        prefix = "msg" if scope == "messages" else "doc"
        source_id = f"{prefix}_{source_id}"

    chunk_idx = legacy_payload.get("chunk_idx", legacy_payload.get("chunk_index", 0))
    chunk_id = str(legacy_payload.get("chunk_id") or f"chk_{point_id}")
    if not chunk_id.startswith("chk_"):
        chunk_id = f"chk_{chunk_id}"

    text = legacy_payload.get("text", legacy_payload.get("content", ""))
    fingerprint = legacy_payload.get("fingerprint") or compute_fingerprint(source_id, chunk_idx, text)

    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and naive.
    created_at = (
        legacy_payload.get("created_at")
        or legacy_payload.get("timestamp")
        or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    )

    source_kind = {"docs": "document", "messages": "message",
                   "memory": "document", "artifacts": "artifact"}.get(scope, "document")

    payload = {
        "schema_version": "cm_payload_v1",
        "tenant_id": config["tenant_id"],
        "team_id": config.get("team_id"),
        "project_id": legacy_payload.get("project_id"),
        "agent_id": agent_id,
        "owner_kind": config["default_owner_kind"],
        "owner_id": agent_id or config["team_id"],
        "scope": scope,
        "visibility": legacy_payload.get("visibility", config["default_visibility"]),
        "indexed": legacy_payload.get("indexed", True),
        "source_kind": source_kind,
        "source_id": source_id,
        "chunk": {"chunk_id": chunk_id, "chunk_idx": chunk_idx},
        "fingerprint": fingerprint,
        "created_at": created_at,
        "_legacy_collection": collection_info.get("_collection"),
        "_legacy_point_id": point_id,
    }
    if tags:
        payload["tags"] = tags
    if legacy_payload.get("tags"):
        payload["tags"] = list(set(payload.get("tags", []) + legacy_payload["tags"]))
    for field in ["lang", "importance", "channel_id"]:
        if field in legacy_payload:
            payload[field] = legacy_payload[field]
    return payload


class MigrationStats:
    """Mutable counters accumulated across migrated collections."""

    def __init__(self):
        self.collections_processed = 0
        self.points_read = 0
        self.points_migrated = 0
        self.points_skipped = 0
        self.errors = []           # list of {"collection": ..., "error": ...} dicts
        self.dim_mismatches = []   # collection names blocked by dimension mismatch

    def summary(self) -> Dict:
        """Return a plain-dict snapshot for reporting."""
        return {
            "collections_processed": self.collections_processed,
            "points_read": self.points_read,
            "points_migrated": self.points_migrated,
            "points_skipped": self.points_skipped,
            "errors_count": len(self.errors),
            "dim_mismatches": self.dim_mismatches,
        }


def migrate_collection(
    base_url: str,
    source_collection: str,
    target_collection: str,
    config: Dict,
    stats: MigrationStats,
    dry_run: bool = True,
) -> None:
    """Migrate a single collection into the canonical target collection.

    Blocks (records a dim mismatch) when the source vector dimension differs
    from the target; a metric mismatch only warns.
    """
    logger.info(f"Migrating: {source_collection}")

    # Get source info
    source_info = get_collection_info(base_url, source_collection)
    logger.info(f" Source: dim={source_info['dim']}, metric={source_info['metric']}, points={source_info['points_count']}")

    # Check dim/metric
    if source_info["dim"] != config["text_dim"]:
        msg = f"Dim mismatch: {source_collection} has {source_info['dim']}, target expects {config['text_dim']}"
        logger.error(f" ❌ {msg}")
        stats.dim_mismatches.append(source_collection)
        stats.errors.append({"collection": source_collection, "error": msg})
        return

    if source_info["metric"] and source_info["metric"] != config["text_metric"]:
        msg = f"Metric mismatch: {source_collection} has {source_info['metric']}, target expects {config['text_metric']}"
        logger.warning(f" ⚠️ {msg}")

    # Parse collection name
    collection_info = parse_collection_name(source_collection) or {"agent_id": None, "scope": "docs", "tags": []}
    collection_info["_collection"] = source_collection
    logger.info(f" Mapped: agent={collection_info['agent_id']}, scope={collection_info['scope']}")

    if dry_run:
        logger.info(f" [DRY RUN] Would migrate {source_info['points_count']} points")
        stats.points_read += source_info['points_count']
        stats.points_migrated += source_info['points_count']
        stats.collections_processed += 1
        return

    # Scroll and migrate
    offset = None
    batch_count = 0
    collection_migrated = 0

    while True:
        points, next_offset = scroll_points(base_url, source_collection, config["batch_size"], offset)
        if not points:
            break
        batch_count += 1
        stats.points_read += len(points)

        canonical_points = []
        for point in points:
            # Assigned BEFORE the try so the except handler can always
            # reference it (previously a mid-try failure raised NameError).
            point_id = str(point.get("id", ""))
            try:
                legacy_payload = point.get("payload", {})
                vector = point.get("vector", [])
                canonical_payload = build_canonical_payload(legacy_payload, collection_info, config, point_id)
                source_id = canonical_payload.get("source_id", "")
                chunk_idx = canonical_payload.get("chunk", {}).get("chunk_idx", 0)
                det_id = compute_deterministic_id(source_collection, point_id, source_id, chunk_idx)
                canonical_points.append({
                    "id": det_id,
                    "vector": vector,
                    "payload": canonical_payload,
                })
            except Exception as e:
                stats.errors.append({"collection": source_collection, "point": point_id, "error": str(e)})
                stats.points_skipped += 1

        if canonical_points:
            upsert_points(base_url, target_collection, canonical_points)
            collection_migrated += len(canonical_points)
            stats.points_migrated += len(canonical_points)

        if batch_count % 10 == 0:
            logger.info(f" Progress: {stats.points_read} read, {collection_migrated} migrated")

        offset = next_offset
        if not offset:
            break

    stats.collections_processed += 1
    logger.info(f" Completed: {collection_migrated} points migrated")


def main():
    """CLI entry point: list, dry-run, or migrate legacy collections."""
    parser = argparse.ArgumentParser(description="Qdrant Migration (REST API)")
    parser.add_argument("--url", default=DEFAULT_CONFIG["qdrant_url"], help="Qdrant URL")
    parser.add_argument("--collections", help="Comma-separated collections")
    parser.add_argument("--all", action="store_true", help="Migrate all legacy collections")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be migrated")
    parser.add_argument("--dim", type=int, default=DEFAULT_CONFIG["text_dim"], help="Target dimension")
    parser.add_argument("--continue-on-error", action="store_true", help="Continue on errors")
    args = parser.parse_args()

    config = DEFAULT_CONFIG.copy()
    config["qdrant_url"] = args.url
    config["text_dim"] = args.dim
    base_url = config["qdrant_url"]

    # Get collections
    logger.info(f"Connecting to Qdrant at {base_url}")
    all_collections = get_collections(base_url)
    logger.info(f"Found {len(all_collections)} collections")

    # Determine which to migrate ("cm_" prefix marks already-canonical collections)
    if args.collections:
        collections = [c.strip() for c in args.collections.split(",")]
    elif args.all:
        collections = [c for c in all_collections if not c.startswith("cm_")]
        logger.info(f"Legacy collections to migrate: {len(collections)}")
    else:
        # No selection given: just list what could be migrated and exit.
        print("\nLegacy collections:")
        for col in all_collections:
            if not col.startswith("cm_"):
                info = parse_collection_name(col)
                info_str = f"→ agent={info['agent_id']}, scope={info['scope']}" if info else "→ (unknown)"
                print(f" - {col} {info_str}")
        print("\nUse --collections or --all to migrate")
        return

    # Target collection
    target_collection = f"cm_text_{config['text_dim']}_v1"
    logger.info(f"Target collection: {target_collection}")

    # Create target if needed
    if not args.dry_run:
        if target_collection not in all_collections:
            logger.info(f"Creating target collection: {target_collection}")
            create_collection(base_url, target_collection, config["text_dim"], config["text_metric"])
        else:
            logger.info(f"Target collection exists: {target_collection}")

    # Migrate
    stats = MigrationStats()
    logger.info(f"Starting migration ({'DRY RUN' if args.dry_run else 'LIVE'})")
    for collection in collections:
        try:
            migrate_collection(base_url, collection, target_collection, config, stats, args.dry_run)
        except Exception as e:
            logger.error(f"Failed: {collection}: {e}")
            stats.errors.append({"collection": collection, "error": str(e)})
            if not args.continue_on_error:
                break

    # Summary
    print("\n" + "=" * 60)
    print("MIGRATION SUMMARY")
    print("=" * 60)
    summary = stats.summary()
    print(f"Target collection: {target_collection}")
    print(f"Collections processed: {summary['collections_processed']}/{len(collections)}")
    print(f"Points read: {summary['points_read']}")
    print(f"Points migrated: {summary['points_migrated']}")
    print(f"Points skipped: {summary['points_skipped']}")
    print(f"Errors: {summary['errors_count']}")
    if summary['dim_mismatches']:
        print(f"\n❌ Dim mismatches ({len(summary['dim_mismatches'])}):")
        for col in summary['dim_mismatches']:
            print(f" - {col}")
    if stats.errors:
        print("\nFirst 5 errors:")
        for err in stats.errors[:5]:
            print(f" - {err}")
    if args.dry_run:
        print("\n[DRY RUN] No changes were made")
    if summary['dim_mismatches']:
        print("\n❌ MIGRATION BLOCKED due to dim mismatches")
        sys.exit(1)


if __name__ == "__main__":
    main()