名称: 速率限制 描述: API速率限制和配额管理的实现。用于实施请求节流、API配额、背压处理或防止滥用。关键词:速率限制、限流、令牌桶、漏桶、滑动窗口、固定窗口、配额、429、请求过多、DDoS、滥用防护、API配额、突发、Redis速率限制、分布式速率限制、API网关、每用户限制、每IP限制、并发请求、请求限制。
速率限制
概述
速率限制是一种控制客户端向API发出请求速率的技术。它保护服务免受滥用,确保公平使用,并维护系统稳定性。本技能涵盖分布式速率限制的算法、实现模式和最佳实践。
关键概念
速率限制算法
令牌桶算法:
令牌桶允许突发,同时保持平均速率。
/**
 * Token-bucket rate limiter: allows bursts of up to `capacity` requests
 * while sustaining an average rate of `refillRate` tokens per second.
 */
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private capacity: number, // maximum number of tokens (burst size)
    private refillRate: number, // tokens added per second
  ) {
    this.tokens = capacity;
    this.lastRefill = Date.now();
  }

  /** Top up the bucket according to the time elapsed since the last refill. */
  private refill(): void {
    const now = Date.now();
    const secondsElapsed = (now - this.lastRefill) / 1000;
    const refilled = this.tokens + secondsElapsed * this.refillRate;
    this.tokens = refilled > this.capacity ? this.capacity : refilled;
    this.lastRefill = now;
  }

  /**
   * Try to take `tokens` tokens from the bucket.
   * @returns true when the request is allowed, false when it must be rejected.
   */
  consume(tokens: number = 1): boolean {
    this.refill();
    if (this.tokens < tokens) {
      return false;
    }
    this.tokens -= tokens;
    return true;
  }

  /** Snapshot of the current fill level (refreshes the bucket first). */
  getState(): { tokens: number; capacity: number } {
    this.refill();
    return { tokens: this.tokens, capacity: this.capacity };
  }
}

// Usage: 100 requests/minute with a burst of 10
const bucket = new TokenBucket(10, 100 / 60);
滑动窗口日志算法:
通过跟踪单个请求时间戳实现精确速率限制。
/**
 * Sliding-window-log rate limiter: stores one timestamp per accepted
 * request for exact enforcement of `maxRequests` per `windowMs`.
 *
 * Memory grows with the request rate (one entry per accepted request), so
 * prefer the counter variant for very high-traffic keys.
 *
 * Fix: `getResetTime` now prunes expired entries first. It previously used
 * a possibly-stale oldest timestamp and could report a reset time in the
 * past.
 */
class SlidingWindowLog {
  private requests: number[] = [];

  constructor(
    private windowMs: number, // window size in milliseconds
    private maxRequests: number, // maximum requests per window
  ) {}

  /** Drop timestamps that have fallen out of the window ending at `now`. */
  private prune(now: number): void {
    const windowStart = now - this.windowMs;
    this.requests = this.requests.filter((ts) => ts > windowStart);
  }

  /** Record and allow the request if the window still has capacity. */
  isAllowed(): boolean {
    const now = Date.now();
    this.prune(now);
    if (this.requests.length < this.maxRequests) {
      this.requests.push(now);
      return true;
    }
    return false;
  }

  /** Number of additional requests the current window can still accept. */
  getRemainingRequests(): number {
    this.prune(Date.now());
    return Math.max(0, this.maxRequests - this.requests.length);
  }

  /**
   * Unix-ms time at which the oldest in-window request expires, or 0 when
   * the window is empty.
   */
  getResetTime(): number {
    this.prune(Date.now());
    if (this.requests.length === 0) return 0;
    return this.requests[0] + this.windowMs;
  }
}
滑动窗口计数器算法:
使用加权计数器实现内存高效近似。
/**
 * Sliding-window-counter rate limiter: approximates a sliding window by
 * weighting the previous fixed window's count against the current one.
 * Uses O(1) memory, at the cost of slight inaccuracy at window edges.
 */
class SlidingWindowCounter {
  private previousCount: number = 0;
  private currentCount: number = 0;
  private windowStart: number;

  constructor(
    private windowMs: number,
    private maxRequests: number,
  ) {
    this.windowStart = Date.now();
  }

  isAllowed(): boolean {
    const now = Date.now();
    this.rotate(now);
    // Weight the previous window by how much of it still overlaps the
    // sliding window ending at `now`.
    const progress = (now - this.windowStart) / this.windowMs;
    const estimated = this.previousCount * (1 - progress) + this.currentCount;
    if (estimated >= this.maxRequests) {
      return false;
    }
    this.currentCount += 1;
    return true;
  }

  /** Advance to a new fixed window when `now` has moved past the current one. */
  private rotate(now: number): void {
    const elapsed = now - this.windowStart;
    if (elapsed < this.windowMs) return;
    const windowsPassed = Math.floor(elapsed / this.windowMs);
    // Only the immediately-adjacent window inherits the old count; after a
    // longer gap the history is irrelevant.
    this.previousCount = windowsPassed === 1 ? this.currentCount : 0;
    this.currentCount = 0;
    this.windowStart = now - (elapsed % this.windowMs);
  }
}
漏桶算法:
通过以恒定速率处理请求来平滑突发。
/**
 * Leaky-bucket rate limiter: queues requests (up to `capacity`) and
 * drains them at a constant `leakRate` per second, smoothing bursts.
 *
 * Fix: a request that throws no longer wedges the queue. Previously an
 * exception inside `request()` escaped `processQueue`, leaving
 * `processing` stuck at true and the caller's promise pending forever.
 * Errors are now routed back to the `add()` caller as a rejection, and
 * `processing` is reset in a finally block.
 */
