跳转至

07 - DeepSeek R1 架构详解

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

内容说明:本文档基于 DeepSeek-R1 论文( arXiv:2501.12948 , 2025 年 1 月发布)和相关公开技术资料编写,介绍 DeepSeek R1 的架构要点、训练流程和推理特性。代码示例仅用于说明相关技术原理,不代表官方实现,也不应视为逐行复现。

📖 章节概述

本章介绍 DeepSeek R1 的模型架构、训练流程和推理特性。 DeepSeek R1 基于 DeepSeek-V3 基座模型,通过强化学习训练获得强大的推理能力。

🎯 学习目标

完成本章后,你将能够:

  • 理解 DeepSeek R1 的公开架构要点( MoE + MLA )
  • 掌握 GRPO 强化学习训练流程
  • 了解论文报告的推理行为如何在 RL 训练中出现
  • 理解蒸馏技术在小模型推理中的应用

1. DeepSeek R1 概述

1.1 什么是 DeepSeek R1

DeepSeek R1 ( 2025 年 1 月发布)是一个通过强化学习获得推理能力的开源大语言模型:

核心事实: - 架构:基于 DeepSeek-V3 ,采用 MoE (混合专家)+ MLA (多头潜在注意力) - 参数量: 671B 总参数,每 token 激活 37B - 上下文: 128K tokens - 开源协议:官方仓库公开为 MIT License - 注意: R1 是纯文本模型,不是多模态模型

训练流程(教学化概括): 1. DeepSeek-V3-Base 基座模型 2. 冷启动数据 SFT (少量高质量 CoT 数据) 3. GRPO 强化学习训练(论文报告该阶段出现更长推理、自检等行为) 4. 拒绝采样 + SFT (改善输出格式与可读性) 5. 二次 RL 训练(对齐人类偏好)

1.2 模型架构(基于 DeepSeek-V3 )

Text Only
DeepSeek R1 的公开架构要点(基于 DeepSeek-V3 基座)
├── Multi-Head Latent Attention (MLA)
│   ├── 低秩键值压缩(减少 KV Cache)
│   ├── 解耦旋转位置编码 (RoPE)
│   └── 推理时显著节省显存
├── DeepSeekMoE
│   ├── 细粒度专家划分(每层 256 个路由专家 + 1 个共享专家)
│   ├── Top-8 路由策略
│   └── Auxiliary-loss-free 负载均衡与路由约束
└── 训练创新
    ├── GRPO(Group Relative Policy Optimization)
    ├── 去掉单独的 Critic / Value 模型
    ├── 奖励通常来自规则或可验证信号
    └── 论文报告 RL 后出现更强的推理行为(非硬编码推理模块)

2. 模型架构

2.1 混合专家( MoE )架构

DeepSeek-V3 采用细粒度的混合专家架构,每层包含 256 个路由专家和 1 个共享专家,使用 Top-8 路由策略。

Python
import torch
import torch.nn as nn
import torch.nn.functional as F

class MoEExpert(nn.Module):  # 继承nn.Module定义网络层
    """
    混合专家网络中的单个专家
    """
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()  # super()调用父类方法
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.activation = nn.GELU()

    def forward(self, x):
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        return x

