group
Process group. By default there is only one group: one job forms one group, and that group is also one world.
world size
The total number of processes in the job.
rank
The index of a process, used for inter-process communication. The host with rank = 0 is the master node.
local rank
The GPU index within a process. It is not an explicit argument; it is assigned internally by torch.distributed.launch. For example, rank=3, local_rank=0 refers to the first GPU used by the process with rank 3.
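A minimal sketch of how these terms relate, assuming one process per GPU on a hypothetical setup of 2 nodes with 4 GPUs each (the variable names are only illustrative):

# 2 nodes x 4 GPUs per node, one process per GPU
nodes, gpus_per_node = 2, 4
world_size = nodes * gpus_per_node              # 8 processes in total
for rank in range(world_size):                  # global rank: 0 .. 7
    node = rank // gpus_per_node                # which machine the process runs on
    local_rank = rank % gpus_per_node           # which GPU on that machine
    print(f"rank={rank} -> node {node}, local_rank={local_rank}")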
import argparse

parser = argparse.ArgumentParser(description='PyTorch distributed training')
# torch.distributed.launch passes --local_rank to every process it spawns
parser.add_argument("--local_rank", type=int, default=0)
# note: argparse's type=bool treats any non-empty string as True
parser.add_argument("--dist", type=bool, default=True)
parser.add_argument("--gpu_ids", type=list, default=[0, 1, 2, 3])
args = parser.parse_args()
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def init_dist(backend="nccl", **kwargs):
    """Initialization for distributed training."""
    if mp.get_start_method(allow_none=True) != "spawn":
        # get_start_method returns the name of the start method used for new processes
        mp.set_start_method("spawn", force=True)  # 'spawn' is the default on Windows
    rank = int(os.environ['RANK'])  # process rank, provided via the environment by the launcher
    num_gpus = torch.cuda.device_count()  # number of GPUs available on this node
    torch.cuda.set_device(rank % num_gpus)  # bind this process to one GPU
    dist.init_process_group(backend=backend, **kwargs)  # initialize the default process group


if args.dist:
    init_dist()
    world_size = torch.distributed.get_world_size()  # number of processes in the current process group
    rank = torch.distributed.get_rank()  # rank of the current process
else:
    rank = -1

torch.backends.cudnn.benchmark = True
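init_process_group defaults to the env:// rendezvous, so it reads MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE from the environment; torch.distributed.launch sets these for every process it spawns. As a hedged sketch, the same init_dist can be exercised in a single process, without the launcher, by setting those variables manually (the values are only placeholders, and at least one GPU is assumed since init_dist binds a CUDA device):

import os

# values the launcher would normally provide; chosen here only for illustration
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "3210")
os.environ.setdefault("RANK", "0")
os.environ.setdefault("WORLD_SIZE", "1")

init_dist(backend="nccl")  # single-process group, useful only as a sanity check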
With this convention, only the master process (rank 0, or rank -1 when not running distributed) writes the logs:

if rank <= 0:
    logger.info('Something that needs to be logged')
import math
from torch.utils.data import DataLoader

dataset_ratio = 200  # enlarge the dataset for iter-oriented training

if train:
    train_set = define_Dataset(train_dataset)
    train_size = int(math.ceil(len(train_set) / batch_size))
    total_epochs = int(math.ceil(total_iters / train_size))
    if args.dist:
        world_size = torch.distributed.get_world_size()
        assert batch_size % world_size == 0
        batch_size = batch_size // world_size
        train_sampler = DistIterSampler(train_set, world_size, rank, dataset_ratio)
        total_epochs = int(math.ceil(total_iters / (train_size * dataset_ratio)))
        train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=False,
                                  num_workers=num_workers, drop_last=True,
                                  pin_memory=True, sampler=train_sampler)
    else:
        train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True,
                                  num_workers=num_workers, drop_last=True,
                                  pin_memory=True)
else:
    test_set = define_Dataset(test_dataset)
    test_loader = DataLoader(test_set, batch_size=1, shuffle=False,
                             num_workers=1, drop_last=False, pin_memory=True)
Note the following:
♠ world_size can be understood as the number of GPUs; batch_size must be divisible by world_size, i.e. one original batch is split evenly across the GPUs.
♣ When training in distributed mode, shuffle in the DataLoader must be False (the sampler takes care of shuffling).
♥ Testing is done on a single GPU.
♦ Distributed training requires an explicit sampler.
The code of DistIterSampler is as follows:
""" Modified from torch.utils.data.distributed.DistributedSampler Support enlarging the dataset for *iter-oriented* training, for saving time when restart the dataloader after each epoch """ import math import torch import torch.distributed as dist from torch.utils.data.sampler import Sampler class DistIterSampler(Sampler): """Sampler that restricts data loading to a subset of the dataset. It is especially useful in conjunction with :class:`torch.nn.parallel.DistributedDataParallel`. In such case, each process can pass a DistributedSampler instance as a DataLoader sampler, and load a subset of the original dataset that is exclusive to it. .. note:: Dataset is assumed to be of constant size. Arguments: dataset: Dataset used for sampling. num_replicas (optional): Number of processes participating in distributed training. rank (optional): Rank of the current process within num_replicas. """ def __init__(self, dataset, num_replicas=None, rank=None, ratio=100): if num_replicas is None: if not dist.is_available(): raise RuntimeError("Requires distributed package to be available") num_replicas = dist.get_world_size() if rank is None: if not dist.is_available(): raise RuntimeError("Requires distributed package to be available") rank = dist.get_rank() self.dataset = dataset self.num_replicas = num_replicas self.rank = rank self.epoch = 0 self.num_samples = int(math.ceil(len(self.dataset) * ratio / self.num_replicas)) self.total_size = self.num_samples * self.num_replicas def __iter__(self): # deterministically shuffle based on epoch g = torch.Generator() g.manual_seed(self.epoch) indices = torch.randperm( self.total_size, generator=g ).tolist() # Returns a random permutation of integers from 0 to n - 1 dsize = len(self.dataset) indices = [v % dsize for v in indices] # subsample indices = indices[self.rank : self.total_size : self.num_replicas] assert len(indices) == self.num_samples return iter(indices) def __len__(self): return self.num_samples def set_epoch(self, epoch): self.epoch = epoch
import torch
import torch.nn as nn
from torch.nn.parallel import DataParallel, DistributedDataParallel

# model definition: move the network to the GPU, then wrap it
device = torch.device('cuda' if opt['gpu_ids'] is not None else 'cpu')
net = define_net().to(device)
if args.dist:
    rank = torch.distributed.get_rank()
    net = DistributedDataParallel(net, device_ids=[torch.cuda.current_device()])
else:
    rank = -1  # non-dist training
    net = DataParallel(net)

# input: move each batch to the GPU holding the first model replica
input = input.to(f'cuda:{net.device_ids[0]}')

# saving: unwrap DataParallel / DistributedDataParallel before reading the state_dict
network = net
if isinstance(network, (nn.DataParallel, DistributedDataParallel)):
    network = network.module
state_dict = network.state_dict()
for key, param in state_dict.items():
    state_dict[key] = param.cpu()
torch.save(state_dict, save_path)
Therefore, the places where the model is defined, loaded, and saved, and where inputs are moved to a GPU, all need to be adapted; a sketch of the loading side follows below.
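The snippet above only shows saving; loading is the mirror image. Since the saved state_dict comes from the unwrapped network, it can be loaded back into the unwrapped module directly; if a checkpoint was instead saved from a wrapped model, the 'module.' prefix has to be stripped first. A minimal, hedged sketch reusing the imports from the snippet above (load_network, load_path and strict are illustrative names, not part of the original code):

def load_network(load_path, network, strict=True):
    # unwrap DataParallel / DistributedDataParallel before loading
    if isinstance(network, (nn.DataParallel, DistributedDataParallel)):
        network = network.module
    state_dict = torch.load(load_path, map_location='cpu')
    # drop a possible 'module.' prefix left over from a wrapped checkpoint
    state_dict = {k[len('module.'):] if k.startswith('module.') else k: v
                  for k, v in state_dict.items()}
    network.load_state_dict(state_dict, strict=strict)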
CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m torch.distributed.launch --nproc_per_node=4 --master_port=3210 train.py
Parameter description:
♠ CUDA_VISIBLE_DEVICES restricts which GPU indices are visible to the job.
♣ nproc_per_node sets the number of processes to create on the current host, usually the number of GPUs on that host.
♥ master_port specifies the port of the master node (together with master_addr it gives the master's ip:port); see the multi-node sketch below.
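The command above starts a single-node run. On multiple machines the same launcher is started on every node, each with its own node_rank; a hedged sketch for two nodes with 4 GPUs each (the IP address is only a placeholder):

# on node 0 (the master, reachable here at the placeholder address 192.168.1.1):
CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m torch.distributed.launch --nnodes=2 --node_rank=0 --nproc_per_node=4 --master_addr=192.168.1.1 --master_port=3210 train.py

# on node 1:
CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m torch.distributed.launch --nnodes=2 --node_rank=1 --nproc_per_node=4 --master_addr=192.168.1.1 --master_port=3210 train.py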
Beyond that, it is simply a matter of debugging whatever breaks.