【3】遷移學習模型
文章目錄
- 前言
- 一、安裝相關模塊
- 二、訓練代碼
- 2.1. 管理預訓練模型
- 2.2. 模型訓練代碼
- 2.3. 可視化結果
- 2.4. 類別函數
- 總結
前言
主要簡述一下訓練代碼
三葉青圖像識別研究簡概
一、安裝相關模塊
#xingyun的筆記本
print('============================xingyun的筆記本=============================')
%pip install d2l
%pip install Ipython
%pip install efficientnet_pytorch #(可選)
%pip install timm
二、訓練代碼
整段代碼大致分為四塊: 管理預訓練模型、模型訓練、可視化結果、類別函數調用。
2.1. 管理預訓練模型
用于管理要使用的遷移學習模型(可添加),這里主要是對EfficientNet系列模型、ResNet系列模型、MobileNet系列模型進行遷移學習。
import collections
import math
import os
import shutil
import pandas as pd
import torch
import torchvision
import timm
from torch import nn
from d2l import torch as d2l
import re
from itertools import product
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from efficientnet_pytorch import EfficientNet
from sklearn.model_selection import KFold
from torchvision.models import (resnet18, resnet34, resnet50, resnet101,
                                resnet152, mobilenet, mobilenet_v2,
                                mobilenet_v3_large, mobilenet_v3_small,
                                mobilenetv2, mobilenetv3)


class FineTuneModel:
    """Manage the pretrained backbones used for transfer learning.

    Supported families:
    1. EfficientNet ('efficientnet-b0' ... 'efficientnet-b7')
    2. ResNet (resnet18 / 34 / 50 / 101 / 152)
    3. MobileNet (mobilenet_v2, mobilenet_v3_large, mobilenet_v3_small, ...)
    """

    def __init__(self, devices, num_classes, model_name):
        self.devices = devices          # list of torch devices; model is placed on devices[0]
        self.num_classes = num_classes  # width of the new classification head
        self.model_name = model_name    # backbone identifier, e.g. 'resnet34'

    def _new_head(self, num_features):
        """Build the replacement 2-layer classification head (shared by the
        MobileNet and ResNet paths)."""
        return nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Linear(256, self.num_classes))

    def get_efficientnet(self):
        """Fine-tune an EfficientNet model.

        :return: pretrained EfficientNet whose final fully-connected layer is
                 replaced by a fresh ``num_classes`` head; every other layer
                 is frozen.
        """
        finetune_net = EfficientNet.from_pretrained(self.model_name)
        # Swap the classification head for one matching our label count.
        num_ftrs = finetune_net._fc.in_features
        finetune_net._fc = nn.Linear(num_ftrs, self.num_classes)
        finetune_net = finetune_net.to(self.devices[0])
        # Freeze every parameter outside the fully-connected layer.
        for name, param in finetune_net.named_parameters():
            if 'fc' not in name:
                param.requires_grad = False
        # Make sure the new head stays trainable.
        for param in finetune_net._fc.parameters():
            param.requires_grad = True
        return finetune_net

    def get_mobilenet(self):
        """Fine-tune a MobileNet model ('mobilenet_v2', 'mobilenet_v3_large',
        'mobilenet_v3_small', ...).

        :return: pretrained MobileNet with a new 2-layer classification head;
                 the feature extractor is frozen.
        """
        base_model_func = getattr(torchvision.models, self.model_name)
        base_model = base_model_func(pretrained=True)
        if self.model_name == "mobilenet_v2":
            num_features = base_model.classifier[-1].in_features
        else:
            # BUGFIX: the last entry of `features` is a Conv-BN-Activation
            # Sequential wrapper whose `out_channels` attribute is an
            # implementation detail of torchvision. The input width of the
            # existing classifier is the same number (e.g. 960 for
            # mobilenet_v3_large) and is part of the public module layout,
            # so read that instead.
            num_features = base_model.classifier[0].in_features
        # Replace the original classifier with our head.
        base_model.classifier = self._new_head(num_features)
        base_model = base_model.to(self.devices[0])
        # Freeze the feature extractor; only the classifier is trained.
        for name, param in base_model.named_parameters():
            if 'classifier' not in name:
                param.requires_grad = False
        return base_model

    def get_resnet(self):
        """Fine-tune a ResNet model (resnet18 / 34 / 50 / 101 / 152).

        :return: pretrained ResNet with a new 2-layer classification head;
                 the feature extractor is frozen.
        """
        # Resolve the constructor dynamically from torchvision.models.
        base_model_func = getattr(torchvision.models, self.model_name)
        base_model = base_model_func(pretrained=True)
        num_features = base_model.fc.in_features
        # Replace the original fully-connected layer (classification head).
        base_model.fc = self._new_head(num_features)
        base_model = base_model.to(self.devices[0])
        # Freeze everything outside the (new) fully-connected head.
        for name, param in base_model.named_parameters():
            if 'fc' not in name:
                param.requires_grad = False
        return base_model
2.2. 模型訓練代碼
包括數據處理、模型訓練、參數調優、模型保存等。
class MyImageClassifier:
    """End-to-end training pipeline:

    1. Data reorganisation (train/valid/test split on disk)
    2. Data augmentation and loading
    3. Model training
    4. Hyper-parameter grid search
    5. Model saving
    """

    def __init__(self, data_dir, target_dir, batch_size, valid_ratio,
                 train_folder, test_folder):
        self.data_dir = data_dir        # directory holding the raw dataset
        self.target_dir = target_dir    # directory the reorganised copies go to
        self.batch_size = batch_size
        self.valid_ratio = valid_ratio  # fraction of each class held out for validation
        self.train_folder = train_folder
        self.test_folder = test_folder

    def read_csv_labels(self, fname):
        """Read ``fname`` and return a {file-stem: label} dict.

        The first line (column header) is skipped.
        """
        with open(fname, 'r') as f:
            lines = f.readlines()[1:]
        tokens = [l.rstrip().split(',') for l in lines]
        return dict(((name, label) for name, label in tokens))

    def copyfile(self, filename, target_dir):
        """Copy ``filename`` into ``target_dir``, creating it if needed."""
        os.makedirs(target_dir, exist_ok=True)
        shutil.copy(filename, target_dir)

    def reorg_train_valid(self, labels):
        """Split a validation set out of the original training set.

        :return: number of validation samples taken per class.
        """
        # Sample count of the rarest class in the training set.
        n = collections.Counter(labels.values()).most_common()[-1][1]
        # Validation samples per class (at least one).
        n_valid_per_label = max(1, math.floor(n * self.valid_ratio))
        label_count = {}
        for train_file in os.listdir(os.path.join(self.data_dir, self.train_folder)):
            label = labels[train_file.split('.')[0]]
            fname = os.path.join(self.data_dir, self.train_folder, train_file)
            # Every sample also goes to the combined train+valid folder.
            self.copyfile(fname, os.path.join(self.target_dir, 'train_valid_test',
                                              'train_valid', label))
            if label not in label_count or label_count[label] < n_valid_per_label:
                self.copyfile(fname, os.path.join(self.target_dir, 'train_valid_test',
                                                  'valid', label))
                label_count[label] = label_count.get(label, 0) + 1
            else:
                self.copyfile(fname, os.path.join(self.target_dir, 'train_valid_test',
                                                  'train', label))
        return n_valid_per_label

    def reorg_test(self):
        """Reorganise the test set for easy reading during prediction."""
        for test_file in os.listdir(os.path.join(self.data_dir, self.test_folder)):
            self.copyfile(os.path.join(self.data_dir, self.test_folder, test_file),
                          os.path.join(self.target_dir, 'train_valid_test', 'test',
                                       'unknown'))

    def reorg_san_data(self, labels_csv):
        """Run the full on-disk reorganisation and print dataset stats."""
        labels = self.read_csv_labels(os.path.join(self.data_dir, labels_csv))
        self.reorg_train_valid(labels)
        self.reorg_test()
        print('# 訓練樣本 :', len(labels))
        print('# 類別 :', len(set(labels.values())))

    def classes(self):
        """Return {index: class-name} derived from the valid/ sub-folders."""
        class_to_idx = {}
        # Each sub-folder of valid/ represents one class.
        for idx, class_name in enumerate(sorted(os.listdir(
                os.path.join(self.target_dir, 'train_valid_test', 'valid')))):
            if class_name.startswith('.'):  # skip hidden files like .DS_Store
                continue
            class_dir = os.path.join(
                os.path.join(self.target_dir, 'train_valid_test', 'valid'), class_name)
            if os.path.isdir(class_dir):
                class_to_idx[idx] = class_name
        print(class_to_idx)
        print("============================")
        return class_to_idx

    def count_samples(self):
        """Count per-class samples in the reorganised train and valid splits.

        :return: DataFrame with a 'class' column plus one count column per split.
        """
        train_valid_test_dirs = ['train', 'valid']
        data_counts = {'class': []}
        for dir_name in train_valid_test_dirs:
            class_dir = os.path.join(self.target_dir, 'train_valid_test', dir_name)
            if dir_name not in data_counts:
                data_counts[dir_name] = []
            for class_name in os.listdir(class_dir):
                if class_name.startswith('.'):
                    continue
                class_sub_dir = os.path.join(class_dir, class_name)
                if os.path.isdir(class_sub_dir):
                    if class_name not in data_counts['class']:
                        data_counts['class'].append(class_name)
                        # Keep every count column aligned with the class list.
                        for key in train_valid_test_dirs:
                            if key not in data_counts:
                                data_counts[key] = [0] * len(data_counts['class'])
                            else:
                                data_counts[key].append(0)
                    data_counts[dir_name][data_counts['class'].index(class_name)] += \
                        len(os.listdir(class_sub_dir))
        df = pd.DataFrame(data_counts)
        return df

    def shuju_zq_jz(self, batch_size):
        """Build augmented datasets and data loaders.

        :return: (train_iter, valid_iter, test_iter, train_valid_iter)
        """
        # Training-time augmentation: random crop covering 8%-100% of the
        # original area with aspect ratio in [3/4, 4/3], rescaled to 224x224.
        transform_train = torchvision.transforms.Compose([
            torchvision.transforms.RandomResizedCrop(224, scale=(0.08, 1.0),
                                                     ratio=(3.0/4.0, 4.0/3.0)),
            torchvision.transforms.RandomHorizontalFlip(),
            # Randomly jitter brightness, contrast and saturation.
            torchvision.transforms.ColorJitter(brightness=0.4,
                                               contrast=0.4,
                                               saturation=0.4),
            torchvision.transforms.ToTensor(),
            # Per-channel ImageNet normalisation.
            torchvision.transforms.Normalize([0.485, 0.456, 0.406],
                                             [0.229, 0.224, 0.225])])
        # Deterministic preprocessing only at evaluation time.
        transform_test = torchvision.transforms.Compose([
            torchvision.transforms.Resize(256),
            # Crop the central 224x224 patch.
            torchvision.transforms.CenterCrop(224),
            torchvision.transforms.ToTensor(),
            torchvision.transforms.Normalize([0.485, 0.456, 0.406],
                                             [0.229, 0.224, 0.225])])
        # Datasets over the reorganised image folders.
        train_ds, train_valid_ds = [torchvision.datasets.ImageFolder(
            os.path.join(self.target_dir, 'train_valid_test', folder),
            transform=transform_train) for folder in ['train', 'train_valid']]
        valid_ds, test_ds = [torchvision.datasets.ImageFolder(
            os.path.join(self.target_dir, 'train_valid_test', folder),
            transform=transform_test) for folder in ['valid', 'test']]
        train_iter, train_valid_iter = [torch.utils.data.DataLoader(
            dataset, batch_size, shuffle=True, drop_last=True)
            for dataset in (train_ds, train_valid_ds)]
        valid_iter = torch.utils.data.DataLoader(valid_ds, batch_size, shuffle=False,
                                                 drop_last=False)
        test_iter = torch.utils.data.DataLoader(test_ds, batch_size, shuffle=False,
                                                drop_last=False)
        return train_iter, valid_iter, test_iter, train_valid_iter

    def get_net(self, devices, num_classes, model_name, model_leibie):
        """Instantiate a fine-tuned backbone via FineTuneModel.

        NOTE(review): the ``devices`` argument is ignored and
        ``d2l.try_all_gpus()`` is queried again, matching the original code.
        """
        fine_tune_model = FineTuneModel(d2l.try_all_gpus(),
                                        num_classes=num_classes,
                                        model_name=model_name)
        if model_leibie == 'get_efficientnet':
            base_model = fine_tune_model.get_efficientnet()
        elif model_leibie == 'get_mobilenet':
            base_model = fine_tune_model.get_mobilenet()
        elif model_leibie == 'get_resnet':
            base_model = fine_tune_model.get_resnet()
        return base_model

    def evaluate_loss(self, data_iter, net, devices):
        """Average cross-entropy loss of ``net`` over ``data_iter`` (on CPU)."""
        # reduction='none' keeps the per-sample losses so we can sum exactly.
        loss = nn.CrossEntropyLoss(reduction='none')
        l_sum, n = 0.0, 0
        for features, labels in data_iter:
            features, labels = features.to(devices[0]), labels.to(devices[0])
            outputs = net(features)
            l = loss(outputs, labels)
            l_sum += l.sum()
            n += labels.numel()  # accumulate the number of samples
        return (l_sum / n).to('cpu')

    def train(self, net, train_iter, valid_iter, num_epochs, lr, wd, devices,
              lr_period, lr_decay):
        """Train ``net`` with SGD + StepLR, checkpointing the best valid acc.

        :return: (measures_list, examples_sec_list) — per-epoch summary strings.
        """
        net = nn.DataParallel(net, device_ids=devices).to(devices[0])
        # Only parameters left trainable by the fine-tuning setup are updated.
        trainer = torch.optim.SGD(
            (param for param in net.parameters() if param.requires_grad),
            lr=lr, momentum=0.9, weight_decay=wd)
        scheduler = torch.optim.lr_scheduler.StepLR(trainer, lr_period, lr_decay)
        num_batches, timer = len(train_iter), d2l.Timer()
        legend = ['train loss', 'train acc', 'valid loss', 'valid acc']
        animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs], legend=legend)
        loss = nn.CrossEntropyLoss(reduction='none')
        # BUGFIX: with fewer than 5 batches `num_batches // 5` is 0 and the
        # original modulo raised ZeroDivisionError; plot at least every batch.
        log_every = max(1, num_batches // 5)
        best_acc = 0
        best_model_path = ""
        measures_list = []
        examples_sec_list = []
        for epoch in range(num_epochs):
            metric = d2l.Accumulator(3)  # sum loss, sample count, correct count
            net.train()
            for i, (features, labels) in enumerate(train_iter):
                timer.start()
                features, labels = features.to(devices[0]), labels.to(devices[0])
                trainer.zero_grad()
                output = net(features)
                l = loss(output, labels).sum()
                l.backward()
                trainer.step()
                metric.add(l, labels.shape[0], d2l.accuracy(output, labels))
                timer.stop()
                if (i + 1) % log_every == 0 or i == num_batches - 1:
                    animator.add(epoch + (i + 1) / num_batches,
                                 (metric[0] / metric[1], metric[2] / metric[1],
                                  None, None))
            measures = (f'train loss {metric[0] / metric[1]:.3f}, '
                        f'train acc {metric[2] / metric[1]:.3f}')
            if valid_iter is not None:
                net.eval()
                valid_metric = d2l.Accumulator(3)
                with torch.no_grad():
                    for valid_features, valid_labels in valid_iter:
                        valid_features = valid_features.to(devices[0])
                        valid_labels = valid_labels.to(devices[0])
                        valid_output = net(valid_features)
                        valid_l = loss(valid_output, valid_labels).sum()
                        valid_metric.add(valid_l, valid_labels.shape[0],
                                         d2l.accuracy(valid_output, valid_labels))
                valid_acc = valid_metric[2] / valid_metric[1]
                animator.add(epoch + 1,
                             (None, None, valid_metric[0] / valid_metric[1],
                              valid_acc))
                measures += (f', valid loss {valid_metric[0] / valid_metric[1]:.3f}'
                             f', valid acc {valid_acc:.3f}')
                # Checkpoint whenever validation accuracy improves.
                if valid_acc > best_acc:
                    best_acc = valid_acc
                    best_model_path = f'model_bests.pth'
                    torch.save(net, best_model_path)
                    print(f"Best model saved to {best_model_path} with accuracy {best_acc:.3f}")
            measures_list.append(measures)
            examples_sec = (f'epoch {epoch}, '
                            f'{metric[1] * num_epochs / timer.sum():.1f} '
                            f'examples/sec on {str(devices)}')
            examples_sec_list.append(examples_sec)
            print(f'epoch {epoch}, ' + measures +
                  f'\n{metric[1] * num_epochs / timer.sum():.1f} examples/sec on {str(devices)}')
            scheduler.step()
        for i, measure in enumerate(measures_list):
            print(f"Epoch {(i+1)}: {measure}")
        print(examples_sec_list)
        return measures_list, examples_sec_list

    def get_valid_acc(self, measures):
        """Extract the 'valid acc' value from a summary string as a float."""
        return float(re.search(r'valid acc (\d+\.\d+)', measures).group(1))

    def train_parameter_tuning(self, param_grid, num_classes, batch_size,
                               model_name, model_leibie):
        """Grid-search over ``param_grid`` and return the best network.

        :return: network loaded from the best checkpoint across all trials.
        """
        best_accuracy = 0
        best_params = None
        best_net = None  # BUGFIX: was unbound when no trial beat accuracy 0
        acc_param_list = []  # (valid_acc, params) per trial
        measures_lt = []
        for params in product(*param_grid.values()):
            param_dict = dict(zip(param_grid.keys(), params))
            print("Training with params:", param_dict)
            net = self.get_net(d2l.try_all_gpus(), num_classes, model_name,
                               model_leibie)
            # BUGFIX: build the dataset/dataloader pipeline once per trial —
            # the original called shuju_zq_jz twice, duplicating all the work.
            iters = self.shuju_zq_jz(batch_size)
            train_iter, valid_iter = iters[0], iters[1]
            measures_list, examples_sec_list = self.train(
                net, train_iter, valid_iter, **param_dict,
                devices=d2l.try_all_gpus())
            # Pick the epoch with the highest validation accuracy.
            best_measures = max(measures_list, key=self.get_valid_acc)
            valid_acc = float(re.search(r'valid acc (\d+\.\d+)', best_measures).group(1))
            print(best_measures)
            acc_param_list.append((valid_acc, param_dict))
            measures_lt.append(best_measures)
            # Reload the checkpoint saved at the best epoch of this trial.
            net = torch.load('model_bests.pth')
            if best_net is None or valid_acc > best_accuracy:
                best_accuracy = valid_acc
                best_params = param_dict
                best_net = net
        for i, measure in enumerate(measures_lt):
            print(f"Trial {i+1}:")
            print(measure)
            print("========================================================")
            print()
        best_acc_index = max(range(len(acc_param_list)),
                             key=lambda i: acc_param_list[i][0])
        best_accuracy = acc_param_list[best_acc_index][0]
        best_params = acc_param_list[best_acc_index][1]
        print("================================================")
        print("Best accuracy:", best_accuracy)
        print("Best params:", best_params)
        print()
        for i, (acc, params) in enumerate(acc_param_list):
            print(f"Trial {i+1}: valid acc {acc}, params {params}")
        return best_net

    def save_model(self, model_path, model_path_zheng, best_net):
        """Persist the trained network.

        :param model_path: path for the state dict (parameters only).
        :param model_path_zheng: path for the whole pickled model object.
        """
        torch.save(best_net.state_dict(), model_path)
        torch.save(best_net, model_path_zheng)
        print(f"Model saved to {model_path}")
2.3. 可視化結果
包括查看模型在驗證集上的每一類的準確率、分類報告、混淆矩陣、AUC-ROC曲線
class ViewResult:
    """Inspect a trained model on the validation set:

    1. Per-class accuracy
    2. Precision / recall / f1-score (classification report)
    3. Confusion matrix
    4. AUC-ROC curves (one-vs-rest per class)
    """

    def __init__(self, best_net, valid_iter, devices, classes):
        self.net = best_net
        self.valid_iter = valid_iter
        self.devices = devices          # model inputs are moved to devices[0]
        self.classes = classes          # dict {index: class name}
        self.num_classes = len(classes)

    def view_result(self):
        """Print per-class accuracy and the classification report, then plot
        the confusion matrix."""
        print(self.num_classes)
        class_correct = [0.] * self.num_classes
        class_total = [0.] * self.num_classes
        y_test, y_pred = [], []
        X_test = []
        with torch.no_grad():
            for images, labels in self.valid_iter:
                X_test.extend([_ for _ in images])
                outputs = self.net(images.to(self.devices[0]))
                _, predicted = torch.max(outputs, 1)
                predicted = predicted.cpu()
                # NOTE(review): squeeze() makes `c` 0-dim for a batch of size
                # 1, which would break `c[i]` — assumes batches of >= 2.
                c = (predicted == labels).squeeze()
                for i, label in enumerate(labels):
                    class_correct[label] += c[i].item()
                    class_total[label] += 1
                y_pred.extend(predicted.numpy())
                y_test.extend(labels.cpu().numpy())
        for i in range(self.num_classes):
            if class_total[i] != 0:
                accuracy = 100 * class_correct[i] / class_total[i]
            else:
                accuracy = 0
            print(f"Accuracy of {self.classes[i]:5s}: {accuracy:2.0f}%")
        # Classification report (precision / recall / f1 per class).
        try:
            cr = classification_report(y_test, y_pred,
                                       target_names=list(self.classes.values()))
            print(cr)
        except Exception as e:
            print("An error occurred while generating the classification report:", str(e))
        # Confusion matrix; zero cells are left blank for readability.
        cm = confusion_matrix(y_test, y_pred)
        labels = pd.DataFrame(cm).applymap(lambda v: f"{v}" if v != 0 else f"")
        plt.figure(figsize=(25, 20))
        # BUGFIX: tick labels must be the class NAMES — the original passed
        # self.classes.items(), which rendered '(index, name)' tuples on the
        # axes (the report above already uses list(self.classes.values())).
        sns.heatmap(cm, annot=labels, fmt='s',
                    xticklabels=list(self.classes.values()),
                    yticklabels=list(self.classes.values()),
                    linewidths=0.1)
        plt.show()

    def evaluate_roc(self, num_classes):
        """Plot one-vs-rest ROC curves with AUC for each class."""
        self.net.eval()
        y_true = []
        y_score = []
        for X, y in self.valid_iter:
            X, y = X.to(self.devices[0]), y.to(self.devices[0])
            y_true.append(y.cpu().numpy())
            y_score.append(self.net(X).detach().cpu().numpy())
        y_true = np.concatenate(y_true)
        y_score = np.concatenate(y_score)
        fpr = dict()
        tpr = dict()
        roc_auc = dict()
        for i in range(num_classes):
            # Binarise labels for the one-vs-rest curve of class i.
            y_true_i = np.where(y_true == i, 1, 0)
            y_score_i = y_score[:, i]
            fpr[i], tpr[i], _ = roc_curve(y_true_i, y_score_i)
            roc_auc[i] = auc(fpr[i], tpr[i])
        plt.figure(figsize=(15, 15))
        colors = list(mcolors.CSS4_COLORS.values())
        colors = colors[:num_classes]
        for i in range(num_classes):
            plt.plot(fpr[i], tpr[i], color=colors[i], lw=2,
                     label=f'Class {i}, AUC = {roc_auc[i]:.2f}')
        # Diagonal reference line (random classifier).
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic for Multi-class Classification')
        plt.legend(loc="lower right")
        plt.show()
2.4. 類別函數
調用之前所定義的類,實現不同分類類別的模型的訓練
def leibie_class(leibie, num_classes, batch_size, valid_ratio, param_grid,
                 model_path, target_dir, model_name, model_leibie,
                 train_folder, test_folder, model_path_zheng,
                 data_dir="/kaggle/input/sanyeqing/"):
    """Train one classification task end-to-end.

    :param leibie: label CSV for the desired granularity, e.g. 'labels.csv'
        for 10 classes, 'labels_5.csv' for 5, 'labels_2.csv' for 2.
    :param num_classes: class count matching ``leibie`` (10, 5, 2, ...).
    :param batch_size: samples per mini-batch.
    :param valid_ratio: validation fraction, e.g. 0.3 means valid:train = 3:7.
    :param param_grid: hyper-parameter ranges for the grid search.
    :param model_path: where to save the best state dict, e.g.
        '/kaggle/working/model_xcy_shi_1.pth' (only the file name portion
        after '/kaggle/working/' should be changed).
    :param target_dir: destination for the reorganised dataset, e.g.
        '/kaggle/working/my_directory_shi' for the 10-class layout.
    :param model_name: backbone name, e.g. 'resnet34'.
    :param model_leibie: FineTuneModel factory to use, e.g. 'get_resnet'.
    :param train_folder: folder holding the raw training images.
    :param test_folder: folder holding the raw test images.
    :param model_path_zheng: where to save the whole pickled model.
    :param data_dir: raw dataset root (generalised from the previously
        hard-coded Kaggle path; the default preserves old behaviour).
    :return: the best network found by the grid search.
    """
    image_classifier = MyImageClassifier(data_dir, target_dir, batch_size,
                                         valid_ratio, train_folder, test_folder)
    # Reorganise the raw data on disk according to the chosen label file.
    image_classifier.reorg_san_data(leibie)
    # Validation loader (kept for the visualisation code below).
    valid_iter = image_classifier.shuju_zq_jz(batch_size)[1]
    # {index: class name} mapping.
    class_to_idx = image_classifier.classes()
    print(class_to_idx)
    best_net = image_classifier.train_parameter_tuning(
        param_grid, num_classes, batch_size, model_name, model_leibie)
    # print("===================================================")
    # print("最終的保存模型:")
    # image_classifier.save_model(model_path, model_path_zheng, best_net)
    # print(); print()
    # Visualise the training results:
    # result_viewer = ViewResult(best_net, valid_iter, devices=d2l.try_all_gpus(), classes=class_to_idx)
    # result_viewer.view_result()
    # result_viewer.evaluate_roc(num_classes)
    return best_net
if __name__ == '__main__':
    # Hyper-parameter ranges explored by the grid search (a single
    # combination here: 201 epochs, lr 1e-4, weight decay 1e-4, StepLR
    # with period 2 and decay factor 1).
    param_grid = {
        'num_epochs': [201],
        'lr': [1e-4],
        'wd': [1e-4],
        'lr_period': [2],
        'lr_decay': [1],
    }
    print("這是省份分類:")
    # Province classification: 3 classes, MobileNetV3-Large backbone.
    leibie_class(
        "labels_hun_finally_2.csv",
        num_classes=3,
        batch_size=128,
        valid_ratio=0.2,
        param_grid=param_grid,
        # model_path='/kaggle/working/model_wht_wu.pth',
        model_path=None,
        target_dir='/kaggle/working/my_directory_er',
        model_name='mobilenet_v3_large',
        model_leibie='get_mobilenet',
        train_folder='train_hun_finally',
        test_folder='test_hun_finally',
        model_path_zheng=None,
    )
總結
主要是運用遷移學習的方法,將預訓練模型在自定義的數據集上進行訓練。
2024/6/12