00 - 数学基础(进阶篇)¶
深入机器学习的数学理论:从矩阵微积分到泛函分析
目录¶
- 矩阵微积分进阶
- 反向传播的矩阵形式
- 高阶导数与自动微分
- 流形上的优化
- 概率图模型
- 贝叶斯网络
- 马尔可夫随机场
- 因子图与和积算法
- 优化理论深化
- 收敛性分析
- 复杂度理论
- 随机优化的收敛保证
- 泛函分析基础
- 希尔伯特空间
- 再生核希尔伯特空间(RKHS)
- 核方法理论
- 信息论进阶
- 变分表示
- 信息瓶颈理论
- 率失真理论
一、矩阵微积分进阶¶
1.1 反向传播的矩阵形式¶
全连接网络的反向传播¶
前向传播(矩阵形式): \(Z^{[l]} = W^{[l]} A^{[l-1]} + b^{[l]}, \quad A^{[l]} = g^{[l]}(Z^{[l]})\)
其中: - \(W^{[l]} \in \mathbb{R}^{n_l \times n_{l-1}}\): 第\(l\)层权重 - \(A^{[l-1]} \in \mathbb{R}^{n_{l-1} \times m}\): 第\(l-1\)层激活(\(m\)是批量大小) - \(b^{[l]} \in \mathbb{R}^{n_l \times 1}\): 偏置(广播到批量大小)
反向传播(矩阵形式):
import matplotlib.pyplot as plt
import numpy as np
def forward_backward_matrix_form(X, Y, weights, biases, activation='relu'):
    """Run one forward/backward pass of a fully-connected net in matrix form.

    Parameters
    ----------
    X : ndarray, shape (n_features, m_samples) -- inputs, one column per sample
    Y : ndarray, shape (n_output, m_samples) -- one-hot labels
    weights : list of ndarray -- per-layer weight matrices
    biases : list of ndarray -- per-layer bias column vectors
    activation : str -- hidden-layer non-linearity, 'relu' (default) or tanh

    Returns
    -------
    (loss, dW, db) : mean cross-entropy loss and per-layer gradient lists.
    """
    m = X.shape[1]
    n_layers = len(weights)

    # Forward pass: cache pre-activations and activations for backprop.
    pre_acts = []
    acts = [X]
    for layer in range(n_layers):
        z = weights[layer] @ acts[layer] + biases[layer]
        pre_acts.append(z)
        if layer == n_layers - 1:
            # Output layer: numerically stable softmax (shift by column max).
            shifted = np.exp(z - np.max(z, axis=0, keepdims=True))
            a = shifted / np.sum(shifted, axis=0, keepdims=True)
        else:
            # Hidden layer non-linearity.
            a = np.maximum(0, z) if activation == 'relu' else np.tanh(z)
        acts.append(a)

    # Mean cross-entropy loss; epsilon guards log(0).
    loss = -np.sum(Y * np.log(acts[-1] + 1e-8)) / m

    # Backward pass, from the output layer down.
    dW, db = [], []
    delta = acts[-1] - Y  # combined softmax + cross-entropy gradient
    for layer in range(n_layers - 1, -1, -1):
        dW.insert(0, (delta @ acts[layer].T) / m)
        db.insert(0, np.sum(delta, axis=1, keepdims=True) / m)
        if layer > 0:
            # Propagate to the previous layer through the activation derivative.
            upstream = weights[layer].T @ delta
            if activation == 'relu':
                delta = upstream * (pre_acts[layer - 1] > 0)
            else:
                delta = upstream * (1 - np.tanh(pre_acts[layer - 1])**2)
    return loss, dW, db
# Demo: a 784-256-10 network on random data (MNIST-like shapes).
np.random.seed(42)
n_features, n_hidden, n_output, m = 784, 256, 10, 100
X = np.random.randn(n_features, m)
# One-hot labels: one random class per column.
Y = np.zeros((n_output, m))
for col in range(m):
    Y[np.random.randint(n_output), col] = 1
weights = [
    np.random.randn(n_hidden, n_features) * 0.01,
    np.random.randn(n_output, n_hidden) * 0.01,
]
biases = [np.zeros((n_hidden, 1)), np.zeros((n_output, 1))]
loss, dW, db = forward_backward_matrix_form(X, Y, weights, biases)
print(f"损失: {loss:.4f}")
print(f"dW[0] shape: {dW[0].shape}")
print(f"dW[1] shape: {dW[1].shape}")
卷积网络的反向传播¶
卷积层的反向传播:
def conv2d_backward(dZ, X, W, stride=1, padding=0):
    """Backward pass of a stride-1, zero-padding 'valid' 2-D convolution.

    Parameters
    ----------
    dZ : ndarray (batch, out_channels, out_h, out_w) -- upstream gradient
    X  : ndarray (batch, in_channels, h, w) -- forward-pass input
    W  : ndarray (out_channels, in_channels, k, k) -- forward-pass weights

    Returns
    -------
    (dW, dX) : gradients w.r.t. the weights (averaged over the batch)
    and the input.

    Note: `stride`/`padding` values other than 1/0 are not implemented.
    """
    batch, out_channels, out_h, out_w = dZ.shape
    _, in_channels, h, w = X.shape
    k = W.shape[2]
    # Weight gradient: dW[i, j] = correlation of X[:, j] with dZ[:, i].
    # BUG FIX: slices must keep all 4 dims (b:b+1, j:j+1) because
    # conv2d_single_channel unpacks (batch, channels, h, w); the original
    # X[b, j:j+1] / dZ[b, i:i+1] were 3-D and crashed on shape unpacking.
    dW = np.zeros_like(W)
    for i in range(out_channels):
        for j in range(in_channels):
            for b in range(batch):
                dW[i, j] += conv2d_single_channel(
                    X[b:b+1, j:j+1],
                    dZ[b:b+1, i:i+1],
                    padding=0
                )[0, 0]
    dW /= batch
    # Input gradient: "full" convolution of dZ with 180°-rotated kernels.
    dX = np.zeros_like(X)
    W_rot = np.rot90(W, 2, axes=(2, 3))  # rotate each k x k filter by 180°
    for b in range(batch):
        for i in range(in_channels):
            for j in range(out_channels):
                dX[b, i] += conv2d_single_channel(
                    dZ[b:b+1, j:j+1],
                    W_rot[j:j+1, i:i+1],
                    padding=k-1
                )[0, 0]
    return dW, dX
def conv2d_single_channel(X, W, padding=0):
    """Valid cross-correlation of X with each filter in W.

    X: (batch, channels, h, w); W: (out_channels, channels, k, k).
    Returns an array of shape (batch, out_channels, out_h, out_w) where
    out_h = h - k + 1 + 2*padding (same for width).
    """
    if padding > 0:
        pad_spec = ((0, 0), (0, 0), (padding, padding), (padding, padding))
        X = np.pad(X, pad_spec)
    batch, _, h, w = X.shape
    out_channels, _, k, _ = W.shape
    out_h = h - k + 1
    out_w = w - k + 1
    result = np.zeros((batch, out_channels, out_h, out_w))
    for row in range(out_h):
        for col in range(out_w):
            # k x k window shared across the batch at this output position.
            window = X[:, :, row:row+k, col:col+k]
            for oc in range(out_channels):
                result[:, oc, row, col] = np.sum(window * W[oc:oc+1], axis=(1, 2, 3))
    return result
1.2 高阶导数与自动微分¶
Hessian矩阵的计算¶
Hessian-vector积(高效计算):
import torch
def hessian_vector_product(loss, params, v):
    """Compute the Hessian-vector product Hv without materializing H.

    Uses the identity Hv = ∇(∇L · v) (Pearlmutter's trick): take the
    gradient once with a retained graph, dot it with v, and
    differentiate that scalar a second time.
    """
    first_grads = torch.autograd.grad(
        loss, params, create_graph=True, retain_graph=True
    )
    # Scalar <grad, v> across all parameter tensors.
    dot = sum((g * vec).sum() for g, vec in zip(first_grads, v))
    # Second differentiation yields H @ v.
    return torch.autograd.grad(dot, params, retain_graph=True)
