相關博客
【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(四):張量并行版Embedding層及交叉熵的實現及測試
【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(三):張量并行層的實現及測試
【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(一):并行環境初始化
【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(二):Collective通信操作的封裝mappings
【深度學習】【分布式訓練】DeepSpeed:AllReduce與ZeRO-DP
【深度學習】混合精度訓練與顯存分析
【深度學習】【分布式訓練】Collective通信操作及Pytorch示例
【自然語言處理】【大模型】大語言模型BLOOM推理工具測試
【自然語言處理】【大模型】GLM-130B:一個開源雙語預訓練語言模型
【自然語言處理】【大模型】用于大型Transformer的8-bit矩陣乘法介紹
? Megatron-DeepSpeed是DeepSpeed版本的NVIDIA Megatron-LM。像BLOOM、GLM-130B等主流大模型都是基于Megatron-DeepSpeed開發的。這里以BLOOM版本的Megetron-DeepSpeed為例,介紹其模型并行代碼mpu的細節(位于megatron/mpu下)。
? 理解該部分的代碼需要對模型并行的原理以及集合通信有一定的理解,可以看文章:
- 【深度學習】【分布式訓練】Collective通信操作及Pytorch示例
- 【深度學習】【分布式訓練】一文捋順千億模型訓練技術:流水線并行、張量并行和3D并行
- 【深度學習】【分布式訓練】DeepSpeed:AllReduce與ZeRO-DP
? 強烈建議閱讀,不然會影響本文的理解:
- 【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(一):并行環境初始化
- 【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(二):Collective通信操作的封裝mappings
- 【Megatron-DeepSpeed】張量并行工具代碼mpu詳解(三):張量并行層的實現及測試
閱讀建議:
- 本文僅會解析核心代碼,并會不介紹所有代碼;
- 本文會提供一些測試腳本來展現各部分代碼的功能;
- 建議實際動手實操來加深理解;
- 建議對Collective通信以及分布式模型訓練有一定理解,再閱讀本文;
一、總覽
? mpu目錄下核心文件有:
- initialize.py:負責數據并行組、張量并行組和流水線并行組的初始化,以及獲取與各類并行組相關的信息;
- data.py:實現張量并行中的數據廣播功能;
- cross_entropy.py:張量并行版本的交叉熵;
- layers.py:并行版本的Embedding層,以及列并行線性層和行并行線性層;
- mappings.py:用于張量并行的通信操作;
二、張量并行版Embedding層
? Embedding層本質就是一個查找表。如上圖所示,張量并行版embedding層就是將完整的embedding層,在vocab的維度切分。張量并行組中的每個進程僅持有部分embedding層。
1. 實現代碼
? 這里直接在原始的文件(megatron/mpu/layers.py)中,添加一個自定義的并行版Embedding層。其與原始版完全相同,僅添加了一些輸出來展示整個過程。
# layers.py
class MyVocabParallelEmbedding(torch.nn.Module):def __init__(self, num_embeddings, embedding_dim,init_method=init.xavier_normal_):super(MyVocabParallelEmbedding, self).__init__()# 初始化一些參數self.num_embeddings = num_embeddings # 詞表大小self.embedding_dim = embedding_dimself.padding_idx = Noneself.max_norm = Noneself.norm_type = 2.self.scale_grad_by_freq = Falseself.sparse = Falseself._weight = Noneself.tensor_model_parallel_size = get_tensor_model_parallel_world_size()# 張量并行組中的每個rank僅持有部分vocab embedding# 這里會計算當前rank持有的vocab的起始和結束位置self.vocab_start_index, self.vocab_end_index = \VocabUtility.vocab_range_from_global_vocab_size(self.num_embeddings, get_tensor_model_parallel_rank(),self.tensor_model_parallel_size)# 當前rank持有的部分vocab的大小self.num_embeddings_per_partition = self.vocab_end_index - \self.vocab_start_indexargs = get_args()# embedding層添加LayerNormif mpu.is_pipeline_first_stage() and (args.use_bnb_optimizer or args.embed_layernorm):self.norm = LayerNorm(embedding_dim)# bnb是指bitsandbytes,該庫針對8-bit做了一些cuda函數的封裝,這里忽略if args.use_bnb_optimizer:# for BNB we ignore the passed init_method and use torch.nn.init.xavier_uniform_# modified to calculate std on the unpartitioned embeddinginit_method = partial(xavier_uniform_tensor_parallel_, tp_degree=self.tensor_model_parallel_size)# 初始化embedding層的權重# 每個rank僅初始化自己所持有的那部分if args.use_cpu_initialization:self.weight = Parameter(torch.empty(self.num_embeddings_per_partition, self.embedding_dim,dtype=args.params_dtype))_initialize_affine_weight_cpu(self.weight, self.num_embeddings, self.embedding_dim,self.num_embeddings_per_partition, 0, init_method)else:self.weight = Parameter(torch.empty(self.num_embeddings_per_partition, self.embedding_dim,device=torch.cuda.current_device(), dtype=args.params_dtype))_initialize_affine_weight_gpu(self.weight, init_method,partition_dim=0, stride=1)# bnb(忽略)if args.use_bnb_optimizer:from bitsandbytes.optim import GlobalOptimManagerGlobalOptimManager.get_instance().override_config(self.weight, 'optim_bits', 32)GlobalOptimManager.get_instance().register_parameters(self.weight)def forward(self, input_):if torch.any(input_ >= self.num_embeddings):raise ValueError(f"There is an input id in the input that is greater than the highest possible input id.\nInput: {input_}\nnum_embeddings: {self.num_embeddings}")# 全局rankglobal_rank = torch.distributed.get_rank()# 張量并行組中的ranktp_rank = get_tensor_model_parallel_rank()info = f"*"*20 + \f"\n> global_rank={global_rank}\n" + \f"> tensor parallel rank={tp_rank}\n" + \f"> full embedding size={(self.num_embeddings, self.embedding_dim)}\n" + \f"> partial embedding size={list(self.weight.size())}\n" \f"> input = {input_}\n" \f"> vocab_start_index={self.vocab_start_index}, vocab_end_index={self.vocab_end_index}\n"if self.tensor_model_parallel_size > 1:# Build the mask.input_mask = (input_ < self.vocab_start_index) | \(input_ >= self.vocab_end_index)# Mask the input.masked_input = input_.clone() - self.vocab_start_indexmasked_input[input_mask] = 0else:# input_ is garanted to be in the range [0:self.vocab_end_index - self.vocab_start_index] thanks to the first checkmasked_input = input_info += f"> input_mask={input_mask} \n"info += f"> masked_input={masked_input} \n"# 獲得embeddingoutput_parallel = F.embedding(masked_input, self.weight,self.padding_idx, self.max_norm,self.norm_type, self.scale_grad_by_freq,self.sparse)# 由于在當前rank上,僅能獲得部分輸入的embedding# 因此,將mask掉的input對應的embedding設置為全0if self.tensor_model_parallel_size > 1:output_parallel[input_mask, :] = 0.0info += f"> output_parallel={output_parallel}\n"# 上一步設置為全0的embedding會在這一步通過allreduce,組裝成完整的embeddingoutput = reduce_from_tensor_model_parallel_region(output_parallel)info += f"> output={output}\n"if hasattr(self, 'norm'):output = self.norm(output)print(info, end="")return output
2. 測試腳本
? 實驗設置為:張量并行度為2,流水線并行度也為2。測試腳本比較簡單,直接調用上面實現的MyVocabParallelEmbedding
。
# test_embedding.py
import sys
sys.path.append("..")from megatron.mpu import layers
from commons import set_random_seed
from commons import print_separator
from megatron.initialize import _initialize_distributed
from megatron.global_vars import set_global_variables
import megatron.mpu as mpu
from torch.nn.parameter import Parameter
import torch.nn.init as init
import torch
import randomdef test_parallel_embedding():batch_size = 2seq_length = 4vocab_size = 6hidden_size = 8seed = 123set_random_seed(seed)# (2,4)input_data = torch.LongTensor(size=(batch_size, seq_length)).random_(0, vocab_size).cuda()embedding_vocab_parallel = layers.MyVocabParallelEmbedding(vocab_size, hidden_size, init_method=init.normal_).cuda()output = embedding_vocab_parallel(input_data)def main():set_global_variables(ignore_unknown_args=True)_initialize_distributed()world_size = torch.distributed.get_world_size()print_separator('Test test_parallel_embedding')test_parallel_embedding()if __name__ == '__main__':main()
啟動命令:
options=" \--tensor-model-parallel-size 2 \--pipeline-model-parallel-size 2 \--num-layers 10 \--hidden-size 768 \--micro-batch-size 2 \--num-attention-heads 32 \--seq-length 512 \--max-position-embeddings 512\--use_cpu_initialization True"cmd="deepspeed test_embedding.py $@ ${options}"eval ${cmd}
3. 測試結果
- 全局rank為2,在張量并行組中的rank為0;
- 完整的embedding層大小應為(6, 8),當前設備持有的embedding層大小為(3, 8),符合張量并行度為2的假設;
- 當前設備持有的詞表id范圍介于0到3,輸入中超出該詞表范圍都會被mask;
- 當前設備的輸出(output_parallel),會有部分embedding為全0,而完整的輸出(output)則將張量并行組中所有的embedding輸出都聚合在一起;
三、張量并行版交叉熵
? 我們以自然語言模型為例,展示交叉熵的計算原理。
? 若模型針對單個token預測的logit表示為 l ? = [ l 1 , … , l k ] \vec{l}=[l_1,\dots,l_k] l=[l1?,…,lk?],經過softmax轉換后的概率分布為 p ? = [ p 1 , … , p k ] \vec{p}=[p_1,\dots,p_k] p?=[p1?,…,pk?],其中:
p i = e l i ∑ j k e l j p_i=\frac{e^{l_i}}{\sum_{j}^k e^{l_j}} pi?=∑jk?elj?eli??
該token的真實標簽表示為 y ? = [ y 1 , … , y k ] \vec{y}=[y_1,\dots,y_k] y?=[y1?,…,yk?],由于其是one-hot編碼,所以 y ? \vec{y} y?中僅有一個值為1,其余均為0。那么該token上的交叉熵損失函數為
loss = ? ∑ i = 1 k y i log ? ( p i ) = ? ∑ i = 1 k y i log ? ( e l i ∑ j k e l j ) = ∑ i = 1 k y i [ log ? ( ∑ j k e l j ) ? log ? ( e l i ) ] = log ? ( ∑ j k e l j ) ? ∑ i = 1 k y i log ? ( e l i ) = log ? ( ∑ j k e l j ) ? ∑ i = 1 k y i l i \begin{align} \text{loss}&=-\sum_{i=1}^k y_i\log(p_i) \\ &=-\sum_{i=1}^k y_i\log(\frac{e^{l_i}}{\sum_{j}^k e^{l_j}}) \\ &=\sum_{i=1}^k y_i[\log(\sum_{j}^k e^{l_j})-\log(e^{l_i})] \\ &=\log(\sum_{j}^k e^{l_j})-\sum_{i=1}^k y_i \log(e^{l_i}) \\ &=\log(\sum_{j}^k e^{l_j})-\sum_{i=1}^k y_i {l_i} \end{align} loss?=?i=1∑k?yi?log(pi?)=?i=1∑k?yi?log(∑jk?elj?eli??)=i=1∑k?yi?[log(j∑k?elj?)?log(eli?)]=log(j∑k?elj?)?i=1∑k?yi?log(eli?)=log(j∑k?elj?)?i=1∑k?yi?li???
由于模型輸出的 l ? \vec{l} l是已知的,那么上式第一項 log ? ( ∑ j k e l j ) \log(\sum_{j}^k e^{l_j}) log(∑jk?elj?)是一個固定的常數;由于所有的 y i y_i yi?中僅有一個是1,那么第二項 ∑ i = 1 k y i l i \sum_{i=1}^k y_i {l_i} ∑i=1k?yi?li?本質上就是正確token對應的logit值。
? mpu代碼中的交叉熵實現基本上遵循上面的分析,僅是添加了batch size和seq_length維度,但核心思想不變。
1. 實現代碼
? 同樣,也是在原始文件(megatron/mpu/cross_entropy.py)中,添加一個自定義的并行版交叉熵。該實現與原版完全相同,僅添加了一些輸出來展示整個過程。
# cross_entropy.py
class _MyVocabParallelCrossEntropy(torch.autograd.Function):@staticmethoddef forward(ctx, vocab_parallel_logits, target):# vocab_parallel_logits: (batch_size, seq_length, vocab_size)# target: (batch_size, seq_length)global_rank = torch.distributed.get_rank()tp_rank = get_tensor_model_parallel_rank()# 在vocab維度取最大值,也就是每個token對于logits的最大值logits_max = torch.max(vocab_parallel_logits, dim=-1)[0]torch.distributed.all_reduce(logits_max,op=torch.distributed.ReduceOp.MAX,group=get_tensor_model_parallel_group())vocab_parallel_logits.sub_(logits_max.unsqueeze(dim=-1))info = f"*"*20 + \f"\n> global_rank={global_rank}\n" + \f"> tp_rank={tp_rank}\n" + \f"> size of vocab_parallel_logits={list(vocab_parallel_logits.size())}\n" + \f"> size of target={list(target.size())}\n"# 依據當前進程持有的部分詞表大小partition_vocab_size,以及張量并行組中rank和world size,# 確定出當前進程持有詞表的起始索引vocab_start_index和結束索引vocab_end_indexget_vocab_range = VocabUtility.vocab_range_from_per_partition_vocab_sizepartition_vocab_size = vocab_parallel_logits.size()[-1]rank = get_tensor_model_parallel_rank()world_size = get_tensor_model_parallel_world_size()vocab_start_index, vocab_end_index = get_vocab_range(partition_vocab_size, rank, world_size)# 將不在詞表中的target遮蔽掉target_mask = (target < vocab_start_index) | (target >= vocab_end_index)masked_target = target.clone() - vocab_start_indexmasked_target[target_mask] = 0# ligits_2d: (batch_size*seq_length, vocab_size)logits_2d = vocab_parallel_logits.view(-1, partition_vocab_size)# masked_target_1d: (batch_size*seq_length)masked_target_1d = masked_target.view(-1)arange_1d = torch.arange(start=0, end=logits_2d.size()[0],device=logits_2d.device)# predicted_logits_1d 表示正確token對應的logitpredicted_logits_1d = logits_2d[arange_1d, masked_target_1d]predicted_logits_1d = predicted_logits_1d.clone().contiguous()predicted_logits = predicted_logits_1d.view_as(target)# 將當前進程無法獲得的logits設置為0,用于后續allreduce組成完成logitspredicted_logits[target_mask] = 0.0info += f"> size of logits_2d={list(logits_2d.size())}\n" + \f"> size of masked_target_1d={list(masked_target_1d.size())}\n" + \f"> size of predicted_logits={list(predicted_logits_1d.size())}\n"# 各個進程持有的predicted_logits的大小是完全相同的# 但是,當前進程持有的predicted_logits僅在當前詞表上才有取值,其余值為0# 通過allreduce即可得到完整的predicted_logitstorch.distributed.all_reduce(predicted_logits,op=torch.distributed.ReduceOp.SUM,group=get_tensor_model_parallel_group())# 求softmax分母的部分exp_logits = vocab_parallel_logitstorch.exp(vocab_parallel_logits, out=exp_logits)sum_exp_logits = exp_logits.sum(dim=-1)torch.distributed.all_reduce(sum_exp_logits,op=torch.distributed.ReduceOp.SUM,group=get_tensor_model_parallel_group())# 對應上面公式推導的最終結果# loss: (batch_size, seq_length)。# loss是一個矩陣,矩陣的值對應單個token的交叉熵loss = torch.log(sum_exp_logits) - predicted_logitsinfo += f"> size of sum_exp_logits={list(sum_exp_logits.size())}\n" + \f"> size of loss={list(loss.size())}\n"print(info, end="")exp_logits.div_(sum_exp_logits.unsqueeze(dim=-1))ctx.save_for_backward(exp_logits, target_mask, masked_target_1d)return loss@staticmethoddef backward(ctx, grad_output):# Retreive tensors from the forward path.softmax, target_mask, masked_target_1d = ctx.saved_tensors# All the inputs have softmax as thier gradient.grad_input = softmax# For simplicity, work with the 2D gradient.partition_vocab_size = softmax.size()[-1]grad_2d = grad_input.view(-1, partition_vocab_size)# Add the gradient from matching classes.arange_1d = torch.arange(start=0, end=grad_2d.size()[0],device=grad_2d.device)grad_2d[arange_1d, masked_target_1d] -= (1.0 - target_mask.view(-1).float())# Finally elementwise multiplication with the output gradients.grad_input.mul_(grad_output.unsqueeze(dim=-1))return grad_input, None
2. 測試腳本
# test_cross_entropy.py
import sys
sys.path.append("..")from commons import set_random_seed
from commons import IdentityLayer
from commons import print_separator
from commons import initialize_distributed
from megatron.mpu.cross_entropy import _MyVocabParallelCrossEntropy
import megatron.mpu as mpu
import torch.nn.functional as F
import torch
import randomdef test_cross_entropy():tensor_model_parallel_size = mpu.get_tensor_model_parallel_world_size()batch_size = 32seq_length = 128vocab_size_per_partition = 500logits_scale = 1000.0vocab_size = vocab_size_per_partition * tensor_model_parallel_sizeseed = 1234set_random_seed(seed)identity = IdentityLayer((batch_size, seq_length, vocab_size),scale=logits_scale).cuda()logits = identity()logits_parallel = mpu.scatter_to_tensor_model_parallel_region(logits)target = torch.cuda.LongTensor(size=(batch_size, seq_length)).random_(0, vocab_size)loss = _MyVocabParallelCrossEntropy.apply(logits_parallel, target).mean()if __name__ == '__main__':initialize_distributed()world_size = torch.distributed.get_world_size()tensor_model_parallel_size = 2pipeline_model_parallel_size = 2mpu.initialize_model_parallel(tensor_model_parallel_size,pipeline_model_parallel_size)test_cross_entropy()
啟動命名:
deepspeed test_cross_entropy.py