AI Agent 高级 LangGraph Agent 工作流 状态机

LangGraph Agent 工作流编排:从状态机到多 Agent 协作系统

AIEng Hub
阅读约 40 分钟

引言

LangGraph 是 LangChain 团队推出的 Agent 编排框架,它将 Agent 执行建模为状态图(State Graph),支持循环、条件分支、并行执行等复杂工作流。

相比传统的链式调用(Chain)或简单的 Agent 循环,LangGraph 提供了:

  • 精确的状态管理:每个步骤的状态变化清晰可见
  • 持久化支持:支持断点续传、人机协作
  • 可视化调试:可以查看完整的执行轨迹
  • 多 Agent 协作:支持 Supervisor、Hierarchical 等模式

本文将深入讲解 LangGraph 的核心概念,并通过实战案例展示如何构建生产级 Agent 系统。

LangGraph 核心概念

1. 状态机模型

┌─────────────────────────────────────────────────────────────┐
│                  LangGraph 状态机模型                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────┐      action       ┌─────────┐               │
│   │  State  │ ────────────────→ │  State' │               │
│   │  (t)    │                   │  (t+1)  │               │
│   └────┬────┘                   └────┬────┘               │
│        │                              │                     │
│        │  messages: [...]             │  messages: [...]   │
│        │  next_node: "agent"        │  next_node: "tool" │
│        │  tool_calls: [...]         │  tool_results: []  │
│        │                              │                     │
│        └──────────────────────────────┘                     │
│                      状态流转                                │
│                                                             │
│   关键特性:                                                 │
│   • 每个节点接收完整 State,返回更新                         │
│   • 支持条件边(Conditional Edges)                          │
│   • 支持循环(Cycles)                                       │
│   • 内置持久化(Persistence)                                │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. 基础架构

from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages

# 定义状态
class AgentState(TypedDict):
    """Agent 状态定义"""
    messages: Annotated[Sequence[BaseMessage], add_messages]
    # add_messages 会自动追加消息,而不是覆盖
    next_node: str  # 下一个要执行的节点
    iteration_count: int  # 迭代计数(防止无限循环)

# 创建图
workflow = StateGraph(AgentState)

# 定义节点
def agent_node(state: AgentState):
    """Agent 决策节点"""
    messages = state["messages"]
    
    # 调用 LLM
    response = llm.invoke(messages)
    
    return {
        "messages": [response],
        "next_node": "tools" if response.tool_calls else END,
        "iteration_count": state.get("iteration_count", 0) + 1
    }

def tools_node(state: AgentState):
    """工具执行节点"""
    messages = state["messages"]
    last_message = messages[-1]
    
    # 执行工具调用
    tool_results = []
    for tool_call in last_message.tool_calls:
        result = execute_tool(tool_call)
        tool_results.append(result)
    
    return {
        "messages": tool_results,
        "next_node": "agent"
    }

# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tools_node)

# 添加边
workflow.add_conditional_edges(
    "agent",
    lambda state: state["next_node"],
    {"tools": "tools", END: END}
)
workflow.add_edge("tools", "agent")

# 设置入口
workflow.set_entry_point("agent")

# 编译
app = workflow.compile()

# 运行
result = app.invoke({
    "messages": [HumanMessage(content="搜索最新的 AI 新闻")]
})

实战一:智能研究助手

构建一个能够自主进行多轮搜索、分析和总结的研究助手。

# research_assistant.py
from typing import TypedDict, Annotated, List, Dict
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools import DuckDuckGoSearchRun
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
import json

class ResearchState(TypedDict):
    """研究助手状态"""
    messages: Annotated[List[BaseMessage], add_messages]
    research_topic: str
    search_queries: List[str]
    search_results: List[Dict]
    analysis: str
    final_report: str
    iteration: int
    max_iterations: int

# 初始化
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
search_tool = DuckDuckGoSearchRun()

