跳转至

多任务学习推荐

多任务学习推荐

📖 章节导读

多任务学习(Multi-Task Learning, MTL)在推荐系统中越来越重要,它可以同时优化多个相关任务,提升整体推荐效果。本章将介绍多任务学习的基本原理、主流模型和实际应用。

🎯 学习目标

  • 理解多任务学习的基本原理
  • 掌握MMoE模型
  • 掌握PLE模型
  • 了解其他多任务学习模型
  • 能够实现多任务推荐系统

8.1 多任务学习概述

8.1.1 基本思想

多任务学习的核心思想是:同时学习多个相关任务,共享底层特征表示,提升整体性能

优势: 1. 数据利用:多个任务共享数据,提高数据利用率 2. 特征共享:学习更通用的特征表示 3. 正则化:多任务相互正则化,防止过拟合 4. 效率:一次训练多个任务,提高效率

8.1.2 推荐系统中的多任务

常见任务: 1. 点击率(CTR)预测:预测用户点击的概率 2. 转化率(CVR)预测:预测用户购买的概率 3. 停留时长预测:预测用户停留时间 4. 点赞率预测:预测用户点赞的概率 5. 收藏率预测:预测用户收藏的概率

任务关系: - 点击和转化:点击是转化的前提 - 点赞和收藏:都表示用户喜欢 - 停留时长和点击:停留时长反映兴趣程度

8.2 MMoE模型

8.2.1 模型原理

MMoE(Multi-gate Mixture-of-Experts)由Google提出,是经典的多任务学习模型。

核心思想: 1. 使用多个专家网络(Expert)学习不同的特征表示 2. 使用门控网络(Gate)为每个任务动态选择专家 3. 每个任务使用不同的专家组合

模型结构:

Text Only
输入特征
共享层(可选)
专家网络(Expert1, Expert2, ..., ExpertN)
门控网络(Gate1, Gate2, ..., GateM)
任务特定层(Tower1, Tower2, ..., TowerM)
输出(Output1, Output2, ..., OutputM)

8.2.2 PyTorch实现

Python
import torch
import torch.nn as nn

class Expert(nn.Module):  # 继承nn.Module定义网络层
    def __init__(self, input_dim, hidden_dim):
        super(Expert, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = self.dropout(self.relu(self.fc1(x)))
        x = self.dropout(self.relu(self.fc2(x)))
        return x

class Gate(nn.Module):
    def __init__(self, input_dim, num_experts):
        super(Gate, self).__init__()
        self.fc = nn.Linear(input_dim, num_experts)

    def forward(self, x):
        return torch.softmax(self.fc(x), dim=-1)

class MMoE(nn.Module):
    def __init__(self, input_dim, num_experts, expert_hidden_dim,
                 task_hidden_dims, num_tasks):
        """
        input_dim: 输入特征维度
        num_experts: 专家网络数量
        expert_hidden_dim: 专家网络隐藏层维度
        task_hidden_dims: 每个任务的隐藏层维度列表
        num_tasks: 任务数量
        """
        super(MMoE, self).__init__()

        # 专家网络
        self.experts = nn.ModuleList([
            Expert(input_dim, expert_hidden_dim)
            for _ in range(num_experts)
        ])

        # 门控网络
        self.gates = nn.ModuleList([
            Gate(input_dim, num_experts)
            for _ in range(num_tasks)
        ])

        # 任务特定层
        self.towers = nn.ModuleList()
        for hidden_dims in task_hidden_dims:
            tower = []
            input_dim_tower = expert_hidden_dim
            for hidden_dim in hidden_dims:
                tower.append(nn.Linear(input_dim_tower, hidden_dim))
                tower.append(nn.ReLU())
                tower.append(nn.Dropout(0.3))
                input_dim_tower = hidden_dim
            tower.append(nn.Linear(input_dim_tower, 1))
            self.towers.append(nn.Sequential(*tower))

    def forward(self, x):
        """
        x: [batch_size, input_dim]
        返回: list of outputs for each task
        """
        # 专家网络输出
        expert_outputs = [expert(x) for expert in self.experts]
        expert_outputs = torch.stack(expert_outputs, dim=1)  # [batch_size, num_experts, expert_hidden_dim]  # torch.stack沿新维度拼接张量

        # 门控网络输出
        task_outputs = []
        for i, gate in enumerate(self.gates):  # enumerate同时获取索引和元素
            gate_weights = gate(x)  # [batch_size, num_experts]
            gate_weights = gate_weights.unsqueeze(-1)  # [batch_size, num_experts, 1]  # unsqueeze增加一个维度

            # 加权组合专家输出
            weighted_experts = expert_outputs * gate_weights
            combined = torch.sum(weighted_experts, dim=1)  # [batch_size, expert_hidden_dim]

            # 任务特定层
            task_output = self.towers[i](combined)
            task_outputs.append(task_output)

        return task_outputs

8.2.3 训练代码

Python
import torch.optim as optim

# 准备数据
X = torch.randn(1000, 100)  # 特征
y1 = torch.randint(0, 2, (1000, 1)).float()  # 任务1标签(CTR)
y2 = torch.randint(0, 2, (1000, 1)).float()  # 任务2标签(CVR)

# 初始化模型
model = MMoE(
    input_dim=100,
    num_experts=4,
    expert_hidden_dim=128,
    task_hidden_dims=[[64, 32], [64, 32]],
    num_tasks=2
)

criterion1 = nn.BCEWithLogitsLoss()
criterion2 = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练
for epoch in range(10):
    model.train()  # train()训练模式
    optimizer.zero_grad()  # 清零梯度

    # 前向传播
    outputs = model(X)
    output1, output2 = outputs

    # 计算损失
    loss1 = criterion1(output1, y1)
    loss2 = criterion2(output2, y2)
    loss = loss1 + loss2  # 多任务损失

    # 反向传播
    loss.backward()  # 反向传播计算梯度
    optimizer.step()  # 更新参数

    print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}, "  # 将单元素张量转为Python数值
          f"Loss1: {loss1.item():.4f}, Loss2: {loss2.item():.4f}")

