機器學習:系統設計與實現 分布式訓練

機器學習系統:設計與實現 分布式訓練

轉自:https://openmlsys.github.io/chapter_distributed_training/index.html

隨著機器學習的進一步發展,科學家們設計出更大型,更多功能的機器學習模型(例如說,GPT-3)。這種模型含有大量參數,需要復雜的計算以及處理海量的數據。單個機器上有限的資源無法滿足訓練大型機器學習模型的需求。因此,我們需要設計分布式訓練系統,從而將一個機器學習模型任務拆分成多個子任務,并將子任務分發給多個計算節點,解決資源瓶頸。

在本章節中,我們會引入分布式機器學習系統的相關概念,設計挑戰,系統實現和實例研究。我們會首先討論分布式訓練系統的定義,設計動機和好處。進一步,我們會討論常見的分布式訓練方法:數據并行,模型并行和流水線并行。在實際中,這些分布式訓練方法會被參數服務器(Parameter Servers),或者是集合通信庫(Collective Communication Libraries)實現。不同的系統實現具有各自的優勢和劣勢。我們會用大型預訓練模型和大型深度學習推薦系統作為實例來探討不同系統實現的利與弊。

本章的學習目標包括:

  • 掌握分布式訓練相關系統組件的定義,設計動機和好處
  • 掌握常見的分布式訓練方法:數據并行,模型并行和流水線并行
  • 掌握常見的分布式訓練框架實現:參數服務器和集合通信
  • 理解常見分布式訓練的實例,和采用不同實現方法的利弊。

1 系統概述

1.1 設計動機

接下來,我們詳細討論分布式訓練系統的設計動機。

在這里插入圖片描述

圖11.1.1 對比機器學習模型參數量增長和計算硬件的算力增長

1.1.1 算力不足

單處理器的算力不足是促使人們設計分布式訓練系統的一個主要原因。一個處理器的算力可以用每秒鐘浮點數操作(Floating Point Operations Per Second,FLOPS)來衡量。如圖11.1.1所示,根據摩爾定律(Moore’s Law),中央處理器的算力每18個月增長2倍。雖然計算加速卡,如GPU和Tensor Processing Unit(TPU),針對機器學習計算(如矩陣相乘)提供了大量的算力。這些加速卡的發展最終也受限于摩爾定律,增長速度也停留在每18個月2倍。而與此同時,機器學習模型正在快速發展。短短數年,我們從僅能識別有限物體的AlexNet模型,一路發展到在復雜任務中打敗人類的AlphaStar。這期間,模型對于算力需求每18個月增長了35倍。解決處理器性能和算力需求之間的鴻溝的關鍵就在于利用分布式計算。通過大型數據中心和云計算設施,我們可以快速獲取大量的處理器。通過分布式訓練系統有效管理這些處理器,我們可以實現算力的快速增長,從而持續滿足模型的需求。

1.1.2 訓練內存不足

在訓練機器學習模型的過程中,訓練系統需要在內存中存儲大量數據。這些數據包括:模型參數(Parameters)以及訓練和更新這些參數所產生的中間數據,如特征圖(Feature Map)和梯度(Gradients)。假設一個深度神經網絡模型具有10億的參數,所有特征圖共有20億參數,每個參數都由一個32位浮點數表達,而更新這些參數至少還需要產生與特征圖和參數等量的梯度。由于一個32位浮點數需要4個字節(Byte)的內存來存儲,那么訓練這個10億規模的模型就需要至少24GB(24×10924×10^924×109 Byte)的內存。現在,隨著大型預訓練模型的崛起,一個深度神經網絡(如GPT-3)會擁有超過千億的參數。假設我們依然使用32位浮點數來存儲參數,激活值和梯度,那么訓練這個模型就至少需要1.2TB的內存。而如今的訓練加速卡(如NVIDIA A100)僅能提供最高80GB的內存。單卡內存空間的增長受到硬件規格,散熱和成本等諸多因素,難以進一步快速增長。因此,我們需要分布式訓練系統來同時使用數百個訓練加速卡,從而為千億級別的模型提供所需的TB級別的內存。

1.2 分布式訓練架構

受限于單節點的有限算力,內存和存儲資源,人們把關注投向了日益成熟的云計算數據中心。一個數據中心管理著數十萬個計算服務器。隨著數據中心的全球部署,人們可以很方便地獲得數百個服務器。這些服務器可以通過分布式訓練系統來協調和管理,解決訓練大型機器學習模型過程遇到的算力,內存和存儲不足,從而完成訓練過程的加速。

在這里插入圖片描述

圖11.1.2 單節點計算和多節點分布式計算

在設計分布式訓練系統的過程中,我們需要找出有資源瓶頸的計算任務,根據計算任務的特點,將其拆分成多個子任務,然后將子任務分發給多個節點(可以是服務器,機器,或者是加速卡)并行完成。 圖11.1.2描述了如何將單節點執行轉換為分布式執行的一般過程。在機器學習系統中,一個計算任務往往會有一組數據(例如訓練樣本)或者任務(例如算子)作為輸入,利用一個計算節點(例如GPU)生成一組輸出(例如梯度)。假如單節點成為瓶頸,我們可以利用分布式計算進行加速。分布式執行一般具有三個步驟:第一步,我們需要將輸入進行切分。第二步,每個輸入部分會分發給不同的計算節點,實現并行計算。第三步,每個計算節點的輸出,進一步合并,最終得到和單節點等價的計算結果。這種切分-并行-合并的模式,本質上實現了分而治之算法(Divide-and-Conquer Algorithm)的設計思想:由于每個計算節點只需要負責更小的子任務,因此其可以更快速的完成計算,最終形成對整個計算過程的加速。

1.3 用戶益處

通過使用分布式訓練系統,我們往往可以獲得以下幾個關鍵好處:

  • 提升系統性能:使用分布式訓練,往往可以帶來訓練性能的巨大提升。一個分布式訓練系統往往用以下這個指標來衡量性能:到達目標精度所需的時間(time-to-accuracy)。這個指標由兩個參數決定:一個數據周期所需的完成時間,以及一個數據周期模型所提升的精度。通過持續增加并行處理節點,我們可以將數據周期的完成時間不斷變短,最終顯著減少到達目標精度所需的時間。
  • 經濟性(Economy):使用分布式訓練,我們也可以進一步減少訓練及其模型所需的成本。受限于單節點散熱的上限,單節點的算力越高,其所需的散熱硬件成本也更高。因此,在提供同等的算力的條件下,組合多個計算節點是一個更加經濟高效的方式。這促使云服務商(如亞馬遜和微軟等)需要更加注重給用戶提供成本高效的分布式機器學習系統。
  • 抵御硬件故障:分布式訓練系統同時能有效提升抵御硬件故障的能力。機器學習訓練集群往往由商用硬件(Commodity Hardware)組成,這類硬件(例如說,磁盤和網卡)運行一定周期就會產生故障。而僅使用單個硬件進行訓練的話,那么一個硬件的故障就會造成整個訓練的任務的失敗。通過將這個訓練任務由多個硬件共同完成,即使一個硬件故障了,我們也可以通過將這個硬件上相應的計算子任務轉移給其余硬件,繼續完成訓練,從而避免訓練任務的失敗。

2 分布式訓練方法

我們會討論分布式訓練系統實現的常用并行方法。我們首先給出并行方法的設計目標以及分類。然后,我們會詳細描述各個并行方法。

2.1 概述

在這里插入圖片描述

圖11.2.1 單節點訓練系統

分布式訓練系統的設計目標是:將單節點訓練系統轉化成等價的并行訓練系統,從而在不影響模型精度的條件下完成訓練過程的加速。一個單節點訓練系統往往如 圖11.2.1 所示。一個訓練過程會由多個數據小批次(mini-batch)完成。在圖中,一個數據小批次被標示為數據。訓練系統會利用數據小批次來生成梯度,提升模型精度。這個過程由一個訓練程序實現。在實際中,這個程序往往實現了一個多層神經網絡的執行過程。該神經網絡的執行由一個計算圖(Computational Graph)表達。這個圖有多個相互連接的算子(Operator),每個算子會擁有計算參數。每個算子往往會實現一個神經網絡層(Neural Network Layer),而參數則代表了這個層在訓練中所更新的的權重(Weights)。

為了更新參數,計算圖的執行會分為前向傳播和反向傳播兩個階段。前向傳播的第一步會將數據讀入第一個算子,該算子會根據當前的參數,計算出傳播給下一個算子的數據。算子依次重復這個前向傳播的過程(算子1 -> 算子2 -> 算子3),直到最后一個算子結束。最后的算子隨之馬上開始反向傳播。反向傳播中,每個算子依次計算出梯度(梯度3 -> 梯度2 -> 梯度1),并利用梯度更新本地的參數。反向傳播最終在第一個算子結束。反向傳播的結束也標志本次數據小批次的結束,系統隨之讀取下一個小批次,繼續更新模型。

