name: api-hardening description: API安全强化模式。在实施速率限制、输入验证、CORS配置、API密钥管理、请求节流或保护端点免受滥用时使用。涵盖REST API的深度防御策略,并提供Express、FastAPI和Serverless的实用实现。
API安全强化
保护API免受滥用、注入攻击和数据泄露的深度防御模式。
速率限制
为什么重要
没有速率限制:
- 暴力攻击会成功
- API会因意外或故意而遭受DDoS攻击
- 一个不良行为者会影响所有用户
- 你会收到云服务商的意外账单
使用express-rate-limit的Express.js
const rateLimit = require('express-rate-limit');
const RedisStore = require('rate-limit-redis').default;
const { createClient } = require('redis');
const redisClient = createClient({ url: process.env.REDIS_URL });
redisClient.connect();
// 通用API速率限制
const apiLimiter = rateLimit({
store: new RedisStore({ sendCommand: (...args) => redisClient.sendCommand(args) }),
windowMs: 15 * 60 * 1000, // 15分钟
max: 100, // 每个窗口100个请求
standardHeaders: true,
legacyHeaders: false,
message: { error: '请求过多,请稍后重试' },
skip: (req) => {
// 跳过健康检查的速率限制
return req.path === '/health';
}
});
// 认证端点的严格限制
const authLimiter = rateLimit({
store: new RedisStore({ sendCommand: (...args) => redisClient.sendCommand(args) }),
windowMs: 15 * 60 * 1000, // 15分钟
max: 5, // 5次尝试
message: { error: '登录尝试过多,请在15分钟后重试' },
keyGenerator: (req) => {
// 通过IP + 电子邮件进行速率限制,防止分布式攻击
return `${req.ip}-${req.body?.email || 'unknown'}`;
}
});
// 密码重置的非常严格限制
const passwordResetLimiter = rateLimit({
store: new RedisStore({ sendCommand: (...args) => redisClient.sendCommand(args) }),
windowMs: 60 * 60 * 1000, // 1小时
max: 3, // 每小时3个请求
message: { error: '密码重置请求过多' }
});
// 应用限制器
app.use('/api/', apiLimiter);
app.use('/auth/login', authLimiter);
app.use('/auth/forgot-password', passwordResetLimiter);
滑动窗口实现(自定义)
// 基于Redis的滑动窗口速率限制器
class SlidingWindowRateLimiter {
constructor(redisClient, options = {}) {
this.redis = redisClient;
this.windowMs = options.windowMs || 60000; // 默认1分钟
this.maxRequests = options.maxRequests || 100;
this.keyPrefix = options.keyPrefix || 'ratelimit';
}
async isAllowed(identifier) {
const now = Date.now();
const windowStart = now - this.windowMs;
const key = `${this.keyPrefix}:${identifier}`;
// 移除旧条目并计数最近的
const multi = this.redis.multi();
multi.zRemRangeByScore(key, 0, windowStart);
multi.zCard(key);
multi.zAdd(key, { score: now, value: `${now}-${Math.random()}` });
multi.expire(key, Math.ceil(this.windowMs / 1000));
const results = await multi.exec();
const requestCount = results[1];
return {
allowed: requestCount < this.maxRequests,
remaining: Math.max(0, this.maxRequests - requestCount - 1),
resetAt: now + this.windowMs
};
}
}
// Express中间件
function createRateLimitMiddleware(limiter) {
return async (req, res, next) => {
const identifier = req.ip;
const result = await limiter.isAllowed(identifier);
res.setHeader('X-RateLimit-Limit', limiter.maxRequests);
res.setHeader('X-RateLimit-Remaining', result.remaining);
res.setHeader('X-RateLimit-Reset', result.resetAt);
if (!result.allowed) {
return res.status(429).json({ error: '超出速率限制' });
}
next();
};
}
使用API密钥的每用户速率限制
// 基于层级的限制
const tierLimits = {
free: { windowMs: 60000, max: 10 },
pro: { windowMs: 60000, max: 100 },
enterprise: { windowMs: 60000, max: 1000 }
};
async function apiKeyRateLimiter(req, res, next) {
const apiKey = req.headers['x-api-key'];
if (!apiKey) {
return res.status(401).json({ error: '需要API密钥' });
}
// 查找API密钥
const keyData = await db.query(
'SELECT user_id, tier, revoked FROM api_keys WHERE key_hash = $1',
[hashApiKey(apiKey)]
);
if (keyData.rows.length === 0 || keyData.rows[0].revoked) {
return res.status(401).json({ error: '无效API密钥' });
}
const { user_id, tier } = keyData.rows[0];
const limits = tierLimits[tier] || tierLimits.free;
// 按用户进行速率限制,防止密钥轮换滥用
const limiter = new SlidingWindowRateLimiter(redisClient, {
...limits,
keyPrefix: 'apikey'
});
const result = await limiter.isAllowed(user_id);
res.setHeader('X-RateLimit-Limit', limits.max);
res.setHeader('X-RateLimit-Remaining', result.remaining);
res.setHeader('X-RateLimit-Reset', result.resetAt);
if (!result.allowed) {
return res.status(429).json({ error: '超出速率限制' });
}
req.userId = user_id;
next();
}
输入验证
使用Zod进行验证(TypeScript/JavaScript)
const { z } = require('zod');
// 定义模式
const createUserSchema = z.object({
email: z.string().email().max(255),
password: z.string().min(12).max(128),
name: z.string().min(1).max(100).optional()
});
const updateProfileSchema = z.object({
name: z.string().min(1).max(100).optional(),
bio: z.string().max(500).optional(),
website: z.string().url().optional().or(z.literal(''))
});
const paginationSchema = z.object({
page: z.coerce.number().int().min(1).default(1),
limit: z.coerce.number().int().min(1).max(100).default(20)
});
// 中间件工厂
function validate(schema) {
return (req, res, next) => {
const result = schema.safeParse(req.body);
if (!result.success) {
return res.status(400).json({
error: '验证失败',
details: result.error.issues.map(issue => ({
field: issue.path.join('.'),
message: issue.message
}))
});
}
req.validated = result.data;
next();
};
}
// 用法
app.post('/users', validate(createUserSchema), async (req, res) => {
const { email, password, name } = req.validated;
// 数据已验证和类型化
});
净化
const createDOMPurify = require('dompurify');
const { JSDOM } = require('jsdom');
const validator = require('validator');
const window = new JSDOM('').window;
const DOMPurify = createDOMPurify(window);
// HTML净化(当必须允许一些HTML时)
function sanitizeHtml(dirty) {
return DOMPurify.sanitize(dirty, {
ALLOWED_TAGS: ['b', 'i', 'em', 'strong', 'a', 'p', 'br'],
ALLOWED_ATTR: ['href'],
ALLOW_DATA_ATTR: false
});
}
// 字符串净化
function sanitizeString(str) {
if (typeof str !== 'string') return '';
return str
.trim()
.slice(0, 10000) // 最大长度
.replace(/[\x00-\x1F\x7F]/g, ''); // 移除控制字符
}
// SQL安全标识符(用于动态列名)
function sanitizeIdentifier(str) {
// 仅允许字母数字和下划线
if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(str)) {
throw new Error('无效标识符');
}
return str;
}
// 文件名净化
function sanitizeFilename(filename) {
return filename
.replace(/[^a-zA-Z0-9._-]/g, '_')
.replace(/\.{2,}/g, '.')
.slice(0, 255);
}
防止SQL注入
// 错误:字符串插值
const query = `SELECT * FROM users WHERE id = ${userId}`;
// 错误:字符串连接
const query = 'SELECT * FROM users WHERE id = ' + userId;
// 错误:使用用户输入的模板文字
const query = `SELECT * FROM users WHERE name = '${name}'`;
// 正确:参数化查询(PostgreSQL)
const result = await db.query(
'SELECT * FROM users WHERE id = $1',
[userId]
);
// 正确:参数化查询(MySQL)
const result = await db.query(
'SELECT * FROM users WHERE id = ?',
[userId]
);
// 正确:查询构建器(Knex)
const users = await knex('users')
.where('id', userId)
.first();
// 正确:ORM(Prisma)
const user = await prisma.user.findUnique({
where: { id: userId }
});
// 当需要动态列名时(罕见)
const allowedColumns = ['name', 'email', 'created_at'];
const sortColumn = allowedColumns.includes(req.query.sort)
? req.query.sort
: 'created_at';
const query = `SELECT * FROM users ORDER BY ${sortColumn}`; // 安全,因为白名单
防止XSS
// 错误:直接插入用户内容
res.send(`<h1>Hello ${userName}</h1>`);
// 正确:使用具有自动转义的模板引擎
// EJS(默认使用<%= %>自动转义)
res.render('greeting', { name: userName });
// 正确:需要时手动转义
const escapeHtml = require('escape-html');
res.send(`<h1>Hello ${escapeHtml(userName)}</h1>`);
// 正确:为JSON响应设置Content-Type
res.json({ name: userName }); // Express设置正确头
// 在React/Vue/Angular中:框架默认处理转义
// 只需避免使用dangerouslySetInnerHTML / v-html / [innerHTML]
CORS配置
Express.js
const cors = require('cors');
// 开发:允许本地主机
const developmentOrigins = [
'http://localhost:3000',
'http://localhost:5173',
'http://127.0.0.1:3000'
];
// 生产:仅限特定域名
const productionOrigins = [
'https://yourapp.com',
'https://www.yourapp.com',
'https://app.yourapp.com'
];
const allowedOrigins = process.env.NODE_ENV === 'production'
? productionOrigins
: [...productionOrigins, ...developmentOrigins];
const corsOptions = {
origin: (origin, callback) => {
// 允许无来源的请求(移动应用、curl等)
if (!origin) {
return callback(null, true);
}
if (allowedOrigins.includes(origin)) {
callback(null, true);
} else {
callback(new Error('CORS不允许'));
}
},
methods: ['GET', 'POST', 'PUT', 'DELETE', 'PATCH'],
allowedHeaders: ['Content-Type', 'Authorization', 'X-API-Key'],
exposedHeaders: ['X-RateLimit-Limit', 'X-RateLimit-Remaining'],
credentials: true, // 允许cookie
maxAge: 86400 // 预检缓存24小时
};
app.use(cors(corsOptions));
// 处理CORS错误
app.use((err, req, res, next) => {
if (err.message === 'CORS不允许') {
return res.status(403).json({ error: 'CORS不允许' });
}
next(err);
});
常见CORS错误
// 错误:允许所有来源
app.use(cors()); // 默认为'*'
// 错误:允许所有来源并带凭证
app.use(cors({ origin: '*', credentials: true })); // 浏览器会拒绝
// 错误:反射Origin头(允许任何来源)
app.use(cors({
origin: (origin, cb) => cb(null, origin) // 切勿这样做
}));
// 错误:过于宽松的正则表达式
const origin = /yourapp\.com/; // 也匹配evilyourapp.com!
// 正确:精确匹配或严格正则
const origin = /^https:\/\/(www\.)?yourapp\.com$/;
API密钥管理
安全密钥生成和存储
const crypto = require('crypto');
// 生成API密钥
function generateApiKey() {
// 格式:前缀_随机字节
// 前缀帮助识别密钥类型并使其可识别
const prefix = 'sk_live';
const randomPart = crypto.randomBytes(24).toString('base64url');
return `${prefix}_${randomPart}`;
}
// 存储哈希(切勿存储明文密钥)
function hashApiKey(key) {
return crypto.createHash('sha256').update(key).digest('hex');
}
// 创建新API密钥
app.post('/api-keys', requireAuth, async (req, res) => {
const { name } = req.body;
// 生成密钥
const plainKey = generateApiKey();
const keyHash = hashApiKey(plainKey);
// 仅存储哈希
await db.query(
`INSERT INTO api_keys (user_id, key_hash, name, created_at)
VALUES ($1, $2, $3, NOW())`,
[req.userId, keyHash, name]
);
// 仅返回一次明文密钥 - 用户必须保存
res.json({
key: plainKey,
message: '立即保存此密钥,它将不再显示。'
});
});
// 验证API密钥
async function verifyApiKey(key) {
const keyHash = hashApiKey(key);
const result = await db.query(
`SELECT id, user_id, revoked, last_used_at
FROM api_keys WHERE key_hash = $1`,
[keyHash]
);
if (result.rows.length === 0) {
return null;
}
const keyData = result.rows[0];
if (keyData.revoked) {
return null;
}
// 更新最后使用时间戳
await db.query(
'UPDATE api_keys SET last_used_at = NOW() WHERE id = $1',
[keyData.id]
);
return keyData;
}
// 吊销API密钥
app.delete('/api-keys/:id', requireAuth, async (req, res) => {
// 用户只能吊销自己的密钥
await db.query(
'UPDATE api_keys SET revoked = true, revoked_at = NOW() WHERE id = $1 AND user_id = $2',
[req.params.id, req.userId]
);
res.json({ success: true });
});
API密钥中间件
async function apiKeyAuth(req, res, next) {
// 检查多个位置的API密钥
const apiKey = req.headers['x-api-key']
|| req.headers['authorization']?.replace('Bearer ', '')
|| req.query.api_key;
if (!apiKey) {
return res.status(401).json({
error: '需要API密钥',
hint: '在X-API-Key头中传递API密钥'
});
}
const keyData = await verifyApiKey(apiKey);
if (!keyData) {
// 不透露密钥是否存在但已吊销
return res.status(401).json({ error: '无效API密钥' });
}
req.apiKeyId = keyData.id;
req.userId = keyData.user_id;
next();
}
请求大小限制
const express = require('express');
// 全局主体大小限制
app.use(express.json({ limit: '100kb' }));
app.use(express.urlencoded({ limit: '100kb', extended: true }));
// 每路由限制
app.post('/api/upload', express.json({ limit: '10mb' }), (req, res) => {
// 处理大上传
});
// 文件上传限制
const multer = require('multer');
const upload = multer({
limits: {
fileSize: 5 * 1024 * 1024, // 5MB
files: 5 // 最多5个文件
},
fileFilter: (req, file, cb) => {
const allowedTypes = ['image/jpeg', 'image/png', 'image/gif', 'application/pdf'];
if (allowedTypes.includes(file.mimetype)) {
cb(null, true);
} else {
cb(new Error('无效文件类型'));
}
}
});
app.post('/upload', upload.single('file'), (req, res) => {
// 处理上传
});
响应安全
不泄漏信息
// 错误:泄漏堆栈跟踪
app.use((err, req, res, next) => {
res.status(500).json({
error: err.message,
stack: err.stack // 切勿在生产中使用!
});
});
// 正确:生产中的通用错误
app.use((err, req, res, next) => {
console.error(err); // 在服务器端记录完整错误
if (process.env.NODE_ENV === 'production') {
res.status(500).json({ error: '内部服务器错误' });
} else {
res.status(500).json({ error: err.message, stack: err.stack });
}
});
// 错误:暴露数据库结构
res.status(400).json({
error: '重复键值违反唯一约束“users_email_key”'
});
// 正确:用户友好错误
res.status(400).json({
error: '已有此电子邮件的账户'
});
安全头
const helmet = require('helmet');
app.use(helmet());
// 或单独配置
app.use(helmet.contentSecurityPolicy({
directives: {
defaultSrc: ["'self'"],
scriptSrc: ["'self'"],
styleSrc: ["'self'", "'unsafe-inline'"],
imgSrc: ["'self'", "data:", "https:"],
connectSrc: ["'self'", "https://api.yourapp.com"],
fontSrc: ["'self'", "https://fonts.gstatic.com"],
objectSrc: ["'none'"],
upgradeInsecureRequests: []
}
}));
app.use(helmet.hsts({
maxAge: 31536000,
includeSubDomains: true,
preload: true
}));
超时保护
// 请求超时中间件
function timeout(ms) {
return (req, res, next) => {
res.setTimeout(ms, () => {
res.status(408).json({ error: '请求超时' });
});
next();
};
}
app.use(timeout(30000)); // 默认30秒
// 外部API调用超时
async function fetchWithTimeout(url, options = {}, timeoutMs = 5000) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
try {
const response = await fetch(url, {
...options,
signal: controller.signal
});
return response;
} finally {
clearTimeout(timeoutId);
}
}
// 数据库查询超时
const result = await db.query({
text: 'SELECT * FROM large_table WHERE condition = $1',
values: [value],
timeout: 5000 // 5秒查询超时
});
FastAPI(Python)等效
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from slowapi import Limiter
from slowapi.util import get_remote_address
from pydantic import BaseModel, EmailStr, Field
import hashlib
import secrets
app = FastAPI()
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourapp.com"],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
# 速率限制
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@app.post("/api/login")
@limiter.limit("5/minute")
async def login(request: Request, credentials: LoginRequest):
# 处理登录
pass
# 使用Pydantic进行输入验证
class CreateUserRequest(BaseModel):
email: EmailStr
password: str = Field(min_length=12, max_length=128)
name: str = Field(max_length=100, default=None)
@app.post("/users")
async def create_user(user: CreateUserRequest):
# 数据已验证
pass
# API密钥生成
def generate_api_key() -> str:
return f"sk_live_{secrets.token_urlsafe(24)}"
def hash_api_key(key: str) -> str:
return hashlib.sha256(key.encode()).hexdigest()
API安全检查清单
- [ ] 所有端点的速率限制
- [ ] 认证端点的更严格限制
- [ ] 使用模式库进行输入验证
- [ ] 参数化数据库查询
- [ ] 配置特定来源的CORS
- [ ] API密钥存储前哈希
- [ ] 配置请求大小限制
- [ ] 所有外部调用的超时
- [ ] 通过Helmet或等效的安全头
- [ ] 错误消息不泄漏系统信息
- [ ] 所有认证仅通过HTTPS
- [ ] API版本化用于重大更改