
02 - Training Infrastructure (Comprehensive Edition)

⚠️ Timeliness note: this chapter touches on frontier models, pricing, leaderboards, and other fast-moving information that can change quickly between versions; treat the original papers, official release pages, and API documentation as authoritative.

Learning objectives: master the infrastructure behind large-model training: mixed precision, distributed training, DeepSpeed, and Megatron.


Table of Contents

  1. Mixed Precision Training
  2. Distributed Training Basics
  3. DeepSpeed in Depth
  4. Megatron-LM
  5. Training Optimization Techniques
  6. Fault Recovery and Checkpointing

Mixed Precision Training

1.1 Why Mixed Precision

Text Only
FP32 (single-precision float):
- 32 bits: 1 sign + 8 exponent + 23 mantissa
- Range: ~1.18e-38 to ~3.4e38
- Large memory footprint, slower compute

FP16 (half-precision float):
- 16 bits: 1 sign + 5 exponent + 10 mantissa
- Range: ~5.96e-8 to ~65504
- Half the memory, faster compute
- Problem: gradient underflow and loss of precision

BF16 (Brain Floating Point):
- 16 bits: 1 sign + 8 exponent + 7 mantissa
- Same dynamic range as FP32, lower precision
- Better suited to training; far less prone to underflow
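
The exponent width is what matters for training stability: a gradient-sized value that underflows to zero in FP16 is still representable in BF16. A minimal demonstration:

Python
import torch

# 1e-8 is below FP16's smallest subnormal (~5.96e-8) but well inside
# BF16's FP32-like dynamic range
g = torch.tensor(1e-8)
print(g.to(torch.float16))   # tensor(0., dtype=torch.float16) -> underflow
print(g.to(torch.bfloat16))  # small but nonzero -> no underflow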

1.2 PyTorch AMP Implementation

Python
import torch
from torch.amp import autocast, GradScaler

class MixedPrecisionTrainer:
    """
    Mixed precision trainer.
    """
    def __init__(self, model, optimizer, device='cuda'):
        self.model = model.to(device)  # .to(device) moves the module to GPU/CPU
        self.optimizer = optimizer
        self.device = device

        # Gradient scaler: prevents FP16 gradients from underflowing
        self.scaler = GradScaler()

    def train_step(self, batch):
        """
        One training step.
        """
        self.model.train()

        # Move the data to the device
        input_ids = batch['input_ids'].to(self.device)
        labels = batch['labels'].to(self.device)

        # Zero out gradients
        self.optimizer.zero_grad()

        # Automatic mixed precision context
        with autocast(device_type='cuda', dtype=torch.float16):
            # The forward pass runs in FP16
            outputs = self.model(input_ids, labels=labels)
            loss = outputs.loss

        # Backward pass on the scaled loss
        self.scaler.scale(loss).backward()

        # Gradient clipping (optional): unscale first so the norm is computed on true gradients
        self.scaler.unscale_(self.optimizer)
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)

        # Update parameters (the scaler skips the step if inf/nan gradients are found)
        self.scaler.step(self.optimizer)
        self.scaler.update()

        return loss.item()

# BF16 training (requires an Ampere-or-newer GPU)
def bf16_training_example():
    """
    BF16 training example. `MyModel` and `dataloader` are placeholders.
    """
    model = MyModel()
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    # BF16 has FP32's dynamic range, so no GradScaler is needed
    for batch in dataloader:
        with autocast(device_type='cuda', dtype=torch.bfloat16):
            outputs = model(**batch)
            loss = outputs.loss

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
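
Whether BF16 is usable depends on the GPU generation, so it is worth probing at runtime; a minimal check:

Python
import torch

# True only on GPUs with BF16 support (Ampere or newer on NVIDIA hardware)
print(torch.cuda.is_available() and torch.cuda.is_bf16_supported())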

Distributed Training Basics

2.1 Data Parallelism

Python
import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def setup_distributed(rank, world_size):
    """
    Initialize the distributed environment.
    """
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # Initialize the process group
    dist.init_process_group(
        backend='nccl',  # NCCL for NVIDIA GPUs
        init_method='env://',
        world_size=world_size,
        rank=rank
    )

    # Set the current device
    torch.cuda.set_device(rank)

def train_ddp(rank, world_size):
    """
    DDP training example. `MyModel`, `dataset`, and `num_epochs` are placeholders.
    """
    setup_distributed(rank, world_size)

    # Create the model and move it to this process's GPU
    model = MyModel().to(rank)

    # Wrap it in DDP (gradients are all-reduced across ranks during backward)
    model = DDP(model, device_ids=[rank])

    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    # Distributed sampler: gives each rank a disjoint shard of the dataset
    train_sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank
    )

    dataloader = DataLoader(
        dataset,
        batch_size=32,
        sampler=train_sampler
    )

    for epoch in range(num_epochs):
        train_sampler.set_epoch(epoch)  # reshuffle differently each epoch

        for batch in dataloader:
            batch = {k: v.to(rank) for k, v in batch.items()}  # move data to this GPU
            optimizer.zero_grad()
            loss = model(**batch)  # assumes the model returns the loss directly
            loss.backward()
            optimizer.step()

    dist.destroy_process_group()

# Launch one process per GPU
if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_ddp, args=(world_size,), nprocs=world_size, join=True)
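
In practice, multi-process jobs are more often launched with `torchrun` than with `mp.spawn`: it spawns one process per GPU and sets `RANK`, `LOCAL_RANK`, `WORLD_SIZE`, `MASTER_ADDR`, and `MASTER_PORT` in the environment. A sketch, assuming the code above lives in a hypothetical `train.py` that reads its rank from those variables:

Bash
# one process per GPU on a single 8-GPU node
torchrun --nproc_per_node=8 train.py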

2.2 Model Parallelism

Python
import torch.nn as nn

class ModelParallelModel(nn.Module):
    """
    Model parallelism example:
    place different layers of the model on different GPUs.
    """
    def __init__(self):
        super().__init__()  # super() calls the parent class constructor
        # First layer on GPU 0
        self.layer1 = nn.Linear(1000, 1000).to('cuda:0')

        # Second layer on GPU 1
        self.layer2 = nn.Linear(1000, 1000).to('cuda:1')

        # Third layer on GPU 2
        self.layer3 = nn.Linear(1000, 1000).to('cuda:2')

    def forward(self, x):
        # Input on GPU 0
        x = self.layer1(x.to('cuda:0'))

        # Move activations to GPU 1
        x = x.to('cuda:1')
        x = self.layer2(x)

        # Move activations to GPU 2
        x = x.to('cuda:2')
        x = self.layer3(x)

        return x
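
Note the weakness of this naive layout: while one GPU computes, the other two sit idle, so throughput is no better than a single GPU. Pipeline parallelism, covered next, addresses exactly this.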

2.3 Pipeline Parallelism

Python
import torch.nn as nn
# Note: this Pipe API (based on torchgpipe) was deprecated and later removed;
# recent PyTorch releases provide torch.distributed.pipelining instead.
from torch.distributed.pipeline.sync import Pipe

def build_pipeline_model():
    """
    Pipeline parallelism: split the model into stages, one GPU per stage.
    Pipe splits each mini-batch into micro-batches and streams them through
    the stages, so the GPUs work concurrently instead of taking turns.
    """
    # Split the model into 4 stages
    stage1 = nn.Sequential(nn.Linear(1000, 1000), nn.ReLU()).to('cuda:0')
    stage2 = nn.Sequential(nn.Linear(1000, 1000), nn.ReLU()).to('cuda:1')
    stage3 = nn.Sequential(nn.Linear(1000, 1000), nn.ReLU()).to('cuda:2')
    stage4 = nn.Sequential(nn.Linear(1000, 1000), nn.ReLU()).to('cuda:3')

    model = nn.Sequential(stage1, stage2, stage3, stage4)

    # Pipe requires the RPC framework, even in a single process:
    # torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1)
    # chunks = number of micro-batches each mini-batch is split into
    model = Pipe(model, chunks=8)  # forward() returns an RRef; use .local_value()
    return model

DeepSpeed in Depth

3.1 The DeepSpeed ZeRO Optimizer

deepspeed_config.json (noted here because real DeepSpeed JSON configs cannot contain comments):

JSON
{
    "train_batch_size": 32,
    "train_micro_batch_size_per_gpu": 4,
    "gradient_accumulation_steps": 2,

    "optimizer": {
        "type": "AdamW",
        "params": {
            "lr": 1e-4,
            "betas": [0.9, 0.999],
            "eps": 1e-8,
            "weight_decay": 0.01
        }
    },

    "scheduler": {
        "type": "WarmupLR",
        "params": {
            "warmup_min_lr": 0,
            "warmup_max_lr": 1e-4,
            "warmup_num_steps": 1000
        }
    },

    "fp16": {
        "enabled": true,
        "loss_scale": 0,
        "loss_scale_window": 1000,
        "initial_scale_power": 16,
        "hysteresis": 2,
        "min_loss_scale": 1
    },

    "zero_optimization": {
        "stage": 2,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": true
        },
        "allgather_partitions": true,
        "allgather_bucket_size": 2e8,
        "overlap_comm": true,
        "reduce_scatter": true,
        "reduce_bucket_size": 2e8,
        "contiguous_gradients": true
    }
}
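
A typical launch uses the `deepspeed` CLI (a sketch: `train.py` is a hypothetical script name, and the flags after it are parsed by the script itself, e.g. via `deepspeed.add_config_arguments`):

Bash
deepspeed --num_gpus=8 train.py --deepspeed_config deepspeed_config.json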
Python
import json

import deepspeed

class DeepSpeedTrainer:
    """
    DeepSpeed trainer.
    """
    def __init__(self, model, config_path):
        # Load the config
        with open(config_path, 'r') as f:  # `with` closes the file automatically
            ds_config = json.load(f)

        # Initialize DeepSpeed (wraps the model, optimizer, and LR scheduler)
        model_engine, optimizer, _, _ = deepspeed.initialize(
            model=model,
            model_parameters=model.parameters(),
            config=ds_config
        )

        self.model = model_engine
        self.optimizer = optimizer

    def train_step(self, batch):
        """
        One DeepSpeed training step.
        """
        # Forward pass
        outputs = self.model(**batch)
        loss = outputs.loss

        # Backward pass (DeepSpeed handles gradient accumulation and mixed precision)
        self.model.backward(loss)

        # Update parameters
        self.model.step()

        return loss.item()

# ZeRO stage comparison
"""
ZeRO Stage 0: standard DDP, no partitioning
ZeRO Stage 1: optimizer states partitioned (~4x memory saving)
ZeRO Stage 2: optimizer states + gradients partitioned (~8x memory saving)
ZeRO Stage 3: optimizer states + gradients + parameters partitioned
              (savings scale linearly with the data-parallel degree)

Example with 8 GPUs:
- Stage 0: every GPU holds the full model state
- Stage 1: every GPU holds 1/8 of the optimizer states
- Stage 2: every GPU holds 1/8 of the optimizer states + 1/8 of the gradients
- Stage 3: every GPU holds 1/8 of the full state
"""

Megatron-LM

4.1 Tensor Parallelism

Python
# Megatron tensor-parallel example (column-parallel linear)
import torch
import torch.nn as nn
import torch.nn.functional as F
from megatron.core import tensor_parallel

class TensorParallelLinear(nn.Module):
    """
    Tensor-parallel linear layer:
    the weight matrix is split by columns (output dimension) across GPUs.
    """
    def __init__(self, input_size, output_size, world_size):
        super().__init__()

        # Each GPU owns a slice of the output dimension
        self.output_size_per_partition = output_size // world_size

        # This GPU's shard of the weight (initialization omitted for brevity)
        self.weight = nn.Parameter(
            torch.empty(self.output_size_per_partition, input_size)
        )
        self.bias = nn.Parameter(torch.empty(self.output_size_per_partition))

    def forward(self, input):
        # Each GPU computes its slice of the output
        output_parallel = F.linear(input, self.weight, self.bias)

        # All-gather the slices from every GPU in the tensor-parallel group
        output = tensor_parallel.gather_from_tensor_model_parallel_region(
            output_parallel
        )

        return output
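
The correctness of this scheme is just block matrix multiplication: splitting the weight by output columns and concatenating the partial results reproduces the full layer. A self-contained sanity check (no Megatron needed):

Python
import torch

x = torch.randn(4, 16)
w = torch.randn(32, 16)      # full weight, [out_features, in_features]
w1, w2 = w.chunk(2, dim=0)   # two "GPUs", each holding half the output rows

full = x @ w.t()
parallel = torch.cat([x @ w1.t(), x @ w2.t()], dim=1)
print(torch.allclose(full, parallel, atol=1e-5))  # expect: True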

4.2 Sequence Parallelism

Python
import torch
import torch.distributed as dist
import torch.nn as nn

class SequenceParallelLayer(nn.Module):
    """
    Sequence parallelism: split the sequence dimension across GPUs.
    Useful for long-sequence training of per-token operations
    (e.g. LayerNorm, dropout) that need no cross-token communication.
    """
    def __init__(self, d_model, world_size, rank):
        super().__init__()
        self.world_size = world_size
        self.rank = rank  # identifies this GPU's slice of the sequence
        self.norm = nn.LayerNorm(d_model)  # an example per-token computation

    def forward(self, x):
        # x: [batch, seq_len, d_model]; seq_len must be divisible by world_size
        batch_size, seq_len, _ = x.shape
        seq_per_partition = seq_len // self.world_size

        # Each rank takes its own slice of the sequence
        start = self.rank * seq_per_partition
        x_parallel = x[:, start:start + seq_per_partition, :]

        # Each GPU computes independently on its slice
        output_parallel = self.norm(x_parallel)

        # All-gather along the sequence dimension to restore the full sequence
        gathered = [torch.empty_like(output_parallel) for _ in range(self.world_size)]
        dist.all_gather(gathered, output_parallel.contiguous())
        return torch.cat(gathered, dim=1)

Training Optimization Techniques

5.1 Gradient Accumulation

Python
class GradientAccumulationTrainer:
    """
    Gradient accumulation trainer:
    simulate a large batch with several small batches.
    Effective batch size = micro-batch size x accumulation_steps (x world size under DDP).
    """
    def __init__(self, model, optimizer, accumulation_steps=4):
        self.model = model
        self.optimizer = optimizer
        self.accumulation_steps = accumulation_steps
        self.step_count = 0

    def train_step(self, batch):
        """
        One training step.
        """
        # Forward pass
        outputs = self.model(**batch)
        loss = outputs.loss / self.accumulation_steps  # scale so gradients average correctly

        # Backward pass (gradients accumulate in .grad across calls)
        loss.backward()

        self.step_count += 1

        # Update parameters once every accumulation_steps micro-batches
        if self.step_count % self.accumulation_steps == 0:
            self.optimizer.step()
            self.optimizer.zero_grad()

        return loss.item() * self.accumulation_steps

5.2 Gradient Clipping

Python
def clip_gradients(parameters, max_norm=1.0, norm_type=2.0):
    """
    Gradient clipping: prevents exploding gradients.
    Call it after backward() and before optimizer.step().
    """
    # Rescales gradients in place and returns the pre-clip total norm
    total_norm = torch.nn.utils.clip_grad_norm_(
        parameters,
        max_norm=max_norm,
        norm_type=norm_type
    )

    return total_norm
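
A minimal usage sketch showing where clipping sits in a training step (the toy model and data here are illustrative only):

Python
import torch
import torch.nn as nn

model = nn.Linear(10, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

x, y = torch.randn(8, 10), torch.randn(8, 1)
loss = nn.functional.mse_loss(model(x), y)

loss.backward()
grad_norm = clip_gradients(model.parameters(), max_norm=1.0)  # clip after backward, before step
optimizer.step()
optimizer.zero_grad()
print(f"pre-clip grad norm: {grad_norm:.3f}")  # worth logging to spot instability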

5.3 Learning Rate Scheduling

Python
import torch
from transformers import get_cosine_schedule_with_warmup

def create_scheduler(optimizer, num_warmup_steps, num_training_steps):
    """
    Create a learning-rate scheduler: linear warmup, then cosine decay.
    """
    scheduler = get_cosine_schedule_with_warmup(
        optimizer,
        num_warmup_steps=num_warmup_steps,
        num_training_steps=num_training_steps,
        num_cycles=0.5  # half a cosine period (decay toward 0, no restart)
    )

    return scheduler

# Visualize the schedule
import matplotlib.pyplot as plt

def visualize_lr_schedule():
    """
    Plot the learning-rate schedule.
    (In real training, call scheduler.step() after optimizer.step().)
    """
    model = torch.nn.Linear(10, 10)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    num_warmup = 1000
    num_total = 10000

    scheduler = get_cosine_schedule_with_warmup(
        optimizer, num_warmup, num_total
    )

    lrs = []
    for step in range(num_total):
        lrs.append(optimizer.param_groups[0]['lr'])
        scheduler.step()

    plt.plot(lrs)
    plt.xlabel('Step')
    plt.ylabel('Learning Rate')
    plt.title('Learning Rate Schedule')
    plt.axvline(x=num_warmup, color='r', linestyle='--', label='Warmup End')
    plt.legend()
    plt.show()

Fault Recovery and Checkpointing

6.1 Saving and Loading Checkpoints

Python
from pathlib import Path

import torch

class CheckpointManager:
    """
    Checkpoint manager.
    """
    def __init__(self, checkpoint_dir):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

    def save_checkpoint(self, model, optimizer, scheduler, step, loss):
        """
        Save a checkpoint.
        """
        checkpoint = {
            'step': step,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict() if scheduler else None,
            'loss': loss
        }

        checkpoint_path = self.checkpoint_dir / f'checkpoint_step_{step}.pt'
        torch.save(checkpoint, checkpoint_path)

        # Keep a symlink pointing at the latest checkpoint
        latest_path = self.checkpoint_dir / 'checkpoint_latest.pt'
        if latest_path.exists():
            latest_path.unlink()
        latest_path.symlink_to(checkpoint_path.name)

        print(f"Checkpoint saved: {checkpoint_path}")

    def load_checkpoint(self, model, optimizer, scheduler=None, checkpoint_path=None):
        """
        Load a checkpoint (defaults to the latest one).
        """
        if checkpoint_path is None:
            checkpoint_path = self.checkpoint_dir / 'checkpoint_latest.pt'

        checkpoint = torch.load(checkpoint_path, map_location='cpu', weights_only=True)

        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

        if scheduler and checkpoint['scheduler_state_dict']:
            scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

        step = checkpoint['step']
        loss = checkpoint['loss']

        print(f"Checkpoint loaded: {checkpoint_path}, step={step}, loss={loss}")

        return step, loss
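
A usage sketch with the class above (toy model; in a real run you would save every N steps and load once at startup):

Python
import torch

model = torch.nn.Linear(10, 10)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

manager = CheckpointManager('checkpoints')
manager.save_checkpoint(model, optimizer, scheduler=None, step=100, loss=0.5)

# After a crash/restart: rebuild model/optimizer, then resume from the latest checkpoint
step, loss = manager.load_checkpoint(model, optimizer)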

6.2 DeepSpeed Checkpoints

Python
# DeepSpeed handles checkpoint sharding across ranks automatically
# (`model` is the engine returned by deepspeed.initialize)
# Save
model.save_checkpoint(save_dir, tag='step_1000')

# Load
model.load_checkpoint(load_dir, tag='step_1000')

Summary

Training Infrastructure Selection Guide

Text Only
Single-GPU training:
├── Small models (<1B): plain PyTorch
└── Large models (>1B): DeepSpeed ZeRO-2/3

Multi-GPU training (single node):
├── Data parallelism: DDP
├── Model parallelism: DeepSpeed / Megatron
└── Hybrid parallelism: DeepSpeed (ZeRO + pipeline)

Large-scale training (multi-node):
├── General-purpose: DeepSpeed
├── Peak performance: Megatron-LM + DeepSpeed
└── Cloud environments: FSDP (native PyTorch)

Key Takeaways

  1. Mixed precision: FP16 with gradient scaling, or BF16 (no scaler needed)
  2. Distribution: pick the parallelism strategy to match the model size
  3. DeepSpeed: the go-to framework for large-model training
  4. Checkpoints: save regularly so training is always recoverable
  5. Monitoring: track loss, learning rate, and gradient norm

Next up: 03 - Inference Serving and Deployment - learn to deploy and optimize inference frameworks such as vLLM and TGI.


Last updated: 2026-02-12 | Applies to: LLM Learning Tutorial v2026