跳转至

第2章:分布式系统基础

分布式系统基础

2.1 分布式系统概述

什么是分布式系统

分布式系统是由多个通过网络连接的计算机组成的系统,这些计算机协同工作以完成共同的目标。对用户来说,分布式系统看起来像一个统一的系统。

分布式系统的特点

  1. 并发性:多个节点可以同时处理请求
  2. 无全局时钟:节点之间没有统一的时钟
  3. 故障独立性:单个节点的故障不会导致整个系统崩溃
  4. 异步通信:节点之间通过消息传递通信

分布式系统的优势

  1. 高性能:通过并行处理提高性能
  2. 可扩展性:可以通过增加节点来扩展系统
  3. 高可用性:单个节点故障不影响整体
  4. 成本效益:使用廉价硬件构建大规模系统

2.2 CAP理论

2.2.1 CAP定理

CAP 定理(Brewer 猜想,后由 Gilbert & Lynch 形式化证明)讨论的是:当发生网络分区(Partition)时,分布式系统不可能同时满足一致性(C)与可用性(A)。在工程语境里,更准确的表述是:

  • P 发生 时,你必须在 CA 之间做权衡;
  • 无分区 的正常情况下,系统可以同时做到“看起来既一致又可用”。

  • Consistency(一致性,常指线性一致性/原子一致性):一次成功写入后,后续读取不会读到旧值

  • Availability(可用性):对每个请求,非故障节点都能在有限时间内返回响应(即便是错误响应也算“有响应”)
  • Partition Tolerance(分区容错):即使节点间消息发生丢失/延迟/分割成互不连通的分区,系统仍能继续按照既定策略提供服务

2.2.2 CAP的权衡

CA系统(假设无分区)

特点: - 强一致性 - 高可用性 - 一旦出现分区就无法同时保证 C 与 A(因此“CA”通常只在“单机/同机房强假设/不把分区作为必须容忍的故障模型”下讨论)

代表: - 单机关系型数据库(如单机 MySQL/PostgreSQL)

适用场景: - 单机系统 - 网络稳定的环境

CP系统(分区时优先一致性,牺牲可用性)

特点: - 强一致性 - 分区容错 - 在分区或无法达成多数派时,可能拒绝服务/超时(牺牲可用性)

代表: - ZooKeeper / etcd(典型:依赖共识,多数派可用) - HBase(依赖 HDFS + ZooKeeper,整体更偏 CP;具体还取决于部署与读写语义)

适用场景: - 金融系统 - 需要强一致性的场景

Python
# CP系统示例:ZooKeeper
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

# 创建节点(强一致性)
zk.create('/my_node', b'my_data')

# 读取节点(保证一致性)
data, stat = zk.get('/my_node')

AP系统(分区时优先可用性,接受弱一致/最终一致)

特点: - 最终一致性或可调一致性(读写一致性可按场景配置) - 高可用性 - 分区容错

代表: - Dynamo 风格系统、Cassandra(在选择较弱一致性级别时更偏 AP) - CouchDB(强调可用与复制,通常接受最终一致)

说明:许多现代系统并非“纯 CP/纯 AP”,而是提供 可调一致性(tunable consistency) 或在不同故障条件下呈现不同权衡。

适用场景: - 社交网络 - 内容分发 - 可以接受最终一致性的场景

Python
# AP系统示例:Cassandra
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

# 写入数据(最终一致性)
session.execute(
    "INSERT INTO users (id, name) VALUES (%s, %s)",
    (1, 'John')
)

# 读取数据(可能读到旧数据)
rows = session.execute("SELECT * FROM users WHERE id = 1")

2.2.3 BASE理论

BASE理论是对CAP中AP系统的补充,强调最终一致性:

  • Basically Available(基本可用):系统基本可用
  • Soft state(软状态):状态可以随时间变化
  • Eventually consistent(最终一致性):最终达到一致

2.3 分布式一致性

2.3.1 一致性模型

强一致性

所有操作在同一时间对所有节点可见。

优点: - 数据一致性强 - 简单易理解

缺点: - 性能开销大 - 可用性低

弱一致性

不保证所有操作在同一时间对所有节点可见。

优点: - 性能高 - 可用性高

缺点: - 一致性保证弱 - 应用复杂

最终一致性

保证在没有新更新的情况下,最终所有节点看到相同的数据。

优点: - 平衡一致性和性能 - 适合大多数场景

缺点: - 不保证实时一致性 - 需要处理不一致窗口

2.3.2 一致性协议

两阶段提交(2PC)

流程: 1. 准备阶段:协调者询问参与者是否可以提交 2. 提交阶段:根据参与者响应决定提交或回滚

优点: - 保证强一致性 - 实现简单

缺点: - 阻塞协议 - 单点故障 - 性能差

Python
# 2PC简化实现
class TwoPhaseCommit:
    def __init__(self, participants):
        self.participants = participants
        self.coordinator = Coordinator()

    def commit(self, transaction):
        # 准备阶段
        prepared = []
        for participant in self.participants:
            if participant.prepare(transaction):
                prepared.append(participant)

        # 如果所有参与者都准备好,提交
        if len(prepared) == len(self.participants):
            for participant in prepared:
                participant.commit(transaction)
            return True
        else:
            # 回滚
            for participant in prepared:
                participant.rollback(transaction)
            return False

三阶段提交(3PC)

流程: 1. CanCommit阶段:询问参与者是否可以提交 2. PreCommit阶段:参与者预提交 3. DoCommit阶段:正式提交

优点: - 减少阻塞 - 提高可用性

缺点: - 仍然存在阻塞 - 实现复杂

Paxos算法

Paxos是一种基于消息传递的分布式一致性算法。

角色: - Proposer:提出提案 - Acceptor:接受提案 - Learner:学习提案

流程: 1. Proposer选择一个提案编号n 2. Proposer向多数Acceptor发送Prepare(n) 3. Acceptor响应Promise(n, previous_accepted) 4. Proposer发送Accept(n, value) 5. Acceptor响应Accepted(n, value)

Python
# Paxos简化实现
class PaxosNode:
    def __init__(self, node_id):
        self.node_id = node_id
        self.proposed_value = None
        self.accepted_value = None
        self.promised_n = 0

    def prepare(self, n):
        if n > self.promised_n:
            self.promised_n = n
            return (True, self.accepted_value)
        return (False, None)

    def accept(self, n, value):
        if n >= self.promised_n:
            self.promised_n = n
            self.accepted_value = value
            return True
        return False

Raft算法

Raft是一种更易理解的分布式一致性算法。

核心概念: - Leader:处理所有客户端请求 - Follower:被动接收Leader的请求 - Candidate:竞选Leader

流程: 1. 选举:Follower超时后成为Candidate,发起选举 2. 日志复制:Leader复制日志到Follower 3. 安全:保证已提交的日志不会丢失

Python
# Raft简化实现
class RaftNode:
    def __init__(self, node_id):
        self.node_id = node_id
        self.state = 'follower'  # follower, candidate, leader
        self.current_term = 0
        self.voted_for = None
        self.log = []
        self.commit_index = 0
        self.last_applied = 0

    def start_election(self):
        self.state = 'candidate'
        self.current_term += 1
        self.voted_for = self.node_id
        # 请求投票
        self.request_votes()

    def request_votes(self):
        # 向其他节点请求投票
        pass

    def append_entries(self, entries):
        # Leader追加日志
        pass

2.4 分布式事务

2.4.1 分布式事务的挑战

  1. 网络延迟:节点间通信存在延迟
  2. 节点故障:节点可能随时故障
  3. 数据一致性:保证多个节点的数据一致性
  4. 性能开销:协调多个节点的开销

2.4.2 分布式事务解决方案

2PC(两阶段提交)

如前所述,2PC是一种强一致性的分布式事务解决方案。

TCC(Try-Confirm-Cancel)

TCC是一种应用层的分布式事务解决方案。

流程: 1. Try:预留资源 2. Confirm:确认提交 3. Cancel:取消操作

Python
# TCC实现示例
class TCCService:
    def try_operation(self, data):
        # 预留资源
        if self.check_resource(data):
            self.reserve_resource(data)
            return True
        return False

    def confirm_operation(self, data):
        # 确认提交
        self.commit_resource(data)

    def cancel_operation(self, data):
        # 取消操作
        self.release_resource(data)

Saga模式

Saga模式将长事务拆分为多个本地事务,每个本地事务都有补偿操作。

优点: - 避免长时间锁定 - 提高并发度 - 易于实现

缺点: - 最终一致性 - 需要补偿逻辑

Python
# Saga实现示例
class Saga:
    def __init__(self):
        self.transactions = []
        self.compensations = []

    def add_transaction(self, transaction, compensation):
        self.transactions.append(transaction)
        self.compensations.append(compensation)

    def execute(self):
        executed = []
        try:
            for i, transaction in enumerate(self.transactions):  # enumerate同时获取索引和值
                transaction()
                executed.append(i)
            return True
        except Exception as e:
            # 执行补偿
            for i in reversed(executed):
                self.compensations[i]()
            return False

本地消息表

本地消息表通过在本地数据库中记录消息,保证消息的可靠发送。

流程: 1. 执行本地事务 2. 在同一事务中写入消息表 3. 定时任务扫描消息表发送消息 4. 消息发送成功后删除或标记

