跳转至

第12章 数据管道

数据管道

📚 章节概述

本章将深入讲解数据管道,包括Airflow、dbt、调度系统等。通过本章学习,你将能够构建高效的数据管道。

🎯 学习目标

完成本章后,你将能够:

  1. 理解数据管道的核心概念
  2. 掌握Airflow的使用
  3. 了解dbt数据转换
  4. 掌握调度系统
  5. 能够构建生产级的数据管道

12.1 数据管道概述

12.1.1 什么是数据管道

数据管道是数据从源系统到目标系统的自动化流程。

数据管道价值

  1. 自动化
  2. 自动化数据流动
  3. 减少人工干预
  4. 提高效率

  5. 可靠性

  6. 错误处理
  7. 重试机制
  8. 监控告警

  9. 可扩展性

  10. 水平扩展
  11. 弹性伸缩
  12. 资源优化

12.1.2 数据管道架构

Text Only
┌─────────────────────────────────────────────────────┐
│                   数据管道架构                        │
├─────────────────────────────────────────────────────┤
│  数据源层                                          │
│  ├─ 数据库                                          │
│  ├─ API                                             │
│  ├─ 文件系统                                          │
│  └─ 消息队列                                        │
├─────────────────────────────────────────────────────┤
│  数据处理层                                          │
│  ├─ 数据抽取                                        │
│  ├─ 数据转换                                        │
│  ├─ 数据清洗                                        │
│  └─ 数据丰富                                        │
├─────────────────────────────────────────────────────┤
│  数据存储层                                          │
│  ├─ 数据仓库                                          │
│  ├─ 数据湖                                            │
│  ├─ 数据集市                                          │
│  └─ 数据服务                                          │
├─────────────────────────────────────────────────────┤
│  数据服务层                                          │
│  ├─ API服务                                          │
│  ├─ BI工具                                            │
│  ├─ 数据应用                                          │
│  └─ 数据推送                                        │
└─────────────────────────────────────────────────────┘

12.2 Apache Airflow

12.2.1 Airflow概述

Apache Airflow是开源的工作流调度平台,用于开发、调度和监控数据管道。

核心概念

  1. DAG(有向无环图)
  2. 工作流定义
  3. 任务依赖
  4. 调度执行

  5. Operator

  6. 任务执行器
  7. 数据库操作
  8. 文件操作

  9. Sensor

  10. 条件检查
  11. 外部触发
  12. 定时调度

12.2.2 Airflow使用

Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# 定义DAG
default_args = {
    'owner': 'data_team',
    'start_date': datetime(2024, 1, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'depends_on_past': False,
}

dag = DAG(
    'sales_pipeline',
    default_args=default_args,
    description='Daily sales ETL pipeline',
    schedule='@daily',
    catchup=False,
    tags=['sales', 'etl']
)

# 定义任务
def extract_sales():
    """抽取销售数据"""
    print("Extracting sales data...")
    # 实际的数据抽取逻辑
    return 'sales_data.csv'

def transform_sales(ti):
    """转换销售数据"""
    print("Transforming sales data...")
    # 实际的数据转换逻辑
    return 'transformed_sales.csv'

def load_sales(ti):
    """加载销售数据"""
    print("Loading sales data...")
    # 实际的数据加载逻辑
    return 'load_complete'

# 创建任务
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_sales,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_sales,
    dag=dag
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load_sales,
    dag=dag
)

# 设置任务依赖
extract_task >> transform_task >> load_task

12.3 dbt

12.3.1 dbt概述

dbt(data build tool)是数据转换工具,支持SQL和Python。

核心特性

  1. SQL优先
  2. SQL作为主要语言
  3. 易于学习
  4. 团队协作

  5. 版本控制

  6. 代码版本管理
  7. 变更追踪
  8. 回滚支持

  9. 测试驱动

  10. 数据测试
  11. 模式验证
  12. 质量保证

12.3.2 dbt使用

SQL
-- models/marts/product_sales.sql
{{ config(materialized='table') }}

SELECT
    product_id,
    SUM(quantity) as total_quantity,
    SUM(amount) as total_amount,
    COUNT(*) as order_count
FROM {{ source('raw', 'sales') }}
WHERE sale_date >= '{{ var("start_date", "2024-01-01") }}'
GROUP BY product_id  -- GROUP BY分组;HAVING过滤分组
Text Only
-- seeds/sales_seed.csv——dbt seed 使用 CSV 文件作为种子数据
product_id,customer_id,sale_date,quantity,amount
1,1,2024-01-01,10,100.0
2,2,2024-01-01,20,200.0
1,3,2024-01-02,15,150.0
YAML
# dbt_project.yml
name: 'sales_project'
version: '1.0.0'
config-version: 2