class MoELayer(nn.Module):
    """
    混合专家层
    """
    def __init__(self, input_dim, hidden_dim, output_dim, num_experts=8, top_k=2):
        super().__init__()
        self.num_experts = num_experts
        self.top_k = top_k

        # 门控网络
        self.gate = nn.Linear(input_dim, num_experts)

        # 专家网络
        self.experts = nn.ModuleList([
            MoEExpert(input_dim, hidden_dim, output_dim)
            for _ in range(num_experts)
        ])

    def forward(self, x):
        """
        前向传播

        Args:
            x: 输入张量 [batch_size, seq_len, input_dim]
        """
        batch_size, seq_len, input_dim = x.shape

        # 门控选择
        gate_logits = self.gate(x)  # [batch_size, seq_len, num_experts]
        gate_probs = F.softmax(gate_logits, dim=-1)  # F.xxx PyTorch函数式API

        # 选择top-k专家
        top_k_probs, top_k_indices = torch.topk(gate_probs, self.top_k, dim=-1)

        # 归一化top-k概率
        top_k_probs = top_k_probs / top_k_probs.sum(dim=-1, keepdim=True)

        # 初始化输出(注意使用output_dim,即专家输出维度)
        output_dim = self.experts[0].fc2.out_features
        output = torch.zeros(batch_size, seq_len, output_dim, device=x.device)

        # 应用选定的专家
        for k in range(self.top_k):
            # 获取当前k的专家索引和概率
            expert_idx = top_k_indices[:, :, k]  # [batch_size, seq_len]
            expert_prob = top_k_probs[:, :, k:k+1]  # [batch_size, seq_len, 1]

            # 对每个专家计算
            for expert_id in range(self.num_experts):
                # 找到使用该expert_id的样本
                mask = (expert_idx == expert_id)

                if mask.any():  # any()任一为True则返回True
                    # 获取使用该专家的输入
                    expert_input = x[mask]

                    # 通过专家网络
                    expert_output = self.experts[expert_id](expert_input)

                    # 加权累加到输出
                    output[mask] += expert_output * expert_prob[mask]

        return output

# 使用示例
# moe_layer = MoELayer(input_dim=768, hidden_dim=2048, output_dim=768, num_experts=8, top_k=2)
# x = torch.randn(32, 128, 768)
# output = moe_layer(x)

2.2 多头潜在注意力( MLA )

MLA ( Multi-Head Latent Attention )是 DeepSeek-V3 的核心创新之一,通过低秩键值压缩显著减少 KV Cache 的显存占用。

Python
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class MultiHeadLatentAttention(nn.Module):
    """
    多头潜在注意力(MLA)

    MLA 通过低秩键值压缩减少 KV Cache 的显存占用
    """
    def __init__(self, embed_dim, num_heads, latent_dim=64):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.latent_dim = latent_dim

        # Q 的投影
        self.q_proj = nn.Linear(embed_dim, embed_dim)

        # KV 的低秩压缩(下投影共享,上投影分开)
        self.kv_down_proj = nn.Linear(embed_dim, latent_dim)
        self.k_up_proj = nn.Linear(latent_dim, embed_dim)  # K 的上投影
        self.v_up_proj = nn.Linear(latent_dim, embed_dim)  # V 的上投影

        # 输出投影
        self.out_proj = nn.Linear(embed_dim, embed_dim)

    def forward(self, x, mask=None):
        """
        前向传播

        Args:
            x: 输入张量 [batch_size, seq_len, embed_dim]
            mask: 注意力掩码 [batch_size, seq_len]
        """
        batch_size, seq_len, _ = x.shape

        # 计算 Q
        Q = self.q_proj(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)  # 重塑张量形状

        # 计算 KV 的低秩表示(共享下投影,分开上投影)
        # MLA低秩压缩:先下投影到latent_dim节省KV Cache,再上投影恢复原始维度
        kv_compressed = self.kv_down_proj(x)  # [batch_size, seq_len, latent_dim]
        k = self.k_up_proj(kv_compressed)  # [batch_size, seq_len, embed_dim]
        v = self.v_up_proj(kv_compressed)  # [batch_size, seq_len, embed_dim]

        # view将(B,S,d_model)拆为(B,S,heads,d_k),transpose(1,2)得(B,heads,S,d_k)多头并行格式
        K = k.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        V = v.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        # 计算注意力分数
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

        # 应用掩码
        if mask is not None:
            scores = scores.masked_fill(mask.unsqueeze(1).unsqueeze(1) == 0, float('-inf'))  # unsqueeze增加一个维度

        # 计算注意力权重
        attn_weights = F.softmax(scores, dim=-1)

        # 计算输出
        output = torch.matmul(attn_weights, V)
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embed_dim)

        # 输出投影
        output = self.out_proj(output)

        return output

