feat: implement PDF/image preprocessing, post-processing, and dots.ocr integration prep

G.2.3 - PDF/Image Support:
- Add preprocessing.py with PDF→images conversion (pdf2image)
- Add image loading and normalization
- Add file type detection and validation
- Support for PDF, PNG, JPEG, WebP, TIFF

G.2.4 - Pre/Post-processing:
- Add postprocessing.py with structured output builders
- build_chunks() - semantic chunks for RAG
- build_qa_pairs() - Q&A extraction
- build_markdown() - Markdown conversion
- Text normalization and chunking logic

G.1.3 - dots.ocr Integration Prep:
- Update model_loader.py with proper error handling
- Add USE_DUMMY_PARSER and ALLOW_DUMMY_FALLBACK flags
- Update inference.py to work with images list
- Add parse_document_from_images() function
- Ready for actual model integration

Configuration:
- Add PDF_DPI, IMAGE_MAX_SIZE, PAGE_RANGE settings
- Add parser mode flags (USE_DUMMY_PARSER, ALLOW_DUMMY_FALLBACK)

API Updates:
- Update endpoints to use new preprocessing pipeline
- Integrate post-processing for all output modes
- Remove temp file handling (work directly with bytes)
This commit is contained in:
Apple
2025-11-15 13:19:07 -08:00
parent 0f6cfe046f
commit 4befecc425
6 changed files with 762 additions and 122 deletions

View File

