跳转至

第03章 ETL流程设计

ETL流程设计

📚 章节概述

本章将深入讲解ETL流程设计,包括ETL/ELT概念、数据抽取、转换、加载等。通过本章学习,你将能够设计和实现高效的ETL流程。

🎯 学习目标

完成本章后,你将能够:

  1. 理解ETL/ELT的核心概念
  2. 掌握数据抽取技术
  3. 了解数据转换方法
  4. 掌握数据加载策略
  5. 能够设计和实现ETL流程

3.1 ETL概述

3.1.1 什么是ETL

ETL是Extract(抽取)、Transform(转换)、Load(加载)的缩写。

ETL流程

Text Only
数据源 → 抽取 → 转换 → 加载 → 目标系统

3.1.2 ETL vs ELT

特性 ETL ELT
转换位置 ETL服务器 目标系统
性能 需要中间服务器 利用目标系统性能
灵活性 较低 较高
适用场景 传统数据仓库 云数据仓库

3.2 数据抽取

3.2.1 抽取方式

  1. 全量抽取
  2. 每次抽取所有数据
  3. 适合小数据量
  4. 简单但效率低

  5. 增量抽取

  6. 只抽取变化的数据
  7. 适合大数据量
  8. 需要变化检测机制

3.2.2 抽取实现

Python
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import text

# 允许的表名白名单(防止SQL注入)
ALLOWED_TABLES = {'users', 'orders', 'products', 'events'}

def full_extract(source_db, table_name):
    """全量抽取

    安全说明:table_name 不能参数化,因此使用白名单验证防止SQL注入
    """
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"不允许的表名: {table_name}")
    # 经白名单验证后的标识符是安全的
    query = f"SELECT * FROM {table_name}"
    df = pd.read_sql(query, source_db)
    return df

def incremental_extract(source_db, table_name, last_extract_time):
    """增量抽取

    安全说明:table_name 用白名单验证,时间条件用参数化查询
    """
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"不允许的表名: {table_name}")
    # table_name 经白名单验证,last_time 使用参数化查询
    query = text(f"SELECT * FROM {table_name} WHERE updated_at > :last_time")
    df = pd.read_sql(query, source_db, params={"last_time": last_extract_time})
    return df

# 使用示例
# source_db = create_engine('mysql+pymysql://user:pass@host/db')
# df = full_extract(source_db, 'users')
# df = incremental_extract(source_db, 'users', '2024-01-01 00:00:00')

3.3 数据转换

3.3.1 转换类型

  1. 数据清洗
  2. 去重
  3. 填充缺失值
  4. 纠正错误数据

  5. 数据转换

  6. 格式转换
  7. 数据标准化
  8. 数据聚合

  9. 数据丰富

  10. 关联其他数据源
  11. 计算衍生字段
  12. 数据补全

3.3.2 转换实现

Python
def clean_data(df):
    """数据清洗"""
    # 去重
    df = df.drop_duplicates()

    # 填充缺失值
    df['age'].fillna(df['age'].mean(), inplace=True)
    df['salary'].fillna(df['salary'].median(), inplace=True)

    # 纠正错误数据
    df['age'] = df['age'].clip(0, 120)

    return df

def transform_data(df):
    """数据转换"""
    # 格式转换
    df['date'] = pd.to_datetime(df['date'])

    # 数据标准化
    df['salary_normalized'] = (df['salary'] - df['salary'].mean()) / df['salary'].std()

    # 数据聚合
    daily_stats = df.groupby('date').agg({
        'sales': 'sum',
        'orders': 'count'
    }).reset_index()

    return daily_stats

def enrich_data(df, reference_df):
    """数据丰富"""
    # 关联其他数据源
    enriched_df = df.merge(reference_df, on='product_id', how='left')

    # 计算衍生字段
    enriched_df['total_price'] = enriched_df['quantity'] * enriched_df['price']

    return enriched_df

3.4 数据加载

3.4.1 加载策略

  1. 全量加载
  2. 删除目标表数据
  3. 加载所有数据
  4. 适合小数据量

  5. 增量加载

  6. 只加载新数据
  7. 追加或更新
  8. 适合大数据量

3.4.2 加载实现

Python
import sqlalchemy
from sqlalchemy import text

# 允许加载的目标表白名单(防止SQL注入)
ALLOWED_TABLES = {"dim_users", "dim_products", "fact_orders", "fact_sales"}

def _validate_table_name(table_name: str) -> str:
    """校验表名是否在白名单中,防止SQL注入"""
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"非法表名: {table_name},允许的表: {ALLOWED_TABLES}")
    return table_name

def full_load(df, engine, table_name):
    """全量加载"""
    _validate_table_name(table_name)
    # 删除目标表数据
    with engine.connect() as conn:
        conn.execute(text(f"TRUNCATE TABLE {table_name}"))
        conn.commit()

    # 加载数据
    df.to_sql(table_name, engine, if_exists='append', index=False)
    print(f"全量加载完成,共加载 {len(df)} 条记录")

def incremental_load(df, engine, table_name, key_columns):
    """增量加载(基于临时表 + MERGE 策略)"""
    _validate_table_name(table_name)
    # 将新数据写入临时表
    temp_table = f"{table_name}_staging"
    df.to_sql(temp_table, engine, if_exists='replace', index=False)

    # 构建 MERGE 语句(以 PostgreSQL 为例)
    key_condition = ' AND '.join([f"t.{col} = s.{col}" for col in key_columns])
    update_cols = [col for col in df.columns if col not in key_columns]
    set_clause = ', '.join([f"{col} = s.{col}" for col in update_cols])
    all_cols = ', '.join(df.columns)
    source_cols = ', '.join([f"s.{col}" for col in df.columns])

    merge_sql = f"""
        INSERT INTO {table_name} ({all_cols})
        SELECT {source_cols} FROM {temp_table} s
        ON CONFLICT ({', '.join(key_columns)})
        DO UPDATE SET {set_clause}
    """

    with engine.connect() as conn:
        conn.execute(text(merge_sql))
        conn.execute(text(f"DROP TABLE IF EXISTS {temp_table}"))
        conn.commit()

    print(f"增量加载完成,共处理 {len(df)} 条记录")

3.5 完整ETL流程

Python
class ETLProcess:
    """数据管道流程类"""

    def __init__(self, source_engine, target_engine):
        self.source_engine = source_engine
        self.target_engine = target_engine

    def extract(self):
        """抽取数据"""
        print("开始抽取数据...")
        df = pd.read_sql("SELECT * FROM source_table", self.source_engine)
        print(f"抽取完成,共 {len(df)} 条记录")
        return df

    def transform(self, df):
        """转换数据"""
        print("开始转换数据...")
        df = clean_data(df)
        print("转换完成")
        return df

    def load(self, df):
        """加载数据"""
        print("开始加载数据...")
        full_load(df, self.target_engine, 'target_table')
        print("加载完成")

    def run(self):
        """运行ETL流程"""
        df = self.extract()
        df = self.transform(df)
        self.load(df)
        print("ETL流程完成")

# 使用示例
# from sqlalchemy import create_engine
# source_engine = create_engine('postgresql://user:pass@host/source_db')
# target_engine = create_engine('postgresql://user:pass@host/target_db')
# etl = ETLProcess(source_engine, target_engine)
# etl.run()

3.6 练习题

基础题

  1. 选择题
  2. ETL中的T代表什么?

    • A. Transform
    • B. Transfer
    • C. Transport
    • D. Translate
  3. 简答题

  4. 解释ETL和ELT的区别。
  5. 说明数据抽取的方式。

进阶题

  1. 实践题
  2. 设计一个ETL流程。
  3. 实现数据抽取和转换。
  4. 实现数据加载。

  5. 设计题

  6. 设计一个增量ETL流程。
  7. 设计一个实时ETL流程。

答案

1. 选择题答案

  1. A(ETL中的T代表Transform)

2. 简答题答案

ETL和ELT的区别: - ETL:转换在ETL服务器进行 - ELT:转换在目标系统进行

数据抽取的方式: - 全量抽取 - 增量抽取

3. 实践题答案

参见3.2-3.5节的示例。

4. 设计题答案

参见3.2-3.5节的示例。

3.7 面试准备

大厂面试题

字节跳动

  1. 解释ETL流程的三个阶段。
  2. 如何设计增量ETL?
  3. 如何处理ETL中的错误?
  4. 如何优化ETL性能?

腾讯

  1. 数据转换的最佳实践是什么?
  2. 如何设计容错的ETL流程?
  3. 如何监控ETL流程?
  4. 如何设计ETL调度?

阿里云

  1. ETL工具的选择标准是什么?
  2. 如何设计实时ETL?
  3. 如何处理数据质量问题?
  4. 如何设计ETL版本控制?

📚 参考资料

  • 《ETL Best Practices》
  • 《Data Integration Blueprint》
  • Apache Airflow文档
  • 《Data Pipeline》

🎯 本章小结

本章深入讲解了ETL流程设计,包括:

  1. ETL/ELT的核心概念
  2. 数据抽取技术
  3. 数据转换方法
  4. 数据加载策略
  5. 完整的ETL流程实现

通过本章学习,你掌握了ETL流程设计的核心技术,能够设计和实现高效的ETL流程。下一章将深入学习数据仓库架构。