AI Agent 进阶 Plan-and-Execute AI Agent 任务规划 LangChain

Plan-and-Execute 模式:让 AI Agent 学会规划

AIEng Hub
阅读约 25 分钟

什么是 Plan-and-Execute?

Plan-and-Execute 是一种让 AI Agent 先制定计划再执行任务的架构模式。与 ReAct 的”边想边做”不同,Plan-and-Execute 强调:

┌─────────────────────────────────────────────────────────────┐
│                  Plan-and-Execute 架构                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────┐     ┌─────────────┐     ┌─────────────┐ │
│   │   任务输入   │ ──→ │   制定计划   │ ──→ │   执行步骤   │ │
│   │   (Task)    │     │   (Plan)    │     │  (Execute)  │ │
│   └─────────────┘     └─────────────┘     └──────┬──────┘ │
│                                                    │        │
│                            ┌───────────────────────┘        │
│                            ↓                                │
│                     ┌─────────────┐                        │
│                     │  结果检查   │                        │
│                     │  (Verify)   │                        │
│                     └──────┬──────┘                        │
│                            │                                │
│              未完成 ───────┴─────── 完成                   │
│                  ↓                      ↓                   │
│           ┌──────────┐          ┌──────────┐              │
│           │ 重新规划  │          │ 返回结果  │              │
│           │(Re-plan) │          │ (Output) │              │
│           └──────────┘          └──────────┘              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

核心优势:

  • 全局视角:先规划后执行,避免局部最优
  • 可解释性:计划清晰可见,便于调试
  • 效率更高:减少重复推理,批量执行步骤
  • 可回滚:失败时可从计划断点恢复

Plan-and-Execute vs ReAct

特性Plan-and-ExecuteReAct
规划方式先制定完整计划边执行边规划
适用场景复杂多步骤任务简单交互式任务
Token消耗通常更少逐步推理消耗多
可解释性计划清晰可见思维链较分散
灵活性需要重新规划自然适应变化

基础实现

1. 简单 Plan-and-Execute Agent

# plan_and_execute_agent.py
from typing import List, Dict, TypedDict, Annotated
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
import operator

class PlanStep(TypedDict):
    """计划步骤"""
    step_number: int
    description: str
    tool: str  # 使用的工具
    expected_output: str

class AgentState(TypedDict):
    """Agent 状态"""
    messages: Annotated[List[BaseMessage], operator.add]
    plan: List[PlanStep]
    current_step: int
    results: Dict[int, str]
    final_answer: str

class PlanAndExecuteAgent:
    """Plan-and-Execute Agent"""
    
    def __init__(self, tools: List):
        self.tools = {t.name: t for t in tools}
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.planner = self._create_planner()
        self.executor = self._create_executor()
        self.graph = self._build_graph()
    
    def _create_planner(self):
        """创建规划器"""
        planner_prompt = """你是一个任务规划专家。请分析用户请求,制定详细的执行计划。

可用工具:
{tools_description}

用户请求:{input}

请制定一个详细的执行计划,以 JSON 格式返回:
{{
    "plan": [
        {{
            "step_number": 1,
            "description": "步骤描述",
            "tool": "工具名称",
            "expected_output": "预期输出"
        }}
    ]
}}

要求:
1. 步骤要具体、可执行
2. 明确每个步骤使用的工具
3. 考虑步骤间的依赖关系
4. 预估每个步骤的输出
"""
        return planner_prompt
    
    def _create_executor(self):
        """创建执行器"""
        return self.llm.bind_tools(list(self.tools.values()))
    
    def _build_graph(self) -> StateGraph:
        """构建执行图"""
        workflow = StateGraph(AgentState)
        
        # 添加节点
        workflow.add_node("planner", self._plan_step)
        workflow.add_node("executor", self._execute_step)
        workflow.add_node("evaluator", self._evaluate_step)
        workflow.add_node("replan", self._replan_step)
        workflow.add_node("finish", self._finish)
        
        # 添加边
        workflow.set_entry_point("planner")
        workflow.add_edge("planner", "executor")
        workflow.add_edge("executor", "evaluator")
        
        # 条件边:根据评估结果决定下一步
        workflow.add_conditional_edges(
            "evaluator",
            self._should_continue,
            {
                "continue": "executor",
                "replan": "replan",
                "finish": "finish"
            }
        )
        
        workflow.add_edge("replan", "executor")
        workflow.add_edge("finish", END)
        
        return workflow.compile(checkpointer=MemorySaver())
    
    def _plan_step(self, state: AgentState) -> Dict:
        """规划步骤"""
        # 获取用户输入
        user_input = state["messages"][-1].content
        
        # 构建工具描述
        tools_desc = "\n".join([
            f"- {name}: {tool.description}"
            for name, tool in self.tools.items()
        ])
        
        # 调用规划器
        prompt = self._create_planner().format(
            tools_description=tools_desc,
            input=user_input
        )
        
        response = self.llm.invoke([HumanMessage(content=prompt)])
        
        # 解析计划
        import json
        try:
            plan_data = json.loads(response.content)
            plan = plan_data.get("plan", [])
        except:
            # 如果解析失败,创建一个简单计划
            plan = [{
                "step_number": 1,
                "description": user_input,
                "tool": "default",
                "expected_output": "任务结果"
            }]
        
        return {
            "messages": [AIMessage(content=f"计划已制定:\n{json.dumps(plan, indent=2, ensure_ascii=False)}")],
            "plan": plan,
            "current_step": 0,
            "results": {}
        }
    
    def _execute_step(self, state: AgentState) -> Dict:
        """执行当前步骤"""
        plan = state["plan"]
        current = state["current_step"]
        
        if current >= len(plan):
            return {"messages": [AIMessage(content="所有步骤已完成")]}
        
        step = plan[current]
        
        # 构建执行提示
        execution_prompt = f"""执行以下计划步骤:

步骤 {step['step_number']}: {step['description']}
使用工具: {step['tool']}
预期输出: {step['expected_output']}

之前的执行结果:
{state['results']}

请执行此步骤并返回结果。
"""
        
        # 调用执行器
        response = self.executor.invoke([HumanMessage(content=execution_prompt)])
        
        # 保存结果
        results = state["results"].copy()
        results[current] = response.content
        
        return {
            "messages": [response],
            "results": results
        }
    
    def _evaluate_step(self, state: AgentState) -> Dict:
        """评估执行结果"""
        plan = state["plan"]
        current = state["current_step"]
        results = state["results"]
        
        if current >= len(plan):
            return {"current_step": current, "should_finish": True}
        
        step = plan[current]
        result = results.get(current, "")
        
        # 评估结果
        eval_prompt = f"""评估以下执行结果:

步骤: {step['description']}
预期输出: {step['expected_output']}
实际输出: {result}

请判断:
1. 结果是否符合预期?
2. 是否需要重新规划?
3. 是否可以继续下一步?

返回 JSON:{{"status": "success/fail/replan", "reason": "原因"}}
"""
        
        response = self.llm.invoke([HumanMessage(content=eval_prompt)])
        
        # 解析评估结果
        import json
        try:
            eval_data = json.loads(response.content)
            status = eval_data.get("status", "success")
        except:
            status = "success"
        
        # 决定下一步
        if status == "replan":
            return {"current_step": current, "should_replan": True}
        elif status == "fail":
            return {"current_step": current, "should_replan": True}
        else:
            # 成功,进入下一步
            return {"current_step": current + 1, "should_continue": True}
    
    def _should_continue(self, state: AgentState) -> str:
        """决定下一步"""
        plan = state["plan"]
        current = state["current_step"]
        
        if current >= len(plan):
            return "finish"
        elif state.get("should_replan"):
            return "replan"
        else:
            return "continue"
    
    def _replan_step(self, state: AgentState) -> Dict:
        """重新规划"""
        # 基于当前结果重新制定计划
        user_input = state["messages"][0].content
        current_results = state["results"]
        
        replan_prompt = f"""基于以下执行结果,重新制定计划:

原始请求:{user_input}
已执行步骤结果:
{current_results}

剩余任务需要重新规划。请制定新的计划。
"""
        
        response = self.llm.invoke([HumanMessage(content=replan_prompt)])
        
        # 解析新计划
        import json
        try:
            plan_data = json.loads(response.content)
            new_plan = plan_data.get("plan", [])
        except:
            new_plan = []
        
        return {
            "messages": [AIMessage(content=f"计划已调整:\n{json.dumps(new_plan, indent=2, ensure_ascii=False)}")],
            "plan": new_plan,
            "should_replan": False
        }
    
    def _finish(self, state: AgentState) -> Dict:
        """完成任务"""
        results = state["results"]
        
        # 汇总结果
        summary_prompt = f"""汇总以下执行结果,给出最终答案:

执行结果:
{results}

请给出清晰、完整的最终答案。
"""
        
        response = self.llm.invoke([HumanMessage(content=summary_prompt)])
        
        return {
            "messages": [response],
            "final_answer": response.content
        }
    
    def run(self, query: str) -> Dict:
        """运行 Agent"""
        initial_state = {
            "messages": [HumanMessage(content=query)],
            "plan": [],
            "current_step": 0,
            "results": {},
            "final_answer": ""
        }
        
        result = self.graph.invoke(
            initial_state,
            config={"configurable": {"thread_id": "1"}}
        )
        
        return result


# 定义工具
@tool
def search_web(query: str) -> str:
    """搜索网络信息"""
    return f"搜索结果:关于 '{query}' 的相关信息..."

@tool
def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        result = eval(expression)
        return f"计算结果:{result}"
    except:
        return "计算错误"

@tool
def write_file(filename: str, content: str) -> str:
    """写入文件"""
    return f"文件 '{filename}' 已写入"


# 使用示例
if __name__ == "__main__":
    tools = [search_web, calculate, write_file]
    agent = PlanAndExecuteAgent(tools)
    
    result = agent.run("搜索2024年AI发展趋势,计算增长率,并将结果保存到文件")
    print("最终答案:", result.get("final_answer"))

2. 使用 LangGraph 的简化实现

# simplified_plan_execute.py
from typing import List, TypedDict
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
import json

class State(TypedDict):
    input: str
    plan: List[str]
    past_steps: List[tuple]
    response: str

# 创建 LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# 规划器
planner_prompt = """对于给定目标,制定一个简单的逐步计划。
计划应包含独立的任务,如果正确执行,将得出正确答案。
不要添加任何多余的步骤。

目标:{input}

以 JSON 格式返回计划:{{"steps": ["步骤1", "步骤2", ...]}}"""

def planner(state: State):
    """制定计划"""
    prompt = planner_prompt.format(input=state["input"])
    response = llm.invoke([HumanMessage(content=prompt)])
    
    try:
        plan = json.loads(response.content)
        steps = plan.get("steps", [])
    except:
        steps = [state["input"]]
    
    return {"plan": steps}

# 执行器
executor_prompt = """执行以下任务:

任务:{step}

之前的执行结果:
{past_steps}

请执行任务并返回结果。"""

def executor(state: State):
    """执行步骤"""
    if not state["plan"]:
        return {"response": "计划为空"}
    
    current_step = state["plan"][0]
    past_steps_str = "\n".join([
        f"步骤:{s}\n结果:{r}"
        for s, r in state["past_steps"]
    ])
    
    prompt = executor_prompt.format(
        step=current_step,
        past_steps=past_steps_str
    )
    
    response = llm.invoke([HumanMessage(content=prompt)])
    
    # 更新状态
    new_past_steps = state["past_steps"] + [(current_step, response.content)]
    new_plan = state["plan"][1:]
    
    return {
        "past_steps": new_past_steps,
        "plan": new_plan
    }

# 重新规划器
replan_prompt = """你的任务是:
1. 根据原始目标判断当前进度
2. 决定是否需要调整计划

原始目标:{input}
已完成步骤:
{past_steps}
剩余计划:{plan}

如果需要调整,请返回新的计划。
以 JSON 格式返回:{{"plan": ["新步骤1", "新步骤2"]}}"""

