04 - 对齐技术(全面版)¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
📌 定位说明:本章是对齐技术的主版本,侧重RLHF/DPO/PPO等对齐方法的全景原理。
📖 应用安全与合规视角请参考 LLM应用/14-大模型安全与对齐。
学习目标:深入理解大模型对齐技术,包括RLHF、DPO、PPO等方法的原理与实现。
对齐技术概述¶
1.1 什么是对齐¶
对齐(Alignment)的核心问题:如何让大模型的行为符合人类意图和价值观?
预训练的问题:
├── 模型从海量互联网数据学习
├── 可能包含有害、偏见、错误信息
├── 不理解人类真实意图
└── 可能产生不安全输出
对齐的目标:
├── 有用性(Helpful):回答用户问题
├── 诚实性(Honest):提供准确信息
├── 无害性(Harmless):避免有害输出
└── 可控性(Controllable):遵循指令
对齐的三种方法:
├── 1. 基于人类反馈的强化学习(RLHF)
├── 2. 直接偏好优化(DPO)
└── 3. 基于AI反馈的强化学习(RLAIF)
1.2 对齐技术对比¶
| 方法 | 复杂度 | 稳定性 | 效果 | 代表模型 |
|---|---|---|---|---|
| RLHF | 高 | 中 | 优秀 | GPT-4, Claude, InstructGPT |
| DPO | 中 | 高 | 优秀 | Zephyr, Llama 3.1, Neural-Chat |
| SLiC | 低 | 高 | 良好 | - |
| IPO | 中 | 高 | 良好 | - |
| KTO | 中 | 高 | 良好 | - |
| ORPO | 低 | 高 | 优秀 | - |
| GRPO | 中 | 高 | 优秀 | DeepSeek-R1 |
| SimPO | 低 | 高 | 优秀 | - |
基于人类反馈的强化学习 (RLHF)¶
2.1 RLHF三阶段流程¶
RLHF完整流程
═══════════════════════════════════════════════════════════════════
阶段1: 监督微调(SFT)
├── 数据:人类编写的指令-回答对
├── 目标:让模型学会遵循指令
├── 方法:标准语言模型微调
└── 输出:SFT模型
阶段2: 奖励模型训练(Reward Modeling)
├── 数据:同一问题的多个回答,人类标注偏好
├── 目标:学习人类偏好
├── 方法:训练奖励模型预测人类偏好
└── 输出:Reward Model
阶段3: 强化学习优化(RL Optimization)
├── 数据:使用SFT模型生成回答
├── 目标:最大化奖励,同时保持与SFT模型的相似性
├── 方法:PPO算法
└── 输出:对齐后的模型
═══════════════════════════════════════════════════════════════════
2.2 阶段1:监督微调 (SFT)¶
以下各节代码为教学示意,统一假设已完成如下导入(基于 PyTorch 与 Hugging Face Transformers):

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM
class SFTTrainer:
"""
监督微调训练器
"""
def __init__(self, model, tokenizer, learning_rate=1e-5):
self.model = model
self.tokenizer = tokenizer
self.optimizer = torch.optim.AdamW(
model.parameters(),
lr=learning_rate
)
def prepare_instruction_data(self, examples):
"""
准备指令数据
Args:
examples: [{"instruction": "...", "input": "...", "output": "..."}]
"""
formatted_texts = []
for example in examples:
# 构建prompt模板
if example.get('input'):
prompt = f"### Instruction:\n{example['instruction']}\n\n### Input:\n{example['input']}\n\n### Response:\n"
else:
prompt = f"### Instruction:\n{example['instruction']}\n\n### Response:\n"
# 完整文本(prompt + response)
full_text = prompt + example['output']
formatted_texts.append(full_text)
return formatted_texts
def train_step(self, batch):
"""
单步训练
"""
self.model.train()
# 前向传播
outputs = self.model(
input_ids=batch['input_ids'],
attention_mask=batch['attention_mask'],
labels=batch['labels']
)
loss = outputs.loss
# 反向传播
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
# SFT数据格式示例
SFT_EXAMPLE = {
"instruction": "解释什么是机器学习",
"input": "",
"output": "机器学习是人工智能的一个分支,它使计算机能够从数据中学习而无需明确编程..."
}
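实际训练中通常还要对labels做mask,使损失只落在response部分(prompt部分的label置为-100)。下面是一个最小示意(假设使用Hugging Face风格的tokenizer,函数名build_sft_labels为示例命名):

def build_sft_labels(tokenizer, prompt, response, max_length=512):
    """构建单条SFT样本:prompt部分的label置为-100,损失只在response上计算"""
    prompt_ids = tokenizer(prompt, add_special_tokens=False)['input_ids']
    # 在response末尾附加eos(假设tokenizer定义了eos_token)
    response_ids = tokenizer(response + tokenizer.eos_token,
                             add_special_tokens=False)['input_ids']
    input_ids = (prompt_ids + response_ids)[:max_length]
    labels = ([-100] * len(prompt_ids) + response_ids)[:max_length]
    return {
        'input_ids': torch.tensor([input_ids]),
        'attention_mask': torch.ones(1, len(input_ids), dtype=torch.long),
        'labels': torch.tensor([labels]),
    }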
2.3 阶段2:奖励模型训练¶
class RewardModel(nn.Module):
"""
奖励模型
基于Bradley-Terry模型,学习预测人类偏好
"""
def __init__(self, base_model):
        super().__init__()  # 调用父类nn.Module的构造函数
self.base_model = base_model
# 在模型输出上添加奖励头
self.reward_head = nn.Linear(
base_model.config.hidden_size,
1,
bias=False
)
def forward(self, input_ids, attention_mask):
"""
前向传播
Returns:
rewards: [batch_size] 每个样本的奖励分数
"""
# 获取模型最后一层隐藏状态
outputs = self.base_model(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True
)
# 取最后一个token的隐藏状态
last_hidden = outputs.hidden_states[-1] # [batch, seq_len, hidden] # [-1]负索引取最后一个元素
        # 找到每个序列实际的最后一个token(考虑padding;这里假设采用右padding)
        sequence_lengths = attention_mask.sum(dim=1) - 1
batch_size = input_ids.size(0)
# 提取最后一个token的表示
last_token_hidden = last_hidden[
torch.arange(batch_size),
sequence_lengths
]
# 计算奖励
rewards = self.reward_head(last_token_hidden).squeeze(-1)
return rewards
class RewardModelTrainer:
"""
奖励模型训练器
"""
def __init__(self, reward_model, learning_rate=1e-5):
self.reward_model = reward_model
self.optimizer = torch.optim.AdamW(
reward_model.parameters(),
lr=learning_rate
)
def compute_preference_loss(self, chosen_rewards, rejected_rewards):
"""
计算偏好损失
使用Bradley-Terry模型:
P(y_w > y_l | x) = σ(r(x, y_w) - r(x, y_l))
Loss = -log σ(r_θ(x, y_w) - r_θ(x, y_l))
"""
# 计算奖励差距
reward_diff = chosen_rewards - rejected_rewards
# 使用log-sigmoid损失
loss = -F.logsigmoid(reward_diff).mean()
return loss
def train_step(self, batch):
"""
训练步骤
batch包含:
- chosen_input_ids: 人类偏好的回答
- rejected_input_ids: 人类不喜欢的回答
"""
self.reward_model.train()
# 计算chosen的奖励
chosen_rewards = self.reward_model(
input_ids=batch['chosen_input_ids'],
attention_mask=batch['chosen_attention_mask']
)
# 计算rejected的奖励
rejected_rewards = self.reward_model(
input_ids=batch['rejected_input_ids'],
attention_mask=batch['rejected_attention_mask']
)
# 计算损失
loss = self.compute_preference_loss(chosen_rewards, rejected_rewards)
# 反向传播
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# 计算准确率
accuracy = (chosen_rewards > rejected_rewards).float().mean()
return {
'loss': loss.item(),
'accuracy': accuracy.item(),
'chosen_reward': chosen_rewards.mean().item(),
'rejected_reward': rejected_rewards.mean().item()
}
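配合上面的训练器,下面的小例子演示如何把一条偏好样本(同一prompt下的chosen/rejected回答)编码成train_step所需的batch(示意代码,build_preference_batch为示例命名;简单右padding到固定长度,与奖励模型"取最后一个有效token"的假设一致):

def build_preference_batch(tokenizer, prompt, chosen, rejected, max_length=512):
    enc = tokenizer(
        [prompt + chosen, prompt + rejected],
        padding='max_length',
        truncation=True,
        max_length=max_length,
        return_tensors='pt'
    )
    return {
        'chosen_input_ids': enc['input_ids'][0:1],
        'chosen_attention_mask': enc['attention_mask'][0:1],
        'rejected_input_ids': enc['input_ids'][1:2],
        'rejected_attention_mask': enc['attention_mask'][1:2],
    }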
2.4 阶段3:PPO强化学习¶
class PPOTrainer:
"""
PPO(Proximal Policy Optimization)训练器
"""
def __init__(
self,
policy_model, # 策略模型(要训练的模型)
reference_model, # 参考模型(SFT模型,不更新)
reward_model, # 奖励模型
value_model, # 价值模型(可选)
tokenizer, # 分词器(用于padding处理等)
learning_rate=1e-5,
clip_epsilon=0.2,
kl_coef=0.2,
gamma=0.99,
lam=0.95
):
self.policy_model = policy_model
self.reference_model = reference_model
self.reward_model = reward_model
self.value_model = value_model
self.tokenizer = tokenizer
self.optimizer = torch.optim.AdamW(
policy_model.parameters(),
lr=learning_rate
)
self.clip_epsilon = clip_epsilon
self.kl_coef = kl_coef
self.gamma = gamma
self.lam = lam
def generate_responses(self, prompts, max_length=256):
"""
生成回答
"""
self.policy_model.eval()
with torch.no_grad(): # 禁用梯度计算,节省内存(推理时使用)
outputs = self.policy_model.generate(
input_ids=prompts,
max_length=max_length,
do_sample=True,
temperature=0.7,
return_dict_in_generate=True,
output_scores=True
)
return outputs.sequences, outputs.scores
def compute_rewards(self, sequences, attention_mask):
"""
计算奖励
奖励 = 奖励模型分数 - KL惩罚
"""
with torch.no_grad():
# 奖励模型分数
reward_scores = self.reward_model(sequences, attention_mask)
# 计算KL散度(与参考模型的差异)
policy_logits = self.policy_model(sequences, attention_mask).logits
reference_logits = self.reference_model(sequences, attention_mask).logits
# KL散度计算
policy_probs = F.softmax(policy_logits, dim=-1)
reference_probs = F.softmax(reference_logits, dim=-1)
kl_div = (policy_probs * (torch.log(policy_probs + 1e-10) - torch.log(reference_probs + 1e-10))).sum(-1)
# 最终奖励
rewards = reward_scores - self.kl_coef * kl_div.mean(dim=1)
return rewards
    def compute_advantages(self, rewards, values):
        """
        计算优势函数(GAE)
        注:严格的PPO实现沿token时间步计算GAE;此处沿样本序列演示,仅为简化示意
        """
        advantages = []
        gae = 0
for t in reversed(range(len(rewards))):
if t == len(rewards) - 1:
next_value = 0
else:
next_value = values[t + 1]
delta = rewards[t] + self.gamma * next_value - values[t]
gae = delta + self.gamma * self.lam * gae
advantages.insert(0, gae)
return torch.tensor(advantages)
def ppo_loss(self, old_logprobs, new_logprobs, advantages):
"""
计算PPO损失
"""
# 计算比率
ratio = torch.exp(new_logprobs - old_logprobs)
# 裁剪目标
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages
# PPO损失(取最小值,限制更新幅度)
loss = -torch.min(surr1, surr2).mean()
return loss
def train_step(self, batch):
"""
PPO训练步骤
"""
self.policy_model.train()
prompts = batch['prompts']
old_sequences = batch['sequences']
old_logprobs = batch['logprobs']
# 计算奖励
attention_mask = (old_sequences != self.tokenizer.pad_token_id).long()
rewards = self.compute_rewards(old_sequences, attention_mask)
# 计算价值(如果有价值模型)
if self.value_model is not None:
values = self.value_model(old_sequences, attention_mask)
advantages = self.compute_advantages(rewards, values)
else:
advantages = rewards
# 新的策略输出
outputs = self.policy_model(old_sequences, attention_mask)
new_logits = outputs.logits
        # 计算新的log概率(注意移位:位置t的logits预测的是token t+1)
        new_logprobs = F.log_softmax(new_logits[:, :-1, :], dim=-1)
        new_logprobs = torch.gather(
            new_logprobs, 2, old_sequences[:, 1:].unsqueeze(-1)
        ).squeeze(-1)
        # 简化处理:对全部token取平均;严格实现应只统计response部分并mask padding
        new_logprobs = new_logprobs.mean(dim=1)
# 计算PPO损失
loss = self.ppo_loss(old_logprobs, new_logprobs, advantages)
# 反向传播
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 1.0)
self.optimizer.step()
return {
'loss': loss.item(),
'reward': rewards.mean().item(),
'advantage': advantages.mean().item()
}
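下面用一个独立的小例子直观展示PPO裁剪项的作用(数值为假设,仅作示意):

# 正优势下,比率超过1+ε的部分不再带来额外目标值,防止单步更新过大
ratio = torch.tensor([0.5, 0.9, 1.0, 1.1, 1.5])   # 新旧策略概率比
advantage = torch.ones(5)                          # 假设优势均为+1
eps = 0.2
surr1 = ratio * advantage
surr2 = torch.clamp(ratio, 1 - eps, 1 + eps) * advantage
print(torch.min(surr1, surr2))
# tensor([0.5000, 0.9000, 1.0000, 1.1000, 1.2000]) —— 1.5被裁剪到1.2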
直接偏好优化 (DPO)¶
3.1 DPO原理¶
DPO(Direct Preference Optimization)
核心思想:
├── 不需要显式的奖励模型
├── 不需要强化学习
├── 直接用偏好数据优化策略
└── 更简单、更稳定
Bradley-Terry模型到DPO损失的完整推导¶
第一步:Bradley-Terry偏好模型

给定提示 \(x\),人类更偏好回答 \(y_w\) 而非 \(y_l\) 的概率为:

$$P(y_w \succ y_l \mid x) = \sigma\big(r(x, y_w) - r(x, y_l)\big)$$

其中 \(r(x, y)\) 是真实奖励函数,\(\sigma\) 是sigmoid函数。

第二步:RLHF的KL约束优化目标

RLHF要解决的优化问题是:

$$\max_{\pi}\; \mathbb{E}_{x \sim \mathcal{D},\, y \sim \pi(\cdot|x)}\big[r(x, y)\big] - \beta\, \mathbb{D}_{\mathrm{KL}}\big[\pi(y|x)\,\|\,\pi_{\text{ref}}(y|x)\big]$$

第三步:最优策略的闭式解

对上式求解(拉格朗日对偶),最优策略为:

$$\pi^*(y|x) = \frac{1}{Z(x)}\,\pi_{\text{ref}}(y|x)\,\exp\!\left(\frac{r(x, y)}{\beta}\right)$$

其中 \(Z(x) = \sum_y \pi_{\text{ref}}(y|x) \exp(r(x,y)/\beta)\) 是配分函数。

第四步:用策略表示奖励(关键步骤)

对上式取对数并整理,可以用策略反解出奖励:

$$r(x, y) = \beta \log \frac{\pi^*(y|x)}{\pi_{\text{ref}}(y|x)} + \beta \log Z(x)$$

第五步:代入Bradley-Terry消除奖励

将上式代入Bradley-Terry模型(\(Z(x)\) 项在做差时对消):

$$P(y_w \succ y_l \mid x) = \sigma\!\left(\beta \log \frac{\pi^*(y_w|x)}{\pi_{\text{ref}}(y_w|x)} - \beta \log \frac{\pi^*(y_l|x)}{\pi_{\text{ref}}(y_l|x)}\right)$$

第六步:DPO损失函数

用当前策略 \(\pi_\theta\) 代替最优策略 \(\pi^*\),对偏好数据集取负对数似然:

$$\mathcal{L}_{\text{DPO}}(\pi_\theta; \pi_{\text{ref}}) = -\,\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}}\left[\log \sigma\!\left(\beta \log \frac{\pi_\theta(y_w|x)}{\pi_{\text{ref}}(y_w|x)} - \beta \log \frac{\pi_\theta(y_l|x)}{\pi_{\text{ref}}(y_l|x)}\right)\right]$$
推导链总结:
Bradley-Terry偏好模型
↓ 定义了"奖励差→偏好概率"的映射
KL约束优化目标
↓ 拉格朗日对偶求解
最优策略闭式解 π*(y|x)
↓ 取对数,反解出 r(x,y)
用策略比率表达奖励
↓ 代入Bradley-Terry,Z(x)对消
DPO损失函数(无需奖励模型!)
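下面用一组玩具数值验证第六步的损失公式(示意代码,log概率数值为假设,计算方式与3.2节实现一致):

beta = 0.1
policy_chosen_logps = torch.tensor([-10.0])    # log π_θ(y_w|x)
policy_rejected_logps = torch.tensor([-14.0])  # log π_θ(y_l|x)
ref_chosen_logps = torch.tensor([-11.0])       # log π_ref(y_w|x)
ref_rejected_logps = torch.tensor([-13.0])     # log π_ref(y_l|x)

logits = beta * ((policy_chosen_logps - ref_chosen_logps)
                 - (policy_rejected_logps - ref_rejected_logps))  # = 0.1 * 2 = 0.2
loss = -F.logsigmoid(logits)
print(logits.item(), loss.item())  # 0.2 0.5981(即 -log σ(0.2))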
3.2 DPO实现¶
class DPOTrainer:
"""
DPO(Direct Preference Optimization)训练器
"""
def __init__(
self,
policy_model, # 策略模型
reference_model, # 参考模型(SFT模型,不更新)
beta=0.1, # 温度参数
learning_rate=1e-6
):
self.policy_model = policy_model
self.reference_model = reference_model
self.beta = beta
self.optimizer = torch.optim.AdamW(
policy_model.parameters(),
lr=learning_rate
)
# 参考模型不更新
for param in self.reference_model.parameters():
param.requires_grad = False
def compute_log_probs(self, model, input_ids, attention_mask, labels):
"""
计算序列的log概率
只对response部分计算(labels != -100)
"""
outputs = model(
input_ids=input_ids,
attention_mask=attention_mask
)
logits = outputs.logits
log_probs = F.log_softmax(logits, dim=-1)
# 收集目标token的log概率
# 移位以对齐预测
log_probs = log_probs[:, :-1, :]
labels = labels[:, 1:]
        # 获取每个位置的目标token log概率
        # (labels中的-100会导致gather越界,先clamp到0,再靠下面的mask排除)
        token_log_probs = torch.gather(
            log_probs,
            dim=2,
            index=labels.clamp(min=0).unsqueeze(2)
        ).squeeze(2)
# 只对非padding位置求和
mask = (labels != -100).float()
token_log_probs = token_log_probs * mask
# 每个样本的总log概率
sequence_log_probs = token_log_probs.sum(dim=1)
return sequence_log_probs
def dpo_loss(self, policy_chosen_logps, policy_rejected_logps,
reference_chosen_logps, reference_rejected_logps):
"""
计算DPO损失
"""
# 计算log比率
policy_logratios = policy_chosen_logps - policy_rejected_logps
reference_logratios = reference_chosen_logps - reference_rejected_logps
# DPO损失
logits = self.beta * (policy_logratios - reference_logratios)
loss = -F.logsigmoid(logits).mean()
return loss
def train_step(self, batch):
"""
DPO训练步骤
batch包含:
- chosen_input_ids: 偏好的完整序列(prompt + chosen response)
- rejected_input_ids: 不喜欢的完整序列(prompt + rejected response)
"""
self.policy_model.train()
# 计算策略模型的log概率
policy_chosen_logps = self.compute_log_probs(
self.policy_model,
batch['chosen_input_ids'],
batch['chosen_attention_mask'],
batch['chosen_labels']
)
policy_rejected_logps = self.compute_log_probs(
self.policy_model,
batch['rejected_input_ids'],
batch['rejected_attention_mask'],
batch['rejected_labels']
)
# 计算参考模型的log概率(不计算梯度)
with torch.no_grad():
reference_chosen_logps = self.compute_log_probs(
self.reference_model,
batch['chosen_input_ids'],
batch['chosen_attention_mask'],
batch['chosen_labels']
)
reference_rejected_logps = self.compute_log_probs(
self.reference_model,
batch['rejected_input_ids'],
batch['rejected_attention_mask'],
batch['rejected_labels']
)
# 计算DPO损失
loss = self.dpo_loss(
policy_chosen_logps,
policy_rejected_logps,
reference_chosen_logps,
reference_rejected_logps
)
# 反向传播
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 1.0)
self.optimizer.step()
# 计算准确率(策略模型是否能正确排序)
with torch.no_grad():
chosen_rewards = self.beta * (policy_chosen_logps - reference_chosen_logps)
rejected_rewards = self.beta * (policy_rejected_logps - reference_rejected_logps)
accuracy = (chosen_rewards > rejected_rewards).float().mean()
return {
'loss': loss.item(),
'accuracy': accuracy.item(),
'chosen_reward': chosen_rewards.mean().item(),
'rejected_reward': rejected_rewards.mean().item()
}
3.3 DPO vs RLHF¶
DPO vs RLHF 对比
═══════════════════════════════════════════════════════════════════
特性 RLHF DPO
─────────────────────────────────────────────────────────────────
复杂度 高(三阶段) 低(单阶段)
稳定性 中(PPO可能不稳定) 高(监督学习)
训练速度 慢 快
内存需求 高(需要4个模型) 低(需要2个模型)
超参数 多(学习率、clip等) 少(主要是beta)
效果 优秀 优秀
模型需求:
RLHF: Policy + Reference + Reward + Value = 4个模型
DPO: Policy + Reference = 2个模型
推荐使用DPO的场景:
├── 计算资源有限
├── 快速迭代
├── 稳定性要求高
└── 首次尝试对齐
推荐使用RLHF的场景:
├── 追求极致效果
├── 有大量计算资源
├── 有经验丰富的团队
└── 生产级应用
═══════════════════════════════════════════════════════════════════
其他对齐方法¶
4.1 IPO (Identity Preference Optimization)¶
class IPOTrainer:
"""
IPO训练器
    DPO的理论修正版本:用均方误差代替log-sigmoid损失,缓解偏好接近确定时的过拟合
"""
def __init__(self, policy_model, reference_model, beta=0.1, learning_rate=1e-6):
self.policy_model = policy_model
self.reference_model = reference_model
self.beta = beta
self.optimizer = torch.optim.AdamW(policy_model.parameters(), lr=learning_rate)
def ipo_loss(self, policy_chosen_logps, policy_rejected_logps,
reference_chosen_logps, reference_rejected_logps):
"""
IPO损失
L = (log(π_θ(y_w|x)/π_ref(y_w|x)) - log(π_θ(y_l|x)/π_ref(y_l|x)) - 1/(2β))^2
"""
policy_logratios = policy_chosen_logps - policy_rejected_logps
reference_logratios = reference_chosen_logps - reference_rejected_logps
# IPO目标:让差距等于1/(2*beta)
diff = policy_logratios - reference_logratios
target = 1.0 / (2 * self.beta)
        loss = ((diff - target) ** 2).mean()
        return loss
    # train_step与3.2节DPOTrainer完全相同,仅把dpo_loss换成ipo_loss,此处从略
4.2 KTO (Kahneman-Tversky Optimization)¶
class KTOTrainer:
"""
KTO训练器
不需要成对偏好数据,只需要好坏标签
"""
def __init__(self, policy_model, reference_model, beta=0.1, learning_rate=1e-6):
self.policy_model = policy_model
self.reference_model = reference_model
self.beta = beta
self.optimizer = torch.optim.AdamW(policy_model.parameters(), lr=learning_rate)
    def kto_loss(self, policy_logps, reference_logps, is_desirable):
        """
        KTO损失(简化版,省略了原论文中的KL参考点z_ref)
        对于desirable样本:隐式奖励应该高,损失为 1 - σ(r)
        对于undesirable样本:隐式奖励应该低,损失为 1 - σ(-r)
        """
        # 隐式奖励
        implicit_reward = self.beta * (policy_logps - reference_logps)
        # Kahneman-Tversky风格的有界损失,对异常样本更鲁棒
        if is_desirable:
            # 期望样本:奖励越高,损失越小
            losses = 1 - torch.sigmoid(implicit_reward)
        else:
            # 不期望样本:奖励越低,损失越小
            losses = 1 - torch.sigmoid(-implicit_reward)
        return losses.mean()
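一个简单的数值检查可以验证损失方向(示意代码,隐式奖励数值为假设):

for r in [-2.0, 0.0, 2.0]:
    reward = torch.tensor([r])
    desirable_loss = (1 - torch.sigmoid(reward)).mean()
    undesirable_loss = (1 - torch.sigmoid(-reward)).mean()
    print(f"r={r:+.1f}  desirable={desirable_loss:.3f}  undesirable={undesirable_loss:.3f}")
# r=-2.0  desirable=0.881  undesirable=0.119
# r=+2.0  desirable=0.119  undesirable=0.881
# 隐式奖励越高,desirable样本损失越小、undesirable样本损失越大,方向正确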
4.3 ORPO (Odds Ratio Preference Optimization)¶
ORPO是2024年提出的对齐方法,核心是把SFT和偏好对齐合并为单一训练阶段,已被一些开源模型的后训练流程采用。
class ORPOTrainer:
"""
ORPO训练器(Odds Ratio Preference Optimization)
核心思想:在SFT训练中同时加入偏好优化
- 无需单独的SFT阶段
- 使用odds ratio来衡量偏好差异
    - 单阶段同时完成SFT与偏好对齐
"""
def __init__(self, model, tokenizer, beta=0.1, learning_rate=1e-5):
self.model = model
self.tokenizer = tokenizer
self.beta = beta
self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
def compute_orpo_loss(self, chosen_logps, rejected_logps):
"""
ORPO损失 = SFT损失 + 偏好损失
偏好损失使用log odds ratio:
L_pref = -log σ(log(odds_chosen) - log(odds_rejected))
其中 odds = p / (1 - p)
"""
        # 计算log odds(输入须为平均token log概率,这样exp后落在(0,1)区间)
        # 为数值稳定性,在log空间计算,并加小常数防止log(0)
        log_odds_chosen = chosen_logps - torch.log(1 - chosen_logps.exp() + 1e-8)
        log_odds_rejected = rejected_logps - torch.log(1 - rejected_logps.exp() + 1e-8)
# Odds Ratio损失
log_odds_ratio = log_odds_chosen - log_odds_rejected
orpo_loss = -F.logsigmoid(self.beta * log_odds_ratio).mean()
return orpo_loss
def train_step(self, batch):
"""
ORPO训练步骤
同时优化:
1. SFT损失:最大化chosen response的似然
2. 偏好损失:拉大chosen和rejected的差距
"""
self.model.train()
# 计算chosen的log概率(同时作为SFT损失)
chosen_outputs = self.model(
input_ids=batch['chosen_input_ids'],
attention_mask=batch['chosen_attention_mask'],
labels=batch['chosen_labels']
)
chosen_logps = self._compute_sequence_logprobs(chosen_outputs, batch['chosen_labels'])
sft_loss = chosen_outputs.loss
        # 计算rejected的log概率(偏好损失需要对rejected回传梯度,因此不能包在no_grad里)
        rejected_outputs = self.model(
            input_ids=batch['rejected_input_ids'],
            attention_mask=batch['rejected_attention_mask']
        )
        rejected_logps = self._compute_sequence_logprobs(rejected_outputs, batch['rejected_labels'])
        # ORPO总损失 = SFT损失 + 权重 * 偏好损失(这里简化地用beta兼作权重,原论文记作λ)
        preference_loss = self.compute_orpo_loss(chosen_logps, rejected_logps)
        total_loss = sft_loss + self.beta * preference_loss
# 反向传播
self.optimizer.zero_grad()
total_loss.backward()
self.optimizer.step()
return {
'sft_loss': sft_loss.item(),
'preference_loss': preference_loss.item(),
'total_loss': total_loss.item()
}
    def _compute_sequence_logprobs(self, outputs, labels):
        """计算序列的平均token log概率(-100位置先clamp避免gather越界,再用mask排除)"""
        logits = outputs.logits[:, :-1, :]
        labels = labels[:, 1:]
        log_probs = F.log_softmax(logits, dim=-1)
        token_logps = log_probs.gather(2, labels.clamp(min=0).unsqueeze(-1)).squeeze(-1)
        mask = (labels != -100).float()
        return (token_logps * mask).sum(dim=1) / mask.sum(dim=1)
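用一组假设的数值可以直观理解log odds的含义(示意代码;注意输入是平均token log概率):

chosen_avg_logp = torch.tensor([-0.5])    # 平均token概率 ≈ 0.61
rejected_avg_logp = torch.tensor([-1.5])  # 平均token概率 ≈ 0.22

def log_odds(avg_logp):
    p = avg_logp.exp()                    # 位于(0, 1)的平均token概率
    return avg_logp - torch.log(1 - p + 1e-8)

print((log_odds(chosen_avg_logp) - log_odds(rejected_avg_logp)).item())
# ≈ 1.68:chosen的odds明显高于rejected,偏好损失会进一步拉大这个差距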
4.4 GRPO (Group Relative Policy Optimization)¶
GRPO由DeepSeek在DeepSeekMath中提出,后被用于DeepSeek-R1的训练,特别适合推理模型的对齐。
class GRPOTrainer:
"""
GRPO训练器(Group Relative Policy Optimization)
核心思想:
- 对同一问题生成多个回答(group)
- 在group内部进行相对比较
    - 用组内相对优势取代价值(critic)模型
被DeepSeek-R1用于推理能力对齐
"""
def __init__(
self,
model,
ref_model,
tokenizer,
group_size=4, # 每组生成4个回答
beta=0.1,
learning_rate=1e-5
):
self.model = model
self.ref_model = ref_model
self.tokenizer = tokenizer
self.group_size = group_size
self.beta = beta
self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
def generate_group(self, prompt, num_samples=None):
"""
为同一个prompt生成多个回答
"""
if num_samples is None:
num_samples = self.group_size
responses = []
        # 假设prompt为已分词的input_ids;实际实现可用num_return_sequences一次采样
        for _ in range(num_samples):
            output = self.model.generate(
prompt,
max_length=512,
do_sample=True,
temperature=0.7
)
responses.append(output)
return responses
    def compute_group_advantages(self, rewards):
        """
        计算组内相对优势(按DeepSeek的做法再除以组内标准差做归一化)
        advantage_i = (reward_i - mean(rewards)) / (std(rewards) + ε)
        """
        mean_reward = rewards.mean()
        advantages = (rewards - mean_reward) / (rewards.std() + 1e-8)
        return advantages
def grpo_loss(self, old_logprobs, new_logprobs, advantages, clip_eps=0.2):
"""
GRPO损失(类似PPO但在组内归一化)
"""
ratio = torch.exp(new_logprobs - old_logprobs)
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps) * advantages
loss = -torch.min(surr1, surr2).mean()
return loss
def train_step(self, batch):
"""
GRPO训练步骤
流程:
1. 对每个prompt生成group_size个回答
2. 使用规则/模型给每个回答打分
3. 计算组内相对优势
4. 更新策略
"""
self.model.train()
total_loss = 0
for prompt in batch['prompts']:
# 生成多个回答
responses = self.generate_group(prompt)
# 评估每个回答(可以使用规则或模型)
rewards = self._evaluate_responses(prompt, responses)
# 计算组内相对优势
advantages = self.compute_group_advantages(rewards)
            # 计算log概率(简化:用参考模型近似采样时刻的旧策略;
            # 严格实现应缓存生成时policy模型自身的logprobs)
            with torch.no_grad():
                old_logprobs = self._compute_logprobs(self.ref_model, prompt, responses)
new_logprobs = self._compute_logprobs(self.model, prompt, responses)
# GRPO损失
loss = self.grpo_loss(old_logprobs, new_logprobs, advantages)
total_loss += loss
# 反向传播
self.optimizer.zero_grad()
total_loss.backward()
self.optimizer.step()
return {'loss': total_loss.item()}
def _evaluate_responses(self, prompt, responses):
"""
评估回答质量
可以使用:
1. 规则奖励(如格式正确性、长度等)
2. 另一个LLM作为judge
3. 任务特定的评估函数
"""
        # 示例:使用简单的规则奖励(假设responses已解码为字符串)
rewards = []
for response in responses:
reward = 0.0
# 长度奖励
if 50 < len(response) < 500:
reward += 0.5
# 格式奖励(示例)
if response.endswith('.'):
reward += 0.2
rewards.append(reward)
return torch.tensor(rewards)
def _compute_logprobs(self, model, prompt, responses):
"""计算每个response的log概率"""
logprobs = []
for response in responses:
full_text = prompt + response
inputs = self.tokenizer(full_text, return_tensors='pt')
outputs = model(**inputs)
            # 计算response部分的log概率
            # (_get_response_logprob为假设的辅助函数:只对response区间token的
            #  log概率求和,实现方式同3.2节的compute_log_probs)
            log_prob = self._get_response_logprob(outputs, inputs)
logprobs.append(log_prob)
return torch.stack(logprobs)
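对于数学等可验证任务,GRPO常用规则奖励直接打分。下面是一个假设的示例(答案格式"答案:<数字>"与奖励数值均为示意):

import re

def math_reward(response: str, gold_answer: str) -> float:
    """规则奖励示例:格式分 + 正确性分"""
    reward = 0.0
    match = re.search(r'答案[::]\s*(-?\d+(?:\.\d+)?)', response)
    if match:
        reward += 0.1                      # 给出了可解析的最终答案
        if match.group(1) == gold_answer:
            reward += 1.0                  # 答案正确
    return reward

responses = ["先化简,……答案:42", "我不确定。", "答案:41"]
rewards = torch.tensor([math_reward(r, "42") for r in responses])
advantages = (rewards - rewards.mean()) / (rewards.std() + 1e-8)
print(rewards)     # tensor([1.1000, 0.0000, 0.1000])
print(advantages)  # 组内归一化后,只有答对的回答获得正优势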
4.5 SimPO (Simple Preference Optimization)¶
SimPO简化了DPO,无需参考模型,直接优化偏好。
class SimPOTrainer:
"""
SimPO训练器(Simple Preference Optimization)
核心思想:
- 移除对参考模型的依赖
- 使用平均token概率代替序列概率
- 更简单、更高效
"""
def __init__(self, model, tokenizer, beta=2.0, gamma=0.5, learning_rate=1e-5):
self.model = model
self.tokenizer = tokenizer
self.beta = beta # 温度参数
self.gamma = gamma # 奖励边际
self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    def compute_simpo_loss(self, chosen_avg_logp, rejected_avg_logp):
        """
        SimPO损失
        L = -log σ(β * (log π(y_w|x)/|y_w| - log π(y_l|x)/|y_l|) - γ)
        输入为已做长度归一化的平均token log概率(见_compute_avg_logprobs),
        并加入奖励边际γ
        """
        # SimPO损失(带边际)
        logits = self.beta * (chosen_avg_logp - rejected_avg_logp) - self.gamma
        loss = -F.logsigmoid(logits).mean()
        return loss
def train_step(self, batch):
"""SimPO训练步骤"""
self.model.train()
# 计算chosen的log概率
chosen_outputs = self.model(
input_ids=batch['chosen_input_ids'],
attention_mask=batch['chosen_attention_mask']
)
chosen_logps = self._compute_avg_logprobs(
chosen_outputs,
batch['chosen_input_ids'],
batch['chosen_attention_mask']
)
# 计算rejected的log概率
rejected_outputs = self.model(
input_ids=batch['rejected_input_ids'],
attention_mask=batch['rejected_attention_mask']
)
rejected_logps = self._compute_avg_logprobs(
rejected_outputs,
batch['rejected_input_ids'],
batch['rejected_attention_mask']
)
        # 计算损失(log概率已在_compute_avg_logprobs中做过长度归一化)
        loss = self.compute_simpo_loss(chosen_logps, rejected_logps)
# 反向传播
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return {'loss': loss.item()}
def _compute_avg_logprobs(self, outputs, input_ids, attention_mask):
"""计算平均token log概率"""
logits = outputs.logits[:, :-1, :]
labels = input_ids[:, 1:]
log_probs = F.log_softmax(logits, dim=-1)
token_logps = log_probs.gather(2, labels.unsqueeze(-1)).squeeze(-1)
mask = attention_mask[:, 1:]
avg_logp = (token_logps * mask).sum(dim=1) / mask.sum(dim=1)
return avg_logp
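长度归一化是SimPO的关键设计:按序列总log概率比较会系统性偏向短回答。下面的小例子展示这一点(数值为假设):

total_logp = torch.tensor([-20.0, -20.0])  # 两个回答的总log概率相同
lengths = torch.tensor([10.0, 40.0])       # 但长度相差4倍
print(total_logp / lengths)                # tensor([-2.0000, -0.5000])
# 按总logp两者打平;按平均logp,长回答每个token的置信度更高
# 归一化消除了"更长 => 总logp必然更低"的长度偏置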
4.6 对齐方法对比(2024-2025更新)¶
对齐方法对比(2024-2025)
═══════════════════════════════════════════════════════════════════
方法 需要RM 需要Ref 需要RL 数据要求 代表模型
───────────────────────────────────────────────────────────────────
RLHF (PPO) ✓ ✓ ✓ 成对偏好 GPT-4, Claude
DPO ✗ ✓ ✗ 成对偏好 Zephyr, Llama 3.1
KTO ✗ ✓ ✗ 二元标签 -
ORPO ✗ ✗ ✗ 成对偏好 -
GRPO ✗ ✓ ✓* 组内比较 DeepSeek-R1
SimPO ✗ ✗ ✗ 成对偏好 -
IPO ✗ ✓ ✗ 成对偏好 -
* GRPO仍属RL方法,但用组内相对优势取代了价值(critic)模型;DeepSeek-R1中的奖励主要来自可验证规则,无需单独训练奖励模型
选择建议:
├── 资源充足、追求极致效果 → RLHF (PPO)
├── 快速迭代、稳定性优先 → DPO / SimPO
├── 无参考模型、简化流程 → ORPO / SimPO
├── 推理模型对齐 → GRPO
└── 无成对偏好数据 → KTO
═══════════════════════════════════════════════════════════════════
对齐技术实践¶
5.1 完整训练流程¶
class AlignmentPipeline:
"""
对齐完整流程
"""
def __init__(self, base_model_name, method='dpo'):
self.base_model_name = base_model_name
self.method = method
# 加载基础模型和tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
self.base_model = AutoModelForCausalLM.from_pretrained(base_model_name)
def stage1_sft(self, sft_data, output_dir, num_epochs=3):
"""
阶段1:监督微调
"""
print("=" * 60)
print("Stage 1: Supervised Fine-Tuning")
print("=" * 60)
# 创建SFT训练器
sft_trainer = SFTTrainer(
model=self.base_model,
tokenizer=self.tokenizer
)
        # 准备数据(_prepare_sft_dataloader等数据处理辅助方法从略)
        train_dataloader = self._prepare_sft_dataloader(sft_data)
# 训练
for epoch in range(num_epochs):
total_loss = 0
for batch in train_dataloader:
loss = sft_trainer.train_step(batch)
total_loss += loss
avg_loss = total_loss / len(train_dataloader)
print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")
# 保存SFT模型
self.sft_model = sft_trainer.model
self.sft_model.save_pretrained(f"{output_dir}/sft_model")
self.tokenizer.save_pretrained(f"{output_dir}/sft_model")
print(f"SFT model saved to {output_dir}/sft_model")
return self.sft_model
def stage2_alignment(self, preference_data, output_dir, num_epochs=1):
"""
阶段2:对齐训练(RLHF或DPO)
"""
print("=" * 60)
print(f"Stage 2: Alignment ({self.method.upper()})")
print("=" * 60)
# 加载参考模型(SFT模型的副本)
reference_model = AutoModelForCausalLM.from_pretrained(
f"{output_dir}/sft_model"
)
# 创建对齐训练器
if self.method == 'dpo':
trainer = DPOTrainer(
policy_model=self.sft_model,
reference_model=reference_model,
beta=0.1
)
elif self.method == 'rlhf':
            # 需要先训练奖励模型(_train_reward_model为本类的辅助方法,实现从略)
            reward_model = self._train_reward_model(preference_data)
            trainer = PPOTrainer(
                policy_model=self.sft_model,
                reference_model=reference_model,
                reward_model=reward_model,
                value_model=None,          # 不用价值模型时直接以奖励作优势
                tokenizer=self.tokenizer
            )
else:
raise ValueError(f"Unknown method: {self.method}")
# 准备数据
train_dataloader = self._prepare_preference_dataloader(preference_data)
# 训练
for epoch in range(num_epochs):
total_loss = 0
for batch in train_dataloader:
metrics = trainer.train_step(batch)
total_loss += metrics['loss']
avg_loss = total_loss / len(train_dataloader)
print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")
# 保存对齐后的模型
aligned_model = trainer.policy_model
aligned_model.save_pretrained(f"{output_dir}/aligned_model")
self.tokenizer.save_pretrained(f"{output_dir}/aligned_model")
print(f"Aligned model saved to {output_dir}/aligned_model")
return aligned_model
def run_full_pipeline(self, sft_data, preference_data, output_dir):
"""
运行完整流程
"""
# Stage 1: SFT
sft_model = self.stage1_sft(sft_data, output_dir)
# Stage 2: Alignment
aligned_model = self.stage2_alignment(preference_data, output_dir)
return aligned_model
5.2 评估对齐效果¶
class AlignmentEvaluator:
"""
对齐效果评估
"""
@staticmethod # @staticmethod无需实例即可调用
def evaluate_helpfulness(model, tokenizer, test_questions):
"""
评估有用性
"""
results = []
for question in test_questions:
inputs = tokenizer(question, return_tensors='pt')
with torch.no_grad():
outputs = model.generate(
**inputs,
max_length=256,
do_sample=True,
temperature=0.7
)
response = tokenizer.decode(outputs[0], skip_special_tokens=True)
results.append({
'question': question,
'response': response
})
return results
@staticmethod
def evaluate_harmlessness(model, tokenizer, harmful_prompts):
"""
评估无害性
测试模型是否拒绝有害请求
"""
results = []
for prompt in harmful_prompts:
inputs = tokenizer(prompt, return_tensors='pt')
with torch.no_grad():
outputs = model.generate(
**inputs,
max_length=256,
do_sample=True,
temperature=0.7
)
response = tokenizer.decode(outputs[0], skip_special_tokens=True)
                # 检查是否拒绝(简单关键词启发式,中英文模型都应覆盖)
                refusal_keywords = ['sorry', 'cannot', 'unable', 'inappropriate',
                                    'harmful', '抱歉', '无法', '不能']
                is_refused = any(keyword in response.lower() for keyword in refusal_keywords)
results.append({
'prompt': prompt,
'response': response,
'refused': is_refused
})
refusal_rate = sum(r['refused'] for r in results) / len(results)
print(f"Refusal Rate: {refusal_rate:.2%}")
return results
@staticmethod
def compare_models(before_model, after_model, tokenizer, test_prompts):
"""
对比对齐前后的模型
"""
comparison = []
for prompt in test_prompts:
inputs = tokenizer(prompt, return_tensors='pt')
# 对齐前的输出
with torch.no_grad():
before_outputs = before_model.generate(
**inputs,
max_length=256,
do_sample=True,
temperature=0.7
)
before_response = tokenizer.decode(before_outputs[0], skip_special_tokens=True)
# 对齐后的输出
with torch.no_grad():
after_outputs = after_model.generate(
**inputs,
max_length=256,
do_sample=True,
temperature=0.7
)
after_response = tokenizer.decode(after_outputs[0], skip_special_tokens=True)
comparison.append({
'prompt': prompt,
'before': before_response,
'after': after_response
})
return comparison
总结¶
对齐技术选择指南¶
| 方法 | 复杂度 | 稳定性 | 推荐场景 |
|---|---|---|---|
| DPO | 低 | 高 | 首选方法,快速迭代 |
| RLHF | 高 | 中 | 追求极致效果,资源充足 |
| IPO | 中 | 高 | 需要缓解DPO过拟合时 |
| KTO | 低 | 高 | 无成对偏好数据 |
关键超参数¶
# DPO推荐配置
DPO_CONFIG = {
'beta': 0.1, # 温度参数(0.05-0.5)
'learning_rate': 1e-6, # 学习率(要小)
'batch_size': 4, # 批次大小
'num_epochs': 1, # 通常1-3个epoch
'max_length': 512, # 最大序列长度
}
# RLHF推荐配置
RLHF_CONFIG = {
'kl_coef': 0.2, # KL惩罚系数
'clip_epsilon': 0.2, # PPO裁剪参数
'learning_rate': 1e-5, # 学习率
'batch_size': 32, # 批次大小
'ppo_epochs': 4, # 每个数据点的PPO epoch数
}
对齐最佳实践¶
- 使用高质量的SFT模型作为起点
- 偏好数据要多样且高质量
- DPO的beta参数需要调优
- 监控训练稳定性
- 定期评估对齐效果
- 结合人工评估
- 2024-2025新增:考虑使用ORPO/SimPO简化训练流程
- 2024-2025新增:推理模型对齐可尝试GRPO
- 2024-2025新增:关注RLAIF(AI反馈强化学习)降低人工成本
2024-2025对齐技术趋势¶
对齐技术发展趋势
═══════════════════════════════════════════════════════════════════
1. 简化流程
├── ORPO:SFT + 对齐合并为单阶段
├── SimPO:无需参考模型
└── 趋势:减少训练复杂度
2. AI辅助对齐(RLAIF)
├── 使用LLM作为偏好标注器
├── Constitutional AI:自我批评与改进
└── 降低人工标注成本
3. 推理能力对齐
├── GRPO:DeepSeek-R1采用
├── 强化推理过程而非仅结果
└── 思维链质量优化
4. 多目标对齐
├── 同时优化有用性、诚实性、无害性
├── 帕累托最优权衡
└── 用户可定制的价值观
5. 长上下文对齐
├── 长对话一致性
├── 长文档理解对齐
└── 多轮交互价值观一致性
═══════════════════════════════════════════════════════════════════
下一步:学习05-大模型安全与对齐,深入了解安全训练和红队测试!
最后更新日期:2026-02-20 适用版本:LLM学习教程 v2026