name: distributed-caching description: 使用Redis和Memcached进行分布式缓存设计、实施和优化的专家技能。设计缓存架构、配置淘汰策略、实现缓存模式(旁路缓存、直写、后写)、监控缓存性能并优化内存使用。 allowed-tools: Bash(*) Read Write Edit Glob Grep WebFetch metadata: author: babysitter-sdk version: “1.0.0” category: caching backlog-id: SK-010
distributed-caching
您是 distributed-caching - 专注于分布式缓存架构和优化的专业技能。此技能提供使用Redis、Memcached及相关技术设计、实施和维护高性能缓存层的专家能力。
概述
此技能支持AI驱动的缓存操作,包括:
- 设计Redis数据结构和访问模式
- 配置Redis Cluster和Sentinel以实现高可用性
- 实现缓存模式(旁路缓存、直写、后写)
- 配置淘汰策略(LRU、LFU、基于TTL)
- 监控缓存命中率和内存使用情况
- 调试缓存失效问题
- 优化内存效率
前提条件
- Redis 6.0+(推荐7.0+以使用高级功能)
- 或 Memcached 1.6+
- redis-cli和memcached实用程序
- 可选:Redis Stack用于JSON、搜索和时间序列
- 可选:Redis Enterprise用于生产部署
能力
1. Redis数据结构设计
为用例设计最优数据结构:
# String - 简单键值缓存
SET user:1001:profile '{"name":"John","email":"john@example.com"}' EX 3600
GET user:1001:profile
# Hash - 支持部分更新的结构化数据
HSET product:5001 name "Widget" price 29.99 stock 150
HGET product:5001 price
HINCRBY product:5001 stock -1
# Sorted Set - 排行榜和排名
ZADD leaderboard 1500 "player:1" 2200 "player:2" 1800 "player:3"
ZREVRANGE leaderboard 0 9 WITHSCORES # 前10名
ZRANK leaderboard "player:1"
# List - 消息队列和活动流
LPUSH notifications:user:1001 '{"type":"order","id":"ord-123"}'
LRANGE notifications:user:1001 0 19 # 最新20条
LTRIM notifications:user:1001 0 99 # 仅保留100条
# Set - 标签、独立访客、关系
SADD product:5001:tags "electronics" "sale" "featured"
SINTER user:1001:interests product:5001:tags # 共同兴趣
# HyperLogLog - 基数估计
PFADD daily:visitors:20260124 "user:1001" "user:1002" "guest:abc"
PFCOUNT daily:visitors:20260124
# Stream - 事件溯源和消息流
XADD orders * action "created" order_id "ord-123" total "99.99"
XREAD COUNT 10 STREAMS orders 0
XGROUP CREATE orders order-processors $ MKSTREAM
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders >
2. 缓存模式实现
实现常见缓存模式:
import redis
import json
from functools import wraps
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 旁路缓存模式(延迟加载)
def get_user(user_id):
cache_key = f"user:{user_id}"
# 先尝试缓存
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 缓存未命中 - 从数据库获取
user = database.get_user(user_id)
# 使用TTL填充缓存
r.setex(cache_key, 3600, json.dumps(user))
return user
# 直写模式
def update_user(user_id, data):
cache_key = f"user:{user_id}"
# 先更新数据库
database.update_user(user_id, data)
# 立即更新缓存
r.setex(cache_key, 3600, json.dumps(data))
return data
# 后写(写回)模式
def update_user_async(user_id, data):
cache_key = f"user:{user_id}"
# 立即更新缓存
r.setex(cache_key, 3600, json.dumps(data))
# 队列化数据库写入
r.lpush("write_queue", json.dumps({
"operation": "update_user",
"user_id": user_id,
"data": data,
"timestamp": time.time()
}))
# 使用旁路缓存装饰器实现读穿透
def cached(ttl=3600, prefix="cache"):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 从函数和参数生成缓存键
key = f"{prefix}:{func.__name__}:{hash(str(args) + str(kwargs))}"
cached_value = r.get(key)
if cached_value:
return json.loads(cached_value)
result = func(*args, **kwargs)
r.setex(key, ttl, json.dumps(result))
return result
return wrapper
return decorator
@cached(ttl=300, prefix="products")
def get_product_recommendations(user_id, category):
return recommendation_service.get_recommendations(user_id, category)
3. 缓存失效策略
实现健壮的缓存失效:
# 基于时间的失效(TTL)
r.setex("session:abc123", 1800, session_data) # 30分钟
# 事件驱动失效
def on_user_updated(user_id):
# 删除特定缓存条目
r.delete(f"user:{user_id}")
r.delete(f"user:{user_id}:profile")
# 删除模式匹配的键(谨慎使用)
keys = r.keys(f"user:{user_id}:*")
if keys:
r.delete(*keys)
# 基于标签的失效
def set_with_tags(key, value, ttl, tags):
pipe = r.pipeline()
pipe.setex(key, ttl, value)
for tag in tags:
pipe.sadd(f"tag:{tag}", key)
pipe.execute()
def invalidate_by_tag(tag):
keys = r.smembers(f"tag:{tag}")
if keys:
pipe = r.pipeline()
pipe.delete(*keys)
pipe.delete(f"tag:{tag}")
pipe.execute()
# 基于版本的失效
def get_with_version(key, version_key):
version = r.get(version_key) or "1"
versioned_key = f"{key}:v{version}"
return r.get(versioned_key)
def invalidate_version(version_key):
r.incr(version_key) # 递增版本,旧键自然过期
4. Redis集群配置
配置Redis集群以实现可扩展性:
# redis-cluster.conf
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 5000
appendonly yes
appendfsync everysec
# 内存管理
maxmemory 4gb
maxmemory-policy allkeys-lru
# 持久化
save 900 1
save 300 10
save 60 10000
# 复制
replica-read-only yes
min-replicas-to-write 1
min-replicas-max-lag 10
# 创建集群
redis-cli --cluster create \
127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
--cluster-replicas 1
# 检查集群状态
redis-cli -c -p 7000 cluster info
redis-cli -c -p 7000 cluster nodes
# 重新平衡槽
redis-cli --cluster rebalance 127.0.0.1:7000
5. Redis Sentinel高可用性配置
配置Sentinel以实现自动故障转移:
# sentinel.conf
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel auth-pass mymaster <password>
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1
# 通知脚本
sentinel notification-script mymaster /opt/redis/notify.sh
sentinel client-reconfig-script mymaster /opt/redis/reconfig.sh
# 使用Sentinel的Python客户端
from redis.sentinel import Sentinel
sentinel = Sentinel([
('sentinel1.example.com', 26379),
('sentinel2.example.com', 26379),
('sentinel3.example.com', 26379)
], socket_timeout=0.1)
# 获取主节点
master = sentinel.master_for('mymaster', socket_timeout=0.1)
master.set('key', 'value')
# 获取副本用于读取
replica = sentinel.slave_for('mymaster', socket_timeout=0.1)
value = replica.get('key')
6. 淘汰策略配置
配置最优淘汰策略:
# LRU - 最近最少使用(通用目的)
maxmemory-policy allkeys-lru
# LFU - 最不经常使用(热点数据场景)
maxmemory-policy allkeys-lfu
lfu-log-factor 10
lfu-decay-time 1
# Volatile - 仅淘汰具有TTL的键
maxmemory-policy volatile-lru
maxmemory-policy volatile-lfu
maxmemory-policy volatile-ttl
# 不淘汰 - 内存满时返回错误
maxmemory-policy noeviction
7. 缓存性能监控
监控缓存健康状况和性能:
# Redis INFO命令
redis-cli INFO stats
redis-cli INFO memory
redis-cli INFO replication
redis-cli INFO clients
# 要监控的关键指标
# - hit_rate: keyspace_hits / (keyspace_hits + keyspace_misses)
# - memory_usage: used_memory / maxmemory
# - evicted_keys: 被淘汰的键数量
# - connected_clients: 当前客户端连接数
# - blocked_clients: 等待阻塞操作的客户端
# 计算缓存命中率
info = r.info('stats')
hits = info['keyspace_hits']
misses = info['keyspace_misses']
hit_rate = hits / (hits + misses) * 100 if (hits + misses) > 0 else 0
print(f"缓存命中率: {hit_rate:.2f}%")
# 内存分析
memory_info = r.info('memory')
print(f"已用内存: {memory_info['used_memory_human']}")
print(f"峰值内存: {memory_info['used_memory_peak_human']}")
print(f"碎片化比率: {memory_info['mem_fragmentation_ratio']}")
MCP服务器集成
此技能可以利用以下MCP服务器:
| 服务器 | 描述 | 安装 |
|---|---|---|
| mcp-redis(官方) | Redis数据管理 | GitHub |
| Redis Cloud Admin API | 云Redis管理 | 参见Redis文档 |
最佳实践
缓存设计
- 键命名约定 - 使用一致、分层的命名(例如,
entity:id:attribute) - TTL策略 - 始终设置TTL以防止无限制增长
- 序列化 - 使用高效格式(MessagePack、Protocol Buffers)
- 热键处理 - 分片热键或使用本地缓存
数据一致性
- 读取使用旁路缓存 - 对大多数用例最安全的模式
- 写入使用直写 - 当一致性至关重要时
- 最终一致性 - 为性能接受数据延迟
- 版本标记 - 跟踪数据版本以进行失效
性能
- 管道命令 - 批量多个操作
- 连接池 - 重用连接
- 避免大键 - 保持值小于100KB
- 使用适当的数据结构 - 对部分更新使用哈希而非JSON字符串
流程集成
此技能与以下流程集成:
caching-strategy-design.js- 缓存架构规划- 应用级缓存优化工作流
- 性能调优建议
输出格式
执行操作时,提供结构化输出:
{
"operation": "analyze-cache",
"status": "success",
"metrics": {
"hitRate": 94.5,
"missRate": 5.5,
"evictionRate": 0.02,
"memoryUsage": {
"used": "3.2GB",
"peak": "3.8GB",
"maxmemory": "4GB",
"utilizationPercent": 80
},
"connections": {
"current": 45,
"blocked": 0,
"maxClients": 10000
}
},
"recommendations": [
{
"category": "memory",
"issue": "内存利用率高",
"action": "考虑增加maxmemory或启用LFU淘汰策略",
"priority": "medium"
}
]
}
错误处理
常见问题
| 错误 | 原因 | 解决方案 |
|---|---|---|
OOM command not allowed |
达到内存限制 | 增加maxmemory或启用淘汰策略 |
CLUSTERDOWN |
集群不可用 | 检查集群健康状态,多数节点 |
MOVED |
键在不同节点 | 使用集群感知客户端 |
BUSY |
Lua脚本正在运行 | 等待或使用SCRIPT KILL终止脚本 |
LOADING |
Redis正在从磁盘加载 | 等待加载完成 |
约束
- 监控内存使用以防止OOM情况
- 在应用程序中使用连接池
- 为缓存不可用实现断路器
- 彻底测试缓存失效
- 考虑缓存雪崩预防