跳转至

第4章:缓存架构设计

缓存架构设计

4.1 缓存概述

什么是缓存

缓存是位于应用和数据源之间的高速存储层,用于存储经常访问的数据,减少对数据源的访问次数,提高系统性能。

缓存的作用

  1. 提高性能:减少数据访问延迟
  2. 降低负载:减少数据库压力
  3. 提高并发:支持更高的并发量
  4. 降低成本:减少昂贵资源的使用

缓存的类型

  1. 本地缓存:应用内存中的缓存
  2. 分布式缓存:独立的缓存服务
  3. CDN缓存:内容分发网络缓存
  4. 数据库缓存:数据库自带的缓存

4.2 缓存策略

4.2.1 Cache-Aside(旁路缓存)

流程: 1. 读取时先查缓存 2. 缓存命中直接返回 3. 缓存未命中查数据库 4. 将数据写入缓存

优点: - 实现简单 - 缓存失效由应用控制

缺点: - 首次访问慢 - 缓存更新需要应用处理

Python
import json
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def get_user(user_id):
    # 先查缓存
    cache_key = f"user:{user_id}"
    cached_user = r.get(cache_key)
    if cached_user:
        return json.loads(cached_user)

    # 缓存未命中,查数据库
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # 写入缓存
    r.setex(cache_key, 3600, json.dumps(user))

    return user

4.2.2 Read-Through(读穿透)

流程: 1. 应用请求缓存 2. 缓存未命中时,缓存负责从数据库加载 3. 缓存返回数据

优点: - 应用代码简单 - 缓存一致性更好

缺点: - 缓存逻辑复杂 - 不灵活

Python
class CacheWithLoader:
    def __init__(self, redis_client, db):
        self.redis = redis_client
        self.db = db

    def get(self, key, loader_func):
        # 先查缓存
        value = self.redis.get(key)
        if value:
            return json.loads(value)

        # 缓存未命中,使用loader函数加载数据
        value = loader_func()

        # 写入缓存
        self.redis.setex(key, 3600, json.dumps(value))

        return value

# 使用示例
cache = CacheWithLoader(r, db)

def load_user(user_id):
    return db.query("SELECT * FROM users WHERE id = %s", user_id)

user = cache.get(f"user:{user_id}", lambda: load_user(user_id))

4.2.3 Write-Through(写穿透)

流程: 1. 应用写入缓存 2. 缓存同步写入数据库 3. 返回成功

优点: - 数据一致性高 - 缓存和数据库同步

缺点: - 写入性能较差 - 实现复杂

Python
def update_user(user_id, data):
    # 先更新数据库
    db.execute(
        "UPDATE users SET name = %s WHERE id = %s",
        (data['name'], user_id)
    )

    # 同步更新缓存
    cache_key = f"user:{user_id}"
    r.setex(cache_key, 3600, json.dumps(data))

    return True

4.2.4 Write-Behind(写回)

流程: 1. 应用写入缓存 2. 缓存异步写入数据库 3. 返回成功

优点: - 写入性能高 - 减少数据库压力

缺点: - 数据一致性差 - 可能丢失数据

Python
import threading
from queue import Queue

write_queue = Queue()

def async_write_worker():
    while True:
        task = write_queue.get()
        try:  # try/except捕获异常
            db.execute(task['sql'], task['params'])
        except Exception as e:
            # 处理失败,重新入队
            write_queue.put(task)
        write_queue.task_done()

# 启动异步写入线程
threading.Thread(target=async_write_worker, daemon=True).start()

def update_user_async(user_id, data):
    # 更新缓存
    cache_key = f"user:{user_id}"
    r.setex(cache_key, 3600, json.dumps(data))

    # 异步写入数据库
    write_queue.put({
        'sql': "UPDATE users SET name = %s WHERE id = %s",
        'params': (data['name'], user_id)
    })

    return True

4.3 缓存一致性

4.3.1 一致性问题

缓存一致性是指缓存和数据库数据保持一致。由于缓存和数据库是独立的系统,可能出现不一致的情况。

不一致的场景: 1. 更新数据库后缓存未更新 2. 缓存失效后数据库已更新 3. 多个节点缓存不一致

4.3.2 一致性解决方案

更新缓存策略

先更新数据库,再更新缓存

Python
def update_user(user_id, data):
    # 先更新数据库
    db.execute(
        "UPDATE users SET name = %s WHERE id = %s",
        (data['name'], user_id)
    )

    # 再更新缓存
    cache_key = f"user:{user_id}"
    r.setex(cache_key, 3600, json.dumps(data))

问题:两个操作之间可能有其他请求读取到旧数据

先删除缓存,再更新数据库

Python
def update_user(user_id, data):
    # 先删除缓存
    cache_key = f"user:{user_id}"
    r.delete(cache_key)

    # 再更新数据库
    db.execute(
        "UPDATE users SET name = %s WHERE id = %s",
        (data['name'], user_id)
    )

问题:删除缓存后,更新数据库前,可能有请求读取到旧数据并写入缓存

延时双删

Python
import time

def update_user(user_id, data):
    # 第一次删除缓存
    cache_key = f"user:{user_id}"
    r.delete(cache_key)

    # 更新数据库
    db.execute(
        "UPDATE users SET name = %s WHERE id = %s",
        (data['name'], user_id)
    )

    # 延时后再次删除缓存
    time.sleep(1)
    r.delete(cache_key)

订阅Binlog

通过订阅数据库的Binlog,在数据变更时更新缓存。

Python
# 使用canal订阅MySQL Binlog
from canal.client import Client
from canal.protocol import EntryProtocol

client = Client()
client.connect(host='127.0.0.1', port=11111)
client.subscribe('test_db', '.*', '.*')

def handle_binlog(entry):
    if entry.entry_type == EntryProtocol.ENTRYTYPE_ROWDATA:
        # 解析RowData
        for row in entry.parse():
            if row.event_type == 'UPDATE':
                # 更新缓存
                cache_key = f"user:{row.after_values['id']}"
                r.setex(cache_key, 3600, json.dumps(row.after_values))

while True:
    message = client.get(100)
    if message:
        for entry in message.entries:
            handle_binlog(entry)

4.4 缓存问题

4.4.1 缓存穿透

问题:查询不存在的数据,导致请求直接穿透到数据库。

解决方案

  1. 缓存空值

    Python
    def get_user(user_id):
        cache_key = f"user:{user_id}"
        cached_user = r.get(cache_key)
    
        if cached_user:
            if cached_user == b'NULL':
                return None
            return json.loads(cached_user)
    
        # 查数据库
        user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    
        if not user:
            # 缓存空值
            r.setex(cache_key, 60, 'NULL')
            return None
    
        r.setex(cache_key, 3600, json.dumps(user))
        return user
    

  2. 布隆过滤器

    Python
    from pybloom_live import BloomFilter
    
    # 初始化布隆过滤器
    user_bloom = BloomFilter(capacity=1000000, error_rate=0.001)
    
    # 预加载所有用户ID
    all_user_ids = db.query("SELECT id FROM users")
    for user_id in all_user_ids:
        user_bloom.add(user_id)
    
    def get_user(user_id):
        # 先检查布隆过滤器
        if user_id not in user_bloom:
            return None
    
        cache_key = f"user:{user_id}"
        cached_user = r.get(cache_key)
        if cached_user:
            return json.loads(cached_user)
    
        user = db.query("SELECT * FROM users WHERE id = %s", user_id)
        r.setex(cache_key, 3600, json.dumps(user))
        return user
    

4.4.2 缓存击穿

问题:热点数据过期,大量请求同时穿透到数据库。

