[Distributed Training] Single-Machine Multi-GPU Done Right: PyTorch
Reposted from: https://fyubang.com/2019/07/23/distributed-training3/
Data parallelism in PyTorch is much simpler than in TensorFlow and boils down to two APIs:
- DataParallel (DP): Parameter Server mode, with one GPU acting as the reducer. The implementation is extremely simple: a single line of code.
- DistributedDataParallel (DDP): All-Reduce mode. It was designed for distributed training, but it can also be used for single-machine multi-GPU training.
1. DataParallel
DataParallel is based on the Parameter Server algorithm, so load imbalance is a fairly serious problem: with a large model (e.g. bert-large), the GPU acting as the reducer can end up using an extra 3-4 GB of memory.
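If you want to see this imbalance for yourself, a minimal sketch along the lines below prints how much memory PyTorch has allocated on each visible GPU; the helper name report_gpu_memory and the idea of calling it inside the training loop are my own additions, not part of the original post. Under DataParallel, the reducer card typically reports the largest number.

```python
import torch

def report_gpu_memory(tag=""):
    # Print the memory PyTorch has currently allocated on every visible GPU.
    # With nn.DataParallel, device 0 (the reducer) usually shows the largest value.
    for i in range(torch.cuda.device_count()):
        allocated_mib = torch.cuda.memory_allocated(i) / 1024 ** 2
        print(f"{tag} cuda:{i}: {allocated_mib:.1f} MiB allocated")

# e.g. call report_gpu_memory("after backward") once per step in the training loop
```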
First, define a simple data pipeline and model.
```python
import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os

input_size = 5
output_size = 2
batch_size = 30
data_size = 30

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),
                         batch_size=batch_size, shuffle=True)

class Model(nn.Module):
    # Our model
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print(" In Model: input size", input.size(),
              "output size", output.size())
        return output

model = Model(input_size, output_size)
if torch.cuda.is_available():
    model.cuda()

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # This one line is all it takes
    model = nn.DataParallel(model)

for data in rand_loader:
    if torch.cuda.is_available():
        input_var = Variable(data.cuda())
    else:
        input_var = Variable(data)
    output = model(input_var)
    print("Outside: input size", input_var.size(),
          "output_size", output.size())
```
2. DistributedDataParallel
The official recommendation is to use the newer DDP, which is based on the all-reduce algorithm. It was designed mainly for multi-machine multi-GPU training, but it works on a single machine as well. Usage is as follows:

Initialize with the nccl backend:

```python
torch.distributed.init_process_group(backend="nccl")
```

Parallelize the model:

```python
model = torch.nn.parallel.DistributedDataParallel(model)
```
Note that DDP does not shard the data for you automatically:

- If you write your own data pipeline, you have to shard the data yourself according to torch.distributed.get_rank(), so that each process only picks up its own share (see the sketch after this list).
- If you use the Dataset API, you need to pass a DistributedSampler to the DataLoader so that it does the sharding:

```python
sampler = DistributedSampler(dataset)  # this sampler automatically distributes the data across the GPUs
DataLoader(dataset, batch_size=batch_size, sampler=sampler)
```
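For the first case (a hand-rolled data pipeline), sharding by rank could look like the minimal sketch below; the strided slicing scheme and the helper name are illustrative assumptions, not code from the original post.

```python
import torch.distributed as dist

def shard_for_this_process(samples):
    # Keep only the samples whose index maps to this process's rank, so that
    # the processes jointly cover the whole dataset without overlap.
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    return samples[rank::world_size]

# e.g. my_samples = shard_for_this_process(all_samples), called after init_process_group
```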
A complete example:
```python
import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os
from torch.utils.data.distributed import DistributedSampler

# 1) Initialize the process group
torch.distributed.init_process_group(backend="nccl")

input_size = 5
output_size = 2
batch_size = 30
data_size = 90

# 2) Configure the GPU for each process
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size).to('cuda')

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

dataset = RandomDataset(input_size, data_size)
# 3) Use DistributedSampler
rand_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size,
                         sampler=DistributedSampler(dataset))

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print(" In Model: input size", input.size(),
              "output size", output.size())
        return output

model = Model(input_size, output_size)

# 4) Move the model to its GPU before wrapping it
model.to(device)

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) Wrap the model
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[local_rank],
                                                      output_device=local_rank)

for data in rand_loader:
    if torch.cuda.is_available():
        input_var = data  # the dataset already placed the data on this process's GPU
    else:
        input_var = data
    output = model(input_var)
    print("Outside: input size", input_var.size(),
          "output_size", output.size())
```
The script has to be launched from the command line:
```bash
CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 torch_ddp.py
```
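As a side note, on newer PyTorch releases (roughly 1.10 and later, which is an assumption about your environment) the same script can also be started with torchrun, the successor of torch.distributed.launch; in that case the local rank is exposed through the LOCAL_RANK environment variable rather than a --local_rank argument. A sketch of the equivalent invocation:

```bash
CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 torch_ddp.py
```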
Output:
```
Let's use 2 GPUs!
Let's use 2 GPUs!
 In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
 In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([15, 5]) output_size torch.Size([15, 2])
 In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])
Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
 In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
Outside: input size torch.Size([15, 5]) output_size torch.Size([15, 2])
```
You can see that there are two processes, so the log lines are printed twice.
torch.distributed.launch passes each process an args.local_rank argument; alternatively, you can get the process id via torch.distributed.get_rank().
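If you prefer to read that argument directly instead of calling torch.distributed.get_rank(), the usual pattern is an argparse flag, as in the minimal sketch below; the exact wiring is my own assumption and not code from the original post.

```python
import argparse
import torch

# torch.distributed.launch starts each worker roughly as:
#   python torch_ddp.py --local_rank=<rank of this process on the node>
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int, default=0)
args = parser.parse_args()

torch.cuda.set_device(args.local_rank)  # bind this process to its own GPU
```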