RedisCachingPatterns RedisCachingPatterns

本技能提供 Redis 缓存的全面指南,包括设置、数据结构、缓存策略、键命名、TTL 管理、缓存失效、分布式缓存、发布/订阅模式、会话存储和限流等关键概念和实践。

后端开发 0 次安装 0 次浏览 更新于 3/5/2026

Redis 缓存模式

概览

Redis 是一个内存中的数据结构存储,可以用作数据库、缓存、消息代理和队列。本技能涵盖了 Redis 设置、数据结构、缓存策略、键命名约定、TTL 管理、缓存失效、分布式缓存、发布/订阅模式、会话存储和限流。

前提条件

  • 了解缓存概念
  • 了解数据结构(字符串、哈希、列表、集合、有序集合)
  • 熟悉 Node.js 或 Python
  • 了解 TTL(Time To Live)概念
  • 了解分布式系统基础

核心概念

Redis 数据结构

  • 字符串:二进制安全的字符串,大小可达 512MB
  • 哈希:字符串字段和字符串值之间的映射
  • 列表:按插入顺序排序的字符串元素集合
  • 集合:无序的唯一字符串集合
  • 有序集合:按相关分数排序的唯一字符串集合

缓存策略

  • 旁路缓存(懒加载):按需填充缓存
  • 写入时:与数据库同步更新缓存
  • 写入后:异步更新数据库的缓存
  • 读穿:由缓存提供程序填充缓存

关键设计模式

  • 键命名:层次结构、基于环境和基于模式的命名
  • TTL 管理:绝对 TTL、滑动 TTL、动态 TTL
  • 缓存失效:简单、基于模式、基于标签、基于哈希
  • 分布式缓存:Redis 集群、客户端分片

实施指南

Redis 设置

基本连接

// config/redis.ts
import { createClient } from 'redis'

const redisClient = createClient({
  url: process.env.REDIS_URL || 'redis://localhost:6379',
  socket: {
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379'),
  },
  password: process.env.REDIS_PASSWORD,
  database: process.env.REDIS_DB || '0',
})

export { redisClient }
# config/redis.py
import redis
from typing import Optional

redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    password=Optional[str] = None,
    decode_responses=True
)

连接池

// config/redis-pool.ts
import { createClient } from 'redis'

const pool = createClient({
  url: process.env.REDIS_URL || 'redis://localhost:6379',
  socket: {
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379'),
  },
  maxRetriesPerRequest: 3,
  retryDelayOnFailover: 100,
  lazyConnect: true,
})

export { pool }
# config/redis-pool.py
import redis
from redis.connection import ConnectionPool

pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,
    decode_responses=True
)

def get_redis_client():
    return pool.get_connection()

数据结构

字符串

// 基本字符串操作
import { redisClient } from '../config/redis'

// 设置键
await redisClient.set('user:123', JSON.stringify({ name: 'John', email: 'john@example.com' }))

// 获取键
const user = await redisClient.get('user:123')
if (user) {
  console.log(JSON.parse(user))
}

// 设置带过期(TTL 秒)
await redisClient.set('session:abc', JSON.stringify({ userId: 123 }), {
  EX: 3600 // 1小时
})

// 设置带过期使用毫秒
await redisClient.set('temp:data', 'value', {
  PX: 60000 // 60秒
})

// 获取剩余 TTL
const ttl = await redisClient.ttl('session:abc')
console.log(`会话在 ${ttl} 秒后过期`)

// 删除键
await redisClient.del('user:123')
# 基本字符串操作
import redis_client

# 设置键
await redis_client.set('user:123', '{"name": "John", "email": "john@example.com"}')

# 获取键
user_data = await redis_client.get('user:123')
if user_data:
    import json
    print(json.loads(user_data))

# 设置带过期(TTL 秒)
await redis_client.setex('session:abc', json.dumps({"user_id": 123}), 3600)

# 获取剩余 TTL
ttl = await redis_client.ttl('session:abc')
print(f'会话在 {ttl} 秒后过期')

# 删除键
await redis_client.delete('user:123')

哈希

// 哈希操作
import { redisClient } from '../config/redis'

// 设置哈希字段
await redisClient.hSet('user:123', {
  name: 'John Doe',
  email: 'john@example.com',
  age: '30'
})

// 获取特定哈希字段
const name = await redisClient.hGet('user:123', 'name')

// 获取所有哈希字段
const user = await redisClient.hGetAll('user:123')

// 获取多个哈希字段
const fields = await redisClient.hMGet('user:123', ['name', 'email'])

// 设置多个哈希字段
await redisClient.hMSet('user:123', {
  name: 'Jane Smith',
  email: 'jane@example.com'
})

// 增加哈希字段
const newAge = await redisClient.hIncrBy('user:123', 'age', 1)

// 检查哈希字段是否存在
const hasName = await redisClient.hExists('user:123', 'name')

// 删除哈希字段
await redisClient.hDel('user:123', 'age')

// 获取哈希长度
const hashLength = await redisClient.hLen('user:123')

// 获取所有哈希键
const allUsers = await redisClient.keys('user:*')
# 哈希操作
import redis_client

# 设置哈希字段
await redis_client.hset('user:123', mapping={
    'name': 'John Doe',
    'email': 'john@example.com',
    'age': '30'
})

# 获取特定哈希字段
name = await redis_client.hget('user:123', 'name')

# 获取所有哈希字段
user = await redis_client.hgetall('user:123')

# 获取多个哈希字段
fields = await redis_client.hmget('user:123', ['name', 'email'])

# 设置多个哈希字段
await redis_client.hmset('user:123', {
    'name': 'Jane Smith',
    'email': 'jane@example.com'
})

# 增加哈希字段
new_age = await redis_client.hincrby('user:123', 'age', 1)

# 检查哈希字段是否存在
has_name = await redis_client.hexists('user:123', 'name')

# 删除哈希字段
await redis_client.hdel('user:123', 'age')

# 获取哈希长度
hash_length = await redis_client.hlen('user:123')

# 获取所有哈希键
all_users = await redis_client.keys('user:*')

列表

// 列表操作
import { redisClient } from '../config/redis'

// 向列表添加(左推)
await redisClient.lPush('recent:items', JSON.stringify({ id: 1, name: 'Item 1' }))

// 向列表添加(右推)
await redisClient.rPush('recent:items', JSON.stringify({ id: 2, name: 'Item 2' }))

// 获取列表范围(0 到 -1 表示全部)
const items = await redisClient.lRange('recent:items', 0, -1)

// 获取列表范围分页
const page1 = await redisClient.lRange('recent:items', 0, 9)
const page2 = await redisClient.lRange('recent:items', 10, 19)

// 获取列表长度
const length = await redisClient.lLen('recent:items')

// 修剪列表到 N 项
await redisClient.lTrim('recent:items', 0, 99)

// 从列表中移除
await redisClient.lRem('recent:items', 0)

// 获取索引处的元素
const firstItem = await redisClient.lIndex('recent:items', 0)
# 列表操作
import redis_client

# 向列表添加(左推)
await redis_client.lpush('recent:items', json.dumps({"id": 1, "name": "Item 1"}))

# 向列表添加(右推)
await redis_client.rpush('recent:items', json.dumps({"id": 2, "name": "Item 2"}))

# 获取列表范围(0 到 -1 表示全部)
items = await redis_client.lrange('recent:items', 0, -1)

# 获取列表范围分页
page1 = await redis_client.lrange('recent:items', 0, 9)
page2 = await redis_client.lrange('recent:items', 10, 19)

# 获取列表长度
length = await redis_client.llen('recent:items')

# 修剪列表到 N 项
await redis_client.ltrim('recent:items', 0, 99)

# 从列表中移除
await redis_client.lrem('recent:items', 0, 1)

# 获取索引处的元素
first_item = await redis_client.lindex('recent:items', 0)

集合

// 集合操作
import { redisClient } from '../config/redis'

// 向集合添加
await redisClient.sAdd('user:123:followers', 'user456')

// 向集合添加多个
await redisClient.sAdd('user:123:followers', ['user456', 'user789', 'user012'])

// 检查成员是否存在
const isFollower = await redisClient.sIsMember('user:123:followers', 'user456')

// 获取所有成员
const followers = await redisClient.sMembers('user:123:followers')

// 获取随机成员
const randomFollower = await redisClient.sRandMember('user:123:followers')

// 从集合中移除
await redisClient.sRem('user:123:followers', 'user456')

// 获取集合大小
const followerCount = await redisClient.sCard('user:123:followers')

// 获取多个集合的并集
const allUsers = await redisClient.sUnion('user:123:followers', 'user:456:following')
# 集合操作
import redis_client

# 向集合添加
await redis_client.sadd('user:123:followers', 'user456')

# 向集合添加多个
await redis_client.sadd('user:123:followers', ['user456', 'user789', 'user012'])

# 检查成员是否存在
is_follower = await redis_client.sismember('user:123:followers', 'user456')

# 获取所有成员
followers = await redis_client.smembers('user:123:followers')

# 获取随机成员
random_follower = await redis_client.srandmember('user:123:followers')

# 从集合中移除
await redis_client.srem('user:123:followers', 'user456')

# 获取集合大小
follower_count = await redis_client.scard('user:123:followers')

# 获取多个集合的并集
all_users = await redis_client.sunion('user:123:followers', 'user:456:following')

有序集合

// 有序集合操作
import { redisClient } from '../config/redis'

// 向有序集合添加
await redisClient.zAdd('leaderboard:scores', JSON.stringify({ userId: 1, score: 100 }))

// 向有序集合添加多个带分数
await redisClient.zAdd('leaderboard:scores', [
  JSON.stringify({ userId: 1, score: 100 }),
  JSON.stringify({ userId: 2, score: 95 }),
  JSON.stringify({ userId: 3, score: 90 })
])

// 按分数范围获取(升序)
const top10 = await redisClient.zRangeWithScores('leaderboard:scores', 0, 9, 'WITHSCORES')

// 按分数范围获取(降序)
const top10 = await redisClient.zRevRangeWithScores('leaderboard:scores', 0, 9, 'WITHSCORES')

// 获取成员的排名
const rank = await redis.zRevRank('leaderboard:scores', JSON.stringify({ userId: 1 }))

// 获取成员的分数
const score = await redis.zScore('leaderboard:scores', JSON.stringify({ userId: 1 }))

// 移除成员
await redis.zRem('leaderboard:scores', JSON.stringify({ userId: 1 }))
# 有序集合操作
import redis_client

# 向有序集合添加
await redis_client.zadd('leaderboard:scores', json.dumps({"user_id": 1, "score": 100}))

# 向有序集合添加多个带分数
await redis_client.zadd('leaderboard:scores', [
    json.dumps({"user_id": 1, "score": 100}),
    json.dumps({"user_id": 2, "score": 95}),
    json.dumps({"user_id": 3, "score": 90})
])

# 按分数范围获取(升序)
top_10 = await redis_client.zrange('leaderboard:scores', 0, 9, withscores=True)

# 按分数范围获取(降序)
top_10 = await redis_client.zrevrange('leaderboard:scores', 0, 9, withscores=True)

# 获取成员的排名
rank = await redis_client.zrevrank('leaderboard:scores', json.dumps({"user_id": 1}))

# 获取成员的分数
score = await redis_client.zscore('leaderboard:scores', json.dumps({"user_id": 1}))

# 移除成员
await redis_client.zrem('leaderboard:scores', json.dumps({"user_id": 1}))

缓存策略

旁路缓存(懒加载)

// services/cache.service.ts
import { redisClient } from '../config/redis'

interface CacheOptions {
  ttl?: number
}

export async function getFromCache<T>(
  key: string,
  options?: CacheOptions
): Promise<T | null> {
  const cached = await redisClient.get(key)
  
  if (cached) {
    return JSON.parse(cached)
  }
  
  return null
}

export async function setCache<T>(
  key: string,
  data: T,
  options?: CacheOptions
): Promise<void> {
  const value = JSON.stringify(data)
  
  if (options?.ttl) {
    await redisClient.set(key, value, {
      EX: options.ttl
    })
  } else {
    await redisClient.set(key, value)
  }
}

// 使用
async function getUser(userId: string) {
  const cacheKey = `user:${userId}`
  
  // 尝试从缓存中获取
  const cached = await getFromCache<User>(cacheKey)
  if (cached) {
    return cached
  }
  
  // 从数据库获取
  const user = await db.user.findUnique({ where: { id: userId } })
  
  // 在缓存中设置 1 小时 TTL
  await setCache(cacheKey, user, { ttl: 3600 })
  
  return user
}
# services/cache.service.py
import redis_client
import json

async def get_from_cache(key: str, ttl: int = None) -> dict | None:
    cached = await redis_client.get(key)
    
    if cached:
        return json.loads(cached)
    
    return None


async def set_cache(key: str, data: any, ttl: int = None) -> None:
    value = json.dumps(data)
    
    if ttl:
        await redis_client.setex(key, value, ttl)
    else:
        await redis_client.set(key, value)


async def get_user(user_id: str):
    cache_key = f"user:{user_id}"
    
    # 尝试从缓存中获取
    cached = await get_from_cache(cache_key)
    if cached:
        return cached
    
    # 从数据库获取
    user = await db.user.find_unique({"id": user_id})
    
    # 在缓存中设置 1 小时 TTL
    await set_cache(cache_key, user, ttl=3600)
    
    return user

写入时

// services/cache.service.ts
export async function updateWithCache<T>(
  key: string,
  updater: () => Promise<T>
): Promise<T> {
  // 缓存和数据库原子性更新
  const data = await updater()
  
  // 在缓存中设置
  await redisClient.set(key, JSON.stringify(data), {
    EX: 3600
  })
  
  return data
}

// 使用
async function updateUser(userId: string, updates: Partial<User>) {
  const cacheKey = `user:${userId}`
  
  const updatedUser = await updateWithCache(cacheKey, async () => {
    return await db.user.update({
      where: { id: userId },
      data: updates
    })
  })
  
  return updatedUser
}
async def update_with_cache(key: str, updater: callable) -> dict:
    # 缓存和数据库原子性更新
    data = await updater()
    
    # 在缓存中设置
    await redis_client.set(key, json.dumps(data), ex=3600)
    
    return data


async def update_user(user_id: str, updates: dict):
    cache_key = f"user:{user_id}"
    
    updated_user = await update_with_cache(cacheKey, lambda: db.user.update(
        {"id": user_id},
        updates=updates
    ))
    
    return updated_user

写入后(异步)

// services/cache.service.ts
import { redisClient } from '../config/redis'

export async function setCacheAsync<T>(
  key: string,
  data: T,
  ttl?: number
): Promise<void> {
  const value = JSON.stringify(data)
  
  // 在 Redis 中设置
  await redisClient.set(key, value, {
    EX: ttl || 3600
  })
  
  // 后台更新
  await db.user.update({
    where: { id: data.id },
    data: updates
  })
}

// 使用
async function createUser(userData: User) {
  // 先在数据库中创建
  const user = await db.user.create({ data: userData })
  
  // 异步在缓存中设置
  await setCacheAsync(`user:${user.id}`, user)
}
async def set_cache_async(key: str, data: dict, ttl: int = None) -> None:
    value = json.dumps(data)
    
    # 在 Redis 中设置
    await redis_client.set(key, value, ex=ttl or 3600)
    
    # 后台更新
    await db.user.create(data=data)


async def create_user(user_data: dict):
    # 先在数据库中创建
    user = await db.user.create(data=user_data)
    
    # 异步在缓存中设置
    await set_cache_async(f"user:{user['id']}", user)

写入后(同步)

// services/cache.service.ts
export async function setCacheSync<T>(
  key: string,
  data: T,
  ttl?: number
): Promise<void> {
  const value = JSON.stringify(data)
  
  // 更新数据库
  await db.user.create({ data })
  
  // 在 Redis 中同步设置
  await redisClient.set(key, value, {
    EX: ttl || 3600
  })
}

// 使用
async function createAndCacheUser(userData: User) {
  // 在数据库中创建
  const user = await db.user.create({ data: userData })
  
  // 在缓存中设置
  await setCacheSync(`user:${user.id}`, user)
}
async def set_cache_sync(key: str, data: dict, ttl: int = None) -> None:
    value = json.dumps(data)
    
    # 在 Redis 中同步设置
    await redis_client.set(key, value, ex=ttl or 3600)

键命名约定

命名模式

// 良好的命名约定
const keys = {
  user: 'user:{userId}',              // 用户数据
  userSession: 'session:{sessionId}',      // 用户会话
  userList: 'users:page:{page}',       // 用户列表分页
  userCache: 'cache:user:{userId}',       // 用户缓存
  product: 'product:{productId}',         // 产品数据
  productCache: 'cache:product:{productId}', // 产品缓存
  searchResults: 'search:{query}:page:{page}', // 搜索结果分页
  leaderboard: 'leaderboard:category:{category}', // 按类别的排行榜
  rateLimit: 'ratelimit:{ip}', // 每个 IP 的限流
  session: 'session:{sessionId}:user:{userId}', // 用户会话数据
}

// 使用
const userKey = keys.user('123')
const sessionKey = keys.userSession('abc-123', '123')

键层次结构

