队列监控
概览
面向生产系统的消息队列监控全面指南,示例覆盖 RabbitMQ 与 Redis(文中的指标与告警思路同样适用于 Kafka,但本文不包含 Kafka 专属示例)。
目录
关键指标
核心指标
# 关键队列指标
queue_depth:
description: 队列中等待的消息数量
unit: count
alert_threshold: > 1000
message_rate:
description: 每秒消息数
unit: messages/sec
alert_threshold: < 1 或 > 10000
processing_time:
description: 处理消息的时间
unit: milliseconds
alert_threshold: > 5000
error_rate:
description: 每秒失败消息数
unit: errors/sec
alert_threshold: > 10
consumer_lag:
description: 消费者落后于生产者的消息数
unit: count
alert_threshold: > 10000
queue_size_bytes:
description: 队列总大小(字节)
unit: bytes
alert_threshold: > 1GB
指标收集
// metrics-collector.ts
import promClient from 'prom-client';
// Dedicated registry that every metric declared below is attached to.
const register = new promClient.Registry();

/**
 * Creates a gauge with the given name, help text and label names and
 * registers it on the module-level registry.
 */
function defineGauge<T extends string>(
  name: string,
  help: string,
  labelNames: readonly T[]
) {
  return new promClient.Gauge({ name, help, labelNames, registers: [register] });
}

// Queue depth gauge, labelled by queue name and queue type.
export const queueDepthGauge = defineGauge('queue_depth', '队列中的消息数', [
  'queue_name',
  'queue_type',
]);

// Message rate gauge; the `direction` label is expected to be in/out.
export const messageRateGauge = defineGauge('message_rate', '每秒消息数', [
  'queue_name',
  'direction',
]);

// Processing time histogram with millisecond bucket boundaries.
export const processingTimeHistogram = new promClient.Histogram({
  registers: [register],
  name: 'processing_time_ms',
  help: '处理消息的时间',
  labelNames: ['queue_name'],
  buckets: [10, 50, 100, 500, 1000, 5000, 10000],
});

// Error rate gauge, labelled by queue name and error type.
export const errorRateGauge = defineGauge('error_rate', '每秒错误消息数', [
  'queue_name',
  'error_type',
]);

// Consumer lag gauge, labelled by queue name and consumer group.
export const consumerLagGauge = defineGauge('consumer_lag', '生产者背后的消息数', [
  'queue_name',
  'consumer_group',
]);

/** Returns the registry's serialized metrics, e.g. for a scrape endpoint. */
export const getMetrics = () => {
  return register.metrics();
};
RabbitMQ监控
管理插件
// rabbitmq-monitor.ts
import axios from 'axios';
/**
 * Thin client for the RabbitMQ management-plugin HTTP API.
 *
 * Every method issues one authenticated HTTP request against
 * `http://<host>:<port>/api` and resolves with the decoded response body.
 * Request failures (network errors, non-2xx statuses) propagate to the
 * caller as axios rejections.
 */
export class RabbitMQMonitor {
  private baseUrl: string;
  private auth: { username: string; password: string };

  /**
   * @param host     Management API host.
   * @param port     Management API port.
   * @param username Basic-auth user.
   * @param password Basic-auth password.
   */
  constructor(
    host: string = 'localhost',
    port: number = 15672,
    username: string = 'guest',
    password: string = 'guest'
  ) {
    this.baseUrl = `http://${host}:${port}/api`;
    this.auth = { username, password };
  }

  /** Performs an authenticated GET on `path` (relative to the API root). */
  private async fetchJson<T>(path: string): Promise<T> {
    const response = await axios.get<T>(`${this.baseUrl}${path}`, {
      auth: this.auth,
    });
    return response.data;
  }

  /** Builds the URL-encoded `/queues/<vhost>/<queue>` path segment. */
  private queuePath(queueName: string, vhost: string): string {
    return `/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queueName)}`;
  }

  /** Lists all queues. */
  async getQueues(): Promise<any[]> {
    return this.fetchJson<any[]>('/queues');
  }

