AI Agent 高级 Computer Use Claude 浏览器自动化 Playwright

Computer Use 浏览器自动化实战:从截图到自主操作

AIEng Hub
阅读约 35 分钟

引言

2024年10月,Anthropic 发布了 Claude 3.5 Sonnet 的 Computer Use 能力,这是 AI Agent 领域的重大突破。与简单的 API 调用不同,Computer Use 让 AI 能够像人类一样:

  • 查看屏幕:通过截图理解当前界面状态
  • 操作鼠标:点击、拖拽、滚动
  • 输入键盘:打字、快捷键、组合键
  • 等待响应:观察操作结果,决定下一步

本文将深入讲解 Computer Use 的工作原理,并手把手教你构建生产级的浏览器自动化 Agent。

Computer Use 工作原理

┌─────────────────────────────────────────────────────────────┐
│                  Computer Use 循环架构                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────┐      ┌─────────────┐      ┌──────────┐ │
│   │   截图      │ ───→ │   Claude    │ ───→ │  执行    │ │
│   │ (Screenshot)│      │  分析决策    │      │  操作    │ │
│   └─────────────┘      └─────────────┘      └──────────┘ │
│          ↑                                            │    │
│          │                                            │    │
│          └────────────────────────────────────────────┘    │
│                        观察结果                             │
│                                                             │
│   循环流程:                                                │
│   1. 截取当前屏幕                                          │
│   2. 结合历史操作,分析当前状态                              │
│   3. 决定下一步操作(点击/输入/滚动/完成)                    │
│   4. 执行操作                                              │
│   5. 等待页面响应                                          │
│   6. 回到步骤 1                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

基础实现

1. 环境准备

# requirements.txt
anthropic>=0.39.0
playwright>=1.48.0
Pillow>=10.0.0
python-dotenv>=1.0.0

2. 核心 Computer Use Agent

# computer_use_agent.py
import os
import base64
import json
from typing import List, Dict, Optional, Literal
from dataclasses import dataclass
from pathlib import Path

from anthropic import Anthropic
from playwright.sync_api import sync_playwright, Page, Browser
from PIL import Image
import io

@dataclass
class ComputerAction:
    """计算机操作"""
    action: Literal["click", "type", "scroll", "key", "screenshot", "wait", "finish"]
    coordinates: Optional[tuple[int, int]] = None  # (x, y) for click
    text: Optional[str] = None  # for type
    direction: Optional[Literal["up", "down"]] = None  # for scroll
    key: Optional[str] = None  # for key press
    reason: str = ""  # 操作原因

