跳转至

第10章:大数据处理架构

大数据处理架构

10.1 大数据处理概述

大数据处理的特点

  1. 数据量大:TB、PB级别的数据
  2. 处理速度快:实时或准实时处理
  3. 数据类型多:结构化、半结构化、非结构化数据
  4. 价值密度低:需要从大量数据中提取价值

大数据处理架构的类型

  1. 批处理:处理历史数据
  2. 流处理:处理实时数据
  3. 混合处理:批处理和流处理结合

10.2 批处理

10.2.1 Hadoop

Hadoop是一个分布式计算框架,用于处理大规模数据。

核心组件: - HDFS:分布式文件系统 - MapReduce:分布式计算框架 - YARN:资源管理器

Python
# MapReduce示例
from mrjob.job import MRJob

class WordCount(MRJob):

    def mapper(self, _, line):
        for word in line.split():
            # yield使函数成为生成器:每次yield产出一个(key,value)对,框架逐条收集结果而非一次性返回,适合超大数据集
            yield (word, 1)  # yield生成器:惰性产出值,节省内存

    def reducer(self, word, counts):
        yield (word, sum(counts))

if __name__ == '__main__':
    WordCount.run()

10.2.2 Spark

Spark是一个快速的大数据处理引擎,支持批处理、流处理、SQL、机器学习等。

特点: - 内存计算 - 快速 - 易用 - 生态系统丰富

Python
from pyspark.sql import SparkSession

# 创建SparkSession
spark = SparkSession.builder \
    .appName("WordCount") \
    .getOrCreate()

# 读取数据
text_file = spark.read.text("data.txt")

# 词频统计
word_counts = text_file.rdd \
    .flatMap(lambda line: line.value.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

# 输出结果
word_counts.saveAsTextFile("output")

10.2.3 Spark SQL

Python
from pyspark.sql import SparkSession

# 创建SparkSession
spark = SparkSession.builder \
    .appName("SparkSQL") \
    .getOrCreate()

# 读取数据
df = spark.read.json("users.json")

# 注册临时表
df.createOrReplaceTempView("users")

# 执行SQL查询
result = spark.sql("""
    SELECT
        age,
        COUNT(*) as count
    FROM users
    GROUP BY age
    ORDER BY age
""")

# 输出结果
result.show()

10.3 流处理

10.3.1 Kafka Streams

Python
import json
from kafka import KafkaConsumer, KafkaProducer

# 消费者
consumer = KafkaConsumer(
    'input-topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))  # json.loads将JSON字符串转为Python对象
)

# 生产者
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # json.dumps将Python对象转为JSON字符串
)

# 流处理
for message in consumer:
    data = message.value

    # 处理数据
    processed_data = process(data)

    # 发送到输出topic
    producer.send('output-topic', processed_data)

10.3.2 Spark Streaming

Python
from pyspark.streaming import StreamingContext
from pyspark import SparkContext

# 创建StreamingContext
sc = SparkContext(appName="NetworkWordCount")
ssc = StreamingContext(sc, batchDuration=1)

# 创建DStream
lines = ssc.socketTextStream("localhost", 9999)

# 词频统计
words = lines.flatMap(lambda line: line.split())  # lambda匿名函数:简洁的单行函数
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# 输出结果
word_counts.pprint()

# 启动流处理
ssc.start()
ssc.awaitTermination()
Python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# 创建Kafka表
t_env.execute_sql("""
    CREATE TABLE input_table (
        user_id INT,
        product_id INT,
        amount DECIMAL(10, 2),
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

# 创建结果表
t_env.execute_sql("""
    CREATE TABLE output_table (
        user_id INT,
        total_amount DECIMAL(10, 2),
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_amounts',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

# 执行查询
t_env.execute_sql("""
    INSERT INTO output_table
    SELECT
        user_id,
        SUM(amount) as total_amount,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
    FROM input_table
    GROUP BY
        user_id,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
""").wait()

10.4 数据仓库

10.4.1 数据仓库概述

数据仓库是一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合,用于支持管理决策。

10.4.2 数据建模

星型模型

Text Only
事实表
├── 维度表1
├── 维度表2
└── 维度表3

雪花模型

Text Only
事实表
├── 维度表1
│   ├── 子维度表1
│   └── 子维度表2
├── 维度表2
└── 维度表3

10.4.3 Hive

SQL
-- 创建表
CREATE TABLE orders (
    order_id INT,
    user_id INT,
    product_id INT,
    amount DECIMAL(10, 2),
    order_date STRING
)
PARTITIONED BY (year STRING, month STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- 加载数据
LOAD DATA LOCAL INPATH 'orders.csv'
INTO TABLE orders
PARTITION (year='2024', month='01');

-- 查询数据
SELECT
    user_id,
    SUM(amount) as total_amount
FROM orders
WHERE year='2024' AND month='01'
GROUP BY user_id;

10.4.4 ClickHouse

SQL
-- 创建表
CREATE TABLE orders (
    order_id UInt32,
    user_id UInt32,
    product_id UInt32,
    amount Decimal(10, 2),
    order_date Date
)
ENGINE = MergeTree()
ORDER BY (order_date, user_id);

-- 插入数据
INSERT INTO orders VALUES
    (1, 100, 200, 100.50, '2024-01-01'),
    (2, 101, 201, 200.75, '2024-01-02');

-- 查询数据
SELECT
    user_id,
    SUM(amount) as total_amount
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY user_id;  -- GROUP BY分组;HAVING过滤分组

10.5 数据湖

10.5.1 数据湖概述

数据湖是一个存储各种格式数据的集中式存储,支持结构化、半结构化和非结构化数据。

10.5.2 Delta Lake

Python
from delta import *
from pyspark.sql import SparkSession

# 创建SparkSession
spark = SparkSession.builder \
    .appName("DeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# 创建Delta表
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

# 读取Delta表
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

# 更新Delta表
df = spark.read.format("delta").load("/tmp/delta-table")
df = df.filter("id > 2")
df.write.format("delta").mode("overwrite").save("/tmp/delta-table")

# 时间旅行
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

10.6 实战练习

练习1:实现一个批处理任务

使用Spark实现一个批处理任务: 1. 读取数据 2. 数据清洗 3. 数据转换 4. 数据输出

练习2:实现一个流处理任务

使用Flink实现一个流处理任务: 1. 读取Kafka数据 2. 实时计算 3. 输出到Kafka 4. 容错处理

练习3:设计一个数据仓库

设计一个电商数据仓库: 1. 设计数据模型 2. 创建Hive表 3. 实现ETL流程 4. 优化查询性能

10.7 面试准备

常见面试题

  1. 什么是大数据处理?有哪些架构?
  2. Hadoop和Spark的区别?
  3. 批处理和流处理的区别?
  4. 什么是数据仓库?什么是数据湖?
  5. 如何设计一个大数据处理架构?

项目经验准备

准备一个大数据处理项目: - 使用的技术栈 - 遇到的挑战 - 解决方案 - 项目成果

10.8 总结

本章介绍了大数据处理架构,包括批处理、流处理、数据仓库和数据湖。大数据处理是现代数据架构的核心。

关键要点

  1. 大数据处理包括批处理和流处理
  2. Hadoop和Spark是主流的批处理框架
  3. Flink是主流的流处理框架
  4. 数据仓库用于支持管理决策
  5. 数据湖支持各种格式的数据存储

下一步

下一章将深入学习存储架构,包括对象存储、文件存储等内容。