class LeakyBucket {
  private queue: Array<() => Promise<void>> = [];
  private processing: boolean = false;

  constructor(
    private capacity: number, // maximum queue length
    private leakRate: number, // requests processed per second
  ) {}

  /**
   * Enqueue a request.
   * @returns resolves `true` once the request has run, resolves `false`
   *          immediately when the queue is full, and rejects with the
   *          request's own error if the request throws.
   */
  async add(request: () => Promise<void>): Promise<boolean> {
    if (this.queue.length >= this.capacity) {
      return false; // queue full — shed load
    }
    return new Promise((resolve, reject) => {
      this.queue.push(async () => {
        try {
          await request();
          resolve(true);
        } catch (err) {
          reject(err);
        }
      });
      void this.processQueue();
    });
  }

  /** Drain the queue at the configured leak rate; re-entrancy guarded. */
  private async processQueue(): Promise<void> {
    if (this.processing || this.queue.length === 0) return;
    this.processing = true;
    try {
      while (this.queue.length > 0) {
        const task = this.queue.shift();
        if (task) {
          // Never throws: errors are captured in the wrapper built by add().
          await task();
          await this.delay(1000 / this.leakRate);
        }
      }
    } finally {
      this.processing = false;
    }
  }

  private delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
API配额管理
分层配额系统:
// A subscription tier and the request limits it grants.
interface QuotaTier {
  name: string;
  limits: {
    requestsPerMinute: number;
    requestsPerDay: number;
    burstSize: number; // short-term burst allowance on top of the average rate
  };
  features: string[]; // feature names unlocked at this tier
}

// Static tier table: free < pro < enterprise, each widening both the
// per-minute/per-day budgets and the feature set.
const quotaTiers: Record<string, QuotaTier> = {
  free: {
    name: "免费",
    limits: {
      requestsPerMinute: 10,
      requestsPerDay: 1000,
      burstSize: 5,
    },
    features: ["基础API"],
  },
  pro: {
    name: "专业版",
    limits: {
      requestsPerMinute: 100,
      requestsPerDay: 50000,
      burstSize: 20,
    },
    features: ["基础API", "高级API", "Webhooks"],
  },
  enterprise: {
    name: "企业版",
    limits: {
      requestsPerMinute: 1000,
      requestsPerDay: 1000000,
      burstSize: 100,
    },
    features: ["基础API", "高级API", "Webhooks", "批量API"],
  },
};
/**
 * Redis-backed quota enforcement for the tier table above.
 *
 * Fixes:
 * - EXPIRE is now armed only when a counter is first created. It was
 *   previously re-applied on every request, so a steadily-trafficked
 *   minute key never expired and the "minute" window silently stretched.
 * - `remaining` now reports the tighter of the minute and day budgets.
 *
 * NOTE(review): the read-check-increment sequence is still not atomic, so
 * concurrent requests can slightly overshoot the limit; a Lua script is
 * needed for strict enforcement.
 */
class QuotaManager {
  constructor(private redis: Redis) {}

  /**
   * Check and consume one request from `userId`'s quota.
   * @throws Error when `tier` is not a known tier name.
   */
  async checkQuota(
    userId: string,
    tier: string,
  ): Promise<{ allowed: boolean; remaining: number; resetAt: number }> {
    const config = quotaTiers[tier];
    if (!config) throw new Error(`未知层级: ${tier}`);

    const minuteKey = `quota:${userId}:minute`;
    const dayKey = `quota:${userId}:day`;
    const [minuteCount, dayCount] = await Promise.all([
      this.redis.get(minuteKey),
      this.redis.get(dayKey),
    ]);
    const currentMinute = parseInt(minuteCount || "0");
    const currentDay = parseInt(dayCount || "0");

    if (currentMinute >= config.limits.requestsPerMinute) {
      const ttl = await this.redis.ttl(minuteKey);
      return { allowed: false, remaining: 0, resetAt: Date.now() + ttl * 1000 };
    }
    if (currentDay >= config.limits.requestsPerDay) {
      const ttl = await this.redis.ttl(dayKey);
      return { allowed: false, remaining: 0, resetAt: Date.now() + ttl * 1000 };
    }

    // Consume one request from both windows in a single round trip.
    const pipeline = this.redis.pipeline();
    pipeline.incr(minuteKey);
    pipeline.incr(dayKey);
    const results = await pipeline.exec();

    // ioredis exec() yields [error, value] pairs; fall back to the
    // optimistic count if the pipeline result is unavailable.
    const newMinute = Number(results?.[0]?.[1] ?? currentMinute + 1);
    const newDay = Number(results?.[1]?.[1] ?? currentDay + 1);
    // Start the TTL clock only when a key was just created; re-arming it
    // on every hit keeps pushing the window's end into the future.
    if (newMinute === 1) await this.redis.expire(minuteKey, 60);
    if (newDay === 1) await this.redis.expire(dayKey, 86400);

    return {
      allowed: true,
      remaining: Math.min(
        config.limits.requestsPerMinute - newMinute,
        config.limits.requestsPerDay - newDay,
      ),
      resetAt: Date.now() + 60000,
    };
  }
}
每用户vs每IP限制
组合策略:
// Limits applied per caller class. Authenticated users get per-user
// budgets, anonymous traffic gets tighter ones, and a separate raw
// per-IP ceiling acts as a first line of DDoS protection.
interface RateLimitConfig {
  authenticated: {
    requestsPerMinute: number;
    requestsPerHour: number;
  };
  anonymous: {
    requestsPerMinute: number;
    requestsPerHour: number;
  };
  ipBased: {
    requestsPerMinute: number;
    maxConnectionsPerIP: number;
  };
}
/**
 * Two-stage limiter: a raw per-IP ceiling (DDoS guard) is checked first,
 * then either the authenticated per-user limits or the anonymous per-IP
 * limits.
 *
 * Fix: `checkAnonymousLimit` was called but never defined, which broke
 * compilation; it is implemented here against `config.anonymous`,
 * mirroring the per-user check.
 *
 * NOTE(review): INCR followed by a separate EXPIRE is not atomic — if the
 * process dies in between, the key never expires. Use SET ... EX NX or a
 * Lua script for production-grade enforcement.
 */
class HybridRateLimiter {
  constructor(
    private redis: Redis,
    private config: RateLimitConfig,
  ) {}

