跳转至

系统设计面试题 - 12道经典题目深度解析

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

系统设计面试题图

系统设计面试是高级工程师面试的核心环节。本文整理12道经典系统设计题,每题按需求分析→概要设计→详细设计→扩展讨论四步法展开。


面试通用框架:四步法

步骤 时间分配 核心内容
需求分析 5分钟 功能性需求、非功能性需求、容量估算
概要设计 10分钟 API设计、数据模型、高层架构
详细设计 15分钟 核心算法、关键决策、数据流
扩展讨论 10分钟 性能优化、故障处理、监控告警

1. 设计短链接系统(TinyURL)

1.1 需求分析

功能性需求: - 给定长 URL,生成短链接 - 访问短链接时重定向到原始 URL - 可选:自定义短链接、设置过期时间、点击统计

非功能性需求: - 高可用性(99.99%) - 低延迟(重定向 < 100ms) - 短链接不可预测(安全性)

容量估算: - 假设每天创建 1 亿条短链接,读写比 100:1 - 写入 QPS:100M / 86400 ≈ 1160/s - 读取 QPS:116,000/s - 存储:每条记录约 500 bytes,每天 50GB,5年约 90TB - 短链接长度:62^7 ≈ 3.5万亿,足够使用

1.2 概要设计

API 设计:

Text Only
POST /api/v1/shorten
  Request:  { "long_url": "https://...", "expiry": "2025-12-31" }
  Response: { "short_url": "https://tiny.url/aB3cD9x" }

GET /{short_code}
  Response: 301/302 Redirect to long_url

数据模型:

SQL
URL :
  id          BIGINT PRIMARY KEY (自增)
  short_code  VARCHAR(7) UNIQUE INDEX
  long_url    TEXT NOT NULL
  user_id     BIGINT (可选)
  created_at  TIMESTAMP
  expires_at  TIMESTAMP (可选)
  click_count BIGINT DEFAULT 0

架构概览:

Text Only
客户端 → 负载均衡器(Nginx/ALB) → API服务器集群 → 缓存层(Redis) → 数据库(MySQL分片)
                                                            短码生成服务
  • 客户端发送请求到负载均衡器
  • 负载均衡器分发到 API 服务器集群
  • API 服务器先查 Redis 缓存,缓存未命中则查数据库
  • 短码生成服务负责生成唯一短码

1.3 详细设计

短码生成方案对比:

方案 优点 缺点
哈希 + Base62 简单、无中心依赖 可能冲突,需要碰撞处理
自增ID + Base62 无冲突、有序 可预测,需分布式ID生成
预生成 Key 服务 无冲突、无延迟 额外维护成本

方案1:哈希 + Base62

Text Only
long_url → MD5/SHA256 → 取前43位 → Base62编码 → 7位短码
碰撞处理:追加用户ID或时间戳重新哈希,或用布隆过滤器快速判断是否已存在。

方案2:分布式自增ID(推荐) - 使用 Snowflake 或 Twitter 的分布式 ID 生成器 - 将 ID 转换为 62 进制字符串

Python
CHARS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

def id_to_short_code(num: int) -> str:
    if num == 0:
        return CHARS[0]
    result = []
    while num > 0:
        result.append(CHARS[num % 62])
        num //= 62
    return ''.join(reversed(result))

301 vs 302 重定向: - 301 Moved Permanently: 浏览器缓存重定向,减少服务器负载,但无法统计点击 - 302 Found: 每次请求都经过服务器,可以统计点击和做 A/B 测试 - 推荐:默认用 302,需要统计数据;无需统计时用 301

缓存策略: - 使用 Redis 缓存热门短链接,LRU 淘汰策略 - 缓存命中率目标 > 80% - 缓存 TTL 设置为 24-48 小时

1.4 扩展讨论

性能优化: - 数据库读写分离:主库写入,从库读取 - 数据库分片:按短码首字符或哈希值分片 - 地理分布式部署:就近服务用户 - CDN 缓存:301 重定向可被 CDN 缓存

故障处理: - 数据库主从切换(自动 Failover) - Redis 集群模式避免单点故障 - 限流防止恶意创建(令牌桶算法) - 布隆过滤器防止无效短码查询穿透

监控告警: - QPS 监控、响应时间 P99 - 缓存命中率、数据库连接数 - 短码碰撞率、存储使用率


2. 设计消息队列

2.1 需求分析

功能性需求: - 生产者发布消息到 Topic - 消费者订阅 Topic 并消费消息 - 支持点对点(P2P)和发布/订阅(Pub/Sub)模式 - 消息持久化,重启不丢失 - 支持消息重试和死信队列

非功能性需求: - 高吞吐量(百万级 TPS) - 低延迟(P99 < 10ms) - 消息可靠性(不丢失、不重复) - 消息顺序保证(分区级别)

容量估算: - 日均消息量:10 亿条 - 平均消息大小:1KB - 日存储需求:1TB - 峰值 TPS:50,000 - 消息保留 7 天:7TB

2.2 概要设计

API 设计:

Text Only
// 生产者
POST /api/v1/topics/{topic}/messages
  Request:  { "key": "order_123", "value": "...", "headers": {...} }
  Response: { "partition": 3, "offset": 12345 }

// 消费者
GET /api/v1/topics/{topic}/messages?group={group_id}&timeout=5000
  Response: { "messages": [...], "offsets": {...} }

POST /api/v1/topics/{topic}/ack
  Request:  { "group": "group_1", "offsets": {"0": 100, "1": 200} }

数据模型:

SQL
Topic:
  name        VARCHAR PRIMARY KEY
  partitions  INT
  replication INT
  retention   BIGINT (毫秒)

Message (每个Partition一个日志文件):
  offset      BIGINT (自增,分区内唯一)
  key         BYTES
  value       BYTES
  timestamp   BIGINT
  headers     MAP<STRING, STRING>

ConsumerOffset:
  group_id    VARCHAR
  topic       VARCHAR
  partition   INT
  offset      BIGINT
  PRIMARY KEY (group_id, topic, partition)

架构概览:

Text Only
生产者集群 → Broker集群(Leader/Follower) → 消费者组
    ↑              ↓                          ↓
    └── 协调服务(ZooKeeper/Raft) ←── 消费者偏移存储
  • 每个 Topic 分为多个 Partition,分布在不同 Broker
  • 每个 Partition 有一个 Leader 和多个 Follower
  • 消费者组中每个消费者负责一部分 Partition