單數據多數據
單程序單程序單數據:單點執行單程序多數據:數據并行
多程序多程序單數據:模型并行多程序多數據:混合并行
Table: 分布式訓練方法分類

給定一個單節點訓練系統,人們會對數據程序分區(Partition),從而完成并行加速。 圖11.2.1總結了不同的切分方法。單節點訓練系統可以被歸類于單程序單數據模式。而假如用戶希望使用更多的設備來實現并行計算,他們首先可以選擇對數據進行分區,并將同一個程序復制到多個設備上并行執行。這種方式是單程序多數據模式,常被稱為數據并行(Data Parallelism)。另一種并行方式是對程序進行分區:程序的算子會被分發給多個設備按照依次完成。這種模式是多程序單數據模式,常被稱為模型并行(Model Parallelism)。當訓練超大型智能模型時,開發人們往往要同時對數據和程序進行切分,從而實現最高程度的并行。這種模式是多程序多數據模式,常被稱為混合并行(Hybrid Parallelism)。

接下來,我們詳細講解各種并行方法的執行過程。

2.2 數據并行

在這里插入圖片描述

圖11.2.2 數據并行訓練系統

數據并行往往可以解決單節點的算力不足。這種并行方式在人工智能框架中最為常見,具體實現包括:TensorFlow DistributedStrategy,PyTorch Distributed,Horovod DistributedOptimizer 等。在一個數據并行系統中,假設用戶給定一個訓練批大小 NNN,并且希望使用M個并行設備來加速訓練。那么,該訓練批大小會被分為 MMM 個分區,每個設備會分配到 N/MN/MN/M 個訓練樣本。這些設備共享一個訓練程序的副本,在不同數據分區上獨立執行,計算梯度。不同的設備(假設設備編號為i)會根據本地的訓練樣本估計出梯度 GiG_iGi?。為了確保訓練程序參數的一致性,本地梯度 GiG_iGi? 需要聚合,計算出平均梯度 (∑i=1NGi)/N(\sum_{i=1}^NG_i)/N(i=1N?Gi?)/N。最終,訓練程序利用平均梯度修正模型參數,完成小批量的訓練。

圖11.2.2 展示了2個設備構成的數據并行例子。假設用戶給定的批大小(Batch Size)是64,那么每個設備會分配到32個訓練樣本,并且具有相同的神經網絡參數(程序副本)。本地的訓練樣本會依次通過這個程序副本中的算子,完成前向傳播和反向傳播。在反向傳播的過程中,程序副本會生成局部梯度。不同設備上對應的局部梯度(如設備1和設備2上各自的梯度1)會進行聚合,從而計算平均梯度。這個聚合的過程往往由集合通信庫(Collective Communication)的 Allreduce 操作來完成。

2.3 模型并行

在這里插入圖片描述

圖11.2.3 模型并行系統:算子內并行

模型并行往往用于解決單節點的內存不足問題。一個常見的內存不足場景是模型中含有大型算子,例如說深度神經網絡中需要計算大量分類的全連接層(Fully Connected Layer)。完成這種大型算子計算所需的內存可能超過單設備的內存容量。那么我們需要對這個大型算子進行切分。假設這個算子具有 PPP 個參數,而我們擁有 NNN 個設備,那么我們可以將 PPP 個參數平均分配給 NNN 個設備(每個設備分配 P/NP/NP/N 個參數),從而讓每個設備負責更少的計算量,能夠在內存容量的限制下完成前向傳播和反向傳播中所需的計算。這種切分方式是模型并行的應用,被稱為算子內并行(Intra-operator Parallelism)。

圖11.2.3給出了一個由2個設備實現的算子內并行的例子。在這個例子中,假設一個神經網絡具有2個算子,算子1的計算(包含正向和反向傳播)需要預留16G的內存,算子2的計算需要預留1G的內存。而本例中的設備最多可以提供10G的內存。為了完成這個神經網絡的訓練,我們需要對算子1實現并行。具體做法是,將算子1的參數平均分區,設備1和設備2各負責其中部分算子1的參數。由于設備1和設備2的參數不同,因此它們各自負責程序分區1和程序分區2。在訓練這個神經網絡的過程中,數據(小批量)會首先傳給算子1。由于算子1的參數分別由2個設備負責,因此數據會被廣播給這2個設備。不同設備根據本地的參數分區完成前向計算,生成的本地計算結果需要進一步合并(Combine),發送給下游的算子2。在反向傳播中,算子2的數據會被廣播給設備1和設備2,這些設備根據本地的算子1分區各自完成局部的反向計算。計算結果進一步合并傳播回數據,最終完成反向傳播。

另一種內存不足的場景是:模型的總內存需求超過了單設備的內存容量。在這種場景下,假如我們總共有 NNN 個算子和 MMM 個設備,我們可以將算子平攤給這 MMM 個設備,讓每個設備僅需負責 N/MN/MN/M 個算子的前向和反向計算,降低設備的內存開銷。這種并行方式是模型并行的另一種應用,被稱為算子間并行(Inter-operator Parallelism)。

在這里插入圖片描述

圖11.2.4 模型并行系統:算子間并行

圖11.2.4 給出了一個由2個設備實現的算子間并行的例子。在這個例子中,假設一個神經網絡具有2個算子,算子1和算子2各自需要10G的內存完成計算,則模型總共需要20G的內存。而每個設備僅能提供10G內存。在這個例子中,用戶可以把算子1放置在設備1上,算子2放置在設備2上。在前向傳播中,算子1的輸出會被發送(Send)給下游的設備2。設備2接收(Receive)來自上游的數據,完成算子2的前向計算。在反向傳播中,設備2將算子2的反向計算結果發送給設備1。設備1完成算子1的反向計算,完成本次訓練。

2.4 混合并行

在這里插入圖片描述

圖11.2.5 混合并行系統

在訓練大型人工智能模型中,我們往往會同時面對算力不足和內存不足。因此,我們需要混合使用數據并行和模型并行,這種方法被稱為混合并行。 圖11.2.5 提供了一個由4個設備實現的混合并行的例子。在這個例子中,我們首先實現算子間并行來解決訓練程序內存開銷過大的問題:該訓練程序的算子1和算子2被分攤到了設備1和設備2上。進一步,我們通過數據并行來添加3和設備4,提升系統算力。為了達到這一點,我們對訓練數據進行分區(數據分區1和數據分區2),并將模型(算子1和算子2)分配復制到設備3和設備4上生成可以并行執行的程序副本。在前向計算的過程中,設備1和設備3上的算子1副本同時開始,計算結果分別發送(Send)給設備2和設備4完成算子2副本的計算。在反向計算中,設備2和設備4同時開始計算梯度,本地梯度通過Allreduce進行平均。反向計算傳遞到設備1和設備3上的算子1副本結束。

3 流水線并行

在數據并行和模型并行以外,流水線并行是另一種常用的并行加速方法。流水線并行往往被應用在大型模型并行系統中。這種系統通過算子內并行和算子間并行解決單設備內存不足的問題。然而,當這類系統的運行中,計算圖中的下游設備需要長期持續處于空閑狀態,等待上游設備的計算完成,才可以開始計算,這極大降低了設備的平均使用率。這種現象被稱為模型并行空洞(Model Parallelism Bubble)。

在這里插入圖片描述

圖11.3.1 流水線并行系統。Fi,j表示第j個微批量的第i個前向stage,Bi,j表示第j個微批量的第i個反向stage。

為了減少空洞,提升設備使用率,我們可以在模型并行系統中構建流水線。這種做法的核心想法是將一個數據小批量(Data Mini-batch)劃分為多個微批量(Micro-batch)。假設一個數據小批量有 DDD 個訓練數據,這個小批量可以被劃分為 MMM 個微批量,那么微批量的大小就是 D/MD/MD/M。每個微批量相應進入訓練系統,完成前向傳播(Forwards propagation)和反向傳播(Backwards propagation),計算出梯度。每個微批量對應的梯度將會緩存,等到全部微批量完成,緩存的梯度會被加和,算出平均梯度,更新模型參數。

圖11.3.1 進一步給出了一個流水線并行的執行例子。在本例中,模型參數需要切分給4個設備存儲。為了充分利用起來這4個設備,我們將小批量切分為2個微批量。當設備1完成第一個微批量的前向傳播后(表示為F0,0F_{0,0}F0,0?)后,他會將中間結果發送給設備2,觸發響應的前向傳播任務(表示為 F1,0F_{1,0}F1,0?)。與此同時,設備1也可以開始第二個微批量的前向傳播任務(表示為 F0,1F_{0,1}F0,1?)。前向傳播會在流水線的最后一個設備–設備3–完成。系統于是開始反向傳播。設備4開始第1個微批量的反向傳播任務(表示為 B3,0B_{3,0}B3,0?)。該任務完成后的中間結果會被發送給設備3,觸發響應的反向傳播任務(表示為 B2,0B_{2,0}B2,0?)。與此同時,設備4會緩存好對應第1個微批量的梯度,接下來開始第2個微批量計算(表示為 B3,1B_{3,1}B3,1?)。當設備4完成了全部的反向傳播計算后,他會將本地緩存的梯度進行相加,并且除以微批量數量,計算出平均梯度,該梯度用于更新模型參數。

