API认证Skill api-authentication

提供全面的API安全认证策略,包括JWT、OAuth 2.0、API密钥和会话管理,以保护API端点和敏感数据。

身份认证 0 次安装 0 次浏览 更新于 3/3/2026

API 认证

概述

实现包括 JWT 令牌、OAuth 2.0、API 密钥和会话管理在内的全面认证策略,以确保 API 安全。

何时使用

  • 保护 API 端点
  • 实现用户登录/登出流程
  • 管理访问令牌和刷新令牌
  • 集成 OAuth 2.0 提供商
  • 保护敏感数据
  • 实施 API 密钥认证

指南

1. JWT 认证

// Node.js JWT 实现
const express = require('express');
const jwt = require('jsonwebtoken');
const bcrypt = require('bcrypt');

const app = express();
const SECRET_KEY = process.env.JWT_SECRET || 'your-secret-key';
const REFRESH_SECRET = process.env.REFRESH_SECRET || 'your-refresh-secret';

// 用户登录端点
app.post('/api/auth/login', async (req, res) => {
  try {
    const { email, password } = req.body;

    // 在数据库中查找用户
    const user = await User.findOne({ email });
    if (!user) {
      return res.status(401).json({ error: '无效凭证' });
    }

    // 验证密码
    const isValid = await bcrypt.compare(password, user.password);
    if (!isValid) {
      return res.status(401).json({ error: '无效凭证' });
    }

    // 生成令牌
    const accessToken = jwt.sign(
      { userId: user.id, email: user.email, role: user.role },
      SECRET_KEY,
      { expiresIn: '15m' }
    );

    const refreshToken = jwt.sign(
      { userId: user.id },
      REFRESH_SECRET,
      { expiresIn: '7d' }
    );

    // 在数据库中存储刷新令牌
    await RefreshToken.create({ token: refreshToken, userId: user.id });

    res.json({
      accessToken,
      refreshToken,
      expiresIn: 900,
      user: { id: user.id, email: user.email, role: user.role }
    });
  } catch (error) {
    res.status(500).json({ error: '认证失败' });
  }
});

// 刷新令牌端点
app.post('/api/auth/refresh', (req, res) => {
  const { refreshToken } = req.body;

  if (!refreshToken) {
    return res.status(401).json({ error: '需要刷新令牌' });
  }

  try {
    const decoded = jwt.verify(refreshToken, REFRESH_SECRET);

    // 验证令牌是否存在于数据库
    const storedToken = await RefreshToken.findOne({
      token: refreshToken,
      userId: decoded.userId
    });

    if (!storedToken) {
      return res.status(401).json({ error: '无效刷新令牌' });
    }

    // 生成新的访问令牌
    const newAccessToken = jwt.sign(
      { userId: decoded.userId },
      SECRET_KEY,
      { expiresIn: '15m' }
    );

    res.json({ accessToken: newAccessToken, expiresIn: 900 });
  } catch (error) {
    res.status(401).json({ error: '无效刷新令牌' });
  }
});

// 验证 JWT 的中间件
const verifyToken = (req, res, next) => {
  const authHeader = req.headers['authorization'];
  const token = authHeader && authHeader.split(' ')[1]; // 持有者令牌

  if (!token) {
    return res.status(401).json({ error: '需要访问令牌' });
  }

  try {
    const decoded = jwt.verify(token, SECRET_KEY);
    req.user = decoded;
    next();
  } catch (error) {
    if (error.name === 'TokenExpiredError') {
      return res.status(401).json({ error: '令牌过期', code: 'TOKEN_EXPIRED' });
    }
    res.status(403).json({ error: '无效令牌' });
  }
};

// 受保护的端点
app.get('/api/profile', verifyToken, (req, res) => {
  res.json({ user: req.user });
});

// 登出端点
app.post('/api/auth/logout', verifyToken, async (req, res) => {
  try {
    await RefreshToken.deleteOne({ userId: req.user.userId });
    res.json({ message: '成功登出' });
  } catch (error) {
    res.status(500).json({ error: '登出失败' });
  }
});

2. OAuth 2.0 实现

const passport = require('passport');
const GoogleStrategy = require('passport-google-oauth20').Strategy;

passport.use(new GoogleStrategy(
  {
    clientID: process.env.GOOGLE_CLIENT_ID,
    clientSecret: process.env.GOOGLE_CLIENT_SECRET,
    callbackURL: '/api/auth/google/callback'
  },
  async (accessToken, refreshToken, profile, done) => {
    try {
      let user = await User.findOne({ googleId: profile.id });

      if (!user) {
        user = await User.create({
          googleId: profile.id,
          email: profile.emails[0].value,
          firstName: profile.name.givenName,
          lastName: profile.name.familyName
        });
      }

      return done(null, user);
    } catch (error) {
      return done(error);
    }
  }
));

