Redis 缓存模式
概览
Redis 是一个内存中的数据结构存储,可以用作数据库、缓存、消息代理和队列。本技能涵盖了 Redis 设置、数据结构、缓存策略、键命名约定、TTL 管理、缓存失效、分布式缓存、发布/订阅模式、会话存储和限流。
前提条件
- 了解缓存概念
- 了解数据结构(字符串、哈希、列表、集合、有序集合)
- 熟悉 Node.js 或 Python
- 了解 TTL(Time To Live)概念
- 了解分布式系统基础
核心概念
Redis 数据结构
- 字符串:二进制安全的字符串,大小可达 512MB
- 哈希:字符串字段和字符串值之间的映射
- 列表:按插入顺序排序的字符串元素集合
- 集合:无序的唯一字符串集合
- 有序集合:按相关分数排序的唯一字符串集合
缓存策略
- 旁路缓存(懒加载):按需填充缓存
- 写入时:与数据库同步更新缓存
- 写入后:异步更新数据库的缓存
- 读穿:由缓存提供程序填充缓存
关键设计模式
- 键命名:层次结构、基于环境和基于模式的命名
- TTL 管理:绝对 TTL、滑动 TTL、动态 TTL
- 缓存失效:简单、基于模式、基于标签、基于哈希
- 分布式缓存:Redis 集群、客户端分片
实施指南
Redis 设置
基本连接
// config/redis.ts
import { createClient } from 'redis'
const redisClient = createClient({
url: process.env.REDIS_URL || 'redis://localhost:6379',
socket: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
},
password: process.env.REDIS_PASSWORD,
database: process.env.REDIS_DB || '0',
})
export { redisClient }
# config/redis.py
import redis
from typing import Optional
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
password=Optional[str] = None,
decode_responses=True
)
连接池
// config/redis-pool.ts
import { createClient } from 'redis'
const pool = createClient({
url: process.env.REDIS_URL || 'redis://localhost:6379',
socket: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
},
maxRetriesPerRequest: 3,
retryDelayOnFailover: 100,
lazyConnect: true,
})
export { pool }
# config/redis-pool.py
import redis
from redis.connection import ConnectionPool
pool = ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=50,
decode_responses=True
)
def get_redis_client():
return pool.get_connection()
数据结构
字符串
// 基本字符串操作
import { redisClient } from '../config/redis'
// 设置键
await redisClient.set('user:123', JSON.stringify({ name: 'John', email: 'john@example.com' }))
// 获取键
const user = await redisClient.get('user:123')
if (user) {
console.log(JSON.parse(user))
}
// 设置带过期(TTL 秒)
await redisClient.set('session:abc', JSON.stringify({ userId: 123 }), {
EX: 3600 // 1小时
})
// 设置带过期使用毫秒
await redisClient.set('temp:data', 'value', {
PX: 60000 // 60秒
})
// 获取剩余 TTL
const ttl = await redisClient.ttl('session:abc')
console.log(`会话在 ${ttl} 秒后过期`)
// 删除键
await redisClient.del('user:123')
# 基本字符串操作
import redis_client
# 设置键
await redis_client.set('user:123', '{"name": "John", "email": "john@example.com"}')
# 获取键
user_data = await redis_client.get('user:123')
if user_data:
import json
print(json.loads(user_data))
# 设置带过期(TTL 秒)
await redis_client.setex('session:abc', json.dumps({"user_id": 123}), 3600)
# 获取剩余 TTL
ttl = await redis_client.ttl('session:abc')
print(f'会话在 {ttl} 秒后过期')
# 删除键
await redis_client.delete('user:123')
哈希
// 哈希操作
import { redisClient } from '../config/redis'
// 设置哈希字段
await redisClient.hSet('user:123', {
name: 'John Doe',
email: 'john@example.com',
age: '30'
})
// 获取特定哈希字段
const name = await redisClient.hGet('user:123', 'name')
// 获取所有哈希字段
const user = await redisClient.hGetAll('user:123')
// 获取多个哈希字段
const fields = await redisClient.hMGet('user:123', ['name', 'email'])
// 设置多个哈希字段
await redisClient.hMSet('user:123', {
name: 'Jane Smith',
email: 'jane@example.com'
})
// 增加哈希字段
const newAge = await redisClient.hIncrBy('user:123', 'age', 1)
// 检查哈希字段是否存在
const hasName = await redisClient.hExists('user:123', 'name')
// 删除哈希字段
await redisClient.hDel('user:123', 'age')
// 获取哈希长度
const hashLength = await redisClient.hLen('user:123')
// 获取所有哈希键
const allUsers = await redisClient.keys('user:*')
# 哈希操作
import redis_client
# 设置哈希字段
await redis_client.hset('user:123', mapping={
'name': 'John Doe',
'email': 'john@example.com',
'age': '30'
})
# 获取特定哈希字段
name = await redis_client.hget('user:123', 'name')
# 获取所有哈希字段
user = await redis_client.hgetall('user:123')
# 获取多个哈希字段
fields = await redis_client.hmget('user:123', ['name', 'email'])
# 设置多个哈希字段
await redis_client.hmset('user:123', {
'name': 'Jane Smith',
'email': 'jane@example.com'
})
# 增加哈希字段
new_age = await redis_client.hincrby('user:123', 'age', 1)
# 检查哈希字段是否存在
has_name = await redis_client.hexists('user:123', 'name')
# 删除哈希字段
await redis_client.hdel('user:123', 'age')
# 获取哈希长度
hash_length = await redis_client.hlen('user:123')
# 获取所有哈希键
all_users = await redis_client.keys('user:*')
列表
// 列表操作
import { redisClient } from '../config/redis'
// 向列表添加(左推)
await redisClient.lPush('recent:items', JSON.stringify({ id: 1, name: 'Item 1' }))
// 向列表添加(右推)
await redisClient.rPush('recent:items', JSON.stringify({ id: 2, name: 'Item 2' }))
// 获取列表范围(0 到 -1 表示全部)
const items = await redisClient.lRange('recent:items', 0, -1)
// 获取列表范围分页
const page1 = await redisClient.lRange('recent:items', 0, 9)
const page2 = await redisClient.lRange('recent:items', 10, 19)
// 获取列表长度
const length = await redisClient.lLen('recent:items')
// 修剪列表到 N 项
await redisClient.lTrim('recent:items', 0, 99)
// 从列表中移除
await redisClient.lRem('recent:items', 0)
// 获取索引处的元素
const firstItem = await redisClient.lIndex('recent:items', 0)
# 列表操作
import redis_client
# 向列表添加(左推)
await redis_client.lpush('recent:items', json.dumps({"id": 1, "name": "Item 1"}))
# 向列表添加(右推)
await redis_client.rpush('recent:items', json.dumps({"id": 2, "name": "Item 2"}))
# 获取列表范围(0 到 -1 表示全部)
items = await redis_client.lrange('recent:items', 0, -1)
# 获取列表范围分页
page1 = await redis_client.lrange('recent:items', 0, 9)
page2 = await redis_client.lrange('recent:items', 10, 19)
# 获取列表长度
length = await redis_client.llen('recent:items')
# 修剪列表到 N 项
await redis_client.ltrim('recent:items', 0, 99)
# 从列表中移除
await redis_client.lrem('recent:items', 0, 1)
# 获取索引处的元素
first_item = await redis_client.lindex('recent:items', 0)
集合
// 集合操作
import { redisClient } from '../config/redis'
// 向集合添加
await redisClient.sAdd('user:123:followers', 'user456')
// 向集合添加多个
await redisClient.sAdd('user:123:followers', ['user456', 'user789', 'user012'])
// 检查成员是否存在
const isFollower = await redisClient.sIsMember('user:123:followers', 'user456')
// 获取所有成员
const followers = await redisClient.sMembers('user:123:followers')
// 获取随机成员
const randomFollower = await redisClient.sRandMember('user:123:followers')
// 从集合中移除
await redisClient.sRem('user:123:followers', 'user456')
// 获取集合大小
const followerCount = await redisClient.sCard('user:123:followers')
// 获取多个集合的并集
const allUsers = await redisClient.sUnion('user:123:followers', 'user:456:following')
# 集合操作
import redis_client
# 向集合添加
await redis_client.sadd('user:123:followers', 'user456')
# 向集合添加多个
await redis_client.sadd('user:123:followers', ['user456', 'user789', 'user012'])
# 检查成员是否存在
is_follower = await redis_client.sismember('user:123:followers', 'user456')
# 获取所有成员
followers = await redis_client.smembers('user:123:followers')
# 获取随机成员
random_follower = await redis_client.srandmember('user:123:followers')
# 从集合中移除
await redis_client.srem('user:123:followers', 'user456')
# 获取集合大小
follower_count = await redis_client.scard('user:123:followers')
# 获取多个集合的并集
all_users = await redis_client.sunion('user:123:followers', 'user:456:following')
有序集合
// 有序集合操作
import { redisClient } from '../config/redis'
// 向有序集合添加
await redisClient.zAdd('leaderboard:scores', JSON.stringify({ userId: 1, score: 100 }))
// 向有序集合添加多个带分数
await redisClient.zAdd('leaderboard:scores', [
JSON.stringify({ userId: 1, score: 100 }),
JSON.stringify({ userId: 2, score: 95 }),
JSON.stringify({ userId: 3, score: 90 })
])
// 按分数范围获取(升序)
const top10 = await redisClient.zRangeWithScores('leaderboard:scores', 0, 9, 'WITHSCORES')
// 按分数范围获取(降序)
const top10 = await redisClient.zRevRangeWithScores('leaderboard:scores', 0, 9, 'WITHSCORES')
// 获取成员的排名
const rank = await redis.zRevRank('leaderboard:scores', JSON.stringify({ userId: 1 }))
// 获取成员的分数
const score = await redis.zScore('leaderboard:scores', JSON.stringify({ userId: 1 }))
// 移除成员
await redis.zRem('leaderboard:scores', JSON.stringify({ userId: 1 }))
# 有序集合操作
import redis_client
# 向有序集合添加
await redis_client.zadd('leaderboard:scores', json.dumps({"user_id": 1, "score": 100}))
# 向有序集合添加多个带分数
await redis_client.zadd('leaderboard:scores', [
json.dumps({"user_id": 1, "score": 100}),
json.dumps({"user_id": 2, "score": 95}),
json.dumps({"user_id": 3, "score": 90})
])
# 按分数范围获取(升序)
top_10 = await redis_client.zrange('leaderboard:scores', 0, 9, withscores=True)
# 按分数范围获取(降序)
top_10 = await redis_client.zrevrange('leaderboard:scores', 0, 9, withscores=True)
# 获取成员的排名
rank = await redis_client.zrevrank('leaderboard:scores', json.dumps({"user_id": 1}))
# 获取成员的分数
score = await redis_client.zscore('leaderboard:scores', json.dumps({"user_id": 1}))
# 移除成员
await redis_client.zrem('leaderboard:scores', json.dumps({"user_id": 1}))
缓存策略
旁路缓存(懒加载)
// services/cache.service.ts
import { redisClient } from '../config/redis'
interface CacheOptions {
ttl?: number
}
export async function getFromCache<T>(
key: string,
options?: CacheOptions
): Promise<T | null> {
const cached = await redisClient.get(key)
if (cached) {
return JSON.parse(cached)
}
return null
}
export async function setCache<T>(
key: string,
data: T,
options?: CacheOptions
): Promise<void> {
const value = JSON.stringify(data)
if (options?.ttl) {
await redisClient.set(key, value, {
EX: options.ttl
})
} else {
await redisClient.set(key, value)
}
}
// 使用
async function getUser(userId: string) {
const cacheKey = `user:${userId}`
// 尝试从缓存中获取
const cached = await getFromCache<User>(cacheKey)
if (cached) {
return cached
}
// 从数据库获取
const user = await db.user.findUnique({ where: { id: userId } })
// 在缓存中设置 1 小时 TTL
await setCache(cacheKey, user, { ttl: 3600 })
return user
}
# services/cache.service.py
import redis_client
import json
async def get_from_cache(key: str, ttl: int = None) -> dict | None:
cached = await redis_client.get(key)
if cached:
return json.loads(cached)
return None
async def set_cache(key: str, data: any, ttl: int = None) -> None:
value = json.dumps(data)
if ttl:
await redis_client.setex(key, value, ttl)
else:
await redis_client.set(key, value)
async def get_user(user_id: str):
cache_key = f"user:{user_id}"
# 尝试从缓存中获取
cached = await get_from_cache(cache_key)
if cached:
return cached
# 从数据库获取
user = await db.user.find_unique({"id": user_id})
# 在缓存中设置 1 小时 TTL
await set_cache(cache_key, user, ttl=3600)
return user
写入时
// services/cache.service.ts
export async function updateWithCache<T>(
key: string,
updater: () => Promise<T>
): Promise<T> {
// 缓存和数据库原子性更新
const data = await updater()
// 在缓存中设置
await redisClient.set(key, JSON.stringify(data), {
EX: 3600
})
return data
}
// 使用
async function updateUser(userId: string, updates: Partial<User>) {
const cacheKey = `user:${userId}`
const updatedUser = await updateWithCache(cacheKey, async () => {
return await db.user.update({
where: { id: userId },
data: updates
})
})
return updatedUser
}
async def update_with_cache(key: str, updater: callable) -> dict:
# 缓存和数据库原子性更新
data = await updater()
# 在缓存中设置
await redis_client.set(key, json.dumps(data), ex=3600)
return data
async def update_user(user_id: str, updates: dict):
cache_key = f"user:{user_id}"
updated_user = await update_with_cache(cacheKey, lambda: db.user.update(
{"id": user_id},
updates=updates
))
return updated_user
写入后(异步)
// services/cache.service.ts
import { redisClient } from '../config/redis'
export async function setCacheAsync<T>(
key: string,
data: T,
ttl?: number
): Promise<void> {
const value = JSON.stringify(data)
// 在 Redis 中设置
await redisClient.set(key, value, {
EX: ttl || 3600
})
// 后台更新
await db.user.update({
where: { id: data.id },
data: updates
})
}
// 使用
async function createUser(userData: User) {
// 先在数据库中创建
const user = await db.user.create({ data: userData })
// 异步在缓存中设置
await setCacheAsync(`user:${user.id}`, user)
}
async def set_cache_async(key: str, data: dict, ttl: int = None) -> None:
value = json.dumps(data)
# 在 Redis 中设置
await redis_client.set(key, value, ex=ttl or 3600)
# 后台更新
await db.user.create(data=data)
async def create_user(user_data: dict):
# 先在数据库中创建
user = await db.user.create(data=user_data)
# 异步在缓存中设置
await set_cache_async(f"user:{user['id']}", user)
写入后(同步)
// services/cache.service.ts
export async function setCacheSync<T>(
key: string,
data: T,
ttl?: number
): Promise<void> {
const value = JSON.stringify(data)
// 更新数据库
await db.user.create({ data })
// 在 Redis 中同步设置
await redisClient.set(key, value, {
EX: ttl || 3600
})
}
// 使用
async function createAndCacheUser(userData: User) {
// 在数据库中创建
const user = await db.user.create({ data: userData })
// 在缓存中设置
await setCacheSync(`user:${user.id}`, user)
}
async def set_cache_sync(key: str, data: dict, ttl: int = None) -> None:
value = json.dumps(data)
# 在 Redis 中同步设置
await redis_client.set(key, value, ex=ttl or 3600)
键命名约定
命名模式
// 良好的命名约定
const keys = {
user: 'user:{userId}', // 用户数据
userSession: 'session:{sessionId}', // 用户会话
userList: 'users:page:{page}', // 用户列表分页
userCache: 'cache:user:{userId}', // 用户缓存
product: 'product:{productId}', // 产品数据
productCache: 'cache:product:{productId}', // 产品缓存
searchResults: 'search:{query}:page:{page}', // 搜索结果分页
leaderboard: 'leaderboard:category:{category}', // 按类别的排行榜
rateLimit: 'ratelimit:{ip}', // 每个 IP 的限流
session: 'session:{sessionId}:user:{userId}', // 用户会话数据
}
// 使用
const userKey = keys.user('123')
const sessionKey = keys.userSession('abc-123', '123')
键层次结构
// 键的层次结构
const keys = {
user: {
base: 'user',
profile: (userId: string) => `user:${userId}:profile`,
settings: (userId: string) => `user:${userId}:settings`,
posts: (userId: string, page: number) => `user:${userId}:posts:${page}`,
},
product: {
base: 'product',
details: (productId: string) => `product:${productId}:details`,
reviews: (productId: string) => `product:${productId}:reviews`,
cache: (productId: string) => `cache:product:${productId}`,
},
session: {
base: 'session',
user: (sessionId: string, userId: string) => `session:${sessionId}:user:${userId}`,
},
}
// 使用
const userProfileKey = keys.user.profile('123')
const productDetailsKey = keys.product.details('abc-456')
基于环境的键
const keys = {
development: {
user: (userId: string) => `dev:user:${userId}`,
session: (sessionId: string) => `dev:session:${sessionId}`,
},
production: {
user: (userId: string) => `prod:user:${userId}`,
session: (sessionId: string) => `prod:session:${sessionId}`,
},
}
const envPrefix = process.env.NODE_ENV === 'production' ? 'prod' : 'dev'
const userKey = keys[envPrefix].user('123')
# 键的层次结构
keys = {
'user': {
'base': 'user',
'profile': lambda user_id: f'user:{user_id}:profile',
'settings': lambda user_id: f'user:{user_id}:settings',
'posts': lambda user_id, page: f'user:{user_id}:posts:{page}',
},
'product': {
'base': 'product',
'details': lambda product_id: f'product:{product_id}:details',
'reviews': lambda product_id: f'product:{product_id}:reviews',
'cache': lambda product_id: f'cache:product:{product_id}',
},
'session': {
'base': 'session',
'user': lambda session_id, user_id: f'session:{session_id}:user:{user_id}',
},
}
# 使用
user_profile_key = keys['user']['profile']('123')
product_details_key = keys['product']['details']('abc-456')
TTL 管理
绝对 TTL
// services/cache.service.ts
export async function setWithTTL(
key: string,
data: any,
ttlSeconds: number
): Promise<void> {
await redisClient.set(key, JSON.stringify(data), {
EX: ttlSeconds
})
}
// 使用
await setWithTTL('user:123', userData, 3600) // 1小时
await setWithTTL('session:abc', sessionData, 1800) // 30分钟
await setWithTTL('temp:data', tempData, 60) // 1分钟
滑动 TTL(访问时刷新)
// services/cache.service.ts
export async function getWithRefresh(
key: string,
ttlSeconds: number
): Promise<any> {
const cached = await redisClient.get(key)
if (cached) {
// 访问时刷新 TTL
await redisClient.expire(key, ttlSeconds)
return JSON.parse(cached)
}
return null
}
// 使用
async function getUserWithRefresh(userId: string) {
const key = `user:${userId}`
const user = await getWithRefresh(key, 3600)
if (user) {
return user
}
// 从数据库获取
const dbUser = await db.user.findUnique({ where: { id: userId } })
await setWithTTL(key, dbUser, 3600)
return dbUser
}
async def get_with_refresh(key: str, ttl_seconds: int) -> dict | None:
cached = await redis_client.get(key)
if cached:
# 访问时刷新 TTL
await redis_client.expire(key, ttl_seconds)
return json.loads(cached)
return None
async def get_user_with_refresh(user_id: str):
key = f"user:{user_id}"
user = await get_with_refresh(key, 3600)
if user:
return user
# 从数据库获取
db_user = await db.user.find_unique({"id": user_id})
await set_with_ttl(key, db_user, 3600)
return db_user
基于数据的动态 TTL
// services/cache.service.ts
export async function setDynamicTTL(
key: string,
data: any
): Promise<void> {
let ttl: number
// 根据数据类型计算 TTL
if (data.type === 'user') {
ttl = 3600 // 1小时
} else if (data.type === 'session') {
ttl = 1800 // 30分钟
} else if (data.type === 'temp') {
ttl = 60 // 1分钟
} else {
ttl = 600 // 10分钟
}
await redisClient.set(key, JSON.stringify(data), {
EX: ttl
})
}
缓存失效
简单失效
// services/cache.service.ts
export async function invalidateUser(userId: string): Promise<void> {
// 删除用户缓存
await redisClient.del(`user:${userId}`)
// 删除与用户相关的缓存
const keys = await redisClient.keys(`user:${userId}:*`)
if (keys.length > 0) {
await redisClient.del(keys)
}
}
// 使用
async function updateUser(userId: string, updates: Partial<User>) {
// 在数据库中更新
const user = await db.user.update({
where: { id: userId },
data: updates
})
// 使缓存失效
await invalidateUser(userId)
return user
}
基于模式的失效
// services/cache.service.ts
export async function invalidatePattern(pattern: string): Promise<number> {
const keys = await redisClient.keys(pattern)
if (keys.length > 0) {
await redisClient.del(keys)
}
return keys.length
}
// 使用
async function invalidateAllUserCaches() {
await invalidatePattern('user:*')
}
async function invalidateAllProductCaches(productId: string) {
await invalidatePattern(`product:${productId}:*`)
}
基于标签的失效
// services/cache.service.ts
export async function invalidateTag(tag: string): Promise<number> {
// 使所有带标签的键失效
const keys = await redisClient.keys(`*:${tag}`)
if (keys.length > 0) {
await redisClient.del(keys)
}
return keys.length
}
// 使用
async function invalidateByTags(tags: string[]) {
for (const tag of tags) {
await invalidateTag(tag)
}
}
基于哈希的失效
// services/cache.service.ts
export async function invalidateUserHash(userId: string): Promise<void> {
// 删除用户哈希
await redisClient.del(`user:${userId}`)
// 删除用户会话
const sessionKeys = await redisClient.keys(`session:*:user:${userId}`)
if (sessionKeys.length > 0) {
await redisClient.del(sessionKeys)
}
}
# 基于哈希的失效
async def invalidate_user_hash(user_id: str) -> None:
# 删除用户哈希
await redis_client.delete(f"user:{user_id}")
# 删除用户会话
session_keys = await redis_client.keys(f"session:*:user:{user_id}")
if session_keys:
await redis_client.delete(*session_keys)
分布式缓存
Redis 集群
// config/redis-cluster.ts
import { createCluster } from 'redis'
const cluster = createCluster({
rootNodes: [
{ host: 'redis-1.example.com', port: 6379 },
{ host: 'redis-2.example.com', port: 6379 },
{ host: 'redis-3.example.com', port: 6379 },
],
readonlyMode: 'slave',
})
export { cluster }
客户端分片
// services/cache.service.ts
export function getShardKey(key: string, userId?: string): string {
// 按用户 ID 分片用户特定数据
if (userId) {
return `${userId}:${key}`
}
// 按键前缀分片
const shard = Math.abs(key.split(':')[0].charCodeAt(0) % 4)
return `shard:${shard}:${key}`
}
// 使用
const userKey = getShardKey('user:123', '456')
const sessionKey = getShardKey('session:abc', '123')
发布/订阅模式
简单发布者
// services/pubsub.service.ts
import { redisClient } from '../config/redis'
export async function publish(channel: string, message: any): Promise<number> {
return await redisClient.publish(channel, JSON.stringify(message))
}
// 使用
await publish('notifications', { type: 'user.created', userId: '123' })
简单订阅者
// services/pubsub.service.ts
import { redisClient } from '../config/redis'
export async function subscribe(
channel: string,
callback: (message: any) => void
): Promise<void> {
const subscriber = redisClient.duplicate()
await subscriber.subscribe(channel, (message) => {
const data = JSON.parse(message)
callback(data)
})
}
// 使用
await subscribe('notifications', (data) => {
console.log('收到通知:', data)
})
基于模式的订阅
// services/pubsub.service.ts
export async function subscribeToUserNotifications(
userId: string,
callback: (message: any) => void
): Promise<void> {
const channel = `notifications:user:${userId}`
await subscribe(channel, callback)
}
export async function subscribeToGlobalNotifications(
callback: (message: any) => void
): Promise<void> {
const channel = 'notifications:global'
await subscribe(channel, callback)
}
消息队列(发布/订阅)
// services/queue.service.ts
export async function addToQueue(
queueName: string,
item: any
): Promise<number> {
return await redisClient.lPush(queueName, JSON.stringify(item))
}
export async function processQueue(
queueName: string,
processor: (item: any) => Promise<void>
): Promise<void> {
while (true) {
const result = await redisClient.brPop(queueName)
if (!result) {
break
}
const item = JSON.parse(result)
await processor(item)
}
}
// 使用
await addToQueue('email:queue', { to: 'user@example.com', subject: 'Hello', body: 'Message' })
会话存储
会话管理
// services/session.service.ts
import { redisClient } from '../config/redis'
export async function createSession(
userId: string,
userData: any,
ttlSeconds: number = 3600
): Promise<string> {
const sessionId = generateSessionId()
const sessionKey = `session:${sessionId}`
const sessionData = {
userId,
userData,
createdAt: new Date().toISOString(),
lastActivity: new Date().toISOString(),
}
// 在哈希中存储会话数据
await redisClient.hSet(sessionKey, sessionData)
// 设置 TTL
await redisClient.expire(sessionKey, ttlSeconds)
return sessionId
}
export async function getSession(sessionId: string): Promise<any | null> {
const sessionKey = `session:${sessionId}`
const sessionData = await redisClient.hGetAll(sessionKey)
if (!sessionData) {
return null
}
return sessionData
}
export async function updateSessionActivity(sessionId: string): Promise<void> {
const sessionKey = `session:${sessionId}`
await redisClient.hSet(sessionKey, {
lastActivity: new Date().toISOString()
})
}
export async function deleteSession(sessionId: string): Promise<void> {
const sessionKey = `session:${sessionId}`
await redisClient.del(sessionKey)
}
function generateSessionId(): string {
return Date.now().toString(36) + Math.random().toString(36).slice(2)
}
# services/session.service.py
import redis_client
import json
from datetime import datetime, timedelta
import uuid
def create_session(user_id: str, user_data: dict, ttl_seconds: int = 3600) -> str:
session_id = str(uuid.uuid4())
session_key = f"session:{session_id}"
session_data = {
"user_id": user_id,
"user_data": user_data,
"created_at": datetime.utcnow().isoformat(),
"last_activity": datetime.utcnow().isoformat(),
}
# 在哈希中存储会话数据
await redis_client.hset(session_key, session_data)
# 设置 TTL
await redis_client.expire(session_key, ttl_seconds)
return session_id
async def get_session(session_id: str) -> dict | None:
session_key = f"session:{session_id}"
session_data = await redis_client.hgetall(session_key)
if not session_data:
return None
return session_data
async def update_session_activity(session_id: str) -> None:
session_key = f"session:{session_id}"
await redis_client.hset(session_key, {
"last_activity": datetime.utcnow().isoformat()
})
async def delete_session(session_id: str) -> None:
session_key = f"session:{session_id}"
await redis_client.delete(session_key)
多设备会话
// services/session.service.ts
export async function createSessionForDevice(
userId: string,
deviceId: string,
userData: any
): Promise<string> {
const sessionId = generateSessionId()
const sessionKey = `session:${sessionId}:device:${deviceId}`
const sessionData = {
userId,
deviceId,
userData,
createdAt: new Date().toISOString(),
lastActivity: new Date().toISOString(),
}
await redisClient.hSet(sessionKey, sessionData)
await redisClient.expire(sessionKey, 3600)
return sessionId
}
export async function getUserSessions(userId: string): Promise<string[]> {
const pattern = `session:*:user:${userId}`
const keys = await redisClient.keys(pattern)
return keys
}
使用 Redis 进行限流
固定窗口限流器
// services/ratelimit.service.ts
import { redisClient } from '../config/redis'
export async function checkRateLimit(
identifier: string,
limit: number,
windowMs: number
): Promise<boolean> {
const key = `ratelimit:${identifier}`
const current = await redisClient.incr(key)
if (current > limit) {
return false
}
// 为窗口设置过期时间
await redisClient.expire(key, windowMs / 1000)
return true
}
export async function resetRateLimit(identifier: string): Promise<void> {
await redisClient.del(`ratelimit:${identifier}`)
}
// 使用
async function rateLimiter(
identifier: string,
limit: number = 10,
windowMs: number = 60000 // 1分钟
): Promise<void> {
const allowed = await checkRateLimit(identifier, limit, windowMs)
if (!allowed) {
throw new Error('超出频率限制')
}
// 继续操作
}
# services/ratelimit.service.py
import redis_client
async def check_rate_limit(identifier: str, limit: int, window_ms: int = 60000) -> bool:
key = f"ratelimit:{identifier}"
current = await redis_client.incr(key)
if current > limit:
return False
# 为窗口设置过期时间
await redis_client.expire(key, window_ms / 1000)
return True
async def reset_rate_limit(identifier: str) -> None:
await redis_client.delete(f"ratelimit:{identifier}")
async def rate_limiter(identifier: str, limit: int = 10, window_ms: int = 60000):
allowed = await check_rate_limit(identifier, limit, window_ms)
if not allowed:
raise Exception('超出频率限制')
# 继续操作
滑动窗口限流器
// services/ratelimit.service.ts
export async function slidingWindowRateLimit(
identifier: string,
limit: number,
windowMs: number
): Promise<void> {
const key = `ratelimit:${identifier}`
// 获取当前计数
const current = await redisClient.incr(key)
if (current > limit) {
throw new Error('超出频率限制')
}
// 移除窗口外的旧条目
const now = Date.now()
const oldest = await redisClient.zRangeByScore(
key,
0,
Date.now() - windowMs,
Date.now(),
'WITHSCORES'
)
if (oldest.length > 0) {
await redisClient.zRemRangeByScore(
key,
oldest[0].score,
oldest[oldest.length - 1].score,
oldest[oldest.length - 1].member
)
}
}
// 使用
async function apiRateLimiter(
identifier: string,
limit: number = 100,
windowMs: number = 60000
): Promise<void> {
await slidingWindowRateLimit(identifier, limit, windowMs)
// 继续 API 调用
}
async def sliding_window_rate_limit(
identifier: str,
limit: int = 100,
window_ms: int = 60000
) -> None:
key = f"ratelimit:{identifier}"
# 获取当前计数
current = await redis_client.incr(key)
if current > limit:
raise Exception('超出频率限制')
# 移除窗口外的旧条目
now = int(time.time() * 1000)
oldest = await redis_client.zrangebyscore(
key,
0,
now - window_ms,
now,
withscores=True
)
if oldest:
await redis_client.zremrangebyscore(
key,
oldest[0].score,
oldest[oldest.length - 1].score,
oldest[oldest.length - 1].member
)
最佳实践
-
使用适当的数据结构
- 使用哈希存储相关字段
- 使用集合存储唯一集合
- 使用有序集合存储排名数据
- 使用列表存储有序序列
-
始终设置 TTL
- 缓存数据应始终设置过期时间
- 根据数据的易变性使用适当的 TTL
- 考虑为频繁访问的数据使用滑动 TTL
-
使用压缩
- 在缓存之前压缩大量数据
- 使用高效的序列化格式
- 考虑为性能使用二进制格式
-
处理连接错误
- 实施适当的错误处理
- 使用连接池
- 设置适当的重试策略
- 监控连接健康状态
-
监控 Redis 性能
- 跟踪内存使用情况
- 监控命中率/未命中率
- 观察慢操作
- 设置关键指标的警报
-
使用管道批量操作
- 将多个操作一起批处理
- 减少网络往返次数
- 提高整体性能
-
使用适当的序列化
- 为复杂对象使用 JSON
- 考虑为高性能使用协议缓冲区
- 避免对结构化数据使用字符串连接
-
键命名
- 使用层次结构命名约定
- 包括环境前缀
- 使用一致的模式
- 避免过长的键
-
缓存失效
- 实施适当的失效策略
- 谨慎使用基于模式的失效
- 考虑为复杂场景使用基于标签的失效
-
安全
- 在生产中使用认证
- 加密敏感数据
- 使用 TLS 进行网络连接
- 遵循最小权限原则