跳转至

排序算法

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

学习排序算法

📖 章节导读

排序(Ranking)是推荐系统的第二步,负责对召回的候选集进行精准排序,最终展示给用户。本章将介绍排序算法的基本原理、主流模型和实际应用。

🎯 学习目标

  • 理解排序阶段的作用和挑战
  • 掌握逻辑回归(LR)模型
  • 掌握GBDT模型
  • 掌握深度排序模型
  • 能够实现高效的排序系统

10.1 排序概述

10.1.1 排序的作用

目标:对召回的几百到几千个候选物品进行精准排序,选出Top N展示给用户

要求: 1. 高准确率:排序结果要尽可能符合用户偏好 2. 高效率:快速响应,通常要求<50ms 3. 可解释性:能够解释排序理由

10.1.2 排序vs召回

维度 召回 排序
物品数量 海量→几千 几千→几十
计算复杂度
延迟要求 极高 中等
模型复杂度 相对简单 复杂
目标 覆盖率 准确率

10.1.3 排序策略

常见排序策略: 1. 点对方法:学习每个样本的独立得分 2. 对对方法:学习样本对的相对顺序 3. 列表对方法:学习整个列表的排序

10.2 逻辑回归(LR)

10.2.1 模型原理

逻辑回归是经典的排序模型,简单高效,易于解释。

模型公式:

Text Only
P(y=1|x) = sigmoid(w^T x + b)

特点: - 简单高效,训练快速 - 可解释性强 - 适合线性可分数据

10.2.2 特征工程

常用特征: 1. 用户特征:年龄、性别、地域、消费能力 2. 物品特征:类别、品牌、价格、评分 3. 交叉特征:用户-物品交互特征 4. 上下文特征:时间、地点、设备

特征交叉:

Python
def feature_cross(user_features, item_features):
    """
    特征交叉
    """
    # 一阶特征
    features = []
    features.extend(user_features)
    features.extend(item_features)

    # 二阶交叉特征
    for uf in user_features:
        for if_ in item_features:
            features.append(uf * if_)

    return features

10.2.3 PyTorch实现

Python
import torch
import torch.nn as nn

class LogisticRegression(nn.Module):  # 继承nn.Module定义网络层
    def __init__(self, input_dim):
        super(LogisticRegression, self).__init__()
        self.linear = nn.Linear(input_dim, 1)

    def forward(self, x):
        """
        x: [batch_size, input_dim]
        """
        output = self.linear(x)
        return torch.sigmoid(output)

# 训练
model = LogisticRegression(input_dim=100)
criterion = nn.BCELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

for epoch in range(100):
    optimizer.zero_grad()  # 清零梯度
    output = model(X_train)
    loss = criterion(output, y_train)
    loss.backward()  # 反向传播计算梯度
    optimizer.step()  # 更新参数

10.3 GBDT模型

10.3.1 模型原理

GBDT(Gradient Boosting Decision Tree)是强大的排序模型,能够学习非线性特征。

特点: - 能够自动学习特征交互 - 对缺失值不敏感 - 泛化能力强

10.3.2 XGBoost实现

Python
import xgboost as xgb
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import auc, roc_auc_score

# 1. 准备数据
data = pd.read_csv('ranking_data.csv')
X = data.drop('label', axis=1)
y = data['label']

# 2. 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# 3. 转换为DMatrix
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

# 4. 训练模型
params = {
    'objective': 'binary:logistic',
    'eval_metric': 'auc',
    'max_depth': 6,
    'eta': 0.1,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'seed': 42
}

model = xgb.train(
    params,
    dtrain,
    num_boost_round=1000,
    evals=[(dtrain, 'train'), (dtest, 'test')],
    early_stopping_rounds=50,
    verbose_eval=100
)

# 5. 预测
y_pred = model.predict(dtest)
auc_score = roc_auc_score(y_test, y_pred)
print(f"AUC: {auc_score:.4f}")

# 6. 特征重要性
importance = model.get_score(importance_type='gain')
sorted_importance = sorted(importance.items(),
                        key=lambda x: x[1], reverse=True)  # lambda匿名函数
print("特征重要性:")
for feature, score in sorted_importance[:10]:  # 切片操作,取前n个元素
    print(f"{feature}: {score:.2f}")

10.3.3 LightGBM实现

Python
import lightgbm as lgb

# 1. 准备数据
train_data = lgb.Dataset(X_train, label=y_train)
test_data = lgb.Dataset(X_test, label=y_test, reference=train_data)

# 2. 训练模型
params = {
    'objective': 'binary',
    'metric': 'auc',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.8,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1
}

model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, test_data],
    early_stopping_rounds=50,
    verbose_eval=100
)

# 3. 预测
y_pred = model.predict(X_test, num_iteration=model.best_iteration)
auc_score = roc_auc_score(y_test, y_pred)
print(f"AUC: {auc_score:.4f}")

