🔧 Experiment Management and Model Versioning¶
⚠️ Timeliness note: this chapter touches on frontier models, pricing, and leaderboards that can change quickly between versions; defer to the original papers, official release pages, and API documentation.
Learning goals: master MLflow/W&B experiment tracking, model version management, and DVC data version control, and build a fully reproducible ML experiment workflow. Estimated time: 10-12 hours. Prerequisites: Python programming; PyTorch/TensorFlow basics.
📋 Chapter Overview¶
In real-world AI projects, model development involves many experiments: different hyperparameters, datasets, and feature combinations. Without systematic experiment management you quickly end up asking "which experiment worked best?" and "what parameters did that model use?". This chapter walks through the experiment-management tools and best practices that are standard in industry.
The experiment management landscape:
┌──────────────────────────────────────────────────────────────┐
│                Experiment Management Platform                │
│ ┌──────────────┐ ┌──────────────┐ ┌────────────────────────┐ │
│ │ Parameters   │ │ Metrics      │ │ Artifacts              │ │
│ │ lr / batch   │ │ loss / acc   │ │ models / data / charts │ │
│ └──────────────┘ └──────────────┘ └────────────────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ ┌────────────────────────┐ │
│ │ Model Reg.   │ │ Data Ver.    │ │ Reproducibility        │ │
│ │ Registry     │ │ DVC          │ │ seeds / deps / config  │ │
│ └──────────────┘ └──────────────┘ └────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
(Screenshot: the MLflow experiment tracking UI. It lists every run with its parameters, status, creation time, and metrics, and lets you sort and filter runs by any attribute to quickly find the best model.)
1. MLflow Experiment Tracking¶
1.1 MLflow Core Concepts¶
MLflow is an open-source platform for managing the ML lifecycle. Its core components:
| Component | Purpose | Notes |
|---|---|---|
| Tracking | Experiment tracking | Logs parameters, metrics, artifacts |
| Models | Model packaging | Unified model format, multi-framework support |
| Model Registry | Model registration | Model versioning and stage/alias transitions |
| Projects | Project packaging | Reproducible code execution environments |
1.2 Installation and Basic Setup¶
# Install MLflow
# pip install "mlflow>=2.10"
import mlflow
import mlflow.pytorch  # or mlflow.sklearn, mlflow.tensorflow
# Set the tracking server (local or remote)
# Local file store
mlflow.set_tracking_uri("file:///tmp/mlflow-experiments")
# Remote server
# mlflow.set_tracking_uri("http://mlflow-server:5000")
# Create/select the experiment
mlflow.set_experiment("text-classification-bert")
print(f"Tracking URI: {mlflow.get_tracking_uri()}")
print(f"Experiment: {mlflow.get_experiment_by_name('text-classification-bert')}")
1.3 Logging Parameters and Metrics¶
import mlflow
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
# Generate sample data
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Define hyperparameters
params = {
"n_estimators": 100,
"max_depth": 10,
"min_samples_split": 5,
"min_samples_leaf": 2,
"random_state": 42
}
# Track the experiment with MLflow
mlflow.set_experiment("rf-classification")
with mlflow.start_run(run_name="rf-baseline") as run:
    # 1. Log parameters
mlflow.log_params(params)
    # 2. Log extra metadata (tags)
mlflow.set_tags({
"model_type": "RandomForest",
"dataset": "synthetic",
"author": "your_name",
"purpose": "baseline"
})
    # 3. Train the model
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
    # 4. Evaluate and log metrics
y_pred = model.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_score": f1_score(y_test, y_pred),
"precision": precision_score(y_test, y_pred),
"recall": recall_score(y_test, y_pred)
}
mlflow.log_metrics(metrics)
    # 5. Log per-step training metrics
for epoch in range(10):
mlflow.log_metric("training_loss", np.random.exponential(0.5) * (0.9 ** epoch), step=epoch)
mlflow.log_metric("val_accuracy", 0.7 + 0.03 * epoch - np.random.normal(0, 0.01), step=epoch)
    # 6. Log the model
mlflow.sklearn.log_model(model, "model")
    # 7. Log artifacts (charts, data, etc.)
    # Generate and save the confusion matrix
from sklearn.metrics import confusion_matrix
import json
cm = confusion_matrix(y_test, y_pred).tolist()
with open("confusion_matrix.json", "w") as f:
json.dump({"matrix": cm}, f)
mlflow.log_artifact("confusion_matrix.json")
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {metrics}")
1.4 Integrating MLflow with a PyTorch Model¶
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import mlflow
import mlflow.pytorch
class SimpleClassifier(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim, dropout=0.3):
        super().__init__()
self.net = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim // 2, output_dim)
)
def forward(self, x):
return self.net(x)
def train_with_mlflow(config: dict):
    """Full training loop with MLflow tracking"""
mlflow.set_experiment("pytorch-classifier")
with mlflow.start_run(run_name=f"lr{config['lr']}_bs{config['batch_size']}"):
        # Log all hyperparameters
mlflow.log_params(config)
        # Prepare data
X_train = torch.randn(800, config["input_dim"])
y_train = torch.randint(0, config["output_dim"], (800,))
X_val = torch.randn(200, config["input_dim"])
y_val = torch.randint(0, config["output_dim"], (200,))
train_loader = DataLoader(
TensorDataset(X_train, y_train),
batch_size=config["batch_size"],
shuffle=True
)
        # Initialize the model
model = SimpleClassifier(
config["input_dim"], config["hidden_dim"],
config["output_dim"], config["dropout"]
)
optimizer = optim.Adam(model.parameters(), lr=config["lr"], weight_decay=config["weight_decay"])
criterion = nn.CrossEntropyLoss()
        # Training loop
best_val_acc = 0.0
for epoch in range(config["epochs"]):
model.train()
total_loss = 0
for X_batch, y_batch in train_loader:
optimizer.zero_grad()
output = model(X_batch)
loss = criterion(output, y_batch)
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
            # Validation
model.eval()
with torch.no_grad():
val_output = model(X_val)
val_loss = criterion(val_output, y_val).item()
val_preds = val_output.argmax(dim=1)
val_acc = (val_preds == y_val).float().mean().item()
            # Log per-epoch metrics
mlflow.log_metrics({
"train_loss": avg_loss,
"val_loss": val_loss,
"val_accuracy": val_acc
}, step=epoch)
            # Log the best model so far
if val_acc > best_val_acc:
best_val_acc = val_acc
mlflow.pytorch.log_model(model, "best_model")
mlflow.log_metric("best_val_accuracy", best_val_acc)
        # Log the final model
mlflow.pytorch.log_model(model, "final_model")
        # Log the model architecture
mlflow.log_text(str(model), "model_architecture.txt")
        # Log parameter counts
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
mlflow.log_metrics({
"total_params": total_params,
"trainable_params": trainable_params
})
print(f"Training complete. Best val accuracy: {best_val_acc:.4f}")
# Run training
config = {
"input_dim": 20,
"hidden_dim": 128,
"output_dim": 5,
"dropout": 0.3,
"lr": 1e-3,
"weight_decay": 1e-4,
"batch_size": 32,
"epochs": 20
}
# train_with_mlflow(config)  # uncomment to run
1.5 Comparing and Querying MLflow Runs¶
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Query all runs in an experiment
experiment = client.get_experiment_by_name("pytorch-classifier")
if experiment:
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string="metrics.val_accuracy > 0.5",
order_by=["metrics.val_accuracy DESC"],
max_results=10
)
print("Top runs by validation accuracy:")
    for run in runs:
        val_acc = run.data.metrics.get("val_accuracy", float("nan"))
        print(f"  Run: {run.info.run_id[:8]} "
              f"lr={run.data.params.get('lr', 'N/A')} "
              f"val_acc={val_acc:.4f}")
# Load the model from a specific run
# model = mlflow.pytorch.load_model(f"runs:/{run_id}/best_model")
# Compare metrics between two runs
def compare_runs(run_id_1: str, run_id_2: str):
    """Compare the metric differences between two runs"""
run1 = client.get_run(run_id_1)
run2 = client.get_run(run_id_2)
all_metrics = set(run1.data.metrics.keys()) | set(run2.data.metrics.keys())
print(f"{'Metric':<20} {'Run 1':>12} {'Run 2':>12} {'Diff':>12}")
print("-" * 60)
for metric in sorted(all_metrics):
v1 = run1.data.metrics.get(metric, float('nan'))
v2 = run2.data.metrics.get(metric, float('nan'))
diff = v2 - v1
print(f"{metric:<20} {v1:>12.4f} {v2:>12.4f} {diff:>+12.4f}")
2. Weights & Biases (W&B)¶
2.1 W&B Core Strengths¶
(Screenshot: the W&B dashboard, showing run results, hyperparameter-optimization charts, and parameter-importance plots. W&B's visualizations, such as parallel-coordinates and parameter-importance views, help teams understand and tune model performance.)
W&B's main advantages over MLflow:
- Better visualization: rich built-in charts and interactive dashboards
- Team collaboration: experiment results shared in the cloud
- Sweeps: built-in hyperparameter search
- Artifacts: full data/model version management
- Reports: shareable interactive experiment reports
2.2 W&B Basics¶
# pip install wandb
import wandb
import numpy as np
# Log in (an API key is required on first use)
# wandb.login(key="your-api-key")
def train_with_wandb():
    """Track a training run with W&B"""
    # Initialize the run
run = wandb.init(
project="text-classification",
name="bert-base-v1",
config={
"model": "bert-base-chinese",
"learning_rate": 2e-5,
"batch_size": 16,
"epochs": 10,
"max_length": 128,
"warmup_steps": 100,
"weight_decay": 0.01,
"optimizer": "AdamW",
"scheduler": "linear"
},
tags=["bert", "baseline", "chinese"],
        notes="Baseline BERT experiment for Chinese text classification"
)
config = wandb.config
    # Simulate the training loop
for epoch in range(config.epochs):
        # Simulated training metrics
train_loss = 2.0 * np.exp(-0.3 * epoch) + np.random.normal(0, 0.05)
train_acc = 1.0 - np.exp(-0.5 * epoch) * 0.6 + np.random.normal(0, 0.02)
val_loss = 2.0 * np.exp(-0.25 * epoch) + np.random.normal(0, 0.08)
val_acc = 1.0 - np.exp(-0.4 * epoch) * 0.6 + np.random.normal(0, 0.03)
lr = config.learning_rate * (1 - epoch / config.epochs)
        # Log metrics
wandb.log({
"epoch": epoch,
"train/loss": train_loss,
"train/accuracy": train_acc,
"val/loss": val_loss,
"val/accuracy": val_acc,
"learning_rate": lr
})
    # Record final summary values
    run.summary["best_val_accuracy"] = 0.92
    run.summary["best_epoch"] = 8
    # Log a confusion matrix
y_true = np.random.randint(0, 5, 100)
y_pred = y_true.copy()
y_pred[np.random.choice(100, 15, replace=False)] = np.random.randint(0, 5, 15)
wandb.log({
"confusion_matrix": wandb.plot.confusion_matrix(
y_true=y_true,
preds=y_pred,
            class_names=["politics", "economy", "tech", "sports", "entertainment"]
)
})
wandb.finish()
# train_with_wandb()  # uncomment to run
2.3 Hyperparameter Search with W&B Sweeps¶
import wandb
# Define the search space
sweep_config = {
"method": "bayes", # bayes, random, grid
"metric": {
"name": "val/accuracy",
"goal": "maximize"
},
"parameters": {
"learning_rate": {
"distribution": "log_uniform_values",
"min": 1e-5,
"max": 1e-3
},
"batch_size": {
"values": [8, 16, 32, 64]
},
"dropout": {
"distribution": "uniform",
"min": 0.1,
"max": 0.5
},
"hidden_dim": {
"values": [64, 128, 256, 512]
},
"optimizer": {
"values": ["adam", "adamw", "sgd"]
}
},
"early_terminate": {
"type": "hyperband",
"min_iter": 3,
"eta": 2
}
}
def sweep_train():
    """Training function used by the sweep"""
run = wandb.init()
config = wandb.config
    # Train with the hyperparameters in config (simulated here)
import numpy as np
for epoch in range(10):
loss = np.exp(-config.learning_rate * 1000 * epoch) * 2 + np.random.normal(0, 0.05)
acc = min(0.95, config.learning_rate * 5000 * np.log(epoch + 1) * (1 - config.dropout * 0.5))
wandb.log({"val/accuracy": acc, "val/loss": loss, "epoch": epoch})
wandb.finish()
# Create and run the sweep
# sweep_id = wandb.sweep(sweep_config, project="text-classification")
# wandb.agent(sweep_id, function=sweep_train, count=20)
2.4 Managing W&B Artifacts¶
import wandb
def log_dataset_artifact():
    """Log a dataset artifact"""
run = wandb.init(project="mlops-demo", job_type="data-preparation")
    # Create the dataset artifact
artifact = wandb.Artifact(
name="train-dataset",
type="dataset",
        description="Preprocessed training dataset",
metadata={
"num_samples": 10000,
"num_features": 768,
"num_classes": 5,
"preprocessing": "tokenize + normalize"
}
)
    # Add files / directories
# artifact.add_file("data/train.csv")
# artifact.add_dir("data/processed/")
run.log_artifact(artifact)
wandb.finish()
def use_dataset_artifact():
    """Consume a previously logged dataset artifact"""
run = wandb.init(project="mlops-demo", job_type="training")
    # Download a specific artifact version
artifact = run.use_artifact("train-dataset:v2")
artifact_dir = artifact.download()
print(f"Dataset downloaded to: {artifact_dir}")
wandb.finish()
2.5 W&B Weave: Evaluation and Debugging for Generative AI¶
In 2024 W&B released Weave, aimed specifically at LLM/GenAI evaluation and debugging (comparable to the Evaluating and Debugging Generative AI course in llm-cookbook):
"""W&B Weave: LLM application evaluation and prompt tracing"""
import weave
# pip install weave
# Initialize the Weave project
weave.init("llm-eval-demo")
# 1) Trace LLM calls automatically with @weave.op()
@weave.op()
def chat_completion(prompt: str, model: str = "gpt-4o-mini") -> str:
    """Trace each LLM call's input/output, latency, and token usage"""
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
)
return response.choices[0].message.content
# 2) Define the evaluation dataset
eval_dataset = [
    {"input": "What is a Transformer?", "expected": "A neural network architecture based on self-attention"},
    {"input": "Explain LoRA fine-tuning", "expected": "A method that cuts the number of fine-tuned parameters via low-rank matrix decomposition"},
    {"input": "What is RAG?", "expected": "Retrieval-augmented generation, combining an external knowledge base with an LLM"},
]
# 3) Define evaluation metrics
@weave.op()
def relevance_scorer(output: str, expected: str) -> dict:
    """Score answer relevance with an LLM-as-Judge"""
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
            "content": f"Rate the relevance of the answer to the reference (1-5):\nAnswer: {output}\nReference: {expected}\nOutput only the number",
}],
)
score = int(response.choices[0].message.content.strip())
return {"relevance": score}
# 4) Run the evaluation pipeline
@weave.op()
def evaluate_llm():
    """Batch-evaluate LLM output quality"""
results = []
for item in eval_dataset:
output = chat_completion(item["input"])
scores = relevance_scorer(output, item["expected"])
results.append({
"input": item["input"],
"output": output,
"expected": item["expected"],
**scores,
})
avg_relevance = sum(r["relevance"] for r in results) / len(results)
    print(f"Average relevance score: {avg_relevance:.1f}/5.0")
return results
# evaluate_llm()
# After a run, inspect in the Weave panel at https://wandb.ai:
# - the full prompt/response for every LLM call
# - token usage and latency statistics
# - evaluation metric trends
# - prompt version comparisons
Weave vs. classic W&B:
| Feature | Classic W&B | Weave (GenAI) |
|---|---|---|
| What is traced | Numeric metrics such as loss/accuracy | Prompt/response text + token counts |
| Evaluation | Standard ML metrics | LLM-as-Judge, human evaluation |
| Versioning | Model weights | Prompt template versions |
| Visualization | Training curves | Conversation traces, prompt comparisons |
| Typical user | ML engineers | LLM application developers |
3. Model Version Management (Model Registry)¶
3.1 MLflow Model Registry¶
The Model Registry is the core of model version management, supporting model registration, version control, and stage/alias transitions.
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# ===== 1. Register a model =====
# Option 1: register directly while training
with mlflow.start_run() as run:
    # ... training code (assumes `model` is a fitted estimator) ...
mlflow.sklearn.log_model(
model, "model",
        registered_model_name="text-classifier"  # registers automatically
)
# Option 2: register from an existing run
# result = mlflow.register_model(
# model_uri=f"runs:/{run_id}/model",
# name="text-classifier"
# )
# ===== 2. Manage model versions =====
# List all registered models and versions (MLflow 2.9.0+ uses the alias system)
for rm in client.search_registered_models():
print(f"Model: {rm.name}")
versions = client.search_model_versions(f"name='{rm.name}'")
for mv in versions:
        aliases = mv.aliases  # alias list, e.g. ['champion', 'staging']
print(f" Version: {mv.version}, Aliases: {aliases}")
# ===== 3. Model aliases (recommended; replaces the deprecated stage system) =====
# Note: transition_model_version_stage() has been deprecated since MLflow 2.9.0;
# newer versions use aliases instead
client.set_registered_model_alias(
name="text-classifier",
    alias="staging",  # custom alias: staging / champion / production
version=3
)
# ===== 4. Add a model description =====
client.update_model_version(
name="text-classifier",
version=3,
    description="BERT-base model, F1=0.92, validated on test set A"
)
# ===== 5. Load the production model =====
# By alias (recommended, MLflow 2.9.0+)
# model = mlflow.pyfunc.load_model("models:/text-classifier@champion")
# By version
# model = mlflow.pyfunc.load_model("models:/text-classifier/3")
3.2 Model Approval Workflow¶
class ModelApprovalWorkflow:
    """Model approval and release workflow"""
def __init__(self, model_name: str):
self.client = MlflowClient()
self.model_name = model_name
def validate_model(self, version: int, test_data, metrics_threshold: dict) -> bool:
        """Check whether a model version meets release criteria"""
        # Load the model
model_uri = f"models:/{self.model_name}/{version}"
# model = mlflow.pyfunc.load_model(model_uri)
        # Run validation checks (placeholders below)
checks = {
"accuracy_check": True, # metrics["accuracy"] >= threshold
"latency_check": True, # p99_latency < 100ms
"bias_check": True, # fairness metrics pass
"data_quality_check": True, # input data schema validation
}
all_passed = all(checks.values())
        # Record the validation result
self.client.set_model_version_tag(
name=self.model_name,
version=str(version),
key="validation_status",
value="PASSED" if all_passed else "FAILED"
)
return all_passed
def promote_to_production(self, version: int):
        """Promote a model to production (alias system; replaces the deprecated stage API)"""
        # 1. Validate first
if not self.validate_model(version, test_data=None, metrics_threshold={}):
raise ValueError(f"Model version {version} failed validation")
        # 2. Point the champion (production) alias at the new version
self.client.set_registered_model_alias(
name=self.model_name,
alias="champion",
version=version
)
print(f"Model {self.model_name} v{version} promoted to Production (alias: champion)")
        print(f"Load with: mlflow.pyfunc.load_model('models:/{self.model_name}@champion')")
# Usage example
# workflow = ModelApprovalWorkflow("text-classifier")
# workflow.promote_to_production(version=5)
4. Data Version Control with DVC¶
4.1 DVC Basics¶
DVC (Data Version Control) is a data-versioning tool built for machine-learning projects; think of it as "Git for data".
# Install DVC
# pip install dvc dvc-s3 dvc-gs  # pick the extras matching your storage backend
# Initialize DVC (inside a Git repository)
# dvc init
# git add .dvc .dvcignore
# git commit -m "Initialize DVC"
# Configure remote storage
# dvc remote add -d myremote s3://my-bucket/dvc-storage
# dvc remote modify myremote access_key_id YOUR_KEY
# dvc remote modify myremote secret_access_key YOUR_SECRET
4.2 Tracking Data and Managing Versions¶
# ===== Track large files / datasets =====
# dvc add data/train.csv
# → generates data/train.csv.dvc (a metadata file committed to Git)
# → the raw file is added to .gitignore
# git add data/train.csv.dvc data/.gitignore
# git commit -m "Add training data v1"
# ===== Push data to remote storage =====
# dvc push
# ===== Pull data =====
# dvc pull
# ===== Switch data versions =====
# git checkout v1.0  # check out the corresponding Git tag
# dvc checkout       # DVC restores the matching data version
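The shell workflow above also has a Python counterpart: the `dvc.api` module lets training code read a specific data version directly, pinned to a Git revision, without a full checkout. A sketch, with the repo URL and file path as illustrative placeholders:

```python
def load_training_data(rev: str = "v1.0") -> str:
    """Stream a DVC-tracked file at a given Git tag/commit."""
    import dvc.api  # pip install dvc

    # repo may be a local path or a Git URL; rev is any Git revision (tag/branch/commit)
    with dvc.api.open(
        "data/train.csv",                               # hypothetical tracked path
        repo="https://github.com/example/ml-project",   # placeholder repo URL
        rev=rev,
    ) as f:
        return f.read()

# data_v1 = load_training_data("v1.0")  # uncomment inside a real DVC project
```

This pairs naturally with experiment tracking: log `rev` as a tag so every run records exactly which data version it consumed.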
4.3 DVC Pipeline¶
# dvc.yaml: defines the ML pipeline
stages:
preprocess:
cmd: python src/preprocess.py --input data/raw --output data/processed
deps:
- src/preprocess.py
- data/raw
outs:
- data/processed
params:
- preprocess.max_length
- preprocess.vocab_size
train:
cmd: python src/train.py --data data/processed --model models/model.pt
deps:
- src/train.py
- data/processed
outs:
- models/model.pt
params:
- train.learning_rate
- train.batch_size
- train.epochs
metrics:
- metrics/train_metrics.json:
cache: false
plots:
- metrics/loss_curve.csv:
x: epoch
y: loss
evaluate:
cmd: python src/evaluate.py --model models/model.pt --data data/processed/test
deps:
- src/evaluate.py
- models/model.pt
- data/processed/test
metrics:
- metrics/eval_metrics.json:
cache: false
# params.yaml: configuration parameters (DVC tracks changes automatically)
# preprocess:
# max_length: 128
# vocab_size: 30000
#
# train:
# learning_rate: 2e-5
# batch_size: 16
# epochs: 10
# Run the pipeline
# dvc repro          # rerun every stage whose inputs changed
# dvc repro train    # rerun only the train stage and its dependencies
# Compare experiments
# dvc metrics diff   # compare metrics against the last commit
# dvc params diff    # compare parameter changes
# dvc plots diff     # generate comparison plots
4.4 Integrating DVC with MLflow¶
import mlflow
import yaml
import json
import subprocess
def train_with_dvc_mlflow():
    """DVC manages data and pipelines; MLflow tracks experiments"""
    # 1. Read parameters from DVC's params.yaml
with open("params.yaml", "r") as f:
params = yaml.safe_load(f)
train_params = params.get("train", {
"learning_rate": 2e-5,
"batch_size": 16,
"epochs": 10
})
    # 2. Capture the DVC data-version info
# dvc_status = subprocess.run(["dvc", "version"], capture_output=True, text=True)
    # 3. Track the experiment with MLflow
with mlflow.start_run(run_name="dvc-integrated-run"):
mlflow.log_params(train_params)
        mlflow.set_tag("data_version", "v2.1")  # from a git tag
        mlflow.set_tag("dvc_commit", "abc123")  # DVC hash
        # ... training code ...
        # 4. Save metrics for DVC tracking
metrics = {"accuracy": 0.92, "f1": 0.90}
with open("metrics/train_metrics.json", "w") as f:
json.dump(metrics, f, indent=2)
mlflow.log_metrics(metrics)
print("Training complete with DVC + MLflow integration")
# train_with_dvc_mlflow()
5. Experiment Reproducibility¶
5.1 Managing Random Seeds¶
import os
import random
import numpy as np
import torch
def set_seed(seed: int = 42):
    """Set global random seeds so the experiment is reproducible"""
    # Python's built-in RNG
random.seed(seed)
# Numpy
np.random.seed(seed)
# PyTorch CPU
torch.manual_seed(seed)
# PyTorch GPU
torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # all GPUs
    # cuDNN determinism
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
    # Environment variable
os.environ["PYTHONHASHSEED"] = str(seed)
# TensorFlow
# tf.random.set_seed(seed)
print(f"Random seed set to {seed}")
# Call this before training starts
set_seed(42)
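`set_seed` covers the main RNGs, but PyTorch `DataLoader` shuffling and worker processes carry their own randomness. A sketch of seeding those too, following the pattern recommended in PyTorch's reproducibility notes (`seed_worker` and the tiny dataset here are illustrative):

```python
import random

import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset

def seed_worker(worker_id: int):
    """Seed numpy/random inside each DataLoader worker process."""
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)

# A dedicated seeded generator makes the shuffle order reproducible
g = torch.Generator()
g.manual_seed(42)

dataset = TensorDataset(torch.arange(10).float().unsqueeze(1))
loader = DataLoader(dataset, batch_size=2, shuffle=True,
                    worker_init_fn=seed_worker, generator=g)
first_order = [batch[0].squeeze(1).tolist() for batch in loader]

g.manual_seed(42)  # re-seed: iterating again repeats the same shuffle order
second_order = [batch[0].squeeze(1).tolist() for batch in loader]
print(first_order == second_order)  # → True
```

Without the seeded generator, each run (and each epoch) draws a fresh permutation, which is one of the most common sources of "same seed, different curves".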
5.2 Environment Management¶
import subprocess
import platform
import sys
def capture_environment():
    """Capture the full runtime environment"""
env_info = {
"python_version": sys.version,
"platform": platform.platform(),
"processor": platform.processor(),
}
    # GPU info
    try:  # torch may not be installed
import torch
env_info["pytorch_version"] = torch.__version__
env_info["cuda_version"] = torch.version.cuda or "N/A"
env_info["gpu_count"] = torch.cuda.device_count()
if torch.cuda.is_available():
env_info["gpu_name"] = torch.cuda.get_device_name(0)
            env_info["gpu_memory"] = f"{torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB"
except ImportError:
pass
    # Installed packages
result = subprocess.run(
[sys.executable, "-m", "pip", "freeze"],
capture_output=True, text=True
)
env_info["packages"] = result.stdout
return env_info
# Log the environment with MLflow
# with mlflow.start_run():
# env = capture_environment()
# for key, value in env.items():
# if key != "packages":
# mlflow.log_param(f"env_{key}", value)
# mlflow.log_text(env["packages"], "requirements.txt")
5.3 Configuration Management (Hydra/OmegaConf)¶
# pip install hydra-core omegaconf
# Example config.yaml
"""
model:
name: bert-base-chinese
hidden_dim: 768
num_layers: 12
dropout: 0.1
training:
learning_rate: 2e-5
batch_size: 16
epochs: 10
warmup_ratio: 0.1
weight_decay: 0.01
gradient_accumulation_steps: 2
max_grad_norm: 1.0
data:
train_path: data/train.csv
val_path: data/val.csv
max_length: 128
num_workers: 4
"""
from dataclasses import dataclass, field
@dataclass
class ModelConfig:
name: str = "bert-base-chinese"
hidden_dim: int = 768
num_layers: int = 12
dropout: float = 0.1
@dataclass
class TrainingConfig:
learning_rate: float = 2e-5
batch_size: int = 16
epochs: int = 10
warmup_ratio: float = 0.1
weight_decay: float = 0.01
gradient_accumulation_steps: int = 2
max_grad_norm: float = 1.0
seed: int = 42
@dataclass
class DataConfig:
train_path: str = "data/train.csv"
val_path: str = "data/val.csv"
max_length: int = 128
num_workers: int = 4
@dataclass
class ExperimentConfig:
model: ModelConfig = field(default_factory=ModelConfig)
training: TrainingConfig = field(default_factory=TrainingConfig)
data: DataConfig = field(default_factory=DataConfig)
experiment_name: str = "default"
# Hydra usage example
# @hydra.main(config_path="configs", config_name="config", version_base=None)
# def main(cfg: ExperimentConfig):
# set_seed(cfg.training.seed)
# mlflow.log_params(OmegaConf.to_container(cfg, resolve=True))
# train(cfg)
5.4 A Complete Reproducible Training Template¶
import os
import json
import hashlib
from datetime import datetime
from dataclasses import dataclass, asdict
@dataclass  # auto-generates __init__ and other methods
class ReproducibleExperiment:
    """A fully reproducible experiment template"""
    # Experiment identity
experiment_name: str = "my-experiment"
run_name: str = ""
seed: int = 42
    # Model hyperparameters
model_name: str = "bert-base-chinese"
learning_rate: float = 2e-5
batch_size: int = 16
epochs: int = 10
def __post_init__(self):
if not self.run_name:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.run_name = f"{self.model_name}_{timestamp}"
def get_config_hash(self) -> str:
        """Hash the config for quick comparison across experiments"""
        config_str = json.dumps(asdict(self), sort_keys=True)  # stable key order, stable hash
        return hashlib.md5(config_str.encode()).hexdigest()[:8]  # first 8 hex chars
def save_config(self, path: str):
        """Save the full experiment configuration"""
config = asdict(self)
config["config_hash"] = self.get_config_hash()
config["timestamp"] = datetime.now().isoformat()
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "w") as f:  # context manager ensures the file is closed
json.dump(config, f, indent=2, ensure_ascii=False)
def run(self):
        """Run the reproducible training pipeline"""
import mlflow
        # 1. Set seeds
set_seed(self.seed)
        # 2. Capture the environment
env_info = capture_environment()
        # 3. Track with MLflow
mlflow.set_experiment(self.experiment_name)
with mlflow.start_run(run_name=self.run_name):
            # Log the full config
mlflow.log_params(asdict(self))
mlflow.set_tag("config_hash", self.get_config_hash())
            # Log environment tags
for k, v in env_info.items():
if k != "packages":
mlflow.set_tag(f"env_{k}", str(v))
            # Save the config file
config_path = f"/tmp/{self.run_name}_config.json"
self.save_config(config_path)
mlflow.log_artifact(config_path)
            # === Training ===
print(f"Running experiment: {self.run_name}")
print(f"Config hash: {self.get_config_hash()}")
            # ... actual training code ...
mlflow.log_metric("final_accuracy", 0.92)
# Usage
# exp = ReproducibleExperiment(
# experiment_name="text-cls",
# model_name="bert-base-chinese",
# learning_rate=3e-5,
# batch_size=32
# )
# exp.run()
6. Hands-On: A Complete Train-and-Track Pipeline¶
"""
End-to-end example: experiment management with sklearn + MLflow
Covers: data preparation → multi-model training → run comparison → registering the best model
"""
import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings("ignore")
class TextClassificationExperiment:
    """Text-classification experiment manager"""
MODELS = {
"logistic_regression": {
"class": LogisticRegression,
"params": {"C": 1.0, "max_iter": 1000, "random_state": 42}
},
"svm": {
"class": LinearSVC,
"params": {"C": 1.0, "max_iter": 1000, "random_state": 42}
},
"random_forest": {
"class": RandomForestClassifier,
"params": {"n_estimators": 100, "max_depth": 20, "random_state": 42}
},
"gradient_boosting": {
"class": GradientBoostingClassifier,
"params": {"n_estimators": 100, "max_depth": 5, "random_state": 42}
}
}
def __init__(self, experiment_name: str = "text-classification-benchmark"):
self.experiment_name = experiment_name
mlflow.set_experiment(experiment_name)
self.results = []
def prepare_data(self):
        """Prepare the data"""
categories = ['sci.space', 'comp.graphics', 'rec.sport.baseball', 'talk.politics.misc']
newsgroups = fetch_20newsgroups(
subset='all', categories=categories,
remove=('headers', 'footers', 'quotes')
)
        # TF-IDF feature extraction
vectorizer = TfidfVectorizer(max_features=5000, stop_words='english')
X = vectorizer.fit_transform(newsgroups.data)
y = newsgroups.target
self.X_train, self.X_test, self.y_train, self.y_test = \
train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
self.target_names = newsgroups.target_names
self.vectorizer = vectorizer
print(f"Train samples: {self.X_train.shape[0]}, Test samples: {self.X_test.shape[0]}")
def train_model(self, model_name: str):
        """Train one model and log it to MLflow"""
model_config = self.MODELS[model_name]
with mlflow.start_run(run_name=model_name) as run:
            # Log parameters
mlflow.log_params(model_config["params"])
mlflow.set_tag("model_type", model_name)
            # Train
model = model_config["class"](**model_config["params"])
model.fit(self.X_train, self.y_train)
            # Evaluate
y_pred = model.predict(self.X_test)
metrics = {
"accuracy": accuracy_score(self.y_test, y_pred),
"f1_macro": f1_score(self.y_test, y_pred, average="macro"),
"f1_weighted": f1_score(self.y_test, y_pred, average="weighted"),
}
mlflow.log_metrics(metrics)
            # Log the classification report
report = classification_report(self.y_test, y_pred, target_names=self.target_names)
mlflow.log_text(report, "classification_report.txt")
            # Log the model
mlflow.sklearn.log_model(model, "model")
self.results.append({
"name": model_name,
"run_id": run.info.run_id,
**metrics
})
print(f" {model_name}: accuracy={metrics['accuracy']:.4f}, f1={metrics['f1_macro']:.4f}")
return run.info.run_id
def run_all(self):
        """Run every model"""
print("=" * 60)
print("Starting benchmark experiments...")
print("=" * 60)
self.prepare_data()
for model_name in self.MODELS:
self.train_model(model_name)
        # Pick the best model
        best = max(self.results, key=lambda x: x["f1_macro"])
print(f"\nBest model: {best['name']} (F1={best['f1_macro']:.4f})")
return best
# Run the benchmark
# exp = TextClassificationExperiment()
# best_model = exp.run_all()
💼 Common Interview Questions¶
Q1: How do MLflow and W&B differ, and when would you use each?¶
A: MLflow is open source and can run locally or on private infrastructure, which suits companies with strict data-security requirements; W&B is a cloud SaaS (with an enterprise edition) with stronger visualization and built-in Sweep hyperparameter search. Small teams and individual researchers usually find W&B more convenient; large enterprises with sensitive data tend to self-host MLflow.
Q2: Why do we need a Model Registry, and what problems does it solve?¶
A: The Model Registry answers "which model is in production?" and "can we roll back safely?". It provides model versioning, alias management (e.g. champion/challenger aliases marking the production and candidate models), approval workflows, and metadata tracking, giving models a complete lifecycle from development to deployment. Note: MLflow 2.9.0+ replaced the old Stage mechanism (Staging/Production) with the alias system.
Q3: How do you make ML experiments reproducible?¶
A: At five levels: (1) fix random seeds (Python/NumPy/PyTorch/CUDA); (2) pin environment dependencies (pip freeze/Docker); (3) version-control code (Git) and data (DVC); (4) record the full configuration (Hydra/config files); (5) record hardware environment info. All five are necessary.
Q4: How do DVC and Git LFS differ, and why is DVC a better fit for ML projects?¶
A: Git LFS only handles large-file storage; DVC adds data pipelines, experiment management, and metric tracking. DVC integrates tightly with Git while managing data storage independently, supports multiple backends (S3/GCS/Azure), and can define reproducible ML pipelines via dvc.yaml.
Q5: Design a model-versioning scheme that supports A/B testing and fast rollback.¶
A: Use the Model Registry's alias system (MLflow 2.9.0+):
- Register the new model and give it the challenger alias to mark it as a candidate
- After it passes automated validation gates, assign the canary alias and route 5% of traffic to it
- Once it holds up in production, point the champion alias at the new version for full rollout
- Keep metadata and a validation report for every version
- To roll back, simply point champion back at the previous version
- Combine with model signatures to validate input/output schema consistency
Q6: How do you organize and search experiments at scale (1000+ runs)?¶
A: (1) A solid naming convention (project-model-date-hash); (2) a tag system (objective, method, dataset); (3) structured search (the MLflow search API); (4) periodic cleanup of dead experiments; (5) a standard experiment-documentation template; (6) W&B Reports to summarize key experiments.
Q7: What is an MLflow Model Signature for?¶
A: A Model Signature defines the schema of a model's inputs and outputs (column names, types, shapes) and is used to validate inputs at serving time. If a request does not match the signature, the inference service rejects it with an error, preventing "dirty data" from causing production incidents.
Q8: How do you get a team to adopt experiment-management best practices?¶
A: (1) Build shared tracking infrastructure (a common MLflow/W&B server); (2) standardize experiment naming and tags; (3) provide a standard training template with tracking code built in; (4) check tracking completeness during code review; (5) hold regular experiment retrospectives; (6) integrate tracking into the CI/CD pipeline.
✅ Learning Checklist¶
- Can log parameters, metrics, models, and artifacts with MLflow
- Can run experiment tracking and hyperparameter sweeps with W&B
- Understand the Model Registry promotion workflow
- Can use DVC for data versioning and pipeline management
- Can set up complete random-seed management for reproducibility
- Understand Hydra/OmegaConf configuration management
- Can design an end-to-end experiment-tracking scheme
- Can take a model from development through registration
- Understand how DVC and MLflow integrate
- Can answer all the interview questions above
📌 Next chapter: 02 Model Deployment and Serving, where trained models are deployed as production-grade inference services
