日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

「分布式训练」使用 DDP 实现程序单机多卡并行指南

發布時間:2024/5/14 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 「分布式训练」使用 DDP 实现程序单机多卡并行指南 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近在大趨勢的影響下,開始染指大模型。由于實驗室計算資源的限制,需要使用單機多卡并行的方式運行程序,這里以 BLOOM-560m 模型為例,演示如何通過單機多卡DDP并行的方式微調完成下游任務。

目錄

  • 0. 基礎篇
    • - 兩種分布式訓練方式
    • - 數據并行 & 模型并行
  • 1. 程序修改
    • 1.1 導入關鍵包
    • 1.2 定義關鍵函數
    • 1.3 程序入口
    • 1.4 main() 函數
    • 1.5 get_dataloader() 函數
    • 1.6 train() 函數
    • 1.7 validate() 函數
    • 1.8 test() 函數
  • 2. 程序運行
    • 2.1 mp.spawn() 啟動
    • 2.2 tochrnn 啟動
    • 2.3 torch.distributed.launch() 啟動
  • 3. Debug 歷程
    • 問題一:多進程計算數據收集
    • 問題二:模型加載參數缺失
    • 問題三:參數類型轉換異常
    • 問題四:參數泄露


0. 基礎篇

- 兩種分布式訓練方式

??:Pytorch 分布式目前只支持 Linux。實現程序并行主要有 DataParallel 和 DistributedDataParallel 兩種方式:

  • DataParallel (DP):實現簡單,代碼量較少,啟動速度快一點。但速度較慢,且存在負載不均衡的問題。單進程,多線程。主卡顯存占用比其他卡會多很多。不支持 Apex 的混合精度訓練。是Pytorch官方很久之前給的一種方案。受 Python GIL 的限制,DP的操作原理是將一個batchsize的輸入數據均分到多個GPU上分別計算(此處注意,batchsize要大于GPU個數才能劃分)。

  • DistributedDataParallel (DDP):All-Reduce模式,本意是用來分布式訓練(多機多卡),但是也可用于單機多卡。配置稍復雜。多進程。數據分配較均衡。是新一代的多卡訓練方法。使用 torch.distributed 庫實現并行。torch.distributed 庫提供分布式支持,包括 GPU 和 CPU 的分布式訓練支持,該庫提供了一種類似 MPI 的接口,用于跨多機器網絡交換張量數據。它支持幾種不同的后端和初始化方法。DDP通過Ring-Reduce的數據交換方法提高了通訊效率,并通過啟動多個進程的方式減輕Python GIL的限制,從而提高訓練速度。

  • DDP多卡訓練的原理

  • 將模型在各個GPU上復制一份;
  • 將總的 batch 數據等分到不同的GPU上進行計算(shuffle 順序打亂),每個進程都從磁盤加載其自己的數據;
  • 在模型訓練時,損失函數的前向傳播和計算在每個 GPU 上獨立執行,因此,不需要收集網絡輸出。在反向傳播期間,各個進程通過一種叫 Ring-Reduce 的方法與其他進程通訊,交換各自的梯度,從而獲得所有進程的平均梯度;然后用這個值在所有 GPU 上執行梯度下降,從而每個 GPU 在反向傳播結束時最終得到平均梯度的相同副本;
  • 各個進程用平均后的梯度更新自己的參數,因為各個進程的初始參數、更新梯度是一致的,所以更新后的參數也是完全相同的。

- 數據并行 & 模型并行

  • 數據并行是指,多張 GPUs 使用相同的模型副本,但采用同一batch中的不同數據進行訓練。
  • 模型并行是指,多張 GPUs 使用同一 batch 的數據,分別訓練模型的不同部分。

簡單來記就是:并行就是對并行對象進行拆分,以提高運算效率。


1. 程序修改

本教程使用DDP 方式完成程序并行。參考此篇教程,實現模型多卡復制和數據并行。

1.1 導入關鍵包

以下是程序修改過程中會使用到的包;其中,dist 負責多卡通訊,DDP 負責模型傳遞等工作。

import torch.distributed as dist import torch.multiprocessing as mp from torch.cuda.amp import GradScaler from torch.utils.data.distributed import DistributedSampler from torch.nn.parallel import DistributedDataParallel as DDP

1.2 定義關鍵函數

  • init_ddp(local_rank)

對進程進行初始化,使用 nccl 后端,并用 env 作為初始化方法。

local_rank = dist.get_rank()
world_size = dist.get_world_size()

