身份验证与授权Skill auth

此技能专注于现代应用的身份验证与授权策略,涵盖OAuth2、JWT、RBAC/ABAC、会话管理、API密钥、密码哈希和多因素认证(MFA)。适用于实现登录流程、访问控制、身份管理、令牌处理、权限管理、会话处理、API密钥认证或MFA,包含关键词:身份验证、授权、OAuth2、JWT、RBAC、ABAC、MFA、安全认证、令牌管理、访问控制、会话安全。

身份认证 0 次安装 0 次浏览 更新于 3/24/2026

name: auth description: | 身份验证与授权模式,包括OAuth2、JWT、RBAC/ABAC、会话管理、API密钥、密码哈希和MFA。

使用时机:当实现登录流程、访问控制、身份管理、令牌、权限、会话处理、API密钥认证或MFA时使用。 不要用于:安全漏洞扫描(使用 /security-scan)、安全审计(使用 /security-audit)、威胁建模(使用 /threat-model)。

触发器:login、logout、signin、signup、authentication、authorization、password、credential、token、JWT、OAuth、OAuth2、OIDC、SSO、SAML、session、cookie、RBAC、ABAC、permissions、roles、MFA、2FA、TOTP、API key、PKCE。 triggers:

  • login
  • logout
  • signin
  • signup
  • register
  • authentication
  • authorization
  • password
  • credential
  • token
  • JWT
  • OAuth
  • OAuth2
  • OIDC
  • OpenID
  • SSO
  • SAML
  • session
  • cookie
  • refresh token
  • access token
  • bearer
  • authorization header
  • auth header
  • 401
  • 403
  • forbidden
  • unauthorized
  • RBAC
  • ABAC
  • permissions
  • roles
  • access control
  • identity
  • MFA
  • 2FA
  • two-factor
  • multi-factor
  • TOTP
  • API key
  • auth flow
  • PKCE
  • client credentials

身份验证与授权

概述

此技能涵盖现代应用的综合身份验证与授权策略。它包括身份验证(验证身份)、授权(访问控制)以及跨Web、移动和API环境的凭证安全管理。

使用时机

软件工程师 (Sonnet) - 在以下情况下使用:

  • 实现标准认证流程(登录、注销、密码重置)
  • 为现有应用添加JWT或会话处理
  • 实现RBAC/ABAC模式并定义需求
  • 集成到现有的OAuth2提供商
  • 添加MFA或API密钥认证

高级软件工程师 (Opus) - 在以下情况下升级使用:

  • 从头设计认证架构
  • 选择认证策略(JWT vs 会话、OAuth流程)
  • 评估不同访问控制模型之间的权衡
  • 规划令牌轮换、刷新策略或会话生命周期
  • 做出跨领域的安全决策

高级软件工程师 (Opus) - 在以下情况下请求审核:

  • 实现密码哈希或凭证存储
  • 处理敏感令牌(刷新令牌、API密钥)
  • 实现速率限制或暴力破解保护
  • 添加MFA或阶梯式认证
  • 处理个人身份信息、合规性或监管要求
  • 在生产之前进行任何认证/授权实现
  • 设置身份提供商(如Keycloak、Auth0、Cognito)
  • 配置SSO、SAML或OIDC集成
  • 扩展会话存储(Redis集群、分布式会话)
  • 管理密钥、密钥轮换基础设施
  • 设置用于JWT签名的证书管理

关键概念

OAuth2 流程

授权码流程 - 最适合服务器端应用:

// 1. 重定向用户到授权服务器
const authUrl = new URL("https://auth.example.com/authorize");
authUrl.searchParams.set("response_type", "code");
authUrl.searchParams.set("client_id", CLIENT_ID);
authUrl.searchParams.set("redirect_uri", REDIRECT_URI);
authUrl.searchParams.set("scope", "openid profile email");
authUrl.searchParams.set("state", generateSecureState());

// 2. 交换代码以获取令牌(服务器端)
async function exchangeCode(code: string): Promise<TokenResponse> {
  const response = await fetch("https://auth.example.com/token", {
    method: "POST",
    headers: { "Content-Type": "application/x-www-form-urlencoded" },
    body: new URLSearchParams({
      grant_type: "authorization_code",
      code,
      client_id: CLIENT_ID,
      client_secret: CLIENT_SECRET,
      redirect_uri: REDIRECT_URI,
    }),
  });
  return response.json();
}

