"""
BrowserTool - Production-ready browser automation for AI agents

Fully self-hosted, open-source, privacy-by-default
PRIMARY: Playwright (sync API)
"""

import os
import re
import json
import logging
import hashlib
import time
import random
from pathlib import Path
from typing import Optional, List, Dict, Any, Union
from datetime import datetime
from dataclasses import dataclass, field
from threading import Lock
from collections import defaultdict
from urllib.parse import parse_qsl, urlencode, urlparse

# Security & Encryption
try:
    from cryptography.fernet import Fernet
    CRYPTO_AVAILABLE = True
except ImportError:
    CRYPTO_AVAILABLE = False

# Playwright (sync)
try:
    from playwright.sync_api import sync_playwright, Browser, Page, BrowserContext
    PLAYWRIGHT_AVAILABLE = True
except ImportError:
    PLAYWRIGHT_AVAILABLE = False

logger = logging.getLogger(__name__)


# ============================================================================
# CONFIGURATION
# ============================================================================

@dataclass
class BrowserConfig:
    """Settings for a browser tool instance and its helpers."""

    # Fernet key for stored contexts; None means no key was configured.
    encryption_key: Optional[bytes] = None
    # Directory holding saved browser contexts.
    context_dir: str = "/tmp/browser_contexts"
    memory_dir: str = "/tmp/agent_memory"
    headless: bool = True
    stealth: bool = True
    # Proxy server URL, or None for a direct connection.
    proxy: Optional[str] = None
    rate_limit_per_minute: int = 30
    redact_pii_in_logs: bool = True
    # Pool of User-Agent strings to choose from.
    user_agents: List[str] = field(default_factory=lambda: [
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0 Safari/537.36",
    ])


class RateLimiter:
    """Sliding 60-second window request limiter keyed by agent id."""

    def __init__(self, max_per_minute: int = 30):
        self.max_per_minute = max_per_minute
        # agent_id -> timestamps (time.time()) of requests still in the window
        self.requests: Dict[str, List[float]] = defaultdict(list)
        self.lock = Lock()

    def is_allowed(self, agent_id: str) -> bool:
        """Record a request for ``agent_id`` and report whether it is allowed.

        Returns False (without recording anything) when the agent already has
        ``max_per_minute`` requests inside the last 60 seconds.
        """
        with self.lock:
            now = time.time()
            recent = [t for t in self.requests[agent_id] if now - t < 60]
            allowed = len(recent) < self.max_per_minute
            if allowed:
                recent.append(now)
            self.requests[agent_id] = recent
            return allowed

    def wait_time(self, agent_id: str) -> float:
        """Return seconds until the oldest recorded request leaves the window.

        Returns 0 for agents with no recorded requests.
        """
        with self.lock:
            # .get() instead of indexing: indexing a defaultdict would insert
            # an empty list for every unknown agent that is merely queried.
            stamps = self.requests.get(agent_id)
            if not stamps:
                return 0.0
            return max(0.0, 60 - (time.time() - min(stamps)))


rate_limiter = RateLimiter()


# ============================================================================
# PII & SECURITY
# ============================================================================

