02 - 推理优化技术¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
📌 定位说明:本章侧重推理优化的算法原理与技术研究。 - 📖 推理优化的工程部署实践请参考 LLM应用/12-推理优化
学习目标:理解并掌握大模型推理优化的核心技术,包括KV Cache、量化、连续批处理等。
1. 推理 vs 训练¶
1.1 核心区别¶
| 方面 | 训练 | 推理 |
|---|---|---|
| 目标 | 更新权重,最小化损失 | 生成输出,最小化延迟 |
| 计算 | 前向 + 反向传播 | 只有前向传播 |
| 内存 | 参数 + 梯度 + 优化器状态 + 激活 | 主要是参数 + KV Cache |
| 批处理 | 固定batch size | 动态批处理 |
| 精度 | FP32/FP16/BF16 | 可量化到INT8/INT4 |
1.2 推理的性能指标¶
Text Only
1. 吞吐量 (Throughput): tokens/second
- 系统整体处理能力
2. 延迟 (Latency): ms/token
- 单个请求响应时间
- 首token延迟 (Time To First Token, TTFT)
- 每token延迟 (Time Per Output Token, TPOT)
3. 显存占用
- 模型权重
- KV Cache
- 激活值
2. KV Cache:解码加速的核心¶
2.1 为什么需要KV Cache?¶
自回归生成的计算冗余:
Text Only
生成第1个token:
输入: [<sos>]
计算: Q, K, V for position 1
生成第2个token:
输入: [<sos>, token_1]
计算: Q, K, V for position 1, 2
生成第3个token:
输入: [<sos>, token_1, token_2]
计算: Q, K, V for position 1, 2, 3
问题:每次都要重新计算之前位置的K和V!
解决方案:缓存之前计算的K和V
Text Only
生成第1个token:
计算: K1, V1
缓存: [K1], [V1]
生成第2个token:
计算: K2, V2
缓存: [K1, K2], [V1, V2]
注意力: Q2与[K1, K2]计算
生成第3个token:
计算: K3, V3
缓存: [K1, K2, K3], [V1, V2, V3]
注意力: Q3与[K1, K2, K3]计算
2.2 KV Cache的内存占用¶
Text Only
假设:
- batch_size = b
- seq_len = s
- num_layers = l
- num_heads = h
- head_dim = d
- 精度: FP16 (2字节)
每层KV Cache大小:
K: b × h × s × d × 2字节
V: b × h × s × d × 2字节
总计: 4 × b × h × s × d 字节
总KV Cache:
4 × b × h × s × d × l 字节
示例 (LLaMA-7B):
b=1, s=2048, l=32, h=32, d=128
KV Cache = 4 × 1 × 32 × 2048 × 128 × 32
= 1,073,741,824 字节
= 1 GB!
2.3 KV Cache的实现¶
Python
class KVCache:
"""KV Cache实现"""
def __init__(self, num_layers, batch_size, num_heads, max_seq_len, head_dim):
self.num_layers = num_layers
self.batch_size = batch_size
self.num_heads = num_heads
self.head_dim = head_dim
# 预分配内存
self.k_cache = torch.zeros(
num_layers, batch_size, num_heads, max_seq_len, head_dim
)
self.v_cache = torch.zeros(
num_layers, batch_size, num_heads, max_seq_len, head_dim
)
# 当前缓存长度
self.seq_len = 0
def update(self, layer_idx, new_k, new_v):
"""
更新KV Cache
Args:
layer_idx: 层索引
new_k: [batch, num_heads, new_seq_len, head_dim]
new_v: [batch, num_heads, new_seq_len, head_dim]
"""
new_seq_len = new_k.size(2)
# 写入新值
self.k_cache[layer_idx, :, :, self.seq_len:self.seq_len+new_seq_len] = new_k
self.v_cache[layer_idx, :, :, self.seq_len:self.seq_len+new_seq_len] = new_v
# 更新长度
if layer_idx == 0: # 只在第一层更新
self.seq_len += new_seq_len
def get(self, layer_idx):
"""获取当前层的KV Cache"""
return (
self.k_cache[layer_idx, :, :, :self.seq_len],
self.v_cache[layer_idx, :, :, :self.seq_len]
)
2.4 Multi-Query Attention (MQA) 和 Grouped-Query Attention (GQA)¶
问题:标准多头注意力中,每个头有自己的K和V,KV Cache占用大
MQA解决方案:所有头共享同一组K和V
Text Only
标准MHA:
头1: Q1, K1, V1
头2: Q2, K2, V2
...
KV Cache: b × h × s × d × 2 × l × 2字节
MQA:
头1: Q1, K_shared, V_shared
头2: Q2, K_shared, V_shared
...
KV Cache: b × 1 × s × d × 2 × l × 2字节
节省: h倍内存!
GQA折中方案:头分成g组,每组共享K和V
Text Only
GQA (g=4组,h=32头):
组1 (头1-8): 共享K1, V1
组2 (头9-16): 共享K2, V2
组3 (头17-24): 共享K3, V3
组4 (头25-32): 共享K4, V4
KV Cache: b × 4 × s × d × 2 × l × 2字节
节省: 8倍内存(相比MHA)
3. 量化 (Quantization)¶
3.1 为什么需要量化?¶
Text Only
FP32模型: 7B × 4字节 = 28 GB
FP16模型: 7B × 2字节 = 14 GB
INT8模型: 7B × 1字节 = 7 GB
INT4模型: 7B × 0.5字节 = 3.5 GB
量化可以:
- 减少显存占用
- 提高推理速度(支持INT8/INT4的硬件)
- 降低功耗
3.2 量化类型¶
训练后量化 (PTQ)¶
Text Only
流程:
1. 训练好的FP32模型
2. 校准(可选):用少量数据确定缩放因子
3. 量化权重到INT8/INT4
4. 推理时反量化回FP16计算
优点:
- 简单快速
- 不需要重新训练
缺点:
- 可能损失精度
量化感知训练 (QAT)¶
Text Only
流程:
1. 在训练时模拟量化
2. 前向传播:权重先量化再反量化
3. 反向传播:梯度传到原始权重
4. 模型学会适应量化误差
优点:
- 精度损失小
缺点:
- 需要重新训练
- 训练时间长
3.3 量化方法¶
对称量化¶
Text Only
公式:
scale = max(|W|) / 127 (INT8)
W_quant = round(W / scale)
W_dequant = W_quant × scale
示例:
W = [-10, -5, 0, 5, 10]
max_abs = 10
scale = 10 / 127 ≈ 0.0787
W_quant = [-127, -64, 0, 64, 127]
非对称量化¶
Text Only
公式:
scale = (max(W) - min(W)) / 255
zero_point = round(-min(W) / scale)
W_quant = round(W / scale) + zero_point
W_dequant = (W_quant - zero_point) × scale
适用:权重分布不对称时
GPTQ (Group-wise Post-Training Quantization)¶
Text Only
核心思想:
- 逐层量化
- 使用OBS (Optimal Brain Surgeon) 方法最小化误差
- 分组量化(如每128列一组),每组有自己的缩放因子
优点:
- 4-bit量化精度接近FP16
- 适合大模型
AWQ (Activation-aware Weight Quantization)¶
3.4 量化实现示例¶
Python
import torch
import torch.nn as nn
def quantize_tensor(x, num_bits=8):
"""
简单的对称量化
Args:
x: 输入张量
num_bits: 量化位数
Returns:
x_quant: 量化后的张量
scale: 缩放因子
"""
qmax = 2 ** (num_bits - 1) - 1
qmin = -(2 ** (num_bits - 1))
# 计算缩放因子
max_val = x.abs().max()
scale = max_val / qmax
# 量化
x_quant = torch.clamp(torch.round(x / scale), qmin, qmax)
return x_quant, scale
def dequantize_tensor(x_quant, scale):
"""反量化"""
return x_quant * scale
class QuantizedLinear(nn.Module):
"""量化线性层"""
def __init__(self, in_features, out_features, num_bits=8):
super().__init__() # super()调用父类方法
self.in_features = in_features
self.out_features = out_features
self.num_bits = num_bits
# 存储量化后的权重
self.register_buffer('weight_quant', torch.randint(-128, 127, (out_features, in_features)))
self.register_buffer('weight_scale', torch.tensor(1.0))
self.bias = nn.Parameter(torch.zeros(out_features))
def forward(self, x):
# 反量化权重
weight = self.weight_quant.float() * self.weight_scale
# 正常计算
return F.linear(x, weight, self.bias)
@torch.no_grad() # 禁用梯度计算,节省内存(推理时使用)
def quantize(self, weight_fp32):
"""量化权重"""
weight_quant, scale = quantize_tensor(weight_fp32, self.num_bits)
self.weight_quant.copy_(weight_quant)
self.weight_scale.copy_(scale)
4. 连续批处理 (Continuous Batching)¶
4.1 传统批处理的问题¶
Text Only
批处理大小=3:
请求1: [生成中............] 完成时间: 100ms
请求2: [生成中..] 完成时间: 30ms
请求3: [生成中........] 完成时间: 60ms
问题:
- 请求2完成后,GPU空闲等待其他请求
- GPU利用率低
4.2 连续批处理(动态批处理)¶
Text Only
核心思想:
- 当一个请求完成,立即加入新请求
- 保持GPU始终满载
时间线:
t=0: [req1, req2, req3]
t=30: [req1, req3, req4] (req2完成,加入req4)
t=60: [req1, req4, req5] (req3完成,加入req5)
t=100:[req4, req5, req6] (req1完成,加入req6)
优势:
- GPU利用率接近100%
- 吞吐量大幅提升
4.3 实现要点¶
Python
class ContinuousBatchingScheduler:
"""连续批处理调度器"""
def __init__(self, max_batch_size, max_seq_len):
self.max_batch_size = max_batch_size
self.max_seq_len = max_seq_len
self.active_requests = []
self.waiting_queue = []
def add_request(self, request):
"""添加新请求到等待队列"""
self.waiting_queue.append(request)
def schedule(self):
"""
调度请求
返回当前批次的请求列表
"""
# 1. 移除已完成的请求
self.active_requests = [r for r in self.active_requests if not r.is_done()]
# 2. 从等待队列补充新请求
while (len(self.active_requests) < self.max_batch_size and
self.waiting_queue):
request = self.waiting_queue.pop(0)
self.active_requests.append(request)
return self.active_requests
def step(self):
"""执行一步生成"""
batch = self.schedule()
# 构建batch输入
input_ids = [r.get_next_input() for r in batch]
# 批量推理
outputs = model.generate_batch(input_ids)
# 分发结果
for request, output in zip(batch, outputs): # zip按位置配对多个可迭代对象
request.append_token(output)
5. 投机采样 (Speculative Decoding)¶
5.1 核心思想¶
Text Only
问题:
- 大模型生成每个token都很慢
- 但小模型生成很快(虽然质量差)
解决方案:
1. 用小模型(draft model)快速生成k个候选token
2. 用大模型(target model)一次性验证这k个token
3. 接受匹配的token,拒绝后重新采样
效果:
- 如果小模型质量尚可,大部分token会被接受
- 推理速度提升2-3倍
5.2 算法流程¶
Python
def speculative_decoding(draft_model, target_model, input_ids, max_new_tokens):
"""
投机采样
Args:
draft_model: 小模型(如7B)
target_model: 大模型(如70B)
input_ids: 输入token IDs
max_new_tokens: 最大生成token数
"""
accepted_tokens = []
while len(accepted_tokens) < max_new_tokens:
# 1. 小模型生成k个候选token
k = 5
draft_tokens = draft_model.generate(input_ids, max_new_tokens=k)
# 2. 大模型验证(一次前向传播)
target_logits = target_model(input_ids + draft_tokens)
# 3. 逐个验证
for i, draft_token in enumerate(draft_tokens): # enumerate同时获取索引和元素
# 计算接受概率
q = draft_model.get_prob(input_ids + accepted_tokens, draft_token)
p = target_model.get_prob(input_ids + accepted_tokens, draft_token)
# 接受条件
if torch.rand(1).item() < min(1, p / q):
accepted_tokens.append(draft_token)
else:
# 拒绝,从调整后的分布采样
new_token = sample_from_distribution(p - q)
accepted_tokens.append(new_token)
break
# 更新输入
input_ids = input_ids + accepted_tokens[-k:]
return accepted_tokens
6. 其他优化技术¶
6.1 Flash Attention¶
Text Only
核心思想:
- 标准Attention需要存储O(n²)的注意力矩阵
- Flash Attention分块计算,不存储完整矩阵
- 在SRAM(高速缓存)中完成计算
效果:
- 内存从O(n²)降到O(n)
- 速度提升2-4倍
- 支持更长序列
6.2 Page Attention (vLLM)¶
Text Only
核心思想:
- 将KV Cache分页管理(类似操作系统虚拟内存)
- 不同序列可以共享页面(用于beam search)
- 动态分配和释放页面
效果:
- 内存利用率接近100%
- 支持更大batch size
- 支持beam search而不增加内存
6.3 模型并行¶
Text Only
数据并行 (DP):
- 每个GPU有完整模型
- 不同GPU处理不同batch
- 适合模型能放入单卡的情况
张量并行 (TP):
- 每层切分到多个GPU
- 每GPU只存储部分权重
- 通信量大,适合单机多卡
流水线并行 (PP):
- 不同层在不同GPU
- 像流水线一样处理
- 适合模型太大无法放入单卡
7. 实践指南¶
7.1 选择合适的优化组合¶
Text Only
场景1: 单卡推理,追求最低延迟
-> FP16 + KV Cache + Flash Attention
场景2: 单卡推理,追求最大吞吐
-> INT8/INT4量化 + Continuous Batching
场景3: 多卡推理,大模型
-> 张量并行 + Pipeline并行 + KV Cache
场景4: 服务化部署
-> vLLM (Page Attention + Continuous Batching)
场景5: 极致速度
-> 投机采样 + 量化 + 所有其他优化
7.2 性能测试¶
Python
def benchmark(model, tokenizer, prompts, max_tokens=100):
"""
基准测试
"""
import time
total_tokens = 0
total_time = 0
for prompt in prompts:
inputs = tokenizer(prompt, return_tensors="pt")
start = time.time()
outputs = model.generate(**inputs, max_new_tokens=max_tokens)
end = time.time()
num_tokens = outputs.shape[1] - inputs.input_ids.shape[1]
total_tokens += num_tokens
total_time += end - start
throughput = total_tokens / total_time
latency = total_time / len(prompts)
print(f"吞吐量: {throughput:.2f} tokens/s")
print(f"平均延迟: {latency:.2f} s")
print(f"每token延迟: {total_time/total_tokens*1000:.2f} ms")
8. 下一步¶
完成本节后,你应该: - [ ] 理解KV Cache的原理和实现 - [ ] 理解量化的原理和方法 - [ ] 了解连续批处理和投机采样 - [ ] 了解Flash Attention和Page Attention - [ ] 能够选择合适的优化策略
最后更新日期:2026-02-12 适用版本:LLM学习教程 v2026
下一步:03-系统与工程 - 学习大模型的系统工程实践