Convex定时任务调度Skill convex-cron-jobs

Convex定时任务调度是一种在Convex平台中自动化执行后台任务的技术,支持基于间隔和Cron表达式的调度,提供作业监控、自动重试和错误处理功能。适用于数据同步、清理操作、报告生成等场景,关键词包括Convex、定时任务、后台调度、Cron表达式、自动化、作业监控、Serverless、后端开发。

后端开发 0 次安装 0 次浏览 更新于 3/17/2026

name: convex-cron-jobs displayName: Convex Cron Jobs description: 用于后台任务的定时函数模式,包括间隔调度、Cron表达式、作业监控、重试策略以及长时间运行任务的最佳实践 version: 1.0.0 author: Convex tags: [convex, cron, scheduling, background-jobs, automation]

Convex Cron Jobs

在Convex应用中调度重复性函数用于后台任务、清理作业、数据同步和自动化工作流。

文档来源

在实现之前,不要假设;获取最新文档:

说明

Cron Jobs 概述

Convex cron jobs 允许您按固定间隔或特定时间调度函数运行。主要特性:

  • 在固定时间表上运行函数
  • 支持基于间隔和Cron表达式的调度
  • 失败时自动重试
  • 通过Convex仪表板监控

基本Cron设置

// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";

const crons = cronJobs();

// 每小时运行一次
crons.interval(
  "清理过期会话",
  { hours: 1 },
  internal.tasks.cleanupExpiredSessions,
  {}
);

// 每天UTC时间午夜运行
crons.cron(
  "每日报告",
  "0 0 * * *",
  internal.reports.generateDailyReport,
  {}
);

export default crons;

基于间隔的调度

使用 crons.interval 进行简单的重复任务:

// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";

const crons = cronJobs();

// 每5分钟
crons.interval(
  "同步外部数据",
  { minutes: 5 },
  internal.sync.fetchExternalData,
  {}
);

// 每2小时
crons.interval(
  "清理临时文件",
  { hours: 2 },
  internal.files.cleanupTempFiles,
  {}
);

// 每30秒(最小间隔)
crons.interval(
  "健康检查",
  { seconds: 30 },
  internal.monitoring.healthCheck,
  {}
);

export default crons;

Cron表达式调度

使用 crons.cron 通过Cron表达式进行精确调度:

// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";

const crons = cronJobs();

// 每天UTC时间9点
crons.cron(
  "早晨通知",
  "0 9 * * *",
  internal.notifications.sendMorningDigest,
  {}
);

// 每周一UTC时间8点
crons.cron(
  "每周总结",
  "0 8 * * 1",
  internal.reports.generateWeeklySummary,
  {}
);

// 每月第一天午夜
crons.cron(
  "月度计费",
  "0 0 1 * *",
  internal.billing.processMonthlyBilling,
  {}
);

// 每15分钟
crons.cron(
  "频繁同步",
  "*/15 * * * *",
  internal.sync.syncData,
  {}
);

export default crons;

Cron表达式参考

┌───────────── 分钟 (0-59)
│ ┌───────────── 小时 (0-23)
│ │ ┌───────────── 月内日期 (1-31)
│ │ │ ┌───────────── 月份 (1-12)
│ │ │ │ ┌───────────── 星期几 (0-6, 周日=0)
│ │ │ │ │
* * * * *

常见模式:

  • * * * * * - 每分钟
  • 0 * * * * - 每小时
  • 0 0 * * * - 每天午夜
  • 0 0 * * 0 - 每周日午夜
  • 0 0 1 * * - 每月第一天
  • */5 * * * * - 每5分钟
  • 0 9-17 * * 1-5 - 工作日每天9点到17点每小时

用于Cron的内部函数

Cron jobs 应调用内部函数以确保安全:

// convex/tasks.ts
import { internalMutation, internalQuery } from "./_generated/server";
import { v } from "convex/values";

// 清理过期会话
export const cleanupExpiredSessions = internalMutation({
  args: {},
  returns: v.number(),
  handler: async (ctx) => {
    const oneHourAgo = Date.now() - 60 * 60 * 1000;
    
    const expiredSessions = await ctx.db
      .query("sessions")
      .withIndex("by_lastActive")
      .filter((q) => q.lt(q.field("lastActive"), oneHourAgo))
      .collect();

    for (const session of expiredSessions) {
      await ctx.db.delete(session._id);
    }

    return expiredSessions.length;
  },
});

// 处理待处理任务
export const processPendingTasks = internalMutation({
  args: {},
  returns: v.null(),
  handler: async (ctx) => {
    const pendingTasks = await ctx.db
      .query("tasks")
      .withIndex("by_status", (q) => q.eq("status", "pending"))
      .take(100);

    for (const task of pendingTasks) {
      await ctx.db.patch(task._id, {
        status: "processing",
        startedAt: Date.now(),
      });
      
      // 调度实际处理
      await ctx.scheduler.runAfter(0, internal.tasks.processTask, {
        taskId: task._id,
      });
    }

    return null;
  },
});

带参数的Cron Jobs

向Cron jobs传递静态参数:

// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";

const crons = cronJobs();

// 不同清理间隔对应不同类型
crons.interval(
  "清理临时文件",
  { hours: 1 },
  internal.cleanup.cleanupByType,
  { fileType: "temp", maxAge: 3600000 }
);

crons.interval(
  "清理缓存文件",
  { hours: 24 },
  internal.cleanup.cleanupByType,
  { fileType: "cache", maxAge: 86400000 }
);

export default crons;
// convex/cleanup.ts
import { internalMutation } from "./_generated/server";
import { v } from "convex/values";

export const cleanupByType = internalMutation({
  args: {
    fileType: v.string(),
    maxAge: v.number(),
  },
  returns: v.number(),
  handler: async (ctx, args) => {
    const cutoff = Date.now() - args.maxAge;
    
    const oldFiles = await ctx.db
      .query("files")
      .withIndex("by_type_and_created", (q) => 
        q.eq("type", args.fileType).lt("createdAt", cutoff)
      )
      .collect();

    for (const file of oldFiles) {
      await ctx.storage.delete(file.storageId);
      await ctx.db.delete(file._id);
    }

    return oldFiles.length;
  },
});

监控和日志记录

添加日志以跟踪Cron job执行:

// convex/tasks.ts
import { internalMutation } from "./_generated/server";
import { v } from "convex/values";

export const cleanupWithLogging = internalMutation({
  args: {},
  returns: v.null(),
  handler: async (ctx) => {
    const startTime = Date.now();
    let processedCount = 0;
    let errorCount = 0;

    try {
      const expiredItems = await ctx.db
        .query("items")
        .withIndex("by_expiresAt")
        .filter((q) => q.lt(q.field("expiresAt"), Date.now()))
        .collect();

      for (const item of expiredItems) {
        try {
          await ctx.db.delete(item._id);
          processedCount++;
        } catch (error) {
          errorCount++;
          console.error(`删除项目 ${item._id} 失败:`, error);
        }
      }

      // 记录作业完成
      await ctx.db.insert("cronLogs", {
        jobName: "cleanup",
        startTime,
        endTime: Date.now(),
        duration: Date.now() - startTime,
        processedCount,
        errorCount,
        status: errorCount === 0 ? "success" : "partial",
      });
    } catch (error) {
      // 记录作业失败
      await ctx.db.insert("cronLogs", {
        jobName: "cleanup",
        startTime,
        endTime: Date.now(),
        duration: Date.now() - startTime,
        processedCount,
        errorCount,
        status: "failed",
        error: String(error),
      });
      throw error;
    }

    return null;
  },
});

大数据集的分批处理

分批处理大数据集以避免超时:

// convex/tasks.ts
import { internalMutation } from "./_generated/server";
import { internal } from "./_generated/api";
import { v } from "convex/values";

const BATCH_SIZE = 100;

export const processBatch = internalMutation({
  args: {
    cursor: v.optional(v.string()),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const result = await ctx.db
      .query("items")
      .withIndex("by_status", (q) => q.eq("status", "pending"))
      .paginate({ numItems: BATCH_SIZE, cursor: args.cursor ?? null });

    for (const item of result.page) {
      await ctx.db.patch(item._id, {
        status: "processed",
        processedAt: Date.now(),
      });
    }

    // 如果有更多项目,调度下一批
    if (!result.isDone) {
      await ctx.scheduler.runAfter(0, internal.tasks.processBatch, {
        cursor: result.continueCursor,
      });
    }

    return null;
  },
});

Cron中的外部API调用

使用actions进行外部API调用:

// convex/sync.ts
"use node";

import { internalAction } from "./_generated/server";
import { internal } from "./_generated/api";
import { v } from "convex/values";

