召回算法¶
📖 章节导读¶
召回(Recall)是推荐系统的第一步,负责从海量物品中快速筛选出候选集。本章将介绍召回算法的基本原理、主流方法和实际应用。
🎯 学习目标¶
- 理解召回阶段的作用和挑战
- 掌握双塔模型
- 掌握ANN检索技术
- 了解多路召回策略
- 能够实现高效的召回系统
9.1 召回概述¶
9.1.1 召回的作用¶
目标:从百万级到亿级物品中筛选出几百到几千个候选物品
要求: 1. 高召回率:尽可能覆盖用户可能感兴趣的物品 2. 低延迟:快速响应,通常要求<100ms 3. 高精度:召回的物品要尽可能相关
9.1.2 召回vs排序¶
| 维度 | 召回 | 排序 |
|---|---|---|
| 物品数量 | 海量→几千 | 几千→几百 |
| 计算复杂度 | 高 | 中 |
| 延迟要求 | 极高 | 中等 |
| 模型复杂度 | 相对简单 | 复杂 |
| 目标 | 覆盖率 | 准确率 |
9.1.3 召回策略¶
常见召回策略: 1. 协同过滤召回:基于用户或物品相似度 2. 内容召回:基于内容特征相似度 3. 热度召回:推荐热门物品 4. 深度召回:使用深度学习模型 5. 多路召回:融合多种召回结果
9.2 双塔模型¶
9.2.1 模型原理¶
双塔模型(Two-Tower Model)是经典的召回模型,分别学习用户和物品的表示,然后计算相似度。
模型结构:
9.2.2 PyTorch实现¶
import torch
import torch.nn as nn
class TwoTowerModel(nn.Module): # 继承nn.Module定义网络层
def __init__(self, user_feature_dim, item_feature_dim, embed_dim, hidden_dims):
super(TwoTowerModel, self).__init__()
# 用户塔
user_layers = []
input_dim = user_feature_dim
for hidden_dim in hidden_dims:
user_layers.append(nn.Linear(input_dim, hidden_dim))
user_layers.append(nn.ReLU())
user_layers.append(nn.BatchNorm1d(hidden_dim))
user_layers.append(nn.Dropout(0.3))
input_dim = hidden_dim
user_layers.append(nn.Linear(input_dim, embed_dim))
self.user_tower = nn.Sequential(*user_layers)
# 物品塔
item_layers = []
input_dim = item_feature_dim
for hidden_dim in hidden_dims:
item_layers.append(nn.Linear(input_dim, hidden_dim))
item_layers.append(nn.ReLU())
item_layers.append(nn.BatchNorm1d(hidden_dim))
item_layers.append(nn.Dropout(0.3))
input_dim = hidden_dim
item_layers.append(nn.Linear(input_dim, embed_dim))
self.item_tower = nn.Sequential(*item_layers)
def forward(self, user_features, item_features):
"""
user_features: [batch_size, user_feature_dim]
item_features: [batch_size, item_feature_dim]
"""
# 用户向量
user_vector = self.user_tower(user_features)
# 物品向量
item_vector = self.item_tower(item_features)
# 相似度(点积)
similarity = torch.sum(user_vector * item_vector, dim=1, keepdim=True)
return similarity
def get_user_vector(self, user_features):
"""获取用户向量"""
return self.user_tower(user_features)
def get_item_vector(self, item_features):
"""获取物品向量"""
return self.item_tower(item_features)
9.2.3 训练代码¶
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
# 准备数据
user_features = torch.randn(10000, 50)
item_features = torch.randn(10000, 50)
labels = torch.randint(0, 2, (10000, 1)).float()
dataset = TensorDataset(user_features, item_features, labels)
dataloader = DataLoader(dataset, batch_size=256, shuffle=True) # DataLoader批量加载数据
# 初始化模型
model = TwoTowerModel(
user_feature_dim=50,
item_feature_dim=50,
embed_dim=128,
hidden_dims=[256, 128]
)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 训练
for epoch in range(10):
model.train() # train()训练模式
total_loss = 0
for user_feat, item_feat, label in dataloader:
optimizer.zero_grad() # 清零梯度
# 前向传播
similarity = model(user_feat, item_feat)
# 计算损失
loss = criterion(similarity, label)
# 反向传播
loss.backward() # 反向传播计算梯度
optimizer.step() # 更新参数
total_loss += loss.item() # 将单元素张量转为Python数值
avg_loss = total_loss / len(dataloader)
print(f"Epoch {epoch + 1}, Loss: {avg_loss:.4f}")
9.3 ANN检索¶
9.3.1 基本概念¶
ANN(Approximate Nearest Neighbor)近似最近邻搜索,用于快速找到与查询向量最相似的向量。
常用算法: 1. LSH(Locality Sensitive Hashing):局部敏感哈希 2. IVF(Inverted File Index):倒排索引 3. HNSW(Hierarchical Navigable Small World):分层导航小世界图 4. Faiss:Facebook开源的向量检索库
9.3.2 Faiss使用¶
import faiss
import numpy as np
# 1. 构建索引
def build_index(vectors, index_type='IVFFlat'):
"""
构建Faiss索引
vectors: [n, d] 向量矩阵
index_type: 索引类型
"""
n, d = vectors.shape
if index_type == 'IVFFlat':
quantizer = faiss.IndexFlatL2(d)
index = faiss.IndexIVFFlat(quantizer, d, 100) # 100个聚类中心
index.train(vectors)
index.add(vectors)
elif index_type == 'HNSW':
index = faiss.IndexHNSWFlat(d, 32) # M=32
index.add(vectors)
elif index_type == 'Flat':
index = faiss.IndexFlatL2(d)
index.add(vectors)
return index
# 2. 搜索
def search(index, query, k=10):
"""
搜索最近邻
index: Faiss索引
query: 查询向量 [d]
k: 返回Top K
"""
query = query.reshape(1, -1) # 重塑张量形状
distances, indices = index.search(query, k)
return distances[0], indices[0]
# 3. 示例
# 生成随机向量
vectors = np.random.randn(100000, 128).astype('float32')
# 构建索引
index = build_index(vectors, index_type='IVFFlat')
# 搜索
query = np.random.randn(128).astype('float32')
distances, indices = search(index, query, k=10)
print(f"Top 10 nearest items: {indices}")
print(f"Distances: {distances}")
9.3.3 召回系统集成¶
class ANNRecall:
def __init__(self, model, item_features, index_type='IVFFlat'):
"""
model: 双塔模型
item_features: 物品特征
index_type: 索引类型
"""
self.model = model
self.item_features = item_features
# 预计算物品向量
self.model.eval()
with torch.no_grad(): # 禁用梯度计算,节省内存
self.item_vectors = self.model.get_item_vector(
torch.FloatTensor(item_features)
).numpy().astype('float32')
# 构建索引
self.index = build_index(self.item_vectors, index_type)
def recall(self, user_features, top_k=100):
"""
召回Top K物品
"""
# 获取用户向量
self.model.eval()
with torch.no_grad():
user_vector = self.model.get_user_vector(
torch.FloatTensor(user_features)
).numpy().astype('float32')
# 搜索
distances, indices = search(self.index, user_vector, k=top_k)
return indices
9.4 多路召回¶
9.4.1 召回策略融合¶
常见召回路: 1. 协同过滤召回:User-based CF, Item-based CF 2. 内容召回:基于内容相似度 3. 深度召回:双塔模型、深度匹配 4. 热度召回:热门物品 5. 实时召回:基于实时行为
9.4.2 融合策略¶
简单融合:
def multi_recall(user_id, recall_strategies, weights):
"""
多路召回融合
recall_strategies: 召回策略列表
weights: 各策略权重
"""
all_items = {}
for strategy, weight in zip(recall_strategies, weights): # zip按位置配对
items = strategy.recall(user_id)
for item_id, score in items:
if item_id not in all_items:
all_items[item_id] = 0
all_items[item_id] += weight * score
# 排序
sorted_items = sorted(all_items.items(), key=lambda x: x[1], reverse=True) # lambda匿名函数
return [item_id for item_id, score in sorted_items]
学习融合:
class RecallFusion(nn.Module):
def __init__(self, num_strategies, hidden_dim):
super(RecallFusion, self).__init__()
# 特征融合层
self.fc1 = nn.Linear(num_strategies, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, 1)
self.relu = nn.ReLU()
def forward(self, recall_scores):
"""
recall_scores: [batch_size, num_strategies]
"""
x = self.relu(self.fc1(recall_scores))
score = self.fc2(x)
return score
9.5 实战案例¶
案例:电商召回系统¶
import pandas as pd
import numpy as np
import torch
import faiss
# 1. 加载数据
users = pd.read_csv('users.csv')
items = pd.read_csv('items.csv')
interactions = pd.read_csv('interactions.csv')
# 2. 特征工程
def extract_user_features(user_id):
"""提取用户特征"""
user = users[users['user_id'] == user_id].iloc[0]
features = [
user['age'],
user['gender'],
user['income_level'],
user['avg_spend'],
user['active_days']
]
return np.array(features) # np.array创建NumPy数组
def extract_item_features(item_id):
"""提取物品特征"""
item = items[items['item_id'] == item_id].iloc[0]
features = [
item['price'],
item['category'],
item['brand'],
item['rating'],
item['sales']
]
return np.array(features)
# 3. 构建训练数据
train_data = []
for _, row in interactions.iterrows():
user_feat = extract_user_features(row['user_id'])
item_feat = extract_item_features(row['item_id'])
label = row['click']
train_data.append((user_feat, item_feat, label))
# 4. 训练双塔模型
user_features = torch.FloatTensor([d[0] for d in train_data])
item_features = torch.FloatTensor([d[1] for d in train_data])
labels = torch.FloatTensor([d[2] for d in train_data]).unsqueeze(1) # unsqueeze增加一个维度
model = TwoTowerModel(
user_feature_dim=5,
item_feature_dim=5,
embed_dim=128,
hidden_dims=[256, 128]
)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 训练(简化)
for epoch in range(5):
optimizer.zero_grad()
similarity = model(user_features, item_features)
loss = criterion(similarity, labels)
loss.backward()
optimizer.step()
print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}")
# 5. 构建召回系统
all_item_features = np.array([extract_item_features(i) for i in items['item_id']])
recall_system = ANNRecall(model, all_item_features, index_type='IVFFlat')
# 6. 召回示例
user_id = 1
user_feat = extract_user_features(user_id)
recalled_items = recall_system.recall(user_feat, top_k=100)
print(f"为用户{user_id}召回的物品: {recalled_items[:10]}") # 切片操作,取前n个元素
📝 本章小结¶
本章介绍了召回算法:
- ✅ 召回阶段的作用和挑战
- ✅ 双塔模型
- ✅ ANN检索技术
- ✅ 多路召回策略
- ✅ 实战案例
通过本章学习,你应该能够: - 理解召回阶段的重要性 - 实现双塔召回模型 - 使用Faiss进行高效检索 - 设计多路召回策略 - 构建高效的召回系统
🔗 下一步¶
下一章我们将学习排序算法,了解如何对召回的候选集进行精准排序。
继续学习: 10-排序算法.md
💡 思考题¶
-
召回和排序各有什么作用?如何平衡?
召回:从billion级物品中快速筛选几百-几千候选集,要求速度快+召回率高。排序:对候选集精确打分排序,要求精度高。平衡:召回决定上限(推荐池),排序决定质量。召回漏掉的排序救不回来,所以召回率是第一优先级。
-
双塔模型相比单塔模型有什么优势?
单塔:用户和物品特征拼接后进模型,精度高但需遍历所有物品(O(N)推理)。双塔(DSSM):用户和物品分别编码为向量,可离线计算物品向量、在线ANN检索(O(logN))。优势:①推理速度快百万倍 ②物品向量可离线计算+索引 ③易于更新。缺点:无法建模用户-物品交叉特征。
-
如何选择合适的ANN索引类型?
HNSW:召回率最高(>95%)、速度快,但内存大。IVF:平衡型,适合大规模。PQ:最省内存但精度低。IVF+PQ:大规模生产首选。选择原则:<100万物品用HNSW;100万-1亿用IVF+HNSW;>1亿用IVF+PQ。工具:FAISS/Milvus/Qdrant。
-
多路召回如何设计融合策略?
多路:向量召回+热门召回+协同召回+实时召回+运营政策。融合策略:①简单去重取并集 ②按路配额(各路固定比例) ③加权融合(合并分数) ④级联融合(统一进粷排模型)。实践:统一进粷排最佳,各路召回不同量级配额以平衡多样性和准确性。
-
如何评估召回系统的效果?
核心指标:Recall@K(召回K个候选中包含正样本的比例)、召回集的CTR/CVR(下游质量)、响应时间(P99<50ms)。评估方法:①离线回放(AUC/Recall) ②在线A/B测试(替换召回路线观察业务指标) ③召回覆盖率分析(各路的唯一贡献率,判断有无冗余)。
📚 参考资料¶
- "Two-Tower Neural Network Models for Document Ranking" - Huang et al.
- "Efficient Retrieval for Deep Learning Recommender Systems" - Yang et al.
- Faiss Documentation
- "Product Quantization for Nearest Neighbor Search" - Jegou et al.
- "Hierarchical Navigable Small World Graphs" - Malkov & Yashunin
