name: bknd-bulk-operations description: 在Bknd中执行批量插入、更新或删除操作时使用。涵盖createMany、updateMany、deleteMany、带进度的批处理、分块大型数据集、错误处理策略和类似事务模式。
批量操作
使用Bknd的批量API高效执行大规模插入、更新和删除操作。
先决条件
- Bknd项目正在运行(本地或部署)
- 实体存在(首先使用
bknd-create-entity) - SDK配置或API端点已知
何时使用UI模式
不推荐将UI模式用于批量操作。仅将管理面板用于:
- 导入小型CSV文件(<100条记录)
- 手动数据清理
何时使用代码模式
- 从另一个系统迁移数据
- 种子大型数据集
- 批处理更新(发布所有草稿,归档旧记录)
- 批量删除(清理,GDPR请求)
- ETL管道
代码方法
批量插入:createMany
import { Api } from "bknd";
const api = new Api({ host: "http://localhost:7654" });
// 在单个调用中插入多条记录
const { ok, data, error } = await api.data.createMany("products", [
{ name: "产品A", price: 10, stock: 100 },
{ name: "产品B", price: 20, stock: 50 },
{ name: "产品C", price: 15, stock: 75 },
]);
if (ok) {
console.log(`创建了 ${data.length} 个产品`);
// data包含已创建记录的数组,带有ID
}
批量更新:updateMany
// 更新匹配where子句的所有记录
const { ok, data } = await api.data.updateMany(
"posts",
{ status: { $eq: "draft" } }, // where子句(必需)
{ status: "archived" } // 更新数据
);
// 归档旧帖子
await api.data.updateMany(
"posts",
{
status: { $eq: "published" },
created_at: { $lt: "2024-01-01" },
},
{ status: "archived" }
);
// 为多个帖子增加查看次数
await api.data.updateMany(
"posts",
{ id: { $in: [1, 2, 3, 4, 5] } },
{ featured: true }
);
批量删除:deleteMany
// 删除匹配where子句的所有记录
const { ok, data } = await api.data.deleteMany("logs", {
created_at: { $lt: "2023-01-01" }, // 删除旧日志
});
// 按ID删除
await api.data.deleteMany("temp_files", {
id: { $in: [10, 11, 12, 13] },
});
// 删除已归档项目
await api.data.deleteMany("posts", {
status: { $eq: "archived" },
deleted_at: { $isnull: false },
});
警告: where 子句是必需的——防止意外删除全部。
分块处理
对于大型数据集,分块处理以避免超时和内存问题:
基本分块
async function bulkInsertChunked(
api: Api,
entity: string,
items: object[],
chunkSize = 100
): Promise<object[]> {
const results: object[] = [];
for (let i = 0; i < items.length; i += chunkSize) {
const chunk = items.slice(i, i + chunkSize);
const { ok, data, error } = await api.data.createMany(entity, chunk);
if (!ok) {
throw new Error(`块 ${i / chunkSize + 1} 失败:${error.message}`);
}
results.push(...data);
}
return results;
}
// 使用
const products = generateProducts(5000); // 大型数据集
const created = await bulkInsertChunked(api, "products", products);
console.log(`创建了 ${created.length} 个产品`);
带进度回调
type ProgressCallback = (done: number, total: number, chunk: number) => void;
async function bulkInsertWithProgress(
api: Api,
entity: string,
items: object[],
onProgress?: ProgressCallback,
chunkSize = 100
): Promise<{ success: object[]; failed: object[] }> {
const success: object[] = [];
const failed: object[] = [];
const totalChunks = Math.ceil(items.length / chunkSize);
for (let i = 0; i < items.length; i += chunkSize) {
const chunkNum = Math.floor(i / chunkSize) + 1;
const chunk = items.slice(i, i + chunkSize);
const { ok, data, error } = await api.data.createMany(entity, chunk);
if (ok) {
success.push(...data);
} else {
failed.push(...chunk);
console.warn(`块 ${chunkNum} 失败:`, error.message);
}
onProgress?.(Math.min(i + chunkSize, items.length), items.length, chunkNum);
}
return { success, failed };
}
// 带进度使用
await bulkInsertWithProgress(
api,
"products",
products,
(done, total, chunk) => {
const percent = Math.round((done / total) * 100);
console.log(`进度:${percent}%(块 ${chunk})`);
}
);
并行分块处理
并发处理多个块(谨慎使用):
async function bulkInsertParallel(
api: Api,
entity: string,
items: object[],
chunkSize = 100,
concurrency = 3
): Promise<object[]> {
const chunks: object[][] = [];
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push(items.slice(i, i + chunkSize));
}
const results: object[] = [];
// 以并发请求批次处理
for (let i = 0; i < chunks.length; i += concurrency) {
const batch = chunks.slice(i, i + concurrency);
const promises = batch.map((chunk) =>
api.data.createMany(entity, chunk)
);
const responses = await Promise.all(promises);
for (const { ok, data } of responses) {
if (ok) results.push(...data);
}
}
return results;
}
REST API方法
批量插入
curl -X POST http://localhost:7654/api/data/products \
-H "Content-Type: application/json" \
-d '[
{"name": "Product A", "price": 10},
{"name": "Product B", "price": 20}
]'
批量更新
curl -X PATCH http://localhost:7654/api/data/posts \
-H "Content-Type: application/json" \
-d '{
"where": {"status": {"$eq": "draft"}},
"data": {"status": "archived"}
}'
批量删除
curl -X DELETE http://localhost:7654/api/data/logs \
-H "Content-Type: application/json" \
-d '{"where": {"created_at": {"$lt": "2023-01-01"}}}'
服务器端种子数据
对于初始数据填充,使用种子函数:
import { App, em, entity, text, number } from "bknd";
const schema = em({
products: entity("products", {
name: text().required(),
price: number().required(),
stock: number({ default_value: 0 }),
}),
});
new App({
...schema,
options: {
seed: async (ctx) => {
// 检查是否已种子化
const { data } = await ctx.em.repo("products").count();
if (data.count > 0) return;
// 通过mutator批量插入
await ctx.em.mutator("products").insertMany([
{ name: "Widget", price: 9.99, stock: 100 },
{ name: "Gadget", price: 19.99, stock: 50 },
{ name: "Gizmo", price: 14.99, stock: 75 },
]);
console.log("已种子化产品");
},
},
});
错误处理策略
全有或全无(快速失败)
在第一个错误时停止:
async function bulkInsertStrict(api: Api, entity: string, items: object[]) {
for (let i = 0; i < items.length; i += 100) {
const chunk = items.slice(i, i + 100);
const { ok, error } = await api.data.createMany(entity, chunk);
if (!ok) {
throw new Error(`块 ${i / 100 + 1} 失败:${error.message}`);
}
}
}
最佳努力(继续处理错误)
收集失败,继续处理:
async function bulkInsertBestEffort(api: Api, entity: string, items: object[]) {
const results = { success: [] as object[], failed: [] as object[] };
for (let i = 0; i < items.length; i += 100) {
const chunk = items.slice(i, i + 100);
const { ok, data } = await api.data.createMany(entity, chunk);
if (ok) {
results.success.push(...data);
} else {
results.failed.push(...chunk);
}
}
return results;
}
单个回退
在块失败时回退到单个插入:
async function bulkInsertWithFallback(api: Api, entity: string, items: object[]) {
const success: object[] = [];
const failed: object[] = [];
for (let i = 0; i < items.length; i += 100) {
const chunk = items.slice(i, i + 100);
const { ok, data } = await api.data.createMany(entity, chunk);
if (ok) {
success.push(...data);
} else {
// 回退到单个插入
for (const item of chunk) {
const { ok: itemOk, data: itemData } = await api.data.createOne(
entity,
item
);
if (itemOk) {
success.push(itemData);
} else {
failed.push(item);
}
}
}
}
return { success, failed };
}
常见模式
数据迁移
async function migrateData(
sourceApi: Api,
targetApi: Api,
entity: string,
transform?: (record: object) => object
) {
let offset = 0;
const limit = 100;
let total = 0;
while (true) {
const { data, meta } = await sourceApi.data.readMany(entity, {
limit,
offset,
});
if (data.length === 0) break;
const transformed = transform ? data.map(transform) : data;
await targetApi.data.createMany(entity, transformed);
total += data.length;
offset += limit;
console.log(`已迁移 ${total}/${meta.total} 条记录`);
}
return total;
}
条件批量更新
// 发布特定作者的所有帖子
await api.data.updateMany(
"posts",
{
author_id: { $eq: authorId },
status: { $eq: "draft" },
},
{ status: "published", published_at: new Date().toISOString() }
);
// 标记非活跃用户
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
.toISOString();
await api.data.updateMany(
"users",
{ last_login: { $lt: thirtyDaysAgo } },
{ status: "inactive" }
);
软删除清理
// 永久删除超过30天的软删除记录
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
.toISOString();
await api.data.deleteMany("posts", {
deleted_at: { $lt: thirtyDaysAgo, $isnull: false },
});
批量更新关系
// 为多个帖子添加标签
const postIds = [1, 2, 3, 4, 5];
for (const postId of postIds) {
await api.data.updateOne("posts", postId, {
tags: { $add: [newTagId] },
});
}
注意:Bknd不支持单个调用中的批量关系更新。循环处理记录。
类似事务模式
Bknd没有显式事务。使用此模式处理相关操作:
async function createOrderWithItems(
api: Api,
orderData: object,
items: object[]
) {
// 创建订单
const { ok, data: order, error } = await api.data.createOne("orders", orderData);
if (!ok) throw new Error(`订单失败:${error.message}`);
// 创建订单项
const itemsWithOrder = items.map((item) => ({
...item,
order: { $set: order.id },
}));
const { ok: itemsOk, error: itemsError } = await api.data.createMany(
"order_items",
itemsWithOrder
);
if (!itemsOk) {
// 回滚:删除订单
await api.data.deleteOne("orders", order.id);
throw new Error(`项失败,订单已回滚:${itemsError.message}`);
}
return order;
}
React集成
批量导入组件
import { useApp } from "bknd/react";
import { useState } from "react";
function BulkImport({ entity }: { entity: string }) {
const { api } = useApp();
const [progress, setProgress] = useState(0);
const [status, setStatus] = useState<"idle" | "importing" | "done">("idle");
async function handleFileUpload(e: React.ChangeEvent<HTMLInputElement>) {
const file = e.target.files?.[0];
if (!file) return;
const text = await file.text();
const items = JSON.parse(text); // 假设JSON数组
setStatus("importing");
setProgress(0);
const chunkSize = 100;
for (let i = 0; i < items.length; i += chunkSize) {
const chunk = items.slice(i, i + chunkSize);
await api.data.createMany(entity, chunk);
setProgress(Math.round(((i + chunkSize) / items.length) * 100));
}
setStatus("done");
}
return (
<div>
<input type="file" accept=".json" onChange={handleFileUpload} />
{status === "importing" && <p>导入中... {progress}%</p>}
{status === "done" && <p>导入完成!</p>}
</div>
);
}
带确认的批量删除
function BulkDeleteButton({
entity,
where,
onComplete,
}: {
entity: string;
where: object;
onComplete: () => void;
}) {
const { api } = useApp();
const [loading, setLoading] = useState(false);
async function handleDelete() {
// 首先获取数量
const { data } = await api.data.count(entity, where);
const confirmed = window.confirm(
`删除 ${data.count} 条记录?此操作无法撤销。`
);
if (!confirmed) return;
setLoading(true);
await api.data.deleteMany(entity, where);
setLoading(false);
onComplete();
}
return (
<button onClick={handleDelete} disabled={loading}>
{loading ? "删除中..." : "删除所有匹配项"}
</button>
);
}
性能提示
- 最佳分块大小: 每个块100-500条记录(平衡速度与内存)
- 避免并行写入到同一实体(可能导致锁)
- 使用服务器端种子处理初始大型数据集
- 索引字段用于批量更新/删除的where子句
- 监控内存当客户端处理非常大型数据集时
常见陷阱
在deleteMany上缺少Where子句
问题: 尝试删除所有记录被阻止。
修复: 始终提供where子句:
// 错误 - 没有where子句
await api.data.deleteMany("posts"); // 错误!
// 正确
await api.data.deleteMany("posts", { status: { $eq: "archived" } });
// 若要删除所有(有意地):
await api.data.deleteMany("posts", { id: { $gt: 0 } });
大型数据集的内存问题
问题: 处理数百万条记录时内存不足。
修复: 分块处理,避免一次性加载所有数据:
// 错误 - 加载所有数据
const { data } = await api.data.readMany("logs", { limit: 1000000 });
await api.data.deleteMany("logs", { id: { $in: data.map((d) => d.id) } });
// 正确 - 直接使用where删除
await api.data.deleteMany("logs", { created_at: { $lt: cutoffDate } });
无唯一约束处理
问题: 批量插入在重复键上失败。
修复: 插入前去重或使用upsert模式:
// 插入前按邮箱去重
const uniqueItems = [...new Map(items.map((i) => [i.email, i])).values()];
await api.data.createMany("users", uniqueItems);
超大型操作上的超时
问题: 请求在巨大批量操作上超时。
修复: 使用更小的块和更长的延迟:
for (let i = 0; i < items.length; i += 50) {
await api.data.createMany(entity, items.slice(i, i + 50));
await new Promise((r) => setTimeout(r, 100)); // 小延迟
}
DOs和DON’Ts
DO:
- 对大型数据集(>100条记录)使用分块
- 为updateMany/deleteMany提供where子句
- 跟踪进度以提供用户反馈
- 优雅处理部分失败
- 使用服务器端种子处理初始数据
DON’T:
- 将数百万条记录加载到内存中
- 并行批量写入到同一实体
- 假设批量操作是原子的
- 忘记处理唯一约束错误
- 跳过破坏性批量删除的确认
相关技能
- bknd-crud-create - 单条记录插入
- bknd-crud-update - 单条记录更新
- bknd-crud-delete - 单条记录删除
- bknd-seed-data - 服务器端初始数据填充
- bknd-query-filter - 为批量操作构建where子句