🏗️ 特征仓库与ML平台¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
学习时间:8小时 | 难度:⭐⭐⭐⭐ 中高级 | 前置知识:模型部署(Ch02)、监控(Ch03)、CI/CD(Ch05)
本章目标¶
- 理解Feature Store的核心价值与架构原理
- 掌握Feast Feature Store的实战使用
- 了解企业ML平台的架构设计
- 学会搭建端到端ML工程化体系
1. Feature Store¶
1.1 为什么需要Feature Store¶
Text Only
没有Feature Store的问题:
├── 训练/推理特征不一致 → 训练-服务偏差(Training-Serving Skew)
├── 特征重复计算 → 团队间大量重复工作
├── 特征质量无保障 → 无监控、无血缘追踪
└── 实时特征困难 → 高延迟的在线服务
Feature Store解决:
├── 统一特征定义 → 训练和推理用同一份特征
├── 特征复用 → 跨团队共享特征(特征市场)
├── 质量保障 → 自动统计、漂移检测、血缘图
└── 在线/离线双模式 → 批量训练 + 低延迟推理
1.2 Feature Store架构¶
Text Only
┌────────────────────────────────────┐
│ Feature Store │
│ │
数据源──────▶│ ┌──────────┐ ┌──────────┐ │
(DB/Kafka/ │ │ 离线存储 │ │ 在线存储 │ │
文件) │ │(Parquet/ │ │(Redis/ │ │
│ │ BigQuery) │ │ DynamoDB) │ │
│ └─────┬────┘ └─────┬────┘ │
│ │ │ │
│ ┌─────┴───────────────┴────┐ │
│ │ 特征注册表 │ │
│ │ (定义/版本/血缘/统计) │ │
│ └──────────────────────────┘ │
└────────┬───────────┬───────────────┘
│ │
训练Pipeline 推理服务
(离线特征) (在线特征)
1.3 工具对比¶
| Feature Store | 定位 | 在线/离线 | 部署方式 | 适合场景 |
|---|---|---|---|---|
| Feast | 开源 | 两者 | 自托管/云 | 中小团队 |
| Tecton | 商业 | 两者 | SaaS/VPC | 企业级 |
| Hopsworks | 开源+商业 | 两者 | 自托管/云 | 全栈ML |
| Databricks FS | 商业 | 两者 | Databricks | Spark生态 |
| AWS SageMaker FS | 云原生 | 两者 | AWS | AWS用户 |
2. Feast实战¶
2.1 项目初始化¶
Python
# feature_store/feature_repo/feature_definition.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String
# 1. 定义实体(业务主键)
user = Entity(
name="user_id",
join_keys=["user_id"],
description="用户唯一标识"
)
product = Entity(
name="product_id",
join_keys=["product_id"],
description="商品唯一标识"
)
# 2. 定义数据源
user_stats_source = FileSource(
path="data/user_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp"
)
# 3. 定义特征视图
user_stats_fv = FeatureView(
name="user_stats",
entities=[user],
ttl=timedelta(days=1), # 特征有效期
schema=[
Field(name="total_orders", dtype=Int64, description="历史总订单数"),
Field(name="avg_order_amount", dtype=Float32, description="平均订单金额"),
Field(name="days_since_last_order", dtype=Int64, description="距上次下单天数"),
Field(name="user_segment", dtype=String, description="用户分层(高/中/低)"),
],
source=user_stats_source,
online=True, # 同步到在线存储
tags={"team": "recommendation", "version": "v2"},
)
2.2 训练时获取特征¶
Python
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path="feature_repo/")
# 训练集: 用实体DataFrame获取历史特征(Point-in-Time Join)
entity_df = pd.DataFrame({
"user_id": [1001, 1002, 1003, 1004],
"event_timestamp": pd.to_datetime([
"2026-01-15", "2026-01-16", "2026-01-17", "2026-01-18"
])
})
# 获取特征(自动处理时间穿越,避免数据泄露)
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:total_orders",
"user_stats:avg_order_amount",
"user_stats:days_since_last_order",
"user_stats:user_segment",
]
).to_df()
print(training_df)
# user_id | event_timestamp | total_orders | avg_order_amount | ...
2.3 推理时获取特征¶
Python
# 在线推理: 低延迟获取最新特征
online_features = store.get_online_features(
features=[
"user_stats:total_orders",
"user_stats:avg_order_amount",
"user_stats:days_since_last_order",
],
entity_rows=[
{"user_id": 1001},
{"user_id": 1002},
]
).to_dict()
print(online_features)
# {'user_id': [1001, 1002], 'total_orders': [42, 7], ...}
2.4 物化(同步到在线存储)¶
Bash
# 将离线特征同步到在线存储(Redis/DynamoDB)
feast materialize 2026-01-01T00:00:00 2026-02-01T00:00:00
# 增量物化
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S") # $()命令替换:执行命令并获取输出
3. 企业ML平台架构¶
3.1 平台全景¶
Text Only
┌─────────────────────────────────────────────────────────────┐
│ ML平台架构 │
├─────────────────────────────────────────────────────────────┤
│ 用户界面层 │
│ ├── 实验Dashboard (MLflow UI / W&B) │
│ ├── 特征市场 (Feature Catalog) │
│ ├── 模型注册表 (Model Registry) │
│ └── 监控看板 (Grafana) │
├─────────────────────────────────────────────────────────────┤
│ 编排层 │
│ ├── 流水线编排 (Kubeflow / Airflow) │
│ ├── CI/CD (GitHub Actions / GitLab CI) │
│ └── 自动重训练触发器 │
├─────────────────────────────────────────────────────────────┤
│ 核心服务层 │
│ ├── 实验管理 (MLflow Tracking) │
│ ├── Feature Store (Feast) │
│ ├── 模型服务 (Triton / vLLM / TorchServe) │
│ └── 监控 (Prometheus + Evidently) │
├─────────────────────────────────────────────────────────────┤
│ 基础设施层 │
│ ├── 计算 (Kubernetes + GPU节点池) │
│ ├── 存储 (S3/MinIO + Redis + PostgreSQL) │
│ └── 网络 (Istio服务网格 + API Gateway) │
└─────────────────────────────────────────────────────────────┘
3.2 ML平台配置示例¶
YAML
# ml-platform-config.yaml
platform:
name: "ml-platform-v1"
experiment_tracking:
tool: "mlflow"
backend_store: "postgresql://mlflow:pass@db:5432/mlflow"
artifact_store: "s3://ml-artifacts"
feature_store:
tool: "feast"
online_store:
type: "redis"
connection_string: "redis://redis:6379"
offline_store:
type: "file" # 生产环境用BigQuery/Redshift
model_serving:
default_engine: "triton"
autoscaling:
min_replicas: 2
max_replicas: 20
target_gpu_utilization: 70
canary:
enabled: true
initial_weight: 10 # 新模型初始流量10%
promotion_threshold: 0.95 # 准确率>95%提升到100%
monitoring:
drift_detection:
tool: "evidently"
check_interval: "1h"
alert_threshold: 0.3
metrics_backend: "prometheus"
dashboard: "grafana"
pipeline:
orchestrator: "kubeflow"
retrain_triggers:
- type: "drift"
threshold: 0.3
- type: "schedule"
cron: "0 2 * * 0" # 每周日2点
- type: "performance"
min_accuracy: 0.85
4. 端到端工程化实践¶
4.1 从训练到部署的完整Python接口¶
Python
class MLPlatformClient:
"""统一ML平台客户端"""
def __init__(self, config_path="ml-platform-config.yaml"):
import yaml
with open(config_path) as f: # with自动管理资源,确保文件正确关闭
self.config = yaml.safe_load(f)
self.feature_store = FeatureStore(repo_path="feature_repo/")
self.mlflow_client = mlflow.tracking.MlflowClient()
def get_training_data(self, entity_df, feature_list):
"""从Feature Store获取训练数据"""
return self.feature_store.get_historical_features(
entity_df=entity_df,
features=feature_list
).to_df()
def train_and_log(self, model, X_train, y_train, params):
"""训练并记录到MLflow"""
with mlflow.start_run():
mlflow.log_params(params)
model.fit(X_train, y_train)
# 记录指标
from sklearn.metrics import accuracy_score
train_acc = accuracy_score(y_train, model.predict(X_train))
mlflow.log_metric("train_accuracy", train_acc)
# 记录模型
mlflow.sklearn.log_model(model, "model")
return mlflow.active_run().info.run_id
def evaluate_and_promote(self, run_id, test_df, model_name):
"""评估并决定是否推到生产"""
# 加载模型
model_uri = f"runs:/{run_id}/model"
model = mlflow.sklearn.load_model(model_uri)
X_test = test_df.drop("label", axis=1)
y_test = test_df["label"]
acc = accuracy_score(y_test, model.predict(X_test))
# 注册模型
mv = mlflow.register_model(model_uri, model_name)
# 质量门禁
if acc >= 0.85:
# 使用别名系统(MLflow 2.9.0+ 推荐,替代已废弃的 Stage 系统)
self.mlflow_client.set_registered_model_alias(
name=model_name,
alias="champion",
version=mv.version
)
print(f"✅ 模型 v{mv.version} 推到生产 (acc={acc:.4f})")
else:
print(f"❌ 模型 v{mv.version} 未通过门禁 (acc={acc:.4f})")
return acc
5. 练习题¶
代码实践¶
- 入门:用Feast定义一个用户特征视图,完成离线特征获取
- 进阶:实现Feature Store + MLflow + 质量门禁的完整训练流程
- 高级:搭建一个迷你ML平台(Feature Store + 训练Pipeline + 模型服务 + 监控告警)
面试题¶
- Feature Store解决了什么核心问题?训练-服务偏差如何产生?
- Point-in-Time Join是什么?为什么对避免数据泄露很重要?
- 在线存储和离线存储的选型考虑是什么?
- 如何设计一个ML平台的模型审批流程?
- Feature Store的特征血缘追踪如何实现?
最后更新:2026年2月