引言
2024年10月,Anthropic 随升级版 Claude 3.5 Sonnet 以公开测试版(public beta)的形式发布了 Computer Use 能力,这是 AI Agent 领域的重大突破。与简单的 API 调用不同,Computer Use 让 AI 能够像人类一样:
- 查看屏幕:通过截图理解当前界面状态
- 操作鼠标:点击、拖拽、滚动
- 使用键盘:打字、快捷键、组合键
- 等待响应:观察操作结果,决定下一步
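本文将深入讲解 Computer Use 的工作原理,并手把手教你构建生产级的浏览器自动化 Agent。需要说明的是:为了便于理解,文中的示例代码用"截图 + 提示词,让模型返回 JSON 指令"的方式来模拟这一循环,并没有调用 Anthropic 官方的 computer 工具(`computer_20241022`,需通过 beta 接口启用);两者的循环思路是一致的。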
Computer Use 工作原理
┌─────────────────────────────────────────────────────────────┐
│ Computer Use 循环架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌──────────┐ │
│ │ 截图 │ ───→ │ Claude │ ───→ │ 执行 │ │
│ │ (Screenshot)│ │ 分析决策 │ │ 操作 │ │
│ └─────────────┘ └─────────────┘ └──────────┘ │
│ ↑ │ │
│ │ │ │
│ └────────────────────────────────────────────┘ │
│ 观察结果 │
│ │
│ 循环流程: │
│ 1. 截取当前屏幕 │
│ 2. 结合历史操作,分析当前状态 │
│ 3. 决定下一步操作(点击/输入/滚动/完成) │
│ 4. 执行操作 │
│ 5. 等待页面响应 │
│ 6. 回到步骤 1 │
│ │
└─────────────────────────────────────────────────────────────┘
基础实现
1. 环境准备
# requirements.txt
anthropic>=0.39.0
playwright>=1.48.0
Pillow>=10.0.0
python-dotenv>=1.0.0
2. 核心 Computer Use Agent
# computer_use_agent.py
import os
import base64
import json
from typing import List, Dict, Optional, Literal
from dataclasses import dataclass
from pathlib import Path
from anthropic import Anthropic
from playwright.sync_api import sync_playwright, Page, Browser
from PIL import Image
import io
@dataclass
class ComputerAction:
    """One UI action chosen by the model for the agent to execute.

    Only ``action`` is required; the remaining fields are the optional
    arguments that the individual action kinds consume.
    """

    # Which kind of operation to perform.
    action: Literal["click", "type", "scroll", "key", "screenshot", "wait", "finish"]
    # (x, y) pixel position; used by "click" and, optionally, "type".
    coordinates: Optional[tuple[int, int]] = None
    # Text to enter; used by "type".
    text: Optional[str] = None
    # Scroll direction; used by "scroll".
    direction: Optional[Literal["up", "down"]] = None
    # Key or key combination to press; used by "key".
    key: Optional[str] = None
    # Free-form explanation of why the action was chosen.
    reason: str = ""
    # Id of a previously enumerated page element to act on instead of raw
    # coordinates. Declared last so existing positional construction keeps
    # working unchanged.
    element_id: Optional[int] = None
class ComputerUseAgent:
    """Browser agent that loops: screenshot -> ask Claude -> execute action.

    The browser is driven through Playwright's synchronous API on a
    1280x800 viewport. Call ``start()`` before ``run()`` and ``stop()``
    afterwards; ``run()`` and ``take_screenshot()`` use ``self.page``, which
    is only set by ``start()``.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = "claude-3-5-sonnet-20241022",
        max_iterations: int = 50,
        screenshot_dir: str = "./screenshots"
    ):
        """Create the API client and the screenshot directory.

        Args:
            api_key: Anthropic API key; falls back to the
                ``ANTHROPIC_API_KEY`` environment variable.
            model: Model name sent with every request.
            max_iterations: Upper bound on decide/act rounds per ``run()``.
            screenshot_dir: Directory that receives ``step_NNN.png`` files.
        """
        self.client = Anthropic(api_key=api_key or os.getenv("ANTHROPIC_API_KEY"))
        self.model = model
        self.max_iterations = max_iterations
        self.screenshot_dir = Path(screenshot_dir)
        # parents=True so a nested path such as "out/run1/shots" also works.
        self.screenshot_dir.mkdir(parents=True, exist_ok=True)
        self.browser: Optional[Browser] = None
        self.page: Optional[Page] = None
        self.playwright = None
        self.iteration = 0
        self.action_history: List[Dict] = []

    def start(self, headless: bool = False):
        """Launch Chromium and open a single 1280x800 page."""
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(headless=headless)
        self.page = self.browser.new_page(viewport={"width": 1280, "height": 800})
        print(f"浏览器已启动 (headless={headless})")

    def stop(self):
        """Close the browser and stop Playwright.

        The Playwright driver is stopped even if closing the browser
        raises, and the handles are cleared so a second call is a no-op.
        """
        try:
            if self.browser:
                self.browser.close()
        finally:
            self.browser = None
            self.page = None
            if self.playwright:
                try:
                    self.playwright.stop()
                finally:
                    self.playwright = None
        print("浏览器已关闭")

    def take_screenshot(self) -> str:
        """Capture the page, save it as ``step_<iteration>.png``, return base64."""
        screenshot = self.page.screenshot()
        # Persist a copy on disk, named after the current iteration.
        screenshot_path = self.screenshot_dir / f"step_{self.iteration:03d}.png"
        with open(screenshot_path, "wb") as f:
            f.write(screenshot)
        return base64.b64encode(screenshot).decode()

    def execute_action(self, action: ComputerAction) -> bool:
        """Perform ``action`` on the page.

        Returns:
            True only when the action is "finish". Every other outcome,
            including a failed operation, returns False so the caller's
            loop continues.
        """
        try:
            if action.action == "click":
                if action.coordinates:
                    x, y = action.coordinates
                    self.page.mouse.click(x, y)
                    print(f"点击: ({x}, {y})")
            elif action.action == "type":
                if action.text:
                    # Focus the target first when a position was supplied.
                    if action.coordinates:
                        x, y = action.coordinates
                        self.page.mouse.click(x, y)
                    self.page.keyboard.type(action.text)
                    print(f"输入: {action.text}")
            elif action.action == "scroll":
                direction = action.direction or "down"
                scroll_amount = -500 if direction == "up" else 500
                self.page.mouse.wheel(0, scroll_amount)
                print(f"滚动: {direction}")
            elif action.action == "key":
                if action.key:
                    self.page.keyboard.press(action.key)
                    print(f"按键: {action.key}")
            elif action.action == "wait":
                self.page.wait_for_timeout(2000)
                print("等待 2 秒")
            elif action.action == "finish":
                print("任务完成")
                return True
            # Any other action (e.g. "screenshot") performs no operation here.
            # Give the page a chance to settle before the next screenshot.
            self.page.wait_for_load_state("networkidle")
        except Exception as e:
            print(f"操作失败: {e}")
        return False

    def get_claude_decision(self, screenshot_base64: str, task: str) -> ComputerAction:
        """Send the screenshot and task to Claude and parse the chosen action.

        Falls back to a "wait" action whenever the reply cannot be parsed.
        """
        prompt = f"""当前屏幕截图如上。
任务目标:{task}
历史操作:
{json.dumps(self.action_history[-5:], indent=2, ensure_ascii=False)}
请分析当前屏幕状态,决定下一步操作。
请返回 JSON 格式:
{{
"action": "click|type|scroll|key|screenshot|wait|finish",
"coordinates": [x, y], // 点击或输入时使用
"text": "输入的文本", // type 时使用
"direction": "up|down", // scroll 时使用
"key": "按键", // key 时使用
"reason": "操作原因说明"
}}
注意:
1. 坐标是相对于 1280x800 屏幕的像素坐标
2. 如果不确定元素位置,可以先截图观察
3. 操作后等待页面响应
"""
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/png",
                            "data": screenshot_base64
                        }
                    },
                    {
                        "type": "text",
                        "text": prompt
                    }
                ]
            }
        ]
        response = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=messages
        )
        # Bound before the try so the except handler can always print it,
        # even when reading the response itself is what failed.
        content = ""
        try:
            content = response.content[0].text
            # Take the outermost {...} span; the reply may wrap it in prose.
            json_start = content.find("{")
            json_end = content.rfind("}") + 1
            if json_start >= 0 and json_end > json_start:
                action_data = json.loads(content[json_start:json_end])
                # "coordinates" may be absent, null, or malformed; only a
                # two-element sequence is usable as (x, y).
                raw_coords = action_data.get("coordinates")
                if isinstance(raw_coords, (list, tuple)) and len(raw_coords) == 2:
                    coordinates = tuple(raw_coords)
                else:
                    coordinates = None
                return ComputerAction(
                    action=action_data.get("action", "wait"),
                    coordinates=coordinates,
                    text=action_data.get("text"),
                    direction=action_data.get("direction"),
                    key=action_data.get("key"),
                    reason=action_data.get("reason", "")
                )
        except Exception as e:
            print(f"解析响应失败: {e}")
            print(f"原始响应: {content}")
        return ComputerAction(action="wait", reason="解析失败,等待")

    def run(self, task: str, start_url: Optional[str] = None) -> Dict:
        """Run the decide/act loop until "finish" or ``max_iterations``.

        Args:
            task: Natural-language goal passed to the model each round.
            start_url: Optional page to open before the first round.

        Returns:
            A dict with "success", "iterations" and "history"; on
            exhaustion it also carries an "error" message.
        """
        print(f"\n{'='*50}")
        print(f"任务: {task}")
        print(f"{'='*50}\n")
        if start_url:
            self.page.goto(start_url)
            print(f"导航到: {start_url}")
        for i in range(self.max_iterations):
            self.iteration = i
            print(f"\n--- 迭代 {i+1}/{self.max_iterations} ---")
            # 1. Capture the current screen.
            screenshot = self.take_screenshot()
            # 2. Ask the model what to do next.
            action = self.get_claude_decision(screenshot, task)
            print(f"决策: {action.action} - {action.reason}")
            # 3. Record the decision; the last five entries are fed back
            #    to the model in the next prompt.
            self.action_history.append({
                "iteration": i,
                "action": action.action,
                "coordinates": action.coordinates,
                "text": action.text,
                "reason": action.reason
            })
            # 4. Execute it; True means the model declared the task done.
            finished = self.execute_action(action)
            if finished:
                return {
                    "success": True,
                    "iterations": i + 1,
                    "history": self.action_history
                }
        return {
            "success": False,
            "iterations": self.max_iterations,
            "history": self.action_history,
            "error": "达到最大迭代次数"
        }
# 使用示例
if __name__ == "__main__":
    # Demo: search on Google and open the first hit.
    demo_task = "在 Google 搜索 'Claude Computer Use 教程',然后打开第一个结果"
    demo_url = "https://www.google.com"
    agent = ComputerUseAgent()
    try:
        # Headed mode so the browser window is visible while the agent works.
        agent.start(headless=False)
        result = agent.run(task=demo_task, start_url=demo_url)
        print(f"\n任务结果: {result}")
    finally:
        # Always release the browser, even if the run raised.
        agent.stop()
高级功能:智能元素定位
上面的基础实现需要 Claude 直接给出像素坐标,这不够稳定。下面的改进版先用 Playwright 的选择器枚举页面上的可交互元素并为其编号,在截图上标注这些编号,再让 Claude 按元素 ID 选择操作对象:
# smart_computer_use_agent.py
from playwright.sync_api import ElementHandle
import re
class SmartComputerUseAgent(ComputerUseAgent):
    """Agent variant that lets the model pick numbered page elements.

    Each round enumerates interactive elements, draws their ids onto the
    screenshot, and asks the model for an ``element_id`` instead of raw
    pixel coordinates.
    """

    def get_claude_decision_with_elements(
        self,
        screenshot_base64: str,
        task: str,
        interactive_elements: List[Dict]
    ) -> ComputerAction:
        """Ask Claude for the next action given the annotated screenshot.

        Args:
            screenshot_base64: Base64 PNG with element ids drawn on it.
            task: Natural-language goal.
            interactive_elements: Dicts with "id", "type", "text", "bbox".

        Returns:
            The parsed action with an ``element_id`` attribute attached
            (``None`` when the model gave none). Falls back to a "wait"
            action when the reply cannot be parsed.
        """
        # Only the first 20 elements are described to keep the prompt short.
        elements_desc = "\n".join([
            f"[{e['id']}] {e['type']}: {e['text'][:50]}... (位置: {e['bbox']})"
            for e in interactive_elements[:20]
        ])
        prompt = f"""当前屏幕截图如上。页面上标注了可交互元素。
任务目标:{task}
可交互元素列表:
{elements_desc}
请返回 JSON 格式:
{{
"action": "click|type|scroll|key|wait|finish",
"element_id": 元素ID, // 点击或输入时使用
"text": "输入的文本", // type 时使用
"direction": "up|down", // scroll 时使用
"key": "按键", // key 时使用
"reason": "操作原因"
}}
注意:优先使用 element_id 而不是坐标,更稳定。
"""
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/png",
                            "data": screenshot_base64
                        }
                    },
                    {
                        "type": "text",
                        "text": prompt
                    }
                ]
            }
        ]
        response = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=messages
        )
        # Bound up front so the except handler can always print it.
        content = ""
        try:
            content = response.content[0].text
            # Take the outermost {...} span; the reply may wrap it in prose.
            json_start = content.find("{")
            json_end = content.rfind("}") + 1
            if json_start >= 0 and json_end > json_start:
                data = json.loads(content[json_start:json_end])
                action = ComputerAction(
                    action=data.get("action", "wait"),
                    text=data.get("text"),
                    direction=data.get("direction"),
                    key=data.get("key"),
                    reason=data.get("reason", "")
                )
                # Attached after construction; run_smart reads it back
                # with getattr.
                raw_id = data.get("element_id")
                action.element_id = int(raw_id) if raw_id is not None else None
                return action
        except Exception as e:
            print(f"解析响应失败: {e}")
            print(f"原始响应: {content}")
        fallback = ComputerAction(action="wait", reason="解析失败,等待")
        fallback.element_id = None
        return fallback

    def get_interactive_elements(self) -> List[Dict]:
        """Collect visible buttons, links and text inputs on the page.

        Returns:
            One dict per element with a sequential "id", a "type" label,
            its "text", its "bbox" as [x, y, width, height], and the
            Playwright handle under "element".
        """
        elements = []
        selectors = [
            'button',
            'a[href]',
            'input[type="text"]',
            'input[type="search"]',
            'textarea',
            '[role="button"]',
            '[role="link"]'
        ]
        element_id = 0
        for selector in selectors:
            # 'a[href]' -> 'a'. A pure attribute selector such as
            # '[role="button"]' has no tag part, so keep the whole
            # selector rather than an empty label.
            label = selector.split('[')[0] or selector
            for elem in self.page.query_selector_all(selector):
                try:
                    bbox = elem.bounding_box()
                    text = elem.inner_text() or elem.get_attribute('placeholder') or ''
                except Exception:
                    # Best effort: skip elements that cannot be inspected.
                    continue
                # No bounding box means nothing to annotate or click.
                if not bbox:
                    continue
                elements.append({
                    'id': element_id,
                    'type': label,
                    'text': text.strip(),
                    'bbox': [bbox['x'], bbox['y'], bbox['width'], bbox['height']],
                    'element': elem
                })
                element_id += 1
        return elements

    def annotate_screenshot(self, elements: List[Dict]) -> str:
        """Draw a red box and id for each element; return the PNG as base64."""
        from PIL import Image, ImageDraw
        screenshot = self.page.screenshot()
        img = Image.open(io.BytesIO(screenshot))
        draw = ImageDraw.Draw(img)
        for elem in elements:
            x, y, w, h = elem['bbox']
            draw.rectangle([x, y, x+w, y+h], outline="red", width=2)
            # Label sits just above the box; clamp so elements at the very
            # top of the page keep their label inside the image.
            draw.text((x, max(y-15, 0)), str(elem['id']), fill="red")
        buffer = io.BytesIO()
        img.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode()

    def run_smart(self, task: str, start_url: Optional[str] = None) -> Dict:
        """Run the element-aware decide/act loop.

        Click and type actions that carry an ``element_id`` act on the
        matching element handle; every other action is delegated to
        ``execute_action``.

        Returns:
            A dict with "success" and "iterations"; on exhaustion it also
            carries an "error" message.
        """
        if start_url:
            self.page.goto(start_url)
        for i in range(self.max_iterations):
            self.iteration = i
            print(f"\n--- 迭代 {i+1}/{self.max_iterations} ---")
            # 1. Enumerate interactive elements.
            elements = self.get_interactive_elements()
            print(f"发现 {len(elements)} 个可交互元素")
            # 2. Annotate the screenshot with their ids.
            screenshot = self.annotate_screenshot(elements)
            # 3. Ask the model.
            action = self.get_claude_decision_with_elements(screenshot, task, elements)
            print(f"决策: {action.action} - {action.reason}")
            # 4. Execute. Reset every round so a stale or unset value is
            #    never tested below.
            finished = False
            element_id = getattr(action, 'element_id', None)
            if element_id is not None and action.action in ("click", "type"):
                target = next(
                    (e for e in elements if e['id'] == element_id),
                    None
                )
                if target is None:
                    print(f"未找到元素: {element_id}")
                else:
                    try:
                        handle = target['element']
                        if action.action == "click":
                            handle.click()
                        else:
                            handle.fill(action.text or "")
                        # Let the page settle before the next screenshot.
                        self.page.wait_for_load_state("networkidle")
                    except Exception as e:
                        print(f"操作失败: {e}")
            else:
                # scroll / key / wait / finish, or no element id given.
                finished = self.execute_action(action)
            if finished:
                return {"success": True, "iterations": i + 1}
        return {
            "success": False,
            "iterations": self.max_iterations,
            "error": "达到最大迭代次数"
        }
实战:自动化测试 Agent
# test_automation_agent.py
from typing import List, Dict
import json
class TestAutomationAgent(SmartComputerUseAgent):
    """Runs natural-language test cases and collects a pass/fail report."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base agent and start with no results."""
        super().__init__(*args, **kwargs)
        self.test_results: List[Dict] = []

    def run_test_case(self, test_case: Dict) -> Dict:
        """Run one test case, verify the outcome, and record the result.

        Args:
            test_case: Dict with "name" and "steps", and optionally
                "start_url" and "expected_result".

        Returns:
            The result dict that was appended to ``self.test_results``.
        """
        print(f"\n运行测试: {test_case['name']}")
        result = self.run_smart(
            task=test_case['steps'],
            start_url=test_case.get('start_url')
        )
        verification = self.verify_result(test_case.get('expected_result', ''))
        # The verifier's reply is model-generated; tolerate a missing key.
        verified = bool(verification.get('success', False))
        error = result.get('error', '') or verification.get('error', '')
        if not error and not verified:
            # A failed verification explains itself under "reason".
            error = verification.get('reason', '')
        test_result = {
            "name": test_case['name'],
            "success": result['success'] and verified,
            "iterations": result['iterations'],
            "error": error,
            "screenshots": self._archive_screenshots(test_case['name'])
        }
        self.test_results.append(test_result)
        return test_result

    def _archive_screenshots(self, test_name: str) -> List:
        """Move this test's step_*.png files into a per-test subdirectory.

        Keeps the recorded paths valid after the shared screenshot
        directory is cleaned for the next test. Returns the new paths.
        """
        shots = sorted(self.screenshot_dir.glob("step_*.png"))
        if not shots:
            return []
        # Keep letters and digits, replace everything else, so the test
        # name is safe to use as a directory name.
        safe_name = "".join(ch if ch.isalnum() else "_" for ch in test_name) or "test"
        target_dir = self.screenshot_dir / safe_name
        target_dir.mkdir(parents=True, exist_ok=True)
        return [shot.replace(target_dir / shot.name) for shot in shots]

    def verify_result(self, expected: str) -> Dict:
        """Ask Claude whether the current page matches ``expected``.

        Returns:
            The parsed JSON object from the reply, or
            ``{"success": False, "error": ...}`` when it cannot be parsed.
        """
        screenshot = self.take_screenshot()
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/png",
                            "data": screenshot
                        }
                    },
                    {
                        "type": "text",
                        # The example uses double quotes: json.loads rejects
                        # single-quoted keys if the model mirrors the example.
                        "text": f'请验证当前页面是否符合预期:{expected}\n\n返回 JSON:{{"success": true/false, "reason": "说明"}}'
                    }
                ]
            }
        ]
        response = self.client.messages.create(
            model=self.model,
            max_tokens=512,
            messages=messages
        )
        parse_failure = {"success": False, "error": "验证解析失败"}
        try:
            content = response.content[0].text
            json_start = content.find("{")
            json_end = content.rfind("}") + 1
            parsed = json.loads(content[json_start:json_end])
        except (IndexError, AttributeError, TypeError, ValueError):
            # ValueError also covers json.JSONDecodeError.
            return parse_failure
        return parsed if isinstance(parsed, dict) else parse_failure

    def run_test_suite(self, test_cases: List[Dict]) -> Dict:
        """Run every test case in a headless browser and return the report.

        A test case that raises is recorded as failed instead of aborting
        the remaining cases.
        """
        self.start(headless=True)
        try:
            for test_case in test_cases:
                try:
                    self.run_test_case(test_case)
                except Exception as e:
                    name = test_case.get('name', '') if isinstance(test_case, dict) else ''
                    print(f"测试执行异常: {e}")
                    self.test_results.append({
                        "name": name,
                        "success": False,
                        "iterations": 0,
                        "error": str(e),
                        "screenshots": []
                    })
                # Remove leftover screenshots so the next test starts clean.
                for f in self.screenshot_dir.glob("step_*.png"):
                    f.unlink()
        finally:
            self.stop()
        return self.generate_report()

    def generate_report(self) -> Dict:
        """Summarise ``self.test_results``, print the summary, and return it."""
        total = len(self.test_results)
        passed = sum(1 for r in self.test_results if r['success'])
        failed = total - passed
        report = {
            "total": total,
            "passed": passed,
            "failed": failed,
            "pass_rate": passed / total if total > 0 else 0,
            "results": self.test_results
        }
        print(f"\n{'='*50}")
        print("测试报告")
        print(f"{'='*50}")
        print(f"总计: {total}")
        print(f"通过: {passed}")
        print(f"失败: {failed}")
        print(f"通过率: {report['pass_rate']*100:.1f}%")
        print(f"{'='*50}")
        for result in self.test_results:
            status = "✓" if result['success'] else "✗"
            print(f"{status} {result['name']}")
        return report
# 测试用例示例
# Sample suite: each case gives a name, a start page, the steps in natural
# language, and the state expected once the steps are done.
TEST_CASES = [
    dict(
        name="登录功能测试",
        start_url="https://example.com/login",
        steps="使用用户名 'test' 和密码 'password' 登录,验证是否成功跳转到首页",
        expected_result="页面显示欢迎信息,URL 变为 /dashboard",
    ),
    dict(
        name="搜索功能测试",
        start_url="https://example.com",
        steps="在搜索框输入 'AI Agent' 并搜索,验证结果页面显示相关结果",
        expected_result="搜索结果页面显示,包含 'AI Agent' 相关内容",
    ),
    dict(
        name="表单提交测试",
        start_url="https://example.com/contact",
        steps="填写联系表单(姓名、邮箱、消息),点击提交,验证成功提示",
        expected_result="显示提交成功消息 'Thank you for your message'",
    ),
]

# Entry point: run the whole sample suite.
if __name__ == "__main__":
    agent = TestAutomationAgent()
    report = agent.run_test_suite(TEST_CASES)
安全与限制
# security_utils.py
"""Computer Use 安全工具"""
import re
from typing import List, Optional
class SecurityGuard:
    """Pattern- and domain-based checks applied before an action runs."""

    # Regexes matched case-insensitively against the action text. The
    # letter lookarounds keep a keyword from matching inside a longer word
    # ("keyboard", "pseudo", "form -r") while still catching forms such as
    # "api_key".
    DANGEROUS_PATTERNS = [
        r"(?<![a-z])(rm|del|remove)\s+-[rf]",  # delete commands
        r"(?<![a-z])(curl|wget)\s+.*\|.*bash",  # download piped into bash
        r"(?<![a-z])(sudo|administrator)(?![a-z])",  # privilege escalation
        r"(?<![a-z])(passwords?|secrets?|keys?)(?![a-z])",  # sensitive data
    ]
    # Blocked hosts; subdomains of these are blocked as well.
    DANGEROUS_DOMAINS = [
        "malware.com",
        "phishing.com",
        # ...
    ]

    @classmethod
    def validate_action(cls, action: str, url: Optional[str] = None) -> bool:
        """Return True when ``action`` (and ``url``, if given) look safe.

        Args:
            action: Text describing the action, searched for dangerous
                patterns.
            url: Optional URL whose host is checked against the blocked
                domains.

        Returns:
            False as soon as a dangerous pattern or domain is found,
            True otherwise.
        """
        for pattern in cls.DANGEROUS_PATTERNS:
            if re.search(pattern, action, re.IGNORECASE):
                print(f"⚠️ 检测到危险操作: {action}")
                return False
        if url:
            domain = cls._match_dangerous_domain(url)
            if domain is not None:
                print(f"⚠️ 检测到危险域名: {domain}")
                return False
        return True

    @classmethod
    def _match_dangerous_domain(cls, url: str) -> Optional[str]:
        """Return the blocked domain that the host of ``url`` falls under, if any."""
        from urllib.parse import urlsplit

        candidate = url.strip()
        # urlsplit only recognises a host after "//"; add it for bare
        # inputs such as "example.com/path".
        if not re.match(r"[a-zA-Z][a-zA-Z0-9+.\-]*://|//", candidate):
            candidate = "//" + candidate
        try:
            host = (urlsplit(candidate).hostname or "").lower().rstrip(".")
        except ValueError:
            # Unparseable URL: fall back to a plain substring match rather
            # than letting it through unchecked.
            lowered = url.lower()
            return next(
                (d for d in cls.DANGEROUS_DOMAINS if d.lower() in lowered),
                None,
            )
        for domain in cls.DANGEROUS_DOMAINS:
            blocked = domain.lower()
            # Compare whole host labels so "notmalware.com" or a blocked
            # name inside a query string does not match.
            if host == blocked or host.endswith("." + blocked):
                return domain
        return None

    @classmethod
    def sanitize_input(cls, text: str) -> str:
        """Strip control characters and cap the text at 1000 characters.

        Tab, line feed and carriage return are kept.
        """
        text = re.sub(r'[\x00-\x08\x0b-\x0c\x0e-\x1f\x7f]', '', text)
        return text[:1000]
# 在 Agent 中使用
class SecureComputerUseAgent(ComputerUseAgent):
    """Agent that screens every action through SecurityGuard first."""

    def execute_action(self, action: ComputerAction) -> bool:
        """Sanitise and validate ``action``, then execute it.

        Returns:
            False when the action is blocked; otherwise whatever the base
            implementation returns.
        """
        # Sanitise before validating so control characters cannot be used
        # to split a dangerous keyword past the pattern check.
        if action.text:
            action.text = SecurityGuard.sanitize_input(action.text)
        # Validate only the model-supplied values. Serialising the whole
        # action would put the field name "key" into the checked string,
        # which the sensitive-word pattern matches on every action.
        payload = " ".join(v for v in (action.text, action.key) if v)
        page = getattr(self, "page", None)
        current_url = page.url if page is not None else None
        if not SecurityGuard.validate_action(payload, current_url):
            print("操作被安全策略阻止")
            return False
        return super().execute_action(action)
总结
Computer Use 是 AI Agent 的重要能力:
- 核心能力:视觉理解 + 界面操作
- 应用场景:自动化测试、数据采集、流程自动化
- 技术要点:截图分析、元素定位、循环决策
- 安全考虑:操作验证、输入清理、危险拦截
最佳实践:
- 使用元素选择器而非像素坐标
- 添加重试机制和错误处理
- 限制迭代次数防止无限循环
- 保存截图用于调试和审计
- 实施安全策略防止危险操作
局限性:
- 成本较高(每轮迭代都要上传一张截图并调用一次 API)
- 速度较慢(需要多次迭代)
- 对动态内容处理能力有限
- 不适合高频操作场景
相关资源: