经典推荐算法与技术细节
在掌握了推荐系统的基本架构后,本章将深入探讨具体的推荐算法和技术实现细节,帮助你从理论走向实战。
1. 基于内容的推荐(Content-Based Recommendation)
1.1 核心思想
基于内容的推荐的核心原理是:给用户推荐之前喜欢的物品相似的物品
这包含两种常见方式:
U2I2I:用户→物品→物品
- 找用户喜欢的历史物品
- 找与这些物品相似的物品
- 推荐给用户
U2TAG2I:用户→标签→物品
- 提取用户的兴趣标签
- 找带有这些标签的物品
- 推荐给用户
1.2 用户向量与物品向量
1.2.1 向量表示
将用户和物品都表示为向量,通过计算向量相似度来衡量匹配程度。
用户向量:代表用户的兴趣偏好 物品向量:代表物品的特征属性
1.2.2 相似度计算方法
余弦相似度(最常用)
def cosine_similarity(vec1, vec2):
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
return dot_product / (norm1 * norm2)欧氏距离
def euclidean_distance(vec1, vec2):
return np.sqrt(np.sum((vec1 - vec2) ** 2))皮尔逊相关系数
适用于处理评分数据,考虑用户评分习惯的差异。
1.3 基于内容推荐的优缺点
优点:
- ✅ 可解释性强,用户容易理解
- ✅ 不依赖用户行为数据,冷启动问题较少
- ✅ 推荐结果稳定,不会频繁变化
缺点:
- ❌ 很难发现用户未接触过的新类型物品
- ❌ 物品特征需要人工标注或提取
- ❌ 存在信息茧房风险,推荐范围窄
1.4 实际应用场景
- 电商:相似商品推荐("看了这个的人还看了")
- 视频:相似视频推荐("你可能也喜欢")
- 音乐:相似歌曲推荐("猜你喜欢")
2. 基于协同过滤的推荐(Collaborative Filtering)
协同过滤是推荐系统中最经典、应用最广泛的算法,其核心思想是:根据群体的智慧进行推荐
2.1 核心概念
协同过滤假设:如果用户 A 和用户 B 过去的行为相似,那么他们对物品的偏好也相似;反之,如果物品 A 和物品 B 经常被相同的用户喜欢,那么这两个物品也相似。
2.2 基于记忆的协同过滤(Memory-Based CF)
2.2.1 User-based CF(基于用户的协同过滤)
原理:U2U2I - 和你兴趣相投的人也喜欢 XXX
流程:
- 计算用户之间的相似度
- 找到与目标用户最相似的 K 个用户
- 找这些相似用户喜欢但目标用户未互动的物品
- 根据相似度加权,生成推荐列表
相似度计算:
- 余弦相似度
- 皮尔逊相关系数
- Jaccard 相似度
示例:
用户 A 喜欢的物品:[物品1, 物品2, 物品3]
用户 B 喜欢的物品:[物品1, 物品2, 物品4]
用户 C 喜欢的物品:[物品1, 物品5]
给用户 C 推荐:
- 与用户 C 相似的用户是 A(都喜欢物品1)
- 用户 A 喜欢但用户 C 未看过的:物品2, 物品3
- 推荐:[物品2, 物品3]2.2.2 Item-based CF(基于物品的协同过滤)
原理:U2I2I - 喜欢这个物品的人也喜欢 XXX
流程:
- 计算物品之间的相似度
- 找到用户喜欢物品的相似物品
- 根据相似度加权,生成推荐列表
相似度计算:
- 基于用户共现度
- 基于评分相关性
示例:
物品1 被喜欢的人群:[用户A, 用户B, 用户C]
物品2 被喜欢的人群:[用户A, 用户B, 用户D]
物品3 被喜欢的人群:[用户A, 用户E]
用户 A 喜欢物品1,推荐:
- 与物品1 相似的物品是物品2(共现用户:A, B)
- 推荐:物品22.3 基于模型的协同过滤(Model-Based CF)
基于模型的方法通过机器学习算法构建用户-物品交互的数学模型。
2.3.1 矩阵分解(Matrix Factorization)
核心思想:将用户-物品评分矩阵分解为两个低维矩阵的乘积
$$R \approx U \times V^T$$
其中:
- R:用户-物品评分矩阵(稀疏)
- U:用户隐含因子矩阵
- V:物品隐含因子矩阵
优势:
- 可以处理稀疏数据
- 捕捉潜在特征
- 预测缺失值
算法:
- SVD(Singular Value Decomposition)
- ALS(Alternating Least Squares)
- NMF(Non-negative Matrix Factorization)
2.3.2 深度学习模型
Neural Collaborative Filtering (NCF)
- 用神经网络替代传统矩阵分解
- 学习用户和物品的非线性交互
Deep & Wide
- Wide 层:记忆能力,学习历史特征组合
- Deep 层:泛化能力,学习潜在特征
2.4 协同过滤的优缺点
优点:
- ✅ 不需要物品特征信息
- ✅ 可以发现用户潜在兴趣
- ✅ 推荐新颖性和多样性好
缺点:
- ❌ 冷启动问题:新用户/新物品无法推荐
- ❌ 数据稀疏性:用户行为稀疏时效果差
- ❌ 热门偏向:热门物品被过度推荐
2.5 实际应用
- 电商:个性化商品推荐
- 视频:视频推荐
- 音乐:音乐推荐
3. 多路召回融合策略
在实际推荐系统中,往往采用多路召回策略,然后对召回结果进行融合排序。
3.1 融合策略对比
3.1.1 按顺序展示
方法:召回1 结果 → 召回2 结果 → 召回3 结果...
优点:实现简单
缺点:
- 早期召回占主导
- 后期召回难以曝光
适用场景:召回效果差异大,有明确优先级
3.1.2 平均法(Averaging)
方法:对所有召回的结果列表取平均
优点:公平对待所有召回
缺点:
- 忽略召回质量差异
- 简单平均可能不合理
3.1.3 加权投票(Weighted Voting)
方法:根据召回质量设置权重
$$Score_{final} = \sum_{i=1}^{n} w_i \times Score_i$$
其中:
- $w_i$:第 i 个召回路径的权重
- $Score_i$:第 i 个召回路径的打分
设置权重原则:
- 质量高的召回路径权重高
- 热门召回路径权重适中
- 长尾召回路径权重较低
3.1.4 动态加权法(Dynamic Weighting)
方法:根据用户特征、物品特征动态调整权重
示例:
def dynamic_weight(user_id, item_features):
weights = {
'cf': 0.5, # 协同过滤
'content': 0.3, # 内容推荐
'hot': 0.2 # 热门推荐
}
# 新用户降低协同过滤权重
if is_new_user(user_id):
weights['cf'] = 0.2
weights['content'] = 0.5
weights['hot'] = 0.3
return weights3.1.5 机器学习权重法(ML-based Weighting)
方法:使用机器学习模型自动学习召回路径权重
流程:
- 收集用户历史交互数据
- 标记每个物品的召回来源
- 训练模型预测用户点击概率
- 模型输出即为权重
优势:
- 自动优化权重
- 适应数据变化
缺点:
- 需要大量标注数据
- 训练成本高
3.2 融合策略选择建议
| 策略 | 实现难度 | 效果 | 适用场景 |
|---|---|---|---|
| 按顺序展示 | ⭐ | ⭐⭐ | 召回差异大 |
| 平均法 | ⭐⭐ | ⭐⭐⭐ | 快速上线 |
| 加权投票 | ⭐⭐⭐ | ⭐⭐⭐⭐ | 稳定期 |
| 动态加权 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 个性化强 |
| 机器学习 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 大规模系统 |
3.3 成本考虑
如果召回路径质量接近,按照成本选择:
- 实时计算(如协同过滤):成本低,权重可设高
- 离线计算(如深度学习模型):成本高,权重可设低
- 第三方接口(如 API):成本高,权重需谨慎设置
4. AB 测试系统设计与实现
AB 测试是推荐系统优化的核心工具,通过科学对比不同策略的效果,选择最优方案。
4.1 AB 测试基本流程
4.1.1 系统架构
用户请求
↓
AB 测试服务
↓
根据用户ID 分桶(分桶A / 分桶B)
↓
访问后端推荐服务
↓
返回对应分桶的推荐结果
↓
用户交互(展现、点击、购买)
↓
日志记录(带分桶标志)
↓
数据分析
↓
结果对比
↓
全量部署获胜分桶4.1.2 分桶算法
最常用的是哈希取模:
def get_bucket(user_id, bucket_count=10):
"""
将用户分配到指定数量的分桶中
"""
hash_value = hash(user_id)
return hash_value % bucket_count注意事项:
- 同一用户必须始终分配到同一分桶
- 分桶分布应尽量均匀
- 新用户/老用户可以分流策略
4.2 AB 测试实施步骤
4.2.1 开发两个策略分支
开发同一个接口的两个分支:
- 分桶 A:使用旧策略(Baseline)
- 分桶 B:使用新策略(Experiment)
4.2.2 配置实验分流
在 AB 测试配置中设置分流比例:
{
"experiment_id": "exp_001",
"bucket_allocation": {
"A": 0.6, // 60% 流量
"B": 0.4 // 40% 流量
},
"status": "running"
}4.2.3 用户请求处理
1. 用户访问推荐接口
2. 请求携带用户ID
3. AB 测试服务根据用户ID计算分桶
4. 返回分桶结果:{user_id: "123456", bucket: "A"}
5. 根据分桶选择对应的策略返回推荐结果4.2.4 日志收集
所有用户交互日志都带上分桶标志:
{
"user_id": "123456",
"bucket": "A",
"event_type": "click",
"item_id": "item_001",
"timestamp": 1640000000
}关键指标:
- 展现(Impression):物品展示次数
- 点击(Click):用户点击次数
- 购买(Purchase):用户购买次数
- CTR(Click-Through Rate):点击率 = 点击 / 展现
- CVR(Conversion Rate):转化率 = 购买 / 点击
- GMV(Gross Merchandise Value):交易总额
4.2.5 结果分析
A/B 分桶对比表:
| 指标 | 分桶 A(Baseline) | 分桶 B(Experiment) | 提升幅度 |
|---|---|---|---|
| 展现量 | 100,000 | 100,000 | 0% |
| 点击量 | 5,000 | 5,500 | +10% |
| CTR | 5% | 5.5% | +0.5pp |
| 购买量 | 500 | 600 | +20% |
| CVR | 10% | 10.9% | +0.9pp |
| GMV | $50,000 | $60,000 | +20% |
4.2.6 统计显著性检验
不能只看绝对值,需要进行统计检验:
- t 检验:检验均值差异是否显著
- 卡方检验:检验比率差异是否显著
- 置信区间:计算差异的 95% 置信区间
判断标准:
- P值 < 0.05:差异显著
- P值 ≥ 0.05:差异不显著,可能是随机波动
4.3 实验优化与全量部署
4.3.1 调整流量
如果分桶 B 表现更好:
- 调大分桶 B 的流量比例(如 60% → 80%)
- 继续观察,确认稳定性
4.3.2 全量部署
如果分桶 B 持续优于 A:
- 将分桶 B 流量调整到 100%
- 停止分桶 A
- 分桶 B 成为新的 Baseline
- 开始下一轮实验
4.4 AB 测试最佳实践
✅ 应该做的:
- 实验周期足够长(至少 7 天)
- 控制其他变量(如流量入口)
- 多指标综合评估
- 统计显著性检验
❌ 不应该做的:
- 实验周期过短(如 1 天)
- 忽略统计显著性
- 只看单一指标
- 数据窥视(提前终止实验)
5. 内容召回全流程实现
内容召回是推荐系统中最基础、最常用的召回方式之一。本节将详细讲解从内容采集到最终服务的完整实现流程。
5.1 核心目标
主要目标:计算物品与其它最相似的物品列表
应用场景:
- 相似推荐:商品详情页的"相似商品"
- 扩展推荐:基于用户历史物品找相似物
- 替代推荐:当某个物品不可用时推荐替代品
5.2 完整流程图
内容采集
↓
中文分词 & 关键词提取
↓
向量化表示(Word2Vec / Doc2Vec)
↓
TopN 相似近邻搜索(余弦相似度)
↓
存储(Redis)
↓
Web 服务(Flask / Java)
↓
返回相似物品列表5.3 详细实现步骤
5.3.1 内容采集
采集的物品信息:
| 字段 | 说明 | 示例 |
|---|---|---|
| ID | 物品唯一标识 | "item_001" |
| title | 标题 | "Python 编程从入门到实践" |
| description | 详细描述 | "本书是 Python 入门经典..." |
| tags | 标签 | ["编程", "Python", "入门"] |
| category | 分类 | "计算机" |
数据来源:
- 数据库查询(MySQL / MongoDB)
- 爬虫采集
- API 接口
- 手动标注
5.3.2 中文分词 & 关键词提取
工具:jieba 分词
import jieba
import jieba.analyse
# 加载自定义词典
jieba.load_userdict("custom_dict.txt")
# 分词
text = "Python 编程从入门到实践"
words = jieba.lcut(text)
# 输出:['Python', '编程', '从', '入门', '到', '实践']
# 关键词提取
keywords = jieba.analyse.extract_tags(text, topK=10, withWeight=True)
# 输出:[('编程', 0.5), ('Python', 0.4), ('入门', 0.3), ...]关键词提取算法:
- TF-IDF:关键词重要性
- TextRank:基于图的关键词排序
- 自定义词典:人工标注的关键词
5.3.3 向量化表示
将文本转换为数值向量,便于计算相似度。
Word2Vec
核心思想:将词语映射到高维向量空间,相似的词在向量空间中距离较近。
工具:
- gensim(Python)
- Spark Word2Vec(分布式)
- 腾讯 Word2Vec(预训练模型)
示例:
from gensim.models import Word2Vec
# 训练 Word2Vec 模型
sentences = [
["编程", "Python", "入门", "实践"],
["编程", "Java", "进阶", "实战"],
["编程", "JavaScript", "前端", "开发"]
]
model = Word2Vec(sentences, vector_size=100, window=5, min_count=1, workers=4)
# 获取词向量
vector = model.wv["Python"]
# 计算相似词
similar_words = model.wv.most_similar("编程")
# 输出:[('开发', 0.9), ('实战', 0.8), ...]Doc2Vec
核心思想:将整个文档映射到向量空间,适用于文档相似度计算。
示例:
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
# 准备文档
documents = [
TaggedDocument(words=["Python", "编程", "入门"], tags=["doc1"]),
TaggedDocument(words=["Java", "编程", "进阶"], tags=["doc2"]),
TaggedDocument(words=["JavaScript", "前端", "开发"], tags=["doc3"])
]
# 训练模型
model = Doc2Vec(documents, vector_size=100, window=5, min_count=1, workers=4)
# 获取文档向量
vector = model.dv["doc1"]
# 计算相似文档
similar_docs = model.dv.most_similar("doc1")向量聚合(平均 / 加权平均)
对于长文本,可以将词向量聚合为文档向量:
import numpy as np
def document_to_vector(text, model):
words = jieba.lcut(text)
vectors = [model.wv[word] for word in words if word in model.wv]
# 平均
doc_vector = np.mean(vectors, axis=0)
# 加权平均(使用 TF-IDF 权重)
# doc_vector = np.average(vectors, axis=0, weights=weights)
return doc_vector5.3.4 TopN 相似近邻搜索
余弦相似度(最常用)
from scipy.spatial.distance import cosine
def cosine_similarity(vec1, vec2):
return 1 - cosine(vec1, vec2)
# 计算物品 A 和物品 B 的相似度
similarity = cosine_similarity(vector_a, vector_b)LSH 局部敏感哈希(大规模场景)
问题:如果有 100 万物品,两两计算相似度需要 5 万亿次计算,性能无法接受。
LSH 解决方案:
- 将相似物品哈希到同一个桶中
- 只计算同桶内的物品相似度
- 大幅减少计算量
工具:
datasketch(Python)- Spark LSH
from datasketch import MinHashLSH
# 创建 LSH 索引
lsh = MinHashLSH(threshold=0.5, num_perm=128)
# 添加物品
lsh.insert("item1", minhash1)
lsh.insert("item2", minhash2)
lsh.insert("item3", minhash3)
# 查询相似物品
result = lsh.query(minhash1)
# 输出:["item1", "item2"] # item2 与 item1 相似5.3.5 存储与服务
Redis 存储设计
数据结构:Hash
Key: item:{item_id}
Field: similar_items
Value: [{"item_id": "item2", "similarity": 0.9}, {"item_id": "item3", "similarity": 0.8}, ...]Redis 命令:
# 存储相似物品列表
HSET item:item1 similar_items '[{"item_id":"item2","similarity":0.9},...]'
# 查询相似物品
HGET item:item1 similar_itemsWeb 服务实现(Flask)
from flask import Flask, jsonify
import redis
app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)
@app.route('/recommend/similar/<item_id>')
def get_similar_items(item_id):
# 从 Redis 获取相似物品
similar_items_json = r.hget(f'item:{item_id}', 'similar_items')
if similar_items_json is None:
return jsonify({"error": "Item not found"}), 404
similar_items = json.loads(similar_items_json)
return jsonify({
"item_id": item_id,
"similar_items": similar_items
})
if __name__ == '__main__':
app.run(port=5000)Web 服务实现(Java Spring Boot)
@RestController
@RequestMapping("/recommend")
public class RecommendController {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@GetMapping("/similar/{itemId}")
public ResponseEntity<?> getSimilarItems(@PathVariable String itemId) {
String key = "item:" + itemId;
String json = redisTemplate.opsForHash().get(key, "similar_items");
if (json == null) {
return ResponseEntity.status(404).body("Item not found");
}
ObjectMapper mapper = new ObjectMapper();
List<SimilarItem> similarItems = mapper.readValue(json, new TypeReference<List<SimilarItem>>(){});
Map<String, Object> response = new HashMap<>();
response.put("item_id", itemId);
response.put("similar_items", similarItems);
return ResponseEntity.ok(response);
}
}5.6 完整代码示例
import jieba
import jieba.analyse
import numpy as np
from gensim.models import Word2Vec
from scipy.spatial.distance import cosine
import redis
import json
# 1. 加载 Word2Vec 模型
model = Word2Vec.load("word2vec.model")
# 2. 文本向量化
def text_to_vector(text):
words = jieba.lcut(text)
vectors = [model.wv[word] for word in words if word in model.wv]
if not vectors:
return np.zeros(model.vector_size)
return np.mean(vectors, axis=0)
# 3. 计算相似度
def calculate_similarity(vec1, vec2):
return 1 - cosine(vec1, vec2)
# 4. 构建相似物品索引
def build_similarity_index(items):
redis_client = redis.Redis(host='localhost', port=6379, db=0)
for item in items:
item_id = item['id']
item_text = item['title'] + ' ' + item['description']
item_vector = text_to_vector(item_text)
# 查找最相似的 Top10 物品
similarities = []
for other_item in items:
if other_item['id'] == item_id:
continue
other_text = other_item['title'] + ' ' + other_item['description']
other_vector = text_to_vector(other_text)
similarity = calculate_similarity(item_vector, other_vector)
similarities.append({
'item_id': other_item['id'],
'similarity': similarity
})
# 排序并取 Top10
similarities.sort(key=lambda x: x['similarity'], reverse=True)
top10 = similarities[:10]
# 存储到 Redis
redis_client.hset(f'item:{item_id}', 'similar_items', json.dumps(top10))
print("Similarity index built successfully!")
# 5. 查询相似物品
def get_similar_items(item_id):
redis_client = redis.Redis(host='localhost', port=6379, db=0)
json_str = redis_client.hget(f'item:{item_id}', 'similar_items')
if json_str:
return json.loads(json_str)
else:
return []5.7 性能优化建议
✅ 优化建议:
- 批量计算:离线批量计算相似度,避免在线计算
- 缓存热点:Redis 缓存热门物品的相似列表
- 增量更新:新物品增量计算相似度,而非全量重算
- LSH 加速:大规模场景使用 LSH 减少计算量
- 向量压缩:使用 PCA 降维或量化压缩向量
⏱️ 性能指标:
- 召回时间:< 50ms(从 Redis 读取)
- 离线计算:百万级物品 < 1 小时
- 增量更新:新增物品 < 10s
6. 用户聚类推荐
用户聚类推荐是一种基于用户群体的推荐方法,通过将相似用户分到同一组,然后推荐该群体中热门的物品。
6.1 核心思想
原理:将用户按照兴趣特征分成 K 个聚类,每个聚类代表一个用户群体。对于新用户,先找到他属于哪个聚类,然后推荐该聚类中热门的物品。
优势:
- 可解释性强("和你类似的人都在看")
- 计算效率高(一次聚类,多次复用)
- 适合用户数量大但活跃用户少的场景
6.2 实现流程
6.2.1 用户聚类
步骤 1:特征提取
将用户的各种特征转换为数值向量:
from pyspark.ml.feature import OneHotEncoder, Word2Vec, VectorAssembler
from pyspark.ml.clustering import KMeans
# 1. 类别信息进行 one-hot 编码
gender_encoder = OneHotEncoder(inputCol="gender", outputCol="gender_vec")
# 2. 行为列表进行 word embedding 获得定长向量列表
# 假设用户行为是 ["商品A", "商品B", "商品C"]
word2vec = Word2Vec(vectorSize=100, minCount=1, inputCol="user_actions", outputCol="action_vec")
# 3. VectorAssembler 拼接所有特征向量
assembler = VectorAssembler(
inputCols=["gender_vec", "action_vec", "age_vec"],
outputCol="features"
)
# 4. 聚类算法 k-means
kmeans = KMeans(k=10, seed=42)步骤 2:训练聚类模型
from pyspark.ml import Pipeline
# 构建流水线
pipeline = Pipeline(stages=[gender_encoder, word2vec, assembler, kmeans])
# 训练模型
model = pipeline.fit(user_data)
# 获取聚类结果
clustered_users = model.transform(user_data)
# 输出格式:
# | user_id | features | prediction |
# |---------|----------|------------|
# | 001 | [0.1,...]| 3 |
# | 002 | [0.2,...]| 3 |步骤 3:聚类结果分析
聚类结果是一个用户ID到聚类数字的映射:
# 获取用户聚类数字
user_cluster_map = {
"user_001": 3,
"user_002": 3,
"user_003": 5,
"user_004": 5,
...
}6.2.2 分群热榜统计
目标:为每个聚类计算热门物品榜单
方法:统计该聚类中用户的历史行为,计算每个物品的热度
from pyspark.sql import functions as F
# 合并用户聚类信息和历史行为
user_actions_with_cluster = user_actions.join(
clustered_users.select("user_id", "prediction"),
on="user_id"
)
# 按聚类分组,统计物品热度
cluster_hot_items = user_actions_with_cluster.groupBy(
"prediction", "item_id"
).agg(
F.count("*").alias("popularity")
).orderBy("prediction", F.desc("popularity"))
# 为每个聚类取 Top20 热门物品
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("prediction").orderBy(F.desc("popularity"))
cluster_hot_rank = cluster_hot_items.withColumn(
"rank", row_number().over(windowSpec)
).filter(F.col("rank") <= 20)
# 结果格式:
# | prediction | item_id | popularity | rank |
# |------------|---------|------------|------|
# | 3 | item_123| 5000 | 1 |
# | 3 | item_456| 4800 | 2 |热榜结果:聚类 → 热门物品列表
cluster_hot_list = {
0: ["item_001", "item_002", ...], # 聚类0的热门物品
1: ["item_101", "item_102", ...], # 聚类1的热门物品
...
}6.2.3 计算结果缓存
正排列表:用户ID → 聚类数字
存储:Cassandra
# Cassandra 表设计
CREATE TABLE user_cluster (
user_id text PRIMARY KEY,
cluster_id int
);
# 写入数据
INSERT INTO user_cluster (user_id, cluster_id) VALUES ('user_001', 3);倒排列表:聚类数字 → 推荐ITEM列表
存储:Redis
# Redis Key 设计
# cluster:{cluster_id}:hot_items
# 存储格式(List)
LPUSH cluster:3:hot_items "item_123"
LPUSH cluster:3:hot_items "item_456"
...import redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 存储聚类热门物品
for cluster_id, items in cluster_hot_list.items():
key = f"cluster:{cluster_id}:hot_items"
r.delete(key) # 清空旧数据
for item_id in items:
r.lpush(key, item_id)6.2.4 在线服务
接口设计:
输入:用户ID
输出:推荐物品列表流程:
用户请求 (user_id)
↓
从 Cassandra 查询用户聚类 (正排查询)
↓
从 Redis 获取该聚类推荐列表 (倒排查询)
↓
返回推荐结果实现代码(Python Flask):
from flask import Flask, jsonify
import redis
from cassandra.cluster import Cluster
app = Flask(__name__)
# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 连接 Cassandra
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('recommendation')
@app.route('/recommend/user/<user_id>')
def recommend_user(user_id):
# 1. 从 Cassandra 获取用户聚类
query = f"SELECT cluster_id FROM user_cluster WHERE user_id = '{user_id}'"
rows = session.execute(query)
if not rows:
return jsonify({"error": "User not found"}), 404
cluster_id = rows[0].cluster_id
# 2. 从 Redis 获取该聚类的热门物品
key = f"cluster:{cluster_id}:hot_items"
items = r.lrange(key, 0, 19) # 获取 Top20
# 解码 bytes
items = [item.decode('utf-8') for item in items]
return jsonify({
"user_id": user_id,
"cluster_id": cluster_id,
"recommendations": items
})
if __name__ == '__main__':
app.run(port=5000)6.3 优化建议
✅ 优化方向:
- 定期更新聚类:用户兴趣会变化,建议每周重新聚类一次
- 热榜时效性:按天/周统计热度,保证新鲜度
- 冷启动优化:新用户先用热门物品推荐,积累数据后再聚类
- 混合策略:结合协同过滤,提高推荐准确性
⏱️ 性能指标:
- 聚类时间:百万用户,K=10,< 30分钟
- 热榜统计:< 10分钟
- 在线查询:< 10ms
7. 矩阵分解的推荐
矩阵分解是推荐系统中最经典、最有效的算法之一,广泛应用于工业界。
7.1 核心思想
问题:用户-物品评分矩阵通常是极其稀疏的(用户只会给很少的物品评分)
解决方法:将稀疏的大矩阵分解为两个稠密的小矩阵的乘积
$$R \approx U \times V^T$$
其中:
- $R$:用户-物品评分矩阵($m \times n$,稀疏)
- $U$:用户隐含因子矩阵($m \times k$)
- $V$:物品隐含因子矩阵($n \times k$)
- $k$:隐含因子数量(通常 20-100)
直观理解:
- 每个用户用一个 $k$ 维向量表示,代表用户的偏好
- 每个物品用一个 $k$ 维向量表示,代表物品的特征
- 用户对物品的预测评分 = 用户向量 · 物品向量
7.2 PySpark ALS 算法实现
7.2.1 原始输入文件
数据格式:用户ID、物品ID、评分
user_id,item_id,rating
1,101,5.0
1,102,3.0
2,101,2.0
2,103,5.0
3,102,4.0
...7.2.2 矩阵分解模型训练
算法:ALS(Alternating Least Squares)
ALS 是 Spark MLlib 中最常用的矩阵分解算法,原理是交替最小二乘法:
- 固定物品矩阵 V,优化用户矩阵 U
- 固定用户矩阵 U,优化物品矩阵 V
- 重复 1-2 直到收敛
代码实现:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
# 创建 SparkSession
spark = SparkSession.builder.appName("MatrixFactorization").getOrCreate()
# 读取数据
data = spark.read.csv("ratings.csv", header=True, inferSchema=True)
# 训练集和测试集划分
train, test = data.randomSplit([0.8, 0.2])
# ALS 参数
als = ALS(
userCol="user_id",
itemCol="item_id",
ratingCol="rating",
rank=20, # 隐含因子数量
maxIter=10, # 最大迭代次数
regParam=0.1, # 正则化参数
coldStartStrategy="drop" # 处理冷启动
)
# 训练模型
model = als.fit(train)
# 预测测试集
predictions = model.transform(test)
# 评估模型
evaluator = RegressionEvaluator(
metricName="rmse",
labelCol="rating",
predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")7.2.3 获得用户向量和物品向量
用户向量:
# 获取用户因子矩阵
user_factors = model.userFactors
user_factors.show(5)
# 输出格式:
# +--------+--------------------+
# |user_id| features |
# +--------+--------------------+
# | 1 |[0.1,0.2,0.3,...] |
# | 2 |[0.2,0.1,0.4,...] |
# +--------+--------------------+
# 转换为字典
user_vectors = {row['user_id']: row['features'].toArray() for row in user_factors.collect()}物品向量:
# 获取物品因子矩阵
item_factors = model.itemFactors
item_factors.show(5)
# 输出格式:
# +--------+--------------------+
# |item_id| features |
# +--------+--------------------+
# | 101|[0.3,0.1,0.2,...] |
# | 102|[0.2,0.4,0.1,...] |
# +--------+--------------------+
# 转换为字典
item_vectors = {row['item_id']: row['features'].toArray() for row in item_factors.collect()}7.2.4 生成推荐列表
U2I 推荐:用户向量 × 物品向量
import numpy as np
from pyspark.sql.functions import col
# 为用户 1 推荐物品
user_id = 1
user_vec = user_vectors[user_id]
# 计算用户对所有物品的评分
recommendations = []
for item_id, item_vec in item_vectors.items():
score = np.dot(user_vec, item_vec) # 点积
recommendations.append((item_id, score))
# 排序并取 TopN
recommendations.sort(key=lambda x: x[1], reverse=True)
top_n = recommendations[:20]
print(f"Top20 recommendations for user {user_id}:")
for item_id, score in top_n:
print(f"Item {item_id}: {score:.4f}")或者使用 Spark 内置方法:
# 为所有用户推荐 Top10
user_recs = model.recommendForAllUsers(10)
user_recs.show(5)
# 输出格式:
# +--------+--------------------+
# |user_id| recommendations |
# +--------+--------------------+
# | 1 |[[102,4.5],[...]] |
# +--------+--------------------+
# 为单个用户推荐
user_subset = train.filter(col("user_id") == 1)
user_subset_recs = model.recommendForUserSubset(user_subset, 10)
user_subset_recs.show()7.2.5 生成 I2I 相似推荐
I2I 推荐:物品向量 × 物品向量
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
# 计算物品相似度矩阵
item_ids = list(item_vectors.keys())
item_vecs = np.array([item_vectors[i] for i in item_ids])
# 使用余弦相似度
similarities = cosine_similarity(item_vecs)
# 为物品 101 找最相似的物品
item_id = "101"
item_idx = item_ids.index(item_id)
# 获取相似度排序
sim_scores = list(enumerate(similarities[item_idx]))
sim_scores.sort(key=lambda x: x[1], reverse=True)
# 取 Top10(排除自己)
top_similar = [(item_ids[i], score) for i, score in sim_scores[1:11]]
print(f"Top10 similar items to {item_id}:")
for similar_item_id, score in top_similar:
print(f"{similar_item_id}: {score:.4f}")存储到 Redis:
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
# 存储每个物品的相似物品列表
for item_id in item_ids:
item_idx = item_ids.index(item_id)
sim_scores = list(enumerate(similarities[item_idx]))
sim_scores.sort(key=lambda x: x[1], reverse=True)
# 取 Top10
similar_items = [
{"item_id": item_ids[i], "similarity": float(score)}
for i, score in sim_scores[1:11]
]
# 存储到 Redis
key = f"item:{item_id}:similar"
r.set(key, json.dumps(similar_items))
print("Item similarity index saved to Redis!")7.3 矩阵分解的优缺点
优点:
- ✅ 处理稀疏数据效果好
- ✅ 捕捉潜在特征,可解释性强
- ✅ 预测评分准确
- ✅ 计算效率高(Spark ALS 分布式)
缺点:
- ❌ 冷启动问题严重(新用户/新物品无法推荐)
- ❌ 无法利用物品内容信息
- ❌ 参数调优复杂(rank、regParam 等)
7.4 实际应用场景
- 电影推荐:Netflix、豆瓣电影
- 音乐推荐:Spotify、网易云音乐
- 电商推荐:亚马逊、淘宝商品推荐
8. 推荐系统的 API 接口设计
推荐系统的 API 接口是在线服务层与客户端交互的桥梁,设计良好的接口至关重要。
8.1 接口设计原则
✅ 好的接口设计:
- RESTful 风格:遵循 REST 架构风格
- 版本控制:URL 中包含版本号(如
/v1/recommend) - 统一格式:请求和响应格式统一
- 错误处理:清晰的错误码和错误信息
- 性能优化:接口响应时间 < 100ms
- 幂等性:相同请求返回相同结果
8.2 核心接口设计
8.2.1 个性化推荐接口
场景:首页推荐、"猜你喜欢"
请求:
GET /v1/recommend/personalized请求参数:
{
"user_id": "user_123456",
"context": {
"device": "mobile",
"location": "beijing",
"time": "2026-03-28 20:00:00"
},
"limit": 20
}响应:
{
"code": 0,
"message": "success",
"data": {
"user_id": "user_123456",
"recommendations": [
{
"item_id": "item_001",
"score": 0.95,
"reason": "基于你的历史行为",
"recall_source": "cf"
},
{
"item_id": "item_002",
"score": 0.88,
"reason": "热门推荐",
"recall_source": "hot"
}
],
"request_id": "req_1234567890"
},
"timestamp": 1700000000
}8.2.2 相似推荐接口
场景:商品详情页"相似商品"
请求:
GET /v1/recommend/similar/<item_id>请求参数:
{
"item_id": "item_001",
"limit": 10
}响应:
{
"code": 0,
"message": "success",
"data": {
"item_id": "item_001",
"similar_items": [
{
"item_id": "item_123",
"similarity": 0.92,
"title": "Python 编程从入门到实践"
},
{
"item_id": "item_456",
"similarity": 0.87,
"title": "Java 核心技术卷一"
}
]
},
"timestamp": 1700000000
}8.2.3 热门推荐接口
场景:热门榜单、流量兜底
请求:
GET /v1/recommend/hot请求参数:
{
"category": "tech",
"period": "daily", // daily, weekly, monthly
"limit": 20
}响应:
{
"code": 0,
"message": "success",
"data": {
"category": "tech",
"period": "daily",
"hot_items": [
{
"item_id": "item_001",
"rank": 1,
"popularity": 5000,
"trend": "up"
},
{
"item_id": "item_002",
"rank": 2,
"popularity": 4800,
"trend": "stable"
}
]
},
"timestamp": 1700000000
}8.3 错误码设计
统一错误码格式:
{
"code": 40001,
"message": "用户不存在",
"data": null,
"request_id": "req_1234567890"
}常见错误码:
| 错误码 | 说明 |
|---|---|
| 0 | 成功 |
| 40001 | 参数错误 |
| 40002 | 用户不存在 |
| 40003 | 物品不存在 |
| 50001 | 推荐服务超时 |
| 50002 | 推荐服务异常 |
| 50003 | 数据库异常 |
8.4 性能优化
缓存策略:
from flask import Flask
from functools import lru_cache
import redis
app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)
# 方案 1:使用 LRU 缓存(内存)
@lru_cache(maxsize=10000)
def get_recommendations_from_db(user_id):
# 从数据库获取推荐
pass
# 方案 2:使用 Redis 缓存
def get_recommendations(user_id, limit=20):
# 先查缓存
cache_key = f"recommend:{user_id}:{limit}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 缓存未命中,从数据库获取
recommendations = get_recommendations_from_db(user_id, limit)
# 写入缓存(5分钟过期)
r.setex(cache_key, 300, json.dumps(recommendations))
return recommendations超时设置:
from flask import Flask, jsonify
import signal
app = Flask(__name__)
# 超时处理
def timeout_handler(signum, frame):
raise TimeoutError("Request timeout")
@app.route('/v1/recommend/personalized')
def recommend_personalized():
# 设置超时(100ms)
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(0.1) # 100ms
try:
user_id = request.args.get('user_id')
recommendations = get_recommendations(user_id)
return jsonify({"code": 0, "data": recommendations})
except TimeoutError:
# 超时返回兜底推荐
fallback_items = get_fallback_items()
return jsonify({
"code": 0,
"data": fallback_items,
"warning": "timeout, using fallback"
})
finally:
signal.alarm(0) # 取消超时9. 物品冷启动问题
冷启动是推荐系统的经典难题,指新物品没有历史数据,无法被推荐系统识别和推荐。
9.1 冷启动场景
物品冷启动:
- 新上架的商品/内容
- 没有用户互动过的物品
- 长尾物品(曝光机会少)
用户冷启动:
- 新注册用户
- 没有历史行为的用户
9.2 解决方案一:基于内容推荐
原理:U2I2I - 用户→物品→物品
当物品没有行为数据时,利用物品的内容特征进行推荐。
方法 1:内容相似度
# 基于物品内容计算相似度
def content_based_recommendation(user_id, new_item_id):
# 1. 获取用户历史行为
user_history = get_user_history(user_id)
# 2. 找用户历史物品与新物品的相似度
similarities = []
for old_item_id in user_history:
sim = calculate_content_similarity(old_item_id, new_item_id)
similarities.append((old_item_id, sim))
# 3. 如果有高相似度的历史物品,则推荐该新物品
max_sim = max(similarities, key=lambda x: x[1])
if max_sim[1] > 0.7: # 相似度阈值
return True # 推荐新物品
return False # 不推荐方法 2:标签匹配
def tag_based_recommendation(user_id, new_item):
# 1. 提取用户兴趣标签
user_tags = get_user_tags(user_id)
# 输出:["Python", "编程", "AI"]
# 2. 提取新物品标签
item_tags = new_item['tags']
# 输出:["Python", "入门"]
# 3. 计算标签重合度
overlap = len(set(user_tags) & set(item_tags))
if overlap >= 2: # 至少有2个标签重合
return True
return False9.3 解决方案二:多级流量池机制(抖音模式)
抖音的多级流量池机制是解决冷启动的经典方案,也称"赛马机制"。
9.3.1 核心思想
原理:新物品先在小的流量池测试,表现好的逐步进入更大的流量池。
流程:
新物品上架
↓
内容质量审核(标题、简介、封面、账号粉丝数)
↓
一级流量池(500推荐量)
↓
表现达标?
├─ 是 → 二级流量池(1W推荐量)
│ ↓
│ 表现达标?
│ ├─ 是 → 三级流量池(10W推荐量)
│ │ ↓
│ │ ...
│ └─ 否 → 停止推荐
└─ 否 → 停止推荐9.3.2 流量池设计
一级流量池(初始测试)
- 推荐量:500次
- 目标用户:随机分发
- 考核指标:
- 播放率 > 30%
- 点赞率 > 10%
- 评论率 > 5%
- 完播率 > 40%
二级流量池(小规模推广)
- 推荐量:1万次
- 目标用户:相似兴趣用户
- 考核指标:
- 播放率 > 25%
- 点赞率 > 8%
- 评论率 > 4%
三级流量池(中等规模)
- 推荐量:10万次
- 目标用户:精准推荐
- 考核指标:
- 播放率 > 20%
- 点赞率 > 6%
N级流量池(大规模推广)
- 推荐量:100万+次
- 目标用户:全量用户
- 考核指标:相对指标(与其他内容对比)
9.3.3 晋升影响因素
核心指标:
- 播放率 = 播放量 / 曝光量
- 点赞率 = 点赞量 / 播放量
- 评论率 = 评论量 / 播放量
- 转发率 = 转发量 / 播放量
- 完播率 = 完整观看次数 / 播放量
- 收藏率 = 收藏量 / 播放量
- 转化率 = 转化量 / 播放量
综合评分:
def calculate_content_score(content_id):
metrics = get_content_metrics(content_id)
# 各指标权重(根据业务调整)
weights = {
'play_rate': 0.3,
'like_rate': 0.2,
'comment_rate': 0.15,
'share_rate': 0.15,
'completion_rate': 0.2
}
# 计算综合评分
score = (
weights['play_rate'] * metrics['play_rate'] +
weights['like_rate'] * metrics['like_rate'] +
weights['comment_rate'] * metrics['comment_rate'] +
weights['share_rate'] * metrics['share_rate'] +
weights['completion_rate'] * metrics['completion_rate']
)
return score
# 流量池晋升判断
def should_promote(content_id, current_level):
score = calculate_content_score(content_id)
# 不同流量池的晋升阈值
thresholds = {
1: 0.6, # 一级升二级
2: 0.7, # 二级升三级
3: 0.75 # 三级升四级
}
return score >= thresholds[current_level]9.3.4 冷启动结束
停止推荐的条件:
- 综合评分低于阈值,连续3个流量池未通过
- 内容被举报/违规
- 账号被降权/封禁
- 内容过时(如新闻超过7天)
处理方式:
def handle_cold_start_failure(content_id):
content = get_content(content_id)
# 分析失败原因
reason = analyze_failure_reason(content_id)
# 可能原因:标题不吸引、封面质量差、内容质量低等
# 记录失败原因,供创作者参考
log_failure_reason(content_id, reason)
# 停止推荐
stop_recommendation(content_id)9.4 其他冷启动解决方案
方案 3:利用社交关系
def social_based_recommendation(user_id, new_item):
# 获取用户关注的好友
friends = get_user_friends(user_id)
# 检查是否有好友已经互动过该新物品
for friend_id in friends:
if has_interaction(friend_id, new_item['id']):
# 好友喜欢,推荐给用户
return True
return False方案 4:主动征集反馈
# 在推荐结果中插入少量新物品
def recommend_with_exploration(user_id, limit=20):
# 正常推荐(18个)
normal_items = get_normal_recommendations(user_id, limit=18)
# 探索推荐(2个新物品)
new_items = get_random_new_items(limit=2)
# 合并返回
return normal_items + new_items方案 5:利用外部数据
- 物品的搜索热度
- 社交媒体讨论度
- 竞品平台表现
10. Embedding 技术深度解析
Embedding(嵌入)是将离散的实体(如用户、物品、词语)映射到连续的低维向量空间的技术,是现代推荐系统的核心。
10.1 Embedding 的核心思想
直观理解:
- 将复杂的实体用一个向量表示
- 相似的实体在向量空间中距离较近
- 向量可以参与数学运算(加减、点积等)
示例:
国王 - 男人 + 女人 ≈ 女王
北京 - 中国 + 法国 ≈ 巴黎10.2 三种主流 Embedding 方法
10.2.1 基于内容的 Word2Vec
应用场景:文本内容丰富的物品(如文章、商品描述)
方法:
from gensim.models import Word2Vec
import jieba
# 1. 训练 Word2Vec 模型
sentences = [
jieba.lcut("Python 编程从入门到实践"),
jieba.lcut("Java 核心技术卷一"),
jieba.lcut("JavaScript 高级程序设计")
]
model = Word2Vec(sentences, vector_size=100, window=5, min_count=1)
# 2. 获取物品向量
def get_item_embedding(text):
words = jieba.lcut(text)
vectors = [model.wv[word] for word in words if word in model.wv]
if vectors:
return np.mean(vectors, axis=0) # 平均
else:
return np.zeros(100)
# 3. 计算物品相似度
item1_text = "Python 编程从入门到实践"
item2_text = "Java 核心技术卷一"
vec1 = get_item_embedding(item1_text)
vec2 = get_item_embedding(item2_text)
similarity = cosine_similarity([vec1], [vec2])[0][0]
print(f"Similarity: {similarity}")10.2.2 协同过滤矩阵分解(CF Embedding)
应用场景:有丰富用户-物品交互数据的场景
方法:使用 ALS 矩阵分解得到的用户向量和物品向量
from pyspark.ml.recommendation import ALS
# 训练 ALS 模型
als = ALS(
userCol="user_id",
itemCol="item_id",
ratingCol="rating",
rank=50 # Embedding 维度
)
model = als.fit(ratings_data)
# 获取 Embedding
user_embeddings = model.userFactors # 用户 Embedding
item_embeddings = model.itemFactors # 物品 Embedding10.2.3 深度学习方法(DNN Embedding)
应用场景:复杂的非线性关系
方法:
双塔模型(Two-Tower Model)
import tensorflow as tf
from tensorflow.keras import layers, Model
# 用户塔
def build_user_tower(user_features, embedding_dim=64):
user_id_input = layers.Input(shape=(1,), name='user_id')
user_embed = layers.Embedding(input_dim=10000, output_dim=embedding_dim)(user_id_input)
user_vec = layers.Flatten()(user_embed)
# 添加其他用户特征
age_input = layers.Input(shape=(1,), name='age')
gender_input = layers.Input(shape=(1,), name='gender')
# 拼接
concat = layers.Concatenate()([user_vec, age_input, gender_input])
# 全连接层
dense = layers.Dense(128, activation='relu')(concat)
dense = layers.Dense(64, activation='relu')(dense)
user_embedding = layers.Dense(embedding_dim, activation='linear')(dense)
return Model(inputs=[user_id_input, age_input, gender_input], outputs=user_embedding)
# 物品塔
def build_item_tower(item_features, embedding_dim=64):
item_id_input = layers.Input(shape=(1,), name='item_id')
item_embed = layers.Embedding(input_dim=50000, output_dim=embedding_dim)(item_id_input)
item_vec = layers.Flatten()(item_embed)
# 添加其他物品特征
category_input = layers.Input(shape=(10,), name='category') # one-hot
price_input = layers.Input(shape=(1,), name='price')
# 拼接
concat = layers.Concatenate()([item_vec, category_input, price_input])
# 全连接层
dense = layers.Dense(128, activation='relu')(concat)
dense = layers.Dense(64, activation='relu')(dense)
item_embedding = layers.Dense(embedding_dim, activation='linear')(dense)
return Model(inputs=[item_id_input, category_input, price_input], outputs=item_embedding)
# 构建模型
user_tower = build_user_tower(None)
item_tower = build_item_tower(None)
# 计算相似度
def similarity_score(user_emb, item_emb):
# 点积
dot_product = tf.reduce_sum(user_emb * item_emb, axis=1, keepdims=True)
return dot_product
# 组合模型
user_embedding = user_tower([
layers.Input(shape=(1,), name='user_id'),
layers.Input(shape=(1,), name='age'),
layers.Input(shape=(1,), name='gender')
])
item_embedding = item_tower([
layers.Input(shape=(1,), name='item_id'),
layers.Input(shape=(10,), name='category'),
layers.Input(shape=(1,), name='price')
])
score = similarity_score(user_embedding, item_embedding)
# 完整模型
model = Model(
inputs=user_tower.inputs + item_tower.inputs,
outputs=score
)
# 编译
model.compile(optimizer='adam', loss='mse')
# 训练
model.fit([user_ids, ages, genders, item_ids, categories, prices], ratings, epochs=10)10.3 Embedding 在推荐系统中的应用
1. 用户画像
# 用户兴趣向量
user_interest_vector = get_user_embedding(user_id)
# 分析用户兴趣
def analyze_user_interest(user_embedding):
# 找到最接近的兴趣标签
interest_tags = ['科技', '娱乐', '体育', '财经', '教育']
tag_embeddings = load_tag_embeddings()
similarities = []
for tag in interest_tags:
sim = cosine_similarity([user_embedding], [tag_embeddings[tag]])[0][0]
similarities.append((tag, sim))
# 排序
similarities.sort(key=lambda x: x[1], reverse=True)
return similarities[:5] # Top5 兴趣2. 物品相似度计算
# 计算物品相似度
def find_similar_items(target_item_id, top_n=10):
target_vector = get_item_embedding(target_item_id)
similarities = []
for item_id in all_items:
if item_id == target_item_id:
continue
item_vector = get_item_embedding(item_id)
sim = cosine_similarity([target_vector], [item_vector])[0][0]
similarities.append((item_id, sim))
# 排序
similarities.sort(key=lambda x: x[1], reverse=True)
return similarities[:top_n]3. 推荐打分
# 用户对物品的推荐分数 = 用户向量 · 物品向量
def recommend_score(user_id, item_id):
user_vector = get_user_embedding(user_id)
item_vector = get_item_embedding(item_id)
score = np.dot(user_vector, item_vector)
return score10.4 Embedding 的优势
✅ 优势:
- 降维:将高维稀疏数据转换为低维稠密向量
- 捕捉语义:学习实体之间的潜在关系
- 通用性:可用于多种推荐场景
- 可解释性:通过可视化理解特征
10.5 Embedding 的选择建议
| 场景 | 推荐方法 |
|---|---|
| 文本内容丰富 | Word2Vec / BERT |
| 用户行为数据多 | ALS 矩阵分解 |
| 复杂非线性关系 | 双塔 DNN 模型 |
| 图结构数据 | Graph Embedding(Node2Vec) |
11. 总结
本章详细介绍了推荐系统的经典算法与技术细节,包括:
- 基于内容的推荐:通过计算用户向量和物品向量的相似度来推荐
- 协同过滤推荐:
- 基于记忆的方法(User-based、Item-based)
- 基于模型的方法(矩阵分解、深度学习)
- 多路召回融合:按顺序、平均、加权、动态、机器学习等策略
- AB 测试:科学评估推荐效果的完整流程
- 内容召回全流程:从内容采集到 Web 服务的完整实现
通过这两章的学习,你已经掌握了推荐系统的基础架构和核心算法,可以开始动手实践了!
💡 下一步建议:
- 动手实现一个简单的推荐系统(如电影推荐)
- 学习深度学习推荐模型(如 Wide & Deep、DeepFM)
- 关注工业级推荐系统的性能优化和工程实践