PKCE 流程 - 公共客户端(SPA、移动应用)必需:

// 生成代码验证器和挑战
function generatePKCE(): { verifier: string; challenge: string } {
  const verifier = base64UrlEncode(crypto.getRandomValues(new Uint8Array(32)));
  const challenge = base64UrlEncode(
    await crypto.subtle.digest("SHA-256", new TextEncoder().encode(verifier)),
  );
  return { verifier, challenge };
}

// 包含在授权请求中
authUrl.searchParams.set("code_challenge", challenge);
authUrl.searchParams.set("code_challenge_method", "S256");

// 在令牌交换中包含验证器
body.set("code_verifier", verifier);

客户端凭证流程 - 用于服务到服务通信:

async function getServiceToken(): Promise<string> {
  const response = await fetch("https://auth.example.com/token", {
    method: "POST",
    headers: {
      "Content-Type": "application/x-www-form-urlencoded",
      Authorization: `Basic ${btoa(`${CLIENT_ID}:${CLIENT_SECRET}`)}`,
    },
    body: new URLSearchParams({
      grant_type: "client_credentials",
      scope: "api:read api:write",
    }),
  });
  return (await response.json()).access_token;
}

JWT 处理

令牌结构与签名

import jwt from "jsonwebtoken";

interface TokenPayload {
  sub: string; // 主题(用户ID)
  iss: string; // 签发者
  aud: string; // 受众
  exp: number; // 过期时间
  iat: number; // 签发时间
  roles: string[]; // 自定义声明
}

// 使用RS256签名(非对称 - 生产环境推荐)
function signToken(payload: Omit<TokenPayload, "iat" | "exp">): string {
  return jwt.sign(
    { ...payload, iat: Math.floor(Date.now() / 1000) },
    PRIVATE_KEY,
    {
      algorithm: "RS256",
      expiresIn: "15m",
      issuer: "https://api.example.com",
      audience: "https://app.example.com",
    },
  );
}

// 验证令牌
function verifyToken(token: string): TokenPayload {
  return jwt.verify(token, PUBLIC_KEY, {
    algorithms: ["RS256"],
    issuer: "https://api.example.com",
    audience: "https://app.example.com",
  }) as TokenPayload;
}

刷新令牌模式

interface TokenPair {
  accessToken: string; // 短生命周期(15分钟)
  refreshToken: string; // 长生命周期(7天),安全存储
}

async function refreshTokens(refreshToken: string): Promise<TokenPair> {
  // 验证刷新令牌存在于数据库中(允许撤销)
  const storedToken = await db.refreshTokens.findUnique({
    where: { token: hashToken(refreshToken) },
  });

  if (!storedToken || storedToken.expiresAt < new Date()) {
    throw new UnauthorizedError("无效刷新令牌");
  }

  // 轮换刷新令牌(一次性使用)
  await db.refreshTokens.delete({ where: { id: storedToken.id } });

  const newRefreshToken = generateSecureToken();
  await db.refreshTokens.create({
    data: {
      token: hashToken(newRefreshToken),
      userId: storedToken.userId,
      expiresAt: addDays(new Date(), 7),
    },
  });

  return {
    accessToken: signToken({ sub: storedToken.userId, roles: [] }),
    refreshToken: newRefreshToken,
  };
}

RBAC(基于角色的访问控制)

// 定义角色和权限
const ROLES = {
  admin: [
    "users:read",
    "users:write",
    "users:delete",
    "reports:read",
    "settings:write",
  ],
  manager: ["users:read", "users:write", "reports:read"],
  user: ["users:read:own", "reports:read:own"],
} as const;

type Role = keyof typeof ROLES;
type Permission = (typeof ROLES)[Role][number];

// 权限检查中间件
function requirePermission(permission: Permission) {
  return (req: Request, res: Response, next: NextFunction) => {
    const userRoles: Role[] = req.user.roles;
    const userPermissions = userRoles.flatMap((role) => ROLES[role]);

    // 处理:own后缀以实现资源级权限
    const [resource, action, scope] = permission.split(":");
    const basePermission = `${resource}:${action}`;

    const hasPermission =
      userPermissions.includes(permission) ||
      (scope === "own" && userPermissions.includes(basePermission));

    if (!hasPermission) {
      return res.status(403).json({ error: "禁止访问" });
    }
    next();
  };
}

