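1. Project Introduction
Data analysis and forecasting have become an important input to investment decisions in financial markets. This article walks through a Python-based stock prediction and analysis system that applies machine learning algorithms to historical stock data and forecasts future price movements to support investors' decisions.
1.1 Project Background
Stock markets are full of uncertainty. Traditional technical and fundamental analysis often depends on human judgment, which makes it subjective and slow. Advances in machine learning now make it feasible to analyze large volumes of historical data algorithmically and to extract patterns from it. This project aims to build a complete stock prediction and analysis system that integrates data collection, preprocessing, feature engineering, model training and evaluation, and prediction visualization, giving investment decisions a data-driven basis.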
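1.2 Project Goals
- Build a complete pipeline for stock data collection and preprocessing
- Implement several machine learning models for stock price prediction
- Provide intuitive data visualization and analysis tools
- Develop a user-friendly interface for investors
- Evaluate the predictive performance of the different models and report the best-performing prediction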
1.3 Technology Stack
- Programming language: Python 3.8+
- Data processing: Pandas, NumPy
- Machine learning frameworks: Scikit-learn, TensorFlow, Keras
- Deep learning models: LSTM, GRU, Transformer
- Data visualization: Matplotlib, Seaborn, Plotly
- Web interface: Flask, Streamlit
- Data storage: SQLite, MongoDB
- Data APIs: yfinance, alpha_vantage
2. System Architecture
The system uses a modular design built from the following core components:
2.1 System Architecture Diagram
+------------------------+     +------------------------+     +------------------------+
|                        |     |                        |     |                        |
|    Data Collection     |     |   Data Preprocessing   |     |  Feature Engineering   |
|                        |     |                        |     |                        |
+------------------------+     +------------------------+     +------------------------+
            |                              |                              |
            v                              v                              v
+------------------------+     +------------------------+     +------------------------+
|                        |     |                        |     |                        |
|     Model Training     | <-  |   Feature Selection    | <-  |      Data Storage      |
|                        |     |                        |     |                        |
+------------------------+     +------------------------+     +------------------------+
            |                              ^                              ^
            v                              |                              |
+------------------------+     +------------------------+     +------------------------+
|                        |     |                        |     |                        |
| Prediction Evaluation  | ->  |  Result Visualization  | ->  |     User Interface     |
|                        |     |                        |     |                        |
+------------------------+     +------------------------+     +------------------------+
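2.2 Module Responsibilities
- Data collection module: fetches historical stock data (prices, trading volume, financial indicators, and so on) from various data sources
- Data preprocessing module: cleans, normalizes, and denoises the raw data
- Feature engineering module: builds the features the prediction models need, including technical indicators and statistical features
- Data storage module: stores the processed data in a database for later analysis
- Feature selection module: selects the subset of features with the most predictive power
- Model training module: implements several machine learning algorithms and trains the prediction models
- Prediction evaluation module: evaluates model performance and generates predictions
- Result visualization module: presents the predictions as charts
- User interface module: provides a friendly interface for running the system and viewing results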
3. Data Collection and Preprocessing
3.1 Data Sources
The system supports several data sources:
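- Public APIs:
  - Yahoo Finance (yfinance)
  - Alpha Vantage
  - Quandl
  - Tushare (for the Chinese stock market)
- CSV file import: users can upload CSV files in a custom format
- Database import: data can be imported from databases such as SQLite and MongoDB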
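The CSV import path is not shown in the article, so the following is only a minimal sketch of what it might look like. The function name `load_price_csv`, the `Date` column name, and the required OHLCV column list are assumptions chosen to match the column names that yfinance returns; a real upload handler would need to map custom formats onto them.

```python
import io

import pandas as pd

# Assumed schema: the same column names that yfinance's history() returns.
REQUIRED_COLUMNS = ["Open", "High", "Low", "Close", "Volume"]


def load_price_csv(source, date_column="Date"):
    """Load an OHLCV CSV into a DataFrame indexed by date, oldest row first."""
    df = pd.read_csv(source, parse_dates=[date_column], index_col=date_column)
    missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing:
        raise ValueError(f"CSV is missing required columns: {missing}")
    # Later steps (rolling windows, lags) assume chronological order.
    return df.sort_index()


# An in-memory CSV stands in for an uploaded file; rows are deliberately unsorted.
sample_csv = io.StringIO(
    "Date,Open,High,Low,Close,Volume\n"
    "2024-01-03,101.0,103.0,100.0,102.5,1200\n"
    "2024-01-02,100.0,102.0,99.0,101.0,1000\n"
)
prices = load_price_csv(sample_csv)
print(prices)
```

Validating the columns up front keeps later failures (for example a missing `Close` column in the indicator code) from surfacing far away from their cause.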
3.2 Data Collection Implementation
The following example fetches stock data with the yfinance library:
```python
from datetime import datetime

import yfinance as yf


class StockDataCollector:
    def __init__(self):
        self.data = None

    def collect_data(self, ticker, start_date, end_date=None, interval='1d'):
        """
        Fetch historical stock data from Yahoo Finance.

        Parameters:
            ticker (str): stock symbol, e.g. 'AAPL' or 'MSFT'
            start_date (str): start date in 'YYYY-MM-DD' format
            end_date (str): end date in 'YYYY-MM-DD' format; defaults to today
            interval (str): bar interval: '1d' (daily), '1wk' (weekly), '1mo' (monthly)

        Returns:
            pandas.DataFrame: historical data, or None if the request fails
        """
        if end_date is None:
            end_date = datetime.now().strftime('%Y-%m-%d')
        try:
            stock = yf.Ticker(ticker)
            self.data = stock.history(start=start_date, end=end_date, interval=interval)
            print(f"Fetched historical data for {ticker} from {start_date} to {end_date}")
            return self.data
        except Exception as e:
            print(f"Error while fetching data: {e}")
            return None

    def save_to_csv(self, file_path):
        """Save the collected data to a CSV file."""
        if self.data is not None:
            self.data.to_csv(file_path)
            print(f"Data saved to {file_path}")
        else:
            print("No data to save")

    def get_stock_info(self, ticker):
        """Fetch basic information about a stock."""
        try:
            stock = yf.Ticker(ticker)
            return stock.info
        except Exception as e:
            print(f"Error while fetching stock info: {e}")
            return None
```
3.3 Data Preprocessing
Raw stock data usually contains missing values, outliers, and similar problems, so it has to be preprocessed:
```python
class StockDataPreprocessor:
    def __init__(self, data=None):
        self.data = data

    def load_data(self, data):
        """Load data."""
        self.data = data
        return self

    def handle_missing_values(self, method='ffill'):
        """Handle missing values."""
        if self.data is None:
            print("No data to process")
            return self
        if method == 'ffill':
            # fillna(method=...) is deprecated in recent pandas; use ffill()/bfill()
            self.data = self.data.ffill()
        elif method == 'bfill':
            self.data = self.data.bfill()
        elif method == 'drop':
            self.data = self.data.dropna()
        elif method == 'mean':
            self.data = self.data.fillna(self.data.mean(numeric_only=True))
        return self

    def remove_outliers(self, columns, method='zscore', threshold=3):
        """Remove rows whose z-score exceeds the threshold."""
        if self.data is None:
            print("No data to process")
            return self
        if method == 'zscore':
            for col in columns:
                if col in self.data.columns:
                    mean = self.data[col].mean()
                    std = self.data[col].std()
                    mask = (self.data[col] - mean).abs() <= threshold * std
                    # copy() so later column assignments do not hit a view
                    self.data = self.data[mask].copy()
        return self

    def normalize_data(self, columns, method='minmax'):
        """Rescale columns with min-max scaling or z-score standardization."""
        if self.data is None:
            print("No data to process")
            return self
        # Note: the statistics below use the whole series. For modeling,
        # compute them on the training split only to avoid look-ahead leakage.
        if method == 'minmax':
            for col in columns:
                if col in self.data.columns:
                    min_val = self.data[col].min()
                    max_val = self.data[col].max()
                    self.data[col] = (self.data[col] - min_val) / (max_val - min_val)
        elif method == 'zscore':
            for col in columns:
                if col in self.data.columns:
                    mean = self.data[col].mean()
                    std = self.data[col].std()
                    self.data[col] = (self.data[col] - mean) / std
        return self

    def get_processed_data(self):
        """Return the processed data."""
        return self.data
```
4. Feature Engineering
Feature engineering is a key determinant of machine learning model performance. For stock prediction, we need to extract informative features from the raw price data.
4.1 Technical Indicator Calculation
Technical indicators are standard tools in stock analysis; they expose information such as price trend, momentum, and volatility:
```python
import pandas as pd


class TechnicalIndicators:
    def __init__(self, data=None):
        self.data = data

    def load_data(self, data):
        """Load data."""
        self.data = data
        return self

    def add_moving_averages(self, periods=(5, 10, 20, 50, 200)):
        """Add simple moving averages."""
        if self.data is None or 'Close' not in self.data.columns:
            print("Data does not contain a closing price")
            return self
        for period in periods:
            self.data[f'MA_{period}'] = self.data['Close'].rolling(window=period).mean()
        return self

    def add_exponential_moving_averages(self, periods=(5, 10, 20, 50, 200)):
        """Add exponential moving averages."""
        if self.data is None or 'Close' not in self.data.columns:
            print("Data does not contain a closing price")
            return self
        for period in periods:
            self.data[f'EMA_{period}'] = self.data['Close'].ewm(span=period, adjust=False).mean()
        return self

    def add_rsi(self, periods=(14,)):
        """Add the Relative Strength Index (RSI), using simple rolling means."""
        if self.data is None or 'Close' not in self.data.columns:
            print("Data does not contain a closing price")
            return self
        for period in periods:
            delta = self.data['Close'].diff()
            gain = delta.where(delta > 0, 0)
            loss = -delta.where(delta < 0, 0)
            avg_gain = gain.rolling(window=period).mean()
            avg_loss = loss.rolling(window=period).mean()
            rs = avg_gain / avg_loss
            self.data[f'RSI_{period}'] = 100 - (100 / (1 + rs))
        return self

    def add_macd(self, fast_period=12, slow_period=26, signal_period=9):
        """Add the MACD line, signal line, and histogram."""
        if self.data is None or 'Close' not in self.data.columns:
            print("Data does not contain a closing price")
            return self
        ema_fast = self.data['Close'].ewm(span=fast_period, adjust=False).mean()
        ema_slow = self.data['Close'].ewm(span=slow_period, adjust=False).mean()
        self.data['MACD'] = ema_fast - ema_slow
        self.data['MACD_Signal'] = self.data['MACD'].ewm(span=signal_period, adjust=False).mean()
        self.data['MACD_Hist'] = self.data['MACD'] - self.data['MACD_Signal']
        return self

    def add_bollinger_bands(self, period=20, std_dev=2):
        """Add Bollinger Bands."""
        if self.data is None or 'Close' not in self.data.columns:
            print("Data does not contain a closing price")
            return self
        middle = self.data['Close'].rolling(window=period).mean()
        std = self.data['Close'].rolling(window=period).std()
        self.data[f'BB_Middle_{period}'] = middle
        self.data[f'BB_Std_{period}'] = std
        self.data[f'BB_Upper_{period}'] = middle + std_dev * std
        self.data[f'BB_Lower_{period}'] = middle - std_dev * std
        return self

    def add_atr(self, period=14):
        """Add the Average True Range (ATR)."""
        if self.data is None or not all(col in self.data.columns for col in ['High', 'Low', 'Close']):
            print("Data does not contain the required price columns")
            return self
        high_low = self.data['High'] - self.data['Low']
        high_close = (self.data['High'] - self.data['Close'].shift()).abs()
        low_close = (self.data['Low'] - self.data['Close'].shift()).abs()
        ranges = pd.concat([high_low, high_close, low_close], axis=1)
        true_range = ranges.max(axis=1)
        self.data[f'ATR_{period}'] = true_range.rolling(window=period).mean()
        return self

    def add_stochastic_oscillator(self, k_period=14, d_period=3):
        """Add the stochastic oscillator (%K and %D)."""
        if self.data is None or not all(col in self.data.columns for col in ['High', 'Low', 'Close']):
            print("Data does not contain the required price columns")
            return self
        low_min = self.data['Low'].rolling(window=k_period).min()
        high_max = self.data['High'].rolling(window=k_period).max()
        self.data['%K'] = 100 * ((self.data['Close'] - low_min) / (high_max - low_min))
        self.data['%D'] = self.data['%K'].rolling(window=d_period).mean()
        return self

    def add_obv(self):
        """Add On-Balance Volume (OBV)."""
        if self.data is None or not all(col in self.data.columns for col in ['Close', 'Volume']):
            print("Data does not contain the required price and volume columns")
            return self
        obv = [0]
        for i in range(1, len(self.data)):
            if self.data['Close'].iloc[i] > self.data['Close'].iloc[i - 1]:
                obv.append(obv[-1] + self.data['Volume'].iloc[i])
            elif self.data['Close'].iloc[i] < self.data['Close'].iloc[i - 1]:
                obv.append(obv[-1] - self.data['Volume'].iloc[i])
            else:
                obv.append(obv[-1])
        self.data['OBV'] = obv
        return self

    def get_data_with_indicators(self):
        """Return the data with the technical indicators added."""
        return self.data
```
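For reference, the indicator code above corresponds to the following formulas. This is a restatement of what the code computes, not a canonical definition: the notation ($C_t$, $H_t$, $L_t$ for close, high, low; $n$ for the window; $k$ for `std_dev`) is introduced here for illustration. In particular, the RSI uses simple rolling means of gains and losses, whereas other implementations may smooth them exponentially, and $\sigma_n$ is the sample standard deviation that pandas' rolling `std()` returns by default.

```latex
\begin{align*}
\mathrm{EMA}_t &= \alpha\, C_t + (1-\alpha)\,\mathrm{EMA}_{t-1},
  \qquad \alpha = \frac{2}{\text{span}+1} \\
\mathrm{RS}_t &= \frac{\frac{1}{n}\sum_{i=0}^{n-1}\max(C_{t-i}-C_{t-i-1},\,0)}
                      {\frac{1}{n}\sum_{i=0}^{n-1}\max(C_{t-i-1}-C_{t-i},\,0)},
  \qquad \mathrm{RSI}_t = 100 - \frac{100}{1+\mathrm{RS}_t} \\
\mathrm{MACD}_t &= \mathrm{EMA}^{(12)}_t(C) - \mathrm{EMA}^{(26)}_t(C),
  \qquad \mathrm{Signal}_t = \mathrm{EMA}^{(9)}_t(\mathrm{MACD}),
  \qquad \mathrm{Hist}_t = \mathrm{MACD}_t - \mathrm{Signal}_t \\
\mathrm{BB}^{\pm}_t &= \mathrm{MA}_n(C)_t \pm k\,\sigma_n(C)_t \\
\mathrm{TR}_t &= \max\bigl(H_t - L_t,\; |H_t - C_{t-1}|,\; |L_t - C_{t-1}|\bigr),
  \qquad \mathrm{ATR}_t = \frac{1}{n}\sum_{i=0}^{n-1}\mathrm{TR}_{t-i} \\
\%K_t &= 100\cdot\frac{C_t - \min_{i<n} L_{t-i}}{\max_{i<n} H_{t-i} - \min_{i<n} L_{t-i}}
\end{align*}
```

All rolling quantities are undefined (NaN) for the first $n-1$ rows, which is why the feature-selection step later drops rows containing NaN.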
4.2 Feature Selection
Stock data can contain a large number of features, but not all of them help prediction. Feature selection can improve model performance and reduce overfitting:
```python
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import RFE, SelectKBest, f_regression


class FeatureSelector:
    def __init__(self, data=None):
        self.data = data
        self.selected_features = None

    def load_data(self, data):
        """Load data."""
        self.data = data
        return self

    def prepare_data(self, target_col='Close', lag_periods=(1, 2, 3, 5, 10)):
        """Build the feature matrix and target, adding lag features."""
        if self.data is None:
            print("No data to process")
            return None, None
        # Snapshot the base columns first; otherwise each new lag would also
        # be applied to the lag columns created by the previous iteration.
        base_cols = list(self.data.columns)
        # Target: the next day's closing price
        self.data['Target'] = self.data[target_col].shift(-1)
        # Lag features
        for lag in lag_periods:
            for col in base_cols:
                self.data[f'{col}_Lag_{lag}'] = self.data[col].shift(lag)
        # Drop rows containing NaN
        self.data = self.data.dropna()
        # Split features and target
        X = self.data.drop(['Target'], axis=1)
        y = self.data['Target']
        return X, y

    def select_k_best(self, X, y, k=10):
        """Select the best features by F-statistic."""
        selector = SelectKBest(score_func=f_regression, k=k)
        selector.fit(X, y)
        cols = selector.get_support(indices=True)
        self.selected_features = X.columns[cols].tolist()
        return X[self.selected_features], self.selected_features

    def select_with_rfe(self, X, y, n_features=10):
        """Select features with recursive feature elimination."""
        estimator = RandomForestRegressor(n_estimators=100, random_state=42)
        selector = RFE(estimator, n_features_to_select=n_features)
        selector.fit(X, y)
        cols = selector.get_support(indices=True)
        self.selected_features = X.columns[cols].tolist()
        return X[self.selected_features], self.selected_features

    def select_with_random_forest(self, X, y, threshold=0.01):
        """Select features by random forest feature importance."""
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(X, y)
        importances = rf.feature_importances_
        indices = np.argsort(importances)[::-1]
        # Keep features whose importance exceeds the threshold
        self.selected_features = [X.columns[i] for i in indices if importances[i] > threshold]
        return X[self.selected_features], self.selected_features
```
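The lag-feature and next-day-target construction in `prepare_data` is easy to get wrong by one row, so it can help to check it on a tiny series. The helper `make_supervised` below is a hypothetical, stripped-down stand-in written for this illustration (one column, two lags); it is not part of the system's code.

```python
import pandas as pd


def make_supervised(close, lags=(1, 2)):
    """Build lag features and a next-day target from a closing-price series."""
    frame = pd.DataFrame({"Close": close})
    for lag in lags:
        # shift(lag) moves past values forward: row t sees the close of t - lag
        frame[f"Close_Lag_{lag}"] = frame["Close"].shift(lag)
    # shift(-1) pulls the next row's close back: row t is labeled with t + 1
    frame["Target"] = frame["Close"].shift(-1)
    # The first max(lags) rows and the last row contain NaN and are dropped
    return frame.dropna()


close = pd.Series([10.0, 11.0, 12.0, 13.0, 14.0, 15.0])
table = make_supervised(close)
print(table)
```

With six prices and a maximum lag of 2, only rows 2 to 4 survive: every feature in a row comes from that day or earlier, and only the target looks one day ahead.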
5. Model Implementation
The system implements several models for stock price prediction, covering both traditional machine learning models and deep learning models.
5.1 Data Preparation
Before training, the data must be split into a training set and a test set:
```python
import numpy as np
from sklearn.model_selection import train_test_split


class DataPreparation:
    def __init__(self, X=None, y=None):
        self.X = X
        self.y = y
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

    def load_data(self, X, y):
        """Load the feature and target data."""
        self.X = X
        self.y = y
        return self

    def train_test_split(self, test_size=0.2, random_state=42):
        """Split into training and test sets without shuffling."""
        if self.X is None or self.y is None:
            print("No data to split")
            return self
        # The call below resolves to sklearn's module-level function, not this
        # method. With shuffle=False the split is chronological and
        # random_state has no effect.
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            self.X, self.y, test_size=test_size, random_state=random_state, shuffle=False
        )
        return self

    def time_series_split(self, test_size=0.2):
        """Split into training and test sets in chronological order."""
        if self.X is None or self.y is None:
            print("No data to split")
            return self
        # Index where the test set starts
        test_index = int(len(self.X) * (1 - test_size))
        self.X_train = self.X.iloc[:test_index]
        self.X_test = self.X.iloc[test_index:]
        self.y_train = self.y.iloc[:test_index]
        self.y_test = self.y.iloc[test_index:]
        return self

    def prepare_lstm_data(self, time_steps=60):
        """Build the windowed time-series arrays an LSTM model expects."""
        if self.X is None or self.y is None:
            print("No data to process")
            return None, None, None, None
        X_values = self.X.values
        y_values = self.y.values
        X_lstm, y_lstm = [], []
        for i in range(time_steps, len(X_values)):
            # Window covers rows i - time_steps .. i - 1 and is labeled with
            # y[i]; this assumes y holds the raw series. If y is already
            # shifted to the next day's close, y[i - 1] is the matching label.
            X_lstm.append(X_values[i - time_steps:i])
            y_lstm.append(y_values[i])
        X_lstm, y_lstm = np.array(X_lstm), np.array(y_lstm)
        # Chronological 80/20 split
        train_size = int(len(X_lstm) * 0.8)
        X_train = X_lstm[:train_size]
        X_test = X_lstm[train_size:]
        y_train = y_lstm[:train_size]
        y_test = y_lstm[train_size:]
        return X_train, X_test, y_train, y_test

    def get_train_test_data(self):
        """Return the training and test sets."""
        return self.X_train, self.X_test, self.y_train, self.y_test
```
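To make the windowing in `prepare_lstm_data` concrete, here is a small sketch that applies the same slicing to toy arrays and prints the resulting shapes. The function name `make_windows` and the toy data are assumptions made for the illustration.

```python
import numpy as np


def make_windows(features, targets, time_steps):
    """Slice a 2-D feature array into overlapping windows of length time_steps."""
    X, y = [], []
    for end in range(time_steps, len(features)):
        X.append(features[end - time_steps:end])  # rows end - time_steps .. end - 1
        y.append(targets[end])                    # label taken at row `end`
    return np.array(X), np.array(y)


features = np.arange(20, dtype=float).reshape(10, 2)  # 10 time steps, 2 features
targets = np.arange(10, dtype=float)                  # one value per time step
X, y = make_windows(features, targets, time_steps=3)
print(X.shape, y.shape)  # (7, 3, 2) (7,)
```

Ten rows with a window of three yield seven samples, and the result already has the `[samples, time_steps, features]` layout that Keras recurrent layers expect.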
5.2 Traditional Machine Learning Models
Several traditional machine learning models are implemented for stock price prediction:
```python
import joblib
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import Lasso, LinearRegression, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.svm import SVR


class TraditionalModels:
    def __init__(self):
        self.models = {}
        self.best_model = None
        self.best_score = float('inf')

    def train_linear_regression(self, X_train, y_train):
        """Train a linear regression model."""
        model = LinearRegression()
        model.fit(X_train, y_train)
        self.models['LinearRegression'] = model
        return model

    def train_ridge_regression(self, X_train, y_train, alpha=1.0):
        """Train a ridge regression model."""
        model = Ridge(alpha=alpha)
        model.fit(X_train, y_train)
        self.models['Ridge'] = model
        return model

    def train_lasso_regression(self, X_train, y_train, alpha=0.1):
        """Train a Lasso regression model."""
        model = Lasso(alpha=alpha)
        model.fit(X_train, y_train)
        self.models['Lasso'] = model
        return model

    def train_random_forest(self, X_train, y_train, n_estimators=100, max_depth=None):
        """Train a random forest model."""
        model = RandomForestRegressor(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
        model.fit(X_train, y_train)
        self.models['RandomForest'] = model
        return model

    def train_gradient_boosting(self, X_train, y_train, n_estimators=100, learning_rate=0.1):
        """Train a gradient boosting model."""
        model = GradientBoostingRegressor(n_estimators=n_estimators, learning_rate=learning_rate, random_state=42)
        model.fit(X_train, y_train)
        self.models['GradientBoosting'] = model
        return model

    def train_svr(self, X_train, y_train, kernel='rbf', C=1.0, epsilon=0.1):
        """Train a support vector regression model."""
        model = SVR(kernel=kernel, C=C, epsilon=epsilon)
        model.fit(X_train, y_train)
        self.models['SVR'] = model
        return model

    def train_all_models(self, X_train, y_train):
        """Train every model with its default settings."""
        self.train_linear_regression(X_train, y_train)
        self.train_ridge_regression(X_train, y_train)
        self.train_lasso_regression(X_train, y_train)
        self.train_random_forest(X_train, y_train)
        self.train_gradient_boosting(X_train, y_train)
        self.train_svr(X_train, y_train)
        return self.models

    def evaluate_model(self, model, X_test, y_test):
        """Evaluate a model on the test set."""
        y_pred = model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        return {'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'R2': r2}

    def evaluate_all_models(self, X_test, y_test):
        """Evaluate every trained model and track the one with the lowest RMSE."""
        results = {}
        for name, model in self.models.items():
            results[name] = self.evaluate_model(model, X_test, y_test)
            # Update the best model
            if results[name]['RMSE'] < self.best_score:
                self.best_score = results[name]['RMSE']
                self.best_model = name
        return results

    def save_model(self, model_name, file_path):
        """Save a model to disk."""
        if model_name in self.models:
            joblib.dump(self.models[model_name], file_path)
            print(f"Model saved to {file_path}")
        else:
            print(f"Model {model_name} does not exist")

    def load_model(self, model_name, file_path):
        """Load a model from disk."""
        try:
            model = joblib.load(file_path)
            self.models[model_name] = model
            print(f"Model loaded from {file_path}")
            return model
        except Exception as e:
            print(f"Error while loading model: {e}")
            return None

    def get_best_model(self):
        """Return the best model and its name (requires a prior evaluation)."""
        if self.best_model is None:
            print("Models have not been evaluated yet")
            return None
        return self.models[self.best_model], self.best_model
```
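The four metrics returned by `evaluate_model` can be reproduced by hand, which is a useful sanity check when comparing models. The sketch below recomputes them with plain NumPy on four made-up prices; the function name `regression_metrics` and the numbers are purely illustrative.

```python
import numpy as np


def regression_metrics(y_true, y_pred):
    """Compute MSE, RMSE, MAE and R^2 from their definitions."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    errors = y_true - y_pred
    mse = np.mean(errors ** 2)
    ss_res = np.sum(errors ** 2)                     # residual sum of squares
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)   # total sum of squares
    return {
        "MSE": mse,
        "RMSE": np.sqrt(mse),
        "MAE": np.mean(np.abs(errors)),
        "R2": 1 - ss_res / ss_tot,
    }


metrics = regression_metrics([100, 102, 104, 106], [101, 101, 105, 107])
print(metrics)
```

Every prediction here is off by exactly 1, so MSE, RMSE, and MAE are all 1.0, while R² is 1 - 4/20 = 0.8. RMSE and MAE are in price units, which makes them easier to interpret than MSE; R² is unitless.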
5.3 Deep Learning Models
For time-series data, recurrent neural networks such as LSTM and GRU are a natural fit because they model dependencies across time steps directly:
```python
import matplotlib.pyplot as plt
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import GRU, LSTM, Bidirectional, Dense, Dropout
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.optimizers import Adam


class DeepLearningModels:
    def __init__(self):
        self.models = {}
        self.best_model = None
        self.best_score = float('inf')
        self.scalers = {}

    def preprocess_data(self, X_train, X_test, y_train, y_test, feature_range=(0, 1)):
        """Min-max scale features and target; scalers are fit on the training set only."""
        # Scale the features
        X_scaler = MinMaxScaler(feature_range=feature_range)
        X_train_scaled = X_scaler.fit_transform(X_train)
        X_test_scaled = X_scaler.transform(X_test)
        # Scale the target
        y_scaler = MinMaxScaler(feature_range=feature_range)
        y_train_scaled = y_scaler.fit_transform(y_train.values.reshape(-1, 1))
        y_test_scaled = y_scaler.transform(y_test.values.reshape(-1, 1))
        # Keep the scalers for inverse transforms later
        self.scalers['X'] = X_scaler
        self.scalers['y'] = y_scaler
        return X_train_scaled, X_test_scaled, y_train_scaled, y_test_scaled

    def reshape_data_for_lstm(self, X_train, X_test):
        """Reshape 2-D data to the LSTM layout [samples, time_steps, features]."""
        # Assumes each sample has a single time step
        X_train_reshaped = X_train.reshape(X_train.shape[0], 1, X_train.shape[1])
        X_test_reshaped = X_test.reshape(X_test.shape[0], 1, X_test.shape[1])
        return X_train_reshaped, X_test_reshaped

    def build_lstm_model(self, input_shape, units=50, dropout=0.2):
        """Build a stacked LSTM model."""
        model = Sequential()
        model.add(LSTM(units=units, return_sequences=True, input_shape=input_shape))
        model.add(Dropout(dropout))
        model.add(LSTM(units=units, return_sequences=False))
        model.add(Dropout(dropout))
        model.add(Dense(units=25))
        model.add(Dense(units=1))
        model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
        return model

    def build_gru_model(self, input_shape, units=50, dropout=0.2):
        """Build a stacked GRU model."""
        model = Sequential()
        model.add(GRU(units=units, return_sequences=True, input_shape=input_shape))
        model.add(Dropout(dropout))
        model.add(GRU(units=units, return_sequences=False))
        model.add(Dropout(dropout))
        model.add(Dense(units=25))
        model.add(Dense(units=1))
        model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
        return model

    def build_bidirectional_lstm_model(self, input_shape, units=50, dropout=0.2):
        """Build a stacked bidirectional LSTM model."""
        model = Sequential()
        model.add(Bidirectional(LSTM(units=units, return_sequences=True), input_shape=input_shape))
        model.add(Dropout(dropout))
        model.add(Bidirectional(LSTM(units=units, return_sequences=False)))
        model.add(Dropout(dropout))
        model.add(Dense(units=25))
        model.add(Dense(units=1))
        model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
        return model

    def train_model(self, model, X_train, y_train, X_val=None, y_val=None,
                    epochs=100, batch_size=32, model_name=None):
        """Train a deep learning model with early stopping."""
        callbacks = [EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)]
        if model_name:
            callbacks.append(ModelCheckpoint(f'{model_name}.h5', save_best_only=True))
        # Without an explicit validation set, hold out the last 20% of the training data
        if X_val is None or y_val is None:
            validation_split = 0.2
            validation_data = None
        else:
            validation_split = 0.0
            validation_data = (X_val, y_val)
        history = model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=validation_split,
            validation_data=validation_data,
            callbacks=callbacks,
            verbose=1,
        )
        if model_name:
            self.models[model_name] = model
        return model, history

    def evaluate_model(self, model, X_test, y_test):
        """Evaluate a deep learning model on the test set."""
        y_pred = model.predict(X_test)
        # Undo the target scaling (expects y_test as a 2-D column, as
        # returned by preprocess_data)
        if 'y' in self.scalers:
            y_test = self.scalers['y'].inverse_transform(y_test)
            y_pred = self.scalers['y'].inverse_transform(y_pred)
        mse = np.mean(np.square(y_test - y_pred))
        rmse = np.sqrt(mse)
        mae = np.mean(np.abs(y_test - y_pred))
        # R squared
        ss_tot = np.sum(np.square(y_test - np.mean(y_test)))
        ss_res = np.sum(np.square(y_test - y_pred))
        r2 = 1 - (ss_res / ss_tot)
        return {'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'R2': r2}

    def predict_future(self, model, last_sequence, n_steps=30, scaler=None):
        """Recursively predict the next n_steps prices.

        Assumes last_sequence has shape (1, time_steps, features) and that
        feature 0 is the (scaled) price being predicted.
        """
        predictions = []
        current_sequence = last_sequence.copy()
        for _ in range(n_steps):
            # Predict the next value
            current_pred = model.predict(current_sequence)[0][0]
            predictions.append(current_pred)
            # Slide the window forward and append the prediction
            current_sequence = np.roll(current_sequence, -1, axis=1)
            current_sequence[0, -1, 0] = current_pred
        # Undo the scaling if a scaler is provided
        if scaler is not None:
            predictions = scaler.inverse_transform(np.array(predictions).reshape(-1, 1))
        return predictions

    def save_model(self, model_name, file_path):
        """Save a model to disk."""
        if model_name in self.models:
            self.models[model_name].save(file_path)
            print(f"Model saved to {file_path}")
        else:
            print(f"Model {model_name} does not exist")

    def load_model(self, model_name, file_path):
        """Load a model from disk."""
        try:
            # Resolves to the Keras load_model function imported above
            model = load_model(file_path)
            self.models[model_name] = model
            print(f"Model loaded from {file_path}")
            return model
        except Exception as e:
            print(f"Error while loading model: {e}")
            return None

    def plot_training_history(self, history, title="Model Training History"):
        """Plot the training and validation loss curves."""
        plt.figure(figsize=(12, 6))
        plt.plot(history.history['loss'], label='Training loss')
        plt.plot(history.history['val_loss'], label='Validation loss')
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)
        plt.show()
```
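The recursive loop in `predict_future` feeds each prediction back into the input window. The sketch below isolates that mechanism with a hypothetical stand-in model (`MeanModel`, which just averages feature 0 of the window) so that it runs without TensorFlow and the numbers can be verified by hand; neither `MeanModel` nor `roll_forecast` is part of the system's code.

```python
import numpy as np


class MeanModel:
    """Hypothetical stand-in for a trained network: predicts the window mean of feature 0."""

    def predict(self, batch):
        # batch has shape (samples, time_steps, features); output is (samples, 1)
        return batch[:, :, 0].mean(axis=1, keepdims=True)


def roll_forecast(model, last_window, n_steps):
    """Recursive multi-step forecast: each prediction becomes the newest input."""
    window = last_window.copy()  # leave the caller's array untouched
    predictions = []
    for _ in range(n_steps):
        next_value = float(model.predict(window)[0][0])
        predictions.append(next_value)
        window = np.roll(window, -1, axis=1)  # drop the oldest step, shift the rest
        window[0, -1, 0] = next_value         # write the prediction into feature 0
    return np.array(predictions)


last_window = np.array([[[1.0], [2.0], [3.0]]])  # shape (1, 3, 1)
forecast = roll_forecast(MeanModel(), last_window, n_steps=2)
print(forecast)
```

The first step predicts mean(1, 2, 3) = 2, and the window becomes (2, 3, 2), so the second step predicts 7/3. Two caveats apply to this style of forecast: errors compound because later steps are computed from earlier predictions, and when the window has more than one feature, `np.roll` wraps the oldest row into the last position, so every feature other than feature 0 in that row is stale unless it is overwritten as well.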
5.4 Ensemble Models
Combining the predictions of several models can often reduce prediction error further, because the individual models' errors partly cancel out:
```python
import numpy as np
from sklearn.ensemble import VotingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score


class EnsembleModel:
    def __init__(self):
        self.models = {}
        self.ensemble_model = None

    def add_model(self, name, model):
        """Add a model to the ensemble."""
        self.models[name] = model
        return self

    def create_voting_ensemble(self, weights=None):
        """Create a voting ensemble from the registered models."""
        if not self.models:
            print("No models to combine")
            return None
        estimators = [(name, model) for name, model in self.models.items()]
        self.ensemble_model = VotingRegressor(estimators=estimators, weights=weights)
        return self.ensemble_model

    def train_ensemble(self, X_train, y_train):
        """Train the voting ensemble."""
        if self.ensemble_model is None:
            print("Create the ensemble model first")
            return None
        self.ensemble_model.fit(X_train, y_train)
        return self.ensemble_model

    def weighted_average_prediction(self, X, weights=None):
        """Combine the models' predictions with a weighted average."""
        if not self.models:
            print("No models to combine")
            return None
        predictions = []
        for name, model in self.models.items():
            pred = model.predict(X)
            predictions.append(pred)
        # Shape: (n_models, n_samples)
        predictions = np.array(predictions)
        # Without explicit weights, use a plain average
        if weights is None:
            weights = np.ones(len(self.models)) / len(self.models)
        else:
            # Normalize the weights so they sum to 1
            weights = np.array(weights) / np.sum(weights)
        # Weighted average across models
        weighted_pred = np.sum(predictions * weights.reshape(-1, 1), axis=0)
        return weighted_pred

    def evaluate_ensemble(self, X_test, y_test):
        """Evaluate the voting ensemble."""
        if self.ensemble_model is None:
            print("Create the ensemble model first")
            return None
        y_pred = self.ensemble_model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        return {'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'R2': r2}

    def evaluate_weighted_ensemble(self, X_test, y_test, weights=None):
        """Evaluate the weighted-average ensemble."""
        y_pred = self.weighted_average_prediction(X_test, weights)
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        return {'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'R2': r2}
```
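The weighted average in `weighted_average_prediction` reduces to one matrix-vector product once the weights are normalized. The following sketch shows this on two made-up prediction vectors; the function name `weighted_average` and the numbers are assumptions for illustration only.

```python
import numpy as np


def weighted_average(predictions, weights=None):
    """Blend per-model predictions of shape (n_models, n_samples) into one vector."""
    predictions = np.asarray(predictions, dtype=float)
    if weights is None:
        # Equal weights give a plain average
        weights = np.full(len(predictions), 1.0 / len(predictions))
    else:
        weights = np.asarray(weights, dtype=float)
        weights = weights / weights.sum()  # normalize so the weights sum to 1
    # (n_models,) @ (n_models, n_samples) -> (n_samples,)
    return weights @ predictions


model_predictions = [[10.0, 20.0], [14.0, 24.0]]  # two models, two samples
blend = weighted_average(model_predictions, weights=[3, 1])
print(blend)  # [11. 21.]
```

Weights of 3 and 1 normalize to 0.75 and 0.25, so the first sample blends to 0.75 * 10 + 0.25 * 14 = 11. One possible way to choose weights is from each model's validation error, but that choice is outside what the article specifies.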
6. Data Visualization
Data visualization is an important part of the stock prediction and analysis system: it presents the raw data, technical indicators, and predictions in an intuitive form.
6.1 Raw Data Visualization
The raw stock data is visualized with libraries such as Matplotlib and Plotly:
```python
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import plotly.graph_objects as go


class StockDataVisualizer:
    def __init__(self, data=None):
        self.data = data

    def load_data(self, data):
        """Load data."""
        self.data = data
        return self

    def plot_stock_price(self, title="Stock Price Trend", figsize=(12, 6)):
        """Plot the closing price trend with Matplotlib."""
        if self.data is None or 'Close' not in self.data.columns:
            print("Data does not contain a closing price")
            return None
        plt.figure(figsize=figsize)
        plt.plot(self.data.index, self.data['Close'], label='Close')
        # Date formatting
        plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        plt.gca().xaxis.set_major_locator(mdates.MonthLocator())
        plt.title(title)
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45)
        plt.tight_layout()
        return plt

    def plot_ohlc(self, title="Stock OHLC Chart", figsize=(12, 6)):
        """Plot an OHLC chart with Matplotlib."""
        if self.data is None or not all(col in self.data.columns for col in ['Open', 'High', 'Low', 'Close']):
            print("Data does not contain the required price columns")
            return None
        fig, ax = plt.subplots(figsize=figsize)
        # Bar width (in days on a date axis)
        width = 0.6
        up = self.data[self.data['Close'] >= self.data['Open']]
        down = self.data[self.data['Close'] < self.data['Open']]
        # Rising days (green): body, upper wick, lower wick
        ax.bar(up.index, up['Close'] - up['Open'], width, bottom=up['Open'], color='g')
        ax.bar(up.index, up['High'] - up['Close'], width / 5, bottom=up['Close'], color='g')
        ax.bar(up.index, up['Open'] - up['Low'], width / 5, bottom=up['Low'], color='g')
        # Falling days (red): body, upper wick, lower wick
        ax.bar(down.index, down['Open'] - down['Close'], width, bottom=down['Close'], color='r')
        ax.bar(down.index, down['High'] - down['Open'], width / 5, bottom=down['Open'], color='r')
        ax.bar(down.index, down['Close'] - down['Low'], width / 5, bottom=down['Low'], color='r')
        # Date formatting
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        ax.xaxis.set_major_locator(mdates.MonthLocator())
        plt.title(title)
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.grid(True)
        plt.xticks(rotation=45)
        plt.tight_layout()
        return plt

    def plot_candlestick_plotly(self, title="Stock Candlestick Chart"):
        """Plot an interactive candlestick chart with Plotly."""
        if self.data is None or not all(col in self.data.columns for col in ['Open', 'High', 'Low', 'Close']):
            print("Data does not contain the required price columns")
            return None
        fig = go.Figure(data=[go.Candlestick(
            x=self.data.index,
            open=self.data['Open'],
            high=self.data['High'],
            low=self.data['Low'],
            close=self.data['Close'],
            name='Candlestick',
        )])
        # Add 5-day and 20-day moving averages
        if len(self.data) >= 20:
            fig.add_trace(go.Scatter(
                x=self.data.index,
                y=self.data['Close'].rolling(window=5).mean(),
                line=dict(color='blue', width=1),
                name='5-day moving average',
            ))
            fig.add_trace(go.Scatter(
                x=self.data.index,
                y=self.data['Close'].rolling(window=20).mean(),
                line=dict(color='orange', width=1),
                name='20-day moving average',
            ))
        fig.update_layout(
            title=title,
            xaxis_title='Date',
            yaxis_title='Price',
            xaxis_rangeslider_visible=False,
            template='plotly_white',
        )
        return fig

    def plot_volume(self, title="Volume Analysis", figsize=(12, 6)):
        """Plot trading volume."""
        if self.data is None or 'Volume' not in self.data.columns:
            print("Data does not contain volume")
            return None
        plt.figure(figsize=figsize)
        # Color each bar by the close-to-close change (green if the close
        # rose versus the previous day, red otherwise)
        if 'Close' in self.data.columns:
            colors = ['g' if close_price > prev_close else 'r'
                      for close_price, prev_close in zip(self.data['Close'], self.data['Close'].shift(1))]
        else:
            colors = 'b'
        plt.bar(self.data.index, self.data['Volume'], color=colors, alpha=0.8)
        # Add a moving average
        if len(self.data) >= 20:
            plt.plot(self.data.index, self.data['Volume'].rolling(window=20).mean(),
                     color='orange', label='20-day average volume')
        # Date formatting
        plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        plt.gca().xaxis.set_major_locator(mdates.MonthLocator())
        plt.title(title)
        plt.xlabel('Date')
        plt.ylabel('Volume')
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45)
        plt.tight_layout()
        return plt

    def plot_technical_indicators(self, indicators, title="Technical Indicator Analysis", figsize=(12, 8)):
        """Plot technical indicators alongside the closing price."""
        if self.data is None:
            print("No data to plot")
            return None
        # Check that every indicator exists
        for indicator in indicators:
            if indicator not in self.data.columns:
                print(f"Indicator {indicator} does not exist")
                return None
        fig, ax = plt.subplots(figsize=figsize)
        # Closing price
        if 'Close' in self.data.columns:
            ax.plot(self.data.index, self.data['Close'], label='Close', color='black')
        # Indicators
        colors = ['blue', 'green', 'red', 'purple', 'orange',
                  'brown', 'pink', 'gray', 'olive', 'cyan']
        for i, indicator in enumerate(indicators):
            ax.plot(self.data.index, self.data[indicator], label=indicator, color=colors[i % len(colors)])
        # Date formatting
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        ax.xaxis.set_major_locator(mdates.MonthLocator())
        plt.title(title)
        plt.xlabel('Date')
        plt.ylabel('Value')
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45)
        plt.tight_layout()
        return plt
```
6.2 Prediction Visualization
The model predictions are plotted so that their quality can be judged at a glance:
```python
from datetime import timedelta

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import pandas as pd


class PredictionVisualizer:
    def __init__(self, actual_data=None, predicted_data=None):
        self.actual_data = actual_data
        self.predicted_data = predicted_data

    def load_data(self, actual_data, predicted_data):
        """Load the actual and predicted data."""
        self.actual_data = actual_data
        self.predicted_data = predicted_data
        return self

    def plot_predictions(self, title="Stock Price Prediction Results", figsize=(12, 6)):
        """Plot predicted values against actual values."""
        if self.actual_data is None or self.predicted_data is None:
            print("Incomplete data")
            return None
        plt.figure(figsize=figsize)
        # Actual values
        plt.plot(self.actual_data.index, self.actual_data, label='Actual', color='blue')
        # Predicted values
        if isinstance(self.predicted_data, pd.Series) and self.predicted_data.index.equals(self.actual_data.index):
            plt.plot(self.predicted_data.index, self.predicted_data, label='Predicted', color='red', linestyle='--')
        else:
            plt.plot(self.actual_data.index, self.predicted_data, label='Predicted', color='red', linestyle='--')
        # Date formatting
        plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        plt.gca().xaxis.set_major_locator(mdates.MonthLocator())
        plt.title(title)
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45)
        plt.tight_layout()
        return plt

    def plot_future_predictions(self, historical_data, future_predictions, prediction_dates=None,
                                title="Future Stock Price Forecast", figsize=(12, 6)):
        """Plot historical data together with the future forecast."""
        if historical_data is None or future_predictions is None:
            print("Incomplete data")
            return None
        plt.figure(figsize=figsize)
        # Historical data
        plt.plot(historical_data.index, historical_data, label='Historical data', color='blue')
        # Generate forecast dates if none were given (calendar days, so
        # weekends and holidays are included)
        if prediction_dates is None:
            last_date = historical_data.index[-1]
            if isinstance(last_date, pd.Timestamp):
                prediction_dates = [last_date + timedelta(days=i + 1) for i in range(len(future_predictions))]
            else:
                prediction_dates = range(len(historical_data), len(historical_data) + len(future_predictions))
        # Forecast
        plt.plot(prediction_dates, future_predictions, label='Forecast', color='red', linestyle='--')
        # Separator at the last historical date
        plt.axvline(x=historical_data.index[-1], color='green', linestyle='-', label='Current date')
        # Date formatting (only for a date index)
        if isinstance(historical_data.index[0], pd.Timestamp):
            plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
            plt.gca().xaxis.set_major_locator(mdates.MonthLocator())
        plt.title(title)
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45)
        plt.tight_layout()
        return plt

    def plot_model_comparison(self, actual_data, predictions_dict,
                              title="Model Prediction Comparison", figsize=(12, 6)):
        """Plot the predictions of several models against the actual values."""
        if actual_data is None or not predictions_dict:
            print("Incomplete data")
            return None
        plt.figure(figsize=figsize)
        # Actual values
        plt.plot(actual_data.index, actual_data, label='Actual', color='black', linewidth=2)
        # Each model's predictions
        colors = ['red', 'blue', 'green', 'purple', 'orange', 'brown', 'pink', 'gray']
        for i, (model_name, predictions) in enumerate(predictions_dict.items()):
            plt.plot(actual_data.index, predictions, label=f'{model_name} prediction',
                     color=colors[i % len(colors)], linestyle='--')
        # Date formatting
        plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        plt.gca().xaxis.set_major_locator(mdates.MonthLocator())
        plt.title(title)
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45)
        plt.tight_layout()
        return plt

    def plot_error_distribution(self, actual_data, predicted_data,
                                title="Prediction Error Distribution", figsize=(12, 6)):
        """Plot a histogram of the prediction errors."""
        if actual_data is None or predicted_data is None:
            print("Incomplete data")
            return None
        # Errors
        errors = actual_data - predicted_data
        plt.figure(figsize=figsize)
        plt.hist(errors, bins=30, alpha=0.7, color='blue')
        plt.title(title)
        plt.xlabel('Prediction error')
        plt.ylabel('Frequency')
        plt.grid(True)
        plt.tight_layout()
        return plt
```