class PIIGuard:
    """Regex-based redaction of emails, card numbers and secrets."""

    # (pattern, replacement) pairs applied in order by redact().
    PATTERNS = [
        (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]'),
        (r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CARD]'),
        (r'(?i)(password|passwd|pwd|secret)[=:\s]+\S+', '[SECRET]'),
    ]

    # Query parameters whose name contains one of these (case-insensitively)
    # have their value replaced by redact_url().
    SENSITIVE_PARAMS = ('token', 'key', 'password', 'secret')

    @classmethod
    def redact(cls, text: str) -> str:
        """Return ``text`` with every PATTERNS match replaced by its tag."""
        if not text:
            return text
        for pattern, replacement in cls.PATTERNS:
            text = re.sub(pattern, replacement, text)
        return text

    @classmethod
    def _redact_value(cls, value: Any) -> Any:
        """Redact one value, recursing into dicts and lists."""
        if isinstance(value, str):
            return cls.redact(value)
        if isinstance(value, dict):
            return cls.redact_dict(value)
        if isinstance(value, list):
            return [cls._redact_value(item) for item in value]
        return value

    @classmethod
    def redact_dict(cls, data: Dict) -> Dict:
        """Return a copy of ``data`` with all string values redacted.

        Nested dicts and lists are walked recursively; values of any other
        type are passed through unchanged (and keep their type).
        """
        return {key: cls._redact_value(value) for key, value in data.items()}

    @classmethod
    def redact_url(cls, url: str) -> str:
        """Return ``url`` with the values of sensitive query parameters masked.

        A URL without sensitive parameters is returned unchanged. Input that
        cannot be parsed is also returned unchanged.
        """
        if not url:
            return url
        try:
            parsed = urlparse(url)
            pairs = parse_qsl(parsed.query, keep_blank_values=True)

            def is_sensitive(name: str) -> bool:
                lowered = name.lower()
                return any(marker in lowered for marker in cls.SENSITIVE_PARAMS)

            if not any(is_sensitive(name) for name, _ in pairs):
                return url
            masked = [
                (name, '[REDACTED]' if is_sensitive(name) else value)
                for name, value in pairs
            ]
            # safe='[]' keeps the marker readable instead of %5BREDACTED%5D.
            return parsed._replace(query=urlencode(masked, safe='[]')).geturl()
        except (ValueError, TypeError, AttributeError):
            return url


class AuditLogger:
    """Append redacted audit records to a per-day JSON-lines file."""

    def __init__(self, log_dir: str = "/tmp/browser_tool_logs"):
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)

    def log(self, agent_id: str, operation: str, details: Dict[str, Any]) -> None:
        """Write one audit record.

        ``details`` is passed through PIIGuard.redact_dict and the agent id is
        stored as a truncated SHA-256 digest rather than in clear text.
        """
        now = datetime.utcnow()
        entry = {
            "timestamp": now.isoformat(),
            "agent_id": hashlib.sha256(agent_id.encode()).hexdigest()[:16],
            "operation": operation,
            "details": PIIGuard.redact_dict(details),
        }
        log_file = self.log_dir / f"audit_{now.strftime('%Y%m%d')}.jsonl"
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")
# ============================================================================
# SECURE CONTEXT STORE
# ============================================================================

class SecureContextStore:
    """Store per-agent browser context as JSON files, Fernet-encrypted when possible."""

    def __init__(self, config: "BrowserConfig"):
        """Create the context directory and choose the encryption mode.

        Raises:
            BrowserError: if an encryption key is configured but the
                ``cryptography`` package is not installed.
        """
        self.config = config
        self.context_dir = Path(config.context_dir)
        self.context_dir.mkdir(parents=True, exist_ok=True)

        if not CRYPTO_AVAILABLE:
            if config.encryption_key:
                # Falling back to plaintext here would silently drop the
                # encryption the caller asked for.
                raise BrowserError(
                    "encryption_key is set but the 'cryptography' package is not installed"
                )
            logger.warning("cryptography not installed; browser contexts are stored unencrypted")
            self.fernet = None
        elif config.encryption_key:
            self.fernet = Fernet(config.encryption_key)
        else:
            # No configured key: a fresh random key is generated, so files
            # written by this instance cannot be decrypted by another one.
            self.fernet = Fernet(Fernet.generate_key())

    def _path(self, agent_id: str, session_id: str) -> Path:
        """Return the file path for an (agent, session) pair inside context_dir."""
        agent_hash = hashlib.sha256(agent_id.encode()).hexdigest()[:16]
        # Replace anything that is not a plain filename character so that a
        # session id such as "../x" cannot escape context_dir.
        safe_session = re.sub(r"[^A-Za-z0-9_.-]", "_", session_id)
        return self.context_dir / f"context_{agent_hash}_{safe_session}.enc"

    def save(self, agent_id: str, session_id: str, data: Dict) -> None:
        """Serialize ``data`` to JSON and write it, encrypted if a key is active."""
        payload = json.dumps(data).encode()
        if self.fernet:
            payload = self.fernet.encrypt(payload)
        self._path(agent_id, session_id).write_bytes(payload)

    def load(self, agent_id: str, session_id: str) -> Optional[Dict]:
        """Return the stored dict, or None if it is missing or unreadable."""
        file_path = self._path(agent_id, session_id)
        if not file_path.exists():
            return None
        try:
            raw = file_path.read_bytes()
            return json.loads(self.fernet.decrypt(raw) if self.fernet else raw)
        except Exception as exc:
            # Wrong key, truncated file or invalid JSON: treat as "no context".
            logger.warning("Could not load stored context %s: %s", file_path.name, exc)
            return None

    def delete(self, agent_id: str, session_id: str) -> None:
        """Remove the stored context file if it exists."""
        try:
            self._path(agent_id, session_id).unlink()
        except FileNotFoundError:
            pass


