import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
import numpy as np

# Use a CJK-capable font so Chinese labels in the plots render correctly,
# and keep minus signs displayable under that font.
plt.rcParams["font.family"] = ["SimHei"]
plt.rcParams['axes.unicode_minus'] = False

# Prefer the GPU when available to speed up training.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用設備: {device}")

# 1. Data preprocessing (targets 128x128 inputs, with augmentation on the
#    training split only; see the transform definitions below).
# Training augmentation: random crop-and-resize to 128x128 plus flips, color
# jitter and small rotations. (In the original one-line file the inline
# comments swallowed every transform after RandomResizedCrop.)
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(128, scale=(0.6, 1.0)),  # crop 60-100% of the image, resize to 128x128
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    # CIFAR-10 statistics; recompute mean/std on this dataset for best results.
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# Test-time preprocessing: deterministic resize to 128x128, no augmentation.
# (In the original one-line file the inline comment swallowed ToTensor and
# Normalize; this restores the intended pipeline.)
test_transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# 2. Load the full dataset and split 80/20 into train/test subsets.
# NOTE(review): random_split keeps the parent dataset's transform, so the
# test split is also augmented with train_transform here. For a faithful
# evaluation, use two ImageFolder instances (train_transform/test_transform)
# split with the same seeded indices — confirm and fix.
full_dataset = datasets.ImageFolder(
    root=r"D:\python_learning\cyl_python\day43CNN_kaggle\BengaliFishImages\fish_images",
    transform=train_transform,
)
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

# 3. Batch loaders; batch size kept moderate because 128x128 inputs use
#    noticeably more memory than small thumbnails.
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# 4. CNN sized for 128x128 inputs (fc input dims derived from 4 poolings).
class CNN(nn.Module):
    """CNN for 20-class classification of 3x128x128 images.

    Four conv blocks (conv -> BatchNorm -> ReLU -> 2x2 max-pool) halve the
    spatial size each time: 128 -> 64 -> 32 -> 16 -> 8, producing a 256x8x8
    feature map that feeds a 512-unit hidden layer (with dropout) and a
    20-way output. With 3 blocks instead, the map would be 128x16x16 and
    fc1's input would need to be 128*16*16.
    """

    def __init__(self):
        super().__init__()
        # Block 1: 3 -> 32 channels, 128 -> 64 spatial
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(2, 2)
        # Block 2: 32 -> 64 channels, 64 -> 32 spatial
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(2)
        # Block 3: 64 -> 128 channels, 32 -> 16 spatial
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(2)
        # Block 4: 128 -> 256 channels, 16 -> 8 spatial
        self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.relu4 = nn.ReLU()
        self.pool4 = nn.MaxPool2d(2)
        # Classifier head: 256*8*8 flattened features -> 512 -> 20 classes.
        self.fc1 = nn.Linear(256 * 8 * 8, 512)
        self.relu_fc = nn.ReLU()  # dedicated ReLU (original reused relu3 here)
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(512, 20)  # 20 classes to match the dataset

    def forward(self, x):
        # Conv blocks 1-4
        x = self.pool1(self.relu1(self.bn1(self.conv1(x))))
        x = self.pool2(self.relu2(self.bn2(self.conv2(x))))
        x = self.pool3(self.relu3(self.bn3(self.conv3(x))))
        x = self.pool4(self.relu4(self.bn4(self.conv4(x))))
        # Flatten per-sample and classify.
        x = x.view(x.size(0), -1)  # (N, 256*8*8), matches fc1's input dim
        x = self.fc1(x)
        x = self.relu_fc(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x
# Instantiate the model on the selected device.
model = CNN().to(device)

# Loss, optimizer, and LR scheduler: halve the learning rate whenever the
# monitored (test) loss fails to improve for 3 consecutive epochs.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', patience=3, factor=0.5
)
# 5. Training/evaluation loop.
def train(model, train_loader, test_loader, criterion, optimizer, scheduler, device, epochs):
    """Train `model` for `epochs` epochs, evaluating on the test set each epoch.

    Records per-batch training losses and per-epoch accuracy/loss for both
    splits, steps the plateau scheduler on the test loss, plots the curves,
    and returns the final epoch's test accuracy (percentage).
    """
    all_iter_losses = []     # loss of every training batch
    iter_indices = []        # global iteration index for plotting
    train_acc_history = []
    test_acc_history = []
    train_loss_history = []
    test_loss_history = []

    for epoch in range(epochs):
        # Re-enable training mode each epoch: the eval pass below switches the
        # model to eval(), and the original code never switched it back, so
        # epochs after the first trained with dropout off and BN frozen.
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            # Record per-iteration loss for the fine-grained loss curve.
            iter_loss = loss.item()
            all_iter_losses.append(iter_loss)
            iter_indices.append(epoch * len(train_loader) + batch_idx + 1)

            # Running training-accuracy statistics.
            running_loss += iter_loss
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()

            # Progress report every 100 batches.
            if (batch_idx + 1) % 100 == 0:
                print(f'Epoch: {epoch+1}/{epochs} | Batch: {batch_idx+1}/{len(train_loader)} '
                      f'| 單Batch損失: {iter_loss:.4f} | 累計平均損失: {running_loss/(batch_idx+1):.4f}')

        # Epoch-level training metrics.
        epoch_train_loss = running_loss / len(train_loader)
        epoch_train_acc = 100. * correct / total
        train_acc_history.append(epoch_train_acc)
        train_loss_history.append(epoch_train_loss)

        # Evaluation pass (no gradients).
        model.eval()
        test_loss = 0
        correct_test = 0
        total_test = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                test_loss += criterion(output, target).item()
                _, predicted = output.max(1)
                total_test += target.size(0)
                correct_test += predicted.eq(target).sum().item()

        epoch_test_loss = test_loss / len(test_loader)
        epoch_test_acc = 100. * correct_test / total_test
        test_acc_history.append(epoch_test_acc)
        test_loss_history.append(epoch_test_loss)

        # Plateau scheduler keys off the test loss.
        scheduler.step(epoch_test_loss)
        print(f'Epoch {epoch+1}/{epochs} 完成 | 訓練準確率: {epoch_train_acc:.2f}% | 測試準確率: {epoch_test_acc:.2f}%')

    # Visualize the training run.
    plot_iter_losses(all_iter_losses, iter_indices)
    plot_epoch_metrics(train_acc_history, test_acc_history, train_loss_history, test_loss_history)
    return epoch_test_acc
