23 - 可解释AI与因果推断¶
🔍 可解释AI (XAI) 概述¶
为什么需要可解释性¶
Text Only
模型可解释性的重要性:
┌─────────────────────────────────────────────────────────────┐
│ 场景 │ 解释性需求 │
├─────────────────────────────────────────────────────────────┤
│ 医疗诊断 │ 医生需要理解决策依据 │
│ 金融风控 │ 监管要求解释拒绝原因 │
│ 司法判决 │ 必须提供可审计的决策逻辑 │
│ 自动驾驶 │ 事故分析需要追溯决策过程 │
│ 科学研究 │ 从模型中发现新知识 │
└─────────────────────────────────────────────────────────────┘
黑盒问题:
- 深度学习模型参数量巨大
- 决策过程难以直观理解
- 无法验证模型是否学到正确特征
可解释性类型¶
Text Only
┌─────────────────────────────────────────────────────────────┐
│ 可解释性分类 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 按解释时机 按解释范围 按解释方法 │
│ ├─ 内在可解释 ├─ 全局解释 ├─ 基于特征 │
│ │ (模型本身透明) │ (整体行为) │ (特征重要性) │
│ │ │ │ │
│ └─ 事后解释 └─ 局部解释 └─ 基于样本 │
│ (解释黑盒) (单个预测) (反事实解释) │
│ │
└─────────────────────────────────────────────────────────────┘
🧮 内在可解释模型¶
线性模型解释¶
Python
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.datasets import make_classification, make_regression
class LinearModelInterpretation:
    """Interpretability helpers for linear models.

    Linear models are intrinsically interpretable: every coefficient
    directly quantifies the (conditional) effect of one feature.
    """

    def __init__(self):
        pass

    def explain_linear_regression(self, X, y, feature_names=None):
        """
        Fit and explain a linear regression.

        y = w1*x1 + ... + wn*xn + b; each weight is the marginal effect of
        its feature.  Importances are |weights| of a refit on standardized
        features so magnitudes are comparable across feature scales.

        Args:
            X: feature matrix, shape (n_samples, n_features).
            y: regression target, shape (n_samples,).
            feature_names: optional display names; auto-generated if None.

        Returns:
            (model fitted on raw features, standardized |weight| importances)
        """
        model = LinearRegression()
        model.fit(X, y)
        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]
        # FIX: guard against zero-variance (constant) columns, which would
        # otherwise produce a division by zero (NaN/inf) when standardizing.
        std = X.std(axis=0)
        std = np.where(std == 0, 1.0, std)
        X_std = (X - X.mean(axis=0)) / std
        model_std = LinearRegression()
        model_std.fit(X_std, y)
        importance = np.abs(model_std.coef_)
        print("=" * 50)
        print("线性回归解释")
        print("=" * 50)
        print(f"\n方程: y = {' + '.join([f'{w:.3f}*{name}' for w, name in zip(model.coef_, feature_names)])} + {model.intercept_:.3f}")
        print(f"\n特征重要性(标准化权重绝对值):")
        for name, imp in zip(feature_names, importance):
            print(f" {name}: {imp:.3f}")
        return model, importance

    def explain_logistic_regression(self, X, y, feature_names=None):
        """
        Fit and explain a logistic regression.

        log(p/(1-p)) = w1*x1 + ... + wn*xn + b; each weight is the change
        in log-odds per unit of the feature, so exp(w) is an odds ratio.

        Returns:
            (fitted model, per-feature odds ratios)
        """
        model = LogisticRegression(max_iter=1000)
        model.fit(X, y)
        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]
        # Odds ratio: multiplicative change in the odds per +1 unit of a feature.
        odds_ratios = np.exp(model.coef_[0])
        print("=" * 50)
        print("逻辑回归解释")
        print("=" * 50)
        print(f"\n对数几率方程: log(p/(1-p)) = {' + '.join([f'{w:.3f}*{name}' for w, name in zip(model.coef_[0], feature_names)])} + {model.intercept_[0]:.3f}")
        print(f"\n优势比 (Odds Ratio) - 特征增加1单位,优势变化倍数:")
        for name, or_val in zip(feature_names, odds_ratios):
            direction = "增加" if or_val > 1 else "减少"
            print(f" {name}: {or_val:.3f} ({direction} {abs(or_val-1)*100:.1f}%)")
        return model, odds_ratios
# Demo: explain a linear regression and a logistic regression on synthetic data.
X_reg, y_reg = make_regression(n_samples=100, n_features=3, noise=10, random_state=42)
interpreter = LinearModelInterpretation()
interpreter.explain_linear_regression(X_reg, y_reg, ['Age', 'Income', 'Education'])
X_cls, y_cls = make_classification(n_samples=100, n_features=3, n_classes=2, random_state=42)
interpreter.explain_logistic_regression(X_cls, y_cls, ['Age', 'Income', 'Education'])
决策树解释¶
Python
from sklearn.tree import DecisionTreeClassifier, export_text, plot_tree
from sklearn.ensemble import RandomForestClassifier
import graphviz
class TreeModelInterpretation:
    """Interpretability helpers for tree-based models."""

    def explain_decision_tree(self, X, y, feature_names=None, max_depth=3):
        """
        Fit a shallow decision tree and explain it.

        Trees are intrinsically interpretable: the decision path is fully
        transparent, the structure can be visualized, and feature
        importances are intuitive.

        Prints the extracted if/else rules, the Gini importances, and the
        decision path for sample 0.  Returns the fitted model.
        """
        model = DecisionTreeClassifier(max_depth=max_depth, random_state=42)
        model.fit(X, y)
        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]
        print("=" * 50)
        print("决策树解释")
        print("=" * 50)
        # Human-readable decision rules for the whole tree.
        tree_rules = export_text(model, feature_names=feature_names)
        print("\n决策规则:")
        print(tree_rules)
        # Impurity-based (Gini) feature importances.
        importance = model.feature_importances_
        print("\n特征重要性 (Gini重要性):")
        for name, imp in zip(feature_names, importance):
            print(f" {name}: {imp:.3f}")
        # Walk the decision path for one concrete sample.
        sample = X[0]
        self._explain_prediction_path(model, sample, feature_names)
        return model

    def _explain_prediction_path(self, model, sample, feature_names):
        """Print the root-to-leaf decision path for a single sample."""
        node_indicator = model.decision_path([sample])
        leaf_id = model.apply([sample])[0]
        print("\n决策路径解释:")
        # Indices of the nodes visited by this sample (CSR sparse layout).
        node_index = node_indicator.indices[node_indicator.indptr[0]:node_indicator.indptr[1]]
        for node_id in node_index:
            if node_id == leaf_id:
                print(f" → 到达叶节点 {node_id},预测类别: {model.classes_[np.argmax(model.tree_.value[node_id])]}")
            else:
                feature = model.tree_.feature[node_id]
                threshold = model.tree_.threshold[node_id]
                if sample[feature] <= threshold:
                    print(f" → 节点 {node_id}: {feature_names[feature]} = {sample[feature]:.2f} <= {threshold:.2f},走左分支")
                else:
                    print(f" → 节点 {node_id}: {feature_names[feature]} = {sample[feature]:.2f} > {threshold:.2f},走右分支")

    def explain_random_forest(self, X, y, feature_names=None):
        """
        Explain a random forest.

        The ensemble as a whole is a black box, but feature importances
        (and, elsewhere, partial-dependence plots) can still be analyzed.

        Returns (fitted model, mean Gini importances).
        """
        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X, y)
        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]
        print("=" * 50)
        print("随机森林解释")
        print("=" * 50)
        importance = model.feature_importances_
        # Spread of each importance across the individual trees.
        std = np.std([tree.feature_importances_ for tree in model.estimators_], axis=0)
        print("\n特征重要性 (平均Gini重要性):")
        for name, imp, s in zip(feature_names, importance, std):
            print(f" {name}: {imp:.3f} (±{s:.3f})")
        return model, importance
