图神经网络推荐¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
前置知识:本章需要对深度学习推荐(第6章)和协同过滤(第3章)有基础了解。图神经网络的通用原理可参考机器学习教程中的图神经网络章节。
面试必考:图神经网络推荐是字节跳动、快手、阿里巴巴、Pinterest 等公司的高频面试主题。LightGCN、PinSage、知识图谱推荐、图对比学习是核心考察方向。
📋 学习目标¶
完成本章学习后,你应该能够:
- 理解为什么将推荐问题建模为图问题,以及图推荐相对传统方法的优势
- 掌握 GCN 核心原理:消息传递、邻域聚合、谱域与空间域视角
- 深入理解 LightGCN 的简化设计及其背后的动机
- 掌握 PinSage 在工业级十亿节点图上的扩展策略
- 了解知识图谱增强推荐(KGAT)的注意力机制
- 理解图对比学习(SGL、SimGCL)在推荐中的应用
- 能够用 PyTorch 实现 LightGCN 并在 MovieLens 上完成端到端训练与评估
- 应对图推荐相关的高频面试问题
19.1 图推荐概述¶
19.1.1 为什么将推荐建模为图问题¶
推荐系统的核心数据——用户行为、社交关系、物品属性——天然具有图结构:
传统推荐的数据视角:
┌──────────────────────────────────────┐
│ 用户-物品交互矩阵 R (m × n) │
│ R[u][i] = 1 表示用户 u 交互过物品 i │
│ 大部分元素为 0 → 极度稀疏 │
└──────────────────────────────────────┘
图推荐的数据视角:
┌──────────────────────────────────────┐
│ 用户和物品都是节点 │
│ 交互关系是边 │
│ 社交关系、属性关系也是边 │
│ → 丰富的拓扑结构 + 高阶连接信息 │
└──────────────────────────────────────┘
直觉:如果用户 A 和用户 B 都喜欢物品 X 和物品 Y,那么 A 喜欢的物品 Z 也可能被 B 喜欢——这就是高阶协同信号,图结构天然编码了这种信号。
19.1.2 推荐系统中的三类图¶
| 图类型 | 节点 | 边 | 典型场景 |
|---|---|---|---|
| 用户-物品二部图 | 用户、物品 | 交互(点击/购买/评分) | 协同过滤推荐 |
| 社交网络图 | 用户 | 关注/好友关系 | 社交推荐(微信/微博) |
| 知识图谱 | 实体(物品/属性/类别) | 关系(属于/导演/演员) | 可解释推荐 |
用户-物品二部图示例:
User1 ──── Item_A
│ ╲ │
│ ╲ │
│ ╲ │
User2 ──── Item_B
│ │
│ │
User3 ──── Item_C
高阶连接:User1 → Item_A → User2 → Item_B
→ User1 和 Item_B 之间存在 2 阶路径
→ 可以向 User1 推荐 Item_B
19.1.3 图推荐 vs 传统推荐的优势¶
| 维度 | 传统推荐(MF/DNN) | 图推荐(GNN) |
|---|---|---|
| 信号利用 | 仅直接交互(1阶) | 高阶协同信号(多阶邻居) |
| 数据稀疏 | 严重受限 | 通过消息传递缓解 |
| 冷启动 | 困难 | 可利用图结构中的间接连接 |
| 可解释性 | 黑盒 | 可通过路径/子图解释 |
| 侧信息融合 | 特征拼接 | 异质图自然融合 |
💡 面试考点:面试官常问"图推荐相比矩阵分解的核心优势是什么?"——答案的关键词是高阶协同信号(high-order collaborative signal)。矩阵分解只建模用户和物品的直接交互,而 GNN 通过多层消息传递自然捕获了多跳邻居的协同信息。
19.2 图卷积网络基础¶
19.2.1 GCN 核心原理¶
图卷积网络(Graph Convolutional Network, GCN)的核心思想是消息传递(Message Passing):
消息传递范式:
对每个节点 v:
1. 收集(Aggregate):从邻居节点 u ∈ N(v) 收集消息
2. 更新(Update):结合自身特征和邻居消息,更新节点表征
h_v^{(l+1)} = UPDATE(h_v^{(l)}, AGGREGATE({h_u^{(l)} : u ∈ N(v)}))
GCN(Kipf & Welling, 2017) 的具体形式:
其中: - \(h_v^{(l)}\) 是节点 \(v\) 在第 \(l\) 层的表征 - \(\mathcal{N}(v)\) 是节点 \(v\) 的邻居集合 - \(W^{(l)}\) 是第 \(l\) 层的可学习权重矩阵 - \(\sigma\) 是非线性激活函数(如 ReLU)
19.2.2 图信号处理视角:谱域 vs 空间域¶
GCN 可以从两个视角理解:
谱域(Spectral Domain): - 将图信号进行图傅里叶变换 - 在频域上用滤波器进行卷积 - 基于图拉普拉斯矩阵 \(L = D - A\) 的特征分解 - GCN 是切比雪夫多项式滤波器的一阶近似
空间域(Spatial Domain): - 直接在图上定义邻域聚合操作 - 每个节点从其邻居收集信息 - 更直觉、更容易推广到大规模图 - 工业界主要采用空间域方法
💡 面试考点:谱域方法需要整个图的拉普拉斯矩阵特征分解,计算复杂度 \(O(n^3)\),无法处理大规模图;空间域方法可以用 mini-batch 采样,可扩展性强。这就是为什么 PinSage(空间域)能处理十亿节点,而纯谱方法不行。
19.2.3 矩阵形式推导¶
整个图上的 GCN 传播可以写成矩阵形式:
其中归一化邻接矩阵:
推导过程:
- 原始邻接矩阵 \(A\):\(A_{ij} = 1\) 如果节点 \(i\) 和 \(j\) 之间有边
- 加自环:\(\hat{A} = A + I\)(每个节点保留自身信息)
- 度矩阵:\(\hat{D}_{ii} = \sum_j \hat{A}_{ij}\)
- 对称归一化:\(\tilde{A} = \hat{D}^{-1/2} \hat{A} \hat{D}^{-1/2}\)
对称归一化的物理含义:
度大的节点(热门物品)在传播时被适当"抑制",避免热门物品主导所有用户的表征。
19.2.4 PyTorch 实现:简单 GCN 层¶
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from scipy.sparse import coo_matrix
class GCNLayer(nn.Module): # 继承nn.Module定义网络层
"""
简单的图卷积层实现(纯 PyTorch,不依赖 PyG)
实现公式:H^{(l+1)} = σ(Ã H^{(l)} W^{(l)})
其中 Ã = D^{-1/2} A D^{-1/2}(含自环)
"""
def __init__(self, in_features: int, out_features: int, bias: bool = True):
super().__init__() # super()调用父类方法
self.weight = nn.Parameter(torch.FloatTensor(in_features, out_features))
if bias:
self.bias = nn.Parameter(torch.FloatTensor(out_features))
else:
self.bias = None
self.reset_parameters()
def reset_parameters(self):
"""Xavier 初始化"""
nn.init.xavier_uniform_(self.weight)
if self.bias is not None:
nn.init.zeros_(self.bias)
def forward(self, x: torch.Tensor, adj_norm: torch.Tensor) -> torch.Tensor:
"""
前向传播
Args:
x: 节点特征矩阵 [num_nodes, in_features]
adj_norm: 归一化邻接矩阵 Ã [num_nodes, num_nodes](稀疏张量)
Returns:
输出特征矩阵 [num_nodes, out_features]
"""
# 特征变换:H W
support = torch.mm(x, self.weight)
# 邻域聚合:Ã (H W)
output = torch.spmm(adj_norm, support)
if self.bias is not None:
output += self.bias
return output
class SimpleGCN(nn.Module):
"""
两层 GCN 模型
用于节点分类或生成节点 Embedding
"""
def __init__(self, n_features: int, n_hidden: int, n_classes: int, dropout: float = 0.5):
super().__init__()
self.gc1 = GCNLayer(n_features, n_hidden)
self.gc2 = GCNLayer(n_hidden, n_classes)
self.dropout = dropout
def forward(self, x: torch.Tensor, adj_norm: torch.Tensor) -> torch.Tensor:
"""
Args:
x: 节点特征 [num_nodes, n_features]
adj_norm: 归一化邻接矩阵
"""
x = F.relu(self.gc1(x, adj_norm)) # F.xxx PyTorch函数式API
x = F.dropout(x, self.dropout, training=self.training)
x = self.gc2(x, adj_norm)
return x
def build_normalized_adj(edge_index: np.ndarray, num_nodes: int) -> torch.Tensor:
"""
从边列表构建归一化邻接矩阵(含自环)
Args:
edge_index: 边列表 [2, num_edges],每列是 (src, dst)
num_nodes: 节点总数
Returns:
归一化后的稀疏邻接矩阵 Ã = D^{-1/2} (A + I) D^{-1/2}
"""
# 构建邻接矩阵(加自环)
row = np.concatenate([edge_index[0], edge_index[1], np.arange(num_nodes)])
col = np.concatenate([edge_index[1], edge_index[0], np.arange(num_nodes)])
data = np.ones(len(row), dtype=np.float32)
adj = coo_matrix((data, (row, col)), shape=(num_nodes, num_nodes))
# 计算度矩阵 D^{-1/2}
degree = np.array(adj.sum(axis=1)).flatten() # np.array创建NumPy数组
d_inv_sqrt = np.power(degree, -0.5)
d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.0
# 对称归一化:D^{-1/2} A D^{-1/2}
row_norm = d_inv_sqrt[adj.row]
col_norm = d_inv_sqrt[adj.col]
adj.data = adj.data * row_norm * col_norm
# 转为 PyTorch 稀疏张量
indices = torch.LongTensor(np.vstack([adj.row, adj.col]))
values = torch.FloatTensor(adj.data)
shape = torch.Size(adj.shape)
return torch.sparse_coo_tensor(indices, values, shape)
# ---- 使用示例 ----
if __name__ == "__main__":
# 构造一个简单的 5 节点图
edge_index = np.array([[0, 0, 1, 2, 3],
[1, 2, 2, 3, 4]]) # 5 条边
num_nodes = 5
# 构建归一化邻接矩阵
adj_norm = build_normalized_adj(edge_index, num_nodes)
# 随机节点特征
x = torch.randn(num_nodes, 16)
# 创建并运行 GCN
model = SimpleGCN(n_features=16, n_hidden=32, n_classes=8)
output = model(x, adj_norm)
print(f"输入形状: {x.shape}") # [5, 16]
print(f"输出形状: {output.shape}") # [5, 8]
print(f"输出: {output}")
19.3 LightGCN¶
19.3.1 从 NGCF 到 LightGCN:为什么简化¶
NGCF(Neural Graph Collaborative Filtering, 2019) 将 GCN 引入协同过滤,在每层传播时使用:
LightGCN(He et al., SIGIR 2020) 指出 NGCF 中的特征变换 \(W\) 和非线性激活 \(\sigma\) 对推荐任务反而有害:
NGCF vs LightGCN 对比:
NGCF 每层操作:
1. 特征变换(W·e) ← LightGCN 移除
2. 非线性激活(σ(·)) ← LightGCN 移除
3. 邻域聚合 ← LightGCN 保留
LightGCN 每层操作:
仅保留邻域聚合(最简形式的图卷积)
为什么移除是合理的?
- 推荐场景没有节点特征:用户和物品的初始表征只有 Embedding ID,没有真正的"特征"需要变换
- 非线性激活引入信息瓶颈:在没有丰富特征的场景下,非线性反而导致优化困难
- 实验验证:移除后效果不降反升,训练也更快
19.3.2 LightGCN 传播规则¶
单层传播:
矩阵形式:
其中 \(\tilde{A}\) 是用户-物品二部图的归一化邻接矩阵(不含自环):
19.3.3 层组合(Layer Combination)¶
LightGCN 的另一个关键设计是层组合——将各层的 Embedding 加权求和作为最终表征:
- \(e_u^{(0)}\) 是用户的初始 Embedding(第 0 层)
- \(e_u^{(k)}\) 是经过 \(k\) 层传播后的表征
- \(\alpha_k\) 是每层的权重,论文中默认 \(\alpha_k = \frac{1}{K+1}\)(均匀权重)
层组合的意义: - 第 0 层:初始 Embedding,捕获用户/物品自身信息 - 第 1 层:直接邻居信息(1 阶协同信号) - 第 2 层:2 跳邻居信息 - 第 K 层:K 阶协同信号 - 加权组合 = 多尺度信息融合
19.3.4 BPR Loss¶
LightGCN 使用 BPR(Bayesian Personalized Ranking)Loss 进行训练:
其中: - \((u, i, j)\):用户 \(u\)、正样本 \(i\)(交互过)、负样本 \(j\)(未交互) - \(\hat{y}_{ui} = e_u^T e_i\):预测分数 - \(\sigma\):sigmoid 函数 - \(\lambda \|\Theta\|^2\):L2 正则化(只对初始 Embedding \(e^{(0)}\) 进行正则化)
19.3.5 完整 PyTorch 实现¶
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from scipy.sparse import coo_matrix, eye as sp_eye
from collections import defaultdict
import random
import time
class LightGCN(nn.Module):
"""
LightGCN 推荐模型(纯 PyTorch 实现,不依赖 PyG)
论文:LightGCN: Simplifying and Powering Graph Convolution Network
for Recommendation (SIGIR 2020)
核心特点:
1. 移除了 GCN 中的特征变换和非线性激活
2. 使用层组合(Layer Combination)融合多层信息
3. 仅需学习初始 Embedding
"""
def __init__(self, num_users: int, num_items: int, embedding_dim: int = 64,
n_layers: int = 3, reg_weight: float = 1e-4):
super().__init__()
self.num_users = num_users
self.num_items = num_items
self.n_layers = n_layers
self.reg_weight = reg_weight
# 初始 Embedding(唯一需要学习的参数)
self.user_embedding = nn.Embedding(num_users, embedding_dim)
self.item_embedding = nn.Embedding(num_items, embedding_dim)
# Xavier 初始化
nn.init.xavier_uniform_(self.user_embedding.weight)
nn.init.xavier_uniform_(self.item_embedding.weight)
# 归一化邻接矩阵(在 set_graph 中设置)
self.adj_norm = None
def set_graph(self, user_item_pairs: np.ndarray):
"""
构建并设置归一化邻接矩阵
Args:
user_item_pairs: [N, 2] 数组,每行是 (user_id, item_id)
"""
n_nodes = self.num_users + self.num_items
# 用户-物品二部图邻接矩阵
# 用户节点编号: 0 ~ num_users-1
# 物品节点编号: num_users ~ num_users+num_items-1
users = user_item_pairs[:, 0]
items = user_item_pairs[:, 1] + self.num_users # 物品编号偏移
# 构建对称邻接矩阵 (无自环)
row = np.concatenate([users, items])
col = np.concatenate([items, users])
data = np.ones(len(row), dtype=np.float32)
adj = coo_matrix((data, (row, col)), shape=(n_nodes, n_nodes))
# D^{-1/2} A D^{-1/2} 归一化
degree = np.array(adj.sum(axis=1)).flatten()
d_inv_sqrt = np.power(degree, -0.5)
d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.0
adj.data = adj.data * d_inv_sqrt[adj.row] * d_inv_sqrt[adj.col]
# 转为 PyTorch 稀疏张量
indices = torch.LongTensor(np.vstack([adj.row, adj.col]))
values = torch.FloatTensor(adj.data)
self.adj_norm = torch.sparse_coo_tensor(indices, values,
torch.Size([n_nodes, n_nodes]))
def compute_all_embeddings(self) -> tuple:
"""
LightGCN 的核心:多层图卷积 + 层组合
Returns:
(user_embeddings, item_embeddings) 最终表征
"""
# 拼接用户和物品的初始 Embedding
all_emb = torch.cat([self.user_embedding.weight, # torch.cat沿已有维度拼接张量
self.item_embedding.weight], dim=0) # [n_nodes, dim]
emb_list = [all_emb] # 保存每层的 Embedding
# 多层图卷积(无特征变换,无激活)
for layer in range(self.n_layers):
all_emb = torch.spmm(self.adj_norm, all_emb) # Ã · E^{(l)}
emb_list.append(all_emb)
# 层组合:各层均匀加权
all_emb = torch.stack(emb_list, dim=0).mean(dim=0) # [n_nodes, dim] # torch.stack沿新维度拼接张量
user_emb = all_emb[:self.num_users]
item_emb = all_emb[self.num_users:]
return user_emb, item_emb
def bpr_loss(self, users: torch.Tensor, pos_items: torch.Tensor,
neg_items: torch.Tensor) -> torch.Tensor:
"""
BPR 损失函数
Args:
users: 用户 ID [batch_size]
pos_items: 正样本物品 ID [batch_size]
neg_items: 负样本物品 ID [batch_size]
Returns:
BPR loss + L2 正则化
"""
user_emb, item_emb = self.compute_all_embeddings()
u_emb = user_emb[users] # [batch, dim]
pos_emb = item_emb[pos_items] # [batch, dim]
neg_emb = item_emb[neg_items] # [batch, dim]
# BPR Loss
pos_scores = (u_emb * pos_emb).sum(dim=1) # [batch]
neg_scores = (u_emb * neg_emb).sum(dim=1) # [batch]
bpr_loss = -F.logsigmoid(pos_scores - neg_scores).mean()
# L2 正则化(只对初始 Embedding 正则化)
u_emb_0 = self.user_embedding(users)
pos_emb_0 = self.item_embedding(pos_items)
neg_emb_0 = self.item_embedding(neg_items)
reg_loss = self.reg_weight * (
u_emb_0.norm(2).pow(2) +
pos_emb_0.norm(2).pow(2) +
neg_emb_0.norm(2).pow(2)
) / users.shape[0]
return bpr_loss + reg_loss
def predict(self, users: torch.Tensor) -> torch.Tensor:
"""
为指定用户生成所有物品的预测分数
Args:
users: 用户 ID [batch_size]
Returns:
预测分数矩阵 [batch_size, num_items]
"""
user_emb, item_emb = self.compute_all_embeddings()
u_emb = user_emb[users] # [batch, dim]
scores = torch.mm(u_emb, item_emb.T) # [batch, num_items]
return scores
def sample_negative(user_item_dict: dict, num_items: int, user: int) -> int:
"""随机负采样:采一个用户未交互过的物品"""
while True:
neg = random.randint(0, num_items - 1)
if neg not in user_item_dict[user]:
return neg
def train_lightgcn(model: LightGCN, train_pairs: np.ndarray,
n_epochs: int = 50, batch_size: int = 1024, lr: float = 1e-3):
"""
LightGCN 训练循环
Args:
model: LightGCN 模型
train_pairs: [N, 2] 训练交互对 (user, item)
n_epochs: 训练轮数
batch_size: 批大小
lr: 学习率
"""
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# 构建用户交互字典(用于负采样)
user_item_dict = defaultdict(set) # defaultdict访问不存在的键时返回默认值
for u, i in train_pairs:
user_item_dict[u].add(i)
model.train() # train()训练模式
for epoch in range(n_epochs):
# 随机打乱训练数据
perm = np.random.permutation(len(train_pairs))
total_loss = 0.0
n_batches = 0
for start in range(0, len(train_pairs), batch_size):
end = min(start + batch_size, len(train_pairs))
batch_idx = perm[start:end]
batch_pairs = train_pairs[batch_idx]
users = torch.LongTensor(batch_pairs[:, 0])
pos_items = torch.LongTensor(batch_pairs[:, 1])
neg_items = torch.LongTensor([
sample_negative(user_item_dict, model.num_items, u)
for u in batch_pairs[:, 0]
])
loss = model.bpr_loss(users, pos_items, neg_items)
optimizer.zero_grad() # 清零梯度
loss.backward() # 反向传播计算梯度
optimizer.step() # 更新参数
total_loss += loss.item() # 将单元素张量转为Python数值
n_batches += 1
if (epoch + 1) % 10 == 0:
avg_loss = total_loss / n_batches
print(f"Epoch {epoch+1}/{n_epochs}, Loss: {avg_loss:.4f}")
# ---- 快速验证 ----
if __name__ == "__main__":
# 模拟小规模数据
num_users, num_items = 100, 200
n_interactions = 1000
train_pairs = np.column_stack([
np.random.randint(0, num_users, n_interactions),
np.random.randint(0, num_items, n_interactions)
])
model = LightGCN(num_users, num_items, embedding_dim=64, n_layers=3)
model.set_graph(train_pairs)
print("开始训练 LightGCN...")
train_lightgcn(model, train_pairs, n_epochs=50, batch_size=256)
# 预测
model.eval()
with torch.no_grad(): # 禁用梯度计算,节省内存
test_users = torch.LongTensor([0, 1, 2])
scores = model.predict(test_users)
print(f"\n预测分数矩阵形状: {scores.shape}") # [3, 200]
# 每个用户的 Top-5 推荐
_, top_items = scores.topk(5, dim=1)
for idx, uid in enumerate(test_users.tolist()): # enumerate同时获取索引和元素
print(f"用户 {uid} 的 Top-5 推荐: {top_items[idx].tolist()}")
💡 面试考点:
Q:LightGCN 为什么比 NGCF 效果好? A:NGCF 在每层使用了特征变换 \(W\) 和非线性激活 \(\sigma\),但在推荐场景中用户/物品没有丰富的初始特征(只有 ID Embedding),特征变换反而引入了不必要的参数和优化困难。LightGCN 移除这些组件后:(1) 模型更简洁,过拟合风险更低;(2) 训练更稳定、更快;(3) 邻域聚合本身就足以学到高质量的协同信号。
Q:LightGCN 的 over-smoothing 问题如何理解? A:随着 GCN 层数增加,所有节点的表征趋于相同(过度平滑)。对于 LightGCN,层组合机制缓解了这一问题——即使高层表征趋于平坦,低层表征仍保留了局部信息,最终结果是多层的加权组合。实践中通常使用 2-3 层。
19.4 PinSage(工业级图推荐)¶
19.4.1 Pinterest 的图推荐系统¶
PinSage(Ying et al., KDD 2018) 是 Pinterest 设计的工业级图推荐系统,用于为30 亿节点、180 亿边的用户-Pin 图生成高质量的 Pin Embedding。
Pinterest 的推荐场景:
用户浏览 → Pin(图片) → Board(收藏夹)
构成二部图:Pin ←→ Board
- 一个 Board 包含多个 Pin
- 一个 Pin 可以出现在多个 Board 中
- 通过 Board 连接的 Pin 具有语义相似性
目标:为每个 Pin 生成 Embedding → 用于相关推荐
19.4.2 PinSage 的核心创新¶
传统 GCN 需要对整个图进行全图卷积,在十亿节点级别完全不可行。PinSage 的解决方案:
1. 随机游走采样邻居
不使用全部邻居,而是通过随机游走选择最重要的邻居:
全图卷积 vs 随机游走采样:
全图卷积(GCN):
节点 v 的邻居 = N(v)(所有直接邻居)
→ 热门节点可能有百万邻居 → 内存爆炸
PinSage 随机游走采样:
1. 从节点 v 出发,进行多次随机游走
2. 统计被访问到的节点的频次
3. 选择频次最高的 Top-T 个节点作为邻居
→ 邻居数量固定为 T → 内存可控
→ 被多次访问的节点更重要 → 信息更丰富
2. Importance Pooling
基于随机游走频次的加权聚合:
其中 \(c(u)\) 是节点 \(u\) 在随机游走中被访问的频次(归一化后),\(\mathcal{S}(v)\) 是采样后的邻居集合。
3. MapReduce 图采样策略
Producer-Consumer 训练架构:
CPU 进程(Producer):
├── 对每个 mini-batch 的节点,执行随机游走
├── 采样 K 层邻居(构建计算子图)
└── 将子图放入队列
GPU 进程(Consumer):
├── 从队列获取子图
├── 在子图上执行 GCN 前向和反向传播
└── 更新模型参数
→ CPU 和 GPU 流水线并行,消除 IO 瓶颈
19.4.3 PinSage 工业部署经验¶
| 挑战 | 解决方案 |
|---|---|
| 30 亿节点无法全部存入内存 | MapReduce 分布式聚合,每次仅处理子图 |
| 热门 Pin 邻居数量极大 | 随机游走采样固定 T 个邻居 |
| 训练效率低 | CPU 采样 + GPU 计算流水线 |
| 负样本质量差 | Hard Negative Mining:用上一轮 Embedding 检索相似但未交互的物品 |
| 增量更新 | 新增 Pin 只需在其子图上做局部推理,无需全图重训 |
19.4.4 PinSage 的简化 PyTorch 实现思路¶
import torch
import torch.nn as nn
import random
from collections import Counter
class PinSageLayer(nn.Module):
"""
PinSage 单层聚合(简化版本)
核心:随机游走采样 + Importance Pooling
"""
def __init__(self, in_dim: int, out_dim: int):
super().__init__()
self.linear = nn.Linear(in_dim, out_dim)
self.relu = nn.ReLU()
def random_walk_sample(self, graph: dict, node: int,
walk_length: int = 10, num_walks: int = 5,
top_k: int = 15) -> list:
"""
随机游走采样邻居
Args:
graph: 邻接表 {node: [neighbors]}
node: 起始节点
walk_length: 游走长度
num_walks: 游走次数
top_k: 选取的邻居数量
Returns:
[(neighbor, visit_count), ...] 按频次排序的 Top-K 邻居
"""
visit_count = Counter() # Counter统计元素出现次数
for _ in range(num_walks):
current = node
for _ in range(walk_length):
neighbors = graph.get(current, [])
if not neighbors:
break
current = random.choice(neighbors)
if current != node:
visit_count[current] += 1
# 返回频次最高的 Top-K 个邻居
return visit_count.most_common(top_k)
def forward(self, node_embs: torch.Tensor, neighbor_info: list) -> torch.Tensor:
"""
Importance Pooling 聚合
Args:
node_embs: 所有节点的 Embedding [num_nodes, dim]
neighbor_info: 每个目标节点的邻居信息
[[(neighbor_id, count), ...], ...]
Returns:
更新后的节点 Embedding [num_target_nodes, out_dim]
"""
outputs = []
for neighbors_with_counts in neighbor_info:
if not neighbors_with_counts:
# 无邻居,使用零向量
outputs.append(torch.zeros(node_embs.shape[1]))
continue
ids, counts = zip(*neighbors_with_counts) # zip按位置配对
ids = list(ids)
# 归一化频次作为权重
weights = torch.FloatTensor(counts)
weights = weights / weights.sum()
# 加权聚合邻居 Embedding
neighbor_emb = node_embs[ids] # [K, dim]
pooled = (weights.unsqueeze(1) * neighbor_emb).sum(dim=0) # [dim] # unsqueeze增加一个维度
outputs.append(pooled)
stacked = torch.stack(outputs, dim=0) # [batch, dim]
return self.relu(self.linear(stacked))
# ---- 使用示例 ----
if __name__ == "__main__":
num_nodes = 1000
dim = 64
# 模拟邻接表
graph = {i: random.sample(range(num_nodes), min(20, num_nodes))
for i in range(num_nodes)}
# 初始 Embedding
node_embs = torch.randn(num_nodes, dim)
# 对 batch 中的节点进行随机游走采样
layer = PinSageLayer(dim, dim)
target_nodes = [0, 1, 2, 3, 4]
neighbor_info = [
layer.random_walk_sample(graph, node, walk_length=10,
num_walks=5, top_k=15)
for node in target_nodes
]
updated = layer(node_embs, neighbor_info)
print(f"更新后 Embedding 形状: {updated.shape}") # [5, 64]
💡 面试考点:
Q:PinSage 如何解决 GCN 的扩展性问题? A:三个关键技术:(1) 随机游走采样替代全邻居聚合,将邻居数量从百万级固定为 Top-T 个,内存可控;(2) mini-batch 训练,每次只构建子图而非全图计算;(3) CPU-GPU 流水线,CPU 负责图采样,GPU 负责模型计算,并行执行消除 IO 瓶颈。此外,Hard Negative Mining 提升了训练质量。
Q:随机游走采样 vs 均匀采样邻居,哪个更好?为什么? A:随机游走采样更好。均匀采样随机选择邻居,高重要性和低重要性的邻居被平等对待。随机游走则隐式地衡量了邻居的"可达性"——被多次访问的邻居与目标节点的关系更强,这相当于一种 importance sampling。
19.5 知识图谱增强推荐¶
19.5.1 知识图谱在推荐中的作用¶
知识图谱(Knowledge Graph, KG) 提供了物品的结构化属性信息:
知识图谱示例(电影推荐):
《肖申克的救赎》──导演──→ 弗兰克·德拉邦特
──类型──→ 犯罪/剧情
──演员──→ 蒂姆·罗宾斯
──年份──→ 1994
《绿里奇迹》 ──导演──→ 弗兰克·德拉邦特 ← 同一导演!
──类型──→ 犯罪/剧情 ← 同一类型!
──演员──→ 汤姆·汉克斯
→ 通过 KG,系统可以推理:
喜欢《肖申克》 → 可能喜欢同导演/同类型的《绿里》
这比纯协同过滤多了"为什么推荐"的解释能力
KG 在推荐中的三大价值:
| 价值 | 说明 |
|---|---|
| 可解释性 | "因为你看过同导演的 XX 电影" |
| 冷启动缓解 | 新物品没有交互,但有 KG 属性 |
| 语义丰富 | 捕获物品之间的深层语义关系 |
19.5.2 KGAT:Knowledge-Graph Attention Network¶
KGAT(Wang et al., KDD 2019) 将用户-物品交互图和知识图谱统一为一个协作知识图(CKG),并使用注意力机制学习邻居的重要性。
CKG 构建:
协作知识图(CKG):
User_A ──interact──→ 电影_1 ──directed_by──→ 导演_X
│ │
interact genre_of
│ │
▼ ▼
电影_2 ──acted_by──→ 演员_Y ──acted_in──→ 电影_3
KGAT 的注意力聚合:
其中注意力权重:
关键点:不同关系类型 \(r\) 有不同的变换矩阵 \(W_r\)。
19.5.3 简化 KGAT PyTorch 实现¶
import torch
import torch.nn as nn
import torch.nn.functional as F
class KGATLayer(nn.Module):
"""
KGAT 注意力聚合层(简化版)
对每个节点,根据不同关系类型计算邻居的注意力权重,
加权聚合邻居信息。
"""
def __init__(self, emb_dim: int, n_relations: int, dropout: float = 0.1):
super().__init__()
self.emb_dim = emb_dim
# 每种关系一个变换矩阵
self.W_r = nn.Parameter(torch.FloatTensor(n_relations, emb_dim, emb_dim))
nn.init.xavier_uniform_(self.W_r.view(-1, emb_dim)) # 重塑张量形状
# 注意力计算
self.leaky_relu = nn.LeakyReLU(0.2)
self.dropout = nn.Dropout(dropout)
def compute_attention(self, head_emb: torch.Tensor, tail_emb: torch.Tensor,
relation_ids: torch.Tensor) -> torch.Tensor:
"""
计算注意力分数
Args:
head_emb: 头实体 Embedding [num_edges, dim]
tail_emb: 尾实体 Embedding [num_edges, dim]
relation_ids: 关系类型 ID [num_edges]
Returns:
注意力分数 [num_edges]
"""
# 获取对应关系的变换矩阵
W = self.W_r[relation_ids] # [num_edges, dim, dim]
# 变换尾实体 Embedding
tail_transformed = torch.bmm(W, tail_emb.unsqueeze(2)).squeeze(2) # [num_edges, dim] # squeeze压缩维度
# 计算注意力分数
scores = (head_emb * tail_transformed).sum(dim=1) # [num_edges]
scores = self.leaky_relu(scores)
return scores
def forward(self, entity_emb: torch.Tensor, edge_index: torch.Tensor,
relation_ids: torch.Tensor) -> torch.Tensor:
"""
前向传播
Args:
entity_emb: 实体 Embedding [num_entities, dim]
edge_index: 边索引 [2, num_edges],第 0 行是头实体,第 1 行是尾实体
relation_ids: 关系类型 [num_edges]
Returns:
更新后的实体 Embedding [num_entities, dim]
"""
heads, tails = edge_index[0], edge_index[1]
head_emb = entity_emb[heads] # [num_edges, dim]
tail_emb = entity_emb[tails] # [num_edges, dim]
# 计算注意力分数
attn_scores = self.compute_attention(head_emb, tail_emb, relation_ids)
# 按头实体分组做 softmax(使用 scatter 实现)
num_entities = entity_emb.shape[0]
attn_weights = torch.zeros(heads.shape[0], device=entity_emb.device)
# 简化的 softmax:对每个头节点的所有边做归一化
max_scores = torch.zeros(num_entities, device=entity_emb.device)
max_scores.scatter_reduce_(0, heads, attn_scores, reduce='amax', include_self=True)
exp_scores = torch.exp(attn_scores - max_scores[heads])
sum_exp = torch.zeros(num_entities, device=entity_emb.device)
sum_exp.scatter_add_(0, heads, exp_scores)
attn_weights = exp_scores / (sum_exp[heads] + 1e-8)
attn_weights = self.dropout(attn_weights)
# 获取对应关系的变换矩阵,变换尾实体
W = self.W_r[relation_ids]
tail_transformed = torch.bmm(W, tail_emb.unsqueeze(2)).squeeze(2)
# 加权聚合
weighted = attn_weights.unsqueeze(1) * tail_transformed # [num_edges, dim]
# 聚合到头实体
output = torch.zeros(num_entities, self.emb_dim, device=entity_emb.device)
output.scatter_add_(0, heads.unsqueeze(1).expand_as(weighted), weighted)
return F.elu(output)
class KGAT(nn.Module):
"""
简化的 KGAT 模型
"""
def __init__(self, num_entities: int, num_relations: int,
emb_dim: int = 64, n_layers: int = 2):
super().__init__()
self.entity_embedding = nn.Embedding(num_entities, emb_dim)
nn.init.xavier_uniform_(self.entity_embedding.weight)
self.layers = nn.ModuleList([
KGATLayer(emb_dim, num_relations) for _ in range(n_layers)
])
def forward(self, edge_index: torch.Tensor,
relation_ids: torch.Tensor) -> torch.Tensor:
"""
Returns:
最终实体 Embedding [num_entities, dim]
"""
x = self.entity_embedding.weight
emb_list = [x]
for layer in self.layers:
x = layer(x, edge_index, relation_ids)
emb_list.append(x)
# 层组合
return torch.stack(emb_list, dim=0).mean(dim=0)
# ---- 使用示例 ----
if __name__ == "__main__":
num_entities = 500 # 用户 + 物品 + KG 实体
num_relations = 10 # 关系类型数
num_edges = 2000
# 随机生成知识图谱边
edge_index = torch.stack([
torch.randint(0, num_entities, (num_edges,)),
torch.randint(0, num_entities, (num_edges,))
])
relation_ids = torch.randint(0, num_relations, (num_edges,))
model = KGAT(num_entities, num_relations, emb_dim=64, n_layers=2)
entity_embs = model(edge_index, relation_ids)
print(f"实体 Embedding 形状: {entity_embs.shape}") # [500, 64]
💡 面试考点:
Q:KG 推荐的优势和局限是什么? A:优势:(1) 提供可解释性——可以用路径解释推荐原因;(2) 缓解冷启动——新物品可通过 KG 属性与老物品关联;(3) 引入领域知识——让推荐更准确。局限:(1) KG 构建和维护成本高;(2) KG 质量直接影响推荐质量(噪声、不完整);(3) 计算复杂度较高。
19.6 图对比学习推荐¶
19.6.1 为什么需要图对比学习¶
传统 GNN 推荐存在的问题:
GNN 推荐的监督信号问题:
1. 数据稀疏:用户-物品交互极度稀疏(通常 < 1%)
→ 监督信号不足 → Embedding 质量差
2. 长尾分布:少数热门物品占大部分交互
→ 冷门物品的 Embedding 得不到充分训练
3. 噪声交互:用户点击不代表真正喜欢
→ 带噪训练信号
对比学习的思路:
引入自监督信号 → 增强 Embedding 的表征能力 → 弥补监督信号不足
19.6.2 SGL:Self-supervised Graph Learning¶
SGL(Wu et al., SIGIR 2021) 将图数据增强与对比学习引入推荐:
三种图增强策略:
- Node Dropout (ND):随机丢弃节点
- Edge Dropout (ED):随机丢弃边
- Random Walk (RW):随机游走生成子图
SGL 训练框架:
原始图 G
├── 增强视图 1(G')──→ GCN ──→ Embedding z'
├── 增强视图 2(G'')──→ GCN ──→ Embedding z''
└── 原始图 ──→ GCN ──→ Embedding z(用于推荐任务)
对比学习目标:
同一节点在 G' 和 G'' 中的表征应相似(正样本对)
不同节点的表征应不同(负样本对)
总损失 = BPR Loss + λ · InfoNCE Loss
InfoNCE Loss:
其中 \(\tau\) 是温度参数。
19.6.3 SimGCL:更简单的图对比学习¶
SimGCL(Yu et al., SIGIR 2022) 提出一个关键发现:不需要图结构增强,只需在 Embedding 上加噪声即可。
SGL vs SimGCL:
SGL:
原始图 → 图增强(丢边/丢节点) → 两个增强图 → GCN → 对比学习
问题:图增强改变了图结构,可能丢失重要信息
SimGCL:
原始图 → GCN → Embedding → 加随机噪声 → 两个扰动表征 → 对比学习
优势:不修改图结构,计算更高效,效果更好
SimGCL 的噪声扰动:
\(\epsilon\) 控制扰动幅度。
19.6.4 代码示例:图对比学习训练框架¶
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
class GraphContrastiveLearning(nn.Module):
"""
图对比学习推荐框架
支持两种模式:
1. SGL 风格:通过边丢弃进行图增强
2. SimGCL 风格:直接在 Embedding 上加噪声
"""
def __init__(self, num_users: int, num_items: int, emb_dim: int = 64,
n_layers: int = 3, temperature: float = 0.2,
cl_weight: float = 0.1, noise_eps: float = 0.1,
mode: str = 'simgcl'):
"""
Args:
mode: 'sgl' 或 'simgcl'
"""
super().__init__()
self.num_users = num_users
self.num_items = num_items
self.n_layers = n_layers
self.temperature = temperature
self.cl_weight = cl_weight
self.noise_eps = noise_eps
self.mode = mode
self.user_embedding = nn.Embedding(num_users, emb_dim)
self.item_embedding = nn.Embedding(num_items, emb_dim)
nn.init.xavier_uniform_(self.user_embedding.weight)
nn.init.xavier_uniform_(self.item_embedding.weight)
self.adj_norm = None # 原始归一化邻接矩阵
def set_graph(self, adj_norm: torch.Tensor):
"""设置图的邻接矩阵"""
self.adj_norm = adj_norm
def lightgcn_propagate(self, adj: torch.Tensor,
add_noise: bool = False) -> tuple:
"""
LightGCN 风格的图传播
Args:
adj: 邻接矩阵
add_noise: 是否在每层传播后添加噪声(SimGCL)
"""
all_emb = torch.cat([self.user_embedding.weight,
self.item_embedding.weight], dim=0)
emb_list = [all_emb]
for _ in range(self.n_layers):
all_emb = torch.spmm(adj, all_emb)
if add_noise:
# SimGCL:添加均匀分布的归一化噪声
noise = torch.rand_like(all_emb)
noise = F.normalize(noise, dim=1) * self.noise_eps
all_emb = all_emb + noise
emb_list.append(all_emb)
final_emb = torch.stack(emb_list, dim=0).mean(dim=0)
user_emb = final_emb[:self.num_users]
item_emb = final_emb[self.num_users:]
return user_emb, item_emb
def info_nce_loss(self, emb1: torch.Tensor, emb2: torch.Tensor,
indices: torch.Tensor) -> torch.Tensor:
"""
InfoNCE 对比损失
Args:
emb1: 视图 1 的 Embedding [N, dim]
emb2: 视图 2 的 Embedding [N, dim]
indices: 目标节点索引 [batch_size]
Returns:
InfoNCE loss
"""
z1 = F.normalize(emb1[indices], dim=1) # [batch, dim]
z2 = F.normalize(emb2[indices], dim=1) # [batch, dim]
# 正样本相似度
pos_sim = (z1 * z2).sum(dim=1) / self.temperature # [batch]
# 所有节点作为负样本
all_z2 = F.normalize(emb2, dim=1) # [N, dim]
neg_sim = torch.mm(z1, all_z2.T) / self.temperature # [batch, N]
# InfoNCE
loss = -pos_sim + torch.logsumexp(neg_sim, dim=1)
return loss.mean()
def forward(self, users: torch.Tensor, pos_items: torch.Tensor,
neg_items: torch.Tensor) -> torch.Tensor:
"""
计算总损失 = BPR Loss + λ · CL Loss
"""
# ---- 主任务:BPR Loss ----
user_emb, item_emb = self.lightgcn_propagate(self.adj_norm, add_noise=False)
u = user_emb[users]
p = item_emb[pos_items]
n = item_emb[neg_items]
pos_scores = (u * p).sum(dim=1)
neg_scores = (u * n).sum(dim=1)
bpr_loss = -F.logsigmoid(pos_scores - neg_scores).mean()
# ---- 对比学习:InfoNCE Loss ----
if self.mode == 'simgcl':
# SimGCL:两次带噪声的传播生成两个视图
user_emb_1, item_emb_1 = self.lightgcn_propagate(
self.adj_norm, add_noise=True)
user_emb_2, item_emb_2 = self.lightgcn_propagate(
self.adj_norm, add_noise=True)
else:
# SGL:需要两个增强图(这里简化为两次独立的 edge dropout)
user_emb_1, item_emb_1 = self.lightgcn_propagate(
self.adj_norm, add_noise=False)
user_emb_2, item_emb_2 = self.lightgcn_propagate(
self.adj_norm, add_noise=False)
cl_loss_user = self.info_nce_loss(user_emb_1, user_emb_2, users)
cl_loss_item = self.info_nce_loss(item_emb_1, item_emb_2, pos_items)
cl_loss = cl_loss_user + cl_loss_item
return bpr_loss + self.cl_weight * cl_loss
# ---- 使用示例 ----
if __name__ == "__main__":
from scipy.sparse import coo_matrix
num_users, num_items = 100, 200
emb_dim = 64
n_nodes = num_users + num_items
# 构造随机交互图
n_edges = 500
users_arr = np.random.randint(0, num_users, n_edges)
items_arr = np.random.randint(0, num_items, n_edges) + num_users
row = np.concatenate([users_arr, items_arr])
col = np.concatenate([items_arr, users_arr])
data = np.ones(len(row), dtype=np.float32)
adj = coo_matrix((data, (row, col)), shape=(n_nodes, n_nodes))
degree = np.array(adj.sum(1)).flatten()
d_inv_sqrt = np.power(degree + 1e-8, -0.5)
adj.data = adj.data * d_inv_sqrt[adj.row] * d_inv_sqrt[adj.col]
indices = torch.LongTensor(np.vstack([adj.row, adj.col]))
values = torch.FloatTensor(adj.data)
adj_norm = torch.sparse_coo_tensor(indices, values, torch.Size([n_nodes, n_nodes]))
# 创建模型并训练
model = GraphContrastiveLearning(num_users, num_items, emb_dim=emb_dim, mode='simgcl')
model.set_graph(adj_norm)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
for epoch in range(20):
users = torch.randint(0, num_users, (128,))
pos_items = torch.randint(0, num_items, (128,))
neg_items = torch.randint(0, num_items, (128,))
loss = model(users, pos_items, neg_items)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (epoch + 1) % 5 == 0:
print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")
💡 面试考点:
Q:SGL 和 SimGCL 的核心区别是什么? A:SGL 通过图结构增强(丢边/丢节点/随机游走)生成两个增强视图,然后做对比学习。SimGCL 发现图结构增强不是必须的,直接在 GCN 传播过程中对 Embedding 加随机噪声(均匀扰动)即可,而且效果更好、训练更快。SimGCL 的核心见解是:对比学习的关键在于表征空间的均匀性(uniformity),而不是图增强本身。
19.7 最新进展与前沿¶
19.7.1 GNN + LLM 推荐融合¶
大语言模型(LLM)和图神经网络的结合是当前推荐系统研究的热点方向:
融合范式:
1. LLM 生成节点特征 → GNN 学习图结构
┌──────────────────────────────────────────┐
│ 用 LLM 为物品生成文本 Embedding: │
│ e_item = LLM("一部由诺兰执导的科幻电影") │
│ 然后将 e_item 作为 GNN 的初始节点特征 │
└──────────────────────────────────────────┘
2. GNN 提供结构信息 → 增强 LLM 的推荐能力
┌──────────────────────────────────────────┐
│ 用 GNN 生成用户/物品的结构化 Embedding │
│ 将结构 Embedding 作为 Prompt 注入 LLM │
│ LLM 结合结构信息进行推荐推理 │
└──────────────────────────────────────────┘
3. 联合训练
┌──────────────────────────────────────────┐
│ LLM 和 GNN 端到端联合优化 │
│ LLM 负责语义理解,GNN 负责协同过滤 │
│ 对齐两个空间的表征 │
└──────────────────────────────────────────┘
代表性工作:
| 工作 | 方法 | 要点 |
|---|---|---|
| RLMRec (2024) | LLM 生成用户/物品画像文本 → 文本 Embedding 与 GNN Embedding 对齐 | 利用 LLM 世界知识增强推荐 |
| LLMRec (2023) | LLM 增强图推荐的三种策略 | 数据增强+特征增强+知识注入 |
| GraphGPT (2024) | 对齐图编码器和 LLM | 通用图理解能力 |
19.7.2 异质信息网络推荐¶
异质信息网络(Heterogeneous Information Network, HIN) 包含多种类型的节点和边:
异质图示例(电商推荐):
节点类型:用户、商品、品牌、类目、商家
边类型:购买、浏览、属于_品牌、属于_类目、上架_商家
用户A ──购买──→ 商品X ──属于_品牌──→ 品牌P
│
属于_类目
│
▼
类目C ←──属于_类目── 商品Y
元路径(Meta-path):
用户-购买-商品-属于品牌-品牌-属于品牌-商品
→ 这条路径连接了同品牌的商品
核心技术: - 元路径注意力(meta-path attention):学习不同类型路径的重要性 - 语义级聚合:先在每种元路径内聚合,再跨元路径聚合 - 代表工作:HAN(Heterogeneous Graph Attention Network)、HERec
19.7.3 超图推荐¶
超图(Hypergraph) 中的边(超边)可以连接两个以上的节点,比普通图更具表达力:
普通图 vs 超图:
普通图:
一条边连接 2 个节点
用户 A → 物品 X
超图:
一条超边连接任意多个节点
超边 e = {用户A, 物品X, 物品Y, 物品Z}
→ 表示"用户A在一次购物session中购买了X,Y,Z"
→ 捕获群组级别的交互关系
应用场景: - 会话推荐:一个 session 中的物品构成超边 - 社交推荐:一个群组的成员构成超边 - 跨域推荐:共享实体的多个域构成超边
代表工作:DHCF(Dual Channel Hypergraph Collaborative Filtering)、HyperRec
19.7.4 图推荐的公平性与去偏¶
图推荐中的偏差问题:
1. 流行度偏差(Popularity Bias):
度大的节点(热门物品)在消息传递中被"放大"
→ 推荐结果偏向热门物品
2. 度偏差(Degree Bias):
高度节点和低度节点的 Embedding 质量差异大
→ 冷门物品/新用户表征不准确
3. 反馈循环(Feedback Loop):
推荐 → 更多曝光 → 更多交互 → 更高度数 → 更多推荐
→ 马太效应
解决方案: - 因果去偏:使用因果推断区分直接效应和混淆效应(参考第18章因果推荐部分) - 反事实数据增强:生成反事实图进行训练 - 公平性约束:在损失函数中加入公平性正则项 - 度感知归一化:针对不同度数的节点使用不同的归一化策略
19.8 实战:基于 LightGCN 的推荐系统¶
19.8.1 项目概述¶
实战目标:
在 MovieLens-100K 数据集上实现完整的 LightGCN 推荐系统
包含:
1. 数据加载与预处理
2. 用户-物品二部图构建
3. LightGCN 模型训练
4. 评估指标计算(Recall@K, NDCG@K)
5. 结果分析
19.8.2 完整端到端代码¶
"""
LightGCN 推荐系统完整实战
数据集:MovieLens-100K
运行方式:python lightgcn_movielens.py
依赖:torch, numpy, scipy, pandas
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from scipy.sparse import coo_matrix
from collections import defaultdict
import random
import time
import os
import urllib.request
import zipfile
# ============================================================
# 第 1 部分:数据加载与预处理
# ============================================================
def download_movielens_100k(data_dir: str = './ml-100k'):
"""下载 MovieLens-100K 数据集"""
if os.path.exists(os.path.join(data_dir, 'u.data')):
print("数据集已存在,跳过下载。")
return
url = 'https://files.grouplens.org/datasets/movielens/ml-100k.zip'
zip_path = 'ml-100k.zip'
print("正在下载 MovieLens-100K...")
urllib.request.urlretrieve(url, zip_path)
with zipfile.ZipFile(zip_path, 'r') as z:
z.extractall('.')
os.remove(zip_path)
print("下载完成。")
def load_and_split_data(data_dir: str = './ml-100k', test_ratio: float = 0.2):
"""
加载并划分数据集
按时间排序后,每个用户最后 20% 的交互作为测试集
Returns:
train_pairs: [N_train, 2] (user_id, item_id)
test_dict: {user_id: set(item_ids)} 测试集
num_users, num_items
"""
# 读取数据:user_id item_id rating timestamp
data = []
filepath = os.path.join(data_dir, 'u.data')
with open(filepath, 'r') as f: # with自动管理文件关闭
for line in f:
parts = line.strip().split('\t')
user, item, rating, ts = int(parts[0]), int(parts[1]), \
float(parts[2]), int(parts[3])
if rating >= 3.0: # 只保留正向交互(评分 ≥ 3)
data.append((user, item, ts))
# 重新编号:从 0 开始
users = sorted(set(d[0] for d in data))
items = sorted(set(d[1] for d in data))
user_map = {u: i for i, u in enumerate(users)}
item_map = {it: i for i, it in enumerate(items)}
num_users = len(users)
num_items = len(items)
# 按用户分组并按时间排序
user_interactions = defaultdict(list)
for user, item, ts in data:
user_interactions[user_map[user]].append((item_map[item], ts))
train_pairs = []
test_dict = defaultdict(set)
for user, interactions in user_interactions.items():
# 按时间排序
interactions.sort(key=lambda x: x[1]) # lambda匿名函数
items_sorted = [it for it, _ in interactions]
# 划分训练集和测试集
n_test = max(1, int(len(items_sorted) * test_ratio))
train_items = items_sorted[:-n_test]
test_items = items_sorted[-n_test:]
for item in train_items:
train_pairs.append([user, item])
for item in test_items:
test_dict[user].add(item)
train_pairs = np.array(train_pairs)
print(f"用户数: {num_users}, 物品数: {num_items}")
print(f"训练交互数: {len(train_pairs)}, 测试用户数: {len(test_dict)}")
print(f"稀疏度: {1 - len(train_pairs) / (num_users * num_items):.4f}")
return train_pairs, dict(test_dict), num_users, num_items
# ============================================================
# 第 2 部分:图构建
# ============================================================
def build_adj_matrix(train_pairs: np.ndarray, num_users: int,
num_items: int) -> torch.Tensor:
"""
构建归一化邻接矩阵(用户-物品二部图)
邻接矩阵结构:
A = | 0 R |
| R^T 0 |
其中 R 是用户-物品交互矩阵
归一化:Ã = D^{-1/2} A D^{-1/2}
"""
n_nodes = num_users + num_items
users = train_pairs[:, 0]
items = train_pairs[:, 1] + num_users
# 对称邻接矩阵
row = np.concatenate([users, items])
col = np.concatenate([items, users])
data = np.ones(len(row), dtype=np.float32)
adj = coo_matrix((data, (row, col)), shape=(n_nodes, n_nodes))
# 归一化
degree = np.array(adj.sum(axis=1)).flatten()
d_inv_sqrt = np.power(degree, -0.5)
d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.0
adj.data = adj.data * d_inv_sqrt[adj.row] * d_inv_sqrt[adj.col]
# 转 PyTorch 稀疏张量
indices = torch.LongTensor(np.vstack([adj.row, adj.col]))
values = torch.FloatTensor(adj.data)
return torch.sparse_coo_tensor(indices, values, torch.Size([n_nodes, n_nodes]))
# ============================================================
# 第 3 部分:LightGCN 模型
# ============================================================
class LightGCNFull(nn.Module):
"""带完整评估功能的 LightGCN"""
def __init__(self, num_users: int, num_items: int,
emb_dim: int = 64, n_layers: int = 3, reg: float = 1e-4):
super().__init__()
self.num_users = num_users
self.num_items = num_items
self.n_layers = n_layers
self.reg = reg
self.user_emb = nn.Embedding(num_users, emb_dim)
self.item_emb = nn.Embedding(num_items, emb_dim)
nn.init.xavier_uniform_(self.user_emb.weight)
nn.init.xavier_uniform_(self.item_emb.weight)
def propagate(self, adj: torch.Tensor) -> tuple:
"""LightGCN 传播 + 层组合"""
all_emb = torch.cat([self.user_emb.weight, self.item_emb.weight], dim=0)
emb_list = [all_emb]
for _ in range(self.n_layers):
all_emb = torch.spmm(adj, all_emb)
emb_list.append(all_emb)
final = torch.stack(emb_list, dim=0).mean(dim=0)
return final[:self.num_users], final[self.num_users:]
def bpr_loss(self, adj: torch.Tensor, users: torch.Tensor,
pos: torch.Tensor, neg: torch.Tensor) -> torch.Tensor:
user_e, item_e = self.propagate(adj)
u = user_e[users]
p = item_e[pos]
n = item_e[neg]
pos_s = (u * p).sum(dim=1)
neg_s = (u * n).sum(dim=1)
loss = -F.logsigmoid(pos_s - neg_s).mean()
# L2 正则(仅初始 Embedding)
reg = self.reg * (
self.user_emb(users).norm(2).pow(2) +
self.item_emb(pos).norm(2).pow(2) +
self.item_emb(neg).norm(2).pow(2)
) / len(users)
return loss + reg
# ============================================================
# 第 4 部分:评估指标
# ============================================================
def evaluate(model: LightGCNFull, adj: torch.Tensor,
train_user_items: dict, test_dict: dict,
top_k: int = 20) -> dict:
"""
评估推荐模型(Recall@K, NDCG@K)
Args:
model: 训练好的模型
adj: 归一化邻接矩阵
train_user_items: {user: set(items)} 训练集交互
test_dict: {user: set(items)} 测试集真实交互
top_k: 评估的 K 值
Returns:
{'recall@K': ..., 'ndcg@K': ...}
"""
model.eval()
with torch.no_grad():
user_emb, item_emb = model.propagate(adj)
recalls = []
ndcgs = []
for user in test_dict:
if user not in train_user_items:
continue
# 计算该用户对所有物品的分数
scores = torch.mv(item_emb, user_emb[user]) # [num_items]
# 屏蔽训练集中已交互的物品
train_items = list(train_user_items[user])
scores[train_items] = -float('inf')
# Top-K 推荐
_, topk_items = scores.topk(top_k)
topk_items = topk_items.cpu().numpy().tolist() # 保持有序列表,NDCG需要排名信息
# 计算 Recall@K
true_items = test_dict[user]
hits = len(set(topk_items) & true_items)
recall = hits / min(len(true_items), top_k)
recalls.append(recall)
# 计算 NDCG@K
dcg = 0.0
for rank, item in enumerate(topk_items):
if item in true_items:
dcg += 1.0 / np.log2(rank + 2) # rank 从 0 开始
idcg = sum(1.0 / np.log2(i + 2) for i in range(min(len(true_items), top_k)))
ndcg = dcg / idcg if idcg > 0 else 0.0
ndcgs.append(ndcg)
return {
f'Recall@{top_k}': np.mean(recalls),
f'NDCG@{top_k}': np.mean(ndcgs)
}
# ============================================================
# 第 5 部分:训练与评估主流程
# ============================================================
def main():
# 超参数
EMB_DIM = 64
N_LAYERS = 3
LR = 1e-3
REG = 1e-4
BATCH_SIZE = 1024
N_EPOCHS = 80
TOP_K = 20
# 1. 数据加载
download_movielens_100k()
train_pairs, test_dict, num_users, num_items = load_and_split_data()
# 2. 构建图
adj = build_adj_matrix(train_pairs, num_users, num_items)
# 构建训练集用户交互字典
train_user_items = defaultdict(set)
for u, i in train_pairs:
train_user_items[u].add(i)
# 3. 创建模型
model = LightGCNFull(num_users, num_items, EMB_DIM, N_LAYERS, REG)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
# 4. 训练循环
print("\n" + "=" * 60)
print("开始训练 LightGCN")
print("=" * 60)
best_recall = 0.0
for epoch in range(N_EPOCHS):
model.train()
perm = np.random.permutation(len(train_pairs))
total_loss = 0.0
n_batch = 0
for start in range(0, len(train_pairs), BATCH_SIZE):
end = min(start + BATCH_SIZE, len(train_pairs))
batch = train_pairs[perm[start:end]]
users = torch.LongTensor(batch[:, 0])
pos_items = torch.LongTensor(batch[:, 1])
neg_items = torch.LongTensor([
sample_neg(train_user_items, num_items, u)
for u in batch[:, 0]
])
loss = model.bpr_loss(adj, users, pos_items, neg_items)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
n_batch += 1
avg_loss = total_loss / n_batch
# 每 10 个 epoch 评估一次
if (epoch + 1) % 10 == 0:
metrics = evaluate(model, adj, train_user_items, test_dict, TOP_K)
recall = metrics[f'Recall@{TOP_K}']
ndcg = metrics[f'NDCG@{TOP_K}']
improved = " ★ New Best!" if recall > best_recall else ""
best_recall = max(best_recall, recall)
print(f"Epoch {epoch+1:3d} | Loss: {avg_loss:.4f} | "
f"Recall@{TOP_K}: {recall:.4f} | "
f"NDCG@{TOP_K}: {ndcg:.4f}{improved}")
# 5. 最终评估
print("\n" + "=" * 60)
print("最终评估结果")
print("=" * 60)
for k in [5, 10, 20]:
metrics = evaluate(model, adj, train_user_items, test_dict, k)
print(f" Recall@{k:2d}: {metrics[f'Recall@{k}']:.4f} | "
f"NDCG@{k:2d}: {metrics[f'NDCG@{k}']:.4f}")
# 6. 推荐示例
print("\n" + "=" * 60)
print("推荐示例")
print("=" * 60)
model.eval()
with torch.no_grad():
user_emb, item_emb = model.propagate(adj)
for uid in [0, 1, 2]:
scores = torch.mv(item_emb, user_emb[uid])
train_items = list(train_user_items.get(uid, set()))
scores[train_items] = -float('inf')
_, topk = scores.topk(10)
print(f" 用户 {uid} 的 Top-10 推荐: {topk.tolist()}")
def sample_neg(user_items: dict, num_items: int, user: int) -> int:
"""负采样"""
while True:
neg = random.randint(0, num_items - 1)
if neg not in user_items[user]:
return neg
if __name__ == "__main__":
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
main()
19.8.3 结果分析¶
在 MovieLens-100K 上的典型结果(参考值):
| 模型 | Recall@20 | NDCG@20 | 参数量 |
|---|---|---|---|
| MF-BPR | ~0.18 | ~0.15 | 用户数×dim + 物品数×dim |
| NGCF | ~0.20 | ~0.17 | 上述 + 每层 W 矩阵 |
| LightGCN (3层) | ~0.22 | ~0.19 | 仅初始 Embedding |
关键观察:
- LightGCN 参数最少但效果最好:没有额外的权重矩阵,所有参数集中在初始 Embedding
- 层数敏感性:2-3 层效果最好,超过 4 层开始下降(over-smoothing)
- Embedding 维度:64 维通常足够,过大反而过拟合
- 正则化重要:L2 正则化对防止过拟合至关重要
层数 vs Recall@20 关系示意:
Recall@20
^
| ╱ ─ ─ ─ ╲
| ╱ ╲
| ╱ ╲
| ╱ ╲
|╱ ╲
┼───┬───┬───┬───┬───┬───→ 层数
0 1 2 3 4 5
最优通常在 2-3 层
继续增加层数 → over-smoothing → 性能下降
19.9 面试高频题¶
题目 1:为什么要将推荐系统建模为图问题?¶
参考答案:
传统推荐方法(如矩阵分解)仅利用用户与物品的直接交互(一阶信号),忽视了更深层的协同关系。将推荐建模为图问题有三个核心优势:
- 高阶协同信号:通过图上的消息传递,可以自然地捕获多跳邻居的协同信息。例如用户 A 通过"A→物品X→用户B→物品Y"的路径发现物品Y,这是2阶协同信号。
- 灵活融合侧信息:用户属性、物品属性、社交关系、知识图谱等异质信息都可以作为图的节点和边自然融入,无需人工设计特征交叉。
- 缓解数据稀疏:即使两个用户没有共同交互,通过图中的间接路径仍然可以发现它们之间的关联。
题目 2:GCN 的消息传递机制是什么?请用公式表达并解释。¶
参考答案:
GCN 的消息传递分为三步:
- 消息构造:邻居节点 \(u\) 发送其特征 \(h_u^{(l)}\) 给目标节点 \(v\)
- 聚合(Aggregate):将所有邻居消息汇总
- 更新(Update):结合自身特征和聚合消息
矩阵形式为 \(H^{(l+1)} = \sigma(\tilde{A} H^{(l)} W^{(l)})\),其中 \(\tilde{A} = D^{-1/2} A D^{-1/2}\) 是对称归一化邻接矩阵。归一化系数 \(\frac{1}{\sqrt{d_u \cdot d_v}}\) 的作用是防止度大的节点在聚合中占据主导地位。
题目 3:LightGCN 对 GCN 做了哪些简化?为什么这些简化在推荐场景中是合理的?¶
参考答案:
LightGCN 做了两个关键简化:
- 移除特征变换矩阵 \(W\):推荐场景中用户和物品没有丰富的属性特征,只有可学习的 ID Embedding。对 ID Embedding 做线性变换等价于换了一组 Embedding 参数,增加了模型复杂度却没有带来额外信息。
- 移除非线性激活 \(\sigma\):实验表明非线性激活在推荐任务中会导致训练困难和性能下降,因为它可能破坏 Embedding 空间中的几何结构。
合理性在于:GCN 的特征变换和非线性是为节点分类任务设计的(节点有丰富的初始特征需要变换),而推荐场景的本质是协同过滤,核心是学习高质量的用户/物品表征,邻域信息的简单聚合已经足够。
题目 4:什么是 over-smoothing 问题?LightGCN 如何缓解?¶
参考答案:
Over-smoothing(过度平滑) 指随着 GCN 层数增加,所有节点的表征趋于收敛到相同的值,导致不同节点变得不可区分。
数学直觉:\(\tilde{A}\) 的反复相乘相当于随机游走的收敛过程,最终所有节点的表征趋向于度加权的"全局平均"。
LightGCN 的缓解策略是层组合(Layer Combination):
即使第 K 层的表征过于平滑,第 0 层(初始 Embedding)和低层表征仍保留了节点的局部个性化信息。最终的加权组合保证了多尺度信息的融合。实践中层数通常控制在 2-3 层。
题目 5:PinSage 如何解决 GCN 在十亿节点图上的扩展性问题?¶
参考答案:
PinSage 采用三个核心策略:
- 随机游走邻居采样:用随机游走代替全邻居聚合,为每个节点固定选取 Top-T 个最重要的邻居(按游走访问频次排序),将邻居数从可能的百万级降到固定的 T(如 50)。
- Mini-batch 训练:不做全图卷积,而是为每个 mini-batch 的目标节点构建计算子图(K 层邻居的 K-hop 子图),仅在子图上进行前向和反向传播。
- Producer-Consumer 架构:CPU 进程负责图采样(I/O 密集),GPU 进程负责模型计算(计算密集),两者异步流水线执行。
此外,PinSage 使用 Hard Negative Mining(用上一轮 Embedding 检索相似但未交互的物品)提升负样本质量,进一步提升训练效率和效果。
题目 6:LightGCN 中 BPR Loss 和 L2 正则化有什么注意事项?¶
参考答案:
BPR Loss 的核心思想是:正样本(用户交互过的物品)的预测分数应该高于负样本(未交互的物品):
注意事项: 1. L2 正则化只对初始 Embedding \(e^{(0)}\) 进行,不对传播后的 Embedding 正则化。因为 LightGCN 没有额外的权重参数,所有可学习参数都在初始 Embedding 中。 2. 负采样策略很重要:均匀随机负采样是最基础的方式,但效率不高。可以用流行度加权采样或 Hard Negative 提升效果。 3. 正则化系数 \(\lambda\) 需要调参:过大导致欠拟合,过小导致过拟合。
题目 7:对比学习(Contrastive Learning)为什么能提升图推荐效果?¶
参考答案:
对比学习通过引入额外的自监督信号来增强 Embedding 质量,主要从三个方面提升效果:
- 缓解数据稀疏:用户-物品交互极度稀疏(通常 < 1%),监督信号不足。对比学习通过数据增强或噪声扰动创造更多的训练信号,相当于用无标注的方式增加了训练数据。
- 提升 Embedding 均匀性(Uniformity):对比学习的负样本推开机制使得不同节点的 Embedding 更均匀地分布在特征空间中,避免了所有 Embedding 坍塌到一个小区域——这在稀疏图上尤其重要。
- 增强鲁棒性:通过不同增强视图间的一致性约束,模型学到的表征对噪声更鲁棒。
代表方法 SGL 使用图结构增强(丢边/丢节点),SimGCL 发现直接在 Embedding 上加随机噪音效果更好且更高效。
题目 8:知识图谱推荐和纯协同过滤推荐的核心区别是什么?各有什么优劣?¶
参考答案:
| 维度 | 纯协同过滤 | 知识图谱推荐 |
|---|---|---|
| 信息来源 | 仅用户-物品交互 | 交互 + 物品属性 + 实体关系 |
| 冷启动 | 新物品无交互则无法推荐 | 新物品有 KG 属性就可推荐 |
| 可解释性 | 黑盒("相似用户喜欢") | 可解释("因为同导演/同类型") |
| 数据依赖 | 需要大量交互 | 还需要高质量知识图谱 |
| 计算成本 | 较低 | 较高(KG 通常很大) |
| KG 维护 | 无 | 需要持续维护和更新 |
核心权衡:KG 推荐用外部知识换取更好的冷启动和可解释性,但代价是 KG 的构建和维护成本。在工业界,KG 推荐通常作为整体推荐系统的一个组件,与协同过滤互补使用。
题目 9:GCN 推荐中的归一化邻接矩阵 \(\tilde{A} = D^{-1/2}AD^{-1/2}\) 有什么物理含义?为什么不直接用 \(A\)?¶
参考答案:
直接使用邻接矩阵 \(A\) 有两个严重问题:
- 数值不稳定:随着层数增加,\(A^k\) 的值会指数级爆炸或衰减,导致梯度爆炸/消失。
- 度偏差:度大的节点(热门物品)在聚合中贡献过大,主导了所有邻居的表征。
\(\tilde{A}_{ij} = \frac{1}{\sqrt{d_i} \cdot \sqrt{d_j}}\) 的物理含义: - 节点 \(j\) 向节点 \(i\) 传递消息时,按双方度数的几何平均进行缩放。 - 高度节点的消息被缩小(热门物品影响被抑制),低度节点的消息被放大(冷门物品获得更多关注)。 - 这等价于度归一化的对称拉普拉斯算子,保证了传播的数值稳定性。
另外还有行归一化 \(D^{-1}A\)(左归一化),等价于对每个节点的邻居消息取平均,也常用于工业场景。
题目 10:如果让你设计一个工业级图推荐系统,你会考虑哪些关键问题?¶
参考答案:
需要从以下 5 个维度考虑:
1. 可扩展性 - 图规模可能达到十亿节点级别,全图 GCN 不可行 - 方案:mini-batch 采样训练(如 PinSage)、GraphSAGE 风格的邻居采样
2. 实时性 - 用户行为实时发生,图结构动态变化 - 方案:增量图更新 + 局部 Embedding 重新计算(而非全图重训)
3. 异质信息 - 实际场景中有多种节点类型和边类型 - 方案:异质图神经网络(HAN)、关系感知的注意力机制
4. 召回 vs 排序 - GNN Embedding 更适合做召回(向量检索),排序阶段通常用更精细的模型 - 方案:双塔架构——用 GNN 生成用户/物品的召回 Embedding → ANN 检索 → 精排
5. 线上部署 - GNN 推理延迟可能较高(需要多跳邻居聚合) - 方案:离线预计算所有节点 Embedding → 线上直接查表 + ANN 检索 - 增量更新:新物品上线后,仅在其局部子图上做推理
工业级图推荐系统架构:
┌──────────────┐ ┌──────────────────┐
│ 离线训练 │ │ 特征工程 │
│ LightGCN/ │ │ 用户特征、物品特征 │
│ PinSage │ │ 上下文特征 │
│ 全量训练 │ └───────┬────────────┘
└──────┬───────┘ │
│ │
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ Embedding │ │ 精排模型 │
│ 预计算 & 存储│ │ DeepFM/DIN │
│ (Redis/HBase)│ │ (参考第6/7章) │
└──────┬───────┘ └───────┬────────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ ANN 召回 │────→│ 精排 → 重排 │────→ 推荐结果
│ (Faiss/Milvus)│ │ │
└──────────────┘ └──────────────────┘
📝 本章小结¶
| 主题 | 核心要点 |
|---|---|
| GCN 基础 | 消息传递 + 邻域聚合,矩阵形式 \(\tilde{A}HW\),谱域 vs 空间域 |
| LightGCN | 移除特征变换和非线性激活,层组合多尺度融合,BPR Loss |
| PinSage | 随机游走采样、Importance Pooling、MapReduce 分布式、Hard Negative |
| KGAT | 知识图谱 + 协同过滤统一为 CKG,关系感知的注意力聚合 |
| 图对比学习 | SGL 图增强 + InfoNCE,SimGCL 噪声扰动更简单有效 |
| 前沿方向 | GNN+LLM 融合、异质图、超图、公平性去偏 |
学习建议: 1. 先理解 GCN 的消息传递原理,再学 LightGCN 的简化动机 2. 动手运行实战代码,尝试调整层数、Embedding 维度、正则化系数 3. 重点掌握 LightGCN 和 PinSage,这是面试中出现频率最高的两个模型 4. 了解图对比学习和 KG 推荐的基本思路,能说清楚"为什么有用"
推荐阅读: - LightGCN 论文:LightGCN: Simplifying and Powering Graph Convolution Network for Recommendation (SIGIR 2020) - PinSage 论文:Graph Convolutional Neural Networks for Web-Scale Recommender Systems (KDD 2018) - SGL 论文:Self-supervised Graph Learning for Recommendation (SIGIR 2021) - SimGCL 论文:Are Graph Augmentations Necessary? Simple Graph Contrastive Learning for Recommendation (SIGIR 2022)
下一步:掌握本章内容后,可以回顾第9章-召回算法了解如何将 GNN Embedding 集成到工业推荐系统的召回阶段,或参考第16章-LLM与推荐系统了解 GNN+LLM 的最新融合范式。