跳转至

01 - 数据工程与预处理

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

学习目标:掌握大模型训练的数据收集、清洗、去重、 tokenization 和高效数据加载技术。


目录

  1. 数据 pipeline 概述
  2. 数据收集与来源
  3. 数据清洗与质量控制
  4. 数据去重技术
  5. Tokenization 工程
  6. 高效数据加载
  7. 数据混合策略

数据 pipeline 概述

1.1 完整数据流程

Text Only
原始数据
┌─────────────────┐
│   数据收集       │  ← Common Crawl, GitHub, 书籍, 论文
│  (Data Collection)│
└────────┬────────┘
┌─────────────────┐
│   文本提取       │  ← HTML解析, PDF提取, 格式转换
│ (Text Extraction)│
└────────┬────────┘
┌─────────────────┐
│   质量过滤       │  ← 语言检测, 质量评分, 垃圾过滤
│ (Quality Filter) │
└────────┬────────┘
┌─────────────────┐
│   数据去重       │  ← MinHash, SimHash, 精确去重
│  (Deduplication) │
└────────┬────────┘
┌─────────────────┐
│   敏感信息过滤   │  ← PII检测, 毒性内容过滤
│   (PII Removal)  │
└────────┬────────┘
┌─────────────────┐
│   Tokenization  │  ← BPE训练, 文本编码
│  (Tokenization)  │
└────────┬────────┘
┌─────────────────┐
│   数据混合与打包 │  ← 多数据源混合, 序列打包
│  (Data Mixing)   │
└────────┬────────┘
    训练数据

1.2 数据规模对比

模型 训练数据量 数据来源 发布年份
GPT-3 300B tokens Common Crawl, WebText, Books, Wikipedia 2020
Chinchilla 1.4T tokens 证明数据量与模型大小同等重要 2022
LLaMA-2 2T tokens 公开数据,去重过滤 2023
LLaMA-3 15T+ tokens 超大规模多语言语料 2024
Qwen-2.5 18T+ tokens 多语言网页、代码、数学、百科 2024
DeepSeek-V3 14.8T tokens 高质量多领域混合数据 2024

关键趋势:2024-2025 年的模型训练数据量已从万亿级迈向十万亿级,数据质量(而非单纯数量)成为决定模型能力的关键因素。Chinchilla 论文揭示的 Scaling Law 表明,数据量与模型参数量应同步增长。


数据收集与来源

2.1 常见数据源

Python
class DataSources:
    """
    大模型训练常见数据源
    """

    SOURCES = {
        # 网页数据
        "common_crawl": {
            "description": "网页爬取数据",
            "volume": "数十PB原始数据",
            "quality": "低-中,需要大量过滤",
            "processing": "WET提取, 语言检测, 质量过滤"
        },

        # 代码数据
        "github": {
            "description": "开源代码仓库",
            "volume": "数百TB",
            "quality": "高,结构化",
            "processing": "License过滤, 去重, 语言分类"
        },

        # 书籍数据
        "books": {
            "description": "书籍和文学作品",
            "volume": "数十TB",
            "quality": "高,长文本",
            "processing": "版权检查, OCR纠错"
        },

        # 学术数据
        "academic": {
            "description": "论文和学术文献",
            "volume": "数TB",
            "quality": "高,专业",
            "processing": "PDF提取, 公式处理"
        },

        # 对话数据
        "conversation": {
            "description": "对话和问答数据",
            "volume": "数TB",
            "quality": "中-高",
            "processing": "隐私过滤, 质量筛选"
        },

        # 百科数据
        "wikipedia": {
            "description": "维基百科",
            "volume": "数百GB",
            "quality": "高,知识密集",
            "processing": "结构化提取, 引用去除"
        },

        # 数学与推理数据(2024-2025 新增重要来源)
        "math_reasoning": {
            "description": "数学证明、推理题、逻辑题",
            "volume": "数十GB",
            "quality": "高,结构化",
            "processing": "LaTeX解析, 答案验证"
        }
    }

# 开源数据集
OPEN_DATASETS = {
    "The Pile": "825GB多样化英文文本 (EleutherAI)",
    "C4": "清洗后的Common Crawl (Google)",
    "OSCAR": "多语言Common Crawl",
    "ROOTS": "BigScience多语言语料",
    "RedPajama": "LLaMA训练数据开源复刻",
    "RefinedWeb": "高质量网页数据 (Falcon)",
    "FineWeb": "15T tokens 高质量网页数据 (HuggingFace, 2024)",
    "Dolma": "3T tokens 多源混合语料 (AI2, 2024)",
}

2.2 Common Crawl 处理流程

Python
import re
from bs4 import BeautifulSoup