Python
# 本地消息表实现
class LocalMessageTable:
    def __init__(self, db):
        self.db = db

    def send_message(self, message):
        # 在事务中写入消息
        with self.db.transaction():
            self.execute_business_logic()
            self.db.insert('message_table', {
                'message': message,
                'status': 'pending'
            })

    def process_messages(self):
        # 定时处理消息
        messages = self.db.query(
            "SELECT * FROM message_table WHERE status = 'pending'"
        )
        for msg in messages:
            if self.send_to_remote(msg['message']):
                self.db.update(
                    'message_table',
                    {'status': 'sent'},
                    {'id': msg['id']}
                )

事务消息

事务消息利用消息队列的事务特性保证消息的可靠发送。

流程: 1. 发送半消息 2. 执行本地事务 3. 提交/回滚消息 4. 消费者消费消息

Python
# 事务消息实现(以RocketMQ为例)
from rocketmq.client import Producer, Message

producer = Producer('transaction_producer')
producer.set_name_server_address('localhost:9876')
producer.start()

def send_transaction_message(message, local_transaction):
    # 发送事务消息
    msg = Message('test_topic', message.encode('utf-8'))
    send_result = producer.send_message_in_transaction(
        msg, local_transaction, None
    )
    return send_result

def local_transaction_execute(msg, arg):
    # 执行本地事务
    try:
        execute_business_logic()
        return TransactionStatus.COMMIT
    except Exception as e:
        return TransactionStatus.ROLLBACK

2.5 分布式锁

2.5.1 分布式锁的需求

在分布式系统中,多个节点可能同时访问共享资源,需要使用分布式锁来保证互斥访问。

2.5.2 分布式锁实现

基于Redis的分布式锁

Python
import redis
import time
import uuid

class RedisLock:
    def __init__(self, redis_client, lock_name, expire_time=30):
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.expire_time = expire_time
        self.identifier = str(uuid.uuid4())

    def acquire(self, timeout=10):
        end_time = time.time() + timeout
        while time.time() < end_time:
            # 使用SET命令的NX选项
            if self.redis.set(
                self.lock_name,
                self.identifier,
                nx=True,
                ex=self.expire_time
            ):
                return True
            time.sleep(0.001)
        return False

    def release(self):
        # 使用Lua脚本保证原子性
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(
            lua_script,
            1,
            self.lock_name,
            self.identifier
        )

# 使用示例
redis_client = redis.Redis(host='localhost', port=6379)
lock = RedisLock(redis_client, 'my_resource')

if lock.acquire():
    try:
        # 执行需要加锁的操作
        do_something()
    finally:
        lock.release()

基于ZooKeeper的分布式锁

Python
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError

class ZKLock:
    def __init__(self, zk_client, lock_path):
        self.zk = zk_client
        self.lock_path = lock_path
        self.lock_node = None

    def acquire(self, timeout=10):
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:  # try/except捕获异常
                # 创建临时顺序节点
                self.lock_node = self.zk.create(
                    f"{self.lock_path}/lock-",
                    ephemeral=True,
                    sequence=True
                )

                # 检查是否是最小的节点
                children = self.zk.get_children(self.lock_path)
                children.sort()

                if self.lock_node.endswith(children[0]):
                    return True

                # 等待前一个节点释放
                self.wait_for_predecessor(children)

            except NodeExistsError:
                pass
        return False

    def release(self):
        if self.lock_node:
            self.zk.delete(self.lock_node)
            self.lock_node = None

2.6 实战练习

练习1:实现一个简单的分布式锁

使用Redis实现一个分布式锁,要求: 1. 支持超时自动释放 2. 支持可重入 3. 防止死锁

练习2:实现一个简单的分布式事务

使用TCC模式实现一个分布式事务,包括: 1. Try阶段 2. Confirm阶段 3. Cancel阶段

练习3:分析CAP权衡

分析以下系统的CAP权衡: 1. MySQL主从复制 2. Redis Cluster 3. Cassandra

2.7 面试准备

常见面试题

  1. 什么是CAP理论?如何权衡?
  2. 什么是BASE理论?
  3. 什么是分布式事务?有哪些解决方案?
  4. 什么是分布式锁?如何实现?
  5. Paxos和Raft的区别?

项目经验准备

准备一个使用分布式技术的项目: - 使用的分布式技术 - 遇到的挑战 - 解决方案 - 项目成果

2.8 总结

本章介绍了分布式系统的基础理论,包括CAP理论、分布式一致性、分布式事务和分布式锁。这些是分布式系统设计的核心知识。

关键要点

  1. CAP定理指出一致性、可用性、分区容错性只能同时满足两个
  2. BASE理论强调最终一致性
  3. 分布式一致性协议包括2PC、3PC、Paxos、Raft
  4. 分布式事务解决方案包括2PC、TCC、Saga、本地消息表、事务消息
  5. 分布式锁可以使用Redis、ZooKeeper等实现

下一步

下一章将深入学习数据库架构设计,包括关系型数据库、NoSQL、分库分表等内容。