2.3 详细设计

消息存储设计:

采用追加写日志(Append-Only Log): - 每个 Partition 是一个有序的、不可变的消息序列 - 消息追加写到日志末尾,通过 offset 索引 - 分段存储:每个段文件固定大小(如 1GB),便于清理

Text Only
partition-0/
  00000000000000000000.log    # 第一个段
  00000000000000000000.index  # 稀疏索引
  00000000000012345678.log    # 第二个段
  00000000000012345678.index

顺序写 + 零拷贝(Zero-Copy): - 顺序写磁盘速度接近内存(600MB/s) - 使用 sendfile() 系统调用,避免用户态和内核态之间的数据拷贝

ACK 机制:

ACK 级别 含义 可靠性 性能
acks=0 不等待确认 最低 最高
acks=1 Leader 写入确认 中等 中等
acks=all 所有副本确认 最高 最低

消息顺序保证: - 分区内有序: 同一 Partition 内消息严格有序 - 全局有序: 只使用一个 Partition(牺牲吞吐量) - Key 级别有序: 相同 Key 的消息路由到同一 Partition

消费者组协调: - 消费者加入/离开触发 Rebalance - 分配算法:Range、RoundRobin、Sticky - Sticky 分配最小化 Partition 迁移

2.4 扩展讨论

性能优化: - 批量发送/消费,减少网络开销 - 消息压缩(Snappy/LZ4/Zstd) - Page Cache 利用,减少磁盘 IO - 生产者端幂等性(Producer ID + Sequence Number)

故障处理: - ISR(In-Sync Replicas)机制:落后太多的副本剥离 - Unclean Leader Election:是否允许非同步副本成为 Leader - 消费者故障:Session Timeout 触发 Rebalance - 消息积压:增加消费者数量(不超过 Partition 数)

监控告警: - 消费者 Lag(消费延迟):最新offset - 消费offset - Broker CPU/内存/磁盘使用率 - ISR 缩减告警 - 生产者发送失败率


3. 设计 Feed 流系统(微博/Twitter)

3.1 需求分析

功能性需求: - 用户发布推文(文字、图片、视频) - 用户关注/取关其他用户 - 查看自己的 Timeline(关注人的推文按时间排序) - 点赞、评论、转发

非功能性需求: - Timeline 加载 < 200ms - 新推文在 5 秒内出现在粉丝 Timeline - 支持大V(千万级粉丝)

容量估算: - DAU:3 亿 - 每用户日均查看 Timeline:5 次 - Timeline QPS:3亿 × 5 / 86400 ≈ 17,000/s - 日均发推:5000 万条 - 写入 QPS:580/s(读写比 30:1)

3.2 概要设计

API 设计:

Text Only
POST /api/v1/tweets       # 发推文
GET  /api/v1/timeline     # 获取Timeline
GET  /api/v1/feed/{user}  # 获取用户主页
POST /api/v1/follow       # 关注

数据模型:

Text Only
User: { user_id, name, avatar, follower_count, following_count }
Tweet: { tweet_id, user_id, content, media_urls, created_at, like_count }
Follow: { follower_id, followee_id, created_at }
Timeline: { user_id, tweet_id, created_at }  # Feed 缓存表

架构概览:

Text Only
                    ┌─→ Timeline 服务 → Redis Timeline 缓存
用户请求 → API网关 ─┤
                    └─→ 推文服务 → 推文DB → Fanout 服务 → 异步写入粉丝Timeline

3.3 详细设计

推模型 vs 拉模型:

方案 原理 优点 缺点
推模型(Fanout-on-Write) 发推时写入所有粉丝的Timeline 读取快O(1) 大V fanout 慢、存储浪费
拉模型(Fanout-on-Read) 读Timeline时聚合关注人的推文 写入快、节省存储 读取慢、需要合并排序
混合模型 普通用户推、大V拉 兼顾两者 实现复杂

混合策略(推荐): - 粉丝数 < 1万:使用推模型 - 粉丝数 ≥ 1万(大V):使用拉模型 - 用户获取 Timeline 时,合并推模型缓存 + 大V实时拉取

Fanout 服务设计:

Python
def fanout_tweet(tweet_id, author_id):
    followers = get_followers(author_id)

    if len(followers) > CELEBRITY_THRESHOLD:
        # 大V:不推送,标记为需要拉取
        mark_as_celebrity_tweet(tweet_id)
        return

    # 普通用户:推送到所有粉丝的 Timeline 缓存
    for follower_id in followers:
        # 异步消息队列处理
        mq.publish("timeline_fanout", {
            "user_id": follower_id,
            "tweet_id": tweet_id,
            "timestamp": time.time()
        })

Timeline 缓存结构(Redis):

Text Only
Key: timeline:{user_id}
Type: Sorted Set
Score: timestamp
Member: tweet_id

ZADD timeline:user123 1706000000 tweet456
ZREVRANGE timeline:user123 0 19  # 获取最新20条

每个用户缓存最近 800 条推文 ID,超出自动淘汰。

3.4 扩展讨论

性能优化: - CDN 缓存静态资源(图片/视频) - Timeline 预加载(下拉刷新时提前加载) - 推文内容与 ID 分层缓存 - 冷用户不预生成 Timeline

故障处理: - Fanout 服务故障:消息队列保证最终一致性 - Redis 缓存失效:回源数据库重建 - 数据库分片:按 user_id 分片

监控告警: - Fanout 延迟(发推到粉丝可见的时间) - Timeline 缓存命中率 - 大V 推文的合并延迟


4. 设计即时通讯系统(IM)

4.1 需求分析

功能性需求: - 一对一聊天和群聊 - 发送文字、图片、语音、视频消息 - 在线状态显示 - 已读/未读标记 - 消息历史记录查询

非功能性需求: - 消息端到端延迟 < 200ms - 消息不丢失、不重复 - 支持离线消息推送 - 支持百万级并发连接

容量估算: - DAU:5000 万 - 每用户日均发送 40 条消息 - 日消息量:20 亿条 - 峰值 QPS:约 50,000 消息/秒 - 并发长连接:500 万

4.2 概要设计

API 设计:

Text Only
WebSocket 连接: wss://chat.example.com/ws?token=xxx

消息格式(JSON):
{
  "type": "message",
  "msg_id": "uuid",
  "from": "user_a",
  "to": "user_b" / "group_123",
  "content": { "type": "text", "text": "Hello" },
  "timestamp": 1706000000
}

ACK格式:
{ "type": "ack", "msg_id": "uuid", "status": "delivered" }

数据模型:

SQL
Message:
  msg_id       VARCHAR PRIMARY KEY
  conversation_id VARCHAR INDEX
  sender_id    BIGINT
  content_type ENUM('text','image','voice','video')
  content      TEXT/BLOB
  created_at   TIMESTAMP

Conversation:
  conv_id      VARCHAR PRIMARY KEY
  type         ENUM('private','group')
  members      JSON
  last_msg_id  VARCHAR
  updated_at   TIMESTAMP

UserConversation:
  user_id      BIGINT
  conv_id      VARCHAR
  unread_count INT
  last_read_msg VARCHAR
  PRIMARY KEY (user_id, conv_id)

架构概览:

Text Only
客户端 ←─ WebSocket ─→ 连接网关集群 → 消息路由服务 → 消息存储(DB)
                            ↑                ↓
                        在线状态服务     推送服务(离线)
                        (Redis)        (APNs/FCM)

4.3 详细设计

WebSocket 连接管理: - 每台网关服务器维护数万长连接 - 用户连接时注册:Redis SET online:{user_id} gateway_server_id - 心跳检测:30秒间隔,3次未响应断开

消息发送流程: 1. 发送方通过 WebSocket 发送消息到网关 2. 网关转发到消息路由服务 3. 路由服务生成全局唯一 msg_id(Snowflake) 4. 消息写入存储(异步) 5. 查询接收方在线状态和所在网关 6. 在线:通过目标网关推送;离线:推送通知

消息可靠性保证:

Text Only
发送方 → 网关(收到ACK_1) → 服务端存储(存储成功)→ 接收方网关 → 接收方(收到ACK_2)
         ↑                                              ↑
    超时重发                                       超时重发

  • 客户端本地消息队列,发送后等待 ACK
  • 服务端幂等处理(msg_id 去重)
  • 三次确认:发送ACK、存储ACK、接收ACK

已读/未读实现: - 每个会话记录用户最后已读消息 ID - 未读计数 = 会话最新消息 ID - 最后已读消息 ID(通过序号实现) - 群聊:每人维护自己的已读位置

群聊消息扩散: - 写扩散:消息写一份到群消息表 - 读扩散:每个成员读取时从群消息表拉取 - 未读计数用 Redis 维护:HINCRBY unread:{user_id} {conv_id} 1

4.4 扩展讨论

性能优化: - 消息合并发送(Batch),减少网络往返 - 连接层和业务层分离,独立扩展 - 消息存储:热数据(近7天)在快存储,冷数据归档 - Protocol Buffers 替代 JSON(节省带宽 30%+)

故障处理: - 网关故障:客户端自动重连到其他网关 - 消息队列缓冲:路由服务故障时消息不丢失 - 多机房部署:同城双活 + 异地灾备

监控告警: - 在线用户数、连接数 - 消息投递延迟 P99 - 消息投递成功率 - WebSocket 连接断开率


5. 设计秒杀系统

5.1 需求分析

功能性需求: - 定时开始秒杀活动 - 用户抢购限量商品 - 每人限购一件 - 秒杀成功后创建订单并支付

非功能性需求: - 峰值 QPS 可能达百万级 - 库存扣减准确(不超卖、不少卖) - 响应时间 < 200ms - 系统不因流量高峰崩溃

容量估算: - 商品库存:1000件 - 参与用户:100万 - 瞬时 QPS:50万(秒杀开始瞬间) - 实际有效请求:1000

5.2 概要设计

API 设计:

Text Only
POST /api/v1/seckill/{item_id}
  Header: Authorization: Bearer {token}
  Response:
    200 { "order_id": "xxx", "status": "success" }
    400 { "error": "sold_out" }
    429 { "error": "rate_limited" }

架构概览(多级过滤漏斗):

Text Only
用户 → CDN(静态页面) → 网关(限流) → 风控(去机器人) → 库存校验(Redis) → 下单(MQ) → 支付

每一层过滤掉大量无效请求,最终到达下游的流量很小。

5.3 详细设计

流量削峰三板斧:

1. 前端优化: - 秒杀按钮倒计时,防止提前请求 - 点击后按钮置灰,防止重复提交 - 秒杀页面静态化 + CDN 缓存 - 前端验证码/答题,人为延迟 0.5-2 秒

2. 网关层限流: - 令牌桶算法限制总 QPS - 同一用户 1 秒内只能请求一次(IP + UserID 限流) - 库存售罄后直接返回,不再透传请求

3. 消息队列异步下单:

Python
# Redis 原子扣减库存
def try_seckill(user_id, item_id):
    # 1. 检查是否已购买(防重复)
    if redis.sismember(f"seckill:bought:{item_id}", user_id):
        return "already_bought"

    # 2. Redis 原子扣减(Lua 脚本保证原子性)
    lua_script = """
    local stock = tonumber(redis.call('get', KEYS[1]))
    if stock and stock > 0 then
        redis.call('decr', KEYS[1])
        redis.call('sadd', KEYS[2], ARGV[1])
        return 1
    end
    return 0
    """
    result = redis.eval(lua_script, 2,
                        f"seckill:stock:{item_id}",
                        f"seckill:bought:{item_id}",
                        user_id)

    if result == 1:
        # 3. 发送到消息队列,异步创建订单
        mq.publish("seckill_order", {
            "user_id": user_id,
            "item_id": item_id,
            "timestamp": time.time()
        })
        return "success"
    return "sold_out"

分布式锁(Redisson): - 场景:活动开始前预热库存到 Redis - 使用 Redis SET NX EX 实现分布式锁 - Redis 集群模式下使用 RedLock 算法

订单超时处理: - 用户秒杀成功后有 15 分钟支付时间 - 使用延迟队列(RocketMQ 延迟消息)或 Redis 过期事件 - 超时未支付:回滚库存,取消订单

5.4 扩展讨论

性能优化: - 库存预热:活动前将库存加载到 Redis - 本地缓存:每个应用节点缓存库存状态,售罄后直接拦截 - 分段库存:将 1000 件分成 10 个段,减少 Redis 热 key 压力

故障处理: - Redis 故障:备用节点自动切换,或降级到数据库扣减 - MQ 故障:本地消息表 + 定时重试 - 超卖防护:数据库层加乐观锁 UPDATE stock SET count=count-1 WHERE count>0

监控告警: - 实时库存监控 - QPS 和响应时间 - 订单创建成功率 - 支付转化率


6. 设计分布式缓存

6.1 需求分析

功能性需求: - 支持 Get/Set/Delete 操作 - 支持过期时间(TTL) - 支持多种数据结构(String、Hash、List、Set) - 支持集群模式

非功能性需求: - 读写延迟 < 1ms - 高可用(99.99%) - 线性可扩展(动态增减节点) - 内存使用效率高

容量估算: - 缓存数据量:1TB - QPS:100万/s(读写比 10:1) - 单节点内存:64GB - 节点数:约 20 个

6.2 概要设计

API 设计:

Text Only
GET    /cache/{key}
PUT    /cache/{key}  { "value": "...", "ttl": 3600 }
DELETE /cache/{key}

架构概览:

Text Only
客户端 SDK(带路由) → 代理层(可选) → 缓存节点集群
                                      ├─ Node1 (slot 0-5460)
                                      ├─ Node2 (slot 5461-10922)
                                      └─ Node3 (slot 10923-16383)
                                           ↕ (主从复制)
                                         Replica

6.3 详细设计

一致性哈希(Consistent Hashing):

传统哈希取模 hash(key) % N 在节点增减时导致大量 key 重新分配。一致性哈希将节点和 key 映射到同一个环上:

Python
import hashlib
from bisect import bisect_right

class ConsistentHash:
    def __init__(self, nodes=None, virtual_nodes=150):
        self.ring = {}       # hash值 -> 节点
        self.sorted_keys = [] # 有序hash值列表
        self.virtual_nodes = virtual_nodes

        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        for i in range(self.virtual_nodes):
            vnode_key = f"{node}:vn{i}"
            h = self._hash(vnode_key)
            self.ring[h] = node
            self.sorted_keys.append(h)
        self.sorted_keys.sort()

    def remove_node(self, node: str):
        for i in range(self.virtual_nodes):
            vnode_key = f"{node}:vn{i}"
            h = self._hash(vnode_key)
            del self.ring[h]
            self.sorted_keys.remove(h)

    def get_node(self, key: str) -> str:
        h = self._hash(key)
        idx = bisect_right(self.sorted_keys, h) % len(self.sorted_keys)
        return self.ring[self.sorted_keys[idx]]

虚拟节点的作用:解决节点数少时数据分布不均匀的问题。每个真实节点映射为多个虚拟节点。

缓存三大问题:

问题 定义 解决方案
缓存穿透 查询不存在的数据,请求穿透到DB 布隆过滤器 + 空值缓存
缓存击穿 热点key过期,大量请求打到DB 互斥锁 + 永不过期 + 逻辑过期
缓存雪崩 大量key同时过期 过期时间加随机值 + 多级缓存

缓存穿透防护(布隆过滤器):

Python
class BloomFilter:
    def __init__(self, size=1000000, hash_count=7):
        self.size = size
        self.hash_count = hash_count
        self.bit_array = [0] * size

    def add(self, item):
        for seed in range(self.hash_count):
            idx = hash(f"{seed}:{item}") % self.size
            self.bit_array[idx] = 1

    def might_contain(self, item):
        for seed in range(self.hash_count):
            idx = hash(f"{seed}:{item}") % self.size
            if self.bit_array[idx] == 0:
                return False
        return True  # 可能存在(有误判率)

热 Key 处理: - 检测:统计访问频率,超过阈值告警 - 应对:本地缓存(L1 Cache)、Key 分片(key_1, key_2, ... 随机读取) - 读写分离:主节点写入,多个从节点读取

6.4 扩展讨论

性能优化: - Pipeline 批量操作,减少网络往返 - 客户端缓存(Client-side caching) - 内存碎片整理(jemalloc) - 大 Key 拆分

故障处理: - 主从切换:Sentinel 模式自动故障转移 - 数据丢失:RDB + AOF 持久化 - 脑裂问题:min-slaves-to-write 配置

监控告警: - 内存使用率(> 80% 告警) - 命中率(< 90% 需要排查) - 慢查询日志 - 主从复制延迟


7. 设计搜索引擎

7.1 需求分析

功能性需求: - 网页爬取和索引 - 关键词搜索,返回相关结果 - 搜索结果排序(相关性排名) - 搜索建议和自动补全 - 拼写纠错

非功能性需求: - 搜索延迟 < 500ms - 索引实时性 < 1小时 - 支持数十亿网页 - 高可用性

容量估算: - 索引网页数:100亿 - 平均页面大小(索引后):100KB - 索引数据量:1PB - 日搜索量:50亿次 - 峰值 QPS:100,000/s

7.2 概要设计

API 设计:

Text Only
GET /api/v1/search?q={query}&page=1&size=10
Response: {
  "results": [
    { "url": "...", "title": "...", "snippet": "...", "score": 0.95 },
    ...
  ],
  "total": 123456,
  "suggestions": ["related query 1", "..."]
}

GET /api/v1/suggest?q={prefix}
Response: { "suggestions": ["query1", "query2", ...] }

架构概览:

Text Only
用户查询 → 查询解析 → 索引服务集群 → 排序服务(PageRank+ML) → 结果聚合 → 返回用户
              爬虫集群 → 文档处理 → 倒排索引构建

7.3 详细设计

倒排索引(Inverted Index):

正排索引:文档 → [词条列表] 倒排索引:词条 → [文档列表(含位置)]

Text Only
"Python"  → [(doc1, [5,23,89]), (doc3, [12,45]), (doc7, [3])]
"编程"    → [(doc1, [6,24]),    (doc2, [1,15]),  (doc5, [33])]
Python
from collections import defaultdict  # defaultdict带默认值的字典,避免KeyError

