PyTorch multi-GPU training is essentially data parallelism: every GPU holds a full copy of the model parameters, a batch of data is split evenly into N parts, each GPU processes one part, the gradients from all GPUs are then aggregated into the gradient of the whole batch, and this aggregated gradient is used to update the parameters on every GPU, which completes one iteration.
There are two ways to train on multiple GPUs. The first uses nn.DataParallel; it was the earliest one introduced into PyTorch, is simple and convenient to use, and does not involve multiple processes. The second combines torch.nn.parallel.DistributedDataParallel with torch.utils.data.distributed.DistributedSampler and is built on multiple processes; it is more efficient but a bit harder to set up, and it also supports multi-node distributed training. The second approach is more efficient than the first even on a single compute node; quoting the PyTorch docs:
In the single-machine synchronous case, torch.distributed or the torch.nn.parallel.DistributedDataParallel() wrapper may still have advantages over other approaches to data parallelism, including torch.nn.DataParallel():
This post describes both approaches in detail, but only on a single machine; multi-node distributed training is more involved and will be covered in the next post.
nn.DataParallel
Wrap the model with nn.DataParallel:

model = nn.DataParallel(model)
os.environ["CUDA_VISIBLE_DEVICES"]="0"
This specifies which GPUs the current program is allowed to use; if it is not set, all GPUs on the machine will be used. For example:

os.environ["CUDA_VISIBLE_DEVICES"]="0,1,2" # use 3 GPUs
Training then works exactly as with a single GPU. With this approach PyTorch automatically splits each batch into N parts (N being the number of GPUs specified via os.environ), runs forward and backward on each part separately, then automatically gathers the gradients from all GPUs, updates the parameters on one GPU, and finally broadcasts the updated parameters to the other GPUs, completing one iteration.
Code:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
import time

# dataset
class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

# model define
class Model(nn.Module):
    # Our model
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("\tIn Model: input size", input.size(),
              "output size", output.size())
        return output

if __name__=="__main__":
    # Parameters
    input_size = 5
    output_size = 2
    batch_size = 30
    data_size = 100

    dataset = RandomDataset(input_size, data_size)
    # dataloader define
    rand_loader = DataLoader(dataset=dataset,
                             batch_size=batch_size, shuffle=True)

    # model init
    model = Model(input_size, output_size)

    # cuda devices
    os.environ["CUDA_VISIBLE_DEVICES"]="0,1"
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if torch.cuda.device_count() > 1:
        print("Let's use", torch.cuda.device_count(), "GPUs!")
        # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
        model = nn.DataParallel(model)
    model.to(device)

    for data in rand_loader:
        input = data.to(device)
        output = model(input)
        # loss
        # backward
        # update
        time.sleep(1)  # simulate a longer batch time
        print("Outside: input size", input.size(),
              "output_size", output.size())

    # model is wrapped by DataParallel, so the real module is model.module
    torch.save(model.module.state_dict(), "model.pth")
Output with a single visible GPU (the whole batch goes to the model):

In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])
Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])
Output with two GPUs (each GPU processes half of the batch):

In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2])
In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2])
Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])
torch.nn.parallel.DistributedDataParallel

Approach two is implemented with multiple processes. In fact, "distributed" simply means multiple processes, spread across one or more machines and coordinating with each other over the network. Multi-machine distributed training will be covered in detail in the next post; here I only describe how approach two differs from approach one on a single machine. First, every process runs its own training loop independently: after each iteration the gradients are shared and aggregated, and each process updates its parameters on its own. Parameters themselves are not exchanged during training (they are synchronized across all processes only at initialization). Second, inter-process communication uses NCCL, which PyTorch supports out of the box, so normally you do not need to worry about it. The details of distributed training are left to the next post; here only the simplest implementation is given.
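To make the "share and aggregate gradients" step concrete, here is a minimal hand-written sketch of what DistributedDataParallel effectively does for you after backward(): it all-reduces every parameter's gradient and averages it over the world size, so every process ends up updating with the same gradient. The helper name average_gradients is mine; with DDP you never call anything like this yourself.

import torch.distributed as dist

def average_gradients(model):
    # Sum each parameter's gradient across all processes, then divide by
    # the number of processes, so every rank sees the same averaged gradient.
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
            param.grad.data /= world_size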
It relies on two components:

torch.utils.data.distributed.DistributedSampler — see the test code below for how it is used.

torch.nn.parallel.DistributedDataParallel — see the test code below for how it is used.

The code is similar to approach one, except that a process group has to be initialized to declare that this program is doing distributed training. The processes are created by launching the script with python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1: nnodes is 1 because we only use one compute node, and nproc_per_node=2 means two training processes are created. Each process is assigned a rank that uniquely identifies it; rank 0 is the master and the rest are workers. You will normally want two GPUs for this; in the test program each process picks its GPU by rank, i.e. the rank 0 process uses GPU 0 and the rank 1 process uses GPU 1. A distributed sampler has to be built from the dataset and passed to the DataLoader when it is constructed; see the code for how the model is wrapped for distributed training.
Code:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
import torch.distributed as dist
import torch.utils.data.distributed
import sys
import time

# dataset
class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

# model define
class Model(nn.Module):
    # Our model
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        # print("\tIn Model: input size", input.size(),
        #       "output size", output.size())
        return output

if __name__=="__main__":
    # Parameters
    input_size = 5
    output_size = 2
    batch_size = 30
    data_size = 100

    # check the nccl backend
    if not dist.is_nccl_available():
        print("Error: nccl backend not available.")
        sys.exit(1)

    # init group
    dist.init_process_group(backend="nccl", init_method="env://")

    # get the process rank and the world size
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # prepare the dataset
    dataset = RandomDataset(input_size, data_size)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
    rand_loader = DataLoader(dataset,
                             batch_size=batch_size//world_size,
                             shuffle=(train_sampler is None),
                             sampler=train_sampler)
    # dataloader define
    # rand_loader = DataLoader(dataset=dataset,
    #                          batch_size=batch_size, shuffle=True)

    # model init
    model = Model(input_size, output_size)

    # cuda devices
    # os.environ["CUDA_VISIBLE_DEVICES"]="0"
    # device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # if torch.cuda.device_count() > 1:
    #     print("Let's use", torch.cuda.device_count(), "GPUs!")
    #     # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
    #     model = nn.DataParallel(model)
    # model.to(device)

    # distributed model define: each process uses the GPU matching its rank
    device = torch.device('cuda', rank)
    model = model.to(device)
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[rank],
                                                      output_device=rank)

    print("From rank %d: start training, time:%s"%(rank, time.strftime("%Y-%m-%d %H:%M:%S")))
    for data in rand_loader:
        input = data.to(device)
        output = model(input)
        # loss
        # backward
        # update
        time.sleep(1)  # simulate a longer batch time
        print("From rank %d: Outside: input size %s, output size %s"%(rank, str(input.size()), str(output.size())), flush=True)

    torch.save(model.module.state_dict(), "model_%d.pth"%rank)
    print("From rank %d: end training, time: %s"%(rank, time.strftime("%Y-%m-%d %H:%M:%S")))
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1 simple_test.py
From rank 0: start training, time:2019-09-26 13:20:13
From rank 1: start training, time:2019-09-26 13:20:13
From rank 0: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])
From rank 1: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])
From rank 0: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])
From rank 1: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])
From rank 1: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])From rank 0: Outside: input size torch.Size([15, 5]), output size torch.Size([15, 2])
From rank 0: Outside: input size torch.Size([5, 5]), output size torch.Size([5, 2])
From rank 0: end training, time: 2019-09-26 13:20:17
From rank 1: Outside: input size torch.Size([5, 5]), output size torch.Size([5, 2])
From rank 1: end training, time: 2019-09-26 13:20:17
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
I pasted the test output as-is, so it looks a little messy; that is caused by the processes running in parallel. Looking closely you can see two processes training at the same time, each handling half of a batch. The OMP_NUM_THREADS message at the end is printed by the PyTorch launcher: since I did not specify the number of OpenMP threads, it helpfully set the value to 1 by default to keep the system from being overloaded (see the launcher source for details).
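If you do want more OpenMP threads per process, you can set the variable yourself before launching, for example (the value 4 is only an illustration):

OMP_NUM_THREADS=4 python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1 simple_test.py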
Saving and loading the model differs somewhat from the single-GPU case. Here I always move the parameters to the CPU before saving them, because if you save parameters that live on a GPU, the pth file records which GPU each parameter belongs to and loading will try to put them back onto the same GPUs. If you then load the model on a machine with fewer GPUs, it fails with an error like this:
RuntimeError: Attempting to deserialize object on CUDA device 1 but torch.cuda.device_count() is 1. Please use torch.load with map_location to map your storages to an existing device.
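As the error message itself suggests, such a checkpoint can still be loaded by remapping its storages with map_location; a minimal sketch (model.pth is just a placeholder file name):

import torch

# Remap all storages in the checkpoint to the CPU, no matter which GPU
# they were saved from; a dict like {"cuda:1": "cuda:0"} works as well.
state_dict = torch.load("model.pth", map_location="cpu")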
Saving the model works the same way in both approaches, but keep in mind that in approach two several processes are running at once, so each of them will write a model to storage. If they share a filesystem you have to be careful about file names; in practice it is enough to save the parameters only in the rank 0 process, since the model parameters of all processes are kept in sync.
torch.save(model.module.cpu().state_dict(), "model.pth")
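In approach two that typically means guarding the save with a rank check, mirroring the save line above; a minimal sketch, assuming the process group from the test code is already initialized and model is the DDP-wrapped model:

import torch
import torch.distributed as dist

# Only the master process writes the checkpoint; every rank holds the
# same synchronized parameters, so one copy is enough.
if dist.get_rank() == 0:
    torch.save(model.module.cpu().state_dict(), "model.pth")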
Loading the model:
param=torch.load("model.pth")
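torch.load only returns the state dict, so to round out the loading step, here is a minimal sketch of restoring the parameters into a fresh model and moving it to a device (Model, input_size and output_size refer to the test code above, and map_location guards against the GPU-count problem mentioned earlier):

import torch

# Rebuild the model on the CPU and load the saved parameters into it.
model = Model(input_size, output_size)
param = torch.load("model.pth", map_location="cpu")
model.load_state_dict(param)

# Move it to a GPU (and re-wrap it with DataParallel / DistributedDataParallel
# if you want to keep training on multiple GPUs).
model = model.to(torch.device("cuda", 0))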
That's it for today; it has been a while since I wrote a post this carefully. There are of course still things that could be improved, for example verifying that the model parameters really are synchronized. If you have any questions, or think something here is wrong, please leave a comment. Thanks ^=^.