28 - 深度学习到大语言模型的演进¶
⚠️ 时效性说明:本章涉及前沿模型与技术,部分内容可能随研究快速演进;请以论文原文和官方发布为准。
📌 导航提示:本章是深度学习与LLM之间的桥梁,承接
05-深度学习.md和18-NLP与Transformer详解.md,为后续22-大语言模型原理.md做铺垫。
🗺️ 章节导航¶
深度学习
└── NLP与Transformer
└── 预训练范式演进 (Word2Vec → BERT → GPT)
└── Transformer架构分类 (Encoder/Decoder/Encoder-Decoder)
└── 规模效应与涌现能力
└── 预训练技术 (NTP/MLM/课程学习)
└── 指令微调与对齐 (RLHF/DPO)
└── 大语言模型
1. NLP预训练范式的演进¶
1.1 从词向量到预训练语言模型¶
NLP领域经历了从离散符号表示到分布式表示的范式转变,而预训练范式的出现标志着NLP进入了"预训练+微调"的新时代。
1.1.1 Word2Vec:分布式表示的奠基¶
Word2Vec(2013,Mikolov等人)提出了两种词向量学习框架:
CBOW (Continuous Bag-of-Words):
- 根据上下文预测中心词
- 适合小型数据集
Skip-gram:
- 根据中心词预测上下文
- 适合大型语料库
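为直观理解这两个训练目标,下面给出 Skip-gram + 负采样损失的一个极简 PyTorch 示意(仅演示损失形式,词表大小、维度与数据均为假设值;实际训练仍使用随后的 Gensim 示例):
# Skip-gram + 负采样目标的极简 PyTorch 示意(仅演示损失形式)
import torch
import torch.nn as nn
import torch.nn.functional as F
class SkipGramNS(nn.Module):
    """最大化 log σ(v_c·u_o) + Σ log σ(-v_c·u_neg):用中心词预测真实上下文词,并压低负样本得分"""
    def __init__(self, vocab_size, dim=100):
        super().__init__()
        self.in_embed = nn.Embedding(vocab_size, dim)    # 中心词向量
        self.out_embed = nn.Embedding(vocab_size, dim)   # 上下文词向量
    def forward(self, center, context, negatives):
        v_c = self.in_embed(center)                      # [B, dim]
        u_o = self.out_embed(context)                    # [B, dim]
        u_neg = self.out_embed(negatives)                # [B, K, dim]
        pos_score = F.logsigmoid((v_c * u_o).sum(-1))
        neg_score = F.logsigmoid(-(u_neg @ v_c.unsqueeze(-1)).squeeze(-1)).sum(-1)
        return -(pos_score + neg_score).mean()
# 用随机 id 演示一次前向计算
model_sg = SkipGramNS(vocab_size=1000)
loss = model_sg(torch.randint(0, 1000, (8,)),
                torch.randint(0, 1000, (8,)),
                torch.randint(0, 1000, (8, 5)))
print(f"skip-gram 负采样损失: {loss.item():.4f}")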
# 使用 Gensim 训练 Word2Vec
from gensim.models import Word2Vec
import jieba
# 准备语料
corpus = [
"深度学习是机器学习的分支",
"神经网络是深度学习的核心",
"Transformer改变了NLP的发展方向"
]
sentences = [list(jieba.cut(s)) for s in corpus]
# 训练模型
model = Word2Vec(
sentences,
vector_size=100, # 词向量维度
window=5, # 上下文窗口大小
min_count=1, # 最小词频
workers=4, # 并行线程数
sg=1 # 1=Skip-gram, 0=CBOW
)
# 获取词向量(注意:'深度学习'需在 jieba 分词结果中作为一个整词出现,否则会抛出 KeyError)
vector = model.wv['深度学习']
print(f"词向量维度: {vector.shape}")
print(f"'神经网络'与'深度学习'的相似度: {model.wv.similarity('神经网络', '深度学习')}")
Word2Vec的局限性:
- 无法处理多义词(如 bank 既指银行又指河岸)
- 静态向量,无法根据上下文动态调整
- 无法捕捉词序信息
1.1.2 ELMo:上下文相关的词向量¶
ELMo(2018,Peters等人)提出了上下文词向量的概念:
# ELMo 上下文词向量示例 (使用 TensorFlow Hub)
import tensorflow_hub as hub
import tensorflow as tf
# 加载预训练 ELMo 模型(TF1 Hub 格式,需通过 signatures 调用)
elmo = hub.load("https://tfhub.dev/google/elmo/3")
# 获取上下文词向量
sentences = tf.constant([
    "The bank refused to lend money.",
    "The bank is located by the river."
])
# "elmo" 输出是三个层(字符卷积层 + 两层 biLSTM)的可学习加权和
embeddings = elmo.signatures["default"](sentences)["elmo"]
print(f"输出形状: {embeddings.shape}")  # [batch_size, seq_len, 1024]
ELMo的核心思想:使用双向LSTM编码句子,不同层的LSTM捕获不同粒度的语义信息。
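下面用一小段 PyTorch 代码示意这种"按层加权求和"(scalar mix)的思想(非 ELMo 官方实现,层数与维度均为演示用的假设值):
# ELMo 层加权 (scalar mix) 思想的极简示意
import torch
import torch.nn as nn
class ScalarMix(nn.Module):
    """对 L 层表示做可学习的加权求和: e_k = γ · Σ_j softmax(s)_j · h_{k,j}"""
    def __init__(self, num_layers=3):
        super().__init__()
        self.scalar_weights = nn.Parameter(torch.zeros(num_layers))  # s_j
        self.gamma = nn.Parameter(torch.ones(1))                     # γ
    def forward(self, layer_outputs):
        # layer_outputs: [num_layers, batch, seq_len, hidden]
        weights = torch.softmax(self.scalar_weights, dim=0)
        mixed = (weights.view(-1, 1, 1, 1) * layer_outputs).sum(dim=0)
        return self.gamma * mixed
# 假设有 3 层表示(字符卷积层 + 两层 biLSTM),每层维度 1024
layer_outputs = torch.randn(3, 2, 5, 1024)
print(ScalarMix(3)(layer_outputs).shape)   # torch.Size([2, 5, 1024])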
1.1.3 BERT:预训练+微调范式的突破¶
BERT(2018,Devlin等人)彻底改变了NLP的范式,提出了两项革命性的预训练任务:
# BERT 预训练任务实现
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertForMaskedLM, BertForNextSentencePrediction
# 加载带 MLM 头 / NSP 头的预训练 BERT
tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
mlm_model = BertForMaskedLM.from_pretrained('bert-base-chinese')
nsp_model = BertForNextSentencePrediction.from_pretrained('bert-base-chinese')
# ============ Masked Language Modeling (MLM) ============
def mlm_forward(text, tokenizer, model, mask_prob=0.15):
    """
    BERT的MLM任务:随机mask tokens并预测被mask的词
    (model 应为 BertForMaskedLM,其输出 logits 位于词表空间)
    """
    # 分词并自动添加 [CLS] / [SEP]
    input_ids = tokenizer(text, return_tensors='pt')['input_ids'][0]
    # 随机mask(跳过首尾的特殊 token)
    masked_indices = []
    for i in range(1, len(input_ids) - 1):
        if torch.rand(1).item() < mask_prob:
            input_ids[i] = tokenizer.mask_token_id
            masked_indices.append(i)
    # 前向传播,得到每个位置在词表上的预测分布
    with torch.no_grad():
        logits = model(input_ids.unsqueeze(0)).logits  # [1, seq_len, vocab_size]
    # 预测被mask的词
    for idx in masked_indices:
        pred_token_id = logits[0, idx].argmax().item()
        pred_token = tokenizer.decode([pred_token_id])
        print(f"Masked token at {idx}: {pred_token}")
    return logits
# ============ Next Sentence Prediction (NSP) ============
def nsp_forward(sentence_a, sentence_b, tokenizer, model):
    """
    BERT的NSP任务:预测句子B是否为句子A的下一句
    (model 应为 BertForNextSentencePrediction,其二分类头在预训练中已学习)
    """
    # 构建 [CLS] A [SEP] B [SEP] 格式
    encoded = tokenizer(
        sentence_a,
        sentence_b,
        return_tensors='pt',
        max_length=512,
        truncation=True
    )
    # logits: [batch_size, 2],标签含义 0=下一句, 1=随机句子(实际训练中标签由数据提供)
    with torch.no_grad():
        logits = model(**encoded).logits
    return logits
1.2 预训练+微调范式的形成¶
预训练+微调(Pre-training + Fine-tuning)已成为NLP的标准范式:
┌─────────────────────────────────────────────────────────────┐
│ 预训练+微调范式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 预训练阶段 │ ──► │ 微调阶段 │ ──► │ 下游任务 │ │
│ │ (大规模数据) │ │ (任务数据) │ │ (应用部署) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ↓ ↓ ↓ │
│ 学习通用语言 适应特定任务 分类/生成/匹配... │
│ 表示与知识 任务能力 评估指标 │
│ │
└─────────────────────────────────────────────────────────────┘
1.2.1 迁移学习在NLP中的应用¶
# 完整的预训练+微调流程
from transformers import (
BertForSequenceClassification,
BertTokenizer,
Trainer,
TrainingArguments
)
from datasets import load_dataset
# ========== 步骤1: 加载预训练模型和分词器 ==========
model_name = "bert-base-chinese"
tokenizer = BertTokenizer.from_pretrained(model_name)
# ========== 步骤2: 准备下游任务数据 ==========
# 注意:此处数据集仅为占位示例;分类微调需要包含 "text" 与 "label" 字段的数据集,请按实际任务替换
raw_datasets = load_dataset("tyqiangz/multilingual-books", "zh")
print(raw_datasets)
# 步骤2.1: 定义预处理函数
def tokenize_function(examples):
return tokenizer(
examples["text"],
padding="max_length",
truncation=True,
max_length=512
)
# 步骤2.2: 应用预处理
tokenized_datasets = raw_datasets.map(tokenize_function, batched=True)
# 步骤2.3: 划分训练/验证集(只切分一次,避免两次独立随机切分造成数据泄漏)
split_datasets = tokenized_datasets["train"].train_test_split(test_size=0.1, seed=42)
train_dataset = split_datasets["train"]
eval_dataset = split_datasets["test"]
# ========== 步骤3: 加载微调模型 ==========
model = BertForSequenceClassification.from_pretrained(
model_name,
num_labels=2 # 二分类任务
)
# ========== 步骤4: 定义训练参数 ==========
training_args = TrainingArguments(
output_dir="./results",
num_train_epochs=3,
per_device_train_batch_size=16,
per_device_eval_batch_size=64,
warmup_steps=500,
weight_decay=0.01,
logging_dir="./logs",
logging_steps=10,
evaluation_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
metric_for_best_model="accuracy",
)
# ========== 步骤5: 创建Trainer并训练 ==========
import numpy as np

def compute_metrics(eval_pred):
    """metric_for_best_model="accuracy" 要求显式提供评估函数"""
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)
    return {"accuracy": float((preds == labels).mean())}

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    compute_metrics=compute_metrics,
)
trainer.train()
# ========== 步骤6: 模型推理 ==========
def predict(text):
inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
with torch.no_grad():
outputs = model(**inputs)
prediction = outputs.logits.argmax(dim=-1)
return "正面" if prediction.item() == 1 else "负面"
print(predict("这个产品非常好用!"))
2. Transformer到LLM的演进¶
2.1 Transformer架构的三种形态¶
Transformer(Vaswani, 2017)最初作为机器翻译的Encoder-Decoder架构,后来衍生出三种主要形态:
| 架构类型 | 代表模型 | 特点 | 适用场景 |
|---|---|---|---|
| Encoder-only | BERT, RoBERTa, ALBERT | 双向注意力,适合理解任务 | 文本分类、NER、问答 |
| Decoder-only | GPT系列、LLaMA、Claude | 单向注意力,适合生成任务 | 文本生成、对话 |
| Encoder-Decoder | T5, BART, FLAN-T5 | 编解码分离,适合Seq2Seq | 机器翻译、摘要、问答 |
┌─────────────────────────────────────────────────────────────────┐
│ Transformer 三种架构对比 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Encoder-only (BERT) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Input → [Encoder] → [Encoder] → ... → [Encoder] → Output│ │
│ │ ↓ ↓ ↓ │ │
│ │ Self-Att Self-Att Self-Att │ │
│ │ (双向) (双向) (双向) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ ✓ 上下文感知 ✗ 仅编码,无法直接生成 │
│ │
│ Decoder-only (GPT) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Input → [Decoder] → [Decoder] → ... → [Decoder] → Output│ │
│ │ ↓ ↓ ↓ │ │
│ │ Masked-Att Masked-Att Masked-Att │ │
│ │ (单向) (单向) (单向) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ ✓ 自回归生成 ✗ 只能看到前面的上下文 │
│ │
│ Encoder-Decoder (T5) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Input → [Encoder] → ... → [Encoder] │ │
│ │ ↓ (Cross-Att) │ │
│ │ [Decoder] → ... → [Decoder] → Output │ │
│ └─────────────────────────────────────────────────────────┘ │
│ ✓ 灵活可控 ✗ 参数量大,训练成本高 │
│ │
└─────────────────────────────────────────────────────────────────┘
2.1.1 Encoder-only 架构详解¶
# BERT 架构代码实现要点
import torch
import torch.nn as nn
import math
class TransformerEncoder(nn.Module):
"""
Transformer Encoder 实现
"""
def __init__(self, d_model=768, nhead=12, num_layers=12):
super().__init__()
self.d_model = d_model
# 多头自注意力 (双向)
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=nhead,
dim_feedforward=d_model * 4,
dropout=0.1,
activation='gelu',
batch_first=True
)
self.transformer_encoder = nn.TransformerEncoder(
encoder_layer,
num_layers=num_layers
)
# 池化层
self.pooler = nn.Linear(d_model, d_model)
self.pooler_activation = nn.Tanh()
    def forward(self, x, attention_mask=None):
        """
        Args:
            x: [batch_size, seq_len, d_model]
            attention_mask: [batch_size, seq_len],1 表示有效 token,0 表示 padding
        """
        # 注意:PyTorch 的 src_key_padding_mask 中 True 表示需要忽略的位置,与 HF 的 attention_mask 语义相反
        key_padding_mask = (attention_mask == 0) if attention_mask is not None else None
        # Transformer Encoder
        output = self.transformer_encoder(x, src_key_padding_mask=key_padding_mask)
        # [CLS] 池化
        cls_output = output[:, 0, :]
        pooled = self.pooler_activation(self.pooler(cls_output))
        return {
            'last_hidden_state': output,
            'pooled_output': pooled
        }
class BERTModel(nn.Module):
"""
简化的 BERT 模型
"""
def __init__(self, vocab_size=21128, d_model=768, nhead=12,
num_layers=12, max_len=512):
super().__init__()
self.d_model = d_model
# 词嵌入
self.token_embedding = nn.Embedding(vocab_size, d_model)
self.position_embedding = nn.Embedding(max_len, d_model)
self.embedding_norm = nn.LayerNorm(d_model)
# Transformer Encoder
self.encoder = TransformerEncoder(d_model, nhead, num_layers)
# 输出层
self.vocab_projection = nn.Linear(d_model, vocab_size)
def forward(self, input_ids, attention_mask=None):
# 词嵌入 + 位置嵌入
seq_len = input_ids.size(1)
position_ids = torch.arange(seq_len, device=input_ids.device)
position_ids = position_ids.unsqueeze(0).expand_as(input_ids)
x = self.token_embedding(input_ids) + self.position_embedding(position_ids)
x = self.embedding_norm(x)
# 编码
encoder_output = self.encoder(x, attention_mask)
# 词汇预测
logits = self.vocab_projection(encoder_output['last_hidden_state'])
return logits
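可以用一个很小的配置快速跑通上面定义的 BERTModel,检查输出形状(仅作自检示意,参数远小于真实 BERT):
# 玩具配置自检:验证前向传播与输出形状
import torch
toy_bert = BERTModel(vocab_size=1000, d_model=64, nhead=4, num_layers=2, max_len=32)
toy_input = torch.randint(0, 1000, (2, 16))   # [batch_size=2, seq_len=16]
print(toy_bert(toy_input).shape)              # 预期: torch.Size([2, 16, 1000])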
2.1.2 Decoder-only 架构详解¶
# GPT 风格的自回归解码器实现
class TransformerDecoder(nn.Module):
    """
    Transformer Decoder (Causal/Autoregressive)
    """
    def __init__(self, d_model=768, nhead=12, num_layers=12):
        super().__init__()
        decoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=d_model * 4,
            dropout=0.1,
            activation='gelu',
            batch_first=True
        )
        # 注意:TransformerEncoderLayer 本身并不会自动使用因果掩码,
        # 需要在 forward 中显式传入上三角 mask 才能实现自回归
        self.transformer_decoder = nn.TransformerEncoder(
            decoder_layer,
            num_layers=num_layers
        )
    def forward(self, x, attention_mask=None):
        """
        x: [batch_size, seq_len, d_model]
        attention_mask: [batch_size, seq_len],1=有效 token,0=padding(可选)
        """
        seq_len = x.size(1)
        # 因果掩码:上三角为 -inf,每个位置只能注意到自身及之前的位置
        causal_mask = nn.Transformer.generate_square_subsequent_mask(seq_len).to(x.device)
        key_padding_mask = (attention_mask == 0) if attention_mask is not None else None
        output = self.transformer_decoder(
            x,
            mask=causal_mask,
            src_key_padding_mask=key_padding_mask
        )
        return output
class GPTModel(nn.Module):
"""
简化的 GPT 模型
"""
def __init__(self, vocab_size=50257, d_model=768, nhead=12,
num_layers=12, max_len=1024):
super().__init__()
self.d_model = d_model
# 词嵌入 (不带Dropout,GPT-3使用embedding weight tying)
self.token_embedding = nn.Embedding(vocab_size, d_model)
self.position_embedding = nn.Embedding(max_len, d_model)
# Transformer Decoder
self.decoder = TransformerDecoder(d_model, nhead, num_layers)
# 输出层 (与输入嵌入共享权重以减少参数量)
self.lm_head = nn.Linear(d_model, vocab_size, bias=False)
self.lm_head.weight = self.token_embedding.weight
def forward(self, input_ids, attention_mask=None):
batch_size, seq_len = input_ids.size()
# 词嵌入 + 位置嵌入
position_ids = torch.arange(seq_len, device=input_ids.device)
position_ids = position_ids.unsqueeze(0).expand(batch_size, -1)
x = self.token_embedding(input_ids) + self.position_embedding(position_ids)
# 编码
output = self.decoder(x, attention_mask)
# 词汇预测
logits = self.lm_head(output)
return logits
def generate(self, input_ids, max_new_tokens, temperature=1.0, top_k=None):
"""
自回归生成
"""
for _ in range(max_new_tokens):
# 裁剪输入到模型最大长度
input_ids_cond = input_ids if input_ids.size(1) <= 1024 else input_ids[:, -1024:]
# 前向传播
logits = self.forward(input_ids_cond)
logits = logits[:, -1, :] / temperature
# Top-k 采样
if top_k is not None:
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
logits[logits < v[:, [-1]]] = float('-inf')
# 采样
probs = torch.softmax(logits, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)
input_ids = torch.cat([input_ids, next_token], dim=1)
return input_ids
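同样可以用玩具配置验证上面 GPTModel 的前向传播与自回归生成流程(仅演示形状与流程,权重未经训练,生成结果无实际意义):
# 玩具配置自检:自回归生成若干 token
import torch
toy_gpt = GPTModel(vocab_size=1000, d_model=64, nhead=4, num_layers=2, max_len=128)
toy_gpt.eval()
prompt = torch.randint(0, 1000, (1, 8))       # [batch_size=1, seq_len=8]
with torch.no_grad():
    out = toy_gpt.generate(prompt, max_new_tokens=5, temperature=1.0, top_k=10)
print(out.shape)                              # 预期: torch.Size([1, 13]),每步追加 1 个 token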
2.2 规模效应:参数量的影响¶
2.2.1 Scaling Laws 详解¶
规模定律(Scaling Laws)揭示了模型性能与参数量、数据量、计算量之间的幂律关系:
# Scaling Laws 核心代码实现
import torch
import torch.nn as nn
import numpy as np
class ScalingLaws:
"""
实现 Kaplan 和 Chinchilla 的 Scaling Laws
"""
    @staticmethod
    def kaplan_loss(N, D, Nc=8.8e13, Dc=5.4e13,
                    alpha_N=0.076, alpha_D=0.095, E=0.07):
        """
        Kaplan et al. (2020) 规模定律的简化加性近似:
        L(N, D) ≈ (Nc/N)^αN + (Dc/D)^αD + E
        (原论文的联合形式为 L(N,D) = [(Nc/N)^(αN/αD) + Dc/D]^αD,此处取加性近似便于演示)
        结论: 在固定算力下应优先扩大模型规模
        """
        loss = (Nc / N) ** alpha_N + (Dc / D) ** alpha_D + E
        return loss
@staticmethod
def chinchilla_loss(N, D, A=406.4, B=410.7,
alpha=0.34, beta=0.28, E=1.69):
"""
Chinchilla Scaling Law (2022)
L(N, D) = A/N^α + B/D^β + E
结论: 数据和模型需要等比例扩展
"""
loss = A / (N ** alpha) + B / (D ** beta) + E
return loss
    @staticmethod
    def compute_optimal_allocation(C, A=406.4, B=410.7, alpha=0.34, beta=0.28):
        """
        计算最优计算分配 (Chinchilla, Approach 3)
        约束: C ≈ 6ND (训练FLOPs)
        解析解: N_opt = G · (C/6)^{β/(α+β)},D_opt = C / (6·N_opt)
        其中 G = (α·A / (β·B))^{1/(α+β)}
        """
        G = (alpha * A / (beta * B)) ** (1 / (alpha + beta))
        # 最优参数量
        N_opt = G * (C / 6) ** (beta / (alpha + beta))
        # 最优数据量(由约束 C = 6ND 反解)
        D_opt = C / (6 * N_opt)
        return N_opt, D_opt
# 示例:计算不同预算下的最优分配
scaling = ScalingLaws()
compute_budgets = [1e20, 1e21, 1e22, 1e23, 1e24]
print("Chinchilla 最优分配策略:")
print("=" * 60)
print(f"{'计算预算 (FLOPs)':<20} {'最优参数量 N':<20} {'最优数据量 D (tokens)':<25}")
print("-" * 60)
for C in compute_budgets:
N_opt, D_opt = scaling.compute_optimal_allocation(C)
print(f"{C:<20.1e} {N_opt:<20.1e} {D_opt:<25.1e}")
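可以顺带做一个简单的数值校验:最优分配应满足约束 6·N·D ≈ C,且其预测损失低于同等算力下"模型偏大、数据偏少"的分配(仅为演示用的快速检查):
# Chinchilla 最优分配的快速校验
C = 1e22
N_opt, D_opt = ScalingLaws.compute_optimal_allocation(C)
print(f"6*N*D / C = {6 * N_opt * D_opt / C:.3f}")                                 # 预期 ≈ 1.0
print(f"最优分配损失: {ScalingLaws.chinchilla_loss(N_opt, D_opt):.4f}")
print(f"偏斜分配损失: {ScalingLaws.chinchilla_loss(10 * N_opt, D_opt / 10):.4f}")  # 应高于最优分配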
2.2.2 涌现能力 (Emergent Abilities)¶
涌现能力是指模型在规模超过某个临界点后,突然涌现出的非连续性能力:
┌─────────────────────────────────────────────────────────────────┐
│ 涌现能力示意图 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 性能 │
│ │ │
│ 高 │ ╭─────────── 思维链推理 │
│ │ ╭──── 指令跟随 │
│ │ ╭──── 零样本任务迁移 │
│ │ ╭──── │
│ │────╯ BERT-style 任务 │
│ │ │
│ └────────────────────────────────────────────────────────► │
│ 参数量 (对数刻度) │
│ 10M 100M 1B 10B 100B 1T │
│ │
│ 涌现能力通常在 6B-100B 参数区间出现 │
│ │
└─────────────────────────────────────────────────────────────────┘
常见的涌现能力:
| 能力 | 描述 | 典型参数量 |
|---|---|---|
| 思维链 (CoT) | 能够进行逐步推理 | ~100B |
| 指令跟随 | 理解并执行复杂指令 | ~60B |
| 零样本迁移 | 无需微调完成新任务 | ~40B |
| 上下文学习 | 从少量示例中学习 | ~30B |
| 工具使用 | 调用外部工具/API | ~100B+ |
# 涌现能力示例:思维链提示
from transformers import AutoModelForCausalLM, AutoTokenizer
# 加载大型语言模型 (需要足够大的模型才具备涌现能力;
# 注意 gpt-3.5-turbo 是 API 模型,无法通过 transformers 本地加载)
model_name = "meta-llama/Llama-2-70b-hf"  # 仅作示例,可替换为任意 30B+ 的开源因果语言模型
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
# 标准提示 vs 思维链提示
standard_prompt = """
Q: 小明有5个苹果,小红给了他3个,小明吃掉了2个。小明现在有多少个苹果?
A:
"""
# 思维链提示 (few-shot):先给出一个带完整推理过程的示例,再提出需要模型作答的新问题
cot_prompt = """
Q: 小明有5个苹果,小红给了他3个,小明吃掉了2个。小明现在有多少个苹果?
A: 让我们一步步思考:
1. 小明最初有5个苹果
2. 小红给了他3个,所以 5 + 3 = 8 个
3. 小明吃掉了2个,所以 8 - 2 = 6 个
因此小明现在有6个苹果。
Q: 书架上有12本书,借走了4本,又买来5本。现在书架上有多少本书?
A: 让我们一步步思考:
"""
# 思维链效果演示
def generate_with_prompt(prompt, max_length=200):
inputs = tokenizer(prompt, return_tensors="pt")
outputs = model.generate(
**inputs,
max_new_tokens=max_length,
temperature=0.7,
do_sample=True
)
return tokenizer.decode(outputs[0], skip_special_tokens=True)
print("标准提示结果:")
print(generate_with_prompt(standard_prompt))
print("\n思维链提示结果:")
print(generate_with_prompt(cot_prompt))
3. 预训练技术¶
3.1 Next Token Prediction (NTP)¶
Next Token Prediction 是 GPT 系列模型的核心预训练目标:
# Next Token Prediction 实现
import torch
import torch.nn as nn
class NextTokenPredictionLoss(nn.Module):
"""
NTP Loss 实现
目标: 最大化 P(x_t | x_{<t}) 的对数似然
"""
def __init__(self, ignore_index=-100):
super().__init__()
self.ignore_index = ignore_index
self.loss_fn = nn.CrossEntropyLoss(ignore_index=ignore_index)
def forward(self, logits, labels):
"""
Args:
logits: [batch_size, seq_len, vocab_size]
labels: [batch_size, seq_len] (target token ids)
Returns:
loss: scalar tensor
"""
# 将 logits 调整为 [batch_size * seq_len, vocab_size]
shift_logits = logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
# 计算损失
loss = self.loss_fn(
shift_logits.view(-1, shift_logits.size(-1)),
shift_labels.view(-1)
)
return loss
class GPTPretrainingDataset:
"""
GPT 预训练数据集
"""
def __init__(self, texts, tokenizer, max_length=1024):
self.tokenizer = tokenizer
self.max_length = max_length
self.texts = texts
# 分词所有文本
self.tokenized = tokenizer(
texts,
truncation=True,
max_length=max_length,
return_overflowing_tokens=True,
return_attention_mask=False
)
def __len__(self):
return len(self.tokenized["input_ids"])
    def __getitem__(self, idx):
        input_ids = self.tokenized["input_ids"][idx]
        # 标签直接使用与输入相同的序列:
        # NextTokenPredictionLoss 内部会做"右移一位"的对齐,此处若再手动移位会造成双重移位
        return {
            "input_ids": torch.tensor(input_ids, dtype=torch.long),
            "labels": torch.tensor(input_ids, dtype=torch.long)
        }
# 训练循环示例
def train_gpt_epoch(model, dataloader, optimizer, device):
model.train()
total_loss = 0
criterion = NextTokenPredictionLoss()
for batch in dataloader:
input_ids = batch["input_ids"].to(device)
labels = batch["labels"].to(device)
# 前向传播
logits = model(input_ids)
# 计算损失
loss = criterion(logits, labels)
# 反向传播
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
total_loss += loss.item()
return total_loss / len(dataloader)
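可以用一个小的玩具张量验证 NextTokenPredictionLoss 的"右移一位"对齐方式:若 logits 在每个位置都准确指向下一个 token,损失应接近 0(纯数值演示):
# NTP 损失的对齐自检
import torch
criterion = NextTokenPredictionLoss()
vocab_size = 10
input_ids = torch.tensor([[1, 2, 3, 4]])
# 构造"完美预测"的 logits:位置 t 给 token_{t+1} 极高分数
logits = torch.full((1, 4, vocab_size), -10.0)
for t in range(3):
    logits[0, t, input_ids[0, t + 1]] = 10.0
print(f"完美预测时的损失(应接近 0): {criterion(logits, input_ids).item():.4f}")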
3.2 Masked Language Modeling (MLM)¶
MLM 是 BERT 系列模型的核心预训练目标:
# Masked Language Modeling 实现
import torch
import torch.nn as nn
import random
class MLMDataset:
"""
BERT MLM 数据集实现
"""
def __init__(self, texts, tokenizer, max_length=512, mask_prob=0.15):
self.tokenizer = tokenizer
self.max_length = max_length
self.mask_prob = mask_prob
# 分词
self.tokenized = tokenizer(
texts,
truncation=True,
max_length=max_length,
padding="max_length",
return_tensors="pt"
)
def __len__(self):
return self.tokenized["input_ids"].size(0)
def __getitem__(self, idx):
input_ids = self.tokenized["input_ids"][idx].clone()
labels = input_ids.clone()
# 创建 mask
probability_matrix = torch.full(input_ids.shape, self.mask_prob)
# 不对特殊 token 进行 mask
special_tokens_mask = (
(input_ids == self.tokenizer.cls_token_id) |
(input_ids == self.tokenizer.sep_token_id) |
(input_ids == self.tokenizer.pad_token_id)
)
probability_matrix.masked_fill_(special_tokens_mask, value=0.0)
# 随机选择要 mask 的位置
masked_indices = torch.bernoulli(probability_matrix).bool()
labels[~masked_indices] = -100 # 非 mask 位置不计算损失
        # 80% 替换为 [MASK], 10% 替换为随机 token, 10% 保持原样
        for i in range(len(input_ids)):
            if masked_indices[i]:
                r = random.random()
                if r < 0.8:
                    input_ids[i] = self.tokenizer.mask_token_id
                elif r < 0.9:
                    # 从整个词表范围内随机采样一个 token id
                    input_ids[i] = random.randrange(len(self.tokenizer))
                # 剩余 10% 保持原样
return {
"input_ids": input_ids,
"labels": labels,
"attention_mask": self.tokenized["attention_mask"][idx]
}
class MLMLoss(nn.Module):
"""
MLM Loss 实现
"""
def __init__(self):
super().__init__()
self.loss_fn = nn.CrossEntropyLoss(ignore_index=-100)
def forward(self, logits, labels):
"""
Args:
logits: [batch_size, seq_len, vocab_size]
labels: [batch_size, seq_len]
"""
loss = self.loss_fn(
logits.view(-1, logits.size(-1)),
labels.view(-1)
)
return loss
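下面用一句话快速查看 MLMDataset 的随机 mask 效果(沿用上文的 MLMDataset,需能下载 bert-base-chinese 分词器;由于 mask 是随机的,每次运行结果不同,短句也可能恰好没有位置被 mask):
# MLM 动态 mask 的简单演示
from transformers import BertTokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
demo_dataset = MLMDataset(["深度学习正在改变自然语言处理"], tokenizer, max_length=32)
sample = demo_dataset[0]
print(tokenizer.convert_ids_to_tokens(sample["input_ids"].tolist()))
# labels 中仅被选中 mask 的位置保留原 token id,其余位置为 -100(不计损失)
print(sample["labels"])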
3.3 课程学习 (Curriculum Learning)¶
课程学习是一种训练策略,从简单样本逐渐过渡到复杂样本:
# 课程学习实现
import torch
import torch.nn as nn
from torch.utils.data import Sampler
import numpy as np
class CurriculumSampler(Sampler):
"""
课程学习采样器
策略:随着训练进行,逐渐增加样本难度
"""
def __init__(self, dataset, batch_size, num_epochs,
difficulty_scores, strategy="linear"):
"""
Args:
dataset: 数据集
batch_size: 批大小
num_epochs: 训练轮数
difficulty_scores: 每个样本的难度分数 (0-1, 越高越难)
strategy: 课程策略 ("linear", "sqrt", "exponential")
"""
self.dataset = dataset
self.batch_size = batch_size
self.num_epochs = num_epochs
self.difficulty_scores = np.array(difficulty_scores)
self.strategy = strategy
        self.current_epoch = 0
        # 排序样本(从易到难)
        self.sorted_indices = np.argsort(self.difficulty_scores)
    def set_epoch(self, epoch):
        """由训练循环在每个 epoch 开始时调用,用于更新课程进度"""
        self.current_epoch = epoch
    def __iter__(self):
        # 课程进度 (0 -> 1):决定当前 epoch 允许采样的最大难度
        curriculum_progress = min((self.current_epoch + 1) / self.num_epochs, 1.0)
# 根据策略计算阈值
if self.strategy == "linear":
threshold_percentile = curriculum_progress * 100
elif self.strategy == "sqrt":
threshold_percentile = np.sqrt(curriculum_progress) * 100
elif self.strategy == "exponential":
threshold_percentile = (curriculum_progress ** 2) * 100
else:
threshold_percentile = 100
# 选择难度在阈值内的样本
num_samples = int(len(self.dataset) * threshold_percentile / 100)
selected_indices = self.sorted_indices[:num_samples]
# 随机打乱
np.random.shuffle(selected_indices)
# 生成批次
for i in range(0, len(selected_indices), self.batch_size):
yield from selected_indices[i:i + self.batch_size]
def __len__(self):
return len(self.dataset)
class CurriculumLoss(nn.Module):
"""
课程学习损失
思路:让简单样本对损失的贡献更大
"""
def __init__(self, base_criterion):
super().__init__()
self.base_criterion = base_criterion
def forward(self, logits, labels, difficulty_weights=None):
"""
Args:
logits: 模型输出
labels: 真实标签
difficulty_weights: 样本难度权重 (可选)
"""
if difficulty_weights is not None:
# 加权损失
loss = self.base_criterion(logits, labels)
loss = loss * difficulty_weights
return loss.mean()
else:
return self.base_criterion(logits, labels)
# 使用示例
def curriculum_training_example():
"""
课程学习训练流程示例
"""
# 1. 定义数据难度(可以使用多种指标)
# 例如:句子长度、稀有词比例、语法复杂度等
difficulty_metrics = {
"sentence_length": [10, 25, 50, 80, 100], # 句子长度
"rare_word_ratio": [0.1, 0.2, 0.3, 0.4, 0.5], # 稀有词比例
}
# 2. 计算综合难度分数
# 归一化后加权平均
def compute_difficulty(length, rare_ratio):
# 长度归一化 (0-1)
length_norm = min(length / 100, 1.0)
# 综合难度
return 0.6 * length_norm + 0.4 * rare_ratio
difficulty_scores = [
compute_difficulty(l, r)
for l, r in zip(difficulty_metrics["sentence_length"],
difficulty_metrics["rare_word_ratio"])
]
# 3. 创建课程采样器
# sampler = CurriculumSampler(
# dataset=train_dataset,
# batch_size=32,
# num_epochs=10,
# difficulty_scores=difficulty_scores,
# strategy="sqrt"
# )
print("课程学习配置:")
print(f" 难度分数: {difficulty_scores}")
print(f" 训练策略: 从易到难,逐步增加样本复杂度")
return difficulty_scores
4. 指令微调与对齐¶
4.1 Instruction Tuning (指令微调)¶
指令微调是对预训练模型进行多任务微调,使其能够理解和执行各种自然语言指令:
# 指令微调数据处理
class InstructionDataset:
"""
指令微调数据集
格式要求:
{
"instruction": "指令",
"input": "输入内容(可选)",
"output": "期望输出"
}
"""
def __init__(self, data_path, tokenizer, max_length=2048):
self.tokenizer = tokenizer
self.max_length = max_length
# 加载数据
import json
with open(data_path, 'r', encoding='utf-8') as f:
self.data = json.load(f)
def __len__(self):
return len(self.data)
    def __getitem__(self, idx):
        item = self.data[idx]
        # 构建指令模板
        if item.get("input"):
            prompt = (f"Instruction: {item['instruction']}\n"
                      f"Input: {item['input']}\n"
                      f"Response: ")
        else:
            prompt = f"Instruction: {item['instruction']}\nResponse: "
        # 完整文本 (prompt + 期望输出,用于训练)
        full_text = prompt + item['output']
        # 先单独分词 prompt,得到其真实 token 数(不加 padding;此处做了简化,未精确对齐特殊 token)
        prompt_len = len(self.tokenizer(prompt, add_special_tokens=False)["input_ids"])
        # 对完整文本分词,作为模型输入
        encoded = self.tokenizer(
            full_text,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt"
        )
        input_ids = encoded["input_ids"].squeeze(0)
        attention_mask = encoded["attention_mask"].squeeze(0)
        # 设置标签:只对 Response 部分计算损失
        labels = input_ids.clone()
        labels[:prompt_len] = -100           # Prompt 部分不反向传播
        labels[attention_mask == 0] = -100   # Padding 部分忽略
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels
        }
# 指令微调训练配置
INSTRUCTION_TUNING_CONFIG = {
"training_steps": 10000,
"learning_rate": 2e-5,
"warmup_steps": 500,
"batch_size": 8,
"gradient_accumulation_steps": 4,
"max_seq_length": 2048,
"weight_decay": 0.01,
"lora": {
"r": 8,
"lora_alpha": 16,
"lora_dropout": 0.05,
"target_modules": ["q_proj", "v_proj", "k_proj", "o_proj"]
}
}
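上面配置中的 lora 字段需要借助 PEFT 一类的库才能真正生效。下面是一个假设使用 peft 库把该配置套用到基座模型上的简单示意(基座模型名称仅为占位,请按实际情况替换):
# 将上述 LoRA 配置应用到基座模型(示意,假设已安装 peft 与 transformers)
from transformers import AutoModelForCausalLM
from peft import LoraConfig, TaskType, get_peft_model
base_model = AutoModelForCausalLM.from_pretrained("your-base-model")  # 占位:替换为实际基座模型
lora_cfg = INSTRUCTION_TUNING_CONFIG["lora"]
peft_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=lora_cfg["r"],
    lora_alpha=lora_cfg["lora_alpha"],
    lora_dropout=lora_cfg["lora_dropout"],
    target_modules=lora_cfg["target_modules"],
)
peft_model = get_peft_model(base_model, peft_config)
peft_model.print_trainable_parameters()   # 仅有极小比例的参数参与训练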
4.2 RLHF (Reinforcement Learning from Human Feedback)¶
RLHF 是让模型与人类偏好对齐的核心技术:
┌─────────────────────────────────────────────────────────────────┐
│ RLHF 流程图 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 预训练模型 │────►│ 奖励模型 │────►│ PPO 训练 │ │
│ │ (SFT模型) │ │ (RM) │ │ (PPO) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ↑ ↑ ↑ │
│ │ │ │ │
│ └────────────────────┴───────────────────┘ │
│ 人类反馈信号 │
│ │
│ Step 1: 收集人类偏好数据 ─────────────────────────────────► │
│ Step 2: 训练奖励模型 ─────────────────────────────────────► │
│ Step 3: 使用 PPO 优化策略 ─────────────────────────────────► │
│ │
└─────────────────────────────────────────────────────────────────┘
# RLHF 实现
import torch
import torch.nn as nn
from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer
class RewardModel(nn.Module):
    """
    奖励模型 (Reward Model)
    输入: 提示 + 响应
    输出: 奖励分数 (标量)
    """
    def __init__(self, base_model_name):
        super().__init__()
        # 使用不带语言模型头的基座模型,直接获取隐藏状态
        self.base_model = AutoModel.from_pretrained(base_model_name)
        # 添加奖励输出层
        self.reward_head = nn.Linear(self.base_model.config.hidden_size, 1)
    def forward(self, input_ids, attention_mask=None):
        outputs = self.base_model(input_ids, attention_mask=attention_mask)
        # 使用最后一个 token 的表示作为整段文本的表示
        last_hidden = outputs.last_hidden_state
        reward = self.reward_head(last_hidden[:, -1, :])
        return reward
class PPOTrainer:
"""
PPO 训练器用于 RLHF
目标: 最大化期望奖励同时限制与参考模型的偏离
"""
def __init__(self, policy_model, ref_model, reward_model,
tokenizer, device="cuda"):
self.policy_model = policy_model
self.ref_model = ref_model
self.reward_model = reward_model
self.tokenizer = tokenizer
self.device = device
# PPO 超参数
self.gamma = 1.0
self.lam = 0.95
self.clip_epsilon = 0.2
self.value_coef = 0.5
self.entropy_coef = 0.01
def compute_log_probs(self, model, input_ids, action_ids):
"""
计算给定动作的对数概率
"""
outputs = model(input_ids)
logits = outputs.logits
# 只看 action 位置
action_logits = logits[:, :-1, :]
action_log_probs = torch.log_softmax(action_logits, dim=-1)
# 提取实际动作对应的 log prob
log_probs = action_log_probs.gather(
dim=-1,
index=action_ids[:, 1:].unsqueeze(-1)
).squeeze(-1)
return log_probs
    def ppo_step(self, batch_prompts, batch_responses, batch_rewards):
        """
        单步 PPO 更新(简化示意)
        Args:
            batch_prompts: 提示(本简化实现中不单独使用,假定已拼接进 batch_responses)
            batch_responses: 旧策略生成的 prompt+response token ids,{"input_ids": [batch, seq_len]}
            batch_rewards: 人工/环境给出的奖励分数,形状 [batch]
        """
        response_ids = batch_responses["input_ids"]
        # 1. 旧策略(生成这些响应时的策略快照)的对数概率,作为重要性采样基准
        old_log_probs = self.compute_log_probs(
            self.policy_model, response_ids, response_ids
        ).detach()
        # 2. 当前策略在同一批响应上的对数概率
        #    (第一次内层迭代时两者相同;多轮 PPO 内层更新后才会产生差异)
        new_log_probs = self.compute_log_probs(
            self.policy_model, response_ids, response_ids
        )
        # 3. 奖励模型打分
        with torch.no_grad():
            rewards = self.reward_model(response_ids)
        # 4. PPO 剪裁目标(简化:直接用外部奖励作为优势估计)
        advantages = batch_rewards.unsqueeze(-1)           # [batch, 1],广播到每个 token
        ratio = torch.exp(new_log_probs - old_log_probs)   # 逐 token 的概率比
        clipped_ratio = torch.clamp(ratio, 1 - self.clip_epsilon,
                                    1 + self.clip_epsilon)
        # 策略损失 (取最小值再取负,实现"剪裁后最大化")
        policy_loss = -torch.min(ratio * advantages, clipped_ratio * advantages).mean()
        # 价值损失(简化:让奖励模型分数拟合外部奖励,仅作占位)
        value_loss = nn.functional.mse_loss(
            rewards.squeeze(-1),
            batch_rewards
        )
        # 熵正则 (鼓励探索)
        entropy_loss = -self.entropy_coef * self.compute_entropy(
            self.policy_model, response_ids
        )
        # KL 散度 (限制偏离参考模型)
        kl_loss = self.compute_kl(response_ids)
        # 总损失
        total_loss = policy_loss + self.value_coef * value_loss + \
                     entropy_loss + kl_loss
        # 反向传播
        total_loss.backward()
        return {
            "policy_loss": policy_loss.item(),
            "value_loss": value_loss.item(),
            "kl_loss": kl_loss.item(),
            "reward": rewards.mean().item()
        }
def compute_entropy(self, model, input_ids):
"""计算策略熵"""
outputs = model(input_ids)
logits = outputs.logits[:, :-1, :]
probs = torch.softmax(logits, dim=-1)
log_probs = torch.log_softmax(logits, dim=-1)
entropy = -(probs * log_probs).sum(dim=-1).mean()
return entropy
    def compute_kl(self, response_ids):
        """计算与参考模型的 KL 散度(蒙特卡洛估计:在策略采样得到的 token 上取对数概率差)"""
        with torch.no_grad():
            ref_log_probs = self.compute_log_probs(
                self.ref_model,
                response_ids,
                response_ids
            )
        policy_log_probs = self.compute_log_probs(
            self.policy_model,
            response_ids,
            response_ids
        )
        # token 本身采样自策略分布,因此 E[log π − log π_ref] 即为 KL 的蒙特卡洛估计
        kl = (policy_log_probs - ref_log_probs).sum(dim=-1).mean()
        return kl
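流程图中的 Step 2(训练奖励模型)通常在成对偏好数据上使用 Bradley-Terry 风格的排序损失,使 chosen 响应的得分高于 rejected 响应。下面给出一个与上文 RewardModel 配套的最小化目标示意(假设 chosen/rejected 已各自编码为 token ids):
# 奖励模型的成对排序损失 (Bradley-Terry) 最小示意
import torch.nn.functional as F
def reward_ranking_loss(reward_model, chosen_ids, rejected_ids):
    """loss = -log σ( r(chosen) - r(rejected) ):拉开 chosen 与 rejected 的得分差距"""
    r_chosen = reward_model(chosen_ids)       # [batch, 1]
    r_rejected = reward_model(rejected_ids)   # [batch, 1]
    return -F.logsigmoid(r_chosen - r_rejected).mean()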
4.3 DPO (Direct Preference Optimization)¶
DPO 是一种无需显式奖励模型的直接对齐方法:
# DPO 实现
import torch
import torch.nn as nn
class DPOTrainer:
"""
Direct Preference Optimization (DPO)
核心思想: 直接使用偏好数据优化策略,绕过奖励模型
目标: 最大化偏好数据的对数似然
"""
def __init__(self, policy_model, ref_model, beta=0.1):
"""
Args:
policy_model: 待优化的策略模型
ref_model: 参考模型 (通常是 SFT 模型)
beta: 温度参数,控制与参考模型的偏离程度
"""
self.policy_model = policy_model
self.ref_model = ref_model
self.beta = beta
def dpo_loss(self, prompt_ids, chosen_ids, rejected_ids):
"""
计算 DPO 损失
Args:
prompt_ids: 提示的 token ids
chosen_ids: 偏好响应 (chosen) 的 token ids
rejected_ids: 非偏好响应 (rejected) 的 token ids
"""
# 合并 prompt 和 response
chosen_input = torch.cat([prompt_ids, chosen_ids], dim=1)
rejected_input = torch.cat([prompt_ids, rejected_ids], dim=1)
# 计算 policy 的对数概率
chosen_log_probs = self._compute_log_prob(chosen_input)
rejected_log_probs = self._compute_log_prob(rejected_input)
# 计算参考模型的对数概率
with torch.no_grad():
ref_chosen_log_probs = self._compute_log_prob(chosen_input, use_ref=True)
ref_rejected_log_probs = self._compute_log_prob(rejected_input, use_ref=True)
        # 隐式奖励 (implicit reward):β·(log π_θ − log π_ref)
chosen_rewards = self.beta * (chosen_log_probs - ref_chosen_log_probs)
rejected_rewards = self.beta * (rejected_log_probs - ref_rejected_log_probs)
# DPO 损失
loss = -torch.log(torch.sigmoid(chosen_rewards - rejected_rewards)).mean()
return loss
def _compute_log_prob(self, input_ids, use_ref=False):
"""
计算序列的对数概率
Args:
input_ids: [batch_size, seq_len]
use_ref: 是否使用参考模型
"""
model = self.ref_model if use_ref else self.policy_model
outputs = model(input_ids)
        logits = outputs.logits[:, :-1, :]  # 去掉最后一个位置(其预测没有对应的目标 token)
# 计算 log softmax
log_probs = torch.log_softmax(logits, dim=-1)
# 提取目标 token 的 log prob
target_ids = input_ids[:, 1:] # 目标右移一位
gathered_log_probs = log_probs.gather(
dim=-1,
index=target_ids.unsqueeze(-1)
).squeeze(-1)
# 平均 (或求和)
return gathered_log_probs.mean(dim=-1)
# ORPO: Odds-Ratio Preference Optimization
class ORPOTrainer:
    """
    ORPO (Odds-Ratio Preference Optimization)
    核心思想: 无需参考模型,在 SFT 损失之上叠加基于 odds ratio 的偏好项
    """
    def __init__(self, policy_model, lambda_odds=0.5):
        self.policy_model = policy_model
        self.lambda_odds = lambda_odds
    def orpo_loss(self, prompt_ids, chosen_ids, rejected_ids):
        """
        ORPO 损失 = SFT 损失(chosen) + λ · 偏好损失
        偏好损失 = -log σ( log odds(chosen) - log odds(rejected) )
        其中 odds(y) = P(y|x) / (1 - P(y|x)),P 取长度归一化的序列概率
        """
        chosen_input = torch.cat([prompt_ids, chosen_ids], dim=1)
        rejected_input = torch.cat([prompt_ids, rejected_ids], dim=1)
        # 长度归一化的平均对数概率
        chosen_log_prob = self._compute_seq_log_prob(chosen_input)
        rejected_log_prob = self._compute_seq_log_prob(rejected_input)
        # log odds = log p - log(1 - p),加上小常数保证数值稳定
        def log_odds(log_p):
            return log_p - torch.log(1 - torch.exp(log_p) + 1e-8)
        odds_ratio_loss = -nn.functional.logsigmoid(
            log_odds(chosen_log_prob) - log_odds(rejected_log_prob)
        ).mean()
        # SFT 项:最大化 chosen 响应的似然
        sft_loss = -chosen_log_prob.mean()
        return sft_loss + self.lambda_odds * odds_ratio_loss
    def _compute_seq_log_prob(self, input_ids):
        """计算长度归一化的序列 log probability"""
        outputs = self.policy_model(input_ids)
        logits = outputs.logits[:, :-1, :]
        log_probs = torch.log_softmax(logits, dim=-1)
        target_ids = input_ids[:, 1:]
        token_log_probs = log_probs.gather(
            dim=-1,
            index=target_ids.unsqueeze(-1)
        ).squeeze(-1)
        return token_log_probs.mean(dim=-1)
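可以用几组假设的对数概率直观感受 DPO 损失的行为:当策略模型相对参考模型更偏好 chosen 时损失较小,反之损失明显增大(纯数值演示,不依赖任何模型):
# DPO 损失的数值直觉演示 (β = 0.1)
import torch
import torch.nn.functional as F
def dpo_loss_from_logps(pi_chosen, pi_rejected, ref_chosen, ref_rejected, beta=0.1):
    chosen_reward = beta * (pi_chosen - ref_chosen)
    rejected_reward = beta * (pi_rejected - ref_rejected)
    return -F.logsigmoid(chosen_reward - rejected_reward)
ref_chosen, ref_rejected = torch.tensor(-10.0), torch.tensor(-12.0)
# 情形1: 策略比参考模型更偏好 chosen → 损失较小 (约 0.51)
print(dpo_loss_from_logps(torch.tensor(-8.0), torch.tensor(-14.0), ref_chosen, ref_rejected).item())
# 情形2: 策略反而更偏好 rejected → 损失更大 (约 0.91)
print(dpo_loss_from_logps(torch.tensor(-12.0), torch.tensor(-10.0), ref_chosen, ref_rejected).item())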
5. 总结与展望¶
5.1 核心要点回顾¶
┌─────────────────────────────────────────────────────────────────┐
│ DL → LLM 演进全景图 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. NLP预训练范式 │
│ Word2Vec → ELMo → BERT → GPT │
│ 核心: 迁移学习 + 预训练 + 微调 │
│ │
│ 2. Transformer 架构 │
│ Encoder-only (BERT) - 理解任务 │
│ Decoder-only (GPT) - 生成任务 │
│ Encoder-Decoder - Seq2Seq 任务 │
│ │
│ 3. 规模效应 │
│ Scaling Laws: 性能 ~ N^α, D^β │
│ 涌现能力: >6B 参数出现新能力 │
│ │
│ 4. 预训练技术 │
│ NTP: 自回归预测 │
│ MLM: 掩码语言建模 │
│ Curriculum Learning: 渐进学习 │
│ │
│ 5. 对齐技术 │
│ Instruction Tuning: 指令跟随 │
│ RLHF: 人类反馈强化学习 │
│ DPO/ORPO: 直接偏好优化 │
│ │
└─────────────────────────────────────────────────────────────────┘
5.2 延伸阅读¶
| 主题 | 关键论文 | 年份 |
|---|---|---|
| Word2Vec | Efficient Estimation of Word Representations in Vector Space | 2013 |
| ELMo | Deep contextualized word representations | 2018 |
| BERT | BERT: Pre-training of Deep Bidirectional Transformers | 2018 |
| GPT-2 | Language Models are Unsupervised Multitask Learners | 2019 |
| GPT-3 | Language Models are Few-Shot Learners | 2020 |
| Scaling Laws | Scaling Laws for Neural Language Models | 2020 |
| Chinchilla | Training Compute-Optimal Large Language Models | 2022 |
| InstructGPT | Training language models to follow instructions with human feedback | 2022 |
| DPO | Direct Preference Optimization: Your Language Model is a Reward Model | 2023 |
📚 继续学习:建议结合
22-大语言模型原理.md深入理解LLM的技术细节,并参考LLM学习/目录进行实践。