日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

pytorch分布式训练(一):torch.nn.DataParallel

發布時間:2025/3/15 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 pytorch分布式训练(一):torch.nn.DataParallel 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

??本文介紹最簡單的pytorch分布式訓練方法:使用torch.nn.DataParallel這個API來實現分布式訓練。環境為單機多gpu,不妨假設有4個可用的gpu。

一、構建方法

使用這個API實現分布式訓練的步驟非常簡單,總共分為3步驟:
1、創建一個model,并將該model推到某個gpu上(這個gpu也將作為output_device,后面具體解釋含義),不妨假設推到第0號gpu上,

device = torch.device("cuda:0") model.to(device)

2、將數據推到output_device對應的gpu上,

data = data.to(device)

3、使用torch.nn.DataParallel這個API來在0,1,2,3四個gpu上構建分布式模型,

model = torch.nn.DataParallel(model, device_ids=[0,1,2,3], output_device=0)

然后這個model就可以像普通的單gpu上的模型一樣開始訓練了。

二、原理詳解

2.1 原理圖

??首先通過圖來看一下這個最簡單的分布式訓練API的工作原理,然后結合代碼詳細闡述。

將模型和數據推入output_device(也就是0號)gpu上。

0號gpu將當前模型在其他幾個gpu上進行復制,同步模型的parameter、buffer和modules等;將當前batch盡可能平均的分為len(device)=4份,分別推給每一個設備,并開啟多線程分別在每個設備上進行前向傳播,得到各自的結果,最后將各自的結果全部匯總在一起,拷貝回0號gpu。

在0號gpu進行反向傳播和模型的參數更新,并將結果同步給其他幾個gpu,即完成了一個batch的訓練。

2.2 代碼原理

??通過分析torch.nn.DataParallel的代碼,可以看到具體的過程,這里重點看一下幾個關鍵的地方。

# 繼承自nn.Module,只要實現__init__和forward函數即可 class DataParallel(Module):# 構造函數里沒有什么關鍵內容,主要是根據傳進來的model、device_ids和output_device進行一些變量生成def __init__(self, module, device_ids=None, output_device=None, dim=0):super(DataParallel, self).__init__()device_type = _get_available_device_type()if device_type is None:self.module = moduleself.device_ids = []returnif device_ids is None:device_ids = _get_all_device_indices()if output_device is None:output_device = device_ids[0]self.dim = dimself.module = moduleself.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))self.output_device = _get_device_index(output_device, True)self.src_device_obj = torch.device(device_type, self.device_ids[0])_check_balance(self.device_ids)if len(self.device_ids) == 1:self.module.to(self.src_device_obj)def forward(self, *inputs, **kwargs):if not self.device_ids:return self.module(*inputs, **kwargs)for t in chain(self.module.parameters(), self.module.buffers()):if t.device != self.src_device_obj:raise RuntimeError("module must have its parameters and buffers ""on device {} (device_ids[0]) but found one of ""them on device: {}".format(self.src_device_obj, t.device))inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)if len(self.device_ids) == 1:return self.module(*inputs[0], **kwargs[0])# 在每個gpu上都復制一個modelreplicas = self.replicate(self.module, self.device_ids[:len(inputs)])# 開啟多線程進行前向傳播,得到結果outputs = self.parallel_apply(replicas, inputs, kwargs)# 將每個gpu上得到的結果都gather到0號gpu上return self.gather(outputs, self.output_device)def replicate(self, module, device_ids):return replicate(module, device_ids, not torch.is_grad_enabled())def scatter(self, inputs, kwargs, device_ids):return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)def parallel_apply(self, replicas, inputs, kwargs):return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])def gather(self, outputs, output_device):return gather(outputs, output_device, dim=self.dim)

再看一下parallel_apply這個關鍵的函數,

def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):assert len(modules) == len(inputs)if kwargs_tup is not None:assert len(modules) == len(kwargs_tup)else:kwargs_tup = ({},) * len(modules)if devices is not None:assert len(modules) == len(devices)else:devices = [None] * len(modules)devices = list(map(lambda x: _get_device_index(x, True), devices))# 創建一個互斥鎖,防止前后兩個batch的數據覆蓋lock = threading.Lock()results = {}grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled()# 線程的target函數,實現每個gpu上進行推理,其中i為gpu編號def _worker(i, module, input, kwargs, device=None):torch.set_grad_enabled(grad_enabled)if device is None:device = get_a_var(input).get_device()try:# 根據當前gpu編號確定推理硬件環境with torch.cuda.device(device), autocast(enabled=autocast_enabled):# this also avoids accidental slicing of `input` if it is a Tensorif not isinstance(input, (list, tuple)):input = (input,)output = module(*input, **kwargs)# 鎖住賦值,防止后一個batch的數據將前一個batch的結果覆蓋with lock:results[i] = outputexcept Exception:with lock:results[i] = ExceptionWrapper(where="in replica {} on device {}".format(i, device))if len(modules) > 1:# 創建多個線程,進行不同gpu的前向推理threads = [threading.Thread(target=_worker,args=(i, module, input, kwargs, device))for i, (module, input, kwargs, device) inenumerate(zip(modules, inputs, kwargs_tup, devices))]for thread in threads:thread.start()for thread in threads:thread.join()else:_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])# 將不同gpu上推理的結果打包起來,后面會gather到output_device上outputs = []for i in range(len(inputs)):output = results[i]if isinstance(output, ExceptionWrapper):output.reraise()outputs.append(output)return outputs

結論

??至此我們看到了torch.nn.DataParallel模塊進行分布式訓練的原理,數據和模型首先推入output_device對應的gpu,然后將分成多個子batch的數據和模型分別推給其他gpu,每個gpu單獨處理各自的子batch,結果再打包回原output_device對應的gpu算梯度和更新參數,如此循環往復,其本質是一個單進程多線程的并發程序。
??由此我們也很容易得到torch.nn.DataParallel模塊進行分布式的缺點,
1、每個batch的數據先分發到各gpu上,結果再打包回output_device上,在output_device一個gpu上進行梯度計算和參數更新,再把更新同步給其他gpu上的model。其中涉及數據的來回拷貝,網絡通信耗時嚴重,GPU使用率低。
2、這種模式只支持單機多gpu的硬件拓撲結構,不支持Apex的混合精度訓練等。
3、torch.nn.DataParallel也沒有很完整的考慮到多個gpu做數據并行的一些問題,比如batchnorm,在訓練時各個gpu上的batchnorm的mean和variance是子batch的計算結果,而不是原來整個batch的值,可能會導致訓練不穩定影響收斂等問題。

總結

以上是生活随笔為你收集整理的pytorch分布式训练(一):torch.nn.DataParallel的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。