feat(file-tool): harden svg rendering and add rich pptx/pdf updates

This commit is contained in:
Apple
2026-02-15 02:48:35 -08:00
parent aad5870e81
commit 3a565fd910
2 changed files with 271 additions and 15 deletions

View File

@@ -329,7 +329,7 @@ TOOL_DEFINITIONS = [
"excel_create", "excel_update", "docx_create", "docx_update",
"pptx_create", "pptx_update",
"ods_create", "ods_update", "parquet_create", "parquet_update",
"csv_create", "csv_update", "pdf_fill", "pdf_merge", "pdf_split",
"csv_create", "csv_update", "pdf_fill", "pdf_merge", "pdf_split", "pdf_update",
"json_export", "yaml_export", "zip_bundle",
"text_create", "text_update", "markdown_create", "markdown_update",
"xml_export", "html_export",
@@ -667,6 +667,8 @@ class ToolManager:
return self._file_pdf_merge(args)
if action == "pdf_split":
return self._file_pdf_split(args)
if action == "pdf_update":
return self._file_pdf_update(args)
if action == "pdf_fill":
return self._file_pdf_fill(args)
@@ -1227,6 +1229,61 @@ class ToolManager:
except Exception:
return default
@staticmethod
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
text = str(value).strip()
if text.endswith("px"):
text = text[:-2]
return float(text)
except Exception:
return default
@staticmethod
def _svg_style_map(elem: Any) -> Dict[str, str]:
style = str(elem.attrib.get("style") or "")
out: Dict[str, str] = {}
for chunk in style.split(";"):
if ":" not in chunk:
continue
k, v = chunk.split(":", 1)
out[k.strip()] = v.strip()
return out
def _svg_paint(self, elem: Any, key: str, default: Optional[str]) -> Optional[str]:
style = self._svg_style_map(elem)
value = elem.attrib.get(key, style.get(key, default))
if value is None:
return None
text = str(value).strip()
if not text or text.lower() == "none":
return None
return text
@staticmethod
def _svg_color(value: Optional[str], fallback: Optional[tuple[int, int, int]] = None) -> Optional[tuple[int, int, int]]:
if value is None:
return fallback
try:
from PIL import ImageColor
return ImageColor.getrgb(value)
except Exception:
return fallback
@staticmethod
def _svg_points(raw: Any) -> List[tuple[float, float]]:
text = str(raw or "").replace(",", " ")
nums: List[float] = []
for token in text.split():
try:
nums.append(float(token))
except Exception:
continue
pts: List[tuple[float, float]] = []
for i in range(0, len(nums) - 1, 2):
pts.append((nums[i], nums[i + 1]))
return pts
def _file_svg_export(self, args: Dict[str, Any]) -> ToolResult:
file_name = self._sanitize_file_name(args.get("file_name"), "image.svg", force_ext=".svg")
svg_raw = args.get("svg")
@@ -1266,7 +1323,7 @@ class ToolManager:
)
def _file_svg_to_png(self, args: Dict[str, Any]) -> ToolResult:
from PIL import Image, ImageColor, ImageDraw
from PIL import Image, ImageDraw
file_name = self._sanitize_file_name(args.get("file_name"), "converted.png", force_ext=".png")
src_b64 = args.get("file_base64")
@@ -1296,20 +1353,68 @@ class ToolManager:
y = self._safe_int(elem.attrib.get("y"), 0)
w = self._safe_int(elem.attrib.get("width"), width)
h = self._safe_int(elem.attrib.get("height"), height)
fill = elem.attrib.get("fill", "white")
try:
color = ImageColor.getrgb(fill)
except Exception:
color = (255, 255, 255)
draw.rectangle([x, y, x + max(0, w), y + max(0, h)], fill=color)
fill = self._svg_paint(elem, "fill", "white")
stroke = self._svg_paint(elem, "stroke", None)
stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1))
color = self._svg_color(fill, (255, 255, 255) if fill else None)
outline = None
if stroke:
outline = self._svg_color(stroke, (0, 0, 0))
draw.rectangle([x, y, x + max(0, w), y + max(0, h)], fill=color, outline=outline, width=stroke_width)
elif tag == "circle":
cx = self._safe_float(elem.attrib.get("cx"), width / 2.0)
cy = self._safe_float(elem.attrib.get("cy"), height / 2.0)
r = max(0.0, self._safe_float(elem.attrib.get("r"), 0.0))
fill = self._svg_paint(elem, "fill", None)
stroke = self._svg_paint(elem, "stroke", None)
stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1))
fill_color = self._svg_color(fill, None)
outline = self._svg_color(stroke, None)
draw.ellipse([cx - r, cy - r, cx + r, cy + r], fill=fill_color, outline=outline, width=stroke_width)
elif tag == "ellipse":
cx = self._safe_float(elem.attrib.get("cx"), width / 2.0)
cy = self._safe_float(elem.attrib.get("cy"), height / 2.0)
rx = max(0.0, self._safe_float(elem.attrib.get("rx"), 0.0))
ry = max(0.0, self._safe_float(elem.attrib.get("ry"), 0.0))
fill = self._svg_paint(elem, "fill", None)
stroke = self._svg_paint(elem, "stroke", None)
stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1))
fill_color = self._svg_color(fill, None)
outline = self._svg_color(stroke, None)
draw.ellipse([cx - rx, cy - ry, cx + rx, cy + ry], fill=fill_color, outline=outline, width=stroke_width)
elif tag == "line":
x1 = self._safe_float(elem.attrib.get("x1"), 0.0)
y1 = self._safe_float(elem.attrib.get("y1"), 0.0)
x2 = self._safe_float(elem.attrib.get("x2"), 0.0)
y2 = self._safe_float(elem.attrib.get("y2"), 0.0)
stroke = self._svg_paint(elem, "stroke", "black")
stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1))
color = self._svg_color(stroke, (0, 0, 0)) or (0, 0, 0)
draw.line([(x1, y1), (x2, y2)], fill=color, width=stroke_width)
elif tag == "polyline":
points = self._svg_points(elem.attrib.get("points"))
if len(points) >= 2:
stroke = self._svg_paint(elem, "stroke", "black")
stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1))
color = self._svg_color(stroke, (0, 0, 0)) or (0, 0, 0)
draw.line(points, fill=color, width=stroke_width)
elif tag == "polygon":
points = self._svg_points(elem.attrib.get("points"))
if len(points) >= 3:
fill = self._svg_paint(elem, "fill", None)
stroke = self._svg_paint(elem, "stroke", None)
stroke_width = max(1, self._safe_int(elem.attrib.get("stroke-width"), 1))
fill_color = self._svg_color(fill, None)
outline = self._svg_color(stroke, None)
draw.polygon(points, fill=fill_color, outline=outline)
# Pillow polygon has no width support; emulate thicker stroke
if outline and stroke_width > 1:
draw.line(points + [points[0]], fill=outline, width=stroke_width)
elif tag == "text":
x = self._safe_int(elem.attrib.get("x"), 0)
y = self._safe_int(elem.attrib.get("y"), 0)
fill = elem.attrib.get("fill", "black")
try:
color = ImageColor.getrgb(fill)
except Exception:
color = (0, 0, 0)
fill = self._svg_paint(elem, "fill", "black")
color = self._svg_color(fill, (0, 0, 0)) or (0, 0, 0)
draw.text((x, y), elem.text or "", fill=color)
out = BytesIO()
@@ -1666,6 +1771,7 @@ class ToolManager:
def _file_pptx_update(self, args: Dict[str, Any]) -> ToolResult:
from pptx import Presentation
from pptx.util import Inches
src_b64 = args.get("file_base64")
operations = args.get("operations") or []
@@ -1683,7 +1789,10 @@ class ToolManager:
op_type = str(op.get("type") or "").strip().lower()
if op_type == "append_slide":
slide = prs.slides.add_slide(prs.slide_layouts[1])
layout_idx = int(op.get("layout") or 1)
if layout_idx < 0 or layout_idx >= len(prs.slide_layouts):
layout_idx = 1
slide = prs.slides.add_slide(prs.slide_layouts[layout_idx])
if slide.shapes.title:
slide.shapes.title.text = str(op.get("title") or "")
lines = op.get("bullets")
@@ -1702,6 +1811,35 @@ class ToolManager:
else:
p = body.add_paragraph()
p.text = str(line)
elif op_type == "add_table":
slide_index = int(op.get("slide_index") or len(prs.slides))
if slide_index < 0:
slide_index = 0
while len(prs.slides) <= slide_index:
prs.slides.add_slide(prs.slide_layouts[1])
slide = prs.slides[slide_index]
headers = [str(h) for h in (op.get("headers") or [])]
rows_raw = op.get("rows") or []
rows = self._normalize_rows(rows_raw, headers=headers if headers else None)
if rows and not headers and isinstance(rows_raw[0], dict):
headers = list(rows_raw[0].keys())
rows = self._normalize_rows(rows_raw, headers=headers)
row_count = len(rows) + (1 if headers else 0)
col_count = len(headers) if headers else (len(rows[0]) if rows else 1)
left = Inches(float(op.get("left_inches") or 1.0))
top = Inches(float(op.get("top_inches") or 1.5))
width = Inches(float(op.get("width_inches") or 8.0))
height = Inches(float(op.get("height_inches") or 3.0))
table = slide.shapes.add_table(max(1, row_count), max(1, col_count), left, top, width, height).table
offset = 0
if headers:
for idx, value in enumerate(headers):
table.cell(0, idx).text = str(value)
offset = 1
for r_idx, row in enumerate(rows):
for c_idx, value in enumerate(row):
if c_idx < col_count:
table.cell(r_idx + offset, c_idx).text = str(value)
elif op_type == "replace_text":
old = str(op.get("old") or "")
new = str(op.get("new") or "")
@@ -1714,6 +1852,25 @@ class ToolManager:
text = shape.text or ""
if old in text:
shape.text = text.replace(old, new)
elif op_type == "replace_text_preserve_layout":
old = str(op.get("old") or "")
new = str(op.get("new") or "")
if not old:
return ToolResult(success=False, result=None, error="replace_text_preserve_layout requires old")
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text_frame") and shape.text_frame:
for paragraph in shape.text_frame.paragraphs:
for run in paragraph.runs:
if old in run.text:
run.text = run.text.replace(old, new)
if getattr(shape, "has_table", False):
for row in shape.table.rows:
for cell in row.cells:
for paragraph in cell.text_frame.paragraphs:
for run in paragraph.runs:
if old in run.text:
run.text = run.text.replace(old, new)
else:
return ToolResult(success=False, result=None, error=f"Unsupported pptx_update operation: {op_type}")
@@ -1850,6 +2007,101 @@ class ToolManager:
file_name=file_name,
file_mime="application/pdf",
)
def _file_pdf_update(self, args: Dict[str, Any]) -> ToolResult:
from pypdf import PdfReader, PdfWriter
src_b64 = args.get("file_base64")
operations = args.get("operations") or []
if not src_b64:
return ToolResult(success=False, result=None, error="file_base64 is required for pdf_update")
if not isinstance(operations, list) or not operations:
return ToolResult(success=False, result=None, error="operations must be non-empty array")
file_name = self._sanitize_file_name(args.get("file_name"), "updated.pdf", force_ext=".pdf")
reader = PdfReader(BytesIO(self._bytes_from_b64(src_b64)))
pages = [reader.pages[i] for i in range(len(reader.pages))]
total = len(pages)
if total == 0:
return ToolResult(success=False, result=None, error="Input PDF has no pages")
def parse_pages_list(raw: Any, allow_empty: bool = False) -> Optional[List[int]]:
if raw is None:
return [] if allow_empty else None
if not isinstance(raw, list) or (not raw and not allow_empty):
return None
out: List[int] = []
for val in raw:
try:
idx = int(val)
except Exception:
return None
if idx < 1 or idx > len(pages):
return None
out.append(idx)
return out
for op in operations:
if not isinstance(op, dict):
return ToolResult(success=False, result=None, error="Each operation must be object")
op_type = str(op.get("type") or "").strip().lower()
if op_type == "rotate_pages":
angle = int(op.get("angle") or 90)
if angle not in {90, 180, 270}:
return ToolResult(success=False, result=None, error="rotate_pages angle must be 90|180|270")
target = parse_pages_list(op.get("pages"), allow_empty=True)
if target is None:
return ToolResult(success=False, result=None, error="Invalid pages for rotate_pages")
targets = target or list(range(1, len(pages) + 1))
for p in targets:
page = pages[p - 1]
try:
pages[p - 1] = page.rotate(angle)
except Exception:
if hasattr(page, "rotate_clockwise"):
page.rotate_clockwise(angle)
pages[p - 1] = page
else:
return ToolResult(success=False, result=None, error="PDF rotation not supported by library")
elif op_type == "remove_pages":
target = parse_pages_list(op.get("pages"))
if not target:
return ToolResult(success=False, result=None, error="remove_pages requires pages")
drop = set(target)
pages = [p for idx, p in enumerate(pages, start=1) if idx not in drop]
if not pages:
return ToolResult(success=False, result=None, error="remove_pages removed all pages")
elif op_type in {"reorder_pages", "extract_pages"}:
target = parse_pages_list(op.get("pages"))
if not target:
return ToolResult(success=False, result=None, error=f"{op_type} requires pages")
pages = [pages[i - 1] for i in target]
elif op_type == "set_metadata":
# Applied later on writer to avoid page object recreation.
continue
else:
return ToolResult(success=False, result=None, error=f"Unsupported pdf_update operation: {op_type}")
writer = PdfWriter()
for page in pages:
writer.add_page(page)
for op in operations:
if isinstance(op, dict) and str(op.get("type") or "").strip().lower() == "set_metadata":
meta = op.get("metadata")
if isinstance(meta, dict) and meta:
normalized = {k if str(k).startswith("/") else f"/{k}": str(v) for k, v in meta.items()}
writer.add_metadata(normalized)
out = BytesIO()
writer.write(out)
return ToolResult(
success=True,
result={"message": f"PDF updated: {file_name} ({len(pages)} pages)"},
file_base64=self._b64_from_bytes(out.getvalue()),
file_name=file_name,
file_mime="application/pdf",
)
async def _memory_search(self, args: Dict, agent_id: str = None, chat_id: str = None, user_id: str = None) -> ToolResult:
"""Search in Qdrant vector memory using Router's memory_retrieval - PRIORITY 1"""