流水線并行的關鍵因素是流水線泡沫(Bubble)。當設備完成前向傳播后,必須等到全部反向傳播開始,在此期間設備會處于空閑狀態。在 圖11.3.1 中,我們可以看到設備1在完成2個前向傳播任務后,要等很多時間才能開始2個反向傳播任務。這其中的等待時間即被稱為泡沫。為了減少設備的等待時間,一種常見的做法是盡可能的增加微批量的數量,從而讓反向傳播盡可能早的開始。然而,使用非常小的微批量大小,可能會造成加速器無法被充分利用。因此最優的微批量大小是多種因素的折中。其中最核心的因素是流水線泡沫的大小和加速器的計算能力。

4 集合通信

作為并行計算中的一個重要概念,集合通信算子經常會被用來構建單程序流/多數據流編程環境(single program-multiple data, SPMD)中的許多交互模式。近年來,該領域無論是在對不同硬件架構的支持還是算法性能的發展上都成果頗豐,而因SPMD在大型深度學習系統中與數據并行的深厚聯系,這些框架也在其中受益匪淺。因此,相比點對點 (Point-to-Point, p2p) 通信,我們有更大的興趣去探討如何高效地在數據中心(Data Centers)中實現這些集合通信范式。首先,我們會介紹一些集合通信中常見的算子,一個經典的利用All算法解決分布式訓練系統中網絡瓶頸的示例,探討該算法在不同網絡拓撲結構下的差異性以及一些重要指標(算法帶寬,總線帶寬)的計算方法,最后簡略介紹現有機器學習系統對不同集合通信算法的支持。

4.1 常見算子

在分布式內存模型(Distributed Memory Model)中,一些常見的進程間數據交互模式由硬件支持和并行算法的內在性質而涌現。因此,主流的并行計算架構標準(例如MPI)和機器學習系統的底層集合通信庫(例如gloo,NCCL)通常會支持數個經典的算子并針對其做優化,一般包括Broadcast,Reduce,AllGather,ReduceScatter 和 AllReduce。在一個基于 [Sanders2019-cq] 的簡化理論模型下,可以對這些算子的特性進行簡單的介紹并探討具體的實現方法和計算開銷。

4.1.1 基本定義

首先,假定一個簡化后的分布式內存模型:存在p個隨機存取存儲器(Random Access Machines, RAM)作為基礎的處理單元(Processing Element, PE),并由一個網絡來連接所有的機器。每個處理單元有自己的獨立內存,并且所有的處理單元間的通信都通過網絡傳輸。同時,每個處理單元都知道自己的編號i,通常在1到p之間。 網絡之間的通信在最底層的情況下均為點對點的全雙工通信(full-duplex point-to-point communication):

  • 每次通信有且僅有一個發送者(sender)和一個接收者(receiver)。
  • 在某個特定時刻,每個處理單元僅能至多發送或接收一個信息。但是,在網絡中可以同時傳輸多個信息。每個處理單元也可以在發送一個信息的同時接收一個信息。
  • 傳輸一個長度為l的信息會花費a+bl的時間,其中a代表延遲(latency),即單位信息通過網絡從一個處理單元出發到達另一個處理單元所需的時間;b代表傳輸延遲(transmission delay),即把單位信息從處理單元中放到網絡通信單元所需的時間。前者的大小一般取決于兩個處理單元間的物理距離(同一個機架,同一個數據中心,橫跨全球等),而后者的大小一般取決于通信網絡的帶寬。在這個模型下,假定所有處理單元之間的a和b均為恒定值。
  • 通信可以指定一個發送者或者一個接收者:由于每個存儲單元都有相對應的編號,我們可以定義兩個函數send(i,l) 和receive(i,l)。其中send函數會把信息l從當前的處理單元發送至編號為i的處理單元,而receive函數會從編號為i的處理單元接收信息l。在調用send函數時,處理單元必須同時調用receive來保證編號為i的處理單元收到了該信息。因此,也可以說send和receive 同步(synchronize)了發送者和接收者。
  • 作為拓展,我們也可以定義上述函數的一個變種:i = send(m) 和 i = receive(m),即在傳輸信息時不規定發送者或接收者。這種情況下,網絡中的任意一個處理單元都可以發送或接收該信息,而最終完成傳輸的處理單元的編號會作為函數的返回值。
  • 雖然在現實生活中錯誤(fault)時常發生,但是在這個模型里,暫不考慮通信丟失(dropped message)和通信毀壞(corrupted message)的情況。

分布式內存模型中對于通信同步和傳輸的結合使得在這個理論模型下開發的代碼更好維護。額外的,由于這個框架下提出的算法往往會產生一些很有規律的,包含了網絡中所有處理單元的交互模式,通常會在最基礎的點對點通信上維護一個算子庫,用來歸納總結這些高效且更易于理解的算法,我們將其稱為集合通信算子。

4.1.2 Broadcast

在SPMD中,最常見的一個交互模式經常是把一個位于處理單元i的信息發送到全部其他的節點,用于同步某種全局的變量或者參數。為此Broadcast算子可以定義為從編號為 iii 的處理單元發送長度為 lll 的信息給全部剩余的 p?1p?1p?1 個處理單元。在這里,一種簡單的方法是在一個循環中使用 p?1p?1p?1 次send/receive來實現Broadcast,但這并不能很好地利用通信可并行化的特質(該算法只有 (a+bl)(p?1)(a+bl)(p?1)(a+bl)(p?1) 的線性時間復雜度)。為此,我們可以利用分治思想(divide-and-conquer)來對上述算法進行優化。假設所有的處理單元可以重新對編號進行排列,使得Broadcast的發送者為編號為 111 的處理單元。同時,為了簡化計算過程,假設對于某個自然數 nnnp=2np=2^np=2n。 現在,我們可以通過從1 向 p/2p/2p/2 發送一次信息來把問題轉化為兩個大小為 p/2p/2p/2 的子問題:編號為1的處理單元對1到 p/2?1p/2?1p/2?1 的Broadcast,以及編號為 p/2p/2p/2 的處理單元對 p/2p/2p/2ppp 的Broadcast。我們便可以通過在這兩個子問題上進行遞歸來完成這個算法,并把臨界條件定義為編號為i的處理單元在 [i,i][i,i][i,i] 這個區間里的Broadcast。此時,由于i本身已經擁有該信息,我們不需要做任何操作便可直接完成Broadcast。這個優化后的算法有 (a+bl)log?p(a+bl)log?p(a+bl)log?p 時間復雜度,因為在算法的每一階段 ttt,我們有 2t2^t2t 個計算單元在并行運行Broadcast算子。同時,算法一定會在 log?plog?plog?p 步之內結束。

4.1.3 Reduce

除了Broadcast,另一個常見的交互模式為程序試圖概述在部分處理單元上得到的中間值。這時候,對于一個符合結合律(associative property)的算子 fff,我們可以定義Reduce算子,即將所有處理單元上的某個值兩兩配對重復應用該算子,并把最終結果儲存在編號為 iii 的計算單元上。常見的應用于Reduce中的算子有加和,乘積,最大值,最小值和平均值等。一個簡易的Reduce的優化實現同樣可以用分治思想來實現,即把 111p/2?1p/2?1p/2?1 的Reduce結果存到編號為 111 的處理單元中,然后把 p/2p/2p/2ppp 的Reduce結果存到 p/2p/2p/2 上。最后,我們可以把 p/2p/2p/2 的結果發送至 111,執行 fff,并把最后的結果存至iii。假設f的運行時間復雜度為常數并不改變其輸出信息的長度 lll,Reduce的時間復雜度仍然為(a+bl)log?p(a+bl)log?p(a+bl)log?p

4.1.4 AllReduce

AllReduce算子為Reduce的一個變種,即將f的結果存至所有處理單元上。在這里,我們給出一個簡化版的 AllReduce 實現方式,即首先把最終值通過Reduce存到編號為 111 的處理單元,再將該值通過Broadcast廣播到所有的處理單元上。在兩個子算子都使用上述的算法情況下,AllReduce的時間復雜度仍為 (a+bl)logp(a+bl)logp(a+bl)logp

4.1.5 Gather

Gather算子嘗試將每個處理單元上的信息全部聚合到編號為 iii 的處理單元上,通常用于組裝散落在每個處理單元上的獨立信息。在聚合函數符合結合律的情況下,可以通過將其設為Reduce算子中的 fff 來實現Gather算子。但是,在這種情況下,無論是基于鏈表還是數組的實現,在每一步的Reduce子問題中f的時間復雜度或輸出長度 lll 都發生了改變。因此,Gather并不具有先前Reduce或者Broadcast的時間復雜度,而是 alog?p+(p?1)blalog?p+(p?1)blalog?p+(p?1)bl。這是因為在算法的每一階段 ttt,我們傳輸的信息長度為 l2tl2^tl2t

4.1.6 AllGather

相比起Gather,AllGather 算子會把聚合的結果存到所有的處理單元上。在這里,一個簡單的做法是使用Gather和Broadcast把聚合結果先存到編號為1的處理單元中,再將其廣播到剩余的處理單元上。這會產生一個 alog?p+(p?1)bl+(a+plb)log?palog?p+(p?1)bl+(a+plb)log?palog?p+(p?1)bl+(a+plb)log?p 的時間復雜度,因為在Broadcast時如果忽略鏈表/數組實現所帶來的額外空間開銷,每次通信的長度為 plplpl 而不是 lll 。簡化后,我們得到了一個 alog?p+plblog?palog?p+plblog?palog?p+plblog?p 的時間復雜度。在一個基于超立方體的算法下,我們可以將其進一步優化到和Gather一樣的 alog?p+(p?1)blalog?p+(p?1)blalog?p+(p?1)bl (:cite:Sanders2019-cq),然而由于篇幅問題便不再贅述。

4.1.7 Scatter

Scatter算子可以被視作Gather的逆運算:把一個存在于編號為 iii 的處理單元上,長度為 ppp(信息長度為 plplpl )的鏈式數據結構L中的值分散到每個處理單元上,使得編號為i的處理單元會得到 L[i]L[i]L[i] 。我們可以通過模仿Gather算法來設計一個簡易的Scatter實現:每一步的運算中,與其是聚集一半處理單元的結果,我們把現在的子鏈繼續對半切分,并把前半段和后半段作為子問題進行遞歸。這時候,在算法的每一階段 ttt,我們傳輸的信息長度為 l2(m?t)l2^{(m?t)}l2(m?t) ,其中 mmm 是算法總共運行的步驟,不會超過 log?plog?plog?p (見Broadcast)。最終,Scatter算子的檢疫實現和Gather一樣都有 alog?p+(p?1)blalog?p+(p?1)blalog?p+(p?1)bl 時間復雜度。在機器學習系統中,相比于鏈式數據結構,Scatter經常同時被用于可切分的數據結構,例如張量(tensor)在一個維度上的 ppp 等分等。

4.1.8 ReduceScatter

ReduceScatter算子可以視為Reduce 和 Scatter算子的組合體,即對于每個處理單元上分別擁有的一個鏈式/可切分數據結構,在通過f 概述后再重新分散到各個單元中。雖然我們已經知道了Reduce 和Scatter 各自的時間復雜度,但是在對ReduceScatter做時間復雜度分析時需要注意兩部之間信息長度的變化:假設每個處理單元上的數據結構所需通信長度為 plplpl,第一階段的Reduce算法需要 (a+plb)log?p(a+plb)log?p(a+plb)log?p 時間復雜度。參照Scatter的分析,第二階段的算子則需要 alog?p+(p?1)blalog?p+(p?1)blalog?p+(p?1)bl 時間復雜度。綜合下來,ReduceScatter 需要 alog?p+plblog?palog?p+plblog?palog?p+plblog?p 的時間復雜度,和AllGather相同。同時,運行ReduceScatter 和 AllGather的效果等同于運行一次AllReduce。

在SPMD中,通常還有一些額外的集合通信算子,如Prefix Sum,Barrier,All-to-All等,但由于篇幅限制以及與機器學習系統的有限聯系,便不再贅述。最后,由于該模型下通信網絡的拓撲結構較為簡單,上文中呈現二叉樹形的遞歸樹也可以達到很好的實際運行速度。所有關于時間復雜度的分析也是基于這些相對簡化的假設情況。后文中,我們將會用AllReduce舉例介紹如何在更復雜的拓撲結構下設計不同的集合通信算子變種,并在時間復雜度之外去關注實際的通信量和運算時間。

4.2 在數據中心的梯度計算

接下來,我們將用一個示例來闡釋集合通信在機器學習系統中發揮的重要作用。

在這里插入圖片描述

圖11.4.1 數據中心

圖11.4.1 描述了一個典型的用于深度學習模型訓練的數據中心。數據中心中的訓練服務器一般會有多個設備。如需增加服務器,我們會將多個訓練服務器放置在一個機柜(Rack)上,同時接入一個架頂交換機(Top of Rack Switch)將其連接。在現有機柜滿載的情況下,可以通過在架頂交換機間增加骨干交換機(Spine Switch)來接入新的機柜。通過這種方式,可以在數據中心內不斷增加服務器,從而為神經網絡的訓練提供海量的算力和內存。目前的商用數據中心可擁有近百萬臺服務器。

在數據中心中訓練大型神經網絡的首要挑戰是如何高效計算大量的平均梯度。假設給定一個千億級別參數的神經網絡(比如OpenAI 發布的大型語言模型GPT-3 [gpt-3] 有將近1750億參數),如果用32位浮點數來表達每一個參數,那么每一步訓練中,一個數據并行模式下的模型副本(Model Replica)則需要生成700GB的本地梯度數據(即 175G × 4 bytes = 700GB)。假如有3個模型副本,那么至少需要傳輸1.4TB(即,700GB × (3?1))的本地梯度數據(因為對于 NNN 個副本,只需傳送其中的 N?1N?1N?1 個副本來完成計算)。當平均梯度計算完成后,需要進一步將其廣播(Broadcast)到全部的模型副本(即1.4TB的數據)并更新其中的本地參數,從而確保模型副本不會偏離(Diverge)主模型中的參數。

當前的數據中心一般使用以太網(Ethernet)構建不同機柜之間的網絡。主流的商用以太網鏈路帶寬一般在10Gbps到25Gbps之間。利用以太網傳輸海量梯度會產生嚴重的傳輸延遲,從而降低模型訓練的速度。新型深度學習訓練集群(如英偉達的DGX系列機器)往往配置有更快的Inifiband。單個InfiniBand鏈路可以提供100Gbps或200Gbps的帶寬。即使擁有這種高速網絡,傳輸TB級別的本地梯度依然需要大量延遲(即使忽略網絡延遲,1TB的數據在200Gbps的鏈路上傳輸也需要至少40秒)。

為了避免通過機間網絡傳輸數據,現代深度學習服務器一般都會配備多個加速器(例如說,英偉達的DGX-3服務器會配備8個A100 GPU),而在一個服務器內的多個設備可以通過高速機內網絡互聯(如NVLink)。這種高速機內網絡可以提供高達400GBps的帶寬,從而讓傳輸TB級別的數據成為可能。然而,受限于單個服務器的散熱,成本和硬件等限制,通常無法在一個服務器內無限制的持續增加設備。因此,大型深度學習模型的訓練仍需要多個服務器共同完成。在計算平均梯度時,服務器需要同時借助機間網絡通信接口(以太網或InfiniBand)和機內通信接口(NVLink)。

4.3 基于AllReduce的梯度平均算法

我們將討論如何利用AllReduce算子來實現數據中心中的高效梯度平均。首先,參照前文的分析,可以考慮一種簡單的計算平均梯度的方法:在集群中分配一個設備來收集本地梯度,并在計算平均梯度后再將其廣播到全部的設備。這種做法易于實現,但是引入了兩個問題。首先,多臺設備同時給該聚合設備發送數據時,聚合設備會因嚴重的帶寬不足產生網絡擁塞。其次,單臺設備需要負擔大量的梯度平均計算,而受限于單臺設備上的有限算力,這種計算往往會受限于算力瓶頸。

圖11.4.2 AllReduce初始狀態和終止狀態

為了解決上述問題,可以引入AllReduce算子的Reduce-Broadcast實現來優化算法,其設計思路是:通過讓全部的節點參與到梯度的網絡通信和平均計算中,將巨大的網絡和算力開銷均攤給全部節點。這種做法可以解決先前單個梯度聚合節點的問題。假設有 MMM 個設備,每個設備存有一個模型副本,該模型由 NNN 個參數/梯度構成。那么按照AllReduce算子的要求,需要先將全部的參數按照設備數量切分成 MMM 個分區(Partition),使得每個分區具有 N/MN/MN/M 個參數。我們首先給出這個算法的初始和終止狀態。如圖11.4.2 所示,該例子含有3個設備。在每個設備有一個模型副本的情況下,這個副本有3個參數。那么按照AllReduce的分區方法,參數會被劃分成3個分區(3個設備),而每一個分區則有1個參數(N/MN/MN/MNNN 代表3個參數,MMM 代表3個設備)。在這個例子中,假定設備1擁有參數2,4,6,設備2擁有參數1,2,3,設備3擁有參數4,8,12,那么在使用AllReduce算子進行計算過后,全部的設備都將擁有梯度相加后的結果7,14,21,其中分區1的結果7是由3個設備中分區1的初始結果相加而成(7 = 1 + 2 + 4)。為了計算平均梯度,每個設備只需要在最后將梯度之和除以設備數量即可(分區1的最終結果為7除以3)。

