01 - PyTorch基础¶
学习时间: 12-16小时 重要性: ⭐⭐⭐⭐⭐ 深度学习框架的基石
🎯 学习目标¶
- 深入理解张量操作和内存管理
- 掌握自动微分机制
- 熟练使用Dataset和DataLoader
- 构建复杂的神经网络
- 实现完整的训练和推理流程
- 掌握GPU加速和混合精度训练
- 了解模型部署基础
📚 内容概览¶
1. 张量基础¶
1.1 张量创建¶
Python
import torch
import numpy as np
# 从列表创建
data = [[1, 2], [3, 4]]
x = torch.tensor(data)
print(x)
# tensor([[1, 2],
# [3, 4]])
# 创建特定形状的张量
zeros = torch.zeros(3, 4) # 3x4零矩阵
ones = torch.ones(2, 3, 4) # 2x3x4全1张量
rand = torch.rand(3, 3) # 3x3均匀分布随机数
randn = torch.randn(3, 3) # 3x3标准正态分布
# 创建序列
arange = torch.arange(0, 10, 2) # [0, 2, 4, 6, 8]
linspace = torch.linspace(0, 1, 5) # [0.0, 0.25, 0.5, 0.75, 1.0]
# 创建与现有张量相同形状
x_like = torch.zeros_like(x) # 与x形状相同的全0张量
x_empty = torch.empty_like(x) # 未初始化的张量
1.2 张量属性¶
Python
x = torch.randn(3, 4, 5)
print(x.shape) # torch.Size([3, 4, 5])
print(x.dtype) # torch.float32
print(x.device) # cpu / cuda:0
print(x.ndim) # 3 (维度数)
print(x.numel()) # 60 (元素总数)
print(x.element_size()) # 4 (每个元素字节数)
print(x.stride()) # (20, 5, 1) (步长)
print(x.is_contiguous()) # True (是否连续存储)
1.3 数据类型¶
Python
# 常见数据类型
torch.float32 # 默认浮点型
torch.float64 # 双精度浮点型
torch.int32 # 32位整数
torch.int64 # 64位整数(默认整数类型)
torch.bool # 布尔型
torch.complex64 # 复数
# 类型转换
x = torch.tensor([1, 2, 3])
x_float = x.float() # 转为float32
x_double = x.double() # 转为float64
x_int = x.int() # 转为int32
x_long = x.long() # 转为int64
# 在创建时指定类型
x = torch.tensor([1, 2, 3], dtype=torch.float32)
x = torch.zeros(3, 4, dtype=torch.int64)
1.4 张量操作¶
索引与切片¶
Python
x = torch.randn(4, 5, 6)
# 基本索引
print(x[0]) # 第一行
print(x[0, 1]) # 第0行第1列
print(x[0, 1, 2]) # 具体元素
# 切片
print(x[1:3]) # 第1-2行
print(x[:, 2:4]) # 所有行,第2-3列
print(x[::2]) # 每隔一行
print(x[..., -1]) # 最后一维的最后一个元素
# 高级索引
indices = torch.tensor([0, 2, 3])
print(x[indices]) # 取第0, 2, 3行
mask = x > 0
print(x[mask]) # 取所有正数元素
# 使用torch.index_select
torch.index_select(x, 0, indices) # 沿第0维选择
变形操作¶
Python
x = torch.randn(6, 4)
# view - 共享内存,要求连续存储
y = x.view(2, 12) # 变形为2x12
y = x.view(-1, 3) # -1自动计算
# reshape - 类似view,但可处理非连续张量
y = x.reshape(3, 8)
# transpose - 转置
y = x.t() # 2D转置
y = x.transpose(0, 1) # 交换维度0和1
# permute - 任意维度重排
x = torch.randn(2, 3, 4)
y = x.permute(2, 0, 1) # 变为(4, 2, 3)
# squeeze/unsqueeze - 删除/添加维度
x = torch.randn(1, 3, 1, 4)
y = x.squeeze() # 删除所有大小为1的维度 -> (3, 4)
y = x.squeeze(0) # 删除第0维 -> (3, 1, 4)
y = x.unsqueeze(0) # 在第0维添加维度 -> (1, 1, 3, 1, 4)
# expand/expand_as - 广播扩展
x = torch.randn(1, 3)
y = x.expand(4, 3) # 扩展为(4, 3),不复制数据
# flatten - 展平
x = torch.randn(2, 3, 4)
y = x.flatten() # 展平为(24,)
y = x.flatten(1) # 从第1维开始展平 -> (2, 12)
数学运算¶
Python
x = torch.randn(3, 4)
y = torch.randn(3, 4)
# 逐元素运算
z = x + y
z = x - y
z = x * y # 逐元素乘法(Hadamard积)
z = x / y
z = x ** 2
z = torch.pow(x, 2)
# 矩阵乘法
z = x @ y.t() # 矩阵乘法
z = torch.matmul(x, y.t())
z = torch.mm(x, y.t()) # 仅适用于2D
# 批量矩阵乘法
x = torch.randn(10, 3, 4)
y = torch.randn(10, 4, 5)
z = torch.bmm(x, y) # (10, 3, 5)
# 通用矩阵乘法(支持广播)
z = torch.matmul(x, y)
# 点积、叉积
a = torch.randn(3)
b = torch.randn(3)
dot = torch.dot(a, b)
cross = torch.cross(a, b)
# 求和、平均、极值
sum_all = x.sum()
sum_dim = x.sum(dim=1) # 沿第1维求和
mean = x.mean(dim=1, keepdim=True) # 保持维度
max_val, max_idx = x.max(dim=1) # 最大值和索引
min_val = x.min()
# 累积运算
cumsum = x.cumsum(dim=0) # 累积和
cumprod = x.cumprod(dim=0) # 累积积
# 比较运算
mask = x > 0
equal = torch.equal(x, y) # 完全相等
close = torch.allclose(x, y, atol=1e-5) # 近似相等
1.5 广播机制¶
Python
# 广播规则:从后向前比较维度,要么相等,要么其中一个为1
# 示例1: (3, 4) + (4,) -> (3, 4)
x = torch.randn(3, 4)
y = torch.randn(4)
z = x + y # y被广播为(3, 4)
# 示例2: (3, 1, 4) + (1, 5, 1) -> (3, 5, 4)
x = torch.randn(3, 1, 4)
y = torch.randn(1, 5, 1)
z = x + y
# 示例3: 使用unsqueeze进行广播
x = torch.randn(3, 4)
y = torch.randn(4)
z = x + y.unsqueeze(0) # y变为(1, 4),然后广播为(3, 4)
# 显式广播
x = torch.randn(3, 1)
y = torch.randn(1, 4)
z = torch.broadcast_tensors(x, y) # 返回一个元组,其中两个张量都被广播为(3, 4)
1.6 内存管理¶
Python
# 检查存储是否连续
x = torch.randn(4, 5)
print(x.is_contiguous()) # True
y = x.t()
print(y.is_contiguous()) # False
y = y.contiguous() # 变为连续存储
# 共享内存 vs 复制
x = torch.randn(3, 4)
y = x.view(2, 6) # 共享内存
y[0, 0] = 999
print(x[0, 0]) # 999,x也被修改
y = x.clone() # 深拷贝
y[0, 0] = 888
print(x[0, 0]) # 999,x不变
# 原地操作(节省内存)
x.add_(y) # x += y,原地操作
x.mul_(2) # x *= 2
x.zero_() # 清零
# 非原地操作
z = x.add(y) # 创建新张量
z = x + y # 同上
# detach - 切断梯度计算
x = torch.randn(3, requires_grad=True)
y = x.detach() # y不需要梯度
# 内存布局
x = torch.randn(2, 3, 4)
print(x.stride()) # (12, 4, 1) - 行优先(C风格)
# .t()得到列优先(Fortran风格)的视图;注意随后的.contiguous()会复制回行优先存储,结果并非真正的列优先张量
y = torch.randn(2, 3).t().contiguous()
1.7 与NumPy互操作¶
Python
import numpy as np
# NumPy数组转Tensor
np_array = np.array([[1, 2], [3, 4]])
tensor = torch.from_numpy(np_array) # 共享内存
# Tensor转NumPy数组
np_array = tensor.numpy() # 共享内存(CPU张量)
np_array = tensor.cpu().numpy() # 先移到CPU
# 注意:共享内存意味着修改一个会影响另一个
np_array[0, 0] = 999
print(tensor[0, 0]) # 999
# 不共享内存的转换
tensor = torch.tensor(np_array) # 复制数据
tensor = torch.as_tensor(np_array) # 尽可能共享内存
2. 自动微分¶
2.1 计算图基础¶
Python
# 创建需要梯度的张量
x = torch.tensor([2.0, 3.0], requires_grad=True)
# 构建计算图
y = x ** 2 # y = [4, 9]
z = y.sum() # z = 13
# 查看计算图
print(x.grad_fn) # None(叶子节点)
print(y.grad_fn) # <PowBackward0 object>
print(z.grad_fn) # <SumBackward0 object>
# 反向传播
z.backward()
# 查看梯度
print(x.grad) # tensor([4., 6.])
# dz/dx = d(x^2)/dx = 2x = [4, 6]
2.2 梯度累积与清零¶
Python
x = torch.tensor([2.0], requires_grad=True)
# 第一次前向传播
y = x ** 2
y.backward()
print(x.grad) # tensor([4.])
# 不清零梯度,再次反向传播
y = x ** 2
y.backward()
print(x.grad) # tensor([8.]) - 梯度累积了!
# 正确做法:每次反向传播前清零
x.grad.zero_()
y = x ** 2
y.backward()
print(x.grad) # tensor([4.])
2.3 非标量反向传播¶
Python
x = torch.tensor([2.0, 3.0], requires_grad=True)
y = x ** 2 # y = [4, 9]
# y是向量,需要传入gradient参数
v = torch.tensor([1.0, 1.0]) # 权重
y.backward(v)
print(x.grad) # tensor([4., 6.])
# 或者先求和再反向传播
x.grad.zero_()
z = y.sum()
z.backward()
print(x.grad) # tensor([4., 6.])
2.4 禁用梯度计算¶
Python
# 方式1: requires_grad_(False)
x = torch.tensor([2.0], requires_grad=True)
x.requires_grad_(False)
# 方式2: detach()
x = torch.tensor([2.0], requires_grad=True)
y = x.detach()
print(y.requires_grad) # False
# 方式3: torch.no_grad()上下文
x = torch.tensor([2.0], requires_grad=True)
with torch.no_grad():
y = x ** 2
print(y.requires_grad) # False
# 方式4: torch.inference_mode()(PyTorch 1.9+,推荐用于推理)
with torch.inference_mode():
y = x ** 2
# 用途:推理时节省内存,提高速度
model.eval()
with torch.no_grad():
predictions = model(data)
2.5 保留计算图¶
Python
x = torch.tensor([2.0], requires_grad=True)
# 默认情况下,backward()后计算图被释放
y = x ** 3
y.backward()
# y.backward() # 错误!计算图已释放
# 保留计算图
x.grad.zero_()
y = x ** 3
y.backward(retain_graph=True)
print(x.grad) # tensor([12.])
y.backward() # 可以再次反向传播
print(x.grad) # tensor([24.]) - 梯度累积
2.6 自定义自动微分函数¶
Python
class CustomFunction(torch.autograd.Function):
    """Custom autograd function computing y = x ** 2 with a hand-written backward."""

    @staticmethod
    def forward(ctx, input):
        # Stash the input; backward needs it to evaluate the derivative.
        ctx.save_for_backward(input)
        return input ** 2

    @staticmethod
    def backward(ctx, grad_output):
        (saved_input,) = ctx.saved_tensors
        # Chain rule: d(x^2)/dx = 2x, scaled by the incoming gradient.
        return grad_output * 2 * saved_input