class InvertedIndex:
    def __init__(self):
        self.index = defaultdict(list)  # term -> [(doc_id, [positions])]
        self.doc_count = 0

    def add_document(self, doc_id, text):
        self.doc_count += 1
        words = self.tokenize(text)
        term_positions = defaultdict(list)

        for pos, word in enumerate(words):  # enumerate同时获取索引和值
            term_positions[word].append(pos)

        for term, positions in term_positions.items():
            self.index[term].append((doc_id, positions))

    def search(self, query):
        terms = self.tokenize(query)
        if not terms:
            return []

        # 获取每个词的文档列表
        posting_lists = [self.index.get(t, []) for t in terms]

        # 求交集(AND 查询)
        if not posting_lists:
            return []

        doc_sets = [set(doc_id for doc_id, _ in pl) for pl in posting_lists]
        result_docs = doc_sets[0]
        for ds in doc_sets[1:]:  # 切片操作:[start:end:step]提取子序列
            result_docs &= ds

        return list(result_docs)

    def tokenize(self, text):
        # 简化的分词(实际需要jieba等分词库)
        return text.lower().split()

PageRank 算法核心思想: - 每个网页的重要性由指向它的网页决定 - PR(A) = (1-d)/N + d × Σ(PR(Ti)/C(Ti)) - d = 阻尼系数(通常 0.85),C(Ti) = 页面 Ti 的出链数 - 迭代计算直到收敛

TF-IDF 相关性评分:

Python
import math

def tf_idf(term, document, documents):
    # TF: 词频
    tf = document.count(term) / len(document)
    # IDF: 逆文档频率
    doc_count = sum(1 for doc in documents if term in doc)
    idf = math.log(len(documents) / (1 + doc_count))
    return tf * idf

BM25 评分(实际工业界常用): 在 TF-IDF 基础上加入文档长度归一化,对 TF 做了非线性缩放。

爬虫设计: - URL Frontier:优先队列 + 去重(布隆过滤器) - Robots.txt 遵守 - 爬取频率控制(Politeness) - 增量爬取 vs 全量爬取

7.4 扩展讨论

性能优化: - 索引分片:按文档 ID 或关键词哈希分片 - 缓存热门查询结果 - 跳跃列表(Skip List)加速倒排索引合并 - 压缩倒排列表(Variable Byte / PForDelta)

故障处理: - 索引副本:每个分片 2-3 个副本 - 爬虫故障:断点续爬 - 搜索降级:部分索引不可用时返回不完整结果

监控告警: - 搜索延迟分布 - 索引新鲜度(最近更新时间) - 爬虫吞吐量 - 零结果率(表示索引覆盖不足)


8. 设计推荐系统

8.1 需求分析

功能性需求: - 为用户推荐感兴趣的内容(商品/视频/文章) - 支持实时推荐和离线推荐 - 新用户冷启动推荐 - 推荐结果多样性

非功能性需求: - 推荐延迟 < 200ms - 推荐 CTR(点击率)最大化 - 推荐结果可解释 - 支持 AB 测试

容量估算: - DAU:1 亿 - 每用户日均推荐请求:50 次 - 推荐 QPS:约 60,000/s - 候选物品池:1 亿 - 用户特征维度:1000+

8.2 概要设计

API 设计:

Text Only
GET /api/v1/recommend?user_id=xxx&scene=homepage&count=20
Response: {
  "items": [
    { "item_id": "...", "score": 0.95, "reason": "基于你的浏览历史" },
    ...
  ],
  "experiment": "exp_abc_v2"
}

架构概览(四阶段漏斗):

Text Only
全量物品库(1亿) → 召回(数千) → 粗排(数百) → 精排(数十) → 重排(最终结果)
                    ↑                            ↑
               离线特征计算                    实时特征服务
               (Spark/Flink)                  (Redis/Feature Store)

8.3 详细设计

召回层(Recall):

多路召回并行执行:

召回策略 方法 特点
协同过滤 UserCF / ItemCF 经典可靠
向量召回 ANN (FAISS/Milvus) 语义匹配
热门召回 按热度/时效排序 补充冷启动
标签召回 基于用户标签匹配物品标签 可解释性好
关注链召回 关注的人喜欢的物品 社交关系

粗排层(Pre-Ranking): - 轻量级模型(双塔模型/逻辑回归) - 从数千个候选中筛选出数百个 - 延迟要求 < 20ms

精排层(Ranking): - 重量级模型(DeepFM/DIN/DCN) - 特征工程:用户特征 × 物品特征 × 上下文特征

Python
# 特征工程示例
def build_features(user_id, item_id, context):
    features = {}

    # 用户特征
    user = get_user_profile(user_id)
    features['user_age'] = user.age
    features['user_gender'] = user.gender
    features['user_active_days'] = user.active_days

    # 物品特征
    item = get_item_info(item_id)
    features['item_category'] = item.category
    features['item_ctr_7d'] = item.ctr_last_7days
    features['item_age_hours'] = (now() - item.create_time).hours

    # 交叉特征
    features['user_item_category_pref'] = get_preference(user_id, item.category)

    # 上下文特征
    features['hour_of_day'] = context.hour
    features['is_weekend'] = context.is_weekend
    features['device_type'] = context.device

    return features

重排层(Re-Ranking): - 多样性打散(MMR 算法) - 去重(同作者/同类型间隔排列) - 业务规则插入(广告、运营位) - 新鲜度提权

AB 测试框架: - 流量分桶:用户 ID 哈希到不同实验组 - 分层实验:不同模块可以独立做实验 - 指标体系:CTR、CVR、用户停留时长、多样性指标

8.4 扩展讨论

性能优化: - 特征预计算 + 缓存 - 模型推理优化(ONNX/TensorRT 加速) - 预测结果缓存(相似用户共享推荐列表)

冷启动: - 新用户:基于人口统计学 + 热门推荐 - 新物品:利用物品属性(Content-Based)+ Explore-Exploit

监控告警: - 推荐 CTR/CVR - 推荐延迟分布 - 特征覆盖率 - 模型 AUC 变化


9. 设计限流器(Rate Limiter)

9.1 需求分析

功能性需求: - 限制某个用户/IP/API 在时间窗口内的请求次数 - 超过限制返回 429 状态码 - 支持多种限流粒度(全局/用户/IP/API级别) - 分布式环境下一致性限流

非功能性需求: - 限流检查延迟 < 1ms - 高可用,限流器故障不影响正常业务 - 准确性(不能差太多)