圖11.4.3 AllReduce算法的過程

AllReduce算子會把梯度的計算拆分成 M?1M?1M?1 個Reduce算子和 M?1M?1M?1 個Broadcast算子(其中 MMM 是節點的數量)。其中,Reduce算子用于計算出梯度的和(Summation),Broadcast算子用于把梯度之和廣播給全部的節點。為了說明這些算子的執行過程,可以參照圖11.4.3 。AllReduce算子由Reduce算子開始,在第一個Reduce算子中,AllReduce算子會對全部節點進行配對(Pairing),讓他們共同完成梯度相加的操作。在圖11.4.3 的第一個Reduce算子中,設備1和設備2進行了配對共同對分區1的數據相加。其中,設備2把本地的梯度數據1發送給設備1,設備將接收到1和本地的分區1內的梯度數據:2進行相加,計算出中間(intermediate)梯度相加的結果:3。與此同時,設備1和設備3進行配對,共同完成對分區3的數據相加。而設備3和設備2進行配對,共同完成對于分區2的數據相加。

在上述Reduce的算子中,梯度的計算實現了以下幾個特性:

  • 網絡優化: 全部設備都同時在接收和發送數據,利用起了每個設備的入口(Ingress)和出口(Egress)帶寬。因此AllReduce過程中可利用的帶寬是 M×BM\times BM×B,其中M是節點數量,B是節點帶寬,從而讓系統實現網絡帶寬上的可擴展性。
  • 算力優化: 全部設備的處理器都參與了梯度相加的計算。因此AllReduce過程中可利用的處理器是 M×PM\times PM×P,其中M是節點數量,PPP 是處理器數量,從而讓系統實現計算上的可擴展性。
  • 負載均衡: 由于數據分區是平均劃分的,因此每次設備分攤到的通訊和計算開銷是相等的。

在接下來的Reduce算子中,AllReduce算法會對不同數據分區選擇另外的配對方法。例如說,在圖11.4.3 的第二個Reduce算子中,AllReduce算法會將:設備1和設備3進行配對,負責分區1的數據相加。將設備1和設備2進行配對,負責分區2。將設備2和設備3進行配對,負責分區3。在一個3個節點的AllReduce集群里,在2個Reduce算子完成后,我們就計算出了每個分區的數據相加結果(分區1的結果7此時在設備3上,分區2的結果14此時在設備1上,分區3的結果21此時在設備2上)。

接下來,AllReduce算法將進入Broadcast階段。這一階段的過程和Reduce算子類似,核心區別是節點進行配對后,他們不再進行數據相加,而是將Reduce的計算結果進行廣播。在圖11.4.3 中的第一個Broadcast算子中,設備1會將分區2的結果14直接寫入設備3的分區2中。設備2會講分區3的結果21直接寫入設備1中。設備3會將分區1的結果直接寫入設備2中。在一個3個節點的AllReduce集群中,我們會重復2次Broadcast算子來將每個分區的Reduce結果告知全部的節點。

4.4 帶寬計算

在討論集合通信算子的性能時,人們經常會使用一些數值化指標去量化不同的算法實現,其中一個重要概念為帶寬(Bandwidth)。在文獻(:cite:nvidia-nccl)中,通常有兩種主流的對帶寬的計算方法,分別為算法帶寬(Algorithm Bandwidth)與總線帶寬(Bus Bandwidth)。

4.4.1 算法帶寬

雖然算法帶寬的計算方法既簡單又高效,但很難將其拓展至對于集合通信算子的帶寬計算。這是因為,取決于具體算子和算法實現的不同,一個集合通信算子在執行過程中測得的算法帶寬往往會遠小于硬件本身的最高帶寬。在實際運行相應的測試中,經常能觀測到隨著處理單元增加,算法帶寬呈下降趨勢。為了解決這一問題,NCCL提出了總線帶寬這一概念,通過對于每個集合通信算子的分析來對測得的算法帶寬乘以一個校正系數(correction factor),來減輕處理單元數量對于測量帶寬的影響并給出一個更貼近實際硬件表現的帶寬值。下面列出了一些常見算子的校正系數,以及背后的簡略推導。

  • AllReduce:2(p?1)/p2(p?1)/p2(p?1)/p 對于在處理單元 n1,n2…npn_1,n_2\dots n_pn1?,n2?np? 上的值 v1,v2…vpv_1,v_2 \dots v_pv1?,v2?vp? 計算 v1(op)v2…(op)vpv_1(op)v_2\dots (op)v_pv1?(op)v2?(op)vp?(其中op為符合結合律的算子),再存回每個處理單元中。在不考慮實際實現算法和網絡拓撲的情況下,這個操作理論上只需要 2(p?1)2(p?1)2(p?1) 次數據傳輸,其中包含在每個處理單元上分開進行的 n?1n?1n?1 次 op的運算,以及最后 nnn 次最終數據值的廣播,再減去第一個處理單元的運算和最后一個處理單元的廣播的影響。假設每個處理單元對于外界所有信息處理的帶寬為 BBB,我們可以得出對于 SSS 個在不同處理單元上的數據運行AllReduce是能得到的最優情況下的運行時間:t=(2S(p?1))/(pB)t=(2S(p?1))/(pB)t=(2S(p?1))/(pB),進行簡化后可得 B=(S/t)(2(p?1)/p)=b(2(p?1)/p)B=(S/t)(2(p?1)/p)=b(2(p?1)/p)B=(S/t)(2(p?1)/p)=b(2(p?1)/p)。這里的 2(p?1)/p2(p?1)/p2(p?1)/p 便是我們的校正系數。
  • ReduceScatter:(p?1)/p(p?1)/p(p?1)/p 對于每個處理單元來說,可以把ReduceScatter理解為只執行AllReduce中的聚合部分。對此,我們只需要考慮上文分析中的 n?1n?1n?1 次op的運算,整理后可得 B=(S/t)((p?1)/p)=b((p?1)/p)B=(S/t)((p?1)/p)=b((p?1)/p)B=(S/t)((p?1)/p)=b((p?1)/p)
  • AllGather:(p?1)/p(p?1)/p(p?1)/p 同理,對于每個處理單元來說,可以把AllGather理解為只執行AllReduce中的廣播部分。我們同理可得 B=(S/t)((p?1)/p)=b((p?1)/p)B=(S/t)((p?1)/p)=b((p?1)/p)B=(S/t)((p?1)/p)=b((p?1)/p)
  • Broadcast:1 與AllReduce不同的是,Broadcast中所有數據需要從算子本身的發送者發出。即使在上文的分治情況下,我們也需要等待所有子問題運行結束才能確保Broadcast算子本身的正確性。因此,在計算帶寬時瓶頸仍為發送者對于外界所有信息處理的帶寬,所以 B=S/tB=S/tB=S/t,即校正系數為1。
  • Reduce:1 同Broadcast,Reduce需要將所有數據送往算子的接收者,因此校正系數同樣為1。

由于Gather和Scatter的帶寬計算與實際聚合/分散時的數據結構相關性更高,故不給出特定的校正系數。

4.5 樣例分析

針對不同的集群性質,現代機器學習系統往往會靈活應用不同集合通信算子的組合來最大化通信效率。這里,我們提供了兩個具體的案例分析,分別為微軟的ZeRO 以及 OpenAI 的 DALL—E。

4.5.1 ZeRO

ZeRO (:cite:rajbhandari2020zero)是微軟提出的神經網絡優化器,可用于訓練千億級參數的神經網絡,也在實踐中成功訓練了當時世界上最大的語言模型(為高達170億參數的transformer)。在訓練這個級別的神經網絡時主要遇到的問題是巨量參數對于加速器內存的占用,其中包括優化器本身的參數,反向傳播時的梯度,以及模型參數本身。通過簡易的計算不難得出,170億參數的模型在32位浮點表示情況下會占用至少680GB的內存,遠超于現在內存最高的深度學習加速器A100 (最高內存80GB)。于是,我們需要考慮如何高效的把模型切成數份存儲在不同的加速器上,以及如何高效的通過使用集合通信算子來進行模型訓練和推理。ZeRO對此提出了多個優化方法,這里例舉了三個典型的例子: 1. 首先,可以發現在現代集群中,節點內部加速器的帶寬往往比節點之間的帶寬要大很多。這在某種程度上偏離了上文中的理論框架。為此,我們需要盡量減少節點間的通信,盡量保證大部分通信僅存在于節點內部的加速器之間。在觀察模型切分時,不難看出模型本身前饋和反向傳播時需要大量的在不同切片之間通信,相比下來不同模型拷貝之間的梯度聚合反而具有相對較少的通信量。針對這一特性,ZeRO選擇了將單一模型的全部切片存儲到同一節點內部,從而大大提高了訓練效率。 2. 進一步地,假設模型中的參數在層的細粒度上呈線性,便可將其從前到后分別存儲到不同加速其中。在前饋時,可以注意到某一層的計算僅依賴于其相鄰層的參數。對此,與其是手動設計點到點通信,我們可以對所有包含模型參數的加速器進行一次AllGather計算,用來提取每一層之后一層的參數,以及計算該層本身的激活值。為了節約內存,我們在AllGather結束后立即丟棄除了該層以外其他層的參數。 3. 同理,在反向傳播時我們只需要前一層的參數來計算本層的激活值和梯度,因此我們只需要再次使用AllGather來完成每個加速器上的梯度計算。同時,我們注意到在聚集梯度后,對于每個加速器我們僅需要在內存中的層數的梯度。對此,我們可以使用ReduceScatter算子來在平均后直接把相應的梯度存到編號為i的加速器上,而不是通常情況下的AllReduce。

4.5.2 DALL-E

DALL-E (:cite:ramesh2021zero)是OpenAI提出的一個基于文字的圖片生成模型,模型同樣擁有高達120億參數。在訓練時,除了運用到ZeRO所使用的AllGather + ReduceScatter 技巧,OpenAI團隊在細節上做了進一步的優化,以達到更快的訓練速度。這里,我們簡略介紹以下和集合通信相關的兩點: 1. 我們注意到,集合通信算子的運行速度和通信本身的長度正相關。在模型訓練中,這代表了模型參數本身的大小。對此,DALL-E 選擇用矩陣分解(matrix factorization)的方法先把高維張量調整為一個二維矩陣,通過分解后分開用集合通信算子進行傳輸,從而大大減少了通信量。 2. 另一個減少通信量的方法在于數據類型本身。一個顯然的做法是使用16位的半精度浮點數,相比正常的32位參數表示可以節省近一倍的通信量。但是,在實踐中發現低精度的數據類型會使得模型收斂不穩定,往往導致最終訓練效果大打折扣。為此,OpenAI分析了DALL—E 的模型結構,并把其中的參數根據對數據類型精度的敏感性分為了多個類。其中對精度最敏感的一類照常使用32位浮點表示并只通過AllReduce來同步,而最不敏感的參數則照常通過矩陣分解進行壓縮和傳輸。對于比較敏感的一類,例如Adam 優化其中的動能(moments)和方差(variance)參數,OpenAI 基于 IEEE 754 標準實現了兩個全新的數據類型:1-6-9和0-6-10(其中第一表示正負所需的位數,第二表示指數所需的位數,第三表示有效數字所需的位數),在節省空間和保持收斂性能之間找到了一個平衡。

4.6 集合通信與深度學習系統

最后,集合通信已經被深度集成到了整個機器學習系統之中,以至于一些在庫級別以上的開發者很難意識到系統在訓練和推理時的一些步驟是由底層邏輯實現的。 一般來說,不同的機器學習系統對于集合通信一般提供了兩個級別的抽象,分別是更與硬件耦合的,可以直接調用集合通信算子的庫,和更偏向神經網絡實現的,通過內部調用集合通信算子來實現分布式訓練和推理的深度學習框架。作為算法工程師,通常會接觸到后者的抽象(包括Horovod, KungFu, TensorFlow distributed等),而作為集群的維護者,往往需要深入了解前者的運行原理和具體的調試方法。以深度學習框架 PyTorch 舉例,在torch.distributed 命名空間(namespace)下實現了一系列方便開發者使用的分布式模型訓練和推理函數。在其內部,會根據實際運行的集群調用更底層的集合通信算子庫,例如MPI,NCCL(前文中已有介紹,適用于GPU分布式訓練),gloo(適用于CPU分布式訓練)等。我們來具體對比PyTorch distributed 中對于AllReduce 的應用和 NCCL 的差異性:下面兩段代碼中,前者(:cite:li2022ddp)通過PyTorch自帶的分布式數據并行(Distributed Data Parallel)方法完成了一次簡易的深度學習模型計算,后者則通過gloo的Python 接口pygloo和Ray(:cite:moritz2018ray)完成了一個二維張量的AllReduce計算。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mpfrom torch.nn.parallel import DistributedDataParallel as DDPdef setup(rank, world_size):os.environ['MASTER_ADDR'] = 'localhost'os.environ['MASTER_PORT'] = '12355'dist.init_process_group("gloo", rank=rank, world_size=world_size)class ToyModel(nn.Module):def __init__(self):super(ToyModel, self).__init__()self.net1 = nn.Linear(10, 10)self.relu = nn.ReLU()self.net2 = nn.Linear(10, 5)def forward(self, x):return self.net2(self.relu(self.net1(x)))def demo_basic(rank, world_size):setup(rank, world_size)model = ToyModel().to(rank)# 通過調用DDP將模型在每個處理器上完成初始化ddp_model = DDP(model, device_ids=[rank])loss_fn = nn.MSELoss()optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)optimizer.zero_grad()outputs = ddp_model(torch.randn(20, 10))labels = torch.randn(20, 5).to(rank)# 在反向傳播時,框架內部會執行AllReduce算法loss_fn(outputs, labels).backward()optimizer.step()def run_demo(demo_fn, world_size):mp.spawn(demo_fn,args=(world_size,),nprocs=world_size,join=True)if __name__ == "__main__":n_gpus = torch.cuda.device_count()assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"run_demo(demo_basic, n_gpus)
import os
import ray
import pygloo
import numpy as np
import multiprocessing@ray.remote(num_cpus=1)
def test_allreduce(rank, world_size, fileStore_path):context = pygloo.rendezvous.Context(rank, world_size)attr = pygloo.transport.tcp.attr("localhost")dev = pygloo.transport.tcp.CreateDevice(attr)fileStore = pygloo.rendezvous.FileStore(fileStore_path)store = pygloo.rendezvous.PrefixStore(str(world_size), fileStore)context.connectFullMesh(store, dev)sendbuf = np.array([[1,2,3],[1,2,3]], dtype=np.float32)recvbuf = np.zeros_like(sendbuf, dtype=np.float32)sendptr = sendbuf.ctypes.datarecvptr = recvbuf.ctypes.data# 標明發送者和者并直接調用AllReducepygloo.allreduce(context, sendptr, recvptr,sendbuf.size, pygloo.glooDataType_t.glooFloat32,pygloo.ReduceOp.SUM, pygloo.allreduceAlgorithm.RING)if __name__ == "__main__":ray.init()world_size = multiprocessing.cpu_count()fileStore_path = f"{ray.worker._global_node.get_session_dir_path()}" + "/collective/gloo/rendezvous"os.makedirs(fileStore_path)ray.get([test_allreduce.remote(rank, world_size, fileStore_path) for rank in range(world_size)])

可以注意到,前者并沒有顯式的調用集合通信算子,而是通過DistributedDataParallel將分布式訓練和正常訓練之間的不同隱藏了起來。如果我們需要在不同集群上運行這段代碼,只需要在setup 函數內相對的更改PyTorch使用的底層集合通信庫即可。在backward函數被調用時,才會真正的使用AllReduce算法。相比下來,如果想要直接使用gloo,不僅需要使用一步一步的創建通信所需要的數據結構,同時也很難和現有的模型訓練框架無縫連接。

5 參數服務器

接下來,我們介紹另一種常見的分布式訓練系統實現:參數服務器。常見的深度學習框架以不同方式提供了參數服務器。TensorFlow和MindSpore原生提供了參數服務器的實現;PyTorch需要用戶使用框架提供的Rpc接口自行實現;還有一些框架則需要用戶使用第三方的參數服務器實現,例如PS-Lite。

5.1 計算與存儲分離

利用參數服務器的其中一個核心需求是實現:計算和存儲的分離。在訓練模型中,計算可以被理解為計算更新模型參數所需要的計算(例如說,計算本地梯度和計算平均梯度),而存儲可以被理解為將模型參數存儲在內存設備中(例如說,主機內存,加速卡內存和SSD設備)。傳統的神經網絡訓練中,計算往往是核心瓶頸,因此我們只需要配置有合適數量的帶有加速卡的服務器,常被稱為訓練服務器(Training servers)。

隨著機器學習的發展,新型的稀疏模型被開發出來。相比于傳統的神經網絡訓練,稀疏模型的訓練往往不需要大量昂貴的計算加速卡(GPU),而需要海量的內存來存儲嵌入表(Embedding table)。例如說,一個大型深度學習推薦系統中,它們往往使用小型的深度神經網絡(如Multi-layer Perception),訓練這種神經網絡只需要幾個GPU即可。而另一方面,推薦系統中往往需要存儲PB級別的嵌入表。嵌入表往往由推薦系統的用戶特征(User feature)和產品特征(Item feature)構成。這些特征往往是大型向量(Vector)。現代推薦系統需要服務數億的用戶,推薦數以千萬的商品。假設用戶的特征是1MB,而系統需要服務10億的用戶,那么用戶的嵌入表就會有1PB的大小。而這個大小遠遠超過了一個深度學習服務器所具有的內存。假如我們部署大量的昂貴的深度學習服務器來存儲海量嵌入表,那么這些服務器上的加速卡的使用率將會極低,無法實現對于硬件的高效利用。

圖11.5.1 參數服務器

為了解決上述問題,人們往往會在稀疏模型集群中混合部署:訓練服務器和參數服務器,從而實現對于計算需求和內存需求分別滿足。 圖11.5.1 描述了帶有參數服務器的機器學習集群。這個集群中含有2個訓練服務器和2個參數服務器,訓練服務器一般是擁有加速卡的計算優化服務器(Compute-optimised server)。而參數服務器一般是內存優化服務器(Memory-optimised server),其的內存大小一般遠遠大于計算優化服務器。在一個稀疏模型中往往擁有神經網絡參數和嵌入表參數。神經網絡較小,其可以存儲在訓練服務器內存中。而嵌入表很大,因此需要存儲在額外的參數服務器中。參數服務器一般會按照鍵-值對(Key-value pairs)的方式來存儲參數。常用的鍵包括用戶名(User ID),產品名(Item ID)或者是參數名(Parameter Key)。常用的值是以多維度向量(Multi-dimensional tensors)表達的模型參數。假如存在多個參數服務器,參數服務器會用數據分區函數(例如,哈希函數和區域劃分)將健-值映射到不同參數服務器上。

為了完成對于模型的訓練,在每一步訓練中,訓練服務器會根據當前的小批量訓練數據,找到本批量中需要用到的參數。例如說,本小批量數據只會訓練部分用戶的特征,那么這些用戶的特征才會需要。根據參數服務器的數據分區函數,訓練服務器可以知道參數當前在哪個參數服務器上,它們因此會用參數的鍵(Key)向對應的參數服務器發起拉取請求(Pull request)。參數服務器響應,并返回對應的值(Value)。訓練服務器將拉取的參數(往往是嵌入表)和本地內存中的模型參數(往往是神經網絡)進行合并,從而對合并的模型進行訓練,計算梯度。假如訓練服務器實現了數據并行,那么訓練服務器計算出的本地梯度需要利用Allreduce計算出平均梯度。對于訓練服務器本地內存中的參數,訓練服務器可以馬上利用平均梯度進行修改。對于在參數服務器中存儲的參數,訓練服務器發起推送請求(Push request)將平均梯度發送到參數服務器,參數服務器更新本地存儲的參數。

在以上的參數服務器架構中,機器學習集群擁有者可以靈活的根據梯度計算所需要算力配置合理數量的訓練服務器。他們也可以根據參數的數量配置大部分的稀疏參數(Sparse parameters)在參數服務器中,僅留下小部分的密集參數(Dense parameters)在訓練服務器中。密集參數和稀疏參數的核心區別是:稀疏參數在每一步訓練不一定都會被用到,他們需要根據當前訓練小批量來決定。而密集參數每一步訓練都需要用到。因此為了避免頻繁從參數服務器中拉取,密集參數往往會存儲在訓練服務器中。

5.2 數據副本

在參數服務器的實際部署中,人們往往需要解決數據熱點問題。互聯網數據往往符合冪律概率(Power-law distribution),這會導致部分稀疏參數在訓練過程中被訪問的次數會顯著高于其他參數。例如說,熱門商品的特征向量被訓練服務器拉取的次數就會遠遠高于非熱門商品。因此,存儲了熱門數據的參數服務器所承受的數據拉取和推送請求會遠遠高于其他參數服務器,因此形成數據熱點,傷害了系統的可擴展性。

解決數據熱點問題的關鍵是利用在沒有副本的情況下,通用的做法是每隔一段時間將所有參數在外存中保存一份檢查點(checkpoint)。當出現機器故障時,首先所有的訓練必須停止,等待故障的機器恢復上線,然后從外存中重新加載檢查點。這就會導致從上一次保存檢查點到故障發生時的數據全部丟失。保存一次檢查點的開銷隨模型大小而增加,訓練大模型時通常每隔1-2小時保存一次。因此無副本的參數服務器如果發生故障,會丟失最多1-2小時的數據。

解決參數服務器故障和數據熱點問題的常用技術是構建模型主從副本(Master-slave replication)。一份參數在多個機器上擁有副本,并指定其中一個副本作為主副本。訓練服務器的所有更新操作都向主副本寫入并同步至從副本上。如何取得共識確定哪一個副本是主副本是分布式系統領域一個經典問題,已經有了相當多的成熟的算法,例如Paxos和Raft。此外,主副本上的更新如何復制到從副本上也同樣是分布式系統領域的經典共識問題。通常系統設計者需要在可用性(Availability)和一致性(Consistency)之間做出取舍。如果參數服務器副本間采用強一致性的復制協議(例如,鏈式副本(Chain replication))則可能導致訓練服務器的推送請求失敗,即參數服務器不可用。反之,如果參數服務器采用弱一致性的復制協議,則可能導致副本間存儲的參數不一致。

5.3 掉隊者問題

參數服務器的另一大核心作用是可以讓用戶方便解決掉隊者問題。在之前的討論中,在每一步訓練結束后,訓練服務器都需要計算平均梯度來對每一個模型副本進行更新,從而保證下一步訓練開始前,全部模型副本的參數的一致性,這種對于參數一致性的確保一般被稱為同步訓練(Synchronous training)。同步訓練一般會有助于訓練系統達到更好的模型精度,但是當系統規模變大,我們往往會在系統中引入掉隊者(Straggler)。掉隊者出現的原因很多。常見的原因包括:掉隊者設備可能和其他設備不在同一個機柜中,因此掉隊者的通訊帶寬顯著小于其他設備。另外,掉隊者設備也可能和其他進程共享本地的服務器計算和通訊資源,形成資源競爭,從而降低了性能。

掉隊者對于基于Allreduce的同步訓練系統的性能有顯著影響,這是因為Allreduce讓全部節點參與到平均梯度的計算和通訊中,而每個節點負責等量的數據。因此任何一個掉隊者的出現,都會讓整個Allreduce操作延遲完成。為了解決這個問題,人們也會使用參數服務器來計算平均梯度。一種常見的設計是:訓練服務器訓練出本地梯度后,會把本地梯度全部推送到參數服務器。參數服務器在等到一定數據訓練服務器(例如說90%的訓練服務器)的本地梯度后,就開始計算平均梯度。這樣可以確保平均梯度的計算不會被落后者的出現延誤。計算好的平均梯度馬上推送給全部訓練服務器,開始下一輪訓練。

解決掉隊者的另外一種常見做法是利用參數服務器實現異步訓練(Asynchronous training)。在一個異步訓練系統中,每個訓練服務器在訓練開始時,有相同的模型參數副本。在訓練中,他們計算出本地梯度后會馬上將本地梯度推送到參數服務器,參數服務器將推送的梯度立刻用于更新參數,并把更新好的參數馬上推送回對應的訓練服務器。在這個過程中,不同的訓練服務器很可能會使用不同版本的模型參數進行本地梯度的計算,這種做法有可能會傷害模型的精度,但它同時讓不同訓練服務器可以按照各自的運算速度來推送和拉取參數,而無需等待同伴,因此避免了掉隊者對于整個集群性能的影響。

6 總結

  • 大型機器學習模型的出現帶來了對于算力和內存需求的快速增長,催生了分布式訓練系統的出現。
  • 分布式訓練系統的設計往往遵循”分而治之”的設計思路。
  • 利用分布式訓練系統,人們可以顯著提升性能,經濟性,并且幫助抵御硬件故障。
  • 分布式訓練系統可以通過數據并行增加設備來提升算力。
  • 當單節點內存不足時,我們可以通過模型并行來解決單設備內存不足。模型并行有兩種實現方式:算子內并行和算子間并行。
  • 大型模型并行系統容易出現設備使用空洞,而這種空洞可以通過流水線并行解決。
  • 分布式訓練系統往往運行在商用數據中心之中,數據中心網絡無法提供充足的網絡帶寬來傳輸大量訓練中生成的梯度。
  • 為了提供海量的帶寬,機器學習集群擁有異構的網絡:以太網,機內網絡(NVLink)和InfiniBand。
  • 為了解決單節點瓶頸,我們可以使用Allreduce來分攤梯度聚合過程中的計算和通訊開銷。
  • 參數服務器可以幫助機器學習集群實現計算-存儲的分離,從而更好的支持大型稀疏模型。
  • 參數服務器常用數據副本技術解決數據熱點問題,同時它們也可以被用來解決同步訓練系統中常見的掉隊者問題。

