后台任务处理
概览
构建具有分布式任务队列、工作池、作业调度、错误处理、重试策略和监控的健壮后台任务处理系统,以实现高效的异步任务执行。
何时使用
- 异步处理长时间运行的操作
- 后台发送电子邮件
- 生成报告或导出
- 处理大型数据集
- 安排定期任务
- 分发计算密集型操作
指令
1. 使用 Celery 和 Redis 的 Python
# celery_app.py — Celery application, queue, and routing configuration.
from celery import Celery
from kombu import Exchange, Queue
import os

app = Celery('myapp')

# Core configuration: Redis as both broker and result backend, JSON-only
# serialization, UTC timestamps, and hard/soft task time limits.
app.conf.update(
    broker_url=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
    result_backend=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,  # hard limit: worker is killed after 30 minutes
    task_soft_time_limit=25 * 60,  # soft limit: SoftTimeLimitExceeded raised after 25 minutes
    broker_connection_retry_on_startup=True,
)

# Queue configuration: one direct exchange, one dedicated queue per workload type.
default_exchange = Exchange('tasks', type='direct')
app.conf.task_queues = (
    Queue('default', exchange=default_exchange, routing_key='default'),
    Queue('emails', exchange=default_exchange, routing_key='emails'),
    Queue('reports', exchange=default_exchange, routing_key='reports'),
    Queue('batch', exchange=default_exchange, routing_key='batch'),
)
# Route each task name to its dedicated queue.
app.conf.task_routes = {
    'tasks.send_email': {'queue': 'emails'},
    'tasks.generate_report': {'queue': 'reports'},
    'tasks.process_batch': {'queue': 'batch'},
}
# Default retry policy: 60s delay, at most 3 retries.
app.conf.task_default_retry_delay = 60
app.conf.task_max_retries = 3

# Auto-discover task modules in the listed packages.
app.autodiscover_tasks(['myapp.tasks'])
# tasks.py
from celery_app import app
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
import logging
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, user_id, email_subject):
    """Deliver an email to one user, retrying with exponential backoff.

    Returns a status dict; a missing user is reported as a failure without
    retrying, while delivery errors trigger Celery's retry machinery.
    """
    try:
        recipient = User.query.get(user_id)
        if not recipient:
            logger.error(f"用户 {user_id} 未找到")
            return {'status': 'failed', 'reason': '用户未找到'}
        # Delegate actual delivery to the shared helper.
        send_email_helper(recipient.email, email_subject)
        return {'status': 'success', 'user_id': user_id}
    except Exception as exc:
        logger.error(f"发送电子邮件出错:{exc}")
        # Exponential backoff: 60s, 120s, 240s, ...
        backoff = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=backoff)
@shared_task(bind=True)
def generate_report(self, report_type, filters):
    """Generate a report over matching records, publishing PROGRESS state.

    report_type: identifier handed to the per-batch processor.
    filters: criteria used both to count and to fetch the records.
    Returns a summary dict; fails with Exception on soft time limit expiry.
    """
    try:
        self.update_state(
            state='PROGRESS',
            meta={'current': 0, 'total': 100, 'status': '初始化中...'}
        )
        total_records = count_records(filters)
        processed = 0
        for batch in fetch_records_in_batches(filters, batch_size=1000):
            # NOTE(review): this calls the process_batch *task* defined below,
            # whose signature (self, batch_data) does not accept
            # (batch, report_type) — presumably a plain helper of the same
            # name is intended; confirm.
            process_batch(batch, report_type)
            processed += len(batch)
            # Count and fetch are separate queries; rows created in between
            # can make total_records stale (even zero), so guard the division.
            if total_records > 0:
                progress = int((processed / total_records) * 100)
            else:
                progress = 100
            self.update_state(
                state='PROGRESS',
                meta={'current': processed, 'total': total_records, 'progress': progress}
            )
        return {'status': 'success', 'total_records': total_records}
    except SoftTimeLimitExceeded:
        # Soft time limit fired (see task_soft_time_limit in celery_app.py).
        logger.error("报告生成超时")
        raise Exception("报告生成超时")
@shared_task(bind=True)
def process_batch(self, batch_data):
    """Process each item in a batch, recording failures instead of aborting.

    Returns {'processed': <count>, 'results': <per-item outcomes>}.
    """
    results = []
    for item in batch_data:
        try:
            outcome = process_item(item)
        except Exception as e:
            logger.error(f"处理项目 {item} 出错:{e}")
            outcome = {'status': 'failed', 'error': str(e)}
        results.append(outcome)
    return {'processed': len(results), 'results': results}
# Periodic tasks driven by the Celery Beat scheduler.
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-expired-sessions': {
        'task': 'tasks.cleanup_expired_sessions',
        'schedule': crontab(minute=0, hour='*/6'),  # every 6 hours
        'args': ()
    },
    'generate-daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=0, minute=0),  # daily at midnight
        'args': ()
    },
    'sync-external-data': {
        'task': 'tasks.sync_external_data',
        'schedule': crontab(minute=0),  # hourly, on the hour
        'args': ()
    },
}
@shared_task
def cleanup_expired_sessions():
    """Delete sessions whose expiry time has already passed.

    NOTE(review): uses a naive UTC timestamp — confirm Session.expires_at
    is stored as naive UTC.
    """
    cutoff = datetime.utcnow()
    deleted_count = Session.query.filter(Session.expires_at < cutoff).delete()
    db.session.commit()
    return {'deleted': deleted_count}
@shared_task
def sync_external_data():
    """Pull records from the external API and upsert them locally.

    Any failure is logged and re-raised so the caller/worker sees it.
    """
    try:
        data = fetch_from_external_api()
        for item in data:
            update_or_create_record(item)
    except Exception as e:
        logger.error(f"同步失败:{e}")
        raise
    else:
        return {'status': 'success', 'synced_items': len(data)}
# Flask integration: HTTP endpoints for polling task status and enqueuing tasks.
# Fix: `request` is used in trigger_email but was missing from the import.
from flask import Blueprint, jsonify, request

celery_bp = Blueprint('celery', __name__, url_prefix='/api/tasks')


@celery_bp.route('/<task_id>/status', methods=['GET'])
def task_status(task_id):
    """Return the status, result (if finished), and progress of a task."""
    result = app.AsyncResult(task_id)
    return jsonify({
        'task_id': task_id,
        'status': result.status,
        'result': result.result if result.ready() else None,
        'progress': result.info if result.state == 'PROGRESS' else None
    })


@celery_bp.route('/send-email', methods=['POST'])
def trigger_email():
    """Enqueue a send_email task and respond 202 with its task id."""
    data = request.json
    task = send_email.delay(data['user_id'], data['subject'])
    return jsonify({'task_id': task.id}), 202
2. 使用 Bull Queue 的 Node.js
// queue.js — Bull job queues backed by Redis.
const Queue = require('bull');
const redis = require('redis');

// Shared Redis connection settings (previously duplicated per queue).
const redisConfig = {
  host: process.env.REDIS_HOST || 'localhost',
  port: process.env.REDIS_PORT || 6379
};

// NOTE(review): this client is never used below (Bull opens its own
// connections from `redisConfig`) — kept for compatibility; consider removing.
const redisClient = redis.createClient(redisConfig);

// One queue per workload type.
const emailQueue = new Queue('emails', { redis: redisConfig });
const reportQueue = new Queue('reports', { redis: redisConfig });
const batchQueue = new Queue('batch', { redis: redisConfig });
// Email worker: up to 5 concurrent jobs. The original wrapped the body in a
// try/catch that only re-threw (a no-op) — removed; a thrown error already
// fails the job so the retry policy configured at enqueue time applies.
emailQueue.process(5, async (job) => {
  const { userId, subject, body } = job.data;
  const user = await User.findById(userId);
  if (!user) {
    // Throwing marks the job failed; Bull will retry per its backoff options.
    throw new Error(`用户 ${userId} 未找到`);
  }
  await sendEmailHelper(user.email, subject, body);
  return { status: 'success', userId };
});
// Report worker: processes records in batches of 1000 and reports progress.
// Fix: the original computed progress from the pre-batch offset `i`, so it
// lagged by one batch and never reached 100%; also guard division by zero.
reportQueue.process(async (job) => {
  const { reportType, filters } = job.data;
  const batchSize = 1000;
  const totalRecords = await countRecords(filters);
  for (let offset = 0; offset < totalRecords; offset += batchSize) {
    const batch = await fetchRecordsBatch(filters, offset, batchSize);
    await processBatch(batch, reportType);
    // Progress after completing this batch, clamped to the total.
    const done = Math.min(offset + batchSize, totalRecords);
    job.progress(totalRecords ? Math.round((done / totalRecords) * 100) : 100);
  }
  return { status: 'success', totalRecords };
});
// Batch worker: process every item, recording failures instead of aborting.
batchQueue.process(async (job) => {
  const { items } = job.data;
  const results = [];
  for (const item of items) {
    let outcome;
    try {
      outcome = await processItem(item);
    } catch (error) {
      outcome = { status: 'failed', error: error.message };
    }
    results.push(outcome);
  }
  return { processed: results.length, results };
});
// Lifecycle event listeners for observability of the email queue.
emailQueue.on('completed', (job) => {
  console.log(`电子邮件作业 ${job.id} 完成`);
});
emailQueue.on('failed', (job, err) => {
  console.error(`电子邮件作业 ${job.id} 失败:`, err.message);
});
emailQueue.on('progress', (job, progress) => {
  console.log(`电子邮件作业 ${job.id} ${progress}% 完成`);
});