  /**
   * Fetches the details of one queue.
   *
   * @param queueName Queue name (URL-encoded internally).
   * @param vhost     Virtual host the queue lives in.
   */
  async getQueueMetrics(queueName: string, vhost: string = '/'): Promise<any> {
    return this.fetchJson<any>(this.queuePath(queueName, vhost));
  }

  /** Fetches the `/overview` resource. */
  async getOverview(): Promise<any> {
    return this.fetchJson<any>('/overview');
  }

  /** Lists connections. */
  async getConnections(): Promise<any[]> {
    return this.fetchJson<any[]>('/connections');
  }

  /** Lists channels. */
  async getChannels(): Promise<any[]> {
    return this.fetchJson<any[]>('/channels');
  }

  /** Lists consumers. */
  async getConsumers(): Promise<any[]> {
    return this.fetchJson<any[]>('/consumers');
  }

  /** Fetches the `/nodes` resource. */
  async getNodeStats(): Promise<any> {
    return this.fetchJson<any>('/nodes');
  }

  /**
   * Peeks at up to `count` messages from a queue without consuming them.
   *
   * Uses `POST /queues/<vhost>/<queue>/get` with `ackmode: 'ack_requeue_true'`,
   * so the messages are put back on the queue after being read. Intended for
   * diagnostics (e.g. dead-letter inspection), not for regular consumption.
   *
   * @param queueName Queue to read from.
   * @param count     Maximum number of messages to return.
   * @param vhost     Virtual host the queue lives in.
   */
  async getMessages(
    queueName: string,
    count: number = 10,
    vhost: string = '/'
  ): Promise<any[]> {
    const response = await axios.post<any[]>(
      `${this.baseUrl}${this.queuePath(queueName, vhost)}/get`,
      { count, ackmode: 'ack_requeue_true', encoding: 'auto' },
      { auth: this.auth }
    );
    return response.data;
  }
}
// Usage: build a monitor with explicit host, port and credentials, then
// fetch the queue list followed by the overview (two sequential requests).
const monitor = new RabbitMQMonitor('localhost', 15672, 'admin', 'password');
const queues = await monitor.getQueues();
const overview = await monitor.getOverview();
Prometheus Exporter
# prometheus-rabbitmq-exporter.yml
# Compose file declaring two services: a RabbitMQ broker (management image)
# and the kbudde/rabbitmq-exporter container pointed at it.
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
environment:
# NOTE(review): credentials are hard-coded in plain text here and repeated
# inside RABBIT_URL below; replace them before using this outside a demo.
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin123
prometheus-rabbitmq-exporter:
image: kbudde/rabbitmq-exporter
ports:
# Published port matches PUBLISH_PORT below.
# NOTE(review): 9090 is also the port a Prometheus server commonly listens
# on; confirm there is no clash on the host.
- "9090:9090"
environment:
# Points at the `rabbitmq` service's management port (15672).
RABBIT_URL: http://admin:admin123@rabbitmq:15672
PUBLISH_PORT: 9090
OUTPUT_FORMAT: JSON
LOG_LEVEL: info
RABBIT_CAPABILITIES: bert,no_sort
RabbitMQ指标仪表板
{
"dashboard": {
"title": "RabbitMQ监控",
"panels": [
{
"title": "队列深度",
"targets": [
{
"expr": "rabbitmq_queue_messages{queue=~\".*\"}",
"legendFormat": "{{queue}}"
}
],
"type": "graph"
},
{
"title": "消息速率",
"targets": [
{
"expr": "rate(rabbitmq_queue_messages_published_total[5m])",
"legendFormat": "{{queue}} (发布)"
},
{
"expr": "rate(rabbitmq_queue_messages_delivered_total[5m])",
"legendFormat": "{{queue}} (交付)"
}
],
"type": "graph"
},
{
"title": "消费者滞后",
"targets": [
{
"expr": "rabbitmq_queue_messages_unacknowledged",
"legendFormat": "{{queue}}"
}
],
"type": "graph"
},
{
"title": "错误速率",
"targets": [
{
"expr": "rate(rabbitmq_channel_messages_unroutable_total[5m])",
"legendFormat": "无法路由"
}
],
"type": "graph"
}
]
}
}
Redis队列监控
Redis指标
// redis-monitor.ts
import { createClient } from 'redis';
import { queueDepthGauge, messageRateGauge } from './metrics-collector';
/**
 * Reads queue statistics from Redis and publishes them to the shared
 * `queueDepthGauge`.
 *
 * Call `connect()` before any query method and `close()` when done.
 */
