import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from pylab import mpl
mpl.rcParams['font.sans-serif']=['SimHei']
mpl.rcParams['axes.unicode_minus']=False#獲取交易日歷
import datetime
def get_cal_date(start, end):
    """Return the trading calendar restricted to ``[start, end]``.

    The calendar is downloaded with ``ak.tool_trade_date_hist_sina()`` and its
    ``trade_date`` column is parsed as ``YYYY-MM-DD`` dates.

    NOTE(review): ``ak`` is not imported in the visible part of this file;
    presumably ``import akshare as ak`` is required -- confirm.

    Args:
        start: First day to keep (inclusive). A ``datetime`` or anything
            ``pd.Timestamp`` accepts, e.g. ``"2020-01-01"``.
        end: Last day to keep (inclusive). Same accepted types as ``start``.

    Returns:
        The calendar rows whose ``trade_date`` lies inside the range.
    """
    start, end = pd.Timestamp(start), pd.Timestamp(end)
    dates = ak.tool_trade_date_hist_sina()
    # Vectorised parse; ``astype(str)`` keeps it working whether the source
    # yields strings or date objects.
    dates['trade_date'] = pd.to_datetime(
        dates['trade_date'].astype(str), format="%Y-%m-%d"
    )
    in_range = (dates['trade_date'] >= start) & (dates['trade_date'] <= end)
    return dates.loc[in_range]
def get_north_money(start, end):
    """Fetch the daily northbound net capital inflow between two dates.

    The series is downloaded with
    ``ak.stock_hsgt_north_net_flow_in_em(symbol="北上")``; its ``value`` column
    is cast to float and its ``date`` column is parsed as ``YYYY-MM-DD``.

    Args:
        start: First day to keep (inclusive), as a ``"YYYY-MM-DD"`` string.
        end: Last day to keep (inclusive), as a ``"YYYY-MM-DD"`` string.

    Returns:
        The downloaded rows whose ``date`` lies inside the range.

    Raises:
        ValueError: If ``start`` or ``end`` is not in ``YYYY-MM-DD`` format.
    """
    start = datetime.datetime.strptime(start, "%Y-%m-%d")
    end = datetime.datetime.strptime(end, "%Y-%m-%d")

    df = ak.stock_hsgt_north_net_flow_in_em(symbol="北上")
    df['value'] = df['value'].astype(float)
    # ``astype(str)`` mirrors get_index_data so both string and date-object
    # columns are handled the same way.
    df['date'] = pd.to_datetime(df['date'].astype(str), format="%Y-%m-%d")
    in_range = (df['date'] >= start) & (df['date'] <= end)
    return df.loc[in_range]
def get_index_data(code, start, end):
    """Fetch the daily history of one index between two dates.

    The history is downloaded with ``ak.stock_zh_index_daily(symbol=code)`` and
    its ``date`` column is parsed as ``YYYY-MM-DD``. ``date`` stays a regular
    column (it is not moved into the index) because callers merge on it.

    Args:
        code: Index symbol passed through to the data source, e.g. ``"sh000300"``.
        start: First day to keep (inclusive), as a ``"YYYY-MM-DD"`` string.
        end: Last day to keep (inclusive), as a ``"YYYY-MM-DD"`` string.

    Returns:
        The downloaded rows whose ``date`` lies inside the range.

    Raises:
        ValueError: If ``start`` or ``end`` is not in ``YYYY-MM-DD`` format.
    """
    start = datetime.datetime.strptime(start, "%Y-%m-%d")
    end = datetime.datetime.strptime(end, "%Y-%m-%d")

    index_df = ak.stock_zh_index_daily(symbol=code)
    # Vectorised replacement for a row-by-row strptime.
    index_df['date'] = pd.to_datetime(
        index_df['date'].astype(str), format="%Y-%m-%d"
    )
    in_range = (index_df['date'] >= start) & (index_df['date'] <= end)
    return index_df.loc[in_range]
