跳转至

03 - 分布式推理

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

让多台机器协同工作,突破单机限制

📎 交叉引用:本章侧重张量并行/流水线并行的底层原理与实现。vLLM/SGLang框架级分布式推理请参考 LLM应用/12-推理优化 §12.5,分布式训练请参考 深度学习/06-高级主题/09-分布式训练

📖 章节概述

本章将深入探讨分布式推理技术,包括模型并行、数据并行和流水线并行等方法。这些技术可以将大模型分布到多个GPU或机器上,突破单机显存和计算限制。

🎯 学习目标

完成本章后,你将能够:

  • 理解分布式推理的基本原理
  • 掌握模型并行、数据并行、流水线并行的实现方法
  • 了解不同并行策略的适用场景
  • 能够设计和实现分布式推理系统

1. 分布式推理概述

1.1 为什么需要分布式推理?

显存限制: - 大型模型(如70B参数)需要超过140GB显存 - 单卡GPU通常只有24-80GB显存 - 需要将模型分布到多个GPU

计算需求: - 推理延迟要求高 - 吞吐量需求大 - 需要并行计算加速

1.2 分布式推理类型

Text Only
分布式推理
├── 模型并行 (Model Parallelism)
│   ├── 张量并行 (Tensor Parallelism)
│   └── 流水线并行 (Pipeline Parallelism)
├── 数据并行 (Data Parallelism)
│   ├── 同步数据并行
│   └── 异步数据并行
└── 混合并行 (Hybrid Parallelism)
    ├── 模型并行 + 数据并行
    └── 流水线并行 + 数据并行

2. 模型并行

2.1 张量并行

张量并行将模型的张量(权重、激活值)切分到多个GPU上。

原理: - 将大矩阵切分为多个小矩阵 - 每个GPU计算一部分 - 通过通信合并结果

Python
import torch
import torch.nn as nn
import torch.distributed as dist

class ColumnParallelLinear(nn.Module):
    """
    列并行线性层(Column Parallel Linear)

    将权重矩阵按列切分(输出维度切分),每个GPU计算输出的不同部分。
    适用于:Transformer的FFN层、Attention输出投影等

    通信方式:all_gather(收集所有GPU的部分输出,拼接成完整输出)
    """
    def __init__(self, in_features, out_features, world_size, rank):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.world_size = world_size
        self.rank = rank

        # 切分输出维度(列切分)
        self.out_per_rank = out_features // world_size

        # 每个GPU只存储部分权重(列切片)
        self.weight = nn.Parameter(
            torch.randn(self.out_per_rank, in_features)
        )
        self.bias = nn.Parameter(torch.randn(self.out_per_rank))

    def forward(self, x):
        # 本地计算:每个GPU计算 Y_local = X @ W_local^T + b_local
        local_output = torch.nn.functional.linear(x, self.weight, self.bias)

        # 列并行:使用 all_gather 收集所有GPU的输出并拼接
        # 每个GPU得到的是输出的不同列(特征维度),需要拼接而非求和
        gathered_outputs = [torch.zeros_like(local_output) for _ in range(self.world_size)]
        dist.all_gather(gathered_outputs, local_output)

        # 按列(特征维度)拼接所有GPU的输出
        output = torch.cat(gathered_outputs, dim=-1)

        return output


class RowParallelLinear(nn.Module):
    """
    行并行线性层(Row Parallel Linear)

    将权重矩阵按行切分(输入维度切分),每个GPU计算部分结果后求和。
    适用于:Transformer的第一层FFN、Attention的QKV投影等

    通信方式:all_reduce(对所有GPU的部分结果求和)
    """
    def __init__(self, in_features, out_features, world_size, rank):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.world_size = world_size
        self.rank = rank

        # 切分输入维度(行切分)
        self.in_per_rank = in_features // world_size

        # 每个GPU只存储部分权重(行切片)
        self.weight = nn.Parameter(
            torch.randn(out_features, self.in_per_rank)
        )
        # 行并行通常不加bias,或者只在rank 0上加
        if rank == 0:
            self.bias = nn.Parameter(torch.randn(out_features))
        else:
            self.register_parameter('bias', None)

    def forward(self, x):
        # 每个GPU只处理输入的部分特征
        x_partition = x[..., self.rank * self.in_per_rank : (self.rank + 1) * self.in_per_rank]

        # 本地计算:每个GPU计算 Y_local = X_local @ W_local^T
        local_output = torch.nn.functional.linear(x_partition, self.weight)

        # 行并行:使用 all_reduce 对所有GPU的结果求和
        # 每个GPU得到的是部分和,需要累加得到最终结果
        output = local_output.clone()
        dist.all_reduce(output, op=dist.ReduceOp.SUM)

        # 只在 rank 0 上添加 bias
        if self.bias is not None:
            output = output + self.bias

        return output


