协同过滤算法¶
📖 章节导读¶
协同过滤(Collaborative Filtering, CF)是推荐系统中最经典和最常用的算法之一。本章将深入介绍协同过滤的原理、基于用户和基于物品的方法、相似度计算技术以及实际应用。
🎯 学习目标¶
- 理解协同过滤的基本原理
- 掌握基于用户的协同过滤(User-based CF)
- 掌握基于物品的协同过滤(Item-based CF)
- 熟练使用各种相似度计算方法
- 能够实现协同过滤算法
3.1 协同过滤概述¶
3.1.1 基本思想¶
协同过滤的核心思想是:"物以类聚,人以群分"。
- 基于用户的协同过滤:找到与目标用户相似的其他用户,推荐这些相似用户喜欢的物品
- 基于物品的协同过滤:找到与用户喜欢的物品相似的其他物品,推荐这些相似物品
3.1.2 假设前提¶
协同过滤基于以下假设: 1. 用户偏好一致性:如果两个用户在过去对物品的偏好相似,那么他们在未来对物品的偏好也会相似 2. 物品相似性:如果两个物品被相似的用户喜欢,那么这两个物品是相似的
3.1.3 优缺点¶
优点: - 不需要物品的内容信息 - 可以发现用户意想不到的物品 - 推荐效果好,被广泛应用
缺点: - 冷启动问题:新用户或新物品无法推荐 - 数据稀疏性问题:用户-物品矩阵非常稀疏 - 流行度偏向:热门物品容易被推荐
3.2 基于用户的协同过滤(User-based CF)¶
3.2.1 算法原理¶
步骤: 1. 找到与目标用户相似度最高的K个用户 2. 找到这K个用户喜欢但目标用户未接触的物品 3. 根据相似度和评分预测目标用户对这些物品的评分 4. 推荐预测评分最高的N个物品
3.2.2 相似度计算¶
皮尔逊相关系数:
def pearson_similarity(user1, user2):
"""
计算两个用户的皮尔逊相关系数
"""
# 找到两个用户共同评分的物品
common_items = set(user1.keys()) & set(user2.keys())
if len(common_items) < 2:
return 0
# 计算均值
mean1 = np.mean([user1[item] for item in common_items])
mean2 = np.mean([user2[item] for item in common_items])
# 计算相关系数
numerator = sum((user1[item] - mean1) * (user2[item] - mean2)
for item in common_items)
denominator1 = sum((user1[item] - mean1) ** 2 for item in common_items)
denominator2 = sum((user2[item] - mean2) ** 2 for item in common_items)
if denominator1 == 0 or denominator2 == 0:
return 0
return numerator / np.sqrt(denominator1 * denominator2)
余弦相似度:
def cosine_similarity(user1, user2):
"""
计算两个用户的余弦相似度
"""
# 找到两个用户共同评分的物品
common_items = set(user1.keys()) & set(user2.keys())
if len(common_items) == 0:
return 0
# 计算点积和模长
dot_product = sum(user1[item] * user2[item] for item in common_items)
norm1 = np.sqrt(sum(user1[item] ** 2 for item in common_items))
norm2 = np.sqrt(sum(user2[item] ** 2 for item in common_items))
if norm1 == 0 or norm2 == 0:
return 0
return dot_product / (norm1 * norm2)
3.2.3 预测评分¶
def predict_rating(target_user, user_similarities, ratings, item_id, k=10):
"""
预测目标用户对物品的评分
参数:
target_user: 目标用户ID
user_similarities: dict, {user_id: similarity_score}
ratings: dict, {user_id: {item_id: rating}}
item_id: 待预测的物品ID
k: 使用的近邻数量
"""
# 找到评分过该物品的用户
users_rated_item = [user for user in user_similarities.keys()
if item_id in ratings.get(user, {})]
# 按相似度排序,取前K个
top_k_users = sorted(users_rated_item,
key=lambda u: user_similarities[u], # lambda匿名函数
reverse=True)[:k]
# 计算加权平均
numerator = sum(user_similarities[user] * ratings[user][item_id]
for user in top_k_users)
denominator = sum(abs(user_similarities[user])
for user in top_k_users)
if denominator == 0:
return 0
return numerator / denominator
3.2.4 完整实现¶
import numpy as np
from collections import defaultdict
class UserBasedCF:
def __init__(self, k=10, similarity='pearson'):
self.k = k
self.similarity = similarity
self.user_similarities = None
self.train_data = None
def fit(self, train_data):
"""
训练模型
train_data: dict, {user_id: {item_id: rating}}
"""
self.train_data = train_data
self.user_similarities = self._compute_similarities()
def _compute_similarities(self):
"""
计算用户相似度矩阵
"""
similarities = defaultdict(dict) # defaultdict访问不存在的键时返回默认值
users = list(self.train_data.keys())
for i, user1 in enumerate(users): # enumerate同时获取索引和元素
for user2 in users[i+1:]:
if self.similarity == 'pearson':
sim = pearson_similarity(self.train_data[user1],
self.train_data[user2])
elif self.similarity == 'cosine':
sim = cosine_similarity(self.train_data[user1],
self.train_data[user2])
else:
sim = 0
similarities[user1][user2] = sim
similarities[user2][user1] = sim
return similarities
def recommend(self, user_id, n=10):
"""
为用户推荐物品
"""
if user_id not in self.train_data:
return []
# 获取用户已评分的物品
rated_items = set(self.train_data[user_id].keys())
# 获取相似用户
similar_users = self.user_similarities.get(user_id, {})
top_k_users = sorted(similar_users.items(),
key=lambda x: x[1],
reverse=True)[:self.k]
# 收集候选物品
candidate_items = defaultdict(float)
for similar_user, similarity in top_k_users:
for item_id, rating in self.train_data[similar_user].items():
if item_id not in rated_items:
candidate_items[item_id] += similarity * rating
# 排序并返回Top N
recommendations = sorted(candidate_items.items(),
key=lambda x: x[1],
reverse=True)[:n]
return [item_id for item_id, score in recommendations]
3.3 基于物品的协同过滤(Item-based CF)¶
3.3.1 算法原理¶
步骤: 1. 计算物品之间的相似度 2. 找到用户喜欢的物品的相似物品 3. 根据相似度和用户评分预测用户对相似物品的评分 4. 推荐预测评分最高的N个物品
3.3.2 物品相似度计算¶
def item_similarity(train_data, similarity='cosine'):
"""
计算物品相似度矩阵
"""
# 构建物品-用户倒排索引
item_users = defaultdict(dict)
for user_id, items in train_data.items():
for item_id, rating in items.items():
item_users[item_id][user_id] = rating
# 计算物品相似度
similarities = defaultdict(dict)
items = list(item_users.keys())
for i, item1 in enumerate(items):
for item2 in items[i+1:]:
if similarity == 'cosine':
sim = cosine_similarity(item_users[item1], item_users[item2])
elif similarity == 'pearson':
sim = pearson_similarity(item_users[item1], item_users[item2])
else:
sim = 0
similarities[item1][item2] = sim
similarities[item2][item1] = sim
return similarities
3.3.3 完整实现¶
class ItemBasedCF:
def __init__(self, k=10, similarity='cosine'):
self.k = k
self.similarity = similarity
self.item_similarities = None
self.train_data = None
def fit(self, train_data):
"""
训练模型
"""
self.train_data = train_data
self.item_similarities = self._compute_similarities()
def _compute_similarities(self):
"""
计算物品相似度矩阵
"""
# 构建物品-用户倒排索引
item_users = defaultdict(dict)
for user_id, items in self.train_data.items():
for item_id, rating in items.items():
item_users[item_id][user_id] = rating
# 计算物品相似度
similarities = defaultdict(dict)
items = list(item_users.keys())
for i, item1 in enumerate(items):
for item2 in items[i+1:]:
if self.similarity == 'cosine':
sim = cosine_similarity(item_users[item1],
item_users[item2])
elif self.similarity == 'pearson':
sim = pearson_similarity(item_users[item1],
item_users[item2])
else:
sim = 0
similarities[item1][item2] = sim
similarities[item2][item1] = sim
return similarities
def recommend(self, user_id, n=10):
"""
为用户推荐物品
"""
if user_id not in self.train_data:
return []
# 获取用户已评分的物品
user_items = self.train_data[user_id]
rated_items = set(user_items.keys())
# 收集候选物品
candidate_items = defaultdict(float)
for item_id, rating in user_items.items():
# 获取相似物品
similar_items = self.item_similarities.get(item_id, {})
top_k_items = sorted(similar_items.items(),
key=lambda x: x[1],
reverse=True)[:self.k]
# 累加评分
for similar_item, similarity in top_k_items:
if similar_item not in rated_items:
candidate_items[similar_item] += similarity * rating
# 排序并返回Top N
recommendations = sorted(candidate_items.items(),
key=lambda x: x[1],
reverse=True)[:n]
return [item_id for item_id, score in recommendations]
3.4 相似度计算方法¶
3.4.1 余弦相似度¶
公式:
特点: - 取值范围:[-1, 1] - 不考虑评分的绝对值,只考虑方向 - 适合处理评分尺度不一致的情况
3.4.2 皮尔逊相关系数¶
公式:
特点: - 取值范围:[-1, 1] - 考虑评分的均值,消除评分偏差 - 适合处理用户评分尺度不一致的情况
3.4.3 Jaccard相似度¶
公式:
特点: - 取值范围:[0, 1] - 只考虑交互与否,不考虑评分值 - 适合处理隐式反馈数据
def jaccard_similarity(user1, user2):
"""
计算Jaccard相似度
"""
items1 = set(user1.keys())
items2 = set(user2.keys())
intersection = items1 & items2
union = items1 | items2
if len(union) == 0:
return 0
return len(intersection) / len(union)
3.5 实战案例¶
案例:电影推荐系统¶
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
# 1. 加载数据(MovieLens数据集)
ratings = pd.read_csv('ml-100k/u.data', sep='\t',
names=['user_id', 'item_id', 'rating', 'timestamp'])
# 2. 构建用户-物品评分矩阵
train_data = defaultdict(dict)
for _, row in ratings.iterrows():
train_data[row['user_id']][row['item_id']] = row['rating']
# 3. 划分训练集和测试集
train_users, test_users = train_test_split(list(train_data.keys()),
test_size=0.2,
random_state=42)
train_set = {user: train_data[user] for user in train_users}
test_set = {user: train_data[user] for user in test_users}
# 4. 训练模型
model = UserBasedCF(k=20, similarity='pearson')
model.fit(train_set)
# 5. 评估模型
def evaluate(model, test_set):
"""
评估模型
"""
predictions = []
actuals = []
for user_id in test_set:
for item_id, actual_rating in test_set[user_id].items():
# 预测评分
similar_users = model.user_similarities.get(user_id, {})
top_k_users = sorted(similar_users.items(),
key=lambda x: x[1],
reverse=True)[:model.k]
predicted_rating = 0
total_weight = 0
for similar_user, similarity in top_k_users:
if item_id in model.train_data[similar_user]:
predicted_rating += similarity * model.train_data[similar_user][item_id]
total_weight += abs(similarity)
if total_weight > 0:
predicted_rating /= total_weight
predictions.append(predicted_rating)
actuals.append(actual_rating)
rmse = np.sqrt(mean_squared_error(actuals, predictions))
return rmse
rmse = evaluate(model, test_set)
print(f"RMSE: {rmse:.4f}")
# 6. 推荐示例
user_id = 1
recommendations = model.recommend(user_id, n=10)
print(f"为用户{user_id}推荐的物品: {recommendations}")
📝 本章小结¶
本章介绍了协同过滤算法的核心内容:
- ✅ 协同过滤的基本原理
- ✅ 基于用户的协同过滤
- ✅ 基于物品的协同过滤
- ✅ 相似度计算方法
- ✅ 实战案例
通过本章学习,你应该能够: - 理解协同过滤的工作原理 - 实现User-based和Item-based CF - 选择合适的相似度计算方法 - 应用协同过滤解决实际问题
🔗 下一步¶
下一章我们将学习基于内容的推荐,了解如何利用物品的内容特征进行推荐。
继续学习: 04-基于内容的推荐.md
💡 思考题¶
-
User-based CF和Item-based CF各有什么优缺点?
UserCF:能发现意外兴趣、社交属性强,但用户量大时计算慢、新用户冷启动严重。ItemCF:可解释性强("因为你看了X")、物品相似度稳定、适合物品变化慢的场景,但多样性不足。实践:UserCF适合社交/新闻,ItemCF适合电商/影视。
-
如何选择合适的相似度计算方法?
余弦相似度:最常用,不受向量长度影响。皮尔逊相关:考虑均值偏差。Jaccard:适合二值数据(点击/未点击)。欧氏距离:对尺度敏感,需归一化。实践原则:评分数据用皮尔逊,点击行为用余弦,二值行为用Jaccard。也可直接用学习到的Embedding计算余弦相似度。
-
协同过滤如何解决冷启动问题?
纯协同过滤无法解决,需补充策略:①新用户→热门/人口统计推荐→新注册引导(选兴趣标签) ②新物品→基于内容特征推荐→运营人工导流量 ③混合方法:协同过滤+内容推荐融合 ④迁移学习(跨域推荐)。
-
如何优化协同过滤的计算效率?
①稀疏矩阵存储(CSR格式减少内存) ②只计算有共同行为的用户/物品对 ③倒排索引加速(从物品倒查共同用户) ④分布式计算(Spark/MapReduce) ⑤量化相似度+截断TopK邻居 ⑥离线计算+缓存(定时更新相似度矩阵)。ALS矩阵分解可替代直接相似度计算。
-
协同过滤在什么场景下效果最好?
①用户行为丰富(不稀疏) ②物品集相对稳定 ③不需要实时响应(可离线计算) ④惊喜性重要(社交、娱乐场景)。典型成功:Netflix奖(电影推荐)、音乐推荐(Spotify)。不适合:短生命周期内容(新闻)、隶属于长尾的物品。
📚 参考资料¶
- 《Recommender Systems Handbook》- Chapter 3
- 《推荐系统实践》- 第3章
- "Item-based Collaborative Filtering Recommendation Algorithms" - Sarwar et al.
- "Empirical Analysis of Predictive Algorithms for Collaborative Filtering" - Breese et al.
- Surprise Library Documentation
