01 - 数据工程与预处理¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
学习目标:掌握大模型训练的数据收集、清洗、去重、 tokenization 和高效数据加载技术。
目录¶
数据 pipeline 概述¶
1.1 完整数据流程¶
原始数据
│
▼
┌─────────────────┐
│ 数据收集 │ ← Common Crawl, GitHub, 书籍, 论文
│ (Data Collection)│
└────────┬────────┘
│
▼
┌─────────────────┐
│ 文本提取 │ ← HTML解析, PDF提取, 格式转换
│ (Text Extraction)│
└────────┬────────┘
│
▼
┌─────────────────┐
│ 质量过滤 │ ← 语言检测, 质量评分, 垃圾过滤
│ (Quality Filter) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 数据去重 │ ← MinHash, SimHash, 精确去重
│ (Deduplication) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 敏感信息过滤 │ ← PII检测, 毒性内容过滤
│ (PII Removal) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Tokenization │ ← BPE训练, 文本编码
│ (Tokenization) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 数据混合与打包 │ ← 多数据源混合, 序列打包
│ (Data Mixing) │
└────────┬────────┘
│
▼
训练数据
1.2 数据规模对比¶
| 模型 | 训练数据量 | 数据来源 | 发布年份 |
|---|---|---|---|
| GPT-3 | 300B tokens | Common Crawl, WebText, Books, Wikipedia | 2020 |
| Chinchilla | 1.4T tokens | 证明数据量与模型大小同等重要 | 2022 |
| LLaMA-2 | 2T tokens | 公开数据,去重过滤 | 2023 |
| LLaMA-3 | 15T+ tokens | 超大规模多语言语料 | 2024 |
| Qwen-2.5 | 18T+ tokens | 多语言网页、代码、数学、百科 | 2024 |
| DeepSeek-V3 | 14.8T tokens | 高质量多领域混合数据 | 2024 |
关键趋势:2024-2025 年的模型训练数据量已从万亿级迈向十万亿级,数据质量(而非单纯数量)成为决定模型能力的关键因素。Chinchilla 论文揭示的 Scaling Law 表明,数据量与模型参数量应同步增长。
数据收集与来源¶
2.1 常见数据源¶
class DataSources:
"""
大模型训练常见数据源
"""
SOURCES = {
# 网页数据
"common_crawl": {
"description": "网页爬取数据",
"volume": "数十PB原始数据",
"quality": "低-中,需要大量过滤",
"processing": "WET提取, 语言检测, 质量过滤"
},
# 代码数据
"github": {
"description": "开源代码仓库",
"volume": "数百TB",
"quality": "高,结构化",
"processing": "License过滤, 去重, 语言分类"
},
# 书籍数据
"books": {
"description": "书籍和文学作品",
"volume": "数十TB",
"quality": "高,长文本",
"processing": "版权检查, OCR纠错"
},
# 学术数据
"academic": {
"description": "论文和学术文献",
"volume": "数TB",
"quality": "高,专业",
"processing": "PDF提取, 公式处理"
},
# 对话数据
"conversation": {
"description": "对话和问答数据",
"volume": "数TB",
"quality": "中-高",
"processing": "隐私过滤, 质量筛选"
},
# 百科数据
"wikipedia": {
"description": "维基百科",
"volume": "数百GB",
"quality": "高,知识密集",
"processing": "结构化提取, 引用去除"
},
# 数学与推理数据(2024-2025 新增重要来源)
"math_reasoning": {
"description": "数学证明、推理题、逻辑题",
"volume": "数十GB",
"quality": "高,结构化",
"processing": "LaTeX解析, 答案验证"
}
}
# 开源数据集
OPEN_DATASETS = {
"The Pile": "825GB多样化英文文本 (EleutherAI)",
"C4": "清洗后的Common Crawl (Google)",
"OSCAR": "多语言Common Crawl",
"ROOTS": "BigScience多语言语料",
"RedPajama": "LLaMA训练数据开源复刻",
"RefinedWeb": "高质量网页数据 (Falcon)",
"FineWeb": "15T tokens 高质量网页数据 (HuggingFace, 2024)",
"Dolma": "3T tokens 多源混合语料 (AI2, 2024)",
}
2.2 Common Crawl 处理流程¶
import re
from bs4 import BeautifulSoup
class CommonCrawlProcessor:
"""
Common Crawl 数据处理完整流程
"""
def __init__(self, min_lang_score=0.8):
self.min_lang_score = min_lang_score
self.language_detector = None # 延迟加载 fastText 语言检测
self.quality_classifier = None # 延迟加载质量分类器
def extract_text_from_html(self, html_content):
"""
从HTML中提取正文文本
处理步骤:
1. 解析HTML结构
2. 移除脚本、样式等非正文标签
3. 提取纯文本并清理空白
"""
soup = BeautifulSoup(html_content, 'html.parser')
# 移除脚本和样式
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
# 提取文本
text = soup.get_text()
# 清理空白:逐行strip,过滤空行
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = '\n'.join(chunk for chunk in chunks if chunk)
return text
def detect_language(self, text):
"""
检测文本语言
使用 fastText 的 lid.176.bin 模型,支持176种语言识别。
Returns:
(language_code, confidence_score)
"""
import fasttext
if self.language_detector is None:
# 抑制fasttext的警告输出
fasttext.FastText.eprint = lambda *args: None
self.language_detector = fasttext.load_model('lid.176.bin')
# fasttext不支持换行符,需替换为空格
predictions = self.language_detector.predict(text.replace('\n', ' '), k=1)
lang = predictions[0][0].replace('__label__', '')
score = predictions[1][0]
return lang, float(score)
def quality_filter(self, text):
"""
多维度质量过滤
过滤条件:
1. 文本长度(太短无信息量,太长可能是日志/代码)
2. 符号比例(太多符号可能是垃圾/代码混入)
3. 重复行比例(高重复=低质量)
4. 停用词比例(检测无意义文本)
5. 平均行长度(太短行=列表/导航栏)
Returns:
(passed, reason)
"""
# 长度检查
if len(text) < 100 or len(text) > 100_000:
return False, "Length filter"
# 符号比例
symbol_ratio = len(re.findall(r'[^\w\s]', text)) / len(text)
if symbol_ratio > 0.5:
return False, "Symbol ratio filter"
# 重复行检测
lines = text.split('\n')
if len(lines) > 1:
unique_lines = set(lines)
if len(unique_lines) / len(lines) < 0.3:
return False, "Repetition filter"
# 单词比例(过滤无意义字符组合)
words = re.findall(r'\b\w+\b', text)
if len(words) > 0 and len(words) / max(len(text.split()), 1) < 0.5:
return False, "Word ratio filter"
# 平均行长度(太短=导航/列表)
avg_line_len = sum(len(line) for line in lines) / max(len(lines), 1)
if avg_line_len < 10:
return False, "Short line filter"
return True, "Passed"
def process_record(self, html_content, target_lang="en"):
"""
处理单条 Common Crawl 记录的完整流程
Args:
html_content: 原始HTML内容
target_lang: 目标语言代码
Returns:
处理后的文本,或 None(如果被过滤)
"""
# Step 1: 文本提取
text = self.extract_text_from_html(html_content)
if not text:
return None
# Step 2: 语言检测
lang, score = self.detect_language(text)
if lang != target_lang or score < self.min_lang_score:
return None
# Step 3: 质量过滤
passed, reason = self.quality_filter(text)
if not passed:
return None
return text
数据清洗与质量控制¶
3.1 质量评估指标¶
import torch
import re
from collections import Counter
class QualityMetrics:
"""
文本质量评估指标体系
包含多种互补的质量评估方法:
- 困惑度(Perplexity):衡量文本的"自然度"
- 可读性(Readability):衡量文本的阅读难度
- 连贯性(Coherence):衡量文本的语义一致性
"""
@staticmethod
def perplexity_score(text, language_model, tokenizer):
"""
使用语言模型计算困惑度
困惑度 = exp(交叉熵损失)
- 低困惑度(< 50):文本流畅自然
- 中困惑度(50-200):正常范围
- 高困惑度(> 200):可能是垃圾/乱码文本
Args:
text: 待评估文本
language_model: 预训练语言模型
tokenizer: 对应的tokenizer
"""
with torch.no_grad():
inputs = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
outputs = language_model(**inputs, labels=inputs['input_ids'])
perplexity = torch.exp(outputs.loss)
return perplexity.item()
@staticmethod
def readability_score(text):
"""
可读性评分(Flesch Reading Ease)
分数范围 0-100:
- 90-100: 非常容易(5年级水平)
- 60-70: 标准(8-9年级水平)
- 0-30: 非常困难(大学水平)
"""
import textstat
return textstat.flesch_reading_ease(text)
@staticmethod
def coherence_score(text):
"""
连贯性评分:基于相邻句子的词汇重叠度
原理:连贯的文本中,相邻句子通常共享部分词汇(代词、同义词等)。
使用 Jaccard 相似度衡量句子间的词汇重叠。
Returns:
0.0-1.0 之间的连贯性分数
"""
sentences = [s.strip() for s in text.split('.') if s.strip()]
if len(sentences) < 2:
return 0.0
overlap_scores = []
for i in range(len(sentences) - 1):
words1 = set(sentences[i].lower().split())
words2 = set(sentences[i + 1].lower().split())
if words1 and words2:
# Jaccard 相似度 = 交集 / 并集
overlap = len(words1 & words2) / len(words1 | words2)
overlap_scores.append(overlap)
return sum(overlap_scores) / len(overlap_scores) if overlap_scores else 0.0
class QualityClassifier:
"""
基于特征工程的质量分类器
在 GPT-3/4 等模型的训练数据清洗中,通常使用两类质量分类器:
1. 启发式规则过滤(如本类):快速、低成本
2. 基于模型的分类器(如 fastText):更准确、需要训练数据
GPT-3 使用的方法:用 Wikipedia 质量的文本作为正样本,
随机 Common Crawl 作为负样本,训练二分类器。
"""
def __init__(self):
self.features = [
'char_count', 'word_count', 'sentence_count',
'avg_word_length', 'symbol_ratio', 'uppercase_ratio',
'stopword_ratio', 'unique_word_ratio'
]
def extract_features(self, text):
"""
提取文本特征向量
Returns:
dict: 特征名到特征值的映射
"""
features = {}
# 基本统计
features['char_count'] = len(text)
words = text.split()
features['word_count'] = len(words)
features['sentence_count'] = len(re.findall(r'[.!?]+', text))
# 平均词长
features['avg_word_length'] = (
sum(len(w) for w in words) / len(words) if words else 0
)
# 符号比例
features['symbol_ratio'] = len(re.findall(r'[^\w\s]', text)) / max(len(text), 1)
# 大写比例
features['uppercase_ratio'] = sum(1 for c in text if c.isupper()) / max(len(text), 1)
# 停用词比例(英文停用词示例)
stopwords = {
'the', 'a', 'an', 'in', 'on', 'at', 'to', 'for', 'of',
'is', 'are', 'was', 'were', 'be', 'been', 'being',
'have', 'has', 'had', 'do', 'does', 'did',
}
word_set = set(w.lower() for w in words)
features['stopword_ratio'] = (
len(word_set & stopwords) / len(word_set) if word_set else 0
)
# 词汇多样性(Type-Token Ratio)
features['unique_word_ratio'] = len(word_set) / max(len(words), 1)
return features
def classify_quality(self, text, threshold=0.5):
"""
基于特征组合判断文本质量
这是一个简化版本。生产中通常训练一个 fastText 或 SVM 分类器。
"""
features = self.extract_features(text)
# 简单的加权评分
score = 0.0
# 词汇多样性高 → 质量好
if features['unique_word_ratio'] > 0.4:
score += 0.2
# 停用词比例适中 → 自然语言
if 0.1 < features['stopword_ratio'] < 0.5:
score += 0.2
# 符号比例低 → 非垃圾
if features['symbol_ratio'] < 0.2:
score += 0.2
# 平均词长适中 → 正常文本
if 3 < features['avg_word_length'] < 8:
score += 0.2
# 句子数量合理 → 有内容
if features['sentence_count'] > 2:
score += 0.2
return score >= threshold
3.2 PII(个人身份信息)检测与移除¶
import re
import spacy
class PIIDetector:
"""
PII检测与脱敏
在训练数据中移除PII是数据合规的关键步骤,涉及:
- GDPR(欧盟)、CCPA(加州)等隐私法规
- 防止模型记忆并泄露个人信息
- 常见PII类型:邮箱、电话、身份证号、信用卡号、IP地址、姓名
检测方法:
1. 正则表达式:模式固定的PII(邮箱、电话等)
2. NER模型:上下文相关的PII(人名、组织、地点)
"""
PII_PATTERNS = {
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'phone_us': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'phone_cn': r'\b1[3-9]\d{9}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b(?:\d[ -]*?){13,16}\b',
'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
'id_card_cn': r'\b\d{17}[\dXx]\b', # 中国身份证号
}
def __init__(self, use_ner=True):
"""
Args:
use_ner: 是否使用NER模型检测人名等(需要安装spacy模型)
"""
self.use_ner = use_ner
if use_ner:
try:
self.nlp = spacy.load('en_core_web_sm')
except OSError:
print("Warning: spacy model not found. NER detection disabled.")
self.nlp = None
def detect_pii(self, text):
"""
检测文本中的所有PII
Returns:
list of dict: 每个dict包含 type, value, start, end
"""
pii_found = []
# 正则检测
for pii_type, pattern in self.PII_PATTERNS.items():
for match in re.finditer(pattern, text):
pii_found.append({
'type': pii_type,
'value': match.group(),
'start': match.start(),
'end': match.end()
})
# NER检测(姓名、组织、地点)
if self.use_ner and self.nlp is not None:
doc = self.nlp(text)
for ent in doc.ents:
if ent.label_ in ['PERSON', 'ORG', 'GPE']:
pii_found.append({
'type': ent.label_.lower(),
'value': ent.text,
'start': ent.start_char,
'end': ent.end_char
})
return pii_found
def anonymize(self, text, pii_list=None):
"""
对PII进行脱敏处理
策略:从后往前替换,避免位置偏移
"""
if pii_list is None:
pii_list = self.detect_pii(text)
# 按位置降序排列(从后往前替换)
pii_list.sort(key=lambda x: x['start'], reverse=True)
anonymized = text
for pii in pii_list:
replacement = f"[{pii['type'].upper()}_REDACTED]"
anonymized = (
anonymized[:pii['start']] +
replacement +
anonymized[pii['end']:]
)
return anonymized
数据去重技术¶
为什么去重至关重要:研究表明(Lee et al., 2022 "Deduplicating Training Data Makes Language Models Better"),重复数据会导致: 1. 训练效率降低:模型在重复样本上浪费计算 2. 记忆效应加剧:重复数据使模型更容易记忆训练样本(隐私风险) 3. 性能下降:去重后训练的模型在下游任务上表现更好 4. 评估污染:训练集与测试集的重叠导致虚假高分
4.1 精确去重¶
import hashlib
class ExactDeduplication:
"""
精确去重:基于哈希的完全匹配
原理:对每条文本计算哈希值(MD5/SHA256),相同哈希=相同文本。
时间复杂度:O(N)(N为文档数)
空间复杂度:O(N)(存储哈希集合)
适用场景:大规模数据集的快速去重
"""
def __init__(self, hash_func='md5'):
self.seen_hashes = set()
self.hash_func = hash_func
self.duplicate_count = 0
def compute_hash(self, text):
"""计算文本哈希"""
h = hashlib.new(self.hash_func)
h.update(text.encode('utf-8'))
return h.hexdigest()
def is_duplicate(self, text):
"""
检查是否重复
Returns:
True if duplicate, False if new (and added to set)
"""
text_hash = self.compute_hash(text)
if text_hash in self.seen_hashes:
self.duplicate_count += 1
return True
self.seen_hashes.add(text_hash)
return False
def deduplicate_dataset(self, texts):
"""
对整个数据集去重
Args:
texts: 文本列表
Returns:
(unique_texts, duplicate_ratio)
"""
unique = []
for text in texts:
if not self.is_duplicate(text):
unique.append(text)
ratio = self.duplicate_count / max(len(texts), 1)
return unique, ratio
4.2 近似去重(MinHash + LSH)¶
import hashlib
from collections import defaultdict
class NearDeduplication:
"""
近似去重(MinHash + LSH)
核心思想:
1. 将文本分解为 k-gram shingles(滑动窗口词组)
2. 用 MinHash 将 shingle 集合压缩为固定长度签名
3. 用 LSH(局部敏感哈希)快速找到候选对
4. 对候选对计算精确 Jaccard 相似度验证
MinHash 原理:
Jaccard(A, B) = |A ∩ B| / |A ∪ B|
MinHash 签名近似保持 Jaccard 相似度:
P(min_hash(A) = min_hash(B)) ≈ Jaccard(A, B)
LSH 原理:
将签名分为 b 个 band,每个 band 有 r 行
如果两个签名在任一 band 完全相同,则为候选对
阈值 ≈ (1/b)^(1/r)
Args:
num_hashes: MinHash 签名长度(越大越精确,但越慢)
num_bands: LSH band 数量
"""
def __init__(self, num_hashes=128, num_bands=16):
self.num_hashes = num_hashes
self.num_bands = num_bands
self.rows_per_band = num_hashes // num_bands
# 每个 band 一个独立的哈希桶
self.buckets = [defaultdict(list) for _ in range(num_bands)]
self.doc_index = 0 # 文档编号(避免存储原始文本)
self._stored_shingles = {} # doc_id -> shingles
def get_shingles(self, text, k=5):
"""
获取 k-gram shingles
k=5 表示每5个连续词组成一个 shingle。
k 越大,去重粒度越细(更严格)。
"""
words = text.split()
shingles = set()
for i in range(len(words) - k + 1):
shingle = ' '.join(words[i:i + k])
shingles.add(shingle)
return shingles
def compute_minhash(self, shingles):
"""
计算 MinHash 签名
通过对每个 shingle 应用 num_hashes 个不同的哈希函数,
取每个哈希函数的最小值,构成签名向量。
"""
signature = []
for i in range(self.num_hashes):
min_hash = float('inf')
for shingle in shingles:
# 通过加盐模拟不同的哈希函数
hash_val = int(
hashlib.sha1(f"{shingle}:{i}".encode()).hexdigest(), 16
)
min_hash = min(min_hash, hash_val)
signature.append(min_hash)
return signature
def add_document(self, text):
"""
添加文档并检查是否近似重复
Returns:
(is_duplicate, best_similarity, duplicate_of_doc_id)
"""
shingles = self.get_shingles(text)
if not shingles:
return False, 0.0, None
signature = self.compute_minhash(shingles)
# LSH: 将签名分桶,收集候选对
candidate_ids = set()
for band_idx in range(self.num_bands):
start = band_idx * self.rows_per_band
end = start + self.rows_per_band
band_signature = tuple(signature[start:end])
bucket = self.buckets[band_idx]
if band_signature in bucket:
candidate_ids.update(bucket[band_signature])
bucket[band_signature].append(self.doc_index)
# 对候选对计算精确 Jaccard 相似度
best_sim = 0.0
best_match = None
for cand_id in candidate_ids:
# 注意:生产中应维护 shingles 存储而非重算
cand_shingles = self._stored_shingles.get(cand_id, set())
if cand_shingles:
sim = len(shingles & cand_shingles) / len(shingles | cand_shingles)
if sim > best_sim:
best_sim = sim
best_match = cand_id
# 存储当前文档的 shingles
self._stored_shingles[self.doc_index] = shingles
self.doc_index += 1
threshold = 0.8
return best_sim > threshold, best_sim, best_match
4.3 文档级去重(SimHash)¶
import hashlib
from collections import defaultdict
class DocumentDeduplication:
"""
文档级去重(SimHash)
SimHash 是一种局部敏感哈希(LSH)算法:
- 相似文档的 SimHash 指纹的汉明距离小
- 不相似文档的汉明距离大
- 通过比较汉明距离判断文档相似性
与 MinHash 的区别:
- MinHash: 估计 Jaccard 相似度(集合交并比)
- SimHash: 估计余弦相似度(向量夹角)
Args:
hashbits: 指纹位数(通常64位)
threshold: 汉明距离阈值(越小越严格,通常3-5)
"""
def __init__(self, hashbits=64, threshold=3):
self.hashbits = hashbits
self.threshold = threshold
self.fingerprints = [] # 存储所有已见文档的指纹
def compute_simhash(self, text):
"""
计算 SimHash 指纹
算法步骤:
1. 分词并统计词频
2. 对每个词计算哈希值
3. 根据哈希的每一位,加权累加到向量中
4. 向量正分量→1,负分量→0,生成指纹
"""
# 分词并统计词频
words = text.split()
word_freq = defaultdict(int)
for word in words:
word_freq[word] += 1
# 初始化累加向量
vector = [0] * self.hashbits
# 加权累加
for word, freq in word_freq.items():
hash_val = int(hashlib.md5(word.encode()).hexdigest(), 16)
for i in range(self.hashbits):
bit = (hash_val >> i) & 1
if bit:
vector[i] += freq # 正权重
else:
vector[i] -= freq # 负权重
# 生成指纹:正→1,负→0
fingerprint = 0
for i in range(self.hashbits):
if vector[i] > 0:
fingerprint |= (1 << i)
return fingerprint
def hamming_distance(self, hash1, hash2):
"""
计算汉明距离:两个等长二进制串中不同位的数量
"""
xor = hash1 ^ hash2
return bin(xor).count('1')
def is_duplicate(self, text):
"""
检查是否与已有文档重复
Returns:
(is_duplicate, best_distance)
"""
fingerprint = self.compute_simhash(text)
best_dist = self.hashbits # 最大可能距离
for seen_fp in self.fingerprints:
dist = self.hamming_distance(fingerprint, seen_fp)
best_dist = min(best_dist, dist)
if dist <= self.threshold:
return True, dist
self.fingerprints.append(fingerprint)
return False, best_dist
4.4 去重方法对比¶
去重方法对比
═══════════════════════════════════════════════════════════════════
方法 精确度 速度 内存 适用场景
──────────────────────────────────────────────────────────────────
精确哈希 完全匹配 O(N) O(N) 快速去重,第一轮过滤
MinHash+LSH 近似 O(N·H) O(N·H) 大规模近似去重(推荐)
SimHash 近似 O(N·L) O(N) 文档级粗粒度去重
Suffix Array 精确 O(N·L) O(N·L) 段落级精确去重
MinHash+LSH+SA 最优 较慢 较高 生产级多阶段去重
推荐的多阶段去重流程:
1. 精确哈希去重(快速移除完全重复)
2. MinHash+LSH 近似去重(移除高度相似文档)
3. Suffix Array 段落级去重(移除部分重复段落)
工具推荐:
- datasketch (Python): MinHash/LSH 的生产级实现
- text-dedup: 预训练数据去重工具包
- cc_net: Common Crawl 专用处理工具
═══════════════════════════════════════════════════════════════════
Tokenization 工程¶
5.1 BPE 训练¶
from collections import defaultdict
class BPETokenizerTrainer:
"""
BPE (Byte-Pair Encoding) 训练器
BPE 算法原理:
1. 初始化:将所有单词拆分为字符序列
2. 迭代:统计相邻 token 对的频率,合并最高频的 token 对
3. 重复直到达到目标词汇表大小
BPE 的优势:
- 平衡了词汇表大小和序列长度
- 能处理未登录词(OOV)
- 被广泛用于 GPT、LLaMA 等模型
Args:
vocab_size: 目标词汇表大小
"""
def __init__(self, vocab_size=32000):
self.vocab_size = vocab_size
self.vocab = set()
self.merges = []
def train(self, texts):
"""
训练 BPE tokenizer
Args:
texts: 训练文本列表
Returns:
self(支持链式调用)
"""
# Step 1: 初始化——将所有单词拆分为字符序列
word_freqs = defaultdict(int)
for text in texts:
for word in text.split():
# 添加结束标记 </w> 区分词边界
chars = tuple(list(word) + ['</w>'])
word_freqs[chars] += 1
# 初始化词汇表(所有出现过的字符)
self.vocab = set()
for word in word_freqs:
self.vocab.update(word)
# Step 2: 迭代合并
num_merges = self.vocab_size - len(self.vocab)
for i in range(num_merges):
# 统计相邻 token 对的频率
pairs = defaultdict(int)
for word, freq in word_freqs.items():
for j in range(len(word) - 1):
pairs[(word[j], word[j + 1])] += freq
if not pairs:
break
# 找到最高频 token 对
best_pair = max(pairs, key=pairs.get)
self.merges.append(best_pair)
# 在所有单词中合并该 token 对
new_word_freqs = defaultdict(int)
new_vocab = set()
for word, freq in word_freqs.items():
new_word = []
idx = 0
while idx < len(word):
if (idx < len(word) - 1
and (word[idx], word[idx + 1]) == best_pair):
new_word.append(word[idx] + word[idx + 1])
idx += 2
else:
new_word.append(word[idx])
idx += 1
new_word_freqs[tuple(new_word)] += freq
new_vocab.update(new_word)
word_freqs = new_word_freqs
self.vocab = new_vocab
if (i + 1) % 1000 == 0:
merged = best_pair[0] + best_pair[1]
print(f"Merge {i + 1}/{num_merges}: "
f"{best_pair} -> {merged}")
# 添加特殊 token
special_tokens = ['<pad>', '<unk>', '<s>', '</s>', '<mask>']
self.vocab.update(special_tokens)
return self
def encode(self, text):
"""
使用学到的 merge 规则编码文本
"""
words = text.split()
tokens = []
for word in words:
word_tokens = list(word) + ['</w>']
# 按顺序应用 merge 规则
for merge in self.merges:
new_tokens = []
i = 0
while i < len(word_tokens):
if (i < len(word_tokens) - 1
and (word_tokens[i], word_tokens[i + 1]) == merge):
new_tokens.append(merge[0] + merge[1])
i += 2
else:
new_tokens.append(word_tokens[i])
i += 1
word_tokens = new_tokens
tokens.extend(word_tokens)
return tokens
def decode(self, tokens):
"""
解码 token 序列为文本
"""
text = ''.join(tokens)
text = text.replace('</w>', ' ')
return text.strip()
5.2 使用 Hugging Face Tokenizers(生产级)¶
from tokenizers import Tokenizer, models, pre_tokenizers, decoders, trainers
class ModernTokenizerTraining:
"""
使用 HuggingFace Tokenizers 库训练生产级 Tokenizer
相比手写 BPE,HuggingFace Tokenizers 的优势:
- Rust 实现,速度快 10-100 倍
- 支持多种预分词策略
- 内置多种解码器
- 支持直接保存/加载为 JSON 格式
"""
@staticmethod
def train_bpe(corpus_files, vocab_size=32000, output_path="tokenizer.json"):
"""
训练 BPE tokenizer(字节级,支持所有 Unicode 字符)
Args:
corpus_files: 语料文件路径列表
vocab_size: 目标词汇表大小
output_path: 输出路径
"""
# 初始化 BPE 模型
tokenizer = Tokenizer(models.BPE())
# 字节级预分词器(GPT-2 风格)
tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(
add_prefix_space=False
)
# 配置训练器
trainer = trainers.BpeTrainer(
vocab_size=vocab_size,
special_tokens=["<pad>", "<unk>", "<s>", "</s>", "<mask>"],
min_frequency=2,
show_progress=True,
)
# 训练
tokenizer.train(files=corpus_files, trainer=trainer)
# 设置解码器
tokenizer.decoder = decoders.ByteLevel()
# 保存
tokenizer.save(output_path)
print(f"Tokenizer saved to {output_path}")
print(f"Vocabulary size: {tokenizer.get_vocab_size()}")
return tokenizer
@staticmethod
def train_sentencepiece(
corpus_files, vocab_size=32000,
model_prefix="spm", model_type='bpe'
):
"""
训练 SentencePiece tokenizer
SentencePiece 的优势:
- 直接处理 raw text,不需要预分词
- 对中文、日文等多语言支持更好
- 支持 BPE 和 Unigram 两种算法
- 被 LLaMA、T5、Gemini 等模型使用
Args:
corpus_files: 语料文件路径列表
vocab_size: 目标词汇表大小
model_prefix: 输出模型前缀
model_type: 'bpe' 或 'unigram'
"""
import sentencepiece as spm
import tempfile
# 合并所有文件
with tempfile.NamedTemporaryFile(
mode='w', delete=False, suffix='.txt', encoding='utf-8'
) as tmp:
for file in corpus_files:
with open(file, 'r', encoding='utf-8') as f:
tmp.write(f.read())
combined_file = tmp.name
# 训练
spm.SentencePieceTrainer.train(
input=combined_file,
model_prefix=model_prefix,
vocab_size=vocab_size,
model_type=model_type,
character_coverage=0.9995, # 覆盖99.95%的字符
num_threads=8,
split_digits=True, # 将数字拆分为单个数字token
allow_whitespace_only_pieces=True,
byte_fallback=True, # 未登录字符回退到字节级
unk_piece='<unk>',
bos_piece='<s>',
eos_piece='</s>',
pad_piece='<pad>'
)
print(f"SentencePiece model saved: {model_prefix}.model")
return f"{model_prefix}.model"
5.3 Tokenizer 选择指南¶
主流 Tokenizer 对比
═══════════════════════════════════════════════════════════════════
Tokenizer 算法 词汇表大小 使用模型
──────────────────────────────────────────────────────────────────
GPT-2 Tokenizer BPE 50,257 GPT-2, GPT-3
GPT-4 Tokenizer BPE 100,256 GPT-4, o1
LLaMA Tokenizer BPE(SPM) 32,000 LLaMA-1/2
LLaMA-3 Tokenizer BPE(SPM) 128,256 LLaMA-3(大幅扩充多语言)
Qwen Tokenizer BPE(SPM) 151,643 Qwen-2.5(中文优化)
Claude Tokenizer BPE ~65,000 Claude 系列
关键选择因素:
1. 多语言支持:LLaMA-3/Qwen 的大词汇表对中文更友好
2. 代码能力:代码 token 效率影响模型编程能力
3. 压缩率:越高 → 同等信息量下 token 数越少 → 训练/推理越快
═══════════════════════════════════════════════════════════════════
高效数据加载¶
6.1 内存映射与流式加载¶
import mmap
import json
import torch
from torch.utils.data import IterableDataset, DataLoader
class MemoryMappedDataset:
"""
内存映射数据集
使用 mmap 将文件映射到虚拟内存,按需加载。
适合数据集大于物理内存的场景(如TB级预训练数据)。
原理:
- mmap 创建虚拟内存映射,不实际加载数据
- 访问某条数据时,OS 自动将对应页加载到物理内存
- 配合索引文件实现 O(1) 随机访问
"""
def __init__(self, data_path, index_path):
# 打开数据文件并创建内存映射
self.file = open(data_path, 'rb')
self.mm = mmap.mmap(
self.file.fileno(), 0, access=mmap.ACCESS_READ
)
# 加载索引(记录每条数据的字节偏移)
with open(index_path, 'r') as f:
self.index = json.load(f)
def __getitem__(self, idx):
"""通过索引随机访问第 idx 条数据"""
start, end = self.index[idx]
self.mm.seek(start)
data = self.mm.read(end - start)
return json.loads(data.decode('utf-8'))
def __len__(self):
return len(self.index)
def __del__(self):
"""清理资源"""
if hasattr(self, 'mm') and self.mm:
self.mm.close()
if hasattr(self, 'file') and self.file:
self.file.close()
class StreamingDataset:
"""
流式数据集
使用生成器逐行读取,不将全部数据加载到内存。
适合超大规模数据集(TB级)的顺序遍历。
注意:IterableDataset 不支持随机访问和 shuffle。
分布式训练中需要确保每个 worker 读取不同的数据分片。
"""
def __init__(self, data_paths):
self.data_paths = data_paths
def __iter__(self):
for path in self.data_paths:
with open(path, 'r', encoding='utf-8') as f:
for line in f:
if line.strip():
yield json.loads(line)
6.2 DataLoader 优化与序列打包¶
import torch
from torch.utils.data import IterableDataset, DataLoader
class EfficientDataLoader:
"""
高效数据加载器配置
关键优化参数:
- num_workers: 多进程并行加载(通常设为 CPU 核心数的一半)
- pin_memory: 锁页内存,加速 CPU→GPU 数据传输
- prefetch_factor: 预取批次数,减少 GPU 等待
- persistent_workers: 保持 worker 进程存活,避免重复创建
"""
@staticmethod
def create_dataloader(dataset, batch_size, num_workers=4):
"""创建优化的 DataLoader(Map-style Dataset)"""
return DataLoader(
dataset,
batch_size=batch_size,
num_workers=num_workers,
pin_memory=True,
prefetch_factor=2 if num_workers > 0 else None,
persistent_workers=True if num_workers > 0 else False,
drop_last=True,
)
@staticmethod
def create_streaming_dataloader(dataset, batch_size):
"""流式 DataLoader(IterableDataset,通常不用多 worker)"""
return DataLoader(
dataset,
batch_size=batch_size,
num_workers=0,
pin_memory=True,
)
class PackedDataset(IterableDataset):
"""
序列打包数据集(Packing)
核心思想:将多个短序列拼接成一个长序列(max_length),
减少 padding 浪费,提高 GPU 利用率。
实测效果:
- 短序列为主的数据集:训练速度提升 2-4 倍
- 长序列为主的数据集:提升有限
注意事项:
- 需要在 attention mask 中正确处理序列边界
- 某些训练框架(如 Megatron-LM)原生支持 packing
"""
def __init__(self, dataset, max_length=2048, tokenizer=None):
self.dataset = dataset
self.max_length = max_length
self.tokenizer = tokenizer
def __iter__(self):
buffer = []
buffer_length = 0
for sample in self.dataset:
tokens = self.tokenizer.encode(sample['text'])
if buffer_length + len(tokens) > self.max_length:
if buffer:
# 添加 EOS token 分隔
packed = buffer + [self.tokenizer.eos_token_id]
# Padding 到 max_length
pad_len = self.max_length - len(packed)
packed = packed + [self.tokenizer.pad_token_id] * pad_len
yield {
'input_ids': torch.tensor(packed[:self.max_length]),
'labels': torch.tensor(packed[:self.max_length]),
}
buffer = tokens
buffer_length = len(tokens)
else:
buffer.extend(tokens)
buffer_length += len(tokens)
数据混合策略¶
7.1 多数据源混合¶
import random
class DataMixer:
"""
多数据源混合策略
数据混合是预训练的关键决策之一。不同的混合比例会显著影响模型能力:
- 代码比例高 → 编程能力强,但可能影响自然语言流畅度
- 学术比例高 → 推理能力强,但可能缺乏常识
- 网页比例高 → 知识面广,但噪声也多
LLaMA-3 的数据混合(参考):
- 网页文本: ~85%
- 代码: ~8%
- 学术: ~5%
- 其他: ~2%
"""
def __init__(self, datasets, weights, seed=42):
"""
Args:
datasets: 数据源字典 {name: iterable}
weights: 采样权重 {name: weight}(无需归一化)
"""
self.datasets = datasets
self.weights = weights
self.rng = random.Random(seed)
# 创建迭代器
self.iterators = {
name: iter(dataset) for name, dataset in datasets.items()
}
def sample(self):
"""
按权重采样一个数据源并获取一条数据
Returns:
(sample, source_name)
"""
names = list(self.weights.keys())
weights = [self.weights[name] for name in names]
# 加权随机选择数据源
chosen = self.rng.choices(names, weights=weights, k=1)[0]
try:
sample = next(self.iterators[chosen])
except StopIteration:
# 数据源耗尽,重新创建迭代器
self.iterators[chosen] = iter(self.datasets[chosen])
sample = next(self.iterators[chosen])
return sample, chosen
def create_mixed_iterator(self, total_samples):
"""
创建混合迭代器
Yields:
dict: 包含数据和来源标记的样本
"""
for _ in range(total_samples):
sample, source = self.sample()
sample['source'] = source
yield sample
# 数据混合示例配置
DATA_MIXING_CONFIGS = {
"general_purpose": {
"web": 0.60,
"code": 0.15,
"books": 0.10,
"academic": 0.10,
"conversation": 0.05
},
"code_focused": {
"code": 0.50,
"web": 0.30,
"books": 0.10,
"academic": 0.05,
"conversation": 0.05
},
"reasoning_focused": {
"academic": 0.30,
"math_reasoning": 0.20,
"code": 0.20,
"web": 0.20,
"books": 0.10
},
}
7.2 动态数据课程与上采样¶
import torch
from torch.utils.data import WeightedRandomSampler
class CurriculumLearning:
"""
课程学习(Curriculum Learning)
核心思想:按照从易到难的顺序组织训练数据,
类似人类学习过程中的课程安排。
实践中的课程策略:
1. 按文本长度:先短后长(短文本更容易学习基础模式)
2. 按文本质量:先高质量后低质量(先建立好的表示)
3. 按任务复杂度:先简单任务后复杂任务
注意:课程学习在 LLM 预训练中的效果仍有争议,
一些研究发现随机打乱的效果与课程学习相当。
"""
def __init__(self, datasets_by_difficulty):
"""
Args:
datasets_by_difficulty: 按难度排序的数据源列表
[(difficulty_level, dataset), ...]
difficulty_level: 0(最易)到 N(最难)
"""
self.datasets = datasets_by_difficulty
def get_current_data(self, training_progress):
"""
根据训练进度获取对应难度的数据
Args:
training_progress: 0.0 ~ 1.0
Returns:
当前阶段的数据集
"""
num_stages = len(self.datasets)
target_stage = int(training_progress * num_stages)
target_stage = min(target_stage, num_stages - 1)
return self.datasets[target_stage][1]
class UpsamplingStrategy:
"""
上采样策略:对高质量数据增加采样频率
实践中的应用:
- Wikipedia 等高质量数据通常上采样 2-5 倍
- 代码数据在非代码专用模型中也常上采样
- DeepSeek-Math 报告对数学数据进行了大幅上采样
"""
@staticmethod
def quality_based_upsampling(dataset, quality_scores, target_ratio=2.0):
"""
基于质量评分的上采样
Args:
dataset: 原始数据集
quality_scores: 每个样本的质量分数(0.0-1.0)
target_ratio: 最高质量数据的采样倍数
Returns:
WeightedRandomSampler
"""
weights = [
1.0 + (target_ratio - 1.0) * score
for score in quality_scores
]
sampler = WeightedRandomSampler(
weights, len(dataset), replacement=True
)
return sampler
@staticmethod
def domain_upsampling(datasets, sampling_multipliers):
"""
按领域上采样
Args:
datasets: {domain_name: dataset}
sampling_multipliers: {domain_name: multiplier}
例如 {"wikipedia": 3.0, "code": 2.0, "web": 1.0}
Returns:
混合后的数据列表
"""
mixed = []
for domain, dataset in datasets.items():
multiplier = sampling_multipliers.get(domain, 1.0)
data = list(dataset)
# 重复采样
repeats = int(multiplier)
for _ in range(repeats):
mixed.extend(data)
# 处理小数部分(随机采样补充)
remainder = multiplier - repeats
if remainder > 0:
import random
sample_size = int(len(data) * remainder)
mixed.extend(random.sample(data, min(sample_size, len(data))))
return mixed
总结¶
数据工程关键要点¶
1. 数据质量 > 数据数量
- LIMA 论文证明 1000 条高质量数据即可微调出优秀模型
- 垃圾数据会损害模型性能(Garbage In, Garbage Out)
- 2024-2025 趋势:更注重数据质量而非单纯扩大规模
2. 去重至关重要
- 重复数据导致过拟合和记忆效应
- 多阶段去重:精确 → 近似 → 段落级
- 生产中推荐使用 datasketch/text-dedup 工具
3. Tokenization 影响模型能力
- 词汇表大小影响模型效率和语言覆盖
- 中文/多语言需要更大的词汇表(128K+)
- LLaMA-3 从 32K 扩展到 128K 词汇表显著提升多语言能力
4. 数据混合需要策略
- 不同任务需要不同数据配比
- 高质量数据应适当上采样
- 课程学习可作为辅助策略
5. 高效加载是训练瓶颈
- 数据加载不能成为 GPU 等待的原因
- 内存映射和流式加载解决大数据问题
- 序列打包(Packing)减少 padding 浪费
推荐工具¶
| 工具 | 用途 | 特点 |
|---|---|---|
| HuggingFace Datasets | 数据集加载和处理 | 流式加载、内存映射 |
| Tokenizers | 快速 tokenizer 训练 | Rust 实现,极快 |
| SentencePiece | 多语言 tokenization | 支持中文/日文等 |
| datasketch | MinHash/LSH 去重 | 大规模近似去重 |
| text-dedup | 预训练数据去重 | 多算法集成 |
| Spark/Dask | 大规模数据处理 | 分布式处理 TB 级数据 |
| cc_net | Common Crawl 处理 | 专用 CC 工具链 |
下一步:02-训练基础设施 - 学习分布式训练、混合精度和训练优化技术。
练习题¶
思考题¶
-
数据质量 vs 数据数量:为什么 LIMA 论文只用了 1000 条高质量数据就能微调出优秀模型?这对数据工程实践有什么启示?
-
去重策略选择:在什么场景下应该使用精确去重,什么场景下应该使用近似去重(MinHash+LSH)?为什么生产环境推荐多阶段去重?
-
Tokenizer 词汇表大小:LLaMA-3 将词汇表从 32K 扩展到 128K,这对中文处理有什么影响?词汇表越大越好吗?
-
数据混合策略:如果训练一个代码能力强的模型,数据配比应该如何设计?如果训练一个通用对话模型呢?
-
序列打包(Packing):Packing 为什么能提升训练效率?在 attention mask 中需要特别注意什么?
代码实践¶
-
入门:实现一个简单的精确去重器,统计给定文本列表中的重复率。
-
进阶:使用 HuggingFace Tokenizers 库训练一个 BPE tokenizer,并在自定义语料上测试编码/解码效果。
-
高级:实现 MinHash+LSH 近似去重算法,对比其与精确去重在效率和准确性上的差异。
💡 参考答案
**思考题 1**:LIMA 论文(Zhou et al., 2023)证明了预训练模型已经学到了大量知识,SFT 的核心作用是"教模型如何使用已有知识",而非"注入新知识"。1000 条高质量、多样化的指令数据足以教会模型遵循指令的格式和风格。这启示我们:数据质量(准确性、多样性、格式规范)远比数量重要,低质量数据反而会损害模型性能(Garbage In, Garbage Out)。 **思考题 2**:精确去重适合第一轮快速过滤(O(N) 时间复杂度),能移除完全相同的文档;近似去重(MinHash+LSH)适合第二轮过滤,能检测高度相似但不完全相同的文档(如翻译、改写)。生产环境推荐多阶段:先精确去重(快)→ 再近似去重(准)→ 最后段落级去重(细粒度)。 **思考题 3**:128K 词汇表对中文的影响:(1) 大部分常用中文字/词可以被编码为单个 token,而非被拆分为多个字节级 token;(2) 编码效率提升 → 同等信息量下 token 数更少 → 训练和推理更快。但词汇表越大,embedding 层参数越多,模型体积增大。128K 是当前中文模型的经验上限,继续增大收益递减。 **思考题 4**:代码能力强的模型:代码 40-50%、网页 20-25%、学术 10-15%、数学推理 10-15%、其他 5-10%。通用对话模型:网页 50-60%、代码 15-20%、书籍 10%、学术 5-10%、对话 5-10%。关键原则:代码和推理数据对模型整体智能提升最显著。 **思考题 5**:Packing 将多个短序列拼接成一个长序列,减少 padding 浪费的算力,GPU 利用率可提升 2-4 倍。但 attention mask 中必须正确标记序列边界,防止不同样本之间的 token 互相 attend。Megatron-LM 等框架通过 position_ids 重置和自定义 attention mask 来处理。最后更新日期: 2026-04-21 适用版本: LLM 学习教程 v2026