VectorBT:使用PyTorch+Transformer訓練和回測股票模型 進階五
本方案基于PyTorch框架與Transformer模型,結合VectorBT回測引擎構建多股票量化交易系統,采用滑動窗口技術構建時序特征,通過自注意力機制捕捉市場規律預測收益率,集成雙EMA交叉策略動態生成交易信號,利用Optuna優化模型超參與策略參數,支持增量訓練更新特征分布,結合波動率調整非線性倉位,并通過分組標準化與股票分組計算嚴格規避數據泄漏風險,實現端到端的量化策略研發閉環。
文中內容僅限技術學習與代碼實踐參考,市場存在不確定性,技術分析需謹慎驗證,不構成任何投資建議。適合量化新手建立系統認知,為策略開發打下基礎。
本文是進階指南🚀,推薦先閱讀了解基礎知識??
- VectorBT:Python量化交易策略開發與回測評估詳解 🔥
- VectorBT:使用PyTorch+LSTM訓練和回測股票模型 進階一 🔥
- VectorBT:使用PyTorch+LSTM訓練和回測股票模型 進階二 🔥
- VectorBT:使用PyTorch+LSTM訓練和回測股票模型 進階三 🔥
- VectorBT:使用PyTorch+LSTM訓練和回測股票模型 進階四 🔥
1. 方案概述
本方案基于PyTorch框架與Transformer模型,結合VectorBT回測引擎構建多股票量化交易系統,采用滑動窗口技術構建時序特征,通過自注意力機制捕捉市場規律預測收益率,集成雙EMA交叉策略動態生成交易信號,利用Optuna優化模型超參與策略參數,支持增量訓練更新特征分布,結合波動率調整非線性倉位,并通過分組標準化與股票分組計算嚴格規避數據泄漏風險,實現端到端的量化策略研發閉環。
1.1 核心原理
- 多時序特征編碼:通過滑動窗口技術構建三維特征矩陣(樣本×時間步×特征)
- Transformer建模:利用自注意力機制捕捉時序依賴關系
- 動態倉位管理:結合波動率調整倉位大小(使用tanh函數壓縮)
- 增量學習機制:支持在線更新模型參數和特征分布
1.2 關鍵特點
- 多股票聯合訓練:共享特征表示,提升模型泛化能力
- 非線性倉位控制:position_size = tanh(|return|/volatility)
- 參數自動優化:使用Optuna進行雙重優化(模型超參+策略參數)
- 特征魯棒處理:分組標準化(趨勢類用RobustScaler,成交量用StandardScaler)
1.3 注意事項
- 數據泄漏風險:嚴格按股票分組計算收益率和技術指標
- 設備兼容性:支持CUDA/MPS/CPU多設備自動切換
- 內存管理:滑動窗口生成時需控制窗口大小(默認5天)
- 過擬合預防:采用早停機制和學習率動態調整
2. 系統架構
架構說明:
-
應用層:通過
main.py
整合- 統一訓練/回測接口
- 設備自動檢測
- 全流程種子控制
-
數據層:通過
data_processing.py
實現- 多股票數據加載與合并
- 收益率計算與異常值處理
- 嚴格的時間序列管理
-
模型層:定義于
model_definition.py
- Transformer架構實現時序預測
- 包含位置編碼和多頭注意力機制
- 支持動態維度調整的Encoder結構
-
訓練層:通過
training.py
實現- 增量式訓練框架
- 在線特征工程系統
- 自適應特征選擇機制
- Optuna超參優化集成
-
回測層:定義于
backtesting.py
- 雙EMA交叉策略引擎
- 波動率自適應倉位管理
- 策略參數動態優化模塊
2.1 數據層(Data Layer)
對應代碼:data_processing.py
def load_data(ts_codes, data_path="./data", test=False):# 核心數據加載邏輯combined_df["returns"] = combined_df.groupby("ts_code")["close"].pct_change().shift(-1)
- 數據源:本地存儲的Parquet文件(含復權處理)
- 關鍵處理:
- 跨股票數據合并與日期對齊
- 嚴格避免未來信息:按股票分組計算次日收益率
- 收益率截斷(±10%邊界)
- 輸出格式:帶時間戳的DataFrame,包含
open/high/low/close/vol
等原始字段
2.2 特征工程(Feature Engineering)
對應代碼:training.py
class OnlineFeatureEngineer:def generate_features(self, df):# 生成8大類技術指標self.feature_groups = {"Trend": ["MA20","EMA12","MACD"...],"Momentum": ["RSI","KDJ_K"...],...}
-
動態特征生成:
- 趨勢類(MA20, MACD, …)
- 動量類(RSI, KDJ, …)
- 波動率(布林帶, ATR, …)
- 成交量(OBV, MFI, …)
-
標準化策略:
self.scalers = {"Trend": RobustScaler(),"Momentum": MinMaxScaler(),"Volatility": RobustScaler(),... }
-
特征選擇:
selector = RandomForestRegressor(n_estimators=50) feature_selector = SelectFromModel(selector)
2.3 模型系統(Model System)
對應代碼:model_definition.py
class TransformerModel(nn.Module):def __init__(self, input_size, d_model=64, num_heads=4...):# Encoder-Only結構self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, dff) for _ in range(num_layers)])
-
核心架構:
- 可配置的Encoder層數(2-4層)
- 多頭注意力(4-8頭)
- 位置編碼使用正弦/余弦混合編碼
-
訓練機制:
criterion = nn.HuberLoss() optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) scheduler = ReduceLROnPlateau(optimizer, 'min', patience=5)
2.4 交易策略(Trading Strategy)
對應代碼:backtesting.py
class DualEMACrossoverStrategy:def generate_signals(self):ema_fast = pred_returns.ewm(span=fast_span).mean()ema_slow = pred_returns.ewm(span=slow_span).mean()position_size = np.tanh(pred_returns.abs()/volatility)
- 動態參數:
- 快線EMA周期:10-30日
- 慢線EMA周期:50-100日
- 倉位控制:
- 基于波動率的tanh函數非線性映射
- 最大倉位限制在30%-80%之間
2.5 回測引擎(Backtesting)
對應代碼:backtesting.py
class BacktestStrategy:def _configure_vbt(self):vbt.settings.portfolio["fees"] = 0.0025vbt.settings.portfolio["slippage"] = 0.0025
-
成本模型:
- 雙向0.25%傭金
- 0.25%滑點成本
-
執行機制:
Portfolio.from_signals(entries=signals.shift(1) == 1, # T+1信號執行size=position_size,size_type="percent" )
3. 系統流程
3.1 訓練流程序列圖
3.2 回測流程序列圖
3.3 流程關鍵點說明
-
訓練流程:
- 采用兩階段優化:特征選擇 → 超參優化
- 使用Optuna進行貝葉斯優化
- 支持斷點續訓的模型保存機制
-
回測流程:
- 策略參數動態搜索空間
- 基于波動率的非線性倉位控制
- 交易成本的雙向收取模型
- 結果可視化自動適配暗色主題
-
跨模塊協作:
- 特征工程與模型輸入的維度一致性保證
- 訓練/推理的設備自動適配
- 時間序列的嚴格對齊機制
4. 總結與優化建議
4.1 方案優勢
- 架構靈活性:模塊化設計支持策略快速迭代
- 計算效率:GPU加速訓練+VectorBT高效回測
- 風險控制:動態波動率調整+倉位限制(0.3-0.8)
4.2 優化方向
維度 | 當前實現 | 優化建議 |
---|---|---|
特征工程 | 8類技術指標 | 增加市場情緒數據(新聞輿情、資金流向) |
模型結構 | Encoder-only | 增加Decoder實現Seq2Seq預測 |
倉位策略 | 固定比例 | 引入強化學習動態調整 |
數據增強 | 原始序列 | 添加隨機時頻變換增強 |
風險控制 | 波動率約束 | 加入VaR壓力測試模塊 |
4.3 實踐建議
- 增量更新頻率:建議每月更新模型,每周更新特征分布
- 參數搜索空間:可擴展EMA參數范圍(快線5-50,慢線30-200)
- 硬件配置:推薦使用至少16GB顯存的GPU設備
- 回測驗證:建議采用Walk-Forward分析法,避免過擬合
5. 工程代碼
目錄結構:
data/
├── processed_600000.SH.parquet
├── processed_600036.SH.parquet
├── processed_600519.SH.parquet
├── processed_000001.SZ.parquet
models/
├── vectorbt_5_model.pth
├── vectorbt_5_preprocessors.pkl
src/
└── vectorbt_5/├── data_processing.py├── model_definition.py├── training.py├── backtesting.py├── main.py└── __init__.py
5.1 data_processing.py
import pandas as pddef load_data(ts_codes, data_path="./data"):"""加載預處理后的股票數據:param ts_code: 股票代碼(如["600000.SH", "600519.SH", "000001.SZ"]):param data_path: 數據存儲路徑):return: 合并后的DataFrame(含ts_code列標識股票)處理步驟:1. 讀取parquet格式的本地數據2. 轉換交易日期格式3. 計算次日收益率(目標變量)4. 刪除缺失值"""dfs = []for ts_code in ts_codes:file_path = f"{data_path}/processed_{ts_code}.parquet"df = pd.read_parquet(file_path)df["ts_code"] = ts_codedfs.append(df)combined_df = pd.concat(dfs)combined_df["trade_date"] = pd.to_datetime(combined_df["trade_date"], format="%Y%m%d")combined_df.set_index("trade_date", inplace=True)combined_df.sort_index(inplace=True)# 按股票分組計算收益率,避免跨股票計算,嚴格避免未來信息combined_df["returns"] = combined_df.groupby("ts_code", group_keys=False)["close"].apply(lambda x: x.pct_change().shift(-1).clip(-0.1, 0.1) # 添加收益率截斷)combined_df.dropna(inplace=True)return combined_df
5.2 model_definition.py
import mathimport torch
import torch.nn as nnclass MultiHeadAttention(nn.Module):"""多頭注意力機制。:param d_model: 輸入和輸出的維度:param num_heads: 注意力頭的數量"""def __init__(self, d_model, num_heads):super().__init__()self.num_heads = num_heads # 注意力頭的數量self.d_model = d_model # 輸入和輸出的維度self.depth = d_model // num_heads # 每個頭的維度self.wq = nn.Linear(d_model, d_model) # 查詢線性變換self.wk = nn.Linear(d_model, d_model) # 鍵線性變換self.wv = nn.Linear(d_model, d_model) # 值線性變換self.dense = nn.Linear(d_model, d_model) # 輸出線性變換def split_heads(self, x, batch_size):"""將輸入張量分割成多個頭。:param x: 輸入張量 (batch_size, seq_len, d_model):param batch_size: 批次大小:return: 分割后的張量 (batch_size, num_heads, seq_len, depth)"""x = x.view(batch_size, -1, self.num_heads, self.depth) # 將d_model維度拆分成num_heads和depthreturn x.permute(0, 2, 1, 3) # 調整維度順序為 (batch_size, num_heads, seq_len, depth)def forward(self, q, k, v):"""前向傳播函數。:param q: 查詢張量 (batch_size, seq_len, d_model):param k: 鍵張量 (batch_size, seq_len, d_model):param v: 值張量 (batch_size, seq_len, d_model):return: 輸出張量 (batch_size, seq_len, d_model) 和注意力權重 (batch_size, num_heads, seq_len, seq_len)"""batch_size = q.size(0) # 獲取批次大小q = self.wq(q) # 對查詢進行線性變換k = self.wk(k) # 對鍵進行線性變換v = self.wv(v) # 對值進行線性變換q = self.split_heads(q, batch_size) # 將查詢張量分割成多個頭k = self.split_heads(k, batch_size) # 將鍵張量分割成多個頭v = self.split_heads(v, batch_size) # 將值張量分割成多個頭# 計算注意力scaled_attention, attention_weights = self.scaled_dot_product_attention(q, k, v)scaled_attention = scaled_attention.permute(0, 2, 1, 3) # 調整維度順序為 (batch_size, seq_len, num_heads, depth)concat_attention = scaled_attention.reshape(batch_size, -1, self.d_model) # 合并頭,恢復原始維度output = self.dense(concat_attention) # 進行最終的線性變換return output, attention_weightsdef scaled_dot_product_attention(self, q, k, v):"""計算縮放點積注意力。:param q: 查詢張量 (batch_size, num_heads, seq_len, depth):param k: 鍵張量 (batch_size, num_heads, seq_len, depth):param v: 值張量 (batch_size, num_heads, seq_len, depth):return: 注意力輸出 (batch_size, num_heads, seq_len, depth) 和注意力權重 (batch_size, num_heads, seq_len, seq_len)"""matmul_qk = torch.matmul(q, k.transpose(-1, -2)) # 計算Q和K的點積dk = torch.tensor(k.size(-1), dtype=torch.float32) # 獲取深度dkscaled_attention_logits = matmul_qk / torch.sqrt(dk) # 縮放點積attention_weights = torch.softmax(scaled_attention_logits, dim=-1) # 計算注意力權重output = torch.matmul(attention_weights, v) # 應用注意力權重到值上return output, attention_weightsclass EncoderLayer(nn.Module):"""編碼器層。:param d_model: 輸入和輸出的維度:param num_heads: 注意力頭的數量:param dff: 前饋神經網絡的中間維度:param dropout: dropout 概率,默認為 0.1"""def __init__(self, d_model, num_heads, dff, dropout=0.1):super().__init__()self.mha = MultiHeadAttention(d_model, num_heads) # 多頭注意力機制self.ffn = nn.Sequential(nn.Linear(d_model, dff), # 線性變換到dff維度nn.GELU(), # GELU激活函數nn.Linear(dff, d_model), # 線性變換回d_model維度)self.layer_norm1 = nn.LayerNorm(d_model) # 第一個層歸一化self.layer_norm2 = nn.LayerNorm(d_model) # 第二個層歸一化self.dropout = nn.Dropout(dropout) # Dropout層def forward(self, x):"""前向傳播函數。:param x: 輸入張量 (batch_size, seq_len, d_model):return: 輸出張量 (batch_size, seq_len, d_model)"""# 多頭注意力attn_output, _ = self.mha(x, x, x) # 計算多頭注意力attn_output = self.dropout(attn_output) # 應用dropoutout1 = self.layer_norm1(x + attn_output) # 殘差連接和層歸一化# 前饋神經網絡ffn_output = self.ffn(out1) # 前饋神經網絡ffn_output = self.dropout(ffn_output) # 應用dropoutout2 = self.layer_norm2(out1 + ffn_output) # 殘差連接和層歸一化return out2class DecoderLayer(nn.Module):"""解碼器層。:param d_model: 輸入和輸出的維度:param num_heads: 注意力頭的數量:param dff: 前饋神經網絡的中間維度:param dropout: dropout 概率,默認為 0.1"""def __init__(self, d_model, num_heads, dff, dropout=0.1):super().__init__()self.mha1 = MultiHeadAttention(d_model, num_heads) # 掩碼多頭注意力self.mha2 = MultiHeadAttention(d_model, num_heads) # 編碼器-解碼器注意力self.ffn = nn.Sequential(nn.Linear(d_model, dff), # 線性變換到dff維度nn.GELU(), # GELU激活函數nn.Linear(dff, d_model), # 線性變換回d_model維度)self.layer_norm1 = nn.LayerNorm(d_model) # 第一個層歸一化self.layer_norm2 = nn.LayerNorm(d_model) # 第二個層歸一化self.layer_norm3 = nn.LayerNorm(d_model) # 第三個層歸一化self.dropout = nn.Dropout(dropout) # Dropout層def forward(self, x, enc_output):"""前向傳播函數。:param x: 輸入張量 (batch_size, seq_len, d_model):param enc_output: 編碼器輸出 (batch_size, src_seq_len, d_model):return: 輸出張量 (batch_size, seq_len, d_model) 和兩個注意力權重"""# 掩碼多頭注意力attn1, attn_weights1 = self.mha1(x, x, x) # 計算掩碼多頭注意力attn1 = self.dropout(attn1) # 應用dropoutout1 = self.layer_norm1(x + attn1) # 殘差連接和層歸一化# 編碼器-解碼器注意力attn2, attn_weights2 = self.mha2(out1, enc_output, enc_output) # 計算編碼器-解碼器注意力attn2 = self.dropout(attn2) # 應用dropoutout2 = self.layer_norm2(out1 + attn2) # 殘差連接和層歸一化# 前饋神經網絡ffn_output = self.ffn(out2) # 前饋神經網絡ffn_output = self.dropout(ffn_output) # 應用dropoutout3 = self.layer_norm3(out2 + ffn_output) # 殘差連接和層歸一化return out3, attn_weights1, attn_weights2class PositionalEncoding(nn.Module):"""位置編碼。:param d_model: 輸入和輸出的維度:param max_len: 最大序列長度,默認為5000:param dropout: dropout 概率,默認為 0.1"""def __init__(self, d_model, max_len=5000, dropout=0.1):super().__init__()self.dropout = nn.Dropout(dropout) # Dropout層pe = torch.zeros(max_len, d_model) # 初始化位置編碼position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # 位置索引div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # 用于計算正弦和余弦的位置項pe[:, 0::2] = torch.sin(position * div_term) # 正弦位置編碼pe[:, 1::2] = torch.cos(position * div_term) # 余弦位置編碼pe = pe.unsqueeze(0) # 增加批次維度self.register_buffer("pe", pe) # 注冊位置編碼為緩沖區def forward(self, x):"""前向傳播函數。:param x: 輸入張量 (batch_size, seq_len, d_model):return: 添加了位置編碼的張量 (batch_size, seq_len, d_model)"""x = x + self.pe[:, : x.size(1)] # 添加位置編碼return self.dropout(x) # 應用dropoutclass TransformerModel(nn.Module):"""Transformer模型。:param input_size: 輸入特征的維度:param d_model: 輸入和輸出的維度:param num_heads: 注意力頭的數量:param num_layers: 編碼器層數:param dff: 前饋神經網絡的中間維度:param dropout: dropout 概率,默認為 0.1"""def __init__(self, input_size, d_model, num_heads, num_layers, dff, dropout=0.1):super().__init__()self.embedding = nn.Linear(input_size, d_model) # 輸入嵌入self.pos_encoding = PositionalEncoding(d_model, dropout=dropout) # 位置編碼self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, dff, dropout)for _ in range(num_layers)] # 編碼器層)self.fc = nn.Linear(d_model, 1) # 全連接層def forward(self, x):"""前向傳播函數。:param x: 輸入張量 (batch_size, seq_len, input_size):return: 輸出張量 (batch_size, 1)"""x = self.embedding(x) # 輸入嵌入x = self.pos_encoding(x) # 位置編碼for enc_layer in self.encoder_layers:x = enc_layer(x) # 通過每個編碼器層x = self.fc(x.mean(dim=1)) # 使用全局平均池化并進行最終線性變換return x
5.3 training.py
import osimport joblib
import numpy as np
import optuna
import pandas as pd
import torch
import torch.nn as nn
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdmfrom vectorbt_5.model_definition import TransformerModelclass SingleWindowDataset(Dataset):def __init__(self, X, y):"""單窗口數據集類,用于將時間序列數據轉換為PyTorch數據集格式。:param X: 滑動窗口特征數據:param y: 目標變量"""self.X = X.astype(np.float32) # 將滑動窗口特征數據轉換為float32類型self.y = y.astype(np.float32) # 將目標變量轉換為float32類型def __len__(self):return len(self.y) # 返回目標變量的長度def __getitem__(self, idx):x = torch.from_numpy(self.X[idx]).float() # 將滑動窗口特征數據轉換為PyTorch張量label = torch.tensor(self.y[idx], dtype=torch.float32) # 將目標變量轉換為PyTorch張量return x, label # 返回特征和標簽class TrainingStateManager:def __init__(self, model_dir="./models"):"""訓練狀態管理器類,負責模型和預處理器的保存和加載。:param model_dir: 模型保存目錄,defaults to "./models""""self.model_dir = model_dir # 設置模型保存目錄self.model_path = f"{self.model_dir}/vectorbt_5_model.pth" # 設置模型文件路徑self.preprocessors_path = (f"{self.model_dir}/vectorbt_5_preprocessors.pkl" # 設置預處理器文件路徑)os.makedirs(model_dir, exist_ok=True) # 創建模型保存目錄(如果不存在)def save(self, model, feature_engineer, feature_selector, config):"""保存模型和預處理器。:param model: 訓練好的模型:param feature_engineer: 特征工程對象:param feature_selector: 特征選擇器:param config: 模型配置"""torch.save(model.state_dict(), self.model_path) # 保存模型參數joblib.dump({"feature_engineer": feature_engineer,"feature_selector": feature_selector,"config": config,},self.preprocessors_path,) # 保存預處理器和配置print(f"Model saved to {self.model_path}") # 打印模型保存路徑print(f"Preprocessor saved to {self.preprocessors_path}") # 打印預處理器保存路徑def load(self, device):"""加載模型和預處理器。:param device: 計算設備(CPU/GPU):return: 加載的模型、特征工程對象、特征選擇器和模型配置"""model = None # 初始化模型feature_engineer = None # 初始化特征工程對象feature_selector = None # 初始化特征選擇器config = None # 初始化配置if os.path.exists(self.preprocessors_path): # 如果預處理器文件存在preprocess = joblib.load(self.preprocessors_path) # 加載預處理器feature_engineer = preprocess["feature_engineer"] # 獲取特征工程對象feature_selector = preprocess["feature_selector"] # 獲取特征選擇器config = preprocess["config"] # 獲取配置if os.path.exists(self.model_path): # 如果模型文件存在model = TransformerModel(input_size=config["input_size"],d_model=config["d_model"],num_heads=config["num_heads"],num_layers=config["num_layers"],dff=config["dff"],dropout=config["dropout"],).to(device)model.load_state_dict(torch.load(self.model_path, weights_only=False, map_location=device)) # 加載模型參數return (model,feature_engineer,feature_selector,config,) # 返回加載的模型、特征工程對象、特征選擇器和配置class OnlineFeatureEngineer:def __init__(self, windows=[5]):"""在線特征工程生成器類,用于生成技術指標特征并進行標準化。:param windows: 滑動窗口列表(用于特征生成),defaults to [5]"""self.windows = windows # 設置滑動窗口列表self.n_features = 10 # 設置特征數量self.scalers = {"Trend": RobustScaler(), # 趨勢類指標使用RobustScaler"Momentum": MinMaxScaler(), # 動量類指標使用MinMaxScaler"Volatility": RobustScaler(), # 波動率類指標使用RobustScaler"Volume": StandardScaler(), # 成交量類指標使用StandardScaler# "Sentiment": StandardScaler(), # 市場情緒類指標使用StandardScaler"SupportResistance": MinMaxScaler(), # 支撐阻力類指標使用MinMaxScaler"Statistical": StandardScaler(), # 統計類指標使用StandardScaler"Composite": RobustScaler(), # 復合型指標使用RobustScaler}# "price": ["open", "high", "low", "close"],self.feature_groups = {# Trend-Following Indicators 趨勢類指標"Trend": ["MA20", "EMA12", "MACD", "ADX", "SAR"],# Momentum Indicators 動量類指標"Momentum": ["RSI", "KDJ_K", "KDJ_D", "KDJ_J", "CCI", "WILLR"],# Volatility Indicators 波動率類指標"Volatility": ["BB_upper", "BB_middle", "BB_lower", "ATR", "STD20"],# Volume Indicators 成交量類指標"Volume": ["OBV", "MFI"],# Market Sentiment Indicators 市場情緒類指標 (需要外部數據)# "Sentiment": [],# Support/Resistance Indicators 支撐阻力類指標"SupportResistance": ["Fib_0.382", "Fib_0.618", "Pivot"],# Statistical Indicators 統計類指標"Statistical": ["LR_slope", "LR_angle"],# Composite Indicators 復合型指標"Composite": ["Ichimoku_tenkan","Ichimoku_kijun","Ichimoku_senkou_a","Ichimoku_senkou_b","Ichimoku_chikou",],}self.all_features = [f for sublist in self.feature_groups.values() for f in sublist] # 生成所有特征列表def partial_fit(self, new_df):"""對新數據進行部分擬合。:param new_df: 新數據DataFrame"""new_features = self.generate_features(new_df, refit=True) # 生成新數據的特征for group, features in self.feature_groups.items(): # 遍歷每個特征組if hasattr(self.scalers[group], "partial_fit"): # 如果該縮放器支持部分擬合self.scalers[group].partial_fit(new_features[features]) # 對新數據進行部分擬合def generate_features(self, df, refit=False):"""生成技術指標特征。:param df: 原始數據DataFrame:param refit: 是否重新擬合標準化器,defaults to False:return: 特征DataFrame生成8大類技術指標:1. 趨勢類指標(MA, MACD等)2. 動量類指標(RSI, KDJ等)3. 波動率指標(布林帶, ATR等)4. 成交量指標(OBV, MFI等)5. 市場情緒類指標 (需要外部數據) -- 忽略6. 支撐阻力指標(斐波那契回撤等)7. 統計指標(線性回歸斜率等)8. 復合指標(Ichimoku云圖等)"""processed = [] # 初始化處理后的特征列表for group, features in self.feature_groups.items(): # 遍歷每個特征組scaler = self.scalers[group] # 獲取對應的縮放器if not refit: # 如果不重新擬合scaler.fit(df[features]) # 擬合縮放器scaled = scaler.transform(df[features]) # 標準化特征processed.append(scaled) # 添加到處理后的特征列表processed_df = pd.DataFrame(np.hstack(processed), index=df.index, columns=self.all_features) # 將處理后的特征合并為DataFrameprocessed_df["ts_code"] = df["ts_code"] # 添加股票代碼processed_df["returns"] = df["returns"] # 添加收益return processed_df.dropna() # 刪除缺失值def feature_selection(self, X, y):"""進行特征選擇。:param X: 特征數據:param y: 目標變量:return: 選擇后的特征數據"""selector = RandomForestRegressor(n_estimators=50, n_jobs=-1) # 初始化隨機森林回歸器selector.fit(X, y) # 擬合隨機森林回歸器feature_selector = SelectFromModel(selector, prefit=True) # 初始化特征選擇器return feature_selector # 返回特征選擇器class IncrementalDataHandler:def __init__(self, feature_engineer, feature_selector, window_size=5):"""增量數據處理器類,用于處理增量數據并生成滑動窗口數據。:param feature_engineer: 特征工程對象:param feature_selector: 特征選擇器:param window_size: 滑動窗口大小,defaults to 5"""self.feature_engineer = feature_engineer # 設置特征工程對象self.feature_selector = feature_selector # 設置特征選擇器self.window_size = window_size # 設置滑動窗口大小self.buffer = pd.DataFrame() # 初始化數據緩沖區def update_buffer(self, new_df):"""更新數據緩沖區。:param new_df: 新數據DataFrame"""self.buffer = pd.concat([self.buffer, new_df]).sort_index() # 更新數據緩沖區并排序def prepare_incremental_data(self):"""準備增量數據。:return: 訓練數據和測試數據"""processed_df = self.feature_engineer.generate_features(self.buffer) # 生成特征X_selected = self.feature_selector.transform(processed_df[self.feature_engineer.all_features].values) # 選擇特征X, y = self.sliding_window(processed_df, X_selected) # 生成滑動窗口數據X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=False) # 劃分訓練集和測試集return (X_train, y_train), (X_test, y_test) # 返回訓練集和測試集def sliding_window(self, df, features):"""生成時序滑動窗口數據"""sequences = [] # 初始化序列列表labels = [] # 初始化標簽列表for ts_code, group in df.groupby("ts_code"): # 按股票代碼分組group_features = features[df["ts_code"] == ts_code] # 獲取該股票的特征for i in range(self.window_size, len(group)): # 生成滑動窗口sequences.append(group_features[i - self.window_size : i]) # 添加滑動窗口特征labels.append(group["returns"].iloc[i]) # 添加標簽return np.array(sequences, dtype=np.float32), np.array(labels, dtype=np.float32) # 返回滑動窗口特征和標簽class IncrementalTrainer:def __init__(self, device, window_size=5):"""增量訓練控制器類,負責模型的初始訓練和增量更新。:param device: 計算設備(CPU/GPU)"""self.device = device # 設置計算設備self.window_size = window_size # 設置滑動窗口大小self.state_manager = TrainingStateManager() # 初始化訓練狀態管理器self.model = None # 初始化模型self.feature_engineer = None # 初始化特征工程對象self.feature_selector = None # 初始化特征選擇器self.config = None # 初始化配置self.load_state() # 加載狀態def load_state(self):"""加載模型和預處理器。"""(self.model,self.feature_engineer,self.feature_selector,self.config,) = self.state_manager.load(self.device) # 加載模型、特征工程對象、特征選擇器和配置def initial_train(self, df):"""初始訓練模型。:param df: 原始數據DataFrame"""# 特征生成self.feature_engineer = OnlineFeatureEngineer(windows=[self.window_size]) # 初始化特征工程對象processed_df = self.feature_engineer.generate_features(df) # 生成特征# 特征選擇X, y = processed_df[self.feature_engineer.all_features], df["returns"].reindex(processed_df.index) # 準備特征和標簽self.feature_selector = self.feature_engineer.feature_selection(X, y) # 選擇特征# 準備訓練數據data_handler = IncrementalDataHandler(feature_engineer=self.feature_engineer,feature_selector=self.feature_selector,window_size=self.window_size,) # 初始化增量數據處理器data_handler.buffer = df # 更新數據緩沖區(X_train, y_train), (X_test, y_test) = (data_handler.prepare_incremental_data()) # 準備訓練數據# Optuna超參優化self._optimize_parameters(X_train, y_train, X_test, y_test) # 優化超參數# 最佳模型訓練print(f"Initial Model Config: {self.config}") # 打印初始模型配置self._train(X_train, y_train) # 訓練模型print("Initial Model Evaluation:") # 打印初始模型評估self._evaluate(X_test, y_test) # 評估模型print("Initial Model Save:") # 打印初始模型保存self.state_manager.save(self.model, self.feature_engineer, self.feature_selector, self.config) # 保存模型def incremental_update(self, new_df):"""增量更新模型。:param new_df: 新數據DataFrame"""if not self.model:print("The model does not exist, please train it first.") # 如果模型不存在,提示先訓練模型returnself.feature_engineer.partial_fit(new_df) # 對新數據進行部分擬合data_handler = IncrementalDataHandler(self.feature_engineer, self.feature_selector) # 初始化增量數據處理器data_handler.update_buffer(new_df) # 更新數據緩沖區(X_train, y_train), (X_test, y_test) = (data_handler.prepare_incremental_data()) # 準備增量數據print(f"Incremental Model Config: {self.config}") # 打印增量模型配置self._train(X_train, y_train) # 訓練模型print("\nIncremental Model Evaluation:") # 打印增量模型評估self._evaluate(X_test, y_test) # 評估模型print("\nIncremental Model Save:") # 打印增量模型保存self.state_manager.save(self.model, self.feature_engineer, self.feature_selector, self.config) # 保存模型def _optimize_parameters(self, X_train, y_train, X_test, y_test):"""Optuna超參優化:param X_train: 訓練集滑動窗口特征數據:param y_train: 訓練集目標變量:param X_test: 測試集滑動窗口特征數據:param y_test: 測試集目標變量"""def objective(trial):self.config = {"num_heads": trial.suggest_categorical("num_heads", [4, 8]),"d_model": trial.suggest_int("d_model", 64, 256, step=32), # 以32為步長"num_layers": trial.suggest_int("num_layers", 2, 4),"dff": trial.suggest_int("dff", 128, 512, step=64),"dropout": trial.suggest_float("dropout", 0.1, 0.3),"lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True),"input_size": X_train[0].shape[-1], # 輸入特征維度"batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),"epochs": 100, # 訓練輪數"window_size": self.window_size, # 滑動窗口大小}self._train(X_train, y_train) # 訓練模型val_loss = self._evaluate(X_test, y_test) # 評估模型print(f"Val Loss: {val_loss:.4f}") # 打印驗證損失return val_loss # 返回驗證損失# 超參優化study = optuna.create_study(direction="minimize") # 創建研究study.optimize(objective, n_trials=10, show_progress_bar=True) # 優化目標函數print(f"Training Best params: {study.best_params}") # 打印最佳參數# 最佳模型參數self.config.update(study.best_params) # 更新配置def _train(self, X_train, y_train):"""模型訓練內部方法。:param X_train: 訓練集滑動窗口特征數據:param y_train: 訓練集目標變量"""print(f"Training Model Config: {self.config}") # 打印模型配置num_heads = self.config.get("num_heads", 8)d_model = self.config.get("d_model", 64)d_model = (d_model // num_heads) * num_headsnum_layers = self.config.get("num_layers", 3)dff = self.config.get("dff", 256)dropout = self.config.get("dropout", 0.1)lr = self.config.get("lr", 1e-4)input_size = self.config.get("input_dim", X_train[0].shape[-1]) # 獲取輸入特征維度epochs = self.config.get("epochs", 100) # 獲取訓練輪數batch_size = self.config.get("batch_size", 128)# 強制維度約束if d_model % num_heads != 0:d_model = (d_model // num_heads) * num_heads # 自動調整為最近的可整除數dataset = SingleWindowDataset(X_train, y_train) # 初始化數據集loader = DataLoader(dataset,batch_size=batch_size,shuffle=False,) # 初始化數據加載器# 初始化模型self.model = TransformerModel(input_size, d_model, num_heads, num_layers, dff, dropout).to(self.device)optimizer = torch.optim.Adam(self.model.parameters(), lr=lr) # 初始化優化器scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, "min", patience=5) # 初始化學習率調度器criterion = nn.HuberLoss() # 初始化損失函數# 訓練循環for epoch in tqdm(range(epochs), desc="Training"): # 進行訓練self.model.train() # 設置模型為訓練模式total_loss = 0 # 初始化總損失for X_batch, y_batch in loader: # 遍歷數據加載器X_batch = X_batch.to(self.device) # 將特征數據移動到指定設備y_batch = y_batch.to(self.device).unsqueeze(1) # 將標簽數據移動到指定設備并擴展維度optimizer.zero_grad() # 清零梯度preds = self.model(X_batch) # 前向傳播loss = criterion(preds, y_batch) # 計算損失loss.backward() # 反向傳播nn.utils.clip_grad_norm_(self.model.parameters(), 1.0) # 梯度裁剪optimizer.step() # 更新參數total_loss += loss.item() # 累加損失# 學習率調整avg_loss = total_loss / len(loader) # 計算平均損失scheduler.step(avg_loss) # 更新學習率def _evaluate(self, X_test, y_test):"""模型評估內部方法。:param X_test: 測試集滑動窗口特征數據:param y_test: 測試集目標變量:return: 平均損失"""test_dataset = SingleWindowDataset(X_test, y_test) # 初始化測試數據集test_loader = DataLoader(test_dataset,batch_size=128,shuffle=False,) # 初始化測試數據加載器self.model.eval() # 設置模型為評估模式total_loss = 0 # 初始化總損失criterion = nn.HuberLoss() # 初始化損失函數with torch.no_grad(): # 關閉梯度計算for X_batch, y_batch in test_loader: # 遍歷測試數據加載器X_batch = X_batch.to(self.device) # 將特征數據移動到指定設備y_batch = y_batch.to(self.device).unsqueeze(1) # 將標簽數據移動到指定設備并擴展維度preds = self.model(X_batch) # 前向傳播loss = criterion(preds, y_batch) # 計算損失total_loss += loss.item() * len(y_batch) # 累加損失test_loss = total_loss / len(test_dataset) # 計算平均損失print(f"Test Loss: {test_loss}") # 打印測試損失return test_loss # 返回測試損失
5.4 backtesting.py
import numpy as np
import optuna
import pandas as pd
import torch
import vectorbt as vbtclass DualEMACrossoverStrategy:def __init__(self, pred_returns, volatility, params):"""雙EMA交叉策略類。:param pred_returns: 預測的收益率序列:param volatility: 波動率序列(用于倉位計算):param params: 參數字典,包含快慢EMA的時間跨度"""self.pred_returns = pred_returns # 預測的收益率序列self.volatility = volatility.clip(lower=0.01) # 防止波動率為0,導致除零錯誤self.fast_span = params["fast_span"] # 快速EMA的時間跨度self.slow_span = params["slow_span"] # 慢速EMA的時間跨度def generate_signals(self):"""生成交易信號。:return: (交易信號, 倉位大小)"""ema_fast = self.pred_returns.ewm(span=self.fast_span, min_periods=self.fast_span).mean() # 計算快速EMAema_slow = self.pred_returns.ewm(span=self.slow_span, min_periods=self.slow_span).mean() # 計算慢速EMAsignals = pd.Series(0, index=self.pred_returns.index) # 初始化信號序列signals[(ema_fast > ema_slow) & (ema_fast.shift(1) <= ema_slow.shift(1))] = (1 # 買入信號)signals[(ema_fast < ema_slow) & (ema_fast.shift(1) >= ema_slow.shift(1))] = (-1) # 賣出信號# 使用tanh函數壓縮倉位大小,實現非線性映射position_size = np.tanh(self.pred_returns.abs() / self.volatility).clip(0.3, 0.8)return signals, position_size # 返回信號和倉位大小class BacktestStrategy:def __init__(self, model, device):"""回測執行引擎。:param model: 訓練好的預測模型:param device: 計算設備(CPU/GPU)"""self.model = model # 訓練好的預測模型self.device = device # 計算設備self._configure_vbt() # 配置VectorBT全局參數def _configure_vbt(self):"""配置VectorBT全局參數。"""vbt.settings.array_wrapper["freq"] = "D" # 設置時間頻率為日頻vbt.settings.plotting["layout"]["template"] = "vbt_dark" # 使用暗色主題vbt.settings.plotting["layout"]["width"] = 1200 # 設置圖表寬度vbt.settings.portfolio["init_cash"] = 100000.0 # 初始資金10萬元vbt.settings.portfolio["fees"] = 0.0025 # 交易成本(手續費)0.25%vbt.settings.portfolio["slippage"] = 0.0025 # 交易成本(滑點)0.25%def _optimize_parameters(self, result_df):"""優化參數邏輯調整。:param result_df: 包含預測收益率的結果DataFrame:return: 最優參數"""def objective(trial):params = {"fast_span": trial.suggest_int("fast_span", 10, 30), # 建議快速EMA的時間跨度"slow_span": trial.suggest_int("slow_span", 50, 100), # 建議慢速EMA的時間跨度}strategy = DualEMACrossoverStrategy(pred_returns=result_df["pred_returns"],volatility=result_df["volatility"],params=params,) # 創建策略實例signals, position_size = strategy.generate_signals() # 生成交易信號pf = vbt.Portfolio.from_signals(close=result_df["close"],entries=signals.shift(1) == 1, # 買入信號exits=signals.shift(1) == -1, # 賣出信號size=position_size, # 固定倉位freq="D",)return pf.total_profit() # 返回總利潤study = optuna.create_study(direction="maximize") # 創建Optuna研究study.optimize(objective, n_trials=10, show_progress_bar=True) # 優化參數print(f"Strategy Best params: {study.best_params}") # 打印最優參數return study.best_params # 返回最優參數def run(self, test_data, df):"""執行完整回測流程。:param test_data: 測試數據集元組(X_test, y_test):param df: 原始數據DataFrame:return: (組合對象, 結果DataFrame)"""X_test = test_data # 測試數據self.model.eval() # 將模型設置為評估模式with torch.no_grad(): # 禁用梯度計算test_tensor = torch.FloatTensor(X_test).to(self.device) # 轉換為Tensor并移動到指定設備preds = (self.model(test_tensor).cpu().numpy().flatten()) # 獲取預測值并轉換為NumPy數組test_dates = df.index[-len(preds) :] # 獲取測試日期result_df = pd.DataFrame({"close": df["close"].values[-len(preds) :],"pred_returns": preds, # 使用rolling窗口計算動態波動率"volatility": df["ATR"].values[-len(preds) :]/ df["close"].values[-len(preds) :],},index=test_dates,) # 創建結果DataFramebest_params = self._optimize_parameters(result_df) # 運行參數優化strategy = DualEMACrossoverStrategy(pred_returns=result_df["pred_returns"],volatility=result_df["volatility"],params=best_params,) # 創建策略實例signals, position_size = strategy.generate_signals() # 生成交易信號return vbt.Portfolio.from_signals(close=result_df["close"],entries=signals == 1, # 買入信號exits=signals == -1, # 賣出信號size=position_size,size_type="percent",freq="D",) # 執行組合回測
5.5 main.py
import randomimport numpy as np
import torchfrom vectorbt_5.backtesting import BacktestStrategy
from vectorbt_5.data_processing import load_data
from vectorbt_5.training import IncrementalTrainer, TrainingStateManagerdef set_random_seed(seed=42):"""設置全局隨機種子:param seed: 隨機種子, defaults to 42影響范圍:- Python內置隨機模塊- Numpy隨機數生成- PyTorch CPU/CUDA隨機種子"""random.seed(seed)np.random.seed(seed)torch.manual_seed(seed)if torch.cuda.is_available():torch.cuda.manual_seed(seed)torch.cuda.manual_seed_all(seed) # 如果使用多個GPUtorch.backends.cudnn.deterministic = Truetorch.backends.cudnn.benchmark = Falsedef prepare_backtest_data(ts_code, device):"""準備回測數據。:param ts_code: 股票代碼:param device: 計算設備(CPU/GPU):return: 滑動窗口特征數據、原始數據DataFrame、模型、特征工程對象、特征選擇器和配置"""state_manager = TrainingStateManager() # 初始化訓練狀態管理器model, feature_engineer, feature_selector, config = state_manager.load(device) # 加載模型和預處理器print(f"Model Config: {config}") # 打印模型配置# 加載并處理數據test_df = load_data([ts_code]) # 加載數據test_df = test_df[-300:] # 取最近300條數據processed_df = feature_engineer.generate_features(test_df) # 生成特征X_selected = feature_selector.transform(processed_df[feature_engineer.all_features].values) # 選擇特征# 構建滑動窗口window_size = config["window_size"] # 獲取滑動窗口大小sequences = [] # 初始化序列列表for i in range(window_size, len(X_selected)): # 遍歷數據sequences.append(X_selected[i - window_size : i]) # 添加滑動窗口特征return (np.array(sequences, dtype=np.float32), # 返回滑動窗口特征數據test_df, # 返回原始數據DataFramemodel, # 返回模型feature_engineer, # 返回特征工程對象feature_selector, # 返回特征選擇器config, # 返回配置)if __name__ == "__main__":# 設置隨機種子# 函數確保了整個訓練過程的可重復性。# 通過設置相同的隨機種子,可以保證每次運行時生成的隨機數序列一致,這對于調試和實驗驗證非常重要。set_random_seed()# 檢測可用計算設備(優先使用CUDA)device = torch.device("cuda"if torch.cuda.is_available()else "mps" if torch.backends.mps.is_available() else "cpu")# 股票行情# start_date = "20180101"# end_date = "20241231"trainer = IncrementalTrainer(device)# 多股票初始訓練示例# 浦發銀行(600000.SH)# 招商銀行(600036.SH)# 平安銀行(000001.SZ)train_codes = ["600000.SH", "600036.SH", "000001.SZ"]train_df = load_data(train_codes)model = trainer.initial_train(train_df)# 增量訓練示例# 貴州茅臺(600519.SH)# new_df = load_data(["600519.SH"])# model = trainer.incremental_update(new_df)# 單股票回測(test_data, test_df, model, feature_engineer, feature_selector, config) = (prepare_backtest_data("600036.SH", device))backtester = BacktestStrategy(model, device)pf = backtester.run(test_data, test_df)print("回測結果統計:")print(pf.stats())pf.plot().show()
風險提示與免責聲明
本文內容基于公開信息研究整理,不構成任何形式的投資建議。歷史表現不應作為未來收益保證,市場存在不可預見的波動風險。投資者需結合自身財務狀況及風險承受能力獨立決策,并自行承擔交易結果。作者及發布方不對任何依據本文操作導致的損失承擔法律責任。市場有風險,投資須謹慎。