def replanner(state: State):
    """重新规划"""
    if not state["plan"]:
        return {"response": "任务完成"}
    
    past_steps_str = "\n".join([
        f"步骤:{s}\n结果:{r}"
        for s, r in state["past_steps"]
    ])
    
    prompt = replan_prompt.format(
        input=state["input"],
        past_steps=past_steps_str,
        plan=state["plan"]
    )
    
    response = llm.invoke([HumanMessage(content=prompt)])
    
    try:
        new_plan = json.loads(response.content)
        steps = new_plan.get("plan", state["plan"])
    except:
        steps = state["plan"]
    
    return {"plan": steps}

# 构建图
workflow = StateGraph(State)

workflow.add_node("planner", planner)
workflow.add_node("executor", executor)
workflow.add_node("replan", replanner)

workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "replan")

# 条件边:检查是否完成
def should_end(state: State):
    if not state["plan"] or state.get("response"):
        return END
    return "executor"

workflow.add_conditional_edges("replan", should_end)

app = workflow.compile()

# 运行
result = app.invoke({
    "input": "研究2024年AI Agent发展趋势,总结3个关键方向",
    "plan": [],
    "past_steps": [],
    "response": ""
})

print("执行结果:")
for step, result_text in result["past_steps"]:
    print(f"\n{step}】")
    print(result_text)

高级功能

1. 分层规划

# hierarchical_planning.py
from typing import List, Dict
from dataclasses import dataclass

@dataclass
class Task:
    """任务定义"""
    id: str
    description: str
    subtasks: List['Task'] = None
    dependencies: List[str] = None
    status: str = "pending"  # pending, in_progress, completed, failed
    
    def __post_init__(self):
        if self.subtasks is None:
            self.subtasks = []
        if self.dependencies is None:
            self.dependencies = []

class HierarchicalPlanner:
    """分层规划器"""
    
    def __init__(self, llm):
        self.llm = llm
    
    def create_high_level_plan(self, goal: str) -> List[Task]:
        """创建高层计划"""
        prompt = f"""将以下目标分解为3-5个主要阶段:

目标:{goal}

以 JSON 格式返回:
{{
    "phases": [
        {{"id": "phase1", "description": "阶段描述"}},
        ...
    ]
}}"""
        
        response = self.llm.invoke([HumanMessage(content=prompt)])
        
        try:
            data = json.loads(response.content)
            phases = data.get("phases", [])
            return [
                Task(id=p["id"], description=p["description"])
                for p in phases
            ]
        except:
            return [Task(id="1", description=goal)]
    
    def expand_task(self, task: Task) -> Task:
        """展开任务为子任务"""
        prompt = f"""将以下任务分解为具体的子任务:

任务:{task.description}

要求:
1. 每个子任务应可独立执行
2. 明确子任务间的依赖关系
3. 最多5个子任务

以 JSON 格式返回:
{{
    "subtasks": [
        {{
            "id": "subtask1",
            "description": "子任务描述",
            "dependencies": []
        }}
    ]
}}"""
        
        response = self.llm.invoke([HumanMessage(content=prompt)])
        
        try:
            data = json.loads(response.content)
            subtasks_data = data.get("subtasks", [])
            task.subtasks = [
                Task(
                    id=f"{task.id}.{st['id']}",
                    description=st["description"],
                    dependencies=st.get("dependencies", [])
                )
                for st in subtasks_data
            ]
        except:
            pass
        
        return task
    
    def execute_hierarchical(self, goal: str, executor_func):
        """分层执行"""
        # 1. 创建高层计划
        high_level_plan = self.create_high_level_plan(goal)
        print(f"高层计划:{len(high_level_plan)} 个阶段")
        
        results = {}
        
        for phase in high_level_plan:
            print(f"\n{'='*50}")
            print(f"执行阶段:{phase.description}")
            print(f"{'='*50}")
            
            # 2. 展开为子任务
            phase = self.expand_task(phase)
            
            # 3. 按依赖顺序执行子任务
            completed = set()
            for subtask in phase.subtasks:
                # 检查依赖
                if not all(dep in completed for dep in subtask.dependencies):
                    print(f"等待依赖:{subtask.dependencies}")
                    continue
                
                print(f"\n执行子任务:{subtask.description}")
                result = executor_func(subtask.description)
                results[subtask.id] = result
                completed.add(subtask.id)
                
                print(f"结果:{result[:100]}...")
        
        return results

