感謝參考原文-http://bjbsair.com/2020-03-27...
神經網絡訓練加速的最簡單方法是使用GPU,對弈神經網絡中常規操做(矩陣乘法和加法)GPU運算速度要倍超於CPU。隨着模型或數據集愈來愈大,一個GPU很快就會變得不足。例如,BERT和GPT-2等大型語言模型是在數百個GPU上訓練的。對於多GPU訓練,須要一種在不一樣GPU之間對模型和數據進行切分和調度的方法。html
PyTorch是很是流行的深度學習框架,它在主流框架中對於靈活性和易用性的平衡最好。Pytorch有兩種方法能夠在多個GPU上切分模型和數據:nn.DataParallel和nn.distributedataparallel。DataParallel更易於使用(只需簡單包裝單GPU模型)。然而,因爲它使用一個進程來計算模型權重,而後在每一個批處理期間將分發到每一個GPU,所以通訊很快成爲一個瓶頸,GPU利用率一般很低。並且,nn.DataParallel要求全部的GPU都在同一個節點上(不支持分佈式),並且不能使用Apex進行混合精度訓練。nn.DataParallel和nn.distributedataparallel的主要差別能夠總結爲如下幾點(譯者注):node
總的來講,Pytorch文檔是至關完備和清晰的,尤爲是在1.0x版本後。可是關於DistributedDataParallel的介紹卻較少,主要的文檔有如下三個:python
這篇教程將經過一個MNISI例子講述如何使用PyTorch的分佈式訓練,這裏將一段段代碼進行解釋,並且也包括任何使用apex進行混合精度訓練。git
DistributedDataParallel經過多進程在多個GPUs間複製模型,每一個GPU都由一個進程控制(固然可讓每一個進程控制多個GPU,但這顯然比每一個進程有一個GPU要慢;也能夠多個進程在一個GPU上運行)。GPU能夠都在同一個節點上,也能夠分佈在多個節點上。每一個進程都執行相同的任務,而且每一個進程都與全部其餘進程通訊。進程或者說GPU之間只傳遞梯度,這樣網絡通訊就再也不是瓶頸。github
在訓練過程當中,每一個進程從磁盤加載batch數據,並將它們傳遞到其GPU。每個GPU都有本身的前向過程,而後梯度在各個GPUs間進行All-Reduce。每一層的梯度不依賴於前一層,因此梯度的All-Reduce和後向過程同時計算,以進一步緩解網絡瓶頸。在後向過程的最後,每一個節點都獲得了平均梯度,這樣模型參數保持同步。後端
這都要求多個進程(可能在多個節點上)同步並通訊。Pytorch經過distributed.init_process_group函數來實現這一點。他須要知道進程0位置以便全部進程均可以同步,以及預期的進程總數。每一個進程都須要知道進程總數及其在進程中的順序,以及使用哪一個GPU。一般將進程總數稱爲world_size.Pytorch提供了nn.utils.data.DistributedSampler來爲各個進程切分數據,以保證訓練數據不重疊。網絡
這裏經過一個MNIST實例來說解,咱們先將其改爲分佈式訓練,而後增長混合精度訓練。多線程
首先,導入所須要的庫:框架
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而後咱們定義一個簡單的CNN模型處理MNIST數據:tcp
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函數main()接受參數,執行訓練:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中訓練部分主函數爲:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
經過啓動主函數來開始訓練:
if __name__ == '__main__': main()
你可能注意到有些參數是多餘的,可是對後面的分佈式訓練是有用的。咱們經過執行如下語句就能夠在單機單卡上訓練:
python src/mnist.py -n 1 -g 1 -nr 0
使用多進程進行分佈式訓練,咱們須要爲每一個GPU啓動一個進程。每一個進程須要知道本身運行在哪一個GPU上,以及自身在全部進程中的序號。對於多節點,咱們須要在每一個節點啓動腳本。
首先,咱們要配置基本的參數:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是節點總數,而args.gpus是每一個節點的GPU總數(每一個節點GPU數是同樣的),而args.nr 是當前節點在全部節點的序號。節點總數乘以每一個節點的GPU數能夠獲得world_size,也即進程總數。全部的進程須要知道進程0的IP地址以及端口,這樣全部進程能夠在開始時同步,通常狀況下稱進程0是master進程,好比咱們會在進程0中打印信息或者保存模型。PyTorch提供了mp.spawn來在一個節點啓動該節點全部進程,每一個進程運行train(i, args),其中i從0到args.gpus - 1。
一樣,咱們要修改訓練函數:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
這裏咱們首先計算出當前進程序號:rank = args.nr * args.gpus + gpu,而後就是經過dist.init_process_group初始化分佈式環境,其中backend參數指定通訊後端,包括mpi, gloo, nccl,這裏選擇nccl,這是Nvidia提供的官方多卡通訊框架,相對比較高效。mpi也是高性能計算經常使用的通訊協議,不過你須要本身安裝MPI實現框架,好比OpenMPI。gloo卻是內置通訊後端,可是不夠高效。init_method指的是如何初始化,以完成剛開始的進程同步;這裏咱們設置的是env://,指的是環境變量初始化方式,須要在環境變量中配置4個參數:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面兩個參數咱們已經配置,後面兩個參數也能夠經過dist.init_process_group函數中world_size和rank參數配置。其它的初始化方式還包括共享文件系統以及TCP,好比init_method='tcp://10.1.1.20:23456',其實也是要提供master的IP地址和端口。注意這個調用是阻塞的,必須等待全部進程來同步,若是任何一個進程出錯,就會失敗。
對於模型側,咱們只須要用DistributedDataParallel包裝一下原來的model便可,在背後它會支持梯度的All-Reduce操做。對於數據側,咱們nn.utils.data.DistributedSampler來給各個進程切分數據,只須要在dataloader中使用這個sampler就好,值得注意的一點是你要訓練循環過程的每一個epoch開始時調用train_sampler.set_epoch(epoch),(主要是爲了保證每一個epoch的劃分是不一樣的)其它的訓練代碼都保持不變。
最後就能夠執行代碼了,好比咱們是4節點,每一個節點是8卡,那麼須要在4個節點分別執行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此時的有效batch_size實際上是batch_size_per_gpu * world_size,對於有BN的模型還能夠採用同步BN獲取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述講述的是分佈式訓練過程,其實一樣適用於評估或者測試過程,好比咱們把數據劃分到不一樣的進程中進行預測,這樣能夠加速預測過程。實現代碼和上述過程徹底同樣,不過咱們想計算某個指標,那就須要從各個進程的統計結果進行All-Reduce,由於每一個進程僅是計算的部分數據的內容。好比咱們要計算分類準確度,咱們能夠統計每一個進程的數據總數total和分類正確的數量count,而後進行聚合。這裏要提的一點,當用dist.init_process_group初始化分佈式環境時,其實就是創建一個默認的分佈式進程組(distributed process group),這個group同時會初始化Pytorch的torch.distributed包。這樣咱們能夠直接用torch.distributed的API就能夠進行分佈式基本操做了,下面是具體實現:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度訓練(混合FP32和FP16訓練)能夠適用更大的batch_size,並且能夠利用NVIDIA Tensor Cores加速計算。採用NVIDIA的apex進行混合精度訓練很是簡單,只須要修改部分代碼:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其實就兩處變化,首先是採用amp.initialize來包裝model和optimizer以支持混合精度訓練,其中opt_level指的是優化級別,若是爲O0或者O3不是真正的混合精度,可是能夠用來肯定模型效果和速度的baseline,而O1和O2是混合精度的兩種設置,能夠選擇某個進行混合精度訓練。另一處是在進行根據梯度更新參數前,要先經過amp.scale_loss對梯度進行scale以防止梯度下溢(underflowing)。此外,你還能夠用apex.parallel.DistributedDataParallel替換nn.DistributedDataParallel。
我以爲PyTorch官方的分佈式實現已經比較完善,並且性能和效果都不錯,能夠替代的方案是horovod,不只支持PyTorch還支持TensorFlow和MXNet框架,實現起來也是比較容易的,速度方面應該不相上下。
神經網絡訓練加速的最簡單方法是使用GPU,對弈神經網絡中常規操做(矩陣乘法和加法)GPU運算速度要倍超於CPU。隨着模型或數據集愈來愈大,一個GPU很快就會變得不足。例如,BERT和GPT-2等大型語言模型是在數百個GPU上訓練的。對於多GPU訓練,須要一種在不一樣GPU之間對模型和數據進行切分和調度的方法。
PyTorch是很是流行的深度學習框架,它在主流框架中對於靈活性和易用性的平衡最好。Pytorch有兩種方法能夠在多個GPU上切分模型和數據:nn.DataParallel和nn.distributedataparallel。DataParallel更易於使用(只需簡單包裝單GPU模型)。然而,因爲它使用一個進程來計算模型權重,而後在每一個批處理期間將分發到每一個GPU,所以通訊很快成爲一個瓶頸,GPU利用率一般很低。並且,nn.DataParallel要求全部的GPU都在同一個節點上(不支持分佈式),並且不能使用Apex進行混合精度訓練。nn.DataParallel和nn.distributedataparallel的主要差別能夠總結爲如下幾點(譯者注):
總的來講,Pytorch文檔是至關完備和清晰的,尤爲是在1.0x版本後。可是關於DistributedDataParallel的介紹卻較少,主要的文檔有如下三個:
這篇教程將經過一個MNISI例子講述如何使用PyTorch的分佈式訓練,這裏將一段段代碼進行解釋,並且也包括任何使用apex進行混合精度訓練。
DistributedDataParallel經過多進程在多個GPUs間複製模型,每一個GPU都由一個進程控制(固然可讓每一個進程控制多個GPU,但這顯然比每一個進程有一個GPU要慢;也能夠多個進程在一個GPU上運行)。GPU能夠都在同一個節點上,也能夠分佈在多個節點上。每一個進程都執行相同的任務,而且每一個進程都與全部其餘進程通訊。進程或者說GPU之間只傳遞梯度,這樣網絡通訊就再也不是瓶頸。
在訓練過程當中,每一個進程從磁盤加載batch數據,並將它們傳遞到其GPU。每個GPU都有本身的前向過程,而後梯度在各個GPUs間進行All-Reduce。每一層的梯度不依賴於前一層,因此梯度的All-Reduce和後向過程同時計算,以進一步緩解網絡瓶頸。在後向過程的最後,每一個節點都獲得了平均梯度,這樣模型參數保持同步。
這都要求多個進程(可能在多個節點上)同步並通訊。Pytorch經過distributed.init_process_group函數來實現這一點。他須要知道進程0位置以便全部進程均可以同步,以及預期的進程總數。每一個進程都須要知道進程總數及其在進程中的順序,以及使用哪一個GPU。一般將進程總數稱爲world_size.Pytorch提供了nn.utils.data.DistributedSampler來爲各個進程切分數據,以保證訓練數據不重疊。
這裏經過一個MNIST實例來說解,咱們先將其改爲分佈式訓練,而後增長混合精度訓練。
首先,導入所須要的庫:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而後咱們定義一個簡單的CNN模型處理MNIST數據:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函數main()接受參數,執行訓練:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中訓練部分主函數爲:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
經過啓動主函數來開始訓練:
if __name__ == '__main__': main()
你可能注意到有些參數是多餘的,可是對後面的分佈式訓練是有用的。咱們經過執行如下語句就能夠在單機單卡上訓練:
python src/mnist.py -n 1 -g 1 -nr 0
使用多進程進行分佈式訓練,咱們須要爲每一個GPU啓動一個進程。每一個進程須要知道本身運行在哪一個GPU上,以及自身在全部進程中的序號。對於多節點,咱們須要在每一個節點啓動腳本。
首先,咱們要配置基本的參數:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是節點總數,而args.gpus是每一個節點的GPU總數(每一個節點GPU數是同樣的),而args.nr 是當前節點在全部節點的序號。節點總數乘以每一個節點的GPU數能夠獲得world_size,也即進程總數。全部的進程須要知道進程0的IP地址以及端口,這樣全部進程能夠在開始時同步,通常狀況下稱進程0是master進程,好比咱們會在進程0中打印信息或者保存模型。PyTorch提供了mp.spawn來在一個節點啓動該節點全部進程,每一個進程運行train(i, args),其中i從0到args.gpus - 1。
一樣,咱們要修改訓練函數:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
這裏咱們首先計算出當前進程序號:rank = args.nr * args.gpus + gpu,而後就是經過dist.init_process_group初始化分佈式環境,其中backend參數指定通訊後端,包括mpi, gloo, nccl,這裏選擇nccl,這是Nvidia提供的官方多卡通訊框架,相對比較高效。mpi也是高性能計算經常使用的通訊協議,不過你須要本身安裝MPI實現框架,好比OpenMPI。gloo卻是內置通訊後端,可是不夠高效。init_method指的是如何初始化,以完成剛開始的進程同步;這裏咱們設置的是env://,指的是環境變量初始化方式,須要在環境變量中配置4個參數:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面兩個參數咱們已經配置,後面兩個參數也能夠經過dist.init_process_group函數中world_size和rank參數配置。其它的初始化方式還包括共享文件系統以及TCP,好比init_method='tcp://10.1.1.20:23456',其實也是要提供master的IP地址和端口。注意這個調用是阻塞的,必須等待全部進程來同步,若是任何一個進程出錯,就會失敗。
對於模型側,咱們只須要用DistributedDataParallel包裝一下原來的model便可,在背後它會支持梯度的All-Reduce操做。對於數據側,咱們nn.utils.data.DistributedSampler來給各個進程切分數據,只須要在dataloader中使用這個sampler就好,值得注意的一點是你要訓練循環過程的每一個epoch開始時調用train_sampler.set_epoch(epoch),(主要是爲了保證每一個epoch的劃分是不一樣的)其它的訓練代碼都保持不變。
最後就能夠執行代碼了,好比咱們是4節點,每一個節點是8卡,那麼須要在4個節點分別執行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此時的有效batch_size實際上是batch_size_per_gpu * world_size,對於有BN的模型還能夠採用同步BN獲取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述講述的是分佈式訓練過程,其實一樣適用於評估或者測試過程,好比咱們把數據劃分到不一樣的進程中進行預測,這樣能夠加速預測過程。實現代碼和上述過程徹底同樣,不過咱們想計算某個指標,那就須要從各個進程的統計結果進行All-Reduce,由於每一個進程僅是計算的部分數據的內容。好比咱們要計算分類準確度,咱們能夠統計每一個進程的數據總數total和分類正確的數量count,而後進行聚合。這裏要提的一點,當用dist.init_process_group初始化分佈式環境時,其實就是創建一個默認的分佈式進程組(distributed process group),這個group同時會初始化Pytorch的torch.distributed包。這樣咱們能夠直接用torch.distributed的API就能夠進行分佈式基本操做了,下面是具體實現:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度訓練(混合FP32和FP16訓練)能夠適用更大的batch_size,並且能夠利用NVIDIA Tensor Cores加速計算。採用NVIDIA的apex進行混合精度訓練很是簡單,只須要修改部分代碼:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其實就兩處變化,首先是採用amp.initialize來包裝model和optimizer以支持混合精度訓練,其中opt_level指的是優化級別,若是爲O0或者O3不是真正的混合精度,可是能夠用來肯定模型效果和速度的baseline,而O1和O2是混合精度的兩種設置,能夠選擇某個進行混合精度訓練。另一處是在進行根據梯度更新參數前,要先經過amp.scale_loss對梯度進行scale以防止梯度下溢(underflowing)。此外,你還能夠用apex.parallel.DistributedDataParallel替換nn.DistributedDataParallel。
我以爲PyTorch官方的分佈式實現已經比較完善,並且性能和效果都不錯,能夠替代的方案是horovod,不只支持PyTorch還支持TensorFlow和MXNet框架,實現起來也是比較容易的,速度方面應該不相上下。
神經網絡訓練加速的最簡單方法是使用GPU,對弈神經網絡中常規操做(矩陣乘法和加法)GPU運算速度要倍超於CPU。隨着模型或數據集愈來愈大,一個GPU很快就會變得不足。例如,BERT和GPT-2等大型語言模型是在數百個GPU上訓練的。對於多GPU訓練,須要一種在不一樣GPU之間對模型和數據進行切分和調度的方法。
PyTorch是很是流行的深度學習框架,它在主流框架中對於靈活性和易用性的平衡最好。Pytorch有兩種方法能夠在多個GPU上切分模型和數據:nn.DataParallel和nn.distributedataparallel。DataParallel更易於使用(只需簡單包裝單GPU模型)。然而,因爲它使用一個進程來計算模型權重,而後在每一個批處理期間將分發到每一個GPU,所以通訊很快成爲一個瓶頸,GPU利用率一般很低。並且,nn.DataParallel要求全部的GPU都在同一個節點上(不支持分佈式),並且不能使用Apex進行混合精度訓練。nn.DataParallel和nn.distributedataparallel的主要差別能夠總結爲如下幾點(譯者注):
總的來講,Pytorch文檔是至關完備和清晰的,尤爲是在1.0x版本後。可是關於DistributedDataParallel的介紹卻較少,主要的文檔有如下三個:
這篇教程將經過一個MNISI例子講述如何使用PyTorch的分佈式訓練,這裏將一段段代碼進行解釋,並且也包括任何使用apex進行混合精度訓練。
DistributedDataParallel經過多進程在多個GPUs間複製模型,每一個GPU都由一個進程控制(固然可讓每一個進程控制多個GPU,但這顯然比每一個進程有一個GPU要慢;也能夠多個進程在一個GPU上運行)。GPU能夠都在同一個節點上,也能夠分佈在多個節點上。每一個進程都執行相同的任務,而且每一個進程都與全部其餘進程通訊。進程或者說GPU之間只傳遞梯度,這樣網絡通訊就再也不是瓶頸。
在訓練過程當中,每一個進程從磁盤加載batch數據,並將它們傳遞到其GPU。每個GPU都有本身的前向過程,而後梯度在各個GPUs間進行All-Reduce。每一層的梯度不依賴於前一層,因此梯度的All-Reduce和後向過程同時計算,以進一步緩解網絡瓶頸。在後向過程的最後,每一個節點都獲得了平均梯度,這樣模型參數保持同步。
這都要求多個進程(可能在多個節點上)同步並通訊。Pytorch經過distributed.init_process_group函數來實現這一點。他須要知道進程0位置以便全部進程均可以同步,以及預期的進程總數。每一個進程都須要知道進程總數及其在進程中的順序,以及使用哪一個GPU。一般將進程總數稱爲world_size.Pytorch提供了nn.utils.data.DistributedSampler來爲各個進程切分數據,以保證訓練數據不重疊。
這裏經過一個MNIST實例來說解,咱們先將其改爲分佈式訓練,而後增長混合精度訓練。
首先,導入所須要的庫:
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from apex.parallel import DistributedDataParallel as DDP from apex import amp
而後咱們定義一個簡單的CNN模型處理MNIST數據:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc = nn.Linear(7*7*32, num_classes) def forward(self, x): out = self.layer1(x) out = self.layer2(out) out = out.reshape(out.size(0), -1) out = self.fc(out) return out
主函數main()接受參數,執行訓練:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() train(0, args)
其中訓練部分主函數爲:
def train(gpu, args): torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True) start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if (i + 1) % 100 == 0 and gpu == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format( epoch + 1, args.epochs, i + 1, total_step, loss.item()) ) if gpu == 0: print("Training complete in: " + str(datetime.now() - start))
經過啓動主函數來開始訓練:
if __name__ == '__main__': main()
你可能注意到有些參數是多餘的,可是對後面的分佈式訓練是有用的。咱們經過執行如下語句就能夠在單機單卡上訓練:
python src/mnist.py -n 1 -g 1 -nr 0
使用多進程進行分佈式訓練,咱們須要爲每一個GPU啓動一個進程。每一個進程須要知道本身運行在哪一個GPU上,以及自身在全部進程中的序號。對於多節點,咱們須要在每一個節點啓動腳本。
首先,咱們要配置基本的參數:
def main(): parser = argparse.ArgumentParser() parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N') parser.add_argument('-g', '--gpus', default=1, type=int, help='number of gpus per node') parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes') parser.add_argument('--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') args = parser.parse_args() ######################################################### args.world_size = args.gpus * args.nodes # os.environ['MASTER_ADDR'] = '10.57.23.164' # os.environ['MASTER_PORT'] = '8888' # mp.spawn(train, nprocs=args.gpus, args=(args,)) # #########################################################
其中args.nodes是節點總數,而args.gpus是每一個節點的GPU總數(每一個節點GPU數是同樣的),而args.nr 是當前節點在全部節點的序號。節點總數乘以每一個節點的GPU數能夠獲得world_size,也即進程總數。全部的進程須要知道進程0的IP地址以及端口,這樣全部進程能夠在開始時同步,通常狀況下稱進程0是master進程,好比咱們會在進程0中打印信息或者保存模型。PyTorch提供了mp.spawn來在一個節點啓動該節點全部進程,每一個進程運行train(i, args),其中i從0到args.gpus - 1。
一樣,咱們要修改訓練函數:
def train(gpu, args): ############################################################ rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank ) ############################################################ torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) ############################################################### # Wrap the model model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ############################################################### # Data loading code train_dataset = torchvision.datasets.MNIST( root='./data', train=True, transform=transforms.ToTensor(), download=True ) ################################################################ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=args.world_size, rank=rank ) ################################################################ train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, ############################## shuffle=False, # ############################## num_workers=0, pin_memory=True, ############################# sampler=train_sampler) # ############################# ...
這裏咱們首先計算出當前進程序號:rank = args.nr * args.gpus + gpu,而後就是經過dist.init_process_group初始化分佈式環境,其中backend參數指定通訊後端,包括mpi, gloo, nccl,這裏選擇nccl,這是Nvidia提供的官方多卡通訊框架,相對比較高效。mpi也是高性能計算經常使用的通訊協議,不過你須要本身安裝MPI實現框架,好比OpenMPI。gloo卻是內置通訊後端,可是不夠高效。init_method指的是如何初始化,以完成剛開始的進程同步;這裏咱們設置的是env://,指的是環境變量初始化方式,須要在環境變量中配置4個參數:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面兩個參數咱們已經配置,後面兩個參數也能夠經過dist.init_process_group函數中world_size和rank參數配置。其它的初始化方式還包括共享文件系統以及TCP,好比init_method='tcp://10.1.1.20:23456',其實也是要提供master的IP地址和端口。注意這個調用是阻塞的,必須等待全部進程來同步,若是任何一個進程出錯,就會失敗。
對於模型側,咱們只須要用DistributedDataParallel包裝一下原來的model便可,在背後它會支持梯度的All-Reduce操做。對於數據側,咱們nn.utils.data.DistributedSampler來給各個進程切分數據,只須要在dataloader中使用這個sampler就好,值得注意的一點是你要訓練循環過程的每一個epoch開始時調用train_sampler.set_epoch(epoch),(主要是爲了保證每一個epoch的劃分是不一樣的)其它的訓練代碼都保持不變。
最後就能夠執行代碼了,好比咱們是4節點,每一個節點是8卡,那麼須要在4個節點分別執行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此時的有效batch_size實際上是batch_size_per_gpu * world_size,對於有BN的模型還能夠採用同步BN獲取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述講述的是分佈式訓練過程,其實一樣適用於評估或者測試過程,好比咱們把數據劃分到不一樣的進程中進行預測,這樣能夠加速預測過程。實現代碼和上述過程徹底同樣,不過咱們想計算某個指標,那就須要從各個進程的統計結果進行All-Reduce,由於每一個進程僅是計算的部分數據的內容。好比咱們要計算分類準確度,咱們能夠統計每一個進程的數據總數total和分類正確的數量count,而後進行聚合。這裏要提的一點,當用dist.init_process_group初始化分佈式環境時,其實就是創建一個默認的分佈式進程組(distributed process group),這個group同時會初始化Pytorch的torch.distributed包。這樣咱們能夠直接用torch.distributed的API就能夠進行分佈式基本操做了,下面是具體實現:
# define tensor on GPU, count and total is the result at each GPU t = torch.tensor([count, total], dtype=torch.float64, device='cuda') dist.barrier() # synchronizes all processes dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result. t = t.tolist() all_count = int(t[0]) all_total = int(t[1]) acc = all_count / all_total
混合精度訓練(混合FP32和FP16訓練)能夠適用更大的batch_size,並且能夠利用NVIDIA Tensor Cores加速計算。採用NVIDIA的apex進行混合精度訓練很是簡單,只須要修改部分代碼:
rank = args.nr * args.gpus + gpu dist.init_process_group( backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) torch.manual_seed(0) model = ConvNet() torch.cuda.set_device(gpu) model.cuda(gpu) batch_size = 100 # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss().cuda(gpu) optimizer = torch.optim.SGD(model.parameters(), 1e-4) # Wrap the model ############################################################## model, optimizer = amp.initialize(model, optimizer, opt_level='O2') model = DDP(model) ############################################################## # Data loading code ... start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): for i, (images, labels) in enumerate(train_loader): images = images.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) # Forward pass outputs = model(images) loss = criterion(outputs, labels) # Backward and optimize optimizer.zero_grad() ############################################################## with amp.scale_loss(loss, optimizer) as scaled_loss: scaled_loss.backward() ############################################################## optimizer.step() ...
其實就兩處變化,首先是採用amp.initialize來包裝model和optimizer以支持混合精度訓練,其中opt_level指的是優化級別,若是爲O0或者O3不是真正的混合精度,可是能夠用來肯定模型效果和速度的baseline,而O1和O2是混合精度的兩種設置,能夠選擇某個進行混合精度訓練。另一處是在進行根據梯度更新參數前,要先經過amp.scale_loss對梯度進行scale以防止梯度下溢(underflowing)。此外,你還能夠用apex.parallel.DistributedDataParallel替換nn.DistributedDataParallel。
我以爲PyTorch官方的分佈式實現已經比較完善,並且性能和效果都不錯,能夠替代的方案是horovod,不只支持PyTorch還支持TensorFlow和MXNet框架,實現起來也是比較容易的,速度方面應該不相上下。