name: background-jobs description: 背景作业处理模式,包括任务队列、定时作业、工作者池和重试策略。在实现异步处理、任务队列、工作者、异步任务、延迟作业、重复作业、定时任务、ETL管道、数据处理、ML训练作业、Celery、Bull、Sidekiq、Resque、cron作业、重试逻辑、死信队列、DLQ、至少一次交付、精确一次交付、作业监控或工作者管理时使用。
背景作业
概述
背景作业允许在请求-响应周期外异步处理任务。此技能涵盖任务队列模式、调度、工作者管理、重试策略和监控,以实现跨不同框架和语言的可靠任务执行。
关键概念
任务队列模式
Bull队列 (Node.js/Redis):
import Queue, { Job, JobOptions } from "bull";
import { Redis } from "ioredis";
// 队列配置
interface QueueConfig {
name: string;
redis: Redis;
defaultJobOptions?: JobOptions;
}
// 作业数据接口
interface EmailJobData {
to: string;
subject: string;
template: string;
context: Record<string, unknown>;
}
interface ImageProcessingJobData {
imageId: string;
operations: Array<{
type: "resize" | "crop" | "compress";
params: Record<string, unknown>;
}>;
}
// 队列工厂
/**
 * Create a typed Bull queue with a shared Redis connection and sane defaults.
 *
 * Every Bull client role ("client", "subscriber", "bclient") receives its own
 * duplicated connection from the configured Redis instance — blocking clients
 * must not share a connection with regular commands. The original per-type
 * switch had four byte-identical branches, so it is collapsed here.
 *
 * @param config - queue name, Redis connection and optional default job options.
 * @returns the configured queue with a global error handler attached.
 */
function createQueue<T>(config: QueueConfig): Queue.Queue<T> {
  const queue = new Queue<T>(config.name, {
    // One dedicated duplicated connection per client role.
    createClient: () => config.redis.duplicate(),
    defaultJobOptions: {
      removeOnComplete: 100, // keep only the last 100 completed jobs
      removeOnFail: 1000, // keep the last 1000 failed jobs for inspection
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 2000,
      },
      // Caller-supplied options override the defaults above.
      ...config.defaultJobOptions,
    },
  });
  // Global error handler — without one, Redis errors surface as unhandled.
  queue.on("error", (error) => {
    console.error(`队列 ${config.name} 错误:`, error);
  });
  return queue;
}
// 带类型处理器的邮件队列
const emailQueue = createQueue<EmailJobData>({
name: "email",
redis: new Redis(process.env.REDIS_URL),
});
// 定义处理器
emailQueue.process(async (job: Job<EmailJobData>) => {
const { to, subject, template, context } = job.data;
// 更新进度
await job.progress(10);
// 渲染模板
const html = await renderTemplate(template, context);
await job.progress(50);
// 发送邮件
await emailService.send({ to, subject, html });
await job.progress(100);
return { sent: true, messageId: `msg_${Date.now()}` };
});
// 添加作业选项
/**
 * Enqueue a single email job.
 *
 * @param data - recipient, subject, template name and render context.
 * @param options - optional Bull job options; `jobId` enables de-duplication.
 * @returns the enqueued job.
 */
async function sendEmail(
  data: EmailJobData,
  options?: JobOptions,
): Promise<Job<EmailJobData>> {
  const baseOptions: JobOptions = {
    priority: options?.priority || 0,
    delay: options?.delay || 0,
    jobId: options?.jobId, // used for de-duplication
  };
  // Caller-supplied options win over the defaults above.
  return emailQueue.add(data, { ...baseOptions, ...options });
}
// 批量作业添加
/**
 * Enqueue many email jobs in one round trip via addBulk.
 * Each entry gets a timestamped jobId so retried submissions do not collide.
 */
async function sendBulkEmails(
  emails: EmailJobData[],
): Promise<Job<EmailJobData>[]> {
  const bulkEntries = emails.map((data, index) => {
    const opts = { jobId: `bulk_${Date.now()}_${index}` };
    return { data, opts };
  });
  return emailQueue.addBulk(bulkEntries);
}
Celery (Python):
from celery import Celery, Task
from celery.exceptions import MaxRetriesExceededError
from typing import Any, Dict, Optional
import logging
# Celery 配置
app = Celery('tasks')
app.config_from_object({
'broker_url': 'redis://localhost:6379/0',
'result_backend': 'redis://localhost:6379/1',
'task_serializer': 'json',
'result_serializer': 'json',
'accept_content': ['json'],
'timezone': 'UTC',
'task_track_started': True,
'task_time_limit': 300, # 5分钟硬限制
'task_soft_time_limit': 240, # 4分钟软限制
'worker_prefetch_multiplier': 4,
'task_acks_late': True, # 任务完成后确认
'task_reject_on_worker_lost': True,
})
logger = logging.getLogger(__name__)
# 带重试逻辑的基础任务
class BaseTask(Task):
    """Base Celery task: automatic retries with capped, jittered exponential
    backoff, plus lifecycle logging on failure/retry/success."""
    # Retry on any exception; narrow this tuple for finer-grained control.
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3}
    retry_backoff = True
    retry_backoff_max = 600  # cap the backoff at 10 minutes
    # Jitter de-synchronizes retries to avoid retry storms.
    retry_jitter = True
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Invoked when the task ultimately fails (retries exhausted or fatal).
        logger.error(f'任务 {self.name}[{task_id}] 失败: {exc}')
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        # Invoked on each retry attempt.
        logger.warning(f'任务 {self.name}[{task_id}] 重试中: {exc}')
    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f'任务 {self.name}[{task_id}] 成功')
# 邮件任务
@app.task(base=BaseTask, bind=True, name='send_email')
def send_email(
    self,
    to: str,
    subject: str,
    template: str,
    context: Dict[str, Any]
) -> Dict[str, Any]:
    """Render `template` with `context` and email it to `to`.

    Progress is published via PROGRESS states so callers polling the result
    backend can show status. Returns {'sent': True, 'message_id': ...}.
    On ConnectionError an explicit retry is scheduled 60s out (in addition
    to BaseTask's autoretry for other exceptions).
    """
    try:
        # Publish coarse progress through the result backend.
        self.update_state(state='PROGRESS', meta={'progress': 10})
        # render_template / email_service are project helpers defined elsewhere.
        html = render_template(template, context)
        self.update_state(state='PROGRESS', meta={'progress': 50})
        message_id = email_service.send(to=to, subject=subject, html=html)
        self.update_state(state='PROGRESS', meta={'progress': 100})
        return {'sent': True, 'message_id': message_id}
    except ConnectionError as exc:
        # Transient network failure: retry in 60 seconds.
        raise self.retry(exc=exc, countdown=60)
