名称: Bun Workers
描述: 在 Bun 中使用 Web Workers、worker_threads、并行处理和后台任务。
版本: 1.0.0
Bun Workers
Bun 支持 Web Workers 和 Node.js worker_threads 用于并行执行。
Web Workers
基本用法
// main.ts
const worker = new Worker(new URL("./worker.ts", import.meta.url));

// Send the input array to the worker.
worker.postMessage({ type: "start", data: [1, 2, 3, 4, 5] });

// Print whatever the worker posts back.
worker.onmessage = ({ data }) => {
  console.log("结果:", data);
};

// Report errors raised by the worker.
worker.onerror = ({ message }) => {
  console.error("Worker 错误:", message);
};

// worker.ts
self.onmessage = ({ data: message }) => {
  // Only "start" messages are handled; anything else is ignored.
  if (message.type !== "start") {
    return;
  }
  // Double every element and post the new array back to the sender.
  const doubled = message.data.map((x) => x * 2);
  self.postMessage(doubled);
};
使用 URL 的 Worker
// Load the worker from a file path resolved relative to this module
const worker = new Worker(new URL("./worker.ts", import.meta.url));

// Or build a worker from inline source through a blob URL.
// The two workers use distinct names so the snippet can be pasted as-is
// without redeclaring a `const` in the same scope.
const code = `
self.onmessage = (e) => {
self.postMessage(e.data * 2);
};
`;
const blob = new Blob([code], { type: "application/javascript" });
const blobUrl = URL.createObjectURL(blob);
const blobWorker = new Worker(blobUrl);
可转移对象
// main.ts
// Allocate 1 MiB and fill every byte with 42.
const buffer = new ArrayBuffer(1024 * 1024);
const view = new Uint8Array(buffer).fill(42);

// Transfer ownership of the buffer to the worker (zero-copy).
worker.postMessage({ buffer }, [buffer]);
// From here on `buffer` is detached (empty) on this side.

// worker.ts
self.onmessage = ({ data }) => {
  const received = data.buffer;
  const view = new Uint8Array(received);
  // Process the buffer...

  // Transfer it back to the sender.
  self.postMessage({ buffer: received }, [received]);
};
共享内存
// main.ts
// 1024 bytes of shared memory, viewed as 32-bit integers.
const shared = new SharedArrayBuffer(1024);
const view = new Int32Array(shared);

worker.postMessage({ shared });

// Both the main thread and the worker can access the memory.
Atomics.add(view, 0, 1);

// worker.ts
self.onmessage = ({ data }) => {
  const counters = new Int32Array(data.shared);

  // Use atomic operations so the update is thread-safe.
  Atomics.add(counters, 0, 1);
  Atomics.notify(counters, 0);
};
Node.js worker_threads
// main.ts
import { Worker, isMainThread, parentPort, workerData } from "worker_threads";
if (!isMainThread) {
  // Worker side: add up the numbers handed over through workerData
  // and post the total to the parent.
  let sum = 0;
  for (const value of workerData.numbers) {
    sum += value;
  }
  parentPort?.postMessage(sum);
} else {
  // Main thread: run this same file as a worker and pass the input along.
  const worker = new Worker(import.meta.filename, {
    workerData: { numbers: [1, 2, 3, 4, 5] },
  });

  worker.on("message", (result) => console.log("结果:", result));
  worker.on("error", (err) => console.error("错误:", err));
  worker.on("exit", (code) => console.log("Worker 退出代码:", code));
}
Worker 池
// worker-pool.ts
import { Worker } from "worker_threads";
/** A queued or running task together with the callbacks that settle its promise. */
type WorkerPoolJob = {
  task: any;
  resolve: (value: any) => void;
  reject: (err: Error) => void;
};

/**
 * Fixed-size pool of `worker_threads` workers.
 *
 * Each task is posted to an idle worker with `postMessage`; the first message
 * the worker sends back settles that task's promise. Tasks submitted while
 * every worker is busy wait in a FIFO queue.
 *
 * A worker that emits "error" or exits is dropped from the pool, its running
 * task (if any) is rejected, and a replacement is spawned lazily the next
 * time there is queued work.
 */
class WorkerPool {
  private workers: Worker[] = [];
  private queue: WorkerPoolJob[] = [];
  /** Job currently running on each busy worker. */
  private inFlight = new Map<Worker, WorkerPoolJob>();
  private terminated = false;