class ComputerUseAgent:
    """Computer Use Agent"""
    
    def __init__(
        self, 
        api_key: Optional[str] = None,
        model: str = "claude-3-5-sonnet-20241022",
        max_iterations: int = 50,
        screenshot_dir: str = "./screenshots"
    ):
        self.client = Anthropic(api_key=api_key or os.getenv("ANTHROPIC_API_KEY"))
        self.model = model
        self.max_iterations = max_iterations
        self.screenshot_dir = Path(screenshot_dir)
        self.screenshot_dir.mkdir(exist_ok=True)
        
        self.browser: Optional[Browser] = None
        self.page: Optional[Page] = None
        self.playwright = None
        self.iteration = 0
        self.action_history: List[Dict] = []
    
    def start(self, headless: bool = False):
        """启动浏览器"""
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(headless=headless)
        self.page = self.browser.new_page(viewport={"width": 1280, "height": 800})
        print(f"浏览器已启动 (headless={headless})")
    
    def stop(self):
        """关闭浏览器"""
        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()
        print("浏览器已关闭")
    
    def take_screenshot(self) -> str:
        """截图并返回 base64"""
        screenshot = self.page.screenshot()
        
        # 保存截图
        screenshot_path = self.screenshot_dir / f"step_{self.iteration:03d}.png"
        with open(screenshot_path, "wb") as f:
            f.write(screenshot)
        
        # 返回 base64
        return base64.b64encode(screenshot).decode()
    
    def execute_action(self, action: ComputerAction) -> bool:
        """执行操作"""
        try:
            if action.action == "click":
                if action.coordinates:
                    x, y = action.coordinates
                    self.page.mouse.click(x, y)
                    print(f"点击: ({x}, {y})")
                    
            elif action.action == "type":
                if action.text:
                    # 先点击输入框(假设坐标已提供)
                    if action.coordinates:
                        x, y = action.coordinates
                        self.page.mouse.click(x, y)
                    self.page.keyboard.type(action.text)
                    print(f"输入: {action.text}")
                    
            elif action.action == "scroll":
                direction = action.direction or "down"
                scroll_amount = -500 if direction == "up" else 500
                self.page.mouse.wheel(0, scroll_amount)
                print(f"滚动: {direction}")
                
            elif action.action == "key":
                if action.key:
                    self.page.keyboard.press(action.key)
                    print(f"按键: {action.key}")
                    
            elif action.action == "wait":
                self.page.wait_for_timeout(2000)
                print("等待 2 秒")
                
            elif action.action == "finish":
                print("任务完成")
                return True
                
            # 等待页面响应
            self.page.wait_for_load_state("networkidle")
            
        except Exception as e:
            print(f"操作失败: {e}")
            
        return False
    
    def get_claude_decision(self, screenshot_base64: str, task: str) -> ComputerAction:
        """让 Claude 决定下一步操作"""
        
        # 构建消息
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/png",
                            "data": screenshot_base64
                        }
                    },
                    {
                        "type": "text",
                        "text": f"""当前屏幕截图如上。

任务目标:{task}

历史操作:
{json.dumps(self.action_history[-5:], indent=2, ensure_ascii=False)}

请分析当前屏幕状态,决定下一步操作。

请返回 JSON 格式:
{{
    "action": "click|type|scroll|key|screenshot|wait|finish",
    "coordinates": [x, y],  // 点击或输入时使用
    "text": "输入的文本",  // type 时使用
    "direction": "up|down",  // scroll 时使用
    "key": "按键",  // key 时使用
    "reason": "操作原因说明"
}}

注意:
1. 坐标是相对于 1280x800 屏幕的像素坐标
2. 如果不确定元素位置,可以先截图观察
3. 操作后等待页面响应
"""
                    }
                ]
            }
        ]
        
        # 调用 Claude
        response = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=messages
        )
        
        # 解析响应
        try:
            content = response.content[0].text
            # 提取 JSON
            json_start = content.find("{")
            json_end = content.rfind("}") + 1
            if json_start >= 0 and json_end > json_start:
                action_data = json.loads(content[json_start:json_end])
                
                return ComputerAction(
                    action=action_data.get("action", "wait"),
                    coordinates=tuple(action_data["coordinates"]) if "coordinates" in action_data else None,
                    text=action_data.get("text"),
                    direction=action_data.get("direction"),
                    key=action_data.get("key"),
                    reason=action_data.get("reason", "")
                )
        except Exception as e:
            print(f"解析响应失败: {e}")
            print(f"原始响应: {content}")
        
        return ComputerAction(action="wait", reason="解析失败,等待")
    
    def run(self, task: str, start_url: Optional[str] = None) -> Dict:
        """运行任务"""
        print(f"\n{'='*50}")
        print(f"任务: {task}")
        print(f"{'='*50}\n")
        
        # 导航到起始页面
        if start_url:
            self.page.goto(start_url)
            print(f"导航到: {start_url}")
        
        for i in range(self.max_iterations):
            self.iteration = i
            print(f"\n--- 迭代 {i+1}/{self.max_iterations} ---")
            
            # 1. 截图
            screenshot = self.take_screenshot()
            
            # 2. 获取决策
            action = self.get_claude_decision(screenshot, task)
            print(f"决策: {action.action} - {action.reason}")
            
            # 3. 记录历史
            self.action_history.append({
                "iteration": i,
                "action": action.action,
                "coordinates": action.coordinates,
                "text": action.text,
                "reason": action.reason
            })
            
            # 4. 执行操作
            finished = self.execute_action(action)
            
            if finished:
                return {
                    "success": True,
                    "iterations": i + 1,
                    "history": self.action_history
                }
        
        return {
            "success": False,
            "iterations": self.max_iterations,
            "history": self.action_history,
            "error": "达到最大迭代次数"
        }