# 使用示例
# mla = MultiHeadLatentAttention(embed_dim=768, num_heads=12, latent_dim=64)
# x = torch.randn(32, 128, 768)
# output = mla(x)

2.3 旋转位置编码( RoPE )

Python
import torch
import torch.nn as nn
import math

class RotaryPositionalEmbedding(nn.Module):
    """
    旋转位置编码(RoPE)
    """
    def __init__(self, embed_dim, max_seq_len=8192):
        super().__init__()
        self.embed_dim = embed_dim
        self.max_seq_len = max_seq_len

        # 计算频率
        inv_freq = 1.0 / (10000 ** (torch.arange(0, embed_dim, 2).float() / embed_dim))
        self.register_buffer('inv_freq', inv_freq)

    def forward(self, x, seq_len=None):
        """
        计算旋转位置编码

        Args:
            x: 输入张量 [batch_size, seq_len, embed_dim]
            seq_len: 序列长度
        """
        if seq_len is None:
            seq_len = x.size(1)

        # 生成位置索引
        positions = torch.arange(seq_len, device=x.device).float()

        # 计算角度
        # RoPE角度计算:unsqueeze(-1)和unsqueeze(0)将1D向量广播相乘得(S,d/2)矩阵
        angles = positions.unsqueeze(-1) * self.inv_freq.unsqueeze(0)  # [seq_len, embed_dim/2]

        # 创建旋转矩阵
        # 两次unsqueeze(0)扩展为(1,1,S,d/2)以广播匹配(B,heads,S,d_k)的Q/K张量
        angles = angles.unsqueeze(0).unsqueeze(0)  # [1, 1, seq_len, embed_dim/2]
        cos_angles = torch.cos(angles)
        sin_angles = torch.sin(angles)

        return cos_angles, sin_angles

def apply_rotary_pos_emb(q, k, cos_angles, sin_angles):
    """
    应用旋转位置编码到Q和K

    Args:
        q: 查询张量 [batch_size, num_heads, seq_len, head_dim]
        k: 键张量 [batch_size, num_heads, seq_len, head_dim]
        cos_angles: 余弦角度 [1, 1, seq_len, head_dim/2]
        sin_angles: 正弦角度 [1, 1, seq_len, head_dim/2]
    """
    # 分离实部和虚部
    q_real, q_imag = q[..., ::2], q[..., 1::2]
    k_real, k_imag = k[..., ::2], k[..., 1::2]

    # 应用旋转
    q_rot_real = q_real * cos_angles - q_imag * sin_angles
    q_rot_imag = q_real * sin_angles + q_imag * cos_angles
    k_rot_real = k_real * cos_angles - k_imag * sin_angles
    k_rot_imag = k_real * sin_angles + k_imag * cos_angles

    # 合并
    q_rot = torch.stack([q_rot_real, q_rot_imag], dim=-1).flatten(-2)  # torch.stack沿新维度拼接张量
    k_rot = torch.stack([k_rot_real, k_rot_imag], dim=-1).flatten(-2)

    return q_rot, k_rot

# 使用示例
# rope = RotaryPositionalEmbedding(embed_dim=768)
# x = torch.randn(32, 128, 768)
# cos_angles, sin_angles = rope(x)

3. GRPO 强化学习训练

3.1 GRPO 算法原理

GRPO ( Group Relative Policy Optimization )是 DeepSeek R1 使用的强化学习算法,它是对传统 PPO ( Proximal Policy Optimization )的改进。

核心特点: - 无需 Value Model: GRPO 去掉了 PPO 中的价值模型,减少了计算开销 - Group Sampling:对每个问题采样多个输出,计算平均奖励作为基线 - 相对优势:使用相对于组内平均奖励的优势,而不是绝对优势 - 奖励信号仍然存在:只是不再额外训练 Critic / Value 模型;奖励可以来自规则、校验器或其他可验证信号

