name: apscheduler description: 高级Python调度器 - 任务调度与作业队列系统 when_to_use: "后台任务执行、周期性作业、cron调度、分布式任务处理" tags: [调度, 后台作业, cron, 异步, 任务队列]
APScheduler
APScheduler是一个灵活的Python应用程序任务调度和作业队列系统。它支持同步和异步执行,提供多种调度机制,包括cron风格、基于间隔和一次性调度。
快速开始
基本同步调度器
from datetime import datetime
from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger
def tick():
print(f"Tick: {datetime.now()}")
# 创建并启动使用内存数据存储的调度器
with Scheduler() as scheduler:
scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
scheduler.run_until_stopped()
使用FastAPI的异步调度器
from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger
def cleanup_task():
print("运行清理任务...")
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler = AsyncScheduler()
async with scheduler:
await scheduler.add_schedule(
cleanup_task,
IntervalTrigger(hours=1),
id="cleanup"
)
await scheduler.start_in_background()
yield
app = FastAPI(lifespan=lifespan)
常见模式
调度器
内存调度器(开发环境):
from apscheduler import AsyncScheduler
async def main():
async with AsyncScheduler() as scheduler:
# 重启时作业会丢失
await scheduler.add_schedule(my_task, trigger)
await scheduler.run_until_stopped()
持久化调度器(生产环境):
from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
async def main():
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
async with AsyncScheduler(data_store) as scheduler:
# 作业在重启后仍然存在
await scheduler.add_schedule(my_task, trigger)
await scheduler.run_until_stopped()
分布式调度器:
from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
# 调度器节点 - 从计划创建作业
async def scheduler_node():
async with AsyncScheduler(
data_store,
event_broker,
role=SchedulerRole.scheduler
) as scheduler:
await scheduler.add_schedule(task, trigger)
await scheduler.run_until_stopped()
# 工作节点 - 仅执行作业
async def worker_node():
async with AsyncScheduler(
data_store,
event_broker,
role=SchedulerRole.worker
) as scheduler:
await scheduler.run_until_stopped()
作业
简单函数作业:
def send_daily_report():
generate_report()
email_report("admin@example.com")
scheduler.add_schedule(
send_daily_report,
CronTrigger(hour=9, minute=0) # 每天上午9点
)
带参数的作业:
def process_data(source: str, destination: str, batch_size: int):
# 数据处理逻辑
pass
scheduler.add_schedule(
process_data,
IntervalTrigger(hours=1),
kwargs={
'source': 's3://incoming',
'destination': 's3://processed',
'batch_size': 1000
}
)
异步作业:
async def fetch_external_api():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as resp:
data = await resp.json()
await save_to_database(data)
scheduler.add_schedule(
fetch_external_api,
IntervalTrigger(minutes=5)
)
触发器
间隔触发器:
from apscheduler.triggers.interval import IntervalTrigger
# 每30秒
IntervalTrigger(seconds=30)
# 每2小时15分钟
IntervalTrigger(hours=2, minutes=15)
# 每3天
IntervalTrigger(days=3)
Cron触发器:
from apscheduler.triggers.cron import CronTrigger
# 周一至周五上午9:00
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')
# 每15分钟
CronTrigger(minute='*/15')
# 每月最后一天午夜
CronTrigger(day='last', hour=0, minute=0)
# 使用crontab语法
CronTrigger.from_crontab('0 9 * * 1-5') # 工作日9点
日期触发器(一次性):
from datetime import datetime, timedelta
from apscheduler.triggers.date import DateTrigger
# 5分钟后
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)
# 特定日期时间
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))
日历间隔触发器:
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger
# 每月第一天上午9点
CalendarIntervalTrigger(months=1, hour=9, minute=0)
# 每周上午10点(CalendarIntervalTrigger不支持day_of_week参数;星期几由start_date决定)
CalendarIntervalTrigger(weeks=1, hour=10, minute=0)
持久化
SQLite:
engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)
PostgreSQL:
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
Redis(事件代理):
from apscheduler.eventbrokers.redis import RedisEventBroker
event_broker = RedisEventBroker.from_url("redis://localhost:6379")
作业管理
获取作业结果:
async def main():
async with AsyncScheduler() as scheduler:
await scheduler.start_in_background()
# 添加保留结果的作业
job_id = await scheduler.add_job(
calculate_result,
args=(10, 20),
result_expiration_time=timedelta(hours=1)
)
# 等待结果
result = await scheduler.get_job_result(job_id, wait=True)
print(f"结果: {result.return_value}")
计划管理:
# 暂停计划
await scheduler.pause_schedule("my_schedule")
# 恢复计划
await scheduler.unpause_schedule("my_schedule")
# 移除计划
await scheduler.remove_schedule("my_schedule")
# 获取计划信息
schedule = await scheduler.get_schedule("my_schedule")
print(f"下次运行: {schedule.next_fire_time}")
事件处理:
from apscheduler import JobAdded, JobOutcome, JobReleased
def on_job_completed(event: JobReleased):
if event.outcome == JobOutcome.success:
print(f"作业 {event.job_id} 成功完成")
else:
print(f"作业 {event.job_id} 失败: {event.exception}")
scheduler.subscribe(on_job_completed, JobReleased)
配置
任务默认值:
from apscheduler import TaskDefaults
task_defaults = TaskDefaults(
job_executor='threadpool',
max_running_jobs=3,
misfire_grace_time=timedelta(minutes=5)
)
scheduler = AsyncScheduler(task_defaults=task_defaults)
作业执行选项:
# 配置任务行为
await scheduler.configure_task(
my_function,
job_executor='processpool',
max_running_jobs=5,
misfire_grace_time=timedelta(minutes=10)
)
# 每个计划覆盖
await scheduler.add_schedule(
my_function,
trigger,
job_executor='threadpool', # 覆盖默认值
coalesce=CoalescePolicy.latest
)
要求
# 核心包
uv add apscheduler
# 数据库后端
uv add "apscheduler[postgresql]" # PostgreSQL
uv add "apscheduler[mongodb]" # MongoDB
uv add "apscheduler[sqlite]" # SQLite
# 事件代理
uv add "apscheduler[redis]" # Redis
uv add "apscheduler[mqtt]" # MQTT
按使用场景的依赖项:
- 基本调度: apscheduler
- PostgreSQL持久化: asyncpg、sqlalchemy
- Redis分布式: redis
- MongoDB: motor
- SQLite: aiosqlite
最佳实践
- 生产环境使用持久化存储以确保重启后不丢失
- 设置适当的misfire_grace_time以处理系统延迟
- 更新计划时使用冲突策略
- 订阅事件以进行监控和调试
- 选择合适的执行器(I/O密集型用线程池,CPU密集型用进程池)
- 在作业函数中实现错误处理
- 使用唯一的计划ID以便管理操作
- 设置结果过期时间以防止内存泄漏