Bun工作线程 BunWorkers

Bun Workers 是在 Bun 运行时中利用 Web Workers 和 Node.js worker_threads 进行并行执行和任务处理的技能,适用于处理 CPU 密集型任务、实现并行映射、错误处理和资源管理,关键词包括 Bun、Web Workers、worker_threads、并行处理、多线程编程、后台任务、性能优化。

后端开发 0 次安装 0 次浏览 更新于 3/8/2026

名称: Bun Workers 描述: 在 Bun 中使用 Web Workers、worker_threads、并行处理和后台任务。 版本: 1.0.0

Bun Workers

Bun 支持 Web Workers 和 Node.js worker_threads 用于并行执行。

Web Workers

基本用法

// main.ts
const worker = new Worker(new URL("./worker.ts", import.meta.url));

// Attach the handlers before dispatching work so that neither the reply
// nor a startup failure can go unobserved.
worker.onmessage = (event: MessageEvent<number[]>) => {
  console.log("结果:", event.data);
};

worker.onerror = (error) => {
  console.error("Worker 错误:", error.message);
};

worker.postMessage({ type: "start", data: [1, 2, 3, 4, 5] });

// worker.ts
/** Request sent by main.ts: a list of numbers to be doubled. */
interface StartMessage {
  type: "start";
  data: number[];
}

// Doubles every number in a "start" request and posts the new array back.
self.onmessage = (event: MessageEvent<StartMessage>) => {
  const { type, data } = event.data;

  if (type === "start") {
    // `data` is typed, so `x` is inferred as number instead of implicit any.
    const result = data.map((x) => x * 2);
    self.postMessage(result);
  }
};

使用 URL 的 Worker

// Load the worker from a file that sits next to this module
const worker = new Worker(new URL("./worker.ts", import.meta.url));

// Or build the worker from inline source through a blob URL.
// It gets its own name: declaring `const worker` twice in one module is a
// redeclaration error.
const code = `
  self.onmessage = (e) => {
    self.postMessage(e.data * 2);
  };
`;
const blob = new Blob([code], { type: "application/javascript" });
const blobWorker = new Worker(URL.createObjectURL(blob));

可转移对象

// main.ts
const ONE_MIB = 1024 * 1024;
const buffer = new ArrayBuffer(ONE_MIB);
new Uint8Array(buffer).fill(42);

// Listing the buffer in the transfer list hands ownership to the worker
// instead of copying it (zero-copy).
worker.postMessage({ buffer }, [buffer]);
// From here on `buffer` is detached (empty) on this side.

// worker.ts
self.onmessage = ({ data }) => {
  const received = data.buffer;
  const bytes = new Uint8Array(received);
  // ...work on the bytes...

  // Hand the same buffer back, again by transfer.
  self.postMessage({ buffer: received }, [received]);
};

共享内存

// main.ts
const shared = new SharedArrayBuffer(1024);
worker.postMessage({ shared });

// The main thread and the worker both address the same memory.
const counters = new Int32Array(shared);
Atomics.add(counters, 0, 1);

// worker.ts
self.onmessage = ({ data }) => {
  const cells = new Int32Array(data.shared);

  // Atomic read-modify-write keeps the shared counter consistent across
  // threads; notify then wakes anything waiting on slot 0.
  Atomics.add(cells, 0, 1);
  Atomics.notify(cells, 0);
};

Node.js worker_threads

// main.ts
import { Worker, isMainThread, parentPort, workerData } from "worker_threads";

/** Worker side: add up the numbers passed as workerData and post the total. */
function sumInWorker(): void {
  const total = workerData.numbers.reduce((acc, n) => acc + n, 0);
  parentPort?.postMessage(total);
}

