"""
Qdrant Collection Management for Co-Memory

Handles canonical collection creation and configuration.
"""

import logging
from typing import Any, Dict, List, Optional

try:
    from qdrant_client import QdrantClient
    from qdrant_client.models import (
        Distance,
        VectorParams,
        PayloadSchemaType,
        TextIndexParams,
        TokenizerType,
    )
    HAS_QDRANT = True
except ImportError:
    # qdrant-client is optional at import time; functions that need it
    # check HAS_QDRANT before touching any qdrant name.
    HAS_QDRANT = False

logger = logging.getLogger(__name__)

# Canonical collection naming
COLLECTION_PREFIX = "cm"
COLLECTION_VERSION = "v1"


def get_canonical_collection_name(
    collection_type: str = "text",
    dim: int = 1024,
    version: str = COLLECTION_VERSION
) -> str:
    """
    Generate canonical collection name.

    Args:
        collection_type: Type of embeddings (text, code, mm)
        dim: Vector dimension
        version: Schema version

    Returns:
        Collection name like "cm_text_1024_v1"
    """
    return f"{COLLECTION_PREFIX}_{collection_type}_{dim}_{version}"


def get_distance_metric(metric: str) -> "Distance":
    """
    Convert metric string to Qdrant Distance enum.

    Args:
        metric: Metric name, matched case-insensitively
            (cosine, dot, euclidean)

    Returns:
        The matching Distance member. Unrecognised names fall back to
        Distance.COSINE; a warning is logged when that happens.

    Raises:
        ImportError: If qdrant-client is not installed
    """
    if not HAS_QDRANT:
        raise ImportError("qdrant-client not installed")

    metrics = {
        "cosine": Distance.COSINE,
        "dot": Distance.DOT,
        "euclidean": Distance.EUCLID,
    }
    key = metric.lower()
    if key not in metrics:
        # Keep the lenient fallback, but make a typo in configuration visible.
        logger.warning(
            "Unknown distance metric '%s'; falling back to cosine", metric
        )
    return metrics.get(key, Distance.COSINE)


# Default payload indexes for optimal query performance
DEFAULT_PAYLOAD_INDEXES = [
    {"field": "tenant_id", "type": "keyword"},
    {"field": "team_id", "type": "keyword"},
    {"field": "project_id", "type": "keyword"},
    {"field": "agent_id", "type": "keyword"},
    {"field": "scope", "type": "keyword"},
    {"field": "visibility", "type": "keyword"},
    {"field": "indexed", "type": "bool"},
    {"field": "source_id", "type": "keyword"},
    {"field": "owner_kind", "type": "keyword"},
    {"field": "owner_id", "type": "keyword"},
    {"field": "tags", "type": "keyword"},
    {"field": "acl.read_team_ids", "type": "keyword"},
    {"field": "acl.read_agent_ids", "type": "keyword"},
    {"field": "acl.read_role_ids", "type": "keyword"},
]

# Index "type" string -> PayloadSchemaType member name. Stored as names and
# resolved with getattr at use time, so a client version lacking one member
# only affects indexes of that particular type.
_SIMPLE_INDEX_SCHEMAS = {
    "keyword": "KEYWORD",
    "bool": "BOOL",
    "integer": "INTEGER",
    "float": "FLOAT",
    "datetime": "DATETIME",
}


def ensure_collection(
    client: "QdrantClient",
    name: str,
    dim: int,
    metric: str = "cosine",
    payload_indexes: Optional[List[Dict[str, str]]] = None,
    on_disk: bool = True,
) -> bool:
    """
    Ensure a canonical collection exists with proper configuration.

    Payload indexes are (re-)applied whether or not the collection already
    existed.

    Args:
        client: Qdrant client instance
        name: Collection name
        dim: Vector dimension
        metric: Distance metric (cosine, dot, euclidean)
        payload_indexes: List of payload fields to index. None or an empty
            list selects DEFAULT_PAYLOAD_INDEXES.
        on_disk: Whether to store vectors on disk

    Returns:
        True if collection was created, False if already exists

    Raises:
        ImportError: If qdrant-client is not installed
    """
    if not HAS_QDRANT:
        raise ImportError("qdrant-client not installed")

    indexes = payload_indexes or DEFAULT_PAYLOAD_INDEXES

    # Check if collection exists
    collections = client.get_collections().collections
    if any(c.name == name for c in collections):
        logger.info("Collection '%s' already exists", name)
        # Ensure payload indexes
        _ensure_payload_indexes(client, name, indexes)
        return False

    # Create collection
    logger.info(
        "Creating collection '%s' with dim=%s, metric=%s", name, dim, metric
    )
    client.create_collection(
        collection_name=name,
        vectors_config=VectorParams(
            size=dim,
            distance=get_distance_metric(metric),
            on_disk=on_disk,
        ),
    )

    # Create payload indexes
    _ensure_payload_indexes(client, name, indexes)

    logger.info("Collection '%s' created successfully", name)
    return True


