认证与安全模式
此技能为2025年现代web应用程序提供全面的认证和安全模式,重点关注JWT令牌、OAuth2、多因素认证和零信任安全原则,这些原则适用于不同的框架和数据库。
何时使用此技能
当您需要:
- 使用JWT令牌实现安全认证
- 设置OAuth2社交登录提供商
- 实施基于角色的访问控制(RBAC)
- 添加多因素认证(MFA)
- 使用适当的中间件保护API端点
- 处理会话管理和令牌刷新
- 实施零信任安全模式
- 设置WebSocket认证
- 创建审计跟踪和安全日志
现代认证架构
1. JWT令牌管理与刷新令牌
# core/auth.py
import jwt
import secrets
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from passlib.context import CryptContext
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import os
class TokenManager:
"""JWT令牌管理与安全最佳实践"""
def __init__(self):
self.secret_key = os.getenv("JWT_SECRET_KEY", self._generate_secret())
self.algorithm = "HS256"
self.access_token_expire = timedelta(minutes=15)
self.refresh_token_expire = timedelta(days=7)
self.pwd_context = CryptContext(
schemes=["pbkdf2_sha256"],
default="pbkdf2_sha256",
pbkdf2_sha256__default_rounds=120000
)
def _generate_secret(self) -> str:
"""生成密码学上安全的密钥"""
return secrets.token_urlsafe(32)
def create_password_hash(self, password: str) -> str:
"""创建安全的密码哈希"""
return self.pwd_context.hash(password)
def verify_password(self, plain_password: str, hashed_password: str) -> bool:
"""验证密码与哈希"""
return self.pwd_context.verify(plain_password, hashed_password)
def create_access_token(
self,
data: Dict[str, Any],
expires_delta: Optional[timedelta] = None
) -> str:
"""创建JWT访问令牌"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + self.access_token_expire
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"type": "access",
"jti": secrets.token_urlsafe(16) # JWT ID
})
encoded_jwt = jwt.encode(
to_encode,
self.secret_key,
algorithm=self.algorithm
)
return encoded_jwt
def create_refresh_token(
self,
user_id: str,
device_info: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""创建安全的刷新令牌与设备绑定"""
jti = secrets.token_urlsafe(32)
token_data = {
"sub": user_id,
"jti": jti,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + self.refresh_token_expire,
"type": "refresh",
"device": device_info or {}
}
# 在数据库或缓存中存储刷新令牌
refresh_token = jwt.encode(
token_data,
self.secret_key,
algorithm=self.algorithm
)
# 存储令牌哈希以检查撤销
token_hash = self._hash_token(refresh_token)
return {
"token": refresh_token,
"jti": jti,
"expires_at": token_data["exp"],
"token_hash": token_hash
}
def verify_token(self, token: str, token_type: str = "access") -> Dict[str, Any]:
"""验证和解码JWT令牌"""
try:
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm],
options={"verify_exp": True}
)
if payload.get("type") != token_type:
raise ValueError("无效的令牌类型")
return payload
except jwt.ExpiredSignatureError:
raise ValueError("令牌已过期")
except jwt.JWTError:
raise ValueError("无效的令牌")
def revoke_token(self, jti: str):
"""撤销令牌(添加到黑名单)"""
# 实现令牌黑名单(Redis或数据库)
pass
def _hash_token(self, token: str) -> str:
"""哈希令牌以存储"""
return self.pwd_context.hash(token)
# 单例实例
token_manager = TokenManager()
2. OAuth2提供商集成
# core/oauth.py
from typing import Dict, Any, Optional
from abc import ABC, abstractmethod
import httpx
from urllib.parse import urlencode, parse_qs
class OAuth2Provider(ABC):
"""基础OAuth2提供商实现"""
def __init__(
self,
client_id: str,
client_secret: str,
redirect_uri: str,
scopes: list
):
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.scopes = scopes
@abstractmethod
def get_authorization_url(self, state: str) -> str:
"""获取OAuth2授权URL"""
pass
@abstractmethod
async def exchange_code_for_token(self, code: str, state: str) -> Dict[str, Any]:
"""交换授权码以获取访问令牌"""
pass
@abstractmethod
async def get_user_info(self, access_token: str) -> Dict[str, Any]:
"""从提供商获取用户信息"""
pass
class GoogleOAuth2Provider(OAuth2Provider):
"""Google OAuth2提供商实现"""
AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
TOKEN_URL = "https://oauth2.googleapis.com/token"
USER_INFO_URL = "https://www.googleapis.com/oauth2/v2/userinfo"
def get_authorization_url(self, state: str) -> str:
params = {
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"response_type": "code",
"scope": " ".join(self.scopes),
"state": state,
"access_type": "offline", # 用于刷新令牌
"prompt": "consent"
}
return f"{self.AUTH_URL}?{urlencode(params)}"
async def exchange_code_for_token(self, code: str, state: str) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
data = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": self.redirect_uri
}
response = await client.post(self.TOKEN_URL, data=data)
response.raise_for_status()
return response.json()
async def get_user_info(self, access_token: str) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
headers = {"Authorization": f"Bearer {access_token}"}
response = await client.get(
self.USER_INFO_URL,
headers=headers
)
response.raise_for_status()
return response.json()
class GitHubOAuth2Provider(OAuth2Provider):
"""GitHub OAuth2提供商实现"""
AUTH_URL = "https://github.com/login/oauth/authorize"
TOKEN_URL = "https://github.com/login/oauth/access_token"
USER_INFO_URL = "https://api.github.com/user"
def get_authorization_url(self, state: str) -> str:
params = {
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"response_type": "code",
"scope": " ".join(self.scopes),
"state": state
}
return f"{self.AUTH_URL}?{urlencode(params)}"
async def exchange_code_for_token(self, code: str, state: str) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
headers = {"Accept": "application/json"}
data = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"code": code,
"grant_type": "authorization_code"
}
response = await client.post(
self.TOKEN_URL,
headers=headers,
data=data
)
response.raise_for_status()
return response.json()
async def get_user_info(self, access_token: str) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"token {access_token}",
"User-Agent": "MyApp"
}
response = await client.get(
self.USER_INFO_URL,
headers=headers
)
response.raise_for_status()
return response.json()
# 工厂创建OAuth2提供商
def create_oauth2_provider(
provider: str,
client_id: str,
client_secret: str,
redirect_uri: str,
scopes: list
) -> OAuth2Provider:
"""工厂方法创建OAuth2提供商"""
providers = {
"google": GoogleOAuth2Provider,
"github": GitHubOAuth2Provider,
# 根据需要添加更多提供商
}
provider_class = providers.get(provider.lower())
if not provider_class:
raise ValueError(f"不支持的OAuth2提供商:{provider}")
return provider_class(
client_id=client_id,
client_secret=client_secret,
redirect_uri=redirect_uri,
scopes=scopes
)
3. 基于角色的访问控制(RBAC)
# core/rbac.py
from enum import Enum
from typing import List, Dict, Set, Optional
from dataclasses import dataclass
from functools import wraps
import inspect
class Permission(str, Enum):
"""权限标识符"""
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
CREATE = "create"
UPDATE = "update"
MANAGE_USERS = "manage_users"
MANAGE_ROLES = "manage_roles"
VIEW_AUDIT_LOGS = "view_audit_logs"
class Role(str, Enum):
"""用户角色"""
ANONYMOUS = "anonymous"
USER = "user"
MODERATOR = "moderator"
ADMIN = "admin"
SUPER_ADMIN = "super_admin"
@dataclass
class RolePermission:
"""角色到权限的映射"""
role: Role
permissions: Set[Permission]
class RBACManager:
"""基于角色的访问控制管理器"""
def __init__(self):
self._role_permissions = self._initialize_roles()
self._user_roles: Dict[str, Set[Role]] = {}
def _initialize_roles(self) -> Dict[Role, Set[Permission]]:
"""初始化默认角色和权限"""
return {
Role.ANONYMOUS: {Permission.READ},
Role.USER: {
Permission.READ,
Permission.WRITE,
Permission.CREATE,
Permission.UPDATE,
Permission.DELETE # 自己的资源
},
Role.MODERATOR: {
Permission.READ,
Permission.WRITE,
Permission.CREATE,
Permission.UPDATE,
Permission.DELETE,
Permission.MANAGE_ROLES
},
Role.ADMIN: {
Permission.READ,
Permission.WRITE,
Permission.CREATE,
Permission.UPDATE,
Permission.DELETE,
Permission.MANAGE_USERS,
Permission.MANAGE_ROLES,
Permission.VIEW_AUDIT_LOGS
},
Role.SUPER_ADMIN: {
Permission.ADMIN, # 所有管理员权限
# 如果需要,添加超级管理员特定的权限
}
}
def assign_role_to_user(self, user_id: str, role: Role):
"""给用户分配角色"""
if user_id not in self._user_roles:
self._user_roles[user_id] = set()
self._user_roles[user_id].add(role)
def remove_role_from_user(self, user_id: str, role: Role):
"""从用户中移除角色"""
if user_id in self._user_roles:
self._user_roles[user_id].discard(role)
def get_user_permissions(self, user_id: str) -> Set[Permission]:
"""根据用户的角色获取所有权限"""
permissions = set()
roles = self._user_roles.get(user_id, {Role.ANONYMOUS})
for role in roles:
role_perms = self._role_permissions.get(role, set())
permissions.update(role_perms)
return permissions
def has_permission(
self,
user_id: str,
permission: Permission,
resource_owner_id: Optional[str] = None
) -> bool:
"""检查用户是否有权限"""
permissions = self.get_user_permissions(user_id)
# 超级管理员拥有所有权限
if Permission.ADMIN in permissions:
return True
# 检查资源所有权对于非管理员权限
if permission in {Permission.READ, Permission.WRITE, Permission.DELETE}:
if resource_owner_id and user_id == resource_owner_id:
return True
return permission in permissions
# 检查权限的装饰器
def requires_permission(permission: Permission, resource_owner_param: Optional[str] = None):
"""装饰器检查用户是否有所需权限"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 从会话或令牌中获取当前用户
current_user_id = kwargs.get("current_user_id")
if not current_user_id:
raise PermissionError("需要认证")
# 如果指定了获取资源所有者ID
resource_owner_id = None
if resource_owner_param:
resource_owner_id = kwargs.get(resource_owner_param)
# 检查权限
rbac = RBACManager()
if not rbac.has_permission(
current_user_id,
permission,
resource_owner_id
):
raise PermissionError(
f"权限被拒绝。需要:{permission.value}"
)
return await func(*args, **kwargs)
return wrapper
return decorator
# 使用示例
@requires_permission(Permission.DELETE, "user_id")
async def delete_user(user_id: str, current_user_id: str):
"""删除用户(仅限所有者或管理员)"""
# 实现在这里
pass
4. 多因素认证(MFA)
# core/mfa.py
import pyotp
import qrcode
import io
import base64
from typing import Optional
from fastapi import HTTPException
from enum import Enum
class MFAMethod(str, Enum):
TOTP = "totp"
SMS = "sms"
EMAIL = "email"
PUSH = "push"
class MFAService:
"""多因素认证服务"""
def __init__(self):
self.totp_window = 1 # 30秒窗口容忍度
def generate_totp_secret(self) -> str:
"""为用户生成新的TOTP密钥"""
return pyotp.random_base32()
def generate_qr_code(self, user_email: str, secret: str) -> str:
"""为TOTP设置生成QR码"""
totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
name=user_email,
issuer_name="MyApp"
)
# 生成QR码
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp_uri)
img = qr.make_image(fill_color="black", back_color="white")
# 转换为base64
buffer = io.BytesIO()
img.save(buffer, format="PNG")
img_str = base64.b64encode(buffer.getvalue()).decode()
return f"data:image/png;base64,{img_str}"
def verify_totp(self, secret: str, token: str) -> bool:
"""验证TOTP令牌"""
totp = pyotp.TOTP(secret)
return totp.verify(token, valid_window=self.totp_window)
def generate_backup_codes(self, count: int = 10) -> List[str]:
"""为MFA恢复生成备份代码"""
codes = []
for _ in range(count):
code = f"{secrets.randbelow(10**6):06d}"
codes.append(code)
return codes
def verify_backup_code(
self,
provided_code: str,
backup_codes: List[str]
) -> bool:
"""验证备份代码并删除如果有效"""
if provided_code in backup_codes:
backup_codes.remove(provided_code)
return True
return False
async def send_sms_code(self, phone_number: str, code: str):
"""发送SMS验证码(使用SMS服务)"""
# 实现取决于SMS服务
# 例如使用Twilio:
# twilio_client.messages.create(
# body=f"Your verification code is: {code}",
# from_="+1234567890",
# to=phone_number
# )
pass
async def send_email_code(self, email: str, code: str):
"""发送电子邮件验证码"""
# 实现取决于电子邮件服务
# 例如使用SendGrid:
# sendgrid_client.send(
# from_email="noreply@myapp.com",
# to=email,
# subject="Verification Code",
# html_content=f"Your code is: {code}"
# )
pass
5. 安全中间件
# middleware/security.py
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging
logger = logging.getLogger(__name__)
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""添加安全头到所有响应"""
async def dispatch(self, request, call_next):
response = await call_next(request)
# 安全头
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' 'unsafe-eval'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"font-src 'self' data:; "
"connect-src 'self' https://api.example.com; "
"frame-ancestors 'none'"
)
# 移除服务器信息
response.headers.pop("Server", None)
response.headers.pop("X-Powered-By", None)
return response
class RateLimitMiddleware(BaseHTTPMiddleware):
"""限流中间件"""
def __init__(self, app, calls: int = 100, period: int = 60):
super().__init__(app)
self.calls = calls
self.period = period
self.clients = {}
async def dispatch(self, request, call_next):
client_ip = request.client.host
current_time = time.time()
# 清理旧条目
self.clients = {
ip: times for ip, times in self.clients.items()
if current_time - max(times) < self.period
}
# 检查限流
if client_ip in self.clients:
recent_calls = [
t for t in self.clients[client_ip]
if current_time - t < self.period
]
if len(recent_calls) >= self.calls:
raise HTTPException(
status_code=429,
detail="超出速率限制"
)
self.clients[client_ip] = recent_calls + [current_time]
else:
self.clients[client_ip] = [current_time]
return await call_next(request)
class AuditLoggingMiddleware(BaseHTTPMiddleware):
"""审计日志中间件"""
def __init__(self, app, sensitive_fields: list = None):
super().__init__(app)
self.sensitive_fields = sensitive_fields or [
"password", "token", "secret", "key"
]
async def dispatch(self, request, call_next):
start_time = time.time()
# 日志请求
request_data = {
"method": request.method,
"url": str(request.url),
"headers": dict(request.headers),
"client_ip": request.client.host
}
# 移除敏感数据
self._remove_sensitive_data(request_data)
logger.info(f"请求:{request_data}")
try:
response = await call_next(request)
# 日志响应
duration = time.time() - start_time
response_data = {
"status_code": response.status_code,
"duration": f"{duration:.3f}s"
}
logger.info(f"响应:{response_data}")
return response
except Exception as e:
logger.error(f"错误:{str(e)}", exc_info=True)
raise
def _remove_sensitive_data(self, data: dict):
"""从日志中移除敏感数据"""
if isinstance(data, dict):
for key in self.sensitive_fields:
if key in data:
data[key] = "[REDACTED]"
for value in data.values():
if isinstance(value, dict):
self._remove_sensitive_data(value)
elif isinstance(data, list):
for item in data:
if isinstance(item, dict):
self._remove_sensitive_data(item)
6. 零信任安全模式
# core/zero_trust.py
from typing import Optional, Dict, Any, List
import ipaddress
from datetime import datetime, timedelta
class ZeroTrustManager:
"""零信任安全管理器"""
def __init__(self):
self.trusted_networks = self._load_trusted_networks()
self.device_policies = self._load_device_policies()
def validate_access_request(
self,
request_data: Dict[str, Any]
) -> Dict[str, Any]:
"""使用零信任原则验证访问请求"""
validation_result = {
"allowed": False,
"risk_score": 0,
"factors": []
}
# 因素1:设备信任
device_trust = self._validate_device(request_data)
validation_result["factors"].append(device_trust)
# 因素2:网络信任
network_trust = self._validate_network(request_data)
validation_result["factors"].append(network_trust)
# 因素3:行为分析
behavior_trust = self._analyze_behavior(request_data)
validation_result["factors"].append(behavior_trust)
# 因素4:基于时间的异常检测
time_trust = self._validate_timing(request_data)
validation_result["factors"].append(time_trust)
# 计算总体风险评分
validation_result["risk_score"] = sum(
factor["risk"] for factor in validation_result["factors"]
)
# 确定是否允许访问
validation_result["allowed"] = (
validation_result["risk_score"] < 50 and
all(factor["trusted"] for factor in validation_result["factors"])
)
return validation_result
def _validate_device(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""验证设备信任"""
device_id = request_data.get("device_id")
user_agent = request_data.get("user_agent")
if not device_id:
return {"trusted": False, "risk": 30, "reason": "没有设备ID"}
# 检查设备是否已知且受信任
# 实现取决于您的设备管理系统
device_trusted = self._is_device_trusted(device_id, user_agent)
return {
"trusted": device_trusted,
"risk": 0 if device_trusted else 20,
"reason": "未知设备"
}
def _validate_network(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""验证网络信任"""
client_ip = request_data.get("client_ip")
try:
ip_obj = ipaddress.ip_address(client_ip)
except ValueError:
return {"trusted": False, "risk": 40, "reason": "无效IP"}
# 检查IP是否在受信任网络中
for trusted_network in self.trusted_networks:
if ip_obj in trusted_network:
return {"trusted": True, "risk": 0, "reason": "受信任网络"}
# 检查是否是私有网络
if ip_obj.is_private:
return {"trusted": True, "risk": 5, "reason": "私有网络"}
return {"trusted": False, "risk": 15, "reason": "不受信任网络"}
def _analyze_behavior(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""分析用户行为以发现异常"""
user_id = request_data.get("user_id")
location = request_data.get("location")
timestamp = request_data.get("timestamp", datetime.utcnow())
# 检查异常(同一用户从不同位置)
# 实现将涉及存储和分析用户模式
anomalies = self._detect_behavioral_anomalies(user_id, location, timestamp)
return {
"trusted": len(anomalies) == 0,
"risk": min(len(anomalies) * 10, 30),
"reason": f"异常检测:{anomalies}"
}
def _validate_timing(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""验证请求时间"""
timestamp = request_data.get("timestamp", datetime.utcnow())
# 检查非工作时间访问
hour = timestamp.hour
if hour < 6 or hour > 22:
return {
"trusted": True,
"risk": 10,
"reason": "非工作时间访问"
}
return {"trusted": True, "risk": 0, "reason": "正常工作时间"}
def _is_device_trusted(self, device_id: str, user_agent: str) -> bool:
"""检查设备是否受信任"""
# 实现取决于您的设备管理系统
# 这可以检查注册设备的数据库
return True # 占位符
def _detect_behavioral_anomalies(
self,
user_id: str,
location: Optional[str],
timestamp: datetime
) -> List[str]:
"""检测行为异常"""
anomalies = []
# 实施行为分析逻辑
# - 同一用户从多个位置
# - 快速连续请求
# - 不寻常的访问模式
# - 地理不可能性
return anomalies
def _load_trusted_networks(self):
"""加载受信任的网络范围"""
return [
ipaddress.ip_network("192.168.1.0/24"),
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("172.16.0.0/12"),
# 在这里添加您受信任的网络
]
def _load_device_policies(self):
"""加载设备安全策略"""
return {
"require_device_trust": True,
"max_devices_per_user": 5,
"session_timeout": 3600
}
7. 生产安全清单
# security/production_checklist.yaml
authentication:
jwt:
secret_key_length: 32
algorithm: HS256
access_token_expiry: 15m
refresh_token_expiry: 7d
refresh_rotation: true
password:
min_length: 12
complexity_requirements:
- uppercase
- lowercase
- numbers
- special_characters
hash_algorithm: pbkdf2_sha256
rounds: 120000
mfa:
required_for_admin: true
optional_for_users: true
backup_codes_count: 10
oauth2:
providers:
- google
- github
- microsoft
state_validation: true
pkce: true
security:
headers:
strict_transport_security: true
content_security_policy: true
x_frame_options: deny
x_xss_protection: true
rate_limiting:
default: 100/minute
auth_endpoints: 10/minute
admin_endpoints: 5/minute
session_management:
idle_timeout: 30m
absolute_timeout: 8h
concurrent_sessions: 3
audit_logging:
log_all_auth_events: true
log_sensitive_data: false
retention_period: 90d
zero_trust:
device_validation: true
network_validation: true
behavioral_analysis: true
risk_scoring: true
encryption:
at_rest: AES-256
in_transit: TLS-1.3
key_rotation: 90d
secrets_management:
use_vault: true
environment_variables: false
rotation_enabled: true