- matrix-gateway: POST /internal/matrix/presence/online endpoint
- usePresenceHeartbeat hook with activity tracking
- Auto-away after 5 minutes of inactivity
- Offline on page close or visibility change
- Integrated into the MatrixChatRoom component
438 lines
17 KiB
Python
438 lines
17 KiB
Python
"""
|
|
Swapper Service - Dynamic Model Loading Service
|
|
Manages loading/unloading LLM models on-demand to optimize memory usage.
|
|
Supports single-active model mode (one model loaded at a time).
|
|
"""
|
|
|
|
import os
|
|
import asyncio
|
|
import logging
|
|
from typing import Optional, Dict, List, Any
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
|
|
from fastapi import FastAPI, HTTPException, BackgroundTasks
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
import httpx
|
|
import yaml
|
|
|
|
# Module-level logger; handlers and levels are left to the hosting application.
logger = logging.getLogger(__name__)

# ========== Configuration ==========
# Every setting is read from the environment once, at import time.

# Base URL of the Ollama HTTP API.
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")

# YAML file describing the models managed by the swapper.
SWAPPER_CONFIG_PATH = os.getenv("SWAPPER_CONFIG_PATH", "./config/swapper_config.yaml")

# Either "single-active" or "multi-active".
SWAPPER_MODE = os.getenv("SWAPPER_MODE", "single-active")

# NOTE(review): not referenced anywhere in the visible code - confirm it is
# still needed.
MAX_CONCURRENT_MODELS = int(os.getenv("MAX_CONCURRENT_MODELS", "1"))

# Timeout, in seconds, applied to the Ollama request that loads a model.
MODEL_SWAP_TIMEOUT = int(os.getenv("MODEL_SWAP_TIMEOUT", "30"))
|
|
|
|
# ========== Models ==========
|
|
|
|
class ModelStatus(str, Enum):
    """Lifecycle state of a model as tracked by the swapper.

    The ``str`` mixin makes every member compare equal to its plain string
    value.
    """

    # The load request succeeded.
    LOADED = "loaded"
    # A load request is in flight.
    LOADING = "loading"
    # Initial state, and the state after a completed unload.
    UNLOADED = "unloaded"
    # An unload is in progress.
    UNLOADING = "unloading"
    # The last load attempt failed.
    ERROR = "error"
|
|
|
|
class ModelInfo(BaseModel):
    """Registry entry describing one model known to the swapper."""

    # Key under which the model is registered in the swapper.
    name: str
    # Model identifier sent to Ollama.
    ollama_name: str
    # Model category: llm, code, vision, math.
    type: str
    # Model size in gigabytes.
    size_gb: float
    # Priority label: high, medium, low.
    priority: str
    # Current lifecycle state.
    status: ModelStatus
    # When the model was last loaded; None if it never was.
    loaded_at: Optional[datetime] = None
    # When the model was last unloaded; reset to None on load.
    unloaded_at: Optional[datetime] = None
    # Accumulated uptime in seconds, refreshed when the model is unloaded.
    total_uptime_seconds: float = 0.0
    # NOTE(review): never incremented in the visible code - confirm who
    # maintains this counter.
    request_count: int = 0
|
|
|
|
class SwapperStatus(BaseModel):
    """Snapshot of the swapper service returned by the status endpoint."""

    # Overall service health label.
    status: str
    # Name of the currently active model, if any.
    active_model: Optional[str] = None
    # Names of every registered model.
    available_models: List[str]
    # Names of the models whose status is "loaded".
    loaded_models: List[str]
    # Swapper mode (value of SWAPPER_MODE).
    mode: str
    # Number of registered models.
    total_models: int
|
|
|
|
class ModelMetrics(BaseModel):
    """Usage metrics reported for a single model."""

    # Name under which the model is registered.
    model_name: str
    # Lifecycle state as a plain string.
    status: str
    # When the model was last loaded; None if it never was.
    loaded_at: Optional[datetime] = None
    # Total uptime expressed in hours (total_uptime_seconds / 3600).
    uptime_hours: float
    # Request counter copied from the model's registry entry.
    request_count: int
    # Total uptime in seconds, including any session still running.
    total_uptime_seconds: float
