数据工程实战项目¶
📚 项目概述¶
本章提供5个完整的实战项目,帮助你将理论知识转化为实际能力。每个项目都包含完整的技术栈、实现步骤、代码示例和部署指南。
🎯 项目列表¶
项目1:电商数据仓库¶
项目概述¶
构建一个完整的电商数据仓库,使用现代数据工程技术栈。
技术栈¶
- 数据源:MySQL、PostgreSQL
- 数据集成:Airflow、Kafka
- 数据处理:Spark、Flink
- 数据仓库:Snowflake
- 数据湖:Delta Lake
- 数据质量:Great Expectations
- 数据目录:DataHub
- 数据可视化:Tableau
实现步骤¶
-
需求分析
Python# 业务需求 requirements = { 'sales_analysis': { 'description': '销售数据分析', 'metrics': ['total_sales', 'average_order_value', 'conversion_rate'], 'dimensions': ['product', 'category', 'customer', 'time'] }, 'customer_analysis': { 'description': '客户分析', 'metrics': ['customer_lifetime_value', 'retention_rate', 'churn_rate'], 'dimensions': ['customer_segment', 'acquisition_channel', 'time'] }, 'inventory_analysis': { 'description': '库存分析', 'metrics': ['inventory_turnover', 'stockout_rate', 'reorder_point'], 'dimensions': ['product', 'warehouse', 'time'] } } -
数据建模
SQL-- 事实表 CREATE TABLE dwd_sales_fact ( sale_id BIGINT PRIMARY KEY, product_id BIGINT, customer_id BIGINT, time_id BIGINT, store_id BIGINT, quantity INT, amount DECIMAL(10,2), discount DECIMAL(10,2), tax DECIMAL(10,2), net_amount DECIMAL(10,2) ) PARTITIONED BY (YEAR(sale_date), MONTH(sale_date)); -- 维度表 CREATE TABLE dwd_product_dim ( product_id BIGINT PRIMARY KEY, product_name VARCHAR(200), category_id BIGINT, brand_id BIGINT, price DECIMAL(10,2), cost DECIMAL(10,2), margin DECIMAL(10,2) ); CREATE TABLE dwd_customer_dim ( customer_id BIGINT PRIMARY KEY, customer_name VARCHAR(200), email VARCHAR(100), phone VARCHAR(20), city VARCHAR(100), country VARCHAR(100), customer_segment VARCHAR(50), acquisition_channel VARCHAR(50) ); -
ETL流程
Pythonfrom airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta default_args = { 'owner': 'data_team', 'start_date': datetime(2024, 1, 1), 'retries': 3, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'ecommerce_warehouse', default_args=default_args, description='Ecommerce data warehouse ETL', schedule='@daily', # 使用新参数名(Airflow 2.x) catchup=False, ) def extract_sales(): """抽取销售数据""" print("Extracting sales data...") # 实际的数据抽取逻辑 return 'sales_data.csv' def transform_sales(): """转换销售数据""" print("Transforming sales data...") # 实际的数据转换逻辑 return 'transformed_sales.csv' def load_sales(): """加载销售数据""" print("Loading sales data...") # 实际的数据加载逻辑 return 'load_complete' extract_task = PythonOperator( task_id='extract', python_callable=extract_sales, dag=dag ) transform_task = PythonOperator( task_id='transform', python_callable=transform_sales, dag=dag ) load_task = PythonOperator( task_id='load', python_callable=load_sales, dag=dag ) extract_task >> transform_task >> load_task
部署指南¶
Bash
# 1. 克隆项目
git clone https://github.com/yourorg/ecommerce-dw.git
cd ecommerce-dw
# 2. 配置环境
cp config/airflow.cfg.example config/airflow.cfg
cp config/spark.conf.example config/spark.conf
# 3. 启动Airflow
airflow db init
airflow webserver
airflow scheduler
# 4. 验证部署
# 访问Airflow UI: http://localhost:8080
优化建议¶
- 性能优化
- 使用分区表
- 创建索引
-
优化查询
-
成本优化
- 使用压缩
- 优化存储
-
合理配置资源
-
质量优化
- 实施数据质量检查
- 建立数据质量监控
- 定期数据质量审计
项目2:实时数据处理平台¶
项目概述¶
构建一个实时数据处理平台,支持实时数据分析和决策。
技术栈¶
- 消息队列:Kafka
- 流处理:Flink
- 存储:Redis、Elasticsearch
- 可视化:Grafana
实现步骤¶
-
数据采集
Pythonfrom kafka import KafkaProducer import json producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') ) # 发送实时数据 producer.send('sales', value={ 'product_id': 1, 'quantity': 10, 'amount': 100.0, 'timestamp': datetime.now().isoformat() }) producer.flush() -
流处理
Pythonfrom pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(""" CREATE TABLE sales ( product_id INT, quantity INT, amount DECIMAL(10,2), timestamp TIMESTAMP(3), WATERMARK FOR timestamp AS timestamp - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'sales', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json' ) """) result = t_env.sql_query(""" SELECT product_id, COUNT(*) as order_count, SUM(quantity) as total_quantity, SUM(amount) as total_amount FROM sales GROUP BY product_id, TUMBLE(timestamp, INTERVAL '1' MINUTE) """) result.execute().print()
部署指南¶
Bash
# 1. 启动Kafka
docker-compose up -d kafka zookeeper
# 2. 启动Flink
docker-compose up -d flink-jobmanager flink-taskmanager
# 3. 提交作业
flink run -c com.example.SalesProcessor target/sales-processor.jar
# 4. 验证部署
# 访问Flink UI: http://localhost:8081
项目3:数据质量监控平台¶
项目概述¶
构建一个数据质量监控平台,实时监控数据质量。
技术栈¶
- 数据质量:Great Expectations
- 监控:Prometheus、Grafana
- 告警:AlertManager
- 存储:PostgreSQL
实现步骤¶
- 数据质量检查 Python
import great_expectations as gx import pandas as pd # 创建 Data Context context = gx.get_context() # 读取数据并创建 Data Source df = pd.read_csv('sales.csv') data_source = context.data_sources.add_pandas(name="sales_source") data_asset = data_source.add_dataframe_asset(name="sales_asset") batch_definition = data_asset.add_batch_definition_whole_dataframe("sales_batch") # 创建 Expectation Suite suite = context.suites.add( gx.ExpectationSuite(name="sales_suite") ) # 添加 Expectations suite.add_expectation( gx.expectations.ExpectColumnToExist(column="product_id") ) suite.add_expectation( gx.expectations.ExpectColumnValuesToBeBetween( column='quantity', min_value=1, max_value=1000 ) ) suite.add_expectation( gx.expectations.ExpectColumnValuesToBeBetween( column='amount', min_value=0, max_value=100000 ) ) # 创建 Validation Definition 并验证 validation_definition = context.validation_definitions.add( gx.ValidationDefinition( name="sales_validation", data=batch_definition, suite=suite ) ) result = validation_definition.run(batch_parameters={"dataframe": df}) # 输出结果 print(f"Validation success: {result.success}") for exp_result in result.results: print(f" {exp_result.expectation_config.type}: {exp_result.success}")
部署指南¶
Bash
# 1. 安装Great Expectations
pip install great_expectations
# 2. 初始化项目(GX 1.x 使用 Python API 配置,无需 CLI init)
# 参考 08-数据质量.md 中的 gx.get_context() 方式
# 3. 运行验证:通过 Python 脚本执行
python run_validation.py
# 4. 查看报告
# 访问: http://localhost:8080
项目4:数据血缘平台¶
项目概述¶
构建一个数据血缘平台,追踪数据流向和依赖关系。
技术栈¶
- 数据目录:DataHub
- 元数据:PostgreSQL
- 可视化:Neo4j
实现步骤¶
- 数据血缘捕获 Python
# 导入 DataHub REST 发射器,用于向 DataHub 服务端推送元数据 from datahub.emitter.rest_emitter import DatahubRestEmitter # 导入元数据变更提案包装器,封装元数据变更请求 from datahub.emitter.mcp import MetadataChangeProposalWrapper # 导入各类元数据 Schema 定义类 from datahub.metadata.schema_classes import ( DatasetPropertiesClass, # 数据集属性(名称、描述等) UpstreamLineageClass, # 上游血缘关系 UpstreamClass, # 上游数据集 DatasetLineageTypeClass, # 血缘类型(转换、复制等) ) # 创建 DataHub Emitter emitter = DatahubRestEmitter( gms_server="http://localhost:8080" ) # 定义数据集 URN dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,sales_data,PROD)" # 发送数据集属性 dataset_properties = MetadataChangeProposalWrapper( entityUrn=dataset_urn, aspect=DatasetPropertiesClass( name="Sales Data", description="Daily sales data", customProperties={"team": "data_engineering"} ) ) emitter.emit(dataset_properties) # 发送血缘关系(上游依赖),描述数据集之间的转换关系 upstream_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,raw_orders,PROD)" lineage = MetadataChangeProposalWrapper( entityUrn=dataset_urn, aspect=UpstreamLineageClass( upstreams=[ UpstreamClass( dataset=upstream_urn, type=DatasetLineageTypeClass.TRANSFORMED ) ] ) ) emitter.emit(lineage) print(f"DataHub metadata emitted for: {dataset_urn}")
部署指南¶
Bash
# 1. 启动DataHub
docker-compose up -d datahub
# 2. 访问DataHub UI
# http://localhost:9002
# 3. 配置数据源
# 添加数据库、数据湖等数据源
项目5:实时推荐系统¶
项目概述¶
构建一个实时推荐系统,基于用户行为实时推荐商品。
技术栈¶
- 实时处理:Flink
- 特征存储:Redis
- 模型服务:TensorFlow Serving
- API:FastAPI
实现步骤¶
- 实时特征计算 Python
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(""" CREATE TABLE user_events ( user_id BIGINT, product_id BIGINT, event_type STRING, timestamp TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'user_events', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json' ) """) # 计算用户特征 user_features = t_env.sql_query(""" SELECT user_id, COUNT(*) as event_count, COUNT(DISTINCT product_id) as product_diversity, AVG(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END) as view_rate, AVG(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchase_rate FROM user_events GROUP BY user_id, TUMBLE(timestamp, INTERVAL '1' HOUR) """) user_features.execute().print()
部署指南¶
Bash
# 1. 启动Redis
docker-compose up -d redis
# 2. 启动TensorFlow Serving
docker run -p 8501:8501 \
-v /path/to/model:/models/model \
tensorflow/serving
# 3. 启动Flink
docker-compose up -d flink
# 4. 启动API服务
uvicorn api:app --host 0.0.0.0 --port 8000
📊 项目对比¶
| 项目 | 技术栈 | 难度 | 耗时 | 适用场景 |
|---|---|---|---|---|
| 电商数据仓库 | Airflow, Spark, Snowflake | ⭐⭐⭐ | 4周 | 传统数据仓库 |
| 实时数据处理平台 | Kafka, Flink, Redis | ⭐⭐⭐⭐ | 4周 | 实时数据处理 |
| 数据质量监控平台 | Great Expectations, Prometheus | ⭐⭐ | 2周 | 数据质量监控 |
| 数据血缘平台 | DataHub, Neo4j | ⭐⭐⭐ | 3周 | 数据治理 |
| 实时推荐系统 | Flink, Redis, TensorFlow | ⭐⭐⭐⭐ | 4周 | 实时推荐 |
🎯 学习建议¶
- 循序渐进
- 从简单项目开始
- 逐步增加复杂度
-
理解每个技术点
-
动手实践
- 亲自完成每个项目
- 不要只看不做
-
记录遇到的问题
-
扩展优化
- 在基础项目上扩展
- 尝试不同的实现方式
-
优化性能和成本
-
总结反思
- 项目完成后总结
- 记录经验教训
- 分享给他人
📚 参考资料¶
- Apache Airflow文档:https://airflow.apache.org/docs/
- Apache Spark文档:https://spark.apache.org/docs/
- Apache Flink文档:https://flink.apache.org/docs/
- Great Expectations文档:https://docs.greatexpectations.io/
- DataHub文档:https://datahubproject.io/docs/
🎯 总结¶
通过完成这5个实战项目,你将:
- 掌握数据仓库的构建方法
- 理解实时数据处理技术
- 具备数据质量监控能力
- 掌握数据血缘管理
- 具备实时推荐系统开发能力
每个项目都是完整的、可运行的,可以直接部署到生产环境。建议按照项目顺序逐步完成,每个项目完成后进行总结和反思,为下一个项目做好准备。