API安全强化Skill api-hardening

API安全强化是一种技能,专注于实施多种防御措施以保护Web API免受攻击和滥用。它包括速率限制、输入验证、CORS配置、API密钥管理、请求大小限制和超时保护等策略,旨在防止DDoS攻击、SQL注入、XSS攻击和数据泄露。关键词包括API安全、速率限制、输入验证、CORS、API密钥管理、SQL注入防护、XSS防护。

后端开发 0 次安装 0 次浏览 更新于 3/15/2026

name: api-hardening description: API安全强化模式。在实施速率限制、输入验证、CORS配置、API密钥管理、请求节流或保护端点免受滥用时使用。涵盖REST API的深度防御策略,并提供Express、FastAPI和Serverless的实用实现。

API安全强化

保护API免受滥用、注入攻击和数据泄露的深度防御模式。

速率限制

为什么重要

没有速率限制:

  • 暴力攻击会成功
  • API会因意外或故意而遭受DDoS攻击
  • 一个不良行为者会影响所有用户
  • 你会收到云服务商的意外账单

使用express-rate-limit的Express.js

const rateLimit = require('express-rate-limit');
const RedisStore = require('rate-limit-redis').default;
const { createClient } = require('redis');

const redisClient = createClient({ url: process.env.REDIS_URL });
redisClient.connect();

// 通用API速率限制
const apiLimiter = rateLimit({
  store: new RedisStore({ sendCommand: (...args) => redisClient.sendCommand(args) }),
  windowMs: 15 * 60 * 1000, // 15分钟
  max: 100, // 每个窗口100个请求
  standardHeaders: true,
  legacyHeaders: false,
  message: { error: '请求过多,请稍后重试' },
  skip: (req) => {
    // 跳过健康检查的速率限制
    return req.path === '/health';
  }
});

// 认证端点的严格限制
const authLimiter = rateLimit({
  store: new RedisStore({ sendCommand: (...args) => redisClient.sendCommand(args) }),
  windowMs: 15 * 60 * 1000, // 15分钟
  max: 5, // 5次尝试
  message: { error: '登录尝试过多,请在15分钟后重试' },
  keyGenerator: (req) => {
    // 通过IP + 电子邮件进行速率限制,防止分布式攻击
    return `${req.ip}-${req.body?.email || 'unknown'}`;
  }
});

// 密码重置的非常严格限制
const passwordResetLimiter = rateLimit({
  store: new RedisStore({ sendCommand: (...args) => redisClient.sendCommand(args) }),
  windowMs: 60 * 60 * 1000, // 1小时
  max: 3, // 每小时3个请求
  message: { error: '密码重置请求过多' }
});

// 应用限制器
app.use('/api/', apiLimiter);
app.use('/auth/login', authLimiter);
app.use('/auth/forgot-password', passwordResetLimiter);

滑动窗口实现(自定义)

// 基于Redis的滑动窗口速率限制器
class SlidingWindowRateLimiter {
  constructor(redisClient, options = {}) {
    this.redis = redisClient;
    this.windowMs = options.windowMs || 60000; // 默认1分钟
    this.maxRequests = options.maxRequests || 100;
    this.keyPrefix = options.keyPrefix || 'ratelimit';
  }

  async isAllowed(identifier) {
    const now = Date.now();
    const windowStart = now - this.windowMs;
    const key = `${this.keyPrefix}:${identifier}`;

    // 移除旧条目并计数最近的
    const multi = this.redis.multi();
    multi.zRemRangeByScore(key, 0, windowStart);
    multi.zCard(key);
    multi.zAdd(key, { score: now, value: `${now}-${Math.random()}` });
    multi.expire(key, Math.ceil(this.windowMs / 1000));

    const results = await multi.exec();
    const requestCount = results[1];

    return {
      allowed: requestCount < this.maxRequests,
      remaining: Math.max(0, this.maxRequests - requestCount - 1),
      resetAt: now + this.windowMs
    };
  }
}

// Express中间件
function createRateLimitMiddleware(limiter) {
  return async (req, res, next) => {
    const identifier = req.ip;
    const result = await limiter.isAllowed(identifier);

    res.setHeader('X-RateLimit-Limit', limiter.maxRequests);
    res.setHeader('X-RateLimit-Remaining', result.remaining);
    res.setHeader('X-RateLimit-Reset', result.resetAt);

    if (!result.allowed) {
      return res.status(429).json({ error: '超出速率限制' });
    }

    next();
  };
}

使用API密钥的每用户速率限制

// 基于层级的限制
const tierLimits = {
  free: { windowMs: 60000, max: 10 },
  pro: { windowMs: 60000, max: 100 },
  enterprise: { windowMs: 60000, max: 1000 }
};

async function apiKeyRateLimiter(req, res, next) {
  const apiKey = req.headers['x-api-key'];

  if (!apiKey) {
    return res.status(401).json({ error: '需要API密钥' });
  }

  // 查找API密钥
  const keyData = await db.query(
    'SELECT user_id, tier, revoked FROM api_keys WHERE key_hash = $1',
    [hashApiKey(apiKey)]
  );

  if (keyData.rows.length === 0 || keyData.rows[0].revoked) {
    return res.status(401).json({ error: '无效API密钥' });
  }

  const { user_id, tier } = keyData.rows[0];
  const limits = tierLimits[tier] || tierLimits.free;

  // 按用户进行速率限制,防止密钥轮换滥用
  const limiter = new SlidingWindowRateLimiter(redisClient, {
    ...limits,
    keyPrefix: 'apikey'
  });

  const result = await limiter.isAllowed(user_id);

  res.setHeader('X-RateLimit-Limit', limits.max);
  res.setHeader('X-RateLimit-Remaining', result.remaining);
  res.setHeader('X-RateLimit-Reset', result.resetAt);

  if (!result.allowed) {
    return res.status(429).json({ error: '超出速率限制' });
  }

  req.userId = user_id;
  next();
}

输入验证

使用Zod进行验证(TypeScript/JavaScript)

const { z } = require('zod');

// 定义模式
const createUserSchema = z.object({
  email: z.string().email().max(255),
  password: z.string().min(12).max(128),
  name: z.string().min(1).max(100).optional()
});

const updateProfileSchema = z.object({
  name: z.string().min(1).max(100).optional(),
  bio: z.string().max(500).optional(),
  website: z.string().url().optional().or(z.literal(''))
});

const paginationSchema = z.object({
  page: z.coerce.number().int().min(1).default(1),
  limit: z.coerce.number().int().min(1).max(100).default(20)
});

// 中间件工厂
function validate(schema) {
  return (req, res, next) => {
    const result = schema.safeParse(req.body);

    if (!result.success) {
      return res.status(400).json({
        error: '验证失败',
        details: result.error.issues.map(issue => ({
          field: issue.path.join('.'),
          message: issue.message
        }))
      });
    }

    req.validated = result.data;
    next();
  };
}

// 用法
app.post('/users', validate(createUserSchema), async (req, res) => {
  const { email, password, name } = req.validated;
  // 数据已验证和类型化
});

净化

const createDOMPurify = require('dompurify');
const { JSDOM } = require('jsdom');
const validator = require('validator');

const window = new JSDOM('').window;
const DOMPurify = createDOMPurify(window);

// HTML净化(当必须允许一些HTML时)
function sanitizeHtml(dirty) {
  return DOMPurify.sanitize(dirty, {
    ALLOWED_TAGS: ['b', 'i', 'em', 'strong', 'a', 'p', 'br'],
    ALLOWED_ATTR: ['href'],
    ALLOW_DATA_ATTR: false
  });
}

// 字符串净化
function sanitizeString(str) {
  if (typeof str !== 'string') return '';

  return str
    .trim()
    .slice(0, 10000) // 最大长度
    .replace(/[\x00-\x1F\x7F]/g, ''); // 移除控制字符
}

