WebSocket安全实施技能Skill websocket

这个技能专注于WebSocket连接的安全性,用于预防跨站WebSocket劫持(CSWSH),实现身份认证、消息验证、速率限制和连接生命周期管理。适用于需要安全实时通信的Web应用开发,提升网络防护能力。关键词:WebSocket安全、CSWSH预防、身份认证、消息验证、网络安全、实时通信、速率限制、连接管理。

身份认证 0 次安装 0 次浏览 更新于 3/15/2026

名称: websocket 描述: 实时双向通信,安全重点在于CSWSH预防、身份认证和消息验证 模型: sonnet 风险级别: 高

WebSocket安全技能

文件组织

  • SKILL.md: 核心原则、模式、基本安全(本文件)
  • references/security-examples.md: CSWSH示例和身份认证模式
  • references/advanced-patterns.md: 连接管理、扩展模式
  • references/threat-model.md: 攻击场景,包括CSWSH

验证门

门 0.2: 通过(5+个漏洞已记录)- CVE-2024-23898、CVE-2024-26135、CVE-2023-0957


1. 概述

风险级别: 高

理由: WebSocket连接绕过同源策略保护,使其易受跨站WebSocket劫持(CSWSH)攻击。持久连接需要仔细的身份认证、会话管理和输入验证。

您是WebSocket安全专家,理解持久双向连接的独特漏洞。

核心专业领域

  • CSWSH(跨站WebSocket劫持)预防
  • Origin头部验证和基于令牌的身份认证
  • 消息验证和每消息授权
  • 速率限制和连接生命周期安全

2. 核心职责

基本原则

  1. 测试驱动开发优先: 在实现前编写测试 - 测试安全边界、连接生命周期
  2. 性能意识: 优化低延迟(<50毫秒)、连接池、背压
  3. 验证Origin: 始终检查Origin头部与明确允许列表
  4. 先认证: 在接收消息前验证身份
  5. 授权每个操作: 不要假设连接等于无限访问
  6. 验证所有消息: 将WebSocket消息视为不可信输入
  7. 限制资源: 速率限制消息、超时空闲连接

安全决策框架

情况 方法
新连接 验证Origin,要求身份认证令牌
每条消息 验证格式,检查操作授权
敏感操作 重新验证会话,记录操作
空闲连接 不活动期后超时
错误条件 关闭连接,记录详情

3. 技术基础

版本推荐

组件 版本 说明
FastAPI/Starlette 0.115+ WebSocket支持
websockets 12.0+ Python WebSocket库

安全配置

WEBSOCKET_CONFIG = {
    "max_message_size": 1024 * 1024,  # 1MB
    "max_connections_per_ip": 10,
    "idle_timeout_seconds": 300,
    "messages_per_minute": 60,
}

# 绝不要使用“*”作为源
ALLOWED_ORIGINS = ["https://app.example.com", "https://admin.example.com"]

4. 实现工作流(测试驱动开发)

步骤1:先编写失败测试

import pytest
from httpx import AsyncClient, ASGITransport
from fastapi.testclient import TestClient

# 先测试安全边界
@pytest.mark.asyncio
async def test_origin_validation_rejects_invalid():
    """CSWSH预防 - 必须拒绝无效源。"""
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test"
    ) as client:
        # 在实现源验证前,这应该失败
        with pytest.raises(Exception):
            async with client.websocket_connect(
                "/ws?token=valid",
                headers={"Origin": "https://evil.com"}
            ):
                pass

@pytest.mark.asyncio
async def test_authentication_required():
    """必须拒绝没有有效令牌的连接。"""
    with TestClient(app) as client:
        with pytest.raises(Exception):
            with client.websocket_connect("/ws") as ws:
                pass

@pytest.mark.asyncio
async def test_message_authorization():
    """每条消息操作必须被授权。"""
    with TestClient(app) as client:
        with client.websocket_connect(
            "/ws?token=readonly_user",
            headers={"Origin": "https://app.example.com"}
        ) as ws:
            ws.send_json({"action": "delete", "id": "123"})
            response = ws.receive_json()
            assert response.get("error") == "Permission denied"

步骤2:实现最小通过

# 只实现通过测试所需
async def validate_origin(websocket: WebSocket) -> bool:
    origin = websocket.headers.get("origin")
    if not origin or origin not in ALLOWED_ORIGINS:
        await websocket.close(code=4003, reason="Invalid origin")
        return False
    return True

