Files
microdao-daarion/tools/browser_tool/browser_tool.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

722 lines
26 KiB
Python

"""
BrowserTool - Production-ready browser automation for AI agents
Fully self-hosted, open-source, privacy-by-default
PRIMARY: Playwright (sync API)
"""
import os
import re
import json
import logging
import hashlib
import time
import random
from pathlib import Path
from typing import Optional, List, Dict, Any, Union
from datetime import datetime
from dataclasses import dataclass, field
from threading import Lock
from collections import defaultdict
# Security & Encryption
try:
from cryptography.fernet import Fernet
CRYPTO_AVAILABLE = True
except ImportError:
CRYPTO_AVAILABLE = False
# Playwright (sync)
try:
from playwright.sync_api import sync_playwright, Browser, Page, BrowserContext
PLAYWRIGHT_AVAILABLE = True
except ImportError:
PLAYWRIGHT_AVAILABLE = False
logger = logging.getLogger(__name__)
# ============================================================================
# CONFIGURATION
# ============================================================================
def _default_user_agents() -> List[str]:
    """Build a fresh list of the stock desktop user-agent strings."""
    return [
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0 Safari/537.36",
    ]


@dataclass
class BrowserConfig:
    """Settings consumed by the browser tool and its helper classes."""

    # Key handed to Fernet by the context store; None means "no key configured".
    encryption_key: Optional[bytes] = None
    # Directory holding persisted browser contexts.
    context_dir: str = "/tmp/browser_contexts"
    memory_dir: str = "/tmp/agent_memory"
    # Browser launch / disguise switches.
    headless: bool = True
    stealth: bool = True
    # Proxy server URL, or None for a direct connection.
    proxy: Optional[str] = None
    rate_limit_per_minute: int = 30
    redact_pii_in_logs: bool = True
    # One entry is picked at random for each new browser context.
    user_agents: List[str] = field(default_factory=_default_user_agents)
class RateLimiter:
    """Thread-safe sliding-window request limiter keyed by agent id.

    Each agent may perform at most ``max_per_minute`` requests within any
    60-second window. Timestamps come from ``time.time()``.
    """

    def __init__(self, max_per_minute: int = 30):
        self.max_per_minute = max_per_minute
        # agent id -> timestamps of the requests accepted in the current window
        self.requests: Dict[str, List[float]] = defaultdict(list)
        self.lock = Lock()

    def _recent(self, agent_id: str, now: float) -> List[float]:
        """Return the agent's timestamps younger than 60 s (caller holds the lock).

        Uses ``.get`` so that merely asking about an unknown agent does not
        insert an empty entry into the ``defaultdict``.
        """
        return [t for t in self.requests.get(agent_id, ()) if now - t < 60]

    def is_allowed(self, agent_id: str) -> bool:
        """Record a request for ``agent_id`` and report whether it fits the quota.

        Returns:
            True if the request was accepted (and counted), False if the agent
            has already used its quota for the current window.
        """
        with self.lock:
            now = time.time()
            recent = self._recent(agent_id, now)
            allowed = len(recent) < self.max_per_minute
            if allowed:
                recent.append(now)
            if recent:
                self.requests[agent_id] = recent
            else:
                # Nothing left in the window: drop the key instead of keeping
                # an empty list around forever.
                self.requests.pop(agent_id, None)
            return allowed

    def wait_time(self, agent_id: str) -> float:
        """Return the seconds until the agent's oldest in-window request expires.

        Returns 0.0 when the agent has no request inside the window. This is a
        pure read: it never creates an entry for an unseen agent.
        """
        with self.lock:
            now = time.time()
            recent = self._recent(agent_id, now)
            if not recent:
                return 0.0
            return float(max(0.0, 60 - (now - min(recent))))


