---
name: agent-swarm-orchestrator
description: 设计具有协调代理群、任务分发、代理间通信和涌现集体行为的多代理系统。
license: MIT
---
代理群协调器
此技能提供设计多代理系统的指导,其中多个AI代理通过分布式执行和涌现行为协调完成复杂任务。
核心能力
- 群架构: 代理拓扑结构、通信模式
- 任务分发: 工作分配、负载均衡
- 协调协议: 共识、投票、委派
- 涌现行为: 来自简单规则的集体智能
多代理基础知识
为什么使用多代理系统
单代理: 多代理群:
┌─────────────────┐ ┌─────────────────────────────┐
│ │ │ ┌───┐ ┌───┐ ┌───┐ ┌───┐ │
│ 一个代理 │ │ │ A │ │ A │ │ A │ │ A │ │
│ 顺序执行 │ 对比 │ └───┘ └───┘ └───┘ └───┘ │
│ 单一视角 │ │ 并行、多样化视角 │
│ │ │ 专业化可能 │
└─────────────────┘ └─────────────────────────────┘
优势:
- 并行性: 多个代理同时工作
- 专业化: 代理可具有不同能力
- 韧性: 如果一个代理失败,系统继续运行
- 多样化视角: 问题的多种解决方法
代理角色
| 角色 | 责任 | 特征 |
|---|---|---|
| 协调器 | 协调代理群 | 全局视图、任务分配 |
| 工作器 | 执行任务 | 专业技能、专注 |
| 监督器 | 质量控制 | 审查、批准、重定向 |
| 专家 | 领域专业知识 | 深度知识、窄范围 |
| 侦察员 | 探索 | 信息收集、研究 |
群拓扑结构
层次结构
┌──────────────┐
│ 协调器 │
└──────┬───────┘
│
┌────────────────┼────────────────┐
│ │ │
┌─────┴─────┐ ┌─────┴─────┐ ┌─────┴─────┐
│监督器 │ │监督器 │ │监督器 │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
┌─────┼─────┐ ┌─────┼─────┐ ┌─────┼─────┐
│ │ │ │ │ │ │ │ │
┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐
│W│ │W│ │W│ │W│ │W│ │W│ │W│ │W│ │W│
└─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘
工作器
最适用: 清晰任务分解、需要质量控制
点对点
┌───┐───────────────┌───┐
│ A │ │ A │
└───┘ └───┘
│ \ / │
│ \ / │
│ \ / │
│ \ / │
┌───┐ ╳ ┌───┐
│ A │ / \ │ A │
└───┘ / \ └───┘
/ \
┌───┐ ┌───┐
│ A │───────────│ A │
└───┘ └───┘
最适用: 协作问题解决、无单点故障
黑板
┌─────────────────────────────────────────────────────┐
│ 黑板 │
│ ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │
│ │ 问题状态 │ │ 部分结果 │ │ 解决方案 │ │
│ └─────────────┘ └─────────────┘ └───────────────┘ │
└───────────────────────┬─────────────────────────────┘
│
┌───────────────────┼───────────────────┐
│ 读/写 │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│代理A │ │代理B │ │代理C │
│分析师│ │构建师│ │评审师│
└───────┘ └───────┘ └───────┘
最适用: 复杂问题、代理异步贡献
代理实现
基础代理结构
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, List, Optional
class AgentStatus(Enum):
    """Lifecycle states an agent can report through its ``status`` attribute."""

    # Initial state; also restored when a task is cancelled.
    IDLE = "idle"
    # Set when an "assign_task" message is handled.
    WORKING = "working"
    # Declared for completeness; nothing in the visible code sets it.
    WAITING = "waiting"
    # Set after ``process_task`` returns normally.
    COMPLETED = "completed"
    # Set after ``process_task`` raises.
    FAILED = "failed"
