跳转至

召回算法

召回算法架构

📖 章节导读

召回(Recall)是推荐系统的第一步,负责从海量物品中快速筛选出候选集。本章将介绍召回算法的基本原理、主流方法和实际应用。

🎯 学习目标

  • 理解召回阶段的作用和挑战
  • 掌握双塔模型
  • 掌握ANN检索技术
  • 了解多路召回策略
  • 能够实现高效的召回系统

9.1 召回概述

9.1.1 召回的作用

目标:从百万级到亿级物品中筛选出几百到几千个候选物品

要求: 1. 高召回率:尽可能覆盖用户可能感兴趣的物品 2. 低延迟:快速响应,通常要求<100ms 3. 高精度:召回的物品要尽可能相关

9.1.2 召回vs排序

维度 召回 排序
物品数量 海量→几千 几千→几百
计算复杂度
延迟要求 极高 中等
模型复杂度 相对简单 复杂
目标 覆盖率 准确率

9.1.3 召回策略

常见召回策略: 1. 协同过滤召回:基于用户或物品相似度 2. 内容召回:基于内容特征相似度 3. 热度召回:推荐热门物品 4. 深度召回:使用深度学习模型 5. 多路召回:融合多种召回结果

9.2 双塔模型

9.2.1 模型原理

双塔模型(Two-Tower Model)是经典的召回模型,分别学习用户和物品的表示,然后计算相似度。

模型结构:

Text Only
用户特征 → 用户塔 → 用户向量
物品特征 → 物品塔 → 物品向量
相似度计算(点积、余弦相似度)

9.2.2 PyTorch实现

Python
import torch
import torch.nn as nn

class TwoTowerModel(nn.Module):  # 继承nn.Module定义网络层
    def __init__(self, user_feature_dim, item_feature_dim, embed_dim, hidden_dims):
        super(TwoTowerModel, self).__init__()

        # 用户塔
        user_layers = []
        input_dim = user_feature_dim
        for hidden_dim in hidden_dims:
            user_layers.append(nn.Linear(input_dim, hidden_dim))
            user_layers.append(nn.ReLU())
            user_layers.append(nn.BatchNorm1d(hidden_dim))
            user_layers.append(nn.Dropout(0.3))
            input_dim = hidden_dim
        user_layers.append(nn.Linear(input_dim, embed_dim))
        self.user_tower = nn.Sequential(*user_layers)

        # 物品塔
        item_layers = []
        input_dim = item_feature_dim
        for hidden_dim in hidden_dims:
            item_layers.append(nn.Linear(input_dim, hidden_dim))
            item_layers.append(nn.ReLU())
            item_layers.append(nn.BatchNorm1d(hidden_dim))
            item_layers.append(nn.Dropout(0.3))
            input_dim = hidden_dim
        item_layers.append(nn.Linear(input_dim, embed_dim))
        self.item_tower = nn.Sequential(*item_layers)

    def forward(self, user_features, item_features):
        """
        user_features: [batch_size, user_feature_dim]
        item_features: [batch_size, item_feature_dim]
        """
        # 用户向量
        user_vector = self.user_tower(user_features)

        # 物品向量
        item_vector = self.item_tower(item_features)

        # 相似度(点积)
        similarity = torch.sum(user_vector * item_vector, dim=1, keepdim=True)

        return similarity

    def get_user_vector(self, user_features):
        """获取用户向量"""
        return self.user_tower(user_features)

    def get_item_vector(self, item_features):
        """获取物品向量"""
        return self.item_tower(item_features)

9.2.3 训练代码

Python
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# 准备数据
user_features = torch.randn(10000, 50)
item_features = torch.randn(10000, 50)
labels = torch.randint(0, 2, (10000, 1)).float()

dataset = TensorDataset(user_features, item_features, labels)
dataloader = DataLoader(dataset, batch_size=256, shuffle=True)  # DataLoader批量加载数据

# 初始化模型
model = TwoTowerModel(
    user_feature_dim=50,
    item_feature_dim=50,
    embed_dim=128,
    hidden_dims=[256, 128]
)

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练
for epoch in range(10):
    model.train()  # train()训练模式
    total_loss = 0
    for user_feat, item_feat, label in dataloader:
        optimizer.zero_grad()  # 清零梯度

        # 前向传播
        similarity = model(user_feat, item_feat)

        # 计算损失
        loss = criterion(similarity, label)

        # 反向传播
        loss.backward()  # 反向传播计算梯度
        optimizer.step()  # 更新参数

        total_loss += loss.item()  # 将单元素张量转为Python数值

    avg_loss = total_loss / len(dataloader)
    print(f"Epoch {epoch + 1}, Loss: {avg_loss:.4f}")

