消息队列应用技能Skill using-message-queues

该技能涉及使用消息代理和任务队列实现异步通信模式,专为构建事件驱动系统、处理后台作业或实现服务解耦而设计。涵盖Kafka、RabbitMQ、NATS、Redis Streams、Celery、BullMQ和Temporal等技术,适用于微服务架构、数据分析管道和复杂工作流编排。关键词:消息队列、异步通信、事件驱动、服务解耦、后台作业、任务队列、事件流、工作流编排、微服务、Kafka、RabbitMQ、NATS、Redis、Celery、BullMQ、Temporal。

架构设计 0 次安装 0 次浏览 更新于 3/23/2026

名称:使用消息队列 描述:使用消息代理和任务队列实现异步通信模式。适用于构建事件驱动系统、后台作业处理或服务解耦。涵盖Kafka(事件流)、RabbitMQ(复杂路由)、NATS(云原生)、Redis Streams、Celery(Python)、BullMQ(TypeScript)、Temporal(工作流)和事件源模式。

消息队列

为事件驱动架构、后台作业处理和服务解耦实现异步通信模式。

何时使用此技能

在以下情况下使用消息队列:

  • 长时间运行的操作阻塞HTTP请求(报告生成、视频处理)
  • 需要服务解耦(微服务、事件驱动架构)
  • 需要保证交付(支付处理、订单履行)
  • 事件流用于分析(日志聚合、指标管道)
  • 工作流程编排用于复杂过程(多步骤Saga、人工介入)
  • 后台作业处理(邮件发送、图像调整)

代理选择决策树

根据主要需求选择消息代理:

事件流 / 日志聚合

→ Apache Kafka

  • 吞吐量:500K-1M 条消息/秒
  • 重放事件(事件源)
  • 精确一次语义
  • 长期保留
  • 使用:分析管道、CQRS、事件源

简单后台作业

→ 任务队列

  • Python → Celery + Redis
  • TypeScript → BullMQ + Redis
  • Go → Asynq + Redis
  • 使用:邮件发送、报告生成、Webhook

复杂工作流 / Saga

→ Temporal

  • 持久执行(重启后恢复)
  • 支持Saga模式
  • 人工介入工作流
  • 使用:订单处理、AI代理编排

请求-回复 / RPC模式

→ NATS

  • 内置请求-回复
  • 亚毫秒延迟
  • 云原生、操作简单
  • 使用:微服务RPC、物联网命令/控制

复杂消息路由

→ RabbitMQ

  • 交换机(直连、主题、扇出、头)
  • 死信交换机
  • 消息TTL、优先级
  • 使用:多消费者模式、发布/订阅

已使用Redis

→ Redis Streams

  • 无需新基础设施
  • 简单消费者组
  • 中等吞吐量(100K+ 条消息/秒)
  • 使用:通知队列、简单作业队列

性能比较

代理 吞吐量 延迟(p99) 最佳用途
Kafka 500K-1M 条消息/秒 10-50ms 事件流
NATS JetStream 200K-400K 条消息/秒 亚毫秒到5ms 云原生微服务
RabbitMQ 50K-100K 条消息/秒 5-20ms 任务队列、复杂路由
Redis Streams 100K+ 条消息/秒 亚毫秒 简单队列、缓存

快速入门示例

Kafka 生产者/消费者(Python)

参见 examples/kafka-python/ 获取工作代码。

from confluent_kafka import Producer, Consumer

# 生产者
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()

# 消费者
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is not None:
        process_order(msg.value())

Celery 后台作业(Python)

参见 examples/celery-image-processing/ 获取完整实现。

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        result = expensive_image_processing(image_url)
        return result
    except RecoverableError as e:
        raise self.retry(exc=e, countdown=60)

BullMQ 作业处理(TypeScript)

参见 examples/bullmq-webhook-processor/ 获取完整实现。

import { Queue, Worker } from 'bullmq'

const queue = new Queue('webhooks', {
  connection: { host: 'localhost', port: 6379 }
})

// 入队作业
await queue.add('send-webhook', {
  url: 'https://example.com/webhook',
  payload: { event: 'order.created' }
})

// 处理作业
const worker = new Worker('webhooks', async job => {
  await fetch(job.data.url, {
    method: 'POST',
    body: JSON.stringify(job.data.payload)
  })
}, { connection: { host: 'localhost', port: 6379 } })

Temporal 工作流程编排

参见 examples/temporal-order-saga/ 获取Saga模式实现。