# 使用自定义函数
x = torch.tensor([2.0, 3.0], requires_grad=True)
y = CustomFunction.apply(x)
z = y.sum()
z.backward()
print(x.grad) # tensor([4., 6.])
2.7 梯度检查点(内存优化)¶
Python
from torch.utils.checkpoint import checkpoint
# 对于很深的网络,使用梯度检查点节省内存
class LargeModel(torch.nn.Module):
    """Deep MLP demonstrating gradient checkpointing to trade compute for memory.

    Every 10th layer is wrapped in ``torch.utils.checkpoint.checkpoint`` so its
    activations are recomputed during backward instead of being stored.

    Args:
        num_layers: number of Linear layers (default 100, as in the original).
        width: feature width of every layer (default 1000, as in the original).
    """

    def __init__(self, num_layers=100, width=1000):
        super().__init__()  # initialize the nn.Module machinery before registering layers
        self.layers = torch.nn.ModuleList(
            [torch.nn.Linear(width, width) for _ in range(num_layers)]
        )

    def forward(self, x):
        """Apply all layers sequentially, ReLU after each one."""
        for i, layer in enumerate(self.layers):
            if i % 10 == 0:
                # Checkpointed layer: activations recomputed during backward.
                # Fix: pass use_reentrant=False explicitly — the recommended mode;
                # omitting it warns and the default is scheduled to change.
                x = checkpoint(layer, x, use_reentrant=False)
            else:
                x = layer(x)
            x = torch.relu(x)
        return x
3. 数据加载与处理¶
3.1 Dataset类¶
Python
from torch.utils.data import Dataset
import os
from PIL import Image
class CustomDataset(Dataset):
    """Image-classification dataset laid out on disk as data_dir/<label>/<file>.

    Directory names are integer class labels. (path, label) pairs are indexed
    eagerly in __init__, while the images themselves are loaded lazily.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        # One (filepath, int label) tuple per image file found on disk.
        self.samples = [
            (os.path.join(data_dir, label, name), int(label))
            for label in os.listdir(data_dir)
            for name in os.listdir(os.path.join(data_dir, label))
        ]

    def __len__(self):
        # __len__ / __getitem__ make the object support len() and indexing.
        return len(self.samples)

    def __getitem__(self, idx):
        path, label = self.samples[idx]
        # Load one sample on demand and force 3-channel RGB.
        image = Image.open(path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label
# 使用
from torchvision import transforms
transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
dataset = CustomDataset('data/train', transform=transform)
print(len(dataset))
image, label = dataset[0]
print(image.shape, label)
3.2 内置Dataset¶
Python
from torchvision import datasets
# MNIST
mnist = datasets.MNIST(
root='data',
train=True,
download=True,
transform=transforms.ToTensor()
)
# CIFAR-10
cifar = datasets.CIFAR10(
root='data',
train=True,
download=True,
transform=transforms.ToTensor()
)
# ImageFolder(按文件夹组织的图像数据)
image_dataset = datasets.ImageFolder(
root='data/train',
transform=transform
)
3.3 DataLoader¶
Python
from torch.utils.data import DataLoader
# 基本使用
dataloader = DataLoader(
dataset,
batch_size=32, # 批次大小
shuffle=True, # 是否打乱
num_workers=4, # 数据加载进程数
pin_memory=True, # 将数据固定在CUDA内存(加速GPU训练)
drop_last=True # 丢弃不完整的最后一个批次
)
# 迭代
for batch_idx, (images, labels) in enumerate(dataloader): # enumerate()同时获取索引和值
print(f"Batch {batch_idx}: {images.shape}, {labels.shape}")
# images: (32, 3, 224, 224)
# labels: (32,)
# 获取一个批次
dataiter = iter(dataloader)
images, labels = next(dataiter)
3.4 数据增强¶
Python
from torchvision import transforms
# 训练时的变换
train_transform = transforms.Compose([
transforms.RandomResizedCrop(224), # 随机裁剪
transforms.RandomHorizontalFlip(), # 随机水平翻转
transforms.RandomRotation(10), # 随机旋转
transforms.ColorJitter( # 颜色抖动
brightness=0.2,
contrast=0.2,
saturation=0.2
),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# 验证/测试时的变换(通常不做增强)
val_transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# 自定义变换
class RandomNoise(object):
    """Transform that adds Gaussian noise N(mean, std^2) to a tensor."""

    def __init__(self, mean=0., std=1.):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        # __call__ lets the instance be used like a function in a transforms pipeline.
        noise = torch.randn(tensor.size()) * self.std + self.mean
        return tensor + noise
train_transform.transforms.append(RandomNoise(std=0.01))
3.5 自定义Sampler¶
Python
from torch.utils.data import Sampler
import random
class BalancedSampler(Sampler):
    """Class-balanced sampler: draws the same number of indices per class.

    Each epoch yields ``num_samples_per_class`` indices for every class,
    chosen with replacement, then shuffled.

    Args:
        dataset: iterable of (sample, label) pairs; labels may be any hashable
            value (not necessarily contiguous integers starting at 0).
        num_samples_per_class: indices drawn per class per epoch.
    """

    def __init__(self, dataset, num_samples_per_class):
        self.num_samples_per_class = num_samples_per_class
        # Group dataset indices by label.
        self.class_indices = {}
        for idx, (_, label) in enumerate(dataset):
            self.class_indices.setdefault(label, []).append(idx)
        self.num_classes = len(self.class_indices)
        self.length = self.num_classes * num_samples_per_class

    def __iter__(self):
        indices = []
        for _ in range(self.num_samples_per_class):
            # Fix: iterate the actual label buckets. The original indexed
            # class_indices[class_idx] for class_idx in range(num_classes),
            # which raised KeyError for non-contiguous or non-integer labels.
            for bucket in self.class_indices.values():
                indices.append(random.choice(bucket))
        random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        return self.length
# 使用
from torch.utils.data import DataLoader
sampler = BalancedSampler(dataset, num_samples_per_class=100)
dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
3.6 处理大型数据集¶
Python
# 方式1: 使用LMDB/TFRecord等格式
# 方式2: 惰性加载
class LazyDataset(Dataset):
    """Dataset that defers loading: only file paths are kept in memory."""

    def __init__(self, file_list):
        # Store paths only; actual data is read on demand in __getitem__.
        self.file_list = file_list

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        # Load exactly one sample per request via the project-supplied loader.
        return load_data(self.file_list[idx])
# 方式3: 使用IterableDataset处理流式数据
from torch.utils.data import IterableDataset
class StreamDataset(IterableDataset):
    """IterableDataset wrapper for streaming sources of unknown length."""

    def __init__(self, data_source):
        self.data_source = data_source

    def __iter__(self):
        # Lazily process items one at a time as they arrive from the source.
        return (process(item) for item in self.data_source)
4. 神经网络构建¶
4.1 nn.Module基础¶
Python
import torch.nn as nn
import torch.nn.functional as F
class NeuralNetwork(nn.Module):
    """Two-layer MLP: Linear -> BatchNorm -> ReLU -> Dropout -> Linear."""

    def __init__(self, input_size, hidden_size, num_classes):
        super(NeuralNetwork, self).__init__()
        # Layers.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.bn1 = nn.BatchNorm1d(hidden_size)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(hidden_size, num_classes)
        # Weight initialization.
        self._initialize_weights()

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes)."""
        hidden = self.dropout(F.relu(self.bn1(self.fc1(x))))
        return self.fc2(hidden)

    def _initialize_weights(self):
        # Kaiming init for linear weights, zeros for biases.
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.kaiming_normal_(module.weight, mode='fan_out')
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
# 创建模型
model = NeuralNetwork(784, 256, 10)
# 查看模型结构
print(model)
# 查看可训练参数
# 生成器表达式:遍历所有参数张量,numel()返回元素个数,sum求总参数量
total_params = sum(p.numel() for p in model.parameters())
# 只统计requires_grad=True(需要梯度更新)的可训练参数量
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total params: {total_params:,}")
print(f"Trainable params: {trainable_params:,}")
4.2 常用层详解¶
卷积层¶
Python
# 2D卷积
conv = nn.Conv2d(
in_channels=3, # 输入通道数
out_channels=64, # 输出通道数
kernel_size=3, # 卷积核大小
stride=1, # 步长
padding=1, # 填充
dilation=1, # 空洞率
groups=1, # 分组数(1为普通卷积,in_channels为深度可分离卷积)
bias=True # 是否使用偏置
)
# 输入: (batch_size, 3, 32, 32)
x = torch.randn(4, 3, 32, 32)
out = conv(x)
print(out.shape) # (4, 64, 32, 32)
# 转置卷积(上采样)
conv_t = nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1)
out = conv_t(torch.randn(4, 64, 16, 16))
print(out.shape) # (4, 3, 32, 32)
归一化层¶
Python
# Batch Normalization
bn1d = nn.BatchNorm1d(num_features=100)
bn2d = nn.BatchNorm2d(num_features=64)
# Layer Normalization(对单个样本的所有特征归一化)
ln = nn.LayerNorm(normalized_shape=[64, 32, 32])
# Instance Normalization(常用于风格迁移)
in_norm = nn.InstanceNorm2d(num_features=64)
# Group Normalization(介于BN和LN之间)
gn = nn.GroupNorm(num_groups=8, num_channels=64)
激活函数¶
Python
# ReLU及其变体
relu = nn.ReLU() # max(0, x)
relu = nn.ReLU(inplace=True) # 原地操作,节省内存
leaky_relu = nn.LeakyReLU(0.1) # max(0.1x, x)
prelu = nn.PReLU() # 可学习的参数化ReLU
elu = nn.ELU() # 指数线性单元
# Sigmoid和Tanh
sigmoid = nn.Sigmoid()
tanh = nn.Tanh()
# Softmax(通常在forward中使用functional版本)
softmax = nn.Softmax(dim=1)
# GELU(Transformer常用)
gelu = nn.GELU()
# 使用方式
x = torch.randn(4, 100)
out = F.relu(self.fc1(x)) # 推荐在forward中使用F
out = self.relu(self.fc1(x)) # 或使用模块版本
池化层¶
Python
# 最大池化
max_pool = nn.MaxPool2d(kernel_size=2, stride=2)
avg_pool = nn.AvgPool2d(kernel_size=2, stride=2)
# 自适应池化(输出指定大小)
adaptive_pool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
# 全局平均池化
gap = nn.AdaptiveAvgPool2d(1)
# 使用
x = torch.randn(4, 64, 32, 32)
out = max_pool(x) # (4, 64, 16, 16)
out = gap(out) # (4, 64, 1, 1)
正则化层¶
Python
# Dropout
dropout = nn.Dropout(p=0.5) # 训练时随机置0,推理时关闭
dropout2d = nn.Dropout2d(p=0.5) # 对通道置0(用于卷积层后)
# DropPath(Stochastic Depth,用于ResNet等)
class DropPath(nn.Module):
    """Stochastic Depth: randomly zero whole samples, rescaling survivors."""

    def __init__(self, drop_prob=0.):
        super().__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        # Identity at inference time or when dropping is disabled.
        if not self.training or self.drop_prob == 0.:
            return x
        keep_prob = 1 - self.drop_prob
        # One Bernoulli draw per sample, broadcast over the remaining dims.
        mask_shape = (x.shape[0],) + (1,) * (x.ndim - 1)
        mask = torch.rand(mask_shape, dtype=x.dtype, device=x.device).add_(keep_prob).floor_()
        # Rescale kept samples so the expected activation stays unchanged.
        return x.div(keep_prob) * mask
4.3 构建CNN¶
Python
class CNN(nn.Module):
    """VGG-style CNN: three conv blocks followed by a small MLP classifier."""

    def __init__(self, num_classes=10):
        super(CNN, self).__init__()
        # Feature extractor. Each block is (Conv-BN-ReLU) x2 + MaxPool;
        # channels go 3 -> 64 -> 128 -> 256. Building the layer list in a
        # loop keeps the same flattened Sequential indices as writing them out.
        feature_layers = []
        for c_in, c_out in [(3, 64), (64, 128), (128, 256)]:
            feature_layers += [
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
                nn.Conv2d(c_out, c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ]
        self.features = nn.Sequential(*feature_layers)
        # Classifier head: global average pool then a 2-layer MLP.
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes),
        )

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for 3-channel input."""
        return self.classifier(self.features(x))
model = CNN(num_classes=10)
4.4 构建ResNet¶
Python
class BasicBlock(nn.Module):
    """ResNet basic residual block: two 3x3 convs plus an identity shortcut."""

    expansion = 1  # output channels = out_channels * expansion

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        # Main path: conv-bn-relu-conv-bn.
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        # Shortcut path: projected only when shape/stride changes.
        shortcut = x if self.downsample is None else self.downsample(x)
        out += shortcut
        return self.relu(out)
class ResNet(nn.Module):
    """Configurable ResNet backbone: stem + 4 residual stages + GAP + FC.

    Args:
        block: residual block class exposing an ``expansion`` class attribute.
        layers: number of blocks per stage, e.g. [2, 2, 2, 2] for ResNet-18.
        num_classes: size of the final classification layer.
    """
    def __init__(self, block, layers, num_classes=1000):
        super().__init__()
        self.in_channels = 64
        # Stem: 7x7 stride-2 conv + BN + ReLU + 3x3 stride-2 max pool.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # Four stages; stages 2-4 halve spatial size via stride=2.
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)
        self._initialize_weights()
    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks.

        NOTE: mutates self.in_channels so the next stage starts from this
        stage's output width — the layer1..layer4 call order matters.
        """
        downsample = None
        # Projection shortcut when spatial stride or channel width changes.
        if stride != 1 or self.in_channels != out_channels * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * block.expansion, 1, stride, bias=False),
                nn.BatchNorm2d(out_channels * block.expansion),
            )
        layers = []
        # Only the first block of a stage carries the stride/downsample.
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.Sequential(*layers)
    def forward(self, x):
        """Return logits of shape (batch, num_classes)."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x
    def _initialize_weights(self):
        # Kaiming init for convs; BN starts as identity (weight=1, bias=0).
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
def resnet18(num_classes=1000):
    """ResNet-18: BasicBlock with [2, 2, 2, 2] blocks per stage."""
    return ResNet(BasicBlock, [2, 2, 2, 2], num_classes)
4.5 参数初始化¶
Python
# 常用初始化方法
for m in model.modules():
if isinstance(m, nn.Conv2d):
# Kaiming初始化(推荐用于ReLU)
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
# 或Xavier初始化
# nn.init.xavier_normal_(m.weight)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.constant_(m.bias, 0)
# 使用apply进行初始化
def weights_init(m):
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight)
model.apply(weights_init)
5. 损失函数与优化器¶
5.1 损失函数¶
Python
import torch.nn as nn
# 分类损失
ce_loss = nn.CrossEntropyLoss() # 交叉熵(包含Softmax)
bce_loss = nn.BCELoss() # 二分类交叉熵
bce_with_logits = nn.BCEWithLogitsLoss() # 带logits的二分类交叉熵(更稳定)
nll_loss = nn.NLLLoss() # 负对数似然
# 回归损失
mse_loss = nn.MSELoss() # 均方误差
mae_loss = nn.L1Loss() # 平均绝对误差
smooth_l1 = nn.SmoothL1Loss() # 平滑L1(Huber损失)
# 其他损失
kl_div = nn.KLDivLoss() # KL散度
margin_ranking = nn.MarginRankingLoss() # 排序损失
triplet_margin = nn.TripletMarginLoss() # 三元组损失
# 使用示例
# 分类
outputs = model(images) # (batch_size, num_classes)
labels = torch.randint(0, 10, (32,)) # (batch_size,)
loss = ce_loss(outputs, labels)
# 回归
predictions = model(inputs) # (batch_size, 1)
targets = torch.randn(32, 1)
loss = mse_loss(predictions, targets)
# 带权重的损失
weights = torch.tensor([1.0, 2.0, 3.0]) # 类别权重
ce_loss_weighted = nn.CrossEntropyLoss(weight=weights)
# 自定义损失
class FocalLoss(nn.Module):
    """Focal loss for class-imbalanced classification (Lin et al., 2017).

    loss = alpha * (1 - p_t) ** gamma * CE, averaged over the batch, where
    p_t is the model's probability for the true class.
    """

    def __init__(self, alpha=1, gamma=2):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        # Per-sample CE so the focal modulation can be applied element-wise.
        self.ce = nn.CrossEntropyLoss(reduction='none')

    def forward(self, inputs, targets):
        per_sample_ce = self.ce(inputs, targets)
        # Recover p_t from the cross-entropy: CE = -log(p_t).
        pt = torch.exp(-per_sample_ce)
        modulated = self.alpha * (1 - pt) ** self.gamma * per_sample_ce
        return modulated.mean()
5.2 优化器¶
Python
import torch.optim as optim
# SGD
optimizer = optim.SGD(
model.parameters(),
lr=0.01, # 学习率
momentum=0.9, # 动量
weight_decay=1e-4, # L2正则化
nesterov=True # Nesterov动量
)
# Adam
optimizer = optim.Adam(
model.parameters(),
lr=1e-3,
betas=(0.9, 0.999), # 一阶和二阶矩估计的衰减率
eps=1e-8,
weight_decay=1e-4
)
# AdamW(推荐,解耦权重衰减)
optimizer = optim.AdamW(
model.parameters(),
lr=1e-3,
weight_decay=0.01 # 真正的权重衰减,不是L2正则
)
# RMSprop
optimizer = optim.RMSprop(
model.parameters(),
lr=1e-2,
alpha=0.99,
momentum=0.9
)
# 不同层使用不同学习率
optim.SGD([
{'params': model.features.parameters(), 'lr': 1e-4}, # 预训练层,小学习率
{'params': model.classifier.parameters(), 'lr': 1e-2} # 新层,大学习率
], momentum=0.9)
5.3 学习率调度¶
Python
from torch.optim.lr_scheduler import *
# StepLR - 每隔step_size个epoch,学习率乘以gamma
scheduler = StepLR(optimizer, step_size=30, gamma=0.1)
# MultiStepLR - 在指定里程碑降低学习率
scheduler = MultiStepLR(optimizer, milestones=[30, 80], gamma=0.1)
# ExponentialLR - 指数衰减
scheduler = ExponentialLR(optimizer, gamma=0.95)
# CosineAnnealingLR - 余弦退火
scheduler = CosineAnnealingLR(optimizer, T_max=100, eta_min=0)
# ReduceLROnPlateau - 验证集不下降时降低学习率
scheduler = ReduceLROnPlateau(
optimizer,
mode='min', # 'min'或'max'
factor=0.1, # 学习率缩放因子
patience=10, # 等待多少个epoch
verbose=True
)
# OneCycleLR - 先增后降(推荐用于快速收敛)
scheduler = OneCycleLR(
optimizer,
max_lr=0.1,
epochs=10,
steps_per_epoch=len(train_loader)
)
# Warmup + Cosine
from torch.optim.lr_scheduler import LambdaLR
def get_cosine_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps):
    """Create a LambdaLR with linear warmup followed by cosine decay.

    Args:
        optimizer: optimizer whose learning rate is scheduled.
        num_warmup_steps: steps to ramp the LR factor linearly from 0 to 1.
        num_training_steps: total number of scheduler steps.

    Returns:
        torch.optim.lr_scheduler.LambdaLR applying the warmup+cosine factor.
    """
    import math  # fix: `math` was used below but never imported in this file

    def lr_lambda(current_step):
        if current_step < num_warmup_steps:
            # Linear warmup: factor goes 0 -> 1 over num_warmup_steps.
            return float(current_step) / float(max(1, num_warmup_steps))
        # Cosine decay: factor goes 1 -> 0 over the remaining steps.
        progress = float(current_step - num_warmup_steps) / float(max(1, num_training_steps - num_warmup_steps))
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress)))

    return LambdaLR(optimizer, lr_lambda)
# 使用
for epoch in range(num_epochs):
for batch in train_loader:
# ... 训练代码 ...
optimizer.step()
scheduler.step() # 注意:OneCycleLR每个batch都要step()
# 其他调度器每个epoch step
# scheduler.step()
# ReduceLROnPlateau需要传入验证指标
# scheduler.step(val_loss)
5.4 梯度裁剪¶
Python
# 在反向传播后,优化器step前进行梯度裁剪
loss.backward()
# 按值裁剪
torch.nn.utils.clip_grad_value_(model.parameters(), clip_value=1.0)
# 按范数裁剪(更常用)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0, norm_type=2)
optimizer.step()
6. 完整训练流程¶
6.1 基础训练循环¶
Python
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one training epoch; return (mean batch loss, accuracy in %)."""
    model.train()
    total_loss = 0.0
    n_correct = 0
    n_seen = 0
    for step, (inputs, targets) in enumerate(train_loader):
        inputs = inputs.to(device)
        targets = targets.to(device)
        # Forward pass.
        logits = model(inputs)
        loss = criterion(logits, targets)
        # Backward pass + parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Running statistics; .item() detaches to a Python float.
        total_loss += loss.item()
        preds = logits.max(1)[1]  # argmax over the class dimension
        n_seen += targets.size(0)
        n_correct += preds.eq(targets).sum().item()
        if step % 100 == 0:
            print(f'Batch {step}/{len(train_loader)}, Loss: {loss.item():.4f}')
    return total_loss / len(train_loader), 100. * n_correct / n_seen
