""" MinIO client for uploading assets (logos, banners, avatars) to S3-compatible storage. """ import os from datetime import datetime from uuid import uuid4 from typing import BinaryIO, Optional from minio import Minio from minio.error import S3Error import logging logger = logging.getLogger(__name__) # MinIO configuration from environment MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://minio:9000") MINIO_ACCESS_KEY = os.getenv("MINIO_ROOT_USER", "assets-admin") MINIO_SECRET_KEY = os.getenv("MINIO_ROOT_PASSWORD", "") ASSETS_BUCKET = os.getenv("ASSETS_BUCKET", "daarion-assets") ASSETS_PUBLIC_BASE_URL = os.getenv( "ASSETS_PUBLIC_BASE_URL", "https://assets.daarion.space/daarion-assets" ) # Initialize MinIO client _minio_client: Optional[Minio] = None def get_minio_client() -> Minio: """Get or create MinIO client instance.""" global _minio_client if _minio_client is None: # Remove http:// or https:// from endpoint for MinIO client endpoint = MINIO_ENDPOINT.replace("http://", "").replace("https://", "") secure = MINIO_ENDPOINT.startswith("https://") _minio_client = Minio( endpoint, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=secure, ) return _minio_client def ensure_bucket(): """Ensure the assets bucket exists, create if it doesn't.""" try: client = get_minio_client() if not client.bucket_exists(ASSETS_BUCKET): client.make_bucket(ASSETS_BUCKET) logger.info(f"Created bucket: {ASSETS_BUCKET}") else: logger.debug(f"Bucket {ASSETS_BUCKET} already exists") except S3Error as e: logger.error(f"Failed to ensure bucket {ASSETS_BUCKET}: {e}") raise def upload_asset( file_obj: BinaryIO, content_type: str, prefix: str, filename: Optional[str] = None ) -> str: """ Upload a file to MinIO and return the public URL. Args: file_obj: File-like object to upload content_type: MIME type (e.g., 'image/png', 'image/jpeg') prefix: Path prefix (e.g., 'microdao/logo', 'agents/avatar') filename: Optional original filename (for extension detection) Returns: Full public HTTPS URL to the uploaded asset """ try: ensure_bucket() client = get_minio_client() # Generate object key: prefix/YYYY/MM/DD/uuid.ext date_path = datetime.utcnow().strftime("%Y/%m/%d") unique_id = uuid4().hex # Try to extract extension from filename or content_type ext = "" if filename: ext = os.path.splitext(filename)[1] or "" elif content_type: # Map common content types to extensions ext_map = { "image/png": ".png", "image/jpeg": ".jpg", "image/jpg": ".jpg", "image/webp": ".webp", "image/gif": ".gif", } ext = ext_map.get(content_type, "") object_key = f"{prefix}/{date_path}/{unique_id}{ext}" # Get file size (seek to end, get position, reset) file_obj.seek(0, 2) # Seek to end file_size = file_obj.tell() file_obj.seek(0) # Reset to beginning # Upload to MinIO client.put_object( ASSETS_BUCKET, object_key, file_obj, length=file_size, content_type=content_type, ) # Return public URL public_url = f"{ASSETS_PUBLIC_BASE_URL}/{object_key}" logger.info(f"Uploaded asset to {public_url}") return public_url except S3Error as e: logger.error(f"Failed to upload asset: {e}") raise except Exception as e: logger.error(f"Unexpected error uploading asset: {e}") raise def delete_asset(object_key: str) -> bool: """ Delete an asset from MinIO. Args: object_key: Full object key or URL (will extract key from URL if needed) Returns: True if deleted, False otherwise """ try: client = get_minio_client() # If it's a full URL, extract the object key if object_key.startswith("http://") or object_key.startswith("https://"): # Extract key from URL like: https://assets.daarion.space/daarion-assets/microdao/logo/... parts = object_key.split(f"{ASSETS_PUBLIC_BASE_URL}/") if len(parts) < 2: logger.warning(f"Could not extract object key from URL: {object_key}") return False object_key = parts[1] client.remove_object(ASSETS_BUCKET, object_key) logger.info(f"Deleted asset: {object_key}") return True except S3Error as e: logger.error(f"Failed to delete asset {object_key}: {e}") return False except Exception as e: logger.error(f"Unexpected error deleting asset: {e}") return False