@dataclass
class AgentMessage:
    """Envelope for a message passed between agents.

    ``message_type`` selects the handler on the receiving side; the types
    used elsewhere in this document are "assign_task", "cancel_task",
    "status_request" and "vote_request". ``content`` is the type-specific
    payload (for "assign_task" it is the task itself).
    """

    sender_id: str
    recipient_id: str  # an agent id, or "broadcast"
    message_type: str
    content: Any
    # Creation time in seconds since the epoch. ``time.time`` is passed
    # directly as the factory; this requires ``import time`` at file level.
    timestamp: float = field(default_factory=time.time)
    # Optional id for pairing a reply with the request that caused it.
    correlation_id: Optional[str] = None
@dataclass
class Task:
    """A unit of work routed through the swarm.

    ``status`` moves through the string values used in this document:
    "pending" (default), "assigned", then "completed" or "failed".
    """

    id: str
    description: str
    # Larger numbers are served first: the orchestrator negates this value
    # before putting the task on its min-heap queue.
    priority: int = 0
    # Ids of tasks that must be completed before this one may be assigned.
    dependencies: List[str] = field(default_factory=list)
    # Id of the agent the task was handed to, once assigned.
    assigned_to: Optional[str] = None
    status: str = "pending"
    result: Any = None
    # Input payload for the task. WorkflowOrchestrator constructs tasks with
    # ``context=...``; without this field that call raises TypeError.
    context: Any = None
class BaseAgent(ABC):
    """Abstract base class for agents in the swarm.

    An agent owns an inbox (``message_queue``), at most one ``current_task``
    and a ``status``. Subclasses supply ``process_task`` and ``can_handle``;
    the message loop and task bookkeeping live here.
    """

    def __init__(self, agent_id: str, capabilities: List[str]):
        """Create an idle agent.

        Args:
            agent_id: Identifier stored as ``self.id``.
            capabilities: Capability labels stored as ``self.capabilities``.
        """
        self.id = agent_id
        self.capabilities = capabilities
        self.status = AgentStatus.IDLE
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.current_task: Optional[Task] = None
        # Exception raised by the most recent failed task; None otherwise.
        self.last_error: Optional[Exception] = None

    @abstractmethod
    async def process_task(self, task: Task) -> Any:
        """Process the assigned task and return its result (subclass hook)."""

    @abstractmethod
    def can_handle(self, task: Task) -> bool:
        """Return True if this agent is able to process ``task``."""

    async def receive_message(self, message: AgentMessage):
        """Put ``message`` on the inbox for the run loop to handle."""
        await self.message_queue.put(message)

    async def run(self):
        """Main agent loop: poll the inbox, then work on the current task.

        Runs until the surrounding asyncio task is cancelled.
        """
        while True:
            # Wait briefly for a message so the loop keeps cycling even
            # when the inbox is empty.
            try:
                message = await asyncio.wait_for(
                    self.message_queue.get(),
                    timeout=0.1,
                )
                await self._handle_message(message)
            except asyncio.TimeoutError:
                pass

            if self.current_task and self.status == AgentStatus.WORKING:
                await self._work_on_task()

    async def _handle_message(self, message: AgentMessage):
        """Dispatch one inbox message by its ``message_type``.

        Unknown message types are ignored.
        """
        if message.message_type == "assign_task":
            self.current_task = message.content
            self.status = AgentStatus.WORKING
        elif message.message_type == "cancel_task":
            self.current_task = None
            self.status = AgentStatus.IDLE
        elif message.message_type == "status_request":
            # NOTE(review): `_send_status` is not defined in the visible
            # code; a subclass must provide it or this branch raises.
            await self._send_status(message.sender_id)

    async def _work_on_task(self):
        """Run ``process_task`` on the current task and record the outcome.

        On success the result is stored on the task and both task and agent
        are marked completed. On failure both are marked failed and the
        exception is kept in ``self.last_error`` instead of being discarded.
        """
        # Pin the task locally so the bookkeeping below cannot trip over
        # ``self.current_task`` being replaced while the task was running.
        task = self.current_task
        if task is None:
            return
        try:
            result = await self.process_task(task)
        except Exception as exc:
            self.last_error = exc
            task.status = "failed"
            self.status = AgentStatus.FAILED
        else:
            self.last_error = None
            task.result = result
            task.status = "completed"
            self.status = AgentStatus.COMPLETED
