跳转至

第08章 数据质量

数据质量

📚 章节概述

本章将深入讲解数据质量,包括Great Expectations、Deequ、数据验证等。通过本章学习,你将能够实施数据质量保证体系。

🎯 学习目标

完成本章后,你将能够:

  1. 理解数据质量的核心概念
  2. 掌握Great Expectations的使用
  3. 了解Deequ数据验证
  4. 掌握数据质量监控
  5. 能够实施数据质量保证体系

8.1 数据质量概述

8.1.1 什么是数据质量

数据质量是数据满足业务需求的程度。

数据质量维度

  1. 准确性
  2. 数据正确性
  3. 无错误数据
  4. 符合业务规则

  5. 完整性

  6. 数据无缺失
  7. 无重复
  8. 关系完整

  9. 一致性

  10. 格式一致
  11. 值域一致
  12. 逻辑一致

  13. 及时性

  14. 数据及时更新
  15. 无过期数据
  16. 实时同步

8.1.2 数据质量问题

常见问题

  1. 缺失值
  2. 字段为空
  3. 记录不完整
  4. 数据丢失

  5. 重复数据

  6. 主键重复
  7. 记录重复
  8. 冗余数据

  9. 格式错误

  10. 日期格式错误
  11. 数值格式错误
  12. 字符编码错误

  13. 逻辑错误

  14. 违反业务规则
  15. 数据不一致
  16. 关系错误

8.2 Great Expectations

8.2.1 GE概述

Great Expectations是开源的数据质量工具,提供声明式的数据验证。

核心概念

  1. Expectation(期望)
  2. 数据验证规则
  3. 可重用
  4. 易于维护

  5. Suite(套件)

  6. 多个Expectation
  7. 组织管理
  8. 批量验证

  9. Validation Result(验证结果)

  10. 验证结果
  11. 统计信息
  12. 失败记录

8.2.2 GE使用

Python
import great_expectations as gx
import pandas as pd

# 创建 Data Context
context = gx.get_context()

# 读取数据并创建 Data Source
df = pd.read_csv('sales.csv')
data_source = context.data_sources.add_pandas(name="sales_source")
data_asset = data_source.add_dataframe_asset(name="sales_asset")
batch_definition = data_asset.add_batch_definition_whole_dataframe("sales_batch")
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})

# 创建 Expectation Suite
suite = context.suites.add(
    gx.ExpectationSuite(name="sales_suite")
)

# 添加 Expectations
suite.add_expectation(
    gx.expectations.ExpectColumnToExist(column="product_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="quantity", min_value=1, max_value=1000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=100000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="sale_id")
)

# 创建 Validation Definition 并验证
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="sales_validation",
        data=batch_definition,
        suite=suite
    )
)
result = validation_definition.run(batch_parameters={"dataframe": df})

# 输出结果
print(f"Validation success: {result.success}")
for exp_result in result.results:
    print(f"  {exp_result.expectation_config.type}: {exp_result.success}")

8.3 Deequ

8.3.1 Deequ概述

Deequ是开源的数据质量工具,用于大数据集的数据验证。

核心特性

  1. Metrics(指标)
  2. 数据质量指标
  3. 统计信息
  4. 聚合指标

  5. Verification(验证)

  6. 数据验证
  7. 规则检查
  8. 异常检测

  9. Repository(仓库)

  10. 指标存储
  11. 历史对比
  12. 趋势分析

8.3.2 Deequ使用

Python
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
from pyspark.sql import SparkSession

# 创建SparkSession
spark = SparkSession.builder \
    .appName("DeequExample") \
    .config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.7-spark-3.5") \
    .getOrCreate()

# 读取数据
df = spark.read.json('sales.json')

# 创建检查规则
check = Check(spark, CheckLevel.Error, "sales_data_check") \
    .isComplete("product_id") \
    .isComplete("product_name") \
    .isUnique("sale_id") \
    .isNonNegative("quantity") \
    .isNonNegative("amount") \
    .satisfies("quantity >= 1 AND quantity <= 1000", "quantity_range") \
    .satisfies("amount >= 0 AND amount <= 100000", "amount_range")

# 运行验证
result = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(check) \
    .run()

# 输出结果
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show(truncate=False)
print(f"Verification status: {result.status}")

8.4 数据质量监控

8.4.1 监控概述