# Example: verify Hv against a hand-computed Hessian.
def test_hessian():
    x = torch.tensor([1.0, 2.0], requires_grad=True)
    # f(x) = x1^2 + 2*x2^2 + 3*x1*x2, whose Hessian is [[2, 3], [3, 4]].
    f = x[0]**2 + 2*x[1]**2 + 3*x[0]*x[1]
    probe = [torch.tensor([1.0, 0.0])]  # picks out the first Hessian column
    Hv = hessian_vector_product(f, [x], probe)
    print(f"Hv = {Hv[0]}")  # expected [2, 3]
test_hessian()
自动微分原理¶
计算图与反向传播:
前向传播: 构建计算图,存储中间结果
反向传播: 从输出开始,应用链式法则
自动微分的两种模式:
1. 前向模式 (Forward Mode): 适合输入维度小的情况
2. 反向模式 (Reverse Mode): 适合输出维度小的情况(深度学习常用)
class AutoDiff:
    """Minimal reverse-mode automatic differentiation node.

    Supports + and *; call backward() on the output node to accumulate
    gradients into every node of the computation graph.
    """

    def __init__(self, value, grad_fn=None):
        self.value = value        # forward value
        self.grad = 0.0           # accumulated gradient
        self.grad_fn = grad_fn    # maps upstream grad -> grads of inputs
        self.prev = []            # parent nodes in the computation graph

    def __add__(self, other):
        out = AutoDiff(self.value + other.value)
        out.prev = [self, other]
        # d(a+b)/da = d(a+b)/db = 1, so both parents receive g unchanged.
        out.grad_fn = lambda g: (g, g)
        return out

    def __mul__(self, other):
        out = AutoDiff(self.value * other.value)
        out.prev = [self, other]
        # Product rule: each parent gets g scaled by the other's value.
        out.grad_fn = lambda g: (g * other.value, g * self.value)
        return out

    def backward(self, grad=1.0):
        """Accumulate `grad` here and push it to parents via the chain rule."""
        self.grad += grad
        if self.grad_fn is not None:
            for parent, g in zip(self.prev, self.grad_fn(grad)):
                parent.backward(g)
# Worked example: z = (x + y) * x with x = 2, y = 3.
x = AutoDiff(2.0)
y = AutoDiff(3.0)
s = x + y
z = s * x
z.backward()
print(f"z = {z.value}")  # (2+3)*2 = 10
print(f"dz/dx = {x.grad}")  # 2x + y = 7
print(f"dz/dy = {y.grad}")  # x = 2
1.3 流形上的优化¶
约束优化与拉格朗日乘子法¶
问题: \(\min_x f(x)\) s.t. \(g(x) = 0\)
拉格朗日函数: \(\mathcal{L}(x, \lambda) = f(x) + \lambda^T g(x)\)
KKT条件: 1. 稳定性: \(\nabla_x \mathcal{L} = 0\) 2. 原始可行性: \(g(x) = 0\) 3. 对偶可行性: \(\lambda \geq 0\)(不等式约束) 4. 互补松弛: \(\lambda_i g_i(x) = 0\)
from scipy.optimize import minimize
def constrained_optimization_example():
    """Minimize x^T Q x over the unit sphere ||x||^2 = 1.

    Solves numerically with SLSQP, then checks against the analytic
    answer: the eigenvector of Q with the smallest eigenvalue.
    """
    Q = np.diag([1, 2, 3])  # objective matrix

    def objective(x):
        return x.T @ Q @ x

    def gradient(x):
        return 2 * Q @ x

    def constraint(x):
        # Equality constraint ||x||^2 - 1 = 0 keeps x on the sphere.
        return np.sum(x**2) - 1

    result = minimize(
        objective,
        x0=np.array([1, 0, 0]),
        method='SLSQP',
        jac=gradient,
        constraints={'type': 'eq', 'fun': constraint},
    )
    print(f"最优解: {result.x}")
    print(f"最优值: {result.fun}")
    print(f"约束满足: {constraint(result.x):.10f}")

    # Analytic solution: eigenvector for the minimal eigenvalue of Q.
    eigenvalues, eigenvectors = np.linalg.eig(Q)
    min_idx = np.argmin(eigenvalues)
    print(f"\n解析解: {eigenvectors[:, min_idx]}")
    print(f"解析最优值: {eigenvalues[min_idx]}")

constrained_optimization_example()
黎曼流形上的梯度下降¶
黎曼梯度: \(\text{grad} f(x) = \text{Proj}_{T_x\mathcal{M}}(\nabla f(x))\)
投影: 将欧氏梯度投影到切空间
def project_to_sphere(x):
    """Retract x onto the unit sphere by dividing by its Euclidean norm."""
    norm = np.linalg.norm(x)
    return x / norm
def riemannian_gradient_descent(f, grad_f, x0, n_iter=100, lr=0.01):
    """Riemannian gradient descent on the unit sphere {x : ||x|| = 1}.

    Each step projects the Euclidean gradient onto the tangent space at
    the current point, takes a descent step, and retracts back onto the
    sphere by renormalizing. Returns the final point and the objective
    history (one entry per iteration).
    """
    x = project_to_sphere(x0.copy())  # start on the manifold
    history = []
    for _ in range(n_iter):
        g = grad_f(x)
        # Tangent-space projection: (I - x x^T) g.
        g_tan = g - np.dot(x, g) * x
        # Descent step followed by retraction back to the sphere.
        x = project_to_sphere(x - lr * g_tan)
        history.append(f(x))
    return x, history
# Example: minimize x^T Q x on the unit sphere in R^5.
Q = np.diag([1, 2, 3, 4, 5])
objective = lambda v: v.T @ Q @ v
objective_grad = lambda v: 2 * Q @ v
x0 = np.random.randn(5)
x_opt, history = riemannian_gradient_descent(
    objective,
    objective_grad,
    x0,
    n_iter=200,
    lr=0.1,
)
print(f"最优解: {x_opt}")
print(f"最优值: {x_opt.T @ Q @ x_opt:.6f}")
print(f"范数: {np.linalg.norm(x_opt):.6f}")
二、概率图模型¶
2.1 贝叶斯网络¶
表示与分解¶
链式法则: \(P(X_1, \ldots, X_n) = \prod_{i=1}^{n} P(X_i \mid Pa(X_i))\)
其中\(Pa(X_i)\)是\(X_i\)的父节点
条件独立性:
- 头到尾 (head-to-tail): \(A \rightarrow B \rightarrow C\),给定\(B\)时\(A \perp C\)
- 尾到尾 (tail-to-tail): \(A \leftarrow B \rightarrow C\),给定\(B\)时\(A \perp C\)
- 头到头 (head-to-head): \(A \rightarrow B \leftarrow C\),给定\(B\)或\(B\)的后代时\(A \not\perp C\)
class BayesianNetwork:
    """Discrete Bayesian network with exact inference by enumeration.

    Variables are binary (0/1). Each node stores its parent list and a
    conditional probability table (CPT): root CPTs map value -> prob,
    non-root CPTs map a tuple of parent values -> {value: prob}.
    """

    def __init__(self):
        self.graph = {}  # node -> list of parent nodes
        self.cpts = {}   # node -> CPT

    def add_node(self, node, parents=None, cpt=None):
        """Register a node with its parents and CPT."""
        self.graph[node] = [] if parents is None else parents
        self.cpts[node] = cpt

    def get_joint_probability(self, assignment):
        """Return P(assignment) via the chain-rule factorization."""
        prob = 1.0
        for node, value in assignment.items():
            parents = self.graph[node]
            if parents:
                key = tuple(assignment[p] for p in parents)
                prob *= self.cpts[node][key][value]
            else:
                prob *= self.cpts[node][value]
        return prob

    def get_marginal_probability(self, node, evidence=None):
        """Return P(node=1 | evidence) by enumerating hidden variables."""
        evidence = evidence or {}
        # Variables to sum out: everything except the query and evidence.
        hidden = [
            v for v in self.graph
            if v != node and v not in evidence
        ]

        def enumerate_sum(remaining, partial):
            # Recursively sum the joint over all assignments of `remaining`.
            if not remaining:
                return self.get_joint_probability({**evidence, **partial})
            var = remaining[0]
            total = 0.0
            for val in (0, 1):  # binary variables assumed
                partial[var] = val
                total += enumerate_sum(remaining[1:], partial)
            del partial[var]
            return total

        # Unnormalized probabilities for each query value, then normalize.
        p1 = enumerate_sum(hidden, {node: 1})
        p0 = enumerate_sum(hidden, {node: 0})
        denom = p0 + p1
        return p1 / denom if denom else 0.0
