第08章 数据质量¶
📚 章节概述¶
本章将深入讲解数据质量,包括Great Expectations、Deequ、数据验证等。通过本章学习,你将能够实施数据质量保证体系。
🎯 学习目标¶
完成本章后,你将能够:
- 理解数据质量的核心概念
- 掌握Great Expectations的使用
- 了解Deequ数据验证
- 掌握数据质量监控
- 能够实施数据质量保证体系
8.1 数据质量概述¶
8.1.1 什么是数据质量¶
数据质量是数据满足业务需求的程度。
数据质量维度¶
- 准确性
- 数据正确性
- 无错误数据
-
符合业务规则
-
完整性
- 数据无缺失
- 无重复
-
关系完整
-
一致性
- 格式一致
- 值域一致
-
逻辑一致
-
及时性
- 数据及时更新
- 无过期数据
- 实时同步
8.1.2 数据质量问题¶
常见问题¶
- 缺失值
- 字段为空
- 记录不完整
-
数据丢失
-
重复数据
- 主键重复
- 记录重复
-
冗余数据
-
格式错误
- 日期格式错误
- 数值格式错误
-
字符编码错误
-
逻辑错误
- 违反业务规则
- 数据不一致
- 关系错误
8.2 Great Expectations¶
8.2.1 GE概述¶
Great Expectations是开源的数据质量工具,提供声明式的数据验证。
核心概念¶
- Expectation(期望)
- 数据验证规则
- 可重用
-
易于维护
-
Suite(套件)
- 多个Expectation
- 组织管理
-
批量验证
-
Validation Result(验证结果)
- 验证结果
- 统计信息
- 失败记录
8.2.2 GE使用¶
Python
import great_expectations as gx
import pandas as pd
# 创建 Data Context
context = gx.get_context()
# 读取数据并创建 Data Source
df = pd.read_csv('sales.csv')
data_source = context.data_sources.add_pandas(name="sales_source")
data_asset = data_source.add_dataframe_asset(name="sales_asset")
batch_definition = data_asset.add_batch_definition_whole_dataframe("sales_batch")
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
# 创建 Expectation Suite
suite = context.suites.add(
gx.ExpectationSuite(name="sales_suite")
)
# 添加 Expectations
suite.add_expectation(
gx.expectations.ExpectColumnToExist(column="product_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="quantity", min_value=1, max_value=1000
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="amount", min_value=0, max_value=100000
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="sale_id")
)
# 创建 Validation Definition 并验证
validation_definition = context.validation_definitions.add(
gx.ValidationDefinition(
name="sales_validation",
data=batch_definition,
suite=suite
)
)
result = validation_definition.run(batch_parameters={"dataframe": df})
# 输出结果
print(f"Validation success: {result.success}")
for exp_result in result.results:
print(f" {exp_result.expectation_config.type}: {exp_result.success}")
8.3 Deequ¶
8.3.1 Deequ概述¶
Deequ是开源的数据质量工具,用于大数据集的数据验证。
核心特性¶
- Metrics(指标)
- 数据质量指标
- 统计信息
-
聚合指标
-
Verification(验证)
- 数据验证
- 规则检查
-
异常检测
-
Repository(仓库)
- 指标存储
- 历史对比
- 趋势分析
8.3.2 Deequ使用¶
Python
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder \
.appName("DeequExample") \
.config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.7-spark-3.5") \
.getOrCreate()
# 读取数据
df = spark.read.json('sales.json')
# 创建检查规则
check = Check(spark, CheckLevel.Error, "sales_data_check") \
.isComplete("product_id") \
.isComplete("product_name") \
.isUnique("sale_id") \
.isNonNegative("quantity") \
.isNonNegative("amount") \
.satisfies("quantity >= 1 AND quantity <= 1000", "quantity_range") \
.satisfies("amount >= 0 AND amount <= 100000", "amount_range")
# 运行验证
result = VerificationSuite(spark) \
.onData(df) \
.addCheck(check) \
.run()
# 输出结果
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show(truncate=False)
print(f"Verification status: {result.status}")
8.4 数据质量监控¶
8.4.1 监控概述¶
数据质量监控是对数据质量进行持续监控和告警。
监控指标¶
- 完整性指标
- 缺失值比例
- 重复记录数量
-
关系完整性
-
准确性指标
- 格式错误数量
- 值域错误数量
-
逻辑错误数量
-
一致性指标
- 跨表一致性
- 跨系统一致性
- 时间一致性
8.4.2 监控实现¶
Python
class DataQualityMonitor:
"""数据质量监控类"""
def __init__(self):
self.metrics = {}
def check_completeness(self, df):
"""检查完整性"""
metrics = {}
# 缺失值检查
for column in df.columns:
missing_count = df[column].isnull().sum()
missing_ratio = missing_count / len(df)
metrics[f'{column}_missing_ratio'] = missing_ratio
# 重复检查
duplicate_count = df.duplicated().sum()
metrics['duplicate_ratio'] = duplicate_count / len(df)
return metrics
def check_accuracy(self, df):
"""检查准确性"""
metrics = {}
# 格式检查
if 'date' in df.columns:
try: # try/except捕获异常
pd.to_datetime(df['date'])
metrics['date_format_valid'] = True
except:
metrics['date_format_valid'] = False
# 值域检查
if 'quantity' in df.columns:
invalid_count = df[(df['quantity'] < 0) | (df['quantity'] > 1000)].shape[0]
metrics['quantity_valid_ratio'] = 1 - invalid_count / len(df)
return metrics
def check_consistency(self, df):
"""检查一致性"""
metrics = {}
# 逻辑一致性
if 'amount' in df.columns and 'quantity' in df.columns and 'price' in df.columns:
calculated_amount = df['quantity'] * df['price']
consistency = (df['amount'] - calculated_amount).abs() < 0.01
metrics['amount_consistency_ratio'] = consistency.mean()
return metrics
def run_checks(self, df):
"""运行所有检查"""
completeness = self.check_completeness(df)
accuracy = self.check_accuracy(df)
consistency = self.check_consistency(df)
self.metrics = {
'completeness': completeness,
'accuracy': accuracy,
'consistency': consistency
}
return self.metrics
def alert(self, threshold=0.05):
"""告警"""
for category, metrics in self.metrics.items():
for metric_name, value in metrics.items():
if value < threshold:
print(f"ALERT: {category}.{metric_name} = {value}")
8.5 练习题¶
基础题¶
- 选择题
-
数据质量不包括哪个维度?
- A. 准确性
- B. 完整性
- C. 美观性
- D. 一致性
-
简答题
- 解释数据质量的维度。
- 说明Great Expectations的核心概念。
进阶题¶
- 实践题
- 使用GE验证数据质量。
- 使用Deequ检查大数据质量。
-
实施数据质量监控。
-
设计题
- 设计一个数据质量体系。
- 设计一个数据质量监控平台。
答案¶
1. 选择题答案¶
- C(数据质量不包括美观性)
2. 简答题答案¶
数据质量的维度: - 准确性 - 完整性 - 一致性 - 及时性
Great Expectations的核心概念: - Expectation、Suite、Validation Result
3. 实践题答案¶
参见8.2-8.4节的示例。
4. 设计题答案¶
参见8.1-8.4节的体系设计。
8.6 面试准备¶
大厂面试题¶
字节跳动¶
- 解释数据质量的维度。
- 如何设计数据质量规则?
- 如何监控数据质量?
- 如何处理数据质量问题?
腾讯¶
- Great Expectations和Deequ的区别是什么?
- 如何设计数据质量监控?
- 如何实施数据质量治理?
- 如何设计数据质量告警?
阿里云¶
- 数据质量的最佳实践是什么?
- 如何设计数据质量自动化?
- 如何处理数据质量的历史对比?
- 如何设计数据质量报告?
📚 参考资料¶
- Great Expectations文档:https://docs.greatexpectations.io/
- Deequ文档:https://deequ.io/
- 《Data Quality》
- 《Data Profiling》
🎯 本章小结¶
本章深入讲解了数据质量,包括:
- 数据质量的核心概念
- Great Expectations的使用
- Deequ数据验证
- 数据质量监控
通过本章学习,你掌握了数据质量的核心技术,能够实施数据质量保证体系。下一章将深入学习数据血缘。