export class RedisQueueMonitor {
  private client: ReturnType<typeof createClient>;

  /** @param redisUrl Connection URL passed to `createClient`. */
  constructor(redisUrl: string = 'redis://localhost:6379') {
    this.client = createClient({ url: redisUrl });
  }

  /** Opens the Redis connection. */
  async connect(): Promise<void> {
    await this.client.connect();
  }

  /**
   * Returns the length and memory footprint of a single list key.
   *
   * @param queueName Full Redis key of a list; LLEN fails on other key types.
   * @returns `{ name, length, memoryBytes }` (`memoryBytes` is whatever
   *          MEMORY USAGE reports for the key).
   */
  async getQueueInfo(queueName: string): Promise<any> {
    const length = await this.client.lLen(queueName);
    const memory = await this.client.memoryUsage(queueName);
    return {
      name: queueName,
      length,
      memoryBytes: memory,
    };
  }

  /**
   * Collects `getQueueInfo` for every list key matching `pattern`.
   *
   * Uses incremental SCAN rather than KEYS so a large keyspace does not block
   * the server. Keys that are not lists (hashes, sorted sets, strings, ...)
   * are skipped because LLEN would fail on them with WRONGTYPE.
   *
   * @param pattern Glob pattern for the key scan.
   */
  async getAllQueues(pattern: string = 'bull:*'): Promise<any[]> {
    const queues: any[] = [];
    // SCAN may report the same key more than once; remember what was handled.
    const seen = new Set<string>();

    for await (const entry of this.client.scanIterator({
      MATCH: pattern,
      COUNT: 100,
    })) {
      // Depending on the client version the iterator yields either a single
      // key or a batch of keys; normalise to an array.
      const keys: string[] = Array.isArray(entry) ? entry : [entry];
      for (const key of keys) {
        if (seen.has(key)) continue;
        seen.add(key);
        if ((await this.client.type(key)) !== 'list') continue;
        queues.push(await this.getQueueInfo(key));
      }
    }
    return queues;
  }

  /**
   * Returns job counts per state for one Bull queue.
   *
   * Bull keeps waiting and active jobs in lists (`:wait`, `:active`) and
   * completed, failed and delayed jobs in sorted sets, so the matching
   * cardinality command is used for each key.
   *
   * @param queueName Either a bare queue name (`emails`, resolved under the
   *                  default `bull:` prefix) or a full key base containing
   *                  `:` (e.g. `myprefix:emails`), which is used as-is.
   */
  async getBullQueueMetrics(queueName: string): Promise<any> {
    const base = queueName.includes(':') ? queueName : `bull:${queueName}`;
    const [waiting, active, completed, failed, delayed] = await Promise.all([
      this.client.lLen(`${base}:wait`),
      this.client.lLen(`${base}:active`),
      this.client.zCard(`${base}:completed`),
      this.client.zCard(`${base}:failed`),
      this.client.zCard(`${base}:delayed`),
    ]);
    return {
      queueName,
      waiting,
      active,
      completed,
      failed,
      delayed,
    };
  }

  /**
   * Publishes the waiting / active / failed counts of each queue to
   * `queueDepthGauge`, using the state name as the `queue_type` label.
   *
   * @param queueNames Queues to sample, one after another.
   */
  async collectMetrics(queueNames: string[]): Promise<void> {
    for (const queueName of queueNames) {
      const metrics = await this.getBullQueueMetrics(queueName);
      for (const state of ['waiting', 'active', 'failed'] as const) {
        queueDepthGauge.set(
          { queue_name: queueName, queue_type: state },
          metrics[state]
        );
      }
    }
  }

