WebSocketImplementation websocket-implementation

WebSocket 实现提供了构建实时通信系统的方法,包括聊天系统、实时通知、协作工具等,支持水平扩展和Redis适配。

后端开发 0 次安装 0 次浏览 更新于 3/4/2026

WebSocket 实现

概览

构建具有适当连接管理、消息路由、错误处理和水平扩展支持的可扩展 WebSocket 系统,用于实时通信。

何时使用

  • 构建实时聊天和消息传递
  • 实施实时通知
  • 创建协作编辑工具
  • 广播实时数据更新
  • 构建实时仪表板
  • 向客户端流式传输事件
  • 实时多人游戏

指令

1. Node.js WebSocket 服务器(Socket.IO

const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const redis = require('redis');

const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
  cors: { origin: '*' },
  transports: ['websocket', 'polling'],
  reconnection: true,
  reconnectionDelay: 1000,
  reconnectionDelayMax: 5000,
  reconnectionAttempts: 5
});

// Redis 适配器用于水平扩展
const redisClient = redis.createClient();
const { createAdapter } = require('@socket.io/redis-adapter');

io.adapter(createAdapter(redisClient, redisClient.duplicate()));

// 连接管理
const connectedUsers = new Map();

io.on('connection', (socket) => {
  console.log(`用户已连接: ${socket.id}`);

  // 存储用户连接
  socket.on('auth', (userData) => {
    connectedUsers.set(socket.id, {
      userId: userData.id,
      username: userData.username,
      socketId: socket.id,
      connectedAt: new Date()
    });

    // 加入用户特定房间
    socket.join(`user:${userData.id}`);
    socket.join('authenticated_users');

    // 通知其他人用户在线
    io.to('authenticated_users').emit('user:online', {
      userId: userData.id,
      username: userData.username,
      timestamp: new Date()
    });

    console.log(`用户已认证: ${userData.username}`);
  });

  // 聊天消息
  socket.on('chat:message', (message) => {
    const user = connectedUsers.get(socket.id);

    if (!user) {
      socket.emit('error', { message: '未认证' });
      return;
    }

    const chatMessage = {
      id: `msg_${Date.now()}`,
      senderId: user.userId,
      senderName: user.username,
      text: message.text,
      roomId: message.roomId,
      timestamp: new Date(),
      status: 'delivered'
    };

    // 保存到数据库
    Message.create(chatMessage);

    // 广播到房间
    io.to(`room:${message.roomId}`).emit('chat:message', chatMessage);

    // 更新消息状态
    setTimeout(() => {
      socket.emit('chat:message:ack', { messageId: chatMessage.id, status: 'read' });
    }, 100);
  });

  // 房间管理
  socket.on('room:join', (roomId) => {
    socket.join(`room:${roomId}`);

    const user = connectedUsers.get(socket.id);
    io.to(`room:${roomId}`).emit('room:user:joined', {
      userId: user.userId,
      username: user.username,
      timestamp: new Date()
    });
  });

  socket.on('room:leave', (roomId) => {
    socket.leave(`room:${roomId}`);

    const user = connectedUsers.get(socket.id);
    io.to(`room:${roomId}`).emit('room:user:left', {
      userId: user.userId,
      timestamp: new Date()
    });
  });

  // 正在输入指示器
  socket.on('typing:start', (roomId) => {
    const user = connectedUsers.get(socket.id);
    io.to(`room:${roomId}`).emit('typing:indicator', {
      userId: user.userId,
      username: user.username,
      isTyping: true
    });
  });

  socket.on('typing:stop', (roomId) => {
    const user = connectedUsers.get(socket.id);
    io.to(`room:${roomId}`).emit('typing:indicator', {
      userId: user.userId,
      isTyping: false
    });
  });

  // 处理断开连接
  socket.on('disconnect', () => {
    const user = connectedUsers.get(socket.id);

    if (user) {
      connectedUsers.delete(socket.id);
      io.to('authenticated_users').emit('user:offline', {
        userId: user.userId,
        timestamp: new Date()
      });

      console.log(`用户已断开连接: ${user.username}`);
    }
  });

  // 错误处理
  socket.on('error', (error) => {
    console.error(`Socket 错误: ${error}`);
    socket.emit('error', { message: '发生错误' });
  });
});

