名称:使用消息队列 描述:使用消息代理和任务队列实现异步通信模式。适用于构建事件驱动系统、后台作业处理或服务解耦。涵盖Kafka(事件流)、RabbitMQ(复杂路由)、NATS(云原生)、Redis Streams、Celery(Python)、BullMQ(TypeScript)、Temporal(工作流)和事件源模式。
消息队列
为事件驱动架构、后台作业处理和服务解耦实现异步通信模式。
何时使用此技能
在以下情况下使用消息队列:
- 长时间运行的操作阻塞HTTP请求(报告生成、视频处理)
- 需要服务解耦(微服务、事件驱动架构)
- 需要保证交付(支付处理、订单履行)
- 事件流用于分析(日志聚合、指标管道)
- 工作流程编排用于复杂过程(多步骤Saga、人工介入)
- 后台作业处理(邮件发送、图像调整)
代理选择决策树
根据主要需求选择消息代理:
事件流 / 日志聚合
→ Apache Kafka
- 吞吐量:500K-1M 条消息/秒
- 重放事件(事件源)
- 精确一次语义
- 长期保留
- 使用:分析管道、CQRS、事件源
简单后台作业
→ 任务队列
- Python → Celery + Redis
- TypeScript → BullMQ + Redis
- Go → Asynq + Redis
- 使用:邮件发送、报告生成、Webhook
复杂工作流 / Saga
→ Temporal
- 持久执行(重启后恢复)
- 支持Saga模式
- 人工介入工作流
- 使用:订单处理、AI代理编排
请求-回复 / RPC模式
→ NATS
- 内置请求-回复
- 亚毫秒延迟
- 云原生、操作简单
- 使用:微服务RPC、物联网命令/控制
复杂消息路由
→ RabbitMQ
- 交换机(直连、主题、扇出、头)
- 死信交换机
- 消息TTL、优先级
- 使用:多消费者模式、发布/订阅
已使用Redis
→ Redis Streams
- 无需新基础设施
- 简单消费者组
- 中等吞吐量(100K+ 条消息/秒)
- 使用:通知队列、简单作业队列
性能比较
| 代理 | 吞吐量 | 延迟(p99) | 最佳用途 |
|---|---|---|---|
| Kafka | 500K-1M 条消息/秒 | 10-50ms | 事件流 |
| NATS JetStream | 200K-400K 条消息/秒 | 亚毫秒到5ms | 云原生微服务 |
| RabbitMQ | 50K-100K 条消息/秒 | 5-20ms | 任务队列、复杂路由 |
| Redis Streams | 100K+ 条消息/秒 | 亚毫秒 | 简单队列、缓存 |
快速入门示例
Kafka 生产者/消费者(Python)
参见 examples/kafka-python/ 获取工作代码。
from confluent_kafka import Producer, Consumer
# 生产者
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()
# 消费者
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])
while True:
msg = consumer.poll(1.0)
if msg is not None:
process_order(msg.value())
Celery 后台作业(Python)
参见 examples/celery-image-processing/ 获取完整实现。
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
try:
result = expensive_image_processing(image_url)
return result
except RecoverableError as e:
raise self.retry(exc=e, countdown=60)
BullMQ 作业处理(TypeScript)
参见 examples/bullmq-webhook-processor/ 获取完整实现。
import { Queue, Worker } from 'bullmq'
const queue = new Queue('webhooks', {
connection: { host: 'localhost', port: 6379 }
})
// 入队作业
await queue.add('send-webhook', {
url: 'https://example.com/webhook',
payload: { event: 'order.created' }
})
// 处理作业
const worker = new Worker('webhooks', async job => {
await fetch(job.data.url, {
method: 'POST',
body: JSON.stringify(job.data.payload)
})
}, { connection: { host: 'localhost', port: 6379 } })
Temporal 工作流程编排
参见 examples/temporal-order-saga/ 获取Saga模式实现。
from temporalio import workflow, activity
from datetime import timedelta
@workflow.defn
class OrderSagaWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
# 步骤1:保留库存
inventory_id = await workflow.execute_activity(
reserve_inventory,
order_id,
start_to_close_timeout=timedelta(seconds=10),
)
# 步骤2:收费支付
payment_id = await workflow.execute_activity(
charge_payment,
order_id,
start_to_close_timeout=timedelta(seconds=30),
)
return f"订单 {order_id} 完成"
核心模式
事件命名约定
使用:域.实体.操作.版本
示例:
order.created.v1user.profile.updated.v2payment.failed.v1
事件模式结构
{
"event_type": "order.created.v2",
"event_id": "uuid-here",
"timestamp": "2025-12-02T10:00:00Z",
"version": "2.0",
"data": {
"order_id": "ord_123",
"customer_id": "cus_456"
},
"metadata": {
"producer": "order-service",
"trace_id": "abc123",
"correlation_id": "xyz789"
}
}
死信队列模式
在最大重试后将失败消息路由到死信队列(DLQ):
@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
try:
result = perform_processing(order_id)
return result
except UnrecoverableError as e:
send_to_dlq(order_id, str(e))
raise Reject(e, requeue=False)
幂等性用于精确一次处理
@app.post("/process")
async def process_payment(
payment_data: dict,
idempotency_key: str = Header(None)
):
# 检查是否已处理
cached_result = redis_client.get(f"idempotency:{idempotency_key}")
if cached_result:
return {"status": "already_processed"}
result = process_payment_logic(payment_data)
redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
return {"status": "processed", "result": result}
前端集成
通过SSE更新作业状态
# FastAPI端点用于实时作业状态
@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
async def event_generator():
while True:
task = celery_app.AsyncResult(task_id)
if task.state == 'PROGRESS':
yield {"event": "progress", "data": task.info.get('progress', 0)}
elif task.state == 'SUCCESS':
yield {"event": "complete", "data": task.result}
break
await asyncio.sleep(0.5)
return EventSourceResponse(event_generator())
React组件
export function JobStatus({ jobId }: { jobId: string }) {
const [progress, setProgress] = useState(0)
useEffect(() => {
const eventSource = new EventSource(`/api/status/${jobId}`)
eventSource.addEventListener('progress', (e) => {
setProgress(JSON.parse(e.data))
})
eventSource.addEventListener('complete', (e) => {
toast({ title: '作业完成', description: JSON.parse(e.data) })
eventSource.close()
})
return () => eventSource.close()
}, [jobId])
return <ProgressBar value={progress} />
}
详细指南
有关全面文档,请参阅参考文件:
代理特定指南
- Kafka:参见
references/kafka.md获取分区、消费者组、精确一次语义 - RabbitMQ:参见
references/rabbitmq.md获取交换机、绑定、路由模式 - NATS:参见
references/nats.md获取JetStream、请求-回复模式 - Redis Streams:参见
references/redis-streams.md获取消费者组、确认
任务队列指南
- Celery:参见
references/celery.md获取周期性任务、画布(工作流)、监控 - BullMQ:参见
references/bullmq.md获取作业优先级、流、Bull Board监控 - Temporal:参见
references/temporal-workflows.md获取Saga模式、信号、查询
模式指南
- 事件模式:参见
references/event-patterns.md获取事件源、CQRS、出箱模式
避免的常见反模式
1. 使用同步API处理长时间操作
# ❌ 不好:阻塞请求线程
@app.post("/generate-report")
def generate_report(user_id: str):
report = expensive_computation(user_id) # 5分钟!
return report
# ✅ 好:入队后台作业
@app.post("/generate-report")
async def generate_report(user_id: str):
task = generate_report_task.delay(user_id)
return {"task_id": task.id}
2. 非幂等消费者
# ❌ 不好:处理重复项
@app.task
def send_email(email: str):
send_email_service(email) # 如果重试,发送两次!
# ✅ 好:具有去重的幂等性
@app.task
def send_email(email: str, idempotency_key: str):
if redis.exists(f"sent:{idempotency_key}"):
return "already_sent"
send_email_service(email)
redis.setex(f"sent:{idempotency_key}", 86400, "1")
3. 忽略死信队列
# ❌ 不好:失败消息永远丢失
@app.task(max_retries=3)
def risky_task(data):
process(data) # 如果所有重试失败,数据消失
# ✅ 好:DLQ用于手动检查
@app.task(max_retries=3)
def risky_task(data):
try:
process(data)
except Exception as e:
if self.request.retries >= 3:
send_to_dlq(data, str(e))
raise
4. 使用Kafka进行请求-回复
# ❌ 不好:Kafka未设计用于RPC
def get_user_profile(user_id: str):
kafka_producer.send("user_requests", {"user_id": user_id})
# 如何关联响应?Kafka是异步的!
# ✅ 好:使用NATS请求-回复或HTTP/gRPC
response = await nats.request("user.profile", user_id.encode())
库推荐
Context7研究
Confluent Kafka(Python)
- Context7 ID:
/confluentinc/confluent-kafka-python - 信任分数:68.8/100
- 代码片段:192+
- 生产就绪的Python Kafka客户端
Temporal
- Context7 ID:
/websites/temporal_io - 信任分数:80.9/100
- 代码片段:3,769+
- 持久执行的工作流程编排
安装
Python:
pip install confluent-kafka celery[redis] temporalio aio-pika redis
TypeScript/Node.js:
npm install kafkajs bullmq @temporalio/client amqplib ioredis
Rust:
cargo add rdkafka lapin async-nats redis
Go:
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk
实用工具
使用脚本进行设置自动化:
- Kafka设置:运行
python scripts/kafka_producer_consumer.py获取测试工具 - 模式验证:运行
python scripts/validate_message_schema.py验证事件模式
相关技能
- api-patterns:异步作业提交的API设计
- realtime-sync:WebSocket/SSE用于作业状态更新
- feedback:Toast通知用于作业完成
- databases-*:事件日志的持久存储
- observability:队列操作的追踪和指标