// SQL安全标识符(用于动态列名)
function sanitizeIdentifier(str) {
  // 仅允许字母数字和下划线
  if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(str)) {
    throw new Error('无效标识符');
  }
  return str;
}

// 文件名净化
function sanitizeFilename(filename) {
  return filename
    .replace(/[^a-zA-Z0-9._-]/g, '_')
    .replace(/\.{2,}/g, '.')
    .slice(0, 255);
}

防止SQL注入

// 错误:字符串插值
const query = `SELECT * FROM users WHERE id = ${userId}`;

// 错误:字符串连接
const query = 'SELECT * FROM users WHERE id = ' + userId;

// 错误:使用用户输入的模板文字
const query = `SELECT * FROM users WHERE name = '${name}'`;

// 正确:参数化查询(PostgreSQL)
const result = await db.query(
  'SELECT * FROM users WHERE id = $1',
  [userId]
);

// 正确:参数化查询(MySQL)
const result = await db.query(
  'SELECT * FROM users WHERE id = ?',
  [userId]
);

// 正确:查询构建器(Knex)
const users = await knex('users')
  .where('id', userId)
  .first();

// 正确:ORM(Prisma)
const user = await prisma.user.findUnique({
  where: { id: userId }
});

// 当需要动态列名时(罕见)
const allowedColumns = ['name', 'email', 'created_at'];
const sortColumn = allowedColumns.includes(req.query.sort)
  ? req.query.sort
  : 'created_at';

const query = `SELECT * FROM users ORDER BY ${sortColumn}`; // 安全,因为白名单

防止XSS

// 错误:直接插入用户内容
res.send(`<h1>Hello ${userName}</h1>`);

// 正确:使用具有自动转义的模板引擎
// EJS(默认使用<%= %>自动转义)
res.render('greeting', { name: userName });

// 正确:需要时手动转义
const escapeHtml = require('escape-html');
res.send(`<h1>Hello ${escapeHtml(userName)}</h1>`);

// 正确:为JSON响应设置Content-Type
res.json({ name: userName }); // Express设置正确头

// 在React/Vue/Angular中:框架默认处理转义
// 只需避免使用dangerouslySetInnerHTML / v-html / [innerHTML]

CORS配置

Express.js

const cors = require('cors');

// 开发:允许本地主机
const developmentOrigins = [
  'http://localhost:3000',
  'http://localhost:5173',
  'http://127.0.0.1:3000'
];

// 生产:仅限特定域名
const productionOrigins = [
  'https://yourapp.com',
  'https://www.yourapp.com',
  'https://app.yourapp.com'
];

const allowedOrigins = process.env.NODE_ENV === 'production'
  ? productionOrigins
  : [...productionOrigins, ...developmentOrigins];

const corsOptions = {
  origin: (origin, callback) => {
    // 允许无来源的请求(移动应用、curl等)
    if (!origin) {
      return callback(null, true);
    }

    if (allowedOrigins.includes(origin)) {
      callback(null, true);
    } else {
      callback(new Error('CORS不允许'));
    }
  },
  methods: ['GET', 'POST', 'PUT', 'DELETE', 'PATCH'],
  allowedHeaders: ['Content-Type', 'Authorization', 'X-API-Key'],
  exposedHeaders: ['X-RateLimit-Limit', 'X-RateLimit-Remaining'],
  credentials: true, // 允许cookie
  maxAge: 86400 // 预检缓存24小时
};

app.use(cors(corsOptions));

// 处理CORS错误
app.use((err, req, res, next) => {
  if (err.message === 'CORS不允许') {
    return res.status(403).json({ error: 'CORS不允许' });
  }
  next(err);
});

常见CORS错误

// 错误:允许所有来源
app.use(cors()); // 默认为'*'

// 错误:允许所有来源并带凭证
app.use(cors({ origin: '*', credentials: true })); // 浏览器会拒绝

// 错误:反射Origin头(允许任何来源)
app.use(cors({
  origin: (origin, cb) => cb(null, origin) // 切勿这样做
}));

// 错误:过于宽松的正则表达式
const origin = /yourapp\.com/; // 也匹配evilyourapp.com!

// 正确:精确匹配或严格正则
const origin = /^https:\/\/(www\.)?yourapp\.com$/;

API密钥管理

安全密钥生成和存储

const crypto = require('crypto');

// 生成API密钥
function generateApiKey() {
  // 格式:前缀_随机字节
  // 前缀帮助识别密钥类型并使其可识别
  const prefix = 'sk_live';
  const randomPart = crypto.randomBytes(24).toString('base64url');
  return `${prefix}_${randomPart}`;
}

// 存储哈希(切勿存储明文密钥)
function hashApiKey(key) {
  return crypto.createHash('sha256').update(key).digest('hex');
}

// 创建新API密钥
app.post('/api-keys', requireAuth, async (req, res) => {
  const { name } = req.body;

  // 生成密钥
  const plainKey = generateApiKey();
  const keyHash = hashApiKey(plainKey);

  // 仅存储哈希
  await db.query(
    `INSERT INTO api_keys (user_id, key_hash, name, created_at)
     VALUES ($1, $2, $3, NOW())`,
    [req.userId, keyHash, name]
  );

  // 仅返回一次明文密钥 - 用户必须保存
  res.json({
    key: plainKey,
    message: '立即保存此密钥,它将不再显示。'
  });
});

// 验证API密钥
async function verifyApiKey(key) {
  const keyHash = hashApiKey(key);

  const result = await db.query(
    `SELECT id, user_id, revoked, last_used_at
     FROM api_keys WHERE key_hash = $1`,
    [keyHash]
  );

  if (result.rows.length === 0) {
    return null;
  }

  const keyData = result.rows[0];

  if (keyData.revoked) {
    return null;
  }

  // 更新最后使用时间戳
  await db.query(
    'UPDATE api_keys SET last_used_at = NOW() WHERE id = $1',
    [keyData.id]
  );

  return keyData;
}

// 吊销API密钥
app.delete('/api-keys/:id', requireAuth, async (req, res) => {
  // 用户只能吊销自己的密钥
  await db.query(
    'UPDATE api_keys SET revoked = true, revoked_at = NOW() WHERE id = $1 AND user_id = $2',
    [req.params.id, req.userId]
  );

  res.json({ success: true });
});

API密钥中间件

async function apiKeyAuth(req, res, next) {
  // 检查多个位置的API密钥
  const apiKey = req.headers['x-api-key']
    || req.headers['authorization']?.replace('Bearer ', '')
    || req.query.api_key;

  if (!apiKey) {
    return res.status(401).json({
      error: '需要API密钥',
      hint: '在X-API-Key头中传递API密钥'
    });
  }

  const keyData = await verifyApiKey(apiKey);

  if (!keyData) {
    // 不透露密钥是否存在但已吊销
    return res.status(401).json({ error: '无效API密钥' });
  }

  req.apiKeyId = keyData.id;
  req.userId = keyData.user_id;

  next();
}

请求大小限制

const express = require('express');

// 全局主体大小限制
app.use(express.json({ limit: '100kb' }));
app.use(express.urlencoded({ limit: '100kb', extended: true }));

// 每路由限制
app.post('/api/upload', express.json({ limit: '10mb' }), (req, res) => {
  // 处理大上传
});

// 文件上传限制
const multer = require('multer');
const upload = multer({
  limits: {
    fileSize: 5 * 1024 * 1024, // 5MB
    files: 5 // 最多5个文件
  },
  fileFilter: (req, file, cb) => {
    const allowedTypes = ['image/jpeg', 'image/png', 'image/gif', 'application/pdf'];
    if (allowedTypes.includes(file.mimetype)) {
      cb(null, true);
    } else {
      cb(new Error('无效文件类型'));
    }
  }
});

