WebSocket实时通信 websocket-realtime

这个技能涉及使用WebSocket、Socket.io和Server-Sent Events(SSE)等技术实现实时通信。它包括服务器端实现如WebSocket服务器、房间管理、身份认证,客户端重连机制,反模式避免和最佳实践清单。适用于构建聊天应用、实时数据推送等场景。关键词:实时通信、WebSocket、Socket.io、SSE、客户端重连、反模式、后端开发、前端开发、缩放策略、Redis适配器。

后端开发 0 次安装 0 次浏览 更新于 3/8/2026

name: websocket-realtime description: 使用WebSocket、Socket.io、Server-Sent Events和扩展策略的实时通信模式

WebSocket 与 实时通信

WebSocket 服务器

import { WebSocketServer, WebSocket } from "ws";

const wss = new WebSocketServer({ port: 8080 });

const rooms = new Map<string, Set<WebSocket>>();

wss.on("connection", (ws, req) => {
  const userId = authenticateFromUrl(req.url);
  if (!userId) {
    ws.close(4001, "Unauthorized");
    return;
  }

  ws.on("message", (data) => {
    const message = JSON.parse(data.toString());

    switch (message.type) {
      case "join":
        joinRoom(message.room, ws);
        break;
      case "leave":
        leaveRoom(message.room, ws);
        break;
      case "broadcast":
        broadcastToRoom(message.room, message.payload, ws);
        break;
    }
  });

  ws.on("close", () => {
    rooms.forEach((members) => members.delete(ws));
  });

  ws.send(JSON.stringify({ type: "connected", userId }));
});

function joinRoom(room: string, ws: WebSocket) {
  if (!rooms.has(room)) rooms.set(room, new Set());
  rooms.get(room)!.add(ws);
}

function broadcastToRoom(room: string, payload: unknown, sender: WebSocket) {
  const members = rooms.get(room);
  if (!members) return;
  const message = JSON.stringify({ type: "message", room, payload });
  members.forEach((client) => {
    if (client !== sender && client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  });
}

使用 Socket.io 与房间

import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import { createClient } from "redis";

const io = new Server(httpServer, {
  cors: { origin: "https://app.example.com" },
  pingTimeout: 20000,
  pingInterval: 25000,
});

const pubClient = createClient({ url: "redis://localhost:6379" });
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
io.adapter(createAdapter(pubClient, subClient));

io.use(async (socket, next) => {
  const token = socket.handshake.auth.token;
  try {
    socket.data.user = verifyToken(token);
    next();
  } catch {
    next(new Error("Authentication failed"));
  }
});

io.on("connection", (socket) => {
  socket.join(`user:${socket.data.user.id}`);

  socket.on("chat:join", (roomId) => {
    socket.join(`chat:${roomId}`);
    socket.to(`chat:${roomId}`).emit("chat:userJoined", socket.data.user);
  });

  socket.on("chat:message", async ({ roomId, text }) => {
    const message = await saveMessage(roomId, socket.data.user.id, text);
    io.to(`chat:${roomId}`).emit("chat:message", message);
  });

  socket.on("disconnect", () => {
    console.log(`User ${socket.data.user.id} disconnected`);
  });
});

服务器发送事件 (SSE)

app.get("/events/:userId", authenticate, (req, res) => {
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  const sendEvent = (event: string, data: unknown) => {
    res.write(`event: ${event}
`);
    res.write(`data: ${JSON.stringify(data)}

`);
  };

  sendEvent("connected", { userId: req.params.userId });

  const interval = setInterval(() => {
    res.write(":heartbeat

");
  }, 30000);

  const listener = (message: string) => {
    const event = JSON.parse(message);
    sendEvent(event.type, event.data);
  };

  redis.subscribe(`user:${req.params.userId}`, listener);

  req.on("close", () => {
    clearInterval(interval);
    redis.unsubscribe(`user:${req.params.userId}`, listener);
  });
});

SSE 比 WebSocket 更简单,适用于服务器到客户端的单向流式传输。无需特殊配置即可通过 HTTP 代理和负载均衡器工作。

客户端重连

class ReconnectingWebSocket {
  private ws: WebSocket | null = null;
  private retryCount = 0;
  private maxRetries = 10;

  constructor(private url: string) {
    this.connect();
  }

  private connect() {
    this.ws = new WebSocket(this.url);
    this.ws.onopen = () => { this.retryCount = 0; };
    this.ws.onclose = () => { this.scheduleReconnect(); };
    this.ws.onerror = () => { this.ws?.close(); };
  }

  private scheduleReconnect() {
    if (this.retryCount >= this.maxRetries) return;
    const delay = Math.min(1000 * 2 ** this.retryCount, 30000);
    this.retryCount++;
    setTimeout(() => this.connect(), delay);
  }

  send(data: string) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(data);
    }
  }
}

反模式

  • 在握手过程中未对 WebSocket 连接进行身份验证
  • 发送无限大小的负载而不限制消息大小
  • 缺少心跳/乒乓机制以检测过时连接
  • 当 SSE 足够时使用 WebSocket(仅服务器到客户端)
  • 未使用 Redis 适配器进行 Socket.io 的水平扩展
  • 使用同步处理消息阻塞事件循环

清单

  • [ ] WebSocket 连接在握手时进行身份验证
  • [ ] 对传入数据强制执行消息大小限制
  • [ ] 心跳机制检测并关闭过时连接
  • [ ] 客户端实现指数退避重连
  • [ ] 使用 Redis 发布/订阅适配器进行多服务器部署
  • [ ] 当通信仅服务器到客户端时使用 SSE
  • [ ] 断开连接时清理房间/频道成员资格
  • [ ] 应用速率限制以防止消息泛滥