解决方案

  1. 互斥锁

    Python
    import threading
    
    # ⚠️ 注意:threading.Lock() 仅在单进程内有效。
    # 生产环境中多进程/多实例部署时,请使用分布式锁(如 Redis SETNX / Redlock)来防止缓存击穿。
    lock = threading.Lock()
    
    def get_user(user_id):
        cache_key = f"user:{user_id}"
        cached_user = r.get(cache_key)
        if cached_user:
            return json.loads(cached_user)
    
        # 获取锁
        with lock:
            # 再次检查缓存(双重检查)
            cached_user = r.get(cache_key)
            if cached_user:
                return json.loads(cached_user)
    
            # 查数据库
            user = db.query("SELECT * FROM users WHERE id = %s", user_id)
            r.setex(cache_key, 3600, json.dumps(user))
            return user
    

  2. 逻辑过期

    Python
    def get_user(user_id):
        cache_key = f"user:{user_id}"
        cached_data = r.get(cache_key)
    
        if cached_data:
            data = json.loads(cached_data)
            # 检查是否过期
            if time.time() < data['expire_time']:
                return data['user']
    
            # 过期但返回旧数据
            # 异步更新缓存
            threading.Thread(  # 线程池/多线程:并发执行任务
                target=update_cache_async,
                args=(user_id,)
            ).start()
    
            return data['user']
    
        # 缓存不存在,查数据库
        user = db.query("SELECT * FROM users WHERE id = %s", user_id)
        r.setex(cache_key, 3600, json.dumps({
            'user': user,
            'expire_time': time.time() + 3600
        }))
        return user
    

4.4.3 缓存雪崩

问题:大量缓存同时失效,导致大量请求穿透到数据库。

解决方案

  1. 随机过期时间

    Python
    import random
    
    def set_cache_with_random_ttl(key, value, base_ttl):
        # 随机过期时间
        random_ttl = base_ttl + random.randint(-300, 300)
        r.setex(key, random_ttl, json.dumps(value))
    

  2. 缓存预热

    Python
    def warm_up_cache():
        # 启动时加载热点数据
        hot_users = db.query("""
            SELECT * FROM users
            WHERE last_login > DATE_SUB(NOW(), INTERVAL 7 DAY)
            LIMIT 1000
        """)
    
        for user in hot_users:
            cache_key = f"user:{user['id']}"
            r.setex(cache_key, 3600, json.dumps(user))
    

  3. 多级缓存

    Python
    # L1缓存:本地缓存
    local_cache = {}
    
    # L2缓存:Redis
    def get_user(user_id):
        # 先查L1缓存
        if user_id in local_cache:
            return local_cache[user_id]
    
        # 再查L2缓存
        cache_key = f"user:{user_id}"
        cached_user = r.get(cache_key)
        if cached_user:
            user = json.loads(cached_user)
            local_cache[user_id] = user
            return user
    
        # 查数据库
        user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    
        # 写入L1和L2缓存
        local_cache[user_id] = user
        r.setex(cache_key, 3600, json.dumps(user))
    
        return user
    

4.5 多级缓存

4.5.1 多级缓存架构

Text Only
应用 -> L1缓存(本地) -> L2缓存(Redis) -> 数据库

4.5.2 多级缓存实现

Python
import json
from collections import OrderedDict