class CommonCrawlProcessor:
    """
    Common Crawl 数据处理完整流程
    """

    def __init__(self, min_lang_score=0.8):
        self.min_lang_score = min_lang_score
        self.language_detector = None  # 延迟加载 fastText 语言检测
        self.quality_classifier = None  # 延迟加载质量分类器

    def extract_text_from_html(self, html_content):
        """
        从HTML中提取正文文本

        处理步骤:
        1. 解析HTML结构
        2. 移除脚本、样式等非正文标签
        3. 提取纯文本并清理空白
        """
        soup = BeautifulSoup(html_content, 'html.parser')

        # 移除脚本和样式
        for tag in soup(["script", "style", "nav", "footer", "header"]):
            tag.decompose()

        # 提取文本
        text = soup.get_text()

        # 清理空白:逐行strip,过滤空行
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = '\n'.join(chunk for chunk in chunks if chunk)

        return text

    def detect_language(self, text):
        """
        检测文本语言

        使用 fastText 的 lid.176.bin 模型,支持176种语言识别。
        Returns:
            (language_code, confidence_score)
        """
        import fasttext

        if self.language_detector is None:
            # 抑制fasttext的警告输出
            fasttext.FastText.eprint = lambda *args: None
            self.language_detector = fasttext.load_model('lid.176.bin')

        # fasttext不支持换行符,需替换为空格
        predictions = self.language_detector.predict(text.replace('\n', ' '), k=1)
        lang = predictions[0][0].replace('__label__', '')
        score = predictions[1][0]

        return lang, float(score)

    def quality_filter(self, text):
        """
        多维度质量过滤

        过滤条件:
        1. 文本长度(太短无信息量,太长可能是日志/代码)
        2. 符号比例(太多符号可能是垃圾/代码混入)
        3. 重复行比例(高重复=低质量)
        4. 停用词比例(检测无意义文本)
        5. 平均行长度(太短行=列表/导航栏)

        Returns:
            (passed, reason)
        """
        # 长度检查
        if len(text) < 100 or len(text) > 100_000:
            return False, "Length filter"

        # 符号比例
        symbol_ratio = len(re.findall(r'[^\w\s]', text)) / len(text)
        if symbol_ratio > 0.5:
            return False, "Symbol ratio filter"

        # 重复行检测
        lines = text.split('\n')
        if len(lines) > 1:
            unique_lines = set(lines)
            if len(unique_lines) / len(lines) < 0.3:
                return False, "Repetition filter"

        # 单词比例(过滤无意义字符组合)
        words = re.findall(r'\b\w+\b', text)
        if len(words) > 0 and len(words) / max(len(text.split()), 1) < 0.5:
            return False, "Word ratio filter"

        # 平均行长度(太短=导航/列表)
        avg_line_len = sum(len(line) for line in lines) / max(len(lines), 1)
        if avg_line_len < 10:
            return False, "Short line filter"

        return True, "Passed"

    def process_record(self, html_content, target_lang="en"):
        """
        处理单条 Common Crawl 记录的完整流程

        Args:
            html_content: 原始HTML内容
            target_lang: 目标语言代码

        Returns:
            处理后的文本,或 None(如果被过滤)
        """
        # Step 1: 文本提取
        text = self.extract_text_from_html(html_content)
        if not text:
            return None

        # Step 2: 语言检测
        lang, score = self.detect_language(text)
        if lang != target_lang or score < self.min_lang_score:
            return None

        # Step 3: 质量过滤
        passed, reason = self.quality_filter(text)
        if not passed:
            return None

        return text

数据清洗与质量控制

3.1 质量评估指标

Python
import torch
import re
from collections import Counter


class QualityMetrics:
    """
    文本质量评估指标体系

    包含多种互补的质量评估方法:
    - 困惑度(Perplexity):衡量文本的"自然度"
    - 可读性(Readability):衡量文本的阅读难度
    - 连贯性(Coherence):衡量文本的语义一致性
    """

    @staticmethod
    def perplexity_score(text, language_model, tokenizer):
        """
        使用语言模型计算困惑度

        困惑度 = exp(交叉熵损失)

        - 低困惑度(< 50):文本流畅自然
        - 中困惑度(50-200):正常范围
        - 高困惑度(> 200):可能是垃圾/乱码文本

        Args:
            text: 待评估文本
            language_model: 预训练语言模型
            tokenizer: 对应的tokenizer
        """
        with torch.no_grad():
            inputs = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
            outputs = language_model(**inputs, labels=inputs['input_ids'])
            perplexity = torch.exp(outputs.loss)

        return perplexity.item()

    @staticmethod
    def readability_score(text):
        """
        可读性评分(Flesch Reading Ease)

        分数范围 0-100:
        - 90-100: 非常容易(5年级水平)
        - 60-70: 标准(8-9年级水平)
        - 0-30: 非常困难(大学水平)
        """
        import textstat
        return textstat.flesch_reading_ease(text)

    @staticmethod
    def coherence_score(text):
        """
        连贯性评分:基于相邻句子的词汇重叠度

        原理:连贯的文本中,相邻句子通常共享部分词汇(代词、同义词等)。
        使用 Jaccard 相似度衡量句子间的词汇重叠。

        Returns:
            0.0-1.0 之间的连贯性分数
        """
        sentences = [s.strip() for s in text.split('.') if s.strip()]
        if len(sentences) < 2:
            return 0.0

        overlap_scores = []
        for i in range(len(sentences) - 1):
            words1 = set(sentences[i].lower().split())
            words2 = set(sentences[i + 1].lower().split())

            if words1 and words2:
                # Jaccard 相似度 = 交集 / 并集
                overlap = len(words1 & words2) / len(words1 | words2)
                overlap_scores.append(overlap)

        return sum(overlap_scores) / len(overlap_scores) if overlap_scores else 0.0


class QualityClassifier:
    """
    基于特征工程的质量分类器

    在 GPT-3/4 等模型的训练数据清洗中,通常使用两类质量分类器:
    1. 启发式规则过滤(如本类):快速、低成本
    2. 基于模型的分类器(如 fastText):更准确、需要训练数据

    GPT-3 使用的方法:用 Wikipedia 质量的文本作为正样本,
    随机 Common Crawl 作为负样本,训练二分类器。
    """

    def __init__(self):
        self.features = [
            'char_count', 'word_count', 'sentence_count',
            'avg_word_length', 'symbol_ratio', 'uppercase_ratio',
            'stopword_ratio', 'unique_word_ratio'
        ]

    def extract_features(self, text):
        """
        提取文本特征向量

        Returns:
            dict: 特征名到特征值的映射
        """
        features = {}

        # 基本统计
        features['char_count'] = len(text)
        words = text.split()
        features['word_count'] = len(words)
        features['sentence_count'] = len(re.findall(r'[.!?]+', text))

        # 平均词长
        features['avg_word_length'] = (
            sum(len(w) for w in words) / len(words) if words else 0
        )

        # 符号比例
        features['symbol_ratio'] = len(re.findall(r'[^\w\s]', text)) / max(len(text), 1)

        # 大写比例
        features['uppercase_ratio'] = sum(1 for c in text if c.isupper()) / max(len(text), 1)

        # 停用词比例(英文停用词示例)
        stopwords = {
            'the', 'a', 'an', 'in', 'on', 'at', 'to', 'for', 'of',
            'is', 'are', 'was', 'were', 'be', 'been', 'being',
            'have', 'has', 'had', 'do', 'does', 'did',
        }
        word_set = set(w.lower() for w in words)
        features['stopword_ratio'] = (
            len(word_set & stopwords) / len(word_set) if word_set else 0
        )

        # 词汇多样性(Type-Token Ratio)
        features['unique_word_ratio'] = len(word_set) / max(len(words), 1)

        return features

    def classify_quality(self, text, threshold=0.5):
        """
        基于特征组合判断文本质量

        这是一个简化版本。生产中通常训练一个 fastText 或 SVM 分类器。
        """
        features = self.extract_features(text)

        # 简单的加权评分
        score = 0.0

        # 词汇多样性高 → 质量好
        if features['unique_word_ratio'] > 0.4:
            score += 0.2

        # 停用词比例适中 → 自然语言
        if 0.1 < features['stopword_ratio'] < 0.5:
            score += 0.2

        # 符号比例低 → 非垃圾
        if features['symbol_ratio'] < 0.2:
            score += 0.2

        # 平均词长适中 → 正常文本
        if 3 < features['avg_word_length'] < 8:
            score += 0.2

        # 句子数量合理 → 有内容
        if features['sentence_count'] > 2:
            score += 0.2

        return score >= threshold

3.2 PII(个人身份信息)检测与移除

Python
import re
import spacy


class PIIDetector:
    """
    PII检测与脱敏

    在训练数据中移除PII是数据合规的关键步骤,涉及:
    - GDPR(欧盟)、CCPA(加州)等隐私法规
    - 防止模型记忆并泄露个人信息
    - 常见PII类型:邮箱、电话、身份证号、信用卡号、IP地址、姓名

    检测方法:
    1. 正则表达式:模式固定的PII(邮箱、电话等)
    2. NER模型:上下文相关的PII(人名、组织、地点)
    """

    PII_PATTERNS = {
        'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
        'phone_us': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        'phone_cn': r'\b1[3-9]\d{9}\b',
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
        'credit_card': r'\b(?:\d[ -]*?){13,16}\b',
        'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
        'id_card_cn': r'\b\d{17}[\dXx]\b',  # 中国身份证号
    }

    def __init__(self, use_ner=True):
        """
        Args:
            use_ner: 是否使用NER模型检测人名等(需要安装spacy模型)
        """
        self.use_ner = use_ner
        if use_ner:
            try:
                self.nlp = spacy.load('en_core_web_sm')
            except OSError:
                print("Warning: spacy model not found. NER detection disabled.")
                self.nlp = None

    def detect_pii(self, text):
        """
        检测文本中的所有PII

        Returns:
            list of dict: 每个dict包含 type, value, start, end
        """
        pii_found = []

        # 正则检测
        for pii_type, pattern in self.PII_PATTERNS.items():
            for match in re.finditer(pattern, text):
                pii_found.append({
                    'type': pii_type,
                    'value': match.group(),
                    'start': match.start(),
                    'end': match.end()
                })

        # NER检测(姓名、组织、地点)
        if self.use_ner and self.nlp is not None:
            doc = self.nlp(text)
            for ent in doc.ents:
                if ent.label_ in ['PERSON', 'ORG', 'GPE']:
                    pii_found.append({
                        'type': ent.label_.lower(),
                        'value': ent.text,
                        'start': ent.start_char,
                        'end': ent.end_char
                    })

        return pii_found

    def anonymize(self, text, pii_list=None):
        """
        对PII进行脱敏处理

        策略:从后往前替换,避免位置偏移
        """
        if pii_list is None:
            pii_list = self.detect_pii(text)

        # 按位置降序排列(从后往前替换)
        pii_list.sort(key=lambda x: x['start'], reverse=True)

        anonymized = text
        for pii in pii_list:
            replacement = f"[{pii['type'].upper()}_REDACTED]"
            anonymized = (
                anonymized[:pii['start']] +
                replacement +
                anonymized[pii['end']:]
            )

        return anonymized

数据去重技术

为什么去重至关重要:研究表明(Lee et al., 2022 "Deduplicating Training Data Makes Language Models Better"),重复数据会导致: 1. 训练效率降低:模型在重复样本上浪费计算 2. 记忆效应加剧:重复数据使模型更容易记忆训练样本(隐私风险) 3. 性能下降:去重后训练的模型在下游任务上表现更好 4. 评估污染:训练集与测试集的重叠导致虚假高分

4.1 精确去重

Python
import hashlib


class ExactDeduplication:
    """
    精确去重:基于哈希的完全匹配

    原理:对每条文本计算哈希值(MD5/SHA256),相同哈希=相同文本。

    时间复杂度:O(N)(N为文档数)
    空间复杂度:O(N)(存储哈希集合)

    适用场景:大规模数据集的快速去重
    """

    def __init__(self, hash_func='md5'):
        self.seen_hashes = set()
        self.hash_func = hash_func
        self.duplicate_count = 0

    def compute_hash(self, text):
        """计算文本哈希"""
        h = hashlib.new(self.hash_func)
        h.update(text.encode('utf-8'))
        return h.hexdigest()

    def is_duplicate(self, text):
        """
        检查是否重复

        Returns:
            True if duplicate, False if new (and added to set)
        """
        text_hash = self.compute_hash(text)
        if text_hash in self.seen_hashes:
            self.duplicate_count += 1
            return True
        self.seen_hashes.add(text_hash)
        return False

    def deduplicate_dataset(self, texts):
        """
        对整个数据集去重

        Args:
            texts: 文本列表
        Returns:
            (unique_texts, duplicate_ratio)
        """
        unique = []
        for text in texts:
            if not self.is_duplicate(text):
                unique.append(text)

        ratio = self.duplicate_count / max(len(texts), 1)
        return unique, ratio

4.2 近似去重(MinHash + LSH)

Python
import hashlib
from collections import defaultdict


