代理群协调器 agent-swarm-orchestrator

此技能专注于设计和协调多代理系统,通过代理群架构、任务分发、通信协议和集体行为来实现复杂任务的分布式执行和集体智能。关键词:多代理系统、代理协调、任务分配、通信模式、集体智能、AI代理、智能体、分布式执行、涌现行为、系统设计。

AI智能体 0 次安装 0 次浏览 更新于 3/7/2026

name: agent-swarm-orchestrator description: 设计具有协调代理群、任务分发、代理间通信和涌现集体行为的多代理系统。 license: MIT

代理群协调器

此技能提供设计多代理系统的指导,其中多个AI代理通过分布式执行和涌现行为协调完成复杂任务。

核心能力

  • 群架构: 代理拓扑结构、通信模式
  • 任务分发: 工作分配、负载均衡
  • 协调协议: 共识、投票、委派
  • 涌现行为: 来自简单规则的集体智能

多代理基础知识

为什么使用多代理系统

单代理:                    多代理群:
┌─────────────────┐             ┌─────────────────────────────┐
│                 │             │  ┌───┐ ┌───┐ ┌───┐ ┌───┐   │
│   一个代理      │             │  │ A │ │ A │ │ A │ │ A │   │
│   顺序执行      │     对比    │  └───┘ └───┘ └───┘ └───┘   │
│   单一视角      │             │  并行、多样化视角           │
│                 │             │  专业化可能                │
└─────────────────┘             └─────────────────────────────┘

优势:
- 并行性: 多个代理同时工作
- 专业化: 代理可具有不同能力
- 韧性: 如果一个代理失败,系统继续运行
- 多样化视角: 问题的多种解决方法

代理角色

角色 责任 特征
协调器 协调代理群 全局视图、任务分配
工作器 执行任务 专业技能、专注
监督器 质量控制 审查、批准、重定向
专家 领域专业知识 深度知识、窄范围
侦察员 探索 信息收集、研究

群拓扑结构

层次结构

                    ┌──────────────┐
                    │  协调器      │
                    └──────┬───────┘
                           │
          ┌────────────────┼────────────────┐
          │                │                │
    ┌─────┴─────┐    ┌─────┴─────┐    ┌─────┴─────┐
    │监督器     │    │监督器     │    │监督器     │
    └─────┬─────┘    └─────┬─────┘    └─────┬─────┘
          │                │                │
    ┌─────┼─────┐    ┌─────┼─────┐    ┌─────┼─────┐
    │     │     │    │     │     │    │     │     │
   ┌┴┐   ┌┴┐   ┌┴┐  ┌┴┐   ┌┴┐   ┌┴┐  ┌┴┐   ┌┴┐   ┌┴┐
   │W│   │W│   │W│  │W│   │W│   │W│  │W│   │W│   │W│
   └─┘   └─┘   └─┘  └─┘   └─┘   └─┘  └─┘   └─┘   └─┘
         工作器

最适用: 清晰任务分解、需要质量控制

点对点

        ┌───┐───────────────┌───┐
        │ A │               │ A │
        └───┘               └───┘
          │ \             / │
          │   \         /   │
          │     \     /     │
          │       \ /       │
        ┌───┐     ╳       ┌───┐
        │ A │   /   \     │ A │
        └───┘ /       \   └───┘
            /           \
        ┌───┐           ┌───┐
        │ A │───────────│ A │
        └───┘           └───┘

最适用: 协作问题解决、无单点故障

黑板

┌─────────────────────────────────────────────────────┐
│                     黑板                             │
│  ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │
│  │ 问题状态    │ │ 部分结果    │ │ 解决方案      │ │
│  └─────────────┘ └─────────────┘ └───────────────┘ │
└───────────────────────┬─────────────────────────────┘
                        │
    ┌───────────────────┼───────────────────┐
    │         读/写     │                   │
    ▼                   ▼                   ▼
┌───────┐          ┌───────┐          ┌───────┐
│代理A  │          │代理B  │          │代理C  │
│分析师│          │构建师│          │评审师│
└───────┘          └───────┘          └───────┘

最适用: 复杂问题、代理异步贡献

代理实现

基础代理结构

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, List
from enum import Enum
import asyncio

class AgentStatus(Enum):
    IDLE = "idle"
    WORKING = "working"
    WAITING = "waiting"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class AgentMessage:
    sender_id: str
    recipient_id: str  # 或 "broadcast"
    message_type: str
    content: Any
    timestamp: float = field(default_factory=lambda: time.time())
    correlation_id: Optional[str] = None