10.4 深度排序模型

10.4.1 DeepFM

DeepFM结合了FM和DNN的优势,能够同时学习低阶和高阶特征交互。

Python
import torch
import torch.nn as nn

class DeepFM(nn.Module):
    def __init__(self, field_dims, embed_dim, hidden_dims):
        super(DeepFM, self).__init__()

        # Embedding层
        self.embedding = nn.ModuleList([
            nn.Embedding(dim, embed_dim) for dim in field_dims
        ])

        # FM部分(一阶项,用Embedding查找而非nn.Linear)
        self.fm_linear = nn.ModuleList([
            nn.Embedding(dim, 1) for dim in field_dims
        ])

        # Deep部分
        input_dim = len(field_dims) * embed_dim
        deep_layers = []
        for hidden_dim in hidden_dims:
            deep_layers.append(nn.Linear(input_dim, hidden_dim))
            deep_layers.append(nn.ReLU())
            deep_layers.append(nn.BatchNorm1d(hidden_dim))
            deep_layers.append(nn.Dropout(0.3))
            input_dim = hidden_dim
        self.deep = nn.Sequential(*deep_layers)

        # 输出层
        self.output_layer = nn.Linear(hidden_dims[-1] + 1, 1)  # [-1]负索引取最后元素

    def forward(self, X):
        """
        X: [batch_size, num_fields]
        """
        # Embedding
        embeddings = []
        for i in range(X.shape[1]):
            embed = self.embedding[i](X[:, i])
            embeddings.append(embed)
        embeddings = torch.stack(embeddings, dim=1)  # torch.stack沿新维度拼接张量

        # FM一阶部分
        fm_first_order = 0
        for i in range(X.shape[1]):
            fm_first_order += self.fm_linear[i](X[:, i])

        # FM二阶部分
        square_of_sum = torch.sum(embeddings, dim=1) ** 2
        sum_of_square = torch.sum(embeddings ** 2, dim=1)
        fm_second_order = 0.5 * torch.sum(
            square_of_sum - sum_of_square, dim=1, keepdim=True
        )

        # FM完整输出(一阶 + 二阶)
        fm_output = fm_first_order + fm_second_order

        # Deep部分
        deep_input = embeddings.view(embeddings.size(0), -1)  # 重塑张量形状
        deep_output = self.deep(deep_input)

        # 融合FM和Deep
        concat = torch.cat([fm_output, deep_output], dim=1)  # torch.cat沿已有维度拼接张量
        output = self.output_layer(concat)

        return torch.sigmoid(output)

10.4.2 DIN(Deep Interest Network)

DIN使用注意力机制动态调整用户兴趣。

Python
class Attention(nn.Module):
    def __init__(self, embed_dim):
        super(Attention, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(embed_dim * 4, embed_dim),
            nn.ReLU(),
            nn.Linear(embed_dim, 1)
        )

    def forward(self, query, keys):
        """
        query: [batch_size, embed_dim]
        keys: [batch_size, seq_len, embed_dim]
        """
        # 扩展query
        query = query.unsqueeze(1).repeat(1, keys.size(1), 1)  # unsqueeze增加一个维度

        # 拼接(DIN原论文使用query, keys, query-keys, query*keys四组特征)
        concat = torch.cat([query, keys, query - keys, query * keys], dim=-1)

        # 计算注意力权重
        scores = self.fc(concat).squeeze(-1)  # squeeze压缩维度
        weights = torch.softmax(scores, dim=1)

        # 加权求和
        attended = torch.sum(weights.unsqueeze(-1) * keys, dim=1)

        return attended

class DIN(nn.Module):
    def __init__(self, item_dim, embed_dim, hidden_dims):
        super(DIN, self).__init__()

        # Embedding
        self.item_embedding = nn.Embedding(item_dim, embed_dim)

        # 注意力
        self.attention = Attention(embed_dim)

        # MLP
        input_dim = embed_dim * 2
        layers = []
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(input_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(0.3))
            input_dim = hidden_dim
        layers.append(nn.Linear(input_dim, 1))
        self.mlp = nn.Sequential(*layers)

    def forward(self, history_items, target_item):
        """
        history_items: [batch_size, seq_len]
        target_item: [batch_size]
        """
        # Embedding
        history_embeds = self.item_embedding(history_items)
        target_embed = self.item_embedding(target_item)

        # 注意力
        user_interest = self.attention(target_embed, history_embeds)

        # MLP
        concat = torch.cat([user_interest, target_embed], dim=1)
        output = self.mlp(concat)

        return torch.sigmoid(output)

10.5 实战案例

案例:电商排序系统

Python
import pandas as pd
import numpy as np
import torch
from sklearn.model_selection import train_test_split

# 1. 加载数据
data = pd.read_csv('ranking_data.csv')