  /**
   * @param workerPath Script passed to the `Worker` constructor.
   * @param poolSize Maximum number of workers kept alive at once.
   */
  constructor(
    private workerPath: string,
    private poolSize: number
  ) {
    for (let i = 0; i < poolSize; i++) {
      this.addWorker();
    }
  }

  /** Spawns one worker and wires up its permanent listeners. */
  private addWorker(): Worker {
    const worker = new Worker(this.workerPath);

    // One set of listeners per worker (instead of per task) so listeners
    // never accumulate across tasks.
    worker.on("message", (result) => {
      const job = this.inFlight.get(worker);
      this.inFlight.delete(worker);
      job?.resolve(result);
      this.processQueue();
    });
    worker.on("error", (err) => {
      console.error("Worker 错误:", err);
      this.retire(worker, err);
    });
    worker.on("exit", () => {
      // No-op when the worker was already retired by the "error" handler
      // or by terminate().
      this.retire(worker, new Error("Worker exited before completing its task"));
    });

    this.workers.push(worker);
    return worker;
  }

  /** Removes a dead worker from the pool and rejects the task it was running. */
  private retire(worker: Worker, reason: Error) {
    const position = this.workers.indexOf(worker);
    if (position !== -1) {
      this.workers.splice(position, 1);
    }
    const job = this.inFlight.get(worker);
    this.inFlight.delete(worker);
    job?.reject(reason);
    if (!this.terminated) {
      this.processQueue();
    }
  }

  /**
   * Runs `task` on the next available worker.
   *
   * @returns A promise resolved with the first message the worker posts back.
   * It rejects if the worker errors or exits, or if the pool is terminated.
   */
  async execute(task: any): Promise<any> {
    if (this.terminated) {
      throw new Error("WorkerPool has been terminated");
    }
    return new Promise((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this.processQueue();
    });
  }

  /** Hands queued jobs to idle workers, respawning lost workers on demand. */
  private processQueue() {
    while (this.queue.length > 0) {
      let idle = this.workers.find((w) => !this.inFlight.has(w));
      // Replace workers lost to crashes only when there is work for them,
      // so a permanently broken script cannot cause a respawn loop.
      if (!idle && this.workers.length < this.poolSize) {
        idle = this.addWorker();
      }
      if (!idle) {
        return;
      }
      const job = this.queue.shift()!;
      this.inFlight.set(idle, job);
      idle.postMessage(job.task);
    }
  }

  /** Terminates every worker and rejects all queued and running tasks. */
  terminate() {
    this.terminated = true;
    const reason = new Error("WorkerPool has been terminated");
    for (const job of this.queue.splice(0)) {
      job.reject(reason);
    }
    for (const job of this.inFlight.values()) {
      job.reject(reason);
    }
    this.inFlight.clear();
    for (const worker of this.workers.splice(0)) {
      worker.terminate();
    }
  }
}
// Usage
const pool = new WorkerPool("./worker.ts", 4);

// Submit three tasks at once and wait for all of them.
const pending = [1, 2, 3].map((task) => pool.execute({ task }));
const results = await Promise.all(pending);

pool.terminate();
模式
CPU 密集型任务
// main.ts
const worker = new Worker(new URL("./cpu-worker.ts", import.meta.url));

// Process a large dataset: one million random numbers.
const data = Array.from({ length: 1000000 }, () => Math.random());
worker.postMessage({ type: "process", data });

// The worker posts "progress" messages while running and one final "result".
worker.onmessage = ({ data: message }) => {
  switch (message.type) {
    case "progress":
      console.log(`进度: ${message.percent}%`);
      break;
    case "result":
      console.log("完成:", message.result);
      break;
  }
};

// cpu-worker.ts
self.onmessage = ({ data: request }) => {
  if (request.type !== "process") {
    return;
  }

  const values = request.data;
  const chunkSize = 10000;
  let result = 0;

  for (const [i, value] of values.entries()) {
    result += Math.sqrt(value);

    // Report progress once every `chunkSize` items.
    if (i % chunkSize === 0) {
      const percent = Math.round((i / values.length) * 100);
      self.postMessage({ type: "progress", percent });
    }
  }

  self.postMessage({ type: "result", result });
};
并行映射
/**
 * Maps `items` through a function that lives inside a worker script,
 * spreading the work over several workers.
 *
 * Each worker receives `{ fn, item, index }` messages and is expected to
 * answer with `{ index, result }`; results are stored at their original
 * index, so output order matches input order.
 *
 * @param items Values to process.
 * @param fn Name of the function inside the worker.
 * @param workerUrl Script every worker is created from.
 * @param concurrency Upper bound on the number of workers (at least one is
 * used, and never more than `items.length`).
 * @returns The results, in the same order as `items`.
 * @throws Error if a worker reports an error or sends an undeserializable
 * message. All workers are terminated in either case.
 */
