知識點回顧:
- 過擬合的判斷:測試集和訓練集同步打印指標
- 模型的保存和加載
- 僅保存權重
- 保存權重和模型
- 保存全部信息checkpoint,還包含訓練狀態
- 早停策略
作業:對信貸數據集訓練后保存權重,加載權重后繼續訓練50輪,并采取早停策略
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler, OneHotEncoder, LabelEncoder
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
from imblearn.over_sampling import SMOTE# ------------------- 設備配置(GPU/CPU) -------------------
# 檢查是否有可用的GPU:如果有則用GPU加速訓練(速度更快),否則用CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"使用設備: {device}") # 打印當前使用的設備(確認是否啟用GPU)# ------------------- 加載并清洗數據 -------------------
# 加載信貸預測數據集(假設data.csv在當前目錄下)
# 數據包含用戶信息(如收入、工作年限)和標簽(是否違約:Credit Default)
data = pd.read_csv(r'data.csv')# 丟棄無用的Id列(Id是用戶唯一標識,與信貸違約無關)
data = data.drop(['Id'], axis=1) # axis=1表示按列刪除# 區分連續特征(數值型)和離散特征(文本型/類別型)
# 連續特征:比如年齡、收入(可以取任意數值)
# 離散特征:比如職業、教育程度(只能取有限的類別)
continuous_features = data.select_dtypes(include=['float64', 'int64']).columns.tolist() # 數值列
discrete_features = data.select_dtypes(exclude=['float64', 'int64']).columns.tolist() # 非數值列# 離散特征用眾數(出現次數最多的值)填充缺失值
# 例:如果"職業"列有缺失,用出現最多的職業填充
for feature in discrete_features:if data[feature].isnull().sum() > 0: # 檢查是否有缺失值mode_value = data[feature].mode()[0] # 計算眾數data[feature].fillna(mode_value, inplace=True) # 填充缺失值# 連續特征用中位數(中間位置的數)填充缺失值
# 例:如果"收入"列有缺失,用所有收入的中間值填充(比平均數更抗異常值)
for feature in continuous_features:if data[feature].isnull().sum() > 0:median_value = data[feature].median() # 計算中位數data[feature].fillna(median_value, inplace=True)# ------------------- 離散特征編碼(轉成數值) -------------------
# 有順序的離散特征(比如"工作年限"有"1年"<"2年"<"10+年")用標簽編碼(轉成數字)
mappings = {"Years in current job": {"10+ years": 10, # "10+年"對應數字10(最大)"2 years": 2, # "2年"對應數字2"3 years": 3,"< 1 year": 0, # "<1年"對應數字0(最小)"5 years": 5,"1 year": 1,"4 years": 4,"6 years": 6,"7 years": 7,"8 years": 8,"9 years": 9},"Home Ownership": { # 房屋所有權(有順序:租房 < 房貸 < 有房貸 < 自有房?)"Home Mortgage": 0, # 房貸"Rent": 1, # 租房"Own Home": 2, # 自有房"Have Mortgage": 3 # 有房貸(可能順序需要根據業務調整)},"Term": { # 貸款期限(短期 < 長期)"Short Term": 0, # 短期"Long Term": 1 # 長期}
}# 使用映射字典將文本轉成數字(標簽編碼)
data["Years in current job"] = data["Years in current job"].map(mappings["Years in current job"])
data["Home Ownership"] = data["Home Ownership"].map(mappings["Home Ownership"])
data["Term"] = data["Term"].map(mappings["Term"])# 無順序的離散特征(比如"貸款用途":購車/教育/裝修,彼此無大小關系)用獨熱編碼
# 獨熱編碼:將1列轉成N列(N是類別數),每列用0/1表示是否屬于該類別
data = pd.get_dummies(data, columns=['Purpose']) # 對"Purpose"列做獨熱編碼# 獨熱編碼后會生成新列(比如Purpose_購車、Purpose_教育),需要將這些列的類型從bool轉成int(0/1)
list_final = [] # 存儲新生成的列名
data2 = pd.read_csv(r'data.csv') # 重新讀取原始數據(對比列名)
for i in data.columns:if i not in data2.columns: # 原始數據沒有的列,就是新生成的獨熱列list_final.append(i)
for i in list_final:data[i] = data[i].astype(int) # 將bool型(True/False)轉成int(1/0)# ------------------- 分離特征和標簽 -------------------
X = data.drop(['Credit Default'], axis=1) # 特征數據(所有列,除了標簽列)
y = data['Credit Default'] # 標簽數據(0=未違約,1=違約)# 劃分訓練集(80%)和測試集(20%):訓練集用來學習規律,測試集驗證模型效果
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) # random_state固定隨機種子,保證結果可復現# 特征標準化(將特征縮放到0-1區間,避免大數值特征"欺負"小數值特征)
scaler = MinMaxScaler() # 創建MinMaxScaler(最小-最大標準化)
X_train = scaler.fit_transform(X_train) # 用訓練集擬合標準化參數并轉換
X_test = scaler.transform(X_test) # 用訓練集的參數轉換測試集(保證數據分布一致)# 將數據轉成PyTorch張量(神經網絡只能處理張量數據),并移動到GPU(如果有)
# FloatTensor:32位浮點數(特征數據)
# LongTensor:64位整數(標簽數據,分類任務需要)
X_train = torch.FloatTensor(X_train).to(device)
y_train = torch.LongTensor(y_train.values).to(device)
X_test = torch.FloatTensor(X_test).to(device)
y_test = torch.LongTensor(y_test.values).to(device)# ------------------- 定義神經網絡模型 -------------------
class MLP(nn.Module):def __init__(self):super(MLP, self).__init__() # 調用父類構造函數(必須)# 全連接層1:輸入30個特征(根據數據預處理后的列數確定),輸出64個神經元self.fc1 = nn.Linear(30, 64)self.relu = nn.ReLU() # 激活函數(引入非線性,讓模型能學習復雜規律)self.dropout = nn.Dropout(0.2) # Dropout層(隨機丟棄30%的神經元,防止過擬合)# 全連接層2:輸入64個神經元,輸出32個神經元self.fc2 = nn.Linear(64, 32)# 全連接層3:輸入32個神經元,輸出2個類別(0=未違約,1=違約)self.fc3 = nn.Linear(32, 2)def forward(self, x):# 前向傳播:數據從輸入層→隱藏層→輸出層的計算流程x = self.fc1(x) # 輸入層→隱藏層1:30→64x = self.relu(x) # 激活函數(過濾負數值)x = self.dropout(x) # 應用Dropout(防止過擬合)x = self.fc2(x) # 隱藏層1→隱藏層2:64→32x = self.relu(x) # 激活函數x = self.fc3(x) # 隱藏層2→輸出層:32→2(輸出未歸一化的分數)return x# ------------------- 初始化模型、損失函數、優化器 -------------------
model = MLP().to(device) # 實例化模型并移動到GPU
criterion = nn.CrossEntropyLoss() # 交叉熵損失函數(適合分類任務)
optimizer = optim.Adam(model.parameters(), lr=0.001) # Adam優化器(比SGD更智能,自動調整學習率)# ------------------- 模型訓練 -------------------
num_epochs = 20000 # 訓練輪數(完整遍歷訓練集的次數)
train_losses = [] # 記錄每200輪的訓練損失
test_losses = [] # 記錄每200輪的測試損失
accuracies = [] # 記錄每200輪的測試準確率
epochs = [] # 記錄對應的輪數
# ==========新增早停相關參數==========
best_test_loss = float('inf') # 記錄最佳測試集損失
best_epoch = 0 # 記錄最佳epoch
patience = 50 # 早停耐心值(連續多少輪測試集損失未改善時停止訓練)
counter = 0 # 早停計數器
early_stopped = False # 是否早停標志
# =====================================
start_time = time.time() # 記錄訓練開始時間# 創建tqdm進度條(可視化訓練進度)
with tqdm(total=num_epochs, desc="訓練進度", unit="epoch") as pbar:for epoch in range(num_epochs):# 前向傳播:模型根據輸入數據計算預測值outputs = model(X_train) # 模型輸出(形狀:[訓練樣本數, 2],表示每個樣本屬于2個類別的分數)train_loss = criterion(outputs, y_train) # 計算損失(預測值與真實標簽的差異,越小越好)# 反向傳播和參數更新optimizer.zero_grad() # 清空歷史梯度(避免梯度累加)train_loss.backward() # 反向傳播計算梯度(自動求導)optimizer.step() # 根據梯度更新模型參數(優化器核心操作)# 每200輪記錄一次損失和準確率(避免記錄太頻繁影響速度)if (epoch + 1) % 200 == 0:# 在測試集上評估模型(不更新參數,只看效果)model.eval() # 切換到評估模式(關閉Dropout,保證結果穩定)with torch.no_grad(): # 禁用梯度計算(節省內存,加速推理)test_outputs = model(X_test) # 測試集預測值test_loss = criterion(test_outputs, y_test) # 計算測試集損失model.train() # 切換回訓練模式# 記錄損失值和準確率train_losses.append(train_loss.item()) # 訓練集損失test_losses.append(test_loss.item()) # 測試集損失epochs.append(epoch + 1) # 記錄輪數# 更新進度條顯示的信息(當前損失和準確率)pbar.set_postfix({'Train Loss': f'{train_loss.item():.4f}', 'Test Loss': f'{test_loss.item():.4f}'})# ===== 新增早停邏輯 =====if test_loss.item() < best_test_loss: # 如果當前測試集損失小于最佳損失best_test_loss = test_loss.item() # 更新最佳損失best_epoch = epoch + 1 # 更新最佳epochcounter = 0 # 重置計數器# 保存最佳模型torch.save(model.state_dict(), 'best_model.pth')else:counter += 1if counter >= patience:print(f"早停觸發!在第{epoch+1}輪,測試集損失已有{patience}輪未改善。")print(f"最佳測試集損失出現在第{best_epoch}輪,損失值為{best_test_loss:.4f}")early_stopped = Truebreak # 終止訓練循環# ======================# 每1000輪更新一次進度條(避免進度條刷新太頻繁)if (epoch + 1) % 1000 == 0:pbar.update(1000) # 進度條前進1000步# 確保進度條最終顯示100%(防止最后一輪未更新)if pbar.n < num_epochs:pbar.update(num_epochs - pbar.n)# 計算總訓練時間并打印
time_all = time.time() - start_time
print(f'Training time: {time_all:.2f} seconds')# ===== 新增:加載最佳模型用于最終評估 =====
if early_stopped:print(f"加載第{best_epoch}輪的最佳模型進行最終評估...")model.load_state_dict(torch.load('best_model.pth'))
# ================================# ------------------- 可視化訓練結果 -------------------
# 創建雙y軸圖表(損失和準確率在同一張圖顯示)
# 可視化損失曲線
plt.figure(figsize=(10, 6))
plt.plot(epochs, train_losses, label='Train Loss')
plt.plot(epochs, test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Test Loss over Epochs')
plt.legend()
plt.grid(True)
plt.show()# ------------------- 最終測試集評估 -------------------
model.eval() # 切換到評估模式
with torch.no_grad(): # 禁用梯度計算outputs = model(X_test) # 測試集預測值_, predicted = torch.max(outputs, 1) # 取預測類別(0或1)correct = (predicted == y_test).sum().item() # 正確預測的樣本數accuracy = correct / y_test.size(0) # 計算準確率print(f'測試集準確率: {accuracy * 100:.2f}%') # 打印準確率(百分比形式)
#測試集準確率: 77.20%