// 服务器方法
const broadcastUserUpdate = (userId, data) => {
  io.to(`user:${userId}`).emit('user:update', data);
};

const notifyRoom = (roomId, event, data) => {
  io.to(`room:${roomId}`).emit(event, data);
};

const sendDirectMessage = (userId, event, data) => {
  io.to(`user:${userId}`).emit(event, data);
};

server.listen(3000, () => {
  console.log('WebSocket 服务器正在 3000 端口监听');
});

2. 浏览器 WebSocket 客户端

class WebSocketClient {
  constructor(url, options = {}) {
    this.url = url;
    this.socket = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
    this.reconnectDelay = options.reconnectDelay || 1000;
    this.listeners = new Map();
    this.messageQueue = [];
    this.isAuthenticated = false;

    this.connect();
  }

  connect() {
    this.socket = io(this.url, {
      reconnection: true,
      reconnectionDelay: this.reconnectDelay,
      reconnectionAttempts: this.maxReconnectAttempts
    });

    this.socket.on('connect', () => {
      console.log('已连接到服务器');
      this.reconnectAttempts = 0;
      this.processMessageQueue();
    });

    this.socket.on('disconnect', () => {
      console.log('与服务器断开连接');
    });

    this.socket.on('error', (error) => {
      console.error('Socket 错误:', error);
      this.emit('error', error);
    });

    this.socket.on('connect_error', (error) => {
      console.error('连接错误:', error);
    });
  }

  authenticate(userData) {
    this.socket.emit('auth', userData, (response) => {
      if (response.success) {
        this.isAuthenticated = true;
        this.emit('authenticated');
      }
    });
  }

  on(event, callback) {
    this.socket.on(event, callback);

    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(callback);
  }

  emit(event, data, callback) {
    if (!this.socket.connected) {
      this.messageQueue.push({ event, data, callback });
      return;
    }

    this.socket.emit(event, data, callback);
  }

  processMessageQueue() {
    while (this.messageQueue.length > 0) {
      const { event, data, callback } = this.messageQueue.shift();
      this.socket.emit(event, data, callback);
    }
  }

  joinRoom(roomId) {
    this.emit('room:join', roomId);
  }

  leaveRoom(roomId) {
    this.emit('room:leave', roomId);
  }

  sendMessage(roomId, text) {
    this.emit('chat:message', { roomId, text });
  }

  setTypingIndicator(roomId, isTyping) {
    if (isTyping) {
      this.emit('typing:start', roomId);
    } else {
      this.emit('typing:stop', roomId);
    }
  }

  disconnect() {
    this.socket.disconnect();
  }
}

// 使用方法
const client = new WebSocketClient('http://localhost:3000');

client.on('chat:message', (message) => {
  console.log('收到消息:', message);
  displayMessage(message);
});

client.on('typing:indicator', (data) => {
  updateTypingIndicator(data);
});

client.on('user:online', (user) => {
  updateUserStatus(user.userId, 'online');
});

client.authenticate({ id: 'user123', username: 'john' });
client.joinRoom('room1');
client.sendMessage('room1', '大家好!');

3. Python WebSocket 服务器(aiohttp)

from aiohttp import web
import aiohttp
import json
from datetime import datetime
from typing import Set

