name: inngest-handler description: 创建和管理Inngest函数,用于可靠的背景作业、工作流和定时任务
Inngest函数处理技能
本技能定义了使用Inngest构建持久化、多步骤工作流的标准。
🚨 硬性规则(严格遵守)
-
禁止使用
setTimeout/setInterval:- ❌ 错误示例:
await new Promise(r => setTimeout(r, 1000)) - ✅ 正确示例:
await step.sleep("wait-1s", "1s") - 原因:无服务器函数会超时;Inngest的休眠可持续长达一年。
- ❌ 错误示例:
-
步骤外禁止副作用:
- 任何数据库写入、API调用或非确定性逻辑(随机、日期)必须包裹在
step.run()中。 - 原因:Inngest函数会多次执行(记忆化)。步骤外的代码每次都会运行。
- 任何数据库写入、API调用或非确定性逻辑(随机、日期)必须包裹在
-
确定性步骤:
- 步骤通过其ID(第一个参数)进行记忆化。ID必须唯一且稳定。
- 除非你清楚自己在做什么(例如在循环内使用索引),否则不要动态生成步骤ID。
-
从步骤返回数据:
- 如果后续需要某个值,请从步骤中返回它。
- ❌ 错误示例:
let userId; await step.run(..., () => { userId = ... }) - ✅ 正确示例:
const userId = await step.run(..., () => { return ... })
核心模式
1. 多步骤执行
将所有逻辑包裹在步骤中,以确保可重试性和可恢复性。
export const processOrder = inngest.createFunction(
{ id: "process-order" },
{ event: "shop/order.created" },
async ({ event, step }) => {
// 1. 步骤:验证(可重试)
const user = await step.run("get-user", async () => {
return await db.users.findById(event.data.userId);
});
// 2. 步骤:休眠(持久化暂停)
await step.sleep("wait-for-payment", "1h");
// 3. 步骤:等待事件(人机/系统交互)
const payment = await step.waitForEvent("wait-payment", {
event: "shop/payment.success",
match: "data.orderId",
timeout: "24h"
});
// 4. 步骤:条件逻辑
if (!payment) {
await step.run("cancel-order", async () => { ... });
}
}
);
2. 并行处理
并发运行步骤以加速执行。
const [user, subscription] = await Promise.all([
step.run("fetch-user", () => db.users.find(...)),
step.run("fetch-sub", () => stripe.subscriptions.retrieve(...))
]);
3. 循环处理
在循环内部,确保步骤ID唯一。
const items = event.data.items;
for (const item of items) {
// 使用动态ID确保每个项目唯一
await step.run(`process-item-${item.id}`, async () => {
await processItem(item);
});
}
配置与流程控制
速率限制与节流
防止压垮第三方API。
inngest.createFunction({
id: "sync-crm",
// 每个用户每分钟最多10个请求
rateLimit: { limit: 10, period: "1m", key: "event.data.userId" },
// 如果队列已满则丢弃事件
throttle: { limit: 5, period: "1s" }
}, ...);
防抖
仅处理时间窗口内的最新事件(例如搜索索引)。
inngest.createFunction({
id: "index-product",
// 等待10秒以接收更多事件;仅使用最新数据运行
debounce: { period: "10s", key: "event.data.productId" }
}, ...);
优先级
优先处理特定事件(例如付费用户)。
inngest.createFunction({
id: "generate-report",
// 数字越大优先级越高
priority: { run: "event.data.plan === 'enterprise' ? 100 : 0" }
}, ...);
错误处理
自动重试
Inngest在出错时会自动重试步骤(默认约4-5次,带退避)。
- 自定义:在配置中使用
{ retries: 10 }。
不可重试错误
如果错误是致命的(例如400错误请求),立即停止执行。
import { NonRetriableError } from "inngest";
await step.run("validate", async () => {
if (!isValid) throw new NonRetriableError("无效载荷");
});
失败处理器(回滚)
如果函数在所有重试后失败,执行清理逻辑。
export const riskyFunc = inngest.createFunction(
{
id: "risky-transfer",
// 主处理器失败时运行
onFailure: async ({ error, event, step }) => {
await step.run("rollback-funds", async () => {
await reverseTransfer(event.data.transferId);
});
await step.run("notify-admin", async () => {
await sendAlert(`转账失败:${error.message}`);
});
}
},
{ event: "bank/transfer.init" },
async ({ step }) => { /* ... */ }
);
注册
强制要求:所有函数必须在src/lib/inngest/functions/index.ts中导入和导出。