跳转至

第09章 数据血缘

数据血缘

📚 章节概述

本章将深入讲解数据血缘,包括DataHub、Amundsen、元数据管理等。通过本章学习,你将能够构建数据血缘系统。

🎯 学习目标

完成本章后,你将能够:

  1. 理解数据血缘的核心概念
  2. 掌握DataHub的使用
  3. 了解Amundsen的架构
  4. 掌握元数据管理
  5. 能够构建数据血缘系统

9.1 数据血缘概述

9.1.1 什么是数据血缘

数据血缘是描述数据从产生到消费的完整路径和关系。

数据血缘价值

  1. 数据溯源
  2. 追踪数据来源
  3. 理解数据变化
  4. 影响分析

  5. 影响分析

  6. 上游变化影响
  7. 下游依赖分析
  8. 风险评估

  9. 合规审计

  10. 数据流向追踪
  11. 访问路径审计
  12. 合规性验证

9.1.2 数据血缘类型

  1. 表级血缘
  2. 表到表的关系
  3. 简单易懂
  4. 粒度较粗

  5. 列级血缘

  6. 列到列的关系
  7. 精确追踪
  8. 复杂度高

  9. 字段级血缘

  10. 字段到字段的关系
  11. 最细粒度
  12. 实现复杂

9.2 DataHub

9.2.1 DataHub概述

DataHub是开源的现代数据目录和元数据平台。

核心特性

  1. 数据目录
  2. 数据资产发现
  3. 数据资产搜索
  4. 数据资产描述

  5. 数据血缘

  6. 自动血缘捕获
  7. 可视化展示
  8. 影响分析

  9. 元数据管理

  10. Schema管理
  11. 标签管理
  12. 权限管理

9.2.2 DataHub使用

Python
# 导入 DataHub REST 发射器,用于向 DataHub 服务端推送元数据
from datahub.emitter.rest_emitter import DatahubRestEmitter
# 导入元数据变更提案包装器,封装元数据变更请求
from datahub.emitter.mcp import MetadataChangeProposalWrapper
# 导入各类元数据 Schema 定义类
from datahub.metadata.schema_classes import (
    DatasetPropertiesClass,       # 数据集属性(名称、描述等)
    SchemaMetadataClass,          # Schema 元数据(字段定义)
    SchemaFieldClass,             # 单个字段定义
    StringTypeClass,              # 字符串类型
    NumberTypeClass,              # 数值类型
    UpstreamLineageClass,         # 上游血缘关系
    UpstreamClass,                # 上游数据集
    DatasetLineageTypeClass,      # 血缘类型(转换、复制等)
)

# 创建 DataHub Emitter
emitter = DatahubRestEmitter(
    gms_server="http://localhost:8080"
)

# 定义数据集 URN
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,sales_data,PROD)"

# 发送数据集属性
dataset_properties = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=DatasetPropertiesClass(
        name="Sales Data",
        description="Daily sales data",
        customProperties={"team": "data_engineering"}
    )
)
emitter.emit(dataset_properties)

# 发送 Schema 元数据,定义数据集的字段结构信息
schema_metadata = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=SchemaMetadataClass(
        schemaName="sales_schema",         # Schema 名称
        platform="urn:li:dataPlatform:hive",  # 数据平台标识
        hash="",
        platformSchema=SchemaMetadataClass.PlatformSchemaClass(),
        fields=[
            # 定义 sale_id 字段:BIGINT 数值类型
            SchemaFieldClass(
                fieldPath="sale_id",
                type=NumberTypeClass(),
                nativeDataType="BIGINT",
                description="Sale ID"
            ),
            # 定义 product_id 字段:BIGINT 数值类型
            SchemaFieldClass(
                fieldPath="product_id",
                type=NumberTypeClass(),
                nativeDataType="BIGINT",
                description="Product ID"
            ),
        ]
    )
)
emitter.emit(schema_metadata)  # 将 Schema 信息推送到 DataHub

# 发送血缘关系(上游依赖),描述数据集之间的转换关系
# 上游数据集:MySQL 中的 raw_orders 表
upstream_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,raw_orders,PROD)"
lineage = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=UpstreamLineageClass(
        upstreams=[
            UpstreamClass(
                dataset=upstream_urn,
                # TRANSFORMED 表示数据经过转换处理(ETL)
                type=DatasetLineageTypeClass.TRANSFORMED
            )
        ]
    )
)
emitter.emit(lineage)  # 将血缘关系推送到 DataHub

print(f"DataHub metadata emitted for: {dataset_urn}")

9.3 Amundsen

9.3.1 Amundsen概述

Amundsen是开源的数据发现和元数据引擎。

核心特性

  1. 数据发现
  2. 自动数据发现
  3. 智能数据解析
  4. 数据资产索引

  5. 元数据管理

  6. 元数据存储
  7. 元数据搜索
  8. 元数据版本

  9. 数据血缘

  10. 血缘追踪
  11. 可视化展示
  12. 影响分析

9.3.2 Amundsen使用

