name: api-rate-limiting description: 实现 API 限流策略,使用令牌桶、滑动窗口和固定窗口算法。用于保护 API 免受滥用、管理流量或实现分层限流。
API 限流
概览
使用多种限流算法保护 API 免受滥用并管理流量,支持按用户、按 IP 和按端点的限流策略。
使用场景
- 保护 API 免受暴力破解攻击
- 管理流量高峰
- 实施分层服务计划
- 防止 DoS 攻击
- 确保资源分配的公平性
- 强制执行配额和使用限制
指南
1. 令牌桶算法
// 令牌桶限流器
/**
 * Token-bucket rate limiter.
 *
 * The bucket starts full, is topped up continuously at `refillRate` tokens
 * per second (never beyond `capacity`), and every permitted operation removes
 * tokens from it.
 */
class TokenBucket {
  /**
   * @param {number} capacity - Maximum number of tokens the bucket can hold.
   * @param {number} refillRate - Tokens added per second.
   */
  constructor(capacity, refillRate) {
    this.capacity = capacity;
    this.tokens = capacity; // a new bucket starts full
    this.refillRate = refillRate;
    this.lastRefillTime = Date.now();
  }

  /**
   * Credits the tokens earned since the previous refill, capped at capacity.
   */
  refill() {
    const now = Date.now();
    // Date.now() follows the wall clock, which can move backwards when the
    // system time is adjusted. Clamp the elapsed time at zero so a backwards
    // jump can never take tokens away from the bucket.
    const elapsedSeconds = Math.max(0, now - this.lastRefillTime) / 1000;
    const tokensToAdd = elapsedSeconds * this.refillRate;
    this.tokens = Math.min(this.capacity, this.tokens + tokensToAdd);
    // Always re-anchor on the current reading so refilling resumes from the
    // adjusted clock instead of waiting for it to catch up.
    this.lastRefillTime = now;
  }

  /**
   * Tries to take `tokens` tokens out of the bucket.
   *
   * @param {number} [tokens=1] - Number of tokens the operation costs.
   * @returns {boolean} `true` if the tokens were taken, `false` if the bucket
   *   did not hold enough (in which case nothing is removed).
   */
  consume(tokens = 1) {
    this.refill();
    if (this.tokens >= tokens) {
      this.tokens -= tokens;
      return true;
    }
    return false;
  }

  /**
   * @returns {number} Whole tokens currently available, after refilling.
   */
  available() {
    this.refill();
    return Math.floor(this.tokens);
  }
}
// Express middleware
const express = require('express');
// Application instance that the example routes and middleware are registered on.
const app = express();
// Module-level store of TokenBucket instances used by tokenBucketRateLimit.
const rateLimiters = new Map();
/**
 * Builds an Express middleware that enforces a token-bucket limit per client.
 *
 * A client is identified by `req.user.id` when present, otherwise by `req.ip`.
 * Allowed requests continue to `next()`; rejected requests receive a 429 JSON
 * response together with a `Retry-After` header.
 *
 * @param {number} capacity - Bucket size, i.e. the largest burst allowed.
 * @param {number} refillRate - Tokens restored per second.
 * @returns {Function} Express middleware `(req, res, next)`.
 */
const tokenBucketRateLimit = (capacity, refillRate) => {
  return (req, res, next) => {
    const clientId = req.user?.id || req.ip;
    // The bucket store is shared by every limiter created from this factory.
    // Namespacing the key with the configuration keeps limiters that were
    // created with different settings from reusing each other's bucket, which
    // would silently enforce the wrong capacity for one of them.
    const key = `${capacity}:${refillRate}:${clientId}`;

    let limiter = rateLimiters.get(key);
    if (!limiter) {
      limiter = new TokenBucket(capacity, refillRate);
      rateLimiters.set(key, limiter);
    }

    res.setHeader('X-RateLimit-Limit', capacity);

    if (limiter.consume(1)) {
      res.setHeader('X-RateLimit-Remaining', limiter.available());
      next();
      return;
    }

    // Time needed to earn one whole token starting from an empty bucket; the
    // bucket may already hold a fraction, so this is an upper bound.
    const retryAfter = Math.ceil(1 / limiter.refillRate);
    res.setHeader('X-RateLimit-Remaining', 0);
    res.setHeader('Retry-After', retryAfter);
    res.status(429).json({
      error: 'Rate limit exceeded',
      retryAfter
    });
  };
};
// Example route guarded by a token bucket created with capacity 100 and a
// refill rate of 10 tokens per second.
app.get('/api/data', tokenBucketRateLimit(100, 10), function sendApiData(req, res) {
  const payload = { data: 'api response' };
  res.json(payload);
});
2. 滑动窗口算法
/**
 * Sliding-window limiter that remembers the timestamp of every accepted
 * request and allows at most `maxRequests` of them inside the trailing window.
 */
class SlidingWindowLimiter {
  /**
   * @param {number} maxRequests - Requests allowed inside one window.
   * @param {number} windowSizeSeconds - Window length in seconds.
   */
  constructor(maxRequests, windowSizeSeconds) {
    this.maxRequests = maxRequests;
    this.windowSize = windowSizeSeconds * 1000; // stored in milliseconds
    this.requests = [];
  }

  /**
   * Records the current request if the window still has room.
   *
   * @returns {boolean} `true` when the request was recorded, `false` when the
   *   window is already full.
   */
  isAllowed() {
    const currentTime = Date.now();
    const cutoff = currentTime - this.windowSize;

    // Keep only the timestamps that are still inside the window.
    const live = [];
    for (const stamp of this.requests) {
      if (stamp > cutoff) {
        live.push(stamp);
      }
    }
    this.requests = live;

    const hasRoom = live.length < this.maxRequests;
    if (hasRoom) {
      live.push(currentTime);
    }
    return hasRoom;
  }

  /**
   * @returns {number} How many more requests fit in the current window
   *   (never negative). Expired timestamps are dropped as a side effect.
   */
  remaining() {
    const cutoff = Date.now() - this.windowSize;

    const live = [];
    for (const stamp of this.requests) {
      if (stamp > cutoff) {
        live.push(stamp);
      }
    }
    this.requests = live;

    return Math.max(0, this.maxRequests - live.length);
  }
}
/**
 * Builds an Express middleware that enforces a sliding-window limit per client.
 *
 * Each middleware instance keeps its own in-memory map of limiters, keyed by
 * `req.user.id` when present and by `req.ip` otherwise. Rejected requests get
 * a 429 JSON response plus a `Retry-After` header, matching the Redis-backed
 * limiter.
 *
 * NOTE: limiters of clients that stop sending requests are never evicted from
 * the map, so it grows with the number of distinct clients seen.
 *
 * @param {number} maxRequests - Requests allowed per window.
 * @param {number} windowSeconds - Window length in seconds.
 * @returns {Function} Express middleware `(req, res, next)`.
 */
const slidingWindowRateLimit = (maxRequests, windowSeconds) => {
  const limiters = new Map();

  return (req, res, next) => {
    const key = req.user?.id || req.ip;

    let limiter = limiters.get(key);
    if (!limiter) {
      limiter = new SlidingWindowLimiter(maxRequests, windowSeconds);
      limiters.set(key, limiter);
    }

    res.setHeader('X-RateLimit-Limit', maxRequests);

    if (limiter.isAllowed()) {
      res.setHeader('X-RateLimit-Remaining', limiter.remaining());
      next();
      return;
    }

    // A slot frees up when the oldest recorded request leaves the window.
    // With nothing recorded (e.g. maxRequests is 0) fall back to a full window.
    const oldest = limiter.requests[0];
    const waitMs = oldest === undefined
      ? limiter.windowSize
      : oldest + limiter.windowSize - Date.now();
    const retryAfter = Math.max(1, Math.ceil(waitMs / 1000));

    res.setHeader('X-RateLimit-Remaining', 0);
    res.setHeader('Retry-After', retryAfter);
    res.status(429).json({ error: 'Rate limit exceeded', retryAfter });
  };
};
// Example route guarded by a sliding window of 30 requests per 60 seconds.
app.get('/api/search', slidingWindowRateLimit(30, 60), function sendSearchResults(req, res) {
  const payload = { results: [] };
  res.json(payload);
});
3. 基于 Redis 的限流
const redis = require('redis');
// Redis client (default connection options) shared by the Redis-backed limiters.
// NOTE(review): those limiters await lowercase command methods (zadd, zcard,
// zrange, ...). node-redis v4+ needs `await client.connect()` before use and
// names its commands in camelCase (zAdd, zCard, ...) — confirm which client
// version/API this example is meant to run against.
const client = redis.createClient();
// 基于 Redis 的滑动窗口
/**
 * Builds an Express middleware that enforces a sliding-window limit whose
 * state lives in a Redis sorted set (one set per client, scored by timestamp).
 *
 * A client is identified by `req.user.id` when present, otherwise by `req.ip`.
 * If any Redis call fails the request is allowed through (fail-open).
 *
 * NOTE: the trim / count / add steps are separate commands, so concurrent
 * requests from one client can both pass the count check; the limit is
 * therefore approximate under heavy concurrency.
 *
 * @param {number} maxRequests - Requests allowed per window.
 * @param {number} windowSeconds - Window length in seconds.
 * @returns {Function} Async Express middleware `(req, res, next)`.
 */
const redisRateLimit = (maxRequests, windowSeconds) => {
  const windowMs = windowSeconds * 1000;

  return async (req, res, next) => {
    const key = `ratelimit:${req.user?.id || req.ip}`;
    const now = Date.now();
    const windowStart = now - windowMs;

    try {
      // Drop entries that have fallen out of the window.
      await client.zremrangebyscore(key, 0, windowStart);

      // Count the requests still inside the window.
      const count = await client.zcard(key);

      if (count < maxRequests) {
        // Record this request; the random suffix keeps members unique when
        // several requests share a millisecond.
        await client.zadd(key, now, `${now}-${Math.random()}`);
        // Let Redis discard the whole set once the client goes quiet.
        await client.expire(key, windowSeconds);

        res.setHeader('X-RateLimit-Limit', maxRequests);
        res.setHeader('X-RateLimit-Remaining', maxRequests - count - 1);
        next();
        return;
      }

      // Members are "<timestamp>-<random>", so the leading integer of the
      // oldest member tells when the next slot frees up.
      const oldestRequest = await client.zrange(key, 0, 0);
      const oldestTimestamp = Number.parseInt(oldestRequest?.[0], 10);
      // No usable entry (empty set, e.g. maxRequests is 0, or the key expired
      // between the two calls): fall back to a full window instead of
      // emitting "NaN" as the Retry-After value.
      const waitMs = Number.isNaN(oldestTimestamp)
        ? windowMs
        : oldestTimestamp + windowMs - now;
      const retryAfter = Math.max(1, Math.ceil(waitMs / 1000));

      res.setHeader('X-RateLimit-Limit', maxRequests);
      res.setHeader('X-RateLimit-Remaining', 0);
      res.set('Retry-After', retryAfter);
      res.status(429).json({
        error: 'Rate limit exceeded',
        retryAfter
      });
    } catch (error) {
      console.error('Rate limit error:', error);
      next(); // fail open: a Redis outage must not take the API down
    }
  };
};
// Example route guarded by the Redis-backed limiter: 10 requests per 60 seconds.
app.get('/api/expensive', redisRateLimit(10, 60), function sendExpensiveResult(req, res) {
  const payload = { result: 'expensive operation' };
  res.json(payload);
});
4. 分层限流
// Per-plan quotas: `requests` is the number of requests allowed per `window`,
// where `window` is expressed in seconds. A null `requests` value marks the
// plan as unlimited.
const RATE_LIMITS = {
free: { requests: 100, window: 3600 }, // 100 requests per hour
pro: { requests: 10000, window: 3600 }, // 10,000 requests per hour
enterprise: { requests: null, window: null } // unlimited
};
/**
 * Express middleware that applies the sliding-window quota of the caller's
 * plan (looked up in RATE_LIMITS), with state kept in a Redis sorted set.
 *
 * - Requests without a user, or with a plan that is not defined in
 *   RATE_LIMITS, are treated as the 'free' plan.
 * - Plans whose `requests` value is falsy are unlimited and skip Redis.
 * - If Redis fails the request is allowed through (fail-open).
 *
 * @param {object} req - Express request; `req.user` may carry `id` and `plan`.
 * @param {object} res - Express response.
 * @param {Function} next - Express continuation callback.
 * @returns {Promise<void>}
 */
