跳转至

07 - DeepSeek R1架构详解

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

内容说明:本文档基于 DeepSeek-R1 论文(arXiv:2501.12948,2025年1月发布)和相关技术文档编写,介绍 DeepSeek R1 的真实架构、训练流程和推理特性。代码示例仅用于说明相关技术原理,不代表 DeepSeek R1 的实际实现。

📖 章节概述

本章介绍 DeepSeek R1 的模型架构、训练流程和推理特性。DeepSeek R1 基于 DeepSeek-V3 基座模型,通过强化学习训练获得强大的推理能力。

🎯 学习目标

完成本章后,你将能够:

  • 理解 DeepSeek R1 的真实架构(MoE + MLA)
  • 掌握 GRPO 强化学习训练流程
  • 了解推理能力如何通过 RL 自然涌现
  • 理解蒸馏技术在小模型推理中的应用

1. DeepSeek R1概述

1.1 什么是DeepSeek R1?

DeepSeek R1(2025年1月发布)是一个通过强化学习获得推理能力的开源大语言模型:

核心事实: - 架构:基于 DeepSeek-V3,采用 MoE(混合专家)+ MLA(多头潜在注意力) - 参数量:671B 总参数,每 token 激活 37B - 上下文:128K tokens - 开源协议:MIT License - 注意:R1 是纯文本模型,不是多模态模型

训练流程: 1. DeepSeek-V3-Base 基座模型 2. 冷启动数据 SFT(少量高质量 CoT 数据) 3. GRPO 强化学习训练(推理能力涌现) 4. 拒绝采样 + SFT(改善输出格式与可读性) 5. 二次 RL 训练(对齐人类偏好)

1.2 模型架构(基于 DeepSeek-V3)

Text Only
DeepSeek R1 真实架构(DeepSeek-V3 基座)
├── Multi-Head Latent Attention (MLA)
│   ├── 低秩键值压缩(减少 KV Cache)
│   ├── 解耦旋转位置编码 (RoPE)
│   └── 推理时显著节省显存
├── DeepSeekMoE
│   ├── 细粒度专家划分(每层 256 个路由专家 + 1 个共享专家)
│   ├── Top-8 路由策略
│   └── 辅助损失平衡负载
└── 训练创新
    ├── GRPO(Group Relative Policy Optimization)
    ├── 无需额外 critic/reward 模型
    ├── 推理能力通过 RL 自然涌现(非硬编码模块)
    └── 置信度估计

2. 模型架构

2.1 混合专家(MoE)架构

DeepSeek-V3 采用细粒度的混合专家架构,每层包含 256 个路由专家和 1 个共享专家,使用 Top-8 路由策略。

Python
import torch
import torch.nn as nn
import torch.nn.functional as F

class MoEExpert(nn.Module):  # 继承nn.Module定义网络层
    """
    混合专家网络中的单个专家
    """
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()  # super()调用父类方法
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.activation = nn.GELU()

    def forward(self, x):
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        return x

class MoELayer(nn.Module):
    """
    混合专家层
    """
    def __init__(self, input_dim, hidden_dim, output_dim, num_experts=8, top_k=2):
        super().__init__()
        self.num_experts = num_experts
        self.top_k = top_k

        # 门控网络
        self.gate = nn.Linear(input_dim, num_experts)

        # 专家网络
        self.experts = nn.ModuleList([
            MoEExpert(input_dim, hidden_dim, output_dim)
            for _ in range(num_experts)
        ])

    def forward(self, x):
        """
        前向传播

        Args:
            x: 输入张量 [batch_size, seq_len, input_dim]
        """
        batch_size, seq_len, input_dim = x.shape

        # 门控选择
        gate_logits = self.gate(x)  # [batch_size, seq_len, num_experts]
        gate_probs = F.softmax(gate_logits, dim=-1)  # F.xxx PyTorch函数式API

        # 选择top-k专家
        top_k_probs, top_k_indices = torch.topk(gate_probs, self.top_k, dim=-1)

        # 归一化top-k概率
        top_k_probs = top_k_probs / top_k_probs.sum(dim=-1, keepdim=True)

        # 初始化输出(注意使用output_dim,即专家输出维度)
        output_dim = self.experts[0].fc2.out_features
        output = torch.zeros(batch_size, seq_len, output_dim, device=x.device)

        # 应用选定的专家
        for k in range(self.top_k):
            # 获取当前k的专家索引和概率
            expert_idx = top_k_indices[:, :, k]  # [batch_size, seq_len]
            expert_prob = top_k_probs[:, :, k:k+1]  # [batch_size, seq_len, 1]

            # 对每个专家计算
            for expert_id in range(self.num_experts):
                # 找到使用该expert_id的样本
                mask = (expert_idx == expert_id)

                if mask.any():  # any()任一为True则返回True
                    # 获取使用该专家的输入
                    expert_input = x[mask]

                    # 通过专家网络
                    expert_output = self.experts[expert_id](expert_input)

                    # 加权累加到输出
                    output[mask] += expert_output * expert_prob[mask]

        return output

# 使用示例
# moe_layer = MoELayer(input_dim=768, hidden_dim=2048, output_dim=768, num_experts=8, top_k=2)
# x = torch.randn(32, 128, 768)
# output = moe_layer(x)

2.2 多头潜在注意力(MLA)

MLA(Multi-Head Latent Attention)是 DeepSeek-V3 的核心创新之一,通过低秩键值压缩显著减少 KV Cache 的显存占用。

Python
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class MultiHeadLatentAttention(nn.Module):
    """
    多头潜在注意力(MLA)

    MLA 通过低秩键值压缩减少 KV Cache 的显存占用
    """
    def __init__(self, embed_dim, num_heads, latent_dim=64):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.latent_dim = latent_dim

        # Q 的投影
        self.q_proj = nn.Linear(embed_dim, embed_dim)

        # KV 的低秩压缩(下投影共享,上投影分开)
        self.kv_down_proj = nn.Linear(embed_dim, latent_dim)
        self.k_up_proj = nn.Linear(latent_dim, embed_dim)  # K 的上投影
        self.v_up_proj = nn.Linear(latent_dim, embed_dim)  # V 的上投影

        # 输出投影
        self.out_proj = nn.Linear(embed_dim, embed_dim)

    def forward(self, x, mask=None):
        """
        前向传播

        Args:
            x: 输入张量 [batch_size, seq_len, embed_dim]
            mask: 注意力掩码 [batch_size, seq_len]
        """
        batch_size, seq_len, _ = x.shape

        # 计算 Q
        Q = self.q_proj(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)  # 重塑张量形状

        # 计算 KV 的低秩表示(共享下投影,分开上投影)
        # MLA低秩压缩:先下投影到latent_dim节省KV Cache,再上投影恢复原始维度
        kv_compressed = self.kv_down_proj(x)  # [batch_size, seq_len, latent_dim]
        k = self.k_up_proj(kv_compressed)  # [batch_size, seq_len, embed_dim]
        v = self.v_up_proj(kv_compressed)  # [batch_size, seq_len, embed_dim]

        # view将(B,S,d_model)拆为(B,S,heads,d_k),transpose(1,2)得(B,heads,S,d_k)多头并行格式
        K = k.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        V = v.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        # 计算注意力分数
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

        # 应用掩码
        if mask is not None:
            scores = scores.masked_fill(mask.unsqueeze(1).unsqueeze(1) == 0, float('-inf'))  # unsqueeze增加一个维度

        # 计算注意力权重
        attn_weights = F.softmax(scores, dim=-1)

        # 计算输出
        output = torch.matmul(attn_weights, V)
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embed_dim)

        # 输出投影
        output = self.out_proj(output)

        return output

# 使用示例
# mla = MultiHeadLatentAttention(embed_dim=768, num_heads=12, latent_dim=64)
# x = torch.randn(32, 128, 768)
# output = mla(x)

2.3 旋转位置编码(RoPE)

Python
import torch
import torch.nn as nn
import math

class RotaryPositionalEmbedding(nn.Module):
    """
    旋转位置编码(RoPE)
    """
    def __init__(self, embed_dim, max_seq_len=8192):
        super().__init__()
        self.embed_dim = embed_dim
        self.max_seq_len = max_seq_len

        # 计算频率
        inv_freq = 1.0 / (10000 ** (torch.arange(0, embed_dim, 2).float() / embed_dim))
        self.register_buffer('inv_freq', inv_freq)

    def forward(self, x, seq_len=None):
        """
        计算旋转位置编码

        Args:
            x: 输入张量 [batch_size, seq_len, embed_dim]
            seq_len: 序列长度
        """
        if seq_len is None:
            seq_len = x.size(1)

        # 生成位置索引
        positions = torch.arange(seq_len, device=x.device).float()

        # 计算角度
        # RoPE角度计算:unsqueeze(-1)和unsqueeze(0)将1D向量广播相乘得(S,d/2)矩阵
        angles = positions.unsqueeze(-1) * self.inv_freq.unsqueeze(0)  # [seq_len, embed_dim/2]

        # 创建旋转矩阵
        # 两次unsqueeze(0)扩展为(1,1,S,d/2)以广播匹配(B,heads,S,d_k)的Q/K张量
        angles = angles.unsqueeze(0).unsqueeze(0)  # [1, 1, seq_len, embed_dim/2]
        cos_angles = torch.cos(angles)
        sin_angles = torch.sin(angles)

        return cos_angles, sin_angles

def apply_rotary_pos_emb(q, k, cos_angles, sin_angles):
    """
    应用旋转位置编码到Q和K

    Args:
        q: 查询张量 [batch_size, num_heads, seq_len, head_dim]
        k: 键张量 [batch_size, num_heads, seq_len, head_dim]
        cos_angles: 余弦角度 [1, 1, seq_len, head_dim/2]
        sin_angles: 正弦角度 [1, 1, seq_len, head_dim/2]
    """
    # 分离实部和虚部
    q_real, q_imag = q[..., ::2], q[..., 1::2]
    k_real, k_imag = k[..., ::2], k[..., 1::2]

    # 应用旋转
    q_rot_real = q_real * cos_angles - q_imag * sin_angles
    q_rot_imag = q_real * sin_angles + q_imag * cos_angles
    k_rot_real = k_real * cos_angles - k_imag * sin_angles
    k_rot_imag = k_real * sin_angles + k_imag * cos_angles

    # 合并
    q_rot = torch.stack([q_rot_real, q_rot_imag], dim=-1).flatten(-2)  # torch.stack沿新维度拼接张量
    k_rot = torch.stack([k_rot_real, k_rot_imag], dim=-1).flatten(-2)

    return q_rot, k_rot

# 使用示例
# rope = RotaryPositionalEmbedding(embed_dim=768)
# x = torch.randn(32, 128, 768)
# cos_angles, sin_angles = rope(x)

3. GRPO 强化学习训练

3.1 GRPO 算法原理

GRPO(Group Relative Policy Optimization)是 DeepSeek R1 使用的强化学习算法,它是对传统 PPO(Proximal Policy Optimization)的改进。

核心特点: - 无需 Value Model:GRPO 去掉了 PPO 中的价值模型,减少了计算开销 - Group Sampling:对每个问题采样多个输出,计算平均奖励作为基线 - 相对优势:使用相对于组内平均奖励的优势,而不是绝对优势

Python
import torch
import torch.nn as nn
import torch.nn.functional as F

class GRPOPolicy(nn.Module):
    """
    GRPO 策略网络
    """
    def __init__(self, config):
        super().__init__()
        self.config = config

        # 词嵌入
        self.embeddings = nn.Embedding(config.vocab_size, config.hidden_size)

        # Transformer层
        # 注意:PyTorch 2.1+ 中 TransformerEncoderLayer 的 dropout 和 activation 参数
        # 推荐通过 activation 参数传入 'gelu' 或使用自定义激活函数
        # 如需更灵活的配置,建议使用 nn.TransformerEncoderLayer.from_config() 方法
        self.layers = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=config.hidden_size,
                nhead=config.num_attention_heads,
                dim_feedforward=config.intermediate_size,
                dropout=0.1,  # PyTorch 2.0+ 仍支持此参数
                activation='gelu',
                batch_first=True  # PyTorch 1.12+ 推荐使用 batch_first=True
            )
            for _ in range(config.num_hidden_layers)
        ])

        # 最终层归一化
        self.ln_f = nn.LayerNorm(config.hidden_size)

        # 语言模型头
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def forward(self, input_ids, attention_mask=None):
        """
        前向传播
        """
        batch_size, seq_len = input_ids.shape

        # 词嵌入
        hidden_states = self.embeddings(input_ids)

        # 通过Transformer层
        for layer in self.layers:
            hidden_states = layer(
                hidden_states.transpose(0, 1),
                src_key_padding_mask=~attention_mask.bool() if attention_mask is not None else None
            ).transpose(0, 1)

        # 最终层归一化
        hidden_states = self.ln_f(hidden_states)

        # 语言模型头
        logits = self.lm_head(hidden_states)

        return logits

class GRPOTrainer:
    """
    GRPO 训练器
    """
    def __init__(self, policy_model, reference_model, reward_model, config):
        self.policy_model = policy_model
        self.reference_model = reference_model
        self.reward_model = reward_model
        self.config = config

        self.optimizer = torch.optim.AdamW(
            policy_model.parameters(),
            lr=config.learning_rate,
            betas=(0.9, 0.999)
        )

    def compute_advantages(self, rewards):
        """
        计算优势(相对于组内平均奖励)

        Args:
            rewards: 奖励张量 [batch_size, group_size]
        """
        # 计算组内平均奖励
        mean_rewards = rewards.mean(dim=-1, keepdim=True)  # [batch_size, 1]

        # 计算优势(相对于平均)
        advantages = rewards - mean_rewards  # [batch_size, group_size]

        # 归一化优势
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        return advantages

    def compute_policy_loss(self, policy_logits, ref_logits, advantages, attention_mask):
        """
        计算策略损失

        Args:
            policy_logits: 策略模型的输出 [batch_size, seq_len, vocab_size]
            ref_logits: 参考模型的输出 [batch_size, seq_len, vocab_size]
            advantages: 优势 [batch_size, group_size]
            attention_mask: 注意力掩码 [batch_size, seq_len]
        """
        # 计算策略概率
        policy_log_probs = F.log_softmax(policy_logits, dim=-1)
        ref_log_probs = F.log_softmax(ref_logits, dim=-1)

        # 计算重要性比率
        ratio = torch.exp(policy_log_probs - ref_log_probs)

        # 应用 GRPO 裁剪
        clipped_ratio = torch.clamp(ratio, 1 - self.config.clip_ratio, 1 + self.config.clip_ratio)

        # 计算损失
        policy_loss = -torch.min(
            ratio * advantages.unsqueeze(-1).unsqueeze(-1),
            clipped_ratio * advantages.unsqueeze(-1).unsqueeze(-1)
        )

        # 应用掩码
        if attention_mask is not None:
            policy_loss = policy_loss * attention_mask.unsqueeze(-1)

        policy_loss = policy_loss.sum() / attention_mask.sum()

        return policy_loss

    def train_step(self, questions, group_size=16):
        """
        训练步骤

        Args:
            questions: 问题列表
            group_size: 每个问题的采样组大小
        """
        total_loss = 0
        num_batches = 0

        for question in questions:
            # 对每个问题采样多个输出
            outputs = []
            rewards = []

            for _ in range(group_size):
                # 从策略模型采样输出
                output = self.sample_output(question)
                outputs.append(output)

                # 计算奖励
                reward = self.reward_model(question, output)
                rewards.append(reward)

            # 转换为张量
            rewards = torch.tensor(rewards)

            # 计算优势
            advantages = self.compute_advantages(rewards)

            # 计算策略损失(简化:使用平均优势加权的负对数概率)
            # 实际 GRPO 会对每个 sample 计算 policy/ref log-prob 并裁剪
            avg_advantage = advantages.mean()
            policy_loss = -avg_advantage  # 简化的策略梯度

            # 反向传播
            self.optimizer.zero_grad()  # 清零梯度
            policy_loss.backward()  # 反向传播计算梯度
            self.optimizer.step()  # 更新参数

            total_loss += policy_loss.item()  # 将单元素张量转为Python数值
            num_batches += 1

        return total_loss / num_batches

    def sample_output(self, question):
        """
        从策略模型采样输出

        Args:
            question: 输入问题
        """
        # 这里简化实现,实际需要更复杂的采样逻辑
        input_ids = self.tokenize(question)
        attention_mask = torch.ones_like(input_ids)

        with torch.no_grad():  # 禁用梯度计算,节省内存
            logits = self.policy_model(input_ids, attention_mask)

        # 采样输出
        output_ids = torch.multinomial(F.softmax(logits[:, -1, :], dim=-1), num_samples=1)

        return output_ids

    def tokenize(self, text):
        """
        简化的分词函数
        """
        # 这里简化实现,实际需要使用真实的 tokenizer
        return torch.randint(0, 100000, (1, 128))

# 使用示例
# config = type('Config', (), {
#     'vocab_size': 100000,
#     'hidden_size': 768,
#     'num_hidden_layers': 12,
#     'num_attention_heads': 12,
#     'intermediate_size': 2048,
#     'learning_rate': 3e-6,
#     'clip_ratio': 10.0
# })()
#
# policy_model = GRPOPolicy(config)
# reference_model = GRPOPolicy(config)
# reward_model = lambda q, o: torch.randn(1)  # 简化的奖励函数
#
# trainer = GRPOTrainer(policy_model, reference_model, reward_model, config)
# loss = trainer.train_step(["What is 2+2?"], group_size=16)

3.2 推理能力的涌现

DeepSeek R1 的推理能力是通过 GRPO 强化学习训练自然涌现的,而不是通过硬编码的推理模块。

涌现过程: 1. 初始阶段:模型开始生成简单的推理步骤 2. 中间阶段:推理链逐渐变长,模型开始自我反思 3. 后期阶段:推理能力稳定,能够解决复杂问题

关键发现: - 模型自发地生成了思维链(Chain of Thought) - 模型能够自我反思和修正错误 - 推理能力随着训练时间的增加而增强

Python
class RewardModel:
    """
    奖励模型(基于规则的奖励)
    """
    def __init__(self):
        self.rules = {
            'format': self.check_format,
            'correctness': self.check_correctness,
            'consistency': self.check_consistency
        }

    def __call__(self, question, output):  # __call__使实例可像函数一样调用
        """
        计算总奖励
        """
        total_reward = 0

        for rule_name, rule_func in self.rules.items():
            reward = rule_func(question, output)
            total_reward += reward

        return total_reward

    def check_format(self, question, output):
        """
        检查输出格式
        """
        # 检查是否有思维链标记
        if '<think>' in output and '</think>' in output:
            return 1.0

        return 0.0

    def check_correctness(self, question, output):
        """
        检查答案正确性
        """
        # 这里简化实现,实际需要更复杂的逻辑
        # 例如,对于数学问题,可以检查最终答案是否正确
        return torch.randn(1).item()

    def check_consistency(self, question, output):
        """
        检查推理一致性
        """
        # 检查推理步骤是否合理
        # 这里简化实现
        return torch.randn(1).item()

# 使用示例
# reward_model = RewardModel()
# question = "What is 2+2?"
# output = "<think>Let me think about this...\n2+2=4</think>\nThe answer is 4."
# reward = reward_model(question, output)

4. 完整的DeepSeek R1模型

Python
import torch
import torch.nn as nn
from transformers import PreTrainedModel, PretrainedConfig

