QueueMonitoring

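This skill covers monitoring message queues such as RabbitMQ, Redis, and Kafka: collecting key metrics, health checks, alerting strategies, dashboards, dead letter queue monitoring, performance troubleshooting, and capacity planning. Keywords: queue depth, message rate, processing time, error rate, consumer lag, memory usage, alert rules, Grafana dashboards, performance optimization.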

DevOps | Updated 3/5/2026

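Queue Monitoring

Overview

A comprehensive guide to monitoring message queues in production systems, including RabbitMQ, Redis, and Kafka.

Table of Contents

  1. Key Metrics
  2. RabbitMQ Monitoring
  3. Redis Queue Monitoring
  4. Health Checks
  5. Alerting Strategy
  6. Grafana Dashboards
  7. Dead Letter Queue Monitoring
  8. Performance Troubleshooting
  9. Capacity Planning
  10. Best Practices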

Key Metrics

Core Metrics

# Key queue metrics
queue_depth:
  description: Number of messages waiting in the queue
  unit: count
  alert_threshold: "> 1000"

message_rate:
  description: Messages per second
  unit: messages/sec
  alert_threshold: "< 1 or > 10000"

processing_time:
  description: Time taken to process a message
  unit: milliseconds
  alert_threshold: "> 5000"

error_rate:
  description: Failed messages per second
  unit: errors/sec
  alert_threshold: "> 10"

consumer_lag:
  description: Number of messages the consumer is behind the producer
  unit: count
  alert_threshold: "> 10000"

queue_size_bytes:
  description: Total queue size in bytes
  unit: bytes
  alert_threshold: "> 1GB"
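The thresholds above only help if something compares live values against them. The sketch below shows that comparison. It assumes a hypothetical `QueueSnapshot` object whose fields mirror the YAML keys, and it treats 1GB as 1024 × 1024 × 1024 bytes. In practice, Prometheus alert rules usually do this job.

```typescript
// Hypothetical snapshot shape; field names mirror the YAML keys above.
interface QueueSnapshot {
  queue_depth: number;
  message_rate: number;     // messages/sec
  processing_time: number;  // milliseconds
  error_rate: number;       // errors/sec
  consumer_lag: number;     // messages
  queue_size_bytes: number;
}

type Rule = (s: QueueSnapshot) => string | null;

const ONE_GB = 1024 * 1024 * 1024; // assumption: 1GB = 1 GiB

// One rule per threshold in the table; each returns a message when breached.
const rules: Rule[] = [
  (s) => (s.queue_depth > 1000 ? `queue_depth ${s.queue_depth} > 1000` : null),
  (s) =>
    s.message_rate < 1 || s.message_rate > 10000
      ? `message_rate ${s.message_rate} outside [1, 10000]`
      : null,
  (s) => (s.processing_time > 5000 ? `processing_time ${s.processing_time}ms > 5000ms` : null),
  (s) => (s.error_rate > 10 ? `error_rate ${s.error_rate}/s > 10/s` : null),
  (s) => (s.consumer_lag > 10000 ? `consumer_lag ${s.consumer_lag} > 10000` : null),
  (s) => (s.queue_size_bytes > ONE_GB ? `queue_size_bytes ${s.queue_size_bytes} > 1GB` : null),
];

function evaluateThresholds(s: QueueSnapshot): string[] {
  return rules.map((rule) => rule(s)).filter((m): m is string => m !== null);
}

const breaches = evaluateThresholds({
  queue_depth: 1500,
  message_rate: 0.5,
  processing_time: 200,
  error_rate: 0,
  consumer_lag: 0,
  queue_size_bytes: 0,
});
// ["queue_depth 1500 > 1000", "message_rate 0.5 outside [1, 10000]"]
```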

Metrics Collection

// metrics-collector.ts
import promClient from 'prom-client';

// Create a dedicated metrics registry
const register = new promClient.Registry();

// Queue depth gauge
export const queueDepthGauge = new promClient.Gauge({
  name: 'queue_depth',
  help: 'Number of messages in the queue',
  labelNames: ['queue_name', 'queue_type'],
  registers: [register],
});

// Message rate gauge (the application sets messages/sec itself)
export const messageRateGauge = new promClient.Gauge({
  name: 'message_rate',
  help: 'Messages per second',
  labelNames: ['queue_name', 'direction'],  // direction: in/out
  registers: [register],
});

// Processing time histogram (milliseconds)
export const processingTimeHistogram = new promClient.Histogram({
  name: 'processing_time_ms',
  help: 'Time taken to process a message, in milliseconds',
  labelNames: ['queue_name'],
  buckets: [10, 50, 100, 500, 1000, 5000, 10000],
  registers: [register],
});

// Error rate gauge (errors/sec)
export const errorRateGauge = new promClient.Gauge({
  name: 'error_rate',
  help: 'Failed messages per second',
  labelNames: ['queue_name', 'error_type'],
  registers: [register],
});

// Consumer lag gauge
export const consumerLagGauge = new promClient.Gauge({
  name: 'consumer_lag',
  help: 'Number of messages the consumer is behind the producer',
  labelNames: ['queue_name', 'consumer_group'],
  registers: [register],
});

// Expose metrics for a /metrics endpoint (register.metrics() is async in recent prom-client versions)
export const getMetrics = () => register.metrics();
export const metricsContentType = register.contentType;
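The collector above models `message_rate` and `error_rate` as gauges, so the application has to compute per-second values itself. The more common Prometheus pattern is different. You export a monotonically increasing `promClient.Counter`, incremented with `.inc()`, and let `rate()` derive the per-second value at query time.

The sketch below shows, in simplified form, how a rate falls out of cumulative counter samples. It includes the counter-reset case after a process restart. `CounterSample` and `perSecondRate` are hypothetical names. It is not Prometheus's exact algorithm, which also extrapolates to the edges of the query window.

```typescript
// A cumulative counter sample: value observed at a point in time (ms).
interface CounterSample {
  timestampMs: number;
  value: number;
}

// Per-second rate across consecutive samples. Any decrease is treated as a
// counter reset (e.g. a restart), in which case the new value is the increase.
function perSecondRate(samples: CounterSample[]): number {
  if (samples.length < 2) return 0;
  let increase = 0;
  for (let i = 1; i < samples.length; i++) {
    const delta = samples[i].value - samples[i - 1].value;
    increase += delta >= 0 ? delta : samples[i].value;
  }
  const seconds = (samples[samples.length - 1].timestampMs - samples[0].timestampMs) / 1000;
  return seconds > 0 ? increase / seconds : 0;
}

// 100 -> 400 over 30 s, then a restart drops the counter to 50.
// Total increase = 300 + 50 = 350 over 60 s.
const rate = perSecondRate([
  { timestampMs: 0, value: 100 },
  { timestampMs: 30000, value: 400 },
  { timestampMs: 60000, value: 50 },
]);
```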

RabbitMQ Monitoring

Management Plugin

// rabbitmq-monitor.ts
import axios from 'axios';

export class RabbitMQMonitor {
  private baseUrl: string;
  private auth: { username: string; password: string };

  constructor(
    host: string = 'localhost',
    port: number = 15672,
    username: string = 'guest',
    password: string = 'guest'
  ) {
    this.baseUrl = `http://${host}:${port}/api`;
    this.auth = { username, password };
  }

  async getQueues(): Promise<any[]> {
    const response = await axios.get(`${this.baseUrl}/queues`, {
      auth: this.auth,
    });
    return response.data;
  }

  async getQueueMetrics(queueName: string, vhost: string = '/'): Promise<any> {
    const response = await axios.get(
      `${this.baseUrl}/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queueName)}`,
      { auth: this.auth }
    );
    return response.data;
  }

  async getOverview(): Promise<any> {
    const response = await axios.get(`${this.baseUrl}/overview`, {
      auth: this.auth,
    });
    return response.data;
  }

  async getConnections(): Promise<any[]> {
    const response = await axios.get(`${this.baseUrl}/connections`, {
      auth: this.auth,
    });
    return response.data;
  }

  async getChannels(): Promise<any[]> {
    const response = await axios.get(`${this.baseUrl}/channels`, {
      auth: this.auth,
    });
    return response.data;
  }

  async getConsumers(): Promise<any[]> {
    const response = await axios.get(`${this.baseUrl}/consumers`, {
      auth: this.auth,
    });
    return response.data;
  }

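  async getNodeStats(): Promise<any> {
    const response = await axios.get(`${this.baseUrl}/nodes`, {
      auth: this.auth,
    });
    return response.data;
  }

  // Peek at up to `count` messages via POST /api/queues/{vhost}/{name}/get.
  // ackmode 'ack_requeue_true' puts them back on the queue (flagged as redelivered).
  async getMessages(queueName: string, count: number = 10, vhost: string = '/'): Promise<any[]> {
    const response = await axios.post(
      `${this.baseUrl}/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queueName)}/get`,
      { count, ackmode: 'ack_requeue_true', encoding: 'auto', truncate: 50000 },
      { auth: this.auth }
    );
    return response.data;
  }
}

// Usage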
const monitor = new RabbitMQMonitor('localhost', 15672, 'admin', 'password');
const queues = await monitor.getQueues();
const overview = await monitor.getOverview();
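The raw `/api/queues` response is verbose, and a small pure function can reduce it to what an operator looks at first. This sketch reads only the `name`, `messages`, `messages_unacknowledged`, and `consumers` fields of each queue object. It treats them as optional in case a field is missing. `summarizeQueues` is a hypothetical helper meant to run on the array returned by `getQueues()`.

```typescript
// Subset of the per-queue fields returned by GET /api/queues.
interface ManagementQueue {
  name: string;
  vhost?: string;
  messages?: number;
  messages_unacknowledged?: number;
  consumers?: number;
}

interface QueueSummary {
  name: string;
  depth: number;
  unacked: number;
  consumers: number;
}

// Returns the `limit` deepest queues plus the names of queues that hold
// messages but have no consumers (those can only grow).
function summarizeQueues(
  queues: ManagementQueue[],
  limit = 5,
): { top: QueueSummary[]; idle: string[] } {
  const summaries: QueueSummary[] = queues.map((q) => ({
    name: q.name,
    depth: q.messages ?? 0,
    unacked: q.messages_unacknowledged ?? 0,
    consumers: q.consumers ?? 0,
  }));
  const top = [...summaries].sort((a, b) => b.depth - a.depth).slice(0, limit);
  const idle = summaries.filter((s) => s.consumers === 0 && s.depth > 0).map((s) => s.name);
  return { top, idle };
}

const report = summarizeQueues(
  [
    { name: 'emails', messages: 1200, messages_unacknowledged: 40, consumers: 3 },
    { name: 'reports', messages: 300, consumers: 0 },
    { name: 'audit' },
  ],
  2,
);
// report.top -> emails (1200), reports (300); report.idle -> ['reports']
```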

Prometheus Exporter

# prometheus-rabbitmq-exporter.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123

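  prometheus-rabbitmq-exporter:
    image: kbudde/rabbitmq-exporter
    ports:
      - "9419:9419"   # exporter default port; 9090 is Prometheus's own default
    environment:
      RABBIT_URL: http://rabbitmq:15672
      RABBIT_USER: admin
      RABBIT_PASSWORD: admin123
      PUBLISH_PORT: 9419
      OUTPUT_FORMAT: JSON   # log output format (TTY or JSON)
      LOG_LEVEL: info
      RABBIT_CAPABILITIES: bert,no_sort

RabbitMQ Metrics Dashboard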

{
  "dashboard": {
    "title": "RabbitMQ Monitoring",
    "panels": [
      {
        "title": "Queue Depth",
        "targets": [
          {
            "expr": "rabbitmq_queue_messages{queue=~\".*\"}",
            "legendFormat": "{{queue}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Message Rate",
        "targets": [
          {
            "expr": "rate(rabbitmq_queue_messages_published_total[5m])",
            "legendFormat": "{{queue}} (published)"
          },
          {
            "expr": "rate(rabbitmq_queue_messages_delivered_total[5m])",
            "legendFormat": "{{queue}} (delivered)"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Unacknowledged Messages (Consumer Lag)",
        "targets": [
          {
            "expr": "rabbitmq_queue_messages_unacknowledged",
            "legendFormat": "{{queue}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(rabbitmq_channel_messages_unroutable_total[5m])",
            "legendFormat": "Unroutable"
          }
        ],
        "type": "graph"
      }
    ]
  }
}

Redis Queue Monitoring

Redis Metrics

// redis-monitor.ts
import { createClient } from 'redis';
import { queueDepthGauge } from './metrics-collector';

export class RedisQueueMonitor {
  private client: ReturnType<typeof createClient>;

  constructor(redisUrl: string = 'redis://localhost:6379') {
    this.client = createClient({ url: redisUrl });
  }

  async connect(): Promise<void> {
    await this.client.connect();
  }

  async getQueueInfo(queueName: string): Promise<any> {
    const length = await this.client.lLen(queueName);
    const memory = await this.client.memoryUsage(queueName);
    
    return {
      name: queueName,
      length,
      memoryBytes: memory,
    };
  }

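  // Lists every list-type key matching the pattern. KEYS blocks Redis while it
  // walks the whole keyspace, so prefer SCAN on large production instances.
  async getAllQueues(pattern: string = 'bull:*'): Promise<any[]> {
    const keys = await this.client.keys(pattern);
    const queues = [];

    for (const key of keys) {
      // Bull stores lists, sorted sets, and hashes under the same prefix;
      // LLEN fails with WRONGTYPE on anything that is not a list.
      if ((await this.client.type(key)) !== 'list') continue;
      const info = await this.getQueueInfo(key);
      queues.push(info);
    }

    return queues;
  }

  // Bull keys follow <prefix>:<queue>:<state>. "wait" and "active" are lists;
  // "completed", "failed", and "delayed" are sorted sets.
  async getBullQueueMetrics(queueName: string, prefix: string = 'bull'): Promise<any> {
    const base = `${prefix}:${queueName}`;
    const waiting = await this.client.lLen(`${base}:wait`);
    const active = await this.client.lLen(`${base}:active`);
    const completed = await this.client.zCard(`${base}:completed`);
    const failed = await this.client.zCard(`${base}:failed`);
    const delayed = await this.client.zCard(`${base}:delayed`);

    return {
      queueName,
      waiting,
      active,
      completed,
      failed,
      delayed,
    };
  }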

  async collectMetrics(queueNames: string[]): Promise<void> {
    for (const queueName of queueNames) {
      const metrics = await this.getBullQueueMetrics(queueName);
      
      queueDepthGauge.set(
        { queue_name: queueName, queue_type: 'waiting' },
        metrics.waiting
      );
      queueDepthGauge.set(
        { queue_name: queueName, queue_type: 'active' },
        metrics.active
      );
      queueDepthGauge.set(
        { queue_name: queueName, queue_type: 'failed' },
        metrics.failed
      );
    }
  }

  async close(): Promise<void> {
    await this.client.quit();
  }
}

// Usage
const monitor = new RedisQueueMonitor('redis://localhost:6379');
await monitor.connect();
const metrics = await monitor.getBullQueueMetrics('emails');
await monitor.collectMetrics(['emails', 'notifications']);
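The raw counts from `getBullQueueMetrics` become more actionable when combined. This sketch assumes the object shape returned above, and `deriveBullIndicators` is a hypothetical helper. Counting delayed jobs as backlog is a design choice that you may not want if the delays are intentional.

Bull can also be configured to remove finished jobs through the `removeOnComplete` and `removeOnFail` job options. When it is, the failure ratio undercounts whatever was removed.

```typescript
// Shape returned by getBullQueueMetrics above.
interface BullQueueMetrics {
  queueName: string;
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}

// backlog: jobs not yet started (waiting + delayed).
// failureRatio: share of retained finished jobs that failed.
function deriveBullIndicators(m: BullQueueMetrics): { backlog: number; failureRatio: number } {
  const finished = m.completed + m.failed;
  return {
    backlog: m.waiting + m.delayed,
    failureRatio: finished > 0 ? m.failed / finished : 0,
  };
}

const indicators = deriveBullIndicators({
  queueName: 'emails',
  waiting: 40,
  active: 5,
  completed: 900,
  failed: 100,
  delayed: 10,
});
// { backlog: 50, failureRatio: 0.1 }
```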

Redis Exporter

# prometheus-redis-exporter.yml
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  redis-exporter:
    image: oliver006/redis_exporter
    ports:
      - "9121:9121"
    environment:
      REDIS_ADDR: redis://redis:6379
      REDIS_EXPORTER_LOG_FORMAT: txt

Health Checks

Queue Health Check

// health-check.ts
import { RabbitMQMonitor } from './rabbitmq-monitor';
import { RedisQueueMonitor } from './redis-monitor';

export interface HealthCheckResult {
  healthy: boolean;
  checks: Record<string, boolean>;
  metrics: any;
}

export class QueueHealthChecker {
  private rabbitMQMonitor?: RabbitMQMonitor;
  private redisMonitor?: RedisQueueMonitor;

  constructor(config: {
    rabbitmq?: { host: string; port: number };
    redis?: { url: string };
  }) {
    if (config.rabbitmq) {
      this.rabbitMQMonitor = new RabbitMQMonitor(
        config.rabbitmq.host,
        config.rabbitmq.port
      );
    }
    if (config.redis) {
      this.redisMonitor = new RedisQueueMonitor(config.redis.url);
    }
  }

  async checkHealth(): Promise<HealthCheckResult> {
    const checks: Record<string, boolean> = {};
    const metrics: any = {};

    // Check RabbitMQ
    if (this.rabbitMQMonitor) {
      try {
        const overview = await this.rabbitMQMonitor.getOverview();
        checks['rabbitmq'] = true;
        metrics.rabbitmq = overview;
      } catch (error) {
        checks['rabbitmq'] = false;
        metrics.rabbitmq = { error: (error as Error).message };
      }
    }

    // Check Redis (connects per check and always releases the connection)
    if (this.redisMonitor) {
      try {
        await this.redisMonitor.connect();
        const queues = await this.redisMonitor.getAllQueues();
        checks['redis'] = true;
        metrics.redis = { queueCount: queues.length };
      } catch (error) {
        checks['redis'] = false;
        metrics.redis = { error: (error as Error).message };
      } finally {
        // close() rejects if connect() never succeeded; that is ignored here
        await this.redisMonitor.close().catch(() => undefined);
      }
    }

    const healthy = Object.values(checks).every((v) => v === true);

    return { healthy, checks, metrics };
  }
}

// Using it in Express
import express from 'express';

const app = express();
const healthChecker = new QueueHealthChecker({
  rabbitmq: { host: 'localhost', port: 15672 },
  redis: { url: 'redis://localhost:6379' },
});

// Return 503 when any dependency is unhealthy so load balancers can react
app.get('/health', async (req, res) => {
  const health = await healthChecker.checkHealth();
  res.status(health.healthy ? 200 : 503).json(health);
});

app.listen(3000); // port is illustrative
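The health check above awaits each broker call without a time limit. A hung connection can therefore stall `/health` until the load balancer gives up. One common remedy is to race each check against a timer, as in this minimal sketch. `withTimeout` is a hypothetical helper. It does not cancel the underlying request; axios also accepts its own `timeout` request option for that.

```typescript
// Rejects if `promise` has not settled within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms);
  });
  // Clear the timer whichever side wins so it does not keep the process alive.
  return Promise.race([promise, timeout]).then(
    (value) => {
      clearTimeout(timer);
      return value;
    },
    (err) => {
      clearTimeout(timer);
      throw err;
    },
  );
}
```

Inside `checkHealth`, you could, for example, wrap `this.rabbitMQMonitor.getOverview()` as `withTimeout(this.rabbitMQMonitor.getOverview(), 2000, 'rabbitmq')`. The existing `catch` would then record a timeout as an unhealthy check.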

Alerting Strategy

Prometheus Alert Rules

# alert-rules.yml
groups:
  - name: queue_alerts
    interval: 30s
    rules:
      # High queue depth
      - alert: HighQueueDepth
        expr: rabbitmq_queue_messages > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High queue depth detected"
          description: "Queue {{ $labels.queue }} has {{ $value }} messages"

      # Queue growing rapidly (deriv() fits the slope of a gauge; rate() is for counters only)
      - alert: QueueGrowingRapidly
        expr: deriv(rabbitmq_queue_messages[5m]) > 100
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Queue growing rapidly"
          description: "Queue {{ $labels.queue }} is growing by {{ $value }} messages/sec"

      # High error rate (unroutable messages)
      - alert: HighErrorRate
        expr: rate(rabbitmq_channel_messages_unroutable_total[5m]) > 10
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors/sec"

      # Consumer lag (unacknowledged messages)
      - alert: HighConsumerLag
        expr: rabbitmq_queue_messages_unacknowledged > 5000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High consumer lag"
          description: "Queue {{ $labels.queue }} has {{ $value }} unacknowledged messages"

      # No consumers
      - alert: NoConsumers
        expr: rabbitmq_queue_consumers == 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "No consumers detected"
          description: "Queue {{ $labels.queue }} has no consumers"

      # Redis memory usage (skipped when maxmemory is unset and redis_memory_max_bytes is 0)
      - alert: HighRedisMemory
        expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.9 and redis_memory_max_bytes > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High Redis memory usage"
          description: "Redis memory usage is {{ $value | humanizePercentage }}"
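The `for` clause keeps these rules from paging on momentary spikes. An alert stays "pending" while its expression is true. It becomes "firing" only after the expression has been true at every evaluation for the full duration.

The simulation below models that behavior for one series at a fixed evaluation interval (30s in the group above). It is a simplified illustration, not Prometheus's implementation, and `simulateAlert` is a hypothetical name.

```typescript
type AlertState = 'inactive' | 'pending' | 'firing';

// conditions[i] is the rule expression's result at evaluation i.
function simulateAlert(conditions: boolean[], intervalMs: number, forMs: number): AlertState[] {
  const states: AlertState[] = [];
  let pendingSince: number | null = null;
  for (let i = 0; i < conditions.length; i++) {
    const now = i * intervalMs;
    if (!conditions[i]) {
      pendingSince = null; // a single false evaluation resets the timer
      states.push('inactive');
      continue;
    }
    if (pendingSince === null) pendingSince = now;
    states.push(now - pendingSince >= forMs ? 'firing' : 'pending');
  }
  return states;
}

// 30 s interval with a 1 minute `for`: fires on the third consecutive true evaluation.
const states = simulateAlert([true, true, true, false, true], 30000, 60000);
// ['pending', 'pending', 'firing', 'inactive', 'pending']
```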

Custom Alerts

// alert-manager.ts
import axios from 'axios';

export interface AlertConfig {
  webhookUrl: string;
  channels: string[];
  severity: 'info' | 'warning' | 'critical';
}

export class AlertManager {
  private alertHistory: Map<string, number> = new Map();
  private cooldownMs: number = 300000; // 5 minutes

  constructor(private config: AlertConfig) {}

  async sendAlert(
    alertName: string,
    message: string,
    severity: string = this.config.severity
  ): Promise<void> {
    // Skip the alert if the same one was sent within the cooldown window
    const lastAlert = this.alertHistory.get(alertName);
    if (lastAlert && Date.now() - lastAlert < this.cooldownMs) {
      return; // Suppressed by cooldown
    }

    const payload = {
      text: `[${severity.toUpperCase()}] ${alertName}`,
      attachments: [
        {
          color: this.getColorForSeverity(severity),
          text: message,
          ts: Math.floor(Date.now() / 1000),
        },
      ],
    };

    try {
      await axios.post(this.config.webhookUrl, payload);
      this.alertHistory.set(alertName, Date.now());
    } catch (error) {
      console.error('Failed to send alert:', error);
    }
  }

  private getColorForSeverity(severity: string): string {
    const colors: Record<string, string> = {
      info: '#36a64f',
      warning: '#ff9900',
      critical: '#ff0000',
    };
    return colors[severity] || '#36a64f';
  }

  async sendQueueAlert(
    queueName: string,
    metric: string,
    value: number,
    threshold: number
  ): Promise<void> {
    const message = `Queue ${queueName} ${metric} is ${value} (threshold: ${threshold})`;
    await this.sendAlert(
      `QueueAlert_${queueName}_${metric}`,
      message,
      value > threshold * 2 ? 'critical' : 'warning'
    );
  }
}

// Usage
const alertManager = new AlertManager({
  webhookUrl: process.env.SLACK_WEBHOOK_URL!,
  channels: ['#alerts'],
  severity: 'warning',
});

await alertManager.sendQueueAlert('emails', 'depth', 1500, 1000);
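`AlertManager` reads `Date.now()` directly, which makes its cooldown hard to test. The sketch below keeps the same cooldown rule but takes an injectable clock. `AlertCooldown`, `canSend`, and `markSent` are hypothetical names. Like the original, it records a send only when you call `markSent` after a successful webhook call, so a failed attempt can be retried right away.

```typescript
class AlertCooldown {
  private lastSent = new Map<string, number>();

  constructor(
    private readonly cooldownMs: number = 300000, // 5 minutes, as above
    private readonly now: () => number = () => Date.now(),
  ) {}

  // True if this alert key has not been sent within the cooldown window.
  canSend(alertKey: string): boolean {
    const last = this.lastSent.get(alertKey);
    return last === undefined || this.now() - last >= this.cooldownMs;
  }

  // Record a successful send at the current (possibly fake) time.
  markSent(alertKey: string): void {
    this.lastSent.set(alertKey, this.now());
  }
}

let fakeNow = 0;
const cooldown = new AlertCooldown(300000, () => fakeNow);
cooldown.canSend('QueueAlert_emails_depth'); // true
cooldown.markSent('QueueAlert_emails_depth');
fakeNow = 60000;
cooldown.canSend('QueueAlert_emails_depth'); // false, still within 5 minutes
```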

Grafana Dashboards

Queue Monitoring Dashboard

{
  "dashboard": {
    "title": "Queue Monitoring Dashboard",
    "panels": [
      {
        "title": "Queue Depth",
        "type": "graph",
        "targets": [
          {
            "expr": "rabbitmq_queue_messages",
            "legendFormat": "{{queue}}"
          }
        ]
      },
      {
        "title": "Message Publish Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rabbitmq_queue_messages_published_total[5m])",
            "legendFormat": "{{queue}}"
          }
        ]
      },
      {
        "title": "Message Delivery Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rabbitmq_queue_messages_delivered_total[5m])",
            "legendFormat": "{{queue}}"
          }
        ]
      },
      {
        "title": "Unacknowledged Messages (Consumer Lag)",
        "type": "graph",
        "targets": [
          {
            "expr": "rabbitmq_queue_messages_unacknowledged",
            "legendFormat": "{{queue}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rabbitmq_channel_messages_unroutable_total[5m])",
            "legendFormat": "Unroutable"
          }
        ]
      },
      {
        "title": "Queue Memory",
        "type": "graph",
        "targets": [
          {
            "expr": "rabbitmq_queue_memory_bytes",
            "legendFormat": "{{queue}}"
          }
        ]
      }
    ]
  }
}
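The two dashboards above repeat the same panel structure, so a small builder can keep them consistent if you generate dashboards from code. This sketch assumes the same minimal panel shape used above (`title`, `type`, `targets`). Real Grafana dashboard JSON has many more fields, and newer Grafana versions favor the `timeseries` panel type over `graph`. The `{ dashboard: ... }` wrapper matches the payload of Grafana's dashboard HTTP API. `graphPanel` and `queueDashboard` are hypothetical names.

```typescript
interface Target {
  expr: string;
  legendFormat: string;
}

interface Panel {
  title: string;
  type: 'graph';
  targets: Target[];
}

// Builds a single-query graph panel; the legend defaults to the queue label.
function graphPanel(title: string, expr: string, legendFormat = '{{queue}}'): Panel {
  return { title, type: 'graph', targets: [{ expr, legendFormat }] };
}

function queueDashboard(title: string, panels: Panel[]) {
  return { dashboard: { title, panels } };
}

const dashboard = queueDashboard('Queue Monitoring Dashboard', [
  graphPanel('Queue Depth', 'rabbitmq_queue_messages'),
  graphPanel('Message Publish Rate', 'rate(rabbitmq_queue_messages_published_total[5m])'),
  graphPanel('Error Rate', 'rate(rabbitmq_channel_messages_unroutable_total[5m])', 'unroutable'),
]);

const json = JSON.stringify(dashboard, null, 2);
```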

Dead Letter Queue Monitoring

DLQ Monitoring

// dlq-monitor.ts
import { RabbitMQMonitor } from './rabbitmq-monitor';
import { AlertManager } from './alert-manager';

export class DLQMonitor {
  constructor(
    private rabbitMQMonitor: RabbitMQMonitor,
    private alertManager: AlertManager
  ) {}

  async monitorDLQ(dlqName: string, threshold: number = 100): Promise<void> {
    const metrics = await this.rabbitMQMonitor.getQueueMetrics(dlqName);
    
    if (metrics.messages > threshold) {
      await this.alertManager.sendAlert(
        `DLQAlert_${dlqName}`,
        `Dead letter queue ${dlqName} has ${metrics.messages} messages`,
        metrics.messages > threshold * 5 ? 'critical' : 'warning'
      );
    }
  }

  async getDLQMessages(dlqName: string, limit: number = 10): Promise<any[]> {
    const messages = await this.rabbitMQMonitor.getMessages(dlqName, limit);
    return messages;
  }

  async analyzeDLQ(dlqName: string): Promise<any> {
    const messages = await this.getDLQMessages(dlqName, 100);
    
    const analysis = {
      total: messages.length,
      byReason: {} as Record<string, number>,
      byQueue: {} as Record<string, number>,
    };
    
    for (const message of messages) {
      // x-death lives in the message properties (headers), not in the payload string
      const death = message.properties?.headers?.['x-death']?.[0];
      const reason = death?.reason || 'unknown';
      const originalQueue = death?.queue || 'unknown';
      
      analysis.byReason[reason] = (analysis.byReason[reason] || 0) + 1;
      analysis.byQueue[originalQueue] = (analysis.byQueue[originalQueue] || 0) + 1;
    }
    
    return analysis;
  }
}
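`analyzeDLQ` reads only the first `x-death` entry and counts each message once. RabbitMQ keeps one `x-death` entry per queue and reason pair, each with a `count` of how many times that happened. A message that bounced repeatedly therefore carries more information than a single tally.

The sketch below sums those counts. It assumes messages shaped like the management API "get messages" response. `tallyDeaths` is a hypothetical helper.

```typescript
// One entry of the RabbitMQ x-death header (only the fields used here).
interface XDeathEntry {
  reason: string; // e.g. 'rejected', 'expired', 'maxlen'
  queue: string;  // queue the message was dead-lettered from
  count: number;  // times dead-lettered for this queue/reason pair
}

interface PeekedMessage {
  properties?: { headers?: { 'x-death'?: XDeathEntry[] } };
}

// Tallies dead-letter events by "queue/reason", weighting by `count`.
// Messages without an x-death header are counted once under "unknown".
function tallyDeaths(messages: PeekedMessage[]): Record<string, number> {
  const tally: Record<string, number> = {};
  for (const msg of messages) {
    const deaths = msg.properties?.headers?.['x-death'] ?? [];
    if (deaths.length === 0) {
      tally['unknown'] = (tally['unknown'] ?? 0) + 1;
      continue;
    }
    for (const d of deaths) {
      const key = `${d.queue}/${d.reason}`;
      tally[key] = (tally[key] ?? 0) + d.count;
    }
  }
  return tally;
}

const deathTally = tallyDeaths([
  { properties: { headers: { 'x-death': [{ reason: 'rejected', queue: 'emails', count: 3 }] } } },
  { properties: { headers: { 'x-death': [{ reason: 'expired', queue: 'emails', count: 1 }] } } },
  {},
]);
// { 'emails/rejected': 3, 'emails/expired': 1, unknown: 1 }
```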

Performance Troubleshooting

Common Issues

// troubleshooting.ts
export class QueueTroubleshooter {
  static async diagnoseSlowConsumers(queueName: string): Promise<string[]> {
    const issues: string[] = [];

    // Check consumer count
    const consumers = await this.getConsumerCount(queueName);
    if (consumers === 0) {
      issues.push('No consumers attached to the queue');
    }

    // Check prefetch setting
    const prefetch = await this.getPrefetchCount(queueName);
    if (prefetch > 100) {
      issues.push(`High prefetch count (${prefetch}) may cause memory pressure on consumers`);
    }

    // Check processing time
    const avgProcessingTime = await this.getAvgProcessingTime(queueName);
    if (avgProcessingTime > 5000) {
      issues.push(`High average processing time (${avgProcessingTime}ms)`);
    }

    return issues;
  }

  static async diagnoseBacklog(queueName: string): Promise<string[]> {
    const issues: string[] = [];

    // Check queue depth
    const depth = await this.getQueueDepth(queueName);
    if (depth > 10000) {
      issues.push(`Large queue backlog (${depth} messages)`);
    }

    // Compare publish and consume rates (messages/sec)
    const publishRate = await this.getPublishRate(queueName);
    const consumeRate = await this.getConsumeRate(queueName);

    if (publishRate > consumeRate * 1.5) {
      issues.push('Publish rate is significantly higher than consume rate');
    }

    return issues;
  }

  static async diagnoseMemoryIssues(queueName: string): Promise<string[]> {
    const issues: string[] = [];

    // Check queue memory
    const memory = await this.getQueueMemory(queueName);
    if (memory > 1073741824) { // 1GB
      issues.push(`High queue memory usage (${(memory / 1024 / 1024).toFixed(1)}MB)`);
    }

    // Check average message size
    const avgMessageSize = await this.getAvgMessageSize(queueName);
    if (avgMessageSize > 1048576) { // 1MB
      issues.push(`Large average message size (${(avgMessageSize / 1024).toFixed(1)}KB)`);
    }

    return issues;
  }

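  // The helpers below are placeholders: wire them to the RabbitMQ management
  // API or your metrics store. As written they all return 0, so
  // diagnoseSlowConsumers will always report that no consumers are attached.
  private static async getConsumerCount(queueName: string): Promise<number> {
    // Placeholder implementation
    return 0;
  }

  private static async getPrefetchCount(queueName: string): Promise<number> {
    // Placeholder implementation
    return 0;
  }

  private static async getAvgProcessingTime(queueName: string): Promise<number> {
    // Placeholder implementation
    return 0;
  }

  private static async getQueueDepth(queueName: string): Promise<number> {
    // Placeholder implementation
    return 0;
  }

  private static async getPublishRate(queueName: string): Promise<number> {
    // Placeholder implementation
    return 0;
  }

  private static async getConsumeRate(queueName: string): Promise<number> {
    // Placeholder implementation
    return 0;
  }

  private static async getQueueMemory(queueName: string): Promise<number> {
    // Placeholder implementation
    return 0;
  }

  private static async getAvgMessageSize(queueName: string): Promise<number> {
    // Placeholder implementation
    return 0;
  }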
}
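`diagnoseBacklog` flags a publish rate more than 1.5 times the consume rate. The more useful operational question is how long the current backlog will take to clear. This sketch estimates that from a depth and two rates in messages/sec, such as those the stub helpers above would return once implemented. `estimateDrainSeconds` is hypothetical, and it assumes rates stay constant, which they rarely do.

```typescript
// Seconds until the backlog clears at current rates; Infinity when
// consumers are not outpacing producers.
function estimateDrainSeconds(depth: number, publishRate: number, consumeRate: number): number {
  if (depth <= 0) return 0;
  const netDrainRate = consumeRate - publishRate; // messages/sec leaving the queue
  return netDrainRate > 0 ? depth / netDrainRate : Infinity;
}

const drain = estimateDrainSeconds(12000, 50, 80); // 400 (12000 / 30)
const stuck = estimateDrainSeconds(12000, 50, 40); // Infinity
```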

Capacity Planning

Capacity Calculator

// capacity-planning.ts
export interface QueueCapacity {
  queueName: string;
  currentDepth: number;
  peakDepth: number;
  avgMessageSize: number;
  avgProcessingTime: number;
  consumers: number;
}

export class CapacityPlanner {
  static calculateRequiredConsumers(capacity: QueueCapacity): number {
    const messagesPerSecond = capacity.peakDepth / 86400; // Rough proxy: assumes peakDepth messages arrive evenly over 24 hours
    const processingPerConsumer = 1000 / capacity.avgProcessingTime;
    
    const required = Math.ceil(messagesPerSecond / processingPerConsumer);
    
    // Add a 20% buffer
    return Math.ceil(required * 1.2);
  }

  static estimateQueueMemory(capacity: QueueCapacity): number {
    return capacity.peakDepth * capacity.avgMessageSize;
  }

  static calculateThroughput(capacity: QueueCapacity): number {
    return capacity.consumers * (1000 / capacity.avgProcessingTime);
  }

  static recommendConfiguration(capacity: QueueCapacity): any {
    const requiredConsumers = this.calculateRequiredConsumers(capacity);
    const estimatedMemory = this.estimateQueueMemory(capacity);
    const throughput = this.calculateThroughput(capacity);
    
    return {
      recommendedConsumers: requiredConsumers,
      estimatedMemoryBytes: estimatedMemory,
      estimatedMemoryMB: estimatedMemory / 1024 / 1024,
      estimatedThroughput: throughput,
      currentConsumers: capacity.consumers,
      needsScaling: requiredConsumers > capacity.consumers,
    };
  }
}

// Usage
const capacity: QueueCapacity = {
  queueName: 'emails',
  currentDepth: 5000,
  peakDepth: 20000,
  avgMessageSize: 1024,
  avgProcessingTime: 100,
  consumers: 5,
};

const recommendation = CapacityPlanner.recommendConfiguration(capacity);
console.log(recommendation);
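`calculateRequiredConsumers` infers an arrival rate by spreading `peakDepth` over 24 hours. That is a rough proxy at best, because queue depth measures accumulation, not throughput. If a measured peak publish rate is available, Little's law gives the number of consumers that must be busy at once: concurrency = arrival rate × average time in service.

This sketch assumes each consumer handles one message at a time; prefetch and async handlers change that. `consumersByLittlesLaw` is a hypothetical name, and the 1.2 headroom mirrors the 20% buffer above.

```typescript
// Little's law: L = lambda * W.
// lambda = arrival rate (messages/sec), W = average processing time (sec).
function consumersByLittlesLaw(
  arrivalPerSec: number,
  avgProcessingMs: number,
  headroom = 1.2,
): number {
  const busyConsumers = arrivalPerSec * (avgProcessingMs / 1000);
  return Math.ceil(busyConsumers * headroom);
}

// 50 msg/s at 100 ms each keeps 5 consumers busy; with 20% headroom -> 6.
const needed = consumersByLittlesLaw(50, 100); // 6
```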

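Best Practices

Monitoring Checklist

## Queue Monitoring Checklist

### Basic Metrics
- [ ] Queue depth (waiting messages)
- [ ] Message rates (published/delivered)
- [ ] Processing time
- [ ] Error rate
- [ ] Consumer lag

### Advanced Metrics
- [ ] Queue memory usage
- [ ] Consumer count
- [ ] Prefetch settings
- [ ] Connection count
- [ ] Channel count

### Alerts
- [ ] High queue depth
- [ ] Growing queues
- [ ] High error rate
- [ ] No consumers
- [ ] DLQ messages

### Dashboards
- [ ] Real-time queue metrics
- [ ] Historical trends
- [ ] Consumer performance
- [ ] Error analysis
- [ ] Capacity planning

Additional Resources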