# ============================================================================
# MAIN BROWSER TOOL
# ============================================================================

class BrowserTool:
    """Production-ready browser automation for AI agents."""

    def __init__(
        self,
        agent_id: str,
        config: Optional["BrowserConfig"] = None,
        headless: bool = True,
        proxy: Optional[str] = None,
        stealth: bool = True
    ):
        """Set up stores and loggers; no browser is launched until start_session().

        NOTE(review): ``headless``, ``proxy`` and ``stealth`` always overwrite
        the corresponding fields of ``config``, including when a config object
        is passed in explicitly.
        """
        self.agent_id = agent_id
        self.config = config or self._load_config()
        self.config.headless = headless
        self.config.proxy = proxy
        self.config.stealth = stealth

        self.context_store = SecureContextStore(self.config)
        self.audit = AuditLogger()

        self._playwright = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None
        self._page: Optional[Page] = None
        self._session_id: Optional[str] = None
        self._lock = Lock()

        self.temp_dir = Path(self.config.context_dir) / "screenshots"
        self.temp_dir.mkdir(parents=True, exist_ok=True)

    def _load_config(self) -> "BrowserConfig":
        """Build a config from the BROWSER_* environment variables."""
        return BrowserConfig(
            encryption_key=os.getenv("BROWSER_ENCRYPTION_KEY", "").encode() or None,
            context_dir=os.getenv("BROWSER_CONTEXT_DIR", "/tmp/browser_contexts"),
            rate_limit_per_minute=int(os.getenv("BROWSER_RATE_LIMIT", "30")),
        )

    @property
    def is_connected(self) -> bool:
        """True while a page object is held."""
        return self._page is not None

    def _agent_hash(self) -> str:
        """Return the truncated SHA-256 of the agent id, for logs and filenames."""
        return hashlib.sha256(self.agent_id.encode()).hexdigest()[:16]

    def _check_rate_limit(self) -> None:
        """Raise RateLimitError if the module-level limiter rejects this agent.

        NOTE(review): the shared ``rate_limiter`` is used with its own limit;
        ``config.rate_limit_per_minute`` is not consulted here.
        """
        if not rate_limiter.is_allowed(self.agent_id):
            wait = rate_limiter.wait_time(self.agent_id)
            raise RateLimitError(f"Rate limit exceeded. Wait {wait:.1f}s")

    # ========================================================================
    # SESSION MANAGEMENT
    # ========================================================================

    def start_session(
        self,
        headless: Optional[bool] = None,
        proxy: Optional[str] = None,
        stealth: Optional[bool] = None,
        restore_existing: bool = True
    ) -> Dict[str, Any]:
        """Start a new browser session.

        Any session that is still open is closed first. Non-None arguments
        override the matching config fields.

        Returns:
            Session info dict (session_id, provider, headless, stealth, proxy,
            started_at).

        Raises:
            RateLimitError: if the rate limiter rejects the call.
            BrowserError: if the browser could not be started.
        """
        self._check_rate_limit()

        if self._page is not None:
            # Starting over an open session would orphan its browser process.
            self.close_session()

        self._session_id = hashlib.sha256(
            f"{self.agent_id}_{time.time()}".encode()
        ).hexdigest()[:16]

        if headless is not None:
            self.config.headless = headless
        if proxy is not None:
            self.config.proxy = proxy
        if stealth is not None:
            self.config.stealth = stealth

        if restore_existing:
            existing = self.context_store.load(self.agent_id, self.agent_id)
            if existing:
                logger.info("Found existing session for agent: %s", self._agent_hash())

        try:
            return self._start_playwright_session()
        except Exception as e:
            # Release whatever was launched before the failure.
            self._shutdown_browser()
            self._session_id = None
            self.audit.log(self.agent_id, "start_session_error", {"error": str(e)})
            raise BrowserError(f"Failed to start session: {e}") from e

    def _start_playwright_session(self) -> Dict[str, Any]:
        """Launch Chromium, open a context and page, and return session info."""
        if not PLAYWRIGHT_AVAILABLE:
            raise BrowserError("Playwright not available. Run: pip install playwright")

        # Start playwright
        self._playwright = sync_playwright().start()

        # Launch args
        launch_args = [
            "--disable-dev-shm-usage",
            "--no-sandbox",
        ]
        if self.config.stealth:
            launch_args.append("--disable-blink-features=AutomationControlled")

        # Launch browser
        self._browser = self._playwright.chromium.launch(
            headless=self.config.headless,
            args=launch_args
        )

        # Context options
        context_opts: Dict[str, Any] = {
            "viewport": {"width": 1920, "height": 1080},
            "user_agent": random.choice(self.config.user_agents),
            "locale": "en-US",
        }
        if self.config.proxy:
            context_opts["proxy"] = {"server": self.config.proxy}

        self._context = self._browser.new_context(**context_opts)

        # Inject stealth scripts only when stealth mode is requested
        if self.config.stealth:
            self._context.add_init_script("""
                Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                window.navigator.chrome = {runtime: {}};
            """)

        self._page = self._context.new_page()

        session_info = {
            "session_id": self._session_id,
            "provider": "playwright",
            "headless": self.config.headless,
            "stealth": self.config.stealth,
            "proxy": self.config.proxy,
            "started_at": datetime.utcnow().isoformat()
        }

        self._save_context()
        self.audit.log(self.agent_id, "start_session", session_info)
        return session_info

    def _save_context(self) -> None:
        """Persist cookies and web storage of the current page (best effort)."""
        if not self._page or not self._context or not self._session_id:
            return
        try:
            cookies = self._context.cookies()
            local_storage = self._page.evaluate("() => JSON.stringify(localStorage)")
            session_storage = self._page.evaluate("() => JSON.stringify(sessionStorage)")
            # The context is stored under the agent id, one slot per agent.
            self.context_store.save(self.agent_id, self.agent_id, {
                "session_id": self._session_id,
                "cookies": cookies,
                "local_storage": json.loads(local_storage) if local_storage else {},
                "session_storage": json.loads(session_storage) if session_storage else {},
                "saved_at": datetime.utcnow().isoformat()
            })
        except Exception as e:
            logger.warning(f"Failed to save context: {e}")

    def restore_session(self, agent_id: str) -> Dict[str, Any]:
        """Restore a previous browser session.

        Starts a fresh session for ``agent_id`` and re-adds the saved cookies.

        Raises:
            BrowserError: if no saved context exists for ``agent_id``.
        """
        context = self.context_store.load(agent_id, agent_id)
        if not context:
            raise BrowserError(f"No saved session found for agent: {agent_id}")

        if self._page is not None:
            # Close under the current agent id, before it is switched below.
            self.close_session()

        self.agent_id = agent_id
        session = self.start_session(restore_existing=False)

        if context.get("cookies"):
            try:
                self._context.add_cookies(context["cookies"])
            except Exception as e:
                logger.warning(f"Failed to restore cookies: {e}")

        self.audit.log(self.agent_id, "restore_session", {
            "session_id": session["session_id"],
            "restored_from": context.get("saved_at")
        })
        return session

    def _shutdown_browser(self) -> None:
        """Close page, context, browser and Playwright, each independently."""
        teardown = (
            ("page", self._page, "close"),
            ("context", self._context, "close"),
            ("browser", self._browser, "close"),
            ("playwright", self._playwright, "stop"),
        )
        for label, resource, method in teardown:
            if resource is None:
                continue
            try:
                getattr(resource, method)()
            except Exception as e:
                # Keep going so one failure does not leak the remaining objects.
                logger.warning("Error closing browser (%s): %s", label, e)

        self._page = None
        self._context = None
        self._browser = None
        self._playwright = None

    def close_session(self) -> Dict[str, Any]:
        """Close current browser session.

        Returns ``{"status": "no_session"}`` when nothing is open, otherwise a
        dict with status, session_id and closed_at.
        """
        if not self._session_id:
            return {"status": "no_session"}

        self._save_context()
        self._shutdown_browser()

        result = {
            "status": "closed",
            "session_id": self._session_id,
            "closed_at": datetime.utcnow().isoformat()
        }
        # Clear the id so a repeated call reports "no_session".
        self._session_id = None

        self.audit.log(self.agent_id, "close_session", result)
        return result

    # ========================================================================
    # NAVIGATION
    # ========================================================================

    def goto(self, url: str) -> Dict[str, Any]:
        """Navigate to a URL.

        The browser receives ``url`` unchanged; only the audit record uses the
        redacted form.

        Raises:
            RateLimitError: if the rate limiter rejects the call.
            BrowserError: if no session is active or navigation fails.
        """
        self._check_rate_limit()
        if not self._page:
            raise BrowserError("No active session. Call start_session() first.")

        safe_url = PIIGuard.redact_url(url)
        try:
            self._page.goto(url, wait_until="domcontentloaded", timeout=30000)
            result = {
                "url": self._page.url,
                "status": "loaded",
                "title": self._page.title()
            }
            self.audit.log(self.agent_id, "goto", {"url": safe_url, "status": result["status"]})
            return result
        except Exception as e:
            raise BrowserError(f"Navigation failed: {e}") from e

    def get_current_url(self) -> str:
        """Return the page URL, or "" when no session is active."""
        if not self._page:
            return ""
        return self._page.url

    def get_page_text(self) -> str:
        """Return ``document.body.innerText``, or "" when no session is active."""
        if not self._page:
            return ""
        return self._page.evaluate("() => document.body.innerText")

    def get_html(self) -> str:
        """Return the page HTML, or "" when no session is active."""
        if not self._page:
            return ""
        return self._page.content()

    # ========================================================================
    # ACTIONS
    # ========================================================================

    def act(self, instruction: str) -> Dict[str, Any]:
        """Execute a natural language action.

        Recognizes "click ...", "type '<text>' into <field>", "scroll" and
        "wait". The instruction is matched case-insensitively but applied with
        its original casing; only the audit record is redacted.

        Returns:
            ``{"success": bool, "actions": [...], "provider": "playwright"}``.

        Raises:
            RateLimitError: if the rate limiter rejects the call.
            BrowserError: if no session is active or an action fails
                unexpectedly.
        """
        self._check_rate_limit()
        if not self._page:
            raise BrowserError("No active session")

        safe_instruction = PIIGuard.redact(instruction)
        result = {"success": False, "actions": [], "provider": "playwright"}

        try:
            inst_lower = instruction.lower()

            # Click: try a text match first, then treat the target as a selector.
            click_match = re.search(
                r'click (?:on )?(?:the )?(.+?)(?:\s|$)', instruction, re.IGNORECASE
            )
            if click_match:
                target = click_match.group(1).strip()
                for selector in (f"text={target}", target):
                    try:
                        self._page.click(selector, timeout=5000)
                    except Exception as e:
                        logger.debug("Click via %r failed: %s", selector, e)
                        continue
                    result["success"] = True
                    result["actions"].append(f"clicked: {target}")
                    break

            # Type
            type_match = re.search(
                r'type ["\'](.+?)["\'] (?:into |in )(?:the )?(.+?)(?:\s|$)',
                instruction,
                re.IGNORECASE,
            )
            if type_match:
                text, field_name = type_match.groups()
                try:
                    self._page.fill(
                        f"input[name='{field_name}'], textarea[name='{field_name}'], #{field_name}",
                        text,
                    )
                    result["success"] = True
                    result["actions"].append(f"typed into: {field_name}")
                except Exception as e:
                    logger.debug("Typing into %r failed: %s", field_name, e)

            # Scroll
            if "scroll" in inst_lower:
                direction = "down" if "down" in inst_lower else "up"
                self._page.evaluate(
                    f"() => window.scrollBy(0, {500 if direction == 'down' else -500})"
                )
                result["success"] = True
                result["actions"].append(f"scrolled {direction}")

            # Wait: Playwright's own timer rather than time.sleep().
            if "wait" in inst_lower:
                self._page.wait_for_timeout(2000)
                result["success"] = True
                result["actions"].append("waited")

            self.audit.log(self.agent_id, "act", {"instruction": safe_instruction})
            return result

        except Exception as e:
            self.audit.log(
                self.agent_id, "act_error",
                {"instruction": safe_instruction, "error": str(e)},
            )
            raise BrowserError(f"Act failed: {e}") from e

    def extract(self, instruction: str, schema: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Extract structured data from page.

        Keywords in the instruction select what is extracted (emails, prices,
        links, buttons), otherwise the first 5000 characters of page text.
        ``schema`` is accepted but not used.

        Returns:
            ``{"data": [...], "provider": "playwright", "instruction": <redacted>,
            "count": int}`` with at most 20 items per category.

        Raises:
            RateLimitError: if the rate limiter rejects the call.
            BrowserError: if no session is active or extraction fails.
        """
        self._check_rate_limit()
        if not self._page:
            raise BrowserError("No active session")

        # Match keywords on the raw text: the "[EMAIL]" redaction tag would
        # otherwise trigger the email branch by itself.
        inst_lower = instruction.lower()
        safe_instruction = PIIGuard.redact(instruction)

        try:
            html = self._page.content()
            data = []

            if "email" in inst_lower:
                emails = re.findall(r'[\w.-]+@[\w.-]+\.\w+', html)
                # dict.fromkeys de-duplicates while keeping page order.
                data = [{"email": e} for e in list(dict.fromkeys(emails))[:20]]

            elif "price" in inst_lower or "cost" in inst_lower:
                prices = re.findall(r'[\$£€]?\d+[.,]?\d*', html)
                data = [{"price": p} for p in list(dict.fromkeys(prices))[:20]]

            elif "link" in inst_lower or "url" in inst_lower:
                for link in self._page.query_selector_all("a[href]")[:20]:
                    href = link.get_attribute("href")
                    if href:
                        data.append({"text": link.inner_text().strip(), "href": href})

            elif "button" in inst_lower:
                buttons = self._page.query_selector_all("button, input[type='submit']")
                data = [
                    {"text": b.inner_text(), "type": b.evaluate("e => e.tagName")}
                    for b in buttons[:20]
                ]

            else:
                text = self._page.evaluate("() => document.body.innerText")
                data = [{"text": text[:5000]}]

            result = {
                "data": data,
                "provider": "playwright",
                "instruction": safe_instruction,
                "count": len(data)
            }
            self.audit.log(
                self.agent_id, "extract",
                {"instruction": safe_instruction, "count": result["count"]},
            )
            return result

        except Exception as e:
            self.audit.log(self.agent_id, "extract_error", {"error": str(e)})
            raise BrowserError(f"Extract failed: {e}") from e

    def observe(self, instruction: Optional[str] = None) -> List[Dict[str, Any]]:
        """Observe available actions on page.

        Returns up to 30 clickable elements and up to 20 form inputs as action
        descriptions. ``instruction`` is only recorded in the audit log.

        Raises:
            RateLimitError: if the rate limiter rejects the call.
            BrowserError: if no session is active or the page query fails.
        """
        self._check_rate_limit()
        if not self._page:
            raise BrowserError("No active session")

        try:
            actions = []

            # Clickable elements
            clickables = self._page.query_selector_all("a, button, [role='button']")
            for el in clickables[:30]:
                try:
                    text = el.inner_text()
                    tag = el.evaluate("e => e.tagName")
                except Exception:
                    continue  # element could not be read; skip it
                if text and text.strip():
                    actions.append({
                        "action": f"Click {text.strip()[:50]}",
                        "element": tag.lower(),
                        "text": text.strip()[:100]
                    })

            # Input fields
            inputs = self._page.query_selector_all("input, textarea, select")
            for el in inputs[:20]:
                try:
                    name = el.get_attribute("name")
                    placeholder = el.get_attribute("placeholder")
                except Exception:
                    continue
                if name or placeholder:
                    actions.append({
                        "action": f"Fill {name or placeholder}",
                        "element": "input",
                        "field": name,
                        "placeholder": placeholder
                    })

            self.audit.log(
                self.agent_id, "observe",
                {"instruction": instruction, "actions_count": len(actions)},
            )
            return actions

        except Exception as e:
            raise BrowserError(f"Observe failed: {e}") from e

    # ========================================================================
    # SCREENSHOTS & FORMS
    # ========================================================================

    def screenshot(self, save_path: Optional[str] = None) -> Union[str, bytes]:
        """Take a screenshot.

        Returns the path written. Without ``save_path`` the file goes to
        ``temp_dir`` and is named from the hashed agent id and a UTC timestamp.

        Raises:
            BrowserError: if no session is active or the capture fails.
        """
        if not self._page:
            raise BrowserError("No active session")

        if not save_path:
            timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            # Hashed id: keeps the raw agent id (and any path separators in
            # it) out of the filename.
            save_path = str(self.temp_dir / f"screenshot_{self._agent_hash()}_{timestamp}.png")

        try:
            self._page.screenshot(path=save_path, full_page=False)
            self.audit.log(self.agent_id, "screenshot", {"path": save_path})
            return save_path
        except Exception as e:
            raise BrowserError(f"Screenshot failed: {e}") from e

    def fill_form(self, fields: List[Dict[str, str]]) -> Dict[str, Any]:
        """Fill form fields.

        Each entry may carry ``name``, ``value``, ``type`` (text, email,
        password, tel, checkbox, select; default text) and, for checkboxes,
        ``checked``. Values are sent to the page unmodified.

        Returns:
            ``{"filled": [...]}`` with one status entry per field; failures and
            unsupported types are reported with status "error".

        Raises:
            RateLimitError: if the rate limiter rejects the call.
            BrowserError: if no session is active.
        """
        self._check_rate_limit()
        if not self._page:
            raise BrowserError("No active session")

        results = []
        for entry in fields:
            field_name = entry.get("name", "")
            value = str(entry.get("value", ""))
            field_type = entry.get("type", "text")

            try:
                if field_type in ("text", "email", "password", "tel"):
                    selector = f"input[name='{field_name}'], input[id='{field_name}'], #{field_name}"
                    self._page.fill(selector, value)
                elif field_type == "checkbox":
                    selector = f"input[name='{field_name}']"
                    if entry.get("checked", True):
                        self._page.check(selector)
                    else:
                        self._page.uncheck(selector)
                elif field_type == "select":
                    selector = f"select[name='{field_name}']"
                    self._page.select_option(selector, value)
                else:
                    # Nothing was done, so do not report the field as filled.
                    results.append({
                        "field": field_name,
                        "status": "error",
                        "error": f"unsupported field type: {field_type}",
                    })
                    continue

                results.append({"field": field_name, "status": "filled"})
            except Exception as e:
                results.append({"field": field_name, "status": "error", "error": str(e)})

        self.audit.log(self.agent_id, "fill_form", {"fields": len(fields), "results": results})
        return {"filled": results}

    def wait_for(self, selector_or_text: str, timeout: int = 10) -> bool:
        """Wait for element or text.

        Tries the argument as a selector, then as visible text; each attempt
        waits up to ``timeout`` seconds. Returns True as soon as one matches.

        Raises:
            BrowserError: if no session is active.
        """
        if not self._page:
            raise BrowserError("No active session")

        timeout_ms = timeout * 1000
        for candidate in (selector_or_text, f"text={selector_or_text}"):
            try:
                self._page.wait_for_selector(candidate, timeout=timeout_ms)
                return True
            except Exception:
                continue
        return False


# ============================================================================
# ERROR CLASSES
# ============================================================================

class BrowserError(Exception):
    """Base error for browser tool failures."""


class RateLimitError(BrowserError):
    """Raised when an agent exceeds the request rate limit."""


# ============================================================================
# REGISTRATION FOR OCTOTOOLS
# ============================================================================

def register_tools() -> Dict[str, Any]:
    """Return the tool registry entry describing BrowserTool and its methods."""
    return {
        "browser": {
            "class": BrowserTool,
            "description": "Browser automation for AI agents - fully self-hosted, privacy-by-default",
            "methods": [
                "start_session", "restore_session", "close_session",
                "goto", "act", "extract", "observe",
                "screenshot", "fill_form", "wait_for",
                "get_current_url", "get_page_text", "get_html"
            ]
        }
    }