名称: 智能体群协调器
描述: 提供设计多智能体系统的指导,其中多个AI智能体协调以通过分布式执行和涌现行为完成复杂任务。
许可证: MIT
智能体群协调器
此技能提供设计多智能体系统的指导,其中多个AI智能体协调以通过分布式执行和涌现行为完成复杂任务。
核心能力
- 群架构: 智能体拓扑、通信模式
- 任务分配: 工作分配、负载均衡
- 协调协议: 共识、投票、委托
- 涌现行为: 从简单规则中产生的集体智能
多智能体基础
为什么使用多智能体系统
单智能体: 多智能体群:
┌─────────────────┐ ┌─────────────────────────────┐
│ │ │ ┌───┐ ┌───┐ ┌───┐ ┌───┐ │
│ 一个智能体 │ │ │ A │ │ A │ │ A │ │ A │ │
│ 顺序执行 │ 对比 │ └───┘ └───┘ └───┘ └───┘ │
│ 单视角 │ │ 并行、多样视角 │
│ │ │ 可能实现专业化 │
└─────────────────┘ └─────────────────────────────┘
优势:
- 并行性: 多个智能体同时工作
- 专业化: 智能体可以具有不同能力
- 弹性: 如果一个智能体失败,系统继续运行
- 多样视角: 可以从多个角度、用多种方法解决同一问题
智能体角色
| 角色 | 责任 | 特点 |
|---|---|---|
| 协调器 | 协调群 | 全局视图、任务分配 |
| 工作者 | 执行任务 | 专业技能、专注 |
| 监督者 | 质量控制 | 审核、批准、重定向 |
| 专家 | 领域专长 | 深度知识、范围窄 |
| 侦察员 | 探索 | 信息收集、研究 |
群拓扑结构
分层结构
┌──────────────┐
│ 协调器 │
└──────┬───────┘
│
┌────────────────┼────────────────┐
│ │ │
┌─────┴─────┐ ┌─────┴─────┐ ┌─────┴─────┐
│ 监督者 │ │ 监督者 │ │ 监督者 │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
┌─────┼─────┐ ┌─────┼─────┐ ┌─────┼─────┐
│ │ │ │ │ │ │ │ │
┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐
│W│ │W│ │W│ │W│ │W│ │W│ │W│ │W│ │W│
└─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘
工作者
最适合: 清晰的任务分解,需要质量控制
点对点结构
┌───┐───────────────┌───┐
│ A │ │ A │
└───┘ └───┘
│ \ / │
│ \ / │
│ \ / │
│ \ / │
┌───┐ ╳ ┌───┐
│ A │ / \ │ A │
└───┘ / \ └───┘
/ \
┌───┐ ┌───┐
│ A │───────────│ A │
└───┘ └───┘
最适合: 协作问题解决,无单点故障
黑板结构
┌─────────────────────────────────────────────────────┐
│ 黑板 │
│ ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │
│ │ 问题状态 │ │ 部分结果 │ │ 解决方案 │ │
│ │ │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └───────────────┘ │
└───────────────────────┬─────────────────────────────┘
│
┌───────────────────┼───────────────────┐
│ 读/写 │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│智能体A│ │智能体B│ │智能体C│
│分析师│ │构建者│ │评审者│
└───────┘ └───────┘ └───────┘
最适合: 复杂问题,智能体异步贡献
智能体实现
基础智能体结构
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional, List
class AgentStatus(Enum):
    """Lifecycle state of a swarm agent.

    Member values are the Chinese display labels used in this document.
    """

    # No task assigned; the orchestrator only hands work to agents in this state.
    IDLE = "空闲"
    # A task has been assigned and is being executed.
    WORKING = "工作中"
    # Waiting state; not set anywhere in the code shown here.
    WAITING = "等待中"
    # The last task returned normally.
    COMPLETED = "完成"
    # The last task raised an exception.
    FAILED = "失败"
@dataclass
class AgentMessage:
    """A message exchanged between swarm participants.

    Attributes:
        sender_id: Id of the sender; may also be a role label such as "协调器".
        recipient_id: Id of the target agent, or "广播" for a broadcast.
        message_type: Dispatch key, e.g. "分配任务", "取消任务", "状态请求", "投票请求".
        content: Arbitrary payload; for "分配任务" it is the Task being assigned.
        timestamp: Creation time as returned by ``time.time()``.
        correlation_id: Optional id, presumably linking a reply to its request.
    """

    sender_id: str
    recipient_id: str  # or "广播" (broadcast)
    message_type: str
    content: Any
    # Requires ``import time`` at module level; without it every construction
    # raised NameError.
    timestamp: float = field(default_factory=time.time)
    correlation_id: Optional[str] = None