# 带链的图像处理
@app.task(base=BaseTask, bind=True, name='process_image')
def process_image(self, image_id: str, operations: list) -> Dict[str, Any]:
    """Apply a sequence of operations to a stored image and save the result.

    Each entry of `operations` is expected to be a dict of the form
    {'type': 'resize' | 'crop' | 'compress', 'params': {...}}.
    NOTE(review): unknown operation types are silently skipped — confirm this
    is intended rather than raising (raising would trigger BaseTask autoretry).
    """
    image = load_image(image_id)
    for i, op in enumerate(operations):
        # Report progress after each operation, including which op is running.
        progress = int((i + 1) / len(operations) * 100)
        self.update_state(state='PROGRESS', meta={'progress': progress, 'operation': op['type']})
        if op['type'] == 'resize':
            image = resize_image(image, **op['params'])
        elif op['type'] == 'crop':
            image = crop_image(image, **op['params'])
        elif op['type'] == 'compress':
            image = compress_image(image, **op['params'])
    url = save_image(image, image_id)
    return {'url': url, 'operations_count': len(operations)}
# 任务链示例
from celery import chain, group, chord
def process_order(order_id: str):
"""处理带链任务的订单。"""
workflow = chain(
validate_order.s(order_id),
reserve_inventory.s(),
process_payment.s(),
send_confirmation.s(),
)
return workflow.apply_async()
def process_bulk_images(image_ids: list):
"""并行处理多个图像,然后聚合结果。"""
workflow = chord(
group(process_image.s(img_id, [{'type': 'resize', 'params': {'width': 800}}])
for img_id in image_ids),
aggregate_results.s()
)
return workflow.apply_async()
Sidekiq (Ruby):
# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
config.redis = { url: ENV['REDIS_URL'], network_timeout: 5 }
config.death_handlers << ->(job, ex) do
# 处理作业失败
ErrorReporter.report(ex, job: job)
end
end
Sidekiq.configure_client do |config|
config.redis = { url: ENV['REDIS_URL'], network_timeout: 5 }
end
# app/workers/email_worker.rb
# Renders an email template and sends it. Retried up to 5 times with cubic
# backoff, then moved to the dead set for manual inspection.
class EmailWorker
  include Sidekiq::Worker
  sidekiq_options queue: :default,
                  retry: 5,
                  backtrace: true,
                  dead: true
  sidekiq_retry_in do |count, exception|
    # Cubic backoff: 1, 8, 27, 64, 125 seconds (count starts at 0).
    (count + 1) ** 3
  end
  sidekiq_retries_exhausted do |msg, exception|
    # Last-resort handler once all retries are spent.
    Rails.logger.error "作业 #{msg['jid']} 重试耗尽: #{exception.message}"
    DeadJobNotifier.notify(msg, exception)
  end
  # template is a view path; context keys become template locals.
  def perform(to, subject, template, context)
    html = ApplicationController.render(
      template: template,
      locals: context.symbolize_keys
    )
    EmailService.send(to: to, subject: subject, html: html)
  end
end
# app/workers/batch_worker.rb
class BatchWorker
include Sidekiq::Worker
def perform(batch_id)
batch = Batch.find(batch_id)
batch.items.find_each do |item|
ItemProcessor.perform_async(item.id)
end
end
end
# 使用Sidekiq批次(专业功能)
class ImportWorker
include Sidekiq::Worker
def perform(import_id)
import = Import.find(import_id)
batch = Sidekiq::Batch.new
batch.description = "导入 #{import_id}"
batch.on(:complete, ImportCallbacks, import_id: import_id)
batch.jobs do
import.rows.each_with_index do |row, index|
ImportRowWorker.perform_async(import_id, index, row)
end
end
end
end
class ImportCallbacks
def on_complete(status, options)
import = Import.find(options['import_id'])
if status.failures.zero?
import.update!(status: 'completed')
else
import.update!(status: 'completed_with_errors', error_count: status.failures)
end
end
end
定时作业和Cron模式
// Bull调度器
import Queue from "bull";
const scheduledQueue = new Queue("scheduled-tasks", process.env.REDIS_URL);
// 可重复作业
/**
 * Register all repeatable (cron-style) jobs on the scheduled queue.
 * Each entry carries a fixed jobId used for de-duplication, so re-running
 * this setup should not add duplicate repeatable entries.
 */
async function setupScheduledJobs(): Promise<void> {
  // Hourly cleanup
  await scheduledQueue.add(
    "cleanup",
    {},
    {
      repeat: { cron: "0 * * * *" }, // top of every hour
      jobId: "cleanup-hourly",
    },
  );
  // Daily report at 09:00
  await scheduledQueue.add(
    "daily-report",
    {},
    {
      repeat: { cron: "0 9 * * *" },
      jobId: "daily-report",
    },
  );
  // Every 5 minutes (interval-based rather than cron)
  await scheduledQueue.add(
    "health-check",
    {},
    {
      repeat: { every: 5 * 60 * 1000 }, // 5 minutes in milliseconds
      jobId: "health-check",
    },
  );
  // Sunday at midnight
  await scheduledQueue.add(
    "weekly-cleanup",
    {},
    {
      repeat: { cron: "0 0 * * 0" },
      jobId: "weekly-cleanup",
    },
  );
}
// 处理定时作业
scheduledQueue.process("cleanup", async (job) => {
await cleanupOldRecords();
return { cleaned: true };
});
scheduledQueue.process("daily-report", async (job) => {
const report = await generateDailyReport();
await sendReportEmail(report);
return { reportId: report.id };
});
// 列出定时作业
async function getScheduledJobs(): Promise<
Array<{ name: string; next: Date; cron: string }>
> {
const repeatableJobs = await scheduledQueue.getRepeatableJobs();
return repeatableJobs.map((job) => ({
name: job.name,
next: new Date(job.next),
cron: job.cron || `每 ${job.every}毫秒`,
}));
}
// 移除定时作业
/** Remove a repeatable job by its id; no-op when the id is unknown. */
async function removeScheduledJob(jobId: string): Promise<void> {
  const repeatable = await scheduledQueue.getRepeatableJobs();
  const match = repeatable.find((entry) => entry.id === jobId);
  if (!match) {
    return;
  }
  // Removal requires the repeat key, not the job id.
  await scheduledQueue.removeRepeatableByKey(match.key);
}
# Celery Beat调度器
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks')
app.conf.beat_schedule = {
# 每小时
'cleanup-hourly': {
'task': 'tasks.cleanup',
'schedule': crontab(minute=0), # 每小时整点
},
# 每天9点
'daily-report': {
'task': 'tasks.daily_report',
'schedule': crontab(hour=9, minute=0),
},
# 每5分钟
'health-check': {
'task': 'tasks.health_check',
'schedule': 300.0, # 5分钟秒数
},
# 每周日午夜
'weekly-cleanup': {
'task': 'tasks.weekly_cleanup',
'schedule': crontab(hour=0, minute=0, day_of_week=0),
},
# 每月第一天6点
'monthly-report': {
'task': 'tasks.monthly_report',
'schedule': crontab(hour=6, minute=0, day_of_month=1),
},
# 带参数
'check-expiring-subscriptions': {
'task': 'tasks.check_subscriptions',
'schedule': crontab(hour=8, minute=0),
'args': ('expiring',),
'kwargs': {'days_ahead': 7},
},
}
# 动态数据库调度
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
def create_scheduled_task(name: str, task: str, cron: str, args: list = None, kwargs: dict = None):
    """Create or update a database-backed periodic task (django-celery-beat).

    :param name: unique PeriodicTask name (used as the update_or_create key).
    :param task: dotted task name, e.g. 'tasks.cleanup'.
    :param cron: standard 5-field cron string
        "minute hour day-of-month month day-of-week".
    :param args: positional task arguments (JSON-serialized for storage).
    :param kwargs: keyword task arguments (JSON-serialized for storage).
    :raises ValueError: if `cron` does not contain exactly five fields.
    """
    fields = cron.split()
    # Fail fast with a clear message instead of an opaque unpacking error.
    if len(fields) != 5:
        raise ValueError(f'invalid cron expression {cron!r}: expected 5 fields, got {len(fields)}')
    minute, hour, day_of_month, month, day_of_week = fields
    # Reuse an identical crontab row when one already exists.
    schedule, _ = CrontabSchedule.objects.get_or_create(
        minute=minute,
        hour=hour,
        day_of_month=day_of_month,
        month_of_year=month,
        day_of_week=day_of_week,
    )
    # Idempotent upsert keyed by task name.
    PeriodicTask.objects.update_or_create(
        name=name,
        defaults={
            'task': task,
            'crontab': schedule,
            'args': json.dumps(args or []),
            'kwargs': json.dumps(kwargs or {}),
            'enabled': True,
        },
    )
工作者池管理
import Queue, { Job } from "bull";
import os from "os";
interface WorkerPoolConfig {
concurrency: number;
limiter?: {
max: number;
duration: number;
};
}
// Pool of Bull queues sharing one concurrency/rate-limit configuration, with
// graceful shutdown on process signals and aggregate statistics.
class WorkerPool {
  private queues: Map<string, Queue.Queue> = new Map();
  // Flipped during shutdown so processors refuse newly delivered jobs.
  private isShuttingDown = false;
  constructor(private config: WorkerPoolConfig) {
    // Graceful shutdown on the usual termination signals.
    process.on("SIGTERM", () => this.shutdown());
    process.on("SIGINT", () => this.shutdown());
  }
  /**
   * Create a queue named `name`, attach `processor` with the pool's
   * concurrency and limiter, and log completed/failed/stalled events.
   */
  registerQueue<T>(
    name: string,
    processor: (job: Job<T>) => Promise<unknown>,
  ): Queue.Queue<T> {
    const queue = new Queue<T>(name, process.env.REDIS_URL!, {
      limiter: this.config.limiter,
    });
    // Process with the pool-wide concurrency setting.
    queue.process(this.config.concurrency, async (job: Job<T>) => {
      if (this.isShuttingDown) {
        throw new Error("工作者关闭中");
      }
      return processor(job);
    });
    // Lifecycle event logging.
    queue.on("completed", (job, result) => {
      console.log(`作业 ${job.id} 完成:`, result);
    });
    queue.on("failed", (job, err) => {
      console.error(`作业 ${job?.id} 失败:`, err);
    });
    queue.on("stalled", (job) => {
      console.warn(`作业 ${job} 停滞`);
    });
    this.queues.set(name, queue);
    return queue;
  }
  /**
   * Pause every queue (waiting for active jobs to finish), close them, then
   * exit. NOTE(review): calls process.exit(0), so no code after shutdown()
   * resolves in this process.
   */
  async shutdown(): Promise<void> {
    console.log("启动优雅关闭...");
    this.isShuttingDown = true;
    // Stop accepting new jobs and drain active ones.
    const closePromises = Array.from(this.queues.values()).map(
      async (queue) => {
        await queue.pause(true); // pause locally and wait for active jobs
        await queue.close();
      },
    );
    await Promise.all(closePromises);
    console.log("所有队列关闭");
    process.exit(0);
  }
  /** Snapshot per-queue counts (waiting/active/completed/failed/delayed). */
  async getStats(): Promise<Record<string, QueueStats>> {
    const stats: Record<string, QueueStats> = {};
    for (const [name, queue] of this.queues) {
      const [waiting, active, completed, failed, delayed] = await Promise.all([
        queue.getWaitingCount(),
        queue.getActiveCount(),
        queue.getCompletedCount(),
        queue.getFailedCount(),
        queue.getDelayedCount(),
      ]);
      stats[name] = { waiting, active, completed, failed, delayed };
    }
    return stats;
  }
}
interface QueueStats {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
}
// 使用
const pool = new WorkerPool({
concurrency: os.cpus().length,
limiter: {
max: 100, // 每秒最多100个作业
duration: 1000,
},
});
pool.registerQueue<EmailJobData>("email", async (job) => {
await sendEmail(job.data);
});
pool.registerQueue<ImageProcessingJobData>("images", async (job) => {
await processImage(job.data);
});
# Celery工作者管理
from celery import Celery
from celery.signals import worker_process_init, worker_shutdown
import multiprocessing
app = Celery('tasks')
# 工作者配置
app.conf.update(
worker_concurrency=multiprocessing.cpu_count(),
worker_prefetch_multiplier=2, # 每个工作者预取2个任务
worker_max_tasks_per_child=1000, # 每1000个任务重启工作者
worker_max_memory_per_child=200000, # 200MB限制
task_acks_late=True,
task_reject_on_worker_lost=True,
)
# 每个工作者初始化
@worker_process_init.connect
def init_worker(**kwargs):
"""为每个工作者进程初始化资源。"""
# 初始化数据库连接池
db.connect()
# 预热缓存
cache.warm_up()
@worker_shutdown.connect
def cleanup_worker(**kwargs):
"""工作者关闭时清理资源。"""
db.close()
cache.flush()
# 任务路由专用工作者
app.conf.task_routes = {
'tasks.send_email': {'queue': 'email'},
'tasks.process_image': {'queue': 'images'},
'tasks.heavy_computation': {'queue': 'compute'},
'tasks.*': {'queue': 'default'},
}
# 队列特定工作者命令:
# celery -A tasks worker -Q email --concurrency=4
# celery -A tasks worker -Q images --concurrency=2
# celery -A tasks worker -Q compute --concurrency=1
# Celery自动缩放
app.conf.worker_autoscaler = 'celery.worker.autoscale:Autoscaler'
app.conf.worker_autoscale_max = 10
app.conf.worker_autoscale_min = 2
作业优先级和公平性
// Bull优先级队列
interface PriorityJobData {
type: string;
payload: unknown;
priority: "critical" | "high" | "normal" | "low";
}
const priorityMap = {
critical: 1, // 最高优先级(先处理)
high: 5,
normal: 10,
low: 20,
};
async function addPriorityJob(
data: PriorityJobData,
): Promise<Job<PriorityJobData>> {
return queue.add(data, {
priority: priorityMap[data.priority],
// 关键作业不等待
delay: data.priority === "critical" ? 0 : undefined,
});
}
// 多队列公平调度
class FairScheduler {
private queues: Map<string, Queue.Queue> = new Map();
private weights: Map<string, number> = new Map();
constructor(queueConfigs: Array<{ name: string; weight: number }>) {
for (const config of queueConfigs) {
const queue = new Queue(config.name, process.env.REDIS_URL!);
this.queues.set(config.name, queue);
this.weights.set(config.name, config.weight);
}
}
// 加权轮询处理
async process(
handler: (queueName: string, job: Job) => Promise<void>,
): Promise<void> {
const totalWeight = Array.from(this.weights.values()).reduce(
(a, b) => a + b,
0,
);
for (const [name, queue] of this.queues) {
const weight = this.weights.get(name)!;
const concurrency = Math.max(1, Math.floor((weight / totalWeight) * 10));
queue.process(concurrency, async (job) => {
await handler(name, job);
});
}
}
}
// 使用:优先处理高级客户
const scheduler = new FairScheduler([
{ name: "premium", weight: 5 }, // 50%容量
{ name: "standard", weight: 3 }, // 30%容量
{ name: "free", weight: 2 }, // 20%容量
]);
await scheduler.process(async (queueName, job) => {
console.log(`处理 ${queueName} 作业:`, job.id);
await processJob(job);
});
幂等性和重试策略
import Queue, { Job, JobOptions } from "bull";
import { createHash } from "crypto";
// 幂等键生成
/**
 * Derive a stable idempotency key: hex-encoded SHA-256 of the JSON-serialized
 * payload. Identical payloads always map to the same 64-char key.
 */
function generateIdempotencyKey(data: unknown): string {
  const serialized = JSON.stringify(data);
  return createHash("sha256").update(serialized).digest("hex");
}
// 幂等作业处理器
class IdempotentProcessor<T> {
private processedKeys: Set<string> = new Set();
private redis: Redis;
constructor(
private queue: Queue.Queue<T>,
redis: Redis,
) {
this.redis = redis;
}
async process(handler: (job: Job<T>) => Promise<unknown>): Promise<void> {
this.queue.process(async (job: Job<T>) => {
const idempotencyKey = job.opts.jobId || generateIdempotencyKey(job.data);
// 检查是否已处理
const existing = await this.redis.get(`processed:${idempotencyKey}`);
if (existing) {
console.log(`作业 ${job.id} 已处理,跳过`);
return JSON.parse(existing);
}
// 处理作业
const result = await handler(job);
// 标记为已处理,带TTL
await this.redis.setex(
`processed:${idempotencyKey}`,
86400, // 24小时
JSON.stringify(result),
);
return result;
});
}
}
// 自定义重试策略
/** Declarative retry policy consumed by calculateDelay / RetryableQueue. */
interface RetryStrategy {
  type: "exponential" | "linear" | "fixed" | "custom";
  baseDelay: number;
  maxDelay?: number;
  maxRetries: number;
  jitter?: boolean;
  retryOn?: (error: Error) => boolean;
}
/**
 * Compute the delay in milliseconds before retry number `attempt` (1-based).
 *
 * "exponential" doubles per attempt starting from baseDelay, "linear" grows
 * proportionally, and "fixed"/"custom" (or any other value) use baseDelay
 * unchanged. The result is capped at maxDelay when set, then optionally
 * jittered by ±20% (floored to an integer) to de-synchronize retries.
 */
