跳转至

第5章:消息队列架构

消息队列架构

📎 交叉引用(本章侧重架构设计,数据工程视角见以下教程): - Kafka流处理实战 → 数据工程/07-流处理框架 - 大数据处理架构 → 后端架构/大数据处理架构

5.1 消息队列概述

什么是消息队列

消息队列(Message Queue,MQ)是一种进程间通信或服务间通信的方式,生产者将消息发送到队列,消费者从队列中获取并处理消息。

消息队列的作用

  1. 异步处理:提高系统响应速度
  2. 解耦:降低系统耦合度
  3. 削峰填谷:平滑流量峰值
  4. 可靠性:保证消息不丢失

消息队列的应用场景

  1. 异步通信:用户注册后发送欢迎邮件
  2. 系统解耦:订单系统与库存系统解耦
  3. 流量削峰:秒杀场景下的流量控制
  4. 日志处理:收集和处理日志
  5. 实时通信:即时消息推送

5.2 消息队列选型

5.2.1 主流消息队列对比

特性 Kafka RabbitMQ RocketMQ Redis
吞吐量 极高 中等
延迟 极低
可靠性
功能 简单 丰富 丰富 简单
运维复杂度
适用场景 大数据、日志 传统企业应用 电商、金融 简单场景

5.2.2 Kafka

特点: - 高吞吐量:每秒百万级消息 - 低延迟:毫秒级延迟 - 持久化:消息持久化到磁盘 - 分布式:支持分布式部署

适用场景: - 日志收集 - 流式处理 - 大数据场景

Python
from kafka import KafkaProducer, KafkaConsumer

# 生产者
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # lambda匿名函数:简洁的单行函数
)

producer.send('test_topic', {'key': 'value'})
producer.flush()

# 消费者
consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(message.value)

5.2.3 RabbitMQ

特点: - 功能丰富:支持多种消息模式 - 可靠性高:支持消息确认和持久化 - 灵活的路由:支持Exchange和RoutingKey - 管理界面:提供Web管理界面

适用场景: - 传统企业应用 - 复杂路由场景 - 需要高可靠性的场景

Python
import pika

# 生产者
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

channel.queue_declare(queue='test_queue')

channel.basic_publish(
    exchange='',
    routing_key='test_queue',
    body='Hello, RabbitMQ!'
)

connection.close()

# 消费者
def callback(ch, method, properties, body):
    print(f"Received: {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    queue='test_queue',
    on_message_callback=callback
)

channel.start_consuming()

5.2.4 RocketMQ

特点: - 高性能:支持高吞吐量 - 事务消息:支持分布式事务 - 消息顺序:保证消息顺序 - 消息回溯:支持消息回溯

适用场景: - 电商系统 - 金融系统 - 需要事务消息的场景

Python
from rocketmq.client import Producer, Message, PushConsumer, ConsumeStatus

# 生产者
producer = Producer('test_producer')
producer.set_name_server_address('localhost:9876')
producer.start()

msg = Message('test_topic')
msg.set_body('Hello, RocketMQ!')
producer.send_sync(msg)
producer.shutdown()

# 消费者
def callback(msg):
    print(f"Received: {msg.body}")
    return ConsumeStatus.CONSUME_SUCCESS

consumer = PushConsumer('test_consumer')
consumer.set_name_server_address('localhost:9876')
consumer.subscribe('test_topic', callback)
consumer.start()

5.2.5 Redis作为消息队列

特点: - 简单:实现简单 - 高性能:内存操作 - 功能有限:只支持简单场景

适用场景: - 简单的异步任务 - 轻量级消息队列

Python
import json
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# 生产者
r.lpush('task_queue', json.dumps({'task': 'send_email'}))

# 消费者
while True:
    task = r.brpop('task_queue', timeout=5)
    if task:
        task_data = json.loads(task[1])  # json.loads将JSON字符串转为Python对象
        print(f"Processing: {task_data}")

5.3 消息可靠性

5.3.1 消息不丢失

生产者端

  1. 同步发送:等待确认
  2. 重试机制:发送失败重试
  3. 持久化:消息持久化到磁盘
Python
# Kafka生产者配置
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',  # 等待所有副本确认
    retries=3,  # 重试次数
    max_in_flight_requests_per_connection=1,  # 顺序发送
    enable_idempotence=True  # 幂等性
)

# 发送消息并等待确认
future = producer.send('test_topic', {'key': 'value'})
record_metadata = future.get(timeout=10)

消费者端

  1. 手动确认:处理完成后确认
  2. 持久化:确认前持久化
  3. 重试机制:处理失败重试
Python
# RabbitMQ消费者配置
channel.basic_consume(
    queue='test_queue',
    on_message_callback=callback,
    auto_ack=False  # 关闭自动确认
)

def callback(ch, method, properties, body):
    try:
        # 处理消息
        process_message(body)

        # 手动确认
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # 处理失败,拒绝消息并重新入队
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=True
        )

Broker端

  1. 持久化:消息持久化到磁盘
  2. 集群:多副本保证高可用
  3. 监控:监控消息状态
Python
# RabbitMQ持久化配置
channel.queue_declare(
    queue='test_queue',
    durable=True  # 队列持久化
)

channel.basic_publish(
    exchange='',
    routing_key='test_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # 消息持久化
    )
)

5.3.2 消息不重复

幂等性设计

Python
class IdempotentProcessor:
    def __init__(self, redis_client):
        self.redis = redis_client

    def process(self, message):
        message_id = message['id']

        # 检查是否已处理
        if self.redis.exists(f"processed:{message_id}"):
            print(f"Message {message_id} already processed")
            return

        # 处理消息
        self.do_process(message)

        # 标记为已处理
        self.redis.setex(
            f"processed:{message_id}",
            86400,  # 24小时
            '1'
        )

    def do_process(self, message):
        # 实际处理逻辑
        pass

