Reposted from my personal blog: https://shar-pen.github.io/2025/05/04/torch-distributed-series/1.MNIST/
Basic single-GPU training
This notebook walks through training a convolutional neural network (CNN) to classify handwritten digits from the MNIST dataset. The workflow covers:
- Data preparation: load and preprocess the MNIST dataset.
- Model definition: build a CNN model with PyTorch.
- Model training: train the model on the MNIST training set.
- Model evaluation: test the model on the MNIST test set and evaluate its performance.
- Visualization: display sample images together with their labels.
Based on the official PyTorch example https://github.com/pytorch/examples/blob/main/mnist/main.py .
Why MNIST classification? Because it is the Hello World of deep learning.
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torchvision import datasets, transforms
from time import time
In deep learning, the truly essential hyperparameters are roughly the following (a small sketch of how they map onto PyTorch objects is shown after this list):
- Learning rate
  - The single most important hyperparameter.
  - Controls the step size of each parameter update.
  - With a badly chosen learning rate, training almost always fails.
- Optimizer
  - For example SGD, Adam, AdamW.
  - Different optimizers can differ a lot in convergence speed and final quality.
  - Sometimes the optimizer's internal hyperparameters also need to be set (e.g. Adam's $\beta_1, \beta_2$).
- Batch size
  - How many samples are grouped into one batch and fed to the model.
  - Affects training stability, convergence speed, and hardware usage.
- Number of epochs, or maximum number of steps
  - How long to train in total.
  - Train too little and the model underfits; train too long and it overfits or wastes resources.
- Loss function
  - Defines the training objective, e.g. CrossEntropyLoss for classification, MSELoss for regression.
  - Each task needs the right loss.
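As a minimal sketch (not part of the original notebook), this is how those hyperparameters typically map onto PyTorch objects; the model and all values here are placeholders chosen only for illustration:

import torch.nn as nn
import torch.optim as optim

lr = 1e-3              # learning rate: step size of each update
batch_size = 512       # samples per batch
epochs = 5             # passes over the training set

model = nn.Linear(784, 10)                    # placeholder model
optimizer = optim.AdamW(model.parameters(),   # optimizer choice
                        lr=lr,
                        betas=(0.9, 0.999))   # Adam's internal beta_1, beta_2
criterion = nn.CrossEntropyLoss()             # loss for classification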
Hyperparameter setup
We set the most basic hyperparameters: epochs, batch size, device, and learning rate.
EPOCHS = 5
BATCH_SIZE = 512
LR = 0.001
LR_DECAY_STEP_NUM = 1
LR_DECAY_FACTOR = 0.5
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
Data preparation
We build the dataset and dataloader directly with the library functions; the dataset itself is really only used to construct the dataloader.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])
train_data = datasets.MNIST(
    root='./mnist',
    train=True,  # True for training data, False for test data
    transform=transform,
    # download=True  # set True to download automatically, then switch back to False
)
train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)
test_data = datasets.MNIST(
    root='./mnist',
    train=False,  # True for training data, False for test data
    transform=transform,
)
test_loader = torch.utils.data.DataLoader(dataset=test_data, batch_size=BATCH_SIZE, shuffle=True)

# plot one example
print(f'dataset: input shape: {train_data.data.size()}, label shape: {train_data.targets.size()}')
print(f'dataloader iter: input shape: {next(iter(train_loader))[0].size()}, label shape: {next(iter(train_loader))[1].size()}')
plt.imshow(train_data.data[0].numpy(), cmap='gray')
plt.title(f'Label: {train_data.targets[0]}')
plt.show()
dataset: input shape: torch.Size([60000, 28, 28]), label shape: torch.Size([60000])
dataloader iter: input shape: torch.Size([512, 1, 28, 28]), label shape: torch.Size([512])
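Note the shape difference: the raw dataset tensors are 28x28 with uint8 values in 0-255, while the batches from the dataloader are 1x28x28 floats, because ToTensor adds the channel dimension and scales to [0, 1], and Normalize then standardizes with the MNIST mean/std. A quick check (illustrative only, reusing the names from the cell above):

raw = train_data.data[0]     # uint8, shape [28, 28], values in 0..255
x, y = train_data[0]         # indexing the dataset applies `transform`
print(raw.shape, raw.dtype)  # torch.Size([28, 28]) torch.uint8
print(x.shape, x.dtype)      # torch.Size([1, 28, 28]) torch.float32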
Network
We design a simple ConvNet: a few convolution layers followed by an MLP. After constructing the model, move it to DEVICE first.
class ConvNet(nn.Module):
    """A neural network model for MNIST digit classification.

    This model is designed to classify images from the MNIST dataset, which consists of
    grayscale images of handwritten digits (0-9). The network architecture includes
    convolutional layers for feature extraction, followed by fully connected layers for
    classification.

    Attributes:
        features (nn.Sequential): A sequential container of convolutional layers,
            activation functions, pooling, and dropout for feature extraction.
        classifier (nn.Sequential): A sequential container of fully connected layers,
            activation functions, and dropout for classification.

    Methods:
        forward(x):
            Defines the forward pass of the network. Takes an input tensor `x`, processes
            it through the feature extractor and classifier, and returns the log-softmax
            probabilities for each class.
    """

    def __init__(self):
        super(ConvNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, 3, 1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, 1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.25)
        )
        self.classifier = nn.Sequential(
            nn.Linear(9216, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 10)
        )

    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        output = F.log_softmax(x, dim=1)
        return output
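The in_features=9216 of the first linear layer comes from the feature extractor's output shape: a 28x28 input becomes 26x26 after the first 3x3 conv, 24x24 after the second, and 12x12 after 2x2 max pooling, with 64 channels, so 64 * 12 * 12 = 9216. A quick sanity check (illustrative only, not part of the original notebook):

dummy = torch.zeros(1, 1, 28, 28)        # one fake MNIST image
features = ConvNet().features(dummy)     # run only the convolutional part
print(features.shape)                    # torch.Size([1, 64, 12, 12])
print(torch.flatten(features, 1).shape)  # torch.Size([1, 9216])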
Training and evaluation functions
We wrap training and evaluation in separate functions so that the main loop stays clean.
def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if (batch_idx + 1) % 30 == 0:
            print('Train: [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up the batch loss
            pred = output.max(1, keepdim=True)[1]  # index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
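Because the model already returns log_softmax outputs, the loss here is F.nll_loss; that combination is numerically equivalent to applying CrossEntropyLoss to raw logits. A minimal check (illustrative only):

logits = torch.randn(4, 10)                                # fake batch of raw scores
labels = torch.tensor([0, 3, 7, 9])
loss_a = F.cross_entropy(logits, labels)                   # CrossEntropyLoss on logits
loss_b = F.nll_loss(F.log_softmax(logits, dim=1), labels)  # log_softmax + NLL, as in this notebook
print(torch.allclose(loss_a, loss_b))                      # True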
Main training loop
model = ConvNet().to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=LR)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=LR_DECAY_STEP_NUM, gamma=LR_DECAY_FACTOR)

start_time = time()  # Record the start time
for epoch in range(EPOCHS):
    epoch_start_time = time()  # Record the start time of the current epoch
    print(f'Epoch {epoch}/{EPOCHS}')
    print(f'Learning Rate: {scheduler.get_last_lr()[0]}')
    train(model, DEVICE, train_loader, optimizer)
    test(model, DEVICE, test_loader)
    scheduler.step()
    epoch_end_time = time()  # Record the end time of the current epoch
    print(f"Time for epoch {epoch}: {epoch_end_time - epoch_start_time:.2f} seconds")
end_time = time()  # Record the end time
print(f"Total training time: {end_time - start_time:.2f} seconds")
+-----------------------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=========================================================================================|
| 0 N/A N/A 1795609 C ...st/anaconda3/envs/xprepo/bin/python 448MiB |
| 0 N/A N/A 1814253 C ...st/anaconda3/envs/xprepo/bin/python 1036MiB |
| 7 N/A N/A 4167010 C ...guest/anaconda3/envs/QDM/bin/python 19416MiB |
+-----------------------------------------------------------------------------------------+
GPU 0 uses 1484 MB in total.
Full code
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torchvision import datasets, transforms
from time import time
import argparse


class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, 3, 1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, 1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.25)
        )
        self.classifier = nn.Sequential(
            nn.Linear(9216, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 10)
        )

    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        output = F.log_softmax(x, dim=1)
        return output


def arg_parser():
    parser = argparse.ArgumentParser(description="MNIST Training Script")
    parser.add_argument("--epochs", type=int, default=5, help="Number of training epochs")
    parser.add_argument("--batch_size", type=int, default=512, help="Batch size for training")
    parser.add_argument("--lr", type=float, default=0.0005, help="Learning rate")
    parser.add_argument("--lr_decay_step_num", type=int, default=1, help="Step size for learning rate decay")
    parser.add_argument("--lr_decay_factor", type=float, default=0.5, help="Factor by which learning rate is decayed")
    parser.add_argument("--cuda_id", type=int, default=0, help="CUDA device ID to use")
    return parser.parse_args()


def prepare_data(batch_size):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    train_data = datasets.MNIST(
        root='./mnist',
        train=True,  # True for training data, False for test data
        transform=transform,
        # download=True  # set True to download automatically, then switch back to False
    )
    train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
    test_data = datasets.MNIST(
        root='./mnist',
        train=False,  # True for training data, False for test data
        transform=transform,
    )
    test_loader = torch.utils.data.DataLoader(dataset=test_data, batch_size=batch_size, shuffle=True)
    return train_loader, test_loader


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if (batch_idx + 1) % 30 == 0:
            print('Train: [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up the batch loss
            pred = output.max(1, keepdim=True)[1]  # index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


def train_mnist_classification():
    args = arg_parser()
    print(args)
    EPOCHS = args.epochs
    BATCH_SIZE = args.batch_size
    LR = args.lr
    LR_DECAY_STEP_NUM = args.lr_decay_step_num
    LR_DECAY_FACTOR = args.lr_decay_factor
    CUDA_ID = args.cuda_id
    DEVICE = torch.device(f"cuda:{CUDA_ID}")

    train_loader, test_loader = prepare_data(BATCH_SIZE)
    model = ConvNet().to(DEVICE)
    optimizer = optim.Adam(model.parameters(), lr=LR)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=LR_DECAY_STEP_NUM, gamma=LR_DECAY_FACTOR)

    start_time = time()  # Record the start time
    for epoch in range(EPOCHS):
        epoch_start_time = time()  # Record the start time of the current epoch
        print(f'Epoch {epoch}/{EPOCHS}')
        print(f'Learning Rate: {scheduler.get_last_lr()[0]}')
        train(model, DEVICE, train_loader, optimizer)
        test(model, DEVICE, test_loader)
        scheduler.step()
        epoch_end_time = time()  # Record the end time of the current epoch
        print(f"Time for epoch {epoch}: {epoch_end_time - epoch_start_time:.2f} seconds")
    end_time = time()  # Record the end time
    print(f"Total training time: {end_time - start_time:.2f} seconds")


if __name__ == "__main__":
    train_mnist_classification()
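Assuming the script above is saved as mnist_single_gpu.py (the filename is my own choice, not from the original post), it can be launched with the flags defined in arg_parser, for example:

python mnist_single_gpu.py --epochs 5 --batch_size 512 --lr 0.0005 --cuda_id 0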