🏗️ 经典系统设计案例¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
学习时间:8小时 | 难度:⭐⭐⭐⭐⭐ 挑战 | 前置知识:01~05全部章节
🎯 本章目标¶
- 用四步法(需求澄清→高层设计→详细设计→扩展优化)完整分析6个经典系统
- 理解每个系统的核心难点和解决方案
- 积累面试中系统设计题的实战经验
📋 目录¶
案例一:短链接系统(TinyURL)¶
Step 1:需求澄清¶
功能需求: - 给定一个长URL,生成一个短链接 - 用户访问短链接时,重定向到原始URL - 短链接可设置过期时间(可选) - 自定义短链接(可选)
非功能需求: - 高可用:系统7×24可用 - 低延迟:重定向响应时间 < 100ms - 短链接不可预测
容量估算:
假设:
每天新建短链接: 100M (1亿)
读写比: 100:1 → 每天读取: 10B (100亿)
QPS:
写入 QPS = 100M / 86400 ≈ 1160/s
读取 QPS = 10B / 86400 ≈ 115,000/s
峰值读QPS ≈ 230,000/s
存储:
假设每条记录500字节,保存5年
总记录 = 100M × 365 × 5 = 182.5B ≈ 200B
存储 = 200B × 500B = 100TB
短链接长度:
用[a-zA-Z0-9]共62个字符
62^6 ≈ 56.8B(568亿,足够)
62^7 ≈ 3.5T(3.5万亿,非常充裕)
选择7位字符的短链接
Step 2:高层设计¶
┌──────────────────────────┐
│ Client │
│ (Browser / App) │
└────────┬─────────────────┘
│
┌────────▼─────────────────┐
│ 负载均衡器 │
│ (Nginx/LB) │
└────┬──────────┬───────────┘
│ │
┌─────────▼──┐ ┌───▼──────────┐
│ 写入服务 │ │ 重定向服务 │
│ POST /url │ │ GET /:short │
└─────┬──────┘ └───┬──────────┘
│ │
┌──────────▼──────────────▼──────────┐
│ 缓存层 │
│ (Redis) │
└──────────────┬─────────────────────┘
│
┌──────────────▼─────────────────────┐
│ 数据库 │
│ (MySQL / DynamoDB) │
└────────────────────────────────────┘
核心API:
POST /api/v1/urls
Request: { "long_url": "https://...", "expiry": "2025-12-31" }
Response: { "short_url": "https://tiny.url/aB3xK9z" }
GET /:shortCode
Response: 301/302 Redirect → original long URL
301 vs 302重定向:
| 状态码 | 含义 | 浏览器行为 | 适用 |
|---|---|---|---|
| 301 | 永久重定向 | 浏览器缓存,后续不再请求服务端 | 节省服务端压力 |
| 302 | 临时重定向 | 每次都请求服务端 | 需要统计点击量 |
Step 3:详细设计¶
短链接生成方案:
| 方案 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| Hash截取 | 对长URL做MD5/SHA256,取前7位 | 简单 | 冲突、相同URL不同短码 |
| 62进制编码 | 用自增ID转62进制 | 无冲突 | ID可预测 |
| 预生成 | 提前生成所有短码存入库 | 性能好 | 存储大 |
| 随机+去重 | 随机生成+数据库去重 | 简单 | 高并发下冲突多 |
推荐:62进制编码
import string
CHARS = string.digits + string.ascii_uppercase + string.ascii_lowercase # 0-9A-Za-z, 共62个
def encode_base62(num: int) -> str:
"""将数字转换为62进制字符串"""
if num == 0:
return CHARS[0]
result = []
while num > 0:
result.append(CHARS[num % 62])
num //= 62
return ''.join(reversed(result))
def decode_base62(s: str) -> int:
"""将62进制字符串还原为数字"""
num = 0
for char in s:
num = num * 62 + CHARS.index(char)
return num
# 示例
print(encode_base62(123456789)) # "8M0kX"
print(decode_base62("8M0kX")) # 123456789
布隆过滤器去重:
检查短链接是否已存在:
直接查数据库 → 太慢
用布隆过滤器(Bloom Filter):
- 空间高效(10亿条数据只需 ~1.2GB)
- 查询O(1)
- 可能有假阳性(说存在可能不存在)
- 不会有假阴性(说不存在一定不存在)
流程:
1. 生成短码
2. 布隆过滤器检查 → 不存在 → 直接写入
3. 布隆过滤器检查 → 可能存在 → 查数据库确认
4. 确实存在 → 重新生成
import mmh3
from bitarray import bitarray
class BloomFilter:
def __init__(self, size=1000000, hash_count=7):
self.size = size
self.hash_count = hash_count
self.bit_array = bitarray(size)
self.bit_array.setall(0)
def add(self, item):
for i in range(self.hash_count):
idx = mmh3.hash(item, i) % self.size
self.bit_array[idx] = 1
def contains(self, item):
for i in range(self.hash_count):
idx = mmh3.hash(item, i) % self.size
if not self.bit_array[idx]:
return False
return True # 可能存在(有假阳性)
缓存策略:
热门短链接缓存在Redis中
GET /aB3xK9z
│
▼
查Redis → 命中 → 302重定向 (< 5ms)
│
未命中
│
▼
查数据库 → 找到 → 写入Redis + 302重定向
│
未找到 → 404
Step 4:扩展优化¶
数据库分片:
按短码Hash分片到多个数据库实例
短码 → Hash(短码) % N → 分片编号
全局ID生成:
使用Snowflake算法或独立ID生成服务
避免自增ID在分布式环境下冲突
安全优化:
- 防止恶意URL:对长URL做安全检查
- 限流:每个用户/IP限制创建频率
- 短码不可预测:ID加上随机salt后编码
监控:
- 点击量统计
- 热门链接排行
- 异常访问检测
案例二:即时通讯系统(IM)¶
Step 1:需求澄清¶
功能需求: - 一对一聊天(私聊) - 群聊(支持500人群) - 消息类型:文字、图片、语音、文件 - 已读/未读状态、在线状态 - 消息历史记录、离线消息
非功能需求: - 消息延迟 < 200ms - 消息不丢失 - 消息顺序保证 - 支持 1000万 DAU
容量估算:
假设:
DAU: 10M
每用户每天发40条消息
每天消息总量: 400M
QPS:
写入 QPS = 400M / 86400 ≈ 4,600/s
峰值 QPS ≈ 15,000/s
存储:
平均消息大小: 100字节
每天: 400M × 100B = 40GB
每年: 40GB × 365 = 14.6TB
Step 2:高层设计¶
┌─────────┐ WebSocket ┌──────────────┐
│ Client A │ ←───────────→ │ 连接网关 │
└─────────┘ 长连接 │ (Gateway) │
└──────┬───────┘
┌─────────┐ WebSocket │
│ Client B │ ←───────────→ ┌─────▼────────┐
└─────────┘ │ 连接网关 │
│ (Gateway) │
└──────┬───────┘
│
┌──────▼───────┐
│ 消息路由服务 │
│ (Router) │
└──────┬───────┘
│
┌──────────────────┼────────────────┐
│ │ │
┌──────▼───────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ 消息存储服务 │ │ 推送服务 │ │ 在线状态 │
│ (Storage) │ │ (Push) │ │ (Presence) │
└──────────────┘ └─────────────┘ └─────────────┘
Step 3:详细设计¶
WebSocket长连接管理:
为什么用WebSocket而不是HTTP轮询?
HTTP轮询:
Client → Server: "有新消息吗?" (每秒)
Server → Client: "没有" ← 浪费带宽
Client → Server: "有新消息吗?"
Server → Client: "有一条新消息" ← 有延迟
WebSocket:
Client ←→ Server: 建立长连接
Server → Client: 有消息立即推送 ← 实时、省资源
连接管理:
┌─────────────────────────────────────────┐
│ 连接映射表 (Redis) │
│ │
│ user_001 → gateway_ip:port, conn_id │
│ user_002 → gateway_ip:port, conn_id │
│ user_003 → gateway_ip:port, conn_id │
└─────────────────────────────────────────┘
发消息给user_002:
1. 查连接映射 → 找到user_002在gateway_2
2. 路由消息到gateway_2
3. gateway_2通过WebSocket推送给user_002
消息模型设计:
# 消息结构
class Message:
msg_id: str # 消息唯一ID (snowflake)
conversation_id: str # 会话ID
sender_id: str # 发送者
receiver_id: str # 接收者(单聊)/ 群ID(群聊)
msg_type: int # 1=文本, 2=图片, 3=语音, 4=文件
content: str # 消息内容 / 资源URL
timestamp: int # 发送时间戳
status: int # 0=发送中, 1=已送达, 2=已读
# 会话结构
class Conversation:
conversation_id: str
type: int # 1=单聊, 2=群聊
participants: list # 参与者列表
last_msg_id: str # 最后一条消息ID
last_msg_time: int # 最后消息时间
unread_count: dict # {user_id: count}
消息发送流程(单聊):
A发消息给B:
Client A Gateway Router Storage Gateway Client B
│ │ │ │ │ │
│──WebSocket消息─────→ │ │ │ │ │
│ │──路由请求──→│ │ │ │
│ │ │──存储消息──→ │ │ │
│ │ │←─存储成功───│ │ │
│ │ │──查B在线──→ │ │ │
│ │ │ │ │ │
│ │ │ [B在线] │ │ │
│ │ │──推送给B───────────────→│ │
│ │ │ │ │──推送──→│
│ │ │ │ │←─ACK───│
│ │ │ │ │ │
│ │ │ [B离线] │ │ │
│ │ │──存入离线队列│ │ │
│ │ │ (B上线后拉取)│ │ │
│←─发送成功ACK─────── │ │ │ │ │
│ │ │ │ │ │
已读/未读实现:
已读状态实现:
方案一:每条消息独立状态(准确但存储大)
msg_001: {sender: A, status: read}
msg_002: {sender: A, status: unread}
方案二:读取位点(推荐)
conversation_001: {
user_B_read_seq: 1050 // B已读到消息序号1050
}
未读数 = 会话最新seq - 用户已读seq
未读消息 = seq > 用户已读seq 的所有消息
优势:
- 只存储一个数字
- 更新简单(只需更新序号)
- 不需要逐条标记已读
群聊消息分发:
群聊消息(写扩散 vs 读扩散):
写扩散(Fanout on Write):
A在500人群发消息
→ 写入500个收件箱
优:读取快(每人只查自己收件箱)
劣:写放大(500次写入)
适用:小群
读扩散(Fanout on Read):
A在群发消息
→ 只写入群的消息列表
→ 每人读取时去群消息列表拉取
优:写入快(只写一次)
劣:读取慢(每次要拉群消息+合并)
适用:大群/超大群
推荐:
小群(<100人)→ 写扩散
大群(>=100人)→ 读扩散
Step 4:扩展优化¶
消息可靠性保证:
1. 客户端发送 → 服务端ACK确认
2. 服务端存储 → 持久化后才ACK
3. 离线消息 → 上线后拉取补偿
4. 消息去重 → 幂等性设计(msg_id去重)
消息顺序保证:
单聊:同一会话消息走同一队列
群聊:服务端分配递增序号(seq)
消息搜索:
全文搜索 → Elasticsearch
按时间/会话检索 → 数据库索引
多端同步:
设备A已读 → 同步到设备B
使用消息同步序号(sync_seq)
案例三:Feed流系统¶
Step 1:需求澄清¶
功能需求: - 发布动态(文字、图片、视频) - 关注/取关用户 - 获取关注者的动态(Timeline) - 点赞、评论、转发
非功能需求: - Feed刷新延迟 < 500ms - 支持亿级用户 - 刷Feed顺序按时间倒序
容量估算:
假设:
DAU: 300M
每用户每天平均刷Feed 10次
每次返回20条
每用户平均关注200人
每天发布动态的用户: 10% = 30M
每人每天发1条
QPS:
Feed读QPS = 300M × 10 / 86400 ≈ 35,000/s
峰值 ≈ 100,000/s
Step 2:高层设计¶
┌──────────┐
│ Client │
└────┬─────┘
│
┌────▼─────┐
│ LB │
└────┬─────┘
│
┌─────────────┼─────────────┐
│ │ │
┌──────▼──────┐ ┌───▼─────┐ ┌────▼─────┐
│ 发布服务 │ │Feed服务 │ │ 关系服务 │
│(Post Svc) │ │(Feed Svc)│ │(Social) │
└──────┬──────┘ └───┬─────┘ └────┬─────┘
│ │ │
┌──────▼──────┐ ┌───▼─────┐ ┌────▼─────┐
│ 动态存储 │ │Feed缓存 │ │ 关系存储 │
│(Post DB) │ │(Redis) │ │(Graph DB)│
└─────────────┘ └──────────┘ └──────────┘
Step 3:详细设计¶
三种Feed流模式对比:
| 模式 | 写入时 | 读取时 | 优势 | 劣势 |
|---|---|---|---|---|
| 推模式 | 发布时推送到所有粉丝的收件箱 | 直接读自己收件箱 | 读取快 | 大V发布慢(扇出大) |
| 拉模式 | 只写到自己的发件箱 | 读取时拉取所有关注者动态并合并 | 写入快 | 读取慢(合并200人) |
| 推拉结合 | 推给活跃粉丝,不推给不活跃粉丝 | 活跃用户读收件箱,不活跃用户拉取 | 平衡 | 实现复杂 |
推模式(Fanout on Write)详解:
用户A(10万粉丝)发布一条动态:
A发布动态
│
▼
存储到Post表
│
▼
获取A的所有粉丝列表
│
▼
异步推送到每个粉丝的Feed收件箱(Redis List)
粉丝B的Feed: [A的动态, C的动态, D的动态, ...]
粉丝E的Feed: [A的动态, F的动态, ...]
B刷Feed → 直接读取自己的Feed List → 返回前20条
问题:A有100万粉丝 → 推送100万次 → 延迟高!
拉模式(Fanout on Read)详解:
用户B刷Feed:
B关注了 A, C, D, E, F 共200人
│
▼
获取B的关注列表
│
▼
查询每个关注者最近的动态
A的最近5条, C的最近5条, ... (共1000条)
│
▼
按时间戳合并排序
│
▼
返回Top 20
问题:每次刷Feed都要合并200人的数据 → 慢!
推拉结合方案(推荐):
发布时:
┌─────────────────────────────────┐
│ 用户A发布动态 │
│ │
│ 粉丝 < 阈值(如1万)? │
│ ├── 是 → 推模式(推送到所有粉丝)│
│ └── 否 → 只推送给活跃粉丝 │
│ 不活跃粉丝不推送 │
└─────────────────────────────────┘
读取时:
┌─────────────────────────────────┐
│ 用户B刷Feed │
│ │
│ 1. 读取自己的推送收件箱 │
│ 2. 读取未推送的大V的最新动态 │
│ 3. 合并 + 排序 + 返回Top N │
└─────────────────────────────────┘
活跃用户判定:
最近7天有登录 → 活跃
超过7天未登录 → 不活跃 → 不推送
不活跃用户登录后 → 拉取补齐
class FeedService:
def __init__(self, redis_client, post_db, social_db):
self.redis = redis_client
self.post_db = post_db
self.social_db = social_db
self.ACTIVE_THRESHOLD = 7 * 86400 # 7天
self.BIG_V_THRESHOLD = 10000 # 大V粉丝阈值
def publish(self, user_id: str, post: dict):
"""发布动态"""
# 1. 存储动态
post_id = self.post_db.save(post)
# 2. 获取粉丝列表
followers = self.social_db.get_followers(user_id)
# 3. 判断走推模式还是选择性推送
if len(followers) < self.BIG_V_THRESHOLD:
# 普通用户 → 推送给所有粉丝
self._fanout_to_all(post_id, followers)
else:
# 大V → 只推送给活跃粉丝
active = self._filter_active(followers)
self._fanout_to_all(post_id, active)
def get_feed(self, user_id: str, page: int = 1, size: int = 20):
"""获取Feed"""
# 1. 从推送收件箱读取
inbox_posts = self.redis.lrange(
f"feed:{user_id}", (page-1)*size, page*size-1
)
# 2. 拉取关注的大V最新动态
big_v_followings = self._get_big_v_followings(user_id)
pull_posts = self.post_db.get_recent_posts(big_v_followings)
# 3. 合并、去重、排序
all_posts = self._merge_and_sort(inbox_posts, pull_posts)
return all_posts[:size]
def _fanout_to_all(self, post_id, followers):
"""异步推送到粉丝收件箱"""
for follower_id in followers:
self.redis.lpush(f"feed:{follower_id}", post_id)
self.redis.ltrim(f"feed:{follower_id}", 0, 999) # 保留最近1000条
Step 4:扩展优化¶
排序策略:
时间线排序: 简单按发布时间(Twitter早期)
算法排序: 综合考虑相关性、热度、新鲜度(Instagram、微博)
缓存策略:
热用户Feed缓存在Redis
冷用户上线时现拉取
Feed List最多保留1000条
数据存储:
帖子内容 → MySQL/PostgreSQL
Feed索引 → Redis
社交关系 → 图数据库 / 邻接表
媒体文件 → 对象存储(S3/OSS)
案例四:秒杀系统¶
Step 1:需求澄清¶
功能需求: - 在指定时间开始抢购 - 每个用户只能购买1件 - 库存有限(如1000件) - 下单成功后进入支付流程
非功能需求: - 不能超卖 - 高并发(百万级瞬时请求) - 秒杀不影响正常业务
容量估算:
Step 2:高层设计¶
100万+ 用户
│
┌───────▼───────┐
│ CDN/静态化 │ ← 页面静态资源不走后端
└───────┬───────┘
│
┌───────▼───────┐
│ 接入层过滤 │ ← 验证码、限流、风控
│ (Nginx/LB) │ 过滤大量无效请求
└───────┬───────┘
│ (~10万有效请求)
┌───────▼───────┐
│ 应用层过滤 │ ← 库存预判、排队
│ (秒杀服务) │
└───────┬───────┘
│ (~1万请求)
┌───────▼───────┐
│ Redis库存扣减 │ ← 原子操作扣库存
└───────┬───────┘
│ (~1000成功)
┌───────▼───────┐
│ 下单 + 支付 │ ← 异步处理
└───────────────┘
Step 3:详细设计¶
核心思路:层层削峰
漏斗模型:
100万请求 ─┐
│ CDN缓存 + 前端限制(按钮置灰)
50万请求 ─┤
│ Nginx限流 + 验证码
10万请求 ─┤
│ 应用层排队 + 库存预判
1万请求 ─┤
│ Redis原子扣库存
1000订单 ─┤
│ 消息队列异步下单
1000支付 ─┘
Redis库存预扣(核心):
# Redis Lua脚本 —— 原子性扣减库存
SECKILL_SCRIPT = """
local stock_key = KEYS[1] -- 库存Key
local user_set_key = KEYS[2] -- 已购用户集合Key
local user_id = ARGV[1] -- 用户ID
-- 检查是否已购买
if redis.call('sismember', user_set_key, user_id) == 1 then
return -1 -- 已购买
end
-- 检查库存
local stock = tonumber(redis.call('get', stock_key))
if stock <= 0 then
return 0 -- 库存不足
end
-- 扣减库存
redis.call('decr', stock_key)
-- 记录已购买
redis.call('sadd', user_set_key, user_id)
return 1 -- 成功
"""
import redis
class SeckillService:
def __init__(self):
self.r = redis.Redis()
self.script = self.r.register_script(SECKILL_SCRIPT)
def init_stock(self, item_id: str, stock: int):
"""初始化库存"""
self.r.set(f"seckill:stock:{item_id}", stock)
self.r.delete(f"seckill:users:{item_id}")
def try_seckill(self, item_id: str, user_id: str) -> int:
"""尝试秒杀"""
result = self.script(
keys=[
f"seckill:stock:{item_id}",
f"seckill:users:{item_id}"
],
args=[user_id]
)
if result == 1:
# 秒杀成功 → 发送到消息队列异步创建订单
self._send_to_mq(item_id, user_id)
return 1 # 抢购成功
elif result == 0:
return 0 # 已售罄
else:
return -1 # 已购买过
// Go版本 - 库存预扣
func (s *SeckillService) TrySeckill(itemID, userID string) (int, error) {
script := redis.NewScript(`
local stock = tonumber(redis.call('get', KEYS[1]))
if stock <= 0 then return 0 end
if redis.call('sismember', KEYS[2], ARGV[1]) == 1 then return -1 end
redis.call('decr', KEYS[1])
redis.call('sadd', KEYS[2], ARGV[1])
return 1
`)
stockKey := fmt.Sprintf("seckill:stock:%s", itemID)
usersKey := fmt.Sprintf("seckill:users:%s", itemID)
result, err := script.Run(ctx, s.rdb, []string{stockKey, usersKey}, userID).Int()
if err != nil {
return 0, err
}
if result == 1 {
// 发送到消息队列
s.sendToMQ(itemID, userID)
}
return result, nil
}
异步下单 + 支付:
秒杀成功后的流程:
Redis扣库存成功
│
▼
发送消息到MQ (Kafka/RocketMQ)
│
▼
订单服务消费消息
├── 创建订单(状态:待支付)
├── 扣减数据库库存(最终一致性)
└── 通知用户(WebSocket/推送)
│
▼
用户15分钟内完成支付
├── 支付成功 → 订单完成
└── 超时未支付 →
├── 取消订单
├── 恢复Redis库存
└── 恢复数据库库存
防刷策略:
| 层级 | 策略 | 说明 |
|---|---|---|
| 前端 | 按钮置灰 + 倒计时 | 防止提前/重复点击 |
| 前端 | 动态URL | 秒杀开始时才获取真实接口地址 |
| 网关 | 验证码 | 人机验证,拦截脚本 |
| 网关 | IP限流 | 单IP每秒最多N次 |
| 网关 | 用户限流 | 单用户每秒最多1次 |
| 应用 | 黑名单 | 封禁异常用户 |
| 应用 | 令牌机制 | 预先发放令牌,有令牌才能抢 |
Step 4:扩展优化¶
CDN静态化:
秒杀页面HTML/JS/CSS全部CDN缓存
只有"抢购"这个Ajax请求到后端
隔离部署:
秒杀服务独立部署,独立数据库
防止秒杀流量影响正常业务
正常业务 → 常规服务集群
秒杀业务 → 独立秒杀集群 ← 隔离!
预热:
秒杀前将库存数据加载到Redis
预建数据库连接池
CDN预热静态资源
案例五:搜索引擎¶
Step 1:需求澄清¶
功能需求: - 输入关键词,返回相关网页列表 - 支持中文分词 - 搜索结果按相关性排序 - 搜索建议(自动补全) - 拼写纠错
非功能需求: - 搜索延迟 < 500ms - 索引覆盖数十亿网页 - 高可用
容量估算:
假设(中等规模搜索引擎):
索引网页数: 10B(100亿)
每天搜索请求: 1B
QPS:
搜索 QPS = 1B / 86400 ≈ 11,500/s
峰值 QPS ≈ 30,000/s
存储:
平均网页大小: 100KB
索引总量远小于原始网页(通常10-20%)
倒排索引大小: ~200TB
Step 2:高层设计¶
┌─────────────────────────────────────────────────────┐
│ 搜索引擎架构 │
│ │
│ ┌───────────┐ │
│ │ 爬虫系统 │ → 抓取网页 │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ 处理管道 │ → 解析HTML、提取文本、去重 │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ 索引系统 │ → 构建倒排索引 │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼──────────────────────────┐ │
│ │ 查询系统 │ │
│ │ 用户查询 → 分词 → 检索 → 排序 → 返回结果 │
│ └────────────────────────────────┘ │
│ │
│ ┌────────────────┐ │
│ │ 缓存层(Redis) │ → 热门查询缓存 │
│ └────────────────┘ │
└─────────────────────────────────────────────────────┘
Step 3:详细设计¶
倒排索引(Inverted Index):
正排索引(Forward Index):
doc_1 → ["系统", "设计", "面试"]
doc_2 → ["设计", "模式", "Java"]
doc_3 → ["系统", "架构", "微服务"]
查"系统设计" → 遍历所有文档 → 太慢!
倒排索引(Inverted Index):
"系统" → [doc_1, doc_3] ← posting list
"设计" → [doc_1, doc_2]
"面试" → [doc_1]
"模式" → [doc_2]
"Java" → [doc_2]
"架构" → [doc_3]
"微服务" → [doc_3]
查"系统设计" → 分词["系统", "设计"]
→ "系统"的posting: [doc_1, doc_3]
→ "设计"的posting: [doc_1, doc_2]
→ 交集: [doc_1] ← 匹配结果
import math
from collections import defaultdict # defaultdict带默认值的字典,避免KeyError
import jieba
class InvertedIndex:
def __init__(self):
# 词 → [(doc_id, 词频, 位置列表)]
self.index = defaultdict(list)
self.doc_store = {}
self.doc_count = 0
def add_document(self, doc_id: str, text: str):
"""向索引中添加文档"""
self.doc_store[doc_id] = text
self.doc_count += 1
# 分词
words = list(jieba.cut(text))
# 统计词频和位置
word_info = defaultdict(lambda: {"freq": 0, "positions": []})
for pos, word in enumerate(words): # enumerate同时获取索引和值
word_info[word]["freq"] += 1
word_info[word]["positions"].append(pos)
# 构建倒排索引
for word, info in word_info.items():
self.index[word].append({
"doc_id": doc_id,
"freq": info["freq"],
"positions": info["positions"]
})
def search(self, query: str):
"""搜索查询"""
# 1. 查询分词
terms = list(jieba.cut(query))
# 2. 获取每个词的posting list
postings = [self.index.get(term, []) for term in terms]
if not postings:
return []
# 3. 求交集(AND查询)
doc_sets = [set(p["doc_id"] for p in posting) for posting in postings]
result_docs = doc_sets[0]
for s in doc_sets[1:]: # 切片操作:[start:end:step]提取子序列
result_docs &= s
# 4. 计算相关性分数(简化版TF-IDF)
scores = {}
for doc_id in result_docs:
score = 0
for term, posting in zip(terms, postings): # zip并行遍历多个可迭代对象
for entry in posting:
if entry["doc_id"] == doc_id:
tf = entry["freq"]
idf = math.log(self.doc_count / len(posting))
score += tf * idf
scores[doc_id] = score
# 5. 按分数排序
return sorted(scores.items(), key=lambda x: x[1], reverse=True)
相关性排序(TF-IDF + BM25):
TF-IDF:
TF(词频) = 词在文档中出现次数 / 文档总词数
IDF(逆文档频率) = log(总文档数 / 包含该词的文档数)
TF-IDF = TF × IDF
高TF-IDF = 这个词在这篇文档中很重要
BM25 (改进的TF-IDF):
score(D, Q) = Σ IDF(qi) × (tf × (k1+1)) / (tf + k1 × (1 - b + b × |D|/avgdl))
k1 = 1.2 (词频饱和参数)
b = 0.75 (文档长度归一化参数)
|D| = 文档长度
avgdl = 平均文档长度
BM25比TF-IDF好在:词频有饱和效应,不会因为出现100次就比50次好2倍
爬虫架构:
分布式爬虫:
┌──────────┐
│ URL管理器 │ ← 待爬URL队列
│ (Redis) │ 已爬URL集合(布隆过滤器)
└───┬──────┘
│
┌─────────────┼─────────────┐
│ │ │
┌────▼───┐ ┌────▼───┐ ┌────▼───┐
│爬虫节点│ │爬虫节点│ │爬虫节点│
│ #1 │ │ #2 │ │ #3 │
└────┬───┘ └────┬───┘ └────┬───┘
│ │ │
└──────┬──────┘─────────────┘
│
┌──────▼──────┐
│ 内容处理 │ → 解析HTML、提取文本
│ 去重 │ → SimHash/MinHash
│ 索引更新 │ → 增量更新倒排索引
└─────────────┘
礼貌爬取规则:
- 遵守robots.txt
- 请求间隔(同一域名至少1-2秒)
- 分布式去重(URL去重 + 内容去重)
Step 4:扩展优化¶
搜索建议(Auto-Complete):
Trie树 + 热门查询频率
用户输入"系统" → 推荐"系统设计", "系统架构", "操作系统"
拼写纠错:
编辑距离(Levenshtein Distance)
"系统设几" → "系统设计"
索引分片:
方案1:按文档分片(每个分片包含不同文档的完整索引)
方案2:按词分片(每个分片包含不同词的posting list)
通常选方案1
查询缓存:
热门查询 → Redis缓存结果
"天气"这种高频查询,缓存命中率很高
案例六:视频流系统¶
Step 1:需求澄清¶
功能需求: - 上传视频 - 观看视频(流畅播放) - 视频搜索 - 评论、点赞 - 弹幕系统
非功能需求: - 高可靠(视频不丢失) - 流畅播放(缓冲时间短) - 支持多种清晰度 - 全球CDN分发
容量估算:
假设:
DAU: 100M
每人每天看5个视频,平均5分钟
每天新上传视频: 500K
平均视频大小: 500MB
存储:
每天新增: 500K × 500MB = 250TB/天
每年: 250TB × 365 ≈ 90PB
带宽:
同时在线观看: 10M用户
平均码率: 5Mbps
总带宽: 10M × 5Mbps = 50Tbps
Step 2:高层设计¶
┌─────────────────────────────────────────────────┐
│ 视频流系统架构 │
│ │
│ 上传流程: │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │上传 │→ │存储 │→ │转码 │→ │CDN │ │
│ │服务 │ │(S3) │ │集群 │ │分发 │ │
│ └──────┘ └──────┘ └──────┘ └──────┘ │
│ │
│ 播放流程: │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │播放器│← │CDN │← │源站 │← │存储 │ │
│ │(ABR) │ │边缘 │ │(回源)│ │(S3) │ │
│ └──────┘ └──────┘ └──────┘ └──────┘ │
│ │
│ 元数据: │
│ ┌───────────┐ ┌──────────┐ │
│ │视频信息DB │ │搜索引擎ES │ │
│ └───────────┘ └──────────┘ │
└─────────────────────────────────────────────────┘
Step 3:详细设计¶
视频上传与转码流程:
上传流程:
用户 → 上传原始视频(1080p, H.264, 500MB)
│
▼
上传服务
├── 校验(格式、大小、内容安全)
├── 生成唯一video_id
└── 存储到对象存储(S3/OSS)
│
▼
发送转码任务到消息队列
│
▼
转码集群(并行处理)
├── 240p (H.264, ~50MB)
├── 360p (H.264, ~100MB)
├── 480p (H.264, ~200MB)
├── 720p (H.264, ~350MB)
├── 1080p (H.264, ~500MB)
└── 4K (H.265/VP9, ~2GB) [可选]
│
▼
生成HLS/DASH分片
每个清晰度 → 切成2-10秒的小片段(.ts文件)
生成播放清单(.m3u8)
│
▼
上传到CDN源站
│
▼
更新视频状态为"可播放"
通知用户上传完成
自适应码率(ABR):
ABR (Adaptive Bitrate Streaming):
根据用户网络状况自动切换清晰度
用户网速 10Mbps 用户网速降至 2Mbps
│ │
▼ ▼
播放1080p 自动切换到480p
(码率 5Mbps) (码率 1.5Mbps)
实现方式(HLS为例):
Master Playlist (master.m3u8):
#EXTM3U
#EXT-X-STREAM-INF:BANDWIDTH=500000,RESOLUTION=426x240
240p/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=1500000,RESOLUTION=854x480
480p/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=5000000,RESOLUTION=1920x1080
1080p/playlist.m3u8
播放器逻辑:
1. 获取Master Playlist
2. 测量当前下载速度
3. 选择不超过当前带宽80%的最高清晰度
4. 持续监测,动态切换
┌─────────────────────────────────────┐
│ 带宽 │ 清晰度 │ 码率 │
│──────────┼──────────┼─────────────│
│ < 1Mbps │ 240p │ 500Kbps │
│ 1-3Mbps │ 480p │ 1.5Mbps │
│ 3-8Mbps │ 720p │ 3Mbps │
│ > 8Mbps │ 1080p │ 5Mbps │
└─────────────────────────────────────┘
CDN分发架构:
全球CDN分发:
┌──────────────────────────────────────────┐
│ 源站 │
│ (对象存储 S3/OSS) │
└─────────┬──────────┬──────────┬──────────┘
│ │ │
┌──────▼───┐ ┌────▼────┐ ┌──▼───────┐
│ 北美CDN │ │ 亚太CDN │ │ 欧洲CDN │
│ POP节点 │ │ POP节点 │ │ POP节点 │
└──┬───┬───┘ └──┬──┬──┘ └──┬───┬────┘
│ │ │ │ │ │
┌──▼┐ ┌▼──┐ ┌──▼┐ ┌▼──┐ ┌▼──┐ ┌▼──┐
│LA │ │NY │ │东京│ │新加│ │伦敦│ │法兰│
│ │ │ │ │ │ │坡 │ │ │ │克福│
└───┘ └───┘ └───┘ └───┘ └───┘ └───┘
用户请求流程:
1. 用户请求视频片段
2. DNS解析到最近的CDN POP节点
3. POP节点有缓存 → 直接返回
4. POP节点无缓存 → 回源拉取 → 缓存 → 返回
缓存策略:
- 热门视频:主动推送到CDN(Push)
- 长尾视频:按需拉取(Pull)
- 过期策略:LRU + TTL
弹幕系统设计:
弹幕系统:
发送弹幕:
用户发送 → WebSocket → 弹幕服务 → 存储
→ 广播给同一视频的其他用户
拉取弹幕:
播放器 → 按时间段拉取弹幕 → 渲染到画面上
弹幕数据结构:
{
"danmaku_id": "xxx",
"video_id": "v001",
"user_id": "u001",
"content": "666",
"time_offset": 38.5, // 视频第38.5秒
"color": "#FFFFFF",
"position": "scroll", // scroll/top/bottom
"font_size": 25
}
存储方案:
按视频时间分段存储(每10秒一个分段)
video_v001_0: [0-10秒的弹幕]
video_v001_10: [10-20秒的弹幕]
video_v001_20: [20-30秒的弹幕]
播放到第35秒 → 提前拉取 30-40秒的弹幕
实时弹幕广播:
同一视频的观众 → 同一个WebSocket房间
新弹幕 → 广播到房间内所有连接
弹幕密度控制:
每屏最多显示N条,超出后随机丢弃或按优先级过滤
import json
import time
class DanmakuService:
def __init__(self, redis_client, db):
self.redis = redis_client
self.db = db
self.SEGMENT_SECONDS = 10
def send_danmaku(self, video_id: str, user_id: str,
content: str, time_offset: float):
"""发送弹幕"""
danmaku = {
"id": generate_id(),
"video_id": video_id,
"user_id": user_id,
"content": content,
"time_offset": time_offset,
"created_at": time.time()
}
# 1. 存储到数据库
self.db.save_danmaku(danmaku)
# 2. 写入Redis分段缓存
segment = int(time_offset // self.SEGMENT_SECONDS) * self.SEGMENT_SECONDS
key = f"danmaku:{video_id}:{segment}"
self.redis.zadd(key, {json.dumps(danmaku): time_offset}) # json.dumps将Python对象转为JSON字符串
self.redis.expire(key, 86400) # 缓存1天
# 3. 实时广播给同一视频的观众
self._broadcast(video_id, danmaku)
def get_danmaku(self, video_id: str, start_time: float, end_time: float):
"""获取时间段内的弹幕"""
start_seg = int(start_time // self.SEGMENT_SECONDS) * self.SEGMENT_SECONDS
end_seg = int(end_time // self.SEGMENT_SECONDS) * self.SEGMENT_SECONDS
result = []
for seg in range(start_seg, end_seg + self.SEGMENT_SECONDS, self.SEGMENT_SECONDS):
key = f"danmaku:{video_id}:{seg}"
items = self.redis.zrangebyscore(key, start_time, end_time)
result.extend([json.loads(item) for item in items]) # json.loads将JSON字符串转为Python对象
# 密度控制:最多返回100条
if len(result) > 100:
result = sorted(result, key=lambda x: x["created_at"])[:100] # lambda匿名函数:简洁的单行函数
return result
Step 4:扩展优化¶
视频推荐:
协同过滤 + 内容特征 + 深度学习
看完视频A → 推荐相似视频
视频搜索:
标题/标签/字幕 → Elasticsearch
视频封面 → 图像特征索引(以图搜视频)
版权保护:
内容指纹(视频DNA/音频DNA)
上传时自动检查是否侵权
成本优化:
冷视频(长期不播放)→ 归档存储(如S3 Glacier)
热视频 → 高性能存储 + CDN预热
中间 → 标准存储 + CDN按需缓存
练习与延伸阅读¶
综合练习¶
-
系统选择题: 如果让你设计一个在线文档协作系统(类Google Docs),你会选择哪些核心组件?请用四步法分析。
-
扩展设计: 在短链接系统基础上,增加"链接访问数据分析"功能(PV/UV、地域分布、设备分布),如何设计?
-
优化题: Feed流系统中,如果一个用户关注了5000个人,且其中有100个是百万粉丝大V,如何优化这个用户的Feed加载性能?
-
容灾题: 秒杀系统中,如果Redis主节点突然宕机且还没完成failover,如何保证不超卖?
-
规模题: 你的搜索引擎索引了100亿网页,现在需要支持实时搜索(新网页发布后1分钟内可搜索到),如何设计?
延伸阅读¶
- System Design Interview (Alex Xu) — 第1+2册,经典参考
- Designing Data-Intensive Applications (DDIA) — 深入理解数据系统
- Google SRE Book — 大规模系统运维经验
- 阿里技术团队:《秒杀系统设计与实现》
- 美团技术博客:Feed流系统实践
- B站技术团队:弹幕系统架构
- 字节跳动:短视频推荐架构
📝 六大案例速查表¶
| 系统 | 核心难点 | 关键技术 | 读/写特点 |
|---|---|---|---|
| 短链接 | 唯一性+高读QPS | 62进制编码+布隆过滤器+缓存 | 读多写少(100:1) |
| IM | 实时性+可靠性 | WebSocket+消息队列+读写位点 | 读写均衡 |
| Feed流 | 大V扇出+读性能 | 推拉结合+活跃用户策略 | 读多写少 |
| 秒杀 | 高并发+不超卖 | 层层削峰+Redis Lua原子扣减 | 写竞争激烈 |
| 搜索 | 海量数据+相关性 | 倒排索引+BM25+分片 | 读多写少 |
| 视频流 | 大存储+流畅播放 | 转码+CDN+ABR自适应码率 | 读密集 |
← 上一章:高可用与容灾 | 返回目录 →