# Example: a small diagnosis network.
# Structure: Smoking -> LungCancer <- Pollution
#            Smoking -> Bronchitis
bn = BayesianNetwork()
# Priors for the root nodes.
bn.add_node('Smoking', cpt={0: 0.7, 1: 0.3})
bn.add_node('Pollution', cpt={0: 0.9, 1: 0.1})
# CPT keyed by (Smoking, Pollution) value tuples.
bn.add_node('LungCancer',
            parents=['Smoking', 'Pollution'],
            cpt={
                (0, 0): {0: 0.99, 1: 0.01},
                (0, 1): {0: 0.95, 1: 0.05},
                (1, 0): {0: 0.90, 1: 0.10},
                (1, 1): {0: 0.70, 1: 0.30}
            })
# BUG FIX: get_joint_probability always builds a *tuple* of parent values,
# so single-parent CPTs must be keyed by 1-tuples ((0,), (1,)) too --
# the original bare-int keys raised KeyError as soon as Bronchitis was
# included in the enumeration below.
bn.add_node('Bronchitis',
            parents=['Smoking'],
            cpt={
                (0,): {0: 0.95, 1: 0.05},
                (1,): {0: 0.60, 1: 0.40}
            })
# Query: probability of lung cancer given that the patient smokes.
prob = bn.get_marginal_probability('LungCancer', evidence={'Smoking': 1})
print(f"P(LungCancer | Smoking) = {prob:.4f}")
2.2 马尔可夫随机场¶
因子分解¶
吉布斯分布: \(P(X) = \frac{1}{Z} \prod_{c \in \mathcal{C}} \psi_c(X_c)\)
其中: - \(\mathcal{C}\)是团(clique)的集合 - \(\psi_c\)是势函数(非负) - \(Z = \sum_X \prod_{c} \psi_c(X_c)\)是配分函数
对数线性模型:
class MarkovRandomField:
    """Markov random field over binary variables with log-potentials.

    The unnormalized probability of an assignment is
    exp(sum over cliques of potential_c(values on clique c)).
    """

    def __init__(self, variables, cliques):
        """
        variables: list of variable identifiers
        cliques: list of cliques, each a list (subset) of variables
        """
        self.variables = variables
        self.cliques = cliques
        self.potentials = {}  # clique index -> log-potential function

    def set_potential(self, clique_idx, potential_func):
        """Attach a log-potential function to the clique at `clique_idx`."""
        self.potentials[clique_idx] = potential_func

    def compute_partition_function(self):
        """Brute-force Z: sum exp(total log-potential) over all 2^n states."""
        n = len(self.variables)
        total = 0.0
        for bits in range(2 ** n):
            # Decode one global assignment from the bit pattern.
            assignment = {
                var: (bits >> pos) & 1
                for pos, var in enumerate(self.variables)
            }
            log_weight = 0.0
            for idx, clique in enumerate(self.cliques):
                if idx in self.potentials:
                    values = tuple(assignment[v] for v in clique)
                    log_weight += self.potentials[idx](values)
            total += np.exp(log_weight)
        return total

    def gibbs_sampling(self, n_samples=1000, burn_in=100):
        """Draw samples with single-site Gibbs updates.

        The first `burn_in` full sweeps are discarded; each kept sweep
        contributes one copy of the current state.
        """
        state = {var: np.random.randint(2) for var in self.variables}
        samples = []
        for sweep in range(n_samples + burn_in):
            for var in self.variables:
                # Unnormalized P(var=v | rest) for v in {0, 1}; only
                # cliques containing var matter.
                weights = []
                for val in (0, 1):
                    state[var] = val
                    log_w = 0.0
                    for idx, clique in enumerate(self.cliques):
                        if var in clique and idx in self.potentials:
                            values = tuple(state[v] for v in clique)
                            log_w += self.potentials[idx](values)
                    weights.append(np.exp(log_w))
                probs = np.array(weights) / sum(weights)
                state[var] = np.random.choice([0, 1], p=probs)
            if sweep >= burn_in:
                samples.append(state.copy())
        return samples
# 示例: Ising模型(格子上的MRF)
def create_ising_model(n, J=1.0, h=0.0):
    """Build an n x n Ising model as a pairwise MRF.

    Energy: E = -J * sum_{i~j} s_i s_j - h * sum_i s_i with spins
    s = 2x - 1 in {-1, +1}. Note: the field term `h` is accepted for
    API compatibility but not implemented (no single-site cliques).
    """
    variables = [(i, j) for i in range(n) for j in range(n)]
    # Cliques: horizontally and vertically adjacent lattice pairs.
    cliques = []
    for i in range(n):
        for j in range(n):
            if i < n - 1:
                cliques.append([(i, j), (i+1, j)])
            if j < n - 1:
                cliques.append([(i, j), (i, j+1)])
    mrf = MarkovRandomField(variables, cliques)

    # All pair cliques share one coupling potential. The original code
    # wrapped this in a make_potential(j) factory "to avoid late binding",
    # but the returned lambda never used j (it only captured the
    # loop-invariant J), so a single shared function is equivalent.
    def pair_potential(assignment):
        # Map {0,1} values to {-1,+1} spins and apply the coupling J.
        return J * (2*assignment[0] - 1) * (2*assignment[1] - 1)

    for idx in range(len(cliques)):
        mrf.set_potential(idx, pair_potential)
    return mrf
# Build a 3x3 Ising model and estimate magnetization by Gibbs sampling.
ising = create_ising_model(3, J=0.5)
samples = ising.gibbs_sampling(n_samples=1000)
per_sample_means = [np.mean(list(s.values())) for s in samples]
magnetization = np.mean(per_sample_means)
print(f"平均磁化: {magnetization:.4f}")
2.3 因子图与和积算法¶
因子图表示¶
因子图:二分图,包含变量节点和因子节点
和积算法(信念传播):
- 变量到因子的消息:\(\mu_{x \to f}(x) = \prod_{h \in N(x) \setminus f} \mu_{h \to x}(x)\)
- 因子到变量的消息:\(\mu_{f \to x}(x) = \sum_{x'} f(x, x') \prod_{y \in N(f) \setminus x} \mu_{y \to f}(y)\)
class FactorGraph:
    """Factor graph with marginal inference via the sum-product algorithm
    (belief propagation; loopy BP when the graph has cycles)."""
    def __init__(self):
        self.variables = {}  # variable -> {'domain': values, 'neighbors': factor ids}
        self.factors = {}  # factor id -> (variable list, factor function)
        self.messages = {}  # message cache keyed by ('v2f'|'f2v', src, dst)
    def add_variable(self, var, domain):
        """Add a variable node with its finite domain of values."""
        self.variables[var] = {'domain': domain, 'neighbors': []}
    def add_factor(self, factor_id, variables, factor_func):
        """Add a factor node over `variables`; `factor_func` maps an
        assignment dict to a non-negative weight."""
        self.factors[factor_id] = (variables, factor_func)
        for var in variables:
            self.variables[var]['neighbors'].append(factor_id)
    def sum_product(self, max_iter=10):
        """Run sum-product message passing and return variable marginals.

        Messages are normalized after each update for numerical stability.
        Results are exact on trees; on loopy graphs this is approximate
        (loopy belief propagation) and may not converge in general.
        """
        # Initialize every message to a uniform (all-ones) vector.
        for var in self.variables:
            for factor in self.variables[var]['neighbors']:
                self.messages[('v2f', var, factor)] = np.ones(
                    len(self.variables[var]['domain'])
                )
                self.messages[('f2v', factor, var)] = np.ones(
                    len(self.variables[var]['domain'])
                )
        # Iterate synchronous message updates.
        for _ in range(max_iter):
            # Variable -> factor: product of incoming messages from all
            # *other* neighboring factors.
            for var in self.variables:
                for factor in self.variables[var]['neighbors']:
                    msg = np.ones(len(self.variables[var]['domain']))
                    for other_factor in self.variables[var]['neighbors']:
                        if other_factor != factor:
                            msg *= self.messages[('f2v', other_factor, var)]
                    self.messages[('v2f', var, factor)] = msg / msg.sum()
            # Factor -> variable: sum the factor over all assignments of
            # the other connected variables, weighted by their messages.
            for factor_id, (vars_in_factor, factor_func) in self.factors.items():
                for target_var in vars_in_factor:
                    # Variables to marginalize out of this factor.
                    other_vars = [v for v in vars_in_factor if v != target_var]
                    # Outgoing message, one entry per target_var value.
                    msg = np.zeros(len(self.variables[target_var]['domain']))
                    # Recursively enumerate assignments of other_vars,
                    # accumulating contributions into msg (closure).
                    def enumerate_assignments(idx, current_assignment):
                        if idx == len(other_vars):
                            # Base case: score every value of target_var
                            # under this assignment of the other variables.
                            for val_idx, val in enumerate(self.variables[target_var]['domain']):
                                full_assignment = {**current_assignment, target_var: val}
                                factor_val = factor_func(full_assignment)
                                # Weight by the v2f messages of the other variables.
                                msg_factor = 1.0
                                for other_var in other_vars:
                                    other_val = full_assignment[other_var]
                                    other_val_idx = self.variables[other_var]['domain'].index(other_val)
                                    msg_factor *= self.messages[('v2f', other_var, factor_id)][other_val_idx]
                                msg[val_idx] += factor_val * msg_factor
                            return
                        var = other_vars[idx]
                        for val in self.variables[var]['domain']:
                            current_assignment[var] = val
                            enumerate_assignments(idx + 1, current_assignment)
                            del current_assignment[var]
                    enumerate_assignments(0, {})
                    self.messages[('f2v', factor_id, target_var)] = msg / msg.sum()
        # Beliefs: product of all incoming f2v messages, normalized.
        marginals = {}
        for var in self.variables:
            belief = np.ones(len(self.variables[var]['domain']))
            for factor in self.variables[var]['neighbors']:
                belief *= self.messages[('f2v', factor, var)]
            marginals[var] = belief / belief.sum()
        return marginals
