名称: websockets-realtime
描述: 使用WebSockets、Server-Sent Events和相关技术进行实时通信。在构建聊天、实时更新、协作功能或任何实时功能时使用。
WebSockets 与实时通信
构建实时应用程序的综合指南。
实时技术
比较
| 技术 |
方向 |
使用案例 |
| WebSocket |
双向 |
聊天、游戏、协作 |
| Server-Sent Events |
服务器 → 客户端 |
实时信息流、通知 |
| 长轮询 |
模拟双向 |
后备、简单更新 |
| WebRTC |
对等网络 |
视频通话、文件共享 |
何时使用什么
WEBSOCKETS:
✓ 聊天应用程序
✓ 实时协作
✓ 游戏
✓ 金融交易
✓ IoT 仪表板
✓ 任何双向通信
SERVER-SENT EVENTS (SSE):
✓ 实时信息流(新闻、体育)
✓ 通知
✓ 进度更新
✓ 仅服务器发起的更新
LONG POLLING:
✓ 当WebSocket不可用时作为后备
✓ 简单、不频繁的更新
✓ 在严格防火墙后
WEBRTC:
✓ 视频/音频通话
✓ 屏幕共享
✓ 对等文件传输
WebSocket 基础
WebSockets 如何工作
HTTP 升级握手:
┌──────┐ ┌──────┐
│客户端│ GET /ws HTTP/1.1 │服务器│
│ │ Upgrade: websocket │ │
│ │ ──────────────────> │ │
│ │ │ │
│ │ HTTP/1.1 101 │ │
│ │ 切换协议 │ │
│ │ <────────────────── │ │
└──────┘ └──────┘
握手后:
┌──────┐ ┌──────┐
│客户端│ <═══════════════════>│服务器│
│ │ 全双工 TCP │ │
│ │ 二进制或文本 │ │
└──────┘ └──────┘
客户端实现
// 基本WebSocket客户端
const ws = new WebSocket("wss://api.example.com/ws");
ws.onopen = () => {
console.log("已连接");
ws.send(JSON.stringify({ type: "subscribe", channel: "updates" }));
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("接收到:", data);
};
ws.onerror = (error) => {
console.error("WebSocket错误:", error);
};
ws.onclose = (event) => {
console.log("断开连接:", event.code, event.reason);
};
// 发送消息
ws.send(JSON.stringify({ type: "message", content: "你好!" }));
// 关闭连接
ws.close(1000, "正常关闭");
重连逻辑
class ReconnectingWebSocket {
private ws: WebSocket | null = null;
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
private reconnectDelay = 1000;
constructor(private url: string) {
this.connect();
}
private connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log("已连接");
this.reconnectAttempts = 0;
};
this.ws.onclose = (event) => {
if (event.code !== 1000) {
this.reconnect();
}
};
this.ws.onerror = () => {
this.ws?.close();
};
}
private reconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error("达到最大重连尝试次数");
return;
}
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(
`在 ${delay}ms 后重连 (尝试 ${this.reconnectAttempts})`,
);
setTimeout(() => this.connect(), delay);
}
send(data: unknown) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
}
}
}
服务器实现 (Node.js)
ws 库
import { WebSocketServer, WebSocket } from "ws";
import { createServer } from "http";
const server = createServer();
const wss = new WebSocketServer({ server });
// 跟踪连接的客户端
const clients = new Set<WebSocket>();
wss.on("connection", (ws, request) => {
console.log("客户端已连接");
clients.add(ws);
// 发送欢迎消息
ws.send(JSON.stringify({ type: "connected", clientCount: clients.size }));
ws.on("message", (data) => {
try {
const message = JSON.parse(data.toString());
handleMessage(ws, message);
} catch (error) {
ws.send(JSON.stringify({ type: "error", message: "无效的JSON" }));
}
});
ws.on("close", () => {
clients.delete(ws);
console.log("客户端断开连接");
});
ws.on("error", (error) => {
console.error("WebSocket错误:", error);
});
// 心跳检测以识别空闲连接
ws.isAlive = true;
ws.on("pong", () => {
ws.isAlive = true;
});
});
// 心跳间隔
const heartbeatInterval = setInterval(() => {
wss.clients.forEach((ws) => {
if (!ws.isAlive) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
wss.on("close", () => {
clearInterval(heartbeatInterval);
});
function handleMessage(ws: WebSocket, message: any) {
switch (message.type) {
case "broadcast":
broadcast(message.content);
break;
case "private":
// 处理私密消息
break;
default:
ws.send(
JSON.stringify({ type: "error", message: "未知消息类型" }),
);
}
}
function broadcast(content: any) {
const message = JSON.stringify({ type: "broadcast", content });
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}
server.listen(3000);
import { Server } from "socket.io";
import { createServer } from "http";
const httpServer = createServer();
const io = new Server(httpServer, {
cors: {
origin: "https://example.com",
methods: ["GET", "POST"],
},
});
// 聊天命名空间
const chat = io.of("/chat");
chat.on("connection", (socket) => {
console.log("用户已连接:", socket.id);
// 加入房间
socket.on("join", (room: string) => {
socket.join(room);
socket.to(room).emit("user_joined", { userId: socket.id });
});
// 处理消息
socket.on("message", (data: { room: string; content: string }) => {
chat.to(data.room).emit("message", {
from: socket.id,
content: data.content,
timestamp: Date.now(),
});
});
// 离开房间
socket.on("leave", (room: string) => {
socket.leave(room);
socket.to(room).emit("user_left", { userId: socket.id });
});
socket.on("disconnect", () => {
console.log("用户断开连接:", socket.id);
});
});
// 认证中间件
io.use((socket, next) => {
const token = socket.handshake.auth.token;
if (validateToken(token)) {
socket.data.user = decodeToken(token);
next();
} else {
next(new Error("认证错误"));
}
});
httpServer.listen(3000);
Server-Sent Events (SSE)
服务器实现
import express from "express";
const app = express();
app.get("/events", (req, res) => {
// 设置SSE头
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
// 发送初始事件
res.write("event: connected
");
res.write('data: {"status": "connected"}
');
// 发送定期更新
const interval = setInterval(() => {
const data = JSON.stringify({
timestamp: Date.now(),
value: Math.random(),
});
res.write(`data: ${data}
`);
}, 1000);
// 断开连接时清理
req.on("close", () => {
clearInterval(interval);
res.end();
});
});
app.listen(3000);
客户端实现
const eventSource = new EventSource("/events");
eventSource.onopen = () => {
console.log("SSE连接已打开");
};
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("接收到:", data);
};
eventSource.addEventListener("connected", (event) => {
console.log("连接事件:", event.data);
});
eventSource.onerror = (error) => {
console.error("SSE错误:", error);
if (eventSource.readyState === EventSource.CLOSED) {
// 需要时的重连逻辑
}
};
// 关闭连接
eventSource.close();
消息协议
JSON 消息格式
// 定义消息类型
interface BaseMessage {
type: string;
timestamp: number;
id: string;
}
interface ChatMessage extends BaseMessage {
type: "chat";
room: string;
content: string;
sender: string;
}
interface PresenceMessage extends BaseMessage {
type: "presence";
status: "online" | "offline" | "away";
userId: string;
}
interface ErrorMessage extends BaseMessage {
type: "error";
code: string;
message: string;
}
type Message = ChatMessage | PresenceMessage | ErrorMessage;
// 类型安全的消息处理
function handleMessage(data: string) {
const message: Message = JSON.parse(data);
switch (message.type) {
case "chat":
displayChatMessage(message);
break;
case "presence":
updateUserPresence(message);
break;
case "error":
handleError(message);
break;
}
}
二进制协议
// 对于高性能需求,使用二进制格式
// MessagePack
import { encode, decode } from "@msgpack/msgpack";
const encoded = encode({ type: "position", x: 100, y: 200 });
ws.send(encoded);
ws.onmessage = (event) => {
const data = decode(event.data);
};
// Protocol Buffers
// 在 .proto 文件中定义架构,生成类型
// 消息更小,序列化更快
缩放 WebSockets
架构
负载均衡器
(粘性会话)
│
┌────────────────┼────────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 服务器 1│ │ 服务器 2│ │ 服务器 3│
│ (Node) │ │ (Node) │ │ (Node) │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└────────────────┼────────────────┘
│
┌─────────┐
│ Redis │
│ 发布/订阅 │
└─────────┘
Redis Pub/Sub 用于跨服务器消息
import Redis from "ioredis";
import { WebSocketServer } from "ws";
const pub = new Redis();
const sub = new Redis();
const wss = new WebSocketServer({ port: 3000 });
// 订阅频道
sub.subscribe("broadcast");
// 将Redis消息转发到本地客户端
sub.on("message", (channel, message) => {
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
});
// 发布消息到Redis
function broadcast(message: object) {
pub.publish("broadcast", JSON.stringify(message));
}
// 从WebSocket接收,发布到Redis
wss.on("connection", (ws) => {
ws.on("message", (data) => {
broadcast(JSON.parse(data.toString()));
});
});
import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import { createClient } from "redis";
const pubClient = createClient({ url: "redis://localhost:6379" });
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
const io = new Server();
io.adapter(createAdapter(pubClient, subClient));
// 现在消息自动在服务器间同步
io.emit("notification", { message: "所有服务器,大家好!" });
React 集成
自定义钩子
import { useEffect, useRef, useState, useCallback } from 'react';
interface UseWebSocketOptions {
url: string;
onMessage?: (data: any) => void;
reconnect?: boolean;
}
export function useWebSocket(options: UseWebSocketOptions) {
const { url, onMessage, reconnect = true } = options;
const wsRef = useRef<WebSocket | null>(null);
const [isConnected, setIsConnected] = useState(false);
const [lastMessage, setLastMessage] = useState<any>(null);
const connect = useCallback(() => {
const ws = new WebSocket(url);
ws.onopen = () => setIsConnected(true);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
setLastMessage(data);
onMessage?.(data);
};
ws.onclose = () => {
setIsConnected(false);
if (reconnect) {
setTimeout(connect, 3000);
}
};
wsRef.current = ws;
}, [url, onMessage, reconnect]);
useEffect(() => {
connect();
return () => {
wsRef.current?.close();
};
}, [connect]);
const send = useCallback((data: any) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify(data));
}
}, []);
return { isConnected, lastMessage, send };
}
// 用法
function ChatComponent() {
const { isConnected, lastMessage, send } = useWebSocket({
url: 'wss://api.example.com/chat',
onMessage: (data) => {
console.log('新消息:', data);
},
});
return (
<div>
<span>状态: {isConnected ? '已连接' : '断开连接'}</span>
<button onClick={() => send({ type: 'message', content: '你好!' })}>
发送
</button>
</div>
);
}
import { io, Socket } from 'socket.io-client';
import { createContext, useContext, useEffect, useState } from 'react';
const SocketContext = createContext<Socket | null>(null);
export function SocketProvider({ children }: { children: React.ReactNode }) {
const [socket, setSocket] = useState<Socket | null>(null);
useEffect(() => {
const newSocket = io('https://api.example.com', {
auth: { token: getAuthToken() },
});
setSocket(newSocket);
return () => {
newSocket.close();
};
}, []);
return (
<SocketContext.Provider value={socket}>{children}</SocketContext.Provider>
);
}
export function useSocket() {
const socket = useContext(SocketContext);
if (!socket) {
throw new Error('useSocket 必须在 SocketProvider 内使用');
}
return socket;
}
// 用法
function ChatRoom({ roomId }: { roomId: string }) {
const socket = useSocket();
const [messages, setMessages] = useState<Message[]>([]);
useEffect(() => {
socket.emit('join', roomId);
socket.on('message', (message: Message) => {
setMessages((prev) => [...prev, message]);
});
return () => {
socket.emit('leave', roomId);
socket.off('message');
};
}, [socket, roomId]);
const sendMessage = (content: string) => {
socket.emit('message', { room: roomId, content });
};
return (/* 渲染消息 */);
}
安全
认证
// 基于令牌的认证
const ws = new WebSocket("wss://api.example.com/ws");
ws.onopen = () => {
// 立即发送认证消息
ws.send(
JSON.stringify({
type: "auth",
token: localStorage.getItem("token"),
}),
);
};
// 服务器端验证
wss.on("connection", (ws, request) => {
let authenticated = false;
const authTimeout = setTimeout(() => {
if (!authenticated) {
ws.close(4001, "认证超时");
}
}, 5000);
ws.on("message", (data) => {
const message = JSON.parse(data.toString());
if (message.type === "auth") {
if (validateToken(message.token)) {
authenticated = true;
clearTimeout(authTimeout);
ws.send(JSON.stringify({ type: "auth_success" }));
} else {
ws.close(4002, "无效令牌");
}
} else if (!authenticated) {
ws.close(4003, "未认证");
}
});
});
速率限制
const rateLimits = new Map<WebSocket, { count: number; timestamp: number }>();
function checkRateLimit(ws: WebSocket): boolean {
const now = Date.now();
const limit = rateLimits.get(ws);
if (!limit || now - limit.timestamp > 1000) {
rateLimits.set(ws, { count: 1, timestamp: now });
return true;
}
if (limit.count >= 10) {
// 每秒10条消息
return false;
}
limit.count++;
return true;
}
ws.on("message", (data) => {
if (!checkRateLimit(ws)) {
ws.send(JSON.stringify({ type: "error", message: "超出速率限制" }));
return;
}
// 处理消息
});
最佳实践
应做:
- 在生产中使用 WSS (WebSocket Secure)
- 实现心跳/乒乓机制
- 优雅处理重连
- 认证连接
- 验证所有传入消息
- 使用消息ID进行确认
- 实现背压处理
- 监控连接健康
不应做:
- 未经验证信任客户端数据
- 未加密发送敏感数据
- 无限期保持连接打开
- 忽略断开连接处理
- 阻塞消息处理器
- 发送无界数据
- 忘记水平缩放
故障排除
常见问题
| 问题 |
原因 |
解决方案 |
| 连接断开 |
空闲超时 |
实现心跳 |
| 消息丢失 |
无确认 |
添加消息ID + 确认 |
| 高延迟 |
大消息 |
使用二进制,压缩 |
| 内存泄漏 |
未关闭的连接 |
正确清理 |
| 跨源被阻止 |
缺少CORS |
配置服务器CORS |
调试日志
// 开发日志
if (process.env.NODE_ENV === "development") {
ws.on("message", (data) => {
console.log("← 接收到:", JSON.parse(data.toString()));
});
const originalSend = ws.send.bind(ws);
ws.send = (data: string) => {
console.log("→ 发送:", JSON.parse(data));
originalSend(data);
};
}