9.3 ANN检索

9.3.1 基本概念

ANN(Approximate Nearest Neighbor)近似最近邻搜索,用于快速找到与查询向量最相似的向量。

常用算法: 1. LSH(Locality Sensitive Hashing):局部敏感哈希 2. IVF(Inverted File Index):倒排索引 3. HNSW(Hierarchical Navigable Small World):分层导航小世界图 4. Faiss:Facebook开源的向量检索库

9.3.2 Faiss使用

Python
import faiss
import numpy as np

# 1. 构建索引
def build_index(vectors, index_type='IVFFlat'):
    """
    构建Faiss索引
    vectors: [n, d] 向量矩阵
    index_type: 索引类型
    """
    n, d = vectors.shape

    if index_type == 'IVFFlat':
        quantizer = faiss.IndexFlatL2(d)
        index = faiss.IndexIVFFlat(quantizer, d, 100)  # 100个聚类中心
        index.train(vectors)
        index.add(vectors)
    elif index_type == 'HNSW':
        index = faiss.IndexHNSWFlat(d, 32)  # M=32
        index.add(vectors)
    elif index_type == 'Flat':
        index = faiss.IndexFlatL2(d)
        index.add(vectors)

    return index

# 2. 搜索
def search(index, query, k=10):
    """
    搜索最近邻
    index: Faiss索引
    query: 查询向量 [d]
    k: 返回Top K
    """
    query = query.reshape(1, -1)  # 重塑张量形状
    distances, indices = index.search(query, k)
    return distances[0], indices[0]

# 3. 示例
# 生成随机向量
vectors = np.random.randn(100000, 128).astype('float32')

# 构建索引
index = build_index(vectors, index_type='IVFFlat')

# 搜索
query = np.random.randn(128).astype('float32')
distances, indices = search(index, query, k=10)

print(f"Top 10 nearest items: {indices}")
print(f"Distances: {distances}")

9.3.3 召回系统集成

Python
class ANNRecall:
    def __init__(self, model, item_features, index_type='IVFFlat'):
        """
        model: 双塔模型
        item_features: 物品特征
        index_type: 索引类型
        """
        self.model = model
        self.item_features = item_features

        # 预计算物品向量
        self.model.eval()
        with torch.no_grad():  # 禁用梯度计算,节省内存
            self.item_vectors = self.model.get_item_vector(
                torch.FloatTensor(item_features)
            ).numpy().astype('float32')

        # 构建索引
        self.index = build_index(self.item_vectors, index_type)

    def recall(self, user_features, top_k=100):
        """
        召回Top K物品
        """
        # 获取用户向量
        self.model.eval()
        with torch.no_grad():
            user_vector = self.model.get_user_vector(
                torch.FloatTensor(user_features)
            ).numpy().astype('float32')

        # 搜索
        distances, indices = search(self.index, user_vector, k=top_k)

        return indices

9.4 多路召回

9.4.1 召回策略融合

常见召回路: 1. 协同过滤召回:User-based CF, Item-based CF 2. 内容召回:基于内容相似度 3. 深度召回:双塔模型、深度匹配 4. 热度召回:热门物品 5. 实时召回:基于实时行为

9.4.2 融合策略

简单融合:

Python
def multi_recall(user_id, recall_strategies, weights):
    """
    多路召回融合
    recall_strategies: 召回策略列表
    weights: 各策略权重
    """
    all_items = {}

    for strategy, weight in zip(recall_strategies, weights):  # zip按位置配对
        items = strategy.recall(user_id)
        for item_id, score in items:
            if item_id not in all_items:
                all_items[item_id] = 0
            all_items[item_id] += weight * score

    # 排序
    sorted_items = sorted(all_items.items(), key=lambda x: x[1], reverse=True)  # lambda匿名函数

    return [item_id for item_id, score in sorted_items]

学习融合:

Python
class RecallFusion(nn.Module):
    def __init__(self, num_strategies, hidden_dim):
        super(RecallFusion, self).__init__()

        # 特征融合层
        self.fc1 = nn.Linear(num_strategies, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)
        self.relu = nn.ReLU()

    def forward(self, recall_scores):
        """
        recall_scores: [batch_size, num_strategies]
        """
        x = self.relu(self.fc1(recall_scores))
        score = self.fc2(x)
        return score

9.5 实战案例

案例:电商召回系统

Python
import pandas as pd
import numpy as np
import torch
import faiss

# 1. 加载数据
users = pd.read_csv('users.csv')
items = pd.read_csv('items.csv')
interactions = pd.read_csv('interactions.csv')