const tieredRateLimit = async (req, res, next) => {
  const user = req.user;

  // Own-property lookup: an unknown plan falls back to 'free' instead of
  // crashing, and inherited keys such as "constructor" cannot be passed off
  // as a plan with no `requests` value (which would mean "unlimited").
  const requestedPlan = user?.plan;
  const plan = Object.prototype.hasOwnProperty.call(RATE_LIMITS, requestedPlan)
    ? requestedPlan
    : 'free';
  const limits = RATE_LIMITS[plan];

  if (!limits.requests) {
    return next(); // unlimited plan
  }

  // Anonymous requests have no user object; identify them by IP rather than
  // dereferencing an undefined user.
  const clientId = user?.id || req.ip;
  // Dedicated key prefix: the generic Redis limiter stores its own window
  // under `ratelimit:<id>`, and sharing one sorted set between two different
  // window lengths would corrupt both counts.
  const key = `ratelimit:tiered:${clientId}`;
  const windowMs = limits.window * 1000;
  const now = Date.now();
  const windowStart = now - windowMs;

  try {
    await client.zremrangebyscore(key, 0, windowStart);
    const count = await client.zcard(key);

    res.setHeader('X-RateLimit-Limit', limits.requests);
    res.setHeader('X-Plan', plan);

    if (count < limits.requests) {
      await client.zadd(key, now, `${now}-${Math.random()}`);
      await client.expire(key, limits.window);
      res.setHeader('X-RateLimit-Remaining', limits.requests - count - 1);
      next();
      return;
    }

    // Members are "<timestamp>-<random>"; the oldest one tells when the next
    // slot frees up. Fall back to a full window if it cannot be read.
    const oldestRequest = await client.zrange(key, 0, 0);
    const oldestTimestamp = Number.parseInt(oldestRequest?.[0], 10);
    const waitMs = Number.isNaN(oldestTimestamp)
      ? windowMs
      : oldestTimestamp + windowMs - now;
    const retryAfter = Math.max(1, Math.ceil(waitMs / 1000));

    res.setHeader('X-RateLimit-Remaining', 0);
    res.setHeader('Retry-After', retryAfter);
    res.status(429).json({
      error: 'Rate limit exceeded',
      plan,
      upgradeUrl: '/plans',
      retryAfter
    });
  } catch (error) {
    // Fail open, but leave a trace so Redis outages do not go unnoticed.
    console.error('Rate limit error:', error);
    next();
  }
};
// Mount the plan-based limiter application-wide. Express runs middleware in
// registration order, so it applies to routes registered after this call.
app.use(tieredRateLimit);
5. Python 限流 (Flask)
from flask import Flask, request, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from datetime import datetime, timedelta
import redis
# Flask application that the example routes attach to.
app = Flask(__name__)
# Flask-Limiter instance: clients are keyed by remote address, with default
# limits of "200 per day" and "50 per hour".
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
# Custom rate limiting based on the user's plan
# Redis connection (localhost:6379) used for plan lookups and request counters.
redis_client = redis.Redis(host='localhost', port=6379)
def get_rate_limit(user_id, client=None):
    """Return the ``(max_requests, window_seconds)`` quota for a user's plan.

    The plan name is read from the Redis key ``user:<user_id>:plan``.

    Args:
        user_id: Identifier used to build the Redis key.
        client: Optional Redis-like object exposing ``get(key)``. Defaults to
            the module-level ``redis_client``.

    Returns:
        A ``(max_requests, window_seconds)`` tuple. ``(None, None)`` means
        unlimited. Users with no stored plan, or with an unrecognised plan,
        get the free quota ``(100, 3600)``.
    """
    store = redis_client if client is None else client
    plan = store.get(f'user:{user_id}:plan')
    # A missing key comes back as None, and the value is bytes unless the
    # client decodes responses itself; only decode when there is something
    # to decode.
    if isinstance(plan, bytes):
        plan = plan.decode()

    limits = {
        'free': (100, 3600),
        'pro': (10000, 3600),
        'enterprise': (None, None)
    }
    return limits.get(plan, (100, 3600))