|
|
|
|
# ========== Swapper Service ==========
|
|
|
|
class SwapperService:
    """Registry of known models plus the logic that swaps them in and out of Ollama.

    Bookkeeping is spread over four attributes:

    * ``models`` - every known model, keyed by its swapper name.
    * ``active_model`` - name of the most recently loaded model, if any.
    * ``model_uptime`` - seconds of uptime already accounted for, per model.
    * ``model_load_times`` - start of the uptime interval that has not been
      folded into ``model_uptime`` yet; an entry exists only while the model
      is considered loaded.
    """

    def __init__(self):
        self.models: Dict[str, ModelInfo] = {}
        self.active_model: Optional[str] = None
        # Serialises load/unload so that two swaps never interleave.
        self.loading_lock = asyncio.Lock()
        self.http_client = httpx.AsyncClient(timeout=300.0)
        self.model_uptime: Dict[str, float] = {}  # Track uptime per model
        self.model_load_times: Dict[str, datetime] = {}  # Track when model was loaded

    async def initialize(self):
        """Populate the model registry and optionally load the default model.

        Models come from the YAML file at ``SWAPPER_CONFIG_PATH`` when it
        exists, otherwise from Ollama's own model list.  Any failure is
        logged and swallowed so that a bad configuration does not abort
        application startup.
        """
        config: Dict[str, Any] = {}
        try:
            logger.info("🔧 Initializing Swapper Service...")
            logger.info(f"🔧 Config path: {SWAPPER_CONFIG_PATH}")
            config_exists = os.path.exists(SWAPPER_CONFIG_PATH)
            logger.info(f"🔧 Config exists: {config_exists}")

            if config_exists:
                # Explicit encoding: the default depends on the platform.
                with open(SWAPPER_CONFIG_PATH, 'r', encoding='utf-8') as f:
                    parsed = yaml.safe_load(f)
                # An empty file parses to None; treat anything that is not a
                # mapping as "no configuration" instead of crashing on .get().
                if isinstance(parsed, dict):
                    config = parsed
                else:
                    logger.warning(
                        f"⚠️ Config file {SWAPPER_CONFIG_PATH} is empty or not a mapping, ignoring it"
                    )
                self._register_configured_models(config.get('models') or {})
                logger.info(f"✅ Loaded {len(self.models)} models into Swapper")
            else:
                logger.warning(f"⚠️ Config file not found: {SWAPPER_CONFIG_PATH}, using defaults")
                # Load default models from Ollama
                await self._load_models_from_ollama()

            logger.info(f"✅ Swapper Service initialized with {len(self.models)} models")
            logger.info(f"✅ Model names: {list(self.models.keys())}")

            # Load the default model if one is specified in the configuration.
            default_model = (config.get('swapper') or {}).get('default_model')
            if not default_model:
                return
            if default_model not in self.models:
                logger.warning(f"⚠️ Default model '{default_model}' not found in models list")
                return

            logger.info(f"🔄 Loading default model: {default_model}")
            if await self.load_model(default_model):
                logger.info(f"✅ Default model loaded: {default_model}")
            else:
                logger.warning(f"⚠️ Failed to load default model: {default_model}")
        except Exception as e:
            # exc_info=True already attaches the traceback to the log record.
            logger.error(f"❌ Error initializing Swapper Service: {e}", exc_info=True)

    def _register_configured_models(self, models_config: Dict[str, Any]) -> None:
        """Create an UNLOADED registry entry for every model in the YAML ``models`` mapping."""
        logger.info(f"🔧 Found {len(models_config)} models in config")
        prefix = 'ollama:'
        for model_key, model_config in models_config.items():
            model_config = model_config or {}
            path = model_config.get('path') or ''
            # Strip the marker only where it is a prefix, never inside the name.
            ollama_name = path[len(prefix):] if path.startswith(prefix) else path
            logger.info(f"🔧 Adding model: {model_key} -> {ollama_name}")
            self.models[model_key] = ModelInfo(
                name=model_key,
                ollama_name=ollama_name,
                type=model_config.get('type', 'llm'),
                size_gb=model_config.get('size_gb', 0),
                priority=model_config.get('priority', 'medium'),
                status=ModelStatus.UNLOADED,
            )
            self.model_uptime[model_key] = 0.0

    async def _load_models_from_ollama(self):
        """Register the models listed by Ollama's ``/api/tags`` endpoint.

        Models are keyed by the part of the Ollama name before the first
        ``:``, so when several tags share a base name only the first one
        listed is registered.  Errors are logged, never raised.
        """
        try:
            response = await self.http_client.get(f"{OLLAMA_BASE_URL}/api/tags")
            if response.status_code != 200:
                logger.warning(
                    f"⚠️ Ollama returned HTTP {response.status_code} for /api/tags, no models loaded"
                )
                return

            for model in response.json().get('models', []):
                model_name = model.get('name', '')
                if not model_name:
                    # An entry without a name cannot be addressed later on.
                    continue
                # Extract base name (remove :latest, :7b, etc.)
                base_name = model_name.split(':')[0]
                if base_name in self.models:
                    continue
                self.models[base_name] = ModelInfo(
                    name=base_name,
                    ollama_name=model_name,
                    type='llm',  # Default type
                    size_gb=model.get('size', 0) / (1024**3),  # Convert bytes to GB
                    priority='medium',
                    status=ModelStatus.UNLOADED,
                )
                self.model_uptime[base_name] = 0.0

            logger.info(f"✅ Loaded {len(self.models)} models from Ollama")
        except Exception as e:
            logger.error(f"❌ Error loading models from Ollama: {e}")

    async def load_model(self, model_name: str) -> bool:
        """Load a model (unload current if in single-active mode).

        Returns True when Ollama answered the load request with HTTP 200,
        False when the model is unknown or the request failed.
        """
        async with self.loading_lock:
            model_info = self.models.get(model_name)
            if model_info is None:
                logger.error(f"❌ Model not found: {model_name}")
                return False

            # A reload of a model that is already up must not restart its
            # uptime interval, otherwise the running session is lost.
            already_loaded = (
                model_info.status == ModelStatus.LOADED
                and model_name in self.model_load_times
            )

            try:
                # If single-active mode and another model is loaded, unload it first
                if (
                    SWAPPER_MODE == "single-active"
                    and self.active_model
                    and self.active_model != model_name
                ):
                    await self._unload_model_internal(self.active_model)

                logger.info(f"🔄 Loading model: {model_name}")
                model_info.status = ModelStatus.LOADING

                # A tiny non-streaming generation makes Ollama bring the
                # model into memory before it answers.
                response = await self.http_client.post(
                    f"{OLLAMA_BASE_URL}/api/generate",
                    json={
                        "model": model_info.ollama_name,
                        "prompt": "test",
                        "stream": False,
                    },
                    timeout=MODEL_SWAP_TIMEOUT,
                )

                if response.status_code == 200:
                    model_info.status = ModelStatus.LOADED
                    if not already_loaded:
                        # One timestamp so both records agree exactly.
                        now = datetime.now()
                        model_info.loaded_at = now
                        self.model_load_times[model_name] = now
                    model_info.unloaded_at = None
                    self.active_model = model_name
                    logger.info(f"✅ Model loaded: {model_name}")
                    return True

                logger.error(
                    f"❌ Failed to load model: {model_name} (HTTP {response.status_code})"
                )
            except Exception as e:
                logger.error(f"❌ Error loading model {model_name}: {e}", exc_info=True)

            self._mark_load_failed(model_name)
            return False

    def _mark_load_failed(self, model_name: str) -> None:
        """Put a model into ERROR and stop treating it as running."""
        self._close_uptime_session(model_name)
        self.models[model_name].status = ModelStatus.ERROR
        if self.active_model == model_name:
            self.active_model = None

    def _close_uptime_session(self, model_name: str) -> None:
        """Fold the running uptime interval of a model into its total, if one is open."""
        started_at = self.model_load_times.pop(model_name, None)
        if started_at is None:
            return
        elapsed = (datetime.now() - started_at).total_seconds()
        total = self.model_uptime.get(model_name, 0.0) + elapsed
        self.model_uptime[model_name] = total
        model_info = self.models.get(model_name)
        if model_info is not None:
            model_info.total_uptime_seconds = total

    async def _evict_from_ollama(self, ollama_name: str) -> None:
        """Ask Ollama to release a model from memory (best effort, never raises on HTTP errors)."""
        try:
            # A generate request with keep_alive=0 and no prompt unloads the model.
            response = await self.http_client.post(
                f"{OLLAMA_BASE_URL}/api/generate",
                json={"model": ollama_name, "keep_alive": 0},
                timeout=MODEL_SWAP_TIMEOUT,
            )
            if response.status_code != 200:
                logger.warning(
                    f"⚠️ Ollama returned HTTP {response.status_code} while unloading {ollama_name}"
                )
        except httpx.HTTPError as e:
            logger.warning(f"⚠️ Could not ask Ollama to unload {ollama_name}: {e}")

    async def _unload_model_internal(self, model_name: str) -> bool:
        """Internal method to unload a model; the caller must hold ``loading_lock``.

        Returns False for an unknown model or an unexpected error, True
        otherwise.  Unloading a model that is not loaded is a no-op that
        still counts as success.
        """
        model_info = self.models.get(model_name)
        if model_info is None:
            return False

        try:
            if model_info.status != ModelStatus.LOADED:
                return True

            logger.info(f"🔄 Unloading model: {model_name}")
            model_info.status = ModelStatus.UNLOADING

            await self._evict_from_ollama(model_info.ollama_name)

            # Everything below runs without awaiting, so the bookkeeping is
            # updated in one step from the event loop's point of view.
            self._close_uptime_session(model_name)
            model_info.status = ModelStatus.UNLOADED
            model_info.unloaded_at = datetime.now()
            if self.active_model == model_name:
                self.active_model = None

            logger.info(f"✅ Model unloaded: {model_name}")
            return True
        except Exception as e:
            logger.error(f"❌ Error unloading model {model_name}: {e}")
            return False

    async def unload_model(self, model_name: str) -> bool:
        """Unload a model while holding the load/unload lock."""
        async with self.loading_lock:
            return await self._unload_model_internal(model_name)

    async def get_status(self) -> SwapperStatus:
        """Get Swapper service status.

        As a side effect, the uptime of the active model accumulated so far
        is folded into ``model_uptime`` and its timer restarts from now.
        """
        active = self.active_model
        if active and active in self.model_load_times:
            # Read the clock once so that no time is lost between the fold
            # and the timer restart.
            now = datetime.now()
            elapsed = (now - self.model_load_times[active]).total_seconds()
            self.model_uptime[active] = self.model_uptime.get(active, 0.0) + elapsed
            self.model_load_times[active] = now

        loaded_models = [
            name for name, model in self.models.items()
            if model.status == ModelStatus.LOADED
        ]

        return SwapperStatus(
            status="healthy",
            active_model=self.active_model,
            available_models=list(self.models.keys()),
            loaded_models=loaded_models,
            mode=SWAPPER_MODE,
            total_models=len(self.models),
        )

    async def get_model_metrics(self, model_name: Optional[str] = None) -> List[ModelMetrics]:
        """Get metrics for one model, or for every model when no name is given.

        Unknown names are skipped, so the result may be empty.  Uptime
        includes the interval that is still running for a loaded model.
        """
        names = [model_name] if model_name else list(self.models)
        now = datetime.now()
        metrics: List[ModelMetrics] = []

        for name in names:
            model_info = self.models.get(name)
            if model_info is None:
                continue

            uptime_seconds = self.model_uptime.get(name, 0.0)
            started_at = self.model_load_times.get(name)
            if started_at is not None:
                uptime_seconds += (now - started_at).total_seconds()

            metrics.append(ModelMetrics(
                model_name=name,
                status=model_info.status.value,
                loaded_at=model_info.loaded_at,
                uptime_hours=uptime_seconds / 3600.0,
                request_count=model_info.request_count,
                total_uptime_seconds=uptime_seconds,
            ))

        return metrics

    async def close(self):
        """Close HTTP client"""
        await self.http_client.aclose()
