跳转至

🏗️ 特征仓库与ML平台

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

特征仓库与ML平台架构图

学习时间:8小时 | 难度:⭐⭐⭐⭐ 中高级 | 前置知识:模型部署(Ch02)、监控(Ch03)、CI/CD(Ch05)


本章目标

  • 理解Feature Store的核心价值与架构原理
  • 掌握Feast Feature Store的实战使用
  • 了解企业ML平台的架构设计
  • 学会搭建端到端ML工程化体系

1. Feature Store

1.1 为什么需要Feature Store

Text Only
没有Feature Store的问题:
├── 训练/推理特征不一致 → 训练-服务偏差(Training-Serving Skew)
├── 特征重复计算 → 团队间大量重复工作
├── 特征质量无保障 → 无监控、无血缘追踪
└── 实时特征困难 → 高延迟的在线服务

Feature Store解决:
├── 统一特征定义 → 训练和推理用同一份特征
├── 特征复用 → 跨团队共享特征(特征市场)
├── 质量保障 → 自动统计、漂移检测、血缘图
└── 在线/离线双模式 → 批量训练 + 低延迟推理

1.2 Feature Store架构

Text Only
            ┌────────────────────────────────────┐
            │          Feature Store              │
            │                                     │
数据源──────▶│  ┌──────────┐    ┌──────────┐      │
(DB/Kafka/  │  │  离线存储  │    │  在线存储  │      │
 文件)      │  │(Parquet/  │    │(Redis/    │      │
            │  │ BigQuery) │    │ DynamoDB) │      │
            │  └─────┬────┘    └─────┬────┘      │
            │        │               │            │
            │  ┌─────┴───────────────┴────┐      │
            │  │      特征注册表           │      │
            │  │  (定义/版本/血缘/统计)    │      │
            │  └──────────────────────────┘      │
            └────────┬───────────┬───────────────┘
                     │           │
              训练Pipeline    推理服务
              (离线特征)      (在线特征)

1.3 工具对比

Feature Store 定位 在线/离线 部署方式 适合场景
Feast 开源 两者 自托管/云 中小团队
Tecton 商业 两者 SaaS/VPC 企业级
Hopsworks 开源+商业 两者 自托管/云 全栈ML
Databricks FS 商业 两者 Databricks Spark生态
AWS SageMaker FS 云原生 两者 AWS AWS用户

2. Feast实战

2.1 项目初始化

Python
# feature_store/feature_repo/feature_definition.py

from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# 1. 定义实体(业务主键)
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="用户唯一标识"
)

product = Entity(
    name="product_id",
    join_keys=["product_id"],
    description="商品唯一标识"
)

# 2. 定义数据源
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp"
)

# 3. 定义特征视图
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=1),  # 特征有效期
    schema=[
        Field(name="total_orders", dtype=Int64, description="历史总订单数"),
        Field(name="avg_order_amount", dtype=Float32, description="平均订单金额"),
        Field(name="days_since_last_order", dtype=Int64, description="距上次下单天数"),
        Field(name="user_segment", dtype=String, description="用户分层(高/中/低)"),
    ],
    source=user_stats_source,
    online=True,  # 同步到在线存储
    tags={"team": "recommendation", "version": "v2"},
)

2.2 训练时获取特征

Python
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="feature_repo/")

# 训练集: 用实体DataFrame获取历史特征(Point-in-Time Join)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003, 1004],
    "event_timestamp": pd.to_datetime([
        "2026-01-15", "2026-01-16", "2026-01-17", "2026-01-18"
    ])
})

# 获取特征(自动处理时间穿越,避免数据泄露)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_orders",
        "user_stats:avg_order_amount",
        "user_stats:days_since_last_order",
        "user_stats:user_segment",
    ]
).to_df()

print(training_df)
# user_id | event_timestamp | total_orders | avg_order_amount | ...

2.3 推理时获取特征

Python
# 在线推理: 低延迟获取最新特征
online_features = store.get_online_features(
    features=[
        "user_stats:total_orders",
        "user_stats:avg_order_amount",
        "user_stats:days_since_last_order",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ]
).to_dict()

print(online_features)
# {'user_id': [1001, 1002], 'total_orders': [42, 7], ...}

2.4 物化(同步到在线存储)

Bash
# 将离线特征同步到在线存储(Redis/DynamoDB)
feast materialize 2026-01-01T00:00:00 2026-02-01T00:00:00

# 增量物化
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")  # $()命令替换:执行命令并获取输出

3. 企业ML平台架构

3.1 平台全景