@dataclass
class Task:
    id: str
    description: str
    priority: int = 0
    dependencies: List[str] = field(default_factory=list)
    assigned_to: Optional[str] = None
    status: str = "pending"
    result: Any = None


class BaseAgent(ABC):
    """群代理的基类"""

    def __init__(self, agent_id: str, capabilities: List[str]):
        self.id = agent_id
        self.capabilities = capabilities
        self.status = AgentStatus.IDLE
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.current_task: Optional[Task] = None

    @abstractmethod
    async def process_task(self, task: Task) -> Any:
        """处理分配的任务 - 在子类中实现"""
        pass

    @abstractmethod
    def can_handle(self, task: Task) -> bool:
        """检查代理是否可以处理此任务类型"""
        pass

    async def receive_message(self, message: AgentMessage):
        """将消息添加到队列进行处理"""
        await self.message_queue.put(message)

    async def run(self):
        """主代理循环"""
        while True:
            # 检查消息
            try:
                message = await asyncio.wait_for(
                    self.message_queue.get(),
                    timeout=0.1
                )
                await self._handle_message(message)
            except asyncio.TimeoutError:
                pass

            # 处理当前任务
            if self.current_task and self.status == AgentStatus.WORKING:
                await self._work_on_task()

    async def _handle_message(self, message: AgentMessage):
        """处理传入消息"""
        if message.message_type == "assign_task":
            self.current_task = message.content
            self.status = AgentStatus.WORKING
        elif message.message_type == "cancel_task":
            self.current_task = None
            self.status = AgentStatus.IDLE
        elif message.message_type == "status_request":
            await self._send_status(message.sender_id)

    async def _work_on_task(self):
        """执行当前任务"""
        try:
            result = await self.process_task(self.current_task)
            self.current_task.result = result
            self.current_task.status = "completed"
            self.status = AgentStatus.COMPLETED
        except Exception as e:
            self.current_task.status = "failed"
            self.status = AgentStatus.FAILED

专业化代理

class ResearchAgent(BaseAgent):
    """专门用于信息收集的代理"""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["research", "search", "analyze"])
        self.search_tools = []

    def can_handle(self, task: Task) -> bool:
        return any(cap in task.description.lower()
                   for cap in ["research", "find", "search", "investigate"])

    async def process_task(self, task: Task) -> dict:
        # 研究实现
        results = await self._search(task.description)
        analysis = await self._analyze(results)
        return {
            "sources": results,
            "analysis": analysis,
            "confidence": self._calculate_confidence(results)
        }


class CodeAgent(BaseAgent):
    """专门用于代码生成的代理"""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["code", "implement", "debug"])

    def can_handle(self, task: Task) -> bool:
        return any(cap in task.description.lower()
                   for cap in ["implement", "code", "write", "fix", "debug"])

    async def process_task(self, task: Task) -> dict:
        # 代码生成实现
        code = await self._generate_code(task.description)
        tests = await self._generate_tests(code)
        return {
            "code": code,
            "tests": tests,
            "language": self._detect_language(code)
        }


class ReviewAgent(BaseAgent):
    """专门用于质量评审的代理"""

    def __init__(self, agent_id: str):
        super().__init__(agent_id, ["review", "critique", "approve"])

    def can_handle(self, task: Task) -> bool:
        return "review" in task.description.lower()

    async def process_task(self, task: Task) -> dict:
        artifact = task.content  # 要评审的内容
        issues = await self._find_issues(artifact)
        suggestions = await self._generate_suggestions(issues)
        return {
            "approved": len(issues) == 0,
            "issues": issues,
            "suggestions": suggestions
        }

协调模式

基于任务的协调

