跳转至

02 - 推理优化技术

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

📌 定位说明:本章侧重推理优化的算法原理与技术研究。 - 📖 推理优化的工程部署实践请参考 LLM应用/12-推理优化

学习目标:理解并掌握大模型推理优化的核心技术,包括KV Cache、量化、连续批处理等。


1. 推理 vs 训练

1.1 核心区别

方面 训练 推理
目标 更新权重,最小化损失 生成输出,最小化延迟
计算 前向 + 反向传播 只有前向传播
内存 参数 + 梯度 + 优化器状态 + 激活 主要是参数 + KV Cache
批处理 固定batch size 动态批处理
精度 FP32/FP16/BF16 可量化到INT8/INT4

1.2 推理的性能指标

Text Only
1. 吞吐量 (Throughput): tokens/second
   - 系统整体处理能力

2. 延迟 (Latency): ms/token
   - 单个请求响应时间
   - 首token延迟 (Time To First Token, TTFT)
   - 每token延迟 (Time Per Output Token, TPOT)

3. 显存占用
   - 模型权重
   - KV Cache
   - 激活值

2. KV Cache:解码加速的核心

2.1 为什么需要KV Cache?

自回归生成的计算冗余

Text Only
生成第1个token:
  输入: [<sos>]
  计算: Q, K, V for position 1

生成第2个token:
  输入: [<sos>, token_1]
  计算: Q, K, V for position 1, 2

生成第3个token:
  输入: [<sos>, token_1, token_2]
  计算: Q, K, V for position 1, 2, 3

问题:每次都要重新计算之前位置的K和V!

解决方案:缓存之前计算的K和V

Text Only
生成第1个token:
  计算: K1, V1
  缓存: [K1], [V1]

生成第2个token:
  计算: K2, V2
  缓存: [K1, K2], [V1, V2]
  注意力: Q2与[K1, K2]计算

生成第3个token:
  计算: K3, V3
  缓存: [K1, K2, K3], [V1, V2, V3]
  注意力: Q3与[K1, K2, K3]计算

2.2 KV Cache的内存占用

Text Only
假设:
- batch_size = b
- seq_len = s
- num_layers = l
- num_heads = h
- head_dim = d
- 精度: FP16 (2字节)

每层KV Cache大小:
  K: b × h × s × d × 2字节
  V: b × h × s × d × 2字节
  总计: 4 × b × h × s × d 字节

总KV Cache:
  4 × b × h × s × d × l 字节

示例 (LLaMA-7B):
  b=1, s=2048, l=32, h=32, d=128
  KV Cache = 4 × 1 × 32 × 2048 × 128 × 32
           = 1,073,741,824 字节
           = 1 GB!

2.3 KV Cache的实现

Python
class KVCache:
    """KV Cache实现"""

    def __init__(self, num_layers, batch_size, num_heads, max_seq_len, head_dim):
        self.num_layers = num_layers
        self.batch_size = batch_size
        self.num_heads = num_heads
        self.head_dim = head_dim

        # 预分配内存
        self.k_cache = torch.zeros(
            num_layers, batch_size, num_heads, max_seq_len, head_dim
        )
        self.v_cache = torch.zeros(
            num_layers, batch_size, num_heads, max_seq_len, head_dim
        )

        # 当前缓存长度
        self.seq_len = 0

    def update(self, layer_idx, new_k, new_v):
        """
        更新KV Cache

        Args:
            layer_idx: 层索引
            new_k: [batch, num_heads, new_seq_len, head_dim]
            new_v: [batch, num_heads, new_seq_len, head_dim]
        """
        new_seq_len = new_k.size(2)

        # 写入新值
        self.k_cache[layer_idx, :, :, self.seq_len:self.seq_len+new_seq_len] = new_k
        self.v_cache[layer_idx, :, :, self.seq_len:self.seq_len+new_seq_len] = new_v

        # 更新长度
        if layer_idx == 0:  # 只在第一层更新
            self.seq_len += new_seq_len

    def get(self, layer_idx):
        """获取当前层的KV Cache"""
        return (
            self.k_cache[layer_idx, :, :, :self.seq_len],
            self.v_cache[layer_idx, :, :, :self.seq_len]
        )

2.4 Multi-Query Attention (MQA) 和 Grouped-Query Attention (GQA)

问题:标准多头注意力中,每个头有自己的K和V,KV Cache占用大

MQA解决方案:所有头共享同一组K和V

Text Only
标准MHA:
  头1: Q1, K1, V1
  头2: Q2, K2, V2
  ...
  KV Cache: b × h × s × d × 2 × l × 2字节

MQA:
  头1: Q1, K_shared, V_shared
  头2: Q2, K_shared, V_shared
  ...
  KV Cache: b × 1 × s × d × 2 × l × 2字节

节省: h倍内存!

GQA折中方案:头分成g组,每组共享K和V

Text Only
GQA (g=4组,h=32头):
  组1 (头1-8): 共享K1, V1
  组2 (头9-16): 共享K2, V2
  组3 (头17-24): 共享K3, V3
  组4 (头25-32): 共享K4, V4

KV Cache: b × 4 × s × d × 2 × l × 2字节
节省: 8倍内存(相比MHA)

3. 量化 (Quantization)

3.1 为什么需要量化?

Text Only
FP32模型: 7B × 4字节 = 28 GB
FP16模型: 7B × 2字节 = 14 GB
INT8模型: 7B × 1字节 = 7 GB
INT4模型: 7B × 0.5字节 = 3.5 GB

量化可以:
- 减少显存占用
- 提高推理速度(支持INT8/INT4的硬件)
- 降低功耗

3.2 量化类型

训练后量化 (PTQ)

Text Only
流程:
1. 训练好的FP32模型
2. 校准(可选):用少量数据确定缩放因子
3. 量化权重到INT8/INT4
4. 推理时反量化回FP16计算

优点:
- 简单快速
- 不需要重新训练

缺点:
- 可能损失精度

量化感知训练 (QAT)

Text Only
流程:
1. 在训练时模拟量化
2. 前向传播:权重先量化再反量化
3. 反向传播:梯度传到原始权重
4. 模型学会适应量化误差

优点:
- 精度损失小

缺点:
- 需要重新训练
- 训练时间长

3.3 量化方法

对称量化

Text Only
公式:
  scale = max(|W|) / 127  (INT8)
  W_quant = round(W / scale)
  W_dequant = W_quant × scale

示例:
  W = [-10, -5, 0, 5, 10]
  max_abs = 10
  scale = 10 / 127 ≈ 0.0787
  W_quant = [-127, -64, 0, 64, 127]

非对称量化

Text Only
公式:
  scale = (max(W) - min(W)) / 255
  zero_point = round(-min(W) / scale)
  W_quant = round(W / scale) + zero_point
  W_dequant = (W_quant - zero_point) × scale

适用:权重分布不对称时

GPTQ (Group-wise Post-Training Quantization)

Text Only
核心思想:
- 逐层量化
- 使用OBS (Optimal Brain Surgeon) 方法最小化误差
- 分组量化(如每128列一组),每组有自己的缩放因子

优点:
- 4-bit量化精度接近FP16
- 适合大模型

AWQ (Activation-aware Weight Quantization)

Text Only
核心思想:
- 不是所有权重都同样重要
- 根据激活值大小,保护重要的权重通道
- 对重要通道使用更高精度

优点:
- 比GPTQ更好的4-bit精度
- 推理速度更快

3.4 量化实现示例

Python
import torch
import torch.nn as nn

def quantize_tensor(x, num_bits=8):
    """
    简单的对称量化

    Args:
        x: 输入张量
        num_bits: 量化位数

    Returns:
        x_quant: 量化后的张量
        scale: 缩放因子
    """
    qmax = 2 ** (num_bits - 1) - 1
    qmin = -(2 ** (num_bits - 1))

    # 计算缩放因子
    max_val = x.abs().max()
    scale = max_val / qmax

    # 量化
    x_quant = torch.clamp(torch.round(x / scale), qmin, qmax)

    return x_quant, scale

def dequantize_tensor(x_quant, scale):
    """反量化"""
    return x_quant * scale

class QuantizedLinear(nn.Module):
    """量化线性层"""

    def __init__(self, in_features, out_features, num_bits=8):
        super().__init__()  # super()调用父类方法
        self.in_features = in_features
        self.out_features = out_features
        self.num_bits = num_bits

        # 存储量化后的权重
        self.register_buffer('weight_quant', torch.randint(-128, 127, (out_features, in_features)))
        self.register_buffer('weight_scale', torch.tensor(1.0))

        self.bias = nn.Parameter(torch.zeros(out_features))

    def forward(self, x):
        # 反量化权重
        weight = self.weight_quant.float() * self.weight_scale

        # 正常计算
        return F.linear(x, weight, self.bias)

    @torch.no_grad()  # 禁用梯度计算,节省内存(推理时使用)
    def quantize(self, weight_fp32):
        """量化权重"""
        weight_quant, scale = quantize_tensor(weight_fp32, self.num_bits)
        self.weight_quant.copy_(weight_quant)
        self.weight_scale.copy_(scale)

4. 连续批处理 (Continuous Batching)

4.1 传统批处理的问题

Text Only
批处理大小=3:

请求1: [生成中............] 完成时间: 100ms
请求2: [生成中..]           完成时间: 30ms
请求3: [生成中........]     完成时间: 60ms

问题:
- 请求2完成后,GPU空闲等待其他请求
- GPU利用率低

4.2 连续批处理(动态批处理)

Text Only
核心思想:
- 当一个请求完成,立即加入新请求
- 保持GPU始终满载

时间线:
t=0:  [req1, req2, req3]
t=30: [req1, req3, req4]  (req2完成,加入req4)
t=60: [req1, req4, req5]  (req3完成,加入req5)
t=100:[req4, req5, req6]  (req1完成,加入req6)

优势:
- GPU利用率接近100%
- 吞吐量大幅提升

4.3 实现要点

Python
class ContinuousBatchingScheduler:
    """连续批处理调度器"""

    def __init__(self, max_batch_size, max_seq_len):
        self.max_batch_size = max_batch_size
        self.max_seq_len = max_seq_len
        self.active_requests = []
        self.waiting_queue = []

    def add_request(self, request):
        """添加新请求到等待队列"""
        self.waiting_queue.append(request)

    def schedule(self):
        """
        调度请求

        返回当前批次的请求列表
        """
        # 1. 移除已完成的请求
        self.active_requests = [r for r in self.active_requests if not r.is_done()]

        # 2. 从等待队列补充新请求
        while (len(self.active_requests) < self.max_batch_size and
               self.waiting_queue):
            request = self.waiting_queue.pop(0)
            self.active_requests.append(request)

        return self.active_requests

    def step(self):
        """执行一步生成"""
        batch = self.schedule()

        # 构建batch输入
        input_ids = [r.get_next_input() for r in batch]

        # 批量推理
        outputs = model.generate_batch(input_ids)

        # 分发结果
        for request, output in zip(batch, outputs):  # zip按位置配对多个可迭代对象
            request.append_token(output)

5. 投机采样 (Speculative Decoding)

5.1 核心思想

Text Only
问题:
- 大模型生成每个token都很慢
- 但小模型生成很快(虽然质量差)

解决方案:
1. 用小模型(draft model)快速生成k个候选token
2. 用大模型(target model)一次性验证这k个token
3. 接受匹配的token,拒绝后重新采样

效果:
- 如果小模型质量尚可,大部分token会被接受
- 推理速度提升2-3倍

5.2 算法流程

Python
def speculative_decoding(draft_model, target_model, input_ids, max_new_tokens):
    """
    投机采样

    Args:
        draft_model: 小模型(如7B)
        target_model: 大模型(如70B)
        input_ids: 输入token IDs
        max_new_tokens: 最大生成token数
    """
    accepted_tokens = []

    while len(accepted_tokens) < max_new_tokens:
        # 1. 小模型生成k个候选token
        k = 5
        draft_tokens = draft_model.generate(input_ids, max_new_tokens=k)

        # 2. 大模型验证(一次前向传播)
        target_logits = target_model(input_ids + draft_tokens)

        # 3. 逐个验证
        for i, draft_token in enumerate(draft_tokens):  # enumerate同时获取索引和元素
            # 计算接受概率
            q = draft_model.get_prob(input_ids + accepted_tokens, draft_token)
            p = target_model.get_prob(input_ids + accepted_tokens, draft_token)

            # 接受条件
            if torch.rand(1).item() < min(1, p / q):
                accepted_tokens.append(draft_token)
            else:
                # 拒绝,从调整后的分布采样
                new_token = sample_from_distribution(p - q)
                accepted_tokens.append(new_token)
                break

        # 更新输入
        input_ids = input_ids + accepted_tokens[-k:]

    return accepted_tokens

6. 其他优化技术

6.1 Flash Attention

Text Only
核心思想:
- 标准Attention需要存储O(n²)的注意力矩阵
- Flash Attention分块计算,不存储完整矩阵
- 在SRAM(高速缓存)中完成计算

效果:
- 内存从O(n²)降到O(n)
- 速度提升2-4倍
- 支持更长序列

6.2 Page Attention (vLLM)

Text Only
核心思想:
- 将KV Cache分页管理(类似操作系统虚拟内存)
- 不同序列可以共享页面(用于beam search)
- 动态分配和释放页面

效果:
- 内存利用率接近100%
- 支持更大batch size
- 支持beam search而不增加内存

6.3 模型并行

Text Only
数据并行 (DP):
- 每个GPU有完整模型
- 不同GPU处理不同batch
- 适合模型能放入单卡的情况

张量并行 (TP):
- 每层切分到多个GPU
- 每GPU只存储部分权重
- 通信量大,适合单机多卡

流水线并行 (PP):
- 不同层在不同GPU
- 像流水线一样处理
- 适合模型太大无法放入单卡

7. 实践指南

7.1 选择合适的优化组合

Text Only
场景1: 单卡推理,追求最低延迟
  -> FP16 + KV Cache + Flash Attention

场景2: 单卡推理,追求最大吞吐
  -> INT8/INT4量化 + Continuous Batching

场景3: 多卡推理,大模型
  -> 张量并行 + Pipeline并行 + KV Cache

场景4: 服务化部署
  -> vLLM (Page Attention + Continuous Batching)

场景5: 极致速度
  -> 投机采样 + 量化 + 所有其他优化

7.2 性能测试

Python
def benchmark(model, tokenizer, prompts, max_tokens=100):
    """
    基准测试
    """
    import time

    total_tokens = 0
    total_time = 0

    for prompt in prompts:
        inputs = tokenizer(prompt, return_tensors="pt")

        start = time.time()
        outputs = model.generate(**inputs, max_new_tokens=max_tokens)
        end = time.time()

        num_tokens = outputs.shape[1] - inputs.input_ids.shape[1]
        total_tokens += num_tokens
        total_time += end - start

    throughput = total_tokens / total_time
    latency = total_time / len(prompts)

    print(f"吞吐量: {throughput:.2f} tokens/s")
    print(f"平均延迟: {latency:.2f} s")
    print(f"每token延迟: {total_time/total_tokens*1000:.2f} ms")

8. 下一步

完成本节后,你应该: - [ ] 理解KV Cache的原理和实现 - [ ] 理解量化的原理和方法 - [ ] 了解连续批处理和投机采样 - [ ] 了解Flash Attention和Page Attention - [ ] 能够选择合适的优化策略


最后更新日期:2026-02-12 适用版本:LLM学习教程 v2026

下一步03-系统与工程 - 学习大模型的系统工程实践