name: convex-cron-jobs displayName: Convex Cron Jobs description: 用于后台任务的定时函数模式,包括间隔调度、Cron表达式、作业监控、重试策略以及长时间运行任务的最佳实践 version: 1.0.0 author: Convex tags: [convex, cron, scheduling, background-jobs, automation]
Convex Cron Jobs
在Convex应用中调度重复性函数用于后台任务、清理作业、数据同步和自动化工作流。
文档来源
在实现之前,不要假设;获取最新文档:
- 主要:https://docs.convex.dev/scheduling/cron-jobs
- 调度概述:https://docs.convex.dev/scheduling
- 定时函数:https://docs.convex.dev/scheduling/scheduled-functions
- 更广泛上下文:https://docs.convex.dev/llms.txt
说明
Cron Jobs 概述
Convex cron jobs 允许您按固定间隔或特定时间调度函数运行。主要特性:
- 在固定时间表上运行函数
- 支持基于间隔和Cron表达式的调度
- 失败时自动重试
- 通过Convex仪表板监控
基本Cron设置
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 每小时运行一次
crons.interval(
"清理过期会话",
{ hours: 1 },
internal.tasks.cleanupExpiredSessions,
{}
);
// 每天UTC时间午夜运行
crons.cron(
"每日报告",
"0 0 * * *",
internal.reports.generateDailyReport,
{}
);
export default crons;
基于间隔的调度
使用 crons.interval 进行简单的重复任务:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 每5分钟
crons.interval(
"同步外部数据",
{ minutes: 5 },
internal.sync.fetchExternalData,
{}
);
// 每2小时
crons.interval(
"清理临时文件",
{ hours: 2 },
internal.files.cleanupTempFiles,
{}
);
// 每30秒(最小间隔)
crons.interval(
"健康检查",
{ seconds: 30 },
internal.monitoring.healthCheck,
{}
);
export default crons;
Cron表达式调度
使用 crons.cron 通过Cron表达式进行精确调度:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 每天UTC时间9点
crons.cron(
"早晨通知",
"0 9 * * *",
internal.notifications.sendMorningDigest,
{}
);
// 每周一UTC时间8点
crons.cron(
"每周总结",
"0 8 * * 1",
internal.reports.generateWeeklySummary,
{}
);
// 每月第一天午夜
crons.cron(
"月度计费",
"0 0 1 * *",
internal.billing.processMonthlyBilling,
{}
);
// 每15分钟
crons.cron(
"频繁同步",
"*/15 * * * *",
internal.sync.syncData,
{}
);
export default crons;
Cron表达式参考
┌───────────── 分钟 (0-59)
│ ┌───────────── 小时 (0-23)
│ │ ┌───────────── 月内日期 (1-31)
│ │ │ ┌───────────── 月份 (1-12)
│ │ │ │ ┌───────────── 星期几 (0-6, 周日=0)
│ │ │ │ │
* * * * *
常见模式:
* * * * *- 每分钟0 * * * *- 每小时0 0 * * *- 每天午夜0 0 * * 0- 每周日午夜0 0 1 * *- 每月第一天*/5 * * * *- 每5分钟0 9-17 * * 1-5- 工作日每天9点到17点每小时
用于Cron的内部函数
Cron jobs 应调用内部函数以确保安全:
// convex/tasks.ts
import { internalMutation, internalQuery } from "./_generated/server";
import { v } from "convex/values";
// 清理过期会话
export const cleanupExpiredSessions = internalMutation({
args: {},
returns: v.number(),
handler: async (ctx) => {
const oneHourAgo = Date.now() - 60 * 60 * 1000;
const expiredSessions = await ctx.db
.query("sessions")
.withIndex("by_lastActive")
.filter((q) => q.lt(q.field("lastActive"), oneHourAgo))
.collect();
for (const session of expiredSessions) {
await ctx.db.delete(session._id);
}
return expiredSessions.length;
},
});
// 处理待处理任务
export const processPendingTasks = internalMutation({
args: {},
returns: v.null(),
handler: async (ctx) => {
const pendingTasks = await ctx.db
.query("tasks")
.withIndex("by_status", (q) => q.eq("status", "pending"))
.take(100);
for (const task of pendingTasks) {
await ctx.db.patch(task._id, {
status: "processing",
startedAt: Date.now(),
});
// 调度实际处理
await ctx.scheduler.runAfter(0, internal.tasks.processTask, {
taskId: task._id,
});
}
return null;
},
});
带参数的Cron Jobs
向Cron jobs传递静态参数:
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 不同清理间隔对应不同类型
crons.interval(
"清理临时文件",
{ hours: 1 },
internal.cleanup.cleanupByType,
{ fileType: "temp", maxAge: 3600000 }
);
crons.interval(
"清理缓存文件",
{ hours: 24 },
internal.cleanup.cleanupByType,
{ fileType: "cache", maxAge: 86400000 }
);
export default crons;
// convex/cleanup.ts
import { internalMutation } from "./_generated/server";
import { v } from "convex/values";
export const cleanupByType = internalMutation({
args: {
fileType: v.string(),
maxAge: v.number(),
},
returns: v.number(),
handler: async (ctx, args) => {
const cutoff = Date.now() - args.maxAge;
const oldFiles = await ctx.db
.query("files")
.withIndex("by_type_and_created", (q) =>
q.eq("type", args.fileType).lt("createdAt", cutoff)
)
.collect();
for (const file of oldFiles) {
await ctx.storage.delete(file.storageId);
await ctx.db.delete(file._id);
}
return oldFiles.length;
},
});
监控和日志记录
添加日志以跟踪Cron job执行:
// convex/tasks.ts
import { internalMutation } from "./_generated/server";
import { v } from "convex/values";
export const cleanupWithLogging = internalMutation({
args: {},
returns: v.null(),
handler: async (ctx) => {
const startTime = Date.now();
let processedCount = 0;
let errorCount = 0;
try {
const expiredItems = await ctx.db
.query("items")
.withIndex("by_expiresAt")
.filter((q) => q.lt(q.field("expiresAt"), Date.now()))
.collect();
for (const item of expiredItems) {
try {
await ctx.db.delete(item._id);
processedCount++;
} catch (error) {
errorCount++;
console.error(`删除项目 ${item._id} 失败:`, error);
}
}
// 记录作业完成
await ctx.db.insert("cronLogs", {
jobName: "cleanup",
startTime,
endTime: Date.now(),
duration: Date.now() - startTime,
processedCount,
errorCount,
status: errorCount === 0 ? "success" : "partial",
});
} catch (error) {
// 记录作业失败
await ctx.db.insert("cronLogs", {
jobName: "cleanup",
startTime,
endTime: Date.now(),
duration: Date.now() - startTime,
processedCount,
errorCount,
status: "failed",
error: String(error),
});
throw error;
}
return null;
},
});
大数据集的分批处理
分批处理大数据集以避免超时:
// convex/tasks.ts
import { internalMutation } from "./_generated/server";
import { internal } from "./_generated/api";
import { v } from "convex/values";
const BATCH_SIZE = 100;
export const processBatch = internalMutation({
args: {
cursor: v.optional(v.string()),
},
returns: v.null(),
handler: async (ctx, args) => {
const result = await ctx.db
.query("items")
.withIndex("by_status", (q) => q.eq("status", "pending"))
.paginate({ numItems: BATCH_SIZE, cursor: args.cursor ?? null });
for (const item of result.page) {
await ctx.db.patch(item._id, {
status: "processed",
processedAt: Date.now(),
});
}
// 如果有更多项目,调度下一批
if (!result.isDone) {
await ctx.scheduler.runAfter(0, internal.tasks.processBatch, {
cursor: result.continueCursor,
});
}
return null;
},
});
Cron中的外部API调用
使用actions进行外部API调用:
// convex/sync.ts
"use node";
import { internalAction } from "./_generated/server";
import { internal } from "./_generated/api";
import { v } from "convex/values";
export const syncExternalData = internalAction({
args: {},
returns: v.null(),
handler: async (ctx) => {
// 从外部API获取
const response = await fetch("https://api.example.com/data", {
headers: {
Authorization: `Bearer ${process.env.API_KEY}`,
},
});
if (!response.ok) {
throw new Error(`API请求失败: ${response.status}`);
}
const data = await response.json();
// 使用mutation存储数据
await ctx.runMutation(internal.sync.storeExternalData, {
data,
syncedAt: Date.now(),
});
return null;
},
});
export const storeExternalData = internalMutation({
args: {
data: v.any(),
syncedAt: v.number(),
},
returns: v.null(),
handler: async (ctx, args) => {
await ctx.db.insert("externalData", {
data: args.data,
syncedAt: args.syncedAt,
});
return null;
},
});
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
crons.interval(
"同步外部数据",
{ minutes: 15 },
internal.sync.syncExternalData,
{}
);
export default crons;
示例
Cron Job日志记录的模式
// convex/schema.ts
import { defineSchema, defineTable } from "convex/server";
import { v } from "convex/values";
export default defineSchema({
cronLogs: defineTable({
jobName: v.string(),
startTime: v.number(),
endTime: v.number(),
duration: v.number(),
processedCount: v.number(),
errorCount: v.number(),
status: v.union(
v.literal("success"),
v.literal("partial"),
v.literal("failed")
),
error: v.optional(v.string()),
})
.index("by_job", ["jobName"])
.index("by_status", ["status"])
.index("by_startTime", ["startTime"]),
sessions: defineTable({
userId: v.id("users"),
token: v.string(),
lastActive: v.number(),
expiresAt: v.number(),
})
.index("by_user", ["userId"])
.index("by_lastActive", ["lastActive"])
.index("by_expiresAt", ["expiresAt"]),
tasks: defineTable({
type: v.string(),
status: v.union(
v.literal("pending"),
v.literal("processing"),
v.literal("completed"),
v.literal("failed")
),
data: v.any(),
createdAt: v.number(),
startedAt: v.optional(v.number()),
completedAt: v.optional(v.number()),
})
.index("by_status", ["status"])
.index("by_type_and_status", ["type", "status"]),
});
完整Cron配置示例
// convex/crons.ts
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
// 清理任务
crons.interval(
"清理过期会话",
{ hours: 1 },
internal.cleanup.expiredSessions,
{}
);
crons.interval(
"清理旧日志",
{ hours: 24 },
internal.cleanup.oldLogs,
{ maxAgeDays: 30 }
);
// 同步任务
crons.interval(
"同步用户数据",
{ minutes: 15 },
internal.sync.userData,
{}
);
// 报告任务
crons.cron(
"每日分析",
"0 1 * * *",
internal.reports.dailyAnalytics,
{}
);
crons.cron(
"每周总结",
"0 9 * * 1",
internal.reports.weeklySummary,
{}
);
// 健康检查
crons.interval(
"服务健康检查",
{ minutes: 5 },
internal.monitoring.healthCheck,
{}
);
export default crons;
最佳实践
- 除非明确指示,否则不要运行
npx convex deploy - 除非明确指示,否则不要运行任何git命令
- 仅使用
crons.interval或crons.cron方法,不要使用已弃用的助手 - 为安全起见,Cron jobs始终调用内部函数
- 即使在同一文件中,也从
_generated/api导入internal - 为生产Cron jobs添加日志记录和监控
- 对处理大数据集的操作使用分批处理
- 优雅地处理错误以防止作业失败
- 为仪表板可见性使用有意义的作业名称
- 使用Cron表达式时考虑时区(Convex使用UTC)
常见陷阱
- 使用公共函数 - Cron jobs应仅调用内部函数
- 长时间运行的mutations - 将大型操作分解为批次
- 缺少错误处理 - 未处理的错误将使整个作业失败
- 忘记时区 - 所有Cron表达式使用UTC
- 使用已弃用的助手 - 避免
crons.hourly、crons.daily等 - 不记录执行 - 使得调试生产问题困难
参考资料
- Convex文档:https://docs.convex.dev/
- Convex LLMs.txt:https://docs.convex.dev/llms.txt
- Cron Jobs:https://docs.convex.dev/scheduling/cron-jobs
- 调度概述:https://docs.convex.dev/scheduling
- 定时函数:https://docs.convex.dev/scheduling/scheduled-functions