日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 人工智能 > pytorch >内容正文

pytorch

[深度学习] 分布式Pytorch介绍(三)

發(fā)布時(shí)間:2023/12/15 pytorch 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [深度学习] 分布式Pytorch介绍(三) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

[深度學(xué)習(xí)] 分布式模式介紹(一)

[深度學(xué)習(xí)] 分布式Tensorflow介紹(二)

[深度學(xué)習(xí)] 分布式Pytorch介紹(三)

[深度學(xué)習(xí)] 分布式Horovod介紹(四)


?一? Pytorch 分布式簡(jiǎn)介

https://pytorch.org/docs/stable/distributed.html

torch.distributed 包支持

Pytorch 中通過 torch.distributed 包提供分布式支持,包括 GPU 和 CPU 的分布式訓(xùn)練支持。Pytorch 分布式目前只支持 Linux.

By default, the Gloo and NCCL backends are built and included in PyTorch distributed (NCCL only when building with CUDA). MPI is an optional backend that can only be included if you build PyTorch from source. (e.g. building PyTorch on a host that has MPI installed.)

Backends

Which backend to use?

In the past, we were often asked: “which backend should I use?”.

  • Rule of thumb

    • Use the NCCL backend for distributed GPU training

    • Use the Gloo backend for distributed CPU training.

  • GPU hosts with InfiniBand interconnect

    • Use NCCL, since it’s the only backend that currently supports InfiniBand and GPU Direct.

  • GPU hosts with Ethernet interconnect

    • Use NCCL, since it currently provides the best distributed GPU training performance, especially for multiprocess single-node or multi-node distributed training. If you encounter any problem with NCCL, use Gloo as the fallback option. (Note that Gloo currently runs slower than NCCL for GPUs.)

  • CPU hosts with InfiniBand interconnect

    • If your InfiniBand has enabled IP over IB, use Gloo, otherwise, use MPI instead. We are planning on adding InfiniBand support for Gloo in the upcoming releases.

  • CPU hosts with Ethernet interconnect

    • Use Gloo, unless you have specific reasons to use MPI.

torch.distributed 的優(yōu)勢(shì)如下:

1. 每個(gè)進(jìn)程對(duì)應(yīng)一個(gè)獨(dú)立的訓(xùn)練過程,且只對(duì)梯度等少量數(shù)據(jù)進(jìn)行信息交換。

在每次迭代中,每個(gè)進(jìn)程具有自己的 optimizer ,并獨(dú)立完成所有的優(yōu)化步驟,進(jìn)程內(nèi)與一般的訓(xùn)練無異。

在各進(jìn)程梯度計(jì)算完成之后,各進(jìn)程需要將梯度進(jìn)行匯總平均,然后再由 rank=0 的進(jìn)程,將其 broadcast 到所有進(jìn)程。之后,各進(jìn)程用該梯度來更新參數(shù)。由于各進(jìn)程中的模型,初始參數(shù)一致 (初始時(shí)刻進(jìn)行一次 broadcast),而每次用于更新參數(shù)的梯度也一致,因此,各進(jìn)程的模型參數(shù)始終保持一致。

而在 DataParallel 中,全程維護(hù)一個(gè) optimizer,對(duì)各 GPU 上梯度進(jìn)行求和,而在主 GPU 進(jìn)行參數(shù)更新,之后再將模型參數(shù) broadcast 到其他 GPU。

相較于 DataParallel,torch.distributed 傳輸?shù)臄?shù)據(jù)量更少,因此速度更快,效率更高。

2. 每個(gè)進(jìn)程包含獨(dú)立的解釋器和 GIL

由于每個(gè)進(jìn)程擁有獨(dú)立的解釋器和 GIL,消除了來自單個(gè) Python 進(jìn)程中的多個(gè)執(zhí)行線程,模型副本或 GPU 的額外解釋器開銷和 GIL-thrashing ,因此可以減少解釋器和 GIL 使用沖突。這對(duì)于嚴(yán)重依賴 Python runtime 的 models 而言,比如說包含 RNN 層或大量小組件的 models 而言,這尤為重要。

新的庫(kù)的主要亮點(diǎn)有:

  • 新的 torch.distributed 是性能驅(qū)動(dòng)的,并且對(duì)所有后端 (Gloo,NCCL 和 MPI) 完全異步操作
  • 顯著的分布式數(shù)據(jù)并行性能改進(jìn),尤其適用于網(wǎng)絡(luò)較慢的主機(jī),如基于以太網(wǎng)的主機(jī)
  • 為torch.distributed? package中的所有分布式集合操作添加異步支持
  • 在Gloo后端添加以下CPU操作:send,recv,reduce,all_gather,gather,scatter
  • 在NCCL后端添加barrier操作
  • 為NCCL后端添加new_group支持

二 分布式訓(xùn)練介紹