步骤3:重构和验证

# 运行所有WebSocket测试
pytest tests/websocket/ -v --asyncio-mode=auto

# 检查安全问题
bandit -r src/websocket/

# 验证无回归
pytest tests/ -v

5. 性能模式

模式1:连接池

# 差 - 为每个请求创建新连接
ws = await create_connection(user_id)  # 昂贵!

# 好 - 从池中重用连接
class ConnectionPool:
    def __init__(self, max_size: int = 100):
        self.connections: dict[str, WebSocket] = {}

    async def get_or_create(self, user_id: str) -> WebSocket:
        if user_id not in self.connections:
            self.connections[user_id] = await create_connection(user_id)
        return self.connections[user_id]

模式2:消息批处理

# 差 - 逐条发送消息
for item in items:
    await websocket.send_json({"type": "item", "data": item})

# 好 - 批量消息以减少开销
await websocket.send_json({"type": "batch", "data": items[:50]})

模式3:二进制协议

# 差 - 对高频数据使用JSON(约80字节)
await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})

# 好 - 二进制格式(20字节)
import struct
await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))

模式4:心跳优化

# 差 - 固定频繁心跳
HEARTBEAT_INTERVAL = 5  # 每5秒

# 好 - 基于活动的自适应心跳
interval = 60 if (time() - last_activity) < 60 else 30

模式5:背压处理

# 差 - 阻塞在慢客户端上
await ws.send_json(message)

# 好 - 超时和有界队列
from collections import deque
queue = deque(maxlen=100)  # 满时丢弃最旧
尝试:
    await asyncio.wait_for(ws.send_json(message), timeout=1.0)
除了 asyncio.TimeoutError:
    pass  # 客户端太慢

6. 实现模式

模式1:Origin验证(CSWSH预防关键)

from fastapi import WebSocket

async def validate_origin(websocket: WebSocket) -> bool:
    """验证WebSocket源与允许列表。"""
    origin = websocket.headers.get("origin")
    if not origin or origin not in ALLOWED_ORIGINS:
        await websocket.close(code=4003, reason="Invalid origin")
        return False
    return True

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    if not await validate_origin(websocket):
        return
    await websocket.accept()

模式2:基于令牌的身份认证

from jose import jwt, JWTError

async def authenticate_websocket(websocket: WebSocket) -> User | None:
    """通过令牌认证(不使用cookies - 易受CSWSH攻击)。"""
    token = websocket.query_params.get("token")
    if not token:
        await websocket.close(code=4001, reason="Authentication required")
        return None
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        user = await user_service.get(payload.get("sub"))
        if not user:
            await websocket.close(code=4001, reason="User not found")
            return None
        return user
    except JWTError:
        await websocket.close(code=4001, reason="Invalid token")
        return None

模式3:每消息授权

from pydantic import BaseModel, field_validator

class WebSocketMessage(BaseModel):
    action: str
    data: dict

    @field_validator('action')
    @classmethod
    def validate_action(cls, v):
        if v not in {'subscribe', 'unsubscribe', 'send', 'query'}:
            raise ValueError(f'Invalid action: {v}')
        return v

async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
    try:
        message = WebSocketMessage(**raw_data)
    except ValueError:
        await websocket.send_json({"error": "Invalid message format"})
        return

    if not user.has_permission(f"ws:{message.action}"):
        await websocket.send_json({"error": "Permission denied"})
        return

    result = await handlers[message.action](user, message.data)
    await websocket.send_json(result)

模式4:带速率限制的连接管理器

from collections import defaultdict
from time import time

class SecureConnectionManager:
    def __init__(self):
        self.connections: dict[str, WebSocket] = {}
        self.message_counts: dict[str, list[float]] = defaultdict(list)
        self.connections_per_ip: dict[str, int] = defaultdict(int)

    async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
        if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
            await websocket.close(code=4029, reason="Too many connections")
            return False
        await websocket.accept()
        self.connections[user_id] = websocket
        self.connections_per_ip[ip] += 1
        return True

    def check_rate_limit(self, user_id: str) -> bool:
        now = time()
        self.message_counts[user_id] = [
            ts for ts in self.message_counts[user_id] if ts > now - 60
        ]
        if len(self.message_counts[user_id]) >= WEBSOCKET_CONFIG["messages_per_minute"]:
            return False
        self.message_counts[user_id].append(now)
        return True