🔬 事后解释方法¶
SHAP (SHapley Additive exPlanations)¶
Python
import shap
from sklearn.ensemble import RandomForestClassifier
class SHAPExplainer:
    """SHAP (SHapley Additive exPlanations) model explanations.

    SHAP is grounded in the game-theoretic Shapley value: a feature's
    contribution is the weighted average of its marginal contributions
    over all feature coalitions, satisfying efficiency, symmetry,
    dummy and additivity.
    """

    def __init__(self, model, X_background):
        """
        Args:
            model: fitted estimator to explain.
            X_background: background sample used by KernelExplainer when
                the model is not supported by TreeExplainer.
        """
        self.model = model
        # FIX: the old dispatch `hasattr(model, 'tree_')` is only true for a
        # single DecisionTree, so tree ENSEMBLES (RandomForest, gradient
        # boosting, XGBoost, ...) silently fell back to the much slower,
        # approximate KernelExplainer.  TreeExplainer itself knows which
        # models it supports — try it first and fall back only on failure.
        try:
            self.explainer = shap.TreeExplainer(model)
        except Exception:
            self.explainer = shap.KernelExplainer(model.predict, X_background)

    def explain_global(self, X, feature_names=None):
        """
        Global explanation: aggregate SHAP values over all rows of X.

        Prints mean |SHAP| importances and shows a summary plot.
        Returns the raw SHAP values.
        """
        shap_values = self.explainer.shap_values(X)
        print("=" * 50)
        print("SHAP全局解释")
        print("=" * 50)
        # Importance = mean absolute SHAP value per feature.
        if isinstance(shap_values, list):  # multiclass: one array per class
            shap_importance = np.abs(shap_values[0]).mean(axis=0)
        else:
            shap_importance = np.abs(shap_values).mean(axis=0)
        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]
        print("\n特征重要性 (|SHAP|平均值):")
        for name, imp in zip(feature_names, shap_importance):
            print(f" {name}: {imp:.4f}")
        plt.figure(figsize=(10, 6))
        shap.summary_plot(shap_values, X, feature_names=feature_names, show=False)
        plt.title("SHAP Summary Plot")
        plt.tight_layout()
        plt.show()
        return shap_values

    def explain_local(self, X, instance_idx=0, feature_names=None):
        """
        Local explanation for one sample:
            prediction = base value + sum(SHAP values).

        Prints feature contributions sorted by magnitude and shows a
        waterfall plot.  Returns the instance's SHAP values.
        """
        shap_values = self.explainer.shap_values(X)
        print("=" * 50)
        print(f"SHAP局部解释 - 样本 {instance_idx}")
        print("=" * 50)
        if feature_names is None:
            feature_names = [f"Feature_{i}" for i in range(X.shape[1])]
        if isinstance(shap_values, list):
            instance_shap = shap_values[0][instance_idx]
        else:
            instance_shap = shap_values[instance_idx]
        # Base value = expected model output over the background data.
        base_value = self.explainer.expected_value
        if isinstance(base_value, list):
            base_value = base_value[0]
        print(f"\n基准值 (期望输出): {base_value:.4f}")
        print(f"实际预测: {base_value + np.sum(instance_shap):.4f}")
        print(f"\n各特征贡献:")
        # Sort features by magnitude of their contribution.
        contributions = list(zip(feature_names, instance_shap, X[instance_idx]))
        contributions.sort(key=lambda x: abs(x[1]), reverse=True)
        for name, shap_val, feat_val in contributions:
            direction = "增加" if shap_val > 0 else "减少"
            print(f" {name} = {feat_val:.3f}: {direction} {abs(shap_val):.4f}")
        plt.figure(figsize=(10, 6))
        shap.waterfall_plot(shap.Explanation(
            values=instance_shap,
            base_values=base_value,
            data=X[instance_idx],
            feature_names=feature_names
        ), show=False)
        plt.title(f"SHAP Waterfall Plot - Instance {instance_idx}")
        plt.tight_layout()
        plt.show()
        return instance_shap
# Usage example: explain a random forest on the breast-cancer dataset.
from sklearn.datasets import load_breast_cancer
data = load_breast_cancer()
X, y = data.data, data.target
feature_names = data.feature_names
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X, y)
explainer = SHAPExplainer(model, X[:100])  # first 100 rows as background data
shap_values = explainer.explain_global(X[:50], feature_names)
explainer.explain_local(X, instance_idx=0, feature_names=feature_names)
LIME (Local Interpretable Model-agnostic Explanations)¶
Python
import lime
import lime.lime_tabular
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_breast_cancer
class LIMEExplainer:
    """LIME local explanations for tabular classifiers."""

    def __init__(self, model, X_train, feature_names=None, class_names=None):
        """
        LIME idea: sample around the instance to explain and fit a simple
        (linear) surrogate to the black-box locally — local fidelity is
        prioritized over global accuracy.

        Args:
            model: fitted classifier exposing predict_proba.
            X_train: training data used for the sampler's statistics.
            feature_names / class_names: optional display names.
        """
        self.model = model
        self.feature_names = feature_names
        self.class_names = class_names
        # Tabular explainer; continuous features are discretized for readability.
        self.explainer = lime.lime_tabular.LimeTabularExplainer(
            X_train,
            feature_names=feature_names,
            class_names=class_names,
            discretize_continuous=True,
            mode='classification'
        )

    def explain_instance(self, instance, num_features=10):
        """
        Explain one instance: which features most drive its prediction.
        Returns the LIME explanation object.
        """
        exp = self.explainer.explain_instance(
            instance,
            self.model.predict_proba,
            num_features=num_features,
            top_labels=1
        )
        print("=" * 50)
        print("LIME局部解释")
        print("=" * 50)
        # Only the top predicted label is available (top_labels=1 above).
        label = exp.available_labels()[0]
        print(f"\n预测类别: {self.class_names[label] if self.class_names else label}")
        print(f"预测概率: {self.model.predict_proba([instance])[0][label]:.4f}")
        print("\n最重要的特征:")
        for feature, weight in exp.as_list(label=label):
            direction = "支持" if weight > 0 else "反对"
            print(f" {feature}: {direction}预测 ({weight:+.4f})")
        # Inline visualization (effective inside Jupyter notebooks only).
        exp.show_in_notebook(show_table=True, show_all=False)
        return exp