# 使用示例
if __name__ == "__main__":
    agent = ComputerUseAgent()
    
    try:
        agent.start(headless=False)  # 非 headless 模式可以看到浏览器
        
        # 示例任务:搜索信息
        result = agent.run(
            task="在 Google 搜索 'Claude Computer Use 教程',然后打开第一个结果",
            start_url="https://www.google.com"
        )
        
        print(f"\n任务结果: {result}")
        
    finally:
        agent.stop()

高级功能:智能元素定位

上面的基础实现需要 Claude 提供像素坐标,这不够稳定。下面是改进版,使用 Playwright 的选择器:

# smart_computer_use_agent.py
from playwright.sync_api import ElementHandle
import re

class SmartComputerUseAgent(ComputerUseAgent):
    """智能 Computer Use Agent - 支持元素选择器"""
    
    def get_claude_decision_with_elements(
        self, 
        screenshot_base64: str, 
        task: str,
        interactive_elements: List[Dict]
    ) -> ComputerAction:
        """让 Claude 在标注了元素的页面上做决策"""
        
        # 构建可交互元素描述
        elements_desc = "\n".join([
            f"[{e['id']}] {e['type']}: {e['text'][:50]}... (位置: {e['bbox']})"
            for e in interactive_elements[:20]  # 限制数量
        ])
        
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/png",
                            "data": screenshot_base64
                        }
                    },
                    {
                        "type": "text",
                        "text": f"""当前屏幕截图如上。页面上标注了可交互元素。

任务目标:{task}

可交互元素列表:
{elements_desc}

请返回 JSON 格式:
{{
    "action": "click|type|scroll|key|wait|finish",
    "element_id": 元素ID,  // 点击或输入时使用
    "text": "输入的文本",  // type 时使用
    "direction": "up|down",  // scroll 时使用
    "key": "按键",  // key 时使用
    "reason": "操作原因"
}}

注意:优先使用 element_id 而不是坐标,更稳定。
"""
                    }
                ]
            }
        ]
        
        response = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=messages
        )
        
        # 解析响应...
        content = response.content[0].text
        # ... 解析逻辑
        
        return action
    
    def get_interactive_elements(self) -> List[Dict]:
        """获取页面上的可交互元素"""
        elements = []
        
        # 查找按钮、链接、输入框等
        selectors = [
            'button',
            'a[href]',
            'input[type="text"]',
            'input[type="search"]',
            'textarea',
            '[role="button"]',
            '[role="link"]'
        ]
        
        element_id = 0
        for selector in selectors:
            for elem in self.page.query_selector_all(selector):
                try:
                    # 获取元素位置和文本
                    bbox = elem.bounding_box()
                    text = elem.inner_text() or elem.get_attribute('placeholder') or ''
                    
                    if bbox:
                        elements.append({
                            'id': element_id,
                            'type': selector.split('[')[0],
                            'text': text.strip(),
                            'bbox': [bbox['x'], bbox['y'], bbox['width'], bbox['height']],
                            'element': elem
                        })
                        element_id += 1
                except:
                    pass
        
        return elements
    
    def annotate_screenshot(self, elements: List[Dict]) -> str:
        """在截图上标注元素"""
        from PIL import Image, ImageDraw, ImageFont
        
        screenshot = self.page.screenshot()
        img = Image.open(io.BytesIO(screenshot))
        draw = ImageDraw.Draw(img)
        
        for elem in elements:
            x, y, w, h = elem['bbox']
            # 绘制方框
            draw.rectangle([x, y, x+w, y+h], outline="red", width=2)
            # 绘制 ID
            draw.text((x, y-15), str(elem['id']), fill="red")
        
        # 保存并返回 base64
        buffer = io.BytesIO()
        img.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode()
    
    def run_smart(self, task: str, start_url: Optional[str] = None) -> Dict:
        """运行任务(智能版)"""
        if start_url:
            self.page.goto(start_url)
        
        for i in range(self.max_iterations):
            self.iteration = i
            print(f"\n--- 迭代 {i+1}/{self.max_iterations} ---")
            
            # 1. 获取可交互元素
            elements = self.get_interactive_elements()
            print(f"发现 {len(elements)} 个可交互元素")
            
            # 2. 标注截图
            screenshot = self.annotate_screenshot(elements)
            
            # 3. 获取决策
            action = self.get_claude_decision_with_elements(screenshot, task, elements)
            
            # 4. 执行操作(使用 element_id 或坐标)
            if hasattr(action, 'element_id') and action.element_id is not None:
                # 使用元素引用
                element = next(
                    (e for e in elements if e['id'] == action.element_id), 
                    None
                )
                if element:
                    elem = element['element']
                    if action.action == "click":
                        elem.click()
                    elif action.action == "type":
                        elem.fill(action.text or "")
            else:
                # 使用坐标
                finished = self.execute_action(action)
                if finished:
                    return {"success": True, "iterations": i + 1}
        
        return {"success": False, "iterations": self.max_iterations}