说明:下面的代码只演示 GRPO 的核心思路,便于理解组内采样、相对优势和策略更新;它不是 DeepSeek R1 官方训练代码,也没有覆盖分布式训练、真实 log-prob 计算和工程优化细节。

Python
import torch
import torch.nn as nn
import torch.nn.functional as F

class GRPOPolicy(nn.Module):
    """
    GRPO 策略网络
    """
    def __init__(self, config):
        super().__init__()
        self.config = config

        # 词嵌入
        self.embeddings = nn.Embedding(config.vocab_size, config.hidden_size)

        # Transformer层
        # 注意:PyTorch 2.1+ 中 TransformerEncoderLayer 的 dropout 和 activation 参数
        # 推荐通过 activation 参数传入 'gelu' 或使用自定义激活函数
        # 如需更灵活的配置,建议使用 nn.TransformerEncoderLayer.from_config() 方法
        self.layers = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=config.hidden_size,
                nhead=config.num_attention_heads,
                dim_feedforward=config.intermediate_size,
                dropout=0.1,  # PyTorch 2.0+ 仍支持此参数
                activation='gelu',
                batch_first=True  # PyTorch 1.12+ 推荐使用 batch_first=True
            )
            for _ in range(config.num_hidden_layers)
        ])

        # 最终层归一化
        self.ln_f = nn.LayerNorm(config.hidden_size)

        # 语言模型头
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def forward(self, input_ids, attention_mask=None):
        """
        前向传播
        """
        batch_size, seq_len = input_ids.shape

        # 词嵌入
        hidden_states = self.embeddings(input_ids)

        # 通过Transformer层
        for layer in self.layers:
            hidden_states = layer(
                hidden_states.transpose(0, 1),
                src_key_padding_mask=~attention_mask.bool() if attention_mask is not None else None
            ).transpose(0, 1)

        # 最终层归一化
        hidden_states = self.ln_f(hidden_states)

        # 语言模型头
        logits = self.lm_head(hidden_states)

        return logits

