第09章 数据血缘¶
📚 章节概述¶
本章将深入讲解数据血缘,包括DataHub、Amundsen、元数据管理等。通过本章学习,你将能够构建数据血缘系统。
🎯 学习目标¶
完成本章后,你将能够:
- 理解数据血缘的核心概念
- 掌握DataHub的使用
- 了解Amundsen的架构
- 掌握元数据管理
- 能够构建数据血缘系统
9.1 数据血缘概述¶
9.1.1 什么是数据血缘¶
数据血缘是描述数据从产生到消费的完整路径和关系。
数据血缘价值¶
- 数据溯源
- 追踪数据来源
- 理解数据变化
-
影响分析
-
影响分析
- 上游变化影响
- 下游依赖分析
-
风险评估
-
合规审计
- 数据流向追踪
- 访问路径审计
- 合规性验证
9.1.2 数据血缘类型¶
- 表级血缘
- 表到表的关系
- 简单易懂
-
粒度较粗
-
列级血缘
- 列到列的关系
- 精确追踪
-
复杂度高
-
字段级血缘
- 字段到字段的关系
- 最细粒度
- 实现复杂
9.2 DataHub¶
9.2.1 DataHub概述¶
DataHub是开源的现代数据目录和元数据平台。
核心特性¶
- 数据目录
- 数据资产发现
- 数据资产搜索
-
数据资产描述
-
数据血缘
- 自动血缘捕获
- 可视化展示
-
影响分析
-
元数据管理
- Schema管理
- 标签管理
- 权限管理
9.2.2 DataHub使用¶
Python
# 导入 DataHub REST 发射器,用于向 DataHub 服务端推送元数据
from datahub.emitter.rest_emitter import DatahubRestEmitter
# 导入元数据变更提案包装器,封装元数据变更请求
from datahub.emitter.mcp import MetadataChangeProposalWrapper
# 导入各类元数据 Schema 定义类
from datahub.metadata.schema_classes import (
DatasetPropertiesClass, # 数据集属性(名称、描述等)
SchemaMetadataClass, # Schema 元数据(字段定义)
SchemaFieldClass, # 单个字段定义
StringTypeClass, # 字符串类型
NumberTypeClass, # 数值类型
UpstreamLineageClass, # 上游血缘关系
UpstreamClass, # 上游数据集
DatasetLineageTypeClass, # 血缘类型(转换、复制等)
)
# 创建 DataHub Emitter
emitter = DatahubRestEmitter(
gms_server="http://localhost:8080"
)
# 定义数据集 URN
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,sales_data,PROD)"
# 发送数据集属性
dataset_properties = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=DatasetPropertiesClass(
name="Sales Data",
description="Daily sales data",
customProperties={"team": "data_engineering"}
)
)
emitter.emit(dataset_properties)
# 发送 Schema 元数据,定义数据集的字段结构信息
schema_metadata = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SchemaMetadataClass(
schemaName="sales_schema", # Schema 名称
platform="urn:li:dataPlatform:hive", # 数据平台标识
hash="",
platformSchema=SchemaMetadataClass.PlatformSchemaClass(),
fields=[
# 定义 sale_id 字段:BIGINT 数值类型
SchemaFieldClass(
fieldPath="sale_id",
type=NumberTypeClass(),
nativeDataType="BIGINT",
description="Sale ID"
),
# 定义 product_id 字段:BIGINT 数值类型
SchemaFieldClass(
fieldPath="product_id",
type=NumberTypeClass(),
nativeDataType="BIGINT",
description="Product ID"
),
]
)
)
emitter.emit(schema_metadata) # 将 Schema 信息推送到 DataHub
# 发送血缘关系(上游依赖),描述数据集之间的转换关系
# 上游数据集:MySQL 中的 raw_orders 表
upstream_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,raw_orders,PROD)"
lineage = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=upstream_urn,
# TRANSFORMED 表示数据经过转换处理(ETL)
type=DatasetLineageTypeClass.TRANSFORMED
)
]
)
)
emitter.emit(lineage) # 将血缘关系推送到 DataHub
print(f"DataHub metadata emitted for: {dataset_urn}")
9.3 Amundsen¶
9.3.1 Amundsen概述¶
Amundsen是开源的数据发现和元数据引擎。
核心特性¶
- 数据发现
- 自动数据发现
- 智能数据解析
-
数据资产索引
-
元数据管理
- 元数据存储
- 元数据搜索
-
元数据版本
-
数据血缘
- 血缘追踪
- 可视化展示
- 影响分析
9.3.2 Amundsen使用¶
Python
# Amundsen 使用示例(通过 REST API 调用)
import requests
AMUNDSEN_SEARCH_URL = "http://localhost:5001/search"
AMUNDSEN_METADATA_URL = "http://localhost:5002/table"
# 搜索数据资产
response = requests.get(
f"{AMUNDSEN_SEARCH_URL}",
params={"query": "sales", "page_index": 0}
)
search_results = response.json()
# 遍历搜索结果,打印每个数据表的基本信息
for table in search_results.get("results", []):
print(f"Table: {table.get('name')}") # 表名
print(f"Schema: {table.get('schema')}") # 所属 Schema
print(f"Database: {table.get('database')}") # 所属数据库
print(f"Description: {table.get('description')}") # 描述信息
print("---")
# 获取表的详细元数据(包含列信息、统计数据等)
table_key = "hive://gold.test_schema/sales"
metadata_resp = requests.get(
f"{AMUNDSEN_METADATA_URL}",
params={"key": table_key}
)
table_detail = metadata_resp.json()
# 提取并打印表的所有列名列表
print(f"Columns: {[col['name'] for col in table_detail.get('columns', [])]}")
9.4 元数据管理¶
9.4.1 元数据概述¶
元数据是关于数据的数据,用于描述、管理和理解数据。
元数据类型¶
- 技术元数据
- Schema信息
- 存储位置
-
数据格式
-
业务元数据
- 业务含义
- 数据定义
-
业务规则
-
操作元数据
- 访问统计
- 使用频率
- 更新历史
9.4.2 元数据管理实现¶
Python
from datetime import datetime
class MetadataManager:
"""元数据管理类"""
def __init__(self):
self.metadata = {}
def extract_metadata(self, df):
"""从 DataFrame 中自动提取元数据信息"""
# 构建表级元数据:表名和行数
metadata = {
'table_name': 'sales',
'row_count': len(df),
'columns': []
}
# 遍历每一列,收集列级元数据
for column in df.columns:
column_info = {
'name': column, # 列名
'type': str(df[column].dtype), # 数据类型
'null_count': df[column].isnull().sum(), # 空值数量
'unique_count': df[column].nunique(), # 唯一值数量
# 对于数值列,记录最小值和最大值
'min_value': df[column].min() if df[column].dtype != 'object' else None,
'max_value': df[column].max() if df[column].dtype != 'object' else None
}
metadata['columns'].append(column_info)
return metadata
def track_lineage(self, source_table, target_table, transformation):
"""追踪数据血缘关系,记录数据从源表到目标表的转换路径"""
lineage = {
'source': source_table, # 源表(上游数据)
'target': target_table, # 目标表(下游数据)
'transformation': transformation, # 转换逻辑描述
'timestamp': datetime.now().isoformat() # 记录时间戳
}
return lineage
def get_metadata(self, table_name):
"""获取元数据"""
return self.metadata.get(table_name)
def update_metadata(self, table_name, metadata):
"""更新元数据"""
self.metadata[table_name] = metadata
9.5 练习题¶
基础题¶
- 选择题
-
数据血缘的核心价值不包括什么?
- A. 数据溯源
- B. 影响分析
- C. 数据美化
- D. 合规审计
-
简答题
- 解释数据血缘的类型。
- 说明DataHub的核心特性。
进阶题¶
- 实践题
- 使用DataHub构建数据目录。
- 使用Amundsen发现数据。
-
实现数据血缘追踪。
-
设计题
- 设计一个数据血缘系统。
- 设计一个元数据管理平台。
答案¶
1. 选择题答案¶
- C(数据血缘的核心价值不包括数据美化)
2. 简答题答案¶
数据血缘的类型: - 表级血缘 - 列级血缘 - 字段级血缘
DataHub的核心特性: - 数据目录、数据血缘、元数据管理
3. 实践题答案¶
参见9.2-9.4节的示例。
4. 设计题答案¶
参见9.1-9.4节的系统设计。
9.6 面试准备¶
大厂面试题¶
字节跳动¶
- 解释数据血缘的核心概念。
- 如何设计数据血缘系统?
- 如何追踪数据变化?
- 如何分析数据影响?
腾讯¶
- DataHub和Amundsen的区别是什么?
- 如何设计元数据管理?
- 如何实现自动血缘捕获?
- 如何设计血缘可视化?
阿里云¶
- 数据血缘的最佳实践是什么?
- 如何设计血缘的扩展性?
- 如何处理血缘的性能问题?
- 如何设计血缘的安全性?
📚 参考资料¶
- DataHub文档:https://datahubproject.io/docs/
- Amundsen文档:https://amundsen.io/docs/
- 《Data Lineage》
- 《Metadata Management》
🎯 本章小结¶
本章深入讲解了数据血缘,包括:
- 数据血缘的核心概念
- DataHub的使用
- Amundsen的架构
- 元数据管理
通过本章学习,你掌握了数据血缘的核心技术,能够构建数据血缘系统。下一章将深入学习数据治理。