跳转至

第06章 实时数据处理

实时数据处理

📚 章节概述

本章将深入讲解实时数据处理,包括Kafka、Flink、Spark Streaming等。通过本章学习,你将能够设计和实现实时数据处理系统。

🎯 学习目标

完成本章后,你将能够:

  1. 理解实时数据处理的核心概念
  2. 掌握Kafka消息队列的使用
  3. 了解Flink流处理框架
  4. 掌握Spark Streaming
  5. 能够设计和实现实时数据处理系统

6.1 实时数据处理概述

6.1.1 什么是实时数据处理

实时数据处理是对数据流进行实时处理和分析的技术。

实时处理特点

  1. 低延迟
  2. 毫秒级响应
  3. 实时决策
  4. 快速反馈

  5. 高吞吐

  6. 处理大量数据
  7. 水平扩展
  8. 弹性伸缩

  9. 持续处理

  10. 7x24运行
  11. 数据流式处理
  12. 无缝扩展

6.1.2 实时vs 批处理

特性 实时处理 批处理
延迟 毫秒级 分钟/小时级
吞吐
成本
复杂度

6.2 Kafka消息队列

6.2.1 Kafka概述

Apache Kafka是分布式流处理平台,用于构建实时数据管道和流应用。

核心概念

  1. Topic(主题)
  2. 数据流分类
  3. 类似数据库表
  4. 支持多分区

  5. Partition(分区)

  6. Topic的分片
  7. 提高并行度
  8. 保证顺序

  9. Producer(生产者)

  10. 发送消息
  11. 支持多种协议
  12. 高吞吐写入

  13. Consumer(消费者)

  14. 接收消息
  15. 支持消费组
  16. 自动负载均衡

6.2.2 Kafka使用

Python
from kafka import KafkaProducer, KafkaConsumer
import json

# 生产者
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # lambda匿名函数:简洁的单行函数
)

# 发送消息
producer.send('sales', value={'product_id': 1, 'quantity': 10, 'amount': 100.0})
producer.flush()

# 消费者
consumer = KafkaConsumer(
    'sales',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),  # json.loads将JSON字符串转为Python对象
    auto_offset_reset='earliest'
)

# 接收消息
for message in consumer:
    print(f"Received: {message.value}")
    # 处理消息
    process_message(message.value)

6.3 Flink流处理

6.3.1 Flink概述

Apache Flink是分布式流处理引擎,支持有界和无界数据流。

核心特性

  1. 流批一体
  2. 统一API
  3. 流批切换
  4. 相同语义

  5. 状态管理

  6. 精确一次
  7. 状态后端
  8. 容错机制

  9. 窗口操作

  10. 时间窗口
  11. 计数窗口
  12. 会话窗口

6.3.2 Flink使用

Python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.expressions import col

# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# 定义数据源
t_env.execute_sql("""
    CREATE TABLE sales (
        product_id INT,
        quantity INT,
        amount DECIMAL(10,2),
        sale_time TIMESTAMP(3),
        WATERMARK FOR sale_time AS sale_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'sales',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'sales-group',
        'format' = 'json'
    )
""")

# 实时聚合
result = t_env.sql_query("""
    SELECT
        product_id,
        COUNT(*) as order_count,
        SUM(quantity) as total_quantity,
        SUM(amount) as total_amount
    FROM sales
    GROUP BY
        product_id,
        TUMBLE(sale_time, INTERVAL '1' MINUTE)
""")

# 定义输出表
t_env.execute_sql("""
    CREATE TABLE sales_aggregate (
        product_id INT,
        order_count BIGINT,
        total_quantity BIGINT,
        total_amount DECIMAL(10,2)
    ) WITH (
        'connector' = 'print'
    )
""")

# 将结果写入输出表
result.execute_insert('sales_aggregate').wait()

6.4 Spark Streaming

6.4.1 Spark Streaming概述

Spark Streaming是Spark的流处理组件,支持微批处理。

核心特性

  1. 微批处理
  2. 批次处理
  3. 低延迟
  4. 高吞吐

  5. Exactly-Once

  6. 精确一次
  7. 事务支持
  8. 容错机制

  9. 统一API

  10. 流批一体
  11. 相同操作
  12. 易于使用

6.4.2 Spark Streaming使用

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col
from pyspark.sql.types import StructType, IntegerType, TimestampType, DecimalType

# 创建SparkSession
spark = SparkSession.builder \
    .appName("SparkStreamingExample") \
    .getOrCreate()

# 读取Kafka数据
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sales") \
    .load()

# 解析数据
from pyspark.sql.functions import from_json, to_json

schema = StructType().add("product_id", IntegerType()) \
    .add("quantity", IntegerType()) \
    .add("amount", DecimalType(10,2)) \
    .add("sale_time", TimestampType())

df = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select(
    col("data.product_id"),
    col("data.quantity"),
    col("data.amount"),
    col("data.sale_time")
)

# 实时聚合
from pyspark.sql.functions import sum as _sum, count as _count

aggregated_df = df.groupBy(
    window(col("sale_time"), "1 minute")
).agg(
    _count("*").alias("order_count"),
    _sum("quantity").alias("total_quantity"),
    _sum("amount").alias("total_amount")
)

# 输出结果
query = aggregated_df \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# 等待流结束
query.awaitTermination(timeout=300)

6.5 练习题

基础题

  1. 选择题
  2. 实时处理的核心特点不包括什么?

    • A. 低延迟
    • B. 高成本
    • C. 高吞吐
    • D. 持续处理
  3. 简答题

  4. 解释实时处理vs 批处理的区别。
  5. 说明Kafka的核心概念。

进阶题

  1. 实践题
  2. 搭建一个Kafka集群。
  3. 实现Flink流处理。
  4. 实现Spark Streaming。

  5. 设计题

  6. 设计一个实时数据处理架构。
  7. 设计一个实时推荐系统。

答案

1. 选择题答案

  1. B(实时处理的核心特点不包括高成本)

2. 简答题答案

实时处理vs 批处理的区别: - 实时处理:毫秒级延迟、高吞吐、高成本 - 批处理:分钟/小时级延迟、中吞吐、低成本

Kafka的核心概念: - Topic、Partition、Producer、Consumer

3. 实践题答案

参见6.2-6.4节的示例。

4. 设计题答案

参见6.1-6.4节的架构设计。

6.6 面试准备

大厂面试题

字节跳动

  1. 解释实时处理的核心概念。
  2. Kafka的分区机制是什么?
  3. Flink的窗口操作有哪些?
  4. 如何设计实时数据处理系统?

腾讯

  1. Flink和Spark Streaming的区别是什么?
  2. 如何实现Exactly-Once?
  3. 如何设计实时数据管道?
  4. 如何处理实时数据延迟?

阿里云

  1. 实时处理的最佳实践是什么?
  2. 如何设计实时数据质量?
  3. 如何设计实时数据监控?
  4. 如何设计实时数据容灾?

📚 参考资料

🎯 本章小结

本章深入讲解了实时数据处理,包括:

  1. 实时数据处理的核心概念
  2. Kafka消息队列
  3. Flink流处理
  4. Spark Streaming

通过本章学习,你掌握了实时数据处理的核心技术,能够设计和实现实时数据处理系统。下一章将深入学习流处理框架。