会话管理
概述
实现全面的会话管理系统,包括安全的令牌处理、会话持久性、令牌刷新机制、适当的登出程序和CSRF保护,适用于不同的后端框架。
何时使用
- 实现用户认证系统
- 管理会话状态和用户上下文
- 处理JWT令牌刷新周期
- 实施登出功能
- 防止CSRF攻击
- 管理会话过期和清理
指令
1. JWT令牌生成和验证
# Python/Flask 示例
from flask import current_app
from datetime import datetime, timedelta
import jwt
import os
class TokenManager:
def __init__(self, secret_key=None):
self.secret_key = secret_key or os.getenv('JWT_SECRET')
self.algorithm = 'HS256'
self.access_token_expires_hours = 1
self.refresh_token_expires_days = 7
def generate_tokens(self, user_id, email, role='user'):
"""生成访问令牌和刷新令牌"""
now = datetime.utcnow()
# 访问令牌
access_payload = {
'user_id': user_id,
'email': email,
'role': role,
'type': 'access',
'iat': now,
'exp': now + timedelta(hours=self.access_token_expires_hours)
}
access_token = jwt.encode(access_payload, self.secret_key, algorithm=self.algorithm)
# 刷新令牌
refresh_payload = {
'user_id': user_id,
'type': 'refresh',
'iat': now,
'exp': now + timedelta(days=self.refresh_token_expires_days)
}
refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm=self.algorithm)
return {
'access_token': access_token,
'refresh_token': refresh_token,
'expires_in': self.access_token_expires_hours * 3600,
'token_type': 'Bearer'
}
def verify_token(self, token, token_type='access'):
"""验证和解码JWT令牌"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
# 检查令牌类型是否匹配
if payload.get('type') != token_type:
return None, 'Invalid token type'
return payload, None
except jwt.ExpiredSignatureError:
return None, 'Token expired'
except jwt.InvalidTokenError:
return None, 'Invalid token'
def refresh_access_token(self, refresh_token):
"""从刷新令牌生成新的访问令牌"""
payload, error = self.verify_token(refresh_token, token_type='refresh')
if error:
return None, error
new_access_token = self.generate_tokens(
payload['user_id'],
payload.get('email', ''),
payload.get('role', 'user')
)
return new_access_token, None
2. Node.js/Express JWT实现
// Node.js/Express 示例
const jwt = require('jsonwebtoken');
const crypto = require('crypto');
const redis = require('redis');
class SessionManager {
constructor() {
this.secretKey = process.env.JWT_SECRET || 'dev-secret';
this.algorithm = 'HS256';
this.accessTokenExpiry = '1h';
this.refreshTokenExpiry = '7d';
this.redisClient = redis.createClient();
}
generateTokens(userId, email, role = 'user') {
const now = new Date();
const jti = crypto.randomBytes(16).toString('hex');
const accessToken = jwt.sign(
{
userId,
email,
role,
type: 'access',
jti,
iat: Math.floor(now.getTime() / 1000)
},
this.secretKey,
{ algorithm: this.algorithm, expiresIn: this.accessTokenExpiry }
);
const refreshToken = jwt.sign(
{
userId,
type: 'refresh',
jti,
iat: Math.floor(now.getTime() / 1000)
},
this.secretKey,
{ algorithm: this.algorithm, expiresIn: this.refreshTokenExpiry }
);
return {
accessToken,
refreshToken,
expiresIn: 3600,
tokenType: 'Bearer'
};
}
verifyToken(token, tokenType = 'access') {
try {
const decoded = jwt.verify(token, this.secretKey, {
algorithms: [this.algorithm]
});
if (decoded.type !== tokenType) {
return { payload: null, error: 'Invalid token type' };
}
return { payload: decoded, error: null };
} catch (err) {
if (err.name === 'TokenExpiredError') {
return { payload: null, error: 'Token expired' };
}
return { payload: null, error: 'Invalid token' };
}
}
async isTokenBlacklisted(jti) {
const result = await this.redisClient.get(`blacklist:${jti}`);
return result !== null;
}
async blacklistToken(jti, expiresIn) {
await this.redisClient.setex(`blacklist:${jti}`, expiresIn, '1');
}
async logout(token) {
const decoded = jwt.decode(token);
if (decoded && decoded.jti) {
const expiresIn = decoded.exp - Math.floor(Date.now() / 1000);
await this.blacklistToken(decoded.jti, expiresIn);
}
}
refreshAccessToken(refreshToken) {
const { payload, error } = this.verifyToken(refreshToken, 'refresh');
if (error) {
return { tokens: null, error };
}
return {
tokens: this.generateTokens(payload.userId, payload.email, payload.role),
error: null
};
}
}
// Middleware
const authMiddleware = (req, res, next) => {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];
if (!token) {
return res.status(401).json({ error: 'No token provided' });
}
const sessionManager = new SessionManager();
const { payload, error } = sessionManager.verifyToken(token);
if (error) {
return res.status(401).json({ error });
}
req.user = payload;
next();
};
3. 使用Redis的会话存储
# Python/Flask与Redis
import redis
import json
from datetime import timedelta
from functools import wraps
class RedisSessionManager:
def __init__(self, redis_url='redis://localhost:6379'):
self.redis = redis.from_url(redis_url, decode_responses=True)
self.prefix = 'session:'
def create_session(self, user_id, data, expire_hours=24):
"""为用户创建会话"""
session_data = {
'user_id': user_id,
'data': data,
'created_at': datetime.utcnow().isoformat(),
'last_activity': datetime.utcnow().isoformat()
}
session_id = secrets.token_urlsafe(32)
key = f'{self.prefix}{session_id}'
self.redis.setex(
key,
timedelta(hours=expire_hours),
json.dumps(session_data)
)
return session_id
def get_session(self, session_id):
"""检索会话数据"""
key = f'{self.prefix}{session_id}'
data = self.redis.get(key)
if not data:
return None
session_data = json.loads(data)
# 更新最后活动
session_data['last_activity'] = datetime.utcnow().isoformat()
self.redis.setex(key, timedelta(hours=24), json.dumps(session_data))
return session_data
def destroy_session(self, session_id):
"""销毁会话"""
key = f'{self.prefix}{session_id}'
self.redis.delete(key)
def update_session(self, session_id, updates):
"""更新会话数据"""
session_data = self.get_session(session_id)
if not session_data:
return False
session_data['data'].update(updates)
key = f'{self.prefix}{session_id}'
self.redis.setex(
key,
timedelta(hours=24),
json.dumps(session_data)
)
return True
def get_user_sessions(self, user_id):
"""获取用户的所有会话"""
cursor = 0
sessions = []
while True:
cursor, keys = self.redis.scan(cursor, match=f'{self.prefix}*')
for key in keys:
data = json.loads(self.redis.get(key))
if data['user_id'] == user_id:
sessions.append({
'session_id': key.replace(self.prefix, ''),
'created_at': data['created_at'],
'last_activity': data['last_activity']
})
if cursor == 0:
break
return sessions
def invalidate_all_user_sessions(self, user_id):
"""从所有设备注销用户"""
sessions = self.get_user_sessions(user_id)
for session in sessions:
self.destroy_session(session['session_id'])
4. CSRF保护
# Flask CSRF保护
from flask_wtf.csrf import CSRFProtect
from flask import session, request
csrf = CSRFProtect()
@app.route('/login', methods=['POST'])
@csrf.protect
def login():
# CSRF令牌自动验证
email = request.json.get('email')
password = request.json.get('password')
user = User.query.filter_by(email=email).first()
if user and user.verify_password(password):
session['user_id'] = user.id
session['csrf_token'] = csrf.generate_csrf()
return jsonify({'success': True}), 200
return jsonify({'error': 'Invalid credentials'}), 401
# JavaScript客户端
async function login(email, password) {
const response = await fetch('/csrf-token');
const { csrfToken } = await response.json();
return fetch('/login', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRF-Token': csrfToken
},
body: JSON.stringify({ email, password })
});
}
5. 会话中间件链
// Node.js中间件链
const express = require('express');
const app = express();
// 1.解析cookies
app.use(express.json());
app.use(cookieParser(process.env.COOKIE_SECRET));
// 2.会话中间件
app.use(session({
secret: process.env.SESSION_SECRET,
resave: false,
saveUninitialized: false,
cookie: {
httpOnly: true,
secure: process.env.NODE_ENV === 'production',
sameSite: 'strict',
maxAge: 24 * 60 * 60 * 1000
},
store: new RedisStore({ client: redisClient })
}));
// 3.CSRF保护
const csrfProtection = csrf({ cookie: false });
// 4.每个会话的速率限制
const sessionRateLimit = rateLimit({
store: new RedisStore({ client: redisClient }),
keyGenerator: (req) => req.sessionID,
windowMs: 15 * 60 * 1000,
max: 100
});
app.use(sessionRateLimit);
// 5.认证检查
const requireAuth = (req, res, next) => {
if (!req.session.user) {
return res.status(401).json({ error: 'Unauthorized' });
}
req.user = req.session.user;
next();
};
app.post('/api/login', csrfProtection, async (req, res) => {
//验证凭据
const user = await User.findOne({ email: req.body.email });
if (user && await user.verifyPassword(req.body.password)) {
req.session.user = { id: user.id, email: user.email, role: user.role };
req.session.regenerate((err) => {
if (err) return res.status(500).json({ error: 'Server error' });
res.json({ success: true });
});
} else {
res.status(401).json({ error: 'Invalid credentials' });
}
});
app.post('/api/logout', requireAuth, (req, res) => {
req.session.destroy((err) => {
if (err) return res.status(500).json({ error: 'Logout failed' });
res.clearCookie('connect.sid');
res.json({ success: true });
});
});
6. 令牌刷新端点
# Flask令牌刷新端点
from flask import request, jsonify
from functools import wraps
@app.route('/api/auth/refresh', methods=['POST'])
def refresh_token():
data = request.get_json()
refresh_token = data.get('refresh_token')
if not refresh_token:
return jsonify({'error': 'Refresh token required'}), 400
token_manager = TokenManager()
tokens, error = token_manager.refresh_access_token(refresh_token)
if error:
return jsonify({'error': error}), 401
return jsonify(tokens), 200
@app.route('/api/auth/logout', methods=['POST'])
@require_auth
def logout():
token = request.headers['Authorization'].split(' ')[1]
session_manager = RedisSessionManager()
session_manager.destroy_session(token)
return jsonify({'message': 'Logged out successfully'}), 200
7. 会话清理和维护
# 使用APScheduler的计划清理任务
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
class SessionCleanup:
def __init__(self, redis_client, cleanup_interval_minutes=60):
self.redis = redis_client
self.cleanup_interval = cleanup_interval_minutes
self.scheduler = BackgroundScheduler()
def start(self):
self.scheduler.add_job(
func=self.cleanup_expired_sessions,
trigger='interval',
minutes=self.cleanup_interval,
id='cleanup_expired_sessions',
replace_existing=True
)
self.scheduler.start()
atexit.register(lambda: self.scheduler.shutdown())
def cleanup_expired_sessions(self):
"""从Redis中移除过期会话"""
cursor = 0
removed_count = 0
while True:
cursor, keys = self.redis.scan(cursor, match='session:*')
for key in keys:
ttl = self.redis.ttl(key)
if ttl == -2: # 键不存在
removed_count += 1
elif ttl < 300: # 剩余时间少于5分钟
self.redis.delete(key)
removed_count += 1
if cursor == 0:
break
return removed_count
# 应用启动时初始化
cleanup = SessionCleanup(redis_client)
cleanup.start()
最佳实践
✅ 做
- 使用HTTPS传输所有会话
- 实现安全的cookies(httpOnly, sameSite, secure标志)
- 使用具有适当过期时间的JWT
- 实施令牌刷新机制
- 安全存储刷新令牌
- 在每个请求上验证令牌
- 使用强密钥
- 实施会话超时
- 记录认证事件
- 在注销时清除会话数据
- 为状态更改请求使用CSRF令牌
❌ 不做
- 在令牌中存储敏感数据
- 使用短密钥
- 在URL中传输令牌
- 忽略令牌过期
- 在不同环境之间重用令牌密钥
- 在localStorage中存储令牌(使用httpOnly cookies)
- 不使用HTTPS实施会话
- 忘记验证令牌签名
- 在日志中暴露会话ID
- 使用可预测的会话ID
完整示例
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
import jwt
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
TOKEN_MANAGER = TokenManager()
@app.route('/login', methods=['POST'])
def login():
data = request.json
user = User.query.filter_by(email=data['email']).first()
if user and user.verify_password(data['password']):
tokens = TOKEN_MANAGER.generate_tokens(user.id, user.email, user.role)
return jsonify(tokens), 200
return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/refresh', methods=['POST'])
def refresh():
refresh_token = request.json.get('refresh_token')
tokens, error = TOKEN_MANAGER.refresh_access_token(refresh_token)
if error:
return jsonify({'error': error}), 401
return jsonify(tokens), 200