什么是 Plan-and-Execute?
Plan-and-Execute 是一种让 AI Agent 先制定计划再执行任务的架构模式。与 ReAct 的”边想边做”不同,Plan-and-Execute 强调:
┌─────────────────────────────────────────────────────────────┐
│ Plan-and-Execute 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 任务输入 │ ──→ │ 制定计划 │ ──→ │ 执行步骤 │ │
│ │ (Task) │ │ (Plan) │ │ (Execute) │ │
│ └─────────────┘ └─────────────┘ └──────┬──────┘ │
│ │ │
│ ┌───────────────────────┘ │
│ ↓ │
│ ┌─────────────┐ │
│ │ 结果检查 │ │
│ │ (Verify) │ │
│ └──────┬──────┘ │
│ │ │
│ 未完成 ───────┴─────── 完成 │
│ ↓ ↓ │
│ ┌──────────┐ ┌──────────┐ │
│ │ 重新规划 │ │ 返回结果 │ │
│ │(Re-plan) │ │ (Output) │ │
│ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
核心优势:
- 全局视角:先规划后执行,避免局部最优
- 可解释性:计划清晰可见,便于调试
- 效率更高:减少重复推理,批量执行步骤
- 可回滚:失败时可从计划断点恢复
Plan-and-Execute vs ReAct
| 特性 | Plan-and-Execute | ReAct |
|---|---|---|
| 规划方式 | 先制定完整计划 | 边执行边规划 |
| 适用场景 | 复杂多步骤任务 | 简单交互式任务 |
| Token消耗 | 通常更少 | 逐步推理消耗多 |
| 可解释性 | 计划清晰可见 | 思维链较分散 |
| 灵活性 | 需要重新规划 | 自然适应变化 |
基础实现
1. 简单 Plan-and-Execute Agent
# plan_and_execute_agent.py
from typing import List, Dict, TypedDict, Annotated
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
import operator
class PlanStep(TypedDict):
"""计划步骤"""
step_number: int
description: str
tool: str # 使用的工具
expected_output: str
class AgentState(TypedDict):
"""Agent 状态"""
messages: Annotated[List[BaseMessage], operator.add]
plan: List[PlanStep]
current_step: int
results: Dict[int, str]
final_answer: str
class PlanAndExecuteAgent:
"""Plan-and-Execute Agent"""
def __init__(self, tools: List):
self.tools = {t.name: t for t in tools}
self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
self.planner = self._create_planner()
self.executor = self._create_executor()
self.graph = self._build_graph()
def _create_planner(self):
"""创建规划器"""
planner_prompt = """你是一个任务规划专家。请分析用户请求,制定详细的执行计划。
可用工具:
{tools_description}
用户请求:{input}
请制定一个详细的执行计划,以 JSON 格式返回:
{{
"plan": [
{{
"step_number": 1,
"description": "步骤描述",
"tool": "工具名称",
"expected_output": "预期输出"
}}
]
}}
要求:
1. 步骤要具体、可执行
2. 明确每个步骤使用的工具
3. 考虑步骤间的依赖关系
4. 预估每个步骤的输出
"""
return planner_prompt
def _create_executor(self):
"""创建执行器"""
return self.llm.bind_tools(list(self.tools.values()))
def _build_graph(self) -> StateGraph:
"""构建执行图"""
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("planner", self._plan_step)
workflow.add_node("executor", self._execute_step)
workflow.add_node("evaluator", self._evaluate_step)
workflow.add_node("replan", self._replan_step)
workflow.add_node("finish", self._finish)
# 添加边
workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "evaluator")
# 条件边:根据评估结果决定下一步
workflow.add_conditional_edges(
"evaluator",
self._should_continue,
{
"continue": "executor",
"replan": "replan",
"finish": "finish"
}
)
workflow.add_edge("replan", "executor")
workflow.add_edge("finish", END)
return workflow.compile(checkpointer=MemorySaver())
def _plan_step(self, state: AgentState) -> Dict:
"""规划步骤"""
# 获取用户输入
user_input = state["messages"][-1].content
# 构建工具描述
tools_desc = "\n".join([
f"- {name}: {tool.description}"
for name, tool in self.tools.items()
])
# 调用规划器
prompt = self._create_planner().format(
tools_description=tools_desc,
input=user_input
)
response = self.llm.invoke([HumanMessage(content=prompt)])
# 解析计划
import json
try:
plan_data = json.loads(response.content)
plan = plan_data.get("plan", [])
except:
# 如果解析失败,创建一个简单计划
plan = [{
"step_number": 1,
"description": user_input,
"tool": "default",
"expected_output": "任务结果"
}]
return {
"messages": [AIMessage(content=f"计划已制定:\n{json.dumps(plan, indent=2, ensure_ascii=False)}")],
"plan": plan,
"current_step": 0,
"results": {}
}
def _execute_step(self, state: AgentState) -> Dict:
"""执行当前步骤"""
plan = state["plan"]
current = state["current_step"]
if current >= len(plan):
return {"messages": [AIMessage(content="所有步骤已完成")]}
step = plan[current]
# 构建执行提示
execution_prompt = f"""执行以下计划步骤:
步骤 {step['step_number']}: {step['description']}
使用工具: {step['tool']}
预期输出: {step['expected_output']}
之前的执行结果:
{state['results']}
请执行此步骤并返回结果。
"""
# 调用执行器
response = self.executor.invoke([HumanMessage(content=execution_prompt)])
# 保存结果
results = state["results"].copy()
results[current] = response.content
return {
"messages": [response],
"results": results
}
def _evaluate_step(self, state: AgentState) -> Dict:
"""评估执行结果"""
plan = state["plan"]
current = state["current_step"]
results = state["results"]
if current >= len(plan):
return {"current_step": current, "should_finish": True}
step = plan[current]
result = results.get(current, "")
# 评估结果
eval_prompt = f"""评估以下执行结果:
步骤: {step['description']}
预期输出: {step['expected_output']}
实际输出: {result}
请判断:
1. 结果是否符合预期?
2. 是否需要重新规划?
3. 是否可以继续下一步?
返回 JSON:{{"status": "success/fail/replan", "reason": "原因"}}
"""
response = self.llm.invoke([HumanMessage(content=eval_prompt)])
# 解析评估结果
import json
try:
eval_data = json.loads(response.content)
status = eval_data.get("status", "success")
except:
status = "success"
# 决定下一步
if status == "replan":
return {"current_step": current, "should_replan": True}
elif status == "fail":
return {"current_step": current, "should_replan": True}
else:
# 成功,进入下一步
return {"current_step": current + 1, "should_continue": True}
def _should_continue(self, state: AgentState) -> str:
"""决定下一步"""
plan = state["plan"]
current = state["current_step"]
if current >= len(plan):
return "finish"
elif state.get("should_replan"):
return "replan"
else:
return "continue"
def _replan_step(self, state: AgentState) -> Dict:
"""重新规划"""
# 基于当前结果重新制定计划
user_input = state["messages"][0].content
current_results = state["results"]
replan_prompt = f"""基于以下执行结果,重新制定计划:
原始请求:{user_input}
已执行步骤结果:
{current_results}
剩余任务需要重新规划。请制定新的计划。
"""
response = self.llm.invoke([HumanMessage(content=replan_prompt)])
# 解析新计划
import json
try:
plan_data = json.loads(response.content)
new_plan = plan_data.get("plan", [])
except:
new_plan = []
return {
"messages": [AIMessage(content=f"计划已调整:\n{json.dumps(new_plan, indent=2, ensure_ascii=False)}")],
"plan": new_plan,
"should_replan": False
}
def _finish(self, state: AgentState) -> Dict:
"""完成任务"""
results = state["results"]
# 汇总结果
summary_prompt = f"""汇总以下执行结果,给出最终答案:
执行结果:
{results}
请给出清晰、完整的最终答案。
"""
response = self.llm.invoke([HumanMessage(content=summary_prompt)])
return {
"messages": [response],
"final_answer": response.content
}
def run(self, query: str) -> Dict:
"""运行 Agent"""
initial_state = {
"messages": [HumanMessage(content=query)],
"plan": [],
"current_step": 0,
"results": {},
"final_answer": ""
}
result = self.graph.invoke(
initial_state,
config={"configurable": {"thread_id": "1"}}
)
return result
# 定义工具
@tool
def search_web(query: str) -> str:
"""搜索网络信息"""
return f"搜索结果:关于 '{query}' 的相关信息..."
@tool
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return f"计算结果:{result}"
except:
return "计算错误"
@tool
def write_file(filename: str, content: str) -> str:
"""写入文件"""
return f"文件 '{filename}' 已写入"
# 使用示例
if __name__ == "__main__":
tools = [search_web, calculate, write_file]
agent = PlanAndExecuteAgent(tools)
result = agent.run("搜索2024年AI发展趋势,计算增长率,并将结果保存到文件")
print("最终答案:", result.get("final_answer"))
2. 使用 LangGraph 的简化实现
# simplified_plan_execute.py
from typing import List, TypedDict
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
import json
class State(TypedDict):
input: str
plan: List[str]
past_steps: List[tuple]
response: str
# 创建 LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# 规划器
planner_prompt = """对于给定目标,制定一个简单的逐步计划。
计划应包含独立的任务,如果正确执行,将得出正确答案。
不要添加任何多余的步骤。
目标:{input}
以 JSON 格式返回计划:{{"steps": ["步骤1", "步骤2", ...]}}"""
def planner(state: State):
"""制定计划"""
prompt = planner_prompt.format(input=state["input"])
response = llm.invoke([HumanMessage(content=prompt)])
try:
plan = json.loads(response.content)
steps = plan.get("steps", [])
except:
steps = [state["input"]]
return {"plan": steps}
# 执行器
executor_prompt = """执行以下任务:
任务:{step}
之前的执行结果:
{past_steps}
请执行任务并返回结果。"""
def executor(state: State):
"""执行步骤"""
if not state["plan"]:
return {"response": "计划为空"}
current_step = state["plan"][0]
past_steps_str = "\n".join([
f"步骤:{s}\n结果:{r}"
for s, r in state["past_steps"]
])
prompt = executor_prompt.format(
step=current_step,
past_steps=past_steps_str
)
response = llm.invoke([HumanMessage(content=prompt)])
# 更新状态
new_past_steps = state["past_steps"] + [(current_step, response.content)]
new_plan = state["plan"][1:]
return {
"past_steps": new_past_steps,
"plan": new_plan
}
# 重新规划器
replan_prompt = """你的任务是:
1. 根据原始目标判断当前进度
2. 决定是否需要调整计划
原始目标:{input}
已完成步骤:
{past_steps}
剩余计划:{plan}
如果需要调整,请返回新的计划。
以 JSON 格式返回:{{"plan": ["新步骤1", "新步骤2"]}}"""
def replanner(state: State):
"""重新规划"""
if not state["plan"]:
return {"response": "任务完成"}
past_steps_str = "\n".join([
f"步骤:{s}\n结果:{r}"
for s, r in state["past_steps"]
])
prompt = replan_prompt.format(
input=state["input"],
past_steps=past_steps_str,
plan=state["plan"]
)
response = llm.invoke([HumanMessage(content=prompt)])
try:
new_plan = json.loads(response.content)
steps = new_plan.get("plan", state["plan"])
except:
steps = state["plan"]
return {"plan": steps}
# 构建图
workflow = StateGraph(State)
workflow.add_node("planner", planner)
workflow.add_node("executor", executor)
workflow.add_node("replan", replanner)
workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "replan")
# 条件边:检查是否完成
def should_end(state: State):
if not state["plan"] or state.get("response"):
return END
return "executor"
workflow.add_conditional_edges("replan", should_end)
app = workflow.compile()
# 运行
result = app.invoke({
"input": "研究2024年AI Agent发展趋势,总结3个关键方向",
"plan": [],
"past_steps": [],
"response": ""
})
print("执行结果:")
for step, result_text in result["past_steps"]:
print(f"\n【{step}】")
print(result_text)
高级功能
1. 分层规划
# hierarchical_planning.py
from typing import List, Dict
from dataclasses import dataclass
@dataclass
class Task:
"""任务定义"""
id: str
description: str
subtasks: List['Task'] = None
dependencies: List[str] = None
status: str = "pending" # pending, in_progress, completed, failed
def __post_init__(self):
if self.subtasks is None:
self.subtasks = []
if self.dependencies is None:
self.dependencies = []
class HierarchicalPlanner:
"""分层规划器"""
def __init__(self, llm):
self.llm = llm
def create_high_level_plan(self, goal: str) -> List[Task]:
"""创建高层计划"""
prompt = f"""将以下目标分解为3-5个主要阶段:
目标:{goal}
以 JSON 格式返回:
{{
"phases": [
{{"id": "phase1", "description": "阶段描述"}},
...
]
}}"""
response = self.llm.invoke([HumanMessage(content=prompt)])
try:
data = json.loads(response.content)
phases = data.get("phases", [])
return [
Task(id=p["id"], description=p["description"])
for p in phases
]
except:
return [Task(id="1", description=goal)]
def expand_task(self, task: Task) -> Task:
"""展开任务为子任务"""
prompt = f"""将以下任务分解为具体的子任务:
任务:{task.description}
要求:
1. 每个子任务应可独立执行
2. 明确子任务间的依赖关系
3. 最多5个子任务
以 JSON 格式返回:
{{
"subtasks": [
{{
"id": "subtask1",
"description": "子任务描述",
"dependencies": []
}}
]
}}"""
response = self.llm.invoke([HumanMessage(content=prompt)])
try:
data = json.loads(response.content)
subtasks_data = data.get("subtasks", [])
task.subtasks = [
Task(
id=f"{task.id}.{st['id']}",
description=st["description"],
dependencies=st.get("dependencies", [])
)
for st in subtasks_data
]
except:
pass
return task
def execute_hierarchical(self, goal: str, executor_func):
"""分层执行"""
# 1. 创建高层计划
high_level_plan = self.create_high_level_plan(goal)
print(f"高层计划:{len(high_level_plan)} 个阶段")
results = {}
for phase in high_level_plan:
print(f"\n{'='*50}")
print(f"执行阶段:{phase.description}")
print(f"{'='*50}")
# 2. 展开为子任务
phase = self.expand_task(phase)
# 3. 按依赖顺序执行子任务
completed = set()
for subtask in phase.subtasks:
# 检查依赖
if not all(dep in completed for dep in subtask.dependencies):
print(f"等待依赖:{subtask.dependencies}")
continue
print(f"\n执行子任务:{subtask.description}")
result = executor_func(subtask.description)
results[subtask.id] = result
completed.add(subtask.id)
print(f"结果:{result[:100]}...")
return results
2. 动态重新规划
# dynamic_replanning.py
class DynamicReplanningAgent:
"""支持动态重新规划的 Agent"""
def __init__(self, llm, tools):
self.llm = llm
self.tools = tools
self.execution_history = []
def should_replan(self, state: Dict) -> bool:
"""判断是否需要重新规划"""
prompt = f"""基于以下执行情况,判断是否需要调整计划:
原始计划:
{state['plan']}
当前步骤:{state['current_step']}
最近执行结果:{state['last_result']}
考虑因素:
1. 执行结果是否符合预期?
2. 是否出现了新的信息需要调整计划?
3. 当前路径是否无法达成目标?
返回 JSON:{{"should_replan": true/false, "reason": "原因"}}"""
response = self.llm.invoke([HumanMessage(content=prompt)])
try:
result = json.loads(response.content)
return result.get("should_replan", False)
except:
return False
def replan(self, state: Dict) -> List[Dict]:
"""重新制定计划"""
prompt = f"""基于以下信息,制定新的执行计划:
原始目标:{state['goal']}
已完成步骤:{state['completed_steps']}
已获得的成果:{state['results']}
遇到的问题:{state.get('issues', '无')}
要求:
1. 充分利用已完成的成果
2. 避开已发现的问题
3. 制定更可行的路径
以 JSON 格式返回新的计划。"""
response = self.llm.invoke([HumanMessage(content=prompt)])
try:
new_plan = json.loads(response.content)
return new_plan.get("plan", [])
except:
return state["plan"]
最佳实践
1. 计划粒度控制
def optimize_plan_granularity(steps: List[str], max_steps: int = 10) -> List[str]:
"""优化计划粒度"""
if len(steps) <= max_steps:
return steps
# 合并过细的步骤
prompt = f"""将以下 {len(steps)} 个步骤合并为不超过 {max_steps} 个主要步骤:
步骤:
{chr(10).join(f"{i+1}. {step}" for i, step in enumerate(steps))}
要求:
1. 保持逻辑完整性
2. 每个步骤可独立执行
3. 步骤间有清晰的依赖关系
以列表形式返回合并后的步骤。"""
response = llm.invoke([HumanMessage(content=prompt)])
# 解析结果
merged_steps = [line.strip("- ") for line in response.content.split("\n") if line.strip()]
return merged_steps[:max_steps]
2. 执行监控
class ExecutionMonitor:
"""执行监控器"""
def __init__(self):
self.metrics = {
"total_steps": 0,
"completed_steps": 0,
"replan_count": 0,
"execution_time": 0
}
def log_step(self, step: Dict, result: str):
"""记录步骤执行"""
self.metrics["total_steps"] += 1
log_entry = {
"timestamp": time.time(),
"step": step,
"result": result[:200], # 截断
"success": "error" not in result.lower()
}
print(f"[Step {self.metrics['total_steps']}] {step.get('description', 'Unknown')}")
return log_entry
def get_report(self) -> Dict:
"""生成执行报告"""
return {
**self.metrics,
"success_rate": self.metrics["completed_steps"] / max(self.metrics["total_steps"], 1),
"avg_time_per_step": self.metrics["execution_time"] / max(self.metrics["total_steps"], 1)
}
应用场景
| 场景 | 说明 | 示例 |
|---|---|---|
| 数据分析 | 多步骤数据处理和分析 | 收集数据→清洗→分析→可视化 |
| 内容创作 | 结构化内容生成 | 大纲→章节→润色→排版 |
| 软件开发 | 复杂任务分解 | 需求→设计→编码→测试→部署 |
| 研究调研 | 多源信息整合 | 搜索→筛选→总结→报告 |
总结
Plan-and-Execute 模式的核心要点:
- 先规划后执行:避免盲目尝试,提高成功率
- 灵活调整:根据实际情况动态重新规划
- 可观测性:计划清晰,便于调试和优化
- 错误恢复:支持从断点恢复,提高鲁棒性
适用场景:
- 复杂多步骤任务
- 需要全局优化的场景
- 对可解释性要求高的场景
不适用场景:
- 简单单步任务
- 需要快速响应的交互场景
- 高度动态变化的环境
相关资源: