Bknd批量操作Skill bknd-bulk-operations

该技能专注于在Bknd平台中高效执行大规模数据操作,包括批量插入、更新、删除,以及分块处理、错误处理策略等。适用于数据迁移、ETL管道、数据清理等场景。关键词:批量操作,Bknd,数据迁移,错误处理,分块处理,数据清理

低代码开发 0 次安装 0 次浏览 更新于 3/9/2026

name: bknd-bulk-operations description: 在Bknd中执行批量插入、更新或删除操作时使用。涵盖createMany、updateMany、deleteMany、带进度的批处理、分块大型数据集、错误处理策略和类似事务模式。

批量操作

使用Bknd的批量API高效执行大规模插入、更新和删除操作。

先决条件

  • Bknd项目正在运行(本地或部署)
  • 实体存在(首先使用 bknd-create-entity
  • SDK配置或API端点已知

何时使用UI模式

不推荐将UI模式用于批量操作。仅将管理面板用于:

  • 导入小型CSV文件(<100条记录)
  • 手动数据清理

何时使用代码模式

  • 从另一个系统迁移数据
  • 种子大型数据集
  • 批处理更新(发布所有草稿,归档旧记录)
  • 批量删除(清理,GDPR请求)
  • ETL管道

代码方法

批量插入:createMany

import { Api } from "bknd";

const api = new Api({ host: "http://localhost:7654" });

// 在单个调用中插入多条记录
const { ok, data, error } = await api.data.createMany("products", [
  { name: "产品A", price: 10, stock: 100 },
  { name: "产品B", price: 20, stock: 50 },
  { name: "产品C", price: 15, stock: 75 },
]);

if (ok) {
  console.log(`创建了 ${data.length} 个产品`);
  // data包含已创建记录的数组,带有ID
}

批量更新:updateMany

// 更新匹配where子句的所有记录
const { ok, data } = await api.data.updateMany(
  "posts",
  { status: { $eq: "draft" } },     // where子句(必需)
  { status: "archived" }             // 更新数据
);

// 归档旧帖子
await api.data.updateMany(
  "posts",
  {
    status: { $eq: "published" },
    created_at: { $lt: "2024-01-01" },
  },
  { status: "archived" }
);

// 为多个帖子增加查看次数
await api.data.updateMany(
  "posts",
  { id: { $in: [1, 2, 3, 4, 5] } },
  { featured: true }
);

批量删除:deleteMany

// 删除匹配where子句的所有记录
const { ok, data } = await api.data.deleteMany("logs", {
  created_at: { $lt: "2023-01-01" },  // 删除旧日志
});

// 按ID删除
await api.data.deleteMany("temp_files", {
  id: { $in: [10, 11, 12, 13] },
});

// 删除已归档项目
await api.data.deleteMany("posts", {
  status: { $eq: "archived" },
  deleted_at: { $isnull: false },
});

警告: where 子句是必需的——防止意外删除全部。

分块处理

对于大型数据集,分块处理以避免超时和内存问题:

基本分块

async function bulkInsertChunked(
  api: Api,
  entity: string,
  items: object[],
  chunkSize = 100
): Promise<object[]> {
  const results: object[] = [];

  for (let i = 0; i < items.length; i += chunkSize) {
    const chunk = items.slice(i, i + chunkSize);
    const { ok, data, error } = await api.data.createMany(entity, chunk);

    if (!ok) {
      throw new Error(`块 ${i / chunkSize + 1} 失败:${error.message}`);
    }

    results.push(...data);
  }

  return results;
}

// 使用
const products = generateProducts(5000);  // 大型数据集
const created = await bulkInsertChunked(api, "products", products);
console.log(`创建了 ${created.length} 个产品`);

带进度回调

type ProgressCallback = (done: number, total: number, chunk: number) => void;

async function bulkInsertWithProgress(
  api: Api,
  entity: string,
  items: object[],
  onProgress?: ProgressCallback,
  chunkSize = 100
): Promise<{ success: object[]; failed: object[] }> {
  const success: object[] = [];
  const failed: object[] = [];
  const totalChunks = Math.ceil(items.length / chunkSize);

  for (let i = 0; i < items.length; i += chunkSize) {
    const chunkNum = Math.floor(i / chunkSize) + 1;
    const chunk = items.slice(i, i + chunkSize);

    const { ok, data, error } = await api.data.createMany(entity, chunk);

    if (ok) {
      success.push(...data);
    } else {
      failed.push(...chunk);
      console.warn(`块 ${chunkNum} 失败:`, error.message);
    }

    onProgress?.(Math.min(i + chunkSize, items.length), items.length, chunkNum);
  }

  return { success, failed };
}

// 带进度使用
await bulkInsertWithProgress(
  api,
  "products",
  products,
  (done, total, chunk) => {
    const percent = Math.round((done / total) * 100);
    console.log(`进度:${percent}%(块 ${chunk})`);
  }
);

并行分块处理

并发处理多个块(谨慎使用):

async function bulkInsertParallel(
  api: Api,
  entity: string,
  items: object[],
  chunkSize = 100,
  concurrency = 3
): Promise<object[]> {
  const chunks: object[][] = [];
  for (let i = 0; i < items.length; i += chunkSize) {
    chunks.push(items.slice(i, i + chunkSize));
  }

  const results: object[] = [];

  // 以并发请求批次处理
  for (let i = 0; i < chunks.length; i += concurrency) {
    const batch = chunks.slice(i, i + concurrency);
    const promises = batch.map((chunk) =>
      api.data.createMany(entity, chunk)
    );

    const responses = await Promise.all(promises);
    for (const { ok, data } of responses) {
      if (ok) results.push(...data);
    }
  }

  return results;
}

REST API方法

批量插入

curl -X POST http://localhost:7654/api/data/products \
  -H "Content-Type: application/json" \
  -d '[
    {"name": "Product A", "price": 10},
    {"name": "Product B", "price": 20}
  ]'

批量更新

curl -X PATCH http://localhost:7654/api/data/posts \
  -H "Content-Type: application/json" \
  -d '{
    "where": {"status": {"$eq": "draft"}},
    "data": {"status": "archived"}
  }'

批量删除

curl -X DELETE http://localhost:7654/api/data/logs \
  -H "Content-Type: application/json" \
  -d '{"where": {"created_at": {"$lt": "2023-01-01"}}}'

服务器端种子数据

对于初始数据填充,使用种子函数:

import { App, em, entity, text, number } from "bknd";

const schema = em({
  products: entity("products", {
    name: text().required(),
    price: number().required(),
    stock: number({ default_value: 0 }),
  }),
});

new App({
  ...schema,
  options: {
    seed: async (ctx) => {
      // 检查是否已种子化
      const { data } = await ctx.em.repo("products").count();
      if (data.count > 0) return;

      // 通过mutator批量插入
      await ctx.em.mutator("products").insertMany([
        { name: "Widget", price: 9.99, stock: 100 },
        { name: "Gadget", price: 19.99, stock: 50 },
        { name: "Gizmo", price: 14.99, stock: 75 },
      ]);

      console.log("已种子化产品");
    },
  },
});

错误处理策略

全有或全无(快速失败)

在第一个错误时停止:

async function bulkInsertStrict(api: Api, entity: string, items: object[]) {
  for (let i = 0; i < items.length; i += 100) {
    const chunk = items.slice(i, i + 100);
    const { ok, error } = await api.data.createMany(entity, chunk);

    if (!ok) {
      throw new Error(`块 ${i / 100 + 1} 失败:${error.message}`);
    }
  }
}

最佳努力(继续处理错误)

收集失败,继续处理:

async function bulkInsertBestEffort(api: Api, entity: string, items: object[]) {
  const results = { success: [] as object[], failed: [] as object[] };

  for (let i = 0; i < items.length; i += 100) {
    const chunk = items.slice(i, i + 100);
    const { ok, data } = await api.data.createMany(entity, chunk);

    if (ok) {
      results.success.push(...data);
    } else {
      results.failed.push(...chunk);
    }
  }

  return results;
}

单个回退

在块失败时回退到单个插入:

async function bulkInsertWithFallback(api: Api, entity: string, items: object[]) {
  const success: object[] = [];
  const failed: object[] = [];

  for (let i = 0; i < items.length; i += 100) {
    const chunk = items.slice(i, i + 100);
    const { ok, data } = await api.data.createMany(entity, chunk);

    if (ok) {
      success.push(...data);
    } else {
      // 回退到单个插入
      for (const item of chunk) {
        const { ok: itemOk, data: itemData } = await api.data.createOne(
          entity,
          item
        );
        if (itemOk) {
          success.push(itemData);
        } else {
          failed.push(item);
        }
      }
    }
  }

  return { success, failed };
}

常见模式

数据迁移

async function migrateData(
  sourceApi: Api,
  targetApi: Api,
  entity: string,
  transform?: (record: object) => object
) {
  let offset = 0;
  const limit = 100;
  let total = 0;

  while (true) {
    const { data, meta } = await sourceApi.data.readMany(entity, {
      limit,
      offset,
    });

    if (data.length === 0) break;

    const transformed = transform ? data.map(transform) : data;
    await targetApi.data.createMany(entity, transformed);

    total += data.length;
    offset += limit;

    console.log(`已迁移 ${total}/${meta.total} 条记录`);
  }

  return total;
}

条件批量更新

// 发布特定作者的所有帖子
await api.data.updateMany(
  "posts",
  {
    author_id: { $eq: authorId },
    status: { $eq: "draft" },
  },
  { status: "published", published_at: new Date().toISOString() }
);

// 标记非活跃用户
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
  .toISOString();

await api.data.updateMany(
  "users",
  { last_login: { $lt: thirtyDaysAgo } },
  { status: "inactive" }
);

软删除清理

// 永久删除超过30天的软删除记录
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
  .toISOString();

await api.data.deleteMany("posts", {
  deleted_at: { $lt: thirtyDaysAgo, $isnull: false },
});

批量更新关系

// 为多个帖子添加标签
const postIds = [1, 2, 3, 4, 5];

for (const postId of postIds) {
  await api.data.updateOne("posts", postId, {
    tags: { $add: [newTagId] },
  });
}

注意:Bknd不支持单个调用中的批量关系更新。循环处理记录。

类似事务模式

Bknd没有显式事务。使用此模式处理相关操作:

async function createOrderWithItems(
  api: Api,
  orderData: object,
  items: object[]
) {
  // 创建订单
  const { ok, data: order, error } = await api.data.createOne("orders", orderData);
  if (!ok) throw new Error(`订单失败:${error.message}`);

  // 创建订单项
  const itemsWithOrder = items.map((item) => ({
    ...item,
    order: { $set: order.id },
  }));

  const { ok: itemsOk, error: itemsError } = await api.data.createMany(
    "order_items",
    itemsWithOrder
  );

  if (!itemsOk) {
    // 回滚:删除订单
    await api.data.deleteOne("orders", order.id);
    throw new Error(`项失败,订单已回滚:${itemsError.message}`);
  }

  return order;
}

React集成

批量导入组件

import { useApp } from "bknd/react";
import { useState } from "react";

function BulkImport({ entity }: { entity: string }) {
  const { api } = useApp();
  const [progress, setProgress] = useState(0);
  const [status, setStatus] = useState<"idle" | "importing" | "done">("idle");

  async function handleFileUpload(e: React.ChangeEvent<HTMLInputElement>) {
    const file = e.target.files?.[0];
    if (!file) return;

    const text = await file.text();
    const items = JSON.parse(text);  // 假设JSON数组

    setStatus("importing");
    setProgress(0);

    const chunkSize = 100;
    for (let i = 0; i < items.length; i += chunkSize) {
      const chunk = items.slice(i, i + chunkSize);
      await api.data.createMany(entity, chunk);
      setProgress(Math.round(((i + chunkSize) / items.length) * 100));
    }

    setStatus("done");
  }

  return (
    <div>
      <input type="file" accept=".json" onChange={handleFileUpload} />
      {status === "importing" && <p>导入中... {progress}%</p>}
      {status === "done" && <p>导入完成!</p>}
    </div>
  );
}

带确认的批量删除

function BulkDeleteButton({
  entity,
  where,
  onComplete,
}: {
  entity: string;
  where: object;
  onComplete: () => void;
}) {
  const { api } = useApp();
  const [loading, setLoading] = useState(false);

  async function handleDelete() {
    // 首先获取数量
    const { data } = await api.data.count(entity, where);
    const confirmed = window.confirm(
      `删除 ${data.count} 条记录?此操作无法撤销。`
    );

    if (!confirmed) return;

    setLoading(true);
    await api.data.deleteMany(entity, where);
    setLoading(false);
    onComplete();
  }

  return (
    <button onClick={handleDelete} disabled={loading}>
      {loading ? "删除中..." : "删除所有匹配项"}
    </button>
  );
}

性能提示

  1. 最佳分块大小: 每个块100-500条记录(平衡速度与内存)
  2. 避免并行写入到同一实体(可能导致锁)
  3. 使用服务器端种子处理初始大型数据集
  4. 索引字段用于批量更新/删除的where子句
  5. 监控内存当客户端处理非常大型数据集时

常见陷阱

在deleteMany上缺少Where子句

问题: 尝试删除所有记录被阻止。

修复: 始终提供where子句:

// 错误 - 没有where子句
await api.data.deleteMany("posts");  // 错误!

// 正确
await api.data.deleteMany("posts", { status: { $eq: "archived" } });

// 若要删除所有(有意地):
await api.data.deleteMany("posts", { id: { $gt: 0 } });

大型数据集的内存问题

问题: 处理数百万条记录时内存不足。

修复: 分块处理,避免一次性加载所有数据:

// 错误 - 加载所有数据
const { data } = await api.data.readMany("logs", { limit: 1000000 });
await api.data.deleteMany("logs", { id: { $in: data.map((d) => d.id) } });

// 正确 - 直接使用where删除
await api.data.deleteMany("logs", { created_at: { $lt: cutoffDate } });

无唯一约束处理

问题: 批量插入在重复键上失败。

修复: 插入前去重或使用upsert模式:

// 插入前按邮箱去重
const uniqueItems = [...new Map(items.map((i) => [i.email, i])).values()];
await api.data.createMany("users", uniqueItems);

超大型操作上的超时

问题: 请求在巨大批量操作上超时。

修复: 使用更小的块和更长的延迟:

for (let i = 0; i < items.length; i += 50) {
  await api.data.createMany(entity, items.slice(i, i + 50));
  await new Promise((r) => setTimeout(r, 100));  // 小延迟
}

DOs和DON’Ts

DO:

  • 对大型数据集(>100条记录)使用分块
  • 为updateMany/deleteMany提供where子句
  • 跟踪进度以提供用户反馈
  • 优雅处理部分失败
  • 使用服务器端种子处理初始数据

DON’T:

  • 将数百万条记录加载到内存中
  • 并行批量写入到同一实体
  • 假设批量操作是原子的
  • 忘记处理唯一约束错误
  • 跳过破坏性批量删除的确认

相关技能

  • bknd-crud-create - 单条记录插入
  • bknd-crud-update - 单条记录更新
  • bknd-crud-delete - 单条记录删除
  • bknd-seed-data - 服务器端初始数据填充
  • bknd-query-filter - 为批量操作构建where子句