9.2 概要设计

架构概览:

Text Only
请求 → 限流中间件(Gateway/Nginx) → 限流规则引擎 → Redis 计数器 → 放行/拒绝
                                   规则配置中心

9.3 详细设计

四种限流算法:

1. 固定窗口计数器:

Python
def fixed_window(user_id, limit, window_seconds):
    key = f"rate:{user_id}:{int(time.time()) // window_seconds}"
    count = redis.incr(key)
    if count == 1:
        redis.expire(key, window_seconds)
    return count <= limit
缺点:窗口边界突刺问题(两个窗口交界处可能放过 2×limit 请求)

2. 滑动窗口日志:

Python
def sliding_window_log(user_id, limit, window_seconds):
    key = f"rate:{user_id}"
    now = time.time()
    pipe = redis.pipeline()
    # 移除窗口外的请求
    pipe.zremrangebyscore(key, 0, now - window_seconds)
    # 添加当前请求
    pipe.zadd(key, {str(now): now})
    # 计数
    pipe.zcard(key)
    pipe.expire(key, window_seconds)
    results = pipe.execute()
    return results[2] <= limit

3. 令牌桶算法(Token Bucket):

Python
def token_bucket(user_id, capacity, refill_rate):
    """
    capacity: 桶容量
    refill_rate: 每秒补充的令牌数
    """
    lua_script = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])

    local data = redis.call('hmget', key, 'tokens', 'last_refill')
    local tokens = tonumber(data[1]) or capacity
    local last_refill = tonumber(data[2]) or now

    -- 补充令牌
    local elapsed = now - last_refill
    tokens = math.min(capacity, tokens + elapsed * refill_rate)

    local allowed = 0
    if tokens >= 1 then
        tokens = tokens - 1
        allowed = 1
    end

    redis.call('hmset', key, 'tokens', tokens, 'last_refill', now)
    redis.call('expire', key, math.ceil(capacity / refill_rate) * 2)

    return allowed
    """
    return redis.eval(lua_script, 1, f"bucket:{user_id}",
                      capacity, refill_rate, time.time())

4. 漏桶算法(Leaky Bucket): - 请求以任意速率流入桶中 - 桶以固定速率流出(处理请求) - 桶满时拒绝新请求 - 适合流量整形(平滑输出)

算法 突发流量 实现复杂度 精确度
固定窗口 允许(边界问题) 简单
滑动窗口 不允许 中等 高(内存大)
令牌桶 允许(消耗存量) 中等
漏桶 不允许(排队) 简单

分布式限流: - Redis 集中式:所有节点共享 Redis 计数 - 本地 + Redis 混合:本地限流预过滤,Redis 全局精确限流 - 令牌桶预分配:将全局令牌预分配到各节点本地

9.4 扩展讨论

性能优化: - Lua 脚本保证原子性,减少网络往返 - 本地缓存限流规则 - 批量请求令牌

故障处理: - Redis 故障:降级为本地限流 - 限流器 bug:配置全局开关,紧急关闭限流

监控告警: - 被限流的请求比例 - 限流延迟 - 各接口的 QPS 趋势


10. 设计视频流系统(YouTube/Netflix)

10.1 需求分析

功能性需求: - 视频上传、转码、存储 - 视频播放(自适应码率) - 弹幕系统 - 视频推荐、搜索

非功能性需求: - 播放起始延迟 < 2秒 - 流畅播放(缓冲率 < 1%) - 支持多分辨率(360p/720p/1080p/4K) - 全球用户就近访问

容量估算: - DAU:5000 万 - 日均视频上传:50 万条 - 平均视频时长:5 分钟 - 平均原始文件大小:500MB - 日存储新增:250TB(多分辨率转码后倍增)

10.2 概要设计

API 设计:

Text Only
// 上传
POST /api/v1/videos/upload (分片上传)
GET  /api/v1/videos/{id}/upload-status

// 播放
GET /api/v1/videos/{id}/manifest.m3u8  (HLS 播放列表)
GET /api/v1/videos/{id}/segments/{quality}/{segment_number}.ts

// 弹幕
GET  /api/v1/videos/{id}/danmaku?time_start=0&time_end=300
POST /api/v1/videos/{id}/danmaku  { "text": "...", "time": 120.5 }

架构概览:

Text Only
上传流:  客户端 → 上传服务 → 对象存储(S3) → 转码集群 → CDN 分发
播放流:  客户端 → CDN → 源站(对象存储)
弹幕流:  客户端 ←WebSocket→ 弹幕服务 → Redis/Kafka

10.3 详细设计

视频转码流水线:

Text Only
原始视频 → 视频分片(按GOP) → 并行转码(多分辨率) → 打包(HLS/DASH) → 上传到CDN源站
                              生成缩略图/封面
                              提取字幕(ASR)
                              内容审核(AI)
  • 分辨率:360p / 480p / 720p / 1080p / 4K
  • 编码格式:H.264(兼容性好)、H.265/HEVC(压缩率高 30-50%)、AV1(免费 + 高压缩)
  • 容器格式:HLS(苹果生态)、DASH(通用标准)

自适应码率流(ABR - Adaptive Bitrate Streaming):

Text Only
// HLS M3U8 Master Playlist
#EXTM3U
#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360
360p/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=2400000,RESOLUTION=1280x720
720p/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=5000000,RESOLUTION=1920x1080
1080p/playlist.m3u8
  • 播放器根据网络带宽动态切换清晰度
  • 每个清晰度的视频被分割为 2-10 秒的小片段
  • 算法:Buffer-Based(基于缓冲区水位)、Throughput-Based(基于带宽测量)

CDN 分发策略: - 多级 CDN:边缘节点 → 区域节点 → 源站 - 热门视频预推送到边缘节点 - 冷门视频按需回源 - 就近接入:DNS 解析到最近的 CDN 节点

弹幕系统设计:

Python
# 弹幕存储:按视频ID + 时间分段
# Redis: video:{id}:danmaku:{time_segment} -> Sorted Set
# Score = 弹幕出现时间(秒), Member = 弹幕JSON

def get_danmaku(video_id, time_start, time_end):
    """获取时间段内的弹幕"""
    segment_start = time_start // 60  # 按分钟分段
    segment_end = time_end // 60
    result = []

    for seg in range(segment_start, segment_end + 1):
        key = f"video:{video_id}:danmaku:{seg}"
        items = redis.zrangebyscore(key, time_start, time_end)
        result.extend(items)

    return result[:MAX_DANMAKU_PER_SCREEN]  # 屏幕显示上限

def send_danmaku(video_id, text, time_point, user_id):
    """发送弹幕"""
    segment = int(time_point) // 60
    key = f"video:{video_id}:danmaku:{segment}"
    danmaku = json.dumps({"text": text, "user": user_id, "time": time_point})  # json.dumps将Python对象转为JSON字符串
    redis.zadd(key, {danmaku: time_point})

    # 实时推送给正在观看的用户
    channel = f"danmaku:live:{video_id}"
    redis.publish(channel, danmaku)

10.4 扩展讨论

性能优化: - 预转码热门视频为更多分辨率 - 视频压缩优化(per-title encoding:根据视频内容选择最佳编码参数) - P2P 分发(减少 CDN 成本)

故障处理: - 转码失败重试(幂等操作) - CDN 节点故障:自动切换到其他节点 - 弹幕服务降级:暂停实时推送,只返回缓存弹幕

监控告警: - 缓冲率(卡顿率) - 起播延迟 - CDN 命中率和回源率 - 转码成功率和耗时


11. 设计电商系统

11.1 需求分析

功能性需求: - 商品浏览和搜索 - 购物车管理 - 订单创建和管理 - 支付和退款 - 库存管理

非功能性需求: - 高可用(99.99%) - 一致性(库存和订单) - 低延迟(搜索 < 200ms,下单 < 500ms) - 支持大促峰值流量(10x 日常)

容量估算: - 商品数:1 亿 - DAU:5000 万 - 日均订单:500 万 - 峰值下单 QPS:10,000/s(大促可达 10 万)

11.2 概要设计

核心微服务拆分:

Text Only
API Gateway → 用户服务 / 商品服务 / 搜索服务 / 购物车服务 / 订单服务 / 支付服务 / 库存服务
                                                                        物流服务 / 通知服务

数据模型(核心表):

SQL
Product:
  product_id   BIGINT PRIMARY KEY
  name         VARCHAR
  description  TEXT
  price        DECIMAL(10,2)
  category_id  INT
  status       ENUM('active','inactive')

SKU:
  sku_id       BIGINT PRIMARY KEY
  product_id   BIGINT FOREIGN KEY
  attributes   JSON  (颜色/尺码等)
  price        DECIMAL(10,2)
  stock        INT

Order:
  order_id     BIGINT PRIMARY KEY
  user_id      BIGINT INDEX
  status       ENUM('created','paid','shipped','completed','cancelled')
  total_amount DECIMAL(10,2)
  created_at   TIMESTAMP

OrderItem:
  item_id      BIGINT PRIMARY KEY
  order_id     BIGINT FOREIGN KEY
  sku_id       BIGINT
  quantity     INT
  unit_price   DECIMAL(10,2)

11.3 详细设计

下单核心流程(分布式事务):

Text Only
1. 用户提交订单
2. 检查库存 (库存服务)
3. 锁定库存 (库存服务 - 预扣减)
4. 创建订单 (订单服务)
5. 调起支付 (支付服务)
6. 支付成功 → 确认库存扣减 + 更新订单状态
7. 支付超时 → 释放库存 + 取消订单

分布式事务方案对比:

方案 一致性 性能 复杂度 适用场景
2PC (两阶段提交) 强一致 低(阻塞) 银行转账
TCC (Try-Confirm-Cancel) 最终一致 电商下单
Saga 最终一致 长事务
本地消息表 最终一致 通用

TCC 实现示例:

Python
class OrderTCC:
    def try_phase(self, order):
        """预留资源"""
        # 1. 库存服务:冻结库存(stock-=n, frozen+=n)
        inventory_service.freeze(order.items)
        # 2. 优惠券服务:冻结优惠券
        coupon_service.freeze(order.coupon_id)
        # 3. 订单服务:创建待确认订单
        order_service.create_pending(order)

    def confirm_phase(self, order):
        """确认提交"""
        # 1. 库存服务:确认扣减(frozen-=n)
        inventory_service.confirm(order.items)
        # 2. 优惠券服务:确认使用
        coupon_service.confirm(order.coupon_id)
        # 3. 订单服务:更新为已确认
        order_service.confirm(order.order_id)

    def cancel_phase(self, order):
        """取消回滚"""
        # 1. 库存服务:释放冻结(frozen-=n, stock+=n)
        inventory_service.cancel(order.items)
        # 2. 优惠券服务:释放优惠券
        coupon_service.cancel(order.coupon_id)
        # 3. 订单服务:标记为已取消
        order_service.cancel(order.order_id)

库存扣减方案:

Python
# Redis + 数据库双写
def deduct_stock(sku_id, quantity):
    # 1. Redis 原子扣减(快速判断)
    lua_script = """
    local stock = tonumber(redis.call('get', KEYS[1]))
    if stock >= tonumber(ARGV[1]) then
        redis.call('decrby', KEYS[1], ARGV[1])
        return 1
    end
    return 0
    """
    if not redis.eval(lua_script, 1, f"stock:{sku_id}", quantity):
        return False

    # 2. 异步数据库持久化(消息队列)
    mq.publish("stock_deduction", {
        "sku_id": sku_id, "quantity": quantity
    })
    return True

支付系统设计要点: - 幂等性:支付请求带唯一订单号,防止重复支付 - 对账:定时与支付渠道对账,发现差异告警 - 退款:异步处理,状态机管理退款流程

11.4 扩展讨论

性能优化: - 读写分离:商品信息主从分离 - 分库分表:订单按 user_id 分片 - 多级缓存:CDN → Redis → 本地缓存 - 搜索优化:Elasticsearch 独立搜索服务

故障处理: - 幂等设计:所有写操作支持重试 - 补偿机制:定时扫描异常订单 - 降级策略:大促时非核心功能降级

监控告警: - GMV(交易总额)实时大盘 - 支付成功率 - 库存告警(低库存/超卖) - 各服务 SLA


12. 设计分布式文件存储

12.1 需求分析

功能性需求: - 文件上传(支持大文件分片上传) - 文件下载 - 文件删除 - 目录管理(创建、列表、删除) - 文件元数据查询

非功能性需求: - 高可靠性(11 个 9,即 99.999999999%) - 高可用性(99.99%) - 高吞吐量 - 支持 PB 级存储 - 数据一致性

容量估算: - 存储容量:10PB - 文件数量:10 亿 - 平均文件大小:10MB - 日写入量:100TB - 读写 QPS:读 100,000/s,写 10,000/s

12.2 概要设计

架构概览(类 GFS/HDFS):

Text Only
客户端 → Master节点(NameNode) ←→ 元数据存储(内存+磁盘持久化)
  ↓          ↓
  └──→ Chunk服务器集群(DataNode)
        ├─ ChunkServer1: [chunk_A(primary), chunk_D(replica)]
        ├─ ChunkServer2: [chunk_A(replica), chunk_B(primary)]
        └─ ChunkServer3: [chunk_B(replica), chunk_C(primary)]

数据模型:

SQL
FileMetadata:
  file_id       UUID PRIMARY KEY
  file_name     VARCHAR
  file_size     BIGINT
  chunk_ids     LIST<UUID>        # 有序的chunk列表
  created_at    TIMESTAMP
  parent_dir    UUID

ChunkMetadata:
  chunk_id      UUID PRIMARY KEY
  file_id       UUID
  chunk_index   INT               # 在文件中的序号
  chunk_size    INT
  replicas      LIST<server_id>   # 副本所在的服务器
  checksum      VARCHAR           # 数据校验

12.3 详细设计

文件分片(Chunking): - 固定大小分片:64MB(GFS)/ 128MB(HDFS) - 大分片优势:减少元数据量,减少 Master 压力 - 大分片劣势:小文件浪费空间(可用变长分片解决)

写入流程:

Text Only
1. 客户端请求Master: "我要写文件 A"
2. Master 分配 chunk_id,选择3个ChunkServer作为副本
3. Master 返回 chunk 位置列表给客户端
4. 客户端将数据发送到 Primary ChunkServer
5. Primary 转发数据到其他副本(链式复制: Primary → Secondary1 → Secondary2)
6. 所有副本写入成功后,Primary 返回 ACK 给客户端
7. 客户端通知 Master 写入完成

副本策略: - 默认 3 副本(1 主 2 从) - 副本放置策略:不同机架/可用区,防止整机架故障 - 纠删码(Erasure Coding):存储开销更低(1.5x vs 3x),但恢复更慢

Python
# 副本放置算法
def choose_chunk_servers(num_replicas=3, existing_servers=None):
    """选择存放副本的服务器"""
    selected = []
    racks_used = set()

    candidates = sorted(chunk_servers,
                       key=lambda s: s.available_space,  # lambda匿名函数:简洁的单行函数
                       reverse=True)

    for server in candidates:
        if len(selected) >= num_replicas:
            break
        # 确保不同机架
        if server.rack_id not in racks_used or len(racks_used) >= num_replicas:
            selected.append(server)
            racks_used.add(server.rack_id)

    return selected

一致性保证: - 写一致性: 所有副本都写入成功才返回成功 - 读一致性: 默认从任意副本读取(最终一致性) - 强一致性模式: 读主副本,或版本号比较

Master 高可用: - Master 单点问题:Active-Standby 模式 - 元数据持久化:Operation Log + Checkpoint - Master 状态恢复:加载 Checkpoint + 重放 Operation Log - ChunkServer 心跳:定期上报持有的 chunk 列表

数据完整性: - 每个 chunk 存储校验和(Checksum) - 读取时验证校验和,发现损坏从其他副本恢复 - 后台定期扫描验证数据完整性

垃圾回收: - 文件删除不立即释放空间,先标记为"待删除" - 后台 GC 进程延迟删除(通常 3 天后),防误删 - 孤儿 chunk 检测:Master 中无引用的 chunk

12.4 扩展讨论

性能优化: - 数据本地化:计算任务分发到数据所在节点 - 写入优化:客户端缓冲后批量写入 - 读取优化:就近读取 + 预读取 - 小文件合并:多个小文件打包成一个大文件

故障处理: - ChunkServer 故障:Master 检测到后在其他节点创建新副本 - Master 故障:自动切换到 Standby - 数据中心故障:跨数据中心副本或备份

监控告警: - 存储使用率(整体和单节点) - 副本完整性(under-replicated chunks) - 读写延迟 - Master 元数据大小


总结:系统设计面试核心考点

必须掌握的基础概念

概念 要点
CAP 定理 一致性/可用性/分区容错性三选二
数据分片 哈希分片 vs 范围分片
复制 主从复制 / 多主复制 / 无主复制
一致性模型 强一致/最终一致/因果一致
负载均衡 轮询/加权/一致性哈希/最少连接

面试沟通技巧

  1. 不要急于画架构: 先花 5 分钟充分理解需求
  2. 主动做取舍: 说明为什么选 A 不选 B
  3. 数字说话: 容量估算定量分析
  4. 由粗到细: 先画大图,再深入核心模块
  5. 考虑故障: 主动聊 What if 场景
  6. 知道边界: 承认不确定的地方,展示学习能力

准备建议: 每道题至少练习画一遍架构 + 讲一遍完整的设计思路。面试时重点在于展示结构化思维和工程权衡能力,而非背诵标准答案。


2028年趋势:AI系统设计成为面试标配

传统系统设计(如上述12题)仍然是基础功,但2026年之后,大厂面试已大量增加AI系统设计题型。AI系统设计与传统系统设计的核心差异:

维度 传统系统设计 AI系统设计
核心组件 数据库/缓存/消息队列 模型训练/推理服务/特征系统
评估 功能正确性 离线指标 + 在线A/B测试
迭代 代码更新 数据 + 模型 + 代码三者联动
特有挑战 高并发/一致性 数据偏差/模型漂移/幻觉/GPU成本
容量估算 QPS/存储/带宽 + GPU显存/推理延迟/训练FLOPs

高频AI系统设计题型(详见 AI系统设计面试/): - 设计推荐系统(抖音/小红书) - 设计RAG企业知识库问答系统 - 设计LLM推理服务(日调用亿级) - 设计多Agent协作系统 - 设计多模态内容理解系统 - 设计大模型训练平台

建议同时学习本章(传统基础)+ AI系统设计面试(AI专项),两者互补。