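Chapter 17: Data Transformation with dbt
📚 Chapter Overview
dbt (data build tool) is a widely used tool for modern data transformation. It is built around SQL, implements the T (Transform) in ELT, and brings software engineering best practices to data analytics.
🎯 Learning Objectives
After completing this chapter, you will be able to:
- Understand dbt's core concepts and workflow
- Develop Models across the staging/intermediate/marts layers
- Understand Source, Ref, and DAG dependency management
- Write data tests and use Macro/Jinja templates
- Build a production-grade dbt data transformation project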
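Contents
- 1. dbt Overview
- 2. Core Concepts
- 3. Model Development
- 4. Source and Ref
- 5. Data Tests
- 6. Macro and Jinja
- 7. SQL Transformation Best Practices
- 8. Data Quality Framework
- 9. dbt Project in Practice
- 10. Selected Interview Questions
- 11. Recommended Resources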
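1. dbt Overview
1.1 What Is dbt
dbt is a data transformation framework that lets data analysts and data engineers write transformation logic in SQL while benefiting from software engineering best practices.
Core ideas:
- SQL first: define transformations with SELECT statements; no hand-written DDL/DML
- Version control: all transformation logic lives in Git
- Automated testing: define test rules for your data
- Documentation: automatically generate data docs and a lineage graph
- Modularity: reuse code through ref and macros
1.2 dbt Core vs dbt Cloud
| Dimension | dbt Core | dbt Cloud |
|---|---|---|
| Deployment | Local / self-hosted | SaaS |
| Scheduling | Requires an external tool (Airflow, etc.) | Built-in scheduler |
| IDE | Command line + VS Code | Browser-based IDE |
| Cost | Free and open source | Usage-based pricing |
| CI/CD | Self-built | Built-in Slim CI |
| Semantic layer | None | MetricFlow |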
1.3 dbt Workflow
Text Only
Data sources      dbt transformation       Consumers
┌───────────┐     ┌──────────────────┐     ┌──────────────┐
│ Databases │     │ Source → Staging │     │ BI tools     │
│ Data lake │  →  │ Staging → Marts  │  →  │ ML pipelines │
│ APIs      │     │ Tests + Docs     │     │ Applications │
└───────────┘     └──────────────────┘     └──────────────┘
EL (Extract & Load)           T (Transform)
Handled by Fivetran/Airbyte   Handled by dbt
2. Core Concepts
2.1 Model
A Model is the core unit of dbt. Each Model is a single SELECT statement saved as a .sql file.
SQL
-- models/staging/stg_orders.sql
-- dbt creates the table or view for you; no CREATE TABLE needed
SELECT
    id AS order_id,
    user_id AS customer_id,
    order_date,
    status,
    amount AS order_amount
FROM {{ source('raw', 'orders') }}
WHERE order_date IS NOT NULL
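Materializations:
| Type | Description | Use case |
|---|---|---|
| view | Creates a view (default) | Lightweight transformations; no caching needed |
| table | Creates a table | Results that must be persisted; referenced by many downstream models |
| incremental | Incremental update | Large tables; append-style data |
| ephemeral | CTE (not materialized) | Reused only as intermediate logic |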
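To make the view/table distinction concrete, here is a minimal sketch in Python using the standard-library sqlite3 module rather than dbt itself. The `materialize` helper is hypothetical and only approximates what a materialization does: it wraps the model's SELECT in the appropriate DDL. Real dbt materializations are adapter-specific and considerably more involved.

```python
import sqlite3

def materialize(name: str, select_sql: str, materialized: str = "view") -> str:
    """Hypothetical helper: wrap a model's SELECT in DDL for the chosen materialization."""
    if materialized == "view":
        return f"CREATE VIEW {name} AS {select_sql}"
    if materialized == "table":
        return f"CREATE TABLE {name} AS {select_sql}"
    raise ValueError(f"unsupported materialization in this sketch: {materialized}")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_orders (id INTEGER, user_id INTEGER, amount REAL)")
conn.execute("INSERT INTO raw_orders VALUES (1, 10, 25.0), (2, 11, 40.0)")

select_sql = (
    "SELECT id AS order_id, user_id AS customer_id, amount AS order_amount "
    "FROM raw_orders"
)
conn.execute(materialize("stg_orders", select_sql, "view"))
conn.execute(materialize("stg_orders_tbl", select_sql, "table"))

# sqlite_master records whether each object was created as a view or a table
kinds = dict(
    conn.execute(
        "SELECT name, type FROM sqlite_master WHERE name LIKE 'stg_orders%'"
    ).fetchall()
)
print(kinds)
```

The model author writes the same SELECT in both cases; only the configuration decides which DDL wraps it.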
2.2 Project Structure
Text Only
my_dbt_project/
├── dbt_project.yml              # Project configuration
├── profiles.yml                 # Connection configuration (usually in ~/.dbt/)
├── models/
│   ├── staging/                 # Data cleaning layer
│   │   ├── _staging_models.yml  # Schema definitions
│   │   ├── stg_orders.sql
│   │   ├── stg_customers.sql
│   │   └── stg_products.sql
│   ├── intermediate/            # Intermediate layer
│   │   └── int_order_items_enriched.sql
│   └── marts/                   # Business layer
│       ├── core/
│       │   ├── _core_models.yml
│       │   ├── dim_customers.sql
│       │   ├── dim_products.sql
│       │   └── fct_orders.sql
│       └── marketing/
│           └── mkt_customer_ltv.sql
├── tests/                       # Custom tests
│   └── assert_positive_order_amount.sql
├── macros/                      # Reusable SQL snippets
│   └── cents_to_dollars.sql
├── seeds/                       # CSV seed data
│   └── country_codes.csv
├── snapshots/                   # SCD2 snapshots
│   └── scd_customers.sql
└── analyses/                    # Analysis queries (not materialized)
    └── revenue_analysis.sql
2.3 dbt_project.yml
YAML
name: 'my_project'
version: '1.0.0'
config-version: 2
profile: 'my_warehouse'
model-paths: ["models"]
test-paths: ["tests"]
macro-paths: ["macros"]
seed-paths: ["seeds"]
snapshot-paths: ["snapshots"]
models:
  my_project:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: analytics
3. Model Development
3.1 Staging Model
Staging is a cleaning layer that maps 1:1 to source tables and handles renaming, type casting, and basic filtering.
SQL
-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),
renamed AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        CAST(order_date AS DATE) AS order_date,
        CAST(amount AS DECIMAL(10, 2)) AS order_amount,
        LOWER(status) AS order_status,
        created_at,
        updated_at
    FROM source
    WHERE id IS NOT NULL  -- filter out dirty rows
)
SELECT * FROM renamed
3.2 Intermediate Model
The intermediate layer handles Joins across Sources and complex business logic.
SQL
-- models/intermediate/int_order_items_enriched.sql
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),
order_items AS (
    SELECT * FROM {{ ref('stg_order_items') }}
),
products AS (
    SELECT * FROM {{ ref('stg_products') }}
)
SELECT
    oi.order_item_id,
    oi.order_id,
    o.customer_id,
    o.order_date,
    p.product_name,
    p.category,
    oi.quantity,
    oi.unit_price,
    oi.quantity * oi.unit_price AS line_total
FROM order_items oi
JOIN orders o ON oi.order_id = o.order_id
JOIN products p ON oi.product_id = p.product_id
3.3 Marts Model (Fact & Dimension)
SQL
-- models/marts/core/fct_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}
WITH order_items AS (
    SELECT * FROM {{ ref('int_order_items_enriched') }}
)
SELECT
    order_id,
    customer_id,
    order_date,
    COUNT(DISTINCT order_item_id) AS item_count,
    SUM(line_total) AS order_total,
    MIN(line_total) AS min_item_total,
    MAX(line_total) AS max_item_total
FROM order_items
{% if is_incremental() %}
WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
GROUP BY order_id, customer_id, order_date
SQL
-- models/marts/core/dim_customers.sql
WITH customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),
orders AS (
    SELECT * FROM {{ ref('fct_orders') }}
),
customer_orders AS (
    SELECT
        customer_id,
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS most_recent_order_date,
        COUNT(order_id) AS total_orders,
        SUM(order_total) AS lifetime_value
    FROM orders
    GROUP BY customer_id
)
SELECT
    c.customer_id,
    c.customer_name,
    c.email,
    c.signup_date,
    co.first_order_date,
    co.most_recent_order_date,
    COALESCE(co.total_orders, 0) AS total_orders,
    COALESCE(co.lifetime_value, 0) AS lifetime_value,
    CASE
        WHEN co.lifetime_value >= 1000 THEN 'high'
        WHEN co.lifetime_value >= 100 THEN 'medium'
        ELSE 'low'
    END AS customer_tier
FROM customers c
LEFT JOIN customer_orders co ON c.customer_id = co.customer_id
4. Source and Ref
4.1 Defining a Source
YAML
# models/staging/_staging_models.yml
version: 2
sources:
  - name: raw
    description: "Raw data source (loaded by the EL tool)"
    database: raw_db
    schema: public
    tables:
      - name: orders
        description: "Orders table"
        loaded_at_field: _etl_loaded_at
        freshness:
          warn_after: { count: 12, period: hour }
          error_after: { count: 24, period: hour }
        columns:
          - name: id
            description: "Order ID"
            data_tests:
              - unique
              - not_null
      - name: customers
        description: "Customers table"
      - name: products
        description: "Products table"
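The freshness thresholds configured for the orders table can be read as a simple age comparison. The following is a rough sketch in plain Python, not dbt's implementation: the function name `freshness_status` and the status strings are assumptions chosen for illustration, with defaults mirroring the 12-hour warning and 24-hour error thresholds.

```python
from datetime import datetime, timedelta

def freshness_status(
    max_loaded_at: datetime,
    now: datetime,
    warn_after: timedelta = timedelta(hours=12),
    error_after: timedelta = timedelta(hours=24),
) -> str:
    """Classify a source by the age of its most recently loaded row."""
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"

now = datetime(2024, 1, 2, 12, 0)
print(freshness_status(datetime(2024, 1, 2, 6, 0), now))   # newest row is 6 hours old
print(freshness_status(datetime(2024, 1, 1, 20, 0), now))  # newest row is 16 hours old
print(freshness_status(datetime(2024, 1, 1, 6, 0), now))   # newest row is 30 hours old
```

Here `max_loaded_at` stands in for the maximum value of the column named in `loaded_at_field`.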
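4.2 The ref Function
SQL
-- ref() declares dependencies between models; dbt builds the DAG from them
-- Reference a model in the same project
SELECT * FROM {{ ref('stg_orders') }}
-- Reference a model from another dbt package (two-argument form;
-- 'other_package' and 'some_model' are placeholder names)
SELECT * FROM {{ ref('other_package', 'some_model') }}
-- source() references a source table
SELECT * FROM {{ source('raw', 'orders') }}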
4.3 DAG (Directed Acyclic Graph)
Text Only
dbt builds the DAG automatically from ref() relationships:
source.raw.orders ──→ stg_orders ──┐
                                   ├──→ int_order_items ──→ fct_orders
source.raw.items  ──→ stg_items  ──┘                            │
                                                                ▼
source.raw.customers ──→ stg_customers ──────────────→ dim_customers
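The way a run order falls out of ref() calls can be sketched in a few lines of Python. This is an illustration only and not how dbt parses projects: the model bodies are simplified, hypothetical strings, the regular expression handles only the single-argument `ref('name')` form, and the ordering relies on the standard-library `graphlib` module (Python 3.9+).

```python
import re
from graphlib import TopologicalSorter

# Hypothetical, simplified model bodies keyed by model name
models = {
    "stg_orders": "SELECT * FROM {{ source('raw', 'orders') }}",
    "stg_items": "SELECT * FROM {{ source('raw', 'items') }}",
    "stg_customers": "SELECT * FROM {{ source('raw', 'customers') }}",
    "int_order_items": (
        "SELECT * FROM {{ ref('stg_orders') }} "
        "JOIN {{ ref('stg_items') }} USING (order_id)"
    ),
    "fct_orders": "SELECT * FROM {{ ref('int_order_items') }}",
    "dim_customers": (
        "SELECT * FROM {{ ref('stg_customers') }} "
        "LEFT JOIN {{ ref('fct_orders') }} USING (customer_id)"
    ),
}

REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*'([^']+)'\s*\)\s*\}\}")

def build_graph(model_sql: dict) -> dict:
    """Map each model to the set of models it references through ref()."""
    return {name: set(REF_PATTERN.findall(sql)) for name, sql in model_sql.items()}

graph = build_graph(models)
# static_order() yields every model after all of its upstream dependencies
run_order = list(TopologicalSorter(graph).static_order())
print(run_order)
```

Any order in which each model appears after everything it references is valid, so the exact sequence among independent staging models may vary.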
5. Data Tests
5.1 Built-in Generic Tests
YAML
# models/staging/_staging_models.yml
version: 2
models:
  - name: stg_orders
    description: "Cleaned order data"
    columns:
      - name: order_id
        description: "Unique order identifier"
        data_tests:
          - unique
          - not_null
      - name: order_status
        description: "Order status"
        data_tests:
          - accepted_values:
              values: ['pending', 'shipped', 'completed', 'cancelled']
      - name: customer_id
        description: "Customer ID"
        data_tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
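5.2 Custom Data Tests
SQL
-- tests/assert_positive_order_amount.sql
-- Singular test: the test fails if the query returns any rows
-- (fct_orders exposes the order amount as order_total)
SELECT
    order_id,
    order_total
FROM {{ ref('fct_orders') }}
WHERE order_total <= 0
SQL
-- tests/generic/test_is_positive.sql
-- Generic test: reusable on any model and column; fails on non-positive values
{% test is_positive(model, column_name) %}
SELECT *
FROM {{ model }}
WHERE {{ column_name }} <= 0
{% endtest %}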
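The pass/fail rule for both kinds of test is the same: the query selects the offending rows, and an empty result means success. A minimal sketch of that rule using Python's standard-library sqlite3 module follows; `run_data_test`, the in-memory table, and its sample rows are assumptions for illustration and do not reflect dbt's internals.

```python
import sqlite3

def run_data_test(conn: sqlite3.Connection, test_sql: str):
    """Return (passed, failing_rows); the test passes only when no rows come back."""
    rows = conn.execute(test_sql).fetchall()
    return len(rows) == 0, rows

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fct_orders (order_id INTEGER, order_total REAL)")
conn.executemany(
    "INSERT INTO fct_orders VALUES (?, ?)",
    [(1, 120.0), (2, 0.0), (3, -5.0)],
)

# The test query selects the rows that violate the rule
test_sql = "SELECT order_id, order_total FROM fct_orders WHERE order_total <= 0"
passed, failures = run_data_test(conn, test_sql)
print(passed, failures)
```

Because the failing rows are returned rather than just counted, the same query doubles as a debugging aid.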
5.3 Running Tests
Bash
# Run all tests
dbt test
# Run the tests for a specific model
dbt test --select stg_orders
# Run source freshness checks
dbt source freshness
# Run a specific test type
dbt test --select "test_type:singular"
6. Macro and Jinja
6.1 Jinja Template Basics
SQL
-- Jinja usage in dbt
-- Variables
{% set payment_methods = ['credit_card', 'bank_transfer', 'gift_card'] %}
-- Loops
SELECT
    order_id,
    {% for method in payment_methods %}
    SUM(CASE WHEN payment_method = '{{ method }}' THEN amount ELSE 0 END)
        AS {{ method }}_amount
    {% if not loop.last %},{% endif %}
    {% endfor %}
FROM {{ ref('stg_payments') }}
GROUP BY order_id
-- Conditionals
SELECT *
FROM {{ ref('fct_orders') }}
{% if target.name == 'dev' %}
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
{% endif %}
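It can help to see what the loop expands to. The sketch below reproduces the same expansion with ordinary Python string formatting and runs the result against an in-memory SQLite table. It does not use Jinja or dbt; the table contents are made-up sample data.

```python
import sqlite3

payment_methods = ["credit_card", "bank_transfer", "gift_card"]

# One SUM(CASE ...) column per payment method, comma-separated like the Jinja loop
columns = ",\n    ".join(
    f"SUM(CASE WHEN payment_method = '{m}' THEN amount ELSE 0 END) AS {m}_amount"
    for m in payment_methods
)
pivot_sql = (
    f"SELECT\n    order_id,\n    {columns}\n"
    "FROM stg_payments\nGROUP BY order_id"
)
print(pivot_sql)

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE stg_payments (order_id INTEGER, payment_method TEXT, amount REAL)"
)
conn.executemany(
    "INSERT INTO stg_payments VALUES (?, ?, ?)",
    [(1, "credit_card", 30.0), (1, "gift_card", 5.0), (2, "bank_transfer", 12.5)],
)
rows = conn.execute(pivot_sql + " ORDER BY order_id").fetchall()
print(rows)
```

Joining the generated columns with a separator plays the role of the `loop.last` check: no trailing comma is emitted after the final column.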
6.2 Custom Macros
SQL
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, decimal_places=2) %}
    ROUND({{ column_name }} / 100.0, {{ decimal_places }})
{% endmacro %}
-- Usage
SELECT
    order_id,
    {{ cents_to_dollars('amount_cents') }} AS amount_dollars
FROM {{ ref('stg_payments') }}
SQL
-- macros/generate_schema_name.sql (custom schema naming rule)
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name }}
    {%- endif -%}
{%- endmacro %}
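The branching in this macro is small enough to restate as a plain function. The Python sketch below mirrors only the logic shown above; passing the target schema as the first argument is an assumption made to keep the function self-contained, and the schema names are examples.

```python
from typing import Optional

def generate_schema_name(default_schema: str, custom_schema_name: Optional[str]) -> str:
    """Use the target schema alone, or suffix it with the custom schema when one is set."""
    if custom_schema_name is None:
        return default_schema
    return f"{default_schema}_{custom_schema_name}"

# A developer whose target schema is dbt_alice (example value)
print(generate_schema_name("dbt_alice", None))
print(generate_schema_name("dbt_alice", "staging"))
```

Under this rule, a model configured with `+schema: staging` lands in a schema prefixed by the developer's own target schema, which keeps development environments from colliding.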
6.3 Common dbt_utils Macros
YAML
# Install: packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: ">=1.0.0"
SQL
-- Using generate_surrogate_key
SELECT
    {{ dbt_utils.generate_surrogate_key(['order_id', 'product_id']) }} AS order_item_key,
    *
FROM {{ ref('stg_order_items') }}
-- Date Spine (generate a date dimension)
{{ dbt_utils.date_spine(
    datepart="day",
    start_date="cast('2020-01-01' as date)",
    end_date="cast('2026-01-01' as date)"
) }}
-- Pivot
SELECT
    customer_id,
    {{ dbt_utils.pivot(
        'order_status',
        dbt_utils.get_column_values(ref('stg_orders'), 'order_status')
    ) }}
FROM {{ ref('stg_orders') }}
GROUP BY customer_id
7. SQL Transformation Best Practices
7.1 CTE Style Guide
SQL
-- Recommended: use CTEs to keep the logic readable
-- Import CTEs (reference dependencies)
WITH orders AS (  -- CTE (common table expression): a temporary named result set
    SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),
-- Logic CTEs (business transformations)
customer_orders AS (
    SELECT
        customer_id,
        COUNT(*) AS order_count,
        SUM(order_amount) AS total_spent
    FROM orders
    GROUP BY customer_id  -- GROUP BY groups rows; HAVING filters groups
),
-- Final CTE (assemble the result)
final AS (
    SELECT
        c.customer_id,
        c.customer_name,
        COALESCE(co.order_count, 0) AS order_count,
        COALESCE(co.total_spent, 0) AS total_spent
    FROM customers c
    LEFT JOIN customer_orders co USING (customer_id)  -- JOIN combines tables
)
SELECT * FROM final
7.2 Naming Conventions
| Layer | Prefix | Example |
|---|---|---|
| Staging | stg_ | stg_orders, stg_customers |
| Intermediate | int_ | int_order_items_enriched |
| Fact | fct_ | fct_orders, fct_sessions |
| Dimension | dim_ | dim_customers, dim_products |
| Metric | mkt_/fin_ | mkt_campaign_performance |
7.3 Incremental Model Strategies
SQL
-- Strategy 1: Append (suited to event-style data)
{{ config(
    materialized='incremental',
    incremental_strategy='append'
) }}
SELECT * FROM {{ ref('stg_events') }}
{% if is_incremental() %}
WHERE event_time > (SELECT MAX(event_time) FROM {{ this }})
{% endif %}
-- Strategy 2: Merge (suited to slowly changing data)
{{ config(
    materialized='incremental',
    unique_key='user_id',
    incremental_strategy='merge'
) }}
SELECT * FROM {{ ref('stg_users') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
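8. Data Quality Framework
8.1 Layered Testing Strategy
Text Only
┌────────────────────────────────────────────────────┐
│              dbt data quality pyramid              │
│                                                    │
│  ┌──────────────────────┐                          │
│  │ Business rule tests  │ ← fct/dim layer          │
│  │ (few, critical)      │                          │
│  ├──────────────────────┤                          │
│  │ Consistency tests    │ ← intermediate layer     │
│  │ (cross-table checks) │                          │
│  ├──────────────────────┤                          │
│  │ Basic quality tests  │ ← staging layer          │
│  │ (broad coverage)     │   unique/not_null/FK     │
│  ├──────────────────────┤                          │
│  │ Source freshness     │ ← source layer           │
│  │ (data arrival)       │                          │
│  └──────────────────────┘                          │
└────────────────────────────────────────────────────┘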
8.2 The dbt-expectations Package
YAML
# A rich set of data quality tests
models:
  - name: fct_orders
    columns:
      - name: order_total
        data_tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000
          - dbt_expectations.expect_column_mean_to_be_between:
              min_value: 50
              max_value: 500
      - name: order_date
        data_tests:
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: date
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 1
8.3 Generating Documentation
YAML
# models/marts/core/_core_models.yml
version: 2
models:
  - name: fct_orders
    description: |
      Order fact table, one row per order.
      Contains measures such as order amount and item count.
      **Grain**: one row = one order
      **Refresh frequency**: daily incremental
      **Sources**: stg_orders, stg_order_items
    columns:
      - name: order_id
        description: "{{ doc('order_id_desc') }}"
      - name: order_total
        description: "Order total (USD)"
9. dbt Project in Practice
9.1 Common Commands
Bash
# Initialize a project
dbt init my_project
# Install package dependencies
dbt deps
# Compile (without executing)
dbt compile
# Run all models
dbt run
# Run specific models and their dependencies
dbt run --select fct_orders+   # fct_orders and everything downstream
dbt run --select +fct_orders   # fct_orders and everything upstream
dbt run --select stg_orders fct_orders   # several models
# Seed (load CSVs)
dbt seed
# Snapshot (SCD2)
dbt snapshot
# Full build (seed + run + test + snapshot)
dbt build
# Inspect lineage
dbt ls --select +fct_orders+
9.2 Environment Management
YAML
# profiles.yml
my_warehouse:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: xxx  # account identifier only, without the .snowflakecomputing.com suffix
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      database: analytics_dev
      schema: "dbt_{{ env_var('USER', 'default') }}"
      warehouse: dev_wh
    prod:
      type: snowflake
      account: xxx  # account identifier only, without the .snowflakecomputing.com suffix
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      database: analytics
      schema: public
      warehouse: prod_wh
9.3 CI/CD Integration
YAML
# GitHub Actions
name: dbt CI
on:
  pull_request:
    paths: ['dbt/**']
jobs:
  dbt-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dbt
        run: pip install dbt-snowflake
      - name: dbt deps
        run: dbt deps
      - name: dbt build (modified models only)
        # Assumes a ci target is defined in profiles.yml (not shown in 9.2)
        # and that ./prod-manifest/ holds the production manifest.json
        run: |
          dbt build --select state:modified+ \
            --defer --state ./prod-manifest/ \
            --target ci
        env:
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
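10. Selected Interview Questions
Q1: What is the core value of dbt?
- SQL first: analysts develop transformations in the SQL they already know
- Version control: Git manages all transformation logic
- Automated DAG: ref() establishes dependencies automatically
- Testing framework: built-in data quality tests
- Docs as code: schema files define both documentation and tests
- Environment isolation: dev/staging/prod run independently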
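Q2: Which scenarios suit each of dbt's four materializations?
| Type | Use case | Characteristics |
|---|---|---|
| view | Lightweight cleaning; small data volumes | Uses no storage; computed at query time |
| table | Frequently queried; many downstream consumers | Trades storage for speed |
| incremental | Large tables; append-style data | Processes only new data |
| ephemeral | Referenced only by other models | Compiled into a CTE; not materialized |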
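Q3: How should a dbt project's layers be structured?
- Staging: maps 1:1 to source tables; handles cleaning, renaming, and type casting
- Intermediate: Joins across Sources and complex logic
- Marts: business-facing Fact and Dimension tables
- Rule of thumb: view for staging, table for marts, ephemeral for intermediate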
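Q4: How does a dbt incremental model handle late-arriving data?
SQL
{{ config(
    materialized='incremental',
    unique_key='event_id'
) }}
SELECT * FROM {{ ref('stg_events') }}
{% if is_incremental() %}
-- Look back 3 days to pick up late-arriving rows
WHERE event_time > (SELECT MAX(event_time) - INTERVAL '3 days' FROM {{ this }})  -- subquery: a query nested inside another query
{% endif %}
Combined with unique_key and the merge strategy, late-arriving rows update existing records instead of being inserted as duplicates.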
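The effect of the lookback window plus a unique key can be simulated outside dbt. The sketch below uses Python's standard-library sqlite3 module; `incremental_run`, the `fct_events` table, and the sample rows are assumptions for illustration, and SQLite's `INSERT OR REPLACE` on a primary key stands in for a warehouse merge.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_events (event_id INTEGER, event_time TEXT, status TEXT)")
# event_id is the unique key of the target table, as in the model config
conn.execute(
    "CREATE TABLE fct_events (event_id INTEGER PRIMARY KEY, event_time TEXT, status TEXT)"
)

def incremental_run(conn: sqlite3.Connection, lookback_days: int = 3) -> None:
    """Load rows newer than (latest loaded event_time minus the lookback window)."""
    (max_time,) = conn.execute("SELECT MAX(event_time) FROM fct_events").fetchone()
    if max_time is None:  # first run: full load
        where, params = "", ()
    else:
        where = "WHERE event_time > date(?, ?)"
        params = (max_time, f"-{lookback_days} days")
    # INSERT OR REPLACE overwrites rows that share a primary key (merge stand-in)
    conn.execute(
        "INSERT OR REPLACE INTO fct_events "
        f"SELECT event_id, event_time, status FROM stg_events {where}",
        params,
    )

# First run loads two events
conn.executemany(
    "INSERT INTO stg_events VALUES (?, ?, ?)",
    [(1, "2024-01-08", "new"), (2, "2024-01-10", "new")],
)
incremental_run(conn)

# Later: event 3 arrives late (dated before the current max) and event 1 is updated
conn.execute("INSERT INTO stg_events VALUES (3, '2024-01-09', 'new')")
conn.execute("UPDATE stg_events SET status = 'done' WHERE event_id = 1")
incremental_run(conn)

result = conn.execute("SELECT * FROM fct_events ORDER BY event_id").fetchall()
print(result)
```

Without the lookback, the second run would only consider rows after 2024-01-10 and would miss both the late event and the status change; without the unique key, the reprocessed rows would be inserted as duplicates.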
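Q5: What is the difference between source and ref in dbt?
| Dimension | source() | ref() |
|---|---|---|
| Referenced object | External raw data tables | Models managed by dbt |
| Where defined | sources block in YAML | Resolved automatically from the model file name |
| Freshness checks | Supported | Not supported |
| Typical use | Staging layer referencing source tables | Other layers referencing upstream models |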
11. Recommended Resources
- dbt official documentation
- dbt Learn (free courses)
- dbt Package Hub
- dbt Style Guide
- dbt Best Practices
- Analytics Engineering with dbt (dbt Labs)