# Commonly used A-share benchmark indices: display name -> data-source symbol.
indexs = {
    '上證綜指': 'sh000001',
    '深證成指': 'sz399001',
    '滬深300': 'sh000300',
    '創業板指': 'sz399006',
    '上證50': 'sh000016',
    '中證500': 'sh000905',
    '中小板指': 'sz399005',
    '上證180': 'sh000010',
}
start = '2014-11-17'
end = '2022-08-12'

# Closing prices of every index, aligned on the trading date. Indexing each
# series by date (rather than by row position) keeps the columns aligned even
# when the downloaded histories have different lengths.
index_data = pd.DataFrame({
    name: get_index_data(code, start, end).set_index('date')['close']
    for name, code in indexs.items()
}).sort_index().rename_axis('date')
index_data.tail()  # notebook display only

# Cumulative return, normalised by each index's first available close.
(index_data / index_data.bfill().iloc[0]).plot(figsize=(14, 6))
plt.title(f'A股指數累積收益率\n {start[:4]}-{end[:4]}', size=15)
plt.show()

# Convert prices to simple daily returns.
all_ret = index_data / index_data.shift(1) - 1
all_ret.tail()  # notebook display only

north_data = get_north_money(start, end)

# Expose the date index as a column so the returns can be merged on 'date'.
all_data = pd.merge(all_ret.reset_index(), north_data, on='date')
all_data.rename(columns={'value': '北向資金'}, inplace=True)
all_data.dropna(inplace=True)
# Correlate the numeric columns only; the 'date' column is not a variable.
all_data.select_dtypes(include='number').corr()
def North_Strategy(data, window, stdev_n, cost):
    """Back-test a band-breakout timing strategy driven by northbound flows.

    A rolling mean of ``北向資金`` is the middle band; the upper and lower bands
    sit ``stdev_n`` rolling standard deviations away from it. A flow above the
    upper band sets the signal to 1 (buy), a flow below the lower band sets it
    to 0 (sell). The signal is acted on at the next row's open, and the
    position is held until the opposite signal.

    Args:
        data: Frame with ``close``, ``open`` and ``北向資金`` columns, one row
            per trading day in chronological order. It is not modified.
        window: Rolling window length, in rows.
        stdev_n: Band width, in multiples of the rolling standard deviation.
        cost: Proportional transaction cost charged on the traded capital on
            entry and exit days (``0.01`` means 1%).

    Returns:
        A copy of ``data`` (warm-up rows dropped) with added columns ``mid``,
        ``upper``, ``lower``, ``ret``, ``signal``, ``position``,
        ``capital_ret``, ``策略凈值`` and ``指數凈值``.

    Raises:
        ValueError: If no rows remain once the rolling window has warmed up.
    """
    df = data.copy().dropna()

    # Middle band and band width.
    flow = df['北向資金']
    df['mid'] = flow.rolling(window).mean()
    stdev = flow.rolling(window).std()
    # Upper and lower bands.
    df['upper'] = df['mid'] + stdev_n * stdev
    df['lower'] = df['mid'] - stdev_n * stdev
    df['ret'] = df['close'] / df['close'].shift(1) - 1
    df.dropna(inplace=True)
    if df.empty:
        raise ValueError(
            'not enough rows to back-test: need more than `window` valid rows'
        )

    # Create the column up front (after the dropna above, which would
    # otherwise discard every row) so it exists even with no band breach.
    df['signal'] = np.nan
    df.loc[df['北向資金'] > df['upper'], 'signal'] = 1  # breakout above: buy
    df.loc[df['北向資金'] < df['lower'], 'signal'] = 0  # breakdown below: sell

    # Trade on the following row; carry the last signal forward, start flat.
    df['position'] = df['signal'].shift(1).ffill().fillna(0)

    prev_position = df['position'].shift(1)
    prev_close = df['close'].shift(1)
    entered = df['position'] > prev_position
    exited = df['position'] < prev_position
    unchanged = df['position'] == prev_position

    # The first row has no previous position, so it matches none of the masks
    # below and keeps a zero return.
    df['capital_ret'] = 0.0
    # Entry day: bought at the open, so earn open -> close. The cost reduces
    # the capital itself; scaling the return by (1 - cost) would make a losing
    # day look better.
    df.loc[entered, 'capital_ret'] = (df['close'] / df['open']) * (1 - cost) - 1
    # Exit day: sold at the open, so earn previous close -> open, net of cost.
    df.loc[exited, 'capital_ret'] = (df['open'] / prev_close) * (1 - cost) - 1
    # No trade: the index return when invested, zero when flat.
    df.loc[unchanged, 'capital_ret'] = df['ret'] * df['position']

    # Cumulative net values of the strategy and of the index.
    df['策略凈值'] = (df['capital_ret'] + 1.0).cumprod()
    df['指數凈值'] = (df['ret'] + 1.0).cumprod()
    return df