class WebSocketServer:
    def __init__(self):
        self.app = web.Application()
        self.rooms = {}
        self.users = {}
        self.setup_routes()

    def setup_routes(self):
        self.app.router.add_get('/ws', self.websocket_handler)
        self.app.router.add_post('/api/message', self.send_message_api)

    async def websocket_handler(self, request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        user_id = None
        room_id = None

        async for msg in ws.iter_any():
            if isinstance(msg, aiohttp.WSMessage):
                data = json.loads(msg.data)
                event_type = data.get('type')

                try:
                    if event_type == 'auth':
                        user_id = data.get('userId')
                        self.users[user_id] = ws
                        await ws.send_json({
                            'type': 'authenticated',
                            'timestamp': datetime.now().isoformat()
                        })

                    elif event_type == 'join_room':
                        room_id = data.get('roomId')
                        if room_id not in self.rooms:
                            self.rooms[room_id] = set()
                        self.rooms[room_id].add(user_id)

                        # 通知其他人
                        await self.broadcast_to_room(room_id, {
                            'type': 'user_joined',
                            'userId': user_id,
                            'timestamp': datetime.now().isoformat()
                        }, exclude=user_id)

                    elif event_type == 'message':
                        message = {
                            'id': f'msg_{datetime.now().timestamp()}',
                            'userId': user_id,
                            'text': data.get('text'),
                            'roomId': room_id,
                            'timestamp': datetime.now().isoformat()
                        }

                        # 保存到数据库
                        await self.save_message(message)

                        # 广播到房间
                        await self.broadcast_to_room(room_id, message)

                    elif event_type == 'leave_room':
                        if room_id in self.rooms:
                            self.rooms[room_id].discard(user_id)

                except Exception as error:
                    await ws.send_json({
                        'type': 'error',
                        'message': str(error)
                    })

        # 断开连接时清理
        if user_id:
            del self.users[user_id]
        if room_id and user_id:
            if room_id in self.rooms:
                self.rooms[room_id].discard(user_id)

        return ws

    async def broadcast_to_room(self, room_id, message, exclude=None):
        if room_id not in self.rooms:
            return

        for user_id in self.rooms[room_id]:
            if user_id != exclude and user_id in self.users:
                try:
                    await self.users[user_id].send_json(message)
                except Exception as error:
                    print(f'发送消息错误: {error}')

    async def save_message(self, message):
        # 保存到数据库
        pass

    async def send_message_api(self, request):
        data = await request.json()
        room_id = data.get('roomId')

        await self.broadcast_to_room(room_id, {
            'type': 'message',
            'text': data.get('text'),
            'timestamp': datetime.now().isoformat()
        })

        return web.json_response({'sent': True})

def create_app():
    server = WebSocketServer()
    return server.app

if __name__ == '__main__':
    app = create_app()
    web.run_app(app, port=3000)

4. 消息类型和协议

// 认证
{
  "type": "auth",
  "userId": "user123",
  "token": "jwt_token_here"
}

// 聊天消息
{
  "type": "message",
  "roomId": "room123",
  "text": "大家好!",
  "timestamp": "2025-01-15T10:30:00Z"
}

// 正在输入指示器
{
  "type": "typing",
  "roomId": "room123",
  "isTyping": true
}

// 存在性
{
  "type": "presence",
  "status": "online|away|offline"
}

// 通知
{
  "type": "notification",
  "title": "新消息",
  "body": "您有一条新消息",
  "data": {}
}

5. 使用 Redis 进行扩展

const redis = require('redis');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');

const pubClient = createClient({ host: 'redis', port: 6379 });
const subClient = pubClient.duplicate();

io.adapter(createAdapter(pubClient, subClient));

// 向多个服务器发布
io.emit('user:action', { userId: 123, action: 'login' });

// 从其他服务器订阅事件
redisClient.subscribe('notifications', (message) => {
  const notification = JSON.parse(message);
  io.to(`user:${notification.userId}`).emit('notification', notification);
});

最佳实践

✅ 执行

  • 实施适当的认证
  • 优雅地处理重新连接
  • 有效管理房间/频道
  • 适当地持久化消息
  • 监控活动连接
  • 实施存在性功能
  • 使用 Redis 进行扩展
  • 添加消息确认
  • 实施速率限制
  • 正确处理错误

❌ 不要

  • 发送未加密的敏感数据
  • 在内存中保留无限的消息历史
  • 允许任意房间/频道创建
  • 忘记清理断开的连接
  • 频繁发送大消息
  • 忽略网络故障
  • 在消息中存储密码
  • 跳过认证/授权
  • 创建无限制的连接增长
  • 从一开始就忽略可扩展性

监控

// 跟踪活动连接
io.engine.on('connection_error', (err) => {
  console.log(err.req); // 请求对象
  console.log(err.code); // 错误代码,例如 1
  console.log(err.message); // 错误消息
  console.log(err.context); // 一些额外的错误上下文
});

app.get('/metrics/websocket', (req, res) => {
  res.json({
    activeConnections: io.engine.clientsCount,
    connectedSockets: io.sockets.sockets.size,
    rooms: Object.keys(io.sockets.adapter.rooms)
  });
});