多類圖像分類的目標是為一組固定類別中的圖像分配標簽。
目錄
加載和處理數據
搭建模型
定義損失函數
定義優化器
訓練和遷移學習
用隨機權重進行訓練
用預訓練權重進行訓練
加載和處理數據
將使用 PyTorch torchvision 包中提供的 STL-10 數據集,數據集中有 10 個類:飛機、鳥、車、貓、鹿、狗、馬、猴、船、卡車。圖像為96*96像素的RGB圖像。數據集包含 5,000 張訓練圖像和 8,000 張測試圖像。在訓練數據集和測試數據集中,每個類分別有 500 和 800 張圖像。
from torchvision import datasets
import torchvision.transforms as transforms
import ospath2data="./data"
# 如果數據路徑不存在,則創建
if not os.path.exists(path2data):os.mkdir(path2data)# 定義數據轉換
data_transformer = transforms.Compose([transforms.ToTensor()])# 從datasets庫中導入STL10數據集,并指定數據集的路徑、分割方式、是否下載以及數據轉換器
train_ds=datasets.STL10(path2data, split='train',download=True,transform=data_transformer)# 打印數據形狀
print(train_ds.data.shape)
?若數據集導入較慢可直接下載:http://ai.stanford.edu/~acoates/stl10/stl10_binary.tar.gz
import collections# 獲取標簽
y_train=[y for _,y in train_ds]# 統計標簽
counter_train=collections.Counter(y_train)
print(counter_train)
# 加載數據
test0_ds=datasets.STL10(path2data, split='test', download=True,transform=data_transformer)
# 打印數據形狀
print(test0_ds.data.shape)
# 導入StratifiedShuffleSplit模塊
from sklearn.model_selection import StratifiedShuffleSplit# 創建StratifiedShuffleSplit對象,設置分割次數為1,測試集大小為0.2,隨機種子為0
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=0)# 獲取test0_ds的索引
indices=list(range(len(test0_ds)))# 獲取test0_ds的標簽
y_test0=[y for _,y in test0_ds]# 對索引和標簽進行分割
for test_index, val_index in sss.split(indices, y_test0):# 打印測試集和驗證集的索引print("test:", test_index, "val:", val_index)# 打印測試集和驗證集的大小print(len(val_index),len(test_index))
# 從torch.utils.data中導入Subset類
from torch.utils.data import Subset# 從test0_ds中選取val_index索引的子集,賦值給val_ds
val_ds=Subset(test0_ds,val_index)
# 從test0_ds中選取test_index索引的子集,賦值給test_ds
test_ds=Subset(test0_ds,test_index)import collections
import numpy as np# 獲取標簽
y_test=[y for _,y in test_ds]
y_val=[y for _,y in val_ds]# 統計測試集和驗證集的標簽數量
counter_test=collections.Counter(y_test)
counter_val=collections.Counter(y_val)# 打印測試集和驗證集的標簽數量
print(counter_test)
print(counter_val)
from torchvision import utils
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline# 設置隨機種子為0
np.random.seed(0)# 定義一個函數,用于顯示圖像
def show(img,y=None,color=True):# 將圖像轉換為numpy數組npimg = img.numpy()# 將圖像的維度從(C,H,W)轉換為(H,W,C)npimg_tr=np.transpose(npimg, (1,2,0))# 顯示圖像plt.imshow(npimg_tr)# 如果有標簽,則顯示標簽if y is not None:plt.title("label: "+str(y))# 定義網格大小
grid_size=4
# 隨機生成4個索引
rnd_inds=np.random.randint(0,len(train_ds),grid_size)
print("image indices:",rnd_inds)# 從訓練集中獲取這4個索引對應的圖像和標簽
x_grid=[train_ds[i][0] for i in rnd_inds]
y_grid=[train_ds[i][1] for i in rnd_inds]# 將這4個圖像拼接成一個網格
x_grid=utils.make_grid(x_grid, nrow=4, padding=2)
print(x_grid.shape)# 調用helper函數顯示網格
plt.figure(figsize=(10,10))
show(x_grid,y_grid)
?
# 設置隨機種子為0
np.random.seed(0)# 設置網格大小
grid_size=4
# 從驗證數據集中隨機選擇grid_size個索引
rnd_inds=np.random.randint(0,len(val_ds),grid_size)
print("image indices:",rnd_inds)# 從驗證數據集中選擇對應的圖像
x_grid=[val_ds[i][0] for i in rnd_inds]
# 從驗證數據集中選擇對應的標簽
y_grid=[val_ds[i][1] for i in rnd_inds]# 將圖像排列成網格
x_grid=utils.make_grid(x_grid, nrow=4, padding=2)
print(x_grid.shape)# 調用輔助函數
plt.figure(figsize=(10,10))
# 顯示網格圖像和標簽
show(x_grid,y_grid)
?
?
import numpy as np# 計算訓練集中每個樣本的RGB均值
meanRGB=[np.mean(x.numpy(),axis=(1,2)) for x,_ in train_ds]
# 計算訓練集中每個樣本的RGB標準差
stdRGB=[np.std(x.numpy(),axis=(1,2)) for x,_ in train_ds] meanR=np.mean([m[0] for m in meanRGB]) # 計算所有樣本的R通道均值的平均值
meanG=np.mean([m[1] for m in meanRGB])
meanB=np.mean([m[2] for m in meanRGB]) stdR=np.mean([s[0] for s in stdRGB]) # 計算所有樣本的R通道標準差的平均值
stdG=np.mean([s[1] for s in stdRGB])
stdB=np.mean([s[2] for s in stdRGB]) print(meanR,meanG,meanB) # 打印R、G、B通道的均值
print(stdR,stdG,stdB) # 打印R、G、B通道的標準差
# 定義訓練數據的轉換器
train_transformer = transforms.Compose([# 隨機水平翻轉,翻轉概率為0.5transforms.RandomHorizontalFlip(p=0.5), # 隨機垂直翻轉,翻轉概率為0.5transforms.RandomVerticalFlip(p=0.5), # 將圖像轉換為張量transforms.ToTensor(),# 對圖像進行歸一化,均值和標準差分別為meanR, meanG, meanB和stdR, stdG, stdBtransforms.Normalize([meanR, meanG, meanB], [stdR, stdG, stdB])])# 定義測試數據的轉換器
test0_transformer = transforms.Compose([# 將圖像轉換為張量transforms.ToTensor(),# 對圖像進行歸一化,均值和標準差分別為meanR, meanG, meanB和stdR, stdG, stdBtransforms.Normalize([meanR, meanG, meanB], [stdR, stdG, stdB]),]) # 將訓練數據集的轉換器賦值給訓練數據集的transform屬性
train_ds.transform=train_transformer
# 將測試數據集的轉換器賦值給測試數據集的transform屬性
test0_ds.transform=test0_transformerimport torch
import numpy as np
import matplotlib.pyplot as plt# 設置隨機種子
np.random.seed(0)
torch.manual_seed(0)# 定義網格大小
grid_size=4# 從訓練數據集中隨機選擇grid_size個樣本的索引
rnd_inds=np.random.randint(0,len(train_ds),grid_size)
print("image indices:",rnd_inds)# 根據索引從訓練數據集中獲取對應的樣本
x_grid=[train_ds[i][0] for i in rnd_inds]
y_grid=[train_ds[i][1] for i in rnd_inds]# 將樣本轉換為網格形式
x_grid=utils.make_grid(x_grid, nrow=4, padding=2)
print(x_grid.shape)# 創建一個10x10的圖像
plt.figure(figsize=(10,10))
# 顯示網格和對應的標簽
show(x_grid,y_grid)
from torch.utils.data import DataLoader# 創建訓練數據集的DataLoader,batch_size為32,shuffle為True,表示每次迭代時都會打亂數據集
train_dl = DataLoader(train_ds, batch_size=32, shuffle=True)
# 創建驗證數據集的DataLoader,batch_size為64,shuffle為False,表示每次迭代時不會打亂數據集
val_dl = DataLoader(val_ds, batch_size=64, shuffle=False) # 遍歷訓練數據集
for x, y in train_dl:# 打印x的形狀print(x.shape)# 打印y的形狀print(y.shape)# 跳出循環break
# 遍歷val_dl中的每個元素,x和y分別表示輸入和標簽
for x, y in val_dl:# 打印輸入的形狀print(x.shape)# 打印標簽的形狀print(y.shape)# 退出循環break
# 從datasets庫中導入FashionMNIST數據集,并將其設置為訓練集
fashion_train=datasets.FashionMNIST(path2data, train=True, download=True)
搭建模型
使用torchvision為多分類任務構建一個模型。torchvision軟件包提供了用于圖像分類的多個最先進的深度學習模型的實現,包括 AlexNet、VGG、ResNet、SqueezeNet、DenseNet、Inception、GoogleNet、ShuffleNet。這些模型在 ImageNet 數據集上進行了訓練,其中包含來自 1,000 個班級的 1400 多萬張圖像。可以分別使用具有隨機初始化權重的架構、預訓練權重進行嘗試。
from torchvision import models
import torch# 創建一個resnet18模型,pretrained參數設置為False,表示不使用預訓練的權重
model_resnet18 = models.resnet18(pretrained=False)
# 打印模型ResNet18
print(model_resnet18)
from torch import nn
# 定義類別數量
num_classes=10
# 獲取模型ResNet18的全連接層輸入特征數量
num_ftrs = model_resnet18.fc.in_features
# 將全連接層替換為新的全連接層,輸出特征數量為類別數量
model_resnet18.fc = nn.Linear(num_ftrs, num_classes)# 定義設備為GPU
device = torch.device("cuda:0")
# 將模型移動到GPU上
model_resnet18.to(device)
from torchsummary import summary# 打印模型結構,輸入大小為(3, 224, 224),即3個通道,224x224大小的圖像
summary(model_resnet18, input_size=(3, 224, 224))
# 遍歷模型ResNet18的參數
for w in model_resnet18.parameters():# 將參數轉換為CPU數據w=w.data.cpu()# 打印參數的形狀print(w.shape)break# 計算參數的最小值
min_w=torch.min(w)
# 計算w1,其中w1 = (-1/(2*min_w))*w + 0.5
w1 = (-1/(2*min_w))*w + 0.5
# 打印w1的最小值和最大值
print(torch.min(w1).item(),torch.max(w1).item())# 計算網格大小
grid_size=len(w1)
# 生成網格
x_grid=[w1[i] for i in range(grid_size)]
x_grid=utils.make_grid(x_grid, nrow=8, padding=1)
print(x_grid.shape)# 創建一個5x5的圖像
plt.figure(figsize=(5,5))
show(x_grid)
采用預訓練權重
from torchvision import models
import torch# 加載預訓練的resnet18模型
resnet18_pretrained = models.resnet18(pretrained=True)# 定義分類的類別數
num_classes=10
# 獲取resnet18模型的最后一層全連接層的輸入特征數
num_ftrs = resnet18_pretrained.fc.in_features
# 將最后一層全連接層替換為新的全連接層,新的全連接層的輸出特征數為num_classes
resnet18_pretrained.fc = nn.Linear(num_ftrs, num_classes)# 定義設備為cuda:0
device = torch.device("cuda:0")
# 將模型移動到cuda:0設備上
resnet18_pretrained.to(device)
# 遍歷resnet18_pretrained的參數
for w in resnet18_pretrained.parameters():# 將參數轉換為cpu格式w=w.data.cpu()print(w.shape)break# 計算w的最小值
min_w=torch.min(w)
# 計算w1,其中w1=(-1/(2*min_w))*w + 0.5
w1 = (-1/(2*min_w))*w + 0.5
# 打印w1的最小值和最大值
print(torch.min(w1).item(),torch.max(w1).item())# 計算w1的網格大小
grid_size=len(w1)
# 將w1轉換為網格形式
x_grid=[w1[i] for i in range(grid_size)]
x_grid=utils.make_grid(x_grid, nrow=8, padding=1)
print(x_grid.shape)# 創建一個5x5的圖像
plt.figure(figsize=(5,5))
show(x_grid)
?
定義損失函數
定義損失函數的目的是將模型優化為預定義的指標。分類任務的標準損失函數是交叉熵損失或對數損失。在定義損失函數時,需要考慮模型輸出的數量及其激活函數。對于多類分類任務,輸出數設置為類數,輸出激活函數確確定損失函數。
輸出激活 | 輸出數量 | 損失函數 |
---|---|---|
None? | num_classes? | nn.CrossEntropyLoss |
log_Softmax? | num_classes? | nn.NLLLoss |
torch.manual_seed(0)# 定義輸入數據的維度
n,c=4,5
# 生成隨機輸入數據,并設置requires_grad=True,表示需要計算梯度
y = torch.randn(n, c, requires_grad=True)
# 打印輸入數據的形狀
print(y.shape)# 定義交叉熵損失函數,reduction參數設置為"sum",表示將所有樣本的損失相加
loss_func = nn.CrossEntropyLoss(reduction="sum")
# 生成隨機目標數據,表示每個樣本的類別
target = torch.randint(c,size=(n,))
# 打印目標數據的形狀
print(target.shape)# 計算損失
loss = loss_func(y, target)
# 打印損失值
print(loss.item())
# 反向傳播,計算梯度
loss.backward()
# 打印輸出y的值
print (y.data)
定義優化器
torch.optim 包提供了通用優化器的實現。優化器將保持當前狀態,并根據計算出的梯度更新參數。對于分類任務,隨機梯度下降 (SGD) 和 Adam 優化器非常常用。Adam 優化器在速度和準確性方面通常優于 SGD,因此這里選擇 Adam 優化器。
from torch import optim
# 定義優化器,使用Adam優化算法,優化model_resnet18的參數,學習率為1e-4
opt = optim.Adam(model_resnet18.parameters(), lr=1e-4)
# 定義一個函數,用于獲取優化器的學習率
def get_lr(opt):# 遍歷優化器的參數組for param_group in opt.param_groups:# 返回學習率return param_group['lr']# 調用函數,獲取當前學習率
current_lr=get_lr(opt)
# 打印當前學習率
print('current lr={}'.format(current_lr))
?
from torch.optim.lr_scheduler import CosineAnnealingLR# 創建學習率調度器,T_max表示周期長度,eta_min表示最小學習率
lr_scheduler = CosineAnnealingLR(opt,T_max=2,eta_min=1e-5)
# 定義一個空列表lrs
lrs=[]
# 循環10次
for i in range(10):# 調用lr_scheduler.step()方法lr_scheduler.step()# 調用get_lr()方法獲取當前學習率lr=get_lr(opt)# 打印當前epoch和對應的學習率print("epoch %s, lr: %.1e" %(i,lr))# 將當前學習率添加到列表lrs中lrs.append(lr)
# 繪制lrs列表中的數據
plt.plot(lrs)
?
?
訓練和遷移學習
到目前為止,已經創建了數據集并定義了模型、損失函數和優化器,接下來將進行訓練和驗證。首先使用隨機初始化的權重訓練模型。然后使用預先訓練的權重訓練模型,這也稱為遷移學習。遷移學習將從一個問題中學到的知識(權重)用于其他類似問題。訓練和驗證腳本可能很長且重復。為了提高代碼可讀性并避免代碼重復,將先構建一些輔助函數。
# 定義一個函數metrics_batch,用于計算預測結果和目標之間的正確率
def metrics_batch(output, target):# 將輸出結果的最大值所在的索引作為預測結果pred = output.argmax(dim=1, keepdim=True)# 計算預測結果和目標之間的正確率corrects=pred.eq(target.view_as(pred)).sum().item()# 返回正確率return corrects
def loss_batch(loss_func, output, target, opt=None):# 計算batch的損失loss = loss_func(output, target)# 計算batch的評估指標metric_b = metrics_batch(output,target)# 如果有優化器,則進行反向傳播和參數更新if opt is not None:opt.zero_grad()loss.backward()opt.step()# 返回損失和評估指標return loss.item(), metric_bdevice = torch.device("cuda")# 定義一個函數loss_epoch,用于計算模型在數據集上的損失
def loss_epoch(model,loss_func,dataset_dl,sanity_check=False,opt=None):# 初始化運行損失和運行指標running_loss=0.0running_metric=0.0# 獲取數據集的長度len_data=len(dataset_dl.dataset)# 遍歷數據集for xb, yb in dataset_dl:# 將數據移動到GPU上xb=xb.to(device)yb=yb.to(device)# 獲取模型輸出output=model(xb)# 計算當前批次的損失和指標loss_b,metric_b=loss_batch(loss_func, output, yb, opt)# 累加損失和指標running_loss+=loss_bif metric_b is not None:running_metric+=metric_b# 如果是sanity_check模式,則只計算一個批次if sanity_check is True:break# 計算平均損失和指標loss=running_loss/float(len_data)metric=running_metric/float(len_data)# 返回平均損失和指標return loss, metricdef train_val(model, params):# 獲取參數num_epochs=params["num_epochs"]loss_func=params["loss_func"]opt=params["optimizer"]train_dl=params["train_dl"]val_dl=params["val_dl"]sanity_check=params["sanity_check"]lr_scheduler=params["lr_scheduler"]path2weights=params["path2weights"]# 初始化損失和指標歷史記錄loss_history={"train": [],"val": [],}metric_history={"train": [],"val": [],}# 復制模型參數best_model_wts = copy.deepcopy(model.state_dict())# 初始化最佳損失best_loss=float('inf')# 遍歷每個epochfor epoch in range(num_epochs):# 獲取當前學習率current_lr=get_lr(opt)print('Epoch {}/{}, current lr={}'.format(epoch, num_epochs - 1, current_lr))# 訓練模型model.train()train_loss, train_metric=loss_epoch(model,loss_func,train_dl,sanity_check,opt)# 記錄訓練損失和指標loss_history["train"].append(train_loss)metric_history["train"].append(train_metric)# 評估模型model.eval()with torch.no_grad():val_loss, val_metric=loss_epoch(model,loss_func,val_dl,sanity_check)# 如果驗證損失小于最佳損失,則更新最佳損失和最佳模型參數if val_loss < best_loss:best_loss = val_lossbest_model_wts = copy.deepcopy(model.state_dict())# 將最佳模型參數保存到本地文件torch.save(model.state_dict(), path2weights)print("Copied best model weights!")# 記錄驗證損失和指標loss_history["val"].append(val_loss)metric_history["val"].append(val_metric)# 更新學習率lr_scheduler.step()# 打印訓練損失、驗證損失和準確率print("train loss: %.6f, dev loss: %.6f, accuracy: %.2f" %(train_loss,val_loss,100*val_metric))print("-"*10) # 加載最佳模型參數model.load_state_dict(best_model_wts)# 返回模型、損失歷史和指標歷史return model, loss_history, metric_history
用隨機權重進行訓練
import copy# 定義交叉熵損失函數,reduction參數設置為"sum",表示將所有樣本的損失相加
loss_func = nn.CrossEntropyLoss(reduction="sum")
# 定義Adam優化器,優化模型參數,學習率為1e-4
opt = optim.Adam(model_resnet18.parameters(), lr=1e-4)
# 定義余弦退火學習率調度器,T_max參數設置為5,eta_min參數設置為1e-6
lr_scheduler = CosineAnnealingLR(opt,T_max=5,eta_min=1e-6)# 定義訓練參數字典
params_train={"num_epochs": 3, # 訓練輪數"optimizer": opt, # 優化器"loss_func": loss_func, # 損失函數"train_dl": train_dl, # 訓練數據集"val_dl": val_dl, # 驗證數據集"sanity_check": False, # 是否進行sanity check"lr_scheduler": lr_scheduler, # 學習率調度器"path2weights": "./models/resnet18.pt", # 模型權重保存路徑
}# 訓練和驗證模型
model_resnet18,loss_hist,metric_hist=train_val(model_resnet18,params_train)
# 獲取訓練參數中的訓練輪數
num_epochs=params_train["num_epochs"]# 繪制訓練和驗證損失曲線
plt.title("Train-Val Loss")
plt.plot(range(1,num_epochs+1),loss_hist["train"],label="train")
plt.plot(range(1,num_epochs+1),loss_hist["val"],label="val")
plt.ylabel("Loss")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()# 繪制訓練和驗證準確率曲線
plt.title("Train-Val Accuracy")
plt.plot(range(1,num_epochs+1),metric_hist["train"],label="train")
plt.plot(range(1,num_epochs+1),metric_hist["val"],label="val")
plt.ylabel("Accuracy")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()
用預訓練權重進行訓練
import copy# 定義損失函數,使用交叉熵損失,并設置reduction為sum
loss_func = nn.CrossEntropyLoss(reduction="sum")
# 定義優化器,使用Adam優化器,并設置學習率為1e-4
opt = optim.Adam(resnet18_pretrained.parameters(), lr=1e-4)
# 定義學習率調度器,使用余弦退火調度器,設置最大周期為5,最小學習率為1e-6
lr_scheduler = CosineAnnealingLR(opt,T_max=5,eta_min=1e-6)# 定義訓練參數
params_train={"num_epochs": 3, # 設置訓練周期為3"optimizer": opt, # 設置優化器"loss_func": loss_func, # 設置損失函數"train_dl": train_dl, # 設置訓練數據集"val_dl": val_dl, # 設置驗證數據集"sanity_check": False, # 設置是否進行sanity check"lr_scheduler": lr_scheduler, # 設置學習率調度器"path2weights": "./models/resnet18_pretrained.pt", # 設置權重保存路徑
}# 調用train_val函數進行訓練和驗證,并返回訓練后的模型、損失歷史和指標歷史
resnet18_pretrained,loss_hist,metric_hist=train_val(resnet18_pretrained,params_train)
# 獲取訓練參數中的訓練輪數
num_epochs=params_train["num_epochs"]# 繪制訓練和驗證損失曲線
plt.title("Train-Val Loss")
plt.plot(range(1,num_epochs+1),loss_hist["train"],label="train")
plt.plot(range(1,num_epochs+1),loss_hist["val"],label="val")
plt.ylabel("Loss")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()# 繪制訓練和驗證準確率曲線
plt.title("Train-Val Accuracy")
plt.plot(range(1,num_epochs+1),metric_hist["train"],label="train")
plt.plot(range(1,num_epochs+1),metric_hist["val"],label="val")
plt.ylabel("Accuracy")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()