/** Main side: start this same file as a worker and log what it reports. */
function spawnFromMain(): void {
  const worker = new Worker(import.meta.filename, {
    workerData: { numbers: [1, 2, 3, 4, 5] },
  });

  worker
    .on("message", (result) => {
      console.log("结果:", result);
    })
    .on("error", (err) => {
      console.error("错误:", err);
    })
    .on("exit", (code) => {
      console.log("Worker 退出代码:", code);
    });
}

// The same file serves as both entry points; pick the role by thread kind.
if (!isMainThread) {
  sumInWorker();
} else {
  spawnFromMain();
}

Worker 池

// worker-pool.ts
import { Worker } from "worker_threads";

/** Settlement callbacks of the promise returned by `WorkerPool.execute`. */
type PoolJob = {
  resolve: (value: any) => void;
  reject: (err: Error) => void;
};

/**
 * Pool of `worker_threads` workers that runs queued tasks, one task per
 * worker at a time.
 *
 * Each task is posted to a worker as a single message, and the first message
 * the worker sends back settles that task. A worker that raises an error or
 * exits while running a task rejects that task, is dropped from the pool,
 * and is replaced lazily when queued work needs a worker.
 */
class WorkerPool {
  private workers: Worker[] = [];
  private queue: Array<{
    task: any;
    resolve: (value: any) => void;
    reject: (err: Error) => void;
  }> = [];
  /** Job currently running on each busy worker; a worker not in the map is idle. */
  private inFlight = new Map<Worker, PoolJob>();
  private terminated = false;

  /**
   * @param workerPath Script path handed to the `Worker` constructor.
   * @param poolSize Maximum number of workers kept alive at once.
   */
  constructor(
    private workerPath: string,
    private poolSize: number
  ) {
    for (let i = 0; i < poolSize; i++) {
      this.addWorker();
    }
  }

  /** Spawns one worker and wires its lifetime listeners exactly once. */
  private addWorker(): Worker {
    const worker = new Worker(this.workerPath);

    // One permanent listener per event routes the outcome to whichever job
    // is in flight. Adding `once` listeners per task would leave the unused
    // counterpart (message vs. error) attached after every task.
    worker.on("message", (result) => {
      this.takeJob(worker)?.resolve(result);
      this.processQueue();
    });

    worker.on("error", (err) => {
      console.error("Worker 错误:", err);
      this.retire(worker, err instanceof Error ? err : new Error(String(err)));
    });

    worker.on("exit", (code) => {
      this.retire(worker, new Error(`Worker exited with code ${code}`));
    });

    this.workers.push(worker);
    return worker;
  }

  /** Removes and returns the job running on `worker`, if there is one. */
  private takeJob(worker: Worker): PoolJob | undefined {
    const job = this.inFlight.get(worker);
    this.inFlight.delete(worker);
    return job;
  }

  /** Rejects the worker's in-flight job, drops the worker, and keeps the queue moving. */
  private retire(worker: Worker, reason: Error): void {
    this.takeJob(worker)?.reject(reason);

    const at = this.workers.indexOf(worker);
    if (at === -1) return; // already retired ("exit" follows "error") or pool terminated
    this.workers.splice(at, 1);
    this.processQueue();
  }

  /**
   * Queues a task and resolves with the worker's reply.
   *
   * Rejects if the worker errors or exits while running the task, or if the
   * pool is terminated before the task completes.
   */
  async execute(task: any): Promise<any> {
    if (this.terminated) {
      throw new Error("WorkerPool has been terminated");
    }
    return new Promise((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this.processQueue();
    });
  }

  /** Hands queued tasks to idle workers, spawning replacements up to `poolSize`. */
  private processQueue(): void {
    if (this.terminated) return;

    while (this.queue.length > 0) {
      let worker = this.workers.find((w) => !this.inFlight.has(w));
      if (!worker) {
        if (this.workers.length >= this.poolSize) return;
        // A worker was lost earlier. Replace it only now that there is work,
        // so a script that always fails cannot respawn without bound.
        worker = this.addWorker();
      }

      const next = this.queue.shift();
      if (!next) return;
      this.inFlight.set(worker, { resolve: next.resolve, reject: next.reject });
      worker.postMessage(next.task);
    }
  }

