跳转至

10 - 神经符号AI与持续学习

学习时间: 3-4小时 重要性: ⭐⭐⭐⭐ 走向更可靠、可解释的AI 前置知识: 深度学习基础、知识图谱概念


🎯 学习目标

完成本章后,你将能够: - 理解神经符号AI(Neuro-Symbolic AI)的动机与核心框架 - 掌握神经网络与符号推理结合的主要范式 - 了解持续学习(Continual Learning)的灾难性遗忘问题与解决方案 - 了解归一化流(Normalizing Flows)的基本原理


Part I: 神经符号AI

1. 为什么需要神经符号AI?

1.1 两种AI范式的对比

Text Only
符号AI (传统)            vs          神经网络 (现代)
├── 逻辑规则、知识库                 ├── 数据驱动、端到端学习
├── 可解释、可验证                   ├── 黑箱、难解释
├── 需要手工编写规则                 ├── 自动从数据中学习
├── 处理结构化推理很强               ├── 处理感知(图像/语音)很强
├── 难以处理不确定性                 ├── 天然处理噪声和不确定性
├── 泛化能力有限                     ├── 可泛化到训练分布
└── 数据效率高(但需专家)           └── 数据效率低(需大量数据)

神经符号AI: 结合两者优势
├── 用神经网络做感知(看/听)
├── 用符号系统做推理(想/判断)
└── 目标: 可解释 + 可学习 + 可推理

1.2 现实问题驱动

纯神经网络的不足 神经符号AI如何解决
大模型"幻觉"问题 符号验证层检查逻辑一致性
数学推理不可靠 符号计算器处理精确运算
缺乏因果推理 因果图 + 神经网络
不满足法规要求(可解释性) 符号规则提供可审计的推理路径
小样本 / 零样本困难 先验知识以符号形式注入

2. 神经符号AI的主要范式

2.1 Kautz的六种分类(2022)

Text Only
Type 1: 符号为主,神经为辅
├── 例: 专家系统 + 神经网络做预处理
└── 代表: IBM Watson

Type 2: 神经为主,符号为辅(当前最主流)
├── 例: LLM + 外部工具调用(计算器、知识图谱)
└── 代表: GPT + Code Interpreter, Toolformer

Type 3: 神经符号编织
├── 例: 神经网络层之间嵌入符号推理
└── 代表: Neural Theorem Prover

Type 4: 符号知识编译到神经网络
├── 例: 将知识图谱嵌入到网络参数中
└── 代表: KGAT, Knowledge-enhanced BERT

Type 5: 神经网络生成符号规则
├── 例: 从数据中自动发现因果关系/规则
└── 代表: Neural Logic Programming

Type 6: 完全融合
├── 例: 可微分的逻辑推理
└── 代表: Logic Tensor Networks, DeepProbLog

2.2 Type 2: LLM + 工具调用(最广泛应用)

Python
class NeuroSymbolicLLM:
    """
    Type 2 神经符号AI: LLM + 符号工具

    LLM负责自然语言理解和规划
    符号工具负责精确计算和知识查询

    这是当前最实用的神经符号AI范式
    """

    def __init__(self, llm, tools: dict):  # __init__构造方法,创建对象时自动调用
        """
        参数:
            llm: 语言模型(神经网络部分)
            tools: 工具集合(符号推理部分)
                keys: 工具名称
                values: 可调用的工具函数
        """
        self.llm = llm
        self.tools = tools

    def reason(self, question: str) -> str:
        """
        结合神经和符号推理回答问题

        流程:
        1. LLM理解问题,决定是否需要工具
        2. 如需要,调用对应的符号工具
        3. 将工具结果整合到最终回答中
        """
        # Step 1: LLM分析问题,生成推理计划
        plan = self.llm.generate(f"""
        问题: {question}
        可用工具: {list(self.tools.keys())}
        请分析是否需要工具,如需要请给出调用计划。
        """)

        # Step 2: 执行工具调用(符号推理)
        tool_results = {}
        for tool_name, tool_fn in self.tools.items():
            if tool_name in plan:
                # 提取参数并调用工具
                args = self._extract_args(plan, tool_name)
                tool_results[tool_name] = tool_fn(**args)  # *args接收任意位置参数,**kwargs接收任意关键字参数

        # Step 3: LLM整合结果
        answer = self.llm.generate(f"""
        问题: {question}
        工具结果: {tool_results}
        请给出最终回答。
        """)

        return answer

    def _extract_args(self, plan, tool_name):
        """从推理计划中提取工具参数(简化实现)"""
        return {}

