跳转至

02 - 前向扩散过程

学习时间: 4小时 重要性: ⭐⭐⭐⭐⭐ 理解扩散模型的第一步


🎯 学习目标

完成本章后,你将能够: - 理解前向过程的数学定义和直观意义 - 掌握重参数化技巧在前向过程中的应用 - 理解噪声调度策略(Noise Schedule) - 实现前向扩散过程的代码 - 理解为什么可以直接采样任意时刻的 \(x_t\)


1. 前向过程概述

1.1 什么是前向过程

定义:前向过程是一个固定的(非学习的)马尔可夫链,逐步向数据添加高斯噪声。

直观理解

Text Only
x_0 (原始图像)
    ↓ + 少量噪声
x_1 (轻微模糊)
    ↓ + 少量噪声
x_2 (更模糊)
    ...
    ↓ + 噪声
x_T (纯噪声,近似标准高斯分布)

关键特性: 1. 固定过程:不需要学习,是预定义的 2. 马尔可夫性\(x_t\) 只依赖于 \(x_{t-1}\) 3. 高斯转移:每一步都添加高斯噪声 4. 收敛性:当 \(T\) 足够大时,\(x_T\) 近似纯噪声

1.2 数学定义

单步转移概率

\[q(x_t | x_{t-1}) = \mathcal{N}(x_t; \sqrt{1-\beta_t} x_{t-1}, \beta_t \mathbf{I})\]

其中: - \(\beta_t \in (0, 1)\):噪声调度参数,控制第 \(t\) 步添加的噪声量 - \(\sqrt{1-\beta_t}\):保留原始信号的比例 - \(\beta_t\):添加噪声的方差

重参数化形式

\[x_t = \sqrt{1-\beta_t} x_{t-1} + \sqrt{\beta_t} \epsilon_{t-1}, \quad \epsilon_{t-1} \sim \mathcal{N}(0, \mathbf{I})\]

2. 噪声调度策略(Noise Schedule)

2.1 什么是噪声调度

噪声调度定义了 \(\beta_t\) 如何随时间 \(t\) 变化。

为什么重要? - 控制扩散的速度 - 影响训练稳定性和生成质量 - 需要平衡:太快会丢失信息,太慢会训练困难

2.2 常用的噪声调度

线性调度(Linear Schedule)

\[\beta_t = \text{linspace}(\beta_{\text{start}}, \beta_{\text{end}}, T)\]
Python
import numpy as np
import matplotlib.pyplot as plt

def linear_beta_schedule(timesteps, beta_start=0.0001, beta_end=0.02):
    """
    线性噪声调度
    """
    return np.linspace(beta_start, beta_end, timesteps)

# 可视化
timesteps = 1000
betas_linear = linear_beta_schedule(timesteps)

plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(betas_linear, label='Linear')
plt.xlabel('Timestep')
plt.ylabel('Beta')
plt.title('Linear Noise Schedule')
plt.legend()
plt.grid(True, alpha=0.3)

余弦调度(Cosine Schedule)

更平滑的调度,在训练后期添加更少的噪声:

\[\bar{\alpha}_t = \frac{f(t)}{f(0)}, \quad f(t) = \cos\left(\frac{t/T + s}{1+s} \cdot \frac{\pi}{2}\right)^2\]
Python
def cosine_beta_schedule(timesteps, s=0.008):
    """
    余弦噪声调度(Improved DDPM)
    """
    steps = timesteps + 1
    x = np.linspace(0, timesteps, steps)
    alphas_cumprod = np.cos(((x / timesteps) + s) / (1 + s) * np.pi / 2) ** 2
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    return np.clip(betas, 0.0001, 0.9999)

betas_cosine = cosine_beta_schedule(timesteps)

plt.subplot(1, 2, 2)
plt.plot(betas_linear, label='Linear', alpha=0.7)
plt.plot(betas_cosine, label='Cosine', alpha=0.7)
plt.xlabel('Timestep')
plt.ylabel('Beta')
plt.title('Noise Schedule Comparison')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('noise_schedules.png', dpi=150)
plt.show()

