RabbitMQPatterns rabbitmq-patterns

RabbitMQ Patterns 是一个关于RabbitMQ消息模式和分布式系统最佳实践的综合指南,涵盖了核心组件、交换机类型、消息模式、生产者和消费者模式等关键概念,是分布式消息队列领域的必备技能。

后端开发 0 次安装 0 次浏览 更新于 3/5/2026

RabbitMQ 模式

概览

RabbitMQ消息模式和分布式系统最佳实践的全面指南。

目录

  1. RabbitMQ 概念
  2. 交换机类型
  3. 消息模式
  4. 生产者模式
  5. 消费者模式
  6. 死信队列
  7. 消息确认
  8. 持久化
  9. 错误处理
  10. 性能优化
  11. 监控
  12. 生产环境设置

RabbitMQ 概念

核心组件

# RabbitMQ 核心概念
"""
- 生产者:发送消息的应用程序
- 消费者:接收消息的应用程序
- 队列:存储消息的缓冲区
- 交换机:从生产者接收消息并将其路由到队列
- 绑定:交换机和队列之间的链接
- 路由键:交换机用来路由消息的键
- 连接:应用程序和RabbitMQ之间的TCP连接
- 通道:TCP连接内的虚拟连接
"""

基本连接(Node.js)

// rabbitmq-connection.ts
import amqp from 'amqplib';

export class RabbitMQConnection {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;

  async connect(url: string = 'amqp://localhost'): Promise<void> {
    try {
      this.connection = await amqp.connect(url);
      this.channel = await this.connection.createChannel();
      console.log('Connected to RabbitMQ');
    } catch (error) {
      console.error('Failed to connect to RabbitMQ:', error);
      throw error;
    }
  }

  getChannel(): amqp.Channel {
    if (!this.channel) {
      throw new Error('Channel not initialized');
    }
    return this.channel;
  }

  async close(): Promise<void> {
    if (this.channel) {
      await this.channel.close();
    }
    if (this.connection) {
      await this.connection.close();
    }
  }
}

// 使用方法
const connection = new RabbitMQConnection();
await connection.connect('amqp://user:password@localhost:5672');
const channel = connection.getChannel();

基本连接(Python)

# rabbitmq_connection.py
import pika
import logging

class RabbitMQConnection:
    def __init__(self, url: str = 'amqp://localhost'):
        self.url = url
        self.connection = None
        self.channel = None
    
    def connect(self):
        """Establish connection to RabbitMQ"""
        try:
            parameters = pika.URLParameters(self.url)
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            logging.info("Connected to RabbitMQ")
            return self.channel
        except Exception as e:
            logging.error(f"Failed to connect to RabbitMQ: {e}")
            raise
    
    def close(self):
        """Close connection"""
        if self.channel:
            self.channel.close()
        if self.connection:
            self.connection.close()
        logging.info("RabbitMQ connection closed")

# 使用方法
connection = RabbitMQConnection('amqp://user:password@localhost:5672')
channel = connection.connect()

交换机类型

直连交换机

// direct-exchange.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class DirectExchange {
  constructor(private connection: RabbitMQConnection) {}

  async setup(exchangeName: string, queueNames: string[], routingKeys: string[]) {
    const channel = this.connection.getChannel();

    // 声明直连交换机
    await channel.assertExchange(exchangeName, 'direct', { durable: true });

    // 声明并绑定队列
    for (let i = 0; i < queueNames.length; i++) {
      await channel.assertQueue(queueNames[i], { durable: true });
      await channel.bindQueue(queueNames[i], exchangeName, routingKeys[i]);
    }
  }

  async publish(exchangeName: string, routingKey: string, message: any) {
    const channel = this.connection.getChannel();
    channel.publish(
      exchangeName,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    );
  }
}

// 使用方法
const direct = new DirectExchange(connection);
await direct.setup('orders', ['order-processing', 'order-shipping'], ['process', 'ship']);
await direct.publish('orders', 'process', { orderId: 123 });

主题交换机