// OAuth 路由
app.get('/api/auth/google',
  passport.authenticate('google', { scope: ['profile', 'email'] })
);

app.get('/api/auth/google/callback',
  passport.authenticate('google', { failureRedirect: '/login' }),
  (req, res) => {
    const token = jwt.sign(
      { userId: req.user.id, email: req.user.email },
      SECRET_KEY,
      { expiresIn: '7d' }
    );
    res.redirect(`/dashboard?token=${token}`);
  }
);

3. API 密钥认证

// API 密钥中间件
const verifyApiKey = (req, res, next) => {
  const apiKey = req.headers['x-api-key'];

  if (!apiKey) {
    return res.status(401).json({ error: '需要 API 密钥' });
  }

  try {
    // 验证 API 密钥格式和存在性
    const keyHash = crypto.createHash('sha256').update(apiKey).digest('hex');
    const apiKeyRecord = await ApiKey.findOne({ key_hash: keyHash, active: true });

    if (!apiKeyRecord) {
      return res.status(401).json({ error: '无效 API 密钥' });
    }

    req.apiKey = apiKeyRecord;
    next();
  } catch (error) {
    res.status(500).json({ error: '认证失败' });
  }
};

// 生成 API 密钥端点
app.post('/api/apikeys/generate', verifyToken, async (req, res) => {
  try {
    const apiKey = crypto.randomBytes(32).toString('hex');
    const keyHash = crypto.createHash('sha256').update(apiKey).digest('hex');

    const record = await ApiKey.create({
      userId: req.user.userId,
      key_hash: keyHash,
      name: req.body.name,
      active: true
    });

    res.json({ apiKey, message: '安全保存此密钥' });
  } catch (error) {
    res.status(500).json({ error: '生成 API 密钥失败' });
  }
});

// 使用 API 密钥的受保护端点
app.get('/api/data', verifyApiKey, (req, res) => {
  res.json({ data: 'API 密钥持有者的敏感数据' });
});

4. Python 认证实现

from flask import Flask, request, jsonify
from flask_jwt_extended import JWTManager, create_access_token, jwt_required
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps

app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'secret-key'
jwt = JWTManager(app)

@app.route('/api/auth/login', methods=['POST'])
def login():
    data = request.get_json()
    user = User.query.filter_by(email=data['email']).first()

    if not user or not check_password_hash(user.password, data['password']):
        return jsonify({'error': '无效凭证'}), 401

    access_token = create_access_token(
        identity=user.id,
        additional_claims={'email': user.email, 'role': user.role}
    )

    return jsonify({
        'accessToken': access_token,
        'user': {'id': user.id, 'email': user.email}
    }), 200

@app.route('/api/protected', methods=['GET'])
@jwt_required()
def protected():
    from flask_jwt_extended import get_jwt_identity
    user_id = get_jwt_identity()
    return jsonify({'userId': user_id}), 200

def require_role(role):
    def decorator(fn):
        @wraps(fn)
        @jwt_required()
        def wrapper(*args, **kwargs):
            from flask_jwt_extended import get_jwt
            claims = get_jwt()
            if claims.get('role') != role:
                return jsonify({'error': '禁止'}), 403
            return fn(*args, **kwargs)
        return wrapper
    return decorator

@app.route('/api/admin', methods=['GET'])
@require_role('admin')
def admin_endpoint():
    return jsonify({'message': '管理员数据'}), 200

最佳实践

✅ 做

  • 对所有认证使用 HTTPS
  • 安全存储令牌(HttpOnly 饼干)
  • 实施令牌刷新机制
  • 设置适当的令牌过期时间
  • 哈希和盐密码
  • 使用强密钥
  • 在每个请求上验证令牌
  • 在认证端点上实施速率限制
  • 记录认证尝试
  • 定期旋转密钥

❌ 不做

  • 以纯文本形式存储密码
  • 在 URL 参数中发送令牌
  • 使用弱密钥
  • 在 JWT 负载中存储敏感数据
  • 忽略令牌过期
  • 在生产中禁用 HTTPS
  • 记录敏感令牌
  • 跨服务重用 API 密钥
  • 在代码中存储凭据

安全头

app.use((req, res, next) => {
  res.setHeader('X-Content-Type-Options', 'nosniff');
  res.setHeader('X-Frame-Options', 'DENY');
  res.setHeader('X-XSS-Protection', '1; mode=block');
  res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains');
  next();
});