引言
LangGraph 是 LangChain 团队推出的 Agent 编排框架,它将 Agent 执行建模为状态图(State Graph),支持循环、条件分支、并行执行等复杂工作流。
相比传统的链式调用(Chain)或简单的 Agent 循环,LangGraph 提供了:
- 精确的状态管理:每个步骤的状态变化清晰可见
- 持久化支持:支持断点续传、人机协作
- 可视化调试:可以查看完整的执行轨迹
- 多 Agent 协作:支持 Supervisor、Hierarchical 等模式
本文将深入讲解 LangGraph 的核心概念,并通过实战案例展示如何构建生产级 Agent 系统。
LangGraph 核心概念
1. 状态机模型
┌─────────────────────────────────────────────────────────────┐
│                    LangGraph 状态机模型                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────┐      action       ┌─────────┐                 │
│   │  State  │ ────────────────→ │ State'  │                 │
│   │   (t)   │                   │  (t+1)  │                 │
│   └────┬────┘                   └────┬────┘                 │
│        │                             │                      │
│        │ messages: [...]             │ messages: [...]      │
│        │ next_node: "agent"          │ next_node: "tools"   │
│        │ tool_calls: [...]           │ tool_results: []     │
│        │                             │                      │
│        └─────────────────────────────┘                      │
│                   状态流转                                  │
│                                                             │
│   关键特性:                                                 │
│   • 每个节点接收完整 State,返回更新                         │
│   • 支持条件边(Conditional Edges)                           │
│   • 支持循环(Cycles)                                        │
│   • 内置持久化(Persistence)                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘
2. 基础架构
from typing import Annotated, Sequence, TypedDict

from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
# Define the graph state
class AgentState(TypedDict):
    """State shared by every node of the basic agent graph."""
    # add_messages is this key's reducer: messages returned by a node are
    # appended to the existing list instead of replacing it.
    messages: Annotated[Sequence[BaseMessage], add_messages]
    next_node: str  # routing target written by agent_node ("tools" or END)
    iteration_count: int  # number of agent_node runs (meant as an infinite-loop guard)
# Create the graph builder over AgentState
workflow = StateGraph(AgentState)
# Define nodes
MAX_AGENT_ITERATIONS = 10  # hard cap on LLM calls per run (agent <-> tools round trips)


def agent_node(state: AgentState):
    """Agent decision node: call the LLM and choose the next node.

    ``iteration_count`` is incremented on every call. Once it reaches
    ``MAX_AGENT_ITERATIONS`` the node routes to END even if the model still
    requests tools, so a model that keeps asking for tools cannot loop forever.

    Returns:
        A partial state update: the LLM reply (appended via ``add_messages``),
        ``next_node`` ("tools" or END) and the new ``iteration_count``.
    """
    messages = state["messages"]
    iteration_count = state.get("iteration_count", 0) + 1
    # ``llm`` is not defined in this snippet; presumably a chat model with tools bound
    response = llm.invoke(messages)
    keep_looping = bool(response.tool_calls) and iteration_count < MAX_AGENT_ITERATIONS
    return {
        "messages": [response],
        "next_node": "tools" if keep_looping else END,
        "iteration_count": iteration_count,
    }
def tools_node(state: AgentState):
    """Tool execution node: run every tool call requested by the latest message.

    ``execute_tool`` is not defined in this snippet; presumably it returns a
    message object (e.g. a ToolMessage) so ``add_messages`` can append it.
    Control always goes back to the agent afterwards.
    """
    requested = state["messages"][-1].tool_calls
    # One output per requested call, in request order
    outputs = [execute_tool(call) for call in requested]
    return {"messages": outputs, "next_node": "agent"}
# Register the nodes
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tools_node)
# Conditional edge: route on the next_node value that agent_node wrote
workflow.add_conditional_edges(
    "agent",
    lambda state: state["next_node"],
    {"tools": "tools", END: END}
)
# After the tools run, control always returns to the agent
workflow.add_edge("tools", "agent")
# Entry point of the graph
workflow.set_entry_point("agent")
# Compile the builder into a runnable app
app = workflow.compile()
# Run once with a single user message
result = app.invoke({
    "messages": [HumanMessage(content="搜索最新的 AI 新闻")]
})
实战一:智能研究助手
构建一个能够自主进行多轮搜索、分析和总结的研究助手。
# research_assistant.py
from typing import TypedDict, Annotated, List, Dict
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools import DuckDuckGoSearchRun
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
import json
class ResearchState(TypedDict):
    """State of the research-assistant graph."""
    messages: Annotated[List[BaseMessage], add_messages]  # progress log, appended via add_messages
    research_topic: str  # topic supplied by the caller
    search_queries: List[str]  # queries produced by planner_node
    search_results: List[Dict]  # one dict per query ("query"/"result"/"source") from search_node
    analysis: str  # analyzer_node output
    final_report: str  # report_node output
    iteration: int  # planner rounds completed (incremented by planner_node)
    max_iterations: int  # round cap read by should_continue (which defaults it to 3)
# Shared clients used by the nodes below
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
search_tool = DuckDuckGoSearchRun()
# 节点 1:规划搜索策略
def planner_node(state: ResearchState) -> ResearchState:
"""规划搜索策略,生成多个搜索查询"""
topic = state["research_topic"]
prompt = f"""你是一个研究规划专家。请为以下主题生成 3-5 个不同的搜索查询,
确保覆盖主题的不同方面。
主题:{topic}
请返回 JSON 格式:
{{"queries": ["查询1", "查询2", "查询3"]}}
"""
response = llm.invoke([HumanMessage(content=prompt)])
try:
queries = json.loads(response.content)["queries"]
except:
queries = [topic] # 回退方案
return {
"messages": [AIMessage(content=f"规划搜索查询: {queries}")],
"search_queries": queries,
"iteration": state.get("iteration", 0) + 1
}
# Node 2: run the searches
def search_node(state: ResearchState) -> ResearchState:
    """Run every planned search query, one after another (sequentially).

    A failing query does not abort the node. Its entry records the error text
    with ``source == "error"`` so later steps (e.g. ``should_continue``) can
    tell failures apart from real results.

    Returns:
        A partial update with a progress message and ``search_results``
        (one dict per query: "query", "result", "source").
    """
    results = []
    for query in state.get("search_queries", []):
        try:
            result = search_tool.invoke(query)
        except Exception as e:  # best-effort: one bad query must not fail the whole round
            results.append({
                "query": query,
                "result": f"搜索失败: {str(e)}",
                "source": "error",
            })
            continue
        results.append({
            "query": query,
            "result": result,
            "source": "duckduckgo",
        })
    return {
        "messages": [AIMessage(content=f"完成 {len(results)} 个搜索")],
        "search_results": results,
    }
# 节点 3:分析搜索结果
def analyzer_node(state: ResearchState) -> ResearchState:
"""分析搜索结果,提取关键信息"""
results = state["search_results"]
topic = state["research_topic"]
# 格式化搜索结果
formatted_results = "\n\n".join([
f"查询: {r['query']}\n结果: {r['result'][:500]}..."
for r in results
])
prompt = f"""请分析以下关于"{topic}"的搜索结果,提取关键发现和要点:
{formatted_results}
请提供:
1. 主要发现(3-5 条)
2. 关键数据/事实
3. 不同观点/争议点
4. 需要进一步研究的问题
"""
response = llm.invoke([HumanMessage(content=prompt)])
return {
"messages": [response],
"analysis": response.content
}
# Conditional-edge router (not a node): decide whether another search round is needed
def should_continue(state: ResearchState) -> str:
    """Pick the edge taken after ``analyzer``.

    Returns:
        "planner" to run another plan/search/analyze round when no search in
        the last round succeeded (every result has ``source == "error"``, or
        there were none) and ``iteration`` is still below ``max_iterations``
        (default 3). Otherwise returns "generate_report". The iteration cap
        keeps the loop finite.
    """
    iteration = state.get("iteration", 0)
    max_iter = state.get("max_iterations", 3)
    if iteration >= max_iter:
        return "generate_report"
    # Retry only when the last round produced nothing usable. Richer checks
    # (e.g. asking the LLM whether the analysis is sufficient) belong here.
    results = state.get("search_results") or []
    if all(r.get("source") == "error" for r in results):
        return "planner"
    return "generate_report"
# 节点 5:生成最终报告
def report_node(state: ResearchState) -> ResearchState:
"""生成研究报告"""
topic = state["research_topic"]
analysis = state["analysis"]
results = state["search_results"]
prompt = f"""基于以下分析,生成一份结构化的研究报告:
主题:{topic}
分析摘要:
{analysis}
请生成包含以下部分的报告:
1. 执行摘要
2. 背景介绍
3. 主要发现
4. 关键数据与事实
5. 结论与建议
6. 参考来源
"""
response = llm.invoke([HumanMessage(content=prompt)])
return {
"messages": [response],
"final_report": response.content
}
# Build the graph: planner -> search -> analyzer -> (planner | report) -> END
workflow = StateGraph(ResearchState)
workflow.add_node("planner", planner_node)
workflow.add_node("search", search_node)
workflow.add_node("analyzer", analyzer_node)
workflow.add_node("report", report_node)
workflow.add_edge("planner", "search")
workflow.add_edge("search", "analyzer")
# should_continue's return value selects the next node from this mapping
workflow.add_conditional_edges(
    "analyzer",
    should_continue,
    {
        "planner": "planner",  # another search round
        "generate_report": "report"
    }
)
workflow.add_edge("report", END)
workflow.set_entry_point("planner")
# Persistence: an in-memory checkpointer, keyed by the thread_id in the run config
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
# Usage example
def run_research(topic: str, thread_id=None):
    """Run one research task and return the final report (or None).

    Args:
        topic: research topic.
        thread_id: checkpoint thread for this run. Defaults to a fresh unique
            id. With the MemorySaver checkpointer, a reused id carries the
            previous run's state (including ``final_report``) into the new
            run. Pass an explicit id only when you want to address a known
            thread, e.g. to inspect it later with ``app.get_state``.

    Returns:
        The ``final_report`` text, or None if the stream ends without one.
    """
    import uuid  # local import: only needed to build the default thread id

    initial_state = {
        "research_topic": topic,
        "messages": [HumanMessage(content=f"开始研究: {topic}")],
        "iteration": 0,
        "max_iterations": 2,
    }
    if thread_id is None:
        thread_id = f"research_{uuid.uuid4().hex}"
    # Running under a thread_id enables checkpointing (resume after interruption)
    config = {"configurable": {"thread_id": thread_id}}
    analysis_printed = False
    for event in app.stream(initial_state, config, stream_mode="values"):
        # stream_mode="values" yields the full state after every step, so the
        # analysis stays present in later events; print it only once.
        if "analysis" in event and not analysis_printed:
            print(f"\n分析完成:{event['analysis'][:200]}...")
            analysis_printed = True
        if "final_report" in event:
            print("\n报告生成完成!")
            return event["final_report"]
    return None
# Run the research example end to end and print the report (None if none was produced)
report = run_research("2025年 AI Agent 发展趋势")
print(report)
实战二:多 Agent 代码审查系统
构建一个模拟真实团队代码审查流程的多 Agent 系统。
# multi_agent_code_review.py
from typing import TypedDict, Annotated, Dict, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from enum import Enum
def _merge_reviews(existing, new):
    """Reducer for ``reviews``: merge the dicts written by parallel reviewer nodes."""
    return {**(existing or {}), **(new or {})}


def _keep_last(existing, new):
    """Reducer for ``current_reviewer``: accept concurrent writes, keep the last applied."""
    return new


class ReviewState(TypedDict):
    """State of the multi-agent code-review graph."""
    messages: Annotated[List[BaseMessage], add_messages]
    code: str  # source code under review
    language: str  # language name used in the reviewer prompt and code fence
    # The reviewer nodes are fanned out with Send and write the next two keys
    # in the same superstep. A plain (last-value) key accepts only one write
    # per step and raises InvalidUpdateError, so both keys need reducers.
    # With parallel writers, which reviewer ends up in current_reviewer is arbitrary.
    current_reviewer: Annotated[str, _keep_last]
    reviews: Annotated[Dict[str, str], _merge_reviews]  # reviewer -> review_content
    consolidated_feedback: str
    approved: bool
    review_round: int
class ReviewerType(Enum):
    """Reviewer roles; each value is also used to build node names ("<value>_review")."""
    SECURITY = "security_expert"
    PERFORMANCE = "performance_expert"
    STYLE = "style_expert"
    ARCHITECT = "architect"
# One chat model per reviewer role (model and temperature can differ per role)
reviewer_llms = {
    ReviewerType.SECURITY: ChatOpenAI(model="gpt-4o", temperature=0.1),
    ReviewerType.PERFORMANCE: ChatOpenAI(model="gpt-4o", temperature=0.1),
    ReviewerType.STYLE: ChatOpenAI(model="gpt-4o-mini", temperature=0.2),  # cheaper model for style checks
    ReviewerType.ARCHITECT: ChatOpenAI(model="gpt-4o", temperature=0.1),
}
# System prompt for each reviewer role; create_reviewer_node sends it as the "system" message
REVIEWER_PROMPTS = {
    # Security review: injection, secret leakage, deserialization, access control
    ReviewerType.SECURITY: """你是一位安全专家。请审查代码中的安全问题:
- SQL 注入、XSS、命令注入等漏洞
- 敏感信息泄露
- 不安全的反序列化
- 权限控制问题
请按严重程度分类问题,并提供修复建议。""",
    # Performance review: complexity, leaks, queries, caching, resource release
    ReviewerType.PERFORMANCE: """你是一位性能专家。请审查代码中的性能问题:
- 时间复杂度问题
- 内存泄漏风险
- 数据库查询优化
- 缓存策略
- 资源释放
请提供具体的优化建议。""",
    # Style review: naming, structure, comments/docstrings, complexity (PEP 8 for Python)
    ReviewerType.STYLE: """你是一位代码风格专家。请审查代码的可读性和规范性:
- 命名规范
- 代码结构
- 注释质量
- 文档字符串
- 代码复杂度
请参照 PEP8(Python)或相应语言的规范。""",
    # Architecture review: patterns, modularity, interfaces, extensibility
    ReviewerType.ARCHITECT: """你是一位架构师。请审查代码的架构设计:
- 设计模式使用
- 模块划分
- 接口设计
- 可扩展性
- 可维护性
请提供架构层面的改进建议。"""
}
def create_reviewer_node(reviewer_type: ReviewerType):
    """Build the graph node function for one reviewer role.

    Args:
        reviewer_type: selects the chat model (``reviewer_llms``) and the
            system prompt (``REVIEWER_PROMPTS``) the node uses.

    Returns:
        A node function that reviews ``state["code"]`` and returns a partial
        update: a progress message, a new ``reviews`` dict that includes this
        reviewer's result, and ``current_reviewer``. The incoming state is
        never mutated.
    """

    def reviewer_node(state: ReviewState) -> ReviewState:
        code = state["code"]
        language = state["language"]
        llm = reviewer_llms[reviewer_type]
        system_prompt = REVIEWER_PROMPTS[reviewer_type]
        messages = [
            ("system", system_prompt),
            ("human", f"请审查以下 {language} 代码:\n\n```{language}\n{code}\n```"),
        ]
        response = llm.invoke(messages)
        # Build a new dict instead of writing into state["reviews"]: every
        # reviewer is sent the same state object (see parallel_review_node),
        # so in-place mutation would leak into the other parallel branches.
        reviews = {**(state.get("reviews") or {}), reviewer_type.value: response.content}
        return {
            "messages": [AIMessage(content=f"{reviewer_type.value} 完成审查")],
            "reviews": reviews,
            "current_reviewer": reviewer_type.value,
        }

    return reviewer_node
# Create one node function per reviewer and expose it as a module global
# (security_expert_node, performance_expert_node, style_expert_node, architect_node).
# NOTE(review): in this snippet the graph below builds its own nodes via
# create_reviewer_node and never reads these globals.
for reviewer_type in ReviewerType:
    node_func = create_reviewer_node(reviewer_type)
    globals()[f"{reviewer_type.value}_node"] = node_func
# Fan-out router (used as a conditional edge from the graph start, not as a node)
def parallel_review_node(state: ReviewState) -> list:
    """Return one ``Send`` per reviewer so all reviews run in parallel.

    Each ``Send`` targets the "<reviewer>_review" node and hands it the full
    current state object (the same object for every reviewer).

    Returns:
        A list of ``langgraph.types.Send`` instructions, not state dicts.
    """
    from langgraph.types import Send

    return [Send(f"{rt.value}_review", state) for rt in ReviewerType]
# 整合审查结果
def consolidate_node(state: ReviewState) -> ReviewState:
"""整合多个审查者的反馈"""
reviews = state["reviews"]
code = state["code"]
# 格式化所有审查意见
formatted_reviews = "\n\n".join([
f"## {reviewer.upper()}\n{content}"
for reviewer, content in reviews.items()
])
prompt = f"""你是一位首席技术官。请整合以下多位专家的代码审查意见:
原始代码:{code[:1000]}...
审查意见:
{formatted_reviews}
请提供:
1. 关键问题汇总(按优先级排序)
2. 必须修复的问题(阻塞性)
3. 建议修复的问题(非阻塞)
4. 整体评估:APPROVE / NEEDS_REVISION / REJECT
5. 下一步行动建议
"""
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
response = llm.invoke([HumanMessage(content=prompt)])
# 判断是否通过
content = response.content.upper()
approved = "APPROVE" in content and "NEEDS_REVISION" not in content and "REJECT" not in content
return {
"messages": [response],
"consolidated_feedback": response.content,
"approved": approved,
"review_round": state.get("review_round", 0) + 1
}
# Build the multi-agent workflow
workflow = StateGraph(ReviewState)
# One review node per reviewer role
for reviewer_type in ReviewerType:
    node_name = f"{reviewer_type.value}_review"
    workflow.add_node(node_name, create_reviewer_node(reviewer_type))
workflow.add_node("consolidate", consolidate_node)
# Fan out from the graph start ("__start__") through parallel_review_node's
# Send list; the third argument names every node the router may target.
workflow.add_conditional_edges(
    "__start__",
    parallel_review_node,
    [f"{rt.value}_review" for rt in ReviewerType]
)
# Every review node feeds the consolidation node
for reviewer_type in ReviewerType:
    workflow.add_edge(f"{reviewer_type.value}_review", "consolidate")
workflow.add_edge("consolidate", END)
# Compile (no checkpointer here)
app = workflow.compile()
# Usage example
def review_code(code: str, language: str = "python") -> Dict:
    """Run the full review graph on ``code`` and summarize the outcome.

    Returns:
        A dict with "approved" (bool), "feedback" (the consolidated review)
        and "individual_reviews" (reviewer -> review text).
    """
    final_state = app.invoke({
        "code": code,
        "language": language,
        "messages": [HumanMessage(content="开始代码审查")],
        "reviews": {},
        "review_round": 0,
    })
    # (output key, state key) pairs, read in this order
    field_map = (
        ("approved", "approved"),
        ("feedback", "consolidated_feedback"),
        ("individual_reviews", "reviews"),
    )
    return {out_key: final_state[state_key] for out_key, state_key in field_map}
# Demo: the sample builds SQL by string interpolation, so the security
# reviewer has a real injection issue to report.
test_code = """
def process_user_data(user_input):
query = f"SELECT * FROM users WHERE name = '{user_input}'"
result = db.execute(query)
return result
"""
result = review_code(test_code)
print(f"审查结果:{'通过' if result['approved'] else '未通过'}")
print(result["feedback"])
持久化与人机协作
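LangGraph 的持久化机制支持断点续传和人机协作。注意:下例使用的 SqliteSaver 位于独立的 langgraph-checkpoint-sqlite 包中,需要先执行 pip install langgraph-checkpoint-sqlite。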
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import Command
# Persist checkpoints in SQLite (":memory:" here; use a file path to survive restarts)
with SqliteSaver.from_conn_string(":memory:") as checkpointer:
    app = workflow.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "review_001"}}
    # Build the input here: the initial_state inside review_code() is local to it
    initial_state = {
        "code": test_code,
        "language": "python",
        "messages": [HumanMessage(content="开始代码审查")],
        "reviews": {},
        "review_round": 0,
    }
    # Run up to the static breakpoint placed after "consolidate"
    for event in app.stream(
        initial_state,
        config,
        stream_mode="values",
        interrupt_after=["consolidate"],  # pause after consolidation
    ):
        print(event)

    # Human in the loop: read the checkpointed state and record a decision
    snapshot = app.get_state(config)
    print(snapshot.values.get("consolidated_feedback", ""))
    user_input = input("是否批准? (yes/no/modify): ").strip().lower()
    update = {}
    if user_input in ("yes", "no"):
        update = {"approved": user_input == "yes"}
    elif user_input == "modify":
        human_note = input("请输入修改意见:")
        update = {
            "approved": False,
            "consolidated_feedback": (
                f"{snapshot.values.get('consolidated_feedback', '')}\n\n人工修改意见:{human_note}"
            ),
        }
    if update:
        # Write the decision back as if "consolidate" had produced it. ReviewState
        # has no "human_feedback" key, so existing fields carry the human input.
        app.update_state(config, update, as_node="consolidate")

    # Resume from a static breakpoint by streaming None on the same thread
    if app.get_state(config).next:
        for event in app.stream(None, config, stream_mode="values"):
            print(event)
    print(app.get_state(config).values)
可视化调试
from langgraph.graph import StateGraph
# Print the compiled graph as Mermaid diagram text
graph = app.get_graph()
print(graph.draw_mermaid())
# Save as PNG. draw_png relies on the pygraphviz package (which itself needs
# the Graphviz system library); langchain_core raises ImportError without it.
try:
    graph.draw_png("workflow.png")
except ImportError as err:
    print(f"跳过 PNG 导出:{err}")
总结
LangGraph 的核心优势:
- 精确控制:状态机模型让每一步都可控可观测
- 持久化:支持断点续传、人机协作
- 并行执行:支持复杂的并行和条件分支
- 可视化:完整的执行轨迹和状态变化
适用场景:
- 需要精确控制流程的 Agent 系统
- 多 Agent 协作场景
- 需要人机协作的工作流
- 长时间运行的任务
生产建议:
- 使用持久化存储(SQLite/Redis/Postgres)
- 设置最大迭代次数防止无限循环
- 添加错误处理和重试机制
- 使用可视化工具调试复杂工作流
相关资源: