
04 - Alignment Techniques (Comprehensive Edition)

⚠️ Timeliness note: this chapter references frontier models, pricing, leaderboards, and other fast-moving information; always defer to the original papers, official release pages, and API documentation.

📌 Scope note: this chapter is the primary reference on alignment techniques, focusing on the end-to-end principles of RLHF, DPO, PPO, and related methods. - 📖 For the application-security and compliance perspective, see LLM应用/14-大模型安全与对齐

Learning goals: develop a deep understanding of LLM alignment techniques, including the principles and implementation of RLHF, DPO, and PPO.


Table of Contents

  1. Alignment Overview
  2. Reinforcement Learning from Human Feedback (RLHF)
  3. Direct Preference Optimization (DPO)
  4. Other Alignment Methods
  5. Alignment in Practice

Alignment Overview

1.1 What Is Alignment?

Text Only
Alignment

Core question: how do we make a large model behave in line with human intent and values?

Problems with pretraining:
├── The model learns from massive internet-scale data
├── That data can contain harmful, biased, or incorrect information
├── The model does not understand true human intent
└── It can produce unsafe outputs

Goals of alignment:
├── Helpful: answer the user's questions
├── Honest: provide accurate information
├── Harmless: avoid harmful outputs
└── Controllable: follow instructions

Three families of alignment methods:
├── 1. Reinforcement learning from human feedback (RLHF)
├── 2. Direct preference optimization (DPO)
└── 3. Reinforcement learning from AI feedback (RLAIF)

1.2 Alignment Methods at a Glance

Method   Quality     Representative models
RLHF     Excellent   GPT-4, Claude, InstructGPT
DPO      Excellent   Zephyr, Neural-Chat
SLiC     Good        -
IPO      Good        -
KTO      Good        -
ORPO     Excellent   Llama 3.1
GRPO     Excellent   DeepSeek-R1
SimPO    Excellent   -

Reinforcement Learning from Human Feedback (RLHF)

2.1 The Three-Stage RLHF Pipeline

Text Only
The full RLHF pipeline
═══════════════════════════════════════════════════════════════════

Stage 1: Supervised fine-tuning (SFT)
├── Data: human-written instruction-response pairs
├── Goal: teach the model to follow instructions
├── Method: standard language-model fine-tuning
└── Output: the SFT model

Stage 2: Reward modeling
├── Data: multiple responses to the same prompt, ranked by human annotators
├── Goal: learn human preferences
├── Method: train a reward model to predict which response humans prefer
└── Output: the reward model

Stage 3: RL optimization
├── Data: responses sampled from the SFT model
├── Goal: maximize reward while staying close to the SFT model
├── Method: the PPO algorithm
└── Output: the aligned model

═══════════════════════════════════════════════════════════════════

2.2 Stage 1: Supervised Fine-Tuning (SFT)

Python
import torch
import torch.nn as nn
import torch.nn.functional as F

class SFTTrainer:
    """
    Supervised fine-tuning (SFT) trainer.
    """
    def __init__(self, model, tokenizer, learning_rate=1e-5):
        self.model = model
        self.tokenizer = tokenizer
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=learning_rate
        )

    def prepare_instruction_data(self, examples):
        """
        Format instruction data.

        Args:
            examples: [{"instruction": "...", "input": "...", "output": "..."}]
        """
        formatted_texts = []

        for example in examples:
            # Build the prompt from a template
            if example.get('input'):
                prompt = f"### Instruction:\n{example['instruction']}\n\n### Input:\n{example['input']}\n\n### Response:\n"
            else:
                prompt = f"### Instruction:\n{example['instruction']}\n\n### Response:\n"

            # Full text (prompt + response)
            full_text = prompt + example['output']
            formatted_texts.append(full_text)

        return formatted_texts

    def train_step(self, batch):
        """
        Single training step.
        """
        self.model.train()

        # Forward pass
        outputs = self.model(
            input_ids=batch['input_ids'],
            attention_mask=batch['attention_mask'],
            labels=batch['labels']
        )

        loss = outputs.loss

        # Backward pass
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

# Example SFT data item
SFT_EXAMPLE = {
    "instruction": "Explain what machine learning is",
    "input": "",
    "output": "Machine learning is a branch of artificial intelligence that lets computers learn from data without being explicitly programmed..."
}
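The trainer above expects batch['labels'] with prompt tokens masked out so that the loss is computed only on the response. A minimal sketch of building such a batch, assuming a Hugging Face tokenizer (the helper name is hypothetical; -100 follows transformers' CrossEntropyLoss ignore index, and the offset may need adjusting if your tokenizer inserts special tokens):

Python
def build_sft_batch(tokenizer, prompt, response, max_length=512):
    """Hypothetical helper: tokenize prompt + response, masking prompt tokens."""
    prompt_ids = tokenizer(prompt, add_special_tokens=False)['input_ids']
    full = tokenizer(prompt + response, truncation=True,
                     max_length=max_length, return_tensors='pt')
    labels = full['input_ids'].clone()
    labels[0, :len(prompt_ids)] = -100  # loss is computed on the response only
    return {
        'input_ids': full['input_ids'],
        'attention_mask': full['attention_mask'],
        'labels': labels,
    }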

2.3 Stage 2: Reward Model Training

Python
class RewardModel(nn.Module):
    """
    Reward model.

    Based on the Bradley-Terry model: learns to predict human preferences.
    """
    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model

        # Scalar reward head on top of the base model
        self.reward_head = nn.Linear(
            base_model.config.hidden_size,
            1,
            bias=False
        )

    def forward(self, input_ids, attention_mask):
        """
        Forward pass.

        Returns:
            rewards: [batch_size] reward score per sample
        """
        # Run the base model and request hidden states
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True
        )

        # Hidden states of the final layer
        last_hidden = outputs.hidden_states[-1]  # [batch, seq_len, hidden]

        # Find the actual last token of each sequence (accounting for padding)
        sequence_lengths = attention_mask.sum(dim=1) - 1
        batch_size = input_ids.size(0)

        # Representation of the last real token
        last_token_hidden = last_hidden[
            torch.arange(batch_size),
            sequence_lengths
        ]

        # Compute the reward
        rewards = self.reward_head(last_token_hidden).squeeze(-1)

        return rewards

class RewardModelTrainer:
    """
    Reward model trainer.
    """
    def __init__(self, reward_model, learning_rate=1e-5):
        self.reward_model = reward_model
        self.optimizer = torch.optim.AdamW(
            reward_model.parameters(),
            lr=learning_rate
        )

    def compute_preference_loss(self, chosen_rewards, rejected_rewards):
        """
        Preference loss.

        Bradley-Terry model:
        P(y_w > y_l | x) = σ(r(x, y_w) - r(x, y_l))

        Loss = -log σ(r_θ(x, y_w) - r_θ(x, y_l))
        """
        # Reward margin between chosen and rejected
        reward_diff = chosen_rewards - rejected_rewards

        # Log-sigmoid loss
        loss = -F.logsigmoid(reward_diff).mean()

        return loss

    def train_step(self, batch):
        """
        One training step.

        batch contains:
        - chosen_input_ids: the response humans preferred
        - rejected_input_ids: the response humans rejected
        """
        self.reward_model.train()

        # Reward for the chosen response
        chosen_rewards = self.reward_model(
            input_ids=batch['chosen_input_ids'],
            attention_mask=batch['chosen_attention_mask']
        )

        # Reward for the rejected response
        rejected_rewards = self.reward_model(
            input_ids=batch['rejected_input_ids'],
            attention_mask=batch['rejected_attention_mask']
        )

        # Loss
        loss = self.compute_preference_loss(chosen_rewards, rejected_rewards)

        # Backward pass
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Accuracy: fraction of pairs ranked correctly
        accuracy = (chosen_rewards > rejected_rewards).float().mean()

        return {
            'loss': loss.item(),
            'accuracy': accuracy.item(),
            'chosen_reward': chosen_rewards.mean().item(),
            'rejected_reward': rejected_rewards.mean().item()
        }
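A minimal usage sketch of the reward-model trainer, assuming a Hugging Face backbone (the model name and the one-pair "batch" are purely illustrative):

Python
from transformers import AutoModel, AutoTokenizer

backbone = AutoModel.from_pretrained('gpt2')   # illustrative backbone
tokenizer = AutoTokenizer.from_pretrained('gpt2')
tokenizer.pad_token = tokenizer.eos_token      # GPT-2 has no pad token by default

rm = RewardModel(backbone)
trainer = RewardModelTrainer(rm)

chosen = tokenizer(["Paris is the capital of France."], return_tensors='pt', padding=True)
rejected = tokenizer(["London is the capital of France."], return_tensors='pt', padding=True)

metrics = trainer.train_step({
    'chosen_input_ids': chosen['input_ids'],
    'chosen_attention_mask': chosen['attention_mask'],
    'rejected_input_ids': rejected['input_ids'],
    'rejected_attention_mask': rejected['attention_mask'],
})
print(metrics)  # {'loss': ..., 'accuracy': ..., ...}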

2.4 Stage 3: Reinforcement Learning with PPO

Python
class PPOTrainer:
    """
    PPO (Proximal Policy Optimization) trainer.
    """
    def __init__(
        self,
        policy_model,      # policy model (the model being trained)
        reference_model,   # reference model (the SFT model, frozen)
        reward_model,      # reward model
        value_model,       # value model (optional)
        tokenizer,         # tokenizer (used for padding handling, etc.)
        learning_rate=1e-5,
        clip_epsilon=0.2,
        kl_coef=0.2,
        gamma=0.99,
        lam=0.95
    ):
        self.policy_model = policy_model
        self.reference_model = reference_model
        self.reward_model = reward_model
        self.value_model = value_model
        self.tokenizer = tokenizer

        self.optimizer = torch.optim.AdamW(
            policy_model.parameters(),
            lr=learning_rate
        )

        self.clip_epsilon = clip_epsilon
        self.kl_coef = kl_coef
        self.gamma = gamma
        self.lam = lam

    def generate_responses(self, prompts, max_length=256):
        """
        Sample responses from the current policy.
        """
        self.policy_model.eval()

        with torch.no_grad():  # no gradients needed during generation
            outputs = self.policy_model.generate(
                input_ids=prompts,
                max_length=max_length,
                do_sample=True,
                temperature=0.7,
                return_dict_in_generate=True,
                output_scores=True
            )

        return outputs.sequences, outputs.scores

    def compute_rewards(self, sequences, attention_mask):
        """
        Compute rewards.

        reward = reward-model score - KL penalty
        """
        with torch.no_grad():
            # Reward-model score
            reward_scores = self.reward_model(sequences, attention_mask)

            # KL divergence against the reference model
            policy_logits = self.policy_model(sequences, attention_mask).logits
            reference_logits = self.reference_model(sequences, attention_mask).logits

            # Per-token KL divergence
            policy_probs = F.softmax(policy_logits, dim=-1)
            reference_probs = F.softmax(reference_logits, dim=-1)

            kl_div = (policy_probs * (torch.log(policy_probs + 1e-10) - torch.log(reference_probs + 1e-10))).sum(-1)

            # Final reward: score minus the sequence-averaged KL penalty
            rewards = reward_scores - self.kl_coef * kl_div.mean(dim=1)

        return rewards

    def compute_advantages(self, rewards, values):
        """
        Generalized Advantage Estimation (GAE).
        """
        advantages = []
        gae = 0

        for t in reversed(range(len(rewards))):
            if t == len(rewards) - 1:
                next_value = 0
            else:
                next_value = values[t + 1]

            delta = rewards[t] + self.gamma * next_value - values[t]
            gae = delta + self.gamma * self.lam * gae
            advantages.insert(0, gae)

        return torch.tensor(advantages)

    def ppo_loss(self, old_logprobs, new_logprobs, advantages):
        """
        PPO clipped surrogate loss.
        """
        # Probability ratio between the new and old policies
        ratio = torch.exp(new_logprobs - old_logprobs)

        # Clipped surrogate objectives
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages

        # PPO loss (the min bounds the size of the policy update)
        loss = -torch.min(surr1, surr2).mean()

        return loss

    def train_step(self, batch):
        """
        One PPO training step.
        """
        self.policy_model.train()

        prompts = batch['prompts']
        old_sequences = batch['sequences']
        old_logprobs = batch['logprobs']

        # Compute rewards
        attention_mask = (old_sequences != self.tokenizer.pad_token_id).long()
        rewards = self.compute_rewards(old_sequences, attention_mask)

        # Compute values (if a value model is available)
        if self.value_model is not None:
            values = self.value_model(old_sequences, attention_mask)
            advantages = self.compute_advantages(rewards, values)
        else:
            advantages = rewards

        # Re-run the current policy on the sampled sequences
        outputs = self.policy_model(old_sequences, attention_mask)
        new_logits = outputs.logits

        # New log-probs: shift so the logits at position t score token t+1
        new_logprobs = F.log_softmax(new_logits[:, :-1, :], dim=-1)
        new_logprobs = torch.gather(new_logprobs, 2, old_sequences[:, 1:].unsqueeze(-1)).squeeze(-1)
        new_logprobs = new_logprobs.mean(dim=1)  # average per-token log-prob

        # PPO loss
        loss = self.ppo_loss(old_logprobs, new_logprobs, advantages)

        # Backward pass with gradient clipping
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 1.0)
        self.optimizer.step()

        return {
            'loss': loss.item(),
            'reward': rewards.mean().item(),
            'advantage': advantages.mean().item()
        }
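How the pieces fit together in one iteration (a simplified sketch; the prompt batching and old-log-prob bookkeeping of a production PPO loop are elided):

Python
def ppo_iteration(trainer, prompt_ids):
    """One illustrative rollout-then-update cycle for the PPOTrainer above."""
    # 1. Roll out: sample responses from the current policy
    sequences, _ = trainer.generate_responses(prompt_ids)

    # 2. Record "old" log-probs under the policy that generated the samples
    attention_mask = (sequences != trainer.tokenizer.pad_token_id).long()
    with torch.no_grad():
        logits = trainer.policy_model(sequences, attention_mask).logits[:, :-1, :]
        logps = F.log_softmax(logits, dim=-1)
        old_logprobs = logps.gather(2, sequences[:, 1:].unsqueeze(-1)).squeeze(-1).mean(dim=1)

    # 3. Optimize the (reward - KL penalty) objective
    return trainer.train_step({
        'prompts': prompt_ids,
        'sequences': sequences,
        'logprobs': old_logprobs,
    })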

Direct Preference Optimization (DPO)

3.1 How DPO Works

Text Only
DPO (Direct Preference Optimization)

Core ideas:
├── No explicit reward model
├── No reinforcement learning
├── Optimize the policy directly on preference data
└── Simpler and more stable

From the Bradley-Terry Model to the DPO Loss: Full Derivation

Step 1: the Bradley-Terry preference model

Given a prompt \(x\), the probability that humans prefer response \(y_w\) over \(y_l\) is:

\[ P(y_w \succ y_l \mid x) = \sigma(r(x, y_w) - r(x, y_l)) \]

where \(r(x, y)\) is the true reward function and \(\sigma\) is the sigmoid function.

Step 2: the KL-constrained RLHF objective

RLHF solves the optimization problem:

\[ \max_{\pi_\theta} \; \mathbb{E}_{x \sim D, \, y \sim \pi_\theta(\cdot|x)}[r(x, y)] - \beta \, \text{KL}[\pi_\theta(\cdot|x) \| \pi_{\text{ref}}(\cdot|x)] \]

Step 3: closed-form optimal policy

Solving this objective (via Lagrangian duality) yields the optimal policy:

\[ \pi^*(y \mid x) = \frac{1}{Z(x)} \pi_{\text{ref}}(y \mid x) \exp\!\left(\frac{r(x, y)}{\beta}\right) \]

where \(Z(x) = \sum_y \pi_{\text{ref}}(y|x) \exp(r(x,y)/\beta)\) is the partition function.

Step 4: express the reward in terms of the policy (the key step)

Taking logarithms and rearranging, the reward can be written in terms of the policy:

\[ r(x, y) = \beta \log \frac{\pi^*(y \mid x)}{\pi_{\text{ref}}(y \mid x)} + \beta \log Z(x) \]

Step 5: substitute into Bradley-Terry to eliminate the reward

Substituting into the Bradley-Terry model (the \(Z(x)\) terms cancel in the difference):

\[ P(y_w \succ y_l \mid x) = \sigma\!\left(\beta \log \frac{\pi^*(y_w|x)}{\pi_{\text{ref}}(y_w|x)} - \beta \log \frac{\pi^*(y_l|x)}{\pi_{\text{ref}}(y_l|x)}\right) \]

Step 6: the DPO loss

Replacing the optimal policy \(\pi^*\) with the current policy \(\pi_\theta\) and taking the negative log-likelihood over the preference dataset:

\[ \mathcal{L}_{\text{DPO}}(\pi_\theta; \pi_{\text{ref}}) = -\mathbb{E}_{(x, y_w, y_l) \sim D}\left[\log \sigma\!\left(\beta \log \frac{\pi_\theta(y_w|x)}{\pi_{\text{ref}}(y_w|x)} - \beta \log \frac{\pi_\theta(y_l|x)}{\pi_{\text{ref}}(y_l|x)}\right)\right] \]
Text Only
The derivation chain in summary:
Bradley-Terry preference model
    ↓ maps "reward gap → preference probability"
KL-constrained optimization objective
    ↓ solved via Lagrangian duality
Closed-form optimal policy π*(y|x)
    ↓ take logs and solve for r(x,y)
Reward expressed as a policy ratio
    ↓ substitute into Bradley-Terry; Z(x) cancels
DPO loss (no reward model needed!)
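A quick numerical sanity check of the loss with toy log-probabilities (the numbers are made up, not from a real model):

Python
import torch
import torch.nn.functional as F

beta = 0.1
policy_chosen, policy_rejected = torch.tensor(-12.0), torch.tensor(-15.0)
ref_chosen, ref_rejected = torch.tensor(-13.0), torch.tensor(-14.0)

# Implicit reward margin: 0.1 * ((-12 + 13) - (-15 + 14)) = 0.2
logits = beta * ((policy_chosen - ref_chosen) - (policy_rejected - ref_rejected))
loss = -F.logsigmoid(logits)
print(loss.item())  # ≈ 0.598 — below log 2 ≈ 0.693, since the margin is positive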

3.2 DPO Implementation

Python
class DPOTrainer:
    """
    DPO (Direct Preference Optimization) trainer.
    """
    def __init__(
        self,
        policy_model,      # policy model
        reference_model,   # reference model (the SFT model, frozen)
        beta=0.1,          # temperature parameter
        learning_rate=1e-6
    ):
        self.policy_model = policy_model
        self.reference_model = reference_model
        self.beta = beta

        self.optimizer = torch.optim.AdamW(
            policy_model.parameters(),
            lr=learning_rate
        )

        # The reference model is never updated
        for param in self.reference_model.parameters():
            param.requires_grad = False

    def compute_log_probs(self, model, input_ids, attention_mask, labels):
        """
        Compute the log-probability of a sequence.

        Only response tokens contribute (positions where labels != -100).
        """
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        logits = outputs.logits
        log_probs = F.log_softmax(logits, dim=-1)

        # Shift so that the logits at position t predict the token at t+1
        log_probs = log_probs[:, :-1, :]
        labels = labels[:, 1:]

        # Mask prompt/padding positions; clamp the -100 labels to a valid
        # index before gathering (their contributions are masked out below)
        mask = (labels != -100).float()
        safe_labels = labels.clamp(min=0)

        # Log-probability of each target token
        token_log_probs = torch.gather(
            log_probs,
            dim=2,
            index=safe_labels.unsqueeze(2)
        ).squeeze(2)

        # Keep only response positions
        token_log_probs = token_log_probs * mask

        # Total log-probability per sample
        sequence_log_probs = token_log_probs.sum(dim=1)

        return sequence_log_probs

    def dpo_loss(self, policy_chosen_logps, policy_rejected_logps,
                 reference_chosen_logps, reference_rejected_logps):
        """
        DPO loss.
        """
        # Log-ratios of chosen over rejected
        policy_logratios = policy_chosen_logps - policy_rejected_logps
        reference_logratios = reference_chosen_logps - reference_rejected_logps

        # DPO loss
        logits = self.beta * (policy_logratios - reference_logratios)
        loss = -F.logsigmoid(logits).mean()

        return loss

    def train_step(self, batch):
        """
        One DPO training step.

        batch contains:
        - chosen_input_ids: the full preferred sequence (prompt + chosen response)
        - rejected_input_ids: the full dispreferred sequence (prompt + rejected response)
        """
        self.policy_model.train()

        # Policy log-probs
        policy_chosen_logps = self.compute_log_probs(
            self.policy_model,
            batch['chosen_input_ids'],
            batch['chosen_attention_mask'],
            batch['chosen_labels']
        )

        policy_rejected_logps = self.compute_log_probs(
            self.policy_model,
            batch['rejected_input_ids'],
            batch['rejected_attention_mask'],
            batch['rejected_labels']
        )

        # Reference log-probs (no gradients)
        with torch.no_grad():
            reference_chosen_logps = self.compute_log_probs(
                self.reference_model,
                batch['chosen_input_ids'],
                batch['chosen_attention_mask'],
                batch['chosen_labels']
            )

            reference_rejected_logps = self.compute_log_probs(
                self.reference_model,
                batch['rejected_input_ids'],
                batch['rejected_attention_mask'],
                batch['rejected_labels']
            )

        # DPO loss
        loss = self.dpo_loss(
            policy_chosen_logps,
            policy_rejected_logps,
            reference_chosen_logps,
            reference_rejected_logps
        )

        # Backward pass
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 1.0)
        self.optimizer.step()

        # Accuracy: does the policy rank chosen above rejected?
        with torch.no_grad():
            chosen_rewards = self.beta * (policy_chosen_logps - reference_chosen_logps)
            rejected_rewards = self.beta * (policy_rejected_logps - reference_rejected_logps)
            accuracy = (chosen_rewards > rejected_rewards).float().mean()

        return {
            'loss': loss.item(),
            'accuracy': accuracy.item(),
            'chosen_reward': chosen_rewards.mean().item(),
            'rejected_reward': rejected_rewards.mean().item()
        }
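A usage sketch for the hand-rolled trainer above (the model name is illustrative; each batch must carry chosen/rejected input_ids, attention_mask, and -100-masked labels, as assumed by compute_log_probs):

Python
from copy import deepcopy
from transformers import AutoModelForCausalLM

policy = AutoModelForCausalLM.from_pretrained('gpt2')   # stand-in for your SFT model
reference = deepcopy(policy)                            # frozen copy used as the reference

dpo = DPOTrainer(policy_model=policy, reference_model=reference, beta=0.1)
# for batch in preference_dataloader:
#     metrics = dpo.train_step(batch)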

3.3 DPO vs. RLHF

Text Only
DPO vs. RLHF
═══════════════════════════════════════════════════════════════════

Property           RLHF                          DPO
─────────────────────────────────────────────────────────────────
Complexity         High (three stages)           Low (single stage)
Stability          Medium (PPO can be unstable)  High (supervised learning)
Training speed     Slow                          Fast
Memory footprint   High (4 models)               Low (2 models)
Hyperparameters    Many (LR, clip, ...)          Few (mainly beta)
Quality            Excellent                     Excellent

Models required:
RLHF: policy + reference + reward + value = 4 models
DPO:  policy + reference = 2 models

Prefer DPO when:
├── Compute is limited
├── You need fast iteration
├── Stability matters most
└── It is your first attempt at alignment

Prefer RLHF when:
├── You are chasing peak quality
├── You have abundant compute
├── Your team has RLHF experience
└── You are building a production-grade application

═══════════════════════════════════════════════════════════════════

Other Alignment Methods

4.1 IPO (Identity Preference Optimization)

Python
class IPOTrainer:
    """
    IPO trainer.

    A DPO variant that replaces the log-sigmoid loss with a squared error.
    """
    def __init__(self, policy_model, reference_model, beta=0.1, learning_rate=1e-6):
        self.policy_model = policy_model
        self.reference_model = reference_model
        self.beta = beta
        self.optimizer = torch.optim.AdamW(policy_model.parameters(), lr=learning_rate)

    def ipo_loss(self, policy_chosen_logps, policy_rejected_logps,
                 reference_chosen_logps, reference_rejected_logps):
        """
        IPO loss:

        L = (log(π_θ(y_w|x)/π_ref(y_w|x)) - log(π_θ(y_l|x)/π_ref(y_l|x)) - 1/(2β))^2
        """
        policy_logratios = policy_chosen_logps - policy_rejected_logps
        reference_logratios = reference_chosen_logps - reference_rejected_logps

        # IPO target: push the margin toward 1/(2*beta)
        diff = policy_logratios - reference_logratios
        target = 1.0 / (2 * self.beta)

        loss = ((diff - target) ** 2).mean()

        return loss
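A train_step for IPO would mirror DPOTrainer.train_step above, with ipo_loss swapped in for dpo_loss; the log-probabilities are computed exactly the same way.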

4.2 KTO (Kahneman-Tversky Optimization)

Python
class KTOTrainer:
    """
    KTO trainer.

    Needs no paired preferences -- only a binary good/bad label per example.
    """
    def __init__(self, policy_model, reference_model, beta=0.1, learning_rate=1e-6):
        self.policy_model = policy_model
        self.reference_model = reference_model
        self.beta = beta
        self.optimizer = torch.optim.AdamW(policy_model.parameters(), lr=learning_rate)

    def kto_loss(self, policy_logps, reference_logps, is_desirable):
        """
        KTO loss (simplified form, omitting the KL reference point z_ref
        of the full KTO objective).

        Desirable samples: push the implicit reward up.
        Undesirable samples: push the implicit reward down.
        """
        # Implicit reward
        implicit_reward = self.beta * (policy_logps - reference_logps)

        # Kahneman-Tversky value-style loss
        if is_desirable:
            # Desirable: loss shrinks as the reward grows
            loss = 1 - implicit_reward.sigmoid()
        else:
            # Undesirable: loss shrinks as the reward drops
            loss = implicit_reward.sigmoid()

        return loss.mean()
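Because KTO only needs a binary label per example, the loss can be exercised with unpaired toy tensors (values are illustrative):

Python
beta = 0.1
policy_logps = torch.tensor([-10.0, -14.0])
reference_logps = torch.tensor([-11.0, -12.0])

implicit_reward = beta * (policy_logps - reference_logps)   # tensor([ 0.1000, -0.2000])
desirable_loss = (1 - implicit_reward.sigmoid()).mean()     # small when rewards are high
undesirable_loss = implicit_reward.sigmoid().mean()         # small when rewards are low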

4.3 ORPO (Odds Ratio Preference Optimization)

ORPO, proposed in 2024 and adopted by models such as Llama 3.1, is a notable alignment method that merges SFT and alignment into a single training stage.

Python
class ORPOTrainer:
    """
    ORPO trainer (Odds Ratio Preference Optimization)

    Core idea: fold preference optimization directly into SFT training.
    - No separate SFT stage
    - Uses the odds ratio to measure the preference gap
    - Adopted by Llama 3.1
    """
    def __init__(self, model, tokenizer, beta=0.1, learning_rate=1e-5):
        self.model = model
        self.tokenizer = tokenizer
        self.beta = beta
        self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    def compute_orpo_loss(self, chosen_logps, rejected_logps):
        """
        ORPO loss = SFT loss + preference loss

        The preference term uses the log odds ratio:
        L_pref = -log σ(log(odds_chosen) - log(odds_rejected))

        where odds = p / (1 - p) and p is the length-normalized
        sequence probability.
        """
        # Compute the log-odds in log space for numerical stability
        log_odds_chosen = chosen_logps - torch.log(1 - chosen_logps.exp() + 1e-8)
        log_odds_rejected = rejected_logps - torch.log(1 - rejected_logps.exp() + 1e-8)

        # Odds-ratio loss (self.beta acts as the weighting coefficient in train_step)
        log_odds_ratio = log_odds_chosen - log_odds_rejected
        orpo_loss = -F.logsigmoid(log_odds_ratio).mean()

        return orpo_loss

    def train_step(self, batch):
        """
        One ORPO training step.

        Jointly optimizes:
        1. SFT loss: maximize the likelihood of the chosen response
        2. Preference loss: widen the gap between chosen and rejected
        """
        self.model.train()

        # Chosen log-probs (the same forward pass also yields the SFT loss)
        chosen_outputs = self.model(
            input_ids=batch['chosen_input_ids'],
            attention_mask=batch['chosen_attention_mask'],
            labels=batch['chosen_labels']
        )
        chosen_logps = self._compute_sequence_logprobs(chosen_outputs, batch['chosen_labels'])
        sft_loss = chosen_outputs.loss

        # Rejected log-probs (gradients must flow through this branch too,
        # so it must NOT be wrapped in torch.no_grad())
        rejected_outputs = self.model(
            input_ids=batch['rejected_input_ids'],
            attention_mask=batch['rejected_attention_mask']
        )
        rejected_logps = self._compute_sequence_logprobs(rejected_outputs, batch['rejected_labels'])

        # Total ORPO loss: SFT term + weighted preference term
        preference_loss = self.compute_orpo_loss(chosen_logps, rejected_logps)
        total_loss = sft_loss + self.beta * preference_loss

        # Backward pass
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

        return {
            'sft_loss': sft_loss.item(),
            'preference_loss': preference_loss.item(),
            'total_loss': total_loss.item()
        }

    def _compute_sequence_logprobs(self, outputs, labels):
        """Average per-token log-probability of the response."""
        logits = outputs.logits[:, :-1, :]
        labels = labels[:, 1:]
        mask = (labels != -100).float()
        safe_labels = labels.clamp(min=0)  # -100 cannot be used as a gather index
        log_probs = F.log_softmax(logits, dim=-1)
        token_logps = log_probs.gather(2, safe_labels.unsqueeze(-1)).squeeze(-1)
        return (token_logps * mask).sum(dim=1) / mask.sum(dim=1)
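A quick feel for the odds transform with toy average per-token log-probs (illustrative numbers):

Python
chosen_logps = torch.tensor([-0.5])    # per-token p ≈ 0.61
rejected_logps = torch.tensor([-1.5])  # per-token p ≈ 0.22

log_odds = lambda lp: lp - torch.log(1 - lp.exp() + 1e-8)
ratio = log_odds(chosen_logps) - log_odds(rejected_logps)
print(ratio.item())  # ≈ 1.68, so -logsigmoid(ratio) ≈ 0.17 — a small preference loss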

4.4 GRPO (Group Relative Policy Optimization)

GRPO is the alignment method used by DeepSeek-R1 and is particularly well suited to aligning reasoning models.

Python
class GRPOTrainer:
    """
    GRPO trainer (Group Relative Policy Optimization)

    Core ideas:
    - Generate several responses to the same prompt (a group)
    - Compare responses relative to one another within the group
    - No separately trained reward model needed (rule-based or judge scores suffice)

    Used by DeepSeek-R1 for reasoning alignment.
    """
    def __init__(
        self,
        model,
        ref_model,
        tokenizer,
        group_size=4,        # responses generated per group
        beta=0.1,
        learning_rate=1e-5
    ):
        self.model = model
        self.ref_model = ref_model
        self.tokenizer = tokenizer
        self.group_size = group_size
        self.beta = beta
        self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    def generate_group(self, prompt, num_samples=None):
        """
        Generate several responses for the same prompt.
        """
        if num_samples is None:
            num_samples = self.group_size

        responses = []
        for _ in range(num_samples):
            output = self.model.generate(
                prompt,
                max_length=512,
                do_sample=True,
                temperature=0.7
            )
            responses.append(output)
        return responses

    def compute_group_advantages(self, rewards):
        """
        Within-group relative advantage.

        advantage_i = reward_i - mean(rewards)
        """
        mean_reward = rewards.mean()
        advantages = rewards - mean_reward
        return advantages

    def grpo_loss(self, old_logprobs, new_logprobs, advantages, clip_eps=0.2):
        """
        GRPO loss (PPO-style, but with group-normalized advantages).
        """
        ratio = torch.exp(new_logprobs - old_logprobs)
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps) * advantages
        loss = -torch.min(surr1, surr2).mean()
        return loss

    def train_step(self, batch):
        """
        One GRPO training step.

        Flow:
        1. Generate group_size responses per prompt
        2. Score each response with rules or a model
        3. Compute within-group relative advantages
        4. Update the policy
        """
        self.model.train()
        total_loss = 0

        for prompt in batch['prompts']:
            # Generate a group of responses
            responses = self.generate_group(prompt)

            # Score each response (rule-based or model-based)
            rewards = self._evaluate_responses(prompt, responses)

            # Within-group relative advantages
            advantages = self.compute_group_advantages(rewards)

            # Log-probs under the reference (old) and current policies
            old_logprobs = self._compute_logprobs(self.ref_model, prompt, responses)
            new_logprobs = self._compute_logprobs(self.model, prompt, responses)

            # GRPO loss
            loss = self.grpo_loss(old_logprobs, new_logprobs, advantages)
            total_loss += loss

        # Backward pass
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

        return {'loss': total_loss.item()}

    def _evaluate_responses(self, prompt, responses):
        """
        Score response quality.

        Options include:
        1. Rule-based rewards (format correctness, length, ...)
        2. Another LLM acting as a judge
        3. A task-specific evaluation function
        """
        # Example: a simple rule-based reward
        rewards = []
        for response in responses:
            reward = 0.0
            # Length reward
            if 50 < len(response) < 500:
                reward += 0.5
            # Format reward (illustrative)
            if response.endswith('.'):
                reward += 0.2
            rewards.append(reward)
        return torch.tensor(rewards)

    def _compute_logprobs(self, model, prompt, responses):
        """Log-probability of each response given the prompt (assumes strings)."""
        logprobs = []
        prompt_len = self.tokenizer(prompt, return_tensors='pt')['input_ids'].size(1)
        for response in responses:
            full_text = prompt + response
            inputs = self.tokenizer(full_text, return_tensors='pt')
            outputs = model(**inputs)
            # Shift logits/labels, then keep only the response positions
            logits = outputs.logits[:, :-1, :]
            labels = inputs['input_ids'][:, 1:]
            token_logps = F.log_softmax(logits, dim=-1).gather(2, labels.unsqueeze(-1)).squeeze(-1)
            logprobs.append(token_logps[:, prompt_len - 1:].sum())
        return torch.stack(logprobs)
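Within-group advantages for a toy group of four (note that DeepSeek's published GRPO additionally divides by the group standard deviation; the trainer above uses the mean-only form):

Python
rewards = torch.tensor([1.0, 0.5, 0.0, 0.5])
advantages = rewards - rewards.mean()              # tensor([ 0.5,  0.0, -0.5,  0.0])
normalized = advantages / (rewards.std() + 1e-8)   # std-normalized variant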

4.5 SimPO (Simple Preference Optimization)

SimPO simplifies DPO: it removes the reference model and optimizes preferences directly.

Python
class SimPOTrainer:
    """
    SimPO trainer (Simple Preference Optimization)

    Core ideas:
    - Remove the dependence on a reference model
    - Use average per-token log-probabilities instead of sequence log-probabilities
    - Simpler and cheaper
    """
    def __init__(self, model, tokenizer, beta=2.0, gamma=0.5, learning_rate=1e-5):
        self.model = model
        self.tokenizer = tokenizer
        self.beta = beta      # temperature parameter
        self.gamma = gamma    # reward margin
        self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    def compute_simpo_loss(self, chosen_logps, rejected_logps, seq_lengths):
        """
        SimPO loss:

        L = -log σ(β * (p_chosen/|y_chosen| - p_rejected/|y_rejected|) - γ)

        Uses average per-token log-probabilities plus a reward margin γ.
        """
        # Normalize to average per-token log-probability
        chosen_avg_logp = chosen_logps / seq_lengths['chosen']
        rejected_avg_logp = rejected_logps / seq_lengths['rejected']

        # SimPO loss (with margin)
        logits = self.beta * (chosen_avg_logp - rejected_avg_logp) - self.gamma
        loss = -F.logsigmoid(logits).mean()

        return loss

    def train_step(self, batch):
        """One SimPO training step."""
        self.model.train()

        # Chosen log-probs
        chosen_outputs = self.model(
            input_ids=batch['chosen_input_ids'],
            attention_mask=batch['chosen_attention_mask']
        )
        chosen_logps = self._compute_avg_logprobs(
            chosen_outputs,
            batch['chosen_input_ids'],
            batch['chosen_attention_mask']
        )

        # Rejected log-probs
        rejected_outputs = self.model(
            input_ids=batch['rejected_input_ids'],
            attention_mask=batch['rejected_attention_mask']
        )
        rejected_logps = self._compute_avg_logprobs(
            rejected_outputs,
            batch['rejected_input_ids'],
            batch['rejected_attention_mask']
        )

        # Loss
        loss = self.compute_simpo_loss(
            chosen_logps,
            rejected_logps,
            {'chosen': batch['chosen_lengths'], 'rejected': batch['rejected_lengths']}
        )

        # Backward pass
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return {'loss': loss.item()}

    def _compute_avg_logprobs(self, outputs, input_ids, attention_mask):
        """Average per-token log-probability (here over all non-padding tokens;
        a stricter implementation would mask the prompt as well)."""
        logits = outputs.logits[:, :-1, :]
        labels = input_ids[:, 1:]
        log_probs = F.log_softmax(logits, dim=-1)
        token_logps = log_probs.gather(2, labels.unsqueeze(-1)).squeeze(-1)
        mask = attention_mask[:, 1:]
        avg_logp = (token_logps * mask).sum(dim=1) / mask.sum(dim=1)
        return avg_logp
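The margin γ is what separates SimPO from a reference-free DPO; a toy check with illustrative numbers:

Python
beta, gamma = 2.0, 0.5
chosen_avg_logp = torch.tensor(-1.0)    # average per-token log-prob
rejected_avg_logp = torch.tensor(-1.4)

logits = beta * (chosen_avg_logp - rejected_avg_logp) - gamma  # 2.0 * 0.4 - 0.5 = 0.3
loss = -F.logsigmoid(logits)  # ≈ 0.554; without the margin it would be ≈ 0.371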

4.6 Alignment Methods Compared (2024-2025 Update)

Text Only
Alignment methods compared (2024-2025)
═══════════════════════════════════════════════════════════════════

Method       Needs RM  Needs Ref  Needs RL  Data required       Representative models
───────────────────────────────────────────────────────────────────
RLHF (PPO)      ✓         ✓         ✓      paired preferences  GPT-4, Claude
DPO             ✗         ✓         ✗      paired preferences  Zephyr
KTO             ✗         ✓         ✗      binary labels       -
ORPO            ✗         ✗         ✗      paired preferences  Llama 3.1
GRPO            ✗         ✓         ✓*     in-group ranking    DeepSeek-R1
SimPO           ✗         ✗         ✗      paired preferences  -
IPO             ✗         ✓         ✗      paired preferences  -

* GRPO uses lightweight RL and needs no separately trained reward model

How to choose:
├── Ample resources, chasing peak quality → RLHF (PPO)
├── Fast iteration, stability first → DPO / SimPO
├── No reference model, simplest pipeline → ORPO / SimPO
├── Aligning reasoning models → GRPO
└── No paired preference data → KTO

═══════════════════════════════════════════════════════════════════

Alignment in Practice

5.1 End-to-End Training Pipeline

Python
from transformers import AutoTokenizer, AutoModelForCausalLM

class AlignmentPipeline:
    """
    End-to-end alignment pipeline.
    """
    def __init__(self, base_model_name, method='dpo'):
        self.base_model_name = base_model_name
        self.method = method

        # Load the base model and tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        self.base_model = AutoModelForCausalLM.from_pretrained(base_model_name)

    def stage1_sft(self, sft_data, output_dir, num_epochs=3):
        """
        Stage 1: supervised fine-tuning.
        """
        print("=" * 60)
        print("Stage 1: Supervised Fine-Tuning")
        print("=" * 60)

        # Create the SFT trainer
        sft_trainer = SFTTrainer(
            model=self.base_model,
            tokenizer=self.tokenizer
        )

        # Prepare the data (the _prepare_* helpers are assumed to build DataLoaders)
        train_dataloader = self._prepare_sft_dataloader(sft_data)

        # Train
        for epoch in range(num_epochs):
            total_loss = 0
            for batch in train_dataloader:
                loss = sft_trainer.train_step(batch)
                total_loss += loss

            avg_loss = total_loss / len(train_dataloader)
            print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

        # Save the SFT model
        self.sft_model = sft_trainer.model
        self.sft_model.save_pretrained(f"{output_dir}/sft_model")
        self.tokenizer.save_pretrained(f"{output_dir}/sft_model")

        print(f"SFT model saved to {output_dir}/sft_model")

        return self.sft_model

    def stage2_alignment(self, preference_data, output_dir, num_epochs=1):
        """
        Stage 2: alignment training (RLHF or DPO).
        """
        print("=" * 60)
        print(f"Stage 2: Alignment ({self.method.upper()})")
        print("=" * 60)

        # Load the reference model (a frozen copy of the SFT model)
        reference_model = AutoModelForCausalLM.from_pretrained(
            f"{output_dir}/sft_model"
        )

        # Create the alignment trainer
        if self.method == 'dpo':
            trainer = DPOTrainer(
                policy_model=self.sft_model,
                reference_model=reference_model,
                beta=0.1
            )
        elif self.method == 'rlhf':
            # RLHF additionally needs a trained reward model
            reward_model = self._train_reward_model(preference_data)
            trainer = PPOTrainer(
                policy_model=self.sft_model,
                reference_model=reference_model,
                reward_model=reward_model
            )
        else:
            raise ValueError(f"Unknown method: {self.method}")

        # Prepare the data
        train_dataloader = self._prepare_preference_dataloader(preference_data)

        # Train
        for epoch in range(num_epochs):
            total_loss = 0
            for batch in train_dataloader:
                metrics = trainer.train_step(batch)
                total_loss += metrics['loss']

            avg_loss = total_loss / len(train_dataloader)
            print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

        # Save the aligned model
        aligned_model = trainer.policy_model
        aligned_model.save_pretrained(f"{output_dir}/aligned_model")
        self.tokenizer.save_pretrained(f"{output_dir}/aligned_model")

        print(f"Aligned model saved to {output_dir}/aligned_model")

        return aligned_model

    def run_full_pipeline(self, sft_data, preference_data, output_dir):
        """
        Run the full pipeline.
        """
        # Stage 1: SFT
        sft_model = self.stage1_sft(sft_data, output_dir)

        # Stage 2: Alignment
        aligned_model = self.stage2_alignment(preference_data, output_dir)

        return aligned_model
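Kicking off the full pipeline (the model name, data, and output path are placeholders, and the _prepare_* helpers are assumed to be implemented elsewhere):

Python
pipeline = AlignmentPipeline(base_model_name='gpt2', method='dpo')  # illustrative model

sft_data = [SFT_EXAMPLE]          # instruction-response pairs
preference_data = []              # fill with (prompt, chosen, rejected) examples
aligned = pipeline.run_full_pipeline(sft_data, preference_data, output_dir='./out')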

5.2 Evaluating Alignment Quality

Python
class AlignmentEvaluator:
    """
    Alignment evaluation utilities.
    """

    @staticmethod
    def evaluate_helpfulness(model, tokenizer, test_questions):
        """
        Evaluate helpfulness.
        """
        results = []

        for question in test_questions:
            inputs = tokenizer(question, return_tensors='pt')

            with torch.no_grad():
                outputs = model.generate(
                    **inputs,
                    max_length=256,
                    do_sample=True,
                    temperature=0.7
                )

            response = tokenizer.decode(outputs[0], skip_special_tokens=True)
            results.append({
                'question': question,
                'response': response
            })

        return results

    @staticmethod
    def evaluate_harmlessness(model, tokenizer, harmful_prompts):
        """
        Evaluate harmlessness.

        Tests whether the model refuses harmful requests.
        """
        results = []

        for prompt in harmful_prompts:
            inputs = tokenizer(prompt, return_tensors='pt')

            with torch.no_grad():
                outputs = model.generate(
                    **inputs,
                    max_length=256,
                    do_sample=True,
                    temperature=0.7
                )

            response = tokenizer.decode(outputs[0], skip_special_tokens=True)

            # Check for refusal (simple keyword heuristic)
            refusal_keywords = ['sorry', 'cannot', 'unable', 'inappropriate', 'harmful']
            is_refused = any(keyword in response.lower() for keyword in refusal_keywords)

            results.append({
                'prompt': prompt,
                'response': response,
                'refused': is_refused
            })

        refusal_rate = sum(r['refused'] for r in results) / len(results)
        print(f"Refusal Rate: {refusal_rate:.2%}")

        return results

    @staticmethod
    def compare_models(before_model, after_model, tokenizer, test_prompts):
        """
        Compare model outputs before and after alignment.
        """
        comparison = []

        for prompt in test_prompts:
            inputs = tokenizer(prompt, return_tensors='pt')

            # Output before alignment
            with torch.no_grad():
                before_outputs = before_model.generate(
                    **inputs,
                    max_length=256,
                    do_sample=True,
                    temperature=0.7
                )
            before_response = tokenizer.decode(before_outputs[0], skip_special_tokens=True)

            # Output after alignment
            with torch.no_grad():
                after_outputs = after_model.generate(
                    **inputs,
                    max_length=256,
                    do_sample=True,
                    temperature=0.7
                )
            after_response = tokenizer.decode(after_outputs[0], skip_special_tokens=True)

            comparison.append({
                'prompt': prompt,
                'before': before_response,
                'after': after_response
            })

        return comparison

Summary

Choosing an Alignment Method

Method   Recommended scenario
DPO      First choice; fast iteration
RLHF     Peak quality when resources allow
IPO      A refined variant of DPO
KTO      No paired preference data

Key Hyperparameters

Python
# Recommended DPO configuration
DPO_CONFIG = {
    'beta': 0.1,              # temperature (typically 0.05-0.5)
    'learning_rate': 1e-6,    # learning rate (keep it small)
    'batch_size': 4,          # batch size
    'num_epochs': 1,          # usually 1-3 epochs
    'max_length': 512,        # maximum sequence length
}

# Recommended RLHF configuration
RLHF_CONFIG = {
    'kl_coef': 0.2,           # KL penalty coefficient
    'clip_epsilon': 0.2,      # PPO clipping parameter
    'learning_rate': 1e-5,    # learning rate
    'batch_size': 32,         # batch size
    'ppo_epochs': 4,          # PPO epochs per batch of rollouts
}

Alignment Best Practices

  • Start from a high-quality SFT model
  • Use diverse, high-quality preference data
  • Tune DPO's beta parameter
  • Monitor training stability
  • Evaluate alignment quality regularly
  • Combine automatic metrics with human evaluation
  • New for 2024-2025: consider ORPO/SimPO to simplify the training pipeline
  • New for 2024-2025: try GRPO when aligning reasoning models
  • New for 2024-2025: watch RLAIF (RL from AI feedback) to cut annotation costs

2024-2025 Alignment Trends

Text Only
Where alignment is heading
═══════════════════════════════════════════════════════════════════

1. Simpler pipelines
   ├── ORPO: merges SFT and alignment into one stage
   ├── SimPO: drops the reference model
   └── Trend: reduce training complexity

2. AI-assisted alignment (RLAIF)
   ├── Use an LLM as the preference annotator
   ├── Constitutional AI: self-critique and revision
   └── Lower human annotation cost

3. Aligning reasoning ability
   ├── GRPO: adopted by DeepSeek-R1
   ├── Reinforce the reasoning process, not just the final answer
   └── Optimize chain-of-thought quality

4. Multi-objective alignment
   ├── Optimize helpfulness, honesty, and harmlessness jointly
   ├── Pareto-optimal trade-offs
   └── User-customizable values

5. Long-context alignment
   ├── Consistency across long conversations
   ├── Alignment for long-document understanding
   └── Consistent values across multi-turn interaction

═══════════════════════════════════════════════════════════════════

Next up: continue with 05-大模型安全与对齐 for a deeper dive into safety training and red-teaming!


Last updated: 2026-02-20 · Applies to: LLM Tutorial v2026