推荐系统部署技能 recommendation-system

该技能用于构建和部署生产级推荐系统,提供完整的架构方案,包括特征存储、多层缓存、A/B测试和全面监控。适用于开发个性化推荐API、优化性能、处理缓存失效和多样性问题,以及进行实验跟踪和质量评估。关键词:推荐系统、特征存储、缓存策略、A/B测试、Prometheus监控、实时推荐、机器学习模型、个性化API、Redis、CTR优化。

AI应用 0 次安装 0 次浏览 更新于 3/7/2026

name: recommendation-system description: 部署生产级推荐系统,包括特征存储、缓存和A/B测试。适用于个性化API、低延迟服务,或解决缓存失效、实验跟踪、质量监控问题。 keywords: 推荐系统, 个性化, 特征存储, 模型服务, 缓存策略, Redis, A/B测试, Thompson采样, 推荐指标, CTR, 转化率, 目录覆盖度, 多样性, Prometheus监控, 推荐API, 实时推荐, 协同过滤集成, 生产推荐, 实验跟踪 license: MIT

推荐系统

生产就绪的架构,用于可扩展的推荐系统,包括特征存储、多层缓存、A/B测试和全面监控。

何时使用此技能

加载此技能时:

  • 构建推荐API:大规模提供个性化推荐
  • 实现缓存:多层缓存以实现亚毫秒延迟
  • 运行A/B测试:实验推荐算法
  • 监控质量:跟踪CTR、转化率、多样性、覆盖度
  • 优化性能:降低延迟、增加吞吐量
  • 特征工程:使用特征存储管理用户/物品特征

快速开始:5步创建推荐API

# 1. 安装依赖
pip install fastapi==0.109.0 redis==5.0.0 prometheus-client==0.19.0

# 2. 启动Redis(用于缓存和特征存储)
docker run -d -p 6379:6379 redis:alpine

# 3. 创建推荐服务:app.py
cat > app.py << 'EOF'
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import redis
import json

app = FastAPI()
cache = redis.Redis(host='localhost', port=6379, decode_responses=True)

class RecommendationResponse(BaseModel):
    user_id: str
    items: List[str]
    cached: bool

@app.post("/recommendations", response_model=RecommendationResponse)
async def get_recommendations(user_id: str, n: int = 10):
    # 检查缓存
    cache_key = f"recs:{user_id}:{n}"
    cached = cache.get(cache_key)

    if cached:
        return RecommendationResponse(
            user_id=user_id,
            items=json.loads(cached),
            cached=True
        )

    # 生成推荐(简化)
    items = [f"item_{i}" for i in range(n)]

    # 缓存5分钟
    cache.setex(cache_key, 300, json.dumps(items))

    return RecommendationResponse(
        user_id=user_id,
        items=items,
        cached=False
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}
EOF

# 4. 运行API
uvicorn app:app --host 0.0.0.0 --port 8000

# 5. 测试
curl -X POST "http://localhost:8000/recommendations?user_id=user_123&n=10"

结果:在5分钟内创建具有缓存的推荐API。

系统架构

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ 用户事件    │────▶│ 特征        │────▶│ 模型        │
│ (点击,    │     │ 存储        │     │ 服务        │
│  购买)     │     │ (Redis)   │     │             │
└─────────────┘     └─────────────┘     └─────────────┘
                           │                    │
                           ▼                    ▼
                    ┌─────────────┐     ┌─────────────┐
                    │ 训练        │     │ API         │
                    │ 管道        │     │ (FastAPI)  │
                    └─────────────┘     └─────────────┘
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │ 监控        │
                                        │ (Prometheus)│
                                        └─────────────┘

核心组件

1. 特征存储

集中存储用户和物品特征:

import redis
import json

class FeatureStore:
    """使用Redis缓存快速访问特征。"""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600  # 1小时

    def get_user_features(self, user_id: str) -> dict:
        cache_key = f"user_features:{user_id}"
        cached = self.redis.get(cache_key)

        if cached:
            return json.loads(cached)

        # 从数据库获取
        features = fetch_from_db(user_id)

        # 缓存
        self.redis.setex(cache_key, self.ttl, json.dumps(features))
        return features

2. 模型服务

服务多个模型进行A/B测试:

class ModelServing:
    """服务多个推荐模型。"""

    def __init__(self):
        self.models = {}

    def register_model(self, name: str, model, is_default: bool = False):
        self.models[name] = model
        if is_default:
            self.default_model = name

    def predict(self, user_features: dict, item_features: list, model_name: str = None):
        model = self.models.get(model_name or self.default_model)
        return model.predict(user_features, item_features)

3. 缓存层

多层缓存以实现低延迟:

class TieredCache:
    """L1(内存) -> L2(Redis) -> L3(数据库)。"""

    def __init__(self, redis_client):
        self.l1_cache = {}  # 内存中
        self.redis = redis_client  # L2

    def get(self, key: str):
        # L1:内存(最快)
        if key in self.l1_cache:
            return self.l1_cache[key]

        # L2:Redis
        cached = self.redis.get(key)
        if cached:
            value = json.loads(cached)
            self.l1_cache[key] = value  # 提升到L1
            return value

        # L3:未命中(从数据库获取)
        return None

关键指标

指标 描述 目标
CTR 点击率 >5%
转化率 从推荐产生的购买 >2%
P95延迟 第95百分位响应时间 <200ms
缓存命中率 从缓存服务的百分比 >80%
覆盖度 推荐目录的百分比 >50%
多样性 推荐中的多样性 >0.7

已知问题预防

1. 新用户冷启动

问题:无历史记录的用户没有推荐,初始体验差。

解决方案:使用基于流行度的回退:

def get_recommendations(user_id: str, n: int = 10):
    user_features = feature_store.get_user_features(user_id)

    # 检查是否为新用户(无购买历史)
    if user_features.get('total_purchases', 0) == 0:
        # 回退到流行物品
        return get_popular_items(n)

    # 个性化推荐
    return generate_personalized_recs(user_id, n)

2. 用户操作时的缓存失效

问题:用户购买后,缓存仍显示已购物品在推荐中。

解决方案:在相关操作时使缓存失效:

INVALIDATING_ACTIONS = {'purchase', 'rating', 'add_to_cart'}

def on_user_action(user_id: str, action: str):
    if action in INVALIDATING_ACTIONS:
        cache_key = f"recs:{user_id}:*"
        redis_client.delete(cache_key)
        logger.info(f"因{action}使{user_id}的缓存失效")

3. 缓存到期时的惊群效应

问题:多个用户缓存同时到期,超载数据库/模型。

解决方案:为TTL添加随机抖动:

import random

def set_cache(key: str, value: dict, base_ttl: int = 300):
    # 添加±10%抖动
    jitter = random.uniform(-0.1, 0.1) * base_ttl
    ttl = int(base_ttl + jitter)
    redis_client.setex(key, ttl, json.dumps(value))

4. 低多样性 = 过滤泡泡

问题:推荐太相似,用户只看到同一类别。

解决方案:实施多样性约束:

def rank_with_diversity(items: list, scores: list, n: int = 10):
    selected = []
    category_counts = {}

    for item, score in sorted(zip(items, scores), key=lambda x: -x[1]):
        category = item['category']

        # 每个类别限制3个物品
        if category_counts.get(category, 0) >= 3:
            continue

        selected.append(item)
        category_counts[category] = category_counts.get(category, 0) + 1

        if len(selected) >= n:
            break

    return selected

5. 无监控 = 无声退化

问题:推荐质量下降,直到用户投诉才被发现。

解决方案:使用警报进行持续监控:

from prometheus_client import Counter, Histogram

recommendation_clicks = Counter('recommendation_clicks_total')
recommendation_latency = Histogram('recommendation_latency_seconds')

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    start = time.time()

    recs = generate_recs(user_id)

    latency = time.time() - start
    recommendation_latency.observe(latency)

    return recs

@app.post("/track/click")
async def track_click(user_id: str, item_id: str):
    recommendation_clicks.inc()
    # 如果CTR低于3%则警报

6. 过时特征 = 过时推荐

问题:用户偏好改变但特征未更新,推荐不相关。

解决方案:设置适当的TTL和更新触发器:

class FeatureStore:
    def __init__(self, redis_client):
        self.redis = redis_client
        # 频繁变化特征的较短TTL
        self.user_ttl = 300  # 5分钟
        self.item_ttl = 3600  # 1小时

    def update_on_event(self, user_id: str, event: str):
        # 在重要事件时失效
        if event in ['purchase', 'rating']:
            self.redis.delete(f"user_features:{user_id}")
            logger.info(f"更新了{user_id}的特征")

7. A/B测试样本量太小

问题:过早宣布赢家,结果无统计显著性。

解决方案:先计算所需样本量:

def calculate_sample_size(
    baseline_rate: float,
    min_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8
) -> int:
    """计算每个变体所需的样本量。"""
    from scipy import stats

    z_alpha = stats.norm.ppf(1 - alpha/2)
    z_beta = stats.norm.ppf(power)

    p1 = baseline_rate
    p2 = baseline_rate * (1 + min_detectable_effect)
    p_avg = (p1 + p2) / 2

    n = (
        (z_alpha + z_beta)**2 * 2 * p_avg * (1 - p_avg) /
        (p2 - p1)**2
    )

    return int(n)

# 示例:检测基线CTR=5%的10%提升
n_required = calculate_sample_size(
    baseline_rate=0.05,
    min_detectable_effect=0.10
)
print(f"所需样本量:每个变体{n_required}")
# 在得出结论前等待两个变体达到此大小

何时加载参考文件

为详细生产实现加载参考文件:

  • 生产架构:加载 references/production-architecture.md 以获取完整的特征存储、模型服务和推荐服务实现,包括批量获取、缓存集成和FastAPI部署模式。

  • 缓存策略:加载 references/caching-strategies.md 当实施多层缓存(L1/L2/L3)、缓存预热、失效策略、概率刷新或惊群效应预防时。

  • A/B测试框架:加载 references/ab-testing-framework.md 以获取确定性变体分配、Thompson采样(多臂老虎机)、贝叶斯和频率显著性测试以及实验跟踪。

  • 监控与警报:加载 references/monitoring-alerting.md 以获取Prometheus指标集成、仪表板端点、警报规则和质量监控(多样性、覆盖度)。

最佳实践

  1. 特征预计算:离线计算特征,从缓存服务
  2. 批量获取:使用Redis MGET获取多个用户/物品
  3. 积极缓存:用户推荐设置5-15分钟TTL
  4. 优雅降级:如果个性化失败,返回流行物品
  5. 监控一切:跟踪CTR、延迟、多样性、覆盖度
  6. 持续A/B测试:始终实验新算法
  7. 多样性约束:确保多样化推荐
  8. 解释推荐:提供理由(“高评分”、“流行”)

常见模式

推荐服务

class RecommendationService:
    def __init__(self, feature_store, model_serving, cache):
        self.feature_store = feature_store
        self.model_serving = model_serving
        self.cache = cache

    def get_recommendations(self, user_id: str, n: int = 10):
        # 1. 检查缓存
        cached = self.cache.get(f"recs:{user_id}:{n}")
        if cached:
            return cached

        # 2. 获取特征
        user_features = self.feature_store.get_user_features(user_id)
        candidates = self.get_candidates(user_id)

        # 3. 评分候选
        scores = self.model_serving.predict(user_features, candidates)

        # 4. 带多样性排名
        recommendations = self.rank_with_diversity(candidates, scores, n)

        # 5. 缓存
        self.cache.set(f"recs:{user_id}:{n}", recommendations, ttl=300)

        return recommendations

A/B测试

def assign_variant(user_id: str, experiment_id: str) -> str:
    """确定性分配 - 同一用户始终获得同一变体。"""
    import hashlib

    hash_input = f"{user_id}:{experiment_id}"
    hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)

    # 50/50分割
    return 'control' if hash_value % 2 == 0 else 'treatment'

# 使用
variant = assign_variant('user_123', 'rec_algo_v2')
model_name = 'main' if variant == 'control' else 'experimental'
recs = get_recommendations(user_id, model_name=model_name)

监控

from prometheus_client import Counter, Histogram

requests_total = Counter('recommendation_requests_total', ['status'])
latency_seconds = Histogram('recommendation_latency_seconds')

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    with latency_seconds.time():
        try:
            recs = generate_recs(user_id)
            requests_total.labels(status='success').inc()
            return recs
        except Exception as e:
            requests_total.labels(status='error').inc()
            raise