- Vision Encoder Service (OpenCLIP ViT-L/14, GPU-accelerated)
- FastAPI app with text/image embedding endpoints (768-dim)
- Docker support with NVIDIA GPU runtime
- Port 8001, health checks, model info API
- Qdrant Vector Database integration
- Port 6333/6334 (HTTP/gRPC)
- Image embeddings storage (768-dim, Cosine distance)
- Auto collection creation
- Vision RAG implementation
- VisionEncoderClient (Python client for API)
- Image Search module (text-to-image, image-to-image)
- Vision RAG routing in DAGI Router (mode: image_search)
- VisionEncoderProvider integration
- Documentation (5000+ lines)
- SYSTEM-INVENTORY.md - Complete system inventory
- VISION-ENCODER-STATUS.md - Service status
- VISION-RAG-IMPLEMENTATION.md - Implementation details
- vision_encoder_deployment_task.md - Deployment checklist
- services/vision-encoder/README.md - Deployment guide
- Updated WARP.md, INFRASTRUCTURE.md, Jupyter Notebook
- Testing
- test-vision-encoder.sh - Smoke tests (6 tests)
- Unit tests for client, image search, routing
- Services: 17 total (added Vision Encoder + Qdrant)
- AI Models: 3 (qwen3:8b, OpenCLIP ViT-L/14, BAAI/bge-m3)
- GPU Services: 2 (Vision Encoder, Ollama)
- VRAM Usage: ~10 GB (concurrent)
Status: Production Ready ✅
323 lines
10 KiB
Python
"""
|
|
Vision Encoder Service - FastAPI app for text and image embeddings using OpenCLIP.
|
|
|
|
Endpoints:
|
|
- POST /embed/text - Generate text embeddings
|
|
- POST /embed/image - Generate image embeddings
|
|
- GET /health - Health check
|
|
- GET /info - Model information
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import List, Optional, Dict, Any
|
|
from contextlib import asynccontextmanager
|
|
|
|
import torch
|
|
import open_clip
|
|
from PIL import Image
|
|
import numpy as np
|
|
from fastapi import FastAPI, HTTPException, UploadFile, File
|
|
from pydantic import BaseModel, Field
|
|
import httpx
|
|
|
|
# Configure logging
# JSON-formatted log lines so container log collectors can parse them.
logging.basicConfig(
    level=logging.INFO,
    format='{"timestamp": "%(asctime)s", "level": "%(levelname)s", "message": "%(message)s", "module": "%(name)s"}'
)
logger = logging.getLogger(__name__)

# Configuration from environment
# Compute device: explicit DEVICE env var wins; otherwise CUDA when available.
DEVICE: str = os.getenv("DEVICE", "cuda" if torch.cuda.is_available() else "cpu")
# OpenCLIP architecture name and pretrained-weights tag.
MODEL_NAME: str = os.getenv("MODEL_NAME", "ViT-L-14")
MODEL_PRETRAINED: str = os.getenv("MODEL_PRETRAINED", "openai")
# Default for normalizing embeddings to unit length (cosine-similarity ready).
NORMALIZE_EMBEDDINGS: bool = os.getenv("NORMALIZE_EMBEDDINGS", "true").lower() == "true"

# Qdrant configuration (optional)
# NOTE: only surfaced via the /info endpoint in this module; no Qdrant client
# is created here — presumably a separate component consumes these settings.
QDRANT_HOST: str = os.getenv("QDRANT_HOST", "qdrant")
QDRANT_PORT: int = int(os.getenv("QDRANT_PORT", "6333"))
QDRANT_ENABLED: bool = os.getenv("QDRANT_ENABLED", "false").lower() == "true"

# Global model cache (populated lazily by load_model(); treated as singletons)
_model = None       # OpenCLIP model instance
_preprocess = None  # image preprocessing transform returned by open_clip
_tokenizer = None   # tokenizer for MODEL_NAME
|
|
|
class TextEmbedRequest(BaseModel):
    """Payload accepted by POST /embed/text."""

    # The input string to encode with the CLIP text tower.
    text: str = Field(..., description="Text to embed")
    # When true, the returned vector is scaled to unit length.
    normalize: bool = Field(default=True, description="Normalize embedding to unit vector")
|
|
|
|
|
class ImageEmbedRequest(BaseModel):
    """Payload accepted by POST /embed/image (image fetched by URL)."""

    # HTTP(S) location of the image; the endpoint downloads it server-side.
    image_url: str = Field(..., description="URL of image to embed")
    # When true, the returned vector is scaled to unit length.
    normalize: bool = Field(default=True, description="Normalize embedding to unit vector")
|
|
|
|
|
class EmbedResponse(BaseModel):
    """Response with embedding vector.

    Returned by every /embed/* endpoint, for both text and image inputs.
    """

    embedding: List[float] = Field(..., description="Embedding vector")
    dimension: int = Field(..., description="Embedding dimension")
    model: str = Field(..., description="Model used for embedding")
    normalized: bool = Field(..., description="Whether embedding is normalized")
|
|
|
|
|
class HealthResponse(BaseModel):
    """Health check response."""

    status: str                     # "healthy" when the service is serving
    device: str                     # torch device in use (e.g. "cuda", "cpu")
    model: str                      # "<MODEL_NAME>/<MODEL_PRETRAINED>"
    cuda_available: bool            # torch.cuda.is_available() at request time
    gpu_name: Optional[str] = None  # name of GPU 0 when CUDA is available
|
|
|
|
|
class ModelInfo(BaseModel):
    """Model information response (GET /info)."""

    model_name: str         # OpenCLIP architecture name (MODEL_NAME)
    pretrained: str         # pretrained-weights tag (MODEL_PRETRAINED)
    device: str             # torch device in use
    embedding_dim: int      # output dimension probed from the text tower
    normalize_default: bool # NORMALIZE_EMBEDDINGS env default
    qdrant_enabled: bool    # QDRANT_ENABLED env flag (informational only here)
|
|
|
|
|
def load_model():
    """Load the OpenCLIP model, preprocessing transform, and tokenizer.

    Results are cached in the module globals (_model, _preprocess, _tokenizer)
    so subsequent calls are cheap; only the first call creates the model and
    runs a one-off dummy forward pass to log the embedding dimension.

    Returns:
        Tuple of (model, preprocess, tokenizer).

    Raises:
        Exception: re-raises any failure from OpenCLIP model creation,
            after logging it.
    """
    global _model, _preprocess, _tokenizer

    # Fast path: already loaded.
    if _model is not None:
        return _model, _preprocess, _tokenizer

    logger.info(f"Loading model {MODEL_NAME} with pretrained weights {MODEL_PRETRAINED}")
    logger.info(f"Device: {DEVICE}")

    try:
        # Load model and preprocessing
        model, _, preprocess = open_clip.create_model_and_transforms(
            MODEL_NAME,
            pretrained=MODEL_PRETRAINED,
            device=DEVICE
        )

        # Get tokenizer
        tokenizer = open_clip.get_tokenizer(MODEL_NAME)

        # Set to eval mode (disable training-only behavior for inference)
        model.eval()

        _model = model
        _preprocess = preprocess
        _tokenizer = tokenizer

        # Log model info: probe the text tower once to discover the output dim.
        with torch.no_grad():
            dummy_text = tokenizer(["test"])
            text_features = model.encode_text(dummy_text.to(DEVICE))
            embedding_dim = text_features.shape[1]

        logger.info(f"Model loaded successfully. Embedding dimension: {embedding_dim}")

        # BUGFIX: also match explicit device indices (e.g. DEVICE="cuda:0"),
        # which the previous exact equality check against "cuda" silently
        # skipped, suppressing the GPU info log line.
        if DEVICE.startswith("cuda"):
            gpu_name = torch.cuda.get_device_name(0)
            gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
            logger.info(f"GPU: {gpu_name}, Memory: {gpu_memory:.2f} GB")

        return _model, _preprocess, _tokenizer

    except Exception as e:
        logger.error(f"Failed to load model: {e}")
        raise
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager: eagerly load the model, log shutdown."""
    logger.info("Starting vision-encoder service...")

    # Fail fast: a service without a model cannot serve any request, so
    # abort startup rather than come up in a broken state.
    try:
        load_model()
    except Exception as e:
        logger.error(f"Failed to load model during startup: {e}")
        raise
    else:
        logger.info("Model loaded successfully during startup")

    yield

    # Cleanup
    logger.info("Shutting down vision-encoder service...")
|
|
|
# Create FastAPI app
# The lifespan hook loads the model before the first request is served.
app = FastAPI(
    title="Vision Encoder Service",
    description="Text and Image embedding service using OpenCLIP",
    version="1.0.0",
    lifespan=lifespan
)
|
|
|
|
|
|
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Report liveness plus the device/model configuration."""
    # Probe CUDA once and derive both fields from the same answer.
    cuda_ok = torch.cuda.is_available()
    return HealthResponse(
        status="healthy",
        device=DEVICE,
        model=f"{MODEL_NAME}/{MODEL_PRETRAINED}",
        cuda_available=cuda_ok,
        gpu_name=torch.cuda.get_device_name(0) if cuda_ok else None,
    )
|
|
|
|
|
|
@app.get("/info", response_model=ModelInfo)
async def model_info():
    """Get model information.

    Runs a throwaway text forward pass to report the embedding dimension.
    """
    # CONSISTENCY FIX: take the tokenizer from load_model()'s return value
    # instead of reaching into the private module global _tokenizer. It is
    # the same object, but this matches how /embed/text obtains it and no
    # longer depends on the global's name.
    model, _, tokenizer = load_model()

    # Get embedding dimension via a dummy encode (no gradients needed).
    with torch.no_grad():
        dummy_text = tokenizer(["test"])
        text_features = model.encode_text(dummy_text.to(DEVICE))
        embedding_dim = text_features.shape[1]

    return ModelInfo(
        model_name=MODEL_NAME,
        pretrained=MODEL_PRETRAINED,
        device=DEVICE,
        embedding_dim=embedding_dim,
        normalize_default=NORMALIZE_EMBEDDINGS,
        qdrant_enabled=QDRANT_ENABLED
    )
|
|
|
|
|
|
@app.post("/embed/text", response_model=EmbedResponse)
async def embed_text(request: TextEmbedRequest):
    """Generate text embedding."""
    try:
        model, _, tokenizer = load_model()

        # Tokenize and encode on the configured device; inference only,
        # so gradients are disabled.
        tokens = tokenizer([request.text]).to(DEVICE)
        with torch.no_grad():
            features = model.encode_text(tokens)

        # Optionally scale to unit length for cosine-similarity search.
        if request.normalize:
            features = features / features.norm(dim=-1, keepdim=True)

        # First (and only) row of the batch, as a plain Python list.
        vector = features.cpu().numpy()[0].tolist()

        return EmbedResponse(
            embedding=vector,
            dimension=len(vector),
            model=f"{MODEL_NAME}/{MODEL_PRETRAINED}",
            normalized=request.normalize
        )

    except Exception as e:
        logger.error(f"Error generating text embedding: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to generate text embedding: {str(e)}")
|
|
|
|
|
|
@app.post("/embed/image", response_model=EmbedResponse)
async def embed_image_from_url(request: ImageEmbedRequest):
    """Generate image embedding from URL."""
    try:
        model, preprocess, _ = load_model()

        # Fetch the image bytes (30 s cap on the whole request).
        # NOTE(review): the URL is fetched as-is; if this service is reachable
        # by untrusted callers, consider an allow-list to prevent SSRF.
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(request.image_url)
            response.raise_for_status()
            image_bytes = response.content

        from io import BytesIO

        # Decode, force 3-channel RGB, and add a batch dimension of 1.
        pil_image = Image.open(BytesIO(image_bytes)).convert("RGB")
        batch = preprocess(pil_image).unsqueeze(0).to(DEVICE)

        # Inference only — no gradients.
        with torch.no_grad():
            features = model.encode_image(batch)

        # Optionally scale to unit length for cosine-similarity search.
        if request.normalize:
            features = features / features.norm(dim=-1, keepdim=True)

        vector = features.cpu().numpy()[0].tolist()

        return EmbedResponse(
            embedding=vector,
            dimension=len(vector),
            model=f"{MODEL_NAME}/{MODEL_PRETRAINED}",
            normalized=request.normalize
        )

    except httpx.HTTPError as e:
        logger.error(f"Failed to download image from URL: {e}")
        raise HTTPException(status_code=400, detail=f"Failed to download image: {str(e)}")
    except Exception as e:
        logger.error(f"Error generating image embedding: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to generate image embedding: {str(e)}")
|
|
|
|
|
@app.post("/embed/image/upload", response_model=EmbedResponse)
async def embed_image_from_upload(
    file: UploadFile = File(...),
    normalize: bool = True
):
    """Generate image embedding from uploaded file."""
    try:
        model, preprocess, _ = load_model()

        # Pull the whole upload into memory.
        payload = await file.read()

        from io import BytesIO

        # Decode, force 3-channel RGB, and add a batch dimension of 1.
        pil_image = Image.open(BytesIO(payload)).convert("RGB")
        batch = preprocess(pil_image).unsqueeze(0).to(DEVICE)

        # Inference only — no gradients.
        with torch.no_grad():
            features = model.encode_image(batch)

        # Optionally scale to unit length for cosine-similarity search.
        if normalize:
            features = features / features.norm(dim=-1, keepdim=True)

        vector = features.cpu().numpy()[0].tolist()

        return EmbedResponse(
            embedding=vector,
            dimension=len(vector),
            model=f"{MODEL_NAME}/{MODEL_PRETRAINED}",
            normalized=normalize
        )

    except Exception as e:
        logger.error(f"Error generating image embedding from upload: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to generate image embedding: {str(e)}")
|
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Bind address and port are overridable via environment for container use.
    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", "8001"))

    logger.info(f"Starting server on {host}:{port}")
    uvicorn.run(app, host=host, port=port, log_level="info")
|