
Core Theory - A Deep Dive into Machine Learning Theory

[Figure: machine learning core theory diagram]

Contents

  1. Generalization Theory
  2. Representation Learning Theory
  3. Optimization Landscape Analysis
  4. Neural Tangent Kernel Theory
  5. Inductive Biases of Deep Learning
  6. Learning Dynamics and Training Theory
  7. Theoretical Frontiers and Challenges

1. Generalization Theory

1.1 The PAC Learning Framework

Definition (PAC learnability): A concept class \(\mathcal{C}\) is PAC-learnable if there exists an algorithm \(\mathcal{A}\) such that for any \(\epsilon > 0\) (accuracy), \(\delta > 0\) (confidence), any target concept \(c \in \mathcal{C}\), and any distribution \(\mathcal{D}\), after drawing \(m\) samples from \(\mathcal{D}\), the algorithm \(\mathcal{A}\) outputs, with probability at least \(1-\delta\), a hypothesis \(h\) such that:

\[R(h) \leq R(c) + \epsilon\]

where \(R(h) = \mathbb{E}_{x \sim \mathcal{D}}[\mathbb{1}_{h(x) \neq c(x)}]\) is the generalization error.

Sample complexity

For a finite hypothesis space \(|\mathcal{H}| < \infty\):

\[m \geq \frac{1}{\epsilon}\left(\ln|\mathcal{H}| + \ln\frac{1}{\delta}\right)\]

Theorem (fundamental theorem of PAC learning)

For binary classification, the ERM (empirical risk minimization) algorithm satisfies:

\[R(\hat{h}) - R(h^*) \leq \sqrt{\frac{2d\ln(em/d) + 2\ln(4/\delta)}{m}}\]

where \(d\) is the VC dimension and \(h^*\) is the optimal hypothesis.

Python
import numpy as np
from collections.abc import Callable
import matplotlib.pyplot as plt

class PACLearningFramework:
    """
    PAC学习框架的实现与验证
    """

    def __init__(self, hypothesis_space_size: int, epsilon: float = 0.1, delta: float = 0.05):
        """
        初始化PAC学习框架

        Args:
            hypothesis_space_size: 假设空间大小 |H|
            epsilon: 精度参数
            delta: 置信度参数
        """
        self.hypothesis_space_size = hypothesis_space_size
        self.epsilon = epsilon
        self.delta = delta

    def sample_complexity_finite(self) -> int:
        """
        计算有限假设空间的样本复杂度

        Returns:
            所需样本数 m
        """
        # m >= (1/epsilon) * (ln|H| + ln(1/delta))
        m = int(np.ceil((1.0 / self.epsilon) *
                       (np.log(self.hypothesis_space_size) + np.log(1.0 / self.delta))))
        return m

    def sample_complexity_vc(self, vc_dim: int) -> int:
        """
        基于VC维的样本复杂度

        Args:
            vc_dim: VC维

        Returns:
            所需样本数 m
        """
        # m >= (1/epsilon) * (8*VC(H)*log(13/epsilon) + 4*log(2/delta))
        m = int(np.ceil((1.0 / self.epsilon) *
                       (8 * vc_dim * np.log(13.0 / self.epsilon) +
                        4 * np.log(2.0 / self.delta))))
        return m

    def generalization_bound_vc(self, m: int, vc_dim: int) -> float:
        """
        基于VC维的泛化误差上界

        Args:
            m: 样本数
            vc_dim: VC维

        Returns:
            泛化误差上界
        """
        # R(h) <= R_emp(h) + sqrt((2d*ln(em/d) + 2*ln(4/delta))/m)
        bound = np.sqrt((2 * vc_dim * np.log(np.e * m / vc_dim) +
                        2 * np.log(4.0 / self.delta)) / m)
        return bound

    def probably_approximately_correct(self,
                                       empirical_risk: float,
                                       m: int,
                                       vc_dim: int) -> tuple[bool, float]:
        """
        验证PAC条件

        Args:
            empirical_risk: 经验风险
            m: 样本数
            vc_dim: VC维

        Returns:
            (是否满足PAC, 泛化误差上界)
        """
        generalization_gap = self.generalization_bound_vc(m, vc_dim)
        true_risk_upper = empirical_risk + generalization_gap

        # 以至少 1-delta 的概率,真实误差不超过上界
        satisfies_pac = generalization_gap <= self.epsilon

        return satisfies_pac, true_risk_upper

    def visualize_sample_complexity(self, vc_dims: list[int] | None = None):
        """可视化样本复杂度随VC维的变化"""
        if vc_dims is None:
            vc_dims = list(range(1, 101))

        sample_complexities = [self.sample_complexity_vc(d) for d in vc_dims]

        plt.figure(figsize=(10, 6))
        plt.plot(vc_dims, sample_complexities, 'b-', linewidth=2)
        plt.xlabel('VC Dimension', fontsize=12)
        plt.ylabel('Sample Complexity m', fontsize=12)
        plt.title(f'PAC Sample Complexity (ε={self.epsilon}, δ={self.delta})', fontsize=14)
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig('pac_sample_complexity.png', dpi=150)
        plt.show()

# 示例:PAC学习框架验证
def pac_learning_example():
    """PAC学习框架示例"""
    # 创建PAC框架实例
    pac = PACLearningFramework(hypothesis_space_size=1000, epsilon=0.1, delta=0.05)

    # 计算不同VC维下的样本复杂度
    vc_dims = [5, 10, 50, 100]
    print("=" * 60)
    print("PAC学习框架分析")
    print("=" * 60)

    print(f"\n假设空间大小 |H| = {pac.hypothesis_space_size}")
    print(f"精度参数 ε = {pac.epsilon}")
    print(f"置信度参数 δ = {pac.delta}")

    print(f"\n有限假设空间样本复杂度: m >= {pac.sample_complexity_finite()}")

    print("\n不同VC维下的样本复杂度:")
    for d in vc_dims:
        m = pac.sample_complexity_vc(d)
        print(f"  VC维 = {d:3d}: m >= {m:5d}")

    # 验证泛化界
    print("\n泛化误差界分析 (m=1000):")
    for d in [10, 50, 100]:
        bound = pac.generalization_bound_vc(m=1000, vc_dim=d)
        satisfies, upper = pac.probably_approximately_correct(
            empirical_risk=0.05, m=1000, vc_dim=d)
        print(f"  VC维 = {d:3d}: 泛化界 = {bound:.4f}, 满足PAC: {satisfies}")

    return pac

# 运行示例
if __name__ == "__main__":
    pac = pac_learning_example()

1.2 VC Dimension and Rademacher Complexity

VC dimension: The VC dimension of a hypothesis space \(\mathcal{H}\) is the size of the largest point set that can be shattered by \(\mathcal{H}\).

Shattering: A point set \(S = \{x_1, ..., x_m\}\) is shattered by \(\mathcal{H}\) if for every one of the \(2^m\) possible labelings there exists an \(h \in \mathcal{H}\) that realizes it.

VC dimensions of common models

| Model | VC dimension | Notes |
|---|---|---|
| Linear classifier (d-dim) | \(d+1\) | hyperplane classifier |
| Perceptron (d-dim) | \(d+1\) | with threshold |
| Neural network | \(O(W\log W)\) | \(W\) = number of weights |
| Decision tree (depth k) | \(2^k\) | binary tree |
| SVM (RBF kernel) | \(\infty\) | infinite VC dimension |
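The shattering definition can be verified exhaustively for small point sets. The sketch below is an added illustration (not from the original text): it tests each of the \(2^m\) labelings for strict linear separability via an LP feasibility check with `scipy.optimize.linprog`, confirming that linear classifiers in \(\mathbb{R}^2\) shatter 3 points in general position but not 4 (the XOR labeling), consistent with a VC dimension of \(d+1 = 3\).

```python
import itertools
import numpy as np
from scipy.optimize import linprog

def linearly_separable(X: np.ndarray, y: np.ndarray) -> bool:
    """Exact separability test via LP feasibility: find (w, b) with y_i(w·x_i + b) >= 1."""
    n, d = X.shape
    # Variables: [w (d entries), b]; constraints -y_i * ([x_i, 1] · [w, b]) <= -1
    A_ub = -y[:, None] * np.hstack([X, np.ones((n, 1))])
    b_ub = -np.ones(n)
    res = linprog(c=np.zeros(d + 1), A_ub=A_ub, b_ub=b_ub,
                  bounds=[(None, None)] * (d + 1), method="highs")
    return res.status == 0  # status 0 = feasible (optimal found)

def is_shattered(X: np.ndarray) -> bool:
    """Check every one of the 2^m labelings for a separating hyperplane."""
    return all(linearly_separable(X, np.array(labels))
               for labels in itertools.product([-1, 1], repeat=len(X)))

three = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0]])
four = np.array([[0.0, 0.0], [1.0, 1.0], [1.0, 0.0], [0.0, 1.0]])
print(is_shattered(three))  # True: 3 points in general position are shattered
print(is_shattered(four))   # False: the XOR labeling is not linearly separable
```

The same check works in any dimension, though the \(2^m\) enumeration limits it to small \(m\).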

Rademacher complexity

\[\mathfrak{R}_m(\mathcal{H}) = \mathbb{E}_{S,\sigma}\left[\sup_{h \in \mathcal{H}}\frac{1}{m}\sum_{i=1}^m \sigma_i h(x_i)\right]\]

where the \(\sigma_i \in \{-1, +1\}\) are i.i.d. Rademacher random variables.

Generalization bound (via Rademacher complexity)

With probability at least \(1-\delta\), for all \(h \in \mathcal{H}\):

\[R(h) \leq \hat{R}(h) + 2\mathfrak{R}_m(\mathcal{H}) + \sqrt{\frac{\ln(1/\delta)}{2m}}\]
Python
class ComplexityMeasures:
    """
    复杂度度量:VC维和Rademacher复杂度
    """

    @staticmethod
    def vc_dim_linear(d: int) -> int:
        """d维线性分类器的VC维"""
        return d + 1

    @staticmethod
    def vc_dim_neural_network(weights: int, depth: int | None = None) -> int:
        """
        神经网络的VC维上界

        Args:
            weights: 权重参数数量
            depth: 网络深度(可选)

        Returns:
            VC维上界
        """
        # 简化上界: O(W log W)
        if depth is None:
            return int(weights * np.log2(weights))
        else:
            # Depth-dependent bound; for piecewise-linear activations roughly O(L * W * log W)
            return int(depth * weights * np.log2(weights))

    @staticmethod
    def rademacher_complexity_bound(m: int, vc_dim: int) -> float:
        """
        基于VC维的Rademacher复杂度上界

        R_m(H) <= sqrt(2d*ln(em/d)/m)
        """
        if m <= vc_dim:
            return 1.0
        return np.sqrt(2 * vc_dim * np.log(np.e * m / vc_dim) / m)

    @staticmethod
    def empirical_rademacher(X: np.ndarray,
                             hypothesis_class: Callable,
                             n_samples: int = 100) -> float:
        """
        估计经验Rademacher复杂度

        Args:
            X: 数据矩阵 (n_samples, n_features)
            hypothesis_class: 假设类函数
            n_samples: Rademacher采样次数

        Returns:
            经验Rademacher复杂度估计
        """
        n = len(X)
        rademacher_values = []

        for _ in range(n_samples):
            # 生成Rademacher随机变量
            sigma = np.random.choice([-1, 1], size=n)

            # 在假设类上最大化
            # 简化版本:随机采样假设
            max_correlation = 0
            for _ in range(50):  # 采样假设
                h = hypothesis_class(X)
                correlation = np.mean(sigma * h)
                max_correlation = max(max_correlation, correlation)

            rademacher_values.append(max_correlation)

        return np.mean(rademacher_values)

    @staticmethod
    def margin_bound(rademacher: float,
                     margin: float,
                     m: int,
                     delta: float = 0.05) -> float:
        """
        基于间隔的泛化界

        对于间隔为margin的分类器,泛化误差上界
        """
        # R(h) <= R_emp^margin(h) + (2/margin)*R_m(H) + sqrt(ln(1/delta)/(2m))
        bound = (2.0 / margin) * rademacher + np.sqrt(np.log(1.0 / delta) / (2 * m))
        return bound

def complexity_analysis_example():
    """复杂度分析示例"""
    print("\n" + "=" * 60)
    print("复杂度度量分析")
    print("=" * 60)

    cm = ComplexityMeasures()

    # VC维分析
    print("\n1. VC维分析")
    print("-" * 40)

    dimensions = [10, 100, 1000, 10000]
    for d in dimensions:
        vc_linear = cm.vc_dim_linear(d)
        print(f"  输入维度 d={d:5d}: 线性分类器VC维 = {vc_linear:6d}")

    # 神经网络VC维
    print("\n  神经网络VC维上界:")
    weight_counts = [1000, 10000, 100000, 1000000]
    for w in weight_counts:
        vc_nn = cm.vc_dim_neural_network(w)
        print(f"    权重数 W={w:8d}: VC维上界 ≈ {vc_nn:10d}")

    # Rademacher复杂度
    print("\n2. Rademacher复杂度分析")
    print("-" * 40)

    sample_sizes = [100, 1000, 10000, 100000]
    vc_dim = 100

    for m in sample_sizes:
        rad_bound = cm.rademacher_complexity_bound(m, vc_dim)
        print(f"  样本数 m={m:6d}, VC维 d={vc_dim:3d}: "
              f"Rademacher上界 = {rad_bound:.4f}")

    # 间隔界
    print("\n3. 基于间隔的泛化界")
    print("-" * 40)

    margins = [0.1, 0.5, 1.0, 2.0]
    m, rad = 1000, 0.1

    for margin in margins:
        bound = cm.margin_bound(rad, margin, m)
        print(f"  间隔 γ={margin:.1f}: 泛化界 = {bound:.4f}")

# 运行示例
if __name__ == "__main__":
    complexity_analysis_example()

1.3 Uniform Convergence and Generalization Bounds

Uniform convergence theorem: For a hypothesis space \(\mathcal{H}\), with probability at least \(1-\delta\):

\[\sup_{h \in \mathcal{H}} |R(h) - \hat{R}(h)| \leq 2\mathfrak{R}_m(\mathcal{H}) + \sqrt{\frac{\ln(1/\delta)}{2m}}\]

McDiarmid's inequality: Let \(f: \mathcal{X}^m \rightarrow \mathbb{R}\) satisfy the bounded-differences condition:

\[\sup_{x_1,...,x_m,x_i'} |f(x_1,...,x_i,...,x_m) - f(x_1,...,x_i',...,x_m)| \leq c_i\]

Then:

\[P(|f(X_1,...,X_m) - \mathbb{E}[f]| \geq t) \leq 2\exp\left(-\frac{2t^2}{\sum_{i=1}^m c_i^2}\right)\]

Generalization bound via covering numbers

\[\mathfrak{R}_m(\mathcal{H}) \leq \inf_{\epsilon > 0}\left(\epsilon + \sqrt{\frac{2\ln\mathcal{N}(\mathcal{H}, \epsilon, L_2)}{m}}\right)\]

where \(\mathcal{N}(\mathcal{H}, \epsilon, L_2)\) is the \(L_2\) covering number.

Python
class UniformConvergence:
    """
    均匀收敛理论与泛化界
    """

    @staticmethod
    def mcdiarmid_inequality(bounded_differences: list[float],
                            t: float) -> float:
        """
        McDiarmid不等式

        P(|f - E[f]| >= t) <= 2*exp(-2t^2 / sum(c_i^2))

        Args:
            bounded_differences: 有界差分 c_i 列表
            t: 偏离阈值

        Returns:
            概率上界
        """
        sum_c_squared = sum(c**2 for c in bounded_differences)
        bound = 2 * np.exp(-2 * t**2 / sum_c_squared)
        return bound

    @staticmethod
    def covering_number_bound(m: int,
                               covering_number: int,
                               epsilon: float) -> float:
        """
        基于覆盖数的Rademacher复杂度上界

        Args:
            m: 样本数
            covering_number: 覆盖数 N(H, epsilon)
            epsilon: 覆盖半径

        Returns:
            Rademacher复杂度上界
        """
        bound = epsilon + np.sqrt(2 * np.log(covering_number) / m)
        return bound

    @staticmethod
    def uniform_convergence_bound(rademacher: float,
                                   m: int,
                                   delta: float = 0.05) -> float:
        """
        均匀收敛界

        sup|R(h) - R_hat(h)| <= 2*R_m(H) + sqrt(ln(1/delta)/(2m))
        """
        bound = 2 * rademacher + np.sqrt(np.log(1.0 / delta) / (2 * m))
        return bound

    @staticmethod
    def fat_shattering_dimension(margin: float,
                                  data: np.ndarray) -> int:
        """
        胖粉碎维(Fat-shattering dimension)

        实值函数类的推广VC维

        Args:
            margin: 间隔参数
            data: 数据集

        Returns:
            胖粉碎维估计
        """
        # 简化实现:基于间隔的VC维估计
        n_samples, n_features = data.shape

        # 胖粉碎维与间隔成反比
        # 对于线性分类器: fat_gamma = O(1/gamma^2)
        fat_dim = int(n_features / (margin ** 2))

        return min(fat_dim, n_samples)

def uniform_convergence_example():
    """均匀收敛示例"""
    print("\n" + "=" * 60)
    print("均匀收敛理论")
    print("=" * 60)

    uc = UniformConvergence()

    # McDiarmid不等式
    print("\n1. McDiarmid不等式")
    print("-" * 40)

    # 经验风险的有界差分
    n_samples = 1000
    bounded_diffs = [1.0 / n_samples] * n_samples  # 每个样本影响最大1/n

    thresholds = [0.01, 0.05, 0.1, 0.2]
    for t in thresholds:
        prob_bound = uc.mcdiarmid_inequality(bounded_diffs, t)
        print(f"  偏离阈值 t={t:.2f}: P(|f-E[f]|>={t}) <= {prob_bound:.6f}")

    # 均匀收敛界
    print("\n2. 均匀收敛界")
    print("-" * 40)

    m_values = [100, 500, 1000, 5000]
    rademacher = 0.1

    for m in m_values:
        bound = uc.uniform_convergence_bound(rademacher, m)
        print(f"  样本数 m={m:4d}: 均匀收敛界 = {bound:.4f}")

    # 胖粉碎维
    print("\n3. 胖粉碎维(实值函数类)")
    print("-" * 40)

    margins = [0.1, 0.5, 1.0, 2.0]
    data = np.random.randn(1000, 100)

    for margin in margins:
        fat_dim = uc.fat_shattering_dimension(margin, data)
        print(f"  间隔 γ={margin:.1f}: 胖粉碎维 ≈ {fat_dim}")

# 运行示例
if __name__ == "__main__":
    uniform_convergence_example()

1.4 The Generalization Mystery in Deep Learning

The phenomenon

Deep neural networks typically have:
  • far more parameters than training samples (overparameterization)
  • a VC dimension that is enormous or theoretically infinite
  • yet they still generalize well

Possible explanations

  1. Implicit regularization
     • optimization algorithms such as SGD implicitly prefer simple solutions
     • gradient descent tends to converge to flat minima
  2. Double descent
     • the classical U-shaped risk curve descends again past overparameterization
     • "benign overfitting" beyond the interpolation threshold
  3. PAC-Bayes theory
     • considers a posterior distribution over hypotheses rather than a point estimate
     • connects the bound to the optimization trajectory
Python
class DeepGeneralizationMystery:
    """
    深度学习泛化之谜的分析
    """

    @staticmethod
    def effective_complexity(parameters: int,
                            training_epochs: int,
                            batch_size: int,
                            learning_rate: float) -> float:
        """
        估计有效复杂度(考虑优化过程)

        启发式:有效复杂度 < 参数数量

        Args:
            parameters: 参数数量
            training_epochs: 训练轮数
            batch_size: 批量大小
            learning_rate: 学习率

        Returns:
            有效复杂度估计
        """
        # Heuristic proxy for total optimization steps; strictly, steps per epoch
        # is n_samples / batch_size, but the dataset size is not available here
        steps = training_epochs * (parameters / batch_size)

        # 学习率影响收敛到的解的复杂度
        lr_factor = np.tanh(learning_rate * 10)  # 归一化

        # 有效复杂度上界
        effective = min(parameters, steps * lr_factor)

        return effective

    @staticmethod
    def flat_minima_generalization(hessian_eigenvalues: np.ndarray,
                                   empirical_risk: float) -> float:
        """
        基于Hessian特征值的平坦极小值泛化分析

        平坦极小值通常泛化更好

        Args:
            hessian_eigenvalues: Hessian矩阵特征值
            empirical_risk: 经验风险

        Returns:
            泛化能力评分(越高越好)
        """
        # 计算Hessian的迹(特征值之和)
        trace = np.sum(hessian_eigenvalues)

        # 计算特征值分布的集中度
        eigenvalue_variance = np.var(hessian_eigenvalues)

        # 平坦度指标:迹越小、方差越小越平坦
        flatness_score = 1.0 / (1.0 + trace + eigenvalue_variance)

        # 综合泛化评分
        generalization_score = (1 - empirical_risk) * flatness_score

        return generalization_score

    @staticmethod
    def double_descent_curve(n_samples: int,
                             max_params: int,
                             noise_level: float = 0.1) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        生成双下降曲线

        Args:
            n_samples: 样本数
            max_params: 最大参数数
            noise_level: 噪声水平

        Returns:
            (参数数数组, 训练误差, 测试误差)
        """
        param_counts = np.arange(1, max_params + 1)
        train_errors = []
        test_errors = []

        for n_params in param_counts:
            if n_params < n_samples:
                # 欠参数化区域:传统U型曲线
                underfitting = np.exp(-n_params / (n_samples * 0.3))
                overfitting = 0.1 * (n_params / n_samples) ** 2
                test_err = underfitting + overfitting + noise_level
            elif n_params == n_samples:
                # 插值阈值:峰值
                test_err = 0.5 + noise_level
            else:
                # 过参数化区域:良性过拟合
                interpolation = noise_level * (n_samples / n_params)
                test_err = interpolation + 0.05

            # 训练误差单调递减
            train_err = max(0.01, noise_level * np.exp(-n_params / n_samples))

            train_errors.append(train_err)
            test_errors.append(test_err)

        return param_counts, np.array(train_errors), np.array(test_errors)

    @staticmethod
    def pac_bayes_bound(empirical_risk: float,
                       kl_divergence: float,
                       m: int,
                       delta: float = 0.05) -> float:
        """
        PAC-Bayes泛化界

        Args:
            empirical_risk: 经验风险
            kl_divergence: KL散度 D(Q||P)
            m: 样本数
            delta: 置信度

        Returns:
            PAC-Bayes泛化界
        """
        # KL项
        kl_term = (kl_divergence + np.log(2 * np.sqrt(m) / delta)) / m

        # PAC-Bayes界
        bound = empirical_risk + np.sqrt(kl_term / 2)

        return bound

def deep_generalization_example():
    """深度学习泛化之谜示例"""
    print("\n" + "=" * 60)
    print("深度学习泛化之谜")
    print("=" * 60)

    dgm = DeepGeneralizationMystery()

    # 有效复杂度
    print("\n1. 有效复杂度分析")
    print("-" * 40)

    configs = [
        {"params": 10000, "epochs": 100, "batch": 32, "lr": 0.001},
        {"params": 1000000, "epochs": 50, "batch": 128, "lr": 0.01},
        {"params": 100000000, "epochs": 10, "batch": 256, "lr": 0.1},
    ]

    for config in configs:
        effective = dgm.effective_complexity(
            config["params"], config["epochs"],
            config["batch"], config["lr"]
        )
        ratio = effective / config["params"]
        print(f"  参数: {config['params']:10d}, 有效复杂度: {effective:12.1f}, "
              f"比例: {ratio:.4f}")

    # 双下降曲线
    print("\n2. 双下降现象")
    print("-" * 40)

    n_samples = 100
    max_params = 300

    params, train_err, test_err = dgm.double_descent_curve(
        n_samples, max_params, noise_level=0.1)

    print(f"  样本数: {n_samples}")
    print(f"  插值阈值(参数数=样本数): {n_samples}")
    print(f"  阈值处测试误差: {test_err[n_samples-1]:.4f}")
    print(f"  最大参数处测试误差: {test_err[-1]:.4f}")

    # 平坦极小值
    print("\n3. 平坦极小值分析")
    print("-" * 40)

    # 模拟Hessian特征值
    sharp_eigenvalues = np.array([10, 8, 5, 3, 2, 1, 0.5, 0.1])  # 尖锐
    flat_eigenvalues = np.array([2, 1.5, 1, 0.8, 0.5, 0.3, 0.2, 0.1])  # 平坦

    sharp_score = dgm.flat_minima_generalization(sharp_eigenvalues, 0.05)
    flat_score = dgm.flat_minima_generalization(flat_eigenvalues, 0.05)

    print(f"  尖锐极小值泛化评分: {sharp_score:.4f}")
    print(f"  平坦极小值泛化评分: {flat_score:.4f}")
    print(f"  平坦/尖锐比值: {flat_score/sharp_score:.2f}x")

    # PAC-Bayes
    print("\n4. PAC-Bayes界")
    print("-" * 40)

    kl_divs = [1, 10, 100, 1000]
    for kl in kl_divs:
        bound = dgm.pac_bayes_bound(0.05, kl, m=1000)
        print(f"  KL散度={kl:4d}: PAC-Bayes界 = {bound:.4f}")

# 运行示例
if __name__ == "__main__":
    deep_generalization_example()

2. Representation Learning Theory

2.1 The Manifold Hypothesis and Data Representation

Manifold hypothesis: high-dimensional data (such as images and text) actually lie on a low-dimensional manifold.

Mathematical formulation

Suppose the data \(x \in \mathbb{R}^d\) actually come from a \(k\)-dimensional manifold \(\mathcal{M} \subset \mathbb{R}^d\), where \(k \ll d\).

Locally Linear Embedding (LLE)

  1. Find the \(k\) nearest neighbors of each point
  2. Compute reconstruction weights \(W_{ij}\) by minimizing: \(\mathcal{E}(W) = \sum_i \|x_i - \sum_j W_{ij}x_j\|^2\)
  3. Find a low-dimensional embedding \(Y\) that preserves the weights: \(\Phi(Y) = \sum_i \|y_i - \sum_j W_{ij}y_j\|^2\)

The t-SNE objective

\[KL(P||Q) = \sum_{i \neq j} p_{ij} \log\frac{p_{ij}}{q_{ij}}\]

where \(p_{ij}\) is the pairwise similarity in the high-dimensional space and \(q_{ij}\) in the low-dimensional space.
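The KL objective above can be computed directly for a given embedding. The following is a deliberately simplified sketch (an added illustration: a single fixed bandwidth `sigma` instead of t-SNE's per-point, perplexity-calibrated bandwidths, and no symmetrization of conditionals), showing a Gaussian kernel for \(p_{ij}\) and the heavy-tailed Student-t kernel for \(q_{ij}\):

```python
import numpy as np

def tsne_kl(X_high: np.ndarray, Y_low: np.ndarray, sigma: float = 1.0) -> float:
    """KL(P||Q) between Gaussian high-dim affinities P and Student-t low-dim affinities Q."""
    def pairwise_sq_dists(A):
        return np.sum((A[:, None, :] - A[None, :, :]) ** 2, axis=-1)

    n = len(X_high)
    mask = ~np.eye(n, dtype=bool)  # exclude the i == j terms

    # High-dimensional affinities: Gaussian kernel, normalized over all i != j pairs
    P = np.exp(-pairwise_sq_dists(X_high) / (2 * sigma**2))
    P = P[mask] / P[mask].sum()

    # Low-dimensional affinities: Student-t kernel (heavy tails), normalized likewise
    Q = 1.0 / (1.0 + pairwise_sq_dists(Y_low))
    Q = Q[mask] / Q[mask].sum()

    return float(np.sum(P * np.log(P / Q)))

rng = np.random.default_rng(0)
X = rng.normal(size=(20, 10))
print(f"KL(P||Q) = {tsne_kl(X, X[:, :2]):.4f}")  # non-negative; 0 only if P == Q
```

t-SNE itself would minimize this quantity over `Y_low` by gradient descent; here it only scores a fixed embedding.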

Python
class ManifoldLearning:
    """
    流形学习理论与实现
    """

    @staticmethod
    def lle(X: np.ndarray, n_components: int = 2, n_neighbors: int = 10) -> np.ndarray:
        """
        局部线性嵌入 (LLE)

        Args:
            X: 高维数据 (n_samples, n_features)
            n_components: 目标维度
            n_neighbors: 近邻数

        Returns:
            低维嵌入 (n_samples, n_components)
        """
        n_samples = X.shape[0]

        # 1. 计算k近邻
        from scipy.spatial.distance import cdist
        distances = cdist(X, X)
        neighbors = np.argsort(distances, axis=1)[:, 1:n_neighbors+1]

        # 2. 计算重构权重
        W = np.zeros((n_samples, n_samples))

        for i in range(n_samples):
            # 中心化的近邻点
            Xi = X[neighbors[i]] - X[i]

            # 局部协方差矩阵
            C = Xi @ Xi.T

            # 添加正则化
            C += np.eye(n_neighbors) * 1e-3 * np.trace(C)

            # 求解权重
            w = np.linalg.solve(C, np.ones(n_neighbors))
            w /= np.sum(w)  # 归一化

            W[i, neighbors[i]] = w

        # 3. 计算低维嵌入
        M = (np.eye(n_samples) - W).T @ (np.eye(n_samples) - W)

        # 特征值分解
        eigenvalues, eigenvectors = np.linalg.eigh(M)

        # 选择最小的非零特征值对应的特征向量
        Y = eigenvectors[:, 1:n_components+1]

        return Y

    @staticmethod
    def intrinsic_dimension(X: np.ndarray,
                           k: int = 10,
                           method: str = 'correlation') -> float:
        """
        估计数据的内在维度

        Args:
            X: 数据矩阵
            k: 近邻数
            method: 估计方法

        Returns:
            内在维度估计
        """
        from scipy.spatial.distance import cdist

        n_samples = X.shape[0]
        distances = cdist(X, X)

        # 对每个点计算到k近邻的距离
        neighbor_distances = np.sort(distances, axis=1)[:, 1:k+1]

        if method == 'correlation':
            # 基于距离相关性的方法
            # 在d维流形上,体积按r^d增长

            # 计算平均距离
            r = np.mean(neighbor_distances, axis=0)
            k_values = np.arange(1, k+1)

            # 拟合 log(k) ~ d * log(r)
            log_r = np.log(r + 1e-10)
            log_k = np.log(k_values)

            # 线性回归估计斜率
            slope = np.polyfit(log_r, log_k, 1)[0]

            return slope

        elif method == 'mle':
            # 最大似然估计
            # d_hat = (1/(N-1)) * sum(log(r_k / r_i))^{-1}

            dims = []
            for i in range(n_samples):
                r_i = neighbor_distances[i, :-1]
                r_k = neighbor_distances[i, -1]

                if r_k > 0:
                    d_hat = 1.0 / (np.mean(np.log(r_k / (r_i + 1e-10))) + 1e-10)
                    dims.append(d_hat)

            return np.median(dims)

        return 0.0

    @staticmethod
    def manifold_hypothesis_validation(X: np.ndarray,
                                       embedding_methods: list[str] | None = None) -> dict:
        """
        验证流形假设

        Args:
            X: 数据矩阵
            embedding_methods: 嵌入方法列表

        Returns:
            验证结果字典
        """
        if embedding_methods is None:
            embedding_methods = ['pca', 'lle', 'tsne']

        results = {}

        # 估计内在维度
        intrinsic_dim = ManifoldLearning.intrinsic_dimension(X)
        results['intrinsic_dimension'] = intrinsic_dim
        results['ambient_dimension'] = X.shape[1]
        results['dimension_ratio'] = intrinsic_dim / X.shape[1]

        print(f"环境维度: {X.shape[1]}")
        print(f"估计内在维度: {intrinsic_dim:.2f}")
        print(f"维度比: {intrinsic_dim/X.shape[1]:.4f}")

        return results

def manifold_learning_example():
    """流形学习示例"""
    print("\n" + "=" * 60)
    print("流形学习理论")
    print("=" * 60)

    ml = ManifoldLearning()

    # 生成瑞士卷数据
    from sklearn.datasets import make_swiss_roll
    X, t = make_swiss_roll(n_samples=1000, noise=0.1, random_state=42)

    print("\n1. 瑞士卷数据流形分析")
    print("-" * 40)
    print(f"  数据形状: {X.shape}")

    # 估计内在维度
    intrinsic_dim = ml.intrinsic_dimension(X, k=10)
    print(f"  估计内在维度: {intrinsic_dim:.2f} (真实: 2)")

    # LLE降维
    print("\n2. LLE降维")
    print("-" * 40)

    X_lle = ml.lle(X, n_components=2, n_neighbors=10)
    print(f"  降维后形状: {X_lle.shape}")

    # 验证流形假设
    print("\n3. 流形假设验证")
    print("-" * 40)

    results = ml.manifold_hypothesis_validation(X)
    print(f"  维度压缩比: {results['dimension_ratio']:.4f}")

    return X, X_lle

# 运行示例
if __name__ == "__main__":
    X_swiss, X_lle = manifold_learning_example()

2.2 Distributed and Hierarchical Representations

Distributed representation: information is spread across many neurons/features, and each neuron takes part in representing multiple concepts.

Advantages:
  • exponential representational capacity: \(n\) binary neurons can represent \(2^n\) patterns
  • generalization: similar concepts receive similar representations
  • compositionality: complex concepts can be represented by combining basic features

Hierarchical representation

Deep networks learn hierarchical features:
  • low layers: edges, colors, textures
  • middle layers: shapes, parts
  • high layers: objects, semantics

Mathematical view

Let \(h^{(l)}\) be the representation at layer \(l\); then:

\[h^{(l)} = f(W^{(l)} h^{(l-1)} + b^{(l)})\]
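The layer recursion \(h^{(l)} = f(W^{(l)} h^{(l-1)} + b^{(l)})\) can be sketched in a few lines of NumPy (an added illustration; ReLU is chosen as \(f\), and the layer widths are arbitrary):

```python
import numpy as np

def forward(x: np.ndarray, weights: list[np.ndarray], biases: list[np.ndarray]) -> list[np.ndarray]:
    """Apply h^{(l)} = f(W^{(l)} h^{(l-1)} + b^{(l)}) layer by layer, with f = ReLU."""
    hs = [x]
    for W, b in zip(weights, biases):
        hs.append(np.maximum(0.0, W @ hs[-1] + b))  # ReLU as the nonlinearity f
    return hs

rng = np.random.default_rng(0)
sizes = [8, 16, 4, 2]  # input dimension followed by three layer widths
Ws = [rng.normal(size=(m, n)) for n, m in zip(sizes, sizes[1:])]
bs = [np.zeros(m) for m in sizes[1:]]

hs = forward(rng.normal(size=sizes[0]), Ws, bs)
print([h.shape for h in hs])  # one representation per layer, input included
```

Each entry of `hs` is one layer's representation; in a trained network the successive layers would correspond to the edge/part/object hierarchy described above.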

Representational capacity

A network with \(L\) layers and \(n\) neurons per layer can represent a function class of size \(O(2^{n^L})\).

Python
class DistributedRepresentation:
    """
    分布式表示与层次表示分析
    """

    @staticmethod
    def representation_capacity(n_neurons: int,
                                n_layers: int,
                                binary: bool = True) -> int:
        """
        计算网络的表示能力

        Args:
            n_neurons: 每层神经元数
            n_layers: 层数
            binary: 是否二元激活

        Returns:
            可表示的模式数
        """
        if binary:
            # 二元激活:每层有 2^n 种状态
            patterns_per_layer = 2 ** n_neurons
            # 总表示能力(考虑层次组合)
            total_capacity = patterns_per_layer ** n_layers
        else:
            # 连续激活:无限可能,考虑精度
            precision = 16  # 假设16位精度
            states_per_neuron = 2 ** precision
            total_capacity = states_per_neuron ** (n_neurons * n_layers)

        return total_capacity

    @staticmethod
    def compositionality_analysis(features: np.ndarray,
                                   compositions: list[tuple[int, ...]]) -> dict:
        """
        分析表示的组合性

        Args:
            features: 特征矩阵 (n_samples, n_features)
            compositions: 组合关系列表

        Returns:
            组合性分析结果
        """
        results = {}

        # 计算特征之间的相关性
        feature_corr = np.corrcoef(features.T)

        # 分析组合关系
        composition_scores = []
        for comp in compositions:
            if len(comp) >= 2:
                # 检查组合特征是否线性可分
                comp_features = features[:, comp]

                # 简单线性可分性检验
                # 如果组合特征的方差大于单个特征,说明有组合效应
                joint_variance = np.var(np.sum(comp_features, axis=1))
                individual_variances = [np.var(features[:, i]) for i in comp]

                composition_score = joint_variance / (sum(individual_variances) + 1e-10)
                composition_scores.append(composition_score)

        results['mean_composition_score'] = np.mean(composition_scores)
        results['feature_correlation_mean'] = np.mean(np.abs(feature_corr[np.triu_indices_from(feature_corr, k=1)]))

        return results

    @staticmethod
    def hierarchical_decomposition(X: np.ndarray,
                                   layer_sizes: list[int]) -> list[np.ndarray]:
        """
        层次分解数据表示

        Args:
            X: 输入数据
            layer_sizes: 每层的大小

        Returns:
            各层表示列表
        """
        representations = [X]
        current = X

        for size in layer_sizes:
            # 使用PCA作为简单的层次分解
            from sklearn.decomposition import PCA

            pca = PCA(n_components=min(size, current.shape[1]))
            current = pca.fit_transform(current)
            representations.append(current)

        return representations

    @staticmethod
    def disentanglement_score(representations: np.ndarray,
                              ground_truth_factors: np.ndarray) -> float:
        """
        计算解耦分数

        衡量表示中每个维度是否对应独立的生成因子

        Args:
            representations: 学习到的表示 (n_samples, n_features)
            ground_truth_factors: 真实因子 (n_samples, n_factors)

        Returns:
            解耦分数 [0, 1]
        """
        from sklearn.ensemble import RandomForestRegressor

        n_factors = ground_truth_factors.shape[1]
        n_features = representations.shape[1]

        # 对每个真实因子,找到最相关的表示维度
        importance_matrix = np.zeros((n_factors, n_features))

        for i in range(n_factors):
            # 使用随机森林评估每个特征的重要性
            rf = RandomForestRegressor(n_estimators=10, random_state=42)
            rf.fit(representations, ground_truth_factors[:, i])
            importance_matrix[i] = rf.feature_importances_

        # 计算解耦程度:每个因子应该主要由一个特征解释
        max_importance_per_factor = np.max(importance_matrix, axis=1)
        disentanglement = np.mean(max_importance_per_factor)

        return disentanglement

def distributed_representation_example():
    """分布式表示示例"""
    print("\n" + "=" * 60)
    print("分布式表示与层次表示")
    print("=" * 60)

    dr = DistributedRepresentation()

    # 表示能力分析
    print("\n1. 表示能力分析")
    print("-" * 40)

    configs = [
        {"neurons": 100, "layers": 1},
        {"neurons": 100, "layers": 3},
        {"neurons": 1000, "layers": 3},
        {"neurons": 1000, "layers": 10},
    ]

    import math  # math.log10 accepts arbitrarily large Python ints; np.log10 overflows on e.g. 2**10000

    for config in configs:
        capacity = dr.representation_capacity(config["neurons"], config["layers"])
        log_capacity = math.log10(capacity) if capacity > 0 else 0
        print(f"  神经元={config['neurons']:4d}, 层数={config['layers']:2d}: "
              f"log10(容量) = {log_capacity:.2f}")

    # 层次分解
    print("\n2. 层次分解")
    print("-" * 40)

    # 生成层次数据
    np.random.seed(42)
    n_samples = 1000

    # 底层特征(如边缘)
    low_level = np.random.randn(n_samples, 100)

    # 中层特征(如形状)- 底层特征的组合
    mid_level = np.hstack([
        np.mean(low_level[:, :50], axis=1, keepdims=True),
        np.std(low_level[:, 50:], axis=1, keepdims=True),
    ])

    # 高层特征(如物体)- 中层特征的组合
    high_level = np.hstack([
        (mid_level[:, 0] > 0).astype(float).reshape(-1, 1),
        (mid_level[:, 1] < 0.5).astype(float).reshape(-1, 1),
    ])

    X = np.hstack([low_level, mid_level, high_level])

    representations = dr.hierarchical_decomposition(X, [50, 20, 5])

    print(f"  原始数据维度: {X.shape}")
    for i, rep in enumerate(representations[1:], 1):
        print(f"  第{i}层表示维度: {rep.shape}")

    # 组合性分析
    print("\n3. 组合性分析")
    print("-" * 40)

    # 模拟组合关系
    compositions = [(0, 1), (2, 3), (4, 5, 6)]
    comp_results = dr.compositionality_analysis(X[:, :10], compositions)

    print(f"  平均组合分数: {comp_results['mean_composition_score']:.4f}")
    print(f"  特征相关性: {comp_results['feature_correlation_mean']:.4f}")

# 运行示例
if __name__ == "__main__":
    distributed_representation_example()

2.3 Theoretical Guarantees for Representation Learning

The goal of representation learning

Find a mapping \(\phi: \mathcal{X} \rightarrow \mathcal{Z}\) such that:

  1. Information preservation: \(I(X; Z)\) is maximized
  2. Usefulness for downstream tasks: \(Z\) is predictive of the target \(Y\)
  3. Interpretability: the dimensions of \(Z\) carry semantic meaning

Information bottleneck theory

\[\min_{p(z|x)} I(X; Z) - \beta I(Z; Y)\]
  • \(I(X; Z)\): compression (minimized)
  • \(I(Z; Y)\): predictiveness (maximized)
  • \(\beta\): trade-off parameter

Contrastive learning theory

Minimizing the InfoNCE loss with \(N\) negatives maximizes a lower bound on the mutual information between the representation and the target:

\[I(Z; Y) \geq \log N - \mathcal{L}_{InfoNCE}\]
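As a sanity check on this bound, here is a minimal NumPy sketch of the InfoNCE loss (an added illustration: fixed cosine similarities rather than a learned encoder, and synthetic pairs). An informative representation drives the loss toward 0, certifying close to \(\log N\) nats of mutual information, while an independent representation gives a loss near or above \(\log N\) and a vacuous bound:

```python
import numpy as np

def info_nce_loss(z: np.ndarray, y: np.ndarray, temperature: float = 0.1) -> float:
    """InfoNCE: cross-entropy of identifying each positive pair among N candidates."""
    z = z / np.linalg.norm(z, axis=1, keepdims=True)   # unit vectors -> cosine similarity
    y = y / np.linalg.norm(y, axis=1, keepdims=True)
    logits = (z @ y.T) / temperature                   # (N, N); diagonal = positive pairs
    logits -= logits.max(axis=1, keepdims=True)        # numerical stability
    log_probs = logits - np.log(np.exp(logits).sum(axis=1, keepdims=True))
    return float(-np.mean(np.diag(log_probs)))

rng = np.random.default_rng(0)
N, d = 64, 128
y = rng.normal(size=(N, d))
z_aligned = y + 0.05 * rng.normal(size=(N, d))  # representation informative about the target
z_random = rng.normal(size=(N, d))              # representation independent of the target

l_a, l_r = info_nce_loss(z_aligned, y), info_nce_loss(z_random, y)
print(f"aligned: {l_a:.3f}  random: {l_r:.3f}  log N: {np.log(N):.3f}")
print(f"MI lower bound (aligned): {np.log(N) - l_a:.3f}")
```

Note that the certifiable mutual information is capped at \(\log N\), which is why contrastive methods benefit from large batches of negatives.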
Python
class RepresentationLearningTheory:
    """
    表示学习的理论分析
    """

    @staticmethod
    def mutual_information_lower_bound(X: np.ndarray,
                                        Z: np.ndarray,
                                        method: str = 'knn') -> float:
        """
        估计互信息的下界

        Args:
            X: 原始数据
            Z: 学习到的表示
            method: 估计方法

        Returns:
            互信息估计
        """
        if method == 'knn':
            # 基于k近邻的互信息估计(Kraskov-Stögbauer-Grassberger)
            from sklearn.neighbors import NearestNeighbors

            n_samples = len(X)
            k = min(5, n_samples // 10)

            # 联合空间 (X, Z)
            XZ = np.hstack([X, Z])

            # 计算近邻
            nn_x = NearestNeighbors(n_neighbors=k+1).fit(X)
            nn_z = NearestNeighbors(n_neighbors=k+1).fit(Z)
            nn_xz = NearestNeighbors(n_neighbors=k+1).fit(XZ)

            distances_x, _ = nn_x.kneighbors(X)
            distances_z, _ = nn_z.kneighbors(Z)
            distances_xz, _ = nn_xz.kneighbors(XZ)

            # 互信息估计
            eps_x = 2 * distances_x[:, k]
            eps_z = 2 * distances_z[:, k]
            eps_xz = 2 * distances_xz[:, k]

            # 简化估计
            mi_estimate = (np.mean(np.log(eps_x + 1e-10)) +
                          np.mean(np.log(eps_z + 1e-10)) -
                          np.mean(np.log(eps_xz + 1e-10)))

            return max(0, mi_estimate)

        return 0.0

    @staticmethod
    def information_bottleneck_objective(X: np.ndarray,
                                          Y: np.ndarray,
                                          Z: np.ndarray,
                                          beta: float = 1.0) -> float:
        """
        信息瓶颈目标函数

        min I(X; Z) - beta * I(Z; Y)

        Args:
            X: 输入
            Y: 目标
            Z: 表示
            beta: 权衡参数

        Returns:
            IB目标值
        """
        # 估计各项互信息
        I_XZ = RepresentationLearningTheory.mutual_information_lower_bound(X, Z)
        I_ZY = RepresentationLearningTheory.mutual_information_lower_bound(Z, Y.reshape(-1, 1) if Y.ndim == 1 else Y)

        # IB目标
        objective = I_XZ - beta * I_ZY

        return objective

    @staticmethod
    def contrastive_learning_bound(n_negative: int) -> float:
        """
        对比学习可证明的互信息上限

        由 I(Z; Y) >= log(N) - InfoNCE 可知,
        InfoNCE能证明的互信息至多为 log(N)

        Args:
            n_negative: 对比样本数 N

        Returns:
            可证明互信息的上限 log(N)
        """
        return np.log(n_negative)

    @staticmethod
    def representation_quality_metrics(Z: np.ndarray,
                                        Y: np.ndarray,
                                        task_type: str = 'classification') -> dict:
        """
        评估表示质量的多个指标

        Args:
            Z: 表示
            Y: 标签
            task_type: 任务类型

        Returns:
            质量指标字典
        """
        metrics = {}

        # 1. 线性可分性
        from sklearn.linear_model import LogisticRegression
        from sklearn.model_selection import cross_val_score

        if task_type == 'classification':
            clf = LogisticRegression(max_iter=1000, random_state=42)
            scores = cross_val_score(clf, Z, Y, cv=5)
            metrics['linear_separability'] = np.mean(scores)

        # 2. 表示的紧凑性
        # 计算特征值的熵
        from sklearn.decomposition import PCA
        pca = PCA().fit(Z)
        eigenvalues = pca.explained_variance_ratio_

        # 有效维度
        entropy = -np.sum(eigenvalues * np.log(eigenvalues + 1e-10))
        metrics['effective_dimensionality'] = np.exp(entropy)
        metrics['explained_variance_top10'] = np.sum(eigenvalues[:10])

        # 3. 类内/类间方差比
        if task_type == 'classification':
            classes = np.unique(Y)
            overall_mean = np.mean(Z, axis=0)

            # 类间散度
            S_b = np.zeros((Z.shape[1], Z.shape[1]))
            # 类内散度
            S_w = np.zeros((Z.shape[1], Z.shape[1]))

            for c in classes:
                Z_c = Z[Y == c]
                mean_c = np.mean(Z_c, axis=0)

                S_b += len(Z_c) * np.outer(mean_c - overall_mean, mean_c - overall_mean)
                S_w += np.cov(Z_c.T) * (len(Z_c) - 1)

            # Fisher ratio(类间/类内散度迹之比)
            if np.trace(S_w) > 1e-10:
                fisher_ratio = np.trace(S_b) / np.trace(S_w)
                metrics['fisher_ratio'] = fisher_ratio

        return metrics

def representation_theory_example():
    """表示学习理论示例"""
    print("\n" + "=" * 60)
    print("表示学习理论")
    print("=" * 60)

    rlt = RepresentationLearningTheory()

    # 生成模拟数据
    np.random.seed(42)
    n_samples = 500

    # 输入数据
    X = np.random.randn(n_samples, 50)

    # 目标(基于X的某些维度)
    Y = (X[:, 0] + X[:, 1] > 0).astype(int)

    # 学习到的表示(模拟)
    Z = np.hstack([
        X[:, :2] + 0.1 * np.random.randn(n_samples, 2),  # 相关信息
        0.1 * np.random.randn(n_samples, 8)  # 噪声
    ])

    # 互信息估计
    print("\n1. 互信息分析")
    print("-" * 40)

    I_XZ = rlt.mutual_information_lower_bound(X, Z)
    I_ZY = rlt.mutual_information_lower_bound(Z, Y.reshape(-1, 1))

    print(f"  I(X; Z) ≈ {I_XZ:.4f}")
    print(f"  I(Z; Y) ≈ {I_ZY:.4f}")

    # 信息瓶颈
    print("\n2. 信息瓶颈分析")
    print("-" * 40)

    for beta in [0.1, 1.0, 10.0]:
        ib_obj = rlt.information_bottleneck_objective(X, Y, Z, beta)
        print(f"  β={beta:5.1f}: IB目标 = {ib_obj:.4f}")

    # 对比学习界
    print("\n3. 对比学习理论界")
    print("-" * 40)

    for n_neg in [10, 100, 1000, 10000]:
        # InfoNCE能证明的互信息至多为 log N
        print(f"  对比样本数={n_neg:5d}: log N 上限 = {np.log(n_neg):.4f}")

    # 表示质量评估
    print("\n4. 表示质量指标")
    print("-" * 40)

    metrics = rlt.representation_quality_metrics(Z, Y)

    for metric, value in metrics.items():
        print(f"  {metric}: {value:.4f}")

# 运行示例
if __name__ == "__main__":
    representation_theory_example()

3. 优化景观分析

3.1 临界点分析

临界点定义:损失函数 \(\mathcal{L}(\theta)\) 的梯度为零的点:

\[\nabla_\theta \mathcal{L}(\theta) = 0\]

临界点分类

  1. 局部极小值:Hessian矩阵正定(所有特征值 > 0)
  2. 局部极大值:Hessian矩阵负定(所有特征值 < 0)
  3. 鞍点:Hessian矩阵不定(有正有负特征值)

Hessian矩阵

\[H_{ij} = \frac{\partial^2 \mathcal{L}}{\partial \theta_i \partial \theta_j}\]
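把上面的定义串起来的一个最小示意(`numerical_hessian` 为自拟的中心差分实现):对 \(f(x, y) = x^2 - y^2\) 数值计算Hessian并检查特征值符号,可直接得到"鞍点"的判定:

```python
import numpy as np

def numerical_hessian(f, x: np.ndarray, eps: float = 1e-5) -> np.ndarray:
    """中心差分近似 H_ij = ∂²f / (∂x_i ∂x_j)。"""
    n = len(x)
    H = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            e_i, e_j = np.zeros(n), np.zeros(n)
            e_i[i], e_j[j] = eps, eps
            H[i, j] = (f(x + e_i + e_j) - f(x + e_i - e_j)
                       - f(x - e_i + e_j) + f(x - e_i - e_j)) / (4 * eps**2)
    return H

f = lambda p: p[0]**2 - p[1]**2           # 原点是临界点(梯度为零)
H = numerical_hessian(f, np.zeros(2))
eigenvalues = np.linalg.eigvalsh(H)
print(eigenvalues)                         # ≈ [-2, 2]:一正一负 → 鞍点
```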

临界点数量定理

对于深度网络,临界点数量随参数数量指数增长,但鞍点占主导地位。

Python
import torch  # 本节的Hessian谱分析依赖PyTorch
class CriticalPointAnalysis:
    """
    临界点分析与优化景观
    """

    @staticmethod
    def classify_critical_point(hessian_eigenvalues: np.ndarray,
                                 tolerance: float = 1e-6) -> str:
        """
        根据Hessian特征值分类临界点

        Args:
            hessian_eigenvalues: Hessian矩阵特征值
            tolerance: 数值容差

        Returns:
            临界点类型
        """
        n_positive = np.sum(hessian_eigenvalues > tolerance)
        n_negative = np.sum(hessian_eigenvalues < -tolerance)
        n_zero = len(hessian_eigenvalues) - n_positive - n_negative

        if n_zero > len(hessian_eigenvalues) * 0.1:
            return "degenerate"  # 退化临界点
        elif n_positive > 0 and n_negative > 0:
            return "saddle"  # 鞍点
        elif n_positive == len(hessian_eigenvalues):
            return "local_minimum"  # 局部极小值
        elif n_negative == len(hessian_eigenvalues):
            return "local_maximum"  # 局部极大值
        else:
            return "unknown"

    @staticmethod
    def hessian_spectrum_analysis(model: torch.nn.Module,
                                   data_loader: torch.utils.data.DataLoader,
                                   loss_fn: Callable,
                                   n_top_eigenvalues: int = 20) -> dict:
        """
        分析Hessian矩阵的谱

        Args:
            model: PyTorch模型
            data_loader: 数据加载器
            loss_fn: 损失函数
            n_top_eigenvalues: 计算的最大特征值数量

        Returns:
            谱分析结果
        """
        model.eval()  # eval()开启评估模式(关闭Dropout等)

        # 收集梯度
        for batch in data_loader:
            inputs, targets = batch

            model.zero_grad()  # 清零梯度,防止梯度累积
            outputs = model(inputs)
            loss = loss_fn(outputs, targets)
            loss.backward()  # 反向传播计算梯度

            # 提取梯度
            grads = []
            for param in model.parameters():
                if param.grad is not None:
                    grads.append(param.grad.flatten())

            gradient = torch.cat(grads)
            break  # 只用一个batch

        # 使用幂迭代估计最大特征值
        def hessian_vector_product(v):
            """计算Hessian-向量积"""
            model.zero_grad()
            for batch in data_loader:
                inputs, targets = batch
                outputs = model(inputs)
                loss = loss_fn(outputs, targets)

                # 一阶梯度
                grads = torch.autograd.grad(loss, model.parameters(), create_graph=True)
                flat_grads = torch.cat([g.flatten() for g in grads])

                # 梯度与v的内积
                grad_v = torch.sum(flat_grads * v)

                # 二阶梯度
                hvp = torch.autograd.grad(grad_v, model.parameters(), retain_graph=True)
                flat_hvp = torch.cat([g.flatten() for g in hvp])

                return flat_hvp

        # 幂迭代求最大特征值
        v = torch.randn_like(gradient)
        v = v / torch.norm(v)

        for _ in range(50):  # 迭代次数
            Hv = hessian_vector_product(v)
            v = Hv / torch.norm(Hv)

        # 瑞利商估计特征值
        Hv = hessian_vector_product(v)
        max_eigenvalue = torch.dot(v, Hv).item()  # .item()将单元素张量转为Python数值

        results = {
            'max_eigenvalue': max_eigenvalue,
            'gradient_norm': torch.norm(gradient).item(),
        }

        return results

    @staticmethod
    def loss_landscape_1d(model: torch.nn.Module,
                          direction: torch.Tensor,
                          data_loader: torch.utils.data.DataLoader,
                          loss_fn: Callable,
                          alpha_range: tuple[float, float] = (-1, 1),
                          n_points: int = 50) -> tuple[np.ndarray, np.ndarray]:
        """
        沿一维方向分析损失景观

        Args:
            model: 模型
            direction: 方向向量
            data_loader: 数据加载器
            loss_fn: 损失函数
            alpha_range: 参数范围
            n_points: 采样点数

        Returns:
            (alpha值, 损失值)
        """
        # 保存原始参数
        original_params = {}
        for name, param in model.named_parameters():
            original_params[name] = param.data.clone()

        alphas = np.linspace(alpha_range[0], alpha_range[1], n_points)
        losses = []

        for alpha in alphas:
            # 沿方向移动参数
            idx = 0
            for name, param in model.named_parameters():
                param_size = param.numel()
                param_direction = direction[idx:idx+param_size].reshape(param.shape)
                param.data = original_params[name] + alpha * param_direction
                idx += param_size

            # 计算损失
            total_loss = 0
            n_batches = 0
            with torch.no_grad():  # 禁用梯度计算,节省内存
                for batch in data_loader:
                    inputs, targets = batch
                    outputs = model(inputs)
                    loss = loss_fn(outputs, targets)
                    total_loss += loss.item()
                    n_batches += 1

            losses.append(total_loss / n_batches)

        # 恢复原始参数
        for name, param in model.named_parameters():
            param.data = original_params[name]

        return alphas, np.array(losses)

def critical_point_example():
    """临界点分析示例"""
    print("\n" + "=" * 60)
    print("临界点分析与优化景观")
    print("=" * 60)

    cpa = CriticalPointAnalysis()

    # 临界点分类
    print("\n1. 临界点分类")
    print("-" * 40)

    # 局部极小值
    min_eigenvalues = np.array([1.0, 0.5, 0.3, 0.1])
    print(f"  特征值 {min_eigenvalues}: {cpa.classify_critical_point(min_eigenvalues)}")

    # 局部极大值
    max_eigenvalues = np.array([-1.0, -0.5, -0.3, -0.1])
    print(f"  特征值 {max_eigenvalues}: {cpa.classify_critical_point(max_eigenvalues)}")

    # 鞍点
    saddle_eigenvalues = np.array([1.0, 0.5, -0.3, -0.1])
    print(f"  特征值 {saddle_eigenvalues}: {cpa.classify_critical_point(saddle_eigenvalues)}")

    # 退化临界点
    degenerate_eigenvalues = np.array([1.0, 0.0, 0.0, -0.1])
    print(f"  特征值 {degenerate_eigenvalues}: {cpa.classify_critical_point(degenerate_eigenvalues)}")

    # 高维分析
    print("\n2. 高维Hessian分析")
    print("-" * 40)

    dimensions = [10, 100, 1000, 10000]

    for dim in dimensions:
        # 模拟Hessian特征值分布
        # 大多数特征值接近0(鞍点特征)
        eigenvalues = np.random.randn(dim) * 0.1
        eigenvalues[0] = 1.0  # 一个正的大特征值
        eigenvalues[1] = -0.5  # 一个负的特征值

        critical_type = cpa.classify_critical_point(eigenvalues)
        n_positive = np.sum(eigenvalues > 1e-6)
        n_negative = np.sum(eigenvalues < -1e-6)

        print(f"  维度={dim:5d}: 类型={critical_type:15s}, "
              f"正特征值={n_positive:4d}, 负特征值={n_negative:4d}")

# 运行示例
if __name__ == "__main__":
    critical_point_example()

3.2 鞍点问题与逃离

鞍点问题:在高维非凸优化中,鞍点比局部极小值更常见。

鞍点比例定理:对于高维高斯随机场,鞍点数量按维度指数增长,而局部极小值比例趋于零。

逃离鞍点的方法

  1. 随机梯度下降(SGD):噪声自然帮助逃离鞍点
  2. 扰动梯度下降(PGD):添加随机扰动
  3. 二阶方法:利用Hessian信息
  4. 负曲率利用:沿负曲率方向更新

逃离时间分析

对于严格鞍点(Hessian 至少有一个明显负特征值 \(-\gamma\),\(\gamma > 0\)),带扰动的梯度下降以高概率在约 \(O\!\left(\frac{\log d}{\eta \gamma}\right)\) 步内逃离,其中 \(d\) 为参数维度,\(\eta\) 为学习率。
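这一逃离速度可以在玩具鞍点 \(f(x, y) = x^2 - y^2\) 上直接模拟验证(以下参数与函数名均为演示用假设):沿负曲率方向的偏差以 \((1 + \eta|\lambda|)\) 的比率几何增长,逃离步数约为 \(\log(r/d_0)/\log(1 + \eta|\lambda|)\):

```python
import numpy as np

def escape_steps(eta: float = 0.01, d0: float = 1e-3, r: float = 1.0,
                 seed: int = 0) -> int:
    """在严格鞍点 f(x, y) = x^2 - y^2 上模拟带扰动的梯度下降,
    返回 |y| 首次超过 r 的步数。负曲率方向为 y,|lambda_min| = 2。"""
    rng = np.random.default_rng(seed)
    theta = np.array([1.0, d0])  # x 方向初值任意,y 方向离鞍点 d0
    for t in range(1, 100_000):
        grad = np.array([2 * theta[0], -2 * theta[1]])  # ∇f = (2x, -2y)
        theta = theta - eta * grad + 1e-6 * rng.standard_normal(2)
        if abs(theta[1]) > r:
            return t
    return -1

steps = escape_steps()
# 理论预测:t ≈ log(r/d0) / log(1 + eta*|lambda|) = log(1000)/log(1.02) ≈ 349
print(f"模拟逃离步数: {steps}")
```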

Python
class SaddlePointEscape:
    """
    鞍点逃离理论与算法
    """

    @staticmethod
    def is_strict_saddle(hessian_eigenvalues: np.ndarray,
                         tolerance: float = 1e-3) -> bool:
        """
        检查是否为严格鞍点

        严格鞍点:至少有一个负特征值

        Args:
            hessian_eigenvalues: Hessian特征值
            tolerance: 容差

        Returns:
            是否为严格鞍点
        """
        return np.any(hessian_eigenvalues < -tolerance)

    @staticmethod
    def escape_direction(hessian_eigenvalues: np.ndarray,
                         hessian_eigenvectors: np.ndarray) -> np.ndarray:
        """
        计算鞍点逃离方向

        沿最负曲率方向

        Args:
            hessian_eigenvalues: Hessian特征值
            hessian_eigenvectors: Hessian特征向量

        Returns:
            逃离方向
        """
        # 找到最负的特征值
        min_idx = np.argmin(hessian_eigenvalues)

        if hessian_eigenvalues[min_idx] < 0:
            # 返回对应的特征向量
            return hessian_eigenvectors[:, min_idx]
        else:
            # 不是鞍点,返回随机方向
            return np.random.randn(len(hessian_eigenvalues))

    @staticmethod
    def perturbed_gradient_descent_step(params: np.ndarray,
                                        gradient: np.ndarray,
                                        learning_rate: float,
                                        perturbation_scale: float = 0.1) -> np.ndarray:
        """
        扰动梯度下降步骤

        Args:
            params: 当前参数
            gradient: 梯度
            learning_rate: 学习率
            perturbation_scale: 扰动尺度

        Returns:
            更新后的参数
        """
        # 添加随机扰动
        perturbation = np.random.randn(len(params)) * perturbation_scale

        # 梯度下降 + 扰动
        new_params = params - learning_rate * gradient + perturbation

        return new_params

    @staticmethod
    def cubic_regularization_step(params: np.ndarray,
                                   gradient: np.ndarray,
                                   hessian: np.ndarray,
                                   learning_rate: float = 0.01,
                                   regularization: float = 0.1) -> np.ndarray:
        """
        三次正则化牛顿步骤

        利用二阶信息逃离鞍点

        Args:
            params: 当前参数
            gradient: 梯度
            hessian: Hessian矩阵
            learning_rate: 学习率
            regularization: 正则化参数

        Returns:
            更新后的参数
        """
        # 修改Hessian以逃离鞍点
        eigenvalues, eigenvectors = np.linalg.eigh(hessian)

        # 将负特征值变为正
        modified_eigenvalues = np.abs(eigenvalues) + regularization

        # 重构修改后的Hessian
        modified_hessian = eigenvectors @ np.diag(modified_eigenvalues) @ eigenvectors.T

        # 牛顿更新
        new_params = params - learning_rate * np.linalg.solve(modified_hessian, gradient)

        return new_params

    @staticmethod
    def estimate_escape_time(initial_distance: float,
                             negative_curvature: float,
                             learning_rate: float,
                             noise_scale: float) -> float:
        """
        估计逃离鞍点的时间

        Args:
            initial_distance: 初始距离
            negative_curvature: 负曲率大小
            learning_rate: 学习率
            noise_scale: 噪声尺度

        Returns:
            估计逃离时间
        """
        # 简化分析:沿负曲率方向的偏差近似指数增长
        # d(t) = d_0 * exp(eta * |lambda| * t)
        # 逃离时间:t = log(r / d_0) / (eta * |lambda|)
        # (noise_scale影响初始偏差d_0的量级,这里不显式建模)

        target_distance = 1.0  # 目标逃离距离 r

        if negative_curvature > 0 and learning_rate > 0:
            escape_time = (np.log(target_distance / initial_distance) /
                           (learning_rate * negative_curvature))
        else:
            escape_time = np.inf

        return escape_time

def saddle_point_escape_example():
    """鞍点逃离示例"""
    print("\n" + "=" * 60)
    print("鞍点逃离理论")
    print("=" * 60)

    spe = SaddlePointEscape()

    # 鞍点检测
    print("\n1. 严格鞍点检测")
    print("-" * 40)

    test_cases = [
        np.array([1.0, 0.5, 0.1]),  # 局部极小值
        np.array([1.0, -0.5, 0.1]),  # 严格鞍点
        np.array([-1.0, -0.5, -0.1]),  # 局部极大值
        np.array([0.1, 0.01, -0.01]),  # 近似退化鞍点
    ]

    for i, eigenvalues in enumerate(test_cases):
        is_strict = spe.is_strict_saddle(eigenvalues)
        print(f"  特征值 {eigenvalues}: 严格鞍点 = {is_strict}")

    # 逃离方向
    print("\n2. 逃离方向计算")
    print("-" * 40)

    eigenvalues = np.array([2.0, 1.0, -0.5, -1.5])
    eigenvectors = np.eye(4)  # 单位矩阵作为特征向量

    escape_dir = spe.escape_direction(eigenvalues, eigenvectors)
    print(f"  特征值: {eigenvalues}")
    print(f"  逃离方向: {escape_dir}")
    print(f"  最负特征值索引: {np.argmin(eigenvalues)}")

    # 逃离时间估计
    print("\n3. 逃离时间估计")
    print("-" * 40)

    scenarios = [
        {"curvature": 0.1, "lr": 0.01, "noise": 0.1},
        {"curvature": 0.5, "lr": 0.01, "noise": 0.1},
        {"curvature": 1.0, "lr": 0.01, "noise": 0.1},
    ]

    for scenario in scenarios:
        escape_time = spe.estimate_escape_time(
            initial_distance=0.01,
            negative_curvature=scenario["curvature"],
            learning_rate=scenario["lr"],
            noise_scale=scenario["noise"]
        )
        print(f"  曲率={scenario['curvature']:.1f}: 估计逃离时间 ≈ {escape_time:.2f} 步")

# 运行示例
if __name__ == "__main__":
    saddle_point_escape_example()

3.3 平坦极小值与泛化

平坦极小值:损失函数在极小值点附近的曲率较小。

数学定义

  • 几何平坦性:\(\|\nabla^2 \mathcal{L}(\theta)\|\)
  • 权重扰动鲁棒性:\(\mathcal{L}(\theta + \delta) - \mathcal{L}(\theta)\)

平坦性与泛化关系

经验观察与PAC-Bayes类分析均表明,平坦极小值通常对应更好的泛化性能;但需注意平坦度对参数重缩放并不不变,解释时需谨慎。

寻找平坦极小值的方法

  1. Sharpness-Aware Minimization (SAM):\(\min_\theta \max_{\|\epsilon\| \leq \rho} \mathcal{L}(\theta + \epsilon)\)

  2. 随机权重平均(SWA):平均多个检查点

  3. 大学习率训练:倾向于收敛到平坦区域

Python
class FlatMinima:
    """
    平坦极小值理论与算法
    """

    @staticmethod
    def sharpness_measure(loss_fn: Callable,
                          params: np.ndarray,
                          epsilon: float = 0.001) -> float:
        """
        计算极小值的尖锐度

        Args:
            loss_fn: 损失函数
            params: 参数
            epsilon: 扰动大小

        Returns:
            尖锐度度量
        """
        base_loss = loss_fn(params)

        # 随机扰动
        perturbations = np.random.randn(10, len(params)) * epsilon

        sharpness_values = []
        for pert in perturbations:
            perturbed_loss = loss_fn(params + pert)
            sharpness_values.append(perturbed_loss - base_loss)

        return np.mean(sharpness_values)

    @staticmethod
    def sam_update(loss_fn: Callable,
                   params: np.ndarray,
                   learning_rate: float = 0.01,
                   rho: float = 0.05) -> np.ndarray:
        """
        Sharpness-Aware Minimization (SAM) 更新

        Args:
            loss_fn: 损失函数
            params: 当前参数
            learning_rate: 学习率
            rho: 扰动半径

        Returns:
            更新后的参数
        """
        # 计算梯度
        eps = 1e-5
        gradient = np.zeros_like(params)

        for i in range(len(params)):
            params_plus = params.copy()
            params_minus = params.copy()
            params_plus[i] += eps
            params_minus[i] -= eps

            gradient[i] = (loss_fn(params_plus) - loss_fn(params_minus)) / (2 * eps)

        # 计算扰动方向
        epsilon_w = rho * gradient / (np.linalg.norm(gradient) + 1e-10)

        # 在扰动点计算梯度
        perturbed_params = params + epsilon_w

        perturbed_gradient = np.zeros_like(params)
        for i in range(len(params)):
            params_plus = perturbed_params.copy()
            params_minus = perturbed_params.copy()
            params_plus[i] += eps
            params_minus[i] -= eps

            perturbed_gradient[i] = (loss_fn(params_plus) - loss_fn(params_minus)) / (2 * eps)

        # SAM更新
        new_params = params - learning_rate * perturbed_gradient

        return new_params

    @staticmethod
    def stochastic_weight_averaging(weight_history: list[np.ndarray],
                                     start_epoch: int | None = None) -> np.ndarray:
        """
        随机权重平均 (SWA)

        Args:
            weight_history: 权重历史列表
            start_epoch: 开始平均的epoch

        Returns:
            平均权重
        """
        if start_epoch is None:
            start_epoch = len(weight_history) // 2

        # 平均后期权重
        weights_to_average = weight_history[start_epoch:]

        averaged_weights = np.mean(weights_to_average, axis=0)

        return averaged_weights

    @staticmethod
    def hessian_based_flatness(hessian_eigenvalues: np.ndarray) -> dict:
        """
        基于Hessian的平坦度度量

        Args:
            hessian_eigenvalues: Hessian特征值

        Returns:
            平坦度指标字典
        """
        metrics = {}

        # 迹(所有特征值之和)
        metrics['trace'] = np.sum(hessian_eigenvalues)

        # 最大特征值
        metrics['max_eigenvalue'] = np.max(hessian_eigenvalues)

        # 有效平坦度:小于阈值的(近零)特征值数量,越多越平坦
        threshold = 1e-3
        metrics['effective_flatness'] = np.sum(hessian_eigenvalues < threshold)

        # 特征值分布熵
        normalized_eigenvalues = hessian_eigenvalues / (np.sum(hessian_eigenvalues) + 1e-10)
        entropy = -np.sum(normalized_eigenvalues * np.log(normalized_eigenvalues + 1e-10))
        metrics['eigenvalue_entropy'] = entropy

        # 平坦度评分(越高越平坦)
        metrics['flatness_score'] = 1.0 / (1.0 + metrics['trace'])

        return metrics

def flat_minima_example():
    """平坦极小值示例"""
    print("\n" + "=" * 60)
    print("平坦极小值与泛化")
    print("=" * 60)

    fm = FlatMinima()

    # 定义测试损失函数
    def sharp_loss(x):
        """尖锐损失函数"""
        return x[0]**2 + 100*x[1]**2

    def flat_loss(x):
        """平坦损失函数"""
        return x[0]**2 + x[1]**2

    # 尖锐度测量
    print("\n1. 尖锐度测量")
    print("-" * 40)

    x_sharp = np.array([0.0, 0.0])
    x_flat = np.array([0.0, 0.0])

    sharpness_sharp = fm.sharpness_measure(sharp_loss, x_sharp, epsilon=0.1)
    sharpness_flat = fm.sharpness_measure(flat_loss, x_flat, epsilon=0.1)

    print(f"  尖锐损失函数在(0,0)的尖锐度: {sharpness_sharp:.6f}")
    print(f"  平坦损失函数在(0,0)的尖锐度: {sharpness_flat:.6f}")
    print(f"  比值: {sharpness_sharp/sharpness_flat:.2f}x")

    # Hessian分析
    print("\n2. Hessian分析")
    print("-" * 40)

    # 尖锐函数的Hessian
    hessian_sharp = np.array([[2, 0], [0, 200]])
    eigenvalues_sharp = np.linalg.eigvalsh(hessian_sharp)

    # 平坦函数的Hessian
    hessian_flat = np.array([[2, 0], [0, 2]])
    eigenvalues_flat = np.linalg.eigvalsh(hessian_flat)

    metrics_sharp = fm.hessian_based_flatness(eigenvalues_sharp)
    metrics_flat = fm.hessian_based_flatness(eigenvalues_flat)

    print("  尖锐极小值:")
    for k, v in metrics_sharp.items():
        print(f"    {k}: {v:.4f}")

    print("  平坦极小值:")
    for k, v in metrics_flat.items():
        print(f"    {k}: {v:.4f}")

    # SWA
    print("\n3. 随机权重平均 (SWA)")
    print("-" * 40)

    # 模拟训练轨迹
    weight_history = []
    for i in range(100):
        # 模拟收敛到不同点
        weight = np.array([1.0, 1.0]) + np.random.randn(2) * 0.1 / (i + 1)
        weight_history.append(weight)

    swa_weights = fm.stochastic_weight_averaging(weight_history, start_epoch=50)
    final_weights = weight_history[-1]

    print(f"  最终权重: {final_weights}")
    print(f"  SWA权重: {swa_weights}")
    print(f"  SWA权重方差: {np.var(weight_history[50:], axis=0)}")

# 运行示例
if __name__ == "__main__":
    flat_minima_example()

4. 神经正切核理论

4.1 NTK的推导与性质

神经正切核(Neural Tangent Kernel, NTK)

对于神经网络 \(f(x; \theta)\),NTK定义为:

\[\Theta(x, x') = \nabla_\theta f(x; \theta)^T \nabla_\theta f(x'; \theta)\]

无限宽度极限

当网络宽度趋于无穷时,NTK在训练过程中保持不变(kernel regime)。

训练动态

在kernel regime下,网络输出演化遵循:

\[\frac{\partial f(X; \theta_t)}{\partial t} = -\eta \Theta(X, X)(f(X; \theta_t) - Y)\]

其中 \(\Theta(X, X)\) 是NTK矩阵。

解析解

\[f(X; \theta_t) = Y + e^{-\eta \Theta t}(f(X; \theta_0) - Y)\]
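这个解析解可以与离散梯度下降迭代直接对照验证。下面的示意中核矩阵是随机构造的对称正定矩阵(并非真实NTK):当 \(\eta\) 足够小且迭代足够久时,离散迭代 \(f_{k+1} = f_k - \eta \Theta (f_k - Y)\) 与矩阵指数解一致,且都收敛到目标 \(Y\):

```python
import numpy as np
from scipy.linalg import expm

rng = np.random.default_rng(0)
n = 8
A = rng.standard_normal((n, n))
Theta = A @ A.T / n + 0.1 * np.eye(n)  # 对称正定的"NTK"矩阵
Y = rng.standard_normal(n)             # 目标
f0 = np.zeros(n)                       # 初始预测
eta, T = 0.01, 20_000

# 离散梯度下降:f_{k+1} = f_k - eta * Theta @ (f_k - Y)
f = f0.copy()
for _ in range(T):
    f = f - eta * Theta @ (f - Y)

# 连续时间解析解:f(T) = Y + exp(-eta * Theta * T) @ (f0 - Y)
f_analytic = Y + expm(-eta * Theta * T) @ (f0 - Y)

print(f"离散与解析解最大偏差: {np.max(np.abs(f - f_analytic)):.2e}")
print(f"与目标的最大偏差:     {np.max(np.abs(f - Y)):.2e}")
```

两者一致的前提是 \(\eta \lambda_{\max}(\Theta) \ll 1\);学习率过大时离散迭代会发散,而连续时间解仍然收敛。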
Python
class NeuralTangentKernel:
    """
    神经正切核 (NTK) 理论与计算
    """

    @staticmethod
    def ntk_matrix(X: np.ndarray,
                   network_fn: Callable,
                   params: np.ndarray) -> np.ndarray:
        """
        计算NTK矩阵

        Args:
            X: 输入数据 (n_samples, n_features)
            network_fn: 网络函数 f(x, params)
            params: 网络参数

        Returns:
            NTK矩阵 (n_samples, n_samples)
        """
        n_samples = len(X)
        n_params = len(params)

        # 计算每个样本的梯度
        jacobian = np.zeros((n_samples, n_params))

        eps = 1e-7
        for i in range(n_samples):
            grad = np.zeros(n_params)
            for j in range(n_params):
                params_plus = params.copy()
                params_minus = params.copy()
                params_plus[j] += eps
                params_minus[j] -= eps

                grad[j] = (network_fn(X[i], params_plus) -
                          network_fn(X[i], params_minus)) / (2 * eps)

            jacobian[i] = grad

        # NTK = J @ J^T
        ntk = jacobian @ jacobian.T

        return ntk

    @staticmethod
    def infinite_width_ntk(X: np.ndarray,
                           activation: str = 'relu',
                           depth: int = 1) -> np.ndarray:
        """
        计算无限宽度网络的NTK

        对于ReLU激活,NTK有闭式解

        Args:
            X: 输入数据
            activation: 激活函数类型
            depth: 网络深度

        Returns:
            NTK矩阵
        """
        # 计算输入的内积矩阵
        Sigma = X @ X.T

        # 归一化
        norms = np.sqrt(np.diag(Sigma))
        Sigma = Sigma / np.outer(norms, norms)

        # 限制在[-1, 1]
        Sigma = np.clip(Sigma, -1, 1)

        if activation == 'relu':
            # 这里采用一阶arc-cosine核(Cho & Saul, 2009)作为简化:
            # K = (sin(theta) + (pi - theta) * cos(theta)) / (2*pi)
            # 其中 theta = arccos(<x, x'> / (||x|| ||x'||))
            # 完整的NTK还需逐层叠加导数核 (pi - theta) / (2*pi) 的贡献

            theta = np.arccos(Sigma)

            # 基础核
            K = (np.sin(theta) + (np.pi - theta) * np.cos(theta)) / (2 * np.pi)

            # 多层NTK
            ntk = K.copy()
            for _ in range(depth - 1):
                theta = np.arccos(np.clip(ntk, -1, 1))
                ntk = (np.sin(theta) + (np.pi - theta) * np.cos(theta)) / (2 * np.pi)

        elif activation == 'tanh':
            # tanh核无简单闭式解,这里粗略地用归一化线性核代替
            ntk = Sigma

        else:
            # 线性激活
            ntk = Sigma

        return ntk

    @staticmethod
    def kernel_dynamics(ntk_matrix: np.ndarray,
                        initial_predictions: np.ndarray,
                        targets: np.ndarray,
                        learning_rate: float = 0.01,
                        n_steps: int = 1000) -> np.ndarray:
        """
        模拟kernel regime下的训练动态

        Args:
            ntk_matrix: NTK矩阵
            initial_predictions: 初始预测
            targets: 目标值
            learning_rate: 学习率
            n_steps: 训练步数

        Returns:
            最终预测
        """
        # 解析解:f(t) = Y + exp(-eta * Theta * t) * (f(0) - Y)
        residual = initial_predictions - targets

        # 矩阵指数
        from scipy.linalg import expm

        evolution_matrix = expm(-learning_rate * ntk_matrix * n_steps)

        final_predictions = targets + evolution_matrix @ residual

        return final_predictions

    @staticmethod
    def lazy_training_condition(n_samples: int,
                                 n_params: int,
                                 learning_rate: float) -> bool:
        """
        判断是否处于lazy training regime

        Lazy training: 参数变化很小,网络近似线性

        Args:
            n_samples: 样本数
            n_params: 参数数
            learning_rate: 学习率

        Returns:
            是否在lazy regime
        """
        # 启发式条件:参数数 >> 样本数
        # 且学习率足够小

        wide_condition = n_params > n_samples * 10
        small_lr_condition = learning_rate < 1.0 / np.sqrt(n_params)

        return wide_condition and small_lr_condition

def ntk_example():
    """NTK示例"""
    print("\n" + "=" * 60)
    print("神经正切核 (NTK) 理论")
    print("=" * 60)

    ntk = NeuralTangentKernel()

    # 生成数据
    np.random.seed(42)
    n_samples = 50
    X = np.random.randn(n_samples, 10)

    # 归一化
    X = X / np.linalg.norm(X, axis=1, keepdims=True)

    # 计算无限宽度NTK
    print("\n1. 无限宽度NTK")
    print("-" * 40)

    K_relu = ntk.infinite_width_ntk(X, activation='relu', depth=1)
    K_relu_deep = ntk.infinite_width_ntk(X, activation='relu', depth=3)

    print(f"  输入形状: {X.shape}")
    print(f"  NTK矩阵形状: {K_relu.shape}")
    print(f"  浅层NTK条件数: {np.linalg.cond(K_relu):.2f}")
    print(f"  深层NTK条件数: {np.linalg.cond(K_relu_deep):.2f}")

    # NTK特征值分析
    eigenvalues = np.linalg.eigvalsh(K_relu)
    print(f"  NTK特征值范围: [{eigenvalues[0]:.4f}, {eigenvalues[-1]:.4f}]")
    print(f"  有效秩: {np.sum(eigenvalues > 1e-6)}")

    # Kernel regime训练动态
    print("\n2. Kernel Regime训练动态")
    print("-" * 40)

    # 模拟回归任务
    y_true = np.sin(X[:, 0] * 3)
    f_0 = np.zeros(n_samples)  # 初始预测为0

    learning_rates = [0.001, 0.01, 0.1]

    for lr in learning_rates:
        f_final = ntk.kernel_dynamics(K_relu, f_0, y_true, lr, n_steps=1000)
        mse = np.mean((f_final - y_true)**2)
        print(f"  学习率={lr:.3f}: 最终MSE = {mse:.6f}")

    # Lazy training条件
    print("\n3. Lazy Training条件")
    print("-" * 40)

    configs = [
        {"samples": 100, "params": 1000, "lr": 0.01},
        {"samples": 1000, "params": 10000, "lr": 0.001},
        {"samples": 100, "params": 100, "lr": 0.1},
    ]

    for config in configs:
        is_lazy = ntk.lazy_training_condition(
            config["samples"], config["params"], config["lr"]
        )
        print(f"  样本={config['samples']:4d}, 参数={config['params']:5d}, "
              f"lr={config['lr']:.3f}: Lazy regime = {is_lazy}")

# 运行示例
if __name__ == "__main__":
    ntk_example()

4.2 宽网络的训练动态

宽网络的特点

  1. 线性化:在初始化附近,网络行为近似线性
  2. 高斯过程:无限宽度网络等价于高斯过程
  3. 确定性收敛:训练动态可由NTK完全描述

特征学习 vs Lazy Training

  • Lazy Training:参数变化小,NTK近似恒定
  • 特征学习:参数变化大,网络学习数据表示

过渡条件

从lazy training到特征学习的过渡由以下因素决定:

  • 网络宽度
  • 学习率
  • 初始化尺度

Python
class WideNetworkDynamics:
    """
    宽网络的训练动态分析
    """

    @staticmethod
    def linearized_network(x: np.ndarray,
                           params: np.ndarray,
                           initial_params: np.ndarray,
                           initial_output: Callable) -> float:
        """
        线性化网络近似

        f_lin(x) = f(x; theta_0) + ∇f(x; theta_0)^T (theta - theta_0)

        Args:
            x: 输入
            params: 当前参数
            initial_params: 初始参数
            initial_output: 初始输出函数

        Returns:
            线性化输出
        """
        # 初始输出
        f_0 = initial_output(x)

        # 参数变化
        delta_params = params - initial_params

        # 这里简化处理,实际应计算初始化处的梯度 ∇f(x; theta_0)
        # 玩具近似:假设每个参数分量的梯度为常数0.01
        linearized_output = f_0 + np.sum(delta_params) * 0.01

        return linearized_output

    @staticmethod
    def gp_prior_covariance(X: np.ndarray,
                            activation: str = 'relu',
                            depth: int = 1) -> np.ndarray:
        """
        无限宽度网络的高斯过程先验协方差

        Args:
            X: 输入数据
            activation: 激活函数
            depth: 深度

        Returns:
            GP先验协方差矩阵
        """
        # 与NTK类似,但描述的是输出的先验分布
        Sigma = X @ X.T
        norms = np.sqrt(np.diag(Sigma))
        Sigma = Sigma / (np.outer(norms, norms) + 1e-10)
        Sigma = np.clip(Sigma, -1, 1)

        if activation == 'relu':
            # ReLU NNGP核 (Cho & Saul, 2009)
            theta = np.arccos(Sigma)
            K = (np.sin(theta) + (np.pi - theta) * np.cos(theta)) / (2 * np.pi) * norms.reshape(-1, 1) * norms
        else:
            K = Sigma

        return K

    @staticmethod
    def feature_learning_regime(width: int,
                                 n_samples: int,
                                 learning_rate: float,
                                 init_scale: float = 1.0) -> bool:
        """
        判断是否处于特征学习regime

        与lazy training相反

        Args:
            width: 网络宽度
            n_samples: 样本数
            learning_rate: 学习率
            init_scale: 初始化尺度

        Returns:
            是否在特征学习regime
        """
        # 特征学习条件:
        # 1. 网络不太宽
        # 2. 学习率较大
        # 3. 初始化尺度较大

        not_too_wide = width < n_samples * 5
        large_lr = learning_rate > 0.1 / width
        large_init = init_scale > 1.0

        return not_too_wide and (large_lr or large_init)

    @staticmethod
    def training_regime_diagram(widths: list[int],
                                 learning_rates: list[float],
                                 n_samples: int = 100) -> np.ndarray:
        """
        生成训练相图

        Args:
            widths: 宽度列表
            learning_rates: 学习率列表
            n_samples: 样本数

        Returns:
            相图矩阵
        """
        diagram = np.zeros((len(widths), len(learning_rates)))

        for i, width in enumerate(widths):
            for j, lr in enumerate(learning_rates):
                is_lazy = NeuralTangentKernel.lazy_training_condition(
                    n_samples, width * 100, lr  # 假设每层100参数
                )
                is_feature = WideNetworkDynamics.feature_learning_regime(
                    width, n_samples, lr
                )

                # 0: lazy, 1: intermediate, 2: feature learning
                if is_lazy and not is_feature:
                    diagram[i, j] = 0
                elif is_feature and not is_lazy:
                    diagram[i, j] = 2
                else:
                    diagram[i, j] = 1

        return diagram

def wide_network_dynamics_example():
    """宽网络动态示例"""
    print("\n" + "=" * 60)
    print("宽网络的训练动态")
    print("=" * 60)

    wnd = WideNetworkDynamics()

    # GP先验
    print("\n1. 高斯过程先验")
    print("-" * 40)

    np.random.seed(42)
    X = np.random.randn(20, 5)
    X = X / np.linalg.norm(X, axis=1, keepdims=True)

    K_gp = wnd.gp_prior_covariance(X, activation='relu')

    print(f"  GP协方差矩阵形状: {K_gp.shape}")
    print(f"  对角线元素(方差): {np.diag(K_gp)[:5]}")
    print(f"  矩阵半正定性: {np.all(np.linalg.eigvalsh(K_gp) > -1e-10)}")

    # 训练regime分析
    print("\n2. 训练Regime分析")
    print("-" * 40)

    configs = [
        {"width": 10, "samples": 100, "lr": 0.01, "init": 1.0},
        {"width": 1000, "samples": 100, "lr": 0.001, "init": 0.1},
        {"width": 100, "samples": 100, "lr": 0.1, "init": 2.0},
    ]

    for config in configs:
        is_lazy = NeuralTangentKernel.lazy_training_condition(
            config["samples"], config["width"] * 100, config["lr"]
        )
        is_feature = wnd.feature_learning_regime(
            config["width"], config["samples"], config["lr"], config["init"]
        )

        regime = "Lazy" if is_lazy else ("Feature Learning" if is_feature else "Intermediate")
        print(f"  宽度={config['width']:4d}, lr={config['lr']:.3f}: {regime}")

# 运行示例
if __name__ == "__main__":
    wide_network_dynamics_example()

4.3 NTK与泛化的关系

NTK视角下的泛化

在kernel regime下,泛化性能由NTK的谱性质决定:

  1. 特征值衰减:决定收敛速度
  2. 有效维度:决定模型复杂度
  3. 核对齐:NTK与目标函数的匹配程度
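
上述谱性质中的第一条可以用一个最小数值示例说明:在kernel regime下,梯度下降的训练残差沿NTK第 \(i\) 个特征方向按 \((1-\eta\lambda_i)^t\) 衰减,特征值越大的方向收敛越快。以下为示意草图(核矩阵为随机构造,仅用于演示,并非真实网络的NTK):

```python
import numpy as np

rng = np.random.default_rng(0)

# 随机构造一个对称正定矩阵充当"NTK"(仅为演示)
A = rng.standard_normal((20, 20))
K = A @ A.T / 20 + 0.1 * np.eye(20)
eigvals, eigvecs = np.linalg.eigh(K)  # 特征值按升序排列

y = rng.standard_normal(20)  # 训练目标
f = np.zeros(20)             # 网络输出初始化为0
eta = 0.1

# kernel regime下函数空间的梯度下降:f <- f + eta * K (y - f)
for _ in range(50):
    f = f + eta * K @ (y - f)

# 残差沿第i个特征方向恰好按 (1 - eta*lambda_i)^t 衰减
decay = np.abs(eigvecs.T @ (y - f)) / (np.abs(eigvecs.T @ y) + 1e-12)
print("最大特征值方向的残差比例:", decay[-1])
print("最小特征值方向的残差比例:", decay[0])
```

可以看到最大特征值方向的残差几乎降为零,而最小特征值方向仍保留明显残差。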

泛化误差界

对于NTK回归,泛化误差:

\[R(f) \leq \tilde{O}\left(\frac{\|f^*\|_{\mathcal{H}}^2}{n} + \frac{\sigma^2 \text{tr}(\Theta)}{n}\right)\]

其中 \(\|f^*\|_{\mathcal{H}}\) 是目标函数的RKHS范数。

Python
class NTKGeneralization:
    """
    NTK与泛化的关系
    """

    @staticmethod
    def rkhs_norm(target_function: Callable,
                  ntk_matrix: np.ndarray,
                  X: np.ndarray) -> float:
        """
        估计目标函数的RKHS范数

        ||f||_H^2 = f^T Theta^{-1} f

        Args:
            target_function: 目标函数
            ntk_matrix: NTK矩阵
            X: 输入数据

        Returns:
            RKHS范数估计
        """
        # 计算目标函数在数据点的值
        y = np.array([target_function(x) for x in X])

        # 添加正则化
        reg_ntk = ntk_matrix + 1e-6 * np.eye(len(ntk_matrix))

        # RKHS范数
        norm_squared = y @ np.linalg.solve(reg_ntk, y)

        return np.sqrt(norm_squared)

    @staticmethod
    def generalization_bound_ntk(n_samples: int,
                                  rkhs_norm: float,
                                  ntk_trace: float,
                                  noise_variance: float = 0.1) -> float:
        """
        NTK回归的泛化误差界

        Args:
            n_samples: 样本数
            rkhs_norm: RKHS范数
            ntk_trace: NTK矩阵的迹
            noise_variance: 噪声方差

        Returns:
            泛化误差上界
        """
        # 简化界:O(||f*||^2 / n + sigma^2 * tr(Theta) / n)
        bound = (rkhs_norm**2 / n_samples +
                noise_variance * ntk_trace / n_samples)

        return bound

    @staticmethod
    def kernel_alignment(K: np.ndarray,
                         y: np.ndarray) -> float:
        """
        计算核与目标的匹配度

        Args:
            K: 核矩阵
            y: 目标值

        Returns:
            对齐分数
        """
        # 中心化
        K_centered = K - K.mean(axis=0, keepdims=True) - K.mean(axis=1, keepdims=True) + K.mean()
        y_centered = y - y.mean()

        # 对齐分数
        alignment = (y_centered @ K_centered @ y_centered) / (np.linalg.norm(K_centered, 'fro') * np.linalg.norm(y_centered)**2)

        return alignment

    @staticmethod
    def effective_dimension(ntk_eigenvalues: np.ndarray,
                           regularization: float = 1e-3) -> float:
        """
        计算有效维度

        d_eff = sum(lambda_i / (lambda_i + lambda))

        Args:
            ntk_eigenvalues: NTK特征值
            regularization: 正则化参数

        Returns:
            有效维度
        """
        effective_dim = np.sum(ntk_eigenvalues / (ntk_eigenvalues + regularization))

        return effective_dim

def ntk_generalization_example():
    """NTK泛化示例"""
    print("\n" + "=" * 60)
    print("NTK与泛化")
    print("=" * 60)

    ntk_gen = NTKGeneralization()

    # 生成数据
    np.random.seed(42)
    n_samples = 100
    X = np.random.randn(n_samples, 10)
    X = X / np.linalg.norm(X, axis=1, keepdims=True)

    # 目标函数
    def target_fn(x):
        return np.sin(3 * x[0]) + 0.5 * np.cos(2 * x[1])

    y = np.array([target_fn(x) for x in X])

    # 计算NTK
    K = NeuralTangentKernel.infinite_width_ntk(X, activation='relu')

    # RKHS范数
    print("\n1. RKHS范数分析")
    print("-" * 40)

    rkhs_norm = ntk_gen.rkhs_norm(target_fn, K, X)
    print(f"  目标函数RKHS范数: {rkhs_norm:.4f}")

    # 泛化界
    print("\n2. 泛化误差界")
    print("-" * 40)

    ntk_trace = np.trace(K)
    bound = ntk_gen.generalization_bound_ntk(n_samples, rkhs_norm, ntk_trace)
    print(f"  NTK迹: {ntk_trace:.2f}")
    print(f"  泛化误差界: {bound:.4f}")

    # 核对齐
    print("\n3. 核对齐分析")
    print("-" * 40)

    alignment = ntk_gen.kernel_alignment(K, y)
    print(f"  核对齐分数: {alignment:.4f}")
    print(f"  对齐程度: {'高' if alignment > 0.5 else '中' if alignment > 0.2 else '低'}")

    # 有效维度
    print("\n4. 有效维度")
    print("-" * 40)

    eigenvalues = np.linalg.eigvalsh(K)

    for reg in [1e-3, 1e-2, 1e-1]:
        eff_dim = ntk_gen.effective_dimension(eigenvalues, reg)
        print(f"  正则化={reg:.3f}: 有效维度 = {eff_dim:.2f}")

# 运行示例
if __name__ == "__main__":
    ntk_generalization_example()

5. 深度学习的归纳偏置

5.1 架构的归纳偏置

归纳偏置:学习算法在学习过程中对解的偏好。

CNN的归纳偏置:

  • 局部连接:空间局部性
  • 权重共享:平移等变性
  • 层次结构:从局部到全局

Transformer的归纳偏置:

  • 自注意力:全局依赖
  • 位置编码:序列顺序
  • 多头机制:多子空间表示

RNN的归纳偏置:

  • 时间递归:序列建模
  • 参数共享:时间平移不变性
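
其中CNN的"平移等变性"可以直接数值验证:先平移再卷积,等于先卷积再平移。以下以一维循环卷积做示意(`circular_conv1d` 为演示用的假设性实现,并非某个库的API):

```python
import numpy as np

def circular_conv1d(x, w):
    """一维循环卷积:每个输出位置共享同一组权重 w(演示用)"""
    n, k = len(x), len(w)
    return np.array([sum(w[j] * x[(i + j) % n] for j in range(k))
                     for i in range(n)])

rng = np.random.default_rng(0)
x = rng.standard_normal(8)   # 输入信号
w = rng.standard_normal(3)   # 卷积核
shift = 2

# 平移等变性:conv(shift(x)) == shift(conv(x))
lhs = circular_conv1d(np.roll(x, shift), w)
rhs = np.roll(circular_conv1d(x, w), shift)
print(np.allclose(lhs, rhs))  # True
```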

Python
class InductiveBias:
    """
    深度学习的归纳偏置分析
    """

    @staticmethod
    def cnn_inductive_bias(input_shape: tuple[int, ...],
                           kernel_size: int) -> dict:
        """
        分析CNN的归纳偏置

        Args:
            input_shape: 输入形状
            kernel_size: 卷积核大小

        Returns:
            归纳偏置分析
        """
        analysis = {}

        # 局部感受野比例(只计空间维;若输入为 (H, W, C) 形状,末维视为通道)
        spatial_shape = input_shape[:-1] if len(input_shape) == 3 else input_shape
        spatial_pixels = int(np.prod(spatial_shape))
        local_pixels = kernel_size ** len(spatial_shape)
        analysis['local_receptive_field_ratio'] = local_pixels / spatial_pixels

        # 平移等变性
        analysis['translation_equivariance'] = True

        # 参数共享程度
        # 每个输出位置共享相同卷积核
        analysis['weight_sharing_factor'] = spatial_pixels / local_pixels

        return analysis

    @staticmethod
    def transformer_inductive_bias(seq_length: int,
                                    d_model: int,
                                    n_heads: int) -> dict:
        """
        分析Transformer的归纳偏置

        Args:
            seq_length: 序列长度
            d_model: 模型维度
            n_heads: 注意力头数

        Returns:
            归纳偏置分析
        """
        analysis = {}

        # 全局依赖范围
        analysis['receptive_field'] = seq_length  # 全局

        # 排列等变性(无位置编码时)
        analysis['permutation_equivariance'] = True

        # 多头表示的子空间数
        analysis['subspace_dimension'] = d_model // n_heads

        # 计算复杂度
        analysis['attention_complexity'] = seq_length ** 2

        return analysis

    @staticmethod
    def gnn_inductive_bias(graph: np.ndarray,
                            aggregation: str = 'mean') -> dict:
        """
        分析GNN的归纳偏置

        Args:
            graph: 邻接矩阵
            aggregation: 聚合函数

        Returns:
            归纳偏置分析
        """
        analysis = {}

        # 置换等变性
        analysis['permutation_equivariance'] = True

        # 局部邻域大小
        degrees = np.sum(graph, axis=1)
        analysis['mean_neighborhood_size'] = np.mean(degrees)
        analysis['max_neighborhood_size'] = np.max(degrees)

        # 图同构不变性
        analysis['graph_isomorphism_invariance'] = (aggregation in ['sum', 'mean', 'max'])

        return analysis

    @staticmethod
    def compare_architectures(input_specs: dict) -> dict:
        """
        比较不同架构的归纳偏置

        Args:
            input_specs: 输入规格

        Returns:
            比较结果
        """
        comparison = {}

        # CNN
        cnn_bias = InductiveBias.cnn_inductive_bias(
            input_specs.get('image_shape', (32, 32, 3)),
            kernel_size=3
        )
        comparison['CNN'] = cnn_bias

        # Transformer
        transformer_bias = InductiveBias.transformer_inductive_bias(
            seq_length=input_specs.get('seq_length', 100),
            d_model=input_specs.get('d_model', 512),
            n_heads=input_specs.get('n_heads', 8)
        )
        comparison['Transformer'] = transformer_bias

        return comparison

def inductive_bias_example():
    """归纳偏置示例"""
    print("\n" + "=" * 60)
    print("深度学习的归纳偏置")
    print("=" * 60)

    ib = InductiveBias()

    # CNN归纳偏置
    print("\n1. CNN归纳偏置")
    print("-" * 40)

    cnn_bias = ib.cnn_inductive_bias((32, 32, 3), kernel_size=3)
    for k, v in cnn_bias.items():
        print(f"  {k}: {v}")

    # Transformer归纳偏置
    print("\n2. Transformer归纳偏置")
    print("-" * 40)

    transformer_bias = ib.transformer_inductive_bias(100, 512, 8)
    for k, v in transformer_bias.items():
        if isinstance(v, float):  # isinstance检查对象类型
            print(f"  {k}: {v:.2f}")
        else:
            print(f"  {k}: {v}")

    # GNN归纳偏置
    print("\n3. GNN归纳偏置")
    print("-" * 40)

    # 创建示例图
    graph = np.array([
        [0, 1, 1, 0],
        [1, 0, 1, 1],
        [1, 1, 0, 1],
        [0, 1, 1, 0]
    ])

    gnn_bias = ib.gnn_inductive_bias(graph)
    for k, v in gnn_bias.items():
        if isinstance(v, float):
            print(f"  {k}: {v:.2f}")
        else:
            print(f"  {k}: {v}")

# 运行示例
if __name__ == "__main__":
    inductive_bias_example()

5.2 优化算法的隐式偏置

SGD的隐式偏置

  1. 解的选择:SGD倾向于收敛到平坦极小值
  2. 正则化效果:小批量噪声有正则化作用
  3. 早停效应:防止过拟合

Adam的隐式偏置

  1. 自适应学习率:不同参数不同更新速度
  2. 动量效应:加速收敛
  3. 偏差校正:初期步长调整

数学分析

SGD可以看作是对损失函数的随机梯度流:

\[d\theta_t = -\nabla \mathcal{L}(\theta_t) dt + \sqrt{\eta \Sigma(\theta_t)} dW_t\]

其中 \(\Sigma\) 是梯度协方差,\(W_t\) 是维纳过程。
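
该随机梯度流可以用Euler–Maruyama离散化做一个最小模拟。以下草图假设一维二次损失 \(\mathcal{L}(\theta)=\theta^2/2\) 与常数噪声方差(此时过程退化为Ornstein–Uhlenbeck过程,稳态方差约为 \(\eta\Sigma/2\)),参数取值仅为演示:

```python
import numpy as np

rng = np.random.default_rng(0)

# 一维二次损失 L(theta) = theta^2 / 2,梯度为 theta
eta = 0.1       # 学习率,决定SDE中的噪声强度
sigma2 = 0.5    # 梯度噪声方差 Sigma(假设为常数)
dt = 0.01       # Euler-Maruyama步长
n_steps = 50000

theta = 2.0
samples = []
for t in range(n_steps):
    grad = theta  # dL/dtheta
    noise = np.sqrt(eta * sigma2 * dt) * rng.standard_normal()
    theta = theta - grad * dt + noise
    if t > n_steps // 2:  # 丢弃前半段作为burn-in
        samples.append(theta)

# Ornstein-Uhlenbeck过程的稳态方差为 eta*sigma2/2
print("样本方差:", np.var(samples))
print("理论方差:", eta * sigma2 / 2)
```

模拟得到的稳态方差与理论值接近,说明SGD噪声使参数在极小值附近维持一个与学习率成正比的涨落。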

Python
class ImplicitBias:
    """
    优化算法的隐式偏置
    """

    @staticmethod
    def sgd_implicit_regularization(gradient_variance: float,
                                     learning_rate: float,
                                     n_iterations: int) -> float:
        """
        估计SGD的隐式正则化效果

        Args:
            gradient_variance: 梯度方差
            learning_rate: 学习率
            n_iterations: 迭代次数

        Returns:
            隐式正则化强度
        """
        # 每步SGD噪声的隐式正则化强度 ≈ (eta/2) * tr(Sigma),此处累积 n_iterations 步
        implicit_reg = 0.5 * learning_rate * gradient_variance * n_iterations

        return implicit_reg

    @staticmethod
    def flat_minima_preference(optimization_trajectory: list[np.ndarray],
                                loss_fn: Callable) -> dict:
        """
        分析优化轨迹对平坦极小值的偏好

        Args:
            optimization_trajectory: 参数轨迹
            loss_fn: 损失函数

        Returns:
            平坦度分析
        """
        analysis = {}

        # 计算每个点的损失
        losses = [loss_fn(params) for params in optimization_trajectory]

        # 计算损失变化率
        loss_changes = np.diff(losses)

        # 最终损失的平坦度(扰动测试)
        final_params = optimization_trajectory[-1]
        perturbations = np.random.randn(10, len(final_params)) * 0.01

        final_loss = loss_fn(final_params)
        perturbed_losses = [loss_fn(final_params + pert) for pert in perturbations]

        analysis['final_loss'] = final_loss
        analysis['loss_sensitivity'] = np.std(perturbed_losses)
        analysis['convergence_rate'] = np.mean(np.abs(loss_changes[-10:]))

        return analysis

    @staticmethod
    def adaptive_optimizer_bias(first_moment: np.ndarray,
                                 second_moment: np.ndarray,
                                 epsilon: float = 1e-8) -> np.ndarray:
        """
        分析自适应优化器的更新偏置

        Args:
            first_moment: 一阶矩估计
            second_moment: 二阶矩估计
            epsilon: 数值稳定性常数

        Returns:
            有效学习率比率
        """
        # Adam的有效学习率 ∝ 1 / sqrt(v_t)
        effective_lr_ratio = 1.0 / (np.sqrt(second_moment) + epsilon)

        # 归一化
        effective_lr_ratio = effective_lr_ratio / np.mean(effective_lr_ratio)

        return effective_lr_ratio

    @staticmethod
    def gradient_flow_approximation(gradient_fn: Callable,
                                    initial_params: np.ndarray,
                                    n_steps: int = 100,
                                    step_size: float = 0.01) -> list[np.ndarray]:
        """
        模拟梯度流

        Args:
            gradient_fn: 梯度函数
            initial_params: 初始参数
            n_steps: 步数
            step_size: 步长

        Returns:
            参数轨迹
        """
        trajectory = [initial_params.copy()]
        params = initial_params.copy()

        for _ in range(n_steps):
            gradient = gradient_fn(params)
            params = params - step_size * gradient
            trajectory.append(params.copy())

        return trajectory

def implicit_bias_example():
    """隐式偏置示例"""
    print("\n" + "=" * 60)
    print("优化算法的隐式偏置")
    print("=" * 60)

    ib = ImplicitBias()

    # SGD隐式正则化
    print("\n1. SGD隐式正则化")
    print("-" * 40)

    configs = [
        {"var": 0.1, "lr": 0.01, "iter": 1000},
        {"var": 0.5, "lr": 0.01, "iter": 1000},
        {"var": 0.1, "lr": 0.1, "iter": 1000},
    ]

    for config in configs:
        reg = ib.sgd_implicit_regularization(
            config["var"], config["lr"], config["iter"]
        )
        print(f"  方差={config['var']:.1f}, lr={config['lr']:.2f}: "
              f"隐式正则化 = {reg:.4f}")

    # 自适应优化器偏置
    print("\n2. 自适应优化器偏置")
    print("-" * 40)

    # 模拟梯度统计
    m = np.array([0.1, 0.5, 1.0, 2.0])  # 一阶矩
    v = np.array([0.01, 0.1, 0.5, 1.0])  # 二阶矩

    lr_ratio = ib.adaptive_optimizer_bias(m, v)

    print("  梯度统计:")
    print(f"    一阶矩: {m}")
    print(f"    二阶矩: {v}")
    print(f"    有效lr比率: {lr_ratio}")
    print(f"    最大/最小比率: {np.max(lr_ratio)/np.min(lr_ratio):.2f}x")

# 运行示例
if __name__ == "__main__":
    implicit_bias_example()

5.3 数据分布的隐式影响

数据分布对学习的影响

  1. 长尾分布:类别不平衡导致偏置
  2. 分布偏移:训练/测试分布不一致
  3. 数据增强:隐式正则化效果

标签噪声的鲁棒性

深度网络对标签噪声有一定鲁棒性,但严重噪声会导致过拟合。

记忆与泛化

  • 深度网络能记忆随机标签
  • 但优先学习简单模式(simplicity bias)

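simplicity bias的一个最小示意:对线性回归做梯度下降时,沿数据大奇异值方向(可视为"简单"成分)的系数先被学到。以下为假设性实验草图,特征尺度与步数均为演示用取值:

```python
import numpy as np

rng = np.random.default_rng(0)
n = 200
# 两个尺度悬殊的特征:第0维方差大("简单"方向),第1维方差小("复杂"方向)
X = rng.standard_normal((n, 2)) * np.array([5.0, 0.5])
w_true = np.array([1.0, 1.0])
y = X @ w_true

w = np.zeros(2)
lr = 1e-3
errors = []
for _ in range(200):
    grad = X.T @ (X @ w - y) / n  # 最小二乘梯度
    w = w - lr * grad
    errors.append(np.abs(w - w_true))
errors = np.array(errors)

# 早期:"简单"方向的系数误差下降远快于"复杂"方向
print("第50步各方向误差:", errors[50])
```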
Python
class DataDistributionImpact:
    """
    数据分布对学习的隐式影响
    """

    @staticmethod
    def long_tail_analysis(class_counts: np.ndarray) -> dict:
        """
        分析长尾分布的影响

        Args:
            class_counts: 各类别样本数

        Returns:
            分析结果
        """
        analysis = {}

        # 不平衡比率
        analysis['imbalance_ratio'] = np.max(class_counts) / np.min(class_counts)

        # Gini系数
        sorted_counts = np.sort(class_counts)
        n = len(class_counts)
        index = np.arange(1, n + 1)
        analysis['gini_coefficient'] = (2 * np.sum(index * sorted_counts)) / (n * np.sum(sorted_counts)) - (n + 1) / n

        # 有效类别数
        probs = class_counts / np.sum(class_counts)
        analysis['effective_num_classes'] = np.exp(-np.sum(probs * np.log(probs)))

        return analysis

    @staticmethod
    def label_noise_robustness(true_labels: np.ndarray,
                                noisy_labels: np.ndarray,
                                predictions: np.ndarray) -> dict:
        """
        分析对标签噪声的鲁棒性

        Args:
            true_labels: 真实标签
            noisy_labels: 带噪声标签
            predictions: 模型预测

        Returns:
            鲁棒性分析
        """
        analysis = {}

        # 噪声率
        noise_rate = np.mean(true_labels != noisy_labels)
        analysis['noise_rate'] = noise_rate

        # 在干净样本上的准确率
        clean_mask = true_labels == noisy_labels
        analysis['accuracy_on_clean'] = np.mean(predictions[clean_mask] == true_labels[clean_mask])

        # 在噪声样本上的准确率(若无噪声样本则记为nan)
        if np.any(~clean_mask):
            analysis['accuracy_on_noisy'] = np.mean(predictions[~clean_mask] == true_labels[~clean_mask])
        else:
            analysis['accuracy_on_noisy'] = float('nan')

        # 总体准确率
        analysis['overall_accuracy'] = np.mean(predictions == true_labels)

        return analysis

    @staticmethod
    def simplicity_bias_measure(train_losses: np.ndarray,
                                 complexity_measures: np.ndarray) -> float:
        """
        测量简单性偏置

        网络优先学习简单模式

        Args:
            train_losses: 不同复杂度下的训练损失
            complexity_measures: 复杂度度量

        Returns:
            简单性偏置分数
        """
        # 简单性偏置:低复杂度样本损失下降更快
        # 计算损失与复杂度的相关性
        correlation = np.corrcoef(train_losses, complexity_measures)[0, 1]

        # 负相关表示简单性偏置
        simplicity_bias = -correlation

        return simplicity_bias

    @staticmethod
    def distribution_shift_detection(train_data: np.ndarray,
                                     test_data: np.ndarray) -> dict:
        """
        检测训练/测试分布偏移

        Args:
            train_data: 训练数据
            test_data: 测试数据

        Returns:
            分布偏移分析
        """
        analysis = {}

        # 均值偏移
        train_mean = np.mean(train_data, axis=0)
        test_mean = np.mean(test_data, axis=0)
        analysis['mean_shift'] = np.linalg.norm(train_mean - test_mean)

        # 协方差偏移
        train_cov = np.cov(train_data.T)
        test_cov = np.cov(test_data.T)
        analysis['covariance_shift'] = np.linalg.norm(train_cov - test_cov, 'fro')

        # 最大均值差异 (MMD):此处为线性核下的简化估计,仅比较均值
        mmd = np.linalg.norm(train_mean - test_mean) ** 2
        analysis['mmd_squared'] = mmd

        return analysis

def data_distribution_example():
    """数据分布影响示例"""
    print("\n" + "=" * 60)
    print("数据分布的隐式影响")
    print("=" * 60)

    ddi = DataDistributionImpact()

    # 长尾分布分析
    print("\n1. 长尾分布分析")
    print("-" * 40)

    # 模拟长尾分布
    class_counts = np.array([1000, 500, 200, 100, 50, 20, 10, 5])

    long_tail = ddi.long_tail_analysis(class_counts)
    for k, v in long_tail.items():
        print(f"  {k}: {v:.4f}")

    # 标签噪声鲁棒性
    print("\n2. 标签噪声鲁棒性")
    print("-" * 40)

    np.random.seed(42)
    n_samples = 1000
    n_classes = 10

    true_labels = np.random.randint(0, n_classes, n_samples)
    # 添加20%噪声
    noise_mask = np.random.rand(n_samples) < 0.2
    noisy_labels = true_labels.copy()
    noisy_labels[noise_mask] = np.random.randint(0, n_classes, np.sum(noise_mask))

    # 模拟预测(80%准确)
    predictions = true_labels.copy()
    error_mask = np.random.rand(n_samples) < 0.2
    predictions[error_mask] = np.random.randint(0, n_classes, np.sum(error_mask))

    robustness = ddi.label_noise_robustness(true_labels, noisy_labels, predictions)
    for k, v in robustness.items():
        print(f"  {k}: {v:.4f}")

    # 分布偏移检测
    print("\n3. 分布偏移检测")
    print("-" * 40)

    train_data = np.random.randn(500, 10)
    test_data = np.random.randn(500, 10) + 0.5  # 有偏移

    shift = ddi.distribution_shift_detection(train_data, test_data)
    for k, v in shift.items():
        print(f"  {k}: {v:.4f}")

# 运行示例
if __name__ == "__main__":
    data_distribution_example()

6. 学习动态与训练理论

6.1 训练过程的相变

相变现象

训练过程中,网络经历不同阶段:

  1. 初始化阶段:随机行为
  2. 拟合阶段:学习简单模式
  3. 细化阶段:优化细节
  4. 收敛阶段:接近极小值

损失景观的演化

\[\mathcal{L}(\theta_t) = \mathcal{L}_{signal}(\theta_t) + \mathcal{L}_{noise}(\theta_t)\]

信号部分先收敛,噪声部分后被记忆。

Python
class TrainingPhaseTransitions:
    """
    训练过程的相变分析
    """

    @staticmethod
    def detect_phase_transitions(loss_history: np.ndarray,
                                  gradient_norm_history: np.ndarray) -> dict:
        """
        检测训练相变

        Args:
            loss_history: 损失历史
            gradient_norm_history: 梯度范数历史

        Returns:
            相变检测结果
        """
        phases = {}

        # 计算损失变化率
        loss_changes = np.abs(np.diff(loss_history))

        # 检测快速下降阶段
        rapid_descent_threshold = np.percentile(loss_changes, 75)
        rapid_descent_epochs = np.where(loss_changes > rapid_descent_threshold)[0]

        if len(rapid_descent_epochs) > 0:
            phases['rapid_descent_end'] = rapid_descent_epochs[-1]

        # 检测收敛阶段(梯度范数稳定)
        gradient_variance = np.array([
            np.var(gradient_norm_history[max(0, i-10):i+1])
            for i in range(len(gradient_norm_history))
        ])

        convergence_threshold = np.percentile(gradient_variance, 10)
        convergence_candidates = np.where(gradient_variance < convergence_threshold)[0]

        if len(convergence_candidates) > 0:
            phases['convergence_start'] = convergence_candidates[0]

        # 检测平台期
        plateau_threshold = np.percentile(loss_changes, 25)
        plateau_epochs = np.where(loss_changes < plateau_threshold)[0]

        phases['n_plateaus'] = len(plateau_epochs)

        return phases

    @staticmethod
    def signal_noise_decomposition(predictions: np.ndarray,
                                    true_labels: np.ndarray,
                                    noise_level: float = 0.1) -> dict:
        """
        分解信号和噪声的学习

        Args:
            predictions: 各epoch的预测,形状 (n_epochs, n_samples)
            true_labels: 真实标签
            noise_level: 噪声水平

        Returns:
            信号和噪声学习分析
        """
        decomposition = {}

        # 假设前一半epoch学习信号,后一半记忆噪声
        n_epochs = len(predictions)
        mid_point = n_epochs // 2

        # 信号学习准确率
        signal_acc = np.mean(predictions[:mid_point] == true_labels)
        decomposition['signal_learning_accuracy'] = signal_acc

        # 整体准确率
        overall_acc = np.mean(predictions == true_labels)
        decomposition['overall_accuracy'] = overall_acc

        # 噪声记忆程度
        noise_memorization = overall_acc - signal_acc
        decomposition['noise_memorization'] = noise_memorization

        return decomposition

    @staticmethod
    def learning_speed_analysis(loss_history: np.ndarray,
                                 epoch_window: int = 10) -> dict:
        """
        分析学习速度的变化

        Args:
            loss_history: 损失历史
            epoch_window: 计算窗口

        Returns:
            学习速度分析
        """
        analysis = {}

        # 计算滑动窗口内的平均下降速度
        learning_speeds = []
        for i in range(epoch_window, len(loss_history)):
            speed = (loss_history[i-epoch_window] - loss_history[i]) / epoch_window
            learning_speeds.append(speed)

        learning_speeds = np.array(learning_speeds)

        analysis['max_learning_speed'] = np.max(learning_speeds)
        analysis['mean_learning_speed'] = np.mean(learning_speeds)
        analysis['final_learning_speed'] = learning_speeds[-1]

        # 检测学习速度显著下降的点
        speed_threshold = analysis['max_learning_speed'] * 0.1
        slowdown_points = np.where(learning_speeds < speed_threshold)[0]

        if len(slowdown_points) > 0:
            analysis['learning_slowdown_epoch'] = slowdown_points[0] + epoch_window

        return analysis

def training_phases_example():
    """训练相变示例"""
    print("\n" + "=" * 60)
    print("训练过程的相变")
    print("=" * 60)

    tpt = TrainingPhaseTransitions()

    # 模拟训练历史
    np.random.seed(42)
    n_epochs = 200

    # 模拟损失曲线(快速下降后缓慢收敛)
    epochs = np.arange(n_epochs)
    loss_history = 1.0 * np.exp(-epochs / 20) + 0.1 + 0.01 * np.random.randn(n_epochs)

    # 模拟梯度范数
    gradient_norms = 1.0 * np.exp(-epochs / 30) + 0.05 + 0.005 * np.random.randn(n_epochs)

    # 相变检测
    print("\n1. 相变检测")
    print("-" * 40)

    phases = tpt.detect_phase_transitions(loss_history, gradient_norms)
    for k, v in phases.items():
        print(f"  {k}: {v}")

    # 学习速度分析
    print("\n2. 学习速度分析")
    print("-" * 40)

    speed_analysis = tpt.learning_speed_analysis(loss_history)
    for k, v in speed_analysis.items():
        if isinstance(v, float):
            print(f"  {k}: {v:.6f}")
        else:
            print(f"  {k}: {v}")

# 运行示例
if __name__ == "__main__":
    training_phases_example()

6.2 学习率调度理论

学习率的重要性

学习率是影响训练的最关键超参数之一。

调度策略

  1. Step Decay:每隔一定epoch降低学习率
  2. Exponential Decay:指数衰减 \(\eta_t = \eta_0 \cdot \gamma^t\)

  3. Cosine Annealing:\(\eta_t = \eta_{min} + \frac{1}{2}(\eta_{max} - \eta_{min})\left(1 + \cos\frac{t}{T}\pi\right)\)

  4. Warmup:初始阶段线性增加

理论依据

  • 大学习率:探索损失景观
  • 小学习率:精细优化
  • 学习率与批量大小成正比
Python
class LearningRateTheory:
    """
    学习率调度理论
    """

    @staticmethod
    def step_decay_schedule(initial_lr: float,
                            epoch: int,
                            drop_every: int = 30,
                            drop_factor: float = 0.1) -> float:
        """
        Step decay学习率调度

        Args:
            initial_lr: 初始学习率
            epoch: 当前epoch
            drop_every: 每隔多少epoch降低
            drop_factor: 降低因子

        Returns:
            当前学习率
        """
        n_drops = epoch // drop_every
        lr = initial_lr * (drop_factor ** n_drops)
        return lr

    @staticmethod
    def exponential_decay_schedule(initial_lr: float,
                                    epoch: int,
                                    decay_rate: float = 0.95) -> float:
        """
        指数衰减学习率

        Args:
            initial_lr: 初始学习率
            epoch: 当前epoch
            decay_rate: 衰减率

        Returns:
            当前学习率
        """
        lr = initial_lr * (decay_rate ** epoch)
        return lr

    @staticmethod
    def cosine_annealing_schedule(initial_lr: float,
                                   epoch: int,
                                   total_epochs: int,
                                   min_lr: float = 0.0) -> float:
        """
        余弦退火学习率调度

        Args:
            initial_lr: 初始学习率
            epoch: 当前epoch
            total_epochs: 总epoch数
            min_lr: 最小学习率

        Returns:
            当前学习率
        """
        import math
        lr = min_lr + 0.5 * (initial_lr - min_lr) * (1 + math.cos(math.pi * epoch / total_epochs))
        return lr

    @staticmethod
    def warmup_schedule(initial_lr: float,
                         epoch: int,
                         warmup_epochs: int) -> float:
        """
        Warmup学习率调度

        Args:
            initial_lr: 目标学习率
            epoch: 当前epoch
            warmup_epochs: warmup epoch数

        Returns:
            当前学习率
        """
        if epoch < warmup_epochs:
            return initial_lr * (epoch + 1) / warmup_epochs
        return initial_lr

    @staticmethod
    def lr_batch_size_scaling(base_lr: float,
                               base_batch_size: int,
                               new_batch_size: int) -> float:
        """
        根据批量大小缩放学习率

        线性缩放规则:lr ∝ batch_size

        Args:
            base_lr: 基础学习率
            base_batch_size: 基础批量大小
            new_batch_size: 新批量大小

        Returns:
            缩放后的学习率
        """
        return base_lr * (new_batch_size / base_batch_size)

def learning_rate_example():
    """学习率调度示例"""
    print("\n" + "=" * 60)
    print("学习率调度理论")
    print("=" * 60)

    lr_theory = LearningRateTheory()

    # 不同调度策略比较
    print("\n1. 学习率调度策略比较")
    print("-" * 40)

    initial_lr = 0.1
    total_epochs = 100

    epochs = [0, 25, 50, 75, 99]

    print(f"{'Epoch':<10} {'Step':<12} {'Exponential':<15} {'Cosine':<12}")
    print("-" * 55)

    for epoch in epochs:
        step_lr = lr_theory.step_decay_schedule(initial_lr, epoch, drop_every=30)
        exp_lr = lr_theory.exponential_decay_schedule(initial_lr, epoch, 0.97)
        cos_lr = lr_theory.cosine_annealing_schedule(initial_lr, epoch, total_epochs)
        print(f"{epoch:<10} {step_lr:<12.6f} {exp_lr:<15.6f} {cos_lr:<12.6f}")

    # Warmup
    print("\n2. Warmup调度")
    print("-" * 40)

    warmup_epochs = 10
    for epoch in [0, 5, 9, 10, 20]:
        lr = lr_theory.warmup_schedule(initial_lr, epoch, warmup_epochs)
        print(f"  Epoch {epoch:2d}: lr = {lr:.6f}")

    # 批量大小缩放
    print("\n3. 批量大小缩放")
    print("-" * 40)

    base_lr = 0.01
    base_bs = 256

    batch_sizes = [64, 128, 256, 512, 1024]
    for bs in batch_sizes:
        scaled_lr = lr_theory.lr_batch_size_scaling(base_lr, base_bs, bs)
        print(f"  Batch size {bs:4d}: lr = {scaled_lr:.6f}")

# 运行示例
if __name__ == "__main__":
    learning_rate_example()

6.3 训练收敛理论

收敛性定义

算法收敛,如果:

\[\lim_{t \to \infty} \mathcal{L}(\theta_t) = \mathcal{L}^*\]

其中 \(\mathcal{L}^*\) 是最优损失。

凸优化收敛率

算法        强凸                           凸
GD          \(O(e^{-t/\kappa})\)           \(O(1/t)\)
SGD         \(O(1/t)\)                     \(O(1/\sqrt{t})\)
Nesterov    \(O(e^{-t/\sqrt{\kappa}})\)    \(O(1/t^2)\)
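
上表中GD与Nesterov的差距可以在一个病态二次目标上直接观察。以下示意实验取 \(\kappa=100\) 的对角二次函数,动量系数采用强凸情形的经典取法,仅作演示而非严格的速率验证:

```python
import numpy as np

# 病态二次目标 L(x) = 0.5 * x^T H x,条件数 kappa = 100
H = np.diag([1.0, 100.0])
kappa = 100.0
lr = 1.0 / 100.0  # 1/L,L为最大特征值(Lipschitz常数)

def run_gd(n_iter):
    x = np.array([1.0, 1.0])
    for _ in range(n_iter):
        x = x - lr * (H @ x)
    return 0.5 * x @ H @ x

def run_nesterov(n_iter):
    x = np.array([1.0, 1.0])
    x_prev = x.copy()
    # 强凸情形的经典动量系数 (sqrt(kappa)-1)/(sqrt(kappa)+1)
    momentum = (np.sqrt(kappa) - 1) / (np.sqrt(kappa) + 1)
    for _ in range(n_iter):
        y = x + momentum * (x - x_prev)
        x_prev = x
        x = y - lr * (H @ y)
    return 0.5 * x @ H @ x

for t in [50, 200]:
    print(f"t={t}: GD损失={run_gd(t):.3e}, Nesterov损失={run_nesterov(t):.3e}")
```

与上表一致,相同迭代次数下Nesterov的损失远低于GD,体现了 \(\sqrt{\kappa}\) 与 \(\kappa\) 的差别。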

非凸优化

  • 只能保证收敛到一阶稳定点(梯度为零)
  • 收敛率通常为 \(O(1/\sqrt{t})\)

提前停止

在验证损失开始上升前停止训练,防止过拟合。

Python
class ConvergenceTheory:
    """
    训练收敛理论
    """

    @staticmethod
    def convex_convergence_rate(iteration: int,
                                 condition_number: float = 100.0,
                                 algorithm: str = 'gd') -> float:
        """
        计算强凸情形下的理论收敛率

        Args:
            iteration: 迭代次数
            condition_number: 条件数
            algorithm: 算法类型

        Returns:
            误差上界
        """
        if algorithm == 'gd':
            # 梯度下降: O(e^(-t/κ))
            return np.exp(-iteration / condition_number)
        elif algorithm == 'sgd':
            # SGD: O(1/t)
            return 1.0 / iteration
        elif algorithm == 'nesterov':
            # Nesterov加速: O(e^(-t/√κ))
            return np.exp(-iteration / np.sqrt(condition_number))
        else:
            return 1.0 / iteration

    @staticmethod
    def nonconvex_convergence_rate(iteration: int,
                                    lipschitz_constant: float = 1.0) -> float:
        """
        非凸优化的收敛率

        Args:
            iteration: 迭代次数
            lipschitz_constant: Lipschitz常数

        Returns:
            梯度范数上界
        """
        # 非凸: O(1/√t)
        return lipschitz_constant / np.sqrt(iteration)

    @staticmethod
    def early_stopping_criterion(validation_losses: np.ndarray,
                                  patience: int = 10) -> int:
        """
        提前停止准则

        Args:
            validation_losses: 验证损失历史
            patience: 容忍次数

        Returns:
            建议停止的epoch
        """
        best_loss = float('inf')
        best_epoch = 0

        for epoch, loss in enumerate(validation_losses):
            if loss < best_loss:
                best_loss = loss
                best_epoch = epoch
            elif epoch - best_epoch >= patience:
                return epoch

        return len(validation_losses)

    @staticmethod
    def convergence_diagnostic(loss_history: np.ndarray,
                               gradient_norm_history: np.ndarray | None = None) -> dict:
        """
        收敛诊断

        Args:
            loss_history: 损失历史
            gradient_norm_history: 梯度范数历史

        Returns:
            诊断结果
        """
        diagnostic = {}

        # 损失下降速度
        if len(loss_history) > 1:
            loss_changes = np.diff(loss_history)
            diagnostic['mean_loss_change'] = np.mean(loss_changes)
            diagnostic['recent_loss_change'] = np.mean(loss_changes[-10:])

        # 是否收敛
        if gradient_norm_history is not None:
            diagnostic['final_gradient_norm'] = gradient_norm_history[-1]
            diagnostic['converged'] = gradient_norm_history[-1] < 1e-6

        # 损失波动
        diagnostic['loss_variance'] = np.var(loss_history[-20:])

        return diagnostic

def convergence_example():
    """收敛理论示例"""
    print("\n" + "=" * 60)
    print("训练收敛理论")
    print("=" * 60)

    ct = ConvergenceTheory()

    # 凸优化收敛率
    print("\n1. 凸优化收敛率")
    print("-" * 40)

    iterations = [10, 100, 1000, 10000]
    condition_number = 100

    print(f"{'Iter':<10} {'GD':<15} {'SGD':<15} {'Nesterov':<15}")
    print("-" * 55)

    for t in iterations:
        gd_rate = ct.convex_convergence_rate(t, condition_number, 'gd')
        sgd_rate = ct.convex_convergence_rate(t, condition_number, 'sgd')
        nest_rate = ct.convex_convergence_rate(t, condition_number, 'nesterov')
        print(f"{t:<10} {gd_rate:<15.6e} {sgd_rate:<15.6e} {nest_rate:<15.6e}")

    # 非凸收敛
    print("\n2. 非凸优化收敛率")
    print("-" * 40)

    for t in iterations:
        rate = ct.nonconvex_convergence_rate(t)
        print(f"  Iteration {t:5d}: gradient norm <= {rate:.6e}")

    # 提前停止
    print("\n3. 提前停止")
    print("-" * 40)

    # 模拟验证损失
    np.random.seed(42)
    val_losses = 1.0 * np.exp(-np.arange(100) / 30) + 0.1
    val_losses[60:] += np.linspace(0, 0.1, 40)  # 后期上升

    stop_epoch = ct.early_stopping_criterion(val_losses, patience=10)
    print(f"  建议停止epoch: {stop_epoch}")
    print(f"  最佳验证损失: {np.min(val_losses):.4f}")

# 运行示例
if __name__ == "__main__":
    convergence_example()
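上面的 convergence_example 演示了收敛率与提前停止,但没有用到 convergence_diagnostic。下面补充一个可独立运行的小示例(为便于单独执行,这里内联了与上文方法等价的函数体;训练曲线是人为模拟的 O(1/t) 下降,并非真实实验数据):

```python
import numpy as np

def convergence_diagnostic(loss_history, gradient_norm_history=None):
    """与上文 ConvergenceTheory.convergence_diagnostic 等价的独立版本"""
    diagnostic = {}
    if len(loss_history) > 1:
        loss_changes = np.diff(loss_history)
        diagnostic['mean_loss_change'] = np.mean(loss_changes)
        diagnostic['recent_loss_change'] = np.mean(loss_changes[-10:])
    if gradient_norm_history is not None:
        diagnostic['final_gradient_norm'] = gradient_norm_history[-1]
        diagnostic['converged'] = gradient_norm_history[-1] < 1e-6
    diagnostic['loss_variance'] = np.var(loss_history[-20:])
    return diagnostic

# 模拟一条 O(1/t) 下降的损失曲线和 O(1/sqrt(t)) 下降的梯度范数
losses = 1.0 / np.arange(1, 101)
grads = 1.0 / np.sqrt(np.arange(1, 101))

report = convergence_diagnostic(losses, grads)
print(f"平均损失变化: {report['mean_loss_change']:.4f}")
print(f"末端梯度范数: {report['final_gradient_norm']:.4f}")
print(f"是否收敛: {report['converged']}")
```

末端梯度范数为 0.1,远大于 1e-6 的阈值,因此诊断结果为"未收敛";平均损失变化为负,说明损失整体仍在下降。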

7. 理论前沿与挑战

7.1 开放问题

主要开放问题

  1. 泛化的完整理论
     • 为什么过参数化网络能泛化?
     • 隐式正则化的精确刻画

  2. 优化的全局收敛
     • 非凸损失的全局收敛保证
     • 鞍点逃离的精确分析

  3. 深度 vs 宽度
     • 深度的真实作用
     • 最优网络架构设计

  4. 表示学习的理论
     • 什么构成好的表示?
     • 自监督学习的理论保证
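针对第一个开放问题("为什么过参数化网络能泛化?"),可以用一个极简的数值示意来观察与之相关的"双下降"现象(这只是一个玩具实验,并非对该问题的解答;其中随机 ReLU 特征、样本量与噪声水平均为示意性假设):用最小范数最小二乘在不同特征数 p 下拟合,测试误差通常在插值阈值 p ≈ n 附近出现峰值,随后在更强的过参数化下再次下降。

```python
import numpy as np

rng = np.random.default_rng(0)
n_train, n_test, d = 40, 200, 30   # 插值阈值在 p = n_train = 40 附近
noise = 0.1

def avg_test_error(p, n_trials=30):
    """随机 ReLU 特征 + 最小范数最小二乘,返回特征数为 p 时的平均测试 MSE"""
    errs = []
    for _ in range(n_trials):
        w = rng.standard_normal(d) / np.sqrt(d)          # 真实线性目标
        X_tr = rng.standard_normal((n_train, d))
        X_te = rng.standard_normal((n_test, d))
        y_tr = X_tr @ w + noise * rng.standard_normal(n_train)
        y_te = X_te @ w
        F = rng.standard_normal((d, p)) / np.sqrt(d)     # 随机特征映射
        Phi_tr = np.maximum(X_tr @ F, 0.0)
        Phi_te = np.maximum(X_te @ F, 0.0)
        beta = np.linalg.pinv(Phi_tr) @ y_tr             # 最小范数解
        errs.append(np.mean((Phi_te @ beta - y_te) ** 2))
    return float(np.mean(errs))

errors = {p: avg_test_error(p) for p in [10, 40, 400]}
for p, e in errors.items():
    print(f"p = {p:4d}: test MSE = {e:.3f}")
```

在典型随机种子下,插值阈值处(p = 40)的测试误差明显高于强过参数化时(p = 400),与"过参数化反而可能泛化更好"的经验观察一致。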

7.2 新兴研究方向

神经正切核的扩展
  • 有限宽度网络的修正
  • 特征学习 regime 的分析

平均场理论
  • 无限宽度极限的替代视角
  • 训练动态的宏观描述

信息论方法
  • 信息瓶颈的深化
  • 压缩与预测的权衡

因果推断
  • 因果表示学习
  • 分布外泛化
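作为对"神经正切核的扩展"方向的直观补充,下面是一个经验 NTK(empirical NTK)的极简数值草图(网络结构、宽度与有限差分步长均为示意性假设,不代表任何标准实现):对有限宽度网络,经验 NTK 定义为 Θ_ij = ⟨∂f(x_i)/∂θ, ∂f(x_j)/∂θ⟩,这里用中心差分近似参数梯度。

```python
import numpy as np

rng = np.random.default_rng(0)

# 一个两层 tanh 网络 f(x; theta),theta 为扁平化参数向量
d_in, d_h = 3, 16
n_params = d_in * d_h + d_h

def f(x, theta):
    W = theta[:d_in * d_h].reshape(d_in, d_h)
    v = theta[d_in * d_h:]
    return np.tanh(x @ W) @ v / np.sqrt(d_h)

def empirical_ntk(X, theta, eps=1e-5):
    """经验NTK: Theta_ij = <df(x_i)/dtheta, df(x_j)/dtheta>,梯度用中心差分近似"""
    J = np.zeros((len(X), len(theta)))
    for k in range(len(theta)):
        e = np.zeros_like(theta)
        e[k] = eps
        f_plus = np.array([f(x, theta + e) for x in X])
        f_minus = np.array([f(x, theta - e) for x in X])
        J[:, k] = (f_plus - f_minus) / (2 * eps)
    return J @ J.T   # Jacobian 的 Gram 矩阵,对称半正定

X = rng.standard_normal((5, d_in))
theta = 0.5 * rng.standard_normal(n_params)
K = empirical_ntk(X, theta)
print("NTK 形状:", K.shape)
```

由构造可知 K 是对称半正定的 Gram 矩阵;在无限宽度极限下它在训练过程中保持不变,而有限宽度下的变化正是上面"特征学习 regime 分析"所关心的对象。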

7.3 理论与实践的结合

理论指导实践

  1. 架构设计:理解归纳偏置指导架构选择
  2. 优化策略:基于理论设计学习率调度
  3. 正则化技术:理论启发的正则化方法
  4. 模型选择:基于复杂度理论的模型选择

实践反馈理论

  1. 现象发现:实验发现新现象驱动理论发展
  2. 理论验证:大规模实验验证理论预测
  3. 理论修正:根据实验结果修正理论假设
Python
class TheoryPracticeIntegration:
    """
    理论与实践的结合
    """

    @staticmethod
    def architecture_selection_guide(data_properties: dict) -> str:
        """
        基于理论指导架构选择

        Args:
            data_properties: 数据属性

        Returns:
            架构建议
        """
        if data_properties.get('spatial_structure', False):
            return "CNN - 利用局部性和平移等变性"
        elif data_properties.get('sequential', False):
            if data_properties.get('long_range_dependencies', False):
                return "Transformer - 处理长程依赖"
            else:
                return "RNN/LSTM - 序列建模"
        elif data_properties.get('graph_structure', False):
            return "GNN - 图结构数据"
        else:
            return "MLP - 通用函数逼近"

    @staticmethod
    def optimization_recommendations(model_size: float,
                                      data_size: float) -> dict:
        """
        基于理论的优化建议

        Args:
            model_size: 模型参数量
            data_size: 数据量

        Returns:
            优化建议
        """
        recommendations = {}

        # 学习率建议
        if model_size > 1e7:
            recommendations['learning_rate'] = 1e-4
            recommendations['scheduler'] = 'cosine_with_warmup'
        else:
            recommendations['learning_rate'] = 1e-3
            recommendations['scheduler'] = 'step_decay'

        # 批量大小
        if data_size > 1e6:
            recommendations['batch_size'] = 512
        else:
            recommendations['batch_size'] = 128

        # 正则化
        if model_size / data_size > 10:
            recommendations['regularization'] = 'strong'
            recommendations['dropout'] = 0.5
        else:
            recommendations['regularization'] = 'moderate'
            recommendations['dropout'] = 0.2

        return recommendations

    @staticmethod
    def theoretical_insights_summary():
        """理论洞察总结"""
        insights = {
            "泛化": [
                "平坦极小值通常泛化更好",
                "有效复杂度决定泛化能力",
                "双下降现象挑战传统理论"
            ],
            "优化": [
                "SGD隐式偏好简单解",
                "学习率调度影响收敛质量",
                "鞍点在高维中占主导"
            ],
            "表示": [
                "分布式表示具有指数级能力",
                "层次结构自动学习",
                "流形假设指导降维"
            ],
            "架构": [
                "归纳偏置决定适用场景",
                "深度提供组合能力",
                "宽度影响优化难度"
            ]
        }

        print("\n理论核心洞察:")
        print("=" * 60)

        for category, points in insights.items():
            print(f"\n{category}:")
            for point in points:
                print(f"  • {point}")

def theory_practice_example():
    """理论与实践结合示例"""
    print("\n" + "=" * 60)
    print("理论与实践的结合")
    print("=" * 60)

    tpi = TheoryPracticeIntegration()

    # 架构选择
    print("\n1. 架构选择指导")
    print("-" * 40)

    scenarios = [
        {'spatial_structure': True, 'sequential': False},
        {'spatial_structure': False, 'sequential': True, 'long_range_dependencies': True},
        {'graph_structure': True},
    ]

    for i, props in enumerate(scenarios):
        recommendation = tpi.architecture_selection_guide(props)
        print(f"  场景 {i+1}: {recommendation}")

    # 优化建议
    print("\n2. 优化建议")
    print("-" * 40)

    configs = [
        {'model_size': 1e6, 'data_size': 1e5},
        {'model_size': 1e8, 'data_size': 1e6},
    ]

    for config in configs:
        rec = tpi.optimization_recommendations(**config)
        print(f"\n  模型规模: {config['model_size']:.0e}, 数据量: {config['data_size']:.0e}")
        for k, v in rec.items():
            print(f"    {k}: {v}")

    # 理论洞察
    tpi.theoretical_insights_summary()

# 运行示例
if __name__ == "__main__":
    theory_practice_example()

总结

本文档深入探讨了机器学习的核心理论,涵盖以下关键领域:

1. 泛化理论

  • PAC学习框架提供了学习的基础理论保证
  • VC维和Rademacher复杂度量化了模型复杂度
  • 深度学习中的泛化之谜推动了对隐式正则化的研究

2. 表示学习理论

  • 流形假设解释了高维数据的低维结构
  • 分布式表示具有指数级表示能力
  • 信息瓶颈理论指导表示学习的设计

3. 优化景观分析

  • 临界点分析揭示了损失函数的几何结构
  • 鞍点问题要求优化算法具有逃离能力
  • 平坦极小值与泛化性能密切相关

4. 神经正切核理论

  • NTK描述了宽网络的训练动态
  • 区分了 kernel regime(惰性训练)与 feature learning regime(特征学习)
  • 为理解深度学习提供了新的理论视角

5. 归纳偏置

  • 架构设计蕴含特定的归纳偏置
  • 优化算法具有隐式偏置
  • 数据分布影响学习过程

6. 学习动态

  • 训练过程经历多个相变阶段
  • 学习率调度对收敛至关重要
  • 收敛理论指导训练设计

这些理论为理解和改进机器学习算法提供了坚实的基础,同时也有许多开放问题等待进一步研究。