  /** Closes the Redis connection via QUIT. */
  async close(): Promise<void> {
    await this.client.quit();
  }
}
// Usage: connect, read one queue's counters, publish gauges for two queues,
// then release the connection so the process can exit cleanly.
const monitor = new RedisQueueMonitor('redis://localhost:6379');
await monitor.connect();
const metrics = await monitor.getBullQueueMetrics('emails');
await monitor.collectMetrics(['emails', 'notifications']);
await monitor.close();
Redis Exporter
# prometheus-redis-exporter.yml
# Compose file declaring a Redis server and the oliver006/redis_exporter
# container pointed at it.
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
redis-exporter:
image: oliver006/redis_exporter
ports:
# Port the exporter is published on for scraping.
- "9121:9121"
environment:
# Address of the `redis` service declared above.
REDIS_ADDR: redis://redis:6379
REDIS_EXPORTER_LOG_FORMAT: txt
健康检查
队列健康检查
// health-check.ts
import { RabbitMQMonitor } from './rabbitmq-monitor';
import { RedisQueueMonitor } from './redis-monitor';
/** Aggregated outcome of one health probe across all configured backends. */
export interface HealthCheckResult {
  /** True only when every configured backend passed its check. */
  healthy: boolean;
  /** Per-backend pass/fail flags, keyed by backend name. */
  checks: Record<string, boolean>;
  /** Per-backend details: probe data on success, `{ error }` on failure. */
  metrics: any;
}

/**
 * Probes the configured queue backends (RabbitMQ and/or Redis) and reports
 * whether each one is reachable.
 */
export class QueueHealthChecker {
  private rabbitMQMonitor?: RabbitMQMonitor;
  private redisMonitor?: RedisQueueMonitor;

  /**
   * @param config Backends to probe; omit a key to skip that backend.
   *               `rabbitmq.username` / `rabbitmq.password` are optional and
   *               fall back to the RabbitMQMonitor defaults when absent.
   */
  constructor(config: {
    rabbitmq?: { host: string; port: number; username?: string; password?: string };
    redis?: { url: string };
  }) {
    if (config.rabbitmq) {
      this.rabbitMQMonitor = new RabbitMQMonitor(
        config.rabbitmq.host,
        config.rabbitmq.port,
        config.rabbitmq.username,
        config.rabbitmq.password
      );
    }
    if (config.redis) {
      this.redisMonitor = new RedisQueueMonitor(config.redis.url);
    }
  }

  /** Extracts a printable message from an arbitrary thrown value. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Runs every configured probe. Probe failures are captured in the result
   * rather than thrown.
   *
   * @returns Flags and details per backend plus the overall `healthy` verdict.
   */
  async checkHealth(): Promise<HealthCheckResult> {
    const checks: Record<string, boolean> = {};
    const metrics: any = {};

    // RabbitMQ: reachable if the overview request succeeds.
    if (this.rabbitMQMonitor) {
      try {
        const overview = await this.rabbitMQMonitor.getOverview();
        checks['rabbitmq'] = true;
        metrics.rabbitmq = overview;
      } catch (error) {
        checks['rabbitmq'] = false;
        metrics.rabbitmq = { error: QueueHealthChecker.describeError(error) };
      }
    }

    // Redis: reachable if we can connect and enumerate queues.
    if (this.redisMonitor) {
      try {
        await this.redisMonitor.connect();
        const queues = await this.redisMonitor.getAllQueues();
        checks['redis'] = true;
        metrics.redis = { queueCount: queues.length };
      } catch (error) {
        checks['redis'] = false;
        metrics.redis = { error: QueueHealthChecker.describeError(error) };
      } finally {
        // Always release the connection, including after a failed probe;
        // otherwise the next checkHealth() call would try to connect a client
        // that is still open. Close errors are ignored on purpose: the result
        // above already reflects the probe outcome.
        await this.redisMonitor.close().catch(() => undefined);
      }
    }

    const healthy = Object.values(checks).every((passed) => passed === true);
    return { healthy, checks, metrics };
  }
}
// 在Express中使用
import express from 'express';
const app = express();

// Probe both backends on every /health request.
const healthChecker = new QueueHealthChecker({
  rabbitmq: { host: 'localhost', port: 15672 },
  redis: { url: 'redis://localhost:6379' },
});

