Hi,大家好,我是半畝花海。本實驗基于汽車銷量時序數據,使用LSTM網絡(長短期記憶網絡)構建時間序列預測模型。通過數據預處理、模型訓練與評估等完整流程,驗證LSTM在短期時序預測中的有效性。
目錄
一、實驗目標
二、實驗原理
三、實驗環境
四、實驗步驟
1. 數據預處理
2. 構建時間序列數據集
3. 模型構建與訓練
4. 預測與反歸一化
5. 結果可視化
6. 模型評估
五、實驗分析
1. 評估指標解讀
2. 可能的原因分析
3. 改進方法
六、完整代碼
一、實驗目標
本實驗基于汽車銷量時序數據,構建LSTM神經網絡模型,實現對時間序列數據的預測,并通過可視化和評估指標驗證模型性能。實驗目標包括:
- 掌握時間序列數據的預處理方法
- 理解LSTM網絡的工作原理
- 構建端到端的時序預測模型
- 評估模型預測性能
二、實驗原理
1. 核心算法:LSTM網絡
長短期記憶 (Long Short-Term Memory, LSTM) 是一種時間遞歸神經網絡(RNN),它是一種基于機器學習理論的循環網絡時間序列預測方法。該模型可有效處理并解決RNN中人為很難實現的延長時間任務的問題,并預測時間序列中間隔和延遲非常長的重要事件,同時削減了RNN中梯度消失問題對預測研究的影響,總的來說LSTM模型是一種特殊的RNN循環神經網絡。
RNN循環神經網絡結構如圖所示。
LSTM的網絡結構大概為:單層LSTM(5個神經元)+ 全連接層,輸入形狀為(時間步長=1, 特征數=1)
。LSTM網絡結構如圖所示。
(1)輸入門(Input gate)的計算:
(2)遺忘門(Forget gate)的計算:
(3)候選記憶單元的計算:
(4)記憶單元狀態更新的計算:
(5)輸出門的計算:
(6)隱藏狀態(output)的計算:
2. 關鍵技術實現
- 數據歸一化:使用
MinMaxScaler
將數據縮放到[0,1]區間,加速模型收斂。 - 時間序列建模:通過滑動窗口法(
look_back=1
)將數據轉換為監督學習格式,每個樣本包含1個時間步的歷史數據。 - 模型訓練:采用Adam優化器和均方誤差損失函數,訓練150輪,批量大小為2。
三、實驗環境
項目 | 配置/版本 |
---|---|
Python | 3.8+(3.8.20) |
TensorFlow | 2.10.0 |
主要庫 | numpy(1.21.6), matplotlib(3.6.3), pandas(1.4.4), scikit-learn(1.1.3), pillow(10.4.0) |
硬件環境 | CPU/GPU |
四、實驗步驟
1. 數據預處理
(1)數據加載與清洗
# 加載數據(取第2列,跳過末尾3行)
dataframe = pd.read_csv("D:\Python_demo\Time Series\LSTM\car.csv", usecols=[1], skipfooter=3)
- 選取CSV文件第2列數據(銷量數據)
- 跳過末尾3行異常數據
- 轉換為float32類型保證數值精度
(2)數據歸一化
dataset = scaler.fit_transform(dataframe.values.astype('float32'))
- 使用MinMaxScaler進行0-1歸一化
- 消除量綱影響,加速模型收斂
- 公式:
(3)數據集劃分
# 劃分訓練集與測試集(8:2)
train_size = int(len(dataset) * 0.80)
trainlist, testlist = dataset[:train_size], dataset[train_size:]
- 按8:2比例劃分訓練/測試集
- 保持時序連續性,避免隨機劃分
2. 構建時間序列數據集
def create_dataset(dataset, look_back):dataX, dataY = [], []for i in range(len(dataset) - look_back - 1):a = dataset[i:(i + look_back)] # 取look_back長度的歷史數據dataX.append(a)dataY.append(dataset[i + look_back]) # 預測下一個時間點的值return np.array(dataX), np.array(dataY)# 設置時間步長(這里使用1步預測)
look_back = 1
trainX, trainY = create_dataset(trainlist, look_back)
testX, testY = create_dataset(testlist, look_back)# 調整輸入格式(LSTM需要[樣本數, 時間步, 特征數]的3D張量)
trainX = np.reshape(trainX, (trainX.shape[0], look_back, 1))
testX = np.reshape(testX, (testX.shape[0], look_back, 1))
- 滑動窗口法 :
look_back=1
表示用當前時刻數據預測下一時刻值。 - 輸入維度 :LSTM要求輸入為3D張量,此處形狀為
(樣本數, 1, 1)
。
3. 模型構建與訓練
# 構建LSTM模型
model = Sequential() # 創建序列模型
model.add(LSTM(5, input_shape=(look_back, 1))) # 添加LSTM層(5個神經元,輸入形狀為(時間步=1, 特征數=1))
model.add(Dense(1)) # 添加全連接輸出層
model.compile(loss='mean_squared_error', optimizer='adam') # 編譯模型(均方誤差損失,Adam優化器)
model.fit(trainX, trainY, epochs=150, batch_size=2, verbose=2) # 訓練模型(100輪,批量大小2)
- 損失函數 :均方誤差(MSE)適用于回歸任務。
- 優化器 :Adam動態調整學習率,平衡收斂速度與精度。
- 超參數選擇 :小批量(batch_size=2)訓練適合小數據集。
4. 預測與反歸一化
# 對訓練集和測試集進行預測
trainPredict = model.predict(trainX)
testPredict = model.predict(testX)# 反歸一化(將預測結果還原到原始數據范圍)
trainPredict = scaler.inverse_transform(trainPredict)
trainY = scaler.inverse_transform(trainY)
testPredict = scaler.inverse_transform(testPredict)
testY = scaler.inverse_transform(testY)
- 反歸一化 :將預測結果還原到原始數據范圍,便于計算真實誤差。
5. 結果可視化
# 繪制訓練集結果
plt.figure(figsize=(10, 6))
plt.plot(trainY, label="訓練實際數據", color="#FF3B1D", marker='*', linestyle="--")
plt.plot(trainPredict[1:], label="訓練預測數據", color="#F9A602", marker='*', linestyle="--")
plt.title("訓練集預測效果")
plt.xlabel('時間步')
plt.ylabel('數值')
plt.legend()
plt.show()# 繪制測試集結果
plt.figure(figsize=(10, 6))
xx = np.linspace(0, len(testY), len(testY))
plt.plot(testY, label="測試實際數據", color="#FF3B1D", marker='*', linestyle="--")
plt.plot(testPredict, label="測試預測數據", color="#F9A602", marker='*', linestyle="--")
plt.title("測試集預測效果")
plt.xlabel('時間步')
plt.ylabel('數值')
plt.xlim(0, 18.5)
plt.ylim(0, 100)
plt.legend()# 添加數據標簽
for x, y in zip(xx, testY):plt.text(x, y + 0.3, int(y), ha='center', va='bottom', fontsize=10.5)
for x, y in zip(xx, testPredict):plt.text(x, y + 0.3, int(y), ha='center', va='bottom', fontsize=10.5)plt.show()
- 坐標軸限制 :根據測試集數據范圍手動設置,避免自動縮放失真。
- 數據標簽 :通過
plt.text()
標注具體數值,增強可讀性。
6. 模型評估
指標 | 公式 |
---|---|
MAE(Mean Absolute Error,平均絕對誤差) | |
RMSE(Root Mean Squared Error,均方根誤差) | |
R2(Coefficient of Determination,決定系數) | ? |
# 計算評估指標
mae = mean_absolute_error(testPredict, testY)
rmse = np.sqrt(mean_squared_error(testPredict, testY))
r2 = r2_score(testPredict, testY)print('測試集評估指標:')
print('MAE: %.3f' % mae)
print('RMSE: %.3f' % rmse)
print('R2: %.3f' % r2)
指標 | 計算結果 | 意義 |
---|---|---|
MAE | 6.397 | 預測值與真實值的平均絕對誤差,表明平均預測誤差約4.85個單位 |
RMSE | 6.818 | 對較大誤差更敏感的均方根誤差 |
R2 | -25.777 | 模型解釋了一定的數據方差,說明模型的解釋力 |
五、實驗分析
1. 評估指標解讀
(1)MAE(Mean Absolute Error,平均絕對誤差)
- 值:6.397
- 解釋:平均絕對誤差表示預測值與真實值之間的平均差距為6.397。
- 評價:MAE值本身并不算特別大,但需要結合數據的實際范圍來判斷。如果汽車銷量數據的范圍是幾十到幾百,則6.397的誤差可能偏高。
(2)RMSE(Root Mean Squared Error,均方根誤差)
- 值:6.818
- 解釋:均方根誤差對較大的誤差更敏感,其值比MAE稍高,說明可能存在一些較大的預測偏差。
- 評價:RMSE略高于MAE,說明誤差分布中存在一些較大的異常值。
(3)R2(Coefficient of Determination,決定系數)
- 值:-25.777
- 解釋:R2衡量模型對數據方差的解釋能力,理想值為1,負值表示模型的預測效果比直接用均值預測還要差。
- 評價 :R2為負數是一個嚴重的警告信號,說明模型幾乎完全無法捕捉數據的趨勢,甚至可能在某些情況下“反向預測”。
2. 可能的原因分析
(1)數據質量問題
①數據量不足:如果訓練數據太少,模型可能無法學習到有效的模式。
②數據噪聲過多:汽車銷量數據可能存在大量隨機波動或異常值,導致模型難以擬合。
③非平穩性:時間序列數據可能存在趨勢或季節性成分,而模型未對其進行處理。
(2)模型設計問題
①LSTM結構過于簡單
- 單層LSTM(僅5個神經元)可能不足以捕捉復雜的時序關系。
- 時間步長
look_back=1
限制了模型利用歷史信息的能力。
②超參數設置不當
- 訓練輪次(epochs=150)可能過多或過少。
- 批量大小(batch_size=2)可能導致梯度更新不穩定。
(3)數據預處理問題
①歸一化范圍不合適:雖然使用了MinMaxScaler
,但如果數據分布不均勻,歸一化可能放大噪聲。
②滑動窗口法不足:look_back=1
僅使用最近一個時間點的數據進行預測,可能忽略長期依賴關系。
(4)測試集劃分問題
①訓練集和測試集分布不一致:如果測試集包含了訓練集中未見過的模式(如突然的銷量激增或下降),模型可能表現不佳。
②數據泄露:確保測試集數據沒有被意外用于訓練。
3. 改進方法
(1)數據質量優化
①檢查數據完整性 :確認數據無缺失值或異常值,并剔除明顯的噪聲點。
②平滑處理 :對原始數據進行移動平均或其他平滑操作,減少短期波動的影響。
③分解時間序列 :使用STL分解等方法提取趨勢和季節性成分,分別建模。
(2)模型結構調整
①增加LSTM層數和神經元數量
model.add(LSTM(50, return_sequences=True, input_shape=(look_back, 1))) # 第一層LSTM
model.add(LSTM(50)) # 第二層LSTM
model.add(Dense(1)) # 輸出層
我后續只修改了這一項,得出結果如下,感覺只有訓練集效果稍微好了一些。
②調整時間步長 :嘗試look_back=3
或更高值,以利用更多歷史信息。
③添加Dropout層 :防止過擬合,例如:
from tensorflow.keras.layers import Dropout
model.add(Dropout(0.2))
(3)超參數調優
①學習率調整 :嘗試不同的學習率(如0.001或0.01)。
②批量大小優化 :將batch_size
調整為更大的值(如16或32)。
③早停機制 :避免過擬合,使用EarlyStopping
回調函數:
from tensorflow.keras.callbacks import EarlyStopping
early_stopping = EarlyStopping(monitor='val_loss', patience=10)
model.fit(trainX, trainY, epochs=200, batch_size=16, validation_split=0.2, callbacks=[early_stopping])
(4)數據預處理改進
①標準化代替歸一化 :如果數據分布較廣,可以嘗試StandardScaler
(均值為0,標準差為1)。
②特征工程 :引入額外特征(如節假日、促銷活動等)增強模型輸入。
(5)測試集驗證
①交叉驗證 :使用時間序列交叉驗證(TimeSeriesSplit)評估模型性能。
②重新劃分數據集 :確保訓練集和測試集分布一致,避免數據泄露。
六、完整代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
@Project : Time Series/LSTM/LSTM_car
@File : LSTM_car.py
@IDE : PyCharm
@Author : 半畝花海
@Date : 2025/03/10 17:36
"""
# ===================================
# 導入必要庫
# ===================================
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.layers import Dense, LSTM
from tensorflow.keras.models import Sequential, load_model
import tensorflow as tf# 禁用eager模式(兼容舊版本TensorFlow)
tf.compat.v1.disable_eager_execution()# 設置中文字體顯示
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei']
matplotlib.rcParams['font.serif'] = ['Microsoft YaHei']
matplotlib.rcParams['axes.unicode_minus'] = False# ===================================
# 數據預處理
# ===================================
# 加載數據(取第2列數據,跳過末尾3行)
dataframe = pd.read_csv(r"D:\Python_demo\Time Series\LSTM\car.csv", usecols=[1], engine='python', skipfooter=3)
dataset = dataframe.values# 數據類型轉換
dataset = dataset.astype('float32')# 數據歸一化(0-1范圍)
scaler = MinMaxScaler(feature_range=(0, 1))
dataset = scaler.fit_transform(dataset)# 劃分訓練集和測試集(8:2比例)
train_size = int(len(dataset) * 0.80)
trainlist = dataset[:train_size]
testlist = dataset[train_size:]# ===================================
# 構建時間序列數據集
# ===================================
def create_dataset(dataset, look_back):"""將時間序列數據轉換為監督學習格式:param dataset: 原始數據集:param look_back: 時間步長(使用多少歷史數據預測):return: 輸入特征X和目標值Y"""dataX, dataY = [], []for i in range(len(dataset) - look_back - 1):a = dataset[i:(i + look_back)] # 取look_back長度的歷史數據dataX.append(a)dataY.append(dataset[i + look_back]) # 預測下一個時間點的值return np.array(dataX), np.array(dataY)# 設置時間步長(這里使用1步預測)
look_back = 1
trainX, trainY = create_dataset(trainlist, look_back)
testX, testY = create_dataset(testlist, look_back)# 調整輸入格式(LSTM需要[樣本數, 時間步, 特征數]的3D張量)
trainX = np.reshape(trainX, (trainX.shape[0], look_back, 1))
testX = np.reshape(testX, (testX.shape[0], look_back, 1))# ===================================
# 構建和訓練LSTM模型
# ===================================
# 構建LSTM模型
model = Sequential() # 創建序列模型
model.add(LSTM(5, input_shape=(look_back, 1))) # 添加LSTM層(5個神經元,輸入形狀為(時間步=1, 特征數=1))
model.add(Dense(1)) # 添加全連接輸出層
model.compile(loss='mean_squared_error', optimizer='adam') # 編譯模型(均方誤差損失,Adam優化器)
model.fit(trainX, trainY, epochs=150, batch_size=2, verbose=2) # 訓練模型(100輪,批量大小2)# 保存并加載模型
model_path = r"D:\Python_demo\Time Series\LSTM\lstm_model.h5"
model.save(model_path)
model = load_model(model_path)# ===================================
# 預測與反歸一化
# ===================================
# 對訓練集和測試集進行預測
trainPredict = model.predict(trainX)
testPredict = model.predict(testX)# 反歸一化(將預測結果還原到原始數據范圍)
trainPredict = scaler.inverse_transform(trainPredict)
trainY = scaler.inverse_transform(trainY)
testPredict = scaler.inverse_transform(testPredict)
testY = scaler.inverse_transform(testY)# ===================================
# 可視化結果
# ===================================
# 繪制訓練集結果
plt.figure(figsize=(10, 6))
plt.plot(trainY, label="訓練實際數據", color="#FF3B1D", marker='*', linestyle="--")
plt.plot(trainPredict[1:], label="訓練預測數據", color="#F9A602", marker='*', linestyle="--")
plt.title("訓練集預測效果")
plt.xlabel('時間步')
plt.ylabel('數值')
plt.legend()
plt.show()# 繪制測試集結果
plt.figure(figsize=(10, 6))
xx = np.linspace(0, len(testY), len(testY))
plt.plot(testY, label="測試實際數據", color="#FF3B1D", marker='*', linestyle="--")
plt.plot(testPredict, label="測試預測數據", color="#F9A602", marker='*', linestyle="--")
plt.title("測試集預測效果")
plt.xlabel('時間步')
plt.ylabel('數值')
plt.xlim(0, 18.5)
plt.ylim(0, 100)
plt.legend()# 添加數據標簽
for x, y in zip(xx, testY):plt.text(x, y + 0.3, int(y), ha='center', va='bottom', fontsize=10.5)
for x, y in zip(xx, testPredict):plt.text(x, y + 0.3, int(y), ha='center', va='bottom', fontsize=10.5)plt.show()# ===================================
# 模型評估
# ===================================
# 計算評估指標
mae = mean_absolute_error(testPredict, testY)
rmse = np.sqrt(mean_squared_error(testPredict, testY))
r2 = r2_score(testPredict, testY)print('測試集評估指標:')
print('MAE: %.3f' % mae)
print('RMSE: %.3f' % rmse)
print('R2: %.3f' % r2)