背景作业处理Skill background-jobs

这个技能涉及异步任务处理、任务队列管理、工作者池、重试策略和监控,用于实现可靠的后台作业执行,支持各种框架和语言如Bull、Celery、Sidekiq,适用于ETL管道、数据处理、ML训练作业等场景。关键词:异步处理、任务队列、调度、工作者管理、重试策略、作业监控、死信队列、ML训练。

后端开发 0 次安装 0 次浏览 更新于 3/24/2026

name: background-jobs description: 背景作业处理模式,包括任务队列、定时作业、工作者池和重试策略。在实现异步处理、任务队列、工作者、任务队列、异步任务、延迟作业、重复作业、定时任务、ETL管道、数据处理、ML训练作业、Celery、Bull、Sidekiq、Resque、cron作业、重试逻辑、死信队列、DLQ、至少一次交付、精确一次交付、作业监控或工作者管理时使用。

背景作业

概述

背景作业允许在请求-响应周期外异步处理任务。此技能涵盖任务队列模式、调度、工作者管理、重试策略和监控,以实现跨不同框架和语言的可靠任务执行。

关键概念

任务队列模式

Bull队列 (Node.js/Redis):

import Queue, { Job, JobOptions } from "bull";
import { Redis } from "ioredis";

// 队列配置
interface QueueConfig {
  name: string;
  redis: Redis;
  defaultJobOptions?: JobOptions;
}

// 作业数据接口
interface EmailJobData {
  to: string;
  subject: string;
  template: string;
  context: Record<string, unknown>;
}

interface ImageProcessingJobData {
  imageId: string;
  operations: Array<{
    type: "resize" | "crop" | "compress";
    params: Record<string, unknown>;
  }>;
}

// 队列工厂
function createQueue<T>(config: QueueConfig): Queue.Queue<T> {
  const queue = new Queue<T>(config.name, {
    createClient: (type) => {
      switch (type) {
        case "client":
          return config.redis.duplicate();
        case "subscriber":
          return config.redis.duplicate();
        case "bclient":
          return config.redis.duplicate();
        default:
          return config.redis.duplicate();
      }
    },
    defaultJobOptions: {
      removeOnComplete: 100, // 保留最后100个已完成作业
      removeOnFail: 1000, // 保留最后1000个失败作业
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 2000,
      },
      ...config.defaultJobOptions,
    },
  });

  // 全局错误处理程序
  queue.on("error", (error) => {
    console.error(`队列 ${config.name} 错误:`, error);
  });

  return queue;
}

// 带类型处理器的邮件队列
const emailQueue = createQueue<EmailJobData>({
  name: "email",
  redis: new Redis(process.env.REDIS_URL),
});

// 定义处理器
emailQueue.process(async (job: Job<EmailJobData>) => {
  const { to, subject, template, context } = job.data;

  // 更新进度
  await job.progress(10);

  // 渲染模板
  const html = await renderTemplate(template, context);
  await job.progress(50);

  // 发送邮件
  await emailService.send({ to, subject, html });
  await job.progress(100);

  return { sent: true, messageId: `msg_${Date.now()}` };
});

// 添加作业选项
async function sendEmail(
  data: EmailJobData,
  options?: JobOptions,
): Promise<Job<EmailJobData>> {
  return emailQueue.add(data, {
    priority: options?.priority || 0,
    delay: options?.delay || 0,
    jobId: options?.jobId, // 用于去重
    ...options,
  });
}

// 批量作业添加
async function sendBulkEmails(
  emails: EmailJobData[],
): Promise<Job<EmailJobData>[]> {
  const jobs = emails.map((data, index) => ({
    data,
    opts: {
      jobId: `bulk_${Date.now()}_${index}`,
    },
  }));

  return emailQueue.addBulk(jobs);
}

Celery (Python):

from celery import Celery, Task
from celery.exceptions import MaxRetriesExceededError
from typing import Any, Dict, Optional
import logging

# Celery 配置
app = Celery('tasks')
app.config_from_object({
    'broker_url': 'redis://localhost:6379/0',
    'result_backend': 'redis://localhost:6379/1',
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json'],
    'timezone': 'UTC',
    'task_track_started': True,
    'task_time_limit': 300,  # 5分钟硬限制
    'task_soft_time_limit': 240,  # 4分钟软限制
    'worker_prefetch_multiplier': 4,
    'task_acks_late': True,  # 任务完成后确认
    'task_reject_on_worker_lost': True,
})

logger = logging.getLogger(__name__)

# 带重试逻辑的基础任务
class BaseTask(Task):
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3}
    retry_backoff = True
    retry_backoff_max = 600  # 10分钟最大
    retry_jitter = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f'任务 {self.name}[{task_id}] 失败: {exc}')

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning(f'任务 {self.name}[{task_id}] 重试中: {exc}')

    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f'任务 {self.name}[{task_id}] 成功')

# 邮件任务
@app.task(base=BaseTask, bind=True, name='send_email')
def send_email(
    self,
    to: str,
    subject: str,
    template: str,
    context: Dict[str, Any]
) -> Dict[str, Any]:
    try:
        # 更新状态
        self.update_state(state='PROGRESS', meta={'progress': 10})

        # 渲染模板
        html = render_template(template, context)
        self.update_state(state='PROGRESS', meta={'progress': 50})

        # 发送邮件
        message_id = email_service.send(to=to, subject=subject, html=html)
        self.update_state(state='PROGRESS', meta={'progress': 100})

        return {'sent': True, 'message_id': message_id}

    except ConnectionError as exc:
        raise self.retry(exc=exc, countdown=60)