class DeepSeekR1Config(PretrainedConfig):
    """
    DeepSeek R1配置
    """
    def __init__(
        self,
        vocab_size=100000,
        max_position_embeddings=128000,
        hidden_size=768,
        num_hidden_layers=32,
        num_attention_heads=12,
        intermediate_size=2048,
        num_experts=256,
        top_k_experts=8,
        latent_dim=64,
        **kwargs  # *args接收任意位置参数,**kwargs接收任意关键字参数
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.max_position_embeddings = max_position_embeddings
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.intermediate_size = intermediate_size
        self.num_experts = num_experts
        self.top_k_experts = top_k_experts
        self.latent_dim = latent_dim

class DeepSeekR1Model(PreTrainedModel):
    """
    DeepSeek R1模型(基于DeepSeek-V3架构)

    注意:这是一个简化的实现,用于说明架构原理。
    实际的 DeepSeek R1 使用更复杂的实现和优化。
    """
    config_class = DeepSeekR1Config

    def __init__(self, config):
        super().__init__(config)
        self.config = config

        # 词嵌入
        self.embeddings = nn.Embedding(config.vocab_size, config.hidden_size)

        # 旋转位置编码
        self.rope = RotaryPositionalEmbedding(config.hidden_size, config.max_position_embeddings)

        # Transformer层
        self.layers = nn.ModuleList([
            DeepSeekR1Layer(config)
            for _ in range(config.num_hidden_layers)
        ])

        # 最终层归一化
        self.ln_f = nn.LayerNorm(config.hidden_size)

        # 语言模型头
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        # 初始化权重
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """
        初始化权重
        """
        if isinstance(module, nn.Linear):  # isinstance检查类型
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
        elif isinstance(module, nn.LayerNorm):
            nn.init.ones_(module.weight)
            nn.init.zeros_(module.bias)

    def forward(self, input_ids, attention_mask=None):
        """
        前向传播
        """
        batch_size, seq_len = input_ids.shape

        # 词嵌入
        hidden_states = self.embeddings(input_ids)

        # 位置编码
        cos_angles, sin_angles = self.rope(hidden_states, seq_len)

        # 通过Transformer层
        all_hidden_states = []
        for layer in self.layers:
            hidden_states = layer(hidden_states, cos_angles, sin_angles, attention_mask)
            all_hidden_states.append(hidden_states)

        # 最终层归一化
        hidden_states = self.ln_f(hidden_states)

        # 语言模型头
        logits = self.lm_head(hidden_states)

        return {
            "logits": logits,
            "hidden_states": all_hidden_states
        }

class DeepSeekR1Layer(nn.Module):
    """
    DeepSeek R1层
    """
    def __init__(self, config):
        super().__init__()
        self.config = config

        # 多头潜在注意力(MLA)
        self.attention = MultiHeadLatentAttention(
            config.hidden_size,
            config.num_attention_heads,
            config.latent_dim
        )

        # 前馈网络(MoE)
        self.moe = MoELayer(
            config.hidden_size,
            config.intermediate_size,
            config.hidden_size,
            config.num_experts,
            config.top_k_experts
        )

        # 层归一化
        self.norm1 = nn.LayerNorm(config.hidden_size)
        self.norm2 = nn.LayerNorm(config.hidden_size)

    def forward(self, x, cos_angles, sin_angles, attention_mask=None):
        """
        前向传播
        """
        # 注意力
        residual = x
        x = self.norm1(x)
        attn_output = self.attention(x, attention_mask)
        x = residual + attn_output

        # 前馈网络(MoE)
        residual = x
        x = self.norm2(x)
        moe_output = self.moe(x)
        x = residual + moe_output

        return x

# 使用示例
# config = DeepSeekR1Config(
#     vocab_size=100000,
#     hidden_size=768,
#     num_hidden_layers=12,
#     num_attention_heads=12
# )
# model = DeepSeekR1Model(config)
# input_ids = torch.randint(0, 100000, (32, 128))
# outputs = model(input_ids)

5. 练习题

基础练习

  1. 实现简单的MoE层

    Python
    # TODO: 实现一个简单的MoE层
    class SimpleMoE(nn.Module):
        def __init__(self, input_dim, hidden_dim, num_experts):
            # 你的代码
            pass
    
        def forward(self, x):
            # 你的代码
            pass
    

  2. 实现多头潜在注意力

    Python
    # TODO: 实现一个简单的MLA
    class SimpleMLA(nn.Module):
        def __init__(self, embed_dim, num_heads, latent_dim):
            # 你的代码
            pass
    
        def forward(self, x):
            # 你的代码
            pass
    

进阶练习

  1. 实现GRPO训练器

    Python
    # TODO: 实现一个GRPO训练器
    class GRPOTrainer:
        def __init__(self, policy_model, reference_model, reward_model):
            # 你的代码
            pass
    
        def compute_advantages(self, rewards):
            # 你的代码
            pass
    
        def train_step(self, questions, group_size):
            # 你的代码
            pass
    

  2. 实现奖励模型

    Python
    # TODO: 实现一个奖励模型
    class RewardModel:
        def __init__(self):
            # 你的代码
            pass
    
        def __call__(self, question, output):
            # 你的代码
            pass
    

项目练习

  1. 创建完整的推理优化模型
  2. 实现MoE架构
  3. 添加MLA注意力
  4. 实现GRPO训练

6. 最佳实践

✅ 推荐做法

  1. 理解架构原理
  2. 深入理解MoE机制
  3. 掌握MLA注意力
  4. 理解GRPO算法

  5. 优化推理性能

  6. 使用批处理
  7. 优化注意力计算
  8. 减少内存访问

  9. 监控训练过程

  10. 追踪奖励变化
  11. 监控推理质量
  12. 评估模型性能

❌ 避免做法

  1. 盲目使用复杂架构
  2. 根据任务选择架构
  3. 考虑计算资源
  4. 评估实际收益

  5. 忽略训练稳定性

  6. 监控梯度
  7. 调整学习率
  8. 使用合适的裁剪

  9. 缺乏实验验证

  10. 在多个任务上测试
  11. 对比不同算法
  12. 记录实验结果

7. 总结

本章介绍了DeepSeek R1的核心技术:

  • 混合专家(MoE): 高效的专家网络
  • 多头潜在注意力(MLA): 低秩键值压缩
  • 旋转位置编码: RoPE位置编码
  • GRPO强化学习: 推理能力自然涌现
  • 奖励模型: 基于规则的奖励

这些技术共同构成了DeepSeek R1的高效推理能力。

8. 下一步

继续学习08-推理优化技术,深入了解推理优化的具体技术。