模型並行( model parallelism ):即把模型拆分放到不一樣的設備進行訓練,分佈式系統中的不一樣機器(GPU/CPU等)負責網絡模型的不一樣部分 —— 例如,神經網絡模型的不一樣網絡層被分配到不一樣的機器,或者同一層內部的不一樣參數被分配到不一樣機器,如AlexNet的訓練。html
數據並行( data parallelism ):即把數據切分,輸入到不一樣的機器有同一個模型的多個副本,每一個機器分配到不一樣的數據,而後將全部機器的計算結果按照某種方式合併。python
torch.multiprocessing
是 Python 的 multiprocessing
多進程模塊的替代品。它支持徹底相同的操做,但對其進行了擴展,以便全部經過多進程隊列 multiprocessing.Queue
發送的張量都能將其數據移入共享內存,並且僅將其句柄發送到另外一個進程。git
注意:github
當張量
Tensor
被髮送到另外一個進程時,張量的數據和梯度torch.Tensor.grad
都將被共享。網絡
這一特性容許實現各類訓練方法,如 Hogwild,A3C 或任何其餘須要異步操做的訓練方法。app
僅 Python 3 支持進程之間共享 CUDA 張量,咱們可使用 spawn
或forkserver
啓動此類方法。 Python 2 中的 multiprocessing
多進程處理只能使用 fork
建立子進程,而且CUDA運行時不支持多進程處理。異步
警告:async
CUDA API 規定輸出到其餘進程的共享張量,只要它們被這些進程使用時,都將持續保持有效。您應該當心並確保您共享的 CUDA 張量不會超出它應該的做用範圍(不會出現做用範圍延伸的問題)。這對於共享模型的參數應該不是問題,但應該當心地傳遞其餘類型的數據。請注意,此限制不適用於共享的 CPU 內存。分佈式
也能夠參閱: 使用 nn.DataParallel 替代多進程處理優化
產生新進程時會出現不少錯誤,致使死鎖最多見的緣由是後臺線程。若是有任何持有鎖或導入模塊的線程,而且 fork
被調用,則子進程極可能處於崩潰狀態,而且會以不一樣方式死鎖或失敗。請注意,即便您沒有這樣作,Python 中內置的庫也可能會,更沒必要說 多進程處理
了。multiprocessing.Queue
多進程隊列其實是一個很是複雜的類,它產生了多個用於序列化、發送和接收對象的線程,而且它們也可能致使上述問題。若是您發現本身處於這種狀況,請嘗試使用multiprocessing.queues.SimpleQueue
,它不使用任何其餘額外的線程。
咱們正在儘量的爲您提供便利,並確保這些死鎖不會發生,但有些事情不受咱們控制。若是您有任何問題暫時沒法應對,請嘗試到論壇求助,咱們會查看是否能夠解決問題。
請記住,每次將張量放入多進程隊列 multiprocessing.Queue
時,它必須被移動到共享內存中。若是它已經被共享,將會是一個空操做,不然會產生一個額外的內存拷貝,這會減慢整個過程。即便您有一組進程將數據發送到單個進程,也可讓它將緩衝區發送回去,這幾乎是不佔資源的,而且能夠在發送下一批時避免產生拷貝動做。
使用多進程處理 torch.multiprocessing
,能夠異步地訓練一個模型,參數既能夠一直共享,也能夠週期性同步。在第一種狀況下,咱們建議發送整個模型對象,而在後者中,咱們建議只發送狀態字典 state_dict()
。
咱們建議使用多進程處理隊列 multiprocessing.Queue
在進程之間傳遞各類 PyTorch 對象。使用 fork
啓動一個方法時,它也可能會繼承共享內存中的張量和存儲空間,但這種方式也很是容易出錯,應謹慎使用,最好只能讓高階用戶使用。而隊列,儘管它們有時候不太優雅,卻能在任何狀況下正常工做。
警告:
你應該留意沒有用
if __name__ =='__main__'
來保護的全局語句。若是使用了不一樣於fork
啓動方法,它們將在全部子進程中執行。
具體的 Hogwild 實現能夠在 示例庫 中找到,但爲了展現代碼的總體結構,下面還有一個最簡單的示例:
import torch.multiprocessing as mp from model import MyModel def train(model): # 構建 data_loader,優化器等 for data, labels in data_loader: optimizer.zero_grad() loss_fn(model(data), labels).backward() optimizer.step() # 更新共享的參數 if __name__ == '__main__': num_processes = 4 model = MyModel() # 注意:這是 "fork" 方法工做所必需的 model.share_memory() processes = [] for rank in range(num_processes): p = mp.Process(target=train, args=(model,)) p.start() processes.append(p) for p in processes: p.join()
https://ptorch.com/news/176.html