結合生活實例,先簡單認識一下什么是循環神經網絡
先想個問題:為什么需要 “循環”?
你平時看句子、聽語音、看視頻,都是 “按順序” 來的吧?比如 “我吃蘋果” 和 “蘋果吃我”,字一樣但順序不同,意思天差地別。
傳統的神經網絡像個 “健忘癥患者”—— 處理每個字的時候,只看當前這個字,記不住前面的。比如看 “蘋果吃我”,它看到 “蘋果” 時,等下看到 “吃我”,早忘了 “蘋果” 是啥了,自然分不清意思。
而 RNN 就像個 “有記性的人”—— 處理每個字時,會偷偷記住前面看過的內容。比如看 “蘋果吃我”,它看到 “蘋果” 時記下來,看到 “吃” 時,結合前面的 “蘋果”,再看到 “我” 時,就知道是 “蘋果吃我”(雖然不合邏輯,但能記住順序)。
RNN 到底怎么 “記” 東西?
把 RNN 想象成一個 “復讀機 + 記事本” 的結合體,處理序列數據(比如一句話)時,它會按順序一個字一個字地 “讀”,邊讀邊記。
舉個例子:用 RNN 理解句子 “我愛吃西瓜”。
第一步(看 “我”):
它先看到 “我”,心里默默記下來(這就是 “記憶”,專業名叫 “隱藏狀態”),記的內容是 “現在看到了‘我’”。第二步(看 “愛”):
它不會忘了剛才的 “我”,而是把 “愛” 和之前記的 “我” 放一起,更新記憶:“現在是‘我’+‘愛’”。第三步(看 “吃”):
繼續帶著之前的記憶(“我 + 愛”),加上 “吃”,記憶變成:“我 + 愛 + 吃”。第四步(看 “西瓜”):
帶著 “我 + 愛 + 吃” 的記憶,加上 “西瓜”,最終記憶變成整個句子的信息:“我愛吃西瓜”。
你看,它每一步都把新內容和 “之前的記憶” 混在一起,更新記憶 —— 這就是 “循環” 的意思:后一步依賴前一步的記憶,一步步傳遞下去。
為什么說 “權重共享”?
還是剛才的例子,RNN 處理 “我”“愛”“吃”“西瓜” 這四個字時,用的是同一套 “記東西的規則”。
就像你記日記,不管今天記開心事還是難過事,都是用同樣的方式寫在本子上(不會今天用中文,明天用英文)。RNN 也一樣,處理每個字的邏輯完全相同,這樣既能少學很多規則,又能適應不同長度的句子(比如一句話 3 個字或 10 個字,都能用同一套方法處理)。
RNN 能干啥?
說白了,就是處理 “有順序” 的事兒:
- 看一句話猜情緒(“這部電影太爛了” 是負面,得記住每個詞的順序才能判斷);
- 聽語音轉文字(聲音是按時間順序來的,前面的音和后面的音有關聯);
- 預測明天的天氣(今天、昨天的天氣會影響明天,得按時間順序記下來)。
它的毛病在哪?
RNN 的 “記性” 不好,記不住太久遠的事。比如一句話特別長:“今天早上出門時忘了帶傘,結果……(中間 100 個字)…… 所以全身濕透了”。
RNN 處理到 “全身濕透了” 時,可能早就忘了 “早上沒帶傘” 這回事了 —— 這就是 “長時記憶差”,專業叫 “梯度消失”,后面的 LSTM、GRU 就是給它加了 “備忘錄”,幫它記久一點。
總結一下:
RNN 就像一個 “有短期記憶的復讀機”,處理按順序來的數據時,會把新信息和之前的記憶混在一起,一步步傳遞下去,所以能理解順序的重要性。但記性不算太好,長句子容易忘事兒~
專業術語解釋?
循環神經網絡(Recurrent Neural Network, RNN)是一類專門處理序列數據(如文本、語音、時間序列等)的神經網絡,其核心是通過隱藏狀態的循環傳遞捕捉數據中的時序依賴關系。以下從專業角度解析其基本結構與機制:
1. 核心目標
傳統前饋神經網絡(如 CNN、全連接網絡)的輸入是固定維度的非序列數據,且各層神經元間無反饋連接,無法處理時序依賴(如 “蘋果吃我” 與 “我吃蘋果” 的語義差異由詞序決定)。 RNN 的核心設計是:讓網絡在處理序列的第 t 步時,能利用第 t-1 步的信息,從而建模序列中 “前因后果” 的關聯。
2. 基本結構與循環機制
RNN 的結構可簡化為 “輸入層 - 隱藏層 - 輸出層”,但其核心特征是隱藏層存在自循環連接,即隱藏層的輸出會作為自身的輸入參與下一時間步的計算。
關鍵變量定義:
核心計算公式:
隱藏狀態更新(循環的核心):
輸出計算:
3. 權重共享機制
RNN 的關鍵特性是所有時間步共享同一套參數(\(W_{hx}, W_{hh}, W_{ho}, b_h, b_o\))。 這意味著:處理序列中不同位置的元素(如第 1 個詞與第 t 個詞)時,使用相同的權重矩陣與偏置。
- 優勢:極大減少參數數量(與序列長度 T 無關),使模型能適應任意長度的序列輸入;
- 本質:建模 “序列中通用的時序規律”(如語言中 “主謂賓” 的語法規則對所有句子通用)。
4. 序列處理模式
根據輸入序列與輸出序列的長度關系,RNN 的應用模式可分為 4 類:
- 一對一:輸入輸出均為單元素(如固定長度的時序數據分類,如 “用前 3 天天氣預測第 4 天”);
- 一對多:單輸入生成序列(如輸入 “晴天” 生成 “出門帶傘?否;適合野餐?是”);
- 多對一:序列輸入生成單輸出(如文本情感分類,輸入句子輸出 “正面 / 負面”);
- 多對多:序列輸入生成等長 / 不等長序列(如機器翻譯,輸入 “我愛你” 輸出 “I love you”)。
5. 局限性
標準 RNN 的隱藏狀態更新依賴線性變換與簡單激活函數(如 tanh),在處理長序列(如 T>100)時會出現梯度消失 / 爆炸問題:
- 反向傳播時,梯度需通過?\(W_{hh}\)?的多次矩陣乘法傳遞,當
時梯度會指數級衰減(消失),導致模型無法學習長距離依賴(如 “早上忘帶傘...(100 詞后)... 淋濕了” 的關聯);
- 這一缺陷推動了 LSTM(長短期記憶網絡)、GRU(門控循環單元)等改進模型的提出,通過 “門控機制” 動態控制信息的保留與遺忘。
總結:RNN 通過隱藏狀態的循環傳遞與權重共享,實現了對序列時序依賴的建模,是處理時序數據的基礎模型;其核心是公式 所體現的 “當前狀態依賴歷史狀態” 的循環邏輯。
簡易代碼實戰(最后附帶完整代碼)
1. 序列數據的表
# 生成正弦波時序數據
time = np.linspace(0, 2 * np.pi * n_samples / 10, n_samples + seq_length)
data = np.sin(time)# 創建輸入序列X和目標值y
for i in range(n_samples):X.append(data[i:i+seq_length]) # 前seq_length個點作為輸入y.append(data[i+seq_length]) # 下一個點作為預測目標
- 概念對應:
- 序列數據:正弦波是典型的時序數據,每個點依賴于前面的點。
- 輸入序列長度:
seq_length=20
表示用前 20 個時間步預測第 21 個!!!!! - 時間步(time step):每個時間步對應序列中的一個點(如
t=1
對應data[0]
)。
2. RNN 模型結構
class SimpleRNN(nn.Module):def __init__(self, input_size, hidden_size, output_size, num_layers=1):super(SimpleRNN, self).__init__()self.hidden_size = hidden_sizeself.num_layers = num_layers# 定義RNN層(核心組件)self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, nonlinearity='tanh')# 全連接層:將RNN的輸出映射到預測值self.fc = nn.Linear(hidden_size, output_size)def forward(self, x):# 初始化隱藏狀態h0(形狀:[層數, 批量大小, 隱藏維度])h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)# 前向傳播RNN# out形狀:[批量大小, 序列長度, 隱藏維度]# hn形狀:[層數, 批量大小, 隱藏維度]out, hn = self.rnn(x, h0)# 只取最后一個時間步的輸出(用于預測下一個值)out = out[:, -1, :] # 形狀:[批量大小, 隱藏維度]# 通過全連接層得到預測值out = self.fc(out) # 形狀:[批量大小, 輸出維度]return out
一、模型初始化(__init__
方法)
1.?參數含義
def __init__(self, input_size, hidden_size, output_size, num_layers=1):
input_size
:每個時間步的輸入特征數(本例中為 1,因為只輸入正弦波的當前值)。hidden_size
:隱藏狀態的維度(記憶容量),數值越大,模型能記住的信息越多(本例為 64)。output_size
:輸出的維度(本例中為 1,因為只預測下一個正弦波值)。num_layers
:RNN 的層數(默認 1 層,可堆疊多層增強表達能力)。
2.?RNN 層的定義
self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, nonlinearity='tanh')
batch_first=True
:輸入張量的第 1 維是批量大小([batch, seq_len, feature]
)。nonlinearity='tanh'
:使用 tanh 激活函數(將輸出值壓縮到 [-1, 1] 區間)。
3.?全連接層的作用
self.fc = nn.Linear(hidden_size, output_size)
- 將 RNN 的隱藏狀態(
hidden_size
維)映射到最終輸出(output_size
維)。 - 相當于做一個線性變換:
y = W*h + b
。
二、前向傳播(forward
方法)
1.?初始化隱藏狀態
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
h0
是序列開始時的初始隱藏狀態,形狀為[層數, 批量大小, 隱藏維度]
。- 初始化為全零向量,表示序列開始時模型沒有任何先驗記憶。
2.?RNN 層的計算
out, hn = self.rnn(x, h0)
輸入:
x
:輸入序列,形狀為[batch, seq_len, input_size]
(本例中為[32, 20, 1]
)。h0
:初始隱藏狀態,形狀為[num_layers, batch, hidden_size]
。
輸出:
out
:所有時間步的隱藏狀態,形狀為[batch, seq_len, hidden_size]
。hn
:最后一個時間步的隱藏狀態(即out
的最后一個時間步),形狀為[num_layers, batch, hidden_size]
。
3.?提取最后時間步的輸出
out = out[:, -1, :] # 取每個樣本的最后一個時間步
out
的原始形狀:[batch, seq_len, hidden_size]
(例如[32, 20, 64]
)。- 提取后形狀:
[batch, hidden_size]
(例如[32, 64]
)。 - 為什么只取最后一個時間步?因為我們的任務是預測序列的下一個值,最后一個時間步的隱藏狀態包含了整個序列的信息。
4.?通過全連接層生成預測
out = self.fc(out) # 將64維隱藏狀態映射到1維輸出
- 最終輸出形狀:
[batch, output_size]
(本例中為[32, 1]
)。
三、用具體例子理解數據流動
假設:
- 批量大小
batch_size=2
(同時處理 2 個序列)。 - 序列長度
seq_length=3
(每個序列有 3 個時間步)。 - 輸入維度
input_size=1
(每個時間步 1 個特征)。 - 隱藏維度
hidden_size=2
(簡化計算)。
1.?輸入 x 的形狀
x.shape = [2, 3, 1]
# 示例數據:
x = [[[0.1], [0.2], [0.3]], # 第1個序列[[0.4], [0.5], [0.6]] # 第2個序列
]
2.?初始隱藏狀態 h0 的形狀
h0.shape = [1, 2, 2] # [層數, 批量, 隱藏維度]
# 初始化為全零:
h0 = [[[0, 0], [0, 0]] # 第1層(唯一層)的兩個批量的初始隱藏狀態
]
3.?RNN 計算過程(簡化版)
對第 1 個序列的第 1 個時間步x[0, 0] = [0.1]
:
h_1 = tanh(W_hx * [0.1] + W_hh * [0, 0] + b_h)
# 假設W_hx = [[0.5], [0.3]], W_hh = [[0.2, 0.1], [0.4, 0.3]]
h_1 = tanh([0.5*0.1 + 0.2*0 + 0.1*0, 0.3*0.1 + 0.4*0 + 0.3*0])= tanh([0.05, 0.03])≈ [0.05, 0.03] # 經過tanh激活后的結果
類似地,計算后續時間步和其他序列,最終得到:
out.shape = [2, 3, 2]
out = [[[0.05, 0.03], [0.12, 0.08], [0.20, 0.15]], # 第1個序列的3個時間步[[0.25, 0.18], [0.35, 0.25], [0.45, 0.32]] # 第2個序列的3個時間步
]
4.?提取最后時間步并通過全連接層
out[:, -1, :] = [[0.20, 0.15], [0.45, 0.32]] # 形狀:[2, 2]# 假設全連接層權重W_fc = [[0.6], [0.7]],偏置b_fc = [0.1]
final_output = [[0.20*0.6 + 0.15*0.7 + 0.1], [0.45*0.6 + 0.32*0.7 + 0.1]]≈ [[0.295], [0.584]] # 形狀:[2, 1]
四、關鍵概念總結
隱藏狀態:
- RNN 的核心是隱藏狀態
h_t
,它整合了當前輸入和歷史信息。 - 每個時間步的計算都依賴上一步的隱藏狀態,形成 “記憶鏈”。
- RNN 的核心是隱藏狀態
權重共享:
W_hx
和W_hh
在所有時間步中保持不變,減少了參數量。
輸入輸出形狀:
- 輸入:
[batch, seq_len, input_size]
。 - 輸出:
[batch, seq_len, hidden_size]
(所有時間步)或[batch, hidden_size]
(最后時間步)。
- 輸入:
序列建模能力:
- 通過隱藏狀態的傳遞,RNN 能捕捉序列中的時序依賴關系(如正弦波的周期性)。
3. 前向傳播與隱藏狀態傳遞
def forward(self, x):h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)out, hn = self.rnn(x, h0)out = out[:, -1, :] # 取最后一個時間步的輸出
- 概念對應:
- 初始隱藏狀態(h0):序列開始時的記憶(全零向量)。
- 隱藏狀態更新:
h_t = tanh(W_hx * x_t + W_hh * h_{t-1})
每個時間步的隱藏狀態h_t
整合當前輸入x_t
和上一步記憶h_{t-1}
。 - 輸出形狀:
out
是所有時間步的隱藏狀態,形狀為[batch, seq_len, hidden_size]
。 - 最終輸出:只取最后一個時間步的隱藏狀態(
out[:, -1, :]
),用于預測下一個值。
4. 批處理與并行計算
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
- 概念對應:
- 批處理(batch):每次訓練同時處理 32 個序列,加速計算。
- 輸入形狀:
[batch_size, seq_length, input_size]
?=?[32, 20, 1]
。 - 并行計算:GPU 同時處理 32 個序列的前向 / 反向傳播。
5. 訓練過程與損失函數
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)for epoch in range(epochs):for X_batch, y_batch in train_loader:outputs = model(X_batch)loss = criterion(outputs, y_batch) # 計算預測值與真實值的MSEoptimizer.zero_grad()loss.backward() # 反向傳播計算梯度optimizer.step() # 更新參數
- 概念對應:
- 損失函數(MSE):
Loss = 1/N * Σ(y_pred - y_true)2
衡量預測值與真實值的差異。 - 反向傳播:通過鏈式法則計算每個參數的梯度(如
dLoss/dW_hh
)。 - 梯度消失:標準 RNN 在長序列中梯度會指數衰減(這里序列較短,問題不明顯)。
- 損失函數(MSE):
6. 長距離依賴的挑戰
# 序列長度seq_length=20,RNN可較好處理
# 若seq_length很大(如100),標準RNN性能會下降
- 概念對應:
- 梯度消失 / 爆炸:RNN 通過
tanh
激活函數傳遞梯度,當序列很長時,梯度會趨近于 0 或無窮大。 - 改進方案:LSTM/GRU 通過門控機制解決這一問題(后續可嘗試替換
nn.RNN
為nn.LSTM
)。
- 梯度消失 / 爆炸:RNN 通過
7. 預測與可視化
plt.plot(targets, label='True Values')
plt.plot(predictions, label='Predictions')
- 概念對應:
- 預測能力:模型學習到正弦波的周期性,能用前 20 個點預測下一個點。
- 泛化驗證:測試集上的預測效果驗證模型是否真正理解序列規律。
8.完整代碼
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt# --------------訓練 RNN 預測下一個時間步的值------------#
# 設置隨機種子以確保結果可復現
torch.manual_seed(42)
np.random.seed(42)# 設置設備
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')# 生成簡單的時序數據用于演示
n_samples = 1000
seq_length = 20
time = np.linspace(0, 2 * np.pi * n_samples / 10, n_samples + seq_length)
data = np.sin(time)
X = []
y = []
for i in range(n_samples):X.append(data[i:i + seq_length])y.append(data[i + seq_length])
X = np.array(X)
y = np.array(y)# 轉換為PyTorch張量
X = torch.FloatTensor(X).view(n_samples, seq_length, 1) # [batch, seq_len, feature_dim]
y = torch.FloatTensor(y).view(n_samples, 1) # [batch, output_dim]# 劃分訓練集和測試集
train_size = int(0.8 * n_samples)
train_X, test_X = X[:train_size], X[train_size:]
train_y, test_y = y[:train_size], y[train_size:]# 創建數據加載器
batch_size = 32
train_dataset = TensorDataset(train_X, train_y)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)test_dataset = TensorDataset(test_X, test_y)
test_loader = DataLoader(test_dataset, batch_size=batch_size)# 定義RNN模型
class SimpleRNN(nn.Module):def __init__(self, input_size, hidden_size, output_size, num_layers=1):super(SimpleRNN, self).__init__()self.hidden_size = hidden_sizeself.num_layers = num_layers# PyTorch內置的RNN層# batch_first=True表示輸入的形狀為[batch, seq_len, feature_dim]self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, nonlinearity='tanh')# 全連接層:將RNN的輸出映射到預測值self.fc = nn.Linear(hidden_size, output_size)def forward(self, x):# 初始化隱藏狀態# 形狀為[num_layers, batch_size, hidden_size]h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)# 前向傳播RNN# out形狀為[batch_size, seq_len, hidden_size]# hn形狀為[num_layers, batch_size, hidden_size]out, hn = self.rnn(x, h0)# 我們只需要最后一個時間步的輸出# 形狀變為[batch_size, hidden_size]out = out[:, -1, :]# 通過全連接層得到預測值# 形狀變為[batch_size, output_size]out = self.fc(out)return out# 模型參數
input_size = 1 # 輸入特征維度(每個時間步的特征數)
hidden_size = 64 # 隱藏層維度
output_size = 1 # 輸出維度(預測值的維度)
num_layers = 1 # RNN層數# 初始化模型
model = SimpleRNN(input_size, hidden_size, output_size, num_layers).to(device)# 定義損失函數和優化器
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)# 訓練模型
epochs = 50
train_losses = []print("開始訓練模型...")
for epoch in range(epochs):total_loss = 0for X_batch, y_batch in train_loader:X_batch, y_batch = X_batch.to(device), y_batch.to(device)# 前向傳播outputs = model(X_batch)loss = criterion(outputs, y_batch)# 反向傳播和優化optimizer.zero_grad()loss.backward()optimizer.step()total_loss += loss.item()# 計算平均損失avg_loss = total_loss / len(train_loader)train_losses.append(avg_loss)if (epoch + 1) % 10 == 0:print(f'Epoch [{epoch + 1}/{epochs}], Loss: {avg_loss:.4f}')# 評估模型
model.eval()
predictions = []
targets = []with torch.no_grad():for X_batch, y_batch in test_loader:X_batch, y_batch = X_batch.to(device), y_batch.to(device)# 前向傳播outputs = model(X_batch)predictions.extend(outputs.cpu().numpy())targets.extend(y_batch.cpu().numpy())predictions = np.array(predictions)
targets = np.array(targets)# 可視化結果
plt.figure(figsize=(12, 5))plt.subplot(1, 2, 1)
plt.plot(train_losses)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')plt.subplot(1, 2, 2)
plt.plot(targets, label='True Values')
plt.plot(predictions, label='Predictions')
plt.title('Time Series Prediction')
plt.xlabel('Time Step')
plt.ylabel('Value')
plt.legend()plt.tight_layout()
plt.show()# 保存模型
torch.save(model.state_dict(), '1-0-rnn_model.pth')
print('Model saved as rnn_model.pth')