8.3 PLE模型

8.3.1 模型原理

PLE(Progressive Layered Extraction)是腾讯提出的改进模型,解决了MMoE中专家共享导致的负迁移问题。

核心改进: 1. 显式分离任务特定专家 2. 渐进式提取特征 3. 多层结构

模型结构:

Text Only
输入特征
第1层:
  任务特定专家 + 共享专家
第2层:
  任务特定专家 + 共享专家
...
任务特定层
输出

8.3.2 PyTorch实现

Python
class PLELayer(nn.Module):
    def __init__(self, input_dim, num_tasks, num_shared_experts,
                 num_task_experts, expert_hidden_dim):
        super(PLELayer, self).__init__()
        self.num_tasks = num_tasks

        # 共享专家
        self.shared_experts = nn.ModuleList([
            Expert(input_dim, expert_hidden_dim)
            for _ in range(num_shared_experts)
        ])

        # 任务特定专家
        self.task_experts = nn.ModuleList([
            nn.ModuleList([
                Expert(input_dim, expert_hidden_dim)
                for _ in range(num_task_experts)
            ])
            for _ in range(num_tasks)
        ])

        # 门控网络
        self.gates = nn.ModuleList([
            Gate(input_dim, num_shared_experts + num_task_experts)
            for _ in range(num_tasks)
        ])

    def forward(self, task_inputs):
        """
        task_inputs: list of [batch_size, input_dim],每个任务一个输入
                     首层可传入单个 tensor,自动扩展为所有任务共享
        """
        # 首层输入是单个tensor,扩展为每个任务相同的输入
        if isinstance(task_inputs, torch.Tensor):  # isinstance检查类型
            task_inputs = [task_inputs] * self.num_tasks

        # 共享专家使用所有任务输入的均值
        shared_input = torch.stack(task_inputs).mean(dim=0)
        shared_outputs = [expert(shared_input) for expert in self.shared_experts]

        # 任务特定专家输出
        task_outputs = []
        for i in range(self.num_tasks):
            task_expert_outputs = [expert(task_inputs[i])
                                 for expert in self.task_experts[i]]
            # 合并共享和任务特定专家
            all_experts = shared_outputs + task_expert_outputs
            all_experts = torch.stack(all_experts, dim=1)

            # 门控
            gate_weights = self.gates[i](task_inputs[i]).unsqueeze(-1)
            weighted = all_experts * gate_weights
            combined = torch.sum(weighted, dim=1)
            task_outputs.append(combined)

        return task_outputs

