跳转至

01 - PyTorch基础

学习时间: 12-16小时 重要性: ⭐⭐⭐⭐⭐ 深度学习框架的基石


🎯 学习目标

  • 深入理解张量操作和内存管理
  • 掌握自动微分机制
  • 熟练使用Dataset和DataLoader
  • 构建复杂的神经网络
  • 实现完整的训练和推理流程
  • 掌握GPU加速和混合精度训练
  • 了解模型部署基础

📚 内容概览

  1. 张量基础
  2. 自动微分
  3. 数据加载与处理
  4. 神经网络构建
  5. 损失函数与优化器
  6. 完整训练流程
  7. GPU与混合精度训练
  8. 模型部署基础

1. 张量基础

1.1 张量创建

Python
import torch
import numpy as np

# 从列表创建
data = [[1, 2], [3, 4]]
x = torch.tensor(data)
print(x)
# tensor([[1, 2],
#         [3, 4]])

# 创建特定形状的张量
zeros = torch.zeros(3, 4)          # 3x4零矩阵
ones = torch.ones(2, 3, 4)         # 2x3x4全1张量
rand = torch.rand(3, 3)            # 3x3均匀分布随机数
randn = torch.randn(3, 3)          # 3x3标准正态分布

# 创建序列
arange = torch.arange(0, 10, 2)    # [0, 2, 4, 6, 8]
linspace = torch.linspace(0, 1, 5) # [0.0, 0.25, 0.5, 0.75, 1.0]

# 创建与现有张量相同形状
x_like = torch.zeros_like(x)       # 与x形状相同的全0张量
x_empty = torch.empty_like(x)      # 未初始化的张量

1.2 张量属性

Python
x = torch.randn(3, 4, 5)

print(x.shape)      # torch.Size([3, 4, 5])
print(x.dtype)      # torch.float32
print(x.device)     # cpu / cuda:0
print(x.ndim)       # 3 (维度数)
print(x.numel())    # 60 (元素总数)
print(x.element_size())  # 4 (每个元素字节数)
print(x.stride())   # (20, 5, 1) (步长)
print(x.is_contiguous())  # True (是否连续存储)

1.3 数据类型

Python
# 常见数据类型
torch.float32   # 默认浮点型
torch.float64   # 双精度浮点型
torch.int32     # 32位整数
torch.int64     # 64位整数(默认整数类型)
torch.bool      # 布尔型
torch.complex64 # 复数

# 类型转换
x = torch.tensor([1, 2, 3])
x_float = x.float()      # 转为float32
x_double = x.double()    # 转为float64
x_int = x.int()          # 转为int32
x_long = x.long()        # 转为int64

# 在创建时指定类型
x = torch.tensor([1, 2, 3], dtype=torch.float32)
x = torch.zeros(3, 4, dtype=torch.int64)

1.4 张量操作

索引与切片

Python
x = torch.randn(4, 5, 6)

# 基本索引
print(x[0])         # 第一行
print(x[0, 1])      # 第0行第1列
print(x[0, 1, 2])   # 具体元素

# 切片
print(x[1:3])           # 第1-2行
print(x[:, 2:4])        # 所有行,第2-3列
print(x[::2])           # 每隔一行
print(x[..., -1])       # 最后一维的最后一个元素

# 高级索引
indices = torch.tensor([0, 2, 3])
print(x[indices])       # 取第0, 2, 3行

mask = x > 0
print(x[mask])          # 取所有正数元素

# 使用torch.index_select
torch.index_select(x, 0, indices)  # 沿第0维选择

变形操作

Python
x = torch.randn(6, 4)

# view - 共享内存,要求连续存储
y = x.view(2, 12)       # 变形为2x12
y = x.view(-1, 3)       # -1自动计算

# reshape - 类似view,但可处理非连续张量
y = x.reshape(3, 8)

# transpose - 转置
y = x.t()               # 2D转置
y = x.transpose(0, 1)   # 交换维度0和1

# permute - 任意维度重排
x = torch.randn(2, 3, 4)
y = x.permute(2, 0, 1)  # 变为(4, 2, 3)

# squeeze/unsqueeze - 删除/添加维度
x = torch.randn(1, 3, 1, 4)
y = x.squeeze()         # 删除所有大小为1的维度 -> (3, 4)
y = x.squeeze(0)        # 删除第0维 -> (3, 1, 4)
y = x.unsqueeze(0)      # 在第0维添加维度 -> (1, 1, 3, 1, 4)

# expand/expand_as - 广播扩展
x = torch.randn(1, 3)
y = x.expand(4, 3)      # 扩展为(4, 3),不复制数据

# flatten - 展平
x = torch.randn(2, 3, 4)
y = x.flatten()         # 展平为(24,)
y = x.flatten(1)        # 从第1维开始展平 -> (2, 12)

数学运算

Python
x = torch.randn(3, 4)
y = torch.randn(3, 4)

# 逐元素运算
z = x + y
z = x - y
z = x * y       # 逐元素乘法(Hadamard积)
z = x / y
z = x ** 2
z = torch.pow(x, 2)

# 矩阵乘法
z = x @ y.t()   # 矩阵乘法
z = torch.matmul(x, y.t())
z = torch.mm(x, y.t())  # 仅适用于2D

# 批量矩阵乘法
x = torch.randn(10, 3, 4)
y = torch.randn(10, 4, 5)
z = torch.bmm(x, y)     # (10, 3, 5)

# 通用矩阵乘法(支持广播)
z = torch.matmul(x, y)

# 点积、叉积
a = torch.randn(3)
b = torch.randn(3)
dot = torch.dot(a, b)
cross = torch.cross(a, b)

# 求和、平均、极值
sum_all = x.sum()
sum_dim = x.sum(dim=1)          # 沿第1维求和
mean = x.mean(dim=1, keepdim=True)  # 保持维度
max_val, max_idx = x.max(dim=1)     # 最大值和索引
min_val = x.min()

# 累积运算
cumsum = x.cumsum(dim=0)        # 累积和
cumprod = x.cumprod(dim=0)      # 累积积

# 比较运算
mask = x > 0
equal = torch.equal(x, y)       # 完全相等
close = torch.allclose(x, y, atol=1e-5)  # 近似相等

1.5 广播机制

Python
# 广播规则:从后向前比较维度,要么相等,要么其中一个为1

# 示例1: (3, 4) + (4,) -> (3, 4)
x = torch.randn(3, 4)
y = torch.randn(4)
z = x + y  # y被广播为(3, 4)

# 示例2: (3, 1, 4) + (1, 5, 1) -> (3, 5, 4)
x = torch.randn(3, 1, 4)
y = torch.randn(1, 5, 1)
z = x + y

# 示例3: 使用unsqueeze进行广播
x = torch.randn(3, 4)
y = torch.randn(4)
z = x + y.unsqueeze(0)  # y变为(1, 4),然后广播为(3, 4)

# 显式广播
x = torch.randn(3, 1)
y = torch.randn(1, 4)
z = torch.broadcast_tensors(x, y)  # 都变为(3, 4)

1.6 内存管理

Python
# 检查存储是否连续
x = torch.randn(4, 5)
print(x.is_contiguous())  # True

y = x.t()
print(y.is_contiguous())  # False
y = y.contiguous()        # 变为连续存储

# 共享内存 vs 复制
x = torch.randn(3, 4)
y = x.view(2, 6)        # 共享内存
y[0, 0] = 999
print(x[0, 0])          # 999,x也被修改

y = x.clone()           # 深拷贝
y[0, 0] = 888
print(x[0, 0])          # 999,x不变

# 原地操作(节省内存)
x.add_(y)       # x += y,原地操作
x.mul_(2)       # x *= 2
x.zero_()       # 清零

# 非原地操作
z = x.add(y)    # 创建新张量
z = x + y       # 同上

# detach - 切断梯度计算
x = torch.randn(3, requires_grad=True)
y = x.detach()  # y不需要梯度

# 内存布局
x = torch.randn(2, 3, 4)
print(x.stride())  # (12, 4, 1) - 行优先(C风格)

# 创建列优先张量
y = torch.randn(2, 3).t().contiguous()

1.7 与NumPy互操作

Python
import numpy as np

# NumPy数组转Tensor
np_array = np.array([[1, 2], [3, 4]])
tensor = torch.from_numpy(np_array)  # 共享内存

# Tensor转NumPy数组
np_array = tensor.numpy()  # 共享内存(CPU张量)
np_array = tensor.cpu().numpy()  # 先移到CPU

# 注意:共享内存意味着修改一个会影响另一个
np_array[0, 0] = 999
print(tensor[0, 0])  # 999

# 不共享内存的转换
tensor = torch.tensor(np_array)  # 复制数据
tensor = torch.as_tensor(np_array)  # 尽可能共享内存

2. 自动微分

2.1 计算图基础

Python
# 创建需要梯度的张量
x = torch.tensor([2.0, 3.0], requires_grad=True)

# 构建计算图
y = x ** 2        # y = [4, 9]
z = y.sum()       # z = 13

# 查看计算图
print(x.grad_fn)  # None(叶子节点)
print(y.grad_fn)  # <PowBackward0 object>
print(z.grad_fn)  # <SumBackward0 object>

# 反向传播
z.backward()

# 查看梯度
print(x.grad)     # tensor([4., 6.])
# dz/dx = d(x^2)/dx = 2x = [4, 6]

2.2 梯度累积与清零

Python
x = torch.tensor([2.0], requires_grad=True)

# 第一次前向传播
y = x ** 2
y.backward()
print(x.grad)  # tensor([4.])

# 不清零梯度,再次反向传播
y = x ** 2
y.backward()
print(x.grad)  # tensor([8.]) - 梯度累积了!

# 正确做法:每次反向传播前清零
x.grad.zero_()
y = x ** 2
y.backward()
print(x.grad)  # tensor([4.])

2.3 非标量反向传播

Python
x = torch.tensor([2.0, 3.0], requires_grad=True)
y = x ** 2  # y = [4, 9]

# y是向量,需要传入gradient参数
v = torch.tensor([1.0, 1.0])  # 权重
y.backward(v)
print(x.grad)  # tensor([4., 6.])

# 或者先求和再反向传播
x.grad.zero_()
z = y.sum()
z.backward()
print(x.grad)  # tensor([4., 6.])

2.4 禁用梯度计算

Python
# 方式1: requires_grad_(False)
x = torch.tensor([2.0], requires_grad=True)
x.requires_grad_(False)

# 方式2: detach()
x = torch.tensor([2.0], requires_grad=True)
y = x.detach()
print(y.requires_grad)  # False

# 方式3: torch.no_grad()上下文
x = torch.tensor([2.0], requires_grad=True)
with torch.no_grad():
    y = x ** 2
print(y.requires_grad)  # False

# 方式4: torch.inference_mode()(PyTorch 1.9+,推荐用于推理)
with torch.inference_mode():
    y = x ** 2

# 用途:推理时节省内存,提高速度
model.eval()
with torch.no_grad():
    predictions = model(data)

2.5 保留计算图

Python
x = torch.tensor([2.0], requires_grad=True)

# 默认情况下,backward()后计算图被释放
y = x ** 3
y.backward()
# y.backward()  # 错误!计算图已释放

# 保留计算图
x.grad.zero_()
y = x ** 3
y.backward(retain_graph=True)
print(x.grad)  # tensor([12.])

y.backward()   # 可以再次反向传播
print(x.grad)  # tensor([24.]) - 梯度累积

2.6 自定义自动微分函数

Python
class CustomFunction(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input):
        # 保存张量供反向传播使用
        ctx.save_for_backward(input)
        return input ** 2

    @staticmethod
    def backward(ctx, grad_output):
        # 获取保存的张量
        input, = ctx.saved_tensors
        # 计算梯度: d(x^2)/dx = 2x
        grad_input = grad_output * 2 * input
        return grad_input

# 使用自定义函数
x = torch.tensor([2.0, 3.0], requires_grad=True)
y = CustomFunction.apply(x)
z = y.sum()
z.backward()
print(x.grad)  # tensor([4., 6.])

2.7 梯度检查点(内存优化)

Python
from torch.utils.checkpoint import checkpoint

# 对于很深的网络,使用梯度检查点节省内存
class LargeModel(torch.nn.Module):
    def __init__(self):
        super().__init__()  # super()调用父类方法,常用于继承中的初始化
        self.layers = torch.nn.ModuleList([
            torch.nn.Linear(1000, 1000) for _ in range(100)
        ])

    def forward(self, x):
        for i, layer in enumerate(self.layers):
            # 每10层使用一个检查点
            if i % 10 == 0:
                x = checkpoint(layer, x)
            else:
                x = layer(x)
            x = torch.relu(x)
        return x

3. 数据加载与处理

3.1 Dataset类

Python
from torch.utils.data import Dataset
import os
from PIL import Image

class CustomDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.samples = []  # 存储(路径, 标签)列表

        # 加载数据索引
        for label in os.listdir(data_dir):
            label_dir = os.path.join(data_dir, label)
            for filename in os.listdir(label_dir):
                filepath = os.path.join(label_dir, filename)
                self.samples.append((filepath, int(label)))

    def __len__(self):  # __len__和__getitem__使对象支持len()和索引操作
        return len(self.samples)

    def __getitem__(self, idx):
        filepath, label = self.samples[idx]

        # 加载数据
        image = Image.open(filepath).convert('RGB')

        # 应用变换
        if self.transform:
            image = self.transform(image)

        return image, label

