name: Bun WebSocket 服务器 description: 当用户询问"Bun中的WebSocket"、“实时通信”、“Bun.serve websocket”、“ws服务器”、“socket连接”、“pub/sub”、“广播消息”、"WebSocket升级"或使用Bun构建实时应用程序时,应使用此技能。 version: 1.0.0
Bun WebSocket 服务器
Bun内置了与Bun.serve()集成的WebSocket支持。
快速开始
const server = Bun.serve({
fetch(req, server) {
// 升级到WebSocket
if (server.upgrade(req)) {
return; // 升级成功
}
return new Response("不是WebSocket请求", { status: 400 });
},
websocket: {
open(ws) {
console.log("客户端连接");
},
message(ws, message) {
console.log("接收:", message);
ws.send(`回显: ${message}`);
},
close(ws) {
console.log("客户端断开连接");
},
},
});
console.log(`WebSocket服务器运行在 ws://localhost:${server.port}`);
WebSocket处理器
Bun.serve({
fetch(req, server) {
server.upgrade(req);
},
websocket: {
// 客户端连接
open(ws) {
console.log("新连接");
},
// 消息接收
message(ws, message) {
// message 是 string | Buffer
if (typeof message === "string") {
console.log("文本:", message);
} else {
console.log("二进制:", message);
}
},
// 连接关闭
close(ws, code, reason) {
console.log(`关闭: ${code} - ${reason}`);
},
// 缓冲清除事件
drain(ws) {
console.log("缓冲清除");
},
// 接收到Ping
ping(ws, data) {
// Pong自动发送
},
// 接收到Pong
pong(ws, data) {
console.log("接收到Pong");
},
},
});
发送消息
websocket: {
message(ws, message) {
// 发送文本
ws.send("你好");
// 发送JSON
ws.send(JSON.stringify({ type: "问候", data: "你好" }));
// 发送二进制
ws.send(new Uint8Array([1, 2, 3]));
ws.send(Buffer.from("二进制数据"));
// 带压缩发送
ws.send("压缩消息", true);
// 检查缓冲是否满
const bufferedAmount = ws.send("数据");
if (bufferedAmount > 1024 * 1024) {
console.log("缓冲即将满");
}
},
}
为连接附加数据
interface UserData {
id: string;
name: string;
joinedAt: Date;
}
Bun.serve<UserData>({
fetch(req, server) {
const url = new URL(req.url);
const userId = url.searchParams.get("userId");
// 升级时附加数据
server.upgrade(req, {
data: {
id: userId,
name: "用户 " + userId,
joinedAt: new Date(),
},
});
},
websocket: {
open(ws) {
// 访问附加数据
console.log(`${ws.data.name} 连接`);
},
message(ws, message) {
console.log(`${ws.data.name}: ${message}`);
},
},
});
Pub/Sub (主题)
Bun.serve({
fetch(req, server) {
const url = new URL(req.url);
const room = url.searchParams.get("room") || "general";
server.upgrade(req, {
data: { room },
});
},
websocket: {
open(ws) {
// 订阅主题
ws.subscribe(ws.data.room);
// 发布到主题(排除发送者)
ws.publish(ws.data.room, `用户加入 ${ws.data.room}`);
},
message(ws, message) {
// 广播给房间内所有用户(排除发送者)
ws.publish(ws.data.room, message);
},
close(ws) {
// 取消订阅(关闭时自动)
ws.unsubscribe(ws.data.room);
ws.publish(ws.data.room, "用户离开");
},
},
});
广播给所有客户端
Bun.serve({
fetch(req, server) {
server.upgrade(req);
},
websocket: {
open(ws) {
// 订阅全局主题
ws.subscribe("global");
},
message(ws, message) {
// 广播给所有客户端包括发送者
server.publish("global", message);
},
},
});
服务器级发布
const server = Bun.serve({
fetch(req, server) {
const url = new URL(req.url);
// HTTP端点发布
if (url.pathname === "/broadcast") {
const message = url.searchParams.get("msg");
server.publish("global", message);
return new Response("已广播");
}
server.upgrade(req);
},
websocket: {
open(ws) {
ws.subscribe("global");
},
},
});
// 也可以在fetch外部发布
setInterval(() => {
server.publish("global", `服务器时间: ${new Date().toISOString()}`);
}, 5000);
WebSocket选项
Bun.serve({
websocket: {
// 最大消息大小(默认16MB)
maxPayloadLength: 1024 * 1024, // 1MB
// 空闲超时时间(秒)(默认120)
idleTimeout: 60,
// 背压限制
backpressureLimit: 1024 * 1024,
// 启用压缩
perMessageDeflate: true,
// 或带选项
perMessageDeflate: {
compress: "shared",
decompress: "shared",
},
// 发送/接收ping
sendPings: true,
// 处理器
open(ws) {},
message(ws, message) {},
close(ws) {},
},
});
客户端连接
// 浏览器
const ws = new WebSocket("ws://localhost:3000");
ws.onopen = () => {
ws.send("你好服务器!");
};
ws.onmessage = (event) => {
console.log("接收:", event.data);
};
ws.onclose = () => {
console.log("断开连接");
};
认证
Bun.serve({
fetch(req, server) {
// 升级前验证认证
const token = req.headers.get("Authorization");
if (!verifyToken(token)) {
return new Response("未授权", { status: 401 });
}
const user = decodeToken(token);
server.upgrade(req, {
data: { userId: user.id },
});
},
websocket: {
open(ws) {
console.log(`认证用户 ${ws.data.userId} 连接`);
},
},
});
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
升级失败 |
无效请求 | 检查升级头 |
连接关闭 |
客户端断开 | 在关闭处理器中处理 |
消息过大 |
超过maxPayloadLength | 增加限制或分块数据 |
背压 |
客户端慢 | 检查缓冲,等待清除 |
常见模式
聊天室
Bun.serve({
fetch(req, server) {
const url = new URL(req.url);
const username = url.searchParams.get("user") || "匿名";
server.upgrade(req, {
data: { username },
});
},
websocket: {
open(ws) {
ws.subscribe("chat");
ws.publish("chat", `${ws.data.username} 加入`);
},
message(ws, message) {
ws.publish("chat", `${ws.data.username}: ${message}`);
},
close(ws) {
ws.publish("chat", `${ws.data.username} 离开`);
},
},
});
何时加载参考
加载 references/compression.md 当:
- perMessageDeflate配置
- 压缩调优
- 二进制消息处理
加载 references/scaling.md 当:
- 多个服务器实例
- Redis pub/sub集成
- 水平扩展