class PLE(nn.Module):
    def __init__(self, input_dim, num_layers, num_tasks,
                 num_shared_experts, num_task_experts,
                 expert_hidden_dim, task_hidden_dims):
        super(PLE, self).__init__()

        # 多层PLE(第1层输入维度为input_dim,后续层为expert_hidden_dim)
        self.ple_layers = nn.ModuleList()
        for i in range(num_layers):
            layer_input_dim = input_dim if i == 0 else expert_hidden_dim
            self.ple_layers.append(
                PLELayer(layer_input_dim, num_tasks, num_shared_experts,
                        num_task_experts, expert_hidden_dim)
            )

        # 任务特定层
        self.towers = nn.ModuleList()
        for hidden_dims in task_hidden_dims:
            tower = []
            input_dim_tower = expert_hidden_dim
            for hidden_dim in hidden_dims:
                tower.append(nn.Linear(input_dim_tower, hidden_dim))
                tower.append(nn.ReLU())
                tower.append(nn.Dropout(0.3))
                input_dim_tower = hidden_dim
            tower.append(nn.Linear(input_dim_tower, 1))
            self.towers.append(nn.Sequential(*tower))

    def forward(self, x):
        """
        x: [batch_size, input_dim]
        """
        # 多层PLE(首层接收原始输入tensor,后续层接收任务输出list)
        outputs = x
        for layer in self.ple_layers:
            outputs = layer(outputs)

        # 使用最后一层的输出
        final_outputs = outputs

        # 任务特定层
        task_predictions = []
        for i, tower in enumerate(self.towers):
            pred = tower(final_outputs[i])
            task_predictions.append(pred)

        return task_predictions

8.4 其他多任务学习模型

8.4.1 CGC(Customized Gate Control)

  • 在PLE基础上改进
  • 更灵活的门控机制
  • 更好的任务分离

8.4.2 AITM(Adaptive Information Transfer Multi-task)

  • 自适应信息传递
  • 任务间信息选择性共享
  • 减少负迁移

8.4.3 MMoE-v2

  • 改进的门控机制
  • 更好的专家选择
  • 提升模型性能

8.5 实战案例

案例:CTR和CVR联合预测

Python
import pandas as pd
import torch
from sklearn.model_selection import train_test_split

# 1. 加载数据
data = pd.read_csv('ad_data.csv')

# 2. 特征处理
categorical_features = ['user_id', 'item_id', 'category']
numerical_features = ['price', 'click_count', 'view_count']

# 编码类别特征
from sklearn.preprocessing import LabelEncoder
label_encoders = {}
for feat in categorical_features:
    le = LabelEncoder()
    data[feat] = le.fit_transform(data[feat])
    label_encoders[feat] = le

# 3. 构建训练数据
X = data[categorical_features + numerical_features].values
y_click = data['click'].values
y_convert = data['convert'].values

# 4. 划分数据集
X_train, X_test, y_click_train, y_click_test, \
y_convert_train, y_convert_test = train_test_split(
    X, y_click, y_convert, test_size=0.2, random_state=42
)

# 5. 转换为Tensor
X_train = torch.FloatTensor(X_train)
X_test = torch.FloatTensor(X_test)
y_click_train = torch.FloatTensor(y_click_train).unsqueeze(1)
y_click_test = torch.FloatTensor(y_click_test).unsqueeze(1)
y_convert_train = torch.FloatTensor(y_convert_train).unsqueeze(1)
y_convert_test = torch.FloatTensor(y_convert_test).unsqueeze(1)

# 6. 训练模型
model = PLE(
    input_dim=X.shape[1],
    num_layers=2,
    num_tasks=2,
    num_shared_experts=2,
    num_task_experts=2,
    expert_hidden_dim=128,
    task_hidden_dims=[[64, 32], [64, 32]]
)

criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

