"""TTAO preprocessing + CNN-BiGRU-MSA model for time-series regression.

Features of this time-series analysis pipeline:

- TTAO preprocessing: strengthens local and global relations of temporal
  features via a triangular topological structure.
- Hybrid model architecture:
    - CNN layers extract local feature patterns
    - BiGRU captures bidirectional temporal dependencies
    - Multi-head self-attention performs sequence modelling
    - Residual connections and layer normalization improve training stability
- Advanced optimization techniques:
    - Huber loss (more robust to outliers)
    - Exponential learning-rate decay
    - Early stopping and learning-rate callbacks
- Complete workflow:
    - Command-line argument configuration
    - Data preprocessing and feature engineering
    - Model training and evaluation
    - Multi-metric evaluation and visualization
- Multi-step forecasting: configure the number of future time steps to
  predict with the ``--pred_steps`` argument.
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import os
import argparse
import warnings
warnings.filterwarnings('ignore')# 設置中文顯示
def _configure_chinese_fonts():
    """Set matplotlib font families so Chinese labels render correctly."""
    plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]


def parse_args():
    """Parse command-line arguments for the time-series regression workflow.

    Returns:
        argparse.Namespace with data path, target column, window size,
        prediction horizon, training hyper-parameters and model save path.
    """
    parser = argparse.ArgumentParser(description='時序數據深度學習回歸分析')
    parser.add_argument('--data_path', type=str, default='data.csv', help='數據文件路徑')
    parser.add_argument('--target_col', type=str, default='value', help='目標列名')
    parser.add_argument('--window_size', type=int, default=10, help='滑動窗口大小')
    parser.add_argument('--pred_steps', type=int, default=1, help='預測未來步數')
    parser.add_argument('--epochs', type=int, default=50, help='訓練輪次')
    parser.add_argument('--batch_size', type=int, default=32, help='批次大小')
    parser.add_argument('--model_save_path', type=str, default='best_model.h5', help='模型保存路徑')
    return parser.parse_args()


def triangular_topological_aggregation_optimization(X, window_size, alpha=0.5):
    """Triangular Topological Aggregation Optimization (TTAO).

    Enhances local and global relations of a batch of time series by
    blending each time step with a triangular-weighted aggregation of the
    preceding ``window_size`` steps.

    Args:
        X: array of shape (batch, seq_len, features).
        window_size: length of the look-back window used for aggregation.
        alpha: blend factor in [0, 1]; 1.0 keeps only the original signal,
            0.0 keeps only the aggregated signal.

    Returns:
        Array of the same shape as ``X``. Positions ``i < window_size``
        (where no full look-back window exists) are returned unchanged.
    """
    batch_size, seq_len, feature_dim = X.shape
    # BUG FIX: the original initialized the output with np.zeros_like(X) and
    # only wrote positions i >= window_size.  Because main() calls this with
    # seq_len == window_size, the loop never ran and the whole output was
    # silently zeroed.  Starting from a copy of X keeps untouched positions.
    ttao_output = X.copy()
    # The lower-triangular, linearly decaying weight matrix is identical for
    # every time step, so build it once instead of once per iteration.
    triangle_weights = np.zeros((window_size, window_size))
    for j in range(window_size):
        for k in range(window_size):
            if k <= j:
                triangle_weights[j, k] = 1.0 - (j - k) / window_size
    for i in range(window_size, seq_len):
        window_data = X[:, i - window_size:i, :]
        weighted_sum = np.zeros((batch_size, window_size, feature_dim))
        for j in range(window_size):
            # Each row j aggregates the window with its own triangular weights.
            weighted_window = window_data * triangle_weights[j, :].reshape(1, window_size, 1)
            weighted_sum[:, j, :] = np.sum(weighted_window, axis=1)
        # Blend the original step with the mean of the aggregated rows.
        aggregated = np.mean(weighted_sum, axis=1, keepdims=True)
        ttao_output[:, i, :] = alpha * X[:, i, :] + (1 - alpha) * aggregated[:, 0, :]
    return ttao_output


def create_sequences(data, window_size, pred_steps=1):
    """Build sliding-window samples from a (n, 1) array.

    Args:
        data: 2-D array whose first column is the series.
        window_size: number of past steps per input sample.
        pred_steps: number of future steps per target (multi-step support).

    Returns:
        Tuple (X, y) with shapes (samples, window_size) and
        (samples, pred_steps).
    """
    X, y = [], []
    for i in range(len(data) - window_size - pred_steps + 1):
        X.append(data[i:i + window_size, 0])
        y.append(data[i + window_size:i + window_size + pred_steps, 0])
    return np.array(X), np.array(y)


def build_advanced_model(input_shape, head_size=256, num_heads=4, ff_dim=4,
                         num_transformer_blocks=4, mlp_units=[128], dropout=0.1,
                         mlp_dropout=0.1):
    """Build the CNN + BiGRU + multi-head self-attention regression model.

    Args:
        input_shape: (timesteps, features) of one input sample.
        head_size: key dimension of each attention head.
        num_heads: number of attention heads.
        ff_dim: hidden width of the pointwise feed-forward conv.
        num_transformer_blocks: number of attention/feed-forward blocks.
        mlp_units: widths of the final dense head layers.
            (Mutable default is safe here: the list is only read.)
        dropout: dropout rate inside the attention blocks.
        mlp_dropout: dropout rate in the dense head.

    Returns:
        An uncompiled ``tf.keras.Model`` producing a single scalar output.
    """
    inputs = Input(shape=input_shape)
    x = inputs
    # 1-D convolutions extract local patterns; causal padding preserves
    # temporal alignment (no lookahead).
    x = Conv1D(filters=64, kernel_size=3, padding="causal", activation="relu")(x)
    x = BatchNormalization()(x)
    x = Conv1D(filters=128, kernel_size=3, padding="causal", activation="relu")(x)
    x = BatchNormalization()(x)
    x = MaxPooling1D(pool_size=2)(x)
    # BiGRU stack captures bidirectional temporal dependencies.
    x = Bidirectional(GRU(128, return_sequences=True))(x)
    x = Dropout(0.2)(x)
    x = Bidirectional(GRU(64, return_sequences=True))(x)
    x = Dropout(0.2)(x)
    # Transformer-style blocks: pre-norm multi-head attention followed by a
    # pointwise feed-forward pair, each wrapped with a residual connection.
    for _ in range(num_transformer_blocks):
        residual = x
        x = LayerNormalization(epsilon=1e-6)(x)
        x = MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)(x, x)
        x = Dropout(dropout)(x)
        x = x + residual  # residual connection
        residual = x
        x = LayerNormalization(epsilon=1e-6)(x)
        x = Conv1D(filters=ff_dim, kernel_size=1, activation="gelu")(x)
        x = Dropout(dropout)(x)
        # NOTE(review): projecting to input_shape[-1] channels and adding the
        # wider residual relies on broadcasting — looks intentional (mirrors
        # the Keras time-series transformer example) but worth confirming.
        x = Conv1D(filters=input_shape[-1], kernel_size=1)(x)
        x = x + residual  # residual connection (broadcast over channels)
    x = LayerNormalization(epsilon=1e-6)(x)
    # channels_first pooling averages over the last axis, as in the Keras
    # time-series transformer example.
    x = GlobalAveragePooling1D(data_format="channels_first")(x)
    # Dense regression head.
    for dim in mlp_units:
        x = Dense(dim, activation="gelu")(x)
        x = Dropout(mlp_dropout)(x)
    outputs = Dense(1)(x)  # single-step prediction
    return Model(inputs, outputs)


def plot_training_history(history):
    """Save training/validation loss and MAE curves to training_history.png."""
    plt.figure(figsize=(14, 5))
    # Loss curves.
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='訓練損失')
    plt.plot(history.history['val_loss'], label='驗證損失')
    plt.title('模型損失')
    plt.ylabel('損失')
    plt.xlabel('輪次')
    plt.legend()
    # MAE curves.
    plt.subplot(1, 2, 2)
    plt.plot(history.history['mae'], label='訓練MAE')
    plt.plot(history.history['val_mae'], label='驗證MAE')
    plt.title('模型MAE')
    plt.ylabel('MAE')
    plt.xlabel('輪次')
    plt.legend()
    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.close()


def plot_prediction_results(y_true, y_pred, title='時序數據回歸預測結果'):
    """Save a true-vs-predicted overlay plot to prediction_results.png."""
    plt.figure(figsize=(14, 7))
    plt.plot(y_true, label='真實值', linewidth=2)
    plt.plot(y_pred, label='預測值', alpha=0.7, linewidth=2)
    plt.title(title, fontsize=16)
    plt.xlabel('時間點', fontsize=14)
    plt.ylabel('值', fontsize=14)
    plt.legend(fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig('prediction_results.png')
    plt.close()


def main():
    """End-to-end workflow: load, preprocess, TTAO, train, evaluate, plot."""
    _configure_chinese_fonts()
    args = parse_args()
    # 1. Data loading
    print(f"正在加載數據: {args.data_path}")
    try:
        data = pd.read_csv(args.data_path)
        print(f"數據加載成功,共{len(data)}條記錄")
    except FileNotFoundError:
        print(f"錯誤:找不到數據文件 {args.data_path}")
        return
    # 2. Preprocessing
    print("正在進行數據預處理...")
    values = data[args.target_col].values.reshape(-1, 1)
    # Build sliding-window samples (multi-step targets supported).
    X, y = create_sequences(values, args.window_size, args.pred_steps)
    # Chronological 80/20 train/test split (no shuffling for time series).
    split_idx = int(0.8 * len(X))
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]
    # Min-max scaling fit on the training portion only.
    scaler_X = MinMaxScaler(feature_range=(0, 1))
    scaler_y = MinMaxScaler(feature_range=(0, 1))
    X_train_scaled = scaler_X.fit_transform(X_train.reshape(-1, X_train.shape[1])).reshape(X_train.shape)
    X_test_scaled = scaler_X.transform(X_test.reshape(-1, X_test.shape[1])).reshape(X_test.shape)
    y_train_scaled = scaler_y.fit_transform(y_train.reshape(-1, y_train.shape[1])).reshape(y_train.shape)
    y_test_scaled = scaler_y.transform(y_test.reshape(-1, y_test.shape[1])).reshape(y_test.shape)
    # Add a trailing channel axis for the model input: (samples, steps, 1).
    X_train_reshaped = X_train_scaled.reshape(X_train_scaled.shape[0], X_train_scaled.shape[1], 1)
    X_test_reshaped = X_test_scaled.reshape(X_test_scaled.shape[0], X_test_scaled.shape[1], 1)
    print(f"訓練集形狀: {X_train_reshaped.shape}, 測試集形狀: {X_test_reshaped.shape}")
    # 3. TTAO preprocessing
    print("正在應用時間拓撲聚合優化(TTAO)...")
    X_train_tta = triangular_topological_aggregation_optimization(X_train_reshaped, args.window_size)
    X_test_tta = triangular_topological_aggregation_optimization(X_test_reshaped, args.window_size)
    # 4. Model construction
    print("正在構建模型...")
    model = build_advanced_model(X_train_tta.shape[1:])
    # Exponential learning-rate decay with Adam.
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=1e-3,
        decay_steps=10000,
        decay_rate=0.9)
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
    model.compile(optimizer=optimizer, loss='huber_loss', metrics=['mae'])
    model.summary()
    # 5. Model training
    print("開始訓練模型...")
    callbacks = [
        EarlyStopping(patience=15, restore_best_weights=True),
        ModelCheckpoint(args.model_save_path, save_best_only=True, monitor='val_loss'),
        tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=5, min_lr=1e-6),
    ]
    history = model.fit(
        X_train_tta, y_train_scaled,
        epochs=args.epochs,
        batch_size=args.batch_size,
        validation_split=0.2,
        callbacks=callbacks,
        verbose=1)
    # 6. Model evaluation
    print("正在評估模型...")
    loss, mae = model.evaluate(X_test_tta, y_test_scaled, verbose=0)
    print(f"測試集指標 - Huber Loss: {loss:.4f}, MAE: {mae:.4f}")
    # 7. Predict and invert the scaling
    # NOTE(review): the model emits a single output per sample while the
    # target scaler was fit on pred_steps columns — with --pred_steps > 1
    # this inverse_transform will mismatch; confirm intended usage.
    y_pred_scaled = model.predict(X_test_tta)
    y_pred = scaler_y.inverse_transform(y_pred_scaled).flatten()
    y_true = scaler_y.inverse_transform(y_test_scaled.reshape(-1, y_test_scaled.shape[1])).flatten()
    # Additional metrics (MAPE is undefined if y_true contains zeros).
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    print(f"附加指標 - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAPE: {mape:.2f}%")
    # 8. Visualization
    plot_training_history(history)
    plot_prediction_results(y_true, y_pred)
    print("分析完成!結果已保存為圖表文件")


if __name__ == "__main__":
    main()