name: python-background-jobs description: Python 后台作业模式,包括任务队列、工作者和事件驱动架构。适用于实现异步任务处理、任务队列、长时运行操作或解耦工作从请求/响应周期。
Python 后台任务与任务队列
将长时运行或不可靠的工作从请求/响应周期中解耦。立即返回给用户,而后台工作者异步处理繁重任务。
何时使用此技能
- 处理耗时超过几秒的任务
- 发送电子邮件、通知或网络钩子
- 生成报告或导出数据
- 处理上传或媒体转换
- 集成不可靠的外部服务
- 构建事件驱动架构
核心概念
1. 任务队列模式
API 接收请求,将作业入队,立即返回作业 ID。工作者异步处理作业。
2. 幂等性
任务在失败时可能重试。设计为安全重执行。
3. 作业状态机
作业通过状态转换:待处理 → 运行中 → 成功/失败。
4. 至少一次交付
大多数队列保证至少一次交付。代码必须处理重复项。
快速开始
此技能使用 Celery 作为示例,一个广泛采用的任务队列。替代方案如 RQ、Dramatiq 和云原生解决方案(AWS SQS、GCP Tasks)同样有效。
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")
@app.task
def send_email(to: str, subject: str, body: str) -> None:
# 这在一个后台工作者中运行
email_client.send(to, subject, body)
# 在您的 API 处理器中
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
基础模式
模式 1:立即返回作业 ID
对于超过几秒的操作,返回作业 ID 并异步处理。
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class JobStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
@dataclass
class Job:
id: str
status: JobStatus
created_at: datetime
started_at: datetime | None = None
completed_at: datetime | None = None
result: dict | None = None
error: str | None = None
# API 端点
async def start_export(request: ExportRequest) -> JobResponse:
"""启动导出作业并返回作业 ID。"""
job_id = str(uuid4())
# 持久化作业记录
await jobs_repo.create(Job(
id=job_id,
status=JobStatus.PENDING,
created_at=datetime.utcnow(),
))
# 将任务入队以进行后台处理
await task_queue.enqueue(
"export_data",
job_id=job_id,
params=request.model_dump(),
)
# 立即返回作业 ID
return JobResponse(
job_id=job_id,
status="pending",
poll_url=f"/jobs/{job_id}",
)
模式 2:Celery 任务配置
配置 Celery 任务,设置适当的重试和超时设置。
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")
# 全局配置
app.conf.update(
task_time_limit=3600, # 硬限制:1 小时
task_soft_time_limit=3000, # 软限制:50 分钟
task_acks_late=True, # 完成后确认
task_reject_on_worker_lost=True,
worker_prefetch_multiplier=1, # 不要预取过多任务
)
@app.task(
bind=True,
max_retries=3,
default_retry_delay=60,
autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
"""处理支付,对瞬时错误自动重试。"""
try:
result = payment_gateway.charge(payment_id)
return {"status": "success", "transaction_id": result.id}
except PaymentDeclinedError as e:
# 不要重试永久失败
return {"status": "declined", "reason": str(e)}
except TransientError as e:
# 使用指数退避重试
raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
模式 3:使任务幂等
工作者可能在崩溃或超时后重试。设计为安全重执行。
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
"""幂等地处理订单。"""
order = orders_repo.get(order_id)
# 已经处理过了?提前返回
if order.status == OrderStatus.COMPLETED:
logger.info("订单已处理", order_id=order_id)
return
# 已经在进行中?检查是否应该继续
if order.status == OrderStatus.PROCESSING:
# 使用幂等键避免双重收费
pass
# 使用幂等键处理
result = payment_provider.charge(
amount=order.total,
idempotency_key=f"order-{order_id}", # 关键!
)
orders_repo.update(order_id, status=OrderStatus.COMPLETED)
幂等性策略:
- 检查-后写:在行动前验证状态
- 幂等键:为外部服务调用使用唯一令牌
- 插入更新模式:
INSERT ... ON CONFLICT UPDATE - 去重窗口:跟踪 N 小时内已处理的 ID
模式 4:作业状态管理
持久化作业状态转换以提供可见性和调试。
class JobRepository:
"""管理作业状态的仓库。"""
async def create(self, job: Job) -> Job:
"""创建新作业记录。"""
await self._db.execute(
"""INSERT INTO jobs (id, status, created_at)
VALUES ($1, $2, $3)""",
job.id, job.status.value, job.created_at,
)
return job
async def update_status(
self,
job_id: str,
status: JobStatus,
**fields,
) -> None:
"""更新作业状态,带时间戳。"""
updates = {"status": status.value, **fields}
if status == JobStatus.RUNNING:
updates["started_at"] = datetime.utcnow()
elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
updates["completed_at"] = datetime.utcnow()
await self._db.execute(
"UPDATE jobs SET status = $1, ... WHERE id = $2",
updates, job_id,
)
logger.info(
"作业状态已更新",
job_id=job_id,
status=status.value,
)
高级模式
模式 5:死信队列
处理永久失败的任务以供手动检查。
@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
"""处理网络钩子,失败时移至死信队列。"""
try:
result = send_webhook(payload)
if not result.success:
raise WebhookFailedError(result.error)
except Exception as e:
if self.request.retries >= self.max_retries:
# 移至死信队列以供手动检查
dead_letter_queue.send({
"task": "process_webhook",
"webhook_id": webhook_id,
"payload": payload,
"error": str(e),
"attempts": self.request.retries + 1,
"failed_at": datetime.utcnow().isoformat(),
})
logger.error(
"网络钩子在最大重试后移至死信队列",
webhook_id=webhook_id,
error=str(e),
)
return
# 指数退避重试
raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
模式 6:状态轮询端点
为客户端提供一个端点来检查作业状态。
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
"""获取后台作业的当前状态。"""
job = await jobs_repo.get(job_id)
if job is None:
raise HTTPException(404, f"作业 {job_id} 未找到")
return JobStatusResponse(
job_id=job.id,
status=job.status.value,
created_at=job.created_at,
started_at=job.started_at,
completed_at=job.completed_at,
result=job.result if job.status == JobStatus.SUCCEEDED else None,
error=job.error if job.status == JobStatus.FAILED else None,
# 对客户端有帮助
is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED),
)
模式 7:任务链和工作流
从简单任务组合复杂工作流。
from celery import chain, group, chord
# 简单链:A → B → C
workflow = chain(
extract_data.s(source_id),
transform_data.s(),
load_data.s(destination_id),
)
# 并行执行:A, B, C 同时运行
parallel = group(
send_email.s(user_email),
send_sms.s(user_phone),
update_analytics.s(event_data),
)
# 和弦:并行运行任务,然后一个回调
# 处理所有项目,然后发送完成通知
workflow = chord(
[process_item.s(item_id) for item_id in item_ids],
send_completion_notification.s(batch_id),
)
workflow.apply_async()
模式 8:替代任务队列
根据需求选择合适的工具。
RQ(Redis 队列):简单,基于 Redis
from rq import Queue
from redis import Redis
queue = Queue(connection=Redis())
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")
Dramatiq:现代 Celery 替代品
import dramatiq
from dramatiq.brokers.redis import RedisBroker
dramatiq.set_broker(RedisBroker())
@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
email_client.send(to, subject, body)
云原生选项:
- AWS SQS + Lambda
- Google Cloud Tasks
- Azure Functions
最佳实践总结
- 立即返回 - 不要为长时操作阻塞请求
- 持久化作业状态 - 启用状态轮询和调试
- 使任务幂等 - 在任何失败时安全重试
- 使用幂等键 - 对于外部服务调用
- 设置超时 - 同时设置软和硬限制
- 实现死信队列 - 捕获永久失败的任务
- 记录状态转换 - 跟踪作业状态变化
- 适当重试 - 对瞬时错误使用指数退避
- 不要重试永久失败 - 验证错误、无效凭据
- 监控队列深度 - 对积压增长发出警报