核心理论 - 机器学习理论的深层解析¶
目录¶
1. 泛化理论¶
1.1 PAC学习框架¶
定义(PAC可学习性):一个概念类 \(\mathcal{C}\) 是PAC可学习的,如果存在一个算法 \(\mathcal{A}\),对于任意 \(\epsilon > 0\)(精度)、\(\delta > 0\)(置信度)、任意目标概念 \(c \in \mathcal{C}\) 和任意分布 \(\mathcal{D}\),算法 \(\mathcal{A}\) 从 \(\mathcal{D}\) 中采样 \(m\) 个样本后,以至少 \(1-\delta\) 的概率输出一个假设 \(h\),使得:
\[ R(h) \leq \epsilon \]
其中 \(R(h) = \mathbb{E}_{x \sim \mathcal{D}}[\mathbb{1}_{h(x) \neq c(x)}]\) 是泛化误差。
样本复杂度:
对于有限假设空间 \(|\mathcal{H}| < \infty\):
\[ m \geq \frac{1}{\epsilon}\left(\ln|\mathcal{H}| + \ln\frac{1}{\delta}\right) \]
定理(PAC学习基本定理):
对于二分类问题,ERM(经验风险最小化)算法满足:
其中 \(d\) 是VC维,\(h^*\) 是最优假设。
from collections.abc import Callable

import matplotlib.pyplot as plt
import numpy as np
import torch
class PACLearningFramework:
"""
PAC学习框架的实现与验证
"""
def __init__(self, hypothesis_space_size: int, epsilon: float = 0.1, delta: float = 0.05): # __init__构造方法,创建对象时自动调用
"""
初始化PAC学习框架
Args:
hypothesis_space_size: 假设空间大小 |H|
epsilon: 精度参数
delta: 置信度参数
"""
self.hypothesis_space_size = hypothesis_space_size
self.epsilon = epsilon
self.delta = delta
def sample_complexity_finite(self) -> int:
"""
计算有限假设空间的样本复杂度
Returns:
所需样本数 m
"""
# m >= (1/epsilon) * (ln|H| + ln(1/delta))
m = int(np.ceil((1.0 / self.epsilon) *
(np.log(self.hypothesis_space_size) + np.log(1.0 / self.delta))))
return m
def sample_complexity_vc(self, vc_dim: int) -> int:
"""
基于VC维的样本复杂度
Args:
vc_dim: VC维
Returns:
所需样本数 m
"""
# m >= (1/epsilon) * (8*VC(H)*log(13/epsilon) + 4*log(2/delta))
m = int(np.ceil((1.0 / self.epsilon) *
(8 * vc_dim * np.log(13.0 / self.epsilon) +
4 * np.log(2.0 / self.delta))))
return m
def generalization_bound_vc(self, m: int, vc_dim: int) -> float:
"""
基于VC维的泛化误差上界
Args:
m: 样本数
vc_dim: VC维
Returns:
泛化误差上界
"""
# R(h) <= R_emp(h) + sqrt((2d*ln(em/d) + 2*ln(4/delta))/m)
bound = np.sqrt((2 * vc_dim * np.log(np.e * m / vc_dim) +
2 * np.log(4.0 / self.delta)) / m)
return bound
def probably_approximately_correct(self,
empirical_risk: float,
m: int,
vc_dim: int) -> tuple[bool, float]:
"""
验证PAC条件
Args:
empirical_risk: 经验风险
m: 样本数
vc_dim: VC维
Returns:
(是否满足PAC, 泛化误差上界)
"""
generalization_gap = self.generalization_bound_vc(m, vc_dim)
true_risk_upper = empirical_risk + generalization_gap
# 以至少 1-delta 的概率,真实误差不超过上界
satisfies_pac = generalization_gap <= self.epsilon
return satisfies_pac, true_risk_upper
def visualize_sample_complexity(self, vc_dims: list[int] = None):
"""可视化样本复杂度随VC维的变化"""
if vc_dims is None:
vc_dims = list(range(1, 101))
sample_complexities = [self.sample_complexity_vc(d) for d in vc_dims] # 列表推导式,简洁创建列表
plt.figure(figsize=(10, 6))
plt.plot(vc_dims, sample_complexities, 'b-', linewidth=2)
plt.xlabel('VC Dimension', fontsize=12)
plt.ylabel('Sample Complexity m', fontsize=12)
plt.title(f'PAC Sample Complexity (ε={self.epsilon}, δ={self.delta})', fontsize=14)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('pac_sample_complexity.png', dpi=150)
plt.show()
# Example: validating the PAC learning framework
def pac_learning_example():
    """Demonstrate sample-complexity and generalization-bound computations."""
    framework = PACLearningFramework(hypothesis_space_size=1000, epsilon=0.1, delta=0.05)

    header = "=" * 60
    print(header)
    print("PAC学习框架分析")
    print(header)
    print(f"\n假设空间大小 |H| = {framework.hypothesis_space_size}")
    print(f"精度参数 ε = {framework.epsilon}")
    print(f"置信度参数 δ = {framework.delta}")
    print(f"\n有限假设空间样本复杂度: m >= {framework.sample_complexity_finite()}")

    # Sample complexity for several VC dimensions.
    print("\n不同VC维下的样本复杂度:")
    for dim in (5, 10, 50, 100):
        print(f"  VC维 = {dim:3d}: m >= {framework.sample_complexity_vc(dim):5d}")

    # Generalization-bound check at a fixed sample size.
    print("\n泛化误差界分析 (m=1000):")
    for dim in (10, 50, 100):
        gap = framework.generalization_bound_vc(m=1000, vc_dim=dim)
        ok, _upper = framework.probably_approximately_correct(
            empirical_risk=0.05, m=1000, vc_dim=dim)
        print(f"  VC维 = {dim:3d}: 泛化界 = {gap:.4f}, 满足PAC: {ok}")
    return framework

# Run the example
if __name__ == "__main__":
    pac = pac_learning_example()
1.2 VC维与Rademacher复杂度¶
VC维定义:假设空间 \(\mathcal{H}\) 的VC维是能够被 \(\mathcal{H}\) 打散(shatter)的最大点集的大小。
打散定义:一个点集 \(S = \{x_1, ..., x_m\}\) 能被 \(\mathcal{H}\) 打散,如果对于所有 \(2^m\) 种标签组合,都存在 \(h \in \mathcal{H}\) 能够实现这种标签分配。
常见模型的VC维:
| 模型 | VC维 | 说明 |
|---|---|---|
| 线性分类器(d维) | \(d+1\) | 超平面分类器 |
| 感知机(d维) | \(d+1\) | 带阈值 |
| 神经网络 | \(O(W\log W)\) | \(W\)为权重数 |
| 决策树(深度k) | \(2^k\) | 二叉树 |
| SVM(RBF核) | \(\infty\) | 无限VC维 |
Rademacher复杂度:
\[ \hat{\mathfrak{R}}_S(\mathcal{H}) = \mathbb{E}_{\sigma}\left[\sup_{h \in \mathcal{H}} \frac{1}{m}\sum_{i=1}^{m} \sigma_i h(x_i)\right] \]
其中 \(\sigma_i \in \{-1, +1\}\) 是Rademacher随机变量。
泛化界(基于Rademacher复杂度):
以至少 \(1-\delta\) 的概率,对所有 \(h \in \mathcal{H}\):
class ComplexityMeasures:
"""
复杂度度量:VC维和Rademacher复杂度
"""
@staticmethod # @staticmethod静态方法,无需实例即可调用
def vc_dim_linear(d: int) -> int:
"""d维线性分类器的VC维"""
return d + 1
@staticmethod
def vc_dim_neural_network(weights: int, depth: int = None) -> int:
"""
神经网络的VC维上界
Args:
weights: 权重参数数量
depth: 网络深度(可选)
Returns:
VC维上界
"""
# 简化上界: O(W log W)
if depth is None:
return int(weights * np.log2(weights))
else:
# 更紧的上界考虑深度
return int(depth * weights * np.log2(weights))
@staticmethod
def rademacher_complexity_bound(m: int, vc_dim: int) -> float:
"""
基于VC维的Rademacher复杂度上界
R_m(H) <= sqrt(2d*ln(em/d)/m)
"""
if m <= vc_dim:
return 1.0
return np.sqrt(2 * vc_dim * np.log(np.e * m / vc_dim) / m)
@staticmethod
def empirical_rademacher(X: np.ndarray,
hypothesis_class: Callable,
n_samples: int = 100) -> float:
"""
估计经验Rademacher复杂度
Args:
X: 数据矩阵 (n_samples, n_features)
hypothesis_class: 假设类函数
n_samples: Rademacher采样次数
Returns:
经验Rademacher复杂度估计
"""
n = len(X)
rademacher_values = []
for _ in range(n_samples):
# 生成Rademacher随机变量
sigma = np.random.choice([-1, 1], size=n)
# 在假设类上最大化
# 简化版本:随机采样假设
max_correlation = 0
for _ in range(50): # 采样假设
h = hypothesis_class(X)
correlation = np.mean(sigma * h)
max_correlation = max(max_correlation, correlation)
rademacher_values.append(max_correlation)
return np.mean(rademacher_values)
@staticmethod
def margin_bound(rademacher: float,
margin: float,
m: int,
delta: float = 0.05) -> float:
"""
基于间隔的泛化界
对于间隔为margin的分类器,泛化误差上界
"""
# R(h) <= R_emp^margin(h) + (2/margin)*R_m(H) + sqrt(ln(1/delta)/(2m))
bound = (2.0 / margin) * rademacher + np.sqrt(np.log(1.0 / delta) / (2 * m))
return bound
def complexity_analysis_example():
    """Walk through VC-dimension and Rademacher-complexity estimates."""
    banner = "=" * 60
    print("\n" + banner)
    print("复杂度度量分析")
    print(banner)
    measures = ComplexityMeasures()

    # VC dimensions of linear classifiers.
    print("\n1. VC维分析")
    print("-" * 40)
    for d in (10, 100, 1000, 10000):
        print(f"  输入维度 d={d:5d}: 线性分类器VC维 = {measures.vc_dim_linear(d):6d}")

    # Neural-network VC-dimension upper bounds.
    print("\n  神经网络VC维上界:")
    for w in (1000, 10000, 100000, 1000000):
        print(f"    权重数 W={w:8d}: VC维上界 ≈ {measures.vc_dim_neural_network(w):10d}")

    # Rademacher complexity as the sample size grows.
    print("\n2. Rademacher复杂度分析")
    print("-" * 40)
    vc_dim = 100
    for m in (100, 1000, 10000, 100000):
        bound = measures.rademacher_complexity_bound(m, vc_dim)
        print(f"  样本数 m={m:6d}, VC维 d={vc_dim:3d}: "
              f"Rademacher上界 = {bound:.4f}")

    # Margin-based generalization bounds.
    print("\n3. 基于间隔的泛化界")
    print("-" * 40)
    m, rad = 1000, 0.1
    for gamma in (0.1, 0.5, 1.0, 2.0):
        print(f"  间隔 γ={gamma:.1f}: 泛化界 = {measures.margin_bound(rad, gamma, m):.4f}")

# Run the example
if __name__ == "__main__":
    complexity_analysis_example()
1.3 均匀收敛与泛化界¶
均匀收敛定理:对于假设空间 \(\mathcal{H}\),以至少 \(1-\delta\) 的概率:
McDiarmid不等式:设 \(f: \mathcal{X}^m \rightarrow \mathbb{R}\) 满足有界差分条件:
则:
基于覆盖数的泛化界:
其中 \(\mathcal{N}(\mathcal{H}, \epsilon, L_2)\) 是 \(L_2\) 覆盖数。
class UniformConvergence:
    """Uniform-convergence theory and associated generalization bounds."""

    @staticmethod
    def mcdiarmid_inequality(bounded_differences: list[float],
                             t: float) -> float:
        """McDiarmid's inequality.

        P(|f - E[f]| >= t) <= 2 * exp(-2t^2 / sum(c_i^2)).

        Args:
            bounded_differences: Bounded-difference constants c_i.
            t: Deviation threshold.

        Returns:
            Upper bound on the deviation probability. When all c_i are
            zero (or the list is empty) f is constant and the probability
            of any positive deviation is 0.
        """
        sum_c_squared = sum(c**2 for c in bounded_differences)
        if sum_c_squared == 0:
            # Guard: previously divided by zero; a constant function
            # never deviates from its expectation.
            return 0.0
        bound = 2 * np.exp(-2 * t**2 / sum_c_squared)
        return bound

    @staticmethod
    def covering_number_bound(m: int,
                              covering_number: int,
                              epsilon: float) -> float:
        """Covering-number-based bound on the Rademacher complexity.

        Args:
            m: Sample size.
            covering_number: Covering number N(H, epsilon).
            epsilon: Covering radius.

        Returns:
            Bound epsilon + sqrt(2 * ln N / m).
        """
        bound = epsilon + np.sqrt(2 * np.log(covering_number) / m)
        return bound

    @staticmethod
    def uniform_convergence_bound(rademacher: float,
                                  m: int,
                                  delta: float = 0.05) -> float:
        """Uniform-convergence bound.

        sup_h |R(h) - R_hat(h)| <= 2*R_m(H) + sqrt(ln(1/delta)/(2m)),
        with probability at least 1 - delta.
        """
        bound = 2 * rademacher + np.sqrt(np.log(1.0 / delta) / (2 * m))
        return bound

    @staticmethod
    def fat_shattering_dimension(margin: float,
                                 data: np.ndarray) -> int:
        """Fat-shattering dimension (VC-dimension analogue for real-valued classes).

        Args:
            margin: Margin parameter gamma.
            data: Data matrix (n_samples, n_features).

        Returns:
            Heuristic estimate: for linear classes fat_gamma = O(1/gamma^2),
            capped at the number of samples.
        """
        n_samples, n_features = data.shape
        # The fat-shattering dimension shrinks as the margin grows.
        fat_dim = int(n_features / (margin ** 2))
        return min(fat_dim, n_samples)
def uniform_convergence_example():
    """Illustrate McDiarmid, uniform-convergence, and fat-shattering bounds."""
    line = "=" * 60
    print("\n" + line)
    print("均匀收敛理论")
    print(line)
    theory = UniformConvergence()

    # McDiarmid concentration for the empirical risk.
    print("\n1. McDiarmid不等式")
    print("-" * 40)
    n = 1000
    diffs = [1.0 / n] * n  # each sample perturbs the mean by at most 1/n
    for t in (0.01, 0.05, 0.1, 0.2):
        p = theory.mcdiarmid_inequality(diffs, t)
        print(f"  偏离阈值 t={t:.2f}: P(|f-E[f]|>={t}) <= {p:.6f}")

    # Uniform-convergence bound as the sample size grows.
    print("\n2. 均匀收敛界")
    print("-" * 40)
    rad = 0.1
    for m in (100, 500, 1000, 5000):
        print(f"  样本数 m={m:4d}: 均匀收敛界 = {theory.uniform_convergence_bound(rad, m):.4f}")

    # Fat-shattering dimension for real-valued function classes.
    print("\n3. 胖粉碎维(实值函数类)")
    print("-" * 40)
    sample = np.random.randn(1000, 100)
    for gamma in (0.1, 0.5, 1.0, 2.0):
        print(f"  间隔 γ={gamma:.1f}: 胖粉碎维 ≈ {theory.fat_shattering_dimension(gamma, sample)}")

