实时特性
概述
实现客户端与服务器之间的实时双向通信,以便即时数据同步和实时更新。
何时使用
- 聊天和消息应用
- 实时仪表板和分析
- 协作编辑(Google文档风格)
- 实时通知
- 现场体育比分或股票行情
- 多人游戏
- 现场拍卖或竞价系统
- IoT设备监控
- 实时位置跟踪
技术比较
| 技术 | 方向 | 用例 | 浏览器支持 |
|---|---|---|---|
| WebSockets | 双向 | 聊天,游戏,协作 | 优秀 |
| SSE | 服务器 → 客户端 | 实时更新,通知 | 良好(不支持IE) |
| 长轮询 | 请求/响应 | 回退,简单更新 | 通用 |
| WebRTC | 点对点 | 视频/音频流 | 良好 |
实现示例
1. WebSocket服务器(Node.js)
// server.ts
import WebSocket, { WebSocketServer } from 'ws';
import { createServer } from 'http';
interface Message {
type: 'join' | 'message' | 'leave' | 'typing';
userId: string;
username: string;
content?: string;
timestamp: number;
}
interface Client {
ws: WebSocket;
userId: string;
username: string;
roomId: string;
}
class ChatServer {
private wss: WebSocketServer;
private clients: Map<string, Client> = new Map();
private rooms: Map<string, Set<string>> = new Map();
constructor(port: number) {
const server = createServer();
this.wss = new WebSocketServer({ server });
this.wss.on('connection', this.handleConnection.bind(this));
server.listen(port, () => {
console.log(`WebSocket服务器运行在端口${port}`);
});
// 心跳检测以检测断开连接
this.startHeartbeat();
}
private handleConnection(ws: WebSocket): void {
const clientId = this.generateId();
console.log(`新连接:${clientId}`);
ws.on('message', (data: string) => {
try {
const message: Message = JSON.parse(data.toString());
this.handleMessage(clientId, message, ws);
} catch (error) {
console.error('无效的消息格式:', error);
}
});
ws.on('close', () => {
this.handleDisconnect(clientId);
});
ws.on('error', (error) => {
console.error(`WebSocket错误${clientId}:`, error);
});
// 保持连接活跃
(ws as any).isAlive = true;
ws.on('pong', () => {
(ws as any).isAlive = true;
});
}
private handleMessage(
clientId: string,
message: Message,
ws: WebSocket
): void {
switch (message.type) {
case 'join':
this.handleJoin(clientId, message, ws);
break;
case 'message':
this.broadcastToRoom(clientId, message);
break;
case 'typing':
this.broadcastToRoom(clientId, message, [clientId]);
break;
case 'leave':
this.handleDisconnect(clientId);
break;
}
}
private handleJoin(
clientId: string,
message: Message,
ws: WebSocket
): void {
const client: Client = {
ws,
userId: message.userId,
username: message.username,
roomId: 'general' // 可以是动态的
};
this.clients.set(clientId, client);
// 添加到房间
if (!this.rooms.has(client.roomId)) {
this.rooms.set(client.roomId, new Set());
}
this.rooms.get(client.roomId)!.add(clientId);
// 通知房间
this.broadcastToRoom(clientId, {
type: 'join',
userId: message.userId,
username: message.username,
timestamp: Date.now()
});
// 发送房间状态给新用户
this.sendRoomState(clientId);
}
private broadcastToRoom(
senderId: string,
message: Message,
exclude: string[] = []
): void {
const sender = this.clients.get(senderId);
if (!sender) return;
const roomClients = this.rooms.get(sender.roomId);
if (!roomClients) return;
const payload = JSON.stringify(message);
roomClients.forEach(clientId => {
if (!exclude.includes(clientId)) {
const client = this.clients.get(clientId);
if (client && client.ws.readyState === WebSocket.OPEN) {
client.ws.send(payload);
}
}
});
}
private sendRoomState(clientId: string): void {
const client = this.clients.get(clientId);
if (!client) return;
const roomClients = this.rooms.get(client.roomId);
if (!roomClients) return;
const users = Array.from(roomClients)
.map(id => this.clients.get(id))
.filter(c => c && c.userId !== client.userId)
.map(c => ({ userId: c!.userId, username: c!.username }));
client.ws.send(JSON.stringify({
type: 'room_state',
users,
timestamp: Date.now()
}));
}
private handleDisconnect(clientId: string): void {
const client = this.clients.get(clientId);
if (!client) return;
// 从房间移除
const roomClients = this.rooms.get(client.roomId);
if (roomClients) {
roomClients.delete(clientId);
// 通知其他人
this.broadcastToRoom(clientId, {
type: 'leave',
userId: client.userId,
username: client.username,
timestamp: Date.now()
});
}
this.clients.delete(clientId);
console.log(`客户端断开连接:${clientId}`);
}
private startHeartbeat(): void {
setInterval(() => {
this.wss.clients.forEach((ws: any) => {
if (ws.isAlive === false) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
}
private generateId(): string {
return Math.random().toString(36).substr(2, 9);
}
}
// 启动服务器
new ChatServer(8080);
2. WebSocket客户端(React)
// useWebSocket.ts
import { useEffect, useRef, useState, useCallback } from 'react';
interface UseWebSocketOptions {
url: string;
onMessage?: (data: any) => void;
onOpen?: () => void;
onClose?: () => void;
onError?: (error: Event) => void;
reconnectAttempts?: number;
reconnectInterval?: number;
}
export const useWebSocket = (options: UseWebSocketOptions) => {
const {
url,
onMessage,
onOpen,
onClose,
onError,
reconnectAttempts = 5,
reconnectInterval = 3000
} = options;
const [isConnected, setIsConnected] = useState(false);
const [connectionStatus, setConnectionStatus] = useState<
'connecting' | 'connected' | 'disconnected' | 'error'
>('connecting');
const wsRef = useRef<WebSocket | null>(null);
const reconnectCountRef = useRef(0);
const reconnectTimeoutRef = useRef<NodeJS.Timeout>();
const connect = useCallback(() => {
try {
setConnectionStatus('connecting');
const ws = new WebSocket(url);
ws.onopen = () => {
console.log('WebSocket已连接');
setIsConnected(true);
setConnectionStatus('connected');
reconnectCountRef.current = 0;
onOpen?.();
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
onMessage?.(data);
} catch (error) {
console.error('消息解析失败:', error);
}
};
ws.onerror = (error) => {
console.error('WebSocket错误:', error);
setConnectionStatus('error');
onError?.(error);
};
ws.onclose = () => {
console.log('WebSocket断开连接');
setIsConnected(false);
setConnectionStatus('disconnected');
onClose?.();
// 尝试重新连接
if (reconnectCountRef.current < reconnectAttempts) {
reconnectCountRef.current++;
console.log(
`重新连接... (${reconnectCountRef.current}/${reconnectAttempts})`
);
reconnectTimeoutRef.current = setTimeout(() => {
connect();
}, reconnectInterval);
}
};
wsRef.current = ws;
} catch (error) {
console.error('连接失败:', error);
setConnectionStatus('error');
}
}, [url, onMessage, onOpen, onClose, onError, reconnectAttempts, reconnectInterval]);
const disconnect = useCallback(() => {
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
}
wsRef.current?.close();
wsRef.current = null;
}, []);
const send = useCallback((data: any) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify(data));
} else {
console.warn('WebSocket未连接');
}
}, []);
useEffect(() => {
connect();
return () => {
disconnect();
};
}, [connect, disconnect]);
return {
isConnected,
connectionStatus,
send,
disconnect,
reconnect: connect
};
};
// 组件中的使用
const ChatComponent: React.FC = () => {
const [messages, setMessages] = useState<any[]>([]);
const { isConnected, send } = useWebSocket({
url: 'ws://localhost:8080',
onMessage: (data) => {
if (data.type === 'message') {
setMessages(prev => [...prev, data]);
}
},
onOpen: () => {
send({
type: 'join',
userId: 'user123',
username: 'John Doe',
timestamp: Date.now()
});
}
});
const sendMessage = (content: string) => {
send({
type: 'message',
userId: 'user123',
username: 'John Doe',
content,
timestamp: Date.now()
});
};
return (
<div>
<div>状态:{isConnected ? '已连接' : '未连接'}</div>
<div>
{messages.map((msg, i) => (
<div key={i}>{msg.username}: {msg.content}</div>
))}
</div>
</div>
);
};
3. 服务器发送事件(SSE)
// server.ts - SSE端点
import express from 'express';
const app = express();
interface Client {
id: string;
res: express.Response;
}
class SSEManager {
private clients: Client[] = [];
addClient(id: string, res: express.Response): void {
// 设置SSE头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
this.clients.push({ id, res });
// 发送初始连接事件
this.sendToClient(id, {
type: 'connected',
clientId: id,
timestamp: Date.now()
});
console.log(`客户端${id}已连接。总数:${this.clients.length}`);
}
removeClient(id: string): void {
this.clients = this.clients.filter(client => client.id !== id);
console.log(`客户端${id}已断开连接。总数:${this.clients.length}`);
}
sendToClient(id: string, data: any): void {
const client = this.clients.find(c => c.id === id);
if (client) {
client.res.write(`data: ${JSON.stringify(data)}
`);
}
}
broadcast(data: any, excludeId?: string): void {
const message = `data: ${JSON.stringify(data)}
`;
this.clients.forEach(client => {
if (client.id !== excludeId) {
client.res.write(message);
}
});
}
sendEvent(event: string, data: any): void {
const message = `event: ${event}
data: ${JSON.stringify(data)}
`;
this.clients.forEach(client => {
client.res.write(message);
});
}
}
const sseManager = new SSEManager();
app.get('/events', (req, res) => {
const clientId = Math.random().toString(36).substr(2, 9);
sseManager.addClient(clientId, res);
req.on('close', () => {
sseManager.removeClient(clientId);
});
});
// 模拟实时更新
setInterval(() => {
sseManager.broadcast({
type: 'update',
value: Math.random() * 100,
timestamp: Date.now()
});
}, 5000);
app.listen(3000, () => {
console.log('SSE服务器运行在端口3000');
});
// client.ts - SSE客户端
class SSEClient {
private eventSource: EventSource | null = null;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
connect(url: string, handlers: {
onMessage?: (data: any) => void;
onError?: (error: Event) => void;
onOpen?: (){ () => void;
}
}): void {
this.eventSource = new EventSource(url);
this.eventSource.onopen = () => {
console.log('SSE已连接');
this.reconnectAttempts = 0;
handlers.onOpen?.();
};
this.eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
handlers.onMessage?.(data);
} catch (error) {
console.error('SSE数据解析失败:', error);
}
};
this.eventSource.onerror = (error) => {
console.error('SSE错误:', error);
handlers.onError?.(error);
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
setTimeout(() => {
console.log('重新连接到SSE...');
this.connect(url, handlers);
}, 3000);
}
};
// 自定义事件监听器
this.eventSource.addEventListener('custom-event', (event: any) => {
console.log('自定义事件:', JSON.parse(event.data));
});
}
disconnect(): void {
this.eventSource?.close();
this.eventSource = null;
}
}
// 使用
const client = new SSEClient();
client.connect('http://localhost:3000/events', {
onMessage: (data) => {
console.log('收到:', data);
},
onOpen: () => {
console.log('已连接到服务器');
}
});
4. Socket.IO(生产就绪)
// server.ts
import { Server } from 'socket.io';
import { createServer } from 'http';
const httpServer = createServer();
const io = new Server(httpServer, {
cors: {
origin: process.env.CLIENT_URL || 'http://localhost:3000',
methods: ['GET', 'POST']
},
pingTimeout: 60000,
pingInterval: 25000
});
// 中间件
io.use((socket, next) => {
const token = socket.handshake.auth.token;
if (isValidToken(token)) {
next();
} else {
next(new Error('认证错误'));
}
});
io.on('connection', (socket) => {
console.log(`用户已连接:${socket.id}`);
// 加入房间
socket.on('join-room', (roomId: string) => {
socket.join(roomId);
socket.to(roomId).emit('user-joined', {
userId: socket.id,
timestamp: Date.now()
});
});
// 处理消息
socket.on('message', (data) => {
const roomId = Array.from(socket.rooms)[1]; // 第一个是自己ID
io.to(roomId).emit('message', {
...data,
userId: socket.id,
timestamp: Date.now()
});
});
// 打字指示器
socket.on('typing', (isTyping: boolean) => {
const roomId = Array.from(socket.rooms)[1];
socket.to(roomId).emit('user-typing', {
userId: socket.id,
isTyping
});
});
socket.on('disconnect', () => {
console.log(`用户已断开连接:${socket.id}`);
});
});
httpServer.listen(3001);
function isValidToken(token: string): boolean {
// 实现令牌验证
return true;
}
最佳实践
✅ 执行
- 实现带有指数退避的重新连接逻辑
- 使用心跳/ping-pong检测死亡连接
- 验证和清理所有消息
- 实施认证和授权
- 处理连接限制和速率限制
- 对大负载使用压缩
- 实施适当的错误处理
- 监控连接健康
- 使用房间/频道进行目标消息传递
- 实施优雅关闭
❌ 不要
- 不要发送未加密的敏感数据
- 不要无限期地保持连接而不进行清理
- 当目标消息传递足够时,不要广播给所有用户
- 忽略连接状态管理
- 频繁发送大负载
- 跳过消息验证
- 忘记移动/不稳定连接
- 忽略扩展考虑
性能优化
// 消息批处理
class MessageBatcher {
private queue: any[] = [];
private timer: NodeJS.Timeout | null = null;
private batchSize = 10;
private batchDelay = 100;
constructor(
private sendFn: (messages: any[]) => void
) {}
add(message: any): void {
this.queue.push(message);
if (this.queue.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.batchDelay);
}
}
private flush(): void {
if (this.queue.length > 0) {
this.sendFn(this.queue.splice(0));
}
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
}
}