def init_ddp(local_rank):# 有了這一句之后,在轉換device的時候直接使用 a=a.cuda()即可,否則要用a=a.cuda(local_rank)torch.cuda.set_device(local_rank)os.environ['RANK'] = str(local_rank)dist.init_process_group(backend='nccl', init_method='env://')

在完成了該初始化后,可以很輕松地在需要時獲得 local_rank、world_size,而不需要作為額外參數從 main() 函數中一層一層往下傳。比如需要 print, log, save_model時,由于多個進程擁有相同的副本,故只需要一個進程執行即可,示例:

if local_rank == 0:print(f'begin validating') ......if local_rank == 0:save_model(actual_epoch, model, scaler, args['model_save_dir'] + '/best_macro_model_DDP_direct.pt')
  • reduce_tensor(tensor)

對多個進程的計算結果進行匯總,如 loss、評價指標。

def reduce_tensor(tensor: torch.Tensor):'''對多個進程計算的多個 tensor 類型的 輸出值取平均操作'''rt = tensor.clone() # tensor(9.1429, device='cuda:1')dist.all_reduce(rt, op=dist.reduce_op.SUM)rt /= dist.get_world_size()return rt
  • get_ddp_generator(seed)

用于訓練過程中,增強訓練的隨機性。

def get_ddp_generator(seed=3407):'''對每個進程使用不同的隨機種子,增強訓練的隨機性'''local_rank = dist.get_rank()g = torch.Generator()g.manual_seed(seed + local_rank)return g

1.3 程序入口