2. 动态重新规划

# dynamic_replanning.py
class DynamicReplanningAgent:
    """支持动态重新规划的 Agent"""
    
    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = tools
        self.execution_history = []
    
    def should_replan(self, state: Dict) -> bool:
        """判断是否需要重新规划"""
        prompt = f"""基于以下执行情况,判断是否需要调整计划:

原始计划:
{state['plan']}

当前步骤:{state['current_step']}
最近执行结果:{state['last_result']}

考虑因素:
1. 执行结果是否符合预期?
2. 是否出现了新的信息需要调整计划?
3. 当前路径是否无法达成目标?

返回 JSON:{{"should_replan": true/false, "reason": "原因"}}"""
        
        response = self.llm.invoke([HumanMessage(content=prompt)])
        
        try:
            result = json.loads(response.content)
            return result.get("should_replan", False)
        except:
            return False
    
    def replan(self, state: Dict) -> List[Dict]:
        """重新制定计划"""
        prompt = f"""基于以下信息,制定新的执行计划:

原始目标:{state['goal']}
已完成步骤:{state['completed_steps']}
已获得的成果:{state['results']}
遇到的问题:{state.get('issues', '')}

要求:
1. 充分利用已完成的成果
2. 避开已发现的问题
3. 制定更可行的路径

以 JSON 格式返回新的计划。"""
        
        response = self.llm.invoke([HumanMessage(content=prompt)])
        
        try:
            new_plan = json.loads(response.content)
            return new_plan.get("plan", [])
        except:
            return state["plan"]

最佳实践

1. 计划粒度控制

def optimize_plan_granularity(steps: List[str], max_steps: int = 10) -> List[str]:
    """优化计划粒度"""
    if len(steps) <= max_steps:
        return steps
    
    # 合并过细的步骤
    prompt = f"""将以下 {len(steps)} 个步骤合并为不超过 {max_steps} 个主要步骤:

步骤:
{chr(10).join(f"{i+1}. {step}" for i, step in enumerate(steps))}

要求:
1. 保持逻辑完整性
2. 每个步骤可独立执行
3. 步骤间有清晰的依赖关系

以列表形式返回合并后的步骤。"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    
    # 解析结果
    merged_steps = [line.strip("- ") for line in response.content.split("\n") if line.strip()]
    return merged_steps[:max_steps]

2. 执行监控

class ExecutionMonitor:
    """执行监控器"""
    
    def __init__(self):
        self.metrics = {
            "total_steps": 0,
            "completed_steps": 0,
            "replan_count": 0,
            "execution_time": 0
        }
    
    def log_step(self, step: Dict, result: str):
        """记录步骤执行"""
        self.metrics["total_steps"] += 1
        
        log_entry = {
            "timestamp": time.time(),
            "step": step,
            "result": result[:200],  # 截断
            "success": "error" not in result.lower()
        }
        
        print(f"[Step {self.metrics['total_steps']}] {step.get('description', 'Unknown')}")
        
        return log_entry
    
    def get_report(self) -> Dict:
        """生成执行报告"""
        return {
            **self.metrics,
            "success_rate": self.metrics["completed_steps"] / max(self.metrics["total_steps"], 1),
            "avg_time_per_step": self.metrics["execution_time"] / max(self.metrics["total_steps"], 1)
        }

应用场景

场景说明示例
数据分析多步骤数据处理和分析收集数据→清洗→分析→可视化
内容创作结构化内容生成大纲→章节→润色→排版
软件开发复杂任务分解需求→设计→编码→测试→部署
研究调研多源信息整合搜索→筛选→总结→报告

总结

Plan-and-Execute 模式的核心要点:

  1. 先规划后执行:避免盲目尝试,提高成功率
  2. 灵活调整:根据实际情况动态重新规划
  3. 可观测性:计划清晰,便于调试和优化
  4. 错误恢复:支持从断点恢复,提高鲁棒性

适用场景:

  • 复杂多步骤任务
  • 需要全局优化的场景
  • 对可解释性要求高的场景

不适用场景:

  • 简单单步任务
  • 需要快速响应的交互场景
  • 高度动态变化的环境

相关资源: