Skip to content

经典推荐算法与技术细节

在掌握了推荐系统的基本架构后,本章将深入探讨具体的推荐算法和技术实现细节,帮助你从理论走向实战。

1. 基于内容的推荐(Content-Based Recommendation)

1.1 核心思想

基于内容的推荐的核心原理是:给用户推荐之前喜欢的物品相似的物品

这包含两种常见方式:

  • U2I2I:用户→物品→物品

    1. 找用户喜欢的历史物品
    2. 找与这些物品相似的物品
    3. 推荐给用户
  • U2TAG2I:用户→标签→物品

    1. 提取用户的兴趣标签
    2. 找带有这些标签的物品
    3. 推荐给用户

1.2 用户向量与物品向量

1.2.1 向量表示

将用户和物品都表示为向量,通过计算向量相似度来衡量匹配程度。

用户向量:代表用户的兴趣偏好 物品向量:代表物品的特征属性

1.2.2 相似度计算方法

余弦相似度(最常用)

python
def cosine_similarity(vec1, vec2):
    dot_product = np.dot(vec1, vec2)
    norm1 = np.linalg.norm(vec1)
    norm2 = np.linalg.norm(vec2)
    return dot_product / (norm1 * norm2)

欧氏距离

python
def euclidean_distance(vec1, vec2):
    return np.sqrt(np.sum((vec1 - vec2) ** 2))

皮尔逊相关系数

适用于处理评分数据,考虑用户评分习惯的差异。

1.3 基于内容推荐的优缺点

优点

  • ✅ 可解释性强,用户容易理解
  • ✅ 不依赖用户行为数据,冷启动问题较少
  • ✅ 推荐结果稳定,不会频繁变化

缺点

  • ❌ 很难发现用户未接触过的新类型物品
  • ❌ 物品特征需要人工标注或提取
  • ❌ 存在信息茧房风险,推荐范围窄

1.4 实际应用场景

  • 电商:相似商品推荐("看了这个的人还看了")
  • 视频:相似视频推荐("你可能也喜欢")
  • 音乐:相似歌曲推荐("猜你喜欢")

2. 基于协同过滤的推荐(Collaborative Filtering)

协同过滤是推荐系统中最经典、应用最广泛的算法,其核心思想是:根据群体的智慧进行推荐

2.1 核心概念

协同过滤假设:如果用户 A 和用户 B 过去的行为相似,那么他们对物品的偏好也相似;反之,如果物品 A 和物品 B 经常被相同的用户喜欢,那么这两个物品也相似。

2.2 基于记忆的协同过滤(Memory-Based CF)

2.2.1 User-based CF(基于用户的协同过滤)

原理:U2U2I - 和你兴趣相投的人也喜欢 XXX

流程

  1. 计算用户之间的相似度
  2. 找到与目标用户最相似的 K 个用户
  3. 找这些相似用户喜欢但目标用户未互动的物品
  4. 根据相似度加权,生成推荐列表

相似度计算

  • 余弦相似度
  • 皮尔逊相关系数
  • Jaccard 相似度

示例

用户 A 喜欢的物品:[物品1, 物品2, 物品3]
用户 B 喜欢的物品:[物品1, 物品2, 物品4]
用户 C 喜欢的物品:[物品1, 物品5]

给用户 C 推荐:
- 与用户 C 相似的用户是 A(都喜欢物品1)
- 用户 A 喜欢但用户 C 未看过的:物品2, 物品3
- 推荐:[物品2, 物品3]

2.2.2 Item-based CF(基于物品的协同过滤)

原理:U2I2I - 喜欢这个物品的人也喜欢 XXX

流程

  1. 计算物品之间的相似度
  2. 找到用户喜欢物品的相似物品
  3. 根据相似度加权,生成推荐列表

相似度计算

  • 基于用户共现度
  • 基于评分相关性

示例

物品1 被喜欢的人群:[用户A, 用户B, 用户C]
物品2 被喜欢的人群:[用户A, 用户B, 用户D]
物品3 被喜欢的人群:[用户A, 用户E]

用户 A 喜欢物品1,推荐:
- 与物品1 相似的物品是物品2(共现用户:A, B)
- 推荐:物品2

2.3 基于模型的协同过滤(Model-Based CF)

基于模型的方法通过机器学习算法构建用户-物品交互的数学模型。

2.3.1 矩阵分解(Matrix Factorization)

核心思想:将用户-物品评分矩阵分解为两个低维矩阵的乘积

$$R \approx U \times V^T$$

其中:

  • R:用户-物品评分矩阵(稀疏)
  • U:用户隐含因子矩阵
  • V:物品隐含因子矩阵

优势

  • 可以处理稀疏数据
  • 捕捉潜在特征
  • 预测缺失值

算法

  • SVD(Singular Value Decomposition)
  • ALS(Alternating Least Squares)
  • NMF(Non-negative Matrix Factorization)

2.3.2 深度学习模型

Neural Collaborative Filtering (NCF)

  • 用神经网络替代传统矩阵分解
  • 学习用户和物品的非线性交互

Deep & Wide

  • Wide 层:记忆能力,学习历史特征组合
  • Deep 层:泛化能力,学习潜在特征

2.4 协同过滤的优缺点

优点

  • ✅ 不需要物品特征信息
  • ✅ 可以发现用户潜在兴趣
  • ✅ 推荐新颖性和多样性好

缺点

  • ❌ 冷启动问题:新用户/新物品无法推荐
  • ❌ 数据稀疏性:用户行为稀疏时效果差
  • ❌ 热门偏向:热门物品被过度推荐

2.5 实际应用

  • 电商:个性化商品推荐
  • 视频:视频推荐
  • 音乐:音乐推荐

3. 多路召回融合策略

在实际推荐系统中,往往采用多路召回策略,然后对召回结果进行融合排序。

3.1 融合策略对比

3.1.1 按顺序展示

方法:召回1 结果 → 召回2 结果 → 召回3 结果...

优点:实现简单

缺点

  • 早期召回占主导
  • 后期召回难以曝光

适用场景:召回效果差异大,有明确优先级

3.1.2 平均法(Averaging)

方法:对所有召回的结果列表取平均

优点:公平对待所有召回

缺点

  • 忽略召回质量差异
  • 简单平均可能不合理

3.1.3 加权投票(Weighted Voting)

方法:根据召回质量设置权重

$$Score_{final} = \sum_{i=1}^{n} w_i \times Score_i$$

其中:

  • $w_i$:第 i 个召回路径的权重
  • $Score_i$:第 i 个召回路径的打分

设置权重原则

  • 质量高的召回路径权重高
  • 热门召回路径权重适中
  • 长尾召回路径权重较低

3.1.4 动态加权法(Dynamic Weighting)

方法:根据用户特征、物品特征动态调整权重

示例

python
def dynamic_weight(user_id, item_features):
    weights = {
        'cf': 0.5,      # 协同过滤
        'content': 0.3, # 内容推荐
        'hot': 0.2      # 热门推荐
    }
    
    # 新用户降低协同过滤权重
    if is_new_user(user_id):
        weights['cf'] = 0.2
        weights['content'] = 0.5
        weights['hot'] = 0.3
    
    return weights

3.1.5 机器学习权重法(ML-based Weighting)

方法:使用机器学习模型自动学习召回路径权重

流程

  1. 收集用户历史交互数据
  2. 标记每个物品的召回来源
  3. 训练模型预测用户点击概率
  4. 模型输出即为权重

优势

  • 自动优化权重
  • 适应数据变化

缺点

  • 需要大量标注数据
  • 训练成本高

3.2 融合策略选择建议

策略实现难度效果适用场景
按顺序展示⭐⭐召回差异大
平均法⭐⭐⭐⭐⭐快速上线
加权投票⭐⭐⭐⭐⭐⭐⭐稳定期
动态加权⭐⭐⭐⭐⭐⭐⭐⭐⭐个性化强
机器学习⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐大规模系统

3.3 成本考虑

如果召回路径质量接近,按照成本选择:

  • 实时计算(如协同过滤):成本低,权重可设高
  • 离线计算(如深度学习模型):成本高,权重可设低
  • 第三方接口(如 API):成本高,权重需谨慎设置

4. AB 测试系统设计与实现

AB 测试是推荐系统优化的核心工具,通过科学对比不同策略的效果,选择最优方案。

4.1 AB 测试基本流程

4.1.1 系统架构

用户请求

AB 测试服务

根据用户ID 分桶(分桶A / 分桶B)

访问后端推荐服务

返回对应分桶的推荐结果

用户交互(展现、点击、购买)

日志记录(带分桶标志)

数据分析

结果对比

全量部署获胜分桶

4.1.2 分桶算法

最常用的是哈希取模

python
def get_bucket(user_id, bucket_count=10):
    """
    将用户分配到指定数量的分桶中
    """
    hash_value = hash(user_id)
    return hash_value % bucket_count

注意事项

  • 同一用户必须始终分配到同一分桶
  • 分桶分布应尽量均匀
  • 新用户/老用户可以分流策略

4.2 AB 测试实施步骤

4.2.1 开发两个策略分支

开发同一个接口的两个分支:

  • 分桶 A:使用旧策略(Baseline)
  • 分桶 B:使用新策略(Experiment)

4.2.2 配置实验分流

在 AB 测试配置中设置分流比例:

json
{
  "experiment_id": "exp_001",
  "bucket_allocation": {
    "A": 0.6,  // 60% 流量
    "B": 0.4   // 40% 流量
  },
  "status": "running"
}

4.2.3 用户请求处理

1. 用户访问推荐接口
2. 请求携带用户ID
3. AB 测试服务根据用户ID计算分桶
4. 返回分桶结果:{user_id: "123456", bucket: "A"}
5. 根据分桶选择对应的策略返回推荐结果

4.2.4 日志收集

所有用户交互日志都带上分桶标志:

json
{
  "user_id": "123456",
  "bucket": "A",
  "event_type": "click",
  "item_id": "item_001",
  "timestamp": 1640000000
}

关键指标

  • 展现(Impression):物品展示次数
  • 点击(Click):用户点击次数
  • 购买(Purchase):用户购买次数
  • CTR(Click-Through Rate):点击率 = 点击 / 展现
  • CVR(Conversion Rate):转化率 = 购买 / 点击
  • GMV(Gross Merchandise Value):交易总额

4.2.5 结果分析

A/B 分桶对比表

指标分桶 A(Baseline)分桶 B(Experiment)提升幅度
展现量100,000100,0000%
点击量5,0005,500+10%
CTR5%5.5%+0.5pp
购买量500600+20%
CVR10%10.9%+0.9pp
GMV$50,000$60,000+20%

4.2.6 统计显著性检验

不能只看绝对值,需要进行统计检验:

  • t 检验:检验均值差异是否显著
  • 卡方检验:检验比率差异是否显著
  • 置信区间:计算差异的 95% 置信区间

判断标准

  • P值 < 0.05:差异显著
  • P值 ≥ 0.05:差异不显著,可能是随机波动

4.3 实验优化与全量部署

4.3.1 调整流量

如果分桶 B 表现更好:

  • 调大分桶 B 的流量比例(如 60% → 80%)
  • 继续观察,确认稳定性

4.3.2 全量部署

如果分桶 B 持续优于 A:

  1. 将分桶 B 流量调整到 100%
  2. 停止分桶 A
  3. 分桶 B 成为新的 Baseline
  4. 开始下一轮实验

4.4 AB 测试最佳实践

✅ 应该做的

  • 实验周期足够长(至少 7 天)
  • 控制其他变量(如流量入口)
  • 多指标综合评估
  • 统计显著性检验

❌ 不应该做的

  • 实验周期过短(如 1 天)
  • 忽略统计显著性
  • 只看单一指标
  • 数据窥视(提前终止实验)

5. 内容召回全流程实现

内容召回是推荐系统中最基础、最常用的召回方式之一。本节将详细讲解从内容采集到最终服务的完整实现流程。

5.1 核心目标

主要目标:计算物品与其它最相似的物品列表

应用场景

  • 相似推荐:商品详情页的"相似商品"
  • 扩展推荐:基于用户历史物品找相似物
  • 替代推荐:当某个物品不可用时推荐替代品

5.2 完整流程图

内容采集

中文分词 & 关键词提取

向量化表示(Word2Vec / Doc2Vec)

TopN 相似近邻搜索(余弦相似度)

存储(Redis)

Web 服务(Flask / Java)

返回相似物品列表

5.3 详细实现步骤

5.3.1 内容采集

采集的物品信息

字段说明示例
ID物品唯一标识"item_001"
title标题"Python 编程从入门到实践"
description详细描述"本书是 Python 入门经典..."
tags标签["编程", "Python", "入门"]
category分类"计算机"

数据来源

  • 数据库查询(MySQL / MongoDB)
  • 爬虫采集
  • API 接口
  • 手动标注

5.3.2 中文分词 & 关键词提取

工具:jieba 分词

python
import jieba
import jieba.analyse

# 加载自定义词典
jieba.load_userdict("custom_dict.txt")

# 分词
text = "Python 编程从入门到实践"
words = jieba.lcut(text)
# 输出:['Python', '编程', '从', '入门', '到', '实践']

# 关键词提取
keywords = jieba.analyse.extract_tags(text, topK=10, withWeight=True)
# 输出:[('编程', 0.5), ('Python', 0.4), ('入门', 0.3), ...]

关键词提取算法

  • TF-IDF:关键词重要性
  • TextRank:基于图的关键词排序
  • 自定义词典:人工标注的关键词

5.3.3 向量化表示

将文本转换为数值向量,便于计算相似度。

Word2Vec

核心思想:将词语映射到高维向量空间,相似的词在向量空间中距离较近。

工具

  • gensim(Python)
  • Spark Word2Vec(分布式)
  • 腾讯 Word2Vec(预训练模型)

示例

python
from gensim.models import Word2Vec

# 训练 Word2Vec 模型
sentences = [
    ["编程", "Python", "入门", "实践"],
    ["编程", "Java", "进阶", "实战"],
    ["编程", "JavaScript", "前端", "开发"]
]

model = Word2Vec(sentences, vector_size=100, window=5, min_count=1, workers=4)

# 获取词向量
vector = model.wv["Python"]

# 计算相似词
similar_words = model.wv.most_similar("编程")
# 输出:[('开发', 0.9), ('实战', 0.8), ...]
Doc2Vec

核心思想:将整个文档映射到向量空间,适用于文档相似度计算。

示例

python
from gensim.models.doc2vec import Doc2Vec, TaggedDocument

# 准备文档
documents = [
    TaggedDocument(words=["Python", "编程", "入门"], tags=["doc1"]),
    TaggedDocument(words=["Java", "编程", "进阶"], tags=["doc2"]),
    TaggedDocument(words=["JavaScript", "前端", "开发"], tags=["doc3"])
]

# 训练模型
model = Doc2Vec(documents, vector_size=100, window=5, min_count=1, workers=4)

# 获取文档向量
vector = model.dv["doc1"]

# 计算相似文档
similar_docs = model.dv.most_similar("doc1")
向量聚合(平均 / 加权平均)

对于长文本,可以将词向量聚合为文档向量:

python
import numpy as np

def document_to_vector(text, model):
    words = jieba.lcut(text)
    vectors = [model.wv[word] for word in words if word in model.wv]
    
    # 平均
    doc_vector = np.mean(vectors, axis=0)
    
    # 加权平均(使用 TF-IDF 权重)
    # doc_vector = np.average(vectors, axis=0, weights=weights)
    
    return doc_vector

5.3.4 TopN 相似近邻搜索

余弦相似度(最常用)
python
from scipy.spatial.distance import cosine

def cosine_similarity(vec1, vec2):
    return 1 - cosine(vec1, vec2)

# 计算物品 A 和物品 B 的相似度
similarity = cosine_similarity(vector_a, vector_b)
LSH 局部敏感哈希(大规模场景)

问题:如果有 100 万物品,两两计算相似度需要 5 万亿次计算,性能无法接受。

LSH 解决方案

  • 将相似物品哈希到同一个桶中
  • 只计算同桶内的物品相似度
  • 大幅减少计算量

工具

  • datasketch(Python)
  • Spark LSH
python
from datasketch import MinHashLSH

# 创建 LSH 索引
lsh = MinHashLSH(threshold=0.5, num_perm=128)

# 添加物品
lsh.insert("item1", minhash1)
lsh.insert("item2", minhash2)
lsh.insert("item3", minhash3)

# 查询相似物品
result = lsh.query(minhash1)
# 输出:["item1", "item2"]  # item2 与 item1 相似

5.3.5 存储与服务

Redis 存储设计

数据结构:Hash

Key: item:{item_id}
Field: similar_items
Value: [{"item_id": "item2", "similarity": 0.9}, {"item_id": "item3", "similarity": 0.8}, ...]

Redis 命令

bash
# 存储相似物品列表
HSET item:item1 similar_items '[{"item_id":"item2","similarity":0.9},...]'

# 查询相似物品
HGET item:item1 similar_items
Web 服务实现(Flask)
python
from flask import Flask, jsonify
import redis

app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)

@app.route('/recommend/similar/<item_id>')
def get_similar_items(item_id):
    # 从 Redis 获取相似物品
    similar_items_json = r.hget(f'item:{item_id}', 'similar_items')
    
    if similar_items_json is None:
        return jsonify({"error": "Item not found"}), 404
    
    similar_items = json.loads(similar_items_json)
    
    return jsonify({
        "item_id": item_id,
        "similar_items": similar_items
    })

if __name__ == '__main__':
    app.run(port=5000)
Web 服务实现(Java Spring Boot)
java
@RestController
@RequestMapping("/recommend")
public class RecommendController {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @GetMapping("/similar/{itemId}")
    public ResponseEntity<?> getSimilarItems(@PathVariable String itemId) {
        String key = "item:" + itemId;
        String json = redisTemplate.opsForHash().get(key, "similar_items");
        
        if (json == null) {
            return ResponseEntity.status(404).body("Item not found");
        }
        
        ObjectMapper mapper = new ObjectMapper();
        List<SimilarItem> similarItems = mapper.readValue(json, new TypeReference<List<SimilarItem>>(){});
        
        Map<String, Object> response = new HashMap<>();
        response.put("item_id", itemId);
        response.put("similar_items", similarItems);
        
        return ResponseEntity.ok(response);
    }
}

5.6 完整代码示例

python
import jieba
import jieba.analyse
import numpy as np
from gensim.models import Word2Vec
from scipy.spatial.distance import cosine
import redis
import json

# 1. 加载 Word2Vec 模型
model = Word2Vec.load("word2vec.model")

# 2. 文本向量化
def text_to_vector(text):
    words = jieba.lcut(text)
    vectors = [model.wv[word] for word in words if word in model.wv]
    if not vectors:
        return np.zeros(model.vector_size)
    return np.mean(vectors, axis=0)

# 3. 计算相似度
def calculate_similarity(vec1, vec2):
    return 1 - cosine(vec1, vec2)

# 4. 构建相似物品索引
def build_similarity_index(items):
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    for item in items:
        item_id = item['id']
        item_text = item['title'] + ' ' + item['description']
        item_vector = text_to_vector(item_text)
        
        # 查找最相似的 Top10 物品
        similarities = []
        for other_item in items:
            if other_item['id'] == item_id:
                continue
            other_text = other_item['title'] + ' ' + other_item['description']
            other_vector = text_to_vector(other_text)
            similarity = calculate_similarity(item_vector, other_vector)
            similarities.append({
                'item_id': other_item['id'],
                'similarity': similarity
            })
        
        # 排序并取 Top10
        similarities.sort(key=lambda x: x['similarity'], reverse=True)
        top10 = similarities[:10]
        
        # 存储到 Redis
        redis_client.hset(f'item:{item_id}', 'similar_items', json.dumps(top10))
    
    print("Similarity index built successfully!")

# 5. 查询相似物品
def get_similar_items(item_id):
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    json_str = redis_client.hget(f'item:{item_id}', 'similar_items')
    
    if json_str:
        return json.loads(json_str)
    else:
        return []

5.7 性能优化建议

✅ 优化建议

  1. 批量计算:离线批量计算相似度,避免在线计算
  2. 缓存热点:Redis 缓存热门物品的相似列表
  3. 增量更新:新物品增量计算相似度,而非全量重算
  4. LSH 加速:大规模场景使用 LSH 减少计算量
  5. 向量压缩:使用 PCA 降维或量化压缩向量

⏱️ 性能指标

  • 召回时间:< 50ms(从 Redis 读取)
  • 离线计算:百万级物品 < 1 小时
  • 增量更新:新增物品 < 10s

6. 用户聚类推荐

用户聚类推荐是一种基于用户群体的推荐方法,通过将相似用户分到同一组,然后推荐该群体中热门的物品。

6.1 核心思想

原理:将用户按照兴趣特征分成 K 个聚类,每个聚类代表一个用户群体。对于新用户,先找到他属于哪个聚类,然后推荐该聚类中热门的物品。

优势

  • 可解释性强("和你类似的人都在看")
  • 计算效率高(一次聚类,多次复用)
  • 适合用户数量大但活跃用户少的场景

6.2 实现流程

6.2.1 用户聚类

步骤 1:特征提取

将用户的各种特征转换为数值向量:

python
from pyspark.ml.feature import OneHotEncoder, Word2Vec, VectorAssembler
from pyspark.ml.clustering import KMeans

# 1. 类别信息进行 one-hot 编码
gender_encoder = OneHotEncoder(inputCol="gender", outputCol="gender_vec")

# 2. 行为列表进行 word embedding 获得定长向量列表
# 假设用户行为是 ["商品A", "商品B", "商品C"]
word2vec = Word2Vec(vectorSize=100, minCount=1, inputCol="user_actions", outputCol="action_vec")

# 3. VectorAssembler 拼接所有特征向量
assembler = VectorAssembler(
    inputCols=["gender_vec", "action_vec", "age_vec"],
    outputCol="features"
)

# 4. 聚类算法 k-means
kmeans = KMeans(k=10, seed=42)

步骤 2:训练聚类模型

python
from pyspark.ml import Pipeline

# 构建流水线
pipeline = Pipeline(stages=[gender_encoder, word2vec, assembler, kmeans])

# 训练模型
model = pipeline.fit(user_data)

# 获取聚类结果
clustered_users = model.transform(user_data)
# 输出格式:
# | user_id | features | prediction |
# |---------|----------|------------|
# | 001     | [0.1,...]| 3          |
# | 002     | [0.2,...]| 3          |

步骤 3:聚类结果分析

聚类结果是一个用户ID到聚类数字的映射:

python
# 获取用户聚类数字
user_cluster_map = {
    "user_001": 3,
    "user_002": 3,
    "user_003": 5,
    "user_004": 5,
    ...
}

6.2.2 分群热榜统计

目标:为每个聚类计算热门物品榜单

方法:统计该聚类中用户的历史行为,计算每个物品的热度

python
from pyspark.sql import functions as F

# 合并用户聚类信息和历史行为
user_actions_with_cluster = user_actions.join(
    clustered_users.select("user_id", "prediction"),
    on="user_id"
)

# 按聚类分组,统计物品热度
cluster_hot_items = user_actions_with_cluster.groupBy(
    "prediction", "item_id"
).agg(
    F.count("*").alias("popularity")
).orderBy("prediction", F.desc("popularity"))

# 为每个聚类取 Top20 热门物品
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec = Window.partitionBy("prediction").orderBy(F.desc("popularity"))

cluster_hot_rank = cluster_hot_items.withColumn(
    "rank", row_number().over(windowSpec)
).filter(F.col("rank") <= 20)

# 结果格式:
# | prediction | item_id | popularity | rank |
# |------------|---------|------------|------|
# | 3          | item_123| 5000       | 1    |
# | 3          | item_456| 4800       | 2    |

热榜结果:聚类 → 热门物品列表

python
cluster_hot_list = {
    0: ["item_001", "item_002", ...],  # 聚类0的热门物品
    1: ["item_101", "item_102", ...],  # 聚类1的热门物品
    ...
}

6.2.3 计算结果缓存

正排列表:用户ID → 聚类数字

存储:Cassandra

python
# Cassandra 表设计
CREATE TABLE user_cluster (
    user_id text PRIMARY KEY,
    cluster_id int
);

# 写入数据
INSERT INTO user_cluster (user_id, cluster_id) VALUES ('user_001', 3);

倒排列表:聚类数字 → 推荐ITEM列表

存储:Redis

bash
# Redis Key 设计
# cluster:{cluster_id}:hot_items

# 存储格式(List)
LPUSH cluster:3:hot_items "item_123"
LPUSH cluster:3:hot_items "item_456"
...
python
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# 存储聚类热门物品
for cluster_id, items in cluster_hot_list.items():
    key = f"cluster:{cluster_id}:hot_items"
    r.delete(key)  # 清空旧数据
    for item_id in items:
        r.lpush(key, item_id)

6.2.4 在线服务

接口设计

输入:用户ID
输出:推荐物品列表

流程

用户请求 (user_id)

从 Cassandra 查询用户聚类 (正排查询)

从 Redis 获取该聚类推荐列表 (倒排查询)

返回推荐结果

实现代码(Python Flask)

python
from flask import Flask, jsonify
import redis
from cassandra.cluster import Cluster

app = Flask(__name__)

# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 连接 Cassandra
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('recommendation')

@app.route('/recommend/user/<user_id>')
def recommend_user(user_id):
    # 1. 从 Cassandra 获取用户聚类
    query = f"SELECT cluster_id FROM user_cluster WHERE user_id = '{user_id}'"
    rows = session.execute(query)
    
    if not rows:
        return jsonify({"error": "User not found"}), 404
    
    cluster_id = rows[0].cluster_id
    
    # 2. 从 Redis 获取该聚类的热门物品
    key = f"cluster:{cluster_id}:hot_items"
    items = r.lrange(key, 0, 19)  # 获取 Top20
    
    # 解码 bytes
    items = [item.decode('utf-8') for item in items]
    
    return jsonify({
        "user_id": user_id,
        "cluster_id": cluster_id,
        "recommendations": items
    })

if __name__ == '__main__':
    app.run(port=5000)

6.3 优化建议

✅ 优化方向

  1. 定期更新聚类:用户兴趣会变化,建议每周重新聚类一次
  2. 热榜时效性:按天/周统计热度,保证新鲜度
  3. 冷启动优化:新用户先用热门物品推荐,积累数据后再聚类
  4. 混合策略:结合协同过滤,提高推荐准确性

⏱️ 性能指标

  • 聚类时间:百万用户,K=10,< 30分钟
  • 热榜统计:< 10分钟
  • 在线查询:< 10ms

7. 矩阵分解的推荐

矩阵分解是推荐系统中最经典、最有效的算法之一,广泛应用于工业界。

7.1 核心思想

问题:用户-物品评分矩阵通常是极其稀疏的(用户只会给很少的物品评分)

解决方法:将稀疏的大矩阵分解为两个稠密的小矩阵的乘积

$$R \approx U \times V^T$$

其中:

  • $R$:用户-物品评分矩阵($m \times n$,稀疏)
  • $U$:用户隐含因子矩阵($m \times k$)
  • $V$:物品隐含因子矩阵($n \times k$)
  • $k$:隐含因子数量(通常 20-100)

直观理解

  • 每个用户用一个 $k$ 维向量表示,代表用户的偏好
  • 每个物品用一个 $k$ 维向量表示,代表物品的特征
  • 用户对物品的预测评分 = 用户向量 · 物品向量

7.2 PySpark ALS 算法实现

7.2.1 原始输入文件

数据格式:用户ID、物品ID、评分

csv
user_id,item_id,rating
1,101,5.0
1,102,3.0
2,101,2.0
2,103,5.0
3,102,4.0
...

7.2.2 矩阵分解模型训练

算法:ALS(Alternating Least Squares)

ALS 是 Spark MLlib 中最常用的矩阵分解算法,原理是交替最小二乘法:

  1. 固定物品矩阵 V,优化用户矩阵 U
  2. 固定用户矩阵 U,优化物品矩阵 V
  3. 重复 1-2 直到收敛

代码实现

python
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# 创建 SparkSession
spark = SparkSession.builder.appName("MatrixFactorization").getOrCreate()

# 读取数据
data = spark.read.csv("ratings.csv", header=True, inferSchema=True)

# 训练集和测试集划分
train, test = data.randomSplit([0.8, 0.2])

# ALS 参数
als = ALS(
    userCol="user_id",
    itemCol="item_id",
    ratingCol="rating",
    rank=20,              # 隐含因子数量
    maxIter=10,           # 最大迭代次数
    regParam=0.1,         # 正则化参数
    coldStartStrategy="drop"  # 处理冷启动
)

# 训练模型
model = als.fit(train)

# 预测测试集
predictions = model.transform(test)

# 评估模型
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

7.2.3 获得用户向量和物品向量

用户向量

python
# 获取用户因子矩阵
user_factors = model.userFactors
user_factors.show(5)

# 输出格式:
# +--------+--------------------+
# |user_id|        features     |
# +--------+--------------------+
# |      1 |[0.1,0.2,0.3,...]   |
# |      2 |[0.2,0.1,0.4,...]   |
# +--------+--------------------+

# 转换为字典
user_vectors = {row['user_id']: row['features'].toArray() for row in user_factors.collect()}

物品向量

python
# 获取物品因子矩阵
item_factors = model.itemFactors
item_factors.show(5)

# 输出格式:
# +--------+--------------------+
# |item_id|        features     |
# +--------+--------------------+
# |     101|[0.3,0.1,0.2,...]   |
# |     102|[0.2,0.4,0.1,...]   |
# +--------+--------------------+

# 转换为字典
item_vectors = {row['item_id']: row['features'].toArray() for row in item_factors.collect()}

7.2.4 生成推荐列表

U2I 推荐:用户向量 × 物品向量

python
import numpy as np
from pyspark.sql.functions import col

# 为用户 1 推荐物品
user_id = 1
user_vec = user_vectors[user_id]

# 计算用户对所有物品的评分
recommendations = []
for item_id, item_vec in item_vectors.items():
    score = np.dot(user_vec, item_vec)  # 点积
    recommendations.append((item_id, score))

# 排序并取 TopN
recommendations.sort(key=lambda x: x[1], reverse=True)
top_n = recommendations[:20]

print(f"Top20 recommendations for user {user_id}:")
for item_id, score in top_n:
    print(f"Item {item_id}: {score:.4f}")

或者使用 Spark 内置方法

python
# 为所有用户推荐 Top10
user_recs = model.recommendForAllUsers(10)
user_recs.show(5)

# 输出格式:
# +--------+--------------------+
# |user_id|   recommendations   |
# +--------+--------------------+
# |      1 |[[102,4.5],[...]]   |
# +--------+--------------------+

# 为单个用户推荐
user_subset = train.filter(col("user_id") == 1)
user_subset_recs = model.recommendForUserSubset(user_subset, 10)
user_subset_recs.show()

7.2.5 生成 I2I 相似推荐

I2I 推荐:物品向量 × 物品向量

python
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

# 计算物品相似度矩阵
item_ids = list(item_vectors.keys())
item_vecs = np.array([item_vectors[i] for i in item_ids])

# 使用余弦相似度
similarities = cosine_similarity(item_vecs)

# 为物品 101 找最相似的物品
item_id = "101"
item_idx = item_ids.index(item_id)

# 获取相似度排序
sim_scores = list(enumerate(similarities[item_idx]))
sim_scores.sort(key=lambda x: x[1], reverse=True)

# 取 Top10(排除自己)
top_similar = [(item_ids[i], score) for i, score in sim_scores[1:11]]

print(f"Top10 similar items to {item_id}:")
for similar_item_id, score in top_similar:
    print(f"{similar_item_id}: {score:.4f}")

存储到 Redis

python
import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

# 存储每个物品的相似物品列表
for item_id in item_ids:
    item_idx = item_ids.index(item_id)
    sim_scores = list(enumerate(similarities[item_idx]))
    sim_scores.sort(key=lambda x: x[1], reverse=True)
    
    # 取 Top10
    similar_items = [
        {"item_id": item_ids[i], "similarity": float(score)}
        for i, score in sim_scores[1:11]
    ]
    
    # 存储到 Redis
    key = f"item:{item_id}:similar"
    r.set(key, json.dumps(similar_items))

print("Item similarity index saved to Redis!")

7.3 矩阵分解的优缺点

优点

  • ✅ 处理稀疏数据效果好
  • ✅ 捕捉潜在特征,可解释性强
  • ✅ 预测评分准确
  • ✅ 计算效率高(Spark ALS 分布式)

缺点

  • ❌ 冷启动问题严重(新用户/新物品无法推荐)
  • ❌ 无法利用物品内容信息
  • ❌ 参数调优复杂(rank、regParam 等)

7.4 实际应用场景

  • 电影推荐:Netflix、豆瓣电影
  • 音乐推荐:Spotify、网易云音乐
  • 电商推荐:亚马逊、淘宝商品推荐

8. 推荐系统的 API 接口设计

推荐系统的 API 接口是在线服务层与客户端交互的桥梁,设计良好的接口至关重要。

8.1 接口设计原则

✅ 好的接口设计

  1. RESTful 风格:遵循 REST 架构风格
  2. 版本控制:URL 中包含版本号(如 /v1/recommend
  3. 统一格式:请求和响应格式统一
  4. 错误处理:清晰的错误码和错误信息
  5. 性能优化:接口响应时间 < 100ms
  6. 幂等性:相同请求返回相同结果

8.2 核心接口设计

8.2.1 个性化推荐接口

场景:首页推荐、"猜你喜欢"

请求

http
GET /v1/recommend/personalized

请求参数

json
{
  "user_id": "user_123456",
  "context": {
    "device": "mobile",
    "location": "beijing",
    "time": "2026-03-28 20:00:00"
  },
  "limit": 20
}

响应

json
{
  "code": 0,
  "message": "success",
  "data": {
    "user_id": "user_123456",
    "recommendations": [
      {
        "item_id": "item_001",
        "score": 0.95,
        "reason": "基于你的历史行为",
        "recall_source": "cf"
      },
      {
        "item_id": "item_002",
        "score": 0.88,
        "reason": "热门推荐",
        "recall_source": "hot"
      }
    ],
    "request_id": "req_1234567890"
  },
  "timestamp": 1700000000
}

8.2.2 相似推荐接口

场景:商品详情页"相似商品"

请求

http
GET /v1/recommend/similar/<item_id>

请求参数

json
{
  "item_id": "item_001",
  "limit": 10
}

响应

json
{
  "code": 0,
  "message": "success",
  "data": {
    "item_id": "item_001",
    "similar_items": [
      {
        "item_id": "item_123",
        "similarity": 0.92,
        "title": "Python 编程从入门到实践"
      },
      {
        "item_id": "item_456",
        "similarity": 0.87,
        "title": "Java 核心技术卷一"
      }
    ]
  },
  "timestamp": 1700000000
}

8.2.3 热门推荐接口

场景:热门榜单、流量兜底

请求

http
GET /v1/recommend/hot

请求参数

json
{
  "category": "tech",
  "period": "daily",  // daily, weekly, monthly
  "limit": 20
}

响应

json
{
  "code": 0,
  "message": "success",
  "data": {
    "category": "tech",
    "period": "daily",
    "hot_items": [
      {
        "item_id": "item_001",
        "rank": 1,
        "popularity": 5000,
        "trend": "up"
      },
      {
        "item_id": "item_002",
        "rank": 2,
        "popularity": 4800,
        "trend": "stable"
      }
    ]
  },
  "timestamp": 1700000000
}

8.3 错误码设计

统一错误码格式

json
{
  "code": 40001,
  "message": "用户不存在",
  "data": null,
  "request_id": "req_1234567890"
}

常见错误码

错误码说明
0成功
40001参数错误
40002用户不存在
40003物品不存在
50001推荐服务超时
50002推荐服务异常
50003数据库异常

8.4 性能优化

缓存策略

python
from flask import Flask
from functools import lru_cache
import redis

app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)

# 方案 1:使用 LRU 缓存(内存)
@lru_cache(maxsize=10000)
def get_recommendations_from_db(user_id):
    # 从数据库获取推荐
    pass

# 方案 2:使用 Redis 缓存
def get_recommendations(user_id, limit=20):
    # 先查缓存
    cache_key = f"recommend:{user_id}:{limit}"
    cached = r.get(cache_key)
    
    if cached:
        return json.loads(cached)
    
    # 缓存未命中,从数据库获取
    recommendations = get_recommendations_from_db(user_id, limit)
    
    # 写入缓存(5分钟过期)
    r.setex(cache_key, 300, json.dumps(recommendations))
    
    return recommendations

超时设置

python
from flask import Flask, jsonify
import signal

app = Flask(__name__)

# 超时处理
def timeout_handler(signum, frame):
    raise TimeoutError("Request timeout")

@app.route('/v1/recommend/personalized')
def recommend_personalized():
    # 设置超时(100ms)
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(0.1)  # 100ms
    
    try:
        user_id = request.args.get('user_id')
        recommendations = get_recommendations(user_id)
        return jsonify({"code": 0, "data": recommendations})
    except TimeoutError:
        # 超时返回兜底推荐
        fallback_items = get_fallback_items()
        return jsonify({
            "code": 0,
            "data": fallback_items,
            "warning": "timeout, using fallback"
        })
    finally:
        signal.alarm(0)  # 取消超时

9. 物品冷启动问题

冷启动是推荐系统的经典难题,指新物品没有历史数据,无法被推荐系统识别和推荐。

9.1 冷启动场景

物品冷启动

  • 新上架的商品/内容
  • 没有用户互动过的物品
  • 长尾物品(曝光机会少)

用户冷启动

  • 新注册用户
  • 没有历史行为的用户

9.2 解决方案一:基于内容推荐

原理:U2I2I - 用户→物品→物品

当物品没有行为数据时,利用物品的内容特征进行推荐。

方法 1:内容相似度

python
# 基于物品内容计算相似度
def content_based_recommendation(user_id, new_item_id):
    # 1. 获取用户历史行为
    user_history = get_user_history(user_id)
    
    # 2. 找用户历史物品与新物品的相似度
    similarities = []
    for old_item_id in user_history:
        sim = calculate_content_similarity(old_item_id, new_item_id)
        similarities.append((old_item_id, sim))
    
    # 3. 如果有高相似度的历史物品,则推荐该新物品
    max_sim = max(similarities, key=lambda x: x[1])
    if max_sim[1] > 0.7:  # 相似度阈值
        return True  # 推荐新物品
    
    return False  # 不推荐

方法 2:标签匹配

python
def tag_based_recommendation(user_id, new_item):
    # 1. 提取用户兴趣标签
    user_tags = get_user_tags(user_id)
    # 输出:["Python", "编程", "AI"]
    
    # 2. 提取新物品标签
    item_tags = new_item['tags']
    # 输出:["Python", "入门"]
    
    # 3. 计算标签重合度
    overlap = len(set(user_tags) & set(item_tags))
    if overlap >= 2:  # 至少有2个标签重合
        return True
    
    return False

9.3 解决方案二:多级流量池机制(抖音模式)

抖音的多级流量池机制是解决冷启动的经典方案,也称"赛马机制"。

9.3.1 核心思想

原理:新物品先在小的流量池测试,表现好的逐步进入更大的流量池。

流程

新物品上架

内容质量审核(标题、简介、封面、账号粉丝数)

一级流量池(500推荐量)

表现达标?
  ├─ 是 → 二级流量池(1W推荐量)
  │   ↓
  │   表现达标?
  │   ├─ 是 → 三级流量池(10W推荐量)
  │   │   ↓
  │   │   ...
  │   └─ 否 → 停止推荐
  └─ 否 → 停止推荐

9.3.2 流量池设计

一级流量池(初始测试)

  • 推荐量:500次
  • 目标用户:随机分发
  • 考核指标:
    • 播放率 > 30%
    • 点赞率 > 10%
    • 评论率 > 5%
    • 完播率 > 40%

二级流量池(小规模推广)

  • 推荐量:1万次
  • 目标用户:相似兴趣用户
  • 考核指标:
    • 播放率 > 25%
    • 点赞率 > 8%
    • 评论率 > 4%

三级流量池(中等规模)

  • 推荐量:10万次
  • 目标用户:精准推荐
  • 考核指标:
    • 播放率 > 20%
    • 点赞率 > 6%

N级流量池(大规模推广)

  • 推荐量:100万+次
  • 目标用户:全量用户
  • 考核指标:相对指标(与其他内容对比)

9.3.3 晋升影响因素

核心指标

  1. 播放率 = 播放量 / 曝光量
  2. 点赞率 = 点赞量 / 播放量
  3. 评论率 = 评论量 / 播放量
  4. 转发率 = 转发量 / 播放量
  5. 完播率 = 完整观看次数 / 播放量
  6. 收藏率 = 收藏量 / 播放量
  7. 转化率 = 转化量 / 播放量

综合评分

python
def calculate_content_score(content_id):
    metrics = get_content_metrics(content_id)
    
    # 各指标权重(根据业务调整)
    weights = {
        'play_rate': 0.3,
        'like_rate': 0.2,
        'comment_rate': 0.15,
        'share_rate': 0.15,
        'completion_rate': 0.2
    }
    
    # 计算综合评分
    score = (
        weights['play_rate'] * metrics['play_rate'] +
        weights['like_rate'] * metrics['like_rate'] +
        weights['comment_rate'] * metrics['comment_rate'] +
        weights['share_rate'] * metrics['share_rate'] +
        weights['completion_rate'] * metrics['completion_rate']
    )
    
    return score

# 流量池晋升判断
def should_promote(content_id, current_level):
    score = calculate_content_score(content_id)
    
    # 不同流量池的晋升阈值
    thresholds = {
        1: 0.6,  # 一级升二级
        2: 0.7,  # 二级升三级
        3: 0.75  # 三级升四级
    }
    
    return score >= thresholds[current_level]

9.3.4 冷启动结束

停止推荐的条件

  1. 综合评分低于阈值,连续3个流量池未通过
  2. 内容被举报/违规
  3. 账号被降权/封禁
  4. 内容过时(如新闻超过7天)

处理方式

python
def handle_cold_start_failure(content_id):
    content = get_content(content_id)
    
    # 分析失败原因
    reason = analyze_failure_reason(content_id)
    # 可能原因:标题不吸引、封面质量差、内容质量低等
    
    # 记录失败原因,供创作者参考
    log_failure_reason(content_id, reason)
    
    # 停止推荐
    stop_recommendation(content_id)

9.4 其他冷启动解决方案

方案 3:利用社交关系

python
def social_based_recommendation(user_id, new_item):
    # 获取用户关注的好友
    friends = get_user_friends(user_id)
    
    # 检查是否有好友已经互动过该新物品
    for friend_id in friends:
        if has_interaction(friend_id, new_item['id']):
            # 好友喜欢,推荐给用户
            return True
    
    return False

方案 4:主动征集反馈

python
# 在推荐结果中插入少量新物品
def recommend_with_exploration(user_id, limit=20):
    # 正常推荐(18个)
    normal_items = get_normal_recommendations(user_id, limit=18)
    
    # 探索推荐(2个新物品)
    new_items = get_random_new_items(limit=2)
    
    # 合并返回
    return normal_items + new_items

方案 5:利用外部数据

  • 物品的搜索热度
  • 社交媒体讨论度
  • 竞品平台表现

10. Embedding 技术深度解析

Embedding(嵌入)是将离散的实体(如用户、物品、词语)映射到连续的低维向量空间的技术,是现代推荐系统的核心。

10.1 Embedding 的核心思想

直观理解

  • 将复杂的实体用一个向量表示
  • 相似的实体在向量空间中距离较近
  • 向量可以参与数学运算(加减、点积等)

示例

国王 - 男人 + 女人 ≈ 女王
北京 - 中国 + 法国 ≈ 巴黎

10.2 三种主流 Embedding 方法

10.2.1 基于内容的 Word2Vec

应用场景:文本内容丰富的物品(如文章、商品描述)

方法

python
from gensim.models import Word2Vec
import jieba

# 1. 训练 Word2Vec 模型
sentences = [
    jieba.lcut("Python 编程从入门到实践"),
    jieba.lcut("Java 核心技术卷一"),
    jieba.lcut("JavaScript 高级程序设计")
]

model = Word2Vec(sentences, vector_size=100, window=5, min_count=1)

# 2. 获取物品向量
def get_item_embedding(text):
    words = jieba.lcut(text)
    vectors = [model.wv[word] for word in words if word in model.wv]
    if vectors:
        return np.mean(vectors, axis=0)  # 平均
    else:
        return np.zeros(100)

# 3. 计算物品相似度
item1_text = "Python 编程从入门到实践"
item2_text = "Java 核心技术卷一"

vec1 = get_item_embedding(item1_text)
vec2 = get_item_embedding(item2_text)

similarity = cosine_similarity([vec1], [vec2])[0][0]
print(f"Similarity: {similarity}")

10.2.2 协同过滤矩阵分解(CF Embedding)

应用场景:有丰富用户-物品交互数据的场景

方法:使用 ALS 矩阵分解得到的用户向量和物品向量

python
from pyspark.ml.recommendation import ALS

# 训练 ALS 模型
als = ALS(
    userCol="user_id",
    itemCol="item_id",
    ratingCol="rating",
    rank=50  # Embedding 维度
)
model = als.fit(ratings_data)

# 获取 Embedding
user_embeddings = model.userFactors  # 用户 Embedding
item_embeddings = model.itemFactors  # 物品 Embedding

10.2.3 深度学习方法(DNN Embedding)

应用场景:复杂的非线性关系

方法

双塔模型(Two-Tower Model)

python
import tensorflow as tf
from tensorflow.keras import layers, Model

# 用户塔
def build_user_tower(user_features, embedding_dim=64):
    user_id_input = layers.Input(shape=(1,), name='user_id')
    user_embed = layers.Embedding(input_dim=10000, output_dim=embedding_dim)(user_id_input)
    user_vec = layers.Flatten()(user_embed)
    
    # 添加其他用户特征
    age_input = layers.Input(shape=(1,), name='age')
    gender_input = layers.Input(shape=(1,), name='gender')
    
    # 拼接
    concat = layers.Concatenate()([user_vec, age_input, gender_input])
    
    # 全连接层
    dense = layers.Dense(128, activation='relu')(concat)
    dense = layers.Dense(64, activation='relu')(dense)
    user_embedding = layers.Dense(embedding_dim, activation='linear')(dense)
    
    return Model(inputs=[user_id_input, age_input, gender_input], outputs=user_embedding)

# 物品塔
def build_item_tower(item_features, embedding_dim=64):
    item_id_input = layers.Input(shape=(1,), name='item_id')
    item_embed = layers.Embedding(input_dim=50000, output_dim=embedding_dim)(item_id_input)
    item_vec = layers.Flatten()(item_embed)
    
    # 添加其他物品特征
    category_input = layers.Input(shape=(10,), name='category')  # one-hot
    price_input = layers.Input(shape=(1,), name='price')
    
    # 拼接
    concat = layers.Concatenate()([item_vec, category_input, price_input])
    
    # 全连接层
    dense = layers.Dense(128, activation='relu')(concat)
    dense = layers.Dense(64, activation='relu')(dense)
    item_embedding = layers.Dense(embedding_dim, activation='linear')(dense)
    
    return Model(inputs=[item_id_input, category_input, price_input], outputs=item_embedding)

# 构建模型
user_tower = build_user_tower(None)
item_tower = build_item_tower(None)

# 计算相似度
def similarity_score(user_emb, item_emb):
    # 点积
    dot_product = tf.reduce_sum(user_emb * item_emb, axis=1, keepdims=True)
    return dot_product

# 组合模型
user_embedding = user_tower([
    layers.Input(shape=(1,), name='user_id'),
    layers.Input(shape=(1,), name='age'),
    layers.Input(shape=(1,), name='gender')
])

item_embedding = item_tower([
    layers.Input(shape=(1,), name='item_id'),
    layers.Input(shape=(10,), name='category'),
    layers.Input(shape=(1,), name='price')
])

score = similarity_score(user_embedding, item_embedding)

# 完整模型
model = Model(
    inputs=user_tower.inputs + item_tower.inputs,
    outputs=score
)

# 编译
model.compile(optimizer='adam', loss='mse')

# 训练
model.fit([user_ids, ages, genders, item_ids, categories, prices], ratings, epochs=10)

10.3 Embedding 在推荐系统中的应用

1. 用户画像

python
# 用户兴趣向量
user_interest_vector = get_user_embedding(user_id)

# 分析用户兴趣
def analyze_user_interest(user_embedding):
    # 找到最接近的兴趣标签
    interest_tags = ['科技', '娱乐', '体育', '财经', '教育']
    tag_embeddings = load_tag_embeddings()
    
    similarities = []
    for tag in interest_tags:
        sim = cosine_similarity([user_embedding], [tag_embeddings[tag]])[0][0]
        similarities.append((tag, sim))
    
    # 排序
    similarities.sort(key=lambda x: x[1], reverse=True)
    
    return similarities[:5]  # Top5 兴趣

2. 物品相似度计算

python
# 计算物品相似度
def find_similar_items(target_item_id, top_n=10):
    target_vector = get_item_embedding(target_item_id)
    
    similarities = []
    for item_id in all_items:
        if item_id == target_item_id:
            continue
        item_vector = get_item_embedding(item_id)
        sim = cosine_similarity([target_vector], [item_vector])[0][0]
        similarities.append((item_id, sim))
    
    # 排序
    similarities.sort(key=lambda x: x[1], reverse=True)
    
    return similarities[:top_n]

3. 推荐打分

python
# 用户对物品的推荐分数 = 用户向量 · 物品向量
def recommend_score(user_id, item_id):
    user_vector = get_user_embedding(user_id)
    item_vector = get_item_embedding(item_id)
    
    score = np.dot(user_vector, item_vector)
    return score

10.4 Embedding 的优势

✅ 优势

  1. 降维:将高维稀疏数据转换为低维稠密向量
  2. 捕捉语义:学习实体之间的潜在关系
  3. 通用性:可用于多种推荐场景
  4. 可解释性:通过可视化理解特征

10.5 Embedding 的选择建议

场景推荐方法
文本内容丰富Word2Vec / BERT
用户行为数据多ALS 矩阵分解
复杂非线性关系双塔 DNN 模型
图结构数据Graph Embedding(Node2Vec)

11. 总结

本章详细介绍了推荐系统的经典算法与技术细节,包括:

  1. 基于内容的推荐:通过计算用户向量和物品向量的相似度来推荐
  2. 协同过滤推荐
    • 基于记忆的方法(User-based、Item-based)
    • 基于模型的方法(矩阵分解、深度学习)
  3. 多路召回融合:按顺序、平均、加权、动态、机器学习等策略
  4. AB 测试:科学评估推荐效果的完整流程
  5. 内容召回全流程:从内容采集到 Web 服务的完整实现

通过这两章的学习,你已经掌握了推荐系统的基础架构和核心算法,可以开始动手实践了!


💡 下一步建议

  • 动手实现一个简单的推荐系统(如电影推荐)
  • 学习深度学习推荐模型(如 Wide & Deep、DeepFM)
  • 关注工业级推荐系统的性能优化和工程实践

MIT