跳转至

00 - 数学基础(进阶篇)

数学基础进阶图

深入机器学习的数学理论:从矩阵微积分到泛函分析


目录

  1. 矩阵微积分进阶
  2. 反向传播的矩阵形式
  3. 高阶导数与自动微分
  4. 流形上的优化
  5. 概率图模型
  6. 贝叶斯网络
  7. 马尔可夫随机场
  8. 因子图与和积算法
  9. 优化理论深化
  10. 收敛性分析
  11. 复杂度理论
  12. 随机优化的收敛保证
  13. 泛函分析基础
  14. 希尔伯特空间
  15. 再生核希尔伯特空间(RKHS)
  16. 核方法理论
  17. 信息论进阶
  18. 变分表示
  19. 信息瓶颈理论
  20. 率失真理论

一、矩阵微积分进阶

1.1 反向传播的矩阵形式

全连接网络的反向传播

前向传播(矩阵形式):

\[Z^{[l]} = W^{[l]} A^{[l-1]} + b^{[l]}\]
\[A^{[l]} = \sigma(Z^{[l]})\]

其中:

  • \(W^{[l]} \in \mathbb{R}^{n_l \times n_{l-1}}\): 第\(l\)层权重
  • \(A^{[l-1]} \in \mathbb{R}^{n_{l-1} \times m}\): 第\(l-1\)层激活(\(m\)是批量大小)
  • \(b^{[l]} \in \mathbb{R}^{n_l \times 1}\): 偏置(广播到批量大小)

反向传播(矩阵形式):

