Rate Limiting Skill: rate-limiting

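The rate limiting skill covers controlling API request rates to prevent service abuse and DDoS attacks, keeping systems stable and usage fair. It covers algorithms such as token bucket and sliding window, along with Redis implementations, API gateway integration, and best practices for distributed systems. Keywords: rate limiting, API management, distributed rate limiting, Redis, microservices, backpressure handling, token bucket, sliding window, quota management.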

Backend Development · Updated 3/24/2026

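Name: Rate Limiting. Description: Implementing API rate limiting and quota management. Use for request throttling, API quotas, backpressure handling, or abuse prevention. Keywords: rate limiting, throttling, token bucket, leaky bucket, sliding window, fixed window, quota, 429, Too Many Requests, DDoS, abuse protection, API quota, burst, Redis rate limiting, distributed rate limiting, API gateway, per-user limits, per-IP limits, concurrent requests, request limits.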

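Rate Limiting

Overview

Rate limiting is a technique for controlling how often clients may send requests to an API. It protects services from abuse, keeps usage fair across clients, and keeps the system stable under load. This skill covers algorithms, implementation patterns, and best practices for distributed rate limiting.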

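Key Concepts

Rate Limiting Algorithms

Token bucket algorithm:

A token bucket allows short bursts while enforcing an average rate: tokens are added continuously at a fixed rate up to a maximum capacity, and each request spends one token.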

class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private capacity: number, // maximum number of tokens (burst size)
    private refillRate: number, // tokens added per second
  ) {
    this.tokens = capacity;
    this.lastRefill = Date.now();
  }

  private refill(): void {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    const tokensToAdd = elapsed * this.refillRate;

    this.tokens = Math.min(this.capacity, this.tokens + tokensToAdd);
    this.lastRefill = now;
  }

  consume(tokens: number = 1): boolean {
    this.refill();

    if (this.tokens >= tokens) {
      this.tokens -= tokens;
      return true;
    }
    return false;
  }

  getState(): { tokens: number; capacity: number } {
    this.refill();
    return { tokens: this.tokens, capacity: this.capacity };
  }
}

// Usage: 100 requests/minute on average, with bursts of up to 10
const bucket = new TokenBucket(10, 100 / 60);
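A worked check of the usage line above (b is the capacity, r the refill rate): a bucket that starts full can admit its b stored tokens plus the rT tokens that refill during an interval of length T, so over any one-minute interval this configuration admits up to 110 requests, not 100:

```latex
\[
N_{\max}(T) = b + rT = 10 + \frac{100}{60}\,\text{s}^{-1} \times 60\,\text{s} = 10 + 100 = 110
\]
```

Once the bucket is empty, a new token arrives every 60/100 = 0.6 s.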

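Sliding window log algorithm:

Provides exact rate limiting by recording the timestamp of every request inside the window (memory grows with the limit).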

class SlidingWindowLog {
  private requests: number[] = [];

  constructor(
    private windowMs: number, // window size in milliseconds
    private maxRequests: number, // maximum requests per window
  ) {}

  isAllowed(): boolean {
    const now = Date.now();
    const windowStart = now - this.windowMs;

    // Drop timestamps that have fallen out of the window
    this.requests = this.requests.filter((ts) => ts > windowStart);

    if (this.requests.length < this.maxRequests) {
      this.requests.push(now);
      return true;
    }

    return false;
  }

  getRemainingRequests(): number {
    const now = Date.now();
    const windowStart = now - this.windowMs;
    this.requests = this.requests.filter((ts) => ts > windowStart);
    return Math.max(0, this.maxRequests - this.requests.length);
  }

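  getResetTime(): number {
    // Prune first so an expired oldest entry cannot yield a reset time in the past
    const windowStart = Date.now() - this.windowMs;
    this.requests = this.requests.filter((ts) => ts > windowStart);
    if (this.requests.length === 0) return 0;
    // The oldest request leaves the window (freeing a slot) at this time
    return this.requests[0] + this.windowMs;
  }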
}

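Sliding window counter algorithm:

A memory-efficient approximation: it keeps only the previous and current window counts and weights the previous count by how much of the previous window still overlaps the sliding window.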

class SlidingWindowCounter {
  private previousCount: number = 0;
  private currentCount: number = 0;
  private windowStart: number;

  constructor(
    private windowMs: number,
    private maxRequests: number,
  ) {
    this.windowStart = Date.now();
  }

  isAllowed(): boolean {
    const now = Date.now();
    const elapsed = now - this.windowStart;

    // Roll over if we have moved into a new window
    if (elapsed >= this.windowMs) {
      const windowsPassed = Math.floor(elapsed / this.windowMs);
      if (windowsPassed === 1) {
        this.previousCount = this.currentCount;
      } else {
        this.previousCount = 0;
      }
      this.currentCount = 0;
      this.windowStart = now - (elapsed % this.windowMs);
    }

    // Weight the previous window by how much of it still overlaps the sliding window
    const windowProgress = (now - this.windowStart) / this.windowMs;
    const weightedCount =
      this.previousCount * (1 - windowProgress) + this.currentCount;

    if (weightedCount < this.maxRequests) {
      this.currentCount++;
      return true;
    }

    return false;
  }
}
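A worked example of the weighted estimate in isAllowed() (p is the fraction of the current window that has elapsed): with a limit of 100 per minute, 80 requests in the previous window, 30 so far in the current one, and p = 0.25,

```latex
\[
\text{weighted} = C_{\text{prev}}\,(1 - p) + C_{\text{cur}} = 80 \times 0.75 + 30 = 60 + 30 = 90 < 100
\]
```

so the request is admitted. The estimate assumes the previous window's requests were spread evenly, which is where the approximation error comes from.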

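Leaky bucket algorithm:

Smooths bursts by queueing requests and processing them at a constant rate; requests that arrive while the queue is full are rejected.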

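class LeakyBucket {
  private queue: Array<() => Promise<void>> = [];
  private processing: boolean = false;

  constructor(
    private capacity: number, // maximum queue length
    private leakRate: number, // requests processed per second
  ) {}

  // Resolves true once the request has run, or false immediately if the queue is full.
  // Rejects with the request's own error if the request throws.
  async add(request: () => Promise<void>): Promise<boolean> {
    if (this.queue.length >= this.capacity) {
      return false; // queue full, reject
    }

    return new Promise<boolean>((resolve, reject) => {
      this.queue.push(async () => {
        try {
          await request();
          resolve(true);
        } catch (err) {
          reject(err);
        }
      });

      void this.processQueue();
    });
  }

  private async processQueue(): Promise<void> {
    if (this.processing || this.queue.length === 0) return;

    this.processing = true;
    try {
      while (this.queue.length > 0) {
        const request = this.queue.shift();
        if (request) {
          // Queued wrappers catch their own errors, so one failure cannot stall the queue
          await request();
          // Spacing starts after completion, so the effective rate is at most leakRate
          await this.delay(1000 / this.leakRate);
        }
      }
    } finally {
      this.processing = false;
    }
  }

  private delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}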
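The class above is the "leaky bucket as a queue" variant. A common alternative is the "leaky bucket as a meter", which never queues: the bucket holds a level that drains at leakRate per second, each request adds one unit, and a request that would overflow the capacity is rejected. A minimal sketch, where the LeakyBucketMeter name and the injectable now() clock are our own illustrative choices:

```typescript
// Leaky bucket as a meter: no queue, just a level that drains over time.
class LeakyBucketMeter {
  private level = 0; // current "water" in the bucket
  private lastLeak: number;

  constructor(
    private capacity: number, // maximum level (burst tolerance)
    private leakRate: number, // units drained per second
    private now: () => number = Date.now, // injectable clock (ms) for testing
  ) {
    this.lastLeak = this.now();
  }

  tryAdd(): boolean {
    const t = this.now();
    // Drain in proportion to the time elapsed since the last check
    const leaked = ((t - this.lastLeak) / 1000) * this.leakRate;
    this.level = Math.max(0, this.level - leaked);
    this.lastLeak = t;

    if (this.level + 1 <= this.capacity) {
      this.level += 1;
      return true;
    }
    return false; // would overflow: reject instead of queueing
  }
}
```

With capacity 3 and leakRate 1, three immediate requests pass and the fourth is rejected; one second later exactly one more fits.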

API Quota Management

Tiered quota system:

import Redis from "ioredis";

interface QuotaTier {
  name: string;
  limits: {
    requestsPerMinute: number;
    requestsPerDay: number;
    burstSize: number;
  };
  features: string[];
}

const quotaTiers: Record<string, QuotaTier> = {
  free: {
    name: "Free",
    limits: {
      requestsPerMinute: 10,
      requestsPerDay: 1000,
      burstSize: 5,
    },
    features: ["Basic API"],
  },
  pro: {
    name: "Pro",
    limits: {
      requestsPerMinute: 100,
      requestsPerDay: 50000,
      burstSize: 20,
    },
    features: ["Basic API", "Advanced API", "Webhooks"],
  },
  enterprise: {
    name: "Enterprise",
    limits: {
      requestsPerMinute: 1000,
      requestsPerDay: 1000000,
      burstSize: 100,
    },
    features: ["Basic API", "Advanced API", "Webhooks", "Bulk API"],
  },
};

class QuotaManager {
  constructor(private redis: Redis) {}

  async checkQuota(
    userId: string,
    tier: string,
  ): Promise<{ allowed: boolean; remaining: number; resetAt: number }> {
    const config = quotaTiers[tier];
    if (!config) throw new Error(`Unknown tier: ${tier}`);

    const minuteKey = `quota:${userId}:minute`;
    const dayKey = `quota:${userId}:day`;

    const [minuteCount, dayCount] = await Promise.all([
      this.redis.get(minuteKey),
      this.redis.get(dayKey),
    ]);

    const currentMinute = parseInt(minuteCount || "0");
    const currentDay = parseInt(dayCount || "0");

    if (currentMinute >= config.limits.requestsPerMinute) {
      const ttl = await this.redis.ttl(minuteKey);
      return { allowed: false, remaining: 0, resetAt: Date.now() + ttl * 1000 };
    }

    if (currentDay >= config.limits.requestsPerDay) {
      const ttl = await this.redis.ttl(dayKey);
      return { allowed: false, remaining: 0, resetAt: Date.now() + ttl * 1000 };
    }

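    // Increment the counters. SET ... EX ... NX starts each window's TTL only once;
    // calling EXPIRE on every request would keep pushing the reset time back.
    // The GET checks above and these increments are not atomic, so concurrent
    // requests can slightly exceed the limit (a Lua script would close that gap).
    const pipeline = this.redis.pipeline();
    pipeline.set(minuteKey, 0, "EX", 60, "NX");
    pipeline.incr(minuteKey);
    pipeline.set(dayKey, 0, "EX", 86400, "NX");
    pipeline.incr(dayKey);
    await pipeline.exec();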

    return {
      allowed: true,
      remaining: config.limits.requestsPerMinute - currentMinute - 1,
      resetAt: Date.now() + 60000,
    };
  }
}
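The QuotaManager above never reads burstSize. One way to apply all three tier limits, sketched in memory without Redis using hypothetical names (TierLimiter, an injectable now() clock), is to map requestsPerMinute and burstSize onto a token bucket (capacity = burstSize, refill = requestsPerMinute per 60 s) and keep a separate counter for the daily quota, here reset on UTC day boundaries as an assumption:

```typescript
interface TierLimits {
  requestsPerMinute: number;
  requestsPerDay: number;
  burstSize: number;
}

// Per-user limiter for one tier: token bucket for rate and burst, plus a daily counter.
class TierLimiter {
  private buckets = new Map<string, { tokens: number; last: number }>();
  private daily = new Map<string, { day: number; count: number }>();

  constructor(
    private limits: TierLimits,
    private now: () => number = Date.now, // injectable clock (ms)
  ) {}

  allow(userId: string): boolean {
    const t = this.now();
    const day = Math.floor(t / 86_400_000); // UTC day number

    // Daily quota: the count restarts when the UTC day changes
    const d = this.daily.get(userId);
    const dayCount = d && d.day === day ? d.count : 0;
    if (dayCount >= this.limits.requestsPerDay) return false;

    // Token bucket: refill requestsPerMinute tokens per 60 000 ms, capped at burstSize
    const b = this.buckets.get(userId) ?? { tokens: this.limits.burstSize, last: t };
    const refill = ((t - b.last) * this.limits.requestsPerMinute) / 60_000;
    b.tokens = Math.min(this.limits.burstSize, b.tokens + refill);
    b.last = t;
    this.buckets.set(userId, b);
    if (b.tokens < 1) return false;

    b.tokens -= 1;
    this.daily.set(userId, { day, count: dayCount + 1 });
    return true;
  }
}
```

With the free tier's numbers (10 per minute, burst 5), a user gets 5 requests at once and then one more every 6 seconds, until the daily count runs out.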

Per-User vs. Per-IP Limits

Combined strategy:

import Redis from "ioredis";

interface RateLimitConfig {
  authenticated: {
    requestsPerMinute: number;
    requestsPerHour: number;
  };
  anonymous: {
    requestsPerMinute: number;
    requestsPerHour: number;
  };
  ipBased: {
    requestsPerMinute: number;
    maxConnectionsPerIP: number;
  };
}

class HybridRateLimiter {
  constructor(
    private redis: Redis,
    private config: RateLimitConfig,
  ) {}

  async check(
    ip: string,
    userId?: string,
  ): Promise<{ allowed: boolean; retryAfter?: number }> {
    // Check the per-IP limit first (DDoS protection)
    const ipResult = await this.checkIPLimit(ip);
    if (!ipResult.allowed) {
      return ipResult;
    }

    // Then apply the authenticated-user or anonymous limit
    if (userId) {
      return this.checkUserLimit(userId);
    } else {
      return this.checkAnonymousLimit(ip);
    }
  }

  private async checkIPLimit(
    ip: string,
  ): Promise<{ allowed: boolean; retryAfter?: number }> {
    const key = `ratelimit:ip:${ip}`;
    const count = await this.redis.incr(key);

    if (count === 1) {
      await this.redis.expire(key, 60);
    }

    if (count > this.config.ipBased.requestsPerMinute) {
      const ttl = await this.redis.ttl(key);
      return { allowed: false, retryAfter: ttl };
    }

    return { allowed: true };
  }

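  private async checkUserLimit(
    userId: string,
  ): Promise<{ allowed: boolean; retryAfter?: number }> {
    return this.checkWindows(
      `ratelimit:user:${userId}`,
      this.config.authenticated,
    );
  }

  // Anonymous callers are keyed by IP but get their own (usually stricter) limits
  private async checkAnonymousLimit(
    ip: string,
  ): Promise<{ allowed: boolean; retryAfter?: number }> {
    return this.checkWindows(`ratelimit:anon:${ip}`, this.config.anonymous);
  }

  // Enforce both the per-minute and per-hour windows for one identity
  private async checkWindows(
    prefix: string,
    limits: { requestsPerMinute: number; requestsPerHour: number },
  ): Promise<{ allowed: boolean; retryAfter?: number }> {
    const minuteKey = `${prefix}:minute`;
    const hourKey = `${prefix}:hour`;

    const [minuteCount, hourCount] = await Promise.all([
      this.incrementWithExpiry(minuteKey, 60),
      this.incrementWithExpiry(hourKey, 3600),
    ]);

    if (minuteCount > limits.requestsPerMinute) {
      const ttl = await this.redis.ttl(minuteKey);
      return { allowed: false, retryAfter: ttl };
    }

    if (hourCount > limits.requestsPerHour) {
      const ttl = await this.redis.ttl(hourKey);
      return { allowed: false, retryAfter: ttl };
    }

    return { allowed: true };
  }

  // INCR, then set the TTL only when the key was just created (fixed window).
  // The two calls are not atomic: if the process dies in between, the key never expires.
  private async incrementWithExpiry(
    key: string,
    expiry: number,
  ): Promise<number> {
    const count = await this.redis.incr(key);
    if (count === 1) {
      await this.redis.expire(key, expiry);
    }
    return count;
  }
}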
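Each INCR-plus-EXPIRE check above is a fixed window counter. The in-memory sketch below (FixedWindowCounter and its injectable clock are illustrative names; it is useful as a test double, not as a distributed limiter) mirrors that behavior, including counting rejected requests, and makes its main weakness visible: because a window resets all at once, a client can get close to twice the limit through in a short span that straddles the reset.

```typescript
// Fixed window counter: a window opens on the first hit and resets all at once,
// like INCR followed by EXPIRE on the first increment.
class FixedWindowCounter {
  private windows = new Map<string, { start: number; count: number }>();

  constructor(
    private limit: number, // requests allowed per window
    private windowMs: number, // window length in milliseconds
    private now: () => number = Date.now, // injectable clock (ms)
  ) {}

  hit(key: string): { allowed: boolean; retryAfterMs: number } {
    const t = this.now();
    let w = this.windows.get(key);
    // Open a new window if there is none or the current one has expired
    if (!w || t - w.start >= this.windowMs) {
      w = { start: t, count: 0 };
      this.windows.set(key, w);
    }
    w.count++; // rejected hits are counted too, as with INCR
    return {
      allowed: w.count <= this.limit,
      retryAfterMs: w.start + this.windowMs - t,
    };
  }
}
```

With a limit of 3 per 1000 ms, one hit at t=0, two admitted hits at t=990, and three admitted hits at t=1000 put five requests through within 10 ms.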

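Redis-Based Rate Limiting

Redis enables distributed rate limiting: its atomic operations (including Lua scripts) let many application instances share one set of counters with low latency.

Distributed Rate Limiting with Redis

Redis-based sliding window (a sliding window log stored in a sorted set):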

import Redis from "ioredis";
import { randomUUID } from "node:crypto";

class DistributedRateLimiter {
  private redis: Redis;

  constructor(redisUrl: string) {
    this.redis = new Redis(redisUrl);
  }

  async isAllowed(
    key: string,
    limit: number,
    windowSeconds: number,
  ): Promise<{ allowed: boolean; remaining: number; resetAt: number }> {
    const now = Date.now();
    const windowMs = Math.ceil(windowSeconds * 1000); // PEXPIRE needs an integer
    const windowStart = now - windowMs;
    const redisKey = `ratelimit:${key}`;
    // Unique sorted-set member, generated client-side: Redis has historically seeded
    // Lua's math.random identically on every script call, so it is not reliably unique
    const member = `${now}-${randomUUID()}`;

    // Lua script so that prune, count, and add happen atomically
    const luaScript = `
      local key = KEYS[1]
      local now = tonumber(ARGV[1])
      local window_start = tonumber(ARGV[2])
      local limit = tonumber(ARGV[3])
      local window_ms = tonumber(ARGV[4])
      local member = ARGV[5]

      -- Remove entries that have left the window
      redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)

      -- Count the entries still in the window
      local count = redis.call('ZCARD', key)

      if count < limit then
        -- Record this request
        redis.call('ZADD', key, now, member)
        redis.call('PEXPIRE', key, window_ms)
        return {1, limit - count - 1}
      else
        -- Reset time: when the oldest entry leaves the window
        local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
        local reset_at = oldest[2] and (tonumber(oldest[2]) + window_ms) or (now + window_ms)
        return {0, 0, reset_at}
      end
    `;

    const result = (await this.redis.eval(
      luaScript,
      1,
      redisKey,
      now.toString(),
      windowStart.toString(),
      limit.toString(),
      windowMs.toString(),
      member,
    )) as number[];

    return {
      allowed: result[0] === 1,
      remaining: result[1],
      resetAt: result[2] ?? now + windowMs,
    };
  }
}

Redis Cluster support:

import { Cluster } from "ioredis";

class ClusterRateLimiter {
  private cluster: Cluster;

  constructor(nodes: { host: string; port: number }[]) {
    this.cluster = new Cluster(nodes, {
      redisOptions: {
        password: process.env.REDIS_PASSWORD,
      },
      // Serve read commands from replicas (this limiter only issues writes)
      scaleReads: "slave",
    });
  }

  async checkLimit(
    identifier: string,
    limit: number,
    windowMs: number,
  ): Promise<boolean> {
    // Hash tag: only the part inside {} is hashed, so all keys for this identifier share a slot
    const key = `{ratelimit:${identifier}}:counter`;

    const count = await this.cluster.incr(key);
    if (count === 1) {
      await this.cluster.pexpire(key, windowMs);
    }

    return count <= limit;
  }
}
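The hash tag matters because Redis Cluster maps every key to one of 16384 slots with CRC16(key) mod 16384, hashing only the substring inside the first {...} when that substring is non-empty. A sketch of that rule, written from the Redis Cluster specification with our own helper names crc16 and keySlot (CLUSTER KEYSLOT on a live cluster remains the authoritative check):

```typescript
// CRC16/XMODEM (polynomial 0x1021, initial value 0), the variant Redis Cluster uses.
function crc16(data: string): number {
  let crc = 0;
  for (const byte of new TextEncoder().encode(data)) {
    crc ^= byte << 8;
    for (let bit = 0; bit < 8; bit++) {
      crc = crc & 0x8000 ? (crc << 1) ^ 0x1021 : crc << 1;
      crc &= 0xffff; // keep 16 bits
    }
  }
  return crc;
}

// Hash only the text between the first "{" and the next "}", if it is non-empty.
function hashTagOf(key: string): string {
  const open = key.indexOf("{");
  if (open === -1) return key;
  const close = key.indexOf("}", open + 1);
  if (close === -1 || close === open + 1) return key; // no tag or empty tag
  return key.slice(open + 1, close);
}

function keySlot(key: string): number {
  return crc16(hashTagOf(key)) % 16384;
}
```

Keys that share a tag, such as {ratelimit:u1}:counter and {ratelimit:u1}:day, always land in the same slot, which is what allows a multi-key Lua script or MULTI block to operate on them in a cluster.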

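API Gateway Rate Limiting

An API gateway enforces rate limits centrally, in front of all microservices, so each service does not need its own limiter.

Kong Rate Limiting

# Kong configuration (rate-limiting plugin with counters shared through Redis)
plugins:
  - name: rate-limiting
    config:
      minute: 100
      hour: 10000
      policy: redis
      redis_host: redis.example.com
      redis_port: 6379
      # Keep proxying requests if Redis is unreachable (fail open)
      fault_tolerant: true
      # Send rate limit headers to clients
      hide_client_headers: false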

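Nginx Rate Limiting

# Nginx configuration
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
# Keyed on the Authorization header; requests with an empty key are not counted
limit_req_zone $http_authorization zone=user:10m rate=100r/s;

server {
    # Return 429 instead of the default 503 in every location
    limit_req_status 429;

    location /api/ {
        # Allow bursts of up to 20 extra requests, served immediately (nodelay);
        # beyond that the 10 r/s rate is enforced
        limit_req zone=api burst=20 nodelay;

        # Stock nginx has no variables for the limit or remaining count;
        # $limit_req_status (1.17.6+) reports PASSED, DELAYED, REJECTED, etc.
        add_header X-RateLimit-Status $limit_req_status always;

        proxy_pass http://backend;
    }

    location /api/authenticated/ {
        # Longest-prefix matching means the per-IP limit above does not apply here
        limit_req zone=user burst=50;
        proxy_pass http://backend;
    }
}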
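To reason about what burst=20 nodelay admits, here is a simplified model we wrote of a leaky-bucket "excess" counter (ExcessLimiter is a hypothetical name; this follows the leaky-bucket-with-burst behavior the nginx documentation describes but is not nginx's code and may differ in edge cases): each admitted request adds one unit of excess, excess drains at the configured rate, and a request is rejected when the excess would exceed the burst.

```typescript
// Simplified model of a rate + burst limiter with immediate (nodelay-style) admission.
class ExcessLimiter {
  private excess = 0; // requests beyond the steady rate that are still "in the bucket"
  private last: number | null = null; // time (ms) of the last admitted request

  constructor(
    private ratePerSec: number, // like rate=10r/s
    private burst: number, // like burst=20
    private now: () => number = Date.now, // injectable clock (ms)
  ) {}

  tryAcquire(): boolean {
    const t = this.now();
    const last = this.last;
    // Drain excess for the time since the last admitted request, then add this one
    const drained = last === null ? Infinity : ((t - last) * this.ratePerSec) / 1000;
    const next = Math.max(0, this.excess - drained + 1);
    if (next > this.burst) {
      return false; // rejected requests leave the state unchanged
    }
    this.excess = next;
    this.last = t;
    return true;
  }
}
```

Under this model, an idle client gets 21 requests through at once (the current one plus 20 burst slots), after which the 10 r/s rate frees one slot every 100 ms.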

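AWS API Gateway Throttling

// AWS CDK configuration (the statements after the import run inside a Stack constructor, so `this` is the Stack)
import * as apigateway from "aws-cdk-lib/aws-apigateway";

// Placeholder backend; swap in your real integration
const integration = new apigateway.HttpIntegration("https://backend.example.com/users");

const api = new apigateway.RestApi(this, "MyApi", {
  deployOptions: {
    throttlingRateLimit: 1000, // steady-state requests per second for the stage
    throttlingBurstLimit: 2000, // burst capacity
    // Per-method throttling is set on the stage, keyed by "/resource/METHOD"
    methodOptions: {
      "/users/GET": {
        throttlingRateLimit: 100,
        throttlingBurstLimit: 200,
      },
    },
  },
});

const resource = api.root.addResource("users");
resource.addMethod("GET", integration, {
  methodResponses: [{ statusCode: "200" }, { statusCode: "429" }],
  apiKeyRequired: true, // usage plans only apply to requests that send an API key
});

// Per-client throttling and quota via a usage plan tied to API keys
const plan = api.addUsagePlan("BasicPlan", {
  throttle: {
    rateLimit: 10,
    burstLimit: 20,
  },
  quota: {
    limit: 10000,
    period: apigateway.Period.MONTH,
  },
});
plan.addApiStage({ stage: api.deploymentStage });
plan.addApiKey(api.addApiKey("ClientKey"));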

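Envoy Rate Limiting

# Envoy proxy configuration (listener excerpt; the backend_cluster and
# rate_limit_cluster definitions and the external rate limit service are omitted)
static_resources:
  listeners:
    - name: listener_0
      address:
        socket_address:
          address: 0.0.0.0
          port_value: 8080
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                stat_prefix: ingress_http
                route_config:
                  virtual_hosts:
                    - name: api
                      domains: ["*"]
                      # Descriptor sent to the rate limit service for each request (client IP)
                      rate_limits:
                        - actions:
                            - remote_address: {}
                      routes:
                        - match: { prefix: "/" }
                          route: { cluster: backend_cluster }
                http_filters:
                  - name: envoy.filters.http.ratelimit
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimit
                      domain: api_domain
                      # Fail open: let traffic through if the rate limit service is unavailable
                      failure_mode_deny: false
                      rate_limit_service:
                        transport_api_version: V3
                        grpc_service:
                          envoy_grpc:
                            cluster_name: rate_limit_cluster
                  # The router filter must be the last HTTP filter
                  - name: envoy.filters.http.router
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router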

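Distributed System Considerations

Multi-Region Rate Limiting

Global rate limiter that sums counts from per-region Redis clusters:

import { Cluster, ClusterNode } from "ioredis";

class GlobalRateLimiter {
  private regions: Map<string, Cluster>;
  // Short-lived per-user snapshot of the global count, to skip cross-region reads
  private localCache: Map<string, { count: number; expires: number }>;

  constructor(redisConfig: Record<string, ClusterNode[]>) {
    this.regions = new Map();
    this.localCache = new Map();

    for (const [region, nodes] of Object.entries(redisConfig)) {
      this.regions.set(region, new Cluster(nodes));
    }
  }

  async checkLimit(
    userId: string,
    limit: number,
    windowSeconds: number,
  ): Promise<boolean> {
    const key = `ratelimit:${userId}`;

    // Check the local cache first (avoids reading every region)
    const cached = this.localCache.get(userId);
    if (cached && cached.expires > Date.now()) {
      if (cached.count >= limit) {
        return false;
      }
      cached.count++;
      // Still record the request in this region so other regions can see it
      await this.incrementLocal(key, windowSeconds);
      return true;
    }

    // Sum the counts from all regions
    const promises = Array.from(this.regions.values()).map((redis) =>
      redis.get(key).then((v) => parseInt(v ?? "0", 10)),
    );

    const counts = await Promise.all(promises);
    const totalCount = counts.reduce((sum, c) => sum + c, 0);

    if (totalCount >= limit) {
      return false;
    }

    await this.incrementLocal(key, windowSeconds);

    // Cache the aggregate for 1 second; other regions' traffic in that second is not seen
    this.localCache.set(userId, {
      count: totalCount + 1,
      expires: Date.now() + 1000,
    });

    return true;
  }

  // Increment in the current region, starting the TTL only when the window begins
  private async incrementLocal(key: string, windowSeconds: number): Promise<void> {
    const region = process.env.REGION;
    const redis = region ? this.regions.get(region) : undefined;
    if (!redis) throw new Error(`No Redis cluster configured for region ${region}`);
    await redis.set(key, 0, "EX", windowSeconds, "NX");
    await redis.incr(key);
  }
}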

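Eventual Consistency Trade-offs

Rate limiting in a distributed system involves trade-offs:

  • Strong consistency: accurate limits, but higher latency (global coordination on every request)
  • Eventual consistency: lower latency, but limits can be exceeded (regions decide independently)
  • Hybrid: local counting with periodic synchronization

import Redis from "ioredis";

interface RateLimitStrategy {
  consistency: "strong" | "eventual" | "hybrid";
  tolerancePercent: number; // acceptable overshoot, as a percentage of the limit
}

class HybridDistributedLimiter {
  // Requests admitted on this instance that have not been pushed to Redis yet
  private localCount: Map<string, number> = new Map();
  // Global count as of this instance's last sync, per user
  private lastGlobal: Map<string, number> = new Map();
  private syncInterval: number = 5000; // 5 seconds

  constructor(
    private globalSync: Redis,
    private strategy: RateLimitStrategy,
    private windowSeconds: number = 60, // lifetime of each global counter window
  ) {
    this.startSyncLoop();
  }

  async checkLimit(userId: string, limit: number): Promise<boolean> {
    if (this.strategy.consistency === "strong") {
      // Every request increments the shared counter atomically
      const global = await this.addToGlobal(userId, 1);
      return global <= limit;
    }

    // Eventual/hybrid: each instance may admit up to `buffer` requests without
    // contacting Redis, which bounds the overshoot between syncs
    const buffer = Math.max(
      1,
      Math.ceil((limit * this.strategy.tolerancePercent) / 100),
    );
    if ((this.localCount.get(userId) ?? 0) >= buffer) {
      // Local buffer used up: flush now and refresh the global view
      await this.flush(userId);
    }

    const local = this.localCount.get(userId) ?? 0;
    const global = this.lastGlobal.get(userId) ?? 0;
    if (global + local >= limit) {
      return false;
    }

    this.localCount.set(userId, local + 1);
    return true;
  }

  // Push this instance's pending count to Redis and record the new global total
  private async flush(userId: string): Promise<void> {
    const pending = this.localCount.get(userId) ?? 0;
    // Reset before awaiting so requests counted during the round trip are kept
    this.localCount.set(userId, 0);
    const total = await this.addToGlobal(userId, pending);
    if (total === 0 && (this.localCount.get(userId) ?? 0) === 0) {
      // Window expired with no traffic: stop tracking this user
      this.lastGlobal.delete(userId);
      this.localCount.delete(userId);
    } else {
      this.lastGlobal.set(userId, total);
    }
  }

  // INCRBY on the shared counter; SET NX EX starts the window's TTL only once
  private async addToGlobal(userId: string, n: number): Promise<number> {
    const key = `global:${userId}`;
    const results = await this.globalSync
      .multi()
      .set(key, 0, "EX", this.windowSeconds, "NX")
      .incrby(key, n)
      .exec();
    return Number(results?.[1]?.[1] ?? 0);
  }

  private startSyncLoop(): void {
    setInterval(async () => {
      // Include users with nothing pending so stale global counts get refreshed
      const users = new Set([...this.localCount.keys(), ...this.lastGlobal.keys()]);
      for (const userId of users) {
        try {
          await this.flush(userId);
        } catch (err) {
          console.error("Rate limit sync failed:", err);
        }
      }
    }, this.syncInterval);
  }
}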

Backpressure Patterns

Circuit breaker (used alongside rate limiting to stop calling a failing dependency):

enum CircuitState {
  CLOSED = "CLOSED",
  OPEN = "OPEN",
  HALF_OPEN = "HALF_OPEN",
}

class CircuitBreaker {
  private state: CircuitState = CircuitState.CLOSED;
  private failures: number = 0;
  private lastFailure: number = 0;
  private successCount: number = 0;

  constructor(
    private failureThreshold: number = 5,
    private resetTimeout: number = 30000,
    private halfOpenSuccessThreshold: number = 3,
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === CircuitState.OPEN) {
      if (Date.now() - this.lastFailure >= this.resetTimeout) {
        this.state = CircuitState.HALF_OPEN;
        this.successCount = 0;
      } else {
        throw new Error("Circuit breaker is open");
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    if (this.state === CircuitState.HALF_OPEN) {
      this.successCount++;
      if (this.successCount >= this.halfOpenSuccessThreshold) {
        this.state = CircuitState.CLOSED;
        this.failures = 0;
      }
    } else {
      this.failures = 0;
    }
  }

  private onFailure(): void {
    this.failures++;
    this.lastFailure = Date.now();

    if (this.failures >= this.failureThreshold) {
      this.state = CircuitState.OPEN;
    }
  }

  getState(): CircuitState {
    return this.state;
  }
}

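Adaptive rate limiting (the limit shrinks when the downstream success rate drops and grows back as it recovers):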

class AdaptiveRateLimiter {
  private currentLimit: number;
  private successRate: number = 1;
  private window: { success: boolean; timestamp: number }[] = [];

  constructor(
    private minLimit: number,
    private maxLimit: number,
    private targetSuccessRate: number = 0.95,
    private windowSize: number = 100,
  ) {
    this.currentLimit = maxLimit;
  }

  recordResult(success: boolean): void {
    const now = Date.now();
    this.window.push({ success, timestamp: now });

    // Keep only the most recent windowSize results
    if (this.window.length > this.windowSize) {
      this.window.shift();
    }

    this.adjustLimit();
  }

  private adjustLimit(): void {
    if (this.window.length < 10) return;

    const successes = this.window.filter((r) => r.success).length;
    this.successRate = successes / this.window.length;

    if (this.successRate < this.targetSuccessRate) {
      // Success rate below target: cut the limit by 10%
      this.currentLimit = Math.max(
        this.minLimit,
        Math.floor(this.currentLimit * 0.9),
      );
    } else if (this.successRate > this.targetSuccessRate + 0.02) {
      // Comfortably above target: raise the limit slowly, by 5%
      this.currentLimit = Math.min(
        this.maxLimit,
        Math.ceil(this.currentLimit * 1.05),
      );
    }
  }

  getCurrentLimit(): number {
    return this.currentLimit;
  }

  getSuccessRate(): number {
    return this.successRate;
  }
}
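Rate limits cap requests per unit of time, but backpressure often also needs a cap on requests in flight at once (the maxConnectionsPerIP setting earlier is one such cap). A minimal in-process sketch, using a hypothetical ConcurrencyLimiter class: up to maxConcurrent tasks run, up to maxQueue more wait for a slot, and the rest are rejected immediately so the caller can answer with 429 or 503.

```typescript
class ConcurrencyLimiter {
  private active = 0;
  private waiting: Array<() => void> = [];

  constructor(
    private maxConcurrent: number, // tasks allowed to run at the same time
    private maxQueue: number, // tasks allowed to wait for a slot
  ) {}

  // Runs fn when a slot is free; rejects immediately if the wait queue is full
  async run<T>(fn: () => Promise<T>): Promise<T> {
    if (this.active >= this.maxConcurrent) {
      if (this.waiting.length >= this.maxQueue) {
        throw new Error("Too many concurrent requests");
      }
      // Wait until a finishing task hands its slot over
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    } else {
      this.active++;
    }
    try {
      return await fn();
    } finally {
      const next = this.waiting.shift();
      if (next) {
        next(); // pass the slot directly to the next waiter; active is unchanged
      } else {
        this.active--;
      }
    }
  }

  stats(): { active: number; queued: number } {
    return { active: this.active, queued: this.waiting.length };
  }
}
```

Handing the slot directly to the next waiter, instead of decrementing and letting waiters race, keeps the active count from briefly exceeding maxConcurrent.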

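Rate Limit Headers and Client Communication

Common headers:

import type { Response } from "express";

interface RateLimitInfo {
  limit: number;
  remaining: number;
  reset: number; // Unix timestamp (seconds) when the window resets
  retryAfter?: number; // seconds to wait before retrying
}

function setRateLimitHeaders(res: Response, info: RateLimitInfo): void {
  // De facto X-RateLimit-* headers (widely used, not standardized)
  res.setHeader("X-RateLimit-Limit", info.limit.toString());
  res.setHeader("X-RateLimit-Remaining", info.remaining.toString());
  res.setHeader("X-RateLimit-Reset", info.reset.toString());

  // Headers from the IETF draft (draft-ietf-httpapi-ratelimit-headers), where
  // RateLimit-Reset is a delay in seconds rather than a timestamp
  const resetInSeconds = Math.max(0, info.reset - Math.floor(Date.now() / 1000));
  res.setHeader("RateLimit-Limit", info.limit.toString());
  res.setHeader("RateLimit-Remaining", info.remaining.toString());
  res.setHeader("RateLimit-Reset", resetInSeconds.toString());

  if (info.retryAfter !== undefined) {
    res.setHeader("Retry-After", info.retryAfter.toString());
  }
}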

// Response body for 429 errors
interface RateLimitErrorResponse {
  error: {
    code: "RATE_LIMIT_EXCEEDED";
    message: string;
    retryAfter: number;
    limit: number;
    resetAt: string;
  };
}

function createRateLimitError(info: RateLimitInfo): RateLimitErrorResponse {
  return {
    error: {
      code: "RATE_LIMIT_EXCEEDED",
      message: `Rate limit exceeded. Please retry in ${info.retryAfter} seconds.`,
      retryAfter: info.retryAfter!,
      limit: info.limit,
      resetAt: new Date(info.reset * 1000).toISOString(),
    },
  };
}
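Clients reading these headers should note that Retry-After may carry either a number of seconds or an HTTP date (RFC 9110). A small parsing sketch; parseRetryAfterMs is a hypothetical helper, and the 60-second default for missing or malformed values is our assumption:

```typescript
// Convert a Retry-After header value into a wait time in milliseconds.
function parseRetryAfterMs(
  value: string | null,
  defaultMs: number = 60_000,
  now: number = Date.now(),
): number {
  if (value === null || value.trim() === "") return defaultMs;
  const trimmed = value.trim();

  // Form 1: delay-seconds, a non-negative integer
  if (/^\d+$/.test(trimmed)) return parseInt(trimmed, 10) * 1000;

  // Form 2: an HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
  const date = Date.parse(trimmed);
  if (!Number.isNaN(date)) return Math.max(0, date - now);

  return defaultMs; // malformed: fall back to the default
}
```

Returning milliseconds lets the result feed straight into setTimeout or a delay helper.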

Express middleware:

import { Request, Response, NextFunction } from "express";

interface RateLimiterOptions {
  windowMs: number;
  max: number;
  keyGenerator?: (req: Request) => string;
  skip?: (req: Request) => boolean;
  handler?: (req: Request, res: Response) => void;
}

function createRateLimiter(options: RateLimiterOptions) {
  const limiter = new DistributedRateLimiter(process.env.REDIS_URL!);

  return async (req: Request, res: Response, next: NextFunction) => {
    // Skip when configured (e.g. health checks)
    if (options.skip?.(req)) {
      return next();
    }

    const key = options.keyGenerator?.(req) || req.ip || "unknown";

    let result: { allowed: boolean; remaining: number; resetAt: number };
    try {
      result = await limiter.isAllowed(key, options.max, options.windowMs / 1000);
    } catch (err) {
      // Fail open if Redis is unavailable; call next(err) instead to fail closed
      console.error("Rate limiter error:", err);
      return next();
    }

    const info: RateLimitInfo = {
      limit: options.max,
      remaining: result.remaining,
      reset: Math.ceil(result.resetAt / 1000),
    };

    if (!result.allowed) {
      // Compute before setting headers so Retry-After is actually sent
      info.retryAfter = Math.max(0, Math.ceil((result.resetAt - Date.now()) / 1000));
    }

    setRateLimitHeaders(res, info);

    if (!result.allowed) {
      if (options.handler) {
        return options.handler(req, res);
      }
      return res.status(429).json(createRateLimitError(info));
    }

    next();
  };
}

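// Usage (assumes an Express `app` and an auth middleware that sets req.user)
app.use(
  "/api/",
  createRateLimiter({
    windowMs: 60 * 1000, // 1 minute
    max: 100,
    keyGenerator: (req) => req.user?.id || req.ip || "unknown",
    // req.path is relative to the "/api/" mount point, so /api/health appears as /health
    skip: (req) => req.path === "/health",
  }),
);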

Graceful Degradation

Priority-based degradation:

enum RequestPriority {
  CRITICAL = 1, // health checks, authentication
  HIGH = 2, // user-initiated actions
  NORMAL = 3, // regular API calls
  LOW = 4, // background jobs
  BATCH = 5, // bulk operations
}

class PriorityRateLimiter {
  private limiters: Map<RequestPriority, TokenBucket> = new Map();
  private systemLoad: number = 0;

  constructor() {
    // A separate limit per priority: TokenBucket(capacity, tokens per second)
    this.limiters.set(RequestPriority.CRITICAL, new TokenBucket(1000, 100));
    this.limiters.set(RequestPriority.HIGH, new TokenBucket(500, 50));
    this.limiters.set(RequestPriority.NORMAL, new TokenBucket(200, 20));
    this.limiters.set(RequestPriority.LOW, new TokenBucket(50, 5));
    this.limiters.set(RequestPriority.BATCH, new TokenBucket(10, 1));
  }

  setSystemLoad(load: number): void {
    this.systemLoad = Math.max(0, Math.min(1, load));
  }

  canProcess(priority: RequestPriority): boolean {
    // Under high load, shed low-priority requests: above 80% only CRITICAL and HIGH pass, above 90% only CRITICAL
    if (this.systemLoad > 0.8 && priority > RequestPriority.HIGH) {
      return false;
    }
    if (this.systemLoad > 0.9 && priority > RequestPriority.CRITICAL) {
      return false;
    }

    const limiter = this.limiters.get(priority);
    return limiter?.consume() ?? false;
  }
}

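Feature flag integration:

class DegradationManager {
  private degradedFeatures: Set<string> = new Set();

  // Returns the fallback's result when the feature is degraded,
  // or null to tell the caller to run the normal code path
  async checkAndDegrade<T>(
    feature: string,
    fallback: () => Promise<T>,
  ): Promise<T | null> {
    if (this.degradedFeatures.has(feature)) {
      return fallback();
    }
    return null; // continue with normal execution
  }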

  enableDegradedMode(feature: string): void {
    this.degradedFeatures.add(feature);
    console.log(`Degraded mode enabled: ${feature}`);
  }

  disableDegradedMode(feature: string): void {
    this.degradedFeatures.delete(feature);
    console.log(`Degraded mode disabled: ${feature}`);
  }

  isDegraded(feature: string): boolean {
    return this.degradedFeatures.has(feature);
  }
}

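Best Practices

Algorithm Selection

  • Token bucket: best when bursts are acceptable but the average rate must hold
  • Sliding window: best for precise limits without boundary bursts (the log is exact; the counter is a close approximation)
  • Leaky bucket: best for smoothing traffic to downstream services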

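Redis Configuration

  • Use Redis Cluster for high availability
  • Set an appropriate memory limit and eviction policy
  • Use Lua scripts for atomic operations
  • Monitor Redis latency and connection pool usage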

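Client Experience

  • Always return rate limit headers
  • Provide clear error messages with retry times
  • Consider implementing client-side rate limiting
  • Document rate limits in the API documentation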
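Client-side rate limiting can be as simple as spacing outgoing calls so a client never exceeds its known quota. A sketch under our own assumptions: ClientThrottle is a hypothetical helper, and requestsPerSecond must come from the provider's documented limit.

```typescript
// Spaces outgoing calls evenly at requestsPerSecond, reserving slots in call order.
class ClientThrottle {
  private nextFreeAt = 0; // earliest time (ms) the next request may start

  constructor(
    private requestsPerSecond: number,
    private now: () => number = Date.now, // injectable clock for testing
  ) {}

  // Reserves the next slot and returns how long (ms) to wait before using it
  reserve(): number {
    const t = this.now();
    const start = Math.max(t, this.nextFreeAt);
    this.nextFreeAt = start + 1000 / this.requestsPerSecond;
    return start - t;
  }

  // Waits for a reserved slot, then runs the request
  async schedule<T>(fn: () => Promise<T>): Promise<T> {
    const wait = this.reserve();
    if (wait > 0) {
      await new Promise((resolve) => setTimeout(resolve, wait));
    }
    return fn();
  }
}
```

Even spacing never bursts; if the server's limit allows bursts, a client-side token bucket would use the quota more fully.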

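Monitoring

  • Track rate limit hits per endpoint and per user
  • Alert on sudden spikes in rate limit hits
  • Watch for distributed attack patterns
  • Log rate limit events for debugging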
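Tracking hits per endpoint and per caller can start as in-process counters that are periodically exported to a metrics system. A minimal sketch with a hypothetical RateLimitStats class (a production setup would more likely use an existing metrics client):

```typescript
class RateLimitStats {
  // endpoint -> (caller -> number of rejected requests)
  private hits = new Map<string, Map<string, number>>();

  // Record one rejected (429) request
  recordHit(endpoint: string, caller: string): void {
    let perCaller = this.hits.get(endpoint);
    if (!perCaller) {
      perCaller = new Map();
      this.hits.set(endpoint, perCaller);
    }
    perCaller.set(caller, (perCaller.get(caller) ?? 0) + 1);
  }

  // Total rejections for an endpoint, e.g. for spike alerts
  totalFor(endpoint: string): number {
    const perCaller = this.hits.get(endpoint);
    if (!perCaller) return 0;
    let total = 0;
    for (const count of perCaller.values()) total += count;
    return total;
  }

  // Callers hitting the limit most often, e.g. to spot abuse
  topCallers(n: number): Array<{ endpoint: string; caller: string; count: number }> {
    const rows: Array<{ endpoint: string; caller: string; count: number }> = [];
    for (const [endpoint, perCaller] of this.hits) {
      for (const [caller, count] of perCaller) rows.push({ endpoint, caller, count });
    }
    return rows.sort((a, b) => b.count - a.count).slice(0, n);
  }

  // Clear counters after each export interval
  reset(): void {
    this.hits.clear();
  }
}
```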

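Security

  • Apply IP-based limits as the first line of defense
  • Use authenticated rate limits for legitimate users
  • Consider region-specific limits to counter regional abuse
  • Add CAPTCHA challenges for suspicious patterns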

Examples

Complete Express Rate Limiting Setup

import express from "express";
import Redis from "ioredis";

const app = express();
const redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");

// Rate limiter factory (fixed window per key)
function rateLimiter(config: {
  points: number; // requests allowed per window
  duration: number; // window length in seconds
  keyPrefix: string;
}) {
  return async (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction,
  ) => {
    // req.user assumes an auth middleware has populated it; otherwise key by IP
    const key = `${config.keyPrefix}:${req.user?.id || req.ip}`;

    try {
      // One round trip: start the window TTL if the key is new, increment, read the TTL
      const results = await redis
        .multi()
        .set(key, 0, "EX", config.duration, "NX")
        .incr(key)
        .ttl(key)
        .exec();
      const current = Number(results?.[1]?.[1]);
      const ttl = Math.max(0, Number(results?.[2]?.[1]));
      const remaining = Math.max(0, config.points - current);

      res.set({
        "X-RateLimit-Limit": config.points.toString(),
        "X-RateLimit-Remaining": remaining.toString(),
        "X-RateLimit-Reset": (Math.floor(Date.now() / 1000) + ttl).toString(),
      });

      if (current > config.points) {
        res.set("Retry-After", ttl.toString());
        return res.status(429).json({
          error: "Too many requests",
          retryAfter: ttl,
        });
      }

      next();
    } catch (error) {
      // Fail open if Redis is unavailable
      console.error("Rate limiter error:", error);
      next();
    }
  };
}

// Different limits for different endpoints. Requests to /api/auth and /api/search
// also pass through the general /api limiter and count against it as well.
app.use(
  "/api/auth",
  rateLimiter({ points: 5, duration: 60, keyPrefix: "auth" }),
);
app.use(
  "/api/search",
  rateLimiter({ points: 30, duration: 60, keyPrefix: "search" }),
);
app.use("/api", rateLimiter({ points: 100, duration: 60, keyPrefix: "api" }));

app.listen(3000);

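Client-Side Rate Limit Handling

class APIClient {
  // Epoch milliseconds before which no request should be sent
  private retryAfter: number = 0;

  async request<T>(url: string, options?: RequestInit): Promise<T> {
    // Still waiting out an earlier rate limit?
    if (this.retryAfter > Date.now()) {
      const waitSeconds = Math.ceil((this.retryAfter - Date.now()) / 1000);
      throw new RateLimitError(
        `Rate limited. Please retry in ${waitSeconds} seconds`,
        waitSeconds,
      );
    }

    const response = await fetch(url, options);

    // Update rate limit state from headers: if the quota is used up,
    // pause until the advertised reset time (Unix seconds)
    const remaining = response.headers.get("X-RateLimit-Remaining");
    const reset = parseInt(response.headers.get("X-RateLimit-Reset") ?? "", 10);
    if (remaining === "0" && !Number.isNaN(reset)) {
      this.retryAfter = Math.max(this.retryAfter, reset * 1000);
    }

    if (response.status === 429) {
      // Retry-After in seconds; default to 60 if missing or not numeric
      const header = parseInt(response.headers.get("Retry-After") ?? "", 10);
      const retryAfterSeconds = Number.isNaN(header) ? 60 : header;
      this.retryAfter = Date.now() + retryAfterSeconds * 1000;

      // The error field may be a plain string or an object with a message
      const body = await response.json().catch(() => ({}));
      const message =
        typeof body.error === "string"
          ? body.error
          : (body.error?.message ?? "Rate limit exceeded");
      throw new RateLimitError(message, retryAfterSeconds);
    }

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }

    return response.json();
  }

  async requestWithRetry<T>(
    url: string,
    options?: RequestInit,
    maxRetries: number = 3,
  ): Promise<T> {
    let lastError: Error | null = null;

    for (let i = 0; i < maxRetries; i++) {
      try {
        return await this.request<T>(url, options);
      } catch (error) {
        lastError = error as Error;
        if (i === maxRetries - 1) break; // no point waiting after the last attempt

        if (error instanceof RateLimitError) {
          // Wait for the server-specified Retry-After period
          await this.delay(error.retryAfter * 1000);
        } else {
          // Exponential backoff for other errors: 1 s, 2 s, 4 s, ...
          await this.delay(Math.pow(2, i) * 1000);
        }
      }
    }

    throw lastError;
  }

  private delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}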

class RateLimitError extends Error {
  constructor(
    message: string,
    public retryAfter: number,
  ) {
    super(message);
    this.name = "RateLimitError";
  }
}