
🔧 监控与持续优化

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

学习目标:掌握模型监控指标体系、数据漂移检测、性能衰退检测、自动重训练Pipeline和Feature Store实践
预计时长:10-12小时
前置知识:模型部署基础(第2章)、统计学基础、Python编程

📎 交叉引用(本章聚焦ML/AI模型监控,基础设施监控见以下教程):
- 基础设施监控(Prometheus/Grafana) → 云原生/监控告警系统
- eBPF可观测性 → 云原生/eBPF与可观测性
- LLM推理服务监控 → LLM学习/推理服务部署


📋 本章概览

模型上线后并非一劳永逸——真实世界的数据在持续变化,模型性能随时间衰退是必然现象。建立完善的监控和自动化持续优化系统,是MLOps成熟度的关键标志。

Text Only
生产环境模型监控闭环:

  ┌───────────┐     ┌──────────┐     ┌──────────┐
  │  推理请求  │────▶│ 推理服务  │────▶│  预测结果  │
  └───────────┘     └──────────┘     └──────────┘
       │                 │                  │
       ▼                 ▼                  ▼
  ┌──────────┐    ┌──────────┐      ┌──────────┐
  │ 输入监控  │    │ 服务监控  │      │ 输出监控  │
  │ 数据漂移  │    │ 延迟/吞吐 │      │ 预测分布  │
  └──────────┘    └──────────┘      └──────────┘
       │                │                  │
        └────────────────┼──────────────────┘
                         ▼
                  ┌──────────────┐
                  │   告警系统    │
                  │ Prometheus   │
                  └──────────────┘
                         │
                         ▼
                  ┌──────────────┐
                  │ 自动重训练    │
                  │  Pipeline    │
                  └──────────────┘

Grafana监控仪表板

上图展示了Grafana监控仪表板,显示了HTTP请求的各种指标,包括请求数量、成功率、延迟百分位数、HTTP状态码和连接统计。在MLOps中,类似的仪表板用于监控模型推理服务的性能、延迟、吞吐量和错误率等关键指标,帮助团队及时发现和解决问题。


一、模型监控指标体系

1.1 三层监控指标

Python
"""
模型监控指标体系
分为:基础设施层、服务层、业务层
"""
from dataclasses import dataclass, field
import time
import numpy as np
from collections import deque

@dataclass
class InfraMetrics:
    """基础设施层指标"""
    cpu_utilization: float = 0.0          # CPU利用率 %
    gpu_utilization: float = 0.0          # GPU利用率 %
    gpu_memory_used: float = 0.0          # GPU显存使用 GB
    memory_used: float = 0.0              # 内存使用 GB
    disk_io_read: float = 0.0             # 磁盘读取 MB/s
    network_bandwidth: float = 0.0        # 网络带宽 MB/s

@dataclass
class ServiceMetrics:
    """服务层指标"""
    qps: float = 0.0                      # 每秒查询数
    latency_p50: float = 0.0              # P50延迟 ms
    latency_p95: float = 0.0              # P95延迟 ms
    latency_p99: float = 0.0              # P99延迟 ms
    error_rate: float = 0.0               # 错误率 %
    timeout_rate: float = 0.0             # 超时率 %
    batch_size_avg: float = 0.0           # 平均批大小
    queue_depth: int = 0                  # 请求队列深度

@dataclass
class ModelMetrics:
    """模型/业务层指标"""
    prediction_distribution: dict = field(default_factory=dict)  # 预测分布
    confidence_mean: float = 0.0          # 平均置信度
    confidence_below_threshold: float = 0.0  # 低置信度比例
    input_feature_stats: dict = field(default_factory=dict)     # 输入特征统计
    output_drift_score: float = 0.0       # 输出漂移分数
    data_quality_score: float = 0.0       # 数据质量分数

1.2 Prometheus指标采集

Python
"""
使用prometheus_client暴露推理服务指标
"""
from prometheus_client import (
    Counter, Histogram, Gauge, Summary,  # Counter:只增不减的累计计数器
    CollectorRegistry, generate_latest
)
import time

# 创建指标注册表
registry = CollectorRegistry()

# ===== 定义Prometheus指标 =====

# 请求计数
REQUEST_COUNT = Counter(
    "model_request_total",
    "Total number of inference requests",
    ["model_name", "model_version", "status"],
    registry=registry
)

# 请求延迟
REQUEST_LATENCY = Histogram(
    "model_request_latency_seconds",
    "Inference request latency in seconds",
    ["model_name", "model_version"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
    registry=registry
)

# 预测置信度分布
PREDICTION_CONFIDENCE = Histogram(
    "model_prediction_confidence",
    "Prediction confidence distribution",
    ["model_name", "predicted_class"],
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99],
    registry=registry
)

# 批大小
BATCH_SIZE = Histogram(
    "model_batch_size",
    "Inference batch size",
    ["model_name"],
    buckets=[1, 2, 4, 8, 16, 32, 64],
    registry=registry
)

# 当前GPU显存使用
GPU_MEMORY = Gauge(
    "model_gpu_memory_bytes",
    "GPU memory usage in bytes",
    ["gpu_id"],
    registry=registry
)

# 模型加载状态
MODEL_LOADED = Gauge(
    "model_loaded_info",
    "Model loaded information",
    ["model_name", "model_version"],
    registry=registry
)

class MetricsMiddleware:
    """推理服务指标中间件"""

    def __init__(self, model_name: str, model_version: str):
        self.model_name = model_name
        self.model_version = model_version
        MODEL_LOADED.labels(model_name, model_version).set(1)

    def record_request(self, status: str, latency: float,
                       batch_size: int, predictions: list, confidences: list):
        """记录一次推理请求的指标"""
        # 请求计数
        REQUEST_COUNT.labels(
            self.model_name, self.model_version, status
        ).inc()

        # 延迟
        REQUEST_LATENCY.labels(
            self.model_name, self.model_version
        ).observe(latency)

        # 批大小
        BATCH_SIZE.labels(self.model_name).observe(batch_size)

        # 预测置信度
        for pred, conf in zip(predictions, confidences):  # zip并行遍历多个可迭代对象
            PREDICTION_CONFIDENCE.labels(
                self.model_name, str(pred)
            ).observe(conf)

# FastAPI集成
from fastapi import FastAPI, Request
from fastapi.responses import Response

app = FastAPI()
metrics = MetricsMiddleware("text-classifier", "v1.2")

@app.get("/metrics")
def prometheus_metrics():
    """暴露Prometheus指标端点"""
    return Response(
        content=generate_latest(registry),
        media_type="text/plain"
    )

@app.post("/predict")
async def predict(request: Request):  # async def定义异步函数;用await调用
    start_time = time.time()

    try:
        # ... 推理逻辑 ...
        predictions = [1, 0, 2]
        confidences = [0.95, 0.87, 0.72]

        latency = time.time() - start_time
        metrics.record_request(
            status="success",
            latency=latency,
            batch_size=3,
            predictions=predictions,
            confidences=confidences
        )

        return {"predictions": predictions}
    except Exception as e:
        latency = time.time() - start_time
        metrics.record_request("error", latency, 0, [], [])
        raise

1.3 Grafana Dashboard配置

JSON
{
  "dashboard": {
    "title": "Model Inference Monitoring",
    "panels": [
      {
        "title": "Request Rate (QPS)",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(model_request_total{model_name='text-classifier'}[5m])",
            "legendFormat": "{{status}}"
          }
        ]
      },
      {
        "title": "Latency Percentiles",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.50, rate(model_request_latency_seconds_bucket[5m]))",
            "legendFormat": "P50"
          },
          {
            "expr": "histogram_quantile(0.95, rate(model_request_latency_seconds_bucket[5m]))",
            "legendFormat": "P95"
          },
          {
            "expr": "histogram_quantile(0.99, rate(model_request_latency_seconds_bucket[5m]))",
            "legendFormat": "P99"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "rate(model_request_total{status='error'}[5m]) / rate(model_request_total[5m])"
          }
        ]
      },
      {
        "title": "Prediction Confidence Distribution",
        "type": "heatmap",
        "targets": [
          {
            "expr": "rate(model_prediction_confidence_bucket[5m])"
          }
        ]
      }
    ]
  }
}
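上面 Dashboard 中的 histogram_quantile 并不存储原始延迟,而是基于桶的累计计数做线性插值来估算分位数,因此 buckets 边界的选取会直接影响 P99 的精度。下面用 Python 重现这一估算思路(示意实现,细节与 Prometheus 的实际实现可能略有出入):

```python
def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """按Prometheus的思路从累计桶估算分位数
    buckets: [(桶上界le, 累计计数), ...],按上界升序,最后一项上界为+inf
    """
    total = buckets[-1][1]
    if total == 0:
        return float("nan")
    rank = q * total                      # 目标分位对应"第几个"样本
    prev_le, prev_count = 0.0, 0.0
    for le, count in buckets:
        if count >= rank:
            if le == float("inf"):
                return prev_le            # 落在+inf桶,只能返回最后的有限上界
            # 在桶内做线性插值
            return prev_le + (le - prev_le) * (rank - prev_count) / (count - prev_count)
        prev_le, prev_count = le, count
    return prev_le

# 累计桶:100个请求<=0.1s,900个<=0.5s,990个<=1.0s,共1000个
buckets = [(0.1, 100.0), (0.5, 900.0), (1.0, 990.0), (float("inf"), 1000.0)]
print(histogram_quantile(0.50, buckets))  # ≈ 0.3
print(histogram_quantile(0.99, buckets))  # → 1.0
```

可以看到 P99 的估算被"拉"到了桶上界:当大量样本集中在少数宽桶里时,高分位的误差会明显放大,这就是为什么正文中的 buckets 要在目标延迟附近加密。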

二、数据漂移检测

2.1 漂移类型

Text Only
数据漂移分类:
┌──────────────────────────────────────────────────────┐
│  协变量漂移(Covariate Shift)                         │
│  P_train(X) ≠ P_serve(X)                             │
│  特征分布变化,但X→Y的映射没变                           │
│  例:训练集主要是白天图片,线上出现大量夜间图片             │
├──────────────────────────────────────────────────────┤
│  概念漂移(Concept Drift)                             │
│  P_train(Y|X) ≠ P_serve(Y|X)                         │
│  特征与标签的关系发生变化                                │
│  例:疫情导致购买行为模式剧变                            │
├──────────────────────────────────────────────────────┤
│  标签漂移(Label Shift / Prior Probability Shift)     │
│  P_train(Y) ≠ P_serve(Y)                             │
│  标签分布变化                                          │
│  例:垃圾邮件比例从5%上升到20%                          │
└──────────────────────────────────────────────────────┘
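三类漂移的差别可以用一个一维小例子直观验证:固定决策规则 y = 1[x > 0],协变量漂移只改变 x 的分布而不动映射,规则依然完全正确;概念漂移改变映射本身,即使 x 分布不变准确率也会下降。下面的阈值和分布参数均为演示假设:

```python
import numpy as np

rng = np.random.default_rng(0)
model = lambda x: (x > 0).astype(int)    # "训练好"的决策规则: y = 1[x > 0]

# 训练分布: x ~ N(0,1),真实映射 y = 1[x > 0]
x_train = rng.normal(0, 1, 10000)
acc_train = (model(x_train) == (x_train > 0)).mean()

# 协变量漂移: P(X)变为N(1,1),映射不变 → 模型依然完全正确
x_cov = rng.normal(1, 1, 10000)
acc_cov = (model(x_cov) == (x_cov > 0)).mean()

# 概念漂移: P(X)不变,真实映射变为 y = 1[x > 0.5] → 准确率下降
x_con = rng.normal(0, 1, 10000)
acc_con = (model(x_con) == (x_con > 0.5)).mean()

print(acc_train, acc_cov, acc_con)  # 1.0 1.0 ≈0.81
```

标签漂移则体现为 P(Y) 的比例变化,直接监控预测类别频率即可发现(见3.2节的预测分布监控)。注意本例中协变量漂移没有伤害准确率,是因为新分布仍落在规则正确的区域;实际中模型在未见过的特征区域外推,性能通常会退化。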

2.2 统计检测方法

Python
"""
数据漂移检测方法实现
"""
import numpy as np
from scipy import stats

class DriftDetector:
    """数据漂移检测器"""

    @staticmethod  # @staticmethod静态方法,不需要实例
    def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
        """
        PSI (Population Stability Index) - 群体稳定性指标

        PSI < 0.1: 无显著漂移
        0.1 ≤ PSI < 0.2: 中等漂移,需关注
        PSI ≥ 0.2: 显著漂移,需采取行动
        """
        # 计算分桶边界(覆盖两个分布的整体取值范围)
        breakpoints = np.linspace(
            min(expected.min(), actual.min()),
            max(expected.max(), actual.max()),
            bins + 1
        )

        # 计算各桶的比例
        expected_counts = np.histogram(expected, breakpoints)[0]
        actual_counts = np.histogram(actual, breakpoints)[0]

        # 避免零值
        expected_pct = np.clip(expected_counts / len(expected), 1e-6, None)
        actual_pct = np.clip(actual_counts / len(actual), 1e-6, None)

        # PSI = Σ (actual% - expected%) * ln(actual% / expected%)
        psi_value = np.sum(
            (actual_pct - expected_pct) * np.log(actual_pct / expected_pct)
        )

        return psi_value

    @staticmethod
    def ks_test(reference: np.ndarray, current: np.ndarray) -> tuple[float, float]:
        """
        KS检验 (Kolmogorov-Smirnov Test)
        适用于连续特征的分布比较

        返回: (统计量, p值)
        p值 < 0.05 表示两个分布有显著差异
        """
        statistic, p_value = stats.ks_2samp(reference, current)
        return statistic, p_value

    @staticmethod
    def chi_square_test(reference: np.ndarray, current: np.ndarray) -> tuple[float, float]:
        """
        卡方检验 - 适用于类别特征
        """
        # 获取所有类别
        categories = np.unique(np.concatenate([reference, current]))

        ref_counts = np.array([np.sum(reference == c) for c in categories])
        cur_counts = np.array([np.sum(current == c) for c in categories])

        # 期望频率基于参考分布的比例
        ref_freq = ref_counts / ref_counts.sum()
        expected = ref_freq * cur_counts.sum()
        expected = np.clip(expected, 1e-6, None)

        statistic, p_value = stats.chisquare(cur_counts, f_exp=expected)
        return statistic, p_value

    @staticmethod
    def js_divergence(p: np.ndarray, q: np.ndarray, bins: int = 50) -> float:
        """
        JS散度 (Jensen-Shannon Divergence)
        值域[0, 1],0表示分布相同
        """
        # 将连续值离散化
        breakpoints = np.linspace(
            min(p.min(), q.min()),
            max(p.max(), q.max()),
            bins + 1
        )

        p_hist = np.histogram(p, breakpoints)[0].astype(float)
        q_hist = np.histogram(q, breakpoints)[0].astype(float)

        # 归一化
        p_hist = p_hist / p_hist.sum()
        q_hist = q_hist / q_hist.sum()

        # 避免零值
        p_hist = np.clip(p_hist, 1e-10, None)
        q_hist = np.clip(q_hist, 1e-10, None)

        # M = (P + Q) / 2
        m = 0.5 * (p_hist + q_hist)

        js = 0.5 * stats.entropy(p_hist, m) + 0.5 * stats.entropy(q_hist, m)
        return js

# ===== 使用示例 =====
np.random.seed(42)

# 模拟参考分布(训练数据)和当前分布(线上数据)
reference_data = np.random.normal(0, 1, 10000)      # 训练时的特征分布
current_data_ok = np.random.normal(0.05, 1.02, 5000) # 轻微漂移
current_data_bad = np.random.normal(0.5, 1.5, 5000)  # 严重漂移

detector = DriftDetector()

print("=" * 50)
print("轻微漂移场景:")
print(f"  PSI: {detector.psi(reference_data, current_data_ok):.4f}")
ks_stat, ks_p = detector.ks_test(reference_data, current_data_ok)
print(f"  KS Test: statistic={ks_stat:.4f}, p-value={ks_p:.4f}")
print(f"  JS Divergence: {detector.js_divergence(reference_data, current_data_ok):.4f}")

print("\n严重漂移场景:")
print(f"  PSI: {detector.psi(reference_data, current_data_bad):.4f}")
ks_stat, ks_p = detector.ks_test(reference_data, current_data_bad)
print(f"  KS Test: statistic={ks_stat:.4f}, p-value={ks_p:.4f}")
print(f"  JS Divergence: {detector.js_divergence(reference_data, current_data_bad):.4f}")

2.3 Evidently AI - 数据漂移监控

Python
"""
使用Evidently进行数据漂移检测与报告生成
pip install evidently
"""
import pandas as pd
import numpy as np

# 创建模拟数据
np.random.seed(42)
n_samples = 1000

# 参考数据(训练时的分布)
reference_df = pd.DataFrame({
    "feature_age": np.random.normal(35, 10, n_samples),
    "feature_income": np.random.lognormal(10, 1, n_samples),
    "feature_score": np.random.uniform(0, 100, n_samples),
    "category": np.random.choice(["A", "B", "C", "D"], n_samples, p=[0.4, 0.3, 0.2, 0.1]),
    "prediction": np.random.choice([0, 1], n_samples, p=[0.7, 0.3]),
    "target": np.random.choice([0, 1], n_samples, p=[0.7, 0.3])
})

# 当前数据(线上数据,模拟漂移)
current_df = pd.DataFrame({
    "feature_age": np.random.normal(38, 12, n_samples),        # 均值和方差漂移
    "feature_income": np.random.lognormal(10.5, 1.2, n_samples), # 分布漂移
    "feature_score": np.random.uniform(10, 90, n_samples),      # 范围变化
    "category": np.random.choice(["A", "B", "C", "D"], n_samples, p=[0.25, 0.25, 0.3, 0.2]),  # 类别漂移
    "prediction": np.random.choice([0, 1], n_samples, p=[0.6, 0.4]),  # 预测分布变化
    "target": np.random.choice([0, 1], n_samples, p=[0.65, 0.35])
})

# ===== Evidently数据漂移报告 =====
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import ColumnDriftMetric

# 生成数据漂移报告
drift_report = Report(metrics=[
    DataDriftPreset(),                           # 整体数据漂移
    ColumnDriftMetric(column_name="feature_age"), # 单特征漂移
])

drift_report.run(
    reference_data=reference_df,
    current_data=current_df
)

# 保存HTML报告
# drift_report.save_html("drift_report.html")

# 获取JSON结果(用于程序化处理)
drift_results = drift_report.as_dict()
print("Dataset drift detected:", drift_results["metrics"][0]["result"]["dataset_drift"])

# ===== Evidently数据质量报告 =====
quality_report = Report(metrics=[
    DataQualityPreset()
])

quality_report.run(
    reference_data=reference_df,
    current_data=current_df
)

# quality_report.save_html("quality_report.html")

# ===== Evidently监控(实时/批量)=====
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset
from evidently.tests import TestColumnDrift, TestShareOfDriftedColumns

# 定义测试套件
test_suite = TestSuite(tests=[
    DataDriftTestPreset(),
    TestColumnDrift(column_name="feature_age", stattest="ks", stattest_threshold=0.05),
    TestShareOfDriftedColumns(lt=0.3),  # 漂移特征比例不超过30%
])

test_suite.run(reference_data=reference_df, current_data=current_df)

# 获取测试结果
test_results = test_suite.as_dict()
# all()检查所有测试是否通过:生成器逐一产出布尔值,遇到第一个False立即短路返回False
all_passed = all(
    test["status"] == "SUCCESS"
    for test in test_results["tests"]
)
print(f"All drift tests passed: {all_passed}")
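Evidently 或自研检测器算出的漂移分数可以回写到 Prometheus,从而复用 1.2 节已有的采集与告警链路。下面是一个假设性的草图,把每个特征的漂移分数写入 Gauge 供抓取(指标名 model_feature_drift_score 为演示取名):

```python
from prometheus_client import Gauge, CollectorRegistry, generate_latest

registry = CollectorRegistry()

FEATURE_DRIFT = Gauge(
    "model_feature_drift_score",
    "Per-feature drift score (e.g. PSI)",
    ["model_name", "feature"],
    registry=registry,
)

def publish_drift_scores(model_name: str, scores: dict[str, float]):
    """把一次漂移检测的结果写入Gauge,供Prometheus抓取"""
    for feature, score in scores.items():
        FEATURE_DRIFT.labels(model_name, feature).set(score)

# 检测任务算完PSI后调用(分数为示例值)
publish_drift_scores("text-classifier", {
    "feature_age": 0.08,
    "feature_income": 0.23,   # 超过0.2,将触发第四节的 drift_psi 类告警规则
})
print(generate_latest(registry).decode())
```

漂移检测通常按批(如每小时)运行,Gauge 记录"最近一次"的分数,正好与这种批式更新语义匹配。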

三、模型性能衰退检测

3.1 性能衰退检测器

Python
"""
模型性能衰退检测
支持:滑动窗口检测、Page-Hinkley变点检测
"""
import numpy as np
from collections import deque

class SlidingWindowDetector:
    """
    滑动窗口性能检测器
    比较最近窗口与参考窗口的指标差异
    """

    def __init__(
        self,
        window_size: int = 1000,
        reference_size: int = 5000,
        threshold: float = 0.05,
        metric_name: str = "accuracy"
    ):
        self.window_size = window_size
        self.reference_size = reference_size
        self.threshold = threshold
        self.metric_name = metric_name

        self.reference_metrics: deque = deque(maxlen=reference_size)
        self.current_metrics: deque = deque(maxlen=window_size)
        self.alerts: list = []

    def update(self, metric_value: float, timestamp: float | None = None):
        """更新指标值"""
        import time
        ts = timestamp or time.time()

        self.current_metrics.append(metric_value)

        # 参考窗口满后开始检测
        if len(self.reference_metrics) >= self.reference_size:
            if len(self.current_metrics) >= self.window_size:
                self._check_degradation(ts)

        self.reference_metrics.append(metric_value)

    def _check_degradation(self, timestamp: float):
        """检查是否发生性能衰退"""
        ref_mean = np.mean(list(self.reference_metrics))
        cur_mean = np.mean(list(self.current_metrics))

        # 相对下降幅度
        degradation = (ref_mean - cur_mean) / (ref_mean + 1e-10)

        if degradation > self.threshold:
            alert = {
                "timestamp": timestamp,
                "metric": self.metric_name,
                "reference_mean": ref_mean,
                "current_mean": cur_mean,
                "degradation_pct": degradation * 100,
                "severity": "HIGH" if degradation > 0.1 else "MEDIUM"
            }
            self.alerts.append(alert)
            return True
        return False

class PageHinkleyDetector:
    """
    Page-Hinkley变点检测
    用于检测数据流中的分布变化点
    """

    def __init__(self, delta: float = 0.005, threshold: float = 50, alpha: float = 0.9999):
        self.delta = delta         # 容许的变化量
        self.threshold = threshold  # 检测阈值
        self.alpha = alpha         # 遗忘因子

        self.sum = 0.0
        self.x_mean = 0.0
        self.count = 0
        self.min_sum = float('inf')

    def update(self, value: float) -> bool:
        """
        输入新值,返回是否检测到变化
        """
        self.count += 1

        # 更新均值
        self.x_mean = self.x_mean + (value - self.x_mean) / self.count

        # 更新累积和
        self.sum = self.alpha * self.sum + (value - self.x_mean - self.delta)

        # 更新最小累积和
        self.min_sum = min(self.min_sum, self.sum)

        # 检测
        ph_value = self.sum - self.min_sum

        if ph_value > self.threshold:
            # 检测到变化,重置
            self.sum = 0.0
            self.min_sum = float('inf')
            return True

        return False

# ===== 使用示例 =====
np.random.seed(42)

# 模拟准确率指标流
# 前3000个:稳定在0.92附近
# 后2000个:衰退到0.85附近
metrics_stream = np.concatenate([
    np.random.normal(0.92, 0.02, 3000),
    np.random.normal(0.85, 0.03, 2000)
])

# 滑动窗口检测
sw_detector = SlidingWindowDetector(
    window_size=500, reference_size=2000, threshold=0.03
)

for i, metric in enumerate(metrics_stream):  # enumerate同时获取索引和值
    sw_detector.update(metric)

if sw_detector.alerts:
    print(f"Sliding Window alerts: {len(sw_detector.alerts)}")
    for alert in sw_detector.alerts[:3]:  # 切片操作:[start:end:step]提取子序列
        print(f"  Degradation: {alert['degradation_pct']:.1f}%, "
              f"Ref: {alert['reference_mean']:.4f}, Cur: {alert['current_mean']:.4f}")

# Page-Hinkley检测
ph_detector = PageHinkleyDetector(delta=0.005, threshold=20)
for i, metric in enumerate(metrics_stream):
    if ph_detector.update(metric):
        print(f"Page-Hinkley: Change detected at step {i}")
        break
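除滑动窗口和 Page-Hinkley 之外,ADWIN(Adaptive Windowing)是另一类常用的变点检测算法:维护一个自适应窗口,当某个切分点两侧的均值差超过 Hoeffding 界时判定发生漂移并丢弃旧数据。下面是一个大幅简化的教学版草图(完整实现见 river 等库;固定切分步长与 delta 取值均为演示假设):

```python
import math
from collections import deque

class SimpleADWIN:
    """简化版ADWIN:在候选切分点上比较窗口前后两段均值,
    差异超过Hoeffding界则判定漂移并收缩窗口(教学示意,非完整实现)
    """

    def __init__(self, delta: float = 0.05, max_window: int = 4000,
                 check_every: int = 32):
        self.delta = delta              # 置信参数:越小越保守,误报少但检测延迟大
        self.max_window = max_window
        self.check_every = check_every  # 每隔多少个样本检查一次
        self.window: deque = deque(maxlen=max_window)
        self.count = 0

    def update(self, value: float) -> bool:
        """输入新值,返回是否检测到漂移"""
        self.window.append(value)
        self.count += 1
        if self.count % self.check_every != 0 or len(self.window) < 64:
            return False

        data = list(self.window)
        n = len(data)
        # 前缀和,避免每个切分点重复求和
        prefix = [0.0]
        for v in data:
            prefix.append(prefix[-1] + v)

        # 完整ADWIN用指数分桶枚举切分点,这里用固定步长近似
        for split in range(32, n - 32, 32):
            mean_left = prefix[split] / split
            mean_right = (prefix[n] - prefix[split]) / (n - split)
            m = 1.0 / (1.0 / split + 1.0 / (n - split))   # 两段长度的调和量
            eps = math.sqrt(math.log(4.0 / self.delta) / (2.0 * m))
            if abs(mean_left - mean_right) > eps:
                # 判定漂移:丢弃切分点之前的旧数据
                self.window = deque(data[split:], maxlen=self.max_window)
                return True
        return False

# 在与上文相同的指标流上演示
import numpy as np
np.random.seed(42)
stream = np.concatenate([
    np.random.normal(0.92, 0.02, 3000),
    np.random.normal(0.85, 0.03, 2000)
])

detector = SimpleADWIN()
for i, v in enumerate(stream):
    if detector.update(float(v)):
        print(f"SimpleADWIN: change detected at step {i}")
        break
```

与滑动窗口检测相比,ADWIN 不需要预先指定窗口长度,窗口大小由数据本身决定,因此对变化速度未知的指标流更稳健。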

3.2 预测分布监控

Python
"""
监控模型预测输出的分布变化
"""
import numpy as np
from collections import deque
from scipy.stats import entropy

class PredictionMonitor:
    """预测输出分布监控"""

    def __init__(self, num_classes: int, window_size: int = 10000):
        self.num_classes = num_classes
        self.window_size = window_size
        self.reference_dist = None
        self.predictions_buffer = deque(maxlen=window_size)
        self.confidences_buffer = deque(maxlen=window_size)

    def set_reference(self, predictions: np.ndarray, confidences: np.ndarray):
        """设置参考分布(从验证集获取)"""
        # 预测类别分布
        self.reference_dist = np.bincount(predictions, minlength=self.num_classes)
        self.reference_dist = self.reference_dist / self.reference_dist.sum()

        # 参考置信度统计
        self.reference_conf_mean = np.mean(confidences)
        self.reference_conf_std = np.std(confidences)

    def update(self, prediction: int, confidence: float):
        """更新监控数据"""
        self.predictions_buffer.append(prediction)
        self.confidences_buffer.append(confidence)

    def check(self) -> dict:
        """检查当前分布是否异常"""
        if len(self.predictions_buffer) < self.window_size // 2:
            return {"status": "insufficient_data"}

        preds = np.array(list(self.predictions_buffer))
        confs = np.array(list(self.confidences_buffer))

        # 1. 预测分布偏移
        current_dist = np.bincount(preds, minlength=self.num_classes)
        current_dist = current_dist / current_dist.sum()

        kl_div = entropy(current_dist + 1e-10, self.reference_dist + 1e-10)

        # 2. 置信度异常
        conf_mean_shift = abs(np.mean(confs) - self.reference_conf_mean)
        low_conf_ratio = np.mean(confs < 0.5)

        # 3. 判断告警
        alerts = []
        if kl_div > 0.1:
            alerts.append(f"Prediction distribution drift (KL={kl_div:.4f})")
        if conf_mean_shift > 0.1:
            alerts.append(f"Confidence mean shift ({conf_mean_shift:.4f})")
        if low_conf_ratio > 0.3:
            alerts.append(f"High low-confidence ratio ({low_conf_ratio:.2%})")

        return {
            "status": "alert" if alerts else "healthy",
            "prediction_distribution": current_dist.tolist(),
            "kl_divergence": kl_div,
            "confidence_mean": np.mean(confs),
            "low_confidence_ratio": low_conf_ratio,
            "alerts": alerts
        }

# 使用示例
monitor = PredictionMonitor(num_classes=5)

# 设置参考分布
ref_preds = np.random.choice(5, 5000, p=[0.3, 0.25, 0.2, 0.15, 0.1])
ref_confs = np.random.beta(5, 1, 5000)  # 高置信度
monitor.set_reference(ref_preds, ref_confs)

# 模拟正常流量
for _ in range(5000):
    pred = np.random.choice(5, p=[0.3, 0.25, 0.2, 0.15, 0.1])
    conf = np.random.beta(5, 1)
    monitor.update(pred, conf)

status = monitor.check()
print(f"Normal traffic - Status: {status['status']}")

# 模拟异常流量(分布偏移)
monitor_bad = PredictionMonitor(num_classes=5)
monitor_bad.set_reference(ref_preds, ref_confs)

for _ in range(5000):
    pred = np.random.choice(5, p=[0.1, 0.1, 0.4, 0.2, 0.2])  # 分布变化
    conf = np.random.beta(2, 3)  # 低置信度
    monitor_bad.update(pred, conf)

status_bad = monitor_bad.check()
print(f"Drifted traffic - Status: {status_bad['status']}")
for alert in status_bad.get("alerts", []):
    print(f"  Alert: {alert}")

四、告警与自动重训练Pipeline

4.1 告警系统

Python
"""
多级告警系统
"""
import json
from enum import Enum
from datetime import datetime
from dataclasses import dataclass
from collections.abc import Callable

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    """告警定义"""
    name: str
    severity: AlertSeverity
    message: str
    metric_name: str
    metric_value: float
    threshold: float
    timestamp: datetime | None = None

    def __post_init__(self):
        self.timestamp = self.timestamp or datetime.now()

class AlertRule:
    """告警规则"""

    def __init__(self, name: str, metric_name: str, threshold: float,
                 comparison: str = "gt", severity: AlertSeverity = AlertSeverity.WARNING,
                 cooldown_seconds: int = 300):
        self.name = name
        self.metric_name = metric_name
        self.threshold = threshold
        self.comparison = comparison  # gt, lt, gte, lte
        self.severity = severity
        self.cooldown = cooldown_seconds
        self.last_alert_time = None

    def evaluate(self, metric_value: float) -> Alert | None:
        """评估是否触发告警"""
        now = datetime.now()

        # 冷却期检查
        if self.last_alert_time:
            elapsed = (now - self.last_alert_time).total_seconds()
            if elapsed < self.cooldown:
                return None

        # 条件判断
        triggered = False
        if self.comparison == "gt" and metric_value > self.threshold:
            triggered = True
        elif self.comparison == "lt" and metric_value < self.threshold:
            triggered = True
        elif self.comparison == "gte" and metric_value >= self.threshold:
            triggered = True
        elif self.comparison == "lte" and metric_value <= self.threshold:
            triggered = True

        if triggered:
            self.last_alert_time = now
            return Alert(
                name=self.name,
                severity=self.severity,
                message=f"{self.metric_name} ({metric_value:.4f}) "
                        f"{'exceeded' if 'g' in self.comparison else 'below'} "
                        f"threshold ({self.threshold})",
                metric_name=self.metric_name,
                metric_value=metric_value,
                threshold=self.threshold
            )
        return None

class AlertManager:
    """告警管理器"""

    def __init__(self):
        self.rules: list[AlertRule] = []
        self.handlers: list[Callable] = []
        self.alert_history: list[Alert] = []

    def add_rule(self, rule: AlertRule):
        self.rules.append(rule)

    def add_handler(self, handler: Callable):
        """添加告警处理器(邮件/Slack/钉钉/PagerDuty)"""
        self.handlers.append(handler)

    def evaluate(self, metrics: dict):
        """评估所有规则"""
        for rule in self.rules:
            if rule.metric_name in metrics:
                alert = rule.evaluate(metrics[rule.metric_name])
                if alert:
                    self.alert_history.append(alert)
                    for handler in self.handlers:
                        handler(alert)

# ===== 定义告警规则 =====
alert_manager = AlertManager()

# 错误率告警
alert_manager.add_rule(AlertRule(
    name="high_error_rate",
    metric_name="error_rate",
    threshold=0.01,
    comparison="gt",
    severity=AlertSeverity.CRITICAL,
    cooldown_seconds=600
))

# 延迟告警
alert_manager.add_rule(AlertRule(
    name="high_latency",
    metric_name="p99_latency_ms",
    threshold=500,
    comparison="gt",
    severity=AlertSeverity.WARNING
))

# 数据漂移告警
alert_manager.add_rule(AlertRule(
    name="data_drift",
    metric_name="drift_psi",
    threshold=0.2,
    comparison="gt",
    severity=AlertSeverity.WARNING
))

# 准确率衰退告警
alert_manager.add_rule(AlertRule(
    name="accuracy_degradation",
    metric_name="accuracy",
    threshold=0.85,
    comparison="lt",
    severity=AlertSeverity.CRITICAL
))

# 添加告警处理器
def console_handler(alert: Alert):
    print(f"[{alert.severity.value.upper()}] {alert.name}: {alert.message}")

def slack_handler(alert: Alert):
    """发送Slack告警(示例)"""
    # import requests
    # webhook_url = "https://hooks.slack.com/services/xxx"
    # requests.post(webhook_url, json={
    #     "text": f":warning: *{alert.name}*\n{alert.message}"
    # })
    pass

alert_manager.add_handler(console_handler)
alert_manager.add_handler(slack_handler)

# 评估指标
alert_manager.evaluate({
    "error_rate": 0.02,
    "p99_latency_ms": 320,
    "drift_psi": 0.25,
    "accuracy": 0.83
})

4.2 自动重训练Pipeline

Python
"""
自动重训练Pipeline
检测到漂移/衰退后自动触发模型重训练
"""
from dataclasses import dataclass
from enum import Enum
import time

class PipelineStatus(Enum):
    IDLE = "idle"
    TRIGGERED = "triggered"
    TRAINING = "training"
    VALIDATING = "validating"
    DEPLOYING = "deploying"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass  # 自动生成__init__等方法
class RetrainingConfig:
    """重训练配置"""
    trigger_conditions: dict | None = None
    min_retrain_interval_hours: int = 24
    validation_threshold: float = 0.85
    auto_deploy: bool = False
    max_retries: int = 2

    def __post_init__(self):
        if self.trigger_conditions is None:
            self.trigger_conditions = {
                "accuracy_below": 0.85,
                "drift_psi_above": 0.2,
                "error_rate_above": 0.05
            }

class AutoRetrainingPipeline:
    """
    自动重训练Pipeline

    流程:
    1. 监控指标触发重训练
    2. 收集最新训练数据
    3. 执行训练
    4. 验证模型质量
    5. 自动/手动部署
    """

    def __init__(self, config: RetrainingConfig):
        self.config = config
        self.status = PipelineStatus.IDLE
        self.last_retrain_time = 0
        self.history = []

    def should_retrain(self, metrics: dict) -> bool:
        """判断是否需要重训练"""
        if self.status != PipelineStatus.IDLE:
            return False

        # 最小间隔检查
        hours_since_last = (time.time() - self.last_retrain_time) / 3600
        if hours_since_last < self.config.min_retrain_interval_hours:
            return False

        conditions = self.config.trigger_conditions

        if metrics.get("accuracy", 1.0) < conditions.get("accuracy_below", 0):
            return True
        if metrics.get("drift_psi", 0) > conditions.get("drift_psi_above", 1):
            return True
        if metrics.get("error_rate", 0) > conditions.get("error_rate_above", 1):
            return True

        return False

    def run(self, metrics: dict):
        """执行重训练Pipeline"""
        if not self.should_retrain(metrics):
            return

        run_info = {
            "trigger_time": time.time(),
            "trigger_metrics": metrics.copy(),
            "steps": []
        }

        try:  # try/except捕获异常
            # Step 1: 数据准备
            self.status = PipelineStatus.TRIGGERED
            print("[Pipeline] Step 1: Preparing training data...")
            run_info["steps"].append("data_preparation")
            # self._prepare_data()

            # Step 2: 训练
            self.status = PipelineStatus.TRAINING
            print("[Pipeline] Step 2: Training model...")
            run_info["steps"].append("training")
            # model = self._train_model()

            # Step 3: 验证
            self.status = PipelineStatus.VALIDATING
            print("[Pipeline] Step 3: Validating model...")
            validation_score = 0.91  # 模拟验证分数
            run_info["validation_score"] = validation_score

            if validation_score < self.config.validation_threshold:
                raise ValueError(
                    f"Validation score {validation_score} below threshold "
                    f"{self.config.validation_threshold}"
                )

            # Step 4: 部署
            if self.config.auto_deploy:
                self.status = PipelineStatus.DEPLOYING
                print("[Pipeline] Step 4: Deploying model...")
                # self._deploy_model(model)
            else:
                print("[Pipeline] Step 4: Model ready for manual deployment")

            self.status = PipelineStatus.COMPLETED
            self.last_retrain_time = time.time()
            run_info["status"] = "completed"
            print("[Pipeline] Retraining completed successfully!")

        except Exception as e:
            self.status = PipelineStatus.FAILED
            run_info["status"] = "failed"
            run_info["error"] = str(e)
            print(f"[Pipeline] Failed: {e}")

        finally:
            self.history.append(run_info)
            self.status = PipelineStatus.IDLE

# 使用示例
config = RetrainingConfig(
    trigger_conditions={
        "accuracy_below": 0.85,
        "drift_psi_above": 0.2
    },
    min_retrain_interval_hours=0,  # 演示用,设为0
    auto_deploy=False
)

pipeline = AutoRetrainingPipeline(config)

# 模拟监控指标
current_metrics = {
    "accuracy": 0.82,     # 低于阈值
    "drift_psi": 0.25,    # 高于阈值
    "error_rate": 0.01
}

pipeline.run(current_metrics)

五、在线评估与离线评估

5.1 评估方法对比

Python
"""
在线评估 vs 离线评估 对比和实现
"""

class OfflineEvaluator:
    """
    离线评估:在静态测试集上评估模型
    优点:快速、可控、可复现
    缺点:不能反映真实用户行为
    """

    def evaluate(self, model, test_data: dict) -> dict:
        """标准离线评估"""
        # predictions = model.predict(test_data["features"])
        predictions = test_data.get("predictions", [])
        labels = test_data.get("labels", [])

        from sklearn.metrics import accuracy_score, f1_score

        metrics = {
            "accuracy": accuracy_score(labels, predictions),
            "f1_macro": f1_score(labels, predictions, average="macro"),
            "f1_weighted": f1_score(labels, predictions, average="weighted"),
        }

        # 分组评估(公平性)
        # for group in test_data.get("groups", []):
        #     group_metrics = evaluate_group(predictions, labels, group)
        #     metrics[f"accuracy_{group}"] = group_metrics["accuracy"]

        return metrics

class OnlineEvaluator:
    """
    在线评估:通过线上实验评估模型
    优点:反映真实效果,考虑用户交互
    缺点:周期长、需要足够流量、有风险
    """

    def __init__(self, experiment_name: str, metric_names: list):
        self.experiment_name = experiment_name
        self.metric_names = metric_names
        self.control_data = []
        self.treatment_data = []

    def record(self, group: str, metrics: dict):
        """记录实验数据"""
        if group == "control":
            self.control_data.append(metrics)
        else:
            self.treatment_data.append(metrics)

    def analyze(self) -> dict:
        """分析A/B测试结果"""
        from scipy import stats
        import numpy as np

        results = {}
        for metric in self.metric_names:
            control_values = [d[metric] for d in self.control_data if metric in d]
            treatment_values = [d[metric] for d in self.treatment_data if metric in d]

            # Welch t检验(不假设两组方差相等,对二元指标更稳妥)
            t_stat, p_value = stats.ttest_ind(
                control_values, treatment_values, equal_var=False
            )

            control_mean = np.mean(control_values)
            treatment_mean = np.mean(treatment_values)
            lift = (treatment_mean - control_mean) / (control_mean + 1e-10)

            results[metric] = {
                "control_mean": control_mean,
                "treatment_mean": treatment_mean,
                "lift": lift,
                "p_value": p_value,
                "significant": p_value < 0.05
            }

        return results

# 使用示例
online_eval = OnlineEvaluator("model_v2_test", ["ctr", "conversion_rate"])

import numpy as np
np.random.seed(42)

# 模拟在线数据
for _ in range(5000):
    online_eval.record("control", {
        "ctr": np.random.binomial(1, 0.05),
        "conversion_rate": np.random.binomial(1, 0.02)
    })
    online_eval.record("treatment", {
        "ctr": np.random.binomial(1, 0.058),  # 新模型CTR更高
        "conversion_rate": np.random.binomial(1, 0.022)
    })

results = online_eval.analyze()
for metric, data in results.items():
    print(f"{metric}: lift={data['lift']:.2%}, p={data['p_value']:.4f}, "
          f"significant={data['significant']}")
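在线评估"需要足够流量"这一点可以量化:用双比例检验的正态近似公式,可以估算检测给定提升所需的每组最小样本量。下面是一个示意实现(基线CTR、提升幅度、显著性水平和功效均为假设值):

```python
from scipy.stats import norm

def required_sample_size(p1: float, p2: float,
                         alpha: float = 0.05, power: float = 0.8) -> int:
    """双比例检验每组所需最小样本量(正态近似公式)"""
    z_alpha = norm.ppf(1 - alpha / 2)
    z_beta = norm.ppf(power)
    p_bar = (p1 + p2) / 2
    numerator = (z_alpha * (2 * p_bar * (1 - p_bar)) ** 0.5
                 + z_beta * (p1 * (1 - p1) + p2 * (1 - p2)) ** 0.5) ** 2
    return int(numerator / (p1 - p2) ** 2) + 1

# 基线CTR 5%,希望检测到10%的相对提升(0.05 → 0.055)
n = required_sample_size(0.05, 0.055)
print(f"每组至少需要约 {n} 个样本")
```

可以看到,检测0.5个百分点的绝对提升需要每组数万样本,这正是在线评估"周期长、需要足够流量"的原因。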

六、Feature Store概念与实践

6.1 Feature Store核心概念

Text Only
Feature Store架构:

 ┌─────────────┐    ┌──────────────────────────────────┐
 │ 数据源       │    │        Feature Store              │
 │ ┌─────────┐ │    │  ┌───────────┐  ┌─────────────┐  │
 │ │数据库    │─┼───▶│  │Feature    │  │ 在线Store   │  │──▶ 推理服务
 │ │流数据    │ │    │  │Pipeline   │  │ (Redis)     │  │    (低延迟)
 │ │日志     │ │    │  │Transform  │  └─────────────┘  │
 │ └─────────┘ │    │  │           │  ┌─────────────┐  │
 └─────────────┘    │  └───────────┘  │ 离线Store   │  │──▶ 训练Pipeline
                    │                 │ (Parquet)   │  │    (批量读取)
                    │                 └─────────────┘  │
                    │  ┌───────────────────────────┐   │
                    │  │ Feature Registry           │   │
                    │  │ (元数据/版本/血缘/权限)      │   │
                    │  └───────────────────────────┘   │
                    └──────────────────────────────────┘

6.2 Feast实践

Python
"""
Feast Feature Store 实践
pip install feast
"""
# ===== 1. Feast项目定义 (feature_store.yaml) =====
"""
project: fraud_detection
registry: data/registry.db
provider: local
online_store:
  type: sqlite
  path: data/online_store.db
offline_store:
  type: file
entity_key_serialization_version: 2
"""

# ===== 2. 特征定义 (features.py) =====
from datetime import timedelta
# from feast import Entity, FeatureView, FileSource, Field
# from feast.types import Float64, Int64, String

# 实体定义(用户ID)
# user = Entity(
#     name="user_id",
#     join_keys=["user_id"],
#     description="User ID"
# )

# 数据源
# user_features_source = FileSource(
#     path="data/user_features.parquet",
#     timestamp_field="event_timestamp",
#     created_timestamp_column="created_timestamp"
# )

# 特征视图
# user_feature_view = FeatureView(
#     name="user_features",
#     entities=[user],
#     ttl=timedelta(days=1),  # 特征有效期
#     schema=[
#         Field(name="transaction_count_7d", dtype=Int64),
#         Field(name="avg_transaction_amount_7d", dtype=Float64),
#         Field(name="max_transaction_amount_7d", dtype=Float64),
#         Field(name="account_age_days", dtype=Int64),
#         Field(name="risk_score", dtype=Float64),
#     ],
#     source=user_features_source
# )

# ===== 3. Feast操作 =====
"""
# 初始化(命令行)
feast init fraud_detection
cd fraud_detection

# 注册特征
feast apply

# 物化特征到在线存储
feast materialize-incremental $(date +%Y-%m-%dT%H:%M:%S)
"""

# ===== 4. 获取特征(训练时 - 离线) =====
# from feast import FeatureStore
# import pandas as pd
#
# store = FeatureStore(repo_path=".")
#
# # 获取训练特征(point-in-time join)
# entity_df = pd.DataFrame({
#     "user_id": [1001, 1002, 1003],
#     "event_timestamp": pd.to_datetime(["2024-01-15", "2024-01-15", "2024-01-15"])
# })
#
# training_df = store.get_historical_features(
#     entity_df=entity_df,
#     features=[
#         "user_features:transaction_count_7d",
#         "user_features:avg_transaction_amount_7d",
#         "user_features:risk_score"
#     ]
# ).to_df()
#
# print(training_df.head())

# ===== 5. 获取特征(推理时 - 在线) =====
# online_features = store.get_online_features(
#     features=[
#         "user_features:transaction_count_7d",
#         "user_features:avg_transaction_amount_7d",
#         "user_features:risk_score"
#     ],
#     entity_rows=[
#         {"user_id": 1001},
#         {"user_id": 1002}
#     ]
# ).to_dict()
#
# print(online_features)
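get_historical_features 的核心是 point-in-time join:每条训练样本只能取到"样本时刻之前"的特征值,避免特征穿越。这一语义可以用纯 pandas 的 merge_asof 做一个最小示意(数据为虚构):

```python
import pandas as pd

# 特征快照表:每个用户在不同时间点的特征值(虚构数据)
features = pd.DataFrame({
    "user_id": [1001, 1001, 1002],
    "event_timestamp": pd.to_datetime(["2024-01-10", "2024-01-14", "2024-01-12"]),
    "risk_score": [0.3, 0.7, 0.5],
}).sort_values("event_timestamp")

# 训练样本:user_id + 样本发生时刻
entity_df = pd.DataFrame({
    "user_id": [1001, 1002],
    "event_timestamp": pd.to_datetime(["2024-01-15", "2024-01-11"]),
}).sort_values("event_timestamp")

# merge_asof:对每个样本只取该用户在样本时刻之前最近一次的特征值
training_df = pd.merge_asof(
    entity_df, features,
    on="event_timestamp", by="user_id", direction="backward"
)
print(training_df)
# user 1001 取到 01-14 的 0.7;user 1002 在 01-11 时尚无特征,为 NaN
```

Feast 在此之上还处理了 ttl 过期、多特征视图拼接等问题,但正确性的关键就是这个"向后取最近一次"的 join 方向。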

6.3 Feature Store选型对比

Python
"""
主流Feature Store对比
"""
feature_stores = {
    "Feast": {
        "类型": "开源",
        "在线存储": "Redis/DynamoDB/SQLite",
        "离线存储": "BigQuery/Redshift/Parquet",
        "适用场景": "中小团队,灵活部署",
        "优点": "开源免费、支持多后端、社区活跃",
        "缺点": "企业级功能需自建"
    },
    "Tecton": {
        "类型": "商业SaaS",
        "在线存储": "托管服务",
        "离线存储": "托管服务",
        "适用场景": "大规模企业",
        "优点": "全托管、实时特征、监控完善",
        "缺点": "付费、供应商锁定"
    },
    "Hopsworks": {
        "类型": "开源+商业",
        "在线存储": "RonDB",
        "离线存储": "Hudi",
        "适用场景": "需要完整ML平台",
        "优点": "内置特征工程、GPU支持",
        "缺点": "较重"
    }
}

for name, info in feature_stores.items():
    print(f"\n{name}:")
    for key, value in info.items():
        print(f"  {key}: {value}")

💼 面试常考题

Q1: 什么是数据漂移?如何检测?

:数据漂移指生产数据的分布与训练数据的分布发生偏移。分为协变量漂移(输入分布变化)、概念漂移(输入-输出关系变化)、标签漂移(输出分布变化)。检测方法:①PSI(群体稳定性指标)②KS检验③卡方检验(类别特征)④JS散度⑤使用Evidently等工具自动化检测。

Q2: PSI的计算公式和阈值是什么?

\(PSI = \sum_{i=1}^{n}(P_i^{actual} - P_i^{expected}) \times \ln(\frac{P_i^{actual}}{P_i^{expected}})\)。将数据分桶,比较每个桶中实际分布与期望分布的差异。阈值:PSI<0.1无漂移,0.1-0.2中等漂移,≥0.2严重漂移。

Q3: 模型上线后性能下降怎么排查?

:按层排查:①数据层:检查输入数据质量(缺失值、异常值、分布变化)②模型层:在近期数据上离线评估,对比历史指标③服务层:检查延迟、错误率、资源使用④业务层:检查标签分布是否变化、新场景覆盖率。同时检查上下游依赖变更、特征Pipeline是否正常。

Q4: 如何设计自动重训练系统?

:组成部分:①触发器(定时/指标驱动/漂移驱动)②数据收集(从Feature Store获取最新特征+标签)③训练Pipeline(自动化训练+超参搜索)④模型验证(Shadow mode/回测)⑤审批部署(自动金丝雀或人工审批)⑥监控回滚。关键:设置最小重训间隔、验证阈值、回滚机制。

Q5: 在线评估和离线评估的区别和互补关系?

:离线评估在固定数据集上测试,快速、可复现,但不能反映真实用户行为。在线评估通过A/B测试在真实流量中测试,反映真实效果,但周期长、需要流量。互补关系:先离线评估筛选候选模型(精度、公平性),再通过在线A/B测试验证业务指标(CTR、转化率、留存率)。

Q6: Feature Store解决了什么问题?

:①训练-推理偏斜(Training-Serving Skew):确保训练和推理使用相同的特征计算逻辑②特征复用:不同模型团队共享特征③时间穿越(Time Travel):point-in-time正确获取历史特征④特征监控:追踪特征质量和分布变化⑤特征发现:统一的特征目录和文档。

Q7: 如何设计一个完整的模型监控系统?

:四个维度:①数据监控(输入特征分布、缺失率、异常值)→Evidently②服务监控(QPS、延迟、错误率、资源使用)→Prometheus+Grafana③模型监控(预测分布、置信度、准确率)→自定义指标④业务监控(转化率、收入等)→BI系统。告警策略:多级告警(Info→Warning→Critical)+告警收敛+on-call机制。

Q8: 如何处理概念漂移(Concept Drift)?

:概念漂移是X→Y映射变化,最难检测。方法:①持续收集反馈标签,定期更新模型②在线学习/增量学习③使用窗口(滑动窗口/衰减窗口)让模型更关注近期数据④集成方法:多个模型加权,新模型权重更高⑤检测:监控后验概率分布P(Y|X)随时间的变化。


✅ 学习检查清单

  • 理解三层监控指标体系(基础设施/服务/模型)
  • 能使用Prometheus+Grafana搭建监控面板
  • 理解PSI/KS检验/卡方检验等漂移检测方法
  • 能使用Evidently生成数据漂移报告
  • 理解模型性能衰退的检测方法
  • 能设计多级告警系统
  • 能设计自动重训练Pipeline
  • 理解在线评估与离线评估的关系
  • 理解Feature Store的核心价值和Feast使用
  • 能回答所有面试题

📌 下一章:04-LLM工程化实践 — 大语言模型的部署优化与工程化实践