名称: redis-patterns 描述: Redis模式包括缓存策略、发布/订阅、用于事件处理的流、Lua脚本和数据结构
Redis模式
缓存策略
async function getUser(userId: string): Promise<User> {
const cacheKey = `user:${userId}`;
const cached = await redis.get(cacheKey);
if (cached) {
return JSON.parse(cached);
}
const user = await db.user.findUnique({ where: { id: userId } });
if (user) {
await redis.set(cacheKey, JSON.stringify(user), "EX", 3600);
}
return user;
}
async function invalidateUser(userId: string): Promise<void> {
await redis.del(`user:${userId}`);
await redis.del(`user:${userId}:orders`);
}
async function cacheAside<T>(
key: string,
ttlSeconds: number,
fetcher: () => Promise<T>
): Promise<T> {
const cached = await redis.get(key);
if (cached) return JSON.parse(cached);
const value = await fetcher();
await redis.set(key, JSON.stringify(value), "EX", ttlSeconds);
return value;
}
滑动窗口速率限制
async function isRateLimited(
clientId: string,
limit: number,
windowSeconds: number
): Promise<boolean> {
const key = `ratelimit:${clientId}`;
const now = Date.now();
const windowStart = now - windowSeconds * 1000;
const pipe = redis.multi();
pipe.zremrangebyscore(key, 0, windowStart);
pipe.zadd(key, now, `${now}:${crypto.randomUUID()}`);
pipe.zcard(key);
pipe.expire(key, windowSeconds);
const results = await pipe.exec();
const count = results[2][1] as number;
return count > limit;
}
发布/订阅
const subscriber = redis.duplicate();
await subscriber.subscribe("notifications", "orders");
subscriber.on("message", (channel, message) => {
const event = JSON.parse(message);
switch (channel) {
case "notifications":
handleNotification(event);
break;
case "orders":
handleOrderEvent(event);
break;
}
});
async function publishEvent(channel: string, event: object): Promise<void> {
await redis.publish(channel, JSON.stringify(event));
}
用于事件处理的流
async function produceEvent(stream: string, event: Record<string, string>) {
await redis.xadd(stream, "*", ...Object.entries(event).flat());
}
async function consumeEvents(
stream: string,
group: string,
consumer: string
) {
try {
await redis.xgroup("CREATE", stream, group, "0", "MKSTREAM");
} catch {
// 组已存在
}
while (true) {
const results = await redis.xreadgroup(
"GROUP", group, consumer,
"COUNT", 10,
"BLOCK", 5000,
"STREAMS", stream, ">"
);
if (!results) continue;
for (const [, messages] of results) {
for (const [id, fields] of messages) {
await processMessage(fields);
await redis.xack(stream, group, id);
}
}
}
}
流提供基于消费者组的持久事件处理,带有确认和重放功能。
Lua脚本用于原子操作
const acquireLock = `
local key = KEYS[1]
local token = ARGV[1]
local ttl = ARGV[2]
if redis.call("SET", key, token, "NX", "EX", ttl) then
return 1
end
return 0
`;
const releaseLock = `
local key = KEYS[1]
local token = ARGV[1]
if redis.call("GET", key) == token then
return redis.call("DEL", key)
end
return 0
`;
async function withLock<T>(
resource: string,
ttl: number,
fn: () => Promise<T>
): Promise<T> {
const token = crypto.randomUUID();
const acquired = await redis.eval(acquireLock, 1, `lock:${resource}`, token, ttl);
if (!acquired) throw new Error("获取锁失败");
try {
return await fn();
} finally {
await redis.eval(releaseLock, 1, `lock:${resource}`, token);
}
}
反模式
- 在Redis中存储大对象(>100KB)而不压缩
- 在生产中使用
KEYS *(阻塞服务器;使用SCAN代替) - 不为缓存条目设置TTL(内存无限增长)
- 使用发布/订阅进行持久消息传递(如果没有订阅者连接,消息会丢失)
- 依赖Redis作为唯一数据存储而没有持久化策略
- 不对多个顺序命令使用管道
检查清单
- [ ] 缓存键遵循一致的命名约定(
实体:id:字段) - [ ] 所有缓存条目都有TTL以防止内存泄漏
- [ ] 在生产中使用
SCAN代替KEYS进行模式匹配 - [ ] 对需要原子性的操作使用Lua脚本
- [ ] 当需要持久性时使用流代替发布/订阅
- [ ] 为高吞吐量应用程序配置连接池
- [ ] 速率限制使用滑动窗口与有序集合
- [ ] 分布式锁包括防护令牌和TTL