跳转至

23 - 可解释AI与因果推断

可解释AI与因果推断图

🔍 可解释AI (XAI) 概述

为什么需要可解释性

Text Only
模型可解释性的重要性:

┌─────────────────────────────────────────────────────────────┐
│ 场景                    │ 解释性需求                        │
├─────────────────────────────────────────────────────────────┤
│ 医疗诊断                │ 医生需要理解决策依据              │
│ 金融风控                │ 监管要求解释拒绝原因              │
│ 司法判决                │ 必须提供可审计的决策逻辑          │
│ 自动驾驶                │ 事故分析需要追溯决策过程          │
│ 科学研究                │ 从模型中发现新知识                │
└─────────────────────────────────────────────────────────────┘

黑盒问题:
- 深度学习模型参数量巨大
- 决策过程难以直观理解
- 无法验证模型是否学到正确特征

可解释性类型

Text Only
┌─────────────────────────────────────────────────────────────┐
│                    可解释性分类                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  按解释时机          按解释范围          按解释方法          │
│  ├─ 内在可解释       ├─ 全局解释         ├─ 基于特征         │
│  │   (模型本身透明)   │   (整体行为)      │   (特征重要性)    │
│  │                    │                   │                   │
│  └─ 事后解释         └─ 局部解释         └─ 基于样本         │
│      (解释黑盒)        (单个预测)          (反事实解释)       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

🧮 内在可解释模型

线性模型解释

Python
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.datasets import make_classification, make_regression

class LinearModelInterpretation:
    """线性模型的可解释性"""

    def __init__(self):  # __init__构造方法,创建对象时自动调用
        pass

    def explain_linear_regression(self, X, y, feature_names=None):
        """
        线性回归的可解释性

        y = w₁x₁ + w₂x₂ + ... + wₙxₙ + b

        权重直接表示特征影响程度
        """
        model = LinearRegression()
        model.fit(X, y)

        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]  # 列表推导式,简洁创建列表

        # 特征重要性(标准化后的权重)
        X_std = (X - X.mean(axis=0)) / X.std(axis=0)
        model_std = LinearRegression()
        model_std.fit(X_std, y)

        importance = np.abs(model_std.coef_)

        print("=" * 50)
        print("线性回归解释")
        print("=" * 50)
        print(f"\n方程: y = {' + '.join([f'{w:.3f}*{name}' for w, name in zip(model.coef_, feature_names)])} + {model.intercept_:.3f}")
        print(f"\n特征重要性(标准化权重绝对值):")
        for name, imp in zip(feature_names, importance):
            print(f"  {name}: {imp:.3f}")

        return model, importance

    def explain_logistic_regression(self, X, y, feature_names=None):
        """
        逻辑回归的可解释性

        log(p/(1-p)) = w₁x₁ + w₂x₂ + ... + wₙxₙ + b

        权重表示对数几率变化
        """
        model = LogisticRegression(max_iter=1000)
        model.fit(X, y)

        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]

        # 优势比 (Odds Ratio)
        odds_ratios = np.exp(model.coef_[0])

        print("=" * 50)
        print("逻辑回归解释")
        print("=" * 50)
        print(f"\n对数几率方程: log(p/(1-p)) = {' + '.join([f'{w:.3f}*{name}' for w, name in zip(model.coef_[0], feature_names)])} + {model.intercept_[0]:.3f}")
        print(f"\n优势比 (Odds Ratio) - 特征增加1单位,优势变化倍数:")
        for name, or_val in zip(feature_names, odds_ratios):
            direction = "增加" if or_val > 1 else "减少"
            print(f"  {name}: {or_val:.3f} ({direction} {abs(or_val-1)*100:.1f}%)")

        return model, odds_ratios

# 示例
X_reg, y_reg = make_regression(n_samples=100, n_features=3, noise=10, random_state=42)
interpreter = LinearModelInterpretation()
interpreter.explain_linear_regression(X_reg, y_reg, ['Age', 'Income', 'Education'])

X_cls, y_cls = make_classification(n_samples=100, n_features=3, n_classes=2, random_state=42)
interpreter.explain_logistic_regression(X_cls, y_cls, ['Age', 'Income', 'Education'])

决策树解释

Python
from sklearn.tree import DecisionTreeClassifier, export_text, plot_tree
from sklearn.ensemble import RandomForestClassifier
import graphviz

class TreeModelInterpretation:
    """树模型的可解释性"""

    def explain_decision_tree(self, X, y, feature_names=None, max_depth=3):
        """
        决策树的可解释性

        优势:
        - 决策路径完全透明
        - 支持可视化
        - 特征重要性直观
        """
        model = DecisionTreeClassifier(max_depth=max_depth, random_state=42)
        model.fit(X, y)

        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]

        print("=" * 50)
        print("决策树解释")
        print("=" * 50)

        # 文本形式的决策规则
        tree_rules = export_text(model, feature_names=feature_names)
        print("\n决策规则:")
        print(tree_rules)

        # 特征重要性
        importance = model.feature_importances_
        print("\n特征重要性 (Gini重要性):")
        for name, imp in zip(feature_names, importance):
            print(f"  {name}: {imp:.3f}")

        # 解释单个样本的决策路径
        sample = X[0]
        self._explain_prediction_path(model, sample, feature_names)

        return model

    def _explain_prediction_path(self, model, sample, feature_names):
        """解释单个样本的决策路径"""
        node_indicator = model.decision_path([sample])
        leaf_id = model.apply([sample])[0]

        print("\n决策路径解释:")
        node_index = node_indicator.indices[node_indicator.indptr[0]:node_indicator.indptr[1]]

        for node_id in node_index:
            if node_id == leaf_id:
                print(f"  → 到达叶节点 {node_id},预测类别: {model.classes_[np.argmax(model.tree_.value[node_id])]}")
            else:
                feature = model.tree_.feature[node_id]
                threshold = model.tree_.threshold[node_id]
                if sample[feature] <= threshold:
                    print(f"  → 节点 {node_id}: {feature_names[feature]} = {sample[feature]:.2f} <= {threshold:.2f},走左分支")
                else:
                    print(f"  → 节点 {node_id}: {feature_names[feature]} = {sample[feature]:.2f} > {threshold:.2f},走右分支")

    def explain_random_forest(self, X, y, feature_names=None):
        """
        随机森林的可解释性

        虽然整体是黑盒,但可以分析:
        - 特征重要性
        - 部分依赖图
        """
        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X, y)

        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]

        print("=" * 50)
        print("随机森林解释")
        print("=" * 50)

        # 特征重要性
        importance = model.feature_importances_
        std = np.std([tree.feature_importances_ for tree in model.estimators_], axis=0)

        print("\n特征重要性 (平均Gini重要性):")
        for name, imp, s in zip(feature_names, importance, std):
            print(f"  {name}: {imp:.3f}{s:.3f})")

        return model, importance