class GRPOTrainer:
    """
    GRPO 训练器
    """
    def __init__(self, policy_model, reference_model, reward_model, config):
        self.policy_model = policy_model
        self.reference_model = reference_model
        self.reward_model = reward_model
        self.config = config

        self.optimizer = torch.optim.AdamW(
            policy_model.parameters(),
            lr=config.learning_rate,
            betas=(0.9, 0.999)
        )

    def compute_advantages(self, rewards):
        """
        计算优势(相对于组内平均奖励)

        Args:
            rewards: 奖励张量 [batch_size, group_size]
        """
        # 计算组内平均奖励
        mean_rewards = rewards.mean(dim=-1, keepdim=True)  # [batch_size, 1]

        # 计算优势(相对于平均)
        advantages = rewards - mean_rewards  # [batch_size, group_size]

        # 归一化优势
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        return advantages

    def compute_policy_loss(self, policy_logits, ref_logits, advantages, attention_mask):
        """
        计算策略损失

        Args:
            policy_logits: 策略模型的输出 [batch_size, seq_len, vocab_size]
            ref_logits: 参考模型的输出 [batch_size, seq_len, vocab_size]
            advantages: 优势 [batch_size, group_size]
            attention_mask: 注意力掩码 [batch_size, seq_len]
        """
        # 计算策略概率
        policy_log_probs = F.log_softmax(policy_logits, dim=-1)
        ref_log_probs = F.log_softmax(ref_logits, dim=-1)

        # 计算重要性比率
        ratio = torch.exp(policy_log_probs - ref_log_probs)

        # 应用 GRPO 裁剪
        clipped_ratio = torch.clamp(ratio, 1 - self.config.clip_ratio, 1 + self.config.clip_ratio)

        # 计算损失
        policy_loss = -torch.min(
            ratio * advantages.unsqueeze(-1).unsqueeze(-1),
            clipped_ratio * advantages.unsqueeze(-1).unsqueeze(-1)
        )

        # 应用掩码
        if attention_mask is not None:
            policy_loss = policy_loss * attention_mask.unsqueeze(-1)

        policy_loss = policy_loss.sum() / attention_mask.sum()

        return policy_loss

    def train_step(self, questions, group_size=16):
        """
        训练步骤

        Args:
            questions: 问题列表
            group_size: 每个问题的采样组大小
        """
        total_loss = 0
        num_batches = 0

        for question in questions:
            # 对每个问题采样多个输出
            outputs = []
            rewards = []

            for _ in range(group_size):
                # 从策略模型采样输出
                output = self.sample_output(question)
                outputs.append(output)

                # 计算奖励
                reward = self.reward_model(question, output)
                rewards.append(reward)

            # 转换为张量
            rewards = torch.tensor(rewards)

            # 计算优势
            advantages = self.compute_advantages(rewards)

            # 计算策略损失(简化:使用平均优势加权的负对数概率)
            # 实际 GRPO 会对每个 sample 计算 policy/ref log-prob 并裁剪
            avg_advantage = advantages.mean()
            policy_loss = -avg_advantage  # 简化的策略梯度

            # 反向传播
            self.optimizer.zero_grad()  # 清零梯度
            policy_loss.backward()  # 反向传播计算梯度
            self.optimizer.step()  # 更新参数

            total_loss += policy_loss.item()  # 将单元素张量转为Python数值
            num_batches += 1

        return total_loss / num_batches

    def sample_output(self, question):
        """
        从策略模型采样输出

        Args:
            question: 输入问题
        """
        # 这里简化实现,实际需要更复杂的采样逻辑
        input_ids = self.tokenize(question)
        attention_mask = torch.ones_like(input_ids)

        with torch.no_grad():  # 禁用梯度计算,节省内存
            logits = self.policy_model(input_ids, attention_mask)

        # 采样输出
        output_ids = torch.multinomial(F.softmax(logits[:, -1, :], dim=-1), num_samples=1)

        return output_ids

    def tokenize(self, text):
        """
        简化的分词函数
        """
        # 这里简化实现,实际需要使用真实的 tokenizer
        return torch.randint(0, 100000, (1, 128))

# 使用示例
# config = type('Config', (), {
#     'vocab_size': 100000,
#     'hidden_size': 768,
#     'num_hidden_layers': 12,
#     'num_attention_heads': 12,
#     'intermediate_size': 2048,
#     'learning_rate': 3e-6,
#     'clip_ratio': 10.0
# })()
#
# policy_model = GRPOPolicy(config)
# reference_model = GRPOPolicy(config)
# reward_model = lambda q, o: torch.randn(1)  # 简化的奖励函数
#
# trainer = GRPOTrainer(policy_model, reference_model, reward_model, config)
# loss = trainer.train_step(["What is 2+2?"], group_size=16)

3.2 推理能力的涌现

论文报告显示,在基于可验证奖励的 RL 训练过程中,模型逐步出现了更长的推理链、自检和回溯修正等行为。更稳妥的表述是:这些推理行为是在训练中被观察到的现象,而不是额外硬编码的显式推理模块。

论文中报告到的现象: 1. 初始阶段:模型开始生成较短的推理步骤 2. 中间阶段:推理链逐渐变长,出现更多自检式文本 3. 后期阶段:在数学、代码等可验证任务上表现进一步提升

学习时应注意: - 这里的“涌现”是对训练现象的概括,不宜理解为一个精确、可保证复现的阈值事件 - 推理行为是否出现、出现得多强,与数据、奖励设计、采样策略和训练稳定性都有关 - 教学上可以把它理解为:RL 训练并没有显式加一个“推理模块”,而是在原有语言模型上强化了某类行为模式

Python
class RewardModel:
    """
    示意性的奖励函数(基于规则/可验证信号)
    """
    def __init__(self):
        self.rules = {
            'format': self.check_format,
            'correctness': self.check_correctness,
            'consistency': self.check_consistency
        }

    def __call__(self, question, output):  # __call__使实例可像函数一样调用
        """
        计算总奖励
        """
        total_reward = 0

        for rule_name, rule_func in self.rules.items():
            reward = rule_func(question, output)
            total_reward += reward

        return total_reward

    def check_format(self, question, output):
        """
        检查输出格式
        """
        # 这里只是示例:检查是否有特定格式标记
        if '<think>' in output and '</think>' in output:
            return 1.0

        return 0.0

    def check_correctness(self, question, output):
        """
        检查答案正确性
        """
        # 这里简化实现,实际任务中通常应由可验证答案或外部校验器提供信号
        return torch.randn(1).item()

    def check_consistency(self, question, output):
        """
        检查推理一致性
        """
        # 检查推理步骤是否合理
        # 这里简化实现
        return torch.randn(1).item()

# 使用示例
# reward_model = RewardModel()
# question = "What is 2+2?"
# output = "<think>Let me think about this...\n2+2=4</think>\nThe answer is 4."
# reward = reward_model(question, output)

4. 完整的 DeepSeek R1 模型

Python
import torch
import torch.nn as nn
from transformers import PreTrainedModel, PretrainedConfig