# 使用
from torchvision import transforms

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
])

dataset = CustomDataset('data/train', transform=transform)
print(len(dataset))
image, label = dataset[0]
print(image.shape, label)

3.2 内置Dataset

Python
from torchvision import datasets

# MNIST
mnist = datasets.MNIST(
    root='data',
    train=True,
    download=True,
    transform=transforms.ToTensor()
)

# CIFAR-10
cifar = datasets.CIFAR10(
    root='data',
    train=True,
    download=True,
    transform=transforms.ToTensor()
)

# ImageFolder(按文件夹组织的图像数据)
image_dataset = datasets.ImageFolder(
    root='data/train',
    transform=transform
)

3.3 DataLoader

Python
from torch.utils.data import DataLoader

# 基本使用
dataloader = DataLoader(
    dataset,
    batch_size=32,      # 批次大小
    shuffle=True,       # 是否打乱
    num_workers=4,      # 数据加载进程数
    pin_memory=True,    # 将数据固定在CUDA内存(加速GPU训练)
    drop_last=True      # 丢弃不完整的最后一个批次
)

# 迭代
for batch_idx, (images, labels) in enumerate(dataloader):  # enumerate()同时获取索引和值
    print(f"Batch {batch_idx}: {images.shape}, {labels.shape}")
    # images: (32, 3, 224, 224)
    # labels: (32,)

# 获取一个批次
dataiter = iter(dataloader)
images, labels = next(dataiter)

3.4 数据增强

Python
from torchvision import transforms

# 训练时的变换
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),      # 随机裁剪
    transforms.RandomHorizontalFlip(),       # 随机水平翻转
    transforms.RandomRotation(10),           # 随机旋转
    transforms.ColorJitter(                  # 颜色抖动
        brightness=0.2,
        contrast=0.2,
        saturation=0.2
    ),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
])

# 验证/测试时的变换(通常不做增强)
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
])

# 自定义变换
class RandomNoise(object):
    def __init__(self, mean=0., std=1.):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):  # 定义__call__使实例可像函数一样被调用,用于transforms管道
        return tensor + torch.randn(tensor.size()) * self.std + self.mean

train_transform.transforms.append(RandomNoise(std=0.01))

3.5 自定义Sampler

Python
from torch.utils.data import Sampler
import random

class BalancedSampler(Sampler):
    """类别平衡的采样器"""
    def __init__(self, dataset, num_samples_per_class):
        self.num_samples_per_class = num_samples_per_class

        # 按类别组织索引
        self.class_indices = {}
        for idx, (_, label) in enumerate(dataset):
            if label not in self.class_indices:
                self.class_indices[label] = []
            self.class_indices[label].append(idx)

        self.num_classes = len(self.class_indices)
        self.length = self.num_classes * num_samples_per_class

    def __iter__(self):
        indices = []
        for _ in range(self.num_samples_per_class):
            for class_idx in range(self.num_classes):
                idx = random.choice(self.class_indices[class_idx])
                indices.append(idx)
        random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        return self.length

# 使用
from torch.utils.data import DataLoader

sampler = BalancedSampler(dataset, num_samples_per_class=100)
dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)

3.6 处理大型数据集

Python
# 方式1: 使用LMDB/TFRecord等格式
# 方式2: 惰性加载

class LazyDataset(Dataset):
    """只在需要时加载数据"""
    def __init__(self, file_list):
        self.file_list = file_list

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        # 每次只加载一个样本
        filepath = self.file_list[idx]
        data = load_data(filepath)  # 自定义加载函数
        return data

# 方式3: 使用IterableDataset处理流式数据
from torch.utils.data import IterableDataset

class StreamDataset(IterableDataset):
    def __init__(self, data_source):
        self.data_source = data_source

    def __iter__(self):
        for data in self.data_source:
            yield process(data)

4. 神经网络构建

4.1 nn.Module基础

Python
import torch.nn as nn
import torch.nn.functional as F

class NeuralNetwork(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(NeuralNetwork, self).__init__()

        # 定义层
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.bn1 = nn.BatchNorm1d(hidden_size)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(hidden_size, num_classes)

        # 初始化权重
        self._initialize_weights()

    def forward(self, x):
        # 前向传播
        x = self.fc1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):  # isinstance()检查对象是否为指定类型的实例
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

# 创建模型
model = NeuralNetwork(784, 256, 10)

# 查看模型结构
print(model)

# 查看可训练参数
# 生成器表达式:遍历所有参数张量,numel()返回元素个数,sum求总参数量
total_params = sum(p.numel() for p in model.parameters())
# 只统计requires_grad=True(需要梯度更新)的可训练参数量
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total params: {total_params:,}")
print(f"Trainable params: {trainable_params:,}")

4.2 常用层详解

卷积层

Python
# 2D卷积
conv = nn.Conv2d(
    in_channels=3,      # 输入通道数
    out_channels=64,    # 输出通道数
    kernel_size=3,      # 卷积核大小
    stride=1,           # 步长
    padding=1,          # 填充
    dilation=1,         # 空洞率
    groups=1,           # 分组数(1为普通卷积,in_channels为深度可分离卷积)
    bias=True           # 是否使用偏置
)

# 输入: (batch_size, 3, 32, 32)
x = torch.randn(4, 3, 32, 32)
out = conv(x)
print(out.shape)  # (4, 64, 32, 32)

# 转置卷积(上采样)
conv_t = nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1)
out = conv_t(torch.randn(4, 64, 16, 16))
print(out.shape)  # (4, 3, 32, 32)

归一化层

Python
# Batch Normalization
bn1d = nn.BatchNorm1d(num_features=100)
bn2d = nn.BatchNorm2d(num_features=64)

# Layer Normalization(对单个样本的所有特征归一化)
ln = nn.LayerNorm(normalized_shape=[64, 32, 32])

# Instance Normalization(常用于风格迁移)
in_norm = nn.InstanceNorm2d(num_features=64)

# Group Normalization(介于BN和LN之间)
gn = nn.GroupNorm(num_groups=8, num_channels=64)

激活函数

Python
# ReLU及其变体
relu = nn.ReLU()                    # max(0, x)
relu = nn.ReLU(inplace=True)        # 原地操作,节省内存
leaky_relu = nn.LeakyReLU(0.1)      # max(0.1x, x)
prelu = nn.PReLU()                  # 可学习的参数化ReLU
elu = nn.ELU()                      # 指数线性单元

# Sigmoid和Tanh
sigmoid = nn.Sigmoid()
tanh = nn.Tanh()

# Softmax(通常在forward中使用functional版本)
softmax = nn.Softmax(dim=1)

# GELU(Transformer常用)
gelu = nn.GELU()

# 使用方式
x = torch.randn(4, 100)
out = F.relu(self.fc1(x))  # 推荐在forward中使用F
out = self.relu(self.fc1(x))  # 或使用模块版本

池化层

Python
# 最大池化
max_pool = nn.MaxPool2d(kernel_size=2, stride=2)
avg_pool = nn.AvgPool2d(kernel_size=2, stride=2)

# 自适应池化(输出指定大小)
adaptive_pool = nn.AdaptiveAvgPool2d(output_size=(1, 1))

# 全局平均池化
gap = nn.AdaptiveAvgPool2d(1)

# 使用
x = torch.randn(4, 64, 32, 32)
out = max_pool(x)  # (4, 64, 16, 16)
out = gap(out)     # (4, 64, 1, 1)

正则化层

Python
# Dropout
dropout = nn.Dropout(p=0.5)         # 训练时随机置0,推理时关闭
dropout2d = nn.Dropout2d(p=0.5)     # 对通道置0(用于卷积层后)

# DropPath(Stochastic Depth,用于ResNet等)
class DropPath(nn.Module):
    def __init__(self, drop_prob=0.):
        super().__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        if self.drop_prob == 0. or not self.training:
            return x
        keep_prob = 1 - self.drop_prob
        shape = (x.shape[0],) + (1,) * (x.ndim - 1)
        random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
        random_tensor.floor_()
        return x.div(keep_prob) * random_tensor

4.3 构建CNN

Python
class CNN(nn.Module):
    def __init__(self, num_classes=10):
        super(CNN, self).__init__()

        # 特征提取层
        self.features = nn.Sequential(
            # Block 1
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            # Block 2
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            # Block 3
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # 分类器
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

model = CNN(num_classes=10)

4.4 构建ResNet

Python
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1000):
        super().__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        self._initialize_weights()

    def _make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * block.expansion, 1, stride, bias=False),
                nn.BatchNorm2d(out_channels * block.expansion),
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * block.expansion

        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

def resnet18(num_classes=1000):
    return ResNet(BasicBlock, [2, 2, 2, 2], num_classes)

4.5 参数初始化