🔬 事后解释方法

SHAP (SHapley Additive exPlanations)

Python
import shap
from sklearn.ensemble import RandomForestClassifier

class SHAPExplainer:
    """SHAP值解释"""

    def __init__(self, model, X_background):
        """
        SHAP基于博弈论中的Shapley值

        核心思想:
        - 每个特征的贡献 = 该特征参与的所有组合的边际贡献的加权平均
        - 满足效率性、对称性、虚拟性、可加性
        """
        self.model = model
        self.explainer = shap.TreeExplainer(model) if hasattr(model, 'tree_') else shap.KernelExplainer(model.predict, X_background)  # hasattr检查对象是否有指定属性

    def explain_global(self, X, feature_names=None):
        """
        全局解释:所有样本的SHAP值汇总
        """
        shap_values = self.explainer.shap_values(X)

        print("=" * 50)
        print("SHAP全局解释")
        print("=" * 50)

        # 特征重要性(SHAP值绝对值的平均)
        if isinstance(shap_values, list):  # 多分类
            shap_importance = np.abs(shap_values[0]).mean(axis=0)  # 链式调用,连续执行多个方法
        else:
            shap_importance = np.abs(shap_values).mean(axis=0)

        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]

        print("\n特征重要性 (|SHAP|平均值):")
        for name, imp in zip(feature_names, shap_importance):
            print(f"  {name}: {imp:.4f}")

        # 可视化
        plt.figure(figsize=(10, 6))
        shap.summary_plot(shap_values, X, feature_names=feature_names, show=False)
        plt.title("SHAP Summary Plot")
        plt.tight_layout()
        plt.show()

        return shap_values

    def explain_local(self, X, instance_idx=0, feature_names=None):
        """
        局部解释:单个样本的预测解释

        预测 = 基准值 + Σ(SHAP值)
        """
        shap_values = self.explainer.shap_values(X)

        print("=" * 50)
        print(f"SHAP局部解释 - 样本 {instance_idx}")
        print("=" * 50)

        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]

        # 获取该样本的SHAP值
        if isinstance(shap_values, list):
            instance_shap = shap_values[0][instance_idx]
        else:
            instance_shap = shap_values[instance_idx]

        # 基准值(期望输出)
        base_value = self.explainer.expected_value
        if isinstance(base_value, list):
            base_value = base_value[0]

        print(f"\n基准值 (期望输出): {base_value:.4f}")
        print(f"实际预测: {base_value + np.sum(instance_shap):.4f}")
        print(f"\n各特征贡献:")

        # 按贡献排序
        contributions = list(zip(feature_names, instance_shap, X[instance_idx]))
        contributions.sort(key=lambda x: abs(x[1]), reverse=True)  # lambda匿名函数,简洁定义单行函数

        for name, shap_val, feat_val in contributions:
            direction = "增加" if shap_val > 0 else "减少"
            print(f"  {name} = {feat_val:.3f}: {direction} {abs(shap_val):.4f}")

        # 瀑布图
        plt.figure(figsize=(10, 6))
        shap.waterfall_plot(shap.Explanation(
            values=instance_shap,
            base_values=base_value,
            data=X[instance_idx],
            feature_names=feature_names
        ), show=False)
        plt.title(f"SHAP Waterfall Plot - Instance {instance_idx}")
        plt.tight_layout()
        plt.show()

        return instance_shap

# 使用示例
from sklearn.datasets import load_breast_cancer

data = load_breast_cancer()
X, y = data.data, data.target
feature_names = data.feature_names

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X, y)

explainer = SHAPExplainer(model, X[:100])  # 切片操作取子序列
shap_values = explainer.explain_global(X[:50], feature_names)
explainer.explain_local(X, instance_idx=0, feature_names=feature_names)

LIME (Local Interpretable Model-agnostic Explanations)

Python
import lime
import lime.lime_tabular
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_breast_cancer

class LIMEExplainer:
    """LIME局部解释"""

    def __init__(self, model, X_train, feature_names=None, class_names=None):
        """
        LIME核心思想:
        - 在待解释样本附近采样
        - 用可解释模型(如线性模型)拟合局部行为
        - 局部忠实性优先于全局准确性
        """
        self.model = model
        self.feature_names = feature_names
        self.class_names = class_names

        # 创建LIME解释器
        self.explainer = lime.lime_tabular.LimeTabularExplainer(
            X_train,
            feature_names=feature_names,
            class_names=class_names,
            discretize_continuous=True,
            mode='classification'
        )

    def explain_instance(self, instance, num_features=10):
        """
        解释单个实例

        返回:哪些特征对当前预测最重要
        """
        # 获取解释
        exp = self.explainer.explain_instance(
            instance,
            self.model.predict_proba,
            num_features=num_features,
            top_labels=1
        )

        print("=" * 50)
        print("LIME局部解释")
        print("=" * 50)

        # 显示解释
        label = exp.available_labels()[0]
        print(f"\n预测类别: {self.class_names[label] if self.class_names else label}")
        print(f"预测概率: {self.model.predict_proba([instance])[0][label]:.4f}")

        print("\n最重要的特征:")
        for feature, weight in exp.as_list(label=label):
            direction = "支持" if weight > 0 else "反对"
            print(f"  {feature}: {direction}预测 ({weight:+.4f})")

        # 可视化
        exp.show_in_notebook(show_table=True, show_all=False)

        return exp