专业化代理
class ResearchAgent(BaseAgent):
    """Agent specialised in gathering and analysing information."""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["research", "search", "analyze"])
        self.search_tools = []

    def can_handle(self, task: Task) -> bool:
        """Return True when the task description mentions a research keyword."""
        text = task.description.lower()
        for keyword in ("research", "find", "search", "investigate"):
            if keyword in text:
                return True
        return False

    async def process_task(self, task: Task) -> dict:
        """Search for the task description, analyse the hits and report.

        Returns a dict with the keys "sources", "analysis" and "confidence".
        """
        sources = await self._search(task.description)
        findings = await self._analyze(sources)
        confidence = self._calculate_confidence(sources)
        return {
            "sources": sources,
            "analysis": findings,
            "confidence": confidence,
        }
class CodeAgent(BaseAgent):
    """Agent specialised in producing code."""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["code", "implement", "debug"])

    def can_handle(self, task: Task) -> bool:
        """Return True when the task description mentions a coding keyword."""
        text = task.description.lower()
        for keyword in ("implement", "code", "write", "fix", "debug"):
            if keyword in text:
                return True
        return False

    async def process_task(self, task: Task) -> dict:
        """Generate code for the task description, then tests for that code.

        Returns a dict with the keys "code", "tests" and "language".
        """
        source = await self._generate_code(task.description)
        test_suite = await self._generate_tests(source)
        language = self._detect_language(source)
        return {
            "code": source,
            "tests": test_suite,
            "language": language,
        }
class ReviewAgent(BaseAgent):
    """Agent specialised in quality review."""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["review", "critique", "approve"])

    def can_handle(self, task: Task) -> bool:
        """Return True when the task description mentions "review"."""
        return "review" in task.description.lower()

    async def process_task(self, task: Task) -> dict:
        """Review the artifact carried by ``task``.

        Returns:
            A dict with "approved" (True when no issues were found),
            "issues" and "suggestions".

        Raises:
            ValueError: If the task carries no artifact to review.
        """
        # The Task dataclass declares no `content` attribute, so a plain
        # `task.content` raises AttributeError. Read the payload defensively:
        # `content` first, then `context`, the keyword WorkflowOrchestrator
        # uses when it builds tasks.
        artifact = getattr(task, "content", None)
        if artifact is None:
            artifact = getattr(task, "context", None)
        if artifact is None:
            raise ValueError(f"task {task.id!r} carries nothing to review")

        issues = await self._find_issues(artifact)
        suggestions = await self._generate_suggestions(issues)
        return {
            "approved": len(issues) == 0,
            "issues": issues,
            "suggestions": suggestions,
        }