实战:自动化测试 Agent

# test_automation_agent.py
from typing import List, Dict
import json

class TestAutomationAgent(SmartComputerUseAgent):
    """自动化测试 Agent"""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.test_results: List[Dict] = []
    
    def run_test_case(self, test_case: Dict) -> Dict:
        """运行单个测试用例"""
        print(f"\n运行测试: {test_case['name']}")
        
        result = self.run_smart(
            task=test_case['steps'],
            start_url=test_case.get('start_url')
        )
        
        # 验证结果
        verification = self.verify_result(test_case.get('expected_result', ''))
        
        test_result = {
            "name": test_case['name'],
            "success": result['success'] and verification['success'],
            "iterations": result['iterations'],
            "error": result.get('error', '') or verification.get('error', ''),
            "screenshots": list(self.screenshot_dir.glob("step_*.png"))
        }
        
        self.test_results.append(test_result)
        return test_result
    
    def verify_result(self, expected: str) -> Dict:
        """验证测试结果"""
        # 截图并询问 Claude
        screenshot = self.take_screenshot()
        
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/png",
                            "data": screenshot
                        }
                    },
                    {
                        "type": "text",
                        "text": f"请验证当前页面是否符合预期:{expected}\n\n返回 JSON:{{'success': true/false, 'reason': '说明'}}"
                    }
                ]
            }
        ]
        
        response = self.client.messages.create(
            model=self.model,
            max_tokens=512,
            messages=messages
        )
        
        try:
            content = response.content[0].text
            json_start = content.find("{")
            json_end = content.rfind("}") + 1
            return json.loads(content[json_start:json_end])
        except:
            return {"success": False, "error": "验证解析失败"}
    
    def run_test_suite(self, test_cases: List[Dict]) -> Dict:
        """运行测试套件"""
        self.start(headless=True)  # 测试用 headless 模式
        
        try:
            for test_case in test_cases:
                self.run_test_case(test_case)
                # 清理截图
                for f in self.screenshot_dir.glob("step_*.png"):
                    f.unlink()
        finally:
            self.stop()
        
        # 生成报告
        return self.generate_report()
    
    def generate_report(self) -> Dict:
        """生成测试报告"""
        total = len(self.test_results)
        passed = sum(1 for r in self.test_results if r['success'])
        failed = total - passed
        
        report = {
            "total": total,
            "passed": passed,
            "failed": failed,
            "pass_rate": passed / total if total > 0 else 0,
            "results": self.test_results
        }
        
        # 打印报告
        print(f"\n{'='*50}")
        print(f"测试报告")
        print(f"{'='*50}")
        print(f"总计: {total}")
        print(f"通过: {passed}")
        print(f"失败: {failed}")
        print(f"通过率: {report['pass_rate']*100:.1f}%")
        print(f"{'='*50}")
        
        for result in self.test_results:
            status = "" if result['success'] else ""
            print(f"{status} {result['name']}")
        
        return report


