[深度][PyTorch] DDP系列第一篇:入門教程
轉自:[原創][深度][PyTorch] DDP系列第一篇:入門教程
概覽
想要讓你的PyTorch神經網絡在多卡環境上跑得又快又好?那你definitely需要這一篇!
No one knows DDP better than I do!
– – MagicFrog(手動狗頭)
本文是DDP系列三篇(基本原理與入門,底層實現與代碼解析,實戰與技巧)中的第一篇。本系列力求深入淺出,簡單易懂,猴子都能看得懂(誤)。本篇主要在下述四個方面展開描述:
- DDP的原理?
- 在分類上,DDP屬于Data Parallel。簡單來講,就是通過提高batch size來增加并行度。
- 為什么快?
- DDP通過Ring-Reduce的數據交換方法提高了通訊效率,并通過啟動多個進程的方式減輕Python GIL的限制,從而提高訓練速度。
- DDP有多快?
- 一般來說,DDP都是顯著地比DP快,能達到略低于卡數的加速比(例如,四卡下加速3倍)。所以,其是目前最流行的多機多卡訓練方法。
- 怎么用DDP?
- 有點長,但是給你一個簡單、完整的示例!
請歡快地開始閱讀吧!
Quick Start
不想看原理?給你一個最簡單的DDP Pytorch例子!
依賴
PyTorch(gpu)>=1.5,python>=3.6
環境準備
推薦使用官方打好的PyTorch docker,避免亂七八糟的環境問題影響心情。
# Dockerfile
# Start FROM Nvidia PyTorch image https://ngc.nvidia.com/catalog/containers/nvidia:pytorch
FROM nvcr.io/nvidia/pytorch:20.03-py3
代碼
單GPU代碼
## main.py文件
import torch# 構造模型
model = nn.Linear(10, 10).to(local_rank)# 前向傳播
outputs = model(torch.randn(20, 10).to(rank))
labels = torch.randn(20, 10).to(rank)
loss_fn = nn.MSELoss()
loss_fn(outputs, labels).backward()
# 后向傳播
optimizer = optim.SGD(model.parameters(), lr=0.001)
optimizer.step()## Bash運行
python main.py
加入DDP的代碼
## main.py文件
import torch
# 新增:
import torch.distributed as dist# 新增:從外面得到local_rank參數
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank# 新增:DDP backend初始化
torch.cuda.set_device(local_rank)
dist.init_process_group(backend='nccl') # nccl是GPU設備上最快、最推薦的后端# 構造模型
device = torch.device("cuda", local_rank)
model = nn.Linear(10, 10).to(device)
# 新增:構造DDP model
model = DDP(model, device_ids=[local_rank], output_device=local_rank)# 前向傳播
outputs = model(torch.randn(20, 10).to(rank))
labels = torch.randn(20, 10).to(rank)
loss_fn = nn.MSELoss()
loss_fn(outputs, labels).backward()
# 后向傳播
optimizer = optim.SGD(model.parameters(), lr=0.001)
optimizer.step()## Bash運行
# 改變:使用torch.distributed.launch啟動DDP模式,
# 其會給main.py一個local_rank的參數。這就是之前需要"新增:從外面得到local_rank參數"的原因
python -m torch.distributed.launch --nproc_per_node 4 main.py
DDP的基本原理
大白話原理
假如我們有N張顯卡,
- (緩解GIL限制)在DDP模式下,會有N個進程被啟動,每個進程在一張卡上加載一個模型,這些模型的參數在數值上是相同的。
- (Ring-Reduce加速)在模型訓練時,各個進程通過一種叫Ring-Reduce的方法與其他進程通訊,交換各自的梯度,從而獲得所有進程的梯度;
- (實際上就是Data Parallelism)各個進程用平均后的梯度更新自己的參數,因為各個進程的初始參數、更新梯度是一致的,所以更新后的參數也是完全相同的。
是不是很簡單呢?
與DP模式的不同
那么,DDP對比Data Parallel(DP)模式有什么不同呢?
DP模式是很早就出現的、單機多卡的、參數服務器架構的多卡訓練模式,在PyTorch,即是:
model = torch.nn.DataParallel(model)
在DP模式中,總共只有一個進程(受到GIL很強限制)。master節點相當于參數服務器,其會向其他卡廣播其參數;在梯度反向傳播后,各卡將梯度集中到master節點,master節點對搜集來的參數進行平均后更新參數,再將參數統一發送到其他卡上。這種參數更新方式,會導致master節點的計算任務、通訊量很重,從而導致網絡阻塞,降低訓練速度。
但是DP也有優點,優點就是代碼實現簡單。要速度還是要方便,看官可以自行選用噢。
DDP為什么能加速?
本節對上面出現的幾個概念進行一下介紹,看完了你就知道為什么DDP這么快啦!
Python GIL
GIL是個很捉急的東西,如果大家有被煩過的話,相信會相當清楚。如果不了解的同學,可以自行百度一下噢。
這里簡要介紹下其最大的特征(缺點):Python GIL的存在使得,一個python進程只能利用一個CPU核心,不適合用于計算密集型的任務。
使用多進程,才能有效率利用多核的計算資源。
而DDP啟動多進程訓練,一定程度地突破了這個限制。
Ring-Reduce梯度合并
Ring-Reduce是一種分布式程序的通訊方法。
- 因為提高通訊效率,Ring-Reduce比DP的parameter server快。
- 其避免了master階段的通訊阻塞現象,n個進程的耗時是o(n)。
- 詳細的介紹:ring allreduce和tree allreduce的具體區別是什么?
簡單說明
- 各進程獨立計算梯度。
- 每個進程將梯度依次傳遞給下一個進程,之后再把從上一個進程拿到的梯度傳遞給下一個進程。循環n次(進程數量)之后,所有進程就可以得到全部的梯度了。
- 可以看到,每個進程只跟自己上下游兩個進程進行通訊,極大地緩解了參數服務器的通訊阻塞現象!
并行計算
-
Data Parallelism
- 這是最常見的形式,通俗來講,就是增大batch size。
- 平時我們看到的多卡并行就屬于這種。比如DP、DDP都是。這能讓我們方便地利用多卡計算資源。
- 能加速。
- 這是最常見的形式,通俗來講,就是增大batch size。
-
Model Parallelism
- 把模型放在不同GPU上,計算是并行的。
- 有可能是加速的,看通訊效率。
-
Workload Partitioning
- 把模型放在不同GPU上,但計算是串行的。
- 不能加速。
如何在PyTorch中使用DDP
看到這里,你應該對DDP是怎么運作的,為什么能加速有了一定的了解,下面就讓我們學習一下怎么使用DDP吧!
如何在PyTorch中使用DDP:DDP模式
DDP有不同的使用模式。DDP的官方最佳實踐是,每一張卡對應一個單獨的GPU模型(也就是一個進程),在下面介紹中,都會默認遵循這個pattern。
舉個例子:我有兩臺機子,每臺8張顯卡,那就是2x8=16個進程,并行數是16。
但是,我們也是可以給每個進程分配多張卡的。總的來說,分為以下三種情況:
- 每個進程一張卡。這是DDP的最佳使用方法。
- 每個進程多張卡,復制模式。一個模型復制在不同卡上面,每個進程都實質等同于DP模式。這樣做是能跑得通的,但是,速度不如上一種方法,一般不采用。
- 每個進程多張卡,并行模式。一個模型的不同部分分布在不同的卡上面。例如,網絡的前半部分在0號卡上,后半部分在1號卡上。這種場景,一般是因為我們的模型非常大,大到一張卡都塞不下batch size = 1的一個模型。
在本文中,先不會講每個進程多張卡要怎么操作,免得文章過于冗長。在這里,只是讓你知道有這個東西,用的時候再查閱文檔。
如何在PyTorch中使用DDP:概念
下面介紹一些PyTorch分布式編程的基礎概念。
基本概念
在16張顯卡,16的并行數下,DDP會同時啟動16個進程。下面介紹一些分布式的概念。
group
即進程組。默認情況下,只有一個組。這個可以先不管,一直用默認的就行。
world size
表示全局的并行數,簡單來講,就是2x8=16。
# 獲取world size,在不同進程里都是一樣的,得到16
torch.distributed.get_world_size()
rank
表現當前進程的序號,用于進程間通訊。對于16的world sizel來說,就是0,1,2,…,15。
注意:rank=0的進程就是master進程。
# 獲取rank,每個進程都有自己的序號,各不相同
torch.distributed.get_rank()
local_rank
又一個序號。這是每臺機子上的進程的序號。機器一上有0,1,2,3,4,5,6,7,機器二上也有0,1,2,3,4,5,6,7
# 獲取local_rank。一般情況下,你需要用這個local_rank來手動設置當前模型是跑在當前機器的哪塊GPU上面的。
torch.distributed.local_rank()
如何在PyTorch中使用DDP:詳細流程
精髓
DDP的使用非常簡單,因為它不需要修改你網絡的配置。其精髓只有一句話
model = DDP(model, device_ids=[local_rank], output_device=local_rank)
原本的model就是你的PyTorch模型,新得到的model,就是你的DDP模型。
最重要的是,后續的模型關于前向傳播、后向傳播的用法,和原來完全一致!DDP把分布式訓練的細節都隱藏起來了,不需要暴露給用戶,非常優雅!
(對于有時間的人,如果你想知道DDP的實現方式,請看DDP第二篇進階部分)
準備工作
但是,在套model = DDP(model)
之前,我們還是需要做一番準備功夫,把環境準備好的。
這里需要注意的是,我們的程序雖然會在16個進程上跑起來,但是它們跑的是同一份代碼,所以在寫程序的時候要處理好不同進程的關系。
## main.py文件
import torch
import argparse# 新增1:依賴
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP# 新增2:從外面得到local_rank參數,在調用DDP的時候,其會自動給出這個參數,后面還會介紹。所以不用考慮太多,照著抄就是了。
# argparse是python的一個系統庫,用來處理命令行調用,如果不熟悉,可以稍微百度一下,很簡單!
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank# 新增3:DDP backend初始化
# a.根據local_rank來設定當前使用哪塊GPU
torch.cuda.set_device(local_rank)
# b.初始化DDP,使用默認backend(nccl)就行。如果是CPU模型運行,需要選擇其他后端。
dist.init_process_group(backend='nccl')# 新增4:定義并把模型放置到單獨的GPU上,需要在調用`model=DDP(model)`前做哦。
# 如果要加載模型,也必須在這里做哦。
device = torch.device("cuda", local_rank)
model = nn.Linear(10, 10).to(device)
# 可能的load模型...# 新增5:之后才是初始化DDP模型
model = DDP(model, device_ids=[local_rank], output_device=local_rank)
前向與后向傳播
有一個很重要的概念,就是數據的并行化。
我們知道,DDP同時起了很多個進程,但是他們用的是同一份數據,那么就會有數據上的冗余性。也就是說,你平時一個epoch如果是一萬份數據,現在就要變成1*16=16萬份數據了。
那么,我們需要使用一個特殊的sampler,來使得各個進程上的數據各不相同,進而讓一個epoch還是1萬份數據。
幸福的是,DDP也幫我們做好了!
my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True)
# 新增1:使用DistributedSampler,DDP幫我們把細節都封裝起來了。用,就完事兒!
# sampler的原理,后面也會介紹。
train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
# 需要注意的是,這里的batch_size指的是每個進程下的batch_size。也就是說,總batch_size是這里的batch_size再乘以并行數(world_size)。
trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)for epoch in range(num_epochs):# 新增2:設置sampler的epoch,DistributedSampler需要這個來維持各個進程之間的相同隨機數種子trainloader.sampler.set_epoch(epoch)# 后面這部分,則與原來完全一致了。for data, label in trainloader:prediction = model(data)loss = loss_fn(prediction, label)loss.backward()optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)optimizer.step()
其他需要注意的地方
- 保存參數
# 1. save模型的時候,和DP模式一樣,有一個需要注意的點:保存的是model.module而不是model。
# 因為model其實是DDP model,參數是被`model=DDP(model)`包起來的。
# 2. 我只需要在進程0上保存一次就行了,避免多次保存重復的東西。
if dist.get_rank() == 0:torch.save(model.module, "saved_model.ckpt")
-
理論上,在沒有buffer參數(如BN)的情況下,DDP性能和單卡Gradient Accumulation性能是完全一致的。
-
并行度為8的DDP 等于 Gradient Accumulation Step為8的單卡
-
速度上,DDP當然比Graident Accumulation的單卡快;
- 但是還有加速空間。請見DDP系列第三篇:實戰。
-
如果要對齊性能,需要確保喂進去的數據,在DDP下和在單卡Gradient Accumulation下是一致的。
- 這個說起來簡單,但對于復雜模型,可能是相當困難的。
-
調用方式
像我們在QuickStart里面看到的,DDP模型下,python源代碼的調用方式和原來的不一樣了。現在,需要用torch.distributed.launch
來啟動訓練。
-
作用
- 在這里,我們給出分布式訓練的重要參數:
- 有多少臺機器?
- –nnodes
- 當前是哪臺機器?
- –node_rank
- 每臺機器有多少個進程?
- –nproc_per_node
- 有多少臺機器?
- 高級參數(可以先不看,多機模式才會用到)
- 通訊的address
- 通訊的address
- 在這里,我們給出分布式訓練的重要參數:
-
實現方式
-
我們需要在每一臺機子(總共m臺)上都運行一次
torch.distributed.launch
-
每個
torch.distributed.launch
會啟動n個進程,并給每個進程一個--local_rank=i
的參數- 這就是之前需要"新增:從外面得到local_rank參數"的原因
-
這樣我們就得到n*m個進程,world_size=n*m
-
單機模式
## Bash運行
# 假設我們只在一臺機器上運行,可用卡數是8
python -m torch.distributed.launch --nproc_per_node 8 main.py
多機模式
復習一下,master進程就是rank=0的進程。
在使用多機模式前,需要介紹兩個參數:
-
通訊的address
-
--master_address
-
也就是master進程的網絡地址
-
默認是:127.0.0.1,只能用于單機。
-
-
通訊的port
-
--master_port
-
也就是master進程的一個端口,要先確認這個端口沒有被其他程序占用了哦。一般情況下用默認的就行
-
默認是:29500
-
## Bash運行
# 假設我們在2臺機器上運行,每臺可用卡數是8
# 機器1:
python -m torch.distributed.launch --nnodes=2 --node_rank=0 --nproc_per_node 8 \--master_adderss $my_address --master_port $my_port main.py
# 機器2:
python -m torch.distributed.launch --nnodes=2 --node_rank=1 --nproc_per_node 8 \--master_adderss $my_address --master_port $my_port main.py
小技巧
# 假設我們只用4,5,6,7號卡
CUDA_VISIBLE_DEVICES="4,5,6,7" python -m torch.distributed.launch --nproc_per_node 4 main.py
# 假如我們還有另外一個實驗要跑,也就是同時跑兩個不同實驗。
# 這時,為避免master_port沖突,我們需要指定一個新的。這里我隨便敲了一個。
CUDA_VISIBLE_DEVICES="4,5,6,7" python -m torch.distributed.launch --nproc_per_node 4 \--master_port 53453 main.py
mp.spawn調用方式
PyTorch引入了torch.multiprocessing.spawn,可以使得單卡、DDP下的外部調用一致,即不用使用torch.distributed.launch。 python main.py一句話搞定DDP模式。
給一個mp.spawn的文檔:代碼文檔
下面給一個簡單的demo:
def demo_fn(rank, world_size):dist.init_process_group("nccl", rank=rank, world_size=world_size)# lots of code....def run_demo(demo_fn, world_size):mp.spawn(demo_fn,args=(world_size,),nprocs=world_size,join=True)
mp.spawn與launch各有利弊,請按照自己的情況選用。
按照筆者個人經驗,如果算法程序是提供給別人用的,那么mp.spawn更方便,因為不用解釋launch的用法;但是如果是自己使用,launch更有利,因為你的內部程序會更簡單,支持單卡、多卡DDP模式也更簡單。
總結
既然看到了這里,不妨點個贊/喜歡吧!
在本篇中,我們介紹了DDP的加速原理,和基本用法。如果你能充分理解文章內容,那么,你可以說對DDP初步入門了,可以開始改造你的算法程序,來吃掉多卡訓練速度提升這波紅利了!
在DDP系列的后面兩篇中,我們還會介紹DDP的底層實現方法,以及DDP的一些實戰。這些屬于進階的文章,如果你的DDP程序運行情況不理想/沒有獲得速度提升,或者你比較有探究精神/學習興趣濃厚,那么一定不要錯過后面這兩篇.
最后讓我們來總結一下所有的代碼,這份是一份能直接跑的代碼,推薦收藏!
################
## main.py文件
import argparse
from tqdm import tqdm
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
# 新增:
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP### 1. 基礎模塊 ###
# 假設我們的模型是這個,與DDP無關
class ToyModel(nn.Module):def __init__(self):super(ToyModel, self).__init__()self.conv1 = nn.Conv2d(3, 6, 5)self.pool = nn.MaxPool2d(2, 2)self.conv2 = nn.Conv2d(6, 16, 5)self.fc1 = nn.Linear(16 * 5 * 5, 120)self.fc2 = nn.Linear(120, 84)self.fc3 = nn.Linear(84, 10)def forward(self, x):x = self.pool(F.relu(self.conv1(x)))x = self.pool(F.relu(self.conv2(x)))x = x.view(-1, 16 * 5 * 5)x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))x = self.fc3(x)return x
# 假設我們的數據是這個
def get_dataset():transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor(),torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)# DDP:使用DistributedSampler,DDP幫我們把細節都封裝起來了。# 用,就完事兒!sampler的原理,第二篇中有介紹。train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)# DDP:需要注意的是,這里的batch_size指的是每個進程下的batch_size。# 也就是說,總batch_size是這里的batch_size再乘以并行數(world_size)。trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=16, num_workers=2, sampler=train_sampler)return trainloader### 2. 初始化我們的模型、數據、各種配置 ####
# DDP:從外部得到local_rank參數
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1, type=int)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank# DDP:DDP backend初始化
torch.cuda.set_device(local_rank)
dist.init_process_group(backend='nccl') # nccl是GPU設備上最快、最推薦的后端# 準備數據,要在DDP初始化之后進行
trainloader = get_dataset()# 構造模型
model = ToyModel().to(local_rank)
# DDP: Load模型要在構造DDP模型之前,且只需要在master上加載就行了。
ckpt_path = None
if dist.get_rank() == 0 and ckpt_path is not None:model.load_state_dict(torch.load(ckpt_path))
# DDP: 構造DDP model
model = DDP(model, device_ids=[local_rank], output_device=local_rank)# DDP: 要在構造DDP model之后,才能用model初始化optimizer。
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)# 假設我們的loss是這個
loss_func = nn.CrossEntropyLoss().to(local_rank)### 3. 網絡訓練 ###
model.train()
iterator = tqdm(range(100))
for epoch in iterator:# DDP:設置sampler的epoch,# DistributedSampler需要這個來指定shuffle方式,# 通過維持各個進程之間的相同隨機數種子使不同進程能獲得同樣的shuffle效果。trainloader.sampler.set_epoch(epoch)# 后面這部分,則與原來完全一致了。for data, label in trainloader:data, label = data.to(local_rank), label.to(local_rank)optimizer.zero_grad()prediction = model(data)loss = loss_func(prediction, label)loss.backward()iterator.desc = "loss = %0.3f" % lossoptimizer.step()# DDP:# 1. save模型的時候,和DP模式一樣,有一個需要注意的點:保存的是model.module而不是model。# 因為model其實是DDP model,參數是被`model=DDP(model)`包起來的。# 2. 只需要在進程0上保存一次就行了,避免多次保存重復的東西。if dist.get_rank() == 0:torch.save(model.module.state_dict(), "%d.ckpt" % epoch)################
## Bash運行
# DDP: 使用torch.distributed.launch啟動DDP模式
# 使用CUDA_VISIBLE_DEVICES,來決定使用哪些GPU
# CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 main.py
Citation
- 很全面的知乎上的文章:會飛的閑魚:Pytorch 分布式訓練
- pytorch 官方入門:https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
- pytorch 官方設計筆記:https://pytorch.org/docs/master/notes/ddp.html
- 關于并行的介紹:https://medium.com/@esaliya/model-parallelism-in-deep-learning-is-not-what-you-think-94d2f81e82ed