分布式并行训练

为了解决算力增速不足的问题,人们考虑用多节点集群进行分布式训练,以提升算力,分布式训练势在必行。

并行训练的评价指标

FLOPs/GPU 利用率

关注计算性能是否充分发挥:

  • FLOPs(浮点运算次数):计算模型训练过程中执行的浮点运算次数,通常以 TFLOPs(TeraFLOPs)或 PFLOPs(PetaFLOPs)衡量。
  • FLOPs 利用率(FLOPs Utilization):实际执行的 FLOPs 与硬件理论峰值 FLOPs 的比值,衡量计算资源利用率。
  • GPU 利用率(GPU Utilization):统计 GPU 计算核心的使用情况,反映 GPU 是否被充分利用。

吞吐量与并行效率

关注扩展性是否合理:

  • 吞吐量(Throughput):每秒处理的样本数(samples/sec)或 tokens/sec,通常用于衡量训练速度。
  • 加速比(Speedup):$ S(N) = \frac{T(1)}{T(N)} $其中$ T(1)$ 是单 GPU 训练时间,$T(N)$ 是 N 个 GPU 训练时间。
  • 并行效率(Scaling Efficiency):$ E(N) = \frac{S(N)}{N} = \frac{T(1)}{N \cdot T(N)} $.衡量并行扩展性,理想情况下应接近 100%。

Megatron-LM的实验中:

We sustain 15.1 PetaFLOPs with 512 V100 GPUS across the entire application with 76% scaling efficiency when compared to a strong single GPU baseline that sustains 39 TeraFLOPs, which is 30% of peak FLOPs.

计算过程:

1 PetaFLOPs = 1000 TeraFLOPs

39 * 512 =19,968 TeraFLOPS

15.1 / 20.0 = 75.5 %

并行方法

数据并行

数据并行

运行原理:

  • 所谓的数据并行,就是将数据 x 进行切分,而每个设备上的模型 w 是完整的、一致的。
  • 数据并行策略下,在反向传播过程中,需要对各个设备上的梯度进行 AllReduce,以确保各个设备上的模型始终保持一致。

适用场景:

  • 当数据集较大,模型较小时

注意:If your model contains any BatchNorm layers, it needs to be converted to SyncBatchNorm to sync the running stats of BatchNorm layers across replicas.