# 使用示例
processor = IdempotentProcessor(r)
processor.process({'id': 'msg_123', 'data': '...'})

唯一ID

Python
import uuid

def send_message(producer, topic, data):
    # 生成唯一ID
    message_id = str(uuid.uuid4())

    message = {
        'id': message_id,
        'data': data,
        'timestamp': time.time()
    }

    producer.send(topic, message)
    return message_id

5.3.3 消息顺序

单分区顺序

Python
# Kafka:使用相同key的消息发送到同一分区
producer.send(
    'test_topic',
    key='user_123',  # 相同key发送到同一分区
    value={'data': '...'}
)

消费者单线程

Python
# 消费者使用单线程消费
def consume_messages():
    consumer = KafkaConsumer(
        'test_topic',
        bootstrap_servers=['localhost:9092'],
        max_poll_records=1  # 每次只拉取一条消息
    )

    for message in consumer:
        # 串行处理,保证顺序
        process_message(message.value)
        consumer.commit()

5.4 消息幂等性

5.4.1 幂等性设计

  1. 唯一ID:为每条消息分配唯一ID
  2. 去重表:使用数据库或缓存记录已处理消息
  3. 乐观锁:使用版本号控制

5.4.2 幂等性实现

Python
class MessageProcessor:
    def __init__(self, db):
        self.db = db

    def process(self, message):
        message_id = message['id']

        # 检查是否已处理
        processed = self.db.query(
            "SELECT * FROM processed_messages WHERE id = %s",
            message_id
        )

        if processed:
            print(f"Message {message_id} already processed")
            return

        # 使用事务保证原子性
        with self.db.transaction():
            # 标记为处理中
            self.db.execute(
                "INSERT INTO processed_messages (id, status) VALUES (%s, 'processing')",
                message_id
            )

            try:
                # 处理消息
                self.do_process(message)

                # 更新状态为已完成
                self.db.execute(
                    "UPDATE processed_messages SET status = 'completed' WHERE id = %s",
                    message_id
                )
            except Exception as e:
                # 处理失败,更新状态
                self.db.execute(
                    "UPDATE processed_messages SET status = 'failed', error = %s WHERE id = %s",
                    (str(e), message_id)
                )
                raise

5.5 消息积压处理

5.5.1 积压原因

  1. 生产者发送速度 > 消费者处理速度
  2. 消费者处理失败
  3. 消费者宕机
  4. 网络问题

5.5.2 积压解决方案

增加消费者

Python
# 启动多个消费者进程
def start_consumer(consumer_id):
    consumer = KafkaConsumer(
        'test_topic',
        bootstrap_servers=['localhost:9092'],
        group_id='test_group',
        consumer_timeout_ms=1000
    )

    print(f"Consumer {consumer_id} started")

    for message in consumer:
        process_message(message.value)

# 启动多个消费者
import multiprocessing
processes = []
for i in range(5):  # 启动5个消费者
    p = multiprocessing.Process(target=start_consumer, args=(i,))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

临时扩容

Python
# 临时创建更多消费者处理积压
def handle_backlog():
    # 检查积压量
    backlog_size = get_queue_size()

    if backlog_size > 10000:
        # 临时扩容消费者
        for i in range(10):  # 增加10个临时消费者
            start_temporary_consumer(i)

    # 监控积压量,动态调整消费者数量
    monitor_backlog()

def monitor_backlog():
    while True:
        backlog_size = get_queue_size()
        consumer_count = get_consumer_count()

        # 根据积压量调整消费者数量
        if backlog_size > 10000 and consumer_count < 20:
            add_consumer()
        elif backlog_size < 1000 and consumer_count > 5:
            remove_consumer()

        time.sleep(60)

降级处理

Python
def process_message(message):
    try:  # try/except捕获异常
        # 正常处理
        do_process(message)
    except Exception as e:
        # 处理失败,降级处理
        fallback_process(message, e)

def fallback_process(message, error):
    # 降级处理:记录日志,稍后重试
    log_error(message, str(error))

    # 将消息放入重试队列
    r.lpush('retry_queue', json.dumps(message))  # json.dumps将Python对象转为JSON字符串

5.6 实战练习

练习1:实现一个可靠的消息队列系统

实现一个可靠的消息队列系统,包括: 1. 生产者:保证消息不丢失 2. 消费者:保证消息不丢失、不重复 3. 监控:监控消息状态

练习2:设计一个订单系统的消息队列

为一个电商订单系统设计消息队列: 1. 订单创建后发送消息 2. 库存系统消费消息 3. 支付系统消费消息 4. 保证消息顺序

练习3:处理消息积压

设计一个消息积压处理方案: 1. 监控消息积压 2. 动态调整消费者数量 3. 降级处理策略

5.7 面试准备

常见面试题

  1. 消息队列的作用是什么?
  2. 如何保证消息不丢失?
  3. 如何保证消息不重复?
  4. 如何保证消息顺序?
  5. 如何处理消息积压?

项目经验准备

准备一个使用消息队列的项目: - 使用的消息队列 - 遇到的挑战 - 解决方案 - 项目成果

5.8 总结

本章介绍了消息队列架构,包括消息队列选型、消息可靠性、消息幂等性和消息积压处理。消息队列是异步处理和解耦的重要工具。

关键要点

  1. 消息队列选型需要根据场景选择合适的MQ
  2. 消息可靠性需要从生产者、消费者、Broker三方面保证
  3. 消息幂等性通过唯一ID和去重表实现
  4. 消息顺序可以通过单分区和单线程消费保证
  5. 消息积压可以通过增加消费者和降级处理解决

下一步

下一章将深入学习微服务架构,包括微服务拆分、服务治理等内容。