第03章 ETL流程设计¶
📚 章节概述¶
本章将深入讲解ETL流程设计,包括ETL/ELT概念、数据抽取、转换、加载等。通过本章学习,你将能够设计和实现高效的ETL流程。
🎯 学习目标¶
完成本章后,你将能够:
- 理解ETL/ELT的核心概念
- 掌握数据抽取技术
- 了解数据转换方法
- 掌握数据加载策略
- 能够设计和实现ETL流程
3.1 ETL概述¶
3.1.1 什么是ETL¶
ETL是Extract(抽取)、Transform(转换)、Load(加载)的缩写。
ETL流程¶
3.1.2 ETL vs ELT¶
| 特性 | ETL | ELT |
|---|---|---|
| 转换位置 | ETL服务器 | 目标系统 |
| 性能 | 需要中间服务器 | 利用目标系统性能 |
| 灵活性 | 较低 | 较高 |
| 适用场景 | 传统数据仓库 | 云数据仓库 |
3.2 数据抽取¶
3.2.1 抽取方式¶
- 全量抽取
- 每次抽取所有数据
- 适合小数据量
-
简单但效率低
-
增量抽取
- 只抽取变化的数据
- 适合大数据量
- 需要变化检测机制
3.2.2 抽取实现¶
Python
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import text
# 允许的表名白名单(防止SQL注入)
ALLOWED_TABLES = {'users', 'orders', 'products', 'events'}
def full_extract(source_db, table_name):
"""全量抽取
安全说明:table_name 不能参数化,因此使用白名单验证防止SQL注入
"""
if table_name not in ALLOWED_TABLES:
raise ValueError(f"不允许的表名: {table_name}")
# 经白名单验证后的标识符是安全的
query = f"SELECT * FROM {table_name}"
df = pd.read_sql(query, source_db)
return df
def incremental_extract(source_db, table_name, last_extract_time):
"""增量抽取
安全说明:table_name 用白名单验证,时间条件用参数化查询
"""
if table_name not in ALLOWED_TABLES:
raise ValueError(f"不允许的表名: {table_name}")
# table_name 经白名单验证,last_time 使用参数化查询
query = text(f"SELECT * FROM {table_name} WHERE updated_at > :last_time")
df = pd.read_sql(query, source_db, params={"last_time": last_extract_time})
return df
# 使用示例
# source_db = create_engine('mysql+pymysql://user:pass@host/db')
# df = full_extract(source_db, 'users')
# df = incremental_extract(source_db, 'users', '2024-01-01 00:00:00')
3.3 数据转换¶
3.3.1 转换类型¶
- 数据清洗
- 去重
- 填充缺失值
-
纠正错误数据
-
数据转换
- 格式转换
- 数据标准化
-
数据聚合
-
数据丰富
- 关联其他数据源
- 计算衍生字段
- 数据补全
3.3.2 转换实现¶
Python
def clean_data(df):
"""数据清洗"""
# 去重
df = df.drop_duplicates()
# 填充缺失值
df['age'].fillna(df['age'].mean(), inplace=True)
df['salary'].fillna(df['salary'].median(), inplace=True)
# 纠正错误数据
df['age'] = df['age'].clip(0, 120)
return df
def transform_data(df):
"""数据转换"""
# 格式转换
df['date'] = pd.to_datetime(df['date'])
# 数据标准化
df['salary_normalized'] = (df['salary'] - df['salary'].mean()) / df['salary'].std()
# 数据聚合
daily_stats = df.groupby('date').agg({
'sales': 'sum',
'orders': 'count'
}).reset_index()
return daily_stats
def enrich_data(df, reference_df):
"""数据丰富"""
# 关联其他数据源
enriched_df = df.merge(reference_df, on='product_id', how='left')
# 计算衍生字段
enriched_df['total_price'] = enriched_df['quantity'] * enriched_df['price']
return enriched_df
3.4 数据加载¶
3.4.1 加载策略¶
- 全量加载
- 删除目标表数据
- 加载所有数据
-
适合小数据量
-
增量加载
- 只加载新数据
- 追加或更新
- 适合大数据量
3.4.2 加载实现¶
Python
import sqlalchemy
from sqlalchemy import text
# 允许加载的目标表白名单(防止SQL注入)
ALLOWED_TABLES = {"dim_users", "dim_products", "fact_orders", "fact_sales"}
def _validate_table_name(table_name: str) -> str:
"""校验表名是否在白名单中,防止SQL注入"""
if table_name not in ALLOWED_TABLES:
raise ValueError(f"非法表名: {table_name},允许的表: {ALLOWED_TABLES}")
return table_name
def full_load(df, engine, table_name):
"""全量加载"""
_validate_table_name(table_name)
# 删除目标表数据
with engine.connect() as conn:
conn.execute(text(f"TRUNCATE TABLE {table_name}"))
conn.commit()
# 加载数据
df.to_sql(table_name, engine, if_exists='append', index=False)
print(f"全量加载完成,共加载 {len(df)} 条记录")
def incremental_load(df, engine, table_name, key_columns):
"""增量加载(基于临时表 + MERGE 策略)"""
_validate_table_name(table_name)
# 将新数据写入临时表
temp_table = f"{table_name}_staging"
df.to_sql(temp_table, engine, if_exists='replace', index=False)
# 构建 MERGE 语句(以 PostgreSQL 为例)
key_condition = ' AND '.join([f"t.{col} = s.{col}" for col in key_columns])
update_cols = [col for col in df.columns if col not in key_columns]
set_clause = ', '.join([f"{col} = s.{col}" for col in update_cols])
all_cols = ', '.join(df.columns)
source_cols = ', '.join([f"s.{col}" for col in df.columns])
merge_sql = f"""
INSERT INTO {table_name} ({all_cols})
SELECT {source_cols} FROM {temp_table} s
ON CONFLICT ({', '.join(key_columns)})
DO UPDATE SET {set_clause}
"""
with engine.connect() as conn:
conn.execute(text(merge_sql))
conn.execute(text(f"DROP TABLE IF EXISTS {temp_table}"))
conn.commit()
print(f"增量加载完成,共处理 {len(df)} 条记录")
3.5 完整ETL流程¶
Python
class ETLProcess:
"""数据管道流程类"""
def __init__(self, source_engine, target_engine):
self.source_engine = source_engine
self.target_engine = target_engine
def extract(self):
"""抽取数据"""
print("开始抽取数据...")
df = pd.read_sql("SELECT * FROM source_table", self.source_engine)
print(f"抽取完成,共 {len(df)} 条记录")
return df
def transform(self, df):
"""转换数据"""
print("开始转换数据...")
df = clean_data(df)
print("转换完成")
return df
def load(self, df):
"""加载数据"""
print("开始加载数据...")
full_load(df, self.target_engine, 'target_table')
print("加载完成")
def run(self):
"""运行ETL流程"""
df = self.extract()
df = self.transform(df)
self.load(df)
print("ETL流程完成")
# 使用示例
# from sqlalchemy import create_engine
# source_engine = create_engine('postgresql://user:pass@host/source_db')
# target_engine = create_engine('postgresql://user:pass@host/target_db')
# etl = ETLProcess(source_engine, target_engine)
# etl.run()
3.6 练习题¶
基础题¶
- 选择题
-
ETL中的T代表什么?
- A. Transform
- B. Transfer
- C. Transport
- D. Translate
-
简答题
- 解释ETL和ELT的区别。
- 说明数据抽取的方式。
进阶题¶
- 实践题
- 设计一个ETL流程。
- 实现数据抽取和转换。
-
实现数据加载。
-
设计题
- 设计一个增量ETL流程。
- 设计一个实时ETL流程。
答案¶
1. 选择题答案¶
- A(ETL中的T代表Transform)
2. 简答题答案¶
ETL和ELT的区别: - ETL:转换在ETL服务器进行 - ELT:转换在目标系统进行
数据抽取的方式: - 全量抽取 - 增量抽取
3. 实践题答案¶
参见3.2-3.5节的示例。
4. 设计题答案¶
参见3.2-3.5节的示例。
3.7 面试准备¶
大厂面试题¶
字节跳动¶
- 解释ETL流程的三个阶段。
- 如何设计增量ETL?
- 如何处理ETL中的错误?
- 如何优化ETL性能?
腾讯¶
- 数据转换的最佳实践是什么?
- 如何设计容错的ETL流程?
- 如何监控ETL流程?
- 如何设计ETL调度?
阿里云¶
- ETL工具的选择标准是什么?
- 如何设计实时ETL?
- 如何处理数据质量问题?
- 如何设计ETL版本控制?
📚 参考资料¶
- 《ETL Best Practices》
- 《Data Integration Blueprint》
- Apache Airflow文档
- 《Data Pipeline》
🎯 本章小结¶
本章深入讲解了ETL流程设计,包括:
- ETL/ELT的核心概念
- 数据抽取技术
- 数据转换方法
- 数据加载策略
- 完整的ETL流程实现
通过本章学习,你掌握了ETL流程设计的核心技术,能够设计和实现高效的ETL流程。下一章将深入学习数据仓库架构。