文章目錄
- 基于pytorch的LSTM進行字符集文本生成
- 前言
- 一、數據集
- 二、代碼實現
- 1.到入庫和LSTM進行模型構建
- 2.數據預處理函數
- 3.訓練函數
- 4.預測函數
- 5.文本生成函數
- 6.主函數
- 完整代碼
- 總結
前言
本文介紹了機器學習中深度學習的內容使用pytorch構建LSTM模型進行字符級文本生成任務
一、數據集
https://download.csdn.net/download/qq_52785473/78428834
二、代碼實現
1.導入庫及LSTM模型構建
代碼如下:
# coding: utf-8
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import OneHotEncoder
import torch.nn.functional as Fclass lstm_model(nn.Module):def __init__(self, vocab, hidden_size, num_layers, dropout=0.5):super(lstm_model, self).__init__()self.vocab = vocab # 字符數據集# 索引,字符self.int_char = {i: char for i, char in enumerate(vocab)}self.char_int = {char: i for i, char in self.int_char.items()}# 對字符進行one-hot encodingself.encoder = OneHotEncoder(sparse=True).fit(vocab.reshape(-1, 1))self.hidden_size = hidden_sizeself.num_layers = num_layers# lstm層self.lstm = nn.LSTM(len(vocab), hidden_size, num_layers, batch_first=True, dropout=dropout)# 全連接層self.linear = nn.Linear(hidden_size, len(vocab))def forward(self, sequence, hs=None):out, hs = self.lstm(sequence, hs) # lstm的輸出格式(batch_size, sequence_length, hidden_size)out = out.reshape(-1, self.hidden_size) # 這里需要將out轉換為linear的輸入格式,即(batch_size * sequence_length, hidden_size)output = self.linear(out) # linear的輸出格式,(batch_size * sequence_length, vocab_size)return output, hsdef onehot_encode(self, data): # 對數據進行編碼return self.encoder.transform(data)def onehot_decode(self, data): # 對數據進行解碼return self.encoder.inverse_transform(data)def label_encode(self, data): # 對標簽進行編碼return np.array([self.char_int[ch] for ch in data])def label_decode(self, data): # 對標簽進行解碼return np.array([self.int_char[ch] for ch in data])
2.數據預處理函數
def get_batches(data, batch_size, seq_len):''':param data: 源數據,輸入格式(num_samples, num_features):param batch_size: batch的大小:param seq_len: 序列的長度(精度):return: (batch_size, seq_len, num_features)'''num_features = data.shape[1]num_chars = batch_size * seq_len # 一個batch_size的長度num_batches = int(np.floor(data.shape[0] / num_chars)) # 計算出有多少個batchesneed_chars = num_batches * num_chars # 計算出需要的總字符量targets = np.vstack((data[1:].A, data[0].A)) # 可能版本問題,取成numpy比較好reshapeinputs = data[:need_chars].A.astype("int") # 從原始數據data中截取所需的字符數量need_wordstargets = targets[:need_chars]targets = targets.reshape(batch_size, -1, num_features)inputs = inputs.reshape(batch_size, -1, num_features)for i in range(0, inputs.shape[1], seq_len):x = inputs[:, i: i+seq_len]y = targets[:, i: i+seq_len]yield x, y # 節省內存
3.訓練函數
def train(model, data, batch_size, seq_len, epochs, lr=0.01, valid=None):device = 'cuda' if torch.cuda.is_available() else 'cpu'model = model.to(device)optimizer = torch.optim.Adam(model.parameters(), lr=lr)criterion = nn.CrossEntropyLoss()if valid is not None:data = model.onehot_encode(data.reshape(-1, 1))valid = model.onehot_encode(valid.reshape(-1, 1))else:data = model.onehot_encode(data.reshape(-1, 1))train_loss = []val_loss = []for epoch in range(epochs):model.train()hs = None # hs等于hidden_size隱藏層節點train_ls = 0.0val_ls = 0.0for x, y in get_batches(data, batch_size, seq_len):optimizer.zero_grad()x = torch.tensor(x).float().to(device)out, hs = model(x, hs)hs = ([h.data for h in hs])y = y.reshape(-1, len(model.vocab))y = model.onehot_decode(y)y = model.label_encode(y.squeeze())y = torch.from_numpy(y).long().to(device)loss = criterion(out, y.squeeze())loss.backward()optimizer.step()train_ls += loss.item()if valid is not None:model.eval()hs = Nonewith torch.no_grad():for x, y in get_batches(valid, batch_size, seq_len):x = torch.tensor(x).float().to(device) # x為一組測試數據,包含batch_size * seq_len個字out, hs = model(x, hs)# out.shape輸出為tensor[batch_size * seq_len, vocab_size]hs = ([h.data for h in hs]) # 更新參數y = y.reshape(-1, len(model.vocab)) # y.shape為(128,100,43),因此需要轉成兩維,每行就代表一個字了,43為字典大小y = model.onehot_decode(y) # y標簽即為測試數據各個字的下一個字,進行one_hot解碼,即變為字符# 但是此時y 是[[..],[..]]形式y = model.label_encode(y.squeeze()) # 因此需要去掉一維才能成功解碼# 此時y為[12...]成為一維的數組,每個代表自己字典里對應字符的字典序y = torch.from_numpy(y).long().to(device)# 這里y和y.squeeze()出來的東西一樣,可能這里沒啥用,不太懂loss = criterion(out, y.squeeze()) # 計算損失值val_ls += loss.item()val_loss.append(np.mean(val_ls))train_loss.append(np.mean(train_ls))print("train_loss:", train_ls)plt.plot(train_loss, label="train_loss")plt.plot(val_loss, label="val loss")plt.title("loop vs epoch")plt.legend()plt.show()model_name = "lstm_model.net"with open(model_name, 'wb') as f: # 訓練完了保存模型torch.save(model.state_dict(), f)
4.預測函數
def predict(model, char, top_k=None, hidden_size=None):device = 'cuda' if torch.cuda.is_available() else 'cpu'model.to(device)model.eval() # 固定參數with torch.no_grad():char = np.array([char]) # 輸入一個字符,預測下一個字是什么,先轉成numpychar = char.reshape(-1, 1) # 變成二維才符合編碼規范char_encoding = model.onehot_encode(char).A # 對char進行編碼,取成numpy比較方便reshapechar_encoding = char_encoding.reshape(1, 1, -1) # char_encoding.shape為(1, 1, 43)變成三維才符合模型輸入格式char_tensor = torch.tensor(char_encoding, dtype=torch.float32) # 轉成tensorchar_tensor = char_tensor.to(device)out, hidden_size = model(char_tensor, hidden_size) # 放入模型進行預測,out為結果probs = F.softmax(out, dim=1).squeeze() # 計算預測值,即所有字符的概率if top_k is None: # 選擇概率最大的top_k個indices = np.arange(vocab_size)else:probs, indices = probs.topk(top_k)indices = indices.cpu().numpy()probs = probs.cpu().numpy()char_index = np.random.choice(indices, p=probs/probs.sum()) # 隨機選擇一個字符索引作為預測值char = model.int_char[char_index] # 通過索引找出預測字符return char, hidden_size
5.文本生成函數
def sample(model, length, top_k=None, sentence="c"):hidden_size = Nonenew_sentence = [char for char in sentence]for i in range(length):next_char, hidden_size = predict(model, new_sentence[-1], top_k=top_k, hidden_size=hidden_size)new_sentence.append(next_char)return "".join(new_sentence)
6.主函數
def main():hidden_size = 512num_layers = 2batch_size = 128seq_len = 100epochs = 2lr = 0.01f = pd.read_csv("../datasets/dev.tsv", sep="\t", header=None)f = f[0]text = list(f)text = ".".join(text)vocab = np.array(sorted(set(text))) # 建立字典vocab_size = len(vocab)val_len = int(np.floor(0.2 * len(text))) # 劃分訓練測試集trainset = np.array(list(text[:-val_len]))validset = np.array(list(text[-val_len:]))model = lstm_model(vocab, hidden_size, num_layers) # 模型實例化train(model, trainset, batch_size, seq_len, epochs, lr=lr, valid=validset) # 訓練模型model.load_state_dict(torch.load("lstm_model.net")) # 調用保存的模型new_text = sample(model, 100, top_k=5) # 預測模型,生成100個字符,預測時選擇概率最大的前5個print(new_text) # 輸出預測文本if __name__ == "__main__":main()
本代碼還是有很大改進空間,例如進行詞語級的文本生成,以及使用word2vec等引入詞向量等,都可以是的模型獲得更好的效果。
完整代碼
# coding: utf-8
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import OneHotEncoder
import torch.nn.functional as Fclass lstm_model(nn.Module):def __init__(self, vocab, hidden_size, num_layers, dropout=0.5):super(lstm_model, self).__init__()self.vocab = vocab # 字符數據集# 索引,字符self.int_char = {i: char for i, char in enumerate(vocab)}self.char_int = {char: i for i, char in self.int_char.items()}# 對字符進行one-hot encodingself.encoder = OneHotEncoder(sparse=True).fit(vocab.reshape(-1, 1))self.hidden_size = hidden_sizeself.num_layers = num_layers# lstm層self.lstm = nn.LSTM(len(vocab), hidden_size, num_layers, batch_first=True, dropout=dropout)# 全連接層self.linear = nn.Linear(hidden_size, len(vocab))def forward(self, sequence, hs=None):out, hs = self.lstm(sequence, hs) # lstm的輸出格式(batch_size, sequence_length, hidden_size)out = out.reshape(-1, self.hidden_size) # 這里需要將out轉換為linear的輸入格式,即(batch_size * sequence_length, hidden_size)output = self.linear(out) # linear的輸出格式,(batch_size * sequence_length, vocab_size)return output, hsdef onehot_encode(self, data):return self.encoder.transform(data)def onehot_decode(self, data):return self.encoder.inverse_transform(data)def label_encode(self, data):return np.array([self.char_int[ch] for ch in data])def label_decode(self, data):return np.array([self.int_char[ch] for ch in data])def get_batches(data, batch_size, seq_len):''':param data: 源數據,輸入格式(num_samples, num_features):param batch_size: batch的大小:param seq_len: 序列的長度(精度):return: (batch_size, seq_len, num_features)'''num_features = data.shape[1]num_chars = batch_size * seq_len # 一個batch_size的長度num_batches = int(np.floor(data.shape[0] / num_chars)) # 計算出有多少個batchesneed_chars = num_batches * num_chars # 計算出需要的總字符量targets = np.vstack((data[1:].A, data[0].A)) # 可能版本問題,取成numpy比較好reshapeinputs = data[:need_chars].A.astype("int") # 從原始數據data中截取所需的字符數量need_wordstargets = targets[:need_chars]targets = targets.reshape(batch_size, -1, num_features)inputs = inputs.reshape(batch_size, -1, num_features)for i in range(0, inputs.shape[1], seq_len):x = inputs[:, i: i+seq_len]y = targets[:, i: i+seq_len]yield x, y # 節省內存def train(model, data, batch_size, seq_len, epochs, lr=0.01, valid=None):device = 'cuda' if torch.cuda.is_available() else 'cpu'model = model.to(device)optimizer = torch.optim.Adam(model.parameters(), lr=lr)criterion = nn.CrossEntropyLoss()if valid is not None:data = model.onehot_encode(data.reshape(-1, 1))valid = model.onehot_encode(valid.reshape(-1, 1))else:data = model.onehot_encode(data.reshape(-1, 1))train_loss = []val_loss = []for epoch in range(epochs):model.train()hs = None # hs等于hidden_size隱藏層節點train_ls = 0.0val_ls = 0.0for x, y in get_batches(data, batch_size, seq_len):optimizer.zero_grad()x = torch.tensor(x).float().to(device)out, hs = model(x, hs)hs = ([h.data for h in hs])y = y.reshape(-1, len(model.vocab))y = model.onehot_decode(y)y = model.label_encode(y.squeeze())y = torch.from_numpy(y).long().to(device)loss = criterion(out, y.squeeze())loss.backward()optimizer.step()train_ls += loss.item()if valid is not None:model.eval()hs = Nonewith torch.no_grad():for x, y in get_batches(valid, batch_size, seq_len):x = torch.tensor(x).float().to(device) # x為一組測試數據,包含batch_size * seq_len個字out, hs = model(x, hs)# out.shape輸出為tensor[batch_size * seq_len, vocab_size]hs = ([h.data for h in hs]) # 更新參數y = y.reshape(-1, len(model.vocab)) # y.shape為(128,100,43),因此需要轉成兩維,每行就代表一個字了,43為字典大小y = model.onehot_decode(y) # y標簽即為測試數據各個字的下一個字,進行one_hot解碼,即變為字符# 但是此時y 是[[..],[..]]形式y = model.label_encode(y.squeeze()) # 因此需要去掉一維才能成功解碼# 此時y為[12...]成為一維的數組,每個代表自己字典里對應字符的字典序y = torch.from_numpy(y).long().to(device)# 這里y和y.squeeze()出來的東西一樣,可能這里沒啥用,不太懂loss = criterion(out, y.squeeze()) # 計算損失值val_ls += loss.item()val_loss.append(np.mean(val_ls))train_loss.append(np.mean(train_ls))print("train_loss:", train_ls)plt.plot(train_loss, label="train_loss")plt.plot(val_loss, label="val loss")plt.title("loop vs epoch")plt.legend()plt.show()model_name = "lstm_model.net"with open(model_name, 'wb') as f: # 訓練完了保存模型torch.save(model.state_dict(), f)def predict(model, char, top_k=None, hidden_size=None):device = 'cuda' if torch.cuda.is_available() else 'cpu'model.to(device)model.eval() # 固定參數with torch.no_grad():char = np.array([char]) # 輸入一個字符,預測下一個字是什么,先轉成numpychar = char.reshape(-1, 1) # 變成二維才符合編碼規范char_encoding = model.onehot_encode(char).A # 對char進行編碼,取成numpy比較方便reshapechar_encoding = char_encoding.reshape(1, 1, -1) # char_encoding.shape為(1, 1, 43)變成三維才符合模型輸入格式char_tensor = torch.tensor(char_encoding, dtype=torch.float32) # 轉成tensorchar_tensor = char_tensor.to(device)out, hidden_size = model(char_tensor, hidden_size) # 放入模型進行預測,out為結果probs = F.softmax(out, dim=1).squeeze() # 計算預測值,即所有字符的概率if top_k is None: # 選擇概率最大的top_k個indices = np.arange(vocab_size)else:probs, indices = probs.topk(top_k)indices = indices.cpu().numpy()probs = probs.cpu().numpy()char_index = np.random.choice(indices, p=probs/probs.sum()) # 隨機選擇一個字符索引作為預測值char = model.int_char[char_index] # 通過索引找出預測字符return char, hidden_sizedef sample(model, length, top_k=None, sentence="c"):hidden_size = Nonenew_sentence = [char for char in sentence]for i in range(length):next_char, hidden_size = predict(model, new_sentence[-1], top_k=top_k, hidden_size=hidden_size)new_sentence.append(next_char)return "".join(new_sentence)def main():hidden_size = 512num_layers = 2batch_size = 128seq_len = 100epochs = 2lr = 0.01f = pd.read_csv("../datasets/dev.tsv", sep="\t", header=None)f = f[0]text = list(f)text = ".".join(text)vocab = np.array(sorted(set(text))) # 建立字典vocab_size = len(vocab)val_len = int(np.floor(0.2 * len(text))) # 劃分訓練測試集trainset = np.array(list(text[:-val_len]))validset = np.array(list(text[-val_len:]))model = lstm_model(vocab, hidden_size, num_layers) # 模型實例化train(model, trainset, batch_size, seq_len, epochs, lr=lr, valid=validset) # 訓練模型model.load_state_dict(torch.load("lstm_model.net")) # 調用保存的模型new_text = sample(model, 100, top_k=5) # 預測模型,生成100個字符,預測時選擇概率最大的前5個print(new_text) # 輸出預測文本if __name__ == "__main__":main()
總結
這個案例他數據預處理的時候,一個序列對應一個序列的關系,例如abcd對應的標簽為dabc,而不是一個字符,因此可能后面進行了某些操作使得他變成一個字符對應一個字符標簽的操作了吧,從而使得預測的時候,只能通過一個字符預測其后面的字符,這就有點失去循環神經網絡精髓的味道了,感覺是割裂字符之間的關系,變成一個普通單純的分類了。
循環神經網絡,因為能夠處理序列位置的信息,需要設定一個滑動窗口值,或者說時間步長什么的,作用應該就是保留序列特征,例如abcdef為訓練數據,設置滑動窗口為3的話,那么按照正常的序列思路可以劃分為abc-d、bcd-e、cde-f作為訓練數據的形式,即連續的三個字符對應的標簽為其后面一個字符,那么我們訓練出來的模型也是需要輸入三個字符,然后生成一個字符,再用預測出來的字符加上他前面兩個字符再預測新的字符,例如預測的初始序列為abc加入abc預測出來d,那么下一次預測就是bcd作為輸入,就像一個窗口一步一步滑動過去一樣,窗口的大小就為開始設定的3。
因此對于這個案例,它雖然seq_len=100,即滑動窗口為100,但是它的訓練數據就不太對,而且模型預測時也是一個字符預測下一個字符,并沒有體現滑動窗口的思想,因此這個代碼案例大家可以自己進行改進優化。
下面是對代碼部分地方的改動,使得它能夠按滑動窗口的思維來進行訓練和預測
數據預處理函數
def get_batches(data, batch_size, seq_len):''':param data: 源數據,輸入格式(num_samples, num_features):param batch_size: batch的大小:param seq_len: 序列的長度(精度):return: (batch_size, seq_len, num_features)'''num_features = data.shape[1]num_chars = batch_size * seq_len # 一個batch_size的長度num_batches = int(np.floor(data.shape[0] / num_chars)) # 計算出有多少個batchesneed_chars = num_batches * num_chars # 計算出需要的總字符量targets = np.vstack((data[1:].A, data[0].A)) # 可能版本問題,取成numpy比較好reshapeinputs = data[:need_chars].A.astype("int") # 從原始數據data中截取所需的字符數量need_wordstargets = targets[:need_chars]train_data = np.zeros((inputs.shape[0] - seq_len, seq_len, num_features))train_label = np.zeros((inputs.shape[0] - seq_len, num_features))for i in range(0, inputs.shape[0] - seq_len, 1):# inputs就是字符數 * 詞向量大小(表示一個字符)# 思路就是abcd中ab-c, bc-d,一共4-3+1個train_data[i] = inputs[i:i+seq_len] # 每seq_len=100的字符train_label[i] = inputs[i+seq_len-1] # 訓練標簽就為他后面那個字符print(train_data.shape)print(train_label.shape)for i in range(0, inputs.shape[0] - seq_len, batch_size): x = train_data[i:i+batch_size] # 每batch_size=128個一起進行訓練更新參數y = train_label[i:i+batch_size] # 對應的128個標簽print(x.shape)print(y.shape)print("-----------")yield x, y
模型構建部分
class lstm_model(nn.Module):def __init__(self, vocab, hidden_size, num_layers, dropout=0.5, seq_len=100):super(lstm_model, self).__init__()self.seq_len = seq_lenself.vocab = vocab # 字符數據集# 索引,字符self.int_char = {i: char for i, char in enumerate(vocab)}self.char_int = {char: i for i, char in self.int_char.items()}# 對字符進行one-hot encodingself.encoder = OneHotEncoder(sparse=True).fit(vocab.reshape(-1, 1))self.hidden_size = hidden_sizeself.num_layers = num_layers# lstm層self.lstm = nn.LSTM(len(vocab), hidden_size, num_layers, batch_first=True, dropout=dropout)# 全連接層self.linear = nn.Linear(hidden_size, len(vocab))def forward(self, sequence, hs=None):# print("==========")# print("forward:", sequence.shape)out, hs = self.lstm(sequence, hs) # lstm的輸出格式(batch_size, sequence_length, hidden_size)print("----", out.shape)# out = out.reshape(-1, self.hidden_size) # 這里需要將out轉換為linear的輸入格式,即(batch_size * sequence_length, hidden_size)print("========", out[:, -1].shape)output = self.linear(out[:, -1]) # 只取[bacth_size,hidden_size],即找到batch_size里每個元素的標簽吧print("output-----:", output.shape)return output, hsdef onehot_encode(self, data):return self.encoder.transform(data)def onehot_decode(self, data):return self.encoder.inverse_transform(data)def label_encode(self, data):return np.array([self.char_int[ch] for ch in data])def label_decode(self, data):return np.array([self.int_char[ch] for ch in data])