名称: 队列 描述: 多代理协调中的作业队列模式指南。用于决定使用后台作业还是内联执行、提交长时间运行任务、监控作业进度和处理失败。涵盖何时使用队列、作业优先级、重试策略和监控模式。 标签:
- 队列
- 后台作业
- 分布式工作
- 作业处理
- 可靠性
队列技能
使用BullMQ和Redis为多代理工作流提供可靠的作业处理。
何时使用后台作业(队列)
在以下情况下使用队列作业:
- 长时间运行的操作 (>500ms) - 嵌入生成、PDF处理、数据分析
- 资源密集型工作 - ML推理、图像转码、复杂计算
- 容错性重要 - 可以失败并重试,而不阻塞调用者
- 需要扩展 - 使用工作进程并行处理多个作业
- 异步可接受 - 调用者不需要立即结果
- 需要速率限制 - 通过并发设置控制吞吐量
- 工作流协调 - 链式任务或异步等待结果
何时不使用后台作业
在以下情况下使用内联执行:
- 低于100ms的操作 - 简单数据转换、验证、缓存查找
- 需要立即结果 - 调用者阻塞等待响应
- 不需要失败处理 - 单次请求,无重试逻辑
- 无状态一次性作业 - 不需要持久化或监控
作业队列API
创建队列
import { createSwarmQueue } from 'swarm-queue';
const queue = createSwarmQueue({
name: 'embeddings',
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
},
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000, // 从2秒开始,指数退避
},
removeOnComplete: true, // 清理已完成的作业
},
});
提交作业
// 基本作业
const jobId = await queue.addJob('generate-embedding', {
text: 'Hello, world!',
model: 'text-embedding-3-small',
});
// 带优先级 (0=紧急, 1=高, 2=正常, 3=低)
const jobId = await queue.addJob(
'agent-task',
{ agentId: 'worker-1', task: 'analyze' },
{ priority: 1 } // 在正常作业之前处理
);
// 延迟作业 (延迟后开始处理)
const jobId = await queue.addJob(
'retry-agent',
{ attempt: 2 },
{ delay: 30000 } // 30秒后处理
);
// 带自定义重试策略
const jobId = await queue.addJob(
'webhook-call',
{ url: 'https://example.com/webhook' },
{
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s, 8s, 16s
},
}
);
检查作业状态
// 获取作业详情
const job = await queue.getJob(jobId);
if (job) {
console.log({
state: await job.getState(), // 'waiting' | 'active' | 'completed' | 'failed'
progress: job.progress(), // 0-100
attempts: job.attemptsMade,
failedReason: job.failedReason,
});
}
// 获取队列指标
const metrics = await queue.getMetrics();
console.log(metrics);
// {
// waiting: 42, // 排队中,未开始
// active: 5, // 正在处理中
// completed: 1000,// 成功完成
// failed: 3, // 重试后失败
// delayed: 0 // 延迟作业
// }
取消作业
// 移除作业 (可从任何状态)
await queue.removeJob(jobId);
创建工作进程
工作进程从队列处理作业。在单独的进程或服务中启动工作进程。
import { createWorker } from 'swarm-queue';
const worker = await createWorker(
{
queueName: 'embeddings',
concurrency: 4, // 并行处理4个作业
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
},
},
async (job) => {
try {
const { text, model } = job.data.payload;
// 更新进度
job.updateProgress(25);
// 执行工作
const embedding = await generateEmbedding(text, model);
job.updateProgress(100);
// 返回结果
return {
success: true,
data: { embedding, dimensions: embedding.length },
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
}
);
// 开始处理
await worker.start();
// 优雅关闭
process.on('SIGTERM', async () => {
await worker.stop();
await worker.close();
});
作业优先级模式
紧急与后台
// 紧急: 需要立即响应的代理错误
await queue.addJob('notify-coordinator', { error }, { priority: 0 });
// 高: 其他代理的任务完成
await queue.addJob('merge-results', { results }, { priority: 1 });
// 正常: 进度更新
await queue.addJob('update-metrics', { metrics }, { priority: 2 });
// 低: 清理、归档
await queue.addJob('cleanup-cache', { cacheKey }, { priority: 3 });
工作进程中的优先级处理
BullMQ自动优先处理更高优先级的作业:
// 队列指标将显示更高优先级的作业先被处理
const metrics = await queue.getMetrics();
console.log(`紧急作业等待中: ${metrics.waiting}`);
// 工作进程会在正常作业之前处理紧急(优先级0)作业
失败处理和重试策略
指数退避
const jobId = await queue.addJob('api-call', { endpoint: '/data' }, {
attempts: 4,
backoff: {
type: 'exponential',
delay: 1000, // 第一次重试: 1s, 然后 2s, 4s, 8s
},
});
重试时间线: 1s → 2s → 4s → 8s → 失败(移至死信)
固定延迟
const jobId = await queue.addJob('webhook', { url }, {
attempts: 3,
backoff: {
type: 'fixed',
delay: 5000, // 重试之间总是等待5秒
},
});
死信模式
最大重试后,作业失败并不再重试:
// 监控失败
const metrics = await queue.getMetrics();
if (metrics.failed > 0) {
console.warn(`${metrics.failed} 个作业永久失败`);
// 记录到监控系统,提醒值班人员等
}
监控和可观测性
实时指标
const metrics = await queue.getMetrics();
const queueHealth = {
throughput: metrics.completed, // 总完成数
backlog: metrics.waiting, // 尚未开始的作业
inProgress: metrics.active, // 当前正在处理
failureRate: metrics.failed / (metrics.completed + metrics.failed),
avgTimeInQueue: null, // 需要自定义追踪
};
console.log(`队列健康状态: ${JSON.stringify(queueHealth, null, 2)}`);
监控最佳实践
- 追踪完成时间: 记录作业进入和离开队列的时间
- 失败警报: 当失败率超过阈值时发出警报
- 积压警告: 当等待作业超过容量时警告
- 工作进程健康: 监控工作进程可用性
- 作业超时: 为每种作业类型设置合理的超时预期
// 示例: 监控队列健康
setInterval(async () => {
const metrics = await queue.getMetrics();
const total = Object.values(metrics).reduce((a, b) => a + b);
if (metrics.failed > 0.1 * total) {
console.error('检测到高失败率');
// 提醒监控系统
}
if (metrics.waiting > 1000) {
console.warn('检测到大积压 - 考虑扩展工作进程');
}
}, 30000);
常见作业类型
嵌入生成
// 代理提交: 为文档生成嵌入
const jobId = await queue.addJob('generate-embedding', {
documentId: 'doc-123',
text: '完整文档文本...',
model: 'text-embedding-3-small',
}, {
priority: 2,
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
// 工作进程: 处理嵌入作业
const result = await generateEmbedding(payload.text, payload.model);
PDF处理
// 代理提交: 从PDF提取文本
const jobId = await queue.addJob('process-pdf', {
fileUrl: 'https://example.com/document.pdf',
pages: [1, 2, 3], // 仅提取特定页面
}, {
priority: 1,
attempts: 2,
removeOnComplete: true,
});
// 工作进程: 长时间运行的PDF处理
const text = await extractPdfText(payload.fileUrl, payload.pages);
批量操作
// 代理提交: 在后台处理1000个项目
const jobId = await queue.addJob('bulk-update', {
items: largeDataset,
operation: 'transform',
}, {
priority: 3, // 低优先级,可以花时间
attempts: 1, // 批量作业无重试(可恢复性是应用特定的)
});
// 工作进程: 基于流的处理
for (const item of payload.items) {
await processItem(item);
job.updateProgress(++processed / payload.items.length * 100);
}
使用队列进行任务协调
等待结果
由于队列作业是异步的,使用轮询或事件获取结果:
// 代理 A: 提交作业并轮询结果
const jobId = await queue.addJob('analyze-data', { data });
// 轮询 (简单,但不适合长等待)
let result = null;
const maxAttempts = 60; // 60 * 5秒 = 5分钟
for (let i = 0; i < maxAttempts; i++) {
const job = await queue.getJob(jobId);
const state = await job?.getState();
if (state === 'completed') {
result = job?.returnvalue;
break;
}
await new Promise(r => setTimeout(r, 5000)); // 等待5秒
}
if (result) {
console.log('分析完成:', result);
} else {
console.error('作业超时');
}
链式作业
// 作业 1: 生成嵌入
const jobId1 = await queue.addJob('generate-embedding', { text });
// 作业 2: 作业 1 完成后,比较嵌入
// 这是应用级协调 - 作业 1 的工作进程提交作业 2
// 工作进程:
if (jobResult.success) {
const jobId2 = await queue.addJob('compare-embeddings', {
embedding: jobResult.data.embedding,
compareWith: otherEmbedding,
});
}
CLI 使用 (swarm queue)
swarm CLI 提供队列管理命令:
# 提交作业
swarm queue submit embeddings '{"text":"hello","model":"small"}' --priority 1
# 检查作业状态
swarm queue status embeddings job-id-123
# 列出队列内容
swarm queue list embeddings --state waiting --limit 10
swarm queue list embeddings --state failed
# 启动工作进程
swarm worker embeddings --concurrency 4
# 清理旧作业
swarm queue cleanup embeddings --before 7d
错误处理策略
优雅降级
// 如果作业失败,提供备用方案
const jobId = await queue.addJob('expensive-compute', { data });
try {
const job = await queue.getJob(jobId);
const state = await job?.getState();
if (state === 'failed') {
// 使用备用结果
return getCachedResult(data) || getDefaultResult();
}
} catch (error) {
// 队列不可用 - 使用备用
return getCachedResult(data);
}
死信处理
// 监控失败作业并移至死信处理
const metrics = await queue.getMetrics();
if (metrics.failed > 0) {
const failedJobs = []; // 从Redis获取失败作业
// 发送到死信队列供手动审查或重新触发
for (const job of failedJobs) {
console.error(`失败作业 ${job.id}: ${job.failedReason}`);
// 记录到监控系统
// 提醒值班工程师
}
}
性能调优
并发设置
// 低并发: 并行处理较少作业(资源使用较少)
const worker = await createWorker(
{
queueName: 'embeddings',
concurrency: 1, // 一次一个(适用于昂贵操作)
},
processor
);
// 高并发: 更多并行处理(更高吞吐量)
const worker = await createWorker(
{
queueName: 'simple-tasks',
concurrency: 16, // 最大并行(适用于I/O密集型操作)
},
processor
);
作业选项优化性能
// 移除已完成的作业以节省Redis内存
await queue.addJob('cleanup', { data }, {
removeOnComplete: true, // 不保留历史
attempts: 1, // 无需重试
});
// 保留重要作业以供审计
await queue.addJob('agent-task', { data }, {
removeOnComplete: false, // 保留历史
attempts: 3, // 失败时重试
});
测试
单元测试作业
import { describe, test, expect } from 'bun:test';
describe('队列作业', () => {
test('嵌入作业返回有效嵌入', async () => {
const processor = async (job) => {
// 测试用模拟作业
return {
success: true,
data: { embedding: [0.1, 0.2, 0.3] },
};
};
const result = await processor({
data: { payload: { text: 'hello', model: 'small' } },
});
expect(result.success).toBe(true);
expect(result.data.embedding).toHaveLength(3);
});
});
集成测试真实队列
test('作业在队列中成功完成', async () => {
const queue = createSwarmQueue({ name: 'test-queue' });
const jobId = await queue.addJob('test-job', { data: 'test' });
const job = await queue.getJob(jobId);
expect(job).toBeDefined();
// 清理
await queue.removeJob(jobId);
await queue.close();
});
总结
- 使用队列进行长时间运行、容错的工作
- 设置适当的优先级和重试策略
- 监控队列健康状态和失败率
- 在应用级别链式作业
- 优雅处理作业失败并提供备用方案
- 独立于API扩展工作进程