name: issue-devpipeline
description: |
使用Wave Pipeline模式进行规划和执行流水线。
编排器协调规划器(深度交互)和执行器(并行扇出)。
规划器产生波队列,执行器并发实现解决方案。
agents: 4
phases: 4
Issue开发流水线
边规划边执行流水线。编排器通过 Wave Pipeline 协调 planner 和 executor(s):planner 完成一个 wave 的规划后输出执行队列,编排器立即为该 wave 派发 executor agents,同时 planner 继续规划下一 wave。
架构概述
┌─────────────────────────────────────────────────────────────┐
│ 编排器(此文件) │
│ → 解析输入 → 管理规划器 → 派发执行器 │
└───────────┬──────────────────────────────────────┬──────────┘
│ │
┌──────┴──────┐ ┌──────────┴──────────┐
│ 规划器 │ │ 执行器(N) │
│ (深度 │ │ (并行扇出) │
│ 交互多轮) │ │ │
│ │ │ exec-1 exec-2 ... │
└──────┬──────┘ └──────────┬──────────┘
│ │
┌──────┴──────┐ ┌──────────┴──────────┐
│ issue-plan │ │ code-developer │
│ issue-queue│ │ (角色参考) │
│ (现有) │ │ │
└─────────────┘ └─────────────────────┘
Wave Pipeline 流程:
规划器第1轮 → Wave 1 队列
↓ (为wave 1生成执行器)
↓ 发送输入 → 规划器第2轮 → Wave 2 队列
↓ (为wave 2生成执行器)
...
↓ 规划器输出“ALL_PLANNED”
↓ 等待所有执行器代理
↓ 聚合结果 → 完成
代理注册表
| 代理 |
角色文件 |
职责 |
新/现有 |
planex-planner |
~/.codex/agents/planex-planner.md |
需求拆解 → 问题创建 → 方案设计 → 队列编排 |
新 |
planex-executor |
~/.codex/agents/planex-executor.md |
加载解决方案 → 代码实现 → 测试 → 提交 |
新 |
issue-plan-agent |
~/.codex/agents/issue-plan-agent.md |
闭环:ACE探索 + 解决方案生成 |
现有 |
issue-queue-agent |
~/.codex/agents/issue-queue-agent.md |
解决方案排序 + 冲突检测 → 执行队列 |
现有 |
输入类型
支持 3 种输入方式(通过编排器参数传入):
| 输入类型 |
格式 |
示例 |
| 问题ID |
直接传入ID |
ISS-20260215-001 ISS-20260215-002 |
| 需求文本 |
--text '...' |
--text '实现用户认证模块' |
| 规划文件 |
--plan 路径 |
--plan plan/2026-02-15-auth.md |
阶段执行
阶段 1: 输入解析(编排器内联)
// 解析输入参数
const args = orchestratorInput
const issueIds = args.match(/ISS-\d{8}-\d{6}/g) || []
const textMatch = args.match(/--text\s+['"]([^'"]+)['"]/)
const planMatch = args.match(/--plan\s+(\S+)/)
let inputType = 'unknown'
if (issueIds.length > 0) inputType = 'issue_ids'
else if (textMatch) inputType = 'text'
else if (planMatch) inputType = 'plan_file'
else inputType = 'text_from_description'
const inputPayload = {
type: inputType,
issueIds: issueIds,
text: textMatch ? textMatch[1] : args,
planFile: planMatch ? planMatch[1] : null
}
阶段 2: 规划(与规划器的深度交互)
// 跟踪所有代理以进行清理
const allAgentIds = []
// 生成规划器代理
const plannerId = spawn_agent({
message: `
## 任务分配
### 强制第一步(代理执行)
1. **阅读角色定义**:~/.codex/agents/planex-planner.md(必须先读)
2. 阅读:.workflow/project-tech.json
3. 阅读:.workflow/project-guidelines.json
---
目标:分析需求并完成第一波(Wave 1)的规划。输出执行队列。
输入:
${JSON.stringify(inputPayload, null, 2)}
范围:
- 包括:需求分析、问题创建、方案设计、队列编排
- 排除:代码实现、测试执行、git操作
交付物:
输出严格遵循以下JSON格式:
\`\`\`json
{
"wave": 1,
"status": "wave_ready" | "all_planned",
"issues": ["ISS-xxx", ...],
"queue": [
{
"issue_id": "ISS-xxx",
"solution_id": "SOL-xxx",
"title": "描述",
"priority": "normal",
"depends_on": []
}
],
"remaining_issues": ["ISS-yyy", ...],
"summary": "本波次规划摘要"
}
\`\`\`
质量标准:
- 每个问题必须有绑定的解决方案
- 队列必须按依赖排序
- 每波最多5个问题
`
})
allAgentIds.push(plannerId)
// 等待规划器Wave 1输出
let plannerResult = wait({ ids: [plannerId], timeout_ms: 900000 })
if (plannerResult.timed_out) {
send_input({ id: plannerId, message: "请尽快输出当前已完成的规划结果。" })
plannerResult = wait({ ids: [plannerId], timeout_ms: 120000 })
}
// 解析规划器输出
let waveData = parseWaveOutput(plannerResult.status[plannerId].completed)
阶段 3: Wave执行循环
const executorResults = []
let waveNum = 0
while (true) {
waveNum++
// ─── 为当前wave派发执行器(并行扇出) ───
const waveExecutors = waveData.queue.map(entry =>
spawn_agent({
message: `
## 任务分配
### 强制第一步(代理执行)
1. **阅读角色定义**:~/.codex/agents/planex-executor.md(必须先读)
2. 阅读:.workflow/project-tech.json
3. 阅读:.workflow/project-guidelines.json
---
目标:实现 ${entry.issue_id} 的解决方案
问题:${entry.issue_id}
解决方案:${entry.solution_id}
标题:${entry.title}
优先级:${entry.priority}
依赖项:${entry.depends_on?.join(', ') || '无'}
范围:
- 包括:加载解决方案计划、代码实现、测试运行、git提交
- 排除:问题创建、方案修改、队列变更
交付物:
输出严格遵循以下格式:
\`\`\`json
{
"issue_id": "${entry.issue_id}",
"status": "success" | "failed",
"files_changed": ["path/to/file", ...],
"tests_passed": true | false,
"committed": true | false,
"commit_hash": "abc123" | null,
"error": null | "错误描述",
"summary": "实现摘要"
}
\`\`\`
质量标准:
- 解决方案计划中的所有任务必须实现
- 现有测试不能中断
- 遵循项目编码规范
- 每个变更必须提交
`
})
)
allAgentIds.push(...waveExecutors)
// ─── 检查是否需要更多waves ───
if (waveData.status === 'all_planned') {
// 没有更多waves — 等待当前执行器并完成
const execResults = wait({ ids: waveExecutors, timeout_ms: 1200000 })
waveExecutors.forEach((id, i) => {
executorResults.push({
wave: waveNum,
issue: waveData.queue[i].issue_id,
result: execResults.status[id]?.completed || 'timeout'
})
})
break
}
// ─── 请求规划器下一wave(同时执行器运行) ───
send_input({
id: plannerId,
message: `
## Wave ${waveNum} 已派发
已为 Wave ${waveNum} 创建 ${waveExecutors.length} 个执行器代理。
## 下一步
请继续规划下一波(Wave ${waveNum + 1})。
剩余问题:${JSON.stringify(waveData.remaining_issues)}
输出格式同前。如果所有问题已规划完毕,status设为"all_planned"。
`
})
// ─── 等待两者:执行器(当前wave)+ 规划器(下一wave) ───
const allWaiting = [...waveExecutors, plannerId]
const batchResult = wait({ ids: allWaiting, timeout_ms: 1200000 })
// 收集执行器结果
waveExecutors.forEach((id, i) => {
executorResults.push({
wave: waveNum,
issue: waveData.queue[i].issue_id,
result: batchResult.status[id]?.completed || 'timeout'
})
})
// 解析规划器下一wave
if (batchResult.status[plannerId]?.completed) {
waveData = parseWaveOutput(batchResult.status[plannerId].completed)
} else {
// 规划器超时 — 再等待
const plannerRetry = wait({ ids: [plannerId], timeout_ms: 300000 })
if (plannerRetry.timed_out) {
// 中止流水线
break
}
waveData = parseWaveOutput(plannerRetry.status[plannerId].completed)
}
}
阶段 4: 聚合与清理
// ─── 聚合结果 ───
const succeeded = executorResults.filter(r => {
try {
const parsed = JSON.parse(r.result)
return parsed.status === 'success'
} catch { return false }
})
const failed = executorResults.filter(r => {
try {
const parsed = JSON.parse(r.result)
return parsed.status === 'failed'
} catch { return true }
})
// ─── 输出最终报告 ───
const report = `
## PlanEx流水线完成
**Waves**: ${waveNum}
**总问题数**: ${executorResults.length}
**成功**: ${succeeded.length}
**失败**: ${failed.length}
### 按Wave结果
${executorResults.map(r => `- Wave ${r.wave} | ${r.issue} | ${(() => {
try { return JSON.parse(r.result).status } catch { return 'error' }
})()}`).join('
')}
${failed.length > 0 ? `### 失败问题
${failed.map(r => `- ${r.issue}: ${(() => {
try { return JSON.parse(r.result).error } catch { return r.result.slice(0, 200) }
})()}`).join('
')}` : ''}
`
console.log(report)
// ─── 生命周期清理 ───
allAgentIds.forEach(id => {
try { close_agent({ id }) } catch { /* 已关闭 */ }
})
辅助函数
function parseWaveOutput(output) {
// 从代理输出中提取JSON块
const jsonMatch = output.match(/```json\s*([\s\S]*?)```/)
if (jsonMatch) {
try { return JSON.parse(jsonMatch[1]) } catch {}
}
// 备选:尝试将整个输出解析为JSON
try { return JSON.parse(output) } catch {}
// 最后手段:返回空wave并标记为all_planned
return { wave: 0, status: 'all_planned', queue: [], remaining_issues: [], summary: '解析失败' }
}
配置
const CONFIG = {
sessionDir: ".workflow/.team/PEX-{slug}-{date}/",
issueDataDir: ".workflow/issues/",
maxWaveSize: 5,
plannerTimeout: 900000, // 15分钟
executorTimeout: 1200000, // 20分钟
maxWaves: 10
}
生命周期管理
超时处理
| 场景 |
操作 |
| 规划器wave超时 |
send_input催促收敛,重试wait 120秒 |
| 执行器超时 |
标记为失败,继续其他执行器 |
| 批次等待部分超时 |
收集已完成结果,继续流水线 |
| 流水线停滞(> 2 waves超时) |
中止流水线,输出部分结果 |
清理协议
// 所有代理在allAgentIds中跟踪
// 最终清理在结束或错误时
allAgentIds.forEach(id => {
try { close_agent({ id }) } catch { /* 已关闭 */ }
})
错误处理
| 场景 |
解决方案 |
| 规划器输出解析失败 |
重试send_input要求严格JSON |
| 无问题创建 |
报告错误,中止流水线 |
| 解决方案规划失败 |
跳过问题,在最终结果中报告 |
| 执行器实现失败 |
标记为失败,继续其他执行器 |
| 波中所有执行器失败 |
报告波失败,继续下一波 |
| 规划器提前退出 |
视为all_planned,完成当前wave |