跳转至

数据工程实战项目

📚 项目概述

本章提供5个完整的实战项目,帮助你将理论知识转化为实际能力。每个项目都包含完整的技术栈、实现步骤、代码示例和部署指南。

🎯 项目列表

项目1:电商数据仓库

项目概述

构建一个完整的电商数据仓库,使用现代数据工程技术栈。

技术栈

  • 数据源:MySQL、PostgreSQL
  • 数据集成:Airflow、Kafka
  • 数据处理:Spark、Flink
  • 数据仓库:Snowflake
  • 数据湖:Delta Lake
  • 数据质量:Great Expectations
  • 数据目录:DataHub
  • 数据可视化:Tableau

实现步骤

  1. 需求分析

    Python
    # 业务需求
    requirements = {
        'sales_analysis': {
            'description': '销售数据分析',
            'metrics': ['total_sales', 'average_order_value', 'conversion_rate'],
            'dimensions': ['product', 'category', 'customer', 'time']
        },
        'customer_analysis': {
            'description': '客户分析',
            'metrics': ['customer_lifetime_value', 'retention_rate', 'churn_rate'],
            'dimensions': ['customer_segment', 'acquisition_channel', 'time']
        },
        'inventory_analysis': {
            'description': '库存分析',
            'metrics': ['inventory_turnover', 'stockout_rate', 'reorder_point'],
            'dimensions': ['product', 'warehouse', 'time']
        }
    }
    

  2. 数据建模

    SQL
    -- 事实表
    CREATE TABLE dwd_sales_fact (
        sale_id BIGINT PRIMARY KEY,
        product_id BIGINT,
        customer_id BIGINT,
        time_id BIGINT,
        store_id BIGINT,
        quantity INT,
        amount DECIMAL(10,2),
        discount DECIMAL(10,2),
        tax DECIMAL(10,2),
        net_amount DECIMAL(10,2)
    ) PARTITIONED BY (YEAR(sale_date), MONTH(sale_date));
    
    -- 维度表
    CREATE TABLE dwd_product_dim (
        product_id BIGINT PRIMARY KEY,
        product_name VARCHAR(200),
        category_id BIGINT,
        brand_id BIGINT,
        price DECIMAL(10,2),
        cost DECIMAL(10,2),
        margin DECIMAL(10,2)
    );
    
    CREATE TABLE dwd_customer_dim (
        customer_id BIGINT PRIMARY KEY,
        customer_name VARCHAR(200),
        email VARCHAR(100),
        phone VARCHAR(20),
        city VARCHAR(100),
        country VARCHAR(100),
        customer_segment VARCHAR(50),
        acquisition_channel VARCHAR(50)
    );
    

  3. ETL流程

    Python
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'data_team',
        'start_date': datetime(2024, 1, 1),
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        'ecommerce_warehouse',
        default_args=default_args,
        description='Ecommerce data warehouse ETL',
        schedule='@daily',  # 使用新参数名(Airflow 2.x)
        catchup=False,
    )
    
    def extract_sales():
        """抽取销售数据"""
        print("Extracting sales data...")
        # 实际的数据抽取逻辑
        return 'sales_data.csv'
    
    def transform_sales():
        """转换销售数据"""
        print("Transforming sales data...")
        # 实际的数据转换逻辑
        return 'transformed_sales.csv'
    
    def load_sales():
        """加载销售数据"""
        print("Loading sales data...")
        # 实际的数据加载逻辑
        return 'load_complete'
    
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract_sales,
        dag=dag
    )
    
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform_sales,
        dag=dag
    )
    
    load_task = PythonOperator(
        task_id='load',
        python_callable=load_sales,
        dag=dag
    )
    
    extract_task >> transform_task >> load_task
    

部署指南

Bash
# 1. 克隆项目
git clone https://github.com/yourorg/ecommerce-dw.git
cd ecommerce-dw

# 2. 配置环境
cp config/airflow.cfg.example config/airflow.cfg
cp config/spark.conf.example config/spark.conf

# 3. 启动Airflow
airflow db init
airflow webserver
airflow scheduler

# 4. 验证部署
# 访问Airflow UI: http://localhost:8080

优化建议

  1. 性能优化
  2. 使用分区表
  3. 创建索引
  4. 优化查询

  5. 成本优化

  6. 使用压缩
  7. 优化存储
  8. 合理配置资源

  9. 质量优化

  10. 实施数据质量检查
  11. 建立数据质量监控
  12. 定期数据质量审计