分布式訓(xùn)練可以分為:

  • 單機(jī)多卡,DataParallel(最常用,最簡(jiǎn)單)
  • 單機(jī)多卡,DistributedDataParallel(較高級(jí))
  • 多機(jī)多卡,DistributedDataParallel(最高級(jí))
  • Pytorch分布訓(xùn)練一開始接觸的往往是DataParallel,這個(gè)wrapper能夠很方便的使用多張卡,而且將進(jìn)程控制在一個(gè)。唯一的問題就在于,DataParallel只能滿足一臺(tái)機(jī)器上gpu的通信,而一臺(tái)機(jī)器一般只能裝8張卡,對(duì)于一些大任務(wù),8張卡就很吃力了,這個(gè)時(shí)候我們就需要面對(duì)多機(jī)多卡分布式訓(xùn)練這個(gè)問題了

    DistributedDataParallel (DDP)在模塊級(jí)別實(shí)現(xiàn)數(shù)據(jù)并行性。 它使用 Torch.distributed 程序包中的通信集合來同步梯度,參數(shù)和緩沖區(qū)。 并行性在流程內(nèi)和跨流程均可用。 在一個(gè)過程中,DDP 將輸入模塊復(fù)制到device_ids中指定的設(shè)備,將輸入沿批次維度分散,然后將輸出收集到output_device,這與 DataParallel 相似。 在整個(gè)過程中,DDP 在正向傳遞中插入必要的參數(shù)同步,在反向傳遞中插入梯度同步。 用戶可以將進(jìn)程映射到可用資源,只要進(jìn)程不共享 GPU 設(shè)備即可。 推薦的方法(通常是最快的方法)是為每個(gè)模塊副本創(chuàng)建一個(gè)過程,即在一個(gè)過程中不進(jìn)行任何模塊復(fù)制。

    DataParallel和DistributedDataParallel之間的比較

    在深入探討之前,讓我們澄清一下為什么盡管增加了復(fù)雜性,但還是考慮使用DistributedDataParallel而不是DataParallel:

  • 如果模型太大而無法容納在單個(gè) GPU 上,則必須使用模型并行將其拆分到多個(gè) GPU 中。?DistributedDataParallel與模型并行一起使用;?DataParallel目前沒有。
  • DataParallel是單進(jìn)程,多線程,并且只能在單臺(tái)機(jī)器上運(yùn)行,而DistributedDataParallel是多進(jìn)程,并且適用于單機(jī)和多機(jī)訓(xùn)練。 因此,即使在單機(jī)訓(xùn)練中,數(shù)據(jù)足夠小以適合單機(jī),DistributedDataParallel仍比DataParallel快。?DistributedDataParallel還預(yù)先復(fù)制模型,而不是在每次迭代時(shí)復(fù)制模型,并避免了全局解釋器鎖定。
  • 如果您的兩個(gè)數(shù)據(jù)都太大而無法容納在一臺(tái)計(jì)算機(jī)和上,而您的模型又太大了以至于無法安裝在單個(gè) GPU 上,則可以將模型并行(跨多個(gè) GPU 拆分單個(gè)模型)與DistributedDataParallel結(jié)合使用。 在這種情況下,每個(gè)DistributedDataParallel進(jìn)程都可以并行使用模型,而所有進(jìn)程都將并行使用數(shù)據(jù)。
  • 1:? torch.nn.parallel.DistributedDataParallel

    這個(gè)從名字上就能看出來與DataParallel相類似,也是一個(gè)模型wrapper。這個(gè)包是實(shí)現(xiàn)多機(jī)多卡分布訓(xùn)練最核心東西,它可以幫助我們?cè)诓煌瑱C(jī)器的多個(gè)模型拷貝之間平均梯度。

    2: torch.utils.data.distributed.DistributedSampler

    在多機(jī)多卡情況下分布式訓(xùn)練數(shù)據(jù)的讀取也是一個(gè)問題,不同的卡讀取到的數(shù)據(jù)應(yīng)該是不同的。dataparallel的做法是直接將batch切分到不同的卡,這種方法對(duì)于多機(jī)來說不可取,因?yàn)槎鄼C(jī)之間直接進(jìn)行數(shù)據(jù)傳輸會(huì)嚴(yán)重影響效率。于是有了利用sampler確保dataloader只會(huì)load到整個(gè)數(shù)據(jù)集的一個(gè)特定子集的做法。DistributedSampler就是做這件事的。它為每一個(gè)子進(jìn)程劃分出一部分?jǐn)?shù)據(jù)集,以避免不同進(jìn)程之間數(shù)據(jù)重復(fù)

    1.0的多機(jī)多卡的計(jì)算模型并沒有采用主流的Parameter Server結(jié)構(gòu),而是直接用了Uber Horovod的形式,也是百度開源的RingAllReduce算法。

    采用PS計(jì)算模型的分布式,通常會(huì)遇到網(wǎng)絡(luò)的問題,隨著worker數(shù)量的增加,其加速比會(huì)迅速的惡化,例如resnet50這樣的模型,目前的TF在10幾臺(tái)機(jī)器的時(shí)候,加速比已經(jīng)開始惡化的不可接受了。因此,經(jīng)常要上RDMA、InfiniBand等技術(shù),并且還帶來了一波網(wǎng)卡的升級(jí),有些大廠直接上了100GBs的網(wǎng)卡,有錢任性。而Uber的Horovod,采用的RingAllReduce的計(jì)算方案,其特點(diǎn)是網(wǎng)絡(luò)通信量不隨著worker(GPU)的增加而增加,是一個(gè)恒定值。簡(jiǎn)單看下圖理解下,GPU 集群被組織成一個(gè)邏輯環(huán),每個(gè)GPU有一個(gè)左鄰居、一個(gè)右鄰居,每個(gè)GPU只從左鄰居接受數(shù)據(jù)、并發(fā)送數(shù)據(jù)給右鄰居。即每次梯度每個(gè)gpu只獲得部分梯度更新,等一個(gè)完整的Ring完成,每個(gè)GPU都獲得了完整的參數(shù)。

    這里引入了一個(gè)新的函數(shù)model = torch.nn.parallel.DistributedDataParallel(model)為的就是支持分布式模式

    不同于原來在multiprocessing中的model = torch.nn.DataParallel(model,device_ids=[0,1,2,3]).cuda()函數(shù),這個(gè)函數(shù)只是實(shí)現(xiàn)了在單機(jī)上的多GPU訓(xùn)練,根據(jù)官方文檔的說法,甚至在單機(jī)多卡的模式下,新函數(shù)表現(xiàn)也會(huì)優(yōu)于這個(gè)舊函數(shù)。

    這里要提到兩個(gè)問題:

    • 每個(gè)進(jìn)程都有自己的Optimizer同時(shí)每個(gè)迭代中都進(jìn)行完整的優(yōu)化步驟,雖然這可能看起來是多余的,但由于梯度已經(jīng)聚集在一起并跨進(jìn)程平均,因此對(duì)于每個(gè)進(jìn)程都是相同的,這意味著不需要參數(shù)廣播步驟,從而減少了在節(jié)點(diǎn)之間傳輸張量tensor所花費(fèi)的時(shí)間。
    • 另外一個(gè)問題是Python解釋器的,每個(gè)進(jìn)程都包含一個(gè)獨(dú)立的Python解釋器,消除了來自單個(gè)Python進(jìn)程中的多個(gè)執(zhí)行線程,模型副本或GPU的額外解釋器開銷和“GIL-thrashing”。 這對(duì)于大量使用Python運(yùn)行時(shí)的模型尤其重要。

    基本使用流程

    Pytorch 中分布式的基本使用流程如下:

  • 在使用 distributed 包的任何其他函數(shù)之前,需要使用 init_process_group 初始化進(jìn)程組,同時(shí)初始化 distributed 包。
  • 如果需要進(jìn)行小組內(nèi)集體通信,用 new_group 創(chuàng)建子分組
  • 創(chuàng)建分布式并行模型 DDP(model, device_ids=device_ids)
  • 為數(shù)據(jù)集創(chuàng)建 Sampler
  • 使用啟動(dòng)工具 torch.distributed.launch 在每個(gè)主機(jī)上執(zhí)行一次腳本,開始訓(xùn)練
  • 使用 destory_process_group() 銷毀進(jìn)程組
  • train_dataset最好不要用自己寫的sampler,否則還需要再實(shí)現(xiàn)一遍分布式的數(shù)據(jù)劃分方式

    單機(jī)多卡--DistributedDataParallel

    前面的 DataParallel 并不是完整的分布式計(jì)算,只是將一些部分的計(jì)算(例如,前向和反向)放到了多張卡上,某些東西(例如,梯度)計(jì)算的時(shí)候仍然是「一卡有難,八方圍觀」,可能會(huì)將第一張卡撐爆,并且和 DDP 對(duì)比的話效率實(shí)在不高。

    首先增加幾個(gè)概念:

  • world_size:總共有幾個(gè) Worker
  • rank:這個(gè) Worker 是全局第幾個(gè) Worker
  • local_rank:這個(gè) Worker 是這臺(tái)機(jī)器上的第幾個(gè) Worker
  • 每個(gè) Worker 可以用一張或者多張卡,但習(xí)慣于一個(gè) Worker 只用一張卡
  • import torch import torch.nn as nn from torch.autograd import Variable from torch.utils.data import Dataset, DataLoader import os from torch.utils.data.distributed import DistributedSampler # 1) 初始化 torch.distributed.init_process_group(backend="nccl")input_size = 5 output_size = 2 batch_size = 30 data_size = 90# 2) 配置每個(gè)進(jìn)程的gpu local_rank = torch.distributed.get_rank() torch.cuda.set_device(local_rank) device = torch.device("cuda", local_rank)class RandomDataset(Dataset):def __init__(self, size, length):self.len = lengthself.data = torch.randn(length, size).to('cuda')def __getitem__(self, index):return self.data[index]def __len__(self):return self.lendataset = RandomDataset(input_size, data_size) # 3)使用DistributedSampler rand_loader = DataLoader(dataset=dataset,batch_size=batch_size,sampler=DistributedSampler(dataset))class Model(nn.Module):def __init__(self, input_size, output_size):super(Model, self).__init__()self.fc = nn.Linear(input_size, output_size)def forward(self, input):output = self.fc(input)print(" In Model: input size", input.size(),"output size", output.size())return outputmodel = Model(input_size, output_size)# 4) 封裝之前要把模型移到對(duì)應(yīng)的gpu model.to(device)if torch.cuda.device_count() > 1:print("Let's use", torch.cuda.device_count(), "GPUs!")# 5) 封裝model = torch.nn.parallel.DistributedDataParallel(model,device_ids=[local_rank],output_device=local_rank)for data in rand_loader:if torch.cuda.is_available():input_var = data.cuda()else:input_var = dataoutput = model(input_var)print("Outside: input size", input_var.size(), "output_size", output.size())

    torch.distributed.launch 會(huì)給模型分配一個(gè)args.local_rank的參數(shù),也可以通過torch.distributed.get_rank()獲取進(jìn)程id。

    怎么將程序跑起來。這里也有兩種方法:

    1. 用 torch.distributed.launch:

    python -m torch.distributed.launch --nproc_per_node=4 main.py

    2. 用 torch.multiprocessing:

    import torch.multiprocessing as mpdef main(rank, your_custom_arg_1, your_custom_arg_2): # 這里第一個(gè) rank 變量會(huì)被 mp.spawn 函數(shù)自動(dòng)填充,可以充當(dāng) local_rank 來用pass # 將前面那一堆東西包裝成一個(gè) main 函數(shù)mp.spawn(main, nprocs=how_many_process, args=(your_custom_arg_1, your_custom_arg_2))

    三 分布式初始化

    torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='')

    參數(shù)說明:

    • backend(str): 后端選擇,包括上面那幾種 gloo,nccl,mpi
    • init_method(str,optional): 用來初始化包的URL, 我理解是一個(gè)用來做并發(fā)控制的共享方式
    • world_size(int, optional): 參與這個(gè)工作的進(jìn)程數(shù)
    • rank(int,optional): 當(dāng)前進(jìn)程的rank
    • group_name(str,optional): 用來標(biāo)記這組進(jìn)程名的

    To enable backend == Backend.MPI, PyTorch needs to built from source on a system that supports MPI.

    The same applies to NCCL as well.

    四? init_method 三種方式

    1. TCP initialization

    tcp:// IP組播(要求所有進(jìn)程都在同一個(gè)網(wǎng)絡(luò)中)比較好理解,?? 以TCP協(xié)議的方式進(jìn)行不同分布式進(jìn)程之間的數(shù)據(jù)交流,需要設(shè)置一個(gè)端口,不同進(jìn)程之間公用這一個(gè)端口,并且設(shè)置host的級(jí)別和host的數(shù)量。設(shè)計(jì)兩個(gè)參數(shù)rank和world_size。其中rank為host的編號(hào),默認(rèn)0為主機(jī),端口應(yīng)該位于該主機(jī)上。world_size為分布式主機(jī)的個(gè)數(shù)。

    用該方式,運(yùn)行上面的代碼可以使用如下指令:

    在主機(jī)01上:

    python mnsit.py --init-method tcp://10.172.1.2:22225 --rank 0 --world-size 2

    在主機(jī)02上:

    python mnsit.py --init-method tcp://10.172.1.2:22225 --rank 1 --world-size 2

    這里沒有設(shè)置backend參數(shù),所以默認(rèn)是gloo。22225是端口號(hào),用一個(gè)沒有沒占用的就行。這兩句指令的先后順序沒有要求,只有兩條指令都輸入,程序才會(huì)運(yùn)行起來。

    2. Shared file-system initialization

    file:// 共享文件系統(tǒng)(要求所有進(jìn)程可以訪問單個(gè)文件系統(tǒng))有共享文件系統(tǒng)可以選擇?

    提供的第二種方式是文件共享,機(jī)器有共享的文件系統(tǒng),故可以采用這種方式,也避免了基于TCP的網(wǎng)絡(luò)傳輸。這里使用方式是使用絕對(duì)路徑在指定一個(gè)共享文件系統(tǒng)下不存在的文件。

    在主機(jī)01上:

    python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 0 --world-size 2

    在主機(jī)02上:

    python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 1 --world-size 2

    這里相比于TCP的方式麻煩一點(diǎn)的是運(yùn)行完一次必須更換共享的文件名,或者刪除之前的共享文件,不然第二次運(yùn)行會(huì)報(bào)錯(cuò)。

    3. Environment variable initialization

    env:// 環(huán)境變量(需要您手動(dòng)分配等級(jí)并知道所有進(jìn)程可訪問節(jié)點(diǎn)的地址)默認(rèn)是這個(gè)

    MASTER_PORT - required; has to be a free port on machine with rank 0 MASTER_ADDR - required (except for rank 0); address of rank 0 node WORLD_SIZE - required; can be set either here, or in a call to init function RANK - required; can be set either here, or in a call to init function

    五? torch.distributed.launch

    torch.distributed包提供了一個(gè)啟動(dòng)實(shí)用程序torch.distributed.launch,此幫助程序可用于為每個(gè)節(jié)點(diǎn)啟動(dòng)多個(gè)進(jìn)程以進(jìn)行分布式訓(xùn)練,它在每個(gè)訓(xùn)練節(jié)點(diǎn)上產(chǎn)生多個(gè)分布式訓(xùn)練進(jìn)程。

    這個(gè)工具可以用作CPU或者GPU,如果被用于GPU,每個(gè)GPU產(chǎn)生一個(gè)進(jìn)程Process

    該工具既可以用來做單節(jié)點(diǎn)多GPU訓(xùn)練,也可用于多節(jié)點(diǎn)多GPU訓(xùn)練。如果是單節(jié)點(diǎn)多GPU,將會(huì)在單個(gè)GPU上運(yùn)行一個(gè)分布式進(jìn)程,據(jù)稱可以非常好地改進(jìn)單節(jié)點(diǎn)訓(xùn)練性能。如果用于多節(jié)點(diǎn)分布式訓(xùn)練,則通過在每個(gè)節(jié)點(diǎn)上產(chǎn)生多個(gè)進(jìn)程來獲得更好的多節(jié)點(diǎn)分布式訓(xùn)練性能。如果有Infiniband接口則加速比會(huì)更高。

    在單節(jié)點(diǎn)分布式訓(xùn)練或多節(jié)點(diǎn)分布式訓(xùn)練的兩種情況下,該工具將為每個(gè)節(jié)點(diǎn)啟動(dòng)給定數(shù)量的進(jìn)程(--nproc_per_node)。如果用于GPU訓(xùn)練,則此數(shù)字需要小于或等于當(dāng)前系統(tǒng)上的GPU數(shù)量(nproc_per_node),并且每個(gè)進(jìn)程將在從GPU 0到GPU(nproc_per_node - 1)的單個(gè)GPU上運(yùn)行。

    1. Single-Node multi-process distributed training

    python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVEYOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all otherarguments of your training script)

    2. Multi-Node multi-process distributed training: (e.g. two nodes)

    Node 1: (IP: 192.168.1.1, and has a free port: 1234)

    python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE--nnodes=2 --node_rank=0 --master_addr="192.168.1.1"--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3and all other arguments of your training script)

    Node 2:

    python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE--nnodes=2 --node_rank=1 --master_addr="192.168.1.1"--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3and all other arguments of your training script)

    需要注意的地方:

    • 后端最好用“NCCL”,才能獲取最好的分布式性能
    • 訓(xùn)練代碼必須從命令行解析--local_rank=LOCAL_PROCESS_RANK
    import argparse parser = argparse.ArgumentParser() parser.add_argument("--local_rank", type=int) args = parser.parse_args()torch.cuda.set_device(arg.local_rank)
    • torch.distributed初始化方式
    torch.distributed.init_process_group(backend='nccl',init_method='env://')
    • model
    model = torch.nn.parallel.DistributedDataParallel(model,device_ids=[arg.local_rank],output_device=arg.local_rank)

    訓(xùn)練代碼中這樣寫(省略多余代碼,只保留核心代碼):

    import torch.distributed as dist # 這個(gè)參數(shù)是torch.distributed.launch傳遞過來的,我們?cè)O(shè)置位置參數(shù)來接受,local_rank代表當(dāng)前程序進(jìn)程使用的GPU標(biāo)號(hào) parser.add_argument("--local_rank", type=int, default=0) def synchronize():"""Helper function to synchronize (barrier) among all processes whenusing distributed training"""if not dist.is_available():returnif not dist.is_initialized():returnworld_size = dist.get_world_size()if world_size == 1:returndist.barrier()## WORLD_SIZE 由torch.distributed.launch.py產(chǎn)生 具體數(shù)值為 nproc_per_node*node(主機(jī)數(shù),這里為1) num_gpus = int(os.environ["WORLD_SIZE"]) if "WORLD_SIZE" in os.environ else 1is_distributed = num_gpus > 1if is_distributed:torch.cuda.set_device(args.local_rank) # 這里設(shè)定每一個(gè)進(jìn)程使用的GPU是一定的torch.distributed.init_process_group(backend="nccl", init_method="env://")synchronize()# 將模型移至到DistributedDataParallel中,此時(shí)就可以進(jìn)行訓(xùn)練了 if is_distributed:model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank,# this should be removed if we update BatchNorm statsbroadcast_buffers=False,)# 注意,在測(cè)試的時(shí)候需要執(zhí)行 model = model.module

    WRITING DISTRIBUTED APPLICATIONS WITH PYTORCH

    https://github.com/pytorch/examples/tree/master/imagenet?
    這里,常規(guī)的操作就不多敘述了,主要講一下和分布式相關(guān)的代碼部分。

    parser.add_argument('--world-size', default=2, type=int, help='number of distributed processes') parser.add_argument('--dist-url', default='tcp://172.16.1.186:2222', type=str, help='url used to set up distributed training') parser.add_argument('--dist-backend', default='gloo', type=str, help='distributed backend') parser.add_argument('--dist-rank', default=0, type=int, help='rank of distributed processes')

    這幾個(gè)是必要的參數(shù)設(shè)置,其中最后一個(gè)是官網(wǎng)沒有的

    if args.distributed:dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,world_size=args.world_size,rank=args.dist_rank)

    這個(gè)是分布式的初始化,同樣,最后添加一個(gè)rank

    model.cuda() model = torch.nn.parallel.DistributedDataParallel(model)

    這里,把我們平時(shí)使用的單機(jī)多卡,數(shù)據(jù)并行的API

    model = torch.nn.DataParallel(model).cuda()

    換掉即可。

    if args.distributed:train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)

    最后使用這個(gè)官方給的劃分方法,把數(shù)據(jù)集劃分即可。

    五 MNIST 分布式例子

    MNIST 分布式 (2臺(tái) GPU機(jī)器,每臺(tái)一張GPU)

    ?GPU1:? ?python mnist_dist.py --init-method=file:///home/workspace/share/1?--rank=0 --world-size=2

    ?GPU2:? ?python mnist_dist.py --init-method=file:///home/workspace/share/1?--rank=1?--world-size=2

    from __future__ import print_function import argparse import timeimport torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transformsimport torch.distributed as dist import torch.utils.data import torch.utils.data.distributedclass Net(nn.Module):def __init__(self):super(Net, self).__init__()self.conv1 = nn.Conv2d(1, 20, 5, 1)self.conv2 = nn.Conv2d(20, 50, 5, 1)self.fc1 = nn.Linear(4 * 4 * 50, 500)self.fc2 = nn.Linear(500, 10)def forward(self, x):x = F.relu(self.conv1(x))x = F.max_pool2d(x, 2, 2)x = F.relu(self.conv2(x))x = F.max_pool2d(x, 2, 2)x = x.view(-1, 4 * 4 * 50)x = F.relu(self.fc1(x))x = self.fc2(x)return F.log_softmax(x)def train(args, model, device, train_loader, optimizer, epoch):model.train()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = F.nll_loss(output, target)loss.backward()optimizer.step()if batch_idx % args.log_interval == 0:print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), len(train_loader.dataset),100. * batch_idx / len(train_loader), loss.item()))def test(args, model, device, test_loader):model.eval()test_loss = 0correct = 0for data, target in test_loader:data, target = data.to(device), target.to(device)output = model(data)test_loss += F.nll_loss(output, target, size_average=False).item() # sum up batch losspred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probabilitycorrect += pred.eq(target.data.view_as(pred)).cpu().sum()test_loss /= len(test_loader.dataset)print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, len(test_loader.dataset),100. * correct / len(test_loader.dataset)))def main():# Training settingsparser = argparse.ArgumentParser(description='PyTorch MNIST Example')parser.add_argument('--batch-size', type=int, default=64, metavar='N',help='input batch size for training (default: 64)')parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',help='input batch size for testing (default: 1000)')parser.add_argument('--epochs', type=int, default=10, metavar='N',help='number of epochs to train (default: 10)')parser.add_argument('--lr', type=float, default=0.01, metavar='LR',help='learning rate (default: 0.01)')parser.add_argument('--momentum', type=float, default=0.5, metavar='M',help='SGD momentum (default: 0.5)')parser.add_argument('--no-cuda', action='store_true',help='disables CUDA training')parser.add_argument('--seed', type=int, default=1, metavar='S',help='random seed (default: 1)')parser.add_argument('--log-interval', type=int, default=500, metavar='N',help='how many batches to wait before logging training status')parser.add_argument('--init-method', type=str, default='tcp://127.0.0.1:23456')parser.add_argument('--rank', type=int)parser.add_argument('--world-size', type=int)args = parser.parse_args()use_cuda = not args.no_cuda and torch.cuda.is_available()print(args)# 初始化dist.init_process_group(init_method=args.init_method, backend="gloo", world_size=args.world_size, rank=args.rank,group_name="pytorch_test")torch.manual_seed(args.seed)if use_cuda:torch.cuda.manual_seed(args.seed)train_dataset = datasets.MNIST('./data', train=True, download=False,transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))]))# 分發(fā)數(shù)據(jù)train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}train_loader = torch.utils.data.DataLoader(train_dataset,batch_size=args.batch_size, shuffle=True, **kwargs)test_loader = torch.utils.data.DataLoader(datasets.MNIST('data', train=False, transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])),batch_size=args.test_batch_size, shuffle=True, **kwargs)device = torch.device("cuda" if use_cuda else "cpu")print(device)model = Net().to(device)if use_cuda:model = torch.nn.parallel.DistributedDataParallel(model) if use_cuda else torch.nn.parallel.DistributedDataParallelCPU(model)optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)total_time = 0for epoch in range(1, args.epochs + 1):# 設(shè)置epoch位置,這應(yīng)該是個(gè)為了同步所做的工作train_sampler.set_epoch(epoch)start_cpu_secs = time.time()train(args, model, device, train_loader, optimizer, epoch)end_cpu_secs = time.time()print("Epoch {} of {} took {:.3f}s".format(epoch, args.epochs, end_cpu_secs - start_cpu_secs))total_time += end_cpu_secs - start_cpu_secstest(args, model, device, test_loader)print("Total time= {:.3f}s".format(total_time))if __name__ == '__main__':main()

    其他單機(jī)例子(一臺(tái)GPU機(jī)器一張GPU卡)

    python mnist_no_dist.py?

    from __future__ import print_function import argparse import timeimport torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transformsclass Net(nn.Module):def __init__(self):super(Net, self).__init__()self.conv1 = nn.Conv2d(1, 20, 5, 1)self.conv2 = nn.Conv2d(20, 50, 5, 1)self.fc1 = nn.Linear(4 * 4 * 50, 500)self.fc2 = nn.Linear(500, 10)def forward(self, x):x = F.relu(self.conv1(x))x = F.max_pool2d(x, 2, 2)x = F.relu(self.conv2(x))x = F.max_pool2d(x, 2, 2)x = x.view(-1, 4 * 4 * 50)x = F.relu(self.fc1(x))x = self.fc2(x)return F.log_softmax(x, dim=1)def train(args, model, device, train_loader, optimizer, epoch):model.train()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = F.nll_loss(output, target)loss.backward()optimizer.step()if batch_idx % args.log_interval == 0:print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), len(train_loader.dataset),100. * batch_idx / len(train_loader), loss.item()))def test(args, model, device, test_loader):model.eval()test_loss = 0correct = 0with torch.no_grad():for data, target in test_loader:data, target = data.to(device), target.to(device)output = model(data)test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch losspred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probabilitycorrect += pred.eq(target.view_as(pred)).sum().item()test_loss /= len(test_loader.dataset)print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, len(test_loader.dataset),100. * correct / len(test_loader.dataset)))def main():# Training settingsparser = argparse.ArgumentParser(description='PyTorch MNIST Example')parser.add_argument('--batch-size', type=int, default=64, metavar='N',help='input batch size for training (default: 64)')parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',help='input batch size for testing (default: 1000)')parser.add_argument('--epochs', type=int, default=10, metavar='N',help='number of epochs to train (default: 10)')parser.add_argument('--lr', type=float, default=0.01, metavar='LR',help='learning rate (default: 0.01)')parser.add_argument('--momentum', type=float, default=0.5, metavar='M',help='SGD momentum (default: 0.5)')parser.add_argument('--no-cuda', action='store_true', default=False,help='disables CUDA training')parser.add_argument('--seed', type=int, default=1, metavar='S',help='random seed (default: 1)')parser.add_argument('--log-interval', type=int, default=500, metavar='N',help='how many batches to wait before logging training status')# parser.add_argument('--save-model', action='store_true', default=False,# help='For Saving the current Model')args = parser.parse_args()print(args)use_cuda = not args.no_cuda and torch.cuda.is_available()torch.manual_seed(args.seed)kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}train_loader = torch.utils.data.DataLoader(datasets.MNIST('./data', train=True, download=True,transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])),batch_size=args.batch_size, shuffle=True, **kwargs)test_loader = torch.utils.data.DataLoader(datasets.MNIST('./data', train=False, transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])),batch_size=args.test_batch_size, shuffle=True, **kwargs)device = torch.device("cuda" if use_cuda else "cpu")print(device)model = Net().to(device)optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)total_time = 0for epoch in range(1, args.epochs + 1):start_cpu_secs = time.time()train(args, model, device, train_loader, optimizer, epoch)end_cpu_secs = time.time()print("Epoch {} of {} took {:.3f}s".format(epoch, args.epochs, end_cpu_secs - start_cpu_secs))total_time += end_cpu_secs - start_cpu_secstest(args, model, device, test_loader)# print("Total time= {:.3f}s".format(total_time))# if (args.save_model):# torch.save(model.state_dict(), "mnist_cnn.pt")if __name__ == '__main__':main()

    Dataloader中的參數(shù)

    如果你的選項(xiàng)剛好是最壞情況,優(yōu)化這個(gè)有可能達(dá)到2倍左右的性能提升(經(jīng)驗(yàn)值哈)

    解釋一下DataLoader中其中兩個(gè)參數(shù):

    • num_worker:
      數(shù)據(jù)集加載的時(shí)候,控制用于同時(shí)加載數(shù)據(jù)的線程數(shù)(默認(rèn)為0,即在主線程讀取)?存在最優(yōu)值,你會(huì)看到運(yùn)行的時(shí)候pytorch會(huì)新建恰等于這個(gè)值的數(shù)據(jù)讀取線程,我猜,線程多于必要的時(shí)候,數(shù)據(jù)讀取線程返回到主線程反而會(huì)因?yàn)榫€程間通信減慢數(shù)據(jù)。因此大了不好小了也不好。建議把模型,loss,優(yōu)化器全注釋了只跑一下數(shù)據(jù)流速度,確定最優(yōu)值
    • pin_memory:
      是否提前申請(qǐng)CUDA內(nèi)存(默認(rèn)為False,但有說法除非數(shù)據(jù)集很小,否則在N卡上推薦總是打開)在MNIST這樣的小數(shù)據(jù)集上好像是關(guān)閉比較好,到底多小算小說不清楚,建議自己試一下。

    pin_memory就是鎖頁(yè)內(nèi)存,創(chuàng)建DataLoader時(shí),設(shè)置pin_memory=True,則意味著生成的Tensor數(shù)據(jù)最開始是屬于內(nèi)存中的鎖頁(yè)內(nèi)存,這樣將內(nèi)存的Tensor轉(zhuǎn)義到GPU的顯存就會(huì)更快一些。
    主機(jī)中的內(nèi)存,有兩種存在方式,一是鎖頁(yè),二是不鎖頁(yè),鎖頁(yè)內(nèi)存存放的內(nèi)容在任何情況下都不會(huì)與主機(jī)的虛擬內(nèi)存進(jìn)行交換(注:虛擬內(nèi)存就是硬盤),而不鎖頁(yè)內(nèi)存在主機(jī)內(nèi)存不足時(shí),數(shù)據(jù)會(huì)存放在虛擬內(nèi)存中。顯卡中的顯存全部是鎖頁(yè)內(nèi)存,當(dāng)計(jì)算機(jī)的內(nèi)存充足的時(shí)候,可以設(shè)置pin_memory=True。當(dāng)系統(tǒng)卡住,或者交換內(nèi)存使用過多的時(shí)候,設(shè)置pin_memory=False。因?yàn)閜in_memory與電腦硬件性能有關(guān),pytorch開發(fā)者不能確保每一個(gè)煉丹玩家都有高端設(shè)備,因此pin_memory默認(rèn)為False。

    如果機(jī)子的內(nèi)存比較大,建議開啟pin_memory=Ture,如果開啟后發(fā)現(xiàn)有卡頓現(xiàn)象或者內(nèi)存占用過高,此時(shí)建議關(guān)閉。

    總之官方的默認(rèn)值很有可能不是最好的,建議自己多試試。在MNIST這樣的小數(shù)據(jù)集上,pin_memory關(guān)閉比較好。而且,num_workers需要調(diào)節(jié),除了默認(rèn)情況外,最快和最慢是有一定差距的,建議在自己的代碼上只跑數(shù)據(jù)讀取這一塊,確定這兩個(gè)參數(shù)的最優(yōu)值。

    參考:

    https://www.w3cschool.cn/pytorch/pytorch-hv9o3bsy.html

  • PyTorch 1.0 中文官方教程:使用 Amazon AWS 進(jìn)行分布式訓(xùn)練
  • https://pytorch.org/docs/stable/distributed.html#initialization
  • pytorch 分布式訓(xùn)練 distributed parallel 筆記
  • 使用PyTorch編寫分布式應(yīng)用程序
  • https://zhuanlan.zhihu.com/p/76638962

  • 創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

    總結(jié)

    以上是生活随笔為你收集整理的[深度学习] 分布式Pytorch介绍(三)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。