_TRADING_DAYS_PER_YEAR = 250
_ANNUAL_RISK_FREE_RATE = 0.03


def _period_returns(daily, keys):
    """Compound the daily returns that share the same grouping key."""
    return (1.0 + daily).groupby(keys).prod() - 1.0


def _win_rate(period_returns):
    """Share of non-flat periods with a positive return; NaN if there are none."""
    active = period_returns.dropna()
    active = active[active != 0]
    if active.empty:
        return float('nan')
    return float((active > 0).sum() / len(active))


def performance(df, benchmark_name='滬深300'):
    """Print and return the headline statistics of a back-test.

    Args:
        df: Back-test frame indexed by a ``DatetimeIndex`` with the columns
            ``ret``, ``capital_ret``, ``策略凈值`` and ``指數凈值``.
        benchmark_name: Label used for the index columns in the printed report.

    Returns:
        A dict with ``year_win_rate``, ``month_win_rate``, ``week_win_rate``,
        ``alpha``, ``beta`` and ``sharpe`` (floats), plus ``total_return``,
        ``annual_return`` and ``max_drawdown`` (Series keyed by ``策略凈值`` /
        ``指數凈值``). A statistic that is undefined for the sample (no active
        period, zero variance) is NaN.

    Raises:
        ValueError: If ``df`` has no rows.
    """
    if len(df) == 0:
        raise ValueError('cannot compute performance of an empty back-test')

    strategy_ret = df['capital_ret']
    index_ret = df['ret']
    dates = df.index

    # Win rates over calendar years, months and weeks. Grouping by period gives
    # the same buckets as resampling with 'A' / 'M' / 'W', without relying on
    # the deprecated offset aliases. Periods with a zero strategy return are
    # left out of the denominator.
    year_win_rate = _win_rate(_period_returns(strategy_ret, dates.year))
    month_win_rate = _win_rate(_period_returns(strategy_ret, dates.to_period('M')))
    week_win_rate = _win_rate(_period_returns(strategy_ret, dates.to_period('W')))

    # Total return, annualised return and maximum drawdown.
    nav = df[['策略凈值', '指數凈值']]
    total_ret = nav.iloc[-1] - 1
    annual_ret = (1 + total_ret) ** (_TRADING_DAYS_PER_YEAR / len(df)) - 1
    running_peak = nav.cummax()
    max_drawdown = ((running_peak - nav) / running_peak).max()

    index_var = index_ret.var()
    beta = strategy_ret.cov(index_ret) / index_var if index_var > 0 else float('nan')
    alpha = annual_ret['策略凈值'] - annual_ret['指數凈值'] * beta

    # Annualise with the number of trading days per year. Scaling by the
    # sample length would make the ratio grow with the back-test length.
    excess = strategy_ret - _ANNUAL_RISK_FREE_RATE / _TRADING_DAYS_PER_YEAR
    excess_std = excess.std()
    if excess_std > 0:
        sharpe = np.sqrt(_TRADING_DAYS_PER_YEAR) * excess.mean() / excess_std
    else:
        sharpe = float('nan')

    TA1 = round(total_ret['策略凈值'] * 100, 2)
    TA2 = round(total_ret['指數凈值'] * 100, 2)
    AR1 = round(annual_ret['策略凈值'] * 100, 2)
    AR2 = round(annual_ret['指數凈值'] * 100, 2)
    MD1 = round(max_drawdown['策略凈值'] * 100, 2)
    MD2 = round(max_drawdown['指數凈值'] * 100, 2)
    S = round(sharpe, 2)

    # Report.
    print(f'策略年勝率為:{round(year_win_rate*100,2)}%')
    print(f'策略月勝率為:{round(month_win_rate*100,2)}%')
    print(f'策略周勝率為:{round(week_win_rate*100,2)}%')
    print(f'總收益率: 策略:{TA1}%,{benchmark_name}:{TA2}%')
    print(f'年化收益率:策略:{AR1}%, {benchmark_name}:{AR2}%')
    print(f'最大回撤: 策略:{MD1}%, {benchmark_name}:{MD2}%')
    print(f'策略Alpha: {round(alpha,2)}, Beta:{round(beta,2)},夏普比率:{S}')

    return {
        'year_win_rate': year_win_rate,
        'month_win_rate': month_win_rate,
        'week_win_rate': week_win_rate,
        'total_return': total_ret,
        'annual_return': annual_ret,
        'max_drawdown': max_drawdown,
        'alpha': alpha,
        'beta': beta,
        'sharpe': sharpe,
    }
