RabbitMQ 模式
概览
RabbitMQ消息模式和分布式系统最佳实践的全面指南。
目录
RabbitMQ 概念
核心组件
# RabbitMQ 核心概念
"""
- 生产者:发送消息的应用程序
- 消费者:接收消息的应用程序
- 队列:存储消息的缓冲区
- 交换机:从生产者接收消息并将其路由到队列
- 绑定:交换机和队列之间的链接
- 路由键:交换机用来路由消息的键
- 连接:应用程序和RabbitMQ之间的TCP连接
- 通道:TCP连接内的虚拟连接
"""
基本连接(Node.js)
// rabbitmq-connection.ts
import amqp from 'amqplib';
export class RabbitMQConnection {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
async connect(url: string = 'amqp://localhost'): Promise<void> {
try {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
console.log('Connected to RabbitMQ');
} catch (error) {
console.error('Failed to connect to RabbitMQ:', error);
throw error;
}
}
getChannel(): amqp.Channel {
if (!this.channel) {
throw new Error('Channel not initialized');
}
return this.channel;
}
async close(): Promise<void> {
if (this.channel) {
await this.channel.close();
}
if (this.connection) {
await this.connection.close();
}
}
}
// 使用方法
const connection = new RabbitMQConnection();
await connection.connect('amqp://user:password@localhost:5672');
const channel = connection.getChannel();
基本连接(Python)
# rabbitmq_connection.py
import pika
import logging
class RabbitMQConnection:
def __init__(self, url: str = 'amqp://localhost'):
self.url = url
self.connection = None
self.channel = None
def connect(self):
"""Establish connection to RabbitMQ"""
try:
parameters = pika.URLParameters(self.url)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
logging.info("Connected to RabbitMQ")
return self.channel
except Exception as e:
logging.error(f"Failed to connect to RabbitMQ: {e}")
raise
def close(self):
"""Close connection"""
if self.channel:
self.channel.close()
if self.connection:
self.connection.close()
logging.info("RabbitMQ connection closed")
# 使用方法
connection = RabbitMQConnection('amqp://user:password@localhost:5672')
channel = connection.connect()
交换机类型
直连交换机
// direct-exchange.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class DirectExchange {
constructor(private connection: RabbitMQConnection) {}
async setup(exchangeName: string, queueNames: string[], routingKeys: string[]) {
const channel = this.connection.getChannel();
// 声明直连交换机
await channel.assertExchange(exchangeName, 'direct', { durable: true });
// 声明并绑定队列
for (let i = 0; i < queueNames.length; i++) {
await channel.assertQueue(queueNames[i], { durable: true });
await channel.bindQueue(queueNames[i], exchangeName, routingKeys[i]);
}
}
async publish(exchangeName: string, routingKey: string, message: any) {
const channel = this.connection.getChannel();
channel.publish(
exchangeName,
routingKey,
Buffer.from(JSON.stringify(message)),
{ persistent: true }
);
}
}
// 使用方法
const direct = new DirectExchange(connection);
await direct.setup('orders', ['order-processing', 'order-shipping'], ['process', 'ship']);
await direct.publish('orders', 'process', { orderId: 123 });
主题交换机
// topic-exchange.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class TopicExchange {
constructor(private connection: RabbitMQConnection) {}
async setup(exchangeName: string, bindings: Array<{ queue: string; pattern: string }>) {
const channel = this.connection.getChannel();
// 声明主题交换机
await channel.assertExchange(exchangeName, 'topic', { durable: true });
// 声明并绑定队列
for (const binding of bindings) {
await channel.assertQueue(binding.queue, { durable: true });
await channel.bindQueue(binding.queue, exchangeName, binding.pattern);
}
}
async publish(exchangeName: string, routingKey: string, message: any) {
const channel = this.connection.getChannel();
channel.publish(
exchangeName,
routingKey,
Buffer.from(JSON.stringify(message)),
{ persistent: true }
);
}
}
// 使用方法
const topic = new TopicExchange(connection);
await topic.setup('logs', [
{ queue: 'error-logs', pattern: '*.error' },
{ queue: 'all-logs', pattern: '#' }
]);
// 发布带有路由键的消息
await topic.publish('logs', 'app.error', { message: 'Error occurred' });
await topic.publish('logs', 'app.info', { message: 'Info message' });
扇形交换机
// fanout-exchange.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class FanoutExchange {
constructor(private connection: RabbitMQConnection) {}
async setup(exchangeName: string, queueNames: string[]) {
const channel = this.connection.getChannel();
// 声明扇形交换机
await channel.assertExchange(exchangeName, 'fanout', { durable: true });
// 声明并绑定队列(不需要路由键)
for (const queueName of queueNames) {
await channel.assertQueue(queueName, { durable: true });
await channel.bindQueue(queueName, exchangeName, '');
}
}
async publish(exchangeName: string, message: any) {
const channel = this.connection.getChannel();
channel.publish(
exchangeName,
'', // 扇形忽略路由键
Buffer.from(JSON.stringify(message)),
{ persistent: true }
);
}
}
// 使用方法
const fanout = new FanoutExchange(connection);
await fanout.setup('notifications', ['email-queue', 'sms-queue', 'push-queue']);
await fanout.publish('notifications', { userId: 123, message: 'Hello!' });
头交换机
// headers-exchange.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class HeadersExchange {
constructor(private connection: RabbitMQConnection) {}
async setup(exchangeName: string, bindings: Array<{ queue: string; headers: any }>) {
const channel = this.connection.getChannel();
// 声明头交换机
await channel.assertExchange(exchangeName, 'headers', { durable: true });
// 声明并绑定队列
for (const binding of bindings) {
await channel.assertQueue(binding.queue, { durable: true });
await channel.bindQueue(binding.queue, exchangeName, '', binding.headers);
}
}
async publish(exchangeName: string, message: any, headers: any) {
const channel = this.connection.getChannel();
channel.publish(
exchangeName,
'',
Buffer.from(JSON.stringify(message)),
{ persistent: true, headers }
);
}
}
// 使用方法
const headers = new HeadersExchange(connection);
await headers.setup('priority', [
{ queue: 'high-priority', headers: { 'x-match': 'all', priority: 'high' } },
{ queue: 'low-priority', headers: { 'x-match': 'all', priority: 'low' } }
]);
await headers.publish('priority', { data: 'test' }, { priority: 'high' });
消息模式
工作队列(竞争消费者)
// work-queue.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class WorkQueue {
constructor(private connection: RabbitMQConnection) {}
async setup(queueName: string) {
const channel = this.connection.getChannel();
await channel.assertQueue(queueName, { durable: true });
await channel.prefetch(1); // 公平分发
}
async publish(queueName: string, task: any) {
const channel = this.connection.getChannel();
channel.sendToQueue(
queueName,
Buffer.from(JSON.stringify(task)),
{ persistent: true }
);
}
async consume(queueName: string, handler: (task: any) => Promise<void>) {
const channel = this.connection.getChannel();
await channel.consume(
queueName,
async (msg) => {
if (msg) {
try {
const task = JSON.parse(msg.content.toString());
await handler(task);
channel.ack(msg);
} catch (error) {
console.error('Error processing task:', error);
channel.nack(msg, false, true); // 重新入队
}
}
},
{ noAck: false }
);
}
}
// 使用方法
const workQueue = new WorkQueue(connection);
await workQueue.setup('tasks');
// 生产者
await workQueue.publish('tasks', { id: 1, data: 'Process me' });
// 消费者
await workQueue.consume('tasks', async (task) => {
console.log('Processing task:', task.id);
await processTask(task);
});
发布/订阅
// pub-sub.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class PubSub {
constructor(private connection: RabbitMQConnection) {}
async setup(exchangeName: string, queueNames: string[]) {
const channel = this.connection.getChannel();
await channel.assertExchange(exchangeName, 'fanout', { durable: true });
for (const queueName of queueNames) {
await channel.assertQueue(queueName, { durable: false }); // 临时队列非持久化
await channel.bindQueue(queueName, exchangeName, '');
}
}
async publish(exchangeName: string, message: any) {
const channel = this.connection.getChannel();
channel.publish(exchangeName, '', Buffer.from(JSON.stringify(message)));
}
async subscribe(queueName: string, handler: (message: any) => Promise<void>) {
const channel = this.connection.getChannel();
await channel.consume(queueName, async (msg) => {
if (msg) {
const message = JSON.parse(msg.content.toString());
await handler(message);
channel.ack(msg);
}
});
}
}
// 使用方法
const pubsub = new PubSub(connection);
await pubsub.setup('news', ['sports-queue', 'tech-queue', 'weather-queue']);
// 发布者
await pubsub.publish('news', { category: 'tech', title: 'New AI breakthrough' });
// 订阅者
await pubsub.subscribe('tech-queue', async (msg) => {
console.log('Tech news:', msg.title);
});
路由模式
// routing-pattern.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class RoutingPattern {
constructor(private connection: RabbitMQConnection) {}
async setup(exchangeName: string, bindings: Array<{ queue: string; routingKey: string }>) {
const channel = this.connection.getChannel();
await channel.assertExchange(exchangeName, 'direct', { durable: true });
for (const binding of bindings) {
await channel.assertQueue(binding.queue, { durable: true });
await channel.bindQueue(binding.queue, exchangeName, binding.routingKey);
}
}
async publish(exchangeName: string, routingKey: string, message: any) {
const channel = this.connection.getChannel();
channel.publish(
exchangeName,
routingKey,
Buffer.from(JSON.stringify(message))
);
}
async consume(queueName: string, handler: (message: any) => Promise<void>) {
const channel = this.connection.getChannel();
await channel.consume(queueName, async (msg) => {
if (msg) {
const message = JSON.parse(msg.content.toString());
await handler(message);
channel.ack(msg);
}
});
}
}
// 使用方法
const routing = new RoutingPattern(connection);
await routing.setup('logs', [
{ queue: 'error-queue', routingKey: 'error' },
{ queue: 'info-queue', routingKey: 'info' },
{ queue: 'warning-queue', routingKey: 'warning' }
]);
// 发布者
await routing.publish('logs', 'error', { message: 'Critical error' });
await routing.publish('logs', 'info', { message: 'Info message' });
RPC(远程过程调用)
// rpc-pattern.ts
import { RabbitMQConnection } from './rabbitmq-connection';
import { v4 as uuidv4 } from 'uuid';
export class RPCServer {
constructor(private connection: RabbitMQConnection) {}
async setup(queueName: string, handler: (request: any) => Promise<any>) {
const channel = this.connection.getChannel();
await channel.assertQueue(queueName, { durable: false });
await channel.prefetch(1);
await channel.consume(queueName, async (msg) => {
if (msg) {
const request = JSON.parse(msg.content.toString());
const correlationId = msg.properties.correlationId;
const replyTo = msg.properties.replyTo;
try {
const response = await handler(request);
channel.sendToQueue(
replyTo,
Buffer.from(JSON.stringify(response)),
{ correlationId }
);
channel.ack(msg);
} catch (error) {
channel.nack(msg, false, false);
}
}
});
}
}
export class RPCClient {
private correlationMap = new Map<string, { resolve: any; reject: any }>();
private replyQueue: string;
constructor(private connection: RabbitMQConnection) {}
async initialize() {
const channel = this.connection.getChannel();
const replyQueue = await channel.assertQueue('', { exclusive: true });
this.replyQueue = replyQueue.queue;
await channel.consume(this.replyQueue, (msg) => {
if (msg) {
const correlationId = msg.properties.correlationId;
const callback = this.correlationMap.get(correlationId);
if (callback) {
callback.resolve(JSON.parse(msg.content.toString()));
this.correlationMap.delete(correlationId);
}
channel.ack(msg);
}
});
}
async call(queueName: string, request: any, timeout: number = 5000): Promise<any> {
const channel = this.connection.getChannel();
const correlationId = uuidv4();
return new Promise((resolve, reject) => {
this.correlationMap.set(correlationId, { resolve, reject });
const timer = setTimeout(() => {
this.correlationMap.delete(correlationId);
reject(new Error('RPC timeout'));
}, timeout);
channel.sendToQueue(
queueName,
Buffer.from(JSON.stringify(request)),
{
correlationId,
replyTo: this.replyQueue
}
);
});
}
}
// 使用方法
// 服务器
const server = new RPCServer(connection);
await server.setup('rpc_queue', async (request) => {
return { result: `Processed: ${request.data}` };
});
// 客户端
const client = new RPCClient(connection);
await client.initialize();
const response = await client.call('rpc_queue', { data: 'test' });
console.log(response);
生产者模式
可靠的发布者
// reliable-publisher.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class ReliablePublisher {
private confirmChannel: amqp.ConfirmChannel;
constructor(private connection: RabbitMQConnection) {}
async setup() {
const channel = this.connection.getChannel();
this.confirmChannel = channel as amqp.ConfirmChannel;
await this.confirmChannel.confirmChannel();
}
async publishWithConfirm(
exchangeName: string,
routingKey: string,
message: any
): Promise<boolean> {
return new Promise((resolve, reject) => {
this.confirmChannel.publish(
exchangeName,
routingKey,
Buffer.from(JSON.stringify(message)),
{ persistent: true },
(err) => {
if (err) {
reject(err);
} else {
resolve(true);
}
}
);
});
}
async publishWithRetry(
exchangeName: string,
routingKey: string,
message: any,
maxRetries: number = 3
): Promise<void> {
let lastError: Error | null = null;
for (let i = 0; i < maxRetries; i++) {
try {
await this.publishWithConfirm(exchangeName, routingKey, message);
return;
} catch (error) {
lastError = error as Error;
console.error(`Publish attempt ${i + 1} failed:`, error);
await this.delay(1000 * (i + 1)); // 指数退避
}
}
throw lastError;
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
事务性发布者
// transactional-publisher.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class TransactionalPublisher {
constructor(private connection: RabbitMQConnection) {}
async publishInTransaction(
exchangeName: string,
routingKey: string,
messages: any[]
): Promise<void> {
const channel = this.connection.getChannel();
try {
await channel.selectMode(); // 开始事务
// 发布所有消息
for (const message of messages) {
channel.publish(
exchangeName,
routingKey,
Buffer.from(JSON.stringify(message))
);
}
await channel.commitTx(); // 提交事务
} catch (error) {
await channel.rollbackTx(); // 出错回滚
throw error;
}
}
}
消费者模式
批量消费者
// batch-consumer.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class BatchConsumer {
private batch: any[] = [];
private timer: NodeJS.Timeout | null = null;
constructor(
private connection: RabbitMQConnection,
private batchSize: number = 10,
private batchTimeout: number = 5000
) {}
async consume(queueName: string, handler: (batch: any[]) => Promise<void>) {
const channel = this.connection.getChannel();
await channel.prefetch(this.batchSize);
await channel.consume(queueName, async (msg) => {
if (msg) {
this.batch.push(JSON.parse(msg.content.toString()));
if (this.batch.length >= this.batchSize) {
await this.processBatch(handler);
} else if (!this.timer) {
this.timer = setTimeout(() => this.processBatch(handler), this.batchTimeout);
}
}
});
}
private async processBatch(handler: (batch: any[]) => Promise<void>) {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
if (this.batch.length === 0) return;
const batch = [...this.batch];
this.batch = [];
try {
await handler(batch);
} catch (error) {
console.error('Error processing batch:', error);
// 处理批量失败
}
}
}
速率限制消费者
// rate-limited-consumer.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class RateLimitedConsumer {
private lastProcessTime = 0;
constructor(
private connection: RabbitMQConnection,
private rateLimit: number // 每秒消息数
) {}
async consume(queueName: string, handler: (message: any) => Promise<void>) {
const channel = this.connection.getChannel();
const interval = 1000 / this.rateLimit;
await channel.consume(queueName, async (msg) => {
if (msg) {
const now = Date.now();
const elapsed = now - this.lastProcessTime;
if (elapsed < interval) {
await this.delay(interval - elapsed);
}
try {
const message = JSON.parse(msg.content.toString());
await handler(message);
channel.ack(msg);
this.lastProcessTime = Date.now();
} catch (error) {
console.error('Error processing message:', error);
channel.nack(msg, false, true);
}
}
});
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
死信队列
DLX 设置
// dead-letter-queue.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class DeadLetterQueue {
constructor(private connection: RabbitMQConnection) {}
async setup(
queueName: string,
dlxName: string,
dlqName: string,
maxRetries: number = 3
) {
const channel = this.connection.getChannel();
// 声明死信交换机
await channel.assertExchange(dlxName, 'direct', { durable: true });
// 声明死信队列
await channel.assertQueue(dlqName, { durable: true });
await channel.bindQueue(dlqName, dlxName, '');
// 声明主队列并设置DLX参数
await channel.assertQueue(queueName, {
durable: true,
arguments: {
'x-dead-letter-exchange': dlxName,
'x-dead-letter-routing-key': '',
'x-max-retries': maxRetries
}
});
}
async publish(queueName: string, message: any) {
const channel = this.connection.getChannel();
channel.publish(
'',
queueName,
Buffer.from(JSON.stringify(message)),
{
persistent: true,
headers: {
'x-retry-count': 0
}
}
);
}
async consumeDLQ(dlqName: string, handler: (message: any) => Promise<void>) {
const channel = this.connection.getChannel();
await channel.consume(dlqName, async (msg) => {
if (msg) {
const message = JSON.parse(msg.content.toString());
const retryCount = msg.properties.headers?.['x-retry-count'] || 0;
await handler({ ...message, retryCount });
channel.ack(msg);
}
});
}
}
// 使用方法
const dlq = new DeadLetterQueue(connection);
await dlq.setup('tasks', 'tasks-dlx', 'tasks-dlq', 3);
消息确认
手动确认
// manual-ack.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class ManualAckConsumer {
constructor(private connection: RabbitMQConnection) {}
async consume(queueName: string, handler: (message: any) => Promise<boolean>) {
const channel = this.connection.getChannel();
await channel.prefetch(10); // 限制未确认消息
await channel.consume(queueName, async (msg) => {
if (msg) {
try {
const message = JSON.parse(msg.content.toString());
const success = await handler(message);
if (success) {
channel.ack(msg); // 确认处理成功
} else {
channel.nack(msg, false, true); // 重新入队消息
}
} catch (error) {
console.error('Error processing message:', error);
channel.nack(msg, false, false); // 不重新入队,发送到DLQ
}
}
});
}
}
持久化
持久化队列和消息
// persistence.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class PersistentMessaging {
constructor(private connection: RabbitMQConnection) {}
async setupDurableQueue(queueName: string) {
const channel = this.connection.getChannel();
await channel.assertQueue(queueName, { durable: true });
}
async publishPersistent(queueName: string, message: any) {
const channel = this.connection.getChannel();
channel.sendToQueue(
queueName,
Buffer.from(JSON.stringify(message)),
{
persistent: true, // 消息在代理重启后仍然存在
deliveryMode: 2 // 持久化传输模式
}
);
}
}
错误处理
错误处理策略
// error-handling.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class ErrorHandler {
private errorCount = 0;
private maxErrors = 5;
private backoffTime = 1000;
async consumeWithRetry(
queueName: string,
handler: (message: any) => Promise<void>
) {
const channel = this.connection.getChannel();
await channel.consume(queueName, async (msg) => {
if (msg) {
try {
const message = JSON.parse(msg.content.toString());
await handler(message);
channel.ack(msg);
this.errorCount = 0; // 成功后重置
} catch (error) {
this.errorCount++;
console.error(`Error ${this.errorCount}:`, error);
if (this.errorCount >= this.maxErrors) {
// 达到最大重试次数,发送到DLQ
channel.nack(msg, false, false);
this.errorCount = 0;
} else {
// 带退避的重新入队
await this.delay(this.backoffTime * this.errorCount);
channel.nack(msg, false, true);
}
}
}
});
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
性能优化
连接池
// connection-pool.ts
import amqp from 'amqplib';
export class ConnectionPool {
private pool: amqp.Channel[] = [];
private maxSize: number;
constructor(maxSize: number = 10) {
this.maxSize = maxSize;
}
async getChannel(url: string): Promise<amqp.Channel> {
if (this.pool.length > 0) {
return this.pool.pop()!;
}
const connection = await amqp.connect(url);
return await connection.createChannel();
}
releaseChannel(channel: amqp.Channel) {
if (this.pool.length < this.maxSize) {
this.pool.push(channel);
} else {
channel.close();
}
}
}
监控
健康检查
// health-check.ts
import { RabbitMQConnection } from './rabbitmq-connection';
export class HealthCheck {
constructor(private connection: RabbitMQConnection) {}
async check(): Promise<boolean> {
try {
const channel = this.connection.getChannel();
await channel.checkExchange('amq.direct'); // 内置交换机
return true;
} catch (error) {
console.error('Health check failed:', error);
return false;
}
}
async getQueueInfo(queueName: string): Promise<any> {
const channel = this.connection.getChannel();
return await channel.checkQueue(queueName);
}
}
生产环境设置
Docker Compose
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- "5672:5672" # AMQP 端口
- "15672:15672" # 管理界面
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin123
RABBITMQ_DEFAULT_VHOST: /
volumes:
- rabbitmq_data:/var/lib/rabbitmq
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 30s
timeout: 10s
retries: 3
volumes:
rabbitmq_data:
配置最佳实践
// config.ts
export interface RabbitMQConfig {
url: string;
prefetch: number;
reconnectDelay: number;
maxRetries: number;
heartbeat: number;
}
export const defaultConfig: RabbitMQConfig = {
url: process.env.RABBITMQ_URL || 'amqp://localhost',
prefetch: 10,
reconnectDelay: 5000,
maxRetries: 10,
heartbeat: 60
};
附加资源
最佳实践
交换机选择
- 使用直连交换机:用于具有特定路由键的点对点路由
- 使用主题交换机:用于具有模式匹配的灵活路由
- 使用扇形交换机:用于向多个消费者广播
- 使用头交换机:用于基于消息头的复杂路由
- 文档化交换机用途:清晰记录每个交换机的用途
队列配置
- 使用持久化队列:用于必须在代理重启后存活的关键数据
- 设置适当的TTL:配置消息和队列的TTL以进行清理
- 使用独占队列:用于临时的、单消费者队列
- 配置自动删除:对于不再使用的队列进行删除
- 设置队列长度限制:防止内存耗尽
消息设计
- 保持消息小巧:优先考虑小于1MB的消息以获得最佳性能
- 包含消息元数据:添加头信息以用于路由和追踪
- 使用一致的模式:定义消息格式和版本
- 设计幂等性:消息可能会被多次传递
- 添加关联ID:用于请求/响应模式
消费者配置
- 使用手动确认:启用手动确认以获得更好的控制
- 设置适当的预取:限制每个消费者未确认的消息数
- 正确处理确认:始终确认或否定确认消息
- 实现优雅关闭:停止消费并正确关闭连接
- 监控消费者健康:跟踪消息处理速率
生产者配置
- 使用发布者确认:确保消息被代理接收
- 设置持久化传输:对于必须在重启后存活的关键消息
- 实现重试逻辑:处理瞬时故障
- 使用连接池:重用连接以获得更好的性能
- 批量消息时可能:减少网络开销
死信处理
- 始终使用DLX:将失败的消息路由到死信队列
- 配置重试策略:设置最大重试次数和退避
- 监控DLQ大小:对不断增长的死信队列发出警报
- 处理DLQ消息:分析和重新处理失败的消息
- 文档化DLQ处理:清晰的处理失败消息的程序
性能优化
- 使用多个队列:跨队列分配负载
- 配置适当的预取:在吞吐量和公平性之间取得平衡
- 使用连接池:限制TCP连接的数量
- 启用压缩:对于大型消息负载
- 监控资源使用:跟踪内存、CPU和磁盘I/O
安全
- 启用认证:使用用户名/密码或证书
- 配置TLS加密:在生产环境中加密连接
- 使用虚拟主机:隔离不同的应用程序
- 设置用户权限:限制对特定资源的访问
- 审计访问日志:监控谁在访问什么
监控和可观测性
- 跟踪队列深度:监控队列中的消息数量
- 监控消费者滞后:跟踪未处理的消息
- 收集代理指标:使用管理API或插件
- 设置警报:对队列深度、消费者故障或代理问题发出警报
- 使用分布式追踪:跨服务关联消息
高可用性
- 使用集群:设置RabbitMQ集群以实现高可用性
- 配置队列镜像:跨集群节点复制队列
- 使用负载均衡器:跨集群节点分配连接
- 测试故障转移场景:验证自动恢复是否有效
- 监控集群健康:跟踪节点状态和同步
清单
设置和配置
- [ ] 安装并配置RabbitMQ代理
- [ ] 启用管理插件以进行监控
- [ ] 配置认证和授权
- [ ] 设置TLS加密以实现安全连接
- [ ] 配置集群以实现高可用性
交换机设置
- [ ] 为每个用例选择适当的交换机类型
- [ ] 文档化交换机用途和路由规则
- [ ] 为关键数据配置持久化交换机
- [ ] 如有需要,设置交换机到交换机的绑定
- [ ] 测试交换机路由行为
队列设置
- [ ] 为关键数据配置持久化队列
- [ ] 设置适当的TTL用于消息和队列
- [ ] 为失败消息配置死信交换机
- [ ] 设置队列长度限制以防止内存问题
- [ ] 文档化队列用途和使用
生产者配置
- [ ] 启用发布者确认
- [ ] 为关键消息设置持久化传输
- [ ] 实现带有退避的重试逻辑
- [ ] 使用连接池
- [ ] 添加消息元数据和关联ID
消费者配置
- [] 使用手动确认
- [] 设置适当的预取数量
- [] 实施优雅关闭处理
- [] 配置错误处理和DLQ路由
- [] 监控消费者健康和处理速率
死信队列设置
- [] 配置死信交换机
- [] 设置死信队列
- [] 配置重试策略(最大重试次数、退避)
- [] 设置DLQ监控和警报
- [] 文档化DLQ处理程序
安全设置
- [] 启用认证(用户名/密码或证书)
- [] 配置TLS/SSL加密
- [] 设置虚拟主机以隔离
- [] 配置用户权限和ACL
- [] 启用审计日志
性能调整
- [] 调整消费者预取设置
- [] 配置连接池
- [] 为大型消息启用压缩
- [] 监控资源使用(内存、CPU、磁盘)
- [] 优化队列和交换机配置
高可用性
- [] 设置RabbitMQ集群
- [] 配置队列镜像
- [] 设置连接的负载均衡器
- [] 测试故障转移场景
- [] 监控集群健康和同步
监控和警报
- [] 启用管理插件
- [] 设置指标收集(Prometheus/Grafana)
- [] 配置队列深度警报
- [] 监控消费者滞后和处理速率
- [] 跟踪代理健康指标
测试
- [] 测试消息发布和消费
- [] 验证交换机路由行为
- [] 测试死信队列路由
- [] 验证确认行为
- [] 测试故障转移和恢复
文档化
- [] 文档化交换机和队列结构
- [] 文档化消息模式和格式
- [] 创建常见问题解答
- [] 文档化安全配置
- [] 维护API文档