// Responds 200 when every backend is healthy and 503 otherwise.
app.get('/health', async (_req, res) => {
  try {
    const health = await healthChecker.checkHealth();
    res.status(health.healthy ? 200 : 503).json(health);
  } catch (error) {
    // Express 4 does not catch rejections from async handlers; without this
    // guard an unexpected failure would leave the request hanging.
    res.status(503).json({
      healthy: false,
      checks: {},
      metrics: { error: error instanceof Error ? error.message : String(error) },
    });
  }
});
告警策略
Prometheus告警规则
# alert-rules.yml
groups:
  - name: queue_alerts
    interval: 30s
    rules:
      # High queue depth
      - alert: HighQueueDepth
        expr: rabbitmq_queue_messages > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "检测到高队列深度"
          description: "队列 {{ $labels.queue }} 有 {{ $value }} 条消息"

      # Queue growing rapidly.
      # rabbitmq_queue_messages is a gauge, so its per-second slope must be
      # taken with deriv(); rate() is only meaningful for counters.
      - alert: QueueGrowingRapidly
        expr: deriv(rabbitmq_queue_messages[5m]) > 100
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "队列快速增长"
          description: "队列 {{ $labels.queue }} 以 {{ $value }} 条消息/秒的速度增长"

      # High error rate (unroutable messages per second)
      - alert: HighErrorRate
        expr: rate(rabbitmq_channel_messages_unroutable_total[5m]) > 10
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "检测到高错误率"
          description: "错误率为 {{ $value }} 条错误/秒"

      # Consumer lag (messages delivered but not yet acknowledged)
      - alert: HighConsumerLag
        expr: rabbitmq_queue_messages_unacknowledged > 5000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "高消费者滞后"
          description: "消费者滞后为 {{ $value }} 条消息"

      # No consumers attached
      - alert: NoConsumers
        expr: rabbitmq_queue_consumers == 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "未检测到消费者"
          description: "队列 {{ $labels.queue }} 没有消费者"

      # Redis memory usage.
      # redis_memory_max_bytes is 0 when no maxmemory limit is configured; the
      # ratio would then be +Inf and the alert would fire permanently, so only
      # evaluate instances that actually have a limit.
      - alert: HighRedisMemory
        expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.9 and redis_memory_max_bytes > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Redis内存使用率高"
          description: "Redis使用率为 {{ $value | humanizePercentage }}"
自定义告警
// alert-manager.ts
import axios from 'axios';
/** Settings for {@link AlertManager}. */
export interface AlertConfig {
  /** Endpoint the alert payload is POSTed to. */
  webhookUrl: string;
  /** Target channel names. NOTE(review): not read by AlertManager itself. */
  channels: string[];
  /** Severity used when `sendAlert` is called without an explicit one. */
  severity: 'info' | 'warning' | 'critical';
}

/**
 * Sends alerts to a webhook and suppresses repeats of the same alert name
 * while a cooldown window is active.
 */
export class AlertManager {
  /** Alert name -> epoch-ms timestamp of the last successful delivery. */
  private alertHistory: Map<string, number> = new Map();

  /**
   * @param config     Webhook settings and default severity.
   * @param cooldownMs Minimum time between two deliveries of the same alert
   *                   name, in milliseconds (default: 5 minutes).
   */
  constructor(
    private config: AlertConfig,
    private cooldownMs: number = 300000
  ) {}

  /**
   * Delivers one alert unless the same `alertName` was delivered within the
   * cooldown window.
   *
   * Delivery is best-effort: a failed POST is logged and not rethrown, and it
   * does not start the cooldown, so the next call retries.
   *
   * @param alertName Cooldown key; also shown in the message title.
   * @param message   Alert body text.
   * @param severity  Severity label; defaults to the configured severity.
   */
  async sendAlert(
    alertName: string,
    message: string,
    severity: string = this.config.severity
  ): Promise<void> {
    // Skip while the previous delivery of this alert is still cooling down.
    const lastAlert = this.alertHistory.get(alertName);
    if (lastAlert !== undefined && Date.now() - lastAlert < this.cooldownMs) {
      return;
    }

    const payload = {
      text: `[${severity.toUpperCase()}] ${alertName}`,
      attachments: [
        {
          color: this.getColorForSeverity(severity),
          text: message,
          ts: Math.floor(Date.now() / 1000), // epoch seconds
        },
      ],
    };

    try {
      await axios.post(this.config.webhookUrl, payload);
      this.alertHistory.set(alertName, Date.now());
    } catch (error) {
      console.error('发送告警失败:', error);
    }
  }