数据质量监控是对数据质量进行持续监控和告警。

监控指标

  1. 完整性指标
  2. 缺失值比例
  3. 重复记录数量
  4. 关系完整性

  5. 准确性指标

  6. 格式错误数量
  7. 值域错误数量
  8. 逻辑错误数量

  9. 一致性指标

  10. 跨表一致性
  11. 跨系统一致性
  12. 时间一致性

8.4.2 监控实现

Python
class DataQualityMonitor:
    """数据质量监控类"""

    def __init__(self):
        self.metrics = {}

    def check_completeness(self, df):
        """检查完整性"""
        metrics = {}

        # 缺失值检查
        for column in df.columns:
            missing_count = df[column].isnull().sum()
            missing_ratio = missing_count / len(df)
            metrics[f'{column}_missing_ratio'] = missing_ratio

        # 重复检查
        duplicate_count = df.duplicated().sum()
        metrics['duplicate_ratio'] = duplicate_count / len(df)

        return metrics

    def check_accuracy(self, df):
        """检查准确性"""
        metrics = {}

        # 格式检查
        if 'date' in df.columns:
            try:  # try/except捕获异常
                pd.to_datetime(df['date'])
                metrics['date_format_valid'] = True
            except:
                metrics['date_format_valid'] = False

        # 值域检查
        if 'quantity' in df.columns:
            invalid_count = df[(df['quantity'] < 0) | (df['quantity'] > 1000)].shape[0]
            metrics['quantity_valid_ratio'] = 1 - invalid_count / len(df)

        return metrics

    def check_consistency(self, df):
        """检查一致性"""
        metrics = {}

        # 逻辑一致性
        if 'amount' in df.columns and 'quantity' in df.columns and 'price' in df.columns:
            calculated_amount = df['quantity'] * df['price']
            consistency = (df['amount'] - calculated_amount).abs() < 0.01
            metrics['amount_consistency_ratio'] = consistency.mean()

        return metrics

    def run_checks(self, df):
        """运行所有检查"""
        completeness = self.check_completeness(df)
        accuracy = self.check_accuracy(df)
        consistency = self.check_consistency(df)

        self.metrics = {
            'completeness': completeness,
            'accuracy': accuracy,
            'consistency': consistency
        }

        return self.metrics

    def alert(self, threshold=0.05):
        """告警"""
        for category, metrics in self.metrics.items():
            for metric_name, value in metrics.items():
                if value < threshold:
                    print(f"ALERT: {category}.{metric_name} = {value}")

8.5 练习题

基础题

  1. 选择题
  2. 数据质量不包括哪个维度?

    • A. 准确性
    • B. 完整性
    • C. 美观性
    • D. 一致性
  3. 简答题

  4. 解释数据质量的维度。
  5. 说明Great Expectations的核心概念。

进阶题

  1. 实践题
  2. 使用GE验证数据质量。
  3. 使用Deequ检查大数据质量。
  4. 实施数据质量监控。

  5. 设计题

  6. 设计一个数据质量体系。
  7. 设计一个数据质量监控平台。

答案

1. 选择题答案

  1. C(数据质量不包括美观性)

2. 简答题答案

数据质量的维度: - 准确性 - 完整性 - 一致性 - 及时性

Great Expectations的核心概念: - Expectation、Suite、Validation Result

3. 实践题答案

参见8.2-8.4节的示例。

4. 设计题答案

参见8.1-8.4节的体系设计。

8.6 面试准备

大厂面试题

字节跳动

  1. 解释数据质量的维度。
  2. 如何设计数据质量规则?
  3. 如何监控数据质量?
  4. 如何处理数据质量问题?

腾讯

  1. Great Expectations和Deequ的区别是什么?
  2. 如何设计数据质量监控?
  3. 如何实施数据质量治理?
  4. 如何设计数据质量告警?

阿里云

  1. 数据质量的最佳实践是什么?
  2. 如何设计数据质量自动化?
  3. 如何处理数据质量的历史对比?
  4. 如何设计数据质量报告?

📚 参考资料

🎯 本章小结

本章深入讲解了数据质量,包括:

  1. 数据质量的核心概念
  2. Great Expectations的使用
  3. Deequ数据验证
  4. 数据质量监控

通过本章学习,你掌握了数据质量的核心技术,能够实施数据质量保证体系。下一章将深入学习数据血缘。