[深度学习] 分布式Pytorch介绍(三)
[深度學(xué)習(xí)] 分布式模式介紹(一)
[深度學(xué)習(xí)] 分布式Tensorflow介紹(二)
[深度學(xué)習(xí)] 分布式Pytorch介紹(三)
[深度學(xué)習(xí)] 分布式Horovod介紹(四)
?一? Pytorch 分布式簡(jiǎn)介
https://pytorch.org/docs/stable/distributed.html
torch.distributed 包支持
Pytorch 中通過 torch.distributed 包提供分布式支持,包括 GPU 和 CPU 的分布式訓(xùn)練支持。Pytorch 分布式目前只支持 Linux.
By default, the Gloo and NCCL backends are built and included in PyTorch distributed (NCCL only when building with CUDA). MPI is an optional backend that can only be included if you build PyTorch from source. (e.g. building PyTorch on a host that has MPI installed.)
Backends
Which backend to use?
In the past, we were often asked: “which backend should I use?”.
-
Rule of thumb
-
Use the NCCL backend for distributed GPU training
-
Use the Gloo backend for distributed CPU training.
-
-
GPU hosts with InfiniBand interconnect
-
Use NCCL, since it’s the only backend that currently supports InfiniBand and GPU Direct.
-
-
GPU hosts with Ethernet interconnect
-
Use NCCL, since it currently provides the best distributed GPU training performance, especially for multiprocess single-node or multi-node distributed training. If you encounter any problem with NCCL, use Gloo as the fallback option. (Note that Gloo currently runs slower than NCCL for GPUs.)
-
-
CPU hosts with InfiniBand interconnect
-
If your InfiniBand has enabled IP over IB, use Gloo, otherwise, use MPI instead. We are planning on adding InfiniBand support for Gloo in the upcoming releases.
-
-
CPU hosts with Ethernet interconnect
-
Use Gloo, unless you have specific reasons to use MPI.
-
torch.distributed 的優(yōu)勢(shì)如下:
1. 每個(gè)進(jìn)程對(duì)應(yīng)一個(gè)獨(dú)立的訓(xùn)練過程,且只對(duì)梯度等少量數(shù)據(jù)進(jìn)行信息交換。
在每次迭代中,每個(gè)進(jìn)程具有自己的 optimizer ,并獨(dú)立完成所有的優(yōu)化步驟,進(jìn)程內(nèi)與一般的訓(xùn)練無異。
在各進(jìn)程梯度計(jì)算完成之后,各進(jìn)程需要將梯度進(jìn)行匯總平均,然后再由 rank=0 的進(jìn)程,將其 broadcast 到所有進(jìn)程。之后,各進(jìn)程用該梯度來更新參數(shù)。由于各進(jìn)程中的模型,初始參數(shù)一致 (初始時(shí)刻進(jìn)行一次 broadcast),而每次用于更新參數(shù)的梯度也一致,因此,各進(jìn)程的模型參數(shù)始終保持一致。
而在 DataParallel 中,全程維護(hù)一個(gè) optimizer,對(duì)各 GPU 上梯度進(jìn)行求和,而在主 GPU 進(jìn)行參數(shù)更新,之后再將模型參數(shù) broadcast 到其他 GPU。
相較于 DataParallel,torch.distributed 傳輸?shù)臄?shù)據(jù)量更少,因此速度更快,效率更高。
2. 每個(gè)進(jìn)程包含獨(dú)立的解釋器和 GIL。
由于每個(gè)進(jìn)程擁有獨(dú)立的解釋器和 GIL,消除了來自單個(gè) Python 進(jìn)程中的多個(gè)執(zhí)行線程,模型副本或 GPU 的額外解釋器開銷和 GIL-thrashing ,因此可以減少解釋器和 GIL 使用沖突。這對(duì)于嚴(yán)重依賴 Python runtime 的 models 而言,比如說包含 RNN 層或大量小組件的 models 而言,這尤為重要。
新的庫(kù)的主要亮點(diǎn)有:
- 新的 torch.distributed 是性能驅(qū)動(dòng)的,并且對(duì)所有后端 (Gloo,NCCL 和 MPI) 完全異步操作
- 顯著的分布式數(shù)據(jù)并行性能改進(jìn),尤其適用于網(wǎng)絡(luò)較慢的主機(jī),如基于以太網(wǎng)的主機(jī)
- 為torch.distributed? package中的所有分布式集合操作添加異步支持
- 在Gloo后端添加以下CPU操作:send,recv,reduce,all_gather,gather,scatter
- 在NCCL后端添加barrier操作
- 為NCCL后端添加new_group支持
二 分布式訓(xùn)練介紹
分布式訓(xùn)練可以分為:
Pytorch分布訓(xùn)練一開始接觸的往往是DataParallel,這個(gè)wrapper能夠很方便的使用多張卡,而且將進(jìn)程控制在一個(gè)。唯一的問題就在于,DataParallel只能滿足一臺(tái)機(jī)器上gpu的通信,而一臺(tái)機(jī)器一般只能裝8張卡,對(duì)于一些大任務(wù),8張卡就很吃力了,這個(gè)時(shí)候我們就需要面對(duì)多機(jī)多卡分布式訓(xùn)練這個(gè)問題了
DistributedDataParallel (DDP)在模塊級(jí)別實(shí)現(xiàn)數(shù)據(jù)并行性。 它使用 Torch.distributed 程序包中的通信集合來同步梯度,參數(shù)和緩沖區(qū)。 并行性在流程內(nèi)和跨流程均可用。 在一個(gè)過程中,DDP 將輸入模塊復(fù)制到device_ids中指定的設(shè)備,將輸入沿批次維度分散,然后將輸出收集到output_device,這與 DataParallel 相似。 在整個(gè)過程中,DDP 在正向傳遞中插入必要的參數(shù)同步,在反向傳遞中插入梯度同步。 用戶可以將進(jìn)程映射到可用資源,只要進(jìn)程不共享 GPU 設(shè)備即可。 推薦的方法(通常是最快的方法)是為每個(gè)模塊副本創(chuàng)建一個(gè)過程,即在一個(gè)過程中不進(jìn)行任何模塊復(fù)制。
DataParallel和DistributedDataParallel之間的比較
在深入探討之前,讓我們澄清一下為什么盡管增加了復(fù)雜性,但還是考慮使用DistributedDataParallel而不是DataParallel:
1:? torch.nn.parallel.DistributedDataParallel
這個(gè)從名字上就能看出來與DataParallel相類似,也是一個(gè)模型wrapper。這個(gè)包是實(shí)現(xiàn)多機(jī)多卡分布訓(xùn)練最核心東西,它可以幫助我們?cè)诓煌瑱C(jī)器的多個(gè)模型拷貝之間平均梯度。
2: torch.utils.data.distributed.DistributedSampler
在多機(jī)多卡情況下分布式訓(xùn)練數(shù)據(jù)的讀取也是一個(gè)問題,不同的卡讀取到的數(shù)據(jù)應(yīng)該是不同的。dataparallel的做法是直接將batch切分到不同的卡,這種方法對(duì)于多機(jī)來說不可取,因?yàn)槎鄼C(jī)之間直接進(jìn)行數(shù)據(jù)傳輸會(huì)嚴(yán)重影響效率。于是有了利用sampler確保dataloader只會(huì)load到整個(gè)數(shù)據(jù)集的一個(gè)特定子集的做法。DistributedSampler就是做這件事的。它為每一個(gè)子進(jìn)程劃分出一部分?jǐn)?shù)據(jù)集,以避免不同進(jìn)程之間數(shù)據(jù)重復(fù)
1.0的多機(jī)多卡的計(jì)算模型并沒有采用主流的Parameter Server結(jié)構(gòu),而是直接用了Uber Horovod的形式,也是百度開源的RingAllReduce算法。
采用PS計(jì)算模型的分布式,通常會(huì)遇到網(wǎng)絡(luò)的問題,隨著worker數(shù)量的增加,其加速比會(huì)迅速的惡化,例如resnet50這樣的模型,目前的TF在10幾臺(tái)機(jī)器的時(shí)候,加速比已經(jīng)開始惡化的不可接受了。因此,經(jīng)常要上RDMA、InfiniBand等技術(shù),并且還帶來了一波網(wǎng)卡的升級(jí),有些大廠直接上了100GBs的網(wǎng)卡,有錢任性。而Uber的Horovod,采用的RingAllReduce的計(jì)算方案,其特點(diǎn)是網(wǎng)絡(luò)通信量不隨著worker(GPU)的增加而增加,是一個(gè)恒定值。簡(jiǎn)單看下圖理解下,GPU 集群被組織成一個(gè)邏輯環(huán),每個(gè)GPU有一個(gè)左鄰居、一個(gè)右鄰居,每個(gè)GPU只從左鄰居接受數(shù)據(jù)、并發(fā)送數(shù)據(jù)給右鄰居。即每次梯度每個(gè)gpu只獲得部分梯度更新,等一個(gè)完整的Ring完成,每個(gè)GPU都獲得了完整的參數(shù)。
這里引入了一個(gè)新的函數(shù)model = torch.nn.parallel.DistributedDataParallel(model)為的就是支持分布式模式
不同于原來在multiprocessing中的model = torch.nn.DataParallel(model,device_ids=[0,1,2,3]).cuda()函數(shù),這個(gè)函數(shù)只是實(shí)現(xiàn)了在單機(jī)上的多GPU訓(xùn)練,根據(jù)官方文檔的說法,甚至在單機(jī)多卡的模式下,新函數(shù)表現(xiàn)也會(huì)優(yōu)于這個(gè)舊函數(shù)。
這里要提到兩個(gè)問題:
- 每個(gè)進(jìn)程都有自己的Optimizer同時(shí)每個(gè)迭代中都進(jìn)行完整的優(yōu)化步驟,雖然這可能看起來是多余的,但由于梯度已經(jīng)聚集在一起并跨進(jìn)程平均,因此對(duì)于每個(gè)進(jìn)程都是相同的,這意味著不需要參數(shù)廣播步驟,從而減少了在節(jié)點(diǎn)之間傳輸張量tensor所花費(fèi)的時(shí)間。
- 另外一個(gè)問題是Python解釋器的,每個(gè)進(jìn)程都包含一個(gè)獨(dú)立的Python解釋器,消除了來自單個(gè)Python進(jìn)程中的多個(gè)執(zhí)行線程,模型副本或GPU的額外解釋器開銷和“GIL-thrashing”。 這對(duì)于大量使用Python運(yùn)行時(shí)的模型尤其重要。
基本使用流程
Pytorch 中分布式的基本使用流程如下:
train_dataset最好不要用自己寫的sampler,否則還需要再實(shí)現(xiàn)一遍分布式的數(shù)據(jù)劃分方式
單機(jī)多卡--DistributedDataParallel
前面的 DataParallel 并不是完整的分布式計(jì)算,只是將一些部分的計(jì)算(例如,前向和反向)放到了多張卡上,某些東西(例如,梯度)計(jì)算的時(shí)候仍然是「一卡有難,八方圍觀」,可能會(huì)將第一張卡撐爆,并且和 DDP 對(duì)比的話效率實(shí)在不高。
首先增加幾個(gè)概念:
torch.distributed.launch 會(huì)給模型分配一個(gè)args.local_rank的參數(shù),也可以通過torch.distributed.get_rank()獲取進(jìn)程id。
怎么將程序跑起來。這里也有兩種方法:
1. 用 torch.distributed.launch:
python -m torch.distributed.launch --nproc_per_node=4 main.py2. 用 torch.multiprocessing:
import torch.multiprocessing as mpdef main(rank, your_custom_arg_1, your_custom_arg_2): # 這里第一個(gè) rank 變量會(huì)被 mp.spawn 函數(shù)自動(dòng)填充,可以充當(dāng) local_rank 來用pass # 將前面那一堆東西包裝成一個(gè) main 函數(shù)mp.spawn(main, nprocs=how_many_process, args=(your_custom_arg_1, your_custom_arg_2))三 分布式初始化
torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='')參數(shù)說明:
- backend(str): 后端選擇,包括上面那幾種 gloo,nccl,mpi
- init_method(str,optional): 用來初始化包的URL, 我理解是一個(gè)用來做并發(fā)控制的共享方式
- world_size(int, optional): 參與這個(gè)工作的進(jìn)程數(shù)
- rank(int,optional): 當(dāng)前進(jìn)程的rank
- group_name(str,optional): 用來標(biāo)記這組進(jìn)程名的
To enable backend == Backend.MPI, PyTorch needs to built from source on a system that supports MPI.
The same applies to NCCL as well.
四? init_method 三種方式
1. TCP initialization
tcp:// IP組播(要求所有進(jìn)程都在同一個(gè)網(wǎng)絡(luò)中)比較好理解,?? 以TCP協(xié)議的方式進(jìn)行不同分布式進(jìn)程之間的數(shù)據(jù)交流,需要設(shè)置一個(gè)端口,不同進(jìn)程之間公用這一個(gè)端口,并且設(shè)置host的級(jí)別和host的數(shù)量。設(shè)計(jì)兩個(gè)參數(shù)rank和world_size。其中rank為host的編號(hào),默認(rèn)0為主機(jī),端口應(yīng)該位于該主機(jī)上。world_size為分布式主機(jī)的個(gè)數(shù)。
用該方式,運(yùn)行上面的代碼可以使用如下指令:
在主機(jī)01上:
python mnsit.py --init-method tcp://10.172.1.2:22225 --rank 0 --world-size 2在主機(jī)02上:
python mnsit.py --init-method tcp://10.172.1.2:22225 --rank 1 --world-size 2這里沒有設(shè)置backend參數(shù),所以默認(rèn)是gloo。22225是端口號(hào),用一個(gè)沒有沒占用的就行。這兩句指令的先后順序沒有要求,只有兩條指令都輸入,程序才會(huì)運(yùn)行起來。
2. Shared file-system initialization
file:// 共享文件系統(tǒng)(要求所有進(jìn)程可以訪問單個(gè)文件系統(tǒng))有共享文件系統(tǒng)可以選擇?
提供的第二種方式是文件共享,機(jī)器有共享的文件系統(tǒng),故可以采用這種方式,也避免了基于TCP的網(wǎng)絡(luò)傳輸。這里使用方式是使用絕對(duì)路徑在指定一個(gè)共享文件系統(tǒng)下不存在的文件。
在主機(jī)01上:
python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 0 --world-size 2在主機(jī)02上:
python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 1 --world-size 2這里相比于TCP的方式麻煩一點(diǎn)的是運(yùn)行完一次必須更換共享的文件名,或者刪除之前的共享文件,不然第二次運(yùn)行會(huì)報(bào)錯(cuò)。
3. Environment variable initialization
env:// 環(huán)境變量(需要您手動(dòng)分配等級(jí)并知道所有進(jìn)程可訪問節(jié)點(diǎn)的地址)默認(rèn)是這個(gè)
MASTER_PORT - required; has to be a free port on machine with rank 0 MASTER_ADDR - required (except for rank 0); address of rank 0 node WORLD_SIZE - required; can be set either here, or in a call to init function RANK - required; can be set either here, or in a call to init function五? torch.distributed.launch
torch.distributed包提供了一個(gè)啟動(dòng)實(shí)用程序torch.distributed.launch,此幫助程序可用于為每個(gè)節(jié)點(diǎn)啟動(dòng)多個(gè)進(jìn)程以進(jìn)行分布式訓(xùn)練,它在每個(gè)訓(xùn)練節(jié)點(diǎn)上產(chǎn)生多個(gè)分布式訓(xùn)練進(jìn)程。
這個(gè)工具可以用作CPU或者GPU,如果被用于GPU,每個(gè)GPU產(chǎn)生一個(gè)進(jìn)程Process
該工具既可以用來做單節(jié)點(diǎn)多GPU訓(xùn)練,也可用于多節(jié)點(diǎn)多GPU訓(xùn)練。如果是單節(jié)點(diǎn)多GPU,將會(huì)在單個(gè)GPU上運(yùn)行一個(gè)分布式進(jìn)程,據(jù)稱可以非常好地改進(jìn)單節(jié)點(diǎn)訓(xùn)練性能。如果用于多節(jié)點(diǎn)分布式訓(xùn)練,則通過在每個(gè)節(jié)點(diǎn)上產(chǎn)生多個(gè)進(jìn)程來獲得更好的多節(jié)點(diǎn)分布式訓(xùn)練性能。如果有Infiniband接口則加速比會(huì)更高。
在單節(jié)點(diǎn)分布式訓(xùn)練或多節(jié)點(diǎn)分布式訓(xùn)練的兩種情況下,該工具將為每個(gè)節(jié)點(diǎn)啟動(dòng)給定數(shù)量的進(jìn)程(--nproc_per_node)。如果用于GPU訓(xùn)練,則此數(shù)字需要小于或等于當(dāng)前系統(tǒng)上的GPU數(shù)量(nproc_per_node),并且每個(gè)進(jìn)程將在從GPU 0到GPU(nproc_per_node - 1)的單個(gè)GPU上運(yùn)行。
1. Single-Node multi-process distributed training
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVEYOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all otherarguments of your training script)2. Multi-Node multi-process distributed training: (e.g. two nodes)
Node 1: (IP: 192.168.1.1, and has a free port: 1234)
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE--nnodes=2 --node_rank=0 --master_addr="192.168.1.1"--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3and all other arguments of your training script)Node 2:
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE--nnodes=2 --node_rank=1 --master_addr="192.168.1.1"--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3and all other arguments of your training script)需要注意的地方:
- 后端最好用“NCCL”,才能獲取最好的分布式性能
- 訓(xùn)練代碼必須從命令行解析--local_rank=LOCAL_PROCESS_RANK
- torch.distributed初始化方式
- model
訓(xùn)練代碼中這樣寫(省略多余代碼,只保留核心代碼):
import torch.distributed as dist # 這個(gè)參數(shù)是torch.distributed.launch傳遞過來的,我們?cè)O(shè)置位置參數(shù)來接受,local_rank代表當(dāng)前程序進(jìn)程使用的GPU標(biāo)號(hào) parser.add_argument("--local_rank", type=int, default=0) def synchronize():"""Helper function to synchronize (barrier) among all processes whenusing distributed training"""if not dist.is_available():returnif not dist.is_initialized():returnworld_size = dist.get_world_size()if world_size == 1:returndist.barrier()## WORLD_SIZE 由torch.distributed.launch.py產(chǎn)生 具體數(shù)值為 nproc_per_node*node(主機(jī)數(shù),這里為1) num_gpus = int(os.environ["WORLD_SIZE"]) if "WORLD_SIZE" in os.environ else 1is_distributed = num_gpus > 1if is_distributed:torch.cuda.set_device(args.local_rank) # 這里設(shè)定每一個(gè)進(jìn)程使用的GPU是一定的torch.distributed.init_process_group(backend="nccl", init_method="env://")synchronize()# 將模型移至到DistributedDataParallel中,此時(shí)就可以進(jìn)行訓(xùn)練了 if is_distributed:model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank,# this should be removed if we update BatchNorm statsbroadcast_buffers=False,)# 注意,在測(cè)試的時(shí)候需要執(zhí)行 model = model.moduleWRITING DISTRIBUTED APPLICATIONS WITH PYTORCH
https://github.com/pytorch/examples/tree/master/imagenet?
這里,常規(guī)的操作就不多敘述了,主要講一下和分布式相關(guān)的代碼部分。
這幾個(gè)是必要的參數(shù)設(shè)置,其中最后一個(gè)是官網(wǎng)沒有的
if args.distributed:dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,world_size=args.world_size,rank=args.dist_rank)這個(gè)是分布式的初始化,同樣,最后添加一個(gè)rank
model.cuda() model = torch.nn.parallel.DistributedDataParallel(model)這里,把我們平時(shí)使用的單機(jī)多卡,數(shù)據(jù)并行的API
model = torch.nn.DataParallel(model).cuda()換掉即可。
if args.distributed:train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)最后使用這個(gè)官方給的劃分方法,把數(shù)據(jù)集劃分即可。
五 MNIST 分布式例子
MNIST 分布式 (2臺(tái) GPU機(jī)器,每臺(tái)一張GPU)
?GPU1:? ?python mnist_dist.py --init-method=file:///home/workspace/share/1?--rank=0 --world-size=2
?GPU2:? ?python mnist_dist.py --init-method=file:///home/workspace/share/1?--rank=1?--world-size=2
from __future__ import print_function import argparse import timeimport torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transformsimport torch.distributed as dist import torch.utils.data import torch.utils.data.distributedclass Net(nn.Module):def __init__(self):super(Net, self).__init__()self.conv1 = nn.Conv2d(1, 20, 5, 1)self.conv2 = nn.Conv2d(20, 50, 5, 1)self.fc1 = nn.Linear(4 * 4 * 50, 500)self.fc2 = nn.Linear(500, 10)def forward(self, x):x = F.relu(self.conv1(x))x = F.max_pool2d(x, 2, 2)x = F.relu(self.conv2(x))x = F.max_pool2d(x, 2, 2)x = x.view(-1, 4 * 4 * 50)x = F.relu(self.fc1(x))x = self.fc2(x)return F.log_softmax(x)def train(args, model, device, train_loader, optimizer, epoch):model.train()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = F.nll_loss(output, target)loss.backward()optimizer.step()if batch_idx % args.log_interval == 0:print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), len(train_loader.dataset),100. * batch_idx / len(train_loader), loss.item()))def test(args, model, device, test_loader):model.eval()test_loss = 0correct = 0for data, target in test_loader:data, target = data.to(device), target.to(device)output = model(data)test_loss += F.nll_loss(output, target, size_average=False).item() # sum up batch losspred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probabilitycorrect += pred.eq(target.data.view_as(pred)).cpu().sum()test_loss /= len(test_loader.dataset)print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, len(test_loader.dataset),100. * correct / len(test_loader.dataset)))def main():# Training settingsparser = argparse.ArgumentParser(description='PyTorch MNIST Example')parser.add_argument('--batch-size', type=int, default=64, metavar='N',help='input batch size for training (default: 64)')parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',help='input batch size for testing (default: 1000)')parser.add_argument('--epochs', type=int, default=10, metavar='N',help='number of epochs to train (default: 10)')parser.add_argument('--lr', type=float, default=0.01, metavar='LR',help='learning rate (default: 0.01)')parser.add_argument('--momentum', type=float, default=0.5, metavar='M',help='SGD momentum (default: 0.5)')parser.add_argument('--no-cuda', action='store_true',help='disables CUDA training')parser.add_argument('--seed', type=int, default=1, metavar='S',help='random seed (default: 1)')parser.add_argument('--log-interval', type=int, default=500, metavar='N',help='how many batches to wait before logging training status')parser.add_argument('--init-method', type=str, default='tcp://127.0.0.1:23456')parser.add_argument('--rank', type=int)parser.add_argument('--world-size', type=int)args = parser.parse_args()use_cuda = not args.no_cuda and torch.cuda.is_available()print(args)# 初始化dist.init_process_group(init_method=args.init_method, backend="gloo", world_size=args.world_size, rank=args.rank,group_name="pytorch_test")torch.manual_seed(args.seed)if use_cuda:torch.cuda.manual_seed(args.seed)train_dataset = datasets.MNIST('./data', train=True, download=False,transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))]))# 分發(fā)數(shù)據(jù)train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}train_loader = torch.utils.data.DataLoader(train_dataset,batch_size=args.batch_size, shuffle=True, **kwargs)test_loader = torch.utils.data.DataLoader(datasets.MNIST('data', train=False, transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])),batch_size=args.test_batch_size, shuffle=True, **kwargs)device = torch.device("cuda" if use_cuda else "cpu")print(device)model = Net().to(device)if use_cuda:model = torch.nn.parallel.DistributedDataParallel(model) if use_cuda else torch.nn.parallel.DistributedDataParallelCPU(model)optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)total_time = 0for epoch in range(1, args.epochs + 1):# 設(shè)置epoch位置,這應(yīng)該是個(gè)為了同步所做的工作train_sampler.set_epoch(epoch)start_cpu_secs = time.time()train(args, model, device, train_loader, optimizer, epoch)end_cpu_secs = time.time()print("Epoch {} of {} took {:.3f}s".format(epoch, args.epochs, end_cpu_secs - start_cpu_secs))total_time += end_cpu_secs - start_cpu_secstest(args, model, device, test_loader)print("Total time= {:.3f}s".format(total_time))if __name__ == '__main__':main()其他單機(jī)例子(一臺(tái)GPU機(jī)器一張GPU卡)
python mnist_no_dist.py?
from __future__ import print_function import argparse import timeimport torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transformsclass Net(nn.Module):def __init__(self):super(Net, self).__init__()self.conv1 = nn.Conv2d(1, 20, 5, 1)self.conv2 = nn.Conv2d(20, 50, 5, 1)self.fc1 = nn.Linear(4 * 4 * 50, 500)self.fc2 = nn.Linear(500, 10)def forward(self, x):x = F.relu(self.conv1(x))x = F.max_pool2d(x, 2, 2)x = F.relu(self.conv2(x))x = F.max_pool2d(x, 2, 2)x = x.view(-1, 4 * 4 * 50)x = F.relu(self.fc1(x))x = self.fc2(x)return F.log_softmax(x, dim=1)def train(args, model, device, train_loader, optimizer, epoch):model.train()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = F.nll_loss(output, target)loss.backward()optimizer.step()if batch_idx % args.log_interval == 0:print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), len(train_loader.dataset),100. * batch_idx / len(train_loader), loss.item()))def test(args, model, device, test_loader):model.eval()test_loss = 0correct = 0with torch.no_grad():for data, target in test_loader:data, target = data.to(device), target.to(device)output = model(data)test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch losspred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probabilitycorrect += pred.eq(target.view_as(pred)).sum().item()test_loss /= len(test_loader.dataset)print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, len(test_loader.dataset),100. * correct / len(test_loader.dataset)))def main():# Training settingsparser = argparse.ArgumentParser(description='PyTorch MNIST Example')parser.add_argument('--batch-size', type=int, default=64, metavar='N',help='input batch size for training (default: 64)')parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',help='input batch size for testing (default: 1000)')parser.add_argument('--epochs', type=int, default=10, metavar='N',help='number of epochs to train (default: 10)')parser.add_argument('--lr', type=float, default=0.01, metavar='LR',help='learning rate (default: 0.01)')parser.add_argument('--momentum', type=float, default=0.5, metavar='M',help='SGD momentum (default: 0.5)')parser.add_argument('--no-cuda', action='store_true', default=False,help='disables CUDA training')parser.add_argument('--seed', type=int, default=1, metavar='S',help='random seed (default: 1)')parser.add_argument('--log-interval', type=int, default=500, metavar='N',help='how many batches to wait before logging training status')# parser.add_argument('--save-model', action='store_true', default=False,# help='For Saving the current Model')args = parser.parse_args()print(args)use_cuda = not args.no_cuda and torch.cuda.is_available()torch.manual_seed(args.seed)kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}train_loader = torch.utils.data.DataLoader(datasets.MNIST('./data', train=True, download=True,transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])),batch_size=args.batch_size, shuffle=True, **kwargs)test_loader = torch.utils.data.DataLoader(datasets.MNIST('./data', train=False, transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])),batch_size=args.test_batch_size, shuffle=True, **kwargs)device = torch.device("cuda" if use_cuda else "cpu")print(device)model = Net().to(device)optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)total_time = 0for epoch in range(1, args.epochs + 1):start_cpu_secs = time.time()train(args, model, device, train_loader, optimizer, epoch)end_cpu_secs = time.time()print("Epoch {} of {} took {:.3f}s".format(epoch, args.epochs, end_cpu_secs - start_cpu_secs))total_time += end_cpu_secs - start_cpu_secstest(args, model, device, test_loader)# print("Total time= {:.3f}s".format(total_time))# if (args.save_model):# torch.save(model.state_dict(), "mnist_cnn.pt")if __name__ == '__main__':main()Dataloader中的參數(shù)
如果你的選項(xiàng)剛好是最壞情況,優(yōu)化這個(gè)有可能達(dá)到2倍左右的性能提升(經(jīng)驗(yàn)值哈)
解釋一下DataLoader中其中兩個(gè)參數(shù):
- num_worker:
數(shù)據(jù)集加載的時(shí)候,控制用于同時(shí)加載數(shù)據(jù)的線程數(shù)(默認(rèn)為0,即在主線程讀取)?存在最優(yōu)值,你會(huì)看到運(yùn)行的時(shí)候pytorch會(huì)新建恰等于這個(gè)值的數(shù)據(jù)讀取線程,我猜,線程多于必要的時(shí)候,數(shù)據(jù)讀取線程返回到主線程反而會(huì)因?yàn)榫€程間通信減慢數(shù)據(jù)。因此大了不好小了也不好。建議把模型,loss,優(yōu)化器全注釋了只跑一下數(shù)據(jù)流速度,確定最優(yōu)值 - pin_memory:
是否提前申請(qǐng)CUDA內(nèi)存(默認(rèn)為False,但有說法除非數(shù)據(jù)集很小,否則在N卡上推薦總是打開)在MNIST這樣的小數(shù)據(jù)集上好像是關(guān)閉比較好,到底多小算小說不清楚,建議自己試一下。
pin_memory就是鎖頁(yè)內(nèi)存,創(chuàng)建DataLoader時(shí),設(shè)置pin_memory=True,則意味著生成的Tensor數(shù)據(jù)最開始是屬于內(nèi)存中的鎖頁(yè)內(nèi)存,這樣將內(nèi)存的Tensor轉(zhuǎn)義到GPU的顯存就會(huì)更快一些。
主機(jī)中的內(nèi)存,有兩種存在方式,一是鎖頁(yè),二是不鎖頁(yè),鎖頁(yè)內(nèi)存存放的內(nèi)容在任何情況下都不會(huì)與主機(jī)的虛擬內(nèi)存進(jìn)行交換(注:虛擬內(nèi)存就是硬盤),而不鎖頁(yè)內(nèi)存在主機(jī)內(nèi)存不足時(shí),數(shù)據(jù)會(huì)存放在虛擬內(nèi)存中。顯卡中的顯存全部是鎖頁(yè)內(nèi)存,當(dāng)計(jì)算機(jī)的內(nèi)存充足的時(shí)候,可以設(shè)置pin_memory=True。當(dāng)系統(tǒng)卡住,或者交換內(nèi)存使用過多的時(shí)候,設(shè)置pin_memory=False。因?yàn)閜in_memory與電腦硬件性能有關(guān),pytorch開發(fā)者不能確保每一個(gè)煉丹玩家都有高端設(shè)備,因此pin_memory默認(rèn)為False。
如果機(jī)子的內(nèi)存比較大,建議開啟pin_memory=Ture,如果開啟后發(fā)現(xiàn)有卡頓現(xiàn)象或者內(nèi)存占用過高,此時(shí)建議關(guān)閉。
總之官方的默認(rèn)值很有可能不是最好的,建議自己多試試。在MNIST這樣的小數(shù)據(jù)集上,pin_memory關(guān)閉比較好。而且,num_workers需要調(diào)節(jié),除了默認(rèn)情況外,最快和最慢是有一定差距的,建議在自己的代碼上只跑數(shù)據(jù)讀取這一塊,確定這兩個(gè)參數(shù)的最優(yōu)值。
參考:
https://www.w3cschool.cn/pytorch/pytorch-hv9o3bsy.html
總結(jié)
以上是生活随笔為你收集整理的[深度学习] 分布式Pytorch介绍(三)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 工信部等八部门组织开展全国范围内公共领域
- 下一篇: [深度学习] PyTorch-BigGr