@dataclass
class Task:
    """A unit of work routed through the swarm.

    Attributes:
        id: Task id; also the key other tasks list in ``dependencies``.
        description: Free text; agents' ``can_handle`` match keywords in it.
        priority: Larger values are dispatched earlier by SwarmOrchestrator.
        dependencies: Ids of tasks that must be completed first.
        assigned_to: Id of the agent the task was handed to, if any.
        status: "待处理" initially, then "已分配", then "完成" or "失败".
        result: Value returned by the agent's ``process_task``.
        context: Input payload for the task (e.g. a workflow step's input or
            the artifact to review). Appended last so positional construction
            of the earlier fields is unchanged.
    """

    id: str
    description: str
    priority: int = 0
    dependencies: List[str] = field(default_factory=list)
    assigned_to: Optional[str] = None
    status: str = "待处理"
    result: Any = None
    # WorkflowOrchestrator builds Task(..., context=...); without this field
    # that call raised TypeError.
    context: Any = None
class BaseAgent(ABC):
    """Base class for swarm agents.

    An agent owns an inbox (``message_queue``) and at most one
    ``current_task``. ``run`` polls the inbox, reacts to control messages and
    executes the current task while ``status`` is WORKING.
    """

    def __init__(self, agent_id: str, capabilities: List[str]):
        self.id = agent_id
        self.capabilities = capabilities
        self.status = AgentStatus.IDLE
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.current_task: Optional[Task] = None
        # Exception raised by the most recent failed task, so callers can see
        # why the agent ended up FAILED (previously it was discarded).
        self.last_error: Optional[BaseException] = None

    @abstractmethod
    async def process_task(self, task: Task) -> Any:
        """Execute ``task`` and return its result. Implemented by subclasses."""

    @abstractmethod
    def can_handle(self, task: Task) -> bool:
        """Return True if this agent can handle ``task``. Implemented by subclasses."""

    async def receive_message(self, message: AgentMessage):
        """Put ``message`` into this agent's inbox."""
        await self.message_queue.put(message)

    async def run(self):
        """Main agent loop. Never returns normally; cancel it to stop."""
        while True:
            # Short timeout so the loop still works on the current task when
            # no messages arrive.
            try:
                message = await asyncio.wait_for(
                    self.message_queue.get(),
                    timeout=0.1,
                )
            except asyncio.TimeoutError:
                pass
            else:
                # Handled outside the try so a TimeoutError raised by the
                # handler itself is not mistaken for an empty inbox.
                await self._handle_message(message)

            if self.current_task and self.status == AgentStatus.WORKING:
                await self._work_on_task()

    async def _handle_message(self, message: AgentMessage):
        """Apply a control message; unrecognised message types are ignored."""
        if message.message_type == "分配任务":
            self.current_task = message.content
            self.status = AgentStatus.WORKING
            self.last_error = None
        elif message.message_type == "取消任务":
            self.current_task = None
            self.status = AgentStatus.IDLE
        elif message.message_type == "状态请求":
            # NOTE(review): _send_status is not defined on BaseAgent in this
            # file; subclasses must provide it or this raises AttributeError.
            await self._send_status(message.sender_id)

    async def _work_on_task(self):
        """Run ``process_task`` on the current task and record the outcome.

        On success the task gets its result and status "完成" and the agent
        becomes COMPLETED. On any Exception the task is marked "失败", the
        agent becomes FAILED and the exception is kept in ``last_error``.
        """
        # Hold a local reference so the bookkeeping below targets the task
        # that was actually executed.
        task = self.current_task
        try:
            result = await self.process_task(task)
        except Exception as exc:
            task.status = "失败"
            self.status = AgentStatus.FAILED
            self.last_error = exc
        else:
            task.result = result
            task.status = "完成"
            self.status = AgentStatus.COMPLETED