协调模式
基于任务的协调
class SwarmOrchestrator:
    """Dispatch queued tasks to the registered agents.

    Tasks wait in a priority queue. The run loop pops the most urgent task,
    checks its dependencies, and hands it to an idle agent that reports it
    can handle it. Finished tasks are harvested into ``completed_tasks`` so
    that dependent tasks can proceed and the agent can be reused.
    """

    # Seconds to pause before polling again when nothing can progress.
    _POLL_INTERVAL = 0.1

    def __init__(self):
        self.agents: dict[str, "BaseAgent"] = {}
        self.task_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.completed_tasks: dict[str, "Task"] = {}
        self.message_bus = MessageBus()
        # Monotonic tie-breaker for queue entries (see ``_enqueue``).
        self._sequence = 0
        # agent id -> task handed to that agent and not yet harvested.
        self._in_flight: dict[str, "Task"] = {}

    def register_agent(self, agent: "BaseAgent"):
        """Add ``agent`` to the swarm, keyed by its id."""
        self.agents[agent.id] = agent

    async def submit_task(self, task: "Task"):
        """Queue ``task`` for processing.

        The queue is a min-heap, so the task priority is negated: a larger
        ``task.priority`` is served earlier.
        """
        await self._enqueue(-task.priority, task)

    async def _enqueue(self, priority: int, task: "Task"):
        """Put ``task`` on the queue under the given heap key.

        Entries are ``(priority, sequence, task)``. The sequence number
        guarantees that two entries never compare equal on the first two
        fields, so the heap never falls through to comparing Task objects
        (which define no ordering and would raise TypeError). It also keeps
        equal-priority tasks in submission order.
        """
        self._sequence += 1
        await self.task_queue.put((priority, self._sequence, task))

    async def run(self):
        """Main orchestration loop; runs until cancelled."""
        while True:
            self._collect_finished()

            # Poll with a timeout so finished work is still harvested while
            # the queue is empty.
            try:
                _, _, task = await asyncio.wait_for(
                    self.task_queue.get(),
                    timeout=self._POLL_INTERVAL,
                )
            except asyncio.TimeoutError:
                continue

            if not self._dependencies_met(task):
                # Yield before re-queueing: get/put on a non-empty unbounded
                # queue never suspend, so without this pause the loop would
                # spin and starve every other coroutine.
                await asyncio.sleep(self._POLL_INTERVAL)
                # Re-queue at a lowered priority.
                await self._enqueue(1, task)
                continue

            agent = await self._find_available_agent(task)
            if agent is not None:
                await self._assign_task(agent, task)
            else:
                # No agent available right now; try again shortly.
                await asyncio.sleep(self._POLL_INTERVAL)
                await self._enqueue(0, task)

    def _collect_finished(self):
        """Harvest tasks that agents have finished.

        A task whose status is "completed" or "failed" is recorded in
        ``completed_tasks`` and its agent is returned to IDLE so it can
        receive further work.
        """
        for agent_id, task in list(self._in_flight.items()):
            if task.status not in ("completed", "failed"):
                continue
            self.completed_tasks[task.id] = task
            del self._in_flight[agent_id]
            agent = self.agents.get(agent_id)
            if agent is not None and agent.status in (
                AgentStatus.COMPLETED,
                AgentStatus.FAILED,
            ):
                agent.current_task = None
                agent.status = AgentStatus.IDLE

    def _dependencies_met(self, task: "Task") -> bool:
        """Return True when every dependency of ``task`` has completed."""
        for dep_id in task.dependencies:
            finished = self.completed_tasks.get(dep_id)
            if finished is None or finished.status != "completed":
                return False
        return True

    async def _find_available_agent(self, task: "Task") -> Optional["BaseAgent"]:
        """Return an idle agent that can handle ``task``, or None."""
        for agent in self.agents.values():
            # An agent stays IDLE until it reads the assignment from its
            # inbox; skip it so it is not handed a second task meanwhile.
            if agent.id in self._in_flight:
                continue
            if agent.status == AgentStatus.IDLE and agent.can_handle(task):
                return agent
        return None

    async def _assign_task(self, agent: "BaseAgent", task: "Task"):
        """Mark ``task`` as assigned and send it to ``agent``."""
        task.assigned_to = agent.id
        task.status = "assigned"
        self._in_flight[agent.id] = task
        message = AgentMessage(
            sender_id="orchestrator",
            recipient_id=agent.id,
            message_type="assign_task",
            content=task,
        )
        await agent.receive_message(message)
工作流协调
from dataclasses import dataclass
from typing import Callable, List
@dataclass
class WorkflowStep:
    """One stage of a multi-step workflow."""

    name: str
    agent_type: str
    # Maps the data accumulated so far to this step's input; the default
    # hands the data through unchanged.
    input_transform: Callable[[dict], dict] = lambda payload: payload
    # When True, the step's result must be approved before the workflow
    # continues.
    required_approval: bool = False