Python
# 常用初始化方法
for m in model.modules():
    if isinstance(m, nn.Conv2d):
        # Kaiming初始化(推荐用于ReLU)
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        # 或Xavier初始化
        # nn.init.xavier_normal_(m.weight)
    elif isinstance(m, nn.BatchNorm2d):
        nn.init.constant_(m.weight, 1)
        nn.init.constant_(m.bias, 0)
    elif isinstance(m, nn.Linear):
        nn.init.normal_(m.weight, 0, 0.01)
        nn.init.constant_(m.bias, 0)

# 使用apply进行初始化
def weights_init(m):
    if isinstance(m, nn.Conv2d):
        nn.init.kaiming_normal_(m.weight)

model.apply(weights_init)

5. 损失函数与优化器

5.1 损失函数

Python
import torch.nn as nn

# 分类损失
ce_loss = nn.CrossEntropyLoss()         # 交叉熵(包含Softmax)
bce_loss = nn.BCELoss()                 # 二分类交叉熵
bce_with_logits = nn.BCEWithLogitsLoss() # 带logits的二分类交叉熵(更稳定)
nll_loss = nn.NLLLoss()                 # 负对数似然

# 回归损失
mse_loss = nn.MSELoss()                 # 均方误差
mae_loss = nn.L1Loss()                  # 平均绝对误差
smooth_l1 = nn.SmoothL1Loss()           # 平滑L1(Huber损失)

# 其他损失
kl_div = nn.KLDivLoss()                 # KL散度
margin_ranking = nn.MarginRankingLoss() # 排序损失
triplet_margin = nn.TripletMarginLoss() # 三元组损失

# 使用示例
# 分类
outputs = model(images)                 # (batch_size, num_classes)
labels = torch.randint(0, 10, (32,))    # (batch_size,)
loss = ce_loss(outputs, labels)

# 回归
predictions = model(inputs)             # (batch_size, 1)
targets = torch.randn(32, 1)
loss = mse_loss(predictions, targets)

# 带权重的损失
weights = torch.tensor([1.0, 2.0, 3.0]) # 类别权重
ce_loss_weighted = nn.CrossEntropyLoss(weight=weights)

# 自定义损失
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.ce = nn.CrossEntropyLoss(reduction='none')

    def forward(self, inputs, targets):
        ce_loss = self.ce(inputs, targets)
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        return focal_loss.mean()

5.2 优化器

Python
import torch.optim as optim

# SGD
optimizer = optim.SGD(
    model.parameters(),
    lr=0.01,                # 学习率
    momentum=0.9,           # 动量
    weight_decay=1e-4,      # L2正则化
    nesterov=True           # Nesterov动量
)

# Adam
optimizer = optim.Adam(
    model.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),     # 一阶和二阶矩估计的衰减率
    eps=1e-8,
    weight_decay=1e-4
)

# AdamW(推荐,解耦权重衰减)
optimizer = optim.AdamW(
    model.parameters(),
    lr=1e-3,
    weight_decay=0.01       # 真正的权重衰减,不是L2正则
)

# RMSprop
optimizer = optim.RMSprop(
    model.parameters(),
    lr=1e-2,
    alpha=0.99,
    momentum=0.9
)

# 不同层使用不同学习率
optim.SGD([
    {'params': model.features.parameters(), 'lr': 1e-4},  # 预训练层,小学习率
    {'params': model.classifier.parameters(), 'lr': 1e-2}  # 新层,大学习率
], momentum=0.9)

5.3 学习率调度

Python
from torch.optim.lr_scheduler import *

# StepLR - 每隔step_size个epoch,学习率乘以gamma
scheduler = StepLR(optimizer, step_size=30, gamma=0.1)

# MultiStepLR - 在指定里程碑降低学习率
scheduler = MultiStepLR(optimizer, milestones=[30, 80], gamma=0.1)

# ExponentialLR - 指数衰减
scheduler = ExponentialLR(optimizer, gamma=0.95)

# CosineAnnealingLR - 余弦退火
scheduler = CosineAnnealingLR(optimizer, T_max=100, eta_min=0)

# ReduceLROnPlateau - 验证集不下降时降低学习率
scheduler = ReduceLROnPlateau(
    optimizer,
    mode='min',           # 'min'或'max'
    factor=0.1,           # 学习率缩放因子
    patience=10,          # 等待多少个epoch
    verbose=True
)

# OneCycleLR - 先增后降(推荐用于快速收敛)
scheduler = OneCycleLR(
    optimizer,
    max_lr=0.1,
    epochs=10,
    steps_per_epoch=len(train_loader)
)

# Warmup + Cosine
from torch.optim.lr_scheduler import LambdaLR