def validate(model, val_loader, criterion, device):
    """Evaluate on val_loader without gradients; return (mean loss, accuracy %)."""
    model.eval()
    total_loss = 0.0
    n_correct = 0
    n_seen = 0
    # No gradients needed for evaluation — saves memory and time.
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            logits = model(inputs)
            total_loss += criterion(logits, targets).item()
            preds = logits.max(1)[1]  # predicted class index per sample
            n_seen += targets.size(0)
            n_correct += preds.eq(targets).sum().item()
    return total_loss / len(val_loader), 100. * n_correct / n_seen
# 主训练循环
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)
best_acc = 0.0
for epoch in range(num_epochs):
train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
val_loss, val_acc = validate(model, val_loader, criterion, device)
scheduler.step()
print(f'Epoch {epoch+1}/{num_epochs}:')
print(f' Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
print(f' Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
# 保存最佳模型
if val_acc > best_acc:
best_acc = val_acc
torch.save(model.state_dict(), 'best_model.pth')
6.2 模型保存与加载¶
Python
# 保存模型(推荐方式:只保存参数)
torch.save(model.state_dict(), 'model.pth')
# 加载模型
model = MyModel()
model.load_state_dict(torch.load('model.pth', weights_only=True))
model.eval()
# 保存完整模型(包括结构)— 不推荐,存在安全风险
torch.save(model, 'model_complete.pth')
# 加载完整模型(需显式设置 weights_only=False)
model = torch.load('model_complete.pth', weights_only=False)
# 保存检查点(包含优化器状态、epoch等)— 推荐方式
checkpoint = {
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict(),
'best_acc': best_acc,
}
torch.save(checkpoint, 'checkpoint.pth')
# 恢复训练
# 注意:checkpoint包含优化器等复杂对象,需要weights_only=False
# 仅加载来自可信来源的checkpoint文件
checkpoint = torch.load('checkpoint.pth', weights_only=False)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
start_epoch = checkpoint['epoch'] + 1
best_acc = checkpoint['best_acc']
6.3 早停机制¶
Python
class EarlyStopping:
    """Stop training when validation loss fails to improve for `patience` calls.

    Saves the model's state_dict to `path` every time the loss improves;
    sets `early_stop` to True once `patience` non-improving calls accumulate.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience          # calls to wait after last improvement
        self.verbose = verbose
        self.delta = delta                # minimum change counted as improvement
        self.path = path
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')

    def __call__(self, val_loss, model):
        # Higher score == better, so negate the loss.
        score = -val_loss
        improved = self.best_score is None or score >= self.best_score + self.delta
        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0
        else:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, val_loss, model):
        """Persist model weights and remember the new best loss."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss
# 使用
early_stopping = EarlyStopping(patience=10, verbose=True)
for epoch in range(num_epochs):
train(...)
val_loss = validate(...)
early_stopping(val_loss, model)
if early_stopping.early_stop:
print("Early stopping triggered")
break
6.4 训练可视化¶
Python
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter('runs/experiment_1')
for epoch in range(num_epochs):
# ... 训练代码 ...
# 记录标量
writer.add_scalar('Loss/train', train_loss, epoch)
writer.add_scalar('Loss/val', val_loss, epoch)
writer.add_scalar('Accuracy/train', train_acc, epoch)
writer.add_scalar('Accuracy/val', val_acc, epoch)
writer.add_scalar('Learning_rate', optimizer.param_groups[0]['lr'], epoch)
# 记录直方图
for name, param in model.named_parameters():
writer.add_histogram(name, param, epoch)
# 记录图像
writer.add_image('Input', images[0], epoch)
# 记录模型图
if epoch == 0:
writer.add_graph(model, images)
writer.close()
# 启动TensorBoard
# tensorboard --logdir=runs
7. GPU与混合精度训练¶
7.1 CUDA基础¶
Python
# 检查CUDA是否可用
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# 查看GPU信息
if torch.cuda.is_available():
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
# 将模型和数据移到GPU
model = model.to(device)
inputs = inputs.to(device)
labels = labels.to(device)
# 或者一次性创建在GPU上
x = torch.randn(3, 4, device='cuda')
# 多GPU
if torch.cuda.device_count() > 1:
print(f"Using {torch.cuda.device_count()} GPUs")
model = nn.DataParallel(model)
7.2 混合精度训练¶
Python
from torch.amp import autocast, GradScaler
# 创建GradScaler
scaler = GradScaler('cuda')
for epoch in range(num_epochs):
for inputs, labels in train_loader:
inputs = inputs.cuda()
labels = labels.cuda()
optimizer.zero_grad()
# 使用autocast进行前向传播
with autocast(device_type='cuda'):
outputs = model(inputs)
loss = criterion(outputs, labels)
# 缩放损失并反向传播
scaler.scale(loss).backward()
# 梯度裁剪(可选)
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# 更新权重
scaler.step(optimizer)
scaler.update()
scheduler.step()
7.3 分布式训练¶
Python
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
def setup_distributed():
    """Initialize the default NCCL process group and bind this process to its GPU.

    Reads LOCAL_RANK from the environment (set by the torchrun launcher, per
    the launch command shown below) and selects the matching CUDA device.

    Returns:
        int: this process's local rank.
    """
    dist.init_process_group(backend='nccl')
    local_rank = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(local_rank)
    return local_rank
def main():
    """Distributed training entry point (one process per GPU under torchrun).

    NOTE(review): MyModel, dataset, batch_size and num_epochs are not defined
    in this snippet — this is skeleton/tutorial code; supply them before use.
    """
    local_rank = setup_distributed()
    # Create the model on this process's GPU and wrap it for gradient sync.
    model = MyModel().cuda(local_rank)
    model = DDP(model, device_ids=[local_rank])
    # DistributedSampler gives each process a disjoint shard of the dataset.
    train_sampler = DistributedSampler(dataset)
    train_loader = DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=train_sampler
    )
    # Training loop.
    for epoch in range(num_epochs):
        train_sampler.set_epoch(epoch)  # reshuffle the shards each epoch
        for inputs, labels in train_loader:
            inputs = inputs.cuda(local_rank)
            labels = labels.cuda(local_rank)
            # ... training code ...
if __name__ == '__main__':
main()
# 启动命令
# torchrun --nproc_per_node=4 train.py
8. 模型部署基础¶
8.1 模型导出¶
Python
# TorchScript(推荐用于Python环境)
model.eval()
example_input = torch.randn(1, 3, 224, 224)
# 使用tracing
traced_model = torch.jit.trace(model, example_input)
traced_model.save('model_traced.pt')
# 使用scripting(支持控制流)
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')
# 加载TorchScript模型
loaded_model = torch.jit.load('model_traced.pt')
output = loaded_model(input_tensor)
8.2 ONNX导出¶
Python
# 导出为ONNX格式
model.eval()
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
model,
dummy_input,
'model.onnx',
export_params=True,
opset_version=11,
do_constant_folding=True,
input_names=['input'],
output_names=['output'],
dynamic_axes={
'input': {0: 'batch_size'},
'output': {0: 'batch_size'}
}
)
# 验证ONNX模型
import onnx
onnx_model = onnx.load('model.onnx')
onnx.checker.check_model(onnx_model)
8.3 推理优化¶
Python
# 1. 使用eval模式
model.eval()
# 2. 禁用梯度计算
with torch.no_grad():
output = model(input)
# 3. 使用TorchScript
model = torch.jit.script(model)
# 4. 使用半精度(推理时)
model = model.half()
input = input.half()
# 5. 批处理推理
def batch_predict(model, inputs, batch_size=32):
    """Run inference in fixed-size chunks and concatenate the outputs."""
    outputs = []
    for start in range(0, len(inputs), batch_size):
        chunk = inputs[start:start + batch_size]
        # No gradients during inference.
        with torch.no_grad():
            outputs.append(model(chunk))
    return torch.cat(outputs)