// 键的层次结构
const keys = {
  user: {
    base: 'user',
    profile: (userId: string) => `user:${userId}:profile`,
    settings: (userId: string) => `user:${userId}:settings`,
    posts: (userId: string, page: number) => `user:${userId}:posts:${page}`,
  },
  product: {
    base: 'product',
    details: (productId: string) => `product:${productId}:details`,
    reviews: (productId: string) => `product:${productId}:reviews`,
    cache: (productId: string) => `cache:product:${productId}`,
  },
  session: {
    base: 'session',
    user: (sessionId: string, userId: string) => `session:${sessionId}:user:${userId}`,
  },
}

// 使用
const userProfileKey = keys.user.profile('123')
const productDetailsKey = keys.product.details('abc-456')

基于环境的键

const keys = {
  development: {
    user: (userId: string) => `dev:user:${userId}`,
    session: (sessionId: string) => `dev:session:${sessionId}`,
  },
  production: {
    user: (userId: string) => `prod:user:${userId}`,
    session: (sessionId: string) => `prod:session:${sessionId}`,
  },
}

const envPrefix = process.env.NODE_ENV === 'production' ? 'prod' : 'dev'

const userKey = keys[envPrefix].user('123')
# 键的层次结构
keys = {
    'user': {
        'base': 'user',
        'profile': lambda user_id: f'user:{user_id}:profile',
        'settings': lambda user_id: f'user:{user_id}:settings',
        'posts': lambda user_id, page: f'user:{user_id}:posts:{page}',
    },
    'product': {
        'base': 'product',
        'details': lambda product_id: f'product:{product_id}:details',
        'reviews': lambda product_id: f'product:{product_id}:reviews',
        'cache': lambda product_id: f'cache:product:{product_id}',
    },
    'session': {
        'base': 'session',
        'user': lambda session_id, user_id: f'session:{session_id}:user:{user_id}',
    },
}

# 使用
user_profile_key = keys['user']['profile']('123')
product_details_key = keys['product']['details']('abc-456')

TTL 管理

绝对 TTL

// services/cache.service.ts
export async function setWithTTL(
  key: string,
  data: any,
  ttlSeconds: number
): Promise<void> {
  await redisClient.set(key, JSON.stringify(data), {
    EX: ttlSeconds
  })
}

// 使用
await setWithTTL('user:123', userData, 3600) // 1小时
await setWithTTL('session:abc', sessionData, 1800) // 30分钟
await setWithTTL('temp:data', tempData, 60) // 1分钟

滑动 TTL(访问时刷新)

// services/cache.service.ts
export async function getWithRefresh(
  key: string,
  ttlSeconds: number
): Promise<any> {
  const cached = await redisClient.get(key)
  
  if (cached) {
    // 访问时刷新 TTL
    await redisClient.expire(key, ttlSeconds)
    return JSON.parse(cached)
  }
  
  return null
}

// 使用
async function getUserWithRefresh(userId: string) {
  const key = `user:${userId}`
  
  const user = await getWithRefresh(key, 3600)
  
  if (user) {
    return user
  }
  
  // 从数据库获取
  const dbUser = await db.user.findUnique({ where: { id: userId } })
  
  await setWithTTL(key, dbUser, 3600)
  
  return dbUser
}
async def get_with_refresh(key: str, ttl_seconds: int) -> dict | None:
    cached = await redis_client.get(key)
    
    if cached:
        # 访问时刷新 TTL
        await redis_client.expire(key, ttl_seconds)
        return json.loads(cached)
    
    return None


async def get_user_with_refresh(user_id: str):
    key = f"user:{user_id}"
    
    user = await get_with_refresh(key, 3600)
    if user:
        return user
    
    # 从数据库获取
    db_user = await db.user.find_unique({"id": user_id})
    await set_with_ttl(key, db_user, 3600)
    
    return db_user

基于数据的动态 TTL

// services/cache.service.ts
export async function setDynamicTTL(
  key: string,
  data: any
): Promise<void> {
  let ttl: number
  
  // 根据数据类型计算 TTL
  if (data.type === 'user') {
    ttl = 3600 // 1小时
  } else if (data.type === 'session') {
    ttl = 1800 // 30分钟
  } else if (data.type === 'temp') {
    ttl = 60 // 1分钟
  } else {
    ttl = 600 // 10分钟
  }
  
  await redisClient.set(key, JSON.stringify(data), {
    EX: ttl
  })
}

缓存失效

简单失效

// services/cache.service.ts
export async function invalidateUser(userId: string): Promise<void> {
  // 删除用户缓存
  await redisClient.del(`user:${userId}`)
  
  // 删除与用户相关的缓存
  const keys = await redisClient.keys(`user:${userId}:*`)
  if (keys.length > 0) {
    await redisClient.del(keys)
  }
}

// 使用
async function updateUser(userId: string, updates: Partial<User>) {
  // 在数据库中更新
  const user = await db.user.update({
    where: { id: userId },
    data: updates
  })
  
  // 使缓存失效
  await invalidateUser(userId)
  
  return user
}

基于模式的失效

// services/cache.service.ts
export async function invalidatePattern(pattern: string): Promise<number> {
  const keys = await redisClient.keys(pattern)
  
  if (keys.length > 0) {
    await redisClient.del(keys)
  }
  
  return keys.length
}

// 使用
async function invalidateAllUserCaches() {
  await invalidatePattern('user:*')
}

async function invalidateAllProductCaches(productId: string) {
  await invalidatePattern(`product:${productId}:*`)
}

基于标签的失效

// services/cache.service.ts
export async function invalidateTag(tag: string): Promise<number> {
  // 使所有带标签的键失效
  const keys = await redisClient.keys(`*:${tag}`)
  
  if (keys.length > 0) {
    await redisClient.del(keys)
  }
  
  return keys.length
}

// 使用
async function invalidateByTags(tags: string[]) {
  for (const tag of tags) {
    await invalidateTag(tag)
  }
}

基于哈希的失效

// services/cache.service.ts
export async function invalidateUserHash(userId: string): Promise<void> {
  // 删除用户哈希
  await redisClient.del(`user:${userId}`)
  
  // 删除用户会话
  const sessionKeys = await redisClient.keys(`session:*:user:${userId}`)
  if (sessionKeys.length > 0) {
    await redisClient.del(sessionKeys)
  }
}
# 基于哈希的失效
async def invalidate_user_hash(user_id: str) -> None:
    # 删除用户哈希
    await redis_client.delete(f"user:{user_id}")
    
    # 删除用户会话
    session_keys = await redis_client.keys(f"session:*:user:{user_id}")
    if session_keys:
        await redis_client.delete(*session_keys)

分布式缓存

Redis 集群

// config/redis-cluster.ts
import { createCluster } from 'redis'

const cluster = createCluster({
  rootNodes: [
    { host: 'redis-1.example.com', port: 6379 },
    { host: 'redis-2.example.com', port: 6379 },
    { host: 'redis-3.example.com', port: 6379 },
  ],
  readonlyMode: 'slave',
})

export { cluster }

客户端分片

// services/cache.service.ts
export function getShardKey(key: string, userId?: string): string {
  // 按用户 ID 分片用户特定数据
  if (userId) {
    return `${userId}:${key}`
  }
  
  // 按键前缀分片
  const shard = Math.abs(key.split(':')[0].charCodeAt(0) % 4)
  return `shard:${shard}:${key}`
}

// 使用
const userKey = getShardKey('user:123', '456')
const sessionKey = getShardKey('session:abc', '123')

发布/订阅模式

简单发布者

// services/pubsub.service.ts
import { redisClient } from '../config/redis'

export async function publish(channel: string, message: any): Promise<number> {
  return await redisClient.publish(channel, JSON.stringify(message))
}

// 使用
await publish('notifications', { type: 'user.created', userId: '123' })

简单订阅者

// services/pubsub.service.ts
import { redisClient } from '../config/redis'

export async function subscribe(
  channel: string,
  callback: (message: any) => void
): Promise<void> {
  const subscriber = redisClient.duplicate()
  
  await subscriber.subscribe(channel, (message) => {
    const data = JSON.parse(message)
    callback(data)
  })
}

// 使用
await subscribe('notifications', (data) => {
  console.log('收到通知:', data)
})

基于模式的订阅

// services/pubsub.service.ts
export async function subscribeToUserNotifications(
  userId: string,
  callback: (message: any) => void
): Promise<void> {
  const channel = `notifications:user:${userId}`
  await subscribe(channel, callback)
}
export async function subscribeToGlobalNotifications(
  callback: (message: any) => void
): Promise<void> {
  const channel = 'notifications:global'
  await subscribe(channel, callback)
}

消息队列(发布/订阅)

// services/queue.service.ts
export async function addToQueue(
  queueName: string,
  item: any
): Promise<number> {
  return await redisClient.lPush(queueName, JSON.stringify(item))
}
export async function processQueue(
  queueName: string,
  processor: (item: any) => Promise<void>
): Promise<void> {
  while (true) {
    const result = await redisClient.brPop(queueName)
    
    if (!result) {
      break
    }
    
    const item = JSON.parse(result)
    await processor(item)
  }
}

// 使用
await addToQueue('email:queue', { to: 'user@example.com', subject: 'Hello', body: 'Message' })

会话存储

会话管理

// services/session.service.ts
import { redisClient } from '../config/redis'

export async function createSession(
  userId: string,
  userData: any,
  ttlSeconds: number = 3600
): Promise<string> {
  const sessionId = generateSessionId()
  const sessionKey = `session:${sessionId}`
  
  const sessionData = {
    userId,
    userData,
    createdAt: new Date().toISOString(),
    lastActivity: new Date().toISOString(),
  }
  
  // 在哈希中存储会话数据
  await redisClient.hSet(sessionKey, sessionData)
  
  // 设置 TTL
  await redisClient.expire(sessionKey, ttlSeconds)
  
  return sessionId
}
export async function getSession(sessionId: string): Promise<any | null> {
  const sessionKey = `session:${sessionId}`
  
  const sessionData = await redisClient.hGetAll(sessionKey)
  
  if (!sessionData) {
    return null
  }
  
  return sessionData
}
export async function updateSessionActivity(sessionId: string): Promise<void> {
  const sessionKey = `session:${sessionId}`
  
  await redisClient.hSet(sessionKey, {
    lastActivity: new Date().toISOString()
  })
}
export async function deleteSession(sessionId: string): Promise<void> {
  const sessionKey = `session:${sessionId}`
  await redisClient.del(sessionKey)
}

function generateSessionId(): string {
  return Date.now().toString(36) + Math.random().toString(36).slice(2)
}
# services/session.service.py
import redis_client
import json
from datetime import datetime, timedelta
import uuid

def create_session(user_id: str, user_data: dict, ttl_seconds: int = 3600) -> str:
    session_id = str(uuid.uuid4())
    session_key = f"session:{session_id}"
    
    session_data = {
        "user_id": user_id,
        "user_data": user_data,
        "created_at": datetime.utcnow().isoformat(),
        "last_activity": datetime.utcnow().isoformat(),
    }
    
    # 在哈希中存储会话数据
    await redis_client.hset(session_key, session_data)
    
    # 设置 TTL
    await redis_client.expire(session_key, ttl_seconds)
    
    return session_id


async def get_session(session_id: str) -> dict | None:
    session_key = f"session:{session_id}"
    session_data = await redis_client.hgetall(session_key)
    
    if not session_data:
        return None
    
    return session_data


async def update_session_activity(session_id: str) -> None:
    session_key = f"session:{session_id}"
    
    await redis_client.hset(session_key, {
        "last_activity": datetime.utcnow().isoformat()
    })


async def delete_session(session_id: str) -> None:
    session_key = f"session:{session_id}"
    await redis_client.delete(session_key)

多设备会话

// services/session.service.ts
export async function createSessionForDevice(
  userId: string,
  deviceId: string,
  userData: any
): Promise<string> {
  const sessionId = generateSessionId()
  const sessionKey = `session:${sessionId}:device:${deviceId}`
  
  const sessionData = {
    userId,
    deviceId,
    userData,
    createdAt: new Date().toISOString(),
    lastActivity: new Date().toISOString(),
  }
  
  await redisClient.hSet(sessionKey, sessionData)
  await redisClient.expire(sessionKey, 3600)
  
  return sessionId
}
export async function getUserSessions(userId: string): Promise<string[]> {
  const pattern = `session:*:user:${userId}`
  const keys = await redisClient.keys(pattern)
  
  return keys
}

使用 Redis 进行限流

固定窗口限流器

// services/ratelimit.service.ts
import { redisClient } from '../config/redis'

export async function checkRateLimit(
  identifier: string,
  limit: number,
  windowMs: number
): Promise<boolean> {
  const key = `ratelimit:${identifier}`
  const current = await redisClient.incr(key)
  
  if (current > limit) {
    return false
  }
  
  // 为窗口设置过期时间
  await redisClient.expire(key, windowMs / 1000)
  
  return true
}
export async function resetRateLimit(identifier: string): Promise<void> {
  await redisClient.del(`ratelimit:${identifier}`)
}

