教材鏈接-3.2. 線性回歸的從零開始實現
c++實現
該博客僅用于記錄一下自己的代碼,可與c++實現作為對照
from d2l import torch as d2l
import torch
import random
# nn是神經網絡的縮寫
from torch import nn
from torch.utils import data# 加載訓練數據
# 加載訓練數據集
simples = torch.load('datas.pt')
# 這里是加載了訓練和測試數據集的真實權重和偏差,僅作為最后訓練結果的驗證使用
tw, tb = torch.load('wb.pt')
# 加載測試數據集
tests = torch.load('test.pt')
# 獲取訓練數據集的樣本數量
simple_num = simples.shape[0]# 獲取數據讀取迭代器
def data_iter(batch_size, features, labels):# 計算數據的總數量num_examples = len(features)# 創建一個包含數據索引的列表 indices = list(range(num_examples))# 隨機打亂索引列表,以實現隨機讀取樣本,對訓練結果意義不明# random.shuffle(indices)# 遍歷打亂后的indices,每次取出batch_size個索引,用于構建一個小批量數據 for i in range(0, num_examples, batch_size):# 獲取當前批次的索引號并以張量形式存儲batch_indices = torch.tensor(indices[i: min(i + batch_size, num_examples)])# 根據索引從特征和標簽中提取數據 yield features[batch_indices], labels[batch_indices]
# 在Python中,yield 是一個關鍵字,用于定義一個生成器(generator)。生成器是一種特殊的迭代器,它允許你定義一個可以記住上一次返回時在函數體中的位置的函數。對生成器函數的第二次(或第n次)調用將恢復函數的執行,并繼續從上次掛起的位置開始。# 定義一個函數來加載并批量處理數據,返回數據獲取迭代器
def load_array(data_arrays, batch_size, is_train=True): #@save"""構造一個PyTorch數據迭代器"""# 使用TensorDataset將多個tensor組合成一個數據集 dataset = data.TensorDataset(*data_arrays)# 使用DataLoader加載數據集,并指定批量大小和是否打亂數據return data.DataLoader(dataset, batch_size, shuffle=is_train)# 定義線性回歸模型
def linreg(X, w, b): #@save"""線性回歸模型"""# 使用矩陣乘法計算預測值,并加上偏差 return torch.matmul(X, w) + b# 定義平方損失函數
def squared_loss(y_hat, y): #@save"""均方損失"""# 計算預測值與實際值之間的平方差,并除以2(方便梯度計算)return (y_hat - y.reshape(y_hat.shape)) ** 2 / 2# 定義交叉熵損失函數,線性回歸模型用不到
def cross_entropy(y_hat, y):return - torch.log(y_hat[range(len(y_hat)), y])# 定義一個魯棒的損失函數,結合了平方損失和絕對值損失
def robust_loss(y_hat, y, delta=1.0):residual = torch.abs(y_hat - y)return torch.where(residual<delta, 0.5* residual **2, delta*(residual-0.5*delta))# 絕對值損失函數
def abs_loss(y_hat, y):return torch.abs(y_hat - y.reshape(y_hat.shape))# 定義隨機梯度下降函數
def sgd(params, lr, batch_size): #@save"""小批量隨機梯度下降"""with torch.no_grad():# 遍歷模型參數 for param in params:# 更新參數值,使用學習率lr乘以參數的梯度,并除以批量大小 param -= lr * param.grad / batch_size# 清除參數的梯度,為下一輪迭代做準備 param.grad.zero_()# 數據標準化處理
def standard(X):X_mean = torch.mean(X, dim=0)X_std = torch.std(X, dim=0)return (X-X_mean)/X_std# 數據最小最大歸一化處理
def min_max(X):X_min = torch.min(X, dim=0)[0]X_max = torch.max(X, dim=0)[0]return (X-X_min)/(X_max-X_min)# 不進行任何處理,直接返回輸入
def noProcess(X):return X
#Linear Regression Implementation from Scratch
if __name__ == '__main__':# 設置學習率和訓練輪數 lr = 0.03num_epochs = 20# 這里其實net變量并沒有定義為一個神經網絡模型,而是一個函數 # 但為了與后續代碼保持一致,我們仍然使用net來表示這個線性回歸函數# loss同理net = linregloss = squared_loss# 使用不進行任何處理的數據處理方式 data_process = noProcess# 將數據分成50個批次,計算每批數據的數量 batch_size = simple_num // 50# 提取特征和標簽 # 提取最后一列作為標簽 label = simples[:,-1]# 提取除最后一列外的所有列作為特征,并使用data_process進行處理feature=data_process(simples[:, :-1])# 初始化權重和偏差,權重使用正態分布初始化,偏差初始化為0 w = torch.normal(0, 1, size=(feature.shape[1], 1), requires_grad=True)# w = torch.tensor([0.3], requires_grad=True)b = torch.tensor([0.0], requires_grad=True)timer = d2l.Timer()# 開始訓練 for epoch in range(num_epochs):# 通過data_iter遍歷數據進行一輪訓練for X,y in data_iter(batch_size, feature, label):# 計算預測值y_hat = net(X, w, b)# 計算損失l = loss(y_hat, y)# 反向傳播計算梯度 l.sum().backward()# 使用隨機梯度下降更新參數sgd([w,b], lr, batch_size)# 一輪訓練結束后,計算整個訓練集上的損失,用以監控訓練效果# with torch.no_grad(): 告訴 PyTorch 在這個上下文內不要計算梯度,從而節省內存并加速計算。with torch.no_grad():label_hat = net(feature, w, b)epoch_loss = loss(label_hat, label)if epoch%5 == 0:print(f'in epoch{epoch+1}, loss is {epoch_loss.sum()}')# 在訓練完成后,計算測試集上的預測值和損失 # 提取測試集的特征和標簽 test_feature = data_process(tests[:, :-1])test_label = tests[:, -1]# 計算測試集上的預測值和損失 test_label_hat = net(test_feature, w, b)label_loss = loss(test_label_hat, test_label)print(f'in test epoch, loss is {label_loss.mean()}')print(f'true_w={tw}, true_b={tb}, w={w}, b={b}')print(f' {num_epochs} epoch, time {timer.stop():.2f} sec')
#Concise Implementation of Linear Regression
#the concise implementation have lower accuracy than from scratch
if __name__ == '__main2__':# 設置學習率、訓練輪數、數據處理方式和批量大小 lr = 0.03num_epochs = 15# 使用不進行任何處理的數據處理方式 data_process = noProcess# 將數據分成50個批次,計算每批數據的數量 batch_size = simple_num // 50# 提取特征和標簽 label = simples[:,-1]feature=data_process(simples[:, :-1])# 加載數據并創建數據迭代器 data_iter = load_array((feature, label), batch_size)# 構建神經網絡模型,這里是一個簡單的線性回歸模型 net = nn.Sequential(nn.Linear(feature.shape[1], 1))# 我們的模型只包含一個層,因此實際上不需要Sequential# 不使用Sequential時,后面的net[0]需要改為net# net = nn.Linear(feature.shape[1], 1)# 初始化網絡權重和偏置 net[0].weight.data.normal_(0, 0.01)net[0].bias.data.fill_(0)# 使用均方誤差損失函數loss = nn.MSELoss()# 使用隨機梯度下降優化器 trainer = torch.optim.SGD(net.parameters(), lr=lr)# 開始訓練 for epoch in range(num_epochs):# 通過data_iter遍歷數據進行一輪訓練for X, y in data_iter:# 前向傳播計算預測值y_hat = net(X)# 計算損失 l = loss(y_hat, y.reshape(y_hat.shape))# 梯度清零,為下一輪迭代計算做準備trainer.zero_grad()# 反向傳播計算梯度 l.backward()# 使用隨機梯度下降更新參數trainer.step()# 在每個epoch結束后,對整個數據集進行前向傳播并計算損失,用于監控訓練過程 label_hat = net(feature)epoch_loss = loss(label_hat, label.reshape(label_hat.shape))if epoch%5 == 0:print(f'in epoch{epoch+1}, loss is {epoch_loss.mean()}')# 在訓練完成后,計算測試集上的預測值和損失 # 提取測試集的特征和標簽 test_feature = data_process(tests[:, :-1])test_label = tests[:, -1]# 計算測試集上的預測值和損失 test_label_hat = net(test_feature)label_loss = loss(test_label_hat, test_label.reshape(test_label_hat.shape))print(f'in test epoch, loss is {label_loss.mean():f}')print(f'tw={tw}, tb={tb}, w={net[0].weight.data}, b={net[0].bias.data}')