class DeepSeekR1Config(PretrainedConfig):
    """
    DeepSeek R1配置
    """
    def __init__(
        self,
        vocab_size=100000,
        max_position_embeddings=128000,
        hidden_size=768,
        num_hidden_layers=32,
        num_attention_heads=12,
        intermediate_size=2048,
        num_experts=256,
        top_k_experts=8,
        latent_dim=64,
        **kwargs  # *args接收任意位置参数,**kwargs接收任意关键字参数
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.max_position_embeddings = max_position_embeddings
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.intermediate_size = intermediate_size
        self.num_experts = num_experts
        self.top_k_experts = top_k_experts
        self.latent_dim = latent_dim

class DeepSeekR1Model(PreTrainedModel):
    """
    DeepSeek R1模型(基于DeepSeek-V3架构)

    注意:这是一个简化的实现,用于说明架构原理。
    实际的 DeepSeek R1 使用更复杂的实现和优化。
    """
    config_class = DeepSeekR1Config

    def __init__(self, config):
        super().__init__(config)
        self.config = config

        # 词嵌入
        self.embeddings = nn.Embedding(config.vocab_size, config.hidden_size)

        # 旋转位置编码
        self.rope = RotaryPositionalEmbedding(config.hidden_size, config.max_position_embeddings)

        # Transformer层
        self.layers = nn.ModuleList([
            DeepSeekR1Layer(config)
            for _ in range(config.num_hidden_layers)
        ])

        # 最终层归一化
        self.ln_f = nn.LayerNorm(config.hidden_size)

        # 语言模型头
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        # 初始化权重
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """
        初始化权重
        """
        if isinstance(module, nn.Linear):  # isinstance检查类型
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
        elif isinstance(module, nn.LayerNorm):
            nn.init.ones_(module.weight)
            nn.init.zeros_(module.bias)

    def forward(self, input_ids, attention_mask=None):
        """
        前向传播
        """
        batch_size, seq_len = input_ids.shape

        # 词嵌入
        hidden_states = self.embeddings(input_ids)

        # 位置编码
        cos_angles, sin_angles = self.rope(hidden_states, seq_len)

        # 通过Transformer层
        all_hidden_states = []
        for layer in self.layers:
            hidden_states = layer(hidden_states, cos_angles, sin_angles, attention_mask)
            all_hidden_states.append(hidden_states)

        # 最终层归一化
        hidden_states = self.ln_f(hidden_states)

        # 语言模型头
        logits = self.lm_head(hidden_states)

        return {
            "logits": logits,
            "hidden_states": all_hidden_states
        }

class DeepSeekR1Layer(nn.Module):
    """
    DeepSeek R1层
    """
    def __init__(self, config):
        super().__init__()
        self.config = config

        # 多头潜在注意力(MLA)
        self.attention = MultiHeadLatentAttention(
            config.hidden_size,
            config.num_attention_heads,
            config.latent_dim
        )

        # 前馈网络(MoE)
        self.moe = MoELayer(
            config.hidden_size,
            config.intermediate_size,
            config.hidden_size,
            config.num_experts,
            config.top_k_experts
        )

        # 层归一化
        self.norm1 = nn.LayerNorm(config.hidden_size)
        self.norm2 = nn.LayerNorm(config.hidden_size)

    def forward(self, x, cos_angles, sin_angles, attention_mask=None):
        """
        前向传播
        """
        # 注意力
        residual = x
        x = self.norm1(x)
        attn_output = self.attention(x, attention_mask)
        x = residual + attn_output

        # 前馈网络(MoE)
        residual = x
        x = self.norm2(x)
        moe_output = self.moe(x)
        x = residual + moe_output

        return x

# 使用示例
# config = DeepSeekR1Config(
#     vocab_size=100000,
#     hidden_size=768,
#     num_hidden_layers=12,
#     num_attention_heads=12
# )
# model = DeepSeekR1Model(config)
# input_ids = torch.randint(0, 100000, (32, 128))
# outputs = model(input_ids)

5. 练习题

基础练习

  1. 实现简单的 MoE 层
Python
# 练习: 实现一个简单的MoE层
class SimpleMoE(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_experts):
        # 你的代码
        pass

    def forward(self, x):
        # 你的代码
        pass
  1. 实现多头潜在注意力
Python
# 练习: 实现一个简单的MLA
class SimpleMLA(nn.Module):
    def __init__(self, embed_dim, num_heads, latent_dim):
        # 你的代码
        pass

    def forward(self, x):
        # 你的代码
        pass

进阶练习

  1. 实现 GRPO 训练器
Python
# 练习: 实现一个GRPO训练器
class GRPOTrainer:
    def __init__(self, policy_model, reference_model, reward_model):
        # 你的代码
        pass

    def compute_advantages(self, rewards):
        # 你的代码
        pass

    def train_step(self, questions, group_size):
        # 你的代码
        pass
  1. 实现奖励模型
Python
# 练习: 实现一个奖励模型
class RewardModel:
    def __init__(self):
        # 你的代码
        pass

    def __call__(self, question, output):
        # 你的代码
        pass

项目练习

  1. 创建完整的推理优化模型
  2. 实现 MoE 架构
  3. 添加 MLA 注意力
  4. 实现 GRPO 训练

6. 最佳实践

✅ 推荐做法

  1. 理解架构原理
  2. 深入理解 MoE 机制
  3. 掌握 MLA 注意力
  4. 理解 GRPO 算法

  5. 优化推理性能

  6. 使用批处理
  7. 优化注意力计算
  8. 减少内存访问

  9. 监控训练过程

  10. 追踪奖励变化
  11. 监控推理质量
  12. 评估模型性能

❌ 避免做法

  1. 盲目使用复杂架构
  2. 根据任务选择架构
  3. 考虑计算资源
  4. 评估实际收益

  5. 忽略训练稳定性

  6. 监控梯度
  7. 调整学习率
  8. 使用合适的裁剪

  9. 缺乏实验验证

  10. 在多个任务上测试
  11. 对比不同算法
  12. 记录实验结果

7. 总结

本章介绍了 DeepSeek R1 的核心技术与学习框架:

  • 混合专家( MoE ): 高效的专家网络
  • 多头潜在注意力( MLA ): 低秩键值压缩
  • 旋转位置编码: RoPE 位置编码
  • GRPO 强化学习: 去掉单独的 Critic / Value 模型,用组内相对奖励更新策略
  • 奖励信号: 在推理任务中更常见的是规则或可验证反馈

这些技术共同构成了公开资料中描述的 DeepSeek R1 推理能力来源。若需要论文级细节,应继续对照官方论文和仓库说明阅读。

8. 下一步

继续学习08-推理优化技术,深入了解推理优化的具体技术。


最后更新日期: 2026-03-26