高级Python调度器Skill apscheduler

APScheduler是一个功能强大的Python任务调度和作业队列系统,支持定时任务、周期性作业、cron调度和分布式任务处理。适用于后台任务执行、自动化脚本调度、数据定时处理等场景。关键词:Python调度器、任务队列、定时任务、cron调度、后台作业、分布式任务、异步调度、作业管理

后端开发 0 次安装 0 次浏览 更新于 3/2/2026

name: apscheduler description: 高级Python调度器 - 任务调度与作业队列系统 when_to_use: "后台任务执行、周期性作业、cron调度、分布式任务处理" tags: [调度, 后台作业, cron, 异步, 任务队列]

APScheduler

APScheduler是一个灵活的Python应用程序任务调度和作业队列系统。它支持同步和异步执行,提供多种调度机制,包括cron风格、基于间隔和一次性调度。

快速开始

基本同步调度器

from datetime import datetime
from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger

def tick():
    # Demo job: prints the current wall-clock time on every run.
    print(f"Tick: {datetime.now()}")

# Create and start a scheduler backed by the default in-memory data store
# (schedules are lost when the process exits).
with Scheduler() as scheduler:
    scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
    scheduler.run_until_stopped()  # blocks the current thread until stopped

使用FastAPI的异步调度器

from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger

def cleanup_task():
    # Periodic maintenance job run by the scheduler.
    print("运行清理任务...")

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Tie the scheduler's lifetime to the FastAPI app: it is started
    # before the app serves requests and shut down when the app exits
    # (leaving the `async with` block stops the scheduler cleanly).
    scheduler = AsyncScheduler()
    async with scheduler:
        await scheduler.add_schedule(
            cleanup_task,
            IntervalTrigger(hours=1),
            id="cleanup"  # stable id makes the schedule easy to manage later
        )
        await scheduler.start_in_background()
        yield  # the app runs while the scheduler context stays open

app = FastAPI(lifespan=lifespan)

常见模式

调度器

内存调度器(开发环境):

from apscheduler import AsyncScheduler

async def main():
    # Default in-memory data store: fine for development only.
    # (my_task and trigger are placeholders defined elsewhere.)
    async with AsyncScheduler() as scheduler:
        # Schedules are lost on restart
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()

持久化调度器(生产环境):

from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore

async def main():
    # Persist schedules and jobs in PostgreSQL through an async
    # SQLAlchemy engine (asyncpg driver).
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    data_store = SQLAlchemyDataStore(engine)

    async with AsyncScheduler(data_store) as scheduler:
        # Schedules survive scheduler restarts
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()

分布式调度器:

from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

# Scheduler node - turns schedules into jobs.
# NOTE(review): data_store and event_broker are assumed to be created
# elsewhere and shared by all nodes - confirm in the deployment code.
async def scheduler_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.scheduler
    ) as scheduler:
        await scheduler.add_schedule(task, trigger)
        await scheduler.run_until_stopped()

# Worker node - only executes jobs, never creates them.
async def worker_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.worker
    ) as scheduler:
        await scheduler.run_until_stopped()

作业

简单函数作业:

def send_daily_report():
    # Build the report and mail it; helpers are defined elsewhere.
    generate_report()
    email_report("admin@example.com")

scheduler.add_schedule(
    send_daily_report,
    CronTrigger(hour=9, minute=0)  # Every day at 9:00 AM
)

带参数的作业:

def process_data(source: str, destination: str, batch_size: int):
    # Data processing logic (placeholder)
    pass

scheduler.add_schedule(
    process_data,
    IntervalTrigger(hours=1),
    # kwargs are passed to process_data on every run
    kwargs={
        'source': 's3://incoming',
        'destination': 's3://processed',
        'batch_size': 1000
    }
)

异步作业:

async def fetch_external_api():
    # NOTE(review): this example requires `import aiohttp` and a
    # save_to_database() coroutine defined elsewhere to actually run.
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            data = await resp.json()
            await save_to_database(data)

scheduler.add_schedule(
    fetch_external_api,
    IntervalTrigger(minutes=5)  # Poll the API every 5 minutes
)

触发器

间隔触发器:

from apscheduler.triggers.interval import IntervalTrigger

# Every 30 seconds
IntervalTrigger(seconds=30)

# Every 2 hours 15 minutes
IntervalTrigger(hours=2, minutes=15)

# Every 3 days
IntervalTrigger(days=3)

Cron触发器:

from apscheduler.triggers.cron import CronTrigger

# 9:00 AM, Monday through Friday
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')

# Every 15 minutes
CronTrigger(minute='*/15')

# Midnight on the last day of each month
CronTrigger(day='last', hour=0, minute=0)

# Using crontab syntax
CronTrigger.from_crontab('0 9 * * 1-5')  # 9 AM on weekdays

日期触发器(一次性):

from datetime import datetime, timedelta
from apscheduler.triggers.date import DateTrigger

# Fire once, 5 minutes from now.
# NOTE(review): datetime.now() is timezone-naive - confirm how the
# scheduler interprets naive datetimes in your deployment.
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)

# Fire once at a specific date and time
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))

日历间隔触发器:

from datetime import date
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger

# First day of every month at 9:00 AM (calendar-aware: handles
# month-length differences, unlike a fixed IntervalTrigger)
CalendarIntervalTrigger(months=1, hour=9, minute=0)

# Every Monday at 10:00 AM.
# CalendarIntervalTrigger has no day_of_week parameter - the weekday
# of a weekly interval is fixed by start_date (2024-01-01 is a Monday).
CalendarIntervalTrigger(weeks=1, hour=10, minute=0, start_date=date(2024, 1, 1))

持久化

SQLite:

# SQLite via the async aiosqlite driver - single-file, zero-setup persistence.
engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)

PostgreSQL:

# PostgreSQL: persistent data store plus an event broker for multi-node setups.
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)

Redis(事件代理):

from apscheduler.eventbrokers.redis import RedisEventBroker

# Redis event broker: distributes scheduler events between nodes.
event_broker = RedisEventBroker.from_url("redis://localhost:6379")

作业管理

获取作业结果:

async def main():
    async with AsyncScheduler() as scheduler:
        await scheduler.start_in_background()

        # Add a one-off job whose result is retained for later retrieval.
        # (calculate_result is defined elsewhere; timedelta must be imported.)
        job_id = await scheduler.add_job(
            calculate_result,
            args=(10, 20),
            result_expiration_time=timedelta(hours=1)
        )

        # Block until the job finishes and fetch its result
        result = await scheduler.get_job_result(job_id, wait=True)
        print(f"结果: {result.return_value}")

计划管理:

# Pause a schedule (no new jobs are created while paused)
await scheduler.pause_schedule("my_schedule")

# Resume a paused schedule
await scheduler.unpause_schedule("my_schedule")

# Remove a schedule permanently
await scheduler.remove_schedule("my_schedule")

# Inspect a schedule
schedule = await scheduler.get_schedule("my_schedule")
print(f"下次运行: {schedule.next_fire_time}")

事件处理:

from apscheduler import JobOutcome, JobReleased

def on_job_completed(event: JobReleased):
    # JobReleased fires when a worker finishes a job; `outcome` is a
    # JobOutcome enum member. The original example compared against an
    # un-imported `Outcome` name (and imported JobAdded without using it).
    if event.outcome == JobOutcome.success:
        print(f"作业 {event.job_id} 成功完成")
    else:
        print(f"作业 {event.job_id} 失败: {event.exception}")

scheduler.subscribe(on_job_completed, JobReleased)

配置

任务默认值:

from apscheduler import TaskDefaults

# Defaults applied to every task unless overridden per task or schedule.
# (timedelta must be imported from datetime.)
task_defaults = TaskDefaults(
    job_executor='threadpool',
    max_running_jobs=3,
    misfire_grace_time=timedelta(minutes=5)
)

scheduler = AsyncScheduler(task_defaults=task_defaults)

作业执行选项:

# 配置任务行为
await scheduler.configure_task(
    my_function,
    job_executor='processpool',
    max_running_jobs=5,
    misfire_grace_time=timedelta(minutes=10)
)

# 每个计划覆盖
await scheduler.add_schedule(
    my_function,
    trigger,
    job_executor='threadpool',  # 覆盖默认值
    coalesce=CoalescePolicy.latest
)

要求

# Core package
uv add apscheduler

# Database backends
uv add "apscheduler[postgresql]"  # PostgreSQL
uv add "apscheduler[mongodb]"     # MongoDB
uv add "apscheduler[sqlite]"      # SQLite

# Event brokers
uv add "apscheduler[redis]"       # Redis
uv add "apscheduler[mqtt]"        # MQTT

按使用场景的依赖项:

  • 基本调度:apscheduler
  • PostgreSQL持久化:asyncpg、sqlalchemy
  • Redis分布式:redis
  • MongoDB:motor
  • SQLite:aiosqlite

最佳实践

  1. 生产环境使用持久化存储以确保重启后不丢失
  2. 设置适当的misfire_grace_time以处理系统延迟
  3. 更新计划时使用冲突策略
  4. 订阅事件以进行监控和调试
  5. 选择合适的执行器(I/O密集型用线程池,CPU密集型用进程池)
  6. 在作业函数中实现错误处理
  7. 使用唯一的计划ID以便管理操作
  8. 设置结果过期时间以防止内存泄漏