Cloudflare队列设置与管理技能 cloudflare-queues

此技能提供 Cloudflare Queues 的完整设置和管理指南,用于实现消息队列、异步处理和后台作业。它包括快速启动步骤、关键规则、常见错误解决方案、使用案例和参考资源,帮助用户优化队列性能并确保可靠性。关键词:Cloudflare Queues, 消息队列, 异步处理, 后台作业, 队列消费者, 死信队列, 重试逻辑, 批量处理, 队列监控, 云计算。

Serverless 0 次安装 0 次浏览 更新于 3/8/2026

名称: cloudflare-queues 描述: 此技能应在用户要求“设置 Cloudflare Queues”、“创建消息队列”、“实现队列消费者”、“处理后台作业”、“配置队列重试逻辑”、“发布消息到队列”、“实现死信队列”,或遇到“队列超时”、“消息重试”、“吞吐量超过限制”、“队列积压”错误时使用。

关键词: cloudflare 队列, 队列工作者, 消息队列, 队列绑定, 异步处理, 后台作业, 队列消费者, 队列生产者, 批量处理, 死信队列, dlq, 消息重试, 队列确认, 消费者并发, 队列积压, wrangler 队列 许可证: MIT 元数据: 版本: “3.0.0” wrangler_version: “4.50.0” workers_types_version: “4.20251126.0” 最后验证: “2025-12-27” 预防错误: 10 包含模板: 6 包含参考: 11 包含代理: 2 包含命令: 3

Cloudflare Queues

状态: 生产就绪 ✅ | 最后验证: 2025-12-27

依赖: cloudflare-worker-base(用于 Worker 设置)

内容: 快速开始关键规则前3大关键错误常见使用案例何时加载参考限制


快速开始 (10 分钟)

1. 创建队列

bunx wrangler queues create my-queue
bunx wrangler queues list

2. 生产者(发送消息)

wrangler.jsonc:

{
  "name": "my-producer",
  "main": "src/index.ts",
  "queues": {
    "producers": [
      {
        "binding": "MY_QUEUE",
        "queue": "my-queue"
      }
    ]
  }
}

src/index.ts:

import { Hono } from 'hono';

type Bindings = {
  MY_QUEUE: Queue;
};

const app = new Hono<{ Bindings: Bindings }>();

app.post('/send', async (c) => {
  await c.env.MY_QUEUE.send({
    userId: '123',
    action: 'process-order',
    timestamp: Date.now(),
  });

  return c.json({ status: 'queued' });
});

export default app;

3. 消费者(处理消息)

wrangler.jsonc:

{
  "name": "my-consumer",
  "main": "src/consumer.ts",
  "queues": {
    "consumers": [
      {
        "queue": "my-queue",
        "max_batch_size": 10,
        "max_retries": 3,
        "dead_letter_queue": "my-dlq"
      }
    ]
  }
}

src/consumer.ts:

import type { MessageBatch } from '@cloudflare/workers-types';

export default {
  async queue(batch: MessageBatch): Promise<void> {
    for (const message of batch.messages) {
      console.log('Processing:', message.body);
      // 您的逻辑在这里
    }
    // 隐式确认:成功返回即确认所有消息
  },
};

部署:

bunx wrangler deploy

加载: references/setup-guide.md 获取包含 DLQ 配置的完整 6 步设置


关键规则

必须做 ✅

  1. 配置死信队列 用于生产队列
  2. 使用显式 ack() 用于非幂等操作(数据库写入、API 调用)
  3. 验证消息大小 在发送前(<128 KB)
  4. 使用 sendBatch() 用于多条消息(更高效)
  5. 实现指数退避 用于重试
  6. 根据工作负载设置适当的批处理设置
  7. 监控队列积压和消费者错误
  8. 使用 ctx.waitUntil() 用于消费者中的异步清理
  9. 优雅处理错误 - 记录、告警、重试
  10. 让并发自动扩展(除非需要,不要设置 max_concurrency)

切勿做 ❌

  1. 切勿假设消息顺序 - 不保证 FIFO
  2. 切勿依赖隐式确认进行非幂等操作 - 使用显式 ack()
  3. 切勿发送消息 >128 KB - 将失败
  4. 切勿删除有活动消息的队列 - 数据丢失
  5. 切勿跳过 DLQ 配置 在生产中
  6. 切勿超过 5000 条消息/秒每队列 - 速率限制错误
  7. 切勿在循环中同步处理消息 - 使用 Promise.all()
  8. 切勿忽略 message.attempts - 用于退避逻辑
  9. 切勿设置 max_concurrency=1 除非有特定原因
  10. 切勿忘记 ack() 在显式确认模式中

前3大关键错误