# 示例: 简单的纠错码
def create_ldpc_code():
    """Build the factor graph of a toy 4-bit LDPC-style code.

    Two overlapping parity-check factors plus one channel-likelihood
    factor per bit (all observations assumed to be 0 with flip
    probability 0.1).
    """
    fg = FactorGraph()
    # Bit variables x0..x3, each binary.
    for bit in range(4):
        fg.add_variable(f'x{bit}', [0, 1])

    def parity_check(assignment):
        """Hard parity constraint: weight 1 iff the bits sum to even."""
        total = sum(assignment.values())
        return 1.0 if total % 2 == 0 else 0.0

    # Parity checks over overlapping bit triples.
    fg.add_factor('f0', ['x0', 'x1', 'x2'], parity_check)
    fg.add_factor('f1', ['x1', 'x2', 'x3'], parity_check)

    def likelihood(x_idx, p_flip):
        """Channel likelihood for bit x_idx, assuming a received 0."""
        def factor(assignment):
            x = assignment[f'x{x_idx}']
            return 1 - p_flip if x == 0 else p_flip
        return factor

    # One observation factor per bit.
    for bit in range(4):
        fg.add_factor(f'obs{bit}', [f'x{bit}'], likelihood(bit, 0.1))
    return fg
# Run sum-product on the toy code and report per-bit marginals.
fg = create_ldpc_code()
marginals = fg.sum_product(max_iter=5)
print("边缘分布:")
for var, dist in marginals.items():
    print(f"{var}: P(0)={dist[0]:.4f}, P(1)={dist[1]:.4f}")
三、优化理论深化¶
3.1 收敛性分析¶
梯度下降的收敛率¶
凸函数:
- 步长\(\eta = \frac{1}{L}\)时:\(f(x_T) - f^* \leq \frac{L\|x_0 - x^*\|^2}{2T}\)
- 收敛率:\(O(1/T)\)
强凸函数(参数\(\mu\)):
- 步长\(\eta = \frac{1}{L}\)时:\(\|x_T - x^*\|^2 \leq \left(1 - \frac{\mu}{L}\right)^T \|x_0 - x^*\|^2\)
- 收敛率:\(O((1 - \mu/L)^T)\),即线性收敛
条件数: \(\kappa = \frac{L}{\mu}\)
def analyze_convergence():
    """Compare gradient-descent step sizes on an ill-conditioned quadratic.

    Minimizes f(x) = 0.5 x^T A x - b^T x with eigenvalues of A in
    [mu, L], plots f(x_t) - f* for three step sizes against the O(1/T)
    bound, and prints the condition number.
    """
    # Problem setup: quadratic with eigenvalues spread over [mu, L].
    mu, L = 1, 100  # strong-convexity and smoothness constants
    kappa = L / mu  # condition number
    # Ill-conditioned diagonal matrix.
    n = 10
    eigenvalues = np.linspace(mu, L, n)
    A = np.diag(eigenvalues)
    b = np.ones(n)
    # Closed-form optimum of the quadratic.
    x_star = np.linalg.solve(A, b)
    f_star = 0.5 * x_star.T @ A @ x_star - b.T @ x_star
    def gradient_descent(x0, eta, n_iter):
        """Plain gradient descent; returns iterate and suboptimality history."""
        x = x0.copy()
        history = []
        for _ in range(n_iter):
            grad = A @ x - b
            x = x - eta * grad
            f = 0.5 * x.T @ A @ x - b.T @ x
            history.append(f - f_star)
        return x, history
    # Compare step-size choices.
    # NOTE(review): η=1/μ exceeds the stable range 2/L here, so that run
    # diverges — it is included deliberately as the "aggressive" case.
    x0 = np.random.randn(n)
    etas = [1/L, 2/(mu + L), 1/mu]
    labels = ['η=1/L (保守)', 'η=2/(μ+L) (最优)', 'η=1/μ (激进)']
    import matplotlib.pyplot as plt
    plt.figure(figsize=(10, 6))
    for eta, label in zip(etas, labels):
        _, history = gradient_descent(x0, eta, 200)
        plt.semilogy(history, label=label)
    # Theoretical O(1/T) upper bound for the convex case.
    T = np.arange(200)
    theory = 0.5 * L * np.linalg.norm(x0 - x_star)**2 / (T + 1)
    plt.semilogy(theory, '--', label='Theory: O(1/T)', alpha=0.5)
    plt.xlabel('Iteration')
    plt.ylabel('f(x) - f* (log scale)')
    plt.title('Gradient Descent Convergence')
    plt.legend()
    plt.grid(True)
    plt.show()
    print(f"条件数 κ = {kappa:.2f}")
