name: recommendation-system description: 部署生产级推荐系统,包括特征存储、缓存和A/B测试。适用于个性化API、低延迟服务,或解决缓存失效、实验跟踪、质量监控问题。 keywords: 推荐系统, 个性化, 特征存储, 模型服务, 缓存策略, Redis, A/B测试, Thompson采样, 推荐指标, CTR, 转化率, 目录覆盖度, 多样性, Prometheus监控, 推荐API, 实时推荐, 协同过滤集成, 生产推荐, 实验跟踪 license: MIT
推荐系统
生产就绪的架构,用于可扩展的推荐系统,包括特征存储、多层缓存、A/B测试和全面监控。
何时使用此技能
加载此技能时:
- 构建推荐API:大规模提供个性化推荐
- 实现缓存:多层缓存以实现亚毫秒延迟
- 运行A/B测试:实验推荐算法
- 监控质量:跟踪CTR、转化率、多样性、覆盖度
- 优化性能:降低延迟、增加吞吐量
- 特征工程:使用特征存储管理用户/物品特征
快速开始:5步创建推荐API
# 1. 安装依赖
pip install fastapi==0.109.0 redis==5.0.0 prometheus-client==0.19.0
# 2. 启动Redis(用于缓存和特征存储)
docker run -d -p 6379:6379 redis:alpine
# 3. 创建推荐服务:app.py
cat > app.py << 'EOF'
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import redis
import json
app = FastAPI()
cache = redis.Redis(host='localhost', port=6379, decode_responses=True)
class RecommendationResponse(BaseModel):
user_id: str
items: List[str]
cached: bool
@app.post("/recommendations", response_model=RecommendationResponse)
async def get_recommendations(user_id: str, n: int = 10):
# 检查缓存
cache_key = f"recs:{user_id}:{n}"
cached = cache.get(cache_key)
if cached:
return RecommendationResponse(
user_id=user_id,
items=json.loads(cached),
cached=True
)
# 生成推荐(简化)
items = [f"item_{i}" for i in range(n)]
# 缓存5分钟
cache.setex(cache_key, 300, json.dumps(items))
return RecommendationResponse(
user_id=user_id,
items=items,
cached=False
)
@app.get("/health")
async def health():
return {"status": "healthy"}
EOF
# 4. 运行API
uvicorn app:app --host 0.0.0.0 --port 8000
# 5. 测试
curl -X POST "http://localhost:8000/recommendations?user_id=user_123&n=10"
结果:在5分钟内创建具有缓存的推荐API。
系统架构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 用户事件 │────▶│ 特征 │────▶│ 模型 │
│ (点击, │ │ 存储 │ │ 服务 │
│ 购买) │ │ (Redis) │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ 训练 │ │ API │
│ 管道 │ │ (FastAPI) │
└─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ 监控 │
│ (Prometheus)│
└─────────────┘
核心组件
1. 特征存储
集中存储用户和物品特征:
import redis
import json
class FeatureStore:
"""使用Redis缓存快速访问特征。"""
def __init__(self, redis_client):
self.redis = redis_client
self.ttl = 3600 # 1小时
def get_user_features(self, user_id: str) -> dict:
cache_key = f"user_features:{user_id}"
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# 从数据库获取
features = fetch_from_db(user_id)
# 缓存
self.redis.setex(cache_key, self.ttl, json.dumps(features))
return features
2. 模型服务
服务多个模型进行A/B测试:
class ModelServing:
"""服务多个推荐模型。"""
def __init__(self):
self.models = {}
def register_model(self, name: str, model, is_default: bool = False):
self.models[name] = model
if is_default:
self.default_model = name
def predict(self, user_features: dict, item_features: list, model_name: str = None):
model = self.models.get(model_name or self.default_model)
return model.predict(user_features, item_features)
3. 缓存层
多层缓存以实现低延迟:
class TieredCache:
"""L1(内存) -> L2(Redis) -> L3(数据库)。"""
def __init__(self, redis_client):
self.l1_cache = {} # 内存中
self.redis = redis_client # L2
def get(self, key: str):
# L1:内存(最快)
if key in self.l1_cache:
return self.l1_cache[key]
# L2:Redis
cached = self.redis.get(key)
if cached:
value = json.loads(cached)
self.l1_cache[key] = value # 提升到L1
return value
# L3:未命中(从数据库获取)
return None
关键指标
| 指标 | 描述 | 目标 |
|---|---|---|
| CTR | 点击率 | >5% |
| 转化率 | 从推荐产生的购买 | >2% |
| P95延迟 | 第95百分位响应时间 | <200ms |
| 缓存命中率 | 从缓存服务的百分比 | >80% |
| 覆盖度 | 推荐目录的百分比 | >50% |
| 多样性 | 推荐中的多样性 | >0.7 |
已知问题预防
1. 新用户冷启动
问题:无历史记录的用户没有推荐,初始体验差。
解决方案:使用基于流行度的回退:
def get_recommendations(user_id: str, n: int = 10):
user_features = feature_store.get_user_features(user_id)
# 检查是否为新用户(无购买历史)
if user_features.get('total_purchases', 0) == 0:
# 回退到流行物品
return get_popular_items(n)
# 个性化推荐
return generate_personalized_recs(user_id, n)
2. 用户操作时的缓存失效
问题:用户购买后,缓存仍显示已购物品在推荐中。
解决方案:在相关操作时使缓存失效:
INVALIDATING_ACTIONS = {'purchase', 'rating', 'add_to_cart'}
def on_user_action(user_id: str, action: str):
if action in INVALIDATING_ACTIONS:
cache_key = f"recs:{user_id}:*"
redis_client.delete(cache_key)
logger.info(f"因{action}使{user_id}的缓存失效")
3. 缓存到期时的惊群效应
问题:多个用户缓存同时到期,超载数据库/模型。
解决方案:为TTL添加随机抖动:
import random
def set_cache(key: str, value: dict, base_ttl: int = 300):
# 添加±10%抖动
jitter = random.uniform(-0.1, 0.1) * base_ttl
ttl = int(base_ttl + jitter)
redis_client.setex(key, ttl, json.dumps(value))
4. 低多样性 = 过滤泡泡
问题:推荐太相似,用户只看到同一类别。
解决方案:实施多样性约束:
def rank_with_diversity(items: list, scores: list, n: int = 10):
selected = []
category_counts = {}
for item, score in sorted(zip(items, scores), key=lambda x: -x[1]):
category = item['category']
# 每个类别限制3个物品
if category_counts.get(category, 0) >= 3:
continue
selected.append(item)
category_counts[category] = category_counts.get(category, 0) + 1
if len(selected) >= n:
break
return selected
5. 无监控 = 无声退化
问题:推荐质量下降,直到用户投诉才被发现。
解决方案:使用警报进行持续监控:
from prometheus_client import Counter, Histogram
recommendation_clicks = Counter('recommendation_clicks_total')
recommendation_latency = Histogram('recommendation_latency_seconds')
@app.post("/recommendations")
async def get_recommendations(user_id: str):
start = time.time()
recs = generate_recs(user_id)
latency = time.time() - start
recommendation_latency.observe(latency)
return recs
@app.post("/track/click")
async def track_click(user_id: str, item_id: str):
recommendation_clicks.inc()
# 如果CTR低于3%则警报
6. 过时特征 = 过时推荐
问题:用户偏好改变但特征未更新,推荐不相关。
解决方案:设置适当的TTL和更新触发器:
class FeatureStore:
def __init__(self, redis_client):
self.redis = redis_client
# 频繁变化特征的较短TTL
self.user_ttl = 300 # 5分钟
self.item_ttl = 3600 # 1小时
def update_on_event(self, user_id: str, event: str):
# 在重要事件时失效
if event in ['purchase', 'rating']:
self.redis.delete(f"user_features:{user_id}")
logger.info(f"更新了{user_id}的特征")
7. A/B测试样本量太小
问题:过早宣布赢家,结果无统计显著性。
解决方案:先计算所需样本量:
def calculate_sample_size(
baseline_rate: float,
min_detectable_effect: float,
alpha: float = 0.05,
power: float = 0.8
) -> int:
"""计算每个变体所需的样本量。"""
from scipy import stats
z_alpha = stats.norm.ppf(1 - alpha/2)
z_beta = stats.norm.ppf(power)
p1 = baseline_rate
p2 = baseline_rate * (1 + min_detectable_effect)
p_avg = (p1 + p2) / 2
n = (
(z_alpha + z_beta)**2 * 2 * p_avg * (1 - p_avg) /
(p2 - p1)**2
)
return int(n)
# 示例:检测基线CTR=5%的10%提升
n_required = calculate_sample_size(
baseline_rate=0.05,
min_detectable_effect=0.10
)
print(f"所需样本量:每个变体{n_required}")
# 在得出结论前等待两个变体达到此大小
何时加载参考文件
为详细生产实现加载参考文件:
-
生产架构:加载
references/production-architecture.md以获取完整的特征存储、模型服务和推荐服务实现,包括批量获取、缓存集成和FastAPI部署模式。 -
缓存策略:加载
references/caching-strategies.md当实施多层缓存(L1/L2/L3)、缓存预热、失效策略、概率刷新或惊群效应预防时。 -
A/B测试框架:加载
references/ab-testing-framework.md以获取确定性变体分配、Thompson采样(多臂老虎机)、贝叶斯和频率显著性测试以及实验跟踪。 -
监控与警报:加载
references/monitoring-alerting.md以获取Prometheus指标集成、仪表板端点、警报规则和质量监控(多样性、覆盖度)。
最佳实践
- 特征预计算:离线计算特征,从缓存服务
- 批量获取:使用Redis MGET获取多个用户/物品
- 积极缓存:用户推荐设置5-15分钟TTL
- 优雅降级:如果个性化失败,返回流行物品
- 监控一切:跟踪CTR、延迟、多样性、覆盖度
- 持续A/B测试:始终实验新算法
- 多样性约束:确保多样化推荐
- 解释推荐:提供理由(“高评分”、“流行”)
常见模式
推荐服务
class RecommendationService:
def __init__(self, feature_store, model_serving, cache):
self.feature_store = feature_store
self.model_serving = model_serving
self.cache = cache
def get_recommendations(self, user_id: str, n: int = 10):
# 1. 检查缓存
cached = self.cache.get(f"recs:{user_id}:{n}")
if cached:
return cached
# 2. 获取特征
user_features = self.feature_store.get_user_features(user_id)
candidates = self.get_candidates(user_id)
# 3. 评分候选
scores = self.model_serving.predict(user_features, candidates)
# 4. 带多样性排名
recommendations = self.rank_with_diversity(candidates, scores, n)
# 5. 缓存
self.cache.set(f"recs:{user_id}:{n}", recommendations, ttl=300)
return recommendations
A/B测试
def assign_variant(user_id: str, experiment_id: str) -> str:
"""确定性分配 - 同一用户始终获得同一变体。"""
import hashlib
hash_input = f"{user_id}:{experiment_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
# 50/50分割
return 'control' if hash_value % 2 == 0 else 'treatment'
# 使用
variant = assign_variant('user_123', 'rec_algo_v2')
model_name = 'main' if variant == 'control' else 'experimental'
recs = get_recommendations(user_id, model_name=model_name)
监控
from prometheus_client import Counter, Histogram
requests_total = Counter('recommendation_requests_total', ['status'])
latency_seconds = Histogram('recommendation_latency_seconds')
@app.post("/recommendations")
async def get_recommendations(user_id: str):
with latency_seconds.time():
try:
recs = generate_recs(user_id)
requests_total.labels(status='success').inc()
return recs
except Exception as e:
requests_total.labels(status='error').inc()
raise