class WorkflowOrchestrator:
    """Run multi-step workflows through a swarm.

    NOTE(review): ``_wait_for_completion`` and ``_request_approval`` are
    called below but are not defined in the visible code; they must be
    provided (for example by a subclass) before workflows with steps can run.
    """

    def __init__(self, swarm: "SwarmOrchestrator"):
        self.swarm = swarm
        self.workflows: dict[str, List["WorkflowStep"]] = {}

    def register_workflow(self, name: str, steps: List["WorkflowStep"]):
        """Register ``steps`` as the workflow called ``name``."""
        self.workflows[name] = steps

    async def execute_workflow(
        self,
        workflow_name: str,
        initial_input: dict
    ) -> dict:
        """Execute a registered workflow step by step.

        Args:
            workflow_name: Name previously passed to ``register_workflow``.
            initial_input: Data handed to the first step.

        Returns:
            ``{"status": "completed", "results": [...], "final_output": {...}}``
            on success, or ``{"status": "rejected", "step": <name>}`` when a
            step that requires approval is not approved.

        Raises:
            KeyError: If ``workflow_name`` was never registered.
        """
        try:
            steps = self.workflows[workflow_name]
        except KeyError:
            raise KeyError(f"unknown workflow: {workflow_name!r}") from None

        current_data = initial_input
        results = []

        for i, step in enumerate(steps):
            # Shape the accumulated data into this step's input.
            step_input = step.input_transform(current_data)

            task = Task(
                id=f"{workflow_name}_{i}_{step.name}",
                description=f"执行 {step.name}",
            )
            # Attach the payload after construction: passing `context=` to
            # the constructor raises TypeError unless Task declares that
            # field, while attribute assignment works either way.
            task.context = step_input

            # Submit and wait for the swarm to finish the task.
            await self.swarm.submit_task(task)
            result = await self._wait_for_completion(task.id)

            if step.required_approval:
                approved = await self._request_approval(step, result)
                if not approved:
                    return {"status": "rejected", "step": step.name}

            results.append(result)
            # Only dict results can be merged into the running data; a
            # failed task yields None, which `**` unpacking would reject.
            if isinstance(result, dict):
                current_data = {**current_data, **result}

        return {
            "status": "completed",
            "results": results,
            "final_output": current_data,
        }
代理间通信
消息模式
class MessageBus:
    """Central message router for the swarm.

    Agents subscribe to topics. Messages can be published to one topic,
    sent to a single agent, or broadcast to every subscribed agent. Every
    routed message is appended to ``message_history``.
    """

    def __init__(self):
        self.subscribers: dict[str, List["BaseAgent"]] = {}
        self.message_history: List["AgentMessage"] = []

    def subscribe(self, topic: str, agent: "BaseAgent"):
        """Subscribe ``agent`` to ``topic``.

        Subscribing the same agent twice is a no-op, so a publish never
        delivers the same message to it more than once.
        """
        listeners = self.subscribers.setdefault(topic, [])
        if all(existing is not agent for existing in listeners):
            listeners.append(agent)

    async def publish(self, topic: str, message: "AgentMessage"):
        """Deliver ``message`` to the subscribers of ``topic``.

        The sender does not receive its own message.
        """
        self.message_history.append(message)
        for agent in self.subscribers.get(topic, []):
            if agent.id != message.sender_id:
                await agent.receive_message(message)

    def _known_agents(self) -> dict:
        """Return every subscribed agent exactly once, keyed by agent id."""
        unique = {}
        for listeners in self.subscribers.values():
            for agent in listeners:
                unique.setdefault(agent.id, agent)
        return unique

    async def send_direct(self, message: "AgentMessage"):
        """Deliver ``message`` to the agent named by ``message.recipient_id``.

        A recipient of "broadcast" is forwarded to ``broadcast``. Only
        agents that have subscribed to at least one topic are known to the
        bus.

        Returns:
            True if the message was delivered (or broadcast), False if no
            subscribed agent has the requested id.
        """
        if message.recipient_id == "broadcast":
            await self.broadcast(message)
            return True

        self.message_history.append(message)
        target = self._known_agents().get(message.recipient_id)
        if target is None:
            return False
        await target.receive_message(message)
        return True

    async def broadcast(self, message: "AgentMessage"):
        """Deliver ``message`` once to every subscribed agent but the sender.

        Agents are de-duplicated across topics, so an agent subscribed to
        several topics still receives a single copy.
        """
        self.message_history.append(message)
        for agent_id, agent in self._known_agents().items():
            if agent_id != message.sender_id:
                await agent.receive_message(message)