  /** Stops every worker and rejects all queued and in-flight tasks. */
  terminate(): void {
    this.terminated = true;
    const reason = new Error("WorkerPool has been terminated");

    for (const { reject } of this.queue.splice(0)) reject(reason);
    for (const job of this.inFlight.values()) job.reject(reason);
    this.inFlight.clear();

    for (const w of this.workers.splice(0)) void w.terminate();
  }
}

// Usage
const pool = new WorkerPool("./worker.ts", 4);
// `finally` shuts the pool down even when a task rejects, so a failed batch
// does not leave worker threads running.
const results = await Promise.all([
  pool.execute({ task: 1 }),
  pool.execute({ task: 2 }),
  pool.execute({ task: 3 }),
]).finally(() => pool.terminate());

模式

CPU 密集型任务

// main.ts
const worker = new Worker(new URL("./cpu-worker.ts", import.meta.url));

// One million random samples to crunch off the main thread
const data = Array.from({ length: 1000000 }, () => Math.random());

worker.postMessage({ type: "process", data });

// The worker sends "progress" updates followed by one final "result".
worker.onmessage = ({ data: message }) => {
  switch (message.type) {
    case "progress":
      console.log(`进度: ${message.percent}%`);
      break;
    case "result":
      console.log("完成:", message.result);
      break;
  }
};

// cpu-worker.ts
// Sums the square roots of the samples in a "process" request, posting a
// progress message every `chunkSize` items and the total at the end.
self.onmessage = ({ data: request }) => {
  if (request.type !== "process") return;

  const samples = request.data;
  const chunkSize = 10000;

  const reportProgress = (done: number): void => {
    self.postMessage({
      type: "progress",
      percent: Math.round((done / samples.length) * 100),
    });
  };

  let result = 0;
  for (let i = 0; i < samples.length; i += 1) {
    result += Math.sqrt(samples[i]);

    if (i % chunkSize !== 0) continue;
    reportProgress(i);
  }

  self.postMessage({ type: "result", result });
};

并行映射

/**
 * Maps `items` through a set of Web Workers, keeping results in input order.
 *
 * Each item is posted as `{ fn, item, index }`, and the worker is expected to
 * reply with `{ index, result }`. A worker receives its next item only after
 * replying to the previous one.
 *
 * @param items Values to process.
 * @param fn Name of the function the worker should apply (forwarded as-is).
 * @param workerUrl Script URL used to create every worker.
 * @param concurrency Upper bound on the number of workers. Values below 1 are
 *   treated as 1, and no more workers than items are created.
 * @returns Results indexed like `items`.
 * @throws Error if a worker reports an error or an undeserializable message.
 *   All workers are terminated before the rejection propagates.
 */
async function parallelMap<T, R>(
  items: T[],
  fn: string, // name of the function inside the worker
  workerUrl: URL,
  concurrency = 4
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  if (items.length === 0) {
    return results;
  }

  // Clamp so 0, negative or NaN cannot yield an empty pool that would
  // return without processing anything; never spawn more workers than items.
  const workerCount = Math.min(
    items.length,
    Math.max(1, Math.floor(concurrency) || 1)
  );

  const workers: Worker[] = [];
  try {
    for (let i = 0; i < workerCount; i++) {
      workers.push(new Worker(workerUrl));
    }

    let nextIndex = 0;
    let aborted = false;

    // Feeds one worker until the shared cursor runs past the last item.
    const drain = (worker: Worker): Promise<void> =>
      new Promise<void>((resolve, reject) => {
        const fail = (reason: Error): void => {
          aborted = true;
          reject(reason);
        };

        const dispatch = (): void => {
          if (aborted) return;
          if (nextIndex >= items.length) {
            resolve();
            return;
          }
          const index = nextIndex++;
          worker.postMessage({ fn, item: items[index], index });
        };

        worker.onmessage = (event) => {
          results[event.data.index] = event.data.result;
          dispatch();
        };
        // Without these the promise would never settle after a worker failure.
        worker.onerror = (event) => {
          fail(new Error(`Worker failed: ${event.message || "unknown error"}`));
        };
        worker.onmessageerror = () => {
          fail(new Error("Worker sent a message that could not be deserialized"));
        };

        dispatch();
      });

    await Promise.all(workers.map((worker) => drain(worker)));
    return results;
  } finally {
    // Runs on success and failure alike so no worker is left behind.
    workers.forEach((w) => w.terminate());
  }
}