class SwarmOrchestrator:
    """协调代理群完成任务"""

    def __init__(self):
        self.agents: dict[str, BaseAgent] = {}
        self.task_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.completed_tasks: dict[str, Task] = {}
        self.message_bus = MessageBus()

    def register_agent(self, agent: BaseAgent):
        """将代理添加到群中"""
        self.agents[agent.id] = agent

    async def submit_task(self, task: Task):
        """提交任务进行处理"""
        # 计算有效优先级(低值 = 高优先级)
        priority = -task.priority  # 取负用于最小堆行为
        await self.task_queue.put((priority, task))

    async def run(self):
        """主协调循环"""
        while True:
            # 获取最高优先级任务
            _, task = await self.task_queue.get()

            # 检查依赖
            if not self._dependencies_met(task):
                # 重新排队,优先级降低
                await self.task_queue.put((1, task))
                continue

            # 找到合适的代理
            agent = await self._find_available_agent(task)
            if agent:
                await self._assign_task(agent, task)
            else:
                # 无可用代理,重新排队
                await asyncio.sleep(0.1)
                await self.task_queue.put((0, task))

    def _dependencies_met(self, task: Task) -> bool:
        """检查所有依赖是否已完成"""
        for dep_id in task.dependencies:
            if dep_id not in self.completed_tasks:
                return False
            if self.completed_tasks[dep_id].status != "completed":
                return False
        return True

    async def _find_available_agent(self, task: Task) -> Optional[BaseAgent]:
        """找到可以处理任务且空闲的代理"""
        for agent in self.agents.values():
            if agent.status == AgentStatus.IDLE and agent.can_handle(task):
                return agent
        return None

    async def _assign_task(self, agent: BaseAgent, task: Task):
        """将任务分配给代理"""
        task.assigned_to = agent.id
        task.status = "assigned"

        message = AgentMessage(
            sender_id="orchestrator",
            recipient_id=agent.id,
            message_type="assign_task",
            content=task
        )
        await agent.receive_message(message)

工作流协调

from dataclasses import dataclass
from typing import Callable, List

@dataclass
class WorkflowStep:
    name: str
    agent_type: str
    input_transform: Callable[[dict], dict] = lambda x: x
    required_approval: bool = False

class WorkflowOrchestrator:
    """通过代理群执行多步骤工作流"""

    def __init__(self, swarm: SwarmOrchestrator):
        self.swarm = swarm
        self.workflows: dict[str, List[WorkflowStep]] = {}

    def register_workflow(self, name: str, steps: List[WorkflowStep]):
        """注册多步骤工作流"""
        self.workflows[name] = steps

    async def execute_workflow(
        self,
        workflow_name: str,
        initial_input: dict
    ) -> dict:
        """通过代理群执行工作流"""
        steps = self.workflows[workflow_name]
        current_data = initial_input
        results = []

        for i, step in enumerate(steps):
            # 为此步骤转换输入
            step_input = step.input_transform(current_data)

            # 创建任务
            task = Task(
                id=f"{workflow_name}_{i}_{step.name}",
                description=f"执行 {step.name}",
                context=step_input
            )

            # 提交并等待完成
            await self.swarm.submit_task(task)
            result = await self._wait_for_completion(task.id)

            # 如果需要,处理批准
            if step.required_approval:
                approved = await self._request_approval(step, result)
                if not approved:
                    return {"status": "rejected", "step": step.name}

            results.append(result)
            current_data = {**current_data, **result}

        return {
            "status": "completed",
            "results": results,
            "final_output": current_data
        }

代理间通信

消息模式

class MessageBus:
    """群通信的中央消息路由"""

    def __init__(self):
        self.subscribers: dict[str, List[BaseAgent]] = {}
        self.message_history: List[AgentMessage] = []

    def subscribe(self, topic: str, agent: BaseAgent):
        """订阅代理到主题"""
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(agent)

    async def publish(self, topic: str, message: AgentMessage):
        """发布消息到主题订阅者"""
        self.message_history.append(message)

        for agent in self.subscribers.get(topic, []):
            if agent.id != message.sender_id:  # 不发送给自己
                await agent.receive_message(message)

    async def send_direct(self, message: AgentMessage):
        """发送消息到特定代理"""
        # 路由到接收者
        pass

    async def broadcast(self, message: AgentMessage):
        """广播到所有代理"""
        for topic_subscribers in self.subscribers.values():
            for agent in topic_subscribers:
                if agent.id != message.sender_id:
                    await agent.receive_message(message)

共识协议

class ConsensusProtocol:
    """在代理中达成共识"""

    def __init__(self, agents: List[BaseAgent], threshold: float = 0.66):
        self.agents = agents
        self.threshold = threshold

    async def vote(self, proposal: Any) -> dict:
        """从代理收集投票"""
        votes = {}

        for agent in self.agents:
            message = AgentMessage(
                sender_id="consensus",
                recipient_id=agent.id,
                message_type="vote_request",
                content=proposal
            )
            response = await self._request_vote(agent, message)
            votes[agent.id] = response

        return self._tally_votes(votes)

    def _tally_votes(self, votes: dict) -> dict:
        """计算共识结果"""
        approve_count = sum(1 for v in votes.values() if v.get("approve"))
        total = len(votes)

        consensus_reached = (approve_count / total) >= self.threshold

        return {
            "consensus_reached": consensus_reached,
            "approve_count": approve_count,
            "reject_count": total - approve_count,
            "threshold": self.threshold,
            "votes": votes
        }

