Inngest函数处理器Skill inngest-handler

Inngest函数处理器技能用于创建和管理可靠的后台作业、多步骤工作流和定时任务。该技能提供了一套标准化的开发模式,确保函数具有持久化、可重试、可恢复的特性,适用于处理复杂的异步业务流程。关键词:Inngest,后台作业,工作流,定时任务,Serverless,函数即服务,异步处理,错误处理,重试机制,事件驱动。

Serverless 0 次安装 0 次浏览 更新于 3/1/2026

name: inngest-handler description: 创建和管理Inngest函数,用于可靠的背景作业、工作流和定时任务

Inngest函数处理技能

本技能定义了使用Inngest构建持久化、多步骤工作流的标准。

🚨 硬性规则(严格遵守)

  1. 禁止使用setTimeout/setInterval

    • 错误示例await new Promise(r => setTimeout(r, 1000))
    • 正确示例await step.sleep("wait-1s", "1s")
    • 原因:无服务器函数会超时;Inngest的休眠可持续长达一年。
  2. 步骤外禁止副作用

    • 任何数据库写入、API调用或非确定性逻辑(随机、日期)必须包裹在step.run()中。
    • 原因:Inngest函数会多次执行(记忆化)。步骤外的代码每次都会运行。
  3. 确定性步骤

    • 步骤通过其ID(第一个参数)进行记忆化。ID必须唯一且稳定。
    • 除非你清楚自己在做什么(例如在循环内使用索引),否则不要动态生成步骤ID。
  4. 从步骤返回数据

    • 如果后续需要某个值,请从步骤中返回它。
    • 错误示例let userId; await step.run(..., () => { userId = ... })
    • 正确示例const userId = await step.run(..., () => { return ... })

核心模式

1. 多步骤执行

将所有逻辑包裹在步骤中,以确保可重试性和可恢复性。

export const processOrder = inngest.createFunction(
  { id: "process-order" },
  { event: "shop/order.created" },
  async ({ event, step }) => {
    // 1. 步骤:验证(可重试)
    const user = await step.run("get-user", async () => {
      return await db.users.findById(event.data.userId);
    });

    // 2. 步骤:休眠(持久化暂停)
    await step.sleep("wait-for-payment", "1h");

    // 3. 步骤:等待事件(人机/系统交互)
    const payment = await step.waitForEvent("wait-payment", {
      event: "shop/payment.success",
      match: "data.orderId",
      timeout: "24h"
    });

    // 4. 步骤:条件逻辑
    if (!payment) {
        await step.run("cancel-order", async () => { ... });
    }
  }
);

2. 并行处理

并发运行步骤以加速执行。

const [user, subscription] = await Promise.all([
  step.run("fetch-user", () => db.users.find(...)),
  step.run("fetch-sub", () => stripe.subscriptions.retrieve(...))
]);

3. 循环处理

在循环内部,确保步骤ID唯一。

const items = event.data.items;
for (const item of items) {
  // 使用动态ID确保每个项目唯一
  await step.run(`process-item-${item.id}`, async () => {
    await processItem(item);
    });
}

配置与流程控制

速率限制与节流

防止压垮第三方API。

inngest.createFunction({
    id: "sync-crm",
    // 每个用户每分钟最多10个请求
    rateLimit: { limit: 10, period: "1m", key: "event.data.userId" },
    // 如果队列已满则丢弃事件
    throttle: { limit: 5, period: "1s" } 
}, ...);

防抖

仅处理时间窗口内的最新事件(例如搜索索引)。

inngest.createFunction({
    id: "index-product",
    // 等待10秒以接收更多事件;仅使用最新数据运行
    debounce: { period: "10s", key: "event.data.productId" }
}, ...);

优先级

优先处理特定事件(例如付费用户)。

inngest.createFunction({
    id: "generate-report",
    // 数字越大优先级越高
    priority: { run: "event.data.plan === 'enterprise' ? 100 : 0" }
}, ...);

错误处理

自动重试

Inngest在出错时会自动重试步骤(默认约4-5次,带退避)。

  • 自定义:在配置中使用{ retries: 10 }

不可重试错误

如果错误是致命的(例如400错误请求),立即停止执行。

import { NonRetriableError } from "inngest";

await step.run("validate", async () => {
  if (!isValid) throw new NonRetriableError("无效载荷");
});

失败处理器(回滚)

如果函数在所有重试后失败,执行清理逻辑。

export const riskyFunc = inngest.createFunction(
  { 
    id: "risky-transfer",
    // 主处理器失败时运行
    onFailure: async ({ error, event, step }) => {
      await step.run("rollback-funds", async () => {
        await reverseTransfer(event.data.transferId);
      });
      await step.run("notify-admin", async () => {
        await sendAlert(`转账失败:${error.message}`);
      });
    }
  },
  { event: "bank/transfer.init" },
  async ({ step }) => { /* ... */ }
);

注册

强制要求:所有函数必须在src/lib/inngest/functions/index.ts中导入和导出。