Redis高级模式与应用 redis-patterns

本技能涵盖了Redis的各种高级使用模式,包括缓存策略、滑动窗口速率限制、发布/订阅、流处理、Lua脚本原子操作等,适用于后端开发、数据缓存和事件处理场景。关键词:Redis, 缓存, 发布订阅, 流处理, Lua脚本, 速率限制, 后端开发, 数据模式, 事件驱动

后端开发 0 次安装 0 次浏览 更新于 3/8/2026

名称: redis-patterns 描述: Redis模式包括缓存策略、发布/订阅、用于事件处理的流、Lua脚本和数据结构

Redis模式

缓存策略

async function getUser(userId: string): Promise<User> {
  const cacheKey = `user:${userId}`;
  const cached = await redis.get(cacheKey);

  if (cached) {
    return JSON.parse(cached);
  }

  const user = await db.user.findUnique({ where: { id: userId } });
  if (user) {
    await redis.set(cacheKey, JSON.stringify(user), "EX", 3600);
  }

  return user;
}

async function invalidateUser(userId: string): Promise<void> {
  await redis.del(`user:${userId}`);
  await redis.del(`user:${userId}:orders`);
}

async function cacheAside<T>(
  key: string,
  ttlSeconds: number,
  fetcher: () => Promise<T>
): Promise<T> {
  const cached = await redis.get(key);
  if (cached) return JSON.parse(cached);

  const value = await fetcher();
  await redis.set(key, JSON.stringify(value), "EX", ttlSeconds);
  return value;
}

滑动窗口速率限制

async function isRateLimited(
  clientId: string,
  limit: number,
  windowSeconds: number
): Promise<boolean> {
  const key = `ratelimit:${clientId}`;
  const now = Date.now();
  const windowStart = now - windowSeconds * 1000;

  const pipe = redis.multi();
  pipe.zremrangebyscore(key, 0, windowStart);
  pipe.zadd(key, now, `${now}:${crypto.randomUUID()}`);
  pipe.zcard(key);
  pipe.expire(key, windowSeconds);

  const results = await pipe.exec();
  const count = results[2][1] as number;
  return count > limit;
}

发布/订阅

const subscriber = redis.duplicate();
await subscriber.subscribe("notifications", "orders");

subscriber.on("message", (channel, message) => {
  const event = JSON.parse(message);
  switch (channel) {
    case "notifications":
      handleNotification(event);
      break;
    case "orders":
      handleOrderEvent(event);
      break;
  }
});

async function publishEvent(channel: string, event: object): Promise<void> {
  await redis.publish(channel, JSON.stringify(event));
}

用于事件处理的流

async function produceEvent(stream: string, event: Record<string, string>) {
  await redis.xadd(stream, "*", ...Object.entries(event).flat());
}

async function consumeEvents(
  stream: string,
  group: string,
  consumer: string
) {
  try {
    await redis.xgroup("CREATE", stream, group, "0", "MKSTREAM");
  } catch {
    // 组已存在
  }

  while (true) {
    const results = await redis.xreadgroup(
      "GROUP", group, consumer,
      "COUNT", 10,
      "BLOCK", 5000,
      "STREAMS", stream, ">"
    );

    if (!results) continue;

    for (const [, messages] of results) {
      for (const [id, fields] of messages) {
        await processMessage(fields);
        await redis.xack(stream, group, id);
      }
    }
  }
}

流提供基于消费者组的持久事件处理,带有确认和重放功能。

Lua脚本用于原子操作

const acquireLock = `
  local key = KEYS[1]
  local token = ARGV[1]
  local ttl = ARGV[2]
  if redis.call("SET", key, token, "NX", "EX", ttl) then
    return 1
  end
  return 0
`;

const releaseLock = `
  local key = KEYS[1]
  local token = ARGV[1]
  if redis.call("GET", key) == token then
    return redis.call("DEL", key)
  end
  return 0
`;

async function withLock<T>(
  resource: string,
  ttl: number,
  fn: () => Promise<T>
): Promise<T> {
  const token = crypto.randomUUID();
  const acquired = await redis.eval(acquireLock, 1, `lock:${resource}`, token, ttl);
  if (!acquired) throw new Error("获取锁失败");
  try {
    return await fn();
  } finally {
    await redis.eval(releaseLock, 1, `lock:${resource}`, token);
  }
}

反模式

  • 在Redis中存储大对象(>100KB)而不压缩
  • 在生产中使用KEYS *(阻塞服务器;使用SCAN代替)
  • 不为缓存条目设置TTL(内存无限增长)
  • 使用发布/订阅进行持久消息传递(如果没有订阅者连接,消息会丢失)
  • 依赖Redis作为唯一数据存储而没有持久化策略
  • 不对多个顺序命令使用管道

检查清单

  • [ ] 缓存键遵循一致的命名约定(实体:id:字段
  • [ ] 所有缓存条目都有TTL以防止内存泄漏
  • [ ] 在生产中使用SCAN代替KEYS进行模式匹配
  • [ ] 对需要原子性的操作使用Lua脚本
  • [ ] 当需要持久性时使用流代替发布/订阅
  • [ ] 为高吞吐量应用程序配置连接池
  • [ ] 速率限制使用滑动窗口与有序集合
  • [ ] 分布式锁包括防护令牌和TTL