name: websocket-realtime description: 使用WebSocket、Socket.io、Server-Sent Events和扩展策略的实时通信模式
WebSocket 与 实时通信
WebSocket 服务器
import { WebSocketServer, WebSocket } from "ws";
const wss = new WebSocketServer({ port: 8080 });
const rooms = new Map<string, Set<WebSocket>>();
wss.on("connection", (ws, req) => {
const userId = authenticateFromUrl(req.url);
if (!userId) {
ws.close(4001, "Unauthorized");
return;
}
ws.on("message", (data) => {
const message = JSON.parse(data.toString());
switch (message.type) {
case "join":
joinRoom(message.room, ws);
break;
case "leave":
leaveRoom(message.room, ws);
break;
case "broadcast":
broadcastToRoom(message.room, message.payload, ws);
break;
}
});
ws.on("close", () => {
rooms.forEach((members) => members.delete(ws));
});
ws.send(JSON.stringify({ type: "connected", userId }));
});
function joinRoom(room: string, ws: WebSocket) {
if (!rooms.has(room)) rooms.set(room, new Set());
rooms.get(room)!.add(ws);
}
function broadcastToRoom(room: string, payload: unknown, sender: WebSocket) {
const members = rooms.get(room);
if (!members) return;
const message = JSON.stringify({ type: "message", room, payload });
members.forEach((client) => {
if (client !== sender && client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}
使用 Socket.io 与房间
import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import { createClient } from "redis";
const io = new Server(httpServer, {
cors: { origin: "https://app.example.com" },
pingTimeout: 20000,
pingInterval: 25000,
});
const pubClient = createClient({ url: "redis://localhost:6379" });
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
io.adapter(createAdapter(pubClient, subClient));
io.use(async (socket, next) => {
const token = socket.handshake.auth.token;
try {
socket.data.user = verifyToken(token);
next();
} catch {
next(new Error("Authentication failed"));
}
});
io.on("connection", (socket) => {
socket.join(`user:${socket.data.user.id}`);
socket.on("chat:join", (roomId) => {
socket.join(`chat:${roomId}`);
socket.to(`chat:${roomId}`).emit("chat:userJoined", socket.data.user);
});
socket.on("chat:message", async ({ roomId, text }) => {
const message = await saveMessage(roomId, socket.data.user.id, text);
io.to(`chat:${roomId}`).emit("chat:message", message);
});
socket.on("disconnect", () => {
console.log(`User ${socket.data.user.id} disconnected`);
});
});
服务器发送事件 (SSE)
app.get("/events/:userId", authenticate, (req, res) => {
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
const sendEvent = (event: string, data: unknown) => {
res.write(`event: ${event}
`);
res.write(`data: ${JSON.stringify(data)}
`);
};
sendEvent("connected", { userId: req.params.userId });
const interval = setInterval(() => {
res.write(":heartbeat
");
}, 30000);
const listener = (message: string) => {
const event = JSON.parse(message);
sendEvent(event.type, event.data);
};
redis.subscribe(`user:${req.params.userId}`, listener);
req.on("close", () => {
clearInterval(interval);
redis.unsubscribe(`user:${req.params.userId}`, listener);
});
});
SSE 比 WebSocket 更简单,适用于服务器到客户端的单向流式传输。无需特殊配置即可通过 HTTP 代理和负载均衡器工作。
客户端重连
class ReconnectingWebSocket {
private ws: WebSocket | null = null;
private retryCount = 0;
private maxRetries = 10;
constructor(private url: string) {
this.connect();
}
private connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => { this.retryCount = 0; };
this.ws.onclose = () => { this.scheduleReconnect(); };
this.ws.onerror = () => { this.ws?.close(); };
}
private scheduleReconnect() {
if (this.retryCount >= this.maxRetries) return;
const delay = Math.min(1000 * 2 ** this.retryCount, 30000);
this.retryCount++;
setTimeout(() => this.connect(), delay);
}
send(data: string) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(data);
}
}
}
反模式
- 在握手过程中未对 WebSocket 连接进行身份验证
- 发送无限大小的负载而不限制消息大小
- 缺少心跳/乒乓机制以检测过时连接
- 当 SSE 足够时使用 WebSocket(仅服务器到客户端)
- 未使用 Redis 适配器进行 Socket.io 的水平扩展
- 使用同步处理消息阻塞事件循环
清单
- [ ] WebSocket 连接在握手时进行身份验证
- [ ] 对传入数据强制执行消息大小限制
- [ ] 心跳机制检测并关闭过时连接
- [ ] 客户端实现指数退避重连
- [ ] 使用 Redis 发布/订阅适配器进行多服务器部署
- [ ] 当通信仅服务器到客户端时使用 SSE
- [ ] 断开连接时清理房间/频道成员资格
- [ ] 应用速率限制以防止消息泛滥