智能体群协调器 agent-swarm-orchestrator

智能体群协调器是一种专注于设计和实现多智能体系统的技能,通过智能体间的协调、任务分配和通信,实现集体智能和复杂任务处理。关键词包括:多智能体、AI智能体、任务分配、协调协议、涌现行为、分布式AI。

AI智能体 0 次安装 0 次浏览 更新于 3/7/2026

名称: 智能体群协调器 描述: 提供设计多智能体系统的指导,其中多个AI智能体协调以通过分布式执行和涌现行为完成复杂任务。 许可证: MIT

智能体群协调器

此技能提供设计多智能体系统的指导,其中多个AI智能体协调以通过分布式执行和涌现行为完成复杂任务。

核心能力

  • 群架构: 智能体拓扑、通信模式
  • 任务分配: 工作分配、负载均衡
  • 协调协议: 共识、投票、委托
  • 涌现行为: 从简单规则中产生的集体智能

多智能体基础

为什么使用多智能体系统

单智能体:                    多智能体群:
┌─────────────────┐             ┌─────────────────────────────┐
│                 │             │  ┌───┐ ┌───┐ ┌───┐ ┌───┐   │
│   一个智能体     │             │  │ A │ │ A │ │ A │ │ A │   │
│   顺序执行       │     对比     │  └───┘ └───┘ └───┘ └───┘   │
│   单视角         │             │  并行、多样视角             │
│                 │             │  可能实现专业化             │
└─────────────────┘             └─────────────────────────────┘

优势:
- 并行性: 多个智能体同时工作
- 专业化: 智能体可以具有不同能力
- 弹性: 如果一个智能体失败,系统继续运行
- 多样视角: 问题的多种方法

智能体角色

角色 责任 特点
协调器 协调群 全局视图、任务分配
工作者 执行任务 专业技能、专注
监督者 质量控制 审核、批准、重定向
专家 领域专长 深度知识、范围窄
侦察员 探索 信息收集、研究

群拓扑结构

分层结构

                    ┌──────────────┐
                    │   协调器     │
                    └──────┬───────┘
                           │
          ┌────────────────┼────────────────┐
          │                │                │
    ┌─────┴─────┐    ┌─────┴─────┐    ┌─────┴─────┐
    │ 监督者    │    │ 监督者    │    │ 监督者    │
    └─────┬─────┘    └─────┬─────┘    └─────┬─────┘
          │                │                │
    ┌─────┼─────┐    ┌─────┼─────┐    ┌─────┼─────┐
    │     │     │    │     │     │    │     │     │
   ┌┴┐   ┌┴┐   ┌┴┐  ┌┴┐   ┌┴┐   ┌┴┐  ┌┴┐   ┌┴┐   ┌┴┐
   │W│   │W│   │W│  │W│   │W│   │W│  │W│   │W│   │W│
   └─┘   └─┘   └─┘  └─┘   └─┘   └─┘  └─┘   └─┘   └─┘
        工作者

最适合: 清晰的任务分解,需要质量控制

点对点结构

        ┌───┐───────────────┌───┐
        │ A │               │ A │
        └───┘               └───┘
          │ \             / │
          │   \         /   │
          │     \     /     │
          │       \ /       │
        ┌───┐     ╳       ┌───┐
        │ A │   /   \     │ A │
        └───┘ /       \   └───┘
            /           \
        ┌───┐           ┌───┐
        │ A │───────────│ A │
        └───┘           └───┘

最适合: 协作问题解决,无单点故障

黑板结构

┌─────────────────────────────────────────────────────┐
│                     黑板                              │
│  ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │
│  │ 问题状态    │ │ 部分结果     │ │ 解决方案       │ │
│  │             │ │             │ │               │ │
│  └─────────────┘ └─────────────┘ └───────────────┘ │
└───────────────────────┬─────────────────────────────┘
                        │
    ┌───────────────────┼───────────────────┐
    │        读/写      │                   │
    ▼                   ▼                   ▼
┌───────┐          ┌───────┐          ┌───────┐
│智能体A│          │智能体B│          │智能体C│
│分析师│          │构建者│          │评审者│
└───────┘          └───────┘          └───────┘

最适合: 复杂问题,智能体异步贡献

智能体实现

基础智能体结构

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, List
from enum import Enum
import asyncio

class AgentStatus(Enum):
    IDLE = "空闲"
    WORKING = "工作中"
    WAITING = "等待中"
    COMPLETED = "完成"
    FAILED = "失败"