  /** Check a request from `ip`, optionally on behalf of `userId`. */
  async check(
    ip: string,
    userId?: string,
  ): Promise<{ allowed: boolean; retryAfter?: number }> {
    // IP ceiling first: protects against floods regardless of auth state.
    const ipResult = await this.checkIPLimit(ip);
    if (!ipResult.allowed) {
      return ipResult;
    }
    // Then the caller-class limit.
    if (userId) {
      return this.checkUserLimit(userId);
    }
    return this.checkAnonymousLimit(ip);
  }

  private async checkIPLimit(
    ip: string,
  ): Promise<{ allowed: boolean; retryAfter?: number }> {
    const key = `ratelimit:ip:${ip}`;
    const count = await this.redis.incr(key);
    if (count === 1) {
      await this.redis.expire(key, 60);
    }
    if (count > this.config.ipBased.requestsPerMinute) {
      const ttl = await this.redis.ttl(key);
      return { allowed: false, retryAfter: ttl };
    }
    return { allowed: true };
  }

  private async checkUserLimit(
    userId: string,
  ): Promise<{ allowed: boolean; retryAfter?: number }> {
    const minuteKey = `ratelimit:user:${userId}:minute`;
    const hourKey = `ratelimit:user:${userId}:hour`;
    const [minuteCount, hourCount] = await Promise.all([
      this.incrementWithExpiry(minuteKey, 60),
      this.incrementWithExpiry(hourKey, 3600),
    ]);
    if (minuteCount > this.config.authenticated.requestsPerMinute) {
      const ttl = await this.redis.ttl(minuteKey);
      return { allowed: false, retryAfter: ttl };
    }
    if (hourCount > this.config.authenticated.requestsPerHour) {
      const ttl = await this.redis.ttl(hourKey);
      return { allowed: false, retryAfter: ttl };
    }
    return { allowed: true };
  }

  /** Anonymous callers consume a per-IP budget from `config.anonymous`. */
  private async checkAnonymousLimit(
    ip: string,
  ): Promise<{ allowed: boolean; retryAfter?: number }> {
    const minuteKey = `ratelimit:anon:${ip}:minute`;
    const hourKey = `ratelimit:anon:${ip}:hour`;
    const [minuteCount, hourCount] = await Promise.all([
      this.incrementWithExpiry(minuteKey, 60),
      this.incrementWithExpiry(hourKey, 3600),
    ]);
    if (minuteCount > this.config.anonymous.requestsPerMinute) {
      const ttl = await this.redis.ttl(minuteKey);
      return { allowed: false, retryAfter: ttl };
    }
    if (hourCount > this.config.anonymous.requestsPerHour) {
      const ttl = await this.redis.ttl(hourKey);
      return { allowed: false, retryAfter: ttl };
    }
    return { allowed: true };
  }

  /** INCR the key; arm its TTL only on first creation. */
  private async incrementWithExpiry(
    key: string,
    expiry: number,
  ): Promise<number> {
    const count = await this.redis.incr(key);
    if (count === 1) {
      await this.redis.expire(key, expiry);
    }
    return count;
  }
}
基于Redis的速率限制
Redis通过原子操作提供分布式速率限制,能够在多个应用实例之间共享计数器并保持高性能。
分布式速率限制与Redis
基于Redis的滑动窗口:
import Redis from "ioredis";
// Sliding-window-log limiter backed by a Redis sorted set; safe to share
// across many application instances because the prune/count/add sequence
// runs atomically inside Redis.
class DistributedRateLimiter {
  private redis: Redis;

  constructor(redisUrl: string) {
    this.redis = new Redis(redisUrl);
  }

  // Decide whether the request identified by `key` fits within the last
  // `windowSeconds`; also reports the remaining budget and reset time (ms).
  async isAllowed(
    key: string,
    limit: number,
    windowSeconds: number,
  ): Promise<{ allowed: boolean; remaining: number; resetAt: number }> {
    const now = Date.now();
    const windowStart = now - windowSeconds * 1000;
    const redisKey = `ratelimit:${key}`;
    // Use a Lua script so the whole check-and-record is one atomic step.
    // (Members are scored by timestamp; a random suffix avoids collisions
    // between requests arriving in the same millisecond.)
    const luaScript = `
      local key = KEYS[1]
      local now = tonumber(ARGV[1])
      local window_start = tonumber(ARGV[2])
      local limit = tonumber(ARGV[3])
      local window_seconds = tonumber(ARGV[4])
      -- 移除旧条目
      redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
      -- 计数当前条目
      local count = redis.call('ZCARD', key)
      if count < limit then
        -- 添加新条目
        redis.call('ZADD', key, now, now .. '-' .. math.random())
        redis.call('EXPIRE', key, window_seconds)
        return {1, limit - count - 1}
      else
        -- 获取最旧条目以获取重置时间
        local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
        local reset_at = oldest[2] and (oldest[2] + window_seconds * 1000) or (now + window_seconds * 1000)
        return {0, 0, reset_at}
      end
    `;
    const result = (await this.redis.eval(
      luaScript,
      1,
      redisKey,
      now.toString(),
      windowStart.toString(),
      limit.toString(),
      windowSeconds.toString(),
    )) as number[];
    return {
      allowed: result[0] === 1,
      remaining: result[1],
      // The allowed branch of the script returns no third element, so the
      // fallback (a full window from now) is used in that case.
      resetAt: result[2] || now + windowSeconds * 1000,
    };
  }
}
Redis集群支持:
// Fixed-window limiter running against a Redis Cluster deployment.
class ClusterRateLimiter {
  private cluster: Redis.Cluster;

  constructor(nodes: { host: string; port: number }[]) {
    this.cluster = new Redis.Cluster(nodes, {
      redisOptions: {
        password: process.env.REDIS_PASSWORD,
      },
      // Route reads to replicas; INCR is a write and still hits the master.
      scaleReads: "slave",
    });
  }

  async checkLimit(
    identifier: string,
    limit: number,
    windowMs: number,
  ): Promise<boolean> {
    // Hash tag keeps every key for one identifier in the same cluster slot.
    const key = `{ratelimit:${identifier}}:counter`;
    const count = await this.cluster.incr(key);
    if (count === 1) {
      // Arm the window TTL on first use only.
      // NOTE(review): INCR + PEXPIRE is not atomic — a crash in between
      // would leave a counter with no TTL.
      await this.cluster.pexpire(key, windowMs);
    }
    return count <= limit;
  }
}
API网关速率限制
API网关提供跨微服务的集中式速率限制。
Kong速率限制
# Kong configuration: declarative rate-limiting plugin.
plugins:
  - name: rate-limiting
    config:
      minute: 100 # requests allowed per minute
      hour: 10000 # requests allowed per hour
      policy: redis # shared counters across Kong nodes
      redis_host: redis.example.com
      redis_port: 6379
      fault_tolerant: true # fail open if Redis is unreachable
      hide_client_headers: false # expose rate-limit headers to clients
Nginx速率限制
# Nginx configuration
# Two shared-memory zones: per-client-IP for anonymous traffic and
# per-Authorization-header for authenticated traffic.
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
limit_req_zone $http_authorization zone=user:10m rate=100r/s;
server {
    location /api/ {
        # Allow a burst of 20 requests without queuing delay, then enforce the rate
        limit_req zone=api burst=20 nodelay;
        limit_req_status 429;
        # Fix: the previous add_header lines referenced $limit_req_rate and
        # $limit_req_remaining, which are not nginx variables — the config
        # failed to load with "unknown variable". nginx does not expose
        # limit/remaining counters; emit X-RateLimit-* headers from the
        # upstream application instead.
        proxy_pass http://backend;
    }
    location /api/authenticated/ {
        limit_req zone=user burst=50;
        proxy_pass http://backend;
    }
}
AWS API网关节流
// AWS CDK configuration
import * as apigateway from "aws-cdk-lib/aws-apigateway";

// Stage-wide throttling defaults applied to the whole API deployment.
const api = new apigateway.RestApi(this, "MyApi", {
  deployOptions: {
    throttlingRateLimit: 1000, // steady-state requests per second
    throttlingBurstLimit: 2000, // burst capacity
  },
});

// Per-method throttling.
// NOTE(review): verify that `throttling` is accepted by addMethod's
// MethodOptions in the CDK version in use — method-level throttling is
// commonly configured via stage methodOptions or usage plans instead.
const resource = api.root.addResource("users");
resource.addMethod("GET", integration, {
  methodResponses: [{ statusCode: "200" }, { statusCode: "429" }],
  throttling: {
    rateLimit: 100,
    burstLimit: 200,
  },
});

// Per-client (API key) throttling plus a monthly quota via a usage plan.
const plan = api.addUsagePlan("BasicPlan", {
  throttle: {
    rateLimit: 10,
    burstLimit: 20,
  },
  quota: {
    limit: 10000,
    period: apigateway.Period.MONTH,
  },
});
Envoy速率限制
# Envoy proxy configuration: HTTP filter that delegates rate-limit
# decisions to an external gRPC rate limit service.
static_resources:
  listeners:
    - name: listener_0
      address:
        socket_address:
          address: 0.0.0.0
          port_value: 8080
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                http_filters:
                  - name: envoy.filters.http.ratelimit
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimit
                      # Must match the domain configured in the rate limit service
                      domain: api_domain
                      rate_limit_service:
                        grpc_service:
                          envoy_grpc:
                            cluster_name: rate_limit_cluster
                      # false = fail open when the rate limit service is down
                      failure_mode_deny: false
分布式系统考虑
多区域速率限制
具有Redis集群的全局速率限制器:
/**
 * Multi-region rate limiter: sums per-region Redis counters and keeps a
 * 1-second local cache to cut Redis round trips.
 *
 * Fixes:
 * - The counter's TTL is now armed only when the key is created. It was
 *   previously re-armed on every increment, so a busy key never expired
 *   and the window grew without bound.
 * - Expired local-cache entries are dropped when encountered, so the
 *   cache no longer grows monotonically with the number of distinct users.
 *
 * NOTE(review): increments taken on the cached fast path are never pushed
 * to Redis, so other regions undercount by up to one cache interval.
 */
class GlobalRateLimiter {
  private regions: Map<string, Redis.Cluster>;
  private localCache: Map<string, { count: number; expires: number }>;

  constructor(redisConfig: Record<string, Redis.ClusterNode[]>) {
    this.regions = new Map();
    this.localCache = new Map();
    for (const [region, nodes] of Object.entries(redisConfig)) {
      this.regions.set(region, new Redis.Cluster(nodes));
    }
  }

  async checkLimit(
    userId: string,
    limit: number,
    windowSeconds: number,
  ): Promise<boolean> {
    // Fast path: serve from the short-lived local cache when possible.
    const cached = this.localCache.get(userId);
    if (cached) {
      if (cached.expires > Date.now()) {
        if (cached.count >= limit) {
          return false;
        }
        cached.count++;
        return true;
      }
      this.localCache.delete(userId); // expired — drop instead of leaking
    }

    // Slow path: aggregate the counters across every region.
    const promises = Array.from(this.regions.values()).map((redis) =>
      redis.get(`ratelimit:${userId}`).then((v) => parseInt(v || "0")),
    );
    const counts = await Promise.all(promises);
    const totalCount = counts.reduce((sum, c) => sum + c, 0);
    if (totalCount >= limit) {
      return false;
    }

    // Record the hit in the local region; arm the TTL only on creation so
    // the window is not extended by every request.
    const currentRegion = this.regions.get(process.env.REGION!)!;
    const key = `ratelimit:${userId}`;
    const newCount = await currentRegion.incr(key);
    if (newCount === 1) {
      await currentRegion.expire(key, windowSeconds);
    }

    // Refresh the local cache (1 second of slack before re-aggregating).
    this.localCache.set(userId, {
      count: totalCount + 1,
      expires: Date.now() + 1000,
    });
    return true;
  }
}
最终一致性权衡
分布式系统中的速率限制涉及权衡:
- 强一致性:准确的限制,更高的延迟(全局协调)
- 最终一致性:更低的延迟,潜在的超限(区域独立)
- 混合方法:本地缓存与定期同步
// Tuning knobs for distributed enforcement: how strictly regions must
// agree, and how much overshoot is acceptable in exchange for latency.
interface RateLimitStrategy {
  consistency: "strong" | "eventual" | "hybrid";
  tolerancePercent: number; // acceptable overshoot, as a percent of the limit
}
/**
 * Rate limiter that trades consistency for latency: "strong" always
 * consults (and updates) the shared Redis counter, while other modes
 * buffer increments locally and flush them every `syncInterval` ms.
 *
 * Fixes:
 * - The strong-consistency path now increments the global counter on an
 *   allowed request. It previously only read the counter, so under pure
 *   strong mode the count never advanced and the limit was never reached.
 * - The sync loop deletes flushed entries instead of resetting them to 0,
 *   bounding the map's size and skipping no-op INCRBY 0 round trips.
 */
class HybridDistributedLimiter {
  private localCount: Map<string, number> = new Map();
  private globalSync: Redis;
  private syncInterval: number = 5000; // flush period in milliseconds

  constructor(
    redis: Redis,
    private strategy: RateLimitStrategy,
  ) {
    this.globalSync = redis;
    this.startSyncLoop();
  }

  async checkLimit(userId: string, limit: number): Promise<boolean> {
    const local = this.localCount.get(userId) || 0;

    if (this.strategy.consistency === "strong") {
      // Always consult the shared counter, and consume from it on success.
      const global = await this.getGlobalCount(userId);
      if (global >= limit) {
        return false;
      }
      await this.globalSync.incr(`global:${userId}`);
      return true;
    }

    // Eventual consistency: allow a local buffer of `tolerancePercent`
    // overshoot before forcing a global check.
    const buffer = Math.ceil((limit * this.strategy.tolerancePercent) / 100);
    const effectiveLimit = limit + buffer;
    if (local >= effectiveLimit) {
      // Local buffer exhausted — consult the global state.
      const global = await this.getGlobalCount(userId);
      if (global >= limit) {
        return false;
      }
    }
    this.localCount.set(userId, local + 1);
    return true;
  }

  private async getGlobalCount(userId: string): Promise<number> {
    const count = await this.globalSync.get(`global:${userId}`);
    return parseInt(count || "0");
  }

  /** Periodically flush buffered local counts into the shared counter. */
  private startSyncLoop(): void {
    setInterval(async () => {
      for (const [userId, count] of this.localCount.entries()) {
        if (count > 0) {
          await this.globalSync.incrby(`global:${userId}`, count);
        }
        this.localCount.delete(userId);
      }
    }, this.syncInterval);
  }
}
背压模式
具有速率限制的断路器:
/** Lifecycle states of the breaker. */
enum CircuitState {
  CLOSED = "CLOSED",
  OPEN = "OPEN",
  HALF_OPEN = "HALF_OPEN",
}

/**
 * Circuit breaker for backpressure: after `failureThreshold` consecutive
 * failures the circuit opens and calls are rejected outright; once
 * `resetTimeout` ms have passed it half-opens and closes again after
 * `halfOpenSuccessThreshold` consecutive successful probes.
 */
class CircuitBreaker {
  private state: CircuitState = CircuitState.CLOSED;
  private failures: number = 0;
  private lastFailure: number = 0;
  private successCount: number = 0;

  constructor(
    private failureThreshold: number = 5,
    private resetTimeout: number = 30000,
    private halfOpenSuccessThreshold: number = 3,
  ) {}

  /**
   * Run `fn` through the breaker.
   * @throws the breaker's own error while OPEN, or whatever `fn` throws.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === CircuitState.OPEN) {
      const coolDownOver = Date.now() - this.lastFailure >= this.resetTimeout;
      if (!coolDownOver) {
        throw new Error("断路器打开");
      }
      // Cool-down elapsed — enter probe mode.
      this.state = CircuitState.HALF_OPEN;
      this.successCount = 0;
    }
    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    if (this.state !== CircuitState.HALF_OPEN) {
      this.failures = 0;
      return;
    }
    this.successCount += 1;
    if (this.successCount >= this.halfOpenSuccessThreshold) {
      // Enough successful probes — resume normal operation.
      this.state = CircuitState.CLOSED;
      this.failures = 0;
    }
  }

  private onFailure(): void {
    this.failures += 1;
    this.lastFailure = Date.now();
    if (this.failures >= this.failureThreshold) {
      this.state = CircuitState.OPEN;
    }
  }

  getState(): CircuitState {
    return this.state;
  }
}
自适应速率限制:
/**
 * AIMD-style adaptive limiter: tracks the outcome of recent calls and
 * shrinks the limit multiplicatively (x0.9) when the success rate falls
 * below target, growing it slowly (x1.05) when comfortably above it.
 */
class AdaptiveRateLimiter {
  private currentLimit: number;
  private successRate: number = 1;
  private window: { success: boolean; timestamp: number }[] = [];

  constructor(
    private minLimit: number,
    private maxLimit: number,
    private targetSuccessRate: number = 0.95,
    private windowSize: number = 100,
  ) {
    this.currentLimit = maxLimit; // start optimistic
  }

  /** Feed one call outcome into the sliding sample window. */
  recordResult(success: boolean): void {
    this.window.push({ success, timestamp: Date.now() });
    // Bound the sample window.
    while (this.window.length > this.windowSize) {
      this.window.shift();
    }
    this.adjustLimit();
  }

  private adjustLimit(): void {
    // Wait for a minimally meaningful sample.
    if (this.window.length < 10) return;
    const successes = this.window.reduce(
      (n, r) => (r.success ? n + 1 : n),
      0,
    );
    this.successRate = successes / this.window.length;
    if (this.successRate < this.targetSuccessRate) {
      // Back off fast when calls start failing.
      const reduced = Math.floor(this.currentLimit * 0.9);
      this.currentLimit = reduced < this.minLimit ? this.minLimit : reduced;
    } else if (this.successRate > this.targetSuccessRate + 0.02) {
      // Recover slowly while things look healthy.
      const grown = Math.ceil(this.currentLimit * 1.05);
      this.currentLimit = grown > this.maxLimit ? this.maxLimit : grown;
    }
  }

  getCurrentLimit(): number {
    return this.currentLimit;
  }