  /** Maps a severity to its attachment colour; unknown values get the info colour. */
  private getColorForSeverity(severity: string): string {
    const colors: Record<string, string> = {
      info: '#36a64f',
      warning: '#ff9900',
      critical: '#ff0000',
    };
    return colors[severity] ?? '#36a64f';
  }

  /**
   * Sends a threshold alert for one queue metric.
   *
   * Severity is `critical` when `value` exceeds twice the threshold and
   * `warning` otherwise. The cooldown key is `QueueAlert_<queue>_<metric>`.
   *
   * @param queueName Queue the reading belongs to.
   * @param metric    Metric name included in the message and cooldown key.
   * @param value     Observed value.
   * @param threshold Configured limit.
   */
  async sendQueueAlert(
    queueName: string,
    metric: string,
    value: number,
    threshold: number
  ): Promise<void> {
    const message = `队列 ${queueName} ${metric} 为 ${value},阈值为 ${threshold}`;
    await this.sendAlert(
      `QueueAlert_${queueName}_${metric}`,
      message,
      value > threshold * 2 ? 'critical' : 'warning'
    );
  }
}
// Usage: read the webhook from the environment and fail fast when it is
// missing, instead of posting to `undefined` and only logging the error.
const webhookUrl = process.env.SLACK_WEBHOOK_URL;
if (!webhookUrl) {
  throw new Error('SLACK_WEBHOOK_URL is not set');
}
const alertManager = new AlertManager({
  webhookUrl,
  channels: ['#alerts'],
  severity: 'warning',
});
await alertManager.sendQueueAlert('emails', 'depth', 1500, 1000);
Grafana仪表板
队列监控仪表板
{
"dashboard": {
"title": "队列监控仪表板",
"panels": [
{
"title": "队列深度",
"type": "graph",
"targets": [
{
"expr": "rabbitmq_queue_messages",
"legendFormat": "{{queue}}"
}
]
},
{
"title": "消息发布速率",
"type": "graph",
"targets": [
{
"expr": "rate(rabbitmq_queue_messages_published_total[5m])",
"legendFormat": "{{queue}}"
}
]
},
{
"title": "消息交付速率",
"type": "graph",
"targets": [
{
"expr": "rate(rabbitmq_queue_messages_delivered_total[5m])",
"legendFormat": "{{queue}}"
}
]
},
{
"title": "消费者滞后",
"type": "graph",
"targets": [
{
"expr": "rabbitmq_queue_messages_unacknowledged",
"legendFormat": "{{queue}}"
}
]
},
{
"title": "错误速率",
"type": "graph",
"targets": [
{
"expr": "rate(rabbitmq_channel_messages_unroutable_total[5m])",
"legendFormat": "无法路由"
}
]
},
{
"title": "队列内存",
"type": "graph",
"targets": [
{
"expr": "rabbitmq_queue_memory_bytes",
"legendFormat": "{{queue}}"
}
]
}
]
}
}
死信队列监控
DLQ监控
// dlq-monitor.ts
import { RabbitMQMonitor } from './rabbitmq-monitor';
import { AlertManager } from './alert-manager';
/**
 * Watches dead-letter queues: alerts when they grow past a threshold and
 * summarises why and from where messages were dead-lettered.
 */
export class DLQMonitor {
  constructor(
    private rabbitMQMonitor: RabbitMQMonitor,
    private alertManager: AlertManager
  ) {}

  /**
   * Sends an alert when the DLQ holds more than `threshold` messages.
   * Severity is `critical` above five times the threshold, else `warning`.
   *
   * @param dlqName   Dead-letter queue name.
   * @param threshold Message count above which an alert is sent.
   */
  async monitorDLQ(dlqName: string, threshold: number = 100): Promise<void> {
    const metrics = await this.rabbitMQMonitor.getQueueMetrics(dlqName);
    if (metrics.messages > threshold) {
      await this.alertManager.sendAlert(
        `DLQAlert_${dlqName}`,
        `死信队列 ${dlqName} 有 ${metrics.messages} 条消息`,
        metrics.messages > threshold * 5 ? 'critical' : 'warning'
      );
    }
  }