function calculateDelay(strategy: RetryStrategy, attempt: number): number {
  const { baseDelay, maxDelay, jitter } = strategy;
  let delay =
    strategy.type === "exponential"
      ? baseDelay * 2 ** (attempt - 1)
      : strategy.type === "linear"
        ? baseDelay * attempt
        : baseDelay; // "fixed", "custom" and anything else fall back to base
  if (maxDelay) {
    delay = Math.min(delay, maxDelay);
  }
  if (jitter) {
    // Random factor in [0.8, 1.2), floored to whole milliseconds.
    delay = Math.floor(delay * (0.8 + Math.random() * 0.4));
  }
  return delay;
}
// 带死信队列重试
// Queue wrapper pairing a main queue with a dead-letter queue (DLQ):
// non-retryable or retry-exhausted jobs are copied to "<name>-dlq".
class RetryableQueue<T> {
  private mainQueue: Queue.Queue<T>;
  private dlq: Queue.Queue<T>;
  private strategy: RetryStrategy;
  constructor(name: string, strategy: RetryStrategy) {
    this.mainQueue = new Queue<T>(name, process.env.REDIS_URL!);
    this.dlq = new Queue<T>(`${name}-dlq`, process.env.REDIS_URL!);
    this.strategy = strategy;
  }
  /**
   * Attach `handler`. On failure the job is routed either to the DLQ (when
   * the error is non-retryable per strategy.retryOn, or the retry budget is
   * spent) or rethrown so Bull retries it.
   * NOTE(review): rethrowing with the computed delay in the message does not
   * by itself delay the retry — Bull schedules retries from the job's own
   * attempts/backoff options. Confirm those are set when jobs are added.
   */
  async process(handler: (job: Job<T>) => Promise<unknown>): Promise<void> {
    this.mainQueue.process(async (job: Job<T>) => {
      const attempts = job.attemptsMade;
      try {
        return await handler(job);
      } catch (error) {
        const err = error as Error;
        // Non-retryable error class: dead-letter immediately.
        if (this.strategy.retryOn && !this.strategy.retryOn(err)) {
          await this.moveToDLQ(job, err);
          throw err;
        }
        // Retry budget exhausted: dead-letter.
        if (attempts >= this.strategy.maxRetries) {
          await this.moveToDLQ(job, err);
          throw err;
        }
        // Otherwise rethrow so Bull counts the attempt and retries.
        const delay = calculateDelay(this.strategy, attempts + 1);
        throw new Error(`${delay}毫秒后重试: ${err.message}`);
      }
    });
  }
  // Copy the failed job's payload plus failure metadata onto the DLQ.
  private async moveToDLQ(job: Job<T>, error: Error): Promise<void> {
    await this.dlq.add({
      originalJob: job.data,
      error: error.message,
      failedAt: new Date().toISOString(),
      attempts: job.attemptsMade,
    } as unknown as T);
  }
  /** Re-enqueue a dead job's original payload and remove it from the DLQ. */
  async retryFromDLQ(jobId: string): Promise<void> {
    const job = await this.dlq.getJob(jobId);
    if (!job) return;
    const dlqData = job.data as unknown as { originalJob: T };
    await this.mainQueue.add(dlqData.originalJob);
    await job.remove();
  }
}
作业监控和死作业
import Queue, { Job, JobCounts, JobStatus } from "bull";
import { EventEmitter } from "events";
interface JobMetrics {
queue: string;
counts: JobCounts;
latency: {
avg: number;
p50: number;
p95: number;
p99: number;
};
throughput: number; // 每分钟作业数
errorRate: number;
}
class JobMonitor extends EventEmitter {
private queues: Queue.Queue[] = [];
private metricsHistory: Map<string, number[]> = new Map();
addQueue(queue: Queue.Queue): void {
this.queues.push(queue);
queue.on("completed", (job, result) => {
this.recordMetric(queue.name, "completed", job);
this.emit("job:completed", { queue: queue.name, job, result });
});
queue.on("failed", (job, err) => {
this.recordMetric(queue.name, "failed", job!);
this.emit("job:failed", { queue: queue.name, job, error: err });
// 高失败率告警
this.checkErrorRate(queue.name);
});
queue.on("stalled", (job) => {
this.emit("job:stalled", { queue: queue.name, jobId: job });
});
}
private recordMetric(queueName: string, type: string, job: Job): void {
const duration = Date.now() - job.timestamp;
const key = `${queueName}:${type}:duration`;
const history = this.metricsHistory.get(key) || [];
history.push(duration);
// 保留最近1000个样本
if (history.length > 1000) {
history.shift();
}
this.metricsHistory.set(key, history);
}
private checkErrorRate(queueName: string): void {
const completed =
this.metricsHistory.get(`${queueName}:completed:duration`)?.length || 0;
const failed =
this.metricsHistory.get(`${queueName}:failed:duration`)?.length || 0;
if (completed + failed > 10) {
const errorRate = failed / (completed + failed);
if (errorRate > 0.1) {
// > 10%失败率
this.emit("alert:high_error_rate", { queue: queueName, errorRate });
}
}
}
async getMetrics(queueName: string): Promise<JobMetrics> {
const queue = this.queues.find((q) => q.name === queueName);
if (!queue) throw new Error(`队列 ${queueName} 未找到`);
const counts = await queue.getJobCounts();
const durations =
this.metricsHistory.get(`${queueName}:completed:duration`) || [];
return {
queue: queueName,
counts,
latency: this.calculateLatencyPercentiles(durations),
throughput: this.calculateThroughput(durations),
errorRate: this.calculateErrorRate(queueName),
};
}
private calculateLatencyPercentiles(
durations: number[],
): JobMetrics["latency"] {
if (durations.length === 0) {
return { avg: 0, p50: 0, p95: 0, p99: 0 };
}
const sorted = [...durations].sort((a, b) => a - b);
const avg = sorted.reduce((a, b) => a + b, 0) / sorted.length;
return {
avg: Math.round(avg),
p50: sorted[Math.floor(sorted.length * 0.5)],
p95: sorted[Math.floor(sorted.length * 0.95)],
p99: sorted[Math.floor(sorted.length * 0.99)],
};
}
private calculateThroughput(durations: number[]): number {
// 最近一分钟完成的作业
const oneMinuteAgo = Date.now() - 60000;
const recentJobs = durations.filter((_, i) => i > durations.length - 100);
return recentJobs.length;
}
private calculateErrorRate(queueName: string): number {
const completed =
this.metricsHistory.get(`${queueName}:completed:duration`)?.length || 0;
const failed =
this.metricsHistory.get(`${queueName}:failed:duration`)?.length || 0;
const total = completed + failed;
return total > 0 ? failed / total : 0;
}
// 死作业管理
async getDeadJobs(queueName: string, limit: number = 100): Promise<Job[]> {
const queue = this.queues.find((q) => q.name === queueName);
if (!queue) throw new Error(`队列 ${queueName} 未找到`);
return queue.getFailed(0, limit);
}
async retryDeadJob(queueName: string, jobId: string): Promise<void> {
const queue = this.queues.find((q) => q.name === queueName);
if (!queue) throw new Error(`队列 ${queueName} 未找到`);
const job = await queue.getJob(jobId);
if (!job) throw new Error(`作业 ${jobId} 未找到`);
await job.retry();
}
async retryAllDeadJobs(queueName: string): Promise<number> {
const deadJobs = await this.getDeadJobs(queueName);
let retried = 0;
for (const job of deadJobs) {
try {
await job.retry();
retried++;
} catch (error) {
console.error(`重试作业 ${job.id} 失败:`, error);
}
}
return retried;
}
async cleanDeadJobs(
queueName: string,
olderThan: number = 86400000,
): Promise<number> {
const queue = this.queues.find((q) => q.name === queueName);
if (!queue) throw new Error(`队列 ${queueName} 未找到`);
const cleaned = await queue.clean(olderThan, "failed");
return cleaned.length;
}
}
// 仪表板API端点
import express from "express";
function createMonitoringRouter(monitor: JobMonitor): express.Router {
const router = express.Router();
router.get("/queues/:name/metrics", async (req, res) => {
try {
const metrics = await monitor.getMetrics(req.params.name);
res.json(metrics);
} catch (error) {
res.status(404).json({ error: (error as Error).message });
}
});
router.get("/queues/:name/dead", async (req, res) => {
const limit = parseInt(req.query.limit as string) || 100;
const jobs = await monitor.getDeadJobs(req.params.name, limit);
res.json(
jobs.map((j) => ({
id: j.id,
data: j.data,
failedReason: j.failedReason,
attemptsMade: j.attemptsMade,
timestamp: j.timestamp,
})),
);
});
router.post("/queues/:name/dead/:jobId/retry", async (req, res) => {
try {
await monitor.retryDeadJob(req.params.name, req.params.jobId);
res.json({ success: true });
} catch (error) {
res.status(400).json({ error: (error as Error).message });
}
});
router.post("/queues/:name/dead/retry-all", async (req, res) => {
const retried = await monitor.retryAllDeadJobs(req.params.name);
res.json({ retried });
});
router.delete("/queues/:name/dead", async (req, res) => {
const olderThan = parseInt(req.query.olderThan as string) || 86400000;
const cleaned = await monitor.cleanDeadJobs(req.params.name, olderThan);
res.json({ cleaned });
});
return router;
}
数据管道作业
ETL调度和编排:
# Airflow风格任务依赖
from celery import chain, group
@app.task
def extract_from_source(source_id: str):
"""从源系统提取数据。"""
data = fetch_from_api(source_id)
return {'source_id': source_id, 'records': data}
@app.task
def transform_data(extract_result: dict):
"""转换提取的数据。"""
records = extract_result['records']
transformed = [normalize_record(r) for r in records]
return {'source_id': extract_result['source_id'], 'records': transformed}
@app.task
def load_to_warehouse(transform_result: dict):
"""加载转换数据到仓库。"""
warehouse.bulk_insert(transform_result['records'])
return {'loaded': len(transform_result['records'])}
# 带链的ETL管道
def run_etl_pipeline(source_id: str):
pipeline = chain(
extract_from_source.s(source_id),
transform_data.s(),
load_to_warehouse.s(),
)
return pipeline.apply_async()
# 多源并行提取
def run_multi_source_etl(source_ids: list):
    """Extract all sources in parallel, transform, then aggregate and load.

    NOTE(review): chaining one group into another relies on Celery converting
    each chain-into-group step into a chord; verify that each extract result
    is actually routed to the matching transform task as intended.
    """
    pipeline = chain(
        group(extract_from_source.s(sid) for sid in source_ids),
        # Fan-in: transform every extracted result.
        group(transform_data.s() for _ in source_ids),
        # Aggregate the transformed results and load them.
        aggregate_and_load.s(),
    )
    return pipeline.apply_async()