📝 练习¶
练习1: 张量操作¶
Python
# 1. 创建一个形状为(4, 3, 32, 32)的随机张量,模拟一批图像
# 2. 将其变形为(4, 3, 1024),然后计算每个图像的均值和标准差
# 3. 提取每个图像的中心区域(16, 16)
# 4. 将结果归一化到[0, 1]范围
练习2: 自动微分¶
Python
# 实现一个简单的线性回归
# 1. 生成随机数据 y = 2x + 1 + noise
# 2. 使用PyTorch的自动微分训练模型
# 3. 绘制损失曲线
# 4. 比较手动计算梯度和autograd的结果
练习3: 自定义Dataset¶
练习4: 构建CNN¶
练习5: 完整训练流程¶
Python
# 1. 实现包含以下功能的训练脚本:
# - 命令行参数解析
# - 日志记录
# - 模型检查点保存
# - 早停
# - 学习率调度
# - TensorBoard可视化
# 2. 在MNIST或CIFAR-10上验证
🎯 自我检查¶
- 深入理解张量的内存布局和操作
- 掌握自动微分的原理和使用
- 能自定义Dataset和DataLoader
- 能构建复杂的神经网络架构
- 掌握完整的训练流程
- 会使用GPU和混合精度训练
- 了解模型部署基础
📚 延伸阅读¶
下一步: 02 - scikit-learn