02 - 训练基础设施(全面版)¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
学习目标:掌握大模型训练的混合精度、分布式训练、DeepSpeed和Megatron等基础设施技术。
混合精度训练¶
1.1 为什么需要混合精度¶
Text Only
FP32 (单精度浮点):
- 32位: 1位符号 + 8位指数 + 23位尾数
- 范围: ~1.18e-38 to ~3.4e38
- 内存占用大,计算慢
FP16 (半精度浮点):
- 16位: 1位符号 + 5位指数 + 10位尾数
- 范围: ~5.96e-8 to ~65504
- 内存占用小,计算快
- 问题: 梯度下溢(underflow),精度损失
BF16 (Brain Floating Point):
- 16位: 1位符号 + 8位指数 + 7位尾数
- 范围与FP32基本相同,但尾数精度低于FP16
- 更适合训练,不易下溢
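可以用 torch.finfo 直接验证上面三种格式的数值范围与精度(演示用小脚本,输出数值仅作量级参考):

Python
import torch

# 打印各浮点格式的数值特性:最大值、最小正规数、机器精度
# 注:FP16的次正规数可小到约5.96e-8,finfo.tiny给出的是最小正规数(约6.1e-5)
for dtype in [torch.float32, torch.float16, torch.bfloat16]:
    info = torch.finfo(dtype)
    print(f"{str(dtype):15s} max={info.max:.3e}  min_normal={info.tiny:.3e}  eps={info.eps:.3e}")

# 典型输出:
# torch.float32   max=3.403e+38  min_normal=1.175e-38  eps=1.192e-07
# torch.float16   max=6.550e+04  min_normal=6.104e-05  eps=9.766e-04
# torch.bfloat16  max=3.390e+38  min_normal=1.175e-38  eps=7.812e-03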
1.2 PyTorch AMP实现¶
Python
import torch
from torch.amp import autocast, GradScaler
class MixedPrecisionTrainer:
"""
混合精度训练器
"""
def __init__(self, model, optimizer, device='cuda'):
self.model = model.to(device) # .to(device)将数据移至GPU/CPU
self.optimizer = optimizer
self.device = device
# 梯度缩放器,防止FP16梯度下溢
self.scaler = GradScaler()
def train_step(self, batch):
"""
单步训练
"""
self.model.train()
# 将数据移到设备
input_ids = batch['input_ids'].to(self.device)
labels = batch['labels'].to(self.device)
# 清零梯度
self.optimizer.zero_grad()
# 自动混合精度上下文
with autocast(device_type='cuda', dtype=torch.float16):
# 前向传播在FP16
outputs = self.model(input_ids, labels=labels)
loss = outputs.loss
# 反向传播,缩放梯度
self.scaler.scale(loss).backward()
# 梯度裁剪(可选)
self.scaler.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
# 更新参数
self.scaler.step(self.optimizer)
self.scaler.update()
return loss.item()
# BF16训练(需要Ampere及更新架构的GPU,如A100/H100)
def bf16_training_example():
"""
BF16训练示例
"""
model = MyModel()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
# BF16不需要GradScaler
for batch in dataloader:
with autocast(device_type='cuda', dtype=torch.bfloat16):
outputs = model(**batch)
loss = outputs.loss
loss.backward()
optimizer.step()
optimizer.zero_grad()
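训练前可以用 PyTorch 自带的接口检测当前 GPU 是否支持 BF16,不支持时回退到 FP16 + GradScaler(以下为示意):

Python
import torch

# 运行时选择训练精度:优先BF16,不支持则退回FP16
if torch.cuda.is_available() and torch.cuda.is_bf16_supported():
    train_dtype = torch.bfloat16   # Ampere及以上架构支持
else:
    train_dtype = torch.float16    # 旧架构,需配合GradScaler
print(f"训练精度: {train_dtype}")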
分布式训练基础¶
2.1 数据并行 (Data Parallelism)¶
Python
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
def setup_distributed(rank, world_size):
"""
初始化分布式环境
"""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# 初始化进程组
dist.init_process_group(
backend='nccl', # NVIDIA GPU使用NCCL
init_method='env://',
world_size=world_size,
rank=rank
)
# 设置当前设备
torch.cuda.set_device(rank)
def train_ddp(rank, world_size):
"""
DDP训练示例
"""
setup_distributed(rank, world_size)
# 创建模型并移到当前GPU
model = MyModel().to(rank)
# 包装为DDP模型
model = DDP(model, device_ids=[rank])
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
# 创建分布式采样器
train_sampler = torch.utils.data.distributed.DistributedSampler(
dataset,
num_replicas=world_size,
rank=rank
)
dataloader = DataLoader(
dataset,
batch_size=32,
sampler=train_sampler
)
for epoch in range(num_epochs):
train_sampler.set_epoch(epoch) # 确保每个epoch打乱顺序不同
for batch in dataloader:
optimizer.zero_grad()
            outputs = model(**batch)
            loss = outputs.loss
loss.backward()
optimizer.step()
dist.destroy_process_group()
# 启动多进程训练
if __name__ == '__main__':
world_size = torch.cuda.device_count()
mp.spawn(train_ddp, args=(world_size,), nprocs=world_size, join=True)
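除了 mp.spawn,也可以用 PyTorch 自带的 torchrun 启动器拉起多进程,它会自动设置 RANK、WORLD_SIZE、MASTER_ADDR 等环境变量(脚本名与 IP 仅为示意):

Text Only
# 单机8卡
torchrun --nproc_per_node=8 train_ddp.py

# 多机训练:2台机器,每台8卡(在第二台机器上将 node_rank 改为 1)
torchrun --nnodes=2 --node_rank=0 --nproc_per_node=8 \
    --master_addr=192.168.1.1 --master_port=29500 train_ddp.py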
2.2 模型并行 (Model Parallelism)¶
Python
import torch
import torch.nn as nn

class ModelParallelModel(nn.Module):
"""
模型并行示例
将模型不同层放在不同GPU上
"""
def __init__(self):
        super().__init__()  # 调用父类nn.Module的构造函数
# 第一层在GPU 0
self.layer1 = nn.Linear(1000, 1000).to('cuda:0')
# 第二层在GPU 1
self.layer2 = nn.Linear(1000, 1000).to('cuda:1')
# 第三层在GPU 2
self.layer3 = nn.Linear(1000, 1000).to('cuda:2')
def forward(self, x):
# 输入在GPU 0
x = self.layer1(x.to('cuda:0'))
# 输出传到GPU 1
x = x.to('cuda:1')
x = self.layer2(x)
# 输出传到GPU 2
x = x.to('cuda:2')
x = self.layer3(x)
return x
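注意:这种朴素的模型并行在任意时刻只有一块GPU在计算,其余GPU处于空闲;下一节的流水线并行通过把batch切成micro-batch来填满这些空闲时间。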
2.3 流水线并行 (Pipeline Parallelism)¶
Python
import torch
import torch.nn as nn
# 注:早期PyTorch提供的 torch.distributed.pipeline.sync.Pipe 已被弃用,
# 生产环境中的流水线并行通常使用 DeepSpeed pipeline engine 或 torch.distributed.pipelining

class PipelineParallelModel(nn.Module):
"""
流水线并行
将模型分成多个阶段,每个GPU处理一个阶段
"""
def __init__(self):
super().__init__()
# 将模型分成4个阶段
self.stage1 = nn.Sequential(
nn.Linear(1000, 1000),
nn.ReLU(),
).to('cuda:0')
self.stage2 = nn.Sequential(
nn.Linear(1000, 1000),
nn.ReLU(),
).to('cuda:1')
self.stage3 = nn.Sequential(
nn.Linear(1000, 1000),
nn.ReLU(),
).to('cuda:2')
self.stage4 = nn.Sequential(
nn.Linear(1000, 1000),
nn.ReLU(),
).to('cuda:3')
def forward(self, x):
        # 依次经过各个stage,并在stage之间把数据搬运到对应GPU
        # (此处是朴素的串行执行;真正的流水线并行需把batch切成micro-batch,见本节末尾的示意)
        x = self.stage1(x.to('cuda:0'))
        x = self.stage2(x.to('cuda:1'))
        x = self.stage3(x.to('cuda:2'))
        x = self.stage4(x.to('cuda:3'))
return x
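下面是一个GPipe风格micro-batch调度的简化示意:把大batch切成若干micro-batch依次送入上面的流水线模型。实际框架(DeepSpeed pipeline engine、torch.distributed.pipelining等)还会处理反向传播调度、气泡优化与通信重叠,这里仅演示思路:

Python
import torch

def pipelined_forward(model, x, num_microbatches=4):
    """
    GPipe风格前向示意:将batch按第0维切成micro-batch依次送入各stage。
    由于CUDA kernel异步提交,不同micro-batch在不同GPU上的计算可以部分重叠。
    """
    outputs = []
    for micro_batch in x.chunk(num_microbatches, dim=0):
        outputs.append(model(micro_batch))  # 依次流经stage1~4(跨4块GPU)
    return torch.cat(outputs, dim=0)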
DeepSpeed详解¶
3.1 DeepSpeed ZeRO优化器¶
以下是一个典型的 deepspeed_config.json 配置(ZeRO Stage 2 + FP16 + 优化器CPU offload):

JSON
{
"train_batch_size": 32,
"train_micro_batch_size_per_gpu": 4,
"gradient_accumulation_steps": 2,
"optimizer": {
"type": "AdamW",
"params": {
"lr": 1e-4,
"betas": [0.9, 0.999],
"eps": 1e-8,
"weight_decay": 0.01
}
},
"scheduler": {
"type": "WarmupLR",
"params": {
"warmup_min_lr": 0,
"warmup_max_lr": 1e-4,
"warmup_num_steps": 1000
}
},
"fp16": {
"enabled": true,
"loss_scale": 0,
"loss_scale_window": 1000,
"initial_scale_power": 16,
"hysteresis": 2,
"min_loss_scale": 1
},
"zero_optimization": {
"stage": 2,
"offload_optimizer": {
"device": "cpu",
"pin_memory": true
},
"allgather_partitions": true,
"allgather_bucket_size": 2e8,
"overlap_comm": true,
"reduce_scatter": true,
"reduce_bucket_size": 2e8,
"contiguous_gradients": true
}
}
Python
import json
import deepspeed
class DeepSpeedTrainer:
"""
DeepSpeed训练器
"""
def __init__(self, model, config_path):
# 加载配置
with open(config_path, 'r') as f: # with自动管理文件关闭
ds_config = json.load(f)
# 初始化DeepSpeed
model_engine, optimizer, _, _ = deepspeed.initialize(
model=model,
model_parameters=model.parameters(),
config=ds_config
)
self.model = model_engine
self.optimizer = optimizer
def train_step(self, batch):
"""
DeepSpeed训练步骤
"""
# 前向传播
outputs = self.model(**batch)
loss = outputs.loss
# 反向传播(DeepSpeed自动处理梯度累积和混合精度)
self.model.backward(loss)
# 更新参数
self.model.step()
return loss.item()
# ZeRO Stage对比
"""
ZeRO Stage 0: 标准DDP,无优化
ZeRO Stage 1: 优化器状态分片(节省4x内存)
ZeRO Stage 2: 优化器状态 + 梯度分片(节省8x内存)
ZeRO Stage 3: 优化器状态 + 梯度 + 参数分片(与数据并行度线性相关)
例如8卡训练:
- Stage 0: 每卡存完整模型
- Stage 1: 每卡存1/8优化器状态
- Stage 2: 每卡存1/8优化器状态 + 1/8梯度
- Stage 3: 每卡存1/8完整状态
"""
Megatron-LM¶
4.1 张量并行 (Tensor Parallelism)¶
Python
# Megatron张量并行示例
import torch
import torch.nn as nn
import torch.nn.functional as F
from megatron.core import tensor_parallel
class TensorParallelLinear(nn.Module):
"""
张量并行线性层
    将权重矩阵切分到多个GPU;本例按列(输出维度)切分,对应Megatron中的Column-Parallel Linear
"""
def __init__(self, input_size, output_size, world_size):
super().__init__()
# 每个GPU负责一部分输出维度
self.output_size_per_partition = output_size // world_size
# 在当前GPU创建部分权重
self.weight = nn.Parameter(
torch.empty(self.output_size_per_partition, input_size)
)
self.bias = nn.Parameter(torch.empty(self.output_size_per_partition))
def forward(self, input):
# 每个GPU计算部分输出
output_parallel = F.linear(input, self.weight, self.bias)
# All-Gather聚合所有GPU的输出
output = tensor_parallel.gather_from_tensor_model_parallel_region(
output_parallel
)
return output
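上面的示例是列并行(按输出维度切分)。Megatron中与之配套的是行并行(按输入维度切分):两者串联(列并行→行并行)时中间不需要任何gather,只在行并行的输出处做一次All-Reduce。下面是一个不依赖Megatron内部封装、直接用torch.distributed写的行并行示意(仅演示前向计算;Megatron实际通过自定义autograd Function使通信可反向传播):

Python
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist

class RowParallelLinear(nn.Module):
    """
    行并行线性层示意:权重按输入维度切分,每卡对自己的输入分片计算部分和,
    最后用All-Reduce把各卡的部分和相加,得到完整输出。
    """
    def __init__(self, input_size, output_size, world_size):
        super().__init__()
        self.input_size_per_partition = input_size // world_size
        self.weight = nn.Parameter(
            torch.empty(output_size, self.input_size_per_partition)
        )
        self.bias = nn.Parameter(torch.zeros(output_size))

    def forward(self, input_parallel):
        # input_parallel: [..., input_size // world_size],通常是上一层列并行的输出
        output_parallel = F.linear(input_parallel, self.weight)
        # 各卡部分和相加(仅前向示意,训练时需使用可反向传播的通信封装)
        dist.all_reduce(output_parallel, op=dist.ReduceOp.SUM)
        return output_parallel + self.bias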
4.2 序列并行 (Sequence Parallelism)¶
Python
import torch
import torch.nn as nn
import torch.distributed as dist

class SequenceParallelLayer(nn.Module):
"""
序列并行
将序列维度分割到多个GPU
适用于长序列训练
"""
def __init__(self, d_model, world_size):
super().__init__()
self.world_size = world_size
self.d_model = d_model
def forward(self, x):
        # x: [batch, seq_len, d_model],假设seq_len能被world_size整除
        batch_size, seq_len, _ = x.shape
        # 按序列维度切分,当前rank只保留属于自己的片段
        seq_per_partition = seq_len // self.world_size
        rank = dist.get_rank()
        start = rank * seq_per_partition
        x_parallel = x[:, start:start + seq_per_partition, :]
        # 各GPU对自己的序列片段独立计算
        # (self.compute为占位,代表LayerNorm/Dropout等逐token操作)
        output_parallel = self.compute(x_parallel)
        # All-Gather沿序列维度聚合各rank的结果(仅前向示意)
        gathered = [torch.empty_like(output_parallel) for _ in range(self.world_size)]
        dist.all_gather(gathered, output_parallel)
        output = torch.cat(gathered, dim=1)
        return output
训练优化技巧¶
5.1 梯度累积¶
Python
class GradientAccumulationTrainer:
"""
梯度累积训练器
用小batch模拟大batch训练
"""
def __init__(self, model, optimizer, accumulation_steps=4):
self.model = model
self.optimizer = optimizer
self.accumulation_steps = accumulation_steps
self.step_count = 0
def train_step(self, batch):
"""
训练步骤
"""
# 前向传播
outputs = self.model(**batch)
loss = outputs.loss / self.accumulation_steps # 缩放损失
# 反向传播
loss.backward()
self.step_count += 1
# 每accumulation_steps更新一次参数
if self.step_count % self.accumulation_steps == 0:
self.optimizer.step()
self.optimizer.zero_grad()
return loss.item() * self.accumulation_steps
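例如:每卡micro-batch为4、累积8步、8卡数据并行时,有效全局batch大小为 4 × 8 × 8 = 256。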
5.2 梯度裁剪¶
Python
def clip_gradients(parameters, max_norm=1.0, norm_type=2.0):
"""
梯度裁剪
防止梯度爆炸
"""
    # 就地裁剪梯度,并返回裁剪前的总梯度范数(可用于训练监控)
total_norm = torch.nn.utils.clip_grad_norm_(
parameters,
max_norm=max_norm,
norm_type=norm_type
)
return total_norm
5.3 学习率调度¶
Python
from transformers import get_cosine_schedule_with_warmup
def create_scheduler(optimizer, num_warmup_steps, num_training_steps):
"""
创建学习率调度器
"""
scheduler = get_cosine_schedule_with_warmup(
optimizer,
num_warmup_steps=num_warmup_steps,
num_training_steps=num_training_steps,
num_cycles=0.5 # 半个余弦周期
)
return scheduler
# 学习率调度可视化
import matplotlib.pyplot as plt
def visualize_lr_schedule():
"""
可视化学习率调度
"""
model = torch.nn.Linear(10, 10)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
num_warmup = 1000
num_total = 10000
scheduler = get_cosine_schedule_with_warmup(
optimizer, num_warmup, num_total
)
lrs = []
for step in range(num_total):
lrs.append(optimizer.param_groups[0]['lr'])
scheduler.step()
plt.plot(lrs)
plt.xlabel('Step')
plt.ylabel('Learning Rate')
plt.title('Learning Rate Schedule')
plt.axvline(x=num_warmup, color='r', linestyle='--', label='Warmup End')
plt.legend()
plt.show()
故障恢复与检查点¶
6.1 检查点保存与加载¶
Python
import torch
from pathlib import Path

class CheckpointManager:
"""
检查点管理器
"""
def __init__(self, checkpoint_dir):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
def save_checkpoint(self, model, optimizer, scheduler, step, loss):
"""
保存检查点
"""
checkpoint = {
'step': step,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict() if scheduler else None,
'loss': loss
}
checkpoint_path = self.checkpoint_dir / f'checkpoint_step_{step}.pt'
torch.save(checkpoint, checkpoint_path)
# 保存最新检查点的符号链接
latest_path = self.checkpoint_dir / 'checkpoint_latest.pt'
if latest_path.exists():
latest_path.unlink()
latest_path.symlink_to(checkpoint_path.name)
print(f"Checkpoint saved: {checkpoint_path}")
def load_checkpoint(self, model, optimizer, scheduler=None, checkpoint_path=None):
"""
加载检查点
"""
if checkpoint_path is None:
checkpoint_path = self.checkpoint_dir / 'checkpoint_latest.pt'
checkpoint = torch.load(checkpoint_path, map_location='cpu', weights_only=True)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
if scheduler and checkpoint['scheduler_state_dict']:
scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
step = checkpoint['step']
loss = checkpoint['loss']
print(f"Checkpoint loaded: {checkpoint_path}, step={step}, loss={loss}")
return step, loss
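一个典型的使用方式如下(save_every、num_training_steps、trainer 等变量为示意,沿用前文的训练器与数据迭代器):

Python
# 使用示意:每隔save_every步保存一次,训练开始时尝试从最新检查点恢复
ckpt_mgr = CheckpointManager('./checkpoints')
save_every = 1000  # 示意值

start_step = 0
if (ckpt_mgr.checkpoint_dir / 'checkpoint_latest.pt').exists():
    start_step, _ = ckpt_mgr.load_checkpoint(model, optimizer, scheduler)

for step in range(start_step, num_training_steps):
    loss = trainer.train_step(next(data_iter))
    if (step + 1) % save_every == 0:
        ckpt_mgr.save_checkpoint(model, optimizer, scheduler, step + 1, loss)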
6.2 DeepSpeed检查点¶
Python
# DeepSpeed引擎内置分布式检查点(ZeRO分片状态按rank保存,所有rank都需调用)
# 保存
model_engine.save_checkpoint(save_dir, tag='step_1000')
# 加载
model_engine.load_checkpoint(load_dir, tag='step_1000')
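save_checkpoint 还支持通过 client_state 附带自定义状态(如全局step),load_checkpoint 会把它原样返回:

Python
# 保存时附带自定义状态
model_engine.save_checkpoint(save_dir, tag='step_1000',
                             client_state={'global_step': 1000})
# 加载时取回
load_path, client_state = model_engine.load_checkpoint(load_dir, tag='step_1000')
global_step = client_state['global_step']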
总结¶
训练基础设施选择指南¶
Text Only
单卡训练:
├── 小模型 (<1B): 直接使用PyTorch
└── 大模型 (>1B): DeepSpeed ZeRO-2/3 + CPU offload(单卡场景主要靠offload节省显存)
多卡训练 (单机多卡):
├── 数据并行: DDP
├── 模型并行: DeepSpeed / Megatron
└── 混合并行: DeepSpeed (ZeRO + Pipeline)
大规模训练 (多机多卡):
├── 通用方案: DeepSpeed
├── 极致性能: Megatron-LM + DeepSpeed
└── 云环境: FSDP (PyTorch原生)
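上面提到的 FSDP(Fully Sharded Data Parallel)是 PyTorch 原生的参数/梯度/优化器状态分片方案,效果上对应 ZeRO-3。一个最小的使用示意如下(假设分布式环境已初始化,MyModel 为占位模型):

Python
import functools
import torch
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy

# 假设已通过 dist.init_process_group('nccl') 初始化分布式环境
model = MyModel().cuda()
model = FSDP(
    model,
    auto_wrap_policy=functools.partial(
        size_based_auto_wrap_policy, min_num_params=int(1e8)  # 按参数量自动切分子模块
    ),
)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
# 之后的训练循环与DDP基本一致:forward -> loss.backward() -> optimizer.step()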
关键要点¶
- 混合精度: 优先使用BF16;若用FP16需配合GradScaler梯度缩放
- 分布式: 根据模型大小选择并行策略
- DeepSpeed: 大模型训练的首选框架
- 检查点: 定期保存,确保可恢复
- 监控: 跟踪损失、学习率、梯度范数
下一步:03-推理服务部署 - 学习vLLM、TGI等推理框架的部署与优化。
最后更新日期:2026-02-12 适用版本:LLM学习教程 v2026