跳转至

图神经网络推荐

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

前置知识:本章需要对深度学习推荐(第6章)和协同过滤(第3章)有基础了解。图神经网络的通用原理可参考机器学习教程中的图神经网络章节。

面试必考:图神经网络推荐是字节跳动、快手、阿里巴巴、Pinterest 等公司的高频面试主题。LightGCN、PinSage、知识图谱推荐、图对比学习是核心考察方向。

📋 学习目标

完成本章学习后,你应该能够:

  • 理解为什么将推荐问题建模为图问题,以及图推荐相对传统方法的优势
  • 掌握 GCN 核心原理:消息传递、邻域聚合、谱域与空间域视角
  • 深入理解 LightGCN 的简化设计及其背后的动机
  • 掌握 PinSage 在工业级十亿节点图上的扩展策略
  • 了解知识图谱增强推荐(KGAT)的注意力机制
  • 理解图对比学习(SGL、SimGCL)在推荐中的应用
  • 能够用 PyTorch 实现 LightGCN 并在 MovieLens 上完成端到端训练与评估
  • 应对图推荐相关的高频面试问题

19.1 图推荐概述

19.1.1 为什么将推荐建模为图问题

推荐系统的核心数据——用户行为、社交关系、物品属性——天然具有图结构

Text Only
传统推荐的数据视角:
┌──────────────────────────────────────┐
│  用户-物品交互矩阵 R (m × n)         │
│  R[u][i] = 1  表示用户 u 交互过物品 i │
│  大部分元素为 0 → 极度稀疏            │
└──────────────────────────────────────┘

图推荐的数据视角:
┌──────────────────────────────────────┐
│  用户和物品都是节点                    │
│  交互关系是边                         │
│  社交关系、属性关系也是边              │
│  → 丰富的拓扑结构 + 高阶连接信息       │
└──────────────────────────────────────┘

直觉:如果用户 A 和用户 B 都喜欢物品 X 和物品 Y,那么 A 喜欢的物品 Z 也可能被 B 喜欢——这就是高阶协同信号,图结构天然编码了这种信号。

19.1.2 推荐系统中的三类图

图类型 节点 典型场景
用户-物品二部图 用户、物品 交互(点击/购买/评分) 协同过滤推荐
社交网络图 用户 关注/好友关系 社交推荐(微信/微博)
知识图谱 实体(物品/属性/类别) 关系(属于/导演/演员) 可解释推荐
Text Only
用户-物品二部图示例:

  User1 ──── Item_A
    │  ╲        │
    │    ╲      │
    │      ╲    │
  User2 ──── Item_B
    │            │
    │            │
  User3 ──── Item_C

高阶连接:User1 → Item_A → User2 → Item_B
→ User1 和 Item_B 之间存在 2 阶路径
→ 可以向 User1 推荐 Item_B

19.1.3 图推荐 vs 传统推荐的优势

维度 传统推荐(MF/DNN) 图推荐(GNN)
信号利用 仅直接交互(1阶) 高阶协同信号(多阶邻居)
数据稀疏 严重受限 通过消息传递缓解
冷启动 困难 可利用图结构中的间接连接
可解释性 黑盒 可通过路径/子图解释
侧信息融合 特征拼接 异质图自然融合

💡 面试考点:面试官常问"图推荐相比矩阵分解的核心优势是什么?"——答案的关键词是高阶协同信号(high-order collaborative signal)。矩阵分解只建模用户和物品的直接交互,而 GNN 通过多层消息传递自然捕获了多跳邻居的协同信息。


19.2 图卷积网络基础

19.2.1 GCN 核心原理

图卷积网络(Graph Convolutional Network, GCN)的核心思想是消息传递(Message Passing):

Text Only
消息传递范式:

  对每个节点 v:
  1. 收集(Aggregate):从邻居节点 u ∈ N(v) 收集消息
  2. 更新(Update):结合自身特征和邻居消息,更新节点表征

  h_v^{(l+1)} = UPDATE(h_v^{(l)}, AGGREGATE({h_u^{(l)} : u ∈ N(v)}))

GCN(Kipf & Welling, 2017) 的具体形式:

\[h_v^{(l+1)} = \sigma\left(\sum_{u \in \mathcal{N}(v) \cup \{v\}} \frac{1}{\sqrt{|\mathcal{N}(u)|} \cdot \sqrt{|\mathcal{N}(v)|}} \cdot W^{(l)} h_u^{(l)}\right)\]

其中: - \(h_v^{(l)}\) 是节点 \(v\) 在第 \(l\) 层的表征 - \(\mathcal{N}(v)\) 是节点 \(v\) 的邻居集合 - \(W^{(l)}\) 是第 \(l\) 层的可学习权重矩阵 - \(\sigma\) 是非线性激活函数(如 ReLU)

19.2.2 图信号处理视角:谱域 vs 空间域

GCN 可以从两个视角理解:

谱域(Spectral Domain): - 将图信号进行图傅里叶变换 - 在频域上用滤波器进行卷积 - 基于图拉普拉斯矩阵 \(L = D - A\) 的特征分解 - GCN 是切比雪夫多项式滤波器的一阶近似

空间域(Spatial Domain): - 直接在图上定义邻域聚合操作 - 每个节点从其邻居收集信息 - 更直觉、更容易推广到大规模图 - 工业界主要采用空间域方法

💡 面试考点:谱域方法需要整个图的拉普拉斯矩阵特征分解,计算复杂度 \(O(n^3)\),无法处理大规模图;空间域方法可以用 mini-batch 采样,可扩展性强。这就是为什么 PinSage(空间域)能处理十亿节点,而纯谱方法不行。

19.2.3 矩阵形式推导

整个图上的 GCN 传播可以写成矩阵形式:

\[H^{(l+1)} = \sigma(\tilde{A} H^{(l)} W^{(l)})\]

其中归一化邻接矩阵:

\[\tilde{A} = D^{-1/2} A D^{-1/2}\]

推导过程:

  1. 原始邻接矩阵 \(A\)\(A_{ij} = 1\) 如果节点 \(i\)\(j\) 之间有边
  2. 加自环\(\hat{A} = A + I\)(每个节点保留自身信息)
  3. 度矩阵\(\hat{D}_{ii} = \sum_j \hat{A}_{ij}\)
  4. 对称归一化\(\tilde{A} = \hat{D}^{-1/2} \hat{A} \hat{D}^{-1/2}\)

对称归一化的物理含义:

\[\tilde{A}_{ij} = \frac{1}{\sqrt{\hat{d}_i} \cdot \sqrt{\hat{d}_j}}\]

度大的节点(热门物品)在传播时被适当"抑制",避免热门物品主导所有用户的表征。

19.2.4 PyTorch 实现:简单 GCN 层

Python
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from scipy.sparse import coo_matrix

class GCNLayer(nn.Module):  # 继承nn.Module定义网络层
    """
    简单的图卷积层实现(纯 PyTorch,不依赖 PyG)

    实现公式:H^{(l+1)} = σ(Ã H^{(l)} W^{(l)})
    其中 Ã = D^{-1/2} A D^{-1/2}(含自环)
    """
    def __init__(self, in_features: int, out_features: int, bias: bool = True):
        super().__init__()  # super()调用父类方法
        self.weight = nn.Parameter(torch.FloatTensor(in_features, out_features))
        if bias:
            self.bias = nn.Parameter(torch.FloatTensor(out_features))
        else:
            self.bias = None
        self.reset_parameters()

    def reset_parameters(self):
        """Xavier 初始化"""
        nn.init.xavier_uniform_(self.weight)
        if self.bias is not None:
            nn.init.zeros_(self.bias)

    def forward(self, x: torch.Tensor, adj_norm: torch.Tensor) -> torch.Tensor:
        """
        前向传播
        Args:
            x: 节点特征矩阵 [num_nodes, in_features]
            adj_norm: 归一化邻接矩阵 Ã [num_nodes, num_nodes](稀疏张量)
        Returns:
            输出特征矩阵 [num_nodes, out_features]
        """
        # 特征变换:H W
        support = torch.mm(x, self.weight)
        # 邻域聚合:Ã (H W)
        output = torch.spmm(adj_norm, support)
        if self.bias is not None:
            output += self.bias
        return output