app.post('/upload', upload.single('file'), (req, res) => {
  // 处理上传
});

响应安全

不泄漏信息

// 错误:泄漏堆栈跟踪
app.use((err, req, res, next) => {
  res.status(500).json({
    error: err.message,
    stack: err.stack // 切勿在生产中使用!
  });
});

// 正确:生产中的通用错误
app.use((err, req, res, next) => {
  console.error(err); // 在服务器端记录完整错误

  if (process.env.NODE_ENV === 'production') {
    res.status(500).json({ error: '内部服务器错误' });
  } else {
    res.status(500).json({ error: err.message, stack: err.stack });
  }
});

// 错误:暴露数据库结构
res.status(400).json({
  error: '重复键值违反唯一约束“users_email_key”'
});

// 正确:用户友好错误
res.status(400).json({
  error: '已有此电子邮件的账户'
});

安全头

const helmet = require('helmet');

app.use(helmet());

// 或单独配置
app.use(helmet.contentSecurityPolicy({
  directives: {
    defaultSrc: ["'self'"],
    scriptSrc: ["'self'"],
    styleSrc: ["'self'", "'unsafe-inline'"],
    imgSrc: ["'self'", "data:", "https:"],
    connectSrc: ["'self'", "https://api.yourapp.com"],
    fontSrc: ["'self'", "https://fonts.gstatic.com"],
    objectSrc: ["'none'"],
    upgradeInsecureRequests: []
  }
}));

app.use(helmet.hsts({
  maxAge: 31536000,
  includeSubDomains: true,
  preload: true
}));

超时保护

// 请求超时中间件
function timeout(ms) {
  return (req, res, next) => {
    res.setTimeout(ms, () => {
      res.status(408).json({ error: '请求超时' });
    });
    next();
  };
}

app.use(timeout(30000)); // 默认30秒

// 外部API调用超时
async function fetchWithTimeout(url, options = {}, timeoutMs = 5000) {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

  try {
    const response = await fetch(url, {
      ...options,
      signal: controller.signal
    });
    return response;
  } finally {
    clearTimeout(timeoutId);
  }
}

// 数据库查询超时
const result = await db.query({
  text: 'SELECT * FROM large_table WHERE condition = $1',
  values: [value],
  timeout: 5000 // 5秒查询超时
});

FastAPI(Python)等效

from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from slowapi import Limiter
from slowapi.util import get_remote_address
from pydantic import BaseModel, EmailStr, Field
import hashlib
import secrets

app = FastAPI()

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourapp.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

# 速率限制
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.post("/api/login")
@limiter.limit("5/minute")
async def login(request: Request, credentials: LoginRequest):
    # 处理登录
    pass

# 使用Pydantic进行输入验证
class CreateUserRequest(BaseModel):
    email: EmailStr
    password: str = Field(min_length=12, max_length=128)
    name: str = Field(max_length=100, default=None)

@app.post("/users")
async def create_user(user: CreateUserRequest):
    # 数据已验证
    pass

# API密钥生成
def generate_api_key() -> str:
    return f"sk_live_{secrets.token_urlsafe(24)}"

def hash_api_key(key: str) -> str:
    return hashlib.sha256(key.encode()).hexdigest()

API安全检查清单

  • [ ] 所有端点的速率限制
  • [ ] 认证端点的更严格限制
  • [ ] 使用模式库进行输入验证
  • [ ] 参数化数据库查询
  • [ ] 配置特定来源的CORS
  • [ ] API密钥存储前哈希
  • [ ] 配置请求大小限制
  • [ ] 所有外部调用的超时
  • [ ] 通过Helmet或等效的安全头
  • [ ] 错误消息不泄漏系统信息
  • [ ] 所有认证仅通过HTTPS
  • [ ] API版本化用于重大更改