专用智能体
class ResearchAgent(BaseAgent):
    """Agent specialised in gathering and analysing information."""

    def __init__(self, agent_id: str):
        capabilities = ["研究", "搜索", "分析"]
        super().__init__(agent_id, capabilities)
        # List of search tools; not read anywhere in this class.
        self.search_tools = []

    def can_handle(self, task: Task) -> bool:
        """Accept tasks whose description mentions a research keyword."""
        text = task.description.lower()
        for keyword in ("研究", "查找", "搜索", "调查"):
            if keyword in text:
                return True
        return False

    async def process_task(self, task: Task) -> dict:
        """Search on the task description, analyse the hits and score them."""
        # NOTE(review): _search / _analyze / _calculate_confidence are not
        # defined in this class; subclasses must supply them.
        sources = await self._search(task.description)
        summary = await self._analyze(sources)
        confidence = self._calculate_confidence(sources)
        return {"来源": sources, "分析": summary, "置信度": confidence}
class CodeAgent(BaseAgent):
    """Agent specialised in generating code together with tests."""

    def __init__(self, agent_id: str):
        capabilities = ["代码", "实现", "调试"]
        super().__init__(agent_id, capabilities)

    def can_handle(self, task: Task) -> bool:
        """Accept tasks whose description mentions a coding keyword."""
        text = task.description.lower()
        keywords = ("实现", "代码", "编写", "修复", "调试")
        return any(word in text for word in keywords)

    async def process_task(self, task: Task) -> dict:
        """Generate code for the task, derive tests from it and tag its language."""
        # NOTE(review): _generate_code / _generate_tests / _detect_language
        # are not defined in this class; subclasses must supply them.
        source = await self._generate_code(task.description)
        test_suite = await self._generate_tests(source)
        language = self._detect_language(source)
        return {"代码": source, "测试": test_suite, "语言": language}
class ReviewAgent(BaseAgent):
    """Agent specialised in quality review of produced artifacts."""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["评审", "批评", "批准"])

    def can_handle(self, task: Task) -> bool:
        """Accept tasks whose description mentions "评审" (review)."""
        return "评审" in task.description.lower()

    async def process_task(self, task: Task) -> dict:
        """Review the artifact attached to ``task``.

        Returns:
            dict with "批准" (True when no issues were found), "问题" (the
            issues found) and "建议" (suggestions derived from the issues).
        """
        # Task defines no ``content`` attribute, so ``task.content`` raised
        # AttributeError. The payload travels in ``context`` (the keyword
        # WorkflowOrchestrator passes to Task); getattr tolerates Task objects
        # without that field.
        artifact = getattr(task, "context", None)
        # NOTE(review): _find_issues / _generate_suggestions are not defined
        # in this class; subclasses must supply them.
        issues = await self._find_issues(artifact)
        suggestions = await self._generate_suggestions(issues)
        return {
            "批准": len(issues) == 0,
            "问题": issues,
            "建议": suggestions,
        }
协调模式
基于任务的协调
class SwarmOrchestrator:
    """Coordinate a swarm of agents to complete submitted tasks.

    Tasks wait in a priority queue. ``run`` pops them, holds back tasks whose
    dependencies are not complete, and hands the rest to the first idle agent
    whose ``can_handle`` accepts them.
    """

    def __init__(self):
        self.agents: dict[str, BaseAgent] = {}
        # Items are (queue_priority, sequence, task). The increasing sequence
        # number breaks priority ties so two Task objects (dataclasses with no
        # ordering) are never compared, which would raise TypeError. It also
        # keeps equal-priority tasks in FIFO order.
        self.task_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._sequence = 0
        # NOTE(review): nothing in this class writes to completed_tasks; some
        # external code must record finished tasks here, otherwise tasks with
        # dependencies are re-queued indefinitely.
        self.completed_tasks: dict[str, Task] = {}
        self.message_bus = MessageBus()

    def register_agent(self, agent: BaseAgent):
        """Add ``agent`` to the swarm, keyed by its id."""
        self.agents[agent.id] = agent

    async def _enqueue(self, priority: int, task: Task):
        """Put ``task`` on the queue at ``priority`` with a FIFO tie-breaker."""
        self._sequence += 1
        await self.task_queue.put((priority, self._sequence, task))

    async def submit_task(self, task: Task):
        """Queue ``task``; larger ``task.priority`` values are dispatched first."""
        # PriorityQueue pops the smallest entry, so negate to get max-first.
        await self._enqueue(-task.priority, task)

    async def run(self):
        """Main dispatch loop. Never returns normally; cancel it to stop."""
        while True:
            _, _, task = await self.task_queue.get()

            if not self._dependencies_met(task):
                # Re-queue at 1, i.e. after tasks whose priority is >= 0.
                # Sleep first: get()/put() on a non-empty unbounded queue never
                # suspend, so without this the loop would spin and starve the
                # agents' coroutines.
                await asyncio.sleep(0.1)
                await self._enqueue(1, task)
                continue

            agent = await self._find_available_agent(task)
            if agent:
                await self._assign_task(agent, task)
            else:
                # No idle, capable agent right now; back off and retry.
                await asyncio.sleep(0.1)
                await self._enqueue(0, task)

    def _dependencies_met(self, task: Task) -> bool:
        """Return True if every dependency is in completed_tasks with status "完成"."""
        for dep_id in task.dependencies:
            done = self.completed_tasks.get(dep_id)
            if done is None or done.status != "完成":
                return False
        return True

    async def _find_available_agent(self, task: Task) -> Optional[BaseAgent]:
        """Return the first IDLE agent that can handle ``task``, or None."""
        for agent in self.agents.values():
            if agent.status == AgentStatus.IDLE and agent.can_handle(task):
                return agent
        return None

    async def _assign_task(self, agent: BaseAgent, task: Task):
        """Mark ``task`` as assigned to ``agent`` and send it an assignment message."""
        task.assigned_to = agent.id
        task.status = "已分配"
        message = AgentMessage(
            sender_id="协调器",
            recipient_id=agent.id,
            message_type="分配任务",
            content=task,
        )
        await agent.receive_message(message)
