[深度][PyTorch] DDP系列第二篇:實現原理與源代碼解析
轉自:https://zhuanlan.zhihu.com/p/187610959
概覽
想要讓你的PyTorch神經網絡在多卡環境上跑得又快又好?那你definitely需要這一篇!
No one knows DDP better than I do!
– – MagicFrog(手動狗頭)
本文是DDP系列三篇(基本原理與入門,實現原理與源代碼解析,實戰與技巧)中的第二篇。本系列力求深入淺出,簡單易懂,猴子都能看得懂(誤)。本篇主要聚焦于DDP原理和源代碼解析。
雖然是進階篇,但是本篇力求做到簡單易懂,涉及的新概念都會有講解、引用。看完這篇后,你的收獲將是:
- 了解分布式計算的概念
- 了解PyTorch模型的狀態表示和構成
- 學習DDP的精巧的實現技巧
- 學會如何debug你的DDP模型
請歡快地開始閱讀吧!
依賴
pytorch(gpu)>=1.5,python>=3.6
復習
我們先回顧一下DDP的代碼實現。如果有看不懂的地方,那就得回去看上一篇哦。
################
## main.py文件
import argparse
from tqdm import tqdm
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
# 新增:
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP### 1. 基礎模塊 ###
# 假設我們的模型是這個,與DDP無關
class ToyModel(nn.Module):def __init__(self):super(ToyModel, self).__init__()self.conv1 = nn.Conv2d(3, 6, 5)self.pool = nn.MaxPool2d(2, 2)self.conv2 = nn.Conv2d(6, 16, 5)self.fc1 = nn.Linear(16 * 5 * 5, 120)self.fc2 = nn.Linear(120, 84)self.fc3 = nn.Linear(84, 10)def forward(self, x):x = self.pool(F.relu(self.conv1(x)))x = self.pool(F.relu(self.conv2(x)))x = x.view(-1, 16 * 5 * 5)x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))x = self.fc3(x)return x
# 假設我們的數據是這個
def get_dataset():transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor(),torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)# DDP:使用DistributedSampler,DDP幫我們把細節都封裝起來了。# 用,就完事兒!sampler的原理,第二篇中有介紹。train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)# DDP:需要注意的是,這里的batch_size指的是每個進程下的batch_size。# 也就是說,總batch_size是這里的batch_size再乘以并行數(world_size)。trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=16, num_workers=2, sampler=train_sampler)return trainloader### 2. 初始化我們的模型、數據、各種配置 ####
# DDP:從外部得到local_rank參數
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1, type=int)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank# DDP:DDP backend初始化
torch.cuda.set_device(local_rank)
dist.init_process_group(backend='nccl') # nccl是GPU設備上最快、最推薦的后端# 準備數據,要在DDP初始化之后進行
trainloader = get_dataset()# 構造模型
model = ToyModel().to(local_rank)
# DDP: Load模型要在構造DDP模型之前,且只需要在master上加載就行了。
ckpt_path = None
if dist.get_rank() == 0 and ckpt_path is not None:model.load_state_dict(torch.load(ckpt_path))
# DDP: 構造DDP model
model = DDP(model, device_ids=[local_rank], output_device=local_rank)# DDP: 要在構造DDP model之后,才能用model初始化optimizer。
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)# 假設我們的loss是這個
loss_func = nn.CrossEntropyLoss().to(local_rank)### 3. 網絡訓練 ###
model.train()
iterator = tqdm(range(100))
for epoch in iterator:# DDP:設置sampler的epoch,# DistributedSampler需要這個來指定shuffle方式,# 通過維持各個進程之間的相同隨機數種子使不同進程能獲得同樣的shuffle效果。trainloader.sampler.set_epoch(epoch)# 后面這部分,則與原來完全一致了。for data, label in trainloader:data, label = data.to(local_rank), label.to(local_rank)optimizer.zero_grad()prediction = model(data)loss = loss_func(prediction, label)loss.backward()iterator.desc = "loss = %0.3f" % lossoptimizer.step()# DDP:# 1. save模型的時候,和DP模式一樣,有一個需要注意的點:保存的是model.module而不是model。# 因為model其實是DDP model,參數是被`model=DDP(model)`包起來的。# 2. 只需要在進程0上保存一次就行了,避免多次保存重復的東西。if dist.get_rank() == 0:torch.save(model.module.state_dict(), "%d.ckpt" % epoch)################
## Bash運行
# DDP: 使用torch.distributed.launch啟動DDP模式
# 使用CUDA_VISIBLE_DEVICES,來決定使用哪些GPU
# CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 main.py
背景概念
在正式介紹之前,我們先認識一些基本概念,打好基礎。地基是很重要的,請各位同學認真學習哦!
分布式編程
一個分布式系統,相對于單機系統,其最大的特征就是,其數據、處理是分布在不同地方的。與此相伴的是,各節點間有交換數據的需求,為此需要定義交換數據的規范、接口。在此基礎上,才能構建起分布式計算的大框架。比如很有名的google大數據三駕馬車之一的map-reduce
概念,簡要地描述,就是將數據分開成N份map到N個地方,并行進行處理;處理完成后,再將結果reduce到一起。
為了滿足分布式編程的需求,PyTorch提供了一些分布式基本接口,在torch.distributed中。有興趣的可以自己翻閱:文檔 and 代碼
下圖闡述了PyTorch實現的分布式接口:
在DDP這里,我們重點介紹一下最重要的實現,all_reduce
。
- 所謂的reduce,就是不同節點各有一份數據,把這些數據匯總到一起。在這里,我們規定各個節點上的這份數據有著相同的shape和data type,并規定匯總的方法是相加。簡而言之,就是把各個節點上的一份相同規范的數據相加到一起。
- 所謂的
all_reduce
,就是在reduce的基礎上,把最終的結果發回到各個節點上。 - 具體的all*reduce實現,要看具體的backend。流行的GPU backend NCCL,*all_reduce的實現就是使用了ring思想。
DDP利用all_reduce
,來進行不同進程上的梯度的平均操作。PyTorch提供了幾個all_reduce的版本,下面這個就是Ring-Reduce版本(我們在前篇闡述了為什么Ring-Reduce是一個更好的版本):
def all_reduce(tensor,op=ReduceOp.SUM,group=group.WORLD,async_op=False):"""Reduces the tensor data across all machines in such a way that all getthe final result.After the call ``tensor`` is going to be bitwise identical in all processes.Arguments:tensor (Tensor): Input and output of the collective. The functionoperates in-place.op (optional): One of the values from``torch.distributed.ReduceOp``enum. Specifies an operation used for element-wise reductions.group (ProcessGroup, optional): The process group to work onasync_op (bool, optional): Whether this op should be an async opReturns:Async work handle, if async_op is set to True.None, if not async_op or if not part of the group"""
PyTorch 數據結構基礎
DDP到底和什么數據結構打交道呢?我們要首先解決這些問題:
- 我們知道,DDP下各進程不同步參數而是同步參數的變化量,所以各進程的模型的狀態同一性是非常重要的。那么模型的狀態由什么構成呢?
- DDP是怎么做到,無論是什么模型進來,一個簡單的
model = DDP(model)
就可以解決問題呢?它的邏輯是怎么嵌入到模型中的?
buffer
解決第一個問題,需要了解buffer的概念。
在PyTorch中,所有的模型都會繼承module類。可以說,一個CNN模型,其就是由一系列module組合而成的。要了解模型,就必須從module下手。下面是module的初始化代碼,可以看到,它定義了一系列變量。可以說,這些變量就組成了一個module的基本要素。
代碼
# torch.nn.modules.py. line 71. Class module:def __init__(self):"""Initializes internal Module state, shared by both nn.Module and ScriptModule."""torch._C._log_api_usage_once("python.nn_module")self.training = Trueself._parameters = OrderedDict()self._buffers = OrderedDict()self._backward_hooks = OrderedDict()self._forward_hooks = OrderedDict()self._forward_pre_hooks = OrderedDict()self._state_dict_hooks = OrderedDict()self._load_state_dict_pre_hooks = OrderedDict()self._modules = OrderedDict()
總的來說,module的基本要素**可以分為2組,一組是狀態,一組是各種各樣的hooks。**狀態有以下4個東西:
- self.training
- 指的是網絡是否在訓練狀態中。這是個非常宏觀的狀態,大家都知道這個是啥,可以略過。
- self._modules
- _modules是下屬的模塊,相當于迭代地定義了self.trainig, self._modules, self._parameters等一系列變量
- self._parameters
- 指的就是網絡的參數
- self._buffers
- 不是參數,但也對網絡很重要,會被持久化保存的數據。
- 舉個例子,BatchNorm中的moving mean and variance就是buffer,其優化不是通過梯度反向傳播而是通過其他途徑。
**從本質上講,當一個模型的網絡結構被定義后,其狀態就是由parameter和buffer的迭代組合表示的。**當我們保存模型,調用model.staic_dict()
的時候,我們同時會得到模型的parameter和buffer;也就是說,在DDP中,如果我們要在不同進程中維持相同的狀態,我們不光要傳遞parameter的梯度,也要傳遞buffer。事實上,DDP就是這么做的。當每次網絡傳播開始前,其都會把master節點上的buffer廣播給其他節點,維持狀態的統一。
hook
回答第二個問題,需要了解hook的概念。
hook的中文是鉤子
,是一種技術概念。用形象的話講,hook提供了這么一種機制:程序提供hook接口,用戶可以寫一個hook函數,然后鉤在hook接口,即程序的主體上從而可以插入到中間執行。DDP使用hook技術把自己的邏輯插入到module的訓練過程中去。
在前一篇文章中,曾經講過
在模型訓練時,各個進程通過一種叫Ring-Reduce的方法與其他進程通訊,從而獲得所有進程的梯度;
那么,Ring-Reduce機制是怎么插入到module中去的呢?這歸功于PyTorch提供了很多個hook接口!
其中,就有一個是,parameter在反向梯度計算結束后提供了一個hook接口。DDP把Ring-Reduce的代碼寫成一個hook函數,插入到這里。每次parameter的反向梯度計算結束后,程序就會調用這個hook函數,從而開啟Ring-Reduce流程。因為所有模型都用到parameter,所以DDP模型用hook函數就解決了所有模型的梯度平均問題了!
下面,我們來看看其具體的代碼實現
torch.nn.parameter
torch.nn.parameter
只是torch.Tensor
上的一層概念封裝,沒什么時候特別的。hook機制也是定義在torch.Tensor
中的。
torch.tensor.Tensor
有一點需要說明,DDP的關鍵代碼(即梯度平均)是用C++實現的。但是,在C++、python代碼中Tensor都給出了hook接口,實現相似的功能。所以我們可以看下Tensor的python hook接口的文檔,來理解下hook這個概念。
# line 200. Class Tensor.def register_hook(self, hook):r"""Registers a backward hook.The hook will be called every time a gradient with respect to theTensor is computed. The hook should have the following signature::hook(grad) -> Tensor or NoneThe hook should not modify its argument, but it can optionally returna new gradient which will be used in place of :attr:`grad`.This function returns a handle with a method ``handle.remove()``that removes the hook from the module.Example::>>> v = torch.tensor([0., 0., 0.], requires_grad=True)>>> h = v.register_hook(lambda grad: grad * 2) # double the gradient>>> v.backward(torch.tensor([1., 2., 3.]))>>> v.grad246[torch.FloatTensor of size (3,)]>>> h.remove() # removes the hook"""
DDP內部實現
Finally,經過一系列鋪墊,終于要來講DDP是怎么實現的了。在讀到這里的時候,你應該對DDP的大致原理、PyTorch是怎么訓練的有一定的了解。現在就來了解一下最底層的細節吧!
下面,我們會給出具體源代碼的URL,復習一下不同的DDP模式,給出一份DDP訓練流程的偽代碼,最后總結一下易錯的注意事項。
代碼位置
DDP的代碼主要在以下幾個地方:
https://github.com/pytorch/pytorch/blob/v1.5.0/torch/nn/parallel/distributed.py
https://github.com/pytorch/pytorch/blob/v1.5.0/torch/distributed/distributed_c10d.py
https://github.com/pytorch/pytorch/blob/v1.5.0/torch/csrc/distributed/c10d/reducer.h
同時推薦一個官方設計筆記,講得很詳細,有興趣可以看看。
DDP模式
之前我們介紹過DDP模式。在這里,我們復習一下。因為,在接下來的DDP流程介紹中,我們是要處理不同的模式的。
- 每個進程一張卡。這是DDP的最佳使用方法。
- 每個進程多張卡,復制模式。一個模型復制在不同卡上面,每個進程都實質等同于DP模式。這樣做是能跑得通的,但是,速度不如上一種方法,一般不采用。
- 每個進程多張卡,并行模式。一個模型的不同部分分布在不同的卡上面。例如,網絡的前半部分在0號卡上,后半部分在1號卡上。這種場景,一般是因為我們的模型非常大,大到一張卡都塞不下batch size = 1的一個模型
正篇!正篇!DDP流程的偽代碼
我們總結了一個DDP模型在訓練過程中的偽代碼,來清晰地描述DDP的細節。
DDP很簡單,但是流程并不簡單。額外的代碼主要是在,處理不同的DDP模式以及加速。刨去這些,主體其實是很簡單的,所以不要害怕,大膽看完!
準備階段
-
環境準備(就是
init_process_group
這一步)。各個進程會在這一步,與master節點進行握手,建立連接。 -
- 注釋:如果連接上的進程數量不足約定的 word_size,進程會一直等待。也就是說,如果你約定了
world_size=64
,但是只開了6臺8卡機器,那么程序會一直暫停在這個地方。
- 注釋:如果連接上的進程數量不足約定的 word_size,進程會一直等待。也就是說,如果你約定了
-
DDP初始化(也就是
model = DDP(model)
這一步) -
-
把parameter,buffer從master節點傳到其他節點,使所有進程上的狀態一致。
-
- 注釋:DDP通過這一步保證所有進程的初始狀態一致。所以,請確保在這一步之后,你的代碼不會再修改模型的任何東西了,包括添加、修改、刪除parameter和buffer!
-
(可能)如果有每個節點有多卡,則在每張卡上創建模型(類似DP)
-
**把parameter進行分組,每一組稱為一個bucket。**臨近的parameter在同一個bucket。
-
- 注釋:這是為了加速,在梯度通訊時,先計算、得到梯度的bucket會馬上進行通訊,不必等到所有梯度計算結束才進行通訊。后面會詳細介紹。
-
創建管理器reducer,給每個parameter注冊梯度平均的hook。
-
- 注釋:這一步的具體實現是在C++代碼里面的,即reducer.h文件。
-
(可能)為可能的SyncBN層做準備
-
正式訓練階段
在每個step中,DDP模型都會做下面的事情:
-
采樣數據,從dataloader得到一個batch的數據,用于當前計算(
for data, label in dataloader
)。- 注釋:因為我們的dataloader使用了DistributedSampler,所以各個進程之間的數據是不會重復的。如果要確保DDP性能和單卡性能一致,這邊需要保證在數據上,DDP模式下的一個epoch和單卡下的一個epoch是等效的。
-
進行網絡的前向計算(
prediction = model(data)
)-
同步各進程狀態
- (可能)對單進程多卡復制模式,要在進程內同步多卡之間的parameter和buffer
- 同步各進程之間的buffer。
-
接下來才是進行真正的前向計算
-
(可能)當DDP參數find_unused_parameter為true時,其會在forward結束時,啟動一個回溯,標記出所有沒被用到的parameter,提前把這些設定為ready。
- 注釋:find_unused_parameter的默認值是false,因為其會拖慢速度。
-
-
計算梯度(
loss.backward()
)-
reducer外面:各個進程各自開始反向地計算梯度。
- 注釋:梯度是反向計算的,所以最后面的參數反而是最先得到梯度的。
-
reducer外面:當某個parameter的梯度計算好了的時候,其之前注冊的grad hook就會被觸發,在reducer里把這個parameter的狀態標記為ready。
-
reducer里面:當某個bucket的所有parameter都是ready狀態時,reducer會開始對這個bucket的所有parameter都開始一個異步的all-reduce梯度平均操作。
- 注釋:
- bucket的執行過程也是有順序的,其順序與parameter是相反的,即最先注冊的parameter的bucket在最后面。
- 所以,我們在創建module的時候,請務必把先進行計算的parameter注冊在前面,后計算的在后面。不然,reducer會卡在某一個bucket等待,使訓練時間延長!
- 所謂的參數注冊,其實就是創建網絡層。也就是要求按照網絡計算順序,依次創建網絡層。
- 注釋:
-
reducer里面:當所有bucket的梯度平均都結束后,reducer才會把得到的平均grad結果正式寫入到parameter.grad里面。
- 注釋:這一步,感覺沒有必要等全部結束之后才進行。可能得對照一下源碼。
-
-
雖然DDP的實現代碼與optimizer沒有關系,但是關于optimizer有個額外的東西需要說明。\
- 注釋:這一步,是和DDP沒關系的。
更新后的參數最終能在各進程間保持一致,是由以下因素保證的:
- 參數初始值相同
- 參數更新值相同
- 更新值相同又是由以下因素保證的:
- optimizer初始狀態相同
- 每次
opimizer.step()
時的梯度相同。
- 更新值相同又是由以下因素保證的:
我們可以看到,因為optimizer和DDP是沒有關系的,所以optimizer初始狀態的同一性是不被DDP保證的!
大多數官方optimizer,其實現能保證從同樣狀態的model初始化時,其初始狀態是相同的。所以這邊我們只要保證在DDP模型創建后才初始化optimizer,就不用做額外的操作。但是,如果自定義optimizer,則需要你自己來保證其統一性!
回顧一下文章最開始的代碼,你會發現,optimizer確實是在DDP之后定義的。這個時候的模式已經是被初始化為相同的參數,所以能夠保證優化器的初始狀態是相同的。
# 新增:構造DDP model
model = DDP(model, device_ids=[local_rank], output_device=local_rank)# 優化器:要在構造DDP model之后,才能初始化model。
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.8)
為什么速度沒怎么提升/性能下降
很多同學可能有這么一個問題,我加入了DDP,為什么速度沒怎么提升/性能下降了呢?我給大家準備了一個check list。
-
是否遵循了“單進程單卡”這樣的最佳工程實踐?
- “單進程多卡復制模式”在速度上不是最優的,而且不被PyTorch社區優先支持,避免使用。
-
是否使用了默認的NCCL后端?
- 用就完事。
-
各進程的模型是否相同?
-
用戶必須保證,不同進程里的模型都是相同結構的;保證parameter(你可以理解為網絡層)的創建順序是一致的。
-
模型的parameter創建順序是否與真實計算順序一致?
-
這涉及到bucket的通訊效率優化
-
產生DDP模型后,是否手動動了它的參數?
-
不允許在產生DDP后,新增、減少、隨機修改、替換參數,會造成梯度reduce出錯、各進程間的參數不相同、丟失hook機制。
-
DDP模式下的一個epoch的數據和單卡下的一個epoch的數據是否是等效的?
- 實際上,n卡的DDP模式,理論上可以等價于n次gradient accumulation的單卡模式。所以,確保你的數據,也是這樣的。
- 如果出現性能下降,切記數據是最有可能出現問題的地方!
-
是否保證初始狀態的同一性?
- parameter、buffer初始狀態同一性
- optimizer初始狀態同一性
DistributedSampler機制
最后,我們額外介紹一下DDP的DistributedSampler機制。
不知道你有沒有好奇,為什么給dataloader加一個DistributedSampler,就可以無縫對接DDP模式呢?其實原理很簡單,就是給不同進程分配數據集的不重疊、不交叉部分。那么問題來了,每次epoch我們都會隨機shuffle數據集,那么,不同進程之間要怎么保持shuffle后數據集的一致性呢?DistributedSampler的實現方式是,不同進程會使用一個相同的隨機數種子,這樣shuffle出來的東西就能確保一致。
具體實現上,DistributedSampler使用當前epoch作為隨機數種子,從而使得不同epoch下有不同的shuffle結果。所以,記得每次epoch開始前都要調用一下sampler的set_epoch
方法,這樣才能讓數據集隨機shuffle起來。
下面看一下DistributedSampler的核心源代碼:
代碼
# line 56def __iter__(self):# deterministically shuffle based on epochg = torch.Generator()g.manual_seed(self.epoch)if self.shuffle:indices = torch.randperm(len(self.dataset), generator=g).tolist()else:indices = list(range(len(self.dataset)))# add extra samples to make it evenly divisibleindices += indices[:(self.total_size - len(indices))]assert len(indices) == self.total_size# subsampleindices = indices[self.rank:self.total_size:self.num_replicas]assert len(indices) == self.num_samplesreturn iter(indices)
# line 79def set_epoch(self, epoch):self.epoch = epoch
總結
既然看到了這里,不妨點個贊/喜歡吧!
在本篇中,我們詳細介紹了DDP的原理和底層代碼實現。如果你能完全理解,相信你對深度學習中的并行加速、分布式計算會有更深入的認識。知己知彼,方能百戰不殆,對DDP有透徹的了解,才能讓你的模型以最快的速度跑起來,加快實驗迭代速度,極大地提高產出!
但是,正所謂理論聯系實踐,如果只掌握理論而不進行實踐,無疑是紙上談兵。代碼的有趣地方也是在這里,就算代碼設計得再好,理解得再透徹,實際編程過程中,你還是會發現遍地是坑。筆者有幸踩過一些坑,跟大家分享一下。請各位有志之士閱讀DDP系列第三篇:實戰!