目錄
一、從 RNN 到 DRNN:為什么需要 “深度”?
二、DRNN 的核心結構
1. 時間維度:循環傳遞
2. 空間維度:多層隱藏層
3. 雙向 DRNN(Bidirectional DRNN)
三、DRNN 的關鍵挑戰與優化
1. 梯度消失 / 爆炸
2. 訓練不穩定
3. 計算復雜度
四、DRNN 的典型應用場景
五、DRNN 與其他模型的對比
六、深度循環神經網絡結構圖
七、完整代碼
八、實驗結果
深度循環神經網絡(Deep Recurrent Neural Networks, DRNN)是循環神經網絡(RNN)的深度擴展形式,其核心是在序列數據處理中引入多層隱藏結構,以捕捉更復雜的時序特征和層次化信息。相較于淺層 RNN,DRNN 能處理更復雜的序列任務(如長文本理解、語音識別、視頻分析等),因為它可以分層提取從低級到高級的特征(如語音中的 “聲波→音素→單詞→語義”)。
一、從 RNN 到 DRNN:為什么需要 “深度”?
要理解 DRNN,需先明確 RNN 的基礎邏輯:RNN 通過隱藏狀態(hidden state)?保存歷史信息,實現對序列數據(如文本、語音、視頻幀)的建模。但其局限性在于:
- 淺層 RNN(單隱藏層)只能捕捉單一層次的時序特征,難以處理包含多尺度結構的復雜序列(如語言中 “字母→詞→短語→句子” 的層級關系);
- 對于長序列或高維度輸入(如視頻幀的像素級數據),淺層網絡的特征提取能力不足,容易出現 “欠擬合”。
DRNN 的核心改進是在時間步內堆疊多個隱藏層,讓每一層專注于提取不同層次的特征(低層處理局部細節,高層處理抽象全局信息)。例如:在語音識別中,底層可能提取聲波的頻率特征,中層轉換為音素特征,高層聚合為單詞或語義。
二、DRNN 的核心結構
DRNN 的 “深度” 體現在同一時間步內的多層隱藏層堆疊,結合時間維度的循環結構,形成 “空間深度 + 時間循環” 的復合模型。其基本結構可拆解為以下要素:
1. 時間維度:循環傳遞
與 RNN 一致,DRNN 在時間上展開,每個時間步的輸入依賴前序時間步的信息。設序列輸入為?(T?為序列長度),則第?t?時間步的處理與?
相關。
2. 空間維度:多層隱藏層
在每個時間步?t?內,DRNN 包含?L?個隱藏層(),層與層之間垂直堆疊:
- 第 1 層(底層)接收當前時間步的輸入
?和上一時間步第 1 層的隱藏狀態?
,輸出
;
- 第 2 層接收第 1 層的輸出?
和上一時間步第 2 層的隱藏狀態?
,輸出
;
- ...
- 第?L?層(頂層)輸出最終隱藏狀態
,用于預測或后續任務(如分類、生成)。
以數學公式表示(假設使用 LSTM/GRU 作為隱藏層單元,緩解梯度問題): 對于第?l?層(),第?t?時間步的隱藏狀態?
?計算為:
其中:
(第 0 層為輸入層);
可為 LSTM、GRU 或改進的門控單元(避免基礎 RNN 的梯度消失)。
3. 雙向 DRNN(Bidirectional DRNN)
為同時利用 “過去” 和 “未來” 的信息(如語音識別中需結合上下文判斷當前單詞),DRNN 可擴展為雙向結構:
- 正向層(Forward Layer):從序列起始到結束傳遞信息(利用過去);
- 反向層(Backward Layer):從序列結束到起始傳遞信息(利用未來);
- 最終輸出為正向頂層和反向頂層的隱藏狀態拼接(
)。
三、DRNN 的關鍵挑戰與優化
深層結構雖提升特征能力,但也帶來訓練難度,核心挑戰及解決方案如下:
1. 梯度消失 / 爆炸
- 問題:深層網絡中,反向傳播時梯度需經過多層傳遞,易因鏈式法則導致梯度衰減(消失)或放大(爆炸),尤其時間維度的循環會加劇這一問題。
- 解決方案:
- 用LSTM/GRU 替代基礎 RNN:門控機制(輸入門、遺忘門、輸出門)可控制信息流動,減少梯度衰減;
- 梯度裁剪(Gradient Clipping):當梯度 norm 超過閾值時,縮放梯度至閾值內,避免爆炸;
- 優化器選擇:Adam、RMSprop 等自適應學習率優化器可動態調整梯度更新幅度,緩解消失。
2. 訓練不穩定
- 問題:深層網絡中,不同層的學習速度差異大(低層可能飽和,高層仍未收斂),導致整體性能波動。
- 解決方案:
- 初始化策略:采用 Xavier/Glorot 初始化(針對 sigmoid/tanh 激活)或 He 初始化(針對 ReLU),使各層梯度方差相近;
- 時序歸一化(Layer Normalization for Sequences):在循環層內對隱藏狀態做歸一化(避免批量歸一化在時序數據中的分布偏移問題),穩定訓練。
3. 計算復雜度
- 問題:深層 + 長序列會導致計算量激增(時間復雜度?
,L?為層數,T?為序列長度,N?為隱藏單元數)。
- 解決方案:
- 簡化隱藏層單元(如用輕量級 GRU 替代 LSTM);
- 截斷長序列(如按固定長度分段);
- 量化訓練(低精度計算)。
四、DRNN 的典型應用場景
DRNN 的多層特征提取能力使其在復雜序列任務中表現突出,典型場景包括:
-
語音識別 從原始聲波(連續序列)中分層提取特征:底層處理頻譜特征,中層轉換為音素,高層聚合為單詞,最終輸出文本。雙向 DRNN 結合 CTC( Connectionist Temporal Classification)損失,可實現端到端語音識別。
-
機器翻譯 處理長句子時,DRNN 的深層結構可捕捉 “詞→短語→句法結構→語義” 的多層關系,配合注意力機制(如 Encoder-Decoder 框架中的深度循環編碼器),提升翻譯準確性。
-
視頻行為分析 視頻幀序列中,底層提取像素 / 運動特征,中層識別局部動作(如 “揮手”),高層判斷全局行為(如 “打招呼”)。DRNN 可建模幀間依賴,實現行為分類或預測。
-
情感分析(長文本) 長文本的情感依賴多層上下文(如 “雖然… 但是…” 的轉折),DRNN 通過深層隱藏狀態捕捉遠距離語義關聯,提升情感極性判斷精度。
五、DRNN 與其他模型的對比
模型 | 核心優勢 | 局限性 |
---|---|---|
淺層 RNN | 計算簡單,適合短序列 | 無法捕捉多層次特征 |
DRNN | 多層特征提取,適合復雜序列 | 訓練復雜,計算量大 |
Transformer | 并行計算(自注意力),長依賴建模 | 對短序列效率低,需大量數據 |
深度 CNN+RNN | 結合空間特征(CNN)與時序特征(RNN) | 難以處理變長序列的動態依賴 |
六、深度循環神經網絡結構圖
七、完整代碼
"""
文件名: 9.3 從零實現深度循環神經網絡(DRNN)
作者: 墨塵
日期: 2025/7/16
項目名: dl_env
備注:
"""# -------------------------------------------------------------------------------------------------------- 基礎工具庫導入 ------------------------------------------------------------------------------
import collections # 用于統計詞頻(構建詞表時需統計每個詞元出現的次數)
import random # 隨機抽樣生成訓練數據(增加數據隨機性,提升模型泛化能力)
import re # 文本清洗(通過正則表達式過濾非目標字符)
import requests # 下載數據集(從網絡獲取《時間機器》文本數據)
from pathlib import Path # 文件路徑處理(創建目錄、檢查文件是否存在等)
from d2l import torch as d2l # 深度學習工具庫(提供訓練輔助、可視化等功能)
import math # 數學運算(計算困惑度等指標)
import torch # PyTorch框架(核心深度學習庫,提供張量運算、自動求導等)
from torch import nn # 神經網絡模塊(提供損失函數、層定義等)
from torch.nn import functional as F # 函數式API(提供激活函數、one-hot編碼等工具)# 圖像顯示相關庫(解決中文和符號顯示問題)
import matplotlib.pyplot as plt
import matplotlib.text as text# --------------------------------------------------------------------------------------------------------核心解決方案:解決文本顯示問題 --------------------------------------------------------------------------------------------------------
def replace_minus(s):"""解決Matplotlib中Unicode減號(U+2212)顯示為方塊的問題原理:將特殊減號替換為普通ASCII減號('-'),確保所有環境都能正常顯示"""if isinstance(s, str): # 僅處理字符串類型return s.replace('\u2212', '-') # 替換Unicode減號為ASCII減號return s # 非字符串直接返回# 重寫matplotlib的Text類的set_text方法,實現全局生效
original_set_text = text.Text.set_text # 保存原始方法(避免覆蓋后無法恢復)def new_set_text(self, s):s = replace_minus(s) # 先處理減號return original_set_text(self, s) # 調用原始方法設置文本text.Text.set_text = new_set_text # 應用重寫后的方法(所有文本顯示都會經過此處理)# -------------------------- 字體配置(確保中文和數學符號正常顯示)--------------------------
plt.rcParams["font.family"] = ["SimHei"] # 設置中文字體(SimHei支持中文顯示,避免中文亂碼)
plt.rcParams["text.usetex"] = True # 使用LaTeX渲染文本(提升數學符號顯示美觀度)
plt.rcParams["axes.unicode_minus"] = True # 確保負號正確顯示(避免負號顯示為方塊)
plt.rcParams["mathtext.fontset"] = "cm" # 數學符號使用Computer Modern字體(LaTeX標準字體,更專業)
d2l.plt.rcParams.update(plt.rcParams) # 讓d2l庫的繪圖工具繼承上述配置(保持顯示一致性)# --------------------------------------------------------------------------------------------------------數據處理模塊 ------------------------------------------------------------------------------# -------------------------- 1. 讀取數據 --------------------------
def read_time_machine():"""下載并讀取《時間機器》數據集,返回清洗后的文本行列表作用:獲取原始文本數據并預處理,為后續詞元化做準備"""data_dir = Path('./data') # 數據存儲目錄(當前目錄下的data文件夾)data_dir.mkdir(exist_ok=True) # 目錄不存在則創建(exist_ok=True避免重復創建報錯)file_path = data_dir / 'timemachine.txt' # 數據集文件路徑# 檢查文件是否存在,不存在則下載if not file_path.exists():print("開始下載時間機器數據集...")# 從d2l官方地址下載文本(《時間機器》是經典數據集,適合語言模型訓練)response = requests.get('http://d2l-data.s3-accelerate.amazonaws.com/timemachine.txt')# 寫入文件(utf-8編碼確保兼容多種字符)with open(file_path, 'w', encoding='utf-8') as f:f.write(response.text)print(f"數據集下載完成,保存至: {file_path}")# 讀取文件并清洗文本with open(file_path, 'r', encoding='utf-8') as f:lines = f.readlines() # 按行讀取(每行作為列表元素)print(f"文件讀取成功,總行數: {len(lines)}")if len(lines) > 0:print(f"第一行內容: {lines[0].strip()}") # 打印首行驗證是否正確讀取# 清洗規則:# 1. re.sub('[^A-Za-z]+', ' ', line):保留字母,其他字符(如數字、符號)替換為空格# 2. strip():去除首尾空格# 3. lower():轉小寫(統一大小寫,減少詞元數量)# 4. 過濾空行(if line.strip()確保僅保留非空行)cleaned_lines = [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines if line.strip()]print(f"清洗后有效行數: {len(cleaned_lines)}") # 清洗后非空行數量(去除純空格行)return cleaned_lines# -------------------------- 2. 詞元化與詞表構建 --------------------------
def tokenize(lines, token='char'):"""將文本行轉換為詞元列表(詞元是文本的最小處理單位)參數:lines: 清洗后的文本行列表(如["abc def", "ghi jkl"])token: 詞元類型('char'字符級/'word'單詞級)返回:詞元列表(如字符級:[['a','b','c',' ','d','e','f'], ...])作用:將文本拆分為模型可處理的最小單元(詞元),字符級適合簡單語言模型"""if token == 'char':# 字符級詞元化:將每行拆分為單個字符列表(包括空格,如"abc"→['a','b','c'])return [list(line) for line in lines]elif token == 'word':# 單詞級詞元化:按空格拆分每行(需確保文本已用空格分隔單詞,如"abc def"→['abc','def'])return [line.split() for line in lines]else:raise ValueError('未知詞元類型:' + token)class Vocab:"""詞表類:實現詞元與索引的雙向映射,用于將文本轉換為模型可處理的數字序列核心功能:將字符串形式的詞元轉換為整數索引(模型只能處理數字),同時支持索引轉詞元(用于生成文本)"""def __init__(self, tokens, min_freq=0, reserved_tokens=None):"""構建詞表參數:tokens: 詞元列表(可嵌套,如[[token1, token2], [token3]])min_freq: 最低詞頻閾值(低于此值的詞元不加入詞表,減少詞匯量)reserved_tokens: 預留特殊詞元(如分隔符、填充符等,模型可能需要的特殊標記)"""if reserved_tokens is None:reserved_tokens = [] # 默認為空(無預留詞元)# 統計詞頻:# 1. 展平嵌套列表([token for line in tokens for token in line])# 2. 用Counter計數(得到{詞元: 出現次數}字典)counter = collections.Counter([token for line in tokens for token in line])# 按詞頻降序排序(便于后續按頻率篩選,高頻詞優先保留)self.token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True)# 初始化詞表:# <unk>(未知詞元)固定在索引0(所有未見過的詞元都映射到<unk>)# followed by預留詞元(如用戶指定的特殊標記)self.idx_to_token = ['<unk>'] + reserved_tokens# 構建詞元到索引的映射(字典,便于快速查詢)self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}# 按詞頻添加詞元(過濾低頻詞)for token, freq in self.token_freqs:if freq < min_freq:break # 低頻詞不加入詞表(提前終止,提升效率)if token not in self.token_to_idx: # 避免重復添加預留詞元(如預留詞元已在列表中)self.idx_to_token.append(token)self.token_to_idx[token] = len(self.idx_to_token) - 1 # 索引為當前長度-1(保持連續)def __len__(self):"""返回詞表大小(詞元總數,用于模型輸入/輸出維度設置)"""return len(self.idx_to_token)def __getitem__(self, tokens):"""詞元→索引(支持單個詞元或詞元列表)未知詞元返回<unk>的索引(0),確保模型輸入始終有效"""if not isinstance(tokens, (list, tuple)):# 單個詞元:查字典,默認返回<unk>的索引(0)return self.token_to_idx.get(tokens, self.unk)# 詞元列表:遞歸轉換每個詞元(如['a','b']→[2,3])return [self.__getitem__(token) for token in tokens]def to_tokens(self, indices):"""索引→詞元(支持單個索引或索引列表,用于將模型輸出轉換為文本)"""if not isinstance(indices, (list, tuple)):# 單個索引:直接查列表(如2→'a')return self.idx_to_token[indices]# 索引列表:遞歸轉換每個索引(如[2,3]→['a','b'])return [self.idx_to_token[index] for index in indices]@propertydef unk(self):"""返回<unk>的索引(固定為0,便于統一處理未知詞元)"""return 0# -------------------------- 3. 數據迭代器(隨機抽樣) --------------------------
def seq_data_iter_random(corpus, batch_size, num_steps):"""隨機抽樣生成批量子序列(生成器),用于模型訓練的批量輸入原理:從語料中隨機截取多個長度為num_steps的子序列,組成批次(避免模型學習到固定的句子順序)參數:corpus: 詞元索引序列(1D列表,如[1,3,5,2,...],所有文本的詞元索引拼接而成)batch_size: 批量大小(每個批次包含的子序列數,影響訓練效率和內存占用)num_steps: 子序列長度(時間步,即模型一次處理的序列長度,如35表示一次輸入35個詞元)返回:生成器,每次返回(X, Y):X: 輸入序列(batch_size, num_steps),模型的輸入Y: 標簽序列(batch_size, num_steps),是X右移一位的結果(模型需要預測的下一個詞元)"""# 檢查數據是否足夠生成至少一個子序列(子序列長度+1,因Y是X右移1位,需多1個元素)if len(corpus) < num_steps + 1:raise ValueError(f"語料庫長度({len(corpus)})不足,需至少{num_steps + 1}")# 隨機偏移起始位置(0到num_steps-1),增加數據隨機性(避免每次從固定位置開始)corpus = corpus[random.randint(0, num_steps - 1):]# 計算可生成的子序列總數:# (語料長度-1) // num_steps(-1是因Y需多1個元素,每個子序列需num_steps+1個元素)num_subseqs = (len(corpus) - 1) // num_stepsif num_subseqs < 1:raise ValueError(f"無法生成子序列(語料庫長度不足)")# 生成所有子序列的起始索引(間隔為num_steps,如0, num_steps, 2*num_steps...)initial_indices = list(range(0, num_subseqs * num_steps, num_steps))random.shuffle(initial_indices) # 打亂起始索引,實現隨機抽樣(核心:避免子序列順序固定)# 計算可生成的批次數:子序列總數 // 批量大小(確保每個批次有batch_size個子序列)num_batches = num_subseqs // batch_sizeif num_batches < 1:raise ValueError(f"子序列數量({num_subseqs})不足,需至少{batch_size}個")# 生成批量數據for i in range(0, batch_size * num_batches, batch_size):# 當前批次的起始索引(從打亂的索引中取batch_size個,如i=0時取前batch_size個)indices = initial_indices[i: i + batch_size]# 輸入序列X:每個子序列從indices[j]開始,取num_steps個元素(如indices[j]=0→[0:35])X = [corpus[j: j + num_steps] for j in indices]# 標簽序列Y:每個子序列從indices[j]+1開始,取num_steps個元素(X右移1位,如[1:36])Y = [corpus[j + 1: j + num_steps + 1] for j in indices]# 轉換為張量返回(便于模型處理,PyTorch模型輸入需為張量)yield torch.tensor(X), torch.tensor(Y)# -------------------------- 4. 數據加載函數(關鍵修復:返回可重置的迭代器) --------------------------
def load_data_time_machine(batch_size, num_steps):"""加載《時間機器》數據,返回數據迭代器生成函數和詞表修復點:返回迭代器生成函數(而非一次性迭代器),確保訓練時可重復生成數據(每個epoch重新抽樣)參數:batch_size: 批量大小num_steps: 子序列長度(時間步)返回:data_iter: 迭代器生成函數(調用時返回新的迭代器,每次調用重新抽樣)vocab: 詞表對象(用于詞元與索引的轉換)"""lines = read_time_machine() # 讀取清洗后的文本行tokens = tokenize(lines, token='char') # 字符級詞元化(每個字符為詞元,適合簡單語言模型)vocab = Vocab(tokens) # 構建詞表(根據詞元生成索引映射)# 將所有詞元轉換為索引(展平為1D序列,如[[ 'a', 'b' ], [ 'c' ]]→[2,3,4])corpus = [vocab[token] for line in tokens for token in line]print(f"語料庫長度: {len(corpus)}(詞元索引總數)")# 定義迭代器生成函數:每次調用生成新的隨機抽樣迭代器(確保每個epoch數據不同)def data_iter():return seq_data_iter_random(corpus, batch_size, num_steps)return data_iter, vocab # 返回生成函數和詞表# -------------------------------------------------------------------------------------------------------- 模型構建模塊 ------------------------------------------------------------------------------
# -------------------------- 5. 自定義RNNModel類(解決d2l.RNNModel缺失問題) --------------------------
class RNNModel(nn.Module):"""自定義循環神經網絡模型類,替代d2l.RNNModel,適配多層LSTM"""def __init__(self, rnn_layer, vocab_size):super(RNNModel, self).__init__()self.rnn = rnn_layer # 多層LSTM層(nn.LSTM)self.vocab_size = vocab_size # 詞表大小self.num_hiddens = self.rnn.hidden_size # 隱藏層維度(從LSTM層獲取)# 輸出層:將LSTM的隱藏狀態映射到詞表維度(用于預測下一個詞元)self.dense = nn.Linear(self.num_hiddens, vocab_size)def forward(self, inputs, state):"""前向傳播:處理輸入并返回輸出和狀態參數:inputs: 輸入序列(batch_size, num_steps),元素為詞元索引state: 初始隱藏狀態((H_0, C_0),由begin_state生成)返回:output: 所有時間步的輸出(batch_size * num_steps, vocab_size)state: 最終隱藏狀態((H_t, C_t))"""# 1. 輸入處理:轉換為one-hot編碼并轉置為(num_steps, batch_size, vocab_size)X = F.one_hot(inputs.T.long(), self.vocab_size).type(torch.float32)# 2. LSTM前向傳播:X為輸入,state為初始狀態# - output: (num_steps, batch_size, num_hiddens),所有時間步的頂層隱藏狀態# - state: (H_t, C_t),最終的隱藏狀態和記憶元output, state = self.rnn(X, state)# 3. 輸出層映射:將隱藏狀態轉換為詞表分布# - output.reshape(-1, output.shape[2]): 展平為(num_steps*batch_size, num_hiddens)# - 經dense層后得到(num_steps*batch_size, vocab_size)output = self.dense(output.reshape(-1, output.shape[2]))return output, statedef begin_state(self, batch_size, device):"""生成初始隱藏狀態(LSTM需要兩個初始狀態:隱狀態H和記憶元C)"""# LSTM的初始狀態形狀為:(num_layers, batch_size, num_hiddens)return (torch.zeros((self.rnn.num_layers, batch_size, self.num_hiddens), device=device),torch.zeros((self.rnn.num_layers, batch_size, self.num_hiddens), device=device))# -------------------------- 6. RNN模型包裝類 --------------------------
class RNNModelScratch: # @save"""從零實現的RNN模型包裝類,統一模型調用接口(適配訓練和預測流程)"""def __init__(self, vocab_size, num_hiddens, device,get_params, init_state, forward_fn):"""參數:vocab_size: 詞表大小(輸入/輸出維度)num_hiddens: 隱藏層維度(記憶元/隱狀態的維度)device: 計算設備get_params: 參數初始化函數(如get_lstm_params)init_state: 狀態初始化函數(如init_lstm_state)forward_fn: 前向傳播函數(如lstm)"""self.vocab_size, self.num_hiddens = vocab_size, num_hiddensself.params = get_params(vocab_size, num_hiddens, device) # 模型參數(通過get_params獲取)self.init_state, self.forward_fn = init_state, forward_fn # 狀態初始化和前向傳播函數def __call__(self, X, state):"""模型調用接口(前向傳播入口,兼容PyTorch的調用方式)參數:X: 輸入序列(batch_size, num_steps),元素為詞元索引(未編碼的原始輸入)state: 初始隱藏狀態((H_0, C_0))返回:y_hat: 輸出(num_steps*batch_size, vocab_size),所有時間步的輸出拼接state: 最終隱藏狀態((H_t, C_t))"""# 處理輸入:# 1. X.T:轉置為(num_steps, batch_size)(便于逐時間步處理,時間步在前)# 2. F.one_hot:轉換為one-hot編碼(num_steps, batch_size, vocab_size),將索引轉為向量# 3. type(torch.float32):轉換為浮點型(適配后續矩陣運算,權重為浮點型)X = F.one_hot(X.T, self.vocab_size).type(torch.float32)# 調用前向傳播函數(如lstm)計算輸出和新狀態return self.forward_fn(X, state, self.params)def begin_state(self, batch_size, device):"""獲取初始隱藏狀態(調用初始化函數,封裝狀態初始化邏輯)"""return self.init_state(batch_size, self.num_hiddens, device)# -------------------------------------------------------------------------------------------------------- 文本生成與訓練模塊 ------------------------------------------------------------------------------
# -------------------------- 7. 預測函數(文本生成) --------------------------
def predict_ch8(prefix, num_preds, net, vocab, device): # @save"""根據前綴生成后續字符(文本生成,驗證模型學習效果)參數:prefix: 前綴字符串(如"time traveller",模型基于此生成后續內容)num_preds: 要生成的字符數net: 訓練好的LSTM模型vocab: 詞表(用于詞元與索引的轉換)device: 計算設備返回:生成的字符串(前綴+預測字符,如前綴"ti"生成"time...")"""# 初始化狀態(批量大小為1,因僅生成一條序列,無需并行)state = net.begin_state(batch_size=1, device=device)# 記錄輸出索引:初始為前綴首字符的索引(將前綴轉換為索引序列)outputs = [vocab[prefix[0]]]# 輔助函數:獲取當前輸入(最后一個輸出的索引,形狀(1,1),符合模型輸入格式)def get_input():return torch.tensor([outputs[-1]], device=device).reshape((1, 1))# 預熱期:用前綴更新模型狀態(不生成新字符,僅讓模型"記住"前綴的信息)for y in prefix[1:]:_, state = net(get_input(), state) # 前向傳播,更新狀態(忽略輸出,因只需狀態)outputs.append(vocab[y]) # 記錄前綴字符的索引(確保outputs包含完整前綴)# 預測期:生成num_preds個字符for _ in range(num_preds):y, state = net(get_input(), state) # 前向傳播,獲取輸出和新狀態(y是當前時間步的輸出)# 取概率最大的字符索引(貪婪采樣:簡單策略,選擇模型認為最可能的下一個字符)outputs.append(int(y.argmax(dim=1).reshape(1)))# 將索引轉換為字符,拼接成字符串返回(完成從索引到文本的轉換)return ''.join([vocab.idx_to_token[i] for i in outputs])# -------------------------- 8. 梯度裁剪(防止梯度爆炸) --------------------------
def grad_clipping(net, theta): # @save"""裁剪梯度(將梯度L2范數限制在theta內),防止梯度爆炸(RNN訓練中常見問題)原理:若梯度范數超過閾值theta,則按比例縮小所有梯度,確保訓練穩定參數:net: 模型(自定義模型或nn.Module)theta: 梯度閾值(如1.0,根據經驗設置)"""# 獲取需要梯度更新的參數if isinstance(net, nn.Module):# 若為PyTorch官方Module,直接取parameters(包含所有需要梯度的參數)params = [p for p in net.parameters() if p.requires_grad]else:# 若為自定義模型(如RNNModelScratch),取params屬性(存儲模型參數)params = net.params# 計算所有參數梯度的L2范數(平方和開根號)norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))if norm > theta: # 若范數超過閾值,按比例裁剪(保持梯度方向不變,縮小幅度)for param in params:param.grad[:] *= theta / norm# -------------------------- 9. 訓練函數 --------------------------
def train_epoch_ch8(net, train_iter_fn, loss, updater, device, use_random_iter):"""訓練一個周期(單輪遍歷數據集)參數:net: LSTM模型train_iter_fn: 迭代器生成函數(調用后返回新迭代器,每個epoch重新生成數據)loss: 損失函數(如CrossEntropyLoss,計算預測與標簽的差距)updater: 優化器(如SGD,用于更新模型參數)device: 計算設備use_random_iter: 是否使用隨機抽樣(影響狀態處理方式:隨機抽樣時狀態獨立,無需傳遞)返回:ppl: 困惑度(perplexity,語言模型性能指標,越低表示模型越好)speed: 訓練速度(詞元/秒,衡量訓練效率)"""state, timer = None, d2l.Timer() # 初始化狀態和計時器(timer用于計算訓練速度)metric = d2l.Accumulator(2) # 累加器:(總損失, 總詞元數),用于計算平均損失batches_processed = 0 # 記錄處理的批次數量(驗證是否有數據被處理)# 關鍵修復:每次訓練都通過函數生成新的迭代器(避免迭代器被提前消費,確保每個epoch數據不同)train_iter = train_iter_fn()# 遍歷批量數據(每個X, Y是一個批次)for X, Y in train_iter:batches_processed += 1# 初始化狀態:# - 首次迭代時需初始化(state為None)# - 隨機抽樣時,每個批次的序列獨立(無上下文關聯),需重新初始化if state is None or use_random_iter:state = net.begin_state(batch_size=X.shape[0], device=device)else:# 非隨機抽樣時,分離狀態(切斷梯度回流到之前的批次,避免梯度計算依賴過長導致爆炸)if isinstance(net, nn.Module) and not isinstance(state, tuple):state.detach_() # 單個狀態直接detach(如GRU只有隱狀態)else:for s in state: # 多個狀態(如LSTM有隱狀態和記憶元)逐個detachs.detach_()# 處理標簽:# Y.T.reshape(-1):轉置后展平為(num_steps*batch_size,)(與輸出y_hat的形狀匹配)# 輸出y_hat的形狀是(num_steps*batch_size, vocab_size),標簽需為1D張量y = Y.T.reshape(-1)# 將輸入和標簽移到目標設備(GPU/CPU,確保與模型參數在同一設備)X, y = X.to(device), y.to(device)# 前向傳播:獲取輸出和新狀態y_hat, state = net(X, state)# 計算損失(mean()是因損失函數可能返回每個樣本的損失,取平均得到批次損失)l = loss(y_hat, y.long()).mean()# 反向傳播與參數更新:if isinstance(updater, torch.optim.Optimizer):# 若為PyTorch優化器(如SGD)updater.zero_grad() # 清零梯度(避免梯度累積)l.backward() # 反向傳播計算梯度grad_clipping(net, 1) # 裁剪梯度(閾值1,防止梯度爆炸)updater.step() # 更新參數else:# 若為自定義優化器(如d2l的sgd函數)l.backward()grad_clipping(net, 1)updater(batch_size=1) # 假設批量大小為1的更新(簡化實現)# 累加總損失和總詞元數(用于計算平均損失)# metric[0] += l * y.numel():總損失=批次損失×詞元數(因l是平均損失)# metric[1] += y.numel():總詞元數=累加每個批次的詞元數量metric.add(l * y.numel(), y.numel())# 檢查是否有批次被處理(避免空迭代導致的錯誤)if batches_processed == 0:print("警告:沒有處理任何訓練批次!")return float('inf'), 0# 計算困惑度(perplexity = exp(平均損失),語言模型專用指標,與交叉熵損失正相關)# 平均損失 = 總損失 / 總詞元數,exp后得到困惑度(完美模型困惑度=1)# 速度 = 總詞元數 / 訓練時間(詞元/秒,衡量訓練效率)return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()def train_ch8(net, train_iter_fn, vocab, lr, num_epochs, device, use_random_iter=False):"""訓練模型(多周期,整合單周期訓練邏輯,輸出訓練過程和結果)參數:net: LSTM模型train_iter_fn: 迭代器生成函數vocab: 詞表lr: 學習率(控制參數更新幅度)num_epochs: 訓練周期數(遍歷數據集的次數,影響模型收斂程度)device: 計算設備use_random_iter: 是否使用隨機抽樣(默認False,即順序抽樣)"""loss = nn.CrossEntropyLoss() # 交叉熵損失(適用于分類任務,此處為詞元預測,多分類問題)# 動畫器:可視化訓練過程(實時繪制困惑度隨周期變化的曲線,直觀觀察模型收斂情況)animator = d2l.Animator(xlabel='epoch', ylabel='perplexity',legend=['train'], xlim=[10, num_epochs])# 初始化優化器:if isinstance(net, nn.Module):# 若為PyTorch Module,使用SGD優化器(隨機梯度下降,適合簡單模型)updater = torch.optim.SGD(net.parameters(), lr)else:# 若為自定義模型,使用d2l的sgd函數(簡化的隨機梯度下降實現)updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)# 定義預測函數:根據前綴"time traveller"生成50個字符(驗證模型學習效果)predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device)# 多周期訓練for epoch in range(num_epochs):# 訓練一個周期,返回困惑度和速度ppl, speed = train_epoch_ch8(net, train_iter_fn, loss, updater, device, use_random_iter)# 每10個周期打印一次預測結果(觀察生成文本質量變化,判斷模型是否學到有意義的模式)if (epoch + 1) % 10 == 0:print(f"epoch {epoch + 1} 預測: {predict('time traveller')}")animator.add(epoch + 1, [ppl]) # 記錄困惑度,更新動畫# 訓練結束后輸出最終結果(總結模型性能)print(f'最終困惑度 {ppl:.1f}, 速度 {speed:.1f} 詞元/秒 {device}')print(f"time traveller 預測: {predict('time traveller')}") # 用"time traveller"前綴生成文本print(f"traveller 預測: {predict('traveller')}") # 用"traveller"前綴生成文本# -------------------------- 主程序 --------------------------
# -------------------------- 主程序 --------------------------
if __name__ == '__main__':# 超參數設置batch_size, num_steps = 32, 35 # 批量大小、時間步(與原代碼一致)train_iter, vocab = load_data_time_machine(batch_size, num_steps) # 加載數據# 深度LSTM模型參數(核心改裝:增加num_layers=2實現深層結構)vocab_size = len(vocab) # 輸入/輸出維度=詞表大小num_hiddens = 256 # 每層隱藏狀態維度num_layers = 2 # 隱藏層數量(深度為2,實現DRNN)device = d2l.try_gpu() # 自動選擇設備(GPU優先)# 改裝1:使用PyTorch官方nn.LSTM構建多層LSTM# nn.LSTM參數說明:# - input_size: 輸入特征維度(此處為詞表大小)# - hidden_size: 每層隱藏狀態維度# - num_layers: 隱藏層數量(>=2即深度循環網絡)lstm_layer = nn.LSTM(input_size=vocab_size,hidden_size=num_hiddens,num_layers=num_layers)# 改裝2:使用d2l.RNNModel封裝多層LSTM(適配訓練框架)# d2l.RNNModel是d2l庫中封裝的循環神經網絡模型類,兼容nn.LSTM/GRU等官方層# 功能:處理輸入編碼、連接輸出層(將隱藏狀態映射到詞表維度)model = RNNModel(rnn_layer=lstm_layer, # 多層LSTM層vocab_size=vocab_size # 詞表大小(輸出層維度))model = model.to(device) # 將模型移動到目標設備# 訓練深度LSTM模型(使用原有訓練函數,兼容nn.Module類型模型)num_epochs, lr = 500, 0.12 # 訓練周期、學習率(與原代碼一致)train_ch8(model, train_iter, vocab, lr, num_epochs, device)plt.show(block=True) # 顯示訓練過程可視化圖表