def get_cosine_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps):
    def lr_lambda(current_step):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        progress = float(current_step - num_warmup_steps) / float(max(1, num_training_steps - num_warmup_steps))
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress)))

    return LambdaLR(optimizer, lr_lambda)

# 使用
for epoch in range(num_epochs):
    for batch in train_loader:
        # ... 训练代码 ...
        optimizer.step()
        scheduler.step()  # 注意:OneCycleLR每个batch都要step()

    # 其他调度器每个epoch step
    # scheduler.step()

    # ReduceLROnPlateau需要传入验证指标
    # scheduler.step(val_loss)

5.4 梯度裁剪

Python
# 在反向传播后,优化器step前进行梯度裁剪
loss.backward()

# 按值裁剪
torch.nn.utils.clip_grad_value_(model.parameters(), clip_value=1.0)

# 按范数裁剪(更常用)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0, norm_type=2)

optimizer.step()

6. 完整训练流程

6.1 基础训练循环

Python
def train_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)

        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # 统计
        running_loss += loss.item()
        _, predicted = outputs.max(1)  # max(1)按维度1取最大值,_忽略最大值,predicted取索引(预测类别)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()  # 逐元素比较→布尔值求和(True=1)→.item()转为Python int

        if batch_idx % 100 == 0:
            print(f'Batch {batch_idx}/{len(train_loader)}, Loss: {loss.item():.4f}')

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

def validate(model, val_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

    epoch_loss = running_loss / len(val_loader)
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

# 主训练循环
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)

best_acc = 0.0
for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate(model, val_loader, criterion, device)
    scheduler.step()

    print(f'Epoch {epoch+1}/{num_epochs}:')
    print(f'  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
    print(f'  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

    # 保存最佳模型
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), 'best_model.pth')

6.2 模型保存与加载

Python
# 保存模型(推荐方式:只保存参数)
torch.save(model.state_dict(), 'model.pth')

# 加载模型
model = MyModel()
model.load_state_dict(torch.load('model.pth', weights_only=True))
model.eval()

# 保存完整模型(包括结构)— 不推荐,存在安全风险
torch.save(model, 'model_complete.pth')

# 加载完整模型(需显式设置 weights_only=False)
model = torch.load('model_complete.pth', weights_only=False)

# 保存检查点(包含优化器状态、epoch等)— 推荐方式
checkpoint = {
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'scheduler_state_dict': scheduler.state_dict(),
    'best_acc': best_acc,
}
torch.save(checkpoint, 'checkpoint.pth')

# 恢复训练
# 注意:checkpoint包含优化器等复杂对象,需要weights_only=False
# 仅加载来自可信来源的checkpoint文件
checkpoint = torch.load('checkpoint.pth', weights_only=False)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
start_epoch = checkpoint['epoch'] + 1
best_acc = checkpoint['best_acc']

6.3 早停机制

Python
class EarlyStopping:
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.path = path
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')

    def __call__(self, val_loss, model):
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

# 使用
early_stopping = EarlyStopping(patience=10, verbose=True)

for epoch in range(num_epochs):
    train(...)
    val_loss = validate(...)

    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping triggered")
        break

6.4 训练可视化

Python
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter('runs/experiment_1')

for epoch in range(num_epochs):
    # ... 训练代码 ...

    # 记录标量
    writer.add_scalar('Loss/train', train_loss, epoch)
    writer.add_scalar('Loss/val', val_loss, epoch)
    writer.add_scalar('Accuracy/train', train_acc, epoch)
    writer.add_scalar('Accuracy/val', val_acc, epoch)
    writer.add_scalar('Learning_rate', optimizer.param_groups[0]['lr'], epoch)

    # 记录直方图
    for name, param in model.named_parameters():
        writer.add_histogram(name, param, epoch)

    # 记录图像
    writer.add_image('Input', images[0], epoch)

    # 记录模型图
    if epoch == 0:
        writer.add_graph(model, images)

writer.close()

# 启动TensorBoard
# tensorboard --logdir=runs

7. GPU与混合精度训练

7.1 CUDA基础

Python
# 检查CUDA是否可用
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# 查看GPU信息
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

# 将模型和数据移到GPU
model = model.to(device)
inputs = inputs.to(device)
labels = labels.to(device)

# 或者一次性创建在GPU上
x = torch.randn(3, 4, device='cuda')

# 多GPU
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")
    model = nn.DataParallel(model)

7.2 混合精度训练

Python
from torch.amp import autocast, GradScaler

# 创建GradScaler
scaler = GradScaler('cuda')

