跳转至

矩阵分解技术

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

矩阵分解技术

📖 章节导读

矩阵分解(Matrix Factorization)是推荐系统中最重要和最成功的技术之一,它通过将用户-物品评分矩阵分解为两个低维矩阵来学习用户和物品的隐式特征。本章将深入介绍矩阵分解的原理、算法和实现。

🎯 学习目标

  • 理解矩阵分解的基本原理
  • 掌握SVD分解技术
  • 掌握ALS算法
  • 理解隐语义模型
  • 能够实现矩阵分解推荐算法

5.1 矩阵分解概述

5.1.1 基本思想

矩阵分解的核心思想是:将高维稀疏的用户-物品评分矩阵分解为两个低维稠密矩阵

数学表示:

Text Only
R ≈ U × V^T
其中: - R: m×n的用户-物品评分矩阵 - U: m×k的用户特征矩阵 - V: n×k的物品特征矩阵 - k: 隐特征维度(k << m, n)

5.1.2 优势

  1. 降维:将高维稀疏矩阵降维到低维稠密空间
  2. 泛化能力:可以预测未观测到的评分
  3. 可解释性:隐特征可以理解为潜在的兴趣或属性
  4. 计算效率:分解后计算复杂度降低

5.2 SVD分解

5.2.1 传统SVD

传统SVD(Singular Value Decomposition)将矩阵分解为:

Text Only
R = U × Σ × V^T

问题: - 传统SVD要求数据矩阵是稠密的 - 推荐系统的评分矩阵通常是稀疏的 - 缺失值难以处理

5.2.2 Funk SVD

Funk SVD(由Simon Funk提出,注意与SVD++是不同的扩展算法)通过优化目标函数来学习U和V:

目标函数:

Text Only
min Σ (rui - u_i^T v_j)^2 + λ(||u_i||^2 + ||v_j||^2)

梯度下降更新:

Python
def sgd_step(u, v, r, learning_rate=0.01, reg=0.01):
    """
    SGD更新步骤
    """
    # 预测误差
    error = r - np.dot(u, v)  # np.dot矩阵/向量点乘

    # 计算梯度
    grad_u = -error * v + reg * u
    grad_v = -error * u + reg * v

    # 更新参数
    u -= learning_rate * grad_u
    v -= learning_rate * grad_v

    return u, v

5.2.3 完整实现

Python
import numpy as np
from collections import defaultdict

class SVDRecommender:
    def __init__(self, n_factors=10, learning_rate=0.01, reg=0.01, n_epochs=100):
        self.n_factors = n_factors
        self.learning_rate = learning_rate
        self.reg = reg
        self.n_epochs = n_epochs
        self.user_factors = None
        self.item_factors = None
        self.global_mean = None

    def fit(self, ratings):
        """
        训练模型
        ratings: list of tuples (user_id, item_id, rating)
        """
        # 获取用户和物品数量
        users = set(r[0] for r in ratings)
        items = set(r[1] for r in ratings)
        n_users = len(users)
        n_items = len(items)

        # 创建用户和物品的索引映射
        self.user_index = {user: idx for idx, user in enumerate(users)}  # enumerate同时获取索引和元素
        self.item_index = {item: idx for idx, item in enumerate(items)}
        self.index_user = {idx: user for user, idx in self.user_index.items()}
        self.index_item = {idx: item for item, idx in self.item_index.items()}

        # 计算全局平均分
        self.global_mean = np.mean([r[2] for r in ratings])

        # 初始化用户和物品因子
        self.user_factors = np.random.normal(0, 0.1, (n_users, self.n_factors))
        self.item_factors = np.random.normal(0, 0.1, (n_items, self.n_factors))

        # 训练
        for epoch in range(self.n_epochs):
            total_error = 0
            for user_id, item_id, rating in ratings:
                user_idx = self.user_index[user_id]
                item_idx = self.item_index[item_id]

                # 预测评分(标准Funk SVD:r̂ = μ + p_u·q_i)
                pred = np.dot(self.user_factors[user_idx],
                            self.item_factors[item_idx])

                # 计算误差(先减去全局均值进行中心化)
                error = (rating - self.global_mean) - pred
                total_error += error ** 2

                # 更新因子(注意:先保存旧值,避免更新顺序影响)
                old_user_factor = self.user_factors[user_idx].copy()
                self.user_factors[user_idx] += self.learning_rate * (
                    error * self.item_factors[item_idx] -
                    self.reg * self.user_factors[user_idx]
                )
                self.item_factors[item_idx] += self.learning_rate * (
                    error * old_user_factor -
                    self.reg * self.item_factors[item_idx]
                )

            if (epoch + 1) % 10 == 0:
                rmse = np.sqrt(total_error / len(ratings))
                print(f"Epoch {epoch + 1}, RMSE: {rmse:.4f}")

    def predict(self, user_id, item_id):
        """
        预测用户对物品的评分
        """
        if user_id not in self.user_index or item_id not in self.item_index:
            return self.global_mean

        user_idx = self.user_index[user_id]
        item_idx = self.item_index[item_id]

        # 标准Funk SVD预测:r̂ = μ + p_u·q_i
        pred = self.global_mean + np.dot(self.user_factors[user_idx],
                     self.item_factors[item_idx])

        return pred

    def recommend(self, user_id, n=10):
        """
        为用户推荐物品
        """
        if user_id not in self.user_index:
            return []

        user_idx = self.user_index[user_id]

        # 计算用户对所有物品的预测评分(需加上 global_mean 偏置)
        predictions = self.global_mean + np.dot(self.user_factors[user_idx],
                          self.item_factors.T)

        # 排序
        item_scores = list(enumerate(predictions))
        item_scores.sort(key=lambda x: x[1], reverse=True)  # lambda匿名函数

        # 返回Top N
        recommendations = [self.index_item[item_idx]
                         for item_idx, score in item_scores[:n]]

        return recommendations

