第06章 实时数据处理¶
📚 章节概述¶
本章将深入讲解实时数据处理,包括Kafka、Flink、Spark Streaming等。通过本章学习,你将能够设计和实现实时数据处理系统。
🎯 学习目标¶
完成本章后,你将能够:
- 理解实时数据处理的核心概念
- 掌握Kafka消息队列的使用
- 了解Flink流处理框架
- 掌握Spark Streaming
- 能够设计和实现实时数据处理系统
6.1 实时数据处理概述¶
6.1.1 什么是实时数据处理¶
实时数据处理是对数据流进行实时处理和分析的技术。
实时处理特点¶
- 低延迟
- 毫秒级响应
- 实时决策
-
快速反馈
-
高吞吐
- 处理大量数据
- 水平扩展
-
弹性伸缩
-
持续处理
- 7x24运行
- 数据流式处理
- 无缝扩展
6.1.2 实时vs 批处理¶
| 特性 | 实时处理 | 批处理 |
|---|---|---|
| 延迟 | 毫秒级 | 分钟/小时级 |
| 吞吐 | 高 | 中 |
| 成本 | 高 | 低 |
| 复杂度 | 高 | 中 |
6.2 Kafka消息队列¶
6.2.1 Kafka概述¶
Apache Kafka是分布式流处理平台,用于构建实时数据管道和流应用。
核心概念¶
- Topic(主题)
- 数据流分类
- 类似数据库表
-
支持多分区
-
Partition(分区)
- Topic的分片
- 提高并行度
-
保证顺序
-
Producer(生产者)
- 发送消息
- 支持多种协议
-
高吞吐写入
-
Consumer(消费者)
- 接收消息
- 支持消费组
- 自动负载均衡
6.2.2 Kafka使用¶
Python
from kafka import KafkaProducer, KafkaConsumer
import json
# 生产者
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8') # lambda匿名函数:简洁的单行函数
)
# 发送消息
producer.send('sales', value={'product_id': 1, 'quantity': 10, 'amount': 100.0})
producer.flush()
# 消费者
consumer = KafkaConsumer(
'sales',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')), # json.loads将JSON字符串转为Python对象
auto_offset_reset='earliest'
)
# 接收消息
for message in consumer:
print(f"Received: {message.value}")
# 处理消息
process_message(message.value)
6.3 Flink流处理¶
6.3.1 Flink概述¶
Apache Flink是分布式流处理引擎,支持有界和无界数据流。
核心特性¶
- 流批一体
- 统一API
- 流批切换
-
相同语义
-
状态管理
- 精确一次
- 状态后端
-
容错机制
-
窗口操作
- 时间窗口
- 计数窗口
- 会话窗口
6.3.2 Flink使用¶
Python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.expressions import col
# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# 定义数据源
t_env.execute_sql("""
CREATE TABLE sales (
product_id INT,
quantity INT,
amount DECIMAL(10,2),
sale_time TIMESTAMP(3),
WATERMARK FOR sale_time AS sale_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'sales',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'sales-group',
'format' = 'json'
)
""")
# 实时聚合
result = t_env.sql_query("""
SELECT
product_id,
COUNT(*) as order_count,
SUM(quantity) as total_quantity,
SUM(amount) as total_amount
FROM sales
GROUP BY
product_id,
TUMBLE(sale_time, INTERVAL '1' MINUTE)
""")
# 定义输出表
t_env.execute_sql("""
CREATE TABLE sales_aggregate (
product_id INT,
order_count BIGINT,
total_quantity BIGINT,
total_amount DECIMAL(10,2)
) WITH (
'connector' = 'print'
)
""")
# 将结果写入输出表
result.execute_insert('sales_aggregate').wait()
6.4 Spark Streaming¶
6.4.1 Spark Streaming概述¶
Spark Streaming是Spark的流处理组件,支持微批处理。
核心特性¶
- 微批处理
- 批次处理
- 低延迟
-
高吞吐
-
Exactly-Once
- 精确一次
- 事务支持
-
容错机制
-
统一API
- 流批一体
- 相同操作
- 易于使用
6.4.2 Spark Streaming使用¶
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col
from pyspark.sql.types import StructType, IntegerType, TimestampType, DecimalType
# 创建SparkSession
spark = SparkSession.builder \
.appName("SparkStreamingExample") \
.getOrCreate()
# 读取Kafka数据
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sales") \
.load()
# 解析数据
from pyspark.sql.functions import from_json, to_json
schema = StructType().add("product_id", IntegerType()) \
.add("quantity", IntegerType()) \
.add("amount", DecimalType(10,2)) \
.add("sale_time", TimestampType())
df = df.select(
from_json(col("value").cast("string"), schema).alias("data")
).select(
col("data.product_id"),
col("data.quantity"),
col("data.amount"),
col("data.sale_time")
)
# 实时聚合
from pyspark.sql.functions import sum as _sum, count as _count
aggregated_df = df.groupBy(
window(col("sale_time"), "1 minute")
).agg(
_count("*").alias("order_count"),
_sum("quantity").alias("total_quantity"),
_sum("amount").alias("total_amount")
)
# 输出结果
query = aggregated_df \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
# 等待流结束
query.awaitTermination(timeout=300)
6.5 练习题¶
基础题¶
- 选择题
-
实时处理的核心特点不包括什么?
- A. 低延迟
- B. 高成本
- C. 高吞吐
- D. 持续处理
-
简答题
- 解释实时处理vs 批处理的区别。
- 说明Kafka的核心概念。
进阶题¶
- 实践题
- 搭建一个Kafka集群。
- 实现Flink流处理。
-
实现Spark Streaming。
-
设计题
- 设计一个实时数据处理架构。
- 设计一个实时推荐系统。
答案¶
1. 选择题答案¶
- B(实时处理的核心特点不包括高成本)
2. 简答题答案¶
实时处理vs 批处理的区别: - 实时处理:毫秒级延迟、高吞吐、高成本 - 批处理:分钟/小时级延迟、中吞吐、低成本
Kafka的核心概念: - Topic、Partition、Producer、Consumer
3. 实践题答案¶
参见6.2-6.4节的示例。
4. 设计题答案¶
参见6.1-6.4节的架构设计。
6.6 面试准备¶
大厂面试题¶
字节跳动¶
- 解释实时处理的核心概念。
- Kafka的分区机制是什么?
- Flink的窗口操作有哪些?
- 如何设计实时数据处理系统?
腾讯¶
- Flink和Spark Streaming的区别是什么?
- 如何实现Exactly-Once?
- 如何设计实时数据管道?
- 如何处理实时数据延迟?
阿里云¶
- 实时处理的最佳实践是什么?
- 如何设计实时数据质量?
- 如何设计实时数据监控?
- 如何设计实时数据容灾?
📚 参考资料¶
- Kafka文档:https://kafka.apache.org/documentation/
- Flink文档:https://flink.apache.org/docs/
- Spark Streaming文档:https://spark.apache.org/docs/latest/streaming/
- 《Stream Processing with Apache Flink》
🎯 本章小结¶
本章深入讲解了实时数据处理,包括:
- 实时数据处理的核心概念
- Kafka消息队列
- Flink流处理
- Spark Streaming
通过本章学习,你掌握了实时数据处理的核心技术,能够设计和实现实时数据处理系统。下一章将深入学习流处理框架。