什么是數據并行?
數據并行(Data Parallelism, DP)的核心思想是將大規模的數據集分割成若干個較小的數據子集,并將這些子集分配到不同的 NPU 計算節點上,每個節點運行相同的模型副本,但處理不同的數據子集。
1. 將數據區分為不同的mini-batch
將數據集切分為若干子集,每個mini-batch又不同的設備獨立處理。如你有4個GPU,可以把數據集分為4分,每個GPU處理一個字數據集。
2. 模型參數同步
可以通過某一進程初始化全部模型參數后,向其他進程廣播模型參數,實現同步。
3. 前向運算
每個設備獨立計算前向運算。
4. 反向運算
每個設備計算損失的梯度。
5. 梯度聚合
當所有設備計算完各自的梯度后,對所有設備的梯度取平均,每個設備的模型參數根據平均梯度進行更新。
6. 參數更新
數據并行的參數更新是在數據切分、模型參數同步后進行的。更新前,每個進程的參數相同;更新時,平均梯度相同;故更新后,每個進程的參數也相同。
數據并行的基本操作
Reduce 歸約
歸約是函數式編程的概念。數據歸約包括通過函數將一組數字歸約為較小的一組數字。
如sum([1, 2, 3, 4, 5])=15, multiply([1, 2, 3, 4, 5])=120。
AllReduce
等效于執行Reduce操作后,將結果廣播分配給所有進程。
MindSpore AllReduce
import numpy as np
from mindspore.communication import init
from mindspore.communication.comm_func import all_reduce
from mindspore import Tensorinit()
input_tensor = Tensor(np.ones([2, 8]).astype(np.float32))
output = all_reduce(input_tensor)
數據并行的主要計算思想
Parameter-Server
主要思想:所有node被分為server node和worker node
Server node:負責參數的存儲和全局的聚合操作
Worker node:負責計算?
Parameter-Server的問題:
- 假設有N=5張卡,GPU0作為Server,其余作為Worker
- 將大小為K的數據拆分為N-1份,分給每個Worker GPU
- 每個GPU計算得到local gradients
- N-1塊GPU將計算所得的local gradients發送給GPU 0
- GPU 0對所有local gradients進行all reduce操作得到全局梯度,參數更新
- 將該新模型的參數返回給每張GPU
假設單個Worker到Server的通信開銷為C,那么將local gradients送到GPU 0上的通信成本為C * (N - 1)。收到GPU 0通信帶寬的影響,通信成本隨著設備數的增加而線性增長。
Pytorch DataParallel?
Pytorch DP在Parameter-Server的基礎上,把GPU 0即當作Server也當作Worker。
1. 切分數據,但不切分Label
每個GPU進行正向計算之后,將正向計算結果聚合回GPU 0計算Loss,GPU 0計算完Loss的gradient之后,將梯度分發回其他worker GPU。隨后各個GPU計算整個模型的grad,再將grad聚合回GPU 0,進行AllReduce。
2. 切分數據,同時切分Label?
每張卡自己計算Loss即可,減少一次聚合操作。
Pytorch DataParallel 問題:
1. 為擺脫Parameter-Server模式,性能差。
2. 需要額外的GPU進行梯度聚合/ GPU 0需要額外的顯存。GPU 0限制了其他GPU的上限。
Ring AllReduce
每張卡單向通訊,通訊開銷一定。
每張卡占用的顯存相同。
第一步:Scatter-Reduce?
假設每張卡上各自計算好了梯度。
每張GPU依次傳值:?
重復直至:?
第二步:All-Gather
將每一個累計值a / b / c逐個發送至個張卡
直至每張卡都有每層的梯度累計值。
兩步分別做了四次通訊,便可以實現并行計算。
Ring AllReduce計算開銷
- N-1次Scatter-Reduce
- N-1次All-Gather
- 每個GPUGPU一次通訊量為:K/N,K為總數據大小
- 每個GPU通信次數為:2(N-1)
總通信量為:
當N足夠多時,通信量為一個常數2K。
Gradient Bucketing
集合通信在大張量上更有效。因此,可以在短時間內等待并將多個梯度存儲到一個數據桶(Bucket),然后進行AllReduce操作。而不是對每個梯度立刻啟動AllReduce操作。
MindSpore數據并行?
def forward_fn(data, target):logits = net(data)loss = loss_fn(logits, target)return loss, logitsgrad_fn = ms.value_and_grad(forward_fn, None, net_trainable_param(), has_aux=True)
# 初始化reducer
grad_gather = nn.DistributedGradReducer(optimizer.parameters)for epoch in range(10):i = 0for image, label in data_set:(loss_value, _), grads = grad_fn(image, label)# 進行通訊grads = grad_reducer(grads)optimizer(grads)# ...
MindNLP數據并行
def update_gradient_by_distributed_type(self, model: nn.Module) -> None:'''update gradient by distributed_type'''if accelerate_distributed_type == DistributedType.NO:returnif accelerate_distributed_type == DistrivutedType.MULTI_NPU:from mindspore.communication import get_group_sizefrom mindspore.communication.comm_func iport all_reducerank_size = get_group_size()for parameter in model.parameters():# 進行all_reducenew_grads_mean = all_reduce(parameter.grad) / rank_sizeparameter.grad = new_grads_mean
數據并行的局限性
要求單卡可以放下模型
多卡訓練時內存冗余,相同模型參數復制了多份。
MindSopre中的數據并行
1. 在啟智社區創建云腦任務或華為云創建notebook
環境選擇:mindspore==2.3.0, cann==8.0,昇騰910 * 2
?2. 更新MindSpore框架版本
pip install --upgrade mindspore
同時可以查看NPU信息:
npu--smi info
3. 配置項目環境
克隆mindnlp項目
git clone https://github.com/mindspore-lab/mindnlp.git
下載mindnlp
cd mindnlp
bash scripts/build_and_reinstall.sh
下載完成后,卸載mindformers、soundfile
pip uninstall mindformers
4. 運行訓練腳本
cd mindnlp/llm/parallel/bert_imdb_finetune_dp
msrun --worker_num=2 --local_worker_num=2 --master_port=8118 bert_imdb_finetune_cpu_mindnlp_trainer_npus_same.py
發現兩個NPU都被占用?
日志文件開始記錄模型訓練進度?
成功實現數據并行!?
基于MindSpore微調Roberta+數據并行
數據集:imdb影評數據集
微調代碼:roberta.py
#!/usr/bin/env python
# coding: utf-8
"""
unset MULTI_NPU && python bert_imdb_finetune_cpu_mindnlp_trainer_npus_same.py
bash bert_imdb_finetune_npu_mindnlp_trainer.sh
"""
import mindspore.dataset as ds
from mindnlp.dataset import load_dataset# loading dataset
imdb_ds = load_dataset('imdb', split=['train', 'test'])
imdb_train = imdb_ds['train']
imdb_test = imdb_ds['test']imdb_train.get_dataset_size()import numpy as npdef process_dataset(dataset, tokenizer, max_seq_len=512, batch_size=4, shuffle=False):is_ascend = mindspore.get_context('device_target') == 'Ascend'def tokenize(text):if is_ascend:tokenized = tokenizer(text, padding='max_length', truncation=True, max_length=max_seq_len)else:tokenized = tokenizer(text, truncation=True, max_length=max_seq_len)return tokenized['input_ids'], tokenized['attention_mask']if shuffle:dataset = dataset.shuffle(batch_size)# map datasetdataset = dataset.map(operations=[tokenize], input_columns="text", output_columns=['input_ids', 'attention_mask'])dataset = dataset.map(operations=transforms.TypeCast(mindspore.int32), input_columns="label", output_columns="labels")# batch datasetif is_ascend:dataset = dataset.batch(batch_size)else:dataset = dataset.padded_batch(batch_size, pad_info={'input_ids': (None, tokenizer.pad_token_id),'attention_mask': (None, 0)})return datasetfrom mindnlp.transformers import AutoTokenizer
import mindspore
import mindspore.dataset.transforms as transforms
# tokenizer
tokenizer = AutoTokenizer.from_pretrained('roberta-base')dataset_train = process_dataset(yelp_ds_train, tokenizer, shuffle=True)
from mindnlp.transformers import AutoModelForSequenceClassification# set bert config and define parameters for training
model = AutoModelForSequenceClassification.from_pretrained('AI-ModelScope/roberta-base', num_labels=2, mirror='modelscope')from mindnlp.engine import TrainingArgumentstraining_args = TrainingArguments(output_dir="./",save_strategy="epoch",logging_strategy="epoch",num_train_epochs=3,learning_rate=2e-5
)training_args = training_args.set_optimizer(name="adamw", beta1=0.8)from mindnlp.engine import Trainertrainer = Trainer(model=model,args=training_args,train_dataset=dataset_train
)print('start training')
trainer.train()
運行命令:
msrun --worker_num=2 --local_worker_num=2 --master_port=8118 roberta.py