VectorBT:使用PyTorch+LSTM訓練和回測股票模型 進階四
本方案融合 LSTM 時序預測與動態風險控制。系統采用混合架構,離線訓練構建多尺度特征工程和雙均線策略,結合在線增量更新持續優化模型。技術要點包括三層特征篩選、波動率動態倉位管理、混合精度訓練提升效率,以及用 VectorBT 驗證收益。
文中內容僅限技術學習與代碼實踐參考,市場存在不確定性,技術分析需謹慎驗證,不構成任何投資建議。適合量化新手建立系統認知,為策略開發打下基礎。
本文是進階指南🚀,推薦先閱讀了解基礎知識??
- VectorBT:Python量化交易策略開發與回測評估詳解 🔥
- VectorBT:使用PyTorch+LSTM訓練和回測股票模型 進階一 🔥
- VectorBT:使用PyTorch+LSTM訓練和回測股票模型 進階二 🔥
- VectorBT:使用PyTorch+LSTM訓練和回測股票模型 進階三 🔥
1. 方案概述
本方案設計了一個基于PyTorch和LSTM的短期交易模型,支持多股票增量數據訓練和單股數據回測。該模型利用了時間序列預測、特征工程、波動率動態倉位管理、超參數優化以及回測等技術手段,以實現高效的交易策略生成和評估。
原理
LSTM預測模型:
- 采用多尺度LSTM架構,包含短期(5天)和中期(10天)兩個時間維度
- 引入時間注意力機制,自動捕捉重要時間節點
- 使用Huber Loss作為損失函數,增強對異常值的魯棒性
雙EMA策略:
- 基于預測收益率生成快慢EMA交叉信號
- 動態參數優化機制:使用Optuna框架進行參數尋優
- 風險控制:波動率動態倉位管理,雙重交易成本(0.5%總成本 ??示例數據??)
Optuna: 用于自動化的超參數優化。
特點
- 增量學習架構:
- 多維度特征體系:
- 8大類技術指標
- 自適應特征選擇機制
- 分層標準化策略
- 工程化設計:
- 狀態持久化(模型+預處理器)
- 設備自適應(CUDA/MPS/CPU)
- 全流程隨機種子控制
注意事項
- 數據泄漏防范:
- 嚴格按時間序列劃分數據集
- 分組計算收益率(groupby ts_code)
- 在線特征工程采用滯后窗口
- 參數過擬合風險:
- Optuna優化次數限制(n_trials=10)
- 使用Walk-Forward驗證
- 利潤目標與夏普率結合評估
- 計算資源考量:
- GPU顯存管理(batch_size=128)
- 數據批處理(window_size=5)
- 特征維度壓縮(SelectFromModel)
2. 序列圖
3. 工程代碼
目錄結構:
data/
├── processed_600000.SH.parquet
├── processed_600036.SH.parquet
├── processed_600519.SH.parquet
├── processed_000001.SZ.parquet
models/
├── vectorbt_4_model.pth
├── vectorbt_4_preprocessors.pkl
src/
└── vectorbt_4/├── data_processing.py├── model_definition.py├── training.py├── backtesting.py├── main.py└── __init__.py
3.1 data_processing.py
import pandas as pddef load_data(ts_codes, data_path="./data"):"""加載預處理后的股票數據:param ts_code: 股票代碼(如["600000.SH", "600036.SH", "000001.SZ"]):param data_path: 數據存儲路徑:return: 合并后的DataFrame(含ts_code列標識股票)處理步驟:1. 讀取parquet格式的本地數據2. 轉換交易日期格式3. 計算次日收益率(目標變量)4. 刪除缺失值"""dfs = []for code in ts_codes:df = pd.read_parquet(f"{data_path}/processed_{code}.parquet")df["ts_code"] = codedfs.append(df)combined_df = pd.concat(dfs)combined_df["trade_date"] = pd.to_datetime(combined_df["trade_date"], format="%Y%m%d")combined_df.set_index("trade_date", inplace=True)combined_df.sort_index(inplace=True)# 按股票分組計算收益率,避免跨股票計算,嚴格避免未來信息combined_df["returns"] = combined_df.groupby("ts_code", group_keys=False)["close"].apply(lambda x: x.pct_change().shift(-1).clip(-0.1, 0.1) # 添加收益率截斷)combined_df.dropna(inplace=True)return combined_df
3.2 model_definition.py
import torch
import torch.nn as nn
import torch.nn.functional as Fclass TemporalAttention(nn.Module):def __init__(self, hidden_dim):"""時間注意力機制模塊。:param hidden_dim: 隱藏層維度"""super().__init__()# 定義線性變換W,用于計算注意力得分self.W = nn.Linear(hidden_dim, hidden_dim)# 定義線性變換V,用于將得分轉換為權重self.V = nn.Linear(hidden_dim, 1)def forward(self, hidden):"""前向傳播函數。:param hidden: 輸入隱藏狀態 (batch, seq_len, hidden_dim):return: 上下文向量 (batch, hidden_dim)"""# 對hidden應用W變換,并使用tanh激活函數score = torch.tanh(self.W(hidden)) # (batch, seq_len, hidden_dim)# 計算每個時間步的注意力權重attention_weights = F.softmax(self.V(score), dim=1) # (batch, seq_len, 1)# 加權求和得到上下文向量context = torch.sum(attention_weights * hidden, dim=1) # (batch, hidden_dim)return contextclass MultiScaleLSTM(nn.Module):def __init__(self, input_dim, hidden_dim=256, num_layers=3):"""多尺度LSTM模型。:param input_dim: 輸入特征維度:param hidden_dim: LSTM隱藏層維度,defaults to 256:param num_layers: LSTM層數,defaults to 3"""super().__init__()# 短期LSTM配置self.lstm_short = nn.LSTM(input_dim,hidden_dim // 2,num_layers=num_layers,bidirectional=True,batch_first=True,)# 中期LSTM配置self.lstm_mid = nn.LSTM(input_dim,hidden_dim,num_layers=num_layers,bidirectional=True,batch_first=True,)# 短期特征的時間注意力機制self.temp_attn_short = TemporalAttention(hidden_dim)# 中期特征的時間注意力機制self.temp_attn_mid = TemporalAttention(hidden_dim * 2)# 動態計算融合后的維度self.fusion_dim = hidden_dim + hidden_dim * 2# 收益預測頭self.return_head = nn.Sequential(nn.Linear(self.fusion_dim, 128),nn.LayerNorm(128),nn.GELU(),nn.Dropout(0.3),nn.Linear(128, 1), # 輸出維度為1)def forward(self, x):"""前向傳播函數。:param x: 輸入數據 (batch, seq_len, input_dim):return: 預測收益 (batch_size, 1)"""# 短期特征處理out_short, _ = self.lstm_short(x) # (batch, seq_len, hidden_dim)context_short = self.temp_attn_short(out_short) # (batch, hidden_dim)# 中期特征處理(帶降采樣)x_mid = F.max_pool1d(x.transpose(1, 2), kernel_size=2).transpose(1, 2) # (batch, seq_len/2, input_dim)out_mid, _ = self.lstm_mid(x_mid) # (batch, seq_len/2, hidden_dim*2)context_mid = self.temp_attn_mid(out_mid) # (batch, hidden_dim*2)# 特征融合combined = torch.cat([context_short, context_mid], dim=-1) # (batch, fusion_dim)# 單任務輸出return self.return_head(combined) # 輸出形狀 [batch_size, 1]
3.3 training.py
import osimport joblib
import numpy as np
import optuna
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdmfrom vectorbt_4.model_definition import MultiScaleLSTMclass SingleWindowDataset(Dataset):def __init__(self, X, y):"""單窗口數據集類,用于將時間序列數據轉換為PyTorch數據集格式。:param X: 滑動窗口特征數據:param y: 目標變量"""self.X = X.astype(np.float32) # 將滑動窗口特征數據轉換為float32類型self.y = y.astype(np.float32) # 將目標變量轉換為float32類型def __len__(self):return len(self.y) # 返回目標變量的長度def __getitem__(self, idx):x = torch.from_numpy(self.X[idx]).float() # 將滑動窗口特征數據轉換為PyTorch張量label = torch.tensor(self.y[idx], dtype=torch.float32) # 將目標變量轉換為PyTorch張量return x, label # 返回特征和標簽class TrainingStateManager:def __init__(self, model_dir="./models"):"""訓練狀態管理器類,負責模型和預處理器的保存和加載。:param model_dir: 模型保存目錄,defaults to "./models""""self.model_dir = model_dir # 設置模型保存目錄self.model_path = f"{self.model_dir}/vectorbt_4_model.pth" # 設置模型文件路徑self.preprocessors_path = (f"{self.model_dir}/vectorbt_4_preprocessors.pkl" # 設置預處理器文件路徑)os.makedirs(model_dir, exist_ok=True) # 創建模型保存目錄(如果不存在)def save(self, model, feature_engineer, feature_selector, config):"""保存模型和預處理器。:param model: 訓練好的模型:param feature_engineer: 特征工程對象:param feature_selector: 特征選擇器:param config: 模型配置"""torch.save(model.state_dict(), self.model_path) # 保存模型參數joblib.dump({"feature_engineer": feature_engineer,"feature_selector": feature_selector,"config": config,},self.preprocessors_path,) # 保存預處理器和配置print(f"Model saved to {self.model_path}") # 打印模型保存路徑print(f"Preprocessor saved to {self.preprocessors_path}") # 打印預處理器保存路徑def load(self, device):"""加載模型和預處理器。:param device: 計算設備(CPU/GPU):return: 加載的模型、特征工程對象、特征選擇器和模型配置"""model = None # 初始化模型feature_engineer = None # 初始化特征工程對象feature_selector = None # 初始化特征選擇器config = None # 初始化配置if os.path.exists(self.preprocessors_path): # 如果預處理器文件存在preprocess = joblib.load(self.preprocessors_path) # 加載預處理器feature_engineer = preprocess["feature_engineer"] # 獲取特征工程對象feature_selector = preprocess["feature_selector"] # 獲取特征選擇器config = preprocess["config"] # 獲取配置if os.path.exists(self.model_path): # 如果模型文件存在model_weights = torch.load(self.model_path, weights_only=False, map_location=device) # 加載模型參數model = MultiScaleLSTM(input_dim=config["input_dim"], hidden_dim=config["hidden_dim"]).to(device) # 初始化模型并移動到指定設備model.load_state_dict(model_weights) # 加載模型參數return (model,feature_engineer,feature_selector,config,) # 返回加載的模型、特征工程對象、特征選擇器和配置class OnlineFeatureEngineer:def __init__(self, windows=[5]):"""在線特征工程生成器類,用于生成技術指標特征并進行標準化。:param windows: 滑動窗口列表(用于特征生成),defaults to [5]"""self.windows = windows # 設置滑動窗口列表self.n_features = 10 # 設置特征數量self.scalers = {"Trend": RobustScaler(), # 趨勢類指標使用RobustScaler"Momentum": MinMaxScaler(), # 動量類指標使用MinMaxScaler"Volatility": RobustScaler(), # 波動率類指標使用RobustScaler"Volume": StandardScaler(), # 成交量類指標使用StandardScaler# "Sentiment": StandardScaler(), # 市場情緒類指標使用StandardScaler"SupportResistance": MinMaxScaler(), # 支撐阻力類指標使用MinMaxScaler"Statistical": StandardScaler(), # 統計類指標使用StandardScaler"Composite": RobustScaler(), # 復合型指標使用RobustScaler}# "price": ["open", "high", "low", "close"],self.feature_groups = {# Trend-Following Indicators 趨勢類指標"Trend": ["MA20", "EMA12", "MACD", "ADX", "SAR"],# Momentum Indicators 動量類指標"Momentum": ["RSI", "KDJ_K", "KDJ_D", "KDJ_J", "CCI", "WILLR"],# Volatility Indicators 波動率類指標"Volatility": ["BB_upper", "BB_middle", "BB_lower", "ATR", "STD20"],# Volume Indicators 成交量類指標"Volume": ["OBV", "MFI"],# Market Sentiment Indicators 市場情緒類指標 (需要外部數據)# "Sentiment": [],# Support/Resistance Indicators 支撐阻力類指標"SupportResistance": ["Fib_0.382", "Fib_0.618", "Pivot"],# Statistical Indicators 統計類指標"Statistical": ["LR_slope", "LR_angle"],# Composite Indicators 復合型指標"Composite": ["Ichimoku_tenkan","Ichimoku_kijun","Ichimoku_senkou_a","Ichimoku_senkou_b","Ichimoku_chikou",],}self.all_features = [f for sublist in self.feature_groups.values() for f in sublist] # 生成所有特征列表def partial_fit(self, new_df):"""對新數據進行部分擬合。:param new_df: 新數據DataFrame"""new_features = self.generate_features(new_df, refit=True) # 生成新數據的特征for group, features in self.feature_groups.items(): # 遍歷每個特征組if hasattr(self.scalers[group], "partial_fit"): # 如果該縮放器支持部分擬合self.scalers[group].partial_fit(new_features[features]) # 對新數據進行部分擬合def generate_features(self, df, refit=False):"""生成技術指標特征。:param df: 原始數據DataFrame:param refit: 是否重新擬合標準化器,defaults to False:return: 特征DataFrame生成8大類技術指標:1. 趨勢類指標(MA, MACD等)2. 動量類指標(RSI, KDJ等)3. 波動率指標(布林帶, ATR等)4. 成交量指標(OBV, MFI等)5. 市場情緒類指標 (需要外部數據) -- 忽略6. 支撐阻力指標(斐波那契回撤等)7. 統計指標(線性回歸斜率等)8. 復合指標(Ichimoku云圖等)"""processed = [] # 初始化處理后的特征列表for group, features in self.feature_groups.items(): # 遍歷每個特征組scaler = self.scalers[group] # 獲取對應的縮放器if not refit: # 如果不重新擬合scaler.fit(df[features]) # 擬合縮放器scaled = scaler.transform(df[features]) # 標準化特征processed.append(scaled) # 添加到處理后的特征列表processed_df = pd.DataFrame(np.hstack(processed), index=df.index, columns=self.all_features) # 將處理后的特征合并為DataFrameprocessed_df["ts_code"] = df["ts_code"] # 添加股票代碼processed_df["returns"] = df["returns"] # 添加收益return processed_df.dropna() # 刪除缺失值def feature_selection(self, X, y):"""進行特征選擇。:param X: 特征數據:param y: 目標變量:return: 選擇后的特征數據"""selector = RandomForestRegressor(n_estimators=50, n_jobs=-1) # 初始化隨機森林回歸器selector.fit(X, y) # 擬合隨機森林回歸器feature_selector = SelectFromModel(selector, prefit=True) # 初始化特征選擇器return feature_selector # 返回特征選擇器class IncrementalDataHandler:def __init__(self, feature_engineer, feature_selector, window_size=5):"""增量數據處理器類,用于處理增量數據并生成滑動窗口數據。:param feature_engineer: 特征工程對象:param feature_selector: 特征選擇器:param window_size: 滑動窗口大小,defaults to 5"""self.feature_engineer = feature_engineer # 設置特征工程對象self.feature_selector = feature_selector # 設置特征選擇器self.window_size = window_size # 設置滑動窗口大小self.buffer = pd.DataFrame() # 初始化數據緩沖區def update_buffer(self, new_df):"""更新數據緩沖區。:param new_df: 新數據DataFrame"""self.buffer = pd.concat([self.buffer, new_df]).sort_index() # 更新數據緩沖區并排序def prepare_incremental_data(self):"""準備增量數據。:return: 訓練數據和測試數據"""processed_df = self.feature_engineer.generate_features(self.buffer) # 生成特征X_selected = self.feature_selector.transform(processed_df[self.feature_engineer.all_features].values) # 選擇特征X, y = self.sliding_window(processed_df, X_selected) # 生成滑動窗口數據X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=False) # 劃分訓練集和測試集return (X_train, y_train), (X_test, y_test) # 返回訓練集和測試集def sliding_window(self, df, features):"""生成時序滑動窗口數據"""sequences = [] # 初始化序列列表labels = [] # 初始化標簽列表for ts_code, group in df.groupby("ts_code"): # 按股票代碼分組group_features = features[df["ts_code"] == ts_code] # 獲取該股票的特征for i in range(self.window_size, len(group)): # 生成滑動窗口sequences.append(group_features[i - self.window_size : i]) # 添加滑動窗口特征labels.append(group["returns"].iloc[i]) # 添加標簽return np.array(sequences, dtype=np.float32), np.array(labels, dtype=np.float32) # 返回滑動窗口特征和標簽class IncrementalTrainer:def __init__(self, device, window_size=5):"""增量訓練控制器類,負責模型的初始訓練和增量更新。:param device: 計算設備(CPU/GPU)"""self.device = device # 設置計算設備self.window_size = window_size # 設置滑動窗口大小self.state_manager = TrainingStateManager() # 初始化訓練狀態管理器self.model = None # 初始化模型self.feature_engineer = None # 初始化特征工程對象self.feature_selector = None # 初始化特征選擇器self.config = None # 初始化配置self.load_state() # 加載狀態def load_state(self):"""加載模型和預處理器。"""(self.model,self.feature_engineer,self.feature_selector,self.config,) = self.state_manager.load(self.device) # 加載模型、特征工程對象、特征選擇器和配置def initial_train(self, df):"""初始訓練模型。:param df: 原始數據DataFrame"""# 特征生成self.feature_engineer = OnlineFeatureEngineer(windows=[self.window_size]) # 初始化特征工程對象processed_df = self.feature_engineer.generate_features(df) # 生成特征# 特征選擇X, y = processed_df[self.feature_engineer.all_features], df["returns"].reindex(processed_df.index) # 準備特征和標簽self.feature_selector = self.feature_engineer.feature_selection(X, y) # 選擇特征# 準備訓練數據data_handler = IncrementalDataHandler(feature_engineer=self.feature_engineer,feature_selector=self.feature_selector,window_size=self.window_size,) # 初始化增量數據處理器data_handler.buffer = df # 更新數據緩沖區(X_train, y_train), (X_test, y_test) = (data_handler.prepare_incremental_data()) # 準備訓練數據# Optuna超參優化self._optimize_parameters(X_train, y_train, X_test, y_test) # 優化超參數# 最佳模型訓練print(f"Initial Model Config: {self.config}") # 打印初始模型配置self._train(X_train, y_train) # 訓練模型print("Initial Model Evaluation:") # 打印初始模型評估self._evaluate(X_test, y_test) # 評估模型print("Initial Model Save:") # 打印初始模型保存self.state_manager.save(self.model, self.feature_engineer, self.feature_selector, self.config) # 保存模型def incremental_update(self, new_df):"""增量更新模型。:param new_df: 新數據DataFrame"""if not self.model:print("The model does not exist, please train it first.") # 如果模型不存在,提示先訓練模型returnself.feature_engineer.partial_fit(new_df) # 對新數據進行部分擬合data_handler = IncrementalDataHandler(self.feature_engineer, self.feature_selector) # 初始化增量數據處理器data_handler.update_buffer(new_df) # 更新數據緩沖區(X_train, y_train), (X_test, y_test) = (data_handler.prepare_incremental_data()) # 準備增量數據print(f"Incremental Model Config: {self.config}") # 打印增量模型配置self._train(X_train, y_train) # 訓練模型print("\nIncremental Model Evaluation:") # 打印增量模型評估self._evaluate(X_test, y_test) # 評估模型print("\nIncremental Model Save:") # 打印增量模型保存self.state_manager.save(self.model, self.feature_engineer, self.feature_selector, self.config) # 保存模型def _optimize_parameters(self, X_train, y_train, X_test, y_test):"""Optuna超參優化:param X_train: 訓練集滑動窗口特征數據:param y_train: 訓練集目標變量:param X_test: 測試集滑動窗口特征數據:param y_test: 測試集目標變量"""def objective(trial):self.config = {"hidden_dim": trial.suggest_int("hidden_dim", 64, 256), # 建議隱藏層維度"lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True), # 建議學習率"batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]), # 建議批量大小"weight_decay": trial.suggest_float("weight_decay", 1e-6, 1e-4), # 建議權重衰減"input_dim": X_train[0].shape[-1], # 輸入特征維度"epochs": 100, # 訓練輪數"window_size": self.window_size, # 滑動窗口大小}self._train(X_train, y_train) # 訓練模型val_loss = self._evaluate(X_test, y_test) # 評估模型print(f"Val Loss: {val_loss:.4f}") # 打印驗證損失return val_loss # 返回驗證損失# 超參優化study = optuna.create_study(direction="minimize") # 創建研究study.optimize(objective, n_trials=1, show_progress_bar=True) # 優化目標函數print(f"Training Best params: {study.best_params}") # 打印最佳參數# 最佳模型參數self.config.update(study.best_params) # 更新配置def _train(self, X_train, y_train):"""模型訓練內部方法。:param X_train: 訓練集滑動窗口特征數據:param y_train: 訓練集目標變量"""print(f"Training Model Config: {self.config}") # 打印模型配置lr = self.config.get("lr", 1e-4) # 獲取學習率epochs = self.config.get("epochs", 100) # 獲取訓練輪數batch_size = self.config.get("batch_size", 128) # 獲取批量大小weight_decay = self.config.get("weight_decay", 1e-6) # 獲取權重衰減hidden_dim = self.config.get("hidden_dim", 128) # 獲取隱藏層維度input_dim = self.config.get("input_dim", X_train[0].shape[-1]) # 獲取輸入特征維度dataset = SingleWindowDataset(X_train, y_train) # 初始化數據集loader = DataLoader(dataset,batch_size=batch_size,shuffle=False,) # 初始化數據加載器# 初始化模型self.model = MultiScaleLSTM(input_dim=input_dim, hidden_dim=hidden_dim).to(self.device) # 初始化模型并移動到指定設備optimizer = optim.AdamW(self.model.parameters(),lr=lr,weight_decay=weight_decay,) # 初始化優化器scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, "min", patience=5) # 初始化學習率調度器criterion = nn.HuberLoss() # 初始化損失函數# 訓練循環for epoch in tqdm(range(epochs), desc="Training"): # 進行訓練self.model.train() # 設置模型為訓練模式total_loss = 0 # 初始化總損失for X_batch, y_batch in loader: # 遍歷數據加載器X_batch = X_batch.to(self.device) # 將特征數據移動到指定設備y_batch = y_batch.to(self.device).unsqueeze(1) # 將標簽數據移動到指定設備并擴展維度optimizer.zero_grad() # 清零梯度preds = self.model(X_batch) # 前向傳播loss = criterion(preds, y_batch) # 計算損失loss.backward() # 反向傳播nn.utils.clip_grad_norm_(self.model.parameters(), 1.0) # 梯度裁剪optimizer.step() # 更新參數total_loss += loss.item() # 累加損失# 學習率調整avg_loss = total_loss / len(loader) # 計算平均損失scheduler.step(avg_loss) # 更新學習率def _evaluate(self, X_test, y_test):"""模型評估內部方法。:param X_test: 測試集滑動窗口特征數據:param y_test: 測試集目標變量:return: 平均損失"""test_dataset = SingleWindowDataset(X_test, y_test) # 初始化測試數據集test_loader = DataLoader(test_dataset,batch_size=128,shuffle=False,) # 初始化測試數據加載器self.model.eval() # 設置模型為評估模式total_loss = 0 # 初始化總損失criterion = nn.HuberLoss() # 初始化損失函數with torch.no_grad(): # 關閉梯度計算for X_batch, y_batch in test_loader: # 遍歷測試數據加載器X_batch = X_batch.to(self.device) # 將特征數據移動到指定設備y_batch = y_batch.to(self.device).unsqueeze(1) # 將標簽數據移動到指定設備并擴展維度preds = self.model(X_batch) # 前向傳播loss = criterion(preds, y_batch) # 計算損失total_loss += loss.item() * len(y_batch) # 累加損失test_loss = total_loss / len(test_dataset) # 計算平均損失print(f"Test Loss: {test_loss}") # 打印測試損失return test_loss # 返回測試損失
3.4 backtesting.py
import numpy as np
import optuna
import pandas as pd
import torch
import vectorbt as vbtclass DualEMACrossoverStrategy:def __init__(self, pred_returns, volatility, params):"""雙EMA交叉策略類。:param pred_returns: 預測的收益率序列:param volatility: 波動率序列(用于倉位計算):param params: 參數字典,包含快慢EMA的時間跨度"""self.pred_returns = pred_returns # 預測的收益率序列self.volatility = volatility.clip(lower=0.01) # 防止波動率為0,導致除零錯誤self.fast_span = params["fast_span"] # 快速EMA的時間跨度self.slow_span = params["slow_span"] # 慢速EMA的時間跨度def generate_signals(self):"""生成交易信號。:return: (交易信號, 倉位大小)"""ema_fast = self.pred_returns.ewm(span=self.fast_span, min_periods=self.fast_span).mean() # 計算快速EMAema_slow = self.pred_returns.ewm(span=self.slow_span, min_periods=self.slow_span).mean() # 計算慢速EMAsignals = pd.Series(0, index=self.pred_returns.index) # 初始化信號序列signals[(ema_fast > ema_slow) & (ema_fast.shift(1) <= ema_slow.shift(1))] = (1 # 買入信號)signals[(ema_fast < ema_slow) & (ema_fast.shift(1) >= ema_slow.shift(1))] = (-1) # 賣出信號# 使用tanh函數壓縮倉位大小,實現非線性映射position_size = np.tanh(self.pred_returns.abs() / self.volatility).clip(0.3, 0.8)return signals, position_size # 返回信號和倉位大小class BacktestStrategy:def __init__(self, model, device):"""回測執行引擎。:param model: 訓練好的預測模型:param device: 計算設備(CPU/GPU)"""self.model = model # 訓練好的預測模型self.device = device # 計算設備self._configure_vbt() # 配置VectorBT全局參數def _configure_vbt(self):"""配置VectorBT全局參數。"""vbt.settings.array_wrapper["freq"] = "D" # 設置時間頻率為日頻vbt.settings.plotting["layout"]["template"] = "vbt_dark" # 使用暗色主題vbt.settings.plotting["layout"]["width"] = 1200 # 設置圖表寬度vbt.settings.portfolio["init_cash"] = 100000.0 # 初始資金10萬元vbt.settings.portfolio["fees"] = 0.0025 # 交易成本(手續費)0.25%vbt.settings.portfolio["slippage"] = 0.0025 # 交易成本(滑點)0.25%def _optimize_parameters(self, result_df):"""優化參數邏輯調整。:param result_df: 包含預測收益率的結果DataFrame:return: 最優參數"""def objective(trial):params = {"fast_span": trial.suggest_int("fast_span", 10, 30), # 建議快速EMA的時間跨度"slow_span": trial.suggest_int("slow_span", 50, 100), # 建議慢速EMA的時間跨度}strategy = DualEMACrossoverStrategy(pred_returns=result_df["pred_returns"],volatility=result_df["volatility"],params=params,) # 創建策略實例signals, position_size = strategy.generate_signals() # 生成交易信號pf = vbt.Portfolio.from_signals(close=result_df["close"],entries=signals.shift(1) == 1, # 買入信號exits=signals.shift(1) == -1, # 賣出信號size=position_size, # 固定倉位freq="D",)return pf.total_profit() # 返回總利潤study = optuna.create_study(direction="maximize") # 創建Optuna研究study.optimize(objective, n_trials=10, show_progress_bar=True) # 優化參數print(f"Strategy Best params: {study.best_params}") # 打印最優參數return study.best_params # 返回最優參數def run(self, test_data, df):"""執行完整回測流程。:param test_data: 測試數據集元組(X_test, y_test):param df: 原始數據DataFrame:return: (組合對象, 結果DataFrame)"""X_test = test_data # 測試數據self.model.eval() # 將模型設置為評估模式with torch.no_grad(): # 禁用梯度計算test_tensor = torch.FloatTensor(X_test).to(self.device) # 轉換為Tensor并移動到指定設備preds = (self.model(test_tensor).cpu().numpy().flatten()) # 獲取預測值并轉換為NumPy數組test_dates = df.index[-len(preds) :] # 獲取測試日期result_df = pd.DataFrame({"close": df["close"].values[-len(preds) :],"pred_returns": preds, # 使用rolling窗口計算動態波動率"volatility": df["ATR"].values[-len(preds) :]/ df["close"].values[-len(preds) :],},index=test_dates,) # 創建結果DataFramebest_params = self._optimize_parameters(result_df) # 運行參數優化strategy = DualEMACrossoverStrategy(pred_returns=result_df["pred_returns"],volatility=result_df["volatility"],params=best_params,) # 創建策略實例signals, position_size = strategy.generate_signals() # 生成交易信號return vbt.Portfolio.from_signals(close=result_df["close"],entries=signals == 1, # 買入信號exits=signals == -1, # 賣出信號size=position_size,size_type="percent",freq="D",) # 執行組合回測
3.5 main.py
import randomimport numpy as np
import torchfrom vectorbt_4.backtesting import BacktestStrategy
from vectorbt_4.data_processing import load_data
from vectorbt_4.training import IncrementalTrainer, TrainingStateManagerdef set_random_seed(seed=42):"""設置全局隨機種子:param seed: 隨機種子, defaults to 42影響范圍:- Python內置隨機模塊- Numpy隨機數生成- PyTorch CPU/CUDA隨機種子"""random.seed(seed)np.random.seed(seed)torch.manual_seed(seed)if torch.cuda.is_available():torch.cuda.manual_seed(seed)torch.cuda.manual_seed_all(seed) # 如果使用多個GPUtorch.backends.cudnn.deterministic = Truetorch.backends.cudnn.benchmark = Falsedef prepare_backtest_data(ts_code, device):"""準備回測數據。:param ts_code: 股票代碼:param device: 計算設備(CPU/GPU):return: 滑動窗口特征數據、原始數據DataFrame、模型、特征工程對象、特征選擇器和配置"""state_manager = TrainingStateManager() # 初始化訓練狀態管理器model, feature_engineer, feature_selector, config = state_manager.load(device) # 加載模型和預處理器print(f"Model Config: {config}") # 打印模型配置# 加載并處理數據test_df = load_data([ts_code]) # 加載數據test_df = test_df[-300:] # 取最近300條數據processed_df = feature_engineer.generate_features(test_df) # 生成特征X_selected = feature_selector.transform(processed_df[feature_engineer.all_features].values) # 選擇特征# 構建滑動窗口window_size = config["window_size"] # 獲取滑動窗口大小sequences = [] # 初始化序列列表for i in range(window_size, len(X_selected)): # 遍歷數據sequences.append(X_selected[i - window_size : i]) # 添加滑動窗口特征return (np.array(sequences, dtype=np.float32), # 返回滑動窗口特征數據test_df, # 返回原始數據DataFramemodel, # 返回模型feature_engineer, # 返回特征工程對象feature_selector, # 返回特征選擇器config, # 返回配置)if __name__ == "__main__":# 設置隨機種子# 函數確保了整個訓練過程的可重復性。# 通過設置相同的隨機種子,可以保證每次運行時生成的隨機數序列一致,這對于調試和實驗驗證非常重要。set_random_seed()# 檢測可用計算設備(優先使用CUDA)device = torch.device("cuda"if torch.cuda.is_available()else "mps" if torch.backends.mps.is_available() else "cpu")# 股票行情# start_date = "20100101"# end_date = "20241231"trainer = IncrementalTrainer(device)# 多股票初始訓練示例# 浦發銀行(600000.SH)# 招商銀行(600036.SH)# 平安銀行(000001.SZ)train_codes = ["600000.SH", "600036.SH", "000001.SZ"]train_df = load_data(train_codes)model = trainer.initial_train(train_df)# 增量訓練示例# 貴州茅臺(600519.SH)# new_df = load_data(["600519.SH"])# model = trainer.incremental_update(new_df)# 單股票回測(test_data, test_df, model, feature_engineer, feature_selector, config) = (prepare_backtest_data("600519.SH", device))backtester = BacktestStrategy(model, device)pf = backtester.run(test_data, test_df)print("回測結果統計:")print(pf.stats())pf.plot().show()
風險提示與免責聲明
本文內容基于公開信息研究整理,不構成任何形式的投資建議。歷史表現不應作為未來收益保證,市場存在不可預見的波動風險。投資者需結合自身財務狀況及風險承受能力獨立決策,并自行承擔交易結果。作者及發布方不對任何依據本文操作導致的損失承擔法律責任。市場有風險,投資須謹慎。