第12章 数据管道¶
📚 章节概述¶
本章将深入讲解数据管道,包括Airflow、dbt、调度系统等。通过本章学习,你将能够构建高效的数据管道。
🎯 学习目标¶
完成本章后,你将能够:
- 理解数据管道的核心概念
- 掌握Airflow的使用
- 了解dbt数据转换
- 掌握调度系统
- 能够构建生产级的数据管道
12.1 数据管道概述¶
12.1.1 什么是数据管道¶
数据管道是数据从源系统到目标系统的自动化流程。
数据管道价值¶
- 自动化
- 自动化数据流动
- 减少人工干预
-
提高效率
-
可靠性
- 错误处理
- 重试机制
-
监控告警
-
可扩展性
- 水平扩展
- 弹性伸缩
- 资源优化
12.1.2 数据管道架构¶
Text Only
┌─────────────────────────────────────────────────────┐
│ 数据管道架构 │
├─────────────────────────────────────────────────────┤
│ 数据源层 │
│ ├─ 数据库 │
│ ├─ API │
│ ├─ 文件系统 │
│ └─ 消息队列 │
├─────────────────────────────────────────────────────┤
│ 数据处理层 │
│ ├─ 数据抽取 │
│ ├─ 数据转换 │
│ ├─ 数据清洗 │
│ └─ 数据丰富 │
├─────────────────────────────────────────────────────┤
│ 数据存储层 │
│ ├─ 数据仓库 │
│ ├─ 数据湖 │
│ ├─ 数据集市 │
│ └─ 数据服务 │
├─────────────────────────────────────────────────────┤
│ 数据服务层 │
│ ├─ API服务 │
│ ├─ BI工具 │
│ ├─ 数据应用 │
│ └─ 数据推送 │
└─────────────────────────────────────────────────────┘
12.2 Apache Airflow¶
12.2.1 Airflow概述¶
Apache Airflow是开源的工作流调度平台,用于开发、调度和监控数据管道。
核心概念¶
- DAG(有向无环图)
- 工作流定义
- 任务依赖
-
调度执行
-
Operator
- 任务执行器
- 数据库操作
-
文件操作
-
Sensor
- 条件检查
- 外部触发
- 定时调度
12.2.2 Airflow使用¶
Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# 定义DAG
default_args = {
'owner': 'data_team',
'start_date': datetime(2024, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
'depends_on_past': False,
}
dag = DAG(
'sales_pipeline',
default_args=default_args,
description='Daily sales ETL pipeline',
schedule='@daily',
catchup=False,
tags=['sales', 'etl']
)
# 定义任务
def extract_sales():
"""抽取销售数据"""
print("Extracting sales data...")
# 实际的数据抽取逻辑
return 'sales_data.csv'
def transform_sales(ti):
"""转换销售数据"""
print("Transforming sales data...")
# 实际的数据转换逻辑
return 'transformed_sales.csv'
def load_sales(ti):
"""加载销售数据"""
print("Loading sales data...")
# 实际的数据加载逻辑
return 'load_complete'
# 创建任务
extract_task = PythonOperator(
task_id='extract',
python_callable=extract_sales,
dag=dag
)
transform_task = PythonOperator(
task_id='transform',
python_callable=transform_sales,
dag=dag
)
load_task = PythonOperator(
task_id='load',
python_callable=load_sales,
dag=dag
)
# 设置任务依赖
extract_task >> transform_task >> load_task
12.3 dbt¶
12.3.1 dbt概述¶
dbt(data build tool)是数据转换工具,支持SQL和Python。
核心特性¶
- SQL优先
- SQL作为主要语言
- 易于学习
-
团队协作
-
版本控制
- 代码版本管理
- 变更追踪
-
回滚支持
-
测试驱动
- 数据测试
- 模式验证
- 质量保证
12.3.2 dbt使用¶
SQL
-- models/marts/product_sales.sql
{{ config(materialized='table') }}
SELECT
product_id,
SUM(quantity) as total_quantity,
SUM(amount) as total_amount,
COUNT(*) as order_count
FROM {{ source('raw', 'sales') }}
WHERE sale_date >= '{{ var("start_date", "2024-01-01") }}'
GROUP BY product_id -- GROUP BY分组;HAVING过滤分组
Text Only
-- seeds/sales_seed.csv——dbt seed 使用 CSV 文件作为种子数据
product_id,customer_id,sale_date,quantity,amount
1,1,2024-01-01,10,100.0
2,2,2024-01-01,20,200.0
1,3,2024-01-02,15,150.0
YAML
# dbt_project.yml
name: 'sales_project'
version: '1.0.0'
config-version: 2
profile: 'sales'
model-paths: ["models"]
seed-paths: ["seeds"]
target-path: "target"
clean-targets:
- "target"
- "dbt_packages"
models:
sales_project:
+materialized: table
+persist_docs:
relation: true
columns: true
seeds:
sales_project:
+quote_columns: false
+column_types:
product_id: integer
customer_id: integer
sale_date: date
quantity: integer
amount: numeric(10,2)
12.4 调度系统¶
12.4.1 调度概述¶
调度系统是自动化执行任务的系统。
调度类型¶
- 时间调度
- 定时执行
- 周期调度
-
一次性执行
-
事件调度
- 文件触发
- 消息触发
-
API触发
-
依赖调度
- 任务依赖
- 串行执行
- 并行执行
12.4.2 调度实现¶
Python
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
class TaskScheduler:
"""任务调度器"""
def __init__(self):
self.scheduler = BackgroundScheduler()
self.jobs = {}
def add_job(self, job_id, func, trigger, **kwargs): # *args接收任意位置参数;**kwargs接收任意关键字参数
"""添加任务"""
self.scheduler.add_job(func, trigger, id=job_id, **kwargs)
self.jobs[job_id] = func
print(f"Job added: {job_id}")
def remove_job(self, job_id):
"""移除任务"""
self.scheduler.remove_job(job_id)
if job_id in self.jobs:
del self.jobs[job_id]
print(f"Job removed: {job_id}")
def list_jobs(self):
"""列出任务"""
jobs = self.scheduler.get_jobs()
for job in jobs:
print(f"Job: {job.id}, Next run: {job.next_run_time}")
def start(self):
"""启动调度器"""
self.scheduler.start()
print("Scheduler started")
def shutdown(self):
"""关闭调度器"""
self.scheduler.shutdown()
print("Scheduler shutdown")
# 使用示例
scheduler = TaskScheduler()
def extract_data():
"""抽取数据"""
print("Extracting data...")
def transform_data():
"""转换数据"""
print("Transforming data...")
def load_data():
"""加载数据"""
print("Loading data...")
# 添加任务
scheduler.add_job('extract', extract_data, 'interval', seconds=3600)
scheduler.add_job('transform', transform_data, 'interval', seconds=3600)
scheduler.add_job('load', load_data, 'interval', seconds=3600)
# 启动调度器
scheduler.start()
# 列出任务
scheduler.list_jobs()
12.5 练习题¶
基础题¶
- 选择题
-
数据管道的核心价值不包括什么?
- A. 自动化
- B. 可靠性
- C. 手动操作
- D. 可扩展性
-
简答题
- 解释数据管道的架构。
- 说明Airflow的核心概念。
进阶题¶
- 实践题
- 使用Airflow构建数据管道。
- 使用dbt实现数据转换。
-
实现调度系统。
-
设计题
- 设计一个实时数据管道。
- 设计一个批处理管道。
答案¶
1. 选择题答案¶
- C(数据管道的核心价值不包括手动操作)
2. 简答题答案¶
数据管道的架构: - 数据源层、数据处理层、数据存储层、数据服务层
Airflow的核心概念: - DAG、Operator、Sensor
3. 实践题答案¶
参见12.2-12.4节的示例。
4. 设计题答案¶
参见12.1-12.4节的架构设计。
12.6 面试准备¶
大厂面试题¶
字节跳动¶
- 解释数据管道的核心概念。
- Airflow和dbt的区别是什么?
- 如何设计数据管道?
- 如何优化管道性能?
腾讯¶
- 数据管道的最佳实践是什么?
- 如何设计容错机制?
- 如何监控数据管道?
- 如何设计管道的扩展性?
阿里云¶
- 调度系统的类型有哪些?
- 如何设计管道的版本控制?
- 如何处理管道的数据质量?
- 如何设计管道的安全?
📚 参考资料¶
- Airflow文档:https://airflow.apache.org/docs/
- dbt文档:https://docs.getdbt.com/
- 《Data Pipeline》
- 《Workflow Scheduling》
🎯 本章小结¶
本章深入讲解了数据管道,包括:
- 数据管道的核心概念
- Apache Airflow的使用
- dbt数据转换
- 调度系统
通过本章学习,你掌握了数据管道的核心技术,能够构建高效的数据管道。下一章将深入学习大数据处理。