在if __name__ == '__main__':中,使用 spawn() 函數啟動 DDP,該函數的主要參數包括:

  • fn:需要并行的函數。這里即為 main() 函數,每個線程將執行一次該函數;
  • args:fn所需的參數。注意:傳給fn的參數必須寫成元組的形式,哪怕只有一個參數;
  • nprocs:啟動的進程數,默認值為1. 這里將其設置為world_size即可。nprocs的值與world_size不一致會導致進程等待同步而一直停滯。
  • if __name__ == '__main__':parser = argparse.ArgumentParser()parser.add_argument('-args', help="priority", type=bool, required=False, default=True)parser.add_argument('-gpu', default='0,1', type=str, help='gpu device ids for CUDA_VISIBLE_DEVICES')parser.add_argument('-mode', help="train&test", type=str, required=False, default='train')parser.add_argument('-requires_grad', help="whether to weight_decay", type= bool, required=False, default=True)args = parser.parse_args()os.environ['MASTER_ADDR'] = 'localhost' # 0號機器的IPos.environ['MASTER_PORT'] = '19198' # 0號機器的可用端口os.environ['CUDA_VISIBLE_DEVICES'] = args['gpu'] # 使用哪些GPUworld_size = torch.cuda.device_count()os.environ['WORLD_SIZE'] = str(world_size)os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128"os.environ["TOKENIZERS_PARALLELISM"] = "false" # 指定程序在分詞時不并行執行if args['mode'] == 'train':time_start = time.time()mp.spawn(fn=main, args=(args, ), nprocs=world_size)time_elapsed = time.time() - time_startprint(f'\ntime elapsed: {time_elapsed:.2f} seconds.')elif args['mode'] == 'test': time_start = time.time()mp.spawn(fn=test, args=(args, ), nprocs=world_size)time_elapsed = time.time() - time_startprint(f'\ntime elapsed: {time_elapsed:.2f} seconds.')

    1.4 main() 函數

    這里的 main() 函數即上面提到的 spawn() 函數中傳入的第一個參數。代碼關鍵部位修改如下:

    • 參數列表更新:添加額外參數 local_rank,該參數無需在 mp.spawn() 函數中傳遞,系統會自動分配;

    • 進程初始化:調用 init_ddp() 函數實現;

    • BN層同步:調用 convert_sync_batchnorm() 函數用同步的方法完成BN以盡可能模擬單卡場景,盡管會降低GPU利用率,但可以提高模型在多卡場景下的表現(詳解見此篇博客);BN層同步的必要性依賴于單卡batch_size的大小,如果單卡batch_size太小,使用SyncBN可以提高性能。但是如果batch_size較大的時候就不需要使用SyncBN, 因為這需要多卡之間通信,會導致訓練速度變慢。

    • 數據并行:調用 DistributedDataParallel() 函數實現;

    • 指定混合精度訓練:調用 GradScaler() 函數實現,作為參數傳至 train() 函數中;

    • 訓練采樣器設置:每個 epoch 設置不同的 sampling 順序;

    • 避免副本重復執行:使用 if local_rank==0: 語句進行約束;

    • 消除進程組:調用 destroy_process_group() 函數實現。

    def main(local_rank, args): # 參數列表更新init_ddp(local_rank) ### 進程初始化best_macro = 0model, tokenizer = initialise_model(args['modelname'], args['num_labels'])model.cuda()model = nn.SyncBatchNorm.convert_sync_batchnorm(model) # BN層同步num_gpus = torch.cuda.device_count()if num_gpus > 1:print('use {} gpus!'.format(num_gpus))model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank) ### 套 DDPnum_training_steps = args['num_epochs'] * (args['num_samples'] // args['batch_size']) #總的訓練步數if args['requires_grad']: # 權重衰減param_optimizer = list(model.named_parameters())no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']# 設置模型參數的權重衰減optimizer_grouped_parameters = [{'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],'weight_decay': 0.01},{'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}]optimizer = AdamW(optimizer_grouped_parameters, lr=float(args['learning_rate'])) # 部分參數更新else:optimizer = AdamW(model.parameters(), lr=float(args['learning_rate'])) # 部分參數更新scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=100, num_training_steps=num_training_steps) #創建學習率調度器。scaler = GradScaler() ### 用于混合精度訓練criterion = BCEWithLogitsLoss().cuda() #定義損失函數。train_dataloader = get_dataloader(args['traincsvpath'], args, tokenizer, train=True)valid_dataloader = get_dataloader(args['valcsvpath'], args, tokenizer, train=False)for actual_epoch in trange(args['num_epochs'], desc="Epoch"):if local_rank == 0: ### 防止每個進程都輸出一次print("begin training of epoch %d / %d" % (actual_epoch + 1, args['num_epochs']))train_dataloader.sampler.set_epoch(actual_epoch) # 訓練時每次的 sampling 順序不同train(model, train_dataloader, optimizer, scheduler, criterion, actual_epoch, scaler, args) if local_rank == 0:print(f'begin validating') macro = validate(model, valid_dataloader, criterion, actual_epoch, args) #在驗證集上評估模型。if macro > best_macro:best_macro = macroif local_rank == 0: # 防止每個進程都保存一次save_model(actual_epoch, model, scaler, args['model_save_dir'] + '/best_macro_model_DDP_direct.pt')dist.destroy_process_group() # 消除進程組,和 init_process_group 相對
    • 除上述修改,main() 函數中使用到的三個函數,get_dataloader() 函數、train() 函數以及 validate() 函數,也需要進行相應更新,下面分別對其進行講解。

    1.5 get_dataloader() 函數

    該函數主要對 DataLoader() 函數進行了修改。對「訓練」和「測試」兩個階段,分別定義 train_sampler 和 test_sampler,其中,設置 train_sampler 為隨機采樣,test_sampler 為順序采樣。此外,在「訓練」階段,使用 get_ddp_generator() 函數向 DataLoader() 函數傳入參數 generator(作用于不同worker),否則會減弱訓練的隨機性。

    def get_dataloader(path, args, tokenizer, train:bool): '''根據給定的路徑獲取數據,并將數據和訓練標志傳遞給數據加載器,這樣可以方便地從給定路徑加載數據并生成數據加載器,以供后續的模型訓練和評估使用。path:數據存放路徑tokenizer:分詞器train:是否是訓練階段'''texts, labels = load_dataset(path, args['num_labels'])texts = tokenizer(texts, padding='max_length', truncation=True, return_tensors='pt', max_length=args['max_length']) data = TensorDataset(texts['input_ids'], texts['attention_mask'], torch.tensor(labels)) if train:train_sampler = DistributedSampler(data, shuffle=True) # #創建一個隨機采樣器。g = get_ddp_generator()dataloader = DataLoader(dataset=data,batch_size=args['batch_size'],num_workers=args['num_workers'],pin_memory=True,shuffle=False,sampler=train_sampler, #采用隨機采樣器。generator=g) else:test_sampler = DistributedSampler(data, shuffle=False) #創建一個順序采樣器。dataloader = DataLoader(dataset=data,batch_size=args['batch_size'],num_workers=args['num_workers'],pin_memory=True,shuffle=False,sampler=test_sampler #采用順序采樣器。)return dataloader

    1.6 train() 函數

    該函數主要通過 reduce_tensor() 函數對loss進行了取均值操作,并對反向傳播的方式進行了修改 —— 通過scaler 對梯度進行縮放,防止由于使用混合精度導致損失下溢,并且對scaler自身的狀態進行更新。多個并行進程共用同一個scaler。在模型保存過程中,如果后續需要繼續訓練(比如預訓練-微調模式),最好將scaler 的狀態一起保留,并在后續的微調過程中和模型的參數一同加載。

    def train(model, train_dataloader, optimizer, scheduler, criterion, actual_epoch, scaler, args):model.train()tr_loss = 0num_train_samples = 0for step, batch in enumerate(train_dataloader):batch = tuple(t.cuda(non_blocking=True) for t in batch)b_input_ids, b_input_mask, b_labels = batchwith torch.cuda.amp.autocast():output = model(b_input_ids, attention_mask=b_input_mask, labels=b_labels) # 運行到這一行會增加一下顯存loss = criterion(output.logits.view(-1,args['num_labels']), b_labels.type_as(output.logits).view(-1,args['num_labels']))reduced_loss = reduce_tensor(loss.data) # 對并行進程計算的多個 loss 取平均if dist.get_rank() == 0: # 防止重復輸出print("\nOutput Loss: ", reduced_loss.item())tr_loss += reduced_loss.item()# 并行狀態下的更新,不同進程分別根據自己計算的 loss 更新數據optimizer.zero_grad()scaler.scale(loss).backward()scaler.step(optimizer) # 運行到這一行會增加一下顯存# 下面四行,多個進程只執行一次scheduler.step()scaler.update()num_train_samples += b_labels.size(0) #將批次中的樣本數量添加到 num_train_samples 中。torch.cuda.empty_cache() # 釋放GPU reserved memory顯存epoch_train_loss = tr_loss / num_train_samples # num_train_samples 代表每個進程承接的樣本數量,由于上面已經有對loss取平均的操作,這里分母無需再乘以進程數if dist.get_rank() == 0:print("\nTrain loss after Epoch {} : {}".format(actual_epoch, epoch_train_loss))

    1.7 validate() 函數

    @torch.no_grad() def validate(model, valid_dataloader, criterion, epoch, args, threshold=0.5):model.eval()eval_loss = 0.0num_eval_samples = 0pred_labels = []true_labels = []for step, batch in enumerate(valid_dataloader):batch = tuple(t.cuda(non_blocking=True) for t in batch)b_input_ids, b_input_mask, b_labels = batchwith torch.no_grad():with torch.cuda.amp.autocast():output = model(b_input_ids, attention_mask=b_input_mask)logits = output.logitsloss = criterion(logits.view(-1,args['num_labels']), b_labels.type_as(logits).view(-1,args['num_labels']))reduced_loss = reduce_tensor(loss.data)eval_loss += reduced_loss.item()pred_label = torch.sigmoid(logits)pred_label = pred_label.to('cpu').numpy()b_labels = b_labels.to('cpu').numpy()pred_labels.append(pred_label)true_labels.append(b_labels)num_eval_samples += b_labels.shape[0] # 這里是針對單個 進程 的 計算樣本數epoch_eval_loss = eval_loss/num_eval_samples if dist.get_rank() == 0:print("Validation loss after Epoch {} : {}".format(epoch, epoch_eval_loss))# 每個并行進程都會分別執行下列計算操作,得到各進程對應的macro評價指標pred_labels = [item for sublist in pred_labels for item in sublist]true_labels = [item for sublist in true_labels for item in sublist]pred_bools = [pl>threshold for pl in pred_labels]true_bools = [tl==1 for tl in true_labels]macro = f1_score(true_bools, pred_bools, average='macro')# 匯總不同進程的實驗結果macro = reduce_tensor(torch.tensor(macro).cuda())return macro

    1.8 test() 函數

    由1.3節可知,我這里的程序是將「訓練&驗證」與「測試」過程分開,前一階段保存模型,后一階段對模型進行驗證。所以單獨來介紹一下 test() 函數需要修改的內容,這一部分涉及到checkpoint模型加載。加速推理方法詳見此篇博客。

    @torch.no_grad() def test(local_rank, args):init_ddp(local_rank) # 進程初始化pred_labels = []true_labels = []if local_rank == 0:print(f'begin testing') save_path = args['model_save_dir'] + '/best_macro_model_DDP_direct.pt'model, tokenizer = load_model(save_path, args['modelname'], args['num_labels'])model.cuda()model = nn.SyncBatchNorm.convert_sync_batchnorm(model) ### 轉換模型的 BN 層num_gpus = torch.cuda.device_count()if num_gpus > 1 and local_rank == 0:print('use {} gpus!'.format(num_gpus))model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank) ### 套 DDPmodel.eval()test_dataloader = get_dataloader(args['testcsvpath'], args, tokenizer, train=False)for idx, batch in enumerate(test_dataloader): #遍歷測試集的數據加載器。...... dist.destroy_process_group() # 消除進程組

    注意??在測試階段,也需要將程序并行運行,否則會報錯(以全量保存為例):

    python /data/gluo/CMLTES/codes/BLOOM_DDP_direct.py -mode "test"torch.multiprocessing.spawn.ProcessRaisedException: -- Process 1 terminated with the following error: Traceback (most recent call last): File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap fn(i, *args) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_context return func(*args, **kwargs) File "/data/gluo/CMLTES/codes/BLOOM_DDP_direct.py", line 449, in test output = model(b_input_ids, attention_mask=b_input_mask, labels=b_labels) #獲取模型的輸出。 File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1130, in _call_impl return forward_call(*input, **kwargs) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 1008, in forward output = self._run_ddp_forward(*inputs, **kwargs) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 969, in _run_ddp_forward return module_to_run(*inputs[0], **kwargs[0]) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1130, in _call_impl return forward_call(*input, **kwargs) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 1008, in forward output = self._run_ddp_forward(*inputs, **kwargs) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 969, in _run_ddp_forward return module_to_run(*inputs[0], **kwargs[0]) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1130, in _call_impl return forward_call(*input, **kwargs) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/transformers/models/bloom/modeling_bloom.py", line 1030, in forward transformer_outputs = self.transformer( File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1130, in _call_impl return forward_call(*input, **kwargs) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/transformers/models/bloom/modeling_bloom.py", line 727, in forward inputs_embeds = self.word_embeddings(input_ids) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1130, in _call_impl return forward_call(*input, **kwargs) File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/modules/sparse.py", line 158, in forward return F.embedding( File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/functional.py", line 2199, in embedding return torch.embedding(weight, input, padding_idx, scale_grad_by_freq, sparse) RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:1 and cuda:0! (when checking argument for argument index in method wrapper__index_select)

    到這里,程序就全部修改完畢啦!


    2. 程序運行

    下面分別介紹 DDP 的幾種多卡啟動方式。

    2.1 mp.spawn() 啟動

    本程序采用的啟動方式是 mp.spawn() 函數,其中mp模塊完成對multiprocessing庫進行封裝,并沒有特定針對DDP。

    一開始,使用兩張 2080 Ti 顯卡并行運行程序,然而發現在第 0 個Epoch剛剛啟動不久,總是報錯 RuntimeError: CUDA out of memory.,如下:

    Traceback (most recent call last):File "/data/CMLTES_codes/experiment/bloom/BLOOM_DDP.py", line 690, in <module>mp.spawn(main, args=(args, ), nprocs=world_size)File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawnreturn start_processes(fn, args, nprocs, join, daemon, start_method='spawn')File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processeswhile not context.join():File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 160, in joinraise ProcessRaisedException(msg, error_index, failed_process.pid) torch.multiprocessing.spawn.ProcessRaisedException: -- Process 1 terminated with the following error: Traceback (most recent call last):File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrapfn(i, *args)File "/data/CMLTES_codes/experiment/bloom/BLOOM_DDP.py", line 603, in mainepoch_train_loss = train(model, train_dataloader, optimizer, scheduler, loss_func, actual_epoch, scaler, args)File "/data/CMLTES_codes/experiment/bloom/BLOOM_DDP.py", line 336, in trainscaler.scale(loss).backward() ###File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/_tensor.py", line 396, in backwardtorch.autograd.backward(self, gradient, retain_graph, create_graph, inputs=inputs)File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/autograd/__init__.py", line 173, in backwardVariable._execution_engine.run_backward( # Calls into the C++ engine to run the backward passFile "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/autograd/function.py", line 253, in applyreturn user_fn(self, *args)File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/transformers/models/bloom/modeling_bloom.py", line 188, in backwardtmp = bloom_gelu_back(grad_output, input)File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/transformers/models/bloom/modeling_bloom.py", line 175, in bloom_gelu_backff = 0.5 * x * ((1 - tanh_out * tanh_out) * (0.79788456 + 0.1070322243 * x * x)) + 0.5 * (1 + tanh_out) RuntimeError: CUDA out of memory. Tried to allocate 32.00 MiB (GPU 1; 10.76 GiB total capacity; 8.83 GiB already allocated; 28.56 MiB free; 8.94 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation. See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

    百思不得其解后,嘗試把程序原封不動放到3090上面運行,發現可以正常運行了!這里的經驗教訓就是GPU 單卡顯存對于并行也很重要!一個1.2G左右的模型微調時大約需要40G左右的中間變量來進行反向傳播……這是我屬實沒有想到的情況……


    2.2 tochrnn 啟動

    相較于使用 mp.spawn() 啟動,torchrun 會自動控制一些環境變量的設置,因而更為方便。我們只需要設置os.environ[‘CUDA_VISIBLE_DEVICES’] 即可(不設置默認為該機器上的所有GPU),而無需設置os.environ[‘MASTER_ADDR’] 等。此外,main() 函數不再需要 local_rank 參數。程序入口變為:

    if __name__ == '__main__':......time_start = time.time()main(args)time_elapsed = time.time() - time_startlocal_rank = int(os.environ['LOCAL_RANK'])if local_rank == 0:print(f'\ntime elapsed: {time_elapsed:.2f} seconds')

    運行腳本的命令由python變為了torchrun,如下:

    torchrun --standalone --nproc_per_node=2 ddp_main_torchrun.py --gpu 0,1

    程序能夠成功運行之后,還有一些細節問題,下面一一來進行解決。

    在用這種方式啟動程序時,報如下錯誤:

    ImportError: /usr/lib/x86_64-linux-gnu/libstdc++.so.6: version `GLIBCXX_3.4.29' not found (required by /opt/conda/envs/CMLTES/lib/python3.9/site-packages/google/protobuf/pyext/_message.cpython-39-x86_64-linux-gnu.so)

    解決辦法:替換使用的 /usr/lib/x86_64-linux-gnu/libstdc++.so.6,詳情參照 此篇博客。


    2.3 torch.distributed.launch() 啟動

    這種方式代碼量更少,啟動速度更快。

    python -m torch.distributed.launch --nproc_per_node 8 xxx.py # -m 意思是 run library module as a script # -nproc_per_node 表示每臺機器的進程數

    PS:這種方式要被淘汰了:

    /opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/distributed/launch.py:178: FutureWarning: The module torch.distributed.launch is deprecated and will be removed in future. Use torchrun.

    3. Debug 歷程

    下面對使用DDP期間遇到的問題進行分析并給出解決辦法。

    問題一:多進程計算數據收集

    由于我這里是將模型復制到雙卡上實現數據并行,所以在匯總結果時,需要將不同進程上的數據進行匯總取均值計算。這時就需要用到 1.2 節提到的 all_reduce() 收集函數。

    這里要注意??:對于 float 等非張量型數據,如果我們想對其計算多進程的平均值,可以先使用 torch.tensor() 將需要匯總的變量轉為 tensor 并使用 .cuda() 命令將其放至 gpu 上,然后調用 all_reduce() 收集函數。詳見 1.7 節 validate() 函數中 macro 變量的收集計算。若沒有完成數據轉換,則會報錯如下:

    衍生問題:在進行反向傳播時,每個進程使用的訓練數據是不同的,所以還是需要根據自己當前計算的 loss 分別更新,而不是根據收集函數得到的 loss 值進行更新,否則會報錯,也不合邏輯。


    問題二:模型加載參數缺失

    在 1.4節 main() 函數中,使用「只保存模型參數」的方式存儲模型。在測試階段,用對應方式加載模型時,報錯如下:

    (CMLTES) ? CMLTES git:(master) ? python /data/gluo/CMLTES/codes/BLOOM_DDP.py -mode "test" Model directory for bloom and batch size 4 already exists! TEST FOR bloom and Batch Size4 [W socket.cpp:558] [c10d] The client socket has failed to connect to [localhost]:19198 (errno: 99 - Cannot assign requested address). begin testing Some weights of BloomForSequenceClassification were not initialized from the model checkpoint at /data/gluo/CMLTES/bloom_PRE and are newly initialized: ['score.weight'] You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference. Some weights of BloomForSequenceClassification were not initialized from the model checkpoint at /data/gluo/CMLTES/bloom_PRE and are newly initialized: ['score.weight'] You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference. Traceback (most recent call last):File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 586, in <module>mp.spawn(test, args=(args, ), nprocs=world_size)File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawnreturn start_processes(fn, args, nprocs, join, daemon, start_method='spawn')File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processeswhile not context.join():File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 160, in joinraise ProcessRaisedException(msg, error_index, failed_process.pid) torch.multiprocessing.spawn.ProcessRaisedException: -- Process 0 terminated with the following error: Traceback (most recent call last):File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrapfn(i, *args)File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_contextreturn func(*args, **kwargs)File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 450, in testmodel, tokenizer = load_model(save_path, args['modelname'], args['num_labels']) #加載模型。File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 95, in load_modelmodel.load_state_dict(model_state_dict) #, strict=False) #加載模型的參數。File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1604, in load_state_dictraise RuntimeError('Error(s) in loading state_dict for {}:\n\t{}'.format( RuntimeError: Error(s) in loading state_dict for BloomForSequenceClassification:Missing key(s) in state_dict: "transformer.word_embeddings.weight", "transformer.word_embeddings_layernorm.weight", "transformer.word_embeddings_layernorm.bias", "transformer.h.0.input_layernorm.weight", "transformer.h.0.input_layernorm.bias", "transformer.h.0.self_attention.query_key_value.weight", "transformer.h.0.self_attention.query_key_value.bias", "transformer.h.0.self_attention.dense.weight", "transformer.h.0.self_attention.dense.bias",

    根據此篇博客,這里暫時的處理方法是:將 load_state_dict() 函數修改為:model.load_state_dict(model_state_dict, strict=False),即設置 strict 參數值為 False. strict=False 的含義是:不嚴格要求 state_dict 中的鍵與該模塊的鍵返回的鍵匹配。

    上述處理方式可以暫時忽略上述參數缺失問題,但是可能會對模型的性能造成一定程度的影響,這一問題有待后續解決。

    PS:關于模型保存與加載的兩種方法

    根據此篇博客,保存模型有兩種方式,一是全量保存模型的全部信息,二是只保存模型的參數,兩種保存方式對應的模型加載方式自然也有所差別。

    • 保存模型的全部信息
    # 保存模型 checkpoint = {'model': model,\'scaler': scaler} torch.save(checkpoint, save_path)# 加載模型 checkpoint = torch.load(save_path) model = checkpoint['model'] # 加載模型
    • 只保存模型參數
      與第一種方式不同的是,這種方式在加載模型時,需要首先定義與保存的模型相同的模型結構,然后加載模型參數。
    # 保存模型 checkpoint = {'state_dict': model.state_dict(),\'scaler': scaler.state_dict()} torch.save(checkpoint, save_path)# 加載模型 checkpoint = torch.load(save_path, map_location=torch.device('cpu')) model_state_dict = checkpoint['state_dict'] model.load_state_dict(model_state_dict) #, strict=False) #加載模型參數。

    問題三:參數類型轉換異常

    在 1.4節 main() 函數中,使用「只保存模型參數」的方式存儲模型。在測試階段,用對應方式加載模型時,報錯如下:

    使用上述方法解決加載模型參數缺失的問題后,隨之而來的問題如下所示。

    Traceback (most recent call last):File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 587, in <module>mp.spawn(test, args=(args, ), nprocs=world_size)File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawnreturn start_processes(fn, args, nprocs, join, daemon, start_method='spawn')File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processeswhile not context.join():File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 160, in joinraise ProcessRaisedException(msg, error_index, failed_process.pid) torch.multiprocessing.spawn.ProcessRaisedException: -- Process 0 terminated with the following error: Traceback (most recent call last):File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrapfn(i, *args)File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_contextreturn func(*args, **kwargs)File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 459, in testmodel = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank) ### 套 DDPFile "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 646, in __init___verify_param_shape_across_processes(self.process_group, parameters)File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/distributed/utils.py", line 89, in _verify_param_shape_across_processesreturn dist._verify_params_across_processes(process_group, tensors, logger) RuntimeError: value cannot be converted to type int without overflow

    深度原因有待后續探索。


    問題四:參數泄露

    報錯日志:UserWarning: resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown warnings.warn('resource_tracker: There appear to be %d '


    由上圖可知,上述警告產生的原因是使用了 Ctrl+C 中斷程序。深度原因有待后續探索。

    注意??:在使用PyTorch設置多線程進行數據讀取時,后臺實際操作情況是開了N個PID連號的子進程模擬多線程工作,所以在程序跑完或者中途kill掉主進程的話,子進程的GPU顯存并不會被釋放,需要手動一個一個kill才行。

    > 本篇博客沒有涉及到的知識點:dist.barrier()、Gradient Accumulation、Apex 實現混合精度訓練&分布式訓練、

    后記:本篇博客是我經過不斷探索總結而得,其中若有表述不當或表意不明之處,還望各位不吝賜教,我們共同進步!


    參考文獻

  • 一看就懂的DDP代碼實踐 - 知乎 (zhihu.com)
  • Pytorch DistributedDataParallel簡明使用指南 - 知乎 (zhihu.com)
  • PyTorch DistributedDataParallel 單機多卡訓練 踩坑記錄
  • pytorch保存模型的兩種方式_SCU-JJkinging的博客-CSDN博客
  • 加載模型出現 RuntimeError: Error(s) in loading state_dict for Model: Missing key(s) in state_dict_sovits 加載淺擴散模型error_大海Git的博客-CSDN博客
  • pytorch中model.to(device)和map_location=device的區別_絳洞花主敏明的博客-CSDN博客
  • RuntimeError: CUDA out of memory.一些調bug路程 - 知乎 (zhihu.com)
  • pytorch 分布式計算 你們都遇到過哪些 坑/bug? - 知乎 (zhihu.com)
  • 關于pytorch中的distributedsampler函數使用_DRACO于的博客-CSDN博客
  • 通過設置PYTORCH_CUDA_ALLOC_CONF中的max_split_size_mb解決Pytorch的顯存碎片化導致的CUDA:Out Of Memory問題_夢音Yune的博客-CSDN博客
  • torch.cuda.amp.autocast()使用示例_生成滯漲網絡~的博客-CSDN博客
  • 可能99%人犯的PyTorch錯誤 set_seed 會破壞隨機性,官方 worker_init_fn 無法解決 - 知乎 (zhihu.com)
  • 原創 深度 PyTorch DDP系列第三篇:實戰與技巧 - 知乎 (zhihu.com)
  • torch.distributed_Wanderer001的博客-CSDN博客
  • 以下系列資源均來自此博主,可以說是關于數據并行十分詳細的教程了!

  • Pytorch(十一) —— 分布式(多GPU/多卡)訓練 并行 (DP & DDP)_pytorch gpu 分布式_hxxjxw的博客-CSDN博客
  • PyTorch多卡/多GPU/分布式DPP的基本概念(node&rank&local_rank&nnodes&node_rank&nproc_per_node&world_size)_hxxjxw的博客-CSDN博客
  • torch.distributed多卡/多GPU/分布式DPP(一) —— torch.distributed.launch & all_gather & init_process_group_hxxjxw的博客-CSDN博客
  • torch.distributed多卡/多GPU/分布式DPP(二)—torch.distributed.all_reduce(reduce_mean)barrier控制進程執行順序&seed隨機種子_torch dpp_hxxjxw的博客-CSDN博客
  • Pytorch分布式訓練/多卡訓練DDP——模型初始化(torch.distribute 與 DDP的區別)_pytorch distribute torchtrun-CSDN博客
  • 多卡訓練中的BN(BatchNorm)_多卡 batchnorm_hxxjxw的博客-CSDN博客
  • 為什么Pytorch多卡訓練容易導致GPU顯存不釋放_hxxjxw的博客-CSDN博客
  • Pytorch分布式訓練/多卡訓練(一) —— Data Parallel并行(DP)_model = nn.dataparallel(model, device_ids=[0, 1])_hxxjxw的博客-CSDN博客
  • Pytorch分布式訓練/多卡訓練(二) —— Data Parallel并行(DDP)(2.1)(基本概念&代碼框架)_slurm_procid_hxxjxw的博客-CSDN博客
  • Pytorch分布式訓練/多卡訓練(二) —— Data Parallel并行(DDP)(2.2)(代碼示例)(BN同步&主卡保存&梯度累加&多卡測試inference&隨機種子seed)_ddp程序中的seed_hxxjxw的博客-CSDN博客
  • Pytorch分布式訓練/多卡訓練(二) —— Data Parallel并行(DDP)(2.3)(torch.multiprocessing(spawn) & Apex)_torch.multiprocessing.spawn_hxxjxw的博客-CSDN博客
  • Pytorch分布式訓練/多卡訓練(三) —— Model Parallel 模型并行_hxxjxw的博客-CSDN博客
  • 總結

    以上是生活随笔為你收集整理的「分布式训练」使用 DDP 实现程序单机多卡并行指南的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。