5.3 ALS算法

5.3.1 算法原理

ALS(Alternating Least Squares)交替最小二乘法通过交替固定用户因子和物品因子来优化目标函数。

步骤: 1. 固定物品因子V,优化用户因子U 2. 固定用户因子U,优化物品因子V 3. 重复步骤1-2直到收敛

5.3.2 数学推导

固定V,优化U:

Text Only
min Σ (rui - u_i^T v_j)^2 + λ||u_i||^2

对u_i求导并令导数为0:

Text Only
u_i = (V^T V + λI)^(-1) V^T r_i

5.3.3 完整实现

Python
class ALSRecommender:
    def __init__(self, n_factors=10, reg=0.01, n_epochs=20):
        self.n_factors = n_factors
        self.reg = reg
        self.n_epochs = n_epochs
        self.user_factors = None
        self.item_factors = None
        self.user_index = None
        self.item_index = None

    def fit(self, ratings):
        """
        训练模型
        """
        # 获取用户和物品
        users = set(r[0] for r in ratings)
        items = set(r[1] for r in ratings)
        n_users = len(users)
        n_items = len(items)

        # 创建索引映射
        self.user_index = {user: idx for idx, user in enumerate(users)}
        self.item_index = {item: idx for idx, item in enumerate(items)}
        self.index_user = {idx: user for user, idx in self.user_index.items()}
        self.index_item = {idx: item for item, idx in self.item_index.items()}

        # 初始化因子
        self.user_factors = np.random.normal(0, 0.1, (n_users, self.n_factors))
        self.item_factors = np.random.normal(0, 0.1, (n_items, self.n_factors))

        # 构建用户-物品评分矩阵
        R = np.zeros((n_users, n_items))
        for user_id, item_id, rating in ratings:
            user_idx = self.user_index[user_id]
            item_idx = self.item_index[item_id]
            R[user_idx, item_idx] = rating

        # ALS训练
        for epoch in range(self.n_epochs):
            # 固定V,优化U
            for i in range(n_users):
                # 找到用户i评分过的物品
                rated_items = np.where(R[i] > 0)[0]
                if len(rated_items) == 0:
                    continue

                # 提取相关物品因子
                V_j = self.item_factors[rated_items]
                r_i = R[i, rated_items]

                # 计算用户因子
                A = V_j.T @ V_j + self.reg * np.eye(self.n_factors)
                b = V_j.T @ r_i
                self.user_factors[i] = np.linalg.solve(A, b)  # np.linalg线性代数运算

            # 固定U,优化V
            for j in range(n_items):
                # 找到评分过物品j的用户
                rated_users = np.where(R[:, j] > 0)[0]
                if len(rated_users) == 0:
                    continue

                # 提取相关用户因子
                U_i = self.user_factors[rated_users]
                r_j = R[rated_users, j]

                # 计算物品因子
                A = U_i.T @ U_i + self.reg * np.eye(self.n_factors)
                b = U_i.T @ r_j
                self.item_factors[j] = np.linalg.solve(A, b)

            if (epoch + 1) % 5 == 0:
                # 计算RMSE
                pred = np.dot(self.user_factors, self.item_factors.T)
                mask = R > 0
                rmse = np.sqrt(np.mean((R[mask] - pred[mask]) ** 2))
                print(f"Epoch {epoch + 1}, RMSE: {rmse:.4f}")

    def predict(self, user_id, item_id):
        """
        预测评分
        """
        if user_id not in self.user_index or item_id not in self.item_index:
            return 0

        user_idx = self.user_index[user_id]
        item_idx = self.item_index[item_id]

        return np.dot(self.user_factors[user_idx],
                     self.item_factors[item_idx])

    def recommend(self, user_id, n=10):
        """
        推荐物品
        """
        if user_id not in self.user_index:
            return []

        user_idx = self.user_index[user_id]
        predictions = np.dot(self.user_factors[user_idx],
                          self.item_factors.T)

        item_scores = list(enumerate(predictions))
        item_scores.sort(key=lambda x: x[1], reverse=True)

        return [self.index_item[item_idx]
               for item_idx, score in item_scores[:n]]

