Cross-Market Linkage: Predicting A-Share Sector Moves from Overnight US Market Data


As northbound capital keeps flowing into A-shares, cross-market linkage has become increasingly pronounced: even as the domestic market runs its own cycle, it is constantly exposed to swings in major external markets, and the US market in particular, as the world's bellwether, can shake global finance with the slightest tremor. This article explores how technical factors from the US session affect A-share sectors on the following trading day, uses machine learning to predict sector gains and losses, and compares the results against the CSI 300 benchmark to gauge practical effect.

Generating US-Market Factor Data

Selecting US Tickers

The US market has tens of thousands of listed stocks and ETFs; here we pick a representative subset of stocks and ETFs as reference tickers.

  • Dow Jones Industrial Average constituents: a price-weighted stock market index of 30 prominent companies listed on US exchanges.
  • S&P 500 constituents: the roughly 500 largest US-listed companies by total market capitalization (historically grouped as 400 industrials, 20 transportation, 40 utilities and 40 financials).
  • Nasdaq-100 constituents: an index of the 100 largest domestic and international non-financial companies listed on Nasdaq, weighted by market capitalization, with rules that cap the influence of the largest names.
  • Broad-market and sector ETFs:
  • Commodities: silver (SLV), gold (GLD), gold miners (GDX), natural gas (UNG), solar (TAN), energy (XLE), commodity index (DBC), oil & gas exploration (XOP), crude oil fund (USO), oil services (OIH)
  • Broad market: S&P 500 (SPY), Dow Jones (DIA), Nasdaq-100 (QQQ), Russell 2000 (IWM), volatility (UVXY), volatility (VIXY), value stocks (VTV), Russell 1000 growth (IWF), China large caps (FXI), China internet (KWEB), Japan (EWJ), Taiwan (EWT), South Korea (EWY), Australia (EWA), Hong Kong (EWH), CSI 300 (ASHR), Europe (VGK), UK (EWU), Germany (EWG), eurozone (EZU), Brazil (EWZ)
  • Bonds: 20+ year Treasuries (TLT), aggregate bond (AGG), municipal bonds (MUB), inflation-protected Treasuries (TIP), high-yield corporate bonds (HYG), short-term Treasuries (SHV), investment-grade corporates (LQD), high-yield bonds (JNK), short-term corporates (VCSH), intermediate corporates (VCIT), 1-3 year Treasuries (SHY), emerging-market USD bonds (EMB)
  • Sectors: financials (XLF), biotech (XBI), semiconductors (SMH), consumer discretionary (XLY), technology (XLK), health care (XLV), consumer staples (XLP), utilities (XLU), industrials (XLI), real estate (IYR), REITs (VNQ), materials (XLB), regional banks (KRE), information technology (VGT), airlines (JETS), agriculture (DBA), retail (XRT), metals & mining (XME), homebuilders (XHB)
  • FX: US dollar (UUP)
  • Representative Chinese ADRs: Alibaba (BABA), TSMC (TSM), JD.com (JD), Pinduoduo (PDD), NetEase (NTES), Baidu (BIDU), Li Auto (LI), NIO (NIO), XPeng (XPEV), Yum China (YUMC), BeiGene (BGNE), KE Holdings (BEKE), Trip.com (TCOM), Lufax (LU), Bilibili (BILI), Tencent Music (TME), Futu (FUTU), GDS Holdings (GDS), Weibo (WB), New Oriental (EDU), iQIYI (IQ)

Code: scraping the index constituents

```python
import re
import requests

def get_us_market_ticker():
    # Browser-like headers so slickcharts.com serves the page
    headers = {
        'authority': 'www.slickcharts.com',
        'cache-control': 'max-age=0',
        'upgrade-insecure-requests': '1',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36',
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'sec-fetch-site': 'none',
        'sec-fetch-mode': 'navigate',
        'sec-fetch-user': '?1',
        'sec-fetch-dest': 'document',
        'referer': 'http://www.google.com/',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7',
    }

    sp500 = requests.get('http://www.slickcharts.com/sp500', headers=headers)
    nasdaq100 = requests.get('http://www.slickcharts.com/nasdaq100', headers=headers)
    dowjones30 = requests.get('http://www.slickcharts.com/dowjones', headers=headers)

    # Tickers appear as /symbol/XXX links in the page HTML
    component_ticker = (set(re.findall(r"/symbol/([A-Za-z\.]+)", sp500.text))
                        | set(re.findall(r"/symbol/([A-Za-z\.]+)", nasdaq100.text))
                        | set(re.findall(r"/symbol/([A-Za-z\.]+)", dowjones30.text)))

    etf_ticker = set(['SLV', 'GLD', 'GDX', 'UNG', 'TAN', 'XLE', 'DBC', 'XOP', 'USO', 'OIH',
                      'SPY', 'DIA', 'QQQ', 'IWM', 'UVXY', 'VIXY', 'VTV', 'IWF', 'FXI', 'KWEB',
                      'EWJ', 'EWT', 'EWY', 'EWA', 'EWH', 'ASHR', 'VGK', 'EWU', 'EWG', 'EZU',
                      'EWZ', 'TLT', 'AGG', 'MUB', 'TIP', 'HYG', 'SHV', 'LQD', 'JNK', 'VCSH',
                      'VCIT', 'SHY', 'EMB', 'XLF', 'XBI', 'SMH', 'XLY', 'XLK', 'XLV', 'XLP',
                      'XLU', 'XLI', 'IYR', 'VNQ', 'XLB', 'KRE', 'VGT', 'JETS', 'DBA', 'XRT',
                      'XME', 'XHB', 'UUP'])

    cn_ticker = set(['BABA', 'TSM', 'JD', 'PDD', 'NTES', 'BIDU', 'LI', 'NIO', 'XPEV', 'YUMC',
                     'BGNE', 'BEKE', 'TCOM', 'LU', 'BILI', 'TME', 'FUTU', 'GDS', 'WB', 'EDU', 'IQ'])

    ticker = component_ticker | etf_ticker | cn_ticker
    return list(ticker)
```

This yields roughly 600 tickers to track.

Generating Technical Factors

We generate daily technical factors with pandas_ta, introduced in the previous post, and give the code directly below. Note that the factors must be standardized, with values clipped to [-10, 10] so that extreme values do not distort the downstream model:

```python
import numpy as np
import pandas as pd
import pandas_ta  # registers the DataFrame .ta accessor

# Technical indicators: daily

def make_tech_feature_daily(dayline_df, index_col='trade_day'):
    # Features
    if dayline_df.index.name != index_col:
        dayline_df = dayline_df.set_index(index_col)
    df_len = len(dayline_df)
    if df_len < 2:
        # Too little history to compute indicators: return an all-zero feature frame
        feature_df = pd.DataFrame(index=dayline_df.index, columns=[
            'ADX_2', 'DMP_2', 'DMN_2', 'ADX_5', 'DMP_5', 'DMN_5', 'ADX_22', 'DMP_22', 'DMN_22',
            'CCI_2', 'CCI_5', 'CCI_22', 'CMO_2', 'CMO_5', 'CMO_22',
            'MACD_12_26_9', 'MACDh_12_26_9', 'MACDs_12_26_9',
            'MACD_6_30_9', 'MACDh_6_30_9', 'MACDs_6_30_9',
            'MACD_24_52_9', 'MACDh_24_52_9', 'MACDs_24_52_9',
            'PPO_12_26_9', 'PPOh_12_26_9', 'PPOs_12_26_9',
            'PPO_24_52_9', 'PPOh_24_52_9', 'PPOs_24_52_9',
            'PVO_12_26_9', 'PVOh_12_26_9', 'PVOs_12_26_9',
            'PVO_24_52_9', 'PVOh_24_52_9', 'PVOs_24_52_9',
            'MFI_2', 'MFI_5', 'RSI_2', 'RSI_5', 'RSI_14', 'UO_5_15_30',
            'WILLR_3', 'WILLR_5', 'WILLR_10', 'WILLR_20',
            'K_9_3', 'D_9_3', 'J_9_3', 'K_19_3', 'D_19_3', 'J_19_3',
            'NATR_3', 'NATR_10', 'LOGRET_10',
            'PCTRET_1', 'PCTRET_2', 'PCTRET_3', 'PCTRET_4', 'PCTRET_5',
            'ZS_5', 'ZS_14', 'RVI_5', 'RVI_14',
            'rolling_money_3', 'rolling_money_5', 'rolling_money_10',
            'rolling_volume_3', 'rolling_volume_5', 'rolling_volume_10',
            'pct_volatility', 'rolling_pct_volatility_3', 'rolling_pct_volatility_5',
            'rolling_pct_volatility_10']).fillna(0.0)
        feature_df.columns = ['daily_%s' % i for i in feature_df.columns]
        feature_df['code'] = dayline_df['code']
        feature_df = feature_df.reset_index().set_index('code').reset_index()
        return feature_df

    ## Average Directional Index (ADX)
    try:
        adx_2 = (dayline_df.ta.adx(length=2) / 100).fillna(0.0)
        assert adx_2.columns.tolist() == ['ADX_2', 'DMP_2', 'DMN_2']
    except Exception:
        adx_2 = pd.DataFrame(index=dayline_df.index, columns=['ADX_2', 'DMP_2', 'DMN_2']).fillna(0.0)
    try:
        adx_5 = (dayline_df.ta.adx(length=5) / 100).fillna(0.0)
        assert adx_5.columns.tolist() == ['ADX_5', 'DMP_5', 'DMN_5']
    except Exception:
        adx_5 = pd.DataFrame(index=dayline_df.index, columns=['ADX_5', 'DMP_5', 'DMN_5']).fillna(0.0)
    try:
        adx_22 = (dayline_df.ta.adx(length=22) / 100).fillna(0.0)
        assert adx_22.columns.tolist() == ['ADX_22', 'DMP_22', 'DMN_22']
    except Exception:
        adx_22 = pd.DataFrame(index=dayline_df.index, columns=['ADX_22', 'DMP_22', 'DMN_22']).fillna(0.0)

    ## Commodity Channel Index (CCI)
    try:
        cci_2 = (dayline_df.ta.cci(length=2) / 1000).to_frame().fillna(0.0).rename(columns={"CCI_2_0.015": "CCI_2"})
        assert cci_2.columns.tolist() == ['CCI_2']
    except Exception:
        cci_2 = pd.DataFrame(index=dayline_df.index, columns=['CCI_2']).fillna(0.0)
    try:
        cci_5 = (dayline_df.ta.cci(length=5) / 1000).to_frame().fillna(0.0).rename(columns={"CCI_5_0.015": "CCI_5"})
        assert cci_5.columns.tolist() == ['CCI_5']
    except Exception:
        cci_5 = pd.DataFrame(index=dayline_df.index, columns=['CCI_5']).fillna(0.0)
    try:
        cci_22 = (dayline_df.ta.cci(length=22) / 1000).to_frame().fillna(0.0).rename(columns={"CCI_22_0.015": "CCI_22"})
        assert cci_22.columns.tolist() == ['CCI_22']
    except Exception:
        cci_22 = pd.DataFrame(index=dayline_df.index, columns=['CCI_22']).fillna(0.0)

    ## Chande Momentum Oscillator (CMO)
    try:
        cmo_2 = (dayline_df.ta.cmo(length=2) / 100).to_frame().fillna(0.0)
        assert cmo_2.columns.tolist() == ['CMO_2']
    except Exception:
        cmo_2 = pd.DataFrame(index=dayline_df.index, columns=['CMO_2']).fillna(0.0)
    try:
        cmo_5 = (dayline_df.ta.cmo(length=5) / 100).to_frame().fillna(0.0)
        assert cmo_5.columns.tolist() == ['CMO_5']
    except Exception:
        cmo_5 = pd.DataFrame(index=dayline_df.index, columns=['CMO_5']).fillna(0.0)
    try:
        cmo_22 = (dayline_df.ta.cmo(length=22) / 100).to_frame().fillna(0.0)
        assert cmo_22.columns.tolist() == ['CMO_22']
    except Exception:
        cmo_22 = pd.DataFrame(index=dayline_df.index, columns=['CMO_22']).fillna(0.0)

    ## MACD, scaled by the close price so values are comparable across instruments
    try:
        macd_12_26_9 = dayline_df.ta.macd(12, 26, 9)
        for k in macd_12_26_9:
            macd_12_26_9[k] = macd_12_26_9[k].div(dayline_df['close'].values) * 10
        macd_12_26_9 = macd_12_26_9.fillna(0.0)
        assert macd_12_26_9.columns.tolist() == ['MACD_12_26_9', 'MACDh_12_26_9', 'MACDs_12_26_9']
    except Exception:
        macd_12_26_9 = pd.DataFrame(index=dayline_df.index, columns=['MACD_12_26_9', 'MACDh_12_26_9', 'MACDs_12_26_9']).fillna(0.0)

    try:
        macd_6_30_9 = dayline_df.ta.macd(6, 30, 9)
        for k in macd_6_30_9:
            macd_6_30_9[k] = macd_6_30_9[k].div(dayline_df['close'].values) * 10
        macd_6_30_9 = macd_6_30_9.fillna(0.0)
        assert macd_6_30_9.columns.tolist() == ['MACD_6_30_9', 'MACDh_6_30_9', 'MACDs_6_30_9']
    except Exception:
        macd_6_30_9 = pd.DataFrame(index=dayline_df.index, columns=['MACD_6_30_9', 'MACDh_6_30_9', 'MACDs_6_30_9']).fillna(0.0)

    try:
        macd_24_52_9 = dayline_df.ta.macd(24, 52, 9)
        for k in macd_24_52_9:
            macd_24_52_9[k] = macd_24_52_9[k].div(dayline_df['close'].values) * 10
        macd_24_52_9 = macd_24_52_9.fillna(0.0)
        assert macd_24_52_9.columns.tolist() == ['MACD_24_52_9', 'MACDh_24_52_9', 'MACDs_24_52_9']
    except Exception:
        macd_24_52_9 = pd.DataFrame(index=dayline_df.index, columns=['MACD_24_52_9', 'MACDh_24_52_9', 'MACDs_24_52_9']).fillna(0.0)

    ## Percentage Price Oscillator (PPO)
    try:
        ppo_12_26_9 = (dayline_df.ta.ppo(12, 26, 9) / 10).fillna(0.0)
        assert ppo_12_26_9.columns.tolist() == ['PPO_12_26_9', 'PPOh_12_26_9', 'PPOs_12_26_9']
    except Exception:
        ppo_12_26_9 = pd.DataFrame(index=dayline_df.index, columns=['PPO_12_26_9', 'PPOh_12_26_9', 'PPOs_12_26_9']).fillna(0.0)
    try:
        ppo_24_52_9 = (dayline_df.ta.ppo(24, 52, 9) / 10).fillna(0.0)
        assert ppo_24_52_9.columns.tolist() == ['PPO_24_52_9', 'PPOh_24_52_9', 'PPOs_24_52_9']
    except Exception:
        ppo_24_52_9 = pd.DataFrame(index=dayline_df.index, columns=['PPO_24_52_9', 'PPOh_24_52_9', 'PPOs_24_52_9']).fillna(0.0)

    ## Percentage Volume Oscillator (PVO)
    try:
        pvo_12_26_9 = (dayline_df.ta.pvo(12, 26, 9) / 100).fillna(0.0)
        assert pvo_12_26_9.columns.tolist() == ['PVO_12_26_9', 'PVOh_12_26_9', 'PVOs_12_26_9']
    except Exception:
        pvo_12_26_9 = pd.DataFrame(index=dayline_df.index, columns=['PVO_12_26_9', 'PVOh_12_26_9', 'PVOs_12_26_9']).fillna(0.0)
    try:
        pvo_24_52_9 = (dayline_df.ta.pvo(24, 52, 9) / 100).fillna(0.0)
        assert pvo_24_52_9.columns.tolist() == ['PVO_24_52_9', 'PVOh_24_52_9', 'PVOs_24_52_9']
    except Exception:
        pvo_24_52_9 = pd.DataFrame(index=dayline_df.index, columns=['PVO_24_52_9', 'PVOh_24_52_9', 'PVOs_24_52_9']).fillna(0.0)

    ## Money Flow Index (MFI)
    try:
        mfi_2 = (dayline_df.ta.mfi(length=2) / 100).to_frame().fillna(0.5)
        assert mfi_2.columns.tolist() == ['MFI_2']
    except Exception:
        mfi_2 = pd.DataFrame(index=dayline_df.index, columns=['MFI_2']).fillna(0.5)
    try:
        mfi_5 = (dayline_df.ta.mfi(length=5) / 100).to_frame().fillna(0.5)
        assert mfi_5.columns.tolist() == ['MFI_5']
    except Exception:
        mfi_5 = pd.DataFrame(index=dayline_df.index, columns=['MFI_5']).fillna(0.5)

    ## Relative Strength Index (RSI)
    try:
        rsi_2 = (dayline_df.ta.rsi(length=2) / 100).to_frame().fillna(0.5)
        assert rsi_2.columns.tolist() == ['RSI_2']
    except Exception:
        rsi_2 = pd.DataFrame(index=dayline_df.index, columns=['RSI_2']).fillna(0.5)
    try:
        rsi_5 = (dayline_df.ta.rsi(length=5) / 100).to_frame().fillna(0.5)
        assert rsi_5.columns.tolist() == ['RSI_5']
    except Exception:
        rsi_5 = pd.DataFrame(index=dayline_df.index, columns=['RSI_5']).fillna(0.5)
    try:
        rsi_14 = (dayline_df.ta.rsi(length=14) / 100).to_frame().fillna(0.5)
        assert rsi_14.columns.tolist() == ['RSI_14']
    except Exception:
        rsi_14 = pd.DataFrame(index=dayline_df.index, columns=['RSI_14']).fillna(0.5)

    ## Ultimate Oscillator (UO)
    try:
        uo_5_15_30 = (dayline_df.ta.uo(5, 15, 30) / 100).to_frame().fillna(0.5)
        assert uo_5_15_30.columns.tolist() == ['UO_5_15_30']
    except Exception:
        uo_5_15_30 = pd.DataFrame(index=dayline_df.index, columns=['UO_5_15_30']).fillna(0.0)

    ## Williams %R
    try:
        willr_3 = (dayline_df.ta.willr(length=3) / 100).to_frame().fillna(-0.5)
        assert willr_3.columns.tolist() == ['WILLR_3']
    except Exception:
        willr_3 = pd.DataFrame(index=dayline_df.index, columns=['WILLR_3']).fillna(-0.5)
    try:
        willr_5 = (dayline_df.ta.willr(length=5) / 100).to_frame().fillna(-0.5)
        assert willr_5.columns.tolist() == ['WILLR_5']
    except Exception:
        willr_5 = pd.DataFrame(index=dayline_df.index, columns=['WILLR_5']).fillna(-0.5)
    try:
        willr_10 = (dayline_df.ta.willr(length=10) / 100).to_frame().fillna(-0.5)
        assert willr_10.columns.tolist() == ['WILLR_10']
    except Exception:
        willr_10 = pd.DataFrame(index=dayline_df.index, columns=['WILLR_10']).fillna(-0.5)
    try:
        willr_20 = (dayline_df.ta.willr(length=20) / 100).to_frame().fillna(-0.5)
        assert willr_20.columns.tolist() == ['WILLR_20']
    except Exception:
        willr_20 = pd.DataFrame(index=dayline_df.index, columns=['WILLR_20']).fillna(-0.5)

    ## KDJ stochastic
    try:
        kdj_9_3 = (dayline_df.ta.kdj(9, 3) / 100).fillna(0.5)
        assert kdj_9_3.columns.tolist() == ['K_9_3', 'D_9_3', 'J_9_3']
    except Exception:
        kdj_9_3 = pd.DataFrame(index=dayline_df.index, columns=['K_9_3', 'D_9_3', 'J_9_3']).fillna(0.5)
    try:
        kdj_19_3 = (dayline_df.ta.kdj(19, 3) / 100).fillna(0.5)
        assert kdj_19_3.columns.tolist() == ['K_19_3', 'D_19_3', 'J_19_3']
    except Exception:
        kdj_19_3 = pd.DataFrame(index=dayline_df.index, columns=['K_19_3', 'D_19_3', 'J_19_3']).fillna(0.5)

    ## Normalized Average True Range (NATR)
    try:
        natr_3 = (dayline_df.ta.natr(length=3) / 10).to_frame().fillna(0.5)
        assert natr_3.columns.tolist() == ['NATR_3']
    except Exception:
        natr_3 = pd.DataFrame(index=dayline_df.index, columns=['NATR_3']).fillna(0.5)
    try:
        natr_10 = (dayline_df.ta.natr(length=10) / 10).to_frame().fillna(0.5)
        assert natr_10.columns.tolist() == ['NATR_10']
    except Exception:
        natr_10 = pd.DataFrame(index=dayline_df.index, columns=['NATR_10']).fillna(0.5)

    ## Log and percent returns over several horizons
    try:
        log_return_10 = (dayline_df.ta.log_return(length=10) * 10).clip(-10, 10).fillna(0.0).to_frame()
        assert log_return_10.columns.tolist() == ['LOGRET_10']
    except Exception:
        log_return_10 = pd.DataFrame(index=dayline_df.index, columns=['LOGRET_10']).fillna(0.0)

    try:
        percent_return_1 = (dayline_df.ta.percent_return(length=1)).to_frame().fillna(0.0)
        assert percent_return_1.columns.tolist() == ['PCTRET_1']
    except Exception:
        percent_return_1 = pd.DataFrame(index=dayline_df.index, columns=['PCTRET_1']).fillna(0.0)
    try:
        percent_return_2 = (dayline_df.ta.percent_return(length=2)).to_frame().fillna(0.0)
        assert percent_return_2.columns.tolist() == ['PCTRET_2']
    except Exception:
        percent_return_2 = pd.DataFrame(index=dayline_df.index, columns=['PCTRET_2']).fillna(0.0)
    try:
        percent_return_3 = (dayline_df.ta.percent_return(length=3)).to_frame().fillna(0.0)
        assert percent_return_3.columns.tolist() == ['PCTRET_3']
    except Exception:
        percent_return_3 = pd.DataFrame(index=dayline_df.index, columns=['PCTRET_3']).fillna(0.0)
    try:
        percent_return_4 = (dayline_df.ta.percent_return(length=4)).to_frame().fillna(0.0)
        assert percent_return_4.columns.tolist() == ['PCTRET_4']
    except Exception:
        percent_return_4 = pd.DataFrame(index=dayline_df.index, columns=['PCTRET_4']).fillna(0.0)
    try:
        percent_return_5 = (dayline_df.ta.percent_return(length=5)).to_frame().fillna(0.0)
        assert percent_return_5.columns.tolist() == ['PCTRET_5']
    except Exception:
        percent_return_5 = pd.DataFrame(index=dayline_df.index, columns=['PCTRET_5']).fillna(0.0)

    ## Rolling z-score of the close
    try:
        zscore_5 = (dayline_df.ta.zscore(length=5)).to_frame().fillna(0.0)
        assert zscore_5.columns.tolist() == ['ZS_5']
    except Exception:
        zscore_5 = pd.DataFrame(index=dayline_df.index, columns=['ZS_5']).fillna(0.0)
    try:
        zscore_14 = (dayline_df.ta.zscore(length=14)).to_frame().fillna(0.0)
        assert zscore_14.columns.tolist() == ['ZS_14']
    except Exception:
        zscore_14 = pd.DataFrame(index=dayline_df.index, columns=['ZS_14']).fillna(0.0)

    ## Relative Volatility Index (RVI)
    try:
        rvi_5 = (dayline_df.ta.rvi(length=5) / 100).fillna(0.5).to_frame()
        assert rvi_5.columns.tolist() == ['RVI_5']
    except Exception:
        rvi_5 = pd.DataFrame(index=dayline_df.index, columns=['RVI_5']).fillna(0.0)
    try:
        rvi_14 = (dayline_df.ta.rvi(length=14) / 100).fillna(0.5).to_frame()
        assert rvi_14.columns.tolist() == ['RVI_14']
    except Exception:
        rvi_14 = pd.DataFrame(index=dayline_df.index, columns=['RVI_14']).fillna(0.5)

    ## Rolling liquidity: log-ratio of rolling-mean turnover/volume to today's value
    rolling_money_3 = ((np.log1p(dayline_df['money'].rolling(3, min_periods=1).mean()) - np.log1p(dayline_df['money'])).clip(-10, 10)).to_frame().rename(columns={'money': 'rolling_money_3'})
    rolling_money_5 = ((np.log1p(dayline_df['money'].rolling(5, min_periods=1).mean()) - np.log1p(dayline_df['money'])).clip(-10, 10)).to_frame().rename(columns={'money': 'rolling_money_5'})
    rolling_money_10 = ((np.log1p(dayline_df['money'].rolling(10, min_periods=1).mean()) - np.log1p(dayline_df['money'])).clip(-10, 10)).to_frame().rename(columns={'money': 'rolling_money_10'})
    rolling_volume_3 = ((np.log1p(dayline_df['volume'].rolling(3, min_periods=1).mean()) - np.log1p(dayline_df['volume'])).clip(-10, 10)).to_frame().rename(columns={'volume': 'rolling_volume_3'})
    rolling_volume_5 = ((np.log1p(dayline_df['volume'].rolling(5, min_periods=1).mean()) - np.log1p(dayline_df['volume'])).clip(-10, 10)).to_frame().rename(columns={'volume': 'rolling_volume_5'})
    rolling_volume_10 = ((np.log1p(dayline_df['volume'].rolling(10, min_periods=1).mean()) - np.log1p(dayline_df['volume'])).clip(-10, 10)).to_frame().rename(columns={'volume': 'rolling_volume_10'})

    ## Range volatility: (high - low) relative to the close
    pct_volatility = ((dayline_df['high'] - dayline_df['low']) / dayline_df['close'] * 20).clip(-10, 10).fillna(0.0).to_frame().rename(columns={0: 'pct_volatility'})

    rolling_pct_volatility_3 = ((dayline_df['high'].rolling(3, min_periods=1).max() - dayline_df['low'].rolling(3, min_periods=1).min()) / dayline_df['close'] * 20).clip(-10, 10).fillna(0.0).to_frame().rename(columns={0: 'rolling_pct_volatility_3'})
    rolling_pct_volatility_5 = ((dayline_df['high'].rolling(5, min_periods=1).max() - dayline_df['low'].rolling(5, min_periods=1).min()) / dayline_df['close'] * 20).clip(-10, 10).fillna(0.0).to_frame().rename(columns={0: 'rolling_pct_volatility_5'})
    rolling_pct_volatility_10 = ((dayline_df['high'].rolling(10, min_periods=1).max() - dayline_df['low'].rolling(10, min_periods=1).min()) / dayline_df['close'] * 20).clip(-10, 10).fillna(0.0).to_frame().rename(columns={0: 'rolling_pct_volatility_10'})

    feature_df = pd.concat([adx_2, adx_5, adx_22, cci_2, cci_5, cci_22, cmo_2, cmo_5, cmo_22,
                            macd_12_26_9, macd_6_30_9, macd_24_52_9, ppo_12_26_9, ppo_24_52_9,
                            pvo_12_26_9, pvo_24_52_9, mfi_2, mfi_5, rsi_2, rsi_5, rsi_14,
                            uo_5_15_30, willr_3, willr_5, willr_10, willr_20, kdj_9_3, kdj_19_3,
                            natr_3, natr_10, log_return_10, percent_return_1, percent_return_2,
                            percent_return_3, percent_return_4, percent_return_5, zscore_5,
                            zscore_14, rvi_5, rvi_14, rolling_money_3, rolling_money_5,
                            rolling_money_10, rolling_volume_3, rolling_volume_5,
                            rolling_volume_10, pct_volatility, rolling_pct_volatility_3,
                            rolling_pct_volatility_5, rolling_pct_volatility_10], axis=1)
    feature_df.columns = ['daily_%s' % i for i in feature_df.columns]

    feature_df['code'] = dayline_df['code']
    feature_df = feature_df.reset_index().set_index('code').reset_index()
    # Final standardization: clip all factor columns to [-10, 10]
    feature_df.iloc[:, 2:] = feature_df.iloc[:, 2:].clip(-10, 10).astype(np.float32)

    return feature_df
```

Generating A-Share Factor Data and Labels

Label Generation

We use the Shenwan (SWS) industry classification as the modeling target, predicting open, average and close returns for the current and next day, namely:

- Today's open / yesterday's close: percent change of today's open vs. yesterday's close (momentum effect)
- Today's close / yesterday's close: percent change of today's close vs. yesterday's close (rotation effect)
- Today's average / yesterday's close: percent change of today's average price vs. yesterday's close (rotation effect)
- Today's close / today's open: percent change of today's close vs. today's open (intraday trend)
- Today's average / today's open: percent change of today's average price vs. today's open (intraday trend)
- Tomorrow's open / today's open: percent change of tomorrow's open vs. today's open (short-term effect)
- Tomorrow's close / today's open: percent change of tomorrow's close vs. today's open (short-term rotation)
- Tomorrow's average / today's open: percent change of tomorrow's average price vs. today's open (short-term rotation)
- Tomorrow's open / today's average: percent change of tomorrow's open vs. today's average price (short-term rotation)
- Tomorrow's average / today's average: percent change of tomorrow's average price vs. today's average price (short-term rotation)

The label-generation process follows the previous post on extracting technical features automatically with pandas_ta. One example:

```python
# Intraday: today's average price vs. yesterday's close, in percent (cut off at the 9:15 open)
sector_feature_intraday_avg_close = sector_feature_grouped.apply(
    lambda row: ((row['high'] + row['low']) / (2 * row.shift(1)['close']) - 1) * 100
).clip(-10, 10).reset_index().rename(columns={0: 'intraday_avg_close'}).set_index(['trade_day', 'code'])

# Daily: next day's close vs. today's open, in percent (cut off at the 9:15 open)
sector_feature_daily_close_open = sector_feature_grouped.apply(
    lambda row: (row.shift(-1)['close'] / row['open'] - 1) * 100
).clip(-10, 10).reset_index().rename(columns={0: 'daily_close_open'}).set_index(['trade_day', 'code'])
```

In the end we obtain label data that looks like this:

trade_label.png

Generating Technical Factors

The technical factors here are the same as for the US market above, so we will not repeat them. The selected instruments are the Shenwan industry indices, the CSI 300 constituents and index, and the CSI 500 constituents and index, roughly 1,100 instruments in total.

After extracting technical factors for both the US and A-share markets, we merge the two (keyed on cn_trade_day and us_trade_day) with the labels (trade_label_df) into a single dataset. The merge rule: anchored on each A-share label date, take the most recent available factor data from each market. Since some instruments may be suspended, not yet listed, or delisted, we also generate a mask over the technical factors to exclude such invalid entries:

```python
import datetime

import numpy as np
from tqdm import tqdm

data = []
mask = []
for ind, row in tqdm(trade_label_df.iterrows(), total=len(trade_label_df)):
    # One mask slot per instrument: 1119 A-share + 607 US tickers
    fea_mask = [0] * (1119 + 607)
    # The index encodes the date as an integer YYYYMMDD
    trade_day = datetime.date(ind // 10000, (ind % 10000) // 100, ind % 100)

    # Most recent A-share trading day strictly before the label date
    cn_last_day = [i for i in cn_trade_day if i < trade_day]
    cn_fea = cn_df[cn_df.trade_day == cn_last_day[-1]].copy()
    cn_fea['code'] = cn_fea['code'] - 1
    cn_fea_data = [0] * 74 * 1119
    for row2 in cn_fea.values:
        cn_fea_data[74 * row2[0]: 74 * (row2[0] + 1)] = row2[2:]
        fea_mask[row2[0]] = 1

    # Most recent US trading day strictly before the label date
    us_last_day = [i for i in us_trade_day if i < trade_day]
    us_fea = us_df[us_df.trade_day == us_last_day[-1]].copy()
    us_fea['code'] = us_fea['code'] - 1300
    us_fea_data = [0] * 74 * 607
    for row2 in us_fea.values:
        us_fea_data[74 * row2[0]: 74 * (row2[0] + 1)] = row2[2:]
        fea_mask[row2[0] + 1119] = 1

    data.append(cn_fea_data + us_fea_data)
    mask.append(fea_mask)

X = np.array(data, dtype=np.float32)
mask = np.array(mask, dtype=np.int32)
```

AI Modeling

The training set consists of the factor data and labels before 20220624; the validation set is the 15 trading days from 20220624 onward, used to gauge the model's real-world performance.

Data Loading

We use a PyTorch Dataset to read the extracted factor data:

```python
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

class TechDataset(Dataset):
    def __init__(self, label=None, mask=None, feature=None, seq_len=1726, output_size=223):
        self.label = label
        self.mask = mask
        self.feature = feature
        self.seq_len = seq_len
        self.output_size = output_size

    def __len__(self):
        return len(self.label)

    def __getitem__(self, idx):
        label = torch.tensor(self.label[idx].reshape((self.output_size, 10)), dtype=torch.float32)
        input_mask = torch.tensor(self.mask[idx], dtype=torch.bool)
        input_techs = torch.tensor(self.feature[idx].reshape((self.seq_len, 74)), dtype=torch.float32)
        return label, input_mask, input_techs

X = np.load('trade_X.npy')
y = np.load('trade_y.npy')
mask = np.load('trade_y_mask.npy')

# Hold out the last 15 days for validation
train_X, valid_X, train_y, valid_y, train_mask_X, valid_mask_X = X[:-15], X[-15:], y[:-15], y[-15:], mask[:-15], mask[-15:]

train_dataset = TechDataset(seq_len=1726, output_size=223, label=train_y, mask=train_mask_X, feature=train_X)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=4)

valid_dataset = TechDataset(seq_len=1726, output_size=223, label=valid_y, mask=valid_mask_X, feature=valid_X)
valid_dataloader = DataLoader(valid_dataset, batch_size=15, shuffle=False, num_workers=4)
```

Architecture

First a GLU (gated linear unit) projects the 74-dimensional technical factors to a fixed model dimension (64 by default). A Transformer then models the factor sequence, producing a Batch * Seq_len * Dim tensor. Finally, a TechPredictor head pools and classifies this into a Batch * 223 * 10 output, where 223 is the number of label targets and 10 is the number of return metrics per target.
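The pooling step in TechPredictor maps the length-1726 input sequence onto exactly 223 outputs by choosing stride = floor(seq_len / output_size) and kernel = seq_len - (output_size - 1) * stride. A quick check of that arithmetic, using the standard MaxPool1d output-length formula:

```python
import math

seq_len, output_size = 1726, 223
stride = math.floor(seq_len / output_size)           # floor(1726 / 223) = 7
kernel_size = seq_len - (output_size - 1) * stride   # 1726 - 222 * 7 = 172

# MaxPool1d output length: floor((L - kernel) / stride) + 1
out_len = (seq_len - kernel_size) // stride + 1
print(stride, kernel_size, out_len)  # 7 172 223
```

The kernel is chosen so that the pooled length lands on output_size exactly, with no padding needed.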

```python
import numpy as np
import torch
import torch.nn as nn
from flash_pytorch import FLASH  # FLASH attention layer (assumed to come from the flash-pytorch package)

class GatedLinearUnit(nn.Module):
    def __init__(self, input_size, hidden_layer_size, dropout_rate=None, activation=None):
        super(GatedLinearUnit, self).__init__()

        self.input_size = input_size
        self.hidden_layer_size = hidden_layer_size
        self.dropout_rate = dropout_rate
        self.activation_name = activation

        if self.dropout_rate:
            self.dropout = nn.Dropout(p=self.dropout_rate)

        self.W4 = nn.Linear(self.input_size, self.hidden_layer_size)
        self.W5 = nn.Linear(self.input_size, self.hidden_layer_size)

        if self.activation_name:
            self.activation = getattr(nn, self.activation_name)()

        self.sigmoid = nn.Sigmoid()

        self.init_weights()

    def init_weights(self):
        for n, p in self.named_parameters():
            if 'bias' not in n:
                torch.nn.init.xavier_uniform_(p)
            else:
                torch.nn.init.zeros_(p)

    def forward(self, x):
        if self.dropout_rate:
            x = self.dropout(x)

        if self.activation_name:
            output = self.sigmoid(self.W4(x)) * self.activation(self.W5(x))
        else:
            output = self.sigmoid(self.W4(x)) * self.W5(x)

        return output


class TechEncoder(nn.Module):
    def __init__(self, depth=1, dim=64, group_size=64, query_key_dim=32, attn_dropout=0.2):
        super().__init__()
        self.dim = dim
        self.tech_emb = GatedLinearUnit(74, dim)
        self.transformer_layers = nn.ModuleList([
            FLASH(dim=dim, group_size=group_size, query_key_dim=query_key_dim,
                  causal=False, dropout=attn_dropout)
            for _ in range(depth)
        ])
        self.norm = nn.LayerNorm(dim)

    def forward(self, input_techs, attention_mask):
        input_feature = self.tech_emb(input_techs)  # Batch * seq_len * dim
        for flash in self.transformer_layers:
            input_feature = flash(input_feature, mask=attention_mask)
        output = self.norm(input_feature)  # Batch * seq_len * dim
        return output, attention_mask


class TechPredictor(nn.Module):
    def __init__(self, depth=1, dim=64, seq_len=1726, output_size=223, class_num=10):
        super(TechPredictor, self).__init__()
        self.tech_encoder = TechEncoder(depth=depth, dim=dim)
        # Kernel/stride chosen so the pooled sequence has exactly output_size steps
        stride = np.floor(seq_len / output_size).astype(int)
        kernel_size = seq_len - (output_size - 1) * stride
        self.max_pooling = nn.MaxPool1d(kernel_size=kernel_size, stride=stride)
        self.classifier = nn.Linear(in_features=dim, out_features=class_num)

    def forward(self, input_techs, attention_mask):
        tech_feature, attention_mask = self.tech_encoder(input_techs, attention_mask)
        tech_feature = tech_feature * attention_mask.unsqueeze(-1)  # zero out invalid instruments
        output = self.max_pooling(tech_feature.transpose(1, 2).contiguous()).transpose(1, 2).contiguous()  # Batch * 223 * dim
        output = self.classifier(output)  # Batch * 223 * class_num
        return output
```

Loss Function

We optimize the model directly with a mean-squared-error loss, and also compute the Pearson correlation coefficient for reference:

```python
import torch
import torch.nn.functional as F

def calc_loss(y_true, y_pred):
    y_true = y_true.reshape(-1, 10)
    y_pred = y_pred.reshape(-1, 10)
    # Mean-center both sides: cosine similarity of centered vectors equals Pearson correlation
    y_true_label = y_true - y_true.mean(dim=0, keepdim=True)
    y_pred_label = y_pred - y_pred.mean(dim=0, keepdim=True)
    loss = F.mse_loss(y_pred, y_true)
    pearson = torch.cosine_similarity(y_true_label, y_pred_label, dim=0, eps=1e-6)
    return loss, pearson
```
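The trick above relies on an identity: cosine similarity computed on mean-centered vectors is exactly the Pearson correlation. A small numpy sanity check of that identity, on illustrative random data:

```python
import numpy as np

rng = np.random.default_rng(0)
a = rng.normal(size=100)
b = 0.5 * a + rng.normal(size=100)

# Pearson correlation via numpy
pearson = np.corrcoef(a, b)[0, 1]

# Cosine similarity after mean-centering
ac, bc = a - a.mean(), b - b.mean()
cosine = ac @ bc / (np.linalg.norm(ac) * np.linalg.norm(bc))

assert abs(pearson - cosine) < 1e-12
```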

Training Loop

We train for 30 epochs, evaluating on the validation set after each epoch:

```python
import numpy as np
import torch
from torch import optim
from torch.utils.tensorboard import SummaryWriter

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
writer = SummaryWriter()

model = TechPredictor(depth=1, dim=128, seq_len=1726, output_size=223, class_num=10)
model = model.to(device)
opt = optim.AdamW(model.parameters(), lr=2e-5)
step = 0

for epoch in range(0, 30):
    model.train()
    for ind, batch in enumerate(train_dataloader):
        label, input_mask, input_techs = batch
        logit = model(input_techs.to(device), input_mask.to(device))
        loss, pearson = calc_loss(label.to(device), logit)

        writer.add_scalars('train', {'loss': loss.item()}, step)

        loss.backward()
        opt.step()
        opt.zero_grad()

        step += 1

    model.eval()
    with torch.no_grad():
        # Per-metric Pearson correlations on the validation set
        loss_valid = {i: [] for i in range(10)}
        loss_mean = []
        for ind, batch in enumerate(valid_dataloader):
            label, input_mask, input_techs = batch
            logit = model(input_techs.to(device), input_mask.to(device))
            loss, pearson = calc_loss(label.to(device), logit)
            for metric_ind, v in enumerate(pearson.cpu().tolist()):
                loss_valid[metric_ind].append(v)
            loss_mean.append(loss.item())

    print("Epoch: %d, loss: %.4f" % (epoch, np.mean(loss_mean)))
    torch.save(model.state_dict(), model_dir + "/tech_model/TechPredictor2_%d.torch" % epoch)
```

Result Analysis

We take the predictions on the validation set. For the benchmarks (CSI 300, CSI 500, ChiNext) the quoted metric is always tomorrow's average / today's average; the model's predictions are restricted to the next-day return metrics ('tomorrow avg / today avg', 'tomorrow avg / today open', 'tomorrow close / today avg', 'tomorrow open / today open', 'tomorrow open / today avg'). "Top5 return" is the average realized return of the five sectors with the highest predictions, and "Top5 predicted" is the mean of the model's predicted values for them; Top10 is analogous.

| Date | CSI 300 | CSI 500 | ChiNext | Top5 return | Top5 predicted | Top10 return | Top10 predicted |
|---------:|--------:|--------:|--------:|------------:|---------------:|-------------:|----------------:|
| 20220624 | 1.603 | 1.238 | 1.737 | 4.416 | 0.578 | 2.686 | 0.568 |
| 20220627 | 0.205 | 0.461 | -1.069 | 2.752 | 0.515 | 1.88 | 0.5 |
| 20220628 | 0.133 | -0.212 | -0.22 | 1.277 | 0.614 | 0.479 | 0.601 |
| 20220629 | 0.179 | 0.054 | -0.046 | 0.406 | 0.526 | 0.19 | 0.521 |
| 20220630 | 0.193 | 0.207 | -0.285 | 1.504 | 0.632 | 1.074 | 0.612 |
| 20220701 | -0.295 | 0.191 | -0.151 | -0.218 | 0.858 | 0.207 | 0.817 |
| 20220704 | 0.515 | 0.42 | 1.164 | 1.628 | 0.485 | 2.107 | 0.472 |
| 20220705 | -1.026 | -0.983 | -0.231 | 0.218 | 0.485 | -0.1 | 0.472 |
| 20220706 | -0.329 | 0.048 | 0.182 | -0.496 | 0.53 | -0.755 | 0.515 |
| 20220707 | 0.654 | 0.247 | 0.946 | 0.906 | 0.471 | 1.522 | 0.45 |
| 20220708 | -1.905 | -1.8 | -2.652 | -1.338 | 0.228 | -1.445 | 0.22 |
| 20220711 | -0.799 | -0.609 | -1.36 | -0.425 | 0.327 | -0.922 | 0.31 |
| 20220712 | -0.468 | -0.713 | -0.636 | -1.397 | 0.494 | -1.267 | 0.456 |
| 20220713 | 0.214 | 0.79 | 2.588 | 0.666 | 0.583 | 1.286 | 0.57 |
| 20220714 | -0.595 | -0.586 | 0.319 | -1.321 | 0.421 | -1.239 | 0.414 |

Overall, the Top5 return beats the benchmark indices, averaging above 0.5. Adding a single filter, keeping only days where the predicted value exceeds 0.5, lifts the average return above 1.2, so the model offers some value for market timing and sector selection.
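The Top-k selection and the prediction-value filter described above can be sketched as follows; this is a minimal illustration assuming a DataFrame with columns trade_day, sector, prediction and actual (all names hypothetical, not from the original code):

```python
import pandas as pd

def topk_returns(pred_df, k=5, min_pred=None):
    """Average realized return of the k sectors with the highest predictions per day,
    optionally keeping only days whose mean predicted value exceeds min_pred."""
    out = {}
    for day, g in pred_df.groupby('trade_day'):
        top = g.nlargest(k, 'prediction')
        if min_pred is not None and top['prediction'].mean() <= min_pred:
            continue  # day fails the prediction-value filter
        out[day] = top['actual'].mean()
    return pd.Series(out)

# Toy data for illustration
pred_df = pd.DataFrame({
    'trade_day': [20220624] * 4,
    'sector': ['A', 'B', 'C', 'D'],
    'prediction': [0.6, 0.55, 0.2, 0.1],
    'actual': [2.0, 1.0, -1.0, 0.0],
})
print(topk_returns(pred_df, k=2))  # trade_day 20220624 -> 1.5
```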

Taking June 24 as an example, the Top5 sectors picked by the AI model, together with the implied buy/sell timing, returned more than 4 points on average:

| trade_day | sector | metric | actual | prediction |
|----------:|:-------|:-------|-------:|-----------:|
| 20220624 | SWS Multi-Format Retail | tomorrow avg / today open | 1.6 | 0.586 |
| 20220624 | SWS Hotels | tomorrow avg / today open | 7.362 | 0.584 |
| 20220624 | SWS Integrated Tourism | tomorrow avg / today open | 8.8 | 0.576 |
| 20220624 | SWS Department Stores | tomorrow avg / today open | 1.812 | 0.573 |
| 20220624 | SWS Supermarkets | tomorrow avg / today open | 2.508 | 0.572 |


Welcome to follow my WeChat official account "量化實戰" (Quant in Practice), where original technical articles are published first.

qrcode.jpg