class SimpleGCN(nn.Module):
    """
    两层 GCN 模型
    用于节点分类或生成节点 Embedding
    """
    def __init__(self, n_features: int, n_hidden: int, n_classes: int, dropout: float = 0.5):
        super().__init__()
        self.gc1 = GCNLayer(n_features, n_hidden)
        self.gc2 = GCNLayer(n_hidden, n_classes)
        self.dropout = dropout

    def forward(self, x: torch.Tensor, adj_norm: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: 节点特征 [num_nodes, n_features]
            adj_norm: 归一化邻接矩阵
        """
        x = F.relu(self.gc1(x, adj_norm))  # F.xxx PyTorch函数式API
        x = F.dropout(x, self.dropout, training=self.training)
        x = self.gc2(x, adj_norm)
        return x

def build_normalized_adj(edge_index: np.ndarray, num_nodes: int) -> torch.Tensor:
    """
    从边列表构建归一化邻接矩阵(含自环)

    Args:
        edge_index: 边列表 [2, num_edges],每列是 (src, dst)
        num_nodes: 节点总数
    Returns:
        归一化后的稀疏邻接矩阵 Ã = D^{-1/2} (A + I) D^{-1/2}
    """
    # 构建邻接矩阵(加自环)
    row = np.concatenate([edge_index[0], edge_index[1], np.arange(num_nodes)])
    col = np.concatenate([edge_index[1], edge_index[0], np.arange(num_nodes)])
    data = np.ones(len(row), dtype=np.float32)
    adj = coo_matrix((data, (row, col)), shape=(num_nodes, num_nodes))

    # 计算度矩阵 D^{-1/2}
    degree = np.array(adj.sum(axis=1)).flatten()  # np.array创建NumPy数组
    d_inv_sqrt = np.power(degree, -0.5)
    d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.0

    # 对称归一化:D^{-1/2} A D^{-1/2}
    row_norm = d_inv_sqrt[adj.row]
    col_norm = d_inv_sqrt[adj.col]
    adj.data = adj.data * row_norm * col_norm

    # 转为 PyTorch 稀疏张量
    indices = torch.LongTensor(np.vstack([adj.row, adj.col]))
    values = torch.FloatTensor(adj.data)
    shape = torch.Size(adj.shape)
    return torch.sparse_coo_tensor(indices, values, shape)

# ---- 使用示例 ----
if __name__ == "__main__":
    # 构造一个简单的 5 节点图
    edge_index = np.array([[0, 0, 1, 2, 3],
                           [1, 2, 2, 3, 4]])  # 5 条边
    num_nodes = 5

    # 构建归一化邻接矩阵
    adj_norm = build_normalized_adj(edge_index, num_nodes)

    # 随机节点特征
    x = torch.randn(num_nodes, 16)

    # 创建并运行 GCN
    model = SimpleGCN(n_features=16, n_hidden=32, n_classes=8)
    output = model(x, adj_norm)
    print(f"输入形状: {x.shape}")        # [5, 16]
    print(f"输出形状: {output.shape}")    # [5, 8]
    print(f"输出: {output}")

19.3 LightGCN

19.3.1 从 NGCF 到 LightGCN:为什么简化

NGCF(Neural Graph Collaborative Filtering, 2019) 将 GCN 引入协同过滤,在每层传播时使用:

\[e_u^{(l+1)} = \sigma\left(W_1^{(l)} e_u^{(l)} + \sum_{i \in \mathcal{N}_u} \frac{1}{\sqrt{|\mathcal{N}_u||\mathcal{N}_i|}} \left(W_1^{(l)} e_i^{(l)} + W_2^{(l)}(e_i^{(l)} \odot e_u^{(l)})\right)\right)\]

LightGCN(He et al., SIGIR 2020) 指出 NGCF 中的特征变换 \(W\)非线性激活 \(\sigma\) 对推荐任务反而有害:

Text Only
NGCF vs LightGCN 对比:

NGCF 每层操作:
  1. 特征变换(W·e)      ← LightGCN 移除
  2. 非线性激活(σ(·))    ← LightGCN 移除
  3. 邻域聚合              ← LightGCN 保留

LightGCN 每层操作:
  仅保留邻域聚合(最简形式的图卷积)

为什么移除是合理的?

  1. 推荐场景没有节点特征:用户和物品的初始表征只有 Embedding ID,没有真正的"特征"需要变换
  2. 非线性激活引入信息瓶颈:在没有丰富特征的场景下,非线性反而导致优化困难
  3. 实验验证:移除后效果不降反升,训练也更快

19.3.2 LightGCN 传播规则

单层传播

\[e_u^{(l+1)} = \sum_{i \in \mathcal{N}_u} \frac{1}{\sqrt{|\mathcal{N}_u|} \cdot \sqrt{|\mathcal{N}_i|}} e_i^{(l)}\]
\[e_i^{(l+1)} = \sum_{u \in \mathcal{N}_i} \frac{1}{\sqrt{|\mathcal{N}_i|} \cdot \sqrt{|\mathcal{N}_u|}} e_u^{(l)}\]

矩阵形式:

\[E^{(l+1)} = \tilde{A} E^{(l)}\]

其中 \(\tilde{A}\) 是用户-物品二部图的归一化邻接矩阵(不含自环):

\[\tilde{A} = D^{-1/2} A D^{-1/2}, \quad A = \begin{pmatrix} 0 & R \\ R^T & 0 \end{pmatrix}\]

19.3.3 层组合(Layer Combination)

LightGCN 的另一个关键设计是层组合——将各层的 Embedding 加权求和作为最终表征:

\[e_u = \sum_{k=0}^{K} \alpha_k \cdot e_u^{(k)}, \quad e_i = \sum_{k=0}^{K} \alpha_k \cdot e_i^{(k)}\]
  • \(e_u^{(0)}\) 是用户的初始 Embedding(第 0 层)
  • \(e_u^{(k)}\) 是经过 \(k\) 层传播后的表征
  • \(\alpha_k\) 是每层的权重,论文中默认 \(\alpha_k = \frac{1}{K+1}\)(均匀权重)

层组合的意义: - 第 0 层:初始 Embedding,捕获用户/物品自身信息 - 第 1 层:直接邻居信息(1 阶协同信号) - 第 2 层:2 跳邻居信息 - 第 K 层:K 阶协同信号 - 加权组合 = 多尺度信息融合

19.3.4 BPR Loss

LightGCN 使用 BPR(Bayesian Personalized Ranking)Loss 进行训练:

\[\mathcal{L}_{BPR} = -\sum_{(u,i,j) \in \mathcal{O}} \ln \sigma(\hat{y}_{ui} - \hat{y}_{uj}) + \lambda \|\Theta\|^2\]

其中: - \((u, i, j)\):用户 \(u\)、正样本 \(i\)(交互过)、负样本 \(j\)(未交互) - \(\hat{y}_{ui} = e_u^T e_i\):预测分数 - \(\sigma\):sigmoid 函数 - \(\lambda \|\Theta\|^2\):L2 正则化(只对初始 Embedding \(e^{(0)}\) 进行正则化)

19.3.5 完整 PyTorch 实现

Python
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from scipy.sparse import coo_matrix, eye as sp_eye
from collections import defaultdict
import random
import time

class LightGCN(nn.Module):
    """
    LightGCN 推荐模型(纯 PyTorch 实现,不依赖 PyG)

    论文:LightGCN: Simplifying and Powering Graph Convolution Network
          for Recommendation (SIGIR 2020)

    核心特点:
    1. 移除了 GCN 中的特征变换和非线性激活
    2. 使用层组合(Layer Combination)融合多层信息
    3. 仅需学习初始 Embedding
    """
    def __init__(self, num_users: int, num_items: int, embedding_dim: int = 64,
                 n_layers: int = 3, reg_weight: float = 1e-4):
        super().__init__()
        self.num_users = num_users
        self.num_items = num_items
        self.n_layers = n_layers
        self.reg_weight = reg_weight

        # 初始 Embedding(唯一需要学习的参数)
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.item_embedding = nn.Embedding(num_items, embedding_dim)

        # Xavier 初始化
        nn.init.xavier_uniform_(self.user_embedding.weight)
        nn.init.xavier_uniform_(self.item_embedding.weight)

        # 归一化邻接矩阵(在 set_graph 中设置)
        self.adj_norm = None

    def set_graph(self, user_item_pairs: np.ndarray):
        """
        构建并设置归一化邻接矩阵

        Args:
            user_item_pairs: [N, 2] 数组,每行是 (user_id, item_id)
        """
        n_nodes = self.num_users + self.num_items

        # 用户-物品二部图邻接矩阵
        # 用户节点编号: 0 ~ num_users-1
        # 物品节点编号: num_users ~ num_users+num_items-1
        users = user_item_pairs[:, 0]
        items = user_item_pairs[:, 1] + self.num_users  # 物品编号偏移

        # 构建对称邻接矩阵 (无自环)
        row = np.concatenate([users, items])
        col = np.concatenate([items, users])
        data = np.ones(len(row), dtype=np.float32)
        adj = coo_matrix((data, (row, col)), shape=(n_nodes, n_nodes))

        # D^{-1/2} A D^{-1/2} 归一化
        degree = np.array(adj.sum(axis=1)).flatten()
        d_inv_sqrt = np.power(degree, -0.5)
        d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.0
        adj.data = adj.data * d_inv_sqrt[adj.row] * d_inv_sqrt[adj.col]

        # 转为 PyTorch 稀疏张量
        indices = torch.LongTensor(np.vstack([adj.row, adj.col]))
        values = torch.FloatTensor(adj.data)
        self.adj_norm = torch.sparse_coo_tensor(indices, values,
                                                 torch.Size([n_nodes, n_nodes]))

    def compute_all_embeddings(self) -> tuple:
        """
        LightGCN 的核心:多层图卷积 + 层组合

        Returns:
            (user_embeddings, item_embeddings) 最终表征
        """
        # 拼接用户和物品的初始 Embedding
        all_emb = torch.cat([self.user_embedding.weight,  # torch.cat沿已有维度拼接张量
                             self.item_embedding.weight], dim=0)  # [n_nodes, dim]

        emb_list = [all_emb]  # 保存每层的 Embedding

        # 多层图卷积(无特征变换,无激活)
        for layer in range(self.n_layers):
            all_emb = torch.spmm(self.adj_norm, all_emb)  # Ã · E^{(l)}
            emb_list.append(all_emb)

        # 层组合:各层均匀加权
        all_emb = torch.stack(emb_list, dim=0).mean(dim=0)  # [n_nodes, dim]  # torch.stack沿新维度拼接张量

        user_emb = all_emb[:self.num_users]
        item_emb = all_emb[self.num_users:]
        return user_emb, item_emb

    def bpr_loss(self, users: torch.Tensor, pos_items: torch.Tensor,
                 neg_items: torch.Tensor) -> torch.Tensor:
        """
        BPR 损失函数

        Args:
            users: 用户 ID [batch_size]
            pos_items: 正样本物品 ID [batch_size]
            neg_items: 负样本物品 ID [batch_size]
        Returns:
            BPR loss + L2 正则化
        """
        user_emb, item_emb = self.compute_all_embeddings()

        u_emb = user_emb[users]           # [batch, dim]
        pos_emb = item_emb[pos_items]      # [batch, dim]
        neg_emb = item_emb[neg_items]      # [batch, dim]

        # BPR Loss
        pos_scores = (u_emb * pos_emb).sum(dim=1)  # [batch]
        neg_scores = (u_emb * neg_emb).sum(dim=1)   # [batch]
        bpr_loss = -F.logsigmoid(pos_scores - neg_scores).mean()

        # L2 正则化(只对初始 Embedding 正则化)
        u_emb_0 = self.user_embedding(users)
        pos_emb_0 = self.item_embedding(pos_items)
        neg_emb_0 = self.item_embedding(neg_items)
        reg_loss = self.reg_weight * (
            u_emb_0.norm(2).pow(2) +
            pos_emb_0.norm(2).pow(2) +
            neg_emb_0.norm(2).pow(2)
        ) / users.shape[0]

        return bpr_loss + reg_loss

    def predict(self, users: torch.Tensor) -> torch.Tensor:
        """
        为指定用户生成所有物品的预测分数

        Args:
            users: 用户 ID [batch_size]
        Returns:
            预测分数矩阵 [batch_size, num_items]
        """
        user_emb, item_emb = self.compute_all_embeddings()
        u_emb = user_emb[users]  # [batch, dim]
        scores = torch.mm(u_emb, item_emb.T)  # [batch, num_items]
        return scores

def sample_negative(user_item_dict: dict, num_items: int, user: int) -> int:
    """随机负采样:采一个用户未交互过的物品"""
    while True:
        neg = random.randint(0, num_items - 1)
        if neg not in user_item_dict[user]:
            return neg

def train_lightgcn(model: LightGCN, train_pairs: np.ndarray,
                   n_epochs: int = 50, batch_size: int = 1024, lr: float = 1e-3):
    """
    LightGCN 训练循环

    Args:
        model: LightGCN 模型
        train_pairs: [N, 2] 训练交互对 (user, item)
        n_epochs: 训练轮数
        batch_size: 批大小
        lr: 学习率
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # 构建用户交互字典(用于负采样)
    user_item_dict = defaultdict(set)  # defaultdict访问不存在的键时返回默认值
    for u, i in train_pairs:
        user_item_dict[u].add(i)

    model.train()  # train()训练模式
    for epoch in range(n_epochs):
        # 随机打乱训练数据
        perm = np.random.permutation(len(train_pairs))
        total_loss = 0.0
        n_batches = 0

        for start in range(0, len(train_pairs), batch_size):
            end = min(start + batch_size, len(train_pairs))
            batch_idx = perm[start:end]
            batch_pairs = train_pairs[batch_idx]

            users = torch.LongTensor(batch_pairs[:, 0])
            pos_items = torch.LongTensor(batch_pairs[:, 1])
            neg_items = torch.LongTensor([
                sample_negative(user_item_dict, model.num_items, u)
                for u in batch_pairs[:, 0]
            ])

            loss = model.bpr_loss(users, pos_items, neg_items)

            optimizer.zero_grad()  # 清零梯度
            loss.backward()  # 反向传播计算梯度
            optimizer.step()  # 更新参数

            total_loss += loss.item()  # 将单元素张量转为Python数值
            n_batches += 1

        if (epoch + 1) % 10 == 0:
            avg_loss = total_loss / n_batches
            print(f"Epoch {epoch+1}/{n_epochs}, Loss: {avg_loss:.4f}")

# ---- 快速验证 ----
if __name__ == "__main__":
    # 模拟小规模数据
    num_users, num_items = 100, 200
    n_interactions = 1000

    train_pairs = np.column_stack([
        np.random.randint(0, num_users, n_interactions),
        np.random.randint(0, num_items, n_interactions)
    ])

    model = LightGCN(num_users, num_items, embedding_dim=64, n_layers=3)
    model.set_graph(train_pairs)

    print("开始训练 LightGCN...")
    train_lightgcn(model, train_pairs, n_epochs=50, batch_size=256)

    # 预测
    model.eval()
    with torch.no_grad():  # 禁用梯度计算,节省内存
        test_users = torch.LongTensor([0, 1, 2])
        scores = model.predict(test_users)
        print(f"\n预测分数矩阵形状: {scores.shape}")  # [3, 200]

        # 每个用户的 Top-5 推荐
        _, top_items = scores.topk(5, dim=1)
        for idx, uid in enumerate(test_users.tolist()):  # enumerate同时获取索引和元素
            print(f"用户 {uid} 的 Top-5 推荐: {top_items[idx].tolist()}")

💡 面试考点

Q:LightGCN 为什么比 NGCF 效果好? A:NGCF 在每层使用了特征变换 \(W\) 和非线性激活 \(\sigma\),但在推荐场景中用户/物品没有丰富的初始特征(只有 ID Embedding),特征变换反而引入了不必要的参数和优化困难。LightGCN 移除这些组件后:(1) 模型更简洁,过拟合风险更低;(2) 训练更稳定、更快;(3) 邻域聚合本身就足以学到高质量的协同信号。

Q:LightGCN 的 over-smoothing 问题如何理解? A:随着 GCN 层数增加,所有节点的表征趋于相同(过度平滑)。对于 LightGCN,层组合机制缓解了这一问题——即使高层表征趋于平坦,低层表征仍保留了局部信息,最终结果是多层的加权组合。实践中通常使用 2-3 层。


19.4 PinSage(工业级图推荐)

19.4.1 Pinterest 的图推荐系统

PinSage(Ying et al., KDD 2018) 是 Pinterest 设计的工业级图推荐系统,用于为30 亿节点、180 亿边的用户-Pin 图生成高质量的 Pin Embedding。

Text Only
Pinterest 的推荐场景:

  用户浏览 → Pin(图片) → Board(收藏夹)

  构成二部图:Pin ←→ Board
  - 一个 Board 包含多个 Pin
  - 一个 Pin 可以出现在多个 Board 中
  - 通过 Board 连接的 Pin 具有语义相似性

  目标:为每个 Pin 生成 Embedding → 用于相关推荐

19.4.2 PinSage 的核心创新

传统 GCN 需要对整个图进行全图卷积,在十亿节点级别完全不可行。PinSage 的解决方案:

1. 随机游走采样邻居

不使用全部邻居,而是通过随机游走选择最重要的邻居:

Text Only
全图卷积 vs 随机游走采样:

全图卷积(GCN):
  节点 v 的邻居 = N(v)(所有直接邻居)
  → 热门节点可能有百万邻居 → 内存爆炸

PinSage 随机游走采样:
  1. 从节点 v 出发,进行多次随机游走
  2. 统计被访问到的节点的频次
  3. 选择频次最高的 Top-T 个节点作为邻居
  → 邻居数量固定为 T → 内存可控
  → 被多次访问的节点更重要 → 信息更丰富

2. Importance Pooling

基于随机游走频次的加权聚合:

\[e_v^{(l+1)} = \text{ReLU}\left(W \cdot \text{MEAN}\left(\{c(u) \cdot e_u^{(l)} : u \in \mathcal{S}(v)\}\right)\right)\]

其中 \(c(u)\) 是节点 \(u\) 在随机游走中被访问的频次(归一化后),\(\mathcal{S}(v)\) 是采样后的邻居集合。

3. MapReduce 图采样策略

Text Only
Producer-Consumer 训练架构:

  CPU 进程(Producer):
    ├── 对每个 mini-batch 的节点,执行随机游走
    ├── 采样 K 层邻居(构建计算子图)
    └── 将子图放入队列

  GPU 进程(Consumer):
    ├── 从队列获取子图
    ├── 在子图上执行 GCN 前向和反向传播
    └── 更新模型参数

  → CPU 和 GPU 流水线并行,消除 IO 瓶颈

19.4.3 PinSage 工业部署经验

挑战 解决方案
30 亿节点无法全部存入内存 MapReduce 分布式聚合,每次仅处理子图
热门 Pin 邻居数量极大 随机游走采样固定 T 个邻居
训练效率低 CPU 采样 + GPU 计算流水线
负样本质量差 Hard Negative Mining:用上一轮 Embedding 检索相似但未交互的物品
增量更新 新增 Pin 只需在其子图上做局部推理,无需全图重训

19.4.4 PinSage 的简化 PyTorch 实现思路

Python
import torch
import torch.nn as nn
import random
from collections import Counter

class PinSageLayer(nn.Module):
    """
    PinSage 单层聚合(简化版本)

    核心:随机游走采样 + Importance Pooling
    """
    def __init__(self, in_dim: int, out_dim: int):
        super().__init__()
        self.linear = nn.Linear(in_dim, out_dim)
        self.relu = nn.ReLU()

    def random_walk_sample(self, graph: dict, node: int,
                           walk_length: int = 10, num_walks: int = 5,
                           top_k: int = 15) -> list:
        """
        随机游走采样邻居

        Args:
            graph: 邻接表 {node: [neighbors]}
            node: 起始节点
            walk_length: 游走长度
            num_walks: 游走次数
            top_k: 选取的邻居数量
        Returns:
            [(neighbor, visit_count), ...] 按频次排序的 Top-K 邻居
        """
        visit_count = Counter()  # Counter统计元素出现次数

        for _ in range(num_walks):
            current = node
            for _ in range(walk_length):
                neighbors = graph.get(current, [])
                if not neighbors:
                    break
                current = random.choice(neighbors)
                if current != node:
                    visit_count[current] += 1

        # 返回频次最高的 Top-K 个邻居
        return visit_count.most_common(top_k)

    def forward(self, node_embs: torch.Tensor, neighbor_info: list) -> torch.Tensor:
        """
        Importance Pooling 聚合

        Args:
            node_embs: 所有节点的 Embedding [num_nodes, dim]
            neighbor_info: 每个目标节点的邻居信息
                           [[(neighbor_id, count), ...], ...]
        Returns:
            更新后的节点 Embedding [num_target_nodes, out_dim]
        """
        outputs = []
        for neighbors_with_counts in neighbor_info:
            if not neighbors_with_counts:
                # 无邻居,使用零向量
                outputs.append(torch.zeros(node_embs.shape[1]))
                continue

            ids, counts = zip(*neighbors_with_counts)  # zip按位置配对
            ids = list(ids)
            # 归一化频次作为权重
            weights = torch.FloatTensor(counts)
            weights = weights / weights.sum()

            # 加权聚合邻居 Embedding
            neighbor_emb = node_embs[ids]  # [K, dim]
            pooled = (weights.unsqueeze(1) * neighbor_emb).sum(dim=0)  # [dim]  # unsqueeze增加一个维度
            outputs.append(pooled)

        stacked = torch.stack(outputs, dim=0)  # [batch, dim]
        return self.relu(self.linear(stacked))

# ---- 使用示例 ----
if __name__ == "__main__":
    num_nodes = 1000
    dim = 64

    # 模拟邻接表
    graph = {i: random.sample(range(num_nodes), min(20, num_nodes))
             for i in range(num_nodes)}

    # 初始 Embedding
    node_embs = torch.randn(num_nodes, dim)

    # 对 batch 中的节点进行随机游走采样
    layer = PinSageLayer(dim, dim)
    target_nodes = [0, 1, 2, 3, 4]

    neighbor_info = [
        layer.random_walk_sample(graph, node, walk_length=10,
                                  num_walks=5, top_k=15)
        for node in target_nodes
    ]

    updated = layer(node_embs, neighbor_info)
    print(f"更新后 Embedding 形状: {updated.shape}")  # [5, 64]

💡 面试考点

Q:PinSage 如何解决 GCN 的扩展性问题? A:三个关键技术:(1) 随机游走采样替代全邻居聚合,将邻居数量从百万级固定为 Top-T 个,内存可控;(2) mini-batch 训练,每次只构建子图而非全图计算;(3) CPU-GPU 流水线,CPU 负责图采样,GPU 负责模型计算,并行执行消除 IO 瓶颈。此外,Hard Negative Mining 提升了训练质量。

Q:随机游走采样 vs 均匀采样邻居,哪个更好?为什么? A:随机游走采样更好。均匀采样随机选择邻居,高重要性和低重要性的邻居被平等对待。随机游走则隐式地衡量了邻居的"可达性"——被多次访问的邻居与目标节点的关系更强,这相当于一种 importance sampling。


19.5 知识图谱增强推荐

19.5.1 知识图谱在推荐中的作用

知识图谱(Knowledge Graph, KG) 提供了物品的结构化属性信息:

Text Only
知识图谱示例(电影推荐):

  《肖申克的救赎》──导演──→ 弗兰克·德拉邦特
                   ──类型──→ 犯罪/剧情
                   ──演员──→ 蒂姆·罗宾斯
                   ──年份──→ 1994

  《绿里奇迹》    ──导演──→ 弗兰克·德拉邦特   ← 同一导演!
                   ──类型──→ 犯罪/剧情         ← 同一类型!
                   ──演员──→ 汤姆·汉克斯

  → 通过 KG,系统可以推理:
    喜欢《肖申克》 → 可能喜欢同导演/同类型的《绿里》
    这比纯协同过滤多了"为什么推荐"的解释能力

KG 在推荐中的三大价值

价值 说明
可解释性 "因为你看过同导演的 XX 电影"
冷启动缓解 新物品没有交互,但有 KG 属性
语义丰富 捕获物品之间的深层语义关系

19.5.2 KGAT:Knowledge-Graph Attention Network

KGAT(Wang et al., KDD 2019) 将用户-物品交互图和知识图谱统一为一个协作知识图(CKG),并使用注意力机制学习邻居的重要性。

CKG 构建

\[\text{CKG} = \{(h, r, t) | (h, r, t) \in \text{KG}\} \cup \{(u, \text{interact}, i) | (u, i) \in \text{交互}\}\]
Text Only
协作知识图(CKG):

  User_A ──interact──→ 电影_1 ──directed_by──→ 导演_X
    │                    │
  interact            genre_of
    │                    │
    ▼                    ▼
  电影_2 ──acted_by──→ 演员_Y ──acted_in──→ 电影_3

KGAT 的注意力聚合

\[e_v^{(l+1)} = \sigma\left(\sum_{(v, r, u) \in \mathcal{N}(v)} \alpha(v, r, u) \cdot W_r e_u^{(l)}\right)\]

其中注意力权重:

\[\alpha(v, r, u) = \frac{\exp\left(\text{LeakyReLU}(e_v^T W_r e_u)\right)}{\sum_{(v, r', u') \in \mathcal{N}(v)} \exp\left(\text{LeakyReLU}(e_v^T W_{r'} e_{u'})\right)}\]

关键点:不同关系类型 \(r\) 有不同的变换矩阵 \(W_r\)

19.5.3 简化 KGAT PyTorch 实现

Python
import torch
import torch.nn as nn
import torch.nn.functional as F

class KGATLayer(nn.Module):
    """
    KGAT 注意力聚合层(简化版)

    对每个节点,根据不同关系类型计算邻居的注意力权重,
    加权聚合邻居信息。
    """
    def __init__(self, emb_dim: int, n_relations: int, dropout: float = 0.1):
        super().__init__()
        self.emb_dim = emb_dim

        # 每种关系一个变换矩阵
        self.W_r = nn.Parameter(torch.FloatTensor(n_relations, emb_dim, emb_dim))
        nn.init.xavier_uniform_(self.W_r.view(-1, emb_dim))  # 重塑张量形状

        # 注意力计算
        self.leaky_relu = nn.LeakyReLU(0.2)
        self.dropout = nn.Dropout(dropout)

    def compute_attention(self, head_emb: torch.Tensor, tail_emb: torch.Tensor,
                          relation_ids: torch.Tensor) -> torch.Tensor:
        """
        计算注意力分数

        Args:
            head_emb: 头实体 Embedding [num_edges, dim]
            tail_emb: 尾实体 Embedding [num_edges, dim]
            relation_ids: 关系类型 ID [num_edges]
        Returns:
            注意力分数 [num_edges]
        """
        # 获取对应关系的变换矩阵
        W = self.W_r[relation_ids]  # [num_edges, dim, dim]

        # 变换尾实体 Embedding
        tail_transformed = torch.bmm(W, tail_emb.unsqueeze(2)).squeeze(2)  # [num_edges, dim]  # squeeze压缩维度

        # 计算注意力分数
        scores = (head_emb * tail_transformed).sum(dim=1)  # [num_edges]
        scores = self.leaky_relu(scores)
        return scores

    def forward(self, entity_emb: torch.Tensor, edge_index: torch.Tensor,
                relation_ids: torch.Tensor) -> torch.Tensor:
        """
        前向传播

        Args:
            entity_emb: 实体 Embedding [num_entities, dim]
            edge_index: 边索引 [2, num_edges],第 0 行是头实体,第 1 行是尾实体
            relation_ids: 关系类型 [num_edges]
        Returns:
            更新后的实体 Embedding [num_entities, dim]
        """
        heads, tails = edge_index[0], edge_index[1]
        head_emb = entity_emb[heads]  # [num_edges, dim]
        tail_emb = entity_emb[tails]  # [num_edges, dim]

        # 计算注意力分数
        attn_scores = self.compute_attention(head_emb, tail_emb, relation_ids)

        # 按头实体分组做 softmax(使用 scatter 实现)
        num_entities = entity_emb.shape[0]
        attn_weights = torch.zeros(heads.shape[0], device=entity_emb.device)

        # 简化的 softmax:对每个头节点的所有边做归一化
        max_scores = torch.zeros(num_entities, device=entity_emb.device)
        max_scores.scatter_reduce_(0, heads, attn_scores, reduce='amax', include_self=True)

        exp_scores = torch.exp(attn_scores - max_scores[heads])
        sum_exp = torch.zeros(num_entities, device=entity_emb.device)
        sum_exp.scatter_add_(0, heads, exp_scores)
        attn_weights = exp_scores / (sum_exp[heads] + 1e-8)
        attn_weights = self.dropout(attn_weights)

        # 获取对应关系的变换矩阵,变换尾实体
        W = self.W_r[relation_ids]
        tail_transformed = torch.bmm(W, tail_emb.unsqueeze(2)).squeeze(2)

        # 加权聚合
        weighted = attn_weights.unsqueeze(1) * tail_transformed  # [num_edges, dim]

        # 聚合到头实体
        output = torch.zeros(num_entities, self.emb_dim, device=entity_emb.device)
        output.scatter_add_(0, heads.unsqueeze(1).expand_as(weighted), weighted)

        return F.elu(output)

class KGAT(nn.Module):
    """
    简化的 KGAT 模型
    """
    def __init__(self, num_entities: int, num_relations: int,
                 emb_dim: int = 64, n_layers: int = 2):
        super().__init__()
        self.entity_embedding = nn.Embedding(num_entities, emb_dim)
        nn.init.xavier_uniform_(self.entity_embedding.weight)

        self.layers = nn.ModuleList([
            KGATLayer(emb_dim, num_relations) for _ in range(n_layers)
        ])

    def forward(self, edge_index: torch.Tensor,
                relation_ids: torch.Tensor) -> torch.Tensor:
        """
        Returns:
            最终实体 Embedding [num_entities, dim]
        """
        x = self.entity_embedding.weight
        emb_list = [x]

        for layer in self.layers:
            x = layer(x, edge_index, relation_ids)
            emb_list.append(x)

        # 层组合
        return torch.stack(emb_list, dim=0).mean(dim=0)

# ---- 使用示例 ----
if __name__ == "__main__":
    num_entities = 500   # 用户 + 物品 + KG 实体
    num_relations = 10   # 关系类型数
    num_edges = 2000

    # 随机生成知识图谱边
    edge_index = torch.stack([
        torch.randint(0, num_entities, (num_edges,)),
        torch.randint(0, num_entities, (num_edges,))
    ])
    relation_ids = torch.randint(0, num_relations, (num_edges,))

    model = KGAT(num_entities, num_relations, emb_dim=64, n_layers=2)
    entity_embs = model(edge_index, relation_ids)
    print(f"实体 Embedding 形状: {entity_embs.shape}")  # [500, 64]

💡 面试考点

Q:KG 推荐的优势和局限是什么? A:优势:(1) 提供可解释性——可以用路径解释推荐原因;(2) 缓解冷启动——新物品可通过 KG 属性与老物品关联;(3) 引入领域知识——让推荐更准确。局限:(1) KG 构建和维护成本高;(2) KG 质量直接影响推荐质量(噪声、不完整);(3) 计算复杂度较高。


19.6 图对比学习推荐

19.6.1 为什么需要图对比学习

传统 GNN 推荐存在的问题:

Text Only
GNN 推荐的监督信号问题:

  1. 数据稀疏:用户-物品交互极度稀疏(通常 < 1%)
     → 监督信号不足 → Embedding 质量差

  2. 长尾分布:少数热门物品占大部分交互
     → 冷门物品的 Embedding 得不到充分训练

  3. 噪声交互:用户点击不代表真正喜欢
     → 带噪训练信号

对比学习的思路:
  引入自监督信号 → 增强 Embedding 的表征能力 → 弥补监督信号不足

19.6.2 SGL:Self-supervised Graph Learning

SGL(Wu et al., SIGIR 2021) 将图数据增强与对比学习引入推荐:

三种图增强策略

  1. Node Dropout (ND):随机丢弃节点
  2. Edge Dropout (ED):随机丢弃边
  3. Random Walk (RW):随机游走生成子图
Text Only
SGL 训练框架:

  原始图 G
    ├── 增强视图 1(G')──→ GCN ──→ Embedding z'
    ├── 增强视图 2(G'')──→ GCN ──→ Embedding z''
    └── 原始图 ──→ GCN ──→ Embedding z(用于推荐任务)

  对比学习目标:
    同一节点在 G' 和 G'' 中的表征应相似(正样本对)
    不同节点的表征应不同(负样本对)

  总损失 = BPR Loss + λ · InfoNCE Loss

InfoNCE Loss

\[\mathcal{L}_{cl}^{user} = \sum_{u \in \mathcal{U}} -\log \frac{\exp(z_u' \cdot z_u'' / \tau)}{\sum_{v \in \mathcal{U}} \exp(z_u' \cdot z_v'' / \tau)}\]

其中 \(\tau\) 是温度参数。

19.6.3 SimGCL:更简单的图对比学习

SimGCL(Yu et al., SIGIR 2022) 提出一个关键发现:不需要图结构增强,只需在 Embedding 上加噪声即可

Text Only
SGL vs SimGCL:

SGL:
  原始图 → 图增强(丢边/丢节点) → 两个增强图 → GCN → 对比学习
  问题:图增强改变了图结构,可能丢失重要信息

SimGCL:
  原始图 → GCN → Embedding → 加随机噪声 → 两个扰动表征 → 对比学习
  优势:不修改图结构,计算更高效,效果更好

SimGCL 的噪声扰动

\[\tilde{e}_u = e_u + \Delta_u, \quad \Delta_u = \epsilon \cdot \frac{\Delta}{\|\Delta\|}, \quad \Delta \sim \text{Uniform}\]

\(\epsilon\) 控制扰动幅度。

19.6.4 代码示例:图对比学习训练框架

Python
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class GraphContrastiveLearning(nn.Module):
    """
    图对比学习推荐框架

    支持两种模式:
    1. SGL 风格:通过边丢弃进行图增强
    2. SimGCL 风格:直接在 Embedding 上加噪声
    """
    def __init__(self, num_users: int, num_items: int, emb_dim: int = 64,
                 n_layers: int = 3, temperature: float = 0.2,
                 cl_weight: float = 0.1, noise_eps: float = 0.1,
                 mode: str = 'simgcl'):
        """
        Args:
            mode: 'sgl' 或 'simgcl'
        """
        super().__init__()
        self.num_users = num_users
        self.num_items = num_items
        self.n_layers = n_layers
        self.temperature = temperature
        self.cl_weight = cl_weight
        self.noise_eps = noise_eps
        self.mode = mode

        self.user_embedding = nn.Embedding(num_users, emb_dim)
        self.item_embedding = nn.Embedding(num_items, emb_dim)
        nn.init.xavier_uniform_(self.user_embedding.weight)
        nn.init.xavier_uniform_(self.item_embedding.weight)

        self.adj_norm = None  # 原始归一化邻接矩阵

    def set_graph(self, adj_norm: torch.Tensor):
        """设置图的邻接矩阵"""
        self.adj_norm = adj_norm

    def lightgcn_propagate(self, adj: torch.Tensor,
                           add_noise: bool = False) -> tuple:
        """
        LightGCN 风格的图传播

        Args:
            adj: 邻接矩阵
            add_noise: 是否在每层传播后添加噪声(SimGCL)
        """
        all_emb = torch.cat([self.user_embedding.weight,
                             self.item_embedding.weight], dim=0)
        emb_list = [all_emb]

        for _ in range(self.n_layers):
            all_emb = torch.spmm(adj, all_emb)
            if add_noise:
                # SimGCL:添加均匀分布的归一化噪声
                noise = torch.rand_like(all_emb)
                noise = F.normalize(noise, dim=1) * self.noise_eps
                all_emb = all_emb + noise
            emb_list.append(all_emb)

        final_emb = torch.stack(emb_list, dim=0).mean(dim=0)
        user_emb = final_emb[:self.num_users]
        item_emb = final_emb[self.num_users:]
        return user_emb, item_emb

    def info_nce_loss(self, emb1: torch.Tensor, emb2: torch.Tensor,
                      indices: torch.Tensor) -> torch.Tensor:
        """
        InfoNCE 对比损失

        Args:
            emb1: 视图 1 的 Embedding [N, dim]
            emb2: 视图 2 的 Embedding [N, dim]
            indices: 目标节点索引 [batch_size]
        Returns:
            InfoNCE loss
        """
        z1 = F.normalize(emb1[indices], dim=1)  # [batch, dim]
        z2 = F.normalize(emb2[indices], dim=1)  # [batch, dim]

        # 正样本相似度
        pos_sim = (z1 * z2).sum(dim=1) / self.temperature  # [batch]

        # 所有节点作为负样本
        all_z2 = F.normalize(emb2, dim=1)  # [N, dim]
        neg_sim = torch.mm(z1, all_z2.T) / self.temperature  # [batch, N]

        # InfoNCE
        loss = -pos_sim + torch.logsumexp(neg_sim, dim=1)
        return loss.mean()

    def forward(self, users: torch.Tensor, pos_items: torch.Tensor,
                neg_items: torch.Tensor) -> torch.Tensor:
        """
        计算总损失 = BPR Loss + λ · CL Loss
        """
        # ---- 主任务:BPR Loss ----
        user_emb, item_emb = self.lightgcn_propagate(self.adj_norm, add_noise=False)

        u = user_emb[users]
        p = item_emb[pos_items]
        n = item_emb[neg_items]

        pos_scores = (u * p).sum(dim=1)
        neg_scores = (u * n).sum(dim=1)
        bpr_loss = -F.logsigmoid(pos_scores - neg_scores).mean()

        # ---- 对比学习:InfoNCE Loss ----
        if self.mode == 'simgcl':
            # SimGCL:两次带噪声的传播生成两个视图
            user_emb_1, item_emb_1 = self.lightgcn_propagate(
                self.adj_norm, add_noise=True)
            user_emb_2, item_emb_2 = self.lightgcn_propagate(
                self.adj_norm, add_noise=True)
        else:
            # SGL:需要两个增强图(这里简化为两次独立的 edge dropout)
            user_emb_1, item_emb_1 = self.lightgcn_propagate(
                self.adj_norm, add_noise=False)
            user_emb_2, item_emb_2 = self.lightgcn_propagate(
                self.adj_norm, add_noise=False)

        cl_loss_user = self.info_nce_loss(user_emb_1, user_emb_2, users)
        cl_loss_item = self.info_nce_loss(item_emb_1, item_emb_2, pos_items)
        cl_loss = cl_loss_user + cl_loss_item

        return bpr_loss + self.cl_weight * cl_loss

# ---- 使用示例 ----
if __name__ == "__main__":
    from scipy.sparse import coo_matrix

    num_users, num_items = 100, 200
    emb_dim = 64
    n_nodes = num_users + num_items

    # 构造随机交互图
    n_edges = 500
    users_arr = np.random.randint(0, num_users, n_edges)
    items_arr = np.random.randint(0, num_items, n_edges) + num_users

    row = np.concatenate([users_arr, items_arr])
    col = np.concatenate([items_arr, users_arr])
    data = np.ones(len(row), dtype=np.float32)
    adj = coo_matrix((data, (row, col)), shape=(n_nodes, n_nodes))

    degree = np.array(adj.sum(1)).flatten()
    d_inv_sqrt = np.power(degree + 1e-8, -0.5)
    adj.data = adj.data * d_inv_sqrt[adj.row] * d_inv_sqrt[adj.col]

    indices = torch.LongTensor(np.vstack([adj.row, adj.col]))
    values = torch.FloatTensor(adj.data)
    adj_norm = torch.sparse_coo_tensor(indices, values, torch.Size([n_nodes, n_nodes]))

    # 创建模型并训练
    model = GraphContrastiveLearning(num_users, num_items, emb_dim=emb_dim, mode='simgcl')
    model.set_graph(adj_norm)

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(20):
        users = torch.randint(0, num_users, (128,))
        pos_items = torch.randint(0, num_items, (128,))
        neg_items = torch.randint(0, num_items, (128,))

        loss = model(users, pos_items, neg_items)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")

💡 面试考点

Q:SGL 和 SimGCL 的核心区别是什么? A:SGL 通过图结构增强(丢边/丢节点/随机游走)生成两个增强视图,然后做对比学习。SimGCL 发现图结构增强不是必须的,直接在 GCN 传播过程中对 Embedding 加随机噪声(均匀扰动)即可,而且效果更好、训练更快。SimGCL 的核心见解是:对比学习的关键在于表征空间的均匀性(uniformity),而不是图增强本身。


19.7 最新进展与前沿

19.7.1 GNN + LLM 推荐融合

大语言模型(LLM)和图神经网络的结合是当前推荐系统研究的热点方向:

Text Only
融合范式:

1. LLM 生成节点特征 → GNN 学习图结构
   ┌──────────────────────────────────────────┐
   │  用 LLM 为物品生成文本 Embedding:        │
   │  e_item = LLM("一部由诺兰执导的科幻电影") │
   │  然后将 e_item 作为 GNN 的初始节点特征     │
   └──────────────────────────────────────────┘

2. GNN 提供结构信息 → 增强 LLM 的推荐能力
   ┌──────────────────────────────────────────┐
   │  用 GNN 生成用户/物品的结构化 Embedding    │
   │  将结构 Embedding 作为 Prompt 注入 LLM     │
   │  LLM 结合结构信息进行推荐推理              │
   └──────────────────────────────────────────┘

3. 联合训练
   ┌──────────────────────────────────────────┐
   │  LLM 和 GNN 端到端联合优化               │
   │  LLM 负责语义理解,GNN 负责协同过滤       │
   │  对齐两个空间的表征                       │
   └──────────────────────────────────────────┘

代表性工作

工作 方法 要点
RLMRec (2024) LLM 生成用户/物品画像文本 → 文本 Embedding 与 GNN Embedding 对齐 利用 LLM 世界知识增强推荐
LLMRec (2023) LLM 增强图推荐的三种策略 数据增强+特征增强+知识注入
GraphGPT (2024) 对齐图编码器和 LLM 通用图理解能力

19.7.2 异质信息网络推荐

异质信息网络(Heterogeneous Information Network, HIN) 包含多种类型的节点和边

Text Only
异质图示例(电商推荐):

  节点类型:用户、商品、品牌、类目、商家
  边类型:购买、浏览、属于_品牌、属于_类目、上架_商家

  用户A ──购买──→ 商品X ──属于_品牌──→ 品牌P
                属于_类目
                  类目C ←──属于_类目── 商品Y

  元路径(Meta-path):
    用户-购买-商品-属于品牌-品牌-属于品牌-商品
    → 这条路径连接了同品牌的商品

核心技术: - 元路径注意力(meta-path attention):学习不同类型路径的重要性 - 语义级聚合:先在每种元路径内聚合,再跨元路径聚合 - 代表工作:HAN(Heterogeneous Graph Attention Network)、HERec

19.7.3 超图推荐

超图(Hypergraph) 中的边(超边)可以连接两个以上的节点,比普通图更具表达力:

Text Only
普通图 vs 超图:

普通图:
  一条边连接 2 个节点
  用户 A → 物品 X

超图:
  一条超边连接任意多个节点
  超边 e = {用户A, 物品X, 物品Y, 物品Z}
  → 表示"用户A在一次购物session中购买了X,Y,Z"
  → 捕获群组级别的交互关系

应用场景: - 会话推荐:一个 session 中的物品构成超边 - 社交推荐:一个群组的成员构成超边 - 跨域推荐:共享实体的多个域构成超边

代表工作:DHCF(Dual Channel Hypergraph Collaborative Filtering)、HyperRec

19.7.4 图推荐的公平性与去偏

Text Only
图推荐中的偏差问题:

1. 流行度偏差(Popularity Bias):
   度大的节点(热门物品)在消息传递中被"放大"
   → 推荐结果偏向热门物品

2. 度偏差(Degree Bias):
   高度节点和低度节点的 Embedding 质量差异大
   → 冷门物品/新用户表征不准确

3. 反馈循环(Feedback Loop):
   推荐 → 更多曝光 → 更多交互 → 更高度数 → 更多推荐
   → 马太效应

解决方案: - 因果去偏:使用因果推断区分直接效应和混淆效应(参考第18章因果推荐部分) - 反事实数据增强:生成反事实图进行训练 - 公平性约束:在损失函数中加入公平性正则项 - 度感知归一化:针对不同度数的节点使用不同的归一化策略


19.8 实战:基于 LightGCN 的推荐系统

19.8.1 项目概述

Text Only
实战目标:
  在 MovieLens-100K 数据集上实现完整的 LightGCN 推荐系统

包含:
  1. 数据加载与预处理
  2. 用户-物品二部图构建
  3. LightGCN 模型训练
  4. 评估指标计算(Recall@K, NDCG@K)
  5. 结果分析

19.8.2 完整端到端代码

Python
"""
LightGCN 推荐系统完整实战
数据集:MovieLens-100K

运行方式:python lightgcn_movielens.py
依赖:torch, numpy, scipy, pandas
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from scipy.sparse import coo_matrix
from collections import defaultdict
import random
import time
import os
import urllib.request
import zipfile

# ============================================================
# 第 1 部分:数据加载与预处理
# ============================================================

def download_movielens_100k(data_dir: str = './ml-100k'):
    """下载 MovieLens-100K 数据集"""
    if os.path.exists(os.path.join(data_dir, 'u.data')):
        print("数据集已存在,跳过下载。")
        return

    url = 'https://files.grouplens.org/datasets/movielens/ml-100k.zip'
    zip_path = 'ml-100k.zip'

    print("正在下载 MovieLens-100K...")
    urllib.request.urlretrieve(url, zip_path)

    with zipfile.ZipFile(zip_path, 'r') as z:
        z.extractall('.')
    os.remove(zip_path)
    print("下载完成。")

def load_and_split_data(data_dir: str = './ml-100k', test_ratio: float = 0.2):
    """
    加载并划分数据集

    按时间排序后,每个用户最后 20% 的交互作为测试集

    Returns:
        train_pairs: [N_train, 2] (user_id, item_id)
        test_dict: {user_id: set(item_ids)} 测试集
        num_users, num_items
    """
    # 读取数据:user_id  item_id  rating  timestamp
    data = []
    filepath = os.path.join(data_dir, 'u.data')
    with open(filepath, 'r') as f:  # with自动管理文件关闭
        for line in f:
            parts = line.strip().split('\t')
            user, item, rating, ts = int(parts[0]), int(parts[1]), \
                                      float(parts[2]), int(parts[3])
            if rating >= 3.0:  # 只保留正向交互(评分 ≥ 3)
                data.append((user, item, ts))

    # 重新编号:从 0 开始
    users = sorted(set(d[0] for d in data))
    items = sorted(set(d[1] for d in data))
    user_map = {u: i for i, u in enumerate(users)}
    item_map = {it: i for i, it in enumerate(items)}

    num_users = len(users)
    num_items = len(items)

    # 按用户分组并按时间排序
    user_interactions = defaultdict(list)
    for user, item, ts in data:
        user_interactions[user_map[user]].append((item_map[item], ts))

    train_pairs = []
    test_dict = defaultdict(set)

    for user, interactions in user_interactions.items():
        # 按时间排序
        interactions.sort(key=lambda x: x[1])  # lambda匿名函数
        items_sorted = [it for it, _ in interactions]

        # 划分训练集和测试集
        n_test = max(1, int(len(items_sorted) * test_ratio))
        train_items = items_sorted[:-n_test]
        test_items = items_sorted[-n_test:]

        for item in train_items:
            train_pairs.append([user, item])
        for item in test_items:
            test_dict[user].add(item)

    train_pairs = np.array(train_pairs)

    print(f"用户数: {num_users}, 物品数: {num_items}")
    print(f"训练交互数: {len(train_pairs)}, 测试用户数: {len(test_dict)}")
    print(f"稀疏度: {1 - len(train_pairs) / (num_users * num_items):.4f}")

    return train_pairs, dict(test_dict), num_users, num_items

# ============================================================
# 第 2 部分:图构建
# ============================================================

def build_adj_matrix(train_pairs: np.ndarray, num_users: int,
                     num_items: int) -> torch.Tensor:
    """
    构建归一化邻接矩阵(用户-物品二部图)

    邻接矩阵结构:
    A = | 0   R |
        | R^T 0 |
    其中 R 是用户-物品交互矩阵

    归一化:Ã = D^{-1/2} A D^{-1/2}
    """
    n_nodes = num_users + num_items

    users = train_pairs[:, 0]
    items = train_pairs[:, 1] + num_users

    # 对称邻接矩阵
    row = np.concatenate([users, items])
    col = np.concatenate([items, users])
    data = np.ones(len(row), dtype=np.float32)

    adj = coo_matrix((data, (row, col)), shape=(n_nodes, n_nodes))

    # 归一化
    degree = np.array(adj.sum(axis=1)).flatten()
    d_inv_sqrt = np.power(degree, -0.5)
    d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.0
    adj.data = adj.data * d_inv_sqrt[adj.row] * d_inv_sqrt[adj.col]

    # 转 PyTorch 稀疏张量
    indices = torch.LongTensor(np.vstack([adj.row, adj.col]))
    values = torch.FloatTensor(adj.data)
    return torch.sparse_coo_tensor(indices, values, torch.Size([n_nodes, n_nodes]))

# ============================================================
# 第 3 部分:LightGCN 模型
# ============================================================

class LightGCNFull(nn.Module):
    """带完整评估功能的 LightGCN"""

    def __init__(self, num_users: int, num_items: int,
                 emb_dim: int = 64, n_layers: int = 3, reg: float = 1e-4):
        super().__init__()
        self.num_users = num_users
        self.num_items = num_items
        self.n_layers = n_layers
        self.reg = reg

        self.user_emb = nn.Embedding(num_users, emb_dim)
        self.item_emb = nn.Embedding(num_items, emb_dim)
        nn.init.xavier_uniform_(self.user_emb.weight)
        nn.init.xavier_uniform_(self.item_emb.weight)

    def propagate(self, adj: torch.Tensor) -> tuple:
        """LightGCN 传播 + 层组合"""
        all_emb = torch.cat([self.user_emb.weight, self.item_emb.weight], dim=0)
        emb_list = [all_emb]

        for _ in range(self.n_layers):
            all_emb = torch.spmm(adj, all_emb)
            emb_list.append(all_emb)

        final = torch.stack(emb_list, dim=0).mean(dim=0)
        return final[:self.num_users], final[self.num_users:]

    def bpr_loss(self, adj: torch.Tensor, users: torch.Tensor,
                 pos: torch.Tensor, neg: torch.Tensor) -> torch.Tensor:
        user_e, item_e = self.propagate(adj)

        u = user_e[users]
        p = item_e[pos]
        n = item_e[neg]

        pos_s = (u * p).sum(dim=1)
        neg_s = (u * n).sum(dim=1)
        loss = -F.logsigmoid(pos_s - neg_s).mean()

        # L2 正则(仅初始 Embedding)
        reg = self.reg * (
            self.user_emb(users).norm(2).pow(2) +
            self.item_emb(pos).norm(2).pow(2) +
            self.item_emb(neg).norm(2).pow(2)
        ) / len(users)

        return loss + reg

# ============================================================
# 第 4 部分:评估指标
# ============================================================

def evaluate(model: LightGCNFull, adj: torch.Tensor,
             train_user_items: dict, test_dict: dict,
             top_k: int = 20) -> dict:
    """
    评估推荐模型(Recall@K, NDCG@K)

    Args:
        model: 训练好的模型
        adj: 归一化邻接矩阵
        train_user_items: {user: set(items)} 训练集交互
        test_dict: {user: set(items)} 测试集真实交互
        top_k: 评估的 K 值
    Returns:
        {'recall@K': ..., 'ndcg@K': ...}
    """
    model.eval()

    with torch.no_grad():
        user_emb, item_emb = model.propagate(adj)

    recalls = []
    ndcgs = []

    for user in test_dict:
        if user not in train_user_items:
            continue

        # 计算该用户对所有物品的分数
        scores = torch.mv(item_emb, user_emb[user])  # [num_items]

        # 屏蔽训练集中已交互的物品
        train_items = list(train_user_items[user])
        scores[train_items] = -float('inf')

        # Top-K 推荐
        _, topk_items = scores.topk(top_k)
        topk_items = topk_items.cpu().numpy().tolist()  # 保持有序列表,NDCG需要排名信息

        # 计算 Recall@K
        true_items = test_dict[user]
        hits = len(set(topk_items) & true_items)
        recall = hits / min(len(true_items), top_k)
        recalls.append(recall)

        # 计算 NDCG@K
        dcg = 0.0
        for rank, item in enumerate(topk_items):
            if item in true_items:
                dcg += 1.0 / np.log2(rank + 2)  # rank 从 0 开始

        idcg = sum(1.0 / np.log2(i + 2) for i in range(min(len(true_items), top_k)))
        ndcg = dcg / idcg if idcg > 0 else 0.0
        ndcgs.append(ndcg)

    return {
        f'Recall@{top_k}': np.mean(recalls),
        f'NDCG@{top_k}': np.mean(ndcgs)
    }

# ============================================================
# 第 5 部分:训练与评估主流程
# ============================================================

def main():
    # 超参数
    EMB_DIM = 64
    N_LAYERS = 3
    LR = 1e-3
    REG = 1e-4
    BATCH_SIZE = 1024
    N_EPOCHS = 80
    TOP_K = 20

    # 1. 数据加载
    download_movielens_100k()
    train_pairs, test_dict, num_users, num_items = load_and_split_data()

    # 2. 构建图
    adj = build_adj_matrix(train_pairs, num_users, num_items)

    # 构建训练集用户交互字典
    train_user_items = defaultdict(set)
    for u, i in train_pairs:
        train_user_items[u].add(i)

    # 3. 创建模型
    model = LightGCNFull(num_users, num_items, EMB_DIM, N_LAYERS, REG)
    optimizer = torch.optim.Adam(model.parameters(), lr=LR)

    # 4. 训练循环
    print("\n" + "=" * 60)
    print("开始训练 LightGCN")
    print("=" * 60)

    best_recall = 0.0

    for epoch in range(N_EPOCHS):
        model.train()
        perm = np.random.permutation(len(train_pairs))
        total_loss = 0.0
        n_batch = 0

        for start in range(0, len(train_pairs), BATCH_SIZE):
            end = min(start + BATCH_SIZE, len(train_pairs))
            batch = train_pairs[perm[start:end]]

            users = torch.LongTensor(batch[:, 0])
            pos_items = torch.LongTensor(batch[:, 1])
            neg_items = torch.LongTensor([
                sample_neg(train_user_items, num_items, u)
                for u in batch[:, 0]
            ])

            loss = model.bpr_loss(adj, users, pos_items, neg_items)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            n_batch += 1

        avg_loss = total_loss / n_batch

        # 每 10 个 epoch 评估一次
        if (epoch + 1) % 10 == 0:
            metrics = evaluate(model, adj, train_user_items, test_dict, TOP_K)
            recall = metrics[f'Recall@{TOP_K}']
            ndcg = metrics[f'NDCG@{TOP_K}']

            improved = " ★ New Best!" if recall > best_recall else ""
            best_recall = max(best_recall, recall)

            print(f"Epoch {epoch+1:3d} | Loss: {avg_loss:.4f} | "
                  f"Recall@{TOP_K}: {recall:.4f} | "
                  f"NDCG@{TOP_K}: {ndcg:.4f}{improved}")

    # 5. 最终评估
    print("\n" + "=" * 60)
    print("最终评估结果")
    print("=" * 60)

    for k in [5, 10, 20]:
        metrics = evaluate(model, adj, train_user_items, test_dict, k)
        print(f"  Recall@{k:2d}: {metrics[f'Recall@{k}']:.4f}  |  "
              f"NDCG@{k:2d}: {metrics[f'NDCG@{k}']:.4f}")

    # 6. 推荐示例
    print("\n" + "=" * 60)
    print("推荐示例")
    print("=" * 60)

    model.eval()
    with torch.no_grad():
        user_emb, item_emb = model.propagate(adj)

    for uid in [0, 1, 2]:
        scores = torch.mv(item_emb, user_emb[uid])
        train_items = list(train_user_items.get(uid, set()))
        scores[train_items] = -float('inf')
        _, topk = scores.topk(10)
        print(f"  用户 {uid} 的 Top-10 推荐: {topk.tolist()}")

def sample_neg(user_items: dict, num_items: int, user: int) -> int:
    """负采样"""
    while True:
        neg = random.randint(0, num_items - 1)
        if neg not in user_items[user]:
            return neg

if __name__ == "__main__":
    random.seed(42)
    np.random.seed(42)
    torch.manual_seed(42)
    main()

19.8.3 结果分析

在 MovieLens-100K 上的典型结果(参考值):

模型 Recall@20 NDCG@20 参数量
MF-BPR ~0.18 ~0.15 用户数×dim + 物品数×dim
NGCF ~0.20 ~0.17 上述 + 每层 W 矩阵
LightGCN (3层) ~0.22 ~0.19 仅初始 Embedding

关键观察

  1. LightGCN 参数最少但效果最好:没有额外的权重矩阵,所有参数集中在初始 Embedding
  2. 层数敏感性:2-3 层效果最好,超过 4 层开始下降(over-smoothing)
  3. Embedding 维度:64 维通常足够,过大反而过拟合
  4. 正则化重要:L2 正则化对防止过拟合至关重要
Text Only
层数 vs Recall@20 关系示意:

Recall@20
  ^
  |    ╱ ─ ─ ─ ╲
  |   ╱           ╲
  |  ╱              ╲
  | ╱                  ╲
  |╱                      ╲
  ┼───┬───┬───┬───┬───┬───→ 层数
  0   1   2   3   4   5

  最优通常在 2-3 层
  继续增加层数 → over-smoothing → 性能下降

19.9 面试高频题

题目 1:为什么要将推荐系统建模为图问题?

参考答案

传统推荐方法(如矩阵分解)仅利用用户与物品的直接交互(一阶信号),忽视了更深层的协同关系。将推荐建模为图问题有三个核心优势:

  1. 高阶协同信号:通过图上的消息传递,可以自然地捕获多跳邻居的协同信息。例如用户 A 通过"A→物品X→用户B→物品Y"的路径发现物品Y,这是2阶协同信号。
  2. 灵活融合侧信息:用户属性、物品属性、社交关系、知识图谱等异质信息都可以作为图的节点和边自然融入,无需人工设计特征交叉。
  3. 缓解数据稀疏:即使两个用户没有共同交互,通过图中的间接路径仍然可以发现它们之间的关联。

题目 2:GCN 的消息传递机制是什么?请用公式表达并解释。

参考答案

GCN 的消息传递分为三步:

  1. 消息构造:邻居节点 \(u\) 发送其特征 \(h_u^{(l)}\) 给目标节点 \(v\)
  2. 聚合(Aggregate):将所有邻居消息汇总
\[m_v^{(l)} = \sum_{u \in \mathcal{N}(v)} \frac{1}{\sqrt{d_u \cdot d_v}} h_u^{(l)}\]
  1. 更新(Update):结合自身特征和聚合消息
\[h_v^{(l+1)} = \sigma(W^{(l)} [h_v^{(l)} + m_v^{(l)}])\]

矩阵形式为 \(H^{(l+1)} = \sigma(\tilde{A} H^{(l)} W^{(l)})\),其中 \(\tilde{A} = D^{-1/2} A D^{-1/2}\) 是对称归一化邻接矩阵。归一化系数 \(\frac{1}{\sqrt{d_u \cdot d_v}}\) 的作用是防止度大的节点在聚合中占据主导地位。


题目 3:LightGCN 对 GCN 做了哪些简化?为什么这些简化在推荐场景中是合理的?

参考答案

LightGCN 做了两个关键简化:

  1. 移除特征变换矩阵 \(W\):推荐场景中用户和物品没有丰富的属性特征,只有可学习的 ID Embedding。对 ID Embedding 做线性变换等价于换了一组 Embedding 参数,增加了模型复杂度却没有带来额外信息。
  2. 移除非线性激活 \(\sigma\):实验表明非线性激活在推荐任务中会导致训练困难和性能下降,因为它可能破坏 Embedding 空间中的几何结构。

合理性在于:GCN 的特征变换和非线性是为节点分类任务设计的(节点有丰富的初始特征需要变换),而推荐场景的本质是协同过滤,核心是学习高质量的用户/物品表征,邻域信息的简单聚合已经足够。


题目 4:什么是 over-smoothing 问题?LightGCN 如何缓解?

参考答案

Over-smoothing(过度平滑) 指随着 GCN 层数增加,所有节点的表征趋于收敛到相同的值,导致不同节点变得不可区分。

数学直觉:\(\tilde{A}\) 的反复相乘相当于随机游走的收敛过程,最终所有节点的表征趋向于度加权的"全局平均"。

LightGCN 的缓解策略是层组合(Layer Combination)

\[e = \frac{1}{K+1} \sum_{k=0}^K e^{(k)}\]

即使第 K 层的表征过于平滑,第 0 层(初始 Embedding)和低层表征仍保留了节点的局部个性化信息。最终的加权组合保证了多尺度信息的融合。实践中层数通常控制在 2-3 层。


题目 5:PinSage 如何解决 GCN 在十亿节点图上的扩展性问题?

参考答案

PinSage 采用三个核心策略:

  1. 随机游走邻居采样:用随机游走代替全邻居聚合,为每个节点固定选取 Top-T 个最重要的邻居(按游走访问频次排序),将邻居数从可能的百万级降到固定的 T(如 50)。
  2. Mini-batch 训练:不做全图卷积,而是为每个 mini-batch 的目标节点构建计算子图(K 层邻居的 K-hop 子图),仅在子图上进行前向和反向传播。
  3. Producer-Consumer 架构:CPU 进程负责图采样(I/O 密集),GPU 进程负责模型计算(计算密集),两者异步流水线执行。

此外,PinSage 使用 Hard Negative Mining(用上一轮 Embedding 检索相似但未交互的物品)提升负样本质量,进一步提升训练效率和效果。


题目 6:LightGCN 中 BPR Loss 和 L2 正则化有什么注意事项?

参考答案

BPR Loss 的核心思想是:正样本(用户交互过的物品)的预测分数应该高于负样本(未交互的物品):

\[\mathcal{L}_{BPR} = -\sum_{(u,i,j)} \ln \sigma(\hat{y}_{ui} - \hat{y}_{uj})\]

注意事项: 1. L2 正则化只对初始 Embedding \(e^{(0)}\) 进行,不对传播后的 Embedding 正则化。因为 LightGCN 没有额外的权重参数,所有可学习参数都在初始 Embedding 中。 2. 负采样策略很重要:均匀随机负采样是最基础的方式,但效率不高。可以用流行度加权采样或 Hard Negative 提升效果。 3. 正则化系数 \(\lambda\) 需要调参:过大导致欠拟合,过小导致过拟合。


题目 7:对比学习(Contrastive Learning)为什么能提升图推荐效果?

参考答案

对比学习通过引入额外的自监督信号来增强 Embedding 质量,主要从三个方面提升效果:

  1. 缓解数据稀疏:用户-物品交互极度稀疏(通常 < 1%),监督信号不足。对比学习通过数据增强或噪声扰动创造更多的训练信号,相当于用无标注的方式增加了训练数据。
  2. 提升 Embedding 均匀性(Uniformity):对比学习的负样本推开机制使得不同节点的 Embedding 更均匀地分布在特征空间中,避免了所有 Embedding 坍塌到一个小区域——这在稀疏图上尤其重要。
  3. 增强鲁棒性:通过不同增强视图间的一致性约束,模型学到的表征对噪声更鲁棒。

代表方法 SGL 使用图结构增强(丢边/丢节点),SimGCL 发现直接在 Embedding 上加随机噪音效果更好且更高效。


题目 8:知识图谱推荐和纯协同过滤推荐的核心区别是什么?各有什么优劣?

参考答案

维度 纯协同过滤 知识图谱推荐
信息来源 仅用户-物品交互 交互 + 物品属性 + 实体关系
冷启动 新物品无交互则无法推荐 新物品有 KG 属性就可推荐
可解释性 黑盒("相似用户喜欢") 可解释("因为同导演/同类型")
数据依赖 需要大量交互 还需要高质量知识图谱
计算成本 较低 较高(KG 通常很大)
KG 维护 需要持续维护和更新

核心权衡:KG 推荐用外部知识换取更好的冷启动和可解释性,但代价是 KG 的构建和维护成本。在工业界,KG 推荐通常作为整体推荐系统的一个组件,与协同过滤互补使用。


题目 9:GCN 推荐中的归一化邻接矩阵 \(\tilde{A} = D^{-1/2}AD^{-1/2}\) 有什么物理含义?为什么不直接用 \(A\)

参考答案

直接使用邻接矩阵 \(A\) 有两个严重问题:

  1. 数值不稳定:随着层数增加,\(A^k\) 的值会指数级爆炸或衰减,导致梯度爆炸/消失。
  2. 度偏差:度大的节点(热门物品)在聚合中贡献过大,主导了所有邻居的表征。

\(\tilde{A}_{ij} = \frac{1}{\sqrt{d_i} \cdot \sqrt{d_j}}\) 的物理含义: - 节点 \(j\) 向节点 \(i\) 传递消息时,按双方度数的几何平均进行缩放。 - 高度节点的消息被缩小(热门物品影响被抑制),低度节点的消息被放大(冷门物品获得更多关注)。 - 这等价于度归一化的对称拉普拉斯算子,保证了传播的数值稳定性。

另外还有行归一化 \(D^{-1}A\)(左归一化),等价于对每个节点的邻居消息取平均,也常用于工业场景。


题目 10:如果让你设计一个工业级图推荐系统,你会考虑哪些关键问题?

参考答案

需要从以下 5 个维度考虑:

1. 可扩展性 - 图规模可能达到十亿节点级别,全图 GCN 不可行 - 方案:mini-batch 采样训练(如 PinSage)、GraphSAGE 风格的邻居采样

2. 实时性 - 用户行为实时发生,图结构动态变化 - 方案:增量图更新 + 局部 Embedding 重新计算(而非全图重训)

3. 异质信息 - 实际场景中有多种节点类型和边类型 - 方案:异质图神经网络(HAN)、关系感知的注意力机制

4. 召回 vs 排序 - GNN Embedding 更适合做召回(向量检索),排序阶段通常用更精细的模型 - 方案:双塔架构——用 GNN 生成用户/物品的召回 Embedding → ANN 检索 → 精排

5. 线上部署 - GNN 推理延迟可能较高(需要多跳邻居聚合) - 方案:离线预计算所有节点 Embedding → 线上直接查表 + ANN 检索 - 增量更新:新物品上线后,仅在其局部子图上做推理

Text Only
工业级图推荐系统架构:

  ┌──────────────┐     ┌──────────────────┐
  │ 离线训练      │     │ 特征工程          │
  │ LightGCN/    │     │ 用户特征、物品特征  │
  │ PinSage      │     │ 上下文特征          │
  │ 全量训练      │     └───────┬────────────┘
  └──────┬───────┘             │
         │                     │
         ▼                     ▼
  ┌──────────────┐     ┌──────────────────┐
  │ Embedding    │     │ 精排模型           │
  │ 预计算 & 存储│     │ DeepFM/DIN        │
  │ (Redis/HBase)│     │ (参考第6/7章)      │
  └──────┬───────┘     └───────┬────────────┘
         │                     │
         ▼                     ▼
  ┌──────────────┐     ┌──────────────────┐
  │ ANN 召回     │────→│ 精排 → 重排       │────→ 推荐结果
  │ (Faiss/Milvus)│    │                  │
  └──────────────┘     └──────────────────┘

📝 本章小结

主题 核心要点
GCN 基础 消息传递 + 邻域聚合,矩阵形式 \(\tilde{A}HW\),谱域 vs 空间域
LightGCN 移除特征变换和非线性激活,层组合多尺度融合,BPR Loss
PinSage 随机游走采样、Importance Pooling、MapReduce 分布式、Hard Negative
KGAT 知识图谱 + 协同过滤统一为 CKG,关系感知的注意力聚合
图对比学习 SGL 图增强 + InfoNCE,SimGCL 噪声扰动更简单有效
前沿方向 GNN+LLM 融合、异质图、超图、公平性去偏

学习建议: 1. 先理解 GCN 的消息传递原理,再学 LightGCN 的简化动机 2. 动手运行实战代码,尝试调整层数、Embedding 维度、正则化系数 3. 重点掌握 LightGCN 和 PinSage,这是面试中出现频率最高的两个模型 4. 了解图对比学习和 KG 推荐的基本思路,能说清楚"为什么有用"

推荐阅读: - LightGCN 论文:LightGCN: Simplifying and Powering Graph Convolution Network for Recommendation (SIGIR 2020) - PinSage 论文:Graph Convolutional Neural Networks for Web-Scale Recommender Systems (KDD 2018) - SGL 论文:Self-supervised Graph Learning for Recommendation (SIGIR 2021) - SimGCL 论文:Are Graph Augmentations Necessary? Simple Graph Contrastive Learning for Recommendation (SIGIR 2022)


下一步:掌握本章内容后,可以回顾第9章-召回算法了解如何将 GNN Embedding 集成到工业推荐系统的召回阶段,或参考第16章-LLM与推荐系统了解 GNN+LLM 的最新融合范式。