
Chapter 17: dbt Data Transformation

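📚 Chapter Overview

dbt (data build tool) is a widely adopted standard for modern data transformation. It is SQL-centric, implements the T (Transform) in ELT, and brings software engineering best practices to analytics work.

🎯 Learning Objectives

After completing this chapter, you will be able to:

  1. Understand dbt's core concepts and workflow
  2. Develop models across the staging, intermediate, and marts layers
  3. Use sources, ref(), and the DAG to manage dependencies
  4. Write data tests and use macros and Jinja templates
  5. Build production-grade dbt transformation projects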


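1. dbt Overview

1.1 What Is dbt

dbt is a data transformation framework that lets data analysts and data engineers write transformation logic in SQL while following software engineering best practices.

Core ideas:

  • SQL first: define each transformation as a SELECT statement; dbt generates the DDL/DML
  • Version control: all transformation logic lives in Git
  • Automated testing: declare test rules for your data
  • Documentation: generate data docs and lineage graphs automatically
  • Modularity: reuse code through ref() and macros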

1.2 dbt Core vs dbt Cloud

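Dimension | dbt Core | dbt Cloud
Deployment | Local / self-hosted | SaaS
Scheduling | Requires an external tool (e.g. Airflow) | Built-in scheduler
IDE | Command line + VS Code | Browser-based IDE
Cost | Open source, free | Paid, usage-based
CI/CD | Build your own | Built-in Slim CI
Semantic layer | MetricFlow (local CLI) | dbt Semantic Layer (built on MetricFlow)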

1.3 dbt Workflow

Text Only
Sources           dbt transformation        Consumers
┌────────────┐    ┌──────────────────┐      ┌──────────────┐
│ Databases  │    │ Source → Staging │      │ BI tools     │
│ Data lakes │ →  │ Staging → Marts  │  →   │ ML pipelines │
│ APIs       │    │ Tests + Docs     │      │ Applications │
└────────────┘    └──────────────────┘      └──────────────┘

EL (Extract & Load): handled by Fivetran/Airbyte
T (Transform):       handled by dbt

2. Core Concepts

2.1 Model

A model is the core unit of dbt: each model is a single SELECT statement saved as a .sql file.

SQL
-- models/staging/stg_orders.sql
-- dbt creates the table or view for you; no CREATE TABLE needed

SELECT
    id AS order_id,
    user_id AS customer_id,
    order_date,
    status,
    amount AS order_amount
FROM {{ source('raw', 'orders') }}
WHERE order_date IS NOT NULL

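Materializations:

Type | Description | Use case
view | Creates a view (the default) | Lightweight transforms; results need not be persisted
table | Creates a table | Results must be persisted or are referenced many times downstream
incremental | Inserts or updates only new/changed rows on each run | Large tables, append-heavy data
ephemeral | Inlined as a CTE; nothing is created in the warehouse | Intermediate logic reused by other models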
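To make the table concrete, here is a minimal sketch of what the view, table, and ephemeral materializations boil down to. It uses Python's built-in sqlite3 as a stand-in warehouse. The helpers `materialize` and `inline_ephemeral` are hypothetical. dbt's real materializations are adapter-specific macros that also manage temp relations, renames, and transactions.

```python
import sqlite3


def materialize(conn, name, select_sql, kind):
    """Wrap a model's SELECT in the DDL a view/table materialization would issue
    (hypothetical, heavily simplified)."""
    if kind == "view":
        conn.execute(f"DROP VIEW IF EXISTS {name}")
        conn.execute(f"CREATE VIEW {name} AS {select_sql}")
    elif kind == "table":
        conn.execute(f"DROP TABLE IF EXISTS {name}")
        conn.execute(f"CREATE TABLE {name} AS {select_sql}")
    else:
        raise ValueError(f"unsupported materialization: {kind}")


def inline_ephemeral(name, ephemeral_sql, downstream_sql):
    """Ephemeral models create no object: their SELECT is injected as a CTE."""
    return f"WITH {name} AS ({ephemeral_sql}) {downstream_sql}"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_orders (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO raw_orders VALUES (?, ?)",
                 [(1, 10.0), (2, 25.5), (None, 3.0)])

stg_sql = ("SELECT id AS order_id, amount AS order_amount "
           "FROM raw_orders WHERE id IS NOT NULL")
materialize(conn, "stg_orders_view", stg_sql, "view")
materialize(conn, "stg_orders_table", stg_sql, "table")

# sqlite_master records which kind of object each materialization created
objects = dict(conn.execute(
    "SELECT name, type FROM sqlite_master "
    "WHERE name IN ('stg_orders_view', 'stg_orders_table')"
).fetchall())
ephemeral_count = conn.execute(
    inline_ephemeral("stg_orders", stg_sql, "SELECT COUNT(*) FROM stg_orders")
).fetchone()[0]
print(objects, ephemeral_count)

# A view is re-evaluated at query time; a table keeps the rows from build time
conn.execute("INSERT INTO raw_orders VALUES (3, 7.0)")
view_rows = conn.execute("SELECT COUNT(*) FROM stg_orders_view").fetchone()[0]
table_rows = conn.execute("SELECT COUNT(*) FROM stg_orders_table").fetchone()[0]
print(view_rows, table_rows)  # 3 2
```

The last two lines show the trade-off in the table. The view picks up the new source row without a rebuild. The table only changes on the next `dbt run`.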

2.2 Project Structure

Text Only
my_dbt_project/
├── dbt_project.yml          # Project configuration
├── profiles.yml             # Connection profiles (usually in ~/.dbt/)
├── models/
│   ├── staging/             # Cleaning layer
│   │   ├── _staging_models.yml   # Source/model properties (docs, tests)
│   │   ├── stg_orders.sql
│   │   ├── stg_order_items.sql
│   │   ├── stg_customers.sql
│   │   └── stg_products.sql
│   ├── intermediate/        # Intermediate layer
│   │   └── int_order_items_enriched.sql
│   └── marts/               # Business-facing layer
│       ├── core/
│       │   ├── _core_models.yml
│       │   ├── dim_customers.sql
│       │   ├── dim_products.sql
│       │   └── fct_orders.sql
│       └── marketing/
│           └── mkt_customer_ltv.sql
├── tests/                   # Custom (singular) tests
│   └── assert_positive_order_amount.sql
├── macros/                  # Reusable SQL snippets
│   └── cents_to_dollars.sql
├── seeds/                   # CSV seed data
│   └── country_codes.csv
├── snapshots/               # SCD Type 2 snapshots
│   └── scd_customers.sql
└── analyses/                # Analytical queries (compiled, never materialized)
    └── revenue_analysis.sql

2.3 dbt_project.yml

YAML
name: 'my_project'
version: '1.0.0'
config-version: 2

profile: 'my_warehouse'

model-paths: ["models"]
test-paths: ["tests"]
macro-paths: ["macros"]
seed-paths: ["seeds"]
snapshot-paths: ["snapshots"]

models:
  my_project:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: analytics
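Configs in dbt_project.yml cascade down the folder tree, and a `config()` block inside a model file takes precedence over them. The minimal Python sketch below shows that resolution order. It assumes only these two layers; dbt also applies configs from property YAML files, which the sketch ignores. `PROJECT_MODELS` and `resolve_config` are hypothetical names.

```python
# Folder-level configs from dbt_project.yml above; "+" marks a config key
PROJECT_MODELS = {
    "my_project": {
        "staging": {"+materialized": "view", "+schema": "staging"},
        "intermediate": {"+materialized": "ephemeral"},
        "marts": {"+materialized": "table", "+schema": "analytics"},
    }
}


def resolve_config(project_models, model_path, in_file_config=None):
    """Walk from the project key down the model's folders. Deeper folders
    override shallower ones; an in-file config() overrides everything."""
    resolved = {}
    node = project_models
    for folder in model_path.split("/")[:-1]:  # drop the file name
        node = node.get(folder, {})
        resolved.update({k[1:]: v for k, v in node.items() if k.startswith("+")})
    resolved.update(in_file_config or {})
    return resolved


stg = resolve_config(PROJECT_MODELS, "my_project/staging/stg_orders.sql")
fct = resolve_config(
    PROJECT_MODELS,
    "my_project/marts/core/fct_orders.sql",
    {"materialized": "incremental", "unique_key": "order_id"},  # from {{ config(...) }}
)
print(stg)
print(fct)
```

In this sketch, fct_orders becomes incremental because its in-file config wins. It still inherits `schema: analytics` from the marts folder, since the core/ subfolder sets nothing of its own.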

3. Model Development

3.1 Staging Model

The staging layer maps 1:1 to source tables and handles cleaning: renaming, type casting, and basic filtering.

SQL
-- models/staging/stg_orders.sql

WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),

renamed AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        CAST(order_date AS DATE) AS order_date,
        CAST(amount AS DECIMAL(10, 2)) AS order_amount,
        LOWER(status) AS order_status,
        created_at,
        updated_at
    FROM source
    WHERE id IS NOT NULL  -- drop rows without a primary key
)

SELECT * FROM renamed

3.2 Intermediate Model

The intermediate layer handles joins across sources and more complex business logic.

SQL
-- models/intermediate/int_order_items_enriched.sql

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

order_items AS (
    SELECT * FROM {{ ref('stg_order_items') }}
),

products AS (
    SELECT * FROM {{ ref('stg_products') }}
)

SELECT
    oi.order_item_id,
    oi.order_id,
    o.customer_id,
    o.order_date,
    p.product_name,
    p.category,
    oi.quantity,
    oi.unit_price,
    oi.quantity * oi.unit_price AS line_total
FROM order_items oi
JOIN orders o ON oi.order_id = o.order_id
JOIN products p ON oi.product_id = p.product_id

3.3 Marts Models (Facts and Dimensions)

SQL
-- models/marts/core/fct_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}

WITH order_items AS (
    SELECT * FROM {{ ref('int_order_items_enriched') }}
)

SELECT
    order_id,
    customer_id,
    order_date,
    COUNT(DISTINCT order_item_id) AS item_count,
    SUM(line_total) AS order_total,
    MIN(line_total) AS min_item_total,
    MAX(line_total) AS max_item_total
FROM order_items

{% if is_incremental() %}
WHERE order_date >= (SELECT MAX(order_date) FROM {{ this }})  -- >= reprocesses the latest day; unique_key turns those rows into updates, not duplicates
{% endif %}

GROUP BY order_id, customer_id, order_date
SQL
-- models/marts/core/dim_customers.sql

WITH customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

orders AS (
    SELECT * FROM {{ ref('fct_orders') }}
),

customer_orders AS (
    SELECT
        customer_id,
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS most_recent_order_date,
        COUNT(order_id) AS total_orders,
        SUM(order_total) AS lifetime_value
    FROM orders
    GROUP BY customer_id
)

SELECT
    c.customer_id,
    c.customer_name,
    c.email,
    c.signup_date,
    co.first_order_date,
    co.most_recent_order_date,
    COALESCE(co.total_orders, 0) AS total_orders,
    COALESCE(co.lifetime_value, 0) AS lifetime_value,
    CASE
        WHEN co.lifetime_value >= 1000 THEN 'high'
        WHEN co.lifetime_value >= 100 THEN 'medium'
        ELSE 'low'
    END AS customer_tier
FROM customers c
LEFT JOIN customer_orders co ON c.customer_id = co.customer_id

4. Sources and Refs

4.1 Defining Sources

YAML
# models/staging/_staging_models.yml
version: 2

sources:
  - name: raw
    description: "Raw data loaded by the EL tool"
    database: raw_db
    schema: public
    tables:
      - name: orders
        description: "Orders"
        loaded_at_field: _etl_loaded_at
        freshness:
          warn_after: { count: 12, period: hour }
          error_after: { count: 24, period: hour }
        columns:
          - name: id
            description: "Order ID"
            data_tests:   # `data_tests` is the dbt 1.8+ key; older versions use `tests`
              - unique
              - not_null
      - name: customers
        description: "Customers"
      - name: products
        description: "Products"
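`dbt source freshness` looks up the newest `_etl_loaded_at` value and compares its age with the `warn_after` / `error_after` thresholds above. The decision rule can be sketched in Python as follows. `freshness_status` is a hypothetical helper, and the strict `>` comparison at each threshold is an assumption; check the dbt docs for the exact boundary behaviour.

```python
from datetime import datetime, timedelta, timezone


def freshness_status(max_loaded_at, now, warn_after, error_after):
    """Classify a source like `dbt source freshness` reports it: the age of the
    newest loaded_at_field value is compared with the two thresholds."""
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"


WARN_AFTER = timedelta(hours=12)   # warn_after: {count: 12, period: hour}
ERROR_AFTER = timedelta(hours=24)  # error_after: {count: 24, period: hour}
now = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)

for hours_old in (3, 13, 30):
    status = freshness_status(now - timedelta(hours=hours_old), now, WARN_AFTER, ERROR_AFTER)
    print(hours_old, status)  # 3 pass / 13 warn / 30 error
```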

4.2 The ref Function

SQL
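-- ref() declares dependencies between models; dbt builds the DAG from them

-- Reference a model in the same project
SELECT * FROM {{ ref('stg_orders') }}

-- Reference a model from another package or project (two-argument form);
-- 'my_package' and 'dim_dates' are hypothetical names. Macros such as
-- dbt_utils.date_spine are called directly, not through ref()
SELECT * FROM {{ ref('my_package', 'dim_dates') }}

-- source() references a raw source table declared in YAML
SELECT * FROM {{ source('raw', 'orders') }}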

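4.3 DAG (Directed Acyclic Graph)

Text Only
dbt builds the DAG automatically from ref() and source() calls:

source.raw.orders      ──→ stg_orders      ──┐
source.raw.order_items ──→ stg_order_items ──┼──→ int_order_items_enriched ──→ fct_orders
source.raw.products    ──→ stg_products    ──┘
source.raw.customers   ──→ stg_customers   ──→ dim_customers ←── fct_orders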
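dbt resolves ref() while parsing the project by rendering the Jinja, not by pattern matching. The ordering idea itself can still be sketched with Python's standard `graphlib`. The regex-based `build_dag` below is a hypothetical simplification that only understands one-argument `ref()` calls, and the model bodies are abbreviated.

```python
import re
from graphlib import CycleError, TopologicalSorter

# Only matches the one-argument form {{ ref('model_name') }}
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*'([^']+)'\s*\)\s*\}\}")


def build_dag(models):
    """Map each model name to the set of models it ref()s (its parents)."""
    return {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}


# Abbreviated model bodies from this chapter; source() calls add no model edges
MODELS = {
    "stg_orders": "SELECT * FROM {{ source('raw', 'orders') }}",
    "stg_order_items": "SELECT * FROM {{ source('raw', 'order_items') }}",
    "stg_products": "SELECT * FROM {{ source('raw', 'products') }}",
    "stg_customers": "SELECT * FROM {{ source('raw', 'customers') }}",
    "int_order_items_enriched": (
        "SELECT ... FROM {{ ref('stg_order_items') }} "
        "JOIN {{ ref('stg_orders') }} JOIN {{ ref('stg_products') }}"
    ),
    "fct_orders": "SELECT ... FROM {{ ref('int_order_items_enriched') }}",
    "dim_customers": "SELECT ... {{ ref('stg_customers') }} ... {{ ref('fct_orders') }}",
}

dag = build_dag(MODELS)
run_order = list(TopologicalSorter(dag).static_order())
print(run_order)  # every model appears after all of its parents

# A circular ref() chain has no valid order; dbt rejects such projects as well
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
    cycle_detected = False
except CycleError:
    cycle_detected = True
print("cycle detected:", cycle_detected)
```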

5. Data Tests

5.1 Built-in Generic Tests

YAML
# models/staging/_staging_models.yml
version: 2

models:
  - name: stg_orders
    description: "Cleaned order data"
    columns:
      - name: order_id
        description: "Unique order identifier"
        data_tests:
          - unique
          - not_null
      - name: order_status
        description: "Order status"
        data_tests:
          - accepted_values:
              values: ['pending', 'shipped', 'completed', 'cancelled']
      - name: customer_id
        description: "Customer ID"
        data_tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
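Each generic test compiles to a SELECT that returns the failing rows. dbt marks the test as failed when that query returns anything. The SQL strings below approximate those compiled queries; dbt's real ones differ in detail. The helper names and sample rows are hypothetical, and sqlite3 stands in for the warehouse. The sample data breaks each rule exactly once.

```python
import sqlite3


def unique_sql(model, col):
    return (f"SELECT {col}, COUNT(*) AS n FROM {model} "
            f"WHERE {col} IS NOT NULL GROUP BY {col} HAVING COUNT(*) > 1")


def not_null_sql(model, col):
    return f"SELECT * FROM {model} WHERE {col} IS NULL"


def accepted_values_sql(model, col, values):
    quoted = ", ".join(f"'{v}'" for v in values)
    return f"SELECT {col} FROM {model} WHERE {col} NOT IN ({quoted})"


def relationships_sql(model, col, to_model, field):
    # Child rows whose key has no match in the parent model (orphaned foreign keys)
    return (f"SELECT c.{col} FROM {model} c LEFT JOIN {to_model} p "
            f"ON c.{col} = p.{field} WHERE c.{col} IS NOT NULL AND p.{field} IS NULL")


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stg_customers (customer_id INTEGER);
INSERT INTO stg_customers VALUES (1), (2);
CREATE TABLE stg_orders (order_id INTEGER, customer_id INTEGER, order_status TEXT);
INSERT INTO stg_orders VALUES
  (10, 1, 'pending'), (11, 2, 'shipped'), (11, 3, 'returned'), (NULL, 1, 'completed');
""")

checks = {
    "unique_order_id": unique_sql("stg_orders", "order_id"),
    "not_null_order_id": not_null_sql("stg_orders", "order_id"),
    "accepted_values_order_status": accepted_values_sql(
        "stg_orders", "order_status", ["pending", "shipped", "completed", "cancelled"]),
    "relationships_customer_id": relationships_sql(
        "stg_orders", "customer_id", "stg_customers", "customer_id"),
}
# A test fails when its query returns one or more rows
failures = {name: len(conn.execute(sql).fetchall()) for name, sql in checks.items()}
print(failures)
```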

5.2 Custom Data Tests

SQL
-- tests/assert_positive_order_amount.sql
-- Singular test: the test fails if this query returns any rows

SELECT
    order_id,
    order_total
FROM {{ ref('fct_orders') }}
WHERE order_total <= 0
SQL
-- tests/generic/test_is_positive.sql
{% test is_positive(model, column_name) %}

SELECT *
FROM {{ model }}
WHERE {{ column_name }} <= 0  -- rows that are not strictly positive fail the test

{% endtest %}
YAML
# Apply the custom generic test to a column
models:
  - name: fct_orders
    columns:
      - name: order_total
        data_tests:
          - is_positive

5.3 Running Tests

Bash
# Run all tests
dbt test

# Run the tests attached to one model
dbt test --select stg_orders

# Check source freshness
dbt source freshness

# Run only one type of test (singular here; generic is the other type)
dbt test --select "test_type:singular"

6. Macros and Jinja

6.1 Jinja Template Basics

SQL
-- How Jinja is used in dbt

-- Variables
{% set payment_methods = ['credit_card', 'bank_transfer', 'gift_card'] %}

-- Loops
SELECT
    order_id,
    {% for method in payment_methods %}
    SUM(CASE WHEN payment_method = '{{ method }}' THEN amount ELSE 0 END)
        AS {{ method }}_amount
    {% if not loop.last %},{% endif %}
    {% endfor %}
FROM {{ ref('stg_payments') }}
GROUP BY order_id

-- Conditionals (e.g. scan only recent data when the target is dev)
SELECT *
FROM {{ ref('fct_orders') }}
{% if target.name == 'dev' %}
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
{% endif %}
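Jinja only produces a SQL string, which dbt then sends to the warehouse. The sketch below builds the same string that the loop above should compile to, in plain Python rather than Jinja. It then runs the result in sqlite3 to confirm that the comma handling yields valid SQL. `render_payment_sums` and the sample rows are hypothetical, and the plain table name stands in for the fully qualified relation that `ref('stg_payments')` would render.

```python
import sqlite3

PAYMENT_METHODS = ["credit_card", "bank_transfer", "gift_card"]


def render_payment_sums(methods, relation="stg_payments"):
    """Plain-Python equivalent of the Jinja loop: one SUM(CASE ...) column per
    method. Joining the columns with commas does the job of loop.last."""
    columns = [
        f"    SUM(CASE WHEN payment_method = '{m}' THEN amount ELSE 0 END) AS {m}_amount"
        for m in methods
    ]
    return ("SELECT\n    order_id,\n" + ",\n".join(columns)
            + f"\nFROM {relation}\nGROUP BY order_id")


sql = render_payment_sums(PAYMENT_METHODS)
print(sql)

# Execute the rendered SQL to confirm it is valid (no trailing comma before FROM)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_payments (order_id INTEGER, payment_method TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO stg_payments VALUES (?, ?, ?)",
    [(1, "credit_card", 10), (1, "gift_card", 5), (2, "bank_transfer", 20)],
)
rows = sorted(conn.execute(sql).fetchall())
print(rows)  # [(1, 10, 0, 5), (2, 0, 20, 0)]
```

Running `dbt compile` and opening the file under target/compiled/ shows the real rendered SQL for comparison.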

6.2 Custom Macros

SQL
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, decimal_places=2) %}
    ROUND({{ column_name }} / 100.0, {{ decimal_places }})
{% endmacro %}

-- Usage in a model
SELECT
    order_id,
    {{ cents_to_dollars('amount_cents') }} AS amount_dollars
FROM {{ ref('stg_payments') }}
SQL
-- macros/generate_schema_name.sql (overrides how dbt names schemas; this body matches dbt's built-in default: <target schema>_<custom schema>)
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name }}
    {%- endif -%}
{%- endmacro %}
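Because the macro above reproduces dbt's default rule, `+schema: staging` in dbt_project.yml yields a prefixed schema rather than a bare `staging`. The Python mirror below shows both that rule and the env-aware rule of dbt's built-in `generate_schema_name_for_env` macro, as we understand it. In the env-aware rule, only the prod target uses the custom schema verbatim. The dev schema `dbt_alice` (i.e. `USER=alice` in profiles.yml) is a hypothetical example.

```python
def generate_schema_name(target_schema, custom_schema_name=None):
    """Mirror of the macro above (dbt's default): prefix with the target schema."""
    if custom_schema_name is None:
        return target_schema
    return f"{target_schema}_{custom_schema_name.strip()}"


def generate_schema_name_for_env(target_name, target_schema, custom_schema_name=None):
    """Mirror of the env-aware variant: prod uses the custom schema as-is,
    every other target builds into the developer's own schema."""
    if target_name == "prod" and custom_schema_name is not None:
        return custom_schema_name.strip()
    return target_schema


print(generate_schema_name("dbt_alice", "staging"))                   # dbt_alice_staging
print(generate_schema_name("public", "analytics"))                    # public_analytics
print(generate_schema_name_for_env("prod", "public", "analytics"))    # analytics
print(generate_schema_name_for_env("dev", "dbt_alice", "analytics"))  # dbt_alice
```

The default keeps developers from overwriting each other's objects. The env-aware rule gives clean schema names in production at the cost of putting all of a developer's models into one schema.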

6.3 Common dbt_utils Macros

YAML
# Installation: packages.yml (then run dbt deps)
packages:
  - package: dbt-labs/dbt_utils
    version: ">=1.0.0"

SQL
-- generate_surrogate_key: hash one or more columns into a single key
SELECT
    {{ dbt_utils.generate_surrogate_key(['order_id', 'product_id']) }} AS order_item_key,
    *
FROM {{ ref('stg_order_items') }}

-- date_spine: one row per day, e.g. to build a date dimension (end_date is exclusive)
{{ dbt_utils.date_spine(
    datepart="day",
    start_date="cast('2020-01-01' as date)",
    end_date="cast('2026-01-01' as date)"
) }}

-- Pivot
SELECT
    customer_id,
    {{ dbt_utils.pivot(
        'order_status',
        dbt_utils.get_column_values(ref('stg_orders'), 'order_status')
    ) }}
FROM {{ ref('stg_orders') }}
GROUP BY customer_id
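The Python sketch below approximates what `generate_surrogate_key` computes per row, based on our understanding of its default behaviour. Each value is cast to a string, and NULLs become a placeholder string. The values are joined with `'-'` and MD5-hashed. The placeholder text is an assumption to verify against the dbt_utils docs. Warehouse casts of floats or dates may also produce different strings than Python's `str()`.

```python
import hashlib

# Placeholder substituted for NULLs (assumed default; check the dbt_utils docs)
NULL_PLACEHOLDER = "_dbt_utils_surrogate_key_null_"


def surrogate_key(*values):
    """Approximate dbt_utils.generate_surrogate_key for one row of values."""
    parts = [NULL_PLACEHOLDER if v is None else str(v) for v in values]
    return hashlib.md5("-".join(parts).encode("utf-8")).hexdigest()


k1 = surrogate_key(1001, 7)     # order_id, product_id
k2 = surrogate_key(7, 1001)     # same values, different column order
k3 = surrogate_key(1001, None)  # a NULL product_id still yields a key
print(k1, k2, k3)
```

The separator matters. Without it, (1, 23) and (12, 3) would both hash "123". Column order matters too, so keep the list passed to the macro stable.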

7. SQL Transformation Best Practices

7.1 CTE Style Guide

SQL
-- Recommended: structure each model as a chain of CTEs to keep the logic readable

-- Import CTEs (pull in dependencies)
WITH orders AS (  -- CTE (common table expression): a named, temporary result set
    SELECT * FROM {{ ref('stg_orders') }}
),

customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

-- Logical CTEs (business transformations)
customer_orders AS (
    SELECT
        customer_id,
        COUNT(*) AS order_count,
        SUM(order_amount) AS total_spent
    FROM orders
    GROUP BY customer_id  -- one row per customer
),

-- Final CTE (assemble the output)
final AS (
    SELECT
        c.customer_id,
        c.customer_name,
        COALESCE(co.order_count, 0) AS order_count,
        COALESCE(co.total_spent, 0) AS total_spent
    FROM customers c
    LEFT JOIN customer_orders co USING (customer_id)  -- LEFT JOIN keeps customers with no orders
)

SELECT * FROM final

7.2 Naming Conventions

Layer | Prefix | Examples
Staging | stg_ | stg_orders, stg_customers
Intermediate | int_ | int_order_items_enriched
Fact | fct_ | fct_orders, fct_sessions
Dimension | dim_ | dim_customers, dim_products
Department marts (marketing/finance) | mkt_/fin_ | mkt_campaign_performance

7.3 Incremental Model Strategies

SQL
-- Strategy 1: append (insert new rows only; suits immutable event data)
{{ config(
    materialized='incremental',
    incremental_strategy='append'
) }}

SELECT * FROM {{ ref('stg_events') }}
{% if is_incremental() %}
WHERE event_time > (SELECT MAX(event_time) FROM {{ this }})
{% endif %}

-- Strategy 2: merge (upsert on unique_key; suits records that change over time)
{{ config(
    materialized='incremental',
    unique_key='user_id',
    incremental_strategy='merge'
) }}

SELECT * FROM {{ ref('stg_users') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
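The two strategies differ in how the newly selected rows are combined with the existing table. The hypothetical in-memory simulation below treats rows as dicts. Real adapters generate INSERT or MERGE statements instead.

```python
def apply_incremental(existing, new_rows, strategy, unique_key=None):
    """Combine the existing table with the rows selected by an incremental run."""
    if strategy == "append":
        return existing + new_rows  # no dedup: re-sent rows become duplicates
    if strategy == "merge":
        if unique_key is None:
            raise ValueError("merge needs a unique_key")
        merged = {row[unique_key]: row for row in existing}
        for row in new_rows:  # matching key -> update, otherwise insert
            merged[row[unique_key]] = row
        return list(merged.values())
    raise ValueError(f"unknown strategy: {strategy}")


existing = [{"user_id": 1, "plan": "free"}, {"user_id": 2, "plan": "free"}]
batch = [{"user_id": 2, "plan": "pro"}, {"user_id": 3, "plan": "free"}]

appended = apply_incremental(existing, batch, "append")
merged = apply_incremental(existing, batch, "merge", unique_key="user_id")
print(len(appended))  # 4: user 2 now appears twice
print(merged)         # user 2 updated to 'pro', user 3 inserted
```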

8. Data Quality Framework

8.1 Layered Testing Strategy

Text Only
┌──────────────────────────────────────────────┐
│          dbt data quality pyramid            │
│                                              │
│  ┌──────────────────┐                        │
│  │ Business rules   │  ← fct/dim layer       │
│  │ (few, critical)  │                        │
│  ├──────────────────┤                        │
│  │ Consistency      │  ← intermediate layer  │
│  │ (cross-table)    │                        │
│  ├──────────────────┤                        │
│  │ Basic quality    │  ← staging layer       │
│  │ (broad coverage) │  unique/not_null/FK    │
│  ├──────────────────┤                        │
│  │ Source freshness │  ← source layer        │
│  │ (data arrival)   │                        │
│  └──────────────────┘                        │
└──────────────────────────────────────────────┘

8.2 The dbt-expectations Package

YAML
# packages.yml
packages:
  - package: calogica/dbt_expectations
    version: ">=0.10.0"
YAML
# Richer data quality tests from dbt_expectations
models:
  - name: fct_orders
    columns:
      - name: order_total
        data_tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000
          - dbt_expectations.expect_column_mean_to_be_between:
              min_value: 50
              max_value: 500
      - name: order_date
        data_tests:
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: date
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 1

8.3 Generating Documentation

YAML
# models/marts/core/_core_models.yml
version: 2

models:
  - name: fct_orders
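    description: |
      Order fact table, one row per order.
      Contains order-level measures such as total amount and item count.

      **Grain**: one row = one order
      **Refresh**: incremental, daily
      **Sources**: stg_orders, stg_order_items
    columns:
      - name: order_id
        # doc() needs a matching docs block named order_id_desc in a .md file inside the project
        description: "{{ doc('order_id_desc') }}"
      - name: order_total
        description: "Total order amount (USD)"
Bash
# Generate the docs site and serve it locally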
dbt docs generate
dbt docs serve --port 8080

9. dbt Projects in Practice

9.1 Common Commands

Bash
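# Initialize a new project
dbt init my_project

# Install the packages listed in packages.yml
dbt deps

# Render Jinja into SQL (written to target/compiled/) without building anything
dbt compile

# Run all models
dbt run

# Run selected models with graph operators
dbt run --select fct_orders+      # fct_orders and everything downstream
dbt run --select +fct_orders      # fct_orders and everything upstream
dbt run --select stg_orders fct_orders  # several models by name

# Load the CSV files in seeds/
dbt seed

# Run snapshots (SCD Type 2)
dbt snapshot

# Build everything in DAG order: seeds, models, snapshots, and tests
dbt build

# List fct_orders with all of its ancestors and descendants (lineage)
dbt ls --select +fct_orders+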
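The `+` graph operators select ancestors (`+model`), descendants (`model+`), or both. The sketch below resolves them over a hand-written parent map that mirrors this chapter's models. `PARENTS` and `select` are hypothetical. Only unbounded depth is handled; dbt also accepts numeric depths such as `2+model`.

```python
from collections import deque

# Parents of each model (child -> upstream models), mirroring this chapter's DAG
PARENTS = {
    "stg_orders": [], "stg_order_items": [], "stg_products": [], "stg_customers": [],
    "int_order_items_enriched": ["stg_orders", "stg_order_items", "stg_products"],
    "fct_orders": ["int_order_items_enriched"],
    "dim_customers": ["stg_customers", "fct_orders"],
}


def _walk(start, edges):
    """Breadth-first search collecting every node reachable from start."""
    seen, queue = set(), deque([start])
    while queue:
        for nxt in edges.get(queue.popleft(), []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def select(selector, parents):
    """Resolve 'model', '+model', 'model+' and '+model+' (unbounded depth only)."""
    children = {}
    for child, ps in parents.items():
        for p in ps:
            children.setdefault(p, []).append(child)
    name = selector.strip("+")
    result = {name}
    if selector.startswith("+"):
        result |= _walk(name, parents)   # ancestors
    if selector.endswith("+"):
        result |= _walk(name, children)  # descendants
    return result


print(sorted(select("fct_orders+", PARENTS)))   # ['dim_customers', 'fct_orders']
print(sorted(select("+fct_orders", PARENTS)))
print(sorted(select("+fct_orders+", PARENTS)))
```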

9.2 Environment Management

YAML
# profiles.yml
my_warehouse:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: xxx  # account identifier only, without .snowflakecomputing.com
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      database: analytics_dev
      schema: "dbt_{{ env_var('USER', 'default') }}"
      warehouse: dev_wh

    prod:
      type: snowflake
      account: xxx  # account identifier only, without .snowflakecomputing.com
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      database: analytics
      schema: public
      warehouse: prod_wh

9.3 CI/CD Integration

YAML
# GitHub Actions
name: dbt CI
on:
  pull_request:
    paths: ['dbt/**']

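jobs:
  dbt-test:
    runs-on: ubuntu-latest
    defaults:
      run:
        working-directory: dbt   # the project lives in dbt/, matching the paths filter
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dbt
        run: pip install dbt-snowflake

      - name: dbt deps
        run: dbt deps

      # Assumes ./prod-manifest/manifest.json was fetched from the production run
      # in an earlier step (not shown) and that profiles.yml defines a `ci` target
      - name: dbt build (modified models only)
        run: |
          dbt build --select state:modified+ \
            --defer --state ./prod-manifest/ \
            --target ci
        env:
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}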
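`state:modified+` compares the current project with the production manifest.json. `--defer` then lets ref()s to unselected models resolve to the production relations. The rough sketch below uses made-up checksums, and both function names are hypothetical. dbt's real comparison also looks at configs and other node properties, not just one checksum per file.

```python
def state_modified_plus(prod_checksums, current_checksums, parents):
    """Nodes that are new or whose checksum differs from prod, plus descendants."""
    modified = {n for n, chk in current_checksums.items()
                if prod_checksums.get(n) != chk}
    children = {}
    for child, ps in parents.items():
        for p in ps:
            children.setdefault(p, []).append(child)
    selected, stack = set(modified), list(modified)
    while stack:
        for child in children.get(stack.pop(), []):
            if child not in selected:
                selected.add(child)
                stack.append(child)
    return selected


def deferred_parents(selected, parents):
    """With --defer, parents outside the selection are read from prod."""
    return {p for n in selected for p in parents.get(n, []) if p not in selected}


PROD = {"stg_orders": "a1", "stg_customers": "b1", "fct_orders": "c1", "dim_customers": "d1"}
CURRENT = {**PROD, "stg_customers": "b2", "stg_returns": "e1"}  # one edit, one new model
PARENTS = {
    "stg_orders": [], "stg_customers": [], "stg_returns": [],
    "fct_orders": ["stg_orders"],
    "dim_customers": ["stg_customers", "fct_orders"],
}

to_build = state_modified_plus(PROD, CURRENT, PARENTS)
print(sorted(to_build))                     # ['dim_customers', 'stg_customers', 'stg_returns']
print(deferred_parents(to_build, PARENTS))  # {'fct_orders'}
```

This is why Slim CI is cheap. Only three of the five models are built, and dim_customers reads the unchanged fct_orders straight from production.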

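10. Selected Interview Questions

Q1: What is dbt's core value?

  1. SQL first: analysts build transformations in the SQL they already know
  2. Version control: all transformation logic is managed in Git
  3. Automatic DAG: ref() builds the dependency graph for you
  4. Testing framework: built-in data quality tests
  5. Docs as code: YAML property files define documentation and tests together
  6. Environment isolation: dev, staging, and prod run independently through separate targets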

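Q2: When should each of dbt's four materializations be used?

Type | Use case | Characteristics
view | Light cleaning, small data volumes | No data stored; computed at query time
table | Frequently queried, many downstream models | Trades storage for query speed
incremental | Large tables, append-heavy data | Processes only new or changed rows per run
ephemeral | Referenced only by other models | Compiled into a CTE; not materialized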

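Q3: How should a dbt project be layered?

  • Staging: 1:1 with source tables; cleaning, renaming, type casting
  • Intermediate: joins across sources and complex business logic
  • Marts: business-facing fact and dimension tables
  • Rule of thumb: staging as views, marts as tables, intermediate as ephemeral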

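Q4: How do dbt incremental models handle late-arriving data?

SQL
{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge'
) }}

SELECT * FROM {{ ref('stg_events') }}
{% if is_incremental() %}
-- Look back 3 days so that late-arriving events are reprocessed; the subquery reads
-- the latest event_time already in the target table (INTERVAL syntax varies by warehouse)
WHERE event_time > (SELECT MAX(event_time) - INTERVAL '3 days' FROM {{ this }})
{% endif %}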

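Because unique_key is set and the strategy is merge, events re-selected by the 3-day lookback update the rows already loaded instead of being inserted a second time. Events arriving more than 3 days behind the latest loaded event_time still fall outside the filter. Recovering those requires a wider window or a rebuild with dbt run --full-refresh.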
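The hypothetical simulation below plays the Q4 model through two runs. It mirrors the SQL filter (`event_time > MAX(event_time) - 3 days`) plus a merge on `event_id`. The function name and events are made up for illustration.

```python
from datetime import datetime, timedelta


def incremental_run(table, source_rows, lookback=timedelta(days=3)):
    """One run: select source rows newer than (max loaded event_time - lookback),
    then merge them into `table` (a dict keyed by event_id)."""
    if table:
        cutoff = max(r["event_time"] for r in table.values()) - lookback
        batch = [r for r in source_rows if r["event_time"] > cutoff]
    else:
        batch = list(source_rows)  # first run: full load
    for row in batch:
        table[row["event_id"]] = row  # merge: update if the key exists, else insert
    return table


first_load = [
    {"event_id": "e1", "event_time": datetime(2024, 6, 1), "v": 1},
    {"event_id": "e2", "event_time": datetime(2024, 6, 5), "v": 1},
]
table = incremental_run({}, first_load)

# Later: e2 is corrected, e3 arrives 2 days late, e4 arrives 6 days late
second_source = first_load[:1] + [
    {"event_id": "e2", "event_time": datetime(2024, 6, 5), "v": 2},
    {"event_id": "e3", "event_time": datetime(2024, 6, 3), "v": 1},
    {"event_id": "e4", "event_time": datetime(2024, 5, 30), "v": 1},
]
table = incremental_run(table, second_source)
print(sorted(table))  # ['e1', 'e2', 'e3']: e4 fell outside the lookback window
```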

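Q5: What is the difference between source() and ref() in dbt?

Aspect | source() | ref()
Refers to | Raw tables loaded outside dbt | Models (and seeds/snapshots) managed by dbt
Defined in | A sources: block in a YAML file | Implicitly, by the model's file name
Freshness checks | Supported | Not supported
Typical use | Staging models reading raw tables | Downstream models reading upstream models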

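Next steps: combine this chapter with Chapter 08 (Data Quality) to build a complete data quality framework, and see Chapter 16 (Lakehouse) for running dbt on a lakehouse.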