// Bull基础数据管道
import Queue from "bull";
interface PipelineStage<T, U> {
name: string;
queue: Queue.Queue<T>;
process: (data: T) => Promise<U>;
nextStage?: PipelineStage<U, any>;
}
class DataPipeline {
private stages: Map<string, PipelineStage<any, any>> = new Map();
addStage<T, U>(stage: PipelineStage<T, U>): void {
this.stages.set(stage.name, stage);
// 处理并转发到下一阶段
stage.queue.process(async (job) => {
const result = await stage.process(job.data);
if (stage.nextStage) {
await stage.nextStage.queue.add(result, {
jobId: `${stage.nextStage.name}-${job.id}`,
});
}
return result;
});
}
async start(initialData: any, startStage: string): Promise<void> {
const stage = this.stages.get(startStage);
if (!stage) throw new Error(`阶段 ${startStage} 未找到`);
await stage.queue.add(initialData);
}
}
// 使用
const extractQueue = new Queue("extract", redisUrl);
const transformQueue = new Queue("transform", redisUrl);
const loadQueue = new Queue("load", redisUrl);
const pipeline = new DataPipeline();
pipeline.addStage({
name: "extract",
queue: extractQueue,
process: async (sourceId) => fetchFromSource(sourceId),
nextStage: {
name: "transform",
queue: transformQueue,
process: async (data) => transformData(data),
nextStage: {
name: "load",
queue: loadQueue,
process: async (data) => loadToWarehouse(data),
},
},
});
await pipeline.start("source-123", "extract");
ML训练作业
带检查点的长运行模型训练:
# 分布式ML训练作业
from celery import Task
import torch
from pathlib import Path
class TrainingTask(Task):
autoretry_for = (RuntimeError,)
max_retries = 3
def __init__(self):
self.checkpoint_dir = Path('/checkpoints')
self.checkpoint_dir.mkdir(exist_ok=True)
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""失败时保存检查点恢复。"""
model = kwargs.get('model_state')
if model:
checkpoint_path = self.checkpoint_dir / f'{task_id}.pt'
torch.save(model, checkpoint_path)
@app.task(base=TrainingTask, bind=True, time_limit=7200)
def train_model(
self,
model_id: str,
dataset_id: str,
config: dict,
resume_from: str = None
):
"""带检查点的ML模型训练。"""
# 如果恢复,加载检查点
if resume_from:
checkpoint = torch.load(resume_from)
model = checkpoint['model']
optimizer = checkpoint['optimizer']
start_epoch = checkpoint['epoch']
else:
model = create_model(config)
optimizer = create_optimizer(model, config)
start_epoch = 0
dataset = load_dataset(dataset_id)
for epoch in range(start_epoch, config['epochs']):
# 更新进度
progress = (epoch / config['epochs']) * 100
self.update_state(
state='PROGRESS',
meta={'epoch': epoch, 'progress': progress}
)
# 训练一个周期
metrics = train_epoch(model, dataset, optimizer)
# 每N周期检查点
if epoch % config.get('checkpoint_interval', 10) == 0:
checkpoint_path = self.checkpoint_dir / f'{model_id}_epoch_{epoch}.pt'
torch.save({
'model': model.state_dict(),
'optimizer': optimizer.state_dict(),
'epoch': epoch,
'metrics': metrics,
}, checkpoint_path)
# 保存最终模型
model_path = save_model(model, model_id)
return {'model_path': model_path, 'final_metrics': metrics}
# 并行作业超参数调优
def hyperparameter_search(model_id: str, param_grid: dict):
    """Run a parallel hyper-parameter grid search.

    `param_grid` maps parameter names to lists of candidate values; one
    train_model task is launched per combination (Cartesian product), then
    select_best_model aggregates the results.
    NOTE(review): relies on `group`, `chain`, `train_model` and
    `select_best_model` being importable in this module.
    """
    from itertools import product
    # Build every parameter combination from the grid.
    keys = param_grid.keys()
    values = param_grid.values()
    combinations = [dict(zip(keys, v)) for v in product(*values)]
    # Launch one training job per combination, in parallel.
    jobs = group(
        train_model.s(
            model_id=f'{model_id}_trial_{i}',
            dataset_id='train_dataset',
            config=params
        )
        for i, params in enumerate(combinations)
    )
    # Aggregate results and select the best model.
    workflow = chain(jobs, select_best_model.s())
    return workflow.apply_async()
// 分布式ML推理管道
import Queue, { Job } from "bull";
interface InferenceJob {
modelId: string;
batchId: string;
inputs: Array<{ id: string; data: any }>;
}
interface InferenceResult {
batchId: string;
predictions: Array<{ id: string; prediction: any; confidence: number }>;
}
const inferenceQueue = new Queue<InferenceJob>("ml-inference", redisUrl, {
defaultJobOptions: {
attempts: 2,
backoff: { type: "fixed", delay: 5000 },
},
limiter: {
max: 10, // 最多10个并发推理作业
duration: 1000,
},
});
// 批量推理处理器
inferenceQueue.process(4, async (job: Job<InferenceJob>) => {
const { modelId, batchId, inputs } = job.data;
// 加载模型(缓存)
const model = await loadModel(modelId);
const predictions: InferenceResult["predictions"] = [];
for (let i = 0; i < inputs.length; i++) {
const input = inputs[i];
// 更新进度
await job.progress((i / inputs.length) * 100);
// 运行推理
const prediction = await model.predict(input.data);
predictions.push({
id: input.id,
prediction: prediction.result,
confidence: prediction.confidence,
});
}
return { batchId, predictions };
});
// 大数据集批量分割器
async function runBatchInference(
modelId: string,
dataset: Array<{ id: string; data: any }>,
batchSize: number = 100,
): Promise<string[]> {
const batches: InferenceJob[] = [];
for (let i = 0; i < dataset.length; i += batchSize) {
batches.push({
modelId,
batchId: `batch_${i / batchSize}`,
inputs: dataset.slice(i, i + batchSize),
});
}
const jobs = await inferenceQueue.addBulk(
batches.map((batch, idx) => ({
data: batch,
opts: { jobId: `inference_${modelId}_${idx}` },
})),
);
return jobs.map((j) => j.id!);
}
作业监控和可观测性
指标、跟踪和告警:
import { EventEmitter } from "events";
import Queue, { Job } from "bull";
interface ObservabilityConfig {
metricsInterval: number; // 每N毫秒发出指标
alertThresholds: {
errorRate: number;
queueDepth: number;
latencyP99: number;
};
}
/**
 * Instruments Bull queues and emits observability events:
 *  - "metrics": windowed counts, latency percentiles, throughput, error rate
 *  - "trace":   per-job spans for completed/failed jobs
 *  - "alert":   threshold breaches (error rate, queue depth, p99 latency, stalls)
 *
 * Counters are windowed: they are reset after every emission, so throughput
 * and error rate describe only the last `config.metricsInterval` milliseconds.
 */
class JobObserver extends EventEmitter {
  private queues: Map<string, Queue.Queue> = new Map();
  private metrics: Map<string, QueueMetrics> = new Map();
  private metricsInterval: NodeJS.Timeout | null = null;
  constructor(private config: ObservabilityConfig) {
    super();
  }
  /** Attach listeners to a queue and include it in metrics emission. */
  observe(queue: Queue.Queue): void {
    this.queues.set(queue.name, queue);
    this.metrics.set(queue.name, this.emptyMetrics());
    // Instrument queue lifecycle events.
    queue.on("completed", (job, result) => {
      this.recordCompletion(queue.name, job, result);
    });
    queue.on("failed", (job, err) => {
      this.recordFailure(queue.name, job!, err);
    });
    // FIX: Bull's "stalled" event passes the Job object, not an id string.
    // The original forwarded the Job into a `jobId: string` parameter, so
    // the emitted alert carried an object where consumers expect a string.
    queue.on("stalled", (job) => {
      this.recordStalled(queue.name, String(job.id));
    });
    queue.on("waiting", (jobId) => {
      this.recordWaiting(queue.name, String(jobId));
    });
    // Start the shared emission timer on the first observed queue.
    if (!this.metricsInterval) {
      this.startMetricsEmission();
    }
  }
  /** Fresh zeroed counter set for one queue. */
  private emptyMetrics(): QueueMetrics {
    return {
      completed: 0,
      failed: 0,
      stalled: 0,
      waiting: 0,
      processing: 0,
      latencies: [],
      errors: [],
    };
  }
  /** Counters for a queue, created on demand so record* never throws. */
  private getMetrics(queueName: string): QueueMetrics {
    let metrics = this.metrics.get(queueName);
    if (!metrics) {
      metrics = this.emptyMetrics();
      this.metrics.set(queueName, metrics);
    }
    return metrics;
  }
  private recordCompletion(queueName: string, job: Job, result: any): void {
    const metrics = this.getMetrics(queueName);
    metrics.completed++;
    // Latency measured from enqueue time (job.timestamp) to completion.
    const latency = Date.now() - job.timestamp;
    metrics.latencies.push(latency);
    // Emit a trace span for distributed tracing backends.
    this.emit("trace", {
      queue: queueName,
      jobId: job.id,
      operation: "job.completed",
      duration: latency,
      result,
    });
  }
  private recordFailure(queueName: string, job: Job, error: Error): void {
    const metrics = this.getMetrics(queueName);
    metrics.failed++;
    metrics.errors.push({
      jobId: job.id!,
      error: error.message,
      timestamp: Date.now(),
    });
    // Alert when the windowed error rate exceeds the threshold. Note this
    // may fire on the very first failure of a window (small-sample effect).
    const errorRate =
      metrics.failed / (metrics.completed + metrics.failed || 1);
    if (errorRate > this.config.alertThresholds.errorRate) {
      this.emit("alert", {
        type: "high_error_rate",
        queue: queueName,
        errorRate,
        threshold: this.config.alertThresholds.errorRate,
      });
    }
    // Emit an error trace span.
    this.emit("trace", {
      queue: queueName,
      jobId: job.id,
      operation: "job.failed",
      error: error.message,
      stack: error.stack,
    });
  }
  private recordStalled(queueName: string, jobId: string): void {
    const metrics = this.getMetrics(queueName);
    metrics.stalled++;
    this.emit("alert", {
      type: "job_stalled",
      queue: queueName,
      jobId,
    });
  }
  private recordWaiting(queueName: string, _jobId: string): void {
    // Only the count is tracked; the id is accepted for listener symmetry.
    const metrics = this.getMetrics(queueName);
    metrics.waiting++;
  }
  /** Periodically emit per-queue metrics and threshold alerts. */
  private startMetricsEmission(): void {
    this.metricsInterval = setInterval(async () => {
      for (const [queueName, queue] of this.queues) {
        try {
          const metrics = this.getMetrics(queueName);
          // Live counts straight from Redis.
          const counts = await queue.getJobCounts();
          // Latency percentiles over the jobs completed in this window.
          const sorted = [...metrics.latencies].sort((a, b) => a - b);
          const p50 = sorted[Math.floor(sorted.length * 0.5)] || 0;
          const p95 = sorted[Math.floor(sorted.length * 0.95)] || 0;
          const p99 = sorted[Math.floor(sorted.length * 0.99)] || 0;
          this.emit("metrics", {
            queue: queueName,
            timestamp: Date.now(),
            counts,
            latency: {
              p50,
              p95,
              p99,
            },
            throughput: metrics.completed,
            errorRate:
              metrics.failed / (metrics.completed + metrics.failed || 1),
          });
          // Queue-depth alert.
          if (counts.waiting > this.config.alertThresholds.queueDepth) {
            this.emit("alert", {
              type: "high_queue_depth",
              queue: queueName,
              depth: counts.waiting,
              threshold: this.config.alertThresholds.queueDepth,
            });
          }
          // Latency alert.
          if (p99 > this.config.alertThresholds.latencyP99) {
            this.emit("alert", {
              type: "high_latency",
              queue: queueName,
              p99,
              threshold: this.config.alertThresholds.latencyP99,
            });
          }
          // Reset the window.
          this.metrics.set(queueName, this.emptyMetrics());
        } catch (err) {
          // FIX: a failing getJobCounts() previously became an unhandled
          // promise rejection inside setInterval; log and keep the timer.
          console.error(`队列 ${queueName} 指标发出失败:`, err);
        }
      }
    }, this.config.metricsInterval);
  }
  /** Stop the metrics timer; observed queues keep their event listeners. */
  stop(): void {
    if (this.metricsInterval) {
      clearInterval(this.metricsInterval);
      this.metricsInterval = null;
    }
  }
}
// Windowed per-queue counters; JobObserver resets them after each emission.
interface QueueMetrics {
  completed: number; // jobs completed in the current window
  failed: number; // jobs failed in the current window
  stalled: number; // stalled events seen in the current window
  waiting: number; // "waiting" events seen in the current window
  processing: number; // NOTE(review): never updated in this file — confirm intended use
  latencies: number[]; // ms from job.timestamp to completion, per completed job
  errors: Array<{ jobId: string; error: string; timestamp: number }>; // failure details
}
// Integration with observability backends.
// NOTE(review): prometheusClient, tracer and pagerduty are assumed to be
// pre-configured clients defined elsewhere — not shown in this file.
const observer = new JobObserver({
  metricsInterval: 10000, // 10 seconds
  alertThresholds: {
    errorRate: 0.05, // 5%
    queueDepth: 1000,
    latencyP99: 5000, // 5 seconds
  },
});
// Prometheus metrics export.
observer.on("metrics", (metrics) => {
  prometheusClient.gauge("queue_depth", metrics.counts.waiting, {
    queue: metrics.queue,
  });
  prometheusClient.histogram("job_latency", metrics.latency.p99, {
    queue: metrics.queue,
    percentile: "p99",
  });
  prometheusClient.counter("jobs_completed", metrics.throughput, {
    queue: metrics.queue,
  });
});
// Distributed tracing (OpenTelemetry).
observer.on("trace", (span) => {
  tracer.startSpan(span.operation, {
    attributes: {
      queue: span.queue,
      jobId: span.jobId,
      duration: span.duration,
    },
  });
});
// Alerting (PagerDuty, Slack, etc.).
observer.on("alert", (alert) => {
  if (alert.type === "high_error_rate") {
    pagerduty.trigger({
      severity: "error",
      summary: `${alert.queue} 中高错误率: ${(
        alert.errorRate * 100
      ).toFixed(1)}%`,
    });
  }
});
最佳实践
-
幂等性
- 设计作业以安全重执行
- 使用唯一作业ID去重
- 在外部存储中保存处理状态
-
重试策略
- 使用带抖动的指数退避
- 设置最大重试限制
- 区分可重试和不可重试错误
-
监控和可观测性
- 跟踪队列深度、处理延迟和吞吐量
- 高错误率或队列增长告警
- 监控工作者健康和内存使用
- 复杂管道使用分布式跟踪
- 指标导出到Prometheus、Datadog或CloudWatch
-
优雅关闭
- 关闭前完成进行中作业
- 正确使用信号(SIGTERM、SIGINT)
- 设置合理作业完成超时
-
资源管理
- 设置适当并发限制
- CPU密集型任务使用工作者池
- 外部API实现速率限制
- 配置每作业内存和时间限制
-
数据管道设计
- 将管道分解为边界清晰的阶段
- 并行处理使用扇出/扇入模式
- 长运行作业设置检查点恢复
- 存储中间结果以便调试
-
ML训练作业
- 频繁保存检查点,以便崩溃后快速恢复
- 训练与推理使用单独队列
- 实现资源配额防饥饿
- 跟踪实验元数据和超参数
示例
完整工作者服务
import Queue, { Job } from "bull";
import { Redis } from "ioredis";
/** Configuration for a WorkerService instance. */
interface WorkerConfig {
  // One entry per queue this worker should process.
  queues: Array<{
    name: string; // Bull queue name
    concurrency: number; // max jobs processed in parallel for this queue
    processor: (job: Job) => Promise<unknown>; // job handler
  }>;
  redis: Redis; // shared connection; duplicated for each Bull client
  shutdownTimeout: number; // ms to wait for in-flight jobs during shutdown
}
/**
 * Hosts Bull queue processors with graceful shutdown: on SIGTERM/SIGINT it
 * pauses all queues, waits up to `shutdownTimeout` ms for in-flight jobs to
 * drain, then closes the queues and exits the process.
 */
class WorkerService {
  private queues: Map<string, Queue.Queue> = new Map();
  private isShuttingDown = false;
  // Number of jobs currently inside a processor (used by shutdown to drain).
  private activeJobs = 0;
  constructor(private config: WorkerConfig) {}
  /** Create the queues, register processors, install signal handlers. */
  async start(): Promise<void> {
    // Set up queues
    for (const queueConfig of this.config.queues) {
      const queue = new Queue(queueConfig.name, {
        createClient: () => this.config.redis.duplicate(),
      });
      // FIX: without an "error" listener a Redis error on the queue's
      // EventEmitter would crash the process (consistent with createQueue).
      queue.on("error", (error) => {
        console.error(`队列 ${queueConfig.name} 错误:`, error);
      });
      queue.process(queueConfig.concurrency, async (job) => {
        if (this.isShuttingDown) {
          // Reject new work during drain; Bull retries per job options.
          throw new Error("工作者关闭中");
        }
        this.activeJobs++;
        try {
          return await queueConfig.processor(job);
        } finally {
          this.activeJobs--;
        }
      });
      this.queues.set(queueConfig.name, queue);
    }
    // Graceful shutdown. FIX: the shutdown promise was previously dropped,
    // so a rejection inside shutdown() became an unhandled rejection.
    process.on("SIGTERM", () => void this.shutdown());
    process.on("SIGINT", () => void this.shutdown());
    console.log("工作者服务启动");
  }
  /** Pause queues, drain in-flight jobs (bounded by shutdownTimeout), close. */
  private async shutdown(): Promise<void> {
    // Idempotent: SIGTERM and SIGINT may both fire.
    if (this.isShuttingDown) return;
    this.isShuttingDown = true;
    console.log("关闭工作者服务...");
    // Pause all queues (local pause: stop taking new jobs on this worker).
    await Promise.all(
      Array.from(this.queues.values()).map((q) => q.pause(true)),
    );
    // Poll until in-flight jobs finish or the timeout elapses.
    const startTime = Date.now();
    while (
      this.activeJobs > 0 &&
      Date.now() - startTime < this.config.shutdownTimeout
    ) {
      await new Promise((r) => setTimeout(r, 100));
    }
    if (this.activeJobs > 0) {
      console.warn(`强制关闭,有 ${this.activeJobs} 个活动作业`);
    }
    // Close all queues
    await Promise.all(Array.from(this.queues.values()).map((q) => q.close()));
    console.log("工作者服务停止");
    process.exit(0);
  }
}
// Usage: one worker hosting two queues with different concurrency levels.
const worker = new WorkerService({
  redis: new Redis(process.env.REDIS_URL),
  shutdownTimeout: 30000,
  queues: [
    {
      name: "email",
      concurrency: 5,
      processor: async (job) => {
        await sendEmail(job.data);
      },
    },
    {
      name: "images",
      concurrency: 2,
      processor: async (job) => {
        await processImage(job.data);
      },
    },
  ],
});
// FIX: start() returned a floating promise, so a startup failure (e.g.
// Redis unreachable) was silently dropped as an unhandled rejection.
worker.start().catch((err) => {
  console.error("工作者服务启动失败:", err);
  process.exit(1);
});