\[dZ^{[l]} = dA^{[l]} \odot \sigma'(Z^{[l]})\]
\[dW^{[l]} = \frac{1}{m} dZ^{[l]} (A^{[l-1]})^T\]
\[db^{[l]} = \frac{1}{m} \sum_{i=1}^m dZ^{[l]}_{:,i}\]
\[dA^{[l-1]} = (W^{[l]})^T dZ^{[l]}\]
Python
import numpy as np

def forward_backward_matrix_form(X, Y, weights, biases, activation='relu'):
    """Run one forward/backward pass of a fully-connected net in matrix form.

    X: input, shape (n_features, m_samples)
    Y: one-hot labels, shape (n_output, m_samples)
    weights: list of per-layer weight matrices
    biases: list of per-layer bias column vectors
    activation: hidden-layer nonlinearity, 'relu' or tanh otherwise

    Returns (loss, dW, db) where dW/db are gradient lists aligned with
    weights/biases. The output layer is softmax + cross-entropy.
    """
    m = X.shape[1]
    n_layers = len(weights)

    # Cache pre-activations and activations for the backward pass.
    pre_acts = []
    acts = [X]

    for layer in range(n_layers):
        z = weights[layer] @ acts[layer] + biases[layer]
        pre_acts.append(z)

        if layer == n_layers - 1:
            # Output layer: numerically stable softmax (shift by the max).
            exp_shifted = np.exp(z - np.max(z, axis=0, keepdims=True))
            a = exp_shifted / np.sum(exp_shifted, axis=0, keepdims=True)
        else:
            # Hidden-layer nonlinearity.
            a = np.maximum(0, z) if activation == 'relu' else np.tanh(z)
        acts.append(a)

    # Cross-entropy loss; the small epsilon guards against log(0).
    loss = -np.sum(Y * np.log(acts[-1] + 1e-8)) / m

    dW, db = [], []

    # Combined softmax + cross-entropy gradient simplifies to A - Y.
    dZ = acts[-1] - Y

    for layer in reversed(range(n_layers)):
        # Parameter gradients, averaged over the batch.
        dW.insert(0, (dZ @ acts[layer].T) / m)
        db.insert(0, np.sum(dZ, axis=1, keepdims=True) / m)

        if layer > 0:
            # Propagate to the previous layer through W, then through the
            # activation's derivative.
            dA = weights[layer].T @ dZ
            if activation == 'relu':
                dZ = dA * (pre_acts[layer - 1] > 0)
            else:
                dZ = dA * (1 - np.tanh(pre_acts[layer - 1]) ** 2)

    return loss, dW, db

# Example: push random data through a 784-256-10 network.
np.random.seed(42)
n_features, n_hidden, n_output, m = 784, 256, 10, 100

X = np.random.randn(n_features, m)
Y = np.zeros((n_output, m))
# One-hot labels: pick one random class per sample.
for i in range(m):
    Y[np.random.randint(n_output), i] = 1

# Small random initialization (scale 0.01) for both layers.
weights = [
    np.random.randn(n_hidden, n_features) * 0.01,
    np.random.randn(n_output, n_hidden) * 0.01
]
biases = [
    np.zeros((n_hidden, 1)),
    np.zeros((n_output, 1))
]

loss, dW, db = forward_backward_matrix_form(X, Y, weights, biases)
print(f"损失: {loss:.4f}")
print(f"dW[0] shape: {dW[0].shape}")
print(f"dW[1] shape: {dW[1].shape}")

卷积网络的反向传播

卷积层的反向传播:

\[\frac{\partial L}{\partial W} = \text{conv2d}(X, \frac{\partial L}{\partial Z}, \text{padding}=0)\]
\[\frac{\partial L}{\partial X} = \text{conv2d}(\frac{\partial L}{\partial Z}, \text{rot180}(W), \text{padding}=\text{full})\]
Python
def conv2d_backward(dZ, X, W, stride=1, padding=0):
    """Backward pass of a stride-1, unpadded 2D convolution layer.

    dZ: upstream gradient, shape (batch, out_channels, out_h, out_w)
    X:  layer input, shape (batch, in_channels, h, w)
    W:  kernels, shape (out_channels, in_channels, k, k)

    Returns (dW, dX): gradient w.r.t. the kernels (averaged over the batch)
    and gradient w.r.t. the input.

    Note: `stride` and `padding` are accepted for interface compatibility,
    but only stride=1 / padding=0 is implemented here.
    """
    batch, out_channels, out_h, out_w = dZ.shape
    _, in_channels, h, w = X.shape
    k = W.shape[2]

    # dL/dW: correlate each input channel with the matching output-channel
    # gradient. conv2d_single_channel expects 4-D (batch, channel, h, w)
    # arrays, so keep singleton axes with b:b+1 style slices — the original
    # X[b, j:j+1] slices produced 3-D arrays and crashed on shape unpacking.
    dW = np.zeros_like(W)
    for i in range(out_channels):
        for j in range(in_channels):
            for b in range(batch):
                dW[i, j] += conv2d_single_channel(
                    X[b:b+1, j:j+1],
                    dZ[b:b+1, i:i+1],
                    padding=0
                )[0, 0]
    dW /= batch

    # dL/dX: "full" correlation of the output gradient with the kernel
    # rotated by 180 degrees (padding k-1 recovers the input size).
    dX = np.zeros_like(X)
    W_rot = np.rot90(W, 2, axes=(2, 3))  # rotate kernels 180°

    for b in range(batch):
        for i in range(in_channels):
            for j in range(out_channels):
                dX[b, i] += conv2d_single_channel(
                    dZ[b:b+1, j:j+1],
                    W_rot[j:j+1, i:i+1],
                    padding=k-1
                )[0, 0]

    return dW, dX

def conv2d_single_channel(X, W, padding=0):
    """Valid cross-correlation of a batched input with a kernel bank.

    X: input, shape (batch, channels, h, w)
    W: kernels, shape (out_channels, channels, k, k)
    padding: symmetric zero padding applied to the spatial axes
    Returns an array of shape (batch, out_channels, out_h, out_w).
    """
    if padding > 0:
        pad = (padding, padding)
        X = np.pad(X, ((0, 0), (0, 0), pad, pad))

    batch, _, h, w = X.shape
    out_channels, _, k, _ = W.shape
    out_h = h - k + 1
    out_w = w - k + 1

    result = np.zeros((batch, out_channels, out_h, out_w))
    for row in range(out_h):
        for col in range(out_w):
            # One k x k window per output position, shared across the batch.
            window = X[:, :, row:row+k, col:col+k]
            for oc in range(out_channels):
                result[:, oc, row, col] = np.sum(window * W[oc:oc+1], axis=(1, 2, 3))

    return result

1.2 高阶导数与自动微分

Hessian矩阵的计算

Hessian-vector积(高效计算):

\[Hv = \nabla_x (\nabla_x L \cdot v)\]
Python
import torch

def hessian_vector_product(loss, params, v):
    """Compute the Hessian-vector product Hv without forming H.

    Uses the identity Hv = ∇(∇L · v): take the gradient of the scalar
    inner product between the first-order gradient and v.

    loss: scalar tensor to differentiate
    params: list of tensors with requires_grad=True
    v: list of tensors shaped like params
    Returns a tuple of tensors, one Hv block per parameter.
    """
    # First-order gradients, kept on the graph so they can be
    # differentiated a second time.
    first_grads = torch.autograd.grad(loss, params, create_graph=True, retain_graph=True)

    # Scalar inner product <grad, v>.
    dot = sum((g * vi).sum() for g, vi in zip(first_grads, v))

    # Differentiating the inner product yields Hv.
    return torch.autograd.grad(dot, params, retain_graph=True)

# 示例
def test_hessian():
    """Sanity-check hessian_vector_product on a small quadratic."""
    x = torch.tensor([1.0, 2.0], requires_grad=True)

    # f(x) = x1^2 + 2*x2^2 + 3*x1*x2; its Hessian is [[2, 3], [3, 4]].
    f = x[0]**2 + 2*x[1]**2 + 3*x[0]*x[1]

    # Probing with e1 should return the first Hessian column.
    v = [torch.tensor([1.0, 0.0])]
    Hv = hessian_vector_product(f, [x], v)

    print(f"Hv = {Hv[0]}")  # expected: [2, 3]

test_hessian()

自动微分原理

计算图与反向传播:

Text Only
前向传播: 构建计算图,存储中间结果
反向传播: 从输出开始,应用链式法则

自动微分的两种模式:
1. 前向模式 (Forward Mode): 适合输入维度小的情况
2. 反向模式 (Reverse Mode): 适合输出维度小的情况(深度学习常用)
Python
class AutoDiff:
    """Minimal reverse-mode automatic-differentiation node.

    Each node stores its value, an accumulated gradient, the predecessor
    nodes, and a grad_fn mapping the upstream gradient to the gradients
    of the predecessors.
    """

    def __init__(self, value, grad_fn=None):
        self.value = value
        self.grad = 0.0
        self.grad_fn = grad_fn  # upstream grad -> per-predecessor grads
        self.prev = []          # predecessor nodes in the computation graph

    def __add__(self, other):
        out = AutoDiff(self.value + other.value)
        out.prev = [self, other]
        # d(a+b)/da = d(a+b)/db = 1: the upstream gradient passes through.
        out.grad_fn = lambda g: (g, g)
        return out

    def __mul__(self, other):
        out = AutoDiff(self.value * other.value)
        out.prev = [self, other]
        # Product rule: d(ab)/da = b, d(ab)/db = a.
        out.grad_fn = lambda g: (g * other.value, g * self.value)
        return out

    def backward(self, grad=1.0):
        """Accumulate gradients by walking the graph depth-first."""
        self.grad += grad

        if self.grad_fn is not None:
            for node, g in zip(self.prev, self.grad_fn(grad)):
                node.backward(g)  # recurse into each predecessor

# Example: z = (x + y) * x; x contributes through both factors.
x = AutoDiff(2.0)
y = AutoDiff(3.0)
z = (x + y) * x

z.backward()

print(f"z = {z.value}")  # (2+3)*2 = 10
print(f"dz/dx = {x.grad}")  # 2x + y = 7
print(f"dz/dy = {y.grad}")  # x = 2

1.3 流形上的优化

约束优化与拉格朗日乘子法

问题: \(\min_x f(x)\) s.t. \(g(x) = 0\)

拉格朗日函数: \(\mathcal{L}(x, \lambda) = f(x) + \lambda^T g(x)\)

KKT条件:

  1. 稳定性: \(\nabla_x \mathcal{L} = 0\)
  2. 原始可行性: \(g(x) = 0\)
  3. 对偶可行性: \(\lambda \geq 0\)(不等式约束)
  4. 互补松弛: \(\lambda_i g_i(x) = 0\)

Python
from scipy.optimize import minimize

def constrained_optimization_example():
    """Minimize a quadratic form on the unit sphere.

        min  x^T Q x   subject to  ||x||^2 = 1

    The analytic optimum is the eigenvector of Q with the smallest
    eigenvalue; SLSQP should recover it (up to sign).
    """
    Q = np.diag([1, 2, 3])  # objective matrix

    def objective(x):
        return x.T @ Q @ x

    def gradient(x):
        return 2 * Q @ x

    def sphere(x):
        # Equality constraint: ||x||^2 - 1 = 0.
        return np.sum(x**2) - 1

    result = minimize(
        objective,
        x0=np.array([1, 0, 0]),
        method='SLSQP',
        jac=gradient,
        constraints={'type': 'eq', 'fun': sphere}
    )

    print(f"最优解: {result.x}")
    print(f"最优值: {result.fun}")
    print(f"约束满足: {sphere(result.x):.10f}")

    # Compare against the closed-form answer (smallest eigenpair of Q).
    eigenvalues, eigenvectors = np.linalg.eig(Q)
    min_idx = np.argmin(eigenvalues)
    print(f"\n解析解: {eigenvectors[:, min_idx]}")
    print(f"解析最优值: {eigenvalues[min_idx]}")

constrained_optimization_example()

黎曼流形上的梯度下降

黎曼梯度: \(\text{grad} f(x) = \text{Proj}_{T_x\mathcal{M}}(\nabla f(x))\)

投影: 将欧氏梯度投影到切空间

Python
def project_to_sphere(x):
    """Normalize x onto the unit sphere."""
    return x / np.linalg.norm(x)

def riemannian_gradient_descent(f, grad_f, x0, n_iter=100, lr=0.01):
    """Riemannian gradient descent on the sphere M = {x : ||x|| = 1}.

    f, grad_f: objective and its Euclidean gradient
    x0: starting point (projected onto the sphere first)
    Returns (x, history) where history holds f(x) after each step.
    """
    x = project_to_sphere(x0.copy())  # start on the manifold

    history = []
    for _ in range(n_iter):
        g = grad_f(x)

        # Riemannian gradient: project onto the tangent space, (I - xx^T)g.
        g_tan = g - np.dot(x, g) * x

        # Euclidean step followed by retraction (renormalization) back
        # onto the sphere.
        x = project_to_sphere(x - lr * g_tan)

        history.append(f(x))

    return x, history

# Example: minimize a quadratic form on the unit sphere; the optimum is
# the smallest eigenvalue of Q (here 1), attained at its eigenvector.
Q = np.diag([1, 2, 3, 4, 5])
x0 = np.random.randn(5)

x_opt, history = riemannian_gradient_descent(
    lambda x: x.T @ Q @ x,
    lambda x: 2 * Q @ x,
    x0,
    n_iter=200,
    lr=0.1
)

print(f"最优解: {x_opt}")
print(f"最优值: {x_opt.T @ Q @ x_opt:.6f}")
print(f"范数: {np.linalg.norm(x_opt):.6f}")

二、概率图模型

2.1 贝叶斯网络

表示与分解

链式法则:

\[P(X_1, X_2, ..., X_n) = \prod_{i=1}^n P(X_i | Pa(X_i))\]

其中\(Pa(X_i)\)表示\(X_i\)的父节点

条件独立性:

  • 头到尾 (head-to-tail): \(A \rightarrow B \rightarrow C\),给定\(B\)时\(A \perp C\)
  • 尾到尾 (tail-to-tail): \(A \leftarrow B \rightarrow C\),给定\(B\)时\(A \perp C\)
  • 头到头 (head-to-head): \(A \rightarrow B \leftarrow C\),给定\(B\)或\(B\)的后代时\(A \not\perp C\)
Python
class BayesianNetwork:
    """A tiny discrete Bayesian network over binary variables."""

    def __init__(self):
        self.graph = {}  # node -> list of parent nodes
        self.cpts = {}   # node -> conditional probability table

    def add_node(self, node, parents=None, cpt=None):
        """Register a node with its parents and CPT.

        Root nodes use cpt = {value: probability}; nodes with parents use
        cpt = {parent value tuple: {value: probability}}.
        """
        self.graph[node] = [] if parents is None else parents
        self.cpts[node] = cpt

    def get_joint_probability(self, assignment):
        """Joint probability of a full assignment via the chain rule."""
        prob = 1.0
        for node, value in assignment.items():
            parents = self.graph[node]
            if parents:
                key = tuple(assignment[p] for p in parents)
                prob *= self.cpts[node][key][value]
            else:
                prob *= self.cpts[node][value]
        return prob

    def get_marginal_probability(self, node, evidence=None):
        """P(node=1 | evidence) by brute-force enumeration.

        Hidden variables (neither queried nor observed) are summed out,
        assuming every variable is binary.
        """
        evidence = evidence or {}

        # Variables to marginalize: everything except query and evidence.
        hidden = [n for n in self.graph if n != node and n not in evidence]

        def total_mass(remaining, partial):
            # Sum the joint over all assignments of `remaining`.
            if not remaining:
                return self.get_joint_probability({**evidence, **partial})
            head, tail = remaining[0], remaining[1:]
            acc = 0.0
            for v in (0, 1):
                partial[head] = v
                acc += total_mass(tail, partial)
                del partial[head]
            return acc

        # Unnormalized mass for each query value.
        p1 = total_mass(hidden, {node: 1})
        p0 = total_mass(hidden, {node: 0})

        # Normalize to get the conditional probability.
        normalizer = p0 + p1
        if normalizer == 0:
            return 0.0
        return p1 / normalizer

# Example: a tiny disease-diagnosis network.
# Structure: Smoking -> LungCancer <- Pollution
#            Smoking -> Bronchitis

bn = BayesianNetwork()

# Priors for the root nodes.
bn.add_node('Smoking', cpt={0: 0.7, 1: 0.3})
bn.add_node('Pollution', cpt={0: 0.9, 1: 0.1})

# Conditional probability tables.
bn.add_node('LungCancer',
            parents=['Smoking', 'Pollution'],
            cpt={
                (0, 0): {0: 0.99, 1: 0.01},
                (0, 1): {0: 0.95, 1: 0.05},
                (1, 0): {0: 0.90, 1: 0.10},
                (1, 1): {0: 0.70, 1: 0.30}
            })

bn.add_node('Bronchitis',
            parents=['Smoking'],
            cpt={
                0: {0: 0.95, 1: 0.05},
                1: {0: 0.60, 1: 0.40}
            })

# Query: probability of lung cancer given the patient smokes.
prob = bn.get_marginal_probability('LungCancer', evidence={'Smoking': 1})
print(f"P(LungCancer | Smoking) = {prob:.4f}")

2.2 马尔可夫随机场

因子分解

吉布斯分布:

\[P(X) = \frac{1}{Z} \prod_{c \in \mathcal{C}} \psi_c(X_c)\]

其中:

  • \(\mathcal{C}\)是团(clique)的集合
  • \(\psi_c\)是势函数(非负)
  • \(Z = \sum_X \prod_{c} \psi_c(X_c)\)是配分函数

对数线性模型:

\[P(X) = \frac{1}{Z} \exp\left(\sum_{c} w_c \phi_c(X_c)\right)\]
Python
class MarkovRandomField:
    """Markov random field over binary variables with clique log-potentials."""

    def __init__(self, variables, cliques):
        """
        variables: list of variable identifiers
        cliques: list of cliques, each a list of variables
        """
        self.variables = variables
        self.cliques = cliques
        self.potentials = {}  # clique index -> log-potential function

    def set_potential(self, clique_idx, potential_func):
        """Attach a log-potential to the clique at `clique_idx`."""
        self.potentials[clique_idx] = potential_func

    def compute_partition_function(self):
        """Partition function Z by enumerating all 2^n assignments."""
        n = len(self.variables)
        Z = 0.0

        for bits in range(2**n):
            # Decode one binary assignment from the integer's bits.
            assignment = {
                var: (bits >> pos) & 1
                for pos, var in enumerate(self.variables)
            }

            # Unnormalized log-probability of this assignment.
            energy = 0.0
            for idx, clique in enumerate(self.cliques):
                if idx in self.potentials:
                    values = tuple(assignment[v] for v in clique)
                    energy += self.potentials[idx](values)

            Z += np.exp(energy)

        return Z

    def gibbs_sampling(self, n_samples=1000, burn_in=100):
        """Draw samples with single-site Gibbs updates."""
        # Random initial state.
        state = {var: np.random.randint(2) for var in self.variables}
        samples = []

        for step in range(n_samples + burn_in):
            for var in self.variables:
                # Conditional P(var | rest): score both candidate values;
                # only cliques containing var matter.
                weights = []
                for val in (0, 1):
                    state[var] = val

                    energy = 0.0
                    for idx, clique in enumerate(self.cliques):
                        if var in clique:
                            if idx in self.potentials:
                                values = tuple(state[v] for v in clique)
                                energy += self.potentials[idx](values)

                    weights.append(np.exp(energy))

                # Normalize and resample the site.
                probs = np.array(weights) / sum(weights)
                state[var] = np.random.choice([0, 1], p=probs)

            if step >= burn_in:
                samples.append(state.copy())

        return samples

# 示例: Ising模型(格子上的MRF)
def create_ising_model(n, J=1.0, h=0.0):
    """Build an n x n Ising model as a MarkovRandomField.

    Energy: E = -J * Σ_{i~j} x_i x_j - h * Σ_i x_i
    Only the pairwise coupling term is wired up below; `h` is accepted for
    interface compatibility but unused, matching the original behavior.
    """
    variables = [(i, j) for i in range(n) for j in range(n)]

    # Cliques: horizontally and vertically adjacent grid pairs.
    cliques = []
    for i in range(n):
        for j in range(n):
            if i + 1 < n:
                cliques.append([(i, j), (i + 1, j)])
            if j + 1 < n:
                cliques.append([(i, j), (i, j + 1)])

    mrf = MarkovRandomField(variables, cliques)

    # Pairwise log-potential: J * s_i * s_j with spins s = 2x - 1 ∈ {-1, +1}.
    # A single shared function suffices — the potential does not depend on
    # the clique index, so no closure-binding factory is needed.
    def coupling(assignment):
        return J * (2 * assignment[0] - 1) * (2 * assignment[1] - 1)

    for idx in range(len(cliques)):
        mrf.set_potential(idx, coupling)

    return mrf

# Build a 3x3 Ising model and sample from it with Gibbs sampling.
ising = create_ising_model(3, J=0.5)
samples = ising.gibbs_sampling(n_samples=1000)

# Average magnetization (mean spin value across sites and samples).
magnetization = np.mean([np.mean(list(s.values())) for s in samples])
print(f"平均磁化: {magnetization:.4f}")

2.3 因子图与和积算法

因子图表示

因子图:二分图,包含变量节点和因子节点

和积算法(信念传播):

  • 变量到因子的消息:\(\mu_{x \to f}(x) = \prod_{h \in N(x) \setminus f} \mu_{h \to x}(x)\)
  • 因子到变量的消息:\(\mu_{f \to x}(x) = \sum_{x'} f(x, x') \prod_{y \in N(f) \setminus x} \mu_{y \to f}(y)\)
Python
class FactorGraph:
    """Factor graph with the sum-product (belief propagation) algorithm."""

    def __init__(self):
        self.variables = {}  # variable -> {'domain': values, 'neighbors': [factor ids]}
        self.factors = {}    # factor id -> (variable list, factor function)
        self.messages = {}   # message cache, keyed ('v2f'|'f2v', src, dst)

    def add_variable(self, var, domain):
        """Register a variable node with its (finite) domain."""
        self.variables[var] = {'domain': domain, 'neighbors': []}

    def add_factor(self, factor_id, variables, factor_func):
        """Register a factor node connected to `variables`.

        factor_func maps a {var: value} assignment of the connected
        variables to a non-negative factor value.
        """
        self.factors[factor_id] = (variables, factor_func)
        for var in variables:
            self.variables[var]['neighbors'].append(factor_id)

    def sum_product(self, max_iter=10):
        """Run (loopy) belief propagation and return per-variable marginals.

        Messages are normalized after each update. The result is exact on
        trees; on graphs with cycles it is the usual loopy-BP approximation.
        """
        # Initialize all messages to uniform (all-ones) vectors.
        for var in self.variables:
            for factor in self.variables[var]['neighbors']:
                self.messages[('v2f', var, factor)] = np.ones(
                    len(self.variables[var]['domain'])
                )
                self.messages[('f2v', factor, var)] = np.ones(
                    len(self.variables[var]['domain'])
                )

        # Iteratively pass messages.
        for _ in range(max_iter):
            # Variable -> factor: product of incoming messages from all
            # OTHER neighboring factors.
            for var in self.variables:
                for factor in self.variables[var]['neighbors']:
                    msg = np.ones(len(self.variables[var]['domain']))
                    for other_factor in self.variables[var]['neighbors']:
                        if other_factor != factor:
                            msg *= self.messages[('f2v', other_factor, var)]
                    self.messages[('v2f', var, factor)] = msg / msg.sum()

            # Factor -> variable: sum the factor over all assignments of the
            # other connected variables, weighted by their messages.
            for factor_id, (vars_in_factor, factor_func) in self.factors.items():
                for target_var in vars_in_factor:
                    # The variables to be summed out.
                    other_vars = [v for v in vars_in_factor if v != target_var]

                    # Message accumulator over the target variable's domain.
                    msg = np.zeros(len(self.variables[target_var]['domain']))

                    # Recursively enumerate assignments of the other variables.
                    def enumerate_assignments(idx, current_assignment):
                        if idx == len(other_vars):
                            # Base case: evaluate the factor for each value
                            # of the target variable.
                            for val_idx, val in enumerate(self.variables[target_var]['domain']):
                                full_assignment = {**current_assignment, target_var: val}
                                factor_val = factor_func(full_assignment)

                                # Multiply in the other variables' messages.
                                msg_factor = 1.0
                                for other_var in other_vars:
                                    other_val = full_assignment[other_var]
                                    other_val_idx = self.variables[other_var]['domain'].index(other_val)
                                    msg_factor *= self.messages[('v2f', other_var, factor_id)][other_val_idx]

                                msg[val_idx] += factor_val * msg_factor
                            return

                        var = other_vars[idx]
                        for val in self.variables[var]['domain']:
                            current_assignment[var] = val
                            enumerate_assignments(idx + 1, current_assignment)
                            del current_assignment[var]

                    enumerate_assignments(0, {})
                    self.messages[('f2v', factor_id, target_var)] = msg / msg.sum()

        # Belief / marginal: normalized product of all incoming factor messages.
        marginals = {}
        for var in self.variables:
            belief = np.ones(len(self.variables[var]['domain']))
            for factor in self.variables[var]['neighbors']:
                belief *= self.messages[('f2v', factor, var)]
            marginals[var] = belief / belief.sum()

        return marginals

# 示例: 简单的纠错码
def create_ldpc_code():
    """Build the factor graph of a toy 4-bit LDPC-style code.

    Two overlapping parity checks plus one channel-observation factor per
    bit (each bit observed as 0 through a binary symmetric channel).
    """
    fg = FactorGraph()

    # Bit variables x0..x3, each binary.
    for bit in range(4):
        fg.add_variable(f'x{bit}', [0, 1])

    # Parity-check factor: 1 when the connected bits sum to an even number.
    def parity_check(assignment):
        return 1.0 if sum(assignment.values()) % 2 == 0 else 0.0

    fg.add_factor('f0', ['x0', 'x1', 'x2'], parity_check)
    fg.add_factor('f1', ['x1', 'x2', 'x3'], parity_check)

    # Channel likelihood: observation assumed to be 0, flipped w.p. p_flip.
    def likelihood(x_idx, p_flip):
        def factor(assignment):
            bit = assignment[f'x{x_idx}']
            return 1 - p_flip if bit == 0 else p_flip
        return factor

    for bit in range(4):
        fg.add_factor(f'obs{bit}', [f'x{bit}'], likelihood(bit, 0.1))

    return fg

# Run the sum-product algorithm on the toy code and print the marginals.
fg = create_ldpc_code()
marginals = fg.sum_product(max_iter=5)

print("边缘分布:")
for var, dist in marginals.items():
    print(f"{var}: P(0)={dist[0]:.4f}, P(1)={dist[1]:.4f}")

三、优化理论深化

3.1 收敛性分析

梯度下降的收敛率

凸函数:

  • 步长\(\eta = \frac{1}{L}\)时:\(f(x_T) - f^* \leq \frac{L\|x_0 - x^*\|^2}{2T}\)
  • 收敛率:\(O(1/T)\)

强凸函数(参数\(\mu\)):

  • 步长\(\eta = \frac{1}{L}\)时:\(\|x_T - x^*\|^2 \leq \left(1 - \frac{\mu}{L}\right)^T \|x_0 - x^*\|^2\)
  • 收敛率:\(O((1 - \mu/L)^T)\),即线性收敛

条件数: \(\kappa = \frac{L}{\mu}\)

Python
def analyze_convergence():
    """Visualize gradient-descent convergence rates on a quadratic.

    Compares three fixed step sizes on f(x) = 0.5 x^T A x - b^T x whose
    spectrum lies in [mu, L], and overlays the O(1/T) theoretical bound.
    """
    mu, L = 1, 100  # strong-convexity and smoothness constants
    kappa = L / mu  # condition number

    # Ill-conditioned diagonal system with eigenvalues spread over [mu, L].
    n = 10
    eigenvalues = np.linspace(mu, L, n)
    A = np.diag(eigenvalues)
    b = np.ones(n)

    # Closed-form optimum, used for the suboptimality curves.
    x_star = np.linalg.solve(A, b)
    f_star = 0.5 * x_star.T @ A @ x_star - b.T @ x_star

    def gradient_descent(x0, eta, n_iter):
        """Fixed-step gradient descent; returns (x, suboptimality history)."""
        x = x0.copy()
        gaps = []
        for _ in range(n_iter):
            x = x - eta * (A @ x - b)
            f_val = 0.5 * x.T @ A @ x - b.T @ x
            gaps.append(f_val - f_star)
        return x, gaps

    x0 = np.random.randn(n)

    # Step sizes from conservative to aggressive.
    etas = [1/L, 2/(mu + L), 1/mu]
    labels = ['η=1/L (保守)', 'η=2/(μ+L) (最优)', 'η=1/μ (激进)']

    import matplotlib.pyplot as plt
    plt.figure(figsize=(10, 6))

    for eta, label in zip(etas, labels):
        _, gaps = gradient_descent(x0, eta, 200)
        plt.semilogy(gaps, label=label)

    # Overlay the O(1/T) theoretical bound for the convex case.
    T = np.arange(200)
    theory = 0.5 * L * np.linalg.norm(x0 - x_star)**2 / (T + 1)
    plt.semilogy(theory, '--', label='Theory: O(1/T)', alpha=0.5)

    plt.xlabel('Iteration')
    plt.ylabel('f(x) - f* (log scale)')
    plt.title('Gradient Descent Convergence')
    plt.legend()
    plt.grid(True)
    plt.show()

    print(f"条件数 κ = {kappa:.2f}")

analyze_convergence()

随机梯度下降的收敛性

凸函数:

步长\(\eta_t = \frac{\eta_0}{\sqrt{t}}\)时:

\[\mathbb{E}[f(\bar{x}_T)] - f^* \leq O\left(\frac{1}{\sqrt{T}}\right)\]

强凸函数:

步长\(\eta_t = \frac{1}{\mu t}\)时:

\[\mathbb{E}[\|x_T - x^*\|^2] \leq O\left(\frac{1}{T}\right)\]
Python
def sgd_convergence_analysis():
    """Compare SGD learning-rate schedules on a least-squares problem.

    Plots ||x - x*||^2 per epoch for constant, decaying, and the
    theoretically optimal 1/(mu*t) schedules, against an O(1/T) reference.
    """
    # Fix: `plt` was referenced but matplotlib was never imported in this
    # scope (the only import elsewhere is local to another function), which
    # raised NameError at runtime.
    import matplotlib.pyplot as plt

    np.random.seed(42)

    # Linear regression: min ||Ax - b||^2
    n, d = 1000, 20
    A = np.random.randn(n, d)
    x_true = np.random.randn(d)
    b = A @ x_true + 0.1 * np.random.randn(n)

    # Reference solution for the error curves.
    x_star = np.linalg.lstsq(A, b, rcond=None)[0]

    # Strong-convexity / smoothness constants of the averaged objective.
    # eigvalsh (symmetric solver) returns guaranteed-real eigenvalues for
    # A.T @ A, unlike the general eigvals.
    spectrum = np.linalg.eigvalsh(A.T @ A)
    mu = 2 * np.min(spectrum) / n
    L = 2 * np.max(spectrum) / n

    def sgd(x0, lr_schedule, n_epochs, batch_size=1):
        """Mini-batch SGD; returns per-epoch squared distance to x*."""
        x = x0.copy()
        history = []

        for epoch in range(n_epochs):
            indices = np.random.permutation(n)

            for i in range(0, n, batch_size):
                batch_idx = indices[i:i+batch_size]
                A_batch = A[batch_idx]
                b_batch = b[batch_idx]

                # Stochastic gradient of the mini-batch.
                grad = 2 * A_batch.T @ (A_batch @ x - b_batch) / len(batch_idx)

                # Global step counter drives the schedule.
                t = epoch * (n // batch_size) + i // batch_size
                lr = lr_schedule(t)

                x = x - lr * grad

            # Record the squared distance to the optimum.
            history.append(np.linalg.norm(x - x_star)**2)

        return history

    x0 = np.zeros(d)
    n_epochs = 50

    # Learning-rate schedules under comparison (dispatch table of lambdas).
    schedules = {
        'constant': lambda t: 0.01,
        'decay': lambda t: 0.1 / (1 + 0.01 * t),
        'optimal': lambda t: 1 / (mu * (t + 1))
    }

    plt.figure(figsize=(10, 6))

    for name, schedule in schedules.items():
        history = sgd(x0, schedule, n_epochs)
        plt.semilogy(history, label=name)

    # O(1/T) reference curve, scaled for visibility.
    T = np.arange(1, n_epochs + 1)
    theory = 1 / T
    plt.semilogy(theory / theory[0] * 10, '--', label='Theory: O(1/T)', alpha=0.5)

    plt.xlabel('Epoch')
    plt.ylabel('||x - x*||^2 (log scale)')
    plt.title('SGD Convergence Analysis')
    plt.legend()
    plt.grid(True)
    plt.show()

sgd_convergence_analysis()

3.2 复杂度理论

一阶 oracle 复杂度下界

定理: 对于\(L\)-光滑、\(\mu\)-强凸函数,任何一阶方法的迭代复杂度下界为:

\[\Omega\left(\sqrt{\kappa} \log \frac{1}{\epsilon}\right)\]

加速梯度方法(如Nesterov加速)达到此下界。

Python
def compare_optimization_methods():
    """Benchmark GD, Nesterov acceleration and CG on an ill-conditioned quadratic.

    Solves A x = b (condition number kappa) and plots ||x - x*|| per
    iteration for each method.
    """
    # Fix: `plt` was referenced but matplotlib was never imported in this
    # scope, which raised NameError at runtime.
    import matplotlib.pyplot as plt

    # Ill-conditioned quadratic problem.
    n = 100
    kappa = 1000  # condition number

    eigenvalues = np.linspace(1, kappa, n)
    A = np.diag(eigenvalues)
    b = np.ones(n)

    x_star = np.linalg.solve(A, b)

    def gradient_descent(x0, eta, n_iter):
        """Plain fixed-step gradient descent."""
        x = x0.copy()
        history = []

        for _ in range(n_iter):
            grad = A @ x - b
            x = x - eta * grad
            error = np.linalg.norm(x - x_star)
            history.append(error)

        return history

    def nesterov_acceleration(x0, eta, n_iter):
        """Nesterov accelerated gradient descent with t/(t+3) momentum."""
        x = x0.copy()
        y = x0.copy()
        history = []

        for t in range(n_iter):
            x_new = y - eta * (A @ y - b)
            y = x_new + (t / (t + 3)) * (x_new - x)
            x = x_new

            error = np.linalg.norm(x - x_star)
            history.append(error)

        return history

    def conjugate_gradient(x0, n_iter):
        """Conjugate gradient for the SPD system A x = b."""
        x = x0.copy()
        r = b - A @ x
        p = r.copy()
        history = []

        for _ in range(n_iter):
            Ap = A @ p
            alpha = np.dot(r, r) / np.dot(p, Ap)
            x = x + alpha * p
            r_new = r - alpha * Ap

            beta = np.dot(r_new, r_new) / np.dot(r, r)
            p = r_new + beta * p
            r = r_new

            error = np.linalg.norm(x - x_star)
            history.append(error)

        return history

    x0 = np.random.randn(n)
    n_iter = 200

    # Run each method.
    gd_history = gradient_descent(x0, 2/(1+kappa), n_iter)
    nesterov_history = nesterov_acceleration(x0, 1/kappa, n_iter)
    cg_history = conjugate_gradient(x0, n_iter)

    # Plot the error curves.
    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.semilogy(gd_history, label='Gradient Descent')
    plt.semilogy(nesterov_history, label='Nesterov Acceleration')
    plt.semilogy(cg_history, label='Conjugate Gradient')
    plt.xlabel('Iteration')
    plt.ylabel('||x - x*||')
    plt.title('Convergence Comparison')
    plt.legend()
    plt.grid(True)

    plt.subplot(1, 2, 2)
    # Zoom in on the first 50 iterations.
    plt.semilogy(gd_history[:50], label='Gradient Descent')
    plt.semilogy(nesterov_history[:50], label='Nesterov Acceleration')
    plt.semilogy(cg_history[:50], label='Conjugate Gradient')
    plt.xlabel('Iteration')
    plt.ylabel('||x - x*||')
    plt.title('First 50 Iterations')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    print(f"条件数: {kappa}")
    print(f"GD最终误差: {gd_history[-1]:.2e}")
    print(f"Nesterov最终误差: {nesterov_history[-1]:.2e}")
    print(f"CG最终误差: {cg_history[-1]:.2e}")

compare_optimization_methods()

3.3 随机优化的收敛保证

方差缩减方法

SVRG (Stochastic Variance Reduced Gradient):

\[\tilde{\nabla}_t = \nabla f_{i_t}(x_t) - \nabla f_{i_t}(\tilde{x}) + \tilde{\mu}\]

其中\(\tilde{\mu} = \nabla f(\tilde{x})\)是全梯度,每隔\(m\)次迭代计算一次。

收敛率: \(O((n + \kappa) \log(1/\epsilon))\),远优于SGD的\(O(1/\epsilon)\)

Python
class VarianceReductionMethods:
    """SVRG and SAGA for the averaged least-squares objective ||Ax - b||^2 / n."""

    def __init__(self, A, b):
        self.A = A
        self.b = b
        self.n = A.shape[0]

    def full_gradient(self, x):
        """Gradient of the averaged objective at x."""
        residual = self.A @ x - self.b
        return 2 * self.A.T @ residual / self.n

    def stochastic_gradient(self, x, i):
        """Gradient contributed by the single sample i."""
        row = self.A[i]
        return 2 * row * (row @ x - self.b[i])

    def svrg(self, x0, n_epochs, inner_iter, lr):
        """SVRG: anchor a full gradient each epoch, correct stochastic steps."""
        x = x0.copy()
        history = []

        for _ in range(n_epochs):
            # Snapshot point and its full gradient (the tilde-mu term).
            anchor = x.copy()
            anchor_grad = self.full_gradient(anchor)

            for _ in range(inner_iter):
                i = np.random.randint(self.n)

                # Variance-reduced gradient estimate.
                step = (self.stochastic_gradient(x, i)
                        - self.stochastic_gradient(anchor, i)
                        + anchor_grad)

                x = x - lr * step

            history.append(np.linalg.norm(self.full_gradient(x)))

        return x, history

    def saga(self, x0, n_epochs, lr):
        """SAGA: keep a table of per-sample gradients and their running mean."""
        x = x0.copy()
        history = []

        # Initialize the gradient table at x0.
        table = np.zeros((self.n, len(x0)))
        for i in range(self.n):
            table[i] = self.stochastic_gradient(x, i)

        table_mean = table.mean(axis=0)

        for _ in range(n_epochs):
            for _ in range(self.n):
                i = np.random.randint(self.n)

                fresh = self.stochastic_gradient(x, i)

                # SAGA estimator: fresh grad minus stale grad plus table mean.
                x = x - lr * (fresh - table[i] + table_mean)

                # Incrementally refresh the mean, then the table entry.
                table_mean += (fresh - table[i]) / self.n
                table[i] = fresh

            history.append(np.linalg.norm(self.full_gradient(x)))

        return x, history

def test_variance_reduction():
    """Compare plain SGD against SVRG and SAGA on a least-squares problem."""
    # Fix: `plt` was referenced but matplotlib was never imported in this
    # scope, which raised NameError at runtime.
    import matplotlib.pyplot as plt

    np.random.seed(42)

    # Synthetic regression data.
    n, d = 1000, 50
    A = np.random.randn(n, d)
    x_true = np.random.randn(d)
    b = A @ x_true + 0.1 * np.random.randn(n)

    vr = VarianceReductionMethods(A, b)
    x0 = np.zeros(d)

    # Run SVRG.
    x_svrg, history_svrg = vr.svrg(x0, n_epochs=20, inner_iter=n, lr=0.01)

    # Run SAGA.
    x_saga, history_saga = vr.saga(x0, n_epochs=20, lr=0.01)

    # Plain SGD baseline.
    def sgd(x0, n_epochs, lr):
        """Vanilla SGD; records the full-gradient norm once per epoch."""
        x = x0.copy()
        history = []

        for epoch in range(n_epochs):
            for _ in range(n):
                i = np.random.randint(n)
                grad = vr.stochastic_gradient(x, i)
                x = x - lr * grad

            error = np.linalg.norm(vr.full_gradient(x))
            history.append(error)

        return history

    history_sgd = sgd(x0, 20, 0.001)

    # Plot the gradient-norm curves.
    plt.figure(figsize=(10, 6))
    plt.semilogy(history_sgd, label='SGD', alpha=0.7)
    plt.semilogy(history_svrg, label='SVRG', linewidth=2)
    plt.semilogy(history_saga, label='SAGA', linewidth=2)

    plt.xlabel('Epoch')
    plt.ylabel('Gradient Norm (log scale)')
    plt.title('Variance Reduction Methods')
    plt.legend()
    plt.grid(True)
    plt.show()

    print(f"SGD最终梯度范数: {history_sgd[-1]:.4f}")
    print(f"SVRG最终梯度范数: {history_svrg[-1]:.4f}")
    print(f"SAGA最终梯度范数: {history_saga[-1]:.4f}")

test_variance_reduction()

四、泛函分析基础

4.1 希尔伯特空间

定义与性质

希尔伯特空间:完备的内积空间

内积性质:

  1. 对称性: \(\langle x, y \rangle = \langle y, x \rangle\)
  2. 线性: \(\langle ax + by, z \rangle = a\langle x, z \rangle + b\langle y, z \rangle\)
  3. 正定性: \(\langle x, x \rangle \geq 0\),且\(=0\)当且仅当\(x=0\)

范数: \(\|x\| = \sqrt{\langle x, x \rangle}\)

重要定理:

  • Cauchy-Schwarz不等式: \(|\langle x, y \rangle| \leq \|x\| \|y\|\)
  • 正交投影定理
  • Riesz表示定理

Python
class HilbertSpaceExamples:
    """Concrete examples of Hilbert spaces and their inner products."""

    @staticmethod  # static method: callable without an instance
    def euclidean_space():
        """
        R^n: Euclidean space
        Inner product: <x, y> = x^T y
        """
        x = np.array([1, 2, 3])
        y = np.array([4, 5, 6])

        dot_xy = np.dot(x, y)
        len_x = np.linalg.norm(x)
        len_y = np.linalg.norm(y)

        print("欧氏空间 R^3:")
        print(f"  <x, y> = {dot_xy}")
        print(f"  ||x|| = {len_x:.4f}")
        print(f"  ||y|| = {len_y:.4f}")

        # Cauchy-Schwarz: |<x, y>| <= ||x|| ||y||
        print(f"  |<x, y>| = {abs(dot_xy):.4f} <= {len_x * len_y:.4f}")

        # Orthogonal projection of y onto the line spanned by x.
        projection = (dot_xy / len_x**2) * x
        print(f"  y在x上的投影: {projection}")

    @staticmethod
    def l2_space():
        """
        L^2[a,b]: square-integrable functions
        Inner product: <f, g> = ∫_a^b f(x)g(x) dx
        """
        from scipy import integrate

        # Inner product of sin and cos over one period.
        inner_fg, _ = integrate.quad(lambda t: np.sin(t) * np.cos(t), 0, 2 * np.pi)

        # Norms via the squared integrals.
        sq_sin, _ = integrate.quad(lambda t: np.sin(t) ** 2, 0, 2 * np.pi)
        sq_cos, _ = integrate.quad(lambda t: np.cos(t) ** 2, 0, 2 * np.pi)

        print("\nL^2[0, 2π] 空间:")
        print(f"  <sin, cos> = {inner_fg:.6f} (≈ 0, 正交)")
        print(f"  ||sin|| = {np.sqrt(sq_sin):.4f}")
        print(f"  ||cos|| = {np.sqrt(sq_cos):.4f}")

    @staticmethod
    def fourier_series():
        """Fourier basis: verify {sin(nx)} are pairwise orthogonal in L^2[0, 2π]."""

        # In L^2[0, 2π], {1, cos(nx), sin(nx)} form an orthogonal basis.
        from scipy import integrate

        print("\n傅里叶级数正交性:")

        for n in [1, 2, 3]:
            for m in [1, 2, 3]:
                inner, _ = integrate.quad(
                    lambda x: np.sin(n * x) * np.sin(m * x),
                    0, 2 * np.pi
                )
                print(f"  <sin({n}x), sin({m}x)> = {inner:.6f}", end="")
                # Diagonal entries equal π (the squared norm); others vanish.
                print(" (应该 = π)" if n == m else " (应该 = 0)")

# Run the examples
HilbertSpaceExamples.euclidean_space()
HilbertSpaceExamples.l2_space()
HilbertSpaceExamples.fourier_series()

4.2 再生核希尔伯特空间(RKHS)

核函数与RKHS

定义: 希尔伯特空间\(\mathcal{H}\)是RKHS,如果存在再生核\(K(x, \cdot) \in \mathcal{H}\)使得:

\[f(x) = \langle f, K(x, \cdot) \rangle_{\mathcal{H}} \quad \forall f \in \mathcal{H}\]

核技巧: 在高维特征空间中计算内积,而无需显式计算特征映射

\[K(x, y) = \langle \phi(x), \phi(y) \rangle_{\mathcal{H}}\]

Moore–Aronszajn定理: 每个对称正定核唯一对应一个RKHS(Mercer定理进一步给出核的特征函数谱展开)

Python
class KernelMethods:
    """A small toolbox of kernel functions plus kernel ridge regression."""

    @staticmethod
    def linear_kernel(x, y):
        """Linear kernel k(x, y) = x · y."""
        return np.dot(x, y)

    @staticmethod
    def polynomial_kernel(x, y, degree=3, coef0=1):
        """Polynomial kernel k(x, y) = (x · y + coef0)^degree."""
        return (np.dot(x, y) + coef0) ** degree

    @staticmethod
    def rbf_kernel(x, y, gamma=1.0):
        """Gaussian (RBF) kernel k(x, y) = exp(-gamma ||x - y||^2)."""
        return np.exp(-gamma * np.linalg.norm(x - y) ** 2)

    @staticmethod
    def compute_kernel_matrix(X, kernel_func):
        """Gram matrix K[i, j] = k(X[i], X[j]).

        Only the upper triangle is evaluated; the lower triangle is
        mirrored, since every kernel here is symmetric.
        """
        n_points = X.shape[0]
        K = np.zeros((n_points, n_points))

        for row in range(n_points):
            for col in range(row, n_points):
                value = kernel_func(X[row], X[col])
                K[row, col] = value
                K[col, row] = value

        return K

    @staticmethod
    def kernel_ridge_regression(X, y, kernel_func, lambda_reg=0.1):
        """
        Kernel ridge regression.

        min ||y - Kα||^2 + λ α^T K α

        Closed form: α = (K + λI)^(-1) y
        """
        K = KernelMethods.compute_kernel_matrix(X, kernel_func)
        return np.linalg.solve(K + lambda_reg * np.eye(K.shape[0]), y)

    @staticmethod
    def predict(X_train, X_test, alpha, kernel_func):
        """Evaluate f(x) = Σ_j α_j k(x, x_train_j) at each test point."""
        cross_gram = np.array([
            [kernel_func(test_point, train_point) for train_point in X_train]
            for test_point in X_test
        ])
        return cross_gram @ alpha

def demonstrate_kernel_trick():
    """Demonstrate the kernel trick on a 1-D nonlinear regression task."""

    np.random.seed(42)

    # Noisy samples of sin(x) on [-3, 3].
    n_points = 100
    X = np.linspace(-3, 3, n_points).reshape(-1, 1)
    y = np.sin(X).ravel() + 0.1 * np.random.randn(n_points)

    # Random 80/20 train/test split.
    train_idx = np.random.choice(n_points, 80, replace=False)
    test_idx = np.array([i for i in range(n_points) if i not in train_idx])

    X_train, y_train = X[train_idx], y[train_idx]
    X_test, y_test = X[test_idx], y[test_idx]

    # Shared RBF kernel with fixed bandwidth.
    rbf = lambda a, b: KernelMethods.rbf_kernel(a, b, gamma=0.5)

    # Linear kernel (equivalent to regularized linear regression).
    alpha_linear = KernelMethods.kernel_ridge_regression(
        X_train, y_train, KernelMethods.linear_kernel, lambda_reg=0.1
    )
    y_pred_linear = KernelMethods.predict(
        X_train, X_test, alpha_linear, KernelMethods.linear_kernel
    )

    # RBF kernel captures the nonlinearity.
    alpha_rbf = KernelMethods.kernel_ridge_regression(
        X_train, y_train, rbf, lambda_reg=0.01
    )
    y_pred_rbf = KernelMethods.predict(X_train, X_test, alpha_rbf, rbf)

    # Held-out errors.
    mse_linear = np.mean((y_test - y_pred_linear) ** 2)
    mse_rbf = np.mean((y_test - y_pred_rbf) ** 2)

    print("核岭回归结果:")
    print(f"  线性核 MSE: {mse_linear:.4f}")
    print(f"  RBF核 MSE: {mse_rbf:.4f}")

    # Dense grid for plotting the fitted functions.
    X_plot = np.linspace(-3, 3, 200).reshape(-1, 1)
    y_plot_linear = KernelMethods.predict(
        X_train, X_plot, alpha_linear, KernelMethods.linear_kernel
    )
    y_plot_rbf = KernelMethods.predict(X_train, X_plot, alpha_rbf, rbf)

    plt.figure(figsize=(12, 5))

    # One panel per kernel: data plus fitted curve.
    panels = [
        (y_plot_linear, mse_linear, 'Linear Kernel'),
        (y_plot_rbf, mse_rbf, 'RBF Kernel'),
    ]
    for panel_idx, (curve, mse, label) in enumerate(panels, start=1):
        plt.subplot(1, 2, panel_idx)
        plt.scatter(X_train, y_train, alpha=0.5, label='Train')
        plt.scatter(X_test, y_test, alpha=0.5, label='Test')
        plt.plot(X_plot, curve, 'r-', label=label)
        plt.legend()
        plt.title(f'{label} (MSE={mse:.4f})')

    plt.tight_layout()
    plt.show()

demonstrate_kernel_trick()

4.3 核方法理论

表示定理

定理: 对于RKHS中的正则化问题:

\[\min_{f \in \mathcal{H}} \sum_{i=1}^n L(y_i, f(x_i)) + \lambda \|f\|_{\mathcal{H}}^2\]

解具有形式:

\[f^*(x) = \sum_{i=1}^n \alpha_i K(x_i, x)\]

这意味着最优解位于数据点张成的子空间中。

Python
def demonstrate_representer_theorem():
    """
    Demonstrate the representer theorem.

    The kernel ridge regression solution automatically has the form
    f(x) = Σ_i α_i K(x_i, x), i.e. it lies in the span of the data.
    """

    np.random.seed(42)

    # Two-dimensional inputs with a smooth nonlinear target.
    n_points = 50
    X = np.random.randn(n_points, 2)
    y = np.sin(X[:, 0]) + np.cos(X[:, 1]) + 0.1 * np.random.randn(n_points)

    # RBF kernel ridge regression.
    gamma = 0.5
    lambda_reg = 0.01

    # Dense RBF Gram matrix.
    K = np.zeros((n_points, n_points))
    for row in range(n_points):
        for col in range(n_points):
            K[row, col] = np.exp(-gamma * np.linalg.norm(X[row] - X[col]) ** 2)

    alpha = np.linalg.solve(K + lambda_reg * np.eye(n_points), y)

    def f(x):
        """Kernel expansion f(x) = Σ_i α_i exp(-γ ||x_i - x||²)."""
        return sum(
            alpha[idx] * np.exp(-gamma * np.linalg.norm(X[idx] - x) ** 2)
            for idx in range(n_points)
        )

    # On the training points the expansion should closely match y.
    predictions = [f(X[idx]) for idx in range(n_points)]

    print("表示定理验证:")
    print(f"  训练误差: {np.mean((y - predictions)**2):.6f}")
    print(f"  非零系数比例: {np.mean(np.abs(alpha) > 1e-6):.2%}")

    # Left: expansion coefficients; right: fit quality on the training set.
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.stem(alpha)
    plt.xlabel('Data Point Index')
    plt.ylabel('Coefficient α')
    plt.title('Coefficients in Kernel Expansion')

    plt.subplot(1, 2, 2)
    plt.scatter(y, predictions, alpha=0.6)
    plt.plot([y.min(), y.max()], [y.min(), y.max()], 'r--')
    plt.xlabel('True Value')
    plt.ylabel('Predicted Value')
    plt.title('Prediction Quality')

    plt.tight_layout()
    plt.show()

demonstrate_representer_theorem()

五、信息论进阶

5.1 变分表示

Donsker-Varadhan变分表示

KL散度的变分表示:

\[D_{KL}(P \| Q) = \sup_{T: \Omega \to \mathbb{R}} \left\{ \mathbb{E}_P[T] - \log \mathbb{E}_Q[e^T] \right\}\]

应用: 互信息估计、表示学习

Python
class VariationalRepresentation:
    """Variational (dual) representations from information theory."""

    @staticmethod
    def dv_representation_estimation(p_samples, q_samples, n_iterations=1000):
        """
        Estimate KL divergence via the Donsker-Varadhan representation:

        D_KL(P||Q) ≈ max_T E_P[T] - log E_Q[e^T]

        Here T is restricted to linear combinations of a few summary
        statistics, and the maximization is a crude random search over
        the combination weights.  Real implementations (e.g. MINE)
        parameterize T with a neural network instead.

        Parameters
        ----------
        p_samples, q_samples : 1-D arrays of samples from P and Q.
        n_iterations : number of random weight vectors to try.

        Returns
        -------
        The best (largest) DV lower bound found.
        """

        def compute_stats(samples):
            """Five summary statistics used as features for T."""
            return np.array([
                samples.mean(),
                samples.std(),
                np.percentile(samples, 25),
                np.percentile(samples, 75),
                np.median(samples)
            ])

        def T(x, weights):
            """Critic T(x) = weights · stats(x)."""
            return np.dot(weights, compute_stats(x))

        # Random search over linear critics.  (The original version also
        # precomputed unused p/q statistics and an unused E_Q[T] term;
        # both were dead code and have been removed.)
        best_estimate = -np.inf
        for _ in range(n_iterations):
            weights = np.random.randn(5)

            # E_P[T]: the P samples are treated as one batch.
            T_p = T(p_samples, weights)

            # log E_Q[e^T], with Q split into mini-batches of 10.
            # Compute the per-batch critic values once and reuse them.
            T_q_batches = [
                T(q_samples[i:i+10], weights)
                for i in range(0, len(q_samples), 10)
            ]

            # Donsker-Varadhan lower bound for this critic.
            estimate = T_p - np.log(np.mean(np.exp(T_q_batches)))

            if estimate > best_estimate:
                best_estimate = estimate

        return best_estimate

    @staticmethod
    def mutual_information_dv(x_samples, y_samples, n_bins=10):
        """
        Estimate mutual information by discretizing both variables:

        I(X;Y) = D_KL(P(X,Y) || P(X)P(Y))

        Parameters
        ----------
        x_samples, y_samples : paired 1-D sample arrays of equal length.
        n_bins : number of histogram bins per variable.

        Returns
        -------
        Plug-in estimate of I(X;Y) in nats (natural log).
        """

        def discretize(samples, n_bins):
            # np.digitize over n_bins+1 edges yields indices in
            # 0..n_bins+1, hence the (n_bins+2)-sized tables below.
            bins = np.linspace(samples.min(), samples.max(), n_bins+1)
            return np.digitize(samples, bins)

        x_discrete = discretize(x_samples, n_bins)
        y_discrete = discretize(y_samples, n_bins)

        # Empirical joint distribution over the bin indices.
        joint = np.zeros((n_bins+2, n_bins+2))
        for x, y in zip(x_discrete, y_discrete):
            joint[x, y] += 1
        joint /= len(x_samples)

        marginal_x = joint.sum(axis=1)
        marginal_y = joint.sum(axis=0)

        # I(X;Y) = Σ p(x,y) log( p(x,y) / (p(x)p(y)) ), non-empty cells only.
        mi = 0.0
        for i in range(n_bins+2):
            for j in range(n_bins+2):
                if joint[i, j] > 0 and marginal_x[i] > 0 and marginal_y[j] > 0:
                    mi += joint[i, j] * np.log(
                        joint[i, j] / (marginal_x[i] * marginal_y[j])
                    )

        return mi

# Example: MI of a correlated bivariate Gaussian (closed form available).
np.random.seed(42)

# Draw correlated pairs with correlation coefficient rho.
rho = 0.8
samples = np.random.multivariate_normal([0, 0], [[1, rho], [rho, 1]], 1000)

mi_estimate = VariationalRepresentation.mutual_information_dv(
    samples[:, 0], samples[:, 1]
)
print(f"估计的互信息: {mi_estimate:.4f}")
# For a bivariate Gaussian: I(X;Y) = -0.5 ln(1 - ρ²).
print(f"理论值: {-0.5 * np.log(1 - rho**2):.4f}")

5.2 信息瓶颈理论

信息瓶颈原理

目标: 找到表示\(T\)使得:

\[\min_{p(t|x)} I(X; T) - \beta I(T; Y)\]
  • \(I(X; T)\): 压缩(信息损失)
  • \(I(T; Y)\): 预测能力
  • \(\beta\): 权衡参数

最优解:

\[p(t|x) = \frac{p(t)}{Z(x, \beta)} \exp\left(-\beta \, D_{KL}(p(y|x) \| p(y|t))\right)\]
Python
class InformationBottleneck:
    """Simplified information bottleneck using k-means cluster labels as
    the compressed representation T."""

    def __init__(self, n_clusters, beta=1.0, max_iter=100):
        """
        Parameters
        ----------
        n_clusters : cardinality of the representation T.
        beta : trade-off between compression I(X;T) and prediction I(T;Y).
        max_iter : kept for API compatibility; the simplified fit below
            runs a single k-means pass and never iterates IB updates.
        """
        self.n_clusters = n_clusters
        self.beta = beta
        self.max_iter = max_iter

    def fit(self, X, y):
        """
        Fit the (simplified) information bottleneck.

        Uses k-means assignments as T, then reports I(X;T), I(T;Y) and
        the IB objective I(X;T) - β I(T;Y).  Note that X is "discretized"
        as its own cluster label, so I(X;T) here equals H(T) — an upper
        bound on the true compression term.

        Expects y to contain non-negative integer class labels.
        """
        from sklearn.cluster import KMeans

        # Single clustering pass produces the representation T.
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        cluster_labels = kmeans.fit_predict(X)

        def compute_mi(labels1, labels2):
            """Plug-in mutual information between two non-negative integer
            label arrays."""
            # Size the table by the max label value so non-contiguous
            # label sets index safely (the original sized it by the count
            # of unique labels, which breaks for sparse label values).
            joint = np.zeros((int(np.max(labels1)) + 1, int(np.max(labels2)) + 1))
            for i, j in zip(labels1, labels2):
                joint[i, j] += 1
            joint /= len(labels1)

            marginal1 = joint.sum(axis=1)
            marginal2 = joint.sum(axis=0)

            mi = 0.0
            for i in range(joint.shape[0]):
                for j in range(joint.shape[1]):
                    if joint[i, j] > 0:
                        mi += joint[i, j] * np.log(
                            joint[i, j] / (marginal1[i] * marginal2[j])
                        )
            return mi

        # Simplification: treat the cluster label itself as discretized X.
        X_discrete = cluster_labels

        I_X_T = compute_mi(X_discrete, cluster_labels)  # I(X;T) (= H(T) here)
        I_T_Y = compute_mi(cluster_labels, y)  # I(T;Y)

        # Information bottleneck objective.
        objective = I_X_T - self.beta * I_T_Y

        print(f"信息瓶颈分析:")
        print(f"  I(X; T) = {I_X_T:.4f}")
        print(f"  I(T; Y) = {I_T_Y:.4f}")
        print(f"  目标函数 = {objective:.4f}")

        self.cluster_centers_ = kmeans.cluster_centers_
        self.labels_ = cluster_labels

        return self

    def transform(self, X):
        """Map each row of X to the index of its nearest cluster center."""
        from sklearn.metrics import pairwise_distances
        distances = pairwise_distances(X, self.cluster_centers_)
        return np.argmin(distances, axis=1)

# Example: information bottleneck on scikit-learn's 8x8 digits dataset
# (a small MNIST-like corpus).
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

digits = load_digits()
X, y = digits.data, digits.target

# Sweep the compression/prediction trade-off parameter β.
betas = [0.1, 1.0, 10.0]

plt.figure(figsize=(15, 4))

for i, beta in enumerate(betas):
    ib = InformationBottleneck(n_clusters=10, beta=beta)
    ib.fit(X, y)

    # Show the first cluster center as an 8x8 image for this β.
    # (The original looped over all 10 centers but broke after the first
    # iteration, so only center 0 was ever drawn; the dead loop and the
    # duplicate plt.subplot call are removed here.)
    plt.subplot(1, 3, i + 1)
    center = ib.cluster_centers_[0].reshape(8, 8)
    plt.imshow(center, cmap='gray')
    plt.title(f'β={beta}')
    plt.axis('off')

plt.tight_layout()
plt.show()

5.3 率失真理论

率失真函数

问题: 在允许一定失真的情况下,最小化编码率

\[R(D) = \min_{p(\hat{x}|x): \mathbb{E}[d(X, \hat{X})] \leq D} I(X; \hat{X})\]

高斯源的率失真函数:

\[R(D) = \begin{cases} \frac{1}{2} \log_2 \frac{\sigma^2}{D} & 0 < D \leq \sigma^2 \\ 0 & D > \sigma^2 \end{cases} \quad \text{(单位: bits)}\]
Python
def rate_distortion_gaussian(sigma2, D):
    """
    Rate-distortion function of a Gaussian source, in bits.

    sigma2: source variance
    D: allowed (mean-squared) distortion
    """
    # Distortion budget covers the whole variance: nothing to transmit.
    if D >= sigma2:
        return 0
    # Exact reproduction of a continuous source needs infinite rate.
    if D <= 0:
        return np.inf
    # 0 < D < sigma2: R(D) = (1/2) log2(sigma^2 / D).
    return 0.5 * np.log2(sigma2 / D)

def plot_rate_distortion():
    """Plot the Gaussian rate-distortion curve R(D) for unit variance."""

    sigma2 = 1.0
    distortions = np.linspace(0.01, 2 * sigma2, 100)
    rates = [rate_distortion_gaussian(sigma2, D) for D in distortions]

    plt.figure(figsize=(10, 6))
    plt.plot(distortions, rates, 'b-', linewidth=2)
    # Reference lines: zero rate, and the D = σ² cutoff where R reaches 0.
    plt.axhline(y=0, color='r', linestyle='--', alpha=0.5)
    plt.axvline(x=sigma2, color='g', linestyle='--', alpha=0.5, label='D=σ²')

    plt.xlabel('Distortion D')
    plt.ylabel('Rate R (bits)')
    plt.title('Rate-Distortion Function for Gaussian Source')
    plt.grid(True)
    plt.legend()
    plt.show()

    print("关键观察:")
    print(f"  当 D = σ²/4 时, R = {rate_distortion_gaussian(sigma2, sigma2/4):.4f} bits")
    print(f"  当 D = σ²/10 时, R = {rate_distortion_gaussian(sigma2, sigma2/10):.4f} bits")
    print(f"  当 D ≥ σ² 时, R = 0 (不需要传输)")

plot_rate_distortion()

总结

本文档深入探讨了机器学习的数学基础,包括:

  1. 矩阵微积分进阶: 反向传播的矩阵形式、高阶导数、流形优化
  2. 概率图模型: 贝叶斯网络、马尔可夫随机场、因子图与和积算法
  3. 优化理论深化: 收敛性分析、复杂度理论、方差缩减方法
  4. 泛函分析基础: 希尔伯特空间、RKHS、核方法理论
  5. 信息论进阶: 变分表示、信息瓶颈、率失真理论

这些内容为理解现代机器学习算法提供了坚实的理论基础。


下一步: 继续学习 02-核心理论.md,深入理解泛化理论、表示学习和优化景观。