# Usage example: LIME on the breast-cancer dataset.
data = load_breast_cancer()
X, y = data.data, data.target
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X, y)
explainer = LIMEExplainer(
    model, X,
    feature_names=data.feature_names,
    class_names=['Malignant', 'Benign']
)
# Explain the first sample using its 5 most influential features.
exp = explainer.explain_instance(X[0], num_features=5)
注意力可视化¶
Python
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import seaborn as sns
class AttentionVisualizer:
    """Visualize Transformer attention weights captured via forward hooks."""

    def __init__(self, model):
        """
        Args:
            model: a PyTorch Transformer-style model whose attention
                modules have 'attn'/'attention' in their names.
        """
        self.model = model
        self.attention_weights = []  # one tensor appended per hooked module call
        # Hooks fill self.attention_weights on every forward pass.
        self._register_hooks()

    def _register_hooks(self):
        """Register forward hooks on all attention modules."""
        def hook_fn(module, input, output):
            # NOTE(review): assumes the module returns (output, attn_weights)
            # as a tuple — true for HF-style attention, confirm per model.
            if isinstance(output, tuple) and len(output) > 1:
                self.attention_weights.append(output[1].detach())  # no grads needed
        for name, module in self.model.named_modules():
            if 'attn' in name.lower() or 'attention' in name.lower():
                module.register_forward_hook(hook_fn)

    def visualize_attention(self, tokens, layer_idx=-1, head_idx=0):
        """
        Plot one head's attention matrix as a heatmap.

        Args:
            tokens: input token strings (axis labels).
            layer_idx: which captured layer to show (default: last).
            head_idx: which attention head to show.
        """
        if not self.attention_weights:
            print("没有捕获到注意力权重")
            return
        # (batch, heads, seq_len, seq_len) -> (seq_len, seq_len)
        attn = self.attention_weights[layer_idx]
        attn = attn[0, head_idx].cpu().numpy()
        plt.figure(figsize=(10, 8))
        sns.heatmap(
            attn,
            xticklabels=tokens,
            yticklabels=tokens,
            cmap='viridis',
            annot=True,
            fmt='.2f',
            cbar_kws={'label': 'Attention Weight'}
        )
        plt.title(f'Attention Heatmap - Layer {layer_idx}, Head {head_idx}')
        plt.xlabel('Key')
        plt.ylabel('Query')
        plt.xticks(rotation=45, ha='right')
        plt.yticks(rotation=0)
        plt.tight_layout()
        plt.show()

    def analyze_attention_patterns(self, tokens):
        """Print per-layer attention entropy (how spread out attention is).

        NOTE(review): `tokens` is currently unused by this analysis.
        """
        if not self.attention_weights:
            return
        print("=" * 50)
        print("注意力模式分析")
        print("=" * 50)
        num_layers = len(self.attention_weights)
        num_heads = self.attention_weights[0].shape[1]
        print(f"\n模型结构: {num_layers}层, {num_heads}头")
        # Entropy of each head's attention distribution; low entropy means
        # the head focuses on few positions.
        for layer_idx, attn in enumerate(self.attention_weights):
            attn = attn[0]  # (heads, seq_len, seq_len)
            # +1e-10 avoids log(0) on exactly-zero attention weights.
            entropy = -(attn * torch.log(attn + 1e-10)).sum(dim=-1).mean(dim=-1)  # (heads,)
            avg_entropy = entropy.mean().item()  # scalar tensor -> Python float
            print(f"\n层 {layer_idx}: 平均注意力熵 = {avg_entropy:.4f}")
            print(f" 低熵 = 注意力集中, 高熵 = 注意力分散")
            # Most focused (lowest-entropy) head in this layer.
            min_entropy_head = entropy.argmin().item()
            print(f" 最集中的头: Head {min_entropy_head}")
# 使用BERT的示例
from transformers import BertTokenizer, BertForSequenceClassification
def demo_attention_visualization():
    """Demo: visualize BERT attention for a short sentence."""
    # Load a pretrained BERT configured to return attention weights.
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    model = BertForSequenceClassification.from_pretrained(
        'bert-base-uncased',
        output_attentions=True
    )
    # Tokenize the input text.
    text = "The cat sat on the mat"
    inputs = tokenizer(text, return_tensors='pt')
    tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])
    # Forward pass without gradient tracking.
    with torch.no_grad():
        outputs = model(**inputs)
    # One attention tensor per layer: (batch, heads, seq_len, seq_len).
    attentions = outputs.attentions
    # Heatmap of layer 0, head 0.
    attn = attentions[0][0, 0].numpy()  # (seq_len, seq_len)
    plt.figure(figsize=(10, 8))
    sns.heatmap(
        attn,
        xticklabels=tokens,
        yticklabels=tokens,
        cmap='viridis',
        annot=True,
        fmt='.2f'
    )
    plt.title('BERT Attention - Layer 0, Head 0')
    plt.tight_layout()
    plt.show()
    print(f"输入token: {tokens}")
    print(f"[CLS]的注意力分布:")
    # Row 0 of the matrix = how [CLS] attends to every token.
    for token, weight in zip(tokens, attn[0]):
        print(f" → {token}: {weight:.3f}")
