跳转至

04 - RLHF与人类反馈

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

📌 交叉引用:RLHF的完整讲解(含RLHF三阶段、DPO、PPO实现及对齐技术全景)请参考 LLM学习/03-系统与工程/04-对齐技术.md,本节侧重从强化学习算法视角理解RLHF,强调PPO、策略约束等RL基础在对齐中的应用。

学习时间: 4-5小时 重要性: ⭐⭐⭐⭐⭐ 大语言模型的核心技术 前置知识: PPO、策略约束


🎯 学习目标

完成本章后,你将能够: - 理解RLHF的核心思想 - 掌握奖励模型的训练 - 了解PPO+KL的实现 - 理解ChatGPT/InstructGPT的训练流程


1. RLHF简介

1.1 什么是RLHF

RLHF = Reinforcement Learning from Human Feedback

核心思想

使用人类反馈来训练奖励模型,然后用RL优化策略

1.2 为什么需要RLHF

传统RL的问题: - 奖励函数难以设计 - 难以捕捉人类偏好

RLHF的优势: - 从人类偏好学习 - 更好地对齐人类价值观 - 适用于复杂任务(如对话、写作)

1.3 应用场景

  • 大语言模型(ChatGPT、Claude)
  • 对话系统
  • 内容生成
  • 推荐系统

2. RLHF三阶段流程

Text Only
阶段1: 预训练(SFT)
├── 使用人类编写的示范数据
├── 监督微调(Supervised Fine-Tuning)
└── 得到初始策略模型

阶段2: 奖励模型训练
├── 收集人类偏好数据(比较)
├── 训练奖励模型预测人类偏好
└── 得到奖励模型

阶段3: RL优化
├── 使用PPO优化策略
├── KL散度约束(防止偏离太远)
└── 得到最终模型

3. 阶段1:监督微调(SFT)

Python
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import GPT2LMHeadModel, GPT2Tokenizer

class SFTTrainer:
    """监督微调训练器"""

    def __init__(self, model_name='gpt2', lr=5e-5):
        self.model = GPT2LMHeadModel.from_pretrained(model_name)
        self.tokenizer = GPT2Tokenizer.from_pretrained(model_name)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    def train_step(self, prompt, response):
        """
        训练一步

        参数:
            prompt: 输入提示
            response: 期望回复
        """
        # 编码输入
        full_text = prompt + response
        inputs = self.tokenizer(full_text, return_tensors='pt', truncation=True, max_length=512)
        labels = inputs['input_ids'].clone()

        # 只计算response部分的损失
        prompt_length = len(self.tokenizer(prompt)['input_ids'])
        labels[:, :prompt_length] = -100  # 忽略prompt部分

        # 前向传播
        outputs = self.model(**inputs, labels=labels)
        loss = outputs.loss

        # 反向传播
        self.optimizer.zero_grad()  # 清零梯度
        loss.backward()  # 反向传播计算梯度
        self.optimizer.step()  # 更新参数

        return loss.item()  # 将单元素张量转为Python数值

    def generate(self, prompt, max_length=100):
        """生成回复"""
        inputs = self.tokenizer(prompt, return_tensors='pt')

        with torch.no_grad():  # 禁用梯度计算,节省内存
            outputs = self.model.generate(
                **inputs,
                max_length=max_length,
                num_return_sequences=1,
                temperature=0.7,
                do_sample=True
            )

        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

4. 阶段2:奖励模型训练

4.1 偏好数据

数据格式

Text Only
(prompt, response_A, response_B, preference)

其中preference表示人类更喜欢哪个回复。

4.2 Bradley-Terry偏好模型

Bradley-Terry模型假设人类偏好由潜在的奖励分数决定——回复的奖励越高,被偏好的概率越大。

推导过程

  1. 假设每个回复有潜在质量分数 \(r_\theta(x, y)\)
  2. Bradley-Terry模型定义偏好概率为:
\[P(y_w \succ y_l | x) = \frac{\exp(r_\theta(x, y_w))}{\exp(r_\theta(x, y_w)) + \exp(r_\theta(x, y_l))} = \sigma(r_\theta(x, y_w) - r_\theta(x, y_l))\]

其中 \(\sigma(z) = \frac{1}{1+e^{-z}}\) 是sigmoid函数。

  1. 给定偏好数据集 \(\mathcal{D} = \{(x^{(i)}, y_w^{(i)}, y_l^{(i)})\}\),最大化对数似然:
\[\max_\theta \sum_{i} \log P(y_w^{(i)} \succ y_l^{(i)} | x^{(i)}) = \sum_i \log \sigma(r_\theta(x^{(i)}, y_w^{(i)}) - r_\theta(x^{(i)}, y_l^{(i)}))\]
  1. 等价地,最小化负对数似然(即交叉熵损失):
\[\mathcal{L}_{RM}(\theta) = -\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}} \left[ \log \sigma(r_\theta(x, y_w) - r_\theta(x, y_l)) \right]\]

其中: - \(y_w\):人类偏好的回复(winner) - \(y_l\):人类不偏好的回复(loser) - \(r_\theta\):参数化的奖励模型

与DPO的联系:DPO跳过了显式训练奖励模型的步骤,将最优奖励 \(r^*(x,y) = \beta \log \frac{\pi_\theta(y|x)}{\pi_{ref}(y|x)}\) 代入上式,直接得到DPO损失函数。

4.3 代码实现

Python
class RewardModel(nn.Module):  # 继承nn.Module定义网络层
    """奖励模型"""

    def __init__(self, base_model_name='gpt2'):
        super(RewardModel, self).__init__()

        self.transformer = GPT2LMHeadModel.from_pretrained(base_model_name)
        self.reward_head = nn.Linear(self.transformer.config.n_embd, 1)

    def forward(self, input_ids, attention_mask=None):
        """计算奖励分数"""
        outputs = self.transformer.transformer(input_ids, attention_mask=attention_mask)
        hidden_states = outputs.last_hidden_state

        # 取最后一个token的隐藏状态
        last_hidden = hidden_states[:, -1, :]
        reward = self.reward_head(last_hidden)

        return reward.squeeze(-1)  # squeeze压缩维度

class RewardModelTrainer:
    """奖励模型训练器"""

    def __init__(self, model_name='gpt2', lr=1e-5):
        self.model = RewardModel(model_name)
        self.tokenizer = GPT2Tokenizer.from_pretrained(model_name)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    def train_step(self, prompts, responses_w, responses_l):
        """
        训练一步

        参数:
            prompts: 提示列表
            responses_w: 偏好的回复列表
            responses_l: 不偏好的回复列表
        """
        # 编码输入
        texts_w = [p + r for p, r in zip(prompts, responses_w)]  # zip按位置配对
        texts_l = [p + r for p, r in zip(prompts, responses_l)]

        inputs_w = self.tokenizer(texts_w, return_tensors='pt', padding=True, truncation=True)
        inputs_l = self.tokenizer(texts_l, return_tensors='pt', padding=True, truncation=True)

        # 计算奖励
        rewards_w = self.model(**inputs_w)
        rewards_l = self.model(**inputs_l)

        # Bradley-Terry损失
        loss = -torch.log(torch.sigmoid(rewards_w - rewards_l)).mean()

        # 反向传播
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def get_reward(self, prompt, response):
        """获取奖励分数"""
        text = prompt + response
        inputs = self.tokenizer(text, return_tensors='pt', truncation=True, max_length=512)

        with torch.no_grad():
            reward = self.model(**inputs)

        return reward.item()

5. 阶段3:PPO+KL优化

5.1 目标函数

\[\max_{\pi_\theta} \mathbb{E}_{x \sim D, y \sim \pi_\theta(y|x)} [r_\phi(x, y)] - \beta \cdot D_{KL}(\pi_\theta(y|x) || \pi_{ref}(y|x))\]

其中: - \(r_\phi\):奖励模型 - \(\pi_{ref}\):参考策略(SFT模型) - \(\beta\):KL惩罚系数

5.2 代码实现

Python
class RLHFTrainer:
    """RLHF训练器"""

    def __init__(self, policy_model, ref_model, reward_model, tokenizer,
                 lr=1e-5, beta=0.1, epsilon=0.2):
        self.policy = policy_model
        self.ref_model = ref_model
        self.reward_model = reward_model
        self.tokenizer = tokenizer
        self.beta = beta
        self.epsilon = epsilon

        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)

        # 冻结参考模型和奖励模型
        for param in self.ref_model.parameters():
            param.requires_grad = False
        for param in self.reward_model.parameters():
            param.requires_grad = False

    def compute_rewards(self, prompts, responses):
        """计算奖励"""
        texts = [p + r for p, r in zip(prompts, responses)]
        inputs = self.tokenizer(texts, return_tensors='pt', padding=True, truncation=True)

        with torch.no_grad():
            rewards = self.reward_model(**inputs)

        return rewards

    def compute_kl_penalty(self, prompts, responses):
        """计算KL散度惩罚"""
        texts = [p + r for p, r in zip(prompts, responses)]
        inputs = self.tokenizer(texts, return_tensors='pt', padding=True, truncation=True)

        # 策略模型的log概率
        # 注意:.loss 是batch平均交叉熵,而非逐token的log概率
        # 正确做法应使用logits计算逐token logprobs:
        #   logits = model(**inputs).logits
        #   per_token_logprobs = torch.gather(F.log_softmax(logits, dim=-1), 2, inputs['input_ids'].unsqueeze(-1)).squeeze(-1)
        #   然后对response部分求和/平均
        policy_outputs = self.policy(**inputs, labels=inputs['input_ids'])
        policy_logprobs = -policy_outputs.loss

        # 参考模型的log概率(同样存在上述batch平均近似的局限性)
        with torch.no_grad():
            ref_outputs = self.ref_model(**inputs, labels=inputs['input_ids'])
            ref_logprobs = -ref_outputs.loss

        # KL散度
        kl_div = policy_logprobs - ref_logprobs

        return kl_div

    def ppo_step(self, prompts, old_responses, old_logprobs):
        """
        PPO更新步骤

        参数:
            prompts: 提示列表
            old_responses: 旧回复列表
            old_logprobs: 旧log概率
        """
        # 计算奖励
        rewards = self.compute_rewards(prompts, old_responses)

        # 计算KL惩罚
        kl_penalty = self.compute_kl_penalty(prompts, old_responses)

        # 最终奖励
        final_rewards = rewards - self.beta * kl_penalty

        # 计算新的log概率
        texts = [p + r for p, r in zip(prompts, old_responses)]
        inputs = self.tokenizer(texts, return_tensors='pt', padding=True, truncation=True)

        outputs = self.policy(**inputs, labels=inputs['input_ids'])
        new_logprobs = -outputs.loss

        # 计算比率
        ratio = torch.exp(new_logprobs - old_logprobs)

        # PPO裁剪目标
        surr1 = ratio * final_rewards
        surr2 = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon) * final_rewards
        loss = -torch.min(surr1, surr2).mean()

        # 反向传播
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

6. 完整训练流程

Python
def train_rlhf():
    """完整的RLHF训练流程

    ⚠️ 简化说明:此代码为教学用的简化实现。真实的 RLHF PPO 训练还包括:
    - 参考策略 KL 散度的逐 token 计算(而非 loss 级近似)
    - 奖励归一化(running mean/std)与 reward clipping
    - GAE (Generalized Advantage Estimation) 优势函数计算
    - 价值函数(Critic)的联合训练
    - 多 epoch mini-batch PPO 更新
    请参考 TRL (trl) 库的 PPOTrainer 获取生产级实现。
    """

    # 阶段1: SFT
    print("=== Stage 1: Supervised Fine-Tuning ===")
    sft_trainer = SFTTrainer()

    # 加载示范数据
    demonstration_data = load_demonstration_data()

    for epoch in range(3):
        for prompt, response in demonstration_data:
            loss = sft_trainer.train_step(prompt, response)

        print(f"Epoch {epoch + 1}, Loss: {loss:.4f}")

    # 保存SFT模型
    sft_model = sft_trainer.model
    sft_model.save_pretrained('sft_model')

    # 阶段2: 奖励模型训练
    print("\n=== Stage 2: Reward Model Training ===")
    rm_trainer = RewardModelTrainer()

    # 加载偏好数据
    preference_data = load_preference_data()

    for epoch in range(3):
        for prompts, responses_w, responses_l in preference_data:
            loss = rm_trainer.train_step(prompts, responses_w, responses_l)

        print(f"Epoch {epoch + 1}, Loss: {loss:.4f}")

    reward_model = rm_trainer.model

    # 阶段3: RL优化
    print("\n=== Stage 3: RL Optimization ===")
    rlhf_trainer = RLHFTrainer(
        policy_model=sft_model,
        ref_model=sft_model,  # 参考模型使用SFT模型
        reward_model=reward_model
    )

    for iteration in range(1000):
        # 生成回复
        prompts = sample_prompts()
        responses = []
        logprobs = []

        for prompt in prompts:
            response, logprob = generate_with_logprob(sft_model, prompt)
            responses.append(response)
            logprobs.append(logprob)

        # PPO更新
        loss = rlhf_trainer.ppo_step(prompts, responses, torch.stack(logprobs))  # torch.stack沿新维度拼接张量

        if (iteration + 1) % 100 == 0:
            print(f"Iteration {iteration + 1}, Loss: {loss:.4f}")

    # 保存最终模型
    rlhf_trainer.policy.save_pretrained('rlhf_model')

def generate_with_logprob(model, prompt, max_length=100):
    """生成回复并返回log概率"""
    # 实现生成逻辑
    # ...
    return response, logprob

7. 本章总结

核心概念

Text Only
RLHF:
├── 阶段1: SFT(监督微调)
│   └── 使用人类示范数据
├── 阶段2: 奖励模型训练
│   └── 从人类偏好学习
└── 阶段3: RL优化
    └── PPO + KL约束

关键组件:
├── 奖励模型: 预测人类偏好
├── KL约束: 防止模型偏离太远
└── PPO: 策略优化

挑战与解决方案

挑战 解决方案
奖励黑客 KL约束、奖励模型正则化
数据质量 多轮标注、质量控制
训练不稳定 小学习率、早停

✅ 自测问题

  1. RLHF的三个阶段分别是什么?

  2. 为什么要使用KL散度约束?

  3. 奖励模型如何训练?


📚 延伸阅读

  1. Ziegler et al. (2019) - Fine-Tuning Language Models from Human Preferences
  2. Stiennon et al. (2020) - Learning to Summarize with Human Feedback
  3. Ouyang et al. (2022) - Training language models to follow instructions with human feedback (InstructGPT)
  4. Bai et al. (2022) - Constitutional AI: Harmlessness from AI Feedback

→ 下一步:05-模型基础方法前沿.md