# 使用示例
tools = {
    'calculator': lambda expr: eval(expr),          # 精确计算
    'knowledge_graph': lambda query: "查询结果...",  # 知识查询
    'theorem_prover': lambda prop: True,            # 定理证明
}

2.3 Type 6: 可微分逻辑推理

Python
import torch
import torch.nn as nn

class DifferentiableLogic(nn.Module):  # 继承nn.Module定义神经网络层
    """
    可微分的逻辑推理 (概率软逻辑)

    将离散的逻辑运算替换为连续可微的近似:
    - AND(a, b) → min(a, b) 或 a * b
    - OR(a, b)  → max(a, b) 或 a + b - a*b
    - NOT(a)    → 1 - a

    这样逻辑规则可以参与梯度反向传播
    """

    def __init__(self, t_norm: str = 'product'):
        """
        参数:
            t_norm: T-范数类型
                - 'product': 乘积 (AND=a*b, OR=a+b-a*b)
                - 'godel': Gödel (AND=min, OR=max)
                - 'lukasiewicz': Łukasiewicz (AND=max(0,a+b-1))
        """
        super().__init__()  # super()调用父类方法
        self.t_norm = t_norm

    def fuzzy_and(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
        """可微分的AND操作"""
        if self.t_norm == 'product':
            return a * b
        elif self.t_norm == 'godel':
            return torch.min(a, b)
        elif self.t_norm == 'lukasiewicz':
            return torch.clamp(a + b - 1, min=0)

    def fuzzy_or(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
        """可微分的OR操作"""
        if self.t_norm == 'product':
            return a + b - a * b
        elif self.t_norm == 'godel':
            return torch.max(a, b)
        elif self.t_norm == 'lukasiewicz':
            return torch.clamp(a + b, max=1)

    def fuzzy_not(self, a: torch.Tensor) -> torch.Tensor:
        """可微分的NOT操作"""
        return 1 - a

    def fuzzy_implies(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
        """可微分的蕴含 (A → B ≡ ¬A ∨ B)"""
        return self.fuzzy_or(self.fuzzy_not(a), b)

class NeuralLogicNetwork(nn.Module):
    """
    神经逻辑网络: 将逻辑规则嵌入神经网络

    示例任务: 关系推理
    - 输入: 实体对(x, y)的特征
    - 规则: parent(x,y) ∧ parent(y,z) → grandparent(x,z)
    - 输出: 各种关系的概率
    """

    def __init__(self, entity_dim: int, n_relations: int):
        """
        参数:
            entity_dim: 实体嵌入维度
            n_relations: 关系类型数量
        """
        super().__init__()

        # 实体嵌入层
        self.entity_encoder = nn.Linear(entity_dim, 128)

        # 关系预测器(神经部分)
        self.relation_predictor = nn.Sequential(
            nn.Linear(256, 128),  # 两个实体拼接
            nn.ReLU(),
            nn.Linear(128, n_relations),
            nn.Sigmoid()  # 输出概率 [0, 1]
        )

        # 可微分逻辑引擎(符号部分)
        self.logic = DifferentiableLogic(t_norm='product')

    def predict_relation(self, entity_a: torch.Tensor,
                         entity_b: torch.Tensor) -> torch.Tensor:
        """预测两个实体间的关系概率"""
        a_emb = self.entity_encoder(entity_a)
        b_emb = self.entity_encoder(entity_b)
        pair = torch.cat([a_emb, b_emb], dim=-1)
        return self.relation_predictor(pair)

    def apply_transitivity_rule(
        self,
        p_parent_xy: torch.Tensor,
        p_parent_yz: torch.Tensor
    ) -> torch.Tensor:
        """
        应用传递性规则: parent(x,y) ∧ parent(y,z) → grandparent(x,z)
        使用可微分逻辑保持梯度流
        """
        return self.logic.fuzzy_and(p_parent_xy, p_parent_yz)

    def forward(self, entities_a, entities_b, entities_c=None):
        """
        前向传播: 神经预测 + 逻辑规则推导
        """
        # 神经部分: 直接预测关系
        direct_preds = self.predict_relation(entities_a, entities_b)

        if entities_c is not None:
            # 逻辑推理部分: 利用传递性增强预测
            p_ab = self.predict_relation(entities_a, entities_b)
            p_bc = self.predict_relation(entities_b, entities_c)

            # 应用规则: parent(a,b) ∧ parent(b,c) → grandparent(a,c)
            p_grandparent = self.apply_transitivity_rule(
                p_ab[:, 0],  # parent关系的概率
                p_bc[:, 0]
            )

            return direct_preds, p_grandparent

        return direct_preds

Part II: 持续学习(Continual Learning)

3. 灾难性遗忘问题

3.1 定义

灾难性遗忘(Catastrophic Forgetting):神经网络在学习新任务时,会急剧遗忘之前学过的任务。

Text Only
灾难性遗忘示例:
任务1: 识别猫和狗 → 准确率 95%
任务2: 识别鸟和鱼 → 准确率 93%
       但此时猫狗识别降至 40%!

原因: 任务2的训练覆盖了任务1学到的权重

3.2 为什么这很重要?

  • 现实部署:模型需要持续学习新知识而不忘旧知识
  • LLM:模型更新时如何不丢失已有能力
  • 机器人:在新环境中学习时保留旧技能
  • 边缘设备:增量更新而非重新训练

4. 主要解决方案

4.1 方法分类

Text Only
持续学习方法:
├── 1. 正则化方法(Regularization-based)
│   ├── EWC (Elastic Weight Consolidation)
│   ├── SI (Synaptic Intelligence)
│   └── 核心: 保护重要权重不被改变
├── 2. 回放方法(Replay-based)
│   ├── 经验回放 (Experience Replay)
│   ├── 生成式回放 (Generative Replay)
│   └── 核心: 混合新旧数据训练
├── 3. 架构方法(Architecture-based)
│   ├── Progressive Neural Networks
│   ├── PackNet
│   └── 核心: 为新任务分配新参数
└── 4. 提示方法(Prompt-based, 适用于LLM)
    ├── L2P (Learning to Prompt)
    ├── DualPrompt
    └── 核心: 用不同prompt激活不同任务能力

4.2 EWC(弹性权重巩固)

Python
import torch
import torch.nn as nn
from typing import Dict

class ElasticWeightConsolidation:
    """
    EWC (Elastic Weight Consolidation)

    核心思想:
    - 计算每个权重对旧任务的"重要性"(Fisher信息矩阵)
    - 训练新任务时,惩罚重要权重的变化
    - 类比: 像弹簧一样把重要权重拉回到旧值附近

    损失函数: L = L_new + (λ/2) Σ F_i (θ_i - θ*_i)²

    参考: Kirkpatrick et al., "Overcoming Catastrophic Forgetting" (2017)
    """

    def __init__(self, model: nn.Module, ewc_lambda: float = 400.0):
        """
        参数:
            model: 神经网络模型
            ewc_lambda: EWC正则化强度
        """
        self.model = model
        self.ewc_lambda = ewc_lambda

        # 存储旧任务的参数和Fisher矩阵
        self.saved_params: Dict[str, torch.Tensor] = {}
        self.fisher: Dict[str, torch.Tensor] = {}

    def compute_fisher(self, data_loader, n_samples: int = 200):
        """
        计算Fisher信息矩阵(对角近似)

        F_i = E[( ∂log p(y|x,θ) / ∂θ_i )²]

        直觉: 如果某个权重的梯度经常很大,
              说明这个权重对当前任务很重要
        """
        fisher = {name: torch.zeros_like(param)
                  for name, param in self.model.named_parameters()}

        self.model.eval()  # eval()开启评估模式(关闭Dropout等)
        count = 0

        for inputs, targets in data_loader:
            if count >= n_samples:
                break

            self.model.zero_grad()  # 清零梯度,防止梯度累积
            outputs = self.model(inputs)
            loss = nn.functional.cross_entropy(outputs, targets)
            loss.backward()  # 反向传播计算梯度

            # 累加梯度的平方(Fisher的对角近似)
            for name, param in self.model.named_parameters():
                if param.grad is not None:
                    fisher[name] += param.grad.data ** 2

            count += inputs.size(0)

        # 取平均
        for name in fisher:
            fisher[name] /= count

        self.fisher = fisher

        # 保存当前参数(旧任务的最优参数 θ*)
        self.saved_params = {
            name: param.data.clone()
            for name, param in self.model.named_parameters()
        }

    def ewc_loss(self) -> torch.Tensor:
        """
        计算EWC正则化损失

        L_ewc = (λ/2) Σ_i F_i (θ_i - θ*_i)²

        F_i大的权重(对旧任务重要)变化受到更大惩罚
        """
        loss = torch.tensor(0.0, device=next(self.model.parameters()).device)

        for name, param in self.model.named_parameters():
            if name in self.fisher:
                # 权重变化 * Fisher重要性
                loss += (self.fisher[name] *
                        (param - self.saved_params[name]) ** 2).sum()

        return 0.5 * self.ewc_lambda * loss

    def train_new_task(self, data_loader, optimizer, epochs=10):
        """
        训练新任务(带EWC正则化)
        """
        self.model.train()  # train()开启训练模式

        for epoch in range(epochs):
            total_loss = 0
            for inputs, targets in data_loader:
                optimizer.zero_grad()

                # 新任务损失
                outputs = self.model(inputs)
                task_loss = nn.functional.cross_entropy(outputs, targets)

                # EWC正则化损失
                reg_loss = self.ewc_loss()

                # 总损失
                loss = task_loss + reg_loss
                loss.backward()
                optimizer.step()  # 根据梯度更新模型参数

                total_loss += loss.item()  # .item()将单元素张量转为Python数值

            print(f"Epoch {epoch+1}: loss={total_loss/len(data_loader):.4f}")

4.3 生成式回放

Python
class GenerativeReplay:
    """
    生成式回放: 用生成模型代替存储旧数据

    核心思想:
    - 训练一个生成模型(如VAE/GAN)来生成旧任务的"伪数据"
    - 学新任务时,混合真实新数据和生成的旧数据
    - 优势: 不需要存储旧数据(隐私友好)

    参考: Shin et al., "Continual Learning with Deep Generative Replay" (2017)
    """

    def __init__(self, task_model: nn.Module, generator: nn.Module):
        """
        参数:
            task_model: 任务模型(分类器等)
            generator: 生成模型(VAE/GAN)
        """
        self.task_model = task_model
        self.generator = generator

    def generate_replay_data(self, n_samples: int = 100):
        """用生成模型生成旧任务的伪数据"""
        self.generator.eval()
        with torch.no_grad():  # 禁用梯度计算,节省内存
            # 从噪声生成数据
            z = torch.randn(n_samples, self.generator.latent_dim)
            fake_data = self.generator.decode(z)

            # 用当前任务模型为伪数据打标签
            fake_labels = self.task_model(fake_data).argmax(dim=-1)

        return fake_data, fake_labels

    def train_step(self, real_data, real_labels, replay_ratio: float = 0.5):
        """
        混合真实数据和回放数据训练

        参数:
            real_data: 新任务的真实数据
            real_labels: 新任务的真实标签
            replay_ratio: 回放数据的比例
        """
        n_replay = int(len(real_data) * replay_ratio)

        if n_replay > 0:
            # 生成旧任务回放数据
            fake_data, fake_labels = self.generate_replay_data(n_replay)

            # 混合新旧数据
            all_data = torch.cat([real_data, fake_data])
            all_labels = torch.cat([real_labels, fake_labels])
        else:
            all_data, all_labels = real_data, real_labels

        # 正常训练
        outputs = self.task_model(all_data)
        loss = nn.functional.cross_entropy(outputs, all_labels)

        return loss

Part III: 归一化流(Normalizing Flows)

5. 归一化流基础

5.1 核心思想

归一化流是另一种重要的生成模型(与GAN、VAE、扩散模型并列)。

核心思想:通过一系列可逆且可微的变换 \(f_1, f_2, \ldots, f_K\),将简单分布(如高斯)变换为复杂的数据分布。

\[\mathbf{z}_0 \sim \mathcal{N}(0, I) \xrightarrow{f_1} \mathbf{z}_1 \xrightarrow{f_2} \mathbf{z}_2 \xrightarrow{f_K} \mathbf{x}\]

由变量替换公式:

\[\log p(\mathbf{x}) = \log p(\mathbf{z}_0) - \sum_{k=1}^{K} \log \left| \det \frac{\partial f_k}{\partial \mathbf{z}_{k-1}} \right|\]

5.2 与其他生成模型的对比

特性 GAN VAE 扩散模型 归一化流
精确似然 近似(ELBO) ❌(ELBO) ✅ 精确
可逆映射
训练稳定 不稳定 稳定 稳定 稳定
生成质量 最高 中高
推理速度 极快 可快可慢
潜在空间 无意义 有意义 - 有意义

5.3 代码实现

Python
import torch
import torch.nn as nn
import numpy as np

class PlanarFlow(nn.Module):
    """
    平面流(Planar Flow)— 最简单的归一化流

    变换: f(z) = z + u · h(w^T z + b)
    其中 u, w ∈ R^d, b ∈ R, h 是激活函数

    参考: Rezende & Mohamed, "Variational Inference with Normalizing Flows" (2015)
    """

    def __init__(self, dim: int):
        super().__init__()
        self.w = nn.Parameter(torch.randn(dim))     # 方向向量
        self.u = nn.Parameter(torch.randn(dim))     # 缩放向量
        self.b = nn.Parameter(torch.zeros(1))       # 偏置

    def forward(self, z: torch.Tensor):
        """
        前向变换 + 计算对数行列式雅可比

        参数:
            z: 输入 (batch, dim)
        返回:
            z': 变换后的输出
            log_det: 对数行列式 (batch,)
        """
        # 确保可逆性(通过reparameterize u)
        u_hat = self._get_u_hat()

        # 线性部分
        linear = z @ self.w + self.b  # (batch,)

        # 非线性变换
        h = torch.tanh(linear)           # (batch,)
        h_prime = 1 - h ** 2            # tanh的导数

        # f(z) = z + u_hat * h(w^T z + b)
        z_new = z + u_hat.unsqueeze(0) * h.unsqueeze(1)  # unsqueeze增加一个维度

        # 对数行列式雅可比
        # det(I + u * (h' * w)^T) = 1 + u^T (h' * w)
        psi = h_prime.unsqueeze(1) * self.w.unsqueeze(0)  # (batch, dim)
        log_det = torch.log(torch.abs(1 + psi @ u_hat) + 1e-8)

        return z_new, log_det

    def _get_u_hat(self) -> torch.Tensor:
        """重参数化u,确保变换可逆"""
        wu = self.w @ self.u
        m_wu = -1 + torch.log1p(torch.exp(wu))  # softplus
        u_hat = self.u + (m_wu - wu) * self.w / (self.w @ self.w + 1e-8)
        return u_hat

class NormalizingFlow(nn.Module):
    """
    归一化流: K个可逆变换的组合

    z_0 ~ N(0, I) → f_1 → f_2 → ... → f_K → x

    精确计算似然:
    log p(x) = log p(z_0) - Σ log|det J_k|
    """

    def __init__(self, dim: int, n_flows: int = 8, flow_type: str = 'planar'):
        """
        参数:
            dim: 数据维度
            n_flows: 流的层数(越多表达能力越强)
            flow_type: 流的类型
        """
        super().__init__()
        self.dim = dim

        if flow_type == 'planar':
            self.flows = nn.ModuleList([PlanarFlow(dim) for _ in range(n_flows)])  # 列表推导式,简洁创建列表
        else:
            raise ValueError(f"未知流类型: {flow_type}")

    def forward(self, z: torch.Tensor):
        """前向传播: 简单分布 → 复杂分布"""
        log_det_sum = torch.zeros(z.shape[0], device=z.device)

        for flow in self.flows:
            z, log_det = flow(z)
            log_det_sum += log_det.squeeze()  # squeeze去除大小为1的维度

        return z, log_det_sum

    def log_likelihood(self, x: torch.Tensor) -> torch.Tensor:
        """
        计算数据的精确对数似然

        需要逆变换(从x到z),然后:
        log p(x) = log p(z_0) + Σ log|det J_k^{-1}|
        """
        # 注: 平面流的逆变换没有解析解,需要用其他架构
        # 这里展示正向计算(从z到x的对数似然)
        z0 = torch.randn_like(x)
        x_gen, log_det = self.forward(z0)

        # 先验分布的log概率(标准正态)
        log_pz = -0.5 * (z0 ** 2 + np.log(2 * np.pi)).sum(dim=-1)

        return log_pz + log_det

    def sample(self, n_samples: int = 100) -> torch.Tensor:
        """从学到的分布中采样"""
        z0 = torch.randn(n_samples, self.dim)
        samples, _ = self.forward(z0)
        return samples

    def train_step(self, data: torch.Tensor, optimizer):
        """
        最大似然训练
        """
        # 从先验采样
        z0 = torch.randn_like(data)
        x_gen, log_det = self.forward(z0)

        # 负对数似然损失
        log_pz = -0.5 * (z0 ** 2 + np.log(2 * np.pi)).sum(dim=-1)
        nll = -(log_pz + log_det).mean()

        optimizer.zero_grad()
        nll.backward()
        optimizer.step()

        return nll.item()

6. 面试要点

6.1 高频问题

  1. 神经符号AI的动机是什么?
  2. 结合神经网络的感知能力和符号系统的推理能力,实现可解释、可验证的AI

  3. 什么是灾难性遗忘?EWC如何缓解?

  4. 学习新任务时丢失旧知识;EWC通过Fisher矩阵惩罚重要权重的变化

  5. 归一化流相比GAN/VAE的核心优势?

  6. 精确的似然计算(GAN无似然,VAE只有ELBO下界)

  7. 可微分逻辑的意义?

  8. 让逻辑规则参与梯度计算→端到端训练→结合先验知识与数据学习

📌 关键要点总结

  1. 神经符号AI结合神经感知与符号推理,是通向更可靠AI的关键方向
  2. Type 2(LLM+工具)是当前最成功的实践范式
  3. 灾难性遗忘是持续学习的核心挑战,EWC和回放是主要缓解手段
  4. 归一化流是唯一能精确计算似然的深度生成模型

📚 延伸阅读

  • 可解释AI
  • GAN基础
  • VAE基础
  • 论文: Kautz, "The Third AI Summer" (2022)
  • 论文: Kirkpatrick et al., "Overcoming Catastrophic Forgetting in NNs" (2017)
  • 论文: Rezende & Mohamed, "Variational Inference with Normalizing Flows" (2015)