def _ensure_payload_indexes(
    client: "QdrantClient",
    collection_name: str,
    indexes: List[Dict[str, str]]
) -> None:
    """
    Ensure payload indexes exist on collection.

    Best-effort: a failure on one index is logged and the remaining
    indexes are still attempted. Does nothing when qdrant-client is not
    installed.

    Args:
        client: Qdrant client
        collection_name: Collection name
        indexes: List of index configurations. Each entry needs a "field"
            key; "type" defaults to "keyword". Supported types are keyword,
            bool, integer, float, datetime and text.
    """
    if not HAS_QDRANT:
        return

    for index_config in indexes:
        field_name = index_config["field"]
        field_type = index_config.get("type", "keyword")

        if field_type != "text" and field_type not in _SIMPLE_INDEX_SCHEMAS:
            # Nothing would be created for this entry, so say so instead of
            # reporting a successful creation.
            logger.warning(
                "Skipping index %s: unsupported index type '%s'",
                field_name,
                field_type,
            )
            continue

        try:
            if field_type == "text":
                field_schema = TextIndexParams(
                    type="text",
                    tokenizer=TokenizerType.WORD,
                    min_token_len=2,
                    max_token_len=15,
                )
            else:
                field_schema = getattr(
                    PayloadSchemaType, _SIMPLE_INDEX_SCHEMAS[field_type]
                )

            client.create_payload_index(
                collection_name=collection_name,
                field_name=field_name,
                field_schema=field_schema,
            )
            logger.debug(
                "Created payload index: %s (%s)", field_name, field_type
            )
        except Exception as e:
            # Index might already exist
            if "already exists" not in str(e).lower():
                logger.warning("Failed to create index %s: %s", field_name, e)


def get_collection_info(client: "QdrantClient", name: str) -> Optional[Dict[str, Any]]:
    """
    Get collection information.

    Args:
        client: Qdrant client
        name: Collection name

    Returns:
        Collection info dict, or None if the collection could not be read
        (not found, or any other error while fetching or unpacking it; the
        cause is logged at debug level).

    Raises:
        ImportError: If qdrant-client is not installed
    """
    if not HAS_QDRANT:
        raise ImportError("qdrant-client not installed")

    try:
        info = client.get_collection(name)
        vectors = info.config.params.vectors
        return {
            "name": name,
            # NOTE(review): vectors_count appears to be absent from the
            # collection info in newer qdrant-client releases - confirm.
            # Reading it defensively keeps the rest of the info available.
            "vectors_count": getattr(info, "vectors_count", None),
            "points_count": info.points_count,
            "status": info.status.value,
            "config": {
                "size": vectors.size,
                "distance": vectors.distance.value,
            }
        }
    except Exception as e:
        # Callers rely on None for "not available"; keep that contract but
        # leave a trace so real failures can be told apart from "not found".
        logger.debug("Could not read collection info for '%s': %s", name, e)
        return None


def list_legacy_collections(client: "QdrantClient") -> List[str]:
    """
    List all legacy (non-canonical) collections.

    Args:
        client: Qdrant client

    Returns:
        List of legacy collection names

    Raises:
        ImportError: If qdrant-client is not installed
    """
    if not HAS_QDRANT:
        raise ImportError("qdrant-client not installed")

    # Canonical collections start with "cm_"
    canonical_prefix = f"{COLLECTION_PREFIX}_"
    return [
        col.name
        for col in client.get_collections().collections
        if not col.name.startswith(canonical_prefix)
    ]