工作流协调
from dataclasses import dataclass
from typing import Callable, List
@dataclass
class WorkflowStep:
    """One step of a multi-agent workflow.

    Attributes:
        name: Step name; embedded in the generated task id and description.
        agent_type: Kind of agent meant to run the step.
            NOTE(review): not read by WorkflowOrchestrator in this file.
        input_transform: Maps the accumulated workflow data to this step's
            input; the identity function by default.
        required_approval: When True, the step's result must be approved
            before the workflow continues.
    """

    name: str
    agent_type: str
    input_transform: Callable[[dict], dict] = field(default=lambda x: x)
    required_approval: bool = False
class WorkflowOrchestrator:
    """Run named multi-step workflows on top of a SwarmOrchestrator."""

    def __init__(self, swarm: SwarmOrchestrator):
        self.swarm = swarm
        self.workflows: dict[str, List[WorkflowStep]] = {}

    def register_workflow(self, name: str, steps: List[WorkflowStep]):
        """Register (or replace) the ordered step list for workflow ``name``."""
        self.workflows[name] = steps

    async def execute_workflow(
        self,
        workflow_name: str,
        initial_input: dict,
    ) -> dict:
        """Execute every step of ``workflow_name`` in order through the swarm.

        Each step's result is merged into the running data passed to the next
        step. Raises KeyError for an unregistered workflow name.

        Returns:
            {"状态": "拒绝", "步骤": name} if a step that needs approval is
            rejected; otherwise {"状态": "完成", "结果": [per-step results],
            "最终输出": merged data}.
        """
        accumulated = initial_input
        step_results = []

        for index, step in enumerate(self.workflows[workflow_name]):
            step_input = step.input_transform(accumulated)
            # NOTE(review): requires Task to accept a ``context`` keyword.
            task = Task(
                id=f"{workflow_name}_{index}_{step.name}",
                description=f"执行 {step.name}",
                context=step_input,
            )

            # NOTE(review): _wait_for_completion / _request_approval are not
            # defined in this class.
            await self.swarm.submit_task(task)
            outcome = await self._wait_for_completion(task.id)

            if step.required_approval and not await self._request_approval(step, outcome):
                return {"状态": "拒绝", "步骤": step.name}

            step_results.append(outcome)
            accumulated = {**accumulated, **outcome}

        return {
            "状态": "完成",
            "结果": step_results,
            "最终输出": accumulated,
        }