# 节点 1:规划搜索策略
def planner_node(state: ResearchState) -> ResearchState:
    """规划搜索策略,生成多个搜索查询"""
    topic = state["research_topic"]
    
    prompt = f"""你是一个研究规划专家。请为以下主题生成 3-5 个不同的搜索查询,
    确保覆盖主题的不同方面。
    
    主题:{topic}
    
    请返回 JSON 格式:
    {{"queries": ["查询1", "查询2", "查询3"]}}
    """
    
    response = llm.invoke([HumanMessage(content=prompt)])
    
    try:
        queries = json.loads(response.content)["queries"]
    except:
        queries = [topic]  # 回退方案
    
    return {
        "messages": [AIMessage(content=f"规划搜索查询: {queries}")],
        "search_queries": queries,
        "iteration": state.get("iteration", 0) + 1
    }

# 节点 2:执行搜索
def search_node(state: ResearchState) -> ResearchState:
    """并行执行多个搜索查询"""
    queries = state["search_queries"]
    results = []
    
    for query in queries:
        try:
            result = search_tool.invoke(query)
            results.append({
                "query": query,
                "result": result,
                "source": "duckduckgo"
            })
        except Exception as e:
            results.append({
                "query": query,
                "result": f"搜索失败: {str(e)}",
                "source": "error"
            })
    
    return {
        "messages": [AIMessage(content=f"完成 {len(results)} 个搜索")],
        "search_results": results
    }

# 节点 3:分析搜索结果
def analyzer_node(state: ResearchState) -> ResearchState:
    """分析搜索结果,提取关键信息"""
    results = state["search_results"]
    topic = state["research_topic"]
    
    # 格式化搜索结果
    formatted_results = "\n\n".join([
        f"查询: {r['query']}\n结果: {r['result'][:500]}..."
        for r in results
    ])
    
    prompt = f"""请分析以下关于"{topic}"的搜索结果,提取关键发现和要点:
    
    {formatted_results}
    
    请提供:
    1. 主要发现(3-5 条)
    2. 关键数据/事实
    3. 不同观点/争议点
    4. 需要进一步研究的问题
    """
    
    response = llm.invoke([HumanMessage(content=prompt)])
    
    return {
        "messages": [response],
        "analysis": response.content
    }

# 节点 4:判断是否需要更多搜索
def should_continue(state: ResearchState) -> str:
    """条件边:判断是否继续搜索"""
    iteration = state.get("iteration", 0)
    max_iter = state.get("max_iterations", 3)
    
    if iteration >= max_iter:
        return "generate_report"
    
    # 可以在这里添加更复杂的逻辑
    # 比如检查分析结果是否充分
    return "generate_report"  # 简化示例

# 节点 5:生成最终报告
def report_node(state: ResearchState) -> ResearchState:
    """生成研究报告"""
    topic = state["research_topic"]
    analysis = state["analysis"]
    results = state["search_results"]
    
    prompt = f"""基于以下分析,生成一份结构化的研究报告:
    
    主题:{topic}
    
    分析摘要:
    {analysis}
    
    请生成包含以下部分的报告:
    1. 执行摘要
    2. 背景介绍
    3. 主要发现
    4. 关键数据与事实
    5. 结论与建议
    6. 参考来源
    """
    
    response = llm.invoke([HumanMessage(content=prompt)])
    
    return {
        "messages": [response],
        "final_report": response.content
    }

# 构建图
workflow = StateGraph(ResearchState)

workflow.add_node("planner", planner_node)
workflow.add_node("search", search_node)
workflow.add_node("analyzer", analyzer_node)
workflow.add_node("report", report_node)

workflow.add_edge("planner", "search")
workflow.add_edge("search", "analyzer")
workflow.add_conditional_edges(
    "analyzer",
    should_continue,
    {
        "planner": "planner",  # 继续下一轮搜索
        "generate_report": "report"
    }
)
workflow.add_edge("report", END)

workflow.set_entry_point("planner")

# 添加持久化
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# 使用示例
def run_research(topic: str):
    """运行研究任务"""
    initial_state = {
        "research_topic": topic,
        "messages": [HumanMessage(content=f"开始研究: {topic}")],
        "iteration": 0,
        "max_iterations": 2
    }
    
    # 运行(支持断点续传)
    config = {"configurable": {"thread_id": "research_001"}}
    
    for event in app.stream(initial_state, config, stream_mode="values"):
        if "analysis" in event:
            print(f"\n分析完成:{event['analysis'][:200]}...")
        if "final_report" in event:
            print(f"\n报告生成完成!")
            return event["final_report"]
    
    return None