# 使用示例
data = load_breast_cancer()
X, y = data.data, data.target

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X, y)

explainer = LIMEExplainer(
    model, X,
    feature_names=data.feature_names,
    class_names=['Malignant', 'Benign']
)

exp = explainer.explain_instance(X[0], num_features=5)

注意力可视化

Python
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import seaborn as sns

class AttentionVisualizer:
    """Transformer注意力可视化"""

    def __init__(self, model):
        self.model = model
        self.attention_weights = []

        # 注册钩子捕获注意力权重
        self._register_hooks()

    def _register_hooks(self):
        """注册前向钩子捕获注意力"""
        def hook_fn(module, input, output):
            # 假设output包含注意力权重
            if isinstance(output, tuple) and len(output) > 1:
                self.attention_weights.append(output[1].detach())  # detach()从计算图分离,不参与梯度计算

        for name, module in self.model.named_modules():
            if 'attn' in name.lower() or 'attention' in name.lower():
                module.register_forward_hook(hook_fn)

    def visualize_attention(self, tokens, layer_idx=-1, head_idx=0):
        """
        可视化注意力权重

        tokens: 输入token列表
        layer_idx: 要可视化的层
        head_idx: 要可视化的头
        """
        if not self.attention_weights:
            print("没有捕获到注意力权重")
            return

        # 获取指定层的注意力
        attn = self.attention_weights[layer_idx]  # (batch, heads, seq_len, seq_len)
        attn = attn[0, head_idx].cpu().numpy()  # (seq_len, seq_len)

        plt.figure(figsize=(10, 8))
        sns.heatmap(
            attn,
            xticklabels=tokens,
            yticklabels=tokens,
            cmap='viridis',
            annot=True,
            fmt='.2f',
            cbar_kws={'label': 'Attention Weight'}
        )
        plt.title(f'Attention Heatmap - Layer {layer_idx}, Head {head_idx}')
        plt.xlabel('Key')
        plt.ylabel('Query')
        plt.xticks(rotation=45, ha='right')
        plt.yticks(rotation=0)
        plt.tight_layout()
        plt.show()

    def analyze_attention_patterns(self, tokens):
        """分析注意力模式"""
        if not self.attention_weights:
            return

        print("=" * 50)
        print("注意力模式分析")
        print("=" * 50)

        num_layers = len(self.attention_weights)
        num_heads = self.attention_weights[0].shape[1]

        print(f"\n模型结构: {num_layers}层, {num_heads}头")

        # 分析每层的注意力熵(注意力分布的集中程度)
        for layer_idx, attn in enumerate(self.attention_weights):
            attn = attn[0]  # (heads, seq_len, seq_len)

            # 计算熵
            entropy = -(attn * torch.log(attn + 1e-10)).sum(dim=-1).mean(dim=-1)  # (heads,)
            avg_entropy = entropy.mean().item()  # .item()将单元素张量转为Python数值

            print(f"\n{layer_idx}: 平均注意力熵 = {avg_entropy:.4f}")
            print(f"  低熵 = 注意力集中, 高熵 = 注意力分散")

            # 找出最集中的头
            min_entropy_head = entropy.argmin().item()
            print(f"  最集中的头: Head {min_entropy_head}")

# 使用BERT的示例
from transformers import BertTokenizer, BertForSequenceClassification

def demo_attention_visualization():
    """演示注意力可视化"""
    # 加载预训练模型
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    model = BertForSequenceClassification.from_pretrained(
        'bert-base-uncased',
        output_attentions=True
    )

    # 输入文本
    text = "The cat sat on the mat"
    inputs = tokenizer(text, return_tensors='pt')
    tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])

    # 前向传播
    with torch.no_grad():
        outputs = model(**inputs)

    # 获取注意力权重
    attentions = outputs.attentions  # tuple of (layers) tensors

    # 可视化第一层第一个头
    attn = attentions[0][0, 0].numpy()  # (seq_len, seq_len)

    plt.figure(figsize=(10, 8))
    sns.heatmap(
        attn,
        xticklabels=tokens,
        yticklabels=tokens,
        cmap='viridis',
        annot=True,
        fmt='.2f'
    )
    plt.title('BERT Attention - Layer 0, Head 0')
    plt.tight_layout()
    plt.show()

    print(f"输入token: {tokens}")
    print(f"[CLS]的注意力分布:")
    for token, weight in zip(tokens, attn[0]):
        print(f"  → {token}: {weight:.3f}")

Grad-CAM (Gradient-weighted Class Activation Mapping)

Python
import torch
import torch.nn.functional as F
import cv2
import numpy as np
import matplotlib.pyplot as plt