共识协议
class ConsensusProtocol:
    """Reach a decision among agents by threshold voting.

    NOTE(review): ``_request_vote`` is called by ``vote`` but is not defined
    in the visible code; it must be provided before ``vote`` can run.
    """

    def __init__(self, agents: List["BaseAgent"], threshold: float = 0.66):
        """Store the voters and the approval fraction required to pass."""
        self.agents = agents
        self.threshold = threshold

    async def vote(self, proposal: Any) -> dict:
        """Ask every agent to vote on ``proposal`` and tally the answers.

        Agents are asked one at a time, in list order.
        """
        votes = {}
        for agent in self.agents:
            message = AgentMessage(
                sender_id="consensus",
                recipient_id=agent.id,
                message_type="vote_request",
                content=proposal,
            )
            votes[agent.id] = await self._request_vote(agent, message)
        return self._tally_votes(votes)

    def _tally_votes(self, votes: dict) -> dict:
        """Compute the outcome of a vote.

        Args:
            votes: Maps agent id to that agent's response. A response counts
                as approval only if it is a dict with a truthy "approve"
                entry; anything else (including None) counts as a rejection.

        Returns:
            A dict with "consensus_reached", "approve_count",
            "reject_count", "threshold" and the raw "votes". With no votes
            at all, consensus is reported as not reached.
        """
        approve_count = sum(
            1
            for response in votes.values()
            if isinstance(response, dict) and response.get("approve")
        )
        total = len(votes)
        # Guard the division: an empty electorate cannot reach consensus.
        consensus_reached = total > 0 and (approve_count / total) >= self.threshold
        return {
            "consensus_reached": consensus_reached,
            "approve_count": approve_count,
            "reject_count": total - approve_count,
            "threshold": self.threshold,
            "votes": votes,
        }
涌现行为
刺激源(Stigmergy,共识主动性)模式
通过环境修改间接协调:
class SharedWorkspace:
    """Shared environment for stigmergic (indirect) coordination.

    Agents leave artifacts under string keys. Each artifact has a pheromone
    strength that fades on ``decay`` and grows on ``reinforce``; artifacts
    whose strength fades below 0.01 are removed.
    """

    def __init__(self):
        self.artifacts: dict[str, Any] = {}
        self.pheromones: dict[str, float] = {}  # key -> strength
        self.decay_rate = 0.1

    def deposit(self, key: str, artifact: Any, strength: float = 1.0):
        """Store ``artifact`` under ``key`` with the given strength.

        An existing entry under the same key is replaced.
        """
        self.artifacts[key] = artifact
        self.pheromones[key] = strength

    def sense(self, pattern: str) -> List[tuple[str, Any, float]]:
        """Find artifacts whose key contains ``pattern``.

        Returns:
            ``(key, artifact, strength)`` tuples, strongest first.
        """
        matches = [
            (key, artifact, self.pheromones.get(key, 0))
            for key, artifact in self.artifacts.items()
            if pattern in key
        ]
        return sorted(matches, key=lambda match: match[2], reverse=True)

    def decay(self):
        """Weaken every pheromone by ``decay_rate`` and drop faded entries."""
        # Iterate over a snapshot of the keys: entries are deleted inside
        # the loop, and deleting from a dict while iterating it directly
        # raises RuntimeError.
        for key in list(self.pheromones):
            self.pheromones[key] *= (1 - self.decay_rate)
            if self.pheromones[key] < 0.01:
                del self.pheromones[key]
                self.artifacts.pop(key, None)

    def reinforce(self, key: str, amount: float = 0.1):
        """Strengthen the trail for ``key``; unknown keys are ignored.

        Reinforcement saturates at 1.0, but it never lowers a strength that
        was deposited above that cap.
        """
        if key in self.pheromones:
            current = self.pheromones[key]
            self.pheromones[key] = max(current, min(1.0, current + amount))
蚁群优化
class AntColonyTaskAllocator:
    """Allocate tasks to agents with an ant-colony-style heuristic.

    Each (agent, task) pair carries a pheromone level. Allocation samples a
    task for every agent with weight ``pheromone**alpha * heuristic**beta``;
    ``update_pheromones`` then evaporates all trails and reinforces the
    pairs that were used according to the quality they achieved.
    """

    def __init__(self, agents: List["BaseAgent"], tasks: List["Task"]):
        self.agents = agents
        self.tasks = tasks
        self.pheromones = {}  # (agent_id, task_id) -> strength
        self.alpha = 1.0  # weight of the pheromone term
        self.beta = 2.0  # weight of the heuristic term
        self.evaporation = 0.1

    def _calculate_probability(
        self,
        agent: "BaseAgent",
        task: "Task"
    ) -> float:
        """Return the unnormalised weight for giving ``task`` to ``agent``."""
        key = (agent.id, task.id)
        # Pairs without a trail start from a small non-zero pheromone level.
        pheromone = self.pheromones.get(key, 0.1)
        # Heuristic: strongly prefer agents that report they can handle it.
        heuristic = 1.0 if agent.can_handle(task) else 0.1
        return (pheromone ** self.alpha) * (heuristic ** self.beta)

    def _weighted_choice(self, probs: dict, total: float) -> str:
        """Pick one key of ``probs`` at random, proportionally to its weight.

        Args:
            probs: Maps task id to a non-negative weight.
            total: Sum of the weights. Accepted because ``allocate`` passes
                it; ``random.choices`` normalises the weights itself.
        """
        import random

        candidates = list(probs)
        weights = [probs[candidate] for candidate in candidates]
        return random.choices(candidates, weights=weights, k=1)[0]

    def allocate(self) -> dict[str, str]:
        """Assign at most one task to each agent.

        Returns:
            A mapping of agent id to task id. No task is assigned twice;
            agents beyond the number of tasks receive nothing.
        """
        allocation = {}
        available_tasks = {task.id for task in self.tasks}

        for agent in self.agents:
            if not available_tasks:
                break

            # Weights for the tasks that are still unassigned.
            probs = {
                task.id: self._calculate_probability(agent, task)
                for task in self.tasks
                if task.id in available_tasks
            }

            total = sum(probs.values())
            if total > 0:
                selected = self._weighted_choice(probs, total)
                allocation[agent.id] = selected
                available_tasks.remove(selected)

        return allocation

    def update_pheromones(self, allocation: dict, quality: dict):
        """Evaporate all trails, then reinforce the pairs in ``allocation``.

        Args:
            allocation: Maps agent id to task id, as returned by ``allocate``.
            quality: Maps task id to the deposit for that task; tasks
                without an entry receive 0.1.
        """
        # Evaporation. Only values change, so iterating the dict is safe.
        for key in self.pheromones:
            self.pheromones[key] *= (1 - self.evaporation)

        # Deposit according to the achieved quality.
        for agent_id, task_id in allocation.items():
            key = (agent_id, task_id)
            deposit = quality.get(task_id, 0.1)
            self.pheromones[key] = self.pheromones.get(key, 0) + deposit
最佳实践
设计指南
- 保持代理简单: 复杂行为从简单规则涌现
- 定义清晰接口: 消息格式、任务结构
- 为故障做好规划: 代理会失败;系统应能继续运行
- 监控集体行为: 单个代理可能正常,但群可能卡住
- 为协调协议做版本管理: 不同代理可能运行不同版本
避免的反模式
- 上帝协调器: 一个代理做所有事情
- 闲聊代理: 过多代理间通信
- 紧耦合: 代理依赖特定其他代理
- 缺失截止时间: 任务完成无超时
- 状态爆炸: 代理维护太多状态
参考资料
- references/swarm-topologies.md - 详细拓扑模式
- references/coordination-protocols.md - 共识和投票算法
- references/emergent-patterns.md - 刺激源(stigmergy,共识主动性)和自组织