運行代碼,要求輸入股票代碼和名稱,其他參數可省略
import akshare as ak
import matplotlib.pyplot as plt
import pandas as pd
import mplfinance as mpf
import matplotlib.dates as mdates
import numpy as np
import os
from datetime import datetime, timedelta
plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei"]
plt.rcParams["axes.unicode_minus"] = False def get_stock_data(stock_code, start_date=None, end_date=None, adjust="qfq"):"""使用 AkShare 獲取股票數據參數:stock_code (str): 股票代碼,如 'sh000001' 或 '000001.SZ'start_date (str): 開始日期,格式 'YYYYMMDD',默認為 3 個月前end_date (str): 結束日期,格式 'YYYYMMDD',默認為今天adjust (str): 復權類型,'qfq' 為前復權,'hfq' 為后復權,None 為不復權返回:DataFrame: 包含股票數據的 DataFrame"""if end_date is None:end_date = datetime.now().strftime('%Y%m%d')if start_date is None:start_date = (datetime.now() - timedelta(days=90)).strftime('%Y%m%d')if not stock_code.startswith(('sh', 'sz')):market = 'sh' if stock_code.startswith(('6', '9')) else 'sz'stock_code = f'{market}{stock_code}'try:stock_data = ak.stock_zh_a_hist_tx(symbol=stock_code, start_date=start_date, end_date=end_date, adjust=adjust)volume_columns = ['成交量', '成交額', 'volume','amount']volume_col = next((col for col in volume_columns if col in stock_data.columns), None)if volume_col is None:print("警告: 數據中未找到成交量列,圖表可能不完整")stock_data['volume'] = 0 else:stock_data = stock_data.rename(columns={volume_col: 'volume'})stock_data = stock_data.rename(columns={'日期': 'date', '開盤': 'open', '收盤': 'close', '最高': 'high', '最低': 'low'})stock_data['date'] = pd.to_datetime(stock_data['date'])stock_data = stock_data.sort_values('date')required_columns = ['date', 'open', 'high', 'low', 'close', 'volume']missing_columns = [col for col in required_columns if col not in stock_data.columns]if missing_columns:print(f"錯誤: 數據缺少必要的列: {', '.join(missing_columns)}")return Nonereturn stock_dataexcept Exception as e:print(f"獲取股票數據時出錯: {e}")return Nonedef plot_stock_daily_movement(stock_data, stock_name="股票", save_path=None, ma_periods=None):"""繪制股票日變動情況,包括 K 線圖和成交量及均線參數:stock_data (DataFrame): 包含股票數據的 DataFrame,必須包含 'date', 'open', 'high', 'low', 'close', 'volume' 列stock_name (str): 股票名稱,用于圖表標題save_path (str): 圖表保存路徑,默認為 None 不保存ma_periods (list): 均線周期列表,默認為 [5, 10, 20]"""if stock_data is None or stock_data.empty:print("沒有數據可繪制圖表")returnif ma_periods is None:ma_periods = [5, 10, 20]plot_data = stock_data.copy()plot_data = plot_data.set_index('date')for period in ma_periods:column_name = f'MA{period}'plot_data[column_name] = plot_data['close'].rolling(window=period).mean()mc = mpf.make_marketcolors(up='r', down='g', inherit=True)ma_colors = ['blue', 'purple', 'orange', 'green', 'red', 'brown', 'gray']ma_colors = ma_colors[:len(ma_periods)] s = mpf.make_mpf_style(marketcolors=mc, gridstyle='--', y_on_right=False,rc={'font.family': ['SimHei', 'WenQuanYi Micro Hei', 'Heiti TC', 'Microsoft YaHei'],'lines.linewidth': 1.5 })fig, axes = mpf.plot(plot_data,type='candle',style=s,title=f'{stock_name} 日變動情況',ylabel='價格 (元)',volume=True,ylabel_lower='成交量 (手)',mav=tuple(ma_periods), returnfig=True,figsize=(14, 10),update_width_config=dict(candle_linewidth=1.2, candle_width=0.6, volume_width=0.8, ))fig.suptitle(f'{stock_name} 日變動情況', fontsize=16, y=0.98)ax = axes[0] for i, period in enumerate(ma_periods):ax.plot([], [], color=ma_colors[i], label=f'MA{period}', linewidth=1.5)ax.legend(loc='upper left')if save_path:save_dir = os.path.dirname(save_path)if not os.path.exists(save_dir):os.makedirs(save_dir)plt.savefig(save_path, dpi=300, bbox_inches='tight')print(f"圖表已保存至: {save_path}")plt.show()
if __name__ == "__main__":stock_code = input("請輸入股票代碼(例如 000001 或 sh000001): ").strip()stock_name = input("請輸入股票名稱(例如 平安銀行): ").strip()print("正在獲取股票數據...")stock_data = get_stock_data(stock_code)if stock_data is not None and not stock_data.empty:print(f"成功獲取 {len(stock_data)} 天的股票數據")print(f"數據列名: {', '.join(stock_data.columns)}")ma_input = input("請輸入均線周期(用逗號分隔,默認 5,10,20): ").strip()if ma_input:try:ma_periods = [int(p.strip()) for p in ma_input.split(',')]except ValueError:print("無效的均線周期,使用默認值")ma_periods = [5, 10, 20]else:ma_periods = [5, 10, 20]plot_stock_daily_movement(stock_data, stock_name, ma_periods=ma_periods)else:print("未能獲取股票數據,請檢查股票代碼是否正確")
