EMD-SSA-VMD-LSTM Hybrid Model
- I. Environment Setup and Dependencies
- II. Data Generation (Example Data)
- III. Multi-Level Signal Decomposition
  - 1. Empirical Mode Decomposition (EMD)
  - 2. Singular Spectrum Analysis (SSA)
  - 3. Variational Mode Decomposition (VMD)
- IV. Data Preprocessing
  - 1. Normalization
  - 2. Dataset Construction
- V. Hybrid LSTM Model
  - 1. Model Architecture
  - 2. Model Training
- VI. Prediction and Result Reconstruction
  - 1. Multi-Step Prediction
  - 2. Inverse Normalization
- VII. Performance Evaluation and Visualization
  - 1. Evaluation Metrics
  - 2. Result Visualization
- VIII. End-to-End Data Flow
- IX. Parameter Tuning Recommendations
- X. Extension Directions
- Source Code Notes
Below is a complete Python implementation of an EMD-SSA-VMD-LSTM hybrid model for time series forecasting, combining Empirical Mode Decomposition (EMD), Singular Spectrum Analysis (SSA), and Variational Mode Decomposition (VMD) with an LSTM deep learning model. The approach targets forecasting of complex non-stationary signals, and the code covers data generation, multi-level decomposition, model construction, and result visualization.
I. Environment Setup and Dependencies

```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PyEMD import EMD
from vmdpy import VMD
from scipy.linalg import hankel, svd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)
```
II. Data Generation (Example Data)

```python
def generate_complex_signal(length=1000):
    """Generate a non-stationary signal with multiple components."""
    t = np.linspace(0, 10, length)
    # Trend term
    trend = 0.02 * t**2 + 0.1 * t
    # Periodic components
    seasonal1 = 1.5 * np.sin(2 * np.pi * 0.8 * t)
    seasonal2 = 0.8 * np.cos(2 * np.pi * 2.5 * t)
    # Impulse noise
    impulse = np.zeros(length)
    impulse[np.random.choice(length, 20)] = np.random.uniform(-3, 3, 20)
    # Gaussian noise
    noise = 0.3 * np.random.randn(length)
    return trend + seasonal1 + seasonal2 + impulse + noise

# Generate the data and visualize it
data = generate_complex_signal()
plt.figure(figsize=(12, 4))
plt.plot(data, color='darkblue')
plt.title("Generated Non-stationary Signal")
plt.show()
```
III. Multi-Level Signal Decomposition

1. Empirical Mode Decomposition (EMD)

```python
def emd_decomposition(signal):
    emd = EMD()
    imfs = emd(signal)
    return imfs

imfs_emd = emd_decomposition(data)
print(f"EMD produced {imfs_emd.shape[0]} IMF components")
```
2. Singular Spectrum Analysis (SSA)

```python
def ssa_decomposition(signal, window=30, rank=3):
    """Core singular spectrum analysis routine."""
    # Build the trajectory matrix (Hankel: constant anti-diagonals)
    L = window
    K = len(signal) - L + 1
    X = hankel(signal[:L], signal[L-1:])
    # Singular value decomposition
    U, S, VT = svd(X, full_matrices=False)
    # Reconstruct from the leading `rank` components
    X_rank = (U[:, :rank] * S[:rank]) @ VT[:rank, :]
    # Diagonal averaging: average each anti-diagonal; the matrix is flipped
    # vertically so that np.diagonal walks the anti-diagonals of X_rank
    reconstructed = np.zeros_like(signal)
    for i in range(len(signal)):
        X_diag = np.diagonal(X_rank[::-1, :], offset=-(L - 1 - i))
        reconstructed[i] = X_diag.mean() if X_diag.size > 0 else 0
    return reconstructed

# Apply SSA to each EMD IMF
components_ssa = []
for imf in imfs_emd:
    ssa_comp = ssa_decomposition(imf, window=30, rank=3)
    components_ssa.append(ssa_comp)
```
3. Variational Mode Decomposition (VMD)

```python
def vmd_decomposition(signal, alpha=2000, K=4):
    u, _, _ = VMD(signal, alpha=alpha, tau=0, K=K, DC=0, init=1, tol=1e-7)
    return u

# Apply VMD to each SSA result
final_components = []
for comp in components_ssa:
    vmd_comps = vmd_decomposition(comp, K=2)
    final_components.extend(vmd_comps)

# Stack all components into one array
all_components = np.vstack(final_components)
print(f"Total number of decomposed components: {all_components.shape[0]}")
```
IV. Data Preprocessing

1. Normalization

```python
# Scale each component independently and keep its scaler for inversion later
scalers = []
scaled_components = []
for comp in all_components:
    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaled = scaler.fit_transform(comp.reshape(-1, 1)).flatten()
    scaled_components.append(scaled)
    scalers.append(scaler)

scaled_components = np.array(scaled_components)
```
2. Dataset Construction

```python
class HybridDataset(Dataset):
    def __init__(self, components, lookback=60, horizon=1):
        self.components = components
        self.lookback = lookback
        self.horizon = horizon

    def __len__(self):
        return self.components.shape[1] - self.lookback - self.horizon + 1

    def __getitem__(self, idx):
        # x: (lookback, n_components)
        x = self.components[:, idx:idx + self.lookback].T
        # y: (horizon, n_components)
        y = self.components[:, idx + self.lookback:idx + self.lookback + self.horizon].T
        return torch.FloatTensor(x), torch.FloatTensor(y)

lookback = 60
horizon = 1
dataset = HybridDataset(scaled_components, lookback, horizon)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
```
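A quick look at one batch confirms the tensor layout the model expects; a sketch using the objects defined above:

```python
# Inspect a single batch to verify shapes
x_batch, y_batch = next(iter(dataloader))
print(x_batch.shape)  # torch.Size([32, 60, n_components])
print(y_batch.shape)  # torch.Size([32, 1, n_components])
```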
V. Hybrid LSTM Model

1. Model Architecture

```python
class MultiScaleLSTM(nn.Module):
    def __init__(self, input_size, hidden_size=128, output_size=1):
        super().__init__()
        # Feature extraction layer
        self.lstm1 = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.dropout1 = nn.Dropout(0.3)
        # Temporal prediction layer
        self.lstm2 = nn.LSTM(hidden_size, hidden_size // 2, batch_first=True)
        self.dropout2 = nn.Dropout(0.2)
        # Output layer
        self.fc = nn.Linear(hidden_size // 2, output_size)

    def forward(self, x):
        # Input shape: (batch_size, seq_len, input_size)
        out, (h, c) = self.lstm1(x)
        out = self.dropout1(out)
        out, _ = self.lstm2(out)
        # Use only the last time step for the prediction head
        out = self.dropout2(out[:, -1, :])
        return self.fc(out)
```
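A shape smoke test, assuming `scaled_components` and `lookback` from above, verifies the forward pass before training:

```python
# Forward-pass shape check: one output per component, one step ahead
n_components = scaled_components.shape[0]
model_check = MultiScaleLSTM(input_size=n_components, output_size=n_components)
dummy = torch.randn(2, lookback, n_components)  # (batch, seq_len, input_size)
print(model_check(dummy).shape)                 # torch.Size([2, n_components])
```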
2. Model Training

```python
# The model takes all components as input features and predicts the next
# value of every component, so output_size must match the component count
# (this also makes the recursive forecast and per-component inverse
# normalization below consistent).
model = MultiScaleLSTM(input_size=scaled_components.shape[0],
                       output_size=scaled_components.shape[0])
criterion = nn.MSELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

# Training loop
for epoch in range(100):
    total_loss = 0
    for x, y in dataloader:
        optimizer.zero_grad()
        pred = model(x)                       # (batch, n_components)
        loss = criterion(pred, y.squeeze(1))  # y: (batch, horizon=1, n_components)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/100 | Loss: {total_loss/len(dataloader):.4f}")
```
VI. Prediction and Result Reconstruction

1. Multi-Step Prediction

```python
def recursive_forecast(model, initial_seq, steps=50):
    """Roll the model forward, feeding each prediction back as input."""
    model.eval()
    current_seq = initial_seq.clone()  # (lookback, n_components)
    predictions = []
    for _ in range(steps):
        with torch.no_grad():
            pred = model(current_seq.unsqueeze(0))  # (1, n_components)
        predictions.append(pred.numpy()[0])
        # Slide the window: drop the oldest step, append the prediction
        current_seq = torch.cat([current_seq[1:], pred])
    return np.array(predictions)  # (steps, n_components)

# Hold back the last pred_steps points so the forecast can be compared
# against known values, and start from the lookback window just before them
pred_steps = 50
test_input = scaled_components[:, -(lookback + pred_steps):-pred_steps]
test_input = torch.FloatTensor(test_input.T)  # (lookback, n_components)

# Run the forecast
prediction = recursive_forecast(model, test_input, pred_steps)
```
2. Inverse Normalization

```python
# Inverse-transform each component's forecast with its own scaler
pred_components = []
for i, scaler in enumerate(scalers):
    comp_pred = scaler.inverse_transform(prediction[:, i].reshape(-1, 1))
    pred_components.append(comp_pred)

# Reconstruct the final forecast as the sum of all components
final_pred = np.sum(pred_components, axis=0).flatten()

# Ground-truth values over the forecast window
true_values = data[-pred_steps:]
```
VII. Performance Evaluation and Visualization

1. Evaluation Metrics

```python
mae = mean_absolute_error(true_values, final_pred)
rmse = np.sqrt(mean_squared_error(true_values, final_pred))
print(f"MAE: {mae:.4f}")
print(f"RMSE: {rmse:.4f}")
```
2. Result Visualization

The shaded band below is a rough ±1.96·RMSE heuristic around the point forecast, not a calibrated confidence interval.

```python
plt.figure(figsize=(12, 6))
plt.plot(true_values, label='True', marker='o', linestyle='--')
plt.plot(final_pred, label='Predicted', marker='x', linewidth=2)
plt.fill_between(range(len(final_pred)),
                 final_pred - 1.96 * rmse,
                 final_pred + 1.96 * rmse,
                 alpha=0.2, color='orange')
plt.title("EMD-SSA-VMD-LSTM Multi-step Prediction")
plt.legend()
plt.grid(True)
plt.show()
```
VIII. End-to-End Data Flow

| Step | Technique | Mathematical Expression |
| --- | --- | --- |
| Signal generation | Synthesize trend + periodic terms + noise | $x(t) = \sum_{i=1}^{n} a_i f_i(t) + \epsilon(t)$ |
| EMD | Adaptive decomposition of the non-stationary signal | $x(t) = \sum_{k=1}^{K} c_k(t) + r(t)$ |
| SSA | SVD of the trajectory matrix | $\mathbf{X} = \mathbf{U}\boldsymbol{\Sigma}\mathbf{V}^T$ |
| VMD | Variational mode optimization | $\min_{\{u_k\},\{\omega_k\}} \sum_k \left\lVert \partial_t \left[ u_k(t) e^{-j\omega_k t} \right] \right\rVert_2^2$ |
| Feature fusion | Temporal alignment of all components | $\mathbf{X}_{\text{stack}} = [\mathbf{C}_1^T; \mathbf{C}_2^T; \dots; \mathbf{C}_n^T]$ |
| LSTM modeling | Gated temporal modeling | $f_t = \sigma(W_f \cdot [h_{t-1}, x_t] + b_f)$ |
| Reconstruction | Inverse normalization and summation (sanity-checked below) | $\hat{y} = \sum_{k=1}^{K} \text{scaler}_k^{-1}(\hat{c}_k)$ |
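The reconstruction step treats the decomposition as additive. Because each SSA pass keeps only the top `rank` singular components (and VMD reconstruction is itself approximate), the stacked components do not sum exactly back to the input; a quick check, using `all_components` and `data` from the code above, quantifies how much of the signal the cascade discards:

```python
# How much signal survives the EMD -> SSA(rank=3) -> VMD cascade?
reconstruction = all_components.sum(axis=0)
rel_err = np.linalg.norm(data - reconstruction) / np.linalg.norm(data)
print(f"Relative reconstruction error: {rel_err:.4f}")
```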
IX. Parameter Tuning Recommendations

| Parameter | Tuning Strategy | Typical Range |
| --- | --- | --- |
| Max number of EMD IMFs | Adjust to signal complexity | 5-10 |
| SSA window length | About 1/3 of the dominant period | 20-50 |
| Number of VMD modes (K) | Determine via spectral analysis | 3-6 |
| LSTM hidden size | Balance capacity against overfitting | 64-256 |
| Learning rate | Cosine annealing (see the sketch after this table) | 1e-4 to 1e-3 |
| Input sequence length | Cover the dominant periods | 60-120 |
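For the cosine-annealing row above, PyTorch's built-in scheduler is the natural fit; a minimal sketch wired into the training loop from Section V, assuming `model`, `criterion`, `optimizer`, and `dataloader` from above:

```python
from torch.optim.lr_scheduler import CosineAnnealingLR

# Anneal the learning rate from 1e-3 down to 1e-4 over 100 epochs
scheduler = CosineAnnealingLR(optimizer, T_max=100, eta_min=1e-4)
for epoch in range(100):
    for x, y in dataloader:
        optimizer.zero_grad()
        loss = criterion(model(x), y.squeeze(1))
        loss.backward()
        optimizer.step()
    scheduler.step()  # step once per epoch
```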
X. Extension Directions

- Adaptive decomposition

```python
# Automatically determine the VMD mode number K: increase K until the
# decomposition degenerates (NaNs appear), then back off by one
from vmdpy import VMD

def auto_vmd(signal, max_K=8):
    for K in range(3, max_K + 1):
        u, _, _ = VMD(signal, alpha=2000, tau=0, K=K, DC=0, init=1, tol=1e-7)
        if np.any(np.isnan(u)):
            return K - 1
    return max_K
```

- Probabilistic forecasting (a matching pinball-loss sketch follows this list)

```python
# Change the output layer to quantile regression
self.fc = nn.Linear(hidden_size // 2, 3)  # output 3 quantiles
```

- Online learning

```python
# Incremental training step on newly arrived data
def online_update(model, new_data, targets):
    model.train()
    optimizer.zero_grad()
    outputs = model(new_data)
    loss = criterion(outputs, targets)
    loss.backward()
    optimizer.step()
```
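Quantile outputs need a quantile (pinball) loss rather than MSE. A minimal sketch, assuming the three quantiles (0.1, 0.5, 0.9) and a prediction of shape `(batch, 3)` against a scalar target per sample:

```python
def pinball_loss(pred, target, quantiles=(0.1, 0.5, 0.9)):
    """Pinball loss: pred is (batch, n_quantiles), target is (batch,)."""
    loss = 0.0
    for i, q in enumerate(quantiles):
        err = target - pred[:, i]
        # Penalize under-prediction by q and over-prediction by (1 - q)
        loss = loss + torch.max(q * err, (q - 1) * err).mean()
    return loss
```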
Source Code Notes

- Data compatibility
  - CSV input: replace `generate_complex_signal()` with `pd.read_csv()`
  - Multivariate extension: adjust the input dimensions to `(n_features, seq_len)`
- Performance optimization
  - Enable CUDA acceleration: `model.to('cuda')` (move the batches as well)
  - Use mixed-precision training: `scaler = torch.cuda.amp.GradScaler()`
- Industrial deployment

```python
# Save and load the model weights
torch.save(model.state_dict(), 'multiscale_lstm.pth')
model.load_state_dict(torch.load('multiscale_lstm.pth'))
```
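Beyond raw weight checkpoints, a traced TorchScript export is a common route to dependency-light serving; a minimal sketch, assuming the trained `model`, `lookback`, and `scaled_components` from above:

```python
# Trace the model with a representative input and save a standalone artifact
model.eval()
example = torch.randn(1, lookback, scaled_components.shape[0])
traced = torch.jit.trace(model, example)
traced.save('multiscale_lstm_traced.pt')
```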
By cascading three levels of decomposition (EMD-SSA-VMD), this approach extracts multi-scale features from the signal, and the stacked LSTM then models the complex temporal dependencies, which makes it well suited to forecasting non-stationary signals. In practice, the decomposition parameters and model structure should be adapted to the characteristics of the data and refined continuously through error analysis.