class NearDeduplication:
    """
    近似去重(MinHash + LSH)

    核心思想:
    1. 将文本分解为 k-gram shingles(滑动窗口词组)
    2. 用 MinHash 将 shingle 集合压缩为固定长度签名
    3. 用 LSH(局部敏感哈希)快速找到候选对
    4. 对候选对计算精确 Jaccard 相似度验证

    MinHash 原理:
    Jaccard(A, B) = |A ∩ B| / |A ∪ B|
    MinHash 签名近似保持 Jaccard 相似度:
    P(min_hash(A) = min_hash(B)) ≈ Jaccard(A, B)

    LSH 原理:
    将签名分为 b 个 band,每个 band 有 r 行
    如果两个签名在任一 band 完全相同,则为候选对
    阈值 ≈ (1/b)^(1/r)

    Args:
        num_hashes: MinHash 签名长度(越大越精确,但越慢)
        num_bands: LSH band 数量
    """

    def __init__(self, num_hashes=128, num_bands=16):
        self.num_hashes = num_hashes
        self.num_bands = num_bands
        self.rows_per_band = num_hashes // num_bands
        # 每个 band 一个独立的哈希桶
        self.buckets = [defaultdict(list) for _ in range(num_bands)]
        self.doc_index = 0  # 文档编号(避免存储原始文本)
        self._stored_shingles = {}  # doc_id -> shingles

    def get_shingles(self, text, k=5):
        """
        获取 k-gram shingles

        k=5 表示每5个连续词组成一个 shingle。
        k 越大,去重粒度越细(更严格)。
        """
        words = text.split()
        shingles = set()
        for i in range(len(words) - k + 1):
            shingle = ' '.join(words[i:i + k])
            shingles.add(shingle)
        return shingles

    def compute_minhash(self, shingles):
        """
        计算 MinHash 签名

        通过对每个 shingle 应用 num_hashes 个不同的哈希函数,
        取每个哈希函数的最小值,构成签名向量。
        """
        signature = []
        for i in range(self.num_hashes):
            min_hash = float('inf')
            for shingle in shingles:
                # 通过加盐模拟不同的哈希函数
                hash_val = int(
                    hashlib.sha1(f"{shingle}:{i}".encode()).hexdigest(), 16
                )
                min_hash = min(min_hash, hash_val)
            signature.append(min_hash)
        return signature

    def add_document(self, text):
        """
        添加文档并检查是否近似重复

        Returns:
            (is_duplicate, best_similarity, duplicate_of_doc_id)
        """
        shingles = self.get_shingles(text)
        if not shingles:
            return False, 0.0, None

        signature = self.compute_minhash(shingles)

        # LSH: 将签名分桶,收集候选对
        candidate_ids = set()
        for band_idx in range(self.num_bands):
            start = band_idx * self.rows_per_band
            end = start + self.rows_per_band
            band_signature = tuple(signature[start:end])

            bucket = self.buckets[band_idx]
            if band_signature in bucket:
                candidate_ids.update(bucket[band_signature])
            bucket[band_signature].append(self.doc_index)

        # 对候选对计算精确 Jaccard 相似度
        best_sim = 0.0
        best_match = None
        for cand_id in candidate_ids:
            # 注意:生产中应维护 shingles 存储而非重算
            cand_shingles = self._stored_shingles.get(cand_id, set())
            if cand_shingles:
                sim = len(shingles & cand_shingles) / len(shingles | cand_shingles)
                if sim > best_sim:
                    best_sim = sim
                    best_match = cand_id

        # 存储当前文档的 shingles
        self._stored_shingles[self.doc_index] = shingles
        self.doc_index += 1

        threshold = 0.8
        return best_sim > threshold, best_sim, best_match

4.3 文档级去重(SimHash)

Python
import hashlib
from collections import defaultdict


class DocumentDeduplication:
    """
    文档级去重(SimHash)

    SimHash 是一种局部敏感哈希(LSH)算法:
    - 相似文档的 SimHash 指纹的汉明距离小
    - 不相似文档的汉明距离大
    - 通过比较汉明距离判断文档相似性

    与 MinHash 的区别:
    - MinHash: 估计 Jaccard 相似度(集合交并比)
    - SimHash: 估计余弦相似度(向量夹角)

    Args:
        hashbits: 指纹位数(通常64位)
        threshold: 汉明距离阈值(越小越严格,通常3-5)
    """

    def __init__(self, hashbits=64, threshold=3):
        self.hashbits = hashbits
        self.threshold = threshold
        self.fingerprints = []  # 存储所有已见文档的指纹

    def compute_simhash(self, text):
        """
        计算 SimHash 指纹

        算法步骤:
        1. 分词并统计词频
        2. 对每个词计算哈希值
        3. 根据哈希的每一位,加权累加到向量中
        4. 向量正分量→1,负分量→0,生成指纹
        """
        # 分词并统计词频
        words = text.split()
        word_freq = defaultdict(int)
        for word in words:
            word_freq[word] += 1

        # 初始化累加向量
        vector = [0] * self.hashbits

        # 加权累加
        for word, freq in word_freq.items():
            hash_val = int(hashlib.md5(word.encode()).hexdigest(), 16)

            for i in range(self.hashbits):
                bit = (hash_val >> i) & 1
                if bit:
                    vector[i] += freq  # 正权重
                else:
                    vector[i] -= freq  # 负权重

        # 生成指纹:正→1,负→0
        fingerprint = 0
        for i in range(self.hashbits):
            if vector[i] > 0:
                fingerprint |= (1 << i)

        return fingerprint

    def hamming_distance(self, hash1, hash2):
        """
        计算汉明距离:两个等长二进制串中不同位的数量
        """
        xor = hash1 ^ hash2
        return bin(xor).count('1')

    def is_duplicate(self, text):
        """
        检查是否与已有文档重复

        Returns:
            (is_duplicate, best_distance)
        """
        fingerprint = self.compute_simhash(text)

        best_dist = self.hashbits  # 最大可能距离
        for seen_fp in self.fingerprints:
            dist = self.hamming_distance(fingerprint, seen_fp)
            best_dist = min(best_dist, dist)
            if dist <= self.threshold:
                return True, dist

        self.fingerprints.append(fingerprint)
        return False, best_dist