Grad-CAM (Gradient-weighted Class Activation Mapping)¶
Python
import torch
import torch.nn.functional as F
import cv2
import numpy as np
import matplotlib.pyplot as plt
class GradCAM:
    """Grad-CAM: localize the image regions driving a CNN prediction.

    Core formula:
        L_gradcam = ReLU(sum_k alpha_k * A^k),  alpha_k = GAP(dY/dA^k)
    where A^k are the target layer's feature maps.
    """

    def __init__(self, model, target_layer):
        """
        Args:
            model: CNN classifier (PyTorch module).
            target_layer: convolutional layer whose activations and
                gradients are used (typically the last conv layer).
        """
        self.model = model
        self.target_layer = target_layer
        self.gradients = None    # filled by the backward hook
        self.activations = None  # filled by the forward hook
        # Attach capture hooks once at construction.
        self._register_hooks()

    def _register_hooks(self):
        """Attach forward/backward hooks capturing activations and gradients."""
        def forward_hook(module, input, output):
            self.activations = output.detach()
        def backward_hook(module, grad_input, grad_output):
            self.gradients = grad_output[0].detach()
        self.target_layer.register_forward_hook(forward_hook)
        self.target_layer.register_full_backward_hook(backward_hook)

    def generate_cam(self, input_image, target_class=None):
        """
        Compute the class-activation map.

        Args:
            input_image: tensor of shape (1, C, H, W).
            target_class: class index to explain; defaults to the argmax.

        Returns:
            (CAM as a 2-D numpy array in [0, 1] at input resolution,
             the explained class index)
        """
        self.model.eval()  # evaluation mode (disables dropout etc.)
        # Forward pass populates self.activations via the hook.
        output = self.model(input_image)
        if target_class is None:
            target_class = output.argmax(dim=1).item()
        # Backprop a one-hot vector to obtain d(score)/d(activations).
        self.model.zero_grad()
        one_hot = torch.zeros_like(output)
        one_hot[0, target_class] = 1
        output.backward(gradient=one_hot, retain_graph=True)
        gradients = self.gradients      # (1, C, H, W)
        activations = self.activations  # (1, C, H, W)
        # Channel weights = global average pooling of the gradients.
        weights = gradients.mean(dim=(2, 3), keepdim=True)  # (1, C, 1, 1)
        # Weighted sum of feature maps; ReLU keeps positive evidence only.
        cam = (weights * activations).sum(dim=1, keepdim=True)  # (1, 1, H, W)
        cam = F.relu(cam)
        # Min-max normalize to [0, 1]; epsilon avoids division by zero.
        cam = cam - cam.min()
        cam = cam / (cam.max() + 1e-8)
        # Upsample to the input image resolution.
        cam = F.interpolate(
            cam,
            size=input_image.shape[2:],
            mode='bilinear',
            align_corners=False
        )
        return cam.squeeze().cpu().numpy(), target_class  # drop size-1 dims

    def visualize(self, input_image, cam, alpha=0.5):
        """Show original image, CAM heatmap, and their overlay side by side."""
        # Tensor (1, C, H, W) -> numpy (H, W, C), min-max scaled for display.
        img = input_image.squeeze().permute(1, 2, 0).cpu().numpy()
        img = (img - img.min()) / (img.max() - img.min())
        # Colorize the CAM (OpenCV returns BGR; convert to RGB for matplotlib).
        heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
        heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
        heatmap = heatmap / 255.0
        # Alpha-blend the heatmap over the image.
        overlay = img * (1 - alpha) + heatmap * alpha
        overlay = np.clip(overlay, 0, 1)
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        axes[0].imshow(img)
        axes[0].set_title('Original Image')
        axes[0].axis('off')
        axes[1].imshow(heatmap)
        axes[1].set_title('Grad-CAM Heatmap')
        axes[1].axis('off')
        axes[2].imshow(overlay)
        axes[2].set_title('Overlay')
        axes[2].axis('off')
        plt.tight_layout()
        plt.show()