  /**
   * Fetches up to `limit` messages from the DLQ by delegating to
   * `rabbitMQMonitor.getMessages(dlqName, limit)`.
   *
   * @param dlqName Dead-letter queue name.
   * @param limit   Maximum number of messages to fetch.
   */
  async getDLQMessages(dlqName: string, limit: number = 10): Promise<any[]> {
    return this.rabbitMQMonitor.getMessages(dlqName, limit);
  }

  /**
   * Samples up to 100 DLQ messages and counts them by dead-letter reason and
   * by originating queue, using the first `x-death` header entry.
   *
   * @returns `{ total, byReason, byQueue }`; messages without an `x-death`
   *          header are counted under `unknown`.
   */
  async analyzeDLQ(dlqName: string): Promise<any> {
    const messages = await this.getDLQMessages(dlqName, 100);
    const analysis = {
      total: messages.length,
      byReason: {} as Record<string, number>,
      byQueue: {} as Record<string, number>,
    };

    for (const message of messages) {
      // The management API returns `properties` next to `payload`, not inside
      // it. The nested path is kept as a fallback for already-wrapped input.
      const headers =
        message?.properties?.headers ?? message?.payload?.properties?.headers;
      const firstDeath = headers?.['x-death']?.[0];
      const reason = firstDeath?.reason || 'unknown';
      const originalQueue = firstDeath?.queue || 'unknown';

      analysis.byReason[reason] = (analysis.byReason[reason] || 0) + 1;
      analysis.byQueue[originalQueue] = (analysis.byQueue[originalQueue] || 0) + 1;
    }
    return analysis;
  }
}
性能故障排除
常见问题
// troubleshooting.ts
/**
 * Static diagnostics that turn raw queue readings into human-readable
 * findings. The private readers at the bottom are placeholders that
 * currently return 0.
 */
export class QueueTroubleshooter {
  /**
   * Looks for reasons a queue is draining slowly: no consumers, a prefetch
   * count above 100, or an average processing time above 5000 ms.
   */
  static async diagnoseSlowConsumers(queueName: string): Promise<string[]> {
    const consumerCount = await this.getConsumerCount(queueName);
    const prefetch = await this.getPrefetchCount(queueName);
    const avgProcessingTime = await this.getAvgProcessingTime(queueName);

    return [
      ...(consumerCount === 0 ? ['队列未附加消费者'] : []),
      ...(prefetch > 100 ? [`高预取计数 (${prefetch}) 可能导致内存问题`] : []),
      ...(avgProcessingTime > 5000 ? [`高平均处理时间 (${avgProcessingTime}ms)`] : []),
    ];
  }

  /**
   * Looks for backlog symptoms: more than 10000 queued messages, or a publish
   * rate more than 1.5x the consume rate.
   */
  static async diagnoseBacklog(queueName: string): Promise<string[]> {
    const depth = await this.getQueueDepth(queueName);
    const publishRate = await this.getPublishRate(queueName);
    const consumeRate = await this.getConsumeRate(queueName);

    return [
      ...(depth > 10000 ? [`大队列积压 (${depth} 条消息)`] : []),
      ...(publishRate > consumeRate * 1.5 ? ['发布速率明显高于消费速率'] : []),
    ];
  }

  /**
   * Looks for memory pressure: queue memory above 1 GB, or an average message
   * size above 1 MB.
   */
  static async diagnoseMemoryIssues(queueName: string): Promise<string[]> {
    const ONE_GIB = 1073741824;
    const ONE_MIB = 1048576;

    const memory = await this.getQueueMemory(queueName);
    const avgMessageSize = await this.getAvgMessageSize(queueName);

    return [
      ...(memory > ONE_GIB ? [`高队列内存使用量 (${memory / 1024 / 1024}MB)`] : []),
      ...(avgMessageSize > ONE_MIB ? [`大平均消息大小 (${avgMessageSize / 1024}KB)`] : []),
    ];
  }

  // --- Placeholder readers: each is a stub that resolves to 0. ---

