第4章:缓存架构设计¶
4.1 缓存概述¶
什么是缓存¶
缓存是位于应用和数据源之间的高速存储层,用于存储经常访问的数据,减少对数据源的访问次数,提高系统性能。
缓存的作用¶
- 提高性能:减少数据访问延迟
- 降低负载:减少数据库压力
- 提高并发:支持更高的并发量
- 降低成本:减少昂贵资源的使用
缓存的类型¶
- 本地缓存:应用内存中的缓存
- 分布式缓存:独立的缓存服务
- CDN缓存:内容分发网络缓存
- 数据库缓存:数据库自带的缓存
4.2 缓存策略¶
4.2.1 Cache-Aside(旁路缓存)¶
流程: 1. 读取时先查缓存 2. 缓存命中直接返回 3. 缓存未命中查数据库 4. 将数据写入缓存
优点: - 实现简单 - 缓存失效由应用控制
缺点: - 首次访问慢 - 缓存更新需要应用处理
import json
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
def get_user(user_id):
# 先查缓存
cache_key = f"user:{user_id}"
cached_user = r.get(cache_key)
if cached_user:
return json.loads(cached_user)
# 缓存未命中,查数据库
user = db.query("SELECT * FROM users WHERE id = %s", user_id)
# 写入缓存
r.setex(cache_key, 3600, json.dumps(user))
return user
4.2.2 Read-Through(读穿透)¶
流程: 1. 应用请求缓存 2. 缓存未命中时,缓存负责从数据库加载 3. 缓存返回数据
优点: - 应用代码简单 - 缓存一致性更好
缺点: - 缓存逻辑复杂 - 不灵活
class CacheWithLoader:
def __init__(self, redis_client, db):
self.redis = redis_client
self.db = db
def get(self, key, loader_func):
# 先查缓存
value = self.redis.get(key)
if value:
return json.loads(value)
# 缓存未命中,使用loader函数加载数据
value = loader_func()
# 写入缓存
self.redis.setex(key, 3600, json.dumps(value))
return value
# 使用示例
cache = CacheWithLoader(r, db)
def load_user(user_id):
return db.query("SELECT * FROM users WHERE id = %s", user_id)
user = cache.get(f"user:{user_id}", lambda: load_user(user_id))
4.2.3 Write-Through(写穿透)¶
流程: 1. 应用写入缓存 2. 缓存同步写入数据库 3. 返回成功
优点: - 数据一致性高 - 缓存和数据库同步
缺点: - 写入性能较差 - 实现复杂
def update_user(user_id, data):
# 先更新数据库
db.execute(
"UPDATE users SET name = %s WHERE id = %s",
(data['name'], user_id)
)
# 同步更新缓存
cache_key = f"user:{user_id}"
r.setex(cache_key, 3600, json.dumps(data))
return True
4.2.4 Write-Behind(写回)¶
流程: 1. 应用写入缓存 2. 缓存异步写入数据库 3. 返回成功
优点: - 写入性能高 - 减少数据库压力
缺点: - 数据一致性差 - 可能丢失数据
import threading
from queue import Queue
write_queue = Queue()
def async_write_worker():
while True:
task = write_queue.get()
try: # try/except捕获异常
db.execute(task['sql'], task['params'])
except Exception as e:
# 处理失败,重新入队
write_queue.put(task)
write_queue.task_done()
# 启动异步写入线程
threading.Thread(target=async_write_worker, daemon=True).start()
def update_user_async(user_id, data):
# 更新缓存
cache_key = f"user:{user_id}"
r.setex(cache_key, 3600, json.dumps(data))
# 异步写入数据库
write_queue.put({
'sql': "UPDATE users SET name = %s WHERE id = %s",
'params': (data['name'], user_id)
})
return True
4.3 缓存一致性¶
4.3.1 一致性问题¶
缓存一致性是指缓存和数据库数据保持一致。由于缓存和数据库是独立的系统,可能出现不一致的情况。
不一致的场景: 1. 更新数据库后缓存未更新 2. 缓存失效后数据库已更新 3. 多个节点缓存不一致
4.3.2 一致性解决方案¶
更新缓存策略¶
先更新数据库,再更新缓存:
def update_user(user_id, data):
# 先更新数据库
db.execute(
"UPDATE users SET name = %s WHERE id = %s",
(data['name'], user_id)
)
# 再更新缓存
cache_key = f"user:{user_id}"
r.setex(cache_key, 3600, json.dumps(data))
问题:两个操作之间可能有其他请求读取到旧数据
先删除缓存,再更新数据库:
def update_user(user_id, data):
# 先删除缓存
cache_key = f"user:{user_id}"
r.delete(cache_key)
# 再更新数据库
db.execute(
"UPDATE users SET name = %s WHERE id = %s",
(data['name'], user_id)
)
问题:删除缓存后,更新数据库前,可能有请求读取到旧数据并写入缓存
延时双删:
import time
def update_user(user_id, data):
# 第一次删除缓存
cache_key = f"user:{user_id}"
r.delete(cache_key)
# 更新数据库
db.execute(
"UPDATE users SET name = %s WHERE id = %s",
(data['name'], user_id)
)
# 延时后再次删除缓存
time.sleep(1)
r.delete(cache_key)
订阅Binlog¶
通过订阅数据库的Binlog,在数据变更时更新缓存。
# 使用canal订阅MySQL Binlog
from canal.client import Client
from canal.protocol import EntryProtocol
client = Client()
client.connect(host='127.0.0.1', port=11111)
client.subscribe('test_db', '.*', '.*')
def handle_binlog(entry):
if entry.entry_type == EntryProtocol.ENTRYTYPE_ROWDATA:
# 解析RowData
for row in entry.parse():
if row.event_type == 'UPDATE':
# 更新缓存
cache_key = f"user:{row.after_values['id']}"
r.setex(cache_key, 3600, json.dumps(row.after_values))
while True:
message = client.get(100)
if message:
for entry in message.entries:
handle_binlog(entry)
4.4 缓存问题¶
4.4.1 缓存穿透¶
问题:查询不存在的数据,导致请求直接穿透到数据库。
解决方案:
-
缓存空值:
Pythondef get_user(user_id): cache_key = f"user:{user_id}" cached_user = r.get(cache_key) if cached_user: if cached_user == b'NULL': return None return json.loads(cached_user) # 查数据库 user = db.query("SELECT * FROM users WHERE id = %s", user_id) if not user: # 缓存空值 r.setex(cache_key, 60, 'NULL') return None r.setex(cache_key, 3600, json.dumps(user)) return user -
布隆过滤器:
Pythonfrom pybloom_live import BloomFilter # 初始化布隆过滤器 user_bloom = BloomFilter(capacity=1000000, error_rate=0.001) # 预加载所有用户ID all_user_ids = db.query("SELECT id FROM users") for user_id in all_user_ids: user_bloom.add(user_id) def get_user(user_id): # 先检查布隆过滤器 if user_id not in user_bloom: return None cache_key = f"user:{user_id}" cached_user = r.get(cache_key) if cached_user: return json.loads(cached_user) user = db.query("SELECT * FROM users WHERE id = %s", user_id) r.setex(cache_key, 3600, json.dumps(user)) return user
4.4.2 缓存击穿¶
问题:热点数据过期,大量请求同时穿透到数据库。
解决方案:
-
互斥锁:
Pythonimport threading # ⚠️ 注意:threading.Lock() 仅在单进程内有效。 # 生产环境中多进程/多实例部署时,请使用分布式锁(如 Redis SETNX / Redlock)来防止缓存击穿。 lock = threading.Lock() def get_user(user_id): cache_key = f"user:{user_id}" cached_user = r.get(cache_key) if cached_user: return json.loads(cached_user) # 获取锁 with lock: # 再次检查缓存(双重检查) cached_user = r.get(cache_key) if cached_user: return json.loads(cached_user) # 查数据库 user = db.query("SELECT * FROM users WHERE id = %s", user_id) r.setex(cache_key, 3600, json.dumps(user)) return user -
逻辑过期:
Pythondef get_user(user_id): cache_key = f"user:{user_id}" cached_data = r.get(cache_key) if cached_data: data = json.loads(cached_data) # 检查是否过期 if time.time() < data['expire_time']: return data['user'] # 过期但返回旧数据 # 异步更新缓存 threading.Thread( # 线程池/多线程:并发执行任务 target=update_cache_async, args=(user_id,) ).start() return data['user'] # 缓存不存在,查数据库 user = db.query("SELECT * FROM users WHERE id = %s", user_id) r.setex(cache_key, 3600, json.dumps({ 'user': user, 'expire_time': time.time() + 3600 })) return user
4.4.3 缓存雪崩¶
问题:大量缓存同时失效,导致大量请求穿透到数据库。
解决方案:
-
随机过期时间:
-
缓存预热:
-
多级缓存:
Python# L1缓存:本地缓存 local_cache = {} # L2缓存:Redis def get_user(user_id): # 先查L1缓存 if user_id in local_cache: return local_cache[user_id] # 再查L2缓存 cache_key = f"user:{user_id}" cached_user = r.get(cache_key) if cached_user: user = json.loads(cached_user) local_cache[user_id] = user return user # 查数据库 user = db.query("SELECT * FROM users WHERE id = %s", user_id) # 写入L1和L2缓存 local_cache[user_id] = user r.setex(cache_key, 3600, json.dumps(user)) return user
4.5 多级缓存¶
4.5.1 多级缓存架构¶
4.5.2 多级缓存实现¶
import json
from collections import OrderedDict
class MultiLevelCache:
def __init__(self, redis_client, db, l1_max_size=1000):
self.redis = redis_client
self.db = db
# L1缓存:使用OrderedDict实现简单LRU
self.l1_cache = OrderedDict()
self.l1_max_size = l1_max_size
def get(self, key, loader_func=None):
# L1缓存
value = self._get_l1(key)
if value is not None:
return value
# L2缓存
value = self._get_l2(key)
if value is not None:
self._set_l1(key, value)
return value
# 数据库
if loader_func:
value = loader_func()
self._set_l2(key, value)
self._set_l1(key, value)
return value
return None
def _get_l1(self, key):
# 从本地缓存获取
if key in self.l1_cache:
self.l1_cache.move_to_end(key) # 标记为最近使用
return self.l1_cache[key]
return None
def _set_l1(self, key, value):
# 设置本地缓存(LRU淘汰)
if key in self.l1_cache:
self.l1_cache.move_to_end(key)
self.l1_cache[key] = value
if len(self.l1_cache) > self.l1_max_size:
self.l1_cache.popitem(last=False) # 淘汰最久未使用的
def _get_l2(self, key):
# 从Redis获取
value = self.redis.get(key)
if value:
return json.loads(value) # json.loads将JSON字符串转为Python对象
return None
def _set_l2(self, key, value):
# 设置Redis缓存
self.redis.setex(key, 3600, json.dumps(value))
# 使用示例
cache = MultiLevelCache(r, db)
def load_user(user_id):
return db.query("SELECT * FROM users WHERE id = %s", user_id)
user = cache.get(f"user:{user_id}", lambda: load_user(user_id)) # lambda匿名函数:简洁的单行函数
4.6 缓存预热¶
4.6.1 预热策略¶
- 启动时预热:系统启动时加载热点数据
- 定时预热:定时更新缓存
- 按需预热:根据访问模式预热
4.6.2 预热实现¶
import schedule
import time
def warm_up_hot_data():
# 加载热点用户
hot_users = db.query("""
SELECT * FROM users
WHERE last_login > DATE_SUB(NOW(), INTERVAL 7 DAY)
LIMIT 1000
""")
for user in hot_users:
cache_key = f"user:{user['id']}"
r.setex(cache_key, 3600, json.dumps(user)) # json.dumps将Python对象转为JSON字符串
# 加载热点商品
hot_products = db.query("""
SELECT * FROM products
WHERE sales_count > 1000
LIMIT 1000
""")
for product in hot_products:
cache_key = f"product:{product['id']}"
r.setex(cache_key, 3600, json.dumps(product))
# 启动时预热
warm_up_hot_data()
# 定时预热(每天凌晨2点)
schedule.every().day.at("02:00").do(warm_up_hot_data)
while True:
schedule.run_pending()
time.sleep(60)
4.7 实战练习¶
练习1:实现一个多级缓存系统¶
实现一个包含L1和L2缓存的多级缓存系统: 1. L1缓存使用本地内存 2. L2缓存使用Redis 3. 实现缓存穿透、击穿、雪崩的防护
练习2:设计一个缓存更新策略¶
为一个电商系统设计缓存更新策略: 1. 商品信息缓存 2. 用户信息缓存 3. 购物车缓存
练习3:实现缓存预热¶
为一个新闻网站实现缓存预热功能: 1. 识别热点新闻 2. 预加载到缓存 3. 定时更新缓存
4.8 面试准备¶
常见面试题¶
- 什么是缓存穿透?如何解决?
- 什么是缓存击穿?如何解决?
- 什么是缓存雪崩?如何解决?
- 如何保证缓存和数据库的一致性?
- 多级缓存的优缺点?
项目经验准备¶
准备一个缓存优化项目: - 优化前的问题 - 优化方案 - 优化效果
4.9 总结¶
本章介绍了缓存架构设计,包括缓存策略、缓存一致性、缓存问题和多级缓存。缓存是提高系统性能的重要手段,需要合理设计。
关键要点¶
- 缓存策略包括Cache-Aside、Read-Through、Write-Through、Write-Behind
- 缓存一致性是缓存设计的核心问题
- 缓存穿透、击穿、雪崩是常见问题,需要针对性解决
- 多级缓存可以进一步提高性能
- 缓存预热可以提高系统启动后的性能
下一步¶
下一章将深入学习消息队列架构,包括消息队列选型、消息可靠性等内容。