async function parallelMap<T, R>(
  items: T[],
  fn: string, // Name of the function inside the worker
  workerUrl: URL,
  concurrency = 4
): Promise<R[]> {
  const results: R[] = new Array(items.length);

  // Nothing to do: avoid spawning workers just to tear them down again.
  if (items.length === 0) {
    return results;
  }

  // Clamp to [1, items.length]; a zero, negative, or NaN concurrency would
  // otherwise spawn no workers and silently return an array of holes.
  const workerCount = Math.min(
    items.length,
    Math.max(1, Math.floor(concurrency) || 1)
  );

  const workers: Worker[] = [];
  let nextIndex = 0;

  // Feeds one worker items until the shared cursor runs out.
  const drain = (worker: Worker): Promise<void> =>
    new Promise((resolve, reject) => {
      const dispatchNext = () => {
        if (nextIndex >= items.length) {
          resolve();
          return;
        }
        const index = nextIndex++;
        worker.postMessage({ fn, item: items[index], index });
      };

      worker.onmessage = (event) => {
        results[event.data.index] = event.data.result;
        dispatchNext();
      };
      // Without these handlers a failing worker would leave the returned
      // promise pending forever.
      worker.onerror = (event) => {
        event.preventDefault();
        reject(new Error(event.message || `Worker failed while running "${fn}"`));
      };
      worker.onmessageerror = () => {
        reject(new Error(`Worker message could not be deserialized while running "${fn}"`));
      };

      dispatchNext();
    });

  try {
    for (let i = 0; i < workerCount; i++) {
      workers.push(new Worker(workerUrl));
    }
    await Promise.all(workers.map((worker) => drain(worker)));
    return results;
  } finally {
    // Always release the workers, on success and on failure alike.
    workers.forEach((w) => w.terminate());
  }
}
消息通道
// Create a channel for worker-to-worker communication
const { port1, port2 } = new MessageChannel();

const worker1 = new Worker(new URL("./worker1.ts", import.meta.url));
const worker2 = new Worker(new URL("./worker2.ts", import.meta.url));

// Give each worker one end of the channel, transferring the port to it.
worker1.postMessage({ port: port1 }, [port1]);
worker2.postMessage({ port: port2 }, [port2]);

// worker1.ts
let port: MessagePort;

self.onmessage = ({ data }) => {
  // Messages that do not carry a port are ignored.
  if (!data.port) {
    return;
  }
  port = data.port;
  port.onmessage = (e) => console.log("来自 worker2:", e.data);
  port.postMessage("Hello from worker1!");
};
错误处理
const worker = new Worker(new URL("./worker.ts", import.meta.url));

// Errors the worker did not catch itself are reported here.
worker.onerror = (error) => {
  const { message } = error;
  console.error("Worker 中未捕获的错误:", message);
  error.preventDefault(); // Keep the error from propagating further
};

// Reports a message that could not be deserialized.
worker.onmessageerror = () => {
  console.error("消息反序列化失败");
};

// Inside the worker
// NOTE(review): assumes the handler receives an object exposing `.message` —
// confirm the argument shape for the target runtime.
self.onerror = (error) => {
  // Forward the failure to the other side as an ordinary message.
  self.postMessage({ type: "error", message: error.message });
};
终止
const worker = new Worker(new URL("./worker.ts", import.meta.url));

// Ask the worker to shut down gracefully
worker.postMessage({ type: "shutdown" });

// Force termination once the timeout elapses
const forceTerminateAfterMs = 5000;
setTimeout(() => {
  worker.terminate();
}, forceTerminateAfterMs);

// Inside the worker
self.onmessage = ({ data }) => {
  if (data.type !== "shutdown") {
    return;
  }
  // Clean up
  self.close();
};
常见错误
| 错误 | 原因 | 修复方法 |
|---|---|---|
| Worker 未找到 | URL 错误 | 检查 worker 文件路径 |
| 无法序列化 | 非可转移数据 | 使用可转移对象 |
| DataCloneError | 消息中包含函数/DOM | 只发送可序列化数据 |
| Worker 已终止 | 提前终止 | 检查终止逻辑 |
何时加载参考
加载 references/optimization.md 当:
- Worker 池调优
- 内存管理
- 性能分析
加载 references/patterns.md 当:
- 复杂协调
- 背压处理
- 错误恢复