模式5:完整安全处理器

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    if not await validate_origin(websocket):
        return
    user = await authenticate_websocket(websocket)
    if not user:
        return

    ip = websocket.client.host
    if not await manager.connect(websocket, user.id, ip):
        return

    try:
        while True:
            raw = await asyncio.wait_for(
                websocket.receive_json(),
                timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"]
            )
            if not manager.check_rate_limit(user.id):
                await websocket.send_json({"error": "Rate limited"})
                continue
            await handle_message(websocket, user, raw)
    except (WebSocketDisconnect, asyncio.TimeoutError):
        pass
    finally:
        manager.disconnect(user.id, ip)

7. 安全标准

领域漏洞景观

CVE ID 严重性 描述 缓解措施
CVE-2024-23898 Jenkins CSWSH - 命令执行 验证Origin
CVE-2024-26135 MeshCentral CSWSH - 配置泄露 Origin + SameSite
CVE-2023-0957 关键 Gitpod CSWSH - 账户接管 Origin + 令牌认证

OWASP Top 10 映射

类别 缓解措施
A01 访问控制 Origin验证、每消息授权
A02 加密失败 仅使用TLS/WSS、签名令牌
A03 注入 验证所有消息内容
A07 身份认证失败 令牌认证、会话验证

CSWSH预防摘要

async def secure_websocket_handler(websocket: WebSocket):
    # 1. 验证Origin(关键)
    if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
        await websocket.close(code=4003)
        return
    # 2. 使用令牌认证(不使用cookies)
    user = await validate_token(websocket.query_params.get("token"))
    if not user:
        await websocket.close(code=4001)
        return
    # 3. 仅在验证后接受
    await websocket.accept()
    # 4. 授权每条消息、5. 速率限制、6. 超时空闲

8. 常见错误和反模式

无Origin验证

# 绝不要 - 易受CSWSH攻击
@app.websocket("/ws")
async def vulnerable(websocket: WebSocket):
    await websocket.accept()  # 接受任何源!

# 始终 - 先验证源
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
    await websocket.close(code=4003)
    return

仅Cookie身份认证

# 绝不要 - cookies在CSWSH攻击中自动发送
session = websocket.cookies.get("session")

# 始终 - 要求明确令牌参数
token = websocket.query_params.get("token")

无每消息授权

# 绝不要 - 假设连接等于完全访问
if data["action"] == "delete":
    await delete_resource(data["id"])

# 始终 - 检查每个操作权限
if not user.has_permission("delete"):
    return {"error": "Permission denied"}

无输入验证

# 绝不要 - 信任WebSocket消息
await db.execute(f"SELECT * FROM {data['table']}")  # SQL注入!

# 始终 - 使用Pydantic验证
message = WebSocketMessage(**data)

9. 预实现清单

阶段1:编码前

  • [ ] 为安全边界编写失败测试(CSWSH、认证、授权)
  • [ ] 为连接生命周期编写失败测试(连接、断开、超时)
  • [ ] 为消息验证和速率限制编写失败测试
  • [ ] 查看references/threat-model.md中的威胁模型
  • [ ] 识别性能需求(延迟、吞吐量、连接数)

阶段2:实现期间

  • [ ] Origin验证与明确允许列表
  • [ ] 基于令牌的身份认证(非仅cookie)
  • [ ] 每消息授权检查
  • [ ] 实现速率限制和空闲超时
  • [ ] 使用Pydantic验证所有消息
  • [ ] 连接池以提高效率
  • [ ] 背压处理用于慢客户端

阶段3:提交前

  • [ ] 所有安全测试通过:pytest tests/websocket/ -v
  • [ ] 无安全问题:bandit -r src/websocket/
  • [ ] 生产配置中强制执行WSS(TLS)
  • [ ] 验证CSWSH测试覆盖
  • [ ] 性能基准达到目标(<50毫秒延迟)
  • [ ] 无回归:pytest tests/ -v

10. 摘要

安全目标:

  • 抗CSWSH: Origin验证、令牌认证
  • 正确授权: 每消息权限检查
  • 速率限制: 防止消息泛洪
  • 已验证: 将所有消息视为不可信

关键提醒: 始终验证Origin,使用令牌认证(非cookies),授权每条消息,生产中使用WSS。