API 认证
概述
实现包括 JWT 令牌、OAuth 2.0、API 密钥和会话管理在内的全面认证策略,以确保 API 安全。
何时使用
- 保护 API 端点
- 实现用户登录/登出流程
- 管理访问令牌和刷新令牌
- 集成 OAuth 2.0 提供商
- 保护敏感数据
- 实施 API 密钥认证
指南
1. JWT 认证
// Node.js JWT 实现
const express = require('express');
const jwt = require('jsonwebtoken');
const bcrypt = require('bcrypt');
const app = express();
const SECRET_KEY = process.env.JWT_SECRET || 'your-secret-key';
const REFRESH_SECRET = process.env.REFRESH_SECRET || 'your-refresh-secret';
// 用户登录端点
app.post('/api/auth/login', async (req, res) => {
try {
const { email, password } = req.body;
// 在数据库中查找用户
const user = await User.findOne({ email });
if (!user) {
return res.status(401).json({ error: '无效凭证' });
}
// 验证密码
const isValid = await bcrypt.compare(password, user.password);
if (!isValid) {
return res.status(401).json({ error: '无效凭证' });
}
// 生成令牌
const accessToken = jwt.sign(
{ userId: user.id, email: user.email, role: user.role },
SECRET_KEY,
{ expiresIn: '15m' }
);
const refreshToken = jwt.sign(
{ userId: user.id },
REFRESH_SECRET,
{ expiresIn: '7d' }
);
// 在数据库中存储刷新令牌
await RefreshToken.create({ token: refreshToken, userId: user.id });
res.json({
accessToken,
refreshToken,
expiresIn: 900,
user: { id: user.id, email: user.email, role: user.role }
});
} catch (error) {
res.status(500).json({ error: '认证失败' });
}
});
// 刷新令牌端点
app.post('/api/auth/refresh', (req, res) => {
const { refreshToken } = req.body;
if (!refreshToken) {
return res.status(401).json({ error: '需要刷新令牌' });
}
try {
const decoded = jwt.verify(refreshToken, REFRESH_SECRET);
// 验证令牌是否存在于数据库
const storedToken = await RefreshToken.findOne({
token: refreshToken,
userId: decoded.userId
});
if (!storedToken) {
return res.status(401).json({ error: '无效刷新令牌' });
}
// 生成新的访问令牌
const newAccessToken = jwt.sign(
{ userId: decoded.userId },
SECRET_KEY,
{ expiresIn: '15m' }
);
res.json({ accessToken: newAccessToken, expiresIn: 900 });
} catch (error) {
res.status(401).json({ error: '无效刷新令牌' });
}
});
// 验证 JWT 的中间件
const verifyToken = (req, res, next) => {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1]; // 持有者令牌
if (!token) {
return res.status(401).json({ error: '需要访问令牌' });
}
try {
const decoded = jwt.verify(token, SECRET_KEY);
req.user = decoded;
next();
} catch (error) {
if (error.name === 'TokenExpiredError') {
return res.status(401).json({ error: '令牌过期', code: 'TOKEN_EXPIRED' });
}
res.status(403).json({ error: '无效令牌' });
}
};
// 受保护的端点
app.get('/api/profile', verifyToken, (req, res) => {
res.json({ user: req.user });
});
// 登出端点
app.post('/api/auth/logout', verifyToken, async (req, res) => {
try {
await RefreshToken.deleteOne({ userId: req.user.userId });
res.json({ message: '成功登出' });
} catch (error) {
res.status(500).json({ error: '登出失败' });
}
});
2. OAuth 2.0 实现
const passport = require('passport');
const GoogleStrategy = require('passport-google-oauth20').Strategy;
passport.use(new GoogleStrategy(
{
clientID: process.env.GOOGLE_CLIENT_ID,
clientSecret: process.env.GOOGLE_CLIENT_SECRET,
callbackURL: '/api/auth/google/callback'
},
async (accessToken, refreshToken, profile, done) => {
try {
let user = await User.findOne({ googleId: profile.id });
if (!user) {
user = await User.create({
googleId: profile.id,
email: profile.emails[0].value,
firstName: profile.name.givenName,
lastName: profile.name.familyName
});
}
return done(null, user);
} catch (error) {
return done(error);
}
}
));
// OAuth 路由
app.get('/api/auth/google',
passport.authenticate('google', { scope: ['profile', 'email'] })
);
app.get('/api/auth/google/callback',
passport.authenticate('google', { failureRedirect: '/login' }),
(req, res) => {
const token = jwt.sign(
{ userId: req.user.id, email: req.user.email },
SECRET_KEY,
{ expiresIn: '7d' }
);
res.redirect(`/dashboard?token=${token}`);
}
);
3. API 密钥认证
// API 密钥中间件
const verifyApiKey = (req, res, next) => {
const apiKey = req.headers['x-api-key'];
if (!apiKey) {
return res.status(401).json({ error: '需要 API 密钥' });
}
try {
// 验证 API 密钥格式和存在性
const keyHash = crypto.createHash('sha256').update(apiKey).digest('hex');
const apiKeyRecord = await ApiKey.findOne({ key_hash: keyHash, active: true });
if (!apiKeyRecord) {
return res.status(401).json({ error: '无效 API 密钥' });
}
req.apiKey = apiKeyRecord;
next();
} catch (error) {
res.status(500).json({ error: '认证失败' });
}
};
// 生成 API 密钥端点
app.post('/api/apikeys/generate', verifyToken, async (req, res) => {
try {
const apiKey = crypto.randomBytes(32).toString('hex');
const keyHash = crypto.createHash('sha256').update(apiKey).digest('hex');
const record = await ApiKey.create({
userId: req.user.userId,
key_hash: keyHash,
name: req.body.name,
active: true
});
res.json({ apiKey, message: '安全保存此密钥' });
} catch (error) {
res.status(500).json({ error: '生成 API 密钥失败' });
}
});
// 使用 API 密钥的受保护端点
app.get('/api/data', verifyApiKey, (req, res) => {
res.json({ data: 'API 密钥持有者的敏感数据' });
});
4. Python 认证实现
from flask import Flask, request, jsonify
from flask_jwt_extended import JWTManager, create_access_token, jwt_required
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'secret-key'
jwt = JWTManager(app)
@app.route('/api/auth/login', methods=['POST'])
def login():
data = request.get_json()
user = User.query.filter_by(email=data['email']).first()
if not user or not check_password_hash(user.password, data['password']):
return jsonify({'error': '无效凭证'}), 401
access_token = create_access_token(
identity=user.id,
additional_claims={'email': user.email, 'role': user.role}
)
return jsonify({
'accessToken': access_token,
'user': {'id': user.id, 'email': user.email}
}), 200
@app.route('/api/protected', methods=['GET'])
@jwt_required()
def protected():
from flask_jwt_extended import get_jwt_identity
user_id = get_jwt_identity()
return jsonify({'userId': user_id}), 200
def require_role(role):
def decorator(fn):
@wraps(fn)
@jwt_required()
def wrapper(*args, **kwargs):
from flask_jwt_extended import get_jwt
claims = get_jwt()
if claims.get('role') != role:
return jsonify({'error': '禁止'}), 403
return fn(*args, **kwargs)
return wrapper
return decorator
@app.route('/api/admin', methods=['GET'])
@require_role('admin')
def admin_endpoint():
return jsonify({'message': '管理员数据'}), 200
最佳实践
✅ 做
- 对所有认证使用 HTTPS
- 安全存储令牌(HttpOnly 饼干)
- 实施令牌刷新机制
- 设置适当的令牌过期时间
- 哈希和盐密码
- 使用强密钥
- 在每个请求上验证令牌
- 在认证端点上实施速率限制
- 记录认证尝试
- 定期旋转密钥
❌ 不做
- 以纯文本形式存储密码
- 在 URL 参数中发送令牌
- 使用弱密钥
- 在 JWT 负载中存储敏感数据
- 忽略令牌过期
- 在生产中禁用 HTTPS
- 记录敏感令牌
- 跨服务重用 API 密钥
- 在代码中存储凭据
安全头
app.use((req, res, next) => {
res.setHeader('X-Content-Type-Options', 'nosniff');
res.setHeader('X-Frame-Options', 'DENY');
res.setHeader('X-XSS-Protection', '1; mode=block');
res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains');
next();
});