7 擴展閱讀

  • 分布式機器學習系統:綜述
  • 利用集合通信支持并行訓練的實踐:Horovod
  • AllReduce的工程實現細節:樹形結構,環形結構,二維環面結構,以及CollNet算法
  • 流水線并行的實踐:gPipe
  • 在大規模數據并行下的實踐:Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour
  • 模型并行在超大模型上的實踐:ZeRO
  • 最后,在討論集合通信時,經常可以看到一些關于底層通信接口的專業術語,例如以太網,Infiniband 等。這里給出一些常見術語的具體定義:
  • 以太網(Ethernet)
  • NVLink
  • AWS Elastic Fabric Adapter (EFA)
  • Infiniband
  • RDMA
  • RoCE
  • IPoIB

8 參考文獻

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/532464.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/532464.shtml
英文地址,請注明出處:http://en.pswp.cn/news/532464.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux命令行及各常用工具代理設置

Linux命令行及各常用工具代理設置 命令行代理設置 1 通過命令行指定 直接為當前命令行設置代理 對當前終端的全部工具(apt、curl、wget、git 等全都有效)以下僅以 http 代理為例,如果是其他協議(如 socks 等)自行改…

VimScript 五分鐘入門(翻譯)

VimScript 五分鐘入門(翻譯) 轉自:https://zhuanlan.zhihu.com/p/37352209 譯注:折騰 Vim 當然要能看懂和改寫相關腳本,而中文資料匱乏,缺一個提綱挈領的教程。本文翻譯自 Andrew Scala 的 《Five Minute V…

C++多線程推理、生產者消費者模式封裝

C多線程推理、生產者消費者模式封裝 tensorRT從零起步邁向高性能工業級部署(就業導向) 課程筆記,講師講的不錯,可以去看原視頻支持下。 深度學習推理中的多線程知識概覽 本章介紹的多線程主要是指算法部署時所涉及的多線程內容&a…

在Python中調用C/C++:cython及pybind11

在Python中調用C/C:cython及pybind11 轉自:https://zhuanlan.zhihu.com/p/442935082 Python寫起來非常方便, 但面對大量for循環的時候, 執行速度有些捉急. 原因在于, python是一種動態類型語言, 在運行期間才去做數據類型檢查, 這樣效率就很低(尤其是大規…

Pytorch導出onnx模型,C++轉化為TensorRT并實現推理過程

Pytorch導出onnx模型,C轉化為TensorRT并實現推理過程 前言 本文為旨在實現整個Python導出PyTorch模型,C轉化為TensorRT并實現推理過程過程,只與模型推理,模型部署相關,不涉及模型訓練。為突出整個部署過程而非具體模…

從零Makefile落地算法大項目,完整案例教程

從零Makefile落地算法大項目,完整案例教程 轉自:從零Makefile落地算法大項目,完整案例教程 作者:手寫AI 前言 在這里,你能學到基于Makefile的正式大項目的使用方式和考慮,相信我,其實可以很簡單…

PyTorch擴展自定義PyThonC++(CUDA)算子的若干方法總結

PyTorch擴展自定義PyThon/C(CUDA)算子的若干方法總結 轉自:https://zhuanlan.zhihu.com/p/158643792 作者:奔騰的黑貓 在做畢設的時候需要實現一個PyTorch原生代碼中沒有的并行算子,所以用到了這部分的知識,再不總結就要忘光了 &a…

給 Python 算法插上性能的翅膀——pybind11 落地實踐

給 Python 算法插上性能的翅膀——pybind11 落地實踐 轉自:https://zhuanlan.zhihu.com/p/444805518 作者:jesonxiang(向乾彪),騰訊 TEG 后臺開發工程師 1. 背景 目前 AI 算法開發特別是訓練基本都以 Python 為主&…

chrome自動提交文件_收集文檔及提交名單統計

知乎文章若有排版問題請見諒,原文放在個人博客中【歡迎互踩!】文叔叔文檔收集使用動機在我們的學習工作中,少不了要讓大家集體提交文件的情況,舉個最簡單的例子:收作業。 傳統的文件收集流程大致是:群內發出…

Pytorch自定義C++/CUDA擴展

Pytorch自定義C/CUDA擴展 翻譯自:官方文檔 PyTorch 提供了大量與神經網絡、張量代數、數據整理和其他操作。但是,我們有時會需要更加定制化的操作。例如,想要使用論文中找到的一種新型的激活函數,或者實現自己設計的算子。 在 Py…

惠普800g1支持什么內存_惠普黑白激光打印機哪種好 惠普黑白激光打印機推薦【圖文詳解】...

打印機的出現讓我們在生活和日常工作中變得越來越方便,不過隨著科技的發展,打印機的類型也變得非常多,其中就有黑白激光打印機,而黑白激光打印機的品牌也有很多,比如我們的惠普黑白激光打印機,今天小編就給…

控制臺輸出顏色控制

控制臺輸出顏色控制 轉自:https://cloud.tencent.com/developer/article/1142372 前端時間,寫了一篇 PHP 在 Console 模式下的進度顯示 ,正好最近的一個數據合并項目需要用到控制臺顏色輸出,所以就把相關的信息整理下,…

idea連接跳板機_跳板機服務(jumpserver)

一、跳板機服務作用介紹1、有效管理用戶權限信息2、有效記錄用戶登錄情況3、有效記錄用戶操作行為二、跳板機服務架構原理三、跳板機服務安裝過程第一步:安裝跳板機依賴軟件yum -y install git python-pip mariadb-devel gcc automake autoconf python-devel readl…

【詳細圖解】再次理解im2col

【詳細圖解】再次理解im2col 轉自:https://mp.weixin.qq.com/s/GPDYKQlIOq6Su0Ta9ipzig 一句話:im2col是將一個[C,H,W]矩陣變成一個[H,W]矩陣的一個方法,其原理是利用了行列式進行等價轉換。 為什么要做im2col? 減少調用gemm的次數。 重要…

反思 大班 快樂的機器人_幼兒園大班教案《快樂的桌椅》含反思

大班教案《快樂的桌椅》含反思適用于大班的體育主題教學活動當中,讓幼兒提高協調性和靈敏性,創新桌椅的玩法,正確爬的方法,學會匍匐前進,快來看看幼兒園大班《快樂的桌椅》含反思教案吧。幼兒園大班教案《快樂的桌椅》…

DCN可形變卷積實現1:Python實現

DCN可形變卷積實現1:Python實現 我們會先用純 Python 實現一個 Pytorch 版本的 DCN ,然后實現其 C/CUDA 版本。 本文主要關注 DCN 可形變卷積的代碼實現,不會過多的介紹其思想,如有興趣,請參考論文原文: …

藍牙耳機聲音一頓一頓的_線控耳機黨陣地轉移成功,OPPO這款TWS耳機體驗滿分...

“你看到我手機里3.5mm的耳機孔了嗎”,這可能是許多線控耳機黨最想說的話了。確實,如今手機在做“減法”,而廠商們首先就拿3.5mm耳機孔“開刀”,我們也喪失了半夜邊充電邊戴耳機打游戲的樂趣。竟然如此,那如何在耳機、…

AI移動端優化之Im2Col+Pack+Sgemm

AI移動端優化之Im2ColPackSgemm 轉自:https://blog.csdn.net/just_sort/article/details/108412760 這篇文章是基于NCNN的Sgemm卷積為大家介紹Im2ColPackSgemm的原理以及算法實現,希望對算法優化感興趣或者做深度學習模型部署的讀者帶來幫助。 1. 前言 …

elementui的upload組件怎么獲取上傳的文本流、_抖音feed流直播間引流你還不會玩?實操講解...

本文由艾奇在線明星優化師寫作計劃出品在這個全民驚恐多災多難且帶有魔幻的2020,一場突如其來的疫情改變了人們很多消費習慣,同時加速了直播電商的發展,現在直播已經成為商家必爭的營銷之地,直播雖然很火,但如果沒有流…

FFmpeg 視頻處理入門教程

FFmpeg 視頻處理入門教程 轉自:https://www.ruanyifeng.com/blog/2020/01/ffmpeg.html 作者: 阮一峰 日期: 2020年1月14日 FFmpeg 是視頻處理最常用的開源軟件。 它功能強大,用途廣泛,大量用于視頻網站和商業軟件&…