@@ -14,7 +14,13 @@ from app.schemas import (
ParseRequest, ParseResponse, ParsedDocument, ParsedChunk, QAPair, ChunksResponse
)
from app.core.config import settings
from app.runtime.inference import parse_document, dummy_parse_document
from app.runtime.inference import parse_document_from_images
from app.runtime.preprocessing import (
convert_pdf_to_images, load_image, detect_file_type, validate_file_size
)
from app.runtime.postprocessing import (
build_chunks, build_qa_pairs, build_markdown
)
logger = logging.getLogger(__name__)
@@ -50,31 +56,29 @@ async def parse_document_endpoint(
detail="Either 'file' or 'doc_url' must be provided"
)
# Determine document type
# Process file
if file:
doc_type = "image" # Will be determined from file extension
file_ext = Path(file.filename or "").suffix.lower()
if file_ext == ".pdf":
doc_type = "pdf"
# Read file content
content = await file.read()
# Check file size
max_size = settings.MAX_FILE_SIZE_MB * 1024 * 1024
if len(content) > max_size:
raise HTTPException(
status_code=413,
detail=f"File size exceeds maximum {settings.MAX_FILE_SIZE_MB}MB"
)
# Validate file size
try:
validate_file_size(content)
except ValueError as e:
raise HTTPException(status_code=413, detail=str(e))
# Save to temp file
temp_dir = Path(settings.TEMP_DIR)
temp_dir.mkdir(exist_ok=True, parents=True)
temp_file = temp_dir / f"{uuid.uuid4()}{file_ext}"
temp_file.write_bytes(content)
# Detect file type
try:
doc_type = detect_file_type(content, file.filename)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
input_path = str(temp_file)
# Convert to images
if doc_type == "pdf":
images = convert_pdf_to_images(content)
else:
image = load_image(content)
images = [image]
else:
# TODO: Download from doc_url
@@ -83,51 +87,31 @@ async def parse_document_endpoint(
detail="doc_url download not yet implemented"
)
# Parse document
logger.info(f"Parsing document: {input_path}, mode: {output_mode}")
# Parse document from images
logger.info(f"Parsing document: {len(images)} page(s), mode: {output_mode}")
# TODO: Replace with real parse_document when model is integrated
parsed_doc = dummy_parse_document(
input_path=input_path,
parsed_doc = parse_document_from_images(
images=images,
output_mode=output_mode,
doc_id=doc_id or str(uuid.uuid4()),
doc_type=doc_type
)
# Build response based on output_mode
response_data = {"metadata": {}}
response_data = {"metadata": {
"doc_id": parsed_doc.doc_id,
"doc_type": parsed_doc.doc_type,
"page_count": len(parsed_doc.pages)
}}
if output_mode == "raw_json":
response_data["document"] = parsed_doc
elif output_mode == "markdown":
# TODO: Convert to markdown
response_data["markdown"] = "# Document\n\n" + "\n\n".join(
block.text for page in parsed_doc.pages for block in page.blocks
)
response_data["markdown"] = build_markdown(parsed_doc)
elif output_mode == "qa_pairs":
# TODO: Extract QA pairs
response_data["qa_pairs"] = []
response_data["qa_pairs"] = build_qa_pairs(parsed_doc)
elif output_mode == "chunks":
# Convert blocks to chunks
chunks = []
for page in parsed_doc.pages:
for block in page.blocks:
chunks.append(ParsedChunk(
text=block.text,
page=page.page_num,
bbox=block.bbox,
section=block.type,
metadata={
"dao_id": dao_id,
"doc_id": parsed_doc.doc_id,
"block_type": block.type
}
))
response_data["chunks"] = chunks
# Cleanup temp file
if file and temp_file.exists():
temp_file.unlink()
response_data["chunks"] = build_chunks(parsed_doc, dao_id=dao_id)
return ParseResponse(**response_data)

View File

@@ -3,7 +3,7 @@ Configuration for PARSER Service
"""
import os
from typing import Literal
from typing import Literal, Optional
from pydantic_settings import BaseSettings
@@ -25,6 +25,17 @@ class Settings(BaseSettings):
MAX_FILE_SIZE_MB: int = int(os.getenv("MAX_FILE_SIZE_MB", "50"))
TEMP_DIR: str = os.getenv("TEMP_DIR", "/tmp/parser")
# PDF processing
PDF_DPI: int = int(os.getenv("PDF_DPI", "200"))
PAGE_RANGE: Optional[str] = os.getenv("PAGE_RANGE", None) # e.g., "1-20" for pages 1-20
# Image processing
IMAGE_MAX_SIZE: int = int(os.getenv("IMAGE_MAX_SIZE", "2048")) # Max size for longest side
# Parser mode
USE_DUMMY_PARSER: bool = os.getenv("USE_DUMMY_PARSER", "false").lower() == "true"
ALLOW_DUMMY_FALLBACK: bool = os.getenv("ALLOW_DUMMY_FALLBACK", "true").lower() == "true"
# Runtime
RUNTIME_TYPE: Literal["local", "remote"] = os.getenv("RUNTIME_TYPE", "local")
RUNTIME_URL: str = os.getenv("RUNTIME_URL", "http://parser-runtime:11435")

View File

@@ -3,16 +3,110 @@ Inference functions for document parsing
"""
import logging
from typing import Literal, Optional
from typing import Literal, Optional, List
from pathlib import Path
from PIL import Image
from app.schemas import ParsedDocument, ParsedPage, ParsedBlock, BBox
from app.runtime.model_loader import get_model
from app.runtime.preprocessing import (
convert_pdf_to_images, load_image, prepare_images_for_model
)
from app.runtime.postprocessing import build_parsed_document
from app.core.config import settings
logger = logging.getLogger(__name__)
def parse_document_from_images(
    images: List[Image.Image],
    output_mode: Literal["raw_json", "markdown", "qa_pairs", "chunks"] = "raw_json",
    doc_id: Optional[str] = None,
    doc_type: Literal["pdf", "image"] = "image"
) -> ParsedDocument:
    """
    Parse document from list of images using dots.ocr model.

    Falls back to the dummy parser when USE_DUMMY_PARSER is set, or when the
    model is not loaded and ALLOW_DUMMY_FALLBACK permits it.

    Args:
        images: List of PIL Images (pages)
        output_mode: Output format mode.
            NOTE(review): accepted for interface parity but never read in
            this function — output formatting happens in the post-processing
            builders downstream. Confirm this is intentional.
        doc_id: Document ID
        doc_type: Document type (pdf or image)

    Returns:
        ParsedDocument with structured content

    Raises:
        RuntimeError: if the model is not loaded and dummy fallback is disabled.
        ValueError: if no image survives normalization.
    """
    # Check if we should use dummy parser
    if settings.USE_DUMMY_PARSER:
        logger.info("Using dummy parser (USE_DUMMY_PARSER=true)")
        return dummy_parse_document_from_images(images, doc_id, doc_type)

    # Try to get model
    model = get_model()
    if model is None:
        if settings.ALLOW_DUMMY_FALLBACK:
            logger.warning("Model not loaded, falling back to dummy parser")
            return dummy_parse_document_from_images(images, doc_id, doc_type)
        else:
            raise RuntimeError("Model not loaded and dummy fallback is disabled")

    # Prepare images for model (RGB conversion + max-size downscale; pages
    # that fail normalization are dropped by prepare_images_for_model)
    prepared_images = prepare_images_for_model(images)
    if not prepared_images:
        raise ValueError("No valid images to process")

    # Process with model
    pages_data = []
    for idx, image in enumerate(prepared_images, start=1):
        try:
            # TODO: Implement actual inference with dots.ocr
            # Example:
            # inputs = model["processor"](images=image, return_tensors="pt")
            # outputs = model["model"].generate(**inputs)
            # text = model["processor"].decode(outputs[0], skip_special_tokens=True)
            #
            # # Parse model output into blocks
            # blocks = parse_model_output_to_blocks(text, image.size)
            #
            # pages_data.append({
            #     "blocks": blocks,
            #     "width": image.width,
            #     "height": image.height
            # })

            # For now, use dummy for each page
            logger.debug(f"Processing page {idx} with model (placeholder)")
            pages_data.append({
                "blocks": [
                    {
                        "type": "paragraph",
                        "text": f"Page {idx} content (model output placeholder)",
                        "bbox": {"x": 0, "y": 0, "width": image.width, "height": image.height},
                        "reading_order": 1
                    }
                ],
                "width": image.width,
                "height": image.height
            })
        except Exception as e:
            # One bad page should not abort the whole document.
            logger.error(f"Error processing page {idx}: {e}", exc_info=True)
            # Continue with other pages
            continue

    # Build ParsedDocument from model output
    return build_parsed_document(
        pages_data=pages_data,
        doc_id=doc_id or "parsed-doc",
        doc_type=doc_type,
        metadata={"model": settings.PARSER_MODEL_NAME}
    )
def parse_document(
input_path: str,
output_mode: Literal["raw_json", "markdown", "qa_pairs", "chunks"] = "raw_json",
@@ -20,7 +114,9 @@ def parse_document(
doc_type: Literal["pdf", "image"] = "image"
) -> ParsedDocument:
"""
Parse document using dots.ocr model
Parse document from file path
This function handles file loading and delegates to parse_document_from_images
Args:
input_path: Path to document file (PDF or image)
@@ -31,37 +127,70 @@ def parse_document(
Returns:
ParsedDocument with structured content
"""
model = get_model()
# Load file content
with open(input_path, 'rb') as f:
content = f.read()
if model is None:
logger.warning("Model not loaded, using dummy parser")
return dummy_parse_document(input_path, output_mode, doc_id, doc_type)
# Convert to images based on type
if doc_type == "pdf":
images = convert_pdf_to_images(content)
else:
image = load_image(content)
images = [image]
# TODO: Implement actual inference with dots.ocr
# Example:
# from PIL import Image
# import pdf2image # for PDF
# Parse from images
return parse_document_from_images(images, output_mode, doc_id, doc_type)
def dummy_parse_document_from_images(
images: List[Image.Image],
doc_id: Optional[str] = None,
doc_type: Literal["pdf", "image"] = "image"
) -> ParsedDocument:
"""
Dummy parser for testing (returns mock data from images)
# if doc_type == "pdf":
# images = pdf2image.convert_from_path(input_path)
# else:
# images = [Image.open(input_path)]
#
# pages = []
# for idx, image in enumerate(images):
# # Process with model
# inputs = model["processor"](images=image, return_tensors="pt")
# outputs = model["model"].generate(**inputs)
# text = model["processor"].decode(outputs[0], skip_special_tokens=True)
#
# # Parse output into blocks
# blocks = parse_model_output(text, idx + 1)
# pages.append(ParsedPage(...))
#
# return ParsedDocument(...)
This will be replaced with actual dots.ocr inference
"""
logger.info(f"Dummy parsing: {len(images)} image(s)")
# For now, use dummy
return dummy_parse_document(input_path, output_mode, doc_id, doc_type)
pages = []
for idx, image in enumerate(images, start=1):
mock_page = ParsedPage(
page_num=idx,
blocks=[
ParsedBlock(
type="heading",
text=f"Page {idx} Title",
bbox=BBox(x=0, y=0, width=image.width, height=50),
reading_order=1,
page_num=idx
),
ParsedBlock(
type="paragraph",
text=f"This is a dummy parsed document (page {idx}). "
f"Image size: {image.width}x{image.height}. "
f"Replace this with actual dots.ocr inference.",
bbox=BBox(x=0, y=60, width=image.width, height=100),
reading_order=2,
page_num=idx
)
],
width=image.width,
height=image.height
)
pages.append(mock_page)
return ParsedDocument(
doc_id=doc_id or "dummy-doc-1",
doc_type=doc_type,
pages=pages,
metadata={
"parser": "dummy",
"page_count": len(images)
}
)
def dummy_parse_document(
@@ -73,40 +202,18 @@ def dummy_parse_document(
"""
Dummy parser for testing (returns mock data)
This will be replaced with actual dots.ocr inference
This function loads the file and delegates to dummy_parse_document_from_images
"""
logger.info(f"Dummy parsing: {input_path}")
# Load file content
with open(input_path, 'rb') as f:
content = f.read()
# Mock data
mock_page = ParsedPage(
page_num=1,
blocks=[
ParsedBlock(
type="heading",
text="Document Title",
bbox=BBox(x=0, y=0, width=800, height=50),
reading_order=1,
page_num=1
),
ParsedBlock(
type="paragraph",
text="This is a dummy parsed document. Replace this with actual dots.ocr inference.",
bbox=BBox(x=0, y=60, width=800, height=100),
reading_order=2,
page_num=1
)
],
width=800,
height=1200
)
# Convert to images
if doc_type == "pdf":
images = convert_pdf_to_images(content)
else:
image = load_image(content)
images = [image]
return ParsedDocument(
doc_id=doc_id or "dummy-doc-1",
doc_type=doc_type,
pages=[mock_page],
metadata={
"parser": "dummy",
"input_path": input_path
}
)
return dummy_parse_document_from_images(images, doc_id, doc_type)

View File

@@ -15,44 +15,63 @@ logger = logging.getLogger(__name__)
_model: Optional[object] = None
def load_model() -> object:
def load_model() -> Optional[object]:
"""
Load dots.ocr model
Returns:
Loaded model instance
Loaded model instance or None if loading fails
"""
global _model
if _model is not None:
return _model
# Check if dummy mode is enabled
if settings.USE_DUMMY_PARSER:
logger.info("Dummy parser mode enabled, skipping model loading")
return None
logger.info(f"Loading model: {settings.PARSER_MODEL_NAME}")
logger.info(f"Device: {settings.PARSER_DEVICE}")
try:
# TODO: Implement actual model loading
# Example:
# Example for dots.ocr (adjust based on actual model structure):
# from transformers import AutoModelForVision2Seq, AutoProcessor
#
# processor = AutoProcessor.from_pretrained(settings.PARSER_MODEL_NAME)
# model = AutoModelForVision2Seq.from_pretrained(
# settings.PARSER_MODEL_NAME,
# device_map=settings.PARSER_DEVICE
# device_map=settings.PARSER_DEVICE if settings.PARSER_DEVICE != "cpu" else None,
# torch_dtype=torch.float16 if settings.PARSER_DEVICE != "cpu" else torch.float32
# )
#
# if settings.PARSER_DEVICE == "cpu":
# model = model.to("cpu")
#
# _model = {
# "model": model,
# "processor": processor
# "processor": processor,
# "device": settings.PARSER_DEVICE
# }
#
# logger.info("Model loaded successfully")
# For now, return None (will use dummy parser)
logger.warning("Model loading not yet implemented, using dummy parser")
logger.warning("Model loading not yet implemented, will use dummy parser")
_model = None
except ImportError as e:
logger.error(f"Required packages not installed: {e}")
if not settings.ALLOW_DUMMY_FALLBACK:
raise
_model = None
except Exception as e:
logger.error(f"Failed to load model: {e}", exc_info=True)
raise
if not settings.ALLOW_DUMMY_FALLBACK:
raise
_model = None
return _model

View File

@@ -0,0 +1,321 @@
"""
Post-processing functions to convert model output to structured formats
"""
import logging
import re
from typing import List, Dict, Any, Optional
from app.schemas import (
ParsedDocument, ParsedPage, ParsedBlock, ParsedChunk, QAPair, BBox
)
logger = logging.getLogger(__name__)
def normalize_text(text: str) -> str:
    """
    Clean up raw extracted text.

    Strips C0/C1 control characters, collapses every whitespace run to a
    single space, and trims the ends.

    Args:
        text: Raw text (may be empty or None-ish)

    Returns:
        Normalized single-spaced text; "" for falsy input
    """
    if not text:
        return ""
    # Drop invisible/control characters (tab, LF, CR survive here and are
    # collapsed by the whitespace pass below).
    cleaned = re.sub(r'[\x00-\x08\x0b-\x0c\x0e-\x1f\x7f-\x9f]', '', text)
    # Collapse whitespace runs and trim.
    return re.sub(r'\s+', ' ', cleaned).strip()
def build_parsed_document(
    pages_data: List[Dict[str, Any]],
    doc_id: str,
    doc_type: str,
    metadata: Optional[Dict[str, Any]] = None
) -> ParsedDocument:
    """
    Build ParsedDocument from model output.

    Args:
        pages_data: List of page data from the model.
            Each page dict should carry: 'blocks' (list of block dicts),
            'width', 'height'.
        doc_id: Document ID
        doc_type: Document type ("pdf" or "image")
        metadata: Additional document-level metadata.
            Fix: annotated as Optional — the previous ``Dict[str, Any] = None``
            was an implicit-Optional annotation, which PEP 484 disallows.

    Returns:
        ParsedDocument
    """
    pages = []
    for page_idx, page_data in enumerate(pages_data, start=1):
        blocks = []
        for block_data in page_data.get('blocks', []):
            # Skip blocks that are empty after whitespace/control-char cleanup.
            text = normalize_text(block_data.get('text', ''))
            if not text:
                continue
            # Extract bbox, defaulting missing coordinates to 0.
            bbox_data = block_data.get('bbox', {})
            bbox = BBox(
                x=bbox_data.get('x', 0),
                y=bbox_data.get('y', 0),
                width=bbox_data.get('width', 0),
                height=bbox_data.get('height', 0)
            )
            blocks.append(ParsedBlock(
                type=block_data.get('type', 'paragraph'),
                text=text,
                bbox=bbox,
                # Fall back to positional order when the model omits reading_order.
                reading_order=block_data.get('reading_order', len(blocks) + 1),
                page_num=page_idx,
                metadata=block_data.get('metadata', {})
            ))
        pages.append(ParsedPage(
            page_num=page_idx,
            blocks=blocks,
            width=page_data.get('width', 0),
            height=page_data.get('height', 0)
        ))
    return ParsedDocument(
        doc_id=doc_id,
        doc_type=doc_type,
        pages=pages,
        metadata=metadata or {}
    )
def build_chunks(
    parsed_doc: ParsedDocument,
    chunk_size: int = 500,
    chunk_overlap: int = 50,
    dao_id: Optional[str] = None
) -> List[ParsedChunk]:
    """
    Build semantic chunks from ParsedDocument for RAG ingestion.

    Blocks are grouped per page into heading-delimited sections; any section
    longer than ``chunk_size`` characters is then split on word boundaries,
    carrying the last ``chunk_overlap`` words into the next chunk.

    Args:
        parsed_doc: Parsed document
        chunk_size: Target chunk size in characters
        chunk_overlap: Overlap between split chunks, in WORDS (not characters)
        dao_id: Optional DAO ID for metadata

    Returns:
        List of ParsedChunk
    """
    chunks: List[ParsedChunk] = []

    def _flush_section(page_num, section, parts, bbox) -> None:
        # Emit one chunk for an accumulated section, if it has any text.
        text = ' '.join(parts)
        if text:
            chunks.append(ParsedChunk(
                text=text,
                page=page_num,
                # Fix: use the section's OWN first-block bbox. Previously the
                # mid-page flush used the bbox of the *next* heading (the one
                # that triggered the flush) and the trailing flush had none.
                bbox=bbox,
                section=section or "main",
                metadata={
                    "dao_id": dao_id,
                    "doc_id": parsed_doc.doc_id,
                    "chunk_type": "section"
                }
            ))

    for page in parsed_doc.pages:
        current_section = None
        current_text_parts: List[str] = []
        section_bbox = None  # bbox of the first block in the current section
        for block in page.blocks:
            if block.type == 'heading':
                # A new heading closes the previous section.
                _flush_section(page.page_num, current_section, current_text_parts, section_bbox)
                current_section = normalize_text(block.text)
                current_text_parts = []
                section_bbox = None
            # The heading's own text stays in its section, as before.
            if block.text:
                if section_bbox is None:
                    section_bbox = block.bbox
                current_text_parts.append(block.text)
        # Close the trailing section on this page.
        _flush_section(page.page_num, current_section, current_text_parts, section_bbox)

    # Split oversized section chunks on word boundaries.
    final_chunks: List[ParsedChunk] = []
    for chunk in chunks:
        if len(chunk.text) <= chunk_size:
            final_chunks.append(chunk)
            continue
        words = chunk.text.split()
        current_words: List[str] = []
        current_length = 0
        for word in words:
            word_length = len(word) + 1  # +1 for the joining space
            if current_length + word_length > chunk_size and current_words:
                final_chunks.append(ParsedChunk(
                    text=' '.join(current_words),
                    page=chunk.page,
                    bbox=chunk.bbox,
                    section=chunk.section,
                    metadata=chunk.metadata
                ))
                # Seed the next chunk with the last `chunk_overlap` words.
                overlap_words = current_words[-chunk_overlap:] if chunk_overlap > 0 else []
                current_words = overlap_words + [word]
                current_length = sum(len(w) + 1 for w in current_words)
            else:
                current_words.append(word)
                current_length += word_length
        # Save the last partial chunk.
        if current_words:
            final_chunks.append(ParsedChunk(
                text=' '.join(current_words),
                page=chunk.page,
                bbox=chunk.bbox,
                section=chunk.section,
                metadata=chunk.metadata
            ))
    logger.info(f"Created {len(final_chunks)} chunks from document")
    return final_chunks
def build_qa_pairs(
    parsed_doc: ParsedDocument,
    max_pairs: int = 10
) -> List[QAPair]:
    """
    Build Q&A pairs from ParsedDocument.

    Simple heuristic: each heading becomes a question and the block that
    immediately follows it becomes the answer. For production, consider
    using an LLM to generate better Q&A pairs.

    Args:
        parsed_doc: Parsed document
        max_pairs: Maximum number of Q&A pairs to generate

    Returns:
        List of QAPair
    """
    qa_pairs: List[QAPair] = []
    for page in parsed_doc.pages:
        if len(qa_pairs) >= max_pairs:
            break
        # Walk (block, following block) pairs on this page.
        for block, following in zip(page.blocks, page.blocks[1:]):
            if block.type != 'heading':
                continue
            answer = normalize_text(following.text)
            if not answer:
                continue
            qa_pairs.append(QAPair(
                question=f"What is {normalize_text(block.text)}?",
                answer=answer,
                source_page=page.page_num,
                source_bbox=block.bbox,
                confidence=0.7  # Placeholder
            ))
            if len(qa_pairs) >= max_pairs:
                break
    logger.info(f"Generated {len(qa_pairs)} Q&A pairs")
    return qa_pairs
def build_markdown(parsed_doc: ParsedDocument) -> str:
    """
    Build Markdown representation from ParsedDocument.

    Headings map to ###/#### (short vs. long), paragraphs to plain text,
    lists to bullet lines, and tables (when table_data is present) to
    pipe-delimited Markdown tables.

    Args:
        parsed_doc: Parsed document

    Returns:
        Markdown string
    """
    out: List[str] = []
    multipage = len(parsed_doc.pages) > 1
    for page in parsed_doc.pages:
        # Only label pages when there is more than one.
        if multipage:
            out.append(f"\n## Page {page.page_num}\n")
        for block in page.blocks:
            text = normalize_text(block.text)
            if not text:
                continue
            if block.type == 'heading':
                # Simple heuristic: short headings rank higher.
                prefix = "###" if len(text) < 50 else "####"
                out.append(f"{prefix} {text}\n")
            elif block.type == 'paragraph':
                out.append(f"{text}\n\n")
            elif block.type == 'list':
                # NOTE(review): normalize_text collapses newlines, so this
                # split rarely yields multiple lines — confirm intended.
                for line in text.split('\n'):
                    if line.strip():
                        out.append(f"- {line.strip()}\n")
                out.append("\n")
            elif block.type == 'table' and block.table_data:
                table = block.table_data
                if table.columns:
                    # Header row, separator row, then data rows.
                    out.append("| " + " | ".join(table.columns) + " |\n")
                    out.append("| " + " | ".join(["---"] * len(table.columns)) + " |\n")
                    for row in table.rows:
                        out.append("| " + " | ".join(str(cell) for cell in row) + " |\n")
                    out.append("\n")
            else:
                # Unknown block type: fall back to plain text.
                out.append(f"{text}\n\n")
    return ''.join(out)

View File

@@ -0,0 +1,198 @@
"""
Preprocessing functions for PDF and images
"""
import logging
from typing import List, Optional
from io import BytesIO
from pathlib import Path
from PIL import Image
import pdf2image
from app.core.config import settings
logger = logging.getLogger(__name__)
def convert_pdf_to_images(
    pdf_bytes: bytes,
    dpi: Optional[int] = None,
    max_pages: Optional[int] = None
) -> List[Image.Image]:
    """
    Convert PDF bytes to list of PIL Images.

    Args:
        pdf_bytes: PDF file content as bytes
        dpi: DPI for conversion (default from settings.PDF_DPI)
        max_pages: Maximum number of pages to process
            (default from settings.PARSER_MAX_PAGES)

    Returns:
        List of PIL Images (one per page)

    Raises:
        ValueError: if pdf2image fails to render the document; the original
            exception is attached as __cause__.
    """
    dpi = dpi or getattr(settings, 'PDF_DPI', 200)
    max_pages = max_pages or settings.PARSER_MAX_PAGES
    try:
        # Convert PDF to images (requires poppler via pdf2image)
        images = pdf2image.convert_from_bytes(
            pdf_bytes,
            dpi=dpi,
            first_page=1,
            last_page=max_pages
        )
    except Exception as e:
        logger.error(f"Failed to convert PDF to images: {e}", exc_info=True)
        # Fix: chain the original exception so the root cause is preserved
        # (previously `raise ValueError(...)` dropped the cause chain).
        raise ValueError(f"PDF conversion failed: {str(e)}") from e
    logger.info(f"Converted PDF to {len(images)} images (DPI: {dpi}, max_pages: {max_pages})")
    return images
def load_image(image_bytes: bytes) -> Image.Image:
    """
    Load image from bytes.

    Args:
        image_bytes: Image file content as bytes

    Returns:
        PIL Image

    Raises:
        ValueError: if PIL cannot identify/open the image; the original
            exception is attached as __cause__.
    """
    try:
        # NOTE(review): PIL opens lazily; a truncated file may only fail on
        # first decode — consider image.load() here if eager validation is
        # wanted. TODO confirm.
        image = Image.open(BytesIO(image_bytes))
        logger.info(f"Loaded image: {image.format}, size: {image.size}")
        return image
    except Exception as e:
        logger.error(f"Failed to load image: {e}", exc_info=True)
        # Fix: chain the original exception so the root cause is preserved
        # (previously `raise ValueError(...)` dropped the cause chain).
        raise ValueError(f"Image loading failed: {str(e)}") from e
def normalize_image(
    image: Image.Image,
    max_size: Optional[int] = None
) -> Image.Image:
    """
    Normalize a single image for model input.

    Converts to RGB and, when the longest side exceeds *max_size*, downscales
    with LANCZOS resampling while preserving the aspect ratio.

    Args:
        image: PIL Image
        max_size: Maximum size for the longest side
            (default from settings.IMAGE_MAX_SIZE)

    Returns:
        Normalized PIL Image
    """
    limit = max_size or getattr(settings, 'IMAGE_MAX_SIZE', 2048)
    # Model input must be RGB.
    if image.mode != 'RGB':
        image = image.convert('RGB')
    width, height = image.size
    if max(width, height) > limit:
        # Scale so the longest side lands exactly on the limit.
        if width > height:
            new_width, new_height = limit, int(height * (limit / width))
        else:
            new_width, new_height = int(width * (limit / height)), limit
        image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
        logger.info(f"Resized image from {width}x{height} to {new_width}x{new_height}")
    return image
def prepare_images_for_model(
    images: List[Image.Image],
    max_size: Optional[int] = None
) -> List[Image.Image]:
    """
    Normalize a batch of page images for model inference.

    Pages that fail normalization are logged and skipped rather than
    aborting the whole batch.

    Args:
        images: List of PIL Images
        max_size: Maximum size for the longest side

    Returns:
        List of normalized PIL Images (possibly shorter than the input)
    """
    normalized: List[Image.Image] = []
    for idx, page_image in enumerate(images, start=1):
        try:
            normalized.append(normalize_image(page_image, max_size))
        except Exception as e:
            # Skip problematic images; keep processing the rest.
            logger.warning(f"Failed to normalize image {idx}: {e}")
    logger.info(f"Prepared {len(normalized)} images for model")
    return normalized
def detect_file_type(content: bytes, filename: Optional[str] = None) -> str:
    """
    Detect file type from content and/or filename.

    Detection order: PDF magic bytes, then filename extension, then a PIL
    open probe as a last resort.

    Args:
        content: File content as bytes
        filename: Optional filename (for extension detection)

    Returns:
        File type: "pdf" or "image"

    Raises:
        ValueError: if the content is neither a PDF nor a readable image.
    """
    # 1. Magic bytes: every valid PDF starts with "%PDF".
    if content.startswith(b'%PDF'):
        return "pdf"
    # 2. Filename extension, if available.
    if filename:
        ext = Path(filename).suffix.lower()
        if ext == '.pdf':
            return "pdf"
        elif ext in ['.png', '.jpg', '.jpeg', '.webp', '.tiff', '.tif']:
            return "image"
    # 3. Last resort: try to open as an image.
    try:
        Image.open(BytesIO(content))
        return "image"
    except Exception:
        # Fix: was a bare `except:`, which also swallows KeyboardInterrupt
        # and SystemExit. Narrowed to Exception.
        pass
    raise ValueError("Unsupported file type. Expected PDF or image (PNG/JPEG/WebP)")
def validate_file_size(content: bytes) -> None:
    """
    Validate file size against settings.MAX_FILE_SIZE_MB.

    Args:
        content: File content as bytes

    Raises:
        ValueError: if the content exceeds the configured maximum.
    """
    size_bytes = len(content)
    limit_bytes = settings.MAX_FILE_SIZE_MB * 1024 * 1024
    if size_bytes <= limit_bytes:
        return
    raise ValueError(
        f"File size ({size_bytes / 1024 / 1024:.2f} MB) exceeds maximum "
        f"({settings.MAX_FILE_SIZE_MB} MB)"
    )