知識點回顧:
- 過擬合的判斷:測試集和訓練集同步打印指標
- 模型的保存和加載
- 僅保存權重
- 保存權重和模型
- 保存全部信息checkpoint,還包含訓練狀態
- 早停策略
作業:對信貸數據集訓練后保存權重,加載權重后繼續訓練50輪,并采取早停策略
import pandas as pd # 用于數據處理和分析,可處理表格數據。
import numpy as np # 用于數值計算,提供了高效的數組操作。
import matplotlib.pyplot as plt # 用于繪制各種類型的圖表
import seaborn as sns # 基于matplotlib的高級繪圖庫,能繪制更美觀的統計圖形。
import warningswarnings.filterwarnings("ignore")# 設置中文字體(解決中文顯示問題)
plt.rcParams['font.sans-serif'] = ['SimHei'] # Windows系統常用黑體字體
plt.rcParams['axes.unicode_minus'] = False # 正常顯示負號
data = pd.read_excel('data.xlsx') # 讀取數據# 先篩選字符串變量
discrete_features = data.select_dtypes(include=['object']).columns.tolist()
# Home Ownership 標簽編碼
home_ownership_mapping = {'Own Home': 1,'Rent': 2,'Have Mortgage': 3,'Home Mortgage': 4
}
data['Home Ownership'] = data['Home Ownership'].map(home_ownership_mapping)# Years in current job 標簽編碼
years_in_job_mapping = {'< 1 year': 1,'1 year': 2,'2 years': 3,'3 years': 4,'4 years': 5,'5 years': 6,'6 years': 7,'7 years': 8,'8 years': 9,'9 years': 10,'10+ years': 11
}
data['Years in current job'] = data['Years in current job'].map(years_in_job_mapping)# Purpose 獨熱編碼,記得需要將bool類型轉換為數值
data = pd.get_dummies(data, columns=['Purpose'])
data2 = pd.read_excel("data.xlsx") # 重新讀取數據,用來做列名對比
list_final = [] # 新建一個空列表,用于存放獨熱編碼后新增的特征名
for i in data.columns:if i not in data2.columns:list_final.append(i) # 這里打印出來的就是獨熱編碼后的特征名
for i in list_final:data[i] = data[i].astype(int) # 這里的i就是獨熱編碼后的特征名# Term 0 - 1 映射
term_mapping = {'Short Term': 0,'Long Term': 1
}
data['Term'] = data['Term'].map(term_mapping)
data.rename(columns={'Term': 'Long Term'}, inplace=True) # 重命名列
continuous_features = data.select_dtypes(include=['int64', 'float64']).columns.tolist() # 把篩選出來的列名轉換成列表# 連續特征用中位數補全
for feature in continuous_features:mode_value = data[feature].mode()[0] # 獲取該列的眾數。data[feature].fillna(mode_value, inplace=True) # 用眾數填充該列的缺失值,inplace=True表示直接在原數據上修改。
?第一次訓練:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
import matplotlib.pyplot as plt
from tqdm import tqdm # 導入tqdm庫用于進度條顯示# 設置GPU設備
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"使用設備: {device}")from sklearn.model_selection import train_test_splitX = data.drop(['Credit Default'], axis=1) # 特征,axis=1表示按列刪除
y = data['Credit Default'] # 標簽# 劃分訓練集和測試集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)# 歸一化數據
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)# 將數據轉換為PyTorch張量并移至GPU
X_train = torch.FloatTensor(X_train).to(device)
y_train = torch.LongTensor(y_train.values).to(device) # 添加 .values
X_test = torch.FloatTensor(X_test).to(device)
y_test = torch.LongTensor(y_test.values).to(device) # 添加 .values
class MLP(nn.Module):def __init__(self,input_size):super(MLP, self).__init__()self.fc1 = nn.Linear(input_size, 10) # 輸入層到隱藏層self.relu = nn.ReLU()self.fc2 = nn.Linear(10, 2) # 隱藏層到輸出層def forward(self, x):out = self.fc1(x)out = self.relu(out)out = self.fc2(out)return out# 實例化模型并移至GPU
model = MLP(input_size= X_train.shape[1]).to(device)# 分類問題使用交叉熵損失函數
criterion = nn.CrossEntropyLoss()# 使用隨機梯度下降優化器
optimizer = optim.SGD(model.parameters(), lr=0.01)# 訓練模型
num_epochs = 30000 # 訓練的輪數# 用于存儲每100個epoch的損失值和對應的epoch數
losses = []
epochs = []start_time = time.time() # 記錄開始時間# 創建tqdm進度條
with tqdm(total=num_epochs, desc="訓練進度", unit="epoch") as pbar:# 訓練模型for epoch in range(num_epochs):# 前向傳播outputs = model(X_train) # 隱式調用forward函數loss = criterion(outputs, y_train)# 反向傳播和優化optimizer.zero_grad()loss.backward()optimizer.step()# 記錄損失值并更新進度條if (epoch + 1) % 200 == 0:losses.append(loss.item())epochs.append(epoch + 1)# 更新進度條的描述信息pbar.set_postfix({'Loss': f'{loss.item():.4f}'})# 每1000個epoch更新一次進度條if (epoch + 1) % 1000 == 0:pbar.update(1000) # 更新進度條# 確保進度條達到100%if pbar.n < num_epochs:pbar.update(num_epochs - pbar.n) # 計算剩余的進度并更新time_all = time.time() - start_time # 計算訓練時間
print(f'1st Training time: {time_all:.2f} seconds')torch.save({'model_state_dict': model.state_dict(),'optimizer_state_dict': optimizer.state_dict(),
}, "checkpoint.pth")
使用設備: cpu
訓練進度: 100%|██████████| 30000/30000 [00:22<00:00, 1322.08epoch/s, Loss=0.4614]
1st Training time: 22.70 seconds
第二次訓練(增加早停):
# 重新實例化模型并加載參數
model = MLP(input_size=X_train.shape[1]).to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01) # 必須在加載優化器前定義好# 加載模型和優化器狀態
checkpoint = torch.load("checkpoint.pth")
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])num_plus_epochs = 50 # 訓練的輪數
# 用于存儲每200個epoch的損失值和對應的epoch數
train_losses = [] # 存儲訓練集損失
test_losses = [] # 存儲測試集損失
epochs = []# ===== 新增早停相關參數 =====
best_test_loss = float('inf') # 記錄最佳測試集損失
best_epoch = 0 # 記錄最佳epoch
patience = 1 # 早停耐心值(連續多少輪測試集損失未改善時停止訓練)
counter = 0 # 早停計數器
early_stopped = False # 是否早停標志
# ==========================start_time = time.time() # 記錄開始時間# 創建tqdm進度條
with tqdm(total=num_plus_epochs, desc="訓練進度plus", unit="epoch") as pbar:# 訓練模型for epoch in range(num_plus_epochs):# 前向傳播outputs = model(X_train) # 隱式調用forward函數train_loss = criterion(outputs, y_train)# 反向傳播和優化optimizer.zero_grad()train_loss.backward()optimizer.step()train_losses.append(train_loss.item())# 記錄損失值并更新進度條model.eval()with torch.no_grad():test_outputs = model(X_test)test_loss = criterion(test_outputs, y_test)model.train()test_losses.append(test_loss.item())epochs.append(epoch + 1)# 更新進度條的描述信息pbar.set_postfix({'Train Loss': f'{train_loss.item():.4f}', 'Test Loss': f'{test_loss.item():.4f}'})# ===== 新增早停邏輯 =====if test_loss.item() < best_test_loss: # 如果當前測試集損失小于最佳損失best_test_loss = test_loss.item() # 更新最佳損失best_epoch = epoch + 1 # 更新最佳epochcounter = 0 # 重置計數器# 保存最佳模型torch.save(model.state_dict(), 'best_model.pth')else:counter += 1if counter >= patience:print(f"早停觸發!在第{epoch + 1}輪,測試集損失已有{patience}輪未改善。")print(f"最佳測試集損失出現在第{best_epoch}輪,損失值為{best_test_loss:.4f}")early_stopped = Truebreak # 終止訓練循環# ======================pbar.update(1) # 更新進度條# 確保進度條達到100%if pbar.n < num_plus_epochs:pbar.update(num_plus_epochs - pbar.n) # 計算剩余的進度并更新time_all = time.time() - start_time # 計算訓練時間
print(f'Training time: {time_all:.2f} seconds')# ===== 新增:加載最佳模型用于最終評估 =====
if early_stopped:print(f"加載第{best_epoch}輪的最佳模型進行最終評估...")model.load_state_dict(torch.load('best_model.pth'))
# ================================# 可視化損失曲線
plt.figure(figsize=(10, 6))
plt.plot(epochs, train_losses, label='Train Loss')
plt.plot(epochs, test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Test Loss over Epochs')
plt.legend()
plt.grid(True)
plt.show()
訓練進度plus: 100%|██████████| 50/50 [00:00<00:00, 648.00epoch/s, Train Loss=0.4614, Test Loss=0.4713]
早停觸發!在第8輪,測試集損失已有1輪未改善。 最佳測試集損失出現在第7輪,損失值為0.4713 Training time: 0.09 seconds 加載第7輪的最佳模型進行最終評估...
?
測試:?
model.eval() # 設置模型為評估模式
with torch.no_grad(): # torch.no_grad()的作用是禁用梯度計算,可以提高模型推理速度outputs = model(X_test) # 對測試數據進行前向傳播,獲得預測結果_, predicted = torch.max(outputs, 1) # torch.max(outputs, 1)返回每行的最大值和對應的索引#這個函數返回2個值,分別是最大值和對應索引,參數1是在第1維度(行)上找最大值,_ 是Python的約定,表示忽略這個返回值,所以這個寫法是找到每一行最大值的下標# 此時outputs是一個tensor,p每一行是一個樣本,每一行有3個值,分別是屬于3個類別的概率,取最大值的下標就是預測的類別# predicted == y_test判斷預測值和真實值是否相等,返回一個tensor,1表示相等,0表示不等,然后求和,再除以y_test.size(0)得到準確率# 因為這個時候數據是tensor,所以需要用item()方法將tensor轉化為Python的標量# 之所以不用sklearn的accuracy_score函數,是因為這個函數是在CPU上運行的,需要將數據轉移到CPU上,這樣會慢一些# size(0)獲取第0維的長度,即樣本數量correct = (predicted == y_test).sum().item() # 計算預測正確的樣本數accuracy = correct / y_test.size(0)print(f'測試集準確率: {accuracy * 100:.2f}%')
?測試集準確率: 76.67%
@浙大疏錦行