分布式并行训练
为了解决算力增速不足的问题,人们考虑用多节点集群进行分布式训练,以提升算力,分布式训练势在必行。
并行训练的评价指标
FLOPs/GPU 利用率
关注计算性能是否充分发挥:
- FLOPs(浮点运算次数):计算模型训练过程中执行的浮点运算次数,通常以 TFLOPs(TeraFLOPs)或 PFLOPs(PetaFLOPs)衡量。
- FLOPs 利用率(FLOPs Utilization):实际执行的 FLOPs 与硬件理论峰值 FLOPs 的比值,衡量计算资源利用率。
- GPU 利用率(GPU Utilization):统计 GPU 计算核心的使用情况,反映 GPU 是否被充分利用。
吞吐量与并行效率
关注扩展性是否合理:
- 吞吐量(Throughput):每秒处理的样本数(samples/sec)或 tokens/sec,通常用于衡量训练速度。
- 加速比(Speedup):$ S(N) = \frac{T(1)}{T(N)} $其中$ T(1)$ 是单 GPU 训练时间,$T(N)$ 是 N 个 GPU 训练时间。
- 并行效率(Scaling Efficiency):$ E(N) = \frac{S(N)}{N} = \frac{T(1)}{N \cdot T(N)} $.衡量并行扩展性,理想情况下应接近 100%。
Megatron-LM的实验中:
We sustain 15.1 PetaFLOPs with 512 V100 GPUS across the entire application with 76% scaling efficiency when compared to a strong single GPU baseline that sustains 39 TeraFLOPs, which is 30% of peak FLOPs.
计算过程:
1 PetaFLOPs = 1000 TeraFLOPs
39 * 512 =19,968 TeraFLOPS
15.1 / 20.0 = 75.5 %
并行方法
数据并行
运行原理:
- 所谓的数据并行,就是将数据 x 进行切分,而每个设备上的模型 w 是完整的、一致的。
- 数据并行策略下,在反向传播过程中,需要对各个设备上的梯度进行 AllReduce,以确保各个设备上的模型始终保持一致。
适用场景:
- 当数据集较大,模型较小时
注意:If your model contains any
BatchNorm
layers, it needs to be converted toSyncBatchNorm
to sync the running stats ofBatchNorm
layers across replicas.
训练步骤:
-
创建进程组:
-
给每个进程设置cuda id
-
Init_process_progress
-
初始化分布式通信环境
-
建立进程组(Process Group):为多个进程(如不同 GPU/机器上的训练进程)创建通信组,使它们能够相互通信。
-
设置通信后端:支持多种后端:
-
nccl
:NVIDIA GPU 最佳选择,高性能。 -
gloo
:CPU 或 GPU(跨平台支持)。 -
mpi
:需 MPI 环境支持。 -
后端决定了通信效率和硬件兼容性。
-
-
-
定义进程拓扑
- 指定全局进程数(
world_size
):设置参与训练的总进程数(如 GPU 总数)。 - 分配唯一标识(
rank
):为当前进程分配全局唯一 ID(0
到world_size-1
),rank=0
通常为主节点。
- 指定全局进程数(
-
配置通信方式(
init_method
) -
环境变量初始化(
env://
):通过环境变量MASTER_ADDR
和MASTER_PORT
指定主节点地址和端口(最常用)。 -
TCP 初始化:直接指定主节点 IP 和端口(
tcp://10.1.1.20:23456
)。 -
共享文件初始化:通过共享文件系统同步(
file:///path/to/shared_file
)。 -
协调进程启动
- 同步所有进程:确保所有进程完成初始化后再继续执行(避免死锁)。
- 建立通信链路:例如,使用
nccl
时为 GPU 建立高速通信通道。
-
准备核心通信操作
dist.all_reduce()
:全局求和等规约操作。dist.broadcast()
:主节点向其他节点广播数据。dist.barrier()
:进程同步等待。
-
-
-
创建ddp模型
1
self.model = DDP(model, device_ids=[gpu_id])
-
创建数据分发器
-
DistributedSampler chunks the input data across all distributed processes.
-
The DataLoader combines a dataset and a sampler, and provides an iterable over the given dataset.
-
Each process will receive an input batch of 32 samples; the effective batch size is
32 * nprocs
, or 128 when using 4 GPUs.1
2
3
4
5
6train_data = torch.utils.data.DataLoader(
dataset=train_dataset,
batch_size=32,
shuffle=False, # We don't shuffle
sampler=DistributedSampler(train_dataset), # Use the Distributed Sampler here.
) -
Calling the
set_epoch()
method on theDistributedSampler
at the beginning of each epoch is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will be used in each epoch.1
2
3
4
5
6def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
self.train_data.sampler.set_epoch(epoch) # call this additional line at every epoch
for source, targets in self.train_data:
...
self._run_batch(source, targets)
-
-
保存模型
We only need to save model checkpoints from one process. Without this condition, each process would save its copy of the identical mode.
1
2
3
4
5
6
7- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
self._save_checkpoint(epoch) -
运行分布式训练
-
Include new arguments
rank
(replacingdevice
) andworld_size
.rank
is auto-allocated by DDP when calling mp.spawn.world_size
is the number of processes across the training job. For GPU training, this corresponds to the number of GPUs in use, and each process works on a dedicated GPU.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18- def main(device, total_epochs, save_every):
+ def main(rank, world_size, total_epochs, save_every):
+ ddp_setup(rank, world_size)
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size=32)
- trainer = Trainer(model, train_data, optimizer, device, save_every)
+ trainer = Trainer(model, train_data, optimizer, rank, save_every)
trainer.train(total_epochs)
+ destroy_process_group()
if __name__ == "__main__":
import sys
total_epochs = int(sys.argv[1])
save_every = int(sys.argv[2])
- device = 0 # shorthand for cuda:0
- main(device, total_epochs, save_every)
+ world_size = torch.cuda.device_count()
+ mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size)
-
模型并行
运行步骤:
- 所谓的模型并行,就是每个设备上的数据是完整的、一致的,而模型 w 被切分到了各个设备上,每个设备只拥有模型的一部分,所有计算设备上的模型拼在一起,才是完整的模型。
- 模型并行的好处是,省去了多个设备之间的梯度 AllReduce;但是,由于每个设备都需要完整的数据输入,因此,数据会在多个设备之间进行广播,产生通信代价。
适用场景:
- 神经网络巨大,梯度同步代价大,甚至网络参数无法放倒单一设备上。
- 通信占比高,适合在机器内做模型并行。
流水并行
运行步骤:
-
流水并行指将网络切为多个阶段,并分发到不同的计算设备上,各个计算设备之间以“接力”的方式完成训练。
-
4层网络被切分到2个计算设备上,其中
GPU0
上进行T1
与T2
的运算,GPU1
上进行T3
与T4
的计算。
适用场景:
- 流水线并行,训练设备容易出现空闲状态,加速效率没有数据并行高;但能减少通信边界支持更多的层数,适合在机器间使用。
混合并行
- 它首先被分为 64 个阶段,进行流水并行。每个阶段都运行在 6 台 DGX-A100 主机上;
- 在6台主机之间,进行的是数据并行训练;
- 每台主机有 8 张 GPU 显卡,同一台机器上的8张 GPU 显卡之间是进行模型并行训练。