
Multi-Agent System Design and Implementation: From Architecture to Practice

AIEng Hub
About a 35-minute read

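Introduction

As AI agent technology matures, a single agent often struggles to meet the needs of complex business scenarios. A multi-agent system splits the work among several specialized agents that collaborate, which lets it take on more complex tasks and keep serving when one part fails.

This article covers the core design principles, communication mechanisms, and coordination strategies of multi-agent systems, then walks through code that can serve as a starting point for a production-grade system.

Why Multi-Agent Systems?

Limitations of a Single Agent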

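Limitation | Description | Impact
Context window limits | Complex tasks need a lot of context and can exceed the token limit | Information is lost and decision quality drops
Narrow expertise | One agent can hardly master several domains at once | Too little specialization, higher error rate
Single point of failure | An error in the one agent fails the whole system | Lower system reliability
Hard to extend | Adding features means changing the core logic | Higher maintenance cost
Performance bottleneck | Complex tasks run serially and take a long time | Poor user experience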

Advantages of Multi-Agent Systems

┌─────────────────────────────────────────────────────────┐
│             Multi-Agent System Architecture             │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   ┌─────────────┐      ┌─────────────┐                 │
│   │ Coordinator │◄────►│   Monitor   │                 │
│   └──────┬──────┘      └─────────────┘                 │
│          │                                             │
│          ▼                                             │
│   ┌─────────────────────────────────┐                  │
│   │         Message Bus             │                  │
│   └─────────────────────────────────┘                  │
│          │              │              │               │
│          ▼              ▼              ▼               │
│   ┌─────────┐    ┌─────────┐    ┌─────────┐           │
│   │Research │    │  Code   │    │  Test   │           │
│   └─────────┘    └─────────┘    └─────────┘           │
│          │              │              │               │
│          └──────────────┼──────────────┘               │
│                         ▼                              │
│                  ┌─────────────┐                       │
│                  │  Aggregator │                       │
│                  └─────────────┘                       │
│                                                         │
└─────────────────────────────────────────────────────────┘
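  1. Specialization: each agent focuses on one domain, which deepens its expertise
  2. Parallel processing: several agents work at the same time, which shortens total run time
  3. Fault tolerance: the failure of one agent need not bring down the whole system
  4. Extensibility: new agents can be added without modifying existing code
  5. Modularity: each agent is easy to test, maintain, and upgrade on its own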
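
Points 2 and 3 can be illustrated with a few lines of asyncio. This is a minimal sketch, not part of the system built later in the article: `run_agent` is a hypothetical stand-in for a real agent call, and the delays are made up.

```python
import asyncio

async def run_agent(name: str, delay: float, fail: bool = False) -> str:
    """Stand-in for an agent call; `delay` simulates model or tool latency."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} done"

async def run_all() -> list:
    # The three calls overlap in time. return_exceptions=True returns a
    # failure as a value instead of letting it abort the other agents.
    return await asyncio.gather(
        run_agent("research", 0.02),
        run_agent("code", 0.02, fail=True),
        run_agent("test", 0.02),
        return_exceptions=True,
    )

results = asyncio.run(run_all())
succeeded = [r for r in results if not isinstance(r, Exception)]
print(succeeded)  # ['research done', 'test done']
```

The failed "code" agent shows up as an exception object in `results`, so the caller can decide whether to retry it or continue with partial results.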

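Multi-Agent Architecture Patterns

Pattern 1: Hierarchical

┌─────────────────┐
│   Supervisor    │
└────────┬────────┘
    ┌────┴────┬────────┬────────┐
    ▼         ▼        ▼        ▼
┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐
│Agent A│ │Agent B│ │Agent C│ │Agent D│
└───────┘ └───────┘ └───────┘ └───────┘

Characteristics

  • A supervisor agent assigns tasks and coordinates the work
  • Worker agents carry out the individual tasks
  • Fits scenarios with well-defined tasks and a clear flow

Typical use cases

  • Software development workflows (requirements → design → coding → testing)
  • Data processing pipelines
  • Customer support ticket handling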
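
As a rough illustration of the hierarchical pattern, the sketch below has a supervisor pass an artifact through one worker per stage. The `WORKERS` table and the `supervise` function are hypothetical names chosen for this example, and the workers are plain functions rather than LLM-backed agents.

```python
from typing import Callable, Dict, List

# Hypothetical workers: plain functions keyed by the stage they handle.
WORKERS: Dict[str, Callable[[str], str]] = {
    "design": lambda spec: f"design({spec})",
    "coding": lambda design: f"code({design})",
    "testing": lambda code: f"tested({code})",
}

def supervise(request: str, stages: List[str]) -> str:
    """Supervisor: hand the artifact to each stage's worker, in order."""
    artifact = request
    for stage in stages:
        worker = WORKERS.get(stage)
        if worker is None:
            raise ValueError(f"no worker registered for stage {stage!r}")
        artifact = worker(artifact)
    return artifact

print(supervise("login page", ["design", "coding", "testing"]))
# tested(code(design(login page)))
```

All routing knowledge lives in the supervisor, which is what keeps the workers simple and is also why the supervisor becomes the component to harden first.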

Pattern 2: Mesh (Peer-to-Peer)

┌───────┐      ┌───────┐
│Agent A│◄────►│Agent B│
└───┬───┘      └───┬───┘
    │              │
    ▼              ▼
┌───────┐      ┌───────┐
│Agent C│◄────►│Agent D│
└───────┘      └───────┘

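Characteristics

  • Agents communicate as equals; there is no central node
  • Highly flexible and adaptable
  • Requires a more complex negotiation mechanism

Typical use cases

  • Simulation systems
  • Multi-agent games
  • Distributed decision-making systems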
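
To make "no central node" concrete, here is a small sketch of one simple negotiation scheme, averaging with neighbors, on a four-agent mesh. The topology, the starting estimates, and the `gossip_round` function are assumptions made for this example; real agents would exchange richer proposals than a single number.

```python
from typing import Dict, List

# Hypothetical mesh of four agents: every agent talks to two neighbors only.
NEIGHBORS: Dict[str, List[str]] = {
    "A": ["B", "C"], "B": ["A", "D"], "C": ["A", "D"], "D": ["B", "C"],
}

def gossip_round(estimates: Dict[str, float]) -> Dict[str, float]:
    """Each agent replaces its estimate with the mean of its own and its neighbors'."""
    return {
        agent: (estimates[agent] + sum(estimates[n] for n in peers)) / (len(peers) + 1)
        for agent, peers in NEIGHBORS.items()
    }

estimates = {"A": 10.0, "B": 20.0, "C": 30.0, "D": 40.0}
for _ in range(20):
    estimates = gossip_round(estimates)

print({agent: round(value, 3) for agent, value in estimates.items()})
# {'A': 25.0, 'B': 25.0, 'C': 25.0, 'D': 25.0}
```

No agent ever sees the whole system, yet all of them settle on the global average. The price is the extra rounds of communication, which is the negotiation cost listed above.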

Pattern 3: Blackboard

┌─────────────────────────────────────┐
│          Shared Blackboard          │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Fact A│ │Fact B│ │Fact C│ │Fact D│ │
│ └──────┘ └──────┘ └──────┘ └──────┘ │
└─────────────────────────────────────┘
    ▲         ▲         ▲         ▲
    │         │         │         │
┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐
│Agent A│ │Agent B│ │Agent C│ │Agent D│
└───────┘ └───────┘ └───────┘ └───────┘

Characteristics

  • All agents share one knowledge base
  • Agents collaborate by reading and writing the shared data
  • Fits knowledge-intensive tasks

Typical use cases

  • Medical diagnosis systems
  • Troubleshooting systems
  • Complex problem solving
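
A blackboard can be sketched as a dictionary that every agent inspects and extends. The example below is a simplified illustration for a troubleshooting scenario: `symptom_agent`, `diagnosis_agent`, and `run_blackboard` are hypothetical names, and the rules are hard-coded where a real system would call a model or a knowledge base.

```python
from typing import Callable, Dict, List, Optional

Blackboard = Dict[str, object]
# A knowledge source reads the board and returns new entries, or None.
KnowledgeSource = Callable[[Blackboard], Optional[Blackboard]]

def symptom_agent(board: Blackboard) -> Optional[Blackboard]:
    if "symptom" not in board and "log" in board:
        return {"symptom": "timeout" if "timed out" in str(board["log"]) else "unknown"}
    return None

def diagnosis_agent(board: Blackboard) -> Optional[Blackboard]:
    if "diagnosis" not in board and board.get("symptom") == "timeout":
        return {"diagnosis": "check downstream service latency"}
    return None

def run_blackboard(board: Blackboard, sources: List[KnowledgeSource]) -> Blackboard:
    """Offer the board to every source until a full pass adds nothing new."""
    changed = True
    while changed:
        changed = False
        for source in sources:
            update = source(board)
            if update:
                board.update(update)
                changed = True
    return board

board = run_blackboard({"log": "request timed out after 30s"},
                       [diagnosis_agent, symptom_agent])
print(board["diagnosis"])  # check downstream service latency
```

The agents are listed in the "wrong" order on purpose: they never call each other, so the result depends only on what is on the board, not on who runs first.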

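Pattern Comparison

Pattern | Communication complexity | Fault tolerance | Extensibility | Best suited for
Hierarchical | | | | Tasks with a well-defined flow
Mesh | | | | Complex collaboration scenarios
Blackboard | | | | Knowledge-sharing scenarios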

Core Component Design

1. Agent Definition and Interface

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import time  # AgentMessage.create stamps each message with time.time()
import uuid

class AgentStatus(Enum):
    IDLE = "idle"
    BUSY = "busy"
    ERROR = "error"
    OFFLINE = "offline"

@dataclass
class AgentMessage:
    """A message exchanged between agents."""
    id: str
    sender: str
    receiver: str
    message_type: str
    content: Any
    timestamp: float
    metadata: Dict[str, Any]

    @classmethod
    def create(cls, sender: str, receiver: str,
               message_type: str, content: Any,
               metadata: Optional[Dict[str, Any]] = None) -> "AgentMessage":
        """Build a message with a fresh ID and the current timestamp."""
        return cls(
            id=str(uuid.uuid4()),
            sender=sender,
            receiver=receiver,
            message_type=message_type,
            content=content,
            timestamp=time.time(),
            metadata=metadata or {}
        )

class BaseAgent(ABC):
    """Base class for all agents."""

    def __init__(self, agent_id: str, name: str,
                 capabilities: Optional[List[str]] = None):
        self.agent_id = agent_id
        self.name = name
        self.capabilities = capabilities or []
        self.status = AgentStatus.IDLE
        self.memory: List[AgentMessage] = []
        self.message_queue: List[AgentMessage] = []

    @abstractmethod
    async def process_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Core method: carry out a task and return its result."""
        pass

    @abstractmethod
    async def handle_message(self, message: AgentMessage) -> None:
        """Handle a message received from another agent."""
        pass

    def can_handle(self, task_type: str) -> bool:
        """Return True if this agent can handle the given task type."""
        return task_type in self.capabilities

    async def send_message(self, receiver: str,
                          message_type: str,
                          content: Any,
                          metadata: Optional[Dict[str, Any]] = None) -> AgentMessage:
        """Send a message to another agent."""
        message = AgentMessage.create(
            sender=self.agent_id,
            receiver=receiver,
            message_type=message_type,
            content=content,
            metadata=metadata
        )
        # Deliver through the message bus (MessageBus is defined in the next section)
        await MessageBus.send(message)
        return message

2. Message Bus

import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Set
from collections import defaultdict

class MessageBus:
    """Message bus for agent-to-agent communication.

    All state lives on the class, so every caller shares the same bus.
    """

    _instance = None
    _subscribers: Dict[str, Set[Callable]] = defaultdict(set)
    _message_history: List[AgentMessage] = []
    _max_history = 1000

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    async def send(cls, message: AgentMessage) -> None:
        """Record the message, then deliver it to its subscribers."""
        # Keep a bounded history
        cls._message_history.append(message)
        if len(cls._message_history) > cls._max_history:
            cls._message_history.pop(0)

        # Notify subscribers. Iterate over copies so that a callback may
        # subscribe or unsubscribe without breaking the loop.
        # 1. Subscribers registered for this receiver
        if message.receiver in cls._subscribers:
            for callback in list(cls._subscribers[message.receiver]):
                try:
                    await callback(message)
                except Exception as e:
                    print(f"Message handling failed: {e}")

        # 2. Broadcast subscribers (registered under "*")
        if "*" in cls._subscribers:
            for callback in list(cls._subscribers["*"]):
                try:
                    await callback(message)
                except Exception as e:
                    print(f"Broadcast handling failed: {e}")

    @classmethod
    def subscribe(cls, agent_id: str,
                  callback: Callable[[AgentMessage], Awaitable[None]]) -> None:
        """Subscribe an async callback to messages addressed to agent_id."""
        cls._subscribers[agent_id].add(callback)

    @classmethod
    def unsubscribe(cls, agent_id: str,
                    callback: Callable[[AgentMessage], Awaitable[None]]) -> None:
        """Remove a subscription."""
        cls._subscribers[agent_id].discard(callback)

    @classmethod
    def get_message_history(cls,
                           sender: Optional[str] = None,
                           receiver: Optional[str] = None,
                           message_type: Optional[str] = None,
                           limit: int = 100) -> List[AgentMessage]:
        """Query the message history, newest entries last."""
        messages = cls._message_history

        if sender:
            messages = [m for m in messages if m.sender == sender]
        if receiver:
            messages = [m for m in messages if m.receiver == receiver]
        if message_type:
            messages = [m for m in messages if m.message_type == message_type]

        return messages[-limit:]
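
The `send` method above awaits every subscriber callback before it returns, so a slow handler delays the sender. One possible variation gives each receiver its own `asyncio.Queue` as a mailbox. The `MailboxBus` class below is a hypothetical sketch for comparison, not a replacement for the bus used in the rest of the article.

```python
import asyncio
from collections import defaultdict
from typing import Any, DefaultDict, List

class MailboxBus:
    """Hypothetical bus variant: one asyncio.Queue per receiver."""

    def __init__(self) -> None:
        self._mailboxes: DefaultDict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

    async def send(self, receiver: str, payload: Any) -> None:
        # Enqueue and return; the receiver consumes at its own pace.
        await self._mailboxes[receiver].put(payload)

    async def receive(self, receiver: str) -> Any:
        return await self._mailboxes[receiver].get()

async def demo() -> List[Any]:
    bus = MailboxBus()
    received: List[Any] = []

    async def worker() -> None:
        for _ in range(2):
            received.append(await bus.receive("worker"))

    consumer = asyncio.create_task(worker())
    await bus.send("worker", {"type": "TASK_ASSIGNMENT", "n": 1})
    await bus.send("worker", {"type": "TASK_ASSIGNMENT", "n": 2})
    await consumer
    return received

messages = asyncio.run(demo())
print([m["n"] for m in messages])  # [1, 2]
```

The trade-off is that every agent now needs its own receive loop, and an unbounded queue can grow without limit if a receiver falls behind; passing `maxsize` to `asyncio.Queue` is one way to apply backpressure.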

3. Coordinator

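from typing import List, Dict, Any, Optional
import asyncio
import json
import uuid

class TaskPlanner:
    """Breaks a task into subtasks with the help of an LLM."""

    def __init__(self, llm_client):
        # llm_client is assumed to expose an async complete(prompt) -> str method
        self.llm = llm_client

    async def decompose_task(self, task: str,
                            available_agents: List[BaseAgent]) -> List[Dict]:
        """Decompose a complex task into subtasks."""

        # List agent IDs, because the coordinator looks agents up by ID
        agent_info = "\n".join([
            f"- {agent.agent_id} ({agent.name}): {', '.join(agent.capabilities)}"
            for agent in available_agents
        ])

        prompt = f"""
        Break the following task into subtasks that can run in parallel or in sequence.

        Available agents and their capabilities:
        {agent_info}

        Task: {task}

        Requirements:
        1. Assign every subtask to one agent, identified by its ID
        2. State the dependencies between subtasks
        3. Output JSON

        Example output:
        {{
            "subtasks": [
                {{
                    "id": "task_1",
                    "description": "what the subtask does",
                    "assigned_to": "agent_id",
                    "dependencies": [],
                    "priority": 1
                }}
            ]
        }}
        """

        response = await self.llm.complete(prompt)
        # Parse the JSON and return the list of subtasks
        return self._parse_plan(response)

    def _parse_plan(self, response: str) -> List[Dict]:
        """Extract the "subtasks" list from the model's reply.

        Minimal sketch: assumes the reply contains exactly one JSON object.
        """
        start, end = response.find("{"), response.rfind("}")
        if start == -1 or end < start:
            raise ValueError("No JSON object found in the planner response")
        plan = json.loads(response[start:end + 1])
        subtasks = plan.get("subtasks", [])
        for st in subtasks:
            st.setdefault("dependencies", [])
        return subtasks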

class CoordinatorAgent(BaseAgent):
    """Coordinator agent: assigns tasks and tracks their progress."""

    def __init__(self, agent_id: str, llm_client):
        super().__init__(agent_id, "Coordinator",
                        ["task_planning", "coordination"])
        self.planner = TaskPlanner(llm_client)
        self.agents: Dict[str, BaseAgent] = {}
        self.active_tasks: Dict[str, Dict] = {}

    def register_agent(self, agent: BaseAgent) -> None:
        """Register an agent with the system."""
        self.agents[agent.agent_id] = agent
        # The bus delivers by receiver ID, so listen on the coordinator's own
        # ID to receive the TASK_COMPLETE replies that agents send back.
        # Subscribers are stored in a set, so repeating this call is harmless.
        MessageBus.subscribe(self.agent_id, self._handle_agent_message)

    async def execute_task(self, task: str) -> Dict[str, Any]:
        """Run a complex task end to end."""
        # 1. Decompose the task
        subtasks = await self.planner.decompose_task(
            task,
            list(self.agents.values())
        )

        # 2. Set up tracking for this task
        task_id = str(uuid.uuid4())
        self.active_tasks[task_id] = {
            "task": task,
            "subtasks": {st["id"]: st for st in subtasks},
            "completed": set(),
            "results": {}
        }

        # 3. Schedule the subtasks in dependency order
        results = await self._schedule_execution(task_id)

        # 4. Aggregate the results
        final_result = await self._aggregate_results(task_id, results)

        return final_result
    
    async def _schedule_execution(self, task_id: str) -> Dict[str, Any]:
        """Schedule the subtasks of one task."""
        task_info = self.active_tasks[task_id]
        subtasks = task_info["subtasks"]
        results = {}

        # Topological order: a subtask becomes ready once all of its
        # dependencies have completed
        ready_tasks = [st for st in subtasks.values()
                      if not st["dependencies"]]
        pending_tasks = [st for st in subtasks.values()
                        if st["dependencies"]]

        while ready_tasks:
            # Run ready subtasks in parallel, at most 5 per batch
            batch = ready_tasks[:5]
            ready_tasks = ready_tasks[5:]

            batch_results = await asyncio.gather(*[
                self._execute_subtask(task_id, st)
                for st in batch
            ])

            for st, result in zip(batch, batch_results):
                results[st["id"]] = result
                task_info["completed"].add(st["id"])

            # Find subtasks that have just become ready
            newly_ready = []
            for st in pending_tasks[:]:
                if all(dep in task_info["completed"]
                       for dep in st["dependencies"]):
                    newly_ready.append(st)
                    pending_tasks.remove(st)

            ready_tasks.extend(newly_ready)

        if pending_tasks:
            # Nothing is ready but work remains: a dependency cycle or a
            # dependency on an unknown subtask ID
            stuck = [st["id"] for st in pending_tasks]
            raise ValueError(f"Unresolvable dependencies for subtasks: {stuck}")

        return results
    
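    async def _execute_subtask(self, task_id: str,
                               subtask: Dict) -> Any:
        """Run a single subtask."""
        agent = self.agents.get(subtask["assigned_to"])
        if not agent:
            raise ValueError(f"Agent {subtask['assigned_to']} does not exist")

        # Announce the assignment on the bus
        await self.send_message(
            receiver=agent.agent_id,
            message_type="TASK_ASSIGNMENT",
            content={
                "task_id": task_id,
                "subtask_id": subtask["id"],
                "description": subtask["description"]
            }
        )

        # Wait for the result. A real implementation needs a proper
        # synchronization mechanism; this simplified version calls the
        # agent directly.
        return await agent.process_task(subtask)

    async def _handle_agent_message(self, message: AgentMessage) -> None:
        """Handle feedback messages from agents."""
        if message.message_type == "TASK_COMPLETE":
            # Update the task state
            task_id = message.content.get("task_id")
            subtask_id = message.content.get("subtask_id")
            result = message.content.get("result")

            if task_id in self.active_tasks:
                self.active_tasks[task_id]["results"][subtask_id] = result

    async def _aggregate_results(self, task_id: str,
                                 results: Dict[str, Any]) -> Dict[str, Any]:
        """Combine subtask results (minimal version: keyed by subtask ID)."""
        task_info = self.active_tasks.pop(task_id)
        return {"task": task_info["task"], "results": results}

    # BaseAgent declares the two methods below as abstract; without them
    # CoordinatorAgent could not be instantiated.
    async def process_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Run the task described by task["description"]."""
        return await self.execute_task(task["description"])

    async def handle_message(self, message: AgentMessage) -> None:
        """Route incoming messages to the feedback handler."""
        await self._handle_agent_message(message)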
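
The comment in `_execute_subtask` notes that a real implementation needs a more elaborate synchronization mechanism than a direct call. One common approach is to park an `asyncio.Future` per subtask and resolve it when the matching TASK_COMPLETE message arrives. The `ReplyWaiter` helper below is a hypothetical sketch of that idea, with a fake agent standing in for the message bus.

```python
import asyncio
from typing import Any, Dict

class ReplyWaiter:
    """Hypothetical helper: match completion replies to pending requests."""

    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}

    def expect(self, subtask_id: str) -> asyncio.Future:
        # Register interest before sending, so a fast reply cannot be missed.
        future = asyncio.get_running_loop().create_future()
        self._pending[subtask_id] = future
        return future

    def resolve(self, subtask_id: str, result: Any) -> None:
        # Called from the message handler when a reply arrives.
        future = self._pending.pop(subtask_id, None)
        if future is not None and not future.done():
            future.set_result(result)

    def discard(self, subtask_id: str) -> None:
        self._pending.pop(subtask_id, None)

async def demo() -> str:
    waiter = ReplyWaiter()
    reply = waiter.expect("task_1")

    async def fake_agent() -> None:
        await asyncio.sleep(0.01)  # simulated work
        waiter.resolve("task_1", "review finished")

    agent_task = asyncio.create_task(fake_agent())
    try:
        # Bound the wait so a lost reply cannot hang the coordinator.
        result = await asyncio.wait_for(reply, timeout=1.0)
    finally:
        waiter.discard("task_1")
    await agent_task
    return result

outcome = asyncio.run(demo())
print(outcome)  # review finished
```

In the coordinator, `expect` would be called just before `send_message`, and `resolve` from the TASK_COMPLETE branch of the feedback handler.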

Hands-On: Building a Code Review Multi-Agent System

System Architecture

┌─────────────────────────────────────────────────────────┐
│             Code Review Multi-Agent System              │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────┐                                        │
│  │ Coordinator │  - receives review requests            │
│  │             │  - assigns review tasks                │
│  └──────┬──────┘  - aggregates review results           │
│       ┌─┴───────────┬─────────────┬─────────────┐       │
│       ▼             ▼             ▼             ▼       │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Security  │ │Performance│ │   Style   │ │   Test    │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│       └─────────────┴──────┬──────┴─────────────┘       │
│                            ▼                            │
│                     ┌─────────────┐                     │
│                     │  Reporter   │                     │
│                     └─────────────┘                     │
│                                                         │
└─────────────────────────────────────────────────────────┘

Complete Implementation

# multi_agent_code_review.py
import asyncio
import time
from typing import Dict, List, Any
from dataclasses import dataclass, field

@dataclass
class ReviewResult:
    """Result of one agent's code review."""
    agent_name: str
    issues: List[Dict[str, Any]] = field(default_factory=list)
    score: float = 0.0
    suggestions: List[str] = field(default_factory=list)
    time_spent: float = 0.0

class SecurityAgent(BaseAgent):
    """Security review agent."""

    def __init__(self, agent_id: str):
        super().__init__(
            agent_id,
            "SecurityReviewer",
            ["security_review", "vulnerability_detection"]
        )
        # \b keeps identifiers such as "retrieval(" from matching "eval("
        self.security_patterns = [
            r"\beval\s*\(",
            r"\bexec\s*\(",
            r"\bos\.system\s*\(",
            r"\bsubprocess\.call\s*\(",
            r"\binput\s*\(",
            r"\braw_input\s*\(",
        ]

    async def process_task(self, task: Dict[str, Any]) -> ReviewResult:
        """Run the security review."""
        start_time = time.time()
        code = task.get("code", "")
        language = task.get("language", "python")  # not used by these checks yet

        issues = []

        # 1. Look for dangerous function calls
        import re
        for pattern in self.security_patterns:
            for match in re.finditer(pattern, code, re.IGNORECASE):
                issues.append({
                    "type": "security",
                    "severity": "high",
                    "line": code[:match.start()].count('\n') + 1,
                    "message": f"Potentially dangerous function call: {match.group()}",
                    "suggestion": "Avoid dangerous functions; use a safer alternative"
                })

        # 2. Look for SQL built by string concatenation (injection risk)
        sql_patterns = [r"SELECT.*FROM.*WHERE.*\+", r"INSERT.*INTO.*\+"]
        for pattern in sql_patterns:
            match = re.search(pattern, code, re.IGNORECASE)
            if match:
                issues.append({
                    "type": "security",
                    "severity": "critical",
                    "line": code[:match.start()].count('\n') + 1,
                    "message": "Possible SQL injection risk",
                    "suggestion": "Use parameterized queries or an ORM"
                })

        # 3. Look for hard-coded secrets
        secret_patterns = [
            r"password\s*=\s*['\"][^'\"]+['\"]",
            r"api_key\s*=\s*['\"][^'\"]+['\"]",
            r"secret\s*=\s*['\"][^'\"]+['\"]",
        ]
        for pattern in secret_patterns:
            for match in re.finditer(pattern, code, re.IGNORECASE):
                issues.append({
                    "type": "security",
                    "severity": "high",
                    "line": code[:match.start()].count('\n') + 1,
                    "message": "Possible hard-coded secret",
                    "suggestion": "Use environment variables or a secrets manager"
                })

        # Security score: each severity level carries its own penalty
        penalties = {"critical": 30, "high": 10, "medium": 5}
        score = max(0, 100 - sum(penalties.get(i["severity"], 0) for i in issues))

        return ReviewResult(
            agent_name=self.name,
            issues=issues,
            score=score,
            suggestions=list(set([i["suggestion"] for i in issues])),
            time_spent=time.time() - start_time
        )

    async def handle_message(self, message: AgentMessage) -> None:
        if message.message_type == "TASK_ASSIGNMENT":
            result = await self.process_task(message.content)
            # Send the result back to whoever assigned the task
            await self.send_message(
                receiver=message.sender,
                message_type="TASK_COMPLETE",
                content={
                    "task_id": message.content.get("task_id"),
                    "subtask_id": message.content.get("subtask_id"),
                    "result": result
                }
            )

class PerformanceAgent(BaseAgent):
    """Performance review agent."""

    def __init__(self, agent_id: str):
        super().__init__(
            agent_id,
            "PerformanceReviewer",
            ["performance_review", "optimization_suggestions"]
        )

    async def process_task(self, task: Dict[str, Any]) -> ReviewResult:
        """Run the performance review."""
        start_time = time.time()
        code = task.get("code", "")

        issues = []
        import re

        # 1. Look for inefficient loop idioms
        loop_patterns = [
            (r"for.*in.*:\s*\n.*\.append\s*\(",
             "append() inside a loop; consider a list comprehension"),
            # range(len(...)) sits on the for line itself, not on the next line
            (r"for\s+\w+\s+in\s+range\s*\(\s*len\s*\(",
             "Loop over range(len(...)); consider enumerate()"),
        ]
        for pattern, msg in loop_patterns:
            if re.search(pattern, code):
                issues.append({
                    "type": "performance",
                    "severity": "medium",
                    "line": 0,
                    "message": msg,
                    "suggestion": "Restructure the loop to improve performance"
                })

        # 2. Look for repeated computation (checks the first line of the loop body)
        if re.search(r"for.*in.*:\s*\n.*len\s*\(\s*\w+\s*\)", code):
            issues.append({
                "type": "performance",
                "severity": "low",
                "line": 0,
                "message": "Length is recomputed inside the loop",
                "suggestion": "Compute the length once, outside the loop"
            })

        # 3. Look for recursion. Heuristic: a function name that is called
        #    again after its own definition. This can also match an ordinary
        #    call site outside the function body.
        if re.search(r"def\s+(\w+)\s*\([^)]*\):[\s\S]*?\b\1\s*\(", code):
            issues.append({
                "type": "performance",
                "severity": "low",
                "line": 0,
                "message": "Possible recursive call detected",
                "suggestion": "Consider iteration instead of recursion, or limit the recursion depth"
            })

        score = max(0, 100 - len(issues) * 5)

        return ReviewResult(
            agent_name=self.name,
            issues=issues,
            score=score,
            suggestions=list(set([i["suggestion"] for i in issues])),
            time_spent=time.time() - start_time
        )

    async def handle_message(self, message: AgentMessage) -> None:
        if message.message_type == "TASK_ASSIGNMENT":
            result = await self.process_task(message.content)
            # Send the result back to whoever assigned the task
            await self.send_message(
                receiver=message.sender,
                message_type="TASK_COMPLETE",
                content={
                    "task_id": message.content.get("task_id"),
                    "subtask_id": message.content.get("subtask_id"),
                    "result": result
                }
            )

class StyleAgent(BaseAgent):
    """Code style review agent."""

    def __init__(self, agent_id: str):
        super().__init__(
            agent_id,
            "StyleReviewer",
            ["style_review", "code_formatting"]
        )

    async def process_task(self, task: Dict[str, Any]) -> ReviewResult:
        """Run the style review."""
        start_time = time.time()
        code = task.get("code", "")

        issues = []
        import re

        # 1. Check line length
        lines = code.split('\n')
        for i, line in enumerate(lines, 1):
            if len(line) > 100:
                issues.append({
                    "type": "style",
                    "severity": "low",
                    "line": i,
                    "message": f"Line is longer than 100 characters ({len(line)} characters)",
                    "suggestion": "Wrap the line or refactor the code"
                })

        # 2. Check naming conventions (report each camelCase name once)
        camel_case_vars = sorted(set(
            re.findall(r"\b([a-z]+[A-Z][a-zA-Z0-9]*)\b", code)
        ))
        for var in camel_case_vars:
            issues.append({
                "type": "style",
                "severity": "low",
                "line": 0,
                "message": f"Name '{var}' uses camelCase",
                "suggestion": "Python convention is snake_case naming"
            })

        # 3. Check for docstrings (only the triple-double-quote form is detected)
        if not re.search(r'""".*?"""', code, re.DOTALL):
            issues.append({
                "type": "style",
                "severity": "medium",
                "line": 0,
                "message": "No module or function docstring found",
                "suggestion": "Add docstrings to public functions and classes"
            })

        score = max(0, 100 - len(issues) * 3)

        return ReviewResult(
            agent_name=self.name,
            issues=issues,
            score=score,
            suggestions=list(set([i["suggestion"] for i in issues])),
            time_spent=time.time() - start_time
        )

    async def handle_message(self, message: AgentMessage) -> None:
        if message.message_type == "TASK_ASSIGNMENT":
            result = await self.process_task(message.content)
            # Send the result back to whoever assigned the task
            await self.send_message(
                receiver=message.sender,
                message_type="TASK_COMPLETE",
                content={
                    "task_id": message.content.get("task_id"),
                    "subtask_id": message.content.get("subtask_id"),
                    "result": result
                }
            )

class CodeReviewSystem:
    """Entry point of the code review system."""

    def __init__(self):
        # No LLM client is passed: review_code calls the agents directly
        self.coordinator = CoordinatorAgent("coordinator", None)
        self.agents: Dict[str, BaseAgent] = {}
        self._setup_agents()

    def _setup_agents(self):
        """Create and register all review agents."""
        agents = [
            SecurityAgent("security_agent"),
            PerformanceAgent("performance_agent"),
            StyleAgent("style_agent"),
        ]

        for agent in agents:
            self.agents[agent.agent_id] = agent
            self.coordinator.register_agent(agent)

    async def review_code(self, code: str,
                         language: str = "python") -> Dict[str, Any]:
        """Review a piece of code with every agent."""
        start_time = time.time()

        # Run all reviews in parallel
        tasks = [
            agent.process_task({"code": code, "language": language})
            for agent in self.agents.values()
        ]

        results = await asyncio.gather(*tasks)

        # Aggregate the results
        total_issues = sum(len(r.issues) for r in results)
        avg_score = sum(r.score for r in results) / len(results)
        total_time = time.time() - start_time

        # Group issues by severity
        critical_issues = []
        high_issues = []
        medium_issues = []
        low_issues = []

        for result in results:
            for issue in result.issues:
                if issue["severity"] == "critical":
                    critical_issues.append(issue)
                elif issue["severity"] == "high":
                    high_issues.append(issue)
                elif issue["severity"] == "medium":
                    medium_issues.append(issue)
                else:
                    low_issues.append(issue)

        return {
            "overall_score": round(avg_score, 2),
            "total_issues": total_issues,
            "critical_count": len(critical_issues),
            "high_count": len(high_issues),
            "medium_count": len(medium_issues),
            "low_count": len(low_issues),
            "issues_by_category": {
                "critical": critical_issues,
                "high": high_issues,
                "medium": medium_issues,
                "low": low_issues
            },
            "agent_results": [
                {
                    "agent": r.agent_name,
                    "score": r.score,
                    "issue_count": len(r.issues),
                    "time_spent": round(r.time_spent, 3)
                }
                for r in results
            ],
            "total_time": round(total_time, 3),
            "suggestions": list(set(
                suggestion
                for r in results
                for suggestion in r.suggestions
            ))
        }

# Usage example
async def main():
    # Code to review
    sample_code = '''
def process_user_data(user_input):
    query = "SELECT * FROM users WHERE name = '" + user_input + "'"
    result = db.execute(query)
    
    password = "secret123"
    api_key = "sk-1234567890abcdef"
    
    data = []
    for i in range(len(result)):
        data.append(result[i])
    
    eval(user_input)
    
    return data
'''

    # Create the review system
    review_system = CodeReviewSystem()

    # Run the review
    report = await review_system.review_code(sample_code)

    # Print the report
    print("=" * 60)
    print("Code Review Report")
    print("=" * 60)
    print(f"Overall score: {report['overall_score']}/100")
    print(f"Total issues: {report['total_issues']}")
    print(f"  - Critical: {report['critical_count']}")
    print(f"  - High: {report['high_count']}")
    print(f"  - Medium: {report['medium_count']}")
    print(f"  - Low: {report['low_count']}")
    print(f"\nReview time: {report['total_time']}s")
    print("\nResults per agent:")
    for agent_result in report['agent_results']:
        print(f"  - {agent_result['agent']}: "
              f"score {agent_result['score']}, "
              f"{agent_result['issue_count']} issues found, "
              f"took {agent_result['time_spent']}s")

    print("\nSuggestions:")
    for suggestion in report['suggestions']:
        print(f"  • {suggestion}")

if __name__ == "__main__":
    asyncio.run(main())
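
The regex checks above match raw text, so they also fire on strings and comments that merely mention a dangerous function. As a possible refinement (an assumption, not part of the system above), Python's standard `ast` module can find actual call nodes. `DANGEROUS_CALLS` and `find_dangerous_calls` are hypothetical names for this sketch, and only direct calls by bare name are detected.

```python
import ast
from typing import List, Tuple

DANGEROUS_CALLS = {"eval", "exec"}  # illustrative subset

def find_dangerous_calls(source: str) -> List[Tuple[int, str]]:
    """Return (line, name) for each direct call to a name in DANGEROUS_CALLS."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Name)
                and node.func.id in DANGEROUS_CALLS):
            findings.append((node.lineno, node.func.id))
    return sorted(findings)

sample = '''
text = "never call eval() on user input"   # a string, not a call
value = eval(user_input)
'''
print(find_dangerous_calls(sample))  # [(3, 'eval')]
```

The string on line 2 is ignored because it is not a call node. The cost is that `ast.parse` raises `SyntaxError` on code that does not parse, so a regex pass can still be useful as a fallback.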

Performance Optimization Strategies

1. Parallel Execution

import asyncio
from typing import Any, Dict, List, Optional

class OptimizedCoordinator:
    """Coordinator variant with bounded concurrency and per-task timeouts."""

    def __init__(self, max_workers: int = 10, task_timeout: float = 60.0):
        self.max_workers = max_workers
        # Seconds allowed per task; the 60-second default is an arbitrary choice
        self.task_timeout = task_timeout
        # Caps how many agent calls are in flight at once
        self.semaphore = asyncio.Semaphore(max_workers)

    async def execute_parallel(self, tasks: List[Dict],
                               agents: List[BaseAgent]) -> List[Optional[Any]]:
        """Run task i on agent i in parallel; a failed or timed-out task yields None."""

        async def execute_with_limit(task, agent):
            async with self.semaphore:
                return await asyncio.wait_for(
                    agent.process_task(task),
                    timeout=self.task_timeout
                )

        # Create one coroutine per (task, agent) pair
        coroutines = [
            execute_with_limit(task, agent)
            for task, agent in zip(tasks, agents)
        ]

        # Run them with gather; exceptions are returned instead of raised
        results = await asyncio.gather(
            *coroutines,
            return_exceptions=True
        )

        # Replace failures with None
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {result!r}")
                processed_results.append(None)
            else:
                processed_results.append(result)

        return processed_results

2. Result Caching

import hashlib
import time
from functools import wraps
from typing import Any, Dict, Optional

class ResultCache:
    """In-memory cache for review results, with a time-to-live."""

    def __init__(self, ttl: int = 3600):
        self.cache: Dict[str, Dict] = {}
        self.ttl = ttl  # seconds

    def _generate_key(self, code: str, agent_type: str) -> str:
        """Build the cache key from the agent type and the code."""
        content = f"{agent_type}:{code}"
        # The hash only shortens the key; it is not used for security
        return hashlib.sha256(content.encode()).hexdigest()

    def get(self, code: str, agent_type: str) -> Optional[Any]:
        """Return the cached result, or None if it is missing or expired."""
        key = self._generate_key(code, agent_type)
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry["timestamp"] < self.ttl:
                return entry["result"]
            else:
                del self.cache[key]
        return None

    def set(self, code: str, agent_type: str, result: Any) -> None:
        """Store a result."""
        key = self._generate_key(code, agent_type)
        self.cache[key] = {
            "result": result,
            "timestamp": time.time()
        }

def cached_review(cache: ResultCache):
    """Decorator that caches the result of an agent's process_task."""
    def decorator(func):
        @wraps(func)
        async def wrapper(self, task: Dict[str, Any]):
            code = task.get("code", "")
            agent_type = self.__class__.__name__

            # Check the cache; compare with None so that falsy results still count as hits
            cached_result = cache.get(code, agent_type)
            if cached_result is not None:
                print(f"Using cached result: {agent_type}")
                return cached_result

            # Run the review
            result = await func(self, task)

            # Cache the result
            cache.set(code, agent_type, result)
            return result
        return wrapper
    return decorator

3. Load Balancing

from typing import Dict, List

class LoadBalancer:
    """Picks the least-loaded agent for each task."""

    def __init__(self):
        self.agent_loads: Dict[str, int] = {}
        self.agent_queue_sizes: Dict[str, int] = {}

    def register_agent(self, agent_id: str):
        """Register an agent."""
        self.agent_loads[agent_id] = 0
        self.agent_queue_sizes[agent_id] = 0

    def update_load(self, agent_id: str, delta: int):
        """Adjust an agent's load by delta (for example +1 on start, -1 on completion)."""
        self.agent_loads[agent_id] = max(0, self.agent_loads.get(agent_id, 0) + delta)

    def select_agent(self, capable_agents: List[str]) -> str:
        """Select the agent with the lowest load."""
        if not capable_agents:
            raise ValueError("No agent available")

        # Lowest load wins; queue size breaks ties. Unregistered agents sort last.
        return min(
            capable_agents,
            key=lambda aid: (
                self.agent_loads.get(aid, float('inf')),
                self.agent_queue_sizes.get(aid, float('inf'))
            )
        )
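
A load balancer only helps if the load counters are updated around every task, including tasks that fail. The sketch below shows one way to do that with `try`/`finally`. It is self-contained for illustration: the `loads` dictionary, the agent IDs, and `run_on_least_loaded` are hypothetical and stand in for the `LoadBalancer` class above.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

loads: Dict[str, int] = {"reviewer_1": 0, "reviewer_2": 0}  # hypothetical agent IDs

async def run_on_least_loaded(job: Callable[[str], Awaitable[str]]) -> str:
    agent_id = min(loads, key=loads.get)  # pick the least-loaded agent
    loads[agent_id] += 1
    try:
        return await job(agent_id)
    finally:
        loads[agent_id] -= 1  # released even if the job raises

async def job(agent_id: str) -> str:
    await asyncio.sleep(0.01)  # simulated work
    return agent_id

async def demo() -> List[str]:
    return list(await asyncio.gather(*(run_on_least_loaded(job) for _ in range(4))))

assignments = asyncio.run(demo())
print(sorted(assignments))
# ['reviewer_1', 'reviewer_1', 'reviewer_2', 'reviewer_2']
```

Four concurrent jobs are spread evenly over the two agents, and every counter is back at zero once they finish.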

Monitoring and Observability

from dataclasses import dataclass
from typing import Dict, List
import time

@dataclass
class AgentMetrics:
    """Performance metrics for one agent."""
    agent_id: str
    tasks_completed: int = 0
    tasks_failed: int = 0
    avg_response_time: float = 0.0
    total_response_time: float = 0.0
    last_active: float = 0.0
    error_count: int = 0

class MetricsCollector:
    """Collects metrics across all agents."""

    def __init__(self):
        self.metrics: Dict[str, AgentMetrics] = {}
        self.message_count = 0
        self.start_time = time.time()

    def record_task_start(self, agent_id: str):
        """Record that an agent started a task."""
        if agent_id not in self.metrics:
            self.metrics[agent_id] = AgentMetrics(agent_id)

    def record_task_complete(self, agent_id: str,
                            duration: float, success: bool = True):
        """Record a finished task and update the running averages."""
        # Create the entry on demand so that a missing record_task_start
        # call does not silently drop the measurement
        metric = self.metrics.setdefault(agent_id, AgentMetrics(agent_id))
        if success:
            metric.tasks_completed += 1
        else:
            metric.tasks_failed += 1
            metric.error_count += 1  # feeds success_rate in get_system_health

        metric.total_response_time += duration
        metric.avg_response_time = (
            metric.total_response_time /
            (metric.tasks_completed + metric.tasks_failed)
        )
        metric.last_active = time.time()

    def record_message(self):
        """Count one message."""
        self.message_count += 1

    def get_system_health(self) -> Dict:
        """Return a snapshot of system health."""
        total_tasks = sum(m.tasks_completed + m.tasks_failed
                         for m in self.metrics.values())
        total_errors = sum(m.error_count for m in self.metrics.values())

        return {
            "uptime": time.time() - self.start_time,
            "total_messages": self.message_count,
            "total_tasks": total_tasks,
            "total_errors": total_errors,
            "success_rate": (
                (total_tasks - total_errors) / total_tasks * 100
                if total_tasks > 0 else 100
            ),
            "agent_status": {
                aid: {
                    "tasks_completed": m.tasks_completed,
                    "avg_response_time": round(m.avg_response_time, 3),
                    "last_active": m.last_active
                }
                for aid, m in self.metrics.items()
            }
        }
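
Calling the start and complete hooks by hand around every task is easy to forget. One option is to wrap the timing in a context manager. The `track` helper below is a hypothetical, self-contained sketch that uses plain dictionaries in place of the `MetricsCollector` above.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator, List

durations: Dict[str, List[float]] = {}
failures: Dict[str, int] = {}

@contextmanager
def track(agent_id: str) -> Iterator[None]:
    """Time the wrapped block and count it as a failure if it raises."""
    start = time.perf_counter()
    try:
        yield
    except Exception:
        failures[agent_id] = failures.get(agent_id, 0) + 1
        raise
    finally:
        durations.setdefault(agent_id, []).append(time.perf_counter() - start)

with track("style_agent"):
    sum(range(1000))  # stand-in for a review task

try:
    with track("style_agent"):
        raise RuntimeError("simulated agent error")
except RuntimeError:
    pass

print(len(durations["style_agent"]), failures["style_agent"])  # 2 1
```

Both runs are timed, and only the failing one is counted as an error. The exception is re-raised, so callers still see the failure.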

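Summary

This article covered the design and implementation of multi-agent systems. The key points are:

  1. Architecture: choose the pattern (hierarchical, mesh, or blackboard) that fits the business scenario
  2. Communication: a message bus keeps agents loosely coupled
  3. Coordination: task decomposition, dependency management, and parallel scheduling
  4. Practice: a complete implementation of a code review system
  5. Performance: parallel execution, result caching, and load balancing
  6. Observability: monitoring and metrics collection across all agents

A multi-agent system adds complexity, but in return it can improve specialization, reliability, and extensibility. In practice, it helps to:

  • Start with a simple two-agent system and extend it step by step
  • Pay close attention to the interfaces and contracts between agents
  • Put monitoring and fault-tolerance mechanisms in place
  • Keep optimizing performance and resource usage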


This article was last updated on 2024-02-20. Questions are welcome in the community discussion.