export const syncExternalData = internalAction({
  args: {},
  returns: v.null(),
  handler: async (ctx) => {
    // 从外部API获取
    const response = await fetch("https://api.example.com/data", {
      headers: {
        Authorization: `Bearer ${process.env.API_KEY}`,
      },
    });

    if (!response.ok) {
      throw new Error(`API请求失败: ${response.status}`);
    }

    const data = await response.json();

    // 使用mutation存储数据
    await ctx.runMutation(internal.sync.storeExternalData, {
      data,
      syncedAt: Date.now(),
    });

    return null;
  },
});

export const storeExternalData = internalMutation({
  args: {
    data: v.any(),
    syncedAt: v.number(),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    await ctx.db.insert("externalData", {
      data: args.data,
      syncedAt: args.syncedAt,
    });
    return null;
  },
});
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";

const crons = cronJobs();

crons.interval(
  "同步外部数据",
  { minutes: 15 },
  internal.sync.syncExternalData,
  {}
);

export default crons;

示例

Cron Job日志记录的模式

// convex/schema.ts
import { defineSchema, defineTable } from "convex/server";
import { v } from "convex/values";

export default defineSchema({
  cronLogs: defineTable({
    jobName: v.string(),
    startTime: v.number(),
    endTime: v.number(),
    duration: v.number(),
    processedCount: v.number(),
    errorCount: v.number(),
    status: v.union(
      v.literal("success"),
      v.literal("partial"),
      v.literal("failed")
    ),
    error: v.optional(v.string()),
  })
    .index("by_job", ["jobName"])
    .index("by_status", ["status"])
    .index("by_startTime", ["startTime"]),

  sessions: defineTable({
    userId: v.id("users"),
    token: v.string(),
    lastActive: v.number(),
    expiresAt: v.number(),
  })
    .index("by_user", ["userId"])
    .index("by_lastActive", ["lastActive"])
    .index("by_expiresAt", ["expiresAt"]),

  tasks: defineTable({
    type: v.string(),
    status: v.union(
      v.literal("pending"),
      v.literal("processing"),
      v.literal("completed"),
      v.literal("failed")
    ),
    data: v.any(),
    createdAt: v.number(),
    startedAt: v.optional(v.number()),
    completedAt: v.optional(v.number()),
  })
    .index("by_status", ["status"])
    .index("by_type_and_status", ["type", "status"]),
});

完整Cron配置示例

// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";

const crons = cronJobs();

// 清理任务
crons.interval(
  "清理过期会话",
  { hours: 1 },
  internal.cleanup.expiredSessions,
  {}
);

crons.interval(
  "清理旧日志",
  { hours: 24 },
  internal.cleanup.oldLogs,
  { maxAgeDays: 30 }
);

// 同步任务
crons.interval(
  "同步用户数据",
  { minutes: 15 },
  internal.sync.userData,
  {}
);

// 报告任务
crons.cron(
  "每日分析",
  "0 1 * * *",
  internal.reports.dailyAnalytics,
  {}
);

crons.cron(
  "每周总结",
  "0 9 * * 1",
  internal.reports.weeklySummary,
  {}
);

// 健康检查
crons.interval(
  "服务健康检查",
  { minutes: 5 },
  internal.monitoring.healthCheck,
  {}
);

export default crons;

最佳实践

  • 除非明确指示,否则不要运行 npx convex deploy
  • 除非明确指示,否则不要运行任何git命令
  • 仅使用 crons.intervalcrons.cron 方法,不要使用已弃用的助手
  • 为安全起见,Cron jobs始终调用内部函数
  • 即使在同一文件中,也从 _generated/api 导入 internal
  • 为生产Cron jobs添加日志记录和监控
  • 对处理大数据集的操作使用分批处理
  • 优雅地处理错误以防止作业失败
  • 为仪表板可见性使用有意义的作业名称
  • 使用Cron表达式时考虑时区(Convex使用UTC)

常见陷阱

  1. 使用公共函数 - Cron jobs应仅调用内部函数
  2. 长时间运行的mutations - 将大型操作分解为批次
  3. 缺少错误处理 - 未处理的错误将使整个作业失败
  4. 忘记时区 - 所有Cron表达式使用UTC
  5. 使用已弃用的助手 - 避免 crons.hourlycrons.daily
  6. 不记录执行 - 使得调试生产问题困难

参考资料