def plot_performance(df, name):
    """Plot the strategy and index net values and mark the trading signals.

    Rows whose ``signal`` is 1 are marked in red and rows whose ``signal`` is 0
    in green, both drawn on the index net-value curve.

    Args:
        df: Back-test frame with ``策略凈值``, ``指數凈值`` and ``signal`` columns.
        name: Display name of the index, used in the chart title.
    """
    ax = df[['策略凈值', '指數凈值']].plot(figsize=(15, 7))

    benchmark = df['指數凈值']
    # One scatter call per signal value instead of one per row.
    for flag, colour in ((1, 'r'), (0, 'g')):
        hits = benchmark[df['signal'] == flag]
        ax.scatter(hits.index, hits.values, c=colour)

    ax.set_title(name + '—' + '北向資金擇時交易策略回測', size=15)
    ax.set_xlabel('')
    # Hide the right and top borders of the chart.
    ax.spines['right'].set_color('none')
    ax.spines['top'].set_color('none')
    plt.show()
def main(code='sh000300', start='2015-12-08', end='2020-08-12', window=252, stdev_n=1.5, cost=0.01):
    """Run the whole northbound-flow back-test for one index.

    Downloads the index history and the northbound flows, joins them on the
    date, runs ``North_Strategy`` and then prints and plots the results.

    Args:
        code: Index symbol, e.g. ``"sh000300"``.
        start: First requested day, as a ``"YYYY-MM-DD"`` string.
        end: Last requested day, as a ``"YYYY-MM-DD"`` string.
        window: Rolling window length forwarded to ``North_Strategy``.
        stdev_n: Band width forwarded to ``North_Strategy``.
        cost: Transaction cost forwarded to ``North_Strategy``.
    """
    prices = get_index_data(code, start, end)
    flows = get_north_money(start, end)

    merged = pd.merge(prices, flows, on='date').set_index('date')
    merged = merged.rename(columns={'value': '北向資金'})
    merged = merged[['close', 'open', '北向資金']].dropna()

    df = North_Strategy(merged, window, stdev_n, cost)

    # Look up the display name; fall back to the raw symbol for an index that
    # is not listed in ``indexs`` instead of failing.
    name = next((label for label, symbol in indexs.items() if symbol == code), code)
    print(f'回測標的:{name}指數')

    # Report the dates actually covered by the back-test (the rolling window
    # consumes the first rows), both in the same format.
    startDate = df.index[0].strftime('%Y%m%d')
    endDate = df.index[-1].strftime('%Y%m%d')
    print(f'回測期間:{startDate}—{endDate}')

    performance(df)
    plot_performance(df, name)
# Run the back-test for the 'sh000300' symbol with main()'s default dates and parameters.
main(code='sh000300')