智能体间通信
消息模式
class MessageBus:
    """Central topic-based message router for swarm communication.

    Every published, broadcast or direct message is appended to
    ``message_history``. Delivery is by ``agent.receive_message``, and a
    sender never receives its own message.
    """

    def __init__(self):
        self.subscribers: dict[str, List[BaseAgent]] = {}
        self.message_history: List[AgentMessage] = []

    def subscribe(self, topic: str, agent: BaseAgent):
        """Subscribe ``agent`` to ``topic``."""
        self.subscribers.setdefault(topic, []).append(agent)

    async def publish(self, topic: str, message: AgentMessage):
        """Deliver ``message`` to every subscriber of ``topic`` except the sender."""
        self.message_history.append(message)
        # Iterate a copy so subscriptions made during delivery cannot break the loop.
        for agent in list(self.subscribers.get(topic, [])):
            if agent.id != message.sender_id:
                await agent.receive_message(message)

    def _known_agents(self) -> dict:
        """Return a snapshot of all subscribed agents keyed by id (first subscription wins)."""
        agents = {}
        for topic_subscribers in self.subscribers.values():
            for agent in topic_subscribers:
                agents.setdefault(agent.id, agent)
        return agents

    async def send_direct(self, message: AgentMessage) -> bool:
        """Deliver ``message`` to the subscribed agent whose id is ``message.recipient_id``.

        Previously a no-op that silently dropped the message.

        Returns:
            True if a subscribed agent with that id received the message,
            False if no such agent is known to the bus.
        """
        self.message_history.append(message)
        recipient = self._known_agents().get(message.recipient_id)
        if recipient is None:
            return False
        await recipient.receive_message(message)
        return True

    async def broadcast(self, message: AgentMessage):
        """Deliver ``message`` once to every subscribed agent except the sender.

        Agents are de-duplicated by id, so an agent subscribed to several
        topics gets a single copy instead of one per topic.
        """
        self.message_history.append(message)
        for agent_id, agent in self._known_agents().items():
            if agent_id != message.sender_id:
                await agent.receive_message(message)
共识协议
class ConsensusProtocol:
    """Reach agreement among agents by threshold voting."""

    def __init__(self, agents: List[BaseAgent], threshold: float = 0.66):
        self.agents = agents
        # Minimum fraction of approving votes needed for consensus.
        self.threshold = threshold

    async def vote(self, proposal: Any) -> dict:
        """Ask each agent, one after another, to vote on ``proposal`` and tally the votes."""
        votes = {}
        for agent in self.agents:
            message = AgentMessage(
                sender_id="共识",
                recipient_id=agent.id,
                message_type="投票请求",
                content=proposal,
            )
            # NOTE(review): _request_vote is not defined in this class; it
            # presumably returns a dict, since _tally_votes calls .get("批准").
            votes[agent.id] = await self._request_vote(agent, message)
        return self._tally_votes(votes)

    def _tally_votes(self, votes: dict) -> dict:
        """Summarise ``votes`` (agent_id -> response dict).

        A vote counts as approval when its "批准" entry is truthy. With no
        votes at all, consensus is reported as not reached.
        """
        approve_count = sum(1 for v in votes.values() if v.get("批准"))
        total = len(votes)
        # Guard the empty electorate, which previously raised ZeroDivisionError.
        consensus_reached = total > 0 and approve_count / total >= self.threshold
        return {
            "达成共识": consensus_reached,
            "批准数": approve_count,
            "拒绝数": total - approve_count,
            "阈值": self.threshold,
            "投票": votes,
        }
涌现行为
痕迹协调模式(Stigmergy)
通过环境修改的间接协调:
class SharedWorkspace:
    """Shared environment for stigmergic (indirect) coordination.

    Agents deposit artifacts under string keys, each with a pheromone
    strength. ``decay`` weakens all strengths and evicts entries that fall
    below 0.01; ``reinforce`` strengthens an existing entry, capped at 1.0.
    """

    def __init__(self, decay_rate: float = 0.1):
        self.artifacts: dict[str, Any] = {}
        self.pheromones: dict[str, float] = {}  # key -> strength
        # Fraction of strength lost per decay() call.
        self.decay_rate = decay_rate

    def deposit(self, key: str, artifact: Any, strength: float = 1.0):
        """Store ``artifact`` under ``key`` and (re)set its strength."""
        self.artifacts[key] = artifact
        self.pheromones[key] = strength

    def sense(self, pattern: str) -> List[tuple[str, Any, float]]:
        """Return (key, artifact, strength) for keys containing ``pattern``, strongest first."""
        matches = []
        for key, artifact in self.artifacts.items():
            if pattern in key:
                matches.append((key, artifact, self.pheromones.get(key, 0)))
        return sorted(matches, key=lambda x: -x[2])

    def decay(self):
        """Scale every strength by (1 - decay_rate) and evict entries below 0.01."""
        faded = []
        for key in self.pheromones:
            self.pheromones[key] *= (1 - self.decay_rate)
            if self.pheromones[key] < 0.01:
                faded.append(key)
        # Delete after the loop: removing keys from a dict while iterating it
        # raises RuntimeError.
        for key in faded:
            del self.pheromones[key]
            self.artifacts.pop(key, None)

    def reinforce(self, key: str, amount: float = 0.1):
        """Raise the strength of an existing ``key`` by ``amount``, capped at 1.0."""
        if key in self.pheromones:
            self.pheromones[key] = min(1.0, self.pheromones[key] + amount)
蚁群优化
class AntColonyTaskAllocator:
    """Allocate tasks to agents with an ant-colony-optimisation heuristic.

    Each (agent_id, task_id) pair carries a pheromone level. An agent picks a
    still-unassigned task with probability proportional to
    ``pheromone ** alpha * heuristic ** beta``. ``update_pheromones``
    evaporates existing levels and deposits pheromone on the pairs of an
    allocation according to task quality.
    """

    def __init__(self, agents: List[BaseAgent], tasks: List[Task]):
        self.agents = agents
        self.tasks = tasks
        self.pheromones = {}  # (agent_id, task_id) -> strength
        self.alpha = 1.0  # weight of the pheromone term
        self.beta = 2.0  # weight of the capability heuristic
        self.evaporation = 0.1

    def _calculate_probability(
        self,
        agent: BaseAgent,
        task: Task,
    ) -> float:
        """Return the unnormalised selection weight of ``task`` for ``agent``."""
        key = (agent.id, task.id)
        # Pairs that have never received a deposit start at a baseline of 0.1.
        pheromone = self.pheromones.get(key, 0.1)
        # Capability match: full weight if the agent accepts the task.
        heuristic = 1.0 if agent.can_handle(task) else 0.1
        return (pheromone ** self.alpha) * (heuristic ** self.beta)

    def _weighted_choice(self, probs: dict[str, float]) -> str:
        """Pick a task id at random with probability proportional to its weight.

        ``allocate`` called this helper but it was never defined
        (AttributeError). Requires at least one strictly positive weight.
        """
        import random

        task_ids = list(probs)
        return random.choices(task_ids, weights=[probs[t] for t in task_ids], k=1)[0]

    def allocate(self) -> dict[str, str]:
        """Assign at most one task per agent, never the same task twice.

        Returns:
            {agent_id: task_id} for every agent that received a task.
        """
        allocation = {}
        available_tasks = {t.id for t in self.tasks}
        for agent in self.agents:
            if not available_tasks:
                break
            probs = {
                task.id: self._calculate_probability(agent, task)
                for task in self.tasks
                if task.id in available_tasks
            }
            # random.choices normalises internally; the total only guards
            # against an all-zero weight vector, which it rejects.
            if sum(probs.values()) > 0:
                selected = self._weighted_choice(probs)
                allocation[agent.id] = selected
                available_tasks.remove(selected)
        return allocation

    def update_pheromones(self, allocation: dict, quality: dict):
        """Evaporate all pheromone, then deposit on the pairs in ``allocation``.

        Args:
            allocation: {agent_id: task_id}, as returned by ``allocate``.
            quality: {task_id: deposit amount}; missing tasks deposit 0.1.
        """
        for key in self.pheromones:
            self.pheromones[key] *= (1 - self.evaporation)
        for agent_id, task_id in allocation.items():
            key = (agent_id, task_id)
            deposit = quality.get(task_id, 0.1)
            self.pheromones[key] = self.pheromones.get(key, 0) + deposit
最佳实践
设计指南
- 保持智能体简单: 复杂行为从简单规则中涌现
- 定义清晰接口: 消息格式、任务结构
- 规划失败: 智能体会失败;系统应继续运行
- 监控集体行为: 单个智能体可能正常,但群可能卡住
- 为协调协议设定版本: 群中的智能体可能运行不同版本的协议
要避免的反模式
- 上帝协调器: 一个智能体做所有事情
- 健谈智能体: 过多的智能体间通信
- 紧密耦合: 智能体依赖特定其他智能体
- 缺少超时: 任务执行没有超时或截止时间,可能无限期挂起
- 状态爆炸: 智能体维护太多状态
参考文献
- references/swarm-topologies.md - 详细拓扑模式
- references/coordination-protocols.md - 共识和投票算法
- references/emergent-patterns.md - 痕迹和自组织