🔄 ML Pipelines and CI/CD

⚠️ Timeliness note: this chapter references fast-moving information such as frontier models, pricing, and leaderboards; always defer to the original papers, official release pages, and API documentation.

ML Pipeline and CI/CD Flowchart

Estimated time: 8 hours | Difficulty: ⭐⭐⭐⭐ Upper-intermediate | Prerequisites: Docker, Git, MLflow (Ch01), model deployment (Ch02)


Chapter Goals

  • Understand ML pipeline design principles and the major orchestration tools
  • Master ML pipeline orchestration with Kubeflow Pipelines and Airflow
  • Learn CI/CD automation for ML projects (model validation → registration → deployment)
  • Get familiar with GitOps for ML and model approval workflows

1. ML Pipeline Fundamentals

1.1 Why Pipelines?

Text Only
Problems with a manual workflow:
├── Data scientist trains in a notebook → engineer reproduces by hand → deploy
├── Steps are not reproducible; dependencies stay implicit
├── No automated retraining
└── Hard to trace a model back to its inputs

What a pipeline provides:
├── Each step is an independent component (reusable, cacheable)
├── A DAG orchestrator runs everything automatically
├── Versioning + logs across the whole workflow
└── Automatic triggers (schedule / data drift / API)

1.2 Pipeline Tool Comparison

| Tool | Positioning | Deployment | Best for |
|------|-------------|------------|----------|
| Kubeflow Pipelines | ML-native | Kubernetes | Large ML teams |
| Apache Airflow | General-purpose scheduler | Standalone / K8s | Mixed data + ML |
| Prefect | Modern scheduler | Cloud-native | Small/mid-size teams |
| MLflow Pipelines | Lightweight ML | Local / remote | Rapid prototyping |
| ZenML | ML abstraction layer | Multiple backends | Portable pipelines |

2. Kubeflow Pipelines in Practice

2.1 Defining Pipeline Components

Python
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
    raw_data_path: str,
    processed_data: Output[Dataset],
    test_data: Output[Dataset],
    test_size: float = 0.2,
):
    """Data preprocessing component."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(raw_data_path)

    # Clean
    df = df.dropna()
    df["feature_normalized"] = (df["feature"] - df["feature"].mean()) / df["feature"].std()

    # Split train/test
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)

    train_df.to_csv(processed_data.path, index=False)
    test_df.to_csv(test_data.path, index=False)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def train_model(
    train_data: Input[Dataset],
    model_artifact: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
):
    """Model training component."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    import joblib

    df = pd.read_csv(train_data.path)
    X = df.drop("label", axis=1)
    y = df["label"]

    model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    model.fit(X, y)

    # Log metrics
    train_acc = accuracy_score(y, model.predict(X))
    metrics.log_metric("train_accuracy", train_acc)
    metrics.log_metric("n_estimators", n_estimators)

    joblib.dump(model, model_artifact.path)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"]
)
def evaluate_model(
    model_artifact: Input[Model],
    test_data: Input[Dataset],
    metrics: Output[Metrics],
    accuracy_threshold: float = 0.8,
) -> bool:
    """Model evaluation component (with quality gate)."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, f1_score

    model = joblib.load(model_artifact.path)
    df = pd.read_csv(test_data.path)

    X = df.drop("label", axis=1)
    y = df["label"]

    preds = model.predict(X)
    acc = accuracy_score(y, preds)
    f1 = f1_score(y, preds, average="weighted")

    metrics.log_metric("test_accuracy", acc)
    metrics.log_metric("test_f1", f1)
    metrics.log_metric("passed_threshold", float(acc >= accuracy_threshold))  # log_metric takes a numeric value

    return acc >= accuracy_threshold  # quality gate

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["google-cloud-storage"]
)
def deploy_model(
    model_artifact: Input[Model],
):
    """Model deployment component (runs only after passing the quality gate)."""
    import logging
    logging.basicConfig(level=logging.INFO)  # default level is WARNING, so info() would otherwise be dropped
    logging.info(f"Deploying model from {model_artifact.path}")
    # Example production deployment logic:
    # 1. Push the model to a registry (MLflow / cloud model registry)
    # 2. Update the image version on the K8s Deployment
    # 3. Run a smoke test to verify the service is healthy
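The three commented steps above can be sketched with stdlib calls. A minimal sketch, assuming a Deployment named `ml-serving` with a container `model-server` and an internal health endpoint (all hypothetical names, not values from this chapter's cluster):

```python
import subprocess
import urllib.request


def kubectl_set_image_cmd(deployment: str, container: str,
                          image: str, namespace: str) -> list[str]:
    # Step 2: build the kubectl command that rolls the Deployment to a new image
    return [
        "kubectl", "set", "image", f"deployment/{deployment}",
        f"{container}={image}", "--namespace", namespace,
    ]


def smoke_check(url: str, timeout: float = 5.0) -> bool:
    # Step 3: verify the freshly rolled-out service responds at all
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        return False


if __name__ == "__main__":
    cmd = kubectl_set_image_cmd("ml-serving", "model-server",
                                "myregistry/model:v42", "staging")
    subprocess.run(cmd, check=True)
    assert smoke_check("http://staging-ml.internal/healthz")
```

Step 1 (registration) is typically one call to the registry's API, e.g. MLflow's `mlflow.register_model`; it is omitted here to keep the sketch stdlib-only.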

2.2 Pipeline Orchestration

Python
@dsl.pipeline(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline (with quality gate)"
)
def training_pipeline(
    raw_data_path: str = "gs://my-bucket/data/train.csv",
    n_estimators: int = 100,
    accuracy_threshold: float = 0.8,
):
    # Step 1: data preprocessing
    preprocess_task = preprocess_data(raw_data_path=raw_data_path)

    # Step 2: model training
    train_task = train_model(
        train_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
    )

    # Step 3: evaluation (with quality gate)
    eval_task = evaluate_model(
        model_artifact=train_task.outputs["model_artifact"],
        test_data=preprocess_task.outputs["test_data"],
        accuracy_threshold=accuracy_threshold,
    )

    # Step 4: conditional deployment (only if the gate passes)
    with dsl.If(eval_task.output == True):
        deploy_model(model_artifact=train_task.outputs["model_artifact"])

# Compile the pipeline
from kfp import compiler
compiler.Compiler().compile(training_pipeline, "pipeline.yaml")
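`compiler.Compiler().compile` only writes the YAML; submitting it requires a KFP client call. A hedged sketch (the host below is hypothetical, matching the endpoint used later in section 4):

```python
import datetime


def default_run_name(prefix: str = "train") -> str:
    # Timestamped run names keep repeated submissions distinguishable in the UI
    return f"{prefix}-{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"


def submit(host: str, package: str = "pipeline.yaml", **arguments):
    # kfp imported inside the function so the module stays importable without a cluster
    from kfp.client import Client
    client = Client(host=host)
    return client.create_run_from_pipeline_package(
        package, arguments=arguments, run_name=default_run_name()
    )


if __name__ == "__main__":
    submit("http://kubeflow.internal",
           raw_data_path="gs://my-bucket/data/train.csv",
           accuracy_threshold=0.85)
```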

3. CI/CD for ML Projects

3.1 ML CI/CD with GitHub Actions

YAML
# .github/workflows/ml-cicd.yml
name: ML Training & Deployment Pipeline

on:
  push:
    paths:
      - 'src/**'
      - 'configs/**'
      - 'data/version.txt'  # trigger when the data version changes
  pull_request:  # needed for the PR-comment step below to ever run
    paths:
      - 'src/**'
      - 'configs/**'

jobs:
  # Stage 1: code quality checks
  code-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: |
          pip install ruff pytest
          ruff check src/
          pytest tests/unit/ -v

  # Stage 2: model training
  train:
    needs: code-quality
    runs-on: ubuntu-latest  # use a self-hosted GPU runner in production
    steps:
      - uses: actions/checkout@v4
      - name: Train Model
        run: |
          pip install -r requirements.txt
          python src/train.py --config configs/prod.yaml
      - name: Upload Model Artifact
        uses: actions/upload-artifact@v4
        with:
          name: model-${{ github.sha }}
          path: outputs/model/

  # Stage 3: model evaluation + quality gate
  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/download-artifact@v4
        with:
          name: model-${{ github.sha }}
          path: outputs/model/
      - name: Evaluate Model
        id: eval
        run: |
          pip install -r requirements.txt
          python src/evaluate.py --model-dir outputs/model/ \
            --test-data data/test.csv \
            --output metrics.json

          # Read the metric and enforce the quality gate
          ACCURACY=$(python -c "import json; print(json.load(open('metrics.json'))['accuracy'])")
          echo "accuracy=$ACCURACY" >> $GITHUB_OUTPUT

          if (( $(echo "$ACCURACY < 0.85" | bc -l) )); then
            echo "❌ Model accuracy $ACCURACY is below the 0.85 threshold"
            exit 1
          fi

      - name: Comment PR with Metrics
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const metrics = JSON.parse(fs.readFileSync('metrics.json'));
            github.rest.issues.createComment({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.issue.number,
              body: `## 🤖 Model Evaluation Report
              | Metric | Value |
              |--------|-------|
              | Accuracy | ${metrics.accuracy.toFixed(4)} |
              | F1 Score | ${metrics.f1.toFixed(4)} |
              | Latency P95 | ${metrics.latency_p95}ms |`
            });

  # Stage 4: model registration + deployment
  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/download-artifact@v4  # the model is built in the train job, not here
        with:
          name: model-${{ github.sha }}
          path: outputs/model/
      - name: Register Model
        run: |
          python src/register_model.py \
            --model-dir outputs/model/ \
            --version ${{ github.sha }} \
            --registry mlflow

      - name: Deploy to Staging
        run: |
          kubectl set image deployment/ml-serving \
            model-server=myregistry/model:${{ github.sha }} \
            --namespace staging

      - name: Smoke Test
        run: |
          sleep 30
          python src/smoke_test.py --endpoint http://staging-ml.internal/predict

      - name: Deploy to Production (Blue-Green)
        run: |
          python src/deploy_bluegreen.py \
            --image myregistry/model:${{ github.sha }} \
            --namespace production

3.2 Model Validation Script

Python
"""Model quality gate: the new model must not regress against the current production model (within tolerance)."""
import json
import argparse
import mlflow

def validate_model(new_metrics_path, production_model_name):
    """Compare the new model with the production model."""

    # Load the new model's metrics
    with open(new_metrics_path) as f:  # the context manager closes the file automatically
        new_metrics = json.load(f)  # json.load parses JSON from a file object

    # Fetch the current production model's metrics
    # (uses the alias system, which replaces the deprecated Stage API)
    client = mlflow.tracking.MlflowClient()
    try:
        prod_mv = client.get_model_version_by_alias(production_model_name, "champion")
    except mlflow.exceptions.MlflowException:
        print("No production model found; the new model passes by default")
        return True

    prod_run = client.get_run(prod_mv.run_id)
    prod_accuracy = float(prod_run.data.metrics.get("test_accuracy", 0))

    # Gate rules
    checks = {
        "accuracy_improvement": new_metrics["accuracy"] >= prod_accuracy - 0.01,  # tolerate a 1% drop
        "latency_p95": new_metrics["latency_p95"] < 100,  # P95 latency < 100ms
        "min_accuracy": new_metrics["accuracy"] >= 0.85,   # absolute floor
    }

    print(f"New model accuracy: {new_metrics['accuracy']:.4f}")
    print(f"Production model accuracy: {prod_accuracy:.4f}")

    for check, passed in checks.items():
        status = "✅" if passed else "❌"
        print(f"  {status} {check}: {passed}")

    return all(checks.values())

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--metrics", required=True)
    parser.add_argument("--production-model", required=True)
    args = parser.parse_args()

    passed = validate_model(args.metrics, args.production_model)
    raise SystemExit(0 if passed else 1)
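`src/deploy_bluegreen.py`, referenced in the workflow of section 3.1, is likewise not shown. A sketch of the core blue-green decision logic, assuming a Service selector that points at the currently active color (the plan entries are descriptive strings, not real kubectl calls):

```python
def idle_color(active: str) -> str:
    # The idle environment receives the new image; traffic flips only after checks pass
    if active not in ("blue", "green"):
        raise ValueError(f"unknown active color: {active}")
    return "green" if active == "blue" else "blue"


def rollout_plan(active: str, image: str, namespace: str) -> list[str]:
    # Deploy to the idle side, verify it, then repoint the Service selector
    target = idle_color(active)
    return [
        f"deploy {image} to {namespace}/{target}",
        f"smoke-test {namespace}/{target}",
        f"switch Service selector {active} -> {target}",
    ]


# Example: if blue is live, the new image lands on green first
print(rollout_plan("blue", "myregistry/model:abc123", "production"))
```

Keeping the old color running until the switch is what makes rollback instant: repointing the selector back is the whole rollback.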

4. Automated Retraining Triggers

4.1 Drift-Triggered Retraining

Python
from datetime import datetime

class RetrainingTrigger:
    """Automated retraining trigger."""

    def __init__(self, config):
        self.config = config
        self.last_retrain = datetime.now()

    def should_retrain(self, monitoring_metrics: dict) -> tuple:
        """Decide whether retraining is needed."""
        reasons = []

        # 1. Data drift above threshold
        if monitoring_metrics.get("data_drift_score", 0) > self.config["drift_threshold"]:
            reasons.append(f"data drift score {monitoring_metrics['data_drift_score']:.3f} above threshold")

        # 2. Performance degradation
        if monitoring_metrics.get("accuracy_7d", 1.0) < self.config["min_accuracy"]:
            reasons.append(f"7-day accuracy {monitoring_metrics['accuracy_7d']:.3f} below floor")

        # 3. Scheduled retraining (fallback)
        days_since = (datetime.now() - self.last_retrain).days
        if days_since > self.config["max_days_without_retrain"]:
            reasons.append(f"{days_since} days since last retrain")

        return len(reasons) > 0, reasons

    def trigger_pipeline(self, reasons):
        """Trigger a Kubeflow Pipelines run."""
        from kfp.client import Client

        client = Client(host=self.config["kfp_endpoint"])
        run = client.create_run_from_pipeline_package(
            "pipeline.yaml",
            arguments={"trigger_reasons": ", ".join(reasons)},
            run_name=f"auto-retrain-{datetime.now().strftime('%Y%m%d-%H%M')}"
        )
        return run.run_id

# Usage
config = {
    "drift_threshold": 0.3,
    "min_accuracy": 0.85,
    "max_days_without_retrain": 30,
    "kfp_endpoint": "http://kubeflow.internal"
}

trigger = RetrainingTrigger(config)
should, reasons = trigger.should_retrain({
    "data_drift_score": 0.42,
    "accuracy_7d": 0.83
})

if should:
    print(f"Triggering retraining: {reasons}")
    trigger.trigger_pipeline(reasons)
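The `data_drift_score` consumed above has to be produced by a monitoring job, which this chapter does not show. One common choice of drift score is the Population Stability Index (PSI); a minimal sketch over pre-binned distributions (whether the 0.3 threshold in the config maps to PSI is an assumption here):

```python
import math


def psi(expected: list[float], actual: list[float]) -> float:
    """Population Stability Index over pre-binned distributions.

    Both inputs are per-bin proportions summing to 1; a small epsilon
    avoids taking log of zero for empty bins.
    """
    eps = 1e-6
    score = 0.0
    for p, q in zip(expected, actual):
        p, q = max(p, eps), max(q, eps)
        score += (q - p) * math.log(q / p)
    return score


# Identical distributions score 0; a shifted distribution scores higher
baseline = [0.25, 0.25, 0.25, 0.25]
drifted = [0.10, 0.20, 0.30, 0.40]
assert psi(baseline, baseline) == 0.0
assert psi(baseline, drifted) > 0.1
```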

5. Exercises

Coding Practice

  1. Beginner: build a simple ML CI with GitHub Actions (lint → train → evaluate → report)
  2. Intermediate: implement a model quality gate (new model vs. production model comparison)
  3. Advanced: build a complete Kubeflow Pipeline (preprocessing → training → evaluation → conditional deployment → monitoring)

Interview Questions

  1. How does CI/CD for ML differ from traditional software CI/CD?
  2. How would you design a gradual rollout / canary deployment strategy for models?
  3. How should the trigger conditions for automated retraining be designed?
  4. How do you guarantee reproducibility of an ML pipeline?
  5. What is your strategy for rolling back a model version?

Last updated: February 2026