本專欄深入探究從循環神經網絡(RNN)到Transformer等自然語言處理(NLP)模型的架構,以及基于這些模型構建的應用程序。
本系列文章內容:
- NLP自然語言處理基礎
- 詞嵌入(Word Embeddings)
- 循環神經網絡(RNN)、長短期記憶網絡(LSTM)和門控循環單元(GRU)
3.1 循環神經網絡(RNN)
3.2 長短期記憶網絡(LSTM)(本文)
3.3 門控循環單元(GRU) - 編碼器 - 解碼器架構(Encoder - Decoder Architecture)
- 注意力機制(Attention Mechanism)
- Transformer
- 編寫Transformer代碼
- 雙向編碼器表征來自Transformer(BERT)
- 生成式預訓練Transformer(GPT)
- 大語言模型(LLama)
- Mistral
1. 長短期記憶網絡(LSTMs,Long Short-Term Memory Networks)
在我們之前關于循環神經網絡(RNNs)的討論中,我們了解了它們的設計如何使其能夠有效地處理序列數據。這使得它們非常適合處理數據的序列和上下文至關重要的任務,比如分析時間序列數據或處理自然語言。
現在,我們要探討一種能夠解決傳統循環神經網絡面臨的重大挑戰之一的RNN類型:處理長期數據依賴關系。這就是長短期記憶網絡(LSTMs),其復雜程度更上一層樓。它們使用門控系統來控制信息在網絡中的流動,從而決定在長序列中保留哪些信息以及遺忘哪些信息。
循環神經網絡(RNN)與長短期記憶網絡(LSTM)對比。
a:RNN利用其內部狀態(記憶)來處理輸入序列;
b:長短期記憶(LSTM)網絡是RNN的一種變體,具有額外的長期記憶功能,可記住過去的數據
長短期記憶(LSTM)是循環神經網絡(RNN)家族中的一員,或者說是一種特殊的循環神經網絡。LSTM能夠通過長時間記住重要且相關的信息,來學習長期依賴關系,這是其默認的能力。
讓我們通過一個簡單的故事來剖析LSTM背后的核心思想:
曾經,維克拉姆國王打敗了XYZ國王,但隨后去世了。他的兒子小維克拉姆繼位,英勇作戰,但也在戰斗中犧牲了。他的孫子小小維克拉姆,沒有那么強壯,但憑借智慧最終打敗了XYZ國王,為家族報了仇。
當我們閱讀這個故事或任何一系列事件時,我們的大腦首先會關注眼前的細節。例如,我們會先處理維克拉姆國王的勝利和死亡信息。但隨著更多角色的出現,我們會調整對故事的長期理解,同時記住小維克拉姆和小小維克拉姆的情況。這種對上下文的不斷更新,反映了LSTM的工作方式:當新信息流入時,它們會維護并更新短期記憶和長期記憶。
循環神經網絡難以平衡短期和長期上下文信息。就像我們會清晰地記得電視劇的最新一集,但卻會忘記早期的細節一樣,當新數據到來時,RNN常常會丟失長期信息。LSTM通過創建兩條路徑來解決這個問題,一條用于短期記憶,一條用于長期記憶,這使得模型能夠保留重要信息,并丟棄不太重要的信息。
在LSTM中,信息通過細胞狀態流動,細胞狀態就像一條傳送帶,在向前傳遞有用信息的同時,有選擇地遺忘不相關的細節。與RNN中用新數據覆蓋舊數據不同,LSTM會應用精心設計的數學運算(加法和乘法)來保留關鍵信息。這使得它們能夠有效地對新數據和過去的數據進行優先級排序和管理。
每個細胞狀態取決于三種不同的依賴關系,分別是:
- 先前的細胞狀態(上一個時間步結束時存儲的信息)
- 先前的隱藏狀態(與上一個細胞的輸出相同)
- 當前時間步的輸入(當前時間步的新信息/輸入)
說了這么多,讓我們更詳細地討論LSTM的架構和功能。
2. LSTM架構
循環神經網絡(RNN)的架構是由一連串重復的神經網絡組成。這個重復的模塊具有一個簡單且單一的功能:使用tanh激活函數。
LSTM的架構也和RNN類似,同樣是由一連串重復的模塊/神經網絡組成。但LSTM的重復模塊并非只有一個tanh層,而是包含四個不同的功能。
這四個功能操作之間有著特殊的連接,它們分別是:
- Sigmoid激活函數
- Tanh激活函數
- 逐元素乘法
- 逐元素加法
在整個網絡中,信息以向量的形式進行傳遞。讓我們來討論一下上圖中提到的不同符號的含義:
- 方形框:表示單個神經網絡
- 圓形:表示逐元素操作,意味著操作是逐個元素進行的
- 箭頭標記:表示向量信息從一層傳遞到另一層
- 兩條線合并為一條線:表示連接兩個向量
- 一條線分為兩條線:表示將相同的信息傳遞到兩個不同的操作或層中
首先,讓我們討論一下LSTM架構中的主要功能和操作。
2.1 激活函數和線性操作
Sigmoid函數
Sigmoid函數也被稱為邏輯激活函數。這個函數有一條平滑的“S”形曲線。
Sigmoid函數的輸出結果始終在0到1的范圍內。
Sigmoid激活函數主要用于那些我們必須將概率作為輸出進行預測的模型中。由于任何輸入的概率只存在于0到1的范圍內,所以Sigmoid或邏輯激活函數是正確且最佳的選擇。
Tanh激活函數
Tanh激活函數看起來也與Sigmoid/邏輯函數相似。實際上,它是一個經過縮放的Sigmoid函數。我們可以將tanh函數的公式寫成Sigmoid函數的形式。
Tanh函數的結果值范圍是-1到+1。使用這個tanh函數,我們可以判斷輸入是強正、中性還是負。
逐元素乘法
兩個向量的逐元素乘法是對兩個向量的各個元素分別進行乘法運算。例如:
A = [1,2,3,4]
B = [2,3,4,5]
逐元素乘法結果 : [2,6,12,20]
逐元素加法
兩個向量的逐元素加法是將兩個向量的元素分別相加的過程。例如:
A = [1,2,3,4]
B = [2,3,4,5]
逐元素加法結果 : [3,5,7,9]
2.2 LSTM算法背后的關鍵概念
LSTM的主要獨特之處在于細胞狀態;它就像一條傳送帶,進行一些輕微的線性交互。
這意味著細胞狀態通過加法和乘法等基本運算來傳遞信息;這就是為什么信息能夠沿著細胞狀態平穩流動,與原始信息相比不會有太多變化。
LSTM的細胞狀態或傳送帶是下圖中突出顯示的水平線。
LSTM具有獨特的結構,能夠識別哪些信息是重要的,哪些是不重要的。LSTM可以根據信息的重要性,對細胞狀態進行信息的刪除或添加。這些特殊的結構被稱為門。
門是一種獨特的信息轉換方式,LSTM利用這些門來決定哪些信息需要記住、刪除,以及傳遞到另一層等等。
LSTM會根據這些信息對傳送帶(細胞狀態)進行信息的刪除或添加。每個門都由一個Sigmoid神經網絡層和一個逐元素乘法操作組成。
LSTM有三種門,分別是:
- 遺忘門
- 輸入門
- 輸出門
2.2.1 遺忘門
在LSTM架構的重復模塊中,第一個門是遺忘門。這個門的主要任務是決定哪些信息應該被保留,哪些信息應該被丟棄。
這意味著要決定將哪些信息發送到細胞狀態以便進一步處理。遺忘門的輸入是來自上一個隱藏狀態的信息和當前輸入,它將這兩個狀態的信息結合起來,然后通過Sigmoid函數進行處理。
Sigmoid函數的結果介于0和1之間。如果結果接近0,則表示遺忘;如果結果接近1,則表示保留/記住。
2.2.2 輸入門
LSTM架構中有一個輸入門,用于在遺忘門之后更新細胞狀態信息。輸入門有兩種神經網絡層,一種是Sigmoid層,另一種是Tanh層。這兩個網絡層都以上一個隱藏狀態的信息和當前輸入的信息作為輸入。
Sigmoid網絡層的結果范圍在0到1之間,而Tanh層的結果范圍是-1到1。Sigmoid層決定哪些信息是重要的并需要保留,Tanh層則對網絡進行調節。
在對上一個隱藏狀態信息和當前輸入信息應用Sigmoid函數和Tanh函數之后,我們將兩個輸出相乘。最后,Sigmoid層的輸出將決定從Tanh層的輸出中保留哪些重要信息。
2.2.3 輸出門
LSTM中的最后一個門是輸出門。輸出門的主要任務是決定下一個隱藏狀態中應該包含哪些信息。這意味著輸出層的輸出將作為下一個隱藏狀態的輸入。
輸出門也有兩個神經網絡層,與輸入門相同。但操作有所不同。從輸入門我們得到了更新后的細胞狀態信息。
在這個輸出門中,我們需要將隱藏狀態和當前輸入信息通過Sigmoid層,將更新后的細胞狀態信息通過Tanh層。然后將Sigmoid層和Tanh層的兩個結果相乘。
最終結果將作為輸入發送到下一個隱藏層。
3. LSTM的工作過程
LSTM架構中的首要步驟是決定哪些信息是重要的,哪些信息需要從上一個細胞狀態中丟棄。在LSTM中執行這一過程的第一個門是“遺忘門”。
遺忘門的輸入是上一個時間步的隱藏層信息( h t ? 1 h_{t-1} ht?1?)和當前時間步的輸入( x t x_t xt?),然后將其通過Sigmoid神經網絡層。
結果是以向量形式呈現,包含0和1的值。然后,對上一個細胞狀態( C t ? 1 C_{t-1} Ct?1?)的信息(向量形式)和Sigmoid函數的輸出( f t f_t ft?)進行逐元素乘法操作。
遺忘門的最終輸出中,1表示“完全保留這條信息”,0表示“不保留這條信息”。
接下來的步驟是決定將哪些信息存儲在當前細胞狀態( C t C_t Ct?)中。另一個門會執行這個任務,LSTM架構中的第二個門是“輸入門”。
用新的重要信息更新細胞狀態的整個過程將通過兩種激活函數/神經網絡層來完成,即Sigmoid神經網絡層和Tanh神經網絡層。
首先,Sigmoid網絡層的輸入和遺忘門一樣:上一個時間步的隱藏層信息( h t ? 1 h_{t-1} ht?1?)和當前時間步( x t x_t xt?)。
這個過程決定了我們將更新哪些值。然后,Tanh神經網絡層也接收與Sigmoid神經網絡層相同的輸入。它以向量( C ~ t \tilde{C}_t C~t?)的形式創建新的候選值,以調節網絡。
現在,我們對Sigmoid層和Tanh層的輸出進行逐元素乘法操作。之后,我們需要對遺忘門的輸出和輸入門中逐元素乘法的結果進行逐元素加法操作,以更新當前細胞狀態信息( C t C_t Ct?)。
LSTM架構中的最后一步是決定將哪些信息作為輸出;在LSTM中執行這一過程的最后一個門是“輸出門”。這個輸出將基于我們的細胞狀態,但會是經過篩選的版本。
在這個門中,我們首先應用Sigmoid神經網絡,它的輸入和之前門的Sigmoid層一樣:上一個時間步的隱藏層信息( h t ? 1 h_{t-1} ht?1?)和當前時間輸入( x t x_t xt?),以決定細胞狀態信息的哪些部分將作為輸出。
然后將更新后的細胞狀態信息通過Tanh神經網絡層進行調節(將值壓縮到-1和1之間),然后對Sigmoid神經網絡層和Tanh神經網絡層的兩個結果進行逐元素乘法操作。
整個過程在LSTM架構的每個模塊中重復進行。
4. LSTM架構的類型
LSTM是解決或處理序列預測問題的一個非常有趣的切入點。根據LSTM網絡作為層的使用方式,我們可以將LSTM架構分為多種類型。
本節將討論最常用的五種不同類型的LSTM架構,它們分別是:
- 普通LSTM(Vanilla LSTM):普通LSTM架構是基本的LSTM架構,它只有一個隱藏層和一個輸出層來預測結果。
- 堆疊式LSTM(Stacked LSTM):堆疊式LSTM架構是一種將多個LSTM層壓縮成一個列表的LSTM網絡模型,也被稱為深度LSTM網絡模型。在這種架構中,每個LSTM層預測輸出序列并將其發送到下一個LSTM層,而不是預測單個輸出值。然后,最后一個LSTM層預測單個輸出。
- 卷積神經網絡與LSTM結合的架構(CNN LSTM):CNN LSTM架構是CNN和LSTM架構的結合。這種架構使用CNN網絡層從輸入中提取關鍵特征,然后將這些特征發送到LSTM層,以支持序列預測。這種架構的一個應用示例是為輸入圖像或圖像序列(如視頻)生成文本描述。
- 編碼器-解碼器LSTM(Encoder-Decoder LSTM):編碼器-解碼器LSTM架構是一種特殊的LSTM架構,主要用于解決序列到序列的問題,如機器翻譯、語音識別等。編碼器-解碼器LSTM的另一個名稱是seq2seq(序列到序列)。序列到序列的問題在自然語言處理領域是具有挑戰性的問題,因為在這些問題中,輸入和輸出項的數量可能會有所不同。編碼器-解碼器LSTM架構有一個編碼器,用于將輸入轉換為中間編碼器向量。然后,一個解碼器將中間編碼器向量轉換為最終結果。編碼器和解碼器都是堆疊式的LSTM。
- 雙向LSTM(Bidirectional LSTM):雙向LSTM架構是傳統LSTM架構的擴展。這種架構更適合于序列分類問題,如情感分類、意圖分類等。雙向LSTM架構使用兩個LSTM,而不是一個LSTM,一個用于前向(從左到右),另一個用于后向(從右到左)。這種架構比傳統的LSTM能夠為網絡提供更多的上下文信息,因為它會從單詞的左右兩側收集信息。這將提高序列分類問題的性能。
5. 用Python從零構建LSTM
在本節中,我們將參考本文前面介紹的數學基礎和概念,逐步剖析在Python中實現LSTM的過程。我們將使用谷歌股票數據來訓練我們從零構建的模型。該數據集是從Kaggle上獲取的,可免費用于商業用途。
5.1 導入庫和初始設置
numpy
(np
)和pandas
(pd
):用于所有數組和數據幀操作,這在任何類型的數值計算中都是基礎操作,尤其是在神經網絡的實現中。WeightInitializer
、PlotManager
和EarlyStopping
類是自定義類。
WeightInitializer類:
import numpy as np
import pandas as pdfrom src.model import WeightInitializer
from src.trainer import PlotManager, EarlyStoppingclass WeightInitializer:def __init__(self, method='random'):self.method = methoddef initialize(self, shape):if self.method == 'random':return np.random.randn(*shape)elif self.method == 'xavier':return np.random.randn(*shape) / np.sqrt(shape[0])elif self.method == 'he':return np.random.randn(*shape) * np.sqrt(2 / shape[0])elif self.method == 'uniform':return np.random.uniform(-1, 1, shape)else:raise ValueError(f'Unknown initialization method: {self.method}')
WeightInitializer
是一個自定義類,用于處理權重的初始化。這一點至關重要,因為不同的初始化方法會顯著影響LSTM的收斂行為。
PlotManager類:
class PlotManager:def __init__(self):self.fig, self.ax = plt.subplots(3, 1, figsize=(6, 4))def plot_losses(self, train_losses, val_losses):self.ax.plot(train_losses, label='Training Loss')self.ax.plot(val_losses, label='Validation Loss')self.ax.set_title('Training and Validation Losses')self.ax.set_xlabel('Epoch')self.ax.set_ylabel('Loss')self.ax.legend()def show_plots(self):plt.tight_layout()
這是src.trainer
中的實用類,用于管理繪圖,它使我們能夠繪制訓練損失和驗證損失的圖像。
EarlyStopping類:
class EarlyStopping:"""Early stopping to stop the training when the loss does not improve afterArgs:-----patience (int): Number of epochs to wait before stopping the training.verbose (bool): If True, prints a message for each epoch where the lossdoes not improve.delta (float): Minimum change in the monitored quantity to qualify as an improvement."""def __init__(self, patience=7, verbose=False, delta=0):self.patience = patienceself.verbose = verboseself.counter = 0self.best_score = Noneself.early_stop = Falseself.delta = deltadef __call__(self, val_loss):"""Determines if the model should stop training.Args:val_loss (float): The loss of the model on the validation set."""score = -val_lossif self.best_score is None:self.best_score = scoreelif score < self.best_score + self.delta:self.counter += 1if self.counter >= self.patience:self.early_stop = Trueelse:self.best_score = scoreself.counter = 0
這是src.trainer
中的實用類,用于在訓練過程中處理提前停止操作,以防止過擬合。你可以在這篇文章中了解更多關于EarlyStopping
的信息,以及它的功能對深度神經網絡為何極其有用。
5.2 LSTM類
首先,讓我們看看整個類的樣子,然后將其分解為更易于管理的步驟:
class LSTM:"""Long Short-Term Memory (LSTM) network.Parameters:- input_size: int, dimensionality of input space- hidden_size: int, number of LSTM units- output_size: int, dimensionality of output space- init_method: str, weight initialization method (default: 'xavier')"""def __init__(self, input_size, hidden_size, output_size, init_method='xavier'):self.input_size = input_sizeself.hidden_size = hidden_sizeself.output_size = output_sizeself.weight_initializer = WeightInitializer(method=init_method)# Initialize weightsself.wf = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))self.wi = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))self.wo = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))self.wc = self.weight_initializer.initialize((hidden_size, hidden_size + input_size))# Initialize biasesself.bf = np.zeros((hidden_size, 1))self.bi = np.zeros((hidden_size, 1))self.bo = np.zeros((hidden_size, 1))self.bc = np.zeros((hidden_size, 1))# Initialize output layer weights and biasesself.why = self.weight_initializer.initialize((output_size, hidden_size))self.by = np.zeros((output_size, 1))@staticmethoddef sigmoid(z):"""Sigmoid activation function.Parameters:- z: np.ndarray, input to the activation functionReturns:- np.ndarray, output of the activation function"""return 1 / (1 + np.exp(-z))@staticmethoddef dsigmoid(y):"""Derivative of the sigmoid activation function.Parameters:- y: np.ndarray, output of the sigmoid activation functionReturns:- np.ndarray, derivative of the sigmoid function"""return y * (1 - y)@staticmethoddef dtanh(y):"""Derivative of the hyperbolic tangent activation function.Parameters:- y: np.ndarray, output of the hyperbolic tangent activation functionReturns:- np.ndarray, derivative of the hyperbolic tangent function"""return 1 - y * ydef forward(self, x):"""Forward pass through the LSTM network.Parameters:- x: np.ndarray, input to the networkReturns:- np.ndarray, output of the network- list, caches containing intermediate values for backpropagation"""caches = []h_prev = np.zeros((self.hidden_size, 1))c_prev = np.zeros((self.hidden_size, 1))h = h_prevc = c_prevfor t in range(x.shape[0]):x_t = x[t].reshape(-1, 1)combined = np.vstack((h_prev, x_t))f = self.sigmoid(np.dot(self.wf, combined) + self.bf)i = self.sigmoid(np.dot(self.wi, combined) + self.bi)o = self.sigmoid(np.dot(self.wo, combined) + self.bo)c_ = np.tanh(np.dot(self.wc, combined) + self.bc)c = f * c_prev + i * c_h = o * np.tanh(c)cache = (h_prev, c_prev, f, i, o, c_, x_t, combined, c, h)caches.append(cache)h_prev, c_prev = h, cy = np.dot(self.why, h) + self.byreturn y, cachesdef backward(self, dy, caches, clip_value=1.0):"""Backward pass through the LSTM network.Parameters:- dy: np.ndarray, gradient of the loss with respect to the output- caches: list, caches from the forward pass- clip_value: float, value to clip gradients to (default: 1.0)Returns:- tuple, gradients of the loss with respect to the parameters"""dWf, dWi, dWo, dWc = [np.zeros_like(w) for w in (self.wf, self.wi, self.wo, self.wc)]dbf, dbi, dbo, dbc = [np.zeros_like(b) for b in (self.bf, self.bi, self.bo, self.bc)]dWhy = np.zeros_like(self.why)dby = np.zeros_like(self.by)# Ensure dy is reshaped to match output sizedy = dy.reshape(self.output_size, -1)dh_next = np.zeros((self.hidden_size, 1)) # shape must match hidden_sizedc_next = np.zeros_like(dh_next)for cache in reversed(caches):h_prev, c_prev, f, i, o, c_, x_t, combined, c, h = cache# Add gradient from next step to current output gradientdh = np.dot(self.why.T, dy) + dh_nextdc = dc_next + (dh * o * self.dtanh(np.tanh(c)))df = dc * c_prev * self.dsigmoid(f)di = dc * c_ * self.dsigmoid(i)do = dh * self.dtanh(np.tanh(c))dc_ = dc * i * self.dtanh(c_)dcombined_f = np.dot(self.wf.T, df)dcombined_i = np.dot(self.wi.T, di)dcombined_o = np.dot(self.wo.T, do)dcombined_c = np.dot(self.wc.T, dc_)dcombined = dcombined_f + dcombined_i + dcombined_o + dcombined_cdh_next = dcombined[:self.hidden_size]dc_next = f * dcdWf += np.dot(df, combined.T)dWi += np.dot(di, combined.T)dWo += np.dot(do, combined.T)dWc += np.dot(dc_, combined.T)dbf += df.sum(axis=1, keepdims=True)dbi += di.sum(axis=1, keepdims=True)dbo += do.sum(axis=1, keepdims=True)dbc += dc_.sum(axis=1, keepdims=True)dWhy += np.dot(dy, h.T)dby += dygradients = (dWf, dWi, dWo, dWc, dbf, dbi, dbo, dbc, dWhy, dby)# Gradient clippingfor i in range(len(gradients)):np.clip(gradients[i], -clip_value, clip_value, out=gradients[i])return gradientsdef update_params(self, grads, learning_rate):"""Update the parameters of the network using the gradients.Parameters:- grads: tuple, gradients of the loss with respect to the parameters- learning_rate: float, learning rate"""dWf, dWi, dWo, dWc, dbf, dbi, dbo, dbc, dWhy, dby = gradsself.wf -= learning_rate * dWfself.wi -= learning_rate * dWiself.wo -= learning_rate * dWoself.wc -= learning_rate * dWcself.bf -= learning_rate * dbfself.bi -= learning_rate * dbiself.bo -= learning_rate * dboself.bc -= learning_rate * dbcself.why -= learning_rate * dWhyself.by -= learning_rate * dby
初始化
__init__
方法使用指定的輸入層、隱藏層和輸出層的大小來初始化一個LSTM實例,并選擇一種權重初始化方法。
為門(遺忘門wf
、輸入門wi
、輸出門wo
和細胞門wc
)以及將最后一個隱藏狀態連接到輸出的權重(why
)初始化權重。通常選擇Xavier初始化方法,因為它是保持各層激活值方差的一個很好的默認選擇。
所有門和輸出層的偏置都初始化為零。這是一種常見的做法,盡管有時會添加小的常數以避免在開始時出現神經元死亡的情況。
前向傳播方法
我們首先將上一個隱藏狀態h_prev
和細胞狀態c_prev
設置為零,這對于第一個時間步來說是典型的做法。
def forward(self, x):
輸入x
按時間步進行處理,每個時間步都會更新門的激活值、細胞狀態和隱藏狀態。
for t in range(x.shape[0]):x_t = x[t].reshape(-1, 1)combined = np.vstack((h_prev, x_t))
在每個時間步,輸入和上一個隱藏狀態垂直堆疊,形成一個用于矩陣運算的單個組合輸入。這對于一次性高效地執行線性變換至關重要。
f = self.sigmoid(np.dot(self.wf, combined) + self.bf)
i = self.sigmoid(np.dot(self.wi, combined) + self.bi)
o = self.sigmoid(np.dot(self.wo, combined) + self.bo)
c_ = np.tanh(np.dot(self.wc, combined) + self.bc)c = f * c_prev + i * c_
h = o * np.tanh(c)
每個門(遺忘門、輸入門、輸出門)使用Sigmoid函數計算其激活值,這會影響細胞狀態和隱藏狀態的更新方式。
在這里,遺忘門(f
)決定保留上一個細胞狀態的多少。
輸入門(i
)決定添加多少新的候選細胞狀態(c_
)。
最后,輸出門(o
)計算將細胞狀態的哪一部分作為隱藏狀態輸出。
細胞狀態作為上一個狀態和新候選狀態的加權和進行更新。隱藏狀態是通過將更新后的細胞狀態通過一個Tanh函數,然后用輸出門進行門控得到的。
cache = (h_prev, c_prev, f, i, o, c_, x_t, combined, c, h)
caches.append(cache)
我們將反向傳播所需的相關值存儲在cache
中,這包括狀態、門激活值和輸入。
y = np.dot(self.why, h) + self.by
最后,輸出y
計算為最后一個隱藏狀態的線性變換。該方法返回輸出和緩存的值,以便在反向傳播期間使用。
反向傳播方法
此方法用于計算損失函數關于LSTM權重和偏置的梯度。在訓練過程中,這些梯度對于更新模型參數是必不可少的。
def backward(self, dy, caches, clip_value=1.0):dWf, dWi, dWo, dWc = [np.zeros_like(w) for w in (self.wf, self.wi, self.wo, self.wc)]dbf, dbi, dbo, dbc = [np.zeros_like(b) for b in (self.bf, self.bi, self.bo, self.bc)]dWhy = np.zeros_like(self.why)dby = np.zeros_like(self.by)
所有權重(dWf
、dWi
、dWo
、dWc
、dWhy
)和偏置(dbf
、dbi
、dbo
、dbc
、dby
)的梯度都初始化為零。這是必要的,因為梯度是在序列的每個時間步上累加的。
dy = dy.reshape(self.output_size, -1)
dh_next = np.zeros((self.hidden_size, 1))
dc_next = np.zeros_like(dh_next)
在這里,我們確保dy
的形狀適合進行矩陣運算。dh_next
和dc_next
存儲從后續時間步反向傳播回來的梯度。
for cache in reversed(caches):h_prev, c_prev, f, i, o, c_, x_t, combined, c, h = cache
從緩存中提取每個時間步的LSTM狀態和門的激活值。處理從最后一個時間步開始并向前推進(reversed(caches)
),這對于在循環神經網絡中正確應用鏈式法則(通過時間的反向傳播 - BPTT)至關重要。
dh = np.dot(self.why.T, dy) + dh_next
dc = dc_next + (dh * o * self.dtanh(np.tanh(c)))
df = dc * c_prev * self.dsigmoid(f)
di = dc * c_ * self.dsigmoid(i)
do = dh * self.dtanh(np.tanh(c))
dc_ = dc * i * self.dtanh(c_)
dh
和dc
分別是損失關于隱藏狀態和細胞狀態的梯度。每個門(df
、di
、do
)和候選細胞狀態(dc_
)的梯度使用鏈式法則計算,涉及Sigmoid(dsigmoid
)和雙曲正切(dtanh
)函數的導數,這些在門控機制部分已經討論過。
dWf += np.dot(df, combined.T)
dWi += np.dot(di, combined.T)
dWo += np.dot(do, combined.T)
dWc += np.dot(dc_, combined.T)
dbf += df.sum(axis=1, keepdims=True)
dbi += di.sum(axis=1, keepdims=True)
dbo += do.sum(axis=1, keepdims=True)
dbc += dc_.sum(axis=1, keepdims=True)
這些代碼行將每個權重和偏置在所有時間步上的梯度進行累加。
for i in range(len(gradients)):np.clip(gradients[i], -clip_value, clip_value, out=gradients[i])
為了防止梯度爆炸,我們將梯度裁剪到指定的范圍(clip_value
),這是訓練循環神經網絡時的常見做法。
參數更新方法
def update_params(self, grads, learning_rate):dWf, dWi, dWo, dWc, dbf, dbi, dbo, dbc, dWhy, dby = grads...self.wf -= learning_rate * dWf...
每個權重和偏置通過減去相應梯度的一部分(learning_rate
)來更新。這一步調整模型參數以最小化損失函數。
5.3 訓練與驗證
class LSTMTrainer:"""LSTM網絡的訓練器。參數:- model: LSTM,要訓練的LSTM網絡- learning_rate: float,優化器的學習率- patience: int,在提前停止訓練前等待的輪數- verbose: bool,是否打印訓練信息- delta: float,驗證損失有改善的最小變化量"""def __init__(self, model, learning_rate=0.01, patience=7, verbose=True, delta=0):self.model = modelself.learning_rate = learning_rateself.train_losses = []self.val_losses = []self.early_stopping = EarlyStopping(patience, verbose, delta)def train(self, X_train, y_train, X_val=None, y_val=None, epochs=10, batch_size=1, clip_value=1.0):"""訓練LSTM網絡。參數:- X_train: np.ndarray,訓練數據- y_train: np.ndarray,訓練標簽- X_val: np.ndarray,驗證數據- y_val: np.ndarray,驗證標簽- epochs: int,訓練輪數- batch_size: int,小批量的大小- clip_value: float,梯度裁剪的值"""for epoch in range(epochs):epoch_losses = []for i in range(0, len(X_train), batch_size):batch_X = X_train[i:i + batch_size]batch_y = y_train[i:i + batch_size]losses = []for x, y_true in zip(batch_X, batch_y):y_pred, caches = self.model.forward(x)loss = self.compute_loss(y_pred, y_true.reshape(-1, 1))losses.append(loss)# 反向傳播以獲取梯度dy = y_pred - y_true.reshape(-1, 1)grads = self.model.backward(dy, caches, clip_value=clip_value)self.model.update_params(grads, self.learning_rate)batch_loss = np.mean(losses)epoch_losses.append(batch_loss)avg_epoch_loss = np.mean(epoch_losses)self.train_losses.append(avg_epoch_loss)if X_val is not None and y_val is not None:val_loss = self.validate(X_val, y_val)self.val_losses.append(val_loss)print(f'輪數 {epoch + 1}/{epochs} - 損失: {avg_epoch_loss:.5f}, 驗證損失: {val_loss:.5f}')# 檢查提前停止條件self.early_stopping(val_loss)if self.early_stopping.early_stop:print("提前停止")breakelse:print(f'輪數 {epoch + 1}/{epochs} - 損失: {avg_epoch_loss:.5f}')def compute_loss(self, y_pred, y_true):"""計算均方誤差損失。"""return np.mean((y_pred - y_true) ** 2)def validate(self, X_val, y_val):"""在單獨的數據集上驗證模型。"""val_losses = []for x, y_true in zip(X_val, y_val):y_pred, _ = self.model.forward(x)loss = self.compute_loss(y_pred, y_true.reshape(-1, 1))val_losses.append(loss)return np.mean(val_losses)
訓練器會在多個輪次中協調訓練過程,處理數據批次,并可選擇對模型進行驗證。
for epoch in range(epochs):...for i in range(0, len(X_train), batch_size):...for x, y_true in zip(batch_X, batch_y):y_pred, caches = self.model.forward(x)...
每一批數據都會輸入到模型中。前向傳播會生成預測結果,并緩存用于反向傳播的中間值。
dy = y_pred - y_true.reshape(-1, 1)
grads = self.model.backward(dy, caches, clip_value=clip_value)
self.model.update_params(grads, self.learning_rate)
計算損失后,使用關于預測誤差的梯度(dy
)進行反向傳播。得到的梯度用于更新模型參數。
print(f'輪數 {epoch + 1}/{epochs} - 損失: {avg_epoch_loss:.5f}')
記錄訓練進度,以幫助長期監控模型的性能。
5.4 數據預處理
class TimeSeriesDataset:"""時間序列數據的數據集類。參數:- ticker: str,股票代碼- start_date: str,數據檢索的開始日期- end_date: str,數據檢索的結束日期- look_back: int,每個樣本中包含的前幾個時間步的數量- train_size: float,用于訓練的數據比例"""def __init__(self, start_date, end_date, look_back=1, train_size=0.67):self.start_date = start_dateself.end_date = end_dateself.look_back = look_backself.train_size = train_sizedef load_data(self):"""加載股票數據。返回:- np.ndarray,訓練數據- np.ndarray,測試數據"""df = pd.read_csv('data/google.csv')df = df[(df['Date'] >= self.start_date) & (df['Date'] <= self.end_date)]df = df.sort_index()df = df.loc[self.start_date:self.end_date]df = df[['Close']].astype(float) # 使用收盤價df = self.MinMaxScaler(df.values) # 將DataFrame轉換為numpy數組train_size = int(len(df) * self.train_size)train, test = df[0:train_size,:], df[train_size:len(df),:]return train, testdef MinMaxScaler(self, data):"""對數據進行最小 - 最大縮放。參數:- data: np.ndarray,輸入數據"""numerator = data - np.min(data, 0)denominator = np.max(data, 0) - np.min(data, 0)return numerator / (denominator + 1e-7)def create_dataset(self, dataset):"""創建用于時間序列預測的數據集。參數:- dataset: np.ndarray,輸入數據返回:- np.ndarray,輸入數據- np.ndarray,輸出數據"""dataX, dataY = [], []for i in range(len(dataset)-self.look_back):a = dataset[i:(i + self.look_back), 0]dataX.append(a)dataY.append(dataset[i + self.look_back, 0])return np.array(dataX), np.array(dataY)def get_train_test(self):"""獲取訓練和測試數據。返回:- np.ndarray,訓練輸入- np.ndarray,訓練輸出- np.ndarray,測試輸入- np.ndarray,測試輸出"""train, test = self.load_data()trainX, trainY = self.create_dataset(train)testX, testY = self.create_dataset(test)return trainX, trainY, testX, testY
這個類負責獲取數據并將其預處理成適合訓練LSTM的格式,包括縮放數據以及將其劃分為訓練集和測試集。
5.5 模型訓練
現在,讓我們利用上述定義的所有代碼來加載數據集、對其進行預處理,并訓練我們的LSTM模型。
首先,讓我們加載數據集:
# 實例化數據集
dataset = TimeSeriesDataset('2010-1-1', '2020-12-31', look_back=1)
trainX, trainY, testX, testY = dataset.get_train_test()
在這個實例中,它被配置為從Kaggle獲取谷歌(GOOGL)從2010年1月1日到2020年12月31日的歷史數據。
look_back = 1
:這個參數設置了每個輸入樣本中包含的過去時間步的數量。在這里,每個輸入樣本將包含前一個時間步的數據,這意味著模型將使用一天的數據來預測下一天的數據。
get_train_test()
:這個方法會處理獲取到的數據,對其進行歸一化,并將其劃分為訓練集和測試集。這對于在數據的一部分上訓練模型,并在另一部分上驗證其性能以檢查是否過擬合是至關重要的。
# 重塑輸入,使其形狀為 [樣本數, 時間步, 特征數]
trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1))
testX = np.reshape(testX, (testX.shape[0], testX.shape[1], 1))
這個重塑步驟將數據格式調整為LSTM所期望的格式。LSTM要求輸入的形狀為[樣本數, 時間步, 特征數]
。
這里:
- 樣本數:數據點的數量。
- 時間步:每個樣本中的時間步數量(
look_back
)。 - 特征數:每個時間步的特征數量(在這種情況下為1,因為我們可能只關注像收盤價這樣的一維數據)。
look_back = 1 # 每個樣本中包含的前幾個時間步的數量
hidden_size = 256 # LSTM單元的數量
output_size = 1 # 輸出空間的維度lstm = LSTM(input_size=1, hidden_size=hidden_size, output_size=output_size)
在這段代碼中:
hidden_size
:隱藏層中LSTM單元的數量,這里設置為256。這決定了模型的容量,更多的單元可能會捕捉到更復雜的模式,但也需要更多的計算能力和數據來有效訓練。output_size
:輸出維度,這里為1,表明模型每個輸入樣本預測一個單一的值,例如第二天的股票價格。
trainer = LSTMTrainer(lstm, learning_rate=1e-3, patience=50, verbose=True, delta=0.001)
trainer.train(trainX, trainY, testX, testY, epochs=1000, batch_size=32)
這里我們將學習率設置為1e - 3
(0.001)。學習率過高會導致模型過快收斂到一個次優解,而過低則會使訓練過程變慢,甚至可能陷入停滯。我們還將patience
設置為50,如果驗證損失在50個輪次內沒有改善,模型訓練將停止。
train()
方法會在指定的輪數和批次大小下執行訓練過程。在訓練期間,模型會每10個輪次打印一次模型性能,輸出結果類似于:
輪數 1/1000 - 損失: 0.25707, 驗證損失: 0.43853
輪數 11/1000 - 損失: 0.06463, 驗證損失: 0.06056
輪數 21/1000 - 損失: 0.05313, 驗證損失: 0.02100
輪數 31/1000 - 損失: 0.04862, 驗證損失: 0.01134
輪數 41/1000 - 損失: 0.04512, 驗證損失: 0.00678
輪數 51/1000 - 損失: 0.04234, 驗證損失: 0.00395
輪數 61/1000 - 損失: 0.04014, 驗證損失: 0.00210
輪數 71/1000 - 損失: 0.03840, 驗證損失: 0.00095
輪數 81/1000 - 損失: 0.03703, 驗證損失: 0.00031
輪數 91/1000 - 損失: 0.03595, 驗證損失: 0.00004
輪數 101/1000 - 損失: 0.03509, 驗證損失: 0.00003
輪數 111/1000 - 損失: 0.03442, 驗證損失: 0.00021
輪數 121/1000 - 損失: 0.03388, 驗證損失: 0.00051
輪數 131/1000 - 損失: 0.03346, 驗證損失: 0.00090
輪數 141/1000 - 損失: 0.03312, 驗證損失: 0.00133
提前停止
最后,讓我們繪制訓練損失和驗證損失,以便更好地了解模型可能的收斂或發散情況。我們可以使用以下代碼實現:
plot_manager = PlotManager()# 在訓練循環中
plot_manager.plot_losses(trainer.train_losses, trainer.val_losses)# 在訓練循環結束后
plot_manager.show_plots()
這將繪制一個類似于下面的圖表:
從圖表中我們可以看到,在早期輪次中,訓練損失和驗證損失都迅速下降,這表明我們的初始化技術(Xavier)可能不太適合這個任務。盡管在大約90個輪次后觸發了提前停止,取得了一些令人印象深刻的性能,但我們可以嘗試降低學習率并增加訓練輪數。此外,我們還可以嘗試使用其他技術,如學習率調度器或Adam優化器。