训练步骤:

  1. 创建进程组:

    • 给每个进程设置cuda id

    • Init_process_progress

      • 初始化分布式通信环境

        • 建立进程组(Process Group):为多个进程(如不同 GPU/机器上的训练进程)创建通信组,使它们能够相互通信。

        • 设置通信后端:支持多种后端:

          • nccl:NVIDIA GPU 最佳选择,高性能。

          • gloo:CPU 或 GPU(跨平台支持)。

          • mpi:需 MPI 环境支持。

          • 后端决定了通信效率和硬件兼容性。

      • 定义进程拓扑

        • 指定全局进程数(world_size:设置参与训练的总进程数(如 GPU 总数)。
        • 分配唯一标识(rank:为当前进程分配全局唯一 ID(0world_size-1),rank=0通常为主节点。
      • 配置通信方式(init_method

      • 环境变量初始化(env://:通过环境变量 MASTER_ADDRMASTER_PORT指定主节点地址和端口(最常用)。

      • TCP 初始化:直接指定主节点 IP 和端口(tcp://10.1.1.20:23456)。

      • 共享文件初始化:通过共享文件系统同步(file:///path/to/shared_file)。

      • 协调进程启动

        • 同步所有进程:确保所有进程完成初始化后再继续执行(避免死锁)。
        • 建立通信链路:例如,使用 nccl时为 GPU 建立高速通信通道。
      • 准备核心通信操作

        • dist.all_reduce():全局求和等规约操作。
        • dist.broadcast():主节点向其他节点广播数据。
        • dist.barrier():进程同步等待。
  2. 创建ddp模型

    1
    self.model = DDP(model, device_ids=[gpu_id])
  3. 创建数据分发器

    • DistributedSampler chunks the input data across all distributed processes.

    • The DataLoader combines a dataset and a sampler, and provides an iterable over the given dataset.

    • Each process will receive an input batch of 32 samples; the effective batch size is 32 * nprocs, or 128 when using 4 GPUs.

      1
      2
      3
      4
      5
      6
      train_data = torch.utils.data.DataLoader(
      dataset=train_dataset,
      batch_size=32,
      shuffle=False, # We don't shuffle
      sampler=DistributedSampler(train_dataset), # Use the Distributed Sampler here.
      )
    • Calling the set_epoch() method on the DistributedSampler at the beginning of each epoch is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will be used in each epoch.

      1
      2
      3
      4
      5
      6
      def _run_epoch(self, epoch):
      b_sz = len(next(iter(self.train_data))[0])
      self.train_data.sampler.set_epoch(epoch) # call this additional line at every epoch
      for source, targets in self.train_data:
      ...
      self._run_batch(source, targets)
  4. 保存模型

    We only need to save model checkpoints from one process. Without this condition, each process would save its copy of the identical mode.

    1
    2
    3
    4
    5
    6
    7
    - ckp = self.model.state_dict()
    + ckp = self.model.module.state_dict()
    ...
    ...
    - if epoch % self.save_every == 0:
    + if self.gpu_id == 0 and epoch % self.save_every == 0:
    self._save_checkpoint(epoch)
  5. 运行分布式训练

    • Include new arguments rank (replacing device) and world_size.

      • rank is auto-allocated by DDP when calling mp.spawn.
      • world_size is the number of processes across the training job. For GPU training, this corresponds to the number of GPUs in use, and each process works on a dedicated GPU.
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      - def main(device, total_epochs, save_every):
      + def main(rank, world_size, total_epochs, save_every):
      + ddp_setup(rank, world_size)
      dataset, model, optimizer = load_train_objs()
      train_data = prepare_dataloader(dataset, batch_size=32)
      - trainer = Trainer(model, train_data, optimizer, device, save_every)
      + trainer = Trainer(model, train_data, optimizer, rank, save_every)
      trainer.train(total_epochs)
      + destroy_process_group()

      if __name__ == "__main__":
      import sys
      total_epochs = int(sys.argv[1])
      save_every = int(sys.argv[2])
      - device = 0 # shorthand for cuda:0
      - main(device, total_epochs, save_every)
      + world_size = torch.cuda.device_count()
      + mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size)

模型并行

模型并行

运行步骤:

  • 所谓的模型并行,就是每个设备上的数据是完整的、一致的,而模型 w 被切分到了各个设备上,每个设备只拥有模型的一部分,所有计算设备上的模型拼在一起,才是完整的模型。
  • 模型并行的好处是,省去了多个设备之间的梯度 AllReduce;但是,由于每个设备都需要完整的数据输入,因此,数据会在多个设备之间进行广播,产生通信代价。

适用场景:

  • 神经网络巨大,梯度同步代价大,甚至网络参数无法放倒单一设备上。
  • 通信占比高,适合在机器内做模型并行。

流水并行

流水并行

运行步骤:

  • 流水并行指将网络切为多个阶段,并分发到不同的计算设备上,各个计算设备之间以“接力”的方式完成训练。

  • 4层网络被切分到2个计算设备上,其中 GPU0 上进行 T1T2 的运算,GPU1 上进行 T3T4 的计算。

适用场景:

  • 流水线并行,训练设备容易出现空闲状态,加速效率没有数据并行高;但能减少通信边界支持更多的层数,适合在机器间使用。

混合并行

GPT3 训练架构

  1. 它首先被分为 64 个阶段,进行流水并行。每个阶段都运行在 6 台 DGX-A100 主机上;
  2. 在6台主机之间,进行的是数据并行训练;
  3. 每台主机有 8 张 GPU 显卡,同一台机器上的8张 GPU 显卡之间是进行模型并行训练。

参考文献

  1. 常见的分布式并行策略