# 6. Plotting helpers for the training curves.
def plot_iter_losses(losses, indices):
    """Plot the per-iteration (per-batch) training loss curve."""
    plt.figure(figsize=(10, 4))
    plt.plot(indices, losses, 'b-', alpha=0.7, label='Iteration Loss')
    plt.xlabel('Iteration(Batch序號)')
    plt.ylabel('損失值')
    plt.title('每個 Iteration 的訓練損失')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()


def plot_epoch_metrics(train_acc, test_acc, train_loss, test_loss):
    """Plot per-epoch accuracy and loss for the train and test splits side by side."""
    epochs = range(1, len(train_acc) + 1)
    plt.figure(figsize=(12, 4))

    # Accuracy curves.
    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_acc, 'b-', label='訓練準確率')
    plt.plot(epochs, test_acc, 'r-', label='測試準確率')
    plt.xlabel('Epoch')
    plt.ylabel('準確率 (%)')
    plt.title('訓練和測試準確率')
    plt.legend()
    plt.grid(True)

    # Loss curves.
    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_loss, 'b-', label='訓練損失')
    plt.plot(epochs, test_loss, 'r-', label='測試損失')
    plt.xlabel('Epoch')
    plt.ylabel('損失值')
    plt.title('訓練和測試損失')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()
# 7. Run training (tune epochs/batch_size to fit available GPU memory).
epochs = 80  # 128x128 images train slowly; consider a short trial run first
print("開始使用 CNN 訓練自定義魚類數據集(128×128 尺寸)...")
final_accuracy = train(model, train_loader, test_loader, criterion, optimizer, scheduler, device, epochs)
print(f"訓練完成!最終測試準確率: {final_accuracy:.2f}%")

# Optionally persist the trained weights.
# torch.save(model.state_dict(), 'bengali_fish_cnn_128.pth')
# ---- Grad-CAM visualization section ----
import warnings
# NOTE(review): blanket warning suppression hides genuinely useful warnings;
# narrow to specific categories if possible.
warnings.filterwarnings("ignore")
import matplotlib.pyplot as plt
import numpy as np
import torch
from torchvision import transforms
from PIL import Image