@dataclass
class AgentMessage:
    sender_id: str
    recipient_id: str  # 或 "广播"
    message_type: str
    content: Any
    timestamp: float = field(default_factory=lambda: time.time())
    correlation_id: Optional[str] = None

@dataclass
class Task:
    id: str
    description: str
    priority: int = 0
    dependencies: List[str] = field(default_factory=list)
    assigned_to: Optional[str] = None
    status: str = "待处理"
    result: Any = None


class BaseAgent(ABC):
    """群智能体的基类"""

    def __init__(self, agent_id: str, capabilities: List[str]):
        self.id = agent_id
        self.capabilities = capabilities
        self.status = AgentStatus.IDLE
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.current_task: Optional[Task] = None

    @abstractmethod
    async def process_task(self, task: Task) -> Any:
        """处理分配的任务 - 在子类中实现"""
        pass

    @abstractmethod
    def can_handle(self, task: Task) -> bool:
        """检查智能体是否可以处理此任务类型"""
        pass

    async def receive_message(self, message: AgentMessage):
        """添加消息到队列进行处理"""
        await self.message_queue.put(message)

    async def run(self):
        """主智能体循环"""
        while True:
            # 检查消息
            try:
                message = await asyncio.wait_for(
                    self.message_queue.get(),
                    timeout=0.1
                )
                await self._handle_message(message)
            except asyncio.TimeoutError:
                pass

            # 处理当前任务
            if self.current_task and self.status == AgentStatus.WORKING:
                await self._work_on_task()

    async def _handle_message(self, message: AgentMessage):
        """处理传入消息"""
        if message.message_type == "分配任务":
            self.current_task = message.content
            self.status = AgentStatus.WORKING
        elif message.message_type == "取消任务":
            self.current_task = None
            self.status = AgentStatus.IDLE
        elif message.message_type == "状态请求":
            await self._send_status(message.sender_id)

    async def _work_on_task(self):
        """执行当前任务"""
        try:
            result = await self.process_task(self.current_task)
            self.current_task.result = result
            self.current_task.status = "完成"
            self.status = AgentStatus.COMPLETED
        except Exception as e:
            self.current_task.status = "失败"
            self.status = AgentStatus.FAILED

专用智能体

class ResearchAgent(BaseAgent):
    """专门用于信息收集的智能体"""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["研究", "搜索", "分析"])
        self.search_tools = []

    def can_handle(self, task: Task) -> bool:
        return any(cap in task.description.lower()
                   for cap in ["研究", "查找", "搜索", "调查"])

    async def process_task(self, task: Task) -> dict:
        # 研究实现
        results = await self._search(task.description)
        analysis = await self._analyze(results)
        return {
            "来源": results,
            "分析": analysis,
            "置信度": self._calculate_confidence(results)
        }


class CodeAgent(BaseAgent):
    """专门用于代码生成的智能体"""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["代码", "实现", "调试"])

    def can_handle(self, task: Task) -> bool:
        return any(cap in task.description.lower()
                   for cap in ["实现", "代码", "编写", "修复", "调试"])

    async def process_task(self, task: Task) -> dict:
        # 代码生成实现
        code = await self._generate_code(task.description)
        tests = await self._generate_tests(code)
        return {
            "代码": code,
            "测试": tests,
            "语言": self._detect_language(code)
        }


class ReviewAgent(BaseAgent):
    """专门用于质量评审的智能体"""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["评审", "批评", "批准"])

    def can_handle(self, task: Task) -> bool:
        return "评审" in task.description.lower()

    async def process_task(self, task: Task) -> dict:
        artifact = task.content  # 要评审的内容
        issues = await self._find_issues(artifact)
        suggestions = await self._generate_suggestions(issues)
        return {
            "批准": len(issues) == 0,
            "问题": issues,
            "建议": suggestions
        }

协调模式

基于任务的协调