5.4 隐语义模型

5.4.1 基本概念

隐语义模型(Latent Semantic Model)假设: - 用户和物品之间存在一些隐式的语义特征 - 这些特征无法直接观测,但可以通过数据学习得到 - 例如:电影的"动作"、"喜剧"、"科幻"等隐式特征

5.4.2 模型解释

用户因子u_i: - 表示用户对各个隐式特征的偏好程度 - 例如:u_i = [0.8, 0.3, 0.9]表示用户喜欢动作和科幻,不太喜欢喜剧

物品因子v_j: - 表示物品在各个隐式特征上的表现 - 例如:v_j = [0.9, 0.2, 0.8]表示这部电影动作和科幻成分高,喜剧成分低

预测评分:

Text Only
r̂ui = u_i^T v_j
表示用户对物品的偏好程度

5.5 实战案例

案例:电影推荐

Python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# 1. 加载数据(MovieLens)
ratings = pd.read_csv('ml-100k/u.data', sep='\t',
                     names=['user_id', 'item_id', 'rating', 'timestamp'])

# 2. 划分训练集和测试集
train, test = train_test_split(ratings, test_size=0.2, random_state=42)

# 3. 训练模型
model = SVDRecommender(n_factors=20, learning_rate=0.01,
                      reg=0.01, n_epochs=50)

train_data = [(row['user_id'], row['item_id'], row['rating'])
             for _, row in train.iterrows()]
model.fit(train_data)

# 4. 评估模型
test_data = [(row['user_id'], row['item_id'], row['rating'])
            for _, row in test.iterrows()]

predictions = []
actuals = []
for user_id, item_id, rating in test_data:
    pred = model.predict(user_id, item_id)
    predictions.append(pred)
    actuals.append(rating)

rmse = np.sqrt(mean_squared_error(actuals, predictions))
print(f"Test RMSE: {rmse:.4f}")

# 5. 推荐示例
user_id = 1
recommendations = model.recommend(user_id, n=10)
print(f"为用户{user_id}推荐的物品: {recommendations}")

📝 本章小结

本章介绍了矩阵分解技术:

  1. ✅ 矩阵分解的基本原理
  2. ✅ SVD分解技术
  3. ✅ ALS算法
  4. ✅ 隐语义模型
  5. ✅ 实战案例

通过本章学习,你应该能够: - 理解矩阵分解的数学原理 - 实现SVD和ALS算法 - 理解隐语义模型 - 应用矩阵分解解决推荐问题

🔗 下一步

下一章我们将学习深度学习推荐,了解如何使用深度神经网络提升推荐效果。

继续学习: 06-深度学习推荐.md

💡 思考题

  1. SVD和ALS各有什么优缺点?

    SVD:理论完备、精度高,但无法处理缺失值(需填充)、大规模矩阵计算代价高。ALS(Alternating Least Squares):可处理稀疏矩阵、易并行化(Spark MLlib原生支持)、支持隐式反馈,但超参调优较复杂。实践:小规模用SVD++,大规模用ALS(Spark)。

  2. 如何选择合适的隐特征维度?

    维度太小→欠拟合;太大→过拟合+计算贵。经验值:50-200维。调优方法:①交叉验证看验证集RMSE拐点 ②观察奇异值衰减曲线(前几个占能量80%即可) ③业务约束(在线出服务需要小维度控制延迟) ④用户/物品数量少时低维即可。

  3. 矩阵分解如何处理冷启动问题?

    纯矩阵分解无法处理(新用户/新物品无行为→无法分解)。解决:①辅助信息融合(SVDFeature引入特征) ②Hybrid方法(内容特征初始化Embedding) ③预训练Embedding(用属性预测初始Embedding) ④流行Bandit探索补充。现代方法:双塔模型融合内容特征替代传统矩阵分解。

  4. 如何优化矩阵分解的训练效率?

    ①SGD随机梯度下降(每次只更新一个元素) ②ALS并行化(固定一边解另一边,可并行) ③负采样(不遍历所有缺失值,采样部分作负例) ④增量更新(只对新数据微调) ⑤分布式训练(Spark ALS)。大规模场景通常用ALS+Spark,小规模用SGD即可。

  5. 矩阵分解在实际应用中有哪些改进?

    ①SVD++(融合隐式反馈) ②FM(因子分解机,支持任意特征交叉) ③时序感知(TimeSVD++,考虑兴趣漂移) ④上下文感知(融入场景特征) ⑤深度矩阵分解(MLP替代点积,如NCF/NeuMF)。现代趋势:矩阵分解思想被双塔模型+Embedding查找继承,本质相同。

📚 参考资料

  1. 《Recommender Systems Handbook》- Chapter 5
  2. "Matrix Factorization Techniques for Recommender Systems" - Koren et al.
  3. "Collaborative Filtering for Implicit Feedback Datasets" - Hu et al.
  4. "Large-scale Parallel Collaborative Filtering for the Netflix Prize" - Takács et al.
  5. Implicit Library Documentation