analyze_convergence()
随机梯度下降的收敛性¶
凸函数:
步长\(\eta_t = \frac{\eta_0}{\sqrt{t}}\)时: \(\mathbb{E}[f(\bar{x}_T)] - f^* \leq O(1/\sqrt{T})\)
强凸函数:
步长\(\eta_t = \frac{1}{\mu t}\)时: \(\mathbb{E}[f(\bar{x}_T)] - f^* \leq O\!\left(\frac{1}{\mu T}\right)\)
def sgd_convergence_analysis():
    """Compare SGD learning-rate schedules on a linear least-squares problem.

    Runs mini-batch SGD with constant, decaying, and 1/(mu*t) schedules
    and plots ||x - x*||^2 per epoch against a scaled O(1/T) reference.

    NOTE(review): uses `plt` without a local import — relies on
    matplotlib.pyplot being imported at module level.
    """
    np.random.seed(42)
    # Linear regression: min ||Ax - b||^2.
    n, d = 1000, 20
    A = np.random.randn(n, d)
    x_true = np.random.randn(d)
    b = A @ x_true + 0.1 * np.random.randn(n)
    # Least-squares optimum used to measure the error.
    x_star = np.linalg.lstsq(A, b, rcond=None)[0]
    # Strong-convexity / smoothness constants of the average loss.
    mu = 2 * np.min(np.linalg.eigvals(A.T @ A)) / n
    L = 2 * np.max(np.linalg.eigvals(A.T @ A)) / n
    def sgd(x0, lr_schedule, n_epochs, batch_size=1):
        """Mini-batch SGD; records ||x - x*||^2 once per epoch."""
        x = x0.copy()
        history = []
        for epoch in range(n_epochs):
            indices = np.random.permutation(n)
            for i in range(0, n, batch_size):
                batch_idx = indices[i:i+batch_size]
                A_batch = A[batch_idx]
                b_batch = b[batch_idx]
                # Stochastic gradient on the current mini-batch.
                grad = 2 * A_batch.T @ (A_batch @ x - b_batch) / len(batch_idx)
                # Global step counter drives the schedule.
                t = epoch * (n // batch_size) + i // batch_size
                lr = lr_schedule(t)
                x = x - lr * grad
            # Distance to the optimum after this epoch.
            error = np.linalg.norm(x - x_star)**2
            history.append(error)
        return history
    x0 = np.zeros(d)
    n_epochs = 50
    # Learning-rate schedules under comparison (strategy-style dict of callables).
    schedules = {
        'constant': lambda t: 0.01,
        'decay': lambda t: 0.1 / (1 + 0.01 * t),
        'optimal': lambda t: 1 / (mu * (t + 1))
    }
    plt.figure(figsize=(10, 6))
    for name, schedule in schedules.items():
        history = sgd(x0, schedule, n_epochs)
        plt.semilogy(history, label=name)
    # Reference O(1/T) curve, rescaled for display only.
    T = np.arange(1, n_epochs + 1)
    theory = 1 / T
    plt.semilogy(theory / theory[0] * 10, '--', label='Theory: O(1/T)', alpha=0.5)
    plt.xlabel('Epoch')
    plt.ylabel('||x - x*||^2 (log scale)')
    plt.title('SGD Convergence Analysis')
    plt.legend()
    plt.grid(True)
    plt.show()
sgd_convergence_analysis()
3.2 复杂度理论¶
一阶 oracle 复杂度下界¶
定理: 对于\(L\)-光滑、\(\mu\)-强凸函数,任何一阶方法达到\(\epsilon\)-精度的迭代复杂度下界为 \(\Omega\!\left(\sqrt{\kappa}\,\log(1/\epsilon)\right)\),其中\(\kappa = L/\mu\)。
加速梯度方法(如Nesterov加速)达到此下界。
def compare_optimization_methods():
    """Benchmark GD, Nesterov acceleration, and conjugate gradient on an
    ill-conditioned quadratic (condition number 1000) and plot the error.

    NOTE(review): uses `plt` without a local import — relies on
    matplotlib.pyplot being available at module level.
    """
    # Ill-conditioned quadratic: diag(A) eigenvalues span [1, kappa].
    n = 100
    kappa = 1000  # 条件数 (condition number)
    eigenvalues = np.linspace(1, kappa, n)
    A = np.diag(eigenvalues)
    b = np.ones(n)
    x_star = np.linalg.solve(A, b)
    def gradient_descent(x0, eta, n_iter):
        """Plain gradient descent; records ||x - x*|| each iteration."""
        x = x0.copy()
        history = []
        for _ in range(n_iter):
            grad = A @ x - b
            x = x - eta * grad
            error = np.linalg.norm(x - x_star)
            history.append(error)
        return history
    def nesterov_acceleration(x0, eta, n_iter):
        """Nesterov accelerated gradient with the t/(t+3) momentum rule."""
        x = x0.copy()
        y = x0.copy()
        history = []
        for t in range(n_iter):
            x_new = y - eta * (A @ y - b)
            y = x_new + (t / (t + 3)) * (x_new - x)
            x = x_new
            error = np.linalg.norm(x - x_star)
            history.append(error)
        return history
    def conjugate_gradient(x0, n_iter):
        """Conjugate gradient (Fletcher–Reeves beta) for solving Ax = b."""
        x = x0.copy()
        r = b - A @ x
        p = r.copy()
        history = []
        for _ in range(n_iter):
            Ap = A @ p
            alpha = np.dot(r, r) / np.dot(p, Ap)
            x = x + alpha * p
            r_new = r - alpha * Ap
            beta = np.dot(r_new, r_new) / np.dot(r, r)
            p = r_new + beta * p
            r = r_new
            error = np.linalg.norm(x - x_star)
            history.append(error)
        return history
    x0 = np.random.randn(n)
    n_iter = 200
    # Run all three methods from the same starting point.
    gd_history = gradient_descent(x0, 2/(1+kappa), n_iter)
    nesterov_history = nesterov_acceleration(x0, 1/kappa, n_iter)
    cg_history = conjugate_gradient(x0, n_iter)
    # Left: full run; right: zoom on the first 50 iterations.
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.semilogy(gd_history, label='Gradient Descent')
    plt.semilogy(nesterov_history, label='Nesterov Acceleration')
    plt.semilogy(cg_history, label='Conjugate Gradient')
    plt.xlabel('Iteration')
    plt.ylabel('||x - x*||')
    plt.title('Convergence Comparison')
    plt.legend()
    plt.grid(True)
    plt.subplot(1, 2, 2)
    plt.semilogy(gd_history[:50], label='Gradient Descent')
    plt.semilogy(nesterov_history[:50], label='Nesterov Acceleration')
    plt.semilogy(cg_history[:50], label='Conjugate Gradient')
    plt.xlabel('Iteration')
    plt.ylabel('||x - x*||')
    plt.title('First 50 Iterations')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()
    print(f"条件数: {kappa}")
    print(f"GD最终误差: {gd_history[-1]:.2e}")
    print(f"Nesterov最终误差: {nesterov_history[-1]:.2e}")
    print(f"CG最终误差: {cg_history[-1]:.2e}")
compare_optimization_methods()
3.3 随机优化的收敛保证¶
方差缩减方法¶
SVRG (Stochastic Variance Reduced Gradient):
其中\(\tilde{\mu} = \nabla f(\tilde{x})\)是全梯度,每隔\(m\)次迭代计算一次。
收敛率: \(O((n + \kappa) \log(1/\epsilon))\),远优于SGD的\(O(1/\epsilon)\)
class VarianceReductionMethods:
    """SVRG and SAGA for least squares: min ||Ax - b||^2 / n."""

    def __init__(self, A, b):
        self.A = A
        self.b = b
        self.n = A.shape[0]

    def full_gradient(self, x):
        """Gradient of the mean squared residual at x."""
        residual = self.A @ x - self.b
        return 2 * self.A.T @ residual / self.n

    def stochastic_gradient(self, x, i):
        """Gradient contribution of the single sample i."""
        row = self.A[i]
        return 2 * row * (row @ x - self.b[i])

    def svrg(self, x0, n_epochs, inner_iter, lr):
        """SVRG: anchor each epoch's inner loop at a snapshot's full gradient."""
        x = x0.copy()
        history = []
        for _ in range(n_epochs):
            # Full gradient at the snapshot point.
            snapshot = x.copy()
            anchor_grad = self.full_gradient(snapshot)
            for _ in range(inner_iter):
                i = np.random.randint(self.n)
                # Control variate: g_i(x) - g_i(snapshot) + full gradient.
                step = (self.stochastic_gradient(x, i)
                        - self.stochastic_gradient(snapshot, i)
                        + anchor_grad)
                x = x - lr * step
            history.append(np.linalg.norm(self.full_gradient(x)))
        return x, history

    def saga(self, x0, n_epochs, lr):
        """SAGA: maintain a table of per-sample gradients and their average."""
        x = x0.copy()
        history = []
        # Initialize the gradient table at the starting point.
        table = np.zeros((self.n, len(x0)))
        for i in range(self.n):
            table[i] = self.stochastic_gradient(x, i)
        avg = table.mean(axis=0)
        for _ in range(n_epochs):
            for _ in range(self.n):
                i = np.random.randint(self.n)
                fresh = self.stochastic_gradient(x, i)
                # SAGA update uses the stale table entry plus the average.
                x = x - lr * (fresh - table[i] + avg)
                # Maintain the running average, then overwrite the slot.
                avg += (fresh - table[i]) / self.n
                table[i] = fresh
            history.append(np.linalg.norm(self.full_gradient(x)))
        return x, history
def test_variance_reduction():
    """Compare plain SGD, SVRG and SAGA on random least squares and plot
    the full-gradient norm per epoch.

    NOTE(review): uses `plt` without a local import — relies on
    matplotlib.pyplot being available at module level.
    """
    np.random.seed(42)
    # Synthetic least-squares data.
    n, d = 1000, 50
    A = np.random.randn(n, d)
    x_true = np.random.randn(d)
    b = A @ x_true + 0.1 * np.random.randn(n)
    vr = VarianceReductionMethods(A, b)
    x0 = np.zeros(d)
    # SVRG with one full pass per epoch.
    x_svrg, history_svrg = vr.svrg(x0, n_epochs=20, inner_iter=n, lr=0.01)
    # SAGA.
    x_saga, history_saga = vr.saga(x0, n_epochs=20, lr=0.01)
    # Baseline: plain SGD with a smaller constant step.
    def sgd(x0, n_epochs, lr):
        x = x0.copy()
        history = []
        for epoch in range(n_epochs):
            for _ in range(n):
                i = np.random.randint(n)
                grad = vr.stochastic_gradient(x, i)
                x = x - lr * grad
            error = np.linalg.norm(vr.full_gradient(x))
            history.append(error)
        return history
    history_sgd = sgd(x0, 20, 0.001)
    # Plot gradient-norm trajectories.
    plt.figure(figsize=(10, 6))
    plt.semilogy(history_sgd, label='SGD', alpha=0.7)
    plt.semilogy(history_svrg, label='SVRG', linewidth=2)
    plt.semilogy(history_saga, label='SAGA', linewidth=2)
    plt.xlabel('Epoch')
    plt.ylabel('Gradient Norm (log scale)')
    plt.title('Variance Reduction Methods')
    plt.legend()
    plt.grid(True)
    plt.show()
    print(f"SGD最终梯度范数: {history_sgd[-1]:.4f}")
    print(f"SVRG最终梯度范数: {history_svrg[-1]:.4f}")
    print(f"SAGA最终梯度范数: {history_saga[-1]:.4f}")
test_variance_reduction()
四、泛函分析基础¶
4.1 希尔伯特空间¶
定义与性质¶
希尔伯特空间:完备的内积空间
内积性质: 1. 对称性: \(\langle x, y \rangle = \langle y, x \rangle\) 2. 线性: \(\langle ax + by, z \rangle = a\langle x, z \rangle + b\langle y, z \rangle\) 3. 正定性: \(\langle x, x \rangle \geq 0\),且\(=0\)当且仅当\(x=0\)
范数: \(\|x\| = \sqrt{\langle x, x \rangle}\)
重要定理: - Cauchy-Schwarz不等式: \(|\langle x, y \rangle| \leq \|x\| \|y\|\) - 正交投影定理 - Riesz表示定理
class HilbertSpaceExamples:
    """Worked examples of Hilbert spaces: R^n, L^2[a, b], and the Fourier basis."""
    @staticmethod
    def euclidean_space():
        """
        R^n as a Hilbert space with inner product <x, y> = x^T y.
        Prints the inner product, norms, a Cauchy-Schwarz check, and the
        orthogonal projection of y onto x.
        """
        x = np.array([1, 2, 3])
        y = np.array([4, 5, 6])
        inner_product = np.dot(x, y)
        norm_x = np.linalg.norm(x)
        norm_y = np.linalg.norm(y)
        print("欧氏空间 R^3:")
        print(f" <x, y> = {inner_product}")
        print(f" ||x|| = {norm_x:.4f}")
        print(f" ||y|| = {norm_y:.4f}")
        # Cauchy-Schwarz: |<x, y>| <= ||x|| ||y||.
        cs_bound = norm_x * norm_y
        print(f" |<x, y>| = {abs(inner_product):.4f} <= {cs_bound:.4f}")
        # Orthogonal projection of y onto span{x}.
        proj_y_on_x = (inner_product / norm_x**2) * x
        print(f" y在x上的投影: {proj_y_on_x}")
    @staticmethod
    def l2_space():
        """
        L^2[a, b]: square-integrable functions with inner product
        <f, g> = ∫_a^b f(x)g(x) dx. Shows sin ⊥ cos on [0, 2π].
        """
        from scipy import integrate
        # Test functions.
        f = lambda x: np.sin(x)
        g = lambda x: np.cos(x)
        # Inner product by numerical quadrature.
        inner_product, _ = integrate.quad(
            lambda x: f(x) * g(x), 0, 2*np.pi
        )
        # Norms ||f|| = sqrt(<f, f>).
        norm_f, _ = integrate.quad(lambda x: f(x)**2, 0, 2*np.pi)
        norm_f = np.sqrt(norm_f)
        norm_g, _ = integrate.quad(lambda x: g(x)**2, 0, 2*np.pi)
        norm_g = np.sqrt(norm_g)
        print("\nL^2[0, 2π] 空间:")
        print(f" <sin, cos> = {inner_product:.6f} (≈ 0, 正交)")
        print(f" ||sin|| = {norm_f:.4f}")
        print(f" ||cos|| = {norm_g:.4f}")
    @staticmethod
    def fourier_series():
        """Check orthogonality of the Fourier sine basis on [0, 2π]."""
        # In L^2[0, 2π], {1, cos(nx), sin(nx)} form an orthogonal basis;
        # here we verify <sin(nx), sin(mx)> = π δ_{nm} numerically.
        from scipy import integrate
        print("\n傅里叶级数正交性:")
        # Pairwise inner products for n, m in {1, 2, 3}.
        for n in [1, 2, 3]:
            for m in [1, 2, 3]:
                inner, _ = integrate.quad(
                    lambda x: np.sin(n*x) * np.sin(m*x),
                    0, 2*np.pi
                )
                print(f" <sin({n}x), sin({m}x)> = {inner:.6f}", end="")
                if n == m:
                    print(f" (应该 = π)")
                else:
                    print(f" (应该 = 0)")
# Run the three worked examples in sequence.
HilbertSpaceExamples.euclidean_space()
HilbertSpaceExamples.l2_space()
HilbertSpaceExamples.fourier_series()
4.2 再生核希尔伯特空间(RKHS)¶
核函数与RKHS¶
定义: 希尔伯特空间\(\mathcal{H}\)是RKHS,如果存在再生核\(K(x, \cdot) \in \mathcal{H}\)使得再生性质成立: \(f(x) = \langle f, K(x, \cdot) \rangle_{\mathcal{H}}, \quad \forall f \in \mathcal{H}\)
核技巧: 在高维特征空间中计算内积,而无需显式计算特征映射
Mercer定理: 对称半正定核对应一个RKHS
class KernelMethods:
    """Common kernel functions plus kernel ridge regression utilities."""

    @staticmethod
    def linear_kernel(x, y):
        """k(x, y) = <x, y>."""
        return np.dot(x, y)

    @staticmethod
    def polynomial_kernel(x, y, degree=3, coef0=1):
        """k(x, y) = (<x, y> + coef0)^degree."""
        base = np.dot(x, y) + coef0
        return base ** degree

    @staticmethod
    def rbf_kernel(x, y, gamma=1.0):
        """Gaussian/RBF kernel k(x, y) = exp(-gamma * ||x - y||^2)."""
        sq_dist = np.linalg.norm(x - y) ** 2
        return np.exp(-gamma * sq_dist)

    @staticmethod
    def compute_kernel_matrix(X, kernel_func):
        """Build the symmetric Gram matrix K[i, j] = k(X[i], X[j])."""
        n = X.shape[0]
        K = np.zeros((n, n))
        for row in range(n):
            for col in range(row, n):  # fill the upper triangle, mirror below
                value = kernel_func(X[row], X[col])
                K[row, col] = value
                K[col, row] = value
        return K

    @staticmethod
    def kernel_ridge_regression(X, y, kernel_func, lambda_reg=0.1):
        """
        Kernel ridge regression: solve (K + lambda I) alpha = y for the
        dual coefficients alpha (closed-form minimizer of
        ||y - K alpha||^2 + lambda alpha^T K alpha).
        """
        K = KernelMethods.compute_kernel_matrix(X, kernel_func)
        ridge = K + lambda_reg * np.eye(K.shape[0])
        return np.linalg.solve(ridge, y)

    @staticmethod
    def predict(X_train, X_test, alpha, kernel_func):
        """Predict f(x) = sum_i alpha_i * k(x_i, x) for each test point."""
        n_test = X_test.shape[0]
        n_train = X_train.shape[0]
        K_cross = np.zeros((n_test, n_train))
        for r in range(n_test):
            for c in range(n_train):
                K_cross[r, c] = kernel_func(X_test[r], X_train[c])
        return K_cross @ alpha
def demonstrate_kernel_trick():
    """Fit kernel ridge regression with linear vs. RBF kernels on noisy
    sin(x) data and compare their test MSE (the kernel trick in action).

    NOTE(review): uses `plt` without a local import — relies on
    matplotlib.pyplot being available at module level.
    """
    np.random.seed(42)
    # Noisy nonlinear 1-D data: y = sin(x) + noise.
    n = 100
    X = np.linspace(-3, 3, n).reshape(-1, 1)
    y = np.sin(X).ravel() + 0.1 * np.random.randn(n)
    # Random 80/20 train/test split.
    train_idx = np.random.choice(n, 80, replace=False)
    test_idx = np.array([i for i in range(n) if i not in train_idx])
    X_train, y_train = X[train_idx], y[train_idx]
    X_test, y_test = X[test_idx], y[test_idx]
    # Linear kernel (equivalent to linear ridge regression).
    alpha_linear = KernelMethods.kernel_ridge_regression(
        X_train, y_train,
        KernelMethods.linear_kernel,
        lambda_reg=0.1
    )
    y_pred_linear = KernelMethods.predict(
        X_train, X_test, alpha_linear, KernelMethods.linear_kernel
    )
    # RBF kernel captures the nonlinearity.
    alpha_rbf = KernelMethods.kernel_ridge_regression(
        X_train, y_train,
        lambda x, y: KernelMethods.rbf_kernel(x, y, gamma=0.5),
        lambda_reg=0.01
    )
    y_pred_rbf = KernelMethods.predict(
        X_train, X_test, alpha_rbf,
        lambda x, y: KernelMethods.rbf_kernel(x, y, gamma=0.5)
    )
    # Held-out mean squared errors.
    mse_linear = np.mean((y_test - y_pred_linear)**2)
    mse_rbf = np.mean((y_test - y_pred_rbf)**2)
    print("核岭回归结果:")
    print(f" 线性核 MSE: {mse_linear:.4f}")
    print(f" RBF核 MSE: {mse_rbf:.4f}")
    # Dense grid for plotting the fitted functions.
    X_plot = np.linspace(-3, 3, 200).reshape(-1, 1)
    y_plot_linear = KernelMethods.predict(
        X_train, X_plot, alpha_linear, KernelMethods.linear_kernel
    )
    y_plot_rbf = KernelMethods.predict(
        X_train, X_plot, alpha_rbf,
        lambda x, y: KernelMethods.rbf_kernel(x, y, gamma=0.5)
    )
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.scatter(X_train, y_train, alpha=0.5, label='Train')
    plt.scatter(X_test, y_test, alpha=0.5, label='Test')
    plt.plot(X_plot, y_plot_linear, 'r-', label='Linear Kernel')
    plt.legend()
    plt.title(f'Linear Kernel (MSE={mse_linear:.4f})')
    plt.subplot(1, 2, 2)
    plt.scatter(X_train, y_train, alpha=0.5, label='Train')
    plt.scatter(X_test, y_test, alpha=0.5, label='Test')
    plt.plot(X_plot, y_plot_rbf, 'r-', label='RBF Kernel')
    plt.legend()
    plt.title(f'RBF Kernel (MSE={mse_rbf:.4f})')
    plt.tight_layout()
    plt.show()
demonstrate_kernel_trick()
4.3 核方法理论¶
表示定理¶
定理: 对于RKHS中的正则化问题: \(\min_{f \in \mathcal{H}} \sum_{i=1}^{n} L(y_i, f(x_i)) + \lambda \|f\|_{\mathcal{H}}^2\)
解具有形式: \(f^*(x) = \sum_{i=1}^{n} \alpha_i K(x_i, x)\)
这意味着最优解位于数据点张成的子空间中。
def demonstrate_representer_theorem():
    """
    Fit RBF kernel ridge regression and verify the representer theorem:
    the fitted function is a finite kernel expansion over the training
    points, f(x) = Σ_i α_i K(x_i, x).

    NOTE(review): uses `plt` without a local import — relies on
    matplotlib.pyplot being available at module level.
    """
    np.random.seed(42)
    # 2-D regression data with additive noise.
    n = 50
    X = np.random.randn(n, 2)
    y = np.sin(X[:, 0]) + np.cos(X[:, 1]) + 0.1 * np.random.randn(n)
    # RBF kernel ridge regression: alpha = (K + λI)^{-1} y.
    gamma = 0.5
    lambda_reg = 0.01
    K = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            K[i, j] = np.exp(-gamma * np.linalg.norm(X[i] - X[j])**2)
    alpha = np.linalg.solve(K + lambda_reg * np.eye(n), y)
    # Representer theorem: evaluate f as a kernel expansion.
    def f(x):
        return sum(alpha[i] * np.exp(-gamma * np.linalg.norm(X[i] - x)**2)
                   for i in range(n))
    # Predictions at the training points should be close to the targets.
    predictions = [f(X[i]) for i in range(n)]
    print("表示定理验证:")
    print(f" 训练误差: {np.mean((y - predictions)**2):.6f}")
    print(f" 非零系数比例: {np.mean(np.abs(alpha) > 1e-6):.2%}")
    # Visualize the expansion coefficients and the fit quality.
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.stem(alpha)
    plt.xlabel('Data Point Index')
    plt.ylabel('Coefficient α')
    plt.title('Coefficients in Kernel Expansion')
    plt.subplot(1, 2, 2)
    plt.scatter(y, predictions, alpha=0.6)
    plt.plot([y.min(), y.max()], [y.min(), y.max()], 'r--')
    plt.xlabel('True Value')
    plt.ylabel('Predicted Value')
    plt.title('Prediction Quality')
    plt.tight_layout()
    plt.show()
demonstrate_representer_theorem()
五、信息论进阶¶
5.1 变分表示¶
Donsker-Varadhan变分表示¶
KL散度的变分表示(Donsker-Varadhan): \(D_{KL}(P \,\|\, Q) = \sup_{T} \; \mathbb{E}_P[T] - \log \mathbb{E}_Q[e^T]\)
应用: 互信息估计、表示学习
class VariationalRepresentation:
    """Variational (Donsker-Varadhan style) estimators from information theory."""

    @staticmethod
    def dv_representation_estimation(p_samples, q_samples, n_iterations=1000):
        """Estimate D_KL(P||Q) via the Donsker-Varadhan lower bound.

        D_KL(P||Q) >= E_P[T] - log E_Q[e^T] for any test function T.
        Here T is a random linear combination of summary statistics and
        we keep the best bound found over `n_iterations` random draws —
        a toy stand-in for optimizing T with a neural network (MINE).

        Cleanup: the original computed `p_stats`, `q_stats` and `T_q`
        that were never used; those dead locals have been removed
        (behavior is otherwise unchanged).
        """
        def compute_stats(samples):
            # Fixed feature vector summarizing a sample batch.
            return np.array([
                samples.mean(),
                samples.std(),
                np.percentile(samples, 25),
                np.percentile(samples, 75),
                np.median(samples)
            ])

        def T(x, weights):
            # Linear test function over the summary statistics.
            return np.dot(weights, compute_stats(x))

        best_estimate = -np.inf
        for _ in range(n_iterations):
            weights = np.random.randn(5)
            T_p = T(p_samples, weights)
            # E_Q[e^T] estimated over mini-batches of 10 samples.
            T_q_values = [T(q_samples[i:i+10], weights)
                          for i in range(0, len(q_samples), 10)]
            # Donsker-Varadhan lower bound for this T.
            estimate = T_p - np.log(np.mean(np.exp(T_q_values)))
            if estimate > best_estimate:
                best_estimate = estimate
        return best_estimate

    @staticmethod
    def mutual_information_dv(x_samples, y_samples, n_bins=10):
        """Estimate I(X;Y) = D_KL(P(X,Y) || P(X)P(Y)).

        Despite the name, this discretizes the samples into bins and
        computes the plug-in mutual information of the empirical joint
        histogram (no variational optimization is performed here).
        """
        def discretize(samples, n_bins):
            bins = np.linspace(samples.min(), samples.max(), n_bins+1)
            return np.digitize(samples, bins)

        x_discrete = discretize(x_samples, n_bins)
        y_discrete = discretize(y_samples, n_bins)
        # Joint histogram; +2 slots because digitize returns 0..n_bins+1.
        joint = np.zeros((n_bins+2, n_bins+2))
        for x, y in zip(x_discrete, y_discrete):
            joint[x, y] += 1
        joint /= len(x_samples)
        marginal_x = joint.sum(axis=1)
        marginal_y = joint.sum(axis=0)
        # Plug-in MI over the non-empty cells.
        mi = 0.0
        for i in range(n_bins+2):
            for j in range(n_bins+2):
                if joint[i, j] > 0 and marginal_x[i] > 0 and marginal_y[j] > 0:
                    mi += joint[i, j] * np.log(
                        joint[i, j] / (marginal_x[i] * marginal_y[j])
                    )
        return mi
# Example: estimate the MI of correlated Gaussians and compare to theory.
np.random.seed(42)
rho = 0.8
cov = [[1, rho], [rho, 1]]
samples = np.random.multivariate_normal([0, 0], cov, 1000)
x = samples[:, 0]
y = samples[:, 1]
mi_estimate = VariationalRepresentation.mutual_information_dv(x, y)
print(f"估计的互信息: {mi_estimate:.4f}")
print(f"理论值: {-0.5 * np.log(1 - rho**2):.4f}")
5.2 信息瓶颈理论¶
信息瓶颈原理¶
目标: 找到表示\(T\)使得 \(\min_{p(t|x)} \; I(X; T) - \beta \, I(T; Y)\),其中:
- \(I(X; T)\): 压缩(信息损失)
- \(I(T; Y)\): 预测能力
- \(\beta\): 权衡参数
最优解满足自洽方程: \(p(t|x) \propto p(t) \exp\!\left(-\beta \, D_{KL}\!\left[p(y|x) \,\|\, p(y|t)\right]\right)\)
class InformationBottleneck:
    """Simplified information-bottleneck analysis that uses k-means
    cluster labels as the compressed representation T of X."""
    def __init__(self, n_clusters, beta=1.0, max_iter=100):
        self.n_clusters = n_clusters  # size of the representation alphabet
        self.beta = beta              # compression/prediction trade-off weight
        self.max_iter = max_iter      # NOTE(review): unused in this simplified version
    def fit(self, X, y):
        """
        Cluster X with k-means, then report I(X;T), I(T;Y) and the IB
        objective I(X;T) - beta * I(T;Y) for the resulting assignment.
        Requires scikit-learn; y must be integer labels.
        """
        from sklearn.cluster import KMeans
        n_samples = X.shape[0]
        # Representation T = cluster label of each sample.
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        cluster_labels = kmeans.fit_predict(X)
        # Plug-in discrete mutual information from a joint histogram.
        def compute_mi(labels1, labels2):
            """Empirical mutual information of two integer label arrays."""
            joint = np.zeros((len(np.unique(labels1)), len(np.unique(labels2))))
            for i, j in zip(labels1, labels2):
                joint[i, j] += 1
            joint /= len(labels1)
            marginal1 = joint.sum(axis=1)
            marginal2 = joint.sum(axis=0)
            mi = 0.0
            for i in range(joint.shape[0]):
                for j in range(joint.shape[1]):
                    if joint[i, j] > 0:
                        mi += joint[i, j] * np.log(
                            joint[i, j] / (marginal1[i] * marginal2[j])
                        )
            return mi
        # Simplification: the discretized X is the cluster labels
        # themselves, so I(X;T) here equals the entropy H(T).
        X_discrete = cluster_labels
        I_X_T = compute_mi(X_discrete, cluster_labels)  # I(X;T)
        I_T_Y = compute_mi(cluster_labels, y)  # I(T;Y)
        # Information-bottleneck objective (to be minimized).
        objective = I_X_T - self.beta * I_T_Y
        print(f"信息瓶颈分析:")
        print(f" I(X; T) = {I_X_T:.4f}")
        print(f" I(T; Y) = {I_T_Y:.4f}")
        print(f" 目标函数 = {objective:.4f}")
        self.cluster_centers_ = kmeans.cluster_centers_
        self.labels_ = cluster_labels
        return self
    def transform(self, X):
        """Map new points to the index of their nearest cluster center."""
        from sklearn.metrics import pairwise_distances
        distances = pairwise_distances(X, self.cluster_centers_)
        return np.argmin(distances, axis=1)
# 示例: 在MNIST上使用信息瓶颈
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
digits = load_digits()
X, y = digits.data, digits.target
# Sweep the compression/prediction trade-off parameter beta.
betas = [0.1, 1.0, 10.0]
plt.figure(figsize=(15, 4))
for i, beta in enumerate(betas):
    ib = InformationBottleneck(n_clusters=10, beta=beta)
    ib.fit(X, y)
    plt.subplot(1, 3, i+1)
    # Show a cluster center as an 8x8 digit image.
    # NOTE(review): the loop breaks after j == 0, so only the first
    # center is drawn per beta, and the subplot call is repeated.
    for j in range(10):
        plt.subplot(1, 3, i+1)
        center = ib.cluster_centers_[j].reshape(8, 8)
        plt.imshow(center, cmap='gray')
        plt.title(f'β={beta}')
        plt.axis('off')
        break  # only show the first center
plt.tight_layout()
plt.show()
5.3 率失真理论¶
率失真函数¶
问题: 在允许一定失真的情况下,最小化编码率
高斯源的率失真函数: \(R(D) = \max\!\left(0, \; \tfrac{1}{2} \log_2 \frac{\sigma^2}{D}\right)\)
def rate_distortion_gaussian(sigma2, D):
    """Rate-distortion function of a Gaussian source, in bits per symbol.

    R(D) = 1/2 * log2(sigma2 / D) for 0 < D < sigma2; zero once the
    allowed distortion reaches the source variance; infinite for D <= 0.
    """
    if D >= sigma2:
        # Distortion budget covers the whole variance: nothing to send.
        return 0
    if D <= 0:
        # Lossless reproduction of a continuous source needs infinite rate.
        return np.inf
    return 0.5 * np.log2(sigma2 / D)
def plot_rate_distortion():
    """Plot R(D) for a unit-variance Gaussian source and print key values.

    NOTE(review): uses `plt` without a local import — relies on
    matplotlib.pyplot being available at module level.
    """
    sigma2 = 1.0
    D_range = np.linspace(0.01, 2*sigma2, 100)
    R = [rate_distortion_gaussian(sigma2, D) for D in D_range]
    plt.figure(figsize=(10, 6))
    plt.plot(D_range, R, 'b-', linewidth=2)
    plt.axhline(y=0, color='r', linestyle='--', alpha=0.5)
    plt.axvline(x=sigma2, color='g', linestyle='--', alpha=0.5, label='D=σ²')
    plt.xlabel('Distortion D')
    plt.ylabel('Rate R (bits)')
    plt.title('Rate-Distortion Function for Gaussian Source')
    plt.grid(True)
    plt.legend()
    plt.show()
    print("关键观察:")
    print(f" 当 D = σ²/4 时, R = {rate_distortion_gaussian(sigma2, sigma2/4):.4f} bits")
    print(f" 当 D = σ²/10 时, R = {rate_distortion_gaussian(sigma2, sigma2/10):.4f} bits")
    print(f" 当 D ≥ σ² 时, R = 0 (不需要传输)")
plot_rate_distortion()
总结¶
本文档深入探讨了机器学习的数学基础,包括:
- 矩阵微积分进阶: 反向传播的矩阵形式、高阶导数、流形优化
- 概率图模型: 贝叶斯网络、马尔可夫随机场、因子图与和积算法
- 优化理论深化: 收敛性分析、复杂度理论、方差缩减方法
- 泛函分析基础: 希尔伯特空间、RKHS、核方法理论
- 信息论进阶: 变分表示、信息瓶颈、率失真理论
这些内容为理解现代机器学习算法提供了坚实的理论基础。
下一步: 继续学习 02-核心理论.md,深入理解泛化理论、表示学习和优化景观。