// 使用
async function rateLimiter(
  identifier: string,
  limit: number = 10,
  windowMs: number = 60000 // 1分钟
): Promise<void> {
  const allowed = await checkRateLimit(identifier, limit, windowMs)
  
  if (!allowed) {
    throw new Error('超出频率限制')
  }
  
  // 继续操作
}
# services/ratelimit.service.py
import redis_client

async def check_rate_limit(identifier: str, limit: int, window_ms: int = 60000) -> bool:
    key = f"ratelimit:{identifier}"
    current = await redis_client.incr(key)
    
    if current > limit:
        return False
    
    # 为窗口设置过期时间
    await redis_client.expire(key, window_ms / 1000)
    
    return True


async def reset_rate_limit(identifier: str) -> None:
    await redis_client.delete(f"ratelimit:{identifier}")


async def rate_limiter(identifier: str, limit: int = 10, window_ms: int = 60000):
    allowed = await check_rate_limit(identifier, limit, window_ms)
    
    if not allowed:
        raise Exception('超出频率限制')
    
    # 继续操作

滑动窗口限流器

// services/ratelimit.service.ts
export async function slidingWindowRateLimit(
  identifier: string,
  limit: number,
  windowMs: number
): Promise<void> {
  const key = `ratelimit:${identifier}`
  
  // 获取当前计数
  const current = await redisClient.incr(key)
  
  if (current > limit) {
    throw new Error('超出频率限制')
  }
  
  // 移除窗口外的旧条目
  const now = Date.now()
  const oldest = await redisClient.zRangeByScore(
    key,
    0,
    Date.now() - windowMs,
    Date.now(),
    'WITHSCORES'
  )
  
  if (oldest.length > 0) {
    await redisClient.zRemRangeByScore(
      key,
      oldest[0].score,
      oldest[oldest.length - 1].score,
      oldest[oldest.length - 1].member
    )
  }
}

// 使用
async function apiRateLimiter(
  identifier: string,
  limit: number = 100,
  windowMs: number = 60000
): Promise<void> {
  await slidingWindowRateLimit(identifier, limit, windowMs)
  
  // 继续 API 调用
}
async def sliding_window_rate_limit(
    identifier: str,
    limit: int = 100,
    window_ms: int = 60000
) -> None:
    key = f"ratelimit:{identifier}"
    
    # 获取当前计数
    current = await redis_client.incr(key)
    
    if current > limit:
        raise Exception('超出频率限制')
    
    # 移除窗口外的旧条目
    now = int(time.time() * 1000)
    oldest = await redis_client.zrangebyscore(
        key,
        0,
        now - window_ms,
        now,
        withscores=True
    )
    
    if oldest:
        await redis_client.zremrangebyscore(
            key,
            oldest[0].score,
            oldest[oldest.length - 1].score,
            oldest[oldest.length - 1].member
        )

最佳实践

  1. 使用适当的数据结构

    • 使用哈希存储相关字段
    • 使用集合存储唯一集合
    • 使用有序集合存储排名数据
    • 使用列表存储有序序列
  2. 始终设置 TTL

    • 缓存数据应始终设置过期时间
    • 根据数据的易变性使用适当的 TTL
    • 考虑为频繁访问的数据使用滑动 TTL
  3. 使用压缩

    • 在缓存之前压缩大量数据
    • 使用高效的序列化格式
    • 考虑为性能使用二进制格式
  4. 处理连接错误

    • 实施适当的错误处理
    • 使用连接池
    • 设置适当的重试策略
    • 监控连接健康状态
  5. 监控 Redis 性能

    • 跟踪内存使用情况
    • 监控命中率/未命中率
    • 观察慢操作
    • 设置关键指标的警报
  6. 使用管道批量操作

    • 将多个操作一起批处理
    • 减少网络往返次数
    • 提高整体性能
  7. 使用适当的序列化

    • 为复杂对象使用 JSON
    • 考虑为高性能使用协议缓冲区
    • 避免对结构化数据使用字符串连接
  8. 键命名

    • 使用层次结构命名约定
    • 包括环境前缀
    • 使用一致的模式
    • 避免过长的键
  9. 缓存失效

    • 实施适当的失效策略
    • 谨慎使用基于模式的失效
    • 考虑为复杂场景使用基于标签的失效
  10. 安全

    • 在生产中使用认证
    • 加密敏感数据
    • 使用 TLS 进行网络连接
    • 遵循最小权限原则

相关技能