# 使用示例
# world_size = 4  # 4个GPU
# rank = dist.get_rank()  # 当前GPU的rank
#
# # 列并行:输出维度切分,使用 all_gather
# col_parallel = ColumnParallelLinear(768, 3072, world_size, rank)
#
# # 行并行:输入维度切分,使用 all_reduce
# row_parallel = RowParallelLinear(3072, 768, world_size, rank)

2.1.1 张量并行通信方式对比

并行类型 切分方式 通信操作 输出形状变化
列并行 输出维度切分 all_gather [B, S, out_per_rank] → [B, S, out_features]
行并行 输入维度切分 all_reduce [B, S, out_features] → [B, S, out_features]
Python
# 通信操作对比示例
import torch.distributed as dist

def demonstrate_comm_difference():
    """
    演示 all_gather 和 all_reduce 的区别
    """
    # 假设 4 个 GPU,每个 GPU 计算输出的一部分
    # GPU 0: [1, 1], GPU 1: [2, 2], GPU 2: [3, 3], GPU 3: [4, 4]

    # === all_gather(列并行使用)===
    # 收集所有GPU的结果并拼接
    # 结果: [1, 1, 2, 2, 3, 3, 4, 4] - 拼接成完整输出
    gathered = [torch.zeros(2) for _ in range(4)]
    dist.all_gather(gathered, local_output)
    full_output = torch.cat(gathered, dim=-1)

    # === all_reduce(行并行使用)===
    # 对所有GPU的结果求和
    # 结果: [1+2+3+4, 1+2+3+4] = [10, 10] - 部分和累加
    output = local_output.clone()
    dist.all_reduce(output, op=dist.ReduceOp.SUM)

### 2.2 流水线并行

流水线并行将模型的层分布到多个GPU上形成流水线

**原理**
- GPU1计算第1-3
- GPU2计算第4-6
- GPU3计算第7-9
- 数据像流水一样在GPU间流动

```python
import torch
import torch.nn as nn

class PipelineParallelModel(nn.Module):
    """
    流水线并行模型
    """
    def __init__(self, num_stages=4):
        super().__init__()
        self.stages = nn.ModuleList()

        # 创建多个阶段
        for i in range(num_stages):
            stage = nn.Sequential(
                nn.Linear(768, 768),
                nn.ReLU(),
                nn.Linear(768, 768),
                nn.ReLU()
            )
            self.stages.append(stage)

    def forward(self, x, stage_idx):
        """
        在指定阶段执行前向传播

        Args:
            x: 输入张量
            stage_idx: 阶段索引
        """
        return self.stages[stage_idx](x)

# 流水线执行
def pipeline_execute(model, inputs, num_stages=4):
    """
    执行流水线并行

    Args:
        model: 流水线模型
        inputs: 输入数据
        num_stages: 阶段数量
    """
    # 初始化各阶段输入
    stage_inputs = [None] * num_stages
    stage_inputs[0] = inputs

    # 流水线执行
    outputs = []
    for step in range(num_stages):
        # 执行当前阶段
        for stage_idx in range(num_stages):
            if stage_inputs[stage_idx] is not None:
                output = model(stage_inputs[stage_idx], stage_idx)

                # 传递到下一阶段
                if stage_idx < num_stages - 1:
                    stage_inputs[stage_idx + 1] = output
                else:
                    outputs.append(output)

        # 准备下一批输入
        if step < num_stages - 1:
            stage_inputs[0] = inputs

    return outputs

# 使用示例
# model = PipelineParallelModel(num_stages=4)
# inputs = torch.randn(32, 768)
# outputs = pipeline_execute(model, inputs)

2.3 使用Hugging Face的模型并行

Python
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

def model_parallel_inference(model_name="meta-llama/Llama-2-7b-hf"):
    """
    使用Hugging Face的模型并行

    Args:
        model_name: 模型名称或路径
    """
    # 加载分词器
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # 加载模型,自动分布到多个GPU
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        device_map="auto",  # 自动设备映射
        torch_dtype=torch.float16
    )

    # 查看模型分布
    print("模型设备分布:")
    for name, param in model.named_parameters():
        if param.device != torch.device('cpu'):
            print(f"  {name}: {param.device}")

    # 推理
    prompt = "请介绍一下人工智能的发展历程"
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    with torch.no_grad():  # 禁用梯度计算,节省内存
        outputs = model.generate(
            **inputs,
            max_length=200,
            do_sample=True,
            temperature=0.7
        )

    result = tokenizer.decode(outputs[0], skip_special_tokens=True)
    print(result)

    return model, tokenizer

# 使用示例
# model, tokenizer = model_parallel_inference()

3. 数据并行

3.1 同步数据并行

同步数据并行在多个GPU上复制模型,每个GPU处理不同的数据批次。

Python
import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def setup_distributed():
    """
    初始化分布式环境
    """
    # 从环境变量获取分布式配置
    rank = int(os.environ.get('RANK', 0))
    world_size = int(os.environ.get('WORLD_SIZE', 1))
    local_rank = int(os.environ.get('LOCAL_RANK', 0))

    # 初始化进程组
    dist.init_process_group(
        backend='nccl',
        rank=rank,
        world_size=world_size
    )

    # 设置当前设备
    torch.cuda.set_device(local_rank)

    return rank, world_size, local_rank

def data_parallel_inference(model, dataloader):
    """
    数据并行推理

    Args:
        model: 要并行化的模型
        dataloader: 数据加载器
    """
    # 初始化分布式环境
    rank, world_size, local_rank = setup_distributed()

    # 将模型移动到当前GPU
    model = model.to(local_rank)

    # 包装为DDP模型
    model = DDP(model, device_ids=[local_rank])

    # 推理
    model.eval()  # eval()评估模式
    all_outputs = []

    with torch.no_grad():
        for batch in dataloader:
            inputs, targets = batch
            inputs = inputs.to(local_rank)

            outputs = model(inputs)
            all_outputs.append(outputs.cpu())

    # 收集所有GPU的结果
    if rank == 0:
        outputs = torch.cat(all_outputs, dim=0)  # torch.cat沿已有维度拼接张量
        return outputs
    else:
        return None

# 使用示例(需要使用torchrun启动)
# torchrun --nproc_per_node=4 your_script.py

3.2 异步数据并行

异步数据并行允许GPU独立更新模型参数,无需等待其他GPU。

Python
import copy
import torch
import torch.nn as nn

class AsyncDataParallel(nn.Module):
    """
    异步数据并行
    """
    def __init__(self, model, device_ids):
        super().__init__()
        self.device_ids = device_ids
        self.models = nn.ModuleList([
            copy.deepcopy(model).to(device) for device in device_ids  # 移至GPU/CPU
        ])

    def forward(self, inputs):
        """
        异步前向传播
        """
        # 将输入分发到各个设备
        inputs_chunks = inputs.chunk(len(self.device_ids))

        # 异步执行
        outputs = []
        for model, input_chunk in zip(self.models, inputs_chunks):  # zip按位置配对
            output = model(input_chunk.to(model.device))
            outputs.append(output.cpu())

        # 合并结果
        return torch.cat(outputs, dim=0)

# 使用示例
# model = YourModel()
# parallel_model = AsyncDataParallel(model, device_ids=[0, 1, 2, 3])
# outputs = parallel_model(inputs)

⚠️ 重要说明:上述 AsyncDataParallel 类名虽含"Async",但实际是同步数据并行的演示。代码中的 torch.cuda.current_stream().synchronize() 会阻塞等待所有GPU完成。真正的异步数据并行(如 Hogwild!)需要共享内存参数和无锁梯度更新,在深度学习中较少使用。

4. 混合并行

4.1 模型并行 + 数据并行

Python
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

class HybridParallelModel(nn.Module):
    """
    混合并行模型(模型并行 + 数据并行)
    """
    def __init__(self, model_parallel_size, data_parallel_size):
        super().__init__()
        self.model_parallel_size = model_parallel_size
        self.data_parallel_size = data_parallel_size

        # 创建模型并行的子模型
        self.sub_models = nn.ModuleList()
        for i in range(model_parallel_size):
            sub_model = self._create_sub_model(i, model_parallel_size)
            self.sub_models.append(sub_model)

    def _create_sub_model(self, idx, total):
        """
        创建子模型
        """
        # 这里简化处理,实际需要根据模型架构设计
        return nn.Sequential(
            nn.Linear(768, 768),
            nn.ReLU(),
            nn.Linear(768, 768)
        )

    def forward(self, x):
        """
        前向传播
        """
        # 模型并行:每个子模型处理部分数据
        outputs = []
        for sub_model in self.sub_models:
            output = sub_model(x)
            outputs.append(output)

        # 合并结果
        output = torch.cat(outputs, dim=-1)

        return output

def hybrid_parallel_training(model, dataloader):
    """
    混合并行训练
    """
    # 初始化分布式环境
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # 计算模型并行和数据并行的大小
    model_parallel_size = 2  # 例如
    data_parallel_size = world_size // model_parallel_size

    # 创建混合并行模型
    model = HybridParallelModel(model_parallel_size, data_parallel_size)

    # 数据并行包装
    model = DDP(model)

    # 训练循环
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(10):
        for batch in dataloader:
            inputs, targets = batch

            # 前向传播
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            # 反向传播
            optimizer.zero_grad()  # 清零梯度
            loss.backward()  # 反向传播计算梯度
            optimizer.step()  # 更新参数

        print(f"Epoch {epoch+1} completed")

# 使用示例
# hybrid_parallel_training(model, dataloader)

4.2 流水线并行 + 数据并行

Python
import torch
import torch.nn as nn
import torch.distributed as dist

class PipelineDataParallel(nn.Module):
    """
    流水线并行 + 数据并行
    """
    def __init__(self, num_pipeline_stages, num_data_parallel):
        super().__init__()
        self.num_pipeline_stages = num_pipeline_stages
        self.num_data_parallel = num_data_parallel

        # 创建流水线阶段
        self.pipeline_stages = nn.ModuleList()
        for i in range(num_pipeline_stages):
            stage = self._create_stage(i, num_pipeline_stages)
            self.pipeline_stages.append(stage)

    def _create_stage(self, idx, total):
        """
        创建流水线阶段
        """
        return nn.Sequential(
            nn.Linear(768, 768),
            nn.ReLU(),
            nn.Linear(768, 768)
        )

    def forward(self, x, pipeline_idx, data_idx):
        """
        前向传播

        Args:
            x: 输入张量
            pipeline_idx: 流水线索引
            data_idx: 数据并行索引
        """
        # 执行流水线阶段
        output = x
        for stage in self.pipeline_stages:
            output = stage(output)

        return output

def pipeline_data_parallel_execute(model, dataloader):
    """
    执行流水线并行 + 数据并行
    """
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # 计算流水线并行和数据并行的大小
    num_pipeline_stages = 4  # 例如
    num_data_parallel = world_size // num_pipeline_stages

    # 计算当前rank的角色
    pipeline_idx = rank // num_data_parallel
    data_idx = rank % num_data_parallel

    # 执行推理
    model.eval()
    all_outputs = []

    with torch.no_grad():
        for batch in dataloader:
            inputs, targets = batch

            # 只处理当前数据并行的数据
            batch_size = inputs.size(0)
            chunk_size = batch_size // num_data_parallel
            start_idx = data_idx * chunk_size
            end_idx = start_idx + chunk_size

            chunk_inputs = inputs[start_idx:end_idx]

            # 执行流水线
            outputs = model(chunk_inputs, pipeline_idx, data_idx)
            all_outputs.append(outputs.cpu())

    # 收集结果
    outputs = torch.cat(all_outputs, dim=0)
    return outputs

# 使用示例
# model = PipelineDataParallel(num_pipeline_stages=4, num_data_parallel=2)
# outputs = pipeline_data_parallel_execute(model, dataloader)

5. 使用vLLM进行分布式推理

5.1 vLLM简介

vLLM是一个高性能的大模型推理库,支持高效的分布式推理。

Python
from vllm import LLM, SamplingParams

def vllm_distributed_inference(model_name="meta-llama/Llama-2-7b-hf"):
    """
    使用vLLM进行分布式推理

    Args:
        model_name: 模型名称或路径
    """
    # 初始化LLM(自动使用所有可用GPU)
    llm = LLM(
        model=model_name,
        tensor_parallel_size=4,  # 张量并行大小
        gpu_memory_utilization=0.9,  # GPU内存利用率
        max_model_len=4096  # 最大序列长度
    )

    # 设置采样参数
    sampling_params = SamplingParams(
        temperature=0.7,
        top_p=0.95,
        max_tokens=200
    )

    # 准备提示
    prompts = [
        "请介绍一下人工智能的发展历程",
        "什么是机器学习?",
        "深度学习的应用有哪些?"
    ]

    # 推理
    outputs = llm.generate(prompts, sampling_params)

    # 打印结果
    for output in outputs:
        print(f"提示: {output.prompt}")
        print(f"输出: {output.outputs[0].text}\n")

    return llm

# 使用示例
# llm = vllm_distributed_inference()

5.2 vLLM高级配置

Python
from vllm import LLM, SamplingParams

def advanced_vllm_inference(model_name="meta-llama/Llama-2-7b-hf"):
    """
    高级vLLM配置
    """
    llm = LLM(
        model=model_name,

        # 张量并行配置
        tensor_parallel_size=4,

        # 内存配置
        gpu_memory_utilization=0.9,
        swap_space=4,  # 交换空间(GB)

        # 模型配置
        max_model_len=8192,
        trust_remote_code=True,

        # 量化配置
        quantization="awq",  # 或 "gptq", "squeezellm"

        # 性能优化
        enforce_eager=False,  # 使用CUDA图优化
        disable_log_stats=False,

        # 其他配置
        dtype="float16",  # 或 "bfloat16"
        seed=42
    )

    # 批量推理
    prompts = ["提示1", "提示2", "提示3"]
    sampling_params = SamplingParams(
        temperature=0.7,
        top_p=0.95,
        max_tokens=200,
        n=1,  # 每个提示生成的数量
        best_of=1,  # 采样候选数
        presence_penalty=0.0,
        frequency_penalty=0.0
    )

    outputs = llm.generate(prompts, sampling_params)

    return outputs

# 使用示例
# outputs = advanced_vllm_inference()

6. 练习题

基础练习

  1. 实现简单的张量并行

    Python
    # TODO: 实现一个简单的张量并行线性层
    class SimpleTensorParallel(nn.Module):
        def __init__(self, in_features, out_features, world_size):
            # 你的代码
            pass
    
        def forward(self, x):
            # 你的代码
            pass
    

  2. 实现简单的流水线并行

    Python
    # TODO: 实现一个简单的流水线执行器
    class SimplePipeline:
        def __init__(self, stages):
            # 你的代码
            pass
    
        def execute(self, inputs):
            # 你的代码
            pass
    

进阶练习

  1. 实现混合并行策略

    Python
    # TODO: 实现模型并行 + 数据并行的混合策略
    class HybridParallel:
        def __init__(self, model, mp_size, dp_size):
            # 你的代码
            pass
    
        def forward(self, x):
            # 你的代码
            pass
    

  2. 实现负载均衡

    Python
    # TODO: 实现分布式推理的负载均衡
    class LoadBalancer:
        def __init__(self, devices):
            # 你的代码
            pass
    
        def assign_task(self, task):
            # 你的代码
            pass
    

项目练习

  1. 创建分布式推理框架
  2. 支持多种并行策略
  3. 自动负载均衡
  4. 性能监控和优化

7. 最佳实践

✅ 推荐做法

  1. 选择合适的并行策略
  2. 小模型:数据并行
  3. 大模型:模型并行
  4. 超大模型:混合并行

  5. 优化通信开销

  6. 减少GPU间通信
  7. 使用高效的通信后端
  8. 批量传输数据

  9. 监控性能

  10. 监控GPU利用率
  11. 监控通信开销
  12. 分析瓶颈

❌ 避免做法

  1. 过度并行化
  2. 不要使用太多GPU
  3. 考虑通信开销
  4. 评估实际收益

  5. 忽略负载均衡

  6. 确保各GPU负载均衡
  7. 避免某些GPU空闲
  8. 动态调整任务分配

  9. 不充分的测试

  10. 在小规模上测试
  11. 验证正确性
  12. 逐步扩展规模

8. 总结

本章介绍了分布式推理的核心技术:

  • 张量并行: 切分张量到多个GPU
  • 流水线并行: 切分层到多个GPU
  • 数据并行: 复制模型到多个GPU
  • 混合并行: 组合多种并行策略

选择合适的并行策略需要考虑模型大小、硬件配置和应用场景。

9. 下一步

继续学习04-云端推理服务,了解如何将模型部署到云端并提供服务。