class GradCAM:
    """Grad-CAM可视化"""

    def __init__(self, model, target_layer):
        """
        Grad-CAM: 使用梯度加权特征图来定位重要区域

        核心公式:
        L_gradcam = ReLU(Σ α_k * A^k)
        其中 α_k = GAP(∂Y/∂A^k)
        """
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None

        # 注册钩子
        self._register_hooks()

    def _register_hooks(self):
        """注册前向和后向钩子"""
        def forward_hook(module, input, output):
            self.activations = output.detach()

        def backward_hook(module, grad_input, grad_output):
            self.gradients = grad_output[0].detach()

        self.target_layer.register_forward_hook(forward_hook)
        self.target_layer.register_full_backward_hook(backward_hook)

    def generate_cam(self, input_image, target_class=None):
        """
        生成CAM热力图

        input_image: (1, C, H, W)
        """
        self.model.eval()  # eval()开启评估模式(关闭Dropout等)

        # 前向传播
        output = self.model(input_image)

        if target_class is None:
            target_class = output.argmax(dim=1).item()

        # 反向传播
        self.model.zero_grad()
        one_hot = torch.zeros_like(output)
        one_hot[0, target_class] = 1
        output.backward(gradient=one_hot, retain_graph=True)  # 反向传播计算梯度

        # 计算权重
        gradients = self.gradients  # (1, C, H, W)
        activations = self.activations  # (1, C, H, W)

        # 全局平均池化得到权重
        weights = gradients.mean(dim=(2, 3), keepdim=True)  # (1, C, 1, 1)

        # 加权特征图
        cam = (weights * activations).sum(dim=1, keepdim=True)  # (1, 1, H, W)
        cam = F.relu(cam)

        # 归一化
        cam = cam - cam.min()
        cam = cam / (cam.max() + 1e-8)

        # 上采样到原图大小
        cam = F.interpolate(
            cam,
            size=input_image.shape[2:],
            mode='bilinear',
            align_corners=False
        )

        return cam.squeeze().cpu().numpy(), target_class  # squeeze去除大小为1的维度

    def visualize(self, input_image, cam, alpha=0.5):
        """可视化CAM"""
        # 转换为numpy
        img = input_image.squeeze().permute(1, 2, 0).cpu().numpy()
        img = (img - img.min()) / (img.max() - img.min())

        # 应用颜色映射
        heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
        heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
        heatmap = heatmap / 255.0

        # 叠加
        overlay = img * (1 - alpha) + heatmap * alpha
        overlay = np.clip(overlay, 0, 1)

        # 显示
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))

        axes[0].imshow(img)
        axes[0].set_title('Original Image')
        axes[0].axis('off')

        axes[1].imshow(heatmap)
        axes[1].set_title('Grad-CAM Heatmap')
        axes[1].axis('off')

        axes[2].imshow(overlay)
        axes[2].set_title('Overlay')
        axes[2].axis('off')

        plt.tight_layout()
        plt.show()

# 使用示例
from torchvision.models import resnet50, ResNet50_Weights
from torchvision.transforms import Compose, Resize, ToTensor, Normalize
from PIL import Image

