03 - 分布式推理¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
让多台机器协同工作,突破单机限制
📎 交叉引用:本章侧重张量并行/流水线并行的底层原理与实现。vLLM/SGLang框架级分布式推理请参考 LLM应用/12-推理优化 §12.5,分布式训练请参考 深度学习/06-高级主题/09-分布式训练。
📖 章节概述¶
本章将深入探讨分布式推理技术,包括模型并行、数据并行和流水线并行等方法。这些技术可以将大模型分布到多个GPU或机器上,突破单机显存和计算限制。
🎯 学习目标¶
完成本章后,你将能够:
- 理解分布式推理的基本原理
- 掌握模型并行、数据并行、流水线并行的实现方法
- 了解不同并行策略的适用场景
- 能够设计和实现分布式推理系统
1. 分布式推理概述¶
1.1 为什么需要分布式推理?¶
显存限制: - 大型模型(如70B参数)需要超过140GB显存 - 单卡GPU通常只有24-80GB显存 - 需要将模型分布到多个GPU
计算需求: - 推理延迟要求高 - 吞吐量需求大 - 需要并行计算加速
1.2 分布式推理类型¶
分布式推理
├── 模型并行 (Model Parallelism)
│ ├── 张量并行 (Tensor Parallelism)
│ └── 流水线并行 (Pipeline Parallelism)
├── 数据并行 (Data Parallelism)
│ ├── 同步数据并行
│ └── 异步数据并行
└── 混合并行 (Hybrid Parallelism)
├── 模型并行 + 数据并行
└── 流水线并行 + 数据并行
2. 模型并行¶
2.1 张量并行¶
张量并行将模型的张量(权重、激活值)切分到多个GPU上。
原理: - 将大矩阵切分为多个小矩阵 - 每个GPU计算一部分 - 通过通信合并结果
import torch
import torch.nn as nn
import torch.distributed as dist
class ColumnParallelLinear(nn.Module):
"""
列并行线性层(Column Parallel Linear)
将权重矩阵按列切分(输出维度切分),每个GPU计算输出的不同部分。
适用于:Transformer的FFN层、Attention输出投影等
通信方式:all_gather(收集所有GPU的部分输出,拼接成完整输出)
"""
def __init__(self, in_features, out_features, world_size, rank):
super().__init__()
self.in_features = in_features
self.out_features = out_features
self.world_size = world_size
self.rank = rank
# 切分输出维度(列切分)
self.out_per_rank = out_features // world_size
# 每个GPU只存储部分权重(列切片)
self.weight = nn.Parameter(
torch.randn(self.out_per_rank, in_features)
)
self.bias = nn.Parameter(torch.randn(self.out_per_rank))
def forward(self, x):
# 本地计算:每个GPU计算 Y_local = X @ W_local^T + b_local
local_output = torch.nn.functional.linear(x, self.weight, self.bias)
# 列并行:使用 all_gather 收集所有GPU的输出并拼接
# 每个GPU得到的是输出的不同列(特征维度),需要拼接而非求和
gathered_outputs = [torch.zeros_like(local_output) for _ in range(self.world_size)]
dist.all_gather(gathered_outputs, local_output)
# 按列(特征维度)拼接所有GPU的输出
output = torch.cat(gathered_outputs, dim=-1)
return output
class RowParallelLinear(nn.Module):
"""
行并行线性层(Row Parallel Linear)
将权重矩阵按行切分(输入维度切分),每个GPU计算部分结果后求和。
适用于:Transformer的第一层FFN、Attention的QKV投影等
通信方式:all_reduce(对所有GPU的部分结果求和)
"""
def __init__(self, in_features, out_features, world_size, rank):
super().__init__()
self.in_features = in_features
self.out_features = out_features
self.world_size = world_size
self.rank = rank
# 切分输入维度(行切分)
self.in_per_rank = in_features // world_size
# 每个GPU只存储部分权重(行切片)
self.weight = nn.Parameter(
torch.randn(out_features, self.in_per_rank)
)
# 行并行通常不加bias,或者只在rank 0上加
if rank == 0:
self.bias = nn.Parameter(torch.randn(out_features))
else:
self.register_parameter('bias', None)
def forward(self, x):
# 每个GPU只处理输入的部分特征
x_partition = x[..., self.rank * self.in_per_rank : (self.rank + 1) * self.in_per_rank]
# 本地计算:每个GPU计算 Y_local = X_local @ W_local^T
local_output = torch.nn.functional.linear(x_partition, self.weight)
# 行并行:使用 all_reduce 对所有GPU的结果求和
# 每个GPU得到的是部分和,需要累加得到最终结果
output = local_output.clone()
dist.all_reduce(output, op=dist.ReduceOp.SUM)
# 只在 rank 0 上添加 bias
if self.bias is not None:
output = output + self.bias
return output
# 使用示例
# world_size = 4 # 4个GPU
# rank = dist.get_rank() # 当前GPU的rank
#
# # 列并行:输出维度切分,使用 all_gather
# col_parallel = ColumnParallelLinear(768, 3072, world_size, rank)
#
# # 行并行:输入维度切分,使用 all_reduce
# row_parallel = RowParallelLinear(3072, 768, world_size, rank)
2.1.1 张量并行通信方式对比¶
| 并行类型 | 切分方式 | 通信操作 | 输出形状变化 |
|---|---|---|---|
| 列并行 | 输出维度切分 | all_gather | [B, S, out_per_rank] → [B, S, out_features] |
| 行并行 | 输入维度切分 | all_reduce | [B, S, out_features] → [B, S, out_features] |
# 通信操作对比示例
import torch.distributed as dist
def demonstrate_comm_difference():
"""
演示 all_gather 和 all_reduce 的区别
"""
# 假设 4 个 GPU,每个 GPU 计算输出的一部分
# GPU 0: [1, 1], GPU 1: [2, 2], GPU 2: [3, 3], GPU 3: [4, 4]
# === all_gather(列并行使用)===
# 收集所有GPU的结果并拼接
# 结果: [1, 1, 2, 2, 3, 3, 4, 4] - 拼接成完整输出
gathered = [torch.zeros(2) for _ in range(4)]
dist.all_gather(gathered, local_output)
full_output = torch.cat(gathered, dim=-1)
# === all_reduce(行并行使用)===
# 对所有GPU的结果求和
# 结果: [1+2+3+4, 1+2+3+4] = [10, 10] - 部分和累加
output = local_output.clone()
dist.all_reduce(output, op=dist.ReduceOp.SUM)
### 2.2 流水线并行
流水线并行将模型的层分布到多个GPU上,形成流水线。
**原理**:
- GPU1计算第1-3层
- GPU2计算第4-6层
- GPU3计算第7-9层
- 数据像流水一样在GPU间流动
```python
import torch
import torch.nn as nn
class PipelineParallelModel(nn.Module):
"""
流水线并行模型
"""
def __init__(self, num_stages=4):
super().__init__()
self.stages = nn.ModuleList()
# 创建多个阶段
for i in range(num_stages):
stage = nn.Sequential(
nn.Linear(768, 768),
nn.ReLU(),
nn.Linear(768, 768),
nn.ReLU()
)
self.stages.append(stage)
def forward(self, x, stage_idx):
"""
在指定阶段执行前向传播
Args:
x: 输入张量
stage_idx: 阶段索引
"""
return self.stages[stage_idx](x)
# 流水线执行
def pipeline_execute(model, inputs, num_stages=4):
"""
执行流水线并行
Args:
model: 流水线模型
inputs: 输入数据
num_stages: 阶段数量
"""
# 初始化各阶段输入
stage_inputs = [None] * num_stages
stage_inputs[0] = inputs
# 流水线执行
outputs = []
for step in range(num_stages):
# 执行当前阶段
for stage_idx in range(num_stages):
if stage_inputs[stage_idx] is not None:
output = model(stage_inputs[stage_idx], stage_idx)
# 传递到下一阶段
if stage_idx < num_stages - 1:
stage_inputs[stage_idx + 1] = output
else:
outputs.append(output)
# 准备下一批输入
if step < num_stages - 1:
stage_inputs[0] = inputs
return outputs
# 使用示例
# model = PipelineParallelModel(num_stages=4)
# inputs = torch.randn(32, 768)
# outputs = pipeline_execute(model, inputs)
2.3 使用Hugging Face的模型并行¶
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
def model_parallel_inference(model_name="meta-llama/Llama-2-7b-hf"):
"""
使用Hugging Face的模型并行
Args:
model_name: 模型名称或路径
"""
# 加载分词器
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 加载模型,自动分布到多个GPU
model = AutoModelForCausalLM.from_pretrained(
model_name,
device_map="auto", # 自动设备映射
torch_dtype=torch.float16
)
# 查看模型分布
print("模型设备分布:")
for name, param in model.named_parameters():
if param.device != torch.device('cpu'):
print(f" {name}: {param.device}")
# 推理
prompt = "请介绍一下人工智能的发展历程"
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
with torch.no_grad(): # 禁用梯度计算,节省内存
outputs = model.generate(
**inputs,
max_length=200,
do_sample=True,
temperature=0.7
)
result = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(result)
return model, tokenizer
# 使用示例
# model, tokenizer = model_parallel_inference()
3. 数据并行¶
3.1 同步数据并行¶
同步数据并行在多个GPU上复制模型,每个GPU处理不同的数据批次。
import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
def setup_distributed():
"""
初始化分布式环境
"""
# 从环境变量获取分布式配置
rank = int(os.environ.get('RANK', 0))
world_size = int(os.environ.get('WORLD_SIZE', 1))
local_rank = int(os.environ.get('LOCAL_RANK', 0))
# 初始化进程组
dist.init_process_group(
backend='nccl',
rank=rank,
world_size=world_size
)
# 设置当前设备
torch.cuda.set_device(local_rank)
return rank, world_size, local_rank
def data_parallel_inference(model, dataloader):
"""
数据并行推理
Args:
model: 要并行化的模型
dataloader: 数据加载器
"""
# 初始化分布式环境
rank, world_size, local_rank = setup_distributed()
# 将模型移动到当前GPU
model = model.to(local_rank)
# 包装为DDP模型
model = DDP(model, device_ids=[local_rank])
# 推理
model.eval() # eval()评估模式
all_outputs = []
with torch.no_grad():
for batch in dataloader:
inputs, targets = batch
inputs = inputs.to(local_rank)
outputs = model(inputs)
all_outputs.append(outputs.cpu())
# 收集所有GPU的结果
if rank == 0:
outputs = torch.cat(all_outputs, dim=0) # torch.cat沿已有维度拼接张量
return outputs
else:
return None
# 使用示例(需要使用torchrun启动)
# torchrun --nproc_per_node=4 your_script.py
3.2 异步数据并行¶
异步数据并行允许GPU独立更新模型参数,无需等待其他GPU。
import copy
import torch
import torch.nn as nn
class AsyncDataParallel(nn.Module):
"""
异步数据并行
"""
def __init__(self, model, device_ids):
super().__init__()
self.device_ids = device_ids
self.models = nn.ModuleList([
copy.deepcopy(model).to(device) for device in device_ids # 移至GPU/CPU
])
def forward(self, inputs):
"""
异步前向传播
"""
# 将输入分发到各个设备
inputs_chunks = inputs.chunk(len(self.device_ids))
# 异步执行
outputs = []
for model, input_chunk in zip(self.models, inputs_chunks): # zip按位置配对
output = model(input_chunk.to(model.device))
outputs.append(output.cpu())
# 合并结果
return torch.cat(outputs, dim=0)
# 使用示例
# model = YourModel()
# parallel_model = AsyncDataParallel(model, device_ids=[0, 1, 2, 3])
# outputs = parallel_model(inputs)
⚠️ 重要说明:上述
AsyncDataParallel类名虽含"Async",但实际是同步数据并行的演示。代码中的torch.cuda.current_stream().synchronize()会阻塞等待所有GPU完成。真正的异步数据并行(如 Hogwild!)需要共享内存参数和无锁梯度更新,在深度学习中较少使用。
4. 混合并行¶
4.1 模型并行 + 数据并行¶
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
class HybridParallelModel(nn.Module):
"""
混合并行模型(模型并行 + 数据并行)
"""
def __init__(self, model_parallel_size, data_parallel_size):
super().__init__()
self.model_parallel_size = model_parallel_size
self.data_parallel_size = data_parallel_size
# 创建模型并行的子模型
self.sub_models = nn.ModuleList()
for i in range(model_parallel_size):
sub_model = self._create_sub_model(i, model_parallel_size)
self.sub_models.append(sub_model)
def _create_sub_model(self, idx, total):
"""
创建子模型
"""
# 这里简化处理,实际需要根据模型架构设计
return nn.Sequential(
nn.Linear(768, 768),
nn.ReLU(),
nn.Linear(768, 768)
)
def forward(self, x):
"""
前向传播
"""
# 模型并行:每个子模型处理部分数据
outputs = []
for sub_model in self.sub_models:
output = sub_model(x)
outputs.append(output)
# 合并结果
output = torch.cat(outputs, dim=-1)
return output
def hybrid_parallel_training(model, dataloader):
"""
混合并行训练
"""
# 初始化分布式环境
rank = dist.get_rank()
world_size = dist.get_world_size()
# 计算模型并行和数据并行的大小
model_parallel_size = 2 # 例如
data_parallel_size = world_size // model_parallel_size
# 创建混合并行模型
model = HybridParallelModel(model_parallel_size, data_parallel_size)
# 数据并行包装
model = DDP(model)
# 训练循环
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
for epoch in range(10):
for batch in dataloader:
inputs, targets = batch
# 前向传播
outputs = model(inputs)
loss = criterion(outputs, targets)
# 反向传播
optimizer.zero_grad() # 清零梯度
loss.backward() # 反向传播计算梯度
optimizer.step() # 更新参数
print(f"Epoch {epoch+1} completed")
# 使用示例
# hybrid_parallel_training(model, dataloader)
4.2 流水线并行 + 数据并行¶
import torch
import torch.nn as nn
import torch.distributed as dist
class PipelineDataParallel(nn.Module):
"""
流水线并行 + 数据并行
"""
def __init__(self, num_pipeline_stages, num_data_parallel):
super().__init__()
self.num_pipeline_stages = num_pipeline_stages
self.num_data_parallel = num_data_parallel
# 创建流水线阶段
self.pipeline_stages = nn.ModuleList()
for i in range(num_pipeline_stages):
stage = self._create_stage(i, num_pipeline_stages)
self.pipeline_stages.append(stage)
def _create_stage(self, idx, total):
"""
创建流水线阶段
"""
return nn.Sequential(
nn.Linear(768, 768),
nn.ReLU(),
nn.Linear(768, 768)
)
def forward(self, x, pipeline_idx, data_idx):
"""
前向传播
Args:
x: 输入张量
pipeline_idx: 流水线索引
data_idx: 数据并行索引
"""
# 执行流水线阶段
output = x
for stage in self.pipeline_stages:
output = stage(output)
return output
def pipeline_data_parallel_execute(model, dataloader):
"""
执行流水线并行 + 数据并行
"""
rank = dist.get_rank()
world_size = dist.get_world_size()
# 计算流水线并行和数据并行的大小
num_pipeline_stages = 4 # 例如
num_data_parallel = world_size // num_pipeline_stages
# 计算当前rank的角色
pipeline_idx = rank // num_data_parallel
data_idx = rank % num_data_parallel
# 执行推理
model.eval()
all_outputs = []
with torch.no_grad():
for batch in dataloader:
inputs, targets = batch
# 只处理当前数据并行的数据
batch_size = inputs.size(0)
chunk_size = batch_size // num_data_parallel
start_idx = data_idx * chunk_size
end_idx = start_idx + chunk_size
chunk_inputs = inputs[start_idx:end_idx]
# 执行流水线
outputs = model(chunk_inputs, pipeline_idx, data_idx)
all_outputs.append(outputs.cpu())
# 收集结果
outputs = torch.cat(all_outputs, dim=0)
return outputs
# 使用示例
# model = PipelineDataParallel(num_pipeline_stages=4, num_data_parallel=2)
# outputs = pipeline_data_parallel_execute(model, dataloader)
5. 使用vLLM进行分布式推理¶
5.1 vLLM简介¶
vLLM是一个高性能的大模型推理库,支持高效的分布式推理。
from vllm import LLM, SamplingParams
def vllm_distributed_inference(model_name="meta-llama/Llama-2-7b-hf"):
"""
使用vLLM进行分布式推理
Args:
model_name: 模型名称或路径
"""
# 初始化LLM(自动使用所有可用GPU)
llm = LLM(
model=model_name,
tensor_parallel_size=4, # 张量并行大小
gpu_memory_utilization=0.9, # GPU内存利用率
max_model_len=4096 # 最大序列长度
)
# 设置采样参数
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.95,
max_tokens=200
)
# 准备提示
prompts = [
"请介绍一下人工智能的发展历程",
"什么是机器学习?",
"深度学习的应用有哪些?"
]
# 推理
outputs = llm.generate(prompts, sampling_params)
# 打印结果
for output in outputs:
print(f"提示: {output.prompt}")
print(f"输出: {output.outputs[0].text}\n")
return llm
# 使用示例
# llm = vllm_distributed_inference()
5.2 vLLM高级配置¶
from vllm import LLM, SamplingParams
def advanced_vllm_inference(model_name="meta-llama/Llama-2-7b-hf"):
"""
高级vLLM配置
"""
llm = LLM(
model=model_name,
# 张量并行配置
tensor_parallel_size=4,
# 内存配置
gpu_memory_utilization=0.9,
swap_space=4, # 交换空间(GB)
# 模型配置
max_model_len=8192,
trust_remote_code=True,
# 量化配置
quantization="awq", # 或 "gptq", "squeezellm"
# 性能优化
enforce_eager=False, # 使用CUDA图优化
disable_log_stats=False,
# 其他配置
dtype="float16", # 或 "bfloat16"
seed=42
)
# 批量推理
prompts = ["提示1", "提示2", "提示3"]
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.95,
max_tokens=200,
n=1, # 每个提示生成的数量
best_of=1, # 采样候选数
presence_penalty=0.0,
frequency_penalty=0.0
)
outputs = llm.generate(prompts, sampling_params)
return outputs
# 使用示例
# outputs = advanced_vllm_inference()
6. 练习题¶
基础练习¶
-
实现简单的张量并行
-
实现简单的流水线并行
进阶练习¶
-
实现混合并行策略
-
实现负载均衡
项目练习¶
- 创建分布式推理框架
- 支持多种并行策略
- 自动负载均衡
- 性能监控和优化
7. 最佳实践¶
✅ 推荐做法¶
- 选择合适的并行策略
- 小模型:数据并行
- 大模型:模型并行
-
超大模型:混合并行
-
优化通信开销
- 减少GPU间通信
- 使用高效的通信后端
-
批量传输数据
-
监控性能
- 监控GPU利用率
- 监控通信开销
- 分析瓶颈
❌ 避免做法¶
- 过度并行化
- 不要使用太多GPU
- 考虑通信开销
-
评估实际收益
-
忽略负载均衡
- 确保各GPU负载均衡
- 避免某些GPU空闲
-
动态调整任务分配
-
不充分的测试
- 在小规模上测试
- 验证正确性
- 逐步扩展规模
8. 总结¶
本章介绍了分布式推理的核心技术:
- 张量并行: 切分张量到多个GPU
- 流水线并行: 切分层到多个GPU
- 数据并行: 复制模型到多个GPU
- 混合并行: 组合多种并行策略
选择合适的并行策略需要考虑模型大小、硬件配置和应用场景。
9. 下一步¶
继续学习04-云端推理服务,了解如何将模型部署到云端并提供服务。