# 测试用例示例
TEST_CASES = [
    {
        "name": "登录功能测试",
        "start_url": "https://example.com/login",
        "steps": "使用用户名 'test' 和密码 'password' 登录,验证是否成功跳转到首页",
        "expected_result": "页面显示欢迎信息,URL 变为 /dashboard"
    },
    {
        "name": "搜索功能测试",
        "start_url": "https://example.com",
        "steps": "在搜索框输入 'AI Agent' 并搜索,验证结果页面显示相关结果",
        "expected_result": "搜索结果页面显示,包含 'AI Agent' 相关内容"
    },
    {
        "name": "表单提交测试",
        "start_url": "https://example.com/contact",
        "steps": "填写联系表单(姓名、邮箱、消息),点击提交,验证成功提示",
        "expected_result": "显示提交成功消息 'Thank you for your message'"
    }
]

# 运行测试
if __name__ == "__main__":
    agent = TestAutomationAgent()
    report = agent.run_test_suite(TEST_CASES)

安全与限制

# security_utils.py
"""Computer Use 安全工具"""

import re
from typing import List, Optional

class SecurityGuard:
    """Computer Use 安全守卫"""
    
    # 危险操作列表
    DANGEROUS_PATTERNS = [
        r"(rm|del|remove)\s+-[rf].*",  # 删除命令
        r"(curl|wget)\s+.*\|.*bash",    # 管道到 bash
        r"(sudo|administrator).*",       # 提权操作
        r"(password|secret|key).*",      # 敏感信息
    ]
    
    # 危险 URL
    DANGEROUS_DOMAINS = [
        "malware.com",
        "phishing.com",
        # ...
    ]
    
    @classmethod
    def validate_action(cls, action: str, url: Optional[str] = None) -> bool:
        """验证操作是否安全"""
        
        # 检查危险模式
        for pattern in cls.DANGEROUS_PATTERNS:
            if re.search(pattern, action, re.IGNORECASE):
                print(f"⚠️ 检测到危险操作: {action}")
                return False
        
        # 检查危险 URL
        if url:
            for domain in cls.DANGEROUS_DOMAINS:
                if domain in url:
                    print(f"⚠️ 检测到危险域名: {domain}")
                    return False
        
        return True
    
    @classmethod
    def sanitize_input(cls, text: str) -> str:
        """清理输入"""
        # 移除控制字符
        text = re.sub(r'[\x00-\x08\x0b-\x0c\x0e-\x1f]', '', text)
        # 限制长度
        return text[:1000]


# 在 Agent 中使用
class SecureComputerUseAgent(ComputerUseAgent):
    """安全的 Computer Use Agent"""
    
    def execute_action(self, action: ComputerAction) -> bool:
        """安全地执行操作"""
        
        # 验证操作
        action_str = json.dumps({
            "action": action.action,
            "text": action.text,
            "key": action.key
        })
        
        if not SecurityGuard.validate_action(action_str):
            print("操作被安全策略阻止")
            return False
        
        # 清理输入
        if action.text:
            action.text = SecurityGuard.sanitize_input(action.text)
        
        # 执行操作
        return super().execute_action(action)

总结

Computer Use 是 AI Agent 的重要能力:

  1. 核心能力:视觉理解 + 界面操作
  2. 应用场景:自动化测试、数据采集、流程自动化
  3. 技术要点:截图分析、元素定位、循环决策
  4. 安全考虑:操作验证、输入清理、危险拦截

最佳实践:

  • 使用元素选择器而非像素坐标
  • 添加重试机制和错误处理
  • 限制迭代次数防止无限循环
  • 保存截图用于调试和审计
  • 实施安全策略防止危险操作

局限性:

  • 成本较高(每次截图都调用 API)
  • 速度较慢(需要多次迭代)
  • 对动态内容处理能力有限
  • 不适合高频操作场景

相关资源: