MindSpore 25-Day Learning Camp, Day 12 | ResNet50 Transfer Learning
- Preface
- ResNet50 Transfer Learning
- Data Preparation
- Downloading the Dataset
- Loading the Dataset
- Visualizing the Dataset
- Training the Model
- Building the ResNet50 Network
- Training with Fixed Features
- Training and Evaluation
- Visualizing Model Predictions
- Personal Task Check-in (readers please ignore)
- Personal Understanding and Summary
Preface
Many thanks to the Huawei MindSpore platform and CSDN for the invitation to try out MindSpore! Starting today, I will check in daily over 25 days, combining the original tutorial material with my own thinking, and share what I learn along the way. To keep the articles readable, I will put my own reflections at the end, for everyone to explore and discuss. You are also welcome to claim free compute and try MindSpore yourself!
ResNet50 Transfer Learning
In real applications, training data is often insufficient, so it is rare to train an entire network from scratch. The common practice is to pretrain a model on a very large base dataset, then use that model either to initialize the network's weight parameters or as a fixed feature extractor for the specific task at hand. This chapter uses transfer learning to classify images of wolves and dogs drawn from the ImageNet dataset.
For more detail on transfer learning, see Stanford University's CS231n.
Data Preparation
Downloading the Dataset
Download the dog-vs-wolf classification dataset used in this case study. Its images come from ImageNet, with roughly 120 training images and 30 validation images per class. Use the download interface to fetch the dataset; it is automatically extracted into the current directory.
%%capture captured_output
# The environment comes with mindspore==2.2.14 preinstalled; change the version number below to switch versions
!pip uninstall mindspore -y
!pip install -i https://pypi.mirrors.ustc.edu.cn/simple mindspore==2.2.14
# Check the current mindspore version
!pip show mindspore

from download import download

dataset_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/intermediate/Canidae_data.zip"
download(dataset_url, "./datasets-Canidae", kind="zip", replace=True)  # download the dataset from the URL and unzip it
The directory structure of the dataset is as follows:
datasets-Canidae/data/
└── Canidae
    ├── train
    │   ├── dogs
    │   └── wolves
    └── val
        ├── dogs
        └── wolves
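A side note on the structure above: folder-based dataset loaders such as ImageFolderDataset typically assign integer labels to the class sub-folders in ascending string order of their names, which is why dogs maps to 0 and wolves to 1 later in this chapter. A minimal pure-Python sketch of that convention (the helper name `folder_labels` is illustrative, not part of any library):

```python
# Label assignment convention: sort the class folder names as strings,
# then number them from 0 upward.
def folder_labels(folder_names):
    return {name: idx for idx, name in enumerate(sorted(folder_names))}

labels = folder_labels(["wolves", "dogs"])
print(labels)  # {'dogs': 0, 'wolves': 1}
```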
Loading the Dataset
The wolf-dog dataset is extracted from the ImageNet classification dataset. Use the mindspore.dataset.ImageFolderDataset interface to load it and apply the image augmentation operations.
First, define some inputs used in what follows:
batch_size = 18   # batch size
image_size = 224  # spatial size of the training images
num_epochs = 5    # number of training epochs
lr = 0.001        # learning rate
momentum = 0.9    # momentum
workers = 4       # number of parallel workers
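For context on the momentum hyperparameter defined above: a momentum optimizer keeps a velocity term that accumulates past gradients, so repeated gradients in the same direction produce increasingly large steps. A minimal sketch of the classic update rule in plain Python, independent of MindSpore (the names `momentum_step`, `v`, and `grad` are illustrative):

```python
def momentum_step(w, v, grad, lr=0.001, momentum=0.9):
    """One SGD-with-momentum update: the velocity accumulates past
    gradients, and the weight moves along the velocity."""
    v = momentum * v + grad
    w = w - lr * v
    return w, v

# Two steps with a constant gradient: the effective step size grows.
w, v = 1.0, 0.0
w, v = momentum_step(w, v, grad=1.0)  # v = 1.0, w ≈ 0.999
w, v = momentum_step(w, v, grad=1.0)  # v = 1.9, w ≈ 0.9971
```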
import mindspore as ms
import mindspore.dataset as ds
import mindspore.dataset.vision as vision

# Dataset directory paths
data_path_train = "./datasets-Canidae/data/Canidae/train/"
data_path_val = "./datasets-Canidae/data/Canidae/val/"


# Create the datasets
def create_dataset_canidae(dataset_path, usage):
    """Load the dataset."""
    data_set = ds.ImageFolderDataset(dataset_path,
                                     num_parallel_workers=workers,
                                     shuffle=True)

    # Data augmentation operations
    mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]  # per-channel mean
    std = [0.229 * 255, 0.224 * 255, 0.225 * 255]   # per-channel standard deviation
    scale = 32

    if usage == "train":
        # Define map operations for the training dataset
        trans = [
            vision.RandomCropDecodeResize(size=image_size, scale=(0.08, 1.0), ratio=(0.75, 1.333)),  # random crop and resize
            vision.RandomHorizontalFlip(prob=0.5),  # random horizontal flip
            vision.Normalize(mean=mean, std=std),   # normalization
            vision.HWC2CHW()                        # convert HWC to CHW format
        ]
    else:
        # Define map operations for the inference dataset
        trans = [
            vision.Decode(),
            vision.Resize(image_size + scale),
            vision.CenterCrop(image_size),
            vision.Normalize(mean=mean, std=std),
            vision.HWC2CHW()
        ]

    # Map operations
    data_set = data_set.map(operations=trans,
                            input_columns='image',
                            num_parallel_workers=workers)
    # Batch operation
    data_set = data_set.batch(batch_size)
    return data_set


dataset_train = create_dataset_canidae(data_path_train, "train")
step_size_train = dataset_train.get_dataset_size()

dataset_val = create_dataset_canidae(data_path_val, "val")
step_size_val = dataset_val.get_dataset_size()
Visualizing the Dataset
The training dataset loaded through the mindspore.dataset.ImageFolderDataset interface returns dictionaries. You can create a data iterator with the create_dict_iterator interface and step through the dataset with next. Since batch_size is set to 18 in this chapter, each call to next retrieves 18 images and their labels.
data = next(dataset_train.create_dict_iterator())
images = data["image"]
labels = data["label"]

print("Tensor of image", images.shape)
print("Labels:", labels)
Visualize the retrieved images and labels, using each image's label name as its title.
import matplotlib.pyplot as plt
import numpy as np

# class_name maps the labels; labels are assigned to folder names in ascending string order
class_name = {0: "dogs", 1: "wolves"}

plt.figure(figsize=(5, 5))
for i in range(4):  # show four images
    # Get an image and its corresponding label
    data_image = images[i].asnumpy()
    data_label = labels[i]
    # Process the image for display
    data_image = np.transpose(data_image, (1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    data_image = std * data_image + mean
    data_image = np.clip(data_image, 0, 1)
    # Show the image
    plt.subplot(2, 2, i + 1)
    plt.imshow(data_image)
    plt.title(class_name[int(labels[i].asnumpy())])
    plt.axis("off")  # hide the axes
plt.show()
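The display step above undoes the Normalize transform: since normalization computes (x - mean) / std, multiplying by std and adding mean back recovers the original pixel values. A small NumPy check of that round trip (the pixel values are made up for illustration):

```python
import numpy as np

mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])

pixel = np.array([0.5, 0.5, 0.5])        # an arbitrary RGB pixel
normalized = (pixel - mean) / std        # what Normalize produces (up to the 255 scale)
restored = std * normalized + mean       # the de-normalization used for display
print(np.allclose(restored, pixel))      # True
```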
Training the Model
This chapter trains a ResNet50 model. After the model framework is built, setting the pretrained parameter to True downloads the pretrained ResNet50 checkpoint and loads its weight parameters into the network.
Building the ResNet50 Network
from typing import Type, Union, List, Optional
from mindspore import nn, train
from mindspore.common.initializer import Normal

weight_init = Normal(mean=0, sigma=0.02)
gamma_init = Normal(mean=1, sigma=0.02)


class ResidualBlockBase(nn.Cell):
    expansion: int = 1  # the last convolution has the same number of kernels as the first

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, norm: Optional[nn.Cell] = None,
                 down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlockBase, self).__init__()
        if not norm:
            self.norm = nn.BatchNorm2d(out_channel)
        else:
            self.norm = norm

        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)  # 3x3 convolution
        self.conv2 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, weight_init=weight_init)  # 3x3 convolution
        self.relu = nn.ReLU()  # ReLU activation
        self.down_sample = down_sample  # downsampling

    def construct(self, x):
        """ResidualBlockBase construct."""
        identity = x  # shortcut branch

        out = self.conv1(x)  # main branch, first layer: 3x3 convolution
        out = self.norm(out)
        out = self.relu(out)
        out = self.conv2(out)  # main branch, second layer: 3x3 convolution
        out = self.norm(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)
        out += identity  # the output is the sum of the main branch and the shortcut
        out = self.relu(out)

        return out


class ResidualBlock(nn.Cell):
    expansion = 4  # the last convolution has 4x as many kernels as the first

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlock, self).__init__()

        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=1, weight_init=weight_init)  # 1x1 convolution
        self.norm1 = nn.BatchNorm2d(out_channel)  # batch normalization
        self.conv2 = nn.Conv2d(out_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)  # 3x3 convolution
        self.norm2 = nn.BatchNorm2d(out_channel)  # batch normalization
        self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion,
                               kernel_size=1, weight_init=weight_init)  # 1x1 convolution
        self.norm3 = nn.BatchNorm2d(out_channel * self.expansion)

        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def construct(self, x):
        identity = x  # shortcut branch

        out = self.conv1(x)  # main branch, first layer: 1x1 convolution
        out = self.norm1(out)
        out = self.relu(out)
        out = self.conv2(out)  # main branch, second layer: 3x3 convolution
        out = self.norm2(out)
        out = self.relu(out)
        out = self.conv3(out)  # main branch, third layer: 1x1 convolution
        out = self.norm3(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)
        out += identity  # the output is the sum of the main branch and the shortcut
        out = self.relu(out)

        return out


def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
               channel: int, block_nums: int, stride: int = 1):
    down_sample = None  # shortcut branch

    if stride != 1 or last_out_channel != channel * block.expansion:
        down_sample = nn.SequentialCell([
            nn.Conv2d(last_out_channel, channel * block.expansion,
                      kernel_size=1, stride=stride, weight_init=weight_init),  # 1x1 convolution
            nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init)
        ])

    layers = []
    layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample))

    in_channel = channel * block.expansion
    # Stack the residual blocks
    for _ in range(1, block_nums):
        layers.append(block(in_channel, channel))

    return nn.SequentialCell(layers)


from mindspore import load_checkpoint, load_param_into_net


class ResNet(nn.Cell):
    def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                 layer_nums: List[int], num_classes: int, input_channel: int) -> None:
        super(ResNet, self).__init__()

        self.relu = nn.ReLU()
        # First convolutional layer: 3 input channels (color images), 64 output channels
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init)
        self.norm = nn.BatchNorm2d(64)
        # Max pooling layer, reduces the image size
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
        # Definitions of the residual block stages
        self.layer1 = make_layer(64, block, 64, layer_nums[0])
        self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
        self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
        self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
        # Average pooling layer
        self.avg_pool = nn.AvgPool2d()
        # Flatten layer
        self.flatten = nn.Flatten()
        # Fully connected layer
        self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes)

    def construct(self, x):
        x = self.conv1(x)
        x = self.norm(x)
        x = self.relu(x)
        x = self.max_pool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avg_pool(x)
        x = self.flatten(x)
        x = self.fc(x)
        return x


def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
            layers: List[int], num_classes: int, pretrained: bool, pretrained_ckpt: str,
            input_channel: int):
    model = ResNet(block, layers, num_classes, input_channel)

    if pretrained:
        # Load the pretrained model
        download(url=model_url, path=pretrained_ckpt, replace=True)
        param_dict = load_checkpoint(pretrained_ckpt)
        load_param_into_net(model, param_dict)

    return model


def resnet50(num_classes: int = 1000, pretrained: bool = False):
    "ResNet50 model"
    resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt"
    resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
    return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes,
                   pretrained, resnet50_ckpt, 2048)
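The layer configuration [3, 4, 6, 3] passed to _resnet above is where the "50" in ResNet50 comes from: each bottleneck ResidualBlock contains three convolutions (1x1, 3x3, 1x1), and to those we add the initial 7x7 convolution and the final fully connected layer. A quick arithmetic check in plain Python:

```python
block_nums = [3, 4, 6, 3]  # blocks per stage, as passed to _resnet
convs_per_block = 3        # each bottleneck block has 1x1, 3x3, 1x1 convolutions
stem_and_head = 2          # the initial 7x7 conv plus the final fully connected layer

total_layers = sum(block_nums) * convs_per_block + stem_and_head
print(total_layers)  # 50
```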
Training with Fixed Features
When training with fixed features, freeze all network layers except the last one by setting requires_grad = False on their parameters, so that no gradients are computed for them during backpropagation.
import mindspore as ms
import matplotlib.pyplot as plt
import os
import time

net_work = resnet50(pretrained=True)

# Size of the fully connected layer's input
in_channels = net_work.fc.in_channels
# The number of output channels is 2, the number of wolf/dog classes
head = nn.Dense(in_channels, 2)
# Reset the fully connected layer
net_work.fc = head

# Average pooling layer with kernel size 7
avg_pool = nn.AvgPool2d(kernel_size=7)
# Reset the average pooling layer
net_work.avg_pool = avg_pool

# Freeze all parameters except those of the last layer
for param in net_work.get_parameters():
    if param.name not in ["fc.weight", "fc.bias"]:
        param.requires_grad = False

# Define the optimizer and loss function
opt = nn.Momentum(params=net_work.trainable_params(), learning_rate=lr, momentum=0.5)
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')


def forward_fn(inputs, targets):
    logits = net_work(inputs)
    loss = loss_fn(logits, targets)
    return loss


grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters)


def train_step(inputs, targets):
    loss, grads = grad_fn(inputs, targets)
    opt(grads)
    return loss


# Instantiate the model
model1 = train.Model(net_work, loss_fn, opt, metrics={"Accuracy": train.Accuracy()})
Training and Evaluation
Start training the model. Compared with training without a pretrained model, this saves well over half the time, since gradients for the frozen layers no longer need to be computed. The checkpoint with the best validation accuracy is saved to ./BestCheckpoint/resnet50-best-freezing-param.ckpt under the current path.
import mindspore as ms
import matplotlib.pyplot as plt
import os
import time

dataset_train = create_dataset_canidae(data_path_train, "train")
step_size_train = dataset_train.get_dataset_size()

dataset_val = create_dataset_canidae(data_path_val, "val")
step_size_val = dataset_val.get_dataset_size()

num_epochs = 5

# Create iterators
data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs)
data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs)

best_ckpt_dir = "./BestCheckpoint"
best_ckpt_path = "./BestCheckpoint/resnet50-best-freezing-param.ckpt"

# Start the training loop
print("Start Training Loop ...")

best_acc = 0

for epoch in range(num_epochs):
    losses = []
    net_work.set_train()
    epoch_start = time.time()

    # Read in the data for each training epoch
    for i, (images, labels) in enumerate(data_loader_train):
        labels = labels.astype(ms.int32)
        loss = train_step(images, labels)
        losses.append(loss)

    # Validate the accuracy after each epoch
    acc = model1.eval(dataset_val)['Accuracy']

    epoch_end = time.time()
    epoch_seconds = (epoch_end - epoch_start) * 1000
    step_seconds = epoch_seconds / step_size_train

    print("-" * 20)
    print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % (
        epoch + 1, num_epochs, sum(losses) / len(losses), acc
    ))
    print("epoch time: %5.3f ms, per step time: %5.3f ms" % (
        epoch_seconds, step_seconds
    ))

    if acc > best_acc:
        best_acc = acc
        if not os.path.exists(best_ckpt_dir):
            os.mkdir(best_ckpt_dir)
        ms.save_checkpoint(net_work, best_ckpt_path)

print("=" * 80)
print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, "
      f"save the best ckpt file in {best_ckpt_path}", flush=True)
Visualizing Model Predictions
Use the best.ckpt file obtained with fixed features to predict on the wolf and dog images of the validation set. A prediction title shown in blue means the prediction is correct; red means it is wrong.
import matplotlib.pyplot as plt
import mindspore as ms


def visualize_model(best_ckpt_path, val_ds):
    net = resnet50()
    # Size of the fully connected layer's input
    in_channels = net.fc.in_channels
    # The number of output channels is 2, the number of wolf/dog classes
    head = nn.Dense(in_channels, 2)
    # Reset the fully connected layer
    net.fc = head
    # Average pooling layer with kernel size 7
    avg_pool = nn.AvgPool2d(kernel_size=7)
    # Reset the average pooling layer
    net.avg_pool = avg_pool
    # Load the model parameters
    param_dict = ms.load_checkpoint(best_ckpt_path)
    ms.load_param_into_net(net, param_dict)
    model = train.Model(net)

    # Load a batch of validation data for inference
    data = next(val_ds.create_dict_iterator())
    images = data["image"].asnumpy()
    labels = data["label"].asnumpy()

    class_name = {0: "dogs", 1: "wolves"}

    # Predict the image classes
    output = model.predict(ms.Tensor(data['image']))
    pred = np.argmax(output.asnumpy(), axis=1)

    # Show the images and their predicted classes
    plt.figure(figsize=(5, 5))
    for i in range(4):
        plt.subplot(2, 2, i + 1)
        # Blue titles mark correct predictions; red titles mark wrong ones
        color = 'blue' if pred[i] == labels[i] else 'red'
        plt.title('predict:{}'.format(class_name[pred[i]]), color=color)
        picture_show = np.transpose(images[i], (1, 2, 0))
        mean = np.array([0.485, 0.456, 0.406])
        std = np.array([0.229, 0.224, 0.225])
        picture_show = std * picture_show + mean
        picture_show = np.clip(picture_show, 0, 1)
        plt.imshow(picture_show)
        plt.axis('off')
    plt.show()


visualize_model(best_ckpt_path, dataset_val)
Personal Task Check-in (readers please ignore)
Personal Understanding and Summary
This chapter walked through completing ResNet50 transfer learning with MindSpore. The essence of transfer learning is to take a model developed for task A as the starting point and reuse it when developing a model for task B. Concretely, the chapter covers three parts: data preparation, dataset loading, and model training. The key steps are building the ResNet50 network and then training with fixed features; thanks to the efficiency of transfer learning, training time is dramatically reduced, and it becomes easier to reach high classification accuracy. In the end, the ResNet50 image classification network successfully completed the wolf-vs-dog classification task via transfer learning. (In PyTorch and other common deep learning frameworks, implementing transfer learning likewise requires setting pretrained=True and loading weights, e.g. weight='xxxx.pth'.)