ReplicateAI后台处理与存储集成技能Skill ai-handler

这是一个用于集成Replicate AI模型的后台处理技能。它通过Inngest队列处理长时间运行的AI生成任务(如图像生成),自动管理用户积分系统,将生成结果(如图片)上传至S3对象存储,并更新数据库状态。该技能实现了完整的生产级AI应用流程,包括触发、验证、异步处理、存储和状态管理,适用于构建需要稳定、可扩展AI服务的Web应用。关键词:Replicate AI集成,后台任务处理,S3存储,积分系统,Inngest队列,AI图像生成,异步处理,生产部署。

AI应用 0 次安装 0 次浏览 更新于 3/1/2026

name: ai-handler description: 将Replicate AI模型与后台处理、S3存储和积分系统集成 tools: 读取、写入、编辑 model: 继承 deps: [“inngest-handler”, “s3-upload-handler”, “credits-handler”, “replicate-handler”]

Replicate AI 处理器技能

此技能提供了一个生产就绪的模式,用于集成Replicate AI模型。它使用Inngest后台作业处理长时间运行的预测,将结果存储在S3中,管理用户积分,并更新数据库状态。

架构

  1. 触发:用户通过API(例如/api/app/ai-images)请求生成。
  2. 验证:检查/扣除用户积分。
  3. 状态:创建一个状态为"processing"(处理中)的数据库记录。
  4. 队列:触发一个Inngest函数来处理Replicate API调用。
  5. 处理
    • 调用Replicate API。
    • 等待完成(轮询或webhook)。
    • 下载结果并上传到S3(服务器端)。
  6. 完成:使用S3 URL和状态"completed"(已完成)更新数据库记录。
  7. 失败:如果失败则退还积分(可选)并将状态更新为failed(失败)。

先决条件

  • 已安装replicate包(npm install replicate)。
  • .env文件中配置REPLICATE_API_TOKEN
  • 已配置S3和Inngest。

实施步骤

1. API路由(触发)

src/app/api/app/generate/route.ts

import withAuthRequired from "@/lib/auth/withAuthRequired";
import { db } from "@/db";
import { generations } from "/db/schema";
import { inngest } from "@/lib/inngest/client";
import { checkCredits, deductCredits } from "@/lib/credits"; // 假设的辅助函数

export const POST = withAuthRequired(async (req, { session }) => {
  const body = await req.json();
  
  // 1. 检查积分
  const hasCredits = await checkCredits(session.user.id, "image_generation", 1);
  if (!hasCredits) return new Response("积分不足", { status: 403 });

  // 2. 创建数据库记录(待处理)
  const [record] = await db.insert(generations).values({
    userId: session.user.id,
    prompt: body.prompt,
    status: "processing",
  }).returning();

  // 3. 扣除积分(乐观)
  await deductCredits(session.user.id, "image_generation", 1, { source: "api", refId: record.id });

  // 4. 触发后台作业
  await inngest.send({
    name: "app/ai.generate",
    data: {
      generationId: record.id,
      prompt: body.prompt,
      userId: session.user.id
    }
  });

  return Response.json({ id: record.id, status: "processing" });
});

2. Inngest函数(处理器)

src/lib/inngest/functions/app/ai/generate.ts

import { inngest } from "@/lib/inngest/client";
import Replicate from "replicate";
import uploadFromServer from "@/lib/s3/uploadFromServer";
import { db } from "@/db";
import { generations } from "@/db/schema";
import { eq } from "drizzle-orm";

const replicate = new Replicate({ auth: process.env.REPLICATE_API_TOKEN });

export const generateAI = inngest.createFunction(
  { id: "ai-generation-worker", concurrency: 5 },
  { event: "app/ai.generate" },
  async ({ event, step }) => {
    const { generationId, prompt } = event.data;

    try {
      // 1. 调用Replicate(Step确保在网络错误时重试)
      const prediction = await step.run("call-replicate", async () => {
        return await replicate.predictions.create({
          version: "model-version-hash",
          input: { prompt }
        });
      });

      // 2. 等待完成
      // Replicate通常需要时间。如果使用webhooks,我们可以使用waitForEvent,
      // 或者如果没有设置webhooks,可以使用带有sleep的简单轮询循环。
      // 为简单起见,这里是一个使用sleep的轮询模式:
      let finalPrediction = prediction;
      while (finalPrediction.status !== "succeeded" && finalPrediction.status !== "failed") {
        await step.sleep("wait-for-gpu", "5s");
        finalPrediction = await step.run("check-status", () => 
          replicate.predictions.get(prediction.id)
        );
      }

      if (finalPrediction.status === "failed") {
        throw new Error(finalPrediction.error);
      }

      // 3. 上传到S3
      // Replicate返回一个临时URL。我们必须持久化它。
      const outputUrl = finalPrediction.output[0]; // 根据模型输出进行调整
      
      const s3Url = await step.run("upload-to-s3", async () => {
        // 获取图像缓冲区
        const response = await fetch(outputUrl);
        const arrayBuffer = await response.arrayBuffer();
        const base64 = Buffer.from(arrayBuffer).toString("base64");

        // 使用现有的S3技能
        return await uploadFromServer({
          file: base64,
          path: `generations/${generationId}.png`,
          contentType: "image/png"
        });
      });

      // 4. 更新数据库
      await step.run("update-db", async () => {
        await db.update(generations)
          .set({ status: "completed", url: s3Url })
          .where(eq(generations.id, generationId));
      });

    } catch (error) {
      // 处理失败
      await step.run("mark-failed", async () => {
        await db.update(generations)
          .set({ status: "failed" })
          .where(eq(generations.id, generationId));
        
        // 可选:在此处退还积分
      });
      throw error; // 重新抛出以在Inngest仪表板中显示失败
    }
  }
);