1 圖像分類案例
1.1 CIFAR10數據集介紹
-
cifar數據是torchvision第三方包提供的數據集
-
訓練集5w 測試集1w
-
y標簽 10個類別 10分類問題
-
一張圖形狀 (32, 32, 3)
import torch import torch.nn as nn from torchvision.datasets import CIFAR10 from torchvision.transforms import ToTensor import torch.optim as optim from torch.utils.data import DataLoader import time import matplotlib.pyplot as plt from torchsummary import summary ? # 每批次樣本數 BATCH_SIZE = 8 ? ? # todo: 1-加載數據集轉換成張量數據集 def create_dataset():# root: 文件夾所在目錄路徑# train: 是否加載訓練集# ToTensor(): 將圖片數據轉換成張量數據train_dataset = CIFAR10(root='./data', train=True, transform=ToTensor())valid_dataset = CIFAR10(root='./data', train=False, transform=ToTensor())return train_dataset, valid_dataset ? ? # todo: 2-構建卷積神經網絡分類模型 # todo: 3-模型訓練 # todo: 4-模型評估 if __name__ == '__main__':train_dataset, valid_dataset = create_dataset()print('圖片類別對應關系->', train_dataset.class_to_idx)print('train_dataset->', train_dataset.data[0])print('train_dataset.data.shape->', train_dataset.data.shape)print('valid_dataset.data.shape->', valid_dataset.data.shape)print('train_dataset.targets->', train_dataset.targets[0])# 圖像展示plt.figure(figsize=(2, 2))plt.imshow(train_dataset.data[1])plt.title(train_dataset.targets[1])plt.show()
1.2 構建分類神經網絡模型
# todo: 2-構建卷積神經網絡分類模型
class ImageModel(nn.Module):# todo:2-1 構建init構造函數, 實現搭建神經網絡def __init__(self):super().__init__()# 第1層卷積層# 輸入通道3 一張RGB圖像就是3通道# 輸出通道6 6個神經元提取出6張特征圖# 卷積核大小3self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=3)# 第1層池化層# 窗口大小2*2# 步長2self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)# 第2層卷積層self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=3)# 第2層池化層# 池化層輸出的特征圖 16*6*6self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)# 第1層隱藏層# in_features: 將最后池化層的16*6*6三維矩陣轉換為一維矩陣# 一維矩陣就是池化層這圖像self.linear1 = nn.Linear(in_features=16 * 6 * 6, out_features=120)# 第2層隱藏層self.linear2 = nn.Linear(in_features=120, out_features=84)# 輸出層# out_features: 10, 10分類問題self.out = nn.Linear(in_features=84, out_features=10)
?# todo:2-2 構建forward函數, 實現前向傳播def forward(self, x):# 第1層 卷積+激活+池化 計算x = self.pool1(torch.relu(self.conv1(x)))# 第2層 卷積+激活+池化 計算# x->(8, 16, 6, 6) 8個樣本, 每個樣本是16*6*6x = self.pool2(torch.relu(self.conv2(x)))# 第1層隱藏層 只能接收二維數據集# 四維數據集轉換成二維數據集# x.shape[0]: 每批樣本數, 最后一批可能不夠8個, 所以不是寫死8# -1*8=8*16*6*6 -1=16*6*6=576x = x.reshape(shape=(x.shape[0], -1))x = torch.relu(self.linear1(x))# 第2層隱藏層x = torch.relu(self.linear2(x))# 輸出層 沒有使用softmax激活函數, 后續多分類交叉熵損失函數會自動進行softmax激活x = self.out(x)return x# todo: 3-模型訓練
?
?
# todo: 4-模型評估
if __name__ == '__main__':train_dataset, valid_dataset = create_dataset()# print('圖片類別對應關系->', train_dataset.class_to_idx)# print('train_dataset->', train_dataset.data[0])# print('train_dataset.data.shape->', train_dataset.data.shape)# print('valid_dataset.data.shape->', valid_dataset.data.shape)# print('train_dataset.targets->', train_dataset.targets[0])# # 圖像展示# plt.figure(figsize=(2, 2))# plt.imshow(train_dataset.data[1])# plt.title(train_dataset.targets[1])# plt.show()model = ImageModel()summary(model, input_size=(3, 32, 32))
1.3 模型訓練
# todo: 3-模型訓練
def train(train_dataset):# 創建數據加載器dataloader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)# 創建模型對象model = ImageModel()# model.to(device='cuda')# 創建損失函數對象criterion = nn.CrossEntropyLoss()# 創建優化器對象optimizer = optim.Adam(model.parameters(), lr=1e-3)# 循環遍歷epoch# 定義epoch變量epoch = 10for epoch_idx in range(epoch):# 定義總損失變量total_loss = 0.0# 定義預測正確樣本個數變量total_correct = 0# 定義總樣本數據變量total_samples = 0# 定義開始時間變量start = time.time()# 循環遍歷數據加載器 min-batchfor x, y in dataloader:# print('y->', y)# 切換訓練模式model.train()# 模型預測youtput = model(x)# print('output->', output)# 計算損失值 平均損失值loss = criterion(output, y)# print('loss->', loss)# 梯度清零optimizer.zero_grad()# 梯度計算loss.backward()# 參數更新optimizer.step()# 統計預測正確的樣本個數# tensor([9, 9, 9, 9, 9, 9, 9, 9])# print(torch.argmax(output, dim=-1))# tensor([False, False, False, False, False, False, False, False])# print(torch.argmax(output, dim=-1) == y)# tensor(0)# print((torch.argmax(output, dim=-1) == y).sum())total_correct += (torch.argmax(output, dim=-1) == y).sum()# 統計當前批次的總損失值# loss.item(): 當前批次平均損失值total_loss += loss.item() * len(y)# 統計當前批次的樣本數total_samples += len(y)end = time.time()print('epoch:%2s loss:%.5f acc:%.2f time:%.2fs' % (epoch_idx + 1, total_loss / total_samples, total_correct / total_samples, end - start))# 保存訓練模型torch.save(obj=model.state_dict(), f='model/imagemodel.pth')
?
?
# todo: 4-模型評估
if __name__ == '__main__':train_dataset, valid_dataset = create_dataset()# 模型訓練train(train_dataset)
1.4 模型評估
# todo: 4-模型評估
def test(valid_dataset):# 創建測試集數據加載器dataloader = DataLoader(dataset=valid_dataset, batch_size=BATCH_SIZE, shuffle=False)# 創建模型對象, 加載訓練模型參數model = ImageModel()model.load_state_dict(torch.load('model/imagemodel.pth'))# 定義統計預測正確樣本個數變量 總樣本數據變量total_correct = 0total_samples = 0# 遍歷數據加載器for x, y in dataloader:# 切換推理模型model.eval()# 模型預測output = model(x)# 將預測分值轉成類別y_pred = torch.argmax(output, dim=-1)print('y_pred->', y_pred)# 統計預測正確的樣本個數total_correct += (y_pred == y).sum()# 統計總樣本數total_samples += len(y)
?# 打印精度print('Acc: %.2f' % (total_correct / total_samples))
?
if __name__ == '__main__':train_dataset, valid_dataset = create_dataset()# 模型訓練# train(train_dataset)# 模型預測test(valid_dataset)
1.5 網絡性能優化
-
增加卷積層的卷積核數據量
-
增加全連接層神經元數量
-
減小學習率
-
增加dropout隨機失活層
class ImageClassification(nn.Module):def __init__(self):super(ImageClassification, self).__init__()self.conv1 = nn.Conv2d(3, 32, stride=1, kernel_size=3)self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)self.conv2 = nn.Conv2d(32, 128, stride=1, kernel_size=3)self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2) ?self.linear1 = nn.Linear(128 * 6 * 6, 2048)self.linear2 = nn.Linear(2048, 2048)self.out = nn.Linear(2048, 10)# Dropout層,p表示神經元被丟棄的概率self.dropout = nn.Dropout(p=0.5) ?def forward(self, x):x = torch.relu(self.conv1(x))x = self.pool1(x)x = torch.relu(self.conv2(x))x = self.pool2(x)# 由于最后一個批次可能不夠 32,所以需要根據批次數量來 flattenx = x.reshape(x.size(0), -1)x = torch.relu(self.linear1(x))# dropout正則化# 訓練集準確率遠遠高于測試準確率,模型產生了過擬合x = self.dropout(x)x = torch.relu(self.linear2(x))x = self.dropout(x)return self.out(x)
2 RNN介紹
2.1 什么是RNN循環神經網絡
-
RNN是一種處理序列化數據的神經網絡計算模型
-
序列化數據
-
根據時間步生成的數據, 前后數據有關聯
-
文本數據
-
2.2 RNN應用場景
-
NLP: 文本生成, 機器翻譯
-
語音翻譯
-
音樂生成
3 詞嵌入層
3.1 詞嵌入層作用
-
詞向量化表示
-
低維稠密向量能更好學習詞的語義關系
3.2 詞嵌入層工作流程
-
通過jieba等分詞模塊先對句子進行分詞處理
-
獲取每個詞對應位置的下標值
-
將下標值轉換成張量對象丟入詞嵌入層進行詞向量輸出
3.3 詞嵌入層API使用
import torch
import jieba
import torch.nn as nn
?
?
def dm01():# 一句話包含多個詞text = '北京冬奧的進度條已經過半,不少外國運動員在完成自己的比賽后踏上歸途。'# 使用jieba模塊進行分詞words = jieba.lcut(text)# 返回詞的列表print('words->', words)# 創建詞嵌入層# num_embeddings:詞數量# embedding_dim:詞向量維度embed = nn.Embedding(num_embeddings=len(words), embedding_dim=8)# 獲取每個詞對象的下標索引for i, word in enumerate(words):# 將詞索引轉換成張量對象 向量word_vec = embed(torch.tensor(data=i))print('word_vec->', word_vec)
?
?
if __name__ == '__main__':dm01()
4 循環網絡層
4.1 RNN網絡層原理
-
作用: 處理序列文本數據
-
每一層的輸入有上一個時間步的隱藏狀態和當前的詞向量
-
每一層的輸出有當前預測分值y1和當前時間步的隱藏狀態
-
隱藏狀態: 具有記憶功能以及上下文理解功能
-
如果有多層RNN層, 進行生成式AI, 只取最后一層的隱藏狀態和預測輸出, 將預測輸出丟入到全連接神經網絡中進行詞的預測
-
詞匯表有多少個詞, 最后就是預測多少個類別, 每個詞就是一個類別, 獲取概率最大的類別對應的詞
-
-
h1 = relu(wh0+b+wx+b)
-
y1 = wh1+b
4.2 RNN網絡層API使用
import torch
import torch.nn as nn
?
?
def dm01():# 創建RNN層# input_size: 詞向量維度# hidden_size: 隱藏狀態向量維度# num_layers: 隱藏層數量, 默認1層rnn = nn.RNN(input_size=128, hidden_size=256, num_layers=1)# 輸入x# (5, 32, 128)->(每個句子的詞個數, 句子數, 詞向量維度)# RNN對象input_size=詞向量維度x = torch.randn(size=(5, 32, 128))# 上一個時間步的隱藏狀態h0# (1,32,256->(隱藏層數量, 句子數, 隱藏狀態向量維度)# RNN對象hidden_size=隱藏狀態向量維度h0 = torch.randn(size=(1, 32, 256))# 調用RNN層輸出當前預測值和當前的隱藏狀態h1output, h1 = rnn(x, h0)print(output.shape, h1.shape)
?
?
if __name__ == '__main__':dm01()
5 文本生成案例
5.1 構建詞表
import torch
import jieba
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import time
?
?
# 獲取數據,并進行分詞,構建詞表
def build_vocab():# 數據集位置file_path = 'data/jaychou_lyrics.txt'# 分詞結果存儲位置# 唯一詞列表unique_words = []# 每行文本分詞列表, 文本數據替換成 每行詞列表的結果# [[],[],[]]all_words = []# 遍歷數據集中的每一行文本for line in open(file=file_path, mode='r', encoding='utf-8'):# 使用jieba分詞,分割結果是一個列表words = jieba.lcut(line)# print('words->', words)# 所有的分詞結果存儲到all_words,其中包含重復的詞組# [[],[],[]]all_words.append(words)# 遍歷分詞結果,去重后存儲到unique_words# words->['想要', '有', '直升機', '\n']for word in words:# word詞不再詞匯表中, 就是一個新詞if word not in unique_words:unique_words.append(word)
?# print('unique_words->', unique_words)# print('all_words->', all_words)
?# 語料中詞的數量word_count = len(unique_words)# print('word_count->', word_count)# 詞到索引映射 {詞:詞下標}word_to_index = {}for idx, word in enumerate(unique_words):word_to_index[word] = idx# print('word_to_index->', word_to_index)# 歌詞文本用詞表索引表示corpus_idx = []# print('all_words->', all_words)# 遍歷每一行的分詞結果# all_words->[['老街坊', ' ', '小', '弄堂', '\n'], ['消失', '的', ' ', '舊', '時光', ' ', '一九四三', '\n']]for words in all_words:# 臨時存儲每行詞的索引下標temp = []# 獲取每一行的詞,并獲取相應的索引# words->['老街坊', ' ', '小', '弄堂', '\n']for word in words:# 根據詞獲取字典中的詞索引下標 idx=dict[word]temp.append(word_to_index[word])# 在每行詞之間添加空格隔開 -> 將每行\n和下一行分開, 沒有語義關系temp.append(word_to_index[' '])# 獲取當前文檔中每個詞對應的索引# extend: 將temp列表元素拆分后存儲到corpus_idx列表中corpus_idx.extend(temp)return unique_words, word_to_index, word_count, corpus_idx
?
?
if __name__ == '__main__':unique_words, word_to_index, word_count, corpus_idx = build_vocab()# print('unique_words->', unique_words)# print('word_count->', word_count)# print('corpus_idx->', corpus_idx)# print('word_to_index->', word_to_index)
5.2 構建數據集對象
# 構建數據集對象型
# 創建類繼承 torch.utils.data.Dataset基類
class LyricsDataset(torch.utils.data.Dataset):# 定義構造方法# corpus_idx: 歌詞文本用詞表索引表示# num_chars: 每句話的詞數量def __init__(self, corpus_idx, num_chars):self.corpus_idx = corpus_idxself.num_chars = num_chars# 統計歌詞文本中有多少個詞, 不去重self.word_count = len(corpus_idx)# 歌詞文本能生成多少個句子self.number = self.word_count // self.num_chars
?# 重寫__len__魔法方法, len()->輸出返回值def __len__(self):return self.number
?# 重寫__getitem__魔法方法 obj[idx]執行此方法 遍歷數據加載器執行此方法def __getitem__(self, idx):# 設置起始下標值 start, 不能超過word_count-num_chars-1# -1: y的值要x基礎上后移一位start = min(max(idx, 0), self.word_count - self.num_chars - 1)# end = start + self.num_chars# 獲取xx = self.corpus_idx[start: end]y = self.corpus_idx[start + 1: end + 1]return torch.tensor(x), torch.tensor(y)
?
?
if __name__ == '__main__':unique_words, word_to_index, word_count, corpus_idx = build_vocab()# print('unique_words->', unique_words)# print('word_count->', word_count)# print('corpus_idx->', corpus_idx)# print('word_to_index->', word_to_index)print('corpus_idx->', len(corpus_idx))dataset = LyricsDataset(corpus_idx, 5)print(len(dataset))# 獲取第1組x和yx, y = dataset[49135]print('x->', x)print('y->', y)
5.3 構建網絡模型
class TextGenerator(nn.Module):def __init__(self, unique_word_count):super(TextGenerator, self).__init__()# 初始化詞嵌入層: 語料中詞的數量, 詞向量的維度為128self.ebd = nn.Embedding(unique_word_count, 128)# 循環網絡層: 詞向量維度128, 隱藏向量維度256, 網絡層數1self.rnn = nn.RNN(128, 256, 1)# 輸出層: 特征向量維度256與隱藏向量維度相同, 詞表中詞的個數self.out = nn.Linear(256, unique_word_count)
?def forward(self, inputs, hidden):# 輸出維度: (batch, seq_len, 詞向量維度128)# batch:句子數量# seq_len: 句子長度, 每個句子由多少個詞 詞數量embed = self.ebd(inputs)# rnn層x的表示形式為(seq_len, batch, 詞向量維度128)# output的表示形式與輸入x類似,為(seq_len, batch, 詞向量維度256)# 前后的hidden形狀要一樣, 所以DataLoader加載器的batch數要能被整數output, hidden = self.rnn(embed.transpose(0, 1), hidden)# 全連接層輸入二維數據, 詞數量*詞維度# 輸入維度: (seq_len*batch, 詞向量維度256)# 輸出維度: (seq_len*batch, 語料中詞的數量)# output: 每個詞的分值分布,后續結合softmax輸出概率分布# output.shape[-1]: 詞向量維度表示output = self.out(output.reshape(shape=(-1, output.shape[-1])))# 網絡輸出結果return output, hidden
?def init_hidden(self, bs):# 隱藏層的初始化:[網絡層數, batch, 隱藏層向量維度]return torch.zeros(1, bs, 256)if __name__ == "__main__":# 獲取數據unique_words, word_to_index, unique_word_count, corpus_idx = build_vocab()model = TextGenerator(unique_word_count)for named, parameter in model.named_parameters():print(named)print(parameter)
5.4 構建訓練函數
def train():# 構建詞典unique_words, word_to_index, unique_word_count, corpus_idx = build_vocab()# 數據集 LyricsDataset對象,并實現了 __getitem__ 方法lyrics = LyricsDataset(corpus_idx=corpus_idx, num_chars=32)# 查看句子數量# print(lyrics.number)# 初始化模型model = TextGenerator(unique_word_count)# 數據加載器 DataLoader對象,并將lyrics dataset對象傳遞給它lyrics_dataloader = DataLoader(lyrics, shuffle=True, batch_size=5)# 損失函數criterion = nn.CrossEntropyLoss()# 優化方法optimizer = optim.Adam(model.parameters(), lr=1e-3)# 訓練輪數epoch = 10for epoch_idx in range(epoch):# 訓練時間start = time.time()iter_num = 0 ?# 迭代次數# 訓練損失total_loss = 0.0# 遍歷數據集 DataLoader 會在后臺調用 dataset.__getitem__(index) 來獲取每個樣本的數據和標簽,并將它們組合成一個 batchfor x, y in lyrics_dataloader:print('y.shape->', y.shape)# 隱藏狀態的初始化hidden = model.init_hidden(bs=5)# 模型計算output, hidden = model(x, hidden)print('output.shape->', output.shape)# 計算損失# y形狀為(batch, seq_len), 需要轉換成一維向量->160個詞的下標索引# output形狀為(seq_len, batch, 詞向量維度)# 需要先將y進行維度交換(和output保持一致)再改變形狀y = torch.transpose(y, 0, 1).reshape(shape=(-1,))loss = criterion(output, y)optimizer.zero_grad()loss.backward()optimizer.step()iter_num += 1 ?# 迭代次數加1total_loss += loss.item()# 打印訓練信息print('epoch %3s loss: %.5f time %.2f' % (epoch_idx + 1, total_loss / iter_num, time.time() - start))# 模型存儲torch.save(model.state_dict(), 'model/lyrics_model_%d.pth' % epoch)if __name__ == "__main__":train()
5.5 構建預測函數
def predict(start_word, sentence_length):# 構建詞典unique_words, word_to_index, unique_word_count, _ = build_vocab()# 構建模型model = TextGenerator(unique_word_count)# 加載參數model.load_state_dict(torch.load('model/lyrics_model_10.pth'))# 隱藏狀態hidden = model.init_hidden(bs=1)# 將起始詞轉換為索引word_idx = word_to_index[start_word]# 產生的詞的索引存放位置generate_sentence = [word_idx]# 遍歷到句子長度,獲取每一個詞for _ in range(sentence_length):# 模型預測output, hidden = model(torch.tensor([[word_idx]]), hidden)# 獲取預測結果word_idx = torch.argmax(output)generate_sentence.append(word_idx)# 根據產生的索引獲取對應的詞,并進行打印for idx in generate_sentence:print(unique_words[idx], end='')
?
?
if __name__ == '__main__':# 調用預測函數predict('分手', 50)