class SwarmOrchestrator:
    """协调智能体群完成任务"""

    def __init__(self):
        self.agents: dict[str, BaseAgent] = {}
        self.task_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.completed_tasks: dict[str, Task] = {}
        self.message_bus = MessageBus()

    def register_agent(self, agent: BaseAgent):
        """添加智能体到群"""
        self.agents[agent.id] = agent

    async def submit_task(self, task: Task):
        """提交任务处理"""
        # 计算有效优先级(越低优先级越高)
        priority = -task.priority  # 取负用于最小堆行为
        await self.task_queue.put((priority, task))

    async def run(self):
        """主协调循环"""
        while True:
            # 获取最高优先级任务
            _, task = await self.task_queue.get()

            # 检查依赖
            if not self._dependencies_met(task):
                # 以较低优先级重新入队
                await self.task_queue.put((1, task))
                continue

            # 找到合适智能体
            agent = await self._find_available_agent(task)
            if agent:
                await self._assign_task(agent, task)
            else:
                # 无可用智能体,重新入队
                await asyncio.sleep(0.1)
                await self.task_queue.put((0, task))

    def _dependencies_met(self, task: Task) -> bool:
        """检查所有依赖是否完成"""
        for dep_id in task.dependencies:
            if dep_id not in self.completed_tasks:
                return False
            if self.completed_tasks[dep_id].status != "完成":
                return False
        return True

    async def _find_available_agent(self, task: Task) -> Optional[BaseAgent]:
        """找到空闲且能处理任务的智能体"""
        for agent in self.agents.values():
            if agent.status == AgentStatus.IDLE and agent.can_handle(task):
                return agent
        return None

    async def _assign_task(self, agent: BaseAgent, task: Task):
        """分配任务给智能体"""
        task.assigned_to = agent.id
        task.status = "已分配"

        message = AgentMessage(
            sender_id="协调器",
            recipient_id=agent.id,
            message_type="分配任务",
            content=task
        )
        await agent.receive_message(message)

工作流协调

from dataclasses import dataclass
from typing import Callable, List

@dataclass
class WorkflowStep:
    name: str
    agent_type: str
    input_transform: Callable[[dict], dict] = lambda x: x
    required_approval: bool = False

class WorkflowOrchestrator:
    """使用智能体群执行多步骤工作流"""

    def __init__(self, swarm: SwarmOrchestrator):
        self.swarm = swarm
        self.workflows: dict[str, List[WorkflowStep]] = {}

    def register_workflow(self, name: str, steps: List[WorkflowStep]):
        """注册多步骤工作流"""
        self.workflows[name] = steps

    async def execute_workflow(
        self,
        workflow_name: str,
        initial_input: dict
    ) -> dict:
        """通过智能体群执行工作流"""
        steps = self.workflows[workflow_name]
        current_data = initial_input
        results = []

        for i, step in enumerate(steps):
            # 转换此步骤的输入
            step_input = step.input_transform(current_data)

            # 创建任务
            task = Task(
                id=f"{workflow_name}_{i}_{step.name}",
                description=f"执行 {step.name}",
                context=step_input
            )

            # 提交并等待完成
            await self.swarm.submit_task(task)
            result = await self._wait_for_completion(task.id)

            # 如果需要,处理批准
            if step.required_approval:
                approved = await self._request_approval(step, result)
                if not approved:
                    return {"状态": "拒绝", "步骤": step.name}

            results.append(result)
            current_data = {**current_data, **result}

        return {
            "状态": "完成",
            "结果": results,
            "最终输出": current_data
        }

智能体间通信

消息模式

class MessageBus:
    """群通信的中央消息路由"""

    def __init__(self):
        self.subscribers: dict[str, List[BaseAgent]] = {}
        self.message_history: List[AgentMessage] = []

    def subscribe(self, topic: str, agent: BaseAgent):
        """订阅智能体到主题"""
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(agent)

    async def publish(self, topic: str, message: AgentMessage):
        """发布消息到主题订阅者"""
        self.message_history.append(message)

        for agent in self.subscribers.get(topic, []):
            if agent.id != message.sender_id:  # 不发送给自己
                await agent.receive_message(message)

    async def send_direct(self, message: AgentMessage):
        """发送消息到特定智能体"""
        # 路由到接收者
        pass

    async def broadcast(self, message: AgentMessage):
        """广播到所有智能体"""
        for topic_subscribers in self.subscribers.values():
            for agent in topic_subscribers:
                if agent.id != message.sender_id:
                    await agent.receive_message(message)

共识协议

class ConsensusProtocol:
    """在智能体间达成共识"""

    def __init__(self, agents: List[BaseAgent], threshold: float = 0.66):
        self.agents = agents
        self.threshold = threshold

    async def vote(self, proposal: Any) -> dict:
        """从智能体收集投票"""
        votes = {}

        for agent in self.agents:
            message = AgentMessage(
                sender_id="共识",
                recipient_id=agent.id,
                message_type="投票请求",
                content=proposal
            )
            response = await self._request_vote(agent, message)
            votes[agent.id] = response

        return self._tally_votes(votes)

    def _tally_votes(self, votes: dict) -> dict:
        """计算共识结果"""
        approve_count = sum(1 for v in votes.values() if v.get("批准"))
        total = len(votes)

        consensus_reached = (approve_count / total) >= self.threshold

        return {
            "达成共识": consensus_reached,
            "批准数": approve_count,
            "拒绝数": total - approve_count,
            "阈值": self.threshold,
            "投票": votes
        }

