
Sequential Recommendation Algorithms



📖 Chapter Overview

Sequential recommendation focuses on a user's behavior sequence, using temporal information to capture how the user's interests evolve. This chapter covers the basic principles of sequential recommendation, the mainstream algorithms, and practical applications.

🎯 Learning Objectives

  • Understand the basic principles of sequential recommendation
  • Master RNN/LSTM recommendation models
  • Master the GRU4Rec model
  • Understand Transformer-based recommendation models
  • Be able to implement a sequential recommendation system

7.1 Overview of Sequential Recommendation

7.1.1 Core Idea

The core idea of sequential recommendation: user behavior is ordered, and past behavior influences future behavior.

Characteristics:

  1. Temporal: the order of behaviors matters
  2. Dynamic: user interests change over time
  3. Context-dependent: the current behavior depends on past behaviors

7.1.2 Application Scenarios

  • E-commerce: recommend products from browse/purchase sequences
  • Short video: recommend videos from watch sequences
  • Music: recommend songs from listening sequences
  • News: recommend articles from reading sequences

7.2 RNN/LSTM-Based Recommendation

7.2.1 RNN Recommendation

Basic idea:

  • Model the user's behavior sequence with an RNN
  • The hidden state represents the user's current interest
  • Predict the next likely behavior

Model structure:

Text Only
Behavior sequence: [item1, item2, item3, ...]
        ↓ RNN/LSTM
Hidden states:     [h1, h2, h3, ...]
        ↓
Prediction:        P(item_next | h_last)

7.2.2 LSTM Recommendation Implementation

Python
import torch
import torch.nn as nn

class LSTMRecommender(nn.Module):  # subclass nn.Module to define the network
    def __init__(self, num_items, embed_dim, hidden_dim, num_layers=2):
        """
        num_items: number of items
        embed_dim: embedding dimension
        hidden_dim: LSTM hidden dimension
        num_layers: number of LSTM layers
        """
        super(LSTMRecommender, self).__init__()

        # Embedding layer (padding_idx=0 keeps the padding token at zero)
        self.embedding = nn.Embedding(num_items, embed_dim, padding_idx=0)

        # LSTM layer
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers,
                            batch_first=True, dropout=0.3)

        # Output layer
        self.fc = nn.Linear(hidden_dim, num_items)

    def forward(self, sequences):
        """
        sequences: [batch_size, seq_len]
        """
        # Embedding
        embeds = self.embedding(sequences)  # [batch_size, seq_len, embed_dim]

        # LSTM
        lstm_out, (hidden, cell) = self.lstm(embeds)

        # Hidden state of the top layer at the final time step
        last_hidden = hidden[-1]  # [batch_size, hidden_dim]; [-1] indexes the last layer

        # Predict scores over all items
        output = self.fc(last_hidden)  # [batch_size, num_items]

        return output
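
A quick shape check with random item IDs; the sizes here are arbitrary and only meant to verify the forward pass:

Python
demo_model = LSTMRecommender(num_items=1000, embed_dim=64, hidden_dim=128)
dummy_seq = torch.randint(1, 1000, (4, 10))  # [batch_size=4, seq_len=10]
logits = demo_model(dummy_seq)
print(logits.shape)  # torch.Size([4, 1000])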

7.2.3 Training Code

Python
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

class SequenceDataset(Dataset):
    def __init__(self, sequences, seq_len=10):
        self.sequences = sequences
        self.seq_len = seq_len

    def __len__(self):  # __len__ defines the behavior of len()
        return len(self.sequences)

    def __getitem__(self, idx):  # __getitem__ defines indexed access
        seq = self.sequences[idx]
        # Input: all items except the last; target: the last item
        if len(seq) <= self.seq_len:
            # Left-pad with 0 so the most recent item stays at the end,
            # matching the "last time step" the models read from
            input_seq = [0] * (self.seq_len - len(seq) + 1) + seq[:-1]
            target = seq[-1]
        else:
            input_seq = seq[-self.seq_len-1:-1]
            target = seq[-1]

        return torch.LongTensor(input_seq), torch.tensor(target, dtype=torch.long)

# Prepare data (toy example; replace with real behavior sequences)
sequences = [[1, 2, 3, 4, 5], [2, 3, 5, 7, 9], ...]
dataset = SequenceDataset(sequences, seq_len=10)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)  # DataLoader batches the data

# Initialize the model
model = LSTMRecommender(num_items=10000, embed_dim=128,
                       hidden_dim=256, num_layers=2)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train
for epoch in range(10):
    model.train()  # switch to training mode (enables dropout)
    total_loss = 0
    for input_seq, target in dataloader:
        optimizer.zero_grad()  # reset gradients

        # Forward pass
        output = model(input_seq)

        # Compute the loss (target: [batch_size])
        loss = criterion(output, target)

        # Backward pass
        loss.backward()   # compute gradients
        optimizer.step()  # update parameters

        total_loss += loss.item()  # convert the scalar tensor to a Python number

    avg_loss = total_loss / len(dataloader)
    print(f"Epoch {epoch + 1}, Loss: {avg_loss:.4f}")

7.3 GRU4Rec

7.3.1 Model Principles

GRU4Rec, proposed by Hidasi et al., is a GRU-based model for session/sequential recommendation.

Key points:

  1. Uses a GRU instead of an LSTM, which is computationally cheaper
  2. Optimizes ranking with the BPR loss
  3. Supports negative sampling (a training sketch follows the BPR loss code below)

7.3.2 PyTorch Implementation

Python
class GRU4Rec(nn.Module):
    def __init__(self, num_items, embed_dim, hidden_dim, num_layers=1):
        """
        num_items: number of items
        embed_dim: embedding dimension
        hidden_dim: GRU hidden dimension
        num_layers: number of GRU layers
        """
        super(GRU4Rec, self).__init__()

        # Embedding layer (padding_idx=0 keeps the padding token at zero)
        self.embedding = nn.Embedding(num_items, embed_dim, padding_idx=0)

        # GRU layer
        self.gru = nn.GRU(embed_dim, hidden_dim, num_layers,
                          batch_first=True)

        # Output layer
        self.fc = nn.Linear(hidden_dim, num_items)

    def forward(self, sequences):
        """
        sequences: [batch_size, seq_len]
        """
        # Embedding
        embeds = self.embedding(sequences)  # [batch_size, seq_len, embed_dim]

        # GRU
        gru_out, hidden = self.gru(embeds)

        # Output of the last time step
        last_output = gru_out[:, -1, :]  # [batch_size, hidden_dim]

        # Predict scores over all items
        output = self.fc(last_output)  # [batch_size, num_items]

        return output

7.3.3 BPR Loss
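
BPR (Bayesian Personalized Ranking) is a pairwise objective: for each observed (positive) item, sample an item the user did not interact with (a negative) and maximize the probability that the positive scores higher, i.e. loss = -mean(log sigmoid(s_pos - s_neg)).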

Python
import torch.nn.functional as F

def bpr_loss(pos_scores, neg_scores):
    """
    BPR loss.
    pos_scores: positive-item scores [batch_size]
    neg_scores: negative-item scores [batch_size]
    """
    # Score margin between the positive and the negative item
    diff = pos_scores - neg_scores

    # -log(sigmoid(diff)); logsigmoid is numerically stabler than log(sigmoid(...))
    loss = -torch.mean(F.logsigmoid(diff))

    return loss
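
Below is a minimal sketch of one BPR training step with uniformly sampled negatives. The helper `train_step_bpr` and the uniform sampling are illustrative assumptions: the original GRU4Rec paper also explores in-batch and popularity-based sampling, and scores negatives on sampled outputs rather than the full softmax layer used here.

Python
def train_step_bpr(model, input_seq, pos_items, num_items, optimizer):
    """
    One BPR training step.
    input_seq: [batch_size, seq_len]  observed prefix
    pos_items: [batch_size]           the true next item
    """
    optimizer.zero_grad()
    logits = model(input_seq)  # [batch_size, num_items]

    # Uniformly sample one negative per example
    # (a negative may occasionally collide with the positive; ignored for brevity)
    neg_items = torch.randint(1, num_items, pos_items.shape,
                              device=pos_items.device)

    # Gather the scores of the positive and negative items
    pos_scores = logits.gather(1, pos_items.unsqueeze(1)).squeeze(1)
    neg_scores = logits.gather(1, neg_items.unsqueeze(1)).squeeze(1)

    loss = bpr_loss(pos_scores, neg_scores)
    loss.backward()
    optimizer.step()
    return loss.item()

Compared with a cross-entropy loss over all items, this pairwise objective directly optimizes the ranking of the positive item above sampled negatives.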

7.4 Transformer-Based Recommendation

7.4.1 Model Principles

The Transformer models sequences with self-attention, which captures long-range dependencies.

Advantages:

  1. Parallel computation, so training is efficient
  2. Captures long-range dependencies
  3. Attention weights offer some interpretability
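
At the core is scaled dot-product attention; in the notation of the code below, with d_k the per-head dimension:

Text Only
Attention(Q, K, V) = softmax(Q · K^T / sqrt(d_k)) · V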

7.4.2 SASRec

SASRec (Self-Attentive Sequential Recommendation) is the classic Transformer-based recommendation model. The MultiHeadAttention class below spells out the attention mechanics; the SASRec implementation itself uses PyTorch's built-in nn.TransformerEncoderLayer.

Python
import torch
import torch.nn as nn
import math

class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert embed_dim % num_heads == 0, "embed_dim must be divisible by num_heads"
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        self.q_linear = nn.Linear(embed_dim, embed_dim)
        self.k_linear = nn.Linear(embed_dim, embed_dim)
        self.v_linear = nn.Linear(embed_dim, embed_dim)
        self.out_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)

        # Linear projections
        Q = self.q_linear(query)
        K = self.k_linear(key)
        V = self.v_linear(value)

        # Split into heads: reshape to [batch, heads, seq_len, head_dim]
        Q = Q.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        K = K.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        V = V.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

        # Scaled dot-product attention
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        attention = torch.softmax(scores, dim=-1)
        context = torch.matmul(attention, V)

        # Merge heads back: [batch, seq_len, embed_dim]
        context = context.transpose(1, 2).contiguous().view(
            batch_size, -1, self.embed_dim)

        output = self.out_linear(context)
        return output

class SASRec(nn.Module):
    def __init__(self, num_items, embed_dim, num_heads, num_layers, max_len=50):
        super(SASRec, self).__init__()

        # Embedding layers (padding_idx=0 keeps the padding token at zero)
        self.item_embedding = nn.Embedding(num_items, embed_dim, padding_idx=0)
        self.pos_embedding = nn.Embedding(max_len, embed_dim)

        # Transformer encoder layers
        self.transformer_layers = nn.ModuleList([
            nn.TransformerEncoderLayer(embed_dim, num_heads,
                                       dim_feedforward=embed_dim*4,
                                       dropout=0.1)
            for _ in range(num_layers)
        ])

        # Output layer
        self.fc = nn.Linear(embed_dim, num_items)

    def forward(self, sequences):
        """
        sequences: [batch_size, seq_len] (assumes seq_len <= max_len)
        """
        batch_size, seq_len = sequences.size()

        # Item + position embeddings
        item_embeds = self.item_embedding(sequences)
        pos_ids = torch.arange(seq_len, device=sequences.device).unsqueeze(0).repeat(batch_size, 1)  # unsqueeze adds the batch dimension
        pos_embeds = self.pos_embedding(pos_ids)

        embeds = item_embeds + pos_embeds  # [batch_size, seq_len, embed_dim]

        # Causal attention mask: True above the diagonal marks future positions,
        # which each time step is forbidden to attend to
        causal_mask = torch.triu(
            torch.ones(seq_len, seq_len, device=sequences.device), diagonal=1
        ).bool()

        # nn.TransformerEncoderLayer expects [seq_len, batch_size, embed_dim] by default
        embeds = embeds.transpose(0, 1)
        for layer in self.transformer_layers:
            embeds = layer(embeds, src_mask=causal_mask)
        embeds = embeds.transpose(0, 1)  # back to [batch_size, seq_len, embed_dim]

        # Use the last time step
        last_output = embeds[:, -1, :]  # [batch_size, embed_dim]

        # Predict scores over all items
        output = self.fc(last_output)  # [batch_size, num_items]

        return output

7.5 Hands-On Case Study

Case study: sequential recommendation for e-commerce

Python
import pandas as pd
import numpy as np
import torch
import torch.nn as nn  # needed below for the loss function
from torch.utils.data import DataLoader, Dataset

# 1. Load data
data = pd.read_csv('user_sequences.csv')

# 2. Build sequence data
def build_sequences(data, min_len=5):
    """
    Build per-user behavior sequences.
    Assumes rows are already sorted by timestamp; otherwise sort first.
    """
    sequences = []
    for user_id, group in data.groupby('user_id'):
        items = group['item_id'].tolist()
        if len(items) >= min_len:
            sequences.append(items)
    return sequences

sequences = build_sequences(data, min_len=5)

# 3. Create the dataset (SequenceDataset from Section 7.2.3)
dataset = SequenceDataset(sequences, seq_len=10)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# 4. Initialize the model (SASRec from Section 7.4.2)
num_items = max(max(seq) for seq in sequences) + 1  # IDs index the embedding; 0 is reserved for padding
model = SASRec(num_items=num_items, embed_dim=128,
               num_heads=4, num_layers=2)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 5. Train
for epoch in range(10):
    model.train()
    total_loss = 0
    for input_seq, target in dataloader:
        optimizer.zero_grad()

        # Forward pass
        output = model(input_seq)

        # Compute the loss (target: [batch_size])
        loss = criterion(output, target)

        # Backward pass
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    print(f"Epoch {epoch + 1}, Loss: {avg_loss:.4f}")

# 6. Recommendation example
def recommend(model, user_sequence, top_k=10):
    """
    Recommend items for a user given their behavior sequence.
    """
    model.eval()
    with torch.no_grad():  # disable gradient tracking to save memory
        # Prepare the input: left-pad or truncate to length 10
        if len(user_sequence) < 10:
            input_seq = [0] * (10 - len(user_sequence)) + user_sequence
        else:
            input_seq = user_sequence[-10:]

        input_tensor = torch.LongTensor([input_seq])

        # Predict
        output = model(input_tensor)
        scores = output[0].numpy()

        # Rank: indices of the top-k scores
        # (in production you would typically also filter out already-seen items)
        top_items = np.argsort(scores)[::-1][:top_k]

        return top_items

# Test
user_sequence = [1, 2, 3, 4, 5, 6, 7, 8, 9]
recommendations = recommend(model, user_sequence, top_k=10)
print(f"Recommended items: {recommendations}")

📝 Chapter Summary

This chapter covered sequential recommendation algorithms:

  1. ✅ Basic principles of sequential recommendation
  2. ✅ RNN/LSTM recommendation models
  3. ✅ The GRU4Rec model
  4. ✅ Transformer-based recommendation models
  5. ✅ A hands-on case study

After this chapter you should be able to:

  • Understand why sequence information matters in recommendation
  • Implement RNN/LSTM recommendation
  • Implement GRU4Rec
  • Implement Transformer-based recommendation
  • Apply sequential recommendation to real problems

🔗 Next Steps

In the next chapter we turn to multi-task learning for recommendation and how to optimize several objectives at once.

Continue with: 08-多任务学习推荐.md

💡 Thought Questions

  1. What advantages does sequential recommendation have over traditional recommendation?

    Traditional recommenders rely on aggregate statistics (click counts, favorite rates); sequential models exploit the order of behaviors to capture ① interest evolution (e.g. from beginner to advanced content), ② short-term intent (browsing phones → likely to buy a phone), and ③ temporal patterns (coffee after lunch). Representative models: DIN (item-level attention), SASRec (self-attention), BERT4Rec (bidirectional sequence modeling).

  2. What are the pros and cons of RNN/LSTM versus the Transformer?

    RNN/LSTM: good at sequential dependencies and parameter-efficient, but computation is serial and slow, long sequences get forgotten, and parallelization is hard. Transformer (self-attention): highly parallel, strong long-range dependencies, usually better quality, but attention costs O(n²) and needs more data. In practice: for sequences shorter than ~100, use a Transformer (SASRec); for very long sequences, use LSTM + attention or truncate.

  3. How do you handle variable-length behavior sequences?

    ① Truncation (keep the most recent N behaviors, e.g. N = 50-200); ② padding (fill short sequences with 0 plus a mask — see the sketch after these questions); ③ segmented pooling (encode the last day / 7 days / 30 days separately); ④ sampling (keep important behaviors, subsample mere exposures); ⑤ SIM (first retrieve a relevant sub-sequence by category, then apply attention; suited to very long sequences).

  4. How do you evaluate sequential recommendation?

    Offline: HitRate@K, NDCG@K, and MRR on next-click prediction, using leave-one-out (the last behavior as the test point) or a time split (train before time T, test after); a minimal implementation appears at the end of Section 7.5. Online: A/B tests on CTR, dwell time, and conversion. Crucial: avoid data leakage (never train on future information).

  5. What are the challenges of sequential recommendation in practice?

    ① Uneven sequence lengths (new users have short histories, heavy users very long ones); ② freshness (user behaviors must be reflected in real time); ③ noise (accidental or repeated clicks); ④ compute cost (online inference latency of Transformers); ⑤ feature fusion (combining sequence, statistical, and context features). Common remedies: SIM for long sequences, real-time feature updates (e.g. with Flink), and sequence denoising.
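
As a concrete illustration of point ② in question 3, here is a minimal left-padding plus padding-mask sketch (the helper name `pad_and_mask` is hypothetical):

Python
import torch

def pad_and_mask(seq, max_len=10, pad_id=0):
    """Left-pad a variable-length sequence and return a boolean mask
    marking padded positions (True = padding, to be ignored)."""
    seq = seq[-max_len:]  # truncate to the most recent items
    padded = [pad_id] * (max_len - len(seq)) + seq
    mask = [i < max_len - len(seq) for i in range(max_len)]
    return torch.LongTensor(padded), torch.BoolTensor(mask)

padded, mask = pad_and_mask([5, 9, 42], max_len=6)
print(padded)  # tensor([ 0,  0,  0,  5,  9, 42])
print(mask)    # tensor([ True,  True,  True, False, False, False])
# The mask can be passed as src_key_padding_mask to nn.TransformerEncoderLayer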

📚 References

  1. "Session-based Recommendations with Recurrent Neural Networks" - Hidasi et al.
  2. "Self-Attentive Sequential Recommendation" - Kang & McAuley
  3. "BERT4Rec: Sequential Recommendation with Bidirectional Encoder Representations from Transformer" - Sun et al.
  4. "Improving Sequential Recommendation with Knowledge-Enhanced User Modeling" - Chen et al.
  5. PyTorch Documentation