选择Swarm模式
概览
10种多智能体工作流的协调模式。选择最简单的能解决问题的模式——只有在系统证明它不够用时才增加复杂性。
快速决策框架
任务是否每个智能体独立?
是 → fan-out(并行工作者)
每个步骤是否需要前一个步骤的输出?
是 → 是否严格线性?
是 → pipeline
否 → dag(可能的情况下并行)
是否需要一个协调者保持活跃并适应?
是 → 是否有一个管理级别?
是 → hub-spoke
否 → hierarchical(多级别)
任务是否涉及决策?
是 → 智能体需要争论对立的方面吗?
是 → debate(对抗性)
否 → consensus(合作投票)
在处理过程中是否出现了正确的专家?
是 → handoff(动态路由)
所有智能体是否需要自由协作?
是 → mesh(点对点)
成本是否是首要考虑因素?
是 → cascade(首先使用最便宜的模型,如果需要则升级)
模式参考
| # | 模式 | 拓扑 | 智能体 | 最适合 |
|---|---|---|---|---|
| 1 | fan-out | 星形(SDK中心) | N并行 | 独立子任务(审查,研究,测试) |
| 2 | pipeline | 线性链 | 顺序 | 有序阶段(设计 → 实施 → 测试) |
| 3 | hub-spoke | 星形(活动中心) | 1领导 + N工作者 | 动态协调,领导审查/调整 |
| 4 | consensus | 广播+投票 | N投票者 | 架构决策,批准门 |
| 5 | mesh | 全连接 | N对等 | 头脑风暴,协作调试 |
| 6 | handoff | 路由链 | 一次1个活动 | 分诊,专家路由,支持流程 |
| 7 | cascade | 分层升级 | 最便宜 → 最能干 | 成本优化,生产工作负载 |
| 8 | dag | 依赖图 | 并行+连接 | 具有混合依赖的复杂项目 |
| 9 | debate | 对抗性回合 | 2+辩手+裁判 | 严格评估,架构权衡 |
| 10 | hierarchical | 树(多级别) | 领导 → 协调员 → 工作者 | 大型团队,领域分离 |
模式详情
1. fan-out — 并行工作者
fanOut([
{ task: "Review auth.ts", name: "AuthReviewer" },
{ task: "Review db.ts", name: "DbReviewer" },
], { cli: "claude" });
- 工作者独立运行,没有智能体间通信
- SDK收集所有DONE消息
- 使用时:任务是尴尬地并行的
2. pipeline — 顺序阶段
pipeline([
{ task: "Design the API schema", name: "Designer" },
{ task: "Implement the endpoints", name: "Implementer" },
{ task: "Write integration tests", name: "Tester" },
]);
- 阶段N+1接收阶段N的DONE摘要作为上下文
- 管道在失败时停止
- 使用时:清晰的线性依赖链
3. hub-spoke — 持久协调者
hubAndSpoke({
hub: { task: "Coordinate building a REST API", name: "Lead" },
workers: [
{ task: "Build database models", name: "DbWorker" },
{ task: "Build route handlers", name: "ApiWorker" },
],
});
- 中心保持活跃,接收工作者的ACK/DONE
- 中心可以动态生成额外的工作者
- 使用时:领导需要审查,调整并做出决策
4. consensus — 合作投票
consensus({
proposal: "Should we migrate to Fastify?",
voters: [
{ task: "Evaluate performance", name: "PerfExpert" },
{ task: "Evaluate DX", name: "DxExpert" },
],
consensusType: "majority",
});
- 智能体独立评估,然后VOTE:批准/拒绝
- 支持多数,超多数,一致,加权,法定人数
- 使用时:需要多样化视角的决策
5. mesh — 点对点协作
mesh({
goal: "Debug the auth flow returning 500",
agents: [
{ task: "Check server logs", name: "LogAnalyst" },
{ task: "Review auth code", name: "CodeReviewer" },
{ task: "Write repro test", name: "Tester" },
],
});
- 所有智能体在同一频道,自由通信
- 轮询跟踪检测停滞
- 使用时:没有层级的协作探索
6. handoff — 动态路由
handoff({
entryPoint: { task: "Triage the request", name: "Triage" },
routes: [
{ agent: { task: "Handle billing", name: "Billing" }, condition: "billing, payment" },
{ agent: { task: "Handle tech issues", name: "TechSupport" }, condition: "error, bug" },
],
maxHandoffs: 3,
});
- 一次一个活动代理;动态转移控制
- 断路器防止无限路由循环
- 使用时:事先不知道正确的专家
7. cascade — 成本意识升级
cascade({
tiers: [
{ agent: { task: "Answer this", cli: "claude" }, confidenceThreshold: 0.7, costWeight: 1 },
{ agent: { task: "Answer this", cli: "claude" }, confidenceThreshold: 0.85, costWeight: 5 },
{ agent: { task: "Answer this", cli: "claude" }, costWeight: 20 },
],
});
- 从便宜开始,如果信心<阈值则升级
- 代理报告:
DONE [confidence=0.4]: <answer> - 使用时:大多数任务简单,有些需要重推理
8. dag — 有向无环图
dag({
nodes: [
{ id: "scaffold", task: "Create project scaffold" },
{ id: "frontend", task: "Build React UI", dependsOn: ["scaffold"] },
{ id: "backend", task: "Build API", dependsOn: ["scaffold"] },
{ id: "integrate", task: "Wire together", dependsOn: ["frontend", "backend"] },
],
maxConcurrency: 3,
});
- 拓扑排序确定执行顺序
- 独立节点并行运行
- 使用时:管道太线性,fan-out太平坦
9. debate — 对抗性提炼
debate({
topic: "Monorepo vs polyrepo for the new platform?",
debaters: [
{ task: "Argue for monorepo", position: "monorepo" },
{ task: "Argue for polyrepo", position: "polyrepo" },
],
judge: { task: "Judge and decide", name: "ArchJudge" },
maxRounds: 3,
});
- 结构化回合:论点 → 反驳 → 裁决
- 可选裁判;没有裁判,代理自我汇聚或分裂
- 使用时:需要严格的对抗性审查
10. hierarchical — 多级代理
hierarchical({
agents: [
{ id: "lead", task: "Coordinate full-stack app", role: "lead" },
{ id: "fe-coord", task: "Manage frontend", role: "coordinator", reportsTo: "lead" },
{ id: "be-coord", task: "Manage backend", role: "coordinator", reportsTo: "lead" },
{ id: "fe-dev", task: "Build components", role: "worker", reportsTo: "fe-coord" },
{ id: "be-dev", task: "Build API", role: "worker", reportsTo: "be-coord" },
],
});
- 工作者 → 协调员 → 领导(多级报告)
- 协调员综合子团队输出
- 使用时:一个中心无法管理太多工作者
反思协议
所有模式都支持反思——定期综合,使航向修正成为可能。通过WorkflowOptions上的reflectionThreshold启用。
{
reflectionThreshold: 10, // 触发后10个智能体消息
onReflect: async (ctx) => {
// 检查ctx.recentMessages, ctx.agentStatuses
// 返回调整或null
},
}
反思是事件驱动的(重要性加权累积),不是基于定时器的。有关详细信息,请参见WORKFLOWS_SPEC.md。
常见错误
| 错误 | 为什么失败 | 修复 |
|---|---|---|
| 使用mesh做所有事情 | O(n^2)通信,调试噩梦 | 对大多数任务使用hub-spoke |
| 独立工作使用pipeline | 顺序瓶颈 | 使用fan-out或dag |
| 简单并行任务使用hub-spoke | 中心是不必要的开销 | 使用fan-out |
| 非决策使用共识 | 在实施任务上投票浪费时间 | 使用hub-spoke,让领导决定 |
| handoff没有断路器 | 无限路由循环 | 总是设置maxHandoffs |
| 没有信心解析的级联 | 代理不报告信心 | 约定注入处理这个问题 |
| 3个代理使用层次结构 | 管理开销超过好处 | 对小团队使用hub-spoke |
DAG执行器——经过验证的模式
DAG工作流执行的推荐架构,在9节点/5波生产运行上验证。
代理完成:检测 → 释放 → 收集
这是关键模式。 Claude代码代理不会自动退出——协调器必须检测完成并释放它们。
代理写摘要文件 → 协调器轮询(5s) → 检测到新的mtime →
读取摘要 → 调用client.release(agent) → agent_exited触发 → 节点标记为完成
实现:
// 跟踪初始mtime以区分新写入与旧文件
let initialMtime = 0;
try { initialMtime = statSync(summaryPath).mtimeMs; } catch {}
// 每5秒轮询摘要文件
const poll = setInterval(() => {
const stat = statSync(summaryPath);
if (stat.mtimeMs > initialMtime) {
const content = readFileSync(summaryPath, "utf-8").trim();
await client.release(agentName); // 触发agent_exited
finish("completed", content);
}
}, 5_000);
约定注入告诉代理:
- 通过Relaycast MCP发送摘要(mcp__relaycast__send到频道)进行智能体间通信
- 将摘要写入.relay/summaries/{nodeId}.md作为完成信号
- 包括文件路径,类型名称,方法签名——下游代理依赖于此
通信:Relaycast MCP
代理通过Relaycast MCP通信,而不是基于文件的协议:
- **频道消息:**mcp__relaycast__send带有频道名称
- **直接消息:**mcp__relaycast__dm带有代理名称
- Claude代码代理继承.mcp.json配置并具有完整的MCP访问权限
- 其他CLI(codex,aider)可能没有MCP——使用摘要文件作为后备
状态和恢复
在每个节点完成后持久化状态以进行崩溃恢复:
saveState(completed, depsOutput, results, startTime);
// 使用--resume重启以跳过已完成的节点
**陷阱:**当恢复时,只加载completed节点——绝不加载failed条目,否则下游将被永久阻塞。
陷阱参考
| 类别 | 陷阱 | 修复 |
|---|---|---|
| 完成 | 等待agent_exited而不释放——代理空闲直到超时 | 轮询摘要文件,检测到时释放代理 |
| 完成 | 没有resolved警卫——轮询间隔和超时都触发,双重解决 | 每次resolve前检查resolved布尔标志 |
| 信号 | PTY提示回声匹配信号关键字(DONE:,ERROR:)导致虚假完成 | 永远不要在任务提示中放置信号关键字;使用基于文件的信号 |
| 摘要 | 薄摘要(“创建类型”)对下游代理无用 | 约定注入要求文件路径,签名,关键导出 |
| 执行 | batch中的Promise.race——一次成功掩盖了后来的失败 | 每个批次使用Promise.allSettled |
| 弹性 | 没有–resume——协调器崩溃丢失所有进度 | 在每个节点后持久化completed集+depsOutput |
| 弹性 | 没有下游失败传播——依赖项陷入困境 | 在失败时将所有传递依赖标记为"blocked" |
| 约定 | 代理不读取现有代码——输出与项目模式不匹配 | 每个节点的readFirst字段,包含在约定注入中 |
| 能力 | 假设所有CLI都有MCP工具——codex/aider可能没有 | 检查CLI能力;对于非Claude CLI使用摘要文件作为后备 |
| 基础设施 | Rust代理与Node.js CLI二进制文件混淆(同名,不同行为) | 总是设置明确的binaryPath;使用独特的代理名称以避免409冲突 |
| 基础设施 | getLogs()假设Node.js守护进程日志文件——Rust代理不写它们 | 使用代理事件或摘要文件,而不是日志文件轮询 |
YAML工作流定义
任何模式都可以在YAML中定义以便于移植:
version: "1.0"
name: feature-dev
pattern: hub-spoke
agents:
- id: lead
role: lead
cli: claude
- id: developer
role: worker
cli: codex
reportsTo: lead
steps:
- id: plan
agent: lead
prompt: "Create a development plan for: {{task}}"
expects: "PLAN_COMPLETE"
- id: implement
agent: developer
dependsOn: [plan]
prompt: "Implement: {{steps.plan.output}}"
expects: "DONE"
reflection:
enabled: true
threshold: 10
trajectory:
enabled: true
存储在.relay/workflows/中,并使用以下命令运行:
const workflow = await loadWorkflow(".relay/workflows/feature-dev.yaml");
const run = runWorkflow(workflow, "Add user authentication");