# Re-apply the CJK font settings for this section's figures.
plt.rcParams["font.family"] = ["SimHei"]
plt.rcParams['axes.unicode_minus'] = False
class GradCAM:
    """Grad-CAM heat-map generator.

    Hooks `target_layer` to capture its forward activations and the gradient
    flowing back into its output, then weights each activation channel by the
    spatial mean of the predicted class's gradient (Selvaraju et al.).
    """

    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.feature_maps = None   # activations captured on forward
        self.gradient = None       # d(score)/d(activations) captured on backward
        self.hook_handles = []

        def forward_hook(module, inputs, output):
            # Cache the layer's output from the forward pass.
            self.feature_maps = output.detach()

        def backward_hook(module, grad_in, grad_out):
            # Cache the gradient w.r.t. the layer's output.
            self.gradient = grad_out[0].detach()

        self.hook_handles.append(target_layer.register_forward_hook(forward_hook))
        # register_full_backward_hook replaces the deprecated register_backward_hook.
        self.hook_handles.append(target_layer.register_full_backward_hook(backward_hook))
        # Inference mode: disable dropout / use BN running stats.
        self.model.eval()

    def __call__(self, x):
        """Return (heatmap ndarray matching x's HxW, predicted class index) for a 1-image batch."""
        # Run on the model's own device rather than relying on a global.
        x = x.to(next(self.model.parameters()).device)
        self.model.zero_grad()
        output = self.model(x)
        pred_class = torch.argmax(output, dim=1).item()

        # Backpropagate a one-hot vector selecting the predicted class score.
        one_hot = torch.zeros_like(output)
        one_hot[0, pred_class] = 1
        output.backward(gradient=one_hot, retain_graph=True)

        # Channel weights = global average pooling over the spatial gradient.
        weights = torch.mean(self.gradient, dim=(2, 3), keepdim=True)
        cam = torch.sum(weights * self.feature_maps, dim=1).squeeze()
        cam = torch.relu(cam)  # keep only positively contributing regions
        if torch.max(cam) > 0:
            cam = cam / torch.max(cam)  # normalize to [0, 1]

        # Upsample the map to the input's spatial size (128x128 in this script;
        # generalized from the original hard-coded (128, 128)).
        cam = torch.nn.functional.interpolate(
            cam.unsqueeze(0).unsqueeze(0),
            size=x.shape[-2:],
            mode='bilinear',
            align_corners=False,
        ).squeeze()
        return cam.cpu().numpy(), pred_class

    def remove_hooks(self):
        """Detach both hooks; call when finished to avoid leaking references."""
        for handle in self.hook_handles:
            handle.remove()
def tensor_to_np(tensor):
    """Undo normalization and convert a CHW image tensor to an HWC float array in [0, 1]."""
    img = tensor.cpu().numpy().transpose(1, 2, 0)
    # Invert the Normalize() from the data pipeline (same mean/std as training).
    mean = np.array([0.4914, 0.4822, 0.4465])
    std = np.array([0.2023, 0.1994, 0.2010])
    img = std * img + mean
    img = np.clip(img, 0, 1)  # keep pixel values displayable
    return img
# Pick one test image to explain (set idx manually, or sample randomly).
# idx = np.random.randint(len(test_dataset))
idx = 110  # index into the test split (0-based)
image, label = test_dataset[idx]

# Class names live on the underlying ImageFolder: random_split wraps it in a
# Subset, so go through .dataset to reach .classes.
classes = test_dataset.dataset.classes
print(f"選擇的圖像類別: {classes[label]}")

# Add the batch dimension and move to the compute device.
input_tensor = image.unsqueeze(0).to(device)

# Target the last conv layer; earlier layers (e.g. conv3) give coarser maps.
# grad_cam = GradCAM(model, model.conv3)
grad_cam = GradCAM(model, model.conv4)

# The GradCAM object is callable: returns (heatmap, predicted class index).
heatmap, pred_class = grad_cam(input_tensor)
grad_cam.remove_hooks()  # detach hooks now that the map is computed

# Visualize: original image, raw heat map, and the overlay.
plt.figure(figsize=(15, 5))

# Original image.
plt.subplot(1, 3, 1)
plt.imshow(tensor_to_np(image))
plt.title(f"原始圖像: {classes[label]}")
plt.axis('off')

# Heat map alone.
plt.subplot(1, 3, 2)
plt.imshow(heatmap, cmap='jet')
plt.title(f"Grad-CAM熱力圖: {classes[pred_class]}")
plt.axis('off')

# Heat map blended over the image.
plt.subplot(1, 3, 3)
img = tensor_to_np(image)
heatmap_resized = np.uint8(255 * heatmap)
heatmap_colored = plt.cm.jet(heatmap_resized)[:, :, :3]  # drop the alpha channel
superimposed_img = heatmap_colored * 0.4 + img * 0.6
plt.imshow(superimposed_img)
plt.title("疊加熱力圖")
plt.axis('off')

plt.tight_layout()
plt.savefig('grad_cam_result_128_crop_100.png')
plt.show()

print("Grad-CAM可視化完成。已保存為grad_cam_result_128_crop_100.png")
# @浙大疏錦行