// topic-exchange.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class TopicExchange {
  constructor(private connection: RabbitMQConnection) {}

  async setup(exchangeName: string, bindings: Array<{ queue: string; pattern: string }>) {
    const channel = this.connection.getChannel();

    // 声明主题交换机
    await channel.assertExchange(exchangeName, 'topic', { durable: true });

    // 声明并绑定队列
    for (const binding of bindings) {
      await channel.assertQueue(binding.queue, { durable: true });
      await channel.bindQueue(binding.queue, exchangeName, binding.pattern);
    }
  }

  async publish(exchangeName: string, routingKey: string, message: any) {
    const channel = this.connection.getChannel();
    channel.publish(
      exchangeName,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    );
  }
}

// 使用方法
const topic = new TopicExchange(connection);
await topic.setup('logs', [
  { queue: 'error-logs', pattern: '*.error' },
  { queue: 'all-logs', pattern: '#' }
]);

// 发布带有路由键的消息
await topic.publish('logs', 'app.error', { message: 'Error occurred' });
await topic.publish('logs', 'app.info', { message: 'Info message' });

扇形交换机

// fanout-exchange.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class FanoutExchange {
  constructor(private connection: RabbitMQConnection) {}

  async setup(exchangeName: string, queueNames: string[]) {
    const channel = this.connection.getChannel();

    // 声明扇形交换机
    await channel.assertExchange(exchangeName, 'fanout', { durable: true });

    // 声明并绑定队列(不需要路由键)
    for (const queueName of queueNames) {
      await channel.assertQueue(queueName, { durable: true });
      await channel.bindQueue(queueName, exchangeName, '');
    }
  }

  async publish(exchangeName: string, message: any) {
    const channel = this.connection.getChannel();
    channel.publish(
      exchangeName,
      '',  // 扇形忽略路由键
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    );
  }
}

// 使用方法
const fanout = new FanoutExchange(connection);
await fanout.setup('notifications', ['email-queue', 'sms-queue', 'push-queue']);
await fanout.publish('notifications', { userId: 123, message: 'Hello!' });

头交换机

// headers-exchange.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class HeadersExchange {
  constructor(private connection: RabbitMQConnection) {}

  async setup(exchangeName: string, bindings: Array<{ queue: string; headers: any }>) {
    const channel = this.connection.getChannel();

    // 声明头交换机
    await channel.assertExchange(exchangeName, 'headers', { durable: true });

    // 声明并绑定队列
    for (const binding of bindings) {
      await channel.assertQueue(binding.queue, { durable: true });
      await channel.bindQueue(binding.queue, exchangeName, '', binding.headers);
    }
  }

  async publish(exchangeName: string, message: any, headers: any) {
    const channel = this.connection.getChannel();
    channel.publish(
      exchangeName,
      '',
      Buffer.from(JSON.stringify(message)),
      { persistent: true, headers }
    );
  }
}

// 使用方法
const headers = new HeadersExchange(connection);
await headers.setup('priority', [
  { queue: 'high-priority', headers: { 'x-match': 'all', priority: 'high' } },
  { queue: 'low-priority', headers: { 'x-match': 'all', priority: 'low' } }
]);

await headers.publish('priority', { data: 'test' }, { priority: 'high' });

消息模式

工作队列(竞争消费者)

// work-queue.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class WorkQueue {
  constructor(private connection: RabbitMQConnection) {}

  async setup(queueName: string) {
    const channel = this.connection.getChannel();
    await channel.assertQueue(queueName, { durable: true });
    await channel.prefetch(1);  // 公平分发
  }

  async publish(queueName: string, task: any) {
    const channel = this.connection.getChannel();
    channel.sendToQueue(
      queueName,
      Buffer.from(JSON.stringify(task)),
      { persistent: true }
    );
  }

  async consume(queueName: string, handler: (task: any) => Promise<void>) {
    const channel = this.connection.getChannel();
    await channel.consume(
      queueName,
      async (msg) => {
        if (msg) {
          try {
            const task = JSON.parse(msg.content.toString());
            await handler(task);
            channel.ack(msg);
          } catch (error) {
            console.error('Error processing task:', error);
            channel.nack(msg, false, true);  // 重新入队
          }
        }
      },
      { noAck: false }
    );
  }
}

// 使用方法
const workQueue = new WorkQueue(connection);
await workQueue.setup('tasks');

// 生产者
await workQueue.publish('tasks', { id: 1, data: 'Process me' });

// 消费者
await workQueue.consume('tasks', async (task) => {
  console.log('Processing task:', task.id);
  await processTask(task);
});

