name: auth description: | 身份验证与授权模式,包括OAuth2、JWT、RBAC/ABAC、会话管理、API密钥、密码哈希和MFA。
使用时机:当实现登录流程、访问控制、身份管理、令牌、权限、会话处理、API密钥认证或MFA时使用。 不要用于:安全漏洞扫描(使用 /security-scan)、安全审计(使用 /security-audit)、威胁建模(使用 /threat-model)。
触发器:login、logout、signin、signup、authentication、authorization、password、credential、token、JWT、OAuth、OAuth2、OIDC、SSO、SAML、session、cookie、RBAC、ABAC、permissions、roles、MFA、2FA、TOTP、API key、PKCE。 triggers:
- login
- logout
- signin
- signup
- register
- authentication
- authorization
- password
- credential
- token
- JWT
- OAuth
- OAuth2
- OIDC
- OpenID
- SSO
- SAML
- session
- cookie
- refresh token
- access token
- bearer
- authorization header
- auth header
- 401
- 403
- forbidden
- unauthorized
- RBAC
- ABAC
- permissions
- roles
- access control
- identity
- MFA
- 2FA
- two-factor
- multi-factor
- TOTP
- API key
- auth flow
- PKCE
- client credentials
身份验证与授权
概述
此技能涵盖现代应用的综合身份验证与授权策略。它包括身份验证(验证身份)、授权(访问控制)以及跨Web、移动和API环境的凭证安全管理。
使用时机
软件工程师 (Sonnet) - 在以下情况下使用:
- 实现标准认证流程(登录、注销、密码重置)
- 为现有应用添加JWT或会话处理
- 实现RBAC/ABAC模式并定义需求
- 集成到现有的OAuth2提供商
- 添加MFA或API密钥认证
高级软件工程师 (Opus) - 在以下情况下升级使用:
- 从头设计认证架构
- 选择认证策略(JWT vs 会话、OAuth流程)
- 评估不同访问控制模型之间的权衡
- 规划令牌轮换、刷新策略或会话生命周期
- 做出跨领域的安全决策
高级软件工程师 (Opus) - 在以下情况下请求审核:
- 实现密码哈希或凭证存储
- 处理敏感令牌(刷新令牌、API密钥)
- 实现速率限制或暴力破解保护
- 添加MFA或阶梯式认证
- 处理个人身份信息、合规性或监管要求
- 在生产之前进行任何认证/授权实现
- 设置身份提供商(如Keycloak、Auth0、Cognito)
- 配置SSO、SAML或OIDC集成
- 扩展会话存储(Redis集群、分布式会话)
- 管理密钥、密钥轮换基础设施
- 设置用于JWT签名的证书管理
关键概念
OAuth2 流程
授权码流程 - 最适合服务器端应用:
// 1. 重定向用户到授权服务器
const authUrl = new URL("https://auth.example.com/authorize");
authUrl.searchParams.set("response_type", "code");
authUrl.searchParams.set("client_id", CLIENT_ID);
authUrl.searchParams.set("redirect_uri", REDIRECT_URI);
authUrl.searchParams.set("scope", "openid profile email");
authUrl.searchParams.set("state", generateSecureState());
// 2. 交换代码以获取令牌(服务器端)
async function exchangeCode(code: string): Promise<TokenResponse> {
const response = await fetch("https://auth.example.com/token", {
method: "POST",
headers: { "Content-Type": "application/x-www-form-urlencoded" },
body: new URLSearchParams({
grant_type: "authorization_code",
code,
client_id: CLIENT_ID,
client_secret: CLIENT_SECRET,
redirect_uri: REDIRECT_URI,
}),
});
return response.json();
}
PKCE 流程 - 公共客户端(SPA、移动应用)必需:
// 生成代码验证器和挑战
function generatePKCE(): { verifier: string; challenge: string } {
const verifier = base64UrlEncode(crypto.getRandomValues(new Uint8Array(32)));
const challenge = base64UrlEncode(
await crypto.subtle.digest("SHA-256", new TextEncoder().encode(verifier)),
);
return { verifier, challenge };
}
// 包含在授权请求中
authUrl.searchParams.set("code_challenge", challenge);
authUrl.searchParams.set("code_challenge_method", "S256");
// 在令牌交换中包含验证器
body.set("code_verifier", verifier);
客户端凭证流程 - 用于服务到服务通信:
async function getServiceToken(): Promise<string> {
const response = await fetch("https://auth.example.com/token", {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
Authorization: `Basic ${btoa(`${CLIENT_ID}:${CLIENT_SECRET}`)}`,
},
body: new URLSearchParams({
grant_type: "client_credentials",
scope: "api:read api:write",
}),
});
return (await response.json()).access_token;
}
JWT 处理
令牌结构与签名:
import jwt from "jsonwebtoken";
interface TokenPayload {
sub: string; // 主题(用户ID)
iss: string; // 签发者
aud: string; // 受众
exp: number; // 过期时间
iat: number; // 签发时间
roles: string[]; // 自定义声明
}
// 使用RS256签名(非对称 - 生产环境推荐)
function signToken(payload: Omit<TokenPayload, "iat" | "exp">): string {
return jwt.sign(
{ ...payload, iat: Math.floor(Date.now() / 1000) },
PRIVATE_KEY,
{
algorithm: "RS256",
expiresIn: "15m",
issuer: "https://api.example.com",
audience: "https://app.example.com",
},
);
}
// 验证令牌
function verifyToken(token: string): TokenPayload {
return jwt.verify(token, PUBLIC_KEY, {
algorithms: ["RS256"],
issuer: "https://api.example.com",
audience: "https://app.example.com",
}) as TokenPayload;
}
刷新令牌模式:
interface TokenPair {
accessToken: string; // 短生命周期(15分钟)
refreshToken: string; // 长生命周期(7天),安全存储
}
async function refreshTokens(refreshToken: string): Promise<TokenPair> {
// 验证刷新令牌存在于数据库中(允许撤销)
const storedToken = await db.refreshTokens.findUnique({
where: { token: hashToken(refreshToken) },
});
if (!storedToken || storedToken.expiresAt < new Date()) {
throw new UnauthorizedError("无效刷新令牌");
}
// 轮换刷新令牌(一次性使用)
await db.refreshTokens.delete({ where: { id: storedToken.id } });
const newRefreshToken = generateSecureToken();
await db.refreshTokens.create({
data: {
token: hashToken(newRefreshToken),
userId: storedToken.userId,
expiresAt: addDays(new Date(), 7),
},
});
return {
accessToken: signToken({ sub: storedToken.userId, roles: [] }),
refreshToken: newRefreshToken,
};
}
RBAC(基于角色的访问控制)
// 定义角色和权限
const ROLES = {
admin: [
"users:read",
"users:write",
"users:delete",
"reports:read",
"settings:write",
],
manager: ["users:read", "users:write", "reports:read"],
user: ["users:read:own", "reports:read:own"],
} as const;
type Role = keyof typeof ROLES;
type Permission = (typeof ROLES)[Role][number];
// 权限检查中间件
function requirePermission(permission: Permission) {
return (req: Request, res: Response, next: NextFunction) => {
const userRoles: Role[] = req.user.roles;
const userPermissions = userRoles.flatMap((role) => ROLES[role]);
// 处理:own后缀以实现资源级权限
const [resource, action, scope] = permission.split(":");
const basePermission = `${resource}:${action}`;
const hasPermission =
userPermissions.includes(permission) ||
(scope === "own" && userPermissions.includes(basePermission));
if (!hasPermission) {
return res.status(403).json({ error: "禁止访问" });
}
next();
};
}
// 使用
app.delete("/users/:id", requirePermission("users:delete"), deleteUser);
ABAC(基于属性的访问控制)
interface PolicyContext {
subject: { id: string; roles: string[]; department: string };
resource: { type: string; owner: string; classification: string };
action: string;
environment: { time: Date; ip: string };
}
type Policy = (ctx: PolicyContext) => boolean;
const policies: Policy[] = [
// 用户可以访问自己的资源
(ctx) => ctx.resource.owner === ctx.subject.id,
// 经理可以访问其部门的资源
(ctx) =>
ctx.subject.roles.includes("manager") &&
ctx.resource.department === ctx.subject.department,
// 在非工作时间不允许访问机密资源
(ctx) => {
if (ctx.resource.classification === "confidential") {
const hour = ctx.environment.time.getHours();
return hour >= 9 && hour < 17;
}
return true;
},
];
function checkAccess(ctx: PolicyContext): boolean {
return policies.every((policy) => policy(ctx));
}
会话管理
import { Redis } from "ioredis";
const redis = new Redis();
const SESSION_TTL = 24 * 60 * 60; // 24小时
interface Session {
userId: string;
createdAt: number;
lastActivity: number;
userAgent: string;
ip: string;
}
async function createSession(userId: string, req: Request): Promise<string> {
const sessionId = crypto.randomUUID();
const session: Session = {
userId,
createdAt: Date.now(),
lastActivity: Date.now(),
userAgent: req.headers["user-agent"] || "",
ip: req.ip,
};
await redis.setex(
`session:${sessionId}`,
SESSION_TTL,
JSON.stringify(session),
);
await redis.sadd(`user-sessions:${userId}`, sessionId);
return sessionId;
}
async function validateSession(sessionId: string): Promise<Session | null> {
const data = await redis.get(`session:${sessionId}`);
if (!data) return null;
const session: Session = JSON.parse(data);
// 更新最后活动时间
session.lastActivity = Date.now();
await redis.setex(
`session:${sessionId}`,
SESSION_TTL,
JSON.stringify(session),
);
return session;
}
async function invalidateAllUserSessions(userId: string): Promise<void> {
const sessionIds = await redis.smembers(`user-sessions:${userId}`);
if (sessionIds.length > 0) {
await redis.del(...sessionIds.map((id) => `session:${id}`));
await redis.del(`user-sessions:${userId}`);
}
}
API 密钥策略
interface ApiKey {
id: string;
prefix: string; // 前8个字符(用于标识)
hash: string; // 哈希密钥
name: string;
scopes: string[];
rateLimit: number;
expiresAt: Date | null;
lastUsedAt: Date | null;
}
// 生成带前缀的API密钥以便轻松标识
function generateApiKey(): { key: string; prefix: string; hash: string } {
const key = `sk_live_${crypto.randomBytes(32).toString("base64url")}`;
return {
key,
prefix: key.substring(0, 16),
hash: crypto.createHash("sha256").update(key).digest("hex"),
};
}
// 验证和速率限制
async function validateApiKey(key: string): Promise<ApiKey> {
const hash = crypto.createHash("sha256").update(key).digest("hex");
const apiKey = await db.apiKeys.findUnique({ where: { hash } });
if (!apiKey) throw new UnauthorizedError("无效API密钥");
if (apiKey.expiresAt && apiKey.expiresAt < new Date()) {
throw new UnauthorizedError("API密钥已过期");
}
// 检查速率限制
const rateLimitKey = `ratelimit:${apiKey.id}`;
const requests = await redis.incr(rateLimitKey);
if (requests === 1) await redis.expire(rateLimitKey, 60);
if (requests > apiKey.rateLimit) {
throw new TooManyRequestsError("超出速率限制");
}
// 更新最后使用时间(异步,不等待)
db.apiKeys.update({
where: { id: apiKey.id },
data: { lastUsedAt: new Date() },
});
return apiKey;
}
密码哈希
import argon2 from "argon2";
import bcrypt from "bcrypt";
// Argon2(新实现推荐)
async function hashPasswordArgon2(password: string): Promise<string> {
return argon2.hash(password, {
type: argon2.argon2id, // 混合模式
memoryCost: 65536, // 64 MB
timeCost: 3, // 3次迭代
parallelism: 4, // 4线程
});
}
async function verifyPasswordArgon2(
hash: string,
password: string,
): Promise<boolean> {
return argon2.verify(hash, password);
}
// bcrypt(广泛支持)
const BCRYPT_ROUNDS = 12;
async function hashPasswordBcrypt(password: string): Promise<string> {
return bcrypt.hash(password, BCRYPT_ROUNDS);
}
async function verifyPasswordBcrypt(
hash: string,
password: string,
): Promise<boolean> {
return bcrypt.compare(password, hash);
}
// 密码强度验证
function validatePasswordStrength(password: string): {
valid: boolean;
errors: string[];
} {
const errors: string[] = [];
if (password.length < 12)
errors.push("密码必须至少12个字符");
if (!/[A-Z]/.test(password))
errors.push("密码必须包含大写字母");
if (!/[a-z]/.test(password))
errors.push("密码必须包含小写字母");
if (!/[0-9]/.test(password)) errors.push("密码必须包含数字");
if (!/[^A-Za-z0-9]/.test(password))
errors.push("密码必须包含特殊字符");
return { valid: errors.length === 0, errors };
}
MFA 实现
import { authenticator } from "otplib";
import QRCode from "qrcode";
// TOTP 设置
async function setupTOTP(
userId: string,
email: string,
): Promise<{ secret: string; qrCode: string }> {
const secret = authenticator.generateSecret();
const otpauth = authenticator.keyuri(email, "MyApp", secret);
const qrCode = await QRCode.toDataURL(otpauth);
// 在验证前临时存储加密密钥
await redis.setex(`mfa-setup:${userId}`, 600, encrypt(secret));
return { secret, qrCode };
}
// 验证 TOTP
function verifyTOTP(secret: string, token: string): boolean {
return authenticator.verify({ token, secret });
}
// 备份代码生成
function generateBackupCodes(): { codes: string[]; hashes: string[] } {
const codes = Array.from({ length: 10 }, () =>
crypto.randomBytes(4).toString("hex").toUpperCase(),
);
const hashes = codes.map((code) =>
crypto.createHash("sha256").update(code).digest("hex"),
);
return { codes, hashes };
}
// 完成 MFA 验证流程
async function verifyMFA(userId: string, code: string): Promise<boolean> {
const user = await db.users.findUnique({
where: { id: userId },
include: { mfaSettings: true },
});
if (!user?.mfaSettings?.enabled) return true;
// 首先尝试 TOTP
if (verifyTOTP(decrypt(user.mfaSettings.totpSecret), code)) {
return true;
}
// 尝试备份代码
const codeHash = crypto.createHash("sha256").update(code).digest("hex");
const backupCode = user.mfaSettings.backupCodes.find(
(bc) => bc.hash === codeHash && !bc.usedAt,
);
if (backupCode) {
await db.backupCodes.update({
where: { id: backupCode.id },
data: { usedAt: new Date() },
});
return true;
}
return false;
}
安全考虑
关键安全检查清单(供高级软件工程师审核):
-
凭证存储
- 切勿记录密码、令牌或API密钥
- 使用Argon2id或bcrypt(12+轮次)哈希密码
- 在数据库存储前哈希API密钥
- 静态加密刷新令牌和MFA密钥
- 切勿在错误消息中返回敏感数据
-
令牌安全
- 验证所有令牌声明(签名、过期时间、签发者、受众、生效时间)
- 使用RS256或ES256进行JWT签名(分布式系统中切勿使用HS256)
- 设置最小令牌过期时间(访问令牌:15分钟,刷新令牌:最多7天)
- 为注销实现令牌撤销列表
- 所有公共客户端(SPA、移动应用)使用PKCE
-
攻击预防
- 在认证端点上实施速率限制(15分钟内5次尝试)
- 登录失败后账户锁定(10次失败 = 30分钟锁定)
- 使用时间安全比较进行密码/令牌验证
- 防止用户枚举(无效用户/密码使用相同错误)
- 验证重定向URI是否在白名单内(防止开放重定向)
-
会话安全
- 在cookie上设置httpOnly、secure、sameSite=strict
- 权限提升后重新生成会话ID
- 实现绝对超时(24小时)和空闲超时(30分钟)
- 更改密码时清除所有会话
- 检测并警示不同IP的并发会话
-
传输安全
- 要求所有认证端点使用HTTPS(HSTS头)
- 使用安全WebSocket(wss://)进行实时认证
- 验证Content-Type头(防止CSRF)
- 严格设置CORS策略
-
合规与隐私
- 记录认证事件(登录、注销、失败)以供审计
- 实施个人身份信息数据保留策略
- 支持账户删除(GDPR被遗忘权)
- 提供数据导出(GDPR数据可携权)
- 如果适用,考虑SOC2、HIPAA、PCI-DSS要求
要避免的常见漏洞:
- 时序攻击(使用crypto.timingSafeEqual)
- JWT算法混淆(始终指定允许的算法)
- 会话固定(登录时重新生成ID)
- 不安全的直接对象引用(验证资源所有权)
- 大规模赋值(验证所有输入字段)
- 访问控制破坏(默认拒绝,明确允许)
最佳实践
-
令牌安全
- 使用短生命周期访问令牌(15分钟或更短)
- 安全存储刷新令牌(httpOnly cookie、加密存储)
- 实现刷新令牌的令牌轮换
- 始终验证令牌签名、过期时间、签发者和受众
-
密码安全
- 新实现使用Argon2id
- 切勿存储明文密码
- 失败尝试后实现账户锁定
- 使用带有时限令牌的安全密码重置流程
-
会话安全
- 认证后重新生成会话ID
- 实现绝对和空闲超时
- 适将会话绑定到用户代理/IP
- 为用户提供会话管理界面
-
API 密钥安全
- 存储前哈希API密钥
- 使用前缀进行密钥标识
- 实现作用域和速率限制
- 允许无需停机的密钥轮换
-
MFA 最佳实践
- 提供多种MFA方法(TOTP、WebAuthn、短信备份)
- 设置过程中提供备份代码
- 允许信任设备记住
- 敏感操作需要重新验证MFA
示例
包含 MFA 的完整登录流程
async function login(
email: string,
password: string,
mfaCode?: string,
): Promise<AuthResponse> {
// 查找用户
const user = await db.users.findUnique({ where: { email } });
if (!user) throw new UnauthorizedError("无效凭据");
// 检查账户锁定
if (user.lockedUntil && user.lockedUntil > new Date()) {
throw new UnauthorizedError("账户暂时锁定");
}
// 验证密码
const validPassword = await verifyPasswordArgon2(user.passwordHash, password);
if (!validPassword) {
await incrementFailedAttempts(user.id);
throw new UnauthorizedError("无效凭据");
}
// 检查 MFA
if (user.mfaEnabled) {
if (!mfaCode) {
return { requiresMfa: true, mfaToken: generateMfaToken(user.id) };
}
const validMfa = await verifyMFA(user.id, mfaCode);
if (!validMfa) throw new UnauthorizedError("无效 MFA 代码");
}
// 重置失败尝试
await db.users.update({
where: { id: user.id },
data: { failedAttempts: 0, lockedUntil: null },
});
// 生成令牌
const accessToken = signToken({ sub: user.id, roles: user.roles });
const refreshToken = await createRefreshToken(user.id);
return { accessToken, refreshToken };
}