from temporalio import workflow, activity
from datetime import timedelta

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # 步骤1:保留库存
        inventory_id = await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )

        # 步骤2:收费支付
        payment_id = await workflow.execute_activity(
            charge_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return f"订单 {order_id} 完成"

核心模式

事件命名约定

使用:域.实体.操作.版本

示例:

  • order.created.v1
  • user.profile.updated.v2
  • payment.failed.v1

事件模式结构

{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}

死信队列模式

在最大重试后将失败消息路由到死信队列(DLQ):

@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
    try:
        result = perform_processing(order_id)
        return result
    except UnrecoverableError as e:
        send_to_dlq(order_id, str(e))
        raise Reject(e, requeue=False)

幂等性用于精确一次处理

@app.post("/process")
async def process_payment(
    payment_data: dict,
    idempotency_key: str = Header(None)
):
    # 检查是否已处理
    cached_result = redis_client.get(f"idempotency:{idempotency_key}")
    if cached_result:
        return {"status": "already_processed"}

    result = process_payment_logic(payment_data)
    redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
    return {"status": "processed", "result": result}

前端集成

通过SSE更新作业状态

# FastAPI端点用于实时作业状态
@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
    async def event_generator():
        while True:
            task = celery_app.AsyncResult(task_id)

            if task.state == 'PROGRESS':
                yield {"event": "progress", "data": task.info.get('progress', 0)}
            elif task.state == 'SUCCESS':
                yield {"event": "complete", "data": task.result}
                break

            await asyncio.sleep(0.5)

    return EventSourceResponse(event_generator())

React组件

export function JobStatus({ jobId }: { jobId: string }) {
  const [progress, setProgress] = useState(0)

  useEffect(() => {
    const eventSource = new EventSource(`/api/status/${jobId}`)

    eventSource.addEventListener('progress', (e) => {
      setProgress(JSON.parse(e.data))
    })

    eventSource.addEventListener('complete', (e) => {
      toast({ title: '作业完成', description: JSON.parse(e.data) })
      eventSource.close()
    })

    return () => eventSource.close()
  }, [jobId])

  return <ProgressBar value={progress} />
}

详细指南

有关全面文档,请参阅参考文件:

代理特定指南

  • Kafka:参见 references/kafka.md 获取分区、消费者组、精确一次语义
  • RabbitMQ:参见 references/rabbitmq.md 获取交换机、绑定、路由模式
  • NATS:参见 references/nats.md 获取JetStream、请求-回复模式
  • Redis Streams:参见 references/redis-streams.md 获取消费者组、确认

任务队列指南

  • Celery:参见 references/celery.md 获取周期性任务、画布(工作流)、监控
  • BullMQ:参见 references/bullmq.md 获取作业优先级、流、Bull Board监控
  • Temporal:参见 references/temporal-workflows.md 获取Saga模式、信号、查询

模式指南

  • 事件模式:参见 references/event-patterns.md 获取事件源、CQRS、出箱模式

避免的常见反模式

1. 使用同步API处理长时间操作

# ❌ 不好:阻塞请求线程
@app.post("/generate-report")
def generate_report(user_id: str):
    report = expensive_computation(user_id)  # 5分钟!
    return report

# ✅ 好:入队后台作业
@app.post("/generate-report")
async def generate_report(user_id: str):
    task = generate_report_task.delay(user_id)
    return {"task_id": task.id}

2. 非幂等消费者

# ❌ 不好:处理重复项
@app.task
def send_email(email: str):
    send_email_service(email)  # 如果重试,发送两次!

# ✅ 好:具有去重的幂等性
@app.task
def send_email(email: str, idempotency_key: str):
    if redis.exists(f"sent:{idempotency_key}"):
        return "already_sent"
    send_email_service(email)
    redis.setex(f"sent:{idempotency_key}", 86400, "1")

3. 忽略死信队列

# ❌ 不好:失败消息永远丢失
@app.task(max_retries=3)
def risky_task(data):
    process(data)  # 如果所有重试失败,数据消失

# ✅ 好:DLQ用于手动检查
@app.task(max_retries=3)
def risky_task(data):
    try:
        process(data)
    except Exception as e:
        if self.request.retries >= 3:
            send_to_dlq(data, str(e))
        raise

4. 使用Kafka进行请求-回复

# ❌ 不好:Kafka未设计用于RPC
def get_user_profile(user_id: str):
    kafka_producer.send("user_requests", {"user_id": user_id})
    # 如何关联响应?Kafka是异步的!

# ✅ 好:使用NATS请求-回复或HTTP/gRPC
response = await nats.request("user.profile", user_id.encode())

库推荐

Context7研究

Confluent Kafka(Python)

  • Context7 ID:/confluentinc/confluent-kafka-python
  • 信任分数:68.8/100
  • 代码片段:192+
  • 生产就绪的Python Kafka客户端

Temporal

  • Context7 ID:/websites/temporal_io
  • 信任分数:80.9/100
  • 代码片段:3,769+
  • 持久执行的工作流程编排

安装

Python:

pip install confluent-kafka celery[redis] temporalio aio-pika redis

TypeScript/Node.js:

npm install kafkajs bullmq @temporalio/client amqplib ioredis

Rust:

cargo add rdkafka lapin async-nats redis

Go:

go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk

实用工具

使用脚本进行设置自动化:

  • Kafka设置:运行 python scripts/kafka_producer_consumer.py 获取测试工具
  • 模式验证:运行 python scripts/validate_message_schema.py 验证事件模式

相关技能

  • api-patterns:异步作业提交的API设计
  • realtime-sync:WebSocket/SSE用于作业状态更新
  • feedback:Toast通知用于作业完成
  • databases-*:事件日志的持久存储
  • observability:队列操作的追踪和指标