发布/订阅

// pub-sub.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class PubSub {
  constructor(private connection: RabbitMQConnection) {}

  async setup(exchangeName: string, queueNames: string[]) {
    const channel = this.connection.getChannel();
    await channel.assertExchange(exchangeName, 'fanout', { durable: true });

    for (const queueName of queueNames) {
      await channel.assertQueue(queueName, { durable: false });  // 临时队列非持久化
      await channel.bindQueue(queueName, exchangeName, '');
    }
  }

  async publish(exchangeName: string, message: any) {
    const channel = this.connection.getChannel();
    channel.publish(exchangeName, '', Buffer.from(JSON.stringify(message)));
  }

  async subscribe(queueName: string, handler: (message: any) => Promise<void>) {
    const channel = this.connection.getChannel();
    await channel.consume(queueName, async (msg) => {
      if (msg) {
        const message = JSON.parse(msg.content.toString());
        await handler(message);
        channel.ack(msg);
      }
    });
  }
}

// 使用方法
const pubsub = new PubSub(connection);
await pubsub.setup('news', ['sports-queue', 'tech-queue', 'weather-queue']);

// 发布者
await pubsub.publish('news', { category: 'tech', title: 'New AI breakthrough' });

// 订阅者
await pubsub.subscribe('tech-queue', async (msg) => {
  console.log('Tech news:', msg.title);
});

路由模式

// routing-pattern.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class RoutingPattern {
  constructor(private connection: RabbitMQConnection) {}

  async setup(exchangeName: string, bindings: Array<{ queue: string; routingKey: string }>) {
    const channel = this.connection.getChannel();
    await channel.assertExchange(exchangeName, 'direct', { durable: true });

    for (const binding of bindings) {
      await channel.assertQueue(binding.queue, { durable: true });
      await channel.bindQueue(binding.queue, exchangeName, binding.routingKey);
    }
  }

  async publish(exchangeName: string, routingKey: string, message: any) {
    const channel = this.connection.getChannel();
    channel.publish(
      exchangeName,
      routingKey,
      Buffer.from(JSON.stringify(message))
    );
  }

  async consume(queueName: string, handler: (message: any) => Promise<void>) {
    const channel = this.connection.getChannel();
    await channel.consume(queueName, async (msg) => {
      if (msg) {
        const message = JSON.parse(msg.content.toString());
        await handler(message);
        channel.ack(msg);
      }
    });
  }
}

// 使用方法
const routing = new RoutingPattern(connection);
await routing.setup('logs', [
  { queue: 'error-queue', routingKey: 'error' },
  { queue: 'info-queue', routingKey: 'info' },
  { queue: 'warning-queue', routingKey: 'warning' }
]);

// 发布者
await routing.publish('logs', 'error', { message: 'Critical error' });
await routing.publish('logs', 'info', { message: 'Info message' });

RPC(远程过程调用)

// rpc-pattern.ts
import { RabbitMQConnection } from './rabbitmq-connection';
import { v4 as uuidv4 } from 'uuid';

export class RPCServer {
  constructor(private connection: RabbitMQConnection) {}

  async setup(queueName: string, handler: (request: any) => Promise<any>) {
    const channel = this.connection.getChannel();
    await channel.assertQueue(queueName, { durable: false });
    await channel.prefetch(1);

    await channel.consume(queueName, async (msg) => {
      if (msg) {
        const request = JSON.parse(msg.content.toString());
        const correlationId = msg.properties.correlationId;
        const replyTo = msg.properties.replyTo;

        try {
          const response = await handler(request);
          channel.sendToQueue(
            replyTo,
            Buffer.from(JSON.stringify(response)),
            { correlationId }
          );
          channel.ack(msg);
        } catch (error) {
          channel.nack(msg, false, false);
        }
      }
    });
  }
}

export class RPCClient {
  private correlationMap = new Map<string, { resolve: any; reject: any }>();
  private replyQueue: string;

  constructor(private connection: RabbitMQConnection) {}

  async initialize() {
    const channel = this.connection.getChannel();
    const replyQueue = await channel.assertQueue('', { exclusive: true });
    this.replyQueue = replyQueue.queue;

    await channel.consume(this.replyQueue, (msg) => {
      if (msg) {
        const correlationId = msg.properties.correlationId;
        const callback = this.correlationMap.get(correlationId);
        if (callback) {
          callback.resolve(JSON.parse(msg.content.toString()));
          this.correlationMap.delete(correlationId);
        }
        channel.ack(msg);
      }
    });
  }

