
11 - MLOps and Model Deployment

⚠️ Timeliness note: This chapter touches on fast-moving information such as frontier models, pricing, and leaderboards, which can change quickly between versions; always defer to the original papers, official release pages, and API documentation.

MLOps and Deployment Overview Diagram

🚀 From Model to Product

Why do we need MLOps?

Text Only
Common pain points:
❌ Training and production environments are inconsistent
❌ Model performance degrades over time
❌ Model versions are hard to track
❌ Deployment cycles are long and iteration is slow
❌ No visibility into how the model behaves in production

What MLOps provides:
✅ A standardized ML lifecycle
✅ Automated CI/CD pipelines
✅ Model version management (a minimal sketch follows below)
✅ Continuous monitoring and retraining
✅ Reproducibility
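
The "version management" and "reproducibility" items boil down to saving, next to every model artifact, the information needed to trace and rebuild it. A minimal sketch in plain Python; the file layout and metadata fields here are illustrative assumptions, not a standard:

Python
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path

import joblib

def save_versioned_model(model, params: dict, metrics: dict, out_dir: str = "artifacts") -> Path:
    """Persist a model together with the metadata needed to trace and reproduce it."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)

    model_path = out / "model.joblib"
    joblib.dump(model, model_path)

    # Hash the serialized file so a later mix-up or corruption is detectable
    digest = hashlib.sha256(model_path.read_bytes()).hexdigest()

    metadata = {
        "saved_at": datetime.now(timezone.utc).isoformat(),
        "params": params,      # hyperparameters used for training
        "metrics": metrics,    # evaluation results on a held-out set
        "model_sha256": digest,
    }
    (out / "metadata.json").write_text(json.dumps(metadata, indent=2))
    return model_path

# Example usage (assuming a trained `model` is available):
# save_versioned_model(model, {"n_estimators": 100}, {"accuracy": 0.93})

Tools such as MLflow (covered later in this chapter) automate exactly this kind of bookkeeping.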

📦 Model Saving and Loading

Scikit-learn Models

Python
import joblib
from sklearn.ensemble import RandomForestClassifier

# Train the model (X_train / y_train: your training data)
model = RandomForestClassifier()
model.fit(X_train, y_train)

# Save
joblib.dump(model, 'model.joblib')

# Load
loaded_model = joblib.load('model.joblib')
predictions = loaded_model.predict(X_test)

PyTorch Models

Python
import torch

# Save the entire model
torch.save(model, 'model.pth')

# Save only the model parameters (recommended)
torch.save(model.state_dict(), 'model_weights.pth')

# Load
model = MyModelClass()  # MyModelClass: your own nn.Module definition
model.load_state_dict(torch.load('model_weights.pth', weights_only=True))
model.eval()  # switch to evaluation mode

ONNX (Cross-Framework Format)

Python
import torch.onnx

# Export to ONNX
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'},
                  'output': {0: 'batch_size'}}
)

# Load and run with ONNX Runtime
import onnxruntime as ort
session = ort.InferenceSession('model.onnx')
outputs = session.run(None, {'input': dummy_input.numpy()})

🐳 Docker Containerization

Dockerfile

Docker
# Base image
FROM python:3.9-slim

# Set the working directory
WORKDIR /app

# Copy the dependency file
COPY requirements.txt .

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the service port
EXPOSE 8000

# Start the service
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

requirements.txt

Text Only
fastapi==0.104.1
uvicorn==0.24.0
scikit-learn==1.3.2
pandas==2.1.3
numpy==1.26.2
joblib==1.3.2
pydantic==2.5.0

Build and Run

Bash
# Build the image
docker build -t ml-api:latest .

# Run the container
docker run -p 8000:8000 ml-api:latest

# Run in the background (detached)
docker run -d -p 8000:8000 --name ml-api ml-api:latest

🌐 REST API Deployment

FastAPI Example

Python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd

app = FastAPI(title="ML Model API")

# Load the model and the preprocessing scaler
model = joblib.load('model.joblib')
scaler = joblib.load('scaler.joblib')

# Define the request/response schemas
class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    probability: float

@app.get("/")
def home():
    return {"message": "ML Model API is running"}

@app.get("/health")
def health_check():
    return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    try:  # catch errors so the client gets a clean 400 instead of a crash
        # Convert the input to a 2D array
        features = np.array(request.features).reshape(1, -1)

        # Preprocess
        features_scaled = scaler.transform(features)

        # Predict
        prediction = model.predict(features_scaled)[0]
        probability = model.predict_proba(features_scaled)[0].max()

        return PredictionResponse(
            prediction=float(prediction),
            probability=float(probability)
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

# Batch prediction
@app.post("/predict_batch")
def predict_batch(requests: list[PredictionRequest]):
    results = []
    for req in requests:
        features = np.array(req.features).reshape(1, -1)
        features_scaled = scaler.transform(features)
        prediction = model.predict(features_scaled)[0]
        results.append({"prediction": int(prediction)})
    return {"results": results}

Running the Service

Bash
# Development
uvicorn main:app --reload --port 8000

# Production (multiple worker processes)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
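
Once the service is running, it can be smoke-tested from Python. A hedged example assuming the service listens on localhost:8000 and the requests package is installed; the feature values are made up:

Python
import requests

# Hypothetical payload: numeric features matching what the model expects
payload = {"features": [5.1, 3.5, 1.4, 0.2]}

resp = requests.post("http://localhost:8000/predict", json=payload, timeout=5)
resp.raise_for_status()
print(resp.json())  # e.g. {"prediction": 0.0, "probability": 0.97}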

☁️ Cloud Platform Deployment

AWS SageMaker

Python
import boto3
import sagemaker
from sagemaker.sklearn.model import SKLearnModel

# Create a SageMaker session
sagemaker_session = sagemaker.Session()
role = "arn:aws:iam::account-id:role/your-role"

# Upload the model artifact (in practice SageMaker expects a model.tar.gz archive)
model_data = sagemaker_session.upload_data(
    path="model.joblib",
    bucket="your-bucket",
    key_prefix="model"
)

# Deploy the model
sklearn_model = SKLearnModel(
    model_data=model_data,
    role=role,
    entry_point="inference.py",
    framework_version="1.0-1"
)

predictor = sklearn_model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large"
)

# Predict
result = predictor.predict(X_test)

Google Cloud AI Platform

Bash
# Upload the model to GCS
gsutil cp model.joblib gs://your-bucket/model/

# Create the model resource and deploy a version
gcloud ai-platform models create your_model \
  --regions=us-central1

gcloud ai-platform versions create v1 \
  --model=your_model \
  --origin=gs://your-bucket/model/ \
  --runtime-version=2.5 \
  --python-version=3.9 \
  --framework=scikit-learn

Azure ML

Python
from azureml.core import Workspace, Model, Webservice
from azureml.core.webservice import AciWebservice
from azureml.core.model import InferenceConfig

# Connect to the workspace
ws = Workspace.from_config()

# Register the model
model = Model.register(
    workspace=ws,
    model_path="model.joblib",
    model_name="my-model"
)

# Configure inference
inference_config = InferenceConfig(
    entry_script="score.py",
    environment=my_env  # my_env: an azureml Environment defined elsewhere
)

# Deploy
deployment_config = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=4
)

service = Model.deploy(
    ws,
    "myservice",
    [model],
    inference_config,
    deployment_config
)

service.wait_for_deployment(show_output=True)

🔄 CI/CD Pipeline

GitHub Actions Example

YAML
# .github/workflows/ml-ci-cd.yml

name: ML CI/CD Pipeline

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: 3.9

    - name: Install dependencies
      run: |
        pip install -r requirements.txt

    - name: Run tests
      run: |
        pytest tests/

    - name: Train model
      run: |
        python train.py

    - name: Evaluate model
      run: |
        python evaluate.py

    - name: Upload model artifact
      uses: actions/upload-artifact@v4
      with:
        name: model
        path: model.joblib

  deploy:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
    - name: Download model
      uses: actions/download-artifact@v4
      with:
        name: model

    - name: Deploy to production
      run: |
        # Deployment script (project-specific)
        ./deploy.sh
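
The "Evaluate model" step is the natural place for a quality gate that blocks deployment when a new model underperforms. evaluate.py is not shown in this chapter; the sketch below is one hypothetical version of it (the file paths, label column, and 0.90 threshold are all assumptions):

Python
# evaluate.py -- hypothetical quality gate used by the CI job above
import sys

import joblib
import pandas as pd
from sklearn.metrics import accuracy_score

ACCURACY_THRESHOLD = 0.90  # assumed threshold; tune for your problem

model = joblib.load("model.joblib")
test_df = pd.read_csv("data/test.csv")       # assumed hold-out set
X_test = test_df.drop(columns=["label"])
y_test = test_df["label"]

accuracy = accuracy_score(y_test, model.predict(X_test))
print(f"accuracy={accuracy:.4f}")

# A non-zero exit status fails the GitHub Actions job and blocks the deploy stage
if accuracy < ACCURACY_THRESHOLD:
    sys.exit(f"Model accuracy {accuracy:.4f} is below threshold {ACCURACY_THRESHOLD}")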

📊 Model Monitoring

Monitoring Metrics

Python
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
prediction_counter = Counter(
    'predictions_total',
    'Total number of predictions'
)

prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Prediction latency'
)

model_confidence = Gauge(
    'model_confidence',
    'Model prediction confidence'
)

data_drift_score = Gauge(
    'data_drift_score',
    'Data drift detection score'
)

# Record metrics at prediction time
import time

def predict_with_monitoring(features):
    start_time = time.time()

    # Predict
    result = model.predict(features)

    # Record metrics
    prediction_counter.inc()
    latency = time.time() - start_time
    prediction_latency.observe(latency)

    if hasattr(result, 'max'):  # hasattr checks whether the object has the given attribute
        model_confidence.set(result.max())

    return result

# Start the metrics HTTP server (scraped by Prometheus)
start_http_server(8001)
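
If the model is served with FastAPI as in the earlier example, the metrics can also be exposed on the same application instead of a separate port. A minimal sketch, assuming prometheus_client's ASGI helper is available in your installed version:

Python
from fastapi import FastAPI
from prometheus_client import make_asgi_app

app = FastAPI()

# Expose Prometheus metrics at /metrics on the same service
app.mount("/metrics", make_asgi_app())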

Data Drift Detection

Python
from alibi_detect.cd import ChiSquareDrift, KSDrift

# Prepare reference data and current data
X_ref = X_train  # training data serves as the reference
X_current = X_test  # current (production) data

# Detect drift in categorical variables
dd = ChiSquareDrift(
    X_ref,
    p_val=0.05  # significance level
)

labels = dd.predict(X_current)
print(f"Drift detected: {labels['data']['is_drift']}")

# Detect drift in continuous variables (Kolmogorov-Smirnov test)
cd = KSDrift(X_ref, p_val=0.05)
labels = cd.predict(X_current)

Performance Monitoring

Python
import mlflow
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def log_metrics(model, X_test, y_test):
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    # Log to MLflow
    mlflow.log_metrics({
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1
    })

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1
    }

📋 Experiment Tracking

MLflow Example

Python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Start the MLflow UI with: mlflow ui

# Set the experiment
mlflow.set_experiment("My Experiment")

# Train and log
with mlflow.start_run():
    # Log parameters
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)

    # Train the model
    model = RandomForestClassifier(
        n_estimators=100,
        max_depth=10
    )
    model.fit(X_train, y_train)

    # Log metrics
    accuracy = model.score(X_test, y_test)
    mlflow.log_metric("accuracy", accuracy)

    # Log the model
    mlflow.sklearn.log_model(model, "model")

    # Log artifacts (assumes config.json exists)
    mlflow.log_artifact("config.json")

# View the experiments:
# mlflow ui
# then open http://localhost:5000

🔄 A/B Testing

Deploying Multiple Model Versions

Python
from fastapi import FastAPI
import joblib
import random

app = FastAPI()

# Load the two model versions
model_v1 = joblib.load("model_v1.joblib")
model_v2 = joblib.load("model_v2.joblib")

@app.post("/predict")
def predict(features: list[float]):
    # A/B test: split traffic 50/50 between the two versions
    if random.random() < 0.5:
        model = model_v1
        version = "v1"
    else:
        model = model_v2
        version = "v2"

    prediction = model.predict([features])[0]

    # Record which version served the request and what it returned
    log_prediction(version, features, prediction)  # logging helper, sketched below

    return {"prediction": prediction, "model_version": version}

🛡️ Security and Privacy

API Authentication

Python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):  # async def defines a coroutine function
    token = credentials.credentials
    if token != "your-secret-token":  # in production, load the expected token from a secret store, not source code
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )
    return token

@app.post("/predict")
async def predict(
    request: PredictionRequest,
    token: str = Depends(verify_token)
):
    # Prediction logic goes here
    pass

Input Validation

Python
from pydantic import BaseModel, field_validator

class PredictionRequest(BaseModel):
    age: int
    income: float
    score: float

    @field_validator('age')
    @classmethod
    def validate_age(cls, v):
        if v < 0 or v > 120:
            raise ValueError('Age must be between 0 and 120')
        return v

    @field_validator('income')
    @classmethod
    def validate_income(cls, v):
        if v < 0:
            raise ValueError('Income cannot be negative')
        return v

🚀 Performance Optimization

Batch Prediction

Python
@app.post("/predict_batch")
async def predict_batch(requests: list[PredictionRequest]):
    # Process requests as one batch to improve throughput
    features_batch = np.array([req.features for req in requests])

    # Predict for the whole batch at once
    predictions_batch = model.predict(features_batch)

    return [
        {"prediction": int(pred)}
        for pred in predictions_batch
    ]

Caching

Python
from functools import lru_cache
from fastapi import FastAPI

app = FastAPI()

@lru_cache(maxsize=1000)
def get_prediction(features_tuple):
    # Tuples are hashable, so repeated identical inputs hit the cache
    features = list(features_tuple)
    return model.predict([features])[0]

@app.post("/predict")
async def predict(request: PredictionRequest):
    features_tuple = tuple(request.features)  # convert to a tuple so it can be cached
    prediction = get_prediction(features_tuple)
    return {"prediction": float(prediction)}  # cast numpy types for JSON serialization

📈 Best Practices Checklist

Model Development

Text Only
□ Use version control (Git)
□ Log all hyperparameters
□ Save training data and configuration
□ Establish a baseline model
□ Use cross-validation

Model Deployment

Text Only
□ Clear API documentation
□ Input validation
□ Error handling
□ Logging
□ Monitoring metrics

Production Environment

Text Only
□ Health check endpoint
□ Graceful restarts
□ Resource limits
□ Autoscaling
□ Backup and recovery

Security

Text Only
□ Authentication
□ Rate limiting (see the sketch below)
□ Data encryption
□ Input sanitization
□ Access control
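
Most items on the security checklist are demonstrated above; rate limiting is not, so here is a deliberately simple in-memory sketch for FastAPI (the window size and request limit are arbitrary; a production setup would usually rely on an API gateway or a shared store such as Redis):

Python
import time
from collections import defaultdict

from fastapi import FastAPI, HTTPException, Request

app = FastAPI()

WINDOW_SECONDS = 60   # illustrative values
MAX_REQUESTS = 30
_request_log: dict[str, list[float]] = defaultdict(list)

def enforce_rate_limit(request: Request) -> None:
    """Fixed-window limiter keyed by client IP (single-process only)."""
    client_ip = request.client.host if request.client else "unknown"
    now = time.time()
    recent = [t for t in _request_log[client_ip] if now - t < WINDOW_SECONDS]
    if len(recent) >= MAX_REQUESTS:
        raise HTTPException(status_code=429, detail="Too many requests")
    recent.append(now)
    _request_log[client_ip] = recent

@app.post("/predict")
async def predict(request: Request):
    enforce_rate_limit(request)
    # ... prediction logic goes here ...
    return {"status": "ok"}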

💡 Practical Advice

Start Small

Text Only
Step 1: Local API
→ Flask/FastAPI
→ Test locally

Step 2: Docker containerization
→ Consistent environments
→ Easy to deploy

Step 3: Cloud deployment
→ AWS/GCP/Azure
→ Managed services

Step 4: Monitoring and iteration
→ Continuous monitoring
→ Regular retraining

Maturity Model

Text Only
Level 1: Manual deployment
→ Manual training, manual deployment

Level 2: Automated training
→ Training pipeline automated via CI/CD

Level 3: Automated deployment
→ Models deployed to production automatically

Level 4: Continuous monitoring and retraining
→ Performance monitored automatically, retraining triggered as needed

Level 5: Full MLOps
→ End-to-end automation, continuous optimization

Next step: check out README.md and plan your learning path!