def demo_gradcam():
    """演示Grad-CAM"""
    # 加载模型
    model = resnet50(weights=ResNet50_Weights.DEFAULT)
    model.eval()

    # 选择目标层(最后一个卷积层)
    target_layer = model.layer4[-1].conv3

    grad_cam = GradCAM(model, target_layer)

    # 预处理图像
    transform = Compose([
        Resize((224, 224)),
        ToTensor(),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # 加载图像(使用随机图像作为示例)
    img = torch.randn(1, 3, 224, 224)

    # 生成CAM
    cam, pred_class = grad_cam.generate_cam(img)

    print(f"预测类别: {pred_class}")
    print(f"CAM形状: {cam.shape}")
    print(f"CAM值范围: [{cam.min():.3f}, {cam.max():.3f}]")

🎯 因果推断基础

因果 vs 相关

Text Only
核心区别:

相关性 (Correlation):
- A和B同时发生
- 可能有共同原因C
- 可能是巧合
- 不能用于干预决策

因果性 (Causation):
- A导致B发生
- 改变A会改变B
- 可以预测干预效果
- 支持决策制定

经典例子:
- 冰淇淋销量 ↔ 溺水事故 (相关但非因果)
  共同原因:天气炎热

- 吸烟 → 肺癌 (因果关系)
  干预:戒烟可以降低肺癌风险

因果推断框架

Python
class CausalInferenceBasics:
    """因果推断基础概念"""

    def __init__(self):
        pass

    @staticmethod  # @staticmethod静态方法,无需实例即可调用
    def explain_potential_outcomes():
        """
        潜在结果框架 (Rubin Causal Model)

        对于个体i:
        - Y_i(1): 接受处理的结果
        - Y_i(0): 不接受处理的结果

        个体处理效应: τ_i = Y_i(1) - Y_i(0)

        根本问题:无法同时观察到Y_i(1)和Y_i(0)
        """
        print("=" * 50)
        print("潜在结果框架")
        print("=" * 50)
        print("""
核心概念:
1. 处理变量 T ∈ {0, 1}
2. 潜在结果 Y(0), Y(1)
3. 观察结果 Y = T·Y(1) + (1-T)·Y(0)

因果效应度量:
- 平均处理效应 (ATE): E[Y(1) - Y(0)]
- 处理组平均效应 (ATT): E[Y(1) - Y(0) | T=1]
- 条件平均效应 (CATE): E[Y(1) - Y(0) | X=x]
        """)

    @staticmethod
    def explain_causal_graphs():
        """
        因果图 (Pearl's Causal Graph)

        用有向无环图(DAG)表示因果关系
        """
        print("=" * 50)
        print("因果图模型")
        print("=" * 50)
        print("""
基本结构:

1. 链式结构 (Chain): X → Z → Y
   - X间接影响Y
   - 控制Z会阻断路径

2. 分叉结构 (Fork): X ← Z → Y
   - Z是混杂因子
   - 控制Z消除混杂

3. 对撞结构 (Collider): X → Z ← Y
   - 控制Z会打开路径
   - 不应控制对撞变量

d-分离:判断条件独立性的图形准则
        """)

# 因果图示例(使用文本表示)
causal_graph_examples = """
示例1:药物试验
    年龄 → 药物 ← 病情严重程度
      ↓      ↓
    血压变化 ← ?

混杂因子:年龄、病情严重程度
需要控制这些变量来估计药物的真实效果

示例2:工作培训项目
    教育水平 → 工作培训 ← 动机
       ↓           ↓
    收入 ← ?

选择偏差:只有有动机的人才会参加培训
需要处理选择偏差
"""

工具变量法

Python
import numpy as np
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression

class InstrumentalVariables:
    """工具变量法"""

    def __init__(self):
        pass

    def explain_iv(self):
        """
        工具变量 (IV) 方法

        当存在未观察到的混杂因子时,
        工具变量可以帮助识别因果效应

        工具变量Z的条件:
        1. 相关性:Z与处理变量T相关
        2. 排他性:Z只通过T影响Y
        3. 无混杂:Z与Y的误差项无关
        """
        print("=" * 50)
        print("工具变量法")
        print("=" * 50)
        print("""
两阶段最小二乘法 (2SLS):

阶段1: T = π₀ + π₁Z + ε
        用Z预测T

阶段2: Y = β₀ + β₁T̂ + u
        用预测的T̂估计对Y的影响

β₁就是因果效应的IV估计
        """)

    def iv_estimation(self, Z, T, Y):
        """
        工具变量估计

        Z: 工具变量 (n,)
        T: 处理变量 (n,)
        Y: 结果变量 (n,)
        """
        # 阶段1: Z → T
        Z_with_const = sm.add_constant(Z)
        stage1 = sm.OLS(T, Z_with_const).fit()
        T_hat = stage1.predict(Z_with_const)

        print("阶段1结果 (Z → T):")
        print(f"  系数: {stage1.params[1]:.4f}")
        print(f"  R²: {stage1.rsquared:.4f}")

        # 阶段2: T̂ → Y
        T_hat_with_const = sm.add_constant(T_hat)
        stage2 = sm.OLS(Y, T_hat_with_const).fit()

        print("\n阶段2结果 (T̂ → Y):")
        print(f"  因果效应估计: {stage2.params[1]:.4f}")
        print(f"  标准误: {stage2.bse[1]:.4f}")
        print(f"  p值: {stage2.pvalues[1]:.4f}")

        return stage2.params[1], stage2.bse[1]

    def generate_iv_data(self, n=1000, true_effect=2.0):
        """生成工具变量示例数据"""
        np.random.seed(42)

        # 未观察到的混杂因子
        U = np.random.normal(0, 1, n)

        # 工具变量(随机分配)
        Z = np.random.binomial(1, 0.5, n)

        # 处理变量(受Z和U影响)
        T = 0.5 * Z + 0.8 * U + np.random.normal(0, 0.5, n)

        # 结果变量(受T和U影响)
        Y = true_effect * T + 1.5 * U + np.random.normal(0, 1, n)

        return Z, T, Y, true_effect

# 演示
iv = InstrumentalVariables()
iv.explain_iv()
Z, T, Y, true_effect = iv.generate_iv_data()
estimated_effect, se = iv.iv_estimation(Z, T, Y)
print(f"\n真实效应: {true_effect:.4f}")
print(f"估计效应: {estimated_effect:.4f}")

双重差分法 (DID)

Python
import pandas as pd
import statsmodels.formula.api as smf

class DifferenceInDifferences:
    """双重差分法"""

    def __init__(self):
        pass

    def explain_did(self):
        """
        双重差分法 (Difference-in-Differences)

        利用面板数据,比较处理组和对照组
        在处理前后的变化差异

        关键假设:平行趋势假设
        - 如果没有处理,处理组和对照组的趋势应该平行
        """
        print("=" * 50)
        print("双重差分法 (DID)")
        print("=" * 50)
        print("""
DID估计量:

效应 = (Y_treatment_post - Y_treatment_pre)
     - (Y_control_post - Y_control_pre)

回归形式:
Y = β₀ + β₁·Treat + β₂·Post + β₃·(Treat×Post) + ε

β₃就是DID估计的因果效应
        """)

    def generate_did_data(self, n_units=1000, n_periods=2, true_effect=3.0):
        """生成DID示例数据"""
        np.random.seed(42)

        data = []

        # 处理组
        for i in range(n_units // 2):
            # 处理前
            y_pre = 10 + np.random.normal(0, 1)
            data.append({
                'unit': i,
                'period': 0,
                'treat': 1,
                'post': 0,
                'y': y_pre
            })
            # 处理后(接受处理)
            y_post = 10 + true_effect + np.random.normal(0, 1)
            data.append({
                'unit': i,
                'period': 1,
                'treat': 1,
                'post': 1,
                'y': y_post
            })

        # 对照组
        for i in range(n_units // 2, n_units):
            # 处理前
            y_pre = 8 + np.random.normal(0, 1)
            data.append({
                'unit': i,
                'period': 0,
                'treat': 0,
                'post': 0,
                'y': y_pre
            })
            # 处理后(未接受处理)
            y_post = 8 + 0.5 + np.random.normal(0, 1)  # 自然增长0.5
            data.append({
                'unit': i,
                'period': 1,
                'treat': 0,
                'post': 1,
                'y': y_post
            })

        return pd.DataFrame(data)

    def estimate_did(self, df):
        """估计DID模型"""
        # 创建交互项
        df['treat_post'] = df['treat'] * df['post']

        # 回归
        model = smf.ols('y ~ treat + post + treat_post', data=df).fit()

        print("\nDID回归结果:")
        print(model.summary().tables[1])

        # 提取DID估计量
        did_estimate = model.params['treat_post']
        did_se = model.bse['treat_post']

        print(f"\nDID估计的因果效应: {did_estimate:.4f} (SE: {did_se:.4f})")

        return did_estimate, did_se

    def check_parallel_trends(self, df):
        """检验平行趋势假设"""
        # 需要多期数据才能检验
        print("\n平行趋势假设检验:")
        print("需要处理前多期数据来验证趋势是否平行")
        print("方法:加入处理前各期虚拟变量与处理组的交互项")
        print("如果都不显著,则支持平行趋势假设")

# 演示
did = DifferenceInDifferences()
did.explain_did()
df = did.generate_did_data()
effect, se = did.estimate_did(df)
print(f"\n真实效应: 3.0")
print(f"估计效应: {effect:.4f}")

倾向得分匹配 (PSM)

Python
from sklearn.neighbors import NearestNeighbors
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

class PropensityScoreMatching:
    """倾向得分匹配"""

    def __init__(self):
        pass

    def explain_psm(self):
        """
        倾向得分匹配 (Propensity Score Matching)

        核心思想:
        - 估计每个个体接受处理的概率(倾向得分)
        - 匹配倾向得分相似的个体
        - 比较匹配后的处理组和对照组

        假设:
        - 无未观察到的混杂因子
        - 共同支撑域(有重叠的倾向得分分布)
        """
        print("=" * 50)
        print("倾向得分匹配 (PSM)")
        print("=" * 50)
        print("""
步骤:
1. 用Logistic回归估计倾向得分
   e(X) = P(T=1|X)

2. 匹配算法
   - 最近邻匹配
   - 卡尺匹配
   - 核匹配

3. 平衡性检验
   - 匹配后协变量应该平衡

4. 估计处理效应
   - 比较匹配后的结果差异
        """)

    def estimate_propensity_scores(self, X, T):
        """估计倾向得分"""
        # 标准化
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)

        # Logistic回归
        ps_model = LogisticRegression(max_iter=1000)
        ps_model.fit(X_scaled, T)

        # 预测倾向得分
        propensity_scores = ps_model.predict_proba(X_scaled)[:, 1]

        return propensity_scores, ps_model

    def match_samples(self, X, T, Y, propensity_scores, caliper=0.05):
        """
        卡尺内最近邻匹配

        为每个处理组样本在对照组中找到
        倾向得分最接近的匹配
        """
        # 分离处理组和对照组
        treated_idx = np.where(T == 1)[0]
        control_idx = np.where(T == 0)[0]

        treated_ps = propensity_scores[treated_idx]
        control_ps = propensity_scores[control_idx]

        # 为对照组构建最近邻搜索器
        nbrs = NearestNeighbors(n_neighbors=1).fit(control_ps.reshape(-1, 1))

        matched_pairs = []

        for i, ps in enumerate(treated_ps):
            distances, indices = nbrs.kneighbors([[ps]])
            distance = distances[0][0]
            match_idx = control_idx[indices[0][0]]

            # 卡尺检查
            if distance <= caliper:
                matched_pairs.append((treated_idx[i], match_idx))

        print(f"匹配对数: {len(matched_pairs)} / {len(treated_idx)}")

        return matched_pairs

    def estimate_ate(self, Y, matched_pairs):
        """估计平均处理效应"""
        treated_outcomes = [Y[i] for i, _ in matched_pairs]
        control_outcomes = [Y[j] for _, j in matched_pairs]

        ate = np.mean(treated_outcomes) - np.mean(control_outcomes)

        # 标准误(简化计算)
        n = len(matched_pairs)
        se = np.sqrt(np.var(treated_outcomes) / n + np.var(control_outcomes) / n)

        print(f"\nPSM估计的ATE: {ate:.4f} (SE: {se:.4f})")

        return ate, se

    def check_balance(self, X, T, matched_pairs):
        """检验匹配后的平衡性"""
        print("\n平衡性检验:")
        print("匹配前 vs 匹配后的标准化差异")

        for col in range(X.shape[1]):
            # 匹配前
            treated_mean_pre = X[T==1, col].mean()
            control_mean_pre = X[T==0, col].mean()
            pooled_std_pre = np.sqrt((X[T==1, col].var() + X[T==0, col].var()) / 2)
            std_diff_pre = abs(treated_mean_pre - control_mean_pre) / pooled_std_pre

            # 匹配后
            treated_idx = [i for i, _ in matched_pairs]
            control_idx = [j for _, j in matched_pairs]
            treated_mean_post = X[treated_idx, col].mean()
            control_mean_post = X[control_idx, col].mean()
            pooled_std_post = np.sqrt((X[treated_idx, col].var() + X[control_idx, col].var()) / 2)
            std_diff_post = abs(treated_mean_post - control_mean_post) / pooled_std_post

            print(f"  变量{col}: 匹配前={std_diff_pre:.3f}, 匹配后={std_diff_post:.3f}")

# 演示
def demo_psm():
    np.random.seed(42)
    n = 1000

    # 协变量
    X1 = np.random.normal(0, 1, n)
    X2 = np.random.normal(0, 1, n)
    X = np.column_stack([X1, X2])

    # 处理变量(受协变量影响)
    propensity = 1 / (1 + np.exp(-(0.5 * X1 + 0.3 * X2)))
    T = np.random.binomial(1, propensity)

    # 结果变量
    Y = 2 * T + 1.5 * X1 + 0.8 * X2 + np.random.normal(0, 1, n)

    psm = PropensityScoreMatching()
    psm.explain_psm()

    ps, model = psm.estimate_propensity_scores(X, T)
    matched_pairs = psm.match_samples(X, T, Y, ps)
    ate, se = psm.estimate_ate(Y, matched_pairs)
    psm.check_balance(X, T, matched_pairs)

    print(f"\n真实效应: 2.0")
    print(f"估计效应: {ate:.4f}")

demo_psm()

🧠 因果发现

基于约束的方法 (PC算法)

Python
class CausalDiscovery:
    """因果发现算法"""

    def __init__(self):
        pass

    def explain_pc_algorithm(self):
        """
        PC算法 (Peter-Clark)

        基于条件独立性检验发现因果结构

        步骤:
        1. 构建完全无向图
        2. 基于条件独立性去除边
        3. 确定V结构
        4. 定向更多边
        """
        print("=" * 50)
        print("PC算法")
        print("=" * 50)
        print("""
核心思想:
- 如果X和Y在给定Z的条件下独立,
  则它们之间没有直接因果关系

步骤:
1. 骨架学习:找到条件独立的变量对
2. V结构识别:X-Z-Y且X和Y不独立
3. 边定向:传播方向约束

假设:
- 因果充分性(无未观察到的混杂因子)
- 忠实性(条件独立反映d-分离)
        """)

    def conditional_independence_test(self, X, Y, Z=None, alpha=0.05):
        """
        条件独立性检验

        使用偏相关检验
        """
        from scipy import stats

        if Z is None or len(Z) == 0:
            # 简单相关检验
            corr, p_value = stats.pearsonr(X, Y)
        else:
            # 偏相关检验
            from sklearn.linear_model import LinearRegression

            # X对Z回归
            reg_x = LinearRegression().fit(Z, X)
            resid_x = X - reg_x.predict(Z)

            # Y对Z回归
            reg_y = LinearRegression().fit(Z, Y)
            resid_y = Y - reg_y.predict(Z)

            # 残差相关
            corr, p_value = stats.pearsonr(resid_x, resid_y)

        independent = p_value > alpha
        return independent, p_value

# 使用pgmpy库进行因果发现
def demo_causal_discovery():
    """演示因果发现"""
    try:  # try/except捕获异常,防止程序崩溃
        from pgmpy.estimators import PC
        from pgmpy.models import BayesianNetwork
        import pandas as pd

        # 生成示例数据
        np.random.seed(42)
        n = 1000

        # X → Y → Z
        X = np.random.normal(0, 1, n)
        Y = 0.5 * X + np.random.normal(0, 0.5, n)
        Z = 0.7 * Y + np.random.normal(0, 0.5, n)

        data = pd.DataFrame({'X': X, 'Y': Y, 'Z': Z})

        # PC算法
        pc = PC(data)
        model = pc.estimate()

        print("发现的因果结构:")
        print(model.edges())

    except ImportError:
        print("需要安装pgmpy: pip install pgmpy")

🔬 前沿方法

大语言模型的可解释性

Python
class LLMInterpretability:
    """
    大语言模型可解释性方法

    2024-2025年研究热点
    """

    def logit_lens(self, model, input_ids, layer_idx):
        """
        Logit Lens:观察中间层的预测

        通过将中间层表示投影到词表,
        观察模型在各层的"思考过程"
        """
        with torch.no_grad():
            outputs = model(input_ids, output_hidden_states=True)
            hidden = outputs.hidden_states[layer_idx]

            # 使用输出嵌入矩阵投影
            logits = hidden @ model.lm_head.weight.T
            probs = torch.softmax(logits, dim=-1)

            # 获取top预测
            top_probs, top_ids = torch.topk(probs[0, -1], k=5)

        return top_ids, top_probs

    def attention_pattern_analysis(self, attention_weights, tokens):
        """
        注意力模式分析

        识别induction heads、previous token heads等
        """
        # Induction head检测:关注[ A ] [ B ] ... [ A ] → [ B ] 模式
        seq_len = attention_weights.shape[-1]

        patterns = {
            'induction_heads': [],
            'previous_token_heads': [],
            'duplicate_token_heads': []
        }

        for head_idx in range(attention_weights.shape[0]):
            attn = attention_weights[head_idx]

            # 检测previous token head(关注前一个token)
            prev_token_score = torch.diag(attn, offset=-1).mean()
            if prev_token_score > 0.5:
                patterns['previous_token_heads'].append(head_idx)

            # 检测duplicate token head(关注重复token)
            for i in range(seq_len):
                for j in range(i):
                    if tokens[i] == tokens[j] and attn[i, j] > 0.3:
                        patterns['duplicate_token_heads'].append(head_idx)
                        break

        return patterns

    def steering_vector(self, model, layer_idx, concept_examples):
        """
        方向操控(Representation Engineering)

        通过添加方向向量来操控模型行为
        """
        # 收集概念的正负例表示
        positive_reps = []
        negative_reps = []

        for text, label in concept_examples:
            inputs = tokenizer(text, return_tensors='pt')
            with torch.no_grad():
                outputs = model(**inputs, output_hidden_states=True)
                rep = outputs.hidden_states[layer_idx][0, -1]

                if label == 1:
                    positive_reps.append(rep)
                else:
                    negative_reps.append(rep)

        # 计算方向向量
        positive_mean = torch.stack(positive_reps).mean(dim=0)
        negative_mean = torch.stack(negative_reps).mean(dim=0)
        steering_vec = positive_mean - negative_mean

        return steering_vec / steering_vec.norm()

class MechanisticInterpretability:
    """
    机械可解释性

    尝试逆向工程神经网络的内部机制
    """

    def circuit_tracing(self, model, input_text, target_output):
        """
        电路追踪:识别参与特定任务的子网络

        使用激活修补(activation patching)技术
        """
        # 1. 记录clean运行的激活
        clean_cache = {}
        def hook_fn(name):
            def fn(module, input, output):
                clean_cache[name] = output.detach()
            return fn

        # 2. 运行corrupted输入
        # 3. 将clean激活patch到corrupted运行
        # 4. 测量对输出的影响

        pass

    def superposition_analysis(self, weight_matrix, num_features):
        """
        超位置分析

        理解神经网络如何编码比维度更多的特征
        """
        # SVD分解
        U, S, V = torch.svd(weight_matrix)

        # 分析特征值分布
        explained_variance = (S ** 2) / (S ** 2).sum()

        return {
            'singular_values': S,
            'explained_variance': explained_variance,
            'effective_dimensionality': (explained_variance > 0.01).sum()
        }

因果机器学习 (Causal Machine Learning)

Python
class CausalML:
    """
    因果机器学习

    结合机器学习与因果推断的前沿方法
    """

    def __init__(self):
        pass

    def causal_forest(self, X, T, Y):
        """
        因果森林 (Causal Forest)

        估计异质性处理效应 (CATE)
        """
        # econml 0.14+ 使用 CausalForestDML(原 econml.grf.CausalForest 已弃用)
        from econml.dml import CausalForestDML

        # 训练因果森林
        est = CausalForestDML(
            n_estimators=100,
            max_depth=5,
            min_samples_leaf=10
        )

        est.fit(Y, T, X=X)

        # 预测个体处理效应
        treatment_effects = est.effect(X)

        return est, treatment_effects

    def double_machine_learning(self, X, T, Y):
        """
        双重机器学习 (Double Machine Learning)

        使用ML估计干扰函数,获得无偏的ATE估计
        """
        from econml.dml import LinearDML

        # 使用随机森林作为干扰函数估计器
        from sklearn.ensemble import RandomForestRegressor

        est = LinearDML(
            model_y=RandomForestRegressor(),
            model_t=RandomForestRegressor(),
            cv=5
        )

        est.fit(Y, T, X=X)

        # 平均处理效应(econml 0.14+ 使用方法调用而非属性)
        ate = est.ate(X)
        ate_interval = est.ate_interval(X)

        return ate, ate_interval

    def causal_representation_learning(self, X, T, Y):
        """
        因果表示学习

        学习对干预不变的表示
        """
        import torch.nn as nn

        class CausalRepresentationNet(nn.Module):  # 继承nn.Module定义神经网络层
            def __init__(self, input_dim, hidden_dim, repr_dim):
                super().__init__()  # super()调用父类方法

                # 编码器:学习因果表示
                self.encoder = nn.Sequential(
                    nn.Linear(input_dim, hidden_dim),
                    nn.ReLU(),
                    nn.Linear(hidden_dim, repr_dim)
                )

                # 处理效应预测头
                self.treatment_head = nn.Linear(repr_dim, 1)

                # 结果预测头
                self.outcome_head = nn.Sequential(
                    nn.Linear(repr_dim + 1, hidden_dim),  # +1 for treatment
                    nn.ReLU(),
                    nn.Linear(hidden_dim, 1)
                )

            def forward(self, x, t):
                # 学习表示
                z = self.encoder(x)

                # 预测处理效应
                treatment_effect = self.treatment_head(z)

                # 预测结果
                outcome_input = torch.cat([z, t.unsqueeze(-1)], dim=-1)
                outcome = self.outcome_head(outcome_input)

                return z, treatment_effect, outcome

        return CausalRepresentationNet

    def counterfactual_generation(self, model, x, t, t_counterfactual):
        """
        反事实样本生成

        使用生成模型生成反事实结果
        """
        # 使用CVAE或扩散模型生成反事实
        # 输入:原始特征x,原始处理t,目标处理t_counterfactual
        # 输出:反事实结果y_cf

        pass

class AutomatedCausalDiscovery:
    """
    自动因果发现

    从数据自动发现因果结构
    """

    def __init__(self):
        pass

    def lingam(self, data):
        """
        LiNGAM: 非高斯数据的线性因果模型

        利用非高斯性识别因果方向
        """
        try:
            from causallearn.search.FCMBased import lingam

            model = lingam.ICALiNGAM()
            model.fit(data)

            # 因果矩阵
            causal_matrix = model.adjacency_matrix_

            return causal_matrix
        except ImportError:
            print("需要安装causallearn: pip install causallearn")
            return None

    def notears(self, data):
        """
        NOTEARS: 基于梯度的因果发现

        使用可微分优化学习DAG
        """
        try:
            from notears.linear import notears_linear

            # 学习加权邻接矩阵
            W = notears_linear(data, lambda1=0.1, loss_type='l2')

            # 阈值化得到DAG
            A = (np.abs(W) > 0.3).astype(int)

            return A, W
        except ImportError:
            print("需要安装notears: pip install notears")
            return None, None

可信AI与公平性

Python
class TrustworthyAI:
    """
    可信AI:公平性、鲁棒性、隐私保护
    """

    def fairness_metrics(self, y_true, y_pred, sensitive_attr):
        """
        公平性评估指标
        """
        from fairlearn.metrics import demographic_parity_difference, equalized_odds_difference

        # 人口统计均等差异
        dp_diff = demographic_parity_difference(
            y_true, y_pred, sensitive_features=sensitive_attr
        )

        # 机会均等差异
        eo_diff = equalized_odds_difference(
            y_true, y_pred, sensitive_features=sensitive_attr
        )

        return {
            'demographic_parity': dp_diff,
            'equalized_odds': eo_diff
        }

    def adversarial_debiasing(self, model, X, y, sensitive_attr):
        """
        对抗性去偏

        通过对抗训练消除敏感属性影响
        """
        import torch.nn as nn

        class AdversarialDebiasingNet(nn.Module):
            def __init__(self, input_dim, hidden_dim, output_dim):
                super().__init__()

                # 主预测器
                self.predictor = nn.Sequential(
                    nn.Linear(input_dim, hidden_dim),
                    nn.ReLU(),
                    nn.Linear(hidden_dim, output_dim)
                )

                # 对抗分类器(预测敏感属性)
                self.adversary = nn.Sequential(
                    nn.Linear(output_dim, hidden_dim // 2),
                    nn.ReLU(),
                    nn.Linear(hidden_dim // 2, 1)
                )

            def forward(self, x):
                # 主预测
                prediction = self.predictor(x)

                # 梯度反转层
                reversed = GradientReversal.apply(prediction)

                # 对抗预测
                sensitive_pred = self.adversary(reversed)

                return prediction, sensitive_pred

        return AdversarialDebiasingNet

class GradientReversal(torch.autograd.Function):
    """梯度反转层"""

    @staticmethod
    def forward(ctx, x):
        return x.view_as(x)

    @staticmethod
    def backward(ctx, grad_output):
        return -grad_output

💡 总结

Text Only
可解释AI与因果推断核心要点:

┌─────────────────────────────────────────────────────────────┐
│ 可解释AI (XAI)                                              │
├─────────────────────────────────────────────────────────────┤
│ 1. 内在可解释模型                                            │
│    - 线性模型:权重直接可解释                                │
│    - 决策树:决策路径透明                                    │
├─────────────────────────────────────────────────────────────┤
│ 2. 事后解释方法                                              │
│    - SHAP:基于博弈论的全局+局部解释                         │
│    - LIME:局部线性近似                                      │
│    - Grad-CAM:CNN可视化                                     │
│    - 注意力可视化:Transformer解释                           │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ 因果推断                                                    │
├─────────────────────────────────────────────────────────────┤
│ 1. 基础概念                                                  │
│    - 潜在结果框架                                            │
│    - 因果图模型                                              │
│    - 混杂因子与选择偏差                                      │
├─────────────────────────────────────────────────────────────┤
│ 2. 识别策略                                                  │
│    - 工具变量 (IV):处理内生性问题                           │
│    - 双重差分 (DID):利用面板数据                            │
│    - 倾向得分匹配 (PSM):控制观察到的混杂                    │
├─────────────────────────────────────────────────────────────┤
│ 3. 因果发现                                                  │
│    - PC算法:基于条件独立性                                  │
│    - 从数据中发现因果结构                                    │
└─────────────────────────────────────────────────────────────┘

实践建议:
1. 优先使用内在可解释模型
2. 对黑盒模型使用多种解释方法交叉验证
3. 因果推断需要清晰的识别策略
4. 注意检验关键假设(平行趋势、工具变量有效性等)
5. 可解释性和因果性结合,构建可信AI系统

恭喜你完成了全部24章的学习! 🎉

你已经建立了从基础到前沿的完整机器学习知识体系。继续实践、探索和创新!

祝你学习愉快! 🚀