# Process-wide limiter shared by every tool instance (default quota: 30/min).
rate_limiter = RateLimiter()
# ============================================================================
# PII & SECURITY
# ============================================================================
class PIIGuard:
    """Regex-based redaction of e-mail addresses, card numbers and secrets."""

    # (pattern, replacement) pairs, applied in order by ``redact``.
    PATTERNS = [
        # The TLD class is [A-Za-z]; a '|' inside a character class would be
        # matched literally, so it must not appear there.
        (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]'),
        (r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CARD]'),
        (r'(?i)(password|passwd|pwd|secret)[=:\s]+\S+', '[SECRET]'),
    ]

    # Query-parameter names whose presence triggers URL redaction.
    SENSITIVE_QUERY_KEYS = frozenset({'token', 'key', 'password', 'secret'})

    @classmethod
    def redact(cls, text: str) -> str:
        """Return ``text`` with every ``PATTERNS`` match replaced by its tag.

        Falsy input (``""`` or ``None``) is returned unchanged.
        """
        if not text:
            return text
        for pattern, replacement in cls.PATTERNS:
            text = re.sub(pattern, replacement, text)
        return text

    @classmethod
    def _redact_value(cls, value: Any) -> Any:
        """Redact one value, recursing into dicts and lists.

        Strings are redacted; dicts and lists are rebuilt with redacted
        contents; every other type is returned untouched so numbers, booleans
        and ``None`` keep their type.
        """
        if isinstance(value, str):
            return cls.redact(value)
        if isinstance(value, dict):
            return cls.redact_dict(value)
        if isinstance(value, list):
            return [cls._redact_value(item) for item in value]
        return value

    @classmethod
    def redact_dict(cls, data: Dict) -> Dict:
        """Return a copy of ``data`` with all nested string values redacted."""
        return {key: cls._redact_value(value) for key, value in data.items()}

    @classmethod
    def redact_url(cls, url: str) -> str:
        """Hide query-string values of URLs that carry a sensitive parameter.

        If any parameter name is one of ``SENSITIVE_QUERY_KEYS`` (compared
        case-insensitively), every parameter value is replaced with
        ``[REDACTED]``. URLs without such a parameter, empty URLs and URLs
        that cannot be parsed are returned unchanged.
        """
        if not url:
            return url
        from urllib.parse import parse_qsl, quote, urlparse
        try:
            parsed = urlparse(url)
            names = [name for name, _ in parse_qsl(parsed.query, keep_blank_values=True)]
        except (AttributeError, TypeError, ValueError):
            # Best effort: an unparseable URL is passed through as-is.
            return url
        if not any(name.lower() in cls.SENSITIVE_QUERY_KEYS for name in names):
            return url
        # dict.fromkeys de-duplicates repeated names while keeping their order.
        redacted_query = '&'.join(
            f"{quote(name, safe='')}=[REDACTED]" for name in dict.fromkeys(names)
        )
        return parsed._replace(query=redacted_query).geturl()
class AuditLogger:
    """Append-only JSON-lines audit trail, one file per UTC day."""

    def __init__(self, log_dir: str = "/tmp/browser_tool_logs"):
        """Create ``log_dir`` (and any missing parents) if it does not exist."""
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)

    def log(self, agent_id: str, operation: str, details: Dict[str, Any]) -> None:
        """Append one audit entry to ``audit_YYYYMMDD.jsonl``.

        Args:
            agent_id: Agent identifier; only the first 16 hex characters of
                its SHA-256 digest are written.
            operation: Short name of the audited operation.
            details: Extra fields; passed through ``PIIGuard.redact_dict``
                before being written.
        """
        # Take the clock once so the entry timestamp and the file name can
        # never fall on different days.
        now = datetime.utcnow()
        entry = {
            "timestamp": now.isoformat(),
            "agent_id": hashlib.sha256(agent_id.encode()).hexdigest()[:16],
            "operation": operation,
            "details": PIIGuard.redact_dict(details),
        }
        log_file = self.log_dir / f"audit_{now.strftime('%Y%m%d')}.jsonl"
        with open(log_file, "a", encoding="utf-8") as f:
            # default=str is deliberate: a value json cannot encode is written
            # as its string form instead of aborting the audited operation.
            f.write(json.dumps(entry, default=str) + "\n")