4.4 去重方法对比

Text Only
去重方法对比
═══════════════════════════════════════════════════════════════════

方法            精确度    速度      内存      适用场景
──────────────────────────────────────────────────────────────────
精确哈希         完全匹配  O(N)     O(N)     快速去重,第一轮过滤
MinHash+LSH     近似      O(N·H)   O(N·H)   大规模近似去重(推荐)
SimHash         近似      O(N·L)   O(N)     文档级粗粒度去重
Suffix Array    精确      O(N·L)   O(N·L)   段落级精确去重
MinHash+LSH+SA  最优      较慢     较高     生产级多阶段去重

推荐的多阶段去重流程:
1. 精确哈希去重(快速移除完全重复)
2. MinHash+LSH 近似去重(移除高度相似文档)
3. Suffix Array 段落级去重(移除部分重复段落)

工具推荐:
- datasketch (Python): MinHash/LSH 的生产级实现
- text-dedup: 预训练数据去重工具包
- cc_net: Common Crawl 专用处理工具
═══════════════════════════════════════════════════════════════════

Tokenization 工程

5.1 BPE 训练

Python
from collections import defaultdict


class BPETokenizerTrainer:
    """
    BPE (Byte-Pair Encoding) 训练器

    BPE 算法原理:
    1. 初始化:将所有单词拆分为字符序列
    2. 迭代:统计相邻 token 对的频率,合并最高频的 token 对
    3. 重复直到达到目标词汇表大小

    BPE 的优势:
    - 平衡了词汇表大小和序列长度
    - 能处理未登录词(OOV)
    - 被广泛用于 GPT、LLaMA 等模型

    Args:
        vocab_size: 目标词汇表大小
    """

    def __init__(self, vocab_size=32000):
        self.vocab_size = vocab_size
        self.vocab = set()
        self.merges = []

    def train(self, texts):
        """
        训练 BPE tokenizer

        Args:
            texts: 训练文本列表

        Returns:
            self(支持链式调用)
        """
        # Step 1: 初始化——将所有单词拆分为字符序列
        word_freqs = defaultdict(int)
        for text in texts:
            for word in text.split():
                # 添加结束标记 </w> 区分词边界
                chars = tuple(list(word) + ['</w>'])
                word_freqs[chars] += 1

        # 初始化词汇表(所有出现过的字符)
        self.vocab = set()
        for word in word_freqs:
            self.vocab.update(word)

        # Step 2: 迭代合并
        num_merges = self.vocab_size - len(self.vocab)

        for i in range(num_merges):
            # 统计相邻 token 对的频率
            pairs = defaultdict(int)
            for word, freq in word_freqs.items():
                for j in range(len(word) - 1):
                    pairs[(word[j], word[j + 1])] += freq

            if not pairs:
                break

            # 找到最高频 token 对
            best_pair = max(pairs, key=pairs.get)
            self.merges.append(best_pair)

            # 在所有单词中合并该 token 对
            new_word_freqs = defaultdict(int)
            new_vocab = set()

            for word, freq in word_freqs.items():
                new_word = []
                idx = 0
                while idx < len(word):
                    if (idx < len(word) - 1
                            and (word[idx], word[idx + 1]) == best_pair):
                        new_word.append(word[idx] + word[idx + 1])
                        idx += 2
                    else:
                        new_word.append(word[idx])
                        idx += 1

                new_word_freqs[tuple(new_word)] += freq
                new_vocab.update(new_word)

            word_freqs = new_word_freqs
            self.vocab = new_vocab

            if (i + 1) % 1000 == 0:
                merged = best_pair[0] + best_pair[1]
                print(f"Merge {i + 1}/{num_merges}: "
                      f"{best_pair} -> {merged}")

        # 添加特殊 token
        special_tokens = ['<pad>', '<unk>', '<s>', '</s>', '<mask>']
        self.vocab.update(special_tokens)

        return self

    def encode(self, text):
        """
        使用学到的 merge 规则编码文本
        """
        words = text.split()
        tokens = []

        for word in words:
            word_tokens = list(word) + ['</w>']

            # 按顺序应用 merge 规则
            for merge in self.merges:
                new_tokens = []
                i = 0
                while i < len(word_tokens):
                    if (i < len(word_tokens) - 1
                            and (word_tokens[i], word_tokens[i + 1]) == merge):
                        new_tokens.append(merge[0] + merge[1])
                        i += 2
                    else:
                        new_tokens.append(word_tokens[i])
                        i += 1
                word_tokens = new_tokens

            tokens.extend(word_tokens)

        return tokens

    def decode(self, tokens):
        """
        解码 token 序列为文本
        """
        text = ''.join(tokens)
        text = text.replace('</w>', ' ')
        return text.strip()

5.2 使用 Hugging Face Tokenizers(生产级)

Python
from tokenizers import Tokenizer, models, pre_tokenizers, decoders, trainers