涌现行为

痕迹模式

通过环境修改的间接协调:

class SharedWorkspace:
    """用于痕迹协调的共享环境"""

    def __init__(self):
        self.artifacts: dict[str, Any] = {}
        self.pheromones: dict[str, float] = {}  # 强度指示器
        self.decay_rate = 0.1

    def deposit(self, key: str, artifact: Any, strength: float = 1.0):
        """添加内容到共享空间"""
        self.artifacts[key] = artifact
        self.pheromones[key] = strength

    def sense(self, pattern: str) -> List[tuple[str, Any, float]]:
        """找到匹配模式的内容"""
        matches = []
        for key, artifact in self.artifacts.items():
            if pattern in key:
                strength = self.pheromones.get(key, 0)
                matches.append((key, artifact, strength))
        return sorted(matches, key=lambda x: -x[2])  # 最高强度优先

    def decay(self):
        """随时间减少痕迹强度"""
        for key in self.pheromones:
            self.pheromones[key] *= (1 - self.decay_rate)
            if self.pheromones[key] < 0.01:
                del self.pheromones[key]
                del self.artifacts[key]

    def reinforce(self, key: str, amount: float = 0.1):
        """加强痕迹轨迹"""
        if key in self.pheromones:
            self.pheromones[key] = min(1.0, self.pheromones[key] + amount)

蚁群优化

class AntColonyTaskAllocator:
    """使用蚁群优化分配任务"""

    def __init__(self, agents: List[BaseAgent], tasks: List[Task]):
        self.agents = agents
        self.tasks = tasks
        self.pheromones = {}  # (agent_id, task_id) -> 强度
        self.alpha = 1.0  # 痕迹重要性
        self.beta = 2.0   # 启发式重要性
        self.evaporation = 0.1

    def _calculate_probability(
        self,
        agent: BaseAgent,
        task: Task
    ) -> float:
        """计算任务分配给智能体的概率"""
        key = (agent.id, task.id)

        # 痕迹组件
        pheromone = self.pheromones.get(key, 0.1)

        # 启发式:智能体能力匹配
        heuristic = 1.0 if agent.can_handle(task) else 0.1

        return (pheromone ** self.alpha) * (heuristic ** self.beta)

    def allocate(self) -> dict[str, str]:
        """生成任务分配"""
        allocation = {}
        available_tasks = set(t.id for t in self.tasks)

        for agent in self.agents:
            if not available_tasks:
                break

            # 计算可用任务的概率
            probs = {}
            for task in self.tasks:
                if task.id in available_tasks:
                    probs[task.id] = self._calculate_probability(agent, task)

            # 归一化并选择
            total = sum(probs.values())
            if total > 0:
                selected = self._weighted_choice(probs, total)
                allocation[agent.id] = selected
                available_tasks.remove(selected)

        return allocation

    def update_pheromones(self, allocation: dict, quality: dict):
        """基于解决方案质量更新痕迹"""
        # 蒸发
        for key in self.pheromones:
            self.pheromones[key] *= (1 - self.evaporation)

        # 基于质量沉积
        for agent_id, task_id in allocation.items():
            key = (agent_id, task_id)
            deposit = quality.get(task_id, 0.1)
            self.pheromones[key] = self.pheromones.get(key, 0) + deposit

最佳实践

设计指南

  1. 保持智能体简单: 复杂行为从简单规则中涌现
  2. 定义清晰接口: 消息格式、任务结构
  3. 规划失败: 智能体会失败;系统应继续运行
  4. 监控集体行为: 单个智能体可能正常,但群可能卡住
  5. 版本协调协议: 智能体可能运行不同版本

要避免的反模式

  • 上帝协调器: 一个智能体做所有事情
  • 健谈智能体: 过多的智能体间通信
  • 紧密耦合: 智能体依赖特定其他智能体
  • 错过截止时间: 任务完成无超时
  • 状态爆炸: 智能体维护太多状态

参考文献

  • references/swarm-topologies.md - 详细拓扑模式
  • references/coordination-protocols.md - 共识和投票算法
  • references/emergent-patterns.md - 痕迹和自组织