涌现行为

刺激源模式

通过环境修改间接协调:

class SharedWorkspace:
    """用于刺激源协调的共享环境"""

    def __init__(self):
        self.artifacts: dict[str, Any] = {}
        self.pheromones: dict[str, float] = {}  # 强度指示器
        self.decay_rate = 0.1

    def deposit(self, key: str, artifact: Any, strength: float = 1.0):
        """添加制品到共享空间"""
        self.artifacts[key] = artifact
        self.pheromones[key] = strength

    def sense(self, pattern: str) -> List[tuple[str, Any, float]]:
        """找到匹配模式的制品"""
        matches = []
        for key, artifact in self.artifacts.items():
            if pattern in key:
                strength = self.pheromones.get(key, 0)
                matches.append((key, artifact, strength))
        return sorted(matches, key=lambda x: -x[2])  # 强度最高优先

    def decay(self):
        """随时间减少信息素强度"""
        for key in self.pheromones:
            self.pheromones[key] *= (1 - self.decay_rate)
            if self.pheromones[key] < 0.01:
                del self.pheromones[key]
                del self.artifacts[key]

    def reinforce(self, key: str, amount: float = 0.1):
        """增强信息素轨迹"""
        if key in self.pheromones:
            self.pheromones[key] = min(1.0, self.pheromones[key] + amount)

蚁群优化

class AntColonyTaskAllocator:
    """使用蚁群优化分配任务"""

    def __init__(self, agents: List[BaseAgent], tasks: List[Task]):
        self.agents = agents
        self.tasks = tasks
        self.pheromones = {}  # (agent_id, task_id) -> 强度
        self.alpha = 1.0  # 信息素重要性
        self.beta = 2.0   # 启发式重要性
        self.evaporation = 0.1

    def _calculate_probability(
        self,
        agent: BaseAgent,
        task: Task
    ) -> float:
        """计算将任务分配给代理的概率"""
        key = (agent.id, task.id)

        # 信息素组件
        pheromone = self.pheromones.get(key, 0.1)

        # 启发式: 代理能力匹配
        heuristic = 1.0 if agent.can_handle(task) else 0.1

        return (pheromone ** self.alpha) * (heuristic ** self.beta)

    def allocate(self) -> dict[str, str]:
        """生成任务分配"""
        allocation = {}
        available_tasks = set(t.id for t in self.tasks)

        for agent in self.agents:
            if not available_tasks:
                break

            # 计算可用任务的概率
            probs = {}
            for task in self.tasks:
                if task.id in available_tasks:
                    probs[task.id] = self._calculate_probability(agent, task)

            # 归一化并选择
            total = sum(probs.values())
            if total > 0:
                selected = self._weighted_choice(probs, total)
                allocation[agent.id] = selected
                available_tasks.remove(selected)

        return allocation

    def update_pheromones(self, allocation: dict, quality: dict):
        """基于解决方案质量更新信息素"""
        # 蒸发
        for key in self.pheromones:
            self.pheromones[key] *= (1 - self.evaporation)

        # 基于质量沉积
        for agent_id, task_id in allocation.items():
            key = (agent_id, task_id)
            deposit = quality.get(task_id, 0.1)
            self.pheromones[key] = self.pheromones.get(key, 0) + deposit

最佳实践

设计指南

  1. 保持代理简单: 复杂行为从简单规则涌现
  2. 定义清晰接口: 消息格式、任务结构
  3. 规划故障: 代理会失败;系统应继续运行
  4. 监控集体行为: 单个代理可能正常,但群可能卡住
  5. 版本协调协议: 代理可能运行不同版本

避免的反模式

  • 上帝协调器: 一个代理做所有事情
  • 闲聊代理: 过多代理间通信
  • 紧耦合: 代理依赖特定其他代理
  • 缺失截止时间: 任务完成无超时
  • 状态爆炸: 代理维护太多状态

参考资料

  • references/swarm-topologies.md - 详细拓扑模式
  • references/coordination-protocols.md - 共识和投票算法
  • references/emergent-patterns.md - 刺激源和自组织