错误 #1: 消息过大

问题: 消息超过 128 KB 限制

解决方案: 将大数据存储在 R2 中,发送引用

// ❌ 错误
await env.MY_QUEUE.send({ data: largeArray }); // >128 KB 失败

// ✅ 正确
const message = { data: largeArray };
const size = new TextEncoder().encode(JSON.stringify(message)).length;

if (size > 128000) {
  const key = `messages/${crypto.randomUUID()}.json`;
  await env.MY_BUCKET.put(key, JSON.stringify(message));
  await env.MY_QUEUE.send({ type: 'large-message', r2Key: key });
} else {
  await env.MY_QUEUE.send(message);
}

错误 #2: 吞吐量超过限制

问题: 超过每队列 5000 条消息/秒

解决方案: 使用 sendBatch() 和速率限制

// ❌ 错误
for (let i = 0; i < 10000; i++) {
  await env.MY_QUEUE.send({ id: i }); // 太快了!
}

// ✅ 正确
const messages = Array.from({ length: 10000 }, (_, i) => ({
  body: { id: i },
}));

// 以 100 条为一批发送
for (let i = 0; i < messages.length; i += 100) {
  await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
}

错误 #3: 当一条消息失败时整个批次重试

问题: 单条消息失败导致所有消息重试

解决方案: 使用显式确认

// ❌ 错误 - 隐式确认
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      await env.DB.prepare('INSERT INTO orders VALUES (?, ?)').bind(
        message.body.id,
        message.body.amount
      ).run();
    }
    // 如果任何失败,全部重试!
  },
};

// ✅ 正确 - 显式确认
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      try {
        await env.DB.prepare('INSERT INTO orders VALUES (?, ?)').bind(
          message.body.id,
          message.body.amount
        ).run();

        message.ack(); // 仅在成功时确认
      } catch (error) {
        console.error(`Failed: ${message.id}`, error);
        // 不确认 - 将独立重试
      }
    }
  },
};

加载 references/error-catalog.md 获取所有 10 个错误,包括 DLQ 配置、自动扩展问题、消息删除预防和详细解决方案。


常见使用案例

使用案例 1: 基本消息队列

何时: 简单的异步作业处理(电子邮件、通知)

快速模式:

// 生产者
await env.MY_QUEUE.send({ type: 'email', to: 'user@example.com' });

// 消费者(隐式确认 - 用于幂等操作)
export default {
  async queue(batch: MessageBatch): Promise<void> {
    for (const message of batch.messages) {
      await sendEmail(message.body.to, message.body.content);
    }
  },
};

加载: templates/queues-producer.ts + templates/queues-consumer-basic.ts

使用案例 2: 数据库写入(非幂等)

何时: 写入数据库,必须避免重复

加载: templates/queues-consumer-explicit-ack.ts + references/consumer-api.md

使用案例 3: 带指数退避的重试

何时: 调用速率限制的 API、临时失败