class MultiLevelCache:
    def __init__(self, redis_client, db, l1_max_size=1000):
        self.redis = redis_client
        self.db = db
        # L1缓存:使用OrderedDict实现简单LRU
        self.l1_cache = OrderedDict()
        self.l1_max_size = l1_max_size

    def get(self, key, loader_func=None):
        # L1缓存
        value = self._get_l1(key)
        if value is not None:
            return value

        # L2缓存
        value = self._get_l2(key)
        if value is not None:
            self._set_l1(key, value)
            return value

        # 数据库
        if loader_func:
            value = loader_func()
            self._set_l2(key, value)
            self._set_l1(key, value)
            return value

        return None

    def _get_l1(self, key):
        # 从本地缓存获取
        if key in self.l1_cache:
            self.l1_cache.move_to_end(key)  # 标记为最近使用
            return self.l1_cache[key]
        return None

    def _set_l1(self, key, value):
        # 设置本地缓存(LRU淘汰)
        if key in self.l1_cache:
            self.l1_cache.move_to_end(key)
        self.l1_cache[key] = value
        if len(self.l1_cache) > self.l1_max_size:
            self.l1_cache.popitem(last=False)  # 淘汰最久未使用的

    def _get_l2(self, key):
        # 从Redis获取
        value = self.redis.get(key)
        if value:
            return json.loads(value)  # json.loads将JSON字符串转为Python对象
        return None

    def _set_l2(self, key, value):
        # 设置Redis缓存
        self.redis.setex(key, 3600, json.dumps(value))

# 使用示例
cache = MultiLevelCache(r, db)

def load_user(user_id):
    return db.query("SELECT * FROM users WHERE id = %s", user_id)

user = cache.get(f"user:{user_id}", lambda: load_user(user_id))  # lambda匿名函数:简洁的单行函数

4.6 缓存预热

4.6.1 预热策略

  1. 启动时预热:系统启动时加载热点数据
  2. 定时预热:定时更新缓存
  3. 按需预热:根据访问模式预热

4.6.2 预热实现

Python
import schedule
import time

def warm_up_hot_data():
    # 加载热点用户
    hot_users = db.query("""
        SELECT * FROM users
        WHERE last_login > DATE_SUB(NOW(), INTERVAL 7 DAY)
        LIMIT 1000
    """)

    for user in hot_users:
        cache_key = f"user:{user['id']}"
        r.setex(cache_key, 3600, json.dumps(user))  # json.dumps将Python对象转为JSON字符串

    # 加载热点商品
    hot_products = db.query("""
        SELECT * FROM products
        WHERE sales_count > 1000
        LIMIT 1000
    """)

    for product in hot_products:
        cache_key = f"product:{product['id']}"
        r.setex(cache_key, 3600, json.dumps(product))

# 启动时预热
warm_up_hot_data()

# 定时预热(每天凌晨2点)
schedule.every().day.at("02:00").do(warm_up_hot_data)

while True:
    schedule.run_pending()
    time.sleep(60)

4.7 实战练习

练习1:实现一个多级缓存系统

实现一个包含L1和L2缓存的多级缓存系统: 1. L1缓存使用本地内存 2. L2缓存使用Redis 3. 实现缓存穿透、击穿、雪崩的防护

练习2:设计一个缓存更新策略

为一个电商系统设计缓存更新策略: 1. 商品信息缓存 2. 用户信息缓存 3. 购物车缓存

练习3:实现缓存预热

为一个新闻网站实现缓存预热功能: 1. 识别热点新闻 2. 预加载到缓存 3. 定时更新缓存

4.8 面试准备

常见面试题

  1. 什么是缓存穿透?如何解决?
  2. 什么是缓存击穿?如何解决?
  3. 什么是缓存雪崩?如何解决?
  4. 如何保证缓存和数据库的一致性?
  5. 多级缓存的优缺点?

项目经验准备

准备一个缓存优化项目: - 优化前的问题 - 优化方案 - 优化效果

4.9 总结

本章介绍了缓存架构设计,包括缓存策略、缓存一致性、缓存问题和多级缓存。缓存是提高系统性能的重要手段,需要合理设计。

关键要点

  1. 缓存策略包括Cache-Aside、Read-Through、Write-Through、Write-Behind
  2. 缓存一致性是缓存设计的核心问题
  3. 缓存穿透、击穿、雪崩是常见问题,需要针对性解决
  4. 多级缓存可以进一步提高性能
  5. 缓存预热可以提高系统启动后的性能

下一步

下一章将深入学习消息队列架构,包括消息队列选型、消息可靠性等内容。