Python
# Amundsen 使用示例(通过 REST API 调用)
import requests

AMUNDSEN_SEARCH_URL = "http://localhost:5001/search"
AMUNDSEN_METADATA_URL = "http://localhost:5002/table"

# 搜索数据资产
response = requests.get(
    f"{AMUNDSEN_SEARCH_URL}",
    params={"query": "sales", "page_index": 0}
)
search_results = response.json()

# 遍历搜索结果,打印每个数据表的基本信息
for table in search_results.get("results", []):
    print(f"Table: {table.get('name')}")          # 表名
    print(f"Schema: {table.get('schema')}")        # 所属 Schema
    print(f"Database: {table.get('database')}")    # 所属数据库
    print(f"Description: {table.get('description')}")  # 描述信息
    print("---")

# 获取表的详细元数据(包含列信息、统计数据等)
table_key = "hive://gold.test_schema/sales"
metadata_resp = requests.get(
    f"{AMUNDSEN_METADATA_URL}",
    params={"key": table_key}
)
table_detail = metadata_resp.json()
# 提取并打印表的所有列名列表
print(f"Columns: {[col['name'] for col in table_detail.get('columns', [])]}")

9.4 元数据管理

9.4.1 元数据概述

元数据是关于数据的数据,用于描述、管理和理解数据。

元数据类型

  1. 技术元数据
  2. Schema信息
  3. 存储位置
  4. 数据格式

  5. 业务元数据

  6. 业务含义
  7. 数据定义
  8. 业务规则

  9. 操作元数据

  10. 访问统计
  11. 使用频率
  12. 更新历史

9.4.2 元数据管理实现

Python
from datetime import datetime

class MetadataManager:
    """元数据管理类"""

    def __init__(self):
        self.metadata = {}

    def extract_metadata(self, df):
        """从 DataFrame 中自动提取元数据信息"""
        # 构建表级元数据:表名和行数
        metadata = {
            'table_name': 'sales',
            'row_count': len(df),
            'columns': []
        }

        # 遍历每一列,收集列级元数据
        for column in df.columns:
            column_info = {
                'name': column,                    # 列名
                'type': str(df[column].dtype),      # 数据类型
                'null_count': df[column].isnull().sum(),    # 空值数量
                'unique_count': df[column].nunique(),       # 唯一值数量
                # 对于数值列,记录最小值和最大值
                'min_value': df[column].min() if df[column].dtype != 'object' else None,
                'max_value': df[column].max() if df[column].dtype != 'object' else None
            }
            metadata['columns'].append(column_info)

        return metadata

    def track_lineage(self, source_table, target_table, transformation):
        """追踪数据血缘关系,记录数据从源表到目标表的转换路径"""
        lineage = {
            'source': source_table,              # 源表(上游数据)
            'target': target_table,              # 目标表(下游数据)
            'transformation': transformation,    # 转换逻辑描述
            'timestamp': datetime.now().isoformat()  # 记录时间戳
        }

        return lineage

    def get_metadata(self, table_name):
        """获取元数据"""
        return self.metadata.get(table_name)

    def update_metadata(self, table_name, metadata):
        """更新元数据"""
        self.metadata[table_name] = metadata

9.5 练习题

基础题

  1. 选择题
  2. 数据血缘的核心价值不包括什么?

    • A. 数据溯源
    • B. 影响分析
    • C. 数据美化
    • D. 合规审计
  3. 简答题

  4. 解释数据血缘的类型。
  5. 说明DataHub的核心特性。

进阶题

  1. 实践题
  2. 使用DataHub构建数据目录。
  3. 使用Amundsen发现数据。
  4. 实现数据血缘追踪。

  5. 设计题

  6. 设计一个数据血缘系统。
  7. 设计一个元数据管理平台。

答案

1. 选择题答案

  1. C(数据血缘的核心价值不包括数据美化)

2. 简答题答案

数据血缘的类型: - 表级血缘 - 列级血缘 - 字段级血缘

DataHub的核心特性: - 数据目录、数据血缘、元数据管理

3. 实践题答案

参见9.2-9.4节的示例。

4. 设计题答案

参见9.1-9.4节的系统设计。

9.6 面试准备

大厂面试题

字节跳动

  1. 解释数据血缘的核心概念。
  2. 如何设计数据血缘系统?
  3. 如何追踪数据变化?
  4. 如何分析数据影响?

腾讯

  1. DataHub和Amundsen的区别是什么?
  2. 如何设计元数据管理?
  3. 如何实现自动血缘捕获?
  4. 如何设计血缘可视化?

阿里云

  1. 数据血缘的最佳实践是什么?
  2. 如何设计血缘的扩展性?
  3. 如何处理血缘的性能问题?
  4. 如何设计血缘的安全性?

📚 参考资料

🎯 本章小结

本章深入讲解了数据血缘,包括:

  1. 数据血缘的核心概念
  2. DataHub的使用
  3. Amundsen的架构
  4. 元数据管理

通过本章学习,你掌握了数据血缘的核心技术,能够构建数据血缘系统。下一章将深入学习数据治理。