  getSuccessRate(): number {
    return this.successRate;
  }
}
速率限制头和客户端通信
标准头:
// Rate-limit status communicated to clients via response headers.
interface RateLimitInfo {
  limit: number;
  remaining: number;
  reset: number; // Unix timestamp (seconds)
  retryAfter?: number; // seconds until the client may retry
}

/**
 * Attach both the de-facto X-RateLimit-* headers and the draft IETF
 * RateLimit-* headers to the response; adds Retry-After when present.
 */
function setRateLimitHeaders(res: Response, info: RateLimitInfo): void {
  const headers: Array<[string, string]> = [
    ["X-RateLimit-Limit", info.limit.toString()],
    ["X-RateLimit-Remaining", info.remaining.toString()],
    ["X-RateLimit-Reset", info.reset.toString()],
    ["RateLimit-Limit", info.limit.toString()],
    ["RateLimit-Remaining", info.remaining.toString()],
    ["RateLimit-Reset", info.reset.toString()],
  ];
  if (info.retryAfter !== undefined) {
    headers.push(["Retry-After", info.retryAfter.toString()]);
  }
  for (const [name, value] of headers) {
    res.setHeader(name, value);
  }
}
// Response body returned alongside HTTP 429.
interface RateLimitErrorResponse {
  error: {
    code: "RATE_LIMIT_EXCEEDED";
    message: string;
    retryAfter: number;
    limit: number;
    resetAt: string;
  };
}

/** Build the standard 429 response body from the current limit state. */
function createRateLimitError(info: RateLimitInfo): RateLimitErrorResponse {
  const resetAt = new Date(info.reset * 1000).toISOString();
  return {
    error: {
      code: "RATE_LIMIT_EXCEEDED",
      message: `速率限制超出。请在${info.retryAfter}秒后重试。`,
      retryAfter: info.retryAfter!, // the 429 path always sets retryAfter
      limit: info.limit,
      resetAt,
    },
  };
}
Express中间件:
import { Request, Response, NextFunction } from "express";
// Options for the Express rate-limit middleware factory below.
interface RateLimiterOptions {
  windowMs: number; // window length in milliseconds
  max: number; // maximum requests per window
  keyGenerator?: (req: Request) => string; // falls back to req.ip
  skip?: (req: Request) => boolean; // return true to bypass limiting
  handler?: (req: Request, res: Response) => void; // custom 429 handler
}
/**
 * Express middleware enforcing `max` requests per `windowMs`, backed by
 * the shared DistributedRateLimiter.
 *
 * Fix: `retryAfter` is now computed before the headers are written, so a
 * 429 response actually carries the Retry-After header. Previously
 * `setRateLimitHeaders` ran first and the header was silently omitted
 * (it only appeared in the JSON body).
 */
function createRateLimiter(options: RateLimiterOptions) {
  const limiter = new DistributedRateLimiter(process.env.REDIS_URL!);
  return async (req: Request, res: Response, next: NextFunction) => {
    // Bypass (e.g. health checks) when configured.
    if (options.skip?.(req)) {
      return next();
    }
    const key = options.keyGenerator?.(req) || req.ip;
    const result = await limiter.isAllowed(
      key,
      options.max,
      options.windowMs / 1000,
    );
    const info: RateLimitInfo = {
      limit: options.max,
      remaining: result.remaining,
      reset: Math.ceil(result.resetAt / 1000),
    };
    if (!result.allowed) {
      // Must be set before the headers are written.
      info.retryAfter = Math.ceil((result.resetAt - Date.now()) / 1000);
    }
    setRateLimitHeaders(res, info);
    if (!result.allowed) {
      if (options.handler) {
        return options.handler(req, res);
      }
      return res.status(429).json(createRateLimitError(info));
    }
    next();
  };
}
// Usage: 100 requests/minute keyed per user (or per IP when
// unauthenticated), with the health-check endpoint exempted.
app.use(
  "/api/",
  createRateLimiter({
    windowMs: 60 * 1000, // 1 minute
    max: 100,
    keyGenerator: (req) => req.user?.id || req.ip,
    skip: (req) => req.path === "/api/health",
  }),
);
优雅降级
基于优先级的降级:
// Request priorities; a lower value means more important.
enum RequestPriority {
  CRITICAL = 1, // health checks, authentication
  HIGH = 2, // user-initiated actions
  NORMAL = 3, // regular API calls
  LOW = 4, // background tasks
  BATCH = 5, // bulk operations
}

// Per-priority token buckets plus load-based shedding: as system load
// rises, progressively only higher-priority traffic is admitted.
class PriorityRateLimiter {
  private limiters: Map<RequestPriority, TokenBucket> = new Map();
  private systemLoad: number = 0; // normalized 0..1

  constructor() {
    // A separate bucket (capacity, refill/sec) for each priority class.
    this.limiters.set(RequestPriority.CRITICAL, new TokenBucket(1000, 100));
    this.limiters.set(RequestPriority.HIGH, new TokenBucket(500, 50));
    this.limiters.set(RequestPriority.NORMAL, new TokenBucket(200, 20));
    this.limiters.set(RequestPriority.LOW, new TokenBucket(50, 5));
    this.limiters.set(RequestPriority.BATCH, new TokenBucket(10, 1));
  }

  // Record the current load signal, clamped to [0, 1].
  setSystemLoad(load: number): void {
    this.systemLoad = Math.max(0, Math.min(1, load));
  }

  canProcess(priority: RequestPriority): boolean {
    // Above 80% load, shed everything below HIGH; above 90%, shed
    // everything below CRITICAL. Then consult the priority's own bucket.
    if (this.systemLoad > 0.8 && priority > RequestPriority.HIGH) {
      return false;
    }
    if (this.systemLoad > 0.9 && priority > RequestPriority.CRITICAL) {
      return false;
    }
    const limiter = this.limiters.get(priority);
    return limiter?.consume() ?? false;
  }
}
功能标志集成:
/**
 * Tracks which features are currently running in degraded mode and routes
 * calls to their fallbacks while degraded.
 */
class DegradationManager {
  private degradedFeatures: Set<string> = new Set();

  /**
   * Run `fallback` when `feature` is degraded; otherwise resolve to null
   * so the caller knows to continue on the normal path.
   */
  async checkAndDegrade(
    feature: string,
    fallback: () => Promise<any>,
  ): Promise<any> {
    return this.degradedFeatures.has(feature) ? fallback() : null;
  }

  enableDegradedMode(feature: string): void {
    this.degradedFeatures.add(feature);
    console.log(`降级模式启用: ${feature}`);
  }

  disableDegradedMode(feature: string): void {
    this.degradedFeatures.delete(feature);
    console.log(`降级模式禁用: ${feature}`);
  }