profile: 'sales'

model-paths: ["models"]
seed-paths: ["seeds"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  sales_project:
    +materialized: table
    +persist_docs:
      relation: true
      columns: true

seeds:
  sales_project:
    +quote_columns: false
    +column_types:
        product_id: integer
        customer_id: integer
        sale_date: date
        quantity: integer
        amount: numeric(10,2)

12.4 调度系统

12.4.1 调度概述

调度系统是自动化执行任务的系统。

调度类型

  1. 时间调度
  2. 定时执行
  3. 周期调度
  4. 一次性执行

  5. 事件调度

  6. 文件触发
  7. 消息触发
  8. API触发

  9. 依赖调度

  10. 任务依赖
  11. 串行执行
  12. 并行执行

12.4.2 调度实现

Python
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

class TaskScheduler:
    """任务调度器"""

    def __init__(self):
        self.scheduler = BackgroundScheduler()
        self.jobs = {}

    def add_job(self, job_id, func, trigger, **kwargs):  # *args接收任意位置参数;**kwargs接收任意关键字参数
        """添加任务"""
        self.scheduler.add_job(func, trigger, id=job_id, **kwargs)
        self.jobs[job_id] = func
        print(f"Job added: {job_id}")

    def remove_job(self, job_id):
        """移除任务"""
        self.scheduler.remove_job(job_id)
        if job_id in self.jobs:
            del self.jobs[job_id]
        print(f"Job removed: {job_id}")

    def list_jobs(self):
        """列出任务"""
        jobs = self.scheduler.get_jobs()
        for job in jobs:
            print(f"Job: {job.id}, Next run: {job.next_run_time}")

    def start(self):
        """启动调度器"""
        self.scheduler.start()
        print("Scheduler started")

    def shutdown(self):
        """关闭调度器"""
        self.scheduler.shutdown()
        print("Scheduler shutdown")

# 使用示例
scheduler = TaskScheduler()

def extract_data():
    """抽取数据"""
    print("Extracting data...")

def transform_data():
    """转换数据"""
    print("Transforming data...")

def load_data():
    """加载数据"""
    print("Loading data...")

# 添加任务
scheduler.add_job('extract', extract_data, 'interval', seconds=3600)
scheduler.add_job('transform', transform_data, 'interval', seconds=3600)
scheduler.add_job('load', load_data, 'interval', seconds=3600)

# 启动调度器
scheduler.start()

# 列出任务
scheduler.list_jobs()

12.5 练习题

基础题

  1. 选择题
  2. 数据管道的核心价值不包括什么?

    • A. 自动化
    • B. 可靠性
    • C. 手动操作
    • D. 可扩展性
  3. 简答题

  4. 解释数据管道的架构。
  5. 说明Airflow的核心概念。

进阶题

  1. 实践题
  2. 使用Airflow构建数据管道。
  3. 使用dbt实现数据转换。
  4. 实现调度系统。

  5. 设计题

  6. 设计一个实时数据管道。
  7. 设计一个批处理管道。

答案

1. 选择题答案

  1. C(数据管道的核心价值不包括手动操作)

2. 简答题答案

数据管道的架构: - 数据源层、数据处理层、数据存储层、数据服务层

Airflow的核心概念: - DAG、Operator、Sensor

3. 实践题答案

参见12.2-12.4节的示例。

4. 设计题答案

参见12.1-12.4节的架构设计。

12.6 面试准备

大厂面试题

字节跳动

  1. 解释数据管道的核心概念。
  2. Airflow和dbt的区别是什么?
  3. 如何设计数据管道?
  4. 如何优化管道性能?

腾讯

  1. 数据管道的最佳实践是什么?
  2. 如何设计容错机制?
  3. 如何监控数据管道?
  4. 如何设计管道的扩展性?

阿里云

  1. 调度系统的类型有哪些?
  2. 如何设计管道的版本控制?
  3. 如何处理管道的数据质量?
  4. 如何设计管道的安全?

📚 参考资料

🎯 本章小结

本章深入讲解了数据管道,包括:

  1. 数据管道的核心概念
  2. Apache Airflow的使用
  3. dbt数据转换
  4. 调度系统

通过本章学习,你掌握了数据管道的核心技术,能够构建高效的数据管道。下一章将深入学习大数据处理。