2.3 调度策略对比

调度类型 优点 缺点 适用场景
Linear 简单直观 后期可能过于嘈杂 通用
Cosine 更平滑,后期噪声少 稍复杂 高质量生成
Quadratic 前期扩散快 需要调参 特定任务
Sigmoid 可控性强 参数敏感 研究实验

3. 边缘分布的闭合形式

3.1 为什么需要闭合形式

问题:如果要训练模型,我们需要从 \(q(x_t | x_0)\) 中采样。逐步模拟 \(t\) 步太慢了!

解决方案:利用高斯分布的性质,直接计算 \(q(x_t | x_0)\)

3.2 推导过程

定义: - \(\alpha_t = 1 - \beta_t\) - \(\bar{\alpha}_t = \prod_{i=1}^t \alpha_i\) (累积乘积)

推导

\(x_0\) 开始,递归展开:

\[ \begin{aligned} x_t &= \sqrt{\alpha_t} x_{t-1} + \sqrt{1-\alpha_t} \epsilon_{t-1} \\ &= \sqrt{\alpha_t} (\sqrt{\alpha_{t-1}} x_{t-2} + \sqrt{1-\alpha_{t-1}} \epsilon_{t-2}) + \sqrt{1-\alpha_t} \epsilon_{t-1} \\ &= \sqrt{\alpha_t \alpha_{t-1}} x_{t-2} + \underbrace{(\sqrt{\alpha_t(1-\alpha_{t-1})} \epsilon_{t-2} + \sqrt{1-\alpha_t} \epsilon_{t-1})}_{\text{高斯分布的和}} \end{aligned} \]

利用高斯分布的性质:独立高斯变量的线性组合仍是高斯分布

\[a \cdot \mathcal{N}(0, \sigma_1^2) + b \cdot \mathcal{N}(0, \sigma_2^2) = \mathcal{N}(0, a^2\sigma_1^2 + b^2\sigma_2^2)\]

因此: $\(x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1-\bar{\alpha}_t} \epsilon, \quad \epsilon \sim \mathcal{N}(0, \mathbf{I})\)$

结论: $\(q(x_t | x_0) = \mathcal{N}(x_t; \sqrt{\bar{\alpha}_t} x_0, (1-\bar{\alpha}_t)\mathbf{I})\)$

3.3 代码验证

Python
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats

def q_sample_step_by_step(x_0, t, betas):
    """
    逐步模拟:从 x_0 到 x_t,模拟 t 步
    """
    x = x_0.copy()
    for i in range(t):
        epsilon = np.random.normal(0, 1, x.shape)
        alpha_t = 1 - betas[i]
        x = np.sqrt(alpha_t) * x + np.sqrt(betas[i]) * epsilon
    return x

def q_sample_direct(x_0, t, alphas_cumprod):
    """
    直接采样:使用闭合形式
    """
    alpha_bar_t = alphas_cumprod[t]
    epsilon = np.random.normal(0, 1, x_0.shape)
    x_t = np.sqrt(alpha_bar_t) * x_0 + np.sqrt(1 - alpha_bar_t) * epsilon
    return x_t, epsilon

# 验证两种方法等价
np.random.seed(42)
timesteps = 1000
betas = linear_beta_schedule(timesteps)
alphas = 1 - betas
alphas_cumprod = np.cumprod(alphas)

# 测试数据
x_0 = np.array([2.0, -1.0, 0.5])  # np.array创建NumPy数组
t = 500

# 方法1:逐步模拟
x_step = q_sample_step_by_step(x_0, t, betas)

# 方法2:直接采样
# ⚠️ 注意:两种方法在**分布**上等价,但由于随机采样路径不同
#(逐步法抽 t 次噪声,直接法仅抽 1 次),即使相同随机种子
# 也不会得到相同的数值结果。可通过大量采样验证二者分布一致。
np.random.seed(42)
x_direct, eps = q_sample_direct(x_0, t, alphas_cumprod)

print("验证闭合形式(分布等价性):")
print(f"逐步模拟 x_{t} = {x_step}")
print(f"直接采样 x_{t} = {x_direct}")
print("两种方法采样自同一分布 q(x_t | x_0),可通过统计检验验证分布一致性。")

3.4 可视化扩散过程

Python
def visualize_forward_diffusion(x_0, timesteps, betas, show_steps=10):
    """
    可视化前向扩散过程
    """
    alphas = 1 - betas
    alphas_cumprod = np.cumprod(alphas)

    # 选择要显示的时间步
    show_timesteps = np.linspace(0, timesteps-1, show_steps, dtype=int)

    plt.figure(figsize=(15, 3))

    for i, t in enumerate(show_timesteps):  # enumerate同时获取索引和元素
        x_t, _ = q_sample_direct(x_0, t, alphas_cumprod)

        plt.subplot(1, show_steps, i+1)
        plt.hist(x_t, bins=30, alpha=0.7, density=True)
        plt.title(f't={t}\nᾱ={alphas_cumprod[t]:.3f}')
        plt.xlabel('Value')
        if i == 0:
            plt.ylabel('Density')
        plt.grid(True, alpha=0.3)

    plt.suptitle('Forward Diffusion Process: Distribution at Different Timesteps')
    plt.tight_layout()
    plt.savefig('forward_diffusion_visualization.png', dpi=150)
    plt.show()

# 生成测试数据
np.random.seed(42)
x_0 = np.random.normal(3, 1, 1000)  # 均值为3的高斯分布
visualize_forward_diffusion(x_0, timesteps, betas)

4. 信号与噪声的比例分析

4.1 信号比例

在前向过程中: - 信号比例\(\sqrt{\bar{\alpha}_t}\) - 噪声比例\(\sqrt{1-\bar{\alpha}_t}\)

随着 \(t\) 增加: - \(\bar{\alpha}_t\) 从接近1逐渐减小到接近0 - 信号比例减小,噪声比例增加

4.2 可视化信号衰减

Python
# 计算信号和噪声比例
signal_ratio = np.sqrt(alphas_cumprod)
noise_ratio = np.sqrt(1 - alphas_cumprod)
snr = alphas_cumprod / (1 - alphas_cumprod)  # 信噪比

plt.figure(figsize=(15, 4))

plt.subplot(1, 3, 1)
plt.plot(signal_ratio, label='Signal Ratio (√ᾱ)', color='blue')
plt.plot(noise_ratio, label='Noise Ratio (√(1-ᾱ))', color='red')
plt.xlabel('Timestep')
plt.ylabel('Ratio')
plt.title('Signal vs Noise Ratio')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 3, 2)
plt.plot(alphas_cumprod, label='ᾱ_t', color='green')
plt.xlabel('Timestep')
plt.ylabel('ᾱ_t')
plt.title('Cumulative Product of Alphas')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 3, 3)
plt.semilogy(snr, label='SNR (ᾱ/(1-ᾱ))', color='purple')
plt.xlabel('Timestep')
plt.ylabel('SNR (log scale)')
plt.title('Signal-to-Noise Ratio')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('signal_noise_analysis.png', dpi=150)
plt.show()

# 打印关键时间点的值
key_timesteps = [0, 250, 500, 750, 999]
print("\n关键时间点的信号/噪声比例:")
print("-" * 60)
print(f"{'t':<10} {'ᾱ_t':<15} {'Signal':<15} {'Noise':<15} {'SNR':<15}")
print("-" * 60)
for t in key_timesteps:
    print(f"{t:<10} {alphas_cumprod[t]:<15.6f} {signal_ratio[t]:<15.6f} "
          f"{noise_ratio[t]:<15.6f} {snr[t]:<15.6f}")

5. 完整的前向扩散实现

Python
import numpy as np
import torch
import torch.nn as nn