# 2. 特征工程
def extract_features(row):
    """提取特征"""
    features = []

    # 用户特征
    features.append(row['user_age'])
    features.append(row['user_gender'])
    features.append(row['user_income_level'])

    # 物品特征
    features.append(row['item_price'])
    features.append(row['item_category'])
    features.append(row['item_rating'])

    # 交叉特征
    features.append(row['user_age'] * row['item_price'])
    features.append(row['user_income_level'] * row['item_category'])

    return features

# 3. 构建训练数据
X = np.array([extract_features(row) for _, row in data.iterrows()])  # np.array创建NumPy数组
y = data['click'].values

# 4. 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# 5. 训练模型
model = LogisticRegression(input_dim=X.shape[1])
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.FloatTensor(y_train).unsqueeze(1)

for epoch in range(100):
    optimizer.zero_grad()
    output = model(X_train_tensor)
    loss = criterion(output, y_train_tensor)
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 20 == 0:
        print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}")  # 将单元素张量转为Python数值

# 6. 评估
model.eval()  # eval()评估模式
with torch.no_grad():  # 禁用梯度计算,节省内存
    X_test_tensor = torch.FloatTensor(X_test)
    y_pred = model(X_test_tensor).numpy()
    auc_score = roc_auc_score(y_test, y_pred)
    print(f"Test AUC: {auc_score:.4f}")

# 7. 排序示例
def rank_items(model, user_features, item_features_list):
    """对物品排序"""
    scores = []
    for item_features in item_features_list:
        features = user_features + item_features
        x = torch.FloatTensor([features])
        score = model(x).item()
        scores.append(score)

    # 排序
    sorted_indices = np.argsort(scores)[::-1]
    return sorted_indices

# 测试
user_features = [25, 1, 2]  # 用户特征
item_features_list = [
    [100, 1, 4.5],  # 物品1特征
    [50, 2, 4.0],   # 物品2特征
    [200, 1, 4.8],  # 物品3特征
]

ranked_indices = rank_items(model, user_features, item_features_list)
print(f"排序结果: {ranked_indices}")

📝 本章小结

本章介绍了排序算法:

  1. ✅ 排序阶段的作用和挑战
  2. ✅ 逻辑回归模型
  3. ✅ GBDT模型
  4. ✅ 深度排序模型
  5. ✅ 实战案例

通过本章学习,你应该能够: - 理解排序阶段的重要性 - 实现LR和GBDT排序 - 实现深度排序模型 - 应用排序算法解决实际问题

🔗 下一步

下一章我们将学习在线学习,了解如何实时更新推荐模型。

继续学习: 11-在线学习.md

💡 思考题

  1. 排序和召回各有什么作用?如何配合?

    召回:快速筛出候选集(~千级),重召回率。排序:精确打分排序(TopK),重精度。配合架构:召回(ms级,千万→1000)→粗排(10ms,1000→200)→精排(50ms,200→50)→重排(多样性/业务规则→最终展示)。粷排模型越复杂,候选集越小。

  2. LR和GBDT各有什么优缺点?

    LR(逻辑回归):简单快速、可解释性强、易上线,但需手动特征交叉、表达能力弱。GBDT:自动特征组合、处理缺失值好、非线性强,但难处理高维稀疏特征、不易在线学习。经典组合:GBDT+LR(Facebook 2014,GBDT做特征变换,LR做最终打分)。现已被深度模型替代。

  3. 深度排序模型相比传统方法有什么优势?

    ①自动特征交叉(无需手动设计) ②Embedding处理高维稀疏特征 ③序列建模(捕捉用户行为演变) ④多任务联合优化 ⑤容量更大,开发更快。代表:DeepFM→DIN→DIEN→DCNv2。现代排序基本都是深度模型。

  4. 如何设计有效的排序特征?

    四类特征:①用户特征(画像/统计) ②物品特征(属性/统计) ③交叉特征(用户×物品历史交互) ④上下文特征(时间/设备/位置)。重要技巧:行为序列特征(DIN)、实时特征(最近30min行为)、序列位置特征(position bias去优)。特征质量>模型复杂度。

  5. 如何评估排序系统的效果?

    离线:AUC(区分能力,>0.7才有意义)、GAUC(分用户计算AUC再平均)、NDCG(排序质量)、校准度(Calibration)。在线:CTR、CVR、时长、GMV、用户留存。重要:离线AUC提升不一定线上有效,必须经A/B测试验证。GAUC比AUC更接近线上效果。

📚 参考资料

  1. "Learning to Rank for Information Retrieval" - Liu
  2. "From RankNet to LambdaRank to LambdaMART: An Overview" - Burges
  3. "Deep Interest Network for Click-Through Rate Prediction" - Zhou et al.
  4. "Ad Click Prediction: a View from the Trenches" - McMahan et al.
  5. XGBoost Documentation