  isDegraded(feature: string): boolean {
    return this.degradedFeatures.has(feature);
  }
}
最佳实践
算法选择
- 令牌桶:最适合允许突发同时保持平均速率
- 滑动窗口:最适合精确速率限制,无突发
- 漏桶:最适合平滑到下游服务的流量
Redis配置
- 使用Redis集群实现高可用性
- 设置适当的内存限制和逐出策略
- 使用Lua脚本进行原子操作
- 监控Redis延迟和连接池
客户端体验
- 总是返回速率限制头
- 提供清晰的错误消息和重试时间
- 考虑实施客户端速率限制
- 在API文档中记录速率限制
监控
- 按端点和用户跟踪速率限制命中
- 对速率限制突然激增发出警报
- 监控分布式攻击模式
- 记录速率限制事件以供调试
安全性
- 实施基于IP的限制作为第一道防线
- 对合法用户使用认证速率限制
- 考虑特定区域的速率限制以应对区域滥用
- 对可疑模式实施CAPTCHA
示例
完整Express速率限制设置
import express from "express";
import Redis from "ioredis";

const app = express();
const redis = new Redis(process.env.REDIS_URL);

// Rate limiter middleware factory: a fixed window of `duration` seconds
// allowing `points` requests, counted in Redis per user (or per IP).
function rateLimiter(config: {
  points: number;
  duration: number;
  keyPrefix: string;
}) {
  return async (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction,
  ) => {
    const key = `${config.keyPrefix}:${req.user?.id || req.ip}`;
    try {
      const current = await redis.incr(key);
      if (current === 1) {
        // First hit in this window — start the countdown.
        // NOTE(review): INCR + EXPIRE is not atomic; a crash in between
        // leaves a counter without a TTL.
        await redis.expire(key, config.duration);
      }
      const ttl = await redis.ttl(key);
      const remaining = Math.max(0, config.points - current);
      res.set({
        "X-RateLimit-Limit": config.points.toString(),
        "X-RateLimit-Remaining": remaining.toString(),
        "X-RateLimit-Reset": (Math.floor(Date.now() / 1000) + ttl).toString(),
      });
      if (current > config.points) {
        res.set("Retry-After", ttl.toString());
        return res.status(429).json({
          error: "请求过多",
          retryAfter: ttl,
        });
      }
      next();
    } catch (error) {
      // Fail open if Redis is unavailable (availability over strictness).
      console.error("速率限制器错误:", error);
      next();
    }
  };
}

// Apply different limits per endpoint group: strictest on auth, moderate
// on search, and a general ceiling on everything else under /api.
app.use(
  "/api/auth",
  rateLimiter({ points: 5, duration: 60, keyPrefix: "auth" }),
);
app.use(
  "/api/search",
  rateLimiter({ points: 30, duration: 60, keyPrefix: "search" }),
);
app.use("/api", rateLimiter({ points: 100, duration: 60, keyPrefix: "api" }));

app.listen(3000);
客户端速率限制处理
/**
 * HTTP client that honors server-side rate limiting: it remembers 429
 * Retry-After windows and refuses to issue requests until they pass.
 *
 * Fix: the X-RateLimit-Remaining / X-RateLimit-Reset headers were read
 * into locals but never used. The client now enters the wait state as
 * soon as the server reports an exhausted budget, instead of burning an
 * extra request just to receive a 429.
 */
class APIClient {
  private retryAfter: number = 0; // epoch-ms until which requests are blocked

  async request<T>(url: string, options?: RequestInit): Promise<T> {
    // Refuse outright while inside a known rate-limit window.
    if (this.retryAfter > Date.now()) {
      const waitTime = this.retryAfter - Date.now();
      throw new Error(
        `速率限制。请在${Math.ceil(waitTime / 1000)}秒后重试`,
      );
    }
    const response = await fetch(url, options);

    // Learn the server's view of our budget from the response headers.
    const remaining = response.headers.get("X-RateLimit-Remaining");
    const reset = response.headers.get("X-RateLimit-Reset");
    if (remaining !== null && reset !== null && parseInt(remaining) <= 0) {
      // Budget exhausted: block until the advertised reset (Unix seconds).
      this.retryAfter = parseInt(reset) * 1000;
    }

    if (response.status === 429) {
      const retryAfter = response.headers.get("Retry-After");
      this.retryAfter = Date.now() + parseInt(retryAfter || "60") * 1000;
      const body = await response.json();
      throw new RateLimitError(body.error, parseInt(retryAfter || "60"));
    }
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    return response.json();
  }

  /**
   * `request` with retries: waits out rate-limit windows, and applies
   * exponential backoff (1s, 2s, 4s, ...) to other failures.
   */
  async requestWithRetry<T>(
    url: string,
    options?: RequestInit,
    maxRetries: number = 3,
  ): Promise<T> {
    let lastError: Error | null = null;
    for (let i = 0; i < maxRetries; i++) {
      try {
        return await this.request<T>(url, options);
      } catch (error) {
        lastError = error as Error;
        if (error instanceof RateLimitError) {
          // Wait out the server-advertised retry window.
          await this.delay(error.retryAfter * 1000);
        } else {
          await this.delay(Math.pow(2, i) * 1000);
        }
      }
    }
    throw lastError;
  }

  private delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
/** Error thrown when the server answers 429; carries the retry delay. */
class RateLimitError extends Error {
  public retryAfter: number; // seconds the caller should wait before retrying

  constructor(message: string, retryAfter: number) {
    super(message);
    this.name = "RateLimitError";
    this.retryAfter = retryAfter;
  }
}