项目2:实时数据处理平台

项目概述

构建一个实时数据处理平台,支持实时数据分析和决策。

技术栈

  • 消息队列:Kafka
  • 流处理:Flink
  • 存储:Redis、Elasticsearch
  • 可视化:Grafana

实现步骤

  1. 数据采集

    Python
    from kafka import KafkaProducer
    import json
    
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    
    # 发送实时数据
    producer.send('sales', value={
            'product_id': 1,
            'quantity': 10,
            'amount': 100.0,
            'timestamp': datetime.now().isoformat()
        })
    producer.flush()
    

  2. 流处理

    Python
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment
    
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    
    t_env.execute_sql("""
        CREATE TABLE sales (
            product_id INT,
            quantity INT,
            amount DECIMAL(10,2),
            timestamp TIMESTAMP(3),
            WATERMARK FOR timestamp AS timestamp - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sales',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)
    
    result = t_env.sql_query("""
        SELECT
            product_id,
            COUNT(*) as order_count,
            SUM(quantity) as total_quantity,
            SUM(amount) as total_amount
        FROM sales
        GROUP BY
            product_id,
            TUMBLE(timestamp, INTERVAL '1' MINUTE)
    """)
    
    result.execute().print()
    

部署指南

Bash
# 1. 启动Kafka
docker-compose up -d kafka zookeeper

# 2. 启动Flink
docker-compose up -d flink-jobmanager flink-taskmanager

# 3. 提交作业
flink run -c com.example.SalesProcessor target/sales-processor.jar

# 4. 验证部署
# 访问Flink UI: http://localhost:8081

项目3:数据质量监控平台

项目概述

构建一个数据质量监控平台,实时监控数据质量。

技术栈

  • 数据质量:Great Expectations
  • 监控:Prometheus、Grafana
  • 告警:AlertManager
  • 存储:PostgreSQL

实现步骤

  1. 数据质量检查
    Python
    import great_expectations as gx
    import pandas as pd
    
    # 创建 Data Context
    context = gx.get_context()
    
    # 读取数据并创建 Data Source
    df = pd.read_csv('sales.csv')
    data_source = context.data_sources.add_pandas(name="sales_source")
    data_asset = data_source.add_dataframe_asset(name="sales_asset")
    batch_definition = data_asset.add_batch_definition_whole_dataframe("sales_batch")
    
    # 创建 Expectation Suite
    suite = context.suites.add(
        gx.ExpectationSuite(name="sales_suite")
    )
    
    # 添加 Expectations
    suite.add_expectation(
        gx.expectations.ExpectColumnToExist(column="product_id")
    )
    
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column='quantity',
            min_value=1,
            max_value=1000
        )
    )
    
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column='amount',
            min_value=0,
            max_value=100000
        )
    )
    
    # 创建 Validation Definition 并验证
    validation_definition = context.validation_definitions.add(
        gx.ValidationDefinition(
            name="sales_validation",
            data=batch_definition,
            suite=suite
        )
    )
    result = validation_definition.run(batch_parameters={"dataframe": df})
    
    # 输出结果
    print(f"Validation success: {result.success}")
    for exp_result in result.results:
        print(f"  {exp_result.expectation_config.type}: {exp_result.success}")
    

部署指南

Bash
# 1. 安装Great Expectations
pip install great_expectations

# 2. 初始化项目(GX 1.x 使用 Python API 配置,无需 CLI init)
# 参考 08-数据质量.md 中的 gx.get_context() 方式

# 3. 运行验证:通过 Python 脚本执行
python run_validation.py

# 4. 查看报告
# 访问: http://localhost:8080

项目4:数据血缘平台

项目概述

构建一个数据血缘平台,追踪数据流向和依赖关系。

技术栈

  • 数据目录:DataHub
  • 元数据:PostgreSQL
  • 可视化:Neo4j

实现步骤

  1. 数据血缘捕获
    Python
    # 导入 DataHub REST 发射器,用于向 DataHub 服务端推送元数据
    from datahub.emitter.rest_emitter import DatahubRestEmitter
    # 导入元数据变更提案包装器,封装元数据变更请求
    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    # 导入各类元数据 Schema 定义类
    from datahub.metadata.schema_classes import (
        DatasetPropertiesClass,       # 数据集属性(名称、描述等)
        UpstreamLineageClass,         # 上游血缘关系
        UpstreamClass,                # 上游数据集
        DatasetLineageTypeClass,      # 血缘类型(转换、复制等)
    )
    
    # 创建 DataHub Emitter
    emitter = DatahubRestEmitter(
        gms_server="http://localhost:8080"
    )
    
    # 定义数据集 URN
    dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,sales_data,PROD)"
    
    # 发送数据集属性
    dataset_properties = MetadataChangeProposalWrapper(
        entityUrn=dataset_urn,
        aspect=DatasetPropertiesClass(
            name="Sales Data",
            description="Daily sales data",
            customProperties={"team": "data_engineering"}
        )
    )
    emitter.emit(dataset_properties)
    
    # 发送血缘关系(上游依赖),描述数据集之间的转换关系
    upstream_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,raw_orders,PROD)"
    lineage = MetadataChangeProposalWrapper(
        entityUrn=dataset_urn,
        aspect=UpstreamLineageClass(
            upstreams=[
                UpstreamClass(
                    dataset=upstream_urn,
                    type=DatasetLineageTypeClass.TRANSFORMED
                )
            ]
        )
    )
    emitter.emit(lineage)
    
    print(f"DataHub metadata emitted for: {dataset_urn}")
    

部署指南

Bash
# 1. 启动DataHub
docker-compose up -d datahub

# 2. 访问DataHub UI
# http://localhost:9002

# 3. 配置数据源
# 添加数据库、数据湖等数据源

项目5:实时推荐系统

项目概述

构建一个实时推荐系统,基于用户行为实时推荐商品。

技术栈

  • 实时处理:Flink
  • 特征存储:Redis
  • 模型服务:TensorFlow Serving
  • API:FastAPI

实现步骤

  1. 实时特征计算
    Python
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment
    
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    
    t_env.execute_sql("""
        CREATE TABLE user_events (
            user_id BIGINT,
            product_id BIGINT,
            event_type STRING,
            timestamp TIMESTAMP(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'user_events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)
    
    # 计算用户特征
    user_features = t_env.sql_query("""
        SELECT
            user_id,
            COUNT(*) as event_count,
            COUNT(DISTINCT product_id) as product_diversity,
            AVG(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END) as view_rate,
            AVG(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchase_rate
        FROM user_events
        GROUP BY user_id,
        TUMBLE(timestamp, INTERVAL '1' HOUR)
    """)
    
    user_features.execute().print()
    

部署指南

Bash
# 1. 启动Redis
docker-compose up -d redis

# 2. 启动TensorFlow Serving
docker run -p 8501:8501 \
   -v /path/to/model:/models/model \
   tensorflow/serving

# 3. 启动Flink
docker-compose up -d flink

# 4. 启动API服务
uvicorn api:app --host 0.0.0.0 --port 8000

📊 项目对比

项目 技术栈 难度 耗时 适用场景
电商数据仓库 Airflow, Spark, Snowflake ⭐⭐⭐ 4周 传统数据仓库
实时数据处理平台 Kafka, Flink, Redis ⭐⭐⭐⭐ 4周 实时数据处理
数据质量监控平台 Great Expectations, Prometheus ⭐⭐ 2周 数据质量监控
数据血缘平台 DataHub, Neo4j ⭐⭐⭐ 3周 数据治理
实时推荐系统 Flink, Redis, TensorFlow ⭐⭐⭐⭐ 4周 实时推荐

🎯 学习建议

  1. 循序渐进
  2. 从简单项目开始
  3. 逐步增加复杂度
  4. 理解每个技术点

  5. 动手实践

  6. 亲自完成每个项目
  7. 不要只看不做
  8. 记录遇到的问题

  9. 扩展优化

  10. 在基础项目上扩展
  11. 尝试不同的实现方式
  12. 优化性能和成本

  13. 总结反思

  14. 项目完成后总结
  15. 记录经验教训
  16. 分享给他人

📚 参考资料

🎯 总结

通过完成这5个实战项目,你将:

  1. 掌握数据仓库的构建方法
  2. 理解实时数据处理技术
  3. 具备数据质量监控能力
  4. 掌握数据血缘管理
  5. 具备实时推荐系统开发能力

每个项目都是完整的、可运行的,可以直接部署到生产环境。建议按照项目顺序逐步完成,每个项目完成后进行总结和反思,为下一个项目做好准备。