02 - 前向扩散过程¶
学习时间: 4小时 重要性: ⭐⭐⭐⭐⭐ 理解扩散模型的第一步
🎯 学习目标¶
完成本章后,你将能够: - 理解前向过程的数学定义和直观意义 - 掌握重参数化技巧在前向过程中的应用 - 理解噪声调度策略(Noise Schedule) - 实现前向扩散过程的代码 - 理解为什么可以直接采样任意时刻的 \(x_t\)
1. 前向过程概述¶
1.1 什么是前向过程¶
定义:前向过程是一个固定的(非学习的)马尔可夫链,逐步向数据添加高斯噪声。
直观理解:
关键特性: 1. 固定过程:不需要学习,是预定义的 2. 马尔可夫性:\(x_t\) 只依赖于 \(x_{t-1}\) 3. 高斯转移:每一步都添加高斯噪声 4. 收敛性:当 \(T\) 足够大时,\(x_T\) 近似纯噪声
1.2 数学定义¶
单步转移概率:
其中: - \(\beta_t \in (0, 1)\):噪声调度参数,控制第 \(t\) 步添加的噪声量 - \(\sqrt{1-\beta_t}\):保留原始信号的比例 - \(\beta_t\):添加噪声的方差
重参数化形式:
2. 噪声调度策略(Noise Schedule)¶
2.1 什么是噪声调度¶
噪声调度定义了 \(\beta_t\) 如何随时间 \(t\) 变化。
为什么重要? - 控制扩散的速度 - 影响训练稳定性和生成质量 - 需要平衡:太快会丢失信息,太慢会训练困难
2.2 常用的噪声调度¶
线性调度(Linear Schedule)¶
import numpy as np
import matplotlib.pyplot as plt
def linear_beta_schedule(timesteps, beta_start=0.0001, beta_end=0.02):
"""
线性噪声调度
"""
return np.linspace(beta_start, beta_end, timesteps)
# 可视化
timesteps = 1000
betas_linear = linear_beta_schedule(timesteps)
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(betas_linear, label='Linear')
plt.xlabel('Timestep')
plt.ylabel('Beta')
plt.title('Linear Noise Schedule')
plt.legend()
plt.grid(True, alpha=0.3)
余弦调度(Cosine Schedule)¶
更平滑的调度,在训练后期添加更少的噪声:
def cosine_beta_schedule(timesteps, s=0.008):
"""
余弦噪声调度(Improved DDPM)
"""
steps = timesteps + 1
x = np.linspace(0, timesteps, steps)
alphas_cumprod = np.cos(((x / timesteps) + s) / (1 + s) * np.pi / 2) ** 2
alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
return np.clip(betas, 0.0001, 0.9999)
betas_cosine = cosine_beta_schedule(timesteps)
plt.subplot(1, 2, 2)
plt.plot(betas_linear, label='Linear', alpha=0.7)
plt.plot(betas_cosine, label='Cosine', alpha=0.7)
plt.xlabel('Timestep')
plt.ylabel('Beta')
plt.title('Noise Schedule Comparison')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('noise_schedules.png', dpi=150)
plt.show()
2.3 调度策略对比¶
| 调度类型 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Linear | 简单直观 | 后期可能过于嘈杂 | 通用 |
| Cosine | 更平滑,后期噪声少 | 稍复杂 | 高质量生成 |
| Quadratic | 前期扩散快 | 需要调参 | 特定任务 |
| Sigmoid | 可控性强 | 参数敏感 | 研究实验 |
3. 边缘分布的闭合形式¶
3.1 为什么需要闭合形式¶
问题:如果要训练模型,我们需要从 \(q(x_t | x_0)\) 中采样。逐步模拟 \(t\) 步太慢了!
解决方案:利用高斯分布的性质,直接计算 \(q(x_t | x_0)\)。
3.2 推导过程¶
定义: - \(\alpha_t = 1 - \beta_t\) - \(\bar{\alpha}_t = \prod_{i=1}^t \alpha_i\) (累积乘积)
推导:
从 \(x_0\) 开始,递归展开:
利用高斯分布的性质:独立高斯变量的线性组合仍是高斯分布
因此: $\(x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1-\bar{\alpha}_t} \epsilon, \quad \epsilon \sim \mathcal{N}(0, \mathbf{I})\)$
结论: $\(q(x_t | x_0) = \mathcal{N}(x_t; \sqrt{\bar{\alpha}_t} x_0, (1-\bar{\alpha}_t)\mathbf{I})\)$
3.3 代码验证¶
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
def q_sample_step_by_step(x_0, t, betas):
"""
逐步模拟:从 x_0 到 x_t,模拟 t 步
"""
x = x_0.copy()
for i in range(t):
epsilon = np.random.normal(0, 1, x.shape)
alpha_t = 1 - betas[i]
x = np.sqrt(alpha_t) * x + np.sqrt(betas[i]) * epsilon
return x
def q_sample_direct(x_0, t, alphas_cumprod):
"""
直接采样:使用闭合形式
"""
alpha_bar_t = alphas_cumprod[t]
epsilon = np.random.normal(0, 1, x_0.shape)
x_t = np.sqrt(alpha_bar_t) * x_0 + np.sqrt(1 - alpha_bar_t) * epsilon
return x_t, epsilon
# 验证两种方法等价
np.random.seed(42)
timesteps = 1000
betas = linear_beta_schedule(timesteps)
alphas = 1 - betas
alphas_cumprod = np.cumprod(alphas)
# 测试数据
x_0 = np.array([2.0, -1.0, 0.5]) # np.array创建NumPy数组
t = 500
# 方法1:逐步模拟
x_step = q_sample_step_by_step(x_0, t, betas)
# 方法2:直接采样
# ⚠️ 注意:两种方法在**分布**上等价,但由于随机采样路径不同
#(逐步法抽 t 次噪声,直接法仅抽 1 次),即使相同随机种子
# 也不会得到相同的数值结果。可通过大量采样验证二者分布一致。
np.random.seed(42)
x_direct, eps = q_sample_direct(x_0, t, alphas_cumprod)
print("验证闭合形式(分布等价性):")
print(f"逐步模拟 x_{t} = {x_step}")
print(f"直接采样 x_{t} = {x_direct}")
print("两种方法采样自同一分布 q(x_t | x_0),可通过统计检验验证分布一致性。")
3.4 可视化扩散过程¶
def visualize_forward_diffusion(x_0, timesteps, betas, show_steps=10):
"""
可视化前向扩散过程
"""
alphas = 1 - betas
alphas_cumprod = np.cumprod(alphas)
# 选择要显示的时间步
show_timesteps = np.linspace(0, timesteps-1, show_steps, dtype=int)
plt.figure(figsize=(15, 3))
for i, t in enumerate(show_timesteps): # enumerate同时获取索引和元素
x_t, _ = q_sample_direct(x_0, t, alphas_cumprod)
plt.subplot(1, show_steps, i+1)
plt.hist(x_t, bins=30, alpha=0.7, density=True)
plt.title(f't={t}\nᾱ={alphas_cumprod[t]:.3f}')
plt.xlabel('Value')
if i == 0:
plt.ylabel('Density')
plt.grid(True, alpha=0.3)
plt.suptitle('Forward Diffusion Process: Distribution at Different Timesteps')
plt.tight_layout()
plt.savefig('forward_diffusion_visualization.png', dpi=150)
plt.show()
# 生成测试数据
np.random.seed(42)
x_0 = np.random.normal(3, 1, 1000) # 均值为3的高斯分布
visualize_forward_diffusion(x_0, timesteps, betas)
4. 信号与噪声的比例分析¶
4.1 信号比例¶
在前向过程中: - 信号比例:\(\sqrt{\bar{\alpha}_t}\) - 噪声比例:\(\sqrt{1-\bar{\alpha}_t}\)
随着 \(t\) 增加: - \(\bar{\alpha}_t\) 从接近1逐渐减小到接近0 - 信号比例减小,噪声比例增加
4.2 可视化信号衰减¶
# 计算信号和噪声比例
signal_ratio = np.sqrt(alphas_cumprod)
noise_ratio = np.sqrt(1 - alphas_cumprod)
snr = alphas_cumprod / (1 - alphas_cumprod) # 信噪比
plt.figure(figsize=(15, 4))
plt.subplot(1, 3, 1)
plt.plot(signal_ratio, label='Signal Ratio (√ᾱ)', color='blue')
plt.plot(noise_ratio, label='Noise Ratio (√(1-ᾱ))', color='red')
plt.xlabel('Timestep')
plt.ylabel('Ratio')
plt.title('Signal vs Noise Ratio')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 3, 2)
plt.plot(alphas_cumprod, label='ᾱ_t', color='green')
plt.xlabel('Timestep')
plt.ylabel('ᾱ_t')
plt.title('Cumulative Product of Alphas')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 3, 3)
plt.semilogy(snr, label='SNR (ᾱ/(1-ᾱ))', color='purple')
plt.xlabel('Timestep')
plt.ylabel('SNR (log scale)')
plt.title('Signal-to-Noise Ratio')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('signal_noise_analysis.png', dpi=150)
plt.show()
# 打印关键时间点的值
key_timesteps = [0, 250, 500, 750, 999]
print("\n关键时间点的信号/噪声比例:")
print("-" * 60)
print(f"{'t':<10} {'ᾱ_t':<15} {'Signal':<15} {'Noise':<15} {'SNR':<15}")
print("-" * 60)
for t in key_timesteps:
print(f"{t:<10} {alphas_cumprod[t]:<15.6f} {signal_ratio[t]:<15.6f} "
f"{noise_ratio[t]:<15.6f} {snr[t]:<15.6f}")
5. 完整的前向扩散实现¶
import numpy as np
import torch
import torch.nn as nn
class ForwardDiffusion:
"""
前向扩散过程的完整实现
"""
def __init__(self, timesteps=1000, beta_schedule='linear'):
self.timesteps = timesteps
# 定义beta调度
if beta_schedule == 'linear':
self.betas = self._linear_beta_schedule(timesteps)
elif beta_schedule == 'cosine':
self.betas = self._cosine_beta_schedule(timesteps)
else:
raise ValueError(f"Unknown beta schedule: {beta_schedule}")
# 预计算alpha相关值
self.alphas = 1.0 - self.betas
self.alphas_cumprod = np.cumprod(self.alphas)
self.alphas_cumprod_prev = np.append(1.0, self.alphas_cumprod[:-1])
# 转换为torch张量
self.betas = torch.from_numpy(self.betas).float()
self.alphas = torch.from_numpy(self.alphas).float()
self.alphas_cumprod = torch.from_numpy(self.alphas_cumprod).float()
self.alphas_cumprod_prev = torch.from_numpy(self.alphas_cumprod_prev).float()
# 计算用于重参数化的值
self.sqrt_alphas_cumprod = torch.sqrt(self.alphas_cumprod)
self.sqrt_one_minus_alphas_cumprod = torch.sqrt(1.0 - self.alphas_cumprod)
def _linear_beta_schedule(self, timesteps, beta_start=0.0001, beta_end=0.02):
return np.linspace(beta_start, beta_end, timesteps)
def _cosine_beta_schedule(self, timesteps, s=0.008):
steps = timesteps + 1
x = np.linspace(0, timesteps, steps)
alphas_cumprod = np.cos(((x / timesteps) + s) / (1 + s) * np.pi / 2) ** 2
alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
return np.clip(betas, 0.0001, 0.9999)
def q_sample(self, x_0, t, noise=None):
"""
从 q(x_t | x_0) 采样
参数:
x_0: 原始数据 [B, C, H, W]
t: 时间步 [B]
noise: 可选的噪声,如果不提供则随机采样
返回:
x_t: 加噪后的数据
noise: 使用的噪声(用于训练)
"""
if noise is None:
noise = torch.randn_like(x_0)
# 获取对应时间步的值
sqrt_alpha_cumprod_t = self.sqrt_alphas_cumprod[t].view(-1, 1, 1, 1) # 重塑张量形状
sqrt_one_minus_alpha_cumprod_t = self.sqrt_one_minus_alphas_cumprod[t].view(-1, 1, 1, 1)
# 重参数化: x_t = √ᾱ_t * x_0 + √(1-ᾱ_t) * ε
x_t = sqrt_alpha_cumprod_t * x_0 + sqrt_one_minus_alpha_cumprod_t * noise
return x_t, noise
def q_posterior_mean_variance(self, x_0, x_t, t):
"""
计算后验分布 q(x_{t-1} | x_t, x_0) 的均值和方差
用于反向过程
"""
posterior_mean = (
self._extract(self.betas, t, x_t.shape) *
torch.sqrt(self._extract(self.alphas_cumprod_prev, t, x_t.shape)) * x_0 +
torch.sqrt(self._extract(self.alphas, t, x_t.shape)) *
(1 - self._extract(self.alphas_cumprod_prev, t, x_t.shape)) * x_t
) / (1 - self._extract(self.alphas_cumprod, t, x_t.shape))
posterior_variance = (
self._extract(self.betas, t, x_t.shape) *
(1 - self._extract(self.alphas_cumprod_prev, t, x_t.shape))
) / (1 - self._extract(self.alphas_cumprod, t, x_t.shape))
return posterior_mean, posterior_variance
def _extract(self, a, t, x_shape):
"""从张量a中提取对应时间步t的值,并调整形状"""
batch_size = t.shape[0]
out = a.to(t.device).gather(0, t).float()
return out.view(batch_size, *((1,) * (len(x_shape) - 1)))
# 测试
if __name__ == "__main__":
# 创建扩散过程
diffusion = ForwardDiffusion(timesteps=1000, beta_schedule='linear')
# 创建模拟图像数据
batch_size = 4
channels = 3
height, width = 32, 32
x_0 = torch.randn(batch_size, channels, height, width)
# 随机选择时间步
t = torch.randint(0, 1000, (batch_size,))
# 加噪
x_t, noise = diffusion.q_sample(x_0, t)
print(f"原始数据 x_0: shape={x_0.shape}, mean={x_0.mean():.4f}, std={x_0.std():.4f}")
print(f"加噪数据 x_t: shape={x_t.shape}, mean={x_t.mean():.4f}, std={x_t.std():.4f}")
print(f"噪声 ε: shape={noise.shape}, mean={noise.mean():.4f}, std={noise.std():.4f}")
print(f"时间步 t: {t}")
6. 本章总结¶
核心概念¶
- 前向过程
- 固定的高斯马尔可夫链
- 逐步添加噪声:\(x_t = \sqrt{1-\beta_t} x_{t-1} + \sqrt{\beta_t} \epsilon\)
-
最终收敛到标准高斯分布
-
噪声调度
- Linear:简单直观
- Cosine:更平滑,效果更好
-
控制扩散速度和训练稳定性
-
闭合形式
- \(q(x_t | x_0) = \mathcal{N}(\sqrt{\bar{\alpha}_t} x_0, (1-\bar{\alpha}_t)\mathbf{I})\)
- 可以直接采样任意时刻,无需逐步模拟
- 这是训练的关键
关键公式¶
| 概念 | 公式 |
|---|---|
| 单步转移 | \(q(x_t \mid x_{t-1}) = \mathcal{N}(\sqrt{1-\beta_t} x_{t-1}, \beta_t \mathbf{I})\) |
| 重参数化 | \(x_t = \sqrt{1-\beta_t} x_{t-1} + \sqrt{\beta_t} \epsilon\) |
| 边缘分布 | \(q(x_t \mid x_0) = \mathcal{N}(\sqrt{\bar{\alpha}_t} x_0, (1-\bar{\alpha}_t)\mathbf{I})\) |
| 直接采样 | \(x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1-\bar{\alpha}_t} \epsilon\) |
代码要点¶
# 核心操作:从 q(x_t | x_0) 采样
def q_sample(x_0, t, alphas_cumprod):
alpha_bar_t = alphas_cumprod[t]
epsilon = torch.randn_like(x_0)
x_t = torch.sqrt(alpha_bar_t) * x_0 + torch.sqrt(1 - alpha_bar_t) * epsilon
return x_t, epsilon
📝 自测问题¶
基础问题¶
- 前向过程的特性
- 前向过程是固定的还是学习的?为什么?
- 为什么使用高斯噪声?可以用其他分布吗?
-
马尔可夫性在前向过程中起什么作用?
-
噪声调度
- 解释线性调度和余弦调度的区别
- 如果 \(\beta_t\) 太大或太小会怎样?
-
如何选择合适的噪声调度?
-
闭合形式
- 为什么闭合形式很重要?
- 推导 \(q(x_t | x_0)\) 的过程中用到了哪些数学性质?
- 直接采样和逐步模拟有什么区别?
编程练习¶
- 实现不同的噪声调度(quadratic、sigmoid)并可视化
- 计算并绘制信噪比(SNR)随时间的变化
- 实现一个函数,批量采样不同时间步的 \(x_t\)
思考题¶
- 如果前向过程不是马尔可夫的,会怎样?
- 为什么 \(\bar{\alpha}_T\) 要接近0?如果不接近会怎样?
- 前向过程的设计对反向过程有什么影响?
🔗 下一步¶
理解了前向过程后,我们将学习反向去噪过程,这是扩散模型真正"学习"的部分。
→ 下一步:03-反向去噪过程.md