Text Only
┌─────────────────────────────────────────────────────────────┐
│                        ML平台架构                            │
├─────────────────────────────────────────────────────────────┤
│  用户界面层                                                  │
│  ├── 实验Dashboard (MLflow UI / W&B)                         │
│  ├── 特征市场 (Feature Catalog)                              │
│  ├── 模型注册表 (Model Registry)                             │
│  └── 监控看板 (Grafana)                                      │
├─────────────────────────────────────────────────────────────┤
│  编排层                                                     │
│  ├── 流水线编排 (Kubeflow / Airflow)                         │
│  ├── CI/CD (GitHub Actions / GitLab CI)                     │
│  └── 自动重训练触发器                                        │
├─────────────────────────────────────────────────────────────┤
│  核心服务层                                                  │
│  ├── 实验管理 (MLflow Tracking)                              │
│  ├── Feature Store (Feast)                                  │
│  ├── 模型服务 (Triton / vLLM / TorchServe)                  │
│  └── 监控 (Prometheus + Evidently)                          │
├─────────────────────────────────────────────────────────────┤
│  基础设施层                                                  │
│  ├── 计算 (Kubernetes + GPU节点池)                           │
│  ├── 存储 (S3/MinIO + Redis + PostgreSQL)                   │
│  └── 网络 (Istio服务网格 + API Gateway)                     │
└─────────────────────────────────────────────────────────────┘

3.2 ML平台配置示例

YAML
# ml-platform-config.yaml
platform:
  name: "ml-platform-v1"

  experiment_tracking:
    tool: "mlflow"
    backend_store: "postgresql://mlflow:pass@db:5432/mlflow"
    artifact_store: "s3://ml-artifacts"

  feature_store:
    tool: "feast"
    online_store:
      type: "redis"
      connection_string: "redis://redis:6379"
    offline_store:
      type: "file"  # 生产环境用BigQuery/Redshift

  model_serving:
    default_engine: "triton"
    autoscaling:
      min_replicas: 2
      max_replicas: 20
      target_gpu_utilization: 70
    canary:
      enabled: true
      initial_weight: 10  # 新模型初始流量10%
      promotion_threshold: 0.95  # 准确率>95%提升到100%

  monitoring:
    drift_detection:
      tool: "evidently"
      check_interval: "1h"
      alert_threshold: 0.3
    metrics_backend: "prometheus"
    dashboard: "grafana"

  pipeline:
    orchestrator: "kubeflow"
    retrain_triggers:
      - type: "drift"
        threshold: 0.3
      - type: "schedule"
        cron: "0 2 * * 0"  # 每周日2点
      - type: "performance"
        min_accuracy: 0.85

4. 端到端工程化实践

4.1 从训练到部署的完整Python接口

Python
class MLPlatformClient:
    """统一ML平台客户端"""

    def __init__(self, config_path="ml-platform-config.yaml"):
        import yaml
        with open(config_path) as f:  # with自动管理资源,确保文件正确关闭
            self.config = yaml.safe_load(f)

        self.feature_store = FeatureStore(repo_path="feature_repo/")
        self.mlflow_client = mlflow.tracking.MlflowClient()

    def get_training_data(self, entity_df, feature_list):
        """从Feature Store获取训练数据"""
        return self.feature_store.get_historical_features(
            entity_df=entity_df,
            features=feature_list
        ).to_df()

    def train_and_log(self, model, X_train, y_train, params):
        """训练并记录到MLflow"""
        with mlflow.start_run():
            mlflow.log_params(params)
            model.fit(X_train, y_train)

            # 记录指标
            from sklearn.metrics import accuracy_score
            train_acc = accuracy_score(y_train, model.predict(X_train))
            mlflow.log_metric("train_accuracy", train_acc)

            # 记录模型
            mlflow.sklearn.log_model(model, "model")

            return mlflow.active_run().info.run_id

    def evaluate_and_promote(self, run_id, test_df, model_name):
        """评估并决定是否推到生产"""
        # 加载模型
        model_uri = f"runs:/{run_id}/model"
        model = mlflow.sklearn.load_model(model_uri)

        X_test = test_df.drop("label", axis=1)
        y_test = test_df["label"]

        acc = accuracy_score(y_test, model.predict(X_test))

        # 注册模型
        mv = mlflow.register_model(model_uri, model_name)

        # 质量门禁
        if acc >= 0.85:
            # 使用别名系统(MLflow 2.9.0+ 推荐,替代已废弃的 Stage 系统)
            self.mlflow_client.set_registered_model_alias(
                name=model_name,
                alias="champion",
                version=mv.version
            )
            print(f"✅ 模型 v{mv.version} 推到生产 (acc={acc:.4f})")
        else:
            print(f"❌ 模型 v{mv.version} 未通过门禁 (acc={acc:.4f})")

        return acc

5. 练习题

代码实践

  1. 入门:用Feast定义一个用户特征视图,完成离线特征获取
  2. 进阶:实现Feature Store + MLflow + 质量门禁的完整训练流程
  3. 高级:搭建一个迷你ML平台(Feature Store + 训练Pipeline + 模型服务 + 监控告警)

面试题

  1. Feature Store解决了什么核心问题?训练-服务偏差如何产生?
  2. Point-in-Time Join是什么?为什么对避免数据泄露很重要?
  3. 在线存储和离线存储的选型考虑是什么?
  4. 如何设计一个ML平台的模型审批流程?
  5. Feature Store的特征血缘追踪如何实现?

最后更新:2026年2月