// Export the queues for producers (routes.js) and worker bootstrap code.
module.exports = {
  emailQueue,
  reportQueue,
  batchQueue
};
// routes.js — HTTP endpoints for enqueuing jobs and checking their status.
const express = require('express');
const { emailQueue, reportQueue } = require('./queue');
const router = express.Router();

// Enqueue an email job: 3 attempts with exponential backoff; completed jobs
// are removed so Redis does not fill up. The try/catch is required because
// Express 4 does not catch rejections from async handlers — without it an
// enqueue failure becomes an unhandled rejection instead of a response.
router.post('/send-email', async (req, res) => {
  try {
    const { userId, subject, body } = req.body;
    const job = await emailQueue.add(
      { userId, subject, body },
      {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000
        },
        removeOnComplete: true
      }
    );
    res.status(202).json({ jobId: job.id });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// Look up a job and report its state, progress, and attempt count.
// try/catch added: Express 4 does not catch async handler rejections.
router.get('/jobs/:jobId/status', async (req, res) => {
  try {
    const job = await emailQueue.getJob(req.params.jobId);
    if (!job) {
      return res.status(404).json({ error: '作业未找到' });
    }
    const progress = await job.progress();
    const state = await job.getState();
    const attempts = job.attemptsMade;
    res.json({
      jobId: job.id,
      state,
      progress,
      attempts,
      data: job.data
    });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});

module.exports = router;
3. 使用 Sidekiq 的 Ruby
# Gemfile — background-job dependencies.
gem 'sidekiq', '~> 7.0'
gem 'redis'
gem 'sidekiq-scheduler'  # cron-style recurring jobs for Sidekiq
# config/sidekiq.yml — Sidekiq server configuration.
---
:redis:
  :url: redis://localhost:6379/0
:concurrency: 5
:timeout: 25
:max_retries: 3
# NOTE(review): :dead_letter_queue is not a stock Sidekiq option — confirm
# which plugin consumes it (Sidekiq's built-in dead set needs no config).
:dead_letter_queue:
  :enabled: true
  :queue_name: dead_letter_queue
# app/workers/email_worker.rb
# Sends a single email to a user; Sidekiq retries up to 3 times on error.
class EmailWorker
  include Sidekiq::Worker
  # NOTE(review): `lock: :until_executed` is provided by the
  # sidekiq-unique-jobs gem, which is not in the Gemfile above — confirm.
  sidekiq_options queue: 'emails', retry: 3, lock: :until_executed

  # user_id - primary key of the User to mail
  # subject - subject line passed to the mailer
  def perform(user_id, subject)
    user = User.find(user_id)
    UserMailer.send_email(user, subject).deliver_now
    logger.info "向用户 #{user_id} 发送了电子邮件"
  rescue StandardError => e
    # Log, then re-raise so Sidekiq's retry machinery sees the failure.
    logger.error "发送电子邮件失败:#{e.message}"
    raise
  end
end
# app/workers/report_worker.rb
# Builds a report in batches of 1000 records, publishing progress to Redis.
class ReportWorker
  include Sidekiq::Worker
  sidekiq_options queue: 'reports', retry: 2

  # report_type - kind of report to build
  # filters     - criteria passed to Record.filter_by
  def perform(report_type, filters)
    total_records = Record.filter_by(filters).count
    processed = 0
    Record.filter_by(filters).find_in_batches(batch_size: 1000) do |batch|
      process_batch(batch, report_type)
      processed += batch.size
      # The count and the batch fetch are separate queries; rows inserted in
      # between can make total_records stale (even zero), so guard the
      # division before publishing progress.
      next if total_records.zero?
      Sidekiq.redis { |conn|
        conn.hset("job:#{jid}", 'progress', (processed.to_f / total_records * 100).round(2))
      }
    end
    logger.info "报告 #{report_type} 生成"
    { status: 'success', total_records: total_records }
  end
end
# app/controllers/tasks_controller.rb
# HTTP endpoints for enqueuing email jobs and polling job status.
class TasksController < ApplicationController
  # POST: enqueue an EmailWorker job; responds 202 with the Sidekiq jid.
  def send_email
    user_id = params[:user_id]
    subject = params[:subject]
    job_id = EmailWorker.perform_async(user_id, subject)
    render json: { job_id: job_id }, status: :accepted
  end

  # GET: report a job's status by jid.
  # NOTE(review): Sidekiq::Status comes from the sidekiq-status gem, which is
  # not in the Gemfile above — confirm the dependency.
  def job_status
    job_id = params[:job_id]
    status = Sidekiq::Status.get(job_id)
    render json: {
      job_id: job_id,
      status: status || 'not_found'
    }
  end
end
# Scheduled jobs (lib/tasks/scheduler.rake or config/sidekiq.yml).
# NOTE(review): sidekiq-scheduler normally reads schedules from a top-level
# :schedule: key in sidekiq.yml (or Sidekiq.schedule=) — confirm this key name.
sidekiq_scheduler:
  cleanup_expired_sessions:
    cron: '0 */6 * * *'  # every 6 hours
    class: CleanupSessionsWorker
  generate_daily_report:
    cron: '0 0 * * *'  # daily at midnight
    class: DailyReportWorker
4. 作业重试和错误处理
# Retry strategies
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
import logging
import random

logger = logging.getLogger(__name__)


# Fix: the original also declared autoretry_for=(Exception,), which layers
# Celery's automatic retry on top of the manual self.retry() policy below —
# the two mechanisms fight (everything gets auto-retried regardless of the
# except branches). Removed so the explicit tiered policy is the only one.
@shared_task(bind=True, max_retries=5)
def resilient_task(self, data):
    """Task with tiered retry logic.

    TemporaryError -> retry with exponential backoff capped at one hour.
    PermanentError -> fail immediately (no retry), returning a failure dict.
    anything else  -> retry with jitter until max_retries, then raise
                      MaxRetriesExceededError.
    """
    try:
        result = perform_operation(data)
        return result
    except TemporaryError as exc:
        # Exponential backoff: 60s, 120s, ... capped at 3600s.
        retry_delay = min(2 ** self.request.retries * 60, 3600)
        raise self.retry(exc=exc, countdown=retry_delay)
    except PermanentError as exc:
        logger.error(f"任务 {self.request.id} 中的永久错误:{exc}")
        # Do not retry: record the failure and return.
        return {'status': 'failed', 'error': str(exc)}
    except Exception as exc:
        if self.request.retries < self.max_retries:
            logger.warning(f"重试任务 {self.request.id},尝试 {self.request.retries + 1}")
            # Add jitter to avoid a thundering herd of simultaneous retries.
            jitter = random.uniform(0, 10)
            raise self.retry(exc=exc, countdown=60 + jitter)
        else:
            raise MaxRetriesExceededError(f"任务 {self.request.id} 在 {self.max_retries} 次重试后失败")
5. 监控和可观察性
# monitoring.py — Prometheus metrics for Celery task execution.
from prometheus_client import Counter, Histogram, Gauge
import time

# Metrics: task counts by name/status, duration histogram, live queue depth.
task_counter = Counter('celery_task_total', '总任务数', ['task_name', 'status'])
task_duration = Histogram('celery_task_duration_seconds', '任务持续时间', ['task_name'])
task_queue_size = Gauge('celery_queue_size', '队列大小', ['queue_name'])
def track_task_metrics(task_name):
    """Decorator factory: record Prometheus metrics around a task function.

    Increments task_counter with status 'success'/'failed' and observes the
    run time in task_duration, re-raising any exception unchanged.

    Fix: the original wrapper was not wrapped with functools.wraps, so the
    decorated function lost its __name__/__doc__ — which breaks task-name
    registration when combined with @shared_task (the task would register
    as '...wrapper').
    """
    from functools import wraps  # local import keeps this snippet self-contained

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                task_counter.labels(task_name=task_name, status='success').inc()
                return result
            except Exception:
                task_counter.labels(task_name=task_name, status='failed').inc()
                raise
            finally:
                # Duration is recorded for both success and failure paths.
                duration = time.time() - start_time
                task_duration.labels(task_name=task_name).observe(duration)
        return wrapper
    return decorator
@shared_task
@track_task_metrics('send_email')
def send_email_tracked(user_id, subject):
    """Example task with metrics tracking applied (implementation elided)."""
    # Task implementation
    pass
最佳实践
✅ 做
- 使用任务超时以防止挂起作业
- 实现具有指数退避的重试逻辑
- 使任务幂等
- 为关键任务使用作业优先级
- 监控队列深度和作业失败
- 记录作业执行详情
- 清理完成的作业
- 设置适当的批量大小以提高内存效率
- 使用死信队列处理失败的作业
- 独立测试作业
❌ 不做
- 在事件循环等异步上下文中执行阻塞的同步操作
- 忽略作业失败
- 使任务依赖于外部状态
- 使用无界重试
- 在作业数据中存储大对象
- 忘记处理超时
- 无监控地运行作业
- 在队列中使用阻塞操作
- 忘记跟踪作业进度
- 将不相关的操作混合在一个作业中
完整示例
from celery import shared_task
from celery_app import app


@shared_task
def simple_task(x, y):
    """Add two numbers — a minimal end-to-end Celery example."""
    return x + y


# Enqueue the task asynchronously, then block waiting for its result.
result = simple_task.delay(4, 6)
print(result.get())  # 10