消息通道

// A MessageChannel gives two workers a direct line to each other
const { port1, port2 } = new MessageChannel();

const worker1 = new Worker(new URL("./worker1.ts", import.meta.url));
const worker2 = new Worker(new URL("./worker2.ts", import.meta.url));

// Each worker receives one end of the channel; ports are sent by transfer
worker1.postMessage({ port: port1 }, [port1]);
worker2.postMessage({ port: port2 }, [port2]);

// worker1.ts
let port: MessagePort;
self.onmessage = ({ data }) => {
  if (!data.port) return;

  // Keep the port, listen on it, then greet whoever holds the other end.
  port = data.port;
  port.onmessage = (reply) => console.log("来自 worker2:", reply.data);
  port.postMessage("Hello from worker1!");
};

错误处理

const worker = new Worker(new URL("./worker.ts", import.meta.url));

// Fires when the worker throws an error it did not catch itself.
worker.onerror = (error) => {
  console.error("Worker 中未捕获的错误:", error.message);
  error.preventDefault(); // stop the error from propagating further
};

// Fires when an incoming message cannot be deserialized.
worker.onmessageerror = () => {
  console.error("消息反序列化失败");
};

// Inside the worker
// The standard global `onerror` signature is
// (message, source, lineno, colno, error), so the first argument may be a
// plain string rather than an ErrorEvent. Accept both forms instead of
// assuming `.message` exists on it.
self.onerror = (event, _source, _lineno, _colno, error) => {
  let message: string;
  if (error?.message) {
    message = error.message;
  } else if (typeof event === "string") {
    message = event;
  } else if ("message" in event && typeof event.message === "string") {
    message = event.message;
  } else {
    message = "Unknown worker error";
  }
  self.postMessage({ type: "error", message });
};

终止

const worker = new Worker(new URL("./worker.ts", import.meta.url));

// Ask the worker to shut down gracefully
worker.postMessage({ type: "shutdown" });

// Force-terminate if it has not closed within the grace period
const forceKillTimer = setTimeout(() => {
  worker.terminate();
}, 5000);

// Bun documents a "close" event emitted once a worker has terminated.
// Cancel the pending force-kill then, so a worker that shut down cleanly
// does not leave a 5 s timer holding the process open.
// NOTE(review): confirm that `self.close()` inside the worker triggers it.
worker.addEventListener("close", () => {
  clearTimeout(forceKillTimer);
});

// Inside the worker
self.onmessage = (event) => {
  if (event.data.type === "shutdown") {
    // Release resources here, then close
    self.close();
  }
};

常见错误

| 错误 | 原因 | 修复方法 |
| --- | --- | --- |
| Worker 未找到 | URL 错误 | 检查 worker 文件路径 |
| 无法序列化 | 非可转移数据 | 使用可转移对象 |
| DataCloneError | 消息中包含函数/DOM | 只发送可序列化数据 |
| Worker 已终止 | 提前终止 | 检查终止逻辑 |

何时加载参考

在以下情况下加载 references/optimization.md:

  • Worker 池调优
  • 内存管理
  • 性能分析

在以下情况下加载 references/patterns.md:

  • 复杂协调
  • 背压处理
  • 错误恢复