def calculate_auc(y_true, y_pred):
    from sklearn.metrics import roc_auc_score
    return roc_auc_score(y_true, y_pred)

# 训练
for epoch in range(10):
    model.train()
    optimizer.zero_grad()

    # 前向传播
    outputs = model(X_train)
    output_click, output_convert = outputs

    # 计算损失
    loss_click = criterion(output_click, y_click_train)
    loss_convert = criterion(output_convert, y_convert_train)
    loss = loss_click + loss_convert

    # 反向传播
    loss.backward()
    optimizer.step()

    # 评估
    model.eval()
    with torch.no_grad():  # 禁用梯度计算,节省内存
        test_outputs = model(X_test)
        test_click, test_convert = test_outputs

        auc_click = calculate_auc(y_click_test.numpy(),
                                test_click.numpy())
        auc_convert = calculate_auc(y_convert_test.numpy(),
                                   test_convert.numpy())

    print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}, "
          f"Click AUC: {auc_click:.4f}, Convert AUC: {auc_convert:.4f}")

📝 本章小结

本章介绍了多任务学习推荐:

  1. ✅ 多任务学习的基本原理
  2. ✅ MMoE模型
  3. ✅ PLE模型
  4. ✅ 其他多任务学习模型
  5. ✅ 实战案例

通过本章学习,你应该能够: - 理解多任务学习的优势 - 实现MMoE和PLE模型 - 了解其他多任务学习模型 - 应用多任务学习解决实际问题

🔗 下一步

下一章我们将学习召回算法,了解如何从海量物品中快速筛选候选集。

继续学习: 09-召回算法.md

💡 思考题

  1. 多任务学习相比单任务学习有什么优势?

    ①共享表示学习(补充任务信号,缓解稀疏性) ②正则化效果(多目标互相约束防止过拟合) ③一个模型服务多个目标(节省计算资源)。典型:同时优化点击率+转化率+留存率,而单任务只能优化一个。场景:几乎所有工业推荐系统都是多目标的。

  2. MMoE和PLE各有什么优缺点?

    MMoE(Multi-gate Mixture-of-Experts):多个Expert网络+每个任务有独立Gate选择Expert。优点:灵活、获得业界广泛验证。缺点:任务相关性低时Expert编分不足(跟跁效应)。PLE(Progressive Layered Extraction):明确区分任务专属Expert和共享Expert,多层提取。优点:任务冲突小、表现更稳定。缺点:参数量更大。

  3. 如何设计多任务学习的损失函数?

    L = Σw_i×L_i。权重设计:①手动调权(根据业务重要性) ②不确定性加权(Kendall方法:用task的不确定性自动平衡) ③GradNorm(动态调整梯度范数一致) ④Pareto优化(寻找pareto最优解)。实践:先等权基线,再根据业务指标调权,最终常用不确定性加权。

  4. 多任务学习如何处理任务冲突?

    任务冲突:优化一个任务时另一个变差(如提升CTR但CVR下降)。解决:①PLE分离专属/共享Expert ②梯度手术(PCGrad:去除冲突梯度分量、CAGrad) ③交替训练(每次只优化一个任务) ④任务梯度截断(防止主任务受辅助任务干扰)。核心原则:多目标间必然有trade-off,根据业务目标找平衡点。

  5. 多任务学习在实际应用中有哪些挑战?

    ①任务定义(哪些目标值得联合建模) ②样本空间不一致(曝光样本>>点击>>购买) ③权重调优耗时(每次加新任务需重新调权) ④在线服务(多个任务头的推理延迟) ⑤任务优先级变化(业务不同阶段关注不同指标)。建议:先双任务(CTR+CVR),成功后再加更多目标。

📚 参考资料

  1. "Modeling Task Relationships in Multi-task Learning with Multi-gate Mixture-of-Experts" - Ma et al.
  2. "Progressive Layered Extraction (PLE): A Novel Multi-Task Learning (MTL) Model for Personalized Recommendations" - Tang et al.
  3. "An Improved Multi-gate Mixture-of-Experts for CTR Prediction" - Rendle et al.
  4. "Multi-task Learning for Recommendation" - Beutel et al.
  5. PyTorch Documentation