# 🔄 ML Pipelines and CI/CD

⚠️ Timeliness note: this chapter touches on information (frontier models, pricing, leaderboards) that can change quickly between versions; defer to the original papers, official release pages, and API documentation.

Study time: 8 hours | Difficulty: ⭐⭐⭐⭐ upper-intermediate | Prerequisites: Docker, Git, MLflow (Ch01), model deployment (Ch02)
## Chapter Goals

- Understand ML pipeline design principles and the major orchestration tools
- Master ML pipeline orchestration with Kubeflow Pipelines and Airflow
- Learn CI/CD automation for ML projects (model validation → registration → deployment)
- Get familiar with GitOps for ML and model approval workflows
## 1. ML Pipeline Fundamentals

### 1.1 Why Pipelines

```text
Problems with manual workflows:
├── Data scientist trains in a notebook → engineer reproduces by hand → deploy
├── Steps are not reproducible; dependencies are implicit
├── No automated retraining
└── Hard to trace a model back to its inputs

What pipelines solve:
├── Each step is an independent component (reusable, cacheable)
├── A DAG orchestrates execution automatically
├── The whole workflow is versioned and logged
└── Automatic triggers (schedule / data drift / API call)
```
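The "each step is a cacheable component" idea can be sketched in plain Python. This is only a toy illustration: real orchestrators key their caches on component definitions and resolved artifact inputs, not on a simple in-process hash.

```python
import hashlib
import json

# Toy step cache: a step's cache key is a hash of its name, parameters,
# and upstream outputs, so rerunning the DAG skips unchanged steps.
_cache: dict = {}

def step(name, fn, params, upstream_outputs):
    """Run fn(params, upstream_outputs), caching on a content hash."""
    key = hashlib.sha256(
        json.dumps([name, params, upstream_outputs], sort_keys=True).encode()
    ).hexdigest()
    if key in _cache:
        return _cache[key]  # cache hit: the step is skipped
    result = fn(params, upstream_outputs)
    _cache[key] = result
    return result

# A two-step toy DAG: preprocess -> train
raw = step("preprocess", lambda p, up: [x * p["scale"] for x in [1, 2, 3]],
           {"scale": 2}, None)
model = step("train", lambda p, up: sum(up) / len(up), {}, raw)
print(raw, model)  # [2, 4, 6] 4.0
```

Changing any parameter (or any upstream output) changes the key, which is exactly why pipelines make reruns cheap and reproducible.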
### 1.2 Pipeline Tool Comparison

| Tool | Positioning | Deployment | Best For | Learning Curve |
|---|---|---|---|---|
| Kubeflow Pipelines | ML-native | Kubernetes | Large ML teams | High |
| Apache Airflow | General-purpose scheduler | Standalone / K8s | Mixed data + ML workloads | Medium |
| Prefect | Modern orchestrator | Cloud-native | Small to mid-size teams | Low |
| MLflow Pipelines | Lightweight ML | Local / remote | Rapid prototyping | Low |
| ZenML | ML abstraction layer | Multiple backends | Portable pipelines | Medium |
## 2. Hands-On with Kubeflow Pipelines

### 2.1 Defining Pipeline Components

```python
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn"],
)
def preprocess_data(
    raw_data_path: str,
    processed_data: Output[Dataset],
    test_data: Output[Dataset],
    test_size: float = 0.2,
):
    """Data preprocessing component."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(raw_data_path)

    # Cleaning
    df = df.dropna()
    df["feature_normalized"] = (df["feature"] - df["feature"].mean()) / df["feature"].std()

    # Train/test split
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)
    train_df.to_csv(processed_data.path, index=False)
    test_df.to_csv(test_data.path, index=False)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"],
)
def train_model(
    train_data: Input[Dataset],
    model_artifact: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
):
    """Model training component."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    import joblib

    df = pd.read_csv(train_data.path)
    X = df.drop("label", axis=1)
    y = df["label"]

    model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    model.fit(X, y)

    # Log metrics
    train_acc = accuracy_score(y, model.predict(X))
    metrics.log_metric("train_accuracy", train_acc)
    metrics.log_metric("n_estimators", n_estimators)

    joblib.dump(model, model_artifact.path)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "joblib"],
)
def evaluate_model(
    model_artifact: Input[Model],
    test_data: Input[Dataset],
    metrics: Output[Metrics],
    accuracy_threshold: float = 0.8,
) -> bool:
    """Model evaluation component (with a quality gate)."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, f1_score

    model = joblib.load(model_artifact.path)
    df = pd.read_csv(test_data.path)
    X = df.drop("label", axis=1)
    y = df["label"]

    preds = model.predict(X)
    acc = accuracy_score(y, preds)
    f1 = f1_score(y, preds, average="weighted")

    metrics.log_metric("test_accuracy", acc)
    metrics.log_metric("test_f1", f1)
    metrics.log_metric("passed_threshold", float(acc >= accuracy_threshold))  # log_metric expects a number

    return acc >= accuracy_threshold  # the quality gate


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["google-cloud-storage"],
)
def deploy_model(
    model_artifact: Input[Model],
):
    """Model deployment component (runs only after the gate passes)."""
    import logging

    logging.basicConfig(level=logging.INFO)
    logging.info("Deploying model from %s", model_artifact.path)
    # Sketch of real deployment logic:
    # 1. Push the model to a registry (MLflow / a cloud model registry)
    # 2. Update the image tag on the K8s Deployment
    # 3. Run a smoke test to verify the service responds
```
### 2.2 Pipeline Orchestration

```python
@dsl.pipeline(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline (with a quality gate)",
)
def training_pipeline(
    raw_data_path: str = "gs://my-bucket/data/train.csv",
    n_estimators: int = 100,
    accuracy_threshold: float = 0.8,
):
    # Step 1: data preprocessing
    preprocess_task = preprocess_data(raw_data_path=raw_data_path)

    # Step 2: model training
    train_task = train_model(
        train_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
    )

    # Step 3: evaluation (gate included)
    eval_task = evaluate_model(
        model_artifact=train_task.outputs["model_artifact"],
        test_data=preprocess_task.outputs["test_data"],
        accuracy_threshold=accuracy_threshold,
    )

    # Step 4: conditional deployment (only if the gate passes);
    # the == True comparison is against a placeholder, resolved at runtime
    # (older KFP versions use dsl.Condition instead of dsl.If)
    with dsl.If(eval_task.output == True):
        deploy_model(model_artifact=train_task.outputs["model_artifact"])


# Compile to a reusable pipeline definition
from kfp import compiler

compiler.Compiler().compile(training_pipeline, "pipeline.yaml")
```
## 3. CI/CD for ML Projects

### 3.1 ML CI/CD with GitHub Actions

```yaml
# .github/workflows/ml-cicd.yml
name: ML Training & Deployment Pipeline

on:
  push:
    paths:
      - 'src/**'
      - 'configs/**'
      - 'data/version.txt'  # trigger on data version changes
  pull_request:             # needed so the PR metrics comment below can fire
    paths:
      - 'src/**'
      - 'configs/**'

jobs:
  # Stage 1: code quality checks
  code-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: |
          pip install ruff pytest
          ruff check src/
          pytest tests/unit/ -v

  # Stage 2: model training
  train:
    needs: code-quality
    runs-on: ubuntu-latest  # use a self-hosted GPU runner in production
    steps:
      - uses: actions/checkout@v4
      - name: Train Model
        run: |
          pip install -r requirements.txt
          python src/train.py --config configs/prod.yaml
      - name: Upload Model Artifact
        uses: actions/upload-artifact@v4
        with:
          name: model-${{ github.sha }}
          path: outputs/model/

  # Stage 3: model evaluation + quality gate
  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/download-artifact@v4
        with:
          name: model-${{ github.sha }}
          path: outputs/model/
      - name: Evaluate Model
        id: eval
        run: |
          pip install -r requirements.txt
          python src/evaluate.py --model-dir outputs/model/ \
            --test-data data/test.csv \
            --output metrics.json

          # Read the metric and enforce the gate
          ACCURACY=$(python -c "import json; print(json.load(open('metrics.json'))['accuracy'])")
          echo "accuracy=$ACCURACY" >> $GITHUB_OUTPUT
          if (( $(echo "$ACCURACY < 0.85" | bc -l) )); then
            echo "❌ Model accuracy $ACCURACY is below the 0.85 threshold"
            exit 1
          fi
      - name: Comment PR with Metrics
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const metrics = JSON.parse(fs.readFileSync('metrics.json'));
            github.rest.issues.createComment({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.issue.number,
              body: `## 🤖 Model Evaluation Report
            | Metric | Value |
            |--------|-------|
            | Accuracy | ${metrics.accuracy.toFixed(4)} |
            | F1 Score | ${metrics.f1.toFixed(4)} |
            | Latency P95 | ${metrics.latency_p95}ms |`
            });

  # Stage 4: model registration + deployment
  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4       # deploy scripts live in the repo
      - uses: actions/download-artifact@v4
        with:
          name: model-${{ github.sha }}  # the model trained in stage 2
          path: outputs/model/
      - name: Register Model
        run: |
          python src/register_model.py \
            --model-dir outputs/model/ \
            --version ${{ github.sha }} \
            --registry mlflow
      - name: Deploy to Staging
        run: |
          kubectl set image deployment/ml-serving \
            model-server=myregistry/model:${{ github.sha }} \
            --namespace staging
      - name: Smoke Test
        run: |
          sleep 30
          python src/smoke_test.py --endpoint http://staging-ml.internal/predict
      - name: Deploy to Production (Blue-Green)
        run: |
          python src/deploy_bluegreen.py \
            --image myregistry/model:${{ github.sha }} \
            --namespace production
```
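The workflow calls `src/smoke_test.py`, which is not shown above. A hypothetical sketch follows; the request payload and the `prediction` field in the response are assumptions about the serving API, not part of the original chapter:

```python
"""Hypothetical sketch of src/smoke_test.py (payload and response schema assumed)."""
import json
import urllib.request

# A made-up sample request matching the features used in training
SAMPLE = {"feature": 0.5, "feature_normalized": 0.1}

def check_response(status: int, body: dict) -> bool:
    """A smoke test only asserts the service is alive and the schema is sane."""
    return status == 200 and body.get("prediction") is not None

def run_smoke_test(endpoint: str) -> bool:
    """POST one sample to the endpoint and validate the response."""
    req = urllib.request.Request(
        endpoint,
        data=json.dumps(SAMPLE).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return check_response(resp.status, json.loads(resp.read()))

print(check_response(200, {"prediction": 0.87}))  # True
```

Keeping the schema check in a pure function (`check_response`) makes the gate testable without a live endpoint.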
### 3.2 Model Validation Script

```python
"""Model quality gate: a new model must not regress against the current production model."""
import argparse
import json
import sys

import mlflow


def validate_model(new_metrics_path, production_model_name):
    """Compare the new model against the production model."""
    # Read the new model's metrics; `with` closes the file automatically
    with open(new_metrics_path) as f:
        new_metrics = json.load(f)  # json.load parses JSON from a file object

    # Look up the production model (alias API, replacing the deprecated stage API)
    client = mlflow.tracking.MlflowClient()
    try:
        prod_mv = client.get_model_version_by_alias(production_model_name, "champion")
    except mlflow.exceptions.MlflowException:
        print("No production model found; the new model passes by default")
        return True

    prod_run = client.get_run(prod_mv.run_id)
    prod_accuracy = float(prod_run.data.metrics.get("test_accuracy", 0))

    # Gate rules
    checks = {
        "accuracy_improvement": new_metrics["accuracy"] >= prod_accuracy - 0.01,  # allow a 1% regression
        "latency_p95": new_metrics["latency_p95"] < 100,  # P95 latency under 100 ms
        "min_accuracy": new_metrics["accuracy"] >= 0.85,  # absolute floor
    }

    print(f"New model accuracy: {new_metrics['accuracy']:.4f}")
    print(f"Production model accuracy: {prod_accuracy:.4f}")
    for check, passed in checks.items():
        status = "✅" if passed else "❌"
        print(f"  {status} {check}: {passed}")

    return all(checks.values())


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--metrics", required=True)
    parser.add_argument("--production-model", required=True)
    args = parser.parse_args()

    passed = validate_model(args.metrics, args.production_model)
    sys.exit(0 if passed else 1)
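The gate rules are easier to unit-test when factored into a pure function that takes the thresholds as parameters. A sketch of the same three checks, decoupled from MLflow:

```python
def run_gate_checks(new_metrics: dict, prod_accuracy: float,
                    max_regression: float = 0.01,
                    latency_budget_ms: float = 100.0,
                    accuracy_floor: float = 0.85) -> dict:
    """The same gate rules as validate_model, factored out for unit testing."""
    return {
        "accuracy_improvement": new_metrics["accuracy"] >= prod_accuracy - max_regression,
        "latency_p95": new_metrics["latency_p95"] < latency_budget_ms,
        "min_accuracy": new_metrics["accuracy"] >= accuracy_floor,
    }

# A model 0.5% below production, but within the 1% tolerance, passes:
checks = run_gate_checks({"accuracy": 0.895, "latency_p95": 80}, prod_accuracy=0.90)
print(all(checks.values()))  # True
```

This split lets the CI unit-test stage (stage 1 in the workflow above) exercise the gate logic without a registry connection.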
## 4. Automatic Retraining Triggers

### 4.1 Drift-Triggered Retraining

```python
from datetime import datetime


class RetrainingTrigger:
    """Automatic retraining trigger."""

    def __init__(self, config):
        self.config = config
        self.last_retrain = datetime.now()

    def should_retrain(self, monitoring_metrics: dict) -> tuple:
        """Decide whether retraining is needed; returns (should_retrain, reasons)."""
        reasons = []

        # 1. Data drift above threshold
        if monitoring_metrics.get("data_drift_score", 0) > self.config["drift_threshold"]:
            reasons.append(f"data drift score {monitoring_metrics['data_drift_score']:.3f} above threshold")

        # 2. Performance decay
        if monitoring_metrics.get("accuracy_7d", 1.0) < self.config["min_accuracy"]:
            reasons.append(f"7-day accuracy {monitoring_metrics['accuracy_7d']:.3f} below the floor")

        # 3. Scheduled retraining (fallback)
        days_since = (datetime.now() - self.last_retrain).days
        if days_since > self.config["max_days_without_retrain"]:
            reasons.append(f"{days_since} days since the last training run")

        return len(reasons) > 0, reasons

    def trigger_pipeline(self, reasons):
        """Kick off the Kubeflow pipeline."""
        from kfp.client import Client

        client = Client(host=self.config["kfp_endpoint"])
        run = client.create_run_from_pipeline_package(
            "pipeline.yaml",
            arguments={"trigger_reasons": ", ".join(reasons)},
            run_name=f"auto-retrain-{datetime.now().strftime('%Y%m%d-%H%M')}",
        )
        return run.run_id


# Usage
config = {
    "drift_threshold": 0.3,
    "min_accuracy": 0.85,
    "max_days_without_retrain": 30,
    "kfp_endpoint": "http://kubeflow.internal",
}
trigger = RetrainingTrigger(config)
should, reasons = trigger.should_retrain({
    "data_drift_score": 0.42,
    "accuracy_7d": 0.83,
})
if should:
    print(f"Triggering retraining: {reasons}")
    trigger.trigger_pipeline(reasons)
```
## 5. Exercises

### Coding Practice

- Beginner: build a simple ML CI with GitHub Actions (lint → train → evaluate → report)
- Intermediate: implement a model quality gate (new model vs. production model comparison)
- Advanced: build a full Kubeflow Pipeline (preprocessing → training → evaluation → conditional deployment → monitoring)

### Interview Questions

- How does CI/CD for ML differ from traditional software CI/CD?
- How would you design a canary / gradual rollout strategy for models?
- What conditions should trigger automatic retraining?
- How do you guarantee reproducibility in an ML pipeline?
- What is your strategy for rolling back a model version?
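The reproducibility question usually comes down to pinning three things: code (a git SHA), data (a content hash), and randomness (seeds). A minimal sketch of the latter two; extend `set_global_seed` with `numpy`/`torch` seeding as your stack requires:

```python
import hashlib
import os
import random

def set_global_seed(seed: int = 42) -> None:
    """Pin the random sources a training run depends on (add numpy/torch as needed)."""
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)

def fingerprint_file(path: str) -> str:
    """Content hash of a data file, recorded alongside the run for traceability."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

set_global_seed(42)
a = [random.random() for _ in range(3)]
set_global_seed(42)
b = [random.random() for _ in range(3)]
print(a == b)  # True: same seed, same numbers
```

Logging the data fingerprint and the seed as run parameters (e.g. in MLflow) is what lets you answer "which exact inputs produced this model" months later.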
Last updated: February 2026