Redis Message Queue Development Skill (redis-queue)

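This skill is a comprehensive guide to using Redis as a message queue, aimed at Node.js applications. It covers Bull and BullMQ setup, job creation, processing, priorities, delayed jobs, repeatable jobs, lifecycle management, error handling, retries, queue monitoring, worker scaling, and production patterns. Keywords: Redis, message queue, BullMQ, Node.js, backend development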

Backend development, updated 3/5/2026

Redis Queues

Overview

A comprehensive guide to using Redis as a message queue in Node.js applications with Bull and BullMQ.

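Table of Contents

  1. Redis Queue Concepts
  2. Bull/BullMQ Setup
  3. Job Creation
  4. Job Processing
  5. Job Priority
  6. Delayed Jobs
  7. Repeatable Jobs
  8. Job Lifecycle
  9. Error Handling
  10. Retries
  11. Queue Monitoring
  12. Scaling Workers
  13. Production Patterns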

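Redis Queue Concepts

Core Concepts

// Core Redis queue concepts
/**
 * Queue: a named list of jobs waiting to be processed
 * Job: a unit of work to be processed
 * Worker: a process that takes jobs from a queue and processes them
 * Priority: a job's importance; in BullMQ a lower number means a higher priority (1 runs before 10)
 * Delay: how long to wait before a job becomes eligible for processing
 * Repeat: schedules a job to run periodically
 * Retry: automatic re-processing of a failed job
 * Backoff: the delay between retry attempts
 */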

Basic Setup

// redis-connection.ts
import { QueueOptions } from 'bullmq';
import { Redis } from 'ioredis';

// Redis connection configuration
export const redisConfig = {
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379', 10),
  password: process.env.REDIS_PASSWORD,
  db: parseInt(process.env.REDIS_DB || '0', 10),
  // BullMQ workers use blocking commands and require this to be null
  maxRetriesPerRequest: null,
  // Reconnect delay: 50 ms per attempt, capped at 2 seconds
  retryStrategy: (times: number) => {
    const delay = Math.min(times * 50, 2000);
    return delay;
  }
};

// Create a Redis connection
export const createRedisConnection = () => {
  return new Redis(redisConfig);
};

// Default queue options
export const defaultQueueOptions: QueueOptions = {
  connection: createRedisConnection(),
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000,
    },
    // Keep only the most recent 100 completed and 50 failed jobs
    removeOnComplete: 100,
    removeOnFail: 50,
  },
};
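The reconnect policy in redisConfig is easier to reason about once the delays are written out. The sketch below repeats the same formula in a standalone helper (the name reconnectDelay is hypothetical, not part of ioredis) and prints the delay for a few attempt numbers.

```typescript
// Same formula as retryStrategy in redisConfig: 50 ms per attempt, capped at 2000 ms.
// reconnectDelay is a hypothetical helper name used only for this illustration.
function reconnectDelay(times: number): number {
  return Math.min(times * 50, 2000);
}

// Delays for a few reconnection attempts, including ones at and past the cap.
const attemptNumbers = [1, 2, 10, 40, 100];
const reconnectDelays = attemptNumbers.map(reconnectDelay);

console.log(reconnectDelays); // [50, 100, 500, 2000, 2000]
```

The cap is reached at attempt 40, so a long outage settles into one reconnection attempt every two seconds.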

Bull/BullMQ Setup

BullMQ Queue Setup

// queue-setup.ts
import { Queue, QueueOptions } from 'bullmq';
import { defaultQueueOptions } from './redis-connection';

export class JobQueue {
  private queue: Queue;

  constructor(name: string, options?: QueueOptions) {
    // Caller-supplied options override the defaults
    this.queue = new Queue(name, {
      ...defaultQueueOptions,
      ...options,
    });
  }

  getQueue(): Queue {
    return this.queue;
  }

  async close(): Promise<void> {
    await this.queue.close();
  }

  async getQueueStats(): Promise<Record<string, number>> {
    const counts = await this.queue.getJobCounts();
    const workers = await this.queue.getWorkers();

    return {
      waiting: counts.waiting,
      active: counts.active,
      completed: counts.completed,
      failed: counts.failed,
      delayed: counts.delayed,
      paused: counts.paused,
      workerCount: workers.length,
    };
  }
}

// Usage
const emailQueue = new JobQueue('emails');
const stats = await emailQueue.getQueueStats();
console.log(stats);

Worker Setup

// worker-setup.ts
import { Worker, WorkerOptions, Job } from 'bullmq';
import { createRedisConnection } from './redis-connection';

export class JobWorker {
  private worker: Worker;

  constructor(
    queueName: string,
    processor: (job: Job) => Promise<void>,
    options?: Partial<WorkerOptions>
  ) {
    this.worker = new Worker(
      queueName,
      processor,
      {
        connection: createRedisConnection(),
        concurrency: 5,
        ...options,
      }
    );

    this.setupEventListeners();
  }

  private setupEventListeners(): void {
    this.worker.on('completed', (job: Job) => {
      console.log(`Job ${job.id} completed`);
    });

    this.worker.on('failed', (job: Job | undefined, error: Error) => {
      console.error(`Job ${job?.id} failed:`, error.message);
    });

    this.worker.on('error', (error: Error) => {
      console.error('Worker error:', error);
    });

    // The worker's 'stalled' event passes the job id, not a Job instance
    this.worker.on('stalled', (jobId: string) => {
      console.warn(`Job ${jobId} stalled`);
    });
  }

  async close(): Promise<void> {
    await this.worker.close();
  }
}

// Usage
// sendEmail is assumed to be implemented elsewhere in the application.
const worker = new JobWorker('emails', async (job) => {
  console.log('Processing job:', job.data);
  await sendEmail(job.data);
});

Job Creation

Basic Job Creation

// job-creation.ts
import { Job, JobsOptions } from 'bullmq';
import { JobQueue } from './queue-setup';

export class JobManager {
  // protected so that subclasses can reach the underlying queue
  constructor(protected queue: JobQueue) {}

  async addJob(name: string, data: any, options?: JobsOptions): Promise<Job> {
    const job = await this.queue.getQueue().add(name, data, options);
    console.log(`Job ${job.id} added to the queue`);
    return job;
  }

  async addBulkJobs(jobs: Array<{ name: string; data: any; opts?: JobsOptions }>): Promise<Job[]> {
    const addedJobs = await this.queue.getQueue().addBulk(jobs);
    console.log(`Added ${addedJobs.length} jobs to the queue`);
    return addedJobs;
  }
}

// Usage
const jobManager = new JobManager(emailQueue);

// Add a single job
await jobManager.addJob('send-email', {
  to: 'user@example.com',
  subject: 'Welcome',
  body: 'Hello!'
});

// Add jobs in bulk
await jobManager.addBulkJobs([
  { name: 'send-email', data: { to: 'user1@example.com' } },
  { name: 'send-email', data: { to: 'user2@example.com' } },
  { name: 'send-email', data: { to: 'user3@example.com' } },
]);

Jobs with Options

// job-options.ts
import { Job } from 'bullmq';
import { JobManager } from './job-creation';

export interface JobOptions {
  // Retry options
  attempts?: number;
  backoff?: {
    type: 'exponential' | 'fixed';
    delay: number;
  };

  // Priority (in BullMQ, a lower number means a higher priority)
  priority?: number;

  // Delay in milliseconds
  delay?: number;

  // Lifecycle
  removeOnComplete?: number | boolean;
  removeOnFail?: number | boolean;

  // Timeout: a Bull option; BullMQ has no per-job timeout, so enforce it inside the processor
  timeout?: number;

  // Custom job id and repeat settings
  jobId?: string;
  repeat?: any;
}

export class AdvancedJobManager extends JobManager {
  async addJobWithOptions(
    name: string,
    data: any,
    options: JobOptions
  ): Promise<Job> {
    return await this.addJob(name, data, options);
  }
}

// Usage (emailData is assumed to be defined by the caller)
const advancedManager = new AdvancedJobManager(emailQueue);

// High-priority job: 1 is the highest explicit priority
await advancedManager.addJobWithOptions('send-email', emailData, {
  priority: 1,
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000,
  },
});

// Low-priority job: larger numbers are processed later
await advancedManager.addJobWithOptions('send-email', emailData, {
  priority: 10,
  attempts: 3,
});

Job Processing

Basic Processor

// processor.ts
import { Job } from 'bullmq';

export interface EmailData {
  to: string;
  subject: string;
  body: string;
}

export async function emailProcessor(job: Job<EmailData>): Promise<void> {
  const { to, subject, body } = job.data;

  console.log(`Sending email to ${to}`);

  // Simulate sending the email
  await new Promise(resolve => setTimeout(resolve, 1000));

  console.log(`Email sent to ${to}`);

  // Update progress; awaited so a Redis error is not silently dropped
  await job.updateProgress(100);
}

// Usage in a worker
const worker = new JobWorker('emails', emailProcessor);

Progress Tracking

// progress-tracking.ts
import { Job } from 'bullmq';

export async function longRunningProcessor(job: Job): Promise<void> {
  const totalSteps = 100;

  for (let i = 0; i < totalSteps; i++) {
    // Process one step
    await processStep(i);

    // Report progress; i + 1 steps are done, so the last iteration reports 100
    await job.updateProgress(Math.round(((i + 1) / totalSteps) * 100));

    // Append a line to the job's log
    await job.log(`Completed step ${i + 1} of ${totalSteps}`);
  }
}

async function processStep(step: number): Promise<void> {
  // Simulate work
  await new Promise(resolve => setTimeout(resolve, 50));
}

// Usage
const worker = new JobWorker('long-tasks', longRunningProcessor);

Job Dependencies

// job-dependencies.ts
// BullMQ models dependencies as flows: a parent job runs only after all of
// its children have completed. Job has no addDependency method.
import { FlowProducer, FlowJob, JobNode } from 'bullmq';
import { createRedisConnection } from './redis-connection';

export class JobDependencyManager {
  private flowProducer: FlowProducer;

  constructor(private queueName: string) {
    this.flowProducer = new FlowProducer({ connection: createRedisConnection() });
  }

  // Runs the jobs in the given order. Children run before their parent,
  // so the first job becomes the deepest child and the last job the root.
  async addChainedJobs(jobs: Array<{ name: string; data: any }>): Promise<JobNode> {
    if (jobs.length === 0) {
      throw new Error('At least one job is required');
    }

    let flow: FlowJob | undefined;
    for (const job of jobs) {
      flow = {
        name: job.name,
        data: job.data,
        queueName: this.queueName,
        children: flow ? [flow] : undefined,
      };
    }

    return await this.flowProducer.add(flow!);
  }

  // Runs the child jobs in parallel, then the parent once all have completed.
  async addParallelJobs(
    parent: { name: string; data: any },
    children: Array<{ name: string; data: any }>
  ): Promise<JobNode> {
    return await this.flowProducer.add({
      name: parent.name,
      data: parent.data,
      queueName: this.queueName,
      children: children.map(child => ({ ...child, queueName: this.queueName })),
    });
  }
}

// Usage
const dependencyManager = new JobDependencyManager('emails');

// Chained jobs: validate-email, then send-email, then log-email
await dependencyManager.addChainedJobs([
  { name: 'validate-email', data: { email: 'user@example.com' } },
  { name: 'send-email', data: { to: 'user@example.com' } },
  { name: 'log-email', data: { email: 'user@example.com' } },
]);

// Parallel jobs followed by a parent job that waits for all of them
// ('combine-results' is a hypothetical job name)
await dependencyManager.addParallelJobs(
  { name: 'combine-results', data: {} },
  [
    { name: 'process-a', data: {} },
    { name: 'process-b', data: {} },
    { name: 'process-c', data: {} },
  ]
);

Job Priority

Priority Queue

// priority-queue.ts
import { Job } from 'bullmq';
import { JobManager } from './job-creation';

// In BullMQ a lower number means a higher priority: 1 is processed before 10.
export class PriorityJobManager extends JobManager {
  async addHighPriorityJob(name: string, data: any): Promise<Job> {
    return await this.addJob(name, data, { priority: 1 });
  }

  async addMediumPriorityJob(name: string, data: any): Promise<Job> {
    return await this.addJob(name, data, { priority: 5 });
  }

  async addLowPriorityJob(name: string, data: any): Promise<Job> {
    return await this.addJob(name, data, { priority: 10 });
  }

  async addPriorityJob(
    name: string,
    data: any,
    priority: number
  ): Promise<Job> {
    return await this.addJob(name, data, { priority });
  }
}

// Usage (the data variables are assumed to be defined by the caller)
const priorityManager = new PriorityJobManager(emailQueue);

// Add jobs with different priorities
await priorityManager.addHighPriorityJob('urgent-email', urgentEmailData);
await priorityManager.addMediumPriorityJob('normal-email', normalEmailData);
await priorityManager.addLowPriorityJob('newsletter-email', newsletterData);

// Custom priority
await priorityManager.addPriorityJob('custom-priority', data, 7);
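Because BullMQ treats a lower priority number as more urgent, it can help to see the resulting order spelled out. The sketch below is a plain in-memory simulation under that assumption, not BullMQ's internal implementation; PendingJob and processingOrder are hypothetical names.

```typescript
// A simulated waiting job; addedAt stands in for insertion order.
interface PendingJob {
  jobName: string;
  priority: number;
  addedAt: number;
}

// Simulation only: lower priority number first, ties resolved by insertion order.
function processingOrder(jobs: PendingJob[]): string[] {
  return jobs
    .slice()
    .sort((a, b) => a.priority - b.priority || a.addedAt - b.addedAt)
    .map((job) => job.jobName);
}

const simulatedOrder = processingOrder([
  { jobName: 'newsletter-email', priority: 10, addedAt: 1 },
  { jobName: 'urgent-email', priority: 1, addedAt: 2 },
  { jobName: 'normal-email', priority: 5, addedAt: 3 },
  { jobName: 'second-urgent-email', priority: 1, addedAt: 4 },
]);

console.log(simulatedOrder);
// ['urgent-email', 'second-urgent-email', 'normal-email', 'newsletter-email']
```

The newsletter was added first but runs last, which is the behavior to expect from a job added with priority 10 alongside jobs with priority 1 and 5.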

Delayed Jobs

Creating Delayed Jobs

// delayed-jobs.ts
import { Job } from 'bullmq';
import { JobManager } from './job-creation';

export class DelayedJobManager extends JobManager {
  async addDelayedJob(
    name: string,
    data: any,
    delayMs: number
  ): Promise<Job> {
    return await this.addJob(name, data, { delay: delayMs });
  }

  async addDelayedJobByDate(
    name: string,
    data: any,
    executeAt: Date
  ): Promise<Job> {
    // Convert the absolute time into a delay relative to now
    const delay = executeAt.getTime() - Date.now();
    if (delay < 0) {
      throw new Error('Execution time must be in the future');
    }
    return await this.addJob(name, data, { delay });
  }
}

// Usage (reminderData and emailData are assumed to be defined by the caller)
const delayedManager = new DelayedJobManager(emailQueue);

// Delay by a number of milliseconds
await delayedManager.addDelayedJob('send-reminder', reminderData, 60000); // 1 minute

// Delay until a specific time (here: 24 hours from now; a past date would throw)
const executeAt = new Date(Date.now() + 24 * 60 * 60 * 1000);
await delayedManager.addDelayedJobByDate('send-scheduled-email', emailData, executeAt);
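The date-to-delay conversion is the only real logic in addDelayedJobByDate, so it is worth isolating. A minimal sketch, assuming a hypothetical helper named delayUntil that takes the current time as a parameter so the arithmetic can be checked deterministically:

```typescript
// Milliseconds from `now` until `executeAt`.
// Throws when the date is invalid or not strictly in the future.
function delayUntil(executeAt: Date, now: Date = new Date()): number {
  const delayMs = executeAt.getTime() - now.getTime();
  if (isNaN(delayMs)) {
    throw new Error('Invalid date');
  }
  if (delayMs <= 0) {
    throw new Error('Execution time must be in the future');
  }
  return delayMs;
}

// Fixed reference time so the result does not depend on the wall clock.
const referenceNow = new Date('2030-01-01T09:00:00Z');
const oneHourDelay = delayUntil(new Date('2030-01-01T10:00:00Z'), referenceNow);

console.log(oneHourDelay); // 3600000
```

Passing the clock in as an argument is a design choice for testability; production code can rely on the default value.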

Repeatable Jobs

Cron Jobs

// repeatable-jobs.ts
import { Job } from 'bullmq';
import { JobManager } from './job-creation';

export class RepeatableJobManager extends JobManager {
  async addCronJob(
    name: string,
    data: any,
    cronPattern: string
  ): Promise<Job> {
    return await this.addJob(name, data, {
      repeat: {
        pattern: cronPattern,
      },
    });
  }

  async addRepeatJob(
    name: string,
    data: any,
    options: {
      every?: number;    // milliseconds
      pattern?: string;  // cron expression (BullMQ uses `pattern`, not `cron`)
      startDate?: Date;
      endDate?: Date;
      tz?: string;
    }
  ): Promise<Job> {
    return await this.addJob(name, data, {
      repeat: options,
    });
  }

  // The repeat options must match the ones the job was added with
  async removeRepeatableJob(name: string, repeat: any): Promise<void> {
    await this.queue.getQueue().removeRepeatable(name, repeat);
  }
}

// Usage
const repeatableManager = new RepeatableJobManager(emailQueue);

// Run every minute
await repeatableManager.addRepeatJob('cleanup', {}, {
  every: 60000, // 1 minute
});

// Run every day at midnight
await repeatableManager.addCronJob('daily-report', {}, '0 0 * * *');

// Run every Monday at 9 a.m.
await repeatableManager.addRepeatJob('weekly-summary', {}, {
  pattern: '0 9 * * 1',
  tz: 'America/New_York',
});

// Run between a start date and an end date (example dates; use a range that has not passed)
await repeatableManager.addRepeatJob('campaign', {}, {
  every: 86400000, // 1 day
  startDate: new Date('2024-01-01'),
  endDate: new Date('2024-12-31'),
});

// Remove a repeatable job
await repeatableManager.removeRepeatableJob('cleanup', { every: 60000 });

Job Lifecycle

Job States

// job-lifecycle.ts
import { Job, JobType } from 'bullmq';
import { JobQueue } from './queue-setup';

export class JobLifecycleManager {
  constructor(private queue: JobQueue) {}

  async getJob(jobId: string): Promise<Job | undefined> {
    return await this.queue.getQueue().getJob(jobId);
  }

  async getJobState(jobId: string): Promise<string> {
    const job = await this.getJob(jobId);
    if (!job) {
      return 'unknown';
    }
    return await job.getState();
  }

  async getJobsByState(state: JobType, start: number = 0, end: number = 10): Promise<Job[]> {
    return await this.queue.getQueue().getJobs([state], start, end);
  }

  // Moves a failed job back to the waiting list; job.retry() resolves to void
  async retryJob(jobId: string): Promise<void> {
    const job = await this.getJob(jobId);
    if (!job) {
      throw new Error('Job not found');
    }
    await job.retry();
  }

  // Moves a delayed job to the waiting list immediately; job.promote() resolves to void
  async promoteJob(jobId: string): Promise<void> {
    const job = await this.getJob(jobId);
    if (!job) {
      throw new Error('Job not found');
    }
    await job.promote();
  }

  async removeJob(jobId: string): Promise<void> {
    const job = await this.getJob(jobId);
    if (!job) {
      throw new Error('Job not found');
    }
    await job.remove();
  }
}

// Usage
const lifecycleManager = new JobLifecycleManager(emailQueue);

// Get a job's state
const state = await lifecycleManager.getJobState('job-id');
console.log('Job state:', state);

// Get failed jobs
const failedJobs = await lifecycleManager.getJobsByState('failed');

// Retry a failed job
await lifecycleManager.retryJob('failed-job-id');

// Promote a delayed job
await lifecycleManager.promoteJob('delayed-job-id');

Error Handling

Error Handling in Processors

// error-handling.ts
import { Job } from 'bullmq';

export class ErrorHandler {
  static async safeProcessor<T>(
    job: Job<T>,
    handler: (job: Job<T>) => Promise<void>
  ): Promise<void> {
    try {
      await handler(job);
    } catch (error) {
      console.error('Processor error:', error);

      // Persist error metadata; assigning to job.data alone only changes the local copy
      await job.updateData({
        ...job.data,
        error: {
          message: (error as Error).message,
          stack: (error as Error).stack,
          timestamp: new Date().toISOString(),
        },
      } as T);

      // Rethrow so that BullMQ handles the retry
      throw error;
    }
  }
}

// Usage
const worker = new JobWorker('emails', async (job) => {
  await ErrorHandler.safeProcessor(job, async (j) => {
    await emailProcessor(j);
  });
});

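Custom Error Types

// custom-errors.ts
import { Job, UnrecoverableError } from 'bullmq';

export class RetryableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'RetryableError';
  }
}

export class NonRetryableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'NonRetryableError';
  }
}

// Usage in a processor (processJob is assumed to be implemented elsewhere)
export async function smartProcessor(job: Job): Promise<void> {
  try {
    await processJob(job);
  } catch (error) {
    if (error instanceof RetryableError) {
      // Let BullMQ handle the retry
      throw error;
    } else if (error instanceof NonRetryableError) {
      // Do not retry: BullMQ fails the job immediately on UnrecoverableError,
      // whereas any other thrown error would still be retried
      throw new UnrecoverableError(error.message);
    } else {
      // Retryable by default
      throw new RetryableError((error as Error).message);
    }
  }
}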
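The retry decision itself can be separated from the queue and checked on its own. A minimal sketch, assuming two locally defined error classes that mirror the ones above and a hypothetical classifyError helper:

```typescript
// Local copies of the two error classes so the sketch is self-contained.
class RetryableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'RetryableError';
    // Keeps instanceof working even when compiled to an ES5 target.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

class NonRetryableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'NonRetryableError';
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

type RetryDecision = 'retry' | 'fail';

// Only errors explicitly marked as non-retryable stop the retry cycle;
// RetryableError and anything unknown default to 'retry'.
function classifyError(error: unknown): RetryDecision {
  return error instanceof NonRetryableError ? 'fail' : 'retry';
}

console.log(classifyError(new NonRetryableError('invalid address'))); // 'fail'
console.log(classifyError(new RetryableError('SMTP timeout')));        // 'retry'
console.log(classifyError(new Error('something else')));               // 'retry'
```

Defaulting unknown errors to 'retry' matches the processor above; a stricter service might prefer the opposite default.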

Retries

Retry Configuration

// retry-config.ts
export interface RetryConfig {
  attempts: number;
  backoff: {
    type: 'exponential' | 'fixed';
    delay: number;
  };
}

export const retryConfigs: Record<string, RetryConfig> = {
  default: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000,
    },
  },
  aggressive: {
    attempts: 10,
    backoff: {
      type: 'exponential',
      delay: 1000,
    },
  },
  conservative: {
    attempts: 2,
    backoff: {
      type: 'fixed',
      delay: 5000,
    },
  },
};

// Usage (data is assumed to be defined by the caller)
await jobManager.addJob('send-email', data, retryConfigs.aggressive);
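To compare these presets it helps to compute the delays they produce. The sketch below assumes the formulas given in the BullMQ documentation (fixed: the same delay every time; exponential: delay * 2^(attemptsMade - 1)) and ignores jitter; backoffDelay and retrySchedule are hypothetical helper names.

```typescript
type BackoffConfig = { type: 'exponential' | 'fixed'; delay: number };

// Delay before retry number `attemptsMade` (1 = first retry).
// Assumed formulas: fixed -> delay, exponential -> delay * 2^(attemptsMade - 1).
function backoffDelay(backoff: BackoffConfig, attemptsMade: number): number {
  if (backoff.type === 'fixed') {
    return backoff.delay;
  }
  return backoff.delay * Math.pow(2, attemptsMade - 1);
}

// A job allowed `attempts` total attempts is retried at most attempts - 1 times.
function retrySchedule(attempts: number, backoff: BackoffConfig): number[] {
  const schedule: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    schedule.push(backoffDelay(backoff, retry));
  }
  return schedule;
}

console.log(retrySchedule(3, { type: 'exponential', delay: 2000 })); // [2000, 4000]
console.log(retrySchedule(2, { type: 'fixed', delay: 5000 }));       // [5000]

// Total time the "aggressive" preset can spend waiting between attempts.
const aggressiveSchedule = retrySchedule(10, { type: 'exponential', delay: 1000 });
const aggressiveTotalMs = aggressiveSchedule.reduce((sum, d) => sum + d, 0);
console.log(aggressiveTotalMs); // 511000
```

Under these assumptions the aggressive preset can keep a failing job alive for about eight and a half minutes of waiting alone, which is worth weighing against how quickly callers need to learn about a failure.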

Custom Retry Logic

// custom-retry.ts
import { Job, UnrecoverableError } from 'bullmq';

export class CustomRetryHandler {
  static async shouldRetry(job: Job, error: Error): Promise<boolean> {
    const attemptsMade = job.attemptsMade;
    const maxAttempts = job.opts.attempts || 3;

    // Do not retry once the maximum number of attempts is reached
    if (attemptsMade >= maxAttempts) {
      return false;
    }

    // Do not retry certain errors
    if (error.message.includes('invalid')) {
      return false;
    }

    // Retry on timeouts and network errors
    if (error.message.includes('timeout') || error.message.includes('network')) {
      return true;
    }

    // Retry by default
    return true;
  }

  // Exponential backoff: 2 s, 4 s, 8 s, ... for attempts 1, 2, 3, ...
  static getRetryDelay(attemptsMade: number): number {
    return Math.pow(2, attemptsMade) * 1000;
  }
}

// Usage in a worker (processJob is assumed to be implemented elsewhere)
const worker = new JobWorker(
  'emails',
  async (job) => {
    try {
      await processJob(job);
    } catch (error) {
      if (!(await CustomRetryHandler.shouldRetry(job, error as Error))) {
        // UnrecoverableError fails the job at once, skipping the remaining attempts
        throw new UnrecoverableError((error as Error).message);
      }
      // Rethrow so that BullMQ schedules the retry from the job's attempts and backoff
      throw error;
    }
  },
  {
    // Applies to jobs added with backoff: { type: 'custom' }
    settings: {
      backoffStrategy: (attemptsMade: number) =>
        CustomRetryHandler.getRetryDelay(attemptsMade),
    },
  }
);

Queue Monitoring

Queue Statistics

// queue-monitoring.ts
import { JobQueue } from './queue-setup';

export class QueueMonitor {
  constructor(private queue: JobQueue) {}

  async getQueueMetrics(): Promise<any> {
    const queue = this.queue.getQueue();

    // Read counters only; loading every job just to take .length is wasteful on large queues
    const [counts, paused] = await Promise.all([
      queue.getJobCounts('waiting', 'active', 'completed', 'failed', 'delayed'),
      queue.isPaused(),
    ]);

    return {
      waiting: counts.waiting,
      active: counts.active,
      completed: counts.completed,
      failed: counts.failed,
      delayed: counts.delayed,
      paused,
    };
  }

  async getJobCounts(): Promise<any> {
    return await this.queue.getQueue().getJobCounts();
  }

  // getWorkers() returns Redis client info for each connected worker, not Worker instances
  async getWorkers(): Promise<any[]> {
    const workers = await this.queue.getQueue().getWorkers();
    return workers.map(worker => ({
      id: worker.id,
      name: worker.name,
      addr: worker.addr,
      age: worker.age,
    }));
  }

  async getJobHistory(jobId: string): Promise<any> {
    const job = await this.queue.getQueue().getJob(jobId);
    if (!job) {
      return null;
    }

    return {
      id: job.id,
      name: job.name,
      data: job.data,
      progress: job.progress,
      attemptsMade: job.attemptsMade,
      failedReason: job.failedReason,
      processedOn: job.processedOn,
      finishedOn: job.finishedOn,
      stacktrace: job.stacktrace,
    };
  }
}

// Usage
const monitor = new QueueMonitor(emailQueue);
const metrics = await monitor.getQueueMetrics();
console.log('Queue metrics:', metrics);

Scaling Workers

Horizontal Scaling

// worker-scaling.ts
import { Job } from 'bullmq';
import { JobWorker } from './worker-setup';

// Note: this pool runs several workers inside one Node.js process. To scale
// across CPU cores or machines, run the worker script in multiple processes.
export class WorkerPool {
  private workers: JobWorker[] = [];

  constructor(
    private queueName: string,
    private processor: (job: Job) => Promise<void>,
    private concurrency: number = 5,
    workerCount: number = 1
  ) {
    this.addWorkers(workerCount);
  }

  // Create `count` workers that share the pool's queue, processor, and concurrency
  private addWorkers(count: number): void {
    for (let i = 0; i < count; i++) {
      const worker = new JobWorker(
        this.queueName,
        this.processor,
        { concurrency: this.concurrency }
      );
      this.workers.push(worker);
    }
  }

  async scaleUp(additionalWorkers: number): Promise<void> {
    this.addWorkers(additionalWorkers);
  }

  async scaleDown(removeWorkers: number): Promise<void> {
    const workersToRemove = this.workers.splice(0, removeWorkers);
    for (const worker of workersToRemove) {
      await worker.close();
    }
  }

  async closeAll(): Promise<void> {
    await Promise.all(this.workers.map(worker => worker.close()));
    this.workers = [];
  }
}

// Usage
const workerPool = new WorkerPool('emails', emailProcessor, 5, 3);

// Scale up during peak hours
await workerPool.scaleUp(2);

// Scale down during off-peak hours
await workerPool.scaleDown(1);
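How many workers to scale up to is left open above. One rough way to estimate it, shown here as a back-of-the-envelope heuristic rather than anything BullMQ provides, is to divide the backlog by what a single worker can finish within a target drain time. The function name and all parameters are assumptions for illustration.

```typescript
// Rough sizing heuristic (an assumption, not a BullMQ feature).
// backlog:        number of jobs waiting
// avgJobMs:       average processing time of one job
// concurrency:    jobs one worker handles at a time
// targetDrainMs:  how quickly the backlog should be cleared
function workersNeeded(
  backlog: number,
  avgJobMs: number,
  concurrency: number,
  targetDrainMs: number
): number {
  // Jobs a single worker can finish within the target window.
  const jobsPerWorker = Math.floor((targetDrainMs / avgJobMs) * concurrency);
  if (jobsPerWorker <= 0) {
    throw new Error('Target drain time is too short for even one job');
  }
  return Math.max(1, Math.ceil(backlog / jobsPerWorker));
}

// 10,000 waiting jobs, 1 s each, concurrency 5, drained within 5 minutes:
// one worker finishes 300 * 5 = 1500 jobs, so 7 workers are needed.
const neededWorkers = workersNeeded(10000, 1000, 5, 5 * 60 * 1000);
console.log(neededWorkers); // 7
```

The estimate ignores new jobs arriving during the drain and assumes Redis and downstream services are not the bottleneck, so treat it as a starting point for load testing.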

Production Patterns

Graceful Shutdown

// graceful-shutdown.ts
import { JobWorker } from './worker-setup';

export class GracefulWorker extends JobWorker {
  private isShuttingDown = false;

  constructor(
    queueName: string,
    processor: (job: any) => Promise<void>,
    options?: any
  ) {
    super(queueName, processor, options);
    this.setupShutdownHandlers();
  }

  private setupShutdownHandlers(): void {
    process.on('SIGTERM', () => this.shutdown('SIGTERM'));
    process.on('SIGINT', () => this.shutdown('SIGINT'));
  }

  private async shutdown(signal: string): Promise<void> {
    // Ignore repeated signals while a shutdown is already in progress
    if (this.isShuttingDown) {
      return;
    }
    this.isShuttingDown = true;
    console.log(`Received ${signal}, shutting down gracefully...`);

    // close() stops the worker from taking new jobs and waits for active jobs to finish
    await this.close();

    console.log('Worker shut down gracefully');
    process.exit(0);
  }
}

// Usage
const worker = new GracefulWorker('emails', emailProcessor);

Health Checks

// health-check.ts
import { JobQueue } from './queue-setup';

export class HealthChecker {
  constructor(private queue: JobQueue) {}

  async checkHealth(): Promise<{
    healthy: boolean;
    metrics: any;
    error?: string;
  }> {
    try {
      const metrics = await this.queue.getQueueStats();

      // Decide whether the queue is healthy
      const healthy = (
        metrics.active < 100 &&  // not too many active jobs
        metrics.failed < 1000    // not too many failed jobs
      );

      return { healthy, metrics };
    } catch (error) {
      // Failing to reach Redis counts as unhealthy
      return {
        healthy: false,
        metrics: null,
        error: (error as Error).message,
      };
    }
  }
}

// Usage with Express
import express from 'express';

const app = express();
const healthChecker = new HealthChecker(emailQueue);

app.get('/health', async (req, res) => {
  const health = await healthChecker.checkHealth();
  res.status(health.healthy ? 200 : 503).json(health);
});
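The threshold check can be pulled out of the class so that limits are configurable and the reason for an unhealthy result is visible. A minimal sketch; QueueCounts, HealthThresholds, and evaluateHealth are hypothetical names, and the limits are examples rather than recommendations.

```typescript
interface QueueCounts {
  waiting: number;
  active: number;
  failed: number;
}

interface HealthThresholds {
  maxWaiting: number;
  maxActive: number;
  maxFailed: number;
}

// Compares each counter with its limit and reports every limit that is exceeded.
function evaluateHealth(
  counts: QueueCounts,
  thresholds: HealthThresholds
): { healthy: boolean; reasons: string[] } {
  const reasons: string[] = [];
  if (counts.waiting >= thresholds.maxWaiting) {
    reasons.push('waiting=' + counts.waiting + ' (limit ' + thresholds.maxWaiting + ')');
  }
  if (counts.active >= thresholds.maxActive) {
    reasons.push('active=' + counts.active + ' (limit ' + thresholds.maxActive + ')');
  }
  if (counts.failed >= thresholds.maxFailed) {
    reasons.push('failed=' + counts.failed + ' (limit ' + thresholds.maxFailed + ')');
  }
  return { healthy: reasons.length === 0, reasons };
}

const exampleThresholds: HealthThresholds = { maxWaiting: 5000, maxActive: 100, maxFailed: 1000 };

const healthyResult = evaluateHealth({ waiting: 12, active: 3, failed: 40 }, exampleThresholds);
const unhealthyResult = evaluateHealth({ waiting: 12, active: 150, failed: 2000 }, exampleThresholds);

console.log(healthyResult.healthy);   // true
console.log(unhealthyResult.reasons); // ['active=150 (limit 100)', 'failed=2000 (limit 1000)']
```

Returning the reasons alongside the boolean makes a 503 from the health endpoint easier to act on.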

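Additional Resources

Best Practices

Queue Configuration

  • Use separate queues for different job types: each can then be scaled and prioritized independently
  • Set appropriate removeOnComplete and removeOnFail values: keep enough history for debugging while avoiding unbounded Redis memory growth
  • Configure attempts and backoff appropriately: balance reliability against total processing time
  • Tune worker concurrency and rate limiting for fair distribution: prevents one worker from taking on more jobs than it can handle (BullMQ has no prefetch setting)
  • Enforce reasonable timeouts inside processors: prevents stuck jobs from blocking workers (BullMQ, unlike Bull, has no per-job timeout option)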

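Worker Configuration

  • Set an appropriate concurrency: too high can overwhelm resources, too low wastes capacity
  • Implement graceful shutdown: handle SIGTERM/SIGINT so that in-flight jobs can finish
  • Monitor worker health: track active jobs, failed jobs, and queue depth
  • Run workers in separate processes: isolates failures and improves reliability
  • Implement proper error handling: distinguish retryable from non-retryable errors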

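Job Design

  • Keep jobs idempotent: a job should be safe to retry without duplicating its side effects
  • Use job dependencies: enforce ordering where it is required
  • Implement progress tracking: gives visibility into long-running jobs
  • Add job metadata: include context for debugging and monitoring
  • Design for eventual consistency: jobs may not run immediately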
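One common way to support idempotency is a deterministic job id: BullMQ does not add a second job when a job with the same jobId already exists in the queue, so deriving the id from the job's identifying fields avoids duplicate submissions. The sketch below only builds such an id; fnv1aHex and idempotentJobId are hypothetical helpers, and FNV-1a is a small non-cryptographic hash chosen here for brevity.

```typescript
// 32-bit FNV-1a hash over the string's UTF-16 code units, returned as 8 hex digits.
// Non-cryptographic: fine for deduplication keys, not for security purposes.
function fnv1aHex(input: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return ('0000000' + (hash >>> 0).toString(16)).slice(-8);
}

// Builds the same id for the same job name and key parts, every time.
// The id avoids ':' because BullMQ uses that character in its Redis keys.
function idempotentJobId(jobName: string, keyParts: string[]): string {
  return jobName + '-' + fnv1aHex(keyParts.join('|'));
}

const firstId = idempotentJobId('send-email', ['user1@example.com', 'welcome']);
const repeatedId = idempotentJobId('send-email', ['user1@example.com', 'welcome']);
const otherId = idempotentJobId('send-email', ['user2@example.com', 'welcome']);

console.log(firstId === repeatedId); // true
console.log(firstId === otherId);    // false
```

The id would then be passed as the jobId option when adding the job. A deterministic id only prevents duplicate enqueues; the processor itself still has to tolerate being run again after a retry.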

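Production Considerations

  • Use Redis Cluster for high availability: distributes load across multiple nodes
  • Enable persistence: configure Redis AOF or RDB so that queued jobs survive a restart
  • Monitor Redis metrics: track memory usage, connections, and command statistics
  • Implement rate limiting: prevents a flood of jobs from overwhelming workers
  • Use job priorities: ensures critical jobs are processed first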

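Security

  • Use Redis AUTH: protect the Redis instance with password authentication
  • Enable TLS: encrypt connections between workers and Redis
  • Reuse connections: limit the number of connections opened to Redis
  • Sanitize job data: prevents injection attacks through job payloads
  • Enforce job size limits: prevents large jobs from exhausting memory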
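A job size limit can be enforced before a job is ever added to the queue. The sketch below measures the JSON-serialized payload in UTF-8 bytes; payloadBytes and assertPayloadSize are hypothetical helpers, and the 64 KiB default is an arbitrary example rather than a BullMQ limit.

```typescript
// Size of the JSON-serialized payload in UTF-8 bytes.
function payloadBytes(data: unknown): number {
  return new TextEncoder().encode(JSON.stringify(data)).length;
}

// Throws when the payload exceeds maxBytes (default 64 KiB, an arbitrary example value).
function assertPayloadSize(data: unknown, maxBytes: number = 64 * 1024): void {
  const size = payloadBytes(data);
  if (size > maxBytes) {
    throw new Error('Job payload is ' + size + ' bytes, limit is ' + maxBytes);
  }
}

console.log(payloadBytes({ a: 1 })); // 7, the length of {"a":1}

// Passes: a small email job is well under the limit.
assertPayloadSize({ to: 'user@example.com', subject: 'Welcome' });

// Fails: a 100-character body against a 50-byte limit.
let oversizedRejected = false;
try {
  assertPayloadSize({ body: new Array(101).join('x') }, 50);
} catch (e) {
  oversizedRejected = true;
}
console.log(oversizedRejected); // true
```

Measuring bytes rather than string length matters for non-ASCII content, where one character can take several bytes. Large attachments are usually better stored elsewhere, with only a reference placed in the job data.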

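Checklist

Setup and Configuration

  • [ ] Configure the Redis connection with appropriate settings
  • [ ] Set up separate queues for different job types
  • [ ] Configure default job options (attempts, backoff, timeout)
  • [ ] Set up Redis persistence (AOF/RDB)
  • [ ] Configure Redis authentication and TLS

Queue Management

  • [ ] Implement queue monitoring and metrics collection
  • [ ] Set up alerts on queue depth and failure rate
  • [ ] Configure job cleanup policies (removeOnComplete/removeOnFail)
  • [ ] Implement procedures for pausing and draining queues
  • [ ] Set up queue backup and restore procedures

Worker Setup

  • [ ] Configure appropriate concurrency for each worker
  • [ ] Implement graceful shutdown handling
  • [ ] Set up worker health checks
  • [ ] Configure worker logging and error tracking
  • [ ] Implement a worker restart policy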

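Job Processing

  • [ ] Implement idempotent job processors
  • [ ] Add progress tracking for long-running jobs
  • [ ] Configure an appropriate retry policy
  • [ ] Implement a dead-letter queue (DLQ) for failed jobs
  • [ ] Add job metadata for debugging

Monitoring and Alerting

  • [ ] Set up a queue metrics dashboard
  • [ ] Configure alerts on queue depth thresholds
  • [ ] Monitor worker health and restart failed workers
  • [ ] Track job completion rate and latency
  • [ ] Set up log aggregation and search

Security and Compliance

  • [ ] Enable Redis authentication
  • [ ] Configure TLS for Redis connections
  • [ ] Implement job data sanitization
  • [ ] Set up access control for queue operations
  • [ ] Audit queue access and job modifications

Testing and Validation

  • [ ] Test job processing under load
  • [ ] Verify retry and backoff behavior
  • [ ] Test graceful shutdown scenarios
  • [ ] Verify error handling and DLQ routing
  • [ ] Test Redis failover scenarios

Documentation

  • [ ] Document queue naming conventions
  • [ ] Document job schemas and data structures
  • [ ] Create runbooks for common issues
  • [ ] Document scaling procedures
  • [ ] Maintain API documentation for queue operations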