10 - 神经符号AI与持续学习¶
学习时间: 3-4小时 重要性: ⭐⭐⭐⭐ 走向更可靠、可解释的AI 前置知识: 深度学习基础、知识图谱概念
🎯 学习目标¶
完成本章后,你将能够: - 理解神经符号AI(Neuro-Symbolic AI)的动机与核心框架 - 掌握神经网络与符号推理结合的主要范式 - 了解持续学习(Continual Learning)的灾难性遗忘问题与解决方案 - 了解归一化流(Normalizing Flows)的基本原理
Part I: 神经符号AI¶
1. 为什么需要神经符号AI?¶
1.1 两种AI范式的对比¶
Text Only
符号AI (传统) vs 神经网络 (现代)
├── 逻辑规则、知识库 ├── 数据驱动、端到端学习
├── 可解释、可验证 ├── 黑箱、难解释
├── 需要手工编写规则 ├── 自动从数据中学习
├── 处理结构化推理很强 ├── 处理感知(图像/语音)很强
├── 难以处理不确定性 ├── 天然处理噪声和不确定性
├── 泛化能力有限 ├── 可泛化到训练分布
└── 数据效率高(但需专家) └── 数据效率低(需大量数据)
神经符号AI: 结合两者优势
├── 用神经网络做感知(看/听)
├── 用符号系统做推理(想/判断)
└── 目标: 可解释 + 可学习 + 可推理
1.2 现实问题驱动¶
| 纯神经网络的不足 | 神经符号AI如何解决 |
|---|---|
| 大模型"幻觉"问题 | 符号验证层检查逻辑一致性 |
| 数学推理不可靠 | 符号计算器处理精确运算 |
| 缺乏因果推理 | 因果图 + 神经网络 |
| 不满足法规要求(可解释性) | 符号规则提供可审计的推理路径 |
| 小样本 / 零样本困难 | 先验知识以符号形式注入 |
2. 神经符号AI的主要范式¶
2.1 Kautz的六种分类(2022)¶
Text Only
Type 1: 符号为主,神经为辅
├── 例: 专家系统 + 神经网络做预处理
└── 代表: IBM Watson
Type 2: 神经为主,符号为辅(当前最主流)
├── 例: LLM + 外部工具调用(计算器、知识图谱)
└── 代表: GPT + Code Interpreter, Toolformer
Type 3: 神经符号编织
├── 例: 神经网络层之间嵌入符号推理
└── 代表: Neural Theorem Prover
Type 4: 符号知识编译到神经网络
├── 例: 将知识图谱嵌入到网络参数中
└── 代表: KGAT, Knowledge-enhanced BERT
Type 5: 神经网络生成符号规则
├── 例: 从数据中自动发现因果关系/规则
└── 代表: Neural Logic Programming
Type 6: 完全融合
├── 例: 可微分的逻辑推理
└── 代表: Logic Tensor Networks, DeepProbLog
2.2 Type 2: LLM + 工具调用(最广泛应用)¶
Python
class NeuroSymbolicLLM:
"""
Type 2 神经符号AI: LLM + 符号工具
LLM负责自然语言理解和规划
符号工具负责精确计算和知识查询
这是当前最实用的神经符号AI范式
"""
def __init__(self, llm, tools: dict): # __init__构造方法,创建对象时自动调用
"""
参数:
llm: 语言模型(神经网络部分)
tools: 工具集合(符号推理部分)
keys: 工具名称
values: 可调用的工具函数
"""
self.llm = llm
self.tools = tools
def reason(self, question: str) -> str:
"""
结合神经和符号推理回答问题
流程:
1. LLM理解问题,决定是否需要工具
2. 如需要,调用对应的符号工具
3. 将工具结果整合到最终回答中
"""
# Step 1: LLM分析问题,生成推理计划
plan = self.llm.generate(f"""
问题: {question}
可用工具: {list(self.tools.keys())}
请分析是否需要工具,如需要请给出调用计划。
""")
# Step 2: 执行工具调用(符号推理)
tool_results = {}
for tool_name, tool_fn in self.tools.items():
if tool_name in plan:
# 提取参数并调用工具
args = self._extract_args(plan, tool_name)
tool_results[tool_name] = tool_fn(**args) # *args接收任意位置参数,**kwargs接收任意关键字参数
# Step 3: LLM整合结果
answer = self.llm.generate(f"""
问题: {question}
工具结果: {tool_results}
请给出最终回答。
""")
return answer
def _extract_args(self, plan, tool_name):
"""从推理计划中提取工具参数(简化实现)"""
return {}
# 使用示例
tools = {
'calculator': lambda expr: eval(expr), # 精确计算
'knowledge_graph': lambda query: "查询结果...", # 知识查询
'theorem_prover': lambda prop: True, # 定理证明
}
2.3 Type 6: 可微分逻辑推理¶
Python
import torch
import torch.nn as nn
class DifferentiableLogic(nn.Module): # 继承nn.Module定义神经网络层
"""
可微分的逻辑推理 (概率软逻辑)
将离散的逻辑运算替换为连续可微的近似:
- AND(a, b) → min(a, b) 或 a * b
- OR(a, b) → max(a, b) 或 a + b - a*b
- NOT(a) → 1 - a
这样逻辑规则可以参与梯度反向传播
"""
def __init__(self, t_norm: str = 'product'):
"""
参数:
t_norm: T-范数类型
- 'product': 乘积 (AND=a*b, OR=a+b-a*b)
- 'godel': Gödel (AND=min, OR=max)
- 'lukasiewicz': Łukasiewicz (AND=max(0,a+b-1))
"""
super().__init__() # super()调用父类方法
self.t_norm = t_norm
def fuzzy_and(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
"""可微分的AND操作"""
if self.t_norm == 'product':
return a * b
elif self.t_norm == 'godel':
return torch.min(a, b)
elif self.t_norm == 'lukasiewicz':
return torch.clamp(a + b - 1, min=0)
def fuzzy_or(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
"""可微分的OR操作"""
if self.t_norm == 'product':
return a + b - a * b
elif self.t_norm == 'godel':
return torch.max(a, b)
elif self.t_norm == 'lukasiewicz':
return torch.clamp(a + b, max=1)
def fuzzy_not(self, a: torch.Tensor) -> torch.Tensor:
"""可微分的NOT操作"""
return 1 - a
def fuzzy_implies(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
"""可微分的蕴含 (A → B ≡ ¬A ∨ B)"""
return self.fuzzy_or(self.fuzzy_not(a), b)
class NeuralLogicNetwork(nn.Module):
"""
神经逻辑网络: 将逻辑规则嵌入神经网络
示例任务: 关系推理
- 输入: 实体对(x, y)的特征
- 规则: parent(x,y) ∧ parent(y,z) → grandparent(x,z)
- 输出: 各种关系的概率
"""
def __init__(self, entity_dim: int, n_relations: int):
"""
参数:
entity_dim: 实体嵌入维度
n_relations: 关系类型数量
"""
super().__init__()
# 实体嵌入层
self.entity_encoder = nn.Linear(entity_dim, 128)
# 关系预测器(神经部分)
self.relation_predictor = nn.Sequential(
nn.Linear(256, 128), # 两个实体拼接
nn.ReLU(),
nn.Linear(128, n_relations),
nn.Sigmoid() # 输出概率 [0, 1]
)
# 可微分逻辑引擎(符号部分)
self.logic = DifferentiableLogic(t_norm='product')
def predict_relation(self, entity_a: torch.Tensor,
entity_b: torch.Tensor) -> torch.Tensor:
"""预测两个实体间的关系概率"""
a_emb = self.entity_encoder(entity_a)
b_emb = self.entity_encoder(entity_b)
pair = torch.cat([a_emb, b_emb], dim=-1)
return self.relation_predictor(pair)
def apply_transitivity_rule(
self,
p_parent_xy: torch.Tensor,
p_parent_yz: torch.Tensor
) -> torch.Tensor:
"""
应用传递性规则: parent(x,y) ∧ parent(y,z) → grandparent(x,z)
使用可微分逻辑保持梯度流
"""
return self.logic.fuzzy_and(p_parent_xy, p_parent_yz)
def forward(self, entities_a, entities_b, entities_c=None):
"""
前向传播: 神经预测 + 逻辑规则推导
"""
# 神经部分: 直接预测关系
direct_preds = self.predict_relation(entities_a, entities_b)
if entities_c is not None:
# 逻辑推理部分: 利用传递性增强预测
p_ab = self.predict_relation(entities_a, entities_b)
p_bc = self.predict_relation(entities_b, entities_c)
# 应用规则: parent(a,b) ∧ parent(b,c) → grandparent(a,c)
p_grandparent = self.apply_transitivity_rule(
p_ab[:, 0], # parent关系的概率
p_bc[:, 0]
)
return direct_preds, p_grandparent
return direct_preds
Part II: 持续学习(Continual Learning)¶
3. 灾难性遗忘问题¶
3.1 定义¶
灾难性遗忘(Catastrophic Forgetting):神经网络在学习新任务时,会急剧遗忘之前学过的任务。
3.2 为什么这很重要?¶
- 现实部署:模型需要持续学习新知识而不忘旧知识
- LLM:模型更新时如何不丢失已有能力
- 机器人:在新环境中学习时保留旧技能
- 边缘设备:增量更新而非重新训练
4. 主要解决方案¶
4.1 方法分类¶
Text Only
持续学习方法:
├── 1. 正则化方法(Regularization-based)
│ ├── EWC (Elastic Weight Consolidation)
│ ├── SI (Synaptic Intelligence)
│ └── 核心: 保护重要权重不被改变
├── 2. 回放方法(Replay-based)
│ ├── 经验回放 (Experience Replay)
│ ├── 生成式回放 (Generative Replay)
│ └── 核心: 混合新旧数据训练
├── 3. 架构方法(Architecture-based)
│ ├── Progressive Neural Networks
│ ├── PackNet
│ └── 核心: 为新任务分配新参数
└── 4. 提示方法(Prompt-based, 适用于LLM)
├── L2P (Learning to Prompt)
├── DualPrompt
└── 核心: 用不同prompt激活不同任务能力
4.2 EWC(弹性权重巩固)¶
Python
import torch
import torch.nn as nn
from typing import Dict
class ElasticWeightConsolidation:
"""
EWC (Elastic Weight Consolidation)
核心思想:
- 计算每个权重对旧任务的"重要性"(Fisher信息矩阵)
- 训练新任务时,惩罚重要权重的变化
- 类比: 像弹簧一样把重要权重拉回到旧值附近
损失函数: L = L_new + (λ/2) Σ F_i (θ_i - θ*_i)²
参考: Kirkpatrick et al., "Overcoming Catastrophic Forgetting" (2017)
"""
def __init__(self, model: nn.Module, ewc_lambda: float = 400.0):
"""
参数:
model: 神经网络模型
ewc_lambda: EWC正则化强度
"""
self.model = model
self.ewc_lambda = ewc_lambda
# 存储旧任务的参数和Fisher矩阵
self.saved_params: Dict[str, torch.Tensor] = {}
self.fisher: Dict[str, torch.Tensor] = {}
def compute_fisher(self, data_loader, n_samples: int = 200):
"""
计算Fisher信息矩阵(对角近似)
F_i = E[( ∂log p(y|x,θ) / ∂θ_i )²]
直觉: 如果某个权重的梯度经常很大,
说明这个权重对当前任务很重要
"""
fisher = {name: torch.zeros_like(param)
for name, param in self.model.named_parameters()}
self.model.eval() # eval()开启评估模式(关闭Dropout等)
count = 0
for inputs, targets in data_loader:
if count >= n_samples:
break
self.model.zero_grad() # 清零梯度,防止梯度累积
outputs = self.model(inputs)
loss = nn.functional.cross_entropy(outputs, targets)
loss.backward() # 反向传播计算梯度
# 累加梯度的平方(Fisher的对角近似)
for name, param in self.model.named_parameters():
if param.grad is not None:
fisher[name] += param.grad.data ** 2
count += inputs.size(0)
# 取平均
for name in fisher:
fisher[name] /= count
self.fisher = fisher
# 保存当前参数(旧任务的最优参数 θ*)
self.saved_params = {
name: param.data.clone()
for name, param in self.model.named_parameters()
}
def ewc_loss(self) -> torch.Tensor:
"""
计算EWC正则化损失
L_ewc = (λ/2) Σ_i F_i (θ_i - θ*_i)²
F_i大的权重(对旧任务重要)变化受到更大惩罚
"""
loss = torch.tensor(0.0, device=next(self.model.parameters()).device)
for name, param in self.model.named_parameters():
if name in self.fisher:
# 权重变化 * Fisher重要性
loss += (self.fisher[name] *
(param - self.saved_params[name]) ** 2).sum()
return 0.5 * self.ewc_lambda * loss
def train_new_task(self, data_loader, optimizer, epochs=10):
"""
训练新任务(带EWC正则化)
"""
self.model.train() # train()开启训练模式
for epoch in range(epochs):
total_loss = 0
for inputs, targets in data_loader:
optimizer.zero_grad()
# 新任务损失
outputs = self.model(inputs)
task_loss = nn.functional.cross_entropy(outputs, targets)
# EWC正则化损失
reg_loss = self.ewc_loss()
# 总损失
loss = task_loss + reg_loss
loss.backward()
optimizer.step() # 根据梯度更新模型参数
total_loss += loss.item() # .item()将单元素张量转为Python数值
print(f"Epoch {epoch+1}: loss={total_loss/len(data_loader):.4f}")
4.3 生成式回放¶
Python
class GenerativeReplay:
"""
生成式回放: 用生成模型代替存储旧数据
核心思想:
- 训练一个生成模型(如VAE/GAN)来生成旧任务的"伪数据"
- 学新任务时,混合真实新数据和生成的旧数据
- 优势: 不需要存储旧数据(隐私友好)
参考: Shin et al., "Continual Learning with Deep Generative Replay" (2017)
"""
def __init__(self, task_model: nn.Module, generator: nn.Module):
"""
参数:
task_model: 任务模型(分类器等)
generator: 生成模型(VAE/GAN)
"""
self.task_model = task_model
self.generator = generator
def generate_replay_data(self, n_samples: int = 100):
"""用生成模型生成旧任务的伪数据"""
self.generator.eval()
with torch.no_grad(): # 禁用梯度计算,节省内存
# 从噪声生成数据
z = torch.randn(n_samples, self.generator.latent_dim)
fake_data = self.generator.decode(z)
# 用当前任务模型为伪数据打标签
fake_labels = self.task_model(fake_data).argmax(dim=-1)
return fake_data, fake_labels
def train_step(self, real_data, real_labels, replay_ratio: float = 0.5):
"""
混合真实数据和回放数据训练
参数:
real_data: 新任务的真实数据
real_labels: 新任务的真实标签
replay_ratio: 回放数据的比例
"""
n_replay = int(len(real_data) * replay_ratio)
if n_replay > 0:
# 生成旧任务回放数据
fake_data, fake_labels = self.generate_replay_data(n_replay)
# 混合新旧数据
all_data = torch.cat([real_data, fake_data])
all_labels = torch.cat([real_labels, fake_labels])
else:
all_data, all_labels = real_data, real_labels
# 正常训练
outputs = self.task_model(all_data)
loss = nn.functional.cross_entropy(outputs, all_labels)
return loss
Part III: 归一化流(Normalizing Flows)¶
5. 归一化流基础¶
5.1 核心思想¶
归一化流是另一种重要的生成模型(与GAN、VAE、扩散模型并列)。
核心思想:通过一系列可逆且可微的变换 \(f_1, f_2, \ldots, f_K\),将简单分布(如高斯)变换为复杂的数据分布。
\[\mathbf{z}_0 \sim \mathcal{N}(0, I) \xrightarrow{f_1} \mathbf{z}_1 \xrightarrow{f_2} \mathbf{z}_2 \xrightarrow{f_K} \mathbf{x}\]
由变量替换公式:
\[\log p(\mathbf{x}) = \log p(\mathbf{z}_0) - \sum_{k=1}^{K} \log \left| \det \frac{\partial f_k}{\partial \mathbf{z}_{k-1}} \right|\]
5.2 与其他生成模型的对比¶
| 特性 | GAN | VAE | 扩散模型 | 归一化流 |
|---|---|---|---|---|
| 精确似然 | ❌ | 近似(ELBO) | ❌(ELBO) | ✅ 精确 |
| 可逆映射 | ❌ | ❌ | ❌ | ✅ |
| 训练稳定 | 不稳定 | 稳定 | 稳定 | 稳定 |
| 生成质量 | 高 | 中 | 最高 | 中高 |
| 推理速度 | 极快 | 快 | 慢 | 可快可慢 |
| 潜在空间 | 无意义 | 有意义 | - | 有意义 |
5.3 代码实现¶
Python
import torch
import torch.nn as nn
import numpy as np
class PlanarFlow(nn.Module):
"""
平面流(Planar Flow)— 最简单的归一化流
变换: f(z) = z + u · h(w^T z + b)
其中 u, w ∈ R^d, b ∈ R, h 是激活函数
参考: Rezende & Mohamed, "Variational Inference with Normalizing Flows" (2015)
"""
def __init__(self, dim: int):
super().__init__()
self.w = nn.Parameter(torch.randn(dim)) # 方向向量
self.u = nn.Parameter(torch.randn(dim)) # 缩放向量
self.b = nn.Parameter(torch.zeros(1)) # 偏置
def forward(self, z: torch.Tensor):
"""
前向变换 + 计算对数行列式雅可比
参数:
z: 输入 (batch, dim)
返回:
z': 变换后的输出
log_det: 对数行列式 (batch,)
"""
# 确保可逆性(通过reparameterize u)
u_hat = self._get_u_hat()
# 线性部分
linear = z @ self.w + self.b # (batch,)
# 非线性变换
h = torch.tanh(linear) # (batch,)
h_prime = 1 - h ** 2 # tanh的导数
# f(z) = z + u_hat * h(w^T z + b)
z_new = z + u_hat.unsqueeze(0) * h.unsqueeze(1) # unsqueeze增加一个维度
# 对数行列式雅可比
# det(I + u * (h' * w)^T) = 1 + u^T (h' * w)
psi = h_prime.unsqueeze(1) * self.w.unsqueeze(0) # (batch, dim)
log_det = torch.log(torch.abs(1 + psi @ u_hat) + 1e-8)
return z_new, log_det
def _get_u_hat(self) -> torch.Tensor:
"""重参数化u,确保变换可逆"""
wu = self.w @ self.u
m_wu = -1 + torch.log1p(torch.exp(wu)) # softplus
u_hat = self.u + (m_wu - wu) * self.w / (self.w @ self.w + 1e-8)
return u_hat
class NormalizingFlow(nn.Module):
"""
归一化流: K个可逆变换的组合
z_0 ~ N(0, I) → f_1 → f_2 → ... → f_K → x
精确计算似然:
log p(x) = log p(z_0) - Σ log|det J_k|
"""
def __init__(self, dim: int, n_flows: int = 8, flow_type: str = 'planar'):
"""
参数:
dim: 数据维度
n_flows: 流的层数(越多表达能力越强)
flow_type: 流的类型
"""
super().__init__()
self.dim = dim
if flow_type == 'planar':
self.flows = nn.ModuleList([PlanarFlow(dim) for _ in range(n_flows)]) # 列表推导式,简洁创建列表
else:
raise ValueError(f"未知流类型: {flow_type}")
def forward(self, z: torch.Tensor):
"""前向传播: 简单分布 → 复杂分布"""
log_det_sum = torch.zeros(z.shape[0], device=z.device)
for flow in self.flows:
z, log_det = flow(z)
log_det_sum += log_det.squeeze() # squeeze去除大小为1的维度
return z, log_det_sum
def log_likelihood(self, x: torch.Tensor) -> torch.Tensor:
"""
计算数据的精确对数似然
需要逆变换(从x到z),然后:
log p(x) = log p(z_0) + Σ log|det J_k^{-1}|
"""
# 注: 平面流的逆变换没有解析解,需要用其他架构
# 这里展示正向计算(从z到x的对数似然)
z0 = torch.randn_like(x)
x_gen, log_det = self.forward(z0)
# 先验分布的log概率(标准正态)
log_pz = -0.5 * (z0 ** 2 + np.log(2 * np.pi)).sum(dim=-1)
return log_pz + log_det
def sample(self, n_samples: int = 100) -> torch.Tensor:
"""从学到的分布中采样"""
z0 = torch.randn(n_samples, self.dim)
samples, _ = self.forward(z0)
return samples
def train_step(self, data: torch.Tensor, optimizer):
"""
最大似然训练
"""
# 从先验采样
z0 = torch.randn_like(data)
x_gen, log_det = self.forward(z0)
# 负对数似然损失
log_pz = -0.5 * (z0 ** 2 + np.log(2 * np.pi)).sum(dim=-1)
nll = -(log_pz + log_det).mean()
optimizer.zero_grad()
nll.backward()
optimizer.step()
return nll.item()
6. 面试要点¶
6.1 高频问题¶
- 神经符号AI的动机是什么?
-
结合神经网络的感知能力和符号系统的推理能力,实现可解释、可验证的AI
-
什么是灾难性遗忘?EWC如何缓解?
-
学习新任务时丢失旧知识;EWC通过Fisher矩阵惩罚重要权重的变化
-
归一化流相比GAN/VAE的核心优势?
-
精确的似然计算(GAN无似然,VAE只有ELBO下界)
-
可微分逻辑的意义?
- 让逻辑规则参与梯度计算→端到端训练→结合先验知识与数据学习
📌 关键要点总结¶
- 神经符号AI结合神经感知与符号推理,是通向更可靠AI的关键方向
- Type 2(LLM+工具)是当前最成功的实践范式
- 灾难性遗忘是持续学习的核心挑战,EWC和回放是主要缓解手段
- 归一化流是唯一能精确计算似然的深度生成模型