# 使用示例
from torchvision.models import resnet50, ResNet50_Weights
from torchvision.transforms import Compose, Resize, ToTensor, Normalize
from PIL import Image
def demo_gradcam():
    """Demo: Grad-CAM on a pretrained ResNet-50 with a random input."""
    # Load a pretrained classifier in eval mode.
    model = resnet50(weights=ResNet50_Weights.DEFAULT)
    model.eval()
    # Target the last conv layer of the final residual block.
    target_layer = model.layer4[-1].conv3
    grad_cam = GradCAM(model, target_layer)
    # Standard ImageNet preprocessing (defined for real images; the demo
    # below feeds random noise directly, so the transform goes unused).
    transform = Compose([
        Resize((224, 224)),
        ToTensor(),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    # Random tensor stands in for a real image in this demo.
    img = torch.randn(1, 3, 224, 224)
    # Generate and report the CAM.
    cam, pred_class = grad_cam.generate_cam(img)
    print(f"预测类别: {pred_class}")
    print(f"CAM形状: {cam.shape}")
    print(f"CAM值范围: [{cam.min():.3f}, {cam.max():.3f}]")
🎯 因果推断基础¶
因果 vs 相关¶
Text Only
核心区别:
相关性 (Correlation):
- A和B同时发生
- 可能有共同原因C
- 可能是巧合
- 不能用于干预决策
因果性 (Causation):
- A导致B发生
- 改变A会改变B
- 可以预测干预效果
- 支持决策制定
经典例子:
- 冰淇淋销量 ↔ 溺水事故 (相关但非因果)
共同原因:天气炎热
- 吸烟 → 肺癌 (因果关系)
干预:戒烟可以降低肺癌风险
因果推断框架¶
Python
class CausalInferenceBasics:
    """Core concepts of causal inference (educational printouts)."""

    def __init__(self):
        pass

    @staticmethod  # static: callable without an instance
    def explain_potential_outcomes():
        """
        Potential-outcomes framework (Rubin Causal Model).

        For individual i:
          - Y_i(1): outcome under treatment
          - Y_i(0): outcome without treatment
        Individual treatment effect: tau_i = Y_i(1) - Y_i(0).

        Fundamental problem of causal inference: Y_i(1) and Y_i(0) can
        never both be observed for the same individual.
        """
        print("=" * 50)
        print("潜在结果框架")
        print("=" * 50)
        print("""
核心概念:
1. 处理变量 T ∈ {0, 1}
2. 潜在结果 Y(0), Y(1)
3. 观察结果 Y = T·Y(1) + (1-T)·Y(0)
因果效应度量:
- 平均处理效应 (ATE): E[Y(1) - Y(0)]
- 处理组平均效应 (ATT): E[Y(1) - Y(0) | T=1]
- 条件平均效应 (CATE): E[Y(1) - Y(0) | X=x]
""")

    @staticmethod
    def explain_causal_graphs():
        """
        Pearl-style causal graphs: causal relations as a DAG.

        Prints the three elementary structures (chain, fork, collider)
        and notes d-separation as the graphical criterion for
        conditional independence.
        """
        print("=" * 50)
        print("因果图模型")
        print("=" * 50)
        print("""
基本结构:
1. 链式结构 (Chain): X → Z → Y
- X间接影响Y
- 控制Z会阻断路径
2. 分叉结构 (Fork): X ← Z → Y
- Z是混杂因子
- 控制Z消除混杂
3. 对撞结构 (Collider): X → Z ← Y
- 控制Z会打开路径
- 不应控制对撞变量
d-分离:判断条件独立性的图形准则
""")
# Causal-graph examples rendered as plain-text diagrams (module-level constant).
causal_graph_examples = """
示例1:药物试验
年龄 → 药物 ← 病情严重程度
↓ ↓
血压变化 ← ?
混杂因子:年龄、病情严重程度
需要控制这些变量来估计药物的真实效果
示例2:工作培训项目
教育水平 → 工作培训 ← 动机
↓ ↓
收入 ← ?
选择偏差:只有有动机的人才会参加培训
需要处理选择偏差
"""
工具变量法¶
Python
import numpy as np
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression
class InstrumentalVariables:
    """Instrumental-variables (IV) estimation via manual 2SLS."""

    def __init__(self):
        pass

    def explain_iv(self):
        """
        Explain the IV method.

        With unobserved confounding, an instrument Z identifies the causal
        effect of T on Y when:
          1. relevance: Z is correlated with T,
          2. exclusion: Z affects Y only through T,
          3. exogeneity: Z is unrelated to the error term of Y.
        """
        print("=" * 50)
        print("工具变量法")
        print("=" * 50)
        print("""
两阶段最小二乘法 (2SLS):
阶段1: T = π₀ + π₁Z + ε
用Z预测T
阶段2: Y = β₀ + β₁T̂ + u
用预测的T̂估计对Y的影响
β₁就是因果效应的IV估计
""")

    def iv_estimation(self, Z, T, Y):
        """
        Two-stage least squares (2SLS) estimate.

        Args:
            Z: instrument, shape (n,).
            T: treatment, shape (n,).
            Y: outcome, shape (n,).

        Returns:
            (point estimate of the causal effect, its stage-2 OLS SE).
            NOTE(review): stage-2 OLS standard errors from manual 2SLS are
            not valid IV standard errors — use a dedicated IV2SLS routine
            for inference.
        """
        # Stage 1: regress T on Z (with intercept).
        Z_with_const = sm.add_constant(Z)
        stage1 = sm.OLS(T, Z_with_const).fit()
        T_hat = stage1.predict(Z_with_const)
        print("阶段1结果 (Z → T):")
        print(f" 系数: {stage1.params[1]:.4f}")
        print(f" R²: {stage1.rsquared:.4f}")
        # Stage 2: regress Y on the fitted treatment T̂.
        T_hat_with_const = sm.add_constant(T_hat)
        stage2 = sm.OLS(Y, T_hat_with_const).fit()
        print("\n阶段2结果 (T̂ → Y):")
        print(f" 因果效应估计: {stage2.params[1]:.4f}")
        print(f" 标准误: {stage2.bse[1]:.4f}")
        print(f" p值: {stage2.pvalues[1]:.4f}")
        return stage2.params[1], stage2.bse[1]

    def generate_iv_data(self, n=1000, true_effect=2.0):
        """Simulate data with an unobserved confounder U and a valid instrument Z."""
        np.random.seed(42)  # NOTE: seeds the GLOBAL NumPy RNG
        # Unobserved confounder affecting both T and Y.
        U = np.random.normal(0, 1, n)
        # Instrument: random assignment, independent of U.
        Z = np.random.binomial(1, 0.5, n)
        # Treatment depends on both Z and U.
        T = 0.5 * Z + 0.8 * U + np.random.normal(0, 0.5, n)
        # Outcome depends on T (true effect) and U.
        Y = true_effect * T + 1.5 * U + np.random.normal(0, 1, n)
        return Z, T, Y, true_effect
# Demo: IV recovers the true effect despite the unobserved confounder.
iv = InstrumentalVariables()
iv.explain_iv()
Z, T, Y, true_effect = iv.generate_iv_data()
estimated_effect, se = iv.iv_estimation(Z, T, Y)
print(f"\n真实效应: {true_effect:.4f}")
print(f"估计效应: {estimated_effect:.4f}")
双重差分法 (DID)¶
Python
import pandas as pd
import statsmodels.formula.api as smf
class DifferenceInDifferences:
    """Difference-in-differences (DID) estimation."""

    def __init__(self):
        pass

    def explain_did(self):
        """
        Explain DID: with panel data, compare the before/after change in
        the treated group against that of the control group.

        Key identifying assumption: parallel trends — absent treatment,
        both groups would have evolved in parallel.
        """
        print("=" * 50)
        print("双重差分法 (DID)")
        print("=" * 50)
        print("""
DID估计量:
效应 = (Y_treatment_post - Y_treatment_pre)
- (Y_control_post - Y_control_pre)
回归形式:
Y = β₀ + β₁·Treat + β₂·Post + β₃·(Treat×Post) + ε
β₃就是DID估计的因果效应
""")

    def generate_did_data(self, n_units=1000, n_periods=2, true_effect=3.0):
        """
        Simulate a two-period panel: treated units jump by `true_effect`
        after treatment; controls drift by +0.5 naturally.

        NOTE(review): `n_periods` is accepted but exactly 2 periods are
        generated regardless of its value.
        """
        np.random.seed(42)
        data = []
        # Treated group: baseline level 10.
        for i in range(n_units // 2):
            # Pre-treatment observation.
            y_pre = 10 + np.random.normal(0, 1)
            data.append({
                'unit': i,
                'period': 0,
                'treat': 1,
                'post': 0,
                'y': y_pre
            })
            # Post-treatment observation (treated).
            y_post = 10 + true_effect + np.random.normal(0, 1)
            data.append({
                'unit': i,
                'period': 1,
                'treat': 1,
                'post': 1,
                'y': y_post
            })
        # Control group: baseline level 8.
        for i in range(n_units // 2, n_units):
            # Pre-treatment observation.
            y_pre = 8 + np.random.normal(0, 1)
            data.append({
                'unit': i,
                'period': 0,
                'treat': 0,
                'post': 0,
                'y': y_pre
            })
            # Post period without treatment: natural drift of +0.5.
            y_post = 8 + 0.5 + np.random.normal(0, 1)
            data.append({
                'unit': i,
                'period': 1,
                'treat': 0,
                'post': 1,
                'y': y_post
            })
        return pd.DataFrame(data)

    def estimate_did(self, df):
        """OLS of y on treat, post and treat×post; β₃ is the DID effect.

        NOTE: mutates `df` by adding a 'treat_post' column.
        """
        # The interaction term identifies the DID effect.
        df['treat_post'] = df['treat'] * df['post']
        model = smf.ols('y ~ treat + post + treat_post', data=df).fit()
        print("\nDID回归结果:")
        print(model.summary().tables[1])
        # Extract the DID estimate and its standard error.
        did_estimate = model.params['treat_post']
        did_se = model.bse['treat_post']
        print(f"\nDID估计的因果效应: {did_estimate:.4f} (SE: {did_se:.4f})")
        return did_estimate, did_se

    def check_parallel_trends(self, df):
        """Describe (not run) the pre-trend test; requires multi-period data."""
        print("\n平行趋势假设检验:")
        print("需要处理前多期数据来验证趋势是否平行")
        print("方法:加入处理前各期虚拟变量与处理组的交互项")
        print("如果都不显著,则支持平行趋势假设")
# Demo: DID recovers the known true effect of 3.0 on simulated panel data.
did = DifferenceInDifferences()
did.explain_did()
df = did.generate_did_data()
effect, se = did.estimate_did(df)
print(f"\n真实效应: 3.0")
print(f"估计效应: {effect:.4f}")
倾向得分匹配 (PSM)¶
Python
from sklearn.neighbors import NearestNeighbors
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
class PropensityScoreMatching:
    """Propensity-score matching (PSM)."""

    def __init__(self):
        pass

    def explain_psm(self):
        """
        Explain PSM: estimate each unit's probability of treatment,
        match treated/control units with similar scores, then compare
        the matched groups.

        Assumes no unobserved confounding and common support (overlapping
        propensity-score distributions).
        """
        print("=" * 50)
        print("倾向得分匹配 (PSM)")
        print("=" * 50)
        print("""
步骤:
1. 用Logistic回归估计倾向得分
e(X) = P(T=1|X)
2. 匹配算法
- 最近邻匹配
- 卡尺匹配
- 核匹配
3. 平衡性检验
- 匹配后协变量应该平衡
4. 估计处理效应
- 比较匹配后的结果差异
""")

    def estimate_propensity_scores(self, X, T):
        """Fit a logistic model of T on X; return (scores, fitted model)."""
        # Standardize covariates before the logistic fit.
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        ps_model = LogisticRegression(max_iter=1000)
        ps_model.fit(X_scaled, T)
        # Propensity score = P(T=1 | X) for every unit.
        propensity_scores = ps_model.predict_proba(X_scaled)[:, 1]
        return propensity_scores, ps_model

    def match_samples(self, X, T, Y, propensity_scores, caliper=0.05):
        """
        Caliper nearest-neighbor matching: for each treated unit, find the
        control with the closest propensity score and keep the pair only
        if the score distance is within `caliper` (matching WITH
        replacement — a control can be reused).

        Returns a list of (treated_index, control_index) pairs.
        """
        # Split indices by treatment status.
        treated_idx = np.where(T == 1)[0]
        control_idx = np.where(T == 0)[0]
        treated_ps = propensity_scores[treated_idx]
        control_ps = propensity_scores[control_idx]
        # 1-NN search over the control propensity scores.
        nbrs = NearestNeighbors(n_neighbors=1).fit(control_ps.reshape(-1, 1))
        matched_pairs = []
        for i, ps in enumerate(treated_ps):
            distances, indices = nbrs.kneighbors([[ps]])
            distance = distances[0][0]
            match_idx = control_idx[indices[0][0]]
            # Enforce the caliper on the propensity-score distance.
            if distance <= caliper:
                matched_pairs.append((treated_idx[i], match_idx))
        print(f"匹配对数: {len(matched_pairs)} / {len(treated_idx)}")
        return matched_pairs

    def estimate_ate(self, Y, matched_pairs):
        """Mean outcome difference over matched pairs, with a simplified SE."""
        treated_outcomes = [Y[i] for i, _ in matched_pairs]
        control_outcomes = [Y[j] for _, j in matched_pairs]
        ate = np.mean(treated_outcomes) - np.mean(control_outcomes)
        # Simplified SE; ignores reuse of controls under matching-with-replacement.
        n = len(matched_pairs)
        se = np.sqrt(np.var(treated_outcomes) / n + np.var(control_outcomes) / n)
        print(f"\nPSM估计的ATE: {ate:.4f} (SE: {se:.4f})")
        return ate, se

    def check_balance(self, X, T, matched_pairs):
        """Print standardized covariate differences before vs after matching."""
        print("\n平衡性检验:")
        print("匹配前 vs 匹配后的标准化差异")
        for col in range(X.shape[1]):
            # Before matching: full treated vs full control groups.
            treated_mean_pre = X[T==1, col].mean()
            control_mean_pre = X[T==0, col].mean()
            pooled_std_pre = np.sqrt((X[T==1, col].var() + X[T==0, col].var()) / 2)
            std_diff_pre = abs(treated_mean_pre - control_mean_pre) / pooled_std_pre
            # After matching: restricted to the matched indices.
            treated_idx = [i for i, _ in matched_pairs]
            control_idx = [j for _, j in matched_pairs]
            treated_mean_post = X[treated_idx, col].mean()
            control_mean_post = X[control_idx, col].mean()
            pooled_std_post = np.sqrt((X[treated_idx, col].var() + X[control_idx, col].var()) / 2)
            std_diff_post = abs(treated_mean_post - control_mean_post) / pooled_std_post
            print(f" 变量{col}: 匹配前={std_diff_pre:.3f}, 匹配后={std_diff_post:.3f}")
# Demo: full PSM pipeline on simulated data with a known effect of 2.0.
def demo_psm():
    np.random.seed(42)
    n = 1000
    # Covariates.
    X1 = np.random.normal(0, 1, n)
    X2 = np.random.normal(0, 1, n)
    X = np.column_stack([X1, X2])
    # Treatment assignment depends on the covariates (confounding).
    propensity = 1 / (1 + np.exp(-(0.5 * X1 + 0.3 * X2)))
    T = np.random.binomial(1, propensity)
    # Outcome: true treatment effect is 2.
    Y = 2 * T + 1.5 * X1 + 0.8 * X2 + np.random.normal(0, 1, n)
    psm = PropensityScoreMatching()
    psm.explain_psm()
    ps, model = psm.estimate_propensity_scores(X, T)
    matched_pairs = psm.match_samples(X, T, Y, ps)
    ate, se = psm.estimate_ate(Y, matched_pairs)
    psm.check_balance(X, T, matched_pairs)
    print(f"\n真实效应: 2.0")
    print(f"估计效应: {ate:.4f}")
demo_psm()
🧠 因果发现¶
基于约束的方法 (PC算法)¶
Python
class CausalDiscovery:
    """Constraint-based causal discovery."""

    def __init__(self):
        pass

    def explain_pc_algorithm(self):
        """
        PC (Peter-Clark) algorithm: learn causal structure from
        conditional-independence tests.

        Steps: start from a complete undirected graph, remove edges via
        CI tests, orient v-structures, then propagate orientations.
        Assumes causal sufficiency and faithfulness.
        """
        print("=" * 50)
        print("PC算法")
        print("=" * 50)
        print("""
核心思想:
- 如果X和Y在给定Z的条件下独立,
则它们之间没有直接因果关系
步骤:
1. 骨架学习:找到条件独立的变量对
2. V结构识别:X-Z-Y且X和Y不独立
3. 边定向:传播方向约束
假设:
- 因果充分性(无未观察到的混杂因子)
- 忠实性(条件独立反映d-分离)
""")

    def conditional_independence_test(self, X, Y, Z=None, alpha=0.05):
        """
        (Conditional) independence test via (partial) correlation.

        Args:
            X, Y: 1-D samples.
            Z: optional conditioning covariates, shape (n, k).
            alpha: significance level.

        Returns:
            (independent: bool, p_value: float)
        """
        from scipy import stats
        if Z is None or len(Z) == 0:
            # Marginal case: plain Pearson correlation test.
            corr, p_value = stats.pearsonr(X, Y)
        else:
            # Partial correlation: regress X and Y on Z, correlate residuals.
            from sklearn.linear_model import LinearRegression
            # Regress X on Z.
            reg_x = LinearRegression().fit(Z, X)
            resid_x = X - reg_x.predict(Z)
            # Regress Y on Z.
            reg_y = LinearRegression().fit(Z, Y)
            resid_y = Y - reg_y.predict(Z)
            # Correlation of the residuals.
            corr, p_value = stats.pearsonr(resid_x, resid_y)
        # Fail to reject independence when p > alpha.
        independent = p_value > alpha
        return independent, p_value
# Causal discovery with the pgmpy library.
def demo_causal_discovery():
    """Demo: recover a chain X → Y → Z with the PC algorithm."""
    try:  # pgmpy is optional; fail gracefully if it is missing
        from pgmpy.estimators import PC
        from pgmpy.models import BayesianNetwork
        import pandas as pd
        # Simulate data from the chain X → Y → Z.
        np.random.seed(42)
        n = 1000
        X = np.random.normal(0, 1, n)
        Y = 0.5 * X + np.random.normal(0, 0.5, n)
        Z = 0.7 * Y + np.random.normal(0, 0.5, n)
        data = pd.DataFrame({'X': X, 'Y': Y, 'Z': Z})
        # Run PC structure learning.
        pc = PC(data)
        model = pc.estimate()
        print("发现的因果结构:")
        print(model.edges())
    except ImportError:
        print("需要安装pgmpy: pip install pgmpy")
🔬 前沿方法¶
大语言模型的可解释性¶
Python
class LLMInterpretability:
    """
    Interpretability methods for large language models
    (active research area, 2024-2025).
    """

    def logit_lens(self, model, input_ids, layer_idx):
        """
        Logit lens: project an intermediate hidden state onto the
        vocabulary to see what the model "thinks" at that layer.

        Returns (top-5 token ids, their probabilities) for the last
        sequence position.

        NOTE(review): assumes `model.lm_head.weight` exists and that the
        chosen hidden states live in the output-embedding space — true
        for GPT-style tied-embedding models; confirm for others.
        """
        with torch.no_grad():
            outputs = model(input_ids, output_hidden_states=True)
            hidden = outputs.hidden_states[layer_idx]
            # Project the hidden state with the output embedding matrix.
            logits = hidden @ model.lm_head.weight.T
            probs = torch.softmax(logits, dim=-1)
            # Top-k predictions at the last position.
            top_probs, top_ids = torch.topk(probs[0, -1], k=5)
        return top_ids, top_probs

    def attention_pattern_analysis(self, attention_weights, tokens):
        """
        Classify attention heads by pattern (previous-token heads,
        duplicate-token heads; induction heads declared but not detected).

        Args:
            attention_weights: (heads, seq_len, seq_len) for one layer.
            tokens: token strings aligned with the sequence axis.

        NOTE(review): 'induction_heads' is never filled below, and a head
        may be appended to 'duplicate_token_heads' once per query position
        — dedupe if exact membership matters.
        """
        seq_len = attention_weights.shape[-1]
        patterns = {
            'induction_heads': [],
            'previous_token_heads': [],
            'duplicate_token_heads': []
        }
        for head_idx in range(attention_weights.shape[0]):
            attn = attention_weights[head_idx]
            # Previous-token head: attention mass on the sub-diagonal.
            prev_token_score = torch.diag(attn, offset=-1).mean()
            if prev_token_score > 0.5:
                patterns['previous_token_heads'].append(head_idx)
            # Duplicate-token head: attends to earlier copies of a token.
            for i in range(seq_len):
                for j in range(i):
                    if tokens[i] == tokens[j] and attn[i, j] > 0.3:
                        patterns['duplicate_token_heads'].append(head_idx)
                        break
        return patterns

    def steering_vector(self, model, layer_idx, concept_examples):
        """
        Representation engineering: build a unit-norm "steering vector"
        for a concept as (mean positive rep) - (mean negative rep).

        Args:
            concept_examples: iterable of (text, label) with label 1/0.

        NOTE(review): relies on a module-level `tokenizer` that is NOT
        defined in this class — it must be in scope at call time.
        """
        positive_reps = []
        negative_reps = []
        for text, label in concept_examples:
            inputs = tokenizer(text, return_tensors='pt')
            with torch.no_grad():
                outputs = model(**inputs, output_hidden_states=True)
                # Last-token hidden state at the chosen layer.
                rep = outputs.hidden_states[layer_idx][0, -1]
            if label == 1:
                positive_reps.append(rep)
            else:
                negative_reps.append(rep)
        # Direction = difference of class means, normalized to unit length.
        positive_mean = torch.stack(positive_reps).mean(dim=0)
        negative_mean = torch.stack(negative_reps).mean(dim=0)
        steering_vec = positive_mean - negative_mean
        return steering_vec / steering_vec.norm()
class MechanisticInterpretability:
    """
    Mechanistic interpretability: attempt to reverse-engineer the
    internal computation of a neural network.
    """

    def circuit_tracing(self, model, input_text, target_output):
        """
        Circuit tracing: identify the sub-network implementing a task
        via activation patching.  Sketch only — steps 2-4 unimplemented:
          1. record activations on a "clean" run,
          2. run a corrupted input,
          3. patch clean activations into the corrupted run,
          4. measure the effect on the output.
        """
        # 1. Cache for clean-run activations, filled by forward hooks.
        clean_cache = {}
        def hook_fn(name):
            def fn(module, input, output):
                clean_cache[name] = output.detach()
            return fn
        # 2-4. Intentionally left unimplemented in this sketch.
        pass

    def superposition_analysis(self, weight_matrix, num_features):
        """
        Superposition analysis via SVD of a weight matrix — how can a
        layer encode more features than it has dimensions?

        Args:
            weight_matrix: 2-D tensor to decompose.
            num_features: kept for interface compatibility; not used by
                the spectral analysis itself.

        Returns:
            dict with the singular values, the explained-variance ratio
            per singular direction, and the count of directions carrying
            more than 1% of the variance.
        """
        # FIX: torch.svd is deprecated; torch.linalg.svd is the supported
        # API.  Only the singular values S are used downstream, so the
        # V-vs-Vh convention difference is irrelevant here.
        U, S, Vh = torch.linalg.svd(weight_matrix, full_matrices=False)
        # Fraction of total squared singular value per direction.
        explained_variance = (S ** 2) / (S ** 2).sum()
        return {
            'singular_values': S,
            'explained_variance': explained_variance,
            'effective_dimensionality': (explained_variance > 0.01).sum()
        }
因果机器学习 (Causal Machine Learning)¶
Python
class CausalML:
    """
    Causal machine learning: methods combining ML with causal inference.
    """

    def __init__(self):
        pass

    def causal_forest(self, X, T, Y):
        """
        Causal forest: estimate heterogeneous treatment effects (CATE).
        Returns (fitted estimator, per-sample treatment effects).
        Requires the `econml` package.
        """
        # econml 0.14+: CausalForestDML (econml.grf.CausalForest is deprecated).
        from econml.dml import CausalForestDML
        est = CausalForestDML(
            n_estimators=100,
            max_depth=5,
            min_samples_leaf=10
        )
        est.fit(Y, T, X=X)
        # Individual-level treatment-effect predictions.
        treatment_effects = est.effect(X)
        return est, treatment_effects

    def double_machine_learning(self, X, T, Y):
        """
        Double machine learning: ML nuisance models + orthogonalized
        moments for an unbiased ATE estimate.
        Returns (ate, confidence interval).
        """
        from econml.dml import LinearDML
        # Random forests serve as the nuisance-function estimators.
        from sklearn.ensemble import RandomForestRegressor
        est = LinearDML(
            model_y=RandomForestRegressor(),
            model_t=RandomForestRegressor(),
            cv=5
        )
        est.fit(Y, T, X=X)
        # econml 0.14+ exposes the ATE via method calls, not attributes.
        ate = est.ate(X)
        ate_interval = est.ate_interval(X)
        return ate, ate_interval

    def causal_representation_learning(self, X, T, Y):
        """
        Causal representation learning: returns a network CLASS with an
        encoder plus treatment-effect and outcome heads.

        NOTE(review): X, T, Y are unused here — the caller instantiates
        and trains the returned class.
        """
        import torch.nn as nn

        class CausalRepresentationNet(nn.Module):
            def __init__(self, input_dim, hidden_dim, repr_dim):
                super().__init__()
                # Encoder: learns the causal representation z.
                self.encoder = nn.Sequential(
                    nn.Linear(input_dim, hidden_dim),
                    nn.ReLU(),
                    nn.Linear(hidden_dim, repr_dim)
                )
                # Treatment-effect prediction head.
                self.treatment_head = nn.Linear(repr_dim, 1)
                # Outcome head conditioned on (z, t).
                self.outcome_head = nn.Sequential(
                    nn.Linear(repr_dim + 1, hidden_dim),  # +1 for treatment
                    nn.ReLU(),
                    nn.Linear(hidden_dim, 1)
                )

            def forward(self, x, t):
                # Encode, then predict effect and outcome.
                z = self.encoder(x)
                treatment_effect = self.treatment_head(z)
                outcome_input = torch.cat([z, t.unsqueeze(-1)], dim=-1)
                outcome = self.outcome_head(outcome_input)
                return z, treatment_effect, outcome

        return CausalRepresentationNet

    def counterfactual_generation(self, model, x, t, t_counterfactual):
        """
        Counterfactual-sample generation (stub, not implemented).
        Idea: a CVAE/diffusion model maps (x, t, t_counterfactual) to a
        counterfactual outcome y_cf.
        """
        pass
class AutomatedCausalDiscovery:
    """Automatic causal-structure discovery from observational data."""

    def __init__(self):
        pass

    def lingam(self, data):
        """
        LiNGAM: linear non-Gaussian acyclic model; exploits
        non-Gaussianity to orient causal edges.

        Returns the estimated adjacency (causal) matrix, or None if
        `causallearn` is not installed.
        """
        try:
            from causallearn.search.FCMBased import lingam
            model = lingam.ICALiNGAM()
            model.fit(data)
            # Estimated weighted causal adjacency matrix.
            causal_matrix = model.adjacency_matrix_
            return causal_matrix
        except ImportError:
            print("需要安装causallearn: pip install causallearn")
            return None

    def notears(self, data):
        """
        NOTEARS: gradient-based DAG learning with a differentiable
        acyclicity constraint.

        Returns (binary adjacency A, weighted matrix W), or (None, None)
        if the `notears` package is not installed.
        """
        try:
            from notears.linear import notears_linear
            # Learn a weighted adjacency matrix (L1-penalized least squares).
            W = notears_linear(data, lambda1=0.1, loss_type='l2')
            # Threshold small weights to obtain a binary DAG.
            A = (np.abs(W) > 0.3).astype(int)
            return A, W
        except ImportError:
            print("需要安装notears: pip install notears")
            return None, None
可信AI与公平性¶
Python
class TrustworthyAI:
    """
    Trustworthy AI: fairness, robustness, privacy protection.
    """

    def fairness_metrics(self, y_true, y_pred, sensitive_attr):
        """
        Compute group-fairness metrics.  Requires `fairlearn`.

        Returns a dict with the demographic-parity difference and the
        equalized-odds difference (0 = perfectly fair on that metric).
        """
        from fairlearn.metrics import demographic_parity_difference, equalized_odds_difference
        # Demographic parity: difference in positive rates across groups.
        dp_diff = demographic_parity_difference(
            y_true, y_pred, sensitive_features=sensitive_attr
        )
        # Equalized odds: difference in error rates across groups.
        eo_diff = equalized_odds_difference(
            y_true, y_pred, sensitive_features=sensitive_attr
        )
        return {
            'demographic_parity': dp_diff,
            'equalized_odds': eo_diff
        }

    def adversarial_debiasing(self, model, X, y, sensitive_attr):
        """
        Adversarial debiasing: remove the influence of a sensitive
        attribute via adversarial training.

        Returns the network CLASS (not an instance); the caller
        instantiates and trains it.

        NOTE(review): the forward pass uses `GradientReversal`, which
        must be defined at module level when forward() is called.
        """
        import torch.nn as nn

        class AdversarialDebiasingNet(nn.Module):
            def __init__(self, input_dim, hidden_dim, output_dim):
                super().__init__()
                # Main predictor.
                self.predictor = nn.Sequential(
                    nn.Linear(input_dim, hidden_dim),
                    nn.ReLU(),
                    nn.Linear(hidden_dim, output_dim)
                )
                # Adversary tries to recover the sensitive attribute
                # from the predictor's output.
                self.adversary = nn.Sequential(
                    nn.Linear(output_dim, hidden_dim // 2),
                    nn.ReLU(),
                    nn.Linear(hidden_dim // 2, 1)
                )

            def forward(self, x):
                # Main prediction.
                prediction = self.predictor(x)
                # FIX: the local was named `reversed`, shadowing the
                # builtin; gradient reversal makes the predictor learn
                # representations the adversary cannot exploit.
                grad_reversed = GradientReversal.apply(prediction)
                sensitive_pred = self.adversary(grad_reversed)
                return prediction, sensitive_pred

        return AdversarialDebiasingNet
class GradientReversal(torch.autograd.Function):
    """Gradient reversal layer.

    Identity on the forward pass; negates the incoming gradient on the
    backward pass (used for adversarial debiasing / domain-adversarial
    training).
    """

    @staticmethod
    def forward(ctx, x):
        # view_as(x) is a no-op view -> identity forward.
        identity = x.view_as(x)
        return identity

    @staticmethod
    def backward(ctx, grad_output):
        # Flip the sign so upstream parameters train adversarially.
        flipped = grad_output.neg()
        return flipped
💡 总结¶
Text Only
可解释AI与因果推断核心要点:
┌─────────────────────────────────────────────────────────────┐
│ 可解释AI (XAI) │
├─────────────────────────────────────────────────────────────┤
│ 1. 内在可解释模型 │
│ - 线性模型:权重直接可解释 │
│ - 决策树:决策路径透明 │
├─────────────────────────────────────────────────────────────┤
│ 2. 事后解释方法 │
│ - SHAP:基于博弈论的全局+局部解释 │
│ - LIME:局部线性近似 │
│ - Grad-CAM:CNN可视化 │
│ - 注意力可视化:Transformer解释 │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 因果推断 │
├─────────────────────────────────────────────────────────────┤
│ 1. 基础概念 │
│ - 潜在结果框架 │
│ - 因果图模型 │
│ - 混杂因子与选择偏差 │
├─────────────────────────────────────────────────────────────┤
│ 2. 识别策略 │
│ - 工具变量 (IV):处理内生性问题 │
│ - 双重差分 (DID):利用面板数据 │
│ - 倾向得分匹配 (PSM):控制观察到的混杂 │
├─────────────────────────────────────────────────────────────┤
│ 3. 因果发现 │
│ - PC算法:基于条件独立性 │
│ - 从数据中发现因果结构 │
└─────────────────────────────────────────────────────────────┘
实践建议:
1. 优先使用内在可解释模型
2. 对黑盒模型使用多种解释方法交叉验证
3. 因果推断需要清晰的识别策略
4. 注意检验关键假设(平行趋势、工具变量有效性等)
5. 可解释性和因果性结合,构建可信AI系统
恭喜你完成了全部24章的学习! 🎉
你已经建立了从基础到前沿的完整机器学习知识体系。继续实践、探索和创新!
祝你学习愉快! 🚀