# ============================================================================
# SECURE CONTEXT STORE
# ============================================================================
class SecureContextStore:
    """File-backed store for browser contexts, Fernet-encrypted when possible."""

    # Anything outside this set is replaced before a session id is used in a
    # file name, so ids containing '/' or '..' segments cannot leave context_dir.
    _UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9_.-]")

    def __init__(self, config: BrowserConfig):
        """Prepare the storage directory and choose the encryption mode.

        Raises:
            RuntimeError: if ``config.encryption_key`` is set but the
                ``cryptography`` package could not be imported.
        """
        self.config = config
        self.context_dir = Path(config.context_dir)
        self.context_dir.mkdir(parents=True, exist_ok=True)
        if config.encryption_key:
            if not CRYPTO_AVAILABLE:
                # Fail loudly rather than with a NameError on Fernet, and
                # never silently downgrade a configured key to plaintext.
                raise RuntimeError(
                    "An encryption key is configured but the 'cryptography' "
                    "package is not installed"
                )
            self.fernet = Fernet(config.encryption_key)
        elif CRYPTO_AVAILABLE:
            # The generated key lives only in this object, so files written
            # with it cannot be decrypted by another store instance.
            logger.warning(
                "No encryption key configured; using an ephemeral key "
                "(saved contexts will not be readable by other instances)"
            )
            self.fernet = Fernet(Fernet.generate_key())
        else:
            logger.warning(
                "'cryptography' is not installed; browser contexts will be stored unencrypted"
            )
            self.fernet = None

    def _path(self, agent_id: str, session_id: str) -> Path:
        """Return the file path for a context.

        The agent id is represented by the first 16 hex characters of its
        SHA-256 digest. Session ids made only of ``[A-Za-z0-9_.-]`` are used
        verbatim; any other character is replaced by ``_``.
        """
        agent_hash = hashlib.sha256(agent_id.encode()).hexdigest()[:16]
        safe_session = self._UNSAFE_FILENAME_CHARS.sub("_", session_id)
        return self.context_dir / f"context_{agent_hash}_{safe_session}.enc"

    def save(self, agent_id: str, session_id: str, data: Dict) -> None:
        """Serialise ``data`` as JSON and write it, encrypted if a key exists."""
        payload = json.dumps(data).encode()
        if self.fernet:
            payload = self.fernet.encrypt(payload)
        self._path(agent_id, session_id).write_bytes(payload)

    def load(self, agent_id: str, session_id: str) -> Optional[Dict]:
        """Return the stored context, or None if it is missing or unreadable."""
        file_path = self._path(agent_id, session_id)
        if not file_path.exists():
            return None
        try:
            raw = file_path.read_bytes()
            return json.loads(self.fernet.decrypt(raw) if self.fernet else raw)
        except Exception as e:
            # Best effort by design: a wrong key, a corrupt file or an I/O
            # error all mean "nothing to restore". Only the error type is
            # logged so no stored content can leak into the log.
            logger.warning(f"Failed to load stored context: {type(e).__name__}")
            return None

    def delete(self, agent_id: str, session_id: str) -> None:
        """Remove the stored context; a missing file is not an error."""
        try:
            self._path(agent_id, session_id).unlink()
        except FileNotFoundError:
            pass