@app.route('/api/data', methods=['GET'])
@limiter.limit("30 per minute")
def get_data():
    """Serve the sample payload; the decorator sets a "30 per minute" limit."""
    payload = {'data': 'api response'}
    response = jsonify(payload)
    return response, 200
@app.route('/api/premium', methods=['GET'])
def get_premium_data():
    """Serve premium data under a fixed-window quota taken from the user's plan.

    Returns a ``(response, status)`` tuple for allowed requests, and a
    ``(response, 429, headers)`` tuple carrying ``Retry-After`` once the quota
    for the current window is used up.
    """
    # NOTE(review): Flask's request object has no user_id attribute of its
    # own; presumably an authentication layer attaches it — confirm.
    user_id = request.user_id
    max_requests, window = get_rate_limit(user_id)

    if max_requests is None:
        return jsonify({'data': 'unlimited data'}), 200

    key = f'ratelimit:{user_id}'
    current = redis_client.incr(key)
    ttl = redis_client.ttl(key)
    # Start the window only when the counter is created. Re-arming the expiry
    # on every request would keep pushing the reset forward, so a steady
    # stream of requests would never get a fresh window. A negative TTL means
    # the key has no expiry (e.g. a crash between incr and expire), so it is
    # repaired here as well.
    if current == 1 or ttl < 0:
        redis_client.expire(key, window)
        ttl = window

    if current <= max_requests:
        return jsonify({'data': 'premium data'}), 200

    retry_after = max(1, ttl)
    return (
        jsonify({'error': 'Rate limit exceeded', 'retryAfter': retry_after}),
        429,
        {'Retry-After': str(retry_after)},
    )
6. 响应头
// Standard rate-limit headers
res.setHeader('X-RateLimit-Limit', maxRequests); // total number of requests allowed
res.setHeader('X-RateLimit-Remaining', remaining); // requests remaining
res.setHeader('X-RateLimit-Reset', resetTime); // Unix timestamp of the reset
res.setHeader('Retry-After', secondsToWait); // time to wait before retrying
// 429 Too Many Requests(请求过多)响应
{
"error": "Rate limit exceeded",
"code": "RATE_LIMIT_EXCEEDED",
"retryAfter": 60,
"resetAt": "2025-01-15T15:00:00Z"
}
最佳实践
✅ 要做
- 在响应中包含限流头
- 使用 Redis 进行分布式限流
- 为不同用户计划实施分层限制
- 设置适当的窗口大小和限制
- 监控限流指标
- 提供清晰的重试指导
- 在 API 文档中记录限流
- 在高负载下进行测试
❌ 不要做
- 在生产中使用内存存储
- 设置过于严格的限制
- 忘记包含 Retry-After 头
- 忽略分布式场景
- 对外公开限流的具体规则(存在安全风险)
- 在分布式系统中使用简单计数器
- 忘记清理旧数据
监控
// Track rate-limit metrics
// Plain in-memory counters updated by the metrics middleware.
const metrics = {
totalRequests: 0, // incremented once per request that reaches the metrics middleware
limitedRequests: 0, // incremented when a response finishes with status 429
byUser: new Map() // not read or written by the code shown here
};
// Counts every request that reaches this middleware and, once the response
// has been sent, whether it ended as a 429.
// NOTE(review): Express runs handlers in registration order, so requests that
// are answered by routes registered before this call never reach it — confirm
// the intended mounting position.
app.use(function trackRateLimitMetrics(req, res, next) {
  metrics.totalRequests++;

  const recordOutcome = () => {
    const wasLimited = res.statusCode === 429;
    if (wasLimited) {
      metrics.limitedRequests++;
    }
  };
  res.on('finish', recordOutcome);

  next();
});
// Reports the request counters and the share of requests that were rejected
// with 429, formatted as a string with two decimals.
app.get('/metrics/rate-limit', (req, res) => {
  const { totalRequests, limitedRequests } = metrics;
  // Guard the division: with no requests counted yet the ratio would be NaN
  // and the payload would carry the string "NaN".
  const limitedShare = totalRequests > 0
    ? (limitedRequests / totalRequests) * 100
    : 0;

  res.json({
    totalRequests,
    limitedRequests,
    percentage: limitedShare.toFixed(2)
  });
});