# 2. 特征工程
def extract_user_features(user_id):
    """提取用户特征"""
    user = users[users['user_id'] == user_id].iloc[0]
    features = [
        user['age'],
        user['gender'],
        user['income_level'],
        user['avg_spend'],
        user['active_days']
    ]
    return np.array(features)  # np.array创建NumPy数组

def extract_item_features(item_id):
    """提取物品特征"""
    item = items[items['item_id'] == item_id].iloc[0]
    features = [
        item['price'],
        item['category'],
        item['brand'],
        item['rating'],
        item['sales']
    ]
    return np.array(features)

# 3. 构建训练数据
train_data = []
for _, row in interactions.iterrows():
    user_feat = extract_user_features(row['user_id'])
    item_feat = extract_item_features(row['item_id'])
    label = row['click']
    train_data.append((user_feat, item_feat, label))

# 4. 训练双塔模型
user_features = torch.FloatTensor([d[0] for d in train_data])
item_features = torch.FloatTensor([d[1] for d in train_data])
labels = torch.FloatTensor([d[2] for d in train_data]).unsqueeze(1)  # unsqueeze增加一个维度

model = TwoTowerModel(
    user_feature_dim=5,
    item_feature_dim=5,
    embed_dim=128,
    hidden_dims=[256, 128]
)

criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 训练(简化)
for epoch in range(5):
    optimizer.zero_grad()
    similarity = model(user_features, item_features)
    loss = criterion(similarity, labels)
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}")

# 5. 构建召回系统
all_item_features = np.array([extract_item_features(i) for i in items['item_id']])
recall_system = ANNRecall(model, all_item_features, index_type='IVFFlat')

# 6. 召回示例
user_id = 1
user_feat = extract_user_features(user_id)
recalled_items = recall_system.recall(user_feat, top_k=100)

print(f"为用户{user_id}召回的物品: {recalled_items[:10]}")  # 切片操作,取前n个元素

📝 本章小结

本章介绍了召回算法:

  1. ✅ 召回阶段的作用和挑战
  2. ✅ 双塔模型
  3. ✅ ANN检索技术
  4. ✅ 多路召回策略
  5. ✅ 实战案例

通过本章学习,你应该能够: - 理解召回阶段的重要性 - 实现双塔召回模型 - 使用Faiss进行高效检索 - 设计多路召回策略 - 构建高效的召回系统

🔗 下一步

下一章我们将学习排序算法,了解如何对召回的候选集进行精准排序。

继续学习: 10-排序算法.md

💡 思考题

  1. 召回和排序各有什么作用?如何平衡?

    召回:从billion级物品中快速筛选几百-几千候选集,要求速度快+召回率高。排序:对候选集精确打分排序,要求精度高。平衡:召回决定上限(推荐池),排序决定质量。召回漏掉的排序救不回来,所以召回率是第一优先级。

  2. 双塔模型相比单塔模型有什么优势?

    单塔:用户和物品特征拼接后进模型,精度高但需遍历所有物品(O(N)推理)。双塔(DSSM):用户和物品分别编码为向量,可离线计算物品向量、在线ANN检索(O(logN))。优势:①推理速度快百万倍 ②物品向量可离线计算+索引 ③易于更新。缺点:无法建模用户-物品交叉特征。

  3. 如何选择合适的ANN索引类型?

    HNSW:召回率最高(>95%)、速度快,但内存大。IVF:平衡型,适合大规模。PQ:最省内存但精度低。IVF+PQ:大规模生产首选。选择原则:<100万物品用HNSW;100万-1亿用IVF+HNSW;>1亿用IVF+PQ。工具:FAISS/Milvus/Qdrant。

  4. 多路召回如何设计融合策略?

    多路:向量召回+热门召回+协同召回+实时召回+运营政策。融合策略:①简单去重取并集 ②按路配额(各路固定比例) ③加权融合(合并分数) ④级联融合(统一进粷排模型)。实践:统一进粷排最佳,各路召回不同量级配额以平衡多样性和准确性。

  5. 如何评估召回系统的效果?

    核心指标:Recall@K(召回K个候选中包含正样本的比例)、召回集的CTR/CVR(下游质量)、响应时间(P99<50ms)。评估方法:①离线回放(AUC/Recall) ②在线A/B测试(替换召回路线观察业务指标) ③召回覆盖率分析(各路的唯一贡献率,判断有无冗余)。

📚 参考资料

  1. "Two-Tower Neural Network Models for Document Ranking" - Huang et al.
  2. "Efficient Retrieval for Deep Learning Recommender Systems" - Yang et al.
  3. Faiss Documentation
  4. "Product Quantization for Nearest Neighbor Search" - Jegou et al.
  5. "Hierarchical Navigable Small World Graphs" - Malkov & Yashunin