# ============================================================================
# MAIN BROWSER TOOL
# ============================================================================
class BrowserTool:
    """Production-ready browser automation for AI agents.

    Wraps one Playwright (sync API) Chromium page per instance. Operations are
    rate limited through the module-level ``rate_limiter`` and recorded through
    ``AuditLogger``. Redaction is applied only to what is logged or echoed
    back, never to the URLs, instructions or form values sent to the page.
    """

    def __init__(
        self,
        agent_id: str,
        config: Optional[BrowserConfig] = None,
        headless: bool = True,
        proxy: Optional[str] = None,
        stealth: bool = True
    ):
        """Create the tool without starting a browser.

        Args:
            agent_id: Identifier of the agent using this tool.
            config: Configuration object; built from environment variables
                when omitted.
            headless, proxy, stealth: Written onto ``config`` unconditionally.
        """
        self.agent_id = agent_id
        self.config = config or self._load_config()
        # NOTE: these three arguments always overwrite the values carried by
        # ``config``, including their defaults, and mutate the object passed in.
        self.config.headless = headless
        self.config.proxy = proxy
        self.config.stealth = stealth
        self.context_store = SecureContextStore(self.config)
        self.audit = AuditLogger()
        self._playwright = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None
        self._page: Optional[Page] = None
        self._session_id: Optional[str] = None
        self._lock = Lock()
        self.temp_dir = Path(self.config.context_dir) / "screenshots"
        self.temp_dir.mkdir(parents=True, exist_ok=True)

    def _load_config(self) -> BrowserConfig:
        """Build a configuration from the ``BROWSER_*`` environment variables."""
        return BrowserConfig(
            # An unset or empty variable encodes to b"", which becomes None.
            encryption_key=os.getenv("BROWSER_ENCRYPTION_KEY", "").encode() or None,
            context_dir=os.getenv("BROWSER_CONTEXT_DIR", "/tmp/browser_contexts"),
            rate_limit_per_minute=int(os.getenv("BROWSER_RATE_LIMIT", "30")),
        )

    @property
    def is_connected(self) -> bool:
        """True while a page is open."""
        return self._page is not None

    def _check_rate_limit(self) -> None:
        """Count one request for this agent.

        NOTE: the quota comes from the module-level ``rate_limiter``;
        ``config.rate_limit_per_minute`` is not consulted here.

        Raises:
            RateLimitError: if the agent has exhausted its quota.
        """
        if not rate_limiter.is_allowed(self.agent_id):
            wait = rate_limiter.wait_time(self.agent_id)
            raise RateLimitError(f"Rate limit exceeded. Wait {wait:.1f}s")

    # ========================================================================
    # SESSION MANAGEMENT
    # ========================================================================
    def start_session(
        self,
        headless: Optional[bool] = None,
        proxy: Optional[str] = None,
        stealth: Optional[bool] = None,
        restore_existing: bool = True
    ) -> Dict[str, Any]:
        """Start a new browser session.

        A session that is still open on this instance is closed first so its
        browser process is not orphaned.

        Args:
            headless, proxy, stealth: Override the configured value when not None.
            restore_existing: When True, look up a stored context for this
                agent and log that one exists (nothing is restored here; see
                ``restore_session``).

        Returns:
            Session info: id, provider, headless/stealth/proxy, start time.

        Raises:
            RateLimitError: if the agent has exhausted its quota.
            BrowserError: if the browser could not be started.
        """
        self._check_rate_limit()
        if self._playwright is not None or self._page is not None:
            if self._session_id:
                self.close_session()
            else:
                self._teardown()
        self._session_id = hashlib.sha256(f"{self.agent_id}_{time.time()}".encode()).hexdigest()[:16]
        if headless is not None:
            self.config.headless = headless
        if proxy is not None:
            self.config.proxy = proxy
        if stealth is not None:
            self.config.stealth = stealth
        if restore_existing:
            # Contexts are stored with the agent id used as the session key.
            existing = self.context_store.load(self.agent_id, self.agent_id)
            if existing:
                logger.info(f"Found existing session for agent: {self.agent_id}")
        try:
            return self._start_playwright_session()
        except Exception as e:
            # Release whatever was started before the failure.
            self._teardown()
            self._session_id = None
            self.audit.log(self.agent_id, "start_session_error", {"error": str(e)})
            raise BrowserError(f"Failed to start session: {e}") from e

    def _start_playwright_session(self) -> Dict[str, Any]:
        """Launch Chromium, open a context and a page, and persist the context.

        Raises:
            BrowserError: if Playwright could not be imported.
        """
        if not PLAYWRIGHT_AVAILABLE:
            raise BrowserError("Playwright not available. Run: pip install playwright")
        # Start playwright
        self._playwright = sync_playwright().start()
        # Launch args
        launch_args = [
            "--disable-dev-shm-usage",
            "--no-sandbox",
        ]
        if self.config.stealth:
            launch_args.insert(0, "--disable-blink-features=AutomationControlled")
        # Launch browser
        self._browser = self._playwright.chromium.launch(
            headless=self.config.headless,
            args=launch_args
        )
        # Context options
        context_opts: Dict[str, Any] = {
            "viewport": {"width": 1920, "height": 1080},
            "user_agent": random.choice(self.config.user_agents),
            "locale": "en-US",
        }
        if self.config.proxy:
            context_opts["proxy"] = {"server": self.config.proxy}
        self._context = self._browser.new_context(**context_opts)
        if self.config.stealth:
            # Inject stealth scripts (only when stealth mode is requested)
            self._context.add_init_script("""
                Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                window.navigator.chrome = {runtime: {}};
            """)
        self._page = self._context.new_page()
        session_info = {
            "session_id": self._session_id,
            "provider": "playwright",
            "headless": self.config.headless,
            "stealth": self.config.stealth,
            "proxy": self.config.proxy,
            "started_at": datetime.utcnow().isoformat()
        }
        self._save_context()
        self.audit.log(self.agent_id, "start_session", session_info)
        return session_info

    def _save_context(self) -> None:
        """Persist cookies and web storage of the current page (best effort)."""
        if not self._page or not self._context or not self._session_id:
            return
        try:
            cookies = self._context.cookies()
            local_storage = self._page.evaluate("() => JSON.stringify(localStorage)")
            session_storage = self._page.evaluate("() => JSON.stringify(sessionStorage)")
            # Stored under (agent_id, agent_id): one context per agent.
            self.context_store.save(self.agent_id, self.agent_id, {
                "session_id": self._session_id,
                "cookies": cookies,
                "local_storage": json.loads(local_storage) if local_storage else {},
                "session_storage": json.loads(session_storage) if session_storage else {},
                "saved_at": datetime.utcnow().isoformat()
            })
        except Exception as e:
            logger.warning(f"Failed to save context: {e}")

    def restore_session(self, agent_id: str) -> Dict[str, Any]:
        """Start a session for ``agent_id`` and re-apply its stored cookies.

        Only cookies are restored; stored web storage is not re-applied.

        Raises:
            BrowserError: if no stored context exists or the browser fails to start.
        """
        context = self.context_store.load(agent_id, agent_id)
        if not context:
            raise BrowserError(f"No saved session found for agent: {agent_id}")
        if self._session_id and (self._page is not None or self._playwright is not None):
            # Close under the current agent id first, so the running session
            # is saved to its own slot and not to the one being restored.
            self.close_session()
        self.agent_id = agent_id
        session = self.start_session(restore_existing=False)
        if context.get("cookies"):
            try:
                self._context.add_cookies(context["cookies"])
            except Exception as e:
                logger.warning(f"Failed to restore cookies: {e}")
        self.audit.log(self.agent_id, "restore_session", {
            "session_id": session["session_id"],
            "restored_from": context.get("saved_at")
        })
        return session

    def _teardown(self) -> None:
        """Close page, context, browser and driver, each independently.

        A failure while closing one resource is logged and does not prevent
        the remaining ones from being closed.
        """
        steps = (
            (self._page, "close"),
            (self._context, "close"),
            (self._browser, "close"),
            (self._playwright, "stop"),
        )
        for resource, method in steps:
            if resource is None:
                continue
            try:
                getattr(resource, method)()
            except Exception as e:
                logger.warning(f"Error closing browser: {e}")
        self._page = None
        self._context = None
        self._browser = None
        self._playwright = None

    def close_session(self) -> Dict[str, Any]:
        """Save the context, close the browser and forget the session.

        Returns:
            ``{"status": "no_session"}`` when nothing is open, otherwise the
            status, session id and closing time.
        """
        if not self._session_id:
            return {"status": "no_session"}
        self._save_context()
        self._teardown()
        result = {
            "status": "closed",
            "session_id": self._session_id,
            "closed_at": datetime.utcnow().isoformat()
        }
        # Clear the id so a second call reports "no_session".
        self._session_id = None
        self.audit.log(self.agent_id, "close_session", result)
        return result

    # ========================================================================
    # NAVIGATION
    # ========================================================================
    def goto(self, url: str) -> Dict[str, Any]:
        """Navigate to ``url`` and wait for DOMContentLoaded (30 s timeout).

        The browser receives the URL exactly as given; only the audit entry
        uses the redacted form.

        Raises:
            RateLimitError: if the agent has exhausted its quota.
            BrowserError: if no session is open or navigation fails.
        """
        self._check_rate_limit()
        if not self._page:
            raise BrowserError("No active session. Call start_session() first.")
        safe_url = PIIGuard.redact_url(url)
        try:
            self._page.goto(url, wait_until="domcontentloaded", timeout=30000)
            result = {
                "url": self._page.url,
                "status": "loaded",
                "title": self._page.title()
            }
            self.audit.log(self.agent_id, "goto", {"url": safe_url, "status": result["status"]})
            return result
        except Exception as e:
            raise BrowserError(f"Navigation failed: {e}") from e

    def get_current_url(self) -> str:
        """Return the page URL, or "" when no session is open."""
        if not self._page:
            return ""
        return self._page.url

    def get_page_text(self) -> str:
        """Return ``document.body.innerText``, or "" when no session is open."""
        if not self._page:
            return ""
        return self._page.evaluate("() => document.body.innerText")

    def get_html(self) -> str:
        """Return the page HTML, or "" when no session is open."""
        if not self._page:
            return ""
        return self._page.content()

    # ========================================================================
    # ACTIONS
    # ========================================================================
    def act(self, instruction: str) -> Dict[str, Any]:
        """Execute a simple natural-language action.

        Recognised forms (case-insensitive keywords): ``click [on] [the] X``,
        ``type "text" into|in [the] FIELD``, anything containing ``scroll``
        (down if it also contains ``down``, otherwise up) and anything
        containing ``wait`` (sleeps 2 s). Several may appear in one instruction.

        The instruction is parsed as given, so typed text keeps its case and
        content; only the audit entry is redacted.

        Known limitation: click targets and field names end at the first
        whitespace character.

        Returns:
            ``{"success": bool, "actions": [...], "provider": "playwright"}``.

        Raises:
            RateLimitError: if the agent has exhausted its quota.
            BrowserError: if no session is open or the action fails.
        """
        self._check_rate_limit()
        if not self._page:
            raise BrowserError("No active session")
        safe_instruction = PIIGuard.redact(instruction)
        result = {"success": False, "actions": [], "provider": "playwright"}
        try:
            inst_lower = instruction.lower()
            # Click: try a text selector first, then the raw token as a selector.
            click_match = re.search(
                r'click (?:on )?(?:the )?(.+?)(?:\s|$)', instruction, re.IGNORECASE
            )
            if click_match:
                target = click_match.group(1).strip()
                for selector in (f"text={target}", target):
                    try:
                        self._page.click(selector, timeout=5000)
                    except Exception as e:
                        logger.debug(f"Click attempt failed: {type(e).__name__}")
                        continue
                    result["success"] = True
                    result["actions"].append(f"clicked: {target}")
                    break
            # Type
            type_match = re.search(
                r'type ["\'](.+?)["\'] (?:into |in )(?:the )?(.+?)(?:\s|$)',
                instruction,
                re.IGNORECASE,
            )
            if type_match:
                text, field_name = type_match.groups()
                try:
                    self._page.fill(
                        f"input[name='{field_name}'], textarea[name='{field_name}'], #{field_name}",
                        text,
                    )
                    result["success"] = True
                    result["actions"].append(f"typed into: {field_name}")
                except Exception as e:
                    # Best effort: a failed fill leaves "success" untouched.
                    logger.debug(f"Fill attempt failed: {type(e).__name__}")
            # Scroll
            if "scroll" in inst_lower:
                direction = "down" if "down" in inst_lower else "up"
                self._page.evaluate(f"() => window.scrollBy(0, {500 if direction == 'down' else -500})")
                result["success"] = True
                result["actions"].append(f"scrolled {direction}")
            # Wait
            if "wait" in inst_lower:
                time.sleep(2)
                result["success"] = True
                result["actions"].append("waited")
            self.audit.log(self.agent_id, "act", {"instruction": safe_instruction})
            return result
        except Exception as e:
            self.audit.log(self.agent_id, "act_error", {"instruction": safe_instruction, "error": str(e)})
            raise BrowserError(f"Act failed: {e}") from e

    def extract(self, instruction: str, schema: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Extract data from the page according to a keyword in ``instruction``.

        The first matching keyword decides what is collected (at most 20
        items): ``email``; ``price``/``cost``; ``link``/``url``; ``button``.
        Without a keyword the first 5000 characters of the body text are
        returned. ``schema`` is accepted but not used.

        Returns:
            ``{"data": [...], "provider": "playwright", "instruction": <redacted>,
            "count": int}``.

        Raises:
            RateLimitError: if the agent has exhausted its quota.
            BrowserError: if no session is open or extraction fails.
        """
        self._check_rate_limit()
        if not self._page:
            raise BrowserError("No active session")
        # Keywords are looked up in the instruction as given; the redacted
        # form is only echoed back and logged.
        inst_lower = instruction.lower()
        instruction = PIIGuard.redact(instruction)
        try:
            html = self._page.content()
            data = []
            if "email" in inst_lower:
                emails = re.findall(r'[\w.-]+@[\w.-]+\.\w+', html)
                # dict.fromkeys de-duplicates while keeping document order.
                data = [{"email": e} for e in list(dict.fromkeys(emails))[:20]]
            elif "price" in inst_lower or "cost" in inst_lower:
                prices = re.findall(r'[\$£€]?\d+[.,]?\d*', html)
                data = [{"price": p} for p in list(dict.fromkeys(prices))[:20]]
            elif "link" in inst_lower or "url" in inst_lower:
                links = self._page.query_selector_all("a[href]")
                data = [
                    {"text": link.inner_text().strip(), "href": link.get_attribute("href")}
                    for link in links[:20] if link.get_attribute("href")
                ]
            elif "button" in inst_lower:
                buttons = self._page.query_selector_all("button, input[type='submit']")
                data = [{"text": b.inner_text(), "type": b.evaluate("e => e.tagName")} for b in buttons[:20]]
            else:
                text = self._page.evaluate("() => document.body.innerText")
                data = [{"text": text[:5000]}]
            result = {
                "data": data,
                "provider": "playwright",
                "instruction": instruction,
                "count": len(data)
            }
            self.audit.log(self.agent_id, "extract", {"instruction": instruction, "count": result["count"]})
            return result
        except Exception as e:
            self.audit.log(self.agent_id, "extract_error", {"error": str(e)})
            raise BrowserError(f"Extract failed: {e}") from e

    def observe(self, instruction: Optional[str] = None) -> List[Dict[str, Any]]:
        """List candidate actions on the page.

        Inspects up to 30 links/buttons and up to 20 inputs; elements that
        cannot be inspected are skipped. ``instruction`` is only logged.

        Raises:
            RateLimitError: if the agent has exhausted its quota.
            BrowserError: if no session is open or the page cannot be queried.
        """
        self._check_rate_limit()
        if not self._page:
            raise BrowserError("No active session")
        try:
            actions = []
            # Clickable elements
            clickables = self._page.query_selector_all("a, button, [role='button']")
            for el in clickables[:30]:
                try:
                    text = el.inner_text()
                    tag = el.evaluate("e => e.tagName")
                    if text and len(text.strip()) > 0:
                        actions.append({
                            "action": f"Click {text.strip()[:50]}",
                            "element": tag.lower(),
                            "text": text.strip()[:100]
                        })
                except Exception:
                    # Element went stale or is not inspectable: skip it.
                    continue
            # Input fields
            inputs = self._page.query_selector_all("input, textarea, select")
            for el in inputs[:20]:
                try:
                    name = el.get_attribute("name")
                    placeholder = el.get_attribute("placeholder")
                    if name or placeholder:
                        actions.append({
                            "action": f"Fill {name or placeholder}",
                            "element": "input",
                            "field": name,
                            "placeholder": placeholder
                        })
                except Exception:
                    continue
            self.audit.log(self.agent_id, "observe", {"instruction": instruction, "actions_count": len(actions)})
            return actions
        except Exception as e:
            raise BrowserError(f"Observe failed: {e}") from e

    # ========================================================================
    # SCREENSHOTS & FORMS
    # ========================================================================
    def screenshot(self, save_path: Optional[str] = None) -> Union[str, bytes]:
        """Save a viewport screenshot and return the path it was written to.

        Without ``save_path`` the file goes to ``temp_dir`` under a
        timestamped name. Characters of the agent id outside
        ``[A-Za-z0-9_.-]`` are replaced by ``_`` in that name so the id
        cannot introduce path separators.

        Raises:
            BrowserError: if no session is open or the capture fails.
        """
        if not self._page:
            raise BrowserError("No active session")
        if not save_path:
            timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            safe_agent = re.sub(r"[^A-Za-z0-9_.-]", "_", self.agent_id)
            save_path = str(self.temp_dir / f"screenshot_{safe_agent}_{timestamp}.png")
        try:
            self._page.screenshot(path=save_path, full_page=False)
            self.audit.log(self.agent_id, "screenshot", {"path": save_path})
            return save_path
        except Exception as e:
            raise BrowserError(f"Screenshot failed: {e}") from e

    def fill_form(self, fields: List[Dict[str, str]]) -> Dict[str, Any]:
        """Fill form fields.

        Each entry has ``name``, optional ``type`` (``text``, ``email``,
        ``password``, ``tel``, ``checkbox`` or ``select``; default ``text``),
        ``value``, and for checkboxes ``checked`` (default True). Values are
        sent to the page exactly as given and are never logged. A field with
        an unsupported type is reported as an error.

        Returns:
            ``{"filled": [{"field", "status"[, "error"]}, ...]}``.

        Raises:
            RateLimitError: if the agent has exhausted its quota.
            BrowserError: if no session is open.
        """
        self._check_rate_limit()
        if not self._page:
            raise BrowserError("No active session")
        results = []
        for spec in fields:
            field_name = spec.get("name", "")
            value = str(spec.get("value", ""))
            field_type = spec.get("type", "text")
            try:
                if field_type in ("text", "email", "password", "tel"):
                    selector = f"input[name='{field_name}'], input[id='{field_name}'], #{field_name}"
                    self._page.fill(selector, value)
                elif field_type == "checkbox":
                    selector = f"input[name='{field_name}']"
                    if spec.get("checked", True):
                        self._page.check(selector)
                    else:
                        self._page.uncheck(selector)
                elif field_type == "select":
                    selector = f"select[name='{field_name}']"
                    self._page.select_option(selector, value)
                else:
                    raise ValueError(f"Unsupported field type: {field_type}")
                results.append({"field": field_name, "status": "filled"})
            except Exception as e:
                results.append({"field": field_name, "status": "error", "error": str(e)})
        self.audit.log(self.agent_id, "fill_form", {"fields": len(fields), "results": results})
        return {"filled": results}

    def wait_for(self, selector_or_text: str, timeout: int = 10) -> bool:
        """Wait for an element, trying the argument as a selector and then as text.

        Args:
            selector_or_text: Selector, or visible text to look for.
            timeout: Seconds allowed for each of the two attempts.

        Returns:
            True as soon as one attempt succeeds, False if both fail.

        Raises:
            BrowserError: if no session is open.
        """
        if not self._page:
            raise BrowserError("No active session")
        timeout_ms = timeout * 1000
        for selector in (selector_or_text, f"text={selector_or_text}"):
            try:
                self._page.wait_for_selector(selector, timeout=timeout_ms)
                return True
            except Exception:
                continue
        return False
# ============================================================================
# ERROR CLASSES
# ============================================================================
class BrowserError(Exception):
    """Base class for errors raised by the browser tool."""
class RateLimitError(BrowserError):
    """Raised when an agent has used up its request quota."""
# ============================================================================
# REGISTRATION FOR OCTOTOOLS
# ============================================================================
def register_tools() -> Dict[str, Any]:
    """Describe the browser tool for the tool registry.

    Returns:
        A mapping with one ``"browser"`` entry holding the tool class, a
        description and the names of the methods to expose.
    """
    exposed_methods = [
        "start_session", "restore_session", "close_session",
        "goto", "act", "extract", "observe", "screenshot",
        "fill_form", "wait_for", "get_current_url", "get_page_text", "get_html",
    ]
    browser_entry = {
        "class": BrowserTool,
        "description": "Browser automation for AI agents - fully self-hosted, privacy-by-default",
        "methods": exposed_methods,
    }
    return {"browser": browser_entry}