11 - MLOps与模型部署¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
🚀 从模型到产品¶
为什么需要MLOps?¶
Text Only
常见痛点:
❌ 训练环境与生产环境不一致
❌ 模型性能随时间退化
❌ 难以追踪模型版本
❌ 部署周期长,难以迭代
❌ 无法监控生产环境表现
MLOps解决方案:
✅ 标准化ML生命周期
✅ 自动化CI/CD流程
✅ 模型版本管理
✅ 持续监控与再训练
✅ 可复现性
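清单中的"模型版本管理"和"可复现性",核心是在保存模型工件的同时记录可追溯的元数据(内容哈希、时间、指标)。下面是一个仅依赖标准库的最小示意,文件名与字段均为假设,实际项目中通常交给MLflow等工具:

```python
import hashlib
import json
import time
from pathlib import Path

def register_model(artifact_path, registry_path="registry.json", metrics=None):
    """为模型工件追加一条版本记录: 内容哈希 + 时间戳 + 指标 (字段为示意)。"""
    digest = hashlib.sha256(Path(artifact_path).read_bytes()).hexdigest()
    record = {
        "artifact": artifact_path,
        "sha256": digest,  # 内容哈希: 部署时可校验工件未被改动
        "registered_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "metrics": metrics or {},
    }
    registry = Path(registry_path)
    history = json.loads(registry.read_text()) if registry.exists() else []
    history.append(record)
    registry.write_text(json.dumps(history, ensure_ascii=False, indent=2))
    return record
```

有了这样的记录,"生产环境跑的是哪个模型"就能通过哈希值唯一回答。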
📦 模型保存与加载¶
Scikit-learn模型¶
Python
import joblib
from sklearn.ensemble import RandomForestClassifier
# 训练模型
model = RandomForestClassifier()
model.fit(X_train, y_train)
# 保存
joblib.dump(model, 'model.joblib')
# 加载
loaded_model = joblib.load('model.joblib')
predictions = loaded_model.predict(X_test)
PyTorch模型¶
Python
import torch
# 保存整个模型
torch.save(model, 'model.pth')
# 保存模型参数 (推荐)
torch.save(model.state_dict(), 'model_weights.pth')
# 加载
model = MyModelClass()
model.load_state_dict(torch.load('model_weights.pth', weights_only=True))
model.eval() # 设置为评估模式
ONNX (跨框架格式)¶
Python
import torch.onnx
# 导出为ONNX
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'},
                  'output': {0: 'batch_size'}}
)
# 加载并运行
import onnxruntime as ort
session = ort.InferenceSession('model.onnx')
outputs = session.run(None, {'input': dummy_input.numpy()})
🐳 Docker容器化¶
Dockerfile¶
Docker
# 基础镜像
FROM python:3.9-slim
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 暴露端口
EXPOSE 8000
# 运行服务
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
requirements.txt¶
Text Only
fastapi==0.104.1
uvicorn==0.24.0
scikit-learn==1.3.2
pandas==2.1.3
numpy==1.26.2
joblib==1.3.2
pydantic==2.5.0
构建和运行¶
Bash
# 构建镜像
docker build -t ml-api:latest .
# 运行容器
docker run -p 8000:8000 ml-api:latest
# 后台运行
docker run -d -p 8000:8000 --name ml-api ml-api:latest
🌐 REST API部署¶
FastAPI示例¶
Python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd
app = FastAPI(title="ML Model API")
# 加载模型
model = joblib.load('model.joblib')
scaler = joblib.load('scaler.joblib')
# 定义输入数据格式
class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    probability: float

@app.get("/")
def home():
    return {"message": "ML Model API is running"}

@app.get("/health")
def health_check():
    return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    try:  # try/except捕获异常,防止程序崩溃
        # 转换输入
        features = np.array(request.features).reshape(1, -1)
        # 预处理
        features_scaled = scaler.transform(features)
        # 预测
        prediction = model.predict(features_scaled)[0]
        probability = model.predict_proba(features_scaled)[0].max()
        return PredictionResponse(
            prediction=float(prediction),
            probability=float(probability)
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

# 批量预测
@app.post("/predict_batch")
def predict_batch(requests: list[PredictionRequest]):
    results = []
    for req in requests:
        features = np.array(req.features).reshape(1, -1)
        features_scaled = scaler.transform(features)
        prediction = model.predict(features_scaled)[0]
        results.append({"prediction": int(prediction)})
    return {"results": results}
运行服务¶
Bash
# 开发环境
uvicorn main:app --reload --port 8000
# 生产环境
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
☁️ 云平台部署¶
AWS SageMaker¶
Python
import boto3
import sagemaker
from sagemaker.sklearn.model import SKLearnModel
# 创建SageMaker会话
sagemaker_session = sagemaker.Session()
role = "arn:aws:iam::account-id:role/your-role"
# 上传模型
model_data = sagemaker_session.upload_data(
    path="model.joblib",
    bucket="your-bucket",
    key_prefix="model"
)

# 部署模型
sklearn_model = SKLearnModel(
    model_data=model_data,
    role=role,
    entry_point="inference.py",
    framework_version="1.0-1"
)
predictor = sklearn_model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large"
)
# 预测
result = predictor.predict(X_test)
Google Cloud AI Platform¶
Bash
# 上传模型到GCS
gsutil cp model.joblib gs://your-bucket/model/
# 部署模型
gcloud ai-platform models create your_model \
    --regions=us-central1
gcloud ai-platform versions create v1 \
    --model=your_model \
    --origin=gs://your-bucket/model/ \
    --runtime-version=2.5 \
    --python-version=3.9 \
    --framework=scikit-learn
Azure ML¶
Python
from azureml.core import Workspace, Model, Webservice
from azureml.core.webservice import AciWebservice
from azureml.core.model import InferenceConfig
# 连接工作区
ws = Workspace.from_config()
# 注册模型
model = Model.register(
    workspace=ws,
    model_path="model.joblib",
    model_name="my-model"
)

# 配置推理
inference_config = InferenceConfig(
    entry_script="score.py",
    environment=my_env
)

# 部署
deployment_config = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=4
)
service = Model.deploy(
    ws,
    "myservice",
    [model],
    inference_config,
    deployment_config
)
service.wait_for_deployment(show_output=True)
🔄 CI/CD流水线¶
GitHub Actions示例¶
YAML
# .github/workflows/ml-ci-cd.yml
name: ML CI/CD Pipeline
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'  # 加引号,避免YAML把版本号当浮点数解析
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Run tests
        run: |
          pytest tests/
      - name: Train model
        run: |
          python train.py
      - name: Evaluate model
        run: |
          python evaluate.py
      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model
          path: model.joblib

  deploy:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Download model
        uses: actions/download-artifact@v4
        with:
          name: model
      - name: Deploy to production
        run: |
          # 部署脚本
          ./deploy.sh
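流水线里的 `pytest tests/` 这一步需要仓库中确实存在可运行的测试。下面是一份假设性的 `tests/test_model.py` 草稿(文件名、阈值均为示意),用固定随机种子的玩具数据验证训练流程能跑通、且测试集精度不低于某个下限:

```python
# tests/test_model.py —— 假设性示例:验证训练流程与最低精度下限
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

def make_toy_data():
    # 固定种子,保证测试可复现
    rng = np.random.RandomState(42)
    X = rng.randn(200, 4)
    y = (X[:, 0] + X[:, 1] > 0).astype(int)  # 线性可分的玩具标签
    return train_test_split(X, y, test_size=0.25, random_state=42)

def test_model_trains_and_meets_accuracy_floor():
    X_train, X_test, y_train, y_test = make_toy_data()
    model = RandomForestClassifier(n_estimators=50, random_state=42)
    model.fit(X_train, y_train)
    # 精度下限作为回归测试,防止代码改动造成明显退化
    assert model.score(X_test, y_test) >= 0.8
```

实际项目中通常还会加上输入形状校验、序列化往返(dump/load后预测一致)等测试。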
📊 模型监控¶
监控指标¶
Python
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# 定义指标
prediction_counter = Counter(
    'predictions_total',
    'Total number of predictions'
)
prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Prediction latency'
)
model_confidence = Gauge(
    'model_confidence',
    'Model prediction confidence'
)
data_drift_score = Gauge(
    'data_drift_score',
    'Data drift detection score'
)

# 在预测时记录
import time

def predict_with_monitoring(features):
    start_time = time.time()
    # 预测
    result = model.predict(features)
    # 记录指标
    prediction_counter.inc()
    prediction_latency.observe(time.time() - start_time)
    # 置信度应取predict_proba的最大概率,而非预测标签本身
    if hasattr(model, 'predict_proba'):  # hasattr检查对象是否有指定属性
        model_confidence.set(float(model.predict_proba(features).max()))
    return result

# 启动监控服务器 (指标暴露在8001端口的/metrics)
start_http_server(8001)
数据漂移检测¶
Python
from alibi_detect.cd import ChiSquareDrift, KSDrift
# 准备参考数据和当前数据
X_ref = X_train # 训练数据作为参考
X_current = X_test # 当前数据
# 检测分类变量漂移
dd = ChiSquareDrift(
    X_ref,
    p_val=0.05  # 显著性水平
)
labels = dd.predict(X_current)
print(f"漂移检测结果: {labels['data']['is_drift']}")

# 检测连续变量漂移(Kolmogorov-Smirnov检验)
cd = KSDrift(X_ref, p_val=0.05)
labels = cd.predict(X_current)
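如果不便引入 alibi-detect,也可以直接用 scipy 对每个特征逐一做 KS 检验,作为一个简化替代的示意(0.05 的阈值为假设;多特征时严格做法还应做多重检验校正,如 Bonferroni):

```python
import numpy as np
from scipy.stats import ks_2samp

def detect_drift_ks(X_ref, X_current, p_val=0.05):
    """逐特征KS检验: 任一特征p值低于阈值即判定漂移(简化示意)。"""
    p_values = [
        ks_2samp(X_ref[:, j], X_current[:, j]).pvalue
        for j in range(X_ref.shape[1])
    ]
    return {"is_drift": bool(min(p_values) < p_val), "p_values": p_values}
```

例如,用同一份数据调用应判定无漂移,整体平移后的数据应判定有漂移。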
性能监控¶
Python
import mlflow
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def log_metrics(model, X_test, y_test):
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    # 记录到MLflow
    mlflow.log_metrics({
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1
    })
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1
    }
📋 实验追踪¶
MLflow示例¶
Python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# 设置实验
mlflow.set_experiment("My Experiment")

# 训练并记录
with mlflow.start_run():
    # 记录参数 (只记录模型实际使用的超参数)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)

    # 训练模型
    model = RandomForestClassifier(
        n_estimators=100,
        max_depth=10
    )
    model.fit(X_train, y_train)

    # 记录指标
    accuracy = model.score(X_test, y_test)
    mlflow.log_metric("accuracy", accuracy)

    # 记录模型
    mlflow.sklearn.log_model(model, "model")

    # 记录artifacts
    mlflow.log_artifact("config.json")

# 查看实验: 运行 mlflow ui,访问 http://localhost:5000
🔄 A/B测试¶
部署多个模型版本¶
Python
from fastapi import FastAPI
import random
import joblib

app = FastAPI()

# 加载不同版本
model_v1 = joblib.load("model_v1.joblib")
model_v2 = joblib.load("model_v2.joblib")

@app.post("/predict")
def predict(features: list[float]):
    # A/B测试: 50%流量到新模型
    if random.random() < 0.5:
        model = model_v1
        version = "v1"
    else:
        model = model_v2
        version = "v2"
    prediction = model.predict([features])[0]
    # 记录版本和结果 (log_prediction为自定义的日志记录函数)
    log_prediction(version, features, prediction)
    return {"prediction": float(prediction), "model_version": version}
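`random.random()` 是按请求随机分流,同一用户在两次请求间可能命中不同版本,影响体验一致性与实验分析。常见改进是按用户ID哈希做确定性分桶。下面是一个仅用标准库的示意(函数名、分流比例均为假设):

```python
import hashlib

def assign_variant(user_id: str, treatment_ratio: float = 0.5) -> str:
    """按用户ID哈希确定性分桶: 同一用户始终命中同一版本。"""
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    bucket = int(digest[:8], 16) / 0xFFFFFFFF  # 取哈希前8位十六进制, 映射到[0, 1]
    return "v2" if bucket < treatment_ratio else "v1"
```

实验改版时可在 `user_id` 前拼接实验名作为"盐",避免不同实验之间的分桶相关。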
🛡️ 安全与隐私¶
API认证¶
Python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):  # async def定义异步函数
    token = credentials.credentials
    if token != "your-secret-token":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )
    return token

@app.post("/predict")
async def predict(
    request: PredictionRequest,
    token: str = Depends(verify_token)
):
    # 预测逻辑
    pass
输入验证¶
Python
from pydantic import BaseModel, field_validator
class PredictionRequest(BaseModel):
    age: int
    income: float
    score: float

    @field_validator('age')
    @classmethod
    def validate_age(cls, v):
        if v < 0 or v > 120:
            raise ValueError('Age must be between 0 and 120')
        return v

    @field_validator('income')
    @classmethod
    def validate_income(cls, v):
        if v < 0:
            raise ValueError('Income cannot be negative')
        return v
🚀 性能优化¶
批量预测¶
Python
@app.post("/predict_batch")
async def predict_batch(requests: list[PredictionRequest]):
    # 批量处理,提高吞吐量
    features_batch = np.array([req.features for req in requests])
    # 批量预测
    predictions_batch = model.predict(features_batch)
    return [
        {"prediction": int(pred)}
        for pred in predictions_batch
    ]
缓存¶
Python
from functools import lru_cache
from fastapi import FastAPI
app = FastAPI()
@lru_cache(maxsize=1000)
def get_prediction(features_tuple):
    # list不可哈希,因此调用方先转成tuple作为缓存键
    features = list(features_tuple)
    return model.predict([features])[0]

@app.post("/predict")
async def predict(request: PredictionRequest):
    features_tuple = tuple(request.features)  # 转换为tuple以支持缓存
    prediction = get_prediction(features_tuple)
    return {"prediction": prediction}
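`lru_cache` 要求参数可哈希,这正是上面把 list 转成 tuple 的原因。一个独立的小演示(与具体模型无关,仅说明缓存行为):

```python
from functools import lru_cache

call_count = 0

@lru_cache(maxsize=128)
def expensive(features_tuple):
    global call_count
    call_count += 1          # 统计真实计算次数
    return sum(features_tuple) * 2

expensive((1.0, 2.0))   # 第一次: 真正计算
expensive((1.0, 2.0))   # 第二次: 命中缓存, call_count仍为1
```

注意:缓存只对完全相同的输入生效;特征为连续值时命中率可能很低,必要时可先对输入做量化(四舍五入)再作为键。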
📈 最佳实践清单¶
模型开发¶
模型部署¶
生产环境¶
安全¶
💡 实战建议¶
从小开始¶
Text Only
第1步: 本地API
→ Flask/FastAPI
→ 本地测试
第2步: Docker容器化
→ 保证环境一致性
→ 易于部署
第3步: 云平台部署
→ AWS/GCP/Azure
→ 托管服务
第4步: 监控与迭代
→ 持续监控
→ 定期重训练
成熟度模型¶
Text Only
Level 1: 手动部署
→ 手动训练,手动部署
Level 2: 自动化训练
→ CI/CD自动化训练流程
Level 3: 自动化部署
→ 模型自动部署到生产
Level 4: 持续监控与重训练
→ 自动监控性能,触发重训练
Level 5: 完全MLOps
→ 端到端自动化,持续优化
下一步: 查看 README.md,规划你的学习路线!