// 使用
app.delete("/users/:id", requirePermission("users:delete"), deleteUser);

ABAC(基于属性的访问控制)

interface PolicyContext {
  subject: { id: string; roles: string[]; department: string };
  resource: { type: string; owner: string; classification: string };
  action: string;
  environment: { time: Date; ip: string };
}

type Policy = (ctx: PolicyContext) => boolean;

const policies: Policy[] = [
  // 用户可以访问自己的资源
  (ctx) => ctx.resource.owner === ctx.subject.id,

  // 经理可以访问其部门的资源
  (ctx) =>
    ctx.subject.roles.includes("manager") &&
    ctx.resource.department === ctx.subject.department,

  // 在非工作时间不允许访问机密资源
  (ctx) => {
    if (ctx.resource.classification === "confidential") {
      const hour = ctx.environment.time.getHours();
      return hour >= 9 && hour < 17;
    }
    return true;
  },
];

function checkAccess(ctx: PolicyContext): boolean {
  return policies.every((policy) => policy(ctx));
}

会话管理

import { Redis } from "ioredis";

const redis = new Redis();
const SESSION_TTL = 24 * 60 * 60; // 24小时

interface Session {
  userId: string;
  createdAt: number;
  lastActivity: number;
  userAgent: string;
  ip: string;
}

async function createSession(userId: string, req: Request): Promise<string> {
  const sessionId = crypto.randomUUID();
  const session: Session = {
    userId,
    createdAt: Date.now(),
    lastActivity: Date.now(),
    userAgent: req.headers["user-agent"] || "",
    ip: req.ip,
  };

  await redis.setex(
    `session:${sessionId}`,
    SESSION_TTL,
    JSON.stringify(session),
  );
  await redis.sadd(`user-sessions:${userId}`, sessionId);

  return sessionId;
}

async function validateSession(sessionId: string): Promise<Session | null> {
  const data = await redis.get(`session:${sessionId}`);
  if (!data) return null;

  const session: Session = JSON.parse(data);

  // 更新最后活动时间
  session.lastActivity = Date.now();
  await redis.setex(
    `session:${sessionId}`,
    SESSION_TTL,
    JSON.stringify(session),
  );

  return session;
}

async function invalidateAllUserSessions(userId: string): Promise<void> {
  const sessionIds = await redis.smembers(`user-sessions:${userId}`);
  if (sessionIds.length > 0) {
    await redis.del(...sessionIds.map((id) => `session:${id}`));
    await redis.del(`user-sessions:${userId}`);
  }
}

API 密钥策略

interface ApiKey {
  id: string;
  prefix: string; // 前8个字符(用于标识)
  hash: string; // 哈希密钥
  name: string;
  scopes: string[];
  rateLimit: number;
  expiresAt: Date | null;
  lastUsedAt: Date | null;
}

// 生成带前缀的API密钥以便轻松标识
function generateApiKey(): { key: string; prefix: string; hash: string } {
  const key = `sk_live_${crypto.randomBytes(32).toString("base64url")}`;
  return {
    key,
    prefix: key.substring(0, 16),
    hash: crypto.createHash("sha256").update(key).digest("hex"),
  };
}

// 验证和速率限制
async function validateApiKey(key: string): Promise<ApiKey> {
  const hash = crypto.createHash("sha256").update(key).digest("hex");
  const apiKey = await db.apiKeys.findUnique({ where: { hash } });

  if (!apiKey) throw new UnauthorizedError("无效API密钥");
  if (apiKey.expiresAt && apiKey.expiresAt < new Date()) {
    throw new UnauthorizedError("API密钥已过期");
  }

  // 检查速率限制
  const rateLimitKey = `ratelimit:${apiKey.id}`;
  const requests = await redis.incr(rateLimitKey);
  if (requests === 1) await redis.expire(rateLimitKey, 60);
  if (requests > apiKey.rateLimit) {
    throw new TooManyRequestsError("超出速率限制");
  }

  // 更新最后使用时间(异步,不等待)
  db.apiKeys.update({
    where: { id: apiKey.id },
    data: { lastUsedAt: new Date() },
  });

  return apiKey;
}

