
🔧 实验管理与模型版本

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

学习目标:掌握MLflow/W&B实验跟踪、模型版本管理、DVC数据版本控制,建立完全可复现的ML实验流程
预计时长:10-12小时
前置知识:Python编程、PyTorch/TensorFlow基础


📋 本章概览

在实际AI项目中,模型开发涉及大量实验——不同的超参数、数据集、特征组合。如果缺乏系统化的实验管理,很快就会陷入"哪个实验效果最好?""那个模型用的什么参数?"的困境。本章将系统介绍工业界主流的实验管理工具和最佳实践。

Text Only
实验管理全景图:
┌─────────────────────────────────────────────────────┐
│                    实验管理平台                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────────────┐   │
│  │ 参数记录  │  │ 指标跟踪  │  │ Artifact管理     │   │
│  │ lr/batch  │  │ loss/acc  │  │ 模型/数据/图表    │   │
│  └──────────┘  └──────────┘  └──────────────────┘   │
│  ┌──────────┐  ┌──────────┐  ┌──────────────────┐   │
│  │ 模型注册  │  │ 数据版本  │  │ 环境可复现       │   │
│  │ Registry  │  │ DVC      │  │ 种子/依赖/配置    │   │
│  └──────────┘  └──────────┘  └──────────────────┘   │
└─────────────────────────────────────────────────────┘

MLflow实验跟踪界面

上图展示了MLflow的实验跟踪界面,可以看到所有实验运行的列表,包括模型参数、状态、创建时间和指标。这个界面允许用户按不同属性排序和过滤实验,方便快速找到最佳模型。


一、MLflow实验跟踪

1.1 MLflow核心概念

MLflow是一个开源的ML生命周期管理平台,核心组件包括:

组件 | 功能 | 说明
Tracking | 实验跟踪 | 记录参数、指标、Artifact
Models | 模型打包 | 统一模型格式,支持多框架
Model Registry | 模型注册 | 模型版本管理与阶段转换
Projects | 项目打包 | 可复现的代码运行环境

1.2 安装与基础配置

Python
# 安装MLflow
# pip install "mlflow>=2.10"  # 引号防止shell把>=解释为重定向

import mlflow
import mlflow.pytorch  # 或 mlflow.sklearn, mlflow.tensorflow

# 设置跟踪服务器(本地或远程)
# 本地文件存储
mlflow.set_tracking_uri("file:///tmp/mlflow-experiments")

# 远程服务器
# mlflow.set_tracking_uri("http://mlflow-server:5000")

# 创建/设置实验
mlflow.set_experiment("text-classification-bert")

print(f"Tracking URI: {mlflow.get_tracking_uri()}")
print(f"Experiment: {mlflow.get_experiment_by_name('text-classification-bert')}")

1.3 参数与指标记录

Python
import mlflow
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# 生成示例数据
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 定义超参数
params = {
    "n_estimators": 100,
    "max_depth": 10,
    "min_samples_split": 5,
    "min_samples_leaf": 2,
    "random_state": 42
}

# 使用MLflow记录实验
mlflow.set_experiment("rf-classification")

with mlflow.start_run(run_name="rf-baseline") as run:
    # 1. 记录参数
    mlflow.log_params(params)

    # 2. 记录额外元信息(tags)
    mlflow.set_tags({
        "model_type": "RandomForest",
        "dataset": "synthetic",
        "author": "your_name",
        "purpose": "baseline"
    })

    # 3. 训练模型
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # 4. 评估并记录指标
    y_pred = model.predict(X_test)

    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred)
    }
    mlflow.log_metrics(metrics)

    # 5. 记录训练过程指标(按步骤)
    for epoch in range(10):
        mlflow.log_metric("training_loss", np.random.exponential(0.5) * (0.9 ** epoch), step=epoch)
        mlflow.log_metric("val_accuracy", 0.7 + 0.03 * epoch - np.random.normal(0, 0.01), step=epoch)

    # 6. 记录模型
    mlflow.sklearn.log_model(model, "model")

    # 7. 记录Artifact(图表、数据等)
    # 生成并保存混淆矩阵
    from sklearn.metrics import confusion_matrix
    import json

    cm = confusion_matrix(y_test, y_pred).tolist()
    with open("confusion_matrix.json", "w") as f:
        json.dump({"matrix": cm}, f)
    mlflow.log_artifact("confusion_matrix.json")

    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")

1.4 PyTorch模型集成MLflow

Python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import mlflow
import mlflow.pytorch

class SimpleClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout=0.3):
        super().__init__()  # 调用父类nn.Module的构造函数
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, output_dim)
        )

    def forward(self, x):
        return self.net(x)

def train_with_mlflow(config: dict):
    """完整的训练+MLflow跟踪流程"""
    mlflow.set_experiment("pytorch-classifier")

    with mlflow.start_run(run_name=f"lr{config['lr']}_bs{config['batch_size']}"):
        # 记录所有超参数
        mlflow.log_params(config)

        # 准备数据
        X_train = torch.randn(800, config["input_dim"])
        y_train = torch.randint(0, config["output_dim"], (800,))
        X_val = torch.randn(200, config["input_dim"])
        y_val = torch.randint(0, config["output_dim"], (200,))

        train_loader = DataLoader(
            TensorDataset(X_train, y_train),
            batch_size=config["batch_size"],
            shuffle=True
        )

        # 初始化模型
        model = SimpleClassifier(
            config["input_dim"], config["hidden_dim"],
            config["output_dim"], config["dropout"]
        )
        optimizer = optim.Adam(model.parameters(), lr=config["lr"], weight_decay=config["weight_decay"])
        criterion = nn.CrossEntropyLoss()

        # 训练循环
        best_val_acc = 0.0
        for epoch in range(config["epochs"]):
            model.train()
            total_loss = 0
            for X_batch, y_batch in train_loader:
                optimizer.zero_grad()
                output = model(X_batch)
                loss = criterion(output, y_batch)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            avg_loss = total_loss / len(train_loader)

            # 验证
            model.eval()
            with torch.no_grad():
                val_output = model(X_val)
                val_loss = criterion(val_output, y_val).item()
                val_preds = val_output.argmax(dim=1)
                val_acc = (val_preds == y_val).float().mean().item()

            # 记录每个epoch的指标
            mlflow.log_metrics({
                "train_loss": avg_loss,
                "val_loss": val_loss,
                "val_accuracy": val_acc
            }, step=epoch)

            # 记录最佳模型(注:每次提升都log_model开销较大,生产中可先保存state_dict,训练结束后再记录)
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                mlflow.pytorch.log_model(model, "best_model")
                mlflow.log_metric("best_val_accuracy", best_val_acc)

        # 记录最终模型
        mlflow.pytorch.log_model(model, "final_model")

        # 记录模型结构信息
        mlflow.log_text(str(model), "model_architecture.txt")

        # 记录模型参数量
        total_params = sum(p.numel() for p in model.parameters())
        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        mlflow.log_metrics({
            "total_params": total_params,
            "trainable_params": trainable_params
        })

        print(f"Training complete. Best val accuracy: {best_val_acc:.4f}")

# 运行训练
config = {
    "input_dim": 20,
    "hidden_dim": 128,
    "output_dim": 5,
    "dropout": 0.3,
    "lr": 1e-3,
    "weight_decay": 1e-4,
    "batch_size": 32,
    "epochs": 20
}

# train_with_mlflow(config)  # 取消注释运行

1.5 MLflow实验对比与查询

Python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# 查询实验中的所有运行
experiment = client.get_experiment_by_name("pytorch-classifier")
if experiment:
    runs = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        filter_string="metrics.val_accuracy > 0.5",
        order_by=["metrics.val_accuracy DESC"],
        max_results=10
    )

    print("Top runs by validation accuracy:")
    for run in runs:
        val_acc = run.data.metrics.get("val_accuracy")
        val_acc_str = f"{val_acc:.4f}" if val_acc is not None else "N/A"  # 避免对'N/A'字符串做:.4f格式化
        print(f"  Run: {run.info.run_id[:8]}  "
              f"lr={run.data.params.get('lr', 'N/A')}  "
              f"val_acc={val_acc_str}")

# 加载指定run的模型
# model = mlflow.pytorch.load_model(f"runs:/{run_id}/best_model")

# 比较两个run的指标
def compare_runs(run_id_1: str, run_id_2: str):
    """比较两个实验运行的指标差异"""
    run1 = client.get_run(run_id_1)
    run2 = client.get_run(run_id_2)

    all_metrics = set(run1.data.metrics.keys()) | set(run2.data.metrics.keys())

    print(f"{'Metric':<20} {'Run 1':>12} {'Run 2':>12} {'Diff':>12}")
    print("-" * 60)
    for metric in sorted(all_metrics):
        v1 = run1.data.metrics.get(metric, float('nan'))
        v2 = run2.data.metrics.get(metric, float('nan'))
        diff = v2 - v1
        print(f"{metric:<20} {v1:>12.4f} {v2:>12.4f} {diff:>+12.4f}")

二、Weights & Biases(W&B)

2.1 W&B核心优势

W&B Dashboard界面

上图展示了W&B的Dashboard界面,显示了机器学习运行的结果、超参数优化图表和参数重要性图表。W&B提供了强大的可视化功能,包括并行坐标图、参数重要性分析等,帮助团队更好地理解和优化模型性能。

W&B相比MLflow的主要优势:

  • 更好的可视化:内置丰富的图表和交互式Dashboard
  • 团队协作:云端共享实验结果
  • Sweep超参搜索:内置超参数优化
  • Artifact版本:完整的数据/模型版本管理
  • Report报告:可生成交互式实验报告

2.2 W&B基础使用

Python
# pip install wandb
import wandb
import numpy as np

# 初始化(首次需要API key)
# wandb.login(key="your-api-key")

def train_with_wandb():
    """使用W&B跟踪训练过程"""

    # 初始化run
    run = wandb.init(
        project="text-classification",
        name="bert-base-v1",
        config={
            "model": "bert-base-chinese",
            "learning_rate": 2e-5,
            "batch_size": 16,
            "epochs": 10,
            "max_length": 128,
            "warmup_steps": 100,
            "weight_decay": 0.01,
            "optimizer": "AdamW",
            "scheduler": "linear"
        },
        tags=["bert", "baseline", "chinese"],
        notes="BERT中文文本分类基线实验"
    )

    config = wandb.config

    # 模拟训练过程
    for epoch in range(config.epochs):
        # 模拟训练指标
        train_loss = 2.0 * np.exp(-0.3 * epoch) + np.random.normal(0, 0.05)
        train_acc = 1.0 - np.exp(-0.5 * epoch) * 0.6 + np.random.normal(0, 0.02)
        val_loss = 2.0 * np.exp(-0.25 * epoch) + np.random.normal(0, 0.08)
        val_acc = 1.0 - np.exp(-0.4 * epoch) * 0.6 + np.random.normal(0, 0.03)
        lr = config.learning_rate * (1 - epoch / config.epochs)

        # 记录指标
        wandb.log({
            "epoch": epoch,
            "train/loss": train_loss,
            "train/accuracy": train_acc,
            "val/loss": val_loss,
            "val/accuracy": val_acc,
            "learning_rate": lr
        })

    # 记录最终结果摘要
    wandb.summary["best_val_accuracy"] = 0.92
    wandb.summary["best_epoch"] = 8

    # 记录混淆矩阵
    y_true = np.random.randint(0, 5, 100)
    y_pred = y_true.copy()
    y_pred[np.random.choice(100, 15, replace=False)] = np.random.randint(0, 5, 15)

    wandb.log({
        "confusion_matrix": wandb.plot.confusion_matrix(
            y_true=y_true,
            preds=y_pred,
            class_names=["政治", "经济", "科技", "体育", "娱乐"]
        )
    })

    wandb.finish()

# train_with_wandb()  # 取消注释运行

2.3 W&B Sweep超参搜索

Python
import wandb

# 定义搜索空间
sweep_config = {
    "method": "bayes",  # bayes, random, grid
    "metric": {
        "name": "val/accuracy",
        "goal": "maximize"
    },
    "parameters": {
        "learning_rate": {
            "distribution": "log_uniform_values",
            "min": 1e-5,
            "max": 1e-3
        },
        "batch_size": {
            "values": [8, 16, 32, 64]
        },
        "dropout": {
            "distribution": "uniform",
            "min": 0.1,
            "max": 0.5
        },
        "hidden_dim": {
            "values": [64, 128, 256, 512]
        },
        "optimizer": {
            "values": ["adam", "adamw", "sgd"]
        }
    },
    "early_terminate": {
        "type": "hyperband",
        "min_iter": 3,
        "eta": 2
    }
}

def sweep_train():
    """Sweep训练函数"""
    run = wandb.init()
    config = wandb.config

    # 使用config中的超参进行训练
    import numpy as np
    for epoch in range(10):
        loss = np.exp(-config.learning_rate * 1000 * epoch) * 2 + np.random.normal(0, 0.05)
        acc = min(0.95, config.learning_rate * 5000 * np.log(epoch + 1) * (1 - config.dropout * 0.5))
        wandb.log({"val/accuracy": acc, "val/loss": loss, "epoch": epoch})

    wandb.finish()

# 创建并运行Sweep
# sweep_id = wandb.sweep(sweep_config, project="text-classification")
# wandb.agent(sweep_id, function=sweep_train, count=20)

2.4 W&B Artifacts管理

Python
import wandb

def log_dataset_artifact():
    """记录数据集Artifact"""
    run = wandb.init(project="mlops-demo", job_type="data-preparation")

    # 创建数据集Artifact
    artifact = wandb.Artifact(
        name="train-dataset",
        type="dataset",
        description="预处理后的训练数据集",
        metadata={
            "num_samples": 10000,
            "num_features": 768,
            "num_classes": 5,
            "preprocessing": "tokenize + normalize"
        }
    )

    # 添加文件/目录
    # artifact.add_file("data/train.csv")
    # artifact.add_dir("data/processed/")

    run.log_artifact(artifact)
    wandb.finish()

def use_dataset_artifact():
    """使用已记录的数据集Artifact"""
    run = wandb.init(project="mlops-demo", job_type="training")

    # 下载指定版本的Artifact
    artifact = run.use_artifact("train-dataset:v2")
    artifact_dir = artifact.download()

    print(f"Dataset downloaded to: {artifact_dir}")
    wandb.finish()

2.5 W&B Weave —— 生成式AI评估与调试

W&B在2024年推出了Weave,专门面向LLM/GenAI的评估和调试场景(对标llm-cookbook中Evaluating and Debugging Generative AI课程):

Python
"""W&B Weave —— LLM应用评估与Prompt追踪"""

import weave
# pip install weave

# 初始化Weave项目
weave.init("llm-eval-demo")

# 1) 用 @weave.op() 自动追踪LLM调用
@weave.op()
def chat_completion(prompt: str, model: str = "gpt-4o-mini") -> str:
    """追踪每次LLM调用的输入/输出/延迟/Token数"""
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content

# 2) 定义评估数据集
eval_dataset = [
    {"input": "什么是Transformer?", "expected": "一种基于自注意力机制的神经网络架构"},
    {"input": "解释LoRA微调", "expected": "通过低秩矩阵分解减少微调参数量的方法"},
    {"input": "RAG是什么?", "expected": "检索增强生成,结合外部知识库和LLM"},
]

# 3) 定义评估指标
@weave.op()
def relevance_scorer(output: str, expected: str) -> dict:
    """用LLM-as-Judge评估回答相关性"""
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"评估回答与参考答案的相关性(1-5分):\n回答: {output}\n参考: {expected}\n只输出数字",
        }],
    )
    score = int(response.choices[0].message.content.strip())
    return {"relevance": score}

# 4) 运行评估流水线
@weave.op()
def evaluate_llm():
    """批量评估LLM输出质量"""
    results = []
    for item in eval_dataset:
        output = chat_completion(item["input"])
        scores = relevance_scorer(output, item["expected"])
        results.append({
            "input": item["input"],
            "output": output,
            "expected": item["expected"],
            **scores,
        })
    avg_relevance = sum(r["relevance"] for r in results) / len(results)
    print(f"平均相关性得分: {avg_relevance:.1f}/5.0")
    return results

# evaluate_llm()
# 运行后在 https://wandb.ai 的Weave面板中查看:
# - 每次LLM调用的完整Prompt/Response
# - Token用量和延迟统计
# - 评估指标趋势
# - Prompt版本对比

Weave vs 传统W&B的区别

功能 | 传统W&B | Weave (GenAI)
追踪对象 | loss/accuracy等数值指标 | Prompt/Response文本 + Token数
评估方式 | 标准ML指标 | LLM-as-Judge、人工评估
版本管理 | 模型权重 | Prompt模板版本
可视化 | 训练曲线 | 对话trace、Prompt对比
典型用户 | ML Engineer | LLM应用开发者

三、模型版本管理(Model Registry)

3.1 MLflow Model Registry

Model Registry是模型版本管理的核心,支持模型注册、版本控制、阶段转换。

Python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# ===== 1. 注册模型 =====
# 方式一:训练时直接注册
with mlflow.start_run() as run:
    # ... 训练过程 ...
    mlflow.sklearn.log_model(
        model, "model",
        registered_model_name="text-classifier"  # 自动注册
    )

# 方式二:从已有run注册
# result = mlflow.register_model(
#     model_uri=f"runs:/{run_id}/model",
#     name="text-classifier"
# )

# ===== 2. 模型版本管理 =====
# 查看所有注册模型及版本(MLflow 2.9.0+ 使用别名系统)
for rm in client.search_registered_models():
    print(f"Model: {rm.name}")
    versions = client.search_model_versions(f"name='{rm.name}'")
    for mv in versions:
        aliases = mv.aliases  # 别名列表,如 ['champion', 'staging']
        print(f"  Version: {mv.version}, Aliases: {aliases}")

# ===== 3. 模型别名管理(推荐,替代已废弃的阶段系统) =====
# 注意:transition_model_version_stage() 自 MLflow 2.9.0 起已废弃
# 新版本使用别名(aliases)系统
client.set_registered_model_alias(
    name="text-classifier",
    alias="staging",  # 自定义别名:staging / champion / production
    version=3
)

# ===== 4. 添加模型描述 =====
client.update_model_version(
    name="text-classifier",
    version=3,
    description="BERT-base模型,F1=0.92,在测试集A上验证通过"
)

# ===== 5. 加载生产模型 =====
# 按别名加载(推荐,MLflow 2.9.0+)
# model = mlflow.pyfunc.load_model("models:/text-classifier@champion")

# 按版本加载
# model = mlflow.pyfunc.load_model("models:/text-classifier/3")

3.2 模型审批工作流

Python
class ModelApprovalWorkflow:
    """模型审批上线工作流"""

    def __init__(self, model_name: str):
        self.client = MlflowClient()
        self.model_name = model_name

    def validate_model(self, version: int, test_data, metrics_threshold: dict) -> bool:
        """验证模型是否满足上线标准"""
        # 加载模型
        model_uri = f"models:/{self.model_name}/{version}"
        # model = mlflow.pyfunc.load_model(model_uri)

        # 执行验证测试
        checks = {
            "accuracy_check": True,      # metrics["accuracy"] >= threshold
            "latency_check": True,       # p99_latency < 100ms
            "bias_check": True,          # fairness metrics pass
            "data_quality_check": True,  # input data schema validation
        }

        all_passed = all(checks.values())

        # 记录验证结果
        self.client.set_model_version_tag(
            name=self.model_name,
            version=str(version),
            key="validation_status",
            value="PASSED" if all_passed else "FAILED"
        )

        return all_passed

    def promote_to_production(self, version: int):
        """将模型提升到生产环境(使用别名系统,替代已废弃的阶段API)"""
        # 1. 先验证
        if not self.validate_model(version, test_data=None, metrics_threshold={}):
            raise ValueError(f"Model version {version} failed validation")

        # 2. 将新模型设置为 champion(生产环境)
        self.client.set_registered_model_alias(
            name=self.model_name,
            alias="champion",
            version=version
        )

        print(f"Model {self.model_name} v{version} promoted to Production (alias: champion)")
        print(f"加载方式: mlflow.pyfunc.load_model('models:/{self.model_name}@champion')")

# 使用示例
# workflow = ModelApprovalWorkflow("text-classifier")
# workflow.promote_to_production(version=5)

四、DVC数据版本控制

4.1 DVC基础概念

DVC(Data Version Control)是专门用于机器学习项目的数据版本控制工具,可以看作"数据的Git"。
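DVC之所以能像Git一样管理数据,核心是"内容寻址存储 + 轻量指针文件":大文件按内容哈希存入缓存,Git只跟踪体积极小的指针文件。下面用纯Python草图示意这一机制(假设性的简化实现,指针文件用JSON代替真实的`.dvc` YAML,非DVC源码):

```python
"""示意:DVC"内容寻址存储 + 指针文件"的核心思想(简化草图,非DVC真实实现)"""
import hashlib
import json
import shutil
from pathlib import Path

def dvc_add_sketch(data_file: str, cache_dir: str = ".dvc_cache") -> str:
    """模拟 `dvc add`:按内容哈希存入缓存,生成指针文件"""
    data = Path(data_file).read_bytes()
    md5 = hashlib.md5(data).hexdigest()

    # 1) 内容寻址:缓存路径由哈希决定,同一内容只存一份
    cache_path = Path(cache_dir) / md5[:2] / md5[2:]
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy(data_file, cache_path)

    # 2) 指针文件:只记录哈希和文件名,体积极小,适合提交到Git
    pointer = {"md5": md5, "path": Path(data_file).name}
    Path(data_file + ".dvc.json").write_text(json.dumps(pointer, indent=2))
    return md5

def dvc_checkout_sketch(pointer_path: str, cache_dir: str = ".dvc_cache") -> bytes:
    """模拟 `dvc checkout`:根据指针文件中的哈希从缓存恢复数据"""
    pointer = json.loads(Path(pointer_path).read_text())
    md5 = pointer["md5"]
    return (Path(cache_dir) / md5[:2] / md5[2:]).read_bytes()
```

切换Git分支时指针文件随之变化,`dvc checkout`只需按新哈希从缓存取数据,这正是"git checkout + dvc checkout"组合能回溯数据版本的原因。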

Bash
# 安装DVC
# pip install dvc dvc-s3 dvc-gs  # 根据存储后端安装

# 初始化DVC(在Git仓库中)
# dvc init
# git add .dvc .dvcignore
# git commit -m "Initialize DVC"

# 配置远程存储
# dvc remote add -d myremote s3://my-bucket/dvc-storage
# dvc remote modify myremote access_key_id YOUR_KEY
# dvc remote modify myremote secret_access_key YOUR_SECRET

4.2 数据追踪与版本管理

Bash
# ===== 追踪大文件/数据集 =====
# dvc add data/train.csv
# → 生成 data/train.csv.dvc(元数据文件,提交到Git)
# → 原始文件加入 .gitignore

# git add data/train.csv.dvc data/.gitignore
# git commit -m "Add training data v1"

# ===== 推送数据到远程存储 =====
# dvc push

# ===== 拉取数据 =====
# dvc pull

# ===== 切换数据版本 =====
# git checkout v1.0  # 切换到对应Git tag
# dvc checkout       # DVC自动拉取对应版本的数据

4.3 DVC Pipeline

YAML
# dvc.yaml - 定义ML Pipeline
stages:
  preprocess:
    cmd: python src/preprocess.py --input data/raw --output data/processed
    deps:
      - src/preprocess.py
      - data/raw
    outs:
      - data/processed
    params:
      - preprocess.max_length
      - preprocess.vocab_size

  train:
    cmd: python src/train.py --data data/processed --model models/model.pt
    deps:
      - src/train.py
      - data/processed
    outs:
      - models/model.pt
    params:
      - train.learning_rate
      - train.batch_size
      - train.epochs
    metrics:
      - metrics/train_metrics.json:
          cache: false
    plots:
      - metrics/loss_curve.csv:
          x: epoch
          y: loss

  evaluate:
    cmd: python src/evaluate.py --model models/model.pt --data data/processed/test
    deps:
      - src/evaluate.py
      - models/model.pt
      - data/processed/test
    metrics:
      - metrics/eval_metrics.json:
          cache: false
YAML
# params.yaml - 配置参数(DVC自动跟踪变化)
preprocess:
  max_length: 128
  vocab_size: 30000

train:
  learning_rate: 2e-5
  batch_size: 16
  epochs: 10
Bash
# 运行Pipeline
# dvc repro                # 运行所有有变化的阶段
# dvc repro train          # 只运行train阶段及其依赖

# 比较实验
# dvc metrics diff         # 比较当前与上次的指标
# dvc params diff          # 比较参数变化
# dvc plots diff           # 生成对比图表

4.4 DVC与MLflow集成

Python
import mlflow
import yaml
import json
import subprocess

def train_with_dvc_mlflow():
    """DVC管理数据+Pipeline,MLflow跟踪实验"""

    # 1. 从DVC params.yaml读取参数
    with open("params.yaml", "r") as f:
        params = yaml.safe_load(f)

    train_params = params.get("train", {
        "learning_rate": 2e-5,
        "batch_size": 16,
        "epochs": 10
    })

    # 2. 获取DVC数据版本信息
    # dvc_status = subprocess.run(["dvc", "version"], capture_output=True, text=True)

    # 3. MLflow记录实验
    with mlflow.start_run(run_name="dvc-integrated-run"):
        mlflow.log_params(train_params)
        mlflow.set_tag("data_version", "v2.1")  # 来自git tag
        mlflow.set_tag("dvc_commit", "abc123")   # DVC hash

        # ... 训练过程 ...

        # 4. 保存指标供DVC tracking
        metrics = {"accuracy": 0.92, "f1": 0.90}
        import os
        os.makedirs("metrics", exist_ok=True)  # 确保指标目录存在
        with open("metrics/train_metrics.json", "w") as f:
            json.dump(metrics, f, indent=2)

        mlflow.log_metrics(metrics)

    print("Training complete with DVC + MLflow integration")

# train_with_dvc_mlflow()

五、实验可复现性

5.1 随机种子管理

Python
import os
import random
import numpy as np
import torch

def set_seed(seed: int = 42):
    """设置全局随机种子,确保实验可复现"""
    # Python内置随机数
    random.seed(seed)

    # Numpy
    np.random.seed(seed)

    # PyTorch CPU
    torch.manual_seed(seed)

    # PyTorch GPU
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # 多GPU

    # CuDNN确定性
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    # 环境变量
    os.environ["PYTHONHASHSEED"] = str(seed)

    # TensorFlow
    # tf.random.set_seed(seed)

    print(f"Random seed set to {seed}")

# 在训练开始前调用
set_seed(42)

5.2 环境管理

Python
import subprocess
import platform
import sys

def capture_environment():
    """捕获完整的运行环境信息"""
    env_info = {
        "python_version": sys.version,
        "platform": platform.platform(),
        "processor": platform.processor(),
    }

    # GPU信息
    try:
        import torch
        env_info["pytorch_version"] = torch.__version__
        env_info["cuda_version"] = torch.version.cuda or "N/A"
        env_info["gpu_count"] = torch.cuda.device_count()
        if torch.cuda.is_available():
            env_info["gpu_name"] = torch.cuda.get_device_name(0)
            env_info["gpu_memory"] = f"{torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB"
    except ImportError:
        pass

    # 已安装的包
    result = subprocess.run(
        [sys.executable, "-m", "pip", "freeze"],
        capture_output=True, text=True
    )
    env_info["packages"] = result.stdout

    return env_info

# 使用MLflow记录环境
# with mlflow.start_run():
#     env = capture_environment()
#     for key, value in env.items():
#         if key != "packages":
#             mlflow.log_param(f"env_{key}", value)
#     mlflow.log_text(env["packages"], "requirements.txt")

5.3 配置管理(Hydra/OmegaConf)

Python
# pip install hydra-core omegaconf

# config.yaml 示例
"""
model:
  name: bert-base-chinese
  hidden_dim: 768
  num_layers: 12
  dropout: 0.1

training:
  learning_rate: 2e-5
  batch_size: 16
  epochs: 10
  warmup_ratio: 0.1
  weight_decay: 0.01
  gradient_accumulation_steps: 2
  max_grad_norm: 1.0

data:
  train_path: data/train.csv
  val_path: data/val.csv
  max_length: 128
  num_workers: 4
"""

from dataclasses import dataclass, field

@dataclass
class ModelConfig:
    name: str = "bert-base-chinese"
    hidden_dim: int = 768
    num_layers: int = 12
    dropout: float = 0.1

@dataclass
class TrainingConfig:
    learning_rate: float = 2e-5
    batch_size: int = 16
    epochs: int = 10
    warmup_ratio: float = 0.1
    weight_decay: float = 0.01
    gradient_accumulation_steps: int = 2
    max_grad_norm: float = 1.0
    seed: int = 42

@dataclass
class DataConfig:
    train_path: str = "data/train.csv"
    val_path: str = "data/val.csv"
    max_length: int = 128
    num_workers: int = 4

@dataclass
class ExperimentConfig:
    model: ModelConfig = field(default_factory=ModelConfig)
    training: TrainingConfig = field(default_factory=TrainingConfig)
    data: DataConfig = field(default_factory=DataConfig)
    experiment_name: str = "default"

# Hydra使用示例
# @hydra.main(config_path="configs", config_name="config", version_base=None)
# def main(cfg: ExperimentConfig):
#     set_seed(cfg.training.seed)
#     mlflow.log_params(OmegaConf.to_container(cfg, resolve=True))
#     train(cfg)
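Hydra的一大便利是命令行覆盖(如 `training.learning_rate=3e-5`)会沿点分路径合并进嵌套配置。下面用纯标准库示意这一合并逻辑(简化草图,`apply_overrides` 为假设的示例函数,非Hydra实现):

```python
"""示意:Hydra风格的"点分路径"命令行覆盖如何合并进嵌套配置(简化草图,非Hydra实现)"""

def apply_overrides(config: dict, overrides: list) -> dict:
    """把 ["training.learning_rate=3e-5"] 形式的覆盖写入嵌套字典"""
    for item in overrides:
        dotted_key, raw_value = item.split("=", 1)
        *parents, leaf = dotted_key.split(".")
        node = config
        for key in parents:  # 沿点分路径逐层下钻
            node = node[key]
        old = node[leaf]
        # 按原字段类型转换字符串值,保持配置Schema稳定
        node[leaf] = raw_value if isinstance(old, str) else type(old)(raw_value)
    return config

# 与上文ExperimentConfig结构对应的嵌套配置
base = {
    "model": {"name": "bert-base-chinese", "dropout": 0.1},
    "training": {"learning_rate": 2e-5, "batch_size": 16},
}
merged = apply_overrides(base, ["training.learning_rate=3e-5", "training.batch_size=32"])
```

真实的Hydra还支持配置组合(config groups)、multirun等能力,但核心的覆盖语义与上面一致。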

5.4 完整的可复现训练模板

Python
import os
import json
import hashlib
from datetime import datetime
from dataclasses import dataclass, asdict

@dataclass
class ReproducibleExperiment:
    """完全可复现的实验模板"""

    # 实验标识
    experiment_name: str = "my-experiment"
    run_name: str = ""
    seed: int = 42

    # 模型参数
    model_name: str = "bert-base-chinese"
    learning_rate: float = 2e-5
    batch_size: int = 16
    epochs: int = 10

    def __post_init__(self):
        if not self.run_name:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.run_name = f"{self.model_name}_{timestamp}"

    def get_config_hash(self) -> str:
        """生成配置哈希,快速比对实验配置"""
        config_str = json.dumps(asdict(self), sort_keys=True)  # sort_keys确保相同配置生成相同哈希
        return hashlib.md5(config_str.encode()).hexdigest()[:8]  # 取前8位作为短哈希

    def save_config(self, path: str):
        """保存完整实验配置"""
        config = asdict(self)
        config["config_hash"] = self.get_config_hash()
        config["timestamp"] = datetime.now().isoformat()

        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "w") as f:
            json.dump(config, f, indent=2, ensure_ascii=False)

    def run(self):
        """执行可复现的训练流程"""
        import mlflow

        # 1. 设置种子
        set_seed(self.seed)

        # 2. 记录环境
        env_info = capture_environment()

        # 3. MLflow记录
        mlflow.set_experiment(self.experiment_name)
        with mlflow.start_run(run_name=self.run_name):
            # 记录所有配置
            mlflow.log_params(asdict(self))
            mlflow.set_tag("config_hash", self.get_config_hash())

            # 记录环境
            for k, v in env_info.items():
                if k != "packages":
                    mlflow.set_tag(f"env_{k}", str(v))

            # 保存配置文件
            config_path = f"/tmp/{self.run_name}_config.json"
            self.save_config(config_path)
            mlflow.log_artifact(config_path)

            # === 训练过程 ===
            print(f"Running experiment: {self.run_name}")
            print(f"Config hash: {self.get_config_hash()}")
            # ... 实际训练代码 ...

            mlflow.log_metric("final_accuracy", 0.92)

# 使用
# exp = ReproducibleExperiment(
#     experiment_name="text-cls",
#     model_name="bert-base-chinese",
#     learning_rate=3e-5,
#     batch_size=32
# )
# exp.run()

六、实战:完整的训练+跟踪流程

Python
"""
完整实战:sklearn + MLflow 实验管理
包含:数据准备 → 多模型训练 → 实验对比 → 最佳模型注册
"""
import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings("ignore")

class TextClassificationExperiment:
    """文本分类实验管理"""

    MODELS = {
        "logistic_regression": {
            "class": LogisticRegression,
            "params": {"C": 1.0, "max_iter": 1000, "random_state": 42}
        },
        "svm": {
            "class": LinearSVC,
            "params": {"C": 1.0, "max_iter": 1000, "random_state": 42}
        },
        "random_forest": {
            "class": RandomForestClassifier,
            "params": {"n_estimators": 100, "max_depth": 20, "random_state": 42}
        },
        "gradient_boosting": {
            "class": GradientBoostingClassifier,
            "params": {"n_estimators": 100, "max_depth": 5, "random_state": 42}
        }
    }

    def __init__(self, experiment_name: str = "text-classification-benchmark"):
        self.experiment_name = experiment_name
        mlflow.set_experiment(experiment_name)
        self.results = []

    def prepare_data(self):
        """准备数据"""
        categories = ['sci.space', 'comp.graphics', 'rec.sport.baseball', 'talk.politics.misc']

        newsgroups = fetch_20newsgroups(
            subset='all', categories=categories,
            remove=('headers', 'footers', 'quotes')
        )

        # TF-IDF特征提取
        vectorizer = TfidfVectorizer(max_features=5000, stop_words='english')
        X = vectorizer.fit_transform(newsgroups.data)
        y = newsgroups.target

        self.X_train, self.X_test, self.y_train, self.y_test = \
            train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
        self.target_names = newsgroups.target_names
        self.vectorizer = vectorizer

        print(f"Train samples: {self.X_train.shape[0]}, Test samples: {self.X_test.shape[0]}")

    def train_model(self, model_name: str):
        """训练单个模型并记录到MLflow"""
        model_config = self.MODELS[model_name]

        with mlflow.start_run(run_name=model_name) as run:
            # 记录参数
            mlflow.log_params(model_config["params"])
            mlflow.set_tag("model_type", model_name)

            # 训练
            model = model_config["class"](**model_config["params"])
            model.fit(self.X_train, self.y_train)

            # 评估
            y_pred = model.predict(self.X_test)
            metrics = {
                "accuracy": accuracy_score(self.y_test, y_pred),
                "f1_macro": f1_score(self.y_test, y_pred, average="macro"),
                "f1_weighted": f1_score(self.y_test, y_pred, average="weighted"),
            }
            mlflow.log_metrics(metrics)

            # 记录分类报告
            report = classification_report(self.y_test, y_pred, target_names=self.target_names)
            mlflow.log_text(report, "classification_report.txt")

            # 记录模型
            mlflow.sklearn.log_model(model, "model")

            self.results.append({
                "name": model_name,
                "run_id": run.info.run_id,
                **metrics
            })

            print(f"  {model_name}: accuracy={metrics['accuracy']:.4f}, f1={metrics['f1_macro']:.4f}")

            return run.info.run_id

    def run_all(self):
        """运行所有模型"""
        print("=" * 60)
        print("Starting benchmark experiments...")
        print("=" * 60)

        self.prepare_data()

        for model_name in self.MODELS:
            self.train_model(model_name)

        # 找出最佳模型
        best = max(self.results, key=lambda x: x["f1_macro"])  # 以macro F1为选优标准
        print(f"\nBest model: {best['name']} (F1={best['f1_macro']:.4f})")

        return best

# 运行实验
# exp = TextClassificationExperiment()
# best_model = exp.run_all()

💼 面试常考题

Q1: MLflow和W&B有什么区别?各自适用什么场景?

答:MLflow是开源的,可以本地或私有部署,适合对数据安全敏感的企业;W&B是云端SaaS服务(也有企业版),可视化能力更强,内置Sweep超参搜索。小团队/个人研究选W&B更省事,大企业/敏感数据选MLflow私有部署。

Q2: 为什么需要Model Registry?它解决了什么问题?

答:Model Registry解决了"哪个模型在生产环境?""能否安全回滚?"的问题。它提供模型版本管理、别名管理(如 champion/challenger 别名标识生产与候选模型)、审批流程、元数据追踪,确保模型从开发到上线有完整的生命周期管理。注意:MLflow 2.9.0+ 已用别名系统替代旧的 Stage(Staging/Production)机制。

Q3: 如何确保ML实验的可复现性?

答:五个层面:①固定随机种子(Python/Numpy/PyTorch/CUDA)②锁定环境依赖(pip freeze/Docker)③版本控制代码(Git)和数据(DVC)④记录完整配置(Hydra/配置文件)⑤记录硬件环境信息。缺一不可。

Q4: DVC和Git LFS有什么区别?为什么ML项目更适合用DVC?

答:Git LFS只管大文件存储,DVC还提供数据Pipeline、实验管理、指标跟踪。DVC与Git深度集成但独立管理数据存储,支持多种后端(S3/GCS/Azure),且能通过dvc.yaml定义可复现的ML Pipeline。

Q5: 设计一个模型版本管理方案,支持A/B测试和快速回滚。

答:使用Model Registry的别名系统(MLflow 2.9.0+):

  • 新模型注册后设置别名 challenger,表示候选模型
  • 通过自动化门禁验证后,设置别名 canary,灰度发布5%流量
  • 线上验证通过后,将别名 champion 指向新版本,全量上线
  • 每个版本保留metadata和验证报告
  • 回滚时直接将 champion 别名指向上一个版本即可
  • 结合模型签名验证输入/输出Schema一致性
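上述别名切换与回滚流程可以用一个内存版草图示意(`AliasRegistry` 为假设的演示类;真实场景中对应 `MlflowClient.set_registered_model_alias` 调用):

```python
"""示意:基于别名的发布/回滚机制(简化内存版草图,非MLflow实现)"""

class AliasRegistry:
    """用"别名 → 版本号"映射模拟Model Registry的champion/challenger机制"""

    def __init__(self):
        self.aliases = {}   # alias -> version
        self.history = []   # champion历史版本,用于回滚

    def set_alias(self, alias: str, version: int):
        if alias == "champion" and "champion" in self.aliases:
            self.history.append(self.aliases["champion"])  # 记录被替换的生产版本
        self.aliases[alias] = version

    def rollback_champion(self) -> int:
        """回滚:champion直接指回上一个版本,无需重新部署模型文件"""
        previous = self.history.pop()
        self.aliases["champion"] = previous
        return previous

# 典型流程:v5在线 → v6候选转正 → 发现问题一步回滚到v5
registry = AliasRegistry()
registry.set_alias("champion", 5)        # v5为当前生产模型
registry.set_alias("challenger", 6)      # v6为候选模型
registry.set_alias("champion", 6)        # 验证通过,v6转正
rolled_back = registry.rollback_champion()
```

回滚只改指针不动模型文件,这正是别名系统比"重新部署旧版本"快得多的原因。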

Q6: 如何管理大规模实验(1000+次)的组织和检索?

答:①良好的命名规范(项目-模型-日期-hash)②Tag系统(目标、方法、数据集)③结构化搜索(MLflow search API)④定期清理无用实验⑤建立实验文档模板⑥使用W&B Report汇总关键实验。
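其中"项目-模型-日期-hash"命名规范的一种可行实现如下(`make_run_name` 为假设的示例函数,具体约定可按团队调整):

```python
"""示意:可排序、可检索的run命名规范 <project>-<model>-<YYYYMMDD>-<配置短哈希>"""
import hashlib
import json
from datetime import datetime

def make_run_name(project: str, model: str, params: dict, when: datetime = None) -> str:
    """生成run名称;配置哈希保证"同配置同尾缀",便于跨日期聚合同组实验"""
    when = when or datetime.now()
    param_hash = hashlib.md5(
        json.dumps(params, sort_keys=True).encode()  # sort_keys使哈希与参数书写顺序无关
    ).hexdigest()[:6]
    return f"{project}-{model}-{when.strftime('%Y%m%d')}-{param_hash}"

name = make_run_name(
    "textcls", "bert-base",
    {"lr": 2e-5, "batch_size": 16},
    when=datetime(2024, 5, 1),
)
# name 形如 "textcls-bert-base-20240501-xxxxxx"
```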

Q7: MLflow Model的Signature有什么作用?

答:Model Signature定义了模型输入/输出的Schema(列名、类型、形状),用于部署时的输入验证。如果输入数据不符合Signature,推理服务会拒绝请求并返回错误,防止"脏数据"导致生产事故。
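Signature所做的校验可以用一个简化草图示意(`SIGNATURE`/`validate_input` 均为假设的演示代码;真实用法是训练时用 `mlflow.models.infer_signature` 生成签名,部署侧自动校验):

```python
"""示意:Model Signature风格的输入Schema校验(简化草图,非MLflow实现)"""

# 假设的签名:列名 -> 期望类型(真实Signature还包含形状和输出Schema)
SIGNATURE = {"text_length": int, "tfidf_norm": float, "lang": str}

def validate_input(record: dict, signature: dict = SIGNATURE) -> list:
    """返回所有Schema违规;部署侧可据此拒绝请求,防止脏数据进入模型"""
    errors = []
    for column, expected_type in signature.items():
        if column not in record:
            errors.append(f"缺少字段: {column}")
        elif not isinstance(record[column], expected_type):
            errors.append(f"字段 {column} 类型错误: 期望 {expected_type.__name__}")
    return errors

assert validate_input({"text_length": 128, "tfidf_norm": 0.7, "lang": "zh"}) == []
assert validate_input({"text_length": "128", "lang": "zh"}) != []  # 类型错+缺字段,应拒绝
```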

Q8: 如何在团队中推行实验管理最佳实践?

答:①建立统一的实验跟踪基础设施(共享MLflow/W&B服务器)②制定实验命名和Tag规范③提供标准训练模板(包含跟踪代码)④Code Review中检查实验跟踪完整性⑤定期进行实验复盘⑥将跟踪代码集成到CI/CD流程。


✅ 学习检查清单

  • 能使用MLflow记录参数、指标、模型和Artifact
  • 能使用W&B进行实验跟踪和超参搜索
  • 理解Model Registry的阶段管理流程
  • 能使用DVC进行数据版本控制和Pipeline管理
  • 能设置完整的随机种子管理确保可复现性
  • 理解Hydra/OmegaConf配置管理
  • 能设计完整的实验跟踪方案
  • 能完成模型从开发到注册的全流程
  • 理解DVC与MLflow的集成方式
  • 能回答所有面试题

📌 下一章02-模型部署与服务化 — 将训练好的模型部署为生产级推理服务