# 带链的图像处理
@app.task(base=BaseTask, bind=True, name='process_image')
def process_image(self, image_id: str, operations: list) -> Dict[str, Any]:
    image = load_image(image_id)

    for i, op in enumerate(operations):
        progress = int((i + 1) / len(operations) * 100)
        self.update_state(state='PROGRESS', meta={'progress': progress, 'operation': op['type']})

        if op['type'] == 'resize':
            image = resize_image(image, **op['params'])
        elif op['type'] == 'crop':
            image = crop_image(image, **op['params'])
        elif op['type'] == 'compress':
            image = compress_image(image, **op['params'])

    url = save_image(image, image_id)
    return {'url': url, 'operations_count': len(operations)}

# 任务链示例
from celery import chain, group, chord

def process_order(order_id: str):
    """处理带链任务的订单。"""
    workflow = chain(
        validate_order.s(order_id),
        reserve_inventory.s(),
        process_payment.s(),
        send_confirmation.s(),
    )
    return workflow.apply_async()

def process_bulk_images(image_ids: list):
    """并行处理多个图像,然后聚合结果。"""
    workflow = chord(
        group(process_image.s(img_id, [{'type': 'resize', 'params': {'width': 800}}])
              for img_id in image_ids),
        aggregate_results.s()
    )
    return workflow.apply_async()

Sidekiq (Ruby):

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.redis = { url: ENV['REDIS_URL'], network_timeout: 5 }
  config.death_handlers << ->(job, ex) do
    # 处理作业失败
    ErrorReporter.report(ex, job: job)
  end
end

Sidekiq.configure_client do |config|
  config.redis = { url: ENV['REDIS_URL'], network_timeout: 5 }
end

# app/workers/email_worker.rb
class EmailWorker
  include Sidekiq::Worker

  sidekiq_options queue: :default,
                  retry: 5,
                  backtrace: true,
                  dead: true

  sidekiq_retry_in do |count, exception|
    # 指数退避:1, 8, 27, 64, 125秒
    (count + 1) ** 3
  end

  sidekiq_retries_exhausted do |msg, exception|
    Rails.logger.error "作业 #{msg['jid']} 重试耗尽: #{exception.message}"
    DeadJobNotifier.notify(msg, exception)
  end

  def perform(to, subject, template, context)
    html = ApplicationController.render(
      template: template,
      locals: context.symbolize_keys
    )

    EmailService.send(to: to, subject: subject, html: html)
  end
end

# app/workers/batch_worker.rb
class BatchWorker
  include Sidekiq::Worker

  def perform(batch_id)
    batch = Batch.find(batch_id)

    batch.items.find_each do |item|
      ItemProcessor.perform_async(item.id)
    end
  end
end

# 使用Sidekiq批次(专业功能)
class ImportWorker
  include Sidekiq::Worker

  def perform(import_id)
    import = Import.find(import_id)

    batch = Sidekiq::Batch.new
    batch.description = "导入 #{import_id}"
    batch.on(:complete, ImportCallbacks, import_id: import_id)

    batch.jobs do
      import.rows.each_with_index do |row, index|
        ImportRowWorker.perform_async(import_id, index, row)
      end
    end
  end
end

class ImportCallbacks
  def on_complete(status, options)
    import = Import.find(options['import_id'])

    if status.failures.zero?
      import.update!(status: 'completed')
    else
      import.update!(status: 'completed_with_errors', error_count: status.failures)
    end
  end
end

定时作业和Cron模式

// Bull调度器
import Queue from "bull";

const scheduledQueue = new Queue("scheduled-tasks", process.env.REDIS_URL);

// 可重复作业
async function setupScheduledJobs(): Promise<void> {
  // 每小时清理
  await scheduledQueue.add(
    "cleanup",
    {},
    {
      repeat: { cron: "0 * * * *" }, // 每小时
      jobId: "cleanup-hourly",
    },
  );

  // 每天9点日报
  await scheduledQueue.add(
    "daily-report",
    {},
    {
      repeat: { cron: "0 9 * * *" },
      jobId: "daily-report",
    },
  );

  // 每5分钟
  await scheduledQueue.add(
    "health-check",
    {},
    {
      repeat: { every: 5 * 60 * 1000 }, // 5分钟毫秒
      jobId: "health-check",
    },
  );

  // 每周日午夜
  await scheduledQueue.add(
    "weekly-cleanup",
    {},
    {
      repeat: { cron: "0 0 * * 0" },
      jobId: "weekly-cleanup",
    },
  );
}

// 处理定时作业
scheduledQueue.process("cleanup", async (job) => {
  await cleanupOldRecords();
  return { cleaned: true };
});

scheduledQueue.process("daily-report", async (job) => {
  const report = await generateDailyReport();
  await sendReportEmail(report);
  return { reportId: report.id };
});

// 列出定时作业
async function getScheduledJobs(): Promise<
  Array<{ name: string; next: Date; cron: string }>
> {
  const repeatableJobs = await scheduledQueue.getRepeatableJobs();

  return repeatableJobs.map((job) => ({
    name: job.name,
    next: new Date(job.next),
    cron: job.cron || `每 ${job.every}毫秒`,
  }));
}

// 移除定时作业
async function removeScheduledJob(jobId: string): Promise<void> {
  const jobs = await scheduledQueue.getRepeatableJobs();
  const job = jobs.find((j) => j.id === jobId);

  if (job) {
    await scheduledQueue.removeRepeatableByKey(job.key);
  }
}
# Celery Beat调度器
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks')

app.conf.beat_schedule = {
    # 每小时
    'cleanup-hourly': {
        'task': 'tasks.cleanup',
        'schedule': crontab(minute=0),  # 每小时整点
    },

    # 每天9点
    'daily-report': {
        'task': 'tasks.daily_report',
        'schedule': crontab(hour=9, minute=0),
    },

    # 每5分钟
    'health-check': {
        'task': 'tasks.health_check',
        'schedule': 300.0,  # 5分钟秒数
    },

    # 每周日午夜
    'weekly-cleanup': {
        'task': 'tasks.weekly_cleanup',
        'schedule': crontab(hour=0, minute=0, day_of_week=0),
    },

    # 每月第一天6点
    'monthly-report': {
        'task': 'tasks.monthly_report',
        'schedule': crontab(hour=6, minute=0, day_of_month=1),
    },

    # 带参数
    'check-expiring-subscriptions': {
        'task': 'tasks.check_subscriptions',
        'schedule': crontab(hour=8, minute=0),
        'args': ('expiring',),
        'kwargs': {'days_ahead': 7},
    },
}