密码哈希

import argon2 from "argon2";
import bcrypt from "bcrypt";

// Argon2(新实现推荐)
async function hashPasswordArgon2(password: string): Promise<string> {
  return argon2.hash(password, {
    type: argon2.argon2id, // 混合模式
    memoryCost: 65536, // 64 MB
    timeCost: 3, // 3次迭代
    parallelism: 4, // 4线程
  });
}

async function verifyPasswordArgon2(
  hash: string,
  password: string,
): Promise<boolean> {
  return argon2.verify(hash, password);
}

// bcrypt(广泛支持)
const BCRYPT_ROUNDS = 12;

async function hashPasswordBcrypt(password: string): Promise<string> {
  return bcrypt.hash(password, BCRYPT_ROUNDS);
}

async function verifyPasswordBcrypt(
  hash: string,
  password: string,
): Promise<boolean> {
  return bcrypt.compare(password, hash);
}

// 密码强度验证
function validatePasswordStrength(password: string): {
  valid: boolean;
  errors: string[];
} {
  const errors: string[] = [];

  if (password.length < 12)
    errors.push("密码必须至少12个字符");
  if (!/[A-Z]/.test(password))
    errors.push("密码必须包含大写字母");
  if (!/[a-z]/.test(password))
    errors.push("密码必须包含小写字母");
  if (!/[0-9]/.test(password)) errors.push("密码必须包含数字");
  if (!/[^A-Za-z0-9]/.test(password))
    errors.push("密码必须包含特殊字符");

  return { valid: errors.length === 0, errors };
}

MFA 实现

import { authenticator } from "otplib";
import QRCode from "qrcode";

// TOTP 设置
async function setupTOTP(
  userId: string,
  email: string,
): Promise<{ secret: string; qrCode: string }> {
  const secret = authenticator.generateSecret();
  const otpauth = authenticator.keyuri(email, "MyApp", secret);
  const qrCode = await QRCode.toDataURL(otpauth);

  // 在验证前临时存储加密密钥
  await redis.setex(`mfa-setup:${userId}`, 600, encrypt(secret));

  return { secret, qrCode };
}

// 验证 TOTP
function verifyTOTP(secret: string, token: string): boolean {
  return authenticator.verify({ token, secret });
}

// 备份代码生成
function generateBackupCodes(): { codes: string[]; hashes: string[] } {
  const codes = Array.from({ length: 10 }, () =>
    crypto.randomBytes(4).toString("hex").toUpperCase(),
  );
  const hashes = codes.map((code) =>
    crypto.createHash("sha256").update(code).digest("hex"),
  );
  return { codes, hashes };
}

// 完成 MFA 验证流程
async function verifyMFA(userId: string, code: string): Promise<boolean> {
  const user = await db.users.findUnique({
    where: { id: userId },
    include: { mfaSettings: true },
  });

  if (!user?.mfaSettings?.enabled) return true;

  // 首先尝试 TOTP
  if (verifyTOTP(decrypt(user.mfaSettings.totpSecret), code)) {
    return true;
  }

  // 尝试备份代码
  const codeHash = crypto.createHash("sha256").update(code).digest("hex");
  const backupCode = user.mfaSettings.backupCodes.find(
    (bc) => bc.hash === codeHash && !bc.usedAt,
  );

  if (backupCode) {
    await db.backupCodes.update({
      where: { id: backupCode.id },
      data: { usedAt: new Date() },
    });
    return true;
  }

  return false;
}

安全考虑

