🔧 监控与持续优化¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
学习目标:掌握模型监控指标体系、数据漂移检测、性能衰退检测、自动重训练Pipeline和Feature Store实践
预计时长:10-12小时
前置知识:模型部署基础(第2章)、统计学基础、Python编程
📎 交叉引用(本章聚焦ML/AI模型监控,基础设施监控见以下教程):
- 基础设施监控(Prometheus/Grafana)→ 云原生/监控告警系统
- eBPF可观测性 → 云原生/eBPF与可观测性
- LLM推理服务监控 → LLM学习/推理服务部署
📋 本章概览¶
模型上线后并非一劳永逸——真实世界的数据在持续变化,模型性能随时间衰退是必然现象。建立完善的监控和自动化持续优化系统,是MLOps成熟度的关键标志。
生产环境模型监控闭环:
┌───────────┐ ┌──────────┐ ┌──────────┐
│ 推理请求 │────▶│ 推理服务 │────▶│ 预测结果 │
└───────────┘ └──────────┘ └──────────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 输入监控 │ │ 服务监控 │ │ 输出监控 │
│ 数据漂移 │ │ 延迟/吞吐 │ │ 预测分布 │
└──────────┘ └──────────┘ └──────────┘
│ │ │
└────────────────┼──────────────────┘
▼
┌──────────────┐
│ 告警系统 │
│ Prometheus │
└──────────────┘
│
▼
┌──────────────┐
│ 自动重训练 │
│ Pipeline │
└──────────────┘
上图展示了生产环境模型监控闭环:推理请求经推理服务产生预测结果;输入侧监控数据漂移,服务侧监控延迟与吞吐,输出侧监控预测分布;三路监控信号汇入以Prometheus为核心的告警系统,必要时触发自动重训练Pipeline。实践中通常配合Grafana仪表板展示请求数量、成功率、延迟百分位数、错误率等关键指标,帮助团队及时发现和解决问题。
一、模型监控指标体系¶
1.1 三层监控指标¶
"""
模型监控指标体系
分为:基础设施层、服务层、业务层
"""
from dataclasses import dataclass, field
import time
import numpy as np
from collections import deque
@dataclass
class InfraMetrics:
"""基础设施层指标"""
cpu_utilization: float = 0.0 # CPU利用率 %
gpu_utilization: float = 0.0 # GPU利用率 %
gpu_memory_used: float = 0.0 # GPU显存使用 GB
memory_used: float = 0.0 # 内存使用 GB
disk_io_read: float = 0.0 # 磁盘读取 MB/s
network_bandwidth: float = 0.0 # 网络带宽 MB/s
@dataclass
class ServiceMetrics:
"""服务层指标"""
qps: float = 0.0 # 每秒查询数
latency_p50: float = 0.0 # P50延迟 ms
latency_p95: float = 0.0 # P95延迟 ms
latency_p99: float = 0.0 # P99延迟 ms
error_rate: float = 0.0 # 错误率 %
timeout_rate: float = 0.0 # 超时率 %
batch_size_avg: float = 0.0 # 平均批大小
queue_depth: int = 0 # 请求队列深度
@dataclass
class ModelMetrics:
"""模型/业务层指标"""
prediction_distribution: dict = field(default_factory=dict) # 预测分布
confidence_mean: float = 0.0 # 平均置信度
confidence_below_threshold: float = 0.0 # 低置信度比例
input_feature_stats: dict = field(default_factory=dict) # 输入特征统计
output_drift_score: float = 0.0 # 输出漂移分数
data_quality_score: float = 0.0 # 数据质量分数
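服务层指标通常由一个时间窗口内的原始请求记录聚合得到。下面是一个最小聚合示意(为保持片段自包含,这里重复定义了精简版 ServiceMetrics;aggregate_window 为示意用的自拟函数,非任何库的API):

```python
import numpy as np
from dataclasses import dataclass

@dataclass
class ServiceMetrics:
    qps: float = 0.0
    latency_p50: float = 0.0
    latency_p95: float = 0.0
    latency_p99: float = 0.0
    error_rate: float = 0.0

def aggregate_window(latencies_ms, errors, window_seconds: float) -> ServiceMetrics:
    """由一个时间窗口内的原始请求记录聚合出服务层指标"""
    lat = np.asarray(latencies_ms, dtype=float)
    p50, p95, p99 = np.percentile(lat, [50, 95, 99])
    return ServiceMetrics(
        qps=len(lat) / window_seconds,
        latency_p50=float(p50),
        latency_p95=float(p95),
        latency_p99=float(p99),
        error_rate=float(np.mean(errors)) * 100,  # 百分比
    )

m = aggregate_window(
    [12, 15, 18, 22, 35, 40, 120, 13, 16, 19],
    errors=[0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
    window_seconds=1.0,
)
print(f"QPS={m.qps:.0f}, P50={m.latency_p50:.1f}ms, P99={m.latency_p99:.1f}ms, error={m.error_rate:.0f}%")
```

注意P99对长尾延迟(示例中的120ms离群值)远比均值敏感,这也是监控面板偏好百分位数的原因。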
1.2 Prometheus指标采集¶
"""
使用prometheus_client暴露推理服务指标
"""
from prometheus_client import (
    Counter, Histogram, Gauge,  # Counter:单调递增计数;Histogram:分桶分布;Gauge:可增可减的瞬时值
    CollectorRegistry, generate_latest
)
import time
# 创建指标注册表
registry = CollectorRegistry()
# ===== 定义Prometheus指标 =====
# 请求计数
REQUEST_COUNT = Counter(
"model_request_total",
"Total number of inference requests",
["model_name", "model_version", "status"],
registry=registry
)
# 请求延迟
REQUEST_LATENCY = Histogram(
"model_request_latency_seconds",
"Inference request latency in seconds",
["model_name", "model_version"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
registry=registry
)
# 预测置信度分布
PREDICTION_CONFIDENCE = Histogram(
"model_prediction_confidence",
"Prediction confidence distribution",
["model_name", "predicted_class"],
buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99],
registry=registry
)
# 批大小
BATCH_SIZE = Histogram(
"model_batch_size",
"Inference batch size",
["model_name"],
buckets=[1, 2, 4, 8, 16, 32, 64],
registry=registry
)
# 当前GPU显存使用
GPU_MEMORY = Gauge(
"model_gpu_memory_bytes",
"GPU memory usage in bytes",
["gpu_id"],
registry=registry
)
# 模型加载状态
MODEL_LOADED = Gauge(
"model_loaded_info",
"Model loaded information",
["model_name", "model_version"],
registry=registry
)
class MetricsMiddleware:
"""推理服务指标中间件"""
def __init__(self, model_name: str, model_version: str):
self.model_name = model_name
self.model_version = model_version
MODEL_LOADED.labels(model_name, model_version).set(1)
def record_request(self, status: str, latency: float,
batch_size: int, predictions: list, confidences: list):
"""记录一次推理请求的指标"""
# 请求计数
REQUEST_COUNT.labels(
self.model_name, self.model_version, status
).inc()
# 延迟
REQUEST_LATENCY.labels(
self.model_name, self.model_version
).observe(latency)
# 批大小
BATCH_SIZE.labels(self.model_name).observe(batch_size)
# 预测置信度
for pred, conf in zip(predictions, confidences): # zip并行遍历多个可迭代对象
PREDICTION_CONFIDENCE.labels(
self.model_name, str(pred)
).observe(conf)
# FastAPI集成
from fastapi import FastAPI, Request
from fastapi.responses import Response
app = FastAPI()
metrics = MetricsMiddleware("text-classifier", "v1.2")
@app.get("/metrics")
def prometheus_metrics():
"""暴露Prometheus指标端点"""
return Response(
content=generate_latest(registry),
media_type="text/plain"
)
@app.post("/predict")
async def predict(request: Request): # async def定义异步函数;用await调用
start_time = time.time()
try:
# ... 推理逻辑 ...
predictions = [1, 0, 2]
confidences = [0.95, 0.87, 0.72]
latency = time.time() - start_time
metrics.record_request(
status="success",
latency=latency,
batch_size=3,
predictions=predictions,
confidences=confidences
)
return {"predictions": predictions}
except Exception as e:
latency = time.time() - start_time
metrics.record_request("error", latency, 0, [], [])
raise
1.3 Grafana Dashboard配置¶
{
"dashboard": {
"title": "Model Inference Monitoring",
"panels": [
{
"title": "Request Rate (QPS)",
"type": "graph",
"targets": [
{
"expr": "rate(model_request_total{model_name='text-classifier'}[5m])",
"legendFormat": "{{status}}"
}
]
},
{
"title": "Latency Percentiles",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.50, rate(model_request_latency_seconds_bucket[5m]))",
"legendFormat": "P50"
},
{
"expr": "histogram_quantile(0.95, rate(model_request_latency_seconds_bucket[5m]))",
"legendFormat": "P95"
},
{
"expr": "histogram_quantile(0.99, rate(model_request_latency_seconds_bucket[5m]))",
"legendFormat": "P99"
}
]
},
{
"title": "Error Rate",
"type": "stat",
"targets": [
{
"expr": "rate(model_request_total{status='error'}[5m]) / rate(model_request_total[5m])"
}
]
},
{
"title": "Prediction Confidence Distribution",
"type": "heatmap",
"targets": [
{
"expr": "rate(model_prediction_confidence_bucket[5m])"
}
]
}
]
}
}
二、数据漂移检测¶
2.1 漂移类型¶
数据漂移分类:
┌──────────────────────────────────────────────────────┐
│ 协变量漂移(Covariate Shift) │
│ P_train(X) ≠ P_serve(X) │
│ 特征分布变化,但X→Y的映射没变 │
│ 例:训练集主要是白天图片,线上出现大量夜间图片 │
├──────────────────────────────────────────────────────┤
│ 概念漂移(Concept Drift) │
│ P_train(Y|X) ≠ P_serve(Y|X) │
│ 特征与标签的关系发生变化 │
│ 例:疫情导致购买行为模式剧变 │
├──────────────────────────────────────────────────────┤
│ 标签漂移(Label Shift / Prior Probability Shift) │
│ P_train(Y) ≠ P_serve(Y) │
│ 标签分布变化 │
│ 例:垃圾邮件比例从5%上升到20% │
└──────────────────────────────────────────────────────┘
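三类漂移的区分,可以落到对 P(X)、P(Y)、P(Y|X) 三个分布的分别比较上。下面是一个极简示意(假设特征与标签都已二值化;diagnose_shift 为本文自拟的辅助函数,仅用于说明概念):

```python
import numpy as np

def diagnose_shift(x_ref, y_ref, x_cur, y_cur, tol=0.05):
    """粗略诊断漂移类型:分别比较P(X)、P(Y)、P(Y|X)(x、y均为0/1二值)"""
    def p(v):
        return float(np.mean(v))

    def p_y_given_x(x, y, xv):
        mask = np.asarray(x) == xv
        return float(np.mean(np.asarray(y)[mask]))

    diffs = {
        "covariate": abs(p(x_ref) - p(x_cur)),   # P(X)变化 → 协变量漂移
        "label":     abs(p(y_ref) - p(y_cur)),   # P(Y)变化 → 标签漂移
        "concept":   max(abs(p_y_given_x(x_ref, y_ref, v) -
                             p_y_given_x(x_cur, y_cur, v)) for v in (0, 1)),
    }
    return [k for k, d in diffs.items() if d > tol]

# 参考期:x均匀分布,y严格等于x;当前期:x偏向1,但x→y的映射没变
x_ref = [0] * 50 + [1] * 50
x_cur = [0] * 20 + [1] * 80
shift_types = diagnose_shift(x_ref, x_ref, x_cur, x_cur)
print(shift_types)  # 检出协变量漂移与标签漂移,但无概念漂移
```

实践中P(Y|X)的比较依赖反馈标签,往往有延迟,这正是概念漂移最难及时检测的原因。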
2.2 统计检测方法¶
"""
数据漂移检测方法实现
"""
import numpy as np
from scipy import stats
class DriftDetector:
"""数据漂移检测器"""
@staticmethod # @staticmethod静态方法,不需要实例
def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
"""
PSI (Population Stability Index) - 群体稳定性指标
PSI < 0.1: 无显著漂移
0.1 ≤ PSI < 0.2: 中等漂移,需关注
PSI ≥ 0.2: 显著漂移,需采取行动
"""
        # 计算分桶边界(覆盖两个分布的取值范围)
breakpoints = np.linspace(
min(expected.min(), actual.min()),
max(expected.max(), actual.max()),
bins + 1
)
# 计算各桶的比例
expected_counts = np.histogram(expected, breakpoints)[0]
actual_counts = np.histogram(actual, breakpoints)[0]
# 避免零值
expected_pct = np.clip(expected_counts / len(expected), 1e-6, None)
actual_pct = np.clip(actual_counts / len(actual), 1e-6, None)
# PSI = Σ (actual% - expected%) * ln(actual% / expected%)
psi_value = np.sum(
(actual_pct - expected_pct) * np.log(actual_pct / expected_pct)
)
return psi_value
@staticmethod
def ks_test(reference: np.ndarray, current: np.ndarray) -> tuple[float, float]:
"""
KS检验 (Kolmogorov-Smirnov Test)
适用于连续特征的分布比较
返回: (统计量, p值)
p值 < 0.05 表示两个分布有显著差异
"""
statistic, p_value = stats.ks_2samp(reference, current)
return statistic, p_value
@staticmethod
def chi_square_test(reference: np.ndarray, current: np.ndarray) -> tuple[float, float]:
"""
卡方检验 - 适用于类别特征
"""
        # 获取所有类别
        categories = np.unique(np.concatenate([reference, current]))
        ref_counts = np.array([np.sum(reference == c) for c in categories])
        cur_counts = np.array([np.sum(current == c) for c in categories])
        # 期望频数:按参考分布的占比折算到当前样本量
        ref_freq = ref_counts / ref_counts.sum()
        expected = ref_freq * cur_counts.sum()
        expected = np.clip(expected, 1e-6, None)
statistic, p_value = stats.chisquare(cur_counts, f_exp=expected)
return statistic, p_value
@staticmethod
def js_divergence(p: np.ndarray, q: np.ndarray, bins: int = 50) -> float:
"""
JS散度 (Jensen-Shannon Divergence)
值域[0, 1],0表示分布相同
"""
# 将连续值离散化
breakpoints = np.linspace(
min(p.min(), q.min()),
max(p.max(), q.max()),
bins + 1
)
p_hist = np.histogram(p, breakpoints)[0].astype(float)
q_hist = np.histogram(q, breakpoints)[0].astype(float)
# 归一化
p_hist = p_hist / p_hist.sum()
q_hist = q_hist / q_hist.sum()
# 避免零值
p_hist = np.clip(p_hist, 1e-10, None)
q_hist = np.clip(q_hist, 1e-10, None)
# M = (P + Q) / 2
m = 0.5 * (p_hist + q_hist)
js = 0.5 * stats.entropy(p_hist, m) + 0.5 * stats.entropy(q_hist, m)
return js
# ===== 使用示例 =====
np.random.seed(42)
# 模拟参考分布(训练数据)和当前分布(线上数据)
reference_data = np.random.normal(0, 1, 10000) # 训练时的特征分布
current_data_ok = np.random.normal(0.05, 1.02, 5000) # 轻微漂移
current_data_bad = np.random.normal(0.5, 1.5, 5000) # 严重漂移
detector = DriftDetector()
print("=" * 50)
print("轻微漂移场景:")
print(f" PSI: {detector.psi(reference_data, current_data_ok):.4f}")
ks_stat, ks_p = detector.ks_test(reference_data, current_data_ok)
print(f" KS Test: statistic={ks_stat:.4f}, p-value={ks_p:.4f}")
print(f" JS Divergence: {detector.js_divergence(reference_data, current_data_ok):.4f}")
print("\n严重漂移场景:")
print(f" PSI: {detector.psi(reference_data, current_data_bad):.4f}")
ks_stat, ks_p = detector.ks_test(reference_data, current_data_bad)
print(f" KS Test: statistic={ks_stat:.4f}, p-value={ks_p:.4f}")
print(f" JS Divergence: {detector.js_divergence(reference_data, current_data_bad):.4f}")
2.3 Evidently AI - 数据漂移监控¶
"""
使用Evidently进行数据漂移检测与报告生成
pip install evidently
"""
import pandas as pd
import numpy as np
# 创建模拟数据
np.random.seed(42)
n_samples = 1000
# 参考数据(训练时的分布)
reference_df = pd.DataFrame({
"feature_age": np.random.normal(35, 10, n_samples),
"feature_income": np.random.lognormal(10, 1, n_samples),
"feature_score": np.random.uniform(0, 100, n_samples),
"category": np.random.choice(["A", "B", "C", "D"], n_samples, p=[0.4, 0.3, 0.2, 0.1]),
"prediction": np.random.choice([0, 1], n_samples, p=[0.7, 0.3]),
"target": np.random.choice([0, 1], n_samples, p=[0.7, 0.3])
})
# 当前数据(线上数据,模拟漂移)
current_df = pd.DataFrame({
"feature_age": np.random.normal(38, 12, n_samples), # 均值和方差漂移
"feature_income": np.random.lognormal(10.5, 1.2, n_samples), # 分布漂移
"feature_score": np.random.uniform(10, 90, n_samples), # 范围变化
"category": np.random.choice(["A", "B", "C", "D"], n_samples, p=[0.25, 0.25, 0.3, 0.2]), # 类别漂移
"prediction": np.random.choice([0, 1], n_samples, p=[0.6, 0.4]), # 预测分布变化
"target": np.random.choice([0, 1], n_samples, p=[0.65, 0.35])
})
# ===== Evidently数据漂移报告 =====
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import ColumnDriftMetric
# 生成数据漂移报告
drift_report = Report(metrics=[
DataDriftPreset(), # 整体数据漂移
ColumnDriftMetric(column_name="feature_age"), # 单特征漂移
])
drift_report.run(
reference_data=reference_df,
current_data=current_df
)
# 保存HTML报告
# drift_report.save_html("drift_report.html")
# 获取JSON结果(用于程序化处理)
drift_results = drift_report.as_dict()
print("Dataset drift detected:", drift_results["metrics"][0]["result"]["dataset_drift"])
# ===== Evidently数据质量报告 =====
quality_report = Report(metrics=[
DataQualityPreset()
])
quality_report.run(
reference_data=reference_df,
current_data=current_df
)
# quality_report.save_html("quality_report.html")
# ===== Evidently监控(实时/批量)=====
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataStabilityTestPreset
from evidently.tests import (
    TestColumnDrift,
    TestShareOfDriftedColumns
)
# 定义测试套件
test_suite = TestSuite(tests=[
DataDriftTestPreset(),
TestColumnDrift(column_name="feature_age", stattest="ks", stattest_threshold=0.05),
TestShareOfDriftedColumns(lt=0.3), # 漂移特征比例不超过30%
])
test_suite.run(reference_data=reference_df, current_data=current_df)
# 获取测试结果
test_results = test_suite.as_dict()
# all()检查所有测试是否通过:生成器逐一产出布尔值,遇到第一个False立即短路返回False
all_passed = all(
test["status"] == "SUCCESS"
for test in test_results["tests"]
)
print(f"All drift tests passed: {all_passed}")
三、模型性能衰退检测¶
3.1 性能衰退检测器¶
"""
模型性能衰退检测
支持:滑动窗口检测、ADWIN、Page-Hinkley
"""
import numpy as np
from collections import deque
class SlidingWindowDetector:
"""
滑动窗口性能检测器
比较最近窗口与参考窗口的指标差异
"""
def __init__(
self,
window_size: int = 1000,
reference_size: int = 5000,
threshold: float = 0.05,
metric_name: str = "accuracy"
):
self.window_size = window_size
self.reference_size = reference_size
self.threshold = threshold
self.metric_name = metric_name
self.reference_metrics: deque = deque(maxlen=reference_size)
self.current_metrics: deque = deque(maxlen=window_size)
self.alerts: list = []
    def update(self, metric_value: float, timestamp: float | None = None):
        """更新指标值"""
        import time
        ts = timestamp if timestamp is not None else time.time()
        # 参考窗口未满时只积累基线;基线冻结后不再混入新值,
        # 避免衰退后的指标逐渐"污染"参考分布、掩盖告警
        if len(self.reference_metrics) < self.reference_size:
            self.reference_metrics.append(metric_value)
            return
        self.current_metrics.append(metric_value)
        if len(self.current_metrics) >= self.window_size:
            self._check_degradation(ts)
def _check_degradation(self, timestamp: float):
"""检查是否发生性能衰退"""
ref_mean = np.mean(list(self.reference_metrics))
cur_mean = np.mean(list(self.current_metrics))
# 相对下降幅度
degradation = (ref_mean - cur_mean) / (ref_mean + 1e-10)
if degradation > self.threshold:
alert = {
"timestamp": timestamp,
"metric": self.metric_name,
"reference_mean": ref_mean,
"current_mean": cur_mean,
"degradation_pct": degradation * 100,
"severity": "HIGH" if degradation > 0.1 else "MEDIUM"
}
self.alerts.append(alert)
return True
return False
class PageHinkleyDetector:
"""
Page-Hinkley变点检测
用于检测数据流中的分布变化点
"""
def __init__(self, delta: float = 0.005, threshold: float = 50, alpha: float = 0.9999):
self.delta = delta # 容许的变化量
self.threshold = threshold # 检测阈值
self.alpha = alpha # 遗忘因子
self.sum = 0.0
self.x_mean = 0.0
self.count = 0
self.min_sum = float('inf')
def update(self, value: float) -> bool:
"""
输入新值,返回是否检测到变化
"""
self.count += 1
# 更新均值
self.x_mean = self.x_mean + (value - self.x_mean) / self.count
# 更新累积和
self.sum = self.alpha * self.sum + (value - self.x_mean - self.delta)
# 更新最小累积和
self.min_sum = min(self.min_sum, self.sum)
# 检测
ph_value = self.sum - self.min_sum
if ph_value > self.threshold:
# 检测到变化,重置
self.sum = 0.0
self.min_sum = float('inf')
return True
return False
# ===== 使用示例 =====
np.random.seed(42)
# 模拟准确率指标流
# 前3000个:稳定在0.92附近
# 后2000个:衰退到0.85附近
metrics_stream = np.concatenate([
np.random.normal(0.92, 0.02, 3000),
np.random.normal(0.85, 0.03, 2000)
])
# 滑动窗口检测
sw_detector = SlidingWindowDetector(
window_size=500, reference_size=2000, threshold=0.03
)
for i, metric in enumerate(metrics_stream): # enumerate同时获取索引和值
sw_detector.update(metric)
if sw_detector.alerts:
print(f"Sliding Window alerts: {len(sw_detector.alerts)}")
for alert in sw_detector.alerts[:3]: # 切片操作:[start:end:step]提取子序列
print(f" Degradation: {alert['degradation_pct']:.1f}%, "
f"Ref: {alert['reference_mean']:.4f}, Cur: {alert['current_mean']:.4f}")
# Page-Hinkley检测
ph_detector = PageHinkleyDetector(delta=0.005, threshold=20)
for i, metric in enumerate(metrics_stream):
if ph_detector.update(metric):
print(f"Page-Hinkley: Change detected at step {i}")
break
3.2 预测分布监控¶
"""
监控模型预测输出的分布变化
"""
import numpy as np
from collections import deque
from scipy.stats import entropy
class PredictionMonitor:
"""预测输出分布监控"""
def __init__(self, num_classes: int, window_size: int = 10000):
self.num_classes = num_classes
self.window_size = window_size
self.reference_dist = None
self.predictions_buffer = deque(maxlen=window_size)
self.confidences_buffer = deque(maxlen=window_size)
def set_reference(self, predictions: np.ndarray, confidences: np.ndarray):
"""设置参考分布(从验证集获取)"""
# 预测类别分布
self.reference_dist = np.bincount(predictions, minlength=self.num_classes)
self.reference_dist = self.reference_dist / self.reference_dist.sum()
# 参考置信度统计
self.reference_conf_mean = np.mean(confidences)
self.reference_conf_std = np.std(confidences)
def update(self, prediction: int, confidence: float):
"""更新监控数据"""
self.predictions_buffer.append(prediction)
self.confidences_buffer.append(confidence)
def check(self) -> dict:
"""检查当前分布是否异常"""
if len(self.predictions_buffer) < self.window_size // 2:
return {"status": "insufficient_data"}
preds = np.array(list(self.predictions_buffer))
confs = np.array(list(self.confidences_buffer))
# 1. 预测分布偏移
current_dist = np.bincount(preds, minlength=self.num_classes)
current_dist = current_dist / current_dist.sum()
kl_div = entropy(current_dist + 1e-10, self.reference_dist + 1e-10)
# 2. 置信度异常
conf_mean_shift = abs(np.mean(confs) - self.reference_conf_mean)
low_conf_ratio = np.mean(confs < 0.5)
# 3. 判断告警
alerts = []
if kl_div > 0.1:
alerts.append(f"Prediction distribution drift (KL={kl_div:.4f})")
if conf_mean_shift > 0.1:
alerts.append(f"Confidence mean shift ({conf_mean_shift:.4f})")
if low_conf_ratio > 0.3:
alerts.append(f"High low-confidence ratio ({low_conf_ratio:.2%})")
return {
"status": "alert" if alerts else "healthy",
"prediction_distribution": current_dist.tolist(),
"kl_divergence": kl_div,
"confidence_mean": np.mean(confs),
"low_confidence_ratio": low_conf_ratio,
"alerts": alerts
}
# 使用示例
monitor = PredictionMonitor(num_classes=5)
# 设置参考分布
ref_preds = np.random.choice(5, 5000, p=[0.3, 0.25, 0.2, 0.15, 0.1])
ref_confs = np.random.beta(5, 1, 5000) # 高置信度
monitor.set_reference(ref_preds, ref_confs)
# 模拟正常流量
for _ in range(5000):
pred = np.random.choice(5, p=[0.3, 0.25, 0.2, 0.15, 0.1])
conf = np.random.beta(5, 1)
monitor.update(pred, conf)
status = monitor.check()
print(f"Normal traffic - Status: {status['status']}")
# 模拟异常流量(分布偏移)
monitor_bad = PredictionMonitor(num_classes=5)
monitor_bad.set_reference(ref_preds, ref_confs)
for _ in range(5000):
pred = np.random.choice(5, p=[0.1, 0.1, 0.4, 0.2, 0.2]) # 分布变化
conf = np.random.beta(2, 3) # 低置信度
monitor_bad.update(pred, conf)
status_bad = monitor_bad.check()
print(f"Drifted traffic - Status: {status_bad['status']}")
for alert in status_bad.get("alerts", []):
print(f" Alert: {alert}")
四、告警与自动重训练Pipeline¶
4.1 告警系统¶
"""
多级告警系统
"""
import json
from enum import Enum
from datetime import datetime
from dataclasses import dataclass
from collections.abc import Callable
class AlertSeverity(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class Alert:
"""告警定义"""
name: str
severity: AlertSeverity
message: str
metric_name: str
metric_value: float
threshold: float
    timestamp: datetime | None = None
def __post_init__(self):
self.timestamp = self.timestamp or datetime.now()
class AlertRule:
"""告警规则"""
def __init__(self, name: str, metric_name: str, threshold: float,
comparison: str = "gt", severity: AlertSeverity = AlertSeverity.WARNING,
cooldown_seconds: int = 300):
self.name = name
self.metric_name = metric_name
self.threshold = threshold
self.comparison = comparison # gt, lt, gte, lte
self.severity = severity
self.cooldown = cooldown_seconds
self.last_alert_time = None
def evaluate(self, metric_value: float) -> Alert | None:
"""评估是否触发告警"""
now = datetime.now()
# 冷却期检查
if self.last_alert_time:
elapsed = (now - self.last_alert_time).total_seconds()
if elapsed < self.cooldown:
return None
# 条件判断
triggered = False
if self.comparison == "gt" and metric_value > self.threshold:
triggered = True
elif self.comparison == "lt" and metric_value < self.threshold:
triggered = True
elif self.comparison == "gte" and metric_value >= self.threshold:
triggered = True
elif self.comparison == "lte" and metric_value <= self.threshold:
triggered = True
if triggered:
self.last_alert_time = now
return Alert(
name=self.name,
severity=self.severity,
message=f"{self.metric_name} ({metric_value:.4f}) "
f"{'exceeded' if 'g' in self.comparison else 'below'} "
f"threshold ({self.threshold})",
metric_name=self.metric_name,
metric_value=metric_value,
threshold=self.threshold
)
return None
class AlertManager:
"""告警管理器"""
def __init__(self):
self.rules: list[AlertRule] = []
self.handlers: list[Callable] = []
self.alert_history: list[Alert] = []
def add_rule(self, rule: AlertRule):
self.rules.append(rule)
def add_handler(self, handler: Callable):
"""添加告警处理器(邮件/Slack/钉钉/PagerDuty)"""
self.handlers.append(handler)
def evaluate(self, metrics: dict):
"""评估所有规则"""
for rule in self.rules:
if rule.metric_name in metrics:
alert = rule.evaluate(metrics[rule.metric_name])
if alert:
self.alert_history.append(alert)
for handler in self.handlers:
handler(alert)
# ===== 定义告警规则 =====
alert_manager = AlertManager()
# 错误率告警
alert_manager.add_rule(AlertRule(
name="high_error_rate",
metric_name="error_rate",
threshold=0.01,
comparison="gt",
severity=AlertSeverity.CRITICAL,
cooldown_seconds=600
))
# 延迟告警
alert_manager.add_rule(AlertRule(
name="high_latency",
metric_name="p99_latency_ms",
threshold=500,
comparison="gt",
severity=AlertSeverity.WARNING
))
# 数据漂移告警
alert_manager.add_rule(AlertRule(
name="data_drift",
metric_name="drift_psi",
threshold=0.2,
comparison="gt",
severity=AlertSeverity.WARNING
))
# 准确率衰退告警
alert_manager.add_rule(AlertRule(
name="accuracy_degradation",
metric_name="accuracy",
threshold=0.85,
comparison="lt",
severity=AlertSeverity.CRITICAL
))
# 添加告警处理器
def console_handler(alert: Alert):
print(f"[{alert.severity.value.upper()}] {alert.name}: {alert.message}")
def slack_handler(alert: Alert):
"""发送Slack告警(示例)"""
# import requests
# webhook_url = "https://hooks.slack.com/services/xxx"
# requests.post(webhook_url, json={
# "text": f":warning: *{alert.name}*\n{alert.message}"
# })
pass
alert_manager.add_handler(console_handler)
alert_manager.add_handler(slack_handler)
# 评估指标
alert_manager.evaluate({
"error_rate": 0.02,
"p99_latency_ms": 320,
"drift_psi": 0.25,
"accuracy": 0.83
})
4.2 自动重训练Pipeline¶
"""
自动重训练Pipeline
检测到漂移/衰退后自动触发模型重训练
"""
from dataclasses import dataclass
from enum import Enum
import time
class PipelineStatus(Enum):
IDLE = "idle"
TRIGGERED = "triggered"
TRAINING = "training"
VALIDATING = "validating"
DEPLOYING = "deploying"
COMPLETED = "completed"
FAILED = "failed"
@dataclass # 自动生成__init__等方法
class RetrainingConfig:
"""重训练配置"""
    trigger_conditions: dict | None = None
min_retrain_interval_hours: int = 24
validation_threshold: float = 0.85
auto_deploy: bool = False
max_retries: int = 2
def __post_init__(self):
if self.trigger_conditions is None:
self.trigger_conditions = {
"accuracy_below": 0.85,
"drift_psi_above": 0.2,
"error_rate_above": 0.05
}
class AutoRetrainingPipeline:
"""
自动重训练Pipeline
流程:
1. 监控指标触发重训练
2. 收集最新训练数据
3. 执行训练
4. 验证模型质量
5. 自动/手动部署
"""
def __init__(self, config: RetrainingConfig):
self.config = config
self.status = PipelineStatus.IDLE
self.last_retrain_time = 0
self.history = []
def should_retrain(self, metrics: dict) -> bool:
"""判断是否需要重训练"""
if self.status != PipelineStatus.IDLE:
return False
# 最小间隔检查
hours_since_last = (time.time() - self.last_retrain_time) / 3600
if hours_since_last < self.config.min_retrain_interval_hours:
return False
conditions = self.config.trigger_conditions
if metrics.get("accuracy", 1.0) < conditions.get("accuracy_below", 0):
return True
if metrics.get("drift_psi", 0) > conditions.get("drift_psi_above", 1):
return True
if metrics.get("error_rate", 0) > conditions.get("error_rate_above", 1):
return True
return False
def run(self, metrics: dict):
"""执行重训练Pipeline"""
if not self.should_retrain(metrics):
return
run_info = {
"trigger_time": time.time(),
"trigger_metrics": metrics.copy(),
"steps": []
}
try: # try/except捕获异常
# Step 1: 数据准备
self.status = PipelineStatus.TRIGGERED
print("[Pipeline] Step 1: Preparing training data...")
run_info["steps"].append("data_preparation")
# self._prepare_data()
# Step 2: 训练
self.status = PipelineStatus.TRAINING
print("[Pipeline] Step 2: Training model...")
run_info["steps"].append("training")
# model = self._train_model()
# Step 3: 验证
self.status = PipelineStatus.VALIDATING
print("[Pipeline] Step 3: Validating model...")
validation_score = 0.91 # 模拟验证分数
run_info["validation_score"] = validation_score
if validation_score < self.config.validation_threshold:
raise ValueError(
f"Validation score {validation_score} below threshold "
f"{self.config.validation_threshold}"
)
# Step 4: 部署
if self.config.auto_deploy:
self.status = PipelineStatus.DEPLOYING
print("[Pipeline] Step 4: Deploying model...")
# self._deploy_model(model)
else:
print("[Pipeline] Step 4: Model ready for manual deployment")
self.status = PipelineStatus.COMPLETED
self.last_retrain_time = time.time()
run_info["status"] = "completed"
print("[Pipeline] Retraining completed successfully!")
except Exception as e:
self.status = PipelineStatus.FAILED
run_info["status"] = "failed"
run_info["error"] = str(e)
print(f"[Pipeline] Failed: {e}")
finally:
self.history.append(run_info)
self.status = PipelineStatus.IDLE
# 使用示例
config = RetrainingConfig(
trigger_conditions={
"accuracy_below": 0.85,
"drift_psi_above": 0.2
},
min_retrain_interval_hours=0, # 演示用,设为0
auto_deploy=False
)
pipeline = AutoRetrainingPipeline(config)
# 模拟监控指标
current_metrics = {
"accuracy": 0.82, # 低于阈值
"drift_psi": 0.25, # 高于阈值
"error_rate": 0.01
}
pipeline.run(current_metrics)
五、在线评估与离线评估¶
5.1 评估方法对比¶
"""
在线评估 vs 离线评估 对比和实现
"""
class OfflineEvaluator:
"""
离线评估:在静态测试集上评估模型
优点:快速、可控、可复现
缺点:不能反映真实用户行为
"""
def evaluate(self, model, test_data: dict) -> dict:
"""标准离线评估"""
# predictions = model.predict(test_data["features"])
predictions = test_data.get("predictions", [])
labels = test_data.get("labels", [])
from sklearn.metrics import accuracy_score, f1_score
metrics = {
"accuracy": accuracy_score(labels, predictions),
"f1_macro": f1_score(labels, predictions, average="macro"),
"f1_weighted": f1_score(labels, predictions, average="weighted"),
}
# 分组评估(公平性)
# for group in test_data.get("groups", []):
# group_metrics = evaluate_group(predictions, labels, group)
# metrics[f"accuracy_{group}"] = group_metrics["accuracy"]
return metrics
class OnlineEvaluator:
"""
在线评估:通过线上实验评估模型
优点:反映真实效果,考虑用户交互
缺点:周期长、需要足够流量、有风险
"""
def __init__(self, experiment_name: str, metric_names: list):
self.experiment_name = experiment_name
self.metric_names = metric_names
self.control_data = []
self.treatment_data = []
def record(self, group: str, metrics: dict):
"""记录实验数据"""
if group == "control":
self.control_data.append(metrics)
else:
self.treatment_data.append(metrics)
def analyze(self) -> dict:
"""分析A/B测试结果"""
from scipy import stats
import numpy as np
results = {}
for metric in self.metric_names:
control_values = [d[metric] for d in self.control_data if metric in d]
treatment_values = [d[metric] for d in self.treatment_data if metric in d]
# t检验
t_stat, p_value = stats.ttest_ind(control_values, treatment_values)
control_mean = np.mean(control_values)
treatment_mean = np.mean(treatment_values)
lift = (treatment_mean - control_mean) / (control_mean + 1e-10)
results[metric] = {
"control_mean": control_mean,
"treatment_mean": treatment_mean,
"lift": lift,
"p_value": p_value,
"significant": p_value < 0.05
}
return results
# 使用示例
online_eval = OnlineEvaluator("model_v2_test", ["ctr", "conversion_rate"])
import numpy as np
np.random.seed(42)
# 模拟在线数据
for _ in range(5000):
online_eval.record("control", {
"ctr": np.random.binomial(1, 0.05),
"conversion_rate": np.random.binomial(1, 0.02)
})
online_eval.record("treatment", {
"ctr": np.random.binomial(1, 0.058), # 新模型CTR更高
"conversion_rate": np.random.binomial(1, 0.022)
})
results = online_eval.analyze()
for metric, data in results.items():
print(f"{metric}: lift={data['lift']:.2%}, p={data['p_value']:.4f}, "
f"significant={data['significant']}")
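上例也体现了在线评估"需要足够流量"的含义:提升幅度越小,达到统计显著所需的样本量越大。下面用正态近似粗略估算双比例检验每组所需样本量(required_sample_size 为示意用的自拟函数,严谨的功效分析可使用 statsmodels 等库):

```python
from scipy.stats import norm

def required_sample_size(p1: float, p2: float,
                         alpha: float = 0.05, power: float = 0.8) -> int:
    """两比例双侧检验所需的每组样本量(正态近似)"""
    z_a = norm.ppf(1 - alpha / 2)   # 显著性水平对应的临界值
    z_b = norm.ppf(power)           # 统计功效对应的临界值
    var = p1 * (1 - p1) + p2 * (1 - p2)
    return int((z_a + z_b) ** 2 * var / (p1 - p2) ** 2) + 1

n = required_sample_size(0.05, 0.058)
print(f"检测CTR从5.0%提升到5.8%,每组约需 {n} 个样本")
```

可见仅0.8个百分点的CTR提升就需要上万级的每组样本量,这正是在线评估周期长、对流量有门槛的原因。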
六、Feature Store概念与实践¶
6.1 Feature Store核心概念¶
Feature Store架构:
┌─────────────┐ ┌──────────────────────────────────┐
│ 数据源 │ │ Feature Store │
│ ┌─────────┐ │ │ ┌───────────┐ ┌─────────────┐ │
│ │数据库 │─┼───▶│ │Feature │ │ 在线Store │ │──▶ 推理服务
│ │流数据 │ │ │ │Pipeline │ │ (Redis) │ │ (低延迟)
│ │日志 │ │ │ │Transform │ └─────────────┘ │
│ └─────────┘ │ │ │ │ ┌─────────────┐ │
└─────────────┘ │ └───────────┘ │ 离线Store │ │──▶ 训练Pipeline
│ │ (Parquet) │ │ (批量读取)
│ └─────────────┘ │
│ ┌───────────────────────────┐ │
│ │ Feature Registry │ │
│ │ (元数据/版本/血缘/权限) │ │
│ └───────────────────────────┘ │
└──────────────────────────────────┘
6.2 Feast实践¶
"""
Feast Feature Store 实践
pip install feast
"""
# ===== 1. Feast项目定义 (feature_store.yaml) =====
"""
project: fraud_detection
registry: data/registry.db
provider: local
online_store:
type: sqlite
path: data/online_store.db
offline_store:
type: file
entity_key_serialization_version: 2
"""
# ===== 2. 特征定义 (features.py) =====
from datetime import timedelta
# from feast import Entity, FeatureView, FileSource, Field
# from feast.types import Float64, Int64, String
# 实体定义(用户ID)
# user = Entity(
# name="user_id",
# join_keys=["user_id"],
# description="User ID"
# )
# 数据源
# user_features_source = FileSource(
# path="data/user_features.parquet",
# timestamp_field="event_timestamp",
# created_timestamp_column="created_timestamp"
# )
# 特征视图
# user_feature_view = FeatureView(
# name="user_features",
# entities=[user],
# ttl=timedelta(days=1), # 特征有效期
# schema=[
# Field(name="transaction_count_7d", dtype=Int64),
# Field(name="avg_transaction_amount_7d", dtype=Float64),
# Field(name="max_transaction_amount_7d", dtype=Float64),
# Field(name="account_age_days", dtype=Int64),
# Field(name="risk_score", dtype=Float64),
# ],
# source=user_features_source
# )
# ===== 3. Feast操作 =====
"""
# 初始化(命令行)
feast init fraud_detection
cd fraud_detection
# 注册特征
feast apply
# 物化特征到在线存储
feast materialize-incremental $(date +%Y-%m-%dT%H:%M:%S)
"""
# ===== 4. 获取特征(训练时 - 离线) =====
# from feast import FeatureStore
# import pandas as pd
#
# store = FeatureStore(repo_path=".")
#
# # 获取训练特征(point-in-time join)
# entity_df = pd.DataFrame({
# "user_id": [1001, 1002, 1003],
# "event_timestamp": pd.to_datetime(["2024-01-15", "2024-01-15", "2024-01-15"])
# })
#
# training_df = store.get_historical_features(
# entity_df=entity_df,
# features=[
# "user_features:transaction_count_7d",
# "user_features:avg_transaction_amount_7d",
# "user_features:risk_score"
# ]
# ).to_df()
#
# print(training_df.head())
# ===== 5. 获取特征(推理时 - 在线) =====
# online_features = store.get_online_features(
# features=[
# "user_features:transaction_count_7d",
# "user_features:avg_transaction_amount_7d",
# "user_features:risk_score"
# ],
# entity_rows=[
# {"user_id": 1001},
# {"user_id": 1002}
# ]
# ).to_dict()
#
# print(online_features)
6.3 Feature Store选型对比¶
"""
主流Feature Store对比
"""
feature_stores = {
"Feast": {
"类型": "开源",
"在线存储": "Redis/DynamoDB/SQLite",
"离线存储": "BigQuery/Redshift/Parquet",
"适用场景": "中小团队,灵活部署",
"优点": "开源免费、支持多后端、社区活跃",
"缺点": "企业级功能需自建"
},
"Tecton": {
"类型": "商业SaaS",
"在线存储": "托管服务",
"离线存储": "托管服务",
"适用场景": "大规模企业",
"优点": "全托管、实时特征、监控完善",
"缺点": "付费、供应商锁定"
},
"Hopsworks": {
"类型": "开源+商业",
"在线存储": "RonDB",
"离线存储": "Hudi",
"适用场景": "需要完整ML平台",
"优点": "内置特征工程、GPU支持",
"缺点": "较重"
}
}
for name, info in feature_stores.items():
print(f"\n{name}:")
for key, value in info.items():
print(f" {key}: {value}")
💼 面试常考题¶
Q1: 什么是数据漂移?如何检测?¶
答:数据漂移指生产数据的分布与训练数据的分布发生偏移。分为协变量漂移(输入分布变化)、概念漂移(输入-输出关系变化)、标签漂移(输出分布变化)。检测方法:①PSI(群体稳定性指标)②KS检验③卡方检验(类别特征)④JS散度⑤使用Evidently等工具自动化检测。
Q2: PSI的计算公式和阈值是什么?¶
答:\(PSI = \sum_{i=1}^{n}(P_i^{actual} - P_i^{expected}) \times \ln(\frac{P_i^{actual}}{P_i^{expected}})\)。将数据分桶,比较每个桶中实际分布与期望分布的差异。阈值:PSI<0.1无漂移,0.1-0.2中等漂移,≥0.2严重漂移。
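用一个两桶的小例子手算验证该公式:期望占比 [0.6, 0.4],实际占比 [0.5, 0.5]:

```python
import math

expected = [0.6, 0.4]  # 各桶的期望占比
actual = [0.5, 0.5]    # 各桶的实际占比
# PSI = Σ (actual% - expected%) * ln(actual% / expected%)
psi = sum((a - e) * math.log(a / e) for a, e in zip(actual, expected))
print(f"PSI = {psi:.4f}")  # ≈ 0.0405 < 0.1,判定为无显著漂移
```

注意每一项 (a-e)·ln(a/e) 恒为非负(两因子同号),因此PSI不会因各桶差异正负抵消而低估漂移。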
Q3: 模型上线后性能下降怎么排查?¶
答:按层排查:①数据层:检查输入数据质量(缺失值、异常值、分布变化)②模型层:在近期数据上离线评估,对比历史指标③服务层:检查延迟、错误率、资源使用④业务层:检查标签分布是否变化、新场景覆盖率。同时检查上下游依赖变更、特征Pipeline是否正常。
Q4: 如何设计自动重训练系统?¶
答:组成部分:①触发器(定时/指标驱动/漂移驱动)②数据收集(从Feature Store获取最新特征+标签)③训练Pipeline(自动化训练+超参搜索)④模型验证(Shadow mode/回测)⑤审批部署(自动金丝雀或人工审批)⑥监控回滚。关键:设置最小重训间隔、验证阈值、回滚机制。
Q5: 在线评估和离线评估的区别和互补关系?¶
答:离线评估在固定数据集上测试,快速、可复现,但不能反映真实用户行为。在线评估通过A/B测试在真实流量中测试,反映真实效果,但周期长、需要流量。互补关系:先离线评估筛选候选模型(精度、公平性),再通过在线A/B测试验证业务指标(CTR、转化率、留存率)。
Q6: Feature Store解决了什么问题?¶
答:①训练-推理偏斜(Training-Serving Skew):确保训练和推理使用相同的特征计算逻辑②特征复用:不同模型团队共享特征③时间穿越(Time Travel):point-in-time正确获取历史特征④特征监控:追踪特征质量和分布变化⑤特征发现:统一的特征目录和文档。
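其中第③点的 point-in-time join 可以用 pandas 的 merge_asof 做一个最小示意(向过去方向取每条训练样本发生时刻"当时可见"的最新特征,避免用到未来信息造成特征穿越):

```python
import pandas as pd

# 特征表:每个user_id的特征在不同时间点被更新
features = pd.DataFrame({
    "user_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(["2024-01-01", "2024-01-10", "2024-01-05"]),
    "risk_score": [0.2, 0.8, 0.5],
}).sort_values("event_timestamp")

# 训练样本:每条样本只能看到其发生时刻之前的特征值
samples = pd.DataFrame({
    "user_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(["2024-01-05", "2024-01-15", "2024-01-20"]),
}).sort_values("event_timestamp")

# direction="backward":向过去取最近一次特征更新
joined = pd.merge_asof(samples, features, on="event_timestamp",
                       by="user_id", direction="backward")
print(joined[["user_id", "event_timestamp", "risk_score"]])
```

例如 user 1 在 01-05 的样本只会取到 01-01 写入的 0.2,而不是 01-10 才出现的 0.8;Feast 的 get_historical_features 内部做的正是这类对齐。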
Q7: 如何设计一个完整的模型监控系统?¶
答:四个维度:①数据监控(输入特征分布、缺失率、异常值)→Evidently②服务监控(QPS、延迟、错误率、资源使用)→Prometheus+Grafana③模型监控(预测分布、置信度、准确率)→自定义指标④业务监控(转化率、收入等)→BI系统。告警策略:多级告警(Info→Warning→Critical)+告警收敛+on-call机制。
Q8: 如何处理概念漂移(Concept Drift)?¶
答:概念漂移是X→Y映射变化,最难检测。方法:①持续收集反馈标签,定期更新模型②在线学习/增量学习③使用窗口(滑动窗口/衰减窗口)让模型更关注近期数据④集成方法:多个模型加权,新模型权重更高⑤检测:监控后验概率分布P(Y|X)随时间的变化。
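其中第③点的衰减窗口可以简单实现为按样本"年龄"指数衰减的训练权重(recency_weights 为自拟示意,产出的权重可传给支持 sample_weight 参数的训练接口):

```python
import numpy as np

def recency_weights(timestamps_days, half_life_days: float = 30.0) -> np.ndarray:
    """按样本新鲜度生成指数衰减权重:每过一个半衰期权重减半"""
    ts = np.asarray(timestamps_days, dtype=float)
    age_days = ts.max() - ts  # 相对最新样本的"年龄"
    return 0.5 ** (age_days / half_life_days)

# 四个样本分别在第0/30/60/90天产生,半衰期30天
w = recency_weights([0, 30, 60, 90], half_life_days=30)
print(w)  # 最老样本的权重只有最新样本的1/8
```

半衰期是关键超参:太短则模型对噪声敏感,太长则对概念漂移反应迟钝,通常结合漂移检测信号调节。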
✅ 学习检查清单¶
- 理解三层监控指标体系(基础设施/服务/模型)
- 能使用Prometheus+Grafana搭建监控面板
- 理解PSI/KS检验/卡方检验等漂移检测方法
- 能使用Evidently生成数据漂移报告
- 理解模型性能衰退的检测方法
- 能设计多级告警系统
- 能设计自动重训练Pipeline
- 理解在线评估与离线评估的关系
- 理解Feature Store的核心价值和Feast使用
- 能回答所有面试题
📌 下一章:04-LLM工程化实践 — 大语言模型的部署优化与工程化实践