# 动态数据库调度
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json

def create_scheduled_task(name: str, task: str, cron: str, args: list = None, kwargs: dict = None):
    """动态创建定时任务。"""
    # 解析cron表达式
    minute, hour, day_of_month, month, day_of_week = cron.split()

    schedule, _ = CrontabSchedule.objects.get_or_create(
        minute=minute,
        hour=hour,
        day_of_month=day_of_month,
        month_of_year=month,
        day_of_week=day_of_week,
    )

    PeriodicTask.objects.update_or_create(
        name=name,
        defaults={
            'task': task,
            'crontab': schedule,
            'args': json.dumps(args or []),
            'kwargs': json.dumps(kwargs or {}),
            'enabled': True,
        },
    )

工作者池管理

import Queue, { Job } from "bull";
import os from "os";

interface WorkerPoolConfig {
  concurrency: number;
  limiter?: {
    max: number;
    duration: number;
  };
}

class WorkerPool {
  private queues: Map<string, Queue.Queue> = new Map();
  private isShuttingDown = false;

  constructor(private config: WorkerPoolConfig) {
    // 优雅关闭
    process.on("SIGTERM", () => this.shutdown());
    process.on("SIGINT", () => this.shutdown());
  }

  registerQueue<T>(
    name: string,
    processor: (job: Job<T>) => Promise<unknown>,
  ): Queue.Queue<T> {
    const queue = new Queue<T>(name, process.env.REDIS_URL!, {
      limiter: this.config.limiter,
    });

    // 带并发处理
    queue.process(this.config.concurrency, async (job: Job<T>) => {
      if (this.isShuttingDown) {
        throw new Error("工作者关闭中");
      }
      return processor(job);
    });

    // 事件处理程序
    queue.on("completed", (job, result) => {
      console.log(`作业 ${job.id} 完成:`, result);
    });

    queue.on("failed", (job, err) => {
      console.error(`作业 ${job?.id} 失败:`, err);
    });

    queue.on("stalled", (job) => {
      console.warn(`作业 ${job} 停滞`);
    });

    this.queues.set(name, queue);
    return queue;
  }

  async shutdown(): Promise<void> {
    console.log("启动优雅关闭...");
    this.isShuttingDown = true;

    // 停止接受新作业
    const closePromises = Array.from(this.queues.values()).map(
      async (queue) => {
        await queue.pause(true); // 暂停并等待活动作业
        await queue.close();
      },
    );

    await Promise.all(closePromises);
    console.log("所有队列关闭");
    process.exit(0);
  }

  async getStats(): Promise<Record<string, QueueStats>> {
    const stats: Record<string, QueueStats> = {};

    for (const [name, queue] of this.queues) {
      const [waiting, active, completed, failed, delayed] = await Promise.all([
        queue.getWaitingCount(),
        queue.getActiveCount(),
        queue.getCompletedCount(),
        queue.getFailedCount(),
        queue.getDelayedCount(),
      ]);

      stats[name] = { waiting, active, completed, failed, delayed };
    }

    return stats;
  }
}

interface QueueStats {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}

// 使用
const pool = new WorkerPool({
  concurrency: os.cpus().length,
  limiter: {
    max: 100, // 每秒最多100个作业
    duration: 1000,
  },
});

pool.registerQueue<EmailJobData>("email", async (job) => {
  await sendEmail(job.data);
});

pool.registerQueue<ImageProcessingJobData>("images", async (job) => {
  await processImage(job.data);
});
# Celery工作者管理
from celery import Celery
from celery.signals import worker_process_init, worker_shutdown
import multiprocessing

app = Celery('tasks')