class ForwardDiffusion:
    """
    前向扩散过程的完整实现
    """
    def __init__(self, timesteps=1000, beta_schedule='linear'):
        self.timesteps = timesteps

        # 定义beta调度
        if beta_schedule == 'linear':
            self.betas = self._linear_beta_schedule(timesteps)
        elif beta_schedule == 'cosine':
            self.betas = self._cosine_beta_schedule(timesteps)
        else:
            raise ValueError(f"Unknown beta schedule: {beta_schedule}")

        # 预计算alpha相关值
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = np.cumprod(self.alphas)
        self.alphas_cumprod_prev = np.append(1.0, self.alphas_cumprod[:-1])

        # 转换为torch张量
        self.betas = torch.from_numpy(self.betas).float()
        self.alphas = torch.from_numpy(self.alphas).float()
        self.alphas_cumprod = torch.from_numpy(self.alphas_cumprod).float()
        self.alphas_cumprod_prev = torch.from_numpy(self.alphas_cumprod_prev).float()

        # 计算用于重参数化的值
        self.sqrt_alphas_cumprod = torch.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = torch.sqrt(1.0 - self.alphas_cumprod)

    def _linear_beta_schedule(self, timesteps, beta_start=0.0001, beta_end=0.02):
        return np.linspace(beta_start, beta_end, timesteps)

    def _cosine_beta_schedule(self, timesteps, s=0.008):
        steps = timesteps + 1
        x = np.linspace(0, timesteps, steps)
        alphas_cumprod = np.cos(((x / timesteps) + s) / (1 + s) * np.pi / 2) ** 2
        alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
        betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
        return np.clip(betas, 0.0001, 0.9999)

    def q_sample(self, x_0, t, noise=None):
        """
        从 q(x_t | x_0) 采样

        参数:
            x_0: 原始数据 [B, C, H, W]
            t: 时间步 [B]
            noise: 可选的噪声,如果不提供则随机采样

        返回:
            x_t: 加噪后的数据
            noise: 使用的噪声(用于训练)
        """
        if noise is None:
            noise = torch.randn_like(x_0)

        # 获取对应时间步的值
        sqrt_alpha_cumprod_t = self.sqrt_alphas_cumprod[t].view(-1, 1, 1, 1)  # 重塑张量形状
        sqrt_one_minus_alpha_cumprod_t = self.sqrt_one_minus_alphas_cumprod[t].view(-1, 1, 1, 1)

        # 重参数化: x_t = √ᾱ_t * x_0 + √(1-ᾱ_t) * ε
        x_t = sqrt_alpha_cumprod_t * x_0 + sqrt_one_minus_alpha_cumprod_t * noise

        return x_t, noise

    def q_posterior_mean_variance(self, x_0, x_t, t):
        """
        计算后验分布 q(x_{t-1} | x_t, x_0) 的均值和方差
        用于反向过程
        """
        posterior_mean = (
            self._extract(self.betas, t, x_t.shape) *
            torch.sqrt(self._extract(self.alphas_cumprod_prev, t, x_t.shape)) * x_0 +
            torch.sqrt(self._extract(self.alphas, t, x_t.shape)) *
            (1 - self._extract(self.alphas_cumprod_prev, t, x_t.shape)) * x_t
        ) / (1 - self._extract(self.alphas_cumprod, t, x_t.shape))

        posterior_variance = (
            self._extract(self.betas, t, x_t.shape) *
            (1 - self._extract(self.alphas_cumprod_prev, t, x_t.shape))
        ) / (1 - self._extract(self.alphas_cumprod, t, x_t.shape))

        return posterior_mean, posterior_variance

    def _extract(self, a, t, x_shape):
        """从张量a中提取对应时间步t的值,并调整形状"""
        batch_size = t.shape[0]
        out = a.to(t.device).gather(0, t).float()
        return out.view(batch_size, *((1,) * (len(x_shape) - 1)))

# 测试
if __name__ == "__main__":
    # 创建扩散过程
    diffusion = ForwardDiffusion(timesteps=1000, beta_schedule='linear')

    # 创建模拟图像数据
    batch_size = 4
    channels = 3
    height, width = 32, 32
    x_0 = torch.randn(batch_size, channels, height, width)

    # 随机选择时间步
    t = torch.randint(0, 1000, (batch_size,))

    # 加噪
    x_t, noise = diffusion.q_sample(x_0, t)

    print(f"原始数据 x_0: shape={x_0.shape}, mean={x_0.mean():.4f}, std={x_0.std():.4f}")
    print(f"加噪数据 x_t: shape={x_t.shape}, mean={x_t.mean():.4f}, std={x_t.std():.4f}")
    print(f"噪声 ε: shape={noise.shape}, mean={noise.mean():.4f}, std={noise.std():.4f}")
    print(f"时间步 t: {t}")

6. 本章总结

核心概念

  1. 前向过程
  2. 固定的高斯马尔可夫链
  3. 逐步添加噪声:\(x_t = \sqrt{1-\beta_t} x_{t-1} + \sqrt{\beta_t} \epsilon\)
  4. 最终收敛到标准高斯分布

  5. 噪声调度

  6. Linear:简单直观
  7. Cosine:更平滑,效果更好
  8. 控制扩散速度和训练稳定性

  9. 闭合形式

  10. \(q(x_t | x_0) = \mathcal{N}(\sqrt{\bar{\alpha}_t} x_0, (1-\bar{\alpha}_t)\mathbf{I})\)
  11. 可以直接采样任意时刻,无需逐步模拟
  12. 这是训练的关键

关键公式

概念 公式
单步转移 \(q(x_t \mid x_{t-1}) = \mathcal{N}(\sqrt{1-\beta_t} x_{t-1}, \beta_t \mathbf{I})\)
重参数化 \(x_t = \sqrt{1-\beta_t} x_{t-1} + \sqrt{\beta_t} \epsilon\)
边缘分布 \(q(x_t \mid x_0) = \mathcal{N}(\sqrt{\bar{\alpha}_t} x_0, (1-\bar{\alpha}_t)\mathbf{I})\)
直接采样 \(x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1-\bar{\alpha}_t} \epsilon\)

代码要点

Python
# 核心操作:从 q(x_t | x_0) 采样
def q_sample(x_0, t, alphas_cumprod):
    alpha_bar_t = alphas_cumprod[t]
    epsilon = torch.randn_like(x_0)
    x_t = torch.sqrt(alpha_bar_t) * x_0 + torch.sqrt(1 - alpha_bar_t) * epsilon
    return x_t, epsilon

📝 自测问题

基础问题

  1. 前向过程的特性
  2. 前向过程是固定的还是学习的?为什么?
  3. 为什么使用高斯噪声?可以用其他分布吗?
  4. 马尔可夫性在前向过程中起什么作用?

  5. 噪声调度

  6. 解释线性调度和余弦调度的区别
  7. 如果 \(\beta_t\) 太大或太小会怎样?
  8. 如何选择合适的噪声调度?

  9. 闭合形式

  10. 为什么闭合形式很重要?
  11. 推导 \(q(x_t | x_0)\) 的过程中用到了哪些数学性质?
  12. 直接采样和逐步模拟有什么区别?

编程练习

  1. 实现不同的噪声调度(quadratic、sigmoid)并可视化
  2. 计算并绘制信噪比(SNR)随时间的变化
  3. 实现一个函数,批量采样不同时间步的 \(x_t\)

思考题

  1. 如果前向过程不是马尔可夫的,会怎样?
  2. 为什么 \(\bar{\alpha}_T\) 要接近0?如果不接近会怎样?
  3. 前向过程的设计对反向过程有什么影响?

🔗 下一步

理解了前向过程后,我们将学习反向去噪过程,这是扩散模型真正"学习"的部分。

→ 下一步:03-反向去噪过程.md