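Introduction
As AI agent technology matures, a single agent increasingly struggles to meet the demands of complex business scenarios. A multi-agent system splits the work across several specialized agents that collaborate, which lets it handle more complex tasks and deliver more reliable service.
This article covers the core design principles, communication mechanisms, and coordination strategies of multi-agent systems. It then walks through hands-on code for building one, from the core components to a complete code review example.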
Why Multi-Agent Systems?
Limitations of a Single Agent
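| Limitation | Description | Impact |
|---|---|---|
| Context window limit | Complex tasks need a lot of context and easily exceed the token limit | Information loss, lower decision quality |
| Limited specialization | One agent has difficulty mastering several domains at once | Insufficient expertise, higher error rate |
| Single point of failure | An error in the one agent brings down the whole system | Lower system reliability |
| Hard to extend | Adding features requires changing core logic | Higher maintenance cost |
| Performance bottleneck | Complex tasks run serially and take a long time | Poor user experience |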
Advantages of Multi-Agent Systems
┌───────────────────────────────────────────────────────┐
│            Multi-Agent System Architecture            │
├───────────────────────────────────────────────────────┤
│                                                       │
│  ┌─────────────┐       ┌─────────────┐                │
│  │ Coordinator │◄─────►│   Monitor   │                │
│  └──────┬──────┘       └─────────────┘                │
│         │                                             │
│         ▼                                             │
│  ┌─────────────────────────────────────┐              │
│  │             Message Bus             │              │
│  └────┬─────────────┬─────────────┬──────┘              │
│       ▼             ▼             ▼                   │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │ Research │  │   Code   │  │   Test   │             │
│  │  Agent   │  │  Agent   │  │  Agent   │             │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘             │
│       └─────────────┼─────────────┘                   │
│                     ▼                                 │
│              ┌─────────────┐                          │
│              │ Aggregator  │                          │
│              └─────────────┘                          │
│                                                       │
└───────────────────────────────────────────────────────┘
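- Specialization: each agent focuses on one domain, which deepens its expertise
- Parallel processing: several agents work at the same time, which shortens end-to-end latency
- Fault tolerance: the failure of one agent is contained and does not take down the whole system, provided failures are isolated (for example with timeouts and exception handling)
- Extensibility: new agents can be added without modifying existing code
- Modularity: each agent is easier to test, maintain, and upgrade on its own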
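The parallelism and fault-tolerance points can be illustrated with plain `asyncio`. This is a minimal sketch that assumes agents are simple async callables. The agent functions, their delays, and the simulated crash are all hypothetical. Three agents run concurrently under `asyncio.gather(..., return_exceptions=True)`, so total time is close to the slowest agent rather than the sum of all three. The failing agent is reported without cancelling the others.

```python
import asyncio
import time
from typing import Dict, Union


async def research_agent(task: str) -> str:
    await asyncio.sleep(0.2)  # simulated LLM/tool latency
    return f"research notes for {task!r}"


async def code_agent(task: str) -> str:
    await asyncio.sleep(0.2)
    return f"code for {task!r}"


async def flaky_test_agent(task: str) -> str:
    await asyncio.sleep(0.1)
    raise RuntimeError("test runner crashed")  # simulated agent failure


async def run_agents(task: str) -> Dict[str, Union[str, BaseException]]:
    agents = {"research": research_agent, "code": code_agent,
              "test": flaky_test_agent}
    # All agents start together; exceptions are returned instead of raised
    outcomes = await asyncio.gather(*(agent(task) for agent in agents.values()),
                                    return_exceptions=True)
    return dict(zip(agents, outcomes))


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(run_agents("login feature"))
    elapsed = time.perf_counter() - start
    for name, outcome in results.items():
        status = "FAILED" if isinstance(outcome, BaseException) else "ok"
        print(f"{name:>8}: {status}: {outcome}")
    # Serial execution would take about 0.2 + 0.2 + 0.1 = 0.5s
    print(f"elapsed ~{elapsed:.2f}s")
```

Isolating failures this way is a design choice. The caller decides what a missing result means, instead of one crash aborting the whole batch.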
Multi-Agent Architecture Patterns
Pattern 1: Hierarchical
          ┌───────────────────┐
          │ Supervisor Agent  │
          └─────────┬─────────┘
                    │
     ┌─────────┬────┴────┬─────────┐
     ▼         ▼         ▼         ▼
 ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐
 │Agent A│ │Agent B│ │Agent C│ │Agent D│
 └───────┘ └───────┘ └───────┘ └───────┘
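Characteristics:
- A supervisor agent assigns and coordinates tasks
- Worker agents carry out the concrete work
- Best for well-defined tasks with clear workflows
Typical use cases:
- Software development workflows (requirements → design → coding → testing)
- Data-processing pipelines
- Customer-support ticket handling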
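Here is a minimal sketch of the hierarchical pattern. It assumes each worker declares a set of capabilities and the supervisor routes each step of a fixed pipeline to the first worker that can handle it. The `Worker` and `Supervisor` classes, the capability names, and the string-wrapping handlers are hypothetical. They mirror the software development example above.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, List, Set, Tuple

Handler = Callable[[str], Awaitable[str]]


@dataclass
class Worker:
    name: str
    capabilities: Set[str]
    handler: Handler


@dataclass
class Supervisor:
    workers: List[Worker] = field(default_factory=list)

    def route(self, capability: str) -> Worker:
        # Central routing decision: only the supervisor assigns work
        for worker in self.workers:
            if capability in worker.capabilities:
                return worker
        raise LookupError(f"no worker can handle {capability!r}")

    async def run_pipeline(self, request: str,
                           steps: List[str]) -> List[Tuple[str, str]]:
        # Steps run in order; each step's output is the next step's input
        trace: List[Tuple[str, str]] = []
        payload = request
        for step in steps:
            worker = self.route(step)
            payload = await worker.handler(payload)
            trace.append((worker.name, payload))
        return trace


def make_handler(tag: str) -> Handler:
    async def handler(payload: str) -> str:
        return f"{tag}({payload})"
    return handler


supervisor = Supervisor([
    Worker("analyst", {"requirements"}, make_handler("req")),
    Worker("architect", {"design"}, make_handler("design")),
    Worker("developer", {"coding"}, make_handler("code")),
    Worker("tester", {"testing"}, make_handler("test")),
])

if __name__ == "__main__":
    steps = ["requirements", "design", "coding", "testing"]
    for name, output in asyncio.run(supervisor.run_pipeline("login", steps)):
        print(f"{name}: {output}")
```

Workers never talk to each other here, which keeps communication simple. The cost is that the supervisor is a single point of control.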
Pattern 2: Mesh (Peer-to-Peer)
 ┌───────┐       ┌───────┐
 │Agent A│◄─────►│Agent B│
 └───┬───┘       └───┬───┘
     │               │
     ▼               ▼
 ┌───────┐       ┌───────┐
 │Agent C│◄─────►│Agent D│
 └───────┘       └───────┘
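Characteristics:
- Agents communicate as equals, with no central node
- Highly flexible and adaptable
- Requires sophisticated negotiation mechanisms
Typical use cases:
- Simulation systems
- Multi-agent games
- Distributed decision-making systems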
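The negotiation a mesh needs can take many forms. One classic option is a contract-net style auction, sketched here under simplifying assumptions. Any peer can announce a task, and every other peer replies with a bid. The announcer awards the task to the lowest bidder, so no central coordinator is involved. The `Peer` class and the cost formula (current load divided by proficiency) are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Peer:
    name: str
    skills: Dict[str, float]  # skill -> proficiency in (0, 1]
    load: int = 0
    # Excluded from repr/eq so peers referencing each other don't recurse
    neighbors: List["Peer"] = field(default_factory=list, repr=False, compare=False)

    def bid(self, skill: str) -> Optional[float]:
        """Return an estimated cost for the task, or None to decline."""
        proficiency = self.skills.get(skill)
        if proficiency is None:
            return None
        return (1 + self.load) / proficiency  # busier or weaker peers bid higher

    def announce(self, skill: str) -> Tuple[str, float]:
        """Collect bids from neighbors and award the task to the cheapest one."""
        valid = [(cost, peer) for peer in self.neighbors
                 if (cost := peer.bid(skill)) is not None]
        if not valid:
            raise LookupError(f"no peer can handle {skill!r}")
        cost, winner = min(valid, key=lambda pair: pair[0])
        winner.load += 1  # the winner commits to the task
        return winner.name, cost


def connect(peers: List[Peer]) -> None:
    """Fully connect the peers: every peer can talk to every other peer."""
    for peer in peers:
        peer.neighbors = [p for p in peers if p is not peer]


if __name__ == "__main__":
    a = Peer("A", {"translate": 0.9})
    b = Peer("B", {"translate": 0.6, "summarize": 0.8})
    c = Peer("C", {"summarize": 0.5})
    connect([a, b, c])
    for _ in range(3):
        print(c.announce("translate"))  # load spreads as A gets busier
```

Because every peer talks to every other, each announcement costs one message per neighbor. That is the price of having no central node.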
Pattern 3: Blackboard
┌───────────────────────────────────────────────┐
│      Shared Knowledge Base (Blackboard)       │
│  ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐  │
│  │ Fact A │ │ Fact B │ │ Fact C │ │ Fact D │  │
│  └────────┘ └────────┘ └────────┘ └────────┘  │
└───────────────────────────────────────────────┘
        ▲          ▲          ▲          ▲
        │          │          │          │
    ┌───────┐  ┌───────┐  ┌───────┐  ┌───────┐
    │Agent A│  │Agent B│  │Agent C│  │Agent D│
    └───────┘  └───────┘  └───────┘  └───────┘
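Characteristics:
- All agents share a single knowledge base
- Agents collaborate by reading from and writing to the shared data
- Suited to knowledge-intensive tasks
Typical use cases:
- Medical diagnosis systems
- Fault diagnosis and troubleshooting systems
- Complex problem solving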
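Here is a minimal blackboard sketch built on a simple control loop. Each knowledge source (agent) inspects the shared board and contributes a new entry when its preconditions are met. The loop stops once no agent can add anything. The diagnosis rules and agent functions are hypothetical and only illustrate the read/write cycle.

```python
from typing import Callable, Dict, List, Optional, Tuple

Board = Dict[str, str]
# A knowledge source returns (key, value) to write, or None if it has nothing to add
KnowledgeSource = Callable[[Board], Optional[Tuple[str, str]]]


def symptom_agent(board: Board) -> Optional[Tuple[str, str]]:
    if "symptom" in board and "finding" not in board:
        if "timeout" in board["symptom"]:
            return "finding", "database latency"
    return None


def root_cause_agent(board: Board) -> Optional[Tuple[str, str]]:
    if board.get("finding") == "database latency" and "cause" not in board:
        return "cause", "missing index"
    return None


def fix_agent(board: Board) -> Optional[Tuple[str, str]]:
    if "cause" in board and "fix" not in board:
        return "fix", f"resolve: {board['cause']}"
    return None


def run_blackboard(board: Board, sources: List[KnowledgeSource],
                   max_rounds: int = 10) -> Board:
    """Let agents take turns writing to the board (mutated in place) until none can."""
    for _ in range(max_rounds):
        progressed = False
        for source in sources:
            contribution = source(board)
            if contribution is not None:
                key, value = contribution
                board[key] = value
                progressed = True
        if not progressed:
            break
    return board


if __name__ == "__main__":
    # Agent order does not matter: each waits until its inputs are on the board
    print(run_blackboard({"symptom": "API timeout"},
                         [fix_agent, root_cause_agent, symptom_agent]))
```

The agents never address each other directly. They coordinate only through the board's contents.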
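Pattern Comparison
| Pattern | Communication complexity | Fault tolerance | Scalability | Best for |
|---|---|---|---|---|
| Hierarchical | Low | Medium | Medium | Tasks with well-defined workflows |
| Mesh | High | High | High | Complex collaboration |
| Blackboard | Medium | Medium | Low | Knowledge sharing |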
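The "communication complexity" column can be made concrete by counting the possible communication links among $n$ worker agents. This is a back-of-the-envelope count of potential channels, not a measurement of actual message volume:

```latex
\begin{aligned}
C_{\text{hierarchical}}(n) &= n && \text{(supervisor to each worker)} \\
C_{\text{mesh}}(n) &= \binom{n}{2} = \frac{n(n-1)}{2} && \text{(every pair of peers may talk)} \\
C_{\text{blackboard}}(n) &= n && \text{(each agent to the shared store)}
\end{aligned}
```

For example, with $n = 10$ agents a hierarchy has 10 links, while a full mesh has $10 \cdot 9 / 2 = 45$. Link count alone understates the blackboard's cost, because every agent contends for the same shared store and concurrent writes need coordination.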
Core Component Design
1. Agent Definition and Interface
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class AgentStatus(Enum):
    IDLE = "idle"
    BUSY = "busy"
    ERROR = "error"
    OFFLINE = "offline"


@dataclass
class AgentMessage:
    """A message exchanged between agents."""
    id: str
    sender: str
    receiver: str
    message_type: str
    content: Any
    timestamp: float
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def create(cls, sender: str, receiver: str,
               message_type: str, content: Any,
               metadata: Optional[Dict[str, Any]] = None) -> "AgentMessage":
        # Every message gets a unique id and a creation timestamp
        return cls(
            id=str(uuid.uuid4()),
            sender=sender,
            receiver=receiver,
            message_type=message_type,
            content=content,
            timestamp=time.time(),
            metadata=metadata or {},
        )


class BaseAgent(ABC):
    """Base class for all agents."""

    def __init__(self, agent_id: str, name: str,
                 capabilities: Optional[List[str]] = None):
        self.agent_id = agent_id
        self.name = name
        self.capabilities = capabilities or []
        self.status = AgentStatus.IDLE
        self.memory: List[AgentMessage] = []
        self.message_queue: List[AgentMessage] = []

    @abstractmethod
    async def process_task(self, task: Dict[str, Any]) -> Any:
        """Core method: execute a task and return its result."""

    @abstractmethod
    async def handle_message(self, message: AgentMessage) -> None:
        """Handle a message received from another agent."""

    def can_handle(self, task_type: str) -> bool:
        """Return True if this agent declares the given capability."""
        return task_type in self.capabilities

    async def send_message(self, receiver: str,
                           message_type: str,
                           content: Any,
                           metadata: Optional[Dict[str, Any]] = None) -> AgentMessage:
        """Send a message to another agent through the message bus."""
        message = AgentMessage.create(
            sender=self.agent_id,
            receiver=receiver,
            message_type=message_type,
            content=content,
            metadata=metadata,
        )
        # MessageBus is defined in the next section; the name is resolved at call time
        await MessageBus.send(message)
        return message
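`AgentMessage` travels as a Python object inside a single process. If agents are later split across processes or machines (for example behind a real message broker), messages need a wire format. This is a minimal sketch that assumes `content` and `metadata` are JSON-serializable. `SerializableMessage` and its `to_json`/`from_json` helpers are hypothetical names, and only the standard library is used.

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class SerializableMessage:
    """Same fields as AgentMessage, plus a JSON round trip."""
    id: str
    sender: str
    receiver: str
    message_type: str
    content: Any
    timestamp: float
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        # asdict() converts the dataclass to a plain dict; content must be JSON-serializable
        return json.dumps(asdict(self), ensure_ascii=False)

    @classmethod
    def from_json(cls, raw: str) -> "SerializableMessage":
        return cls(**json.loads(raw))


if __name__ == "__main__":
    msg = SerializableMessage(
        id=str(uuid.uuid4()), sender="coordinator", receiver="security_agent",
        message_type="TASK_ASSIGNMENT", content={"subtask_id": "task_1"},
        timestamp=time.time())
    wire = msg.to_json()
    print(wire)
    print(SerializableMessage.from_json(wire) == msg)  # round trip is lossless here
```

Results such as the `ReviewResult` dataclass used later would need the same treatment before they could cross a process boundary.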
2. Message Bus
import logging
from collections import defaultdict, deque
from typing import Awaitable, Callable, Deque, Dict, List, Optional, Set

logger = logging.getLogger(__name__)

# Subscribers are async callables that receive an AgentMessage
MessageHandler = Callable[[AgentMessage], Awaitable[None]]


class MessageBus:
    """In-process message bus for agent communication (class-level singleton)."""
    _instance = None
    _subscribers: Dict[str, Set[MessageHandler]] = defaultdict(set)
    _max_history = 1000
    # deque(maxlen=...) discards the oldest entry automatically, in O(1)
    _message_history: Deque[AgentMessage] = deque(maxlen=_max_history)

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    async def send(cls, message: AgentMessage) -> None:
        """Record the message, then deliver it to its subscribers."""
        cls._message_history.append(message)

        # 1. Subscribers of the specific receiver, then 2. broadcast ("*") subscribers.
        # Iterate over a copy so a handler may (un)subscribe during delivery.
        for key in (message.receiver, "*"):
            for callback in list(cls._subscribers.get(key, ())):
                try:
                    await callback(message)
                except Exception:
                    logger.exception("Handler for %r failed on message %s",
                                     key, message.id)

    @classmethod
    def subscribe(cls, agent_id: str, callback: MessageHandler) -> None:
        """Subscribe a handler to messages addressed to agent_id ("*" = all)."""
        cls._subscribers[agent_id].add(callback)

    @classmethod
    def unsubscribe(cls, agent_id: str, callback: MessageHandler) -> None:
        """Remove a previously registered handler."""
        cls._subscribers[agent_id].discard(callback)

    @classmethod
    def get_message_history(cls,
                            sender: Optional[str] = None,
                            receiver: Optional[str] = None,
                            message_type: Optional[str] = None,
                            limit: int = 100) -> List[AgentMessage]:
        """Query the message history, optionally filtered."""
        messages = [
            m for m in cls._message_history
            if (sender is None or m.sender == sender)
            and (receiver is None or m.receiver == receiver)
            and (message_type is None or m.message_type == message_type)
        ]
        return messages[-limit:]
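In the `MessageBus` above, `send()` awaits every subscriber's handler inline. The sender is therefore blocked until all handlers finish, and one slow handler delays everyone. A common alternative, sketched here as an assumption rather than the article's design, gives each subscriber its own `asyncio.Queue` plus a consumer task, so `send()` only enqueues. `QueueBus`, `drain()`, and `close()` are hypothetical names. `subscribe()` must be called while an event loop is running, because it starts a task.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, List

Handler = Callable[[dict], Awaitable[None]]


class QueueBus:
    """Each subscriber gets a private queue; send() only enqueues."""

    def __init__(self) -> None:
        self._queues: DefaultDict[str, List[asyncio.Queue]] = defaultdict(list)
        self._workers: List[asyncio.Task] = []

    def subscribe(self, receiver: str, handler: Handler) -> None:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[receiver].append(queue)

        async def consume() -> None:
            while True:
                message = await queue.get()
                try:
                    await handler(message)
                except Exception as exc:  # keep consuming after a failing handler
                    print(f"handler for {receiver!r} failed: {exc!r}")
                finally:
                    queue.task_done()

        self._workers.append(asyncio.create_task(consume()))

    async def send(self, receiver: str, message: dict) -> None:
        for queue in self._queues.get(receiver, []) + self._queues.get("*", []):
            queue.put_nowait(message)  # unbounded queue: never blocks the sender

    async def drain(self) -> None:
        """Wait until every queued message has been handled."""
        for queues in list(self._queues.values()):
            for queue in queues:
                await queue.join()

    async def close(self) -> None:
        for worker in self._workers:
            worker.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)


async def demo() -> List[str]:
    bus, log = QueueBus(), []

    async def slow_handler(msg: dict) -> None:
        await asyncio.sleep(0.1)
        log.append(f"slow got {msg['body']}")

    async def fast_handler(msg: dict) -> None:
        log.append(f"fast got {msg['body']}")

    bus.subscribe("worker", slow_handler)
    bus.subscribe("*", fast_handler)
    await bus.send("worker", {"body": "hello"})
    log.append("send returned")  # recorded before either handler has run
    await bus.drain()
    await bus.close()
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The trade-off is that delivery becomes fire-and-forget. A sender that needs a reply must correlate it explicitly, for example with a future keyed by message id.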
3. Coordinator
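import asyncio
import json
import uuid
from typing import Any, Dict, List, Tuple


class TaskPlanner:
    """Task planner: asks an LLM to decompose a task into subtasks."""

    def __init__(self, llm_client):
        # Assumption: llm_client exposes `async complete(prompt: str) -> str`
        self.llm = llm_client

    async def decompose_task(self, task: str,
                             available_agents: List[BaseAgent]) -> List[Dict]:
        """Decompose a complex task into subtasks."""
        agent_info = "\n".join(
            f"- {agent.agent_id} ({agent.name}): {', '.join(agent.capabilities)}"
            for agent in available_agents
        )
        prompt = f"""
        Decompose the following task into subtasks that can run in parallel or in sequence.

        Available agents and their capabilities:
        {agent_info}

        Task: {task}

        Requirements:
        1. Assign each subtask to exactly one agent, identified by its agent id
        2. Mark the dependencies between subtasks
        3. Output JSON

        Example output:
        {{
          "subtasks": [
            {{
              "id": "task_1",
              "description": "Subtask description",
              "assigned_to": "agent_id",
              "dependencies": [],
              "priority": 1
            }}
          ]
        }}
        """
        response = await self.llm.complete(prompt)
        return self._parse_plan(response)

    @staticmethod
    def _parse_plan(response: str) -> List[Dict]:
        """Extract the JSON object from the LLM reply and return its subtasks."""
        start, end = response.find("{"), response.rfind("}")
        if start == -1 or end == -1:
            raise ValueError("LLM response does not contain a JSON plan")
        subtasks = json.loads(response[start:end + 1]).get("subtasks", [])
        for st in subtasks:
            st.setdefault("dependencies", [])
        return subtasks


class CoordinatorAgent(BaseAgent):
    """Coordinator agent: assigns tasks and tracks progress."""

    def __init__(self, agent_id: str, llm_client, task_timeout: float = 60.0):
        super().__init__(agent_id, "Coordinator",
                         ["task_planning", "coordination"])
        self.planner = TaskPlanner(llm_client)
        self.agents: Dict[str, BaseAgent] = {}
        self.active_tasks: Dict[str, Dict] = {}
        self.task_timeout = task_timeout
        # Futures waiting for TASK_COMPLETE, keyed by (task_id, subtask_id)
        self._pending: Dict[Tuple[str, str], asyncio.Future] = {}
        # Workers reply to this id, so the coordinator subscribes to its own id
        MessageBus.subscribe(self.agent_id, self.handle_message)

    def register_agent(self, agent: BaseAgent) -> None:
        """Register an agent; it receives the messages addressed to its own id."""
        self.agents[agent.agent_id] = agent
        MessageBus.subscribe(agent.agent_id, agent.handle_message)

    async def process_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """BaseAgent interface: run a task given as {"description": ...}."""
        return await self.execute_task(task["description"])

    async def handle_message(self, message: AgentMessage) -> None:
        """Handle TASK_COMPLETE replies from worker agents."""
        if message.message_type != "TASK_COMPLETE":
            return
        task_id = message.content.get("task_id")
        subtask_id = message.content.get("subtask_id")
        result = message.content.get("result")
        if task_id in self.active_tasks:
            self.active_tasks[task_id]["results"][subtask_id] = result
        future = self._pending.get((task_id, subtask_id))
        if future is not None and not future.done():
            future.set_result(result)

    async def execute_task(self, task: str) -> Dict[str, Any]:
        """Execute a complex task end to end."""
        # 1. Decompose the task
        subtasks = await self.planner.decompose_task(task, list(self.agents.values()))
        # 2. Set up progress tracking
        task_id = str(uuid.uuid4())
        self.active_tasks[task_id] = {
            "task": task,
            "subtasks": {st["id"]: st for st in subtasks},
            "completed": set(),
            "results": {},
        }
        try:
            # 3. Schedule in dependency order
            results = await self._schedule_execution(task_id)
            # 4. Aggregate the results
            return await self._aggregate_results(task_id, results)
        finally:
            self.active_tasks.pop(task_id, None)

    async def _schedule_execution(self, task_id: str) -> Dict[str, Any]:
        """Run subtasks in topological order, in parallel batches."""
        task_info = self.active_tasks[task_id]
        subtasks = task_info["subtasks"]
        results: Dict[str, Any] = {}
        ready = [st for st in subtasks.values() if not st["dependencies"]]
        pending = [st for st in subtasks.values() if st["dependencies"]]

        while ready:
            batch, ready = ready[:5], ready[5:]  # at most 5 subtasks per batch
            batch_results = await asyncio.gather(
                *(self._execute_subtask(task_id, st) for st in batch))
            for st, result in zip(batch, batch_results):
                results[st["id"]] = result
                task_info["completed"].add(st["id"])
            # Promote pending subtasks whose dependencies have all completed
            newly_ready = [st for st in pending
                           if all(dep in task_info["completed"]
                                  for dep in st["dependencies"])]
            promoted = {st["id"] for st in newly_ready}
            pending = [st for st in pending if st["id"] not in promoted]
            ready.extend(newly_ready)

        if pending:
            # Cycles or unknown dependency ids would otherwise be dropped silently
            raise RuntimeError(
                f"Unresolvable dependencies: {[st['id'] for st in pending]}")
        return results

    async def _execute_subtask(self, task_id: str, subtask: Dict) -> Any:
        """Dispatch one subtask over the bus and wait for its TASK_COMPLETE reply."""
        agent = self.agents.get(subtask["assigned_to"])
        if agent is None:
            raise ValueError(f"Agent {subtask['assigned_to']} does not exist")
        key = (task_id, subtask["id"])
        future = asyncio.get_running_loop().create_future()
        self._pending[key] = future
        try:
            await self.send_message(
                receiver=agent.agent_id,
                message_type="TASK_ASSIGNMENT",
                content={**subtask, "task_id": task_id, "subtask_id": subtask["id"]},
            )
            return await asyncio.wait_for(future, timeout=self.task_timeout)
        finally:
            self._pending.pop(key, None)

    async def _aggregate_results(self, task_id: str,
                                 results: Dict[str, Any]) -> Dict[str, Any]:
        """Combine subtask results; override for task-specific aggregation."""
        return {"task_id": task_id,
                "task": self.active_tasks[task_id]["task"],
                "results": results}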
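`_schedule_execution` is essentially a level-by-level topological sort. Below is a standalone sketch of the same idea using Kahn's algorithm. It assumes subtasks are dicts with `id` and `dependencies`, as in the planner's JSON. Each returned batch holds subtasks whose dependencies all sit in earlier batches. A dependency cycle or an unknown dependency id raises an error instead of silently dropping work. `plan_batches` is a hypothetical helper name.

```python
from typing import Dict, List


def plan_batches(subtasks: List[Dict]) -> List[List[str]]:
    """Group subtask ids into batches that can run in parallel, in order."""
    ids = {st["id"] for st in subtasks}
    indegree = {st["id"]: 0 for st in subtasks}
    dependents: Dict[str, List[str]] = {st["id"]: [] for st in subtasks}
    for st in subtasks:
        for dep in st.get("dependencies", []):
            if dep not in ids:
                raise ValueError(f"{st['id']} depends on unknown task {dep}")
            indegree[st["id"]] += 1
            dependents[dep].append(st["id"])

    batches: List[List[str]] = []
    current = [tid for tid, deg in indegree.items() if deg == 0]
    scheduled = 0
    while current:
        batches.append(current)
        scheduled += len(current)
        next_batch = []
        for tid in current:
            # Completing tid removes one unmet dependency from each dependent
            for child in dependents[tid]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_batch.append(child)
        current = next_batch
    if scheduled != len(subtasks):
        raise ValueError("dependency cycle detected")
    return batches


if __name__ == "__main__":
    plan = [
        {"id": "t1", "dependencies": []},
        {"id": "t2", "dependencies": []},
        {"id": "t3", "dependencies": ["t1"]},
        {"id": "t4", "dependencies": ["t1", "t2"]},
        {"id": "t5", "dependencies": ["t3", "t4"]},
    ]
    print(plan_batches(plan))
```

Each batch could be handed to `asyncio.gather`, and sliced further if a cap like the coordinator's five-per-batch limit is wanted. Validating the plan up front also catches a malformed LLM plan before any agent does work.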
Hands-On: Building a Multi-Agent Code Review System
System Architecture
┌─────────────────────────────────────────────────────────┐
│             Code Review Multi-Agent System              │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────┐   - Receive code review requests       │
│  │ Coordinator │   - Assign review tasks                │
│  └──────┬──────┘   - Aggregate review results           │
│         │                                               │
│       ┌─┴───────────┬─────────────┬─────────────┐       │
│       ▼             ▼             ▼             ▼       │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Security  │ │Performance│ │   Style   │ │   Test    │ │
│ │   Agent   │ │   Agent   │ │   Agent   │ │   Agent   │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│       └─────────────┴──────┬──────┴─────────────┘       │
│                            ▼                            │
│                     ┌─────────────┐                     │
│                     │  Reporter   │                     │
│                     └─────────────┘                     │
│                                                         │
└─────────────────────────────────────────────────────────┘
Full Implementation
# multi_agent_code_review.py
# Reuses BaseAgent, AgentMessage and CoordinatorAgent from the sections above.
import ast
import asyncio
import re
import textwrap
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class ReviewResult:
    """Result of one code review pass."""
    agent_name: str
    issues: List[Dict[str, Any]] = field(default_factory=list)
    score: float = 0.0
    suggestions: List[str] = field(default_factory=list)
    time_spent: float = 0.0


def line_of(code: str, offset: int) -> int:
    """1-based line number of a character offset in `code`."""
    return code.count("\n", 0, offset) + 1


def find_recursive_functions(code: str) -> List[Tuple[str, int]]:
    """(name, line) of each function that calls itself directly; [] if unparseable."""
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return []
    found = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            if any(isinstance(n, ast.Call) and isinstance(n.func, ast.Name)
                   and n.func.id == node.name for n in ast.walk(node)):
                found.append((node.name, node.lineno))
    return found


class ReviewAgent(BaseAgent):
    """Shared plumbing for review agents: message handling and result building."""

    async def handle_message(self, message: AgentMessage) -> None:
        if message.message_type == "TASK_ASSIGNMENT":
            result = await self.process_task(message.content)
            # Reply to the coordinator (its agent_id is "coordinator" in this system)
            await self.send_message(
                receiver="coordinator",
                message_type="TASK_COMPLETE",
                content={
                    "task_id": message.content.get("task_id"),
                    "subtask_id": message.content.get("subtask_id"),
                    "result": result,
                },
            )

    def _build_result(self, issues: List[Dict[str, Any]], score: float,
                      start_time: float) -> ReviewResult:
        return ReviewResult(
            agent_name=self.name,
            issues=issues,
            score=score,
            suggestions=sorted({i["suggestion"] for i in issues}),
            time_spent=time.time() - start_time,
        )


class SecurityAgent(ReviewAgent):
    """Security review agent."""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, "SecurityReviewer",
                         ["security_review", "vulnerability_detection"])
        # \b keeps `input(` from also matching inside `raw_input(`
        self.security_patterns = [
            r"\beval\s*\(",
            r"\bexec\s*\(",
            r"\bos\.system\s*\(",
            r"\bsubprocess\.call\s*\(",
            r"\binput\s*\(",
            r"\braw_input\s*\(",
        ]

    async def process_task(self, task: Dict[str, Any]) -> ReviewResult:
        """Run the security review."""
        start_time = time.time()
        code = task.get("code", "")
        issues = []

        # 1. Dangerous function calls
        for pattern in self.security_patterns:
            for match in re.finditer(pattern, code, re.IGNORECASE):
                issues.append({
                    "type": "security", "severity": "high",
                    "line": line_of(code, match.start()),
                    "message": f"Potentially dangerous call: {match.group()}",
                    "suggestion": "Avoid dangerous functions; use a safer alternative",
                })

        # 2. SQL injection risk: SQL built by string concatenation
        for pattern in (r"SELECT.*FROM.*WHERE.*\+", r"INSERT.*INTO.*\+"):
            match = re.search(pattern, code, re.IGNORECASE)
            if match:
                issues.append({
                    "type": "security", "severity": "critical",
                    "line": line_of(code, match.start()),
                    "message": "Possible SQL injection risk",
                    "suggestion": "Use parameterized queries or an ORM",
                })

        # 3. Hard-coded secrets
        secret_patterns = [
            r"password\s*=\s*['\"][^'\"]+['\"]",
            r"api_key\s*=\s*['\"][^'\"]+['\"]",
            r"secret\s*=\s*['\"][^'\"]+['\"]",
        ]
        for pattern in secret_patterns:
            for match in re.finditer(pattern, code, re.IGNORECASE):
                issues.append({
                    "type": "security", "severity": "high",
                    "line": line_of(code, match.start()),
                    "message": "Possible hard-coded secret",
                    "suggestion": "Use environment variables or a secrets manager",
                })

        # Score: -30 per critical, -10 per high, -5 per medium issue, floored at 0
        penalty = {"critical": 30, "high": 10, "medium": 5}
        score = max(0, 100 - sum(penalty.get(i["severity"], 0) for i in issues))
        return self._build_result(issues, score, start_time)


class PerformanceAgent(ReviewAgent):
    """Performance review agent (regex and AST heuristics)."""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, "PerformanceReviewer",
                         ["performance_review", "optimization_suggestions"])

    async def process_task(self, task: Dict[str, Any]) -> ReviewResult:
        """Run the performance review."""
        start_time = time.time()
        code = task.get("code", "")
        issues = []

        def add(severity: str, line: int, message: str, suggestion: str) -> None:
            issues.append({"type": "performance", "severity": severity,
                           "line": line, "message": message,
                           "suggestion": suggestion})

        # 1. Inefficient loop patterns
        loop_patterns = [
            (r"for.*in.*:\s*\n.*\.append\s*\(",
             "append() inside a loop; consider a list comprehension"),
            (r"for\s+\w+\s+in\s+range\s*\(\s*len\s*\(",
             "range(len(...)) loop; iterate directly or use enumerate()"),
        ]
        for pattern, msg in loop_patterns:
            match = re.search(pattern, code)
            if match:
                add("medium", line_of(code, match.start()), msg,
                    "Restructure the loop for better performance")

        # 2. Repeated computation: len() evaluated in the loop body
        match = re.search(r"for.*in.*:\s*\n.*len\s*\(\s*\w+\s*\)", code)
        if match:
            add("low", line_of(code, match.start()),
                "Length recomputed inside a loop",
                "Compute the length once, outside the loop")

        # 3. Recursion: a function that calls itself by name
        for name, lineno in find_recursive_functions(code):
            add("low", lineno, f"Recursive function: {name}",
                "Consider iteration instead, or bound the recursion depth")

        score = max(0, 100 - len(issues) * 5)
        return self._build_result(issues, score, start_time)


class StyleAgent(ReviewAgent):
    """Code style review agent."""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, "StyleReviewer",
                         ["style_review", "code_formatting"])

    async def process_task(self, task: Dict[str, Any]) -> ReviewResult:
        """Run the style review."""
        start_time = time.time()
        code = task.get("code", "")
        issues = []

        # 1. Line length
        for i, line in enumerate(code.split("\n"), 1):
            if len(line) > 100:
                issues.append({
                    "type": "style", "severity": "low", "line": i,
                    "message": f"Line longer than 100 characters ({len(line)})",
                    "suggestion": "Wrap the line or refactor the code",
                })

        # 2. Naming: flag camelCase identifiers, reporting each name once
        seen = set()
        for match in re.finditer(r"\b([a-z]+[A-Z][a-zA-Z0-9]*)\b", code):
            name = match.group(1)
            if name in seen:
                continue
            seen.add(name)
            issues.append({
                "type": "style", "severity": "low",
                "line": line_of(code, match.start()),
                "message": f"Name '{name}' uses camelCase",
                "suggestion": "PEP 8 recommends snake_case for variables and functions",
            })

        # 3. Docstrings
        if not re.search(r'""".*?"""', code, re.DOTALL):
            issues.append({
                "type": "style", "severity": "medium", "line": 0,
                "message": "Missing module/function docstring",
                "suggestion": "Add docstrings to public functions and classes",
            })

        score = max(0, 100 - len(issues) * 3)
        return self._build_result(issues, score, start_time)


class CodeReviewSystem:
    """Entry point of the code review system."""

    def __init__(self):
        # No LLM is needed: review_code() fans out directly, without planning
        self.coordinator = CoordinatorAgent("coordinator", llm_client=None)
        self.agents: Dict[str, BaseAgent] = {}
        self._setup_agents()

    def _setup_agents(self) -> None:
        """Create and register all review agents."""
        for agent in (SecurityAgent("security_agent"),
                      PerformanceAgent("performance_agent"),
                      StyleAgent("style_agent")):
            self.agents[agent.agent_id] = agent
            self.coordinator.register_agent(agent)

    async def review_code(self, code: str,
                          language: str = "python") -> Dict[str, Any]:
        """Run all review agents in parallel and aggregate their results."""
        start_time = time.time()
        results: List[ReviewResult] = await asyncio.gather(*(
            agent.process_task({"code": code, "language": language})
            for agent in self.agents.values()
        ))

        # Group issues by severity; unknown severities count as "low"
        by_severity: Dict[str, List[Dict[str, Any]]] = {
            "critical": [], "high": [], "medium": [], "low": []}
        for result in results:
            for issue in result.issues:
                by_severity.get(issue["severity"], by_severity["low"]).append(issue)

        return {
            "overall_score": round(sum(r.score for r in results) / len(results), 2),
            "total_issues": sum(len(r.issues) for r in results),
            # critical_count, high_count, medium_count, low_count
            **{f"{sev}_count": len(items) for sev, items in by_severity.items()},
            "issues_by_category": by_severity,
            "agent_results": [
                {"agent": r.agent_name, "score": r.score,
                 "issue_count": len(r.issues),
                 "time_spent": round(r.time_spent, 3)}
                for r in results
            ],
            "total_time": round(time.time() - start_time, 3),
            "suggestions": sorted({s for r in results for s in r.suggestions}),
        }


# Usage example
async def main():
    # Code under review (contains deliberate problems)
    sample_code = textwrap.dedent('''
        def process_user_data(user_input):
            query = "SELECT * FROM users WHERE name = '" + user_input + "'"
            result = db.execute(query)
            password = "secret123"
            api_key = "sk-1234567890abcdef"
            data = []
            for i in range(len(result)):
                data.append(result[i])
            eval(user_input)
            return data
    ''')

    review_system = CodeReviewSystem()
    report = await review_system.review_code(sample_code)

    print("=" * 60)
    print("Code Review Report")
    print("=" * 60)
    print(f"Overall score: {report['overall_score']}/100")
    print(f"Total issues: {report['total_issues']}")
    print(f"  - Critical: {report['critical_count']}")
    print(f"  - High: {report['high_count']}")
    print(f"  - Medium: {report['medium_count']}")
    print(f"  - Low: {report['low_count']}")
    print(f"\nReview time: {report['total_time']}s")
    print("\nPer-agent results:")
    for agent_result in report["agent_results"]:
        print(f"  - {agent_result['agent']}: "
              f"score {agent_result['score']}, "
              f"{agent_result['issue_count']} issues, "
              f"{agent_result['time_spent']}s")
    print("\nSuggestions:")
    for suggestion in report["suggestions"]:
        print(f"  • {suggestion}")


if __name__ == "__main__":
    asyncio.run(main())
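The review agents turn a regex match offset into a line number by counting newlines before it. With `code[:match.start()].count('\n') + 1` that means rescanning the prefix for every match. For large files, one option is to record each line's starting offset once and binary-search it with the standard `bisect` module. This is a sketch, not part of the original system, and `LineIndex` is a hypothetical helper name.

```python
import bisect
import re
from typing import List


class LineIndex:
    """Maps character offsets in a text to 1-based line numbers."""

    def __init__(self, text: str) -> None:
        # Line 1 starts at offset 0; every later line starts right after a '\n'
        self.starts: List[int] = [0] + [m.end() for m in re.finditer(r"\n", text)]

    def line_of(self, offset: int) -> int:
        # The number of line starts <= offset is the line number
        return bisect.bisect_right(self.starts, offset)


if __name__ == "__main__":
    code = "x = 1\npassword = 'secret'\ny = eval('2')\n"
    index = LineIndex(code)
    for match in re.finditer(r"eval\s*\(|password\s*=", code):
        print(match.group(), "-> line", index.line_of(match.start()))
```

Building the index costs one pass over the text, and each lookup is then O(log n) in the number of lines. Each agent could build one `LineIndex` per review and reuse it for all matches.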
Performance Optimization Strategies
1. Parallel Execution
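import asyncio
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class OptimizedCoordinator:
    """Coordinator with bounded concurrency and per-task timeouts."""

    def __init__(self, max_workers: int = 10, task_timeout: float = 30.0):
        self.max_workers = max_workers
        self.task_timeout = task_timeout
        # Caps concurrent agent calls (Python 3.10+ allows creating this outside a loop)
        self.semaphore = asyncio.Semaphore(max_workers)

    async def execute_parallel(self, tasks: List[Dict],
                               agents: List[BaseAgent]) -> List[Optional[Any]]:
        """Run tasks[i] on agents[i] concurrently; failed or timed-out tasks yield None."""
        if len(tasks) != len(agents):
            raise ValueError("tasks and agents must have the same length")

        async def execute_with_limit(task, agent):
            async with self.semaphore:
                # The timeout covers the agent call itself, not time spent queuing
                return await asyncio.wait_for(agent.process_task(task),
                                              timeout=self.task_timeout)

        # return_exceptions=True keeps one failure from discarding the other results
        results = await asyncio.gather(
            *(execute_with_limit(t, a) for t, a in zip(tasks, agents)),
            return_exceptions=True,
        )

        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, BaseException):
                logger.warning("Task %d failed: %r", i, result)
                processed_results.append(None)
            else:
                processed_results.append(result)
        return processed_results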
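The semaphore's effect is easy to observe. The sketch below runs 20 simulated agent calls with a limit of 5 and records the peak number running at once. It also shows a per-call timeout turning a hung call into `None`. The helper name `run_bounded` and the delays are hypothetical. It uses the same semaphore, `asyncio.wait_for`, and `gather(return_exceptions=True)` combination as `execute_parallel`.

```python
import asyncio
from typing import List, Optional, Tuple


async def run_bounded(delays: List[float], limit: int,
                      timeout: float) -> Tuple[List[Optional[int]], int]:
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def fake_agent_call(i: int, delay: float) -> int:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)  # highest concurrency observed so far
            try:
                await asyncio.wait_for(asyncio.sleep(delay), timeout=timeout)
                return i
            finally:
                running -= 1

    results = await asyncio.gather(
        *(fake_agent_call(i, d) for i, d in enumerate(delays)),
        return_exceptions=True)
    # Exceptions (here: timeouts) become None, as in execute_parallel
    cleaned = [None if isinstance(r, BaseException) else r for r in results]
    return cleaned, peak


if __name__ == "__main__":
    # 19 quick calls plus one call that would hang for a second
    results, peak = asyncio.run(run_bounded([0.01] * 19 + [1.0],
                                            limit=5, timeout=0.1))
    print("peak concurrency:", peak)
    print("last result:", results[-1])
```

The limit protects downstream resources such as LLM rate limits or CPU-heavy analyzers. The timeout keeps one stuck agent from holding up the whole review.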
2. Result Caching
import hashlib
import json
import time
from functools import wraps
from typing import Any, Dict, Optional


class ResultCache:
    """In-memory cache of review results with a time-to-live."""

    def __init__(self, ttl: int = 3600):
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.ttl = ttl

    def _generate_key(self, code: str, agent_type: str) -> str:
        """Build a cache key; JSON-encoding the pair avoids separator collisions."""
        content = json.dumps([agent_type, code])
        return hashlib.sha256(content.encode("utf-8")).hexdigest()

    def get(self, code: str, agent_type: str) -> Optional[Any]:
        """Return the cached result, or None if missing or expired."""
        key = self._generate_key(code, agent_type)
        entry = self.cache.get(key)
        if entry is None:
            return None
        if time.time() - entry["timestamp"] >= self.ttl:
            del self.cache[key]  # expired
            return None
        return entry["result"]

    def set(self, code: str, agent_type: str, result: Any) -> None:
        """Store a result together with its insertion time."""
        key = self._generate_key(code, agent_type)
        self.cache[key] = {"result": result, "timestamp": time.time()}


def cached_review(cache: ResultCache):
    """Decorator that caches process_task results per (agent class, code)."""
    def decorator(func):
        @wraps(func)
        async def wrapper(self, task: Dict[str, Any]):
            code = task.get("code", "")
            agent_type = self.__class__.__name__

            # Check the cache; `is not None` so falsy results still count as hits
            cached_result = cache.get(code, agent_type)
            if cached_result is not None:
                print(f"Using cached result: {agent_type}")
                return cached_result

            # Run the review and cache its result
            result = await func(self, task)
            cache.set(code, agent_type, result)
            return result
        return wrapper
    return decorator
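`ResultCache` evicts an entry only when it is read after expiring, so entries that are never requested again stay in memory indefinitely. One way to bound memory is to add an LRU size cap on top of the TTL using `collections.OrderedDict`. This is sketched as an assumption, not part of the original design. The clock is injectable, so expiry can be tested without sleeping, and `BoundedTTLCache` is a hypothetical name.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable, Optional, Tuple


class BoundedTTLCache:
    """TTL cache that also evicts the least recently used entry when full."""

    def __init__(self, max_size: int = 1024, ttl: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_size = max_size
        self.ttl = ttl
        self.clock = clock
        self._data: "OrderedDict[Hashable, Tuple[Any, float]]" = OrderedDict()

    def get(self, key: Hashable) -> Optional[Any]:
        item = self._data.get(key)
        if item is None:
            return None
        value, stored_at = item
        if self.clock() - stored_at >= self.ttl:
            del self._data[key]  # expired
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: Hashable, value: Any) -> None:
        self._data[key] = (value, self.clock())
        self._data.move_to_end(key)
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._data)


if __name__ == "__main__":
    cache = BoundedTTLCache(max_size=2, ttl=60)
    cache.set(("StyleAgent", "hash-1"), "result 1")
    cache.set(("StyleAgent", "hash-2"), "result 2")
    cache.set(("StyleAgent", "hash-3"), "result 3")  # evicts hash-1
    print(len(cache), cache.get(("StyleAgent", "hash-1")))
```

A key such as `(agent_type, sha256 digest of the code)` could replace the string key used by `ResultCache`. `time.monotonic` is used because wall-clock time can jump backwards.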
3. Load Balancing
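from typing import Dict, List


class LoadBalancer:
    """Picks the least-loaded agent among those able to handle a task."""

    def __init__(self):
        self.agent_loads: Dict[str, int] = {}
        # Tie-breaker; callers must keep it up to date for it to have any effect
        self.agent_queue_sizes: Dict[str, int] = {}

    def register_agent(self, agent_id: str) -> None:
        """Register an agent with zero load."""
        self.agent_loads[agent_id] = 0
        self.agent_queue_sizes[agent_id] = 0

    def update_load(self, agent_id: str, delta: int) -> None:
        """Adjust an agent's load (+1 when a task starts, -1 when it ends)."""
        if agent_id not in self.agent_loads:
            raise KeyError(f"Unregistered agent: {agent_id}")
        self.agent_loads[agent_id] = max(0, self.agent_loads[agent_id] + delta)

    def select_agent(self, capable_agents: List[str]) -> str:
        """Return the agent with the lowest (load, queue size); unregistered ones sort last."""
        if not capable_agents:
            raise ValueError("No available agent")
        return min(
            capable_agents,
            key=lambda aid: (self.agent_loads.get(aid, float("inf")),
                             self.agent_queue_sizes.get(aid, float("inf"))),
        )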
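`LoadBalancer` relies on callers pairing `update_load(agent_id, +1)` with `update_load(agent_id, -1)`. A forgotten decrement, for example after an exception, leaves the agent looking busy forever. This minimal sketch uses a simplified in-memory balancer and ties the increment and decrement together with `contextlib.contextmanager`. `LeastLoaded` and `lease()` are hypothetical names.

```python
from contextlib import contextmanager
from typing import Dict, Iterator, List


class LeastLoaded:
    """Least-loaded selection whose load counter is released automatically."""

    def __init__(self, agent_ids: List[str]) -> None:
        self.loads: Dict[str, int] = {aid: 0 for aid in agent_ids}

    @contextmanager
    def lease(self, capable: List[str]) -> Iterator[str]:
        if not capable:
            raise ValueError("no capable agent")
        # min() keeps the first minimum, so ties go to the earliest agent listed
        agent_id = min(capable, key=lambda aid: self.loads.get(aid, float("inf")))
        self.loads[agent_id] = self.loads.get(agent_id, 0) + 1
        try:
            yield agent_id
        finally:
            self.loads[agent_id] -= 1  # runs even if the task raised


if __name__ == "__main__":
    balancer = LeastLoaded(["style_agent_1", "style_agent_2"])
    with balancer.lease(["style_agent_1", "style_agent_2"]) as first:
        with balancer.lease(["style_agent_1", "style_agent_2"]) as second:
            print(first, second, balancer.loads)
    print(balancer.loads)  # both back to 0
```

The same `try/finally` pairing works as an `asynccontextmanager` when the work inside the block is an awaited agent call.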
Monitoring and Observability
import time
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class AgentMetrics:
    """Performance metrics for one agent."""
    agent_id: str
    tasks_completed: int = 0
    tasks_failed: int = 0
    avg_response_time: float = 0.0
    total_response_time: float = 0.0
    last_active: float = 0.0
    error_count: int = 0


class MetricsCollector:
    """Collects per-agent and system-wide metrics."""

    def __init__(self):
        self.metrics: Dict[str, AgentMetrics] = {}
        self.message_count = 0
        self.start_time = time.time()

    def _get(self, agent_id: str) -> AgentMetrics:
        """Return the metrics for an agent, creating them on first use."""
        if agent_id not in self.metrics:
            self.metrics[agent_id] = AgentMetrics(agent_id)
        return self.metrics[agent_id]

    def record_task_start(self, agent_id: str) -> None:
        """Record that an agent started a task."""
        self._get(agent_id).last_active = time.time()

    def record_task_complete(self, agent_id: str,
                             duration: float, success: bool = True) -> None:
        """Record a finished task (successful or failed) and its duration."""
        metric = self._get(agent_id)
        if success:
            metric.tasks_completed += 1
        else:
            metric.tasks_failed += 1
            metric.error_count += 1
        metric.total_response_time += duration
        metric.avg_response_time = (
            metric.total_response_time
            / (metric.tasks_completed + metric.tasks_failed))
        metric.last_active = time.time()

    def record_message(self) -> None:
        """Count one message passed through the system."""
        self.message_count += 1

    def get_system_health(self) -> Dict[str, Any]:
        """Return a snapshot of system health."""
        total_tasks = sum(m.tasks_completed + m.tasks_failed
                          for m in self.metrics.values())
        total_failed = sum(m.tasks_failed for m in self.metrics.values())
        total_errors = sum(m.error_count for m in self.metrics.values())
        return {
            "uptime": time.time() - self.start_time,
            "total_messages": self.message_count,
            "total_tasks": total_tasks,
            "total_errors": total_errors,
            # Percentage of finished tasks that did not fail
            "success_rate": (
                (total_tasks - total_failed) / total_tasks * 100
                if total_tasks > 0 else 100.0
            ),
            "agent_status": {
                aid: {
                    "tasks_completed": m.tasks_completed,
                    "tasks_failed": m.tasks_failed,
                    "avg_response_time": round(m.avg_response_time, 3),
                    "last_active": m.last_active,
                }
                for aid, m in self.metrics.items()
            },
        }
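`MetricsCollector` is only useful if every task is actually recorded. This minimal sketch uses a simplified collector with the same `record_task_complete(agent_id, duration, success)` shape as above. It wraps an async agent call so that duration and success or failure are recorded even when the call raises. `MiniMetrics`, `timed`, and `review` are hypothetical names.

```python
import asyncio
import functools
import time
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict


class MiniMetrics:
    def __init__(self) -> None:
        self.completed: Dict[str, int] = defaultdict(int)
        self.failed: Dict[str, int] = defaultdict(int)
        self.total_time: Dict[str, float] = defaultdict(float)

    def record_task_complete(self, agent_id: str, duration: float,
                             success: bool = True) -> None:
        (self.completed if success else self.failed)[agent_id] += 1
        self.total_time[agent_id] += duration

    def success_rate(self) -> float:
        done, bad = sum(self.completed.values()), sum(self.failed.values())
        total = done + bad
        return 100.0 if total == 0 else done / total * 100


def timed(metrics: MiniMetrics, agent_id: str):
    """Decorator: record the duration and outcome of every call to an async function."""
    def decorator(func: Callable[..., Awaitable[Any]]):
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            success = False
            try:
                result = await func(*args, **kwargs)
                success = True
                return result
            finally:
                # Runs on success, on exception, and on cancellation
                metrics.record_task_complete(agent_id,
                                             time.perf_counter() - start, success)
        return wrapper
    return decorator


metrics = MiniMetrics()


@timed(metrics, "style_agent")
async def review(code: str) -> int:
    if not code:
        raise ValueError("empty input")
    await asyncio.sleep(0)  # stand-in for real review work
    return len(code.splitlines())
```

Applying the decorator to each agent's `process_task` keeps the bookkeeping out of the agents' own logic. Recording in `finally` is what makes the success rate trustworthy.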
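Summary
This article covered the design and implementation of multi-agent systems. The key points:
- Architecture: choose the pattern (hierarchical, mesh, or blackboard) that fits the business scenario
- Communication: a message bus keeps agents loosely coupled
- Coordination: task decomposition, dependency management, and parallel scheduling
- Practice: a complete implementation of a code review system
- Performance: parallel execution, result caching, and load balancing
- Observability: monitoring and metrics collection
Multi-agent systems add complexity, but they can substantially improve specialization, reliability, and scalability. In practice, we recommend:
- Starting with a simple two-agent system and expanding gradually
- Paying close attention to the interfaces and contracts between agents
- Building solid monitoring and fault-tolerance mechanisms
- Continuously tuning performance and resource utilization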
This article was last updated on 2024-02-20. Questions are welcome in the community discussion.