# 工作者配置
app.conf.update(
    worker_concurrency=multiprocessing.cpu_count(),
    worker_prefetch_multiplier=2,  # 每个工作者预取2个任务
    worker_max_tasks_per_child=1000,  # 每1000个任务重启工作者
    worker_max_memory_per_child=200000,  # 200MB限制
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

# 每个工作者初始化
@worker_process_init.connect
def init_worker(**kwargs):
    """为每个工作者进程初始化资源。"""
    # 初始化数据库连接池
    db.connect()
    # 预热缓存
    cache.warm_up()

@worker_shutdown.connect
def cleanup_worker(**kwargs):
    """工作者关闭时清理资源。"""
    db.close()
    cache.flush()

# 任务路由专用工作者
app.conf.task_routes = {
    'tasks.send_email': {'queue': 'email'},
    'tasks.process_image': {'queue': 'images'},
    'tasks.heavy_computation': {'queue': 'compute'},
    'tasks.*': {'queue': 'default'},
}

# 队列特定工作者命令:
# celery -A tasks worker -Q email --concurrency=4
# celery -A tasks worker -Q images --concurrency=2
# celery -A tasks worker -Q compute --concurrency=1

# Celery自动缩放
app.conf.worker_autoscaler = 'celery.worker.autoscale:Autoscaler'
app.conf.worker_autoscale_max = 10
app.conf.worker_autoscale_min = 2

作业优先级和公平性

// Bull优先级队列
interface PriorityJobData {
  type: string;
  payload: unknown;
  priority: "critical" | "high" | "normal" | "low";
}

const priorityMap = {
  critical: 1, // 最高优先级(先处理)
  high: 5,
  normal: 10,
  low: 20,
};

async function addPriorityJob(
  data: PriorityJobData,
): Promise<Job<PriorityJobData>> {
  return queue.add(data, {
    priority: priorityMap[data.priority],
    // 关键作业不等待
    delay: data.priority === "critical" ? 0 : undefined,
  });
}

// 多队列公平调度
class FairScheduler {
  private queues: Map<string, Queue.Queue> = new Map();
  private weights: Map<string, number> = new Map();

  constructor(queueConfigs: Array<{ name: string; weight: number }>) {
    for (const config of queueConfigs) {
      const queue = new Queue(config.name, process.env.REDIS_URL!);
      this.queues.set(config.name, queue);
      this.weights.set(config.name, config.weight);
    }
  }

  // 加权轮询处理
  async process(
    handler: (queueName: string, job: Job) => Promise<void>,
  ): Promise<void> {
    const totalWeight = Array.from(this.weights.values()).reduce(
      (a, b) => a + b,
      0,
    );

    for (const [name, queue] of this.queues) {
      const weight = this.weights.get(name)!;
      const concurrency = Math.max(1, Math.floor((weight / totalWeight) * 10));

      queue.process(concurrency, async (job) => {
        await handler(name, job);
      });
    }
  }
}

// 使用:优先处理高级客户
const scheduler = new FairScheduler([
  { name: "premium", weight: 5 }, // 50%容量
  { name: "standard", weight: 3 }, // 30%容量
  { name: "free", weight: 2 }, // 20%容量
]);

await scheduler.process(async (queueName, job) => {
  console.log(`处理 ${queueName} 作业:`, job.id);
  await processJob(job);
});

幂等性和重试策略

import Queue, { Job, JobOptions } from "bull";
import { createHash } from "crypto";

// 幂等键生成
function generateIdempotencyKey(data: unknown): string {
  const hash = createHash("sha256");
  hash.update(JSON.stringify(data));
  return hash.digest("hex");
}

// 幂等作业处理器
class IdempotentProcessor<T> {
  private processedKeys: Set<string> = new Set();
  private redis: Redis;

  constructor(
    private queue: Queue.Queue<T>,
    redis: Redis,
  ) {
    this.redis = redis;
  }

  async process(handler: (job: Job<T>) => Promise<unknown>): Promise<void> {
    this.queue.process(async (job: Job<T>) => {
      const idempotencyKey = job.opts.jobId || generateIdempotencyKey(job.data);

      // 检查是否已处理
      const existing = await this.redis.get(`processed:${idempotencyKey}`);
      if (existing) {
        console.log(`作业 ${job.id} 已处理,跳过`);
        return JSON.parse(existing);
      }

      // 处理作业
      const result = await handler(job);

      // 标记为已处理,带TTL
      await this.redis.setex(
        `processed:${idempotencyKey}`,
        86400, // 24小时
        JSON.stringify(result),
      );

      return result;
    });
  }
}

// 自定义重试策略
interface RetryStrategy {
  type: "exponential" | "linear" | "fixed" | "custom";
  baseDelay: number;
  maxDelay?: number;
  maxRetries: number;
  jitter?: boolean;
  retryOn?: (error: Error) => boolean;
}

function calculateDelay(strategy: RetryStrategy, attempt: number): number {
  let delay: number;

  switch (strategy.type) {
    case "exponential":
      delay = strategy.baseDelay * Math.pow(2, attempt - 1);
      break;
    case "linear":
      delay = strategy.baseDelay * attempt;
      break;
    case "fixed":
      delay = strategy.baseDelay;
      break;
    default:
      delay = strategy.baseDelay;
  }

  // 应用最大延迟上限
  if (strategy.maxDelay) {
    delay = Math.min(delay, strategy.maxDelay);
  }

  // 添加抖动(最多20%变化)
  if (strategy.jitter) {
    const jitterFactor = 0.8 + Math.random() * 0.4; // 0.8到1.2
    delay = Math.floor(delay * jitterFactor);
  }

  return delay;
}

// 带死信队列重试
class RetryableQueue<T> {
  private mainQueue: Queue.Queue<T>;
  private dlq: Queue.Queue<T>;
  private strategy: RetryStrategy;

  constructor(name: string, strategy: RetryStrategy) {
    this.mainQueue = new Queue<T>(name, process.env.REDIS_URL!);
    this.dlq = new Queue<T>(`${name}-dlq`, process.env.REDIS_URL!);
    this.strategy = strategy;
  }

  async process(handler: (job: Job<T>) => Promise<unknown>): Promise<void> {
    this.mainQueue.process(async (job: Job<T>) => {
      const attempts = job.attemptsMade;

      try {
        return await handler(job);
      } catch (error) {
        const err = error as Error;

        // 检查错误是否可重试
        if (this.strategy.retryOn && !this.strategy.retryOn(err)) {
          await this.moveToDLQ(job, err);
          throw err;
        }

        // 检查最大重试次数
        if (attempts >= this.strategy.maxRetries) {
          await this.moveToDLQ(job, err);
          throw err;
        }

        // 带计算延迟重试
        const delay = calculateDelay(this.strategy, attempts + 1);
        throw new Error(`${delay}毫秒后重试: ${err.message}`);
      }
    });
  }

  private async moveToDLQ(job: Job<T>, error: Error): Promise<void> {
    await this.dlq.add({
      originalJob: job.data,
      error: error.message,
      failedAt: new Date().toISOString(),
      attempts: job.attemptsMade,
    } as unknown as T);
  }

  async retryFromDLQ(jobId: string): Promise<void> {
    const job = await this.dlq.getJob(jobId);
    if (!job) return;

    const dlqData = job.data as unknown as { originalJob: T };
    await this.mainQueue.add(dlqData.originalJob);
    await job.remove();
  }
}

作业监控和死作业

import Queue, { Job, JobCounts, JobStatus } from "bull";
import { EventEmitter } from "events";

interface JobMetrics {
  queue: string;
  counts: JobCounts;
  latency: {
    avg: number;
    p50: number;
    p95: number;
    p99: number;
  };
  throughput: number; // 每分钟作业数
  errorRate: number;
}

class JobMonitor extends EventEmitter {
  private queues: Queue.Queue[] = [];
  private metricsHistory: Map<string, number[]> = new Map();

  addQueue(queue: Queue.Queue): void {
    this.queues.push(queue);

    queue.on("completed", (job, result) => {
      this.recordMetric(queue.name, "completed", job);
      this.emit("job:completed", { queue: queue.name, job, result });
    });

    queue.on("failed", (job, err) => {
      this.recordMetric(queue.name, "failed", job!);
      this.emit("job:failed", { queue: queue.name, job, error: err });

      // 高失败率告警
      this.checkErrorRate(queue.name);
    });

    queue.on("stalled", (job) => {
      this.emit("job:stalled", { queue: queue.name, jobId: job });
    });
  }

  private recordMetric(queueName: string, type: string, job: Job): void {
    const duration = Date.now() - job.timestamp;
    const key = `${queueName}:${type}:duration`;

    const history = this.metricsHistory.get(key) || [];
    history.push(duration);

    // 保留最近1000个样本
    if (history.length > 1000) {
      history.shift();
    }

    this.metricsHistory.set(key, history);
  }

  private checkErrorRate(queueName: string): void {
    const completed =
      this.metricsHistory.get(`${queueName}:completed:duration`)?.length || 0;
    const failed =
      this.metricsHistory.get(`${queueName}:failed:duration`)?.length || 0;

    if (completed + failed > 10) {
      const errorRate = failed / (completed + failed);
      if (errorRate > 0.1) {
        // > 10%失败率
        this.emit("alert:high_error_rate", { queue: queueName, errorRate });
      }
    }
  }

  async getMetrics(queueName: string): Promise<JobMetrics> {
    const queue = this.queues.find((q) => q.name === queueName);
    if (!queue) throw new Error(`队列 ${queueName} 未找到`);

    const counts = await queue.getJobCounts();
    const durations =
      this.metricsHistory.get(`${queueName}:completed:duration`) || [];

    return {
      queue: queueName,
      counts,
      latency: this.calculateLatencyPercentiles(durations),
      throughput: this.calculateThroughput(durations),
      errorRate: this.calculateErrorRate(queueName),
    };
  }

  private calculateLatencyPercentiles(
    durations: number[],
  ): JobMetrics["latency"] {
    if (durations.length === 0) {
      return { avg: 0, p50: 0, p95: 0, p99: 0 };
    }

    const sorted = [...durations].sort((a, b) => a - b);
    const avg = sorted.reduce((a, b) => a + b, 0) / sorted.length;

    return {
      avg: Math.round(avg),
      p50: sorted[Math.floor(sorted.length * 0.5)],
      p95: sorted[Math.floor(sorted.length * 0.95)],
      p99: sorted[Math.floor(sorted.length * 0.99)],
    };
  }

  private calculateThroughput(durations: number[]): number {
    // 最近一分钟完成的作业
    const oneMinuteAgo = Date.now() - 60000;
    const recentJobs = durations.filter((_, i) => i > durations.length - 100);
    return recentJobs.length;
  }

  private calculateErrorRate(queueName: string): number {
    const completed =
      this.metricsHistory.get(`${queueName}:completed:duration`)?.length || 0;
    const failed =
      this.metricsHistory.get(`${queueName}:failed:duration`)?.length || 0;
    const total = completed + failed;
    return total > 0 ? failed / total : 0;
  }

  // 死作业管理
  async getDeadJobs(queueName: string, limit: number = 100): Promise<Job[]> {
    const queue = this.queues.find((q) => q.name === queueName);
    if (!queue) throw new Error(`队列 ${queueName} 未找到`);

    return queue.getFailed(0, limit);
  }

  async retryDeadJob(queueName: string, jobId: string): Promise<void> {
    const queue = this.queues.find((q) => q.name === queueName);
    if (!queue) throw new Error(`队列 ${queueName} 未找到`);

    const job = await queue.getJob(jobId);
    if (!job) throw new Error(`作业 ${jobId} 未找到`);

    await job.retry();
  }

  async retryAllDeadJobs(queueName: string): Promise<number> {
    const deadJobs = await this.getDeadJobs(queueName);
    let retried = 0;

    for (const job of deadJobs) {
      try {
        await job.retry();
        retried++;
      } catch (error) {
        console.error(`重试作业 ${job.id} 失败:`, error);
      }
    }

    return retried;
  }

  async cleanDeadJobs(
    queueName: string,
    olderThan: number = 86400000,
  ): Promise<number> {
    const queue = this.queues.find((q) => q.name === queueName);
    if (!queue) throw new Error(`队列 ${queueName} 未找到`);

    const cleaned = await queue.clean(olderThan, "failed");
    return cleaned.length;
  }
}

// 仪表板API端点
import express from "express";

function createMonitoringRouter(monitor: JobMonitor): express.Router {
  const router = express.Router();

  router.get("/queues/:name/metrics", async (req, res) => {
    try {
      const metrics = await monitor.getMetrics(req.params.name);
      res.json(metrics);
    } catch (error) {
      res.status(404).json({ error: (error as Error).message });
    }
  });

  router.get("/queues/:name/dead", async (req, res) => {
    const limit = parseInt(req.query.limit as string) || 100;
    const jobs = await monitor.getDeadJobs(req.params.name, limit);
    res.json(
      jobs.map((j) => ({
        id: j.id,
        data: j.data,
        failedReason: j.failedReason,
        attemptsMade: j.attemptsMade,
        timestamp: j.timestamp,
      })),
    );
  });

  router.post("/queues/:name/dead/:jobId/retry", async (req, res) => {
    try {
      await monitor.retryDeadJob(req.params.name, req.params.jobId);
      res.json({ success: true });
    } catch (error) {
      res.status(400).json({ error: (error as Error).message });
    }
  });

  router.post("/queues/:name/dead/retry-all", async (req, res) => {
    const retried = await monitor.retryAllDeadJobs(req.params.name);
    res.json({ retried });
  });

  router.delete("/queues/:name/dead", async (req, res) => {
    const olderThan = parseInt(req.query.olderThan as string) || 86400000;
    const cleaned = await monitor.cleanDeadJobs(req.params.name, olderThan);
    res.json({ cleaned });
  });

  return router;
}

数据管道作业

ETL调度和编排:

# Airflow风格任务依赖
from celery import chain, group

@app.task
def extract_from_source(source_id: str):
    """从源系统提取数据。"""
    data = fetch_from_api(source_id)
    return {'source_id': source_id, 'records': data}

@app.task
def transform_data(extract_result: dict):
    """转换提取的数据。"""
    records = extract_result['records']
    transformed = [normalize_record(r) for r in records]
    return {'source_id': extract_result['source_id'], 'records': transformed}

@app.task
def load_to_warehouse(transform_result: dict):
    """加载转换数据到仓库。"""
    warehouse.bulk_insert(transform_result['records'])
    return {'loaded': len(transform_result['records'])}

# 带链的ETL管道
def run_etl_pipeline(source_id: str):
    pipeline = chain(
        extract_from_source.s(source_id),
        transform_data.s(),
        load_to_warehouse.s(),
    )
    return pipeline.apply_async()

# 多源并行提取
def run_multi_source_etl(source_ids: list):
    pipeline = chain(
        group(extract_from_source.s(sid) for sid in source_ids),
        # 扇入:转换所有结果
        group(transform_data.s() for _ in source_ids),
        # 聚合和加载
        aggregate_and_load.s(),
    )
    return pipeline.apply_async()
// Bull基础数据管道
import Queue from "bull";

interface PipelineStage<T, U> {
  name: string;
  queue: Queue.Queue<T>;
  process: (data: T) => Promise<U>;
  nextStage?: PipelineStage<U, any>;
}

class DataPipeline {
  private stages: Map<string, PipelineStage<any, any>> = new Map();

  addStage<T, U>(stage: PipelineStage<T, U>): void {
    this.stages.set(stage.name, stage);

    // 处理并转发到下一阶段
    stage.queue.process(async (job) => {
      const result = await stage.process(job.data);

      if (stage.nextStage) {
        await stage.nextStage.queue.add(result, {
          jobId: `${stage.nextStage.name}-${job.id}`,
        });
      }

      return result;
    });
  }

  async start(initialData: any, startStage: string): Promise<void> {
    const stage = this.stages.get(startStage);
    if (!stage) throw new Error(`阶段 ${startStage} 未找到`);

    await stage.queue.add(initialData);
  }
}

// 使用
const extractQueue = new Queue("extract", redisUrl);
const transformQueue = new Queue("transform", redisUrl);
const loadQueue = new Queue("load", redisUrl);

const pipeline = new DataPipeline();

pipeline.addStage({
  name: "extract",
  queue: extractQueue,
  process: async (sourceId) => fetchFromSource(sourceId),
  nextStage: {
    name: "transform",
    queue: transformQueue,
    process: async (data) => transformData(data),
    nextStage: {
      name: "load",
      queue: loadQueue,
      process: async (data) => loadToWarehouse(data),
    },
  },
});

await pipeline.start("source-123", "extract");

ML训练作业

带检查点的长运行模型训练:

# 分布式ML训练作业
from celery import Task
import torch
from pathlib import Path

class TrainingTask(Task):
    autoretry_for = (RuntimeError,)
    max_retries = 3

    def __init__(self):
        self.checkpoint_dir = Path('/checkpoints')
        self.checkpoint_dir.mkdir(exist_ok=True)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """失败时保存检查点恢复。"""
        model = kwargs.get('model_state')
        if model:
            checkpoint_path = self.checkpoint_dir / f'{task_id}.pt'
            torch.save(model, checkpoint_path)

@app.task(base=TrainingTask, bind=True, time_limit=7200)
def train_model(
    self,
    model_id: str,
    dataset_id: str,
    config: dict,
    resume_from: str = None
):
    """带检查点的ML模型训练。"""

    # 如果恢复,加载检查点
    if resume_from:
        checkpoint = torch.load(resume_from)
        model = checkpoint['model']
        optimizer = checkpoint['optimizer']
        start_epoch = checkpoint['epoch']
    else:
        model = create_model(config)
        optimizer = create_optimizer(model, config)
        start_epoch = 0

    dataset = load_dataset(dataset_id)

    for epoch in range(start_epoch, config['epochs']):
        # 更新进度
        progress = (epoch / config['epochs']) * 100
        self.update_state(
            state='PROGRESS',
            meta={'epoch': epoch, 'progress': progress}
        )

        # 训练一个周期
        metrics = train_epoch(model, dataset, optimizer)

        # 每N周期检查点
        if epoch % config.get('checkpoint_interval', 10) == 0:
            checkpoint_path = self.checkpoint_dir / f'{model_id}_epoch_{epoch}.pt'
            torch.save({
                'model': model.state_dict(),
                'optimizer': optimizer.state_dict(),
                'epoch': epoch,
                'metrics': metrics,
            }, checkpoint_path)

    # 保存最终模型
    model_path = save_model(model, model_id)
    return {'model_path': model_path, 'final_metrics': metrics}

# 并行作业超参数调优
def hyperparameter_search(model_id: str, param_grid: dict):
    """运行并行超参数搜索。"""
    from itertools import product

    # 生成参数组合
    keys = param_grid.keys()
    values = param_grid.values()
    combinations = [dict(zip(keys, v)) for v in product(*values)]

    # 启动并行训练作业
    jobs = group(
        train_model.s(
            model_id=f'{model_id}_trial_{i}',
            dataset_id='train_dataset',
            config=params
        )
        for i, params in enumerate(combinations)
    )

    # 聚合结果并选择最佳模型
    workflow = chain(jobs, select_best_model.s())
    return workflow.apply_async()
// 分布式ML推理管道
import Queue, { Job } from "bull";

interface InferenceJob {
  modelId: string;
  batchId: string;
  inputs: Array<{ id: string; data: any }>;
}

interface InferenceResult {
  batchId: string;
  predictions: Array<{ id: string; prediction: any; confidence: number }>;
}

const inferenceQueue = new Queue<InferenceJob>("ml-inference", redisUrl, {
  defaultJobOptions: {
    attempts: 2,
    backoff: { type: "fixed", delay: 5000 },
  },
  limiter: {
    max: 10, // 最多10个并发推理作业
    duration: 1000,
  },
});

// 批量推理处理器
inferenceQueue.process(4, async (job: Job<InferenceJob>) => {
  const { modelId, batchId, inputs } = job.data;

  // 加载模型(缓存)
  const model = await loadModel(modelId);

  const predictions: InferenceResult["predictions"] = [];

  for (let i = 0; i < inputs.length; i++) {
    const input = inputs[i];

    // 更新进度
    await job.progress((i / inputs.length) * 100);

    // 运行推理
    const prediction = await model.predict(input.data);
    predictions.push({
      id: input.id,
      prediction: prediction.result,
      confidence: prediction.confidence,
    });
  }

  return { batchId, predictions };
});

// 大数据集批量分割器
async function runBatchInference(
  modelId: string,
  dataset: Array<{ id: string; data: any }>,
  batchSize: number = 100,
): Promise<string[]> {
  const batches: InferenceJob[] = [];

  for (let i = 0; i < dataset.length; i += batchSize) {
    batches.push({
      modelId,
      batchId: `batch_${i / batchSize}`,
      inputs: dataset.slice(i, i + batchSize),
    });
  }

  const jobs = await inferenceQueue.addBulk(
    batches.map((batch, idx) => ({
      data: batch,
      opts: { jobId: `inference_${modelId}_${idx}` },
    })),
  );

  return jobs.map((j) => j.id!);
}

作业监控和可观测性

指标、跟踪和告警:

import { EventEmitter } from "events";
import Queue, { Job } from "bull";

interface ObservabilityConfig {
  metricsInterval: number; // 每N毫秒发出指标
  alertThresholds: {
    errorRate: number;
    queueDepth: number;
    latencyP99: number;
  };
}

class JobObserver extends EventEmitter {
  private queues: Map<string, Queue.Queue> = new Map();
  private metrics: Map<string, QueueMetrics> = new Map();
  private metricsInterval: NodeJS.Timeout | null = null;

  constructor(private config: ObservabilityConfig) {
    super();
  }

  observe(queue: Queue.Queue): void {
    this.queues.set(queue.name, queue);
    this.metrics.set(queue.name, this.emptyMetrics());

    // 仪器化队列事件
    queue.on("completed", (job, result) => {
      this.recordCompletion(queue.name, job, result);
    });

    queue.on("failed", (job, err) => {
      this.recordFailure(queue.name, job!, err);
    });

    queue.on("stalled", (job) => {
      this.recordStalled(queue.name, job);
    });

    queue.on("waiting", (jobId) => {
      this.recordWaiting(queue.name, jobId);
    });

    // 启动指标发出
    if (!this.metricsInterval) {
      this.startMetricsEmission();
    }
  }

  private emptyMetrics(): QueueMetrics {
    return {
      completed: 0,
      failed: 0,
      stalled: 0,
      waiting: 0,
      processing: 0,
      latencies: [],
      errors: [],
    };
  }

  private recordCompletion(queueName: string, job: Job, result: any): void {
    const metrics = this.metrics.get(queueName)!;
    metrics.completed++;

    const latency = Date.now() - job.timestamp;
    metrics.latencies.push(latency);

    // 发出跟踪跨度
    this.emit("trace", {
      queue: queueName,
      jobId: job.id,
      operation: "job.completed",
      duration: latency,
      result,
    });
  }

  private recordFailure(queueName: string, job: Job, error: Error): void {
    const metrics = this.metrics.get(queueName)!;
    metrics.failed++;
    metrics.errors.push({
      jobId: job.id!,
      error: error.message,
      timestamp: Date.now(),
    });

    // 错误率阈值告警
    const errorRate =
      metrics.failed / (metrics.completed + metrics.failed || 1);
    if (errorRate > this.config.alertThresholds.errorRate) {
      this.emit("alert", {
        type: "high_error_rate",
        queue: queueName,
        errorRate,
        threshold: this.config.alertThresholds.errorRate,
      });
    }

    // 发出错误跟踪
    this.emit("trace", {
      queue: queueName,
      jobId: job.id,
      operation: "job.failed",
      error: error.message,
      stack: error.stack,
    });
  }

  private recordStalled(queueName: string, jobId: string): void {
    const metrics = this.metrics.get(queueName)!;
    metrics.stalled++;

    this.emit("alert", {
      type: "job_stalled",
      queue: queueName,
      jobId,
    });
  }

  private recordWaiting(queueName: string, jobId: string): void {
    const metrics = this.metrics.get(queueName)!;
    metrics.waiting++;
  }

  private startMetricsEmission(): void {
    this.metricsInterval = setInterval(async () => {
      for (const [queueName, queue] of this.queues) {
        const metrics = this.metrics.get(queueName)!;

        // 获取当前队列状态
        const counts = await queue.getJobCounts();

        // 计算百分位数
        const sorted = [...metrics.latencies].sort((a, b) => a - b);
        const p50 = sorted[Math.floor(sorted.length * 0.5)] || 0;
        const p95 = sorted[Math.floor(sorted.length * 0.95)] || 0;
        const p99 = sorted[Math.floor(sorted.length * 0.99)] || 0;

        // 发出指标
        this.emit("metrics", {
          queue: queueName,
          timestamp: Date.now(),
          counts,
          latency: {
            p50,
            p95,
            p99,
          },
          throughput: metrics.completed,
          errorRate: metrics.failed / (metrics.completed + metrics.failed || 1),
        });

        // 队列深度告警
        if (counts.waiting > this.config.alertThresholds.queueDepth) {
          this.emit("alert", {
            type: "high_queue_depth",
            queue: queueName,
            depth: counts.waiting,
            threshold: this.config.alertThresholds.queueDepth,
          });
        }

        // 延迟告警
        if (p99 > this.config.alertThresholds.latencyP99) {
          this.emit("alert", {
            type: "high_latency",
            queue: queueName,
            p99,
            threshold: this.config.alertThresholds.latencyP99,
          });
        }

        // 重置计数器
        this.metrics.set(queueName, this.emptyMetrics());
      }
    }, this.config.metricsInterval);
  }

  stop(): void {
    if (this.metricsInterval) {
      clearInterval(this.metricsInterval);
      this.metricsInterval = null;
    }
  }
}

interface QueueMetrics {
  completed: number;
  failed: number;
  stalled: number;
  waiting: number;
  processing: number;
  latencies: number[];
  errors: Array<{ jobId: string; error: string; timestamp: number }>;
}

// 与可观测性后端集成
const observer = new JobObserver({
  metricsInterval: 10000, // 10秒
  alertThresholds: {
    errorRate: 0.05, // 5%
    queueDepth: 1000,
    latencyP99: 5000, // 5秒
  },
});

// Prometheus指标导出
observer.on("metrics", (metrics) => {
  prometheusClient.gauge("queue_depth", metrics.counts.waiting, {
    queue: metrics.queue,
  });
  prometheusClient.histogram("job_latency", metrics.latency.p99, {
    queue: metrics.queue,
    percentile: "p99",
  });
  prometheusClient.counter("jobs_completed", metrics.throughput, {
    queue: metrics.queue,
  });
});

// 分布式跟踪(OpenTelemetry)
observer.on("trace", (span) => {
  tracer.startSpan(span.operation, {
    attributes: {
      queue: span.queue,
      jobId: span.jobId,
      duration: span.duration,
    },
  });
});

// 告警(PagerDuty、Slack等)
observer.on("alert", (alert) => {
  if (alert.type === "high_error_rate") {
    pagerduty.trigger({
      severity: "error",
      summary: `${alert.queue} 中高错误率: ${(
        alert.errorRate * 100
      ).toFixed(1)}%`,
    });
  }
});

最佳实践

  1. 幂等性

    • 设计作业以安全重执行
    • 使用唯一作业ID去重
    • 外部存储处理状态
  2. 重试策略

    • 使用带抖动的指数退避
    • 设置最大重试限制
    • 区分可重试和不可重试错误
  3. 监控和可观测性

    • 跟踪队列深度、处理延迟和吞吐量
    • 高错误率或队列增长告警
    • 监控工作者健康和内存使用
    • 复杂管道使用分布式跟踪
    • 指标导出到Prometheus、Datadog或CloudWatch
  4. 优雅关闭

    • 关闭前完成进行中作业
    • 正确使用信号(SIGTERM、SIGINT)
    • 设置合理作业完成超时
  5. 资源管理

    • 设置适当并发限制
    • CPU密集型任务使用工作者池
    • 外部API实现速率限制
    • 配置每作业内存和时间限制
  6. 数据管道设计

    • 将管道分解为边界清晰的阶段
    • 并行处理使用扇出/扇入模式
    • 长运行作业设置检查点恢复
    • 存储中间结果调试
  7. ML训练作业

    • 频繁保存检查点防崩溃恢复
    • 训练与推理使用单独队列
    • 实现资源配额防饥饿
    • 跟踪实验元数据和超参数

示例

完整工作者服务

import Queue, { Job } from "bull";
import { Redis } from "ioredis";

interface WorkerConfig {
  queues: Array<{
    name: string;
    concurrency: number;
    processor: (job: Job) => Promise<unknown>;
  }>;
  redis: Redis;
  shutdownTimeout: number;
}

class WorkerService {
  private queues: Map<string, Queue.Queue> = new Map();
  private isShuttingDown = false;
  private activeJobs = 0;

  constructor(private config: WorkerConfig) {}

  async start(): Promise<void> {
    // 设置队列
    for (const queueConfig of this.config.queues) {
      const queue = new Queue(queueConfig.name, {
        createClient: () => this.config.redis.duplicate(),
      });

      queue.process(queueConfig.concurrency, async (job) => {
        if (this.isShuttingDown) {
          throw new Error("工作者关闭中");
        }

        this.activeJobs++;
        try {
          return await queueConfig.processor(job);
        } finally {
          this.activeJobs--;
        }
      });

      this.queues.set(queueConfig.name, queue);
    }

    // 设置优雅关闭
    process.on("SIGTERM", () => this.shutdown());
    process.on("SIGINT", () => this.shutdown());

    console.log("工作者服务启动");
  }

  private async shutdown(): Promise<void> {
    if (this.isShuttingDown) return;
    this.isShuttingDown = true;

    console.log("关闭工作者服务...");

    // 暂停所有队列
    await Promise.all(
      Array.from(this.queues.values()).map((q) => q.pause(true)),
    );

    // 等待活动作业完成
    const startTime = Date.now();
    while (
      this.activeJobs > 0 &&
      Date.now() - startTime < this.config.shutdownTimeout
    ) {
      await new Promise((r) => setTimeout(r, 100));
    }

    if (this.activeJobs > 0) {
      console.warn(`强制关闭,有 ${this.activeJobs} 个活动作业`);
    }

    // 关闭所有队列
    await Promise.all(Array.from(this.queues.values()).map((q) => q.close()));

    console.log("工作者服务停止");
    process.exit(0);
  }
}

// 使用
const worker = new WorkerService({
  redis: new Redis(process.env.REDIS_URL),
  shutdownTimeout: 30000,
  queues: [
    {
      name: "email",
      concurrency: 5,
      processor: async (job) => {
        await sendEmail(job.data);
      },
    },
    {
      name: "images",
      concurrency: 2,
      processor: async (job) => {
        await processImage(job.data);
      },
    },
  ],
});

worker.start();