|
|
|
|
# ========== FastAPI App ==========
|
|
|
|
app = FastAPI(
    title="Swapper Service",
    description="Dynamic model loading service for Node #2",
    version="1.0.0",
)

# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive - restrict allow_origins if this service is reachable from
# untrusted networks.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global Swapper instance. It is created before the cabinet router is imported
# so that the router module can import `swapper` from this module.
swapper = SwapperService()

# Include cabinet API router (import after swapper is created)
try:
    from app.cabinet_api import router as cabinet_router
except ImportError as exc:
    # The cause is logged because an ImportError can also come from a
    # dependency of cabinet_api, not only from the module being absent.
    logger.warning(f"⚠️ cabinet_api could not be imported, skipping cabinet router: {exc}")
else:
    app.include_router(cabinet_router)
    logger.info("✅ Cabinet API router included")
|
|
|
|
@app.on_event("startup")
async def startup():
    """Initialize Swapper on startup: build the model registry and load the default model, if any."""
    await swapper.initialize()
|
|
|
|
@app.on_event("shutdown")
async def shutdown():
    """Close Swapper on shutdown, releasing its HTTP client."""
    await swapper.close()
|
|
|
|
# ========== API Endpoints ==========
|
|
|
|
@app.get("/health")
async def health():
    """Health check endpoint.

    Returns the service status, the service name, the active model (or
    None) and the swapper mode.
    """
    status = await swapper.get_status()
    return {
        # Taken from the service so both status endpoints always agree.
        "status": status.status,
        "service": "swapper-service",
        "active_model": status.active_model,
        "mode": status.mode,
    }
|
|
|
|
@app.get("/status", response_model=SwapperStatus)
async def get_status():
    """Get Swapper service status as a ``SwapperStatus`` document."""
    return await swapper.get_status()
|
|
|
|
@app.get("/models")
async def list_models():
    """List all available models with their static attributes and current status."""
    models = [
        {
            "name": model.name,
            "ollama_name": model.ollama_name,
            "type": model.type,
            "size_gb": model.size_gb,
            "priority": model.priority,
            "status": model.status.value,
        }
        for model in swapper.models.values()
    ]
    return {"models": models}
|
|
|
|
@app.get("/models/{model_name}")
async def get_model_info(model_name: str):
    """Get information about a specific model.

    Raises:
        HTTPException: 404 when the model is not registered.
    """
    model_info = swapper.models.get(model_name)
    if model_info is None:
        raise HTTPException(status_code=404, detail=f"Model not found: {model_name}")

    # The metrics include the uptime interval that is still running for a
    # loaded model; the raw counter only covers intervals already closed.
    metrics = await swapper.get_model_metrics(model_name)
    if metrics:
        total_uptime_seconds = metrics[0].total_uptime_seconds
    else:
        total_uptime_seconds = swapper.model_uptime.get(model_name, 0.0)

    loaded_at = model_info.loaded_at
    unloaded_at = model_info.unloaded_at
    return {
        "name": model_info.name,
        "ollama_name": model_info.ollama_name,
        "type": model_info.type,
        "size_gb": model_info.size_gb,
        "priority": model_info.priority,
        "status": model_info.status.value,
        "loaded_at": loaded_at.isoformat() if loaded_at else None,
        "unloaded_at": unloaded_at.isoformat() if unloaded_at else None,
        "total_uptime_seconds": total_uptime_seconds,
    }
|
|
|
|
@app.post("/models/{model_name}/load")
async def load_model_endpoint(model_name: str):
    """Load a model.

    Raises:
        HTTPException: 404 when the model is not registered, 500 when the
            load attempt fails.
    """
    # An unknown name is a client error, not a server failure.
    if model_name not in swapper.models:
        raise HTTPException(status_code=404, detail=f"Model not found: {model_name}")

    if await swapper.load_model(model_name):
        return {"status": "success", "model": model_name, "message": f"Model {model_name} loaded"}
    raise HTTPException(status_code=500, detail=f"Failed to load model: {model_name}")
|
|
|
|
@app.post("/models/{model_name}/unload")
async def unload_model_endpoint(model_name: str):
    """Unload a model.

    Raises:
        HTTPException: 404 when the model is not registered, 500 when the
            unload attempt fails.
    """
    # An unknown name is a client error, not a server failure.
    if model_name not in swapper.models:
        raise HTTPException(status_code=404, detail=f"Model not found: {model_name}")

    if await swapper.unload_model(model_name):
        return {"status": "success", "model": model_name, "message": f"Model {model_name} unloaded"}
    raise HTTPException(status_code=500, detail=f"Failed to unload model: {model_name}")
|
|
|
|
@app.get("/metrics")
async def get_metrics(model_name: Optional[str] = None):
    """Get metrics for model(s).

    With the optional ``model_name`` query parameter only that model is
    reported; an unknown name yields an empty list.
    """
    metrics = await swapper.get_model_metrics(model_name)
    return {"metrics": [metric.dict() for metric in metrics]}
|
|
|
|
@app.get("/metrics/{model_name}")
async def get_model_metrics(model_name: str):
    """Get metrics for a specific model.

    Raises:
        HTTPException: 404 when the model is not registered.
    """
    metrics = await swapper.get_model_metrics(model_name)
    # The service skips unknown names, so an empty result means "not found".
    if not metrics:
        raise HTTPException(status_code=404, detail=f"Model not found: {model_name}")
    return metrics[0].dict()
|
|
|
|
if __name__ == "__main__":
    # Imported here so that uvicorn is only required when the module is run
    # as a script.
    import uvicorn

    # Serve on all interfaces, port 8890.
    uvicorn.run(app, host="0.0.0.0", port=8890)
|
|
|