for epoch in range(num_epochs):
    for inputs, labels in train_loader:
        inputs = inputs.cuda()
        labels = labels.cuda()

        optimizer.zero_grad()

        # 使用autocast进行前向传播
        with autocast(device_type='cuda'):
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # 缩放损失并反向传播
        scaler.scale(loss).backward()

        # 梯度裁剪(可选)
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # 更新权重
        scaler.step(optimizer)
        scaler.update()

        scheduler.step()

7.3 分布式训练

Python
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

def setup_distributed():
    dist.init_process_group(backend='nccl')
    local_rank = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(local_rank)
    return local_rank

def main():
    local_rank = setup_distributed()

    # 创建模型并包装为DDP
    model = MyModel().cuda(local_rank)
    model = DDP(model, device_ids=[local_rank])

    # 使用DistributedSampler
    train_sampler = DistributedSampler(dataset)
    train_loader = DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=train_sampler
    )

    # 训练
    for epoch in range(num_epochs):
        train_sampler.set_epoch(epoch)  # 每个epoch打乱数据
        for inputs, labels in train_loader:
            inputs = inputs.cuda(local_rank)
            labels = labels.cuda(local_rank)
            # ... 训练代码 ...

if __name__ == '__main__':
    main()

# 启动命令
# torchrun --nproc_per_node=4 train.py

8. 模型部署基础

8.1 模型导出

Python
# TorchScript(推荐用于Python环境)
model.eval()
example_input = torch.randn(1, 3, 224, 224)

# 使用tracing
traced_model = torch.jit.trace(model, example_input)
traced_model.save('model_traced.pt')

# 使用scripting(支持控制流)
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')

# 加载TorchScript模型
loaded_model = torch.jit.load('model_traced.pt')
output = loaded_model(input_tensor)

8.2 ONNX导出

Python
# 导出为ONNX格式
model.eval()
dummy_input = torch.randn(1, 3, 224, 224)

torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    export_params=True,
    opset_version=11,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={
        'input': {0: 'batch_size'},
        'output': {0: 'batch_size'}
    }
)

# 验证ONNX模型
import onnx
onnx_model = onnx.load('model.onnx')
onnx.checker.check_model(onnx_model)

8.3 推理优化

Python
# 1. 使用eval模式
model.eval()

# 2. 禁用梯度计算
with torch.no_grad():
    output = model(input)

# 3. 使用TorchScript
model = torch.jit.script(model)

# 4. 使用半精度(推理时)
model = model.half()
input = input.half()

# 5. 批处理推理
def batch_predict(model, inputs, batch_size=32):
    results = []
    for i in range(0, len(inputs), batch_size):
        batch = inputs[i:i+batch_size]
        with torch.no_grad():
            output = model(batch)
        results.append(output)
    return torch.cat(results)

📝 练习

练习1: 张量操作

Python
# 1. 创建一个形状为(4, 3, 32, 32)的随机张量,模拟一批图像
# 2. 将其变形为(4, 3, 1024),然后计算每个图像的均值和标准差
# 3. 提取每个图像的中心区域(16, 16)
# 4. 将结果归一化到[0, 1]范围

练习2: 自动微分

Python
# 实现一个简单的线性回归
# 1. 生成随机数据 y = 2x + 1 + noise
# 2. 使用PyTorch的自动微分训练模型
# 3. 绘制损失曲线
# 4. 比较手动计算梯度和autograd的结果

练习3: 自定义Dataset

Python
# 创建一个处理CSV数据的Dataset
# 1. 从CSV加载数据
# 2. 实现数据预处理(标准化、编码等)
# 3. 使用DataLoader进行批处理
# 4. 添加数据增强

练习4: 构建CNN

Python
# 1. 实现一个VGG风格的网络
# 2. 添加BatchNorm和Dropout
# 3. 使用不同的初始化方法
# 4. 在CIFAR-10上训练并达到85%+准确率

练习5: 完整训练流程

Python
# 1. 实现包含以下功能的训练脚本:
#    - 命令行参数解析
#    - 日志记录
#    - 模型检查点保存
#    - 早停
#    - 学习率调度
#    - TensorBoard可视化
# 2. 在MNIST或CIFAR-10上验证

🎯 自我检查

  • 深入理解张量的内存布局和操作
  • 掌握自动微分的原理和使用
  • 能自定义Dataset和DataLoader
  • 能构建复杂的神经网络架构
  • 掌握完整的训练流程
  • 会使用GPU和混合精度训练
  • 了解模型部署基础

📚 延伸阅读


下一步: 02 - scikit-learn