# Run the example
if __name__ == "__main__":
    uniform_convergence_example()
1.4 深度学习中的泛化之谜¶
泛化之谜现象:
深度神经网络通常具有: - 参数数量 \(>>\) 样本数量(过参数化) - VC维理论上无限或极大 - 但仍能泛化良好
可能的解释:
- 隐式正则化(Implicit Regularization)
  - 优化算法(如SGD)隐式偏好简单解
  - 梯度下降收敛到平坦极小值
- 双下降现象(Double Descent)
  - 传统U型风险曲线在过参数化后再次下降
  - 插值阈值后的"良性过拟合"
- PAC-Bayes理论
  - 考虑后验分布而非点估计
  - 与优化轨迹相关
class DeepGeneralizationMystery:
    """Analyses around the generalization puzzle of deep networks."""

    @staticmethod
    def effective_complexity(parameters: int,
                             training_epochs: int,
                             batch_size: int,
                             learning_rate: float) -> float:
        """Heuristic effective complexity of a trained network.

        The optimization process restricts which hypotheses are actually
        reachable, so the effective complexity is bounded by — and usually
        below — the raw parameter count.

        Args:
            parameters: Number of trainable parameters.
            training_epochs: Number of training epochs.
            batch_size: Mini-batch size.
            learning_rate: Learning rate.

        Returns:
            Heuristic effective-complexity estimate.
        """
        # Number of optimization steps drives the estimate.
        n_steps = training_epochs * (parameters / batch_size)
        # The learning rate modulates the complexity of the reached solution.
        rate_weight = np.tanh(learning_rate * 10)  # squashed to (0, 1)
        # Cap the estimate by the raw parameter count.
        return min(parameters, n_steps * rate_weight)

    @staticmethod
    def flat_minima_generalization(hessian_eigenvalues: np.ndarray,
                                   empirical_risk: float) -> float:
        """Score generalization from Hessian flatness.

        Flat minima (small trace and eigenvalue spread) tend to
        generalize better.

        Args:
            hessian_eigenvalues: Eigenvalues of the Hessian at the minimum.
            empirical_risk: Empirical risk at the minimum.

        Returns:
            Generalization score (higher is better).
        """
        spectrum_sum = np.sum(hessian_eigenvalues)      # trace of the Hessian
        spectrum_spread = np.var(hessian_eigenvalues)   # concentration of eigenvalues
        # Smaller trace and spread -> flatter minimum.
        flatness = 1.0 / (1.0 + spectrum_sum + spectrum_spread)
        # Combine flatness with training accuracy.
        return (1 - empirical_risk) * flatness

    @staticmethod
    def double_descent_curve(n_samples: int,
                             max_params: int,
                             noise_level: float = 0.1) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Synthesize a double-descent risk curve.

        Args:
            n_samples: Number of training samples.
            max_params: Largest model size to simulate.
            noise_level: Label-noise level.

        Returns:
            Tuple (parameter counts, train errors, test errors).
        """
        sizes = np.arange(1, max_params + 1)
        train_curve = []
        test_curve = []
        for p in sizes:
            if p < n_samples:
                # Under-parameterized regime: classical U-shaped risk.
                bias_term = np.exp(-p / (n_samples * 0.3))
                variance_term = 0.1 * (p / n_samples) ** 2
                risk = bias_term + variance_term + noise_level
            elif p == n_samples:
                # Interpolation threshold: risk peaks.
                risk = 0.5 + noise_level
            else:
                # Over-parameterized regime: benign overfitting.
                risk = noise_level * (n_samples / p) + 0.05
            # Training error decreases monotonically with model size.
            train_curve.append(max(0.01, noise_level * np.exp(-p / n_samples)))
            test_curve.append(risk)
        return sizes, np.array(train_curve), np.array(test_curve)

    @staticmethod
    def pac_bayes_bound(empirical_risk: float,
                        kl_divergence: float,
                        m: int,
                        delta: float = 0.05) -> float:
        """PAC-Bayes generalization bound.

        Args:
            empirical_risk: Empirical risk of the posterior Q.
            kl_divergence: KL divergence D(Q || P) to the prior.
            m: Sample size.
            delta: Confidence parameter.

        Returns:
            PAC-Bayes upper bound on the true risk.
        """
        complexity = (kl_divergence + np.log(2 * np.sqrt(m) / delta)) / m
        return empirical_risk + np.sqrt(complexity / 2)
def deep_generalization_example():
    """Demonstrate the generalization-mystery analyses."""
    bar = "=" * 60
    print("\n" + bar)
    print("深度学习泛化之谜")
    print(bar)
    mystery = DeepGeneralizationMystery()

    # Effective complexity for a few training configurations.
    print("\n1. 有效复杂度分析")
    print("-" * 40)
    settings = [
        {"params": 10000, "epochs": 100, "batch": 32, "lr": 0.001},
        {"params": 1000000, "epochs": 50, "batch": 128, "lr": 0.01},
        {"params": 100000000, "epochs": 10, "batch": 256, "lr": 0.1},
    ]
    for setting in settings:
        effective = mystery.effective_complexity(
            setting["params"], setting["epochs"],
            setting["batch"], setting["lr"]
        )
        ratio = effective / setting["params"]
        print(f"  参数: {setting['params']:10d}, 有效复杂度: {effective:12.1f}, "
              f"比例: {ratio:.4f}")

    # Double-descent curve around the interpolation threshold.
    print("\n2. 双下降现象")
    print("-" * 40)
    n_samples, max_params = 100, 300
    _params, _train_err, test_err = mystery.double_descent_curve(
        n_samples, max_params, noise_level=0.1)
    print(f"  样本数: {n_samples}")
    print(f"  插值阈值(参数数=样本数): {n_samples}")
    print(f"  阈值处测试误差: {test_err[n_samples-1]:.4f}")
    print(f"  最大参数处测试误差: {test_err[-1]:.4f}")

    # Flat vs sharp minima.
    print("\n3. 平坦极小值分析")
    print("-" * 40)
    sharp_eigenvalues = np.array([10, 8, 5, 3, 2, 1, 0.5, 0.1])       # sharp minimum
    flat_eigenvalues = np.array([2, 1.5, 1, 0.8, 0.5, 0.3, 0.2, 0.1])  # flat minimum
    sharp_score = mystery.flat_minima_generalization(sharp_eigenvalues, 0.05)
    flat_score = mystery.flat_minima_generalization(flat_eigenvalues, 0.05)
    print(f"  尖锐极小值泛化评分: {sharp_score:.4f}")
    print(f"  平坦极小值泛化评分: {flat_score:.4f}")
    print(f"  平坦/尖锐比值: {flat_score/sharp_score:.2f}x")

    # PAC-Bayes bound as the KL term grows.
    print("\n4. PAC-Bayes界")
    print("-" * 40)
    for kl in (1, 10, 100, 1000):
        print(f"  KL散度={kl:4d}: PAC-Bayes界 = {mystery.pac_bayes_bound(0.05, kl, m=1000):.4f}")

# Run the example
if __name__ == "__main__":
    deep_generalization_example()
2. 表示学习理论¶
2.1 流形假设与数据表示¶
流形假设:高维数据(如图像、文本)实际分布在低维流形上。
数学表述:
设数据 \(x \in \mathbb{R}^d\) 实际来自 \(k\) 维流形 \(\mathcal{M} \subset \mathbb{R}^d\),其中 \(k << d\)。
局部线性嵌入(LLE):
- 为每个点找到 \(k\) 近邻
- 计算重构权重 \(W_{ij}\),最小化: $\(\mathcal{E}(W) = \sum_i \|x_i - \sum_j W_{ij}x_j\|^2\)$
- 在低维空间寻找嵌入 \(Y\),保持权重: $\(\Phi(Y) = \sum_i \|y_i - \sum_j W_{ij}y_j\|^2\)$
t-SNE优化目标:
其中 \(p_{ij}\) 是高维空间中的相似度,\(q_{ij}\) 是低维空间中的相似度。
class ManifoldLearning:
"""
流形学习理论与实现
"""
@staticmethod
def lle(X: np.ndarray, n_components: int = 2, n_neighbors: int = 10) -> np.ndarray:
"""
局部线性嵌入 (LLE)
Args:
X: 高维数据 (n_samples, n_features)
n_components: 目标维度
n_neighbors: 近邻数
Returns:
低维嵌入 (n_samples, n_components)
"""
n_samples = X.shape[0]
# 1. 计算k近邻
from scipy.spatial.distance import cdist
distances = cdist(X, X)
neighbors = np.argsort(distances, axis=1)[:, 1:n_neighbors+1]
# 2. 计算重构权重
W = np.zeros((n_samples, n_samples))
for i in range(n_samples):
# 中心化的近邻点
Xi = X[neighbors[i]] - X[i]
# 局部协方差矩阵
C = Xi @ Xi.T
# 添加正则化
C += np.eye(n_neighbors) * 1e-3 * np.trace(C)
# 求解权重
w = np.linalg.solve(C, np.ones(n_neighbors))
w /= np.sum(w) # 归一化
W[i, neighbors[i]] = w
# 3. 计算低维嵌入
M = (np.eye(n_samples) - W).T @ (np.eye(n_samples) - W)
# 特征值分解
eigenvalues, eigenvectors = np.linalg.eigh(M)
# 选择最小的非零特征值对应的特征向量
Y = eigenvectors[:, 1:n_components+1]
return Y
@staticmethod
def intrinsic_dimension(X: np.ndarray,
k: int = 10,
method: str = 'correlation') -> float:
"""
估计数据的内在维度
Args:
X: 数据矩阵
k: 近邻数
method: 估计方法
Returns:
内在维度估计
"""
from scipy.spatial.distance import cdist
n_samples = X.shape[0]
distances = cdist(X, X)
# 对每个点计算到k近邻的距离
neighbor_distances = np.sort(distances, axis=1)[:, 1:k+1]
if method == 'correlation':
# 基于距离相关性的方法
# 在d维流形上,体积按r^d增长
# 计算平均距离
r = np.mean(neighbor_distances, axis=0)
k_values = np.arange(1, k+1)
# 拟合 log(k) ~ d * log(r)
log_r = np.log(r + 1e-10)
log_k = np.log(k_values)
# 线性回归估计斜率
slope = np.polyfit(log_r, log_k, 1)[0]
return slope
elif method == 'mle':
# 最大似然估计
# d_hat = (1/(N-1)) * sum(log(r_k / r_i))^{-1}
dims = []
for i in range(n_samples):
r_i = neighbor_distances[i, :-1]
r_k = neighbor_distances[i, -1]
if r_k > 0:
d_hat = 1.0 / (np.mean(np.log(r_k / (r_i + 1e-10))) + 1e-10)
dims.append(d_hat)
return np.median(dims)
return 0.0
@staticmethod
def manifold_hypothesis_validation(X: np.ndarray,
embedding_methods: list[str] = None) -> dict:
"""
验证流形假设
Args:
X: 数据矩阵
embedding_methods: 嵌入方法列表
Returns:
验证结果字典
"""
if embedding_methods is None:
embedding_methods = ['pca', 'lle', 'tsne']
results = {}
# 估计内在维度
intrinsic_dim = ManifoldLearning.intrinsic_dimension(X)
results['intrinsic_dimension'] = intrinsic_dim
results['ambient_dimension'] = X.shape[1]
results['dimension_ratio'] = intrinsic_dim / X.shape[1]
print(f"环境维度: {X.shape[1]}")
print(f"估计内在维度: {intrinsic_dim:.2f}")
print(f"维度比: {intrinsic_dim/X.shape[1]:.4f}")
return results
def manifold_learning_example():
    """Run the manifold-learning demo on the Swiss-roll dataset."""
    divider = "=" * 60
    print("\n" + divider)
    print("流形学习理论")
    print(divider)
    learner = ManifoldLearning()

    # Swiss-roll data: a 2-D manifold embedded in 3-D space.
    from sklearn.datasets import make_swiss_roll
    data, _color = make_swiss_roll(n_samples=1000, noise=0.1, random_state=42)

    print("\n1. 瑞士卷数据流形分析")
    print("-" * 40)
    print(f"  数据形状: {data.shape}")
    est_dim = learner.intrinsic_dimension(data, k=10)
    print(f"  估计内在维度: {est_dim:.2f} (真实: 2)")

    print("\n2. LLE降维")
    print("-" * 40)
    embedded = learner.lle(data, n_components=2, n_neighbors=10)
    print(f"  降维后形状: {embedded.shape}")

    print("\n3. 流形假设验证")
    print("-" * 40)
    summary = learner.manifold_hypothesis_validation(data)
    print(f"  维度压缩比: {summary['dimension_ratio']:.4f}")
    return data, embedded

# Run the example
if __name__ == "__main__":
    X_swiss, X_lle = manifold_learning_example()
2.2 分布式表示与层次表示¶
分布式表示:信息分布在多个神经元/特征上,每个神经元参与表示多个概念。
优点: - 指数级表示能力:\(n\) 个二元神经元可表示 \(2^n\) 个模式 - 泛化:相似概念有相似表示 - 组合性:可以组合基本特征表示复杂概念
层次表示:
深度网络学习层次特征: - 底层:边缘、颜色、纹理 - 中层:形状、部件 - 高层:物体、语义
数学分析:
设 \(h^{(l)}\) 为第 \(l\) 层的表示,则:
表示能力定理:
具有 \(L\) 层、每层 \(n\) 个神经元的网络,其可表示的函数类大小为 \(O(2^{n^L})\)。
class DistributedRepresentation:
    """Analysis of distributed and hierarchical representations."""

    @staticmethod
    def representation_capacity(n_neurons: int,
                                n_layers: int,
                                binary: bool = True) -> int:
        """Number of patterns a layered network can represent.

        Args:
            n_neurons: Neurons per layer.
            n_layers: Number of layers.
            binary: Whether activations are binary.

        Returns:
            Count of representable patterns.
        """
        if binary:
            # Binary activations: 2^n states per layer, combined over layers.
            patterns_per_layer = 2 ** n_neurons
            total_capacity = patterns_per_layer ** n_layers
        else:
            # Continuous activations: bounded only by numeric precision.
            precision = 16  # assume 16-bit activations
            states_per_neuron = 2 ** precision
            total_capacity = states_per_neuron ** (n_neurons * n_layers)
        return total_capacity

    @staticmethod
    def compositionality_analysis(features: np.ndarray,
                                  compositions: list[tuple[int, ...]]) -> dict:
        """Measure how strongly feature groups act compositionally.

        Args:
            features: Feature matrix (n_samples, n_features).
            compositions: Tuples of feature indices forming a composition;
                only tuples with at least two indices are scored.

        Returns:
            Dict with 'mean_composition_score' (0.0 when no scorable
            composition is given) and 'feature_correlation_mean'.
        """
        results = {}
        # Pairwise feature correlations.
        feature_corr = np.corrcoef(features.T)
        composition_scores = []
        for comp in compositions:
            if len(comp) >= 2:
                comp_features = features[:, comp]
                # Variance of the summed group vs. sum of individual
                # variances: a ratio > 1 indicates a composition effect.
                joint_variance = np.var(np.sum(comp_features, axis=1))
                individual_variances = [np.var(features[:, i]) for i in comp]
                composition_score = joint_variance / (sum(individual_variances) + 1e-10)
                composition_scores.append(composition_score)
        # Guard: np.mean([]) would warn and return NaN.
        results['mean_composition_score'] = (
            np.mean(composition_scores) if composition_scores else 0.0)
        results['feature_correlation_mean'] = np.mean(np.abs(feature_corr[np.triu_indices_from(feature_corr, k=1)]))
        return results

    @staticmethod
    def hierarchical_decomposition(X: np.ndarray,
                                   layer_sizes: list[int]) -> list[np.ndarray]:
        """Decompose data into successively smaller PCA representations.

        Args:
            X: Input data.
            layer_sizes: Target dimensionality of each layer.

        Returns:
            List of representations, starting with X itself.
        """
        representations = [X]
        current = X
        for size in layer_sizes:
            # PCA serves as a simple stand-in for a learned layer.
            from sklearn.decomposition import PCA
            pca = PCA(n_components=min(size, current.shape[1]))
            current = pca.fit_transform(current)
            representations.append(current)
        return representations

    @staticmethod
    def disentanglement_score(representations: np.ndarray,
                              ground_truth_factors: np.ndarray) -> float:
        """Score how well representation dims align with generative factors.

        Args:
            representations: Learned representation (n_samples, n_features).
            ground_truth_factors: True factors (n_samples, n_factors).

        Returns:
            Disentanglement score in [0, 1].
        """
        from sklearn.ensemble import RandomForestRegressor  # unused r2_score import removed
        n_factors = ground_truth_factors.shape[1]
        n_features = representations.shape[1]
        # Per-factor feature importances from a random forest.
        importance_matrix = np.zeros((n_factors, n_features))
        for i in range(n_factors):
            rf = RandomForestRegressor(n_estimators=10, random_state=42)
            rf.fit(representations, ground_truth_factors[:, i])
            importance_matrix[i] = rf.feature_importances_
        # A factor is disentangled when one feature dominates its row.
        max_importance_per_factor = np.max(importance_matrix, axis=1)
        disentanglement = np.mean(max_importance_per_factor)
        return disentanglement
def distributed_representation_example():
    """Demonstrate capacity, hierarchy, and compositionality analyses."""
    rule = "=" * 60
    print("\n" + rule)
    print("分布式表示与层次表示")
    print(rule)
    analyzer = DistributedRepresentation()

    # Representation capacity as a function of width and depth.
    print("\n1. 表示能力分析")
    print("-" * 40)
    settings = [
        {"neurons": 100, "layers": 1},
        {"neurons": 100, "layers": 3},
        {"neurons": 1000, "layers": 3},
        {"neurons": 1000, "layers": 10},
    ]
    for setting in settings:
        capacity = analyzer.representation_capacity(setting["neurons"], setting["layers"])
        log_capacity = np.log10(capacity) if capacity > 0 else 0
        print(f"  神经元={setting['neurons']:4d}, 层数={setting['layers']:2d}: "
              f"log10(容量) = {log_capacity:.2f}")

    # Hierarchical decomposition of synthetic layered data.
    print("\n2. 层次分解")
    print("-" * 40)
    np.random.seed(42)
    n_samples = 1000
    low_level = np.random.randn(n_samples, 100)          # low level (e.g. edges)
    mid_level = np.hstack([                              # mid level (e.g. shapes)
        np.mean(low_level[:, :50], axis=1, keepdims=True),
        np.std(low_level[:, 50:], axis=1, keepdims=True),
    ])
    high_level = np.hstack([                             # high level (e.g. objects)
        (mid_level[:, 0] > 0).astype(float).reshape(-1, 1),
        (mid_level[:, 1] < 0.5).astype(float).reshape(-1, 1),
    ])
    X = np.hstack([low_level, mid_level, high_level])
    layers = analyzer.hierarchical_decomposition(X, [50, 20, 5])
    print(f"  原始数据维度: {X.shape}")
    for depth, layer in enumerate(layers[1:], 1):
        print(f"  第{depth}层表示维度: {layer.shape}")

    # Compositionality of the first ten features.
    print("\n3. 组合性分析")
    print("-" * 40)
    groupings = [(0, 1), (2, 3), (4, 5, 6)]
    outcome = analyzer.compositionality_analysis(X[:, :10], groupings)
    print(f"  平均组合分数: {outcome['mean_composition_score']:.4f}")
    print(f"  特征相关性: {outcome['feature_correlation_mean']:.4f}")

# Run the example
if __name__ == "__main__":
    distributed_representation_example()
2.3 表示学习的理论保证¶
表示学习的目标:
找到映射 \(\phi: \mathcal{X} \rightarrow \mathcal{Z}\),使得:
- 信息量保持:\(I(X; Z)\) 最大化
- 下游任务友好:\(Z\) 对目标任务 \(Y\) 有预测性
- 可解释性:\(Z\) 的维度有语义意义
信息瓶颈理论:
\[ \min_{p(z|x)} \; I(X; Z) - \beta\, I(Z; Y) \]
- \(I(X; Z)\):压缩(最小化)
- \(I(Z; Y)\):预测性(最大化)
- \(\beta\):权衡参数
对比学习理论:
InfoNCE损失最小化等价于最大化表示与目标之间的互信息下界:
class RepresentationLearningTheory:
    """Theoretical analyses for representation learning."""

    @staticmethod
    def mutual_information_lower_bound(X: np.ndarray,
                                       Z: np.ndarray,
                                       method: str = 'knn') -> float:
        """Estimate a lower bound on the mutual information I(X; Z).

        Args:
            X: Original data.
            Z: Learned representation.
            method: Estimation method; 'knn' uses a simplified
                Kraskov–Stögbauer–Grassberger-style estimate.

        Returns:
            Mutual-information estimate (0.0 for unknown methods).
        """
        if method != 'knn':
            return 0.0
        from sklearn.neighbors import NearestNeighbors
        n = len(X)
        k = min(5, n // 10)
        joint = np.hstack([X, Z])  # joint space (X, Z)
        # Neighbour "diameters" in each of the three spaces.
        radii = []
        for space in (X, Z, joint):
            nbrs = NearestNeighbors(n_neighbors=k + 1).fit(space)
            dist, _ = nbrs.kneighbors(space)
            radii.append(2 * dist[:, k])
        eps_x, eps_z, eps_joint = radii
        # Simplified KSG combination of the log-radii.
        estimate = (np.mean(np.log(eps_x + 1e-10)) +
                    np.mean(np.log(eps_z + 1e-10)) -
                    np.mean(np.log(eps_joint + 1e-10)))
        return max(0, estimate)

    @staticmethod
    def information_bottleneck_objective(X: np.ndarray,
                                         Y: np.ndarray,
                                         Z: np.ndarray,
                                         beta: float = 1.0) -> float:
        """Information-bottleneck objective: I(X; Z) - beta * I(Z; Y).

        Args:
            X: Input data.
            Y: Target.
            Z: Representation.
            beta: Trade-off parameter.

        Returns:
            Value of the IB objective (lower is better).
        """
        estimate = RepresentationLearningTheory.mutual_information_lower_bound
        target = Y.reshape(-1, 1) if Y.ndim == 1 else Y
        return estimate(X, Z) - beta * estimate(Z, target)

    @staticmethod
    def contrastive_learning_bound(n_negative: int) -> float:
        """Constant appearing in the InfoNCE mutual-information bound.

        Args:
            n_negative: Number of negative samples.

        Returns:
            The constant -log(N).
        """
        return -np.log(n_negative)

    @staticmethod
    def representation_quality_metrics(Z: np.ndarray,
                                       Y: np.ndarray,
                                       task_type: str = 'classification') -> dict:
        """Compute several quality indicators for a representation.

        Args:
            Z: Representation matrix.
            Y: Labels.
            task_type: Task type ('classification' enables the
                label-dependent metrics).

        Returns:
            Dict of metric name -> value.
        """
        quality = {}
        # 1. Linear separability via cross-validated logistic regression.
        from sklearn.linear_model import LogisticRegression
        from sklearn.model_selection import cross_val_score
        if task_type == 'classification':
            model = LogisticRegression(max_iter=1000, random_state=42)
            quality['linear_separability'] = np.mean(cross_val_score(model, Z, Y, cv=5))
        # 2. Compactness: entropy of the PCA variance spectrum.
        from sklearn.decomposition import PCA
        spectrum = PCA().fit(Z).explained_variance_ratio_
        spectral_entropy = -np.sum(spectrum * np.log(spectrum + 1e-10))
        quality['effective_dimensionality'] = np.exp(spectral_entropy)
        quality['explained_variance_top10'] = np.sum(spectrum[:10])
        # 3. Between-/within-class scatter (Fisher ratio).
        if task_type == 'classification':
            labels = np.unique(Y)
            grand_mean = np.mean(Z, axis=0)
            dim = Z.shape[1]
            between = np.zeros((dim, dim))
            within = np.zeros((dim, dim))
            for label in labels:
                members = Z[Y == label]
                centroid = np.mean(members, axis=0)
                offset = centroid - grand_mean
                between += len(members) * np.outer(offset, offset)
                within += np.cov(members.T) * (len(members) - 1)
            if np.linalg.det(within) > 1e-10:
                quality['fisher_ratio'] = np.trace(between) / np.trace(within)
        return quality
def representation_theory_example():
    """Demonstrate mutual-information and representation-quality analyses."""
    bar = "=" * 60
    print("\n" + bar)
    print("表示学习理论")
    print(bar)
    theory = RepresentationLearningTheory()

    # Synthetic data: labels depend on the first two input dimensions.
    np.random.seed(42)
    n_samples = 500
    X = np.random.randn(n_samples, 50)
    Y = (X[:, 0] + X[:, 1] > 0).astype(int)
    Z = np.hstack([
        X[:, :2] + 0.1 * np.random.randn(n_samples, 2),  # informative part
        0.1 * np.random.randn(n_samples, 8)              # pure noise
    ])

    print("\n1. 互信息分析")
    print("-" * 40)
    mi_xz = theory.mutual_information_lower_bound(X, Z)
    mi_zy = theory.mutual_information_lower_bound(Z, Y.reshape(-1, 1))
    print(f"  I(X; Z) ≈ {mi_xz:.4f}")
    print(f"  I(Z; Y) ≈ {mi_zy:.4f}")

    print("\n2. 信息瓶颈分析")
    print("-" * 40)
    for beta in (0.1, 1.0, 10.0):
        objective = theory.information_bottleneck_objective(X, Y, Z, beta)
        print(f"  β={beta:5.1f}: IB目标 = {objective:.4f}")

    print("\n3. 对比学习理论界")
    print("-" * 40)
    for n_neg in (10, 100, 1000, 10000):
        print(f"  负样本数={n_neg:5d}: 下界 = {theory.contrastive_learning_bound(n_neg):.4f}")

    print("\n4. 表示质量指标")
    print("-" * 40)
    for name, score in theory.representation_quality_metrics(Z, Y).items():
        print(f"  {name}: {score:.4f}")

# Run the example
if __name__ == "__main__":
    representation_theory_example()
3. 优化景观分析¶
3.1 临界点分析¶
临界点定义:损失函数 \(\mathcal{L}(\theta)\) 的梯度为零的点:
\[ \nabla_{\theta}\mathcal{L}(\theta^*) = 0 \]
临界点分类:
- 局部极小值:Hessian矩阵正定(所有特征值 > 0)
- 局部极大值:Hessian矩阵负定(所有特征值 < 0)
- 鞍点:Hessian矩阵不定(有正有负特征值)
Hessian矩阵:
临界点数量定理:
对于深度网络,临界点数量随参数数量指数增长,但鞍点占主导地位。
class CriticalPointAnalysis:
    """Critical-point classification and loss-landscape analysis.

    NOTE(review): this class references ``torch`` in its method
    annotations, so the module must import torch at the top level.
    """

    @staticmethod
    def classify_critical_point(hessian_eigenvalues: np.ndarray,
                                tolerance: float = 1e-6) -> str:
        """Classify a critical point from the Hessian eigenvalue signs.

        Args:
            hessian_eigenvalues: Eigenvalues of the Hessian at the point.
            tolerance: Numerical tolerance for treating a value as zero.

        Returns:
            One of "local_minimum", "local_maximum", "saddle",
            "degenerate" (more than 10% near-zero eigenvalues), or
            "unknown".
        """
        n_total = len(hessian_eigenvalues)
        if n_total == 0:
            # Guard: an empty spectrum carries no information
            # (previously this fell through and reported "local_minimum").
            return "unknown"
        n_positive = np.sum(hessian_eigenvalues > tolerance)
        n_negative = np.sum(hessian_eigenvalues < -tolerance)
        n_zero = n_total - n_positive - n_negative
        if n_zero > n_total * 0.1:
            return "degenerate"        # too many near-zero directions
        elif n_positive > 0 and n_negative > 0:
            return "saddle"            # mixed curvature
        elif n_positive == n_total:
            return "local_minimum"     # positive definite Hessian
        elif n_negative == n_total:
            return "local_maximum"     # negative definite Hessian
        else:
            return "unknown"

    @staticmethod
    def hessian_spectrum_analysis(model: torch.nn.Module,
                                  data_loader: torch.utils.data.DataLoader,
                                  loss_fn: Callable,
                                  n_top_eigenvalues: int = 20) -> dict:
        """Estimate the dominant Hessian eigenvalue via power iteration.

        Args:
            model: PyTorch model.
            data_loader: Data loader; only the first batch is actually used.
            loss_fn: Loss function.
            n_top_eigenvalues: Currently unused; only the largest
                eigenvalue is estimated.

        Returns:
            Dict with 'max_eigenvalue' and 'gradient_norm'.
        """
        model.eval()  # evaluation mode (disables dropout etc.)
        # Gradient at the current parameters, from a single batch.
        for batch in data_loader:
            inputs, targets = batch
            model.zero_grad()  # clear any stale gradients
            outputs = model(inputs)
            loss = loss_fn(outputs, targets)
            loss.backward()
            grads = []
            for param in model.parameters():
                if param.grad is not None:
                    grads.append(param.grad.flatten())
            gradient = torch.cat(grads)
            break  # one batch is enough for this estimate

        def hessian_vector_product(v):
            """Hessian-vector product H @ v via double backprop (first batch only)."""
            model.zero_grad()
            for batch in data_loader:
                inputs, targets = batch
                outputs = model(inputs)
                loss = loss_fn(outputs, targets)
                # First-order gradients, with graph kept for the second pass.
                grads = torch.autograd.grad(loss, model.parameters(), create_graph=True)
                flat_grads = torch.cat([g.flatten() for g in grads])
                # Inner product with v, then differentiate again.
                grad_v = torch.sum(flat_grads * v)
                hvp = torch.autograd.grad(grad_v, model.parameters(), retain_graph=True)
                flat_hvp = torch.cat([g.flatten() for g in hvp])
                return flat_hvp  # returns after the first batch by design

        # Power iteration for the largest eigenvalue.
        v = torch.randn_like(gradient)
        v = v / torch.norm(v)
        for _ in range(50):
            Hv = hessian_vector_product(v)
            v = Hv / torch.norm(Hv)
        # Rayleigh quotient gives the eigenvalue estimate.
        Hv = hessian_vector_product(v)
        max_eigenvalue = torch.dot(v, Hv).item()
        results = {
            'max_eigenvalue': max_eigenvalue,
            'gradient_norm': torch.norm(gradient).item(),
        }
        return results

    @staticmethod
    def loss_landscape_1d(model: torch.nn.Module,
                          direction: torch.Tensor,
                          data_loader: torch.utils.data.DataLoader,
                          loss_fn: Callable,
                          alpha_range: tuple[float, float] = (-1, 1),
                          n_points: int = 50) -> tuple[np.ndarray, np.ndarray]:
        """Evaluate the loss along a 1-D direction in parameter space.

        Args:
            model: Model whose parameters are perturbed (restored on exit).
            direction: Flat direction vector matching the parameter count.
            data_loader: Data loader for the loss evaluation.
            loss_fn: Loss function.
            alpha_range: Interval of step sizes along the direction.
            n_points: Number of evaluation points.

        Returns:
            Tuple (alpha values, average loss at each alpha).
        """
        # Snapshot the parameters so they can be restored afterwards.
        original_params = {}
        for name, param in model.named_parameters():
            original_params[name] = param.data.clone()
        alphas = np.linspace(alpha_range[0], alpha_range[1], n_points)
        losses = []
        for alpha in alphas:
            # Shift every parameter tensor by alpha along its slice of
            # the flat direction vector.
            idx = 0
            for name, param in model.named_parameters():
                param_size = param.numel()
                param_direction = direction[idx:idx+param_size].reshape(param.shape)
                param.data = original_params[name] + alpha * param_direction
                idx += param_size
            # Average loss over the whole loader at this offset.
            total_loss = 0
            n_batches = 0
            with torch.no_grad():  # no gradients needed for evaluation
                for batch in data_loader:
                    inputs, targets = batch
                    outputs = model(inputs)
                    loss = loss_fn(outputs, targets)
                    total_loss += loss.item()
                    n_batches += 1
            losses.append(total_loss / n_batches)
        # Restore the original parameters.
        for name, param in model.named_parameters():
            param.data = original_params[name]
        return alphas, np.array(losses)
def critical_point_example():
    """Classify sample Hessian spectra and inspect high-dimensional cases."""
    edge = "=" * 60
    print("\n" + edge)
    print("临界点分析与优化景观")
    print(edge)
    analyzer = CriticalPointAnalysis()

    # Spectra covering each classification outcome.
    print("\n1. 临界点分类")
    print("-" * 40)
    min_eigenvalues = np.array([1.0, 0.5, 0.3, 0.1])
    print(f"  特征值 {min_eigenvalues}: {analyzer.classify_critical_point(min_eigenvalues)}")
    max_eigenvalues = np.array([-1.0, -0.5, -0.3, -0.1])
    print(f"  特征值 {max_eigenvalues}: {analyzer.classify_critical_point(max_eigenvalues)}")
    saddle_eigenvalues = np.array([1.0, 0.5, -0.3, -0.1])
    print(f"  特征值 {saddle_eigenvalues}: {analyzer.classify_critical_point(saddle_eigenvalues)}")
    degenerate_eigenvalues = np.array([1.0, 0.0, 0.0, -0.1])
    print(f"  特征值 {degenerate_eigenvalues}: {analyzer.classify_critical_point(degenerate_eigenvalues)}")

    # High-dimensional spectra: mostly near-zero eigenvalues with one
    # clearly positive and one clearly negative direction (saddle-like).
    print("\n2. 高维Hessian分析")
    print("-" * 40)
    for dim in (10, 100, 1000, 10000):
        spectrum = np.random.randn(dim) * 0.1
        spectrum[0] = 1.0
        spectrum[1] = -0.5
        kind = analyzer.classify_critical_point(spectrum)
        pos = np.sum(spectrum > 1e-6)
        neg = np.sum(spectrum < -1e-6)
        print(f"  维度={dim:5d}: 类型={kind:15s}, "
              f"正特征值={pos:4d}, 负特征值={neg:4d}")

# Run the example
if __name__ == "__main__":
    critical_point_example()
3.2 鞍点问题与逃离¶
鞍点问题:在高维非凸优化中,鞍点比局部极小值更常见。
鞍点比例定理:对于高维高斯随机场,鞍点数量按维度指数增长,而局部极小值比例趋于零。
逃离鞍点的方法:
- 随机梯度下降(SGD):噪声自然帮助逃离鞍点
- 扰动梯度下降(PGD):添加随机扰动
- 二阶方法:利用Hessian信息
- 负曲率利用:沿负曲率方向更新
逃离时间分析:
对于严格鞍点(Hessian有负特征值),SGD以高概率在 \(O(\log(1/\epsilon))\) 步内逃离。
class SaddlePointEscape:
    """
    Saddle-point escape: detection, escape directions, and update rules.
    """

    @staticmethod
    def is_strict_saddle(hessian_eigenvalues: np.ndarray,
                         tolerance: float = 1e-3) -> bool:
        """
        Decide whether a spectrum describes a strict saddle point.

        A strict saddle has at least one eigenvalue below ``-tolerance``.

        Args:
            hessian_eigenvalues: Hessian eigenvalues at the critical point.
            tolerance: negativity threshold.

        Returns:
            True when some eigenvalue is sufficiently negative.
        """
        return np.any(hessian_eigenvalues < -tolerance)

    @staticmethod
    def escape_direction(hessian_eigenvalues: np.ndarray,
                         hessian_eigenvectors: np.ndarray) -> np.ndarray:
        """
        Pick a direction along which to leave a saddle point.

        Uses the eigenvector of the most negative eigenvalue; when the
        point has no negative curvature, falls back to a random direction.

        Args:
            hessian_eigenvalues: Hessian eigenvalues.
            hessian_eigenvectors: matching eigenvectors (as columns).

        Returns:
            Escape direction vector.
        """
        most_negative = np.argmin(hessian_eigenvalues)
        if hessian_eigenvalues[most_negative] >= 0:
            # Not a saddle: no curvature to exploit, use a random direction.
            return np.random.randn(len(hessian_eigenvalues))
        return hessian_eigenvectors[:, most_negative]

    @staticmethod
    def perturbed_gradient_descent_step(params: np.ndarray,
                                        gradient: np.ndarray,
                                        learning_rate: float,
                                        perturbation_scale: float = 0.1) -> np.ndarray:
        """
        One step of perturbed gradient descent (PGD).

        Args:
            params: current parameters.
            gradient: loss gradient at ``params``.
            learning_rate: step size.
            perturbation_scale: std-dev of the injected Gaussian noise.

        Returns:
            Updated parameters (stochastic; RNG not seeded here).
        """
        noise = np.random.randn(len(params)) * perturbation_scale
        return params - learning_rate * gradient + noise

    @staticmethod
    def cubic_regularization_step(params: np.ndarray,
                                  gradient: np.ndarray,
                                  hessian: np.ndarray,
                                  learning_rate: float = 0.01,
                                  regularization: float = 0.1) -> np.ndarray:
        """
        Newton-like step against a positive-definite surrogate Hessian.

        Negative eigenvalues are flipped to positive magnitude (plus a
        margin), so the resulting direction moves away from saddle points.

        Args:
            params: current parameters.
            gradient: loss gradient.
            hessian: Hessian matrix.
            learning_rate: step size.
            regularization: positive margin added to all eigenvalues.

        Returns:
            Updated parameters.
        """
        eigvals, eigvecs = np.linalg.eigh(hessian)
        # |lambda| + reg guarantees a positive-definite surrogate.
        surrogate = eigvecs @ np.diag(np.abs(eigvals) + regularization) @ eigvecs.T
        newton_step = np.linalg.solve(surrogate, gradient)
        return params - learning_rate * newton_step

    @staticmethod
    def estimate_escape_time(initial_distance: float,
                             negative_curvature: float,
                             learning_rate: float,
                             noise_scale: float) -> float:
        """
        Rough escape-time estimate under an exponential-divergence model.

        Model: d(t) = d0 * exp(|lambda| t); escape when d(t) reaches 1, so
        t = log(1 / d0) / |lambda|.  ``learning_rate`` and ``noise_scale``
        are accepted for API symmetry but unused by this simplified model.

        Args:
            initial_distance: initial offset d0 from the saddle.
            negative_curvature: magnitude |lambda| of the negative curvature.
            learning_rate: unused in this simplified estimate.
            noise_scale: unused in this simplified estimate.

        Returns:
            Estimated step count (inf when curvature is non-positive).
        """
        target_distance = 1.0
        if negative_curvature <= 0:
            return np.inf
        return np.log(target_distance / initial_distance) / negative_curvature
def saddle_point_escape_example():
    """Demo: saddle-point detection, escape directions, and escape times."""
    print("\n" + "=" * 60)
    print("鞍点逃离理论")
    print("=" * 60)
    spe = SaddlePointEscape()
    # --- Strict-saddle detection ---
    print("\n1. 严格鞍点检测")
    print("-" * 40)
    test_cases = [
        np.array([1.0, 0.5, 0.1]),  # local minimum
        np.array([1.0, -0.5, 0.1]),  # strict saddle
        np.array([-1.0, -0.5, -0.1]),  # local maximum
        np.array([0.1, 0.01, -0.01]),  # near-degenerate saddle
    ]
    for i, eigenvalues in enumerate(test_cases):
        is_strict = spe.is_strict_saddle(eigenvalues)
        print(f" 特征值 {eigenvalues}: 严格鞍点 = {is_strict}")
    # --- Escape-direction computation ---
    print("\n2. 逃离方向计算")
    print("-" * 40)
    eigenvalues = np.array([2.0, 1.0, -0.5, -1.5])
    eigenvectors = np.eye(4)  # identity as the eigenvector basis
    escape_dir = spe.escape_direction(eigenvalues, eigenvectors)
    print(f" 特征值: {eigenvalues}")
    print(f" 逃离方向: {escape_dir}")
    print(f" 最负特征值索引: {np.argmin(eigenvalues)}")
    # --- Escape-time estimates for increasing curvature ---
    print("\n3. 逃离时间估计")
    print("-" * 40)
    scenarios = [
        {"curvature": 0.1, "lr": 0.01, "noise": 0.1},
        {"curvature": 0.5, "lr": 0.01, "noise": 0.1},
        {"curvature": 1.0, "lr": 0.01, "noise": 0.1},
    ]
    for scenario in scenarios:
        escape_time = spe.estimate_escape_time(
            initial_distance=0.01,
            negative_curvature=scenario["curvature"],
            learning_rate=scenario["lr"],
            noise_scale=scenario["noise"]
        )
        print(f" 曲率={scenario['curvature']:.1f}: 估计逃离时间 ≈ {escape_time:.2f} 步")
# Run the demo when this module is executed as a script
if __name__ == "__main__":
    saddle_point_escape_example()
3.3 平坦极小值与泛化¶
平坦极小值:损失函数在极小值点附近的曲率较小。
数学定义:
- 几何平坦性:\(\|\nabla^2 \mathcal{L}(\theta)\|\) 小
- 权重扰动鲁棒性:\(\mathcal{L}(\theta + \delta) - \mathcal{L}(\theta)\) 小
平坦性与泛化关系:
理论表明,平坦极小值通常对应更好的泛化性能。
寻找平坦极小值的方法:
-
Sharpness-Aware Minimization (SAM):\(\min_\theta \max_{\|\epsilon\| \leq \rho} \mathcal{L}(\theta + \epsilon)\)
-
随机权重平均(SWA):平均多个检查点
-
大学习率训练:倾向于收敛到平坦区域
class FlatMinima:
    """
    Flat-minima theory: sharpness probing, SAM, SWA, and Hessian-based
    flatness metrics.
    """

    @staticmethod
    def sharpness_measure(loss_fn: Callable,
                          params: np.ndarray,
                          epsilon: float = 0.001) -> float:
        """
        Average loss increase under random Gaussian perturbations.

        Args:
            loss_fn: scalar loss function of the parameters.
            params: point at which to probe sharpness.
            epsilon: perturbation standard deviation.

        Returns:
            Mean of loss(params + delta) - loss(params) over 10 random
            perturbations (stochastic; RNG not seeded here).
        """
        reference = loss_fn(params)
        deltas = np.random.randn(10, len(params)) * epsilon
        increases = [loss_fn(params + delta) - reference for delta in deltas]
        return np.mean(increases)

    @staticmethod
    def sam_update(loss_fn: Callable,
                   params: np.ndarray,
                   learning_rate: float = 0.01,
                   rho: float = 0.05) -> np.ndarray:
        """
        One Sharpness-Aware Minimization (SAM) step.

        Ascends to the (first-order) worst-case point inside an L2 ball
        of radius ``rho``, then descends along the gradient taken there.
        Gradients are approximated with central finite differences.

        Args:
            loss_fn: scalar loss function.
            params: current parameters.
            learning_rate: descent step size.
            rho: perturbation radius.

        Returns:
            Updated parameters.
        """
        eps = 1e-5

        def central_difference_gradient(point: np.ndarray) -> np.ndarray:
            # Coordinate-wise central differences of loss_fn at `point`.
            grad = np.zeros_like(point)
            for i in range(len(point)):
                forward = point.copy()
                backward = point.copy()
                forward[i] += eps
                backward[i] -= eps
                grad[i] = (loss_fn(forward) - loss_fn(backward)) / (2 * eps)
            return grad

        grad_here = central_difference_gradient(params)
        # Ascent direction scaled to the rho-ball boundary.
        ascent = rho * grad_here / (np.linalg.norm(grad_here) + 1e-10)
        grad_at_worst = central_difference_gradient(params + ascent)
        return params - learning_rate * grad_at_worst

    @staticmethod
    def stochastic_weight_averaging(weight_history: list[np.ndarray],
                                    start_epoch: int = None) -> np.ndarray:
        """
        Stochastic Weight Averaging (SWA): average late checkpoints.

        Args:
            weight_history: per-epoch weight snapshots.
            start_epoch: first epoch included in the average; defaults to
                the midpoint of the history.

        Returns:
            Element-wise mean of the selected snapshots.
        """
        if start_epoch is None:
            start_epoch = len(weight_history) // 2
        tail = weight_history[start_epoch:]
        return np.mean(tail, axis=0)

    @staticmethod
    def hessian_based_flatness(hessian_eigenvalues: np.ndarray) -> dict:
        """
        Flatness indicators derived from the Hessian spectrum.

        Args:
            hessian_eigenvalues: Hessian eigenvalues at a minimum.

        Returns:
            Dict with trace, max eigenvalue, count of eigenvalues below
            1e-3 ('effective_flatness' — near-zero curvature directions),
            spectral entropy, and flatness score 1 / (1 + trace)
            (higher = flatter).
        """
        trace = np.sum(hessian_eigenvalues)
        metrics = {
            'trace': trace,
            'max_eigenvalue': np.max(hessian_eigenvalues),
            # Count of near-zero curvature directions; more of these
            # means a flatter minimum.
            'effective_flatness': np.sum(hessian_eigenvalues < 1e-3),
        }
        weights = hessian_eigenvalues / (trace + 1e-10)
        metrics['eigenvalue_entropy'] = -np.sum(weights * np.log(weights + 1e-10))
        metrics['flatness_score'] = 1.0 / (1.0 + trace)
        return metrics
def flat_minima_example():
    """Demo: sharpness measurement, Hessian flatness metrics, and SWA."""
    print("\n" + "=" * 60)
    print("平坦极小值与泛化")
    print("=" * 60)
    fm = FlatMinima()
    # Two test losses: ill-conditioned (sharp) vs isotropic (flat)
    def sharp_loss(x):
        """Sharp quadratic loss (condition number 100)."""
        return x[0]**2 + 100*x[1]**2
    def flat_loss(x):
        """Flat isotropic quadratic loss."""
        return x[0]**2 + x[1]**2
    # --- Sharpness measurement at the shared minimum (0, 0) ---
    print("\n1. 尖锐度测量")
    print("-" * 40)
    x_sharp = np.array([0.0, 0.0])
    x_flat = np.array([0.0, 0.0])
    sharpness_sharp = fm.sharpness_measure(sharp_loss, x_sharp, epsilon=0.1)
    sharpness_flat = fm.sharpness_measure(flat_loss, x_flat, epsilon=0.1)
    print(f" 尖锐损失函数在(0,0)的尖锐度: {sharpness_sharp:.6f}")
    print(f" 平坦损失函数在(0,0)的尖锐度: {sharpness_flat:.6f}")
    print(f" 比值: {sharpness_sharp/sharpness_flat:.2f}x")
    # --- Hessian analysis ---
    print("\n2. Hessian分析")
    print("-" * 40)
    # Hessian of the sharp loss
    hessian_sharp = np.array([[2, 0], [0, 200]])
    eigenvalues_sharp = np.linalg.eigvalsh(hessian_sharp)
    # Hessian of the flat loss
    hessian_flat = np.array([[2, 0], [0, 2]])
    eigenvalues_flat = np.linalg.eigvalsh(hessian_flat)
    metrics_sharp = fm.hessian_based_flatness(eigenvalues_sharp)
    metrics_flat = fm.hessian_based_flatness(eigenvalues_flat)
    print(" 尖锐极小值:")
    for k, v in metrics_sharp.items():
        print(f" {k}: {v:.4f}")
    print(" 平坦极小值:")
    for k, v in metrics_flat.items():
        print(f" {k}: {v:.4f}")
    # --- SWA over a simulated trajectory ---
    print("\n3. 随机权重平均 (SWA)")
    print("-" * 40)
    # Simulated training trajectory converging with shrinking noise
    weight_history = []
    for i in range(100):
        weight = np.array([1.0, 1.0]) + np.random.randn(2) * 0.1 / (i + 1)
        weight_history.append(weight)
    swa_weights = fm.stochastic_weight_averaging(weight_history, start_epoch=50)
    final_weights = weight_history[-1]
    print(f" 最终权重: {final_weights}")
    print(f" SWA权重: {swa_weights}")
    print(f" SWA权重方差: {np.var(weight_history[50:], axis=0)}")
# Run the demo when this module is executed as a script
if __name__ == "__main__":
    flat_minima_example()
4. 神经正切核理论¶
4.1 NTK的推导与性质¶
神经正切核(Neural Tangent Kernel, NTK):
对于神经网络 \(f(x; \theta)\),NTK定义为:\(\Theta(x, x') = \left\langle \nabla_\theta f(x; \theta),\ \nabla_\theta f(x'; \theta) \right\rangle\)
无限宽度极限:
当网络宽度趋于无穷时,NTK在训练过程中保持不变(kernel regime)。
训练动态:
在kernel regime下,网络输出演化遵循:\(\frac{\mathrm{d} f(X)}{\mathrm{d} t} = -\eta\, \Theta(X, X)\,\bigl(f(X) - Y\bigr)\)
其中 \(\Theta(X, X)\) 是NTK矩阵。
解析解:\(f(t) = Y + e^{-\eta\, \Theta(X, X)\, t}\,\bigl(f(0) - Y\bigr)\)
class NeuralTangentKernel:
    """
    Neural Tangent Kernel (NTK): empirical and infinite-width kernels,
    kernel-regime training dynamics, and the lazy-training heuristic.
    """

    @staticmethod
    def ntk_matrix(X: np.ndarray,
                   network_fn: Callable,
                   params: np.ndarray) -> np.ndarray:
        """
        Empirical NTK via finite-difference Jacobians: Theta = J J^T.

        Args:
            X: inputs, shape (n_samples, n_features).
            network_fn: scalar network output f(x, params).
            params: flat parameter vector.

        Returns:
            (n_samples, n_samples) kernel matrix.
        """
        n_params = len(params)
        eps = 1e-7
        jacobian = np.zeros((len(X), n_params))
        for row, x in enumerate(X):
            for j in range(n_params):
                bumped_up = params.copy()
                bumped_down = params.copy()
                bumped_up[j] += eps
                bumped_down[j] -= eps
                # Central difference w.r.t. parameter j.
                jacobian[row, j] = (network_fn(x, bumped_up) -
                                    network_fn(x, bumped_down)) / (2 * eps)
        return jacobian @ jacobian.T

    @staticmethod
    def infinite_width_ntk(X: np.ndarray,
                           activation: str = 'relu',
                           depth: int = 1) -> np.ndarray:
        """
        Closed-form infinite-width NTK on unit-normalized correlations.

        For ReLU the arc-cosine kernel
        (sin(theta) + (pi - theta) cos(theta)) / (2 pi)
        is applied ``depth`` times; other activations fall back to the
        (clipped) cosine-similarity matrix.

        Args:
            X: inputs, one sample per row.
            activation: 'relu', 'tanh', or anything else (linear).
            depth: number of layers for the ReLU recursion.

        Returns:
            Kernel matrix of shape (n, n).
        """
        gram = X @ X.T
        norms = np.sqrt(np.diag(gram))
        # Cosine similarities, clipped for arccos numerical stability.
        cos_sim = np.clip(gram / np.outer(norms, norms), -1, 1)
        if activation != 'relu':
            # tanh approximation and the linear case both reduce to the
            # similarity matrix here.
            return cos_sim

        def arc_cosine(c: np.ndarray) -> np.ndarray:
            angle = np.arccos(np.clip(c, -1, 1))
            return (np.sin(angle) + (np.pi - angle) * np.cos(angle)) / (2 * np.pi)

        kernel = arc_cosine(cos_sim)
        for _ in range(depth - 1):
            kernel = arc_cosine(kernel)
        return kernel

    @staticmethod
    def kernel_dynamics(ntk_matrix: np.ndarray,
                        initial_predictions: np.ndarray,
                        targets: np.ndarray,
                        learning_rate: float = 0.01,
                        n_steps: int = 1000) -> np.ndarray:
        """
        Closed-form kernel-regime training dynamics:
        f(t) = Y + exp(-eta * Theta * t) @ (f(0) - Y).

        Args:
            ntk_matrix: fixed NTK matrix Theta.
            initial_predictions: f(0) on the training set.
            targets: labels Y.
            learning_rate: eta.
            n_steps: time horizon t.

        Returns:
            Predictions after n_steps of gradient flow.
        """
        from scipy.linalg import expm  # local import, as in the rest of the file
        decay = expm(-learning_rate * ntk_matrix * n_steps)
        return targets + decay @ (initial_predictions - targets)

    @staticmethod
    def lazy_training_condition(n_samples: int,
                                n_params: int,
                                learning_rate: float) -> bool:
        """
        Heuristic test for the lazy-training (kernel) regime.

        Requires heavy over-parameterization (params > 10x samples) and a
        learning rate below 1/sqrt(n_params).

        Args:
            n_samples: training-set size.
            n_params: parameter count.
            learning_rate: step size.

        Returns:
            True when both conditions hold.
        """
        overparameterized = n_params > n_samples * 10
        conservative_lr = learning_rate < 1.0 / np.sqrt(n_params)
        return overparameterized and conservative_lr
def ntk_example():
    """Demo: infinite-width NTK, kernel-regime dynamics, lazy training."""
    print("\n" + "=" * 60)
    print("神经正切核 (NTK) 理论")
    print("=" * 60)
    ntk = NeuralTangentKernel()
    # Unit-norm random inputs
    np.random.seed(42)
    n_samples = 50
    X = np.random.randn(n_samples, 10)
    X = X / np.linalg.norm(X, axis=1, keepdims=True)
    # --- Infinite-width NTK at two depths ---
    print("\n1. 无限宽度NTK")
    print("-" * 40)
    K_relu = ntk.infinite_width_ntk(X, activation='relu', depth=1)
    K_relu_deep = ntk.infinite_width_ntk(X, activation='relu', depth=3)
    print(f" 输入形状: {X.shape}")
    print(f" NTK矩阵形状: {K_relu.shape}")
    print(f" 浅层NTK条件数: {np.linalg.cond(K_relu):.2f}")
    print(f" 深层NTK条件数: {np.linalg.cond(K_relu_deep):.2f}")
    # NTK spectrum summary
    eigenvalues = np.linalg.eigvalsh(K_relu)
    print(f" NTK特征值范围: [{eigenvalues[0]:.4f}, {eigenvalues[-1]:.4f}]")
    print(f" 有效秩: {np.sum(eigenvalues > 1e-6)}")
    # --- Kernel-regime training dynamics ---
    print("\n2. Kernel Regime训练动态")
    print("-" * 40)
    # Toy regression task
    y_true = np.sin(X[:, 0] * 3)
    f_0 = np.zeros(n_samples)  # predictions start at zero
    learning_rates = [0.001, 0.01, 0.1]
    for lr in learning_rates:
        f_final = ntk.kernel_dynamics(K_relu, f_0, y_true, lr, n_steps=1000)
        mse = np.mean((f_final - y_true)**2)
        print(f" 学习率={lr:.3f}: 最终MSE = {mse:.6f}")
    # --- Lazy-training heuristic on a few configurations ---
    print("\n3. Lazy Training条件")
    print("-" * 40)
    configs = [
        {"samples": 100, "params": 1000, "lr": 0.01},
        {"samples": 1000, "params": 10000, "lr": 0.001},
        {"samples": 100, "params": 100, "lr": 0.1},
    ]
    for config in configs:
        is_lazy = ntk.lazy_training_condition(
            config["samples"], config["params"], config["lr"]
        )
        print(f" 样本={config['samples']:4d}, 参数={config['params']:5d}, "
              f"lr={config['lr']:.3f}: Lazy regime = {is_lazy}")
# Run the demo when this module is executed as a script
if __name__ == "__main__":
    ntk_example()
4.2 宽网络的训练动态¶
宽网络的特点:
- 线性化:在初始化附近,网络行为近似线性
- 高斯过程:无限宽度网络等价于高斯过程
- 确定性收敛:训练动态可由NTK完全描述
特征学习 vs Lazy Training:
- Lazy Training:参数变化小,NTK近似恒定
- 特征学习:参数变化大,网络学习数据表示
过渡条件:
从lazy training到特征学习的过渡由以下因素决定: - 网络宽度 - 学习率 - 初始化尺度
class WideNetworkDynamics:
    """
    Training dynamics of wide networks: linearization, NNGP prior,
    and the lazy-vs-feature-learning regime boundary.
    """

    @staticmethod
    def linearized_network(x: np.ndarray,
                           params: np.ndarray,
                           initial_params: np.ndarray,
                           initial_output: Callable) -> float:
        """
        Crude first-order (linearized) network approximation:
        f_lin(x) = f(x; theta_0) + grad^T (theta - theta_0).

        The gradient is approximated here by a constant 0.01 per
        coordinate — a toy surrogate, not a real Jacobian.

        Args:
            x: input point.
            params: current parameters theta.
            initial_params: initialization theta_0.
            initial_output: function giving f(x; theta_0).

        Returns:
            Linearized output value.
        """
        baseline = initial_output(x)
        drift = params - initial_params
        return baseline + np.sum(drift) * 0.01

    @staticmethod
    def gp_prior_covariance(X: np.ndarray,
                            activation: str = 'relu',
                            depth: int = 1) -> np.ndarray:
        """
        NNGP prior covariance of an infinite-width network.

        For ReLU this is the arc-cosine kernel (Cho & Saul, 2009)
        rescaled by the input norms; other activations return the
        similarity matrix.

        Args:
            X: inputs, one sample per row.
            activation: 'relu' or anything else (linear fallback).
            depth: accepted for API compatibility; unused by the current
                single-layer formula.

        Returns:
            Prior covariance matrix.
        """
        gram = X @ X.T
        norms = np.sqrt(np.diag(gram))
        # NOTE(review): the 1e-10 guard is applied to only one factor,
        # making the normalization slightly asymmetric — kept as-is.
        cos_sim = np.clip(gram / np.outer(norms, norms + 1e-10), -1, 1)
        if activation != 'relu':
            return cos_sim
        angle = np.arccos(cos_sim)
        base = (np.sin(angle) + (np.pi - angle) * np.cos(angle)) / (2 * np.pi)
        return base * norms.reshape(-1, 1) * norms

    @staticmethod
    def feature_learning_regime(width: int,
                                n_samples: int,
                                learning_rate: float,
                                init_scale: float = 1.0) -> bool:
        """
        Heuristic test for the feature-learning (non-lazy) regime.

        Args:
            width: network width.
            n_samples: training-set size.
            learning_rate: step size.
            init_scale: initialization scale.

        Returns:
            True when the network is not too wide AND either the learning
            rate or the initialization scale is large.
        """
        moderate_width = width < n_samples * 5
        aggressive_lr = learning_rate > 0.1 / width
        aggressive_init = init_scale > 1.0
        return moderate_width and (aggressive_lr or aggressive_init)

    @staticmethod
    def training_regime_diagram(widths: list[int],
                                learning_rates: list[float],
                                n_samples: int = 100) -> np.ndarray:
        """
        Phase diagram over (width, learning-rate) combinations.

        Entry coding: 0 = lazy, 1 = intermediate, 2 = feature learning.

        Args:
            widths: widths along axis 0.
            learning_rates: learning rates along axis 1.
            n_samples: training-set size.

        Returns:
            len(widths) x len(learning_rates) matrix of regime codes.
        """
        diagram = np.zeros((len(widths), len(learning_rates)))
        for i, width in enumerate(widths):
            for j, lr in enumerate(learning_rates):
                # Heuristic: roughly 100 parameters per unit of width.
                lazy = NeuralTangentKernel.lazy_training_condition(
                    n_samples, width * 100, lr
                )
                feature = WideNetworkDynamics.feature_learning_regime(
                    width, n_samples, lr
                )
                if lazy and not feature:
                    diagram[i, j] = 0
                elif feature and not lazy:
                    diagram[i, j] = 2
                else:
                    diagram[i, j] = 1
        return diagram
def wide_network_dynamics_example():
    """Demo: NNGP prior covariance and training-regime classification."""
    print("\n" + "=" * 60)
    print("宽网络的训练动态")
    print("=" * 60)
    wnd = WideNetworkDynamics()
    # --- Gaussian-process prior of an infinite-width network ---
    print("\n1. 高斯过程先验")
    print("-" * 40)
    np.random.seed(42)
    X = np.random.randn(20, 5)
    X = X / np.linalg.norm(X, axis=1, keepdims=True)
    K_gp = wnd.gp_prior_covariance(X, activation='relu')
    print(f" GP协方差矩阵形状: {K_gp.shape}")
    print(f" 对角线元素(方差): {np.diag(K_gp)[:5]}")
    print(f" 矩阵正定性: {np.all(np.linalg.eigvalsh(K_gp) > -1e-10)}")
    # --- Lazy vs feature-learning regime for a few configurations ---
    print("\n2. 训练Regime分析")
    print("-" * 40)
    configs = [
        {"width": 10, "samples": 100, "lr": 0.01, "init": 1.0},
        {"width": 1000, "samples": 100, "lr": 0.001, "init": 0.1},
        {"width": 100, "samples": 100, "lr": 0.1, "init": 2.0},
    ]
    for config in configs:
        is_lazy = NeuralTangentKernel.lazy_training_condition(
            config["samples"], config["width"] * 100, config["lr"]
        )
        is_feature = wnd.feature_learning_regime(
            config["width"], config["samples"], config["lr"], config["init"]
        )
        regime = "Lazy" if is_lazy else ("Feature Learning" if is_feature else "Intermediate")
        print(f" 宽度={config['width']:4d}, lr={config['lr']:.3f}: {regime}")
# Run the demo when this module is executed as a script
if __name__ == "__main__":
    wide_network_dynamics_example()
4.3 NTK与泛化的关系¶
NTK视角下的泛化:
在kernel regime下,泛化性能由NTK的谱性质决定:
- 特征值衰减:决定收敛速度
- 有效维度:决定模型复杂度
- 核对齐:NTK与目标函数的匹配程度
泛化误差界:
对于NTK回归,泛化误差满足:\(\mathbb{E}[R(\hat{f})] \lesssim \frac{\|f^*\|_{\mathcal{H}}^2}{n} + \frac{\sigma^2\,\operatorname{tr}(\Theta)}{n}\)
其中 \(\|f^*\|_{\mathcal{H}}\) 是目标函数的RKHS范数。
class NTKGeneralization:
    """
    Links between the NTK spectrum and generalization: RKHS norm,
    error bounds, kernel-target alignment, effective dimension.
    """

    @staticmethod
    def rkhs_norm(target_function: Callable,
                  ntk_matrix: np.ndarray,
                  X: np.ndarray) -> float:
        """
        Empirical RKHS norm of the target:
        ||f||_H = sqrt(y^T Theta^{-1} y).

        Args:
            target_function: ground-truth function evaluated on X.
            ntk_matrix: NTK Gram matrix Theta.
            X: sample inputs.

        Returns:
            RKHS-norm estimate.
        """
        y = np.array([target_function(x) for x in X])
        # Tikhonov jitter keeps the linear solve well-posed.
        stabilized = ntk_matrix + 1e-6 * np.eye(len(ntk_matrix))
        return np.sqrt(y @ np.linalg.solve(stabilized, y))

    @staticmethod
    def generalization_bound_ntk(n_samples: int,
                                 rkhs_norm: float,
                                 ntk_trace: float,
                                 noise_variance: float = 0.1) -> float:
        """
        Simplified NTK-regression generalization bound:
        O(||f*||^2 / n + sigma^2 tr(Theta) / n).

        Args:
            n_samples: sample count n.
            rkhs_norm: ||f*||_H.
            ntk_trace: tr(Theta).
            noise_variance: label-noise variance sigma^2.

        Returns:
            Upper bound on the expected risk.
        """
        signal_term = rkhs_norm ** 2 / n_samples
        noise_term = noise_variance * ntk_trace / n_samples
        return signal_term + noise_term

    @staticmethod
    def kernel_alignment(K: np.ndarray,
                         y: np.ndarray) -> float:
        """
        Centered kernel-target alignment:
        y_c^T K_c y_c / (||K_c||_F ||y_c||^2).

        Args:
            K: kernel matrix.
            y: target values.

        Returns:
            Alignment score (higher = kernel better matched to targets).
        """
        K_centered = K - K.mean(axis=0, keepdims=True) - K.mean(axis=1, keepdims=True) + K.mean()
        y_centered = y - y.mean()
        numerator = y_centered @ K_centered @ y_centered
        denominator = np.linalg.norm(K_centered, 'fro') * np.linalg.norm(y_centered) ** 2
        return numerator / denominator

    @staticmethod
    def effective_dimension(ntk_eigenvalues: np.ndarray,
                            regularization: float = 1e-3) -> float:
        """
        Regularized effective dimension:
        d_eff = sum_i lambda_i / (lambda_i + reg).

        Args:
            ntk_eigenvalues: NTK eigenvalues.
            regularization: ridge parameter.

        Returns:
            Effective dimension.
        """
        return np.sum(ntk_eigenvalues / (ntk_eigenvalues + regularization))
def ntk_generalization_example():
    """Demo: RKHS norm, generalization bound, alignment, effective dim."""
    print("\n" + "=" * 60)
    print("NTK与泛化")
    print("=" * 60)
    ntk_gen = NTKGeneralization()
    # Unit-norm random inputs
    np.random.seed(42)
    n_samples = 100
    X = np.random.randn(n_samples, 10)
    X = X / np.linalg.norm(X, axis=1, keepdims=True)
    # Smooth target function
    def target_fn(x):
        return np.sin(3 * x[0]) + 0.5 * np.cos(2 * x[1])
    y = np.array([target_fn(x) for x in X])
    # Infinite-width NTK of the inputs
    K = NeuralTangentKernel.infinite_width_ntk(X, activation='relu')
    # --- RKHS norm of the target ---
    print("\n1. RKHS范数分析")
    print("-" * 40)
    rkhs_norm = ntk_gen.rkhs_norm(target_fn, K, X)
    print(f" 目标函数RKHS范数: {rkhs_norm:.4f}")
    # --- Generalization bound ---
    print("\n2. 泛化误差界")
    print("-" * 40)
    ntk_trace = np.trace(K)
    bound = ntk_gen.generalization_bound_ntk(n_samples, rkhs_norm, ntk_trace)
    print(f" NTK迹: {ntk_trace:.2f}")
    print(f" 泛化误差界: {bound:.4f}")
    # --- Kernel-target alignment ---
    print("\n3. 核对齐分析")
    print("-" * 40)
    alignment = ntk_gen.kernel_alignment(K, y)
    print(f" 核对齐分数: {alignment:.4f}")
    print(f" 对齐程度: {'高' if alignment > 0.5 else '中' if alignment > 0.2 else '低'}")
    # --- Effective dimension for several ridge parameters ---
    print("\n4. 有效维度")
    print("-" * 40)
    eigenvalues = np.linalg.eigvalsh(K)
    for reg in [1e-3, 1e-2, 1e-1]:
        eff_dim = ntk_gen.effective_dimension(eigenvalues, reg)
        print(f" 正则化={reg:.3f}: 有效维度 = {eff_dim:.2f}")
# Run the demo when this module is executed as a script
if __name__ == "__main__":
    ntk_generalization_example()
5. 深度学习的归纳偏置¶
5.1 架构的归纳偏置¶
归纳偏置:学习算法在学习过程中对解的偏好。
CNN的归纳偏置: - 局部连接:空间局部性 - 权重共享:平移等变性 - 层次结构:从局部到全局
Transformer的归纳偏置: - 自注意力:全局依赖 - 位置编码:序列顺序 - 多头机制:多子空间表示
RNN的归纳偏置: - 时间递归:序列建模 - 参数共享:时间平移不变性
class InductiveBias:
    """
    Quantifying architectural inductive biases (CNN / Transformer / GNN).
    """

    @staticmethod
    def cnn_inductive_bias(input_shape: tuple[int, ...],
                           kernel_size: int) -> dict:
        """
        Inductive-bias indicators for a CNN.

        Args:
            input_shape: input tensor shape, e.g. (H, W, C).
            kernel_size: convolution kernel side length.

        Returns:
            Dict with local-receptive-field ratio, translation-equivariance
            flag, and weight-sharing factor.
        """
        total_units = np.prod(input_shape)
        # Kernel volume across all input axes (heuristic: the kernel is
        # treated as spanning every dimension of input_shape).
        kernel_volume = kernel_size ** len(input_shape)
        return {
            'local_receptive_field_ratio': kernel_volume / total_units,
            # Convolutions commute with translations by construction.
            'translation_equivariance': True,
            # Every output position reuses the same kernel weights.
            'weight_sharing_factor': total_units / kernel_volume,
        }

    @staticmethod
    def transformer_inductive_bias(seq_length: int,
                                   d_model: int,
                                   n_heads: int) -> dict:
        """
        Inductive-bias indicators for a Transformer.

        Args:
            seq_length: sequence length.
            d_model: model (embedding) dimension.
            n_heads: number of attention heads.

        Returns:
            Dict with receptive field, permutation-equivariance flag,
            per-head subspace dimension, and attention cost.
        """
        return {
            # Self-attention connects every pair of positions.
            'receptive_field': seq_length,
            # Holds only in the absence of positional encodings.
            'permutation_equivariance': True,
            'subspace_dimension': d_model // n_heads,
            # Quadratic pairwise attention cost.
            'attention_complexity': seq_length ** 2,
        }

    @staticmethod
    def gnn_inductive_bias(graph: np.ndarray,
                           aggregation: str = 'mean') -> dict:
        """
        Inductive-bias indicators for a GNN.

        Args:
            graph: adjacency matrix.
            aggregation: neighborhood aggregation function name.

        Returns:
            Dict with permutation-equivariance flag, mean/max degree, and
            whether the aggregator is permutation-invariant.
        """
        degrees = np.sum(graph, axis=1)
        return {
            'permutation_equivariance': True,
            'mean_neighborhood_size': np.mean(degrees),
            'max_neighborhood_size': np.max(degrees),
            # Sum/mean/max aggregators are insensitive to neighbor order.
            'graph_isomorphism_invariance': aggregation in ['sum', 'mean', 'max'],
        }

    @staticmethod
    def compare_architectures(input_specs: dict) -> dict:
        """
        Side-by-side inductive-bias comparison of CNN and Transformer.

        Args:
            input_specs: optional keys 'image_shape', 'seq_length',
                'd_model', 'n_heads' (defaults used when absent).

        Returns:
            {'CNN': ..., 'Transformer': ...} analysis dicts.
        """
        return {
            'CNN': InductiveBias.cnn_inductive_bias(
                input_specs.get('image_shape', (32, 32, 3)),
                kernel_size=3,
            ),
            'Transformer': InductiveBias.transformer_inductive_bias(
                seq_length=input_specs.get('seq_length', 100),
                d_model=input_specs.get('d_model', 512),
                n_heads=input_specs.get('n_heads', 8),
            ),
        }
def inductive_bias_example():
    """Demo: inductive-bias indicators for CNN, Transformer, and GNN."""
    print("\n" + "=" * 60)
    print("深度学习的归纳偏置")
    print("=" * 60)
    ib = InductiveBias()
    # --- CNN ---
    print("\n1. CNN归纳偏置")
    print("-" * 40)
    cnn_bias = ib.cnn_inductive_bias((32, 32, 3), kernel_size=3)
    for k, v in cnn_bias.items():
        print(f" {k}: {v}")
    # --- Transformer ---
    print("\n2. Transformer归纳偏置")
    print("-" * 40)
    transformer_bias = ib.transformer_inductive_bias(100, 512, 8)
    for k, v in transformer_bias.items():
        if isinstance(v, float):  # floats get fixed-precision formatting
            print(f" {k}: {v:.2f}")
        else:
            print(f" {k}: {v}")
    # --- GNN ---
    print("\n3. GNN归纳偏置")
    print("-" * 40)
    # Small example graph (adjacency matrix)
    graph = np.array([
        [0, 1, 1, 0],
        [1, 0, 1, 1],
        [1, 1, 0, 1],
        [0, 1, 1, 0]
    ])
    gnn_bias = ib.gnn_inductive_bias(graph)
    for k, v in gnn_bias.items():
        if isinstance(v, float):
            print(f" {k}: {v:.2f}")
        else:
            print(f" {k}: {v}")
# Run the demo when this module is executed as a script
if __name__ == "__main__":
    inductive_bias_example()
5.2 优化算法的隐式偏置¶
SGD的隐式偏置:
- 解的选择:SGD倾向于收敛到平坦极小值
- 正则化效果:小批量噪声有正则化作用
- 早停效应:防止过拟合
Adam的隐式偏置:
- 自适应学习率:不同参数不同更新速度
- 动量效应:加速收敛
- 偏差校正:初期步长调整
数学分析:
SGD可以看作是对损失函数的随机梯度流:\(\mathrm{d}\theta_t = -\nabla \mathcal{L}(\theta_t)\,\mathrm{d}t + \sqrt{\eta\,\Sigma}\;\mathrm{d}W_t\)
其中 \(\Sigma\) 是梯度协方差,\(W_t\) 是维纳过程。
class ImplicitBias:
    """
    Implicit biases of optimizers: SGD noise regularization, flat-minima
    preference, adaptive-learning-rate distortion, and gradient flow.
    """

    @staticmethod
    def sgd_implicit_regularization(gradient_variance: float,
                                    learning_rate: float,
                                    n_iterations: int) -> float:
        """
        Accumulated implicit-regularization strength of SGD noise,
        approximately (eta / 2) * tr(Sigma) per step.

        Args:
            gradient_variance: gradient-noise variance tr(Sigma).
            learning_rate: eta.
            n_iterations: number of SGD steps.

        Returns:
            Implicit-regularization estimate.
        """
        per_step = 0.5 * learning_rate * gradient_variance
        return per_step * n_iterations

    @staticmethod
    def flat_minima_preference(optimization_trajectory: list[np.ndarray],
                               loss_fn: Callable) -> dict:
        """
        Probe whether a trajectory ended in a flat region.

        Args:
            optimization_trajectory: sequence of parameter vectors.
            loss_fn: scalar loss function.

        Returns:
            Dict with final loss, loss std under random perturbations
            (stochastic; RNG not seeded), and recent convergence rate.
        """
        losses = [loss_fn(point) for point in optimization_trajectory]
        deltas = np.diff(losses)
        endpoint = optimization_trajectory[-1]
        # Small random probes around the endpoint measure sensitivity.
        probes = np.random.randn(10, len(endpoint)) * 0.01
        probe_losses = [loss_fn(endpoint + p) for p in probes]
        return {
            'final_loss': loss_fn(endpoint),
            'loss_sensitivity': np.std(probe_losses),
            'convergence_rate': np.mean(np.abs(deltas[-10:])),
        }

    @staticmethod
    def adaptive_optimizer_bias(first_moment: np.ndarray,
                                second_moment: np.ndarray,
                                epsilon: float = 1e-8) -> np.ndarray:
        """
        Per-parameter effective-learning-rate ratios of an Adam-style
        optimizer: proportional to 1 / (sqrt(v) + eps), normalized to
        mean 1.

        Args:
            first_moment: first-moment estimates m (not used by the ratio).
            second_moment: second-moment estimates v.
            epsilon: numerical-stability constant.

        Returns:
            Normalized effective-learning-rate ratios.
        """
        raw = 1.0 / (np.sqrt(second_moment) + epsilon)
        return raw / np.mean(raw)

    @staticmethod
    def gradient_flow_approximation(gradient_fn: Callable,
                                    initial_params: np.ndarray,
                                    n_steps: int = 100,
                                    step_size: float = 0.01) -> list[np.ndarray]:
        """
        Forward-Euler discretization of gradient flow.

        Args:
            gradient_fn: gradient of the loss.
            initial_params: starting point.
            n_steps: number of Euler steps.
            step_size: step length.

        Returns:
            List of n_steps + 1 parameter snapshots (including the start).
        """
        current = initial_params.copy()
        trajectory = [current.copy()]
        for _ in range(n_steps):
            current = current - step_size * gradient_fn(current)
            trajectory.append(current.copy())
        return trajectory
def implicit_bias_example():
    """Demo: SGD implicit regularization and adaptive-optimizer bias."""
    print("\n" + "=" * 60)
    print("优化算法的隐式偏置")
    print("=" * 60)
    ib = ImplicitBias()
    # --- SGD implicit regularization for a few configurations ---
    print("\n1. SGD隐式正则化")
    print("-" * 40)
    configs = [
        {"var": 0.1, "lr": 0.01, "iter": 1000},
        {"var": 0.5, "lr": 0.01, "iter": 1000},
        {"var": 0.1, "lr": 0.1, "iter": 1000},
    ]
    for config in configs:
        reg = ib.sgd_implicit_regularization(
            config["var"], config["lr"], config["iter"]
        )
        print(f" 方差={config['var']:.1f}, lr={config['lr']:.2f}: "
              f"隐式正则化 = {reg:.4f}")
    # --- Adaptive-optimizer bias from simulated moment estimates ---
    print("\n2. 自适应优化器偏置")
    print("-" * 40)
    # Simulated gradient statistics
    m = np.array([0.1, 0.5, 1.0, 2.0])  # first moments
    v = np.array([0.01, 0.1, 0.5, 1.0])  # second moments
    lr_ratio = ib.adaptive_optimizer_bias(m, v)
    print(" 梯度统计:")
    print(f" 一阶矩: {m}")
    print(f" 二阶矩: {v}")
    print(f" 有效lr比率: {lr_ratio}")
    print(f" 最大/最小比率: {np.max(lr_ratio)/np.min(lr_ratio):.2f}x")
# Run the demo when this module is executed as a script
if __name__ == "__main__":
    implicit_bias_example()
5.3 数据分布的隐式影响¶
数据分布对学习的影响:
- 长尾分布:类别不平衡导致偏置
- 分布偏移:训练/测试分布不一致
- 数据增强:隐式正则化效果
标签噪声的鲁棒性:
深度网络对标签噪声有一定鲁棒性,但严重噪声会导致过拟合。
记忆与泛化:
- 深度网络能记忆随机标签
- 但优先学习简单模式(simplicity bias)
class DataDistributionImpact:
    """
    Implicit effects of the data distribution: class imbalance, label
    noise, simplicity bias, and train/test distribution shift.
    """

    @staticmethod
    def long_tail_analysis(class_counts: np.ndarray) -> dict:
        """
        Imbalance statistics of a long-tailed label distribution.

        Args:
            class_counts: per-class sample counts (assumed positive; a
                zero count would make the entropy term ill-defined).

        Returns:
            Dict with imbalance ratio, Gini coefficient, and effective
            class count exp(entropy).
        """
        n = len(class_counts)
        total = np.sum(class_counts)
        ranked = np.sort(class_counts)
        ranks = np.arange(1, n + 1)
        probs = class_counts / total
        return {
            'imbalance_ratio': np.max(class_counts) / np.min(class_counts),
            # Standard rank-based Gini formula on the sorted counts.
            'gini_coefficient': (2 * np.sum(ranks * ranked)) / (n * total) - (n + 1) / n,
            # Perplexity of the class distribution.
            'effective_num_classes': np.exp(-np.sum(probs * np.log(probs))),
        }

    @staticmethod
    def label_noise_robustness(true_labels: np.ndarray,
                               noisy_labels: np.ndarray,
                               predictions: np.ndarray) -> dict:
        """
        Accuracy breakdown with respect to label noise.

        Args:
            true_labels: ground-truth labels.
            noisy_labels: labels after corruption.
            predictions: model predictions.

        Returns:
            Dict with noise rate and accuracy on clean / noisy / all
            samples.  NOTE: the noisy-subset accuracy is NaN when no
            labels are corrupted (mean of an empty slice), matching the
            original behavior.
        """
        clean = true_labels == noisy_labels
        return {
            'noise_rate': np.mean(~clean),
            'accuracy_on_clean': np.mean(predictions[clean] == true_labels[clean]),
            'accuracy_on_noisy': np.mean(predictions[~clean] == true_labels[~clean]),
            'overall_accuracy': np.mean(predictions == true_labels),
        }

    @staticmethod
    def simplicity_bias_measure(train_losses: np.ndarray,
                                complexity_measures: np.ndarray) -> float:
        """
        Simplicity-bias score: the negated correlation between per-sample
        loss and sample complexity (positive score = simpler patterns are
        fit first).

        Args:
            train_losses: training losses across complexity levels.
            complexity_measures: matching complexity values.

        Returns:
            Simplicity-bias score in [-1, 1].
        """
        correlation = np.corrcoef(train_losses, complexity_measures)[0, 1]
        return -correlation

    @staticmethod
    def distribution_shift_detection(train_data: np.ndarray,
                                     test_data: np.ndarray) -> dict:
        """
        First/second-moment comparison of train vs. test data.

        Args:
            train_data: (n_train, d) training samples.
            test_data: (n_test, d) test samples.

        Returns:
            Dict with mean-shift norm, Frobenius covariance shift, and a
            crude squared-MMD proxy (the squared mean shift).
        """
        mu_train = np.mean(train_data, axis=0)
        mu_test = np.mean(test_data, axis=0)
        mean_gap = np.linalg.norm(mu_train - mu_test)
        cov_gap = np.linalg.norm(np.cov(train_data.T) - np.cov(test_data.T), 'fro')
        return {
            'mean_shift': mean_gap,
            'covariance_shift': cov_gap,
            # With a linear kernel, MMD^2 reduces to the squared mean gap.
            'mmd_squared': mean_gap ** 2,
        }
def data_distribution_example():
    """Demo: long-tail stats, label-noise robustness, distribution shift."""
    print("\n" + "=" * 60)
    print("数据分布的隐式影响")
    print("=" * 60)
    ddi = DataDistributionImpact()
    # --- Long-tail analysis ---
    print("\n1. 长尾分布分析")
    print("-" * 40)
    # Simulated long-tailed class counts
    class_counts = np.array([1000, 500, 200, 100, 50, 20, 10, 5])
    long_tail = ddi.long_tail_analysis(class_counts)
    for k, v in long_tail.items():
        print(f" {k}: {v:.4f}")
    # --- Label-noise robustness ---
    print("\n2. 标签噪声鲁棒性")
    print("-" * 40)
    np.random.seed(42)
    n_samples = 1000
    n_classes = 10
    true_labels = np.random.randint(0, n_classes, n_samples)
    # Corrupt 20% of the labels
    noise_mask = np.random.rand(n_samples) < 0.2
    noisy_labels = true_labels.copy()
    noisy_labels[noise_mask] = np.random.randint(0, n_classes, np.sum(noise_mask))
    # Simulated predictions (roughly 80% accurate)
    predictions = true_labels.copy()
    error_mask = np.random.rand(n_samples) < 0.2
    predictions[error_mask] = np.random.randint(0, n_classes, np.sum(error_mask))
    robustness = ddi.label_noise_robustness(true_labels, noisy_labels, predictions)
    for k, v in robustness.items():
        print(f" {k}: {v:.4f}")
    # --- Distribution-shift detection ---
    print("\n3. 分布偏移检测")
    print("-" * 40)
    train_data = np.random.randn(500, 10)
    test_data = np.random.randn(500, 10) + 0.5  # mean-shifted test set
    shift = ddi.distribution_shift_detection(train_data, test_data)
    for k, v in shift.items():
        print(f" {k}: {v:.4f}")
# Run the demo when this module is executed as a script
if __name__ == "__main__":
    data_distribution_example()
6. 学习动态与训练理论¶
6.1 训练过程的相变¶
相变现象:
训练过程中,网络经历不同阶段: 1. 初始化阶段:随机行为 2. 拟合阶段:学习简单模式 3. 细化阶段:优化细节 4. 收敛阶段:接近极小值
损失景观的演化:
信号部分先收敛,噪声部分后被记忆。
class TrainingPhaseTransitions:
    """
    Phase-transition analysis of training curves.
    """

    @staticmethod
    def detect_phase_transitions(loss_history: np.ndarray,
                                 gradient_norm_history: np.ndarray) -> dict:
        """
        Locate rapid-descent, convergence, and plateau phases.

        Args:
            loss_history: per-epoch loss values.
            gradient_norm_history: per-epoch gradient norms.

        Returns:
            Dict possibly containing 'rapid_descent_end' and
            'convergence_start', and always 'n_plateaus'.
        """
        phases = {}
        step_sizes = np.abs(np.diff(loss_history))
        # Rapid descent: loss drops in the top quartile of step sizes.
        fast_threshold = np.percentile(step_sizes, 75)
        fast_epochs = np.where(step_sizes > fast_threshold)[0]
        if len(fast_epochs) > 0:
            phases['rapid_descent_end'] = fast_epochs[-1]
        # Convergence: gradient-norm variance over a trailing 10-epoch
        # window settles into the lowest decile.
        rolling_var = np.array([
            np.var(gradient_norm_history[max(0, i - 10):i + 1])
            for i in range(len(gradient_norm_history))
        ])
        quiet_threshold = np.percentile(rolling_var, 10)
        quiet_epochs = np.where(rolling_var < quiet_threshold)[0]
        if len(quiet_epochs) > 0:
            phases['convergence_start'] = quiet_epochs[0]
        # Plateaus: loss changes in the bottom quartile.
        slow_threshold = np.percentile(step_sizes, 25)
        phases['n_plateaus'] = len(np.where(step_sizes < slow_threshold)[0])
        return phases

    @staticmethod
    def signal_noise_decomposition(predictions: np.ndarray,
                                   true_labels: np.ndarray,
                                   noise_level: float = 0.1) -> dict:
        """
        Crude split of accuracy into "signal learning" vs. "noise
        memorization" halves.

        NOTE(review): ``predictions[:mid_point] == true_labels`` compares
        arrays of different lengths and relies on NumPy broadcasting; it
        only works for special shapes — verify intended semantics against
        callers.  ``noise_level`` is accepted but unused.

        Args:
            predictions: prediction history.
            true_labels: ground-truth labels.
            noise_level: nominal noise level (unused).

        Returns:
            Dict with signal accuracy, overall accuracy, and their gap.
        """
        mid_point = len(predictions) // 2
        signal_acc = np.mean(predictions[:mid_point] == true_labels)
        overall_acc = np.mean(predictions == true_labels)
        return {
            'signal_learning_accuracy': signal_acc,
            'overall_accuracy': overall_acc,
            'noise_memorization': overall_acc - signal_acc,
        }

    @staticmethod
    def learning_speed_analysis(loss_history: np.ndarray,
                                epoch_window: int = 10) -> dict:
        """
        Windowed learning-speed statistics of a loss curve.

        Args:
            loss_history: per-epoch loss values (length > epoch_window).
            epoch_window: smoothing window in epochs.

        Returns:
            Dict with max/mean/final speeds and, when learning slows
            below 10% of the peak speed, the first epoch where it does.
        """
        speeds = np.array([
            (loss_history[i - epoch_window] - loss_history[i]) / epoch_window
            for i in range(epoch_window, len(loss_history))
        ])
        analysis = {
            'max_learning_speed': np.max(speeds),
            'mean_learning_speed': np.mean(speeds),
            'final_learning_speed': speeds[-1],
        }
        slowdown = np.where(speeds < analysis['max_learning_speed'] * 0.1)[0]
        if len(slowdown) > 0:
            analysis['learning_slowdown_epoch'] = slowdown[0] + epoch_window
        return analysis
def training_phases_example():
    """Demo: detect phase transitions in a simulated training run."""
    header = "=" * 60
    print("\n" + header)
    print("训练过程的相变")
    print(header)

    tpt = TrainingPhaseTransitions()

    # Build a synthetic training history with a fixed seed for reproducibility.
    np.random.seed(42)
    n_epochs = 200
    epochs = np.arange(n_epochs)
    # Loss: fast exponential decay toward a noisy plateau.
    loss_history = 1.0 * np.exp(-epochs / 20) + 0.1 + 0.01 * np.random.randn(n_epochs)
    # Gradient norms: slower decay, smaller noise.
    gradient_norms = 1.0 * np.exp(-epochs / 30) + 0.05 + 0.005 * np.random.randn(n_epochs)

    # Phase-transition detection.
    print("\n1. 相变检测")
    print("-" * 40)
    for name, value in tpt.detect_phase_transitions(loss_history, gradient_norms).items():
        print(f" {name}: {value}")

    # Learning-speed analysis.
    print("\n2. 学习速度分析")
    print("-" * 40)
    for name, value in tpt.learning_speed_analysis(loss_history).items():
        if isinstance(value, float):
            print(f" {name}: {value:.6f}")
        else:
            print(f" {name}: {value}")
# Run the demo when executed as a script.
if __name__ == "__main__":
    training_phases_example()
6.2 学习率调度理论¶
学习率的重要性:
学习率是影响训练的最关键超参数之一。
调度策略:
- Step Decay:每隔一定epoch降低学习率
- Exponential Decay:指数衰减 \(\eta_t = \eta_0 \cdot \gamma^t\)
- Cosine Annealing:\(\eta_t = \eta_{min} + \frac{1}{2}(\eta_{max} - \eta_{min})\left(1 + \cos\left(\frac{t}{T}\pi\right)\right)\)
- Warmup:初始阶段从小学习率线性增加到目标学习率
理论依据:
- 大学习率:探索损失景观
- 小学习率:精细优化
- 学习率与批量大小成正比
class LearningRateTheory:
    """
    Learning-rate scheduling theory: classic decay schedules plus the
    linear batch-size scaling rule.
    """

    @staticmethod
    def step_decay_schedule(initial_lr: float,
                            epoch: int,
                            drop_every: int = 30,
                            drop_factor: float = 0.1) -> float:
        """
        Step-decay schedule: multiply the rate by ``drop_factor`` once
        every ``drop_every`` epochs.

        Args:
            initial_lr: Starting learning rate.
            epoch: Current epoch (0-based).
            drop_every: Interval in epochs between drops.
            drop_factor: Multiplicative factor applied at each drop.

        Returns:
            Learning rate for this epoch.
        """
        return initial_lr * drop_factor ** (epoch // drop_every)

    @staticmethod
    def exponential_decay_schedule(initial_lr: float,
                                   epoch: int,
                                   decay_rate: float = 0.95) -> float:
        """
        Exponential decay: lr_t = lr_0 * decay_rate ** t.

        Args:
            initial_lr: Starting learning rate.
            epoch: Current epoch (0-based).
            decay_rate: Per-epoch multiplicative decay.

        Returns:
            Learning rate for this epoch.
        """
        return initial_lr * decay_rate ** epoch

    @staticmethod
    def cosine_annealing_schedule(initial_lr: float,
                                  epoch: int,
                                  total_epochs: int,
                                  min_lr: float = 0.0) -> float:
        """
        Cosine annealing from ``initial_lr`` at epoch 0 down to ``min_lr``
        at ``total_epochs``.

        Args:
            initial_lr: Starting (maximum) learning rate.
            epoch: Current epoch (0-based).
            total_epochs: Length of the annealing period.
            min_lr: Floor learning rate.

        Returns:
            Learning rate for this epoch.
        """
        import math
        cosine_term = 1 + math.cos(math.pi * epoch / total_epochs)
        return min_lr + 0.5 * (initial_lr - min_lr) * cosine_term

    @staticmethod
    def warmup_schedule(initial_lr: float,
                        epoch: int,
                        warmup_epochs: int) -> float:
        """
        Linear warmup toward ``initial_lr`` over the first epochs.

        Args:
            initial_lr: Target learning rate after warmup.
            epoch: Current epoch (0-based).
            warmup_epochs: Number of warmup epochs.

        Returns:
            Learning rate for this epoch.
        """
        if epoch >= warmup_epochs:
            return initial_lr
        # Ramp (epoch+1)/warmup so epoch 0 already gets a non-zero rate.
        return initial_lr * (epoch + 1) / warmup_epochs

    @staticmethod
    def lr_batch_size_scaling(base_lr: float,
                              base_batch_size: int,
                              new_batch_size: int) -> float:
        """
        Linear scaling rule: learning rate proportional to batch size.

        Args:
            base_lr: Learning rate tuned for ``base_batch_size``.
            base_batch_size: Reference batch size.
            new_batch_size: Batch size actually used.

        Returns:
            Scaled learning rate.
        """
        return base_lr * (new_batch_size / base_batch_size)
def learning_rate_example():
    """Demo: compare decay schedules, warmup, and batch-size scaling."""
    print("\n" + "=" * 60)
    print("学习率调度理论")
    print("=" * 60)

    lr_theory = LearningRateTheory()
    initial_lr = 0.1
    total_epochs = 100

    # --- Side-by-side comparison of the three decay schedules ---
    print("\n1. 学习率调度策略比较")
    print("-" * 40)
    print(f"{'Epoch':<10} {'Step':<12} {'Exponential':<15} {'Cosine':<12}")
    print("-" * 55)
    for epoch in (0, 25, 50, 75, 99):
        step_lr = lr_theory.step_decay_schedule(initial_lr, epoch, drop_every=30)
        exp_lr = lr_theory.exponential_decay_schedule(initial_lr, epoch, 0.97)
        cos_lr = lr_theory.cosine_annealing_schedule(initial_lr, epoch, total_epochs)
        print(f"{epoch:<10} {step_lr:<12.6f} {exp_lr:<15.6f} {cos_lr:<12.6f}")

    # --- Linear warmup ---
    print("\n2. Warmup调度")
    print("-" * 40)
    warmup_epochs = 10
    for epoch in (0, 5, 9, 10, 20):
        lr = lr_theory.warmup_schedule(initial_lr, epoch, warmup_epochs)
        print(f" Epoch {epoch:2d}: lr = {lr:.6f}")

    # --- Linear batch-size scaling rule ---
    print("\n3. 批量大小缩放")
    print("-" * 40)
    base_lr, base_bs = 0.01, 256
    for bs in (64, 128, 256, 512, 1024):
        scaled_lr = lr_theory.lr_batch_size_scaling(base_lr, base_bs, bs)
        print(f" Batch size {bs:4d}: lr = {scaled_lr:.6f}")
# Run the demo when executed as a script.
if __name__ == "__main__":
    learning_rate_example()
6.3 训练收敛理论¶
收敛性定义:
算法收敛,如果:\(\lim_{t \to \infty} \mathcal{L}(\theta_t) = \mathcal{L}^*\)
其中 \(\mathcal{L}^*\) 是最优损失。
凸优化收敛率:
| 算法 | 强凸 | 凸 |
|---|---|---|
| GD | \(O(e^{-t/\kappa})\) | \(O(1/t)\) |
| SGD | \(O(1/t)\) | \(O(1/\sqrt{t})\) |
| Nesterov | \(O(e^{-t/\sqrt{\kappa}})\) | \(O(1/t^2)\) |
非凸优化:
- 只能保证收敛到一阶稳定点(梯度为零)
- 收敛率通常为 \(O(1/\sqrt{t})\)
提前停止:
在验证损失开始上升前停止训练,防止过拟合。
class ConvergenceTheory:
    """
    Training convergence theory: classical convex rates, the non-convex
    gradient-norm rate, early stopping, and basic convergence diagnostics.
    """

    @staticmethod
    def convex_convergence_rate(iteration: int,
                                condition_number: float = 100.0,
                                algorithm: str = 'gd') -> float:
        """
        Theoretical error bound for convex optimization after t iterations.

        Args:
            iteration: Iteration count t.
            condition_number: Condition number κ of the problem.
            algorithm: One of 'gd', 'sgd', 'nesterov'; any other value
                falls back to the O(1/t) rate.

        Returns:
            Upper bound on the optimality gap.
        """
        if algorithm == 'gd':
            # Gradient descent, strongly convex case: O(e^{-t/κ}).
            return np.exp(-iteration / condition_number)
        if algorithm == 'nesterov':
            # Nesterov acceleration: O(e^{-t/√κ}).
            return np.exp(-iteration / np.sqrt(condition_number))
        # SGD (and the default fallback): O(1/t).
        return 1.0 / iteration

    @staticmethod
    def nonconvex_convergence_rate(iteration: int,
                                   lipschitz_constant: float = 1.0) -> float:
        """
        Non-convex setting: gradient-norm bound O(L/√t).

        Args:
            iteration: Iteration count t.
            lipschitz_constant: Gradient Lipschitz constant L.

        Returns:
            Upper bound on the gradient norm.
        """
        return lipschitz_constant / np.sqrt(iteration)

    @staticmethod
    def early_stopping_criterion(validation_losses: np.ndarray,
                                 patience: int = 10) -> int:
        """
        Early stopping: stop once validation loss has not improved for
        ``patience`` consecutive epochs.

        Args:
            validation_losses: Validation-loss history.
            patience: Number of non-improving epochs tolerated.

        Returns:
            Epoch at which to stop, or the full history length if the
            patience budget is never exhausted.
        """
        best_so_far = float('inf')
        best_at = 0
        for t, current in enumerate(validation_losses):
            if current < best_so_far:
                best_so_far, best_at = current, t
            elif t - best_at >= patience:
                return t
        return len(validation_losses)

    @staticmethod
    def convergence_diagnostic(loss_history: np.ndarray,
                               gradient_norm_history: np.ndarray = None) -> dict:
        """
        Basic convergence diagnostics from the training curves.

        Args:
            loss_history: Loss history.
            gradient_norm_history: Optional gradient-norm history.

        Returns:
            Dict with loss-change statistics, last-20-epoch loss variance,
            and (when gradient norms are given) a convergence flag.
        """
        report = {}
        # Rate of loss decrease: overall mean and last-10-epoch mean.
        if len(loss_history) > 1:
            deltas = np.diff(loss_history)
            report['mean_loss_change'] = np.mean(deltas)
            report['recent_loss_change'] = np.mean(deltas[-10:])
        # Gradient-based convergence check (final norm effectively zero).
        if gradient_norm_history is not None:
            report['final_gradient_norm'] = gradient_norm_history[-1]
            report['converged'] = gradient_norm_history[-1] < 1e-6
        # Loss fluctuation over the last 20 epochs.
        report['loss_variance'] = np.var(loss_history[-20:])
        return report
def convergence_example():
    """Demo: convergence rates, non-convex bounds, and early stopping."""
    print("\n" + "=" * 60)
    print("训练收敛理论")
    print("=" * 60)

    ct = ConvergenceTheory()
    iterations = [10, 100, 1000, 10000]

    # --- Convex rates for GD / SGD / Nesterov ---
    print("\n1. 凸优化收敛率")
    print("-" * 40)
    condition_number = 100
    print(f"{'Iter':<10} {'GD':<15} {'SGD':<15} {'Nesterov':<15}")
    print("-" * 55)
    for t in iterations:
        gd_rate = ct.convex_convergence_rate(t, condition_number, 'gd')
        sgd_rate = ct.convex_convergence_rate(t, condition_number, 'sgd')
        nest_rate = ct.convex_convergence_rate(t, condition_number, 'nesterov')
        print(f"{t:<10} {gd_rate:<15.6e} {sgd_rate:<15.6e} {nest_rate:<15.6e}")

    # --- Non-convex gradient-norm bound ---
    print("\n2. 非凸优化收敛率")
    print("-" * 40)
    for t in iterations:
        print(f" Iteration {t:5d}: gradient norm <= {ct.nonconvex_convergence_rate(t):.6e}")

    # --- Early stopping on a simulated validation curve ---
    print("\n3. 提前停止")
    print("-" * 40)
    np.random.seed(42)
    val_losses = 1.0 * np.exp(-np.arange(100) / 30) + 0.1
    # Inject a late-phase rise so early stopping has something to detect.
    val_losses[60:] += np.linspace(0, 0.1, 40)
    stop_epoch = ct.early_stopping_criterion(val_losses, patience=10)
    print(f" 建议停止epoch: {stop_epoch}")
    print(f" 最佳验证损失: {np.min(val_losses):.4f}")
# Run the demo when executed as a script.
if __name__ == "__main__":
    convergence_example()
7. 理论前沿与挑战¶
7.1 开放问题¶
主要开放问题:
1. 泛化的完整理论
   - 为什么过参数化网络能泛化?
   - 隐式正则化的精确刻画
2. 优化的全局收敛
   - 非凸损失的全局收敛保证
   - 鞍点逃离的精确分析
3. 深度 vs 宽度
   - 深度的真实作用
   - 最优网络架构设计
4. 表示学习的理论
   - 什么构成好的表示?
   - 自监督学习的理论保证
7.2 新兴研究方向¶
神经正切核的扩展:
- 有限宽度网络的修正
- 特征学习regime的分析

平均场理论:
- 无限宽度极限的替代视角
- 训练动态的宏观描述

信息论方法:
- 信息瓶颈的深化
- 压缩与预测的权衡

因果推断:
- 因果表示学习
- 分布外泛化
7.3 理论与实践的结合¶
理论指导实践:
- 架构设计:理解归纳偏置指导架构选择
- 优化策略:基于理论设计学习率调度
- 正则化技术:理论启发的正则化方法
- 模型选择:基于复杂度理论的模型选择
实践反馈理论:
- 现象发现:实验发现新现象驱动理论发展
- 理论验证:大规模实验验证理论预测
- 理论修正:根据实验结果修正理论假设
class TheoryPracticeIntegration:
    """
    Bridging theory and practice: theory-guided architecture selection,
    optimization recommendations, and a summary of core insights.
    """

    @staticmethod
    def architecture_selection_guide(data_properties: dict) -> str:
        """
        Recommend an architecture family from data properties.

        Args:
            data_properties: Boolean flags describing the data, e.g.
                'spatial_structure', 'sequential',
                'long_range_dependencies', 'graph_structure'.

        Returns:
            Human-readable architecture recommendation.
        """
        has = data_properties.get
        if has('spatial_structure', False):
            return "CNN - 利用局部性和平移等变性"
        if has('sequential', False):
            # Long-range dependencies favor attention over recurrence.
            if has('long_range_dependencies', False):
                return "Transformer - 处理长程依赖"
            return "RNN/LSTM - 序列建模"
        if has('graph_structure', False):
            return "GNN - 图结构数据"
        return "MLP - 通用函数逼近"

    @staticmethod
    def optimization_recommendations(model_size: int,
                                     data_size: int) -> dict:
        """
        Theory-motivated optimization hyperparameter suggestions.

        Args:
            model_size: Number of model parameters.
            data_size: Number of training examples.

        Returns:
            Dict with learning rate, scheduler, batch size, and
            regularization suggestions.
        """
        large_model = model_size > 1e7
        recommendations = {
            # Big models favor a small LR with cosine/warmup scheduling.
            'learning_rate': 1e-4 if large_model else 1e-3,
            'scheduler': 'cosine_with_warmup' if large_model else 'step_decay',
            # Larger datasets tolerate larger batches.
            'batch_size': 512 if data_size > 1e6 else 128,
        }
        # Heavy over-parameterization calls for stronger regularization.
        if model_size / data_size > 10:
            recommendations['regularization'] = 'strong'
            recommendations['dropout'] = 0.5
        else:
            recommendations['regularization'] = 'moderate'
            recommendations['dropout'] = 0.2
        return recommendations

    @staticmethod
    def theoretical_insights_summary():
        """Print a categorized summary of core theoretical insights."""
        insights = {
            "泛化": [
                "平坦极小值通常泛化更好",
                "有效复杂度决定泛化能力",
                "双下降现象挑战传统理论",
            ],
            "优化": [
                "SGD隐式偏好简单解",
                "学习率调度影响收敛质量",
                "鞍点在高维中占主导",
            ],
            "表示": [
                "分布式表示具有指数级能力",
                "层次结构自动学习",
                "流形假设指导降维",
            ],
            "架构": [
                "归纳偏置决定适用场景",
                "深度提供组合能力",
                "宽度影响优化难度",
            ],
        }
        print("\n理论核心洞察:")
        print("=" * 60)
        for category, points in insights.items():
            print(f"\n{category}:")
            for point in points:
                print(f" • {point}")
def theory_practice_example():
    """Demo: theory-guided architecture selection and optimization advice."""
    print("\n" + "=" * 60)
    print("理论与实践的结合")
    print("=" * 60)

    tpi = TheoryPracticeIntegration()

    # --- Architecture selection for a few data scenarios ---
    print("\n1. 架构选择指导")
    print("-" * 40)
    scenarios = [
        {'spatial_structure': True, 'sequential': False},
        {'spatial_structure': False, 'sequential': True, 'long_range_dependencies': True},
        {'graph_structure': True},
    ]
    for idx, props in enumerate(scenarios, start=1):
        print(f" 场景 {idx}: {tpi.architecture_selection_guide(props)}")

    # --- Optimization recommendations for two model/data regimes ---
    print("\n2. 优化建议")
    print("-" * 40)
    for config in ({'model_size': 1e6, 'data_size': 1e5},
                   {'model_size': 1e8, 'data_size': 1e6}):
        rec = tpi.optimization_recommendations(**config)
        print(f"\n 模型规模: {config['model_size']:.0e}, 数据量: {config['data_size']:.0e}")
        for name, value in rec.items():
            print(f" {name}: {value}")

    # --- Core insights summary ---
    tpi.theoretical_insights_summary()
# Run the demo when executed as a script.
if __name__ == "__main__":
    theory_practice_example()
总结¶
本文档深入探讨了机器学习的核心理论,涵盖以下关键领域:
1. 泛化理论¶
- PAC学习框架提供了学习的基础理论保证
- VC维和Rademacher复杂度量化了模型复杂度
- 深度学习中的泛化之谜推动了对隐式正则化的研究
2. 表示学习理论¶
- 流形假设解释了高维数据的低维结构
- 分布式表示具有指数级表示能力
- 信息瓶颈理论指导表示学习的设计
3. 优化景观分析¶
- 临界点分析揭示了损失函数的几何结构
- 鞍点问题要求优化算法具有逃离能力
- 平坦极小值与泛化性能密切相关
4. 神经正切核理论¶
- NTK描述了宽网络的训练动态
- Kernel regime vs Feature learning regime
- 为理解深度学习提供了新的理论视角
5. 归纳偏置¶
- 架构设计蕴含特定的归纳偏置
- 优化算法具有隐式偏置
- 数据分布影响学习过程
6. 学习动态¶
- 训练过程经历多个相变阶段
- 学习率调度对收敛至关重要
- 收敛理论指导训练设计
这些理论为理解和改进机器学习算法提供了坚实的基础,同时也有许多开放问题等待进一步研究。