第10章:大数据处理架构¶
10.1 大数据处理概述¶
大数据处理的特点¶
- 数据量大:TB、PB级别的数据
- 处理速度快:实时或准实时处理
- 数据类型多:结构化、半结构化、非结构化数据
- 价值密度低:需要从大量数据中提取价值
大数据处理架构的类型¶
- 批处理:处理历史数据
- 流处理:处理实时数据
- 混合处理:批处理和流处理结合
10.2 批处理¶
10.2.1 Hadoop¶
Hadoop是一个分布式计算框架,用于处理大规模数据。
核心组件: - HDFS:分布式文件系统 - MapReduce:分布式计算框架 - YARN:资源管理器
Python
# MapReduce示例
from mrjob.job import MRJob
class WordCount(MRJob):
def mapper(self, _, line):
for word in line.split():
# yield使函数成为生成器:每次yield产出一个(key,value)对,框架逐条收集结果而非一次性返回,适合超大数据集
yield (word, 1) # yield生成器:惰性产出值,节省内存
def reducer(self, word, counts):
yield (word, sum(counts))
if __name__ == '__main__':
WordCount.run()
10.2.2 Spark¶
Spark是一个快速的大数据处理引擎,支持批处理、流处理、SQL、机器学习等。
特点: - 内存计算 - 快速 - 易用 - 生态系统丰富
Python
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder \
.appName("WordCount") \
.getOrCreate()
# 读取数据
text_file = spark.read.text("data.txt")
# 词频统计
word_counts = text_file.rdd \
.flatMap(lambda line: line.value.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
# 输出结果
word_counts.saveAsTextFile("output")
10.2.3 Spark SQL¶
Python
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder \
.appName("SparkSQL") \
.getOrCreate()
# 读取数据
df = spark.read.json("users.json")
# 注册临时表
df.createOrReplaceTempView("users")
# 执行SQL查询
result = spark.sql("""
SELECT
age,
COUNT(*) as count
FROM users
GROUP BY age
ORDER BY age
""")
# 输出结果
result.show()
10.3 流处理¶
10.3.1 Kafka Streams¶
Python
import json
from kafka import KafkaConsumer, KafkaProducer
# 消费者
consumer = KafkaConsumer(
'input-topic',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')) # json.loads将JSON字符串转为Python对象
)
# 生产者
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8') # json.dumps将Python对象转为JSON字符串
)
# 流处理
for message in consumer:
data = message.value
# 处理数据
processed_data = process(data)
# 发送到输出topic
producer.send('output-topic', processed_data)
10.3.2 Spark Streaming¶
Python
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
# 创建StreamingContext
sc = SparkContext(appName="NetworkWordCount")
ssc = StreamingContext(sc, batchDuration=1)
# 创建DStream
lines = ssc.socketTextStream("localhost", 9999)
# 词频统计
words = lines.flatMap(lambda line: line.split()) # lambda匿名函数:简洁的单行函数
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# 输出结果
word_counts.pprint()
# 启动流处理
ssc.start()
ssc.awaitTermination()
10.3.3 Flink¶
Python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# 创建Kafka表
t_env.execute_sql("""
CREATE TABLE input_table (
user_id INT,
product_id INT,
amount DECIMAL(10, 2),
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# 创建结果表
t_env.execute_sql("""
CREATE TABLE output_table (
user_id INT,
total_amount DECIMAL(10, 2),
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_amounts',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# 执行查询
t_env.execute_sql("""
INSERT INTO output_table
SELECT
user_id,
SUM(amount) as total_amount,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
FROM input_table
GROUP BY
user_id,
TUMBLE(event_time, INTERVAL '1' MINUTE)
""").wait()
10.4 数据仓库¶
10.4.1 数据仓库概述¶
数据仓库是一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合,用于支持管理决策。
10.4.2 数据建模¶
星型模型¶
雪花模型¶
10.4.3 Hive¶
SQL
-- 创建表
CREATE TABLE orders (
order_id INT,
user_id INT,
product_id INT,
amount DECIMAL(10, 2),
order_date STRING
)
PARTITIONED BY (year STRING, month STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
-- 加载数据
LOAD DATA LOCAL INPATH 'orders.csv'
INTO TABLE orders
PARTITION (year='2024', month='01');
-- 查询数据
SELECT
user_id,
SUM(amount) as total_amount
FROM orders
WHERE year='2024' AND month='01'
GROUP BY user_id;
10.4.4 ClickHouse¶
SQL
-- 创建表
CREATE TABLE orders (
order_id UInt32,
user_id UInt32,
product_id UInt32,
amount Decimal(10, 2),
order_date Date
)
ENGINE = MergeTree()
ORDER BY (order_date, user_id);
-- 插入数据
INSERT INTO orders VALUES
(1, 100, 200, 100.50, '2024-01-01'),
(2, 101, 201, 200.75, '2024-01-02');
-- 查询数据
SELECT
user_id,
SUM(amount) as total_amount
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY user_id; -- GROUP BY分组;HAVING过滤分组
10.5 数据湖¶
10.5.1 数据湖概述¶
数据湖是一个存储各种格式数据的集中式存储,支持结构化、半结构化和非结构化数据。
10.5.2 Delta Lake¶
Python
from delta import *
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder \
.appName("DeltaLake") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 创建Delta表
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
# 读取Delta表
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
# 更新Delta表
df = spark.read.format("delta").load("/tmp/delta-table")
df = df.filter("id > 2")
df.write.format("delta").mode("overwrite").save("/tmp/delta-table")
# 时间旅行
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()
10.6 实战练习¶
练习1:实现一个批处理任务¶
使用Spark实现一个批处理任务: 1. 读取数据 2. 数据清洗 3. 数据转换 4. 数据输出
练习2:实现一个流处理任务¶
使用Flink实现一个流处理任务: 1. 读取Kafka数据 2. 实时计算 3. 输出到Kafka 4. 容错处理
练习3:设计一个数据仓库¶
设计一个电商数据仓库: 1. 设计数据模型 2. 创建Hive表 3. 实现ETL流程 4. 优化查询性能
10.7 面试准备¶
常见面试题¶
- 什么是大数据处理?有哪些架构?
- Hadoop和Spark的区别?
- 批处理和流处理的区别?
- 什么是数据仓库?什么是数据湖?
- 如何设计一个大数据处理架构?
项目经验准备¶
准备一个大数据处理项目: - 使用的技术栈 - 遇到的挑战 - 解决方案 - 项目成果
10.8 总结¶
本章介绍了大数据处理架构,包括批处理、流处理、数据仓库和数据湖。大数据处理是现代数据架构的核心。
关键要点¶
- 大数据处理包括批处理和流处理
- Hadoop和Spark是主流的批处理框架
- Flink是主流的流处理框架
- 数据仓库用于支持管理决策
- 数据湖支持各种格式的数据存储
下一步¶
下一章将深入学习存储架构,包括对象存储、文件存储等内容。