名称: cloudflare-queues 描述: 此技能应在用户要求“设置 Cloudflare Queues”、“创建消息队列”、“实现队列消费者”、“处理后台作业”、“配置队列重试逻辑”、“发布消息到队列”、“实现死信队列”,或遇到“队列超时”、“消息重试”、“吞吐量超过限制”、“队列积压”错误时使用。
关键词: cloudflare 队列, 队列工作者, 消息队列, 队列绑定, 异步处理, 后台作业, 队列消费者, 队列生产者, 批量处理, 死信队列, dlq, 消息重试, 队列确认, 消费者并发, 队列积压, wrangler 队列 许可证: MIT 元数据: 版本: “3.0.0” wrangler_version: “4.50.0” workers_types_version: “4.20251126.0” 最后验证: “2025-12-27” 预防错误: 10 包含模板: 6 包含参考: 11 包含代理: 2 包含命令: 3
Cloudflare Queues
状态: 生产就绪 ✅ | 最后验证: 2025-12-27
依赖: cloudflare-worker-base(用于 Worker 设置)
内容: 快速开始 • 关键规则 • 前3大关键错误 • 常见使用案例 • 何时加载参考 • 限制
快速开始 (10 分钟)
1. 创建队列
bunx wrangler queues create my-queue
bunx wrangler queues list
2. 生产者(发送消息)
wrangler.jsonc:
{
"name": "my-producer",
"main": "src/index.ts",
"queues": {
"producers": [
{
"binding": "MY_QUEUE",
"queue": "my-queue"
}
]
}
}
src/index.ts:
import { Hono } from 'hono';
type Bindings = {
MY_QUEUE: Queue;
};
const app = new Hono<{ Bindings: Bindings }>();
app.post('/send', async (c) => {
await c.env.MY_QUEUE.send({
userId: '123',
action: 'process-order',
timestamp: Date.now(),
});
return c.json({ status: 'queued' });
});
export default app;
3. 消费者(处理消息)
wrangler.jsonc:
{
"name": "my-consumer",
"main": "src/consumer.ts",
"queues": {
"consumers": [
{
"queue": "my-queue",
"max_batch_size": 10,
"max_retries": 3,
"dead_letter_queue": "my-dlq"
}
]
}
}
src/consumer.ts:
import type { MessageBatch } from '@cloudflare/workers-types';
export default {
async queue(batch: MessageBatch): Promise<void> {
for (const message of batch.messages) {
console.log('Processing:', message.body);
// 您的逻辑在这里
}
// 隐式确认:成功返回即确认所有消息
},
};
部署:
bunx wrangler deploy
加载: references/setup-guide.md 获取包含 DLQ 配置的完整 6 步设置
关键规则
必须做 ✅
- 配置死信队列 用于生产队列
- 使用显式 ack() 用于非幂等操作(数据库写入、API 调用)
- 验证消息大小 在发送前(<128 KB)
- 使用 sendBatch() 用于多条消息(更高效)
- 实现指数退避 用于重试
- 根据工作负载设置适当的批处理设置
- 监控队列积压和消费者错误
- 使用 ctx.waitUntil() 用于消费者中的异步清理
- 优雅处理错误 - 记录、告警、重试
- 让并发自动扩展(除非需要,不要设置 max_concurrency)
切勿做 ❌
- 切勿假设消息顺序 - 不保证 FIFO
- 切勿依赖隐式确认进行非幂等操作 - 使用显式 ack()
- 切勿发送消息 >128 KB - 将失败
- 切勿删除有活动消息的队列 - 数据丢失
- 切勿跳过 DLQ 配置 在生产中
- 切勿超过 5000 条消息/秒每队列 - 速率限制错误
- 切勿在循环中同步处理消息 - 使用 Promise.all()
- 切勿忽略 message.attempts - 用于退避逻辑
- 切勿设置 max_concurrency=1 除非有特定原因
- 切勿忘记 ack() 在显式确认模式中
前3大关键错误
错误 #1: 消息过大
问题: 消息超过 128 KB 限制
解决方案: 将大数据存储在 R2 中,发送引用
// ❌ 错误
await env.MY_QUEUE.send({ data: largeArray }); // >128 KB 失败
// ✅ 正确
const message = { data: largeArray };
const size = new TextEncoder().encode(JSON.stringify(message)).length;
if (size > 128000) {
const key = `messages/${crypto.randomUUID()}.json`;
await env.MY_BUCKET.put(key, JSON.stringify(message));
await env.MY_QUEUE.send({ type: 'large-message', r2Key: key });
} else {
await env.MY_QUEUE.send(message);
}
错误 #2: 吞吐量超过限制
问题: 超过每队列 5000 条消息/秒
解决方案: 使用 sendBatch() 和速率限制
// ❌ 错误
for (let i = 0; i < 10000; i++) {
await env.MY_QUEUE.send({ id: i }); // 太快了!
}
// ✅ 正确
const messages = Array.from({ length: 10000 }, (_, i) => ({
body: { id: i },
}));
// 以 100 条为一批发送
for (let i = 0; i < messages.length; i += 100) {
await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
}
错误 #3: 当一条消息失败时整个批次重试
问题: 单条消息失败导致所有消息重试
解决方案: 使用显式确认
// ❌ 错误 - 隐式确认
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
await env.DB.prepare('INSERT INTO orders VALUES (?, ?)').bind(
message.body.id,
message.body.amount
).run();
}
// 如果任何失败,全部重试!
},
};
// ✅ 正确 - 显式确认
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
try {
await env.DB.prepare('INSERT INTO orders VALUES (?, ?)').bind(
message.body.id,
message.body.amount
).run();
message.ack(); // 仅在成功时确认
} catch (error) {
console.error(`Failed: ${message.id}`, error);
// 不确认 - 将独立重试
}
}
},
};
加载 references/error-catalog.md 获取所有 10 个错误,包括 DLQ 配置、自动扩展问题、消息删除预防和详细解决方案。
常见使用案例
使用案例 1: 基本消息队列
何时: 简单的异步作业处理(电子邮件、通知)
快速模式:
// 生产者
await env.MY_QUEUE.send({ type: 'email', to: 'user@example.com' });
// 消费者(隐式确认 - 用于幂等操作)
export default {
async queue(batch: MessageBatch): Promise<void> {
for (const message of batch.messages) {
await sendEmail(message.body.to, message.body.content);
}
},
};
加载: templates/queues-producer.ts + templates/queues-consumer-basic.ts
使用案例 2: 数据库写入(非幂等)
何时: 写入数据库,必须避免重复
加载: templates/queues-consumer-explicit-ack.ts + references/consumer-api.md
使用案例 3: 带指数退避的重试
何时: 调用速率限制的 API、临时失败
加载: templates/queues-retry-with-delay.ts + references/error-catalog.md(错误 #2, #3)
使用案例 4: 死信队列模式
何时: 生产系统,需要捕获永久失败的消息
加载: templates/queues-dlq-pattern.ts + references/setup-guide.md(步骤 4)
使用案例 5: 高吞吐量处理
何时: 处理每秒数千条消息
快速模式:
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 100, // 大批次
"max_batch_timeout": 5, // 快速处理
"max_concurrency": null // 自动扩展
}]
}
}
加载: references/best-practices.md → 优化吞吐量
何时加载参考
加载 references/setup-guide.md 当:
- 用户需要完整的设置步骤(队列 → 生产者 → 消费者 → DLQ)
- 第一次设置 Cloudflare Queues
- 需要生产配置示例
- 想要完整的端到端示例
加载 references/error-catalog.md 当:
- 遇到任何 10 个记录的错误
- 故障排除队列问题
- 消息未传递/处理
- 需要预防清单
加载 references/producer-api.md 当:
- 需要完整的生产者 API 参考
- 使用 send() 或 sendBatch() 方法
- 需要消息格式规范
- 处理大消息或批次
加载 references/consumer-api.md 当:
- 需要完整的消费者 API 参考
- 使用显式 ack(), retry(), 或 retryAll()
- 需要批处理模式
- 实现复杂的消费者逻辑
加载 references/best-practices.md 当:
- 优化队列性能
- 生产部署指南
- 监控和可观测性设置
- 安全和可靠性模式
加载 references/wrangler-commands.md 当:
- 需要 CLI 命令参考
- 管理队列(创建、删除、列表)
- 控制传递(暂停、恢复)
- 调试队列问题
- 实时监控和性能分析
加载 references/typescript-types.md 当:
- 需要完整的 TypeScript 类型定义
- 使用 Queue、MessageBatch 或 Message 接口
- 实现类型安全的生产者或消费者
- 使用泛型类型作为消息体
- 需要类型守卫进行消息验证
加载 references/production-checklist.md 当:
- 准备生产部署
- 需要部署前验证清单
- 想要生产最佳实践的详细解释
- 设置监控、DLQ 或错误处理
- 计划负载测试或安全审查
加载 references/pull-consumers.md 当:
- 需要从非 Workers 环境消费消息
- 与现有后端服务集成(Node.js、Python、Go)
- 实现基于拉取的轮询而不是基于推的消费
- 使用容器化或传统应用程序
加载 references/http-publishing.md 当:
- 通过 HTTP 从外部系统发布消息
- 集成第三方服务的 Webhooks
- 需要发送消息而无需部署 Workers
- 实现 CI/CD 管道通知
加载 references/r2-event-integration.md 当:
- 在 R2 存储桶事件上触发队列消息
- 实现自动化的图像/文档处理
- 设置事件驱动的数据管道
- 需要 R2 对象上传/删除通知
代理与命令
可用代理:
- queue-debugger - 9 阶段诊断分析队列问题(系统化故障排除)
- queue-optimizer - 性能调优和成本优化(批次大小、并发、重试)
可用命令:
- /queue-setup - 交互式向导用于完整队列设置
- /queue-troubleshoot - 快速诊断常见问题
- /queue-monitor - 实时指标和状态显示
限制与配额
关键限制:
- 消息大小: 每消息最大 128 KB
- 吞吐量: 每队列 5,000 条消息/秒
- 批次大小: 每 sendBatch() 最大 100 条消息
- 消费者 CPU 时间: 30 秒(默认),300 秒(最大配置)
- 最大重试次数: 可配置(默认 3)
- 队列名称: 最大 63 个字符,小写/数字/连字符
加载 references/best-practices.md 获取处理限制和优化策略。
配置参考
生产者: 将队列绑定添加到 wrangler.jsonc 的 queues.producers 数组,包含 binding 和 queue 字段。
消费者: 在 wrangler.jsonc 的 queues.consumers 数组中配置,包含 queue、max_batch_size(1-100)、max_batch_timeout(0-60s)、max_retries、dead_letter_queue,可选 max_concurrency(默认:自动扩展)。
CPU 限制: 如果处理时间更长,增加 limits.cpu_ms 从默认 30,000ms。
加载 references/setup-guide.md 获取完整的配置示例和 templates/wrangler-queues-config.jsonc 获取生产就绪配置。
使用捆绑资源
参考 (references/)
- setup-guide.md - 完整 6 步设置(队列 → 生产者 → 消费者 → DLQ → 部署 → 生产配置)
- error-catalog.md - 所有 10 个错误及解决方案 + 预防清单
- producer-api.md - 完整生产者 API(send、sendBatch、消息格式)
- consumer-api.md - 完整消费者 API(ack、retry、批处理)
- best-practices.md - 性能、监控、安全、可靠性模式
- wrangler-commands.md - CLI 参考(创建、删除、列表、暂停、恢复)
- typescript-types.md - 完整的 TypeScript 类型定义,用于 Queue、MessageBatch、Message
- production-checklist.md - 部署前验证和最佳实践
- pull-consumers.md - 非 Workers 环境的基于拉取的消费者(HTTP 轮询)
- http-publishing.md - 通过 HTTP API 从外部系统发布消息
- r2-event-integration.md - R2 事件通知触发队列消息
模板 (templates/)
- queues-producer.ts - 基本生产者,包含单条和批量发送
- queues-consumer-basic.ts - 隐式确认消费者(幂等操作)
- queues-consumer-explicit-ack.ts - 显式确认消费者(非幂等)
- queues-retry-with-delay.ts - 指数退避重试模式
- queues-dlq-pattern.ts - 死信队列设置和消费者
- wrangler-queues-config.jsonc - 完整生产配置
TypeScript 类型
使用 @cloudflare/workers-types 包获取完整的类型定义:Queue、MessageBatch<Body>、Message<Body>、QueueSendOptions。
加载 references/typescript-types.md 获取完整的类型参考,包括接口、泛型、类型守卫和使用示例。
监控与调试
关键命令: wrangler queues info(状态)、wrangler tail(日志)、wrangler queues pause-delivery/resume-delivery(控制)。
加载 references/wrangler-commands.md 获取完整的 CLI 参考,包括实时监控、调试工作流和性能分析命令。
生产清单
12 点部署前清单: DLQ 配置、消息确认策略、大小验证、批次优化、并发设置、CPU 限制、错误处理、监控、速率限制、幂等性、负载测试和安全审查。
加载 references/production-checklist.md 获取完整的清单,包括详细解释、代码示例和部署工作流。
相关技能
- cloudflare-worker-base - Worker 设置和配置
- cloudflare-d1 - 队列消费者的数据库集成
- cloudflare-r2 - 存储大消息负载
- cloudflare-workflows - 更复杂的编排需求
官方文档
- Cloudflare Queues: https://developers.cloudflare.com/queues/
- Configuration: https://developers.cloudflare.com/queues/configuration/
- Wrangler Commands: https://developers.cloudflare.com/workers/wrangler/commands/#queues
- Limits: https://developers.cloudflare.com/queues/platform/limits/
- Troubleshooting: https://developers.cloudflare.com/queues/observability/troubleshooting/
- Best Practices: https://developers.cloudflare.com/queues/configuration/best-practices/
有问题?遇到问题?
- 检查
references/error-catalog.md获取所有 10 个错误和解决方案 - 查看
references/setup-guide.md获取完整的设置步骤 - 参见
references/best-practices.md获取生产模式 - 检查官方文档: https://developers.cloudflare.com/queues/