class ModernTokenizerTraining:
    """
    使用 HuggingFace Tokenizers 库训练生产级 Tokenizer

    相比手写 BPE,HuggingFace Tokenizers 的优势:
    - Rust 实现,速度快 10-100 倍
    - 支持多种预分词策略
    - 内置多种解码器
    - 支持直接保存/加载为 JSON 格式
    """

    @staticmethod
    def train_bpe(corpus_files, vocab_size=32000, output_path="tokenizer.json"):
        """
        训练 BPE tokenizer(字节级,支持所有 Unicode 字符)

        Args:
            corpus_files: 语料文件路径列表
            vocab_size: 目标词汇表大小
            output_path: 输出路径
        """
        # 初始化 BPE 模型
        tokenizer = Tokenizer(models.BPE())

        # 字节级预分词器(GPT-2 风格)
        tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(
            add_prefix_space=False
        )

        # 配置训练器
        trainer = trainers.BpeTrainer(
            vocab_size=vocab_size,
            special_tokens=["<pad>", "<unk>", "<s>", "</s>", "<mask>"],
            min_frequency=2,
            show_progress=True,
        )

        # 训练
        tokenizer.train(files=corpus_files, trainer=trainer)

        # 设置解码器
        tokenizer.decoder = decoders.ByteLevel()

        # 保存
        tokenizer.save(output_path)
        print(f"Tokenizer saved to {output_path}")
        print(f"Vocabulary size: {tokenizer.get_vocab_size()}")

        return tokenizer

    @staticmethod
    def train_sentencepiece(
        corpus_files, vocab_size=32000,
        model_prefix="spm", model_type='bpe'
    ):
        """
        训练 SentencePiece tokenizer

        SentencePiece 的优势:
        - 直接处理 raw text,不需要预分词
        - 对中文、日文等多语言支持更好
        - 支持 BPE 和 Unigram 两种算法
        - 被 LLaMA、T5、Gemini 等模型使用

        Args:
            corpus_files: 语料文件路径列表
            vocab_size: 目标词汇表大小
            model_prefix: 输出模型前缀
            model_type: 'bpe' 或 'unigram'
        """
        import sentencepiece as spm
        import tempfile

        # 合并所有文件
        with tempfile.NamedTemporaryFile(
            mode='w', delete=False, suffix='.txt', encoding='utf-8'
        ) as tmp:
            for file in corpus_files:
                with open(file, 'r', encoding='utf-8') as f:
                    tmp.write(f.read())
            combined_file = tmp.name

        # 训练
        spm.SentencePieceTrainer.train(
            input=combined_file,
            model_prefix=model_prefix,
            vocab_size=vocab_size,
            model_type=model_type,
            character_coverage=0.9995,  # 覆盖99.95%的字符
            num_threads=8,
            split_digits=True,  # 将数字拆分为单个数字token
            allow_whitespace_only_pieces=True,
            byte_fallback=True,  # 未登录字符回退到字节级
            unk_piece='<unk>',
            bos_piece='<s>',
            eos_piece='</s>',
            pad_piece='<pad>'
        )

        print(f"SentencePiece model saved: {model_prefix}.model")
        return f"{model_prefix}.model"

5.3 Tokenizer 选择指南

Text Only
主流 Tokenizer 对比
═══════════════════════════════════════════════════════════════════

Tokenizer          算法      词汇表大小   使用模型
──────────────────────────────────────────────────────────────────
GPT-2 Tokenizer    BPE       50,257     GPT-2, GPT-3
GPT-4 Tokenizer    BPE       100,256    GPT-4, o1
LLaMA Tokenizer    BPE(SPM)  32,000     LLaMA-1/2
LLaMA-3 Tokenizer  BPE(SPM)  128,256    LLaMA-3(大幅扩充多语言)
Qwen Tokenizer     BPE(SPM)  151,643    Qwen-2.5(中文优化)
Claude Tokenizer   BPE       ~65,000    Claude 系列

关键选择因素:
1. 多语言支持:LLaMA-3/Qwen 的大词汇表对中文更友好
2. 代码能力:代码 token 效率影响模型编程能力
3. 压缩率:越高 → 同等信息量下 token 数越少 → 训练/推理越快
═══════════════════════════════════════════════════════════════════

高效数据加载

6.1 内存映射与流式加载

Python
import mmap
import json
import torch
from torch.utils.data import IterableDataset, DataLoader


class MemoryMappedDataset:
    """
    内存映射数据集

    使用 mmap 将文件映射到虚拟内存,按需加载。
    适合数据集大于物理内存的场景(如TB级预训练数据)。

    原理:
    - mmap 创建虚拟内存映射,不实际加载数据
    - 访问某条数据时,OS 自动将对应页加载到物理内存
    - 配合索引文件实现 O(1) 随机访问
    """

    def __init__(self, data_path, index_path):
        # 打开数据文件并创建内存映射
        self.file = open(data_path, 'rb')
        self.mm = mmap.mmap(
            self.file.fileno(), 0, access=mmap.ACCESS_READ
        )

        # 加载索引(记录每条数据的字节偏移)
        with open(index_path, 'r') as f:
            self.index = json.load(f)

    def __getitem__(self, idx):
        """通过索引随机访问第 idx 条数据"""
        start, end = self.index[idx]
        self.mm.seek(start)
        data = self.mm.read(end - start)
        return json.loads(data.decode('utf-8'))

    def __len__(self):
        return len(self.index)

    def __del__(self):
        """清理资源"""
        if hasattr(self, 'mm') and self.mm:
            self.mm.close()
        if hasattr(self, 'file') and self.file:
            self.file.close()