  async call(queueName: string, request: any, timeout: number = 5000): Promise<any> {
    const channel = this.connection.getChannel();
    const correlationId = uuidv4();

    return new Promise((resolve, reject) => {
      this.correlationMap.set(correlationId, { resolve, reject });

      const timer = setTimeout(() => {
        this.correlationMap.delete(correlationId);
        reject(new Error('RPC timeout'));
      }, timeout);

      channel.sendToQueue(
        queueName,
        Buffer.from(JSON.stringify(request)),
        {
          correlationId,
          replyTo: this.replyQueue
        }
      );
    });
  }
}

// 使用方法
// 服务器
const server = new RPCServer(connection);
await server.setup('rpc_queue', async (request) => {
  return { result: `Processed: ${request.data}` };
});

// 客户端
const client = new RPCClient(connection);
await client.initialize();
const response = await client.call('rpc_queue', { data: 'test' });
console.log(response);

生产者模式

可靠的发布者

// reliable-publisher.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class ReliablePublisher {
  private confirmChannel: amqp.ConfirmChannel;

  constructor(private connection: RabbitMQConnection) {}

  async setup() {
    const channel = this.connection.getChannel();
    this.confirmChannel = channel as amqp.ConfirmChannel;
    await this.confirmChannel.confirmChannel();
  }

  async publishWithConfirm(
    exchangeName: string,
    routingKey: string,
    message: any
  ): Promise<boolean> {
    return new Promise((resolve, reject) => {
      this.confirmChannel.publish(
        exchangeName,
        routingKey,
        Buffer.from(JSON.stringify(message)),
        { persistent: true },
        (err) => {
          if (err) {
            reject(err);
          } else {
            resolve(true);
          }
        }
      );
    });
  }

  async publishWithRetry(
    exchangeName: string,
    routingKey: string,
    message: any,
    maxRetries: number = 3
  ): Promise<void> {
    let lastError: Error | null = null;

    for (let i = 0; i < maxRetries; i++) {
      try {
        await this.publishWithConfirm(exchangeName, routingKey, message);
        return;
      } catch (error) {
        lastError = error as Error;
        console.error(`Publish attempt ${i + 1} failed:`, error);
        await this.delay(1000 * (i + 1));  // 指数退避
      }
    }

    throw lastError;
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

事务性发布者

// transactional-publisher.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class TransactionalPublisher {
  constructor(private connection: RabbitMQConnection) {}

  async publishInTransaction(
    exchangeName: string,
    routingKey: string,
    messages: any[]
  ): Promise<void> {
    const channel = this.connection.getChannel();

    try {
      await channel.selectMode();  // 开始事务

      // 发布所有消息
      for (const message of messages) {
        channel.publish(
          exchangeName,
          routingKey,
          Buffer.from(JSON.stringify(message))
        );
      }

      await channel.commitTx();  // 提交事务
    } catch (error) {
      await channel.rollbackTx();  // 出错回滚
      throw error;
    }
  }
}

消费者模式

批量消费者

// batch-consumer.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class BatchConsumer {
  private batch: any[] = [];
  private timer: NodeJS.Timeout | null = null;

  constructor(
    private connection: RabbitMQConnection,
    private batchSize: number = 10,
    private batchTimeout: number = 5000
  ) {}

  async consume(queueName: string, handler: (batch: any[]) => Promise<void>) {
    const channel = this.connection.getChannel();
    await channel.prefetch(this.batchSize);

    await channel.consume(queueName, async (msg) => {
      if (msg) {
        this.batch.push(JSON.parse(msg.content.toString()));

        if (this.batch.length >= this.batchSize) {
          await this.processBatch(handler);
        } else if (!this.timer) {
          this.timer = setTimeout(() => this.processBatch(handler), this.batchTimeout);
        }
      }
    });
  }

  private async processBatch(handler: (batch: any[]) => Promise<void>) {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    if (this.batch.length === 0) return;

    const batch = [...this.batch];
    this.batch = [];

    try {
      await handler(batch);
    } catch (error) {
      console.error('Error processing batch:', error);
      // 处理批量失败
    }
  }
}

