名称: websocket 描述: 实时双向通信,安全重点在于CSWSH预防、身份认证和消息验证 模型: sonnet 风险级别: 高
WebSocket安全技能
文件组织
- SKILL.md: 核心原则、模式、基本安全(本文件)
- references/security-examples.md: CSWSH示例和身份认证模式
- references/advanced-patterns.md: 连接管理、扩展模式
- references/threat-model.md: 攻击场景,包括CSWSH
验证门
门 0.2: 通过(5+个漏洞已记录)- CVE-2024-23898、CVE-2024-26135、CVE-2023-0957
1. 概述
风险级别: 高
理由: WebSocket连接绕过同源策略保护,使其易受跨站WebSocket劫持(CSWSH)攻击。持久连接需要仔细的身份认证、会话管理和输入验证。
您是WebSocket安全专家,理解持久双向连接的独特漏洞。
核心专业领域
- CSWSH(跨站WebSocket劫持)预防
- Origin头部验证和基于令牌的身份认证
- 消息验证和每消息授权
- 速率限制和连接生命周期安全
2. 核心职责
基本原则
- 测试驱动开发优先: 在实现前编写测试 - 测试安全边界、连接生命周期
- 性能意识: 优化低延迟(<50毫秒)、连接池、背压
- 验证Origin: 始终检查Origin头部与明确允许列表
- 先认证: 在接收消息前验证身份
- 授权每个操作: 不要假设连接等于无限访问
- 验证所有消息: 将WebSocket消息视为不可信输入
- 限制资源: 速率限制消息、超时空闲连接
安全决策框架
| 情况 | 方法 |
|---|---|
| 新连接 | 验证Origin,要求身份认证令牌 |
| 每条消息 | 验证格式,检查操作授权 |
| 敏感操作 | 重新验证会话,记录操作 |
| 空闲连接 | 不活动期后超时 |
| 错误条件 | 关闭连接,记录详情 |
3. 技术基础
版本推荐
| 组件 | 版本 | 说明 |
|---|---|---|
| FastAPI/Starlette | 0.115+ | WebSocket支持 |
| websockets | 12.0+ | Python WebSocket库 |
安全配置
WEBSOCKET_CONFIG = {
"max_message_size": 1024 * 1024, # 1MB
"max_connections_per_ip": 10,
"idle_timeout_seconds": 300,
"messages_per_minute": 60,
}
# 绝不要使用“*”作为源
ALLOWED_ORIGINS = ["https://app.example.com", "https://admin.example.com"]
4. 实现工作流(测试驱动开发)
步骤1:先编写失败测试
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi.testclient import TestClient
# 先测试安全边界
@pytest.mark.asyncio
async def test_origin_validation_rejects_invalid():
"""CSWSH预防 - 必须拒绝无效源。"""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test"
) as client:
# 在实现源验证前,这应该失败
with pytest.raises(Exception):
async with client.websocket_connect(
"/ws?token=valid",
headers={"Origin": "https://evil.com"}
):
pass
@pytest.mark.asyncio
async def test_authentication_required():
"""必须拒绝没有有效令牌的连接。"""
with TestClient(app) as client:
with pytest.raises(Exception):
with client.websocket_connect("/ws") as ws:
pass
@pytest.mark.asyncio
async def test_message_authorization():
"""每条消息操作必须被授权。"""
with TestClient(app) as client:
with client.websocket_connect(
"/ws?token=readonly_user",
headers={"Origin": "https://app.example.com"}
) as ws:
ws.send_json({"action": "delete", "id": "123"})
response = ws.receive_json()
assert response.get("error") == "Permission denied"
步骤2:实现最小通过
# 只实现通过测试所需
async def validate_origin(websocket: WebSocket) -> bool:
origin = websocket.headers.get("origin")
if not origin or origin not in ALLOWED_ORIGINS:
await websocket.close(code=4003, reason="Invalid origin")
return False
return True
步骤3:重构和验证
# 运行所有WebSocket测试
pytest tests/websocket/ -v --asyncio-mode=auto
# 检查安全问题
bandit -r src/websocket/
# 验证无回归
pytest tests/ -v
5. 性能模式
模式1:连接池
# 差 - 为每个请求创建新连接
ws = await create_connection(user_id) # 昂贵!
# 好 - 从池中重用连接
class ConnectionPool:
def __init__(self, max_size: int = 100):
self.connections: dict[str, WebSocket] = {}
async def get_or_create(self, user_id: str) -> WebSocket:
if user_id not in self.connections:
self.connections[user_id] = await create_connection(user_id)
return self.connections[user_id]
模式2:消息批处理
# 差 - 逐条发送消息
for item in items:
await websocket.send_json({"type": "item", "data": item})
# 好 - 批量消息以减少开销
await websocket.send_json({"type": "batch", "data": items[:50]})
模式3:二进制协议
# 差 - 对高频数据使用JSON(约80字节)
await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})
# 好 - 二进制格式(20字节)
import struct
await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))
模式4:心跳优化
# 差 - 固定频繁心跳
HEARTBEAT_INTERVAL = 5 # 每5秒
# 好 - 基于活动的自适应心跳
interval = 60 if (time() - last_activity) < 60 else 30
模式5:背压处理
# 差 - 阻塞在慢客户端上
await ws.send_json(message)
# 好 - 超时和有界队列
from collections import deque
queue = deque(maxlen=100) # 满时丢弃最旧
尝试:
await asyncio.wait_for(ws.send_json(message), timeout=1.0)
除了 asyncio.TimeoutError:
pass # 客户端太慢
6. 实现模式
模式1:Origin验证(CSWSH预防关键)
from fastapi import WebSocket
async def validate_origin(websocket: WebSocket) -> bool:
"""验证WebSocket源与允许列表。"""
origin = websocket.headers.get("origin")
if not origin or origin not in ALLOWED_ORIGINS:
await websocket.close(code=4003, reason="Invalid origin")
return False
return True
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not await validate_origin(websocket):
return
await websocket.accept()
模式2:基于令牌的身份认证
from jose import jwt, JWTError
async def authenticate_websocket(websocket: WebSocket) -> User | None:
"""通过令牌认证(不使用cookies - 易受CSWSH攻击)。"""
token = websocket.query_params.get("token")
if not token:
await websocket.close(code=4001, reason="Authentication required")
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
user = await user_service.get(payload.get("sub"))
if not user:
await websocket.close(code=4001, reason="User not found")
return None
return user
except JWTError:
await websocket.close(code=4001, reason="Invalid token")
return None
模式3:每消息授权
from pydantic import BaseModel, field_validator
class WebSocketMessage(BaseModel):
action: str
data: dict
@field_validator('action')
@classmethod
def validate_action(cls, v):
if v not in {'subscribe', 'unsubscribe', 'send', 'query'}:
raise ValueError(f'Invalid action: {v}')
return v
async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
try:
message = WebSocketMessage(**raw_data)
except ValueError:
await websocket.send_json({"error": "Invalid message format"})
return
if not user.has_permission(f"ws:{message.action}"):
await websocket.send_json({"error": "Permission denied"})
return
result = await handlers[message.action](user, message.data)
await websocket.send_json(result)
模式4:带速率限制的连接管理器
from collections import defaultdict
from time import time
class SecureConnectionManager:
def __init__(self):
self.connections: dict[str, WebSocket] = {}
self.message_counts: dict[str, list[float]] = defaultdict(list)
self.connections_per_ip: dict[str, int] = defaultdict(int)
async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
await websocket.close(code=4029, reason="Too many connections")
return False
await websocket.accept()
self.connections[user_id] = websocket
self.connections_per_ip[ip] += 1
return True
def check_rate_limit(self, user_id: str) -> bool:
now = time()
self.message_counts[user_id] = [
ts for ts in self.message_counts[user_id] if ts > now - 60
]
if len(self.message_counts[user_id]) >= WEBSOCKET_CONFIG["messages_per_minute"]:
return False
self.message_counts[user_id].append(now)
return True
模式5:完整安全处理器
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not await validate_origin(websocket):
return
user = await authenticate_websocket(websocket)
if not user:
return
ip = websocket.client.host
if not await manager.connect(websocket, user.id, ip):
return
try:
while True:
raw = await asyncio.wait_for(
websocket.receive_json(),
timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"]
)
if not manager.check_rate_limit(user.id):
await websocket.send_json({"error": "Rate limited"})
continue
await handle_message(websocket, user, raw)
except (WebSocketDisconnect, asyncio.TimeoutError):
pass
finally:
manager.disconnect(user.id, ip)
7. 安全标准
领域漏洞景观
| CVE ID | 严重性 | 描述 | 缓解措施 |
|---|---|---|---|
| CVE-2024-23898 | 高 | Jenkins CSWSH - 命令执行 | 验证Origin |
| CVE-2024-26135 | 高 | MeshCentral CSWSH - 配置泄露 | Origin + SameSite |
| CVE-2023-0957 | 关键 | Gitpod CSWSH - 账户接管 | Origin + 令牌认证 |
OWASP Top 10 映射
| 类别 | 缓解措施 |
|---|---|
| A01 访问控制 | Origin验证、每消息授权 |
| A02 加密失败 | 仅使用TLS/WSS、签名令牌 |
| A03 注入 | 验证所有消息内容 |
| A07 身份认证失败 | 令牌认证、会话验证 |
CSWSH预防摘要
async def secure_websocket_handler(websocket: WebSocket):
# 1. 验证Origin(关键)
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return
# 2. 使用令牌认证(不使用cookies)
user = await validate_token(websocket.query_params.get("token"))
if not user:
await websocket.close(code=4001)
return
# 3. 仅在验证后接受
await websocket.accept()
# 4. 授权每条消息、5. 速率限制、6. 超时空闲
8. 常见错误和反模式
无Origin验证
# 绝不要 - 易受CSWSH攻击
@app.websocket("/ws")
async def vulnerable(websocket: WebSocket):
await websocket.accept() # 接受任何源!
# 始终 - 先验证源
if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
await websocket.close(code=4003)
return
仅Cookie身份认证
# 绝不要 - cookies在CSWSH攻击中自动发送
session = websocket.cookies.get("session")
# 始终 - 要求明确令牌参数
token = websocket.query_params.get("token")
无每消息授权
# 绝不要 - 假设连接等于完全访问
if data["action"] == "delete":
await delete_resource(data["id"])
# 始终 - 检查每个操作权限
if not user.has_permission("delete"):
return {"error": "Permission denied"}
无输入验证
# 绝不要 - 信任WebSocket消息
await db.execute(f"SELECT * FROM {data['table']}") # SQL注入!
# 始终 - 使用Pydantic验证
message = WebSocketMessage(**data)
9. 预实现清单
阶段1:编码前
- [ ] 为安全边界编写失败测试(CSWSH、认证、授权)
- [ ] 为连接生命周期编写失败测试(连接、断开、超时)
- [ ] 为消息验证和速率限制编写失败测试
- [ ] 查看
references/threat-model.md中的威胁模型 - [ ] 识别性能需求(延迟、吞吐量、连接数)
阶段2:实现期间
- [ ] Origin验证与明确允许列表
- [ ] 基于令牌的身份认证(非仅cookie)
- [ ] 每消息授权检查
- [ ] 实现速率限制和空闲超时
- [ ] 使用Pydantic验证所有消息
- [ ] 连接池以提高效率
- [ ] 背压处理用于慢客户端
阶段3:提交前
- [ ] 所有安全测试通过:
pytest tests/websocket/ -v - [ ] 无安全问题:
bandit -r src/websocket/ - [ ] 生产配置中强制执行WSS(TLS)
- [ ] 验证CSWSH测试覆盖
- [ ] 性能基准达到目标(<50毫秒延迟)
- [ ] 无回归:
pytest tests/ -v
10. 摘要
安全目标:
- 抗CSWSH: Origin验证、令牌认证
- 正确授权: 每消息权限检查
- 速率限制: 防止消息泛洪
- 已验证: 将所有消息视为不可信
关键提醒: 始终验证Origin,使用令牌认证(非cookies),授权每条消息,生产中使用WSS。