class StreamingDataset:
    """
    流式数据集

    使用生成器逐行读取,不将全部数据加载到内存。
    适合超大规模数据集(TB级)的顺序遍历。

    注意:IterableDataset 不支持随机访问和 shuffle。
    分布式训练中需要确保每个 worker 读取不同的数据分片。
    """

    def __init__(self, data_paths):
        self.data_paths = data_paths

    def __iter__(self):
        for path in self.data_paths:
            with open(path, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        yield json.loads(line)

6.2 DataLoader 优化与序列打包

Python
import torch
from torch.utils.data import IterableDataset, DataLoader


class EfficientDataLoader:
    """
    高效数据加载器配置

    关键优化参数:
    - num_workers: 多进程并行加载(通常设为 CPU 核心数的一半)
    - pin_memory: 锁页内存,加速 CPU→GPU 数据传输
    - prefetch_factor: 预取批次数,减少 GPU 等待
    - persistent_workers: 保持 worker 进程存活,避免重复创建
    """

    @staticmethod
    def create_dataloader(dataset, batch_size, num_workers=4):
        """创建优化的 DataLoader(Map-style Dataset)"""
        return DataLoader(
            dataset,
            batch_size=batch_size,
            num_workers=num_workers,
            pin_memory=True,
            prefetch_factor=2 if num_workers > 0 else None,
            persistent_workers=True if num_workers > 0 else False,
            drop_last=True,
        )

    @staticmethod
    def create_streaming_dataloader(dataset, batch_size):
        """流式 DataLoader(IterableDataset,通常不用多 worker)"""
        return DataLoader(
            dataset,
            batch_size=batch_size,
            num_workers=0,
            pin_memory=True,
        )


class PackedDataset(IterableDataset):
    """
    序列打包数据集(Packing)

    核心思想:将多个短序列拼接成一个长序列(max_length),
    减少 padding 浪费,提高 GPU 利用率。

    实测效果:
    - 短序列为主的数据集:训练速度提升 2-4 倍
    - 长序列为主的数据集:提升有限

    注意事项:
    - 需要在 attention mask 中正确处理序列边界
    - 某些训练框架(如 Megatron-LM)原生支持 packing
    """

    def __init__(self, dataset, max_length=2048, tokenizer=None):
        self.dataset = dataset
        self.max_length = max_length
        self.tokenizer = tokenizer

    def __iter__(self):
        buffer = []
        buffer_length = 0

        for sample in self.dataset:
            tokens = self.tokenizer.encode(sample['text'])

            if buffer_length + len(tokens) > self.max_length:
                if buffer:
                    # 添加 EOS token 分隔
                    packed = buffer + [self.tokenizer.eos_token_id]
                    # Padding 到 max_length
                    pad_len = self.max_length - len(packed)
                    packed = packed + [self.tokenizer.pad_token_id] * pad_len

                    yield {
                        'input_ids': torch.tensor(packed[:self.max_length]),
                        'labels': torch.tensor(packed[:self.max_length]),
                    }

                buffer = tokens
                buffer_length = len(tokens)
            else:
                buffer.extend(tokens)
                buffer_length += len(tokens)

数据混合策略

7.1 多数据源混合

Python
import random


class DataMixer:
    """
    多数据源混合策略

    数据混合是预训练的关键决策之一。不同的混合比例会显著影响模型能力:
    - 代码比例高 → 编程能力强,但可能影响自然语言流畅度
    - 学术比例高 → 推理能力强,但可能缺乏常识
    - 网页比例高 → 知识面广,但噪声也多

    LLaMA-3 的数据混合(参考):
    - 网页文本: ~85%
    - 代码: ~8%
    - 学术: ~5%
    - 其他: ~2%
    """

    def __init__(self, datasets, weights, seed=42):
        """
        Args:
            datasets: 数据源字典 {name: iterable}
            weights: 采样权重 {name: weight}(无需归一化)
        """
        self.datasets = datasets
        self.weights = weights
        self.rng = random.Random(seed)

        # 创建迭代器
        self.iterators = {
            name: iter(dataset) for name, dataset in datasets.items()
        }

    def sample(self):
        """
        按权重采样一个数据源并获取一条数据

        Returns:
            (sample, source_name)
        """
        names = list(self.weights.keys())
        weights = [self.weights[name] for name in names]

        # 加权随机选择数据源
        chosen = self.rng.choices(names, weights=weights, k=1)[0]

        try:
            sample = next(self.iterators[chosen])
        except StopIteration:
            # 数据源耗尽,重新创建迭代器
            self.iterators[chosen] = iter(self.datasets[chosen])
            sample = next(self.iterators[chosen])

        return sample, chosen

    def create_mixed_iterator(self, total_samples):
        """
        创建混合迭代器

        Yields:
            dict: 包含数据和来源标记的样本
        """
        for _ in range(total_samples):
            sample, source = self.sample()
            sample['source'] = source
            yield sample


# 数据混合示例配置
DATA_MIXING_CONFIGS = {
    "general_purpose": {
        "web": 0.60,
        "code": 0.15,
        "books": 0.10,
        "academic": 0.10,
        "conversation": 0.05
    },
    "code_focused": {
        "code": 0.50,
        "web": 0.30,
        "books": 0.10,
        "academic": 0.05,
        "conversation": 0.05
    },
    "reasoning_focused": {
        "academic": 0.30,
        "math_reasoning": 0.20,
        "code": 0.20,
        "web": 0.20,
        "books": 0.10
    },
}

7.2 动态数据课程与上采样

Python
import torch
from torch.utils.data import WeightedRandomSampler


class CurriculumLearning:
    """
    课程学习(Curriculum Learning)

    核心思想:按照从易到难的顺序组织训练数据,
    类似人类学习过程中的课程安排。

    实践中的课程策略:
    1. 按文本长度:先短后长(短文本更容易学习基础模式)
    2. 按文本质量:先高质量后低质量(先建立好的表示)
    3. 按任务复杂度:先简单任务后复杂任务

    注意:课程学习在 LLM 预训练中的效果仍有争议,
    一些研究发现随机打乱的效果与课程学习相当。
    """

    def __init__(self, datasets_by_difficulty):
        """
        Args:
            datasets_by_difficulty: 按难度排序的数据源列表
                [(difficulty_level, dataset), ...]
                difficulty_level: 0(最易)到 N(最难)
        """
        self.datasets = datasets_by_difficulty

    def get_current_data(self, training_progress):
        """
        根据训练进度获取对应难度的数据

        Args:
            training_progress: 0.0 ~ 1.0

        Returns:
            当前阶段的数据集
        """
        num_stages = len(self.datasets)
        target_stage = int(training_progress * num_stages)
        target_stage = min(target_stage, num_stages - 1)

        return self.datasets[target_stage][1]


class UpsamplingStrategy:
    """
    上采样策略:对高质量数据增加采样频率

    实践中的应用:
    - Wikipedia 等高质量数据通常上采样 2-5 倍
    - 代码数据在非代码专用模型中也常上采样
    - DeepSeek-Math 报告对数学数据进行了大幅上采样
    """

    @staticmethod
    def quality_based_upsampling(dataset, quality_scores, target_ratio=2.0):
        """
        基于质量评分的上采样

        Args:
            dataset: 原始数据集
            quality_scores: 每个样本的质量分数(0.0-1.0)
            target_ratio: 最高质量数据的采样倍数

        Returns:
            WeightedRandomSampler
        """
        weights = [
            1.0 + (target_ratio - 1.0) * score
            for score in quality_scores
        ]

        sampler = WeightedRandomSampler(
            weights, len(dataset), replacement=True
        )
        return sampler

    @staticmethod
    def domain_upsampling(datasets, sampling_multipliers):
        """
        按领域上采样

        Args:
            datasets: {domain_name: dataset}
            sampling_multipliers: {domain_name: multiplier}
                例如 {"wikipedia": 3.0, "code": 2.0, "web": 1.0}

        Returns:
            混合后的数据列表
        """
        mixed = []
        for domain, dataset in datasets.items():
            multiplier = sampling_multipliers.get(domain, 1.0)
            data = list(dataset)

            # 重复采样
            repeats = int(multiplier)
            for _ in range(repeats):
                mixed.extend(data)

            # 处理小数部分(随机采样补充)
            remainder = multiplier - repeats
            if remainder > 0:
                import random
                sample_size = int(len(data) * remainder)
                mixed.extend(random.sample(data, min(sample_size, len(data))))

        return mixed

总结

数据工程关键要点

Text Only
1. 数据质量 > 数据数量
   - LIMA 论文证明 1000 条高质量数据即可微调出优秀模型
   - 垃圾数据会损害模型性能(Garbage In, Garbage Out)
   - 2024-2025 趋势:更注重数据质量而非单纯扩大规模

2. 去重至关重要
   - 重复数据导致过拟合和记忆效应
   - 多阶段去重:精确 → 近似 → 段落级
   - 生产中推荐使用 datasketch/text-dedup 工具

3. Tokenization 影响模型能力
   - 词汇表大小影响模型效率和语言覆盖
   - 中文/多语言需要更大的词汇表(128K+)
   - LLaMA-3 从 32K 扩展到 128K 词汇表显著提升多语言能力

4. 数据混合需要策略
   - 不同任务需要不同数据配比
   - 高质量数据应适当上采样
   - 课程学习可作为辅助策略

5. 高效加载是训练瓶颈
   - 数据加载不能成为 GPU 等待的原因
   - 内存映射和流式加载解决大数据问题
   - 序列打包(Packing)减少 padding 浪费

推荐工具

工具 用途 特点
HuggingFace Datasets 数据集加载和处理 流式加载、内存映射
Tokenizers 快速 tokenizer 训练 Rust 实现,极快
SentencePiece 多语言 tokenization 支持中文/日文等
datasketch MinHash/LSH 去重 大规模近似去重
text-dedup 预训练数据去重 多算法集成
Spark/Dask 大规模数据处理 分布式处理 TB 级数据
cc_net Common Crawl 处理 专用 CC 工具链

下一步02-训练基础设施 - 学习分布式训练、混合精度和训练优化技术。



练习题

思考题

  1. 数据质量 vs 数据数量:为什么 LIMA 论文只用了 1000 条高质量数据就能微调出优秀模型?这对数据工程实践有什么启示?

  2. 去重策略选择:在什么场景下应该使用精确去重,什么场景下应该使用近似去重(MinHash+LSH)?为什么生产环境推荐多阶段去重?

  3. Tokenizer 词汇表大小:LLaMA-3 将词汇表从 32K 扩展到 128K,这对中文处理有什么影响?词汇表越大越好吗?

  4. 数据混合策略:如果训练一个代码能力强的模型,数据配比应该如何设计?如果训练一个通用对话模型呢?

  5. 序列打包(Packing):Packing 为什么能提升训练效率?在 attention mask 中需要特别注意什么?

代码实践

  1. 入门:实现一个简单的精确去重器,统计给定文本列表中的重复率。

  2. 进阶:使用 HuggingFace Tokenizers 库训练一个 BPE tokenizer,并在自定义语料上测试编码/解码效果。

  3. 高级:实现 MinHash+LSH 近似去重算法,对比其与精确去重在效率和准确性上的差异。

💡 参考答案 **思考题 1**:LIMA 论文(Zhou et al., 2023)证明了预训练模型已经学到了大量知识,SFT 的核心作用是"教模型如何使用已有知识",而非"注入新知识"。1000 条高质量、多样化的指令数据足以教会模型遵循指令的格式和风格。这启示我们:数据质量(准确性、多样性、格式规范)远比数量重要,低质量数据反而会损害模型性能(Garbage In, Garbage Out)。 **思考题 2**:精确去重适合第一轮快速过滤(O(N) 时间复杂度),能移除完全相同的文档;近似去重(MinHash+LSH)适合第二轮过滤,能检测高度相似但不完全相同的文档(如翻译、改写)。生产环境推荐多阶段:先精确去重(快)→ 再近似去重(准)→ 最后段落级去重(细粒度)。 **思考题 3**:128K 词汇表对中文的影响:(1) 大部分常用中文字/词可以被编码为单个 token,而非被拆分为多个字节级 token;(2) 编码效率提升 → 同等信息量下 token 数更少 → 训练和推理更快。但词汇表越大,embedding 层参数越多,模型体积增大。128K 是当前中文模型的经验上限,继续增大收益递减。 **思考题 4**:代码能力强的模型:代码 40-50%、网页 20-25%、学术 10-15%、数学推理 10-15%、其他 5-10%。通用对话模型:网页 50-60%、代码 15-20%、书籍 10%、学术 5-10%、对话 5-10%。关键原则:代码和推理数据对模型整体智能提升最显著。 **思考题 5**:Packing 将多个短序列拼接成一个长序列,减少 padding 浪费的算力,GPU 利用率可提升 2-4 倍。但 attention mask 中必须正确标记序列边界,防止不同样本之间的 token 互相 attend。Megatron-LM 等框架通过 position_ids 重置和自定义 attention mask 来处理。

最后更新日期: 2026-04-21 适用版本: LLM 学习教程 v2026