速率限制消费者

// rate-limited-consumer.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class RateLimitedConsumer {
  private lastProcessTime = 0;

  constructor(
    private connection: RabbitMQConnection,
    private rateLimit: number  // 每秒消息数
  ) {}

  async consume(queueName: string, handler: (message: any) => Promise<void>) {
    const channel = this.connection.getChannel();
    const interval = 1000 / this.rateLimit;

    await channel.consume(queueName, async (msg) => {
      if (msg) {
        const now = Date.now();
        const elapsed = now - this.lastProcessTime;

        if (elapsed < interval) {
          await this.delay(interval - elapsed);
        }

        try {
          const message = JSON.parse(msg.content.toString());
          await handler(message);
          channel.ack(msg);
          this.lastProcessTime = Date.now();
        } catch (error) {
          console.error('Error processing message:', error);
          channel.nack(msg, false, true);
        }
      }
    });
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

死信队列

DLX 设置

// dead-letter-queue.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class DeadLetterQueue {
  constructor(private connection: RabbitMQConnection) {}

  async setup(
    queueName: string,
    dlxName: string,
    dlqName: string,
    maxRetries: number = 3
  ) {
    const channel = this.connection.getChannel();

    // 声明死信交换机
    await channel.assertExchange(dlxName, 'direct', { durable: true });

    // 声明死信队列
    await channel.assertQueue(dlqName, { durable: true });
    await channel.bindQueue(dlqName, dlxName, '');

    // 声明主队列并设置DLX参数
    await channel.assertQueue(queueName, {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': dlxName,
        'x-dead-letter-routing-key': '',
        'x-max-retries': maxRetries
      }
    });
  }

  async publish(queueName: string, message: any) {
    const channel = this.connection.getChannel();
    channel.publish(
      '',
      queueName,
      Buffer.from(JSON.stringify(message)),
      {
        persistent: true,
        headers: {
          'x-retry-count': 0
        }
      }
    );
  }

  async consumeDLQ(dlqName: string, handler: (message: any) => Promise<void>) {
    const channel = this.connection.getChannel();
    await channel.consume(dlqName, async (msg) => {
      if (msg) {
        const message = JSON.parse(msg.content.toString());
        const retryCount = msg.properties.headers?.['x-retry-count'] || 0;

        await handler({ ...message, retryCount });
        channel.ack(msg);
      }
    });
  }
}

// 使用方法
const dlq = new DeadLetterQueue(connection);
await dlq.setup('tasks', 'tasks-dlx', 'tasks-dlq', 3);

消息确认

手动确认

// manual-ack.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class ManualAckConsumer {
  constructor(private connection: RabbitMQConnection) {}

  async consume(queueName: string, handler: (message: any) => Promise<boolean>) {
    const channel = this.connection.getChannel();
    await channel.prefetch(10);  // 限制未确认消息

    await channel.consume(queueName, async (msg) => {
      if (msg) {
        try {
          const message = JSON.parse(msg.content.toString());
          const success = await handler(message);

          if (success) {
            channel.ack(msg);  // 确认处理成功
          } else {
            channel.nack(msg, false, true);  // 重新入队消息
          }
        } catch (error) {
          console.error('Error processing message:', error);
          channel.nack(msg, false, false);  // 不重新入队,发送到DLQ
        }
      }
    });
  }
}

持久化

持久化队列和消息

// persistence.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class PersistentMessaging {
  constructor(private connection: RabbitMQConnection) {}

  async setupDurableQueue(queueName: string) {
    const channel = this.connection.getChannel();
    await channel.assertQueue(queueName, { durable: true });
  }

  async publishPersistent(queueName: string, message: any) {
    const channel = this.connection.getChannel();
    channel.sendToQueue(
      queueName,
      Buffer.from(JSON.stringify(message)),
      {
        persistent: true,  // 消息在代理重启后仍然存在
        deliveryMode: 2   // 持久化传输模式
      }
    );
  }
}

错误处理

错误处理策略

// error-handling.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class ErrorHandler {
  private errorCount = 0;
  private maxErrors = 5;
  private backoffTime = 1000;

  async consumeWithRetry(
    queueName: string,
    handler: (message: any) => Promise<void>
  ) {
    const channel = this.connection.getChannel();

    await channel.consume(queueName, async (msg) => {
      if (msg) {
        try {
          const message = JSON.parse(msg.content.toString());
          await handler(message);
          channel.ack(msg);
          this.errorCount = 0;  // 成功后重置
        } catch (error) {
          this.errorCount++;
          console.error(`Error ${this.errorCount}:`, error);

          if (this.errorCount >= this.maxErrors) {
            // 达到最大重试次数,发送到DLQ
            channel.nack(msg, false, false);
            this.errorCount = 0;
          } else {
            // 带退避的重新入队
            await this.delay(this.backoffTime * this.errorCount);
            channel.nack(msg, false, true);
          }
        }
      }
    });
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

性能优化

连接池

// connection-pool.ts
import amqp from 'amqplib';

export class ConnectionPool {
  private pool: amqp.Channel[] = [];
  private maxSize: number;

  constructor(maxSize: number = 10) {
    this.maxSize = maxSize;
  }

  async getChannel(url: string): Promise<amqp.Channel> {
    if (this.pool.length > 0) {
      return this.pool.pop()!;
    }

    const connection = await amqp.connect(url);
    return await connection.createChannel();
  }

  releaseChannel(channel: amqp.Channel) {
    if (this.pool.length < this.maxSize) {
      this.pool.push(channel);
    } else {
      channel.close();
    }
  }
}

监控

健康检查

// health-check.ts
import { RabbitMQConnection } from './rabbitmq-connection';

export class HealthCheck {
  constructor(private connection: RabbitMQConnection) {}

  async check(): Promise<boolean> {
    try {
      const channel = this.connection.getChannel();
      await channel.checkExchange('amq.direct');  // 内置交换机
      return true;
    } catch (error) {
      console.error('Health check failed:', error);
      return false;
    }
  }

  async getQueueInfo(queueName: string): Promise<any> {
    const channel = this.connection.getChannel();
    return await channel.checkQueue(queueName);
  }
}

生产环境设置

Docker Compose

# docker-compose.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"   # AMQP 端口
      - "15672:15672" # 管理界面
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
      RABBITMQ_DEFAULT_VHOST: /
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

volumes:
  rabbitmq_data:

配置最佳实践

// config.ts
export interface RabbitMQConfig {
  url: string;
  prefetch: number;
  reconnectDelay: number;
  maxRetries: number;
  heartbeat: number;
}

export const defaultConfig: RabbitMQConfig = {
  url: process.env.RABBITMQ_URL || 'amqp://localhost',
  prefetch: 10,
  reconnectDelay: 5000,
  maxRetries: 10,
  heartbeat: 60
};

附加资源

最佳实践

交换机选择

  • 使用直连交换机:用于具有特定路由键的点对点路由
  • 使用主题交换机:用于具有模式匹配的灵活路由
  • 使用扇形交换机:用于向多个消费者广播
  • 使用头交换机:用于基于消息头的复杂路由
  • 文档化交换机用途:清晰记录每个交换机的用途

队列配置

  • 使用持久化队列:用于必须在代理重启后存活的关键数据
  • 设置适当的TTL:配置消息和队列的TTL以进行清理
  • 使用独占队列:用于临时的、单消费者队列
  • 配置自动删除:对于不再使用的队列进行删除
  • 设置队列长度限制:防止内存耗尽

消息设计

  • 保持消息小巧:优先考虑小于1MB的消息以获得最佳性能
  • 包含消息元数据:添加头信息以用于路由和追踪
  • 使用一致的模式:定义消息格式和版本
  • 设计幂等性:消息可能会被多次传递
  • 添加关联ID:用于请求/响应模式

消费者配置

  • 使用手动确认:启用手动确认以获得更好的控制
  • 设置适当的预取:限制每个消费者未确认的消息数
  • 正确处理确认:始终确认或否定确认消息
  • 实现优雅关闭:停止消费并正确关闭连接
  • 监控消费者健康:跟踪消息处理速率

生产者配置

  • 使用发布者确认:确保消息被代理接收
  • 设置持久化传输:对于必须在重启后存活的关键消息
  • 实现重试逻辑:处理瞬时故障
  • 使用连接池:重用连接以获得更好的性能
  • 批量消息时可能:减少网络开销

死信处理

  • 始终使用DLX:将失败的消息路由到死信队列
  • 配置重试策略:设置最大重试次数和退避
  • 监控DLQ大小:对不断增长的死信队列发出警报
  • 处理DLQ消息:分析和重新处理失败的消息
  • 文档化DLQ处理:清晰的处理失败消息的程序

性能优化

  • 使用多个队列:跨队列分配负载
  • 配置适当的预取:在吞吐量和公平性之间取得平衡
  • 使用连接池:限制TCP连接的数量
  • 启用压缩:对于大型消息负载
  • 监控资源使用:跟踪内存、CPU和磁盘I/O

安全

  • 启用认证:使用用户名/密码或证书
  • 配置TLS加密:在生产环境中加密连接
  • 使用虚拟主机:隔离不同的应用程序
  • 设置用户权限:限制对特定资源的访问
  • 审计访问日志:监控谁在访问什么

监控和可观测性

  • 跟踪队列深度:监控队列中的消息数量
  • 监控消费者滞后:跟踪未处理的消息
  • 收集代理指标:使用管理API或插件
  • 设置警报:对队列深度、消费者故障或代理问题发出警报
  • 使用分布式追踪:跨服务关联消息

高可用性

  • 使用集群:设置RabbitMQ集群以实现高可用性
  • 配置队列镜像:跨集群节点复制队列
  • 使用负载均衡器:跨集群节点分配连接
  • 测试故障转移场景:验证自动恢复是否有效
  • 监控集群健康:跟踪节点状态和同步

清单

设置和配置

  • [ ] 安装并配置RabbitMQ代理
  • [ ] 启用管理插件以进行监控
  • [ ] 配置认证和授权
  • [ ] 设置TLS加密以实现安全连接
  • [ ] 配置集群以实现高可用性

交换机设置

  • [ ] 为每个用例选择适当的交换机类型
  • [ ] 文档化交换机用途和路由规则
  • [ ] 为关键数据配置持久化交换机
  • [ ] 如有需要,设置交换机到交换机的绑定
  • [ ] 测试交换机路由行为

队列设置

  • [ ] 为关键数据配置持久化队列
  • [ ] 设置适当的TTL用于消息和队列
  • [ ] 为失败消息配置死信交换机
  • [ ] 设置队列长度限制以防止内存问题
  • [ ] 文档化队列用途和使用

生产者配置

  • [ ] 启用发布者确认
  • [ ] 为关键消息设置持久化传输
  • [ ] 实现带有退避的重试逻辑
  • [ ] 使用连接池
  • [ ] 添加消息元数据和关联ID

消费者配置

  • [] 使用手动确认
  • [] 设置适当的预取数量
  • [] 实施优雅关闭处理
  • [] 配置错误处理和DLQ路由
  • [] 监控消费者健康和处理速率

死信队列设置

  • [] 配置死信交换机
  • [] 设置死信队列
  • [] 配置重试策略(最大重试次数、退避)
  • [] 设置DLQ监控和警报
  • [] 文档化DLQ处理程序

安全设置

  • [] 启用认证(用户名/密码或证书)
  • [] 配置TLS/SSL加密
  • [] 设置虚拟主机以隔离
  • [] 配置用户权限和ACL
  • [] 启用审计日志

性能调整

  • [] 调整消费者预取设置
  • [] 配置连接池
  • [] 为大型消息启用压缩
  • [] 监控资源使用(内存、CPU、磁盘)
  • [] 优化队列和交换机配置

高可用性

  • [] 设置RabbitMQ集群
  • [] 配置队列镜像
  • [] 设置连接的负载均衡器
  • [] 测试故障转移场景
  • [] 监控集群健康和同步

监控和警报

  • [] 启用管理插件
  • [] 设置指标收集(Prometheus/Grafana)
  • [] 配置队列深度警报
  • [] 监控消费者滞后和处理速率
  • [] 跟踪代理健康指标

测试

  • [] 测试消息发布和消费
  • [] 验证交换机路由行为
  • [] 测试死信队列路由
  • [] 验证确认行为
  • [] 测试故障转移和恢复

文档化

  • [] 文档化交换机和队列结构
  • [] 文档化消息模式和格式
  • [] 创建常见问题解答
  • [] 文档化安全配置
  • [] 维护API文档