第2章:分布式系统基础¶
2.1 分布式系统概述¶
什么是分布式系统¶
分布式系统是由多个通过网络连接的计算机组成的系统,这些计算机协同工作以完成共同的目标。对用户来说,分布式系统看起来像一个统一的系统。
分布式系统的特点¶
- 并发性:多个节点可以同时处理请求
- 无全局时钟:节点之间没有统一的时钟
- 故障独立性:单个节点的故障不会导致整个系统崩溃
- 异步通信:节点之间通过消息传递通信
分布式系统的优势¶
- 高性能:通过并行处理提高性能
- 可扩展性:可以通过增加节点来扩展系统
- 高可用性:单个节点故障不影响整体
- 成本效益:使用廉价硬件构建大规模系统
2.2 CAP理论¶
2.2.1 CAP定理¶
CAP 定理(Brewer 猜想,后由 Gilbert & Lynch 形式化证明)讨论的是:当发生网络分区(Partition)时,分布式系统不可能同时满足一致性(C)与可用性(A)。在工程语境里,更准确的表述是:
- 在 P 发生 时,你必须在 C 与 A 之间做权衡;
-
在 无分区 的正常情况下,系统可以同时做到“看起来既一致又可用”。
-
Consistency(一致性,常指线性一致性/原子一致性):一次成功写入后,后续读取不会读到旧值
- Availability(可用性):对每个请求,非故障节点都能在有限时间内返回响应(即便是错误响应也算“有响应”)
- Partition Tolerance(分区容错):即使节点间消息发生丢失/延迟/分割成互不连通的分区,系统仍能继续按照既定策略提供服务
2.2.2 CAP的权衡¶
CA系统(假设无分区)¶
特点: - 强一致性 - 高可用性 - 一旦出现分区就无法同时保证 C 与 A(因此“CA”通常只在“单机/同机房强假设/不把分区作为必须容忍的故障模型”下讨论)
代表: - 单机关系型数据库(如单机 MySQL/PostgreSQL)
适用场景: - 单机系统 - 网络稳定的环境
CP系统(分区时优先一致性,牺牲可用性)¶
特点: - 强一致性 - 分区容错 - 在分区或无法达成多数派时,可能拒绝服务/超时(牺牲可用性)
代表: - ZooKeeper / etcd(典型:依赖共识,多数派可用) - HBase(依赖 HDFS + ZooKeeper,整体更偏 CP;具体还取决于部署与读写语义)
适用场景: - 金融系统 - 需要强一致性的场景
# CP系统示例:ZooKeeper
from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
# 创建节点(强一致性)
zk.create('/my_node', b'my_data')
# 读取节点(保证一致性)
data, stat = zk.get('/my_node')
AP系统(分区时优先可用性,接受弱一致/最终一致)¶
特点: - 最终一致性或可调一致性(读写一致性可按场景配置) - 高可用性 - 分区容错
代表: - Dynamo 风格系统、Cassandra(在选择较弱一致性级别时更偏 AP) - CouchDB(强调可用与复制,通常接受最终一致)
说明:许多现代系统并非“纯 CP/纯 AP”,而是提供 可调一致性(tunable consistency) 或在不同故障条件下呈现不同权衡。
适用场景: - 社交网络 - 内容分发 - 可以接受最终一致性的场景
# AP系统示例:Cassandra
from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
# 写入数据(最终一致性)
session.execute(
"INSERT INTO users (id, name) VALUES (%s, %s)",
(1, 'John')
)
# 读取数据(可能读到旧数据)
rows = session.execute("SELECT * FROM users WHERE id = 1")
2.2.3 BASE理论¶
BASE理论是对CAP中AP系统的补充,强调最终一致性:
- Basically Available(基本可用):系统基本可用
- Soft state(软状态):状态可以随时间变化
- Eventually consistent(最终一致性):最终达到一致
2.3 分布式一致性¶
2.3.1 一致性模型¶
强一致性¶
所有操作在同一时间对所有节点可见。
优点: - 数据一致性强 - 简单易理解
缺点: - 性能开销大 - 可用性低
弱一致性¶
不保证所有操作在同一时间对所有节点可见。
优点: - 性能高 - 可用性高
缺点: - 一致性保证弱 - 应用复杂
最终一致性¶
保证在没有新更新的情况下,最终所有节点看到相同的数据。
优点: - 平衡一致性和性能 - 适合大多数场景
缺点: - 不保证实时一致性 - 需要处理不一致窗口
2.3.2 一致性协议¶
两阶段提交(2PC)¶
流程: 1. 准备阶段:协调者询问参与者是否可以提交 2. 提交阶段:根据参与者响应决定提交或回滚
优点: - 保证强一致性 - 实现简单
缺点: - 阻塞协议 - 单点故障 - 性能差
# 2PC简化实现
class TwoPhaseCommit:
def __init__(self, participants):
self.participants = participants
self.coordinator = Coordinator()
def commit(self, transaction):
# 准备阶段
prepared = []
for participant in self.participants:
if participant.prepare(transaction):
prepared.append(participant)
# 如果所有参与者都准备好,提交
if len(prepared) == len(self.participants):
for participant in prepared:
participant.commit(transaction)
return True
else:
# 回滚
for participant in prepared:
participant.rollback(transaction)
return False
三阶段提交(3PC)¶
流程: 1. CanCommit阶段:询问参与者是否可以提交 2. PreCommit阶段:参与者预提交 3. DoCommit阶段:正式提交
优点: - 减少阻塞 - 提高可用性
缺点: - 仍然存在阻塞 - 实现复杂
Paxos算法¶
Paxos是一种基于消息传递的分布式一致性算法。
角色: - Proposer:提出提案 - Acceptor:接受提案 - Learner:学习提案
流程: 1. Proposer选择一个提案编号n 2. Proposer向多数Acceptor发送Prepare(n) 3. Acceptor响应Promise(n, previous_accepted) 4. Proposer发送Accept(n, value) 5. Acceptor响应Accepted(n, value)
# Paxos简化实现
class PaxosNode:
def __init__(self, node_id):
self.node_id = node_id
self.proposed_value = None
self.accepted_value = None
self.promised_n = 0
def prepare(self, n):
if n > self.promised_n:
self.promised_n = n
return (True, self.accepted_value)
return (False, None)
def accept(self, n, value):
if n >= self.promised_n:
self.promised_n = n
self.accepted_value = value
return True
return False
Raft算法¶
Raft是一种更易理解的分布式一致性算法。
核心概念: - Leader:处理所有客户端请求 - Follower:被动接收Leader的请求 - Candidate:竞选Leader
流程: 1. 选举:Follower超时后成为Candidate,发起选举 2. 日志复制:Leader复制日志到Follower 3. 安全:保证已提交的日志不会丢失
# Raft简化实现
class RaftNode:
def __init__(self, node_id):
self.node_id = node_id
self.state = 'follower' # follower, candidate, leader
self.current_term = 0
self.voted_for = None
self.log = []
self.commit_index = 0
self.last_applied = 0
def start_election(self):
self.state = 'candidate'
self.current_term += 1
self.voted_for = self.node_id
# 请求投票
self.request_votes()
def request_votes(self):
# 向其他节点请求投票
pass
def append_entries(self, entries):
# Leader追加日志
pass
2.4 分布式事务¶
2.4.1 分布式事务的挑战¶
- 网络延迟:节点间通信存在延迟
- 节点故障:节点可能随时故障
- 数据一致性:保证多个节点的数据一致性
- 性能开销:协调多个节点的开销
2.4.2 分布式事务解决方案¶
2PC(两阶段提交)¶
如前所述,2PC是一种强一致性的分布式事务解决方案。
TCC(Try-Confirm-Cancel)¶
TCC是一种应用层的分布式事务解决方案。
流程: 1. Try:预留资源 2. Confirm:确认提交 3. Cancel:取消操作
# TCC实现示例
class TCCService:
def try_operation(self, data):
# 预留资源
if self.check_resource(data):
self.reserve_resource(data)
return True
return False
def confirm_operation(self, data):
# 确认提交
self.commit_resource(data)
def cancel_operation(self, data):
# 取消操作
self.release_resource(data)
Saga模式¶
Saga模式将长事务拆分为多个本地事务,每个本地事务都有补偿操作。
优点: - 避免长时间锁定 - 提高并发度 - 易于实现
缺点: - 最终一致性 - 需要补偿逻辑
# Saga实现示例
class Saga:
def __init__(self):
self.transactions = []
self.compensations = []
def add_transaction(self, transaction, compensation):
self.transactions.append(transaction)
self.compensations.append(compensation)
def execute(self):
executed = []
try:
for i, transaction in enumerate(self.transactions): # enumerate同时获取索引和值
transaction()
executed.append(i)
return True
except Exception as e:
# 执行补偿
for i in reversed(executed):
self.compensations[i]()
return False
本地消息表¶
本地消息表通过在本地数据库中记录消息,保证消息的可靠发送。
流程: 1. 执行本地事务 2. 在同一事务中写入消息表 3. 定时任务扫描消息表发送消息 4. 消息发送成功后删除或标记
# 本地消息表实现
class LocalMessageTable:
def __init__(self, db):
self.db = db
def send_message(self, message):
# 在事务中写入消息
with self.db.transaction():
self.execute_business_logic()
self.db.insert('message_table', {
'message': message,
'status': 'pending'
})
def process_messages(self):
# 定时处理消息
messages = self.db.query(
"SELECT * FROM message_table WHERE status = 'pending'"
)
for msg in messages:
if self.send_to_remote(msg['message']):
self.db.update(
'message_table',
{'status': 'sent'},
{'id': msg['id']}
)
事务消息¶
事务消息利用消息队列的事务特性保证消息的可靠发送。
流程: 1. 发送半消息 2. 执行本地事务 3. 提交/回滚消息 4. 消费者消费消息
# 事务消息实现(以RocketMQ为例)
from rocketmq.client import Producer, Message
producer = Producer('transaction_producer')
producer.set_name_server_address('localhost:9876')
producer.start()
def send_transaction_message(message, local_transaction):
# 发送事务消息
msg = Message('test_topic', message.encode('utf-8'))
send_result = producer.send_message_in_transaction(
msg, local_transaction, None
)
return send_result
def local_transaction_execute(msg, arg):
# 执行本地事务
try:
execute_business_logic()
return TransactionStatus.COMMIT
except Exception as e:
return TransactionStatus.ROLLBACK
2.5 分布式锁¶
2.5.1 分布式锁的需求¶
在分布式系统中,多个节点可能同时访问共享资源,需要使用分布式锁来保证互斥访问。
2.5.2 分布式锁实现¶
基于Redis的分布式锁¶
import redis
import time
import uuid
class RedisLock:
def __init__(self, redis_client, lock_name, expire_time=30):
self.redis = redis_client
self.lock_name = f"lock:{lock_name}"
self.expire_time = expire_time
self.identifier = str(uuid.uuid4())
def acquire(self, timeout=10):
end_time = time.time() + timeout
while time.time() < end_time:
# 使用SET命令的NX选项
if self.redis.set(
self.lock_name,
self.identifier,
nx=True,
ex=self.expire_time
):
return True
time.sleep(0.001)
return False
def release(self):
# 使用Lua脚本保证原子性
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis.eval(
lua_script,
1,
self.lock_name,
self.identifier
)
# 使用示例
redis_client = redis.Redis(host='localhost', port=6379)
lock = RedisLock(redis_client, 'my_resource')
if lock.acquire():
try:
# 执行需要加锁的操作
do_something()
finally:
lock.release()
基于ZooKeeper的分布式锁¶
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
class ZKLock:
def __init__(self, zk_client, lock_path):
self.zk = zk_client
self.lock_path = lock_path
self.lock_node = None
def acquire(self, timeout=10):
start_time = time.time()
while time.time() - start_time < timeout:
try: # try/except捕获异常
# 创建临时顺序节点
self.lock_node = self.zk.create(
f"{self.lock_path}/lock-",
ephemeral=True,
sequence=True
)
# 检查是否是最小的节点
children = self.zk.get_children(self.lock_path)
children.sort()
if self.lock_node.endswith(children[0]):
return True
# 等待前一个节点释放
self.wait_for_predecessor(children)
except NodeExistsError:
pass
return False
def release(self):
if self.lock_node:
self.zk.delete(self.lock_node)
self.lock_node = None
2.6 实战练习¶
练习1:实现一个简单的分布式锁¶
使用Redis实现一个分布式锁,要求: 1. 支持超时自动释放 2. 支持可重入 3. 防止死锁
练习2:实现一个简单的分布式事务¶
使用TCC模式实现一个分布式事务,包括: 1. Try阶段 2. Confirm阶段 3. Cancel阶段
练习3:分析CAP权衡¶
分析以下系统的CAP权衡: 1. MySQL主从复制 2. Redis Cluster 3. Cassandra
2.7 面试准备¶
常见面试题¶
- 什么是CAP理论?如何权衡?
- 什么是BASE理论?
- 什么是分布式事务?有哪些解决方案?
- 什么是分布式锁?如何实现?
- Paxos和Raft的区别?
项目经验准备¶
准备一个使用分布式技术的项目: - 使用的分布式技术 - 遇到的挑战 - 解决方案 - 项目成果
2.8 总结¶
本章介绍了分布式系统的基础理论,包括CAP理论、分布式一致性、分布式事务和分布式锁。这些是分布式系统设计的核心知识。
关键要点¶
- CAP定理指出一致性、可用性、分区容错性只能同时满足两个
- BASE理论强调最终一致性
- 分布式一致性协议包括2PC、3PC、Paxos、Raft
- 分布式事务解决方案包括2PC、TCC、Saga、本地消息表、事务消息
- 分布式锁可以使用Redis、ZooKeeper等实现
下一步¶
下一章将深入学习数据库架构设计,包括关系型数据库、NoSQL、分库分表等内容。