  private static async getConsumerCount(queueName: string): Promise<number> {
    return 0;
  }

  private static async getPrefetchCount(queueName: string): Promise<number> {
    return 0;
  }

  private static async getAvgProcessingTime(queueName: string): Promise<number> {
    return 0;
  }

  private static async getQueueDepth(queueName: string): Promise<number> {
    return 0;
  }

  private static async getPublishRate(queueName: string): Promise<number> {
    return 0;
  }

  private static async getConsumeRate(queueName: string): Promise<number> {
    return 0;
  }

  private static async getQueueMemory(queueName: string): Promise<number> {
    return 0;
  }

  private static async getAvgMessageSize(queueName: string): Promise<number> {
    return 0;
  }
}
容量规划
容量计算器
// capacity-planning.ts
/** Observed load figures for one queue, used as input to the planner. */
export interface QueueCapacity {
  queueName: string;
  currentDepth: number;
  peakDepth: number;
  /** Used as bytes per message by `estimateQueueMemory`. */
  avgMessageSize: number;
  /** Used as milliseconds per message by the throughput formulas. */
  avgProcessingTime: number;
  consumers: number;
}

/** Back-of-the-envelope sizing helpers derived from a {@link QueueCapacity}. */
export class CapacityPlanner {
  /**
   * Consumers needed to work through `peakDepth` messages spread over
   * 24 hours, rounded up and then padded by 20%.
   */
  static calculateRequiredConsumers(capacity: QueueCapacity): number {
    const SECONDS_PER_DAY = 86400;
    const arrivalsPerSecond = capacity.peakDepth / SECONDS_PER_DAY;
    const perConsumerRate = 1000 / capacity.avgProcessingTime;
    const baseline = Math.ceil(arrivalsPerSecond / perConsumerRate);
    const withBuffer = baseline * 1.2;
    return Math.ceil(withBuffer);
  }

  /** Peak memory estimate: peak depth times average message size. */
  static estimateQueueMemory(capacity: QueueCapacity): number {
    const { peakDepth, avgMessageSize } = capacity;
    return peakDepth * avgMessageSize;
  }

  /** Messages per second the current consumer pool can process. */
  static calculateThroughput(capacity: QueueCapacity): number {
    const perConsumerRate = 1000 / capacity.avgProcessingTime;
    return capacity.consumers * perConsumerRate;
  }

  /**
   * Combines the three estimates into a single report and flags whether more
   * consumers are needed than are currently running.
   */
  static recommendConfiguration(capacity: QueueCapacity): any {
    const recommendedConsumers = this.calculateRequiredConsumers(capacity);
    const estimatedMemoryBytes = this.estimateQueueMemory(capacity);
    const estimatedThroughput = this.calculateThroughput(capacity);
    const currentConsumers = capacity.consumers;

    return {
      recommendedConsumers,
      estimatedMemoryBytes,
      estimatedMemoryMB: estimatedMemoryBytes / 1024 / 1024,
      estimatedThroughput,
      currentConsumers,
      needsScaling: recommendedConsumers > currentConsumers,
    };
  }
}
// Usage: describe the queue's observed load, then print the planner's
// recommendation for it.
const capacity: QueueCapacity = {
  queueName: 'emails',
  currentDepth: 5000,
  peakDepth: 20000,
  avgMessageSize: 1024,
  avgProcessingTime: 100,
  consumers: 5,
};
const recommendation = CapacityPlanner.recommendConfiguration(capacity);
console.log(recommendation);
最佳实践
监控清单
## 队列监控清单
### 基础指标
- [ ] 队列深度(等待消息)
- [ ] 消息速率(发布/交付)
- [ ] 处理时间
- [ ] 错误率
- [ ] 消费者滞后
### 高级指标
- [ ] 队列内存使用
- [ ] 消费者数量
- [ ] 预取设置
- [ ] 连接数量
- [ ] 通道(channel)数量
### 告警
- [ ] 高队列深度
- [ ] 队列快速增长
- [ ] 高错误率
- [ ] 无消费者
- [ ] DLQ消息
### 仪表板
- [ ] 实时队列指标
- [ ] 历史趋势
- [ ] 消费者性能
- [ ] 错误分析
- [ ] 容量规划