💡

文章摘要

探索多智能体系统的通信协议、角色分配和任务协调机制

1为什么需要 Multi-Agent?从单兵到团队的范式跃迁

单 Agent 系统已经能够处理很多复杂任务,但在面对真正的大规模复杂问题时,单个 Agent 的局限性就会暴露出来。

认知边界问题:单个 LLM 的上下文窗口虽然越来越大(128K、256K、甚至 1M tokens),但它仍然是一个大脑。当任务涉及多个独立领域(例如:同时需要代码审查、安全审计、性能优化),单个 Agent 需要在不同思维模式之间频繁切换,容易遗漏细节。这就像一个程序员同时兼顾前端、后端、运维、测试——不是不能做,而是效率和质量的折中。

专业化分工:Multi-Agent 的核心思想是让每个 Agent 做它最擅长的事。一个 Agent 专门负责搜索和信息收集,一个专门负责代码生成,一个专门负责代码审查。每个 Agent 可以通过系统 prompt 被赋予特定的角色、专业知识和行为约束,就像人类团队中的专家角色。

容错与鲁棒性:单 Agent 系统中,如果核心 LLM 的输出出现偏差,整个任务链就会失败。Multi-Agent 系统可以通过多人投票、交叉验证、冗余执行等机制来提高系统的可靠性。即使某个 Agent 出错,其他 Agent 也能发现并纠正。

可扩展性:当业务增长、任务复杂度增加时,单 Agent 系统往往需要全面重构。Multi-Agent 系统则可以水平扩展——只需添加新的 Agent 角色,而无需修改现有 Agent 的逻辑。

图表加载中…
python
# 单 Agent vs Multi-Agent 的代码结构对比

# ====== 单 Agent 模式 ======
class SingleAgent:
    """一个 Agent 干所有事"""
    def __init__(self, llm):
        self.llm = llm
    
    def handle_task(self, task: str) -> str:
        # 同一个 LLM 处理搜索、编码、审查...
        # prompt 里要包含所有领域的指令
        return self.llm(f"你是全能助手,请完成:{task}")

# ====== Multi-Agent 模式 ======
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class AgentMessage:
    """Agent 之间传递的消息"""
    sender: str
    content: str
    message_type: str  # "request", "response", "feedback"
    metadata: dict = None

class AgentRole:
    """定义 Agent 的角色"""
    def __init__(self, name: str, system_prompt: str, llm):
        self.name = name
        self.system_prompt = system_prompt
        self.llm = llm
        self.message_history: List[AgentMessage] = []
    
    def process(self, message: AgentMessage) -> AgentMessage:
        prompt = f"{self.system_prompt}\n\n收到消息:{message.content}"
        response = self.llm(prompt)
        self.message_history.append(message)
        return AgentMessage(
            sender=self.name, content=response,
            message_type="response", metadata={"source_role": self.name}
        )

# 创建专业 Agent 团队
researcher = AgentRole("Researcher", 
    "你是一个专业研究员,负责收集和分析信息。", llm)
coder = AgentRole("Coder",
    "你是一个资深工程师,负责编写高质量代码。", llm)
reviewer = AgentRole("Reviewer",
    "你是一个严格的代码审查员,负责发现潜在问题。", llm)
python
# Multi-Agent 协作管道(Pipeline)
from typing import Callable, List

class AgentPipeline:
    """将多个 Agent 串联成处理管道"""
    def __init__(self):
        self.stages: List[AgentRole] = []
    
    def add_stage(self, agent: AgentRole):
        self.stages.append(agent)
        return self
    
    def execute(self, initial_input: str) -> str:
        current_input = initial_input
        for stage in self.stages:
            msg = AgentMessage(
                sender="Pipeline", content=current_input,
                message_type="request"
            )
            response = stage.process(msg)
            current_input = response.content
            print(f"[{stage.name}] -> {current_input[:80]}...")
        return current_input

# 构建处理管道
pipeline = (AgentPipeline()
    .add_stage(researcher)    # 先研究
    .add_stage(coder)         # 再编码
    .add_stage(reviewer)      # 最后审查
)

result = pipeline.execute("实现一个支持并发请求的 HTTP 客户端")
维度单 AgentMulti-Agent适用场景

任务复杂度

中低

单 Agent 适合线性任务,Multi-Agent 适合网状任务

上下文利用

集中式,可能拥挤

分布式,各自专注

Multi-Agent 减少上下文竞争

容错能力

低(单点故障)

高(交叉验证)

关键任务推荐 Multi-Agent

开发成本

中高

快速验证用单 Agent

可扩展性

垂直(更强的模型)

水平(更多 Agent)

长期项目推荐 Multi-Agent

💡 一句话理解

何时选择 Multi-Agent:当你的任务需要多个不同领域的专业知识、或者需要交叉验证来提高可靠性时,Multi-Agent 是更好的选择。但如果任务简单明确,单 Agent 更直接高效。

⚠️ 常见踩坑

多 Agent 系统的隐性成本容易被低估:每个 Agent 调用 LLM 都会消耗 token,一个 4-Agent 协作流程的 token 消耗可能是单 Agent 的 5-8 倍(含消息传递、中间结果传递)。在投入生产前,务必进行成本估算和预算控制。

2通信协议:Agent 之间的对话机制

Multi-Agent 系统的核心挑战之一是:Agent 之间如何高效、准确地通信?这不仅仅是发消息这么简单,而是需要设计一套完整的协议。

直接通信 vs 间接通信:直接通信是两个 Agent 之间点对点地交换消息(A 发消息给 B),这种方式延迟低、语义明确,但耦合度高——每个 Agent 需要知道其他 Agent 的存在和能力。间接通信则是通过共享媒介(黑板模式、消息队列、共享数据库)进行交互,Agent 不需要知道消息的接收者是谁,只需将信息发布到共享空间。这种方式解耦了 Agent,但增加了协调的复杂性。

消息格式设计:良好的消息格式应该包含:消息类型(请求/响应/通知/错误)、发送者标识、内容载荷、时间戳、优先级、以及可选的元数据。JSON 是最常用的格式,但在高性能场景下,Protocol Buffers 或 MessagePack 可能更合适。

同步 vs 异步通信:同步通信要求发送者等待接收者的响应(类似函数调用),实现简单但容易阻塞。异步通信允许发送者继续执行其他任务,通过回调或事件总线接收响应(类似事件驱动),更适合大规模分布式系统。

图表加载中…
python
# 基于发布-订阅模式的 Agent 消息总线
import asyncio
from typing import Callable, Dict, List, Any
import json
import time

class MessageBus:
    """异步消息总线,支持发布-订阅模式"""
    def __init__(self):
        self._subscribers: Dict[str, List[Callable]] = {}
        self._message_log: List[dict] = []
    
    def subscribe(self, topic: str, callback: Callable):
        """订阅某个话题"""
        if topic not in self._subscribers:
            self._subscribers[topic] = []
        self._subscribers[topic].append(callback)
    
    async def publish(self, topic: str, data: Any, sender: str = "system"):
        """发布消息到话题"""
        message = {
            "topic": topic, "data": data,
            "sender": sender, "timestamp": time.time(),
            "msg_id": f"{topic}_{len(self._message_log)}",
        }
        self._message_log.append(message)
        print(f"[BUS] 发布 [{topic}] 来自 {sender}")
        tasks = []
        for callback in self._subscribers.get(topic, []):
            tasks.append(asyncio.create_task(callback(message)))
        if tasks:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for r in results:
                if isinstance(r, Exception):
                    print(f"  [BUS] 回调异常: {r}")
        return message
    
    def get_history(self, topic: str = None) -> List[dict]:
        if topic:
            return [m for m in self._message_log if m["topic"] == topic]
        return self._message_log

# 使用示例
bus = MessageBus()

async def on_research_result(msg):
    print(f"  [Coder] 收到研究结果: {msg['data'][:50]}...")

async def on_code_complete(msg):
    print(f"  [Reviewer] 收到代码: {msg['data'][:50]}...")

bus.subscribe("research.done", on_research_result)
bus.subscribe("code.done", on_code_complete)
python
# Agent 之间的结构化消息协议
from enum import Enum
from typing import Optional, Dict, Any
from dataclasses import dataclass, asdict
import json

class MessageType(Enum):
    TASK_ASSIGN = "task_assign"       # 分配任务
    TASK_COMPLETE = "task_complete"   # 任务完成
    REQUEST_INFO = "request_info"     # 请求信息
    PROVIDE_INFO = "provide_info"     # 提供信息
    ERROR = "error"                   # 错误报告
    FEEDBACK = "feedback"             # 反馈/评价
    VOTE = "vote"                     # 投票

@dataclass
class ProtocolMessage:
    """标准化的 Agent 间通信协议"""
    msg_type: MessageType
    sender: str
    receiver: str  # "*" 表示广播
    content: str
    payload: Dict[str, Any] = None
    priority: int = 5  # 1-10,10 最高
    thread_id: Optional[str] = None  # 关联同一对话线程
    reply_to: Optional[str] = None   # 回复哪条消息
    
    def to_json(self) -> str:
        d = asdict(self)
        d["msg_type"] = self.msg_type.value
        return json.dumps(d, ensure_ascii=False)
    
    @classmethod
    def from_json(cls, json_str: str) -> "ProtocolMessage":
        d = json.loads(json_str)
        d["msg_type"] = MessageType(d["msg_type"])
        return cls(d)

# 示例:Task Manager 分配任务
assign_msg = ProtocolMessage(
    msg_type=MessageType.TASK_ASSIGN,
    sender="Manager",
    receiver="Researcher",
    content="请调研 Python 异步 HTTP 库的优缺点",
    payload={
        "task_id": "T-001",
        "deadline": "5min",
        "expected_format": "对比表格",
    },
    priority=7,
)
print(assign_msg.to_json())
python
# 黑板(Blackboard)模式的实现
class Blackboard:
    """共享黑板:Agent 们通过读写黑板进行间接通信"""
    def __init__(self):
        self._board: Dict[str, Any] = {}
        self._lock = asyncio.Lock()
        self._observers: Dict[str, List[Callable]] = {}
    
    async def write(self, key: str, value: Any, author: str):
        async with self._lock:
            old_value = self._board.get(key)
            self._board[key] = {
                "value": value, "author": author,
                "updated_at": time.time(), "version": 1
            }
            if old_value:
                self._board[key]["version"] = old_value.get("version", 0) + 1
            for callback in self._observers.get(key, []):
                await callback(key, value, author)
    
    def read(self, key: str) -> Optional[dict]:
        return self._board.get(key)
    
    def observe(self, key: str, callback: Callable):
        if key not in self._observers:
            self._observers[key] = []
        self._observers[key].append(callback)
    
    def snapshot(self) -> Dict[str, dict]:
        return {k: v for k, v in self._board.items()}

# Agent 通过黑板协作的示例
blackboard = Blackboard()

async def researcher_work():
    await blackboard.write("research_result", 
        {"frameworks": ["aiohttp", "httpx", "FastAPI"], "recommendation": "httpx"},
        "Researcher")

async def coder_work():
    result = blackboard.read("research_result")
    if result:
        print(f"基于 {result['value']['recommendation']} 进行编码...")
        await blackboard.write("code_draft", 
            "import httpx; async def fetch(url): ...", "Coder")
通信模式实现方式优势劣势适用场景

点对点直连

Agent 直接调用

延迟最低、语义清晰

耦合度高、扩展困难

固定 2-3 个 Agent 的小型系统

消息总线

Redis Pub/Sub、Kafka

解耦、支持广播

需要额外基础设施

中等规模、动态 Agent 数量

黑板模式

共享数据结构/数据库

完全解耦、灵活

协调复杂、可能冲突

高度动态、Agent 角色不确定

RPC/HTTP

REST/gRPC 调用

标准化、跨语言

网络延迟、依赖管理

分布式部署、跨服务通信

💡 一句话理解

消息总线设计实用建议:从简单开始——先用点对点直连验证核心逻辑,确认流程可行后再引入消息总线;定义清晰的话题命名规范——例如 "domain.action.status"(如 "code.review.passed"),避免话题爆炸导致订阅混乱;为每个话题设置 TTL(过期时间),防止僵尸消息堆积。

⚠️ 常见踩坑

通信协议设计的关键陷阱:① 消息爆炸——Agent 之间产生大量不必要的消息,消耗资源并引入噪声。缓解方案:消息优先级过滤、话题隔离。② 循环依赖——Agent A 等待 Agent B 的输出,而 Agent B 又需要 Agent A 的结果。缓解方案:DAG 依赖管理、超时机制。③ 消息丢失——异步通信中消息可能因为异常而丢失。缓解方案:消息持久化、确认机制 ACK、重试策略。

3角色分配:如何让每个 Agent 各司其职

Multi-Agent 系统中,角色(Role)定义了每个 Agent 的职责、能力和行为边界。好的角色设计是系统成功的基石。

角色设计的原则:首先,角色应该是正交的——每个角色有明确的职责范围,尽量避免重叠。重叠的角色不仅浪费计算资源,还可能导致重复工作或冲突。其次,角色的粒度要适中:太粗(一个角色承担太多职责)会退化为单 Agent;太细(每个微小功能一个角色)会导致通信开销爆炸。经验法则是:角色数量等于任务领域中需要专业知识的独立子领域数量。

系统 Prompt 工程:角色的行为和输出格式主要通过系统 Prompt 定义。一个完整的角色 Prompt 应该包括:角色名称和描述、专业领域和知识范围、行为约束和规则、输出格式要求、以及与其他角色交互的方式。例如,Reviewer 角色的 Prompt 不仅要告诉它是代码审查员,还要指定审查的维度(安全性、性能、可读性、测试覆盖率)和输出格式(问题列表加严重等级加修复建议)。

动态角色分配:在一些更高级的系统中,角色不是静态分配的,而是根据任务需求动态创建和调整。例如,当任务中出现新的子领域时,系统可以实例化一个新的专家 Agent;当某个角色完成工作后,可以将其资源释放或重新分配。

图表加载中…
python
# 角色工厂:动态创建不同类型的 Agent
from enum import Enum
from typing import Dict, Callable, Optional

class RoleType(Enum):
    MANAGER = "manager"
    RESEARCHER = "researcher"
    CODER = "coder"
    REVIEWER = "reviewer"
    TESTER = "tester"

# 角色 Prompt 模板库
ROLE_PROMPTS: Dict[RoleType, str] = {
    RoleType.MANAGER: """你是项目 Manager,负责分解复杂任务、分配给合适的专家、协调进度并整合结果。
你的核心职责:
1. 分析用户需求的复杂度和涉及的领域
2. 将任务拆分为可独立执行的子任务
3. 根据子任务类型选择合适的专家角色
4. 监控各子任务的进度,处理阻塞和冲突
5. 整合所有子任务的结果,生成最终输出

决策原则:宁可多分配专家,也不让一个角色承担过多职责。""",

    RoleType.RESEARCHER: """你是 Researcher,负责信息收集、技术调研和分析对比。
你的工作方式:
1. 明确调研的目标和范围
2. 从多个来源收集信息(文档、最佳实践、案例)
3. 对比分析不同方案的优缺点
4. 给出基于证据的推荐意见

输出格式:使用结构化的对比表格,列出方案、优势、劣势、适用场景。""",

    RoleType.CODER: """你是 Coder,一位拥有 10 年经验的资深软件工程师。
你的编码原则:
1. 代码必须清晰、可读、可维护
2. 遵循 SOLID 原则和设计模式
3. 包含必要的类型注解和文档字符串
4. 考虑边界条件和错误处理
5. 优先使用标准库和成熟框架

输出要求:完整的可运行代码,包含注释和示例用法。""",

    RoleType.REVIEWER: """你是 Reviewer,一位经验丰富的技术负责人,负责代码审查。
审查维度:
1. 安全性:是否存在注入、溢出、权限绕过等漏洞
2. 性能:是否有不必要的计算、内存泄漏、N+1 查询
3. 可读性:命名是否清晰、结构是否合理、注释是否充分
4. 可维护性:是否遵循 DRY 原则、是否易于扩展和修改
5. 测试覆盖:关键逻辑是否有对应的测试

输出格式:按严重程度列出问题(Critical/Major/Minor),每个问题附带修复建议。""",
}

class RoleFactory:
    """角色工厂:根据角色类型创建配置好的 Agent"""
    @staticmethod
    def create(role_type: RoleType, llm: Callable, kwargs) -> AgentRole:
        prompt = ROLE_PROMPTS.get(role_type)
        if not prompt:
            raise ValueError(f"未知角色类型: {role_type}")
        custom_prompt = kwargs.get("custom_prompt", "")
        if custom_prompt:
            prompt = f"{prompt}\n\n额外要求:{custom_prompt}"
        return AgentRole(role_type.value, prompt, llm)
python
# 动态角色分配算法
from collections import defaultdict

class DynamicRoleAssigner:
    """根据任务特征动态分配 Agent 角色"""
    
    def __init__(self):
        # 角色能力矩阵:角色 -> 擅长的任务类型
        self.role_capabilities = {
            RoleType.RESEARCHER: ["调研", "分析", "对比", "搜索", "总结"],
            RoleType.CODER: ["编码", "实现", "重构", "调试", "API"],
            RoleType.REVIEWER: ["审查", "评估", "安全", "性能", "优化"],
            RoleType.TESTER: ["测试", "验证", "覆盖率", "边界", "集成测试"],
            RoleType.MANAGER: ["协调", "规划", "整合", "分解"],
        }
    
    def assign_roles(self, task_description: str) -> Dict[str, list]:
        """分析任务描述,分配需要的角色"""
        words = set(task_description)
        
        # 计算每个角色与任务的匹配度
        role_scores = {}
        for role, keywords in self.role_capabilities.items():
            score = sum(1 for kw in keywords if any(kw in w for w in words))
            if score > 0:
                role_scores[role.value] = score
        
        # 总是需要 Manager
        role_scores.setdefault("manager", 1)
        
        # 按匹配度排序,选择匹配度 > 0 的角色
        assigned = {role: [] for role, score in role_scores.items() if score > 0}
        return assigned

# 使用示例
assigner = DynamicRoleAssigner()
task = "调研 Python 异步编程框架,对比 aiohttp 和 httpx,编写封装库并编写单元测试"
roles = assigner.assign_roles(task)
print(f"需要 {len(roles)} 个角色: {list(roles.keys())}")
角色职责关键能力输出常见 Prompt 关键词

Manager

任务分解、协调、决策

规划能力、全局视角

任务分配方案、最终整合

你是项目经理,负责分解任务并协调团队

Researcher

信息收集、分析、总结

搜索、归纳、对比

研究报告、对比分析

你是领域专家,负责调研和分析

Coder

代码编写、实现

编程、架构设计

可运行代码、注释

你是资深工程师,负责编写高质量代码

Reviewer

代码审查、质量评估

批判性思维、安全性意识

审查报告、修改建议

你是严格的代码审查员,关注安全、性能和可维护性

Tester

测试设计、执行验证

测试方法论、边界思维

测试用例、测试报告

你是 QA 工程师,负责设计全面的测试

💡 一句话理解

角色设计的实用技巧:先定义最小可行团队(通常 3-4 个角色足以覆盖大多数场景);给每个角色写一个自我介绍,如果两个角色的介绍有超过 30% 的重合度,考虑合并或重新划分;为每个角色定义清晰的交接点(什么条件下把控制权交给下一个角色)。

⚠️ 常见踩坑

角色膨胀是 Multi-Agent 项目最常见的失败模式之一。当每个子问题都催生一个新角色时,系统很快会变得难以维护。经验法则是:角色数量不超过 7 个,如果超过这个数,重新审视角色粒度——很多「角色」其实只是工具调用,不值得独立成 Agent。

4任务协调:从分解到交付的完整生命周期

任务协调是 Multi-Agent 系统的中枢神经系统。它负责将用户的宏观目标拆解为具体的子任务,分配给合适的 Agent,监控执行进度,处理异常情况,最终整合结果。

任务分解Task Decomposition):这是协调的第一步,也是最关键的一步。好的分解应该满足三个标准:独立性、完整性、可验证性。常用的分解策略包括:按功能领域分解(搜索、编码、审查)、按数据流分解(输入处理、核心计算、输出生成)、按依赖关系分解(DAG 拓扑排序)。

执行调度:分解完成后,需要决定子任务的执行顺序。如果子任务之间没有依赖关系,可以并行执行以节省时间。如果存在依赖(例如:编码需要研究结果),则需要按拓扑顺序执行。调度器需要动态跟踪每个子任务的状态(待执行、执行中、已完成、失败),并在所有前置任务完成后启动依赖任务。

结果整合:当所有子任务完成后,Manager Agent 需要将结果整合为最终输出。整合不仅仅是简单的拼接——可能需要消除冗余、解决不一致、格式化输出、以及在必要时要求某些子任务重新执行。

图表加载中…
python
# 基于 DAG 的任务调度器
from collections import defaultdict, deque
from typing import Dict, List, Set, Callable, Any
import asyncio

class TaskNode:
    """DAG 中的任务节点"""
    def __init__(self, task_id: str, description: str, 
                 executor: Callable, depends_on: List[str] = None):
        self.task_id = task_id
        self.description = description
        self.executor = executor
        self.depends_on = depends_on or []
        self.result: Any = None
        self.status = "pending"  # pending, running, done, failed

class DAGScheduler:
    """基于有向无环图的任务调度器"""
    def __init__(self):
        self.tasks: Dict[str, TaskNode] = {}
        self._adjacency: Dict[str, List[str]] = defaultdict(list)
        self._in_degree: Dict[str, int] = defaultdict(int)
    
    def add_task(self, task: TaskNode):
        self.tasks[task.task_id] = task
        for dep in task.depends_on:
            self._adjacency[dep].append(task.task_id)
            self._in_degree[task.task_id] += 1
        self._in_degree.setdefault(task.task_id, 0)
    
    def get_ready_tasks(self) -> List[str]:
        """获取所有可以执行的任务(依赖已满足)"""
        return [tid for tid, node in self.tasks.items()
                if node.status == "pending" and self._in_degree.get(tid, 0) == 0]
    
    async def execute(self) -> Dict[str, Any]:
        """执行所有任务(BFS 拓扑排序)"""
        ready = self.get_ready_tasks()
        while ready:
            batch_tasks = []
            for tid in ready:
                task = self.tasks[tid]
                dep_results = {dep: self.tasks[dep].result 
                              for dep in task.depends_on}
                batch_tasks.append(self._run_task(task, dep_results))
            
            results = await asyncio.gather(*batch_tasks, return_exceptions=True)
            
            for tid, result in zip(ready, results):
                if isinstance(result, Exception):
                    self.tasks[tid].status = "failed"
                    self.tasks[tid].result = f"Error: {result}"
                    print(f"  任务 {tid} 失败: {result}")
                else:
                    self.tasks[tid].status = "done"
                    self.tasks[tid].result = result
                    for successor in self._adjacency.get(tid, []):
                        self._in_degree[successor] -= 1
            
            ready = self.get_ready_tasks()
        
        pending = [t for t in self.tasks.values() if t.status == "pending"]
        if pending:
            print(f"  检测到循环依赖: {[t.task_id for t in pending]}")
        
        return {tid: t.result for tid, t in self.tasks.items()}
    
    async def _run_task(self, task: TaskNode, dep_results: dict) -> Any:
        task.status = "running"
        print(f"  执行任务: {task.description}")
        return await task.executor(dep_results) if asyncio.iscoroutinefunction(task.executor) else task.executor(dep_results)
python
# 使用 DAG 调度器构建 Multi-Agent 任务流
async def build_article_pipeline(llm):
    """构建一个文章生成的 DAG 任务流"""
    scheduler = DAGScheduler()
    
    # T1: 研究(无依赖)
    async def research_task(deps):
        researcher = AgentRole("Researcher", ROLE_PROMPTS[RoleType.RESEARCHER], llm)
        msg = AgentMessage("Scheduler", "调研 2026 年 AI Agent 最新进展", "request")
        return researcher.process(msg).content
    scheduler.add_task(TaskNode("research", "调研", research_task))
    
    # T2: 大纲(依赖研究)
    async def outline_task(deps):
        result = deps.get("research", "")
        msg = AgentMessage("Scheduler", f"基于以下研究写文章大纲:\n{result[:200]}", "request")
        manager = AgentRole("Manager", ROLE_PROMPTS[RoleType.MANAGER], llm)
        return manager.process(msg).content
    scheduler.add_task(TaskNode("outline", "大纲", outline_task, ["research"]))
    
    # T3: 正文(依赖大纲)
    async def body_task(deps):
        outline = deps.get("outline", "")
        msg = AgentMessage("Scheduler", f"根据以下大纲撰写正文:\n{outline[:200]}", "request")
        coder = AgentRole("Coder", ROLE_PROMPTS[RoleType.CODER], llm)
        return coder.process(msg).content
    scheduler.add_task(TaskNode("body", "正文", body_task, ["outline"]))
    
    # T4: 审查(依赖正文)
    async def review_task(deps):
        body = deps.get("body", "")
        msg = AgentMessage("Scheduler", f"审查以下文章:\n{body[:300]}", "request")
        reviewer = AgentRole("Reviewer", ROLE_PROMPTS[RoleType.REVIEWER], llm)
        return reviewer.process(msg).content
    scheduler.add_task(TaskNode("review", "审查", review_task, ["body"]))
    
    results = await scheduler.execute()
    return results

# 执行
# results = asyncio.run(build_article_pipeline(some_llm))
# print(results)  # {'research': ..., 'outline': ..., 'body': ..., 'review': ...}
调度策略并行度复杂度适用场景示例

完全并行

最高

子任务完全独立

同时搜索多个关键词

流水线(Pipeline)

线性依赖链

研究 -> 编码 -> 审查

DAG 调度

复杂依赖关系

多输入多输出的数据处理

动态优先级

可变

不确定执行时间

根据 Agent 负载动态调整

工作窃取(Work Stealing)

最高

任务量不均衡

空闲 Agent 帮忙碌 Agent

💡 一句话理解

任务编排实战技巧:为每个子任务设置独立的超时限制——如果一个 Agent 超时,不要阻塞整个流程,而是标记该任务失败并继续执行其他独立任务;使用可视化 DAG 来调试复杂的依赖关系——LangGraphRuflo 都提供了可视化工具,让你直观看到任务执行路径和瓶颈。

⚠️ 常见踩坑

任务协调的常见陷阱:① 过度分解——把任务拆得太细,导致 Agent 之间通信开销超过计算收益。经验法则是每个子任务应该至少需要 30 秒以上的计算量。② 隐藏依赖——某些子任务看似独立,实际共享隐式依赖(如同一个外部 API)。调度器需要识别这类依赖,避免并发请求导致限流或数据不一致。③ 结果爆炸——多个 Agent 的输出加起来远超过原始任务的合理规模。需要在整合阶段进行摘要和裁剪。

5冲突解决:当 Agent 们意见不合时

Multi-Agent 系统中,冲突是不可避免的。不同的 Agent 可能对同一问题给出不同答案,可能对任务的执行方式有分歧,甚至可能在资源使用上产生竞争。有效的冲突解决机制是系统可靠性的关键保障。

冲突的类型:首先是内容冲突——例如 Researcher 推荐方案 A,Coder 认为方案 B 更好。这类冲突本质上是哪个方案更优的判断问题。其次是资源冲突——多个 Agent 同时请求同一个受限资源(如 API 密钥的速率限制、同一数据库的写入锁)。第三是流程冲突——Manager 要求先做 X 再做 Y,但某个 Agent 认为先做 Y 更高效。

解决策略:对于内容冲突,最常用的方法是投票机制(多个 Agent 各自给出判断,取多数意见)和仲裁机制(由一个更高权限的 Agent 做出最终决定)。对于资源冲突,需要实现资源管理器和锁机制。对于流程冲突,Manager 应该保留最终决策权,但允许 Agent 提出异议(类似人类团队中的强烈反对但服从模式)。

冲突记录与学习:一个好的 Multi-Agent 系统不仅解决冲突,还会记录冲突的历史。通过分析过去的冲突模式,系统可以优化角色定义、改进任务分解、甚至自动调整 Agent 的行为策略。

图表加载中…
python
# 加权投票冲突解决机制
from typing import List, Dict, Any
from collections import Counter

class WeightedVoter:
    """基于权重的多 Agent 投票系统"""
    def __init__(self):
        self._weights: Dict[str, float] = {}
        self._votes: Dict[str, Any] = {}
    
    def set_weight(self, agent_name: str, weight: float):
        """设置 Agent 的投票权重"""
        self._weights[agent_name] = max(0.0, weight)
    
    def cast_vote(self, agent_name: str, vote: Any, confidence: float = 1.0):
        """Agent 投票"""
        weight = self._weights.get(agent_name, 1.0)
        effective_weight = weight * confidence
        self._votes[agent_name] = {"vote": vote, "weight": effective_weight}
    
    def tally(self) -> Dict[str, Any]:
        """统计投票结果"""
        score_map: Dict[str, float] = {}
        vote_map: Dict[str, str] = {}
        for agent, data in self._votes.items():
            vote_key = str(data["vote"])
            vote_map[vote_key] = data["vote"]
            score_map[vote_key] = score_map.get(vote_key, 0) + data["weight"]
        
        if not score_map:
            return {"winner": None, "scores": {}, "total_votes": 0}
        
        winner_key = max(score_map, key=score_map.get)
        total_weight = sum(score_map.values())
        
        return {
            "winner": vote_map[winner_key],
            "scores": {k: round(v / total_weight, 3) for k, v in score_map.items()},
            "total_votes": len(self._votes),
            "consensus": max(score_map.values()) / total_weight > 0.7,
        }

# 使用示例:三个 Agent 对技术方案投票
voter = WeightedVoter()
voter.set_weight("Researcher", 0.3)
voter.set_weight("Coder", 0.5)
voter.set_weight("Reviewer", 0.4)

voter.cast_vote("Researcher", "方案A", 0.8)
voter.cast_vote("Coder", "方案B", 0.9)
voter.cast_vote("Reviewer", "方案A", 0.7)

result = voter.tally()
print(f"胜出: {result['winner']}, 共识: {result['consensus']}")
python
# 辩论式冲突解决(Debate-based Resolution)
class DebateModerator:
    """辩论主持人:引导 Agent 进行结构化辩论"""
    def __init__(self, llm, max_rounds: int = 3):
        self.llm = llm
        self.max_rounds = max_rounds
        self.transcript: List[Dict] = []
    
    def moderate(self, topic: str, positions: Dict[str, str]) -> str:
        """主持辩论
        topic: 争议话题
        positions: {agent_name: 立场}
        """
        agent_names = list(positions.keys())
        print(f"辩论开始:{topic}")
        print(f"   参与方: {', '.join(agent_names)}")
        
        for round_num in range(self.max_rounds):
            print(f"\n--- 第 {round_num + 1} 轮 ---")
            for agent_name in agent_names:
                history = "\n".join(
                    f"[{t['agent']}]: {t['argument'][:100]}" 
                    for t in self.transcript[-4:]
                )
                prompt = f"""辩论话题:{topic}
你的立场:{positions[agent_name]}
之前的辩论记录:
{history}

请用 3-5 句话陈述你的观点。如果是后续轮次,请回应其他参与方的论点。"""
                argument = self.llm(prompt)
                self.transcript.append({
                    "agent": agent_name, "round": round_num + 1,
                    "argument": argument
                })
                print(f"  [{agent_name}]: {argument[:80]}...")
        
        full_transcript = "\n".join(
            f"[{t['agent']} R{t['round']}]: {t['argument']}" 
            for t in self.transcript
        )
        verdict_prompt = f"""作为公正的裁判,请根据以下辩论内容做出裁决。
辩论话题:{topic}
{full_transcript}

请给出裁决结果,说明哪一方的论点更有说服力,为什么。"""
        return self.llm(verdict_prompt)
python
# 冲突日志与学习系统
import json
from datetime import datetime

class ConflictLogger:
    """记录和分析 Agent 之间的冲突"""
    def __init__(self, log_file: str = "conflicts.json"):
        self.log_file = log_file
        self.conflicts = self._load()
    
    def _load(self) -> List[dict]:
        try:
            with open(self.log_file) as f:
                return json.load(f)
        except FileNotFoundError:
            return []
    
    def log_conflict(self, conflict_type: str, agents: List[str],
                     descriptions: Dict[str, str], resolution: str,
                     winner: str = None):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "type": conflict_type,
            "agents": agents,
            "positions": descriptions,
            "resolution": resolution,
            "winner": winner,
        }
        self.conflicts.append(entry)
        self._save()
    
    def _save(self):
        with open(self.log_file, 'w') as f:
            json.dump(self.conflicts, f, ensure_ascii=False, indent=2)
    
    def get_patterns(self) -> Dict[str, Any]:
        """分析冲突模式"""
        type_counts = Counter(c["type"] for c in self.conflicts)
        agent_conflict_count = Counter()
        for c in self.conflicts:
            for agent in c["agents"]:
                agent_conflict_count[agent] += 1
        
        return {
            "total_conflicts": len(self.conflicts),
            "by_type": dict(type_counts),
            "most_conflicted_agents": dict(agent_conflict_count.most_common(3)),
            "recent_trend": "increasing" if len(self.conflicts) > 5 
                           and self.conflicts[-3:] else "stable",
        }

# 使用
logger = ConflictLogger()
logger.log_conflict(
    conflict_type="content_disagreement",
    agents=["Researcher", "Coder"],
    descriptions={
        "Researcher": "推荐使用 httpx,性能更好",
        "Coder": "推荐使用 aiohttp,生态更成熟"
    },
    resolution="加权投票,Coder 权重更高(实际开发经验)",
    winner="Coder"
)
print(json.dumps(logger.get_patterns(), indent=2, ensure_ascii=False))
策略原理优势劣势适用场景

多数投票

多个 Agent 各自判断,取多数

简单公平、容错

可能出现平票、成本高

客观判断、分类问题

权重投票

按 Agent 专业度加权投票

尊重专业意见

权重设定主观

不同专业度的 Agent

仲裁者模式

指定一个 Manager 做最终决定

决策快速、责任明确

仲裁者可能判断失误

时间敏感、需要快速决策

辩论模式

Agent 互相辩论,达成共识

充分讨论、结果高质量

耗时长、可能无法达成共识

重要决策、需要深度分析

置信度比较

每个 Agent 给出答案加置信度,选最高

量化不确定性

LLM 的置信度不可靠

概率判断、风险评估

💡 一句话理解

冲突解决的实用建议:预防胜于治疗——在角色定义阶段就明确边界条件,减少模糊地带;建立升级路径——低级别冲突由 Agent 自行解决,中级别引入投票,高级别交给 Manager 仲裁;允许建设性冲突——不是所有分歧都是坏事,适度的观点碰撞往往能产生更好的结果。

⚠️ 常见踩坑

辩论式冲突解决虽然能产生高质量结果,但代价是极高的 token 消耗和延迟。一次 3 轮、3 方参与的辩论可能消耗超过单 Agent 方案 10 倍的 token。只在关键决策(如架构选型、安全审查)时使用辩论,常规冲突用投票或仲裁更高效。

6典型 Multi-Agent 架构深入解析

当前主流的 Multi-Agent 框架各有不同的设计哲学和架构风格。理解它们的底层架构,能帮助你在选择框架时做出更明智的决策。

CrewAI 的角色-任务-流程架构:CrewAI 采用最直观的设计——你定义 Role(角色)、Task(任务)、Process(流程),然后框架自动管理 Agent 之间的协作。它的 Process 支持 Sequential(顺序执行)和 Hierarchical(层级管理)两种模式。CrewAI 的优势在于极简的 API 设计,但灵活性和自定义能力相对有限。

AutoGen 的对话驱动架构:AutoGen 的核心思想是:Agent 之间的协作本质上是对话。每个 Agent 可以与其他 Agent 进行多轮对话,通过对话来交换信息、协商方案、解决问题。AutoGen 支持非常灵活的多 Agent 拓扑结构:可以是星形(中心 Agent 协调)、环形(Agent 链式传递)、全连接(任何 Agent 之间直接通信)。

ReAct 架构的多 Agent 扩展:虽然 ReAct 最初是为单 Agent 设计的(Thought 到 Action 到 Observation),但在 Multi-Agent 场景下,每个 Agent 都可以独立执行 ReAct 循环,同时通过共享的观察空间进行协作。这种架构的优势是每个 Agent 都保持自主性,同时又能感知到全局状态。

图表加载中…
python
# CrewAI 风格的架构实现(简化版)
from typing import List, Optional
from dataclasses import dataclass, field

@dataclass
class Task:
    description: str
    agent: Optional[str] = None
    expected_output: str = ""
    is_complete: bool = False
    result: str = ""

@dataclass
class Crew:
    """CrewAI 风格的团队编排"""
    agents: Dict[str, AgentRole] = field(default_factory=dict)
    tasks: List[Task] = field(default_factory=list)
    process: str = "sequential"  # sequential | hierarchical
    
    def add_agent(self, agent: AgentRole):
        self.agents[agent.name] = agent
    
    def add_task(self, task: Task):
        self.tasks.append(task)
    
    def kickoff(self) -> str:
        """启动团队执行任务"""
        if self.process == "sequential":
            return self._sequential_process()
        elif self.process == "hierarchical":
            return self._hierarchical_process()
        else:
            raise ValueError(f"未知流程: {self.process}")
    
    def _sequential_process(self) -> str:
        """顺序执行:按任务列表依次执行"""
        context = ""
        for task in self.tasks:
            if task.agent and task.agent in self.agents:
                agent = self.agents[task.agent]
                input_text = f"{task.description}\n\n上下文信息:\n{context}" if context else task.description
                msg = AgentMessage("Crew", input_text, "request")
                result = agent.process(msg).content
                task.result = result
                task.is_complete = True
                context += f"\n[{task.agent} 的结果]: {result[:200]}"
                print(f"  完成: {task.description[:40]}...")
        return context
    
    def _hierarchical_process(self) -> str:
        """层级执行:Manager Agent 协调所有任务"""
        manager = self.agents.get("manager")
        if not manager:
            return self._sequential_process()
        
        task_summary = "\n".join(
            f"- [{t.agent}] {t.description}" for t in self.tasks
        )
        msg = AgentMessage("Crew", f"请协调完成以下任务:\n{task_summary}", "request")
        return manager.process(msg).content
python
# AutoGen 风格的对话驱动架构
class ConversationalAgent:
    """AutoGen 风格的对话式 Agent"""
    def __init__(self, name: str, system_prompt: str, llm, 
                 can_terminate: bool = False, can_ask_human: bool = False):
        self.name = name
        self.system_prompt = system_prompt
        self.llm = llm
        self.can_terminate = can_terminate
        self.can_ask_human = can_ask_human
        self.chat_history: List[Dict] = []
    
    def receive(self, message: str, sender: str) -> str:
        """接收消息并回复"""
        self.chat_history.append({"role": "user", "content": message, "sender": sender})
        
        context = "\n".join(
            f"[{m['sender']}]: {m['content']}" for m in self.chat_history[-10:]
        )
        prompt = f"{self.system_prompt}\n\n对话历史:\n{context}\n\n请回复:"
        
        reply = self.llm(prompt)
        self.chat_history.append({"role": "assistant", "content": reply, "sender": self.name})
        return reply
    
    def is_termination(self, message: str) -> bool:
        """判断对话是否应该终止"""
        if not self.can_terminate:
            return False
        termination_signals = ["TERMINATE", "任务完成", "没有问题了", "done"]
        return any(signal in message.upper() for signal in termination_signals)

class GroupChat:
    """AutoGen 风格的群聊"""
    def __init__(self, agents: List[ConversationalAgent], max_rounds: int = 10):
        self.agents = {a.name: a for a in agents}
        self.max_rounds = max_rounds
        self.messages: List[Dict] = []
    
    def run(self, initial_message: str, speaker_order: List[str] = None) -> str:
        """运行群聊"""
        self.messages.append({"sender": "User", "content": initial_message})
        current_speaker_idx = 0
        
        for round_num in range(self.max_rounds):
            if speaker_order:
                speaker_name = speaker_order[current_speaker_idx % len(speaker_order)]
                current_speaker_idx += 1
            else:
                speaker_name = list(self.agents.keys())[round_num % len(self.agents)]
            
            agent = self.agents[speaker_name]
            last_message = self.messages[-1]["content"]
            
            reply = agent.receive(last_message, self.messages[-1]["sender"])
            self.messages.append({"sender": speaker_name, "content": reply})
            print(f"  [{speaker_name}]: {reply[:60]}...")
            
            if agent.is_termination(reply):
                print(f"  {speaker_name} 发起终止")
                break
        
        return self.messages[-1]["content"]
python
# Multi-ReAct 架构:多个 Agent 共享观察空间
class SharedObservationSpace:
    """多个 ReAct Agent 共享的观察空间"""
    def __init__(self):
        self.observations: List[Dict] = []
        self.global_state: Dict[str, Any] = {}
    
    def publish(self, agent_name: str, thought: str, action: str, observation: str):
        """Agent 发布自己的 ReAct 步骤"""
        entry = {
            "agent": agent_name, "thought": thought,
            "action": action, "observation": observation,
            "timestamp": time.time(),
        }
        self.observations.append(entry)
        self.global_state[f"{agent_name}_last_observation"] = observation
    
    def get_shared_context(self, agent_name: str, last_n: int = 5) -> str:
        """获取共享上下文(不包括自己的最新步骤)"""
        others = [o for o in self.observations[-last_n*2:] if o["agent"] != agent_name]
        if not others:
            return "暂无其他 Agent 的观察结果。"
        return "\n".join(
            f"[{o['agent']}] 行动: {o['action']}\n  结果: {o['observation'][:100]}"
            for o in others
        )
    
    def get_global_state(self) -> Dict[str, Any]:
        return dict(self.global_state)

class MultiReactAgent(AgentRole):
    """执行 ReAct 循环的 Agent,可以感知其他 Agent 的观察"""
    def __init__(self, name: str, system_prompt: str, llm, shared_space: SharedObservationSpace):
        super().__init__(name, system_prompt, llm)
        self.shared_space = shared_space
    
    def react_step(self, goal: str, max_steps: int = 5) -> str:
        for step in range(max_steps):
            shared_context = self.shared_space.get_shared_context(self.name)
            
            prompt = f"""{self.system_prompt}

目标: {goal}
其他 Agent 的观察:
{shared_context}

请用 ReAct 格式回复:
Thought: <思考>
Action: <行动>
Observation: <你观察到的结果>"""
            
            response = self.llm(prompt)
            
            thought = self._extract(response, "Thought:")
            action = self._extract(response, "Action:")
            observation = self._extract(response, "Observation:")
            
            self.shared_space.publish(self.name, thought, action, observation)
            
            if "done" in observation.lower() or not action:
                return observation
        
        return "达到最大步数"
    
    def _extract(self, text: str, prefix: str) -> str:
        for line in text.split("\n"):
            if line.startswith(prefix):
                return line[len(prefix):].strip()
        return ""
框架核心理念通信方式编排模式学习曲线生态

CrewAI

角色-任务-流程

共享上下文

Sequential / Hierarchical

快速增长中

AutoGen

对话即协作

Agent 对话

灵活拓扑(星形/环形/全连接)

中等

Microsoft 支持

LangGraph

图结构工作流

状态传递

有向图(循环、分支)

中等

LangChain 生态

OpenAI Swarm

极简协作

Handoff 协议

Agent 之间切换

官方实验性

MetaGPT

软件公司模拟

结构化消息

SOP 标准流程

较陡

学术导向

💡 一句话理解

框架迁移的渐进策略:如果你已经在使用某个框架(如 CrewAI),不需要完全重写。可以先在新功能中试用目标框架(如 LangGraph),积累团队经验,再逐步迁移核心逻辑。同时关注框架之间的互操作性——很多框架都支持标准的 OpenAI API 接口,可以共享底层的 LLM 调用层。

⚠️ 常见踩坑

框架选择的常见误区:追求最新最火——新框架可能功能丰富但不够稳定,生产环境应优先选择经过验证的方案。过度工程化——简单任务不需要复杂框架,如果 3 个 Agent 顺序执行就能解决问题,不要引入完整的消息总线和 DAG 调度器。忽略团队熟悉度——框架的学习曲线直接影响开发效率。如果团队已经熟悉 LangChain,切换到 AutoGen 的隐性成本可能高于框架本身的优势。

7实战案例:构建一个完整的 Multi-Agent 代码评审系统

理论说得再多,不如一个完整案例来得直观。我们将设计一个 Multi-Agent 代码评审系统,模拟真实软件开发流程中的代码评审环节。

系统设计:这个系统包含四个 Agent——Coder、StaticAnalyzer、SecurityReviewer、TechLead,分别对应开发者、静态分析、安全审查与技术负责人四种角色。

工作流程:Coder 提交代码后,StaticAnalyzer 和 SecurityReviewer 并行执行分析(因为它们之间没有依赖关系),两者的结果汇总后交给 TechLead,TechLead 综合所有信息生成最终的评审报告。这个过程完美展示了 Multi-Agent 系统的并行处理和专业分工优势。

技术实现:我们将使用前面章节介绍的通信协议(消息总线)、角色定义(角色工厂)、任务调度(DAG 调度器)来构建完整的系统。这个案例把所有概念串联起来,展示如何从零构建一个实用的 Multi-Agent 应用。

图表加载中…
python
# 静态分析 Agent(模拟 AST 级别的代码分析)
import ast
import re
from typing import List, Dict

class StaticAnalyzerAgent:
    """模拟静态分析的 Agent"""
    def __init__(self, llm):
        self.llm = llm
        self.name = "StaticAnalyzer"
    
    def analyze(self, code: str) -> Dict:
        """执行多维度代码分析"""
        issues = []
        
        # 规则 1: 检查函数长度(超过 50 行的大函数)
        try:
            tree = ast.parse(code)
            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    lines = node.end_lineno - node.lineno + 1 if hasattr(node, 'end_lineno') else 0
                    if lines > 50:
                        issues.append({
                            "line": node.lineno, "type": "complexity",
                            "severity": "warning",
                            "message": f"函数 '{node.name}' 有 {lines} 行,建议拆分为更小的函数"
                        })
        except SyntaxError as e:
            issues.append({"line": e.lineno, "type": "syntax", "severity": "error",
                          "message": f"语法错误: {e.msg}"})
        
        # 规则 2: 检查 TODO/FIXME/HACK 注释
        for i, line in enumerate(code.split("\n"), 1):
            if re.search(r'#.*\b(TODO|FIXME|HACK|XXX)\b', line):
                issues.append({
                    "line": i, "type": "code_smell",
                    "severity": "info",
                    "message": f"发现标记注释: {line.strip()}"
                })
        
        # 规则 3: LLM 辅助语义分析
        if issues:
            llm_prompt = f"""分析以下 Python 代码的质量问题:

代码片段:
{code[:2000]}

已发现的机械检查问题:
{issues}

请补充:命名是否清晰、是否有更好的设计模式、整体代码结构建议"""
            llm_feedback = self.llm(llm_prompt)
        else:
            llm_feedback = "代码结构良好,未发现明显质量问题。"
        
        return {
            "agent": self.name,
            "total_issues": len(issues),
            "issues": issues,
            "llm_feedback": llm_feedback,
            "status": "passed" if not any(i["severity"] == "error" for i in issues) else "failed"
        }
python
# 安全审查 Agent
class SecurityReviewerAgent:
    """安全漏洞扫描 Agent"""
    def __init__(self, llm):
        self.llm = llm
        self.name = "SecurityReviewer"
        # 常见的安全模式
        self.dangerous_patterns = [
            (r'eval\s*\(', "使用 eval() 可能导致代码注入"),
            (r'exec\s*\(', "使用 exec() 可能导致代码注入"),
            (r'os\.system\s*\(', "os.system() 可能导致命令注入"),
            (r'subprocess\.call\s*\(.*shell\s*=\s*True', "shell=True 可能导致命令注入"),
            (r'(?i)password\s*=\s*["\x27][^"\x27]+["\x27]', "硬编码密码"),
            (r'(?i)api[_-]?key\s*=\s*["\x27][^"\x27]+["\x27]', "硬编码 API 密钥"),
            (r'(?i)secret\s*=\s*["\x27][^"\x27]+["\x27]', "硬编码密钥"),
            (r'pickle\.loads?', "pickle 反序列化可能导致远程代码执行"),
            (r'marshal\.loads?', "marshal 反序列化可能不安全"),
        ]
    
    def review(self, code: str) -> Dict:
        vulnerabilities = []
        
        # 规则扫描
        for pattern, message in self.dangerous_patterns:
            for i, line in enumerate(code.split("\n"), 1):
                if re.search(pattern, line):
                    vulnerabilities.append({
                        "line": i, "severity": "critical" if "注入" in message else "high",
                        "type": "security", "description": message,
                        "code": line.strip()
                    })
        
        # LLM 深度安全分析
        llm_prompt = f"""作为安全专家,审查以下代码的安全漏洞:

代码片段:
{code[:2000]}

请检查:
1. 输入验证是否充分
2. 是否存在 SQL 注入、XSS、CSRF 风险
3. 认证和授权逻辑是否安全
4. 敏感数据处理是否合规
5. 依赖包是否有已知漏洞

列出发现的所有安全问题,按严重程度排序。"""
        llm_security_report = self.llm(llm_prompt)
        
        critical_count = sum(1 for v in vulnerabilities if v["severity"] == "critical")
        return {
            "agent": self.name,
            "vulnerabilities": vulnerabilities,
            "llm_security_report": llm_security_report,
            "critical_count": critical_count,
            "status": "blocked" if critical_count > 0 else "passed"
        }
python
# 技术负责人 Agent + 完整的代码评审流程
class TechLeadAgent:
    """技术负责人:综合分析并做出评审决策"""
    def __init__(self, llm):
        self.llm = llm
        self.name = "TechLead"
    
    def review(self, code: str, static_result: Dict, security_result: Dict) -> Dict:
        """综合评审"""
        all_issues = []
        all_issues.extend(static_result.get("issues", []))
        all_issues.extend(security_result.get("vulnerabilities", []))
        
        critical = [i for i in all_issues if i.get("severity") in ("critical", "error")]
        warnings = [i for i in all_issues if i.get("severity") == "warning"]
        infos = [i for i in all_issues if i.get("severity") in ("info", "minor")]
        
        # 自动生成决策
        if critical:
            decision = "REJECT"
            reason = f"发现 {len(critical)} 个严重问题,必须修复后才能合并"
        elif len(warnings) > 5:
            decision = "REQUEST_CHANGES"
            reason = f"发现 {len(warnings)} 个警告,建议修改后重新提交"
        elif len(all_issues) <= 3:
            decision = "APPROVE"
            reason = "代码质量良好,可以合并"
        else:
            decision = "APPROVE_WITH_COMMENTS"
            reason = f"可以合并,但有 {len(all_issues)} 个建议改进项"
        
        # LLM 生成详细评审报告
        report_prompt = f"""作为技术负责人,请生成代码评审报告。

代码片段:
{code[:1500]}

静态分析结果({static_result['total_issues']} 个问题):
{str(static_result.get('issues', []))[:500]}

安全审查结果({security_result['critical_count']} 个严重漏洞):
{str(security_result.get('vulnerabilities', []))[:500]}

安全分析报告:
{security_result.get('llm_security_report', 'N/A')[:500]}

请生成格式化的评审报告,包括:
1. 总体评价(1-2 句话)
2. 必须修复的问题(按优先级排序)
3. 建议改进的方面
4. 最终结论:APPROVE / REQUEST_CHANGES / REJECT"""
        report = self.llm(report_prompt)
        
        return {
            "agent": self.name,
            "decision": decision,
            "reason": reason,
            "summary": {
                "critical": len(critical),
                "warnings": len(warnings),
                "info": len(infos),
                "total": len(all_issues),
            },
            "report": report,
        }

# ====== 完整的代码评审 Pipeline ======
def code_review_pipeline(code: str, llm) -> Dict:
    """完整的 Multi-Agent 代码评审流程"""
    print("开始 Multi-Agent 代码评审...")
    
    # Step 1: 并行执行静态分析和安全审查
    static_analyzer = StaticAnalyzerAgent(llm)
    security_reviewer = SecurityReviewerAgent(llm)
    
    print("  静态分析中...")
    static_result = static_analyzer.analyze(code)
    print(f"    发现 {static_result['total_issues']} 个问题")
    
    print("  安全审查中...")
    security_result = security_reviewer.review(code)
    print(f"    发现 {security_result['critical_count']} 个严重漏洞")
    
    # Step 2: TechLead 综合评审
    print("  TechLead 评审中...")
    tech_lead = TechLeadAgent(llm)
    final_result = tech_lead.review(code, static_result, security_result)
    
    print(f"\n评审结果: {final_result['decision']}")
    print(f"   原因: {final_result['reason']}")
    print(f"   问题统计: {final_result['summary']}")
    
    return final_result
Agent职责检查项输出格式优先级

StaticAnalyzer

代码质量分析

PEP8 规范、圈复杂度、重复代码、未使用变量

JSON: issues 列表

SecurityReviewer

安全漏洞扫描

SQL 注入、XSS、硬编码密钥、不安全的反序列化

JSON: vulnerabilities 列表

最高

TechLead

综合评审决策

整合所有分析结果、权衡问题严重程度、给出最终建议

文本: 评审报告加通过或拒绝决定

Coder

代码提交与修改

根据评审意见修改代码、回复评审意见

文本: 修改说明加更新后的代码

💡 一句话理解

Multi-Agent 实战开发的最佳实践:先用单 Agent 验证核心逻辑——确保每个 Agent 的独立功能正确,再组合成 Multi-Agent 系统;为每个 Agent 编写单元测试——Agent 本身也是代码,需要测试覆盖;设置超时和重试——Agent 调用 LLM 可能失败或超时,需要有合理的容错机制;监控 Agent 的 token 消耗——Multi-Agent 系统的 token 消耗是单 Agent 的 N 倍,需要监控和优化。

⚠️ 常见踩坑

代码评审系统的成本控制:在生产环境中,每次代码提交触发一次完整的 Multi-Agent 评审(StaticAnalyzer + SecurityReviewer + TechLead),token 消耗可能高达数万次。优化策略:对增量代码(diff)而非全量代码进行评审;缓存历史评审结果,避免重复审查未修改的文件;对低风险变更(如文档更新)跳过安全审查。

8更新于 2026-05-16:集体决策架构与 Agent 编排新范式

2025-2026 年间,Multi-Agent 系统的研究方向从简单的任务分解与协作走向了更加复杂的集体决策和层次化治理架构。这一转变的驱动力来自于一个核心观察:多个 Agent 简单堆叠并不等于更好的结果——如果没有合理的决策机制,多 Agent 系统可能比单 Agent 更差(更多冲突、更高成本、更低一致性)。

CHAL 框架(Collective Hierarchical Agent Learning)是 2026 年初提出的最新多 Agent 集体决策架构。它的核心创新在于:将多 Agent 系统组织为层次化的决策树,每一层负责不同粒度的决策。底层 Agent 处理具体执行(如代码生成、数据分析),中层 Agent 负责任务协调与冲突解决,顶层 Agent 负责全局目标对齐与资源分配。这种层次结构使得大规模 Agent 集群(100+ Agent)能够在保持决策一致性的同时最大化并行度。

CHAL 的层次化决策机制详解:CHAL 将决策过程分为三个层次。执行层(Execution Layer)由大量专用 Agent组成,每个 Agent 拥有特定领域的专业知识(如 Python 编程、数据库查询、API 调用)。这些 Agent 的输出是候选方案(Candidate Solutions),而不是最终决策。协调层(Coordination Layer)由协调 Agent(Coordinator Agents)组成,每个协调 Agent 管理一组执行层 Agent。协调层的职责是:整合执行层的候选方案、消除冲突、评估每个方案的可行性和一致性,然后向上层提交综合建议。治理层(Governance Layer)由少数治理 Agent(Governor Agents)组成,它们接收协调层的综合建议,做出最终决策。治理层的关键能力是元认知——它们不仅评估方案的内容,还评估提出该方案的 Agent 群体的可靠性。

TradingAgents 框架代表了 Multi-Agent 在金融领域的最新实践:多个 Agent 分别扮演宏观分析师(分析市场趋势)、技术分析师(分析价格模式)、风险管理师(评估投资组合风险)、交易执行师(执行买卖操作)。这四个 Agent 通过辩论机制(Debate Mechanism)达成共识:每个 Agent 独立提出交易建议,然后通过多轮辩论逐步收敛到一个综合决策。实验表明,这种多 Agent 辩论策略的年化收益率显著优于单 Agent 策略,更重要的是最大回撤(Max Drawdown)降低了约 40%。

TradingAgents 的辩论机制原理:辩论不是简单的投票(每个 Agent 投一票,多数胜出),而是结构化的论证过程。在每一轮辩论中,每个 Agent 需要:提出自己的交易建议和支撑论据(为什么选择这个策略);对其他 Agent 的建议提出质疑(指出潜在风险和不合理之处);根据其他 Agent 的反馈调整自己的建议。经过 3-5 轮辩论后,Agent 们的建议会收敛到一个共识区域。这种辩论机制的核心优势是:不同 Agent 的知识盲区可以互相补充——宏观分析师可能忽略技术面的细节,技术分析师可能忽略宏观面的风险,通过辩论,这些盲区被显式地暴露和弥补。

Ruflo(GitHub 51k 星)是目前Agent 编排领域的领跑者。它的核心设计理念是:Agent 编排的本质是工作流管理——将 Agent 的执行流程定义为一个有向无环图(DAG),每个节点是一个 Agent 的执行步骤,边表示数据依赖和执行顺序。Ruflo 的创新在于:支持动态重规划——当某个 Agent 执行失败或结果不满足预期时,系统可以动态调整 DAG 结构(添加新的 Agent 节点、改变执行顺序),而不是简单地重试或报错。这种自愈能力是 Ruflo 能够在大规模生产环境中稳定运行的关键。

Agent 编排 vs 传统工作流引擎的根本区别:传统工作流引擎的节点是确定性的函数调用,而 Agent 编排的节点是概率性的 LLM 调用。这意味着 Agent 编排需要处理执行结果的不确定性——同一个 Agent 在不同时间对同一个输入可能给出不同的输出。因此,Agent 编排系统需要内置结果验证和重试策略,而传统工作流引擎不需要。

Agent 编排的三种范式对比:

静态编排(Static Orchestration):DAG 结构在任务开始前完全确定,执行过程中不可改变。优点是简单、可预测;缺点是缺乏灵活性——当某个 Agent 失败时,整个流程可能崩溃。

动态编排(Dynamic Orchestration):DAG 结构在执行过程中可以动态调整。Ruflo 属于这一类。优点是灵活、容错能力强;缺点是复杂度高、调试困难。

自主编排(Autonomous Orchestration):Agent 不仅可以执行任务,还可以自主决定是否需要调整流程。这是最灵活但也最不可控的范式,目前仍处于研究阶段。

2026 年的关键趋势判断:Multi-Agent 系统正在从「学术玩具」走向「生产工具」。早期的 Multi-Agent 研究多在仿真环境和基准测试中验证,而当前的趋势是将 Multi-Agent 系统部署到真实的业务场景中(代码评审、金融交易、客户服务、内容生产)。这一转变带来的核心挑战不再是「Agent 能否协作」,而是「Agent 协作的成本效益如何」——如果 Multi-Agent 系统的性能提升不足以覆盖额外的推理成本和延迟开销,那么单 Agent 仍然是更优选择。

图表加载中…

💡 一句话理解

2026 年 Multi-Agent 实战建议:如果你正在考虑引入多 Agent 系统,先回答一个问题——你的任务是否真的需要多个 Agent?如果单 Agent + 工具调用就能解决,不要为了『多 Agent』而多 Agent。Multi-Agent 的价值在复杂决策和并行执行场景中才能充分体现。

⚠️ 常见踩坑

集体决策的常见陷阱:① 投票悖论——多个 Agent 的投票结果可能不一致,需要额外的仲裁机制;② 群体极化——Agent 之间互相影响可能导致决策走向极端;③ 成本失控——100 个 Agent 执行一次集体决策的成本可能是单 Agent 的数百倍。在投入生产前,必须进行成本-收益分析。

13更新于 2026-05-19:Agora-1 多智能体世界模型与共享环境

2026 年 5 月,Odyssey 公司发布了Agora-1——这是首个面向多 Agent 的共享环境世界模型。Agora-1 的发布标志着 Multi-Agent 系统从「独立运行的多个智能体」进化到了「在同一个共享世界中协作的智能体」。Agora-1 解决的核心问题: 在传统的 Multi-Agent 系统中,每个 Agent 都运行在自己的上下文中,它们通过消息传递进行通信。但这带来了一个根本性的问题——没有一个统一的「世界状态」供所有 Agent 参考。Agent A 和 Agent B 可能对同一件事情有不同的理解,因为它们看到的「世界」不同。

Agora-1 通过引入一个 共享的环境核心(Environment Core),为所有 Agent 提供了一个统一的参考系。这个环境核心包含:物理规则层:定义空间中的几何结构、物理约束(重力、碰撞、遮挡)、资源分布。所有 Agent 对同一物理实体的理解是一致的——如果一个物体在位置 (x, y),所有 Agent 都认同它在那里。Agent 表示层:每个 Agent 的状态、能力和意图被编码为其他 Agent 可以理解的表示。Agora-1 采用分层编码策略——底层编码物理状态(位置、速度),中层编码能力集(能做什么),高层编码意图(想做什么)。交互协调层:当多个 Agent 在同一环境中行动时,它们的行为可能产生冲突、协同或竞争。Agora-1 通过一个 共享的冲突解决协议来处理这些情况,确保环境状态的演变是确定性的。世界预测层:Agora-1 不仅建模「当前世界是什么」,还预测「未来世界会变成什么」。它采用迭代推理机制——每个 Agent 基于对其他 Agent 行为的预测来规划自己的行动,这些预测被汇总后生成联合的未来状态。Agora-1 对 Multi-Agent 系统设计的深远影响: 它提出了一个新的架构范式——与其让 Agent 通过「消息」来间接理解彼此,不如让它们在一个「共享世界」中直接观察和交互。这与人类社会的运作方式非常相似:我们不需要互相发消息来确认彼此的位置,因为我们共享同一个物理空间。

这种范式转变的意义在于:它降低了 Multi-Agent 系统的通信开销。在传统的消息传递架构中,N 个 Agent 之间需要 O(N 平方) 的通信通道;在 Agora-1 的共享环境架构中,每个 Agent 只需要与共享环境交互,通信复杂度降低到 O(N)。AI Master 认为,Agora-1 的出现为 Multi-Agent 系统提供了一个「操作系统」级别的抽象层。 就像操作系统为应用程序提供了统一的硬件抽象一样,Agora-1 为 Multi-Agent 系统提供了一个统一的共享环境抽象。在这个抽象层之上,各种 Multi-Agent 应用(协作编程、多机器人控制、群体智能决策)都可以更高效地构建。

图表加载中…

💡 一句话理解

如果你正在设计 Multi-Agent 系统,考虑是否可以引入「共享环境」的概念。即使不是物理世界,也可以是共享的知识图谱、共享的任务状态板、或共享的上下文缓存。共享环境的本质是「减少不必要的通信,增加可观察性」。

⚠️ 常见踩坑

Agora-1 的共享环境架构虽然降低了通信复杂度,但也引入了单点依赖风险。如果环境核心出现故障,所有 Agent 都会受到影响。在生产环境中,共享环境需要实现高可用(如多副本同步、故障转移),否则可能成为整个系统的瓶颈。

14更新于 2026-05-21:2026 年 Multi-Agent 最新进展与工具生态

2026 年上半年,Multi-Agent 领域迎来了多项突破性进展,从底层模型能力到编排框架生态都发生了显著变化。本节汇总最新的行业动态,并修正前文中可能过时的信息。Anthropic Claude 3.7 Sonnet + Computer Use(2026 年 3 月正式发布): Claude 3.7 Sonnet 是 Anthropic 推出的混合推理模型,支持可扩展推理模式(Extended Thinking),允许模型在输出前进行更长时间的「思考」,大幅提升复杂任务的处理能力。更重要的是,Claude 3.7 内置了Computer Use 能力——模型可以直接观察屏幕、操作鼠标键盘、浏览网页和执行终端命令。这意味着一个 Claude Agent 可以独立完成原本需要多 Agent 协作才能完成的任务链(如:打开浏览器搜索信息 → 复制数据 → 打开 Excel 整理分析 → 生成报告),这重新定义了「什么时候需要 Multi-Agent」的边界问题

Claude Computer UseMulti-Agent 架构的影响:过去,一个复杂的网页操作任务可能需要 Researcher Agent(搜索)+ DataProcessor Agent(处理)+ ReportWriter Agent(写报告)三个角色协作。现在,单个 Claude Agent 通过 Computer Use 就能独立完成整个流程。这并不意味着 Multi-Agent 被淘汰——在需要高可靠性和交叉验证的场景中,多 Agent 仍然是更好的选择。但对于中等复杂度的「UI 自动化」类任务,单 Agent + Computer Use 可能更高效、更经济。OpenAI Codex Agent(2026 年 2 月 GA): OpenAI 将 Codex 从一个代码补全工具升级为全自主编程 Agent 830。Codex Agent 可以:理解自然语言需求、规划实现方案、编写和测试代码、修复 Bug、甚至提交 Pull Request。它支持多文件编辑工作区级别的上下文理解,不再局限于单文件补全。Codex Agent 的核心竞争力在于与 GitHub 的深度集成——它可以创建分支、提交代码、发起 PR,并自动处理 Review 反馈。

Codex Agent 与 Multi-Agent 的关系:Codex Agent 本质上是一个 高度专业化的 Coder Agent1079。在 Multi-Agent 系统中,你可以将 Codex Agent 作为「编码角色」的一个实现选项,与其他 Agent(如 Manager、Reviewer)组合使用。但 Codex Agent 也内置了多 Agent 协作能力 ——它可以自动调用不同的工具(代码搜索、测试框架、终端)来模拟多角色协作。 LangGraph 最新进展(2026 Q1-Q2):LangGraph 作为 LangChain 生态的核心编排引擎,在 2026 年进行了多项重大更新。最重要的变化是:引入了原生 Human-in-the-Loop 支持——流程可以在任意节点暂停,等待人工审批或输入后再继续;新增状态检查点(Checkpointing)——支持断点续传和流程回放,大幅提升了调试能力;推出了LangGraph Studio1456——一个可视化的 Agent 工作流编辑器和调试器,让你直观看到状态在图中的流转过程。这些更新让 LangGraph 从「开发者的编程框架」进化为了「业务人员的可视化工具」。CrewAI 新特性(2026 Q1-Q2): CrewAI 在 2026 年新增了多项实用功能。最值得关注的是:Knowledge 模块 ——允许 Agent 接入外部知识库(文档、数据库、API),而不需要将所有信息塞进 Context; Memory 增强 ——新增短期记忆(对话历史)和长期记忆(经验存储)的自动管理,Agent 可以从过去的交互中学习;Crew 级别的条件分支 ——支持基于前一步结果动态调整后续流程,不再局限于简单的 Sequential / Hierarchical 模式; Flows 概念——将多个 Crew 编排为更大的工作流,支持跨 Crew 的数据传递和状态管理。这些更新大幅提升了 CrewAI 在复杂场景下的适用性。DeepAgents(2026 年新进入者): 一个新兴的 Multi-Agent 编排框架,其核心创新是 基于意图的任务分解——用户只需描述目标,框架自动识别需要哪些 Agent 角色、如何分解任务、如何编排执行。与传统框架(需要手动定义每个 Agent 和 Task)相比,DeepAgents 的目标是 零代码构建 Multi-Agent 工作流。目前仍在快速迭代中,适合快速原型验证。 LangGraph vs CrewAI vs AutoGen 最新对比(2026 年 5 月更新):三大框架的定位正在逐渐分化——LangGraph 专注于 复杂工作流的精确控制(适合需要细粒度状态管理的场景);CrewAI 专注于 角色驱动的自然协作(适合快速搭建 Agent 团队);AutoGen 专注于 灵活的对话拓扑(适合需要动态 Agent 交互的场景)。选择建议:如果你的任务可以明确分解为 DAG,选 LangGraph;如果你想快速定义几个角色并开始协作,选 CrewAI;如果你需要 Agent 之间自由对话和协商,选 AutoGen。 修正说明:前文第 6 章提到 Ruflo 是 Agent 编排的领跑者(51k 星)。截至 2026 年 5 月, LangGraph 的社区活跃度和生态完整性已经超越 Ruflo2486,成为 LangChain 生态事实上的编排标准。Ruflo 仍然是一个优秀的轻量级选项,但如果团队已经在使用 LangChainLangGraph 是更自然的选择。2026 下半年的关键观察 Multi-Agent 系统正在经历从「框架竞争」到「标准化」的过渡期。OpenAI、Anthropic、Google 都在推动各自的 Agent 协议标准,未来可能出现跨框架的 Agent 互操作协议。同时,Computer Use 和 Code Agent 等能力正在模糊「单 Agent」和「Multi-Agent 的边界——当单个 Agent 能够使用工具独立完成复杂任务时,Multi-Agent 的价值需要重新论证:不是「能不能做」,而是「做得比单 Agent 更好吗」。

图表加载中…

💡 一句话理解

2026 年 Multi-Agent 选型实用建议:如果你刚入门,从 CrewAI 开始(API 最简单);如果你需要细粒度控制和可视化调试,选 LangGraph;如果你在构建需要高可靠性的生产系统,先用单 Agent + Computer Use 验证需求,再逐步引入 Multi-Agent 架构来提升容错能力。

⚠️ 常见踩坑

信息过时风险提醒:AI 领域进展极快,本节提到的框架特性可能在数月后发生变化。建议定期查阅各框架的官方文档和 GitHub Releases:LangGraphlangchain-ai/langgraph)、CrewAI(crewAIInc/crewAI)、Anthropic 文档(docs.anthropic.com)。不要仅依赖本文作为框架选型的唯一依据。

🎯 相关面试题

巩固本篇知识点,备战 AI 岗位面试。