加载: templates/queues-retry-with-delay.ts + references/error-catalog.md(错误 #2, #3)

使用案例 4: 死信队列模式

何时: 生产系统,需要捕获永久失败的消息

加载: templates/queues-dlq-pattern.ts + references/setup-guide.md(步骤 4)

使用案例 5: 高吞吐量处理

何时: 处理每秒数千条消息

快速模式:

{
  "queues": {
    "consumers": [{
      "queue": "my-queue",
      "max_batch_size": 100,        // 大批次
      "max_batch_timeout": 5,       // 快速处理
      "max_concurrency": null       // 自动扩展
    }]
  }
}

加载: references/best-practices.md → 优化吞吐量


何时加载参考

加载 references/setup-guide.md:

  • 用户需要完整的设置步骤(队列 → 生产者 → 消费者 → DLQ)
  • 第一次设置 Cloudflare Queues
  • 需要生产配置示例
  • 想要完整的端到端示例

加载 references/error-catalog.md:

  • 遇到任何 10 个记录的错误
  • 故障排除队列问题
  • 消息未传递/处理
  • 需要预防清单

加载 references/producer-api.md:

  • 需要完整的生产者 API 参考
  • 使用 send() 或 sendBatch() 方法
  • 需要消息格式规范
  • 处理大消息或批次

加载 references/consumer-api.md:

  • 需要完整的消费者 API 参考
  • 使用显式 ack(), retry(), 或 retryAll()
  • 需要批处理模式
  • 实现复杂的消费者逻辑

加载 references/best-practices.md:

  • 优化队列性能
  • 生产部署指南
  • 监控和可观测性设置
  • 安全和可靠性模式

加载 references/wrangler-commands.md:

  • 需要 CLI 命令参考
  • 管理队列(创建、删除、列表)
  • 控制传递(暂停、恢复)
  • 调试队列问题
  • 实时监控和性能分析

加载 references/typescript-types.md:

  • 需要完整的 TypeScript 类型定义
  • 使用 Queue、MessageBatch 或 Message 接口
  • 实现类型安全的生产者或消费者
  • 使用泛型类型作为消息体
  • 需要类型守卫进行消息验证

加载 references/production-checklist.md:

  • 准备生产部署
  • 需要部署前验证清单
  • 想要生产最佳实践的详细解释
  • 设置监控、DLQ 或错误处理
  • 计划负载测试或安全审查

加载 references/pull-consumers.md:

  • 需要从非 Workers 环境消费消息
  • 与现有后端服务集成(Node.js、Python、Go)
  • 实现基于拉取的轮询而不是基于推的消费
  • 使用容器化或传统应用程序

加载 references/http-publishing.md:

  • 通过 HTTP 从外部系统发布消息
  • 集成第三方服务的 Webhooks
  • 需要发送消息而无需部署 Workers
  • 实现 CI/CD 管道通知

加载 references/r2-event-integration.md:

  • 在 R2 存储桶事件上触发队列消息
  • 实现自动化的图像/文档处理
  • 设置事件驱动的数据管道
  • 需要 R2 对象上传/删除通知

代理与命令

可用代理:

  • queue-debugger - 9 阶段诊断分析队列问题(系统化故障排除)
  • queue-optimizer - 性能调优和成本优化(批次大小、并发、重试)

可用命令:

  • /queue-setup - 交互式向导用于完整队列设置
  • /queue-troubleshoot - 快速诊断常见问题
  • /queue-monitor - 实时指标和状态显示

限制与配额

关键限制:

  • 消息大小: 每消息最大 128 KB
  • 吞吐量: 每队列 5,000 条消息/秒
  • 批次大小: 每 sendBatch() 最大 100 条消息
  • 消费者 CPU 时间: 30 秒(默认),300 秒(最大配置)
  • 最大重试次数: 可配置(默认 3)
  • 队列名称: 最大 63 个字符,小写/数字/连字符

加载 references/best-practices.md 获取处理限制和优化策略。


配置参考

生产者: 将队列绑定添加到 wrangler.jsonc 的 queues.producers 数组,包含 bindingqueue 字段。

消费者: 在 wrangler.jsonc 的 queues.consumers 数组中配置,包含 queuemax_batch_size(1-100)、max_batch_timeout(0-60s)、max_retriesdead_letter_queue,可选 max_concurrency(默认:自动扩展)。

CPU 限制: 如果处理时间更长,增加 limits.cpu_ms 从默认 30,000ms。

加载 references/setup-guide.md 获取完整的配置示例和 templates/wrangler-queues-config.jsonc 获取生产就绪配置。


使用捆绑资源

参考 (references/)

模板 (templates/)

  • queues-producer.ts - 基本生产者,包含单条和批量发送
  • queues-consumer-basic.ts - 隐式确认消费者(幂等操作)
  • queues-consumer-explicit-ack.ts - 显式确认消费者(非幂等)
  • queues-retry-with-delay.ts - 指数退避重试模式
  • queues-dlq-pattern.ts - 死信队列设置和消费者
  • wrangler-queues-config.jsonc - 完整生产配置

TypeScript 类型

使用 @cloudflare/workers-types 包获取完整的类型定义:QueueMessageBatch<Body>Message<Body>QueueSendOptions

加载 references/typescript-types.md 获取完整的类型参考,包括接口、泛型、类型守卫和使用示例。


监控与调试

关键命令: wrangler queues info(状态)、wrangler tail(日志)、wrangler queues pause-delivery/resume-delivery(控制)。

加载 references/wrangler-commands.md 获取完整的 CLI 参考,包括实时监控、调试工作流和性能分析命令。


生产清单

12 点部署前清单: DLQ 配置、消息确认策略、大小验证、批次优化、并发设置、CPU 限制、错误处理、监控、速率限制、幂等性、负载测试和安全审查。

加载 references/production-checklist.md 获取完整的清单,包括详细解释、代码示例和部署工作流。


相关技能

  • cloudflare-worker-base - Worker 设置和配置
  • cloudflare-d1 - 队列消费者的数据库集成
  • cloudflare-r2 - 存储大消息负载
  • cloudflare-workflows - 更复杂的编排需求

官方文档


有问题?遇到问题?

  1. 检查 references/error-catalog.md 获取所有 10 个错误和解决方案
  2. 查看 references/setup-guide.md 获取完整的设置步骤
  3. 参见 references/best-practices.md 获取生产模式
  4. 检查官方文档: https://developers.cloudflare.com/queues/