第5章:消息队列架构¶
📎 交叉引用(本章侧重架构设计,数据工程视角见以下教程): - Kafka流处理实战 → 数据工程/07-流处理框架 - 大数据处理架构 → 后端架构/大数据处理架构
5.1 消息队列概述¶
什么是消息队列¶
消息队列(Message Queue,MQ)是一种进程间通信或服务间通信的方式,生产者将消息发送到队列,消费者从队列中获取并处理消息。
消息队列的作用¶
- 异步处理:提高系统响应速度
- 解耦:降低系统耦合度
- 削峰填谷:平滑流量峰值
- 可靠性:保证消息不丢失
消息队列的应用场景¶
- 异步通信:用户注册后发送欢迎邮件
- 系统解耦:订单系统与库存系统解耦
- 流量削峰:秒杀场景下的流量控制
- 日志处理:收集和处理日志
- 实时通信:即时消息推送
5.2 消息队列选型¶
5.2.1 主流消息队列对比¶
| 特性 | Kafka | RabbitMQ | RocketMQ | Redis |
|---|---|---|---|---|
| 吞吐量 | 极高 | 中等 | 高 | 高 |
| 延迟 | 低 | 低 | 低 | 极低 |
| 可靠性 | 高 | 高 | 高 | 中 |
| 功能 | 简单 | 丰富 | 丰富 | 简单 |
| 运维复杂度 | 高 | 中 | 高 | 低 |
| 适用场景 | 大数据、日志 | 传统企业应用 | 电商、金融 | 简单场景 |
5.2.2 Kafka¶
特点: - 高吞吐量:每秒百万级消息 - 低延迟:毫秒级延迟 - 持久化:消息持久化到磁盘 - 分布式:支持分布式部署
适用场景: - 日志收集 - 流式处理 - 大数据场景
from kafka import KafkaProducer, KafkaConsumer
# 生产者
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8') # lambda匿名函数:简洁的单行函数
)
producer.send('test_topic', {'key': 'value'})
producer.flush()
# 消费者
consumer = KafkaConsumer(
'test_topic',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
print(message.value)
5.2.3 RabbitMQ¶
特点: - 功能丰富:支持多种消息模式 - 可靠性高:支持消息确认和持久化 - 灵活的路由:支持Exchange和RoutingKey - 管理界面:提供Web管理界面
适用场景: - 传统企业应用 - 复杂路由场景 - 需要高可靠性的场景
import pika
# 生产者
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='test_queue')
channel.basic_publish(
exchange='',
routing_key='test_queue',
body='Hello, RabbitMQ!'
)
connection.close()
# 消费者
def callback(ch, method, properties, body):
print(f"Received: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='test_queue',
on_message_callback=callback
)
channel.start_consuming()
5.2.4 RocketMQ¶
特点: - 高性能:支持高吞吐量 - 事务消息:支持分布式事务 - 消息顺序:保证消息顺序 - 消息回溯:支持消息回溯
适用场景: - 电商系统 - 金融系统 - 需要事务消息的场景
from rocketmq.client import Producer, Message, PushConsumer, ConsumeStatus
# 生产者
producer = Producer('test_producer')
producer.set_name_server_address('localhost:9876')
producer.start()
msg = Message('test_topic')
msg.set_body('Hello, RocketMQ!')
producer.send_sync(msg)
producer.shutdown()
# 消费者
def callback(msg):
print(f"Received: {msg.body}")
return ConsumeStatus.CONSUME_SUCCESS
consumer = PushConsumer('test_consumer')
consumer.set_name_server_address('localhost:9876')
consumer.subscribe('test_topic', callback)
consumer.start()
5.2.5 Redis作为消息队列¶
特点: - 简单:实现简单 - 高性能:内存操作 - 功能有限:只支持简单场景
适用场景: - 简单的异步任务 - 轻量级消息队列
import json
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 生产者
r.lpush('task_queue', json.dumps({'task': 'send_email'}))
# 消费者
while True:
task = r.brpop('task_queue', timeout=5)
if task:
task_data = json.loads(task[1]) # json.loads将JSON字符串转为Python对象
print(f"Processing: {task_data}")
5.3 消息可靠性¶
5.3.1 消息不丢失¶
生产者端¶
- 同步发送:等待确认
- 重试机制:发送失败重试
- 持久化:消息持久化到磁盘
# Kafka生产者配置
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
acks='all', # 等待所有副本确认
retries=3, # 重试次数
max_in_flight_requests_per_connection=1, # 顺序发送
enable_idempotence=True # 幂等性
)
# 发送消息并等待确认
future = producer.send('test_topic', {'key': 'value'})
record_metadata = future.get(timeout=10)
消费者端¶
- 手动确认:处理完成后确认
- 持久化:确认前持久化
- 重试机制:处理失败重试
# RabbitMQ消费者配置
channel.basic_consume(
queue='test_queue',
on_message_callback=callback,
auto_ack=False # 关闭自动确认
)
def callback(ch, method, properties, body):
try:
# 处理消息
process_message(body)
# 手动确认
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# 处理失败,拒绝消息并重新入队
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True
)
Broker端¶
- 持久化:消息持久化到磁盘
- 集群:多副本保证高可用
- 监控:监控消息状态
# RabbitMQ持久化配置
channel.queue_declare(
queue='test_queue',
durable=True # 队列持久化
)
channel.basic_publish(
exchange='',
routing_key='test_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2 # 消息持久化
)
)
5.3.2 消息不重复¶
幂等性设计¶
class IdempotentProcessor:
def __init__(self, redis_client):
self.redis = redis_client
def process(self, message):
message_id = message['id']
# 检查是否已处理
if self.redis.exists(f"processed:{message_id}"):
print(f"Message {message_id} already processed")
return
# 处理消息
self.do_process(message)
# 标记为已处理
self.redis.setex(
f"processed:{message_id}",
86400, # 24小时
'1'
)
def do_process(self, message):
# 实际处理逻辑
pass
# 使用示例
processor = IdempotentProcessor(r)
processor.process({'id': 'msg_123', 'data': '...'})
唯一ID¶
import uuid
def send_message(producer, topic, data):
# 生成唯一ID
message_id = str(uuid.uuid4())
message = {
'id': message_id,
'data': data,
'timestamp': time.time()
}
producer.send(topic, message)
return message_id
5.3.3 消息顺序¶
单分区顺序¶
# Kafka:使用相同key的消息发送到同一分区
producer.send(
'test_topic',
key='user_123', # 相同key发送到同一分区
value={'data': '...'}
)
消费者单线程¶
# 消费者使用单线程消费
def consume_messages():
consumer = KafkaConsumer(
'test_topic',
bootstrap_servers=['localhost:9092'],
max_poll_records=1 # 每次只拉取一条消息
)
for message in consumer:
# 串行处理,保证顺序
process_message(message.value)
consumer.commit()
5.4 消息幂等性¶
5.4.1 幂等性设计¶
- 唯一ID:为每条消息分配唯一ID
- 去重表:使用数据库或缓存记录已处理消息
- 乐观锁:使用版本号控制
5.4.2 幂等性实现¶
class MessageProcessor:
def __init__(self, db):
self.db = db
def process(self, message):
message_id = message['id']
# 检查是否已处理
processed = self.db.query(
"SELECT * FROM processed_messages WHERE id = %s",
message_id
)
if processed:
print(f"Message {message_id} already processed")
return
# 使用事务保证原子性
with self.db.transaction():
# 标记为处理中
self.db.execute(
"INSERT INTO processed_messages (id, status) VALUES (%s, 'processing')",
message_id
)
try:
# 处理消息
self.do_process(message)
# 更新状态为已完成
self.db.execute(
"UPDATE processed_messages SET status = 'completed' WHERE id = %s",
message_id
)
except Exception as e:
# 处理失败,更新状态
self.db.execute(
"UPDATE processed_messages SET status = 'failed', error = %s WHERE id = %s",
(str(e), message_id)
)
raise
5.5 消息积压处理¶
5.5.1 积压原因¶
- 生产者发送速度 > 消费者处理速度
- 消费者处理失败
- 消费者宕机
- 网络问题
5.5.2 积压解决方案¶
增加消费者¶
# 启动多个消费者进程
def start_consumer(consumer_id):
consumer = KafkaConsumer(
'test_topic',
bootstrap_servers=['localhost:9092'],
group_id='test_group',
consumer_timeout_ms=1000
)
print(f"Consumer {consumer_id} started")
for message in consumer:
process_message(message.value)
# 启动多个消费者
import multiprocessing
processes = []
for i in range(5): # 启动5个消费者
p = multiprocessing.Process(target=start_consumer, args=(i,))
p.start()
processes.append(p)
for p in processes:
p.join()
临时扩容¶
# 临时创建更多消费者处理积压
def handle_backlog():
# 检查积压量
backlog_size = get_queue_size()
if backlog_size > 10000:
# 临时扩容消费者
for i in range(10): # 增加10个临时消费者
start_temporary_consumer(i)
# 监控积压量,动态调整消费者数量
monitor_backlog()
def monitor_backlog():
while True:
backlog_size = get_queue_size()
consumer_count = get_consumer_count()
# 根据积压量调整消费者数量
if backlog_size > 10000 and consumer_count < 20:
add_consumer()
elif backlog_size < 1000 and consumer_count > 5:
remove_consumer()
time.sleep(60)
降级处理¶
def process_message(message):
try: # try/except捕获异常
# 正常处理
do_process(message)
except Exception as e:
# 处理失败,降级处理
fallback_process(message, e)
def fallback_process(message, error):
# 降级处理:记录日志,稍后重试
log_error(message, str(error))
# 将消息放入重试队列
r.lpush('retry_queue', json.dumps(message)) # json.dumps将Python对象转为JSON字符串
5.6 实战练习¶
练习1:实现一个可靠的消息队列系统¶
实现一个可靠的消息队列系统,包括: 1. 生产者:保证消息不丢失 2. 消费者:保证消息不丢失、不重复 3. 监控:监控消息状态
练习2:设计一个订单系统的消息队列¶
为一个电商订单系统设计消息队列: 1. 订单创建后发送消息 2. 库存系统消费消息 3. 支付系统消费消息 4. 保证消息顺序
练习3:处理消息积压¶
设计一个消息积压处理方案: 1. 监控消息积压 2. 动态调整消费者数量 3. 降级处理策略
5.7 面试准备¶
常见面试题¶
- 消息队列的作用是什么?
- 如何保证消息不丢失?
- 如何保证消息不重复?
- 如何保证消息顺序?
- 如何处理消息积压?
项目经验准备¶
准备一个使用消息队列的项目: - 使用的消息队列 - 遇到的挑战 - 解决方案 - 项目成果
5.8 总结¶
本章介绍了消息队列架构,包括消息队列选型、消息可靠性、消息幂等性和消息积压处理。消息队列是异步处理和解耦的重要工具。
关键要点¶
- 消息队列选型需要根据场景选择合适的MQ
- 消息可靠性需要从生产者、消费者、Broker三方面保证
- 消息幂等性通过唯一ID和去重表实现
- 消息顺序可以通过单分区和单线程消费保证
- 消息积压可以通过增加消费者和降级处理解决
下一步¶
下一章将深入学习微服务架构,包括微服务拆分、服务治理等内容。