关键安全检查清单(供高级软件工程师审核):

  1. 凭证存储

    • 切勿记录密码、令牌或API密钥
    • 使用Argon2id或bcrypt(12+轮次)哈希密码
    • 在数据库存储前哈希API密钥
    • 静态加密刷新令牌和MFA密钥
    • 切勿在错误消息中返回敏感数据
  2. 令牌安全

    • 验证所有令牌声明(签名、过期时间、签发者、受众、生效时间)
    • 使用RS256或ES256进行JWT签名(分布式系统中切勿使用HS256)
    • 设置最小令牌过期时间(访问令牌:15分钟,刷新令牌:最多7天)
    • 为注销实现令牌撤销列表
    • 所有公共客户端(SPA、移动应用)使用PKCE
  3. 攻击预防

    • 在认证端点上实施速率限制(15分钟内5次尝试)
    • 登录失败后账户锁定(10次失败 = 30分钟锁定)
    • 使用时间安全比较进行密码/令牌验证
    • 防止用户枚举(无效用户/密码使用相同错误)
    • 验证重定向URI是否在白名单内(防止开放重定向)
  4. 会话安全

    • 在cookie上设置httpOnly、secure、sameSite=strict
    • 权限提升后重新生成会话ID
    • 实现绝对超时(24小时)和空闲超时(30分钟)
    • 更改密码时清除所有会话
    • 检测并警示不同IP的并发会话
  5. 传输安全

    • 要求所有认证端点使用HTTPS(HSTS头)
    • 使用安全WebSocket(wss://)进行实时认证
    • 验证Content-Type头(防止CSRF)
    • 严格设置CORS策略
  6. 合规与隐私

    • 记录认证事件(登录、注销、失败)以供审计
    • 实施个人身份信息数据保留策略
    • 支持账户删除(GDPR被遗忘权)
    • 提供数据导出(GDPR数据可携权)
    • 如果适用,考虑SOC2、HIPAA、PCI-DSS要求

要避免的常见漏洞

  • 时序攻击(使用crypto.timingSafeEqual)
  • JWT算法混淆(始终指定允许的算法)
  • 会话固定(登录时重新生成ID)
  • 不安全的直接对象引用(验证资源所有权)
  • 大规模赋值(验证所有输入字段)
  • 访问控制破坏(默认拒绝,明确允许)

最佳实践

  1. 令牌安全

    • 使用短生命周期访问令牌(15分钟或更短)
    • 安全存储刷新令牌(httpOnly cookie、加密存储)
    • 实现刷新令牌的令牌轮换
    • 始终验证令牌签名、过期时间、签发者和受众
  2. 密码安全

    • 新实现使用Argon2id
    • 切勿存储明文密码
    • 失败尝试后实现账户锁定
    • 使用带有时限令牌的安全密码重置流程
  3. 会话安全

    • 认证后重新生成会话ID
    • 实现绝对和空闲超时
    • 适将会话绑定到用户代理/IP
    • 为用户提供会话管理界面
  4. API 密钥安全

    • 存储前哈希API密钥
    • 使用前缀进行密钥标识
    • 实现作用域和速率限制
    • 允许无需停机的密钥轮换
  5. MFA 最佳实践

    • 提供多种MFA方法(TOTP、WebAuthn、短信备份)
    • 设置过程中提供备份代码
    • 允许信任设备记住
    • 敏感操作需要重新验证MFA

示例

包含 MFA 的完整登录流程

async function login(
  email: string,
  password: string,
  mfaCode?: string,
): Promise<AuthResponse> {
  // 查找用户
  const user = await db.users.findUnique({ where: { email } });
  if (!user) throw new UnauthorizedError("无效凭据");

  // 检查账户锁定
  if (user.lockedUntil && user.lockedUntil > new Date()) {
    throw new UnauthorizedError("账户暂时锁定");
  }

  // 验证密码
  const validPassword = await verifyPasswordArgon2(user.passwordHash, password);
  if (!validPassword) {
    await incrementFailedAttempts(user.id);
    throw new UnauthorizedError("无效凭据");
  }

  // 检查 MFA
  if (user.mfaEnabled) {
    if (!mfaCode) {
      return { requiresMfa: true, mfaToken: generateMfaToken(user.id) };
    }
    const validMfa = await verifyMFA(user.id, mfaCode);
    if (!validMfa) throw new UnauthorizedError("无效 MFA 代码");
  }

  // 重置失败尝试
  await db.users.update({
    where: { id: user.id },
    data: { failedAttempts: 0, lockedUntil: null },
  });

  // 生成令牌
  const accessToken = signToken({ sub: user.id, roles: user.roles });
  const refreshToken = await createRefreshToken(user.id);

  return { accessToken, refreshToken };
}