# 运行
report = run_research("2025年 AI Agent 发展趋势")
print(report)

实战二:多 Agent 代码审查系统

构建一个模拟真实团队代码审查流程的多 Agent 系统。

# multi_agent_code_review.py
from typing import TypedDict, Annotated, Dict, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from enum import Enum

class ReviewState(TypedDict):
    """代码审查状态"""
    messages: Annotated[List[BaseMessage], add_messages]
    code: str
    language: str
    current_reviewer: str
    reviews: Dict[str, str]  # reviewer -> review_content
    consolidated_feedback: str
    approved: bool
    review_round: int

class ReviewerType(Enum):
    SECURITY = "security_expert"
    PERFORMANCE = "performance_expert"
    STYLE = "style_expert"
    ARCHITECT = "architect"

# 初始化多个 LLM 实例(可以配置不同的模型)
reviewer_llms = {
    ReviewerType.SECURITY: ChatOpenAI(model="gpt-4o", temperature=0.1),
    ReviewerType.PERFORMANCE: ChatOpenAI(model="gpt-4o", temperature=0.1),
    ReviewerType.STYLE: ChatOpenAI(model="gpt-4o-mini", temperature=0.2),
    ReviewerType.ARCHITECT: ChatOpenAI(model="gpt-4o", temperature=0.1),
}

# 系统提示词
REVIEWER_PROMPTS = {
    ReviewerType.SECURITY: """你是一位安全专家。请审查代码中的安全问题:
    - SQL 注入、XSS、命令注入等漏洞
    - 敏感信息泄露
    - 不安全的反序列化
    - 权限控制问题
    
    请按严重程度分类问题,并提供修复建议。""",
    
    ReviewerType.PERFORMANCE: """你是一位性能专家。请审查代码中的性能问题:
    - 时间复杂度问题
    - 内存泄漏风险
    - 数据库查询优化
    - 缓存策略
    - 资源释放
    
    请提供具体的优化建议。""",
    
    ReviewerType.STYLE: """你是一位代码风格专家。请审查代码的可读性和规范性:
    - 命名规范
    - 代码结构
    - 注释质量
    - 文档字符串
    - 代码复杂度
    
    请参照 PEP8(Python)或相应语言的规范。""",
    
    ReviewerType.ARCHITECT: """你是一位架构师。请审查代码的架构设计:
    - 设计模式使用
    - 模块划分
    - 接口设计
    - 可扩展性
    - 可维护性
    
    请提供架构层面的改进建议。"""
}

def create_reviewer_node(reviewer_type: ReviewerType):
    """创建审查者节点"""
    
    def reviewer_node(state: ReviewState) -> ReviewState:
        code = state["code"]
        language = state["language"]
        
        llm = reviewer_llms[reviewer_type]
        system_prompt = REVIEWER_PROMPTS[reviewer_type]
        
        messages = [
            ("system", system_prompt),
            ("human", f"请审查以下 {language} 代码:\n\n```{language}\n{code}\n```")
        ]
        
        response = llm.invoke(messages)
        
        # 更新状态
        reviews = state.get("reviews", {})
        reviews[reviewer_type.value] = response.content
        
        return {
            "messages": [AIMessage(content=f"{reviewer_type.value} 完成审查")],
            "reviews": reviews,
            "current_reviewer": reviewer_type.value
        }
    
    return reviewer_node

# 创建所有审查者节点
for reviewer_type in ReviewerType:
    node_func = create_reviewer_node(reviewer_type)
    globals()[f"{reviewer_type.value}_node"] = node_func

# 并行审查节点(使用 Send 语法)
def parallel_review_node(state: ReviewState) -> List[Dict]:
    """触发并行审查"""
    from langgraph.types import Send
    
    # 返回多个 Send 指令,实现并行执行
    return [
        Send(f"{rt.value}_review", state)
        for rt in ReviewerType
    ]

# 整合审查结果
def consolidate_node(state: ReviewState) -> ReviewState:
    """整合多个审查者的反馈"""
    reviews = state["reviews"]
    code = state["code"]
    
    # 格式化所有审查意见
    formatted_reviews = "\n\n".join([
        f"## {reviewer.upper()}\n{content}"
        for reviewer, content in reviews.items()
    ])
    
    prompt = f"""你是一位首席技术官。请整合以下多位专家的代码审查意见:
    
    原始代码:{code[:1000]}...
    
    审查意见:
    {formatted_reviews}
    
    请提供:
    1. 关键问题汇总(按优先级排序)
    2. 必须修复的问题(阻塞性)
    3. 建议修复的问题(非阻塞)
    4. 整体评估:APPROVE / NEEDS_REVISION / REJECT
    5. 下一步行动建议
    """
    
    llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
    response = llm.invoke([HumanMessage(content=prompt)])
    
    # 判断是否通过
    content = response.content.upper()
    approved = "APPROVE" in content and "NEEDS_REVISION" not in content and "REJECT" not in content
    
    return {
        "messages": [response],
        "consolidated_feedback": response.content,
        "approved": approved,
        "review_round": state.get("review_round", 0) + 1
    }

# 构建多 Agent 工作流
workflow = StateGraph(ReviewState)

# 添加并行审查节点
for reviewer_type in ReviewerType:
    node_name = f"{reviewer_type.value}_review"
    workflow.add_node(node_name, create_reviewer_node(reviewer_type))

workflow.add_node("consolidate", consolidate_node)

# 添加边
workflow.add_conditional_edges(
    "__start__",
    parallel_review_node,
    [f"{rt.value}_review" for rt in ReviewerType]
)

# 所有审查节点都连接到整合节点
for reviewer_type in ReviewerType:
    workflow.add_edge(f"{reviewer_type.value}_review", "consolidate")

workflow.add_edge("consolidate", END)

# 编译
app = workflow.compile()

# 使用示例
def review_code(code: str, language: str = "python") -> Dict:
    """审查代码"""
    initial_state = {
        "code": code,
        "language": language,
        "messages": [HumanMessage(content="开始代码审查")],
        "reviews": {},
        "review_round": 0
    }
    
    result = app.invoke(initial_state)
    
    return {
        "approved": result["approved"],
        "feedback": result["consolidated_feedback"],
        "individual_reviews": result["reviews"]
    }

# 测试
test_code = """
def process_user_data(user_input):
    query = f"SELECT * FROM users WHERE name = '{user_input}'"
    result = db.execute(query)
    return result
"""

result = review_code(test_code)
print(f"审查结果:{'通过' if result['approved'] else '未通过'}")
print(result["feedback"])

持久化与人机协作

LangGraph 的持久化机制支持断点续传和人机协作。

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import Command

# 使用 SQLite 持久化
with SqliteSaver.from_conn_string(":memory:") as checkpointer:
    app = workflow.compile(checkpointer=checkpointer)
    
    # 运行并中断
    config = {"configurable": {"thread_id": "review_001"}}
    
    # 运行到断点
    for event in app.stream(
        initial_state, 
        config, 
        stream_mode="values",
        interrupt_after=["consolidate"]  # 在整合后中断
    ):
        print(event)
    
    # 人机协作:人工审查
    user_input = input("是否批准? (yes/no/modify): ")
    
    if user_input == "modify":
        # 修改后继续
        modified_state = {
            **event,
            "human_feedback": input("请输入修改意见:")
        }
        
        # 继续执行
        for event in app.stream(
            Command(resume=modified_state),
            config,
            stream_mode="values"
        ):
            print(event)

可视化调试

from langgraph.graph import StateGraph

# 生成 Mermaid 图表
graph = app.get_graph()
print(graph.draw_mermaid())

# 保存为图片(需要安装 graphviz)
graph.draw_png("workflow.png")

总结

LangGraph 的核心优势:

  1. 精确控制:状态机模型让每一步都可控可观测
  2. 持久化:支持断点续传、人机协作
  3. 并行执行:支持复杂的并行和条件分支
  4. 可视化:完整的执行轨迹和状态变化

适用场景:

  • 需要精确控制流程的 Agent 系统
  • 多 Agent 协作场景
  • 需要人机协作的工作流
  • 长时间运行的任务

生产建议:

  • 使用持久化存储(SQLite/Redis/Postgres)
  • 设置最大迭代次数防止无限循环
  • 添加错误处理和重试机制
  • 使用可视化工具调试复杂工作流

相关资源: