跳至内容

M08 多智能体系统

M05 已经介绍过 Orchestrator-Workers,但那仍然属于单进程内的函数编排:编排者生成子任务,再调用一批 worker 执行。本章继续向前一步,讨论完整的多智能体系统(Multi-Agent System):多个 Agent 作为独立参与者并发运行,并通过消息协作。

Go 很适合实现这类系统。goroutine、channel 和 context 分别承担并发执行、消息传递和生命周期管理,正好对应多 Agent 系统的三项工程基础。本章会先用 channel 实现 MessageBus,再展开 Supervisor、Orchestrator + 隔离 Subagent、Channel Pipeline、Multi-Agent Debate 和 Swarm 五种拓扑。

本章是知识篇中“协作”主题的收尾。M04 讨论单个 Agent 如何运行,M05 讨论常见设计模式,本章把多个 Agent 放到同一个系统中协作。后续 M09 将继续处理上下文治理,M10 将补充可观测性、评估与安全。

学习目标

学完本章,你应该能够:

  1. 区分 M05 的单进程编排与本章的多智能体系统,判断何时才有必要引入多个 Agent;
  2. 用 channel 实现一个 MessageBus,作为 Agent 间通信的基础设施;
  3. 理解并实现 Supervisor、Orchestrator + 隔离 Subagent、Channel Pipeline、Multi-Agent Debate 和 Swarm;
  4. 说明上下文隔离与结果浓缩为什么是隔离 Subagent 模式的核心价值;
  5. 根据任务结构选择协作拓扑,并识别多 Agent 的 token 成本、延迟和特有失败模式;
  6. 完成 debate 多智能体辩论 demo,并比较质量收益与资源成本。

本章依赖 M04 的 agent 包、M05 的设计模式,以及 M01 中的 goroutine、channel、contextsync。配套练习是 debate:多个持不同立场的 Agent 围绕同一问题辩论数轮,再由评审综合出最终答案。

一、从模式到系统

首先要厘清边界,否则很容易把 M05 的 Orchestrator-Workers 和本章的多智能体系统混为一谈。

M05 的模式,本质上是在一个进程内按照既定代码流程组织多次 LLM 调用。编排者向 worker 分配任务,实际仍是主流程调用函数;worker 之间通常不直接通信,也没有各自独立的决策循环。

多智能体系统增加了三个特征:

  • 独立性:每个 Agent 都是自主参与者,拥有自己的角色、上下文和决策循环;
  • 并发性:多个 Agent 可以通过 goroutine 同时运行,而不是只能由主流程串行调用;
  • 通信:Agent 之间通过 channel 传递消息,而不是彼此直接调用函数。
M05 单进程编排:                      M08 多智能体系统:
   [主流程]                          [Agent A] ⇄ [MessageBus] ⇄ [Agent B]
    │ 调用                                │           ↕            │
    ├─► worker1()  (函数)                 └────► [Agent C] ◄───────┘
    └─► worker2()  (函数)              (各自独立、并发运行、消息通信)
   控制流在你手里                       控制流分散在 Agent 与拓扑里

多智能体系统需要并发执行体、消息通道以及超时和取消机制,正好对应 goroutine、channel 和 context。这些都是 Go 语言及标准库的原生能力,因此很适合用来表达本章的几类拓扑。

多智能体系统不是默认升级项,要不要使用多智能体需要根据具体业务来决定。Anthropic 在其研究系统数据中报告,普通 Agent 的 token 用量约为聊天交互的 4 倍,多 Agent 系统约为聊天交互的 15 倍。这个数字来自特定系统,不是所有任务都适用的固定比例,但足以说明引入多个 Agent 前必须评估收益与成本。

二、通信基石 MessageBus

多个 Agent 要协作,首先需要解决通信问题。最直接的做法是让 Agent A 持有 Agent B 的引用并直接调用,但这会让成员之间产生强耦合:A 必须知道 B 的接口和生命周期。

更通用的做法是引入消息总线(MessageBus)。每个 Agent 只负责发布消息并从自己的收件箱读取消息,路由由总线统一处理。

MessageBus 通信模型

在 Go 中,可以为每个 Agent 分配一个收件箱 channel,再由总线维护 Agent 名称到收件箱的映射。先定义消息结构:

package mas // multi-agent system

type Message struct {
	From    string         // 发送者 Agent 名
	To      string         // 收件人;空或 "*" 表示广播
	Content string         // 文本内容
	Meta    map[string]any // 可选的结构化负载
}

MessageBus 提供两个核心能力:Subscribe 为 Agent 注册收件箱,Publish 将消息发送给指定成员或广播给其他成员。内部 map 可能被多个 goroutine 并发访问,因此必须加锁;channel 发送可能阻塞,因此还要支持 context 取消。

package mas

import (
	"context"
	"fmt"
	"sync"
)

type MessageBus struct {
	mu      sync.RWMutex
	inboxes map[string]chan Message
	buffer  int
}

func NewMessageBus(buffer int) *MessageBus {
	return &MessageBus{inboxes: make(map[string]chan Message), buffer: buffer}
}

// Subscribe 为某个 Agent 注册收件箱,返回只读 channel。
func (b *MessageBus) Subscribe(name string) <-chan Message {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan Message, b.buffer)
	b.inboxes[name] = ch
	return ch
}

// Publish 把消息路由给收件人;广播则发给除自己外的所有人。
func (b *MessageBus) Publish(ctx context.Context, msg Message) error {
	b.mu.RLock()
	defer b.mu.RUnlock()

	send := func(ch chan Message) error {
		select {
		case ch <- msg:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	if msg.To != "" && msg.To != "*" {
		ch, ok := b.inboxes[msg.To]
		if !ok {
			return fmt.Errorf("收件人不存在: %s", msg.To)
		}
		return send(ch)
	}
	for name, ch := range b.inboxes { // 广播
		if name == msg.From {
			continue
		}
		if err := send(ch); err != nil {
			return err
		}
	}
	return nil
}

func (b *MessageBus) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.inboxes {
		close(ch)
	}
	b.inboxes = make(map[string]chan Message)
}

Agent 接入总线后,可以启动一个 goroutine 持续读取收件箱。每次收到消息后执行处理函数,并按需把回复重新发布到总线。

func RunBusAgent(ctx context.Context, bus *MessageBus, name string, handle func(context.Context, Message) (*Message, error)) {
	inbox := bus.Subscribe(name)
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-inbox:
				if !ok {
					return // 总线关闭
				}
				reply, err := handle(ctx, msg)
				if err != nil || reply == nil {
					continue
				}
				_ = bus.Publish(ctx, *reply)
			}
		}
	}()
}
这段 Publish 为了突出主线,在持有 RLock 时执行 channel 发送。若某个收件箱已满,发送会阻塞,并影响 SubscribeClose。生产实现通常会先复制目标 channel 列表,释放锁后再发送,同时为慢消费者设置超时、丢弃或隔离策略。

这类问题属于消息背压(backpressure),是多智能体系统必须处理的工程约束。

三、多 Agent 协作方法

通信只是基础。把多个 Agent 组织成系统,还要决定状态如何共享、控制权由谁掌握、失败后如何恢复。本节先建立协作方法,再进入五种拓扑的具体实现。

为什么需要多 Agent

单 Agent 配合工具已经能处理大量任务。只有当以下限制开始影响效果时,拆分为多个 Agent 才可能带来收益。

  1. 上下文容量有限

单 Agent 的决策、工具结果和中间记录都进入同一个上下文窗口。任务步骤较多时,早期信息容易被后续内容稀释,并可能出现 Lost in the Middle 等长上下文问题。

  1. 工具集相互干扰

当一个 Agent 同时暴露大量工具时,工具描述和参数 schema 会增加选择难度。M09 的动态工具暴露可以减少当前可见工具,但将工具按职责拆给不同 Agent,能够进一步缩小决策范围。

  1. 角色要求发生冲突

如果同一个 Agent 既要遵循严谨的法律审查标准,又要进行开放式创意写作,system prompt 中的目标可能相互冲突。拆分角色可以让每个 Agent 使用更明确的提示词和评价标准。

  1. 缺少外部复核

单 Agent 可以根据工具错误自行重试,但不一定能发现整体方向已经偏离。中心监督或多方互评可以增加外部检查环节。

  1. 无法充分并行

单 Agent 的 Think → Act → Observe 循环通常按步骤推进。若任务中存在多个彼此独立的研究方向或数据源,拆成多个 Agent 可以并行执行。

这些限制与多 Agent 的对应关系如下:

  • 上下文有限 → 隔离不同 Agent 的上下文;
  • 工具集冲突 → 为不同 Agent 配置不同工具集;
  • 角色冲突 → 让每个 Agent 承担单一职责;
  • 缺少复核 → 引入主管或多方互评;
  • 无法并行 → 让多个 Agent 同时执行独立任务。

这些收益并不意味着复杂任务都应该使用多 Agent。应先确认“单 Agent + 工具 + 上下文工程”确实无法满足要求,再引入额外的通信、调度和观测成本。

多 Agent 协作的核心问题

设计多 Agent 系统时,有四个问题必须明确。

  1. 通信(Communication)

系统可能需要异步消息、一对多广播和中间路由。前面实现的 channel + MessageBus 为这些通信方式提供了基础。

  1. 状态(State)

每个 Agent 都有自己的上下文、消息历史和工具结果。状态共享策略会直接影响信息完整性、上下文长度与并发能力:

  • 全共享:每个 Agent 都能看到完整历史,信息最充分,但上下文增长最快;
  • 完全隔离:每个 Agent 只看到自己的输入输出,便于并行,但信息无法自然流通;
  • 浓缩汇总:子 Agent 独立工作,只把结论交给上层,在信息共享与上下文控制之间取得平衡。

浓缩汇总也是 M09 上下文工程的重要思路。

  1. 协调(Coordination)
协调形态谁拿主动权例子
中心化固定主管决定下一步Supervisor
分布式每个 Agent 各自决定Debate / Swarm
静态流程由代码预先确定Pipeline
  1. 容错(Fault Tolerance)

Agent 数量增加后,失败点也会增加。常见策略包括:

  • 超时重试:子 Agent 未按时返回时重新执行或更换执行者;
  • 降级:某个 Agent 失败后回退到单 Agent 或简化流程;
  • 冗余:把同一任务交给多个 Agent,通过投票或评审收敛;
  • 隔离:限制故障 Agent 对主流程和其他成员的影响。

分析任何多 Agent 系统时,都可以依次检查通信、状态、协调和容错。下面五种拓扑正是这四个方面的不同组合。

协作模式的多维分类

可以从控制流、状态共享、决策时机和消息形态四个维度理解协作拓扑。

控制流方向

中心化         混合          去中心化
[主管]      [编排者]         [A]⇄[B]
 / | \      [子 A][子 B]      |   |
[A][B][C]                    [C]⇄[D]
Supervisor   Orchestrator    Debate/Swarm

状态共享策略

策略例子上下文压力
全共享Debate(每个 Agent 看全部历史)
浓缩汇总Orchestrator(子 Agent 只返回结论)
数据流Pipeline(只见上一阶段输出)
点对点转交Swarm(handoff 时打包必要状态)

决策时机

时机含义例子
静态拓扑结构在代码里写死Pipeline
动态运行时由 LLM 决定下一步给谁Supervisor / Swarm
混合大框架静态,内部动态Orchestrator

消息形态

形态例子
任务分发(主管 → worker)Supervisor
数据流(上一阶段 → 下一阶段)Pipeline
辩论(每个 Agent 读取其他成员的结果)Debate
控制权转交(handoff)Swarm
子任务分解与收集Orchestrator

每种拓扑都是这四个维度上不同选择的组合。

五种主流拓扑总览

本章讨论五种常见拓扑:

多智能体拓扑总览

可以先用一句话概括每种拓扑。

拓扑一句话类比
Supervisor一个主管动态决定下一步谁干项目经理 + 工程师团队
Orchestrator + 子代理大任务分解 → 子任务隔离并行 → 浓缩汇总总编辑 + 多个记者分头跑
Pipeline固定阶段的流水线,每个 Agent 是一道工序工厂装配线
Multi-Agent Debate多方各自给方案,互相批判后收敛评审委员会
Swarm点对点转交控制权,职责清晰的接力接力赛

下面将会逐个展开实现。

五种拓扑的详细对比与选型

下面的对比表汇总了五种拓扑的主要工程特征。

维度SupervisorOrchestratorPipelineDebateSwarm
控制流中心化混合静态线性去中心化去中心化
状态共享主管见全局子 Agent 隔离数据流全共享转交时打包
决策时机每步动态顶层动态 + 子任务静态完全静态多轮动态每步动态
通信开销中(每步经过主管)低(可并行)低(线性)高(上下文互相引用)
Token 成本中(包含主管调用)取决于子任务数量高(多轮重复调用)
延迟串行 + 主管开销可并行,最快串行多轮辩论,慢串行接力
收敛性主管决定何时停编排者明确流水线尽头投票或共识终止状态
容错主管是单点子 Agent 可独立处理故障任一阶段失败会影响全链路可通过多数意见降低单点偏差任一节点失败可能中断
可扩展worker 易加,主管难扩子 Agent 易加加阶段需重设计加 Agent 通信压力翻倍易加但路由难
复杂度
适合场景灵活调度复杂任务可并行的研究 / 工单高吞吐批处理高风险决策、质量优先角色清晰的接力
典型例子客户经理调度专家Deep Research 类应用内容审核流水线法律 / 合规评审客户服务多角色转接

可以按下面的顺序选择拓扑。

有多 Agent 协作需求
        ├─ 任务结构能画清流程图?
        │     │是 → Pipeline(简单、高吞吐)
        ├─ 大任务可分解为多个独立子任务并行?
        │     │是 → Orchestrator + 隔离子代理(并行最快)
        ├─ 路径动态、需要灵活调度?
        │     │是 → Supervisor(适合大多数复杂任务)
        ├─ 高风险决策、要多方论证?
        │     │是 → Debate(成本高,适合多方复核)
        └─ 多个清晰角色接力转交?
              │是 → Swarm(职责边界清的接力场景)

工程实践中可以遵循以下原则:

  1. 流程固定时优先 Pipeline,因为它最简单、最便宜,也最容易控制;
  2. 大任务能够拆成独立子任务时,优先考虑 Orchestrator,以利用并行能力;
  3. 执行路径需要根据中间结果动态调整时,使用 Supervisor;
  4. 决策错误代价较高且确实需要多方论证时,再使用 Debate;
  5. 只有角色边界和转交条件都足够清楚时,才使用 Swarm。

多 Agent 的工程代价

拓扑选择不能只比较能力,还要评估以下工程代价。

  1. Token 成本增加

每个 Agent 都有自己的 system prompt、上下文和 LLM 调用。Debate 的调用次数至少与“辩手数 × 轮数”成正比;如果每个辩手都读取其他成员的完整回答,上下文总量还会随参与者数量进一步增长。

  1. 延迟增加

Supervisor、Pipeline 和 Swarm 包含串行路径,总延迟会累积每一步调用时间。Orchestrator 可以并行执行子任务,但分解和汇总仍然位于关键路径上。

  1. 调试难度增加

多 Agent 系统需要同时观察多条执行轨迹和消息链路:

  • 每个 Agent 都有独立轨迹,合并后形成更大的执行树;
  • 消息传递成为新的失败面;
  • 一个 Agent 的错误输出可能污染所有下游成员;
  • 定位问题时往往需要同时检查多条轨迹。
  1. 状态一致性问题

多个 Agent 共享短期进度或长期记忆时,需要处理:

  • 写冲突:两个 Agent 同时修改同一状态;
  • 版本滞后:一个 Agent 读取到其他成员更新前的旧状态;
  • 状态泄露:Agent 获得了超出自身职责范围的信息。
  1. 设计复杂度增加

每增加一个 Agent,都要设计 system prompt、工具集、通信接口和失败处理。成员之间的潜在交互关系随数量增加,因此设计成本通常不是简单的线性增长。

选型判据

在选择具体拓扑之前,先判断任务是否需要 Agent。

要解决一个复杂任务
        ├─ 路径完全已知 + 步数 < 5?
        │     │是 → 固定工作流
        │           使用普通代码实现
        ├─ 路径不确定 + 单 Agent 能承担?
        │     │是 → 单 Agent + 工具(M04)
        │           优先采用的方案
        ├─ 单 Agent 出现前述限制?
        │     │是 → 多 Agent
        │           按前面的决策树挑拓扑
        └─ 评估发现多 Agent 比单 Agent 反而差?
              │是 → 退回单 Agent
                    多 Agent 不是默认更优的方案

M05 的 ReAct、Plan-and-Execute 和 Evaluator-Optimizer 已经覆盖了部分多角色能力。例如,Plan-and-Execute 会在一个进程内分离规划与执行。只有这些模式仍不足以解决上下文隔离、并行探索或角色转交问题时,才需要升级到多智能体系统。

Anthropic 的工程实践建议先从简单、可组合的模式开始;LangChain 的多 Agent 指南同样强调,核心问题是为每个 Agent 提供恰当的上下文。多 Agent 的收益来自明确的任务结构,而不是成员数量本身。

对话调用助手

下面逐个实现五种拓扑。它们都建立在 M04 的 agent.Agent 和 M02 的 llm.Provider 之上。

为减少重复代码,先在 mas 包中准备一个基础调用助手:

package mas

import (
	"context"

	"github.com/yourname/llmagent/internal/llm"
)

func chat(ctx context.Context, p llm.Provider, model, system, user string) (string, error) {
	resp, err := p.Chat(ctx, llm.ChatRequest{
		Model: model,
		Messages: []llm.Message{
			{Role: llm.RoleSystem, Content: system},
			{Role: llm.RoleUser, Content: user},
		},
	})
	if err != nil {
		return "", err
	}
	return resp.Content, nil
}

四、Supervisor

Supervisor 模式设置一个中心 Agent。主管不直接执行具体任务,而是读取任务目标和当前进展,通过一次 LLM 调用决定下一步由哪个成员执行,或者判断任务已经完成。成员返回结果后,主管继续决策,直到结束。

这种模式的优势是灵活:执行路径由运行时状态决定,而不是像 Pipeline 一样预先固定。代价是每一步都要增加一次主管决策调用,而且主管会成为中心节点。

实现时应让主管返回结构化决策。下面要求模型只输出 JSON,再使用 M02 的 ParseInto 解析下一位执行者。

package mas

import (
	"context"
	"fmt"
	"strings"

	"github.com/yourname/llmagent/internal/agent"
	"github.com/yourname/llmagent/internal/llm"
)

type SupervisorDecision struct {
	Next   string `json:"next"`   // 下一个成员名;"FINISH" 表示结束
	Reason string `json:"reason"`
}

type Supervisor struct {
	Provider llm.Provider
	Model    string
	Workers  map[string]*agent.Agent // 成员名 -> Agent
	MaxTurns int
}

func (s *Supervisor) decide(ctx context.Context, task, progress string) (SupervisorDecision, error) {
	names := make([]string, 0, len(s.Workers))
	for n := range s.Workers {
		names = append(names, n)
	}
	system := fmt.Sprintf(
		"你是团队主管。可调度的成员有:%s。根据任务和当前进展,决定下一步该谁来做;"+
			"若任务已完成则输出 FINISH。严格只输出 JSON:{\"next\":\"成员名或FINISH\",\"reason\":\"理由\"}",
		strings.Join(names, ", "))
	out, err := chat(ctx, s.Provider, s.Model, system, "任务:"+task+"\n\n当前进展:\n"+progress)
	if err != nil {
		return SupervisorDecision{}, err
	}
	return llm.ParseInto[SupervisorDecision](out)
}

func (s *Supervisor) Run(ctx context.Context, task string) (string, error) {
	var progress strings.Builder
	for turn := 0; turn < s.MaxTurns; turn++ {
		d, err := s.decide(ctx, task, progress.String())
		if err != nil {
			return "", err
		}
		if d.Next == "FINISH" {
			return progress.String(), nil
		}
		w, ok := s.Workers[d.Next]
		if !ok {
			return "", fmt.Errorf("主管选了不存在的成员: %q", d.Next)
		}
		result, err := w.Run(ctx, task+"\n\n已有进展:\n"+progress.String())
		if err != nil {
			return "", err
		}
		fmt.Fprintf(&progress, "【%s】\n%s\n\n", d.Next, result)
	}
	return progress.String(), fmt.Errorf("达到最大轮次 %d 仍未 FINISH", s.MaxTurns)
}

MaxTurns 是必要的停止条件。如果主管始终不输出 FINISH,系统可能在多个成员之间持续调度并不断消耗 token。所有由模型控制下一步的循环都必须设置硬上限。

Supervisor 适合步骤不固定、需要根据中间结果动态调整执行者的任务。其中心化结构也是系统瓶颈:主管既增加调用成本,也是单点故障位置。

五、Orchestrator 与隔离 Subagent

截至 2026 年 6 月,Anthropic、OpenAI 和 LangChain 的官方资料都在介绍编排者调用专门子 Agent 的协作方式。它与 M05 的 Orchestrator-Workers 形式相近,但这里强调两个工程约束:子代理使用隔离的上下文,并且只把与任务相关的结果交回编排者。

M04 已经说明,Agent 的 Messages 会随着工具调用不断增长。如果编排者接收所有子代理的完整消息、工具结果和中间输出,主上下文会迅速膨胀,增加成本并降低有效信息密度。

隔离 Subagent 的做法是让每个子代理在全新的上下文中独立完成子任务。子代理内部可以进行多轮工具调用,但返回给编排者的只是浓缩结论,而不是完整执行记录。这样,编排者主要处理“目标 + 子结论 + 综合”,不会被子代理内部细节占满上下文。

隔离 Subagent

实现上可以复用 M05 的 Sectioning 并行执行任务。关键点是通过工厂函数为每个子任务创建新的 Agent,并且只收集 Run 返回的最终结果,而不是内部 transcript。

package mas

import (
	"context"
	"fmt"
	"strings"

	"github.com/yourname/llmagent/internal/agent"
	"github.com/yourname/llmagent/internal/llm"
	"github.com/yourname/llmagent/internal/patterns" // 复用 M05 的 Sectioning
)

type IsolatedOrchestrator struct {
	Provider llm.Provider
	Model    string
	// NewSubagent 为某个角色现造一个全新 Agent —— 每次调用都是干净、隔离的上下文。
	NewSubagent func(role string) *agent.Agent
}

type subtask struct {
	Role string `json:"role"`
	Task string `json:"task"`
}
type plan struct {
	Subtasks []subtask `json:"subtasks"`
}

func (o *IsolatedOrchestrator) Run(ctx context.Context, goal string) (string, error) {
	// 1) 编排者分解任务(结构化输出)
	sys := "你是编排者。把任务拆成可独立完成的子任务,为每个子任务指定一个角色。" +
		"严格只输出 JSON:{\"subtasks\":[{\"role\":\"...\",\"task\":\"...\"}]}"
	raw, err := chat(ctx, o.Provider, o.Model, sys, goal)
	if err != nil {
		return "", err
	}
	p, err := llm.ParseInto[plan](raw)
	if err != nil {
		return "", fmt.Errorf("分解结果无法解析: %w", err)
	}

	// 2) 并行跑各子代理。每个子代理上下文隔离,只交回浓缩结论。
	jobs := make([]func(context.Context) (string, error), len(p.Subtasks))
	for i, st := range p.Subtasks {
		st := st
		jobs[i] = func(ctx context.Context) (string, error) {
			sub := o.NewSubagent(st.Role) // 全新、隔离的上下文
			result, err := sub.Run(ctx, st.Task)
			if err != nil {
				return "", fmt.Errorf("子代理[%s]失败: %w", st.Role, err)
			}
			return fmt.Sprintf("【%s 的结论】%s", st.Role, result), nil
		}
	}
	results, err := patterns.Sectioning(ctx, jobs)
	if err != nil {
		return "", err
	}

	// 3) 编排者只基于“浓缩结论”综合 —— 它的上下文从未被子代理内部细节污染
	return chat(ctx, o.Provider, o.Model,
		"下面是各子代理的结论,请综合成对原始目标的完整回答。",
		"目标:"+goal+"\n\n各子代理结论:\n"+strings.Join(results, "\n\n"))
}
这一模式的主要价值是上下文隔离与结果浓缩。Supervisor 采用串行、动态调度并共享任务进展;隔离 Orchestrator 则预先分解任务、并行执行子任务,并限制子代理上下文进入主流程。

六、Channel Pipeline

当一批数据需要依次经过固定阶段时,Pipeline 是最直接的拓扑。例如批量处理文档,每条数据都要经过 分类 → 检索资料 → 起草内容 → 质检。每个阶段由一个或一组 Agent 负责,阶段之间通过 channel 串联。

这正是 Go 常见的 pipeline 并发模式。下面用泛型实现一个通用阶段,同时支持 fan-out 和 fan-in:一个阶段启动多个 worker 并行消费输入,再把结果汇入同一个输出 channel。

package mas

import (
	"context"
	"sync"
)

// Stage 是流水线的一个阶段:从 in 读取,用 workers 个 goroutine 并行处理,结果写入返回的 channel。
func Stage[I, O any](ctx context.Context, in <-chan I, workers int, fn func(context.Context, I) (O, error)) <-chan O {
	out := make(chan O)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range in {
				o, err := fn(ctx, item)
				if err != nil {
					continue // 教学简化:实际应把错误送往专门的错误 channel
				}
				select {
				case out <- o:
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	go func() { wg.Wait(); close(out) }() // 所有 worker 退出后关闭输出
	return out
}

有了 Stage,工单处理流水线只需要把多个阶段依次连接起来。

// 把工单流依次经过:分类 → 检索 → 起草 → 质检
classified := Stage(ctx, tickets, 4, classifyAgent)   // 4 个并行做分类
retrieved := Stage(ctx, classified, 2, retrieveAgent)
drafted := Stage(ctx, retrieved, 4, draftAgent)
reviewed := Stage(ctx, drafted, 2, reviewAgent)

for r := range reviewed { // 消费最终结果
	save(r)
}

每个阶段都可以独立设置并行度。例如分类和起草可以配置更多 worker,受外部知识库限制的检索阶段则降低并发。阶段之间通过 channel 形成数据流和背压,context 取消后整条流水线都能停止。

Pipeline 适合阶段固定、追求吞吐的批处理场景。它不适合频繁改变执行路径,但结构简单、容易并行,也便于定位各阶段的容量瓶颈。

七、Multi-Agent Debate

有些高风险任务需要多视角复核,例如医疗建议初筛、风控判断或重要方案评审。Multi-Agent Debate 让多个 Agent 先独立作答,再读取其他成员的答案、提出批评并修订观点,最后由评审综合定稿。

它与 M05 的 Voting 不同。Voting 中的 Agent 独立作答,最后只做汇总或计票;Debate 中的 Agent 会看到并回应其他观点。额外的修订过程可能发现遗漏,但也会增加调用次数和上下文长度。

Multi-Agent Debate 流程

实现的核心是按轮次推进。每轮开始时先复制上一轮答案,所有辩手基于同一份快照并行作答;本轮全部完成后再进入下一轮。

package mas

import (
	"context"
	"fmt"
	"strings"
	"sync"

	"github.com/yourname/llmagent/internal/llm"
)

type Debater struct {
	Name     string
	Provider llm.Provider
	Model    string
	Persona  string // 该辩手的立场/视角,作为 system 提示词
}

// Debate 进行 rounds 轮辩论,返回每位辩手的最终答案。
func Debate(ctx context.Context, debaters []Debater, question string, rounds int) (map[string]string, error) {
	answers := make(map[string]string)
	var mu sync.Mutex

	for round := 0; round < rounds; round++ {
		// 快照本轮开始时的答案,供各辩手参考(避免并发读写同一 map)
		mu.Lock()
		prev := make(map[string]string, len(answers))
		for k, v := range answers {
			prev[k] = v
		}
		mu.Unlock()

		var wg sync.WaitGroup
		errCh := make(chan error, len(debaters))
		for _, d := range debaters {
			d := d
			wg.Add(1)
			go func() {
				defer wg.Done()
				ans, err := chat(ctx, d.Provider, d.Model, d.Persona, debatePrompt(question, d.Name, prev))
				if err != nil {
					errCh <- err
					return
				}
				mu.Lock()
				answers[d.Name] = ans
				mu.Unlock()
			}()
		}
		wg.Wait()
		close(errCh)
		if err := <-errCh; err != nil { // 取第一个错误(channel 空且关闭时返回 nil)
			return nil, err
		}
	}
	return answers, nil
}

func debatePrompt(question, self string, prev map[string]string) string {
	if len(prev) == 0 {
		return "问题:" + question + "\n请给出你的回答和理由。"
	}
	var sb strings.Builder
	sb.WriteString("问题:" + question + "\n\n其他成员上一轮的观点:\n")
	for name, ans := range prev {
		if name == self {
			continue
		}
		fmt.Fprintf(&sb, "- %s:%s\n", name, ans)
	}
	sb.WriteString("\n请批判性地参考他们的观点,修订并强化你的回答。")
	return sb.String()
}

// Judge 综合各辩手的最终答案,给出定稿。
func Judge(ctx context.Context, p llm.Provider, model, question string, answers map[string]string) (string, error) {
	var sb strings.Builder
	for name, ans := range answers {
		fmt.Fprintf(&sb, "【%s】%s\n\n", name, ans)
	}
	return chat(ctx, p, model,
		"你是评审。综合下面各位专家的最终回答,给出最准确、全面、平衡的定稿。",
		"问题:"+question+"\n\n各方回答:\n"+sb.String())
}
辩论的调用次数随“辩手数 × 轮数”增长。三个辩手进行三轮辩论需要 9 次辩手调用,此外还要增加一次评审调用。是否提升质量必须通过任务集评估,不能仅凭拓扑名称判断。

八、Swarm

Swarm 是去中心化的控制权转交模式。系统没有固定主管,每个 Agent 处理当前请求后,自行决定给出最终答案,还是通过 handoff 把请求转交给另一个 Agent。

这种结构适合分诊和接力场景。例如接待 Agent 先识别问题领域,再转交给对应专家;专家若发现问题涉及其他职责,还可以继续转交。每个 Agent 只负责自己的专业范围和可转交对象。

实现时,每个 Agent 的结果要么包含最终答案,要么包含下一个 handoff 目标。Swarm 运行器沿转交链执行,直到得到答案或达到转交上限。

package mas

import (
	"context"
	"fmt"
)

// SwarmResult:Agent 要么给出答案,要么转交给下一个。
type SwarmResult struct {
	Answer    string // 非空表示给出最终答案
	HandoffTo string // 非空表示转交给该 Agent
}

type SwarmAgent struct {
	Name string
	Run  func(ctx context.Context, input string) (SwarmResult, error)
}

type Swarm struct {
	Agents  map[string]SwarmAgent
	MaxHops int // 最大转交次数,防止无限转交
}

func (s *Swarm) Run(ctx context.Context, start, input string) (string, error) {
	cur := start
	for hop := 0; hop < s.MaxHops; hop++ {
		a, ok := s.Agents[cur]
		if !ok {
			return "", fmt.Errorf("不存在的 Agent: %q", cur)
		}
		res, err := a.Run(ctx, input)
		if err != nil {
			return "", err
		}
		if res.HandoffTo == "" {
			return res.Answer, nil // 有人给出了答案,结束
		}
		// 转交:把当前的处理结果作为上下文带给下一个 Agent
		cur = res.HandoffTo
		if res.Answer != "" {
			input = input + "\n\n[" + a.Name + " 的处理]:" + res.Answer
		}
	}
	return "", fmt.Errorf("转交超过 %d 次仍无最终答案", s.MaxHops)
}

每个 SwarmAgent 内部仍然可以使用 M04 的 Agent,只需增加一个表达转交目标的 handoff 工具。Swarm 容易增加新角色,但也可能形成 A → B → A 的循环,因此 MaxHops 是必要边界。生产系统还应记录已访问节点,以便提前发现循环转交。

Swarm 把“下一步由谁处理”的决策交给当前 Agent。只有在角色职责、转交条件和终止条件都足够明确时,这种结构才容易保持稳定。

九、选型与代价

理解五种拓扑后,还要回答两个问题:当前任务是否需要多 Agent,以及需要哪一种拓扑。

先按任务特征选择候选拓扑。

任务特征候选拓扑
阶段固定、批量、追吞吐Pipeline
步骤不定、需灵活动态调度Supervisor
可并行分解、子任务上下文需隔离Orchestrator + 隔离子代理
高风险决策、值得多方互相批判Debate
职责清晰、接力式转交Swarm

在选拓扑之前,应重新确认任务是否真的需要多个 Agent。除额外 token 成本外,多智能体系统还会引入以下特有失败模式:

  • 死锁与背压:Agent 互相等待消息,或慢消费者阻塞消息总线;
  • 协调风暴:广播式通信使消息数量快速增长;
  • 跨 Agent 调试困难:错误轨迹分散在多个并发 Agent 和消息链路中;
  • 错误跨体传播:一个 Agent 的错误结论被其他成员继续使用并放大;
  • 停止条件缺失:主管、辩论或转交流程没有硬边界,导致成本失控。
默认方案仍然应该是单 Agent + 合适的工具。只有经过评估确认单 Agent、RAG、Routing 或固定工作流无法满足要求,并且能够说明某种拓扑解决的具体问题时,才引入多 Agent。

配套练习:debate 多智能体辩论 demo

使用本章的 DebateJudge 实现一个多智能体辩论工具,并通过对比实验评估质量收益和 token 成本。

需求如下:

  • 给定一个有争议或需要权衡的问题,例如“这段架构方案有什么风险”;
  • 创建 3 个持不同视角的辩手,例如务实派、谨慎派和数据派;
  • 辩论 2~3 轮,并打印每轮每位辩手的观点变化;
  • 由评审综合各方最终观点并生成定稿;
  • 统计总 token 消耗;
  • 对比单个 Agent 直接回答与“三辩手辩论三轮 + 评审”的质量和成本。

验收点:

  • 使用不同 Persona 运行多轮 Debate,并可选用不同模型或 Provider;
  • 使用 Judge 综合生成定稿;
  • 打印每一轮观点,观察辩手是否根据其他观点做出有效修订;
  • 设置轮数上限并统计总 token 消耗;
  • 对同一个问题执行单 Agent 与多 Agent 对比实验。

入口骨架如下:

func main() {
	ctx := context.Background()
	question := "我们准备用单体架构起步、后期再拆微服务,这个决策有哪些风险?"

	debaters := []mas.Debater{
		{Name: "务实派", Provider: p, Model: m, Persona: "你务实,关注落地速度与团队现状。"},
		{Name: "谨慎派", Provider: p, Model: m, Persona: "你谨慎,关注长期可维护性与技术债。"},
		{Name: "数据派", Provider: p, Model: m, Persona: "你重证据,倾向用事实和案例说话。"},
	}

	answers, err := mas.Debate(ctx, debaters, question, 3)
	if err != nil { /* ... */ }

	final, err := mas.Judge(ctx, p, m, question, answers)
	if err != nil { /* ... */ }
	fmt.Println("=== 定稿 ===\n", final)
}
不要预设 Debate 一定优于单 Agent。应针对准确性、完整性、延迟和 token 成本建立评价标准,再根据实验结果决定是否采用该拓扑。

本章小结

内容在系统中的作用
channel 式 MessageBusAgent 间解耦通信的基础
Supervisor(JSON 决策)步骤不固定任务的动态调度
Orchestrator + 隔离子代理复杂任务并行分解、上下文隔离与结果浓缩
Channel Pipeline批处理场景的高吞吐流水线
Multi-Agent Debate高风险决策的多视角复核
Swarm 转交职责清晰的接力式转接
选型与失败模式判断是否值得引入多 Agent

本章从 MessageBus 开始,依次讨论了多 Agent 协作的核心问题、五种拓扑及其 Go 实现,并给出了两层选型方法:先判断是否需要 Agent,再判断是否需要多个 Agent。

下一章进入 M09 上下文工程。多智能体系统运行后,多个 Agent 的消息、工具结果和中间结论会持续进入上下文。如何压缩、隔离和组织这些信息,是系统长期稳定运行的关键。

思考题

  1. 本章手写了主管循环、流水线串联和辩论轮次。如果使用声明式图结构描述节点连接与数据流,哪些重复代码可以被框架接管?这正是 M11 Eino 类型化图编排要解决的问题。
  2. 本章的 Agent 通过进程内 channel 通信。如果 Agent 分属不同团队、使用不同语言并部署在不同机器上,应该如何定义通信协议?M12 将进入 A2A 协议。
  3